summaryrefslogtreecommitdiffstats
path: root/stator/graph.py
blob: b06ffb835ad9056d5a538c91897e638979b717af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import datetime
from functools import wraps
from typing import Callable, ClassVar, Dict, List, Optional, Set, Tuple, Union

from django.db import models
from django.utils import timezone


class StateGraph:
    """
    Represents a graph of possible states and transitions to attempt on them.
    Does not support subclasses of existing graphs yet.
    """

    states: ClassVar[Dict[str, "State"]]
    choices: ClassVar[List[Tuple[str, str]]]
    initial_state: ClassVar["State"]
    terminal_states: ClassVar[Set["State"]]

    def __init_subclass__(cls) -> None:
        # Collect state memebers
        cls.states = {}
        for name, value in cls.__dict__.items():
            if name in ["__module__", "__doc__", "states"]:
                pass
            elif name in ["initial_state", "terminal_states", "choices"]:
                raise ValueError(f"Cannot name a state {name} - this is reserved")
            elif isinstance(value, State):
                value._add_to_graph(cls, name)
            elif callable(value) or isinstance(value, classmethod):
                pass
            else:
                raise ValueError(
                    f"Graph has item {name} of unallowed type {type(value)}"
                )
        # Check the graph layout
        terminal_states = set()
        initial_state = None
        for state in cls.states.values():
            if state.initial:
                if initial_state:
                    raise ValueError(
                        f"The graph has more than one initial state: {initial_state} and {state}"
                    )
                initial_state = state
            if state.terminal:
                terminal_states.add(state)
        if initial_state is None:
            raise ValueError("The graph has no initial state")
        cls.initial_state = initial_state
        cls.terminal_states = terminal_states
        # Generate choices
        cls.choices = [(name, name) for name in cls.states.keys()]


class State:
    """
    Represents an individual state
    """

    def __init__(self, try_interval: float = 300):
        self.try_interval = try_interval
        self.parents: Set["State"] = set()
        self.children: Dict["State", "Transition"] = {}

    def _add_to_graph(self, graph: StateGraph, name: str):
        self.graph = graph
        self.name = name
        self.graph.states[name] = self

    def __repr__(self):
        return f"<State {self.name}>"

    def add_transition(
        self,
        other: "State",
        handler: Optional[Union[str, Callable]] = None,
        priority: int = 0,
    ) -> Callable:
        def decorator(handler: Union[str, Callable]):
            self.children[other] = Transition(
                self,
                other,
                handler,
                priority=priority,
            )
            other.parents.add(self)
            # All handlers should be class methods, so do that automatically.
            if callable(handler):
                return classmethod(handler)

        # If we're not being called as a decorator, invoke it immediately
        if handler is not None:
            decorator(handler)
        return decorator

    def add_manual_transition(self, other: "State"):
        self.children[other] = ManualTransition(self, other)
        other.parents.add(self)

    @property
    def initial(self):
        return not self.parents

    @property
    def terminal(self):
        return not self.children

    def transitions(self, automatic_only=False) -> List["Transition"]:
        """
        Returns all transitions from this State in priority order
        """
        if automatic_only:
            transitions = [t for t in self.children.values() if t.automatic]
        else:
            transitions = self.children.values()
        return sorted(transitions, key=lambda t: t.priority, reverse=True)


class Transition:
    """
    A possible transition from one state to another
    """

    def __init__(
        self,
        from_state: State,
        to_state: State,
        handler: Union[str, Callable],
        priority: int = 0,
    ):
        self.from_state = from_state
        self.to_state = to_state
        self.handler = handler
        self.priority = priority
        self.automatic = True

    def get_handler(self) -> Callable:
        """
        Returns the handler (it might need resolving from a string)
        """
        if isinstance(self.handler, str):
            self.handler = getattr(self.from_state.graph, self.handler)
        return self.handler


class ManualTransition(Transition):
    """
    A possible transition from one state to another that cannot be done by
    the stator task runner, and must come from an external source.
    """

    def __init__(
        self,
        from_state: State,
        to_state: State,
    ):
        self.from_state = from_state
        self.to_state = to_state
        self.handler = None
        self.priority = 0
        self.automatic = False