Java并发——这些工具类你用过吗?

2022-09-23 12:05:25

前言

J.U.C包中提供了一些非常有用的工具类。在合适的场景下使用它们往往能够达到事半功倍的效果。比如Atomic工具类、Exchanger、CountDownLatch、CyclicBarrier、Semaphore这些。

Atomic工具类

Atomic工具类能够实现原子操作数据。从数据类型的角度来看,可以分为:基本数据类型、数组、引用类型、引用类型属性的原子更新操作。它的底层原理其实就是对于Unsafe类和volatile的封装。
像AtomicInteger、AtomicLong等内部源码都是差不多的。值得注意的是像AtomicBoolean/AtomicChar等的实现原理其实是内部将boolean、char、byte、等数据类型向上转型为了int类型,本质上和AtomicInteger差别不大。

Exchanger

Exchanger可以交换线程间的数据,是一个线程间协作工作的工具类。它提供了一个同步点,用于线程间交换数据。Exchanger只能交换2个线程间的数据。理应如此:生活中,不论是交换利益、交换物品,这些行为都是发生在两者之间。
在实际的场景中,Exchanger十分适用于两个任务之间有数据上的相互依赖的场景。比如交易系统中的对账功能。我们以一个实际例子来看一看Exchanger的使用:

publicclassExchangerTest{publicstaticfinal Exchanger<Integer> transExchanger=newExchanger<>();publicstaticvoidmain(String[] args){
        Thread th1=newThread(()->{// 伪代码
            Integer transA=1000;try{
                Integer transFromB= transExchanger.exchange(transA);
                System.out.println("我是系统A:我系统中统计的交易额为:"+ transA+" 我在交易系统中产生的交易额为:"+ transFromB);}catch(InterruptedException e){
                e.printStackTrace();}});

        Thread th2=newThread(()->{// 伪代码
            Integer transB=1001;try{
                Integer transFromA= transExchanger.exchange(transB);
                System.out.println("我是交易系统:系统A统计出的交易额为:"+ transB+" 系统A实际在我这里产生的交易额为:"+ transFromA);}catch(InterruptedException e){
                e.printStackTrace();}});

        th1.start();
        th2.start();}}

执行结果如下:

我是交易系统:系统A统计出的交易额为:1001 系统A实际在我这里产生的交易额为:1000
我是系统A:我系统中统计的交易额为:1000 我在交易系统中产生的交易额为:1001

CountDownLatch

CountDownLatch的作用是用于多线程间同步完成任务。值得注意的是它只能使用一次,原因在于CountDownLatch设置锁资源之后,只提供了countDown()方法来释放锁资源,和await()方法来等待锁资源释放完毕。并没有重置锁资源的功能。如果我们理解AQS源码的话,那么读源码和对于CountDownLatch的使用就再简单不过了。

publicclassCountDownLatchTest{publicstaticfinal CountDownLatch countDownLatch=newCountDownLatch(2);publicstaticvoidmain(String[] args){
        Thread th1=newThread(()->{
            System.out.println("th1 run.");
            countDownLatch.countDown();});

        Thread th2=newThread(()->{try{
                Thread.sleep(1000);}catch(InterruptedException e){
                e.printStackTrace();}
            System.out.println("th2 run.");
            countDownLatch.countDown();});

        Thread th3=newThread(()->{
            System.out.println("th3 waiting until count down 0.");try{
                countDownLatch.await();}catch(InterruptedException e){
                e.printStackTrace();}
            System.out.println("th3 last to run.");});

        th1.start();
        th2.start();
        th3.start();}}

执行结果:

th1 run.
th3 waiting until count down 0.
th2 run.
th3 last to run.

由于CountDownLatch只能使用一次,因此它适用于那些一次性任务。比如一些框架的初始化。

CyclicBarrier

CyclicBarrier通常被翻译为回栏栅。它的功能其实是类似于CountDownLatch。不同之处在于:

  • CyclicBarrier一轮完成之后可以重复使用。
  • CyclicBarrier的构造方法还支持注册一个额外的线程,在唤醒await中的线程之前先执行这个线程。

我们分析一个例子如下:

publicclassCyclicBarrierTest{// 回栏珊,注册一个额外的线程publicstaticfinal CyclicBarrier cyclicBarrier=newCyclicBarrier(4,()->{
        System.out.println("令牌已经为空了,准备唤醒等待中的线程.");});publicstaticvoidmain(String[] args){// 创建一个固定大小的线程池
        ExecutorService executorService= Executors.newFixedThreadPool(4);for(int i=0; i<4; i++){
            executorService.submit(()->{try{
                    String thName= Thread.currentThread().getName();
                    System.out.println(thName+" 执行完毕,等待通知...");
                    cyclicBarrier.await();
                    System.out.println(thName+" 收到通知.");}catch(InterruptedException e){
                    e.printStackTrace();}catch(BrokenBarrierException e){// ignore}});}

        executorService.shutdown();}}

执行结果如下:

执行结果
网络上有很多针对于CyclicBarrier和CountDownLatch的区别的文章,说法总体相近。其实这二者最大的区别就在于前者支持重置资源,而后置不支持。前者提供的功能更丰富。因此前者更加适用于复杂的业务场景。如果Java并发的基础牢固,那么阅读具体的源码也是比较简单的!

Semaphore

Semaphore翻译过来是信号量。它的功能主要是控制并发线程数,特别适用于流量的控制,尤其是那些比较珍贵的公共资源。比如:数据库的连接,IO操作等。
我们举个例子如下:

publicclassSemaphoreTest{publicstaticvoidmain(String[] args){
        Semaphore semaphore=newSemaphore(1);for(int i=0; i<10; i++){
            Thread thread=newThread(()->{try{
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"获取令牌成功");
                    Thread.sleep(100);
                    semaphore.release();}catch(InterruptedException e){
                    e.printStackTrace();}});

            thread.start();}}}

Semaphore提供了acquire()方法来获取令牌,使用release来释放令牌。它也是基于AQS实现的。

总结

本篇文章并没有过多的去解读源码层面。截止到本篇文章,其实不难发现,如果掌握了Java并发的基本思想和底层基础知识后,对于应用层面的解读阅读大多数其实是比较简单的。这也许就是知其所以然的好处罢!

  • 作者:兵临奇点
  • 原文链接:https://blog.csdn.net/sunLeon1993/article/details/107128916
    更新时间:2022-09-23 12:05:25