异步等待任务使用超时完成

I 想用一些特殊的规则等待Task< T>完成: 如果在X毫秒后还没有完成,我希望向用户显示一条消息。 如果在Y毫秒后还没有完成,I 想要自动请求取消.

I 可以使用的任务。ContinueWith来异步等待任务完成(即计划在任务完成时执行一个动作),但不允许指定超时。 我可以使用的任务。等待来同步等待任务超时完成,但这会阻塞我的线程。 我如何异步等待任务超时完成?< / p >
326555 次浏览

像这样的东西怎么样?

    const int x = 3000;
const int y = 1000;


static void Main(string[] args)
{
// Your scheduler
TaskScheduler scheduler = TaskScheduler.Default;


Task nonblockingTask = new Task(() =>
{
CancellationTokenSource source = new CancellationTokenSource();


Task t1 = new Task(() =>
{
while (true)
{
// Do something
if (source.IsCancellationRequested)
break;
}
}, source.Token);


t1.Start(scheduler);


// Wait for task 1
bool firstTimeout = t1.Wait(x);


if (!firstTimeout)
{
// If it hasn't finished at first timeout display message
Console.WriteLine("Message to user: the operation hasn't completed yet.");


bool secondTimeout = t1.Wait(y);


if (!secondTimeout)
{
source.Cancel();
Console.WriteLine("Operation stopped!");
}
}
});


nonblockingTask.Start();
Console.WriteLine("Do whatever you want...");
Console.ReadLine();
}

您可以使用任务。等待选项,不阻塞主线程使用另一个任务。

你可以使用Task.WaitAny来等待多个任务中的第一个。

你可以创建两个额外的任务(在指定的超时后完成),然后使用WaitAny来等待先完成的任务。如果最先完成的任务是你的“工作”任务,那么你就完成了。如果最先完成的任务是一个超时任务,那么您可以对超时做出反应(例如,请求取消)。

使用计时器来处理消息和自动取消。当Task完成时,对计时器调用Dispose,以便它们永远不会触发。这里有一个例子;将taskDelay改为500、1500或2500来查看不同的情况:

using System;
using System.Threading;
using System.Threading.Tasks;


namespace ConsoleApplication1
{
class Program
{
private static Task CreateTaskWithTimeout(
int xDelay, int yDelay, int taskDelay)
{
var cts = new CancellationTokenSource();
var token = cts.Token;
var task = Task.Factory.StartNew(() =>
{
// Do some work, but fail if cancellation was requested
token.WaitHandle.WaitOne(taskDelay);
token.ThrowIfCancellationRequested();
Console.WriteLine("Task complete");
});
var messageTimer = new Timer(state =>
{
// Display message at first timeout
Console.WriteLine("X milliseconds elapsed");
}, null, xDelay, -1);
var cancelTimer = new Timer(state =>
{
// Display message and cancel task at second timeout
Console.WriteLine("Y milliseconds elapsed");
cts.Cancel();
}
, null, yDelay, -1);
task.ContinueWith(t =>
{
// Dispose the timers when the task completes
// This will prevent the message from being displayed
// if the task completes before the timeout
messageTimer.Dispose();
cancelTimer.Dispose();
});
return task;
}


static void Main(string[] args)
{
var task = CreateTaskWithTimeout(1000, 2000, 2500);
// The task has been started and will display a message after
// one timeout and then cancel itself after the second
// You can add continuations to the task
// or wait for the result as needed
try
{
task.Wait();
Console.WriteLine("Done waiting for task");
}
catch (AggregateException ex)
{
Console.WriteLine("Error waiting for task:");
foreach (var e in ex.InnerExceptions)
{
Console.WriteLine(e);
}
}
}
}
}

另外,异步CTP提供了一个TaskEx。Delay方法,它将为您在任务中包装计时器。这可以给你更多的控制来做一些事情,比如设置TaskScheduler为Timer触发时的延续。

