JAVA多线程之同步容器&并发容器

2022年11月2日12:55:19

同步容器

在java早期,提供了Vector和HashTable两个同步容器,分别是List和Map的具体实现。
查看其源码,实现线程同步的方法是对每个公共使用synchronize关键字,在方法上实现同步。源码如下:

//vector.add(e)publicsynchronizedbooleanadd(E e){
        modCount++;ensureCapacityHelper(elementCount+1);
        elementData[elementCount++]= e;returntrue;}
//hashTable.get(key)publicsynchronized Vget(Object key){
        Entry<?,?> tab[]= table;int hash= key.hashCode();int index=(hash&0x7FFFFFFF)% tab.length;for(Entry<?,?> e= tab[index]; e!= null; e= e.next){if((e.hash== hash)&& e.key.equals(key)){return(V)e.value;}}return null;}

通过源代码可看出,这两个同步容器的实现非常简单,仅仅只是在执行方法上加上同步,那么其实这样做会产生一些问题。一是在一些符合操作的时候,并不是线程安全,例如如下代码:

publicvoidm1(){int index=vector.size()-1;
        vector.remove(index);}

在上段代码中,vector.size()和vector.add(e)都是同步方法,但是在执行m1()方法时,仍然会线程不安全,解决方法任然是在m1方法上加上同步,如下:

publicsynchronizedvoidm1(){int index=vector.size()-1;
        vector.remove(index);}

二是这种在公共方法加synchronized的方法,使得每个线程一个一个进入临界区,其实也就是相当于对普通的list或者map手动加synchronized或者lock,线程串行运行,降低并发性,效率低下。


并发容器

上面说同步容器存在两个问题,那么使用并发容器可以有效地提高并发性。
并发容器跟同步容器比具有以下特点:

  1. 有针对使用情境的设计,减少锁的使用。比如CopyOnWrite…,读并不加锁,写时复制。
  2. 定义一些线程安全的符合操作。
  3. 在迭代时,可以不加synchronized。但是会产生数据脏读。

并发容器是juc包中提供的一系列容器类,里面包括了BlockingQueue(BlockingDeque是双向队列,姑且算是一类)和ConcurrentMap两个接口及其实现类,和一些CopyOnWriteXxx与ConcurrentXxx类

BlockingQueue
BlockingQueue主要有ArrayBlockingQueue,LinkedBlocking,SynchronousQueue三种实现
以ArrayBlockingQueue为例,看下具体使用和实现
使用示例:

/**
 * 阻塞队列
 * 示例:生产消费问题
 */publicclassT08_BlockingQueue{publicstaticvoidmain(String[] args){
        BlockingQueue<Object> queue=newArrayBlockingQueue<>(10);
        Produce p=newProduce(queue);
        Custom c=newCustom(queue);newThread(p).start();newThread(c).start();}}classProduceimplementsRunnable{

    BlockingQueue<Object> queue;Produce(BlockingQueue<Object> queue){this.queue=queue;}@Overridepublicvoidrun(){while(true){try{this.queue.put(product());}catch(InterruptedException e){
                e.printStackTrace();}}}
    Objectproduct(){returnnewObject();}}classCustomimplementsRunnable{
    BlockingQueue<Object> queue;Custom(BlockingQueue<Object> queue){this.queue=queue;}@Overridepublicvoidrun(){while(true){try{this.queue.take();}catch(InterruptedException e){
                e.printStackTrace();}}}}

上述代码实现了生产者和消费者模式,使用了put(e)和take()方法。查看源码,put和take方法源码如下:

publicvoidput(E e)throws InterruptedException{checkNotNull(e);final ReentrantLock lock=this.lock;
        lock.lockInterruptibly();try{while(count== items.length)
                notFull.await();enqueue(e);}finally{
            lock.unlock();}}public Etake()throws InterruptedException{final ReentrantLock lock=this.lock;
        lock.lockInterruptibly();try{while(count==0)
                notEmpty.await();returndequeue();}finally{
            lock.unlock();}}

put和take执行逻辑如下:

  • 加锁
  • 判断当前队列是否已空(已满),若是,则当前线程等待
  • 若不是,添加(移除)元素,唤醒另一个condition线程
  • 释放锁

观察BlockingQueue接口,不单单有put和take方法,还有add(e),offer(e),remove(),poll()等功功能相似的方法,其具体使用区别如下:

抛出异常 返回特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
获取首个元素 element() peek()

BlockingQueue允许多个线程对其进行读写操作,与同步容器比,极大提高并发量。


CopyOnWriteXxx
在juc下。CopyOnWrite写时复制容器是一种常用的并发容器,它通过多线程下读写分离来达到提高并发性能的目的。也就是说在读的时候任何时候,任何线程都可以,无需加锁,但是写的时候需要加锁,且构造一个容器的Copy,在这个备份中进行操作,完成后将容器替换为新容器即可。
CopyOnWriteXxx类有两个,CopyOnWriteArrayList和CopyOnWriteListSet,以CopyOnWriteArrayList为例进行分析。
add(e)和get(index)源码如下:

