summaryrefslogtreecommitdiffstats
path: root/miniq
diff options
context:
space:
mode:
authorAndrew Godwin2022-11-06 21:30:07 -0700
committerAndrew Godwin2022-11-06 21:30:07 -0700
commitfb6c409a9af5b8a686e977ee2251c359071e0ec3 (patch)
tree322469694585b766eb623b47955623b3e43c89b1 /miniq
parent0d5f7e7a891bec4b8af26c2d86d8be0209a3202d (diff)
downloadtakahe-fb6c409a9af5b8a686e977ee2251c359071e0ec3.tar.gz
takahe-fb6c409a9af5b8a686e977ee2251c359071e0ec3.tar.bz2
takahe-fb6c409a9af5b8a686e977ee2251c359071e0ec3.zip
Rework task system and fetching.
I can taste how close follow is to working.
Diffstat (limited to 'miniq')
-rw-r--r--miniq/migrations/0001_initial.py10
-rw-r--r--miniq/models.py5
-rw-r--r--miniq/tasks.py34
-rw-r--r--miniq/views.py30
4 files changed, 51 insertions, 28 deletions
diff --git a/miniq/migrations/0001_initial.py b/miniq/migrations/0001_initial.py
index 32c5d53..dc6d42b 100644
--- a/miniq/migrations/0001_initial.py
+++ b/miniq/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 4.1.3 on 2022-11-06 19:58
+# Generated by Django 4.1.3 on 2022-11-07 04:19
from django.db import migrations, models
@@ -25,7 +25,13 @@ class Migration(migrations.Migration):
(
"type",
models.CharField(
- choices=[("identity_fetch", "Identity Fetch")], max_length=500
+ choices=[
+ ("identity_fetch", "Identity Fetch"),
+ ("inbox_item", "Inbox Item"),
+ ("follow_request", "Follow Request"),
+ ("follow_acknowledge", "Follow Acknowledge"),
+ ],
+ max_length=500,
),
),
("priority", models.IntegerField(default=0)),
diff --git a/miniq/models.py b/miniq/models.py
index 996b482..24d311c 100644
--- a/miniq/models.py
+++ b/miniq/models.py
@@ -11,6 +11,9 @@ class Task(models.Model):
class TypeChoices(models.TextChoices):
identity_fetch = "identity_fetch"
+ inbox_item = "inbox_item"
+ follow_request = "follow_request"
+ follow_acknowledge = "follow_acknowledge"
type = models.CharField(max_length=500, choices=TypeChoices.choices)
priority = models.IntegerField(default=0)
@@ -42,7 +45,7 @@ class Task(models.Model):
return next_task
@classmethod
- def submit(cls, type, subject, payload=None, deduplicate=True):
+ def submit(cls, type, subject: str, payload=None, deduplicate=True):
# Deduplication is done against tasks that have not started yet only,
# and only on tasks without payloads
if deduplicate and not payload:
diff --git a/miniq/tasks.py b/miniq/tasks.py
new file mode 100644
index 0000000..fedf8fd
--- /dev/null
+++ b/miniq/tasks.py
@@ -0,0 +1,34 @@
+import traceback
+
+from users.tasks.follow import handle_follow_request
+from users.tasks.identity import handle_identity_fetch
+from users.tasks.inbox import handle_inbox_item
+
+
+class TaskHandler:
+
+ handlers = {
+ "identity_fetch": handle_identity_fetch,
+ "inbox_item": handle_inbox_item,
+ "follow_request": handle_follow_request,
+ }
+
+ def __init__(self, task):
+ self.task = task
+ self.subject = self.task.subject
+ self.payload = self.task.payload
+
+ async def handle(self):
+ try:
+ print(f"Task {self.task}: Starting")
+ if self.task.type not in self.handlers:
+ raise ValueError(f"Cannot handle type {self.task.type}")
+ await self.handlers[self.task.type](
+ self,
+ )
+ await self.task.complete()
+ print(f"Task {self.task}: Complete")
+ except BaseException as e:
+ print(f"Task {self.task}: Error {e}")
+ traceback.print_exc()
+ await self.task.fail(f"{e}\n\n" + traceback.format_exc())
diff --git a/miniq/views.py b/miniq/views.py
index 21275f8..80c9ee2 100644
--- a/miniq/views.py
+++ b/miniq/views.py
@@ -1,6 +1,5 @@
import asyncio
import time
-import traceback
import uuid
from asgiref.sync import sync_to_async
@@ -8,7 +7,7 @@ from django.http import HttpResponse
from django.views import View
from miniq.models import Task
-from users.models import Identity
+from miniq.tasks import TaskHandler
class QueueProcessor(View):
@@ -19,7 +18,8 @@ class QueueProcessor(View):
START_TIMEOUT = 30
TOTAL_TIMEOUT = 60
- MAX_TASKS = 10
+ LOCK_TIMEOUT = 200
+ MAX_TASKS = 20
async def get(self, request):
start_time = time.monotonic()
@@ -35,10 +35,11 @@ class QueueProcessor(View):
# Pop a task off the queue and run it
task = await sync_to_async(Task.get_one_available)(processor_id)
if task is not None:
- self.tasks.append(asyncio.create_task(self.run_task(task)))
+ self.tasks.append(asyncio.create_task(TaskHandler(task).handle()))
handled += 1
# Prevent busylooping
await asyncio.sleep(0.01)
+ # TODO: Clean up old locks here
# Then wait for tasks to finish
while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
# Remove completed tasks
@@ -48,24 +49,3 @@ class QueueProcessor(View):
# Prevent busylooping
await asyncio.sleep(1)
return HttpResponse(f"{handled} tasks handled")
-
- async def run_task(self, task):
- try:
- print(f"Task {task}: Starting")
- handler = getattr(self, f"handle_{task.type}", None)
- if handler is None:
- raise ValueError(f"Cannot handle type {task.type}")
- await handler(task.subject, task.payload)
- await task.complete()
- print(f"Task {task}: Complete")
- except BaseException as e:
- print(f"Task {task}: Error {e}")
- traceback.print_exc()
- await task.fail(f"{e}\n\n" + traceback.format_exc())
-
- async def handle_identity_fetch(self, subject, payload):
- # Get the actor URI via webfinger
- actor_uri, handle = await Identity.fetch_webfinger(subject)
- # Get or create the identity, then fetch
- identity = await sync_to_async(Identity.by_actor_uri)(actor_uri, create=True)
- await identity.fetch_actor()