summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrew Godwin2022-11-05 22:49:25 -0600
committerAndrew Godwin2022-11-05 22:49:25 -0600
commita2404e01cdbeef2ba332e147a5f2f1ca0a0310d7 (patch)
tree9aa1ae2ffad20a7afe8eac8cbe99ab9a53dcc0ca
parent56de2362a01089c8a5ca3c6e1affcade00ffdfce (diff)
downloadtakahe-a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7.tar.gz
takahe-a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7.tar.bz2
takahe-a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7.zip
Queuing system and lazy profile fetch
-rw-r--r--miniq/__init__.py0
-rw-r--r--miniq/admin.py21
-rw-r--r--miniq/apps.py6
-rw-r--r--miniq/migrations/0001_initial.py37
-rw-r--r--miniq/migrations/__init__.py0
-rw-r--r--miniq/models.py68
-rw-r--r--miniq/views.py68
-rw-r--r--static/css/style.css2
-rw-r--r--takahe/settings.py2
-rw-r--r--takahe/urls.py3
-rw-r--r--templates/identity/view.html14
-rw-r--r--users/models/identity.py26
-rw-r--r--users/views/identity.py5
13 files changed, 246 insertions, 6 deletions
diff --git a/miniq/__init__.py b/miniq/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/miniq/__init__.py
diff --git a/miniq/admin.py b/miniq/admin.py
new file mode 100644
index 0000000..1166f89
--- /dev/null
+++ b/miniq/admin.py
@@ -0,0 +1,21 @@
+from django.contrib import admin
+
+from miniq.models import Task
+
+
+@admin.register(Task)
+class TaskAdmin(admin.ModelAdmin):
+
+ list_display = ["id", "created", "type", "subject", "completed", "failed"]
+ ordering = ["-created"]
+ actions = ["reset"]
+
+ @admin.action(description="Reset Task")
+ def reset(self, request, queryset):
+ queryset.update(
+ failed=None,
+ completed=None,
+ locked=None,
+ locked_by=None,
+ error=None,
+ )
diff --git a/miniq/apps.py b/miniq/apps.py
new file mode 100644
index 0000000..4c7e773
--- /dev/null
+++ b/miniq/apps.py
@@ -0,0 +1,6 @@
+from django.apps import AppConfig
+
+
+class MiniqConfig(AppConfig):
+ default_auto_field = "django.db.models.BigAutoField"
+ name = "miniq"
diff --git a/miniq/migrations/0001_initial.py b/miniq/migrations/0001_initial.py
new file mode 100644
index 0000000..6775ff3
--- /dev/null
+++ b/miniq/migrations/0001_initial.py
@@ -0,0 +1,37 @@
+# Generated by Django 4.1.3 on 2022-11-06 03:59
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ initial = True
+
+ dependencies = []
+
+ operations = [
+ migrations.CreateModel(
+ name="Task",
+ fields=[
+ (
+ "id",
+ models.BigAutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ ("type", models.CharField(max_length=500)),
+ ("priority", models.IntegerField(default=0)),
+ ("subject", models.TextField()),
+ ("payload", models.JSONField(blank=True, null=True)),
+ ("error", models.TextField(blank=True, null=True)),
+ ("created", models.DateTimeField(auto_now_add=True)),
+ ("completed", models.DateTimeField(blank=True, null=True)),
+ ("failed", models.DateTimeField(blank=True, null=True)),
+ ("locked", models.DateTimeField(blank=True, null=True)),
+ ("locked_by", models.CharField(blank=True, max_length=500, null=True)),
+ ],
+ ),
+ ]
diff --git a/miniq/migrations/__init__.py b/miniq/migrations/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/miniq/migrations/__init__.py
diff --git a/miniq/models.py b/miniq/models.py
new file mode 100644
index 0000000..996b482
--- /dev/null
+++ b/miniq/models.py
@@ -0,0 +1,68 @@
+from typing import Optional
+
+from django.db import models, transaction
+from django.utils import timezone
+
+
+class Task(models.Model):
+ """
+ A task that must be done by a queue processor
+ """
+
+ class TypeChoices(models.TextChoices):
+ identity_fetch = "identity_fetch"
+
+ type = models.CharField(max_length=500, choices=TypeChoices.choices)
+ priority = models.IntegerField(default=0)
+ subject = models.TextField()
+ payload = models.JSONField(blank=True, null=True)
+ error = models.TextField(blank=True, null=True)
+
+ created = models.DateTimeField(auto_now_add=True)
+ completed = models.DateTimeField(blank=True, null=True)
+ failed = models.DateTimeField(blank=True, null=True)
+ locked = models.DateTimeField(blank=True, null=True)
+ locked_by = models.CharField(max_length=500, blank=True, null=True)
+
+ def __str__(self):
+ return f"{self.id}/{self.type}({self.subject})"
+
+ @classmethod
+ def get_one_available(cls, processor_id) -> Optional["Task"]:
+ """
+ Gets one task off the list while reserving it, atomically.
+ """
+ with transaction.atomic():
+ next_task = cls.objects.filter(locked__isnull=True).first()
+ if next_task is None:
+ return None
+ next_task.locked = timezone.now()
+ next_task.locked_by = processor_id
+ next_task.save()
+ return next_task
+
+ @classmethod
+ def submit(cls, type, subject, payload=None, deduplicate=True):
+ # Deduplication is done against tasks that have not started yet only,
+ # and only on tasks without payloads
+ if deduplicate and not payload:
+ if cls.objects.filter(
+ type=type,
+ subject=subject,
+ completed__isnull=True,
+ failed__isnull=True,
+ locked__isnull=True,
+ ).exists():
+ return
+ cls.objects.create(type=type, subject=subject, payload=payload)
+
+ async def complete(self):
+ await self.__class__.objects.filter(id=self.id).aupdate(
+ completed=timezone.now()
+ )
+
+ async def fail(self, error):
+ await self.__class__.objects.filter(id=self.id).aupdate(
+ failed=timezone.now(),
+ error=error,
+ )
diff --git a/miniq/views.py b/miniq/views.py
new file mode 100644
index 0000000..12da7cd
--- /dev/null
+++ b/miniq/views.py
@@ -0,0 +1,68 @@
+import asyncio
+import time
+import traceback
+import uuid
+
+from asgiref.sync import sync_to_async
+from django.http import HttpResponse
+from django.views import View
+
+from miniq.models import Task
+from users.models import Identity
+
+
+class QueueProcessor(View):
+ """
+ A view that takes some items off the queue and processes them.
+ Tries to limit its own runtime so it's within HTTP timeout limits.
+ """
+
+ START_TIMEOUT = 30
+ TOTAL_TIMEOUT = 60
+ MAX_TASKS = 10
+
+ async def get(self, request):
+ start_time = time.monotonic()
+ processor_id = uuid.uuid4().hex
+ handled = 0
+ self.tasks = []
+ # For the first time period, launch tasks
+ while (time.monotonic() - start_time) < self.START_TIMEOUT:
+ # Remove completed tasks
+ self.tasks = [t for t in self.tasks if not t.done()]
+ # See if there's a new task
+ if len(self.tasks) < self.MAX_TASKS:
+ # Pop a task off the queue and run it
+ task = await sync_to_async(Task.get_one_available)(processor_id)
+ if task is not None:
+ self.tasks.append(asyncio.create_task(self.run_task(task)))
+ handled += 1
+ # Prevent busylooping
+ await asyncio.sleep(0.01)
+ # Then wait for tasks to finish
+ while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
+ # Remove completed tasks
+ self.tasks = [t for t in self.tasks if not t.done()]
+ if not self.tasks:
+ break
+ # Prevent busylooping
+ await asyncio.sleep(1)
+ return HttpResponse(f"{handled} tasks handled")
+
+ async def run_task(self, task):
+ try:
+ print(f"Task {task}: Starting")
+ handler = getattr(self, f"handle_{task.type}", None)
+ if handler is None:
+ raise ValueError(f"Cannot handle type {task.type}")
+ await handler(task.subject, task.payload)
+ await task.complete()
+ print(f"Task {task}: Complete")
+ except BaseException as e:
+ print(f"Task {task}: Error {e}")
+ traceback.print_exc()
+ await task.fail(f"{e}\n\n" + traceback.format_exc())
+
+ async def handle_identity_fetch(self, subject, payload):
+ identity = await sync_to_async(Identity.by_handle)(subject)
+ await identity.fetch_details()
diff --git a/static/css/style.css b/static/css/style.css
index 69eaa44..7a3b20a 100644
--- a/static/css/style.css
+++ b/static/css/style.css
@@ -100,7 +100,7 @@ header h1 {
font-family: "Raleway";
font-weight: normal;
background: var(--color-fg2);
- padding: 10px 7px 7px 7px;
+ padding: 10px 7px 7px 0;
font-size: 130%;
height: 2.2em;
color: var(--color-fg1);
diff --git a/takahe/settings.py b/takahe/settings.py
index 3e2b75a..26fd705 100644
--- a/takahe/settings.py
+++ b/takahe/settings.py
@@ -25,6 +25,7 @@ INSTALLED_APPS = [
"core",
"statuses",
"users",
+ "miniq",
]
MIDDLEWARE = [
@@ -113,3 +114,4 @@ CRISPY_FAIL_SILENTLY = not DEBUG
SITE_NAME = "takahē"
DEFAULT_DOMAIN = "feditest.aeracode.org"
ALLOWED_DOMAINS = ["feditest.aeracode.org"]
+IDENTITY_MAX_AGE = 24 * 60 * 60
diff --git a/takahe/urls.py b/takahe/urls.py
index d6e4d8f..f8bff07 100644
--- a/takahe/urls.py
+++ b/takahe/urls.py
@@ -2,6 +2,7 @@ from django.contrib import admin
from django.urls import path
from core import views as core
+from miniq import views as miniq
from users.views import auth, identity
urlpatterns = [
@@ -19,6 +20,8 @@ urlpatterns = [
path("identity/create/", identity.CreateIdentity.as_view()),
# Well-known endpoints
path(".well-known/webfinger", identity.Webfinger.as_view()),
+ # Task runner
+ path(".queue/process/", miniq.QueueProcessor.as_view()),
# Django admin
path("djadmin/", admin.site.urls),
]
diff --git a/templates/identity/view.html b/templates/identity/view.html
index deb7ae5..2a82478 100644
--- a/templates/identity/view.html
+++ b/templates/identity/view.html
@@ -14,10 +14,16 @@
</h1>
{% if not identity.local %}
- <p class="system-note">
- This user is a member of another server.
- <a href="{{ identity.profile_uri }}">See their original profile</a>
- </p>
+ {% if not identity.actor_uri %}
+ <p class="system-note">
+ The system is still fetching this profile. Refresh to see updates.
+ </p>
+ {% else %}
+ <p class="system-note">
+ This is a member of another server.
+ <a href="{{ identity.profile_uri }}">See their original profile</a>
+ </p>
+ {% endif %}
{% endif %}
{% for status in statuses %}
diff --git a/users/models/identity.py b/users/models/identity.py
index c20ef60..5586f27 100644
--- a/users/models/identity.py
+++ b/users/models/identity.py
@@ -59,6 +59,19 @@ class Identity(models.Model):
fetched = models.DateTimeField(null=True, blank=True)
deleted = models.DateTimeField(null=True, blank=True)
+ @classmethod
+ def by_handle(cls, handle, create=True):
+ if handle.startswith("@"):
+ raise ValueError("Handle must not start with @")
+ if "@" not in handle:
+ raise ValueError("Handle must contain domain")
+ try:
+ return cls.objects.filter(handle=handle).get()
+ except cls.DoesNotExist:
+ if create:
+ return cls.objects.create(handle=handle, local=False)
+ return None
+
@property
def short_handle(self):
if self.handle.endswith("@" + settings.DEFAULT_DOMAIN):
@@ -69,6 +82,17 @@ class Identity(models.Model):
def domain(self):
return self.handle.split("@", 1)[1]
+ @property
+ def data_age(self) -> float:
+ """
+ How old our copy of this data is, in seconds
+ """
+ if self.local:
+ return 0
+ if self.fetched is None:
+ return 10000000000
+ return (timezone.now() - self.fetched).total_seconds()
+
def generate_keypair(self):
private_key = rsa.generate_private_key(
public_exponent=65537,
@@ -104,6 +128,7 @@ class Identity(models.Model):
response = await client.get(
f"https://{self.domain}/.well-known/webfinger?resource=acct:{self.handle}",
headers={"Accept": "application/json"},
+ follow_redirects=True,
)
if response.status_code >= 400:
return False
@@ -126,6 +151,7 @@ class Identity(models.Model):
response = await client.get(
self.actor_uri,
headers={"Accept": "application/json"},
+ follow_redirects=True,
)
if response.status_code >= 400:
return False
diff --git a/users/views/identity.py b/users/views/identity.py
index fff521b..456fead 100644
--- a/users/views/identity.py
+++ b/users/views/identity.py
@@ -10,6 +10,7 @@ from django.views.decorators.csrf import csrf_exempt
from django.views.generic import FormView, TemplateView, View
from core.forms import FormHelper
+from miniq.models import Task
from users.models import Identity
from users.shortcuts import by_handle_or_404
@@ -19,8 +20,10 @@ class ViewIdentity(TemplateView):
template_name = "identity/view.html"
def get_context_data(self, handle):
- identity = by_handle_or_404(self.request, handle, local=False)
+ identity = Identity.by_handle(handle=handle)
statuses = identity.statuses.all()[:100]
+ if identity.data_age > settings.IDENTITY_MAX_AGE:
+ Task.submit("identity_fetch", identity.handle)
return {
"identity": identity,
"statuses": statuses,