summaryrefslogtreecommitdiffstats
path: root/miniq/views.py
blob: 21275f83312dadeb431fc645332643530c555a7d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import asyncio
import time
import traceback
import uuid

from asgiref.sync import sync_to_async
from django.http import HttpResponse
from django.views import View

from miniq.models import Task
from users.models import Identity


class QueueProcessor(View):
    """
    A view that takes some items off the queue and processes them.
    Tries to limit its own runtime so it's within HTTP timeout limits.
    """

    START_TIMEOUT = 30
    TOTAL_TIMEOUT = 60
    MAX_TASKS = 10

    async def get(self, request):
        start_time = time.monotonic()
        processor_id = uuid.uuid4().hex
        handled = 0
        self.tasks = []
        # For the first time period, launch tasks
        while (time.monotonic() - start_time) < self.START_TIMEOUT:
            # Remove completed tasks
            self.tasks = [t for t in self.tasks if not t.done()]
            # See if there's a new task
            if len(self.tasks) < self.MAX_TASKS:
                # Pop a task off the queue and run it
                task = await sync_to_async(Task.get_one_available)(processor_id)
                if task is not None:
                    self.tasks.append(asyncio.create_task(self.run_task(task)))
                    handled += 1
            # Prevent busylooping
            await asyncio.sleep(0.01)
        # Then wait for tasks to finish
        while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
            # Remove completed tasks
            self.tasks = [t for t in self.tasks if not t.done()]
            if not self.tasks:
                break
            # Prevent busylooping
            await asyncio.sleep(1)
        return HttpResponse(f"{handled} tasks handled")

    async def run_task(self, task):
        try:
            print(f"Task {task}: Starting")
            handler = getattr(self, f"handle_{task.type}", None)
            if handler is None:
                raise ValueError(f"Cannot handle type {task.type}")
            await handler(task.subject, task.payload)
            await task.complete()
            print(f"Task {task}: Complete")
        except BaseException as e:
            print(f"Task {task}: Error {e}")
            traceback.print_exc()
            await task.fail(f"{e}\n\n" + traceback.format_exc())

    async def handle_identity_fetch(self, subject, payload):
        # Get the actor URI via webfinger
        actor_uri, handle = await Identity.fetch_webfinger(subject)
        # Get or create the identity, then fetch
        identity = await sync_to_async(Identity.by_actor_uri)(actor_uri, create=True)
        await identity.fetch_actor()