summaryrefslogtreecommitdiffstats
path: root/stator/models.py
diff options
context:
space:
mode:
Diffstat (limited to 'stator/models.py')
-rw-r--r--stator/models.py157
1 files changed, 124 insertions, 33 deletions
diff --git a/stator/models.py b/stator/models.py
index 261584c..c6e777a 100644
--- a/stator/models.py
+++ b/stator/models.py
@@ -1,5 +1,4 @@
import datetime
-import pprint
import traceback
from typing import ClassVar, cast
@@ -128,6 +127,19 @@ class StatorModel(models.Model):
return await sync_to_async(cls.transition_get_with_lock)(number, lock_expiry)
@classmethod
+ async def atransition_ready_count(cls) -> int:
+ """
+ Returns how many instances are "queued"
+ """
+ return await (
+ cls.objects.filter(
+ state_locked_until__isnull=True,
+ state_ready=True,
+ state__in=cls.state_graph.automatic_states,
+ ).acount()
+ )
+
+ @classmethod
async def atransition_clean_locks(cls):
await cls.objects.filter(state_locked_until__lte=timezone.now()).aupdate(
state_locked_until=None
@@ -158,7 +170,6 @@ class StatorModel(models.Model):
try:
next_state = await current_state.handler(self)
except BaseException as e:
- await StatorError.acreate_from_instance(self, e)
await exceptions.acapture_exception(e)
traceback.print_exc()
else:
@@ -209,46 +220,126 @@ class StatorModel(models.Model):
atransition_perform = sync_to_async(transition_perform)
-class StatorError(models.Model):
+class Stats(models.Model):
"""
- Tracks any errors running the transitions.
- Meant to be cleaned out regularly. Should probably be a log.
+ Tracks summary statistics of each model over time.
"""
# appname.modelname (lowercased) label for the model this represents
- model_label = models.CharField(max_length=200)
+ model_label = models.CharField(max_length=200, primary_key=True)
- # The primary key of that model (probably int or str)
- instance_pk = models.CharField(max_length=200)
+ statistics = models.JSONField()
- # The state we were on
- state = models.CharField(max_length=200)
+ created = models.DateTimeField(auto_now_add=True)
+ updated = models.DateTimeField(auto_now=True)
- # When it happened
- date = models.DateTimeField(auto_now_add=True)
-
- # Error name
- error = models.TextField()
+ class Meta:
+ verbose_name_plural = "Stats"
- # Error details
- error_details = models.TextField(blank=True, null=True)
+ @classmethod
+ def get_for_model(cls, model: type[StatorModel]) -> "Stats":
+ instance = cls.objects.filter(model_label=model._meta.label_lower).first()
+ if instance is None:
+ instance = cls(model_label=model._meta.label_lower)
+ if not instance.statistics:
+ instance.statistics = {}
+ # Ensure there are the right keys
+ for key in ["queued", "hourly", "daily", "monthly"]:
+ if key not in instance.statistics:
+ instance.statistics[key] = {}
+ return instance
@classmethod
- async def acreate_from_instance(
- cls,
- instance: StatorModel,
- exception: BaseException | None = None,
- ):
- detail = traceback.format_exc()
- if exception and len(exception.args) > 1:
- detail += "\n\n" + "\n\n".join(
- pprint.pformat(arg) for arg in exception.args
- )
+ async def aget_for_model(cls, model: type[StatorModel]) -> "Stats":
+ return await sync_to_async(cls.get_for_model)(model)
+
+ def set_queued(self, number: int):
+ """
+ Sets the current queued amount.
+
+ The queue is an instantaneous value (a "gauge") rather than a
+ sum ("counter"). It's mostly used for reporting what things are right
+ now, but basic trend analysis is also used to see if we think the
+ queue is backing up.
+ """
+ self.statistics["queued"][
+ int(timezone.now().replace(second=0, microsecond=0).timestamp())
+ ] = number
+
+ def add_handled(self, number: int):
+ """
+ Adds the "handled" number to the current stats.
+ """
+ hour = timezone.now().replace(minute=0, second=0, microsecond=0)
+ day = hour.replace(hour=0)
+ hour_timestamp = str(int(hour.timestamp()))
+ day_timestamp = str(int(day.timestamp()))
+ month_timestamp = str(int(day.replace(day=1).timestamp()))
+ self.statistics["hourly"][hour_timestamp] = (
+ self.statistics["hourly"].get(hour_timestamp, 0) + number
+ )
+ self.statistics["daily"][day_timestamp] = (
+ self.statistics["daily"].get(day_timestamp, 0) + number
+ )
+ self.statistics["monthly"][month_timestamp] = (
+ self.statistics["monthly"].get(month_timestamp, 0) + number
+ )
- return await cls.objects.acreate(
- model_label=instance._meta.label_lower,
- instance_pk=str(instance.pk),
- state=instance.state,
- error=str(exception),
- error_details=detail,
+ def trim_data(self):
+ """
+ Removes excessively old data from the field
+ """
+ queued_horizon = int((timezone.now() - datetime.timedelta(hours=2)).timestamp())
+ hourly_horizon = int(
+ (timezone.now() - datetime.timedelta(hours=50)).timestamp()
+ )
+ daily_horizon = int((timezone.now() - datetime.timedelta(days=62)).timestamp())
+ monthly_horizon = int(
+ (timezone.now() - datetime.timedelta(days=3653)).timestamp()
+ )
+ self.statistics["queued"] = {
+ ts: v
+ for ts, v in self.statistics["queued"].items()
+ if int(ts) >= queued_horizon
+ }
+ self.statistics["hourly"] = {
+ ts: v
+ for ts, v in self.statistics["hourly"].items()
+ if int(ts) >= hourly_horizon
+ }
+ self.statistics["daily"] = {
+ ts: v
+ for ts, v in self.statistics["daily"].items()
+ if int(ts) >= daily_horizon
+ }
+ self.statistics["monthly"] = {
+ ts: v
+ for ts, v in self.statistics["monthly"].items()
+ if int(ts) >= monthly_horizon
+ }
+
+ def most_recent_queued(self) -> int:
+ """
+ Returns the most recent number of how many were queued
+ """
+ queued = [(int(ts), v) for ts, v in self.statistics["queued"].items()]
+ queued.sort(reverse=True)
+ if queued:
+ return queued[0][1]
+ else:
+ return 0
+
+ def most_recent_handled(self) -> tuple[int, int, int]:
+ """
+ Returns the current handling numbers for hour, day, month
+ """
+ hour = timezone.now().replace(minute=0, second=0, microsecond=0)
+ day = hour.replace(hour=0)
+ hour_timestamp = str(int(hour.timestamp()))
+ day_timestamp = str(int(day.timestamp()))
+ month_timestamp = str(int(day.replace(day=1).timestamp()))
+ return (
+ self.statistics["hourly"].get(hour_timestamp, 0),
+ self.statistics["daily"].get(day_timestamp, 0),
+ self.statistics["monthly"].get(month_timestamp, 0),
)