From 7746abbbb7700fa918450101bbc6d29ed9b4b608 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Wed, 9 Nov 2022 22:29:33 -0700 Subject: Most of the way through the stator refactor --- stator/runner.py | 47 ++++++++++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 21 deletions(-) (limited to 'stator/runner.py') diff --git a/stator/runner.py b/stator/runner.py index 8c6e0f1..f9c726e 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -4,11 +4,9 @@ import time import uuid from typing import List, Type -from asgiref.sync import sync_to_async -from django.db import transaction from django.utils import timezone -from stator.models import StatorModel, StatorTask +from stator.models import StatorModel class StatorRunner: @@ -22,6 +20,7 @@ class StatorRunner: LOCK_TIMEOUT = 120 MAX_TASKS = 30 + MAX_TASKS_PER_MODEL = 5 def __init__(self, models: List[Type[StatorModel]]): self.models = models @@ -32,38 +31,44 @@ class StatorRunner: self.handled = 0 self.tasks = [] # Clean up old locks - await StatorTask.aclean_old_locks() - # Examine what needs scheduling - + print("Running initial cleaning and scheduling") + initial_tasks = [] + for model in self.models: + initial_tasks.append(model.atransition_clean_locks()) + initial_tasks.append(model.atransition_schedule_due()) + await asyncio.gather(*initial_tasks) # For the first time period, launch tasks + print("Running main task loop") while (time.monotonic() - start_time) < self.START_TIMEOUT: self.remove_completed_tasks() space_remaining = self.MAX_TASKS - len(self.tasks) # Fetch new tasks - if space_remaining > 0: - for new_task in await StatorTask.aget_for_execution( - space_remaining, - timezone.now() + datetime.timedelta(seconds=self.LOCK_TIMEOUT), - ): - self.tasks.append(asyncio.create_task(self.run_task(new_task))) - self.handled += 1 + for model in self.models: + if space_remaining > 0: + for instance in await model.atransition_get_with_lock( + min(space_remaining, self.MAX_TASKS_PER_MODEL), + timezone.now() + datetime.timedelta(seconds=self.LOCK_TIMEOUT), + ): + print( + f"Attempting transition on {instance._meta.label_lower}#{instance.pk}" + ) + self.tasks.append( + asyncio.create_task(instance.atransition_attempt()) + ) + self.handled += 1 + space_remaining -= 1 # Prevent busylooping - await asyncio.sleep(0.01) + await asyncio.sleep(0.1) # Then wait for tasks to finish + print("Waiting for tasks to complete") while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT: self.remove_completed_tasks() if not self.tasks: break # Prevent busylooping await asyncio.sleep(1) + print("Complete") return self.handled - async def run_task(self, task: StatorTask): - # Resolve the model instance - model_instance = await task.aget_model_instance() - await model_instance.attempt_transition() - # Remove ourselves from the database as complete - await task.adelete() - def remove_completed_tasks(self): self.tasks = [t for t in self.tasks if not t.done()] -- cgit v1.2.3