对多个任务使用async/await

我使用的API客户端是完全异步的,也就是说,每个操作返回TaskTask<T>,例如:

static async Task DoSomething(int siteId, int postId, IBlogClient client)
{
await client.DeletePost(siteId, postId); // call API client
Console.WriteLine("Deleted post {0}.", siteId);
}

使用c# 5 async/await操作符,启动多个任务并等待它们全部完成的正确/最有效的方法是什么:

int[] ids = new[] { 1, 2, 3, 4, 5 };
Parallel.ForEach(ids, i => DoSomething(1, i, blogClient).Wait());

或者:

int[] ids = new[] { 1, 2, 3, 4, 5 };
Task.WaitAll(ids.Select(i => DoSomething(1, i, blogClient)).ToArray());

由于API客户端在内部使用HttpClient,我希望它立即发出5个HTTP请求,每一个请求完成时都写入控制台。

371602 次浏览
int[] ids = new[] { 1, 2, 3, 4, 5 };
Parallel.ForEach(ids, i => DoSomething(1, i, blogClient).Wait());

尽管您将这些操作与上面的代码并行运行,但此代码将阻塞每个操作所运行的每个线程。例如,如果网络调用需要2秒,每个线程挂起2秒,不做任何事情,只是等待。

int[] ids = new[] { 1, 2, 3, 4, 5 };
Task.WaitAll(ids.Select(i => DoSomething(1, i, blogClient)).ToArray());

另一方面,上面带有WaitAll的代码也会阻塞线程,在操作结束之前,线程将无法自由处理任何其他工作。

推荐的方法

我更喜欢WhenAll,它将在并行中异步执行您的操作。

public async Task DoWork() {


int[] ids = new[] { 1, 2, 3, 4, 5 };
await Task.WhenAll(ids.Select(i => DoSomething(1, i, blogClient)));
}

事实上,在上面的例子中,你甚至不需要await,你可以直接从方法返回,因为你没有任何延续:

public Task DoWork()
{
int[] ids = new[] { 1, 2, 3, 4, 5 };
return Task.WhenAll(ids.Select(i => DoSomething(1, i, blogClient)));
}
为了支持这一点,这里有一篇详细的博客文章,讲述了所有的 如何与ASP并发异步I/O。NET Web API

因为你调用的API是异步的,所以Parallel.ForEach版本没有太大意义。你不应该在WaitAll版本中使用.Wait,因为那会失去并行性。如果调用者是异步的,另一个替代方法是在执行SelectToArray之后使用Task.WhenAll来生成任务数组。第二种选择是使用Rx 2.0

我很想看看问题中提供的方法和公认答案的结果,所以我对它进行了测试。

代码如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;


namespace AsyncTest
{
class Program
{
class Worker
{
public int Id;
public int SleepTimeout;


public async Task DoWork(DateTime testStart)
{
var workerStart = DateTime.Now;
Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
Id, Thread.CurrentThread.ManagedThreadId, (workerStart-testStart).TotalSeconds.ToString("F2"));
await Task.Run(() => Thread.Sleep(SleepTimeout));
var workerEnd = DateTime.Now;
Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
Id, (workerEnd-workerStart).TotalSeconds.ToString("F2"), (workerEnd-testStart).TotalSeconds.ToString("F2"));
}
}


static void Main(string[] args)
{
var workers = new List<Worker>
{
new Worker { Id = 1, SleepTimeout = 1000 },
new Worker { Id = 2, SleepTimeout = 2000 },
new Worker { Id = 3, SleepTimeout = 3000 },
new Worker { Id = 4, SleepTimeout = 4000 },
new Worker { Id = 5, SleepTimeout = 5000 },
};


var startTime = DateTime.Now;
Console.WriteLine("Starting test: Parallel.ForEach...");
PerformTest_ParallelForEach(workers, startTime);
var endTime = DateTime.Now;
Console.WriteLine("Test finished after {0} seconds.\n",
(endTime - startTime).TotalSeconds.ToString("F2"));


startTime = DateTime.Now;
Console.WriteLine("Starting test: Task.WaitAll...");
PerformTest_TaskWaitAll(workers, startTime);
endTime = DateTime.Now;
Console.WriteLine("Test finished after {0} seconds.\n",
(endTime - startTime).TotalSeconds.ToString("F2"));


startTime = DateTime.Now;
Console.WriteLine("Starting test: Task.WhenAll...");
var task = PerformTest_TaskWhenAll(workers, startTime);
task.Wait();
endTime = DateTime.Now;
Console.WriteLine("Test finished after {0} seconds.\n",
(endTime - startTime).TotalSeconds.ToString("F2"));


Console.ReadKey();
}


