diff options
Diffstat (limited to 'activities/views/search.py')
| -rw-r--r-- | activities/views/search.py | 128 | 
1 files changed, 3 insertions, 125 deletions
| diff --git a/activities/views/search.py b/activities/views/search.py index f7ab237..93c0012 100644 --- a/activities/views/search.py +++ b/activities/views/search.py @@ -1,12 +1,7 @@ -import httpx -from asgiref.sync import async_to_sync  from django import forms  from django.views.generic import FormView -from activities.models import Hashtag, Post -from core.ld import canonicalise -from users.models import Domain, Identity, IdentityStates -from users.models.system_actor import SystemActor +from activities.search import Searcher  class Search(FormView): @@ -19,126 +14,9 @@ class Search(FormView):              widget=forms.TextInput(attrs={"type": "search", "autofocus": "autofocus"}),          ) -    def search_identities_handle(self, query: str): -        """ -        Searches for identities by their handles -        """ - -        # Short circuit if it's obviously not for us -        if "://" in query: -            return set() - -        # Try to fetch the user by handle -        query = query.lstrip("@") -        results: set[Identity] = set() -        if "@" in query: -            username, domain = query.split("@", 1) - -            # Resolve the domain to the display domain -            domain_instance = Domain.get_domain(domain) -            try: -                if domain_instance is None: -                    raise Identity.DoesNotExist() -                identity = Identity.objects.get( -                    domain=domain_instance, username=username -                ) -            except Identity.DoesNotExist: -                if self.request.identity is not None: -                    # Allow authenticated users to fetch remote -                    identity = Identity.by_username_and_domain( -                        username, domain, fetch=True -                    ) -                    if identity and identity.state == IdentityStates.outdated: -                        async_to_sync(identity.fetch_actor)() -                else: -                    identity = None -            if identity: -                results.add(identity) - -        else: -            for identity in Identity.objects.filter(username=query)[:20]: -                results.add(identity) -            for identity in Identity.objects.filter(username__startswith=query)[:20]: -                results.add(identity) -        return results - -    def search_url(self, query: str) -> Post | Identity | None: -        """ -        Searches for an identity or post by URL. -        """ - -        # Short circuit if it's obviously not for us -        if "://" not in query: -            return None - -        # Clean up query -        query = query.strip() - -        # Fetch the provided URL as the system actor to retrieve the AP JSON -        try: -            response = async_to_sync(SystemActor().signed_request)( -                method="get", uri=query -            ) -        except (httpx.RequestError, httpx.ConnectError): -            return None -        if response.status_code >= 400: -            return None -        document = canonicalise(response.json(), include_security=True) -        type = document.get("type", "unknown").lower() - -        # Is it an identity? -        if type == "person": -            # Try and retrieve the profile by actor URI -            identity = Identity.by_actor_uri(document["id"], create=True) -            if identity and identity.state == IdentityStates.outdated: -                async_to_sync(identity.fetch_actor)() -            return identity - -        # Is it a post? -        elif type == "note": -            # Try and retrieve the post by URI -            # (we do not trust the JSON we just got - fetch from source!) -            try: -                return Post.by_object_uri(document["id"], fetch=True) -            except Post.DoesNotExist: -                return None - -        # Dunno what it is -        else: -            return None - -    def search_hashtags(self, query: str): -        """ -        Searches for hashtags by their name -        """ - -        # Short circuit out if it's obviously not a hashtag -        if "@" in query or "://" in query: -            return set() - -        results: set[Hashtag] = set() -        query = query.lstrip("#") -        for hashtag in Hashtag.objects.public().hashtag_or_alias(query)[:10]: -            results.add(hashtag) -        for hashtag in Hashtag.objects.public().filter(hashtag__startswith=query)[:10]: -            results.add(hashtag) -        return results -      def form_valid(self, form): -        query = form.cleaned_data["query"].lower() -        results = { -            "identities": self.search_identities_handle(query), -            "hashtags": self.search_hashtags(query), -            "posts": set(), -        } - -        url_result = self.search_url(query) -        if isinstance(url_result, Identity): -            results["identities"].add(url_result) -        if isinstance(url_result, Post): -            results["posts"].add(url_result) - +        searcher = Searcher(form.cleaned_data["query"], self.request.identity)          # Render results          context = self.get_context_data(form=form) -        context["results"] = results +        context["results"] = searcher.search_all()          return self.render_to_response(context) | 
