接收未注册类型的任务(运行示例)

我正在从芹菜的文档中运行 例子

我跑: celeryd --loglevel=INFO

/usr/local/lib/python2.7/dist-packages/celery/loaders/default.py:64: NotConfigured: No 'celeryconfig' module found! Please make sure it exists and is available to Python.
"is available to Python." % (configname, )))
[2012-03-19 04:26:34,899: WARNING/MainProcess]


-------------- celery@ubuntu v2.5.1
---- **** -----
--- * ***  * -- [Configuration]
-- * - **** ---   . broker:      amqp://guest@localhost:5672//
- ** ----------   . loader:      celery.loaders.default.Loader
- ** ----------   . logfile:     [stderr]@INFO
- ** ----------   . concurrency: 4
- ** ----------   . events:      OFF
- *** --- * ---   . beat:        OFF
-- ******* ----
--- ***** ----- [Queues]
--------------   . celery:      exchange:celery (direct) binding:celery

Py:

# -*- coding: utf-8 -*-
from celery.task import task


@task
def add(x, y):
return x + y

Run _ task. py:

# -*- coding: utf-8 -*-
from tasks import add
result = add.delay(4, 4)
print (result)
print (result.ready())
print (result.get())

在同一个文件夹 celeryconfig.py 中:

CELERY_IMPORTS = ("tasks", )
CELERY_RESULT_BACKEND = "amqp"
BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_RESULT_EXPIRES = 300

当我运行“ run _ task.py”:

在巨蟒控制台上

eb503f77-b5fc-44e2-ac0b-91ce6ddbf153
False

芹菜服务器上的错误

[2012-03-19 04:34:14,913: ERROR/MainProcess] Received unregistered task of type 'tasks.add'.
The message has been ignored and discarded.


Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.


The full contents of the message body was:
{'retries': 0, 'task': 'tasks.add', 'utc': False, 'args': (4, 4), 'expires': None, 'eta': None, 'kwargs': {}, 'id': '841bc21f-8124-436b-92f1-e3b62cafdfe7'}


Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer.py", line 444, in receive_message
self.strategies[name](message, body, message.ack_log_error)
KeyError: 'tasks.add'

请解释一下有什么问题。

141406 次浏览

您可以在 celery.registry.TaskRegistry类中看到当前已注册任务的列表。可能是你的芹菜配置(工作目录)不在 PYTHONPATH中,所以芹菜找不到它,回到默认值。只要在开始芹菜时明确地指定它。

celeryd --loglevel=INFO --settings=celeryconfig

您也可以设置 --loglevel=DEBUG,您应该可以立即看到问题。

我认为您需要重新启动工作服务器。我遇到了同样的问题,并通过重新启动来解决它。

我也有同样的问题: "Received unregistered task of type.."的原因是 celeryd service 在服务启动时没有找到并注册任务(顺便说一下,当您启动时,它们的列表是可见的 ./manage.py celeryd --loglevel=info).

这些任务应该在设置文件的 CELERY_IMPORTS = ("tasks", )中声明。
如果你有一个特殊的 celery_settings.py文件,它必须声明在芹菜服务启动作为 --settings=celery_settings.py数字吸血鬼写。

我也有同样的问题

CELERY_IMPORTS=("mytasks")

在我的 celeryconfig.py文件解决它。

如果您遇到这种错误,可能有许多原因,但我发现的解决方案是,我在/etc/default/celeryd 中的 celeryd 配置文件配置为标准使用,而不是我的特定 django 项目。当我把它转换成 芹菜文档中指定的格式时,一切都很好。

使用——设置对我来说不起作用:

celery --config=celeryconfig --loglevel=INFO

下面是添加了 CELERY _ IMPORTS 的 celeryconfig 文件:

# Celery configuration file
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'


CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'America/Los_Angeles'
CELERY_ENABLE_UTC = True


CELERY_IMPORTS = ("tasks",)

我的设置有点棘手,因为我使用主管来启动芹菜作为守护进程。

