如何使用异步和 ForEach?

在使用 ForEach 时可以使用 Async 吗? 下面是我正在尝试的代码:

using (DataContext db = new DataLayer.DataContext())
{
db.Groups.ToList().ForEach(i => async {
await GetAdminsFromGroup(i.Gid);
});
}

我得到了一个错误:

“ Async”名称在当前上下文中不存在

使用语句所包含的方法被设置为异步。

224627 次浏览

问题在于,async关键字需要出现在 lambda 之前,而不是主体之前:

db.Groups.ToList().ForEach(async (i) => {
await GetAdminsFromGroup(i.Gid);
});

List<T>.ForEach不能很好地与 async协同工作(LINQ-to-object 也是如此,原因相同)。

在这种情况下,我建议将 投射每个元素放入一个异步操作中,然后您可以(异步)等待它们全部完成。

using (DataContext db = new DataLayer.DataContext())
{
var tasks = db.Groups.ToList().Select(i => GetAdminsFromGroupAsync(i.Gid));
var results = await Task.WhenAll(tasks);
}

这种办法比让 async代表参加 ForEach有以下好处:

  1. 错误处理更合适。catch无法捕获来自 async void的异常; 这种方法将在 await Task.WhenAll行传播异常,从而允许自然的异常处理。
  2. 您知道在此方法结束时任务已经完成,因为它执行 await Task.WhenAll。如果您使用 async void,您不能轻易地告诉当操作已经完成。
  3. 这种方法具有检索结果的自然语法。GetAdminsFromGroupAsync听起来像是一个产生结果的操作(管理员) ,如果这样的操作可以 返回它们的结果,而不是设置一个值作为副作用,那么这样的代码更自然。

这个小小的扩展方法应该可以为您提供异常安全的异步迭代:

public static async Task ForEachAsync<T>(this List<T> list, Func<T, Task> func)
{
foreach (var value in list)
{
await func(value);
}
}

由于我们将 lambda 的返回类型从 void更改为 Task,因此异常将正确地向上传播。这样你就可以在实践中写出这样的东西:

await db.Groups.ToList().ForEachAsync(async i => {
await GetAdminsFromGroup(i.Gid);
});

添加此扩展方法

public static class ForEachAsyncExtension
{
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
{
using (partition)
while (partition.MoveNext())
await body(partition.Current).ConfigureAwait(false);
}));
}
}

然后像这样使用:

Task.Run(async () =>
{
var s3 = new AmazonS3Client(Config.Instance.Aws.Credentials, Config.Instance.Aws.RegionEndpoint);
var buckets = await s3.ListBucketsAsync();


foreach (var s3Bucket in buckets.Buckets)
{
if (s3Bucket.BucketName.StartsWith("mybucket-"))
{
log.Information("Bucket => {BucketName}", s3Bucket.BucketName);


ListObjectsResponse objects;
try
{
objects = await s3.ListObjectsAsync(s3Bucket.BucketName);
}
catch
{
log.Error("Error getting objects. Bucket => {BucketName}", s3Bucket.BucketName);
continue;
}


// ForEachAsync (4 is how many tasks you want to run in parallel)
await objects.S3Objects.ForEachAsync(4, async s3Object =>
{
try
{
log.Information("Bucket => {BucketName} => {Key}", s3Bucket.BucketName, s3Object.Key);
await s3.DeleteObjectAsync(s3Bucket.BucketName, s3Object.Key);
}
catch
{
log.Error("Error deleting bucket {BucketName} object {Key}", s3Bucket.BucketName, s3Object.Key);
}
});


try
{
await s3.DeleteBucketAsync(s3Bucket.BucketName);
}
catch
{
log.Error("Error deleting bucket {BucketName}", s3Bucket.BucketName);
}
}
}
}).Wait();

下面是具有顺序处理的上述异步 foreach 变体的实际工作版本:

public static async Task ForEachAsync<T>(this List<T> enumerable, Action<T> action)
{
foreach (var item in enumerable)
await Task.Run(() => { action(item); }).ConfigureAwait(false);
}

以下是实施方案:

public async void SequentialAsync()
{
var list = new List<Action>();


Action action1 = () => {
//do stuff 1
};


Action action2 = () => {
//do stuff 2
};


list.Add(action1);
list.Add(action2);


await list.ForEachAsync();
}

.ConfigureAwait(false);保持主线程的上下文,同时对每个任务进行异步顺序处理。

简单的答案是使用 foreach关键字而不是 List()ForEach()方法。

using (DataContext db = new DataLayer.DataContext())
{
foreach(var i in db.Groups)
{
await GetAdminsFromGroup(i.Gid);
}
}

