hat.stc

Statechart library

 1"""Statechart library"""
 2
 3from hat.stc.common import (EventName,
 4                            StateName,
 5                            ActionName,
 6                            ConditionName,
 7                            Event,
 8                            Transition,
 9                            State)
10from hat.stc.dot import create_dot_graph
11from hat.stc.runner import (SyncRunner,
12                            AsyncRunner)
13from hat.stc.scxml import parse_scxml
14from hat.stc.statechart import (Action,
15                                Condition,
16                                Statechart)
17
18
# Public API of the package: every name listed here is imported above from
# one of the hat.stc submodules (common, dot, runner, scxml, statechart).
__all__ = ['EventName',
           'StateName',
           'ActionName',
           'ConditionName',
           'Event',
           'Transition',
           'State',
           'create_dot_graph',
           'SyncRunner',
           'AsyncRunner',
           'parse_scxml',
           'Action',
           'Condition',
           'Statechart']
EventName = str
StateName = str
ActionName = str
ConditionName = str
class Event(typing.NamedTuple):
class Event(typing.NamedTuple):
    """Event instance

    Tuple-based record pairing an event name with an optional payload.
    `Statechart` compares `name` with `Transition.event` when looking for a
    transition to take and hands the whole instance to the called actions
    and conditions.

    """
    name: EventName
    """Event name"""
    # payload is not inspected by the engine itself, only passed along
    payload: typing.Any = None
    """Optional payload"""

Event instance

Event(name: str, payload: Any = None)

Create new instance of Event(name, payload)

name: str

Event name

payload: Any

Optional payload

Inherited Members
builtins.tuple
index
count
class Transition(typing.NamedTuple):
class Transition(typing.NamedTuple):
    """Transition definition

    Describes a single outgoing transition of a state: the event that
    triggers it, the optional target state, the conditions guarding it and
    the actions executed when it is taken.

    """
    event: EventName
    """Event identifier. Occurrence of event with this exact identifier can
    trigger state transition."""
    target: StateName | None
    """Destination state identifier. If destination state is not defined,
    local transition is assumed - state is not changed and transition
    actions are triggered."""
    # NOTE: the default list below is a single object shared by every
    # instance created without this argument - treat it as read-only
    actions: list[ActionName] = []
    """Actions executed on transition."""
    # NOTE: same shared-default caveat as for `actions`
    conditions: list[ConditionName] = []
    """List of conditions. Transition is triggered only if all provided
    conditions are met."""
    internal: bool = False
    """Internal transition modifier. Determines whether the source state is
    exited in transitions whose target state is a descendant of the source
    state."""

Transition definition

Transition( event: str, target: str | None, actions: list[str] = [], conditions: list[str] = [], internal: bool = False)

Create new instance of Transition(event, target, actions, conditions, internal)

event: str

Event identifier. Occurrence of event with this exact identifier can trigger state transition.

target: str | None

Destination state identifier. If destination state is not defined, local transition is assumed - state is not changed and transition actions are triggered.

actions: list[str]

Actions executed on transition.

conditions: list[str]

List of conditions. Transition is triggered only if all provided conditions are met.

internal: bool

Internal transition modifier. Determines whether the source state is exited in transitions whose target state is a descendant of the source state.

Inherited Members
builtins.tuple
index
count
class State(typing.NamedTuple):
class State(typing.NamedTuple):
    """State definition

    Node of the state hierarchy. A state can contain child states, outgoing
    transitions and lists of action names executed on entering and exiting
    the state.

    """
    name: StateName
    """Unique state identifier."""
    # NOTE: the list defaults below are single objects shared by every
    # instance created without the argument - treat them as read-only
    children: typing.List['State'] = []
    """Optional child states. If state has children, first child is
    considered as its initial state."""
    transitions: list[Transition] = []
    """Possible transitions to other states."""
    entries: list[ActionName] = []
    """Actions executed when state is entered."""
    exits: list[ActionName] = []
    """Actions executed when state is exited."""
    final: bool = False
    """Is state final."""

