并发HashSet< T>在。net框架?

我有下面的课。

class Test{
public HashSet<string> Data = new HashSet<string>();
}

我需要从不同的线程改变字段“数据”,所以我想对我目前的线程安全实现的一些意见。

class Test{
public HashSet<string> Data = new HashSet<string>();


public void Add(string Val){
lock(Data) Data.Add(Val);
}


public void Remove(string Val){
lock(Data) Data.Remove(Val);
}
}

是否有更好的解决方案,直接到字段并保护它免受多线程的并发访问?

132774 次浏览

您的实现是正确的。不幸的是,. net框架没有提供内置并发哈希集类型。然而,有一些变通办法。

ConcurrentDictionary最后(推荐)

第一个是在命名空间System.Collections.Concurrent中使用类ConcurrentDictionary<TKey, TValue>。在这种情况下,值是没有意义的,所以我们可以使用简单的byte(内存中的1字节)。

private ConcurrentDictionary<string, byte> _data;

这是推荐的选项,因为该类型是线程安全的,并且提供了与HashSet<T>相同的优点,除了键和值是不同的对象。

来源:社会MSDN

Self-implementation

最后,正如您所做的,您可以实现自己的数据类型,使用锁或其他。net提供的线程安全的方法。这里有一个很好的例子:如何在。net中实现ConcurrentHashSet

这个解决方案的唯一缺点是HashSet<T>类型没有正式的并发访问,即使对于读取操作也是如此。

我引用了链接帖子的代码(最初由本莫舍编写)。

using System;
using System.Collections.Generic;
using System.Threading;


