From 1130c23b1e6bb1e4db71c41f5e4add903267830e Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 15 Dec 2022 12:26:17 -0700 Subject: Stator stats overhaul Removes the error table, adds a stats table and admin page. Fixes #166 --- stator/models.py | 157 +++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 124 insertions(+), 33 deletions(-) (limited to 'stator/models.py') diff --git a/stator/models.py b/stator/models.py index 261584c..c6e777a 100644 --- a/stator/models.py +++ b/stator/models.py @@ -1,5 +1,4 @@ import datetime -import pprint import traceback from typing import ClassVar, cast @@ -127,6 +126,19 @@ class StatorModel(models.Model): ) -> list["StatorModel"]: return await sync_to_async(cls.transition_get_with_lock)(number, lock_expiry) + @classmethod + async def atransition_ready_count(cls) -> int: + """ + Returns how many instances are "queued" + """ + return await ( + cls.objects.filter( + state_locked_until__isnull=True, + state_ready=True, + state__in=cls.state_graph.automatic_states, + ).acount() + ) + @classmethod async def atransition_clean_locks(cls): await cls.objects.filter(state_locked_until__lte=timezone.now()).aupdate( @@ -158,7 +170,6 @@ class StatorModel(models.Model): try: next_state = await current_state.handler(self) except BaseException as e: - await StatorError.acreate_from_instance(self, e) await exceptions.acapture_exception(e) traceback.print_exc() else: @@ -209,46 +220,126 @@ class StatorModel(models.Model): atransition_perform = sync_to_async(transition_perform) -class StatorError(models.Model): +class Stats(models.Model): """ - Tracks any errors running the transitions. - Meant to be cleaned out regularly. Should probably be a log. + Tracks summary statistics of each model over time. """ # appname.modelname (lowercased) label for the model this represents - model_label = models.CharField(max_length=200) + model_label = models.CharField(max_length=200, primary_key=True) - # The primary key of that model (probably int or str) - instance_pk = models.CharField(max_length=200) + statistics = models.JSONField() - # The state we were on - state = models.CharField(max_length=200) + created = models.DateTimeField(auto_now_add=True) + updated = models.DateTimeField(auto_now=True) - # When it happened - date = models.DateTimeField(auto_now_add=True) - - # Error name - error = models.TextField() + class Meta: + verbose_name_plural = "Stats" - # Error details - error_details = models.TextField(blank=True, null=True) + @classmethod + def get_for_model(cls, model: type[StatorModel]) -> "Stats": + instance = cls.objects.filter(model_label=model._meta.label_lower).first() + if instance is None: + instance = cls(model_label=model._meta.label_lower) + if not instance.statistics: + instance.statistics = {} + # Ensure there are the right keys + for key in ["queued", "hourly", "daily", "monthly"]: + if key not in instance.statistics: + instance.statistics[key] = {} + return instance @classmethod - async def acreate_from_instance( - cls, - instance: StatorModel, - exception: BaseException | None = None, - ): - detail = traceback.format_exc() - if exception and len(exception.args) > 1: - detail += "\n\n" + "\n\n".join( - pprint.pformat(arg) for arg in exception.args - ) + async def aget_for_model(cls, model: type[StatorModel]) -> "Stats": + return await sync_to_async(cls.get_for_model)(model) + + def set_queued(self, number: int): + """ + Sets the current queued amount. + + The queue is an instantaneous value (a "gauge") rather than a + sum ("counter"). It's mostly used for reporting what things are right + now, but basic trend analysis is also used to see if we think the + queue is backing up. + """ + self.statistics["queued"][ + int(timezone.now().replace(second=0, microsecond=0).timestamp()) + ] = number + + def add_handled(self, number: int): + """ + Adds the "handled" number to the current stats. + """ + hour = timezone.now().replace(minute=0, second=0, microsecond=0) + day = hour.replace(hour=0) + hour_timestamp = str(int(hour.timestamp())) + day_timestamp = str(int(day.timestamp())) + month_timestamp = str(int(day.replace(day=1).timestamp())) + self.statistics["hourly"][hour_timestamp] = ( + self.statistics["hourly"].get(hour_timestamp, 0) + number + ) + self.statistics["daily"][day_timestamp] = ( + self.statistics["daily"].get(day_timestamp, 0) + number + ) + self.statistics["monthly"][month_timestamp] = ( + self.statistics["monthly"].get(month_timestamp, 0) + number + ) - return await cls.objects.acreate( - model_label=instance._meta.label_lower, - instance_pk=str(instance.pk), - state=instance.state, - error=str(exception), - error_details=detail, + def trim_data(self): + """ + Removes excessively old data from the field + """ + queued_horizon = int((timezone.now() - datetime.timedelta(hours=2)).timestamp()) + hourly_horizon = int( + (timezone.now() - datetime.timedelta(hours=50)).timestamp() + ) + daily_horizon = int((timezone.now() - datetime.timedelta(days=62)).timestamp()) + monthly_horizon = int( + (timezone.now() - datetime.timedelta(days=3653)).timestamp() + ) + self.statistics["queued"] = { + ts: v + for ts, v in self.statistics["queued"].items() + if int(ts) >= queued_horizon + } + self.statistics["hourly"] = { + ts: v + for ts, v in self.statistics["hourly"].items() + if int(ts) >= hourly_horizon + } + self.statistics["daily"] = { + ts: v + for ts, v in self.statistics["daily"].items() + if int(ts) >= daily_horizon + } + self.statistics["monthly"] = { + ts: v + for ts, v in self.statistics["monthly"].items() + if int(ts) >= monthly_horizon + } + + def most_recent_queued(self) -> int: + """ + Returns the most recent number of how many were queued + """ + queued = [(int(ts), v) for ts, v in self.statistics["queued"].items()] + queued.sort(reverse=True) + if queued: + return queued[0][1] + else: + return 0 + + def most_recent_handled(self) -> tuple[int, int, int]: + """ + Returns the current handling numbers for hour, day, month + """ + hour = timezone.now().replace(minute=0, second=0, microsecond=0) + day = hour.replace(hour=0) + hour_timestamp = str(int(hour.timestamp())) + day_timestamp = str(int(day.timestamp())) + month_timestamp = str(int(day.replace(day=1).timestamp())) + return ( + self.statistics["hourly"].get(hour_timestamp, 0), + self.statistics["daily"].get(day_timestamp, 0), + self.statistics["monthly"].get(month_timestamp, 0), ) -- cgit v1.2.3