blob: 0b42b274f03f19f42cdf2586cad1c70c8a1a466c (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
import asyncio
import datetime
import time
import traceback
import uuid
from typing import List, Type
from django.utils import timezone
from stator.models import StatorModel
class StatorRunner:
    """
    Runs tasks on models that are looking for state changes.
    Designed to run in a one-shot mode, living inside a request.

    A single call to ``run()`` first asks every model to clean its locks and
    schedule due work, then spends up to ``START_TIMEOUT`` seconds launching
    transition tasks, and finally waits (until ``TOTAL_TIMEOUT`` seconds after
    the start) for the launched tasks to finish.
    """

    # Seconds after the start of run() during which new tasks may be launched.
    START_TIMEOUT = 30
    # Seconds after the start of run() at which we stop waiting for tasks.
    TOTAL_TIMEOUT = 60
    # Seconds added to "now" and handed to atransition_get_with_lock();
    # presumably the lock expiry time - confirm against StatorModel.
    LOCK_TIMEOUT = 120
    # Upper bound on transition tasks in flight at any one time.
    MAX_TASKS = 30
    # Upper bound on instances requested from one model per fetch pass.
    MAX_TASKS_PER_MODEL = 5

    def __init__(self, models: List[Type["StatorModel"]]):
        """
        Stores the model classes to process and gives this runner a unique ID.
        """
        self.models = models
        self.runner_id = uuid.uuid4().hex
        # Initialised here as well as in run() so helper methods such as
        # remove_completed_tasks() are safe to call on a fresh runner.
        self.handled = 0
        self.tasks: List[asyncio.Task] = []

    async def run(self):
        """
        Performs one full scheduling pass and returns the number of
        transition tasks that were launched.

        Tasks still unfinished when TOTAL_TIMEOUT elapses are not cancelled
        here; they are simply no longer waited for.
        """
        start_time = time.monotonic()
        # Reset per-run state so the runner can be reused.
        self.handled = 0
        self.tasks = []
        # Clean up old locks
        print("Running initial cleaning and scheduling")
        initial_tasks = []
        for model in self.models:
            initial_tasks.append(model.atransition_clean_locks())
            initial_tasks.append(model.atransition_schedule_due())
        await asyncio.gather(*initial_tasks)
        # For the first time period, launch tasks
        print("Running main task loop")
        while (time.monotonic() - start_time) < self.START_TIMEOUT:
            self.remove_completed_tasks()
            space_remaining = self.MAX_TASKS - len(self.tasks)
            # Fetch new tasks
            for model in self.models:
                if space_remaining > 0:
                    for instance in await model.atransition_get_with_lock(
                        min(space_remaining, self.MAX_TASKS_PER_MODEL),
                        timezone.now() + datetime.timedelta(seconds=self.LOCK_TIMEOUT),
                    ):
                        self.tasks.append(
                            asyncio.create_task(self.run_transition(instance))
                        )
                        self.handled += 1
                        space_remaining -= 1
            # Prevent busylooping
            await asyncio.sleep(0.1)
        # Then wait for tasks to finish
        print("Waiting for tasks to complete")
        while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
            self.remove_completed_tasks()
            if not self.tasks:
                break
            # Prevent busylooping
            await asyncio.sleep(1)
        print("Complete")
        return self.handled

    async def run_transition(self, instance: "StatorModel"):
        """
        Wrapper for atransition_attempt with fallback error handling.

        Ordinary exceptions are printed and swallowed so that one failing
        instance does not affect the others. Cancellation and interpreter
        exit signals are allowed to propagate.
        """
        try:
            print(
                f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}"
            )
            await instance.atransition_attempt()
        except Exception:
            # Not BaseException: swallowing asyncio.CancelledError,
            # KeyboardInterrupt or SystemExit would hide task cancellation
            # and block a clean shutdown.
            traceback.print_exc()

    def remove_completed_tasks(self):
        """
        Removes all completed asyncio.Tasks from our local in-progress list
        """
        self.tasks = [t for t in self.tasks if not t.done()]
|