private static Task CreateTaskWithTimeout(
int xDelay, int yDelay, int taskDelay)
{
var cts = new CancellationTokenSource();
var token = cts.Token;
var task = Task.Factory.StartNew(() =>
{
// Do some work, but fail if cancellation was requested
token.WaitHandle.WaitOne(taskDelay);
token.ThrowIfCancellationRequested();
Console.WriteLine("Task complete");
});


var timerCts = new CancellationTokenSource();


var messageTask = TaskEx.Delay(xDelay, timerCts.Token);
messageTask.ContinueWith(t =>
{
// Display message at first timeout
Console.WriteLine("X milliseconds elapsed");
}, TaskContinuationOptions.OnlyOnRanToCompletion);


var cancelTask = TaskEx.Delay(yDelay, timerCts.Token);
cancelTask.ContinueWith(t =>
{
// Display message and cancel task at second timeout
Console.WriteLine("Y milliseconds elapsed");
cts.Cancel();
}, TaskContinuationOptions.OnlyOnRanToCompletion);


task.ContinueWith(t =>
{
timerCts.Cancel();
});


return task;
}

这个怎么样:

int timeout = 1000;
var task = SomeOperationAsync();
if (await Task.WhenAny(task, Task.Delay(timeout)) == task) {
// task completed within timeout
} else {
// timeout logic
}

这里是一篇很棒的博客文章“制定任务”。TimeoutAfter Method”(来自MS并行库团队)提供了关于这类事情的更多信息

除了:在对我的回答的评论的要求下,这里是一个扩展的解决方案,包括取消处理。请注意,将取消传递给任务和计时器意味着在代码中可以经历多种取消方式,您应该确保测试并确信您正确地处理了所有这些方法。不要让各种组合的机会,并希望您的计算机在运行时做正确的事情。

int timeout = 1000;
var task = SomeOperationAsync(cancellationToken);
if (await Task.WhenAny(task, Task.Delay(timeout, cancellationToken)) == task)
{
// Task completed within timeout.
// Consider that the task may have faulted or been canceled.
// We re-await the task so that any exceptions/cancellation is rethrown.
await task;


}
else
{
// timeout/cancellation logic
}

下面是一个扩展方法版本,它包含了Andrew Arnott在他的回答注释中建议的在原始任务完成时取消超时。

public static async Task<TResult> TimeoutAfter<TResult>(this Task<TResult> task, TimeSpan timeout) {


using (var timeoutCancellationTokenSource = new CancellationTokenSource()) {


var completedTask = await Task.WhenAny(task, Task.Delay(timeout, timeoutCancellationTokenSource.Token));
if (completedTask == task) {
timeoutCancellationTokenSource.Cancel();
return await task;  // Very important in order to propagate exceptions
} else {
throw new TimeoutException("The operation has timed out.");
}
}
}

下面是一个基于投票最多的答案的完整示例,即:

int timeout = 1000;
var task = SomeOperationAsync();
if (await Task.WhenAny(task, Task.Delay(timeout)) == task) {
// task completed within timeout
} else {
// timeout logic
}

这个答案中的实现的主要优点是添加了泛型,因此函数(或任务)可以返回一个值。这意味着任何现有的函数都可以包装在超时函数中,例如:

之前:

int x = MyFunc();

后:

// Throws a TimeoutException if MyFunc takes more than 1 second
int x = TimeoutAfter(MyFunc, TimeSpan.FromSeconds(1));

这段代码需要。net 4.5。

using System;
using System.Threading;
using System.Threading.Tasks;


