并发工具类
文章目录
一、CountDownLatch
1. 概念
- 可以将其理解为一个计数器,创建实例时,构造器中赋予计数器的初始值,实例每调用一次
countDown()
计数器的值就减一 - 实例调用
await()
,线程会进入阻塞状态 - 当计数器的值变成0,会唤醒通过上述方法进入阻塞状态的所有线程,继续向下执行
2. 代码示例
多个多线程使用一个计数器
publicclassCountDownLatchTest1{publicstaticvoidmain(String[] args){System.out.println("定义了计数器,初始值为3");finalCountDownLatch countDownLatch=newCountDownLatch(3);newThread(newRunnable(){@Overridepublicvoidrun(){try{System.out.println("启动t2线程,调用await方法,阻塞"); countDownLatch.await();System.out.println("计数器的值成为0,t2被唤醒,继续运行");}catch(InterruptedException e){ e.printStackTrace();}}},"t2").start();newThread(newRunnable(){@Overridepublicvoidrun(){try{System.out.println("启动t3线程,调用await方法,阻塞"); countDownLatch.await();System.out.println("计数器的值成为0,t3被唤醒,继续运行");}catch(InterruptedException e){ e.printStackTrace();}}},"t3").start();newThread(newRunnable(){@Overridepublicvoidrun(){try{System.out.println("t1线程启动,连续三次将计数器的值减一"); countDownLatch.countDown();//3-1=2 countDownLatch.countDown();//2-1=1 countDownLatch.countDown();//1-1=0}catch(Exception e){ e.printStackTrace();}}},"t1").start();}}
运行结果:
也可以定义多个计数器,各个计数器的阻塞和唤醒各不相干
二、CyclicBarrier
1. 概念
设置一个公共屏障点,所有线程在屏障点互相等待,所有线程都准备好后,所有线程同时运行
可以循环使用,如下图
1 - 5 表示5个线程,所有线程在屏障点互相等待,全部准备好后,同时启动运行
2. 代码实现
- 创建实例
CyclicBarrier barrier = new CyclicBarrier(3);
构造器参数中声明屏障点的线程数- 如果等待的线程数少于构造器中设定的初始值,则程序会一直等待下去,无法运行
- 实例调用
barrier.await();
表示此线程在所有线程未到达屏障点之前将一直阻塞- 最后一个线程调用此方法时,所有阻塞的线程都会立即启动运行
- 实例调用
barrier.await(1,TimeUnit.SECONDS);
表示此线程等待了1秒钟,但是还有线程没有到达屏障点,则此线程会自己先运行,抛出TimeoutException异常,导致Barrier被标记为一个已经破坏的Barrier- 当其他线程调用
await()
时发现屏障点已经被破环,抛出 BrokenBarrierException异常
- 当其他线程调用
//所有的子任务都完成时,才执行主任务,这个时候就可以选择使用CyclicBarrierpublicclassCyclicBarrierTest2{publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,BrokenBarrierException{// 需要5个等待线程CyclicBarrier barrier=newCyclicBarrier(5);// 创建4个等待线程 + 1个main线程newThread(newWorker(barrier,"worker1")).start();newThread(newWorker(barrier,"worker2")).start();newThread(newWorker(barrier,"worker3")).start();newThread(newWorker(barrier,"worker4")).start();System.out.println("main线程等待中");
barrier.await();System.out.println("所有的线程都工作完毕了, main线程继续执行!");}}// 要等待的线程classWorkerimplementsRunnable{privatefinalCyclicBarrier barrier;//线程的名字privatefinalString name;//构造器publicWorker(CyclicBarrier barrier,String name){this.barrier= barrier;this.name= name;}@Overridepublicvoidrun(){try{Thread.sleep(1000);System.out.println(name+"等待中");
barrier.await();}catch(InterruptedException|BrokenBarrierException e){
e.printStackTrace();}}}
运行结果:
三、Semaphore
1. 概念
- Semaphore一个计数信号量,信号量维护了一个许可证集合
- 通过
acquire()
和release()
来获取和释放访问许可证 - 线程只有获取了许可证才能执行,如果获取不到(许可证集合中的许可证已全部被别的线程获取)则阻塞
- 线程释放了许可证,其他线程才可以获取
2. 代码实现
- 创建实例
Semaphore semp = new Semaphore(1);
构造器的参数是许可证的数量 - 获取许可证
semp.acquire();
- 释放许可证
semp.release();
publicclassSemaphoreTest1{publicstaticvoidmain(String[] args){// 创建线程池ExecutorService exec=Executors.newCachedThreadPool();// 创建Semaphore实例// 最多同时只能有两个线程获取到许可证finalSemaphore semp=newSemaphore(2);// 创建三个线程获取许可证for(int index=0; index<3; index++){Runnable run=newRunnable(){publicvoidrun(){try{System.out.println(Thread.currentThread().getName()+"尝试获取许可证");// 获取许可证
semp.acquire();System.out.println(Thread.currentThread().getName()+"成功获取许可证");Thread.sleep(1000);System.out.println(Thread.currentThread().getName()+"释放许可证");// 释放许可证
semp.release();}catch(InterruptedException e){}}};
exec.execute(run);}// 退出线程池
exec.shutdown();}}
运行结果:
3. 公平性
- 观察上述运行结果可知,1最先尝试获取许可证,但最先成功获取许可证的并不是1,表明默认是非公平的
- 公平性指的是谁最先尝试获取许可证,谁就应该先获取到许可证
- 强制公平会影响到并发性能,所以除非确实需要它否则不要启用它
- 创建公平的实例
final Semaphore semp = new Semaphore(2, true);
四、Exchanger
1. 概念
- 此类用于交换两个线程的数据(必须是两个线程)
- 两个线程必须都到达汇合点才可以交换,若有未到达的,则到达的需要等待
2. 代码实现
- 创建实例
Exchanger<T> exchanger = new Exchanger<>();
泛型中声明要交换的数据的类型 - 两个线程同时调用
exchanger.exchange(T t)
才可以交换数据,一个先调用则必须等待另一个线程调用- 返回值是和另一个线程交换后的数据
publicclassExchangerTest1{publicstaticvoidmain(String[] args){//创建实例,两个线程必须使用同一个交换器Exchanger<String> exchanger=newExchanger<String>();ExchangerRunnable exchangerRunnable1=newExchangerRunnable(exchanger,"A");ExchangerRunnable exchangerRunnable2=newExchangerRunnable(exchanger,"B");newThread(exchangerRunnable1).start();newThread(exchangerRunnable2).start();}}classExchangerRunnableimplementsRunnable{Exchanger<String> exchanger=null;String object=null;//每个线程要交换的数据publicExchangerRunnable(Exchanger<String> exchanger,String object){this.exchanger= exchanger;this.object= object;}@Overridepublicvoidrun(){try{Object previous=this.object;//获取到本线程要交换的数据System.out.println(Thread.currentThread().getName()+"交换前的数据:"+this.object);//保证不同线程到达交汇点的时间不同if("C".equals(previous)){Thread.sleep(1000);System.out.println(Thread.currentThread().getName()+"对数据C的处理耗时1s");}elseif("D".equals(previous)){Thread.sleep(2000);System.out.println(Thread.currentThread().getName()+"对数据D的处理耗时2s");}elseif("A".equals(previous)){Thread.sleep(3000);System.out.println(Thread.currentThread().getName()+"对数据A的处理耗时3s");}elseif("B".equals(previous)){Thread.sleep(4000);System.out.println(Thread.currentThread().getName()+"对数据B的处理耗时4s");}// 两个对象必须在此处汇合,只有一个线程调用change方法是不会进行数据交换的this.object=this.exchanger.exchange(this.object);System.out.println(Thread.currentThread().getName()+" 交换后: "+this.object);}catch(InterruptedException e){
e.printStackTrace();}}}
运行结果:
五、ReentrantReadWriteLock
1. 概念
- ReentrantReadWriteLock称为读写锁(写锁 + 悲观读锁),采用读写加锁分离的方式
- 高并发下读多写少时性能优于ReentrantLock
- 读读共享,写写互斥,读写互斥
- 共享指的共享锁,互斥指的是排他锁
- ReentrantReadWriteLock写锁是可重入锁
2. 代码实现
创建实例
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
创建读锁
ReadLock readLock = rwLock.readLock();
创建写锁
WriteLock writeLock = rwLock.writeLock();
读写锁都是通过调用
lock()
和unlock()
进行加锁解锁
publicclassReadWriteLockTest1{privatefinalReentrantReadWriteLock rwLock=newReentrantReadWriteLock();privatefinalReadLock readLock= rwLock.readLock();privatefinalWriteLock writeLock= rwLock.writeLock();//读publicvoidread(){try{
readLock.lock();System.out.println("当前线程:"+Thread.currentThread().getName()+"read进入...");Thread.sleep(3000);}catch(Exception e){
e.printStackTrace();}finally{System.out.println("当前线程:"+Thread.currentThread().getName()+"read退出...");
readLock.unlock();}}//写publicvoidwrite(){try{
writeLock.lock();System.out.println("当前线程:"+Thread.currentThread().getName()+"write进入...");Thread.sleep(3000);}catch(Exception e){
e.printStackTrace();}finally{System.out.println("当前线程:"+Thread.currentThread().getName()+"write退出...");
writeLock.unlock();}}publicstaticvoidmain(String[] args){finalReadWriteLockTest1 urrw=newReadWriteLockTest1();Thread t1=newThread(newRunnable(){@Overridepublicvoidrun(){
urrw.read();}},"t1");Thread t2=newThread(newRunnable(){@Overridepublicvoidrun(){
urrw.read();}},"t2");Thread t3=newThread(newRunnable(){@Overridepublicvoidrun(){
urrw.write();}},"t3");//读读模式
t1.start();// Read
t2.start();// Read}}
运行结果:
t2无需等待t1释放锁才执行
- 将读读模式修改为读写模式
//读写模式
t1.start();// Read
t3.start();// Write
运行结果:
六、StampedLock
1. 概念
StampedLock支持三种锁:写锁、悲观读锁、乐观读锁,相比ReentrantReadWriteLock多了乐观读锁
StampedLock中引入了一个stamp(邮戳)的概念,它代表线程获取到锁的版本,每次获得锁之后都会得到一个新的long值
写锁是排它锁(独占锁),相同时间只能有一个线程获取写锁,其他线程请求读锁和写锁 都会被阻塞
StampedLock的写锁是不可重入锁
- ReentrantLock锁是可重入锁
- ReentrantReadWriteLock的写锁是可重入锁
2. 写锁
- 创建实例
StampedLock lock = new StampedLock();
- 获取写锁
long stamp = lock.writeLock();
- 释放写锁
lock.unlockWrite(stamp);
参数需要传递要释放锁的long值 - 获取带超时时间的写锁
lock.tryWriteLock
如果没有获得写锁,返回值为0,不阻塞,向下运行
情况一:StampedLock写锁的正常使用
//创建StampedLock对象