How to limit the amount of concurrent async I/O operations?

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };


// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

Here is the problem: it starts 1000+ simultaneous web requests. Is there an easy way to limit the number of concurrent async HTTP requests, so that no more than 20 web pages are downloaded at any given time? How can this be done in the most efficient manner?

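Although 1000 tasks may be queued very quickly, the Parallel Tasks library can only handle as many concurrent tasks as there are CPU cores in the machine. That means that on a four-core machine only 4 tasks will be executing at any given time (unless you lower MaxDegreeOfParallelism).

Parallel computing should be used to speed up CPU-bound operations. Here we are talking about I/O-bound operations. Your implementation should be purely async, unless you are overwhelming a busy single core on your multi-core CPU.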

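I like the suggestion made by usr to use an "async semaphore" here.

Unfortunately, the .NET Framework is missing the most important combinators for orchestrating parallel async tasks.

Look at the AsyncSemaphore class built by the most respectable Stephen Toub. What you want is called a semaphore, and you need an async version of it.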

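You can definitely do this in the latest versions of async for .NET, using .NET 4.5 Beta. The previous post from "usr" points to a good article written by Stephen Toub, but the less announced news is that the async semaphore actually made it into the Beta release of .NET 4.5.

If you look at our beloved SemaphoreSlim class (which you should be using, since it performs better than the original Semaphore), it now boasts the WaitAsync(...) series of overloads, with all of the expected arguments: timeout intervals, cancellation tokens, all of your usual scheduling friends :)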
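As a small illustration of those overloads, here is a sketch of the timeout-plus-token variant. The names WaitAsyncDemo and TryEnterAsync are made up for this example; the only library call used is SemaphoreSlim.WaitAsync(TimeSpan, CancellationToken), which completes with false when the timeout elapses before a slot frees up.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitAsyncDemo
{
    // Hypothetical helper: tries to take a slot within the timeout.
    // Returns true if the semaphore was entered, false if the timeout elapsed first.
    public static Task<bool> TryEnterAsync(
        SemaphoreSlim gate, TimeSpan timeout, CancellationToken token)
    {
        return gate.WaitAsync(timeout, token);
    }

    public static async Task Main()
    {
        using (var gate = new SemaphoreSlim(1, 1))
        using (var cts = new CancellationTokenSource())
        {
            // The single slot is free, so the first attempt succeeds.
            bool first = await TryEnterAsync(gate, TimeSpan.FromMilliseconds(100), cts.Token);

            // The slot is still held, so the second attempt times out.
            bool second = await TryEnterAsync(gate, TimeSpan.FromMilliseconds(100), cts.Token);

            Console.WriteLine($"first={first}, second={second}"); // first=True, second=False

            if (first) gate.Release();
        }
    }
}
```

Checking the returned bool matters: after a timeout the caller does not hold a slot and should not call Release().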

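Stephen has also written a more recent blog post about the new .NET 4.5 goodies that came out with the beta; see What's New for Parallelism in .NET 4.5 Beta.

Last, here is some sample code showing how to use SemaphoreSlim for async method throttling: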

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    string[] urls = { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    // always free the slot, even if the request failed
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

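Last, but probably worth a mention, is a solution that uses TPL-based scheduling. You can create delegate-bound tasks on the TPL that have not yet been started, and let a custom task scheduler limit the concurrency. In fact, there is an MSDN sample for it:

See also TaskScheduler.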
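For a rough idea of what scheduler-based throttling looks like, here is a sketch that uses the built-in ConcurrentExclusiveSchedulerPair rather than the custom scheduler from the MSDN sample (so it is a stand-in, not that sample). The class and method names are invented for the example, and Thread.Sleep stands in for real work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerThrottleDemo
{
    private sealed class Counters { public int Running; public int Peak; }

    // Runs itemCount delegate-bound tasks on a scheduler capped at maxConcurrency
    // and returns the highest number of delegates observed running at once.
    public static int RunAndMeasurePeak(int itemCount, int maxConcurrency)
    {
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency);
        var factory = new TaskFactory(pair.ConcurrentScheduler);
        var counters = new Counters();

        Task[] tasks = Enumerable.Range(0, itemCount)
            .Select(_ => factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref counters.Running);

                // Record a new peak if this is the most we have seen so far.
                int seen;
                while (now > (seen = Volatile.Read(ref counters.Peak)))
                    Interlocked.CompareExchange(ref counters.Peak, now, seen);

                Thread.Sleep(20); // placeholder for synchronous work
                Interlocked.Decrement(ref counters.Running);
            }))
            .ToArray();

        Task.WaitAll(tasks);
        return counters.Peak;
    }

    public static void Main()
    {
        Console.WriteLine(RunAndMeasurePeak(100, 4) <= 4);
    }
}
```

Note that a scheduler only governs the synchronous part of each delegate. With an async lambda the slot is given back at the first await, so this approach suits delegate-bound work rather than awaiting HttpClient calls.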

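Essentially you are going to want to create an Action or Task for each URL that you want to hit, put them in a List, and then process that list, limiting the number that can be processed in parallel.

My blog post shows how to do this both with Tasks and with Actions, and provides a sample project you can download and run to see both in action.

With Actions

If using Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 20 threads in parallel.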

var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Action here, but do not run it.
    listOfActions.Add(() => CallUrl(localUrl));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());

With Tasks

With Tasks there is no built-in function. However, you can use the one that I provide on my blog.

/// <summary>
/// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
{
    await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
}

/// <summary>
/// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
/// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
{
    // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
    var tasks = tasksToRun.ToList();

    using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
    {
        var postTaskTasks = new List<Task>();

        // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
        tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

        // Start running each task.
        foreach (var task in tasks)
        {
            // Increment the number of tasks currently running and wait if too many are running.
            await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

            cancellationToken.ThrowIfCancellationRequested();
            task.Start();
        }

        // Wait for all of the provided tasks to complete.
        // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
        await Task.WhenAll(postTaskTasks.ToArray());
    }
}

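Then, to create your list of Tasks and call the function to have them run, with say a maximum of 20 running simultaneously, you could do this: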

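var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    // The Task constructor only accepts an Action, so an async lambda here would
    // compile to async void and the Task would complete at the first await.
    // Blocking on CallUrl (assumed to return a Task) keeps the Task running
    // until the call has actually finished.
    listOfTasks.Add(new Task(() => CallUrl(localUrl).GetAwaiter().GetResult()));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);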

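If you have an IEnumerable (e.g. strings of URLs) and you want to perform an I/O-bound operation on each item (e.g. make an async HTTP request) concurrently, and optionally you also want to cap the number of I/O requests in flight at any moment, here is how you can do that. This way you do not use the thread pool and so on; the method uses SemaphoreSlim to control the maximum number of concurrent I/O requests, similar to a sliding window pattern: one request completes and leaves the semaphore, and the next one gets in.

Usage: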

await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);

public static Task ForEachAsync<TIn>(
    IEnumerable<TIn> inputEnumerable,
    Func<TIn, Task> asyncProcessor,
    int? maxDegreeOfParallelism = null)
{
    // DefaultMaxDegreeOfParallelism is assumed to be a constant defined elsewhere in the containing class.
    int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
    SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

    IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
    {
        // Wait for a free slot before starting the operation.
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            await asyncProcessor(input).ConfigureAwait(false);
        }
        finally
        {
            // Free the slot so the next queued operation can start.
            throttler.Release();
        }
    });

    return Task.WhenAll(tasks);
}

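There are a lot of pitfalls here, and direct use of a semaphore can be tricky in error cases, so I would suggest using the AsyncEnumerator NuGet package instead of reinventing the wheel: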

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };


// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);

SemaphoreSlim can be very helpful here.

/// <summary>
/// Concurrently executes async actions for each item of <see cref="IEnumerable{T}"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable{T}"/></param>
/// <param name="action">an async function to execute for each item</param>
/// <param name="maxActionsToRunInParallel">Optional, max number of the actions to run in parallel,
/// must be greater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
    this IEnumerable<T> enumerable,
    Func<T, Task> action,
    int? maxActionsToRunInParallel = null)
{
    if (maxActionsToRunInParallel.HasValue)
    {
        using (var semaphoreSlim = new SemaphoreSlim(
            maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
        {
            var tasksWithThrottler = new List<Task>();

            foreach (var item in enumerable)
            {
                // Increment the number of currently running tasks and wait if they are more than limit.
                await semaphoreSlim.WaitAsync();

                tasksWithThrottler.Add(Task.Run(async () =>
                {
                    try
                    {
                        await action(item);
                    }
                    finally
                    {
                        // action is completed (or failed), so decrement the number of currently running tasks;
                        // try/finally lets an exception from action reach Task.WhenAll instead of being swallowed
                        semaphoreSlim.Release();
                    }
                }));
            }

            // Wait for all of the provided tasks to complete.
            await Task.WhenAll(tasksWithThrottler.ToArray());
        }
    }
    else
    {
        await Task.WhenAll(enumerable.Select(item => action(item)));
    }
}

Sample usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

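This is not good practice, as it changes a global variable. It is also not a general solution for async. But it is an easy fix for all instances of HttpClient, if that is all you are after. You can simply try: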

System.Net.ServicePointManager.DefaultConnectionLimit = 20;

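Since the release of .NET 6 (in November 2021), the recommended way of limiting the number of concurrent asynchronous I/O operations is the Parallel.ForEachAsync API, configured with MaxDegreeOfParallelism. Here is how it can be used in practice: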

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", /*...*/ };
var client = new HttpClient();
var options = new ParallelOptions() { MaxDegreeOfParallelism = 20 };


// now let's send HTTP requests to each of these URLs in parallel
await Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
{
    var html = await client.GetStringAsync(url, cancellationToken);
});

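In the above example the Parallel.ForEachAsync task is awaited asynchronously. You can also Wait for it synchronously if you need to, which will block the current thread until all asynchronous operations have completed. The synchronous Wait has the advantage that, in case of errors, all exceptions will be propagated. In contrast, the await operator by design propagates only the first exception. In case this is a problem, you can find solutions here.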
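To make the difference between the two propagation styles concrete, here is a small sketch. It uses Task.WhenAll over two failing tasks as a stand-in for Parallel.ForEachAsync so the outcome is deterministic; ExceptionPropagationDemo, FailAsync and ObserveAsync are names made up for the example.

```csharp
using System;
using System.Threading.Tasks;

public static class ExceptionPropagationDemo
{
    // Hypothetical operation that always fails asynchronously.
    private static async Task FailAsync(string message)
    {
        await Task.Yield();
        throw new InvalidOperationException(message);
    }

    // Returns the message of the single exception surfaced by await,
    // plus the number of exceptions actually stored on the task.
    public static async Task<(string Awaited, int Total)> ObserveAsync()
    {
        Task all = Task.WhenAll(FailAsync("first"), FailAsync("second"));
        string awaited = null;
        try
        {
            await all; // rethrows just one of the stored exceptions
        }
        catch (InvalidOperationException ex)
        {
            awaited = ex.Message;
        }

        // The task itself still carries every failure in its AggregateException.
        return (awaited, all.Exception.InnerExceptions.Count);
    }

    public static async Task Main()
    {
        var result = await ObserveAsync();
        Console.WriteLine($"await saw one exception ({result.Awaited}); task holds {result.Total}");
    }
}
```

One way to get all errors while still awaiting is to keep a reference to the task, as above, and inspect its Exception property inside the catch block.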

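(Note: an idiomatic implementation of a ForEachAsync extension method that also propagates the results can be found in the 4th revision of this answer.)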
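For readers who only need the general shape of a result-propagating variant, here is a minimal sketch. It is not the code from that revision: the name ThrottledSelect.ForEachAsync, its parameters, and the SemaphoreSlim-based throttling are assumptions chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledSelect
{
    // Runs selector for every item with at most maxConcurrency operations in flight
    // and returns the results in the same order as the input.
    public static async Task<TResult[]> ForEachAsync<TSource, TResult>(
        IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> selector,
        int maxConcurrency)
    {
        using (var throttler = new SemaphoreSlim(maxConcurrency, maxConcurrency))
        {
            List<Task<TResult>> tasks = source.Select(async item =>
            {
                // Wait for a free slot before starting the operation.
                await throttler.WaitAsync().ConfigureAwait(false);
                try
                {
                    return await selector(item).ConfigureAwait(false);
                }
                finally
                {
                    throttler.Release();
                }
            }).ToList(); // materialize so every task is created exactly once

            // WhenAll completes only after every task has finished,
            // so the semaphore is not disposed while still in use.
            return await Task.WhenAll(tasks).ConfigureAwait(false);
        }
    }

    public static async Task Main()
    {
        int[] squares = await ForEachAsync(
            Enumerable.Range(1, 5),
            async n => { await Task.Delay(10); return n * n; },
            maxConcurrency: 2);

        Console.WriteLine(string.Join(",", squares)); // 1,4,9,16,25
    }
}
```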

Here is a handy extension method you can create to wrap a list of tasks so that they are executed with a maximum degree of concurrency:

/// <summary>Allows to do any async operation in bulk while limiting the system to a number of concurrent items being processed.</summary>
private static IEnumerable<Task<T>> WithMaxConcurrency<T>(this IEnumerable<Task<T>> tasks, int maxParallelism)
{
    SemaphoreSlim maxOperations = new SemaphoreSlim(maxParallelism);
    // The original tasks get wrapped in a new task that must first await a semaphore before the original task is called.
    // Caveat: the semaphore is released as soon as the wrapper hands back the task, not when the task completes,
    // and a Task<T> that is already running when it reaches this method is not delayed by the wrapper.
    return tasks.Select(task => maxOperations.WaitAsync().ContinueWith(_ =>
    {
        try { return task; }
        finally { maxOperations.Release(); }
    }).Unwrap());
}

Now instead of:

await Task.WhenAll(someTasks);

You can go:

await Task.WhenAll(someTasks.WithMaxConcurrency(20));