namespace TaskTimeout
{
public static class Program
{
/// <summary>
///     Demo of how to wrap any function in a timeout.
/// </summary>
private static void Main(string[] args)
{


// Version without timeout.
int a = MyFunc();
Console.Write("Result: {0}\n", a);
// Version with timeout.
int b = TimeoutAfter(() => { return MyFunc(); },TimeSpan.FromSeconds(1));
Console.Write("Result: {0}\n", b);
// Version with timeout (short version that uses method groups).
int c = TimeoutAfter(MyFunc, TimeSpan.FromSeconds(1));
Console.Write("Result: {0}\n", c);


// Version that lets you see what happens when a timeout occurs.
try
{
int d = TimeoutAfter(
() =>
{
Thread.Sleep(TimeSpan.FromSeconds(123));
return 42;
},
TimeSpan.FromSeconds(1));
Console.Write("Result: {0}\n", d);
}
catch (TimeoutException e)
{
Console.Write("Exception: {0}\n", e.Message);
}


// Version that works on tasks.
var task = Task.Run(() =>
{
Thread.Sleep(TimeSpan.FromSeconds(1));
return 42;
});


// To use async/await, add "await" and remove "GetAwaiter().GetResult()".
var result = task.TimeoutAfterAsync(TimeSpan.FromSeconds(2)).
GetAwaiter().GetResult();


Console.Write("Result: {0}\n", result);


Console.Write("[any key to exit]");
Console.ReadKey();
}


public static int MyFunc()
{
return 42;
}


public static TResult TimeoutAfter<TResult>(
this Func<TResult> func, TimeSpan timeout)
{
var task = Task.Run(func);
return TimeoutAfterAsync(task, timeout).GetAwaiter().GetResult();
}


private static async Task<TResult> TimeoutAfterAsync<TResult>(
this Task<TResult> task, TimeSpan timeout)
{
var result = await Task.WhenAny(task, Task.Delay(timeout));
if (result == task)
{
// Task completed within timeout.
return task.GetAwaiter().GetResult();
}
else
{
// Task timed out.
throw new TimeoutException();
}
}
}
}

警告

给出这个答案后,在正常操作期间在代码中抛出异常通常是的好做法,除非你绝对必须:

  • 每次抛出异常,都是非常重量级的操作,
  • 如果异常处于紧密循环中,异常会使代码速度降低100倍或更多。

只有当你绝对不能改变你正在调用的函数时才使用这段代码,这样它就会在特定的TimeSpan之后超时。

这个答案实际上只适用于处理无法重构为包含超时参数的第三方库库。

如何编写健壮的代码

如果你想写健壮的代码,一般规则是这样的:

每一个可能无限期阻塞的操作都必须有一个超时。

如果你遵守这个规则,你的代码最终会遇到一个操作因为某种原因失败,然后它会无限期地阻塞,你的应用程序只是永久挂起。

如果在一段时间后出现了合理的超时,那么你的应用程序会挂起一段极端的时间(例如30秒),然后它会显示一个错误并继续它的快乐方式,或者重试。

如果使用BlockingCollection来调度任务,生产者可以运行可能长时间运行的任务,消费者可以使用TryTake方法,该方法具有内置的超时和取消令牌。

解决这个问题的另一种方法是使用响应式扩展:

public static Task TimeoutAfter(this Task task, TimeSpan timeout, IScheduler scheduler)
{
return task.ToObservable().Timeout(timeout, scheduler).ToTask();
}

测试上面使用下面的代码在你的单元测试,它为我工作

TestScheduler scheduler = new TestScheduler();
Task task = Task.Run(() =>
{
int i = 0;
while (i < 5)
{
Console.WriteLine(i);
i++;
Thread.Sleep(1000);
}
})
.TimeoutAfter(TimeSpan.FromSeconds(5), scheduler)
.ContinueWith(t => { }, TaskContinuationOptions.OnlyOnFaulted);


scheduler.AdvanceBy(TimeSpan.FromSeconds(6).Ticks);

您可能需要以下命名空间:

using System.Threading.Tasks;
using System.Reactive.Subjects;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using Microsoft.Reactive.Testing;
using System.Threading;
using System.Reactive.Concurrency;

上面@Kevan的答案的通用版本,使用响应式扩展。

public static Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout, IScheduler scheduler)
{
return task.ToObservable().Timeout(timeout, scheduler).ToTask();
}

可选调度器:

public static Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout, Scheduler scheduler = null)
{
return scheduler is null
? task.ToObservable().Timeout(timeout).ToTask()
: task.ToObservable().Timeout(timeout, scheduler).ToTask();
}

BTW:当Timeout发生时,将抛出一个超时异常

