“Java 的源代码学习”系列
(1)基本类型和对应的类
(2)HashMap 和 ConcurrentHashMap
(4)ReentrantLock 相关类(上)(本文)
经过前两篇与各种基础类库的 CAS 的“战斗”,大部分人估计已经晕了。在大多数情况,我们并不会直接参与 CAS 相关的实现细节,而是使用 JDK 提供的各种并发控制的类包。
例如我们常见的锁机制。这里老夫就要来研究一下 ReentrantLock 这个类相关的源代码实现。
老夫把此篇拆为上、下两篇,上篇(本文)是在看 ReentrantLock 相关代码前的一些知识准备(包括对 ThreadLocal 类的代码学习)。在下篇再开始读 ReentrantLock 相关的代码。
现在博客用的垃圾代码高亮插件貌似有问题,如果遇到下面的代码混乱一片,通常刷新下页面就行了。等老夫有空再把这个插件换掉。
一、Object 对象提供的同步机制
先回忆一下 Object 这个类提供的并发控制机制。比如我们要写一个循环队列,当队列满的时候,入队线程被挂起;当有元素出队时,唤醒被挂起的线程。出队的示例代码如下(这些代码抄自这里):
synchronized (taskQueue)
{
while (taskQueue.isEmpty())
{
taskQueue.wait();
}
int i = (Integer) taskQueue.remove(0);
taskQueue.notifyAll();
}
而入队的代码如下:
synchronized (taskQueue) { while (taskQueue.size() == MAX_CAPACITY) { taskQueue.wait(); } taskQueue.add(i); taskQueue.notifyAll(); }
首先要明确两点:
- 在调用 wait 或 notifyAll 方法时,必须要获得该对象的监视器(monitor),也就是上面加的 synchronized 关键字。在没有 monitor 的情况下调用 wait 或 notifyAll,Java 会抛出 IllegalMonitorStateException 的异常。在 wait 返回后,会自动获得 monitor。
- 线程被唤醒后,必须要再次判断条件是否成立,也就是要写到一个 while 循环里面。这个应该是没啥好说的。
- 对于上面的代码,不能用 notify 代替 notifyAll。因为使用 notify 后被唤醒的线程是随机选取的,比如入队后我们调用 notify,此时被唤醒的线程可能是另一个入队的线程,而不是我们所期望的出队线程。因此我们只能 notifyAll 让这些线程去竞争,以“期待”出队线程能够被选中。
至于一个线程何时被 notify(也就是调用的 wait 方法返回),Java 的文档中指出,当满足下面四种条件之一时,wait 方法会返回:
- 其他线程调用了此对象的 notify 方法,并且当前线程被选择为唤醒线程。
- 其他线程调用了此对象的 notifyAll 方法。
- 当前线程被中断(调用了 interrupt 方法)。
- 如果在调用 wait 时指定了超时时间,并且时限已过。
此外要特别注意,上面四种情况之一并不是线程被唤醒的充分必要条件,即使上述条件都不满足,线程还是【有可能】被唤醒,这种情况叫“spurious wakeup”,此时无论如何用 while 循环判断条件是必要的(虽然概率很低,但是一旦发生,便会让人找不着头脑)。
可以看出直接使用 Object 类的这些方法在唤醒时会比较混乱,这样导致的后果就是很多不必要的竞争。所以现在大多数情况都是使用的其他类包,比如上述队列可以改用 Lock 和 Condition 来实现,这也就是老夫需要分析的重点内容。老夫以前做过的一道题目,就是使用其他类库代替 Object 类的这些方法来控制同步,这也是 Google 的一道面试题,具体参见这里。
二、ThreadLocal 和 ThreadLocalMap
JDK 提供了 ThreadLocal<T> 类用于保存线程的私有变量。例如乃想为每一个线程分配一个 id,但这个 id 仅仅是为了不重复而设定的,那使用 ThreadLocal 就很好啦,JDK 的示例代码如下:
import java.util.concurrent.atomic.AtomicInteger; public class ThreadId { // Atomic integer containing the next thread ID to be assigned private static final AtomicInteger nextId = new AtomicInteger(0); // Thread local variable containing each thread's ID private static final ThreadLocal<Integer> threadId = new ThreadLocal<Integer>() { @Override protected Integer initialValue() { return nextId.getAndIncrement(); } }; // Returns the current thread's unique ID, assigning it if necessary public static int get() { return threadId.get(); } }
在实现上,Thread 类有两个 ThreadLocal.ThreadLocalMap 类型的变量 threadLocals 和 inheritableThreadLocals。ThreadLocalMap 是一个内部的静态类,其成员变量的定义如下:
static class ThreadLocalMap { static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } private static final int INITIAL_CAPACITY = 16; private Entry[] table; private int size = 0; private int threshold; // Default to 0 ...... }
这里的 Entry 是一个 WeakReference,同时注意 super(k) 这句话,弱引用的是 ThreadLocal<?> 这个对象。这样做的好处是,当 ThreadLocal 被回收后,相关的 Entry 也就可以扔掉了,这样使 ThreadLocal 被回收后,Entry 能够“自动”被回收,而不用把处理 Entry 的相关代码糅杂在其他类中。
这个 Map 实际上是一个单线程访问(废话!)的哈希表,在有冲突时,直接放到下一格去:
private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); } private static int prevIndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1); }
很显然,冲突是一个麻烦事。先看一下 set 方法:
private void set(ThreadLocal<?> key, Object value) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); if (k == key) { e.value = value; return; } if (k == null) { replaceStaleEntry(key, value, i); return; } } tab[i] = new Entry(key, value); int sz = ++size; if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); }
如果相关区域已经存在元素,并且 key 相同,直接替换 value,这是最简答的情况。如果 key 为 null,就麻烦了,因为相关的 ThreadLocal 已经被回收(注意,如果此区域不存在元素,e 就为 null,而不是 e.get() 为 null),此时调用 replaceStaleEntry 方法。
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { Entry[] tab = table; int len = tab.length; Entry e; int slotToExpunge = staleSlot; for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) if (e.get() == null) slotToExpunge = i;
看上去像是向前找一个被回收的 Entry(直到遇到空格子前)。这是在干啥呢?
for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); if (k == key) { e.value = value; tab[i] = tab[staleSlot]; tab[staleSlot] = e; if (slotToExpunge == staleSlot) slotToExpunge = i; cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; } if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; }
接着向后遍历每一个格子,如果遇到 key 与待插入的相同的 Entry,则将该 Entry 与待插入的位置的元素进行交换。然后调用 expungeStaleEntry 方法对这个格子进行回收。
private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // expunge entry at staleSlot tab[staleSlot].value = null; tab[staleSlot] = null; size--; // Rehash until we encounter null Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; tab[i] = null; size--; } else { int h = k.threadLocalHashCode & (len - 1); if (h != i) { tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i; }
这样子在遇到下一个 null 之前,把所有 key 为 null 的(也就是被回收的 ThreadLocal 相关的值)回收掉。如果不是 null,则尝试把它放到“正确”的位置上(指直接映射的位置),以提高第一次命中的概率。
而 cleanSomeSlots,则是扫描 log n 次元素把可以回收的干掉。至于为啥是 log n 次,看似是随便估计的,使其能够回收一定量的格子,又不至于耗时太久。
private boolean cleanSomeSlots(int i, int n) { boolean removed = false; Entry[] tab = table; int len = tab.length; do { i = nextIndex(i, len); Entry e = tab[i]; if (e != null && e.get() == null) { n = len; removed = true; i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0); return removed; }
所以可以看出,在插入元素的时候,ThreadLocalMap 的复杂度是 O(n),因为它要负责清理一些陈旧的元素。因此,使用 WeakReference 的实现模式带来的负面影响就是实现起来很麻烦,而且时间复杂度稍高。不过带来的好处就是 Thread 类不用负责 ThreadLocal 里元素的生命周期管理,大大降低了耦合性。
顺便一提,网上很多讨论 ThreadLocal 带来的内存泄露,其本质上是使用方式有问题,而不是 ThreadLocal 类本身的 bug。这就好比在开发 Android 时一大堆内存泄露的问题都是由于没有好好看官方文档所引起的,面对这种情况,能有啥办法……
总结一下,ThreadLocalMap 其实就是存储于 Thread 对象里的,其中使用的 WeakReference 使乃不用去刻意去 remove 相关的元素,只要 ThreadLocal 被回收了就行了(当然前提是正确使用)。
三、自旋锁和 CLH 锁
乃会说,说了一大堆,这和 CLH 有啥关系?有了 ThreadLocal,我们就可以“轻易”实现 CLH。
CLH 是由 Craig、Landin 和 Hagersten 这几个哥们在九十年代提出的。以前老夫在分析 M$ 的 ConurrentQueue 的时候,见到过自旋锁这个东东。当临界区非常小时,正在占用临界区的线程可能只需要少数几条指令就能够释放锁,而如果此时将进程睡眠(使用操作系统内核提供的 mutex 等机制)再等待唤醒,那至少是几十毫秒(从用户态到内核态的相互切换)的开销,相较于几天指令就能完事的情况,浪费的时间太多。
而自旋锁使用一个循环来检查临界资源是否已经被释放,例如:
public void lock() { Thread currentThread = Thread.currentThread(); while (!owner.compareAndSet(null, currentThread)) { }
可以看到,自旋锁在多处理器(此时,需要 lock 指令锁总线)或者内存可抢占时使用才有效。此外,由于一直在 while 占用 CPU 资源,如果尝试了几次还是得不到资源,最好是使用 CPU 提供的 PAUSE 等待一段时间(如果没有这条指令其实除了耗电以外,倒没啥别的影响)。如果长时间得不到,择可以使用 sched_yield 短时间出让一下 CPU 资源。由此我们可以看出:
- CAS 需要硬件的相关指令(不过目前不支持这个指令的处理器应该很少见了吧?);
- 需要保存 CPU 各级缓存和内存的数据一致性,通讯开销比较大,而且在多处理器时还要跨 CPU Socket;
- 没法保证公平性,调用此方法的线程只能互相碰运气竞争资源。
为了改进公平性,我们为每一个来排队的线程分配一个编号,另外设置一个临界区正在使用的线程编号。当某个线程使用完后,把编号加一,此时其它正在轮询的线程检查是否是自己所等待的编号,如果是,则进入临界区。
很容易发现这样只会使情况更糟。为此有两个哥们提出了 MCS 锁(也是以他们名字的首字母命名的),使用链表来实现自旋锁,线程只在本地变量上自选,上一结点通知后一个结点结束自选,从而减少了缓存同步的次数。
之后 CLH 等人提出了一种改进(之后称之为 CLH 自旋锁)。比如下面这种实现(代码抄自文末参见的第三篇)。
class ClhSpinLock { private final ThreadLocal<Node> prev; private final ThreadLocal<Node> node; private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node()); public ClhSpinLock() { this.node = new ThreadLocal<Node>() { protected Node initialValue() { return new Node(); } }; this.prev = new ThreadLocal<Node>() { protected Node initialValue() { return null; } }; } public void lock() { final Node node = this.node.get(); node.locked = true; Node pred = this.tail.getAndSet(node); this.prev.set(pred); while (pred.locked) { } } public void unlock() { final Node node = this.node.get(); node.locked = false; this.node.set(this.prev.get()); } private static class Node { private volatile boolean locked; } }
其实上述代码有问题,getAndSet 这段逻辑应该在一个 while 循环里,或者直接使用 AtomicReferenceFieldUpdater 这个类提供的 getAndSet,因为可能会出现多个线程竞争更新 tail 的情况。
对于每一个线程,都保存了队列中前面一个结点的引用(直接使用 ThreadLocal,这样我们就不用搞另外一个结构来管理这些线程私有的变量了)。每次调用 lock 方法,都会在当前的 tail 后添加一个新结点(使用 AtomicReference.getAndSet 方法保证操作的原子性),之后使用 while 循环等待前一个结点释放(locked 为 true)。
此时,对于每一个线程,只等待其前一个结点释放,避免了多个线程竞争同一个变量的情况,同时可以实现公平性。
四、LockSupport
LockSupport 是实现锁(和相关同步类)的基本线程阻塞原语。
ReentrantLock 内部使用的 Sync 继承自 AbstractQueuedSynchronizer(下一篇文章会讲到),其实现是基于 LockSupport 的。所以是 Java 中各种锁的“始祖”,因此有必要好好理解一下这个类。
LockSupport 是一个“静态类”。这里的静态类指其构造方法是私有的,所有方法和字段都是 static 的。
偶们来看一下它的用法(JDK 里面的示例代码):
class FIFOMutex { private final AtomicBoolean locked = new AtomicBoolean(false); private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>(); public void lock() { boolean wasInterrupted = false; Thread current = Thread.currentThread(); waiters.add(current); // Block while not first in queue or cannot acquire lock while (waiters.peek() != current || !locked.compareAndSet(false, true)) { LockSupport.park(this); if (Thread.interrupted()) // ignore interrupts while waiting wasInterrupted = true; } waiters.remove(); if (wasInterrupted) // reassert interrupt status on exit current.interrupt(); } public void unlock() { locked.set(false); LockSupport.unpark(waiters.peek()); } }
初看起来,这个东西貌似和 Object 的 wait、notify 没啥区别嘛?但是注意,LockSupport 是基于线程的;而 Object 的 wait 和 notify 是被动的。具体说来:
- LockSupport 为每个线程提供两种状态:许可、不许可。
- LockSupport.park(Object blocker):如果当前线程处于“许可”状态,该方法立即返回;否则将会等到其他线程为当前线程调用 unpark 重新回到许可状态或者中断当前线程。
- LockSpport.unpark(Thread thread):解除 thread 线程的阻塞状态,如果未被阻塞则在下次调用 park 时不会阻塞。如果指定线程没有启动,则没有效果。
- 连续调用 park 时,将会一直被阻塞。
- park 有虚假唤醒的情况,因此要放在 while 循环里面重复检查条件是否满足。
简单来说,LockSupport 和 Object 类提供的方法相比:
- 面向的对象不同。LockSupport 针对线程来设置其许可、不许可状态,Object 类针对某个对象。
- LockSupport 与监视器(monitor)无关,而 Object 类操作前必须要有监视器。
- LockSupport 是实现各种锁的基本原语,一般的项目中应使用更高层次的实现,不建议直接使用。(Object 类的 wait、notify 等实际上最好也不要直接在项目中使用。)
- LockSupport 在线程中断时不会抛出异常。需要自己去判断线程的 Thread.interupted (如上面代码所示)。
LockSupport 的大部分代码都是直接调用的 Unsafe 类中的相关方法,这些方法都是 native 的,与操作系统提供的 API 有关。例如 park 方法:
public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); }
private static void setBlocker(Thread t, Object arg) { // Even though volatile, hotspot doesn't need a write barrier here. UNSAFE.putObject(t, parkBlockerOffset, arg); }
LockSupport 类直接在 Thread 对象里的 parkBlocker 字段里写入一个 Object 对象用于标记使用(和 ThreadLocalMap 类似),主要是为了方便调试。当然你也可以不提供或随便给一个东西。
在 LockSupport 中实现了一个我们经常使用的 nextSecondarySeed 方法(ThreadLocalRandom 也提供了这个方法),用于生成每个线程独立的随机数种子。
static final int nextSecondarySeed() { int r; Thread t = Thread.currentThread(); if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) { r ^= r << 13; // xorshift r ^= r >>> 17; r ^= r << 5; } else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0) r = 1; // avoid zero UNSAFE.putInt(t, SECONDARY, r); return r; }
这个信息存储于 Thread 对象的 threadLocalRandomSeed 等字段。无论是 LockSupport 还是 ThreadLocal,实际上都是封装了对 Thread 类不同功能的访问,实际上所有内容都是塞在 Thread 类里面。
AbstractQueuedSynchronized 里面主要是用了 LockSupport 进行并发控制。
知道了上面这些乱七八糟的基础知识,我们就能够比较方便地分析 ReentrantLock 了。具体老夫会在下一篇中具体分析。
参见:
[1] 自旋锁、排队自旋锁、MCS锁、CLH锁(http://coderbee.net/index.php/concurrent/20131115/577);
[2] 【linux】spinlock 的实现(http://www.cnblogs.com/chenpingzhao/p/5043746.html)
[3] Java并发包源码学习之AQS框架(二)(http://zhanjindong.com/2015/03/11/java-concurrent-package-aqs-clh-and-spin-lock)