Java并发 同步容器与并发容器

2022-09-14 12:35:59

  一、同步容器

  在Java的集合容器框架中,主要有四大类别:List、Set、Queue、Map。List、Set、Queue接口分别继承了Collection接口,Map本身是一个接口。
  注意Collection和Map是一个顶层接口,而List、Set、Queue则继承了Collection接口,分别代表数组、集合和队列这三大类容器。像ArrayList、LinkedList都是实现了List接口,HashSet实现了Set接口,而Deque(双向队列,允许在队首、队尾进行入队和出队操作)继承了Queue接口,PriorityQueue实现了Queue接口。另外LinkedList(实际上是双向链表)实现了了Deque接口。像ArrayList、LinkedList、HashMap这些容器都是非线程安全的。如果有多个线程并发地访问这些容器时,就会出现问题。因此,在编写程序时,必须要求程序员手动地在任何访问到这些容器的地方进行同步处理,这样导致在使用这些容器的时候非常地不方便。所以,Java提供了同步容器供用户使用。

  在Java中,同步容器主要包括2类:

  1)Vector、Stack、HashTable

  2)Collections类中提供的静态工厂方法创建的类
  Vector实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施。
  Stack也是一个同步容器,它的方法也用synchronized进行了同步,它实际上是继承于Vector类。
  HashTable实现了Map接口,它和HashMap很相似,但是HashTable进行了同步处理,而HashMap没有。
  Collections类是一个工具提供类,注意,它和Collection不同,Collection是一个顶层的接口。在Collections类中提供了大量的方法,比如对集合或者容器进行排序、查找等操作。最重要的是,在它里面提供了几个静态工厂方法来创建同步容器类,如下图所示:


  从同步容器的具体实现源码可知,同步容器中的方法采用了synchronized进行了同步,那么很显然,这必然会影响到执行性能,另外,同步容器就一定是真正地完全线程安全吗?不一定,这个在下面会讲到。
  我们首先来看一下传统的非同步容器和同步容器的性能差异,我们以ArrayList和Vector为例:
1.性能问题
  我们先通过一个例子看一下Vector和ArrayList在插入数据时性能上的差异:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        ArrayList<Integer> list = new ArrayList<Integer>();
        Vector<Integer> vector = new Vector<Integer>();
        long start = System.currentTimeMillis();
        for(int i=0;i<100000;i++)
            list.add(i);
        long end = System.currentTimeMillis();
        System.out.println("ArrayList进行100000次插入操作耗时:"+(end-start)+"ms");
        start = System.currentTimeMillis();
        for(int i=0;i<100000;i++)
            vector.add(i);
        end = System.currentTimeMillis();
        System.out.println("Vector进行100000次插入操作耗时:"+(end-start)+"ms");
    }
}
  这段代码在我机器上跑出来的结果是:


  进行同样多的插入操作,Vector的耗时是ArrayList的两倍。这只是其中的一方面性能问题上的反映。

  另外,由于Vector中的add方法和get方法都进行了同步,因此,在有多个线程进行访问时,如果多个线程都只是进行读取操作,那么每个时刻就只能有一个线程进行读取,其他线程便只能等待,这些线程必须竞争同一把锁。因此为了解决同步容器的性能问题,在Java 1.5中提供了并发容器,位于java.util.concurrent目录下,并发容器的相关知识将在下一篇文章中讲述。
  2.同步容器真的是安全的吗?
  也有人认为Vector中的方法都进行了同步处理,那么一定就是线程安全的,事实上这可不一定。看下面这段代码:

public class Test {
    static Vector<Integer> vector = new Vector<Integer>();
    public static void main(String[] args) throws InterruptedException {
        while(true) {
            for(int i=0;i<10;i++)
                vector.add(i);
            Thread thread1 = new Thread(){
                public void run() {
                    for(int i=0;i<vector.size();i++)
                        vector.remove(i);
                };
            };
            Thread thread2 = new Thread(){
                public void run() {
                    for(int i=0;i<vector.size();i++)
                        vector.get(i);
                };
            };
            thread1.start();
            thread2.start();
            while(Thread.activeCount()>10)   {
                 
            }
        }
    }
}
  在我机器上运行的结果:

  正如大家所看到的,这段代码报错了:数组下标越界。
  也许有朋友会问:Vector是线程安全的,为什么还会报这个错?很简单,对于Vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:
  当某个线程在某个时刻执行这句时:

for(int i=0;i<vector.size();i++)
    vector.get(i);
  假若此时vector的size方法返回的是10,i的值为9。然后另外一个线程执行了这句:
for(int i=0;i<vector.size();i++)
    vector.remove(i);
  将下标为9的元素删除了。那么通过get方法访问下标为9的元素肯定就会出问题了。
  因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:

public class Test {
    static Vector<Integer> vector = new Vector<Integer>();
    public static void main(String[] args) throws InterruptedException {
        while(true) {
            for(int i=0;i<10;i++)
                vector.add(i);
            Thread thread1 = new Thread(){
                public void run() {
                    synchronized (Test.class) {   //进行额外的同步
                        for(int i=0;i<vector.size();i++)
                            vector.remove(i);
                    }
                };
            };
            Thread thread2 = new Thread(){
                public void run() {
                    synchronized (Test.class) {
                        for(int i=0;i<vector.size();i++)
                            vector.get(i);
                    }
                };
            };
            thread1.start();
            thread2.start();
            while(Thread.activeCount()>10)   {
                 
            }
        }
    }
}
  3. ConcurrentModificationException异常
  在对Vector等容器并发地进行迭代修改时,会报ConcurrentModificationException异常,但是在并发容器中不会出现这个问题。比如:
public class Test {
    public static void main(String[] args)  {
        ArrayList<Integer> list = new ArrayList<Integer>();
        list.add(2);
        Iterator<Integer> iterator = list.iterator();
        while(iterator.hasNext()){
            Integer integer = iterator.next();
            if(integer==2)
                list.remove(integer);
        }
    }
}
  运行结果:

  从异常信息可以发现,异常出现在checkForComodification()方法中。
  我们不忙看checkForComodification()方法的具体实现,我们先根据程序的代码一步一步看ArrayList源码的实现:
首先看ArrayList的iterator()方法的具体实现,查看源码发现在ArrayList的源码中并没有iterator()这个方法,那么很显然这个方法应该是其父类或者实现的接口中的方法,我们在其父类AbstractList中找到了iterator()方法的具体实现,下面是其实现代码:

public Iterator<E> iterator() {
    return new Itr();
}
  从这段代码可以看出返回的是一个指向Itr类型对象的引用,我们接着看Itr的具体实现,在AbstractList类中找到了Itr类的具体实现,它是AbstractList的一个成员内部类,下面这段代码是Itr类的所有实现:
private class Itr implements Iterator<E> {
    int cursor = 0;
    int lastRet = -1;
    int expectedModCount = modCount;
    public boolean hasNext() {
           return cursor != size();
    }
    public E next() {
           checkForComodification();
        try {
        E next = get(cursor);
        lastRet = cursor++;
        return next;
        } catch (IndexOutOfBoundsException e) {
        checkForComodification();
        throw new NoSuchElementException();
        }
    }
    public void remove() {
        if (lastRet == -1)
        throw new IllegalStateException();
           checkForComodification();
 
        try {
        AbstractList.this.remove(lastRet);
        if (lastRet < cursor)
            cursor--;
        lastRet = -1;
        expectedModCount = modCount;
        } catch (IndexOutOfBoundsException e) {
        throw new ConcurrentModificationException();
        }
    }
 
    final void checkForComodification() {
        if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
    }
}
  首先我们看一下它的几个成员变量:
  1)cursor:表示下一个要访问的元素的索引,从next()方法的具体实现就可看出
  2)lastRet:表示上一个访问的元素的索引
  3)expectedModCount:表示对ArrayList修改次数的期望值,它的初始值为modCount。
  4)modCount是AbstractList类中的一个成员变量

