using (DataContext db = new DataLayer.DataContext())
{
var tasks = db.Groups.ToList().Select(i => GetAdminsFromGroupAsync(i.Gid));
var results = await Task.WhenAll(tasks);
}
public static class ForEachAsyncExtension
{
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
{
using (partition)
while (partition.MoveNext())
await body(partition.Current).ConfigureAwait(false);
}));
}
}
然后像这样使用:
Task.Run(async () =>
{
var s3 = new AmazonS3Client(Config.Instance.Aws.Credentials, Config.Instance.Aws.RegionEndpoint);
var buckets = await s3.ListBucketsAsync();
foreach (var s3Bucket in buckets.Buckets)
{
if (s3Bucket.BucketName.StartsWith("mybucket-"))
{
log.Information("Bucket => {BucketName}", s3Bucket.BucketName);
ListObjectsResponse objects;
try
{
objects = await s3.ListObjectsAsync(s3Bucket.BucketName);
}
catch
{
log.Error("Error getting objects. Bucket => {BucketName}", s3Bucket.BucketName);
continue;
}
// ForEachAsync (4 is how many tasks you want to run in parallel)
await objects.S3Objects.ForEachAsync(4, async s3Object =>
{
try
{
log.Information("Bucket => {BucketName} => {Key}", s3Bucket.BucketName, s3Object.Key);
await s3.DeleteObjectAsync(s3Bucket.BucketName, s3Object.Key);
}
catch
{
log.Error("Error deleting bucket {BucketName} object {Key}", s3Bucket.BucketName, s3Object.Key);
}
});
try
{
await s3.DeleteBucketAsync(s3Bucket.BucketName);
}
catch
{
log.Error("Error deleting bucket {BucketName}", s3Bucket.BucketName);
}
}
}
}).Wait();
public static class ParallelExecutor
{
/// <summary>
/// Executes asynchronously given function on all elements of given enumerable with task count restriction.
/// Executor will continue starting new tasks even if one of the tasks throws. If at least one of the tasks throwed exception then <see cref="AggregateException"/> is throwed at the end of the method run.
/// </summary>
/// <typeparam name="T">Type of elements in enumerable</typeparam>
/// <param name="maxTaskCount">The maximum task count.</param>
/// <param name="enumerable">The enumerable.</param>
/// <param name="asyncFunc">asynchronous function that will be executed on every element of the enumerable. MUST be thread safe.</param>
/// <param name="onException">Acton that will be executed on every exception that would be thrown by asyncFunc. CAN be thread unsafe.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task ForEachAsync<T>(int maxTaskCount, IEnumerable<T> enumerable, Func<T, Task> asyncFunc, Action<Exception> onException = null, CancellationToken cancellationToken = default)
{
using var semaphore = new SemaphoreSlim(initialCount: maxTaskCount, maxCount: maxTaskCount);
// This `lockObject` is used only in `catch { }` block.
object lockObject = new object();
var exceptions = new List<Exception>();
var tasks = new Task[enumerable.Count()];
int i = 0;
try
{
foreach (var t in enumerable)
{
await semaphore.WaitAsync(cancellationToken);
tasks[i++] = Task.Run(
async () =>
{
try
{
await asyncFunc(t);
}
catch (Exception e)
{
if (onException != null)
{
lock (lockObject)
{
onException.Invoke(e);
}
}
// This exception will be swallowed here but it will be collected at the end of ForEachAsync method in order to generate AggregateException.
throw;
}
finally
{
semaphore.Release();
}
}, cancellationToken);
if (cancellationToken.IsCancellationRequested)
{
break;
}
}
}
catch (OperationCanceledException e)
{
exceptions.Add(e);
}
foreach (var t in tasks)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}
// Exception handling in this case is actually pretty fast.
// https://gist.github.com/shoter/d943500eda37c7d99461ce3dace42141
try
{
await t;
}
#pragma warning disable CA1031 // Do not catch general exception types - we want to throw that exception later as aggregate exception. Nothing wrong here.
catch (Exception e)
#pragma warning restore CA1031 // Do not catch general exception types
{
exceptions.Add(e);
}
}
if (exceptions.Any())
{
throw new AggregateException(exceptions);
}
}
}