hat.stc

Statechart library

 1"""Statechart library"""
 2
 3from hat.stc.common import (EventName,
 4                            StateName,
 5                            ActionName,
 6                            ConditionName,
 7                            Event,
 8                            Transition,
 9                            State)
10from hat.stc.dot import create_dot_graph
11from hat.stc.runner import (SyncRunner,
12                            AsyncRunner,
13                            AsyncTimer)
14from hat.stc.scxml import parse_scxml
15from hat.stc.statechart import (Action,
16                                Condition,
17                                Statechart)
18
19
20__all__ = ['EventName',
21           'StateName',
22           'ActionName',
23           'ConditionName',
24           'Event',
25           'Transition',
26           'State',
27           'create_dot_graph',
28           'SyncRunner',
29           'AsyncRunner',
30           'AsyncTimer',
31           'parse_scxml',
32           'Action',
33           'Condition',
34           'Statechart']
EventName = str
StateName = str
ActionName = str
ConditionName = str
class Event(typing.NamedTuple):
class Event(typing.NamedTuple):
    """Event instance

    Immutable pair of an event name and an optional payload. Instances are
    what `Statechart.step` consumes and what actions and conditions receive
    as their second argument.
    """
    # Compared for equality with `Transition.event` when a transition is
    # being selected.
    name: EventName
    """Event name"""
    # Arbitrary data travelling with the event; ``None`` when not supplied.
    payload: typing.Any = None
    """Optional payload"""

Event instance

Event(name: str, payload: Any = None)

Create new instance of Event(name, payload)

name: str

Event name

payload: Any

Optional payload

class Transition(typing.NamedTuple):
class Transition(typing.NamedTuple):
    """Transition definition

    Describes one outgoing edge of a state: the event name that triggers it,
    the optional target state, the actions run while the transition is
    taken and the conditions that all have to hold for it to be taken.
    """
    event: EventName
    """Event identifier. Occurrence of event with this exact identifier can
    trigger state transition."""
    target: StateName | None
    """Destination state identifier. If destination state is not defined,
    local transition is assumed - state is not changed and transition
    actions are triggered."""
    # NOTE: the ``[]`` defaults below are single list objects shared by every
    # instance created without an explicit value - treat them as read-only.
    actions: Collection[ActionName] = []
    """Actions executed on transition."""
    conditions: Collection[ConditionName] = []
    """List of conditions. Transition is triggered only if all provided
    conditions are met."""
    internal: bool = False
    """Internal transition modifier. Determines whether the source state is
    exited in transitions whose target state is a descendant of the source
    state."""

Transition definition

Transition( event: str, target: str | None, actions: Collection[str] = [], conditions: Collection[str] = [], internal: bool = False)

Create new instance of Transition(event, target, actions, conditions, internal)

event: str

Event identifier. Occurrence of event with this exact identifier can trigger state transition.

target: str | None

Destination state identifier. If destination state is not defined, local transition is assumed - state is not changed and transition actions are triggered.

actions: Collection[str]

Actions executed on transition.

conditions: Collection[str]

List of conditions. Transition is triggered only if all provided conditions are met.

internal: bool

Internal transition modifier. Determines whether the source state is exited in transitions whose target state is a descendant of the source state.

class State(typing.NamedTuple):
class State(typing.NamedTuple):
    """State definition

    Node of the state tree: a uniquely named state together with its child
    states, outgoing transitions and the action names run on entry and exit.
    """
    name: StateName
    """Unique state identifier."""
    # NOTE: the ``[]`` defaults below are single list objects shared by every
    # instance created without an explicit value - treat them as read-only.
    children: Collection['State'] = []
    """Optional child states. If state has children, first child is
    considered as its initial state."""
    transitions: Collection[Transition] = []
    """Possible transitions to other states."""
    entries: Collection[ActionName] = []
    """Actions executed when state is entered."""
    exits: Collection[ActionName] = []
    """Actions executed when state is exited."""
    final: bool = False
    """Is state final."""

State definition

State( name: str, children: Collection['State'] = [], transitions: Collection[Transition] = [], entries: Collection[str] = [], exits: Collection[str] = [], final: bool = False)

Create new instance of State(name, children, transitions, entries, exits, final)

name: str

Unique state identifier.

children: Collection[State]

Optional child states. If state has children, first child is considered as its initial state.

transitions: Collection[Transition]

Possible transitions to other states.

entries: Collection[str]

Actions executed when state is entered.

exits: Collection[str]

Actions executed when state is exited.

final: bool

Is state final.

