如何对芹菜任务进行单元测试?

芹菜文档 提到在姜戈境内测试芹菜没有解释如何在不使用 Django 的情况下测试芹菜任务。你是怎么做到的?

77941 次浏览

取决于你到底想测试什么。

  • 直接测试任务代码。不要调用“ task.late (...)”,只要从单元测试中调用“ task (...)”即可。
  • 使用 芹菜,永远,渴望。这将导致您的任务在您说“ task.late (...)”时立即被调用,因此您可以测试整个路径(但不是任何异步行为)。

可以使用任何单元测试库同步测试任务。在处理芹菜任务时,我通常会做两个不同的测试。第一个(正如我在下面建议的那样)是完全同步的,应该是确保算法完成其应该完成的任务的那个。第二个会话使用整个系统(包括代理) ,并确保我没有出现序列化问题或任何其他分发、通信问题。

所以:

from celery import Celery


celery = Celery()


@celery.task
def add(x, y):
return x + y

还有你的测试:

from nose.tools import eq_


def test_add_task():
rst = add.apply(args=(4, 4)).get()
eq_(rst, 8)

希望能帮上忙!

以下是我七年前的答案:

您可以通过 pytest fixture在单独的线程中运行 worker:

Https://docs.celeryq.dev/en/v5.2.6/userguide/testing.html#celery-worker-embed-live-worker

根据文件,你不应该使用 "always_eager"(见上面链接页面的顶部)。


老答案:

我用这个:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
...

医生: https://docs.celeryq.dev/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER允许您同步运行任务,并且不需要芹菜服务器。

至于 芹菜3.0,在 姜戈中设置 CELERY_ALWAYS_EAGER的一种方法是:

from django.test import TestCase, override_settings


from .foo import foo_celery_task


class MyTest(TestCase):


@override_settings(CELERY_ALWAYS_EAGER=True)
def test_foo(self):
self.assertTrue(foo_celery_task.delay())

团结一致

import unittest


from myproject.myapp import celeryapp


class TestMyCeleryWorker(unittest.TestCase):


def setUp(self):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

测试装置

# conftest.py
from myproject.myapp import celeryapp


@pytest.fixture(scope='module')
def celery_app(request):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
return celeryapp


# test_tasks.py
def test_some_task(celery_app):
...

附录: 使 send _ task 尊重渴望

from celery import current_app


def send_task(name, args=(), kwargs={}, **opts):
# https://github.com/celery/celery/issues/581
task = current_app.tasks[name]
return task.apply(args, kwargs, **opts)


current_app.send_task = send_task

对于卖芹菜4的人来说,它是:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

由于设置名称已更改,如果选择升级,则需要更新,请参见

Https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names

在我的例子中(我假设还有许多其他的例子) ,我所需要的只是使用 pytest 测试任务的内部逻辑。

结果嘲笑了一切(选择二)


示例用例 :

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
return a+b;

tests/test_tasks.py

from proj import add_task


def test_add():
assert add_task(1, 2) == 3, '1 + 2 should equal 3'

但是,因为 shared_task装饰器做了很多芹菜内部逻辑,所以它不是一个真正的单元测试。

所以,对我来说,有两个选择:

选项1: 单独的内部逻辑

proj/tasks_logic.py

def internal_add(a, b):
return a + b;

proj/tasks.py

from .tasks_logic import internal_add


@shared_task(bind=True)
def add_task(self, a, b):
return internal_add(a, b);

这看起来非常奇怪,而且除了降低可读性之外,它还需要手动提取和传递属性,这些属性是请求的一部分,例如,如果您需要 task_id,则需要手动提取和传递属性,从而降低逻辑的纯度。

选项2: 模拟
嘲笑芹菜的内在

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task


from mock import patch




def mock_signature(**kwargs):
return {}




def mocked_shared_task(*decorator_args, **decorator_kwargs):
def mocked_shared_decorator(func):
func.signature = func.si = func.s = mock_signature
return func


return mocked_shared_decorator


patch('celery.shared_task', mocked_shared_task).start()

然后,它允许我模拟请求对象(同样,如果您需要请求中的内容,比如 id 或重试计数器,也可以使用它)。

tests/test_tasks.py

from proj import add_task


class MockedRequest:
def __init__(self, id=None):
self.id = id or 1




class MockedTask:
def __init__(self, id=None):
self.request = MockedRequest(id=id)




def test_add():
mocked_task = MockedTask(id=3)
assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

这个解决方案是更多的手动,但是,它给我的控制,我需要实际 单位测试,没有重复自己,没有失去芹菜范围。

因为芹菜 V4.0,py.test fixture 是 提供启动芹菜工人只是为了测试,并关闭时,完成:

def test_myfunc_is_executed(celery_session_worker):
# celery_session_worker: <Worker: gen93553@mymachine.local (running)>
assert myfunc.delay().wait(3)

http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test描述的其他装置中,你可以通过重新定义 celery_config装置来更改芹菜的默认选项:

@pytest.fixture(scope='session')
def celery_config():
return {
'accept_content': ['json', 'pickle'],
'result_serializer': 'pickle',
}

默认情况下,测试工作者使用内存中的代理和结果后端。如果没有测试特定的特性,就不需要使用本地 Redis 或 RabbitMQ。

参考文献 使用 pytest。

def test_add(celery_worker):
mytask.delay()

如果您使用烧瓶,设置应用程序配置

    CELERY_BROKER_URL = 'memory://'
CELERY_RESULT_BACKEND = 'cache+memory://'

以及 conftest.py

@pytest.fixture
def app():
yield app   # Your actual Flask application


@pytest.fixture
def celery_app(app):
from celery.contrib.testing import tasks   # need it
yield celery_app    # Your actual Flask-Celery application

我在单元测试方法中看到很多 CELERY_ALWAYS_EAGER = true作为单元测试的一种解决方案,但是由于5.0.5版本已经可用,有很多变化使得大多数旧的答案不再适用,对我来说这是一个浪费时间的废话,所以对于在这里搜索解决方案的每个人来说,去文档并阅读新版本中有详细文档记录的单元测试例子:

Https://docs.celeryproject.org/en/stable/userguide/testing.html

至于带有单元测试的渴望模式,这里引用了实际文档中的一段话:

急切模式

Task _ always _  临时设置启用的临时模式为 定义不适合单元测试。

当使用急切模式进行测试时,您只是在测试一个什么的模拟 发生在工人身上,这两者之间有很多不一致的地方 模仿和现实中发生的事情。

另一种选择是,如果不需要运行任务的副作用,则模拟该任务。

from unittest import mock




@mock.patch('module.module.task')
def test_name(self, mock_task): ...