author     Andrew Godwin    2022-11-05 22:49:25 -0600
committer  Andrew Godwin    2022-11-05 22:49:25 -0600
commit     a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7 (patch)
tree       9aa1ae2ffad20a7afe8eac8cbe99ab9a53dcc0ca /miniq
parent     56de2362a01089c8a5ca3c6e1affcade00ffdfce (diff)
Queuing system and lazy profile fetch
Diffstat (limited to 'miniq')
-rw-r--r--  miniq/__init__.py                  0
-rw-r--r--  miniq/admin.py                    21
-rw-r--r--  miniq/apps.py                      6
-rw-r--r--  miniq/migrations/0001_initial.py  37
-rw-r--r--  miniq/migrations/__init__.py       0
-rw-r--r--  miniq/models.py                   68
-rw-r--r--  miniq/views.py                    68
7 files changed, 200 insertions, 0 deletions
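
This commit adds miniq, a small database-backed task queue: work is enqueued as Task rows, and a queue processor view claims and runs them. As a rough usage sketch (the handle below is illustrative, not taken from this commit), queuing the lazy profile fetch from the commit message looks like:

    from miniq.models import Task

    # Enqueue an identity fetch; Task.submit() silently drops duplicates
    # of payload-less tasks that have not started yet.
    Task.submit("identity_fetch", subject="user@example.com")
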
diff --git a/miniq/__init__.py b/miniq/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/miniq/__init__.py
diff --git a/miniq/admin.py b/miniq/admin.py
new file mode 100644
index 0000000..1166f89
--- /dev/null
+++ b/miniq/admin.py
@@ -0,0 +1,21 @@
+from django.contrib import admin
+
+from miniq.models import Task
+
+
+@admin.register(Task)
+class TaskAdmin(admin.ModelAdmin):
+
+ list_display = ["id", "created", "type", "subject", "completed", "failed"]
+ ordering = ["-created"]
+ actions = ["reset"]
+
+ @admin.action(description="Reset Task")
+ def reset(self, request, queryset):
+ queryset.update(
+ failed=None,
+ completed=None,
+ locked=None,
+ locked_by=None,
+ error=None,
+ )
diff --git a/miniq/apps.py b/miniq/apps.py
new file mode 100644
index 0000000..4c7e773
--- /dev/null
+++ b/miniq/apps.py
@@ -0,0 +1,6 @@
+from django.apps import AppConfig
+
+
+class MiniqConfig(AppConfig):
+ default_auto_field = "django.db.models.BigAutoField"
+ name = "miniq"
diff --git a/miniq/migrations/0001_initial.py b/miniq/migrations/0001_initial.py
new file mode 100644
index 0000000..6775ff3
--- /dev/null
+++ b/miniq/migrations/0001_initial.py
@@ -0,0 +1,37 @@
+# Generated by Django 4.1.3 on 2022-11-06 03:59
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ initial = True
+
+ dependencies = []
+
+ operations = [
+ migrations.CreateModel(
+ name="Task",
+ fields=[
+ (
+ "id",
+ models.BigAutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ ("type", models.CharField(max_length=500)),
+ ("priority", models.IntegerField(default=0)),
+ ("subject", models.TextField()),
+ ("payload", models.JSONField(blank=True, null=True)),
+ ("error", models.TextField(blank=True, null=True)),
+ ("created", models.DateTimeField(auto_now_add=True)),
+ ("completed", models.DateTimeField(blank=True, null=True)),
+ ("failed", models.DateTimeField(blank=True, null=True)),
+ ("locked", models.DateTimeField(blank=True, null=True)),
+ ("locked_by", models.CharField(blank=True, max_length=500, null=True)),
+ ],
+ ),
+ ]
diff --git a/miniq/migrations/__init__.py b/miniq/migrations/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/miniq/migrations/__init__.py
diff --git a/miniq/models.py b/miniq/models.py
new file mode 100644
index 0000000..996b482
--- /dev/null
+++ b/miniq/models.py
@@ -0,0 +1,68 @@
+from typing import Optional
+
+from django.db import models, transaction
+from django.utils import timezone
+
+
+class Task(models.Model):
+ """
+ A task that must be done by a queue processor
+ """
+
+ class TypeChoices(models.TextChoices):
+ identity_fetch = "identity_fetch"
+
+ type = models.CharField(max_length=500, choices=TypeChoices.choices)
+ priority = models.IntegerField(default=0)
+ subject = models.TextField()
+ payload = models.JSONField(blank=True, null=True)
+ error = models.TextField(blank=True, null=True)
+
+ created = models.DateTimeField(auto_now_add=True)
+ completed = models.DateTimeField(blank=True, null=True)
+ failed = models.DateTimeField(blank=True, null=True)
+ locked = models.DateTimeField(blank=True, null=True)
+ locked_by = models.CharField(max_length=500, blank=True, null=True)
+
+ def __str__(self):
+ return f"{self.id}/{self.type}({self.subject})"
+
+ @classmethod
+ def get_one_available(cls, processor_id) -> Optional["Task"]:
+ """
+ Gets one task off the list while reserving it, atomically.
+ """
+ with transaction.atomic():
+ next_task = cls.objects.filter(locked__isnull=True).first()
+ if next_task is None:
+ return None
+ next_task.locked = timezone.now()
+ next_task.locked_by = processor_id
+ next_task.save()
+ return next_task
+
+ @classmethod
+ def submit(cls, type, subject, payload=None, deduplicate=True):
+        # Deduplication only applies to tasks that have not yet started,
+        # and only to tasks without payloads
+ if deduplicate and not payload:
+ if cls.objects.filter(
+ type=type,
+ subject=subject,
+ completed__isnull=True,
+ failed__isnull=True,
+ locked__isnull=True,
+ ).exists():
+ return
+ cls.objects.create(type=type, subject=subject, payload=payload)
+
+ async def complete(self):
+ await self.__class__.objects.filter(id=self.id).aupdate(
+ completed=timezone.now()
+ )
+
+ async def fail(self, error):
+ await self.__class__.objects.filter(id=self.id).aupdate(
+ failed=timezone.now(),
+ error=error,
+ )
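
One thing worth noting about get_one_available(): the claim runs inside transaction.atomic() but takes no row lock, so two processors polling at the same instant could in principle grab the same task. A possible hardening, not part of this commit and only on databases that support it (e.g. PostgreSQL), would be to lock the candidate row and skip rows already held by another transaction:

    # Hypothetical variant of the claim above, not in this commit.
    with transaction.atomic():
        next_task = (
            cls.objects.select_for_update(skip_locked=True)
            .filter(locked__isnull=True)
            .first()
        )
        if next_task is None:
            return None
        next_task.locked = timezone.now()
        next_task.locked_by = processor_id
        next_task.save()
        return next_task
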
diff --git a/miniq/views.py b/miniq/views.py
new file mode 100644
index 0000000..12da7cd
--- /dev/null
+++ b/miniq/views.py
@@ -0,0 +1,68 @@
+import asyncio
+import time
+import traceback
+import uuid
+
+from asgiref.sync import sync_to_async
+from django.http import HttpResponse
+from django.views import View
+
+from miniq.models import Task
+from users.models import Identity
+
+
+class QueueProcessor(View):
+ """
+ A view that takes some items off the queue and processes them.
+ Tries to limit its own runtime so it's within HTTP timeout limits.
+ """
+
+ START_TIMEOUT = 30
+ TOTAL_TIMEOUT = 60
+ MAX_TASKS = 10
+
+ async def get(self, request):
+ start_time = time.monotonic()
+ processor_id = uuid.uuid4().hex
+ handled = 0
+ self.tasks = []
+ # For the first time period, launch tasks
+ while (time.monotonic() - start_time) < self.START_TIMEOUT:
+ # Remove completed tasks
+ self.tasks = [t for t in self.tasks if not t.done()]
+ # See if there's a new task
+ if len(self.tasks) < self.MAX_TASKS:
+ # Pop a task off the queue and run it
+ task = await sync_to_async(Task.get_one_available)(processor_id)
+ if task is not None:
+ self.tasks.append(asyncio.create_task(self.run_task(task)))
+ handled += 1
+ # Prevent busylooping
+ await asyncio.sleep(0.01)
+ # Then wait for tasks to finish
+ while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
+ # Remove completed tasks
+ self.tasks = [t for t in self.tasks if not t.done()]
+ if not self.tasks:
+ break
+ # Prevent busylooping
+ await asyncio.sleep(1)
+ return HttpResponse(f"{handled} tasks handled")
+
+ async def run_task(self, task):
+ try:
+ print(f"Task {task}: Starting")
+ handler = getattr(self, f"handle_{task.type}", None)
+ if handler is None:
+ raise ValueError(f"Cannot handle type {task.type}")
+ await handler(task.subject, task.payload)
+ await task.complete()
+ print(f"Task {task}: Complete")
+ except BaseException as e:
+ print(f"Task {task}: Error {e}")
+ traceback.print_exc()
+ await task.fail(f"{e}\n\n" + traceback.format_exc())
+
+ async def handle_identity_fetch(self, subject, payload):
+ identity = await sync_to_async(Identity.by_handle)(subject)
+ await identity.fetch_details()
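
The view has no route in this diff; wiring it up and triggering it is left to the project's URLconf and an external scheduler. A minimal sketch, assuming a path of queue/process/ (the real route in takahe may differ):

    # urls.py (sketch)
    from django.urls import path

    from miniq.views import QueueProcessor

    urlpatterns = [
        path("queue/process/", QueueProcessor.as_view()),
    ]

A cron job or external pinger requesting that URL roughly once a minute matches the START_TIMEOUT/TOTAL_TIMEOUT budget, which keeps each run within a single HTTP request's timeout limits.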