使用Stephen Cleary的优秀AsyncEx库,你可以做到:

TimeSpan timeout = TimeSpan.FromSeconds(10);


using (var cts = new CancellationTokenSource(timeout))
{
await myTask.WaitAsync(cts.Token);
}

TaskCanceledException将在超时时被抛出。

我觉得另一个中的Task.Delay()任务和CancellationTokenSource回答了我在一个紧密的网络循环中的用例。

虽然乔·霍格的《制作任务》。MSDN博客上的TimeoutAfter方法是鼓舞人心的,但出于同样的原因,我对使用TimeoutException进行流控制有点厌倦,因为超时的频率比不超时的频率要高。

所以我使用了这个,它也处理了博客中提到的优化:

public static async Task<bool> BeforeTimeout(this Task task, int millisecondsTimeout)
{
if (task.IsCompleted) return true;
if (millisecondsTimeout == 0) return false;


if (millisecondsTimeout == Timeout.Infinite)
{
await Task.WhenAll(task);
return true;
}


var tcs = new TaskCompletionSource<object>();


using (var timer = new Timer(state => ((TaskCompletionSource<object>)state).TrySetCanceled(), tcs,
millisecondsTimeout, Timeout.Infinite))
{
return await Task.WhenAny(task, tcs.Task) == task;
}
}

一个示例用例如下:

var receivingTask = conn.ReceiveAsync(ct);


while (!await receivingTask.BeforeTimeout(keepAliveMilliseconds))
{
// Send keep-alive
}


// Read and do something with data
var data = await receivingTask;

安德鲁·阿诺特(Andrew Arnott)回答的几个变体:

  1. 如果你想等待一个现有的任务,并找出它是否完成或超时,但不想在超时发生时取消它:

    public static async Task<bool> TimedOutAsync(this Task task, int timeoutMilliseconds)
    {
    if (timeoutMilliseconds < 0 || (timeoutMilliseconds > 0 && timeoutMilliseconds < 100)) { throw new ArgumentOutOfRangeException(); }
    
    
    if (timeoutMilliseconds == 0) {
    return !task.IsCompleted; // timed out if not completed
    }
    var cts = new CancellationTokenSource();
    if (await Task.WhenAny( task, Task.Delay(timeoutMilliseconds, cts.Token)) == task) {
    cts.Cancel(); // task completed, get rid of timer
    await task; // test for exceptions or task cancellation
    return false; // did not timeout
    } else {
    return true; // did timeout
    }
    }
    
  2. If you want to start a work task and cancel the work if the timeout occurs:

    public static async Task<T> CancelAfterAsync<T>( this Func<CancellationToken,Task<T>> actionAsync, int timeoutMilliseconds)
    {
    if (timeoutMilliseconds < 0 || (timeoutMilliseconds > 0 && timeoutMilliseconds < 100)) { throw new ArgumentOutOfRangeException(); }
    
    
    var taskCts = new CancellationTokenSource();
    var timerCts = new CancellationTokenSource();
    Task<T> task = actionAsync(taskCts.Token);
    if (await Task.WhenAny(task, Task.Delay(timeoutMilliseconds, timerCts.Token)) == task) {
    timerCts.Cancel(); // task completed, get rid of timer
    } else {
    taskCts.Cancel(); // timer completed, get rid of task
    }
    return await task; // test for exceptions or task cancellation
    }
    
  3. If you have a task already created that you want to cancel if a timeout occurs:

    public static async Task<T> CancelAfterAsync<T>(this Task<T> task, int timeoutMilliseconds, CancellationTokenSource taskCts)
    {
    if (timeoutMilliseconds < 0 || (timeoutMilliseconds > 0 && timeoutMilliseconds < 100)) { throw new ArgumentOutOfRangeException(); }
    
    
    var timerCts = new CancellationTokenSource();
    if (await Task.WhenAny(task, Task.Delay(timeoutMilliseconds, timerCts.Token)) == task) {
    timerCts.Cancel(); // task completed, get rid of timer
    } else {
    taskCts.Cancel(); // timer completed, get rid of task
    }
    return await task; // test for exceptions or task cancellation
    }
    

