diff options
author | Andrew Godwin | 2022-11-06 21:30:07 -0700 |
---|---|---|
committer | Andrew Godwin | 2022-11-06 21:30:07 -0700 |
commit | fb6c409a9af5b8a686e977ee2251c359071e0ec3 (patch) | |
tree | 322469694585b766eb623b47955623b3e43c89b1 /miniq/views.py | |
parent | 0d5f7e7a891bec4b8af26c2d86d8be0209a3202d (diff) | |
download | takahe-fb6c409a9af5b8a686e977ee2251c359071e0ec3.tar.gz takahe-fb6c409a9af5b8a686e977ee2251c359071e0ec3.tar.bz2 takahe-fb6c409a9af5b8a686e977ee2251c359071e0ec3.zip |
Rework task system and fetching.
I can taste how close follow is to working.
Diffstat (limited to 'miniq/views.py')
-rw-r--r-- | miniq/views.py | 30 |
1 files changed, 5 insertions, 25 deletions
diff --git a/miniq/views.py b/miniq/views.py index 21275f8..80c9ee2 100644 --- a/miniq/views.py +++ b/miniq/views.py @@ -1,6 +1,5 @@ import asyncio import time -import traceback import uuid from asgiref.sync import sync_to_async @@ -8,7 +7,7 @@ from django.http import HttpResponse from django.views import View from miniq.models import Task -from users.models import Identity +from miniq.tasks import TaskHandler class QueueProcessor(View): @@ -19,7 +18,8 @@ class QueueProcessor(View): START_TIMEOUT = 30 TOTAL_TIMEOUT = 60 - MAX_TASKS = 10 + LOCK_TIMEOUT = 200 + MAX_TASKS = 20 async def get(self, request): start_time = time.monotonic() @@ -35,10 +35,11 @@ class QueueProcessor(View): # Pop a task off the queue and run it task = await sync_to_async(Task.get_one_available)(processor_id) if task is not None: - self.tasks.append(asyncio.create_task(self.run_task(task))) + self.tasks.append(asyncio.create_task(TaskHandler(task).handle())) handled += 1 # Prevent busylooping await asyncio.sleep(0.01) + # TODO: Clean up old locks here # Then wait for tasks to finish while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT: # Remove completed tasks @@ -48,24 +49,3 @@ class QueueProcessor(View): # Prevent busylooping await asyncio.sleep(1) return HttpResponse(f"{handled} tasks handled") - - async def run_task(self, task): - try: - print(f"Task {task}: Starting") - handler = getattr(self, f"handle_{task.type}", None) - if handler is None: - raise ValueError(f"Cannot handle type {task.type}") - await handler(task.subject, task.payload) - await task.complete() - print(f"Task {task}: Complete") - except BaseException as e: - print(f"Task {task}: Error {e}") - traceback.print_exc() - await task.fail(f"{e}\n\n" + traceback.format_exc()) - - async def handle_identity_fetch(self, subject, payload): - # Get the actor URI via webfinger - actor_uri, handle = await Identity.fetch_webfinger(subject) - # Get or create the identity, then fetch - identity = await sync_to_async(Identity.by_actor_uri)(actor_uri, create=True) - await identity.fetch_actor() |