百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

万字解析ConcurrentHashMap源码 大写万字有亻吗

lipiwang 2024-11-06 19:40 7 浏览 0 评论

ConcurrentHashMap

在多线程环境下,使用HashMap进行put操作时存在丢失数据的情况,为了避免这种bug的隐患,强烈建议使用ConcurrentHashMap代替HashMap

1. ConcurrentHashMap的结构

和HashMap一样是一个散列链表,数据存储在hash桶中。

2. 源码解析

2.1 属性及构造

属性

private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
private static final float LOAD_FACTOR = 0.75f;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;
static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
static final int NCPU = Runtime.getRuntime().availableProcessors();
// volatile修饰的table,表示table数组在内存中对所有线程都及时可见,如果一个线程修改了table数组的值,其他线程中如果自己的线程栈中有table的副本,就会把table缓存行设置为失效,强制从内存中读取table数组的值。当然下面使用了volatile修饰的也是一样的
transient volatile Node<K,V>[] table;
// 一个过渡的table表  只有在扩容的时候才会使用 
private transient volatile Node<K,V>[] nextTable;
private transient volatile long baseCount;
/** 
   * Table initialization and resizing control.  When negative, the 
   * table is being initialized or resized: -1 for initialization, 
   * else -(1 + the number of active resizing threads).  Otherwise, 
   * when table is null, holds the initial table size to use upon 
   * creation, or 0 for default. After initialization, holds the 
   * next element count value upon which to resize the table. 
   hash表初始化或扩容时的一个控制位标识量。 
   负数代表正在进行初始化或扩容操作 
   -1代表正在初始化 
   -N 表示有N-1个线程正在进行扩容操作 
   正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小    
   */  
private transient volatile int sizeCtl;
private transient volatile int transferIndex;
private transient volatile int cellsBusy;
private transient volatile CounterCell[] counterCells;
// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

构造

// Creates a new, empty map with the default initial table size (16).
public ConcurrentHashMap() {
}

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}

2.2 重点解析

public void putAll(Map<? extends K, ? extends V> m) {
    tryPresize(m.size());
    for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
        putVal(e.getKey(), e.getValue(), false);
}
// hash方法
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

2.2.1 putVal

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key value都不能为空
    if (key == null || value == null) throw new NullPointerException();
    // concurrentHashMap的hash方法
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        /*
            如果table是空,那么调用初始化
            若不空则根据hash寻找,若没有找到则插入。
        */
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        /**
         static final int MOVED = -1;
         如果当前位置的 hashcode == MOVED == -1, map 正在扩容,其他线程帮助扩容,也就是多线程扩容。
        **/
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            /**
                利用 synchronized 锁写入数据
                fh〉0 说明这个节点是一个链表的节点不是树的节点。
                如果是红黑树,照树的方式插入值  
                
                内置锁synchronized锁住了f,因为f是指定特定的tab[i]的。所以就锁住了整行链表,
                这个设计跟分段锁有异曲同工之妙,只是其他读取操作需要用cas来保证
            **/
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            /** 当链表长度大于8时,将链表转换为红黑树 */
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

下面这段内容是搬运最开始的资料中

putVal(K key, V value, boolean onlyIfAbsent, boolean evict)大体流程如下: 1、检查key/value是否为null,如果为null,则抛异常,否则进行2 2、进入for死循环,进行3 3、检查table是否初始化了,如果没有,则调用initTable()进行初始化然后进行 2,否则进行4 4、根据key的hash值计算出其应该在table中储存的位置i(根据key的hashcode计算出Hash值,在将Hash值与length-1进行按位与,length是2的整数次幂,减1后的二进制与Hash值进行按位与相当于取余运算,但取余的位运算次数肯定不止1次,而这里一次位运算就得出结果,效率更高),取出table[i]的节点用f表示。 根据f的不同有如下三种情况:

1)如果table[i]==null(即该位置的节点为空,没有发生碰撞),则利用CAS操作直接存储在该位置,如果CAS操作成功则退出死循环。

  2)如果table[i]!=null(即该位置已经有其它节点,发生碰撞),碰撞处理也有两种情况     2.1)检查table[i]的节点的hash是否等于MOVED(-1),如果等于,则检测到正在扩容,则帮助其扩容     2.2)说明table[i]的节点的hash值不等于MOVED,synchronized锁住头结点table[i],进行插入操作;

       如果table[i]为链表节点,则将此节点插入链表末尾中即可;

       如果table[i]为树节点,则将此节点插入树中即可;

       插入成功后,进行 5

