summaryrefslogtreecommitdiffstats
path: root/stator/runner.py
blob: a954a2e6576fc33b8ad830a11492a0bb6c5f569e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import asyncio
import datetime
import time
import traceback
import uuid
from typing import List, Optional, Type

from django.utils import timezone

from core import exceptions
from core.models import Config
from stator.models import StatorModel


class StatorRunner:
    """
    Runs tasks on models that are looking for state changes.
    Designed to run for a determinate amount of time, and then exit.
    """

    def __init__(
        self,
        models: List[Type[StatorModel]],
        concurrency: int = 50,
        concurrency_per_model: int = 10,
        liveness_file: Optional[str] = None,
        schedule_interval: int = 30,
        lock_expiry: int = 300,
    ):
        self.models = models
        self.runner_id = uuid.uuid4().hex
        self.concurrency = concurrency
        self.concurrency_per_model = concurrency_per_model
        self.liveness_file = liveness_file
        self.schedule_interval = schedule_interval
        self.lock_expiry = lock_expiry

    async def run(self):
        self.handled = 0
        self.last_clean = time.monotonic() - self.schedule_interval
        self.tasks = []
        # For the first time period, launch tasks
        print("Running main task loop")
        try:
            while True:
                # Do we need to do cleaning?
                if (time.monotonic() - self.last_clean) >= self.schedule_interval:
                    # Refresh the config
                    Config.system = await Config.aload_system()
                    print(f"{self.handled} tasks processed so far")
                    print("Running cleaning and scheduling")
                    for model in self.models:
                        asyncio.create_task(model.atransition_clean_locks())
                        asyncio.create_task(model.atransition_schedule_due())
                    self.last_clean = time.monotonic()
                # Calculate space left for tasks
                self.remove_completed_tasks()
                space_remaining = self.concurrency - len(self.tasks)
                # Fetch new tasks
                for model in self.models:
                    if space_remaining > 0:
                        for instance in await model.atransition_get_with_lock(
                            number=min(space_remaining, self.concurrency_per_model),
                            lock_expiry=(
                                timezone.now()
                                + datetime.timedelta(seconds=self.lock_expiry)
                            ),
                        ):
                            self.tasks.append(
                                asyncio.create_task(self.run_transition(instance))
                            )
                            self.handled += 1
                            space_remaining -= 1
                # Prevent busylooping
                await asyncio.sleep(0.1)
        except KeyboardInterrupt:
            # Wait for tasks to finish
            print("Waiting for tasks to complete")
            while True:
                self.remove_completed_tasks()
                if not self.tasks:
                    break
                # Prevent busylooping
                await asyncio.sleep(1)
        print("Complete")
        return self.handled

    async def run_transition(self, instance: StatorModel):
        """
        Wrapper for atransition_attempt with fallback error handling
        """
        try:
            print(
                f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}"
            )
            await instance.atransition_attempt()
        except BaseException as e:
            await exceptions.acapture_exception(e)
            traceback.print_exc()

    def remove_completed_tasks(self):
        """
        Removes all completed asyncio.Tasks from our local in-progress list
        """
        self.tasks = [t for t in self.tasks if not t.done()]