在.NET 中创建阻塞队列 < T > ?

我有一个场景,我有多个线程添加到一个队列和多个线程从同一个队列读取。如果队列达到一个特定的大小 所有线索,填充队列将被阻塞,直到添加一个项目从队列中删除。

下面的解决方案就是我现在正在使用的,我的问题是: 如何才能改进它?在 BCL 中是否有一个对象已经启用了我应该使用的这种行为?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
//todo: might be worth changing this into a proper QUEUE


private AutoResetEvent _FullEvent = new AutoResetEvent(false);


internal T this[int i]
{
get { return (T) List[i]; }
}


private int _MaxSize;
internal int MaxSize
{
get { return _MaxSize; }
set
{
_MaxSize = value;
checkSize();
}
}


internal BlockingCollection(int maxSize)
{
MaxSize = maxSize;
}


internal void Add(T item)
{
Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));


_FullEvent.WaitOne();


List.Add(item);


Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));


checkSize();
}


internal void Remove(T item)
{
lock (List)
{
List.Remove(item);
}


Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
}


protected override void OnRemoveComplete(int index, object value)
{
checkSize();
base.OnRemoveComplete(index, value);
}


internal new IEnumerator GetEnumerator()
{
return List.GetEnumerator();
}


private void checkSize()
{
if (Count < MaxSize)
{
Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.Set();
}
else
{
Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.Reset();
}
}
}
115227 次浏览

如果您希望获得最大的吞吐量,允许多个读取器读取并且只有一个写入器写入,BCL 有一个叫做 ReaderWriterLockSlim 的东西,它应该有助于减少您的代码..。

这看起来非常不安全(同步非常少) ; 这样怎么样:

class SizeQueue<T>
{
private readonly Queue<T> queue = new Queue<T>();
private readonly int maxSize;
public SizeQueue(int maxSize) { this.maxSize = maxSize; }


public void Enqueue(T item)
{
lock (queue)
{
while (queue.Count >= maxSize)
{
Monitor.Wait(queue);
}
queue.Enqueue(item);
if (queue.Count == 1)
{
// wake up any blocked dequeue
Monitor.PulseAll(queue);
}
}
}
public T Dequeue()
{
lock (queue)
{
while (queue.Count == 0)
{
Monitor.Wait(queue);
}
T item = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(queue);
}
return item;
}
}
}

(编辑)

实际上,您需要一种方法来关闭队列,以便读取器开始干净地退出(可能类似于 bool 标志) ,如果设置了这种方法,一个空队列将返回(而不是阻塞) :

bool closing;
public void Close()
{
lock(queue)
{
closing = true;
Monitor.PulseAll(queue);
}
}
public bool TryDequeue(out T value)
{
lock (queue)
{
while (queue.Count == 0)
{
if (closing)
{
value = default(T);
return false;
}
Monitor.Wait(queue);
}
value = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(queue);
}
return true;
}
}

你可以看看 System.Threading.Semaphore课程。除此之外,不,你得自己做。AFAIK 没有这样的内置收藏。

我还没有完全探索的 TPL,但他们可能有一些适合你的需要,或至少,一些反射器素材,从中获取一些灵感。

希望能帮上忙。

“如何才能改善这种状况?”

您需要查看类中的每个方法并考虑如果另一个线程同时调用该方法或任何其他方法会发生什么情况。例如,您将一个锁放置在“移除”方法中,但不放置在“添加”方法中。如果一个线程同时添加另一个线程,会发生什么情况?坏事。

还要考虑一个方法可以返回第二个对象,该对象提供对第一个对象的内部数据的访问——例如,GetEnumerator。假设一个线程正在通过该枚举器,另一个线程正在同时修改列表。情况不妙。

一个很好的经验法则是通过将类中的方法数量减少到绝对最少来简化这个过程。

特别是,不要继承另一个容器类,因为您将公开该类的所有方法,为调用方提供一种方法来破坏内部数据,或者查看数据的部分完整更改(同样糟糕,因为数据在那个时候似乎已经破坏)。隐藏所有的细节,并完全无情地让你如何允许访问它们。

我强烈建议你使用现成的解决方案-得到一本关于线程或使用第三方图书馆的书。否则,考虑到您正在尝试的东西,您将在很长一段时间内调试代码。

而且,相对于调用者选择一个特定的条目而言,“移除”返回一个条目(比如说,首先添加的条目,因为它是一个队列)不是更有意义吗?当队列为空时,或许“移除”也应该阻塞。

更新: 马克的回答实际上实现了所有这些建议!:)但我把这个留在这里,因为它可能有助于理解为什么他的版本是这样一个进步。

这就是我来操作一个线程安全的有界阻塞队列的原因。

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;


public class BlockingBuffer<T>
{
private Object t_lock;
private Semaphore sema_NotEmpty;
private Semaphore sema_NotFull;
private T[] buf;


private int getFromIndex;
private int putToIndex;
private int size;
private int numItems;


public BlockingBuffer(int Capacity)
{
if (Capacity <= 0)
throw new ArgumentOutOfRangeException("Capacity must be larger than 0");


t_lock = new Object();
buf = new T[Capacity];
sema_NotEmpty = new Semaphore(0, Capacity);
sema_NotFull = new Semaphore(Capacity, Capacity);
getFromIndex = 0;
putToIndex = 0;
size = Capacity;
numItems = 0;
}


public void put(T item)
{
sema_NotFull.WaitOne();
lock (t_lock)
{
while (numItems == size)
{
Monitor.Pulse(t_lock);
Monitor.Wait(t_lock);
}


buf[putToIndex++] = item;


if (putToIndex == size)
putToIndex = 0;


numItems++;


Monitor.Pulse(t_lock);


}
sema_NotEmpty.Release();




}


public T take()
{
T item;


sema_NotEmpty.WaitOne();
lock (t_lock)
{


while (numItems == 0)
{
Monitor.Pulse(t_lock);
Monitor.Wait(t_lock);
}


item = buf[getFromIndex++];


if (getFromIndex == size)
getFromIndex = 0;


numItems--;


Monitor.Pulse(t_lock);


}
sema_NotFull.Release();


return item;
}
}

我只是用反应扩展把它拼凑起来,然后想起了这个问题:

public class BlockingQueue<T>
{
private readonly Subject<T> _queue;
private readonly IEnumerator<T> _enumerator;
private readonly object _sync = new object();


public BlockingQueue()
{
_queue = new Subject<T>();
_enumerator = _queue.GetEnumerator();
}


public void Enqueue(T item)
{
lock (_sync)
{
_queue.OnNext(item);
}
}


public T Dequeue()
{
_enumerator.MoveNext();
return _enumerator.Current;
}
}

不一定完全安全,但很简单。

使用。Net 4 BlockingCollection,排队使用 Add () ,排队使用 Take ()。它在内部使用非阻塞的 ConcurrentQueue。更多信息请看 快速和最佳生产者/消费者队列技术 BlockingCollection 与并发队列

可以在 System.Collections.Concurrent 命名空间中使用 BlockingCollection并发队列

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
/// <summary>
/// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
/// </summary>
public ProducerConsumerQueue()
: base(new ConcurrentQueue<T>())
{
}


/// <summary>
/// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
/// </summary>
/// <param name="maxSize"></param>
public ProducerConsumerQueue(int maxSize)
: base(new ConcurrentQueue<T>(), maxSize)
{
}






}

从.NET 5.0/Core 3.0开始,您可以使用 系统,线程,通道
来自 This (. NET (C #)中的异步生产者消费者模式)文章的基准显示了对 BlockingCollection 的显著速度提升!