hat.stc
Statechart library
1"""Statechart library""" 2 3from hat.stc.common import (EventName, 4 StateName, 5 ActionName, 6 ConditionName, 7 Event, 8 Transition, 9 State) 10from hat.stc.dot import create_dot_graph 11from hat.stc.runner import (SyncRunner, 12 AsyncRunner, 13 AsyncTimer) 14from hat.stc.scxml import parse_scxml 15from hat.stc.statechart import (Action, 16 Condition, 17 Statechart) 18 19 20__all__ = ['EventName', 21 'StateName', 22 'ActionName', 23 'ConditionName', 24 'Event', 25 'Transition', 26 'State', 27 'create_dot_graph', 28 'SyncRunner', 29 'AsyncRunner', 30 'AsyncTimer', 31 'parse_scxml', 32 'Action', 33 'Condition', 34 'Statechart']
19class Event(typing.NamedTuple): 20 """Event instance""" 21 name: EventName 22 """Event name""" 23 payload: typing.Any = None 24 """Optional payload"""
Event instance
27class Transition(typing.NamedTuple): 28 """Transition definition""" 29 event: EventName 30 """Event identifier. Occurrence of event with this exact identifier can 31 trigger state transition.""" 32 target: StateName | None 33 """Destination state identifier. If destination state is not defined, 34 local transition is assumed - state is not changed and transition 35 actions are triggered.""" 36 actions: Collection[ActionName] = [] 37 """Actions executed on transition.""" 38 conditions: Collection[ConditionName] = [] 39 """List of conditions. Transition is triggered only if all provided 40 conditions are met.""" 41 internal: bool = False 42 """Internal transition modifier. Determines whether the source state is 43 exited in transitions whose target state is a descendant of the source 44 state."""
Transition definition
Create new instance of Transition(event, target, actions, conditions, internal)
Event identifier. Occurrence of event with this exact identifier can trigger state transition.
Destination state identifier. If destination state is not defined, local transition is assumed - state is not changed and transition actions are triggered.
47class State(typing.NamedTuple): 48 """State definition""" 49 name: StateName 50 """Unique state identifier.""" 51 children: Collection['State'] = [] 52 """Optional child states. If state has children, first child is 53 considered as its initial state.""" 54 transitions: Collection[Transition] = [] 55 """Possible transitions to other states.""" 56 entries: Collection[ActionName] = [] 57 """Actions executed when state is entered.""" 58 exits: Collection[ActionName] = [] 59 """Actions executed when state is exited.""" 60 final: bool = False 61 """Is state final."""
State definition
Create new instance of State(name, children, transitions, entries, exits, final)
Optional child states. If state has children, first child is considered as its initial state.
7def create_dot_graph(states: Iterable[State]) -> str: 8 """Create DOT representation of statechart""" 9 state_name_ids = {} 10 id_prefix = 'state' 11 states_dot = '\n'.join( 12 _create_dot_graph_states(states, state_name_ids, id_prefix)) 13 transitions_dot = '\n'.join( 14 _create_dot_graph_transitions(states, state_name_ids, id_prefix)) 15 return _dot_graph.format(states=states_dot, 16 transitions=transitions_dot)
Create DOT representation of statechart
16class SyncRunner: 17 18 def __init__(self): 19 self._queue = collections.deque() 20 21 @property 22 def empty(self) -> bool: 23 """Is event queue empty""" 24 return not self._queue 25 26 def register(self, stc: Statechart, event: Event): 27 """Add event to queue""" 28 self._queue.append((stc, event)) 29 30 def step(self): 31 """Process next queued event""" 32 if not self._queue: 33 return 34 35 stc, event = self._queue.popleft() 36 stc.step(event)
39class AsyncRunner(aio.Resource): 40 41 def __init__(self): 42 self._queue = aio.Queue() 43 self._async_group = aio.Group() 44 45 self.async_group.spawn(self._runner_loop) 46 47 @property 48 def async_group(self): 49 """Async group""" 50 return self._async_group 51 52 def register(self, stc: Statechart, event: Event): 53 """Add event to queue""" 54 self._queue.put_nowait((stc, event)) 55 56 async def _runner_loop(self): 57 try: 58 while True: 59 stc, event = await self._queue.get() 60 stc.step(event) 61 62 except Exception as e: 63 mlog.error("runner loop error: %s", e, exc_info=e) 64 65 finally: 66 self.close() 67 self._queue.close()
Resource with lifetime control based on Group
.
70class AsyncTimer(aio.Resource): 71 72 def __init__(self, 73 runner: AsyncRunner, 74 event: EventName, 75 duration: float): 76 self._runner = runner 77 self._event = event 78 self._duration = duration 79 self._loop = asyncio.get_running_loop() 80 self._async_group = runner.async_group.create_subgroup() 81 self._next_tokens = itertools.count(1) 82 self._active_token = None 83 self._timer = None 84 85 self.async_group.spawn(aio.call_on_cancel, self._stop, None, None) 86 87 @property 88 def async_group(self) -> aio.Group: 89 return self._async_group 90 91 @property 92 def start(self) -> Action: 93 return self._start 94 95 @property 96 def stop(self) -> Action: 97 return self._stop 98 99 @property 100 def condition(self) -> Condition: 101 return self._condition 102 103 def _start(self, stc, _): 104 if not self.is_open: 105 return 106 107 if self._timer: 108 self._timer.cancel() 109 110 self._active_token = next(self._next_tokens) 111 self._timer = self._loop.call_later(self._duration, self._on_timer, 112 stc, self._active_token) 113 114 def _stop(self, _, __): 115 self._active_token = None 116 if not self._timer: 117 return 118 119 self._timer.cancel() 120 self._timer = None 121 122 def _condition(self, _, event): 123 return bool(event and event.payload == self._active_token) 124 125 def _on_timer(self, stc, token): 126 if not self.is_open: 127 return 128 129 self._runner.register(stc, Event(name=self._event, 130 payload=token))
Resource with lifetime control based on Group
.
72 def __init__(self, 73 runner: AsyncRunner, 74 event: EventName, 75 duration: float): 76 self._runner = runner 77 self._event = event 78 self._duration = duration 79 self._loop = asyncio.get_running_loop() 80 self._async_group = runner.async_group.create_subgroup() 81 self._next_tokens = itertools.count(1) 82 self._active_token = None 83 self._timer = None 84 85 self.async_group.spawn(aio.call_on_cancel, self._stop, None, None)
11def parse_scxml(scxml: typing.TextIO) -> list[State]: 12 """Parse SCXML into list of state definitions""" 13 root_el = _read_xml(scxml) 14 return _parse_scxml_states(root_el)
Parse SCXML into list of state definitions
35class Statechart: 36 """Statechart engine 37 38 Each instance is initialized with state definitions (first state is 39 considered initial) and action and condition definitions. 40 41 During initialization, statechart will transition to initial state. 42 43 Statechart execution is simulated by repetitive calling of 44 `Statechart.step` method which accepts event instances containing event 45 name and optional event payload. 46 47 During statechart execution, actions and conditions are called based on 48 state changes and associated transitions provided during initialization. 49 50 Condition is considered met only if result of calling condition function is 51 ``True``. 52 53 Args: 54 states: all state definitions with (first state is initial) 55 actions: mapping of action names to their implementation 56 conditions: mapping of conditions names to their implementation 57 58 """ 59 60 def __init__(self, 61 states: Iterable[State], 62 actions: dict[ActionName, Action], 63 conditions: dict[ConditionName, Condition] = {}): 64 states = collections.deque(states) 65 initial = states[0].name if states else None 66 67 self._actions = actions 68 self._conditions = conditions 69 self._states = {} 70 self._parents = {} 71 self._stack = collections.deque() 72 73 while states: 74 state = states.pop() 75 states.extend(state.children) 76 self._states[state.name] = state 77 self._parents.update({i.name: state.name for i in state.children}) 78 79 if initial: 80 self._walk_down(initial, None) 81 82 @property 83 def state(self) -> StateName | None: 84 """Current state""" 85 return self._stack[-1] if self._stack else None 86 87 @property 88 def finished(self) -> bool: 89 """Is statechart in final state""" 90 state = self.state 91 return not state or self._states[state].final 92 93 def step(self, event: Event): 94 """Process single event""" 95 if self.finished: 96 return 97 98 state, transition = self._find_state_transition(self.state, event) 99 if not transition: 100 return 101 102 if transition.target: 103 ancestor = self._find_ancestor(state, transition.target, 104 transition.internal) 105 self._walk_up(ancestor, event) 106 107 self._exec_actions(transition.actions, event) 108 109 if transition.target: 110 self._walk_down(transition.target, event) 111 112 def _walk_up(self, target, event): 113 while self.state != target: 114 state = self._states[self.state] 115 self._exec_actions(state.exits, event) 116 self._stack.pop() 117 118 def _walk_down(self, target, event): 119 states = collections.deque([self._states[target]]) 120 121 while ((state := states[0]).name != self.state and 122 (parent := self._parents.get(state.name))): 123 states.appendleft(self._states[parent]) 124 125 while (state := states[-1]).children: 126 states.append(state.children[0]) 127 128 if states[0].name == self.state: 129 states.popleft() 130 131 for state in states: 132 self._stack.append(state.name) 133 self._exec_actions(state.entries, event) 134 135 def _find_state_transition(self, state, event): 136 while state: 137 for transition in self._states[state].transitions: 138 if transition.event != event.name: 139 continue 140 141 if not all(self._conditions[condition](self, event) 142 for condition in transition.conditions): 143 continue 144 145 return state, transition 146 147 state = self._parents.get(state) 148 149 return None, None 150 151 def _find_ancestor(self, state, sibling, internal): 152 if not sibling or not state: 153 return 154 155 path = collections.deque([sibling]) 156 while (parent := self._parents.get(path[0])): 157 path.appendleft(parent) 158 159 ancestor = None 160 for i, j in zip(self._stack, path): 161 if i != j: 162 break 163 164 if i in [sibling, state]: 165 if internal and i == state: 166 ancestor = i 167 break 168 169 ancestor = i 170 171 return ancestor 172 173 def _exec_actions(self, names, event): 174 for name in names: 175 action = self._actions[name] 176 action(self, event)
Statechart engine
Each instance is initialized with state definitions (first state is considered initial) and action and condition definitions.
During initialization, statechart will transition to initial state.
Statechart execution is simulated by repetitive calling of
Statechart.step
method which accepts event instances containing event
name and optional event payload.
During statechart execution, actions and conditions are called based on state changes and associated transitions provided during initialization.
Condition is considered met only if result of calling condition function is
True
.
Arguments:
- states: all state definitions with (first state is initial)
- actions: mapping of action names to their implementation
- conditions: mapping of conditions names to their implementation
60 def __init__(self, 61 states: Iterable[State], 62 actions: dict[ActionName, Action], 63 conditions: dict[ConditionName, Condition] = {}): 64 states = collections.deque(states) 65 initial = states[0].name if states else None 66 67 self._actions = actions 68 self._conditions = conditions 69 self._states = {} 70 self._parents = {} 71 self._stack = collections.deque() 72 73 while states: 74 state = states.pop() 75 states.extend(state.children) 76 self._states[state.name] = state 77 self._parents.update({i.name: state.name for i in state.children}) 78 79 if initial: 80 self._walk_down(initial, None)
82 @property 83 def state(self) -> StateName | None: 84 """Current state""" 85 return self._stack[-1] if self._stack else None
Current state
87 @property 88 def finished(self) -> bool: 89 """Is statechart in final state""" 90 state = self.state 91 return not state or self._states[state].final
Is statechart in final state
93 def step(self, event: Event): 94 """Process single event""" 95 if self.finished: 96 return 97 98 state, transition = self._find_state_transition(self.state, event) 99 if not transition: 100 return 101 102 if transition.target: 103 ancestor = self._find_ancestor(state, transition.target, 104 transition.internal) 105 self._walk_up(ancestor, event) 106 107 self._exec_actions(transition.actions, event) 108 109 if transition.target: 110 self._walk_down(transition.target, event)
Process single event