summaryrefslogtreecommitdiffstats
path: root/stator/runner.py
blob: d286bc1b49273819dc288e92c9d9da5ba752efd8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import asyncio
import datetime
import time
import traceback
import uuid
from typing import List, Optional, Type

from django.utils import timezone

from stator.models import StatorModel


class StatorRunner:
    """
    Runs tasks on models that are looking for state changes.

    The runner loops until it receives a KeyboardInterrupt: it periodically
    asks every model to clean expired locks and schedule due instances, then
    fetches locked instances and runs their transitions concurrently, never
    holding more than ``concurrency`` transition tasks at once. On interrupt
    it waits for the in-flight transitions to finish and then exits.
    """

    def __init__(
        self,
        models: List[Type[StatorModel]],
        concurrency: int = 50,
        concurrency_per_model: int = 10,
        liveness_file: Optional[str] = None,
        schedule_interval: int = 30,
        lock_expiry: int = 300,
    ):
        """
        models: model classes to poll for instances needing a transition.
        concurrency: maximum number of transition tasks in flight overall.
        concurrency_per_model: maximum instances fetched from one model per
            loop iteration.
        liveness_file: stored on the runner; not used by the code in this
            class.
        schedule_interval: seconds between cleaning/scheduling passes.
        lock_expiry: seconds from "now" at which a fetched lock expires.
        """
        self.models = models
        self.runner_id = uuid.uuid4().hex
        self.concurrency = concurrency
        self.concurrency_per_model = concurrency_per_model
        self.liveness_file = liveness_file
        self.schedule_interval = schedule_interval
        self.lock_expiry = lock_expiry
        # Run state. Initialised here so helper methods are safe to call
        # before run(); handled/tasks are reset at the start of every run().
        self.handled = 0
        self.tasks = []
        # Strong references to fire-and-forget housekeeping tasks (see
        # _spawn_background for why these are needed).
        self.background_tasks = set()

    async def run(self):
        """
        Main loop. Returns the number of transitions launched once the
        runner has been interrupted and all in-flight tasks have finished.
        """
        self.handled = 0
        # Backdate the last clean so the first iteration cleans immediately
        self.last_clean = time.monotonic() - self.schedule_interval
        self.tasks = []
        # For the first time period, launch tasks
        print("Running main task loop")
        try:
            while True:
                # Do we need to do cleaning?
                if (time.monotonic() - self.last_clean) >= self.schedule_interval:
                    print(f"{self.handled} tasks processed so far")
                    print("Running cleaning and scheduling")
                    for model in self.models:
                        self._spawn_background(model.atransition_clean_locks())
                        self._spawn_background(model.atransition_schedule_due())
                    self.last_clean = time.monotonic()
                # Drop finished tasks on every pass; otherwise capacity is only
                # reclaimed once per schedule_interval and the runner sits idle
                # after launching `concurrency` tasks.
                self.remove_completed_tasks()
                # Calculate space left for tasks
                space_remaining = self.concurrency - len(self.tasks)
                # Fetch new tasks
                for model in self.models:
                    if space_remaining <= 0:
                        break
                    for instance in await model.atransition_get_with_lock(
                        number=min(space_remaining, self.concurrency_per_model),
                        lock_expiry=(
                            timezone.now()
                            + datetime.timedelta(seconds=self.lock_expiry)
                        ),
                    ):
                        self.tasks.append(
                            asyncio.create_task(self.run_transition(instance))
                        )
                        self.handled += 1
                        space_remaining -= 1
                # Prevent busylooping
                await asyncio.sleep(0.1)
        except KeyboardInterrupt:
            # Wait for tasks to finish
            print("Waiting for tasks to complete")
            while True:
                self.remove_completed_tasks()
                if not self.tasks:
                    break
                # Prevent busylooping
                await asyncio.sleep(1)
        print("Complete")
        return self.handled

    def _spawn_background(self, coro):
        """
        Starts a housekeeping coroutine as a task that is not counted against
        the concurrency limit, and returns the task.

        The event loop only holds weak references to tasks, so a reference is
        kept until the task is done to stop it being garbage-collected
        mid-execution.
        """
        task = asyncio.create_task(coro)
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)
        return task

    async def run_transition(self, instance: StatorModel):
        """
        Wrapper for atransition_attempt with fallback error handling
        """
        try:
            print(
                f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}"
            )
            await instance.atransition_attempt()
        except BaseException:
            # Best-effort: a failing transition is reported and dropped so it
            # does not propagate out of its task.
            # NOTE(review): BaseException also swallows asyncio.CancelledError
            # and KeyboardInterrupt raised inside the transition - confirm
            # that is intended.
            traceback.print_exc()

    def remove_completed_tasks(self):
        """
        Removes all completed asyncio.Tasks from our local in-progress list
        """
        self.tasks = [t for t in self.tasks if not t.done()]