static void PerformTest_ParallelForEach(List<Worker> workers, DateTime testStart)
{
Parallel.ForEach(workers, worker => worker.DoWork(testStart).Wait());
}


static void PerformTest_TaskWaitAll(List<Worker> workers, DateTime testStart)
{
Task.WaitAll(workers.Select(worker => worker.DoWork(testStart)).ToArray());
}


static Task PerformTest_TaskWhenAll(List<Worker> workers, DateTime testStart)
{
return Task.WhenAll(workers.Select(worker => worker.DoWork(testStart)));
}
}
}

结果输出:

Starting test: Parallel.ForEach...
Worker 1 started on thread 1, beginning 0.21 seconds after test start.
Worker 4 started on thread 5, beginning 0.21 seconds after test start.
Worker 2 started on thread 3, beginning 0.21 seconds after test start.
Worker 5 started on thread 6, beginning 0.21 seconds after test start.
Worker 3 started on thread 4, beginning 0.21 seconds after test start.
Worker 1 stopped; the worker took 1.90 seconds, and it finished 2.11 seconds after the test start.
Worker 2 stopped; the worker took 3.89 seconds, and it finished 4.10 seconds after the test start.
Worker 3 stopped; the worker took 5.89 seconds, and it finished 6.10 seconds after the test start.
Worker 4 stopped; the worker took 5.90 seconds, and it finished 6.11 seconds after the test start.
Worker 5 stopped; the worker took 8.89 seconds, and it finished 9.10 seconds after the test start.
Test finished after 9.10 seconds.


Starting test: Task.WaitAll...
Worker 1 started on thread 1, beginning 0.01 seconds after test start.
Worker 2 started on thread 1, beginning 0.01 seconds after test start.
Worker 3 started on thread 1, beginning 0.01 seconds after test start.
Worker 4 started on thread 1, beginning 0.01 seconds after test start.
Worker 5 started on thread 1, beginning 0.01 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.01 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.01 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.01 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.01 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.


Starting test: Task.WhenAll...
Worker 1 started on thread 1, beginning 0.00 seconds after test start.
Worker 2 started on thread 1, beginning 0.00 seconds after test start.
Worker 3 started on thread 1, beginning 0.00 seconds after test start.
Worker 4 started on thread 1, beginning 0.00 seconds after test start.
Worker 5 started on thread 1, beginning 0.00 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.00 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.00 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.00 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.00 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.00 seconds after the test start.
Test finished after 5.00 seconds.

可以使用Task.WhenAll函数,可以将任意数量的任务传递给该函数。Task.WhenAll返回一个新任务,该任务将在所有任务完成时完成。确保在Task.WhenAll上异步等待,以避免阻塞UI线程:

public async Task DoSomethingAsync() {
Task[] tasks = new Task[numTasks];
for (int i = 0; i < numTasks; i++)
{
tasks[i] = DoChildTaskAsync();
}
await Task.WhenAll(tasks);
// Code here will execute on UI thread
}

Parallel.ForEach需要一个用户定义的工作者列表和一个non-async Action来为每个工作者执行。

Task.WaitAllTask.WhenAll需要List<Task>,根据定义它们是异步的。

我发现RiaanDP响应对于理解差异非常有用,但它需要对Parallel.ForEach进行更正。没有足够的声誉来回应他的评论,因此我自己的回应。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;


namespace AsyncTest
{
class Program
{
class Worker
{
public int Id;
public int SleepTimeout;


public void DoWork(DateTime testStart)
{
var workerStart = DateTime.Now;
Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
Id, Thread.CurrentThread.ManagedThreadId, (workerStart - testStart).TotalSeconds.ToString("F2"));
Thread.Sleep(SleepTimeout);
var workerEnd = DateTime.Now;
Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
Id, (workerEnd - workerStart).TotalSeconds.ToString("F2"), (workerEnd - testStart).TotalSeconds.ToString("F2"));
}


public async Task DoWorkAsync(DateTime testStart)
{
var workerStart = DateTime.Now;
Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
Id, Thread.CurrentThread.ManagedThreadId, (workerStart - testStart).TotalSeconds.ToString("F2"));
await Task.Run(() => Thread.Sleep(SleepTimeout));
var workerEnd = DateTime.Now;
Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
Id, (workerEnd - workerStart).TotalSeconds.ToString("F2"), (workerEnd - testStart).TotalSeconds.ToString("F2"));
}
}


