summaryrefslogtreecommitdiffstats
path: root/stator/runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'stator/runner.py')
-rw-r--r--stator/runner.py33
1 files changed, 20 insertions, 13 deletions
diff --git a/stator/runner.py b/stator/runner.py
index 0b42b27..187aa47 100644
--- a/stator/runner.py
+++ b/stator/runner.py
@@ -16,16 +16,20 @@ class StatorRunner:
Designed to run in a one-shot mode, living inside a request.
"""
- START_TIMEOUT = 30
- TOTAL_TIMEOUT = 60
- LOCK_TIMEOUT = 120
-
- MAX_TASKS = 30
- MAX_TASKS_PER_MODEL = 5
-
- def __init__(self, models: List[Type[StatorModel]]):
+ def __init__(
+ self,
+ models: List[Type[StatorModel]],
+ concurrency: int = 30,
+ concurrency_per_model: int = 5,
+ run_period: int = 30,
+ wait_period: int = 30,
+ ):
self.models = models
self.runner_id = uuid.uuid4().hex
+ self.concurrency = concurrency
+ self.concurrency_per_model = concurrency_per_model
+ self.run_period = run_period
+ self.total_period = run_period + wait_period
async def run(self):
start_time = time.monotonic()
@@ -40,15 +44,18 @@ class StatorRunner:
await asyncio.gather(*initial_tasks)
# For the first time period, launch tasks
print("Running main task loop")
- while (time.monotonic() - start_time) < self.START_TIMEOUT:
+ while (time.monotonic() - start_time) < self.run_period:
self.remove_completed_tasks()
- space_remaining = self.MAX_TASKS - len(self.tasks)
+ space_remaining = self.concurrency - len(self.tasks)
# Fetch new tasks
for model in self.models:
if space_remaining > 0:
for instance in await model.atransition_get_with_lock(
- min(space_remaining, self.MAX_TASKS_PER_MODEL),
- timezone.now() + datetime.timedelta(seconds=self.LOCK_TIMEOUT),
+ number=min(space_remaining, self.concurrency_per_model),
+ lock_expiry=(
+ timezone.now()
+ + datetime.timedelta(seconds=(self.total_period * 2) + 60)
+ ),
):
self.tasks.append(
asyncio.create_task(self.run_transition(instance))
@@ -59,7 +66,7 @@ class StatorRunner:
await asyncio.sleep(0.1)
# Then wait for tasks to finish
print("Waiting for tasks to complete")
- while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
+ while (time.monotonic() - start_time) < self.total_period:
self.remove_completed_tasks()
if not self.tasks:
break