Java线程并发常用工具类使用

2022-10-28 09:28:21

这次整理了一些比较常用的线程工具类啦。

CountDownLatch:在一组线程执行完后,才能开始执行调用等待的线程。上片文章提到过junit的测试尽量不要测试线程,如果硬是要可以使用CountDownLatch进行测试

CyclicBarrier:在一组线程中调用等待方法后,只有这组所有线程都进入等待后,会执行一个指定的线程,在指定的线程执行完后,这组等待的线程才能继续执行。

Semaphore:可用于限流使用。

Exchanger:当两组线程都执行到交换的方法时,能将数据在这两个线程之间进行数据交换。

CountDownLatch

该类实现主要是由一个内部类Sync实现的,Sync继承了AbstractQueuedSynchronizer(就是经常提到的AQS),

常用的方法有两个:

1.await():线程调用该方法进入带阻塞状态,只有当调用countDown()并骤减到0的时候,才能继续执行

2.countDown():线程骤减一个单位。

具体实现:

public class CountDownLatchMain {
	
	static CountDownLatch latch = new CountDownLatch(6);
	
	static class InitThread implements Runnable{

		public void run() {
			try {
				TimeUnit.MILLISECONDS.sleep(200L);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			System.out.println("Thread name:"+Thread.currentThread().getName()+" init ...");
			latch.countDown();
		}
		
	}
	
	static class BusinessThread implements Runnable{

		public void run() {
			try {
				latch.await();
			} catch (InterruptedException e1) {
				e1.printStackTrace();
			}
			for(int i=0;i<3;i++) {
				try {
					TimeUnit.MILLISECONDS.sleep(100L);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("Thread name : " + Thread.currentThread().getName()+" work_" + i);
			}
			
		}
		
	}
	
	public static void main(String[] args) throws InterruptedException {
		
		new Thread(new Runnable() {

			public void run() {
				latch.countDown();
				System.out.println("thread name "+Thread.currentThread().getName()+" 1st init ...");
				try {
					TimeUnit.MILLISECONDS.sleep(1000);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				latch.countDown();
				System.out.println("thread name "+Thread.currentThread().getName()+" 2nd init ...");
			}
			
		},"Thread-0").start();
		new Thread(new BusinessThread()).start();

		for(int i=0;i<=4 ;i++) {
			new Thread(new InitThread()).start();
		}
		
		latch.await();
		TimeUnit.MILLISECONDS.sleep(300);
		System.out.println("main end ...");
	}
}

 执行结果:

thread name Thread-0 1st init ...
Thread name:Thread-1 init ...
Thread name:Thread-3 init ...
Thread name:Thread-2 init ...
Thread name:Thread-5 init ...
Thread name:Thread-4 init ...
Thread name : Thread-0 work_0
Thread name : Thread-0 work_1
Thread name : Thread-0 work_2
main end ...
thread name Thread-0 2nd init ...

CyclicBarrier

与CountDownLatch差不多,都是等待线程执行完后,才能继续执行,不过这两个不同的地方就是:CountDownLatch需要手动在逻辑代码中进行骤减,减到临界点后,阻塞的线程会继续执行,而CountDownLatch是一组线程都进入到阻塞状态后,然后执行指定线程执行完后,那组阻塞的线程才能继续执行。 示例:

public class CyclicBarrierMain {
	
