From 1130c23b1e6bb1e4db71c41f5e4add903267830e Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 15 Dec 2022 12:26:17 -0700 Subject: Stator stats overhaul Removes the error table, adds a stats table and admin page. Fixes #166 --- stator/runner.py | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) (limited to 'stator/runner.py') diff --git a/stator/runner.py b/stator/runner.py index 7305a6e..ad3f660 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -4,12 +4,12 @@ import time import traceback import uuid -from asgiref.sync import async_to_sync +from asgiref.sync import async_to_sync, sync_to_async from django.utils import timezone from core import exceptions, sentry from core.models import Config -from stator.models import StatorModel +from stator.models import StatorModel, Stats class StatorRunner: @@ -39,7 +39,7 @@ class StatorRunner: async def run(self): sentry.set_takahe_app("stator") - self.handled = 0 + self.handled = {} self.started = time.monotonic() self.last_clean = time.monotonic() - self.schedule_interval self.tasks = [] @@ -52,7 +52,9 @@ class StatorRunner: if (time.monotonic() - self.last_clean) >= self.schedule_interval: # Refresh the config Config.system = await Config.aload_system() - print(f"{self.handled} tasks processed so far") + print("Tasks processed this loop:") + for label, number in self.handled.items(): + print(f" {label}: {number}") print("Running cleaning and scheduling") await self.run_scheduling() @@ -91,10 +93,23 @@ class StatorRunner: """ with sentry.start_transaction(op="task", name="stator.run_scheduling"): for model in self.models: + asyncio.create_task(self.submit_stats(model)) asyncio.create_task(model.atransition_clean_locks()) asyncio.create_task(model.atransition_schedule_due()) self.last_clean = time.monotonic() + async def submit_stats(self, model): + """ + Pop some statistics into the database + """ + stats_instance = await Stats.aget_for_model(model) + if stats_instance.model_label in self.handled: + stats_instance.add_handled(self.handled[stats_instance.model_label]) + del self.handled[stats_instance.model_label] + stats_instance.set_queued(await model.atransition_ready_count()) + stats_instance.trim_data() + await sync_to_async(stats_instance.save)() + async def fetch_and_process_tasks(self): # Calculate space left for tasks space_remaining = self.concurrency - len(self.tasks) @@ -110,7 +125,9 @@ class StatorRunner: self.tasks.append( asyncio.create_task(self.run_transition(instance)) ) - self.handled += 1 + self.handled[model._meta.label_lower] = ( + self.handled.get(model._meta.label_lower, 0) + 1 + ) space_remaining -= 1 async def run_transition(self, instance: StatorModel): -- cgit v1.2.3