Java多线程并发容器

2022-09-10 07:59:24

容器主要是为了后面的线程池做铺垫

从Vector到Queue的发展

代码解释:
有一万张车票,分10个窗口卖票(也就是10个线程),看看各种容器在这种场景下会不会超卖以及效率。

总结:
1- ArrayList 没有加锁 线程不安全 超卖
2- Vector size和remove都有加锁 但是他们2个中间没有加锁 会超卖
3- LinkedList 使用了并发容器 并且加了synchronized 可以实现 但是效率不是最高方案
4- 效率最高的queue 多线程的单个元素的时候可以考虑用queue
import java.util.*;import java.util.concurrent.ConcurrentLinkedQueue;/**
 * @program: bike-lease
 * @description: 容器
 * @author: Jack.Fang
 * @date:2021-06-29 下午 1:43
 **/publicclassTestSeller{//1- 没有加锁 线程不安全 超卖static List<String> tickes=newArrayList<>();//2- size和remove都有加锁 但是他们2个中间没有加锁 会超卖static Vector<String> tickes2=newVector<>();//3- 使用了并发容器 并且加了synchronized 可以实现 但是效率不是最高方案static List<String> tickes3=newLinkedList<>();//4- 效率最高的queue 多线程的单个元素的时候可以考虑用queuestatic Queue<String> tickes4=newConcurrentLinkedQueue<>();static{//        for(int i=0;i<10000; i++) tickes.add("票编号"+i);//        for(int i=0;i<10000; i++) tickes2.add("票编号"+i);//        for(int i=0;i<10000; i++) tickes3.add("票编号"+i);for(int i=0;i<10000; i++) tickes4.add("票编号"+i);}publicstaticvoidmain(String[] args){for(int i=0;i<10;i++){newThread(()->{//1- ArrayList 没有加锁 线程不安全 超卖/*while (tickes.size() > 0) {
                    System.out.println("销售了--" + tickes.remove(0));
                }*///2- Vector size和remove都有加锁 但是他们2个中间没有加锁 会超卖/*while (tickes2.size() > 0) {
                    try {
                        TimeUnit.MICROSECONDS.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("销售了--" + tickes2.remove(0));
                }*///3- LinkedList 使用了并发容器 并且加了synchronized 可以实现 但是效率不是最高方案/*while (true){
                    synchronized (tickes3){
                        if(tickes3.size()<=0) break;

                        try {
                            TimeUnit.MICROSECONDS.sleep(10);
                            System.out.println("销售了--" + tickes3.remove(0));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("销售了--" + tickes3.remove(0));
                    }
                }*///4- 效率最高的queue 多线程的单个元素的时候可以考虑用queuewhile(true){synchronized(tickes4){
                        String s= tickes4.poll();if(s==null)break;
                        System.out.println("销售了--"+ s);}}}).start();}}}

concurrentMap、concurrentSkipListMap

concurrentHashMap 用hash表实现的一个高并发容器
由于没有concurrentTreeMap高并发的排序容器,于是就有了concurrentSkipListMap跳表结构来实现排序

copyOnWrite 写时复制

高并发有个经常使用的类copyOnWrite, 有两个copyOnWriteList、copyOnWriteSet。
copyOnWrite的意思叫写时复制

在读比较多 写比较少的情况下用copeOnWrite

blockingQueue 阻塞队列

它提供了一系列方法,我们可以在这些方法的基础之上做到让线程实现自动的阻塞。

