From ddb3436275d3f02183f515c38cd3193cd1dfe2f4 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sun, 13 Nov 2022 18:42:47 -0700 Subject: Boosting! Incoming, anyway. --- stator/management/commands/runstator.py | 11 +++++++++-- stator/runner.py | 33 ++++++++++++++++++++------------- 2 files changed, 29 insertions(+), 15 deletions(-) (limited to 'stator') diff --git a/stator/management/commands/runstator.py b/stator/management/commands/runstator.py index 1307fef..a77192e 100644 --- a/stator/management/commands/runstator.py +++ b/stator/management/commands/runstator.py @@ -12,9 +12,16 @@ class Command(BaseCommand): help = "Runs a Stator runner for a short period" def add_arguments(self, parser): + parser.add_argument( + "--concurrency", + "-c", + type=int, + default=30, + help="How many tasks to run at once", + ) parser.add_argument("model_labels", nargs="*", type=str) - def handle(self, model_labels: List[str], *args, **options): + def handle(self, model_labels: List[str], concurrency: int, *args, **options): # Resolve the models list into names models = cast( List[Type[StatorModel]], @@ -24,5 +31,5 @@ class Command(BaseCommand): models = StatorModel.subclasses print("Running for models: " + " ".join(m._meta.label_lower for m in models)) # Run a runner - runner = StatorRunner(models) + runner = StatorRunner(models, concurrency=concurrency) async_to_sync(runner.run)() diff --git a/stator/runner.py b/stator/runner.py index 0b42b27..187aa47 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -16,16 +16,20 @@ class StatorRunner: Designed to run in a one-shot mode, living inside a request. """ - START_TIMEOUT = 30 - TOTAL_TIMEOUT = 60 - LOCK_TIMEOUT = 120 - - MAX_TASKS = 30 - MAX_TASKS_PER_MODEL = 5 - - def __init__(self, models: List[Type[StatorModel]]): + def __init__( + self, + models: List[Type[StatorModel]], + concurrency: int = 30, + concurrency_per_model: int = 5, + run_period: int = 30, + wait_period: int = 30, + ): self.models = models self.runner_id = uuid.uuid4().hex + self.concurrency = concurrency + self.concurrency_per_model = concurrency_per_model + self.run_period = run_period + self.total_period = run_period + wait_period async def run(self): start_time = time.monotonic() @@ -40,15 +44,18 @@ class StatorRunner: await asyncio.gather(*initial_tasks) # For the first time period, launch tasks print("Running main task loop") - while (time.monotonic() - start_time) < self.START_TIMEOUT: + while (time.monotonic() - start_time) < self.run_period: self.remove_completed_tasks() - space_remaining = self.MAX_TASKS - len(self.tasks) + space_remaining = self.concurrency - len(self.tasks) # Fetch new tasks for model in self.models: if space_remaining > 0: for instance in await model.atransition_get_with_lock( - min(space_remaining, self.MAX_TASKS_PER_MODEL), - timezone.now() + datetime.timedelta(seconds=self.LOCK_TIMEOUT), + number=min(space_remaining, self.concurrency_per_model), + lock_expiry=( + timezone.now() + + datetime.timedelta(seconds=(self.total_period * 2) + 60) + ), ): self.tasks.append( asyncio.create_task(self.run_transition(instance)) @@ -59,7 +66,7 @@ class StatorRunner: await asyncio.sleep(0.1) # Then wait for tasks to finish print("Waiting for tasks to complete") - while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT: + while (time.monotonic() - start_time) < self.total_period: self.remove_completed_tasks() if not self.tasks: break -- cgit v1.2.3