summaryrefslogtreecommitdiffstats
path: root/miniq/views.py
diff options
context:
space:
mode:
Diffstat (limited to 'miniq/views.py')
-rw-r--r--miniq/views.py30
1 files changed, 5 insertions, 25 deletions
diff --git a/miniq/views.py b/miniq/views.py
index 21275f8..80c9ee2 100644
--- a/miniq/views.py
+++ b/miniq/views.py
@@ -1,6 +1,5 @@
import asyncio
import time
-import traceback
import uuid
from asgiref.sync import sync_to_async
@@ -8,7 +7,7 @@ from django.http import HttpResponse
from django.views import View
from miniq.models import Task
-from users.models import Identity
+from miniq.tasks import TaskHandler
class QueueProcessor(View):
@@ -19,7 +18,8 @@ class QueueProcessor(View):
START_TIMEOUT = 30
TOTAL_TIMEOUT = 60
- MAX_TASKS = 10
+ LOCK_TIMEOUT = 200
+ MAX_TASKS = 20
async def get(self, request):
start_time = time.monotonic()
@@ -35,10 +35,11 @@ class QueueProcessor(View):
# Pop a task off the queue and run it
task = await sync_to_async(Task.get_one_available)(processor_id)
if task is not None:
- self.tasks.append(asyncio.create_task(self.run_task(task)))
+ self.tasks.append(asyncio.create_task(TaskHandler(task).handle()))
handled += 1
# Prevent busylooping
await asyncio.sleep(0.01)
+ # TODO: Clean up old locks here
# Then wait for tasks to finish
while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
# Remove completed tasks
@@ -48,24 +49,3 @@ class QueueProcessor(View):
# Prevent busylooping
await asyncio.sleep(1)
return HttpResponse(f"{handled} tasks handled")
-
- async def run_task(self, task):
- try:
- print(f"Task {task}: Starting")
- handler = getattr(self, f"handle_{task.type}", None)
- if handler is None:
- raise ValueError(f"Cannot handle type {task.type}")
- await handler(task.subject, task.payload)
- await task.complete()
- print(f"Task {task}: Complete")
- except BaseException as e:
- print(f"Task {task}: Error {e}")
- traceback.print_exc()
- await task.fail(f"{e}\n\n" + traceback.format_exc())
-
- async def handle_identity_fetch(self, subject, payload):
- # Get the actor URI via webfinger
- actor_uri, handle = await Identity.fetch_webfinger(subject)
- # Get or create the identity, then fetch
- identity = await sync_to_async(Identity.by_actor_uri)(actor_uri, create=True)
- await identity.fetch_actor()