5、如果table[i]的节点是链表节点,则检查table的第i个位置的链表的元素个数是否大于了8,大于8就需要转化为树,如果需要则调用treeifyBin函数进行转化。   链表转树:将数组tab的第index位置的链表转化为红黑树。 6、插入成功后,如果key已经存在,返回oldValue;key开始不存在,返回null。上面第4点中的2)中的2.1)帮助扩容:如果当前正在扩容,则尝试协助其扩容,死循环再次发挥了重试的作用,有趣的是ConcurrentHashMap是可以多线程同时扩容的。

这里说协助的原因在于,对于数组扩容,一般分为两步:1.新建一个更大的数组;2.将原数组数据(重新散列Hash值)copy到新数组中。

对于第一步,ConcurrentHashMap通过CAS来控制一个int变量保证新建数组这一步建一个更大的数组只会执行一次;

对于第二步,ConcurrentHashMap采用CAS + synchronized + 移动后标记 的方式来达到多线程扩容的目的。

感兴趣可以查看transfer函数。

目前的猜想多线程扩容可能是:多线程操作不同的table位置的链表或红黑树,将元素重新散列到新的table数组中。

2.2.2 initTable

/**
  * Initializes table, using the size recorded in sizeCtl.
  */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        //  初始化的"功劳"被其他线程"抢去"了
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS 一下,将 sizeCtl 设置为 -1,代表抢到了锁
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

2.2.3 addCount

// 从 putVal 传入的参数是 1, binCount,binCount 默认是0,只有 hash 冲突了才会大于 1.且他的大小是链表的长度(如果不是红黑数结构的话)。
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
     // 如果计数盒子不是空 或者  修改 baseCount 失败
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // 如果计数盒子是空(尚未出现并发)
        // 如果随机取余一个数组位置为空 或者
        // 修改这个槽位的变量失败(出现并发了)
        // 执行 fullAddCount 方法。并结束
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
         // 当条件满足开始扩容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            // 如果小于0说明已经有线程在进行扩容操作了
            // 已经有在扩容或者多线程进行了扩容,则break不要进入扩容操作
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 这个时候sizeCtl已经等于(rs << RESIZE_STAMP_SHIFT) + 2等于一个大的负数,这边加上2很巧妙,因为transfer后面对sizeCtl--操作的时候,最多只能减两次就结束
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

// CounterCell 中volatile  value
@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

x 参数表示的此次需要对表中元素的个数加几。check 参数表示是否需要进行扩容检查,大于等于0 需要进行检查,而我们的 putVal 方法的 binCount 参数最小也是 0 ,因此,每次添加元素都会进行检查。(除非是覆盖操作)

判断计数盒子属性是否是空,如果是空,就尝试修改 baseCount 变量,对该变量进行加 X。

如果计数盒子不是空,或者修改 baseCount 变量失败了,则放弃对 baseCount 进行操作。

如果计数盒子是 null 或者计数盒子的 length 是 0,或者随机取一个位置取于数组长度是 null,那么就对刚刚的元素进行 CAS 赋值。

如果赋值失败,或者满足上面的条件,则调用 fullAddCount 方法重新死循环插入。

这里如果操作 baseCount 失败了(或者计数盒子不是 Null),且对计数盒子赋值成功,那么就检查 check 变量,如果该变量小于等于 1. 直接结束。否则,计算一下 count 变量。

如果 check 大于等于 0 ,说明需要对是否扩容进行检查。

如果 map 的 size 大于 sizeCtl(扩容阈值),且 table 的长度小于 1 << 30,那么就进行扩容。

根据 length 得到一个标识符,然后,判断 sizeCtl 状态,如果小于 0 ,说明要么在初始化,要么在扩容。

如果正在扩容,那么就校验一下数据是否变化了(具体可以看上面代码的注释)。如果检验数据不通过,break。

如果校验数据通过了,那么将 sizeCtl 加一,表示多了一个线程帮助扩容。然后进行扩容。

如果没有在扩容,但是需要扩容。那么就将 sizeCtl 更新,赋值为标识符左移 16 位 —— 一个负数。然后加 2。 表示,已经有一个线程开始扩容了。然后进行扩容。然后再次更新 count,看看是否还需要扩容。

