summaryrefslogtreecommitdiffstats
path: root/stator/runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'stator/runner.py')
-rw-r--r--stator/runner.py47
1 files changed, 26 insertions, 21 deletions
diff --git a/stator/runner.py b/stator/runner.py
index 8c6e0f1..f9c726e 100644
--- a/stator/runner.py
+++ b/stator/runner.py
@@ -4,11 +4,9 @@ import time
import uuid
from typing import List, Type
-from asgiref.sync import sync_to_async
-from django.db import transaction
from django.utils import timezone
-from stator.models import StatorModel, StatorTask
+from stator.models import StatorModel
class StatorRunner:
@@ -22,6 +20,7 @@ class StatorRunner:
LOCK_TIMEOUT = 120
MAX_TASKS = 30
+ MAX_TASKS_PER_MODEL = 5
def __init__(self, models: List[Type[StatorModel]]):
self.models = models
@@ -32,38 +31,44 @@ class StatorRunner:
self.handled = 0
self.tasks = []
# Clean up old locks
- await StatorTask.aclean_old_locks()
- # Examine what needs scheduling
-
+ print("Running initial cleaning and scheduling")
+ initial_tasks = []
+ for model in self.models:
+ initial_tasks.append(model.atransition_clean_locks())
+ initial_tasks.append(model.atransition_schedule_due())
+ await asyncio.gather(*initial_tasks)
# For the first time period, launch tasks
+ print("Running main task loop")
while (time.monotonic() - start_time) < self.START_TIMEOUT:
self.remove_completed_tasks()
space_remaining = self.MAX_TASKS - len(self.tasks)
# Fetch new tasks
- if space_remaining > 0:
- for new_task in await StatorTask.aget_for_execution(
- space_remaining,
- timezone.now() + datetime.timedelta(seconds=self.LOCK_TIMEOUT),
- ):
- self.tasks.append(asyncio.create_task(self.run_task(new_task)))
- self.handled += 1
+ for model in self.models:
+ if space_remaining > 0:
+ for instance in await model.atransition_get_with_lock(
+ min(space_remaining, self.MAX_TASKS_PER_MODEL),
+ timezone.now() + datetime.timedelta(seconds=self.LOCK_TIMEOUT),
+ ):
+ print(
+ f"Attempting transition on {instance._meta.label_lower}#{instance.pk}"
+ )
+ self.tasks.append(
+ asyncio.create_task(instance.atransition_attempt())
+ )
+ self.handled += 1
+ space_remaining -= 1
# Prevent busylooping
- await asyncio.sleep(0.01)
+ await asyncio.sleep(0.1)
# Then wait for tasks to finish
+ print("Waiting for tasks to complete")
while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
self.remove_completed_tasks()
if not self.tasks:
break
# Prevent busylooping
await asyncio.sleep(1)
+ print("Complete")
return self.handled
- async def run_task(self, task: StatorTask):
- # Resolve the model instance
- model_instance = await task.aget_model_instance()
- await model_instance.attempt_transition()
- # Remove ourselves from the database as complete
- await task.adelete()
-
def remove_completed_tasks(self):
self.tasks = [t for t in self.tasks if not t.done()]