java多线程学习记录--并发工具类

2022-10-07 10:58:19

并发工具类:

CountDownLatch:

await():进入等待状态。
countDown()计数器减一。
CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。
CountDownLatch是在java1.5被引入的,跟它一起被引入的并发工具类还有CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue,它们都存在于java.util.concurrent包下。CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

public void CountDownLatch(int count) {...}

构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。

与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他N 个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。

多线程 join和countDownLatch.await()区别:

调用thread.join() 方法必须等thread 执行完毕,当前线程才能继续往下执行,而CountDownLatch通过计数器提供了更灵活的控制,只要检测到计数器为0当前线程就可以往下执行而不用管相应的thread是否执行完毕。

特有方法: 
public CountDownLatch(int count); //指定计数的次数,只能被设置1次
public void countDown();          //调用此方法则计数减1
public void await() throws InterruptedException   //调用此方法会一直阻塞当前线程,直到计时器的值为0,除非线程被中断。
Public Long getCount();           //得到当前的计数
Public boolean await(long timeout, TimeUnit unit) //调用此方法会一直阻塞当前线程,直到计时器的值为0,除非线程被中断或者计数器超时,返回false代表计数器超时。
From Object Inherited:
Clone、equals、hashCode、notify、notifyALL、wait等。

参考例子:

public class CountDownLatchDemo {  
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
    public static void main(String[] args) throws InterruptedException {  
        CountDownLatch latch=new CountDownLatch(2);//两个工人的协作  
        Worker worker1=new Worker("zhang san", 5000, latch);  
        Worker worker2=new Worker("li si", 8000, latch);  
        worker1.start();//  
        worker2.start();//  
        latch.await();//等待所有工人完成工作  
        System.out.println("all work done at "+sdf.format(new Date()));  
    }  
      
      
    static class Worker extends Thread{  
        String workerName;   
        int workTime;  
        CountDownLatch latch;  
        public Worker(String workerName ,int workTime ,CountDownLatch latch){  
             this.workerName=workerName;  
             this.workTime=workTime;  
             this.latch=latch;  
        }  
        public void run(){  
            System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date()));  
            doWork();//工作了  
            System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date()));  
            latch.countDown();//工人完成工作,计数器减一  
  
        }  
          
        private void doWork(){  
            try {  
                Thread.sleep(workTime);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}

常见面试题:

  • 解释一下CountDownLatch概念?
  • CountDownLatch 和CyclicBarrier的不同之处?
  • 给出一些CountDownLatch使用的例子?
  • CountDownLatch 类中主要的方法?

CyclicBarrier:
在这里插入图片描述

CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier),CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。

如果某线程不是最后一个调用await()方法的线程,则它会一直处于等待状态,除非发生一下情况:

  • 最后一个线程到达,即index==0
  • 某个参与线程等待超时
  • 某个参与线程被中断
  • 调用CyclicBarrier的reset方法,该方法将屏障重置为初始状态。

原理详见:https://blog.csdn.net/qq_38293564/article/details/80558157

提供的方法:

//parties表示屏障拦截的线程数量,当屏障撤销时,先执行barrierAction,然后在释放所有线程
public CyclicBarrier(int parties, Runnable barrierAction)
//barrierAction默认为null
public CyclicBarrier(int parties)

/*
 *当前线程等待直到所有线程都调用了该屏障的await()方法
 *如果当前线程不是将到达的最后一个线程,将会被阻塞。解除阻塞的情况有以下几种
 *    1)最后一个线程调用await()
 *    2)当前线程被中断
    3)其他正在该CyclicBarrier上等待的线程被中断
    4)其他正在该CyclicBarrier上等待的线程超时
    5)其他某个线程调用该CyclicBarrier的reset()方法
 *如果当前线程在进入此方法时已经设置了该线程的中断状态或者在等待时被中断,
 将抛出InterruptedException,并且清除当前线程的已中断状态。
 *如果在线程处于等待状态时barrier被reset()或者在调用await()时 barrier 被损坏,将抛出 BrokenBarrierException 异常。
 *如果任何线程在等待时被中断,则其他所有等待线程都将抛出 BrokenBarrierException 异常,
 并将 barrier 置于损坏状态。 *如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个
 非空的屏障操作(barrierAction),那么在允许其他线程继续运行之前,当前线程将运行该操作。
 如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。
 *
 *返回值为当前线程的索引,0表示当前线程是最后一个到达的线程
 */
public int await() throws InterruptedException, BrokenBarrierException
//在await()的基础上增加超时机制,如果超出指定的等待时间,则抛出 TimeoutException 异常。
如果该时间小于等于零,则此方法根本不会等待。
public int await(long timeout, TimeUnit unit) throws InterruptedException,
 BrokenBarrierException, TimeoutException