protected transient int modCount = 0;
  该值表示对List的修改次数,查看ArrayList的add()和remove()方法就可以发现,每次调用add()方法或者remove()方法就会对modCount进行加1操作。
  好了,到这里我们再看看上面的程序:
  当调用list.iterator()返回一个Iterator之后,通过Iterator的hashNext()方法判断是否还有元素未被访问,我们看一下hasNext()方法,hashNext()方法的实现很简单:

public boolean hasNext() {
    return cursor != size();
}
  如果下一个访问的元素下标不等于ArrayList的大小,就表示有元素需要访问,这个很容易理解,如果下一个访问元素的下标等于ArrayList的大小,则肯定到达末尾了。
  然后通过Iterator的next()方法获取到下标为0的元素,我们看一下next()方法的具体实现:

public E next() {
    checkForComodification();
 try {
    E next = get(cursor);
    lastRet = cursor++;
    return next;
 } catch (IndexOutOfBoundsException e) {
    checkForComodification();
    throw new NoSuchElementException();
 }
}
  这里是非常关键的地方:首先在next()方法中会调用checkForComodification()方法,然后根据cursor的值获取到元素,接着将cursor的值赋给lastRet,并对cursor的值进行加1操作。初始时,cursor为0,lastRet为-1,那么调用一次之后,cursor的值为1,lastRet的值为0。注意此时,modCount为0,expectedModCount也为0。
  接着往下看,程序中判断当前元素的值是否为2,若为2,则调用list.remove()方法来删除该元素。
  我们看一下在ArrayList中的remove()方法做了什么:

public boolean remove(Object o) {
    if (o == null) {
        for (int index = 0; index < size; index++)
            if (elementData[index] == null) {
                fastRemove(index);
                return true;
            }
    } else {
        for (int index = 0; index < size; index++)
            if (o.equals(elementData[index])) {
                fastRemove(index);
                return true;
            }
    }
    return false;
}
 
 
private void fastRemove(int index) {
    modCount++;
    int numMoved = size - index - 1;
    if (numMoved > 0)
        System.arraycopy(elementData, index+1, elementData, index,
                numMoved);
    elementData[--size] = null; // Let gc do its work
}
  通过remove方法删除元素最终是调用的fastRemove()方法,在fastRemove()方法中,首先对modCount进行加1操作(因为对集合修改了一次),然后接下来就是删除元素的操作,最后将size进行减1操作,并将引用置为null以方便垃圾收集器进行回收工作。
  那么注意此时各个变量的值:对于iterator,其expectedModCount为0,cursor的值为1,lastRet的值为0。
对于list,其modCount为1,size为0。
  接着看程序代码,执行完删除操作后,继续while循环,调用hasNext方法()判断,由于此时cursor为1,而size为0,那么返回true,所以继续执行while循环,然后继续调用iterator的next()方法:
  注意,此时要注意next()方法中的第一句:checkForComodification()。
  在checkForComodification方法中进行的操作是:

final void checkForComodification() {
    if (modCount != expectedModCount)
    throw new ConcurrentModificationException();
}
  如果modCount不等于expectedModCount,则抛出ConcurrentModificationException异常。
  很显然,此时modCount为1,而expectedModCount为0,因此程序就抛出了ConcurrentModificationException异常。
  到这里,想必大家应该明白为何上述代码会抛出ConcurrentModificationException异常了。
  关键点就在于:调用list.remove()方法导致modCount和expectedModCount的值不一致。这是因为Iterator和AbstractList的关系。
  注意,像使用for-each进行迭代实际上也会出现这种问题。除非用for语句。
  既然知道原因了,那么如何解决呢?
  其实很简单,细心的朋友可能发现在Itr类中也给出了一个remove()方法:

