Java并发编程之 常用并发工具类

2022-10-07 10:36:26

并发工具类

一、CountDownLatch

1. 概念

  • 可以将其理解为一个计数器,创建实例时,构造器中赋予计数器的初始值,实例每调用一次countDown() 计数器的值就减一
  • 实例调用await() ,线程会进入阻塞状态
  • 当计数器的值变成0,会唤醒通过上述方法进入阻塞状态的所有线程,继续向下执行

2. 代码示例

  1. 多个多线程使用一个计数器

    publicclassCountDownLatchTest1{publicstaticvoidmain(String[] args){System.out.println("定义了计数器,初始值为3");finalCountDownLatch countDownLatch=newCountDownLatch(3);newThread(newRunnable(){@Overridepublicvoidrun(){try{System.out.println("启动t2线程,调用await方法,阻塞");
    					countDownLatch.await();System.out.println("计数器的值成为0,t2被唤醒,继续运行");}catch(InterruptedException e){
    					e.printStackTrace();}}},"t2").start();newThread(newRunnable(){@Overridepublicvoidrun(){try{System.out.println("启动t3线程,调用await方法,阻塞");
    					countDownLatch.await();System.out.println("计数器的值成为0,t3被唤醒,继续运行");}catch(InterruptedException e){
    					e.printStackTrace();}}},"t3").start();newThread(newRunnable(){@Overridepublicvoidrun(){try{System.out.println("t1线程启动,连续三次将计数器的值减一");
    					countDownLatch.countDown();//3-1=2
    					countDownLatch.countDown();//2-1=1
    					countDownLatch.countDown();//1-1=0}catch(Exception e){
    					e.printStackTrace();}}},"t1").start();}}

    运行结果:

  2. 也可以定义多个计数器,各个计数器的阻塞和唤醒各不相干

二、CyclicBarrier

1. 概念

  • 设置一个公共屏障点,所有线程在屏障点互相等待,所有线程都准备好后,所有线程同时运行

  • 可以循环使用,如下图

    1 - 5 表示5个线程,所有线程在屏障点互相等待,全部准备好后,同时启动运行

2. 代码实现

  • 创建实例CyclicBarrier barrier = new CyclicBarrier(3); 构造器参数中声明屏障点的线程数
    • 如果等待的线程数少于构造器中设定的初始值,则程序会一直等待下去,无法运行
  • 实例调用barrier.await(); 表示此线程在所有线程未到达屏障点之前将一直阻塞
    • 最后一个线程调用此方法时,所有阻塞的线程都会立即启动运行
  • 实例调用barrier.await(1,TimeUnit.SECONDS); 表示此线程等待了1秒钟,但是还有线程没有到达屏障点,则此线程会自己先运行,抛出TimeoutException异常,导致Barrier被标记为一个已经破坏的Barrier
    • 当其他线程调用await() 时发现屏障点已经被破环,抛出 BrokenBarrierException异常
//所有的子任务都完成时,才执行主任务,这个时候就可以选择使用CyclicBarrierpublicclassCyclicBarrierTest2{publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,BrokenBarrierException{// 需要5个等待线程CyclicBarrier barrier=newCyclicBarrier(5);// 创建4个等待线程 + 1个main线程newThread(newWorker(barrier,"worker1")).start();newThread(newWorker(barrier,"worker2")).start();newThread(newWorker(barrier,"worker3")).start();newThread(newWorker(barrier,"worker4")).start();System.out.println("main线程等待中");
		barrier.await();System.out.println("所有的线程都工作完毕了, main线程继续执行!");}}// 要等待的线程classWorkerimplementsRunnable{privatefinalCyclicBarrier barrier;//线程的名字privatefinalString name;//构造器publicWorker(CyclicBarrier barrier,String name){this.barrier= barrier;this.name= name;}@Overridepublicvoidrun(){try{Thread.sleep(1000);System.out.println(name+"等待中");
			barrier.await();}catch(InterruptedException|BrokenBarrierException e){
			e.printStackTrace();}}}

运行结果:

三、Semaphore

1. 概念

  • Semaphore一个计数信号量,信号量维护了一个许可证集合
  • 通过acquire()release()获取释放访问许可证
  • 线程只有获取了许可证才能执行,如果获取不到(许可证集合中的许可证已全部被别的线程获取)阻塞
  • 线程释放了许可证,其他线程才可以获取

2. 代码实现

  • 创建实例Semaphore semp = new Semaphore(1); 构造器的参数是许可证的数量
  • 获取许可证semp.acquire();
  • 释放许可证semp.release();
publicclassSemaphoreTest1{publicstaticvoidmain(String[] args){// 创建线程池ExecutorService exec=Executors.newCachedThreadPool();// 创建Semaphore实例// 最多同时只能有两个线程获取到许可证finalSemaphore semp=newSemaphore(2);// 创建三个线程获取许可证for(int index=0; index<3; index++){Runnable run=newRunnable(){publicvoidrun(){try{System.out.println(Thread.currentThread().getName()+"尝试获取许可证");// 获取许可证
						semp.acquire();System.out.println(Thread.currentThread().getName()+"成功获取许可证");Thread.sleep(1000);System.out.println(Thread.currentThread().getName()+"释放许可证");// 释放许可证
						semp.release();}catch(InterruptedException e){}}};
			exec.execute(run);}// 退出线程池
		exec.shutdown();}}

运行结果:

3. 公平性

  • 观察上述运行结果可知,1最先尝试获取许可证,但最先成功获取许可证的并不是1,表明默认是非公平
  • 公平性指的是谁最先尝试获取许可证,谁就应该先获取到许可证
  • 强制公平会影响到并发性能,所以除非确实需要它否则不要启用它
  • 创建公平的实例final Semaphore semp = new Semaphore(2, true);