C# 8.0开始,您可以异步创建和使用流。

    private async void button1_Click(object sender, EventArgs e)
{
IAsyncEnumerable<int> enumerable = GenerateSequence();


await foreach (var i in enumerable)
{
Debug.WriteLine(i);
}
}


public static async IAsyncEnumerable<int> GenerateSequence()
{
for (int i = 0; i < 20; i++)
{
await Task.Delay(100);
yield return i;
}
}

更多

这是我创建的用于处理 ForEach异步场景的方法。

  • 如果其中一个任务失败,那么其他任务将继续执行。
  • 您可以添加将在每个异常上执行的函数。
  • 异常将在最后作为 generateException 收集,并且可以使用。
  • 可以处理取消令牌
 public static class ParallelExecutor
{
/// <summary>
/// Executes asynchronously given function on all elements of given enumerable with task count restriction.
/// Executor will continue starting new tasks even if one of the tasks throws. If at least one of the tasks throwed exception then <see cref="AggregateException"/> is throwed at the end of the method run.
/// </summary>
/// <typeparam name="T">Type of elements in enumerable</typeparam>
/// <param name="maxTaskCount">The maximum task count.</param>
/// <param name="enumerable">The enumerable.</param>
/// <param name="asyncFunc">asynchronous function that will be executed on every element of the enumerable. MUST be thread safe.</param>
/// <param name="onException">Acton that will be executed on every exception that would be thrown by asyncFunc. CAN be thread unsafe.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task ForEachAsync<T>(int maxTaskCount, IEnumerable<T> enumerable, Func<T, Task> asyncFunc, Action<Exception> onException = null, CancellationToken cancellationToken = default)
{
using var semaphore = new SemaphoreSlim(initialCount: maxTaskCount, maxCount: maxTaskCount);


// This `lockObject` is used only in `catch { }` block.
object lockObject = new object();
var exceptions = new List<Exception>();
var tasks = new Task[enumerable.Count()];
int i = 0;


try
{
foreach (var t in enumerable)
{
await semaphore.WaitAsync(cancellationToken);
tasks[i++] = Task.Run(
async () =>
{
try
{
await asyncFunc(t);
}
catch (Exception e)
{
if (onException != null)
{
lock (lockObject)
{
onException.Invoke(e);
}
}


// This exception will be swallowed here but it will be collected at the end of ForEachAsync method in order to generate AggregateException.
throw;
}
finally
{
semaphore.Release();
}
}, cancellationToken);


if (cancellationToken.IsCancellationRequested)
{
break;
}
}
}
catch (OperationCanceledException e)
{
exceptions.Add(e);
}


foreach (var t in tasks)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}


// Exception handling in this case is actually pretty fast.
// https://gist.github.com/shoter/d943500eda37c7d99461ce3dace42141
try
{
await t;
}
#pragma warning disable CA1031 // Do not catch general exception types - we want to throw that exception later as aggregate exception. Nothing wrong here.
catch (Exception e)
#pragma warning restore CA1031 // Do not catch general exception types
{
exceptions.Add(e);
}
}


if (exceptions.Any())
{
throw new AggregateException(exceptions);
}
}
}

我想补充的是,有一个内置 ForEach 函数的 平行班可以用于此目的。

如果您正在使用 EntityFramework.Core,则有 一个扩展方法 ForEachAsync

示例用法如下:

using Microsoft.EntityFrameworkCore;
using System.Threading.Tasks;


public class Example
{
private readonly DbContext _dbContext;
public Example(DbContext dbContext)
{
_dbContext = dbContext;
}
public async void LogicMethod()
{
        

await _dbContext.Set<dbTable>().ForEachAsync(async x =>
{
//logic
await AsyncTask(x);
});
}


public async Task<bool> AsyncTask(object x)
{
//other logic
return await Task.FromResult<bool>(true);
}
}

这不是一个老问题,但 。网6介绍了 并行 ForeachAsync:

var collectionToIterate = db.Groups.ToList();
await Parallel.ForEachAsync(collectionToIterate, async (i, token) =>
{
await GetAdminsFromGroup(i);
});

ForeachAsync 也接受一个 ParallelOptions 对象,但是通常你不想弄乱 最大并行度属性:

ParallelOptions parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 4 };
var collectionToIterate = db.Groups.ToList();


await Parallel.ForEachAsync(collectionToIterate, parallelOptions , async (i, token) =>
{
await GetAdminsFromGroup(i);
});

来自 Microsoft Docs: https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.paralleloptions.maxdegreeofparallelism?view=net-6.0

默认情况下,For 和 ForEach 将使用底层调度程序提供的多个线程,因此将 MaxDegreeOfParallelism 从默认值更改为仅限于使用多少并发任务。

通常,您不需要修改此设置... ..。