From fb6c409a9af5b8a686e977ee2251c359071e0ec3 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sun, 6 Nov 2022 21:30:07 -0700 Subject: Rework task system and fetching. I can taste how close follow is to working. --- miniq/migrations/0001_initial.py | 10 ++++++++-- miniq/models.py | 5 ++++- miniq/tasks.py | 34 ++++++++++++++++++++++++++++++++++ miniq/views.py | 30 +++++------------------------- 4 files changed, 51 insertions(+), 28 deletions(-) create mode 100644 miniq/tasks.py (limited to 'miniq') diff --git a/miniq/migrations/0001_initial.py b/miniq/migrations/0001_initial.py index 32c5d53..dc6d42b 100644 --- a/miniq/migrations/0001_initial.py +++ b/miniq/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 4.1.3 on 2022-11-06 19:58 +# Generated by Django 4.1.3 on 2022-11-07 04:19 from django.db import migrations, models @@ -25,7 +25,13 @@ class Migration(migrations.Migration): ( "type", models.CharField( - choices=[("identity_fetch", "Identity Fetch")], max_length=500 + choices=[ + ("identity_fetch", "Identity Fetch"), + ("inbox_item", "Inbox Item"), + ("follow_request", "Follow Request"), + ("follow_acknowledge", "Follow Acknowledge"), + ], + max_length=500, ), ), ("priority", models.IntegerField(default=0)), diff --git a/miniq/models.py b/miniq/models.py index 996b482..24d311c 100644 --- a/miniq/models.py +++ b/miniq/models.py @@ -11,6 +11,9 @@ class Task(models.Model): class TypeChoices(models.TextChoices): identity_fetch = "identity_fetch" + inbox_item = "inbox_item" + follow_request = "follow_request" + follow_acknowledge = "follow_acknowledge" type = models.CharField(max_length=500, choices=TypeChoices.choices) priority = models.IntegerField(default=0) @@ -42,7 +45,7 @@ class Task(models.Model): return next_task @classmethod - def submit(cls, type, subject, payload=None, deduplicate=True): + def submit(cls, type, subject: str, payload=None, deduplicate=True): # Deduplication is done against tasks that have not started yet only, # and only on tasks without payloads if deduplicate and not payload: diff --git a/miniq/tasks.py b/miniq/tasks.py new file mode 100644 index 0000000..fedf8fd --- /dev/null +++ b/miniq/tasks.py @@ -0,0 +1,34 @@ +import traceback + +from users.tasks.follow import handle_follow_request +from users.tasks.identity import handle_identity_fetch +from users.tasks.inbox import handle_inbox_item + + +class TaskHandler: + + handlers = { + "identity_fetch": handle_identity_fetch, + "inbox_item": handle_inbox_item, + "follow_request": handle_follow_request, + } + + def __init__(self, task): + self.task = task + self.subject = self.task.subject + self.payload = self.task.payload + + async def handle(self): + try: + print(f"Task {self.task}: Starting") + if self.task.type not in self.handlers: + raise ValueError(f"Cannot handle type {self.task.type}") + await self.handlers[self.task.type]( + self, + ) + await self.task.complete() + print(f"Task {self.task}: Complete") + except BaseException as e: + print(f"Task {self.task}: Error {e}") + traceback.print_exc() + await self.task.fail(f"{e}\n\n" + traceback.format_exc()) diff --git a/miniq/views.py b/miniq/views.py index 21275f8..80c9ee2 100644 --- a/miniq/views.py +++ b/miniq/views.py @@ -1,6 +1,5 @@ import asyncio import time -import traceback import uuid from asgiref.sync import sync_to_async @@ -8,7 +7,7 @@ from django.http import HttpResponse from django.views import View from miniq.models import Task -from users.models import Identity +from miniq.tasks import TaskHandler class QueueProcessor(View): @@ -19,7 +18,8 @@ class QueueProcessor(View): START_TIMEOUT = 30 TOTAL_TIMEOUT = 60 - MAX_TASKS = 10 + LOCK_TIMEOUT = 200 + MAX_TASKS = 20 async def get(self, request): start_time = time.monotonic() @@ -35,10 +35,11 @@ class QueueProcessor(View): # Pop a task off the queue and run it task = await sync_to_async(Task.get_one_available)(processor_id) if task is not None: - self.tasks.append(asyncio.create_task(self.run_task(task))) + self.tasks.append(asyncio.create_task(TaskHandler(task).handle())) handled += 1 # Prevent busylooping await asyncio.sleep(0.01) + # TODO: Clean up old locks here # Then wait for tasks to finish while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT: # Remove completed tasks @@ -48,24 +49,3 @@ class QueueProcessor(View): # Prevent busylooping await asyncio.sleep(1) return HttpResponse(f"{handled} tasks handled") - - async def run_task(self, task): - try: - print(f"Task {task}: Starting") - handler = getattr(self, f"handle_{task.type}", None) - if handler is None: - raise ValueError(f"Cannot handle type {task.type}") - await handler(task.subject, task.payload) - await task.complete() - print(f"Task {task}: Complete") - except BaseException as e: - print(f"Task {task}: Error {e}") - traceback.print_exc() - await task.fail(f"{e}\n\n" + traceback.format_exc()) - - async def handle_identity_fetch(self, subject, payload): - # Get the actor URI via webfinger - actor_uri, handle = await Identity.fetch_webfinger(subject) - # Get or create the identity, then fetch - identity = await sync_to_async(Identity.by_actor_uri)(actor_uri, create=True) - await identity.fetch_actor() -- cgit v1.2.3