四、Exchanger

1. 概念

  • 此类用于交换两个线程的数据(必须是两个线程)
  • 两个线程必须都到达汇合点才可以交换,若有未到达的,则到达的需要等待

2. 代码实现

  • 创建实例Exchanger<T> exchanger = new Exchanger<>(); 泛型中声明要交换的数据的类型
  • 两个线程同时调用exchanger.exchange(T t) 才可以交换数据,一个先调用则必须等待另一个线程调用
    • 返回值是和另一个线程交换后的数据
publicclassExchangerTest1{publicstaticvoidmain(String[] args){//创建实例,两个线程必须使用同一个交换器Exchanger<String> exchanger=newExchanger<String>();ExchangerRunnable exchangerRunnable1=newExchangerRunnable(exchanger,"A");ExchangerRunnable exchangerRunnable2=newExchangerRunnable(exchanger,"B");newThread(exchangerRunnable1).start();newThread(exchangerRunnable2).start();}}classExchangerRunnableimplementsRunnable{Exchanger<String> exchanger=null;String object=null;//每个线程要交换的数据publicExchangerRunnable(Exchanger<String> exchanger,String object){this.exchanger= exchanger;this.object= object;}@Overridepublicvoidrun(){try{Object previous=this.object;//获取到本线程要交换的数据System.out.println(Thread.currentThread().getName()+"交换前的数据:"+this.object);//保证不同线程到达交汇点的时间不同if("C".equals(previous)){Thread.sleep(1000);System.out.println(Thread.currentThread().getName()+"对数据C的处理耗时1s");}elseif("D".equals(previous)){Thread.sleep(2000);System.out.println(Thread.currentThread().getName()+"对数据D的处理耗时2s");}elseif("A".equals(previous)){Thread.sleep(3000);System.out.println(Thread.currentThread().getName()+"对数据A的处理耗时3s");}elseif("B".equals(previous)){Thread.sleep(4000);System.out.println(Thread.currentThread().getName()+"对数据B的处理耗时4s");}// 两个对象必须在此处汇合,只有一个线程调用change方法是不会进行数据交换的this.object=this.exchanger.exchange(this.object);System.out.println(Thread.currentThread().getName()+" 交换后: "+this.object);}catch(InterruptedException e){
			e.printStackTrace();}}}

运行结果:

五、ReentrantReadWriteLock

1. 概念

  • ReentrantReadWriteLock称为读写锁(写锁 + 悲观读锁),采用读写加锁分离的方式
  • 高并发下读多写少时性能优于ReentrantLock
  • 读读共享,写写互斥,读写互斥
    • 共享指的共享锁,互斥指的是排他锁
  • ReentrantReadWriteLock写锁可重入锁

2. 代码实现

  • 创建实例ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

  • 创建读锁ReadLock readLock = rwLock.readLock();

  • 创建写锁WriteLock writeLock = rwLock.writeLock();

  • 读写锁都是通过调用lock()unlock() 进行加锁解锁

publicclassReadWriteLockTest1{privatefinalReentrantReadWriteLock rwLock=newReentrantReadWriteLock();privatefinalReadLock readLock= rwLock.readLock();privatefinalWriteLock writeLock= rwLock.writeLock();//读publicvoidread(){try{
			readLock.lock();System.out.println("当前线程:"+Thread.currentThread().getName()+"read进入...");Thread.sleep(3000);}catch(Exception e){
			e.printStackTrace();}finally{System.out.println("当前线程:"+Thread.currentThread().getName()+"read退出...");
			readLock.unlock();}}//写publicvoidwrite(){try{
			writeLock.lock();System.out.println("当前线程:"+Thread.currentThread().getName()+"write进入...");Thread.sleep(3000);}catch(Exception e){
			e.printStackTrace();}finally{System.out.println("当前线程:"+Thread.currentThread().getName()+"write退出...");
			writeLock.unlock();}}publicstaticvoidmain(String[] args){finalReadWriteLockTest1 urrw=newReadWriteLockTest1();Thread t1=newThread(newRunnable(){@Overridepublicvoidrun(){
				urrw.read();}},"t1");Thread t2=newThread(newRunnable(){@Overridepublicvoidrun(){
				urrw.read();}},"t2");Thread t3=newThread(newRunnable(){@Overridepublicvoidrun(){
				urrw.write();}},"t3");//读读模式
		t1.start();// Read
		t2.start();// Read}}

运行结果:

t2无需等待t1释放锁才执行

  • 将读读模式修改为读写模式
//读写模式
t1.start();// Read
t3.start();// Write

运行结果:

六、StampedLock

1. 概念

  • StampedLock支持三种锁:写锁、悲观读锁、乐观读锁,相比ReentrantReadWriteLock多了乐观读锁

  • StampedLock中引入了一个stamp(邮戳)的概念,它代表线程获取到锁的版本每次获得锁之后都会得到一个新的long值

  • 写锁是排它锁(独占锁),相同时间只能有一个线程获取写锁,其他线程请求读锁和写锁 都会被阻塞

  • StampedLock的写锁不可重入锁

    • ReentrantLock锁是可重入锁
    • ReentrantReadWriteLock的写锁可重入锁

2. 写锁

  • 创建实例StampedLock lock = new StampedLock();
  • 获取写锁long stamp = lock.writeLock();
  • 释放写锁lock.unlockWrite(stamp);参数需要传递要释放锁的long值
  • 获取带超时时间的写锁lock.tryWriteLock 如果没有获得写锁返回值为0,不阻塞,向下运行

情况一:StampedLock写锁的正常使用

//创建StampedLock对象
  • 作者:Nice2cu_Code
  • 原文链接:https://gaoqize.blog.csdn.net/article/details/115249879
    更新时间:2022-10-07 10:36:26