public void remove() {
    if (lastRet == -1)
    throw new IllegalStateException();
       checkForComodification();
 
    try {
    AbstractList.this.remove(lastRet);
    if (lastRet < cursor)
        cursor--;
    lastRet = -1;
    expectedModCount = modCount;
    } catch (IndexOutOfBoundsException e) {
    throw new ConcurrentModificationException();
    }
}
  在这个方法中,删除元素实际上调用的就是list.remove()方法,但是它多了一个操作:expectedModCount = modCount;
  因此,在迭代器中如果要删除元素的话,需要调用Itr类的remove方法。
  将上述代码改为下面这样就不会报错了:

public class Test {
    public static void main(String[] args)  {
        ArrayList<Integer> list = new ArrayList<Integer>();
        list.add(2);
        Iterator<Integer> iterator = list.iterator();
        while(iterator.hasNext()){
            Integer integer = iterator.next();
            if(integer==2)
                iterator.remove();   //注意这个地方
        }
    }
}
  上面的解决办法在单线程环境下适用,但是在多线程下适用吗?看下面一个例子:
public class Test {
    static ArrayList<Integer> list = new ArrayList<Integer>();
    public static void main(String[] args)  {
        list.add(1);
        list.add(2);
        list.add(3);
        list.add(4);
        list.add(5);
        Thread thread1 = new Thread(){
            public void run() {
                Iterator<Integer> iterator = list.iterator();
                while(iterator.hasNext()){
                    Integer integer = iterator.next();
                    System.out.println(integer);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
        };
        Thread thread2 = new Thread(){
            public void run() {
                Iterator<Integer> iterator = list.iterator();
                while(iterator.hasNext()){
                    Integer integer = iterator.next();
                    if(integer==2)
                        iterator.remove(); 
                }
            };
        };
        thread1.start();
        thread2.start();
    }
}
  运行结果:

  有可能有朋友说ArrayList是非线程安全的容器,换成Vector就没问题了,实际上换成Vector还是会出现这种错误。
原因在于,虽然Vector的方法采用了synchronized进行了同步,但是由于Vector是继承的AbstarctList,因此通过Iterator来访问容器的话,事实上是不需要获取锁就可以访问。那么显然,由于使用iterator对容器进行访问不需要获取锁,在多线程中就会造成当一个线程删除了元素,由于modCount是AbstarctList的成员变量,因此可能会导致在其他线程中modCount和expectedModCount值不等。
  就比如上面的代码中,很显然iterator是线程私有的,初始时,线程1和线程2中的modCount、expectedModCount都为0,
当线程2通过iterator.remove()删除元素时,会修改modCount值为1,并且会修改线程2中的expectedModCount的值为1,
而此时线程1中的expectedModCount值为0,虽然modCount不是volatile变量,但是照样导致线程1中比较expectedModCount和modCount不等,而抛出异常。
  因此一般有2种解决办法:
  1)在使用iterator迭代的时候使用synchronized或者Lock进行同步;
  2)使用并发容器CopyOnWriteArrayList代替ArrayList和Vector。

  二、并发容器

  JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能。因为同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低。因此Java5.0开始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包。与Vector和Hashtable、Collections.synchronizedXxx()同步容器等相比,util.concurrent中引入的并发容器主要解决了两个问题:
  1)根据具体场景进行设计,尽量避免synchronized,提供并发性。
  2)定义了一些并发安全的复合操作,并且保证并发环境下的迭代操作不会出错。