publicbooleanadd(E e){final ReentrantLock lock=this.lock;//获取独占锁
        lock.lock();try{
            Object[] elements=getArray();int len= elements.length;//复制一份copy
            Object[] newElements= Arrays.copyOf(elements, len+1);//插入数据
            newElements[len]= e;//更新为容器setArray(newElements);returntrue;}finally{//释放锁
            lock.unlock();}}public Eget(int index){//直接读取index位置上的数据returnget(getArray(), index);}

从源码可以看出,读取时(get)并没有对方法进行加锁,也就是说,任意线程都能够并发的访问数组数据,而在修改时(add),却要获得独占锁,也就是说同一时间只能有一个线程可以修改,而copy一份数组作为当前操作的对象,避免和读线程发生冲突。
从以上分析推出,CopyOnWrite要比同步容器的并发效率高很多,然而即便如此,仍然有其缺点和局限性:

  • 调用add时,需要开辟一段空间保存原数组备份,修改完后将指针指向当前备份数组,原数组会被GC。所以当高并发修改时,则会造成频繁的开辟空间和频繁GC,对性能有一点影响,这种情况CopyOnWrite并不适用;
  • 由于修改时是用的原数组的备份,则若此时有线程正在读取数据时,新的数组还没有更新,此时新修改的数据就不会被读取,因此,在对数据有较强一致性要求的情况下也不适用。

ConcurrentMap
在juc中,ConcurrentMap是提供线程安全性和原子性保证的Map。
ConcurrentMap的实现类有两个,ConcurrentHashMap和ConcurrentSkipListMap,以ConcurrentHashMap为例进行分析,ConcurrentHashMap可以理解为HashMap的并发版本,它的底层实现仍然使用的是与HashMap一般的数组+链表+红黑树的实现数据结构,只是在操作上增加了并发控制。
与CopyOnWrite的锁机制不同,ConcurrentMap的锁的颗粒度更小,使用的是一种叫分段锁的东西。这种锁机制能够使得任意读取线程都能并发访问容器,同时且可以允许一定量的写线程并发修改容器。
所谓的分段锁,就是对容器内的数据分成若干段,对每一段数据分别进行加锁。毕竟多个线程修改的数据很有可能不在一块,当修改一块数据的时候却把所有数据锁住是不合适的。所以使用分段锁时,当多个线程分别修改不同段的数据时并不会造成冲突;
下面以put()方法源码进行分析:

final VputVal(K key, V value,boolean onlyIfAbsent){if(key== null|| value== null)thrownewNullPointerException();//计算桶的hash值,确定桶的位置int hash=spread(key.hashCode());int binCount=0;for(Node<K,V>[] tab= table;;){
            Node<K,V> f;int n, i, fh;if(tab== null||(n= tab.length)==0)//
                tab=initTable();elseif((f=tabAt(tab, i=(n-1)& hash))== null){//若当前桶还么有元素,则cas式添加元素if(casTabAt(tab, i, null,newNode<K,V>(hash, key, value, null)))break;// no lock when adding to empty bin}//扩容时,帮助扩容elseif((fh= f.hash)== MOVED)
                tab=helpTransfer(tab, f);else{
                V oldVal= null;//hash冲突时锁住当前需要添加节点的头元素synchronized(f){if(tabAt(tab, i)== f){if(fh>=0){
                            binCount=1;for(Node<K,V> e= f;;++binCount){
                                K ek;if(e.hash== hash&&((ek= e.key)== key||(ek!= null&& key.equals(ek)))){
                                    oldVal= e.val;if(!onlyIfAbsent)
                                        e.val= value;break;}
                                Node<K,V> pred= e;if((e= e.next)== null){
                                    pred.next=newNode<K,V>(hash, key,
                                                              value, null);break;}}}elseif(finstanceofTreeBin){
                            Node<K,V> p;
                            binCount=2;if((p=((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value))!= null){
                                oldVal= p.val;if(!onlyIfAbsent)
                                    p.val= value;}}}}if(binCount!=0){if(binCount>= TREEIFY_THRESHOLD)treeifyBin(tab, i);if(oldVal!= null)return oldVal;break;}}}addCount(1L, binCount);return null;}

put的执行逻辑如下:

  1. 计算当前key的hash值,确定桶的位置
  2. 若此时桶为空,则使用CAS操作插入新节点
  3. 若此时正在扩容,则协助扩容。
  4. 在发生hash冲突时仅仅只锁住当前需要添加节点的头元素即可,其他桶节点都不需要加锁,大大减小了锁粒度。
  5. 添加节点,并调整至合适的结构(当前若是链表且长度过长时转为红黑树)
    ConcurrentHashMap使用的是CAS+synchronized实现并发控制,分段锁是针对每个桶加锁,也就是说访问不同桶中的数据多个线程可以并发访问。

总结

同步容器和并发容器都是java提供的在多线程环境下使用线程安全的容器,通过各种并发控制机制,都有其使用领域和局限性,使用的时候还是需要考虑考虑。

  • 作者:十万大山深处
  • 原文链接:https://blog.csdn.net/shanhaikeping/article/details/109223358
    更新时间:2022年11月2日12:55:19 ,共 5583 字。