//将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个BrokenBarrierException。
public void reset()

例子:比赛时等待所有人准备好再开始

public class CyclicBarrierTest {
 
	public static void main(String[] args) throws IOException, InterruptedException {
		//如果将参数改为4,但是下面只加入了3个选手,这永远等待下去
		//Waits until all parties have invoked await on this barrier. 
		CyclicBarrier barrier = new CyclicBarrier(3);
 
		ExecutorService executor = Executors.newFixedThreadPool(3);
		executor.submit(new Thread(new Runner(barrier, "1号选手")));
		executor.submit(new Thread(new Runner(barrier, "2号选手")));
		executor.submit(new Thread(new Runner(barrier, "3号选手")));
 
		executor.shutdown();
	}
}
 
class Runner implements Runnable {
	// 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)
	private CyclicBarrier barrier;
 
	private String name;
 
	public Runner(CyclicBarrier barrier, String name) {
		super();
		this.barrier = barrier;
		this.name = name;
	}
 
	@Override
	public void run() {
		try {
			Thread.sleep(1000 * (new Random()).nextInt(8));
			System.out.println(name + " 准备好了...");
			// barrier的await方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
			barrier.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
		System.out.println(name + " 起跑!");
	}
}

结果:

3号选手 准备好了...
2号选手 准备好了...
1号选手 准备好了...
1号选手 起跑!
2号选手 起跑!
3号选手 起跑!

CyclicBarrier和CountDownLatch的区别:

看了各种资料和书,大家一致的意见都是CountDownLatch是计数器,只能使用一次,而CyclicBarrier的计数器提供reset功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从jdk作者设计的目的来看,javadoc是这么描述它们的:

CountDownLatch: A synchronization aid that allows one or more threads 
to wait until a set of operations being performed in other threads completes.
CyclicBarrier : A synchronization aid that allows a set of threads to all wait for
 each other to reach a common barrier point.
  • CountDownLatch:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;
  • CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再继续一起执行。

对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行

semaphore:

Semaphore类是一个计数信号量,必须由获取它的线程释放, 通常用于限制可以访问某些资源(物理或逻辑的)线程数目。
一个信号量有且仅有3种操作,且它们全部是原子的:初始化、增加和减少
增加可以为一个进程解除阻塞;
减少可以让一个进程进入阻塞。

信号量维护一个许可集,若有必要,会在获得许可之前阻塞每一个线程:

      //从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。 
          acquireUninterruptibly(int permits){}

每一个release()添加一个许可,从而可能释放一个正在阻塞的获取者。
Semaphore只对可用许可的号码进行计数,并采取相应的行动。

如何获得Semaphore对象?
public Semaphore(int permits,boolean fair)
permits:初始化可用的许可数目。
fair: 若该信号量保证在征用时按FIFO的顺序授予许可,则为true,否则为false;

如何从信号量获得许可?
public void acquire() throws InterruptedException

如何释放一个许可,并返回信号量?
public void release()

代码实例:
20个人去银行存款,但是该银行只有两个办公柜台,有空位则上去存钱,没有空位则只能去排队等待

package com.xhj.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 线程信号量Semaphore的运用
 * 
 * @author XIEHEJUN
 * 
 */
public class SemaphoreThread {
    private int a = 0;

    /**
     * 银行存钱类
     */
    class Bank {
        private int account = 100;

        public int getAccount() {
            return account;
        }

        public void save(int money) {
            account += money;
        }
    }

    /**
     * 线程执行类,每次存10块钱
     */
    class NewThread implements Runnable {
        private Bank bank;
        private Semaphore semaphore;

