hat.stc
Statechart library
1"""Statechart library""" 2 3from hat.stc.common import (EventName, 4 StateName, 5 ActionName, 6 ConditionName, 7 Event, 8 Transition, 9 State) 10from hat.stc.dot import create_dot_graph 11from hat.stc.runner import (SyncRunner, 12 AsyncRunner) 13from hat.stc.scxml import parse_scxml 14from hat.stc.statechart import (Action, 15 Condition, 16 Statechart) 17 18 19__all__ = ['EventName', 20 'StateName', 21 'ActionName', 22 'ConditionName', 23 'Event', 24 'Transition', 25 'State', 26 'create_dot_graph', 27 'SyncRunner', 28 'AsyncRunner', 29 'parse_scxml', 30 'Action', 31 'Condition', 32 'Statechart']
18class Event(typing.NamedTuple): 19 """Event instance""" 20 name: EventName 21 """Event name""" 22 payload: typing.Any = None 23 """Optional payload"""
Event instance
Inherited Members
- builtins.tuple
- index
- count
26class Transition(typing.NamedTuple): 27 """Transition definition""" 28 event: EventName 29 """Event identifier. Occurrence of event with this exact identifier can 30 trigger state transition.""" 31 target: StateName | None 32 """Destination state identifier. If destination state is not defined, 33 local transition is assumed - state is not changed and transition 34 actions are triggered.""" 35 actions: list[ActionName] = [] 36 """Actions executed on transition.""" 37 conditions: list[ConditionName] = [] 38 """List of conditions. Transition is triggered only if all provided 39 conditions are met.""" 40 internal: bool = False 41 """Internal transition modifier. Determines whether the source state is 42 exited in transitions whose target state is a descendant of the source 43 state."""
Transition definition
Create new instance of Transition(event, target, actions, conditions, internal)
Event identifier. Occurrence of event with this exact identifier can trigger state transition.
Destination state identifier. If destination state is not defined, local transition is assumed - state is not changed and transition actions are triggered.
List of conditions. Transition is triggered only if all provided conditions are met.
Internal transition modifier. Determines whether the source state is exited in transitions whose target state is a descendant of the source state.
Inherited Members
- builtins.tuple
- index
- count
46class State(typing.NamedTuple): 47 """State definition""" 48 name: StateName 49 """Unique state identifier.""" 50 children: typing.List['State'] = [] 51 """Optional child states. If state has children, first child is 52 considered as its initial state.""" 53 transitions: list[Transition] = [] 54 """Possible transitions to other states.""" 55 entries: list[ActionName] = [] 56 """Actions executed when state is entered.""" 57 exits: list[ActionName] = [] 58 """Actions executed when state is exited.""" 59 final: bool = False 60 """Is state final."""
State definition
Create new instance of State(name, children, transitions, entries, exits, final)
Optional child states. If state has children, first child is considered as its initial state.
Inherited Members
- builtins.tuple
- index
- count
7def create_dot_graph(states: typing.Iterable[State]) -> str: 8 """Create DOT representation of statechart""" 9 state_name_ids = {} 10 id_prefix = 'state' 11 states_dot = '\n'.join( 12 _create_dot_graph_states(states, state_name_ids, id_prefix)) 13 transitions_dot = '\n'.join( 14 _create_dot_graph_transitions(states, state_name_ids, id_prefix)) 15 return _dot_graph.format(states=states_dot, 16 transitions=transitions_dot)
Create DOT representation of statechart
14class SyncRunner: 15 16 def __init__(self): 17 self._queue = collections.deque() 18 19 @property 20 def empty(self) -> bool: 21 """Is event queue empty""" 22 return not self._queue 23 24 def register(self, stc: Statechart, event: Event): 25 """Add event to queue""" 26 self._queue.append((stc, event)) 27 28 def step(self): 29 """Process next queued event""" 30 if not self._queue: 31 return 32 33 stc, event = self._queue.popleft() 34 stc.step(event)
37class AsyncRunner(aio.Resource): 38 39 def __init__(self): 40 self._queue = aio.Queue() 41 self._async_group = aio.Group() 42 43 self.async_group.spawn(self._runner_loop) 44 45 @property 46 def async_group(self): 47 """Async group""" 48 return self._async_group 49 50 def register(self, stc: Statechart, event: Event): 51 """Add event to queue""" 52 self._queue.put_nowait((stc, event)) 53 54 async def _runner_loop(self): 55 try: 56 while True: 57 stc, event = await self._queue.get() 58 stc.step(event) 59 60 except Exception as e: 61 mlog.error("runner loop error: %s", e, exc_info=e) 62 63 finally: 64 self.close() 65 self._queue.close()
Resource with lifetime control based on Group
.
50 def register(self, stc: Statechart, event: Event): 51 """Add event to queue""" 52 self._queue.put_nowait((stc, event))
Add event to queue
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
11def parse_scxml(scxml: typing.TextIO) -> list[State]: 12 """Parse SCXML into list of state definitions""" 13 root_el = _read_xml(scxml) 14 return _parse_scxml_states(root_el)
Parse SCXML into list of state definitions
34class Statechart: 35 """Statechart engine 36 37 Each instance is initialized with state definitions (first state is 38 considered initial) and action and condition definitions. 39 40 During initialization, statechart will transition to initial state. 41 42 Statechart execution is simulated by repetitive calling of 43 `Statechart.step` method which accepts event instances containing event 44 name and optional event payload. 45 46 During statechart execution, actions and conditions are called based on 47 state changes and associated transitions provided during initialization. 48 49 Condition is considered met only if result of calling condition function is 50 ``True``. 51 52 Args: 53 states: all state definitions with (first state is initial) 54 actions: mapping of action names to their implementation 55 conditions: mapping of conditions names to their implementation 56 57 """ 58 59 def __init__(self, 60 states: typing.Iterable[State], 61 actions: dict[ActionName, Action], 62 conditions: dict[ConditionName, Condition] = {}): 63 states = collections.deque(states) 64 initial = states[0].name if states else None 65 66 self._actions = actions 67 self._conditions = conditions 68 self._states = {} 69 self._parents = {} 70 self._stack = collections.deque() 71 72 while states: 73 state = states.pop() 74 states.extend(state.children) 75 self._states[state.name] = state 76 self._parents.update({i.name: state.name for i in state.children}) 77 78 if initial: 79 self._walk_down(initial, None) 80 81 @property 82 def state(self) -> StateName | None: 83 """Current state""" 84 return self._stack[-1] if self._stack else None 85 86 @property 87 def finished(self) -> bool: 88 """Is statechart in final state""" 89 state = self.state 90 return not state or self._states[state].final 91 92 def step(self, event: Event): 93 """Process single event""" 94 if self.finished: 95 return 96 97 state, transition = self._find_state_transition(self.state, event) 98 if not transition: 99 return 100 101 if transition.target: 102 ancestor = self._find_ancestor(state, transition.target, 103 transition.internal) 104 self._walk_up(ancestor, event) 105 106 self._exec_actions(transition.actions, event) 107 108 if transition.target: 109 self._walk_down(transition.target, event) 110 111 def _walk_up(self, target, event): 112 while self.state != target: 113 state = self._states[self.state] 114 self._exec_actions(state.exits, event) 115 self._stack.pop() 116 117 def _walk_down(self, target, event): 118 states = collections.deque([self._states[target]]) 119 120 while ((state := states[0]).name != self.state and 121 (parent := self._parents.get(state.name))): 122 states.appendleft(self._states[parent]) 123 124 while (state := states[-1]).children: 125 states.append(state.children[0]) 126 127 if states[0].name == self.state: 128 states.popleft() 129 130 for state in states: 131 self._stack.append(state.name) 132 self._exec_actions(state.entries, event) 133 134 def _find_state_transition(self, state, event): 135 while state: 136 for transition in self._states[state].transitions: 137 if transition.event != event.name: 138 continue 139 140 if not all(self._conditions[condition](self, event) 141 for condition in transition.conditions): 142 continue 143 144 return state, transition 145 146 state = self._parents.get(state) 147 148 return None, None 149 150 def _find_ancestor(self, state, sibling, internal): 151 if not sibling or not state: 152 return 153 154 path = collections.deque([sibling]) 155 while (parent := self._parents.get(path[0])): 156 path.appendleft(parent) 157 158 ancestor = None 159 for i, j in zip(self._stack, path): 160 if i != j: 161 break 162 163 if i in [sibling, state]: 164 if internal and i == state: 165 ancestor = i 166 break 167 168 ancestor = i 169 170 return ancestor 171 172 def _exec_actions(self, names, event): 173 for name in names: 174 action = self._actions[name] 175 action(self, event)
Statechart engine
Each instance is initialized with state definitions (first state is considered initial) and action and condition definitions.
During initialization, statechart will transition to initial state.
Statechart execution is simulated by repetitive calling of
Statechart.step
method which accepts event instances containing event
name and optional event payload.
During statechart execution, actions and conditions are called based on state changes and associated transitions provided during initialization.
Condition is considered met only if result of calling condition function is
True
.
Arguments:
- states: all state definitions with (first state is initial)
- actions: mapping of action names to their implementation
- conditions: mapping of conditions names to their implementation
59 def __init__(self, 60 states: typing.Iterable[State], 61 actions: dict[ActionName, Action], 62 conditions: dict[ConditionName, Condition] = {}): 63 states = collections.deque(states) 64 initial = states[0].name if states else None 65 66 self._actions = actions 67 self._conditions = conditions 68 self._states = {} 69 self._parents = {} 70 self._stack = collections.deque() 71 72 while states: 73 state = states.pop() 74 states.extend(state.children) 75 self._states[state.name] = state 76 self._parents.update({i.name: state.name for i in state.children}) 77 78 if initial: 79 self._walk_down(initial, None)
81 @property 82 def state(self) -> StateName | None: 83 """Current state""" 84 return self._stack[-1] if self._stack else None
Current state
86 @property 87 def finished(self) -> bool: 88 """Is statechart in final state""" 89 state = self.state 90 return not state or self._states[state].final
Is statechart in final state
92 def step(self, event: Event): 93 """Process single event""" 94 if self.finished: 95 return 96 97 state, transition = self._find_state_transition(self.state, event) 98 if not transition: 99 return 100 101 if transition.target: 102 ancestor = self._find_ancestor(state, transition.target, 103 transition.internal) 104 self._walk_up(ancestor, event) 105 106 self._exec_actions(transition.actions, event) 107 108 if transition.target: 109 self._walk_down(transition.target, event)
Process single event