contrib.testing.manager

Integration testing utilities.

Module Contents

Classes

Sentinel() Signifies the end of something.
ManagerMixin() Mixin that adds Manager capabilities.
Manager(app, **kwargs) Test helpers for task integration tests.

class Sentinel

Signifies the end of something.

class ManagerMixin

Mixin that adds Manager capabilities.

_init_manager(block_timeout=None, no_join=False, stdout=None, stderr=None)
remark(s, sep="-")
missing_results(r)
wait_for(fun, catch, desc="thing", args=tuple, kwargs=dict, errback=None, max_retries=10, interval_start=0.1, interval_step=0.5, interval_max=5.0, emit_warning=False, **options)

Wait for an event to happen.

The catch argument specifies the exception that means the event has not happened yet.
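
The retry behaviour described above can be illustrated with a minimal, standalone sketch. This is not the library's implementation: the names wait_for_sketch and NotReady and the injectable sleep parameter are hypothetical, and re-raising the caught exception once the retries run out is an assumption made for the illustration.

```python
import time


class NotReady(Exception):
    """Hypothetical exception meaning 'the event has not happened yet'."""


def wait_for_sketch(fun, catch, max_retries=10, interval_start=0.1,
                    interval_step=0.5, interval_max=5.0, sleep=time.sleep):
    """Call fun() until it stops raising `catch`, sleeping between attempts.

    Assumption: once max_retries is exhausted, the caught exception is
    re-raised to the caller.
    """
    interval = interval_start
    for attempt in range(max_retries + 1):
        try:
            return fun()
        except catch:
            if attempt == max_retries:
                raise  # out of retries: give up
            sleep(interval)
            # Grow the wait linearly, capped at interval_max.
            interval = min(interval + interval_step, interval_max)


# Usage: the "event" happens on the third call.
calls = {"n": 0}


def poll():
    calls["n"] += 1
    if calls["n"] < 3:
        raise NotReady("still waiting")
    return "done"


slept = []  # record the intervals instead of actually sleeping
result = wait_for_sketch(poll, NotReady, sleep=slept.append)
```

Here catch is the signal for "try again": any other exception raised by fun propagates immediately, and only the exception type passed as catch triggers another attempt.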

ensure_not_for_a_while(fun, catch, desc="thing", max_retries=20, interval_start=0.1, interval_step=0.02, interval_max=1.0, emit_warning=False, **options)

Make sure something does not happen (at least for a while).
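
The inverse check can be sketched in the same way. This is a standalone illustration under stated assumptions, not the library's code: the function name, the injectable sleep parameter, and the choice of AssertionError with this message are all hypothetical.

```python
import time


def ensure_not_for_a_while_sketch(fun, catch, desc="thing", max_retries=20,
                                  interval_start=0.1, interval_step=0.02,
                                  interval_max=1.0, sleep=time.sleep):
    """Succeed only if fun() keeps raising `catch` on every attempt.

    Assumption: if fun() ever returns without raising `catch`, the event
    is considered to have happened and an AssertionError is raised.
    """
    interval = interval_start
    for attempt in range(max_retries + 1):
        try:
            fun()
        except catch:
            # Expected outcome: the event has still not happened.
            if attempt < max_retries:
                sleep(interval)
                interval = min(interval + interval_step, interval_max)
        else:
            raise AssertionError(f"Should not have happened: {desc}")


def never_ready():
    raise LookupError("not yet")


def ready_now():
    return True


# Passes quietly: the event never happens within the retry budget.
outcome = ensure_not_for_a_while_sketch(
    never_ready, LookupError, max_retries=3, sleep=lambda s: None)

# Fails: the event happens on the first attempt.
try:
    ensure_not_for_a_while_sketch(ready_now, LookupError, desc="ready_now",
                                  sleep=lambda s: None)
    failed = False
except AssertionError:
    failed = True
```

The defaults in the signature (more retries, shorter steps) suggest a check that polls frequently over a short window, which is why the result is only meaningful "for a while".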

retry_over_time(*args, **kwargs)
join(r, propagate=False, max_retries=10, **kwargs)
inspect(timeout=3.0)
query_tasks(ids, timeout=0.5)
query_task_states(ids, timeout=0.5)
assert_accepted(ids, interval=0.5, desc="waiting for tasks to be accepted", **policy)
assert_received(ids, interval=0.5, desc="waiting for tasks to be received", **policy)
assert_result_tasks_in_progress_or_completed(async_results, interval=0.5, desc="waiting for tasks to be started or completed", **policy)
assert_task_state_from_result(fun, results, interval=0.5, **policy)
is_result_task_in_progress(**kwargs)
assert_task_worker_state(fun, ids, interval=0.5, **policy)
is_received(ids, **kwargs)
is_accepted(ids, **kwargs)
_ids_matches_state(expected_states, ids, timeout=0.5)
true_or_raise(fun, *args, **kwargs)

class Manager(app, **kwargs)

Test helpers for task integration tests.

__init__(app, **kwargs)