contrib.migrate

Message migration tools (Broker <-> Broker).

Module Contents

Classes

StopFiltering() Semi-predicate used to signal filter stop.
State() Migration progress state.
Filterer(self,app,conn,filter,limit=None,timeout=1.0,ack_messages=False,tasks=None,queues=None,callback=None,forever=False,on_declare_queue=None,consume_from=None,state=None,accept=None,**kwargs)

Functions

republish(producer,message,exchange=None,routing_key=None,remove_props=list) Republish message.
migrate_task(producer,body_,message,queues=None) Migrate single task message.
filter_callback(callback,tasks)
migrate_tasks(source,dest,migrate=migrate_task,app=None,queues=None,**kwargs) Migrate tasks from one broker to another.
_maybe_queue(app,q)
move(predicate,connection=None,exchange=None,routing_key=None,source=None,app=None,callback=None,limit=None,transform=None,**kwargs) Find tasks by filtering them and move the tasks to a new queue.
expand_dest(ret,exchange,routing_key)
task_id_eq(task_id,body,message) Return true if the task id equals task_id.
task_id_in(ids,body,message) Return true if the task id is a member of the set ids.
prepare_queues(queues)
start_filter(app,conn,filter,limit=None,timeout=1.0,ack_messages=False,tasks=None,queues=None,callback=None,forever=False,on_declare_queue=None,consume_from=None,state=None,accept=None,**kwargs) Filter tasks.
move_task_by_id(task_id,dest,**kwargs) Find a task by id and move it to another queue.
move_by_idmap(map,**kwargs) Move tasks by matching from a task_id: queue mapping.
move_by_taskmap(map,**kwargs) Move tasks by matching from a task_name: queue mapping.
filter_status(state,body,message,**kwargs)
class StopFiltering

Semi-predicate used to signal filter stop.

class State

Migration progress state.

strtotal()
__repr__()
republish(producer, message, exchange=None, routing_key=None, remove_props=list)

Republish message.

migrate_task(producer, body_, message, queues=None)

Migrate single task message.

filter_callback(callback, tasks)
migrate_tasks(source, dest, migrate=migrate_task, app=None, queues=None, **kwargs)

Migrate tasks from one broker to another.

_maybe_queue(app, q)
move(predicate, connection=None, exchange=None, routing_key=None, source=None, app=None, callback=None, limit=None, transform=None, **kwargs)

Find tasks by filtering them and move the tasks to a new queue.

Arguments:

predicate (Callable): Filter function used to decide the messages to move. Must accept the standard signature of (body, message) used by Kombu consumer callbacks. If the predicate wants the message to be moved it must return either:

  1. a tuple of (exchange, routing_key), or
  2. a Queue instance, or
  3. any other true value, meaning the specified exchange and routing_key arguments will be used.

connection (kombu.Connection): Custom connection to use.

source (List[Union[str, kombu.Queue]]): Optional list of source queues to use instead of the default (queues in the task_queues setting). This list can also contain Queue instances.

exchange (str, kombu.Exchange): Default destination exchange.

routing_key (str): Default destination routing key.

limit (int): Limit number of messages to filter.

callback (Callable): Callback called after message moved, with signature (state, body, message).

transform (Callable): Optional function to transform the return value (destination) of the filter function.

Also supports the same keyword arguments as start_filter().

To demonstrate, the move_task_by_id() operation can be implemented like this:

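from kombu import Exchange, Queue

def is_wanted_task(body, message):
    # wanted_id is the id of the task to move, defined by the caller.
    if body['id'] == wanted_id:
        return Queue('foo', exchange=Exchange('foo'),
                     routing_key='foo')

move(is_wanted_task)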

or with a transform:

def transform(value):
    if isinstance(value, str):
        return Queue(value, Exchange(value), value)
    return value

move(is_wanted_task, transform=transform)
Note:
The predicate may also return a tuple of (exchange, routing_key) to specify the destination to where the task should be moved, or a Queue instance. Any other true value means that the task will be moved to the default exchange/routing_key.
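
The return-value rules in the note above can be sketched without a broker. This is an illustration only, not the module's own code: resolve_destination is a hypothetical helper name, and FakeQueue is a stand-in for kombu.Queue that is assumed to expose exchange and routing_key attributes.

```python
from collections import namedtuple

# Stand-in for kombu.Queue, used only so the sketch runs without kombu.
FakeQueue = namedtuple('FakeQueue', ['name', 'exchange', 'routing_key'])


def resolve_destination(ret, exchange=None, routing_key=None):
    """Map a predicate's return value to an (exchange, routing_key) pair.

    Hypothetical helper: returns None when the message should not be moved.
    """
    if not ret:
        return None  # falsy value: leave the message where it is
    if isinstance(ret, FakeQueue):
        return ret.exchange, ret.routing_key  # Queue-like destination
    if isinstance(ret, tuple) and len(ret) == 2:
        return ret  # explicit (exchange, routing_key) tuple
    return exchange, routing_key  # any other true value: use the defaults


explicit = resolve_destination(('ex', 'rk'))
from_queue = resolve_destination(FakeQueue('foo', 'foo_ex', 'foo_rk'))
fallback = resolve_destination(True, exchange='default_ex',
                               routing_key='default_rk')
skipped = resolve_destination(None)
```

The Queue check comes before the tuple check because the stand-in is itself a tuple subclass; with a real kombu.Queue the order would not matter.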
expand_dest(ret, exchange, routing_key)
task_id_eq(task_id, body, message)

Return true if the task id equals task_id.

task_id_in(ids, body, message)

Return true if the task id is a member of the set ids.
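
Because task_id_eq and task_id_in take the value to match as their first argument, binding that argument leaves a callable with the (body, message) signature that a predicate needs. The sketch below uses local stand-ins that follow the documented signatures; it assumes the task id is available as body['id'], as in the move() example, which may not hold for every message format.

```python
from functools import partial


# Local stand-ins following the documented signatures (not the module's code).
# Assumption: the task id is stored under body['id'].
def task_id_eq(task_id, body, message):
    return body['id'] == task_id


def task_id_in(ids, body, message):
    return body['id'] in ids


# Binding the first argument yields a (body, message) predicate.
is_wanted = partial(task_id_eq, 'abc')
is_any_wanted = partial(task_id_in, {'abc', 'def'})

body = {'id': 'abc'}
eq_result = is_wanted(body, None)
in_result = is_any_wanted(body, None)
miss_result = is_wanted({'id': 'zzz'}, None)
```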

prepare_queues(queues)
class Filterer(app, conn, filter, limit=None, timeout=1.0, ack_messages=False, tasks=None, queues=None, callback=None, forever=False, on_declare_queue=None, consume_from=None, state=None, accept=None, **kwargs)
__init__(app, conn, filter, limit=None, timeout=1.0, ack_messages=False, tasks=None, queues=None, callback=None, forever=False, on_declare_queue=None, consume_from=None, state=None, accept=None, **kwargs)
start()
update_state(body, message)
ack_message(body, message)
create_consumer()
prepare_consumer(consumer)
declare_queues(consumer)
start_filter(app, conn, filter, limit=None, timeout=1.0, ack_messages=False, tasks=None, queues=None, callback=None, forever=False, on_declare_queue=None, consume_from=None, state=None, accept=None, **kwargs)

Filter tasks.
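
As a rough illustration of how limit and StopFiltering could interact, the sketch below filters an in-memory list instead of consuming from a broker. Everything in it is hypothetical: run_filter is not part of this module, the StopFiltering class is a local stand-in, and it assumes that limit counts the messages examined rather than the messages matched.

```python
class StopFiltering(Exception):
    """Local stand-in for the module's StopFiltering signal."""


def run_filter(messages, predicate, limit=None):
    """Apply predicate to (body, message) pairs until limit is reached."""
    seen = 0
    matched = []

    def on_message(body, message):
        nonlocal seen
        seen += 1
        if predicate(body, message):
            matched.append(body)
        # Assumption: the limit applies to messages examined.
        if limit is not None and seen >= limit:
            raise StopFiltering()

    try:
        for body, message in messages:
            on_message(body, message)
    except StopFiltering:
        pass  # limit reached: stop processing further messages
    return matched


messages = [({'id': i}, None) for i in range(10)]


def is_even(body, message):
    return body['id'] % 2 == 0


limited = run_filter(messages, is_even, limit=5)
unlimited = run_filter(messages, is_even)
```

Raising an exception from the callback is one way to break out of a consume loop that the callback does not control directly.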

move_task_by_id(task_id, dest, **kwargs)

Find a task by id and move it to another queue.

Arguments:

task_id (str): Id of task to find and move.

dest (str, kombu.Queue): Destination queue.

**kwargs (Any): Also supports the same keyword arguments as move().
move_by_idmap(map, **kwargs)

Move tasks by matching from a task_id: queue mapping.

Here queue is the queue to move the task to.

Example:
>>> move_by_idmap({
...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
...   queues=['hipri'])
move_by_taskmap(map, **kwargs)

Move tasks by matching from a task_name: queue mapping.

Here queue is the queue to move the task to.

Example:
>>> move_by_taskmap({
...     'tasks.add': Queue('name'),
...     'tasks.mul': Queue('name'),
... })
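
Both map-based helpers amount to turning a dictionary lookup into a predicate. A minimal sketch of that idea follows; predicate_from_map is a hypothetical name, the body is assumed to be a dict with 'id' and 'task' keys, and plain strings stand in for Queue instances to keep the example free of dependencies.

```python
def predicate_from_map(mapping, key):
    """Build a (body, message) predicate that looks body[key] up in mapping.

    Hypothetical helper: returns the mapped destination, or None if the
    message does not match and should stay where it is.
    """
    def predicate(body, message):
        return mapping.get(body.get(key))
    return predicate


# Assumption: the task name is under body['task'], the task id under body['id'].
by_name = predicate_from_map({'tasks.add': 'math_queue'}, key='task')
by_id = predicate_from_map({'5bee6e82': 'hipri_queue'}, key='id')

add_dest = by_name({'task': 'tasks.add', 'id': 'x'}, None)
mul_dest = by_name({'task': 'tasks.mul', 'id': 'y'}, None)
id_dest = by_id({'task': 'tasks.add', 'id': '5bee6e82'}, None)
```

A string destination returned this way would still need converting into a Queue, for example through the transform argument of move().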
filter_status(state, body, message, **kwargs)