对我来说,通过确保包含任务的应用程序包含在 django 的 INSTALLED _ APPS 设置下,这个错误得到了解决。

将这一行添加到/etc/default/celeryd 的解决方案

CELERYD_OPTS="-A tasks"

因为当我运行这些命令时:

celery worker --loglevel=INFO
celery worker -A tasks --loglevel=INFO

只有后一个命令完全显示任务名称。

我还尝试过添加 CELERY _ APP 行/etc/default/celeryd,但也没有用。

CELERY_APP="tasks"

无论您使用 CELERY_IMPORTS还是 autodiscover_tasks,重要的一点是能够找到任务,并且在芹菜中注册的任务名称应该与工人试图获取的名称相匹配。

当您启动芹菜,比如说 celery worker -A project --loglevel=DEBUG,您应该看到任务的名称。例如,如果我的 celery.py中有一个 debug_task任务。

[tasks]
. project.celery.debug_task
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap

如果您在列表中看不到您的任务,请检查您的芹菜配置正确导入任务,无论是在 --setting--configceleryconfigconfig_from_object

如果您正在使用芹菜拍,请确保 CELERYBEAT_SCHEDULE中使用的任务名称 task与芹菜任务列表中的名称相匹配。

我也遇到过这个问题,但是不完全一样,所以仅供参考。由于此修饰符语法,最近的升级会导致此错误消息。

ERROR/MainProcess] Received unregistered task of type 'my_server_check'.

@task('my_server_check')

不得不改变

@task()

不知道为什么。

我在 django-celery 中遇到了类  件件的问题,而在每次执行触发芹菜工人时,它们的名字都显示得很好:

键错误: u‘ my _ app. tasks.run’

我的任务是一个名为“ CleanUp”的类,而不仅仅是一个名为“ run”的方法。

当我检查表“ djcelery _ 遗传”我看到过期的条目,并删除他们修复的问题。

当我在 Django 应用程序中添加了一些信号处理功能时,这个问题莫名其妙地出现了。在这样做时,我将应用程序转换为使用 AppConfig,这意味着它读取的不是 INSTALLED_APPS中简单的 'booking’,而是 'booking.app.BookingConfig'

芹菜不明白这意味着什么,所以我添加,INSTALLED_APPS_WITH_APPCONFIGS = ('booking',)到我的 django 设置,并修改我的 celery.py

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

app.autodiscover_tasks(
lambda: settings.INSTALLED_APPS + settings.INSTALLED_APPS_WITH_APPCONFIGS
)

我发现我们的一个程序员在其中一个导入中添加了以下代码行:

os.chdir(<path_to_a_local_folder>)

这导致芹菜工人将其工作目录从项目的默认工作目录(可以在其中找到任务)更改为另一个目录(无法在其中找到任务)。

删除此行代码后,将找到并注册所有任务。

我在处理芹菜节奏的任务时也遇到了同样的问题。芹菜不喜欢相对导入,所以在我的 celeryconfig.py中,我必须显式地设置完整的包名:

app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'full.path.to.add',
'schedule': 30.0,
'args': (16, 16)
},
}

为了补充一下我对这个错误的看法。

我的路径是 /vagrant/devops/test,其中包含 app.py__init__.py

当我运行 cd /vagrant/devops/ && celery worker -A test.app.celery --loglevel=info时,我得到了这个错误。

但是当我像 cd /vagrant/devops/test && celery worker -A app.celery --loglevel=info一样运行它时,一切都没问题。

芹菜不支持相对导入,所以在我的 celeryconfig.py 中,需要绝对导入。

CELERYBEAT_SCHEDULE = {
'add_num': {
'task': 'app.tasks.add_num.add_nums',
'schedule': timedelta(seconds=10),
'args': (1, 2)
}
}

一个真正有用的列表的附加项目。

我发现芹菜对于任务中的错误是无情的(或者至少我还没有能够追踪到适当的日志条目) ,而且它不会注册它们。在运行芹菜作为服务时,我遇到了许多问题,主要是与权限相关的问题。

