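Is there anything like an asynchronous BlockingCollection<T>?

I would like to await the result of BlockingCollection<T>.Take() asynchronously, so that I do not block the thread. I am looking for something like this:

var item = await blockingCollection.TakeAsync();

I know I could do this:

var item = await Task.Run(() => blockingCollection.Take());

But that rather defeats the purpose, because another thread (from the ThreadPool) gets blocked instead.

Is there any alternative?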

There are four alternatives that I know of.

The first is Channels (the System.Threading.Channels namespace), which provides a thread-safe queue that supports asynchronous Read and Write operations. Channels are highly optimized and optionally support dropping some items if a threshold is reached.
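As a rough illustration of the Channels option, here is a minimal sketch of one producer and one consumer. The class name ChannelSketch and the method RunAsync are made up for this example; the channel calls themselves come from System.Threading.Channels.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class; only the Channel<T> calls are library API.
public static class ChannelSketch
{
    // Produces 1..count on one task and consumes them on the calling task,
    // returning the items in the order the consumer saw them.
    public static async Task<List<int>> RunAsync(int count)
    {
        // Unbounded: writes never wait. Channel.CreateBounded<int>(capacity)
        // would make WriteAsync wait asynchronously while the channel is full.
        Channel<int> channel = Channel.CreateUnbounded<int>();

        Task producer = Task.Run(async () =>
        {
            for (int i = 1; i <= count; i++)
            {
                await channel.Writer.WriteAsync(i);
            }
            // Signals that no more items will arrive (similar to CompleteAdding).
            channel.Writer.Complete();
        });

        var received = new List<int>();
        // WaitToReadAsync yields false once the channel is completed and drained.
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out int item))
            {
                received.Add(item);
            }
        }

        await producer;
        return received;
    }
}
```

For a single item, await channel.Reader.ReadAsync() is the closest match to the TakeAsync the question asks for.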

The next is BufferBlock<T> from TPL Dataflow. If you only have a single consumer, you can use OutputAvailableAsync or ReceiveAsync, or just link it to an ActionBlock<T>. For more information, see my blog.
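A minimal sketch of the single-consumer pattern with BufferBlock<T> could look like the following. BufferBlockSketch and RunAsync are hypothetical names chosen for the example; it assumes the System.Threading.Tasks.Dataflow package or assembly is referenced.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; only the BufferBlock<T> calls are library API.
public static class BufferBlockSketch
{
    public static async Task<List<int>> RunAsync(int count)
    {
        var buffer = new BufferBlock<int>();

        Task producer = Task.Run(async () =>
        {
            for (int i = 1; i <= count; i++)
            {
                await buffer.SendAsync(i);
            }
            // No more items will be posted.
            buffer.Complete();
        });

        var received = new List<int>();
        // OutputAvailableAsync yields false once the block is completed and empty.
        while (await buffer.OutputAvailableAsync())
        {
            // With a single consumer nobody else can take the item in between,
            // so ReceiveAsync completes with the item that was just announced.
            received.Add(await buffer.ReceiveAsync());
        }

        await producer;
        return received;
    }
}
```

With several consumers, the gap between OutputAvailableAsync and ReceiveAsync becomes a race, which is why the text restricts this pattern to a single consumer.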

The last two are types that I created, available in my AsyncEx library.

AsyncCollection<T> is the async near-equivalent of BlockingCollection<T>, capable of wrapping a concurrent producer/consumer collection such as ConcurrentQueue<T> or ConcurrentBag<T>. You can use TakeAsync to asynchronously consume items from the collection. For more information, see my blog.

AsyncProducerConsumerQueue<T> is a more portable async-compatible producer/consumer queue. You can use DequeueAsync to asynchronously consume items from the queue. For more information, see my blog.

The last three of these alternatives allow synchronous and asynchronous puts and takes.
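For the AsyncEx types, a sketch along the following lines should work, assuming the Nito.AsyncEx NuGet package is referenced and that AsyncCollection<T> exposes a parameterless constructor together with AddAsync, TakeAsync and CompleteAdding (check the version you install). AsyncCollectionSketch and RunAsync are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Nito.AsyncEx;

// Hypothetical demo class; assumes the Nito.AsyncEx package is referenced.
public static class AsyncCollectionSketch
{
    public static async Task<List<int>> RunAsync(int count)
    {
        // Assumption: the parameterless constructor creates an unbounded FIFO collection.
        var collection = new AsyncCollection<int>();

        Task producer = Task.Run(async () =>
        {
            for (int i = 1; i <= count; i++)
            {
                await collection.AddAsync(i);
            }
            collection.CompleteAdding();
        });

        var received = new List<int>();
        // Take exactly as many items as were produced, so the sketch does not
        // depend on how TakeAsync reports completion.
        for (int i = 0; i < count; i++)
        {
            received.Add(await collection.TakeAsync());
        }

        await producer;
        return received;
    }
}
```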

...or you can do this:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentQueue<T> _que;

    public AsyncQueue()
    {
        _sem = new SemaphoreSlim(0);
        _que = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        _que.Enqueue(item);
        _sem.Release();
    }

    public void EnqueueRange(IEnumerable<T> source)
    {
        var n = 0;
        foreach (var item in source)
        {
            _que.Enqueue(item);
            n++;
        }
        _sem.Release(n);
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        for (; ; )
        {
            await _sem.WaitAsync(cancellationToken);

            T item;
            if (_que.TryDequeue(out item))
            {
                return item;
            }
        }
    }
}

Simple, fully functional asynchronous FIFO queue.

Note: SemaphoreSlim.WaitAsync was added in .NET 4.5; before that, this was not all that straightforward.
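To show how such a queue behaves from the caller's side, here is a small sketch. It carries a condensed copy of AsyncQueue<T> (EnqueueRange left out) so that it compiles by itself; AsyncQueueDemo and its two methods are hypothetical names added for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Condensed copy of the AsyncQueue<T> from this answer (EnqueueRange omitted).
public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _sem = new SemaphoreSlim(0);
    private readonly ConcurrentQueue<T> _que = new ConcurrentQueue<T>();

    public void Enqueue(T item)
    {
        _que.Enqueue(item);
        _sem.Release();
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
    {
        for (; ; )
        {
            await _sem.WaitAsync(cancellationToken);
            if (_que.TryDequeue(out T item))
            {
                return item;
            }
        }
    }
}

// Hypothetical demo class, not part of the original answer.
public static class AsyncQueueDemo
{
    // The dequeue starts first and completes only once an item is enqueued.
    public static async Task<string> DequeueThenEnqueueAsync()
    {
        var queue = new AsyncQueue<string>();
        Task<string> pending = queue.DequeueAsync(); // queue is empty; no thread is blocked
        queue.Enqueue("hello");
        return await pending;
    }

    // Returns true if DequeueAsync on an empty queue is cancelled by the timeout.
    public static async Task<bool> DequeueTimesOutAsync()
    {
        var queue = new AsyncQueue<string>();
        using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50)))
        {
            try
            {
                await queue.DequeueAsync(cts.Token);
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
        }
    }
}
```

The cancellation token is the only way out of a pending DequeueAsync here, since this queue has no notion of completion.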

Here is a very basic implementation of a BlockingCollection that supports awaiting, with lots of missing features. It uses the AsyncEnumerable library, which makes asynchronous enumeration possible for C# versions older than 8.0.

public class AsyncBlockingCollection<T>
{ // Missing features: cancellation, boundedCapacity, TakeAsync
    private Queue<T> _queue = new Queue<T>();
    private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
    private int _consumersCount = 0;
    private bool _isAddingCompleted;

    public void Add(T item)
    {
        lock (_queue)
        {
            if (_isAddingCompleted) throw new InvalidOperationException();
            _queue.Enqueue(item);
        }
        _semaphore.Release();
    }

    public void CompleteAdding()
    {
        lock (_queue)
        {
            if (_isAddingCompleted) return;
            _isAddingCompleted = true;
            if (_consumersCount > 0) _semaphore.Release(_consumersCount);
        }
    }

    public IAsyncEnumerable<T> GetConsumingEnumerable()
    {
        lock (_queue) _consumersCount++;
        return new AsyncEnumerable<T>(async yield =>
        {
            while (true)
            {
                lock (_queue)
                {
                    if (_queue.Count == 0 && _isAddingCompleted) break;
                }
                await _semaphore.WaitAsync();
                bool hasItem;
                T item = default;
                lock (_queue)
                {
                    hasItem = _queue.Count > 0;
                    if (hasItem) item = _queue.Dequeue();
                }
                if (hasItem) await yield.ReturnAsync(item);
            }
        });
    }
}

Usage example:

var abc = new AsyncBlockingCollection<int>();
var producer = Task.Run(async () =>
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(100);
        abc.Add(i);
    }
    abc.CompleteAdding();
});
var consumer = Task.Run(async () =>
{
    await abc.GetConsumingEnumerable().ForEachAsync(async item =>
    {
        await Task.Delay(200);
        await Console.Out.WriteAsync(item + " ");
    });
});
await Task.WhenAll(producer, consumer);

Output:

1 2 3 4 5 6 7 8 9 10


Update: With the release of C# 8, asynchronous enumeration has become a built-in language feature. The required types (IAsyncEnumerable, IAsyncEnumerator) are included in .NET Core 3.0, and are offered as a package for .NET Framework 4.6.1+ (Microsoft.Bcl.AsyncInterfaces).

Here is an alternative GetConsumingEnumerable implementation, featuring the new C# 8 syntax:

public async IAsyncEnumerable<T> GetConsumingEnumerable()
{
    lock (_queue) _consumersCount++;
    while (true)
    {
        lock (_queue)
        {
            if (_queue.Count == 0 && _isAddingCompleted) break;
        }
        await _semaphore.WaitAsync();
        bool hasItem;
        T item = default;
        lock (_queue)
        {
            hasItem = _queue.Count > 0;
            if (hasItem) item = _queue.Dequeue();
        }
        if (hasItem) yield return item;
    }
}

Note the coexistence of await and yield in the same method.

Usage example (C# 8):

var consumer = Task.Run(async () =>
{
    await foreach (var item in abc.GetConsumingEnumerable())
    {
        await Task.Delay(200);
        await Console.Out.WriteAsync(item + " ");
    }
});

Note the await before the foreach.
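Cancellation is listed among the missing features. One possible way to add it to the C# 8 version, sketched here under the assumption that passing the token to SemaphoreSlim.WaitAsync is enough, is to mark the token parameter with [EnumeratorCancellation]. The class names CancellableAsyncBlockingCollection<T> and CancellationDemo are hypothetical, and this is a variant for illustration rather than the author's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of AsyncBlockingCollection<T> with a cancellable enumerator.
public class CancellableAsyncBlockingCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
    private int _consumersCount;
    private bool _isAddingCompleted;

    public void Add(T item)
    {
        lock (_queue)
        {
            if (_isAddingCompleted) throw new InvalidOperationException();
            _queue.Enqueue(item);
        }
        _semaphore.Release();
    }

    public void CompleteAdding()
    {
        lock (_queue)
        {
            if (_isAddingCompleted) return;
            _isAddingCompleted = true;
            if (_consumersCount > 0) _semaphore.Release(_consumersCount);
        }
    }

    public async IAsyncEnumerable<T> GetConsumingEnumerable(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        lock (_queue) _consumersCount++;
        while (true)
        {
            lock (_queue)
            {
                if (_queue.Count == 0 && _isAddingCompleted) break;
            }
            // Throws OperationCanceledException if the token is cancelled while waiting.
            await _semaphore.WaitAsync(cancellationToken);
            bool hasItem;
            T item = default;
            lock (_queue)
            {
                hasItem = _queue.Count > 0;
                if (hasItem) item = _queue.Dequeue();
            }
            if (hasItem) yield return item;
        }
    }
}

// Hypothetical demo class.
public static class CancellationDemo
{
    // Returns true if enumerating an empty, never-completed collection is cancelled.
    public static async Task<bool> EnumerationIsCancelledAsync()
    {
        var collection = new CancellableAsyncBlockingCollection<int>();
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50));
        try
        {
            await foreach (var item in collection.GetConsumingEnumerable(cts.Token))
            {
            }
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }
}
```

Because of the [EnumeratorCancellation] attribute, callers can also supply the token with GetConsumingEnumerable().WithCancellation(token) instead of passing it as an argument.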

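If you don't mind a little hack, you can try these extensions. They poll the collection every 100 ms instead of blocking a thread, so an operation can lag by up to 100 ms.

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Extension methods must live in a static class; this class name is arbitrary.
public static class BlockingCollectionExtensions
{
    public static async Task AddAsync<TEntity>(
        this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt)
    {
        while (true)
        {
            // A zero timeout makes TryAdd return immediately instead of blocking.
            if (Bc.TryAdd(item, 0, abortCt))
                return;
            else
                await Task.Delay(100, abortCt);
        }
    }

    public static async Task<TEntity> TakeAsync<TEntity>(
        this BlockingCollection<TEntity> Bc, CancellationToken abortCt)
    {
        while (true)
        {
            TEntity item;

            // A zero timeout makes TryTake return immediately instead of blocking.
            if (Bc.TryTake(out item, 0, abortCt))
                return item;
            else
                await Task.Delay(100, abortCt);
        }
    }
}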