CountDownLatch和CyclicBarrier:如何让多线程步调一致

2022年5月29日10:57:34

案例:对账系统的业务是这样的,用户通过在线商城下单,会生成电子订单,保存在订单库;之后物流会生成派送单给用户发货,派送单保存在派送单库。为了防止漏派送或者重复派送,对账系统每天还会校验是否存在异常订单。对账系统的处理逻辑很简单,系统流程图如下。目前对账系统的处理逻辑是首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。

CountDownLatch和CyclicBarrier:如何让多线程步调一致

对上面的代码抽象就是这样的,就是在一个单线程里面循环查询订单、派送单,然后执行对账,最后将写入差异库。

while(存在未对账订单){
// 查询未对账订单
pos=getPOrders();
// 查询派送单
dos=getDOrders();
// 执行对账操作
diff=check(pos,dos);
// 差异写入差异库
save(diff);
}

1)上面的系统现在执行很慢,该怎样优化来执行速度呢?

  • 目前是单线程的,那单线程的话我们就考虑是否可以用多线程来做。查询未对账订单和查询派送单这两个操作是可以并行处理的。

2)实现查询对账订单和查询派送单并行执行的代码应该是怎样的?


while(存在未对账订单){
// 查询未对账订单
ThreadT1=newThread(()->{
pos=getPOrders();
});
T1.start();
// 查询派送单
ThreadT2=newThread(()->{
dos=getDOrders();
});
T2.start();
// 等待T1、T2结束
T1.join();
T2.join();
// 执行对账操作
diff=check(pos,dos);
// 差异写入差异库
save(diff);
}
  • 我们在主线程中开了两个插队的线程,等这两个查询的插队线程执行完了,阻塞的主线程被唤醒,那么就可以执行对账还有写入差异库的操作了。

3)思考一下,我们上面的代码还有没有优化的空间呢?

  • 我们每次进行新的查询的对账的时候,都要创建两个新的线程出来,我们知道创建线程是比较好费时间的。那思考一下可不可以用线程池来减少创建线程的开销呢。


    // 创建2个线程的线程池
    Executorexecutor=
    Executors.newFixedThreadPool(2);
    while(存在未对账订单){
    // 查询未对账订单
    executor.execute(()-> {
    pos=getPOrders();
    });
    // 查询派送单
    executor.execute(()-> {
    dos=getDOrders();
    });

    /* ??如何实现等待??*/

    // 执行对账操作
    diff=check(pos,dos);
    // 差异写入差异库
    save(diff);
    }

4)使用上面线程池的代码的话,我的join就不能调用了,那我的主线程就不知道什么时候两个查询操作执行完了,这个时候该怎么办?

  • 我们可以使用一个计数器,初始值呢设置为2,查询一次就减1,当两个查询执行完,那计数器就是0了,我们的主线程也就能被唤醒执行了。

5)上面的方案是我们自己想出来的,那java其实提供了一个非常方便的实现我们上面方案的工具类CountDownLatch,那使用CountDownLatch怎样优化我们的代码呢?

  • 主线程的话我们调用await方法来阻塞,两个查询线程我们执行countDown方法,会减1。主线程当检测到为0时就可以执行了


// 创建2个线程的线程池
Executorexecutor=
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为2
CountDownLatchlatch=
newCountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos=getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos=getDOrders();
latch.countDown();
});

// 等待两个查询操作结束
latch.await();

// 执行对账操作
diff=check(pos,dos);
// 差异写入差异库
save(diff);
}

6)上面使用CountDownLatch和线程池的方案已经很不错了,在思考一下,我们的这个程序还能不能优化一下?

  • 我的对账操作和下一次查询操作其实是不影响的,那么他们之间是可以并发执行的。也就是我在进行本次对账的同时,是可以执行下一次的查询操作的。

7)对账需要查询出数据来才可以执行,这种的话对应什么模型?

  • 生产者-消费者模型

8)既然看出来了是生产者消费者模型,那就需要一个队列,生产者生产出来东西放到队列,消费者去队列当中取。但是针对我们上面的案例,一个队列的话肯会造成数据混乱,我们应该怎样设计?

  • 使用两个队列,两个队列间的元素还有对应关系。订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间是有一一对应的关系的。我们的对账操作每次从两个队列当中各取一个,这样数据肯定不会发生混乱。

9)我们如何用代码来实现查询和对账之间的并行呢?

  • 使用三个线程,一个线程 T1 执行订单的查询工作,一个线程 T2 执行派送单的查询工作,当线程 T1 和 T2 都各自生产完 1 条数据的时候,通知线程 T3 执行对账操作。这个想法虽看上去简单,但其实还隐藏着一个条件,那就是线程 T1 和线程 T2 的工作要步调一致,不能一个跑得太快,一个跑得太慢,只有这样才能做到各自生产完 1 条数据的时候,通知线程 T3。

10)我们上面的方案有哪些要解决的问题?怎样解决?

  • T1和T2要走的齐

  • 他们执行完之后要能通知到T3

  • 解决这两个问题的方案也很简单,还是搞一个计数器,初始化为2,T1执行完减1,T2执行完减1。当计数器值为0时,T3就可以执行了,T3执行的时候把我们计数器又重置为2,此时T1,T2又可以执行了。

11)实际项目中java其实给了我们现成的实现上面方案的工具类CyclicBarrier,代码实现的怎样的?

  • CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值,所以他要带个循环


// 订单队列
Vector<P>pos;
// 派送单队列
Vector<D>dos;
// 执行回调的线程池
Executorexecutor=
Executors.newFixedThreadPool(1);
finalCyclicBarrierbarrier=
newCyclicBarrier(2, ()->{
executor.execute(()->check());
});

voidcheck(){
Pp=pos.remove(0);
Dd=dos.remove(0);
// 执行对账操作
diff=check(p,d);
// 差异写入差异库
save(diff);
}

voidcheckAll(){
// 循环查询订单库
ThreadT1=newThread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
  }
});
T1.start();
// 循环查询运单库
ThreadT2=newThread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
  }
});
T2.start();
}

  • 作者:Love&Share
  • 原文链接:https://www.cnblogs.com/YXBLOGXYY/p/16079520.html
    更新时间:2022年5月29日10:57:34 ,共 3001 字。