summaryrefslogtreecommitdiffstats
path: root/miniq/models.py
blob: 24d311c2ef21977924c6e7d8d739289d1fd8364a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from typing import Optional

from django.db import models, transaction
from django.utils import timezone


class Task(models.Model):
    """
    A task that must be done by a queue processor.

    The lifecycle is tracked with nullable timestamps: ``locked`` is set when
    a processor reserves the task, and ``completed`` or ``failed`` is set by
    ``complete()`` / ``fail()`` respectively.
    """

    class TypeChoices(models.TextChoices):
        identity_fetch = "identity_fetch"
        inbox_item = "inbox_item"
        follow_request = "follow_request"
        follow_acknowledge = "follow_acknowledge"

    type = models.CharField(max_length=500, choices=TypeChoices.choices)
    # NOTE(review): priority is stored but get_one_available() does not order
    # by it - confirm whether that is intended.
    priority = models.IntegerField(default=0)
    subject = models.TextField()
    payload = models.JSONField(blank=True, null=True)
    # Error text recorded by fail()
    error = models.TextField(blank=True, null=True)

    created = models.DateTimeField(auto_now_add=True)
    completed = models.DateTimeField(blank=True, null=True)
    failed = models.DateTimeField(blank=True, null=True)
    # When the task was reserved, and the id of the processor that reserved it
    locked = models.DateTimeField(blank=True, null=True)
    locked_by = models.CharField(max_length=500, blank=True, null=True)

    def __str__(self):
        return f"{self.id}/{self.type}({self.subject})"

    @classmethod
    def get_one_available(cls, processor_id) -> Optional["Task"]:
        """
        Gets one task off the list while reserving it, atomically.

        Returns the reserved task with ``locked`` and ``locked_by`` set, or
        None when there is no unlocked task or when another processor claimed
        the candidate first (a later call can then pick up another task).
        """
        with transaction.atomic():
            # Row-lock the candidate where the backend supports
            # SELECT ... FOR UPDATE, so concurrent processors cannot both
            # read the same row as unlocked.
            next_task = (
                cls.objects.select_for_update()
                .filter(locked__isnull=True)
                .first()
            )
            if next_task is None:
                return None
            locked_at = timezone.now()
            # Compare-and-set: the UPDATE only matches while the row is still
            # unlocked, so even on backends that ignore FOR UPDATE at most one
            # processor can claim a given task.
            claimed = cls.objects.filter(
                pk=next_task.pk,
                locked__isnull=True,
            ).update(locked=locked_at, locked_by=processor_id)
            if not claimed:
                return None
            # Mirror the claimed state onto the instance handed to the caller
            next_task.locked = locked_at
            next_task.locked_by = processor_id
            return next_task

    @classmethod
    def submit(cls, type, subject: str, payload=None, deduplicate=True):
        """
        Creates a new task, unless an equivalent unstarted one already exists.

        Nothing is created when ``deduplicate`` is true, ``payload`` is falsy,
        and a task with the same type and subject exists that is neither
        locked, completed nor failed. Always returns None.
        """
        # Deduplication is done against tasks that have not started yet only,
        # and only on tasks without payloads (any falsy payload, e.g. {},
        # counts as "without")
        if deduplicate and not payload:
            if cls.objects.filter(
                type=type,
                subject=subject,
                completed__isnull=True,
                failed__isnull=True,
                locked__isnull=True,
            ).exists():
                return
        cls.objects.create(type=type, subject=subject, payload=payload)

    async def complete(self):
        """
        Marks this task as completed now, in the database and on this instance.
        """
        completed_at = timezone.now()
        await self.__class__.objects.filter(id=self.id).aupdate(
            completed=completed_at
        )
        # Keep the in-memory instance in step with the row just written
        self.completed = completed_at

    async def fail(self, error):
        """
        Marks this task as failed now and records ``error``, in the database
        and on this instance.
        """
        failed_at = timezone.now()
        await self.__class__.objects.filter(id=self.id).aupdate(
            failed=failed_at,
            error=error,
        )
        # Keep the in-memory instance in step with the row just written
        self.failed = failed_at
        self.error = error