events.state

In-memory representation of cluster state.

This module implements a data structure used to keep track of the state of a cluster of workers and the tasks it is working on (by consuming events).

For every event consumed, the state is updated, so it always reflects the state of the cluster at the time of the last event.

Snapshots (celery.events.snapshot) can be used to take “pictures” of this state at regular intervals and, for example, store them in a database.

Module Contents

Classes

CallableDefaultdict(self, fun, *args, **kwargs) defaultdict with configurable __call__.
Worker(self, hostname=None, pid=None, freq=60, heartbeats=None, clock=0, active=None, processed=None, loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None) Worker State.
Task(self, uuid=None, cluster_state=None, children=None, **kwargs) Task State.
State(self, callback=None, workers=None, tasks=None, taskheap=None, max_workers_in_memory=5000, max_tasks_in_memory=10000, on_node_join=None, on_node_leave=None, tasks_by_type=None, tasks_by_worker=None) Records cluster state.

Functions

_warn_drift(hostname, drift, local_received, timestamp)
heartbeat_expires(timestamp, freq=60, expire_window=HEARTBEAT_EXPIRE_WINDOW, Decimal=Decimal, float=float, isinstance=isinstance) Return time when heartbeat expires.
_depickle_task(cls, fields)
with_unique_field(attr)
_serialize_Task_WeakSet_Mapping(mapping)
_deserialize_Task_WeakSet_Mapping(mapping, tasks)
class CallableDefaultdict(fun, *args, **kwargs)

defaultdict with configurable __call__.

We use this for backwards compatibility in State.tasks_by_type etc., which used to be a method but is now an index instead.

So you can do:

>>> add_tasks = state.tasks_by_type['proj.tasks.add']

while still supporting the method call:

>>> add_tasks = list(state.tasks_by_type(
...     'proj.tasks.add', reverse=True))
__init__(fun, *args, **kwargs)
__call__(*args, **kwargs)
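
The class itself is small; a minimal sketch of the idea (an illustrative approximation, not necessarily the exact library code) looks like this:

from collections import defaultdict

class CallableDefaultdict(defaultdict):
    # A defaultdict whose instances can also be called like a function.
    def __init__(self, fun, *args, **kwargs):
        # ``fun`` is invoked when the mapping itself is called.
        self.fun = fun
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        return self.fun(*args, **kwargs)

With fun bound to the slower, time-ordered lookup method, subscripting the mapping gives the fast index access while calling it falls back to the method behaviour shown in the doctest above.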
_warn_drift(hostname, drift, local_received, timestamp)
heartbeat_expires(timestamp, freq=60, expire_window=HEARTBEAT_EXPIRE_WINDOW, Decimal=Decimal, float=float, isinstance=isinstance)

Return time when heartbeat expires.
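
The arithmetic can be illustrated as follows. This is a hedged sketch assuming the window is interpreted as a percentage of freq (with HEARTBEAT_EXPIRE_WINDOW defaulting to 200), which is how recent Celery releases treat it:

def heartbeat_expires_sketch(timestamp, freq=60, expire_window=200):
    # Expiry is the last heartbeat plus ``freq`` scaled by the window percentage.
    return timestamp + freq * (expire_window / 100.0)

# A worker heartbeating every 60 seconds is considered offline
# 120 seconds after its last heartbeat under the default window.
print(heartbeat_expires_sketch(1700000000.0))  # -> 1700000120.0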

_depickle_task(cls, fields)
with_unique_field(attr)
class Worker(hostname=None, pid=None, freq=60, heartbeats=None, clock=0, active=None, processed=None, loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None)

Worker State.

__init__(hostname=None, pid=None, freq=60, heartbeats=None, clock=0, active=None, processed=None, loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None)
__reduce__()
_create_event_handler()
update(f, **kw)
__repr__()
status_string()
heartbeat_expires()
alive(nowfun=time)
id()
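
A hedged usage sketch, assuming a State instance named state that has already consumed some events; in recent Celery versions alive, status_string and heartbeat_expires are exposed as properties, so they are read without parentheses here:

# ``state.workers`` maps hostname -> Worker.
for hostname, worker in state.workers.items():
    last_beat = worker.heartbeats[-1] if worker.heartbeats else None
    print(hostname, worker.status_string, worker.alive, last_beat)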
class Task(uuid=None, cluster_state=None, children=None, **kwargs)

Task State.

__init__(uuid=None, cluster_state=None, children=None, **kwargs)
event(type_, timestamp=None, local_received=None, fields=None, precedence=None, items=items, setattr=setattr, task_event_to_state=None, RETRY=None)
info(fields=None, extra=list)

Information about this task suitable for on-screen display.

__repr__()
as_dict()
_serializable_children(value)
_serializable_root(value)
_serializable_parent(value)
__reduce__()
id()
origin()
ready()
parent()
root()
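
A hedged usage sketch, again assuming a populated State instance named state; which attributes are set depends on which events have been seen for the task:

# ``uuid`` is a task id that appeared in a consumed event.
task = state.tasks.get(uuid)
if task is not None:
    print(task.name, task.state)  # e.g. 'proj.tasks.add', 'SUCCESS'
    print(task.info())            # dict of fields suitable for display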
class State(callback=None, workers=None, tasks=None, taskheap=None, max_workers_in_memory=5000, max_tasks_in_memory=10000, on_node_join=None, on_node_leave=None, tasks_by_type=None, tasks_by_worker=None)

Records cluster state.
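
A hedged end-to-end sketch of keeping a State object up to date from a live event stream, along the lines of the real-time monitoring examples in the Celery documentation; the broker URL is a placeholder:

from celery import Celery

app = Celery(broker='amqp://guest@localhost//')
state = app.events.State()

def on_task_failed(event):
    state.event(event)
    # The state keeps a Task entry keyed by the event's uuid.
    task = state.tasks.get(event['uuid'])
    if task is not None:
        print(f'task failed: {task.name}[{task.uuid}] {task.info()!r}')

with app.connection() as connection:
    recv = app.events.Receiver(connection, handlers={
        'task-failed': on_task_failed,
        '*': state.event,  # feed every other event into the state as well
    })
    recv.capture(limit=None, timeout=None, wakeup=True)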

__init__(callback=None, workers=None, tasks=None, taskheap=None, max_workers_in_memory=5000, max_tasks_in_memory=10000, on_node_join=None, on_node_leave=None, tasks_by_type=None, tasks_by_worker=None)
_event()
freeze_while(fun, *args, **kwargs)
clear_tasks(ready=True)
_clear_tasks(ready=True)
_clear(ready=True)
clear(ready=True)
get_or_create_worker(hostname, **kwargs)

Get or create worker by hostname.

Returns:
Tuple of (worker, was_created).
get_or_create_task(uuid)

Get or create task by uuid.

event(event)
task_event(type_, fields)

Deprecated, use event().

worker_event(type_, fields)

Deprecated, use event().

_create_dispatcher()
_add_pending_task_child(task)
rebuild_taskheap(timetuple=timetuple)
itertasks(limit=None)
tasks_by_time(limit=None, reverse=True)

Generator yielding tasks ordered by time (see the usage sketch after this method listing).

Yields:
Tuples of (uuid, Task).
_tasks_by_type(name, limit=None, reverse=True)

Get all tasks by type.

This is slower than accessing tasks_by_type, but will be ordered by time.

Returns:
Generator: giving (uuid, Task) pairs.
_tasks_by_worker(hostname, limit=None, reverse=True)

Get all tasks by worker.

Slower than accessing tasks_by_worker, but ordered by time.

task_types()

Return a list of all seen task types.

alive_workers()

Return a list of (seemingly) alive workers.

__repr__()
__reduce__()
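
A hedged sketch of the query helpers above, assuming state already holds some consumed events:

# Most recent tasks first; tasks_by_time yields (uuid, Task) tuples.
for uuid, task in state.tasks_by_time(limit=10):
    print(uuid, task.name, task.state)

# All task types seen so far, and the workers that currently look alive.
print(state.task_types())
print([w.hostname for w in state.alive_workers()])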
_serialize_Task_WeakSet_Mapping(mapping)
_deserialize_Task_WeakSet_Mapping(mapping, tasks)