Retry Celery tasks with exponential back-off

For a task like this:

from celery.decorators import task


@task()
def add(x, y):
    if not x or not y:
        raise Exception("test error")
    return self.wait_until_server_responds(

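If it raises an exception, I want to retry it from the daemon side. How can I apply an exponential back-off algorithm, i.e. retry after 2^2, 2^3, 2^4, etc. seconds?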

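Also, is the retry maintained on the server side, so that if the worker happens to get killed, the next worker that spawns will pick up the retry task?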


The task.request.retries attribute contains the number of tries so far, so you can use this to implement exponential back-off:

from celery.task import task


@task(bind=True, max_retries=3)
def update_status(self, auth, status):
    try:
        Twitter(auth).update_status(status)
    except Twitter.WhaleFail as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
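
To make the resulting schedule concrete, here is a minimal sketch in plain Python (no Celery needed) of the countdown values that an expression like `2 ** self.request.retries` produces. The helper name `backoff_schedule` and its `offset` parameter are hypothetical, introduced only for illustration.

```python
def backoff_schedule(max_retries, base=2, offset=0):
    """Return the countdown (in seconds) used before each retry.

    Mirrors `countdown=base ** (self.request.retries + offset)`, where the
    retry counter is 0 when the first retry is scheduled.
    """
    return [base ** (retries + offset) for retries in range(max_retries)]


# Schedule for the task above (max_retries=3, no offset).
print(backoff_schedule(3))

# Schedule starting at 2^2 seconds, as asked in the question.
print(backoff_schedule(3, offset=2))
```

With `max_retries=3` the waits come out as 1, 2 and 4 seconds. To get the 2^2, 2^3, 2^4 sequence from the question, the exponent would need an offset, for example `countdown=2 ** (self.request.retries + 2)`.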

To prevent a Thundering Herd Problem, you may consider adding a random jitter to your exponential backoff:

import random
self.retry(exc=exc, countdown=int(random.uniform(2, 4) ** self.request.retries))
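
As a rough illustration of what that jittered expression yields, the sketch below computes it outside of Celery. The function name `jittered_countdown` and the `cap` parameter are assumptions added here (the one-liner above has no upper bound), not part of any Celery API.

```python
import random


def jittered_countdown(retries, low=2.0, high=4.0, cap=600):
    """Countdown in whole seconds for the given retry count.

    The base is drawn uniformly from [low, high], so tasks that fail at
    the same moment retry at different times. `cap` (an assumption, not
    in the original one-liner) bounds the wait for high retry counts.
    """
    base = random.uniform(low, high)
    return min(cap, int(base ** retries))


for retries in range(5):
    print(retries, jittered_countdown(retries))
```

Note that for `retries == 0` the result is always 1, since any base raised to the power 0 is 1, so the jitter only takes effect from the second retry onward.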

As of Celery 4.2 you can configure your tasks to use an exponential backoff automatically: http://docs.celeryproject.org/en/master/userguide/tasks.html#automatic-retry-for-known-exceptions

@app.task(autoretry_for=(Exception,), retry_backoff=2)
def add(x, y):
    ...

(This was already in the docs for Celery 4.1 but actually wasn't released then, see merge request)
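
For a feel of what `retry_backoff=2` means in practice, here is a plain-Python sketch of the delays, based on my reading of the Celery documentation: a numeric `retry_backoff` acts as a delay factor, and `retry_backoff_max` (600 seconds by default) caps the wait. The function name `autoretry_delays` is hypothetical, and the sketch assumes jitter is switched off (`retry_jitter=False`). With jitter on, which I believe is the default, the actual delays are randomized below these values.

```python
def autoretry_delays(retry_backoff, max_retries, retry_backoff_max=600):
    """Delays in seconds before each retry, assuming no jitter.

    Assumed formula: min(retry_backoff_max, retry_backoff * 2 ** retries),
    with the retry counter starting at 0.
    """
    return [
        min(retry_backoff_max, retry_backoff * 2 ** retries)
        for retries in range(max_retries)
    ]


# Factor 2, five retries, default cap.
print(autoretry_delays(2, 5))

# Same schedule with a 10 second cap.
print(autoretry_delays(2, 5, retry_backoff_max=10))
```

Under these assumptions the first call gives 2, 4, 8, 16 and 32 seconds, and the capped call levels off at 10 seconds from the fourth retry.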

FYI, Celery has a utility function, get_exponential_backoff_interval in celery.utils.time, that calculates the exponential backoff time with optional jitter, so you don't need to write your own.

import random


def get_exponential_backoff_interval(
    factor,
    retries,
    maximum,
    full_jitter=False
):
    """Calculate the exponential backoff wait time."""
    # Will be zero if factor equals 0
    countdown = min(maximum, factor * (2 ** retries))
    # Full jitter according to
    # https://www.awsarchitectureblog.com/2015/03/backoff.html
    if full_jitter:
        countdown = random.randrange(countdown + 1)
    # Adjust according to maximum wait time and account for negative values.
    return max(0, countdown)
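
A short usage sketch of the same arithmetic follows. So that it runs without Celery installed, the function is repeated in condensed form under the hypothetical name `backoff_interval`. The argument values are arbitrary examples.

```python
import random


def backoff_interval(factor, retries, maximum, full_jitter=False):
    """Condensed copy of the helper above, so this runs without Celery."""
    countdown = min(maximum, factor * (2 ** retries))
    if full_jitter:
        # Pick a random integer from 0 up to and including countdown.
        countdown = random.randrange(countdown + 1)
    return max(0, countdown)


# Without jitter the wait doubles each time until it reaches `maximum`.
print([backoff_interval(2, r, maximum=30) for r in range(6)])

# With full jitter each wait is a random integer between 0 and that value.
samples = [
    backoff_interval(2, 3, maximum=30, full_jitter=True) for _ in range(1000)
]
print(min(samples), max(samples))
```

Without jitter the waits are 2, 4, 8, 16, 30 and 30 seconds. With full jitter and `retries=3`, every sample falls between 0 and 16 seconds, which spreads out retries from tasks that failed at the same time.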