From 61c324508e62bb640b4526183d0837fc57d742c2 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Tue, 8 Nov 2022 23:06:29 -0700 Subject: Midway point in task refactor - changing direction --- miniq/__init__.py | 0 miniq/admin.py | 21 ------------ miniq/apps.py | 6 ---- miniq/migrations/0001_initial.py | 48 --------------------------- miniq/migrations/__init__.py | 0 miniq/models.py | 71 ---------------------------------------- miniq/tasks.py | 34 ------------------- miniq/views.py | 51 ----------------------------- 8 files changed, 231 deletions(-) delete mode 100644 miniq/__init__.py delete mode 100644 miniq/admin.py delete mode 100644 miniq/apps.py delete mode 100644 miniq/migrations/0001_initial.py delete mode 100644 miniq/migrations/__init__.py delete mode 100644 miniq/models.py delete mode 100644 miniq/tasks.py delete mode 100644 miniq/views.py (limited to 'miniq') diff --git a/miniq/__init__.py b/miniq/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/miniq/admin.py b/miniq/admin.py deleted file mode 100644 index 1166f89..0000000 --- a/miniq/admin.py +++ /dev/null @@ -1,21 +0,0 @@ -from django.contrib import admin - -from miniq.models import Task - - -@admin.register(Task) -class TaskAdmin(admin.ModelAdmin): - - list_display = ["id", "created", "type", "subject", "completed", "failed"] - ordering = ["-created"] - actions = ["reset"] - - @admin.action(description="Reset Task") - def reset(self, request, queryset): - queryset.update( - failed=None, - completed=None, - locked=None, - locked_by=None, - error=None, - ) diff --git a/miniq/apps.py b/miniq/apps.py deleted file mode 100644 index 4c7e773..0000000 --- a/miniq/apps.py +++ /dev/null @@ -1,6 +0,0 @@ -from django.apps import AppConfig - - -class MiniqConfig(AppConfig): - default_auto_field = "django.db.models.BigAutoField" - name = "miniq" diff --git a/miniq/migrations/0001_initial.py b/miniq/migrations/0001_initial.py deleted file mode 100644 index dc6d42b..0000000 --- a/miniq/migrations/0001_initial.py +++ /dev/null @@ -1,48 +0,0 @@ -# Generated by Django 4.1.3 on 2022-11-07 04:19 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - initial = True - - dependencies = [] - - operations = [ - migrations.CreateModel( - name="Task", - fields=[ - ( - "id", - models.BigAutoField( - auto_created=True, - primary_key=True, - serialize=False, - verbose_name="ID", - ), - ), - ( - "type", - models.CharField( - choices=[ - ("identity_fetch", "Identity Fetch"), - ("inbox_item", "Inbox Item"), - ("follow_request", "Follow Request"), - ("follow_acknowledge", "Follow Acknowledge"), - ], - max_length=500, - ), - ), - ("priority", models.IntegerField(default=0)), - ("subject", models.TextField()), - ("payload", models.JSONField(blank=True, null=True)), - ("error", models.TextField(blank=True, null=True)), - ("created", models.DateTimeField(auto_now_add=True)), - ("completed", models.DateTimeField(blank=True, null=True)), - ("failed", models.DateTimeField(blank=True, null=True)), - ("locked", models.DateTimeField(blank=True, null=True)), - ("locked_by", models.CharField(blank=True, max_length=500, null=True)), - ], - ), - ] diff --git a/miniq/migrations/__init__.py b/miniq/migrations/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/miniq/models.py b/miniq/models.py deleted file mode 100644 index 24d311c..0000000 --- a/miniq/models.py +++ /dev/null @@ -1,71 +0,0 @@ -from typing import Optional - -from django.db import models, transaction -from django.utils import timezone - - -class Task(models.Model): - """ - A task that must be done by a queue processor - """ - - class TypeChoices(models.TextChoices): - identity_fetch = "identity_fetch" - inbox_item = "inbox_item" - follow_request = "follow_request" - follow_acknowledge = "follow_acknowledge" - - type = models.CharField(max_length=500, choices=TypeChoices.choices) - priority = models.IntegerField(default=0) - subject = models.TextField() - payload = models.JSONField(blank=True, null=True) - error = models.TextField(blank=True, null=True) - - created = models.DateTimeField(auto_now_add=True) - completed = models.DateTimeField(blank=True, null=True) - failed = models.DateTimeField(blank=True, null=True) - locked = models.DateTimeField(blank=True, null=True) - locked_by = models.CharField(max_length=500, blank=True, null=True) - - def __str__(self): - return f"{self.id}/{self.type}({self.subject})" - - @classmethod - def get_one_available(cls, processor_id) -> Optional["Task"]: - """ - Gets one task off the list while reserving it, atomically. - """ - with transaction.atomic(): - next_task = cls.objects.filter(locked__isnull=True).first() - if next_task is None: - return None - next_task.locked = timezone.now() - next_task.locked_by = processor_id - next_task.save() - return next_task - - @classmethod - def submit(cls, type, subject: str, payload=None, deduplicate=True): - # Deduplication is done against tasks that have not started yet only, - # and only on tasks without payloads - if deduplicate and not payload: - if cls.objects.filter( - type=type, - subject=subject, - completed__isnull=True, - failed__isnull=True, - locked__isnull=True, - ).exists(): - return - cls.objects.create(type=type, subject=subject, payload=payload) - - async def complete(self): - await self.__class__.objects.filter(id=self.id).aupdate( - completed=timezone.now() - ) - - async def fail(self, error): - await self.__class__.objects.filter(id=self.id).aupdate( - failed=timezone.now(), - error=error, - ) diff --git a/miniq/tasks.py b/miniq/tasks.py deleted file mode 100644 index fedf8fd..0000000 --- a/miniq/tasks.py +++ /dev/null @@ -1,34 +0,0 @@ -import traceback - -from users.tasks.follow import handle_follow_request -from users.tasks.identity import handle_identity_fetch -from users.tasks.inbox import handle_inbox_item - - -class TaskHandler: - - handlers = { - "identity_fetch": handle_identity_fetch, - "inbox_item": handle_inbox_item, - "follow_request": handle_follow_request, - } - - def __init__(self, task): - self.task = task - self.subject = self.task.subject - self.payload = self.task.payload - - async def handle(self): - try: - print(f"Task {self.task}: Starting") - if self.task.type not in self.handlers: - raise ValueError(f"Cannot handle type {self.task.type}") - await self.handlers[self.task.type]( - self, - ) - await self.task.complete() - print(f"Task {self.task}: Complete") - except BaseException as e: - print(f"Task {self.task}: Error {e}") - traceback.print_exc() - await self.task.fail(f"{e}\n\n" + traceback.format_exc()) diff --git a/miniq/views.py b/miniq/views.py deleted file mode 100644 index 80c9ee2..0000000 --- a/miniq/views.py +++ /dev/null @@ -1,51 +0,0 @@ -import asyncio -import time -import uuid - -from asgiref.sync import sync_to_async -from django.http import HttpResponse -from django.views import View - -from miniq.models import Task -from miniq.tasks import TaskHandler - - -class QueueProcessor(View): - """ - A view that takes some items off the queue and processes them. - Tries to limit its own runtime so it's within HTTP timeout limits. - """ - - START_TIMEOUT = 30 - TOTAL_TIMEOUT = 60 - LOCK_TIMEOUT = 200 - MAX_TASKS = 20 - - async def get(self, request): - start_time = time.monotonic() - processor_id = uuid.uuid4().hex - handled = 0 - self.tasks = [] - # For the first time period, launch tasks - while (time.monotonic() - start_time) < self.START_TIMEOUT: - # Remove completed tasks - self.tasks = [t for t in self.tasks if not t.done()] - # See if there's a new task - if len(self.tasks) < self.MAX_TASKS: - # Pop a task off the queue and run it - task = await sync_to_async(Task.get_one_available)(processor_id) - if task is not None: - self.tasks.append(asyncio.create_task(TaskHandler(task).handle())) - handled += 1 - # Prevent busylooping - await asyncio.sleep(0.01) - # TODO: Clean up old locks here - # Then wait for tasks to finish - while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT: - # Remove completed tasks - self.tasks = [t for t in self.tasks if not t.done()] - if not self.tasks: - break - # Prevent busylooping - await asyncio.sleep(1) - return HttpResponse(f"{handled} tasks handled") -- cgit v1.2.3