diff options
author | Andrew Godwin | 2022-11-13 18:42:47 -0700 |
---|---|---|
committer | Andrew Godwin | 2022-11-13 18:43:09 -0700 |
commit | ddb3436275d3f02183f515c38cd3193cd1dfe2f4 (patch) | |
tree | 8902d4f085ad6d8323f43af20ca497d291e4d28a /stator/runner.py | |
parent | 68c156fd2758da5831bd83bfb1249dd014d78177 (diff) | |
download | takahe-ddb3436275d3f02183f515c38cd3193cd1dfe2f4.tar.gz takahe-ddb3436275d3f02183f515c38cd3193cd1dfe2f4.tar.bz2 takahe-ddb3436275d3f02183f515c38cd3193cd1dfe2f4.zip |
Boosting! Incoming, anyway.
Diffstat (limited to 'stator/runner.py')
-rw-r--r-- | stator/runner.py | 33 |
1 files changed, 20 insertions, 13 deletions
diff --git a/stator/runner.py b/stator/runner.py index 0b42b27..187aa47 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -16,16 +16,20 @@ class StatorRunner: Designed to run in a one-shot mode, living inside a request. """ - START_TIMEOUT = 30 - TOTAL_TIMEOUT = 60 - LOCK_TIMEOUT = 120 - - MAX_TASKS = 30 - MAX_TASKS_PER_MODEL = 5 - - def __init__(self, models: List[Type[StatorModel]]): + def __init__( + self, + models: List[Type[StatorModel]], + concurrency: int = 30, + concurrency_per_model: int = 5, + run_period: int = 30, + wait_period: int = 30, + ): self.models = models self.runner_id = uuid.uuid4().hex + self.concurrency = concurrency + self.concurrency_per_model = concurrency_per_model + self.run_period = run_period + self.total_period = run_period + wait_period async def run(self): start_time = time.monotonic() @@ -40,15 +44,18 @@ class StatorRunner: await asyncio.gather(*initial_tasks) # For the first time period, launch tasks print("Running main task loop") - while (time.monotonic() - start_time) < self.START_TIMEOUT: + while (time.monotonic() - start_time) < self.run_period: self.remove_completed_tasks() - space_remaining = self.MAX_TASKS - len(self.tasks) + space_remaining = self.concurrency - len(self.tasks) # Fetch new tasks for model in self.models: if space_remaining > 0: for instance in await model.atransition_get_with_lock( - min(space_remaining, self.MAX_TASKS_PER_MODEL), - timezone.now() + datetime.timedelta(seconds=self.LOCK_TIMEOUT), + number=min(space_remaining, self.concurrency_per_model), + lock_expiry=( + timezone.now() + + datetime.timedelta(seconds=(self.total_period * 2) + 60) + ), ): self.tasks.append( asyncio.create_task(self.run_transition(instance)) @@ -59,7 +66,7 @@ class StatorRunner: await asyncio.sleep(0.1) # Then wait for tasks to finish print("Waiting for tasks to complete") - while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT: + while (time.monotonic() - start_time) < self.total_period: self.remove_completed_tasks() if not self.tasks: break |