Another comment, these versions will cancel the timer if the timeout does not occur, so multiple calls will not cause timers to pile up.

sjb

这是对之前答案的稍微强化版。

  • 除了劳伦斯的回答之外,它还在超时发生时取消原始任务。
  • 除了Sjb的答案变量2和3,你还可以为原始任务提供CancellationToken,当超时发生时,你会得到TimeoutException而不是OperationCanceledException
async Task<TResult> CancelAfterAsync<TResult>(
Func<CancellationToken, Task<TResult>> startTask,
TimeSpan timeout, CancellationToken cancellationToken)
{
using (var timeoutCancellation = new CancellationTokenSource())
using (var combinedCancellation = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken, timeoutCancellation.Token))
{
var originalTask = startTask(combinedCancellation.Token);
var delayTask = Task.Delay(timeout, timeoutCancellation.Token);
var completedTask = await Task.WhenAny(originalTask, delayTask);
// Cancel timeout to stop either task:
// - Either the original task completed, so we need to cancel the delay task.
// - Or the timeout expired, so we need to cancel the original task.
// Canceling will not affect a task, that is already completed.
timeoutCancellation.Cancel();
if (completedTask == originalTask)
{
// original task completed
return await originalTask;
}
else
{
// timeout
throw new TimeoutException();
}
}
}

使用

InnerCallAsync可能需要很长时间才能完成。CallAsync用一个超时来包装它。

async Task<int> CallAsync(CancellationToken cancellationToken)
{
var timeout = TimeSpan.FromMinutes(1);
int result = await CancelAfterAsync(ct => InnerCallAsync(ct), timeout,
cancellationToken);
return result;
}


async Task<int> InnerCallAsync(CancellationToken cancellationToken)
{
return 42;
}

我将这里的一些其他答案和这个答案在另一个线程的想法重新组合成一个try风格的扩展方法。如果您想要一个扩展方法,这有一个好处,同时避免超时时出现异常。

public static async Task<bool> TryWithTimeoutAfter<TResult>(this Task<TResult> task,
TimeSpan timeout, Action<TResult> successor)
{


using var timeoutCancellationTokenSource = new CancellationTokenSource();
var completedTask = await Task.WhenAny(task, Task.Delay(timeout, timeoutCancellationTokenSource.Token))
.ConfigureAwait(continueOnCapturedContext: false);


if (completedTask == task)
{
timeoutCancellationTokenSource.Cancel();


// propagate exception rather than AggregateException, if calling task.Result.
var result = await task.ConfigureAwait(continueOnCapturedContext: false);
successor(result);
return true;
}
else return false;
}


async Task Example(Task<string> task)
{
string result = null;
if (await task.TryWithTimeoutAfter(TimeSpan.FromSeconds(1), r => result = r))
{
Console.WriteLine(result);
}
}

所以这是古老的,但有一个更好的现代解决方案。不确定c#/的哪个版本。NET是必需的,但这是我怎么做的:


... Other method code not relevant to the question.


// a token source that will timeout at the specified interval, or if cancelled outside of this scope
using var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, timeoutTokenSource.Token);


async Task<MessageResource> FetchAsync()
{
try
{
return await MessageResource.FetchAsync(m.Sid);
} catch (TaskCanceledException e)
{
if (timeoutTokenSource.IsCancellationRequested)
throw new TimeoutException("Timeout", e);
throw;
}
}


return await Task.Run(FetchAsync, linkedTokenSource.Token);

CancellationTokenSource构造函数接受一个TimeSpan参数,该参数将导致令牌在该间隔结束后取消。然后,您可以将异步(或同步,就此而言)代码包装在另一个Task.Run调用中,传递超时令牌。

这假设你正在传递一个取消令牌(token变量)。如果你不需要在超时后单独取消任务,你可以直接使用timeoutTokenSource。否则,创建linkedTokenSource,它将在超时发生时取消,如果超时发生则取消