	static CyclicBarrier barrier = new CyclicBarrier(5,new Runnable() {

		public void run() {
			System.out.println("Thread name : " + Thread.currentThread().getName() + " barrier start...");
			try {
				TimeUnit.MILLISECONDS.sleep(100L);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			System.out.println("Thread name : " + Thread.currentThread().getName() + " barrier thread ...");
		}
		
	});
	
	static class SubThread implements Runnable{

		public void run() {
			long sleep = (long) (Math.random()*1000);
			System.out.println("thread name : " + Thread.currentThread().getName() + " init ...");
			try {
				TimeUnit.MILLISECONDS.sleep(sleep);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			try {
				System.out.println("thread name : " + Thread.currentThread().getName() + " sleep time:"+sleep);
				barrier.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (BrokenBarrierException e) {
				e.printStackTrace();
			}
			System.out.println("thread name : " + Thread.currentThread().getName() + " end ....");
		}
		
	}
	
	public static void main(String[] args) {
		
		for(int i =0 ; i<5;i++) {
			new Thread(new SubThread()).start();
		}
		
	}
}

执行结果:

thread name : Thread-2 init ...
thread name : Thread-4 init ...
thread name : Thread-1 init ...
thread name : Thread-0 init ...
thread name : Thread-3 init ...
thread name : Thread-2 sleep time:405
thread name : Thread-0 sleep time:488
thread name : Thread-1 sleep time:564
thread name : Thread-4 sleep time:777
thread name : Thread-3 sleep time:860
Thread name : Thread-3 barrier start...
Thread name : Thread-3 barrier thread ...
thread name : Thread-3 end ....
thread name : Thread-2 end ....
thread name : Thread-1 end ....
thread name : Thread-0 end ....
thread name : Thread-4 end ....

Semaphore

主要用于需要做限制的场景,比如限制连接池获取次数等等,也有一个Sync内部类继承了AQS

常用方法:

1、acquire():骤减一个单位,也可调用带参的方法可指定减值,当骤减到0的时候,调用该方法会进入阻塞状态。

2、release():释放一个单位,也会在初始化的数量进行增加。

3、availablePermits():得到可获取单位的数量。

4、getQueueLength():调用了acquire()方法并进入到阻塞状态的总数量。

具体用法:

Semaphore semaphore = new Semaphore(5);
semaphore.acquire();//也可指定减少多个semaphore.acquire(2);
//...第6个acquire()方法时,再次调用将进入等待,直到在某个线程中执行了semaphore.release()方法才会继续执行后面的
//...

Exchanger

主要用于两个线程之间的数据交换,个人觉得这个用处不大,既然看到了这个,也就顺便整理了一下

使用示例:

public class UseExcahnger {
	
	static Exchanger<Set<String>> exchanger = new Exchanger<Set<String>>();
	
	static class ThreadOne extends Thread{
		@Override
		public void run() {
			Set<String> set = new HashSet<String>();
			set.add("1");
			set.add("2");
			
			try {
				System.out.println(Thread.currentThread().getName() + " > set:" + set);
				Thread.sleep(2000L);
				Set<String> exchange = exchanger.exchange(set);
				System.out.println(Thread.currentThread().getName() + " > " + exchange);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	static class ThreadTwo extends Thread{
		@Override
		public void run() {
			Set<String> set = new HashSet<String>();
			set.add("3");
			set.add("4");
			
			try {
				System.out.println(Thread.currentThread().getName() + " > set:" + set);
				Thread.sleep(3000L);
				Set<String> exchange = exchanger.exchange(set);
				System.out.println(Thread.currentThread().getName() + " > " + exchange);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	public static void main(String[] args) {
		new ThreadOne().start();
		new ThreadTwo().start();
	}
}

执行结果为:

Thread-0 > set:[1, 2]
Thread-1 > set:[3, 4]
Thread-1 > [1, 2]
Thread-0 > [3, 4]

Future/FutureTask

这个在之前的提到过,与Callable一起使用,用来做回调的,个人觉得这个与之前的Fork/Join的分而治之有些相似,都是异步同时执行完后将结果返回,然后发现相似之后,回去看了一下源代码

RecursiveActionRecursiveTask<T>都分别继承了Future接口,而FutureTask也继承了Future、Runnable,所以FutureTask既能作为Callable带有返回结果,也能作为Thread去执行它。

这里就介绍一下类中的一些方法,示例的话可以翻看之前的文章

1、isDone():判断线程是否已结束。

2、boolean cancel(boolean mayInterruptIfRunning):参数为true是中断线程,但是只会发送中断信号,在程序中需要自行判断,参数为false则不会中断,返回值为true,如果线程已结束或未开始则返回false。

3、isCancelled():判断线程是否关闭。

4、get():获取线程返回值。

好啦,就先整理这些啦,后面还有一些还在整理,后期会继续分享的呀,如果有问题烦请各路大佬指出

  • 作者:IT_小白鼠
  • 原文链接:https://blog.csdn.net/qq_40874285/article/details/115406795
    更新时间:2022-10-28 09:28:21