diff options
author | Andrew Godwin | 2022-11-08 23:06:29 -0700 |
---|---|---|
committer | Andrew Godwin | 2022-11-09 22:29:49 -0700 |
commit | 61c324508e62bb640b4526183d0837fc57d742c2 (patch) | |
tree | 618ee8c88ce8a28224a187dc33b7c5fad6831d04 /miniq/views.py | |
parent | 8a0a7558894afce8d25b7f0dc16775e899b72a94 (diff) | |
download | takahe-61c324508e62bb640b4526183d0837fc57d742c2.tar.gz takahe-61c324508e62bb640b4526183d0837fc57d742c2.tar.bz2 takahe-61c324508e62bb640b4526183d0837fc57d742c2.zip |
Midway point in task refactor - changing direction
Diffstat (limited to 'miniq/views.py')
-rw-r--r-- | miniq/views.py | 51 |
1 files changed, 0 insertions, 51 deletions
diff --git a/miniq/views.py b/miniq/views.py deleted file mode 100644 index 80c9ee2..0000000 --- a/miniq/views.py +++ /dev/null @@ -1,51 +0,0 @@ -import asyncio -import time -import uuid - -from asgiref.sync import sync_to_async -from django.http import HttpResponse -from django.views import View - -from miniq.models import Task -from miniq.tasks import TaskHandler - - -class QueueProcessor(View): - """ - A view that takes some items off the queue and processes them. - Tries to limit its own runtime so it's within HTTP timeout limits. - """ - - START_TIMEOUT = 30 - TOTAL_TIMEOUT = 60 - LOCK_TIMEOUT = 200 - MAX_TASKS = 20 - - async def get(self, request): - start_time = time.monotonic() - processor_id = uuid.uuid4().hex - handled = 0 - self.tasks = [] - # For the first time period, launch tasks - while (time.monotonic() - start_time) < self.START_TIMEOUT: - # Remove completed tasks - self.tasks = [t for t in self.tasks if not t.done()] - # See if there's a new task - if len(self.tasks) < self.MAX_TASKS: - # Pop a task off the queue and run it - task = await sync_to_async(Task.get_one_available)(processor_id) - if task is not None: - self.tasks.append(asyncio.create_task(TaskHandler(task).handle())) - handled += 1 - # Prevent busylooping - await asyncio.sleep(0.01) - # TODO: Clean up old locks here - # Then wait for tasks to finish - while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT: - # Remove completed tasks - self.tasks = [t for t in self.tasks if not t.done()] - if not self.tasks: - break - # Prevent busylooping - await asyncio.sleep(1) - return HttpResponse(f"{handled} tasks handled") |