summaryrefslogtreecommitdiffstats
path: root/miniq
diff options
context:
space:
mode:
Diffstat (limited to 'miniq')
-rw-r--r--miniq/__init__.py0
-rw-r--r--miniq/admin.py21
-rw-r--r--miniq/apps.py6
-rw-r--r--miniq/migrations/0001_initial.py48
-rw-r--r--miniq/migrations/__init__.py0
-rw-r--r--miniq/models.py71
-rw-r--r--miniq/tasks.py34
-rw-r--r--miniq/views.py51
8 files changed, 0 insertions, 231 deletions
diff --git a/miniq/__init__.py b/miniq/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/miniq/__init__.py
+++ /dev/null
diff --git a/miniq/admin.py b/miniq/admin.py
deleted file mode 100644
index 1166f89..0000000
--- a/miniq/admin.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from django.contrib import admin
-
-from miniq.models import Task
-
-
-@admin.register(Task)
-class TaskAdmin(admin.ModelAdmin):
-
- list_display = ["id", "created", "type", "subject", "completed", "failed"]
- ordering = ["-created"]
- actions = ["reset"]
-
- @admin.action(description="Reset Task")
- def reset(self, request, queryset):
- queryset.update(
- failed=None,
- completed=None,
- locked=None,
- locked_by=None,
- error=None,
- )
diff --git a/miniq/apps.py b/miniq/apps.py
deleted file mode 100644
index 4c7e773..0000000
--- a/miniq/apps.py
+++ /dev/null
@@ -1,6 +0,0 @@
-from django.apps import AppConfig
-
-
-class MiniqConfig(AppConfig):
- default_auto_field = "django.db.models.BigAutoField"
- name = "miniq"
diff --git a/miniq/migrations/0001_initial.py b/miniq/migrations/0001_initial.py
deleted file mode 100644
index dc6d42b..0000000
--- a/miniq/migrations/0001_initial.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# Generated by Django 4.1.3 on 2022-11-07 04:19
-
-from django.db import migrations, models
-
-
-class Migration(migrations.Migration):
-
- initial = True
-
- dependencies = []
-
- operations = [
- migrations.CreateModel(
- name="Task",
- fields=[
- (
- "id",
- models.BigAutoField(
- auto_created=True,
- primary_key=True,
- serialize=False,
- verbose_name="ID",
- ),
- ),
- (
- "type",
- models.CharField(
- choices=[
- ("identity_fetch", "Identity Fetch"),
- ("inbox_item", "Inbox Item"),
- ("follow_request", "Follow Request"),
- ("follow_acknowledge", "Follow Acknowledge"),
- ],
- max_length=500,
- ),
- ),
- ("priority", models.IntegerField(default=0)),
- ("subject", models.TextField()),
- ("payload", models.JSONField(blank=True, null=True)),
- ("error", models.TextField(blank=True, null=True)),
- ("created", models.DateTimeField(auto_now_add=True)),
- ("completed", models.DateTimeField(blank=True, null=True)),
- ("failed", models.DateTimeField(blank=True, null=True)),
- ("locked", models.DateTimeField(blank=True, null=True)),
- ("locked_by", models.CharField(blank=True, max_length=500, null=True)),
- ],
- ),
- ]
diff --git a/miniq/migrations/__init__.py b/miniq/migrations/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/miniq/migrations/__init__.py
+++ /dev/null
diff --git a/miniq/models.py b/miniq/models.py
deleted file mode 100644
index 24d311c..0000000
--- a/miniq/models.py
+++ /dev/null
@@ -1,71 +0,0 @@
-from typing import Optional
-
-from django.db import models, transaction
-from django.utils import timezone
-
-
-class Task(models.Model):
- """
- A task that must be done by a queue processor
- """
-
- class TypeChoices(models.TextChoices):
- identity_fetch = "identity_fetch"
- inbox_item = "inbox_item"
- follow_request = "follow_request"
- follow_acknowledge = "follow_acknowledge"
-
- type = models.CharField(max_length=500, choices=TypeChoices.choices)
- priority = models.IntegerField(default=0)
- subject = models.TextField()
- payload = models.JSONField(blank=True, null=True)
- error = models.TextField(blank=True, null=True)
-
- created = models.DateTimeField(auto_now_add=True)
- completed = models.DateTimeField(blank=True, null=True)
- failed = models.DateTimeField(blank=True, null=True)
- locked = models.DateTimeField(blank=True, null=True)
- locked_by = models.CharField(max_length=500, blank=True, null=True)
-
- def __str__(self):
- return f"{self.id}/{self.type}({self.subject})"
-
- @classmethod
- def get_one_available(cls, processor_id) -> Optional["Task"]:
- """
- Gets one task off the list while reserving it, atomically.
- """
- with transaction.atomic():
- next_task = cls.objects.filter(locked__isnull=True).first()
- if next_task is None:
- return None
- next_task.locked = timezone.now()
- next_task.locked_by = processor_id
- next_task.save()
- return next_task
-
- @classmethod
- def submit(cls, type, subject: str, payload=None, deduplicate=True):
- # Deduplication is done against tasks that have not started yet only,
- # and only on tasks without payloads
- if deduplicate and not payload:
- if cls.objects.filter(
- type=type,
- subject=subject,
- completed__isnull=True,
- failed__isnull=True,
- locked__isnull=True,
- ).exists():
- return
- cls.objects.create(type=type, subject=subject, payload=payload)
-
- async def complete(self):
- await self.__class__.objects.filter(id=self.id).aupdate(
- completed=timezone.now()
- )
-
- async def fail(self, error):
- await self.__class__.objects.filter(id=self.id).aupdate(
- failed=timezone.now(),
- error=error,
- )
diff --git a/miniq/tasks.py b/miniq/tasks.py
deleted file mode 100644
index fedf8fd..0000000
--- a/miniq/tasks.py
+++ /dev/null
@@ -1,34 +0,0 @@
-import traceback
-
-from users.tasks.follow import handle_follow_request
-from users.tasks.identity import handle_identity_fetch
-from users.tasks.inbox import handle_inbox_item
-
-
-class TaskHandler:
-
- handlers = {
- "identity_fetch": handle_identity_fetch,
- "inbox_item": handle_inbox_item,
- "follow_request": handle_follow_request,
- }
-
- def __init__(self, task):
- self.task = task
- self.subject = self.task.subject
- self.payload = self.task.payload
-
- async def handle(self):
- try:
- print(f"Task {self.task}: Starting")
- if self.task.type not in self.handlers:
- raise ValueError(f"Cannot handle type {self.task.type}")
- await self.handlers[self.task.type](
- self,
- )
- await self.task.complete()
- print(f"Task {self.task}: Complete")
- except BaseException as e:
- print(f"Task {self.task}: Error {e}")
- traceback.print_exc()
- await self.task.fail(f"{e}\n\n" + traceback.format_exc())
diff --git a/miniq/views.py b/miniq/views.py
deleted file mode 100644
index 80c9ee2..0000000
--- a/miniq/views.py
+++ /dev/null
@@ -1,51 +0,0 @@
-import asyncio
-import time
-import uuid
-
-from asgiref.sync import sync_to_async
-from django.http import HttpResponse
-from django.views import View
-
-from miniq.models import Task
-from miniq.tasks import TaskHandler
-
-
-class QueueProcessor(View):
- """
- A view that takes some items off the queue and processes them.
- Tries to limit its own runtime so it's within HTTP timeout limits.
- """
-
- START_TIMEOUT = 30
- TOTAL_TIMEOUT = 60
- LOCK_TIMEOUT = 200
- MAX_TASKS = 20
-
- async def get(self, request):
- start_time = time.monotonic()
- processor_id = uuid.uuid4().hex
- handled = 0
- self.tasks = []
- # For the first time period, launch tasks
- while (time.monotonic() - start_time) < self.START_TIMEOUT:
- # Remove completed tasks
- self.tasks = [t for t in self.tasks if not t.done()]
- # See if there's a new task
- if len(self.tasks) < self.MAX_TASKS:
- # Pop a task off the queue and run it
- task = await sync_to_async(Task.get_one_available)(processor_id)
- if task is not None:
- self.tasks.append(asyncio.create_task(TaskHandler(task).handle()))
- handled += 1
- # Prevent busylooping
- await asyncio.sleep(0.01)
- # TODO: Clean up old locks here
- # Then wait for tasks to finish
- while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
- # Remove completed tasks
- self.tasks = [t for t in self.tasks if not t.done()]
- if not self.tasks:
- break
- # Prevent busylooping
- await asyncio.sleep(1)
- return HttpResponse(f"{handled} tasks handled")