summaryrefslogtreecommitdiffstats
path: root/stator
diff options
context:
space:
mode:
Diffstat (limited to 'stator')
-rw-r--r--stator/management/commands/runstator.py32
-rw-r--r--stator/runner.py90
-rw-r--r--stator/views.py23
3 files changed, 77 insertions, 68 deletions
diff --git a/stator/management/commands/runstator.py b/stator/management/commands/runstator.py
index eaa2585..3030960 100644
--- a/stator/management/commands/runstator.py
+++ b/stator/management/commands/runstator.py
@@ -10,7 +10,7 @@ from stator.runner import StatorRunner
class Command(BaseCommand):
- help = "Runs a Stator runner for a short period"
+ help = "Runs a Stator runner"
def add_arguments(self, parser):
parser.add_argument(
@@ -20,9 +20,30 @@ class Command(BaseCommand):
default=30,
help="How many tasks to run at once",
)
+ parser.add_argument(
+ "--liveness-file",
+ type=str,
+ default=None,
+ help="A file to touch at least every 30 seconds to say the runner is alive",
+ )
+ parser.add_argument(
+ "--schedule-interval",
+ "-s",
+ type=int,
+ default=30,
+ help="How often to run cleaning and scheduling",
+ )
parser.add_argument("model_labels", nargs="*", type=str)
- def handle(self, model_labels: List[str], concurrency: int, *args, **options):
+ def handle(
+ self,
+ model_labels: List[str],
+ concurrency: int,
+ liveness_file: str,
+ schedule_interval: int,
+ *args,
+ **options
+ ):
# Cache system config
Config.system = Config.load_system()
# Resolve the models list into names
@@ -34,5 +55,10 @@ class Command(BaseCommand):
models = StatorModel.subclasses
print("Running for models: " + " ".join(m._meta.label_lower for m in models))
# Run a runner
- runner = StatorRunner(models, concurrency=concurrency)
+ runner = StatorRunner(
+ models,
+ concurrency=concurrency,
+ liveness_file=liveness_file,
+ schedule_interval=schedule_interval,
+ )
async_to_sync(runner.run)()
diff --git a/stator/runner.py b/stator/runner.py
index bb1b009..d286bc1 100644
--- a/stator/runner.py
+++ b/stator/runner.py
@@ -3,7 +3,7 @@ import datetime
import time
import traceback
import uuid
-from typing import List, Type
+from typing import List, Optional, Type
from django.utils import timezone
@@ -13,7 +13,7 @@ from stator.models import StatorModel
class StatorRunner:
"""
Runs tasks on models that are looking for state changes.
- Designed to run in a one-shot mode, living inside a request.
+ Designed to run for a determinate amount of time, and then exit.
"""
def __init__(
@@ -21,57 +21,63 @@ class StatorRunner:
models: List[Type[StatorModel]],
concurrency: int = 50,
concurrency_per_model: int = 10,
- run_period: int = 60,
- wait_period: int = 30,
+ liveness_file: Optional[str] = None,
+ schedule_interval: int = 30,
+ lock_expiry: int = 300,
):
self.models = models
self.runner_id = uuid.uuid4().hex
self.concurrency = concurrency
self.concurrency_per_model = concurrency_per_model
- self.run_period = run_period
- self.total_period = run_period + wait_period
+ self.liveness_file = liveness_file
+ self.schedule_interval = schedule_interval
+ self.lock_expiry = lock_expiry
async def run(self):
- start_time = time.monotonic()
self.handled = 0
+ self.last_clean = time.monotonic() - self.schedule_interval
self.tasks = []
- # Clean up old locks
- print("Running initial cleaning and scheduling")
- initial_tasks = []
- for model in self.models:
- initial_tasks.append(model.atransition_clean_locks())
- initial_tasks.append(model.atransition_schedule_due())
- await asyncio.gather(*initial_tasks)
# For the first time period, launch tasks
print("Running main task loop")
- while (time.monotonic() - start_time) < self.run_period:
- self.remove_completed_tasks()
- space_remaining = self.concurrency - len(self.tasks)
- # Fetch new tasks
- for model in self.models:
- if space_remaining > 0:
- for instance in await model.atransition_get_with_lock(
- number=min(space_remaining, self.concurrency_per_model),
- lock_expiry=(
- timezone.now()
- + datetime.timedelta(seconds=(self.total_period * 2) + 60)
- ),
- ):
- self.tasks.append(
- asyncio.create_task(self.run_transition(instance))
- )
- self.handled += 1
- space_remaining -= 1
- # Prevent busylooping
- await asyncio.sleep(0.1)
- # Then wait for tasks to finish
- print("Waiting for tasks to complete")
- while (time.monotonic() - start_time) < self.total_period:
- self.remove_completed_tasks()
- if not self.tasks:
- break
- # Prevent busylooping
- await asyncio.sleep(1)
+ try:
+ while True:
+ # Do we need to do cleaning?
+ if (time.monotonic() - self.last_clean) >= self.schedule_interval:
+ print(f"{self.handled} tasks processed so far")
+ print("Running cleaning and scheduling")
+ self.remove_completed_tasks()
+ for model in self.models:
+ asyncio.create_task(model.atransition_clean_locks())
+ asyncio.create_task(model.atransition_schedule_due())
+ self.last_clean = time.monotonic()
+ # Calculate space left for tasks
+ space_remaining = self.concurrency - len(self.tasks)
+ # Fetch new tasks
+ for model in self.models:
+ if space_remaining > 0:
+ for instance in await model.atransition_get_with_lock(
+ number=min(space_remaining, self.concurrency_per_model),
+ lock_expiry=(
+ timezone.now()
+ + datetime.timedelta(seconds=self.lock_expiry)
+ ),
+ ):
+ self.tasks.append(
+ asyncio.create_task(self.run_transition(instance))
+ )
+ self.handled += 1
+ space_remaining -= 1
+ # Prevent busylooping
+ await asyncio.sleep(0.1)
+ except KeyboardInterrupt:
+ # Wait for tasks to finish
+ print("Waiting for tasks to complete")
+ while True:
+ self.remove_completed_tasks()
+ if not self.tasks:
+ break
+ # Prevent busylooping
+ await asyncio.sleep(1)
print("Complete")
return self.handled
diff --git a/stator/views.py b/stator/views.py
deleted file mode 100644
index 9d2e154..0000000
--- a/stator/views.py
+++ /dev/null
@@ -1,23 +0,0 @@
-from django.conf import settings
-from django.http import HttpResponse, HttpResponseForbidden
-from django.views import View
-
-from stator.models import StatorModel
-from stator.runner import StatorRunner
-
-
-class RequestRunner(View):
- """
- Runs a Stator runner within a HTTP request. For when you're on something
- serverless.
- """
-
- async def get(self, request):
- # Check the token, if supplied
- if settings.STATOR_TOKEN:
- if request.GET.get("token") != settings.STATOR_TOKEN:
- return HttpResponseForbidden()
- # Run on all models
- runner = StatorRunner(StatorModel.subclasses)
- handled = await runner.run()
- return HttpResponse(f"Handled {handled}")