然后,我们只捕获OperationCancelledException并检查是哪个令牌引发了异常,如果超时导致引发异常,则抛出TimeoutException。否则,我们重新抛出。

此外,我在这里使用的是c# 7中引入的局部函数,但您可以很容易地使用lambda或实际函数来达到同样的效果。类似地,c# 8为使用语句引入了更简单的语法,但这些语法很容易重写。

创建一个扩展来等待任务或延迟完成,以先发生者为准。如果延迟成功,则抛出异常。

public static async Task<TResult> WithTimeout<TResult>(this Task<TResult> task, TimeSpan timeout)
{
if (await Task.WhenAny(task, Task.Delay(timeout)) != task)
throw new TimeoutException();
return await task;
}
对于。net 6(预览7作为回答的日期),可以使用新的CancellationToken WaitAsync(时间间隔)来回答这个特殊的需要。 如果你可以使用。net 6,如果我们将这个版本与本文中提出的大多数好的解决方案进行比较,那么这个版本将被描述为是优化的

(感谢所有参与者,因为我多年来一直使用您的解决方案)

从。net 6 (Preview 7)或更高版本开始,有一个新的内置方法的任务。WaitAsync来实现这一点。

// Using TimeSpan
await myTask.WaitAsync(TimeSpan.FromSeconds(10));


// Using CancellationToken
await myTask.WaitAsync(cancellationToken);


// Using both TimeSpan and CancellationToken
await myTask.WaitAsync(TimeSpan.FromSeconds(10), cancellationToken);

如果任务在TimeSpanCancellationToken之前没有完成,则分别抛出TimeoutExceptionTaskCanceledException

try
{
await myTask.WaitAsync(TimeSpan.FromSeconds(10), cancellationToken);


}
catch (TaskCanceledException)
{
Console.WriteLine("Task didn't get finished before the `CancellationToken`");
}
catch (TimeoutException)
{
Console.WriteLine("Task didn't get finished before the `TimeSpan`");
}

下面是WaitAsync方法的低级实现,它同时接受超时和CancellationToken,并且在异常情况下传播Task<T>的所有错误,而不仅仅是第一个错误:

public static Task<TResult> WaitAsync<TResult>(this Task<TResult> task,
TimeSpan timeout, CancellationToken cancellationToken = default)
{
if (task == null) throw new ArgumentNullException(nameof(task));
if (timeout < TimeSpan.Zero && timeout != Timeout.InfiniteTimeSpan)
throw new ArgumentOutOfRangeException(nameof(timeout));


var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeout);


return task
.ContinueWith(_ => { }, cts.Token,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default)
.ContinueWith(continuation =>
{
cts.Dispose();
if (task.IsCompleted) return task;
cancellationToken.ThrowIfCancellationRequested();
if (continuation.IsCanceled) throw new TimeoutException();
return task;
}, TaskScheduler.Default).Unwrap();
}

如果在task完成之前超时,则抛出TimeoutException

老实说,在这种情况下,传播所有错误并不是真正的增值功能。原因是,如果你像这样使用WaitAsync: await someTask.WaitAsync(timeout),任何额外的错误都会被await操作符吞噬,根据设计,它只传播等待任务的第一个异常。将WaitAsync任务存储在变量中并在catch块中检查它没有太大意义,因为你已经有了可用的someTask,你可以检查这个。

为了它的乐趣,我做了一个“OnTimeout”扩展任务。超时时Task执行所需的内联lambda Action()并返回true,否则返回false。

public static async Task<bool> OnTimeout<T>(this T t, Action<T> action, int waitms) where T : Task
{
if (!(await Task.WhenAny(t, Task.Delay(waitms)) == t))
{
action(t);
return true;
} else {
return false;
}
}

OnTimeout扩展返回一个bool结果,可以分配给一个变量,就像这个例子调用UDP套接字Async:

var t = UdpSocket.ReceiveAsync();


var timeout = await t.OnTimeout(task => {
Console.WriteLine("No Response");
}, 5000);

在timeout lambda中可以访问“task”变量以进行更多处理。

Action接收对象的使用可能会启发其他各种扩展设计。