        public NewThread(Bank bank, Semaphore semaphore) {
            this.bank = bank;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            int b = a++;
            if (semaphore.availablePermits() > 0) {
                System.out.println("线程" + b + "启动,进入银行,有位置立即去存钱");
            } else {
                System.out.println("线程" + b + "启动,进入银行,无位置,去排队等待等待");
            }
            try {
                semaphore.acquire();
                bank.save(10);
                System.out.println(b + "账户余额为:" + bank.getAccount());
                Thread.sleep(1000);
                System.out.println("线程" + b + "存钱完毕,离开银行");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 建立线程,调用内部类,开始存钱
     */
    public void useThread() {
        Bank bank = new Bank();
        // 定义2个新号量
        Semaphore semaphore = new Semaphore(2);
        // 建立一个缓存线程池
        ExecutorService es = Executors.newCachedThreadPool();
        // 建立20个线程
        for (int i = 0; i < 10; i++) {
            // 执行一个线程
            es.submit(new Thread(new NewThread(bank, semaphore)));
        }
        // 关闭线程池
        es.shutdown();

        // 从信号量中获取两个许可,并且在获得许可之前,一直将线程阻塞
        semaphore.acquireUninterruptibly(2);
        System.out.println("到点了,工作人员要吃饭了");
        // 释放两个许可,并将其返回给信号量
        semaphore.release(2);
    }

    public static void main(String[] args) {
        SemaphoreThread test = new SemaphoreThread();
        test.useThread();
    }
}
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    private Semaphore smp = new Semaphore(3,true); //公平策略
    private Random rnd = new Random();
    
    class Task implements Runnable{
        private String id;
        Task(String id){
            this.id = id;
        }
        
        public void run(){
            try {
                smp.acquire();
                //smp.acquire(int permits);//使用有参数方法可以使用permits个许可
                System.out.println("Thread " + id + " is working");
                //System.out.println("在等待的线程数目:"+ smp.getQueueLength());
                work();
                System.out.println("Thread " + id + " is over");
            } catch (InterruptedException e) {
            }
            finally 
            {
                smp.release();
            }
        }
        
        public void work() {//假装在工作,实际在睡觉
            int worktime = rnd.nextInt(1000);
            System.out.println("Thread " + id + " worktime  is "+ worktime);
            try {
                Thread.sleep(worktime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args){
        SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
        ExecutorService se = Executors.newCachedThreadPool();
        se.submit(semaphoreDemo.new Task("a"));
        se.submit(semaphoreDemo.new Task("b"));
        se.submit(semaphoreDemo.new Task("c"));
        se.submit(semaphoreDemo.new Task("d"));
        se.submit(semaphoreDemo.new Task("e"));
        se.submit(semaphoreDemo.new Task("f"));
        se.shutdown();
    }
}

面试题思考
在很多情况下,可能有多个线程需要访问数目很少的资源。假想在服务器上运行着若干个回答客户端请求的线程。这些线程需要连接到同一数据库,但任一时刻
只能获得一定数目的数据库连接。你要怎样才能够有效地将这些固定数目的数据库连接分配给大量的线程?

答:1.给方法加同步锁,保证同一时刻只能有一个人去调用此方法,其他所有线程排队等待,但是此种情况下即使你的数据库链接有10个,也始终只有一个处于使 用状态。这样将会大大的浪费系统资源,而且系统的运行效率非常的低下。

2.另外一种方法当然是使用信号量,通过信号量许可与数据库可用连接数相同的数目,将大大的提高效率和性能。

Exchanger:

用于线程交换数据,提供一个同步点在这个同步点两个线程可以交换彼此的数据,通过exchange方法交换。如果第一个线程先执行exchange方法他会一直等待第二个线程也执行exchange,因此该工具类的线程对象是成对的。

提供的方法:

1     // 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。
2     public V exchange(V x) throws InterruptedException
3     //增加超时机制,超过指定时间,抛TimeoutException异常
4     public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

使用实例:

class FillAndEmpty {
    Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
    DataBuffer initialEmptyBuffer = ... a made-up type
    DataBuffer initialFullBuffer = ...
    
    //填充缓冲区线程
    class FillingLoop implements Runnable {
        public void run() {
            DataBuffer currentBuffer = initialEmptyBuffer;    //空的缓冲区
            try {
                while (currentBuffer != null) {
                    addToBuffer(currentBuffer);    //填充数据
                    //如果缓冲区被数据填满,执行exchange。等待清空缓冲区线程也执行exchange方法。当两个线程都到达同步点,交换数据。
                    if (currentBuffer.isFull())
                        currentBuffer = exchanger.exchange(currentBuffer);    
                }
            } catch (InterruptedException ex) { ... handle ... }
        }
    }
    
    //清空缓冲区线程
    class EmptyingLoop implements Runnable {
        public void run() {
            DataBuffer currentBuffer = initialFullBuffer;    //满的缓冲区
            try {
                while (currentBuffer != null) {
                    takeFromBuffer(currentBuffer);    //清空缓冲区
                    //如果缓冲区被清空,执行exchange。等待填充缓冲区线程也执行exchange方法。当两个线程都到达同步点,交换数据。
                    if (currentBuffer.isEmpty())
                        currentBuffer = exchanger.exchange(currentBuffer);
                }
            } catch (InterruptedException ex) { ... handle ...}
        }
    }

    void start() {
        new Thread(new FillingLoop()).start();
        new Thread(new EmptyingLoop()).start();
    }
}

实现原理参见:https://www.cnblogs.com/zaizhoumo/p/7787169.html

  • 作者:蓝莲花@com
  • 原文链接:https://blog.csdn.net/sdfgtr/article/details/90080737
    更新时间:2022-10-07 10:58:19