说到线程安全,不要一下子就想到加锁,尤其是可能会调用频繁或者是要求高性能的场合。
对于性能要求不高或者同步的对象数量不多的时候,加锁是一个比较简单而且易于实现的选择。比方说.NET提供的一些基础类库,比如线程安全的堆栈和队列,如果使用加锁的方式那么会使性能大打折扣(速度可能会降低好几个数量级),而且如果设计得不好的话还有可能发生死锁。
现在通过查看微软的源代码来学习一些不直接lock(等价于Monitor类)的线程同步技巧吧。
这里我们主要用的是Interlocked类,这个类按照M$的描述,是“为多个线程共享的变量提供原子操作”,当然这个类是一个静态类。这个类的源代码看不到,因为是调用的CLR内部的方法,不过基本思想应该是通过硬件原语try and set来实现的。
该类提供的Add、Increment、Decrement能够完成简单的原子操作。
假如我们要提供一个计数器,每访问一次就递增地返回一个新的数值用于计数。在多线程环境下,s++这一条语句不是线程安全的。因为执行这个语句要经过:移到寄存器(读取)、运算、写入这几个步骤,在任何时候都可能会切换到其他线程,这样子s被多个线程访问值可能会在切换的过程中丢失。有了Interlocked提供的这几个原子操作的方法,就不用自己去加锁实现这些简单的运算了。由于是使用的硬件原语,其效率自然也比加锁高得多。
但是大多数情况下,问题并没有执行相加相减运算那么简单,这时如果不想用锁的话就要想想办法了。
以微软的ConcurrentStack提供的线程安全的堆栈为例,分析一下如何实现如果往栈头添加数据。
m_head是指向堆顶的指针,在定义的时候由于是多线程访问的,所以要加上volatile修饰符:
private volatile Node m_head;
如果是单线程的,那么入栈语句就是下面这个样子:
Node newNode = new Node(item);
newNode.m_next = m_head;
m_head = newNode;
假如有两个线程并发访问入栈方法的话,那么可能会产生如下情况:第一个线程执行完第二条语句被打断,第二个线程执行到第二条语句又切换回第一个线程,两个线程执行完后有一个入栈的元素就不见了。那么如何实现线程安全呢?M$的代码是这样写的:
Node newNode = new Node(item);
newNode.m_next = m_head;
if (Interlocked.CompareExchange(ref m_head, newNode, newNode.m_next) == newNode.m_next)
{
return;
}
// If we failed, go to the slow path and loop around until we succeed.
PushCore(newNode, newNode);
首先,Interlocked.CompareExchange比较两个元素是否相等,并根据比较的结果替换其中一个元素,返回结果始终是第一个元素的原值。这个方法是原子操作。
那么这段代码首先设置newNode的下一节点为堆栈顶部的元素,接下来CompareExchange,判断栈顶元素有没有被修改过。假如此时没有另一个线程修改栈顶元素,那么m_head还是原来的值(上一条语句设置的新栈顶的下一个元素),此时就可以安全地把栈顶指针指向新元素,操作完成(return)。注意CompareExchange是原子操作的,所以在这期间栈顶元素不可能再被修改。
如果比较结果不相等,那么说明栈顶元素已经被其他线程修改了(此时返回值就是被修改后的栈顶,和上一条语句设置m_next不一样),这样CompareExchange就不会修改m_head,说明入栈不成功,执行PushCore方法。
这个东东的代码如下:
private void PushCore(Node head, Node tail)
{
SpinWait spin = new SpinWait();
// Keep trying to CAS the exising head with the new node until we succeed.
do
{
spin.SpinOnce();
// Reread the head and link our new node.
tail.m_next = m_head;
}
while (Interlocked.CompareExchange(
ref m_head, head, tail.m_next) != tail.m_next);
#if !FEATURE_PAL && !FEATURE_CORECLR
if (CDSCollectionETWBCLProvider.Log.IsEnabled())
{
CDSCollectionETWBCLProvider.Log.ConcurrentStack_FastPushFailed(spin.Count);
}
#endif //!FEATURE_PAL && !FEATURE_CORECLR
}
可以看到其逻辑还是和上面那个一样,只是加了一个循环直到操作完成。在这期间使用了一个SpinWait对象和SpinOnce方法,那么我们又要了解一下这是干嘛的。
关于SpinWait对象,M$的说明是:System.Threading.SpinWait 是一个轻量同步类型,可以在低级别方案中使用它来避免内核事件所需的高开销的上下文切换和内核转换。
关于它的说明还有一堆,你可以参考这里。如果不想看那么多,那么只需了解它的使用场合是在资源不会被占用很长时间的时候进行等待,以用户模式自旋以避免高额的开销。
SpinOnce的说明很简单,就是执行单一自旋,可以理解为等待一个很短的时间。关于这个方法的代码分析你可以参考这里,总的来说,当自旋此时达到5次时,会切换到同一处理器上的另一个线程,当达到20次时,会调用Thread的Sleep方法阻塞当前线程,此时可以切换到其他同优先级或更高优先级的线程上去。
这样,就可以避免加锁(lock free)的高昂代价来实现线程的同步。
但是有的时候我们不能保证线程安全。比如堆栈的Count属性,在程序调用这个属性后,我们并不能保证这个属性返回的时候是正确的,在返回到应用程序的线程前元素数量是有可能变化的,因此我们也就只能保证我们的返回值曾经正确。
不过显而易见,这是可以接受的。对于开发者来说,假如我们要访问一个多线程字典(ConcurrentDictionary)中的指定元素,我们不应该是先判断是否为空再取元素(因为元素可能在这两步操作之间被删掉),而是应该使用TryGetValue这种保证线程安全的方法来进行操作。