State definition

State( name: str, children: List[ForwardRef('State')] = [], transitions: list[Transition] = [], entries: list[str] = [], exits: list[str] = [], final: bool = False)

Create new instance of State(name, children, transitions, entries, exits, final)

name: str

Unique state identifier.

children: List[State]

Optional child states. If state has children, first child is considered as its initial state.

transitions: list[Transition]

Possible transitions to other states.

entries: list[str]

Actions executed when state is entered.

exits: list[str]

Actions executed when state is exited.

final: bool

Is state final.

Inherited Members
builtins.tuple
index
count
def create_dot_graph(states: Iterable[State]) -> str:
 7def create_dot_graph(states: typing.Iterable[State]) -> str:
 8    """Create DOT representation of statechart"""
 9    state_name_ids = {}
10    id_prefix = 'state'
11    states_dot = '\n'.join(
12        _create_dot_graph_states(states, state_name_ids, id_prefix))
13    transitions_dot = '\n'.join(
14        _create_dot_graph_transitions(states, state_name_ids, id_prefix))
15    return _dot_graph.format(states=states_dot,
16                             transitions=transitions_dot)

Create DOT representation of statechart

class SyncRunner:
class SyncRunner:
    """Synchronous event runner

    Registered ``(statechart, event)`` pairs are kept in a FIFO queue and
    delivered one at a time, each time `SyncRunner.step` is called.

    """

    def __init__(self):
        self._queue = collections.deque()

    @property
    def empty(self) -> bool:
        """``True`` while no registered event is waiting for processing"""
        return len(self._queue) == 0

    def register(self, stc: Statechart, event: Event):
        """Queue `event` for later delivery to `stc`"""
        pending = stc, event
        self._queue.append(pending)

    def step(self):
        """Deliver the oldest queued event to its statechart

        Does nothing when the queue is empty.

        """
        try:
            pending = self._queue.popleft()
        except IndexError:
            # nothing queued
            return

        stc, event = pending
        stc.step(event)
empty: bool

Is event queue empty

def register( self, stc: Statechart, event: Event):
24    def register(self, stc: Statechart, event: Event):
25        """Add event to queue"""
26        self._queue.append((stc, event))

Add event to queue

def step(self):
28    def step(self):
29        """Process next queued event"""
30        if not self._queue:
31            return
32
33        stc, event = self._queue.popleft()
34        stc.step(event)

Process next queued event

class AsyncRunner(hat.aio.group.Resource):
class AsyncRunner(aio.Resource):
    """Asynchronous event runner

    Registered ``(statechart, event)`` pairs are put on a queue and delivered
    by a task spawned in the runner's async group during initialization.

    """

    def __init__(self):
        self._queue = aio.Queue()
        self._async_group = aio.Group()

        # task that takes events from the queue and delivers them
        self.async_group.spawn(self._runner_loop)

    @property
    def async_group(self):
        """Async group"""
        return self._async_group

    def register(self, stc: Statechart, event: Event):
        """Add event to queue"""
        # enqueue without waiting; delivery happens in `_runner_loop`
        self._queue.put_nowait((stc, event))

    async def _runner_loop(self):
        """Deliver queued events one by one until the loop terminates"""
        try:
            while True:
                stc, event = await self._queue.get()
                stc.step(event)

        except Exception as e:
            # only `Exception` subclasses are logged here; anything else
            # (presumably task cancellation - confirm) goes straight to
            # `finally`
            mlog.error("runner loop error: %s", e, exc_info=e)

        finally:
            # executed on every exit path: close the runner itself and then
            # the event queue
            self.close()
            self._queue.close()

Resource with lifetime control based on Group.

async_group

Async group

def register( self, stc: Statechart, event: Event):
50    def register(self, stc: Statechart, event: Event):
51        """Add event to queue"""
52        self._queue.put_nowait((stc, event))

