diff options
author | Andrew Godwin | 2022-11-05 22:49:25 -0600 |
---|---|---|
committer | Andrew Godwin | 2022-11-05 22:49:25 -0600 |
commit | a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7 (patch) | |
tree | 9aa1ae2ffad20a7afe8eac8cbe99ab9a53dcc0ca | |
parent | 56de2362a01089c8a5ca3c6e1affcade00ffdfce (diff) | |
download | takahe-a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7.tar.gz takahe-a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7.tar.bz2 takahe-a2404e01cdbeef2ba332e147a5f2f1ca0a0310d7.zip |
Queuing system and lazy profile fetch
-rw-r--r-- | miniq/__init__.py | 0 | ||||
-rw-r--r-- | miniq/admin.py | 21 | ||||
-rw-r--r-- | miniq/apps.py | 6 | ||||
-rw-r--r-- | miniq/migrations/0001_initial.py | 37 | ||||
-rw-r--r-- | miniq/migrations/__init__.py | 0 | ||||
-rw-r--r-- | miniq/models.py | 68 | ||||
-rw-r--r-- | miniq/views.py | 68 | ||||
-rw-r--r-- | static/css/style.css | 2 | ||||
-rw-r--r-- | takahe/settings.py | 2 | ||||
-rw-r--r-- | takahe/urls.py | 3 | ||||
-rw-r--r-- | templates/identity/view.html | 14 | ||||
-rw-r--r-- | users/models/identity.py | 26 | ||||
-rw-r--r-- | users/views/identity.py | 5 |
13 files changed, 246 insertions, 6 deletions
diff --git a/miniq/__init__.py b/miniq/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/miniq/__init__.py diff --git a/miniq/admin.py b/miniq/admin.py new file mode 100644 index 0000000..1166f89 --- /dev/null +++ b/miniq/admin.py @@ -0,0 +1,21 @@ +from django.contrib import admin + +from miniq.models import Task + + +@admin.register(Task) +class TaskAdmin(admin.ModelAdmin): + + list_display = ["id", "created", "type", "subject", "completed", "failed"] + ordering = ["-created"] + actions = ["reset"] + + @admin.action(description="Reset Task") + def reset(self, request, queryset): + queryset.update( + failed=None, + completed=None, + locked=None, + locked_by=None, + error=None, + ) diff --git a/miniq/apps.py b/miniq/apps.py new file mode 100644 index 0000000..4c7e773 --- /dev/null +++ b/miniq/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class MiniqConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "miniq" diff --git a/miniq/migrations/0001_initial.py b/miniq/migrations/0001_initial.py new file mode 100644 index 0000000..6775ff3 --- /dev/null +++ b/miniq/migrations/0001_initial.py @@ -0,0 +1,37 @@ +# Generated by Django 4.1.3 on 2022-11-06 03:59 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="Task", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("type", models.CharField(max_length=500)), + ("priority", models.IntegerField(default=0)), + ("subject", models.TextField()), + ("payload", models.JSONField(blank=True, null=True)), + ("error", models.TextField(blank=True, null=True)), + ("created", models.DateTimeField(auto_now_add=True)), + ("completed", models.DateTimeField(blank=True, null=True)), + ("failed", models.DateTimeField(blank=True, null=True)), + ("locked", models.DateTimeField(blank=True, null=True)), + ("locked_by", models.CharField(blank=True, max_length=500, null=True)), + ], + ), + ] diff --git a/miniq/migrations/__init__.py b/miniq/migrations/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/miniq/migrations/__init__.py diff --git a/miniq/models.py b/miniq/models.py new file mode 100644 index 0000000..996b482 --- /dev/null +++ b/miniq/models.py @@ -0,0 +1,68 @@ +from typing import Optional + +from django.db import models, transaction +from django.utils import timezone + + +class Task(models.Model): + """ + A task that must be done by a queue processor + """ + + class TypeChoices(models.TextChoices): + identity_fetch = "identity_fetch" + + type = models.CharField(max_length=500, choices=TypeChoices.choices) + priority = models.IntegerField(default=0) + subject = models.TextField() + payload = models.JSONField(blank=True, null=True) + error = models.TextField(blank=True, null=True) + + created = models.DateTimeField(auto_now_add=True) + completed = models.DateTimeField(blank=True, null=True) + failed = models.DateTimeField(blank=True, null=True) + locked = models.DateTimeField(blank=True, null=True) + locked_by = models.CharField(max_length=500, blank=True, null=True) + + def __str__(self): + return f"{self.id}/{self.type}({self.subject})" + + @classmethod + def get_one_available(cls, processor_id) -> Optional["Task"]: + """ + Gets one task off the list while reserving it, atomically. + """ + with transaction.atomic(): + next_task = cls.objects.filter(locked__isnull=True).first() + if next_task is None: + return None + next_task.locked = timezone.now() + next_task.locked_by = processor_id + next_task.save() + return next_task + + @classmethod + def submit(cls, type, subject, payload=None, deduplicate=True): + # Deduplication is done against tasks that have not started yet only, + # and only on tasks without payloads + if deduplicate and not payload: + if cls.objects.filter( + type=type, + subject=subject, + completed__isnull=True, + failed__isnull=True, + locked__isnull=True, + ).exists(): + return + cls.objects.create(type=type, subject=subject, payload=payload) + + async def complete(self): + await self.__class__.objects.filter(id=self.id).aupdate( + completed=timezone.now() + ) + + async def fail(self, error): + await self.__class__.objects.filter(id=self.id).aupdate( + failed=timezone.now(), + error=error, + ) diff --git a/miniq/views.py b/miniq/views.py new file mode 100644 index 0000000..12da7cd --- /dev/null +++ b/miniq/views.py @@ -0,0 +1,68 @@ +import asyncio +import time +import traceback +import uuid + +from asgiref.sync import sync_to_async +from django.http import HttpResponse +from django.views import View + +from miniq.models import Task +from users.models import Identity + + +class QueueProcessor(View): + """ + A view that takes some items off the queue and processes them. + Tries to limit its own runtime so it's within HTTP timeout limits. + """ + + START_TIMEOUT = 30 + TOTAL_TIMEOUT = 60 + MAX_TASKS = 10 + + async def get(self, request): + start_time = time.monotonic() + processor_id = uuid.uuid4().hex + handled = 0 + self.tasks = [] + # For the first time period, launch tasks + while (time.monotonic() - start_time) < self.START_TIMEOUT: + # Remove completed tasks + self.tasks = [t for t in self.tasks if not t.done()] + # See if there's a new task + if len(self.tasks) < self.MAX_TASKS: + # Pop a task off the queue and run it + task = await sync_to_async(Task.get_one_available)(processor_id) + if task is not None: + self.tasks.append(asyncio.create_task(self.run_task(task))) + handled += 1 + # Prevent busylooping + await asyncio.sleep(0.01) + # Then wait for tasks to finish + while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT: + # Remove completed tasks + self.tasks = [t for t in self.tasks if not t.done()] + if not self.tasks: + break + # Prevent busylooping + await asyncio.sleep(1) + return HttpResponse(f"{handled} tasks handled") + + async def run_task(self, task): + try: + print(f"Task {task}: Starting") + handler = getattr(self, f"handle_{task.type}", None) + if handler is None: + raise ValueError(f"Cannot handle type {task.type}") + await handler(task.subject, task.payload) + await task.complete() + print(f"Task {task}: Complete") + except BaseException as e: + print(f"Task {task}: Error {e}") + traceback.print_exc() + await task.fail(f"{e}\n\n" + traceback.format_exc()) + + async def handle_identity_fetch(self, subject, payload): + identity = await sync_to_async(Identity.by_handle)(subject) + await identity.fetch_details() diff --git a/static/css/style.css b/static/css/style.css index 69eaa44..7a3b20a 100644 --- a/static/css/style.css +++ b/static/css/style.css @@ -100,7 +100,7 @@ header h1 { font-family: "Raleway"; font-weight: normal; background: var(--color-fg2); - padding: 10px 7px 7px 7px; + padding: 10px 7px 7px 0; font-size: 130%; height: 2.2em; color: var(--color-fg1); diff --git a/takahe/settings.py b/takahe/settings.py index 3e2b75a..26fd705 100644 --- a/takahe/settings.py +++ b/takahe/settings.py @@ -25,6 +25,7 @@ INSTALLED_APPS = [ "core", "statuses", "users", + "miniq", ] MIDDLEWARE = [ @@ -113,3 +114,4 @@ CRISPY_FAIL_SILENTLY = not DEBUG SITE_NAME = "takahē" DEFAULT_DOMAIN = "feditest.aeracode.org" ALLOWED_DOMAINS = ["feditest.aeracode.org"] +IDENTITY_MAX_AGE = 24 * 60 * 60 diff --git a/takahe/urls.py b/takahe/urls.py index d6e4d8f..f8bff07 100644 --- a/takahe/urls.py +++ b/takahe/urls.py @@ -2,6 +2,7 @@ from django.contrib import admin from django.urls import path from core import views as core +from miniq import views as miniq from users.views import auth, identity urlpatterns = [ @@ -19,6 +20,8 @@ urlpatterns = [ path("identity/create/", identity.CreateIdentity.as_view()), # Well-known endpoints path(".well-known/webfinger", identity.Webfinger.as_view()), + # Task runner + path(".queue/process/", miniq.QueueProcessor.as_view()), # Django admin path("djadmin/", admin.site.urls), ] diff --git a/templates/identity/view.html b/templates/identity/view.html index deb7ae5..2a82478 100644 --- a/templates/identity/view.html +++ b/templates/identity/view.html @@ -14,10 +14,16 @@ </h1> {% if not identity.local %} - <p class="system-note"> - This user is a member of another server. - <a href="{{ identity.profile_uri }}">See their original profile</a> - </p> + {% if not identity.actor_uri %} + <p class="system-note"> + The system is still fetching this profile. Refresh to see updates. + </p> + {% else %} + <p class="system-note"> + This is a member of another server. + <a href="{{ identity.profile_uri }}">See their original profile</a> + </p> + {% endif %} {% endif %} {% for status in statuses %} diff --git a/users/models/identity.py b/users/models/identity.py index c20ef60..5586f27 100644 --- a/users/models/identity.py +++ b/users/models/identity.py @@ -59,6 +59,19 @@ class Identity(models.Model): fetched = models.DateTimeField(null=True, blank=True) deleted = models.DateTimeField(null=True, blank=True) + @classmethod + def by_handle(cls, handle, create=True): + if handle.startswith("@"): + raise ValueError("Handle must not start with @") + if "@" not in handle: + raise ValueError("Handle must contain domain") + try: + return cls.objects.filter(handle=handle).get() + except cls.DoesNotExist: + if create: + return cls.objects.create(handle=handle, local=False) + return None + @property def short_handle(self): if self.handle.endswith("@" + settings.DEFAULT_DOMAIN): @@ -69,6 +82,17 @@ class Identity(models.Model): def domain(self): return self.handle.split("@", 1)[1] + @property + def data_age(self) -> float: + """ + How old our copy of this data is, in seconds + """ + if self.local: + return 0 + if self.fetched is None: + return 10000000000 + return (timezone.now() - self.fetched).total_seconds() + def generate_keypair(self): private_key = rsa.generate_private_key( public_exponent=65537, @@ -104,6 +128,7 @@ class Identity(models.Model): response = await client.get( f"https://{self.domain}/.well-known/webfinger?resource=acct:{self.handle}", headers={"Accept": "application/json"}, + follow_redirects=True, ) if response.status_code >= 400: return False @@ -126,6 +151,7 @@ class Identity(models.Model): response = await client.get( self.actor_uri, headers={"Accept": "application/json"}, + follow_redirects=True, ) if response.status_code >= 400: return False diff --git a/users/views/identity.py b/users/views/identity.py index fff521b..456fead 100644 --- a/users/views/identity.py +++ b/users/views/identity.py @@ -10,6 +10,7 @@ from django.views.decorators.csrf import csrf_exempt from django.views.generic import FormView, TemplateView, View from core.forms import FormHelper +from miniq.models import Task from users.models import Identity from users.shortcuts import by_handle_or_404 @@ -19,8 +20,10 @@ class ViewIdentity(TemplateView): template_name = "identity/view.html" def get_context_data(self, handle): - identity = by_handle_or_404(self.request, handle, local=False) + identity = Identity.by_handle(handle=handle) statuses = identity.statuses.all()[:100] + if identity.data_age > settings.IDENTITY_MAX_AGE: + Task.submit("identity_fetch", identity.handle) return { "identity": identity, "statuses": statuses, |