Java多线程批量数据导入的方法

2022-09-23 14:35:26

前言:

当遇到大量数据导入时,为了提高处理的速度,可以选择使用多线程来批量处理这些处理。常见的场景有:

  • 大文件导入数据库(这个文件不一定是标准的CSV可导入文件或者需要在内存中经过一定的处理)
  • 数据同步(从第三方接口拉取数据处理后写入自己的数据库)

以上的场景有一个共性,这类数据导入的场景简单来说就是将数据从一个数据源移动到另外一个数据源,而其中必定可以分为两步

  • 数据读取:从数据源读取数据到内存
  • 数据写入:将内存中的数据写入到另外一个数据源,可能存在数据处理

而且根据读取的速度一般会比数据写入的速度快很多,即读取快,写入慢。

设计思路

由于场景的特点是读取快,写入慢,如果是使用多线程处理,建议是数据写入部分改造为多线程。而数据读取可以改造成批量读取数据。简单来说就是两个要点:

  • 批量读取数据
  • 多线程写入数据

示例

多线程批量处理最简单的方案是使用线程池来进行处理,下面会通过一个模拟批量读取和写入的服务,以及对这个服务的多线程写入调用作为示例,展示如何多线程批量数据导入

  1. 模拟服务
importjava.util.concurrent.atomic.AtomicLong;/**
* 数据批量写入用的模拟服务
*
* @author RJH
* create at 2019-04-01
*/publicclassMockService{/**
* 可读取总数
*/privatelong canReadTotal;/**
* 写入总数
*/privateAtomicLong writeTotal=newAtomicLong(0);/**
* 写入休眠时间(单位:毫秒)
*/privatefinallong sleepTime;/**
* 构造方法
*
* @param canReadTotal
* @param sleepTime
*/publicMockService(long canReadTotal,long sleepTime){this.canReadTotal= canReadTotal;this.sleepTime= sleepTime;}/**
* 批量读取数据接口
*
* @param num
* @return
*/publicsynchronizedlongreadData(int num){long readNum;if(canReadTotal>= num){
canReadTotal-= num;
readNum= num;}else{
readNum= canReadTotal;
canReadTotal=0;}//System.out.println("read data size:" + readNum);return readNum;}/**
* 写入数据接口
*/publicvoidwriteData(){try{// 休眠一定时间模拟写入速度慢Thread.sleep(sleepTime);}catch(InterruptedException e){
e.printStackTrace();}// 写入总数自增System.out.println("thread:"+Thread.currentThread()+" write data:"+ writeTotal.incrementAndGet());}/**
* 获取写入的总数
*
* @return
*/publiclonggetWriteTotal(){return writeTotal.get();}}
  1. 批量数据处理器
importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;/**
* 基于线程池的多线程批量写入处理器
* @author RJH
* create at 2019-04-01
*/publicclassSimpleBatchHandler{privateExecutorService executorService;privateMockService service;/**
* 每次批量读取的数据量
*/privateint batch;/**
* 线程个数
*/privateint threadNum;publicSimpleBatchHandler(MockService service,int batch,int threadNum){this.service= service;this.batch= batch;//使用固定数目的线程池this.executorService=Executors.newFixedThreadPool(threadNum);}/**
* 开始处理
*/publicvoidstartHandle(){// 开始处理的时间long startTime=System.currentTimeMillis();System.out.println("start handle time:"+ startTime);long readData;while((readData= service.readData(batch))!=0){// 批量读取数据,知道读取不到数据才停止for(long i=0; i< readData; i++){
executorService.execute(()-> service.writeData());}}// 关闭线程池
executorService.shutdown();while(!executorService.isTerminated()){//等待线程池中的线程执行完}// 结束时间long endTime=System.currentTimeMillis();System.out.println("end handle time:"+ endTime);// 总耗时System.out.println("total handle time:"+(endTime- startTime)+"ms");// 写入总数System.out.println("total write num:"+ service.getWriteTotal());}}
  1. 测试类
/**
* SimpleBatchHandler的测试类
*/publicclassSimpleBatchHandlerTest{publicstaticvoidmain(String[] args){// 总数long total=100000;// 休眠时间long sleepTime=100;// 每次拉取的数量int batch=100;// 线程个数int threadNum=16;MockService mockService=newMockService(total,sleepTime);SimpleBatchHandler handler=newSimpleBatchHandler(mockService,batch,threadNum);
handler.startHandle();}}
  1. 运行结果
start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2...省略部分输出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000
  • 作者:weixin_45086773
  • 原文链接:https://blog.csdn.net/weixin_45086773/article/details/124918604
    更新时间:2022-09-23 14:35:26