环境:mysql 目标:亿级数据迁移 最终耗时:1-2小时(服务器更佳),建议在晚上或者没人访问的情况下操作
思路:
1.不能一下将所有数据,导入到目标数据表,耗时太久,且占用资源,所有就用程序批量执行,每次执行一个范围段,比如第一个线程: 1 -1000,第二个线程: 1001-2000,第三个线程:2001-3000这样,当然我这里是为了方便理解,实际这个是要根据自己插入数据的性能,来进行调整的,当前数据库,大概每次插入28w数据,耗时在2-5秒,所以每次也就是,1-280000,2800001-560000这样子执行。
2.我这里插入的目标数据表是有索引的,因为业务需要,如果不加索引,插入速度会更好,但是,查询的话,影响很大,看你的需求,如果不是需要查询的表,可以不加。这里加上的原因就是,如果等数据导入进去,再添加索引,会锁表,并且耗时很久也没执行成功。
public class Test{
public static void main(String[] args) {
//数据库中总记录数
long rows = 2177847407L;
//核心线程数
int kThreads = Runtime.getRuntime().availableProcessors() * 2;
//任务数(这里需要求一下平均每个任务需要执行的任务id大小是多少,实际测试中,20-30w 快则 2秒,慢则3-5秒,这个阈值是比较理想的 也就是 总记录数/任意数 等到想要的平均任务数)
Long talks = (rows / 8000) + 1;
//线程池
ExecutorService threadPool = Executors.newFixedThreadPool(kThreads);
//数据起始位
Long startIndex = 0L;
//数据结束位
Long endIndex = startIndex+talks;
//存放线程执行结果
List<Future> list = new Vector<>();
//执行多少次任务 = 结束位置不小于总记录数
for (Long i = 0l; i < rows; i = endIndex) {
final Future submit = threadPool.submit(new DataThread(startIndex, endIndex, new FaultRealMapper()));
//每次执行完成,开始id+1
startIndex = endIndex+1;
//结束id=开始id+任务数
endIndex = startIndex+talks;
list.add(submit);
}
//打印结果
list.forEach(dx -> {
try {
System.out.println(dx.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
}
//线程操作资源类
class DataThread implements Callable {
private FaultRealMapper faultRealMapper;
private Long startId, endId;
public DataThread(Long startId, Long endId, FaultRealMapper faultRealMapper) {
this.faultRealMapper = faultRealMapper;
this.startId = startId;
this.endId = endId;
System.out.println("startId:" + startId + ",endId:" + endId);
}
@Override
public Object call() throws Exception {
System.out.println("开始执行");
faultRealMapper.insert(startId, endId);
return "执行成功";
}
}
}
Sql:
<insert id="insert">
Insert INTO `newTable` ( `car_id`, `insert_time`)
SELECT `car_id`, `insert_time`
ROM `oldTable`
WHERE id<=#{endId} and id>=#{startId}
</insert>
Mapper.class
package com.test.mapper;
@Mapper
public interface FaultRealMapper {
//插入
void insert(Long startId, Long endId);
}
本期文章就到这里了,我是梦辰,可以微信搜一搜「梦辰的架构笔记 」公众号,保证干货满满!!!欢迎大家和我交流。|