summaryrefslogtreecommitdiffstats
path: root/stator/runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'stator/runner.py')
-rw-r--r--stator/runner.py27
1 files changed, 22 insertions, 5 deletions
diff --git a/stator/runner.py b/stator/runner.py
index 7305a6e..ad3f660 100644
--- a/stator/runner.py
+++ b/stator/runner.py
@@ -4,12 +4,12 @@ import time
import traceback
import uuid
-from asgiref.sync import async_to_sync
+from asgiref.sync import async_to_sync, sync_to_async
from django.utils import timezone
from core import exceptions, sentry
from core.models import Config
-from stator.models import StatorModel
+from stator.models import StatorModel, Stats
class StatorRunner:
@@ -39,7 +39,7 @@ class StatorRunner:
async def run(self):
sentry.set_takahe_app("stator")
- self.handled = 0
+ self.handled = {}
self.started = time.monotonic()
self.last_clean = time.monotonic() - self.schedule_interval
self.tasks = []
@@ -52,7 +52,9 @@ class StatorRunner:
if (time.monotonic() - self.last_clean) >= self.schedule_interval:
# Refresh the config
Config.system = await Config.aload_system()
- print(f"{self.handled} tasks processed so far")
+ print("Tasks processed this loop:")
+ for label, number in self.handled.items():
+ print(f" {label}: {number}")
print("Running cleaning and scheduling")
await self.run_scheduling()
@@ -91,10 +93,23 @@ class StatorRunner:
"""
with sentry.start_transaction(op="task", name="stator.run_scheduling"):
for model in self.models:
+ asyncio.create_task(self.submit_stats(model))
asyncio.create_task(model.atransition_clean_locks())
asyncio.create_task(model.atransition_schedule_due())
self.last_clean = time.monotonic()
+ async def submit_stats(self, model):
+ """
+ Pop some statistics into the database
+ """
+ stats_instance = await Stats.aget_for_model(model)
+ if stats_instance.model_label in self.handled:
+ stats_instance.add_handled(self.handled[stats_instance.model_label])
+ del self.handled[stats_instance.model_label]
+ stats_instance.set_queued(await model.atransition_ready_count())
+ stats_instance.trim_data()
+ await sync_to_async(stats_instance.save)()
+
async def fetch_and_process_tasks(self):
# Calculate space left for tasks
space_remaining = self.concurrency - len(self.tasks)
@@ -110,7 +125,9 @@ class StatorRunner:
self.tasks.append(
asyncio.create_task(self.run_transition(instance))
)
- self.handled += 1
+ self.handled[model._meta.label_lower] = (
+ self.handled.get(model._meta.label_lower, 0) + 1
+ )
space_remaining -= 1
async def run_transition(self, instance: StatorModel):