# path: root/stator/runner.py
# blob: 8c6e0f139dca194e0ece88969a69bea100862196
import asyncio
import datetime
import logging
import time
import uuid
from typing import List, Type

from asgiref.sync import sync_to_async
from django.db import transaction
from django.utils import timezone

from stator.models import StatorModel, StatorTask


class StatorRunner:
    """
    Runs tasks on models that are looking for state changes.
    Designed to run in a one-shot mode, living inside a request.

    A run has two phases. For the first START_TIMEOUT seconds, tasks are
    fetched and launched, keeping at most MAX_TASKS in flight. After that no
    new tasks are started and the runner only waits - until TOTAL_TIMEOUT
    seconds after the start - for the in-flight tasks to finish.
    """

    # Seconds after the start of run() during which new tasks may be launched.
    START_TIMEOUT = 30
    # Seconds after the start of run() at which run() stops waiting and returns.
    TOTAL_TIMEOUT = 60
    # Seconds added to "now" to build the timestamp handed to
    # StatorTask.aget_for_execution (presumably the lock expiry - confirm
    # against StatorTask).
    LOCK_TIMEOUT = 120

    # Upper bound on the number of tasks in flight at any one time.
    MAX_TASKS = 30

    _logger = logging.getLogger(__name__)

    def __init__(self, models: List[Type[StatorModel]]):
        """
        Store the models to run and give this runner a random hex identifier.
        """
        self.models = models
        self.runner_id = uuid.uuid4().hex
        # Created here (and reset at the start of every run()) so that
        # remove_completed_tasks() and `handled` are usable before run().
        self.handled = 0
        self.tasks: List[asyncio.Task] = []

    async def run(self) -> int:
        """
        Launch and supervise tasks for one run.

        Returns the number of tasks that were launched. Tasks still running
        once TOTAL_TIMEOUT has elapsed are neither awaited nor cancelled.
        """
        start_time = time.monotonic()
        self.handled = 0
        self.tasks = []
        # Clean up old locks
        await StatorTask.aclean_old_locks()
        # Examine what needs scheduling

        # For the first time period, launch tasks
        while (time.monotonic() - start_time) < self.START_TIMEOUT:
            self.remove_completed_tasks()
            space_remaining = self.MAX_TASKS - len(self.tasks)
            # Fetch new tasks
            if space_remaining > 0:
                for new_task in await StatorTask.aget_for_execution(
                    space_remaining,
                    timezone.now() + datetime.timedelta(seconds=self.LOCK_TIMEOUT),
                ):
                    self.tasks.append(asyncio.create_task(self.run_task(new_task)))
                    self.handled += 1
            # Prevent busylooping
            await asyncio.sleep(0.01)
        # Then wait for tasks to finish
        while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
            self.remove_completed_tasks()
            if not self.tasks:
                break
            # Prevent busylooping
            await asyncio.sleep(1)
        return self.handled

    async def run_task(self, task: StatorTask):
        """
        Attempt the transition for one task, then delete the task.

        If resolving the instance or attempting the transition raises, the
        exception propagates and the task is not deleted.
        """
        # Resolve the model instance
        model_instance = await task.aget_model_instance()
        await model_instance.attempt_transition()
        # Remove ourselves from the database as complete
        await task.adelete()

    def remove_completed_tasks(self):
        """
        Drop finished tasks from self.tasks, logging any that failed.

        Retrieving the exception here reports a failure as soon as the task
        is pruned, instead of leaving it to asyncio's "Task exception was
        never retrieved" message at garbage-collection time.
        """
        still_running = []
        for task in self.tasks:
            if not task.done():
                still_running.append(task)
            elif not task.cancelled():
                # exception() raises CancelledError on a cancelled task,
                # hence the cancelled() check above.
                exc = task.exception()
                if exc is not None:
                    self._logger.error("Stator task failed", exc_info=exc)
        self.tasks = still_running