Rx:4-[编外篇] .NET4里的Concurrent Collections
Rx可以在3.5里用,他带给了3.5可以使用的支持并发的集合类型,说白了,就是提供了Thread-Safe的Collection。
在.NET 4.0之前,是不直接支持线程安全的集合的。除非自己去做lock。而Lock带给我们的除了风险之外,系统的性能下降问题特别明显。在我之前的项目中,需要用到一个Thread-safe的Queue,我当时使用Compare-And-Swap方式实现的。但是这种方式并不能保证高效,个人能力有限,Google也没帮上多大忙。所以.net 4.0一出来的时候,宣布支持ConcurrentQueue<T>,我立刻就抄起Reflector去看看它的实现方式。
当然,除了ConcurrentQueue<T>外,还有其他几个支持线程安全的并发集合类型,看完之后,才发现他们并不全都是Lock-free的,这点还是跟我的判断不同的。
我们常说的Lock-free,对于.net来讲,其实是说就是不用lock(),也就是Monitor这个类。在Team的技术分享中我多次建议大家不要直接写lock,而是写Monitor.TryEnter(object, 200)这样的方式,这样的好处是避免整个程序面临尴尬的状况,毕竟对于一个互联网来说,绝大多数应用并不是追求完整性的,牺牲整个站点的代价太高了。在.net里,我们可以通过memory barrier或者使用CPU的CAS(就是上面说的Compare-And-Swap,那个Interlocked类就是)指令来进行一些粒度锁,这些都是很轻量级的,我们认为他是不会损坏系统性能的。
注:(感谢@Kavin Yang的提醒)
使用lock()的时候,.net会一直等待能够lock才行。而lock()的实现就是调用Monitor.Enter。但是Monitor还有一个叫做TryEnter的方法,参数除了要lock的object外,还提供一个时间参数作为该操作的超时时间。
可以参见:http://msdn.microsoft.com/en-us/library/42h9d380(v=VS.80).aspx
那看几个实现:
ConcurrentQueue<T>和ConcurrentStack<T>这两个是使用CAS的方式来完成任务的,实现原理是一样的:
1: private void PushCore(Node<T> head, Node<T> tail)<!--CRLF-->
2: {<!--CRLF-->
3: SpinWait wait = new SpinWait();<!--CRLF-->
4: do<!--CRLF-->
5: {<!--CRLF-->
6: wait.SpinOnce();<!--CRLF-->
7: tail.m_next = this.m_head;<!--CRLF-->
8: }<!--CRLF-->
9: while (Interlocked.CompareExchange<Node<T>>(ref this.m_head, head, tail.m_next) != tail.m_next);<!--CRLF-->
10: }<!--CRLF-->
注意到do…while。整个实现都没有使用lock的地方,唯独在CAS失败的时候使用SpinWait的SpinOnce方法。
ConcurrentDictionary<TKey, TValue>这个类也在mscorlib.dll里,同样位于System.Collections.Concurrent命名空间下。但是它的add、update等方法都是使用了lock来完成的(当然不是完全依赖lock,所以有性能提升)
1: try<!--CRLF-->
2: {<!--CRLF-->
3: if (acquireLock)<!--CRLF-->
4: {<!--CRLF-->
5: Monitor.Enter(this.m_locks[num3], ref lockTaken);<!--CRLF-->
6: }<!--CRLF-->
7: if (nodeArray != this.m_buckets)<!--CRLF-->
8: {<!--CRLF-->
9: goto Label_000D;<!--CRLF-->
10: }<!--CRLF-->
11: Node<TKey, TValue> node = null;<!--CRLF-->
12: ....<!--CRLF-->
但是对于他的读操作:
1: public bool TryGetValue(TKey key, out TValue value)<!--CRLF-->
2: {<!--CRLF-->
3: int num;<!--CRLF-->
4: int num2;<!--CRLF-->
5: if (key == null)<!--CRLF-->
6: {<!--CRLF-->
7: throw new ArgumentNullException("key");<!--CRLF-->
8: }<!--CRLF-->
9: Node<TKey, TValue>[] buckets = this.m_buckets;<!--CRLF-->
10: this.GetBucketAndLockNo(this.m_comparer.GetHashCode(key), out num, out num2, buckets.Length);<!--CRLF-->
11: Node<TKey, TValue> next = buckets[num];<!--CRLF-->
12: Thread.MemoryBarrier();<!--CRLF-->
13: while (next != null)<!--CRLF-->
14: {<!--CRLF-->
15: if (this.m_comparer.Equals(next.m_key, key))<!--CRLF-->
16: {<!--CRLF-->
17: value = next.m_value;<!--CRLF-->
18: return true;<!--CRLF-->
19: }<!--CRLF-->
20: next = next.m_next;<!--CRLF-->
21: }<!--CRLF-->
22: value = default(TValue);<!--CRLF-->
23: return false;<!--CRLF-->
24: }<!--CRLF-->
能看到这里的处理方式还是很巧妙的,Dictionary这个数据结构本身就是一个设计为读多的类型,read的次数肯定要远远超过write的次数。所以这个ConcrruentDictionary的设计并不是追求lock-free的改/写操作,而是将read操作做到了lock-free!
ConcrruntBat<T>这个类设计的更有意思,思路很像是采用了Pool的方式(Pool一般有3种常见的使用方式),将pool对应到应用的thread上,他们之间就没有锁的我问题了(或者无损耗的锁),比如说Add操作:
1: public void Add(T item)<!--CRLF-->
2: {<!--CRLF-->
3: ThreadLocalList<T> threadList = this.GetThreadList(true);<!--CRLF-->
4: this.AddInternal(threadList, item);<!--CRLF-->
5: }<!--CRLF-->
?
1: private void AddInternal(ThreadLocalList<T> list, T item)<!--CRLF-->
2: {<!--CRLF-->
3: bool taken = false;<!--CRLF-->
4: try<!--CRLF-->
5: {<!--CRLF-->
6: Interlocked.Exchange(ref list.m_currentOp, 1);<!--CRLF-->
7: if ((list.Count < 2) || this.m_needSync)<!--CRLF-->
8: {<!--CRLF-->
9: list.m_currentOp = 0;<!--CRLF-->
10: Monitor2.Enter(list, ref taken);<!--CRLF-->
11: }<!--CRLF-->
12: list.Add(item, taken);<!--CRLF-->
13: }<!--CRLF-->
14: finally<!--CRLF-->
15: {<!--CRLF-->
16: list.m_currentOp = 0;<!--CRLF-->
17: if (taken)<!--CRLF-->
18: {<!--CRLF-->
19: Monitor.Exit(list);<!--CRLF-->
20: }<!--CRLF-->
21: }<!--CRLF-->
22: }<!--CRLF-->
?
Rx系列:
Rx:1-ObservableRx:2-Observable moreRx:3-System.CoreEx.dll