util.concurrent中容器在迭代时,可以不封装在synchronized中,可以保证不抛异常,但是未必每次看到的都是"最新的、当前的"数据。
  下面是对并发容器的简单介绍:
  ConcurrentHashMap代替同步的Map(Collections.synchronized(new HashMap())),众所周知,HashMap是根据散列值分段存储的,同步Map在同步的时候锁住了所有的段,而ConcurrentHashMap加锁的时候根据散列值锁住了散列值锁对应的那段,因此提高了并发性能。ConcurrentHashMap也增加了对常用复合操作的支持,比如"若没有则添加":putIfAbsent(),替换:replace()。这2个操作都是原子操作。
  CopyOnWriteArrayList和CopyOnWriteArraySet分别代替List和Set,主要是在遍历操作为主的情况下来代替同步的List和同步的Set,这也就是上面所述的思路:迭代过程要保证不出错,除了加锁,另外一种方法就是"克隆"容器对象。
  ConcurrentLinkedQuerue是一个先进先出的队列。它是非阻塞队列。
  ConcurrentSkipListMap可以在高效并发中替代SoredMap(例如用Collections.synchronzedMap包装的TreeMap)。
  ConcurrentSkipListSet可以在高效并发中替代SoredSet(例如用Collections.synchronzedSet包装的TreeMap)。
  本节着重讲解2个并发容器:ConcurrentHashMap和CopyOnWriteArrayList

  1、ConcurrentHashMap

  大家都知道HashMap是非线程安全的,Hashtable是线程安全的,但是由于Hashtable是采用synchronized进行同步,相当于所有线程进行读写时都去竞争一把锁,导致效率非常低下。
  ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁。ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类Hash Table的结构,Segment内部维护了一个链表数组,我们用下面这一幅图来看下ConcurrentHashMap的内部结构:


  从上面的结构我们可以了解到,ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部,因此,这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长,但是带来的好处是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment,这样,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(刚好这些写操作都非常平均地分布在所有的Segment上),所以,通过这一种结构,ConcurrentHashMap的并发能力可以大大的提高。

  我们再来具体了解一下Segment的数据结构:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile int count;
    transient int modCount;
    transient int threshold;
    transient volatile HashEntry<K,V>[] table;
    final float loadFactor;
}
  详细解释一下Segment里面的成员变量的意义:
  count:Segment中元素的数量
  modCount:对table的大小造成影响的操作的数量(比如put或者remove操作)
  threshold:阈值,Segment里面元素的数量超过这个值依旧就会对Segment进行扩容
  table:链表数组,数组中的每一个元素代表了一个链表的头部
  loadFactor:负载因子,用于确定threshold

  Segment中的元素是以HashEntry的形式存放在链表数组中的,看一下HashEntry的结构:

static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;
}
  可以看到HashEntry的一个特点,除了value以外,其他的几个变量都是final的,这样做是为了防止链表结构被破坏,出现ConcurrentModification的情况。
  下面我们来结合源代码来具体分析一下ConcurrentHashMap的实现,先看下初始化方法:

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
  
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
  
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    segmentShift = 32 - sshift;
    segmentMask = ssize - 1;
    this.segments = Segment.newArray(ssize);
  
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = 1;
    while (cap < c)
        cap <<= 1;
  
    for (int i = 0; i < this.segments.length; ++i)
        this.segments[i] = new Segment<K,V>(cap, loadFactor);
}
  CurrentHashMap的初始化一共有三个参数,一个initialCapacity,表示初始的容量,一个loadFactor,表示负载参数,最后一个是concurrentLevel,代表ConcurrentHashMap内部的Segment的数量,ConcurrentLevel一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。
  整个ConcurrentHashMap的初始化方法还是非常简单的,先是根据concurrentLevel来new出Segment,这里Segment的数量是不大于concurrentLevel的最大的2的指数,就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操作来进行hash,加快hash的过程。接下来就是根据intialCapacity确定Segment的容量的大小,每一个Segment的容量大小也是2的指数,同样使为了加快hash的过程。
  这边需要特别注意一下两个变量,分别是segmentShift和segmentMask,这两个变量在后面将会起到很大的作用,假设构造函数确定了Segment的数量是2的n次方,那么segmentShift就等于32减去n,而segmentMask就等于2的n次方减一(为什么这么设计,下面会讲到)。

  前面提到过ConcurrentHashMap的get操作是不用加锁的,我们这里看一下其实现:

public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}
  看第三行,segmentFor这个函数用于确定操作应该在哪一个segment中进行,几乎对ConcurrentHashMap的所有操作都需要用到这个函数,我们看下这个函数的实现:
