From a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sat, 5 Nov 2022 22:49:25 -0600 Subject: Queuing system and lazy profile fetch --- miniq/__init__.py | 0 miniq/admin.py | 21 +++++++++++++ miniq/apps.py | 6 ++++ miniq/migrations/0001_initial.py | 37 ++++++++++++++++++++++ miniq/migrations/__init__.py | 0 miniq/models.py | 68 ++++++++++++++++++++++++++++++++++++++++ miniq/views.py | 68 ++++++++++++++++++++++++++++++++++++++++ 7 files changed, 200 insertions(+) create mode 100644 miniq/__init__.py create mode 100644 miniq/admin.py create mode 100644 miniq/apps.py create mode 100644 miniq/migrations/0001_initial.py create mode 100644 miniq/migrations/__init__.py create mode 100644 miniq/models.py create mode 100644 miniq/views.py (limited to 'miniq') diff --git a/miniq/__init__.py b/miniq/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/miniq/admin.py b/miniq/admin.py new file mode 100644 index 0000000..1166f89 --- /dev/null +++ b/miniq/admin.py @@ -0,0 +1,21 @@ +from django.contrib import admin + +from miniq.models import Task + + +@admin.register(Task) +class TaskAdmin(admin.ModelAdmin): + + list_display = ["id", "created", "type", "subject", "completed", "failed"] + ordering = ["-created"] + actions = ["reset"] + + @admin.action(description="Reset Task") + def reset(self, request, queryset): + queryset.update( + failed=None, + completed=None, + locked=None, + locked_by=None, + error=None, + ) diff --git a/miniq/apps.py b/miniq/apps.py new file mode 100644 index 0000000..4c7e773 --- /dev/null +++ b/miniq/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class MiniqConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "miniq" diff --git a/miniq/migrations/0001_initial.py b/miniq/migrations/0001_initial.py new file mode 100644 index 0000000..6775ff3 --- /dev/null +++ b/miniq/migrations/0001_initial.py @@ -0,0 +1,37 @@ +# Generated by Django 4.1.3 on 2022-11-06 03:59 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="Task", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("type", models.CharField(max_length=500)), + ("priority", models.IntegerField(default=0)), + ("subject", models.TextField()), + ("payload", models.JSONField(blank=True, null=True)), + ("error", models.TextField(blank=True, null=True)), + ("created", models.DateTimeField(auto_now_add=True)), + ("completed", models.DateTimeField(blank=True, null=True)), + ("failed", models.DateTimeField(blank=True, null=True)), + ("locked", models.DateTimeField(blank=True, null=True)), + ("locked_by", models.CharField(blank=True, max_length=500, null=True)), + ], + ), + ] diff --git a/miniq/migrations/__init__.py b/miniq/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/miniq/models.py b/miniq/models.py new file mode 100644 index 0000000..996b482 --- /dev/null +++ b/miniq/models.py @@ -0,0 +1,68 @@ +from typing import Optional + +from django.db import models, transaction +from django.utils import timezone + + +class Task(models.Model): + """ + A task that must be done by a queue processor + """ + + class TypeChoices(models.TextChoices): + identity_fetch = "identity_fetch" + + type = models.CharField(max_length=500, choices=TypeChoices.choices) + priority = models.IntegerField(default=0) + subject = models.TextField() + payload = models.JSONField(blank=True, null=True) + error = models.TextField(blank=True, null=True) + + created = models.DateTimeField(auto_now_add=True) + completed = models.DateTimeField(blank=True, null=True) + failed = models.DateTimeField(blank=True, null=True) + locked = models.DateTimeField(blank=True, null=True) + locked_by = models.CharField(max_length=500, blank=True, null=True) + + def __str__(self): + return f"{self.id}/{self.type}({self.subject})" + + @classmethod + def get_one_available(cls, processor_id) -> Optional["Task"]: + """ + Gets one task off the list while reserving it, atomically. + """ + with transaction.atomic(): + next_task = cls.objects.filter(locked__isnull=True).first() + if next_task is None: + return None + next_task.locked = timezone.now() + next_task.locked_by = processor_id + next_task.save() + return next_task + + @classmethod + def submit(cls, type, subject, payload=None, deduplicate=True): + # Deduplication is done against tasks that have not started yet only, + # and only on tasks without payloads + if deduplicate and not payload: + if cls.objects.filter( + type=type, + subject=subject, + completed__isnull=True, + failed__isnull=True, + locked__isnull=True, + ).exists(): + return + cls.objects.create(type=type, subject=subject, payload=payload) + + async def complete(self): + await self.__class__.objects.filter(id=self.id).aupdate( + completed=timezone.now() + ) + + async def fail(self, error): + await self.__class__.objects.filter(id=self.id).aupdate( + failed=timezone.now(), + error=error, + ) diff --git a/miniq/views.py b/miniq/views.py new file mode 100644 index 0000000..12da7cd --- /dev/null +++ b/miniq/views.py @@ -0,0 +1,68 @@ +import asyncio +import time +import traceback +import uuid + +from asgiref.sync import sync_to_async +from django.http import HttpResponse +from django.views import View + +from miniq.models import Task +from users.models import Identity + + +class QueueProcessor(View): + """ + A view that takes some items off the queue and processes them. + Tries to limit its own runtime so it's within HTTP timeout limits. + """ + + START_TIMEOUT = 30 + TOTAL_TIMEOUT = 60 + MAX_TASKS = 10 + + async def get(self, request): + start_time = time.monotonic() + processor_id = uuid.uuid4().hex + handled = 0 + self.tasks = [] + # For the first time period, launch tasks + while (time.monotonic() - start_time) < self.START_TIMEOUT: + # Remove completed tasks + self.tasks = [t for t in self.tasks if not t.done()] + # See if there's a new task + if len(self.tasks) < self.MAX_TASKS: + # Pop a task off the queue and run it + task = await sync_to_async(Task.get_one_available)(processor_id) + if task is not None: + self.tasks.append(asyncio.create_task(self.run_task(task))) + handled += 1 + # Prevent busylooping + await asyncio.sleep(0.01) + # Then wait for tasks to finish + while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT: + # Remove completed tasks + self.tasks = [t for t in self.tasks if not t.done()] + if not self.tasks: + break + # Prevent busylooping + await asyncio.sleep(1) + return HttpResponse(f"{handled} tasks handled") + + async def run_task(self, task): + try: + print(f"Task {task}: Starting") + handler = getattr(self, f"handle_{task.type}", None) + if handler is None: + raise ValueError(f"Cannot handle type {task.type}") + await handler(task.subject, task.payload) + await task.complete() + print(f"Task {task}: Complete") + except BaseException as e: + print(f"Task {task}: Error {e}") + traceback.print_exc() + await task.fail(f"{e}\n\n" + traceback.format_exc()) + + async def handle_identity_fetch(self, subject, payload): + identity = await sync_to_async(Identity.by_handle)(subject) + await identity.fetch_details() -- cgit v1.2.3