static void Main(string[] args)
{
var workers = new List<Worker>
{
new Worker { Id = 1, SleepTimeout = 1000 },
new Worker { Id = 2, SleepTimeout = 2000 },
new Worker { Id = 3, SleepTimeout = 3000 },
new Worker { Id = 4, SleepTimeout = 4000 },
new Worker { Id = 5, SleepTimeout = 5000 },
};


var startTime = DateTime.Now;
Console.WriteLine("Starting test: Parallel.ForEach...");
PerformTest_ParallelForEach(workers, startTime);
var endTime = DateTime.Now;
Console.WriteLine("Test finished after {0} seconds.\n",
(endTime - startTime).TotalSeconds.ToString("F2"));


startTime = DateTime.Now;
Console.WriteLine("Starting test: Task.WaitAll...");
PerformTest_TaskWaitAll(workers, startTime);
endTime = DateTime.Now;
Console.WriteLine("Test finished after {0} seconds.\n",
(endTime - startTime).TotalSeconds.ToString("F2"));


startTime = DateTime.Now;
Console.WriteLine("Starting test: Task.WhenAll...");
var task = PerformTest_TaskWhenAll(workers, startTime);
task.Wait();
endTime = DateTime.Now;
Console.WriteLine("Test finished after {0} seconds.\n",
(endTime - startTime).TotalSeconds.ToString("F2"));


Console.ReadKey();
}


static void PerformTest_ParallelForEach(List<Worker> workers, DateTime testStart)
{
Parallel.ForEach(workers, worker => worker.DoWork(testStart));
}


static void PerformTest_TaskWaitAll(List<Worker> workers, DateTime testStart)
{
Task.WaitAll(workers.Select(worker => worker.DoWorkAsync(testStart)).ToArray());
}


static Task PerformTest_TaskWhenAll(List<Worker> workers, DateTime testStart)
{
return Task.WhenAll(workers.Select(worker => worker.DoWorkAsync(testStart)));
}
}
}

结果输出如下所示。执行时间具有可比性。当我的电脑每周进行防病毒扫描时,我进行了这个测试。改变测试的顺序确实会改变测试的执行时间。

Starting test: Parallel.ForEach...
Worker 1 started on thread 9, beginning 0.02 seconds after test start.
Worker 2 started on thread 10, beginning 0.02 seconds after test start.
Worker 3 started on thread 11, beginning 0.02 seconds after test start.
Worker 4 started on thread 13, beginning 0.03 seconds after test start.
Worker 5 started on thread 14, beginning 0.03 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.02 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.02 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.03 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.03 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.03 seconds after the test start.
Test finished after 5.03 seconds.


Starting test: Task.WaitAll...
Worker 1 started on thread 9, beginning 0.00 seconds after test start.
Worker 2 started on thread 9, beginning 0.00 seconds after test start.
Worker 3 started on thread 9, beginning 0.00 seconds after test start.
Worker 4 started on thread 9, beginning 0.00 seconds after test start.
Worker 5 started on thread 9, beginning 0.01 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.01 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.01 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.01 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.01 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.


Starting test: Task.WhenAll...
Worker 1 started on thread 9, beginning 0.00 seconds after test start.
Worker 2 started on thread 9, beginning 0.00 seconds after test start.
Worker 3 started on thread 9, beginning 0.00 seconds after test start.
Worker 4 started on thread 9, beginning 0.00 seconds after test start.
Worker 5 started on thread 9, beginning 0.00 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.00 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.00 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.00 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.00 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.

我只是想给上面所有的好答案补充一点, 如果你写一个库,使用ConfigureAwait(false)是一个很好的实践 并获得更好的性能,如在这里.

所以这个片段似乎更好:

 public static async Task DoWork()
{
int[] ids = new[] { 1, 2, 3, 4, 5 };
await Task.WhenAll(ids.Select(i => DoSomething(1, i))).ConfigureAwait(false);
}

一个完整的小提琴链接在这里

所有的答案都是为了运行相同的函数。

下面的代码用于调用不同的函数。只需将常规的Task.Run()放入一个数组中,并调用Task.WhenAll():

await Task.WhenAll(new Task[] {
Task.Run(() => Func1(args)),
Task.Run(() => Func2(args))
});

这个问题已经10年了,OP问的是c# 5。

到今天为止,还有一个选择:并行。在。net 6中引入的ForEachAsync方法。

下面是一个基于OP代码的例子:

int[] ids = new[] { 1, 2, 3, 4, 5 };
await Parallel.ForEachAsync(ids, async (i,token) => await DoSomething(1, i, blogClient));

这是完全异步的,不会阻塞任何线程。

此外,它可能比Task更好。WaitAll和Task。WhenAll接近,因为它们不限制并行运行的线程数量。如果你有一个巨大的数组,它会消耗掉你所有的RAM。平行的。ForEachAsync允许你像这样指定并行度:

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };


await Parallel.ForEachAsync(ids, options, async (i,token) => await DoSomething(1, i, blogClient));

通过这种方式,只有4个线程并行运行。