diff options
author | Andrew Godwin | 2022-11-05 22:49:25 -0600 |
---|---|---|
committer | Andrew Godwin | 2022-11-05 22:49:25 -0600 |
commit | a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7 (patch) | |
tree | 9aa1ae2ffad20a7afe8eac8cbe99ab9a53dcc0ca /miniq/views.py | |
parent | 56de2362a01089c8a5ca3c6e1affcade00ffdfce (diff) | |
download | takahe-a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7.tar.gz takahe-a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7.tar.bz2 takahe-a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7.zip |
Queuing system and lazy profile fetch
Diffstat (limited to 'miniq/views.py')
-rw-r--r-- | miniq/views.py | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/miniq/views.py b/miniq/views.py new file mode 100644 index 0000000..12da7cd --- /dev/null +++ b/miniq/views.py @@ -0,0 +1,68 @@ +import asyncio +import time +import traceback +import uuid + +from asgiref.sync import sync_to_async +from django.http import HttpResponse +from django.views import View + +from miniq.models import Task +from users.models import Identity + + +class QueueProcessor(View): + """ + A view that takes some items off the queue and processes them. + Tries to limit its own runtime so it's within HTTP timeout limits. + """ + + START_TIMEOUT = 30 + TOTAL_TIMEOUT = 60 + MAX_TASKS = 10 + + async def get(self, request): + start_time = time.monotonic() + processor_id = uuid.uuid4().hex + handled = 0 + self.tasks = [] + # For the first time period, launch tasks + while (time.monotonic() - start_time) < self.START_TIMEOUT: + # Remove completed tasks + self.tasks = [t for t in self.tasks if not t.done()] + # See if there's a new task + if len(self.tasks) < self.MAX_TASKS: + # Pop a task off the queue and run it + task = await sync_to_async(Task.get_one_available)(processor_id) + if task is not None: + self.tasks.append(asyncio.create_task(self.run_task(task))) + handled += 1 + # Prevent busylooping + await asyncio.sleep(0.01) + # Then wait for tasks to finish + while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT: + # Remove completed tasks + self.tasks = [t for t in self.tasks if not t.done()] + if not self.tasks: + break + # Prevent busylooping + await asyncio.sleep(1) + return HttpResponse(f"{handled} tasks handled") + + async def run_task(self, task): + try: + print(f"Task {task}: Starting") + handler = getattr(self, f"handle_{task.type}", None) + if handler is None: + raise ValueError(f"Cannot handle type {task.type}") + await handler(task.subject, task.payload) + await task.complete() + print(f"Task {task}: Complete") + except BaseException as e: + print(f"Task {task}: Error {e}") + traceback.print_exc() + await task.fail(f"{e}\n\n" + traceback.format_exc()) + + async def handle_identity_fetch(self, subject, payload): + identity = await sync_to_async(Identity.by_handle)(subject) + await identity.fetch_details() |