def create_dot_graph(states: Iterable[State]) -> str:
 7def create_dot_graph(states: Iterable[State]) -> str:
 8    """Create DOT representation of statechart"""
 9    state_name_ids = {}
10    id_prefix = 'state'
11    states_dot = '\n'.join(
12        _create_dot_graph_states(states, state_name_ids, id_prefix))
13    transitions_dot = '\n'.join(
14        _create_dot_graph_transitions(states, state_name_ids, id_prefix))
15    return _dot_graph.format(states=states_dot,
16                             transitions=transitions_dot)

Create DOT representation of statechart

class SyncRunner:
class SyncRunner:
    """Runner that processes queued events one at a time on demand.

    ``(statechart, event)`` pairs are kept in a FIFO queue; every call of
    `step` removes the oldest pair and feeds the event to its statechart.
    """

    def __init__(self):
        self._queue = collections.deque()

    @property
    def empty(self) -> bool:
        """``True`` if no event is waiting in the queue"""
        return len(self._queue) == 0

    def register(self, stc: Statechart, event: Event):
        """Append event destined for statechart `stc` to the queue"""
        pending = (stc, event)
        self._queue.append(pending)

    def step(self):
        """Process the oldest queued event (no-op if queue is empty)"""
        if self._queue:
            target, pending_event = self._queue.popleft()
            target.step(pending_event)
empty: bool
    @property
    def empty(self) -> bool:
        """Is event queue empty

        ``True`` when no registered event is waiting to be processed.
        """
        return not self._queue

Is event queue empty

def register( self, stc: Statechart, event: Event):
    def register(self, stc: Statechart, event: Event):
        """Add event to queue

        The ``(stc, event)`` pair is appended to the end of the queue; the
        event is not processed here.
        """
        self._queue.append((stc, event))

Add event to queue

def step(self):
30    def step(self):
31        """Process next queued event"""
32        if not self._queue:
33            return
34
35        stc, event = self._queue.popleft()
36        stc.step(event)

Process next queued event

class AsyncRunner(hat.aio.group.Resource):
class AsyncRunner(aio.Resource):
    """Runner that processes queued events in a background task.

    On construction a task running `_runner_loop` is spawned in the runner's
    own group. The task takes ``(statechart, event)`` pairs from an internal
    queue and passes each event to ``stc.step``.
    """

    def __init__(self):
        self._queue = aio.Queue()
        self._async_group = aio.Group()

        # Start the consumer task inside the runner's group.
        self.async_group.spawn(self._runner_loop)

    @property
    def async_group(self):
        """Async group"""
        return self._async_group

    def register(self, stc: Statechart, event: Event):
        """Add event to queue"""
        # Non-blocking put; the consumer task picks the pair up later.
        self._queue.put_nowait((stc, event))

    async def _runner_loop(self):
        # Consumer loop: runs until an exception escapes the loop body or the
        # task is cancelled.
        try:
            while True:
                stc, event = await self._queue.get()
                stc.step(event)

        except Exception as e:
            # Logged and not re-raised.
            mlog.error("runner loop error: %s", e, exc_info=e)

        finally:
            # However the loop ended, close the runner and then its queue.
            self.close()
            self._queue.close()

Resource with lifetime control based on Group.

async_group
    @property
    def async_group(self):
        """Async group

        Returns the group stored on this runner as ``_async_group``.
        """
        return self._async_group

Async group

def register( self, stc: Statechart, event: Event):
    def register(self, stc: Statechart, event: Event):
        """Add event to queue

        The ``(stc, event)`` pair is put on the queue without waiting; the
        event is not processed by this call.
        """
        self._queue.put_nowait((stc, event))

Add event to queue

class AsyncTimer(hat.aio.group.Resource):
 70class AsyncTimer(aio.Resource):
 71
 72    def __init__(self,
 73                 runner: AsyncRunner,
 74                 event: EventName,
 75                 duration: float):
 76        self._runner = runner
 77        self._event = event
 78        self._duration = duration
 79        self._loop = asyncio.get_running_loop()
 80        self._async_group = runner.async_group.create_subgroup()
 81        self._next_tokens = itertools.count(1)
 82        self._active_token = None
 83        self._timer = None
 84
 85        self.async_group.spawn(aio.call_on_cancel, self._stop, None, None)
 86
 87    @property
 88    def async_group(self) -> aio.Group:
 89        return self._async_group
 90
 91    @property
 92    def start(self) -> Action:
 93        return self._start
 94
 95    @property
 96    def stop(self) -> Action:
 97        return self._stop
 98
 99    @property
