summary | refs | log | tree | commit | diff | stats
path: root/stator
diff options
context:
space:
mode:
Diffstat (limited to 'stator')
-rw-r--r-- stator/admin.py 13
-rw-r--r-- stator/migrations/0002_stats_delete_statorerror.py 31
-rw-r--r-- stator/models.py 157
-rw-r--r-- stator/runner.py 27
4 files changed, 181 insertions, 47 deletions
diff --git a/stator/admin.py b/stator/admin.py
index 2d001ea..d22fd1b 100644
--- a/stator/admin.py
+++ b/stator/admin.py
@@ -1,20 +1,15 @@
from django.contrib import admin
-from stator.models import StatorError
+from stator.models import Stats
-@admin.register(StatorError)
+@admin.register(Stats)
class DomainAdmin(admin.ModelAdmin):
list_display = [
- "id",
- "date",
"model_label",
- "instance_pk",
- "state",
- "error",
+ "updated",
]
- list_filter = ["model_label", "date"]
- ordering = ["-date"]
+ ordering = ["model_label"]
def has_add_permission(self, request, obj=None):
return False
diff --git a/stator/migrations/0002_stats_delete_statorerror.py b/stator/migrations/0002_stats_delete_statorerror.py
new file mode 100644
index 0000000..5d22003
--- /dev/null
+++ b/stator/migrations/0002_stats_delete_statorerror.py
@@ -0,0 +1,31 @@
+# Generated by Django 4.1.4 on 2022-12-15 18:38
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("stator", "0001_initial"),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name="Stats",
+ fields=[
+ (
+ "model_label",
+ models.CharField(max_length=200, primary_key=True, serialize=False),
+ ),
+ ("statistics", models.JSONField()),
+ ("created", models.DateTimeField(auto_now_add=True)),
+ ("updated", models.DateTimeField(auto_now=True)),
+ ],
+ options={
+ "verbose_name_plural": "Stats",
+ },
+ ),
+ migrations.DeleteModel(
+ name="StatorError",
+ ),
+ ]
diff --git a/stator/models.py b/stator/models.py
index 261584c..c6e777a 100644
--- a/stator/models.py
+++ b/stator/models.py
@@ -1,5 +1,4 @@
import datetime
-import pprint
import traceback
from typing import ClassVar, cast
@@ -128,6 +127,19 @@ class StatorModel(models.Model):
return await sync_to_async(cls.transition_get_with_lock)(number, lock_expiry)
@classmethod
+ async def atransition_ready_count(cls) -> int:
+ """
+ Returns how many instances are "queued"
+ """
+ return await (
+ cls.objects.filter(
+ state_locked_until__isnull=True,
+ state_ready=True,
+ state__in=cls.state_graph.automatic_states,
+ ).acount()
+ )
+
+ @classmethod
async def atransition_clean_locks(cls):
await cls.objects.filter(state_locked_until__lte=timezone.now()).aupdate(
state_locked_until=None
@@ -158,7 +170,6 @@ class StatorModel(models.Model):
try:
next_state = await current_state.handler(self)
except BaseException as e:
- await StatorError.acreate_from_instance(self, e)
await exceptions.acapture_exception(e)
traceback.print_exc()
else:
@@ -209,46 +220,126 @@ class StatorModel(models.Model):
atransition_perform = sync_to_async(transition_perform)
-class StatorError(models.Model):
+class Stats(models.Model):
"""
- Tracks any errors running the transitions.
- Meant to be cleaned out regularly. Should probably be a log.
+ Tracks summary statistics of each model over time.
"""
# appname.modelname (lowercased) label for the model this represents
- model_label = models.CharField(max_length=200)
+ model_label = models.CharField(max_length=200, primary_key=True)
- # The primary key of that model (probably int or str)
- instance_pk = models.CharField(max_length=200)
+ statistics = models.JSONField()
- # The state we were on
- state = models.CharField(max_length=200)
+ created = models.DateTimeField(auto_now_add=True)
+ updated = models.DateTimeField(auto_now=True)
- # When it happened
- date = models.DateTimeField(auto_now_add=True)
-
- # Error name
- error = models.TextField()
+ class Meta:
+ verbose_name_plural = "Stats"
- # Error details
- error_details = models.TextField(blank=True, null=True)
+ @classmethod
+ def get_for_model(cls, model: type[StatorModel]) -> "Stats":
+ instance = cls.objects.filter(model_label=model._meta.label_lower).first()
+ if instance is None:
+ instance = cls(model_label=model._meta.label_lower)
+ if not instance.statistics:
+ instance.statistics = {}
+ # Ensure there are the right keys
+ for key in ["queued", "hourly", "daily", "monthly"]:
+ if key not in instance.statistics:
+ instance.statistics[key] = {}
+ return instance
@classmethod
- async def acreate_from_instance(
- cls,
- instance: StatorModel,
- exception: BaseException | None = None,
- ):
- detail = traceback.format_exc()
- if exception and len(exception.args) > 1:
- detail += "\n\n" + "\n\n".join(
- pprint.pformat(arg) for arg in exception.args
- )
+ async def aget_for_model(cls, model: type[StatorModel]) -> "Stats":
+ return await sync_to_async(cls.get_for_model)(model)
+
+ def set_queued(self, number: int):
+ """
+ Sets the current queued amount.
+
+ The queue is an instantaneous value (a "gauge") rather than a
+ sum ("counter"). It's mostly used for reporting what things are right
+ now, but basic trend analysis is also used to see if we think the
+ queue is backing up.
+ """
+ self.statistics["queued"][
+ int(timezone.now().replace(second=0, microsecond=0).timestamp())
+ ] = number
+
+ def add_handled(self, number: int):
+ """
+ Adds the "handled" number to the current stats.
+ """
+ hour = timezone.now().replace(minute=0, second=0, microsecond=0)
+ day = hour.replace(hour=0)
+ hour_timestamp = str(int(hour.timestamp()))
+ day_timestamp = str(int(day.timestamp()))
+ month_timestamp = str(int(day.replace(day=1).timestamp()))
+ self.statistics["hourly"][hour_timestamp] = (
+ self.statistics["hourly"].get(hour_timestamp, 0) + number
+ )
+ self.statistics["daily"][day_timestamp] = (
+ self.statistics["daily"].get(day_timestamp, 0) + number
+ )
+ self.statistics["monthly"][month_timestamp] = (
+ self.statistics["monthly"].get(month_timestamp, 0) + number
+ )
- return await cls.objects.acreate(
- model_label=instance._meta.label_lower,
- instance_pk=str(instance.pk),
- state=instance.state,
- error=str(exception),
- error_details=detail,
+ def trim_data(self):
+ """
+ Removes excessively old data from the field
+ """
+ queued_horizon = int((timezone.now() - datetime.timedelta(hours=2)).timestamp())
+ hourly_horizon = int(
+ (timezone.now() - datetime.timedelta(hours=50)).timestamp()
+ )
+ daily_horizon = int((timezone.now() - datetime.timedelta(days=62)).timestamp())
+ monthly_horizon = int(
+ (timezone.now() - datetime.timedelta(days=3653)).timestamp()
+ )
+ self.statistics["queued"] = {
+ ts: v
+ for ts, v in self.statistics["queued"].items()
+ if int(ts) >= queued_horizon
+ }
+ self.statistics["hourly"] = {
+ ts: v
+ for ts, v in self.statistics["hourly"].items()
+ if int(ts) >= hourly_horizon
+ }
+ self.statistics["daily"] = {
+ ts: v
+ for ts, v in self.statistics["daily"].items()
+ if int(ts) >= daily_horizon
+ }
+ self.statistics["monthly"] = {
+ ts: v
+ for ts, v in self.statistics["monthly"].items()
+ if int(ts) >= monthly_horizon
+ }
+
+ def most_recent_queued(self) -> int:
+ """
+ Returns the most recent number of how many were queued
+ """
+ queued = [(int(ts), v) for ts, v in self.statistics["queued"].items()]
+ queued.sort(reverse=True)
+ if queued:
+ return queued[0][1]
+ else:
+ return 0
+
+ def most_recent_handled(self) -> tuple[int, int, int]:
+ """
+ Returns the current handling numbers for hour, day, month
+ """
+ hour = timezone.now().replace(minute=0, second=0, microsecond=0)
+ day = hour.replace(hour=0)
+ hour_timestamp = str(int(hour.timestamp()))
+ day_timestamp = str(int(day.timestamp()))
+ month_timestamp = str(int(day.replace(day=1).timestamp()))
+ return (
+ self.statistics["hourly"].get(hour_timestamp, 0),
+ self.statistics["daily"].get(day_timestamp, 0),
+ self.statistics["monthly"].get(month_timestamp, 0),
)
diff --git a/stator/runner.py b/stator/runner.py
index 7305a6e..ad3f660 100644
--- a/stator/runner.py
+++ b/stator/runner.py
@@ -4,12 +4,12 @@ import time
import traceback
import uuid
-from asgiref.sync import async_to_sync
+from asgiref.sync import async_to_sync, sync_to_async
from django.utils import timezone
from core import exceptions, sentry
from core.models import Config
-from stator.models import StatorModel
+from stator.models import StatorModel, Stats
class StatorRunner:
@@ -39,7 +39,7 @@ class StatorRunner:
async def run(self):
sentry.set_takahe_app("stator")
- self.handled = 0
+ self.handled = {}
self.started = time.monotonic()
self.last_clean = time.monotonic() - self.schedule_interval
self.tasks = []
@@ -52,7 +52,9 @@ class StatorRunner:
if (time.monotonic() - self.last_clean) >= self.schedule_interval:
# Refresh the config
Config.system = await Config.aload_system()
- print(f"{self.handled} tasks processed so far")
+ print("Tasks processed this loop:")
+ for label, number in self.handled.items():
+ print(f" {label}: {number}")
print("Running cleaning and scheduling")
await self.run_scheduling()
@@ -91,10 +93,23 @@ class StatorRunner:
"""
with sentry.start_transaction(op="task", name="stator.run_scheduling"):
for model in self.models:
+ asyncio.create_task(self.submit_stats(model))
asyncio.create_task(model.atransition_clean_locks())
asyncio.create_task(model.atransition_schedule_due())
self.last_clean = time.monotonic()
+ async def submit_stats(self, model):
+ """
+ Pop some statistics into the database
+ """
+ stats_instance = await Stats.aget_for_model(model)
+ if stats_instance.model_label in self.handled:
+ stats_instance.add_handled(self.handled[stats_instance.model_label])
+ del self.handled[stats_instance.model_label]
+ stats_instance.set_queued(await model.atransition_ready_count())
+ stats_instance.trim_data()
+ await sync_to_async(stats_instance.save)()
+
async def fetch_and_process_tasks(self):
# Calculate space left for tasks
space_remaining = self.concurrency - len(self.tasks)
@@ -110,7 +125,9 @@ class StatorRunner:
self.tasks.append(
asyncio.create_task(self.run_transition(instance))
)
- self.handled += 1
+ self.handled[model._meta.label_lower] = (
+ self.handled.get(model._meta.label_lower, 0) + 1
+ )
space_remaining -= 1
async def run_transition(self, instance: StatorModel):