iotile.core.utilities.workqueue_thread

A thread that hands workqueue items to a user function.

Module Contents

STOP_WORKER_ITEM
MarkLocationItem
WorkItem
WaitIdleItem
class WorkQueueThread(handler)

Bases: threading.Thread

A worker thread that handles queued work.

This class takes a single callable as a parameter in __init__ and starts a background thread with a shared queue. Whenever an item is put in the shared queue, the callable is called with that work item.

The return value of that callable is passed back to the caller or given via a callback function.

The key methods of this class are:

  • dispatch(item, callback=None): Synchronously dispatches a work item to the queue and waits for the result. If callback is not None, this function will return immediately and the result will be passed to the callback when it is ready.
  • stop(): Synchronously stop the background thread. This will wait for all currently queued work items to finish and then cleanly shut down the background thread.
  • flush(): Wait until the work queue is momentarily empty. This method is useful for ensuring that all work items received up until this method was called have been processed.
Args:
handler (callable): The handler function that will be passed all of the work items queued in dispatch(). It should return a result that will be the return value of dispatch(). If this function raises an exception, it will be reraised from dispatch().
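The synchronous behavior described above (the handler result becomes the return value of dispatch(), and a handler exception is reraised in the caller) can be sketched with the standard library alone. MiniWorkQueue below is a hypothetical stand-in written for illustration, not the iotile implementation; its attribute names and internals are assumptions.

```python
import queue
import threading


class MiniWorkQueue(threading.Thread):
    """Hypothetical stand-in for illustration, not the iotile class."""

    _STOP_ITEM = object()

    def __init__(self, handler):
        super().__init__(daemon=True)
        self._work_handler = handler
        self._work_items = queue.Queue()

    def dispatch(self, value):
        # Queue the item, then block until the worker has processed it.
        done = threading.Event()
        box = {}
        self._work_items.put((value, done, box))
        done.wait()
        if "error" in box:
            raise box["error"]  # reraise the handler's exception in the caller
        return box["result"]

    def run(self):
        while True:
            item = self._work_items.get()
            if item is self._STOP_ITEM:
                break
            value, done, box = item
            try:
                box["result"] = self._work_handler(value)
            except Exception as err:
                box["error"] = err
            finally:
                done.set()

    def stop(self):
        # Items queued before the stop marker are still processed first.
        self._work_items.put(self._STOP_ITEM)
        self.join()


def double(value):
    if not isinstance(value, int):
        raise TypeError("expected an int")
    return value * 2


worker = MiniWorkQueue(double)
worker.start()
result = worker.dispatch(21)

try:
    worker.dispatch("oops")
    rethrown = None
except TypeError as err:
    rethrown = str(err)

worker.stop()
```

With the real class, the equivalent calls would presumably be WorkQueueThread(double), start(), dispatch(21) and stop(), but that usage is inferred from the description above rather than verified.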
STILL_PENDING

Special return value from handler to indicate callback should be deferred.

This allows the background handler function to store away a callback and call it in the future if it needs to. The handler can inspect the callback by calling current_callback(), which will raise an exception if not called while an item is being dispatched.

current_callback(self)

Get the current callback from a handler function.

This method allows a handler to get the callback that would be called when it returns. It is only useful to use in conjunction with the STILL_PENDING return value so that the handler can store the callback away and call it later when a long-running operation has finished.

Returns:

callable: The callback function associated with the current work item.

This will be None if there was no callback passed.
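The deferred-callback pattern described for STILL_PENDING and current_callback() might look roughly like the following single-threaded sketch. MiniDispatcher, its process() method, and the extra dispatcher argument passed to the handler are all hypothetical simplifications; the real handler receives only the work item.

```python
STILL_PENDING = object()  # stand-in sentinel, assumed to behave like the real one


class MiniDispatcher:
    """Hypothetical single-threaded model of the callback hand-off."""

    def __init__(self, handler):
        self._handler = handler
        self._current = None
        self._in_dispatch = False

    def current_callback(self):
        if not self._in_dispatch:
            raise RuntimeError("no work item is being dispatched")
        return self._current

    def process(self, value, callback):
        self._current = callback
        self._in_dispatch = True
        try:
            result = self._handler(self, value)
        finally:
            self._in_dispatch = False
            self._current = None
        # Only call back now if the handler did not defer the result.
        if result is not STILL_PENDING:
            callback(None, result)


parked = []


def handler(dispatcher, value):
    if value == "slow":
        # Store the callback and finish the work item later.
        parked.append(dispatcher.current_callback())
        return STILL_PENDING
    return value.upper()


log = []
dispatcher = MiniDispatcher(handler)
dispatcher.process("fast", lambda exc_info, result: log.append(result))
dispatcher.process("slow", lambda exc_info, result: log.append(result))
log_before_completion = list(log)

# The long-running operation finishes; invoke the stored callback by hand.
parked.pop()(None, "slow finished later")
```

The point of the sentinel is that returning from the handler and completing the work item become two separate events, so the handler stays free to process other items in between.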

dispatch(self, value, callback=None)

Dispatch an item to the workqueue and optionally wait.

This is the only way to add work to the background work queue. Unless you also pass a callback object, this method will synchronously wait for the work to finish and return the result. If the work raises an exception, the exception will be reraised in this method.

If you pass an optional callback(exc_info, return_value), this method will not block and instead your callback will be called when the work finishes. If an exception was raised during processing, exc_info will be set with the contents of sys.exc_info(). Otherwise, exc_info will be None and whatever the work_queue handler returned will be passed as the return_value parameter to the supplied callback.

Args:
value (object): Arbitrary object that will be passed to the work queue handler.
callback (callable): Optional callback to receive the result of the work queue when it finishes. If not passed, this method will be synchronous and return the result from the dispatch() method itself.
Returns:

object: The result of the work_queue handler function or None.

If callback is not None, then this method will return immediately with a None return value. Otherwise it will block until the work item is finished (including any work items ahead in the queue) and return whatever the work item handler returned.
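The callback(exc_info, return_value) contract described above can be illustrated with a small standard-library sketch. run_with_callback is a hypothetical helper that mimics the documented contract on a throwaway thread; it is not part of iotile.

```python
import sys
import threading


def run_with_callback(handler, value, callback):
    """Hypothetical helper: run handler(value) on a thread, then call back."""

    def _work():
        try:
            result = handler(value)
        except Exception:
            # On failure, pass the exception triple and no return value.
            callback(sys.exc_info(), None)
        else:
            # On success, exc_info is None and the result is passed through.
            callback(None, result)

    thread = threading.Thread(target=_work)
    thread.start()
    return thread


def handler(value):
    return 10 // value


results = []


def on_done(exc_info, return_value):
    if exc_info is not None:
        results.append(("error", exc_info[0].__name__))
    else:
        results.append(("ok", return_value))


# Join each thread so the two outcomes are recorded in a fixed order.
run_with_callback(handler, 2, on_done).join()
run_with_callback(handler, 0, on_done).join()
```

A callback written this way should check exc_info first, since return_value carries no meaning when an exception was raised.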

future_raise(self, tp, value=None, tb=None)

raise_ implementation from future.utils

flush(self)

Synchronously wait until this work item is processed.

This has the effect of waiting until all work items queued before this method was called have finished.

defer(self, callback)

Schedule a callback once all current items in the queue are finished.

This function can schedule work synchronous with the work queue without passing through the work queue handler.

This method returns immediately.

Args:
callback (callable): A callable with no arguments that will be called once all current items in the workqueue have been executed.
defer_until_idle(self, callback)

Wait until the work queue is (temporarily) empty.

This is different from flush() because processing a work queue entry may add additional work queue entries. This method lets you wait until there are no more entries in the work queue.

Depending on how work is being added to the work queue, this may be a very interesting condition.

This method will return immediately and schedule callback to be called as soon as the work queue becomes empty. You can queue as many callbacks as you like via multiple calls to defer_until_idle. They will all be executed together, in the order they were queued, the first time that the queue becomes momentarily idle.

Note that the concept of an “empty” workqueue is a very unstable concept in general. Unless you as the caller know that no one else except you and possibly the work-queue items themselves can add a task to the work queue, then there is no guarantee that this callback will ever fire since it could be that someone else is adding work queue items just as fast as they are being completed.

This is a specialty method that is useful in a few defined circumstances.

Args:
callback (callable): A callable with no arguments that will be called once the queue is temporarily empty.
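The difference between waiting for currently queued items (as with defer() or flush()) and waiting for an idle queue can be shown with a toy single-threaded model. Everything here (the pending deque, idle_callbacks, run_until_empty) is a hypothetical simplification and says nothing about how the real class is implemented.

```python
from collections import deque

pending = deque()      # toy work queue
idle_callbacks = []    # callbacks waiting for the queue to drain
log = []


def handler(item):
    log.append(item)
    if item == "parent":
        # Processing one work item adds another one to the queue.
        pending.append("child")


def run_until_empty():
    while pending:
        item = pending.popleft()
        if callable(item):
            item()  # deferred callback: runs in queue order
        else:
            handler(item)
        if not pending:
            # The queue is momentarily empty: fire idle callbacks in order.
            for callback in idle_callbacks:
                callback()
            idle_callbacks.clear()


pending.append("parent")
pending.append(lambda: log.append("defer fired"))       # like defer()
idle_callbacks.append(lambda: log.append("idle fired"))  # like defer_until_idle()
run_until_empty()
```

In this model the deferred callback runs before "child" because "child" was queued after it, while the idle callback only runs once nothing is left in the queue.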
wait_until_idle(self)

Block the calling thread until the work queue is (temporarily) empty.

See the detailed discussion under defer_until_idle() for restrictions and expected use cases for this method.

This routine will block the calling thread.

direct_dispatch(self, arg, callback)

Directly dispatch a work item.

This method MUST only be called from inside of another work item and will synchronously invoke the work item as if it was passed to dispatch(). Calling this method from any other thread has undefined consequences since it will be unsynchronized with respect to items dispatched from inside the background work queue itself.

run(self)

The target routine called to start thread activity.

stop(self, timeout=None, force=False)

Stop the worker thread and synchronously wait for it to finish.

Args:
timeout (float): The maximum time to wait for the thread to stop before raising a TimeoutExpiredError. If force is True, TimeoutExpiredError is not raised and the thread is just marked as a daemon thread so that it does not block cleanly exiting the process.
force (bool): If True and the thread does not exit within timeout seconds, no error is raised since the thread is marked as a daemon thread and will be killed when the process exits.
signal_stop(self)

Signal that the worker thread should stop but don’t wait.

This function is useful for stopping multiple threads in parallel when combined with wait_stopped().

wait_stopped(self, timeout=None, force=False)

Wait for the thread to stop.

You must have previously called signal_stop or this function will hang.

Args:

timeout (float): The maximum time to wait for the thread to stop before raising a TimeoutExpiredError. If force is True, TimeoutExpiredError is not raised and the thread is just marked as a daemon thread so that it does not block cleanly exiting the process.
force (bool): If True and the thread does not exit within timeout seconds, no error is raised since the thread is marked as a daemon thread and will be killed when the process exits.
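Stopping several workers in parallel, as suggested for signal_stop() combined with wait_stopped(), amounts to signalling all of them first and only then waiting on each. The sketch below uses a hypothetical MiniWorker built on the standard library; it raises the built-in TimeoutError where the real class is documented to raise TimeoutExpiredError, and it omits the force option.

```python
import queue
import threading


class MiniWorker(threading.Thread):
    """Hypothetical stand-in for illustration, not the iotile class."""

    _STOP_ITEM = object()

    def __init__(self):
        super().__init__()
        self._work_items = queue.Queue()

    def run(self):
        # Discard items until the stop marker arrives.
        while self._work_items.get() is not self._STOP_ITEM:
            pass

    def signal_stop(self):
        # Request shutdown without waiting for it.
        self._work_items.put(self._STOP_ITEM)

    def wait_stopped(self, timeout=None):
        self.join(timeout)
        if self.is_alive():
            raise TimeoutError("worker did not stop in time")


workers = [MiniWorker() for _ in range(3)]
for worker in workers:
    worker.start()

# Phase 1: signal every worker so they all shut down concurrently.
for worker in workers:
    worker.signal_stop()

# Phase 2: wait for each one; the total wait is bounded by the slowest worker.
for worker in workers:
    worker.wait_stopped(timeout=5.0)

all_stopped = not any(worker.is_alive() for worker in workers)
```

Calling stop() on each worker in turn would instead wait for the first worker to finish before the second is even signalled.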