100    def condition(self) -> Condition:
101        return self._condition
102
103    def _start(self, stc, _):
104        if not self.is_open:
105            return
106
107        if self._timer:
108            self._timer.cancel()
109
110        self._active_token = next(self._next_tokens)
111        self._timer = self._loop.call_later(self._duration, self._on_timer,
112                                            stc, self._active_token)
113
114    def _stop(self, _, __):
115        self._active_token = None
116        if not self._timer:
117            return
118
119        self._timer.cancel()
120        self._timer = None
121
122    def _condition(self, _, event):
123        return bool(event and event.payload == self._active_token)
124
125    def _on_timer(self, stc, token):
126        if not self.is_open:
127            return
128
129        self._runner.register(stc, Event(name=self._event,
130                                         payload=token))

Resource with lifetime control based on Group.

AsyncTimer(runner: AsyncRunner, event: str, duration: float)
    def __init__(self,
                 runner: AsyncRunner,
                 event: EventName,
                 duration: float):
        """Create timer bound to `runner`.

        Args:
            runner: runner with which the expiry event is registered
            event: name of the expiry event
            duration: delay passed to ``loop.call_later``

        """
        self._runner = runner
        self._event = event
        self._duration = duration
        # Requires a running event loop at construction time.
        self._loop = asyncio.get_running_loop()
        self._async_group = runner.async_group.create_subgroup()
        # Source of tokens; a new one is taken on every start.
        self._next_tokens = itertools.count(1)
        self._active_token = None
        self._timer = None

        # NOTE(review): presumably runs `_stop` once this task is cancelled
        # (e.g. when the group closes) - confirm against `aio.call_on_cancel`.
        self.async_group.spawn(aio.call_on_cancel, self._stop, None, None)
