diff options
Diffstat (limited to 'activities')
| -rw-r--r-- | activities/models/hashtag.py | 7 | ||||
| -rw-r--r-- | activities/models/post.py | 2 | ||||
| -rw-r--r-- | activities/models/timeline_event.py | 25 | ||||
| -rw-r--r-- | activities/search.py | 136 | ||||
| -rw-r--r-- | activities/views/search.py | 128 | 
5 files changed, 172 insertions, 126 deletions
| diff --git a/activities/models/hashtag.py b/activities/models/hashtag.py index 162f8b4..4e1a735 100644 --- a/activities/models/hashtag.py +++ b/activities/models/hashtag.py @@ -185,3 +185,10 @@ class Hashtag(StatorModel):              return f'<a class="hashtag" href="/tags/{hashtag.lower()}/">#{hashtag}</a>'          return mark_safe(Hashtag.hashtag_regex.sub(replacer, content)) + +    def to_mastodon_json(self): +        return { +            "name": self.hashtag, +            "url": self.urls.view.full(), +            "history": [], +        } diff --git a/activities/models/post.py b/activities/models/post.py index 1e372c2..16e798c 100644 --- a/activities/models/post.py +++ b/activities/models/post.py @@ -262,7 +262,7 @@ class Post(StatorModel):          r"(^|[^\w\d\-_])@([\w\d\-_]+(?:@[\w\d\-_]+\.[\w\d\-_\.]+)?)"      ) -    def linkify_mentions(self, content, local=False): +    def linkify_mentions(self, content: str, local: bool = False) -> str:          """          Links mentions _in the context of the post_ - as in, using the mentions          property as the only source (as we might be doing this without other diff --git a/activities/models/timeline_event.py b/activities/models/timeline_event.py index e598e3f..30d473d 100644 --- a/activities/models/timeline_event.py +++ b/activities/models/timeline_event.py @@ -1,5 +1,7 @@  from django.db import models +from core.ld import format_ld_date +  class TimelineEvent(models.Model):      """ @@ -143,3 +145,26 @@ class TimelineEvent(models.Model):                  subject_post_id=interaction.post_id,                  subject_identity_id=interaction.identity_id,              ).delete() + +    ### Mastodon Client API ### + +    def to_mastodon_notification_json(self): +        result = { +            "id": self.pk, +            "created_at": format_ld_date(self.created), +            "account": self.subject_identity.to_mastodon_json(), +        } +        if self.type == self.Types.liked: +            result["type"] = "favourite" +            result["status"] = self.subject_post.to_mastodon_json() +        elif self.type == self.Types.boosted: +            result["type"] = "reblog" +            result["status"] = self.subject_post.to_mastodon_json() +        elif self.type == self.Types.mentioned: +            result["type"] = "mention" +            result["status"] = self.subject_post.to_mastodon_json() +        elif self.type == self.Types.followed: +            result["type"] = "follow" +        else: +            raise ValueError(f"Cannot convert {self.type} to notification JSON") +        return result diff --git a/activities/search.py b/activities/search.py new file mode 100644 index 0000000..e192b94 --- /dev/null +++ b/activities/search.py @@ -0,0 +1,136 @@ +import httpx +from asgiref.sync import async_to_sync + +from activities.models import Hashtag, Post +from core.ld import canonicalise +from users.models import Domain, Identity, IdentityStates +from users.models.system_actor import SystemActor + + +class Searcher: +    """ +    Captures the logic needed to search - reused in the UI and API +    """ + +    def __init__(self, query: str, identity: Identity | None): +        self.query = query.strip().lower() +        self.identity = identity + +    def search_identities_handle(self) -> set[Identity]: +        """ +        Searches for identities by their handles +        """ + +        # Short circuit if it's obviously not for us +        if "://" in self.query: +            return set() + +        # Try to fetch the user by handle +        handle = self.query.lstrip("@") +        results: set[Identity] = set() +        if "@" in handle: +            username, domain = handle.split("@", 1) + +            # Resolve the domain to the display domain +            domain_instance = Domain.get_domain(domain) +            try: +                if domain_instance is None: +                    raise Identity.DoesNotExist() +                identity = Identity.objects.get( +                    domain=domain_instance, username=username +                ) +            except Identity.DoesNotExist: +                if self.identity is not None: +                    # Allow authenticated users to fetch remote +                    identity = Identity.by_username_and_domain( +                        username, domain, fetch=True +                    ) +                    if identity and identity.state == IdentityStates.outdated: +                        async_to_sync(identity.fetch_actor)() +                else: +                    identity = None +            if identity: +                results.add(identity) + +        else: +            for identity in Identity.objects.filter(username=handle)[:20]: +                results.add(identity) +            for identity in Identity.objects.filter(username__startswith=handle)[:20]: +                results.add(identity) +        return results + +    def search_url(self) -> Post | Identity | None: +        """ +        Searches for an identity or post by URL. +        """ + +        # Short circuit if it's obviously not for us +        if "://" not in self.query: +            return None + +        # Fetch the provided URL as the system actor to retrieve the AP JSON +        try: +            response = async_to_sync(SystemActor().signed_request)( +                method="get", +                uri=self.query, +            ) +        except (httpx.RequestError, httpx.ConnectError): +            return None +        if response.status_code >= 400: +            return None +        document = canonicalise(response.json(), include_security=True) +        type = document.get("type", "unknown").lower() + +        # Is it an identity? +        if type == "person": +            # Try and retrieve the profile by actor URI +            identity = Identity.by_actor_uri(document["id"], create=True) +            if identity and identity.state == IdentityStates.outdated: +                async_to_sync(identity.fetch_actor)() +            return identity + +        # Is it a post? +        elif type == "note": +            # Try and retrieve the post by URI +            # (we do not trust the JSON we just got - fetch from source!) +            try: +                return Post.by_object_uri(document["id"], fetch=True) +            except Post.DoesNotExist: +                return None + +        # Dunno what it is +        else: +            return None + +    def search_hashtags(self) -> set[Hashtag]: +        """ +        Searches for hashtags by their name +        """ + +        # Short circuit out if it's obviously not a hashtag +        if "@" in self.query or "://" in self.query: +            return set() + +        results: set[Hashtag] = set() +        name = self.query.lstrip("#") +        for hashtag in Hashtag.objects.public().hashtag_or_alias(name)[:10]: +            results.add(hashtag) +        for hashtag in Hashtag.objects.public().filter(hashtag__startswith=name)[:10]: +            results.add(hashtag) +        return results + +    def search_all(self): +        """ +        Returns all possible results for a search +        """ +        results = { +            "identities": self.search_identities_handle(), +            "hashtags": self.search_hashtags(), +            "posts": set(), +        } +        url_result = self.search_url() +        if isinstance(url_result, Identity): +            results["identities"].add(url_result) +        if isinstance(url_result, Post): +            results["posts"].add(url_result) +        return results diff --git a/activities/views/search.py b/activities/views/search.py index f7ab237..93c0012 100644 --- a/activities/views/search.py +++ b/activities/views/search.py @@ -1,12 +1,7 @@ -import httpx -from asgiref.sync import async_to_sync  from django import forms  from django.views.generic import FormView -from activities.models import Hashtag, Post -from core.ld import canonicalise -from users.models import Domain, Identity, IdentityStates -from users.models.system_actor import SystemActor +from activities.search import Searcher  class Search(FormView): @@ -19,126 +14,9 @@ class Search(FormView):              widget=forms.TextInput(attrs={"type": "search", "autofocus": "autofocus"}),          ) -    def search_identities_handle(self, query: str): -        """ -        Searches for identities by their handles -        """ - -        # Short circuit if it's obviously not for us -        if "://" in query: -            return set() - -        # Try to fetch the user by handle -        query = query.lstrip("@") -        results: set[Identity] = set() -        if "@" in query: -            username, domain = query.split("@", 1) - -            # Resolve the domain to the display domain -            domain_instance = Domain.get_domain(domain) -            try: -                if domain_instance is None: -                    raise Identity.DoesNotExist() -                identity = Identity.objects.get( -                    domain=domain_instance, username=username -                ) -            except Identity.DoesNotExist: -                if self.request.identity is not None: -                    # Allow authenticated users to fetch remote -                    identity = Identity.by_username_and_domain( -                        username, domain, fetch=True -                    ) -                    if identity and identity.state == IdentityStates.outdated: -                        async_to_sync(identity.fetch_actor)() -                else: -                    identity = None -            if identity: -                results.add(identity) - -        else: -            for identity in Identity.objects.filter(username=query)[:20]: -                results.add(identity) -            for identity in Identity.objects.filter(username__startswith=query)[:20]: -                results.add(identity) -        return results - -    def search_url(self, query: str) -> Post | Identity | None: -        """ -        Searches for an identity or post by URL. -        """ - -        # Short circuit if it's obviously not for us -        if "://" not in query: -            return None - -        # Clean up query -        query = query.strip() - -        # Fetch the provided URL as the system actor to retrieve the AP JSON -        try: -            response = async_to_sync(SystemActor().signed_request)( -                method="get", uri=query -            ) -        except (httpx.RequestError, httpx.ConnectError): -            return None -        if response.status_code >= 400: -            return None -        document = canonicalise(response.json(), include_security=True) -        type = document.get("type", "unknown").lower() - -        # Is it an identity? -        if type == "person": -            # Try and retrieve the profile by actor URI -            identity = Identity.by_actor_uri(document["id"], create=True) -            if identity and identity.state == IdentityStates.outdated: -                async_to_sync(identity.fetch_actor)() -            return identity - -        # Is it a post? -        elif type == "note": -            # Try and retrieve the post by URI -            # (we do not trust the JSON we just got - fetch from source!) -            try: -                return Post.by_object_uri(document["id"], fetch=True) -            except Post.DoesNotExist: -                return None - -        # Dunno what it is -        else: -            return None - -    def search_hashtags(self, query: str): -        """ -        Searches for hashtags by their name -        """ - -        # Short circuit out if it's obviously not a hashtag -        if "@" in query or "://" in query: -            return set() - -        results: set[Hashtag] = set() -        query = query.lstrip("#") -        for hashtag in Hashtag.objects.public().hashtag_or_alias(query)[:10]: -            results.add(hashtag) -        for hashtag in Hashtag.objects.public().filter(hashtag__startswith=query)[:10]: -            results.add(hashtag) -        return results -      def form_valid(self, form): -        query = form.cleaned_data["query"].lower() -        results = { -            "identities": self.search_identities_handle(query), -            "hashtags": self.search_hashtags(query), -            "posts": set(), -        } - -        url_result = self.search_url(query) -        if isinstance(url_result, Identity): -            results["identities"].add(url_result) -        if isinstance(url_result, Post): -            results["posts"].add(url_result) - +        searcher = Searcher(form.cleaned_data["query"], self.request.identity)          # Render results          context = self.get_context_data(form=form) -        context["results"] = results +        context["results"] = searcher.search_all()          return self.render_to_response(context) | 
