From fc8a21fc5c6809ea115092eeec57e09e984cdd76 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sun, 11 Dec 2022 11:22:06 -0700 Subject: More API read coverage --- activities/models/hashtag.py | 7 ++ activities/models/post.py | 2 +- activities/models/timeline_event.py | 25 +++++++ activities/search.py | 136 ++++++++++++++++++++++++++++++++++++ activities/views/search.py | 128 +-------------------------------- 5 files changed, 172 insertions(+), 126 deletions(-) create mode 100644 activities/search.py (limited to 'activities') diff --git a/activities/models/hashtag.py b/activities/models/hashtag.py index 162f8b4..4e1a735 100644 --- a/activities/models/hashtag.py +++ b/activities/models/hashtag.py @@ -185,3 +185,10 @@ class Hashtag(StatorModel): return f'#{hashtag}' return mark_safe(Hashtag.hashtag_regex.sub(replacer, content)) + + def to_mastodon_json(self): + return { + "name": self.hashtag, + "url": self.urls.view.full(), + "history": [], + } diff --git a/activities/models/post.py b/activities/models/post.py index 1e372c2..16e798c 100644 --- a/activities/models/post.py +++ b/activities/models/post.py @@ -262,7 +262,7 @@ class Post(StatorModel): r"(^|[^\w\d\-_])@([\w\d\-_]+(?:@[\w\d\-_]+\.[\w\d\-_\.]+)?)" ) - def linkify_mentions(self, content, local=False): + def linkify_mentions(self, content: str, local: bool = False) -> str: """ Links mentions _in the context of the post_ - as in, using the mentions property as the only source (as we might be doing this without other diff --git a/activities/models/timeline_event.py b/activities/models/timeline_event.py index e598e3f..30d473d 100644 --- a/activities/models/timeline_event.py +++ b/activities/models/timeline_event.py @@ -1,5 +1,7 @@ from django.db import models +from core.ld import format_ld_date + class TimelineEvent(models.Model): """ @@ -143,3 +145,26 @@ class TimelineEvent(models.Model): subject_post_id=interaction.post_id, subject_identity_id=interaction.identity_id, ).delete() + + ### Mastodon Client API ### + + def to_mastodon_notification_json(self): + result = { + "id": self.pk, + "created_at": format_ld_date(self.created), + "account": self.subject_identity.to_mastodon_json(), + } + if self.type == self.Types.liked: + result["type"] = "favourite" + result["status"] = self.subject_post.to_mastodon_json() + elif self.type == self.Types.boosted: + result["type"] = "reblog" + result["status"] = self.subject_post.to_mastodon_json() + elif self.type == self.Types.mentioned: + result["type"] = "mention" + result["status"] = self.subject_post.to_mastodon_json() + elif self.type == self.Types.followed: + result["type"] = "follow" + else: + raise ValueError(f"Cannot convert {self.type} to notification JSON") + return result diff --git a/activities/search.py b/activities/search.py new file mode 100644 index 0000000..e192b94 --- /dev/null +++ b/activities/search.py @@ -0,0 +1,136 @@ +import httpx +from asgiref.sync import async_to_sync + +from activities.models import Hashtag, Post +from core.ld import canonicalise +from users.models import Domain, Identity, IdentityStates +from users.models.system_actor import SystemActor + + +class Searcher: + """ + Captures the logic needed to search - reused in the UI and API + """ + + def __init__(self, query: str, identity: Identity | None): + self.query = query.strip().lower() + self.identity = identity + + def search_identities_handle(self) -> set[Identity]: + """ + Searches for identities by their handles + """ + + # Short circuit if it's obviously not for us + if "://" in self.query: + return set() + + # Try to fetch the user by handle + handle = self.query.lstrip("@") + results: set[Identity] = set() + if "@" in handle: + username, domain = handle.split("@", 1) + + # Resolve the domain to the display domain + domain_instance = Domain.get_domain(domain) + try: + if domain_instance is None: + raise Identity.DoesNotExist() + identity = Identity.objects.get( + domain=domain_instance, username=username + ) + except Identity.DoesNotExist: + if self.identity is not None: + # Allow authenticated users to fetch remote + identity = Identity.by_username_and_domain( + username, domain, fetch=True + ) + if identity and identity.state == IdentityStates.outdated: + async_to_sync(identity.fetch_actor)() + else: + identity = None + if identity: + results.add(identity) + + else: + for identity in Identity.objects.filter(username=handle)[:20]: + results.add(identity) + for identity in Identity.objects.filter(username__startswith=handle)[:20]: + results.add(identity) + return results + + def search_url(self) -> Post | Identity | None: + """ + Searches for an identity or post by URL. + """ + + # Short circuit if it's obviously not for us + if "://" not in self.query: + return None + + # Fetch the provided URL as the system actor to retrieve the AP JSON + try: + response = async_to_sync(SystemActor().signed_request)( + method="get", + uri=self.query, + ) + except (httpx.RequestError, httpx.ConnectError): + return None + if response.status_code >= 400: + return None + document = canonicalise(response.json(), include_security=True) + type = document.get("type", "unknown").lower() + + # Is it an identity? + if type == "person": + # Try and retrieve the profile by actor URI + identity = Identity.by_actor_uri(document["id"], create=True) + if identity and identity.state == IdentityStates.outdated: + async_to_sync(identity.fetch_actor)() + return identity + + # Is it a post? + elif type == "note": + # Try and retrieve the post by URI + # (we do not trust the JSON we just got - fetch from source!) + try: + return Post.by_object_uri(document["id"], fetch=True) + except Post.DoesNotExist: + return None + + # Dunno what it is + else: + return None + + def search_hashtags(self) -> set[Hashtag]: + """ + Searches for hashtags by their name + """ + + # Short circuit out if it's obviously not a hashtag + if "@" in self.query or "://" in self.query: + return set() + + results: set[Hashtag] = set() + name = self.query.lstrip("#") + for hashtag in Hashtag.objects.public().hashtag_or_alias(name)[:10]: + results.add(hashtag) + for hashtag in Hashtag.objects.public().filter(hashtag__startswith=name)[:10]: + results.add(hashtag) + return results + + def search_all(self): + """ + Returns all possible results for a search + """ + results = { + "identities": self.search_identities_handle(), + "hashtags": self.search_hashtags(), + "posts": set(), + } + url_result = self.search_url() + if isinstance(url_result, Identity): + results["identities"].add(url_result) + if isinstance(url_result, Post): + results["posts"].add(url_result) + return results diff --git a/activities/views/search.py b/activities/views/search.py index f7ab237..93c0012 100644 --- a/activities/views/search.py +++ b/activities/views/search.py @@ -1,12 +1,7 @@ -import httpx -from asgiref.sync import async_to_sync from django import forms from django.views.generic import FormView -from activities.models import Hashtag, Post -from core.ld import canonicalise -from users.models import Domain, Identity, IdentityStates -from users.models.system_actor import SystemActor +from activities.search import Searcher class Search(FormView): @@ -19,126 +14,9 @@ class Search(FormView): widget=forms.TextInput(attrs={"type": "search", "autofocus": "autofocus"}), ) - def search_identities_handle(self, query: str): - """ - Searches for identities by their handles - """ - - # Short circuit if it's obviously not for us - if "://" in query: - return set() - - # Try to fetch the user by handle - query = query.lstrip("@") - results: set[Identity] = set() - if "@" in query: - username, domain = query.split("@", 1) - - # Resolve the domain to the display domain - domain_instance = Domain.get_domain(domain) - try: - if domain_instance is None: - raise Identity.DoesNotExist() - identity = Identity.objects.get( - domain=domain_instance, username=username - ) - except Identity.DoesNotExist: - if self.request.identity is not None: - # Allow authenticated users to fetch remote - identity = Identity.by_username_and_domain( - username, domain, fetch=True - ) - if identity and identity.state == IdentityStates.outdated: - async_to_sync(identity.fetch_actor)() - else: - identity = None - if identity: - results.add(identity) - - else: - for identity in Identity.objects.filter(username=query)[:20]: - results.add(identity) - for identity in Identity.objects.filter(username__startswith=query)[:20]: - results.add(identity) - return results - - def search_url(self, query: str) -> Post | Identity | None: - """ - Searches for an identity or post by URL. - """ - - # Short circuit if it's obviously not for us - if "://" not in query: - return None - - # Clean up query - query = query.strip() - - # Fetch the provided URL as the system actor to retrieve the AP JSON - try: - response = async_to_sync(SystemActor().signed_request)( - method="get", uri=query - ) - except (httpx.RequestError, httpx.ConnectError): - return None - if response.status_code >= 400: - return None - document = canonicalise(response.json(), include_security=True) - type = document.get("type", "unknown").lower() - - # Is it an identity? - if type == "person": - # Try and retrieve the profile by actor URI - identity = Identity.by_actor_uri(document["id"], create=True) - if identity and identity.state == IdentityStates.outdated: - async_to_sync(identity.fetch_actor)() - return identity - - # Is it a post? - elif type == "note": - # Try and retrieve the post by URI - # (we do not trust the JSON we just got - fetch from source!) - try: - return Post.by_object_uri(document["id"], fetch=True) - except Post.DoesNotExist: - return None - - # Dunno what it is - else: - return None - - def search_hashtags(self, query: str): - """ - Searches for hashtags by their name - """ - - # Short circuit out if it's obviously not a hashtag - if "@" in query or "://" in query: - return set() - - results: set[Hashtag] = set() - query = query.lstrip("#") - for hashtag in Hashtag.objects.public().hashtag_or_alias(query)[:10]: - results.add(hashtag) - for hashtag in Hashtag.objects.public().filter(hashtag__startswith=query)[:10]: - results.add(hashtag) - return results - def form_valid(self, form): - query = form.cleaned_data["query"].lower() - results = { - "identities": self.search_identities_handle(query), - "hashtags": self.search_hashtags(query), - "posts": set(), - } - - url_result = self.search_url(query) - if isinstance(url_result, Identity): - results["identities"].add(url_result) - if isinstance(url_result, Post): - results["posts"].add(url_result) - + searcher = Searcher(form.cleaned_data["query"], self.request.identity) # Render results context = self.get_context_data(form=form) - context["results"] = results + context["results"] = searcher.search_all() return self.render_to_response(context) -- cgit v1.2.3