与写入日志文件的权限有关的最新信息。我在开发或在命令行运行芹菜方面没有任何问题,但是服务报告任务为未注册。

我需要更改日志文件夹权限,以使服务能够对其进行写操作。

我的两分钱

我用阿尔卑斯山拍的码头照片。Django 设置引用 /dev/log来记录 syslog。Django 应用程序和芹菜工人都是基于同一个图像。Django 应用程序映像的入口点在启动时启动 syslogd,但芹菜工人的入口点没有启动。这会导致像 ./manage.py shell这样的事情失败,因为不会有任何 /dev/log。芹菜工人没有失败。相反,它默默地忽略了应用程序启动的其余部分,其中包括从 django 项目中的应用程序加载 shared_task条目

我与 姜戈没有任何问题。但是我在使用 酒瓶时遇到了这个问题。解决方案是设置配置选项。

celery worker -A app.celery --loglevel=DEBUG --config=settings

和姜戈在一起的时候,我只有:

python manage.py celery worker -c 2 --loglevel=info

在我的例子中,错误是因为一个容器在一个文件夹中创建了文件,这些文件用 docker-compose 挂载在主机文件系统上。

我只需要删除主机系统上容器创建的文件,就可以再次启动我的项目了。

Sudo rm-Rf 文件夹名

(我必须使用 sudo,因为这些文件属于 root 用户)

Docker 版本: 18.03.1

如果您使用 autodiscover_tasks,请确保您要注册的 functions保留在 tasks.py中,而不是其他任何文件。或者芹菜找不到你想注册的 functions

使用 app.register_task也会做这项工作,但似乎有点天真。

请参阅 autodiscover_tasks的正式规格。

def autodiscover_tasks(self, packages=None, related_name='tasks', force=False):
"""Auto-discover task modules.


Searches a list of packages for a "tasks.py" module (or use
related_name argument).


If the name is empty, this will be delegated to fix-ups (e.g., Django).


For example if you have a directory layout like this:


.. code-block:: text


foo/__init__.py
tasks.py
models.py


bar/__init__.py
tasks.py
models.py


baz/__init__.py
models.py


Then calling ``app.autodiscover_tasks(['foo', bar', 'baz'])`` will
result in the modules ``foo.tasks`` and ``bar.tasks`` being imported.


Arguments:
packages (List[str]): List of packages to search.
This argument may also be a callable, in which case the
value returned is used (for lazy evaluation).
related_name (str): The name of the module to find.  Defaults
to "tasks": meaning "look for 'module.tasks' for every
module in ``packages``."
force (bool): By default this call is lazy so that the actual
auto-discovery won't happen until an application imports
the default modules.  Forcing will cause the auto-discovery
to happen immediately.
"""

对我来说,有效的方法是给芹菜任务装饰器添加明确的名称。我将任务声明从 @app.tasks更改为 @app.tasks(name='module.submodule.task')

这里有一个例子

一开始我的任务是:

# tasks/test_tasks.py
@celery.task
def test_task():
print("Celery Task  !!!!")

我改成了:

# tasks/test_tasks.py
@celery.task(name='tasks.test_tasks.test_task')
def test_task():
print("Celery Task  !!!!")

如果没有专用的 tasks.py 文件将其包含在芹菜配置中,则此方法非常有用。

app = Celery('proj',
broker='amqp://',
backend='amqp://',
include=['proj.tasks'])

请包括 = [‘ proj.asks’] 您需要转到 top 目录,然后执行此操作

celery -A app.celery_module.celeryapp worker --loglevel=info

没有

celery -A celeryapp worker --loglevel=info

在 celeryconfig.py 输入导入 = (“ path.path.asks”,)

请在其他模块中调用任务!

奇怪的是,这也可能是因为缺少一个软件包。运行 pip 来安装所有必要的软件包: pip install -r requirements.txt

autodiscover_tasks没有接收使用丢失包的任务。