addCount 方法做了 2 件事情

对 table 的长度加一。无论是通过修改 baseCount,还是通过使用 CounterCell。当 CounterCell 被初始化了,就优先使用他,不再使用 baseCount。

检查是否需要扩容,或者是否正在扩容。如果需要扩容,就调用扩容方法,如果正在扩容,就帮助其扩容。

2.2.4 helpTransfer

/**
  * Helps transfer if a resize is in progress.
  * 如果线程进入到这边说明已经有其他线程正在做扩容操作,这个是一个辅助方法
  */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

2.2.5 transfer

/**
  * Moves and/or copies the nodes in each bin to new table. See
  * above for explanation.
  */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        // 单个线程允许处理的最少table桶首节点个数,即每个线程的处理任务量
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            // 构造一个nextTable对象 它的容量是原来的两倍  
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        // transferIndex为扩容复制过程中的桶首节点遍历索引,所以从n开始,表示从后向前遍历
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // 调用内部类ForwardingNode的构造,将扩容后的table放在fwd节点后面
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            // transferIndex = 0表示table中所有数组元素都已经有其他线程负责扩容
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 尝试更新transferIndex,获取当前线程执行扩容复制的索引区间,更新成功,则当前线程负责完成索引为(nextBound,nextIndex)之间的桶首节点扩容
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                // 扩容成功,设置新sizeCtl,仍然为总大小的0.75
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作  
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
         // 当前table节点为空,不需要复制,直接放入ForwardingNode
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 当前table节点已经是ForwardingNode,表示已经被其他线程处理了,则直接往前遍历,通过CAS读写ForwardingNode节点状态,达到多线程互斥处理
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            //  锁住当前桶首节点
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                        (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                        (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

/** 涉及的内部类 ForwardingNode
  * A node inserted at head of bins during transfer operations.
  */
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }

    Node<K,V> find(int h, Object k) {
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    }
                    else
                        return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }
}

下面内容搬运自链接 https://blog.csdn.net/zguoshuaiiii/article/details/78495332

整个扩容操作分为两个部分

第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。这个单线程的保证是通过RESIZE_STAMP_SHIFT这个常量经过一次运算来保证的,这个地方在后面会有提到;

第二个部分就是将原来table中的元素复制到nextTable中,这里允许多线程进行操作。

先来看一下单线程是如何完成的:

它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素:

如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;

如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上

如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上

遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。

看一下多线程是如何完成的: 在扩容部分代码中有一个判断,如果遍历到的节点是forward节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后遍历。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。

2.2.6 get

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 判断是否就是桶首节点
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // hash为负表示是扩容中的ForwardingNode节点,直接调用ForwardingNode的find方法(可以是代理到扩容中的nextTable)
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 通过next指针,逐一查找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

2.2.7 remove

public V remove(Object key) {
    return replaceNode(key, null, null);
}

final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
            /*每次循环都会重新计算槽的位置,因为在扩容完成后会使用新表
             槽的位置可能会发生改变*/
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        // 如果有线程正在扩容,先帮助它一起扩容,然后在新表中进行put操作
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            // 加锁操作,防止其它线程对此桶同时进行put,remove,transfer操作
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

2.2.8 ConcurrentHashMap实现高效的并发操作的三个函数

得益于ConcurrentHashMap中的如下三个函数(sun.misc.Unsafe U):