async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Group stored on this timer as ``_async_group``"""
        return self._async_group

Group controlling resource's lifetime.

start: Callable[[Statechart, Event | None], None]
    @property
    def start(self) -> Action:
        """Action (re)starting the timer (the bound `_start` method)"""
        return self._start
stop: Callable[[Statechart, Event | None], None]
    @property
    def stop(self) -> Action:
        """Action stopping the timer (the bound `_stop` method)"""
        return self._stop
condition: Callable[[Statechart, Event | None], bool]
    @property
    def condition(self) -> Condition:
        """Condition checking the timer's event (the bound `_condition`
        method)"""
        return self._condition
def parse_scxml(scxml: typing.TextIO) -> list[State]:
def parse_scxml(scxml: typing.TextIO) -> list[State]:
    """Read SCXML from a text stream and return its state definitions"""
    return _parse_scxml_states(_read_xml(scxml))

Parse SCXML into list of state definitions

Action = collections.abc.Callable[['Statechart', Event | None], None]
Condition = collections.abc.Callable[['Statechart', Event | None], bool]
class Statechart:
 35class Statechart:
 36    """Statechart engine
 37
 38    Each instance is initialized with state definitions (first state is
 39    considered initial) and action and condition definitions.
 40
 41    During initialization, statechart will transition to initial state.
 42
 43    Statechart execution is simulated by repetitive calling of
 44    `Statechart.step` method which accepts event instances containing event
 45    name and optional event payload.
 46
 47    During statechart execution, actions and conditions are called based on
 48    state changes and associated transitions provided during initialization.
 49
 50    Condition is considered met only if result of calling condition function is
 51    ``True``.
 52
 53    Args:
 54        states: all state definitions with (first state is initial)
 55        actions: mapping of action names to their implementation
 56        conditions: mapping of conditions names to their implementation
 57
 58    """
 59
 60    def __init__(self,
 61                 states: Iterable[State],
 62                 actions: dict[ActionName, Action],
 63                 conditions: dict[ConditionName, Condition] = {}):
 64        states = collections.deque(states)
 65        initial = states[0].name if states else None
 66
 67        self._actions = actions
 68        self._conditions = conditions
 69        self._states = {}
 70        self._parents = {}
 71        self._stack = collections.deque()
 72
 73        while states:
 74            state = states.pop()
 75            states.extend(state.children)
 76            self._states[state.name] = state
 77            self._parents.update({i.name: state.name for i in state.children})
 78
 79        if initial:
 80            self._walk_down(initial, None)
 81
 82    @property
 83    def state(self) -> StateName | None:
 84        """Current state"""
 85        return self._stack[-1] if self._stack else None
 86
 87    @property
 88    def finished(self) -> bool:
 89        """Is statechart in final state"""
 90        state = self.state
 91        return not state or self._states[state].final
 92
 93    def step(self, event: Event):
 94        """Process single event"""
 95        if self.finished:
 96            return
 97
 98        state, transition = self._find_state_transition(self.state, event)
 99        if not transition:
100            return
101
102        if transition.target:
103            ancestor = self._find_ancestor(state, transition.target,
104                                           transition.internal)
105            self._walk_up(ancestor, event)
106
107        self._exec_actions(transition.actions, event)
108
109        if transition.target:
110            self._walk_down(transition.target, event)
111
112    def _walk_up(self, target, event):
113        while self.state != target:
114            state = self._states[self.state]
115            self._exec_actions(state.exits, event)
116            self._stack.pop()
117
118    def _walk_down(self, target, event):
119        states = collections.deque([self._states[target]])
120
121        while ((state := states[0]).name != self.state and
122                (parent := self._parents.get(state.name))):
123            states.appendleft(self._states[parent])
124
125        while (state := states[-1]).children:
126            states.append(state.children[0])
127
128        if states[0].name == self.state:
129            states.popleft()
130
131        for state in states:
132            self._stack.append(state.name)
133            self._exec_actions(state.entries, event)
134
135    def _find_state_transition(self, state, event):
136        while state:
137            for transition in self._states[state].transitions:
138                if transition.event != event.name:
139                    continue
140
141                if not all(self._conditions[condition](self, event)
142                           for condition in transition.conditions):
143                    continue
144
145                return state, transition
146
147            state = self._parents.get(state)
148
149        return None, None
150
151    def _find_ancestor(self, state, sibling, internal):
152        if not sibling or not state:
153            return
154
155        path = collections.deque([sibling])
156        while (parent := self._parents.get(path[0])):
157            path.appendleft(parent)
158
159        ancestor = None
160        for i, j in zip(self._stack, path):
161            if i != j:
162                break
163
164            if i in [sibling, state]:
165                if internal and i == state:
166                    ancestor = i
167                break
168
169            ancestor = i
170
171        return ancestor
172
173    def _exec_actions(self, names, event):
174        for name in names:
175            action = self._actions[name]
176            action(self, event)

Statechart engine

Each instance is initialized with state definitions (first state is considered initial) and action and condition definitions.

During initialization, statechart will transition to initial state.

Statechart execution is simulated by repetitive calling of Statechart.step method which accepts event instances containing event name and optional event payload.

During statechart execution, actions and conditions are called based on state changes and associated transitions provided during initialization.

Condition is considered met only if result of calling condition function is True.

Arguments:
  • states: all state definitions (first state is initial)
  • actions: mapping of action names to their implementation
  • conditions: mapping of conditions names to their implementation
Statechart( states: Iterable[State], actions: dict[str, Callable[[Statechart, Event | None], None]], conditions: dict[str, Callable[[Statechart, Event | None], bool]] = {})
60    def __init__(self,
61                 states: Iterable[State],
62                 actions: dict[ActionName, Action],
63                 conditions: dict[ConditionName, Condition] = {}):
64        states = collections.deque(states)
65        initial = states[0].name if states else None
66
67        self._actions = actions
68        self._conditions = conditions
69        self._states = {}
70        self._parents = {}
71        self._stack = collections.deque()
72
73        while states:
74            state = states.pop()
75            states.extend(state.children)
76            self._states[state.name] = state
77            self._parents.update({i.name: state.name for i in state.children})
78
79        if initial:
80            self._walk_down(initial, None)
state: str | None
    @property
    def state(self) -> StateName | None:
        """Current state

        Name on top of the internal stack of active states, or ``None`` if
        the stack is empty.
        """
        return self._stack[-1] if self._stack else None

Current state

finished: bool
    @property
    def finished(self) -> bool:
        """Is statechart in final state

        Also ``True`` when there is no current state.
        """
        state = self.state
        return not state or self._states[state].final

Is statechart in final state

def step(self, event: Event):
    def step(self, event: Event):
        """Process single event

        Does nothing if the statechart is finished or no transition matches
        the event. Otherwise exit actions, transition actions and entry
        actions are executed in that order; exits and entries only if the
        transition has a target.
        """
        if self.finished:
            return

        # `state` is the state owning the matched transition; the lookup is
        # started from the current state.
        state, transition = self._find_state_transition(self.state, event)
        if not transition:
            return

        if transition.target:
            # Exit active states up to the state that stays active.
            ancestor = self._find_ancestor(state, transition.target,
                                           transition.internal)
            self._walk_up(ancestor, event)

        # Transition actions run between exits and entries.
        self._exec_actions(transition.actions, event)

        if transition.target:
            self._walk_down(transition.target, event)

Process single event