hat.stc
Statechart module
"""Statechart module"""

import collections
import itertools
import pathlib
import typing
import xml.etree.ElementTree

from hat import aio
from hat import util


EventName = str
"""Event name"""
util.register_type_alias('EventName')

StateName = str
"""State name"""
util.register_type_alias('StateName')

ActionName = str
"""Action name"""
util.register_type_alias('ActionName')

ConditionName = str
"""Condition name"""
util.register_type_alias('ConditionName')


class Event(typing.NamedTuple):
    """Event instance"""
    name: EventName
    """Event name"""
    payload: typing.Any = None
    """Optional payload"""


class Transition(typing.NamedTuple):
    """Transition definition"""
    event: EventName
    """Event identifier. Occurrence of event with this exact identifier can
    trigger state transition."""
    target: typing.Optional[StateName]
    """Destination state identifier. If destination state is not defined,
    local transition is assumed - state is not changed and transition
    actions are triggered."""
    actions: typing.List[ActionName] = []
    """Actions executed on transition."""
    conditions: typing.List[ConditionName] = []
    """List of conditions. Transition is triggered only if all provided
    conditions are met."""
    internal: bool = False
    """Internal transition modifier. Determines whether the source state is
    exited in transitions whose target state is a descendant of the source
    state."""


class State(typing.NamedTuple):
    """State definition"""
    name: StateName
    """Unique state identifier."""
    children: typing.List['State'] = []
    """Optional child states. If state has children, first child is
    considered as its initial state."""
    transitions: typing.List[Transition] = []
    """Possible transitions to other states."""
    entries: typing.List[ActionName] = []
    """Actions executed when state is entered."""
    exits: typing.List[ActionName] = []
    """Actions executed when state is exited."""
    final: bool = False
    """Is state final."""


Action = typing.Callable[['Statechart', typing.Optional[Event]], None]
"""Action function

Action implementation which can be executed as part of entering/exiting
state or transition execution. It is called with statechart instance and
`Event` which triggered transition. In case of initial actions, run during
transition to initial state, it is called with ``None``.

"""
util.register_type_alias('Action')

Condition = typing.Callable[['Statechart', typing.Optional[Event]], bool]
"""Condition function

Condition implementation used as transition guard. It is called with statechart
instance and `Event` which triggered transition. Return value ``True`` is
interpreted as satisfied condition.

"""
util.register_type_alias('Condition')


class Statechart:
    """Statechart engine

    Each instance is initialized with state definitions (first state is
    considered initial) and action and condition definitions.

    During initialization, statechart will transition to initial state.

    Statechart execution is simulated by repetitive calling of
    `Statechart.step` method. Alternatively, `Statechart.run` coroutine can
    be used for waiting and processing new event occurrences. Coroutine `run`
    continues execution until statechart transitions to final state. Once final
    state is reached, `Statechart.run` finishes execution.

    New events are registered with `Statechart.register` method which accepts
    event instances containing event name and optional event payload. All event
    registrations are queued and processed sequentially.

    During statechart execution, actions and conditions are called based on
    state changes and associated transitions provided during initialization.

    Condition is considered met only if result of calling condition function is
    ``True``.

    Args:
        states: all state definitions (first state is initial)
        actions: mapping of action names to their implementation
        conditions: mapping of condition names to their implementation

    """

    def __init__(self,
                 states: typing.Iterable[State],
                 actions: typing.Dict[str, Action],
                 conditions: typing.Dict[str, Condition] = {}):
        states = collections.deque(states)
        initial = states[0].name if states else None

        self._actions = actions
        self._conditions = conditions
        self._states = {}
        self._parents = {}
        # names of currently active states, outermost first
        self._stack = collections.deque()
        self._queue = aio.Queue()

        # flatten the state tree into name -> state and child -> parent maps
        while states:
            state = states.pop()
            states.extend(state.children)
            self._states[state.name] = state
            self._parents.update({i.name: state.name for i in state.children})

        # initial entry actions are executed with ``None`` as event
        if initial:
            self._walk_down(initial, None)

    @property
    def state(self) -> typing.Optional[StateName]:
        """Current state"""
        return self._stack[-1] if self._stack else None

    @property
    def finished(self) -> bool:
        """Is statechart in final state"""
        state = self.state
        return not state or self._states[state].final

    def register(self, event: Event):
        """Add event to queue"""
        self._queue.put_nowait(event)

    def step(self) -> bool:
        """Process next queued event

        Returns ``False`` if event queue is empty, either before the call or
        once the dequeued event is processed. ``True`` is returned only while
        additional events remain queued.

        """
        if self._queue.empty():
            return False
        event = self._queue.get_nowait()
        # events dequeued after the final state is reached are dropped
        if not self.finished:
            self._step(event)
        return not self._queue.empty()

    async def run(self):
        """Run statechart step loop

        This coroutine finishes once statechart enters final state.

        """
        while not self.finished:
            event = await self._queue.get()
            self._step(event)

    def _step(self, event):
        # process single event: exit states, run transition actions and
        # enter target states
        state, transition = self._find_state_transition(self.state, event)
        if not transition:
            return
        if transition.target:
            ancestor = self._find_ancestor(state, transition.target,
                                           transition.internal)
            self._walk_up(ancestor, event)
        self._exec_actions(transition.actions, event)
        if transition.target:
            self._walk_down(transition.target, event)

    def _walk_up(self, target, event):
        # exit active states (innermost first) until `target` is current
        while self.state != target:
            state = self._states[self.state]
            self._exec_actions(state.exits, event)
            self._stack.pop()

    def _walk_down(self, target, event):
        # enter `target`, its not yet active ancestors and its chain of
        # initial (first) children
        states = collections.deque([self._states[target]])
        while ((state := states[0]).name != self.state and
                (parent := self._parents.get(state.name))):
            states.appendleft(self._states[parent])
        while (state := states[-1]).children:
            states.append(state.children[0])
        # current state is already active - do not enter it again
        if states[0].name == self.state:
            states.popleft()
        for state in states:
            self._stack.append(state.name)
            self._exec_actions(state.entries, event)

    def _find_state_transition(self, state, event):
        # search `state` and then its ancestors for first transition which
        # matches event name and has all conditions satisfied
        while state:
            for transition in self._states[state].transitions:
                if transition.event != event.name:
                    continue
                if not all(self._conditions[condition](self, event)
                           for condition in transition.conditions):
                    continue
                return state, transition
            state = self._parents.get(state)
        return None, None

    def _find_ancestor(self, state, sibling, internal):
        # find active state which should remain active while transitioning
        # from `state` to `sibling` (``None`` if all states are exited)
        if not sibling or not state:
            return
        path = collections.deque([sibling])
        while (parent := self._parents.get(path[0])):
            path.appendleft(parent)
        ancestor = None
        for i, j in zip(self._stack, path):
            if i != j:
                break
            if i in [sibling, state]:
                # source state is kept only in case of internal transition
                if internal and i == state:
                    ancestor = i
                break
            ancestor = i
        return ancestor

    def _exec_actions(self, names, event):
        # call action implementations in provided order
        for name in names:
            action = self._actions[name]
            action(self, event)


def parse_scxml(scxml: typing.Union[typing.TextIO, pathlib.Path]
                ) -> typing.List[State]:
    """Parse SCXML into list of state definitions"""
    if isinstance(scxml, pathlib.Path):
        with open(scxml, encoding='utf-8') as f:
            root_el = _read_xml(f)
    else:
        root_el = _read_xml(scxml)
    return _parse_scxml_states(root_el)


def create_dot_graph(states: typing.Iterable[State]) -> str:
    """Create DOT representation of statechart"""
    # states are traversed twice (nodes, then edges) - single-pass iterables
    # such as generators would be exhausted before transitions are rendered
    states = list(states)
    state_name_ids = {}
    id_prefix = 'state'
    states_dot = '\n'.join(
        _create_dot_graph_states(states, state_name_ids, id_prefix))
    transitions_dot = '\n'.join(
        _create_dot_graph_transitions(states, state_name_ids, id_prefix))
    return _dot_graph.format(states=states_dot,
                             transitions=transitions_dot)


def _parse_scxml_states(parent_el):
    # parse direct child states of `parent_el`; initial state is returned as
    # first element of resulting list
    states = {}
    for state_el in itertools.chain(parent_el.findall("./state"),
                                    parent_el.findall("./final")):
        state = _parse_scxml_state(state_el)
        states[state.name] = state
    if not states:
        return []
    initial = parent_el.get('initial')
    if initial is None:
        # `initial` attribute is optional - default to first parsed child
        # instead of failing with ``KeyError(None)``
        initial = next(iter(states))
    return [states[initial], *(state for name, state in states.items()
                               if name != initial)]


def _parse_scxml_state(state_el):
    # create state definition (including descendants) from state element
    return State(name=state_el.get('id'),
                 children=_parse_scxml_states(state_el),
                 transitions=[_parse_scxml_transition(i)
                              for i in state_el.findall('./transition')],
                 entries=[entry_el.text
                          for entry_el in state_el.findall('./onentry')
                          if entry_el.text],
                 exits=[exit_el.text
                        for exit_el in state_el.findall('./onexit')
                        if exit_el.text],
                 final=state_el.tag == 'final')


def _parse_scxml_transition(transition_el):
    # action names are whitespace separated element text; condition names
    # are whitespace separated `cond` attribute value
    return Transition(event=transition_el.get('event'),
                      target=transition_el.get('target'),
                      actions=[i
                               for i in (transition_el.text or '').split()
                               if i],
                      conditions=[i for i in (transition_el.get('cond') or
                                              '').split()
                                  if i],
                      internal=transition_el.get('type') == 'internal')


def _read_xml(source):
    # parse XML and strip namespace prefix from all element tags
    it = xml.etree.ElementTree.iterparse(source)
    for _, el in it:
        prefix, has_namespace, postfix = el.tag.partition('}')
        if has_namespace:
            el.tag = postfix
    return it.root


def _create_dot_graph_states(states, state_name_ids, id_prefix):
    # yield DOT definitions of initial pseudo-node and of each state cluster;
    # `state_name_ids` is populated with generated node identifiers
    if not states:
        return
    yield _dot_graph_initial.format(id=f'{id_prefix}_initial')
    for i, state in enumerate(states):
        state_id = f'{id_prefix}_{i}'
        state_name_ids[state.name] = state_id
        actions = '\n'.join(_create_dot_graph_state_actions(state))
        separator = _dot_graph_separator if actions else ''
        children = '\n'.join(
            _create_dot_graph_states(state.children, state_name_ids, state_id))
        yield _dot_graph_state.format(id=state_id,
                                      name=state.name,
                                      separator=separator,
                                      actions=actions,
                                      children=children)


def _create_dot_graph_state_actions(state):
    # yield table rows describing entry and exit actions
    for name in state.entries:
        yield _dot_graph_state_action.format(type='entry', name=name)
    for name in state.exits:
        yield _dot_graph_state_action.format(type='exit', name=name)


def _create_dot_graph_transitions(states, state_name_ids, id_prefix):
    # yield DOT edges for initial pseudo-transition and all transitions of
    # `states` and their descendants
    if not states:
        return
    yield _dot_graph_transition.format(src_id=f'{id_prefix}_initial',
                                       dst_id=f'{id_prefix}_0',
                                       label='""',
                                       lhead=f'cluster_{id_prefix}_0',
                                       ltail='')
    for state in states:
        src_id = state_name_ids[state.name]
        for transition in state.transitions:
            dst_id = (state_name_ids[transition.target] if transition.target
                      else src_id)
            label = _create_dot_graph_transition_label(transition)
            lhead = f'cluster_{dst_id}'
            ltail = f'cluster_{src_id}'
            # edges are not clipped at cluster which contains other endpoint;
            # trailing '_' prevents siblings such as `state_1` and `state_10`
            # from being treated as ancestor and descendant
            if lhead == ltail:
                lhead, ltail = '', ''
            elif ltail.startswith(f'{lhead}_'):
                lhead = ''
            elif lhead.startswith(f'{ltail}_'):
                ltail = ''
            yield _dot_graph_transition.format(src_id=src_id,
                                               dst_id=dst_id,
                                               label=label,
                                               lhead=lhead,
                                               ltail=ltail)
        yield from _create_dot_graph_transitions(state.children,
                                                 state_name_ids, src_id)


def _create_dot_graph_transition_label(transition):
    # create HTML-like label containing event, modifiers and actions
    separator = (_dot_graph_separator
                 if transition.actions or transition.conditions
                 else '')
    actions = '\n'.join(_dot_graph_transition_action.format(name=name)
                        for name in transition.actions)
    condition = (f" [{' '.join(transition.conditions)}]"
                 if transition.conditions else "")
    internal = ' (internal)' if transition.internal else ''
    local = ' (local)' if transition.target is None else ''
    return _dot_graph_transition_label.format(event=transition.event,
                                              condition=condition,
                                              internal=internal,
                                              local=local,
                                              separator=separator,
                                              actions=actions)


_dot_graph = r"""digraph "stc" {{
    fontname = Helvetica
    fontsize = 12
    penwidth = 2.0
    splines = true
    ordering = out
    compound = true
    overlap = scale
    nodesep = 0.3
    ranksep = 0.1
    node [
        shape = plaintext
        style = filled
        fillcolor = transparent
        fontname = Helvetica
        fontsize = 12
        penwidth = 2.0
    ]
    edge [
        fontname = Helvetica
        fontsize = 12
    ]
    {states}
    {transitions}
}}
"""

_dot_graph_initial = r"""{id} [
    shape = circle
    style = filled
    fillcolor = black
    fixedsize = true
    height = 0.15
    label = ""
]"""

_dot_graph_state = r"""subgraph "cluster_{id}" {{
    label = <
        <table cellborder="0" border="0">
            <tr><td>{name}</td></tr>
            {separator}
            {actions}
        </table>
    >
    style = rounded
    penwidth = 2.0
    {children}
    {id} [
        shape=point
        style=invis
        margin=0
        width=0
        height=0
        fixedsize=true
    ]
}}"""

_dot_graph_separator = "<hr/>"

_dot_graph_state_action = r"""<tr><td align="left">{type}/ {name}</td></tr>"""

_dot_graph_transition = r"""{src_id} -> {dst_id} [
    label = {label}
    lhead = "{lhead}"
    ltail = "{ltail}"
]"""

_dot_graph_transition_label = r"""<
<table cellborder="0" border="0">
    <tr><td>{event}{condition}{internal}{local}</td></tr>
    {separator}
    {actions}
</table>
>"""

_dot_graph_transition_action = r"""<tr><td>{name}</td></tr>"""
Event name
State name
Action name
Condition name
class Event(typing.NamedTuple):
    """Single event occurrence which can be registered with a statechart"""
    name: EventName
    """Name identifying the event"""
    payload: typing.Any = None
    """Arbitrary data accompanying the event (``None`` if not provided)"""
Event instance
Inherited Members
- builtins.tuple
- index
- count
class Transition(typing.NamedTuple):
    """Definition of a single transition leaving a state"""
    event: EventName
    """Name of the triggering event. Only an event whose name is exactly
    equal to this identifier can trigger the transition."""
    target: typing.Optional[StateName]
    """Name of the destination state. Without a destination the transition
    is local - the current state is kept and only the transition actions
    are executed."""
    actions: typing.List[ActionName] = []
    """Names of actions executed when the transition is taken."""
    conditions: typing.List[ConditionName] = []
    """Names of guard conditions. All of them have to be met for the
    transition to be triggered."""
    internal: bool = False
    """Internal transition flag. Controls whether the source state is exited
    when the target state is one of its descendants."""
Transition definition
Create new instance of Transition(event, target, actions, conditions, internal)
Event identifier. Occurrence of event with this exact identifier can trigger state transition.
Destination state identifier. If destination state is not defined, local transition is assumed - state is not changed and transition actions are triggered.
List of conditions. Transition is triggered only if all provided conditions are met.
Internal transition modifier. Determines whether the source state is exited in transitions whose target state is a descendant of the source state.
Inherited Members
- builtins.tuple
- index
- count
class State(typing.NamedTuple):
    """Definition of a single (possibly composite) state"""
    name: StateName
    """Identifier unique among all states."""
    children: typing.List['State'] = []
    """Nested states. When present, the first one acts as the initial
    state of this state."""
    transitions: typing.List[Transition] = []
    """Transitions which can be taken while this state is active."""
    entries: typing.List[ActionName] = []
    """Names of actions executed on entering the state."""
    exits: typing.List[ActionName] = []
    """Names of actions executed on leaving the state."""
    final: bool = False
    """``True`` if the state is final."""
State definition
Create new instance of State(name, children, transitions, entries, exits, final)
Optional child states. If state has children, first child is considered as its initial state.
Inherited Members
- builtins.tuple
- index
- count
Action function
Action implementation which can be executed as part of entering/exiting state or transition execution. It is called with statechart instance and `Event` which triggered transition. In case of initial actions, run during transition to initial state, it is called with `None`.
Condition function
Condition implementation used as transition guard. It is called with statechart instance and `Event` which triggered transition. Return value `True` is interpreted as satisfied condition.
class Statechart:
    """Statechart engine

    An instance is created from state definitions (the first one being the
    initial state) together with implementations of actions and conditions
    referenced by those definitions.

    The transition to the initial state is performed during initialization.

    Execution is driven either by calling `Statechart.step` repeatedly or by
    awaiting the `Statechart.run` coroutine, which waits for new events and
    processes them until a final state is reached.

    Events are submitted with `Statechart.register`. Submitted events are
    queued and processed one at a time in order of registration.

    While executing, the statechart calls actions and conditions according
    to state changes and the transitions provided during initialization.

    A condition is considered met only if its function returns ``True``.

    Args:
        states: all state definitions (first state is initial)
        actions: mapping of action names to their implementation
        conditions: mapping of condition names to their implementation

    """

    def __init__(self,
                 states: typing.Iterable[State],
                 actions: typing.Dict[str, Action],
                 conditions: typing.Dict[str, Condition] = {}):
        pending = collections.deque(states)
        initial = pending[0].name if pending else None

        self._actions = actions
        self._conditions = conditions
        self._states = {}
        self._parents = {}
        # names of active states, outermost first
        self._stack = collections.deque()
        self._queue = aio.Queue()

        # register every state of the tree and remember each child's parent
        while pending:
            current = pending.pop()
            pending.extend(current.children)
            self._states[current.name] = current
            for child in current.children:
                self._parents[child.name] = current.name

        # entry actions of the initial states receive ``None`` as event
        if initial:
            self._walk_down(initial, None)

    @property
    def state(self) -> typing.Optional[StateName]:
        """Name of the innermost active state (``None`` if there is none)"""
        if not self._stack:
            return None
        return self._stack[-1]

    @property
    def finished(self) -> bool:
        """``True`` if no state is active or the active state is final"""
        current = self.state
        if not current:
            return True
        return self._states[current].final

    def register(self, event: Event):
        """Append event to the queue of events awaiting processing"""
        queue = self._queue
        queue.put_nowait(event)

    def step(self) -> bool:
        """Process the next queued event

        Returns ``False`` if the event queue is empty, either on entry or
        after the dequeued event is handled; otherwise ``True``.

        """
        queue = self._queue
        if queue.empty():
            return False
        event = queue.get_nowait()
        # once finished, dequeued events are discarded
        if not self.finished:
            self._step(event)
        return not queue.empty()

    async def run(self):
        """Wait for and process events until a final state is entered"""
        while True:
            if self.finished:
                return
            self._step(await self._queue.get())

    def _step(self, event):
        # handle one event: exit states, run transition actions, enter states
        source, transition = self._find_state_transition(self.state, event)
        if not transition:
            return
        target = transition.target
        if target:
            keep = self._find_ancestor(source, target, transition.internal)
            self._walk_up(keep, event)
        self._exec_actions(transition.actions, event)
        if target:
            self._walk_down(target, event)

    def _walk_up(self, target, event):
        # leave active states, innermost first, until `target` is current
        while self.state != target:
            self._exec_actions(self._states[self.state].exits, event)
            self._stack.pop()

    def _walk_down(self, target, event):
        # collect `target` preceded by its ancestors up to the current state
        chain = collections.deque([self._states[target]])
        while chain[0].name != self.state:
            parent = self._parents.get(chain[0].name)
            if not parent:
                break
            chain.appendleft(self._states[parent])
        # follow initial (first) children down to a leaf state
        while chain[-1].children:
            chain.append(chain[-1].children[0])
        # the current state is already active and is not entered again
        if chain[0].name == self.state:
            chain.popleft()
        for entered in chain:
            self._stack.append(entered.name)
            self._exec_actions(entered.entries, event)

    def _find_state_transition(self, state, event):
        # look through `state` and then its ancestors for the first
        # transition matching the event with all conditions met
        while state:
            for candidate in self._states[state].transitions:
                if (candidate.event == event.name and
                        all(self._conditions[name](self, event)
                            for name in candidate.conditions)):
                    return state, candidate
            state = self._parents.get(state)
        return None, None

    def _find_ancestor(self, state, sibling, internal):
        # determine the active state which stays active during transition
        # from `state` to `sibling` (``None`` if every state is exited)
        if not (sibling and state):
            return None
        # path from root state to `sibling`
        path = [sibling]
        parent = self._parents.get(sibling)
        while parent:
            path.append(parent)
            parent = self._parents.get(parent)
        path.reverse()
        kept = None
        for active, wanted in zip(self._stack, path):
            if active != wanted:
                break
            if active == sibling or active == state:
                # source state is kept only for internal transitions
                if internal and active == state:
                    kept = active
                break
            kept = active
        return kept

    def _exec_actions(self, names, event):
        # run action implementations in the given order
        for name in names:
            self._actions[name](self, event)
Statechart engine
Each instance is initialized with state definitions (first state is considered initial) and action and condition definitions.
During initialization, statechart will transition to initial state.
Statechart execution is simulated by repetitive calling of `Statechart.step` method. Alternatively, `Statechart.run` coroutine can be used for waiting and processing new event occurrences. Coroutine `run` continues execution until statechart transitions to final state. Once final state is reached, `Statechart.run` finishes execution.
New events are registered with `Statechart.register` method which accepts event instances containing event name and optional event payload. All event registrations are queued and processed sequentially.
During statechart execution, actions and conditions are called based on state changes and associated transitions provided during initialization.
Condition is considered met only if result of calling condition function is `True`.
Arguments:
- states: all state definitions (first state is initial)
- actions: mapping of action names to their implementation
- conditions: mapping of condition names to their implementation
def __init__(self,
             states: typing.Iterable[State],
             actions: typing.Dict[str, Action],
             conditions: typing.Dict[str, Condition] = {}):
    # Build lookup tables from the state tree and enter the initial state.
    pending = collections.deque(states)
    initial = pending[0].name if pending else None

    self._actions = actions
    self._conditions = conditions
    self._states = {}
    self._parents = {}
    # names of active states, outermost first
    self._stack = collections.deque()
    self._queue = aio.Queue()

    # register every state of the tree and remember each child's parent
    while pending:
        current = pending.pop()
        pending.extend(current.children)
        self._states[current.name] = current
        for child in current.children:
            self._parents[child.name] = current.name

    # entry actions of the initial states receive ``None`` as event
    if initial:
        self._walk_down(initial, None)
def register(self, event: Event):
    """Append event to the queue of events awaiting processing"""
    queue = self._queue
    queue.put_nowait(event)
Add event to queue
def step(self) -> bool:
    """Process the next queued event

    Returns ``False`` if the event queue is empty, either on entry or after
    the dequeued event is handled; otherwise ``True``.

    """
    queue = self._queue
    if queue.empty():
        return False
    event = queue.get_nowait()
    # once finished, dequeued events are discarded
    if not self.finished:
        self._step(event)
    return not queue.empty()
Process next queued event
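Returns `False` if the event queue is empty, either before the call or once the dequeued event has been processed; returns `True` only while more events remain queued.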
async def run(self):
    """Wait for and process events until a final state is entered"""
    while True:
        if self.finished:
            return
        self._step(await self._queue.get())
Run statechart step loop
This coroutine finishes once statechart enters final state.
def parse_scxml(scxml: typing.Union[typing.TextIO, pathlib.Path]
                ) -> typing.List[State]:
    """Parse SCXML document into list of state definitions

    A `pathlib.Path` argument is opened as UTF-8 text; any other argument is
    handed to the XML reader unchanged.

    """
    if not isinstance(scxml, pathlib.Path):
        return _parse_scxml_states(_read_xml(scxml))
    with open(scxml, encoding='utf-8') as stream:
        root_el = _read_xml(stream)
    return _parse_scxml_states(root_el)
Parse SCXML into list of state definitions
def create_dot_graph(states: typing.Iterable[State]) -> str:
    """Create DOT representation of statechart

    Args:
        states: top level state definitions (first state is initial); any
            iterable, including single-pass iterables such as generators

    Returns:
        DOT source of the graph

    """
    # states are traversed twice (nodes, then edges) - without materializing,
    # a single-pass iterable would be exhausted before edges are rendered
    states = list(states)
    # filled while rendering states, read while rendering transitions
    state_name_ids = {}
    id_prefix = 'state'
    states_dot = '\n'.join(
        _create_dot_graph_states(states, state_name_ids, id_prefix))
    transitions_dot = '\n'.join(
        _create_dot_graph_transitions(states, state_name_ids, id_prefix))
    return _dot_graph.format(states=states_dot,
                             transitions=transitions_dot)
Create DOT representation of statechart