/*
   3个用的比较多的CAS操作
*/
@SuppressWarnings("unchecked") // ASHIFT等均为private static final  
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { // 获取索引i处Node  
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);  
}  
// 利用CAS算法设置i位置上的Node节点(将c和table[i]比较,相同则插入v)。 
/*
 * 但是这边为什么i要等于((long)i << ASHIFT) + ABASE呢,计算偏移量
 * ASHIFT是指tab[i]中第i个元素在相对于数组第一个元素的偏移量,而ABASE就算第一数组的内存素的偏移地址
 * 所以呢,((long)i << ASHIFT) + ABASE就算i最后的地址
 * 那么compareAndSwapObject的作用就算tab[i]和c比较,如果相等就tab[i]=v否则tab[i]=c;
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {  
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);  
}  
// 在链表转树时用到了这个方法;设置节点位置的值,仅在上锁区被调用  
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {  
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);  
}

2.2.9 keySet、values

public Collection<V> values() {
    ValuesView<K,V> vs;
    return (vs = values) != null ? vs : (values = new ValuesView<K,V>(this));
}

public KeySetView<K,V> keySet(V mappedValue) {
    if (mappedValue == null)
        throw new NullPointerException();
    return new KeySetView<K,V>(this, mappedValue);
}

同HashMap,ConcurrentHashMap也是实现自己的内部迭代器来实现迭代,以KeySe为例,可以看到返回的是一个内部类KeySetView,这个类里面重点属性是一个构造器对应类型是另一个内部类KeyIterator,KeyIterator类继承至BaseIterator类,而BaseIterator又继承至Traverser。KeyIterator调用next方法时,最终会调用Traverser的advance方法。这个方法源码如下:

// Traverser
static class Traverser<K,V> {
        Node<K,V>[] tab;        // current table; updated if resized
        Node<K,V> next;         // the next entry to use
        TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
        int index;              // index of bin to use next
        int baseIndex;          // current index of initial table
        int baseLimit;          // index bound for initial table
        final int baseSize;     // initial table size

        Traverser(Node<K,V>[] tab, int size, int index, int limit) {
            this.tab = tab;
            this.baseSize = size;
            this.baseIndex = this.index = index;
            this.baseLimit = limit;
            this.next = null;
        }

        /**
         * Advances if possible, returning next valid node, or null if none.
           如果有可能,返回下一个有效节点,否则返回null。
         */
        final Node<K,V> advance() {
            Node<K,V> e;
            // 获取Node链表的下一个元素e
            if ((e = next) != null)
                e = e.next;
            for (;;) {
                Node<K,V>[] t; int i, n;  // must use locals in checks
                if (e != null)
                    return next = e;
                // e为空,说明此链表已经遍历完成,准备遍历下一个hash桶
                if (baseIndex >= baseLimit || (t = tab) == null ||
                    (n = t.length) <= (i = index) || i < 0)
                    return next = null;
                // 获取下一个hash桶对应的node链表的头节点
                if ((e = tabAt(t, i)) != null && e.hash < 0) {
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        e = null;
                        pushState(t, i, n);
                        continue;
                    }
                    else if (e instanceof TreeBin)
                        e = ((TreeBin<K,V>)e).first;
                    else
                        e = null;
                }
                if (stack != null)
                    recoverState(n);
                else if ((index = i + baseSize) >= n)
                    index = ++baseIndex; // visit upper slots if present
            }
        }
    }

2.3 内部类

2.3.1 Node

Node是最核心的内部类,它包装了key-value键值对,与HashMap中的定义很相似,但是它对value和next属性设置了volatile同步锁,它不允许调用setValue方法直接改变Node的value域,它增加了find方法辅助map.get()方法。

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;          // 带有同步锁的value
    volatile Node<K,V> next;  // 带有同步锁的next指针

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key + "=" + val; }
    //不允许直接改变value的值
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }

    /**
      * Virtualized support for map.get(); overridden in subclasses.
      */
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

2.3.2 TreeNode

树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry<K,V>类,也就是说TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问。

2.3.3 TreeBin

包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。

/**
  * Creates bin with initial set of nodes headed by b.
  */
TreeBin(TreeNode<K,V> b) {
    super(TREEBIN, null, null, null);
    this.first = b;
    TreeNode<K,V> r = null;
    for (TreeNode<K,V> x = b, next; x != null; x = next) {
        next = (TreeNode<K,V>)x.next;
        x.left = x.right = null;
        if (r == null) {
            x.parent = null;
            x.red = false;
            r = x;
        }
        else {
            K k = x.key;
            int h = x.hash;
            Class<?> kc = null;
            for (TreeNode<K,V> p = r;;) {
                int dir, ph;
                K pk = p.key;
                if ((ph = p.hash) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                else if ((kc == null &&
                          (kc = comparableClassFor(k)) == null) ||
                         (dir = compareComparables(kc, k, pk)) == 0)
                    dir = tieBreakOrder(k, pk);
                TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    x.parent = xp;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    r = balanceInsertion(r, x);
                    break;
                }
            }
        }
    }
    this.root = r;
    assert checkInvariants(root);
}