final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}
  这个函数用了位操作来确定Segment,根据传入的hash值向右无符号右移segmentShift位,然后和segmentMask进行与操作,结合我们之前说的segmentShift和segmentMask的值,就可以得出以下结论:假设Segment的数量是2的n次方,根据元素的hash值的高n位就可以确定元素到底在哪一个Segment中。为什么可以这么说?原因在于,我们看回之前segmentShift和segmentMask的设计,segmentShift=32-n,32是不是很眼熟?就是Java中整形的上限呀。举个例子,我们知道Integer类型的hashcode值就是其本身,那么任意值无符号右移了(32-n)位以后,在整形范围之内,是不是就剩下了该值的前n位,而segmentMask=2^n-1,正好是后n位全为1,其余位数全为0,此时一&,不就是原hash值的高n位了吗,很巧妙!

  在确定了需要在哪一个segment中进行操作以后,接下来的事情就是调用对应的Segment的get方法:

V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}
  先看第二行代码,这里对count进行了一次判断,其中count表示Segment中元素的数量,我们可以来看一下count的定义:
transient volatile int count;
  可以看到count是volatile的,实际上这里里面利用了volatile的语义:
  对volatile字段的写入操作happens-before于每一个后续的同一个字段的读操作。
  因为实际上put、remove等操作也会更新count的值,所以当竞争发生的时候,volatile的语义可以保证写操作在读操作之前,也就保证了写操作对后续的读操作都是可见的,这样后面get的后续操作就可以拿到完整的元素内容。
  然后,在第三行,调用了getFirst()来取得链表的头部:

HashEntry<K,V> getFirst(int hash) {
    HashEntry<K,V>[] tab = table;
    return tab[hash & (tab.length - 1)];
}
  同样,这里也是用位操作来确定链表的头部,hash值和HashTable的长度减一做与操作,最后的结果就是hash值的低n位,其中n是HashTable的长度以2为底的结果。
  在确定了链表的头部以后,就可以对整个链表进行遍历,看第4行,取出key对应的value的值,如果拿出的value的值是null,则可能这个key,value对正在put的过程中,如果出现这种情况,那么就加锁来保证取出的value是完整的,如果不是null,则直接返回value。

  看完了get操作,再看下put操作,put操作的前面也是确定Segment的过程,这里不再赘述,直接看关键的segment的put方法:

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock();
    try {
        int c = count;
        if (c++ > threshold) // ensure capacity
            rehash();
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;
  
        V oldValue;
        if (e != null) {
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        }
        else {
            oldValue = null;
            ++modCount;
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        unlock();
    }
}
  首先对Segment的put操作是加锁完成的,然后在第五行,如果Segment中元素的数量超过了阈值(由构造函数中的loadFactor算出)这需要进行对Segment扩容,并且要进行rehash,关于rehash的过程大家可以自己去了解,这里不详细讲了。
  第8和第9行的操作就是getFirst的过程,确定链表头部的位置。
  第11行这里的这个while循环是在链表中寻找和要put的元素相同key的元素,如果找到,就直接更新更新key的value,如果没有找到,则进入21行这里,生成一个新的HashEntry并且把它加到整个Segment的头部,然后再更新count的值。
  Remove操作的前面一部分和前面的get和put操作一样,都是定位Segment的过程,然后再调用Segment的remove方法:

