blob: 80c9ee2a117468d38fbfb1daeb84266c9e1473b1 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
import asyncio
import time
import uuid
from asgiref.sync import sync_to_async
from django.http import HttpResponse
from django.views import View
from miniq.models import Task
from miniq.tasks import TaskHandler
class QueueProcessor(View):
    """
    View that works through a batch of queued tasks in one GET request.

    The request runs in two phases, both timed from the moment the request
    starts: a launch phase that keeps pulling tasks off the queue and
    starting them, then a drain phase that only waits for tasks that were
    already started. Bounding both phases keeps the request's runtime
    within HTTP timeout limits.
    """

    # Seconds (from request start) during which new tasks may be launched.
    START_TIMEOUT = 30
    # Seconds (from request start) after which the view stops waiting and
    # responds; tasks still running then are neither awaited nor cancelled
    # by this view.
    TOTAL_TIMEOUT = 60
    # Not referenced in this class; presumably the age at which a task lock
    # counts as stale (see the TODO in get()) -- confirm against Task.
    LOCK_TIMEOUT = 200
    # Upper bound on the number of unfinished tasks in flight at once.
    MAX_TASKS = 20

    async def get(self, request):
        """
        Launch queued tasks, wait for them, and report how many were started.

        Returns an HttpResponse whose body is "<count> tasks handled", where
        the count is the number of tasks this request launched.
        """
        began = time.monotonic()
        worker_id = uuid.uuid4().hex
        handled = 0
        self.tasks = []

        def elapsed():
            # Time spent in this request so far, on the monotonic clock.
            return time.monotonic() - began

        def unfinished(candidates):
            # New list holding only the asyncio tasks that are not done yet.
            return [job for job in candidates if not job.done()]

        # Phase 1: launch tasks until the launch window closes.
        while elapsed() < self.START_TIMEOUT:
            # Drop finished tasks so they stop counting against MAX_TASKS.
            self.tasks = unfinished(self.tasks)
            if len(self.tasks) < self.MAX_TASKS:
                # Task.get_one_available is synchronous, so wrap it before
                # awaiting it from this coroutine.
                claim_next = sync_to_async(Task.get_one_available)
                queued = await claim_next(worker_id)
                if queued is not None:
                    runner = asyncio.create_task(TaskHandler(queued).handle())
                    self.tasks.append(runner)
                    handled += 1
            # Yield to the event loop so this loop does not spin.
            await asyncio.sleep(0.01)

        # TODO: Clean up old locks here

        # Phase 2: wait for in-flight tasks, up to the overall deadline.
        while elapsed() < self.TOTAL_TIMEOUT:
            self.tasks = unfinished(self.tasks)
            if self.tasks:
                # Something is still running; check again in a second.
                await asyncio.sleep(1)
            else:
                break

        return HttpResponse(f"{handled} tasks handled")
|