Add event to queue

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
def parse_scxml(scxml: typing.TextIO) -> list[State]:
def parse_scxml(scxml: typing.TextIO) -> list[State]:
    """Parse SCXML into list of state definitions

    Args:
        scxml: text stream containing the SCXML document

    Returns:
        state definitions extracted from the document's root element

    """
    return _parse_scxml_states(_read_xml(scxml))

Parse SCXML into list of state definitions

Action = typing.Callable[['Statechart', Event | None], None]
Condition = typing.Callable[['Statechart', Event | None], bool]
class Statechart:
class Statechart:
    """Statechart engine

    Each instance is initialized with state definitions (first state is
    considered initial) and action and condition definitions.

    During initialization, statechart will transition to initial state.

    Statechart execution is simulated by repetitive calling of
    `Statechart.step` method which accepts event instances containing event
    name and optional event payload.

    During statechart execution, actions and conditions are called based on
    state changes and associated transitions provided during initialization.

    Condition is considered met only if result of calling condition function is
    ``True``.

    Args:
        states: all state definitions (the first state is initial)
        actions: mapping of action names to their implementation
        conditions: mapping of condition names to their implementation

    """

    def __init__(self,
                 states: typing.Iterable[State],
                 actions: dict[ActionName, Action],
                 conditions: dict[ConditionName, Condition] = {}):
        # copy the definitions into a work list that is consumed below
        states = collections.deque(states)
        # the initial state has to be remembered before the list is consumed
        initial = states[0].name if states else None

        self._actions = actions
        self._conditions = conditions
        # state name -> state definition (all nesting levels)
        self._states = {}
        # child state name -> parent state name
        self._parents = {}
        # names of currently active states, outermost first
        self._stack = collections.deque()

        # flatten the hierarchy: index each state and queue its children
        while states:
            state = states.pop()
            states.extend(state.children)
            self._states[state.name] = state
            self._parents.update({i.name: state.name for i in state.children})

        # enter the initial state; entry actions receive ``None`` as event
        if initial:
            self._walk_down(initial, None)

    @property
    def state(self) -> StateName | None:
        """Current state

        Innermost active state or ``None`` if no state is active.

        """
        return self._stack[-1] if self._stack else None

    @property
    def finished(self) -> bool:
        """Is statechart in final state

        Also ``True`` when there is no current state.

        """
        state = self.state
        return not state or self._states[state].final

    def step(self, event: Event):
        """Process single event"""
        # finished statechart ignores all events
        if self.finished:
            return

        # `state` is the state which defines the matching transition - the
        # current state or one of its ancestors
        state, transition = self._find_state_transition(self.state, event)
        if not transition:
            return

        # transition without target is local: nothing is exited or entered
        if transition.target:
            ancestor = self._find_ancestor(state, transition.target,
                                           transition.internal)
            self._walk_up(ancestor, event)

        # transition actions run after exit actions and before entry actions
        self._exec_actions(transition.actions, event)

        if transition.target:
            self._walk_down(transition.target, event)

    def _walk_up(self, target, event):
        """Exit active states, innermost first, until `target` is current

        Exit actions of each state are executed before the state is removed
        from the stack. If `target` is ``None``, all active states are exited.

        """
        while self.state != target:
            state = self._states[self.state]
            self._exec_actions(state.exits, event)
            self._stack.pop()

    def _walk_down(self, target, event):
        """Enter `target` together with its not yet active ancestors

        After `target`, first children are entered repeatedly until a state
        without children is reached. Entry actions are executed right after
        each state is pushed to the stack.

        """
        states = collections.deque([self._states[target]])

        # prepend parents until the current state or the root is reached
        while ((state := states[0]).name != self.state and
                (parent := self._parents.get(state.name))):
            states.appendleft(self._states[parent])

        # descend through initial (first) children down to a leaf state
        while (state := states[-1]).children:
            states.append(state.children[0])

        # current state is already active - it must not be entered again
        if states[0].name == self.state:
            states.popleft()

        for state in states:
            self._stack.append(state.name)
            self._exec_actions(state.entries, event)

    def _find_state_transition(self, state, event):
        """Find first enabled transition for `event`

        Search starts at `state` and continues through its ancestors.
        Transitions are checked in definition order. Returns tuple of the
        name of the state defining the transition and the transition itself,
        or ``(None, None)`` if there is no enabled transition.

        """
        while state:
            for transition in self._states[state].transitions:
                if transition.event != event.name:
                    continue

                # NOTE: `all` checks truthiness, so any truthy result of a
                # condition call counts as met
                if not all(self._conditions[condition](self, event)
                           for condition in transition.conditions):
                    continue

                return state, transition

            state = self._parents.get(state)

        return None, None

    def _find_ancestor(self, state, sibling, internal):
        """Find state that stays active while transitioning to `sibling`

        `state` is the state defining the transition and `sibling` is the
        transition target. Returns the name of the deepest active state which
        is not exited, or ``None`` if all active states have to be exited.

        """
        if not sibling or not state:
            return

        # names of states from the root down to the target
        path = collections.deque([sibling])
        while (parent := self._parents.get(path[0])):
            path.appendleft(parent)

        ancestor = None
        for i, j in zip(self._stack, path):
            # active states and target path diverge here
            if i != j:
                break

            # source or target reached: it stays active only if it is the
            # source of an internal transition
            if i in [sibling, state]:
                if internal and i == state:
                    ancestor = i
                break

            ancestor = i

        return ancestor

    def _exec_actions(self, names, event):
        """Call actions registered under `names`, in the given order"""
        for name in names:
            action = self._actions[name]
            action(self, event)

