diff options
author | Andrew Godwin | 2022-11-19 10:20:13 -0700 |
---|---|---|
committer | Andrew Godwin | 2022-11-19 10:20:13 -0700 |
commit | 2142677b015507bc1aeb6179c5dfc4dfa3aaf0ce (patch) | |
tree | daac448f073c16a3e48157f2897ee6eff2a4d4d7 /stator/runner.py | |
parent | 80193114909a3f6ca1eda9a47b6330ef249a8ee5 (diff) | |
download | takahe-2142677b015507bc1aeb6179c5dfc4dfa3aaf0ce.tar.gz takahe-2142677b015507bc1aeb6179c5dfc4dfa3aaf0ce.tar.bz2 takahe-2142677b015507bc1aeb6179c5dfc4dfa3aaf0ce.zip |
A few more tweaks for an initial deploy
Diffstat (limited to 'stator/runner.py')
-rw-r--r-- | stator/runner.py | 90 |
1 files changed, 48 insertions, 42 deletions
diff --git a/stator/runner.py b/stator/runner.py index bb1b009..d286bc1 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -3,7 +3,7 @@ import datetime import time import traceback import uuid -from typing import List, Type +from typing import List, Optional, Type from django.utils import timezone @@ -13,7 +13,7 @@ from stator.models import StatorModel class StatorRunner: """ Runs tasks on models that are looking for state changes. - Designed to run in a one-shot mode, living inside a request. + Designed to run for a determinate amount of time, and then exit. """ def __init__( @@ -21,57 +21,63 @@ class StatorRunner: models: List[Type[StatorModel]], concurrency: int = 50, concurrency_per_model: int = 10, - run_period: int = 60, - wait_period: int = 30, + liveness_file: Optional[str] = None, + schedule_interval: int = 30, + lock_expiry: int = 300, ): self.models = models self.runner_id = uuid.uuid4().hex self.concurrency = concurrency self.concurrency_per_model = concurrency_per_model - self.run_period = run_period - self.total_period = run_period + wait_period + self.liveness_file = liveness_file + self.schedule_interval = schedule_interval + self.lock_expiry = lock_expiry async def run(self): - start_time = time.monotonic() self.handled = 0 + self.last_clean = time.monotonic() - self.schedule_interval self.tasks = [] - # Clean up old locks - print("Running initial cleaning and scheduling") - initial_tasks = [] - for model in self.models: - initial_tasks.append(model.atransition_clean_locks()) - initial_tasks.append(model.atransition_schedule_due()) - await asyncio.gather(*initial_tasks) # For the first time period, launch tasks print("Running main task loop") - while (time.monotonic() - start_time) < self.run_period: - self.remove_completed_tasks() - space_remaining = self.concurrency - len(self.tasks) - # Fetch new tasks - for model in self.models: - if space_remaining > 0: - for instance in await model.atransition_get_with_lock( - number=min(space_remaining, self.concurrency_per_model), - lock_expiry=( - timezone.now() - + datetime.timedelta(seconds=(self.total_period * 2) + 60) - ), - ): - self.tasks.append( - asyncio.create_task(self.run_transition(instance)) - ) - self.handled += 1 - space_remaining -= 1 - # Prevent busylooping - await asyncio.sleep(0.1) - # Then wait for tasks to finish - print("Waiting for tasks to complete") - while (time.monotonic() - start_time) < self.total_period: - self.remove_completed_tasks() - if not self.tasks: - break - # Prevent busylooping - await asyncio.sleep(1) + try: + while True: + # Do we need to do cleaning? + if (time.monotonic() - self.last_clean) >= self.schedule_interval: + print(f"{self.handled} tasks processed so far") + print("Running cleaning and scheduling") + self.remove_completed_tasks() + for model in self.models: + asyncio.create_task(model.atransition_clean_locks()) + asyncio.create_task(model.atransition_schedule_due()) + self.last_clean = time.monotonic() + # Calculate space left for tasks + space_remaining = self.concurrency - len(self.tasks) + # Fetch new tasks + for model in self.models: + if space_remaining > 0: + for instance in await model.atransition_get_with_lock( + number=min(space_remaining, self.concurrency_per_model), + lock_expiry=( + timezone.now() + + datetime.timedelta(seconds=self.lock_expiry) + ), + ): + self.tasks.append( + asyncio.create_task(self.run_transition(instance)) + ) + self.handled += 1 + space_remaining -= 1 + # Prevent busylooping + await asyncio.sleep(0.1) + except KeyboardInterrupt: + # Wait for tasks to finish + print("Waiting for tasks to complete") + while True: + self.remove_completed_tasks() + if not self.tasks: + break + # Prevent busylooping + await asyncio.sleep(1) print("Complete") return self.handled |