这Queue提供了一些友好的接口,第一个就是offer对应原来的那个add,提供了poll取数据,然后提供了peek
拿出来这个数据。offer就是往里头添加,加进去它会给你一个布尔值的返回值,原来的add加不进去会抛异常,
所以一般情况下queue用的最多的offer,
peek概念是取并不是remove掉,poll是取并且remove掉,
而且这几个对于blockingQueue来说也确实是线程安全的一个操作。
import java.util.*;import java.util.concurrent.ConcurrentLinkedQueue;publicstaticvoidmain(String[] args){
        Queue<String> strs=newConcurrentLinkedQueue<>();for(int i=0;i<10;i++){
            strs.offer("a"+i);//add}
        System.out.println(strs);
        System.out.println(strs.size());

        System.out.println(strs.poll());
        System.out.println(strs.size());

        System.out.println(strs.peek());
        System.out.println(strs.size());}

LinkedBlockingQueue 无界队列

用链表实现的BlockingQueue,是一个无界队列。就是它可以一直装到你内存满了为止。

ArrayBlockingQueue 有界队列

你可以指定它一个固定的值10,它容器就是10,那么当你往里面扔容器的时候,一旦他满了这个put方法
就会阻塞住。
然后你可以看看用add方法满了之后他会报异常。offer用返回值来判断到底加没加成功,offer还有另外一个写法
你可以指定一个时间尝试着往里面加一秒,一秒之后如果加不进去它就返回了。

Queue和List的区别到底在哪里,主要就在这里,添加了offer、peek、poll、put、take
这些个对线程友好的或者阻塞,或者等待方法。

DelayQueue 延时队列

他可以实现在时间上的排序,它能实现按照在里面等待的时间来进行排序。它是BlockingQueue的一种也是用于
阻塞的队列,这个阻塞队列装任务的时候要求你必须实现Delayed接口,往后拖延推迟,它需要做一个比较
compareTo,最后这个队列的实现,DelayQueue 就是按照时间进行任务调度。

常用于业务场景:30分钟内订单未支付时,则关闭订单。
import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;/**
 * @program: bike-lease
 * @description: 延时队列
 * @author: Jack.Fang
 * @date:2021-06-29 下午 4:52
 **/publicclassTestDelayQueue{static BlockingQueue<MyTask> tasks=newDelayQueue<>();static Random r=newRandom();staticclassMyTaskimplementsDelayed{

        String name;long runningTime;MyTask(String name,long runningTime){this.name=name;this.runningTime=runningTime;}@OverridepubliclonggetDelay(TimeUnit unit){return unit.convert(runningTime- System.currentTimeMillis(),TimeUnit.MILLISECONDS);}@OverridepublicintcompareTo(Delayed o){if(this.getDelay(TimeUnit.MILLISECONDS)<o.getDelay(TimeUnit.MILLISECONDS))return-1;elseif(this.getDelay(TimeUnit.MILLISECONDS)>o.getDelay(TimeUnit.MILLISECONDS))return1;elsereturn0;}@Overridepublic StringtoString(){return name+" "+ runningTime;}}publicstaticvoidmain(String[] args)throws InterruptedException{long now= System.currentTimeMillis();
        MyTask t1=newMyTask("t1",now+1000);
        MyTask t2=newMyTask("t2",now+2000);
        MyTask t3=newMyTask("t3",now+1500);
        MyTask t4=newMyTask("t4",now+2500);
        MyTask t5=newMyTask("t5",now+500);

        tasks.put(t1);
        tasks.put(t2);
        tasks.put(t3);
        tasks.put(t4);
        tasks.put(t5);

        System.out.println(tasks);for(int i=0;i<5;i++){
            System.out.println(tasks.take());}}}
DelayQueue本质上用的是一个PriorityQueue,PriorityQueue是从AbstractQueue继承的。
PriorityQueue特点是它内部你往里面装的时候并不是按顺序装的,而是内部进行了一个排序,
按照优先级,最小的优先。它内部实现的结构是一个二叉树,这个二叉树可以认为是
堆排序里的那个最小堆值排在最上面。
publicstaticvoidmain(String[] args){

        PriorityQueue<String> q=newPriorityQueue<>();

        q.add("c");
        q.add("e");
        q.add("a");
        q.add("d");
        q.add("z");for(int i=0;i<5;i++){
            System.out.println(q.poll());}}

结果
a
c
d
e
z

SynchronousQueue

它的容量为0,它不是用来装东西的,是专门用来两个线程之间传递内容的,给线程下达任务的,
类似于Exchanger。

这个queue和其他的queue最大的区别就是你不能往里面装东西,只能用来阻塞式的put调用,要求是前面得有人
等着拿这个东西的时候你才可以往里装,但容量为0

SynchronousQueue 看似没有用,但它在线程池里的用处特别大,很多线程取任务,
互相之间进行任务的一个调度的时候用的就是它。
publicstaticvoidmain(String[] args)throws InterruptedException{

        BlockingQueue<String> strings=newSynchronousQueue<>();newThread(()->{try{
                System.out.println(strings.take());}catch(InterruptedException e){
                e.printStackTrace();}}).start();
        strings.put("aaa");//阻塞等待消费者消费//        strings.put("bbb");//        strings.add("aaa");

        System.out.println(strings.size());}

TransferQueue 传递

是上面各种queue的一个组合,他可以给线程来传递任务,与此同时不像是SynchronousQueue 只能传递一个,
TransferQueue做成列表可以传好多个,比较牛逼的是它添加了一个transfer方法,如果我们用put就相当于
一个线程来了往里一装它就走了,transfer就是装完在这等着,阻塞等有人把它取走我这个线程才回去干我自己
的事情。

一般使用场景:是我做了一件事情,我这个事情要求有一个结果,有了这个结果之后我可以继续进行我下面的这个
事情的时候,比方说我付了钱,这个订单我付账完成了,但是我一直要等待这个付账的结果完成才可以给客户反馈。
import java.util.concurrent.LinkedTransferQueue;/**
 * @program: bike-lease
 * @description: 传递队列
 * @author: Jack.Fang
 * @date:2021-06-29 下午 5:53
 **/publicclassTestTransfer{publicstaticvoidmain(String[] args)throws InterruptedException{

        LinkedTransferQueue<String> strings=newLinkedTransferQueue<>();newThread(()->{try{
                System.out.println(strings.take());}catch(InterruptedException e){
                e.printStackTrace();}}).start();
        strings.transfer("aaa");


        strings.put("aaaa");newThread(()->{try{
                System.out.println(strings.take());}catch(InterruptedException e){
                e.printStackTrace();}}).start();}
  • 作者:Jack方
  • 原文链接:https://blog.csdn.net/fangchao2011/article/details/118337634
    更新时间:2022-09-10 07:59:24