2.3.4 ForwardingNode

这个内部类在2.3.5 中可以看到源码

其实是一个包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1. 这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找

2.3.5 ValuesView、ValueIterator、KeySetView、KeyIterator

在2.2.9中已经查看。此处不赘述

3、Unsafe与CAS

unsafe代码块控制了一些属性的修改工作,比如最常用的SIZECTL 。 在这一版本的concurrentHashMap中,大量应用来的CAS方法进行变量、属性的修改工作。 利用CAS进行无锁操作,可以大大提高性能。

private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;

static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ConcurrentHashMap.class;
        SIZECTL = U.objectFieldOffset
            (k.getDeclaredField("sizeCtl"));
        TRANSFERINDEX = U.objectFieldOffset
            (k.getDeclaredField("transferIndex"));
        BASECOUNT = U.objectFieldOffset
            (k.getDeclaredField("baseCount"));
        CELLSBUSY = U.objectFieldOffset
            (k.getDeclaredField("cellsBusy"));
        Class<?> ck = CounterCell.class;
        CELLVALUE = U.objectFieldOffset
            (ck.getDeclaredField("value"));
        Class<?> ak = Node[].class;
        ABASE = U.arrayBaseOffset(ak);
        int scale = U.arrayIndexScale(ak);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }
}

CAS 会单独写文章来进行分析这个理论

4、 常见Q&A

Tip: 本文分析的1.8的源码,没有看1.7源码,因此涉及1.7部分的回答都是网上摘抄,可自行求证

4.1 ConcurrentHashMap有哪些构造函数?

无参构造

初始容器大小的构造函数

传入Map的构造

设置阈值和初始容量

初始容量和阈值和并发级别

4.2 ConcurrentHashMap使用什么技术来保证线程安全?

jdk1.7:Segment+HashEntry来进行实现的;

jdk1.8:放弃了Segment臃肿的设计,采用Node+CAS+Synchronized来保证线程安全;

4.3 ConcurrentHashMap的get方法是否要加锁,为什么?

不需要,get方法采用了unsafe方法,来保证线程安全。

4.4 ConcurrentHashMap迭代器是强一致性还是弱一致性?HashMap呢?

弱一致性,hashmap强一直性。

ConcurrentHashMap可以支持在迭代过程中,向map添加新元素,而HashMap则抛出了ConcurrentModificationException,

因为HashMap包含一个修改计数器,当你调用他的next()方法来获取下一个元素时,迭代器将会用到这个计数器。

4.5 ConcurrentHashMap1.7和1.8的区别;

jdk1.8的实现降低锁的粒度,jdk1.7锁的粒度是基于Segment的,包含多个HashEntry,而jdk1.8锁的粒度就是Node

数据结构:jdk1.7 Segment+HashEntry;jdk1.8 数组+链表+红黑树+CAS+synchronized

4.6 ConcurrentHashMap中变量使用final和volatile修饰有什么用呢?其中链表是final的next属性,那么发生删除某个元素,如何实现的?

使用final来实现不变模式(immutable),他是多线程安全里最简单的一种保障方式。因为你拿他没有办法,想改变它也没有机会。不变模式主要通过final关键字来限定的。在JMM中final关键字还有特殊的语义。Final域使得确保初始化安全性(initialization safety)成为可能,初始化安全性让不可变形对象不需要同步就能自由地被访问和共享。

使用volatile来保证某个变量内存的改变对其他线程即时可见,在配合CAS可以实现不加锁对并发操作的支持

remove执行的开始就将table赋给一个局部变量tab,将tab依次复制出来,最后直到该删除位置,将指针指向下一个变量。

5、 总结

总结只针对1.8版本。至于1.7版本若有时间在进行解析源码(1.8对这个类做了很大的变动)

数据结构和HashMap类似,大部分属性都加上了final或者volidate关键字来确保线程安全。在做新增操作的时候采用了CAS(比较和交换Conmpare And Swap)的思路来处理线程的问题(这个思想中主要使用了关键字volidate)。另外在更新或者扩容等操作的时候使用了Synchronized来锁住hash桶的头节点来保证每个节点只能同时有一个线程来操作。jdk1.8的线程颗粒度相比1.7更细。

迭代器的弱一致性、强一致。强一致要求在迭代过程中不能对数据进行修改,一旦修改就迭代器会抛出异常。

