From 2142677b015507bc1aeb6179c5dfc4dfa3aaf0ce Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sat, 19 Nov 2022 10:20:13 -0700 Subject: A few more tweaks for an initial deploy --- stator/runner.py | 90 ++++++++++++++++++++++++++++++-------------------------- 1 file changed, 48 insertions(+), 42 deletions(-) (limited to 'stator/runner.py') diff --git a/stator/runner.py b/stator/runner.py index bb1b009..d286bc1 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -3,7 +3,7 @@ import datetime import time import traceback import uuid -from typing import List, Type +from typing import List, Optional, Type from django.utils import timezone @@ -13,7 +13,7 @@ from stator.models import StatorModel class StatorRunner: """ Runs tasks on models that are looking for state changes. - Designed to run in a one-shot mode, living inside a request. + Designed to run for a determinate amount of time, and then exit. """ def __init__( @@ -21,57 +21,63 @@ class StatorRunner: models: List[Type[StatorModel]], concurrency: int = 50, concurrency_per_model: int = 10, - run_period: int = 60, - wait_period: int = 30, + liveness_file: Optional[str] = None, + schedule_interval: int = 30, + lock_expiry: int = 300, ): self.models = models self.runner_id = uuid.uuid4().hex self.concurrency = concurrency self.concurrency_per_model = concurrency_per_model - self.run_period = run_period - self.total_period = run_period + wait_period + self.liveness_file = liveness_file + self.schedule_interval = schedule_interval + self.lock_expiry = lock_expiry async def run(self): - start_time = time.monotonic() self.handled = 0 + self.last_clean = time.monotonic() - self.schedule_interval self.tasks = [] - # Clean up old locks - print("Running initial cleaning and scheduling") - initial_tasks = [] - for model in self.models: - initial_tasks.append(model.atransition_clean_locks()) - initial_tasks.append(model.atransition_schedule_due()) - await asyncio.gather(*initial_tasks) # For the first time period, launch tasks print("Running main task loop") - while (time.monotonic() - start_time) < self.run_period: - self.remove_completed_tasks() - space_remaining = self.concurrency - len(self.tasks) - # Fetch new tasks - for model in self.models: - if space_remaining > 0: - for instance in await model.atransition_get_with_lock( - number=min(space_remaining, self.concurrency_per_model), - lock_expiry=( - timezone.now() - + datetime.timedelta(seconds=(self.total_period * 2) + 60) - ), - ): - self.tasks.append( - asyncio.create_task(self.run_transition(instance)) - ) - self.handled += 1 - space_remaining -= 1 - # Prevent busylooping - await asyncio.sleep(0.1) - # Then wait for tasks to finish - print("Waiting for tasks to complete") - while (time.monotonic() - start_time) < self.total_period: - self.remove_completed_tasks() - if not self.tasks: - break - # Prevent busylooping - await asyncio.sleep(1) + try: + while True: + # Do we need to do cleaning? + if (time.monotonic() - self.last_clean) >= self.schedule_interval: + print(f"{self.handled} tasks processed so far") + print("Running cleaning and scheduling") + self.remove_completed_tasks() + for model in self.models: + asyncio.create_task(model.atransition_clean_locks()) + asyncio.create_task(model.atransition_schedule_due()) + self.last_clean = time.monotonic() + # Calculate space left for tasks + space_remaining = self.concurrency - len(self.tasks) + # Fetch new tasks + for model in self.models: + if space_remaining > 0: + for instance in await model.atransition_get_with_lock( + number=min(space_remaining, self.concurrency_per_model), + lock_expiry=( + timezone.now() + + datetime.timedelta(seconds=self.lock_expiry) + ), + ): + self.tasks.append( + asyncio.create_task(self.run_transition(instance)) + ) + self.handled += 1 + space_remaining -= 1 + # Prevent busylooping + await asyncio.sleep(0.1) + except KeyboardInterrupt: + # Wait for tasks to finish + print("Waiting for tasks to complete") + while True: + self.remove_completed_tasks() + if not self.tasks: + break + # Prevent busylooping + await asyncio.sleep(1) print("Complete") return self.handled -- cgit v1.2.3