summaryrefslogtreecommitdiffstats
path: root/stator
diff options
context:
space:
mode:
authorAndrew Godwin2022-11-13 18:42:47 -0700
committerAndrew Godwin2022-11-13 18:43:09 -0700
commitddb3436275d3f02183f515c38cd3193cd1dfe2f4 (patch)
tree8902d4f085ad6d8323f43af20ca497d291e4d28a /stator
parent68c156fd2758da5831bd83bfb1249dd014d78177 (diff)
downloadtakahe-ddb3436275d3f02183f515c38cd3193cd1dfe2f4.tar.gz
takahe-ddb3436275d3f02183f515c38cd3193cd1dfe2f4.tar.bz2
takahe-ddb3436275d3f02183f515c38cd3193cd1dfe2f4.zip
Boosting! Incoming, anyway.
Diffstat (limited to 'stator')
-rw-r--r--stator/management/commands/runstator.py11
-rw-r--r--stator/runner.py33
2 files changed, 29 insertions, 15 deletions
diff --git a/stator/management/commands/runstator.py b/stator/management/commands/runstator.py
index 1307fef..a77192e 100644
--- a/stator/management/commands/runstator.py
+++ b/stator/management/commands/runstator.py
@@ -12,9 +12,16 @@ class Command(BaseCommand):
help = "Runs a Stator runner for a short period"
def add_arguments(self, parser):
+ parser.add_argument(
+ "--concurrency",
+ "-c",
+ type=int,
+ default=30,
+ help="How many tasks to run at once",
+ )
parser.add_argument("model_labels", nargs="*", type=str)
- def handle(self, model_labels: List[str], *args, **options):
+ def handle(self, model_labels: List[str], concurrency: int, *args, **options):
# Resolve the models list into names
models = cast(
List[Type[StatorModel]],
@@ -24,5 +31,5 @@ class Command(BaseCommand):
models = StatorModel.subclasses
print("Running for models: " + " ".join(m._meta.label_lower for m in models))
# Run a runner
- runner = StatorRunner(models)
+ runner = StatorRunner(models, concurrency=concurrency)
async_to_sync(runner.run)()
diff --git a/stator/runner.py b/stator/runner.py
index 0b42b27..187aa47 100644
--- a/stator/runner.py
+++ b/stator/runner.py
@@ -16,16 +16,20 @@ class StatorRunner:
Designed to run in a one-shot mode, living inside a request.
"""
- START_TIMEOUT = 30
- TOTAL_TIMEOUT = 60
- LOCK_TIMEOUT = 120
-
- MAX_TASKS = 30
- MAX_TASKS_PER_MODEL = 5
-
- def __init__(self, models: List[Type[StatorModel]]):
+ def __init__(
+ self,
+ models: List[Type[StatorModel]],
+ concurrency: int = 30,
+ concurrency_per_model: int = 5,
+ run_period: int = 30,
+ wait_period: int = 30,
+ ):
self.models = models
self.runner_id = uuid.uuid4().hex
+ self.concurrency = concurrency
+ self.concurrency_per_model = concurrency_per_model
+ self.run_period = run_period
+ self.total_period = run_period + wait_period
async def run(self):
start_time = time.monotonic()
@@ -40,15 +44,18 @@ class StatorRunner:
await asyncio.gather(*initial_tasks)
# For the first time period, launch tasks
print("Running main task loop")
- while (time.monotonic() - start_time) < self.START_TIMEOUT:
+ while (time.monotonic() - start_time) < self.run_period:
self.remove_completed_tasks()
- space_remaining = self.MAX_TASKS - len(self.tasks)
+ space_remaining = self.concurrency - len(self.tasks)
# Fetch new tasks
for model in self.models:
if space_remaining > 0:
for instance in await model.atransition_get_with_lock(
- min(space_remaining, self.MAX_TASKS_PER_MODEL),
- timezone.now() + datetime.timedelta(seconds=self.LOCK_TIMEOUT),
+ number=min(space_remaining, self.concurrency_per_model),
+ lock_expiry=(
+ timezone.now()
+ + datetime.timedelta(seconds=(self.total_period * 2) + 60)
+ ),
):
self.tasks.append(
asyncio.create_task(self.run_transition(instance))
@@ -59,7 +66,7 @@ class StatorRunner:
await asyncio.sleep(0.1)
# Then wait for tasks to finish
print("Waiting for tasks to complete")
- while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
+ while (time.monotonic() - start_time) < self.total_period:
self.remove_completed_tasks()
if not self.tasks:
break