From 61c324508e62bb640b4526183d0837fc57d742c2 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Tue, 8 Nov 2022 23:06:29 -0700 Subject: Midway point in task refactor - changing direction --- miniq/models.py | 71 --------------------------------------------------------- 1 file changed, 71 deletions(-) delete mode 100644 miniq/models.py (limited to 'miniq/models.py') diff --git a/miniq/models.py b/miniq/models.py deleted file mode 100644 index 24d311c..0000000 --- a/miniq/models.py +++ /dev/null @@ -1,71 +0,0 @@ -from typing import Optional - -from django.db import models, transaction -from django.utils import timezone - - -class Task(models.Model): - """ - A task that must be done by a queue processor - """ - - class TypeChoices(models.TextChoices): - identity_fetch = "identity_fetch" - inbox_item = "inbox_item" - follow_request = "follow_request" - follow_acknowledge = "follow_acknowledge" - - type = models.CharField(max_length=500, choices=TypeChoices.choices) - priority = models.IntegerField(default=0) - subject = models.TextField() - payload = models.JSONField(blank=True, null=True) - error = models.TextField(blank=True, null=True) - - created = models.DateTimeField(auto_now_add=True) - completed = models.DateTimeField(blank=True, null=True) - failed = models.DateTimeField(blank=True, null=True) - locked = models.DateTimeField(blank=True, null=True) - locked_by = models.CharField(max_length=500, blank=True, null=True) - - def __str__(self): - return f"{self.id}/{self.type}({self.subject})" - - @classmethod - def get_one_available(cls, processor_id) -> Optional["Task"]: - """ - Gets one task off the list while reserving it, atomically. - """ - with transaction.atomic(): - next_task = cls.objects.filter(locked__isnull=True).first() - if next_task is None: - return None - next_task.locked = timezone.now() - next_task.locked_by = processor_id - next_task.save() - return next_task - - @classmethod - def submit(cls, type, subject: str, payload=None, deduplicate=True): - # Deduplication is done against tasks that have not started yet only, - # and only on tasks without payloads - if deduplicate and not payload: - if cls.objects.filter( - type=type, - subject=subject, - completed__isnull=True, - failed__isnull=True, - locked__isnull=True, - ).exists(): - return - cls.objects.create(type=type, subject=subject, payload=payload) - - async def complete(self): - await self.__class__.objects.filter(id=self.id).aupdate( - completed=timezone.now() - ) - - async def fail(self, error): - await self.__class__.objects.filter(id=self.id).aupdate( - failed=timezone.now(), - error=error, - ) -- cgit v1.2.3