如何检查芹菜的任务状态?

如何检查任务是否在芹菜中运行(具体来说,我使用的是芹菜 -django) ?

我看过文件,谷歌过,但我看不到这样的电话:

my_example_task.state() == RUNNING

我的用例是我有一个用于代码转换的外部(java)服务。当我发送要转码的文档时,我想检查运行该服务的任务是否正在运行,如果没有,则(重新)启动它。

我使用的是当前的稳定版本——我相信是2.4。

164824 次浏览

返回 task _ id (这是从.  延迟()中给出的) ,然后询问芹菜实例关于状态的信息:

x = method.delay(1,2)
print x.task_id

在询问时,使用 task _ id 获取一个新的 AsyncResult:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()

每个 Task对象都有一个 .request属性,其中包含它的 AsyncRequest对象。因此,下面一行给出了 Tasktask的状态:

task.AsyncResult(task.request.id).state

您还可以创建自定义状态并更新它的值调整任务执行。 这个例子来自文档:

@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(filenames)})

Http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

试试:

task.AsyncResult(task.request.id).state

这将提供芹菜任务状态。如果芹菜任务已经处于 失败状态,它将抛出一个异常:

raised unexpected: KeyError('exc_type',)

Creating an AsyncResult object from the task id the way recommended in the 常见问题 to obtain the task status when the only thing you have is the task id.

然而,对于芹菜3.x,有一些重要的警告,如果人们不注意它们,它们可能会咬人。它实际上取决于特定的用例场景。

默认情况下,芹菜没有记录“运行”状态。

为了让芹菜记录任务正在运行,必须将 task_track_started设置为 True。下面是一个简单的测试任务:

@app.task(bind=True)
def test(self):
print self.AsyncResult(self.request.id).state

task_track_startedFalse(默认值)时,即使任务已经启动,状态显示也是 PENDING。如果将 task_track_started设置为 True,则状态为 STARTED

PENDING的意思是“我不知道”

状态为 PENDINGAsyncResult并不意味着比芹菜不知道任务的状态更多的东西。这可能有很多原因。

一方面,AsyncResult可以用无效的任务 ID 来构建。这样的“任务”将被芹菜视为未决的:

>>> task.AsyncResult("invalid").status
'PENDING'

好的,所以没有人会把 obviously的无效 ID 提供给 AsyncResult。公平的足够,但它也有效,AsyncResult0再次,AsyncResult1这可能是一个问题。部分问题取决于如何配置芹菜来保存任务的结果,因为它取决于结果后端中“ Tombstone”的可用性。(“ Tombstone”是 Celery 文档中用于记录任务结束方式的数据块的术语。)如果 AsyncResult2是 True,那么使用 AsyncResult根本不起作用。更令人烦恼的问题是,芹菜的过期墓碑默认。默认情况下,AsyncResult3设置为24小时。因此,如果您启动一个任务,并在长期存储中记录 id,24小时以后,您用它创建了一个 AsyncResult,那么状态将是 PENDING

All "real tasks" start in the PENDING state. So getting PENDING on a task could mean that the task was requested but never progressed further than this (for whatever reason). Or it could mean the task ran but Celery forgot its state.

Ouch! AsyncResult won't work for me. What else can I do?

我喜欢保持轨道的 目标比保持轨道的 任务本身。我确实保留了一些任务信息,但这对于跟踪目标来说是次要的。目标是储存在独立的存储芹菜。当一个请求需要执行一个计算依赖于某个目标已经实现,它检查目标是否已经实现,如果是,那么它使用这个缓存的目标,否则它启动将影响目标的任务,并向客户端发送一个响应,该响应指示它应该等待一个结果。


上面的变量名和超链接是针对芹菜4.x 的。在3.x 中,相应的变量和超链接是: CELERY_TRACK_STARTEDCELERY_IGNORE_RESULTCELERY_TASK_RESULT_EXPIRES

老问题,但我最近遇到了这个问题。

如果你想得到 task _ id,你可以这样做:

import celery
from celery_app import add
from celery import uuid


task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

现在您已经准确地知道 task _ id 是什么,并且现在可以使用它来获取 AsyncResult:

# grab the AsyncResult
result = celery.result.AsyncResult(task_id)


# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf


# print the AsyncResult's status
print result.status
SUCCESS


# print the result returned
print result.result
4

对于简单的任务,我们可以使用 http://flower.readthedocs.io/en/latest/screenshots.htmlhttp://policystat.github.io/jobtastic/来进行监视。

对于复杂的任务,比如说一个任务需要处理很多其他模块。我们建议手动记录特定任务单元上的进度和消息。

我在网上找到了有用的信息

芹菜工程检验工人指南

对于我来说,我在检查芹菜是否在竞选。

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
state = 'FAILURE'
else:
state = str(task.state)

你可以通过检查来满足你的需求。

除了上述方法 使用花卉任务状态可以很容易地看到。

使用芹菜事件进行实时监控。 Flower 是一个基于 Web 的工具,用于监控和管理芹菜群。

  1. 任务进度和历史
  2. 能够显示任务详细信息(参数、开始时间、运行时等)
  3. 图表和统计数据

Official Document: 花卉-芹菜监测工具

Installation:

$ pip install flower

用法:

http://localhost:5555

Update: 这是版本控制的问题,flower (version = 0.9.7)只适用于芹菜(version = 4.4.7) ,当你安装 flower 的时候,它会把你的芹菜高级版卸载到4.4.7,这对注册任务是不起作用的

只要使用 芹菜常见问题中的这个 API

result = app.AsyncResult(task_id)

This works fine.

  • 首先,在你的芹菜应用程序中:

Vi my _ celery _ apps/app1.py

app = Celery(worker_name)
  • 然后,切换到任务文件,从芹菜应用程序模块导入应用程序。

Vi task/task1.py

from my_celery_apps.app1 import app


app.AsyncResult(taskid)


try:
if task.state.lower() != "success":
return
except:
""" do something """

2020年答案:

#### tasks.py
@celery.task()
def mytask(arg1):
print(arg1)


#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
state = process.state
return f"Thanks for your patience, your job {process.task_id} \
is being processed. Status {state}"
res = method.delay()
    

print(f"id={res.id}, state={res.state}, status={res.status} ")


print(res.get())