summaryrefslogtreecommitdiffstats
path: root/stator/runner.py
blob: ad3f6607618cc2091514b2f26c466f23a80e485f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import asyncio
import datetime
import time
import traceback
import uuid

from asgiref.sync import async_to_sync, sync_to_async
from django.utils import timezone

from core import exceptions, sentry
from core.models import Config
from stator.models import StatorModel, Stats


class StatorRunner:
    """
    Runs tasks on models that are looking for state changes.
    Designed to run either indefinitely, or just for a few seconds.
    """

    def __init__(
        self,
        models: list[type[StatorModel]],
        concurrency: int = 50,
        concurrency_per_model: int = 10,
        liveness_file: str | None = None,
        schedule_interval: int = 30,
        lock_expiry: int = 300,
        run_for: int = 0,
    ):
        self.models = models
        self.runner_id = uuid.uuid4().hex
        self.concurrency = concurrency
        self.concurrency_per_model = concurrency_per_model
        self.liveness_file = liveness_file
        self.schedule_interval = schedule_interval
        self.lock_expiry = lock_expiry
        self.run_for = run_for

    async def run(self):
        sentry.set_takahe_app("stator")
        self.handled = {}
        self.started = time.monotonic()
        self.last_clean = time.monotonic() - self.schedule_interval
        self.tasks = []
        # For the first time period, launch tasks
        print("Running main task loop")
        try:
            with sentry.configure_scope() as scope:
                while True:
                    # Do we need to do cleaning?
                    if (time.monotonic() - self.last_clean) >= self.schedule_interval:
                        # Refresh the config
                        Config.system = await Config.aload_system()
                        print("Tasks processed this loop:")
                        for label, number in self.handled.items():
                            print(f"  {label}: {number}")
                        print("Running cleaning and scheduling")
                        await self.run_scheduling()

                    # Clear the cleaning breadcrumbs/extra for the main part of the loop
                    sentry.scope_clear(scope)

                    self.remove_completed_tasks()
                    await self.fetch_and_process_tasks()

                    # Are we in limited run mode?
                    if (
                        self.run_for
                        and (time.monotonic() - self.started) > self.run_for
                    ):
                        break
                    # Prevent busylooping
                    await asyncio.sleep(0.5)
                    # Clear the Sentry breadcrumbs and extra for next loop
                    sentry.scope_clear(scope)
        except KeyboardInterrupt:
            pass
        # Wait for tasks to finish
        print("Waiting for tasks to complete")
        while True:
            self.remove_completed_tasks()
            if not self.tasks:
                break
            # Prevent busylooping
            await asyncio.sleep(0.1)
        print("Complete")
        return self.handled

    async def run_scheduling(self):
        """
        Do any transition cleanup tasks
        """
        with sentry.start_transaction(op="task", name="stator.run_scheduling"):
            for model in self.models:
                asyncio.create_task(self.submit_stats(model))
                asyncio.create_task(model.atransition_clean_locks())
                asyncio.create_task(model.atransition_schedule_due())
            self.last_clean = time.monotonic()

    async def submit_stats(self, model):
        """
        Pop some statistics into the database
        """
        stats_instance = await Stats.aget_for_model(model)
        if stats_instance.model_label in self.handled:
            stats_instance.add_handled(self.handled[stats_instance.model_label])
            del self.handled[stats_instance.model_label]
        stats_instance.set_queued(await model.atransition_ready_count())
        stats_instance.trim_data()
        await sync_to_async(stats_instance.save)()

    async def fetch_and_process_tasks(self):
        # Calculate space left for tasks
        space_remaining = self.concurrency - len(self.tasks)
        # Fetch new tasks
        for model in self.models:
            if space_remaining > 0:
                for instance in await model.atransition_get_with_lock(
                    number=min(space_remaining, self.concurrency_per_model),
                    lock_expiry=(
                        timezone.now() + datetime.timedelta(seconds=self.lock_expiry)
                    ),
                ):
                    self.tasks.append(
                        asyncio.create_task(self.run_transition(instance))
                    )
                    self.handled[model._meta.label_lower] = (
                        self.handled.get(model._meta.label_lower, 0) + 1
                    )
                    space_remaining -= 1

    async def run_transition(self, instance: StatorModel):
        """
        Wrapper for atransition_attempt with fallback error handling
        """
        task_name = f"stator.run_transition:{instance._meta.label_lower}#{{id}} from {instance.state}"
        with sentry.start_transaction(op="task", name=task_name):
            sentry.set_context(
                "instance",
                {
                    "model": instance._meta.label_lower,
                    "pk": instance.pk,
                    "state": instance.state,
                    "state_age": instance.state_age,
                },
            )

            try:
                print(
                    f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}"
                )
                await instance.atransition_attempt()
            except BaseException as e:
                await exceptions.acapture_exception(e)
                traceback.print_exc()

    def remove_completed_tasks(self):
        """
        Removes all completed asyncio.Tasks from our local in-progress list
        """
        self.tasks = [t for t in self.tasks if not t.done()]

    async def run_single_cycle(self):
        """
        Testing entrypoint to advance things just one cycle
        """
        await asyncio.wait_for(self.fetch_and_process_tasks(), timeout=1)
        for _ in range(100):
            if not self.tasks:
                break
            self.remove_completed_tasks()
            await asyncio.sleep(0.01)

    run_single_cycle_sync = async_to_sync(run_single_cycle)