hat.stc
Statechart module
"""Statechart module"""

import collections
import itertools
import pathlib
import typing
import xml.etree.ElementTree

from hat import aio


EventName: typing.TypeAlias = str
"""Event name"""

StateName: typing.TypeAlias = str
"""State name"""

ActionName: typing.TypeAlias = str
"""Action name"""

ConditionName: typing.TypeAlias = str
"""Condition name"""


class Event(typing.NamedTuple):
    """Event instance"""
    name: EventName
    """Event name"""
    payload: typing.Any = None
    """Optional payload"""


class Transition(typing.NamedTuple):
    """Transition definition"""
    event: EventName
    """Event identifier. Occurrence of event with this exact identifier can
    trigger state transition."""
    target: StateName | None
    """Destination state identifier. If destination state is not defined,
    local transition is assumed - state is not changed and transition
    actions are triggered."""
    actions: list[ActionName] = []
    """Actions executed on transition."""
    conditions: list[ConditionName] = []
    """List of conditions. Transition is triggered only if all provided
    conditions are met."""
    internal: bool = False
    """Internal transition modifier. Determines whether the source state is
    exited in transitions whose target state is a descendant of the source
    state."""


class State(typing.NamedTuple):
    """State definition"""
    name: StateName
    """Unique state identifier."""
    children: typing.List['State'] = []
    """Optional child states. If state has children, first child is
    considered as its initial state."""
    transitions: list[Transition] = []
    """Possible transitions to other states."""
    entries: list[ActionName] = []
    """Actions executed when state is entered."""
    exits: list[ActionName] = []
    """Actions executed when state is exited."""
    final: bool = False
    """Is state final."""


Action: typing.TypeAlias = typing.Callable[['Statechart', Event | None], None]
"""Action function

Action implementation which can be executed as part of entering/exiting
state or transition execution. It is called with statechart instance and
`Event` which triggered transition. In case of initial actions, run during
transition to initial state, it is called with ``None``.

"""

Condition: typing.TypeAlias = typing.Callable[['Statechart', Event | None],
                                              bool]
"""Condition function

Condition implementation used as transition guard. It is called with statechart
instance and `Event` which triggered transition. Return value ``True`` is
interpreted as satisfied condition.

"""


class Statechart:
    """Statechart engine

    Each instance is initialized with state definitions (first state is
    considered initial) and action and condition definitions.

    During initialization, statechart will transition to initial state.

    Statechart execution is simulated by repetitive calling of
    `Statechart.step` method. Alternatively, `Statechart.run` coroutine can
    be used for waiting and processing new event occurrences. Coroutine `run`
    continues execution until statechart transitions to final state. Once final
    state is reached, `Statechart.run` finishes execution.

    New events are registered with `Statechart.register` method which accepts
    event instances containing event name and optional event payload. All event
    registrations are queued and processed sequentially.

    During statechart execution, actions and conditions are called based on
    state changes and associated transitions provided during initialization.

    Condition is considered met only if result of calling condition function is
    ``True``.

    Args:
        states: all state definitions (first state is initial)
        actions: mapping of action names to their implementation
        conditions: mapping of condition names to their implementation
            (an empty mapping is used when omitted)

    """

    def __init__(self,
                 states: typing.Iterable[State],
                 actions: dict[str, Action],
                 conditions: dict[str, Condition] | None = None):
        states = collections.deque(states)
        initial = states[0].name if states else None

        self._actions = actions
        # a fresh dict per instance avoids sharing one mutable default
        self._conditions = {} if conditions is None else conditions
        self._states = {}
        self._parents = {}
        self._stack = collections.deque()
        self._queue = aio.Queue()

        # flatten the state tree into name -> state and child -> parent maps
        while states:
            state = states.pop()
            states.extend(state.children)
            self._states[state.name] = state
            self._parents.update({i.name: state.name for i in state.children})

        if initial:
            # initial entry actions are called with ``None`` as event
            self._walk_down(initial, None)

    @property
    def state(self) -> StateName | None:
        """Current state"""
        return self._stack[-1] if self._stack else None

    @property
    def finished(self) -> bool:
        """Is statechart in final state"""
        state = self.state
        return not state or self._states[state].final

    def register(self, event: Event):
        """Add event to queue"""
        self._queue.put_nowait(event)

    def step(self) -> bool:
        """Process next queued event

        Returns ``False`` if event queue is empty, either before the call or
        after the processed event was removed from it. Returns ``True`` while
        more events remain queued.

        """
        if self._queue.empty():
            return False
        event = self._queue.get_nowait()
        # events arriving after the final state is reached are discarded
        if not self.finished:
            self._step(event)
        return not self._queue.empty()

    async def run(self):
        """Run statechart step loop

        This coroutine finishes once statechart enters final state.

        """
        while not self.finished:
            event = await self._queue.get()
            self._step(event)

    def _step(self, event):
        """Execute the first enabled transition for `event`, if any"""
        state, transition = self._find_state_transition(self.state, event)
        if not transition:
            return
        # transitions without target do not exit or enter any state
        if transition.target:
            ancestor = self._find_ancestor(state, transition.target,
                                           transition.internal)
            self._walk_up(ancestor, event)
        self._exec_actions(transition.actions, event)
        if transition.target:
            self._walk_down(transition.target, event)

    def _walk_up(self, target, event):
        """Run exit actions and pop states until `target` is current state

        With `target` set to ``None``, the whole stack is unwound.

        """
        while self.state != target:
            state = self._states[self.state]
            self._exec_actions(state.exits, event)
            self._stack.pop()

    def _walk_down(self, target, event):
        """Enter `target`, its not yet active ancestors and its initial
        descendants, running entry actions in that order"""
        states = collections.deque([self._states[target]])
        # extend upwards until the current state or a root state is reached
        while ((state := states[0]).name != self.state and
                (parent := self._parents.get(state.name))):
            states.appendleft(self._states[parent])
        # extend downwards through first (initial) children
        while (state := states[-1]).children:
            states.append(state.children[0])
        # current state is already on the stack and must not be re-entered
        if states[0].name == self.state:
            states.popleft()
        for state in states:
            self._stack.append(state.name)
            self._exec_actions(state.entries, event)

    def _find_state_transition(self, state, event):
        """Find first transition matching `event`, searching from `state`
        towards the root

        Returns ``(state name, transition)`` or ``(None, None)``.

        """
        while state:
            for transition in self._states[state].transitions:
                if transition.event != event.name:
                    continue
                if not all(self._conditions[condition](self, event)
                           for condition in transition.conditions):
                    continue
                return state, transition
            state = self._parents.get(state)
        return None, None

    def _find_ancestor(self, state, sibling, internal):
        """Find the state which stays active during transition from `state`
        to `sibling`

        Result is the deepest state shared by the active stack and the path
        from root to `sibling`, stopping before `state` or `sibling` itself.
        The exception is an internal transition, where `state` is kept
        active. ``None`` means that all active states are exited.

        """
        if not sibling or not state:
            return
        # path from root state down to the transition target
        path = collections.deque([sibling])
        while (parent := self._parents.get(path[0])):
            path.appendleft(parent)
        ancestor = None
        for i, j in zip(self._stack, path):
            if i != j:
                break
            if i in [sibling, state]:
                if internal and i == state:
                    ancestor = i
                break
            ancestor = i
        return ancestor

    def _exec_actions(self, names, event):
        """Call actions identified by `names` in the given order"""
        for name in names:
            action = self._actions[name]
            action(self, event)


def parse_scxml(scxml: typing.TextIO | pathlib.Path) -> list[State]:
    """Parse SCXML into list of state definitions"""
    if isinstance(scxml, pathlib.Path):
        with open(scxml, encoding='utf-8') as f:
            root_el = _read_xml(f)
    else:
        root_el = _read_xml(scxml)
    return _parse_scxml_states(root_el)


def create_dot_graph(states: typing.Iterable[State]) -> str:
    """Create DOT representation of statechart"""
    # states are traversed twice (nodes, then edges) and tested for
    # emptiness, so one-shot iterables have to be materialized first
    states = list(states)
    state_name_ids = {}
    id_prefix = 'state'
    states_dot = '\n'.join(
        _create_dot_graph_states(states, state_name_ids, id_prefix))
    transitions_dot = '\n'.join(
        _create_dot_graph_transitions(states, state_name_ids, id_prefix))
    return _dot_graph.format(states=states_dot,
                             transitions=transitions_dot)


def _parse_scxml_states(parent_el):
    """Parse child ``state``/``final`` elements with initial state first"""
    states = {}
    for state_el in itertools.chain(parent_el.findall("./state"),
                                    parent_el.findall("./final")):
        state = _parse_scxml_state(state_el)
        states[state.name] = state
    if not states:
        return []
    initial = parent_el.get('initial')
    if initial is None:
        # SCXML defaults to the first child state in document order when
        # the ``initial`` attribute is omitted
        initial = next(el.get('id') for el in parent_el
                       if el.tag in ('state', 'final'))
    return [states[initial], *(state for name, state in states.items()
                               if name != initial)]


def _parse_scxml_state(state_el):
    """Create `State` from a ``state`` or ``final`` element"""
    return State(name=state_el.get('id'),
                 children=_parse_scxml_states(state_el),
                 transitions=[_parse_scxml_transition(i)
                              for i in state_el.findall('./transition')],
                 entries=[entry_el.text
                          for entry_el in state_el.findall('./onentry')
                          if entry_el.text],
                 exits=[exit_el.text
                        for exit_el in state_el.findall('./onexit')
                        if exit_el.text],
                 final=state_el.tag == 'final')


def _parse_scxml_transition(transition_el):
    """Create `Transition` from a ``transition`` element

    Action names are whitespace separated words of the element text and
    condition names are whitespace separated words of ``cond`` attribute.

    """
    return Transition(event=transition_el.get('event'),
                      target=transition_el.get('target'),
                      actions=[i
                               for i in (transition_el.text or '').split()
                               if i],
                      conditions=[i for i in (transition_el.get('cond') or
                                              '').split()
                                  if i],
                      internal=transition_el.get('type') == 'internal')


def _read_xml(source):
    """Parse XML and strip namespace prefix from every element tag"""
    it = xml.etree.ElementTree.iterparse(source)
    for _, el in it:
        prefix, has_namespace, postfix = el.tag.partition('}')
        if has_namespace:
            el.tag = postfix
    return it.root


def _create_dot_graph_states(states, state_name_ids, id_prefix):
    """Yield DOT for initial marker and state clusters of a single level

    Generated state ids are stored in `state_name_ids`.

    """
    if not states:
        return
    yield _dot_graph_initial.format(id=f'{id_prefix}_initial')
    for i, state in enumerate(states):
        state_id = f'{id_prefix}_{i}'
        state_name_ids[state.name] = state_id
        actions = '\n'.join(_create_dot_graph_state_actions(state))
        separator = _dot_graph_separator if actions else ''
        children = '\n'.join(
            _create_dot_graph_states(state.children, state_name_ids, state_id))
        yield _dot_graph_state.format(id=state_id,
                                      name=state.name,
                                      separator=separator,
                                      actions=actions,
                                      children=children)


def _create_dot_graph_state_actions(state):
    """Yield label rows for entry actions followed by exit actions"""
    for name in state.entries:
        yield _dot_graph_state_action.format(type='entry', name=name)
    for name in state.exits:
        yield _dot_graph_state_action.format(type='exit', name=name)


def _create_dot_graph_transitions(states, state_name_ids, id_prefix):
    """Yield DOT edges for initial marker and transitions of all levels"""
    if not states:
        return
    yield _dot_graph_transition.format(src_id=f'{id_prefix}_initial',
                                       dst_id=f'{id_prefix}_0',
                                       label='""',
                                       lhead=f'cluster_{id_prefix}_0',
                                       ltail='')
    for state in states:
        src_id = state_name_ids[state.name]
        for transition in state.transitions:
            dst_id = (state_name_ids[transition.target] if transition.target
                      else src_id)
            label = _create_dot_graph_transition_label(transition)
            lhead = f'cluster_{dst_id}'
            ltail = f'cluster_{src_id}'
            # descendant ids extend ancestor id with ``_<index>``; including
            # the separator keeps ``state_10`` from matching ``state_1``
            if lhead == ltail:
                lhead, ltail = '', ''
            elif ltail.startswith(f'{lhead}_'):
                lhead = ''
            elif lhead.startswith(f'{ltail}_'):
                ltail = ''
            yield _dot_graph_transition.format(src_id=src_id,
                                               dst_id=dst_id,
                                               label=label,
                                               lhead=lhead,
                                               ltail=ltail)
        yield from _create_dot_graph_transitions(state.children,
                                                 state_name_ids, src_id)


def _create_dot_graph_transition_label(transition):
    """Create HTML-like DOT label describing a transition"""
    separator = (_dot_graph_separator
                 if transition.actions or transition.conditions
                 else '')
    actions = '\n'.join(_dot_graph_transition_action.format(name=name)
                        for name in transition.actions)
    condition = (f" [{' '.join(transition.conditions)}]"
                 if transition.conditions else "")
    internal = ' (internal)' if transition.internal else ''
    local = ' (local)' if transition.target is None else ''
    return _dot_graph_transition_label.format(event=transition.event,
                                              condition=condition,
                                              internal=internal,
                                              local=local,
                                              separator=separator,
                                              actions=actions)


_dot_graph = r"""digraph "stc" {{
    fontname = Helvetica
    fontsize = 12
    penwidth = 2.0
    splines = true
    ordering = out
    compound = true
    overlap = scale
    nodesep = 0.3
    ranksep = 0.1
    node [
        shape = plaintext
        style = filled
        fillcolor = transparent
        fontname = Helvetica
        fontsize = 12
        penwidth = 2.0
    ]
    edge [
        fontname = Helvetica
        fontsize = 12
    ]
    {states}
    {transitions}
}}
"""

_dot_graph_initial = r"""{id} [
    shape = circle
    style = filled
    fillcolor = black
    fixedsize = true
    height = 0.15
    label = ""
]"""

_dot_graph_state = r"""subgraph "cluster_{id}" {{
    label = <
        <table cellborder="0" border="0">
            <tr><td>{name}</td></tr>
            {separator}
            {actions}
        </table>
    >
    style = rounded
    penwidth = 2.0
    {children}
    {id} [
        shape=point
        style=invis
        margin=0
        width=0
        height=0
        fixedsize=true
    ]
}}"""

_dot_graph_separator = "<hr/>"

_dot_graph_state_action = r"""<tr><td align="left">{type}/ {name}</td></tr>"""

_dot_graph_transition = r"""{src_id} -> {dst_id} [
    label = {label}
    lhead = "{lhead}"
    ltail = "{ltail}"
]"""

_dot_graph_transition_label = r"""<
<table cellborder="0" border="0">
    <tr><td>{event}{condition}{internal}{local}</td></tr>
    {separator}
    {actions}
</table>
>"""

_dot_graph_transition_action = r"""<tr><td>{name}</td></tr>"""
Event name
State name
Action name
Condition name
class Event(typing.NamedTuple):
    """Single event occurrence which can be registered with a statechart"""
    name: EventName
    """Name identifying the event"""
    payload: typing.Any = None
    """Arbitrary data attached to the event (``None`` when not provided)"""
Event instance
Inherited Members
- builtins.tuple
- index
- count
class Transition(typing.NamedTuple):
    """Definition of a single state transition"""
    event: EventName
    """Name of the triggering event. Only an event with exactly this name
    can trigger the transition."""
    target: StateName | None
    """Name of the destination state. When it is not defined, the transition
    is local - current state stays unchanged and only transition actions
    are executed."""
    actions: list[ActionName] = []
    """Names of actions executed when the transition is taken."""
    conditions: list[ConditionName] = []
    """Names of guard conditions. All of them have to be met for the
    transition to be taken."""
    internal: bool = False
    """Internal transition flag. Controls whether the source state is exited
    when the target state is one of source state's descendants."""
Transition definition
Create new instance of Transition(event, target, actions, conditions, internal)
Event identifier. Occurrence of event with this exact identifier can trigger state transition.
Destination state identifier. If destination state is not defined, local transition is assumed - state is not changed and transition actions are triggered.
List of conditions. Transition is triggered only if all provided conditions are met.
Internal transition modifier. Determines whether the source state is exited in transitions whose target state is a descendant of the source state.
Inherited Members
- builtins.tuple
- index
- count
class State(typing.NamedTuple):
    """Definition of a single state"""
    name: StateName
    """Identifier of the state, unique within the statechart."""
    children: typing.List['State'] = []
    """Nested states. When present, the first one is treated as the
    initial child state."""
    transitions: list[Transition] = []
    """Transitions originating from this state."""
    entries: list[ActionName] = []
    """Names of actions executed on entering the state."""
    exits: list[ActionName] = []
    """Names of actions executed on exiting the state."""
    final: bool = False
    """``True`` if this is a final state."""
State definition
Create new instance of State(name, children, transitions, entries, exits, final)
Optional child states. If state has children, first child is considered as its initial state.
Inherited Members
- builtins.tuple
- index
- count
Action function
Action implementation which can be executed as part of entering/exiting a state or of transition execution. It is called with the statechart instance and the `Event` which triggered the transition. For initial actions, run during the transition to the initial state, it is called with `None` instead of an event.
Condition function
Condition implementation used as a transition guard. It is called with the statechart instance and the `Event` which triggered the transition. A return value of `True` is interpreted as a satisfied condition.
class Statechart:
    """Statechart engine

    Each instance is initialized with state definitions (first state is
    considered initial) and action and condition definitions.

    During initialization, statechart will transition to initial state.

    Statechart execution is simulated by repetitive calling of
    `Statechart.step` method. Alternatively, `Statechart.run` coroutine can
    be used for waiting and processing new event occurrences. Coroutine `run`
    continues execution until statechart transitions to final state. Once final
    state is reached, `Statechart.run` finishes execution.

    New events are registered with `Statechart.register` method which accepts
    event instances containing event name and optional event payload. All event
    registrations are queued and processed sequentially.

    During statechart execution, actions and conditions are called based on
    state changes and associated transitions provided during initialization.

    Condition is considered met only if result of calling condition function is
    ``True``.

    Args:
        states: all state definitions (first state is initial)
        actions: mapping of action names to their implementation
        conditions: mapping of condition names to their implementation
            (an empty mapping is used when omitted)

    """

    def __init__(self,
                 states: typing.Iterable[State],
                 actions: dict[str, Action],
                 conditions: dict[str, Condition] | None = None):
        states = collections.deque(states)
        initial = states[0].name if states else None

        self._actions = actions
        # a fresh dict per instance avoids sharing one mutable default
        self._conditions = {} if conditions is None else conditions
        self._states = {}
        self._parents = {}
        self._stack = collections.deque()
        self._queue = aio.Queue()

        # flatten the state tree into name -> state and child -> parent maps
        while states:
            state = states.pop()
            states.extend(state.children)
            self._states[state.name] = state
            self._parents.update({i.name: state.name for i in state.children})

        if initial:
            # initial entry actions are called with ``None`` as event
            self._walk_down(initial, None)

    @property
    def state(self) -> StateName | None:
        """Current state"""
        return self._stack[-1] if self._stack else None

    @property
    def finished(self) -> bool:
        """Is statechart in final state"""
        state = self.state
        return not state or self._states[state].final

    def register(self, event: Event):
        """Add event to queue"""
        self._queue.put_nowait(event)

    def step(self) -> bool:
        """Process next queued event

        Returns ``False`` if event queue is empty, either before the call or
        after the processed event was removed from it. Returns ``True`` while
        more events remain queued.

        """
        if self._queue.empty():
            return False
        event = self._queue.get_nowait()
        # events arriving after the final state is reached are discarded
        if not self.finished:
            self._step(event)
        return not self._queue.empty()

    async def run(self):
        """Run statechart step loop

        This coroutine finishes once statechart enters final state.

        """
        while not self.finished:
            event = await self._queue.get()
            self._step(event)

    def _step(self, event):
        """Execute the first enabled transition for `event`, if any"""
        state, transition = self._find_state_transition(self.state, event)
        if not transition:
            return
        # transitions without target do not exit or enter any state
        if transition.target:
            ancestor = self._find_ancestor(state, transition.target,
                                           transition.internal)
            self._walk_up(ancestor, event)
        self._exec_actions(transition.actions, event)
        if transition.target:
            self._walk_down(transition.target, event)

    def _walk_up(self, target, event):
        """Run exit actions and pop states until `target` is current state

        With `target` set to ``None``, the whole stack is unwound.

        """
        while self.state != target:
            state = self._states[self.state]
            self._exec_actions(state.exits, event)
            self._stack.pop()

    def _walk_down(self, target, event):
        """Enter `target`, its not yet active ancestors and its initial
        descendants, running entry actions in that order"""
        states = collections.deque([self._states[target]])
        # extend upwards until the current state or a root state is reached
        while ((state := states[0]).name != self.state and
                (parent := self._parents.get(state.name))):
            states.appendleft(self._states[parent])
        # extend downwards through first (initial) children
        while (state := states[-1]).children:
            states.append(state.children[0])
        # current state is already on the stack and must not be re-entered
        if states[0].name == self.state:
            states.popleft()
        for state in states:
            self._stack.append(state.name)
            self._exec_actions(state.entries, event)

    def _find_state_transition(self, state, event):
        """Find first transition matching `event`, searching from `state`
        towards the root

        Returns ``(state name, transition)`` or ``(None, None)``.

        """
        while state:
            for transition in self._states[state].transitions:
                if transition.event != event.name:
                    continue
                if not all(self._conditions[condition](self, event)
                           for condition in transition.conditions):
                    continue
                return state, transition
            state = self._parents.get(state)
        return None, None

    def _find_ancestor(self, state, sibling, internal):
        """Find the state which stays active during transition from `state`
        to `sibling`

        Result is the deepest state shared by the active stack and the path
        from root to `sibling`, stopping before `state` or `sibling` itself.
        The exception is an internal transition, where `state` is kept
        active. ``None`` means that all active states are exited.

        """
        if not sibling or not state:
            return
        # path from root state down to the transition target
        path = collections.deque([sibling])
        while (parent := self._parents.get(path[0])):
            path.appendleft(parent)
        ancestor = None
        for i, j in zip(self._stack, path):
            if i != j:
                break
            if i in [sibling, state]:
                if internal and i == state:
                    ancestor = i
                break
            ancestor = i
        return ancestor

    def _exec_actions(self, names, event):
        """Call actions identified by `names` in the given order"""
        for name in names:
            action = self._actions[name]
            action(self, event)
Statechart engine
Each instance is initialized with state definitions (first state is considered initial) and action and condition definitions.
During initialization, statechart will transition to initial state.
Statechart execution is simulated by repeatedly calling the `Statechart.step` method. Alternatively, the `Statechart.run` coroutine can be used for waiting for and processing new event occurrences. Coroutine `run` continues execution until the statechart transitions to a final state. Once a final state is reached, `Statechart.run` finishes execution.
New events are registered with the `Statechart.register` method, which accepts event instances containing an event name and an optional event payload. All event registrations are queued and processed sequentially.
During statechart execution, actions and conditions are called based on state changes and associated transitions provided during initialization.
A condition is considered met only if the result of calling the condition function is `True`.
Arguments:
- states: all state definitions (the first state is initial)
- actions: mapping of action names to their implementation
- conditions: mapping of conditions names to their implementation
def __init__(self,
             states: typing.Iterable[State],
             actions: dict[str, Action],
             conditions: dict[str, Condition] | None = None):
    """Initialize statechart and transition to the initial state

    Args:
        states: all state definitions (first state is initial)
        actions: mapping of action names to their implementation
        conditions: mapping of condition names to their implementation
            (an empty mapping is used when omitted)

    """
    states = collections.deque(states)
    initial = states[0].name if states else None

    self._actions = actions
    # a fresh dict per instance avoids sharing one mutable default
    self._conditions = {} if conditions is None else conditions
    self._states = {}
    self._parents = {}
    self._stack = collections.deque()
    self._queue = aio.Queue()

    # flatten the state tree into name -> state and child -> parent maps
    while states:
        state = states.pop()
        states.extend(state.children)
        self._states[state.name] = state
        self._parents.update({i.name: state.name for i in state.children})

    if initial:
        # initial entry actions are called with ``None`` as event
        self._walk_down(initial, None)
def register(self, event: Event):
    """Append `event` to the queue of events waiting to be processed"""
    queue = self._queue
    queue.put_nowait(event)
Add event to queue
def step(self) -> bool:
    """Take a single event from the queue and process it

    Result is ``False`` when the queue holds no events, either on entry or
    once the taken event is removed. Otherwise result is ``True``.

    """
    queue = self._queue
    if queue.empty():
        return False
    next_event = queue.get_nowait()
    # event taken while statechart is finished is dropped without processing
    if not self.finished:
        self._step(next_event)
    has_more = not queue.empty()
    return has_more
Process next queued event
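Returns `False` if the event queue is empty, either before the call or once the processed event has been removed from it. Returns `True` while further events remain queued.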
async def run(self):
    """Wait for queued events and process them one by one

    Execution of this coroutine ends as soon as statechart is finished.

    """
    while True:
        if self.finished:
            return
        next_event = await self._queue.get()
        self._step(next_event)
Run statechart step loop
This coroutine finishes once statechart enters final state.
def parse_scxml(scxml: typing.TextIO | pathlib.Path) -> list[State]:
    """Convert SCXML document into list of state definitions

    `scxml` is either an already opened text stream or a path of a file
    which is read as UTF-8.

    """
    # streams are parsed directly; only paths have to be opened first
    if not isinstance(scxml, pathlib.Path):
        return _parse_scxml_states(_read_xml(scxml))

    with open(scxml, encoding='utf-8') as stream:
        document_root = _read_xml(stream)
    return _parse_scxml_states(document_root)
Parse SCXML into list of state definitions
def create_dot_graph(states: typing.Iterable[State]) -> str:
    """Create DOT representation of statechart

    Args:
        states: top level state definitions (any iterable, including
            one-shot iterators)

    Returns:
        DOT source of a ``digraph`` containing all states and transitions

    """
    # states are traversed twice (nodes, then edges) and tested for
    # emptiness, so one-shot iterables have to be materialized first
    states = list(states)
    # filled while states are rendered, read while transitions are rendered
    state_name_ids = {}
    id_prefix = 'state'
    states_dot = '\n'.join(
        _create_dot_graph_states(states, state_name_ids, id_prefix))
    transitions_dot = '\n'.join(
        _create_dot_graph_transitions(states, state_name_ids, id_prefix))
    return _dot_graph.format(states=states_dot,
                             transitions=transitions_dot)
Create DOT representation of statechart