From fb6c409a9af5b8a686e977ee2251c359071e0ec3 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sun, 6 Nov 2022 21:30:07 -0700 Subject: Rework task system and fetching. I can taste how close follow is to working. --- miniq/views.py | 30 +++++------------------------- 1 file changed, 5 insertions(+), 25 deletions(-) (limited to 'miniq/views.py') diff --git a/miniq/views.py b/miniq/views.py index 21275f8..80c9ee2 100644 --- a/miniq/views.py +++ b/miniq/views.py @@ -1,6 +1,5 @@ import asyncio import time -import traceback import uuid from asgiref.sync import sync_to_async @@ -8,7 +7,7 @@ from django.http import HttpResponse from django.views import View from miniq.models import Task -from users.models import Identity +from miniq.tasks import TaskHandler class QueueProcessor(View): @@ -19,7 +18,8 @@ class QueueProcessor(View): START_TIMEOUT = 30 TOTAL_TIMEOUT = 60 - MAX_TASKS = 10 + LOCK_TIMEOUT = 200 + MAX_TASKS = 20 async def get(self, request): start_time = time.monotonic() @@ -35,10 +35,11 @@ class QueueProcessor(View): # Pop a task off the queue and run it task = await sync_to_async(Task.get_one_available)(processor_id) if task is not None: - self.tasks.append(asyncio.create_task(self.run_task(task))) + self.tasks.append(asyncio.create_task(TaskHandler(task).handle())) handled += 1 # Prevent busylooping await asyncio.sleep(0.01) + # TODO: Clean up old locks here # Then wait for tasks to finish while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT: # Remove completed tasks @@ -48,24 +49,3 @@ class QueueProcessor(View): # Prevent busylooping await asyncio.sleep(1) return HttpResponse(f"{handled} tasks handled") - - async def run_task(self, task): - try: - print(f"Task {task}: Starting") - handler = getattr(self, f"handle_{task.type}", None) - if handler is None: - raise ValueError(f"Cannot handle type {task.type}") - await handler(task.subject, task.payload) - await task.complete() - print(f"Task {task}: Complete") - except BaseException as e: - print(f"Task {task}: Error {e}") - traceback.print_exc() - await task.fail(f"{e}\n\n" + traceback.format_exc()) - - async def handle_identity_fetch(self, subject, payload): - # Get the actor URI via webfinger - actor_uri, handle = await Identity.fetch_webfinger(subject) - # Get or create the identity, then fetch - identity = await sync_to_async(Identity.by_actor_uri)(actor_uri, create=True) - await identity.fetch_actor() -- cgit v1.2.3