This document describes the current stable version of Celery (5.0). For development docs, go here.
celery.contrib.testing.mocks¶
API Reference¶
Useful mocks for unit testing.
-
celery.contrib.testing.mocks.TaskMessage(name: str, id: str = None, args: Sequence = (), kwargs: Mapping = None, callbacks: Sequence[Signature] = None, errbacks: Sequence[Signature] = None, chain: Sequence[Signature] = None, shadow: str = None, utc: bool = None, **options: Any) → Any[source]¶ Create task message in protocol 2 format.
-
celery.contrib.testing.mocks.TaskMessage1(name: str, id: str = None, args: Sequence = (), kwargs: Mapping = None, callbacks: Sequence[Signature] = None, errbacks: Sequence[Signature] = None, chain: Squence[Signature] = None, **options: Any) → Any[source]¶ Create task message in protocol 1 format.
-
celery.contrib.testing.mocks.task_message_from_sig(app: Celery, sig: Signature, utc: bool = True, TaskMessage: Any = <function TaskMessage>) → Any[source]¶ Create task message from
celery.Signature.Example
>>> m = task_message_from_sig(app, add.s(2, 2)) >>> amqp_client.basic_publish(m, exchange='ex', routing_key='rkey')