contrib.testing.mocks

Useful mocks for unit testing.

Module Contents

Functions

TaskMessage(name, id=None, args=tuple, kwargs=dict, callbacks=None, errbacks=None, chain=None, shadow=None, utc=None, **options): Create a task message in protocol 2 format.
TaskMessage1(name, id=None, args=tuple, kwargs=dict, callbacks=None, errbacks=None, chain=None, **options): Create a task message in protocol 1 format.
task_message_from_sig(app, sig, utc=True, TaskMessage=TaskMessage): Create a task message from a celery.Signature.

TaskMessage(name, id=None, args=tuple, kwargs=dict, callbacks=None, errbacks=None, chain=None, shadow=None, utc=None, **options)

Create a task message in protocol 2 format.

TaskMessage1(name, id=None, args=tuple, kwargs=dict, callbacks=None, errbacks=None, chain=None, **options)

Create a task message in protocol 1 format.
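
The difference between the two formats can be sketched with the standard library alone. The snippet below is a rough illustration and does not reproduce this module's implementation. It assumes the layout described in Celery's message protocol documentation: protocol 2 carries task metadata in the message headers and an (args, kwargs, embed) triple in the body, while protocol 1 carries everything in a single body mapping. The helpers sketch_protocol2 and sketch_protocol1 are hypothetical names introduced only for this example.

```python
import json
from unittest.mock import Mock


def sketch_protocol2(name, task_id, args=(), kwargs=None):
    """Hypothetical stand-in: protocol 2 keeps task metadata in the headers."""
    message = Mock(name=f"sketch-v2-{task_id}")
    message.headers = {"id": task_id, "task": name}
    # Assumed layout: the body is an (args, kwargs, embed) triple.
    embed = {"callbacks": None, "errbacks": None, "chain": None}
    message.body = json.dumps([list(args), kwargs or {}, embed])
    return message


def sketch_protocol1(name, task_id, args=(), kwargs=None):
    """Hypothetical stand-in: protocol 1 keeps everything in the body."""
    message = Mock(name=f"sketch-v1-{task_id}")
    message.headers = {}
    # Assumed layout: the body is one mapping that includes the task name and id.
    message.body = json.dumps(
        {"task": name, "id": task_id, "args": list(args), "kwargs": kwargs or {}}
    )
    return message


v2 = sketch_protocol2("tasks.add", "abc123", args=(2, 2))
v1 = sketch_protocol1("tasks.add", "abc123", args=(2, 2))
```

A test that inspects v2.headers["task"] for the task name would have to look inside the decoded body of v1 instead, which is why the module offers one factory per protocol version.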

task_message_from_sig(app, sig, utc=True, TaskMessage=TaskMessage)

Create a task message from a celery.Signature.

Example:
>>> m = task_message_from_sig(app, add.s(2, 2))
>>> amqp_client.basic_publish(m, exchange='ex', routing_key='rkey')