diff options
author | Andrew Godwin | 2022-11-19 10:20:13 -0700 |
---|---|---|
committer | Andrew Godwin | 2022-11-19 10:20:13 -0700 |
commit | 2142677b015507bc1aeb6179c5dfc4dfa3aaf0ce (patch) | |
tree | daac448f073c16a3e48157f2897ee6eff2a4d4d7 /stator | |
parent | 80193114909a3f6ca1eda9a47b6330ef249a8ee5 (diff) | |
download | takahe-2142677b015507bc1aeb6179c5dfc4dfa3aaf0ce.tar.gz takahe-2142677b015507bc1aeb6179c5dfc4dfa3aaf0ce.tar.bz2 takahe-2142677b015507bc1aeb6179c5dfc4dfa3aaf0ce.zip |
A few more tweaks for an initial deploy
Diffstat (limited to 'stator')
-rw-r--r-- | stator/management/commands/runstator.py | 32 | ||||
-rw-r--r-- | stator/runner.py | 90 | ||||
-rw-r--r-- | stator/views.py | 23 |
3 files changed, 77 insertions, 68 deletions
diff --git a/stator/management/commands/runstator.py b/stator/management/commands/runstator.py index eaa2585..3030960 100644 --- a/stator/management/commands/runstator.py +++ b/stator/management/commands/runstator.py @@ -10,7 +10,7 @@ from stator.runner import StatorRunner class Command(BaseCommand): - help = "Runs a Stator runner for a short period" + help = "Runs a Stator runner" def add_arguments(self, parser): parser.add_argument( @@ -20,9 +20,30 @@ class Command(BaseCommand): default=30, help="How many tasks to run at once", ) + parser.add_argument( + "--liveness-file", + type=str, + default=None, + help="A file to touch at least every 30 seconds to say the runner is alive", + ) + parser.add_argument( + "--schedule-interval", + "-s", + type=int, + default=30, + help="How often to run cleaning and scheduling", + ) parser.add_argument("model_labels", nargs="*", type=str) - def handle(self, model_labels: List[str], concurrency: int, *args, **options): + def handle( + self, + model_labels: List[str], + concurrency: int, + liveness_file: str, + schedule_interval: int, + *args, + **options + ): # Cache system config Config.system = Config.load_system() # Resolve the models list into names @@ -34,5 +55,10 @@ class Command(BaseCommand): models = StatorModel.subclasses print("Running for models: " + " ".join(m._meta.label_lower for m in models)) # Run a runner - runner = StatorRunner(models, concurrency=concurrency) + runner = StatorRunner( + models, + concurrency=concurrency, + liveness_file=liveness_file, + schedule_interval=schedule_interval, + ) async_to_sync(runner.run)() diff --git a/stator/runner.py b/stator/runner.py index bb1b009..d286bc1 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -3,7 +3,7 @@ import datetime import time import traceback import uuid -from typing import List, Type +from typing import List, Optional, Type from django.utils import timezone @@ -13,7 +13,7 @@ from stator.models import StatorModel class StatorRunner: """ Runs tasks on models that are looking for state changes. - Designed to run in a one-shot mode, living inside a request. + Designed to run for a determinate amount of time, and then exit. """ def __init__( @@ -21,57 +21,63 @@ class StatorRunner: models: List[Type[StatorModel]], concurrency: int = 50, concurrency_per_model: int = 10, - run_period: int = 60, - wait_period: int = 30, + liveness_file: Optional[str] = None, + schedule_interval: int = 30, + lock_expiry: int = 300, ): self.models = models self.runner_id = uuid.uuid4().hex self.concurrency = concurrency self.concurrency_per_model = concurrency_per_model - self.run_period = run_period - self.total_period = run_period + wait_period + self.liveness_file = liveness_file + self.schedule_interval = schedule_interval + self.lock_expiry = lock_expiry async def run(self): - start_time = time.monotonic() self.handled = 0 + self.last_clean = time.monotonic() - self.schedule_interval self.tasks = [] - # Clean up old locks - print("Running initial cleaning and scheduling") - initial_tasks = [] - for model in self.models: - initial_tasks.append(model.atransition_clean_locks()) - initial_tasks.append(model.atransition_schedule_due()) - await asyncio.gather(*initial_tasks) # For the first time period, launch tasks print("Running main task loop") - while (time.monotonic() - start_time) < self.run_period: - self.remove_completed_tasks() - space_remaining = self.concurrency - len(self.tasks) - # Fetch new tasks - for model in self.models: - if space_remaining > 0: - for instance in await model.atransition_get_with_lock( - number=min(space_remaining, self.concurrency_per_model), - lock_expiry=( - timezone.now() - + datetime.timedelta(seconds=(self.total_period * 2) + 60) - ), - ): - self.tasks.append( - asyncio.create_task(self.run_transition(instance)) - ) - self.handled += 1 - space_remaining -= 1 - # Prevent busylooping - await asyncio.sleep(0.1) - # Then wait for tasks to finish - print("Waiting for tasks to complete") - while (time.monotonic() - start_time) < self.total_period: - self.remove_completed_tasks() - if not self.tasks: - break - # Prevent busylooping - await asyncio.sleep(1) + try: + while True: + # Do we need to do cleaning? + if (time.monotonic() - self.last_clean) >= self.schedule_interval: + print(f"{self.handled} tasks processed so far") + print("Running cleaning and scheduling") + self.remove_completed_tasks() + for model in self.models: + asyncio.create_task(model.atransition_clean_locks()) + asyncio.create_task(model.atransition_schedule_due()) + self.last_clean = time.monotonic() + # Calculate space left for tasks + space_remaining = self.concurrency - len(self.tasks) + # Fetch new tasks + for model in self.models: + if space_remaining > 0: + for instance in await model.atransition_get_with_lock( + number=min(space_remaining, self.concurrency_per_model), + lock_expiry=( + timezone.now() + + datetime.timedelta(seconds=self.lock_expiry) + ), + ): + self.tasks.append( + asyncio.create_task(self.run_transition(instance)) + ) + self.handled += 1 + space_remaining -= 1 + # Prevent busylooping + await asyncio.sleep(0.1) + except KeyboardInterrupt: + # Wait for tasks to finish + print("Waiting for tasks to complete") + while True: + self.remove_completed_tasks() + if not self.tasks: + break + # Prevent busylooping + await asyncio.sleep(1) print("Complete") return self.handled diff --git a/stator/views.py b/stator/views.py deleted file mode 100644 index 9d2e154..0000000 --- a/stator/views.py +++ /dev/null @@ -1,23 +0,0 @@ -from django.conf import settings -from django.http import HttpResponse, HttpResponseForbidden -from django.views import View - -from stator.models import StatorModel -from stator.runner import StatorRunner - - -class RequestRunner(View): - """ - Runs a Stator runner within a HTTP request. For when you're on something - serverless. - """ - - async def get(self, request): - # Check the token, if supplied - if settings.STATOR_TOKEN: - if request.GET.get("token") != settings.STATOR_TOKEN: - return HttpResponseForbidden() - # Run on all models - runner = StatorRunner(StatorModel.subclasses) - handled = await runner.run() - return HttpResponse(f"Handled {handled}") |