Welcome to the magicbus documentation!

A pub/sub Bus for managing states and transitions.

The ‘process’ subpackage defines a ProcessBus object, which connects applications, servers, and frameworks with site-wide services such as daemonization, process reload, signal handling, privilege dropping, PID file management, logging for all of these, and more.

The ‘plugins’ subpackage defines a few abstract and concrete services for use with a Bus. Some use custom channels; see the documentation for each class.

class magicbus.Bus(transitions=None, errors=None, initial_state=None, extra_channels=None, id=None)

Bases: object

State machine and pub/sub messenger.

If the ‘select’ module is present (POSIX systems), then select.select() (on an os.pipe) will be used in self.wait instead of time.sleep().

clear()

Discard all subscribed callees.

log(msg='', level=20, traceback=False)

Log the given message. Append the last traceback if requested.

publish(channel, *args, **kwargs)

Return output of all subscribers for the given channel.

publish_exception_class

alias of magicbus.base.ChannelFailures

property states

subscribe(channel, callee, priority=None)

Add the given callee at the given channel (if not present).

transition(desired_state)

Move to the desired state. Return output (list of lists).

unsubscribe(channel, callee)

Discard the given callee (if present).
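
The subscribe/publish/unsubscribe contract described above can be illustrated with a small stand-in. This is only a sketch: TinyBus is a hypothetical class written for this example, not magicbus.Bus, and it leaves out the priority argument, state handling, and error collection.

```python
from collections import defaultdict


class TinyBus:
    """Hypothetical stand-in for a pub/sub bus; not magicbus.Bus itself."""

    def __init__(self):
        # Channel name -> list of callees, in subscription order.
        self.listeners = defaultdict(list)

    def subscribe(self, channel, callee):
        # Add the callee only if it is not already present.
        if callee not in self.listeners[channel]:
            self.listeners[channel].append(callee)

    def unsubscribe(self, channel, callee):
        # Discard the callee if present; ignore the call otherwise.
        if callee in self.listeners[channel]:
            self.listeners[channel].remove(callee)

    def publish(self, channel, *args, **kwargs):
        # Call every subscriber and collect the return values.
        return [callee(*args, **kwargs)
                for callee in list(self.listeners[channel])]


def double(x):
    return x * 2


bus = TinyBus()
bus.subscribe("numbers", double)
bus.subscribe("numbers", double)      # no effect: already subscribed
outputs = bus.publish("numbers", 21)
bus.unsubscribe("numbers", double)
after = bus.publish("numbers", 21)    # no listeners left: empty output
```

Publishing to a channel with no listeners simply returns an empty list in this sketch, which mirrors the "no effect" behavior the documentation describes for undefined channels.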

wait(state, interval=0.1, channel=None, sleep=False)

Poll for the given state(s) at intervals; publish to channel.

If sleep is True, the calling thread loops, sleeping for the given interval each time, and returns only when the bus state is one of the given states to wait for.

If sleep is False (the default) and the operating system supports I/O multiplexing via the ‘select’ module, then an anonymous pipe will be used to signal the waiting thread to wake up whenever the state transitions. This allows the waiting thread to return when the bus shuts down, for example, rather than waiting for the sleep interval to elapse first. Each thread that calls wait() creates a new pipe, so if file descriptors are in short supply on your system you might need to use sleep instead.
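
The pipe-based wake-up can be sketched with the standard library alone. The example below is an illustration of the general technique on a POSIX system, not the code magicbus uses: the waiter function, the result dict, and the single byte written to the pipe are all assumptions made for the example.

```python
import os
import select
import threading
import time

# An anonymous pipe: the waiter selects on the read end,
# and a state change is signalled by writing to the write end.
read_fd, write_fd = os.pipe()


def waiter(timeout, result):
    # Block until the pipe becomes readable or the timeout elapses.
    started = time.monotonic()
    readable, _, _ = select.select([read_fd], [], [], timeout)
    result["woken"] = bool(readable)
    result["elapsed"] = time.monotonic() - started


result = {}
thread = threading.Thread(target=waiter, args=(5.0, result))
thread.start()

time.sleep(0.05)
os.write(write_fd, b"x")   # stands in for "the state just changed"
thread.join()

os.close(read_fd)
os.close(write_fd)
```

The waiter returns as soon as the byte arrives instead of sitting out the full five-second timeout, which is the advantage over a plain time.sleep() loop. The cost is two file descriptors per waiting thread.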

exception magicbus.ChannelFailures(*args, **kwargs)

Bases: Exception

Exception raised when errors occur in a listener during .publish().

delimiter = '\n'

get_instances()

Return a list of seen exception instances.

handle_exception()

Append the current exception to self.
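
One way to picture how such an exception collects listener errors is the sketch below. Failures and the free function publish are hypothetical names for this example, not magicbus.ChannelFailures or Bus.publish. The choice to keep calling the remaining listeners after one fails is an assumption, as is the string format.

```python
import sys


class Failures(Exception):
    """Hypothetical aggregate exception; not magicbus.ChannelFailures."""

    delimiter = "\n"

    def __init__(self):
        super().__init__()
        self._exceptions = []

    def handle_exception(self):
        # Record the exception that is currently being handled.
        self._exceptions.append(sys.exc_info()[1])

    def get_instances(self):
        # Return a copy of the exception instances seen so far.
        return list(self._exceptions)

    def __str__(self):
        return self.delimiter.join(repr(e) for e in self._exceptions)


def publish(listeners, *args):
    """Call every listener; raise one Failures if any of them failed."""
    failures = Failures()
    output = []
    for listener in listeners:
        try:
            output.append(listener(*args))
        except Exception:
            # KeyboardInterrupt and SystemExit are not caught here.
            failures.handle_exception()
    if failures.get_instances():
        raise failures
    return output


def ok(x):
    return x + 1


def bad(x):
    raise ValueError("boom")


try:
    publish([ok, bad, ok], 1)
except Failures as exc:
    seen = exc.get_instances()
```

Callers that need the individual errors can inspect get_instances() rather than parsing the joined message.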

class magicbus.ProcessBus

Bases: magicbus.base.Bus

A Bus subclass for managing the state of a process.

In general, there should only ever be a single ProcessBus object per process. Frameworks and site containers share a single ProcessBus by publishing messages and subscribing listeners.

The ProcessBus works as a state machine which models the current state of the process. ProcessBus methods transition it from one state to another; those methods publish to subscribed listeners on the new state’s channel after setting the new state. A simplified model might be:

   _start_
  /       \
 V         \
RUN       IDLE --exit--> EXITED
 \         A
  \_______/
    stop

But of course, nothing is ever so easy in the engineering world. In reality, start or stop could throw an error, or even fail to return. We always need an error handler (even if many states share the same one), and we always need to allow a transition, not just from one final state to another, but from the middle of start/stop to a new state. We model this by elevating each of our naive transitions to its own intermediate state, and adding error states. That is:

     XXXXXXXXXXXXXXXX START              XXXXXX-> EXIT_ERROR
     |              /   |   A            X            |
     V             V    |    \           X            V
START_ERROR <-XX RUN    |    IDLE ----> EXIT ----> EXITED ---> X
     |             \    |    A| A
     |              V   V   / |  \
     +---------------> STOP   X    ENTER <--- INITIAL
                        X     X      X
                        |     |      X
                        V     V      X
                       STOP_ERROR <-XX

Now the movement to the “RUN” state from the “IDLE” state encompasses two transitions, four if you count error transitions.
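
The idea of intermediate and error states can be sketched for one fragment of the diagram. The NEXT and ERRORS tables and the transition function below are hypothetical and written only for this example. They cover just the IDLE to RUN path and do not reproduce how ProcessBus stores or executes its transitions.

```python
# Hypothetical next-hop table for the IDLE -> START -> RUN fragment.
NEXT = {("IDLE", "RUN"): "START", ("START", "RUN"): "RUN"}

# Hypothetical mapping from a state to the error state entered on failure.
ERRORS = {"START": "START_ERROR"}


def transition(state, target, listeners):
    """Walk from state toward target; return the list of states visited."""
    visited = [state]
    while state != target:
        state = NEXT[(state, target)]
        visited.append(state)
        try:
            # Run the listeners subscribed to the new state's channel.
            for listener in listeners.get(state, []):
                listener()
        except Exception:
            error_state = ERRORS.get(state)
            if error_state is None:
                raise
            # A failing listener diverts the walk to the error state.
            state = error_state
            visited.append(state)
            break
    return visited


def fail():
    raise RuntimeError("cannot start")


happy = transition("IDLE", "RUN", {})
broken = transition("IDLE", "RUN", {"START": [fail]})
```

In the successful case the walk visits IDLE, START, and RUN (two transitions). When a START listener raises, the walk ends in START_ERROR instead.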

EXIT_ERROR(*exc_info)

START_ERROR(*exc_info)

STOP_ERROR(*exc_info)

block(interval=0.1, sleep=False)

Wait for the EXITED state, KeyboardInterrupt or SystemExit.

This function is intended to be called only by the main thread. After waiting for the EXITED state, it also waits for all threads to terminate, and then calls publish(‘execv’). This design allows another thread to call bus.restart, yet have the main thread perform the actual execv call (required on some platforms).

graceful()

Move to the IDLE state, then back to RUN.

restart()

Restart the process (may close connections).

This method does not restart the process from the calling thread; instead, it stops the bus and asks the main thread to call execv.

start_with_callback(func, args=None, kwargs=None)

Start ‘func’ in a new thread T, then start self (and return T).

throws = (<class 'KeyboardInterrupt'>, <class 'SystemExit'>)

A pub/sub Bus for managing states.

A Bus object is used to contain and manage behavior for any system of diverse components. A Bus object provides a place for frameworks to register code that runs in response to events, or which controls or otherwise interacts with the components.

The Bus object in this package uses topic-based publish-subscribe messaging to accomplish all this. Frameworks and site containers are free to define their own channels. If a message is sent to a channel that has not been defined or has no listeners, there is no effect.

class magicbus.base.Bus(transitions=None, errors=None, initial_state=None, extra_channels=None, id=None)

Bases: object

State machine and pub/sub messenger.

If the ‘select’ module is present (POSIX systems), then select.select() (on an os.pipe) will be used in self.wait instead of time.sleep().

clear()

Discard all subscribed callees.

log(msg='', level=20, traceback=False)

Log the given message. Append the last traceback if requested.

publish(channel, *args, **kwargs)

Return output of all subscribers for the given channel.

publish_exception_class

alias of magicbus.base.ChannelFailures

property states

subscribe(channel, callee, priority=None)

Add the given callee at the given channel (if not present).

transition(desired_state)

Move to the desired state. Return output (list of lists).

unsubscribe(channel, callee)

Discard the given callee (if present).

wait(state, interval=0.1, channel=None, sleep=False)

Poll for the given state(s) at intervals; publish to channel.

If sleep is True, the calling thread loops, sleeping for the given interval each time, and returns only when the bus state is one of the given states to wait for.

If sleep is False (the default) and the operating system supports I/O multiplexing via the ‘select’ module, then an anonymous pipe will be used to signal the waiting thread to wake up whenever the state transitions. This allows the waiting thread to return when the bus shuts down, for example, rather than waiting for the sleep interval to elapse first. Each thread that calls wait() creates a new pipe, so if file descriptors are in short supply on your system you might need to use sleep instead.

exception magicbus.base.ChannelFailures(*args, **kwargs)

Bases: Exception

Exception raised when errors occur in a listener during .publish().

delimiter = '\n'

get_instances()

Return a list of seen exception instances.

handle_exception()

Append the current exception to self.

class magicbus.base.Graph

Bases: dict

A map of {(A, C): B} where B is next in the shortest path from A to C.

Each key is a 2-tuple of nodes (A, C), and the corresponding value is the next node B to take on the shortest path between them. For example, the Graph {(“A”, “B”): “B”, (“A”, “C”): “B”} declares that the shortest path from A to B is directly to B, while the shortest path from A to C starts by moving to B. Calling code can find the shortest path [Pa, …, Pz] by iteratively calling self.get((Pn, Pz)).

Any pair (A, B) not in the map has no path.
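
The iterative lookup can be sketched with a plain dict standing in for a Graph. The helper name shortest_path is hypothetical, and the ("B", "C") entry is added to the example map as an assumption so that the walk from A to C can be completed.

```python
def shortest_path(graph, start, end):
    """Return [start, ..., end], or None if the map has no path."""
    path = [start]
    node = start
    while node != end:
        # Look up the next hop from the current node toward the end node.
        node = graph.get((node, end))
        if node is None:
            return None
        path.append(node)
    return path


# A plain dict stands in for a Graph instance here.
graph = {("A", "B"): "B", ("A", "C"): "B", ("B", "C"): "C"}

a_to_c = shortest_path(graph, "A", "C")
c_to_a = shortest_path(graph, "C", "A")
```

The loop assumes a well-formed map in which every hop makes progress toward the end node. A map containing a cycle of hops for the same target would never terminate.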

classmethod from_edges(edges)

Form a Graph instance from the given {from: (to1, to2)} dict.

The given ‘edges’ dictionary includes a key for each node from which a transition might originate. The corresponding value is either a node or a tuple of nodes; the graph includes an edge from the key node to each node in the value. For example, the dict {“A”: “B”, “B”: (“C”, “D”)} defines 3 edges: A to B, B to C and B to D.
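
A next-hop map of this shape can be derived from an edges dict with a breadth-first search from each originating node. The function next_hops below is a hypothetical sketch of that approach. The source does not say which algorithm from_edges uses, so this should not be read as its implementation.

```python
from collections import deque


def next_hops(edges):
    """Build {(A, C): B} from {from_node: to_node_or_tuple} using BFS."""
    # Normalize each value to a tuple of neighbours.
    adjacency = {
        node: targets if isinstance(targets, tuple) else (targets,)
        for node, targets in edges.items()
    }
    hops = {}
    for start in adjacency:
        # first_step[n] is the first node after start on a shortest
        # path from start to n.
        first_step = {}
        queue = deque()
        for neighbour in adjacency[start]:
            if neighbour != start and neighbour not in first_step:
                first_step[neighbour] = neighbour
                queue.append(neighbour)
        while queue:
            node = queue.popleft()
            for neighbour in adjacency.get(node, ()):
                if neighbour != start and neighbour not in first_step:
                    # Inherit the first step of the node we came from.
                    first_step[neighbour] = first_step[node]
                    queue.append(neighbour)
        for target, step in first_step.items():
            hops[(start, target)] = step
    return hops


hops = next_hops({"A": "B", "B": ("C", "D")})
```

For the three edges A to B, B to C, and B to D, the result maps every reachable pair to its first hop, so paths from A to C and from A to D both begin by moving to B.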

property states

The set of all states in the graph.

class magicbus.base.State(name)

Bases: object

class magicbus.base.StateEnum

Bases: object

An object with enumerated state attributes.