弱一致则没有此要求。ConcurrentHashMap实现弱一致性其实就时通过不断循环进行寻找对应元素。

相关推荐

linux实例之设置时区的方式有哪些

linux系统下的时间管理是一个复杂但精细的功能,而时区又是时间管理非常重要的一个辅助功能。时区解决了本地时间和UTC时间的差异,从而确保了linux系统下时间戳和时间的准确性和一致性。比如文件的时间...

Linux set命令用法(linux cp命令的用法)

Linux中的set命令用于设置或显示系统环境变量。1.设置环境变量:-setVAR=value:设置环境变量VAR的值为value。-exportVAR:将已设置的环境变量VAR导出,使其...

python环境怎么搭建?小白看完就会!简简单单

很多小伙伴安装了python不会搭建环境,看完这个你就会了Python可应用于多平台包括Linux和MacOSX。你可以通过终端窗口输入"python"命令来查看本地是否...

Linux环境下如何设置多个交叉编译工具链?

常见的Linux操作系统都可以通过包管理器安装交叉编译工具链,比如Ubuntu环境下使用如下命令安装gcc交叉编译器:sudoapt-getinstallgcc-arm-linux-gnueab...

JMeter环境变量配置技巧与注意事项

通过给JMeter配置环境变量,可以快捷的打开JMeter:打开终端。执行jmeter。配置环境变量的方法如下。Mac和Linux系统在~/.bashrc中加如下内容:export...

C/C++|头文件、源文件分开写的源起及作用

1C/C++编译模式通常,在一个C++程序中,只包含两类文件——.cpp文件和.h文件。其中,.cpp文件被称作C++源文件,里面放的都是C++的源代码;而.h文件则被称...

linux中内部变量,环境变量,用户变量的区别

unixshell的变量分类在Shell中有三种变量:内部变量,环境变量,用户变量。内部变量:系统提供,不用定义,不能修改环境变量:系统提供,不用定义,可以修改,可以利用export将用户变量转为环...

在Linux中输入一行命令后究竟发生了什么?

Linux,这个开源的操作系统巨人,以其强大的命令行界面而闻名。无论你是初学者还是经验丰富的系统管理员,理解在Linux终端输入一条命令并按下回车后发生的事情,都是掌握Linux核心的关键。从表面上看...

Nodejs安装、配置与快速入门(node. js安装)

Nodejs是现代JavaScript语言产生革命性变化的一个主要框架,它使得JavaScript从一门浏览器语言成为可以在服务器端运行、开发各种各样应用的通用语言。在不同的平台下,Nodejs的安装...

Ollama使用指南【超全版】(olaplex使用方法图解)

一、Ollama快速入门Ollama是一个用于在本地运行大型语言模型的工具,下面将介绍如何在不同操作系统上安装和使用Ollama。官网:https://ollama.comGithub:http...

linux移植(linux移植lvgl)

1uboot移植l移植linux之前需要先移植一个bootlader代码,主要用于启动linux内核,lLinux系统包括u-boot、内核、根文件系统(rootfs)l引导程序的主要作用将...

Linux日常小技巧参数优化(linux参数调优)

Linux系统参数优化可以让系统更加稳定、高效、安全,提高系统的性能和使用体验。下面列出一些常见的Linux系统参数优化示例,包括修改默认配置、网络等多方面。1.修改默认配置1.1修改默认编辑器默...

Linux系统编程—条件变量(linux 条件变量开销)

条件变量是用来等待线程而不是上锁的,条件变量通常和互斥锁一起使用。条件变量之所以要和互斥锁一起使用,主要是因为互斥锁的一个明显的特点就是它只有两种状态:锁定和非锁定,而条件变量可以通过允许线程阻塞和等...

面试题-Linux系统优化进阶学习(linux系统的优化)

一.基础必备优化:1.关闭SElinux2.FirewalldCenetOS7Iptables(C6)安全组(阿里云)3.网络管理服务||NetworkManager|network...

嵌入式Linux开发教程:Linux Shell

本章重点介绍Linux的常用操作和命令。在介绍命令之前,先对Linux的Shell进行了简单介绍,然后按照大多数用户的使用习惯,对各种操作和相关命令进行了分类介绍。对相关命令的介绍都力求通俗易懂,都给...

取消回复欢迎 发表评论: