summaryrefslogtreecommitdiffstats
path: root/miniq/views.py
diff options
context:
space:
mode:
Diffstat (limited to 'miniq/views.py')
-rw-r--r--miniq/views.py68
1 files changed, 68 insertions, 0 deletions
diff --git a/miniq/views.py b/miniq/views.py
new file mode 100644
index 0000000..12da7cd
--- /dev/null
+++ b/miniq/views.py
@@ -0,0 +1,68 @@
+import asyncio
+import time
+import traceback
+import uuid
+
+from asgiref.sync import sync_to_async
+from django.http import HttpResponse
+from django.views import View
+
+from miniq.models import Task
+from users.models import Identity
+
+
+class QueueProcessor(View):
+ """
+ A view that takes some items off the queue and processes them.
+ Tries to limit its own runtime so it's within HTTP timeout limits.
+ """
+
+ START_TIMEOUT = 30
+ TOTAL_TIMEOUT = 60
+ MAX_TASKS = 10
+
+ async def get(self, request):
+ start_time = time.monotonic()
+ processor_id = uuid.uuid4().hex
+ handled = 0
+ self.tasks = []
+ # For the first time period, launch tasks
+ while (time.monotonic() - start_time) < self.START_TIMEOUT:
+ # Remove completed tasks
+ self.tasks = [t for t in self.tasks if not t.done()]
+ # See if there's a new task
+ if len(self.tasks) < self.MAX_TASKS:
+ # Pop a task off the queue and run it
+ task = await sync_to_async(Task.get_one_available)(processor_id)
+ if task is not None:
+ self.tasks.append(asyncio.create_task(self.run_task(task)))
+ handled += 1
+ # Prevent busylooping
+ await asyncio.sleep(0.01)
+ # Then wait for tasks to finish
+ while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
+ # Remove completed tasks
+ self.tasks = [t for t in self.tasks if not t.done()]
+ if not self.tasks:
+ break
+ # Prevent busylooping
+ await asyncio.sleep(1)
+ return HttpResponse(f"{handled} tasks handled")
+
+ async def run_task(self, task):
+ try:
+ print(f"Task {task}: Starting")
+ handler = getattr(self, f"handle_{task.type}", None)
+ if handler is None:
+ raise ValueError(f"Cannot handle type {task.type}")
+ await handler(task.subject, task.payload)
+ await task.complete()
+ print(f"Task {task}: Complete")
+ except BaseException as e:
+ print(f"Task {task}: Error {e}")
+ traceback.print_exc()
+ await task.fail(f"{e}\n\n" + traceback.format_exc())
+
+ async def handle_identity_fetch(self, subject, payload):
+ identity = await sync_to_async(Identity.by_handle)(subject)
+ await identity.fetch_details()