java数据迁移程序

2022-09-18 09:57:56

环境:mysql 目标:亿级数据迁移 最终耗时:1-2小时(服务器更佳),建议在晚上或者没人访问的情况下操作

思路:

1.不能一下将所有数据,导入到目标数据表,耗时太久,且占用资源,所有就用程序批量执行,每次执行一个范围段,比如第一个线程: 1 -1000,第二个线程: 1001-2000,第三个线程:2001-3000这样,当然我这里是为了方便理解,实际这个是要根据自己插入数据的性能,来进行调整的,当前数据库,大概每次插入28w数据,耗时在2-5秒,所以每次也就是,1-280000,2800001-560000这样子执行。

2.我这里插入的目标数据表是有索引的,因为业务需要,如果不加索引,插入速度会更好,但是,查询的话,影响很大,看你的需求,如果不是需要查询的表,可以不加。这里加上的原因就是,如果等数据导入进去,再添加索引,会锁表,并且耗时很久也没执行成功。

public class Test{

		
 		 public static void main(String[] args) {
        //数据库中总记录数
        long rows = 2177847407L;

        //核心线程数
        int kThreads = Runtime.getRuntime().availableProcessors() * 2;
        //任务数(这里需要求一下平均每个任务需要执行的任务id大小是多少,实际测试中,20-30w 快则 2秒,慢则3-5秒,这个阈值是比较理想的 也就是 总记录数/任意数 等到想要的平均任务数)
        Long talks = (rows / 8000) + 1;

				//线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(kThreads);

        //数据起始位
        Long startIndex = 0L;
        
        //数据结束位
        Long endIndex = startIndex+talks;

		//存放线程执行结果
        List<Future> list = new Vector<>();
        //执行多少次任务 = 结束位置不小于总记录数 
        for (Long i = 0l; i < rows; i = endIndex) {
            final Future submit = threadPool.submit(new DataThread(startIndex, endIndex, new FaultRealMapper()));
            //每次执行完成,开始id+1
            startIndex = endIndex+1;
            //结束id=开始id+任务数
            endIndex = startIndex+talks;
            list.add(submit);
        }
        //打印结果
        list.forEach(dx -> {
            try {
                System.out.println(dx.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });


    }


		//线程操作资源类
    class DataThread implements Callable {

        private FaultRealMapper faultRealMapper;
        private Long startId, endId;

        public DataThread(Long startId, Long endId, FaultRealMapper faultRealMapper) {
            this.faultRealMapper = faultRealMapper;
            this.startId = startId;
            this.endId = endId;
            System.out.println("startId:" + startId + ",endId:" + endId);
        }

        @Override
        public Object call() throws Exception {
          	System.out.println("开始执行");
         		faultRealMapper.insert(startId, endId);
            return "执行成功";
        }
    }
}

Sql:

<insert id="insert">
  Insert INTO `newTable` ( `car_id`, `insert_time`)
  SELECT `car_id`, `insert_time`
  ROM `oldTable`
  WHERE id&lt;=#{endId} and id>=#{startId} 
 </insert>

Mapper.class

package com.test.mapper;


@Mapper
public interface FaultRealMapper {
  
  	//插入
    void insert(Long startId, Long endId);
}

本期文章就到这里了,我是梦辰,可以微信搜一搜「梦辰的架构笔记 」公众号,保证干货满满!!!欢迎大家和我交流。|

  • 作者:陵越
  • 原文链接:https://blog.csdn.net/weixin_43803688/article/details/119026551
    更新时间:2022-09-18 09:57:56