summaryrefslogtreecommitdiffstats
path: root/stator/runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'stator/runner.py')
-rw-r--r--stator/runner.py90
1 files changed, 48 insertions, 42 deletions
diff --git a/stator/runner.py b/stator/runner.py
index bb1b009..d286bc1 100644
--- a/stator/runner.py
+++ b/stator/runner.py
@@ -3,7 +3,7 @@ import datetime
import time
import traceback
import uuid
-from typing import List, Type
+from typing import List, Optional, Type
from django.utils import timezone
@@ -13,7 +13,7 @@ from stator.models import StatorModel
class StatorRunner:
"""
Runs tasks on models that are looking for state changes.
- Designed to run in a one-shot mode, living inside a request.
+ Designed to run for a determinate amount of time, and then exit.
"""
def __init__(
@@ -21,57 +21,63 @@ class StatorRunner:
models: List[Type[StatorModel]],
concurrency: int = 50,
concurrency_per_model: int = 10,
- run_period: int = 60,
- wait_period: int = 30,
+ liveness_file: Optional[str] = None,
+ schedule_interval: int = 30,
+ lock_expiry: int = 300,
):
self.models = models
self.runner_id = uuid.uuid4().hex
self.concurrency = concurrency
self.concurrency_per_model = concurrency_per_model
- self.run_period = run_period
- self.total_period = run_period + wait_period
+ self.liveness_file = liveness_file
+ self.schedule_interval = schedule_interval
+ self.lock_expiry = lock_expiry
async def run(self):
- start_time = time.monotonic()
self.handled = 0
+ self.last_clean = time.monotonic() - self.schedule_interval
self.tasks = []
- # Clean up old locks
- print("Running initial cleaning and scheduling")
- initial_tasks = []
- for model in self.models:
- initial_tasks.append(model.atransition_clean_locks())
- initial_tasks.append(model.atransition_schedule_due())
- await asyncio.gather(*initial_tasks)
# For the first time period, launch tasks
print("Running main task loop")
- while (time.monotonic() - start_time) < self.run_period:
- self.remove_completed_tasks()
- space_remaining = self.concurrency - len(self.tasks)
- # Fetch new tasks
- for model in self.models:
- if space_remaining > 0:
- for instance in await model.atransition_get_with_lock(
- number=min(space_remaining, self.concurrency_per_model),
- lock_expiry=(
- timezone.now()
- + datetime.timedelta(seconds=(self.total_period * 2) + 60)
- ),
- ):
- self.tasks.append(
- asyncio.create_task(self.run_transition(instance))
- )
- self.handled += 1
- space_remaining -= 1
- # Prevent busylooping
- await asyncio.sleep(0.1)
- # Then wait for tasks to finish
- print("Waiting for tasks to complete")
- while (time.monotonic() - start_time) < self.total_period:
- self.remove_completed_tasks()
- if not self.tasks:
- break
- # Prevent busylooping
- await asyncio.sleep(1)
+ try:
+ while True:
+ # Do we need to do cleaning?
+ if (time.monotonic() - self.last_clean) >= self.schedule_interval:
+ print(f"{self.handled} tasks processed so far")
+ print("Running cleaning and scheduling")
+ self.remove_completed_tasks()
+ for model in self.models:
+ asyncio.create_task(model.atransition_clean_locks())
+ asyncio.create_task(model.atransition_schedule_due())
+ self.last_clean = time.monotonic()
+ # Calculate space left for tasks
+ space_remaining = self.concurrency - len(self.tasks)
+ # Fetch new tasks
+ for model in self.models:
+ if space_remaining > 0:
+ for instance in await model.atransition_get_with_lock(
+ number=min(space_remaining, self.concurrency_per_model),
+ lock_expiry=(
+ timezone.now()
+ + datetime.timedelta(seconds=self.lock_expiry)
+ ),
+ ):
+ self.tasks.append(
+ asyncio.create_task(self.run_transition(instance))
+ )
+ self.handled += 1
+ space_remaining -= 1
+ # Prevent busylooping
+ await asyncio.sleep(0.1)
+ except KeyboardInterrupt:
+ # Wait for tasks to finish
+ print("Waiting for tasks to complete")
+ while True:
+ self.remove_completed_tasks()
+ if not self.tasks:
+ break
+ # Prevent busylooping
+ await asyncio.sleep(1)
print("Complete")
return self.handled