取消芹菜已经执行的任务?

我一直在阅读这篇文章,寻找答案,但似乎找不到一个直截了当的答案:

可以取消已经执行的任务吗?(因为任务已经开始,需要一段时间,中途需要取消)

我从 芹菜常见问题的医生那里找到了这个

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

但是我不清楚这是否会取消排队的任务,或者它是否会杀死一个正在运行的工作进程。谢谢你为我们带来的一切光明!

120739 次浏览

请参阅以下任务选项: 时间限制软时间限制(或者您可以为 worker 设置它)。如果您不仅想控制执行时间,那么请参见 application _ sync 方法的 过期了参数。

Reveke 取消任务执行。如果一个任务被撤销,工作者将忽略该任务并不执行它。如果您不使用持久撤销,您的任务可以在 worker 重新启动后执行。

Https://docs.celeryq.dev/en/stable/userguide/workers.html#worker-persistent-revokes

Reveke 有一个终止选项,默认情况下是 假的。如果需要终止正在执行的任务,则需要将 finally 设置为 没错

>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)

https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks

在芹菜3.1中,API of revoking tasks发生了改变。

根据 Celery FAQ,您应该使用 result. revoke:

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

或者如果你只有任务 ID:

>>> from proj.celery import app
>>> app.control.revoke(task_id)

@ 0x00mh 的回答是正确的,但是最近的芹菜 医生说,使用 terminate选项是“ 管理人员的最后手段”,因为您可能意外地终止另一个在此期间开始执行的任务。可能更好的解决方案是将 terminate=Truesignal='SIGUSR1'结合起来(这会导致在任务中引发 SoftTimeLimitExceeded 异常)。

In addition, unsatisfactory, there is another way(abort task) to stop the task, but there are many unreliability, more details, see: Http://docs.celeryproject.org/en/latest/reference/celery.contrib.abortable.html

from celery.app import default_app


revoked = default_app.control.revoke(task_id, terminated=True, signal='SIGKILL')
print(revoked)

根据5.2.3文档,可以运行以下命令:

    celery.control.revoke(task_id, terminate=True, signal='SIGKILL')

where celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

链接到文档: https://docs.celeryq.dev/en/stable/reference/celery.app.control.html?highlight=revoke#celery.app.control.Control.revoke

你定义芹菜应用程序与代理和后端类似:

from celery import Celery
celeryapp = Celery('app', broker=redis_uri, backend=redis_uri)

当您运行 send task 时,它会返回 Task 的惟一 id:

task_id = celeryapp.send_task('run.send_email', queue = "demo")

要撤销任务,你需要芹菜应用程序和任务 ID:

celeryapp.control.revoke(task_id, terminate=True)