Statechart engine

Each instance is initialized with state definitions (first state is considered initial) and action and condition definitions.

During initialization, statechart will transition to initial state.

Statechart execution is simulated by repetitive calling of Statechart.step method which accepts event instances containing event name and optional event payload.

During statechart execution, actions and conditions are called based on state changes and associated transitions provided during initialization.

Condition is considered met only if result of calling condition function is True.

Arguments:
  • states: all state definitions (the first state is initial)
  • actions: mapping of action names to their implementation
  • conditions: mapping of condition names to their implementation
Statechart( states: Iterable[State], actions: dict[str, typing.Callable[[Statechart, Event | None], NoneType]], conditions: dict[str, typing.Callable[[Statechart, Event | None], bool]] = {})
    def __init__(self,
                 states: typing.Iterable[State],
                 actions: dict[ActionName, Action],
                 conditions: dict[ConditionName, Condition] = {}):
        # copy the definitions into a work list that is consumed below
        states = collections.deque(states)
        # the initial state has to be remembered before the list is consumed
        initial = states[0].name if states else None

        self._actions = actions
        self._conditions = conditions
        # state name -> state definition (all nesting levels)
        self._states = {}
        # child state name -> parent state name
        self._parents = {}
        # names of currently active states, outermost first
        self._stack = collections.deque()

        # flatten the hierarchy: index each state and queue its children
        while states:
            state = states.pop()
            states.extend(state.children)
            self._states[state.name] = state
            self._parents.update({i.name: state.name for i in state.children})

        # enter the initial state; entry actions receive ``None`` as event
        if initial:
            self._walk_down(initial, None)
state: str | None

Current state

finished: bool

Is statechart in final state

def step(self, event: Event):
    def step(self, event: Event):
        """Process single event

        Events are ignored if the statechart is finished or if neither the
        current state nor any of its ancestors has an enabled transition for
        the event.

        """
        # finished statechart ignores all events
        if self.finished:
            return

        # `state` is the state which defines the matching transition - the
        # current state or one of its ancestors
        state, transition = self._find_state_transition(self.state, event)
        if not transition:
            return

        # transition without target is local: nothing is exited or entered
        if transition.target:
            ancestor = self._find_ancestor(state, transition.target,
                                           transition.internal)
            self._walk_up(ancestor, event)

        # transition actions run after exit actions and before entry actions
        self._exec_actions(transition.actions, event)

        if transition.target:
            self._walk_down(transition.target, event)

Process single event