写入文件任务的正确路径

app.conf.beat_schedule = {
'send-task': {
'task': 'appdir.tasks.testapp',
'schedule': crontab(minute='*/5'),
},

}

当运行芹菜与“芹菜-一个 conf 工作者-l 信息”命令所有的任务都得到了像我一样的日志列表 任务 我得到这个错误是因为我没有给出确切的任务路径。 因此,请通过复制和粘贴确切的任务 ID 来重新检查这一点。

app = Celery(__name__, broker=app.config['CELERY_BROKER'],
backend=app.config['CELERY_BACKEND'], include=['util.xxxx', 'util.yyyy'])

您的问题的答案在您在问题中提供的输出的第一行: < code >/usr/local/lib/python2.7/dist-packages/celery/loader/default.py: 64: NotConfig: 没有找到‘ celeryconfig’模块!请确保它存在,并且 Python 可以使用它。 (conigname,)) 。如果没有正确的配置,Celery 将无法执行任何操作

它找不到 celeryconfig 的原因很可能不在您的 PYTHONPATH 中。

我已经解决了我的问题,我的’任务’是在一个名为’芹菜 _ 任务’的 Python 包,当我退出这个包,并运行命令 celery worker -A celery_task.task --loglevel=info。有用。

如果你像下面这样在已安装的应用程序中使用应用程序配置:

LOCAL_APPS = [
'apps.myapp.apps.MyAppConfig']

然后在你的配置应用程序中,像这样以 ready 方法导入任务:

from django.apps import AppConfig


class MyAppConfig(AppConfig):
name = 'apps.myapp'


def ready(self):
try:
import apps.myapp.signals  # noqa F401
import apps.myapp.tasks
except ImportError:
pass

正如其他一些答案已经指出的那样,芹菜默默忽略任务的原因有很多,包括依赖性问题,以及任何语法或代码问题。

找到它们的一个快速方法是逃跑:

./manage.py check

很多时候,在修复了报告的错误之后,任务被芹菜识别出来。

是否包含了 tasks.py 文件或者异步方法的存储位置?

app = Celery('APP_NAME', broker='redis://redis:6379/0', include=['app1.tasks', 'app2.tasks', ...])

尝试在 Python Shell 中导入芹菜任务-芹菜可能由于导入语句错误而无法注册任务。

我的 tasks.py 文件中有一个 ImportError异常,导致芹菜没有注册模块中的任务。正确注册了所有其他模块任务。

直到我尝试在 Python Shell 中导入芹菜任务时,这个错误才显而易见。我修复了错误的导入语句,然后成功地注册了任务。

这解决了我的问题(把它放在 create _ app ()函数中) :

celery.conf.update(app.config)


class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)


celery.Task = ContextTask

在我的案例中,问题是,我的项目没有正确地选择 autodiscover_tasks

celery.py文件中,获取 autodiscover_tasks的代码是:

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

我把它改成了下面这个:

from django.apps import apps
app.autodiscover_tasks(lambda: [n.name for n in apps.get_app_configs()])

祝你好运。

如果你正在使用 Docker,就像我说的那样,@给你会杀死你的痛苦。

docker stop $(docker ps -a -q)

对我来说,错误的任务名称已经被芹菜节拍坚持了下来... ... 对我来说还足够早,以至于我可以用核武器摧毁一切。

当我使用 python app.py运行服务器时,Flask 也出现了同样的错误。为了解决这个问题,我使用 flask run运行服务器

搜索了一整天后,终于在清理了.pyc 文件后开始工作

py3clean .

对我来说,重启经纪人(Redis)解决了这个问题。


任务已经正确显示在芹菜的任务列表和所有相关的 Django 设置和导入工作正常。

在我写任务之前,我的经纪人就已经在运行了,仅仅重启芹菜和姜戈并不能解决问题。

然而,使用 Ctrl + C 停止 Redis,然后使用 redis-server重新启动 Redis 可以帮助芹菜正确识别任务。