在芹菜中检索队列中的任务列表

如何检索队列中尚未处理的任务列表?

212181 次浏览

我认为获取正在等待的任务的唯一方法是保留一个已启动任务的列表,并让任务在启动时将自己从列表中删除。

通过rabbitmqctl和list_queues,你可以大致了解有多少任务正在等待,而不是任务本身:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

如果你想要的包括正在处理的任务,但还没有完成,你可以保留一个任务列表,并检查它们的状态:

from tasks import add
result = add.delay(4, 4)


result.ready() # True if finished

或者让芹菜使用CELERY_RESULT_BACKEND存储结果,并检查哪些任务不在其中。

EDIT:查看获取队列中任务列表的其他答案。

你应该看这里: 芹菜指南-检查工人 < / p >

基本上是这样的:

my_app = Celery(...)


# Inspect all nodes.
i = my_app.control.inspect()


# Show the items that have an ETA or are scheduled for later processing
i.scheduled()


# Show tasks that are currently active.
i.active()


# Show tasks that have been claimed by workers
i.reserved()

这取决于你想要什么

要从后端检索任务,使用这个

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)

如果你正在使用rabbitMQ,在终端中使用这个:

sudo rabbitmqctl list_queues

它将打印带有挂起任务数量的队列列表。例如:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
celerey_mail_worker@torob2.celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

右边列的数字是队列中的任务数。在上面,芹菜队列有166个待处理的任务。

芹菜检查模块似乎只知道从工作人员的角度来看的任务。如果你想查看队列中的消息(还没有被工人拉出来),我建议使用pyrabbit,它可以与rabbitmq http api接口,从队列中检索各种信息。

这里有一个例子: 使用芹菜检索队列长度(RabbitMQ, Django) < / p >

如果你不使用优先级任务,如果你使用Redis,这实际上是很简单。获取任务计数:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

但是,优先级任务在redis中使用不同的键,所以整个情况稍微复杂一些。总的来说,您需要为任务的每个优先级查询redis。在python中(以及在Flower项目中),它看起来像:

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]




def make_queue_name_for_pri(queue, pri):
"""Make a queue name for redis


Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:


- batch1\x06\x163 <-- P3 queue named batch1


There's more information about this in Github, but it doesn't look like it
will change any time soon:


- https://github.com/celery/kombu/issues/422


In that ticket the code below, from the Flower project, is referenced:


- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135


:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""
if pri not in DEFAULT_PRIORITY_STEPS:
raise ValueError('Priority not in priority steps')
return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
(queue, '', '')))




def get_queue_length(queue_name='celery'):
"""Get the number of tasks in a celery queue.


:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
return sum([r.llen(x) for x in priority_names])

如果你想要获得一个实际的任务,你可以使用以下方法:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

从那里,您必须反序列化返回的列表。以我为例,我可以通过以下方法来实现:

r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))

请注意,反序列化可能需要一些时间,您需要调整上面的命令以处理不同的优先级。

我得出的结论是,获得队列上的作业数的最佳方法是使用rabbitmqctl,正如在这里多次建议的那样。为了允许任何选定的用户使用sudo运行命令,我遵循了指令在这里(我跳过了编辑配置文件部分,因为我不介意在命令之前键入sudo)。

我还抓取了jamesc的grepcut代码片段,并将其包装在子进程调用中。

from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))

据我所知,芹菜没有提供API来检查队列中等待的任务。这是特定于代理的。例如,如果你使用Redis作为代理,那么检查正在celery(默认)队列中等待的任务就像这样简单:

  1. 连接到代理
  2. celery列表中列出项目(以LRANGE命令为例)

请记住,这些任务等待可用的员工来挑选。您的集群可能有一些正在运行的任务——这些任务不会在这个列表中,因为它们已经被选中了。

检索特定队列中的任务的过程是特定于代理的。

Redis json序列化的复制粘贴解决方案:

def get_celery_queue_items(queue_name):
import base64
import json


# Get a configured instance of a celery app:
from yourproject.celery import app as celery_app


with celery_app.pool.acquire(block=True) as conn:
tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
decoded_tasks = []


for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)


return decoded_tasks

它与Django一起工作。只是别忘了改变yourproject.celery

如果你控制了任务的代码,那么你可以通过让任务在第一次执行时触发一个微不足道的重试来解决这个问题,然后检查inspect().reserved()。重试将任务注册到结果后端,芹菜可以看到这一点。任务必须接受selfcontext作为第一个参数,这样我们才能访问重试计数。

@task(bind=True)
def mytask(self):
if self.request.retries == 0:
raise self.retry(exc=MyTrivialError(), countdown=1)
...

这个解决方案与代理无关。你不必担心你是用RabbitMQ还是Redis来存储任务。

编辑:经过测试,我发现这只是一个部分的解决方案。预留的大小受限于worker的预取设置。

如果你正在使用芹菜+ Django最简单的方法来检查任务,使用命令直接从你的终端在你的虚拟环境或使用完整路径芹菜:

医生: http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled

另外,如果你正在使用芹菜+ RabbitMQ,你可以使用以下命令检查队列列表:

更多信息: https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues
from celery.task.control import inspect
def key_in_list(k, l):
return bool([True for i in l if k in i.values()])


def check_task(task_id):
task_value_dict = inspect().active().values()
for task_list in task_value_dict:
if self.key_in_list(task_id, task_list):
return True
return False

subprocess.run:

import subprocess
import re
active_process_txt = subprocess.run(['celery', '-A', 'my_proj', 'inspect', 'active'],
stdout=subprocess.PIPE).stdout.decode('utf-8')
return len(re.findall(r'worker_pid', active_process_txt))

小心用your_proj改变my_proj

这在我的申请中很奏效:

def get_celery_queue_active_jobs(queue_name):
connection = <CELERY_APP_INSTANCE>.connection()


try:
channel = connection.channel()
name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
active_jobs = []


def dump_message(message):
active_jobs.append(message.properties['application_headers']['task'])


channel.basic_consume(queue=queue_name, callback=dump_message)


for job in range(jobs):
connection.drain_events()


return active_jobs
finally:
connection.close()

active_jobs将是一个字符串列表,对应于队列中的任务。

不要忘记将CELERY_APP_INSTANCE替换为您自己的。

感谢@ashish用他的答案在这里给我指明了正确的方向:https://stackoverflow.com/a/19465670/9843399

要获取队列上的任务数,可以使用库,下面是一个简化的示例:

from flower.utils.broker import Broker
from django.conf import settings


def get_queue_length(queue):
broker = Broker(settings.CELERY_BROKER_URL)
queues_result = broker.queues([queue])
return queues_result.result()[0]['messages']