concurrency.asynpool

Version of multiprocessing.Pool using Async I/O.

Note

This module will be moved soon, so don’t use it directly.

This is a non-blocking version of multiprocessing.Pool.

This code deals with three major challenges:

  1. Starting up child processes and keeping them running.
  2. Sending jobs to the processes and receiving results back.
  3. Safely shutting down this system.
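
To make that division of labour concrete, here is a deliberately simplified, standalone sketch of the same pattern built only on the standard library. It is not AsynPool's implementation (AsynPool drives non-blocking pipes from an event loop rather than blocking queues); it only illustrates the three responsibilities listed above.

    # Toy, blocking analogue of the three responsibilities above.
    import multiprocessing as mp

    def worker(inq, outq):
        # 2. Receive jobs and send results back.
        for job in iter(inq.get, None):      # None is the shutdown sentinel
            outq.put(job * 2)

    if __name__ == '__main__':
        inq, outq = mp.Queue(), mp.Queue()
        # 1. Start child processes and keep them running.
        procs = [mp.Process(target=worker, args=(inq, outq)) for _ in range(2)]
        for p in procs:
            p.start()
        for job in range(4):
            inq.put(job)
        print(sorted(outq.get() for _ in range(4)))
        # 3. Shut the system down safely.
        for _ in procs:
            inq.put(None)
        for p in procs:
            p.join()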

Module Contents

Classes

Worker()
  Pool worker process.
ResultHandler(*args, **kwargs)
  Handles messages from the pool processes.
AsynPool(processes=None, synack=False, sched_strategy=None, *args, **kwargs)
  AsyncIO Pool (no threads).

Functions

unpack_from(fmt, iobuf, unpack=None)
__read__(fd, buf, size, read=None)
unpack_from(fmt, iobuf, unpack=None)
gen_not_started(gen)
  Return true if generator is not started.
_get_job_writer(job)
_select_imp(readers=None, writers=None, err=None, timeout=0)
_select_imp(readers=None, writers=None, err=None, timeout=0)
_select(readers=None, writers=None, err=None, timeout=0, poll=_select_imp)
  Simple wrapper to select(), using select.poll.
unpack_from(fmt, view, _unpack_from=_unpack_from)
__read__(fd, buf, size, read=None)
unpack_from(fmt, iobuf, unpack=None)
gen_not_started(gen)

Return true if generator is not started.
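
One way to implement this check with the standard library (not necessarily what the module itself does) is to ask inspect for the generator state:

    import inspect

    def gen_not_started(gen):
        # True if the generator was created but never resumed.
        return inspect.getgeneratorstate(gen) == inspect.GEN_CREATED

    def numbers():
        yield 1

    g = numbers()
    assert gen_not_started(g)       # created, not yet started
    next(g)
    assert not gen_not_started(g)   # suspended at the first yield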

_get_job_writer(job)
_select_imp(readers=None, writers=None, err=None, timeout=0, poll=None, POLLIN=None, POLLOUT=None, POLLERR=None)
_select_imp(readers=None, writers=None, err=None, timeout=0)
_select(readers=None, writers=None, err=None, timeout=0, poll=_select_imp)

Simple wrapper to select(), using select.poll.

Arguments:
  readers (Set[Fd]): Set of reader fds to test if readable.
  writers (Set[Fd]): Set of writer fds to test if writable.
  err (Set[Fd]): Set of fds to test for error conditions.

All fd sets passed must be mutable, as this function will remove non-working fds from them; this also means the caller must make sure there are still fds in the sets before calling again.

Returns:
  Tuple[Set, Set, Set]: (readable, writable, again), where readable is a set of fds that have data available for reading, writable is a set of fds that are ready to be written to, and again is a flag that, if set, means the caller must throw away the result and call again.
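
A hedged usage sketch of that contract (it assumes the private _select is still importable from celery.concurrency.asynpool, which may change): throw away the result and retry while again is set, checking that the mutable sets still contain fds before each call.

    import os
    from celery.concurrency.asynpool import _select  # private API; may move

    r, w = os.pipe()
    readers, writers, err = {r}, {w}, set()

    readable, writable = set(), set()
    while readers or writers or err:
        readable, writable, again = _select(
            readers=readers, writers=writers, err=err, timeout=0.1)
        if again:
            # Unreliable result (a broken fd was pruned from the sets):
            # discard it and call again with whatever fds remain.
            continue
        break

    print('readable:', readable, 'writable:', writable)
    os.close(r)
    os.close(w)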
class Worker

Pool worker process.

on_loop_start(pid)
class ResultHandler(*args, **kwargs)

Handles messages from the pool processes.

__init__(*args, **kwargs)
_recv_message(add_reader, fd, callback, __read__=__read__, readcanbuf=readcanbuf, BytesIO=BytesIO, unpack_from=unpack_from, load=None)
_make_process_result(hub)

Coroutine reading messages from the pool processes.
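
The reader is a generator-based coroutine. The sketch below shows the general shape as a standalone illustration, using a hypothetical 4-byte length prefix rather than the module's real wire format: it yields whenever no data is available, so the event loop can resume it once the fd becomes readable again.

    import errno
    import os
    import struct
    from pickle import loads

    def read_message(fd):
        # Read one length-prefixed, pickled message from a non-blocking fd.
        # The decoded message is the generator's return value.
        def _read_exactly(n):
            buf = b''
            while len(buf) < n:
                try:
                    chunk = os.read(fd, n - len(buf))
                except OSError as exc:
                    if exc.errno != errno.EAGAIN:
                        raise
                    yield          # nothing to read yet; suspend
                    continue
                if not chunk:
                    raise EOFError('child closed its result pipe')
                buf += chunk
            return buf

        size, = struct.unpack('>I', (yield from _read_exactly(4)))
        return loads((yield from _read_exactly(size)))

An event loop would register the fd for readability and call next() on the generator each time it fires; the StopIteration raised at the end carries the decoded message as its value.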

register_with_event_loop(hub)
handle_event(*args)
on_stop_not_started()
_flush_outqueue(fd, remove, process_index, on_state_change)
class AsynPool(processes=None, synack=False, sched_strategy=None, *args, **kwargs)

AsyncIO Pool (no threads).

WorkerProcess(worker)
__init__(processes=None, synack=False, sched_strategy=None, *args, **kwargs)
_create_worker_process(i)
_event_process_exit(hub, proc)
_track_child_process(proc, hub)
_untrack_child_process(proc, hub)
register_with_event_loop(hub)

Register the async pool with the current event loop.

_create_timelimit_handlers(hub)

Create handlers used to implement time limits.

_on_soft_timeout(job, soft, hard, hub)
_on_hard_timeout(job)
on_job_ready(job, i, obj, inqW_fd)
_create_process_handlers(hub)

Create handlers called on process up/down, etc.

_create_write_handlers(hub, pack=None, dumps=None, protocol=HIGHEST_PROTOCOL)

Create handlers used to write data to child processes.
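
The write side mirrors that pattern. The sketch below is again a simplified, standalone illustration (with a hypothetical 4-byte length prefix, not the module's real protocol): pickle the payload, then write it to the child's non-blocking inqueue fd in chunks, yielding whenever the pipe is full.

    import errno
    import os
    import struct
    from pickle import dumps, HIGHEST_PROTOCOL

    def write_job(fd, payload):
        # Write one length-prefixed, pickled payload to a non-blocking fd,
        # yielding on EAGAIN so the event loop can resume the write later.
        body = dumps(payload, protocol=HIGHEST_PROTOCOL)
        data = struct.pack('>I', len(body)) + body
        offset = 0
        while offset < len(data):
            try:
                offset += os.write(fd, data[offset:])
            except OSError as exc:
                if exc.errno != errno.EAGAIN:
                    raise
                yield              # pipe full; wait until fd is writable

A job that is only partially written lives inside such a suspended generator, which is why the module has helpers such as gen_not_started() and on_partial_read() (see below) for detecting and recovering from that state.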

flush()
_flush_writer(proc, writer)
get_process_queues()

Get queues for a new process.

Here we’ll find an unused slot, as there should always be one available when we start a new process.
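
The slot scan might look roughly like this; an illustrative fragment only, assuming a hypothetical mapping from queue tuples to their owning process, where None marks a free slot.

    def find_unused_queues(queues_to_owner):
        # queues_to_owner: {(inq, outq, synq): owning process or None}
        # Return the first queue tuple not owned by any process.
        return next(
            queues for queues, owner in queues_to_owner.items()
            if owner is None
        )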

on_grow(n)

Grow the pool by n processes.

on_shrink(n)

Shrink the pool by n processes.

create_process_queues()

Create new in, out, etc. queues, returned as a tuple.

on_process_alive(pid)

Called when receiving the WORKER_UP message.

Marks the process as ready to receive work.

on_job_process_down(job, pid_gone)

Called for each job when the process assigned to it exits.

on_job_process_lost(job, pid, exitcode)

Called when the process executing the job exits.

This happens when the process the job was assigned to exited by mysterious means (error exitcodes and signals).

human_write_stats()
_process_cleanup_queues(proc)

Called to clean up queues after process exit.

_stop_task_handler()

Called at shutdown to tell processes that we’re shutting down.

create_result_handler()
_process_register_queues(proc, queues)

Mark new ownership for queues to update fileno indices.

_find_worker_queues(proc)

Find the queues owned by proc.

_setup_queues()
process_flush_queues(proc)

Flush all queues.

This includes the outbound buffer, so that all tasks that haven’t been started will be discarded.

In Celery this is called whenever the transport connection is lost (consumer restart), and when a process is terminated.

on_partial_read(job, proc)

Called when a job was partially written to an exited child process.

destroy_queues(queues, proc)

Destroy queues that can no longer be used.

This way they can be replaced by new usable sockets.

_create_payload(type_, args, dumps=None, pack=None, protocol=HIGHEST_PROTOCOL)
_set_result_sentinel(_outqueue, _pool)
_help_stuff_finish_args()
_help_stuff_finish(pool)
timers()