From fb6c409a9af5b8a686e977ee2251c359071e0ec3 Mon Sep 17 00:00:00 2001
From: Andrew Godwin
Date: Sun, 6 Nov 2022 21:30:07 -0700
Subject: Rework task system and fetching.

I can taste how close follow is to working.
---
 miniq/migrations/0001_initial.py | 10 ++++++++--
 miniq/models.py                  |  5 ++++-
 miniq/tasks.py                   | 34 ++++++++++++++++++++++++++++++++++
 miniq/views.py                   | 30 +++++-------------------------
 4 files changed, 51 insertions(+), 28 deletions(-)
 create mode 100644 miniq/tasks.py

(limited to 'miniq')

diff --git a/miniq/migrations/0001_initial.py b/miniq/migrations/0001_initial.py
index 32c5d53..dc6d42b 100644
--- a/miniq/migrations/0001_initial.py
+++ b/miniq/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 4.1.3 on 2022-11-06 19:58
+# Generated by Django 4.1.3 on 2022-11-07 04:19
 
 from django.db import migrations, models
 
@@ -25,7 +25,13 @@ class Migration(migrations.Migration):
                 (
                     "type",
                     models.CharField(
-                        choices=[("identity_fetch", "Identity Fetch")], max_length=500
+                        choices=[
+                            ("identity_fetch", "Identity Fetch"),
+                            ("inbox_item", "Inbox Item"),
+                            ("follow_request", "Follow Request"),
+                            ("follow_acknowledge", "Follow Acknowledge"),
+                        ],
+                        max_length=500,
                     ),
                 ),
                 ("priority", models.IntegerField(default=0)),
diff --git a/miniq/models.py b/miniq/models.py
index 996b482..24d311c 100644
--- a/miniq/models.py
+++ b/miniq/models.py
@@ -11,6 +11,9 @@ class Task(models.Model):
 
     class TypeChoices(models.TextChoices):
         identity_fetch = "identity_fetch"
+        inbox_item = "inbox_item"
+        follow_request = "follow_request"
+        follow_acknowledge = "follow_acknowledge"
 
     type = models.CharField(max_length=500, choices=TypeChoices.choices)
     priority = models.IntegerField(default=0)
@@ -42,7 +45,7 @@ class Task(models.Model):
             return next_task
 
     @classmethod
-    def submit(cls, type, subject, payload=None, deduplicate=True):
+    def submit(cls, type, subject: str, payload=None, deduplicate=True):
         # Deduplication is done against tasks that have not started yet only,
         # and only on tasks without payloads
         if deduplicate and not payload:
diff --git a/miniq/tasks.py b/miniq/tasks.py
new file mode 100644
index 0000000..fedf8fd
--- /dev/null
+++ b/miniq/tasks.py
@@ -0,0 +1,34 @@
+import traceback
+
+from users.tasks.follow import handle_follow_request
+from users.tasks.identity import handle_identity_fetch
+from users.tasks.inbox import handle_inbox_item
+
+
+class TaskHandler:
+
+    handlers = {
+        "identity_fetch": handle_identity_fetch,
+        "inbox_item": handle_inbox_item,
+        "follow_request": handle_follow_request,
+    }
+
+    def __init__(self, task):
+        self.task = task
+        self.subject = self.task.subject
+        self.payload = self.task.payload
+
+    async def handle(self):
+        try:
+            print(f"Task {self.task}: Starting")
+            if self.task.type not in self.handlers:
+                raise ValueError(f"Cannot handle type {self.task.type}")
+            await self.handlers[self.task.type](
+                self,
+            )
+            await self.task.complete()
+            print(f"Task {self.task}: Complete")
+        except BaseException as e:
+            print(f"Task {self.task}: Error {e}")
+            traceback.print_exc()
+            await self.task.fail(f"{e}\n\n" + traceback.format_exc())
diff --git a/miniq/views.py b/miniq/views.py
index 21275f8..80c9ee2 100644
--- a/miniq/views.py
+++ b/miniq/views.py
@@ -1,6 +1,5 @@
 import asyncio
 import time
-import traceback
 import uuid
 
 from asgiref.sync import sync_to_async
@@ -8,7 +7,7 @@ from django.http import HttpResponse
 from django.views import View
 
 from miniq.models import Task
-from users.models import Identity
+from miniq.tasks import TaskHandler
 
 
 class QueueProcessor(View):
@@ -19,7 +18,8 @@ class QueueProcessor(View):
 
     START_TIMEOUT = 30
     TOTAL_TIMEOUT = 60
-    MAX_TASKS = 10
+    LOCK_TIMEOUT = 200
+    MAX_TASKS = 20
 
     async def get(self, request):
         start_time = time.monotonic()
@@ -35,10 +35,11 @@ class QueueProcessor(View):
                 # Pop a task off the queue and run it
                 task = await sync_to_async(Task.get_one_available)(processor_id)
                 if task is not None:
-                    self.tasks.append(asyncio.create_task(self.run_task(task)))
+                    self.tasks.append(asyncio.create_task(TaskHandler(task).handle()))
                     handled += 1
             # Prevent busylooping
             await asyncio.sleep(0.01)
+        # TODO: Clean up old locks here
         # Then wait for tasks to finish
         while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
             # Remove completed tasks
@@ -48,24 +49,3 @@ class QueueProcessor(View):
             # Prevent busylooping
             await asyncio.sleep(1)
         return HttpResponse(f"{handled} tasks handled")
-
-    async def run_task(self, task):
-        try:
-            print(f"Task {task}: Starting")
-            handler = getattr(self, f"handle_{task.type}", None)
-            if handler is None:
-                raise ValueError(f"Cannot handle type {task.type}")
-            await handler(task.subject, task.payload)
-            await task.complete()
-            print(f"Task {task}: Complete")
-        except BaseException as e:
-            print(f"Task {task}: Error {e}")
-            traceback.print_exc()
-            await task.fail(f"{e}\n\n" + traceback.format_exc())
-
-    async def handle_identity_fetch(self, subject, payload):
-        # Get the actor URI via webfinger
-        actor_uri, handle = await Identity.fetch_webfinger(subject)
-        # Get or create the identity, then fetch
-        identity = await sync_to_async(Identity.by_actor_uri)(actor_uri, create=True)
-        await identity.fetch_actor()
-- 
cgit v1.2.3