namespace BlahBlah.Utilities
{
public class ConcurrentHashSet<T> : IDisposable
{
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private readonly HashSet<T> _hashSet = new HashSet<T>();


#region Implementation of ICollection<T> ...ish
public bool Add(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Add(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public void Clear()
{
_lock.EnterWriteLock();
try
{
_hashSet.Clear();
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return _hashSet.Contains(item);
}
finally
{
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}


public bool Remove(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Remove(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public int Count
{
get
{
_lock.EnterReadLock();
try
{
return _hashSet.Count;
}
finally
{
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
}
#endregion


#region Dispose
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
if (_lock != null)
_lock.Dispose();
}
~ConcurrentHashSet()
{
Dispose(false);
}
#endregion
}
}

编辑:将入口锁方法移到try块之外,因为它们可以抛出异常并执行finally块中包含的指令。

ConcurrentBag(不建议)

不建议使用ConcurrentBag<T>,因为这种类型只允许以线程安全的方式插入给定的元素和删除随机元素。这个类是为促进生产者-消费者场景而设计的,这不是OP的目标(更多解释在这里)。

其他操作(例如,由扩展方法提供)支持并发使用。MSDN文档警告:“ConcurrentBag的所有public和protected成员都是线程安全的,可以从多个线程并发使用。然而,通过ConcurrentBag实现的接口之一访问的成员(包括扩展方法)不保证是线程安全的,可能需要调用者同步。"

我更喜欢完整的解决方案,所以我这样做了:请注意,我的计数以一种不同的方式实现,因为我不明白为什么应该禁止在尝试计数它的值时读取哈希集。

@Zen,谢谢你让它开始。

[DebuggerDisplay("Count = {Count}")]
[Serializable]
public class ConcurrentHashSet<T> : ICollection<T>, ISet<T>, ISerializable, IDeserializationCallback
{
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);


private readonly HashSet<T> _hashSet = new HashSet<T>();


public ConcurrentHashSet()
{
}


public ConcurrentHashSet(IEqualityComparer<T> comparer)
{
_hashSet = new HashSet<T>(comparer);
}


public ConcurrentHashSet(IEnumerable<T> collection)
{
_hashSet = new HashSet<T>(collection);
}


public ConcurrentHashSet(IEnumerable<T> collection, IEqualityComparer<T> comparer)
{
_hashSet = new HashSet<T>(collection, comparer);
}


protected ConcurrentHashSet(SerializationInfo info, StreamingContext context)
{
_hashSet = new HashSet<T>();


// not sure about this one really...
var iSerializable = _hashSet as ISerializable;
iSerializable.GetObjectData(info, context);
}


#region Dispose


public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}


protected virtual void Dispose(bool disposing)
{
if (disposing)
if (_lock != null)
_lock.Dispose();
}


public IEnumerator<T> GetEnumerator()
{
return _hashSet.GetEnumerator();
}


~ConcurrentHashSet()
{
Dispose(false);
}


public void OnDeserialization(object sender)
{
_hashSet.OnDeserialization(sender);
}


public void GetObjectData(SerializationInfo info, StreamingContext context)
{
_hashSet.GetObjectData(info, context);
}


IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}


#endregion


public void Add(T item)
{
_lock.EnterWriteLock();
try
{
_hashSet.Add(item);
}
finally
{
if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public void UnionWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
_lock.EnterReadLock();
try
{
_hashSet.UnionWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}


public void IntersectWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
_lock.EnterReadLock();
try
{
_hashSet.IntersectWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}


public void ExceptWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
_lock.EnterReadLock();
try
{
_hashSet.ExceptWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}


public void SymmetricExceptWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
_hashSet.SymmetricExceptWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public bool IsSubsetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsSubsetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public bool IsSupersetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsSupersetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public bool IsProperSupersetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsProperSupersetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public bool IsProperSubsetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsProperSubsetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public bool Overlaps(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Overlaps(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public bool SetEquals(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.SetEquals(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


bool ISet<T>.Add(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Add(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public void Clear()
{
_lock.EnterWriteLock();
try
{
_hashSet.Clear();
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public bool Contains(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Contains(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public void CopyTo(T[] array, int arrayIndex)
{
_lock.EnterWriteLock();
try
{
_hashSet.CopyTo(array, arrayIndex);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public bool Remove(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Remove(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}


public int Count
{
get
{
_lock.EnterWriteLock();
try
{
return _hashSet.Count;
}
finally
{
if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}


}
}


public bool IsReadOnly
{
get { return false; }
}
}

使ISet<T>并发的棘手之处在于set方法(union, intersection, difference)本质上是迭代的。至少,您必须遍历操作中涉及的一个集合的所有n个成员,同时锁定两个集合。

当你不得不在迭代期间锁定整个集合时,你就失去了ConcurrentDictionary<T,byte>的优势。如果没有锁定,这些操作就不是线程安全的。

考虑到ConcurrentDictionary<T,byte>所增加的开销,使用较轻的权重HashSet<T>并将所有内容都锁起来可能更明智。

如果您不需要设置操作,则使用ConcurrentDictionary<T,byte>并在添加键时仅使用default(byte)作为值。

由于没有人提到它,我将提供另一种方法,可能适合也可能不适合你的特定目的:

微软不可变集合

来自后面MS团队的博客:

虽然并发创建和运行比以往任何时候都容易,但仍然存在一个基本问题:可变共享状态。从多个线程读取通常非常容易,但一旦需要更新状态,就会变得非常困难,特别是在需要锁定的设计中。

锁定的另一种选择是使用不可变状态。不可变数据结构保证永远不会改变,因此可以在不同的线程之间自由传递,而不用担心踩到其他人的脚趾。

但是这种设计产生了一个新问题:如何在不每次复制整个状态的情况下管理状态更改?当涉及到收集时,这尤其棘手。

这就是不可变集合的用武之地。

这些集合包括ImmutableHashSet< T>ImmutableList< T>

性能

由于不可变集合在底层使用树数据结构来支持结构共享,因此它们的性能特征不同于可变集合。当与锁定可变集合进行比较时,结果将取决于锁争用和访问模式。然而,从另一篇博文中获取的关于不可变集合:

问:我听说不可变集合很慢。这些有什么不同吗?当性能或内存很重要时,我可以使用它们吗?

答:这些不可变集合经过了高度调优,在平衡内存共享的同时具有与可变集合竞争的性能特征。在某些情况下,它们在算法上和实际时间上几乎和可变集合一样快,有时甚至更快,而在其他情况下,它们在算法上更复杂。然而,在许多情况下,差异是可以忽略不计的。通常,您应该使用最简单的代码来完成工作,然后根据需要对性能进行调优。不可变集合帮助您编写简单的代码,特别是在必须考虑线程安全的情况下。

换句话说,在许多情况下,差异不会被注意到,你应该选择更简单的选择——对于并发集,将使用ImmutableHashSet<T>,因为你没有现有的锁定可变实现!: -)

我基于ConcurrentDictionary创建了一个实际的ConcurrentHashSet,而不是包装ConcurrentDictionary或锁定HashSet

这个实现支持每个项目的基本操作,而不需要HashSet的set操作,因为它们在并发场景中没有意义:

var concurrentHashSet = new ConcurrentHashSet<string>(
new[]
{
"hamster",
"HAMster",
"bar",
},
StringComparer.OrdinalIgnoreCase);


concurrentHashSet.TryRemove("foo");


if (concurrentHashSet.Contains("BAR"))
{
Console.WriteLine(concurrentHashSet.Count);
}

输出:2

你可以从NuGet 在这里得到它,并在GitHub 在这里上看到源代码。

我发现既不能简单地锁定System.Collections.Generic的添加和删除方法。HashSet,或包装框架的ConcurrentDictionary在“高吞吐量”方面是足够的;需要良好性能的场景。

通过使用这个简单的想法就可以获得相当好的性能:

public class ExampleHashSet<T>
{
const int ConcurrencyLevel = 124;
const int Lower31BitMask = 0x7FFFFFFF;
    

HashSet<T>[] sets = new HashSet<T>[ConcurrencyLevel];
IEqualityComparer<T> comparer;


public ExampleHashSet()
{
comparer = EqualityComparer<T>.Default;


for(int i = 0; i < ConcurrencyLevel; i++)
sets[i] = new HashSet<T>();
}


public bool Add(T item)
{
int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        

lock(sets[hash])
{
return sets[hash].Add(item);
}
}


public bool Remove(T item)
{
int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        

lock(sets[hash])
{
return sets[hash].Remove(item);
}
}


// further methods ...
}
系统的HashSet是被包装的,但与其他答案相比,我们持有多个答案的锁 hashset。不同的线程能够“工作”;在不同的hashset上,降低整体等待时间 这个想法可以被概括并直接在HashSet本身中实现(在桶上持有锁,而不是锁定 成套)。一个例子可以找到在这里.

基于ConcurrentDictionary<TKey, TValue>的解决方案通常具有良好的可扩展性;然而,如果你需要访问CountKeysValues属性,或者你迭代项目,它会变得比一个单一的锁定集合更糟糕。这是因为ConcurrentDictionary使用了一组锁(默认情况下,它们的数量取决于CPU内核的数量),访问这些成员会导致所有的锁都被获取,所以CPU的内核越多,它们的性能就越差。

另一个答案建议使用不可变集合。虽然它们是线程安全的,但只有在很少向它们添加新项的情况下(总是创建一个新实例,尽管尝试从前一个实例继承尽可能多的节点),它们才会表现良好,但即使在这种情况下,它们的性能通常也较差。

我最终使用了另一个解决方案(后来我也应用到我的ThreadSafeHashSet<T>上):与ConcurrentDictionary相反,我只使用一个锁,但只是暂时的:不时地,新项目被移动到完全无锁的存储中,即使删除和重新添加相同的密钥也会变得无锁,因此非常快。执行这些合并不使用计时器。只有当需要访问锁定存储且创建时间“足够长”时,才会发生对无锁存储的合并。time ago,即可配置的

📝注意:查看ThreadSafeDictionary<TKey TValue>类的讲话部分的性能对照表(它具有与ThreadSafeHashSet<T>相同的特征),以查看它是否适合您的需求。在这里你可以找到源代码的性能测试,如果你想自己执行他们。

源代码是可用的在这里,你也可以下载它作为NuGet包