canvas

Composing task work-flows.

Module Contents

Classes

Signature(self,task=None,args=None,kwargs=None,options=None,type=None,subtask_type=None,immutable=False,app=None,**ex) Task Signature.
_chain(self,*tasks,**options)
chain() Chain tasks together.
_basemap(self,task,it,**options)
xmap() Map operation for tasks.
xstarmap() Map operation for tasks, using star arguments.
chunks(self,task,it,n,**options) Partition of tasks in n chunks.
group(self,*tasks,**options) Creates a group of tasks to be executed in parallel.
chord(self,header,body=None,task="celery.chord",args=tuple,kwargs=dict,app=None,**options) Barrier synchronization primitive.

Functions

maybe_unroll_group(g) Unroll group with only one member.
task_name_from(task)
_upgrade(fields,sig) Used by custom signatures in .from_dict, to keep common fields.
_maybe_group(tasks,app)
signature(varies,*args,**kwargs) Create new signature.
maybe_signature(d,app=None,clone=False) Ensure obj is a signature, or None.
maybe_unroll_group(g)

Unroll group with only one member.

task_name_from(task)
_upgrade(fields, sig)

Used by custom signatures in .from_dict, to keep common fields.

class Signature(task=None, args=None, kwargs=None, options=None, type=None, subtask_type=None, immutable=False, app=None, **ex)

Task Signature.

Class that wraps the arguments and execution options for a single task invocation.

Used as the parts in a group and other constructs, or to pass tasks around as callbacks while being compatible with serializers with a strict type subset.

Signatures can also be created from tasks:

  • Using the .signature() method that has the same signature as Task.apply_async:

    >>> add.signature(args=(1,), kwargs={'kw': 2}, options={})
    
  • or the .s() shortcut that works for star arguments:

    >>> add.s(1, kw=2)
    
  • the .s() shortcut does not allow you to specify execution options, but there’s a chaining .set() method that returns the signature:

    >>> add.s(2, 2).set(countdown=10).set(expires=30).delay()
    
Note:
You should use signature() to create new signatures. The Signature class is the type returned by that function and should be used for isinstance checks for signatures.
See Also:
guide-canvas for the complete guide.
Arguments:
task (Task, str): Either a task class/instance, or the name of a task.
args (Tuple): Positional arguments to apply.
kwargs (Dict): Keyword arguments to apply.
options (Dict): Additional options to Task.apply_async().
Note:

If the first argument is a dict, the other arguments will be ignored and the values in the dict will be used instead:

>>> s = signature('tasks.add', args=(2, 2))
>>> signature(s)
{'task': 'tasks.add', 'args': (2, 2), 'kwargs': {}, 'options': {}}
register_type(name=None)
from_dict(d, app=None)
__init__(task=None, args=None, kwargs=None, options=None, type=None, subtask_type=None, immutable=False, app=None, **ex)
__call__(*partial_args, **partial_kwargs)

Call the task directly (in the current process).

delay(*partial_args, **partial_kwargs)

Shortcut to apply_async() using star arguments.

apply(args=tuple, kwargs=dict, **options)

Call task locally.

Same as apply_async(), but executes the task inline instead of sending a task message.

apply_async(args=tuple, kwargs=dict, route_name=None, **options)

Apply this task asynchronously.

Arguments:

args (Tuple): Partial args to be prepended to the existing args.
kwargs (Dict): Partial kwargs to be merged with existing kwargs.
options (Dict): Partial options to be merged with existing options.
Returns:
AsyncResult: promise of future evaluation.
See also:
apply_async() and the guide-calling guide.
_merge(args=tuple, kwargs=dict, options=dict, force=False)
clone(args=tuple, kwargs=dict, **opts)

Create a copy of this signature.

Arguments:

args (Tuple): Partial args to be prepended to the existing args.
kwargs (Dict): Partial kwargs to be merged with existing kwargs.
options (Dict): Partial options to be merged with existing options.
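The merge rule described for apply_async() and clone() can be sketched in plain Python. This is only an illustrative model, not Celery's implementation: the helper name merge_partial is hypothetical, and it assumes that newly supplied kwargs and options override existing keys of the same name.

```python
def merge_partial(sig_args, sig_kwargs, sig_options,
                  args=(), kwargs=None, options=None):
    """Hypothetical helper modelling the documented merge rule."""
    # Partial positional arguments are prepended to the existing ones.
    merged_args = tuple(args) + tuple(sig_args)
    # Assumption: new kwargs/options take precedence over existing keys.
    merged_kwargs = {**sig_kwargs, **(kwargs or {})}
    merged_options = {**sig_options, **(options or {})}
    return merged_args, merged_kwargs, merged_options


# Models a signature like add.s(2) with options {'countdown': 10},
# then applied with one extra positional argument and one extra option.
merged = merge_partial((2,), {}, {"countdown": 10},
                       args=(8,), options={"expires": 30})
print(merged)
```

Under this model the task would receive the positional arguments (8, 2), since the partial argument comes first.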
freeze(_id=None, group_id=None, chord=None, root_id=None, parent_id=None)

Finalize the signature by adding a concrete task id.

The task won’t be called and you shouldn’t call the signature twice after freezing it as that’ll result in two task messages using the same task id.

Returns:
AsyncResult: promise of future evaluation.
replace(args=None, kwargs=None, options=None)

Replace the args, kwargs or options set for this signature.

These are only replaced if the argument for the section is not None.

set(immutable=None, **options)

Set arbitrary execution options (same as .options.update(…)).

Returns:
Signature: This is a chaining method call (i.e., it will return self).
set_immutable(immutable)
_with_list_option(key)
append_to_list_option(key, value)
extend_list_option(key, value)

link(callback)

Add callback task to be applied if this task succeeds.

Returns:
Signature: the argument passed, for chaining or use with reduce().

link_error(errback)

Add callback task to be applied on error in task execution.

Returns:
Signature: the argument passed, for chaining or use with reduce().
on_error(errback)

Version of link_error() that supports chaining.

on_error chains the original signature, not the errback so:

>>> add.s(2, 2).on_error(errback.s()).delay()

calls the add task, not the errback task, but the reverse is true for link_error().
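The difference in return values between on_error() and link_error() can be modelled with a small stand-in class. ToySignature is hypothetical and only mimics which object each method returns; it does not send or execute any tasks.

```python
class ToySignature:
    """Hypothetical stand-in that models only the return values."""

    def __init__(self, name):
        self.name = name
        self.errbacks = []

    def link_error(self, errback):
        # Registers the errback and returns the errback itself.
        self.errbacks.append(errback)
        return errback

    def on_error(self, errback):
        # Registers the errback and returns the original signature,
        # so a following .delay() would apply to this signature.
        self.link_error(errback)
        return self


add_sig = ToySignature("add")
err_sig = ToySignature("errback")

print(add_sig.on_error(err_sig).name)    # add
print(add_sig.link_error(err_sig).name)  # errback
```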

flatten_links()

Return a recursive list of dependencies.

“unchain” if you will, but with links intact.

__or__(other)
election()
reprcall(*args, **kwargs)
__deepcopy__(memo)
__invert__()
__reduce__()
__json__()
__repr__()
items()
name()
type()
app()
AsyncResult()
_apply_async()
class _chain(*tasks, **options)
from_dict(d, app=None)
__init__(*tasks, **options)
__call__(*args, **kwargs)
clone(*args, **kwargs)
unchain_tasks()
apply_async(args=tuple, kwargs=dict, **options)
run(args=tuple, kwargs=dict, group_id=None, chord=None, task_id=None, link=None, link_error=None, publisher=None, producer=None, root_id=None, parent_id=None, app=None, **options)
freeze(_id=None, group_id=None, chord=None, root_id=None, parent_id=None)
prepare_steps(args, kwargs, tasks, root_id=None, parent_id=None, link_error=None, app=None, last_task_id=None, group_id=None, chord_body=None, clone=True, from_dict=None)
apply(args=tuple, kwargs=dict, **options)
app()
__repr__()
class chain

Chain tasks together.

Each task follows the one before it, being applied as a callback of the previous task.

Note:
If called with only one argument, then that argument must be an iterable of tasks to chain: this allows us to use generator expressions.
Example:

This is effectively ((2 + 2) + 4):

>>> res = chain(add.s(2, 2), add.s(4))()
>>> res.get()
8

Calling a chain will return the result of the last task in the chain. You can reach the other tasks by following result.parent:

>>> res.parent.get()
4

Using a generator expression:

>>> lazy_chain = chain(add.s(i) for i in range(10))
>>> res = lazy_chain(3)
Arguments:
*tasks (Signature): List of task signatures to chain.
If only one argument is passed and that argument is an iterable, then that’ll be used as the list of signatures to chain instead. This means that you can use a generator expression.
Returns:
celery.chain: A lazy signature that can be called to apply the first task in the chain. When that task succeeds, the next task in the chain is applied, and so on.
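The chaining behaviour described above can be modelled locally, without a broker, as a fold over the steps. This is a rough sketch rather than Celery code: run_chain is a hypothetical helper, and each (function, partial_args) pair stands in for a signature such as add.s(4).

```python
from functools import reduce


def add(x, y):
    return x + y


def run_chain(steps, *initial_args):
    """Hypothetical model: each result becomes the first argument of the next step."""
    first_func, first_args = steps[0]
    # Arguments given when calling the chain are prepended to the first step.
    first_result = first_func(*initial_args, *first_args)
    return reduce(
        lambda prev, step: step[0](prev, *step[1]),
        steps[1:],
        first_result,
    )


# Models chain(add.s(2, 2), add.s(4))()
print(run_chain([(add, (2, 2)), (add, (4,))]))  # 8

# Models chain(add.s(i) for i in range(10))(3)
print(run_chain([(add, (i,)) for i in range(10)], 3))  # 48
```

In the second call the initial argument 3 is combined with 0 through 9 in turn, giving 3 + 45 = 48.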
__new__(*tasks, **kwargs)
class _basemap(task, it, **options)
from_dict(d, app=None)
__init__(task, it, **options)
apply_async(args=tuple, kwargs=dict, **opts)
class xmap

Map operation for tasks.

Note:
Tasks are executed sequentially in process; this is not a parallel operation like group.
__repr__()
class xstarmap

Map operation for tasks, using star arguments.
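The difference between the two map operations can be illustrated with ordinary functions. This is a sketch of the calling convention only: xmap_model and xstarmap_model are hypothetical names, and plain functions stand in for tasks.

```python
from itertools import starmap


def double(x):
    return 2 * x


def add(x, y):
    return x + y


def xmap_model(func, it):
    # Each item is passed as a single argument: func(item).
    return [func(item) for item in it]


def xstarmap_model(func, it):
    # Each item is unpacked into positional arguments: func(*item).
    return list(starmap(func, it))


print(xmap_model(double, [1, 2, 3]))          # [2, 4, 6]
print(xstarmap_model(add, [(1, 1), (2, 2)]))  # [2, 4]
```

Both models run sequentially in the calling process, matching the note on xmap.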

__repr__()
class chunks(task, it, n, **options)

Partition of tasks in n chunks.
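As a rough illustration of partitioning, the sketch below splits an iterable of argument tuples into parts and applies a function to every tuple in each part. It assumes that n is the number of items per part, which this page does not state explicitly; split_chunks is a hypothetical helper, not the library's own.

```python
from itertools import islice


def add(x, y):
    return x + y


def split_chunks(it, n):
    """Hypothetical helper: yield lists of at most n items (assumed meaning of n)."""
    it = iter(it)
    while True:
        part = list(islice(it, n))
        if not part:
            return
        yield part


pairs = zip(range(6), range(6))
# Each part is processed with star arguments, one call per tuple.
results = [[add(*p) for p in part] for part in split_chunks(pairs, 4)]
print(results)  # [[0, 2, 4, 6], [8, 10]]
```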

from_dict(d, app=None)
__init__(task, it, n, **options)
__call__(**options)
apply_async(args=tuple, kwargs=dict, **opts)
group()
apply_chunks(task, it, n, app=None)
_maybe_group(tasks, app)
class group(*tasks, **options)

Creates a group of tasks to be executed in parallel.

A group is lazy so you must call it to take action and evaluate the group.

Note:
If only one argument is passed, and that argument is an iterable then that’ll be used as the list of tasks instead: this allows us to use group with generator expressions.
Example:
>>> lazy_group = group([add.s(2, 2), add.s(4, 4)])
>>> promise = lazy_group()  # <-- evaluate: returns lazy result.
>>> promise.get()  # <-- will wait for the task to return
[4, 8]
Arguments:
*tasks (List[Signature]): A list of signatures that this group will call. If there’s only one argument, and that argument is an iterable, then that’ll define the list of signatures instead.
**options (Any): Execution options applied to all tasks in the group.
Returns:
celery.group: A signature that, when called, calls all of the tasks in the group and returns a GroupResult instance that can be used to inspect the state of the group.
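The "call everything, then collect the results in order" behaviour can be approximated locally with a thread pool. This is only an analogy under stated assumptions: threads stand in for workers, run_group is a hypothetical helper, and nothing here uses Celery itself.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def run_group(calls):
    """Hypothetical model: run independent calls concurrently, keep submission order."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in calls]
        # Waiting on each future in order mirrors promise.get() on the group.
        return [future.result() for future in futures]


# Models group([add.s(2, 2), add.s(4, 4)])().get()
print(run_group([(add, (2, 2)), (add, (4, 4))]))  # [4, 8]
```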
from_dict(d, app=None)
__init__(*tasks, **options)
__call__(*partial_args, **options)
skew(start=1.0, stop=None, step=1.0)
apply_async(args=tuple, kwargs=None, add_to_parent=True, producer=None, link=None, link_error=None, **options)
apply(args=tuple, kwargs=dict, **options)
set_immutable(immutable)
_prepared(tasks, partial_args, group_id, root_id, app, CallableSignature=None, from_dict=None, isinstance=isinstance, tuple=tuple)
_apply_tasks(tasks, producer=None, app=None, p=None, add_to_parent=None, chord=None, args=None, kwargs=None, **options)
_freeze_gid(options)
freeze(_id=None, group_id=None, chord=None, root_id=None, parent_id=None)
_freeze_unroll(new_tasks, group_id, chord, root_id, parent_id)
__repr__()
__len__()
app()
class chord(header, body=None, task="celery.chord", args=tuple, kwargs=dict, app=None, **options)

Barrier synchronization primitive.

A chord consists of a header and a body.

The header is a group of tasks that must complete before the callback is called. A chord is essentially a callback for a group of tasks.

The body is applied with the return values of all the header tasks as a list.

Example:

The chord:

>>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())

is effectively Σ((2 + 2) + (4 + 4)):

>>> res.get()
12
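The header/body relationship can be modelled in a few lines of plain Python. This sketch is not how Celery schedules a chord: run_chord is a hypothetical helper, the header runs sequentially here, and the built-in sum stands in for sum_task.

```python
def add(x, y):
    return x + y


def run_chord(header, body):
    """Hypothetical model: the body runs only after every header call has returned."""
    # Collect the return values of all header calls as a list.
    header_results = [func(*args) for func, args in header]
    # The body is applied with that list as its argument.
    return body(header_results)


# Models chord([add.s(2, 2), add.s(4, 4)])(sum_task.s()).get()
print(run_chord([(add, (2, 2)), (add, (4, 4))], sum))  # 12
```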
from_dict(d, app=None)
_unpack_args(body=None, **kwargs)
__init__(header, body=None, task="celery.chord", args=tuple, kwargs=dict, app=None, **options)
__call__(body=None, **options)
freeze(_id=None, group_id=None, chord=None, root_id=None, parent_id=None)
apply_async(args=tuple, kwargs=dict, task_id=None, producer=None, publisher=None, connection=None, router=None, result_cls=None, **options)
apply(args=tuple, kwargs=dict, propagate=True, body=None, **options)
_traverse_tasks(tasks, value=None)
__length_hint__()
run(header, body, partial_args, app=None, interval=None, countdown=1, max_retries=None, eager=False, task_id=None, **options)
clone(*args, **kwargs)
set_immutable(immutable)
__repr__()
app()
_get_app(body=None)
signature(varies, *args, **kwargs)

Create new signature.

  • If the first argument is already a signature, it’s cloned.
  • If the first argument is a dict, a Signature version is returned.
Returns:
Signature: The resulting signature.
maybe_signature(d, app=None, clone=False)

Ensure obj is a signature, or None.

Arguments:
d (Optional[Union[abstract.CallableSignature, Mapping]]):
Signature or dict-serialized signature.
app (celery.Celery):
App to bind signature to.
clone (bool):
If d is already a signature, the signature will be cloned when this flag is enabled.

Returns:
Optional[abstract.CallableSignature]