V remove(Object key, int hash, Object value) {
    lock();
    try {
        int c = count - 1;
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;
  
        V oldValue = null;
        if (e != null) {
            V v = e.value;
            if (value == null || value.equals(v)) {
                oldValue = v;
                // All entries following removed node can stay
                // in list, but all preceding ones need to be
                // cloned.
                ++modCount;
                HashEntry<K,V> newFirst = e.next;
                for (HashEntry<K,V> p = first; p != e; p = p.next)
                    newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                  newFirst, p.value);
                tab[index] = newFirst;
                count = c; // write-volatile
            }
        }
        return oldValue;
    } finally {
        unlock();
    }
}
  首先remove操作也是确定需要删除的元素的位置,不过这里删除元素的方法不是简单地把待删除元素的前面的一个元素的next指向后面一个就完事了,我们之前已经说过HashEntry中的next是final的,一经赋值以后就不可修改,在定位到待删除元素的位置以后,程序就将待删除元素前面的那一些元素全部复制一遍,然后再一个一个重新接到链表上去,看一下下面这一幅图来了解这个过程:

  假设链表中原来的元素如上图所示,现在要删除元素3,那么删除元素3以后的链表就如下图所示:


  在前面的章节中,我们涉及到的操作都是在单个Segment中进行的,但是ConcurrentHashMap有一些操作是在多个Segment中进行,比如size操作,ConcurrentHashMap的size操作也采用了一种比较巧的方式,来尽量避免对所有的Segment都加锁。
  前面我们提到了一个Segment中的有一个modCount变量,代表的是对Segment中元素的数量造成影响的操作的次数,这个值只增不减,size操作就是遍历了两次Segment,每次记录Segment的modCount值,然后将两次的modCount进行比较,如果相同,则表示期间没有发生过写入操作,就将原先遍历的结果返回,如果不相同,则把这个过程再重复做一次,如果再不相同,则就需要将所有的Segment都锁住,然后一个一个遍历了,具体的实现大家可以看ConcurrentHashMap的源码,这里就不贴了。

  2、CopyOnWriteArrayList

  Copy-On-Write简称COW,是一种用于程序设计中的优化策略。其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容Copy出去形成一个新的内容然后再改,这是一种延时懒惰策略。从JDK1.5开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。CopyOnWrite容器非常有用,可以在非常多的并发场景中使用到。

  CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

  在使用CopyOnWriteArrayList之前,我们先阅读其源码了解下它是如何实现的。以下代码是向CopyOnWriteArrayList中add方法的实现(向CopyOnWriteArrayList里添加元素),可以发现在添加的时候是需要加锁的,否则多线程写的时候会Copy出N个副本出来。

/**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
    }
  读的时候不需要加锁,如果读的时候有多个线程正在向CopyOnWriteArrayList添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的CopyOnWriteArrayList。
public E get(int index) {
    return get(getArray(), index);
}
  CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景,假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单每天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,如果在,则提示不能搜索。实现代码如下:
package com.ifeve.book;
 
import java.util.Map;
 
import com.ifeve.book.forkjoin.CopyOnWriteMap;
 
/**
 * 黑名单服务
 *
 * @author fangtengfei
 *
 */
public class BlackListServiceImpl {
 
    private static CopyOnWriteMap<String, Boolean> blackListMap = new CopyOnWriteMap<String, Boolean>(
            1000);
 
    public static boolean isBlackList(String id) {
        return blackListMap.get(id) == null ? false : true;
    }
 
    public static void addBlackList(String id) {
        blackListMap.put(id, Boolean.TRUE);
    }
 
    /**
     * 批量添加黑名单
     *
     * @param ids
     */
    public static void addBlackList(Map<String,Boolean> ids) {
        blackListMap.putAll(ids);
    }
 
}
  JDK中并没有提供CopyOnWriteMap,我们可以参考CopyOnWriteArrayList来实现一个,基本代码如下:
import java.util.Collection;
import java.util.Map;
import java.util.Set;
 
public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable {
    private volatile Map<K, V> internalMap;
 
    public CopyOnWriteMap() {
        internalMap = new HashMap<K, V>();
    }
 
    public V put(K key, V value) {
 
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            V val = newMap.put(key, value);
            internalMap = newMap;
            return val;
        }
    }
 
    public V get(Object key) {
        return internalMap.get(key);
    }
 
    public void putAll(Map<? extends K, ? extends V> newData) {
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            newMap.putAll(newData);
            internalMap = newMap;
        }
    }
}
  代码很简单,但是使用CopyOnWriteMap需要注意两件事情:
1. 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。
  2. 使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。如使用上面代码里的addBlackList方法。

  CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。
内存占用问题。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。
  针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。
数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。



  • 作者:sjjsh2
  • 原文链接:https://blog.csdn.net/sjjsh2/article/details/53286001
    更新时间:2022-09-14 12:35:59