worker.consumer.consumer

Worker Consumer Blueprint.

This module contains the components responsible for consuming messages from the broker, processing the messages and keeping the broker connections up and running.

Module Contents

Classes

Consumer(self, on_task_request, init_callback=noop, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs) Consumer blueprint.
Evloop() Event loop service.

Functions

dump_body(m, body) Format message body for debugging purposes.
dump_body(m, body)

Format message body for debugging purposes.

class Consumer(on_task_request, init_callback=noop, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)

Consumer blueprint.

class Blueprint

Consumer blueprint.

shutdown(parent)
__init__(on_task_request, init_callback=noop, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)
call_soon(p, *args, **kwargs)
perform_pending_operations()
bucket_for_task(type)
reset_rate_limits()
_update_prefetch_count(index=0)

Update prefetch count after pool grow/shrink operations.

Index must be the change in number of processes as a positive (increasing) or negative (decreasing) number.

Note:
Currently pool grow operations will end up with an offset of +1 if the initial size of the pool was 0 (e.g. --autoscale=1,0).
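
As a rough illustration of the index convention described above, the sketch below models how a signed change in the number of processes might translate into a prefetch adjustment. The helper name prefetch_delta and the direction labels are hypothetical, and scaling by prefetch_multiplier is an assumption based on the constructor parameters, not a copy of the worker's code.

```python
def prefetch_delta(index, prefetch_multiplier=1):
    """Return (direction, amount) for a signed change in pool processes.

    Hypothetical helper: a positive index means the pool grew, a negative
    index means it shrank. The amount is assumed to scale with the
    prefetch multiplier.
    """
    direction = "decrement" if index < 0 else "increment"
    return direction, abs(index) * prefetch_multiplier


# Pool grew by 2 processes, then shrank by 1, with a multiplier of 4.
grew = prefetch_delta(2, prefetch_multiplier=4)
shrank = prefetch_delta(-1, prefetch_multiplier=4)
print(grew, shrank)
```

Keeping the sign in the index and the magnitude in the amount means one argument can describe both grow and shrink operations.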
_update_qos_eventually(index)
_limit_move_to_pool(request)
_schedule_bucket_request(bucket)
_limit_task(request, bucket, tokens)
_limit_post_eta(request, bucket, tokens)
start()
on_connection_error_before_connected(exc)
on_connection_error_after_connected(exc)
register_with_event_loop(hub)
shutdown()
stop()
on_ready()
loop_args()
on_decode_error(message, exc)

Callback called if an error occurs while decoding a message.

Simply logs the error and acknowledges the message so it doesn’t enter a loop.

Arguments:
message (kombu.Message): The message received.
exc (Exception): The exception being handled.
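
The log-then-acknowledge behaviour described above can be sketched as follows. FakeMessage and handle_decode_error are hypothetical stand-ins written for this example; they only assume that the message object exposes an ack() method, and they are not the worker's actual implementation.

```python
import logging

logger = logging.getLogger("consumer_sketch")


class FakeMessage:
    """Hypothetical stand-in for a broker message; only tracks ack state."""

    def __init__(self, body):
        self.body = body
        self.acknowledged = False

    def ack(self):
        self.acknowledged = True


def handle_decode_error(message, exc):
    """Log the failure, then ack so the message is not redelivered forever."""
    logger.critical("Can't decode message body: %r (body: %r)", exc, message.body)
    message.ack()


msg = FakeMessage(b"\xff not valid")
handle_decode_error(msg, ValueError("invalid payload"))
```

Acknowledging an undecodable message trades the loss of that one message for protection against an endless redelivery loop.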
on_close()
connect()

Establish the broker connection used for consuming tasks.

Retries establishing the connection if the broker_connection_retry setting is enabled.
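
A minimal sketch of the retry behaviour, assuming a simple bounded loop: the function connect_with_retry and its parameters are hypothetical, with retry_enabled standing in for the broker_connection_retry setting. The real consumer may retry differently (for example with backoff); this only illustrates the on/off semantics.

```python
import time


def connect_with_retry(connect, retry_enabled=True, max_retries=3, interval=0.0):
    """Call connect(); on ConnectionError, retry up to max_retries times.

    All parameters are hypothetical. With retry_enabled=False the first
    failure is re-raised immediately.
    """
    attempt = 0
    while True:
        try:
            return connect()
        except ConnectionError:
            if not retry_enabled or attempt >= max_retries:
                raise
            attempt += 1
            time.sleep(interval)


attempts = []


def flaky_connect():
    """Fail twice, then succeed, to exercise the retry path."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("broker unavailable")
    return "connected"


result = connect_with_retry(flaky_connect)
```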

connection_for_read(heartbeat=None)
connection_for_write(heartbeat=None)
ensure_connected(conn)
_flush_events()
on_send_event_buffered()
add_task_queue(queue, exchange=None, exchange_type=None, routing_key=None, **options)
cancel_task_queue(queue)
apply_eta_task(task)

Method called by the timer to apply a task with an ETA/countdown.

_message_report(body, message)
on_unknown_message(body, message)
on_unknown_task(body, message, exc)
on_invalid_task(body, message, exc)
update_strategies()
create_task_handler(promise=promise)
__repr__()

repr(self).

class Evloop

Event loop service.

Note:
This is always started last.
start(c)
patch_all(c)