From fc8a21fc5c6809ea115092eeec57e09e984cdd76 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sun, 11 Dec 2022 11:22:06 -0700 Subject: More API read coverage --- activities/views/search.py | 128 ++------------------------------------------- 1 file changed, 3 insertions(+), 125 deletions(-) (limited to 'activities/views') diff --git a/activities/views/search.py b/activities/views/search.py index f7ab237..93c0012 100644 --- a/activities/views/search.py +++ b/activities/views/search.py @@ -1,12 +1,7 @@ -import httpx -from asgiref.sync import async_to_sync from django import forms from django.views.generic import FormView -from activities.models import Hashtag, Post -from core.ld import canonicalise -from users.models import Domain, Identity, IdentityStates -from users.models.system_actor import SystemActor +from activities.search import Searcher class Search(FormView): @@ -19,126 +14,9 @@ class Search(FormView): widget=forms.TextInput(attrs={"type": "search", "autofocus": "autofocus"}), ) - def search_identities_handle(self, query: str): - """ - Searches for identities by their handles - """ - - # Short circuit if it's obviously not for us - if "://" in query: - return set() - - # Try to fetch the user by handle - query = query.lstrip("@") - results: set[Identity] = set() - if "@" in query: - username, domain = query.split("@", 1) - - # Resolve the domain to the display domain - domain_instance = Domain.get_domain(domain) - try: - if domain_instance is None: - raise Identity.DoesNotExist() - identity = Identity.objects.get( - domain=domain_instance, username=username - ) - except Identity.DoesNotExist: - if self.request.identity is not None: - # Allow authenticated users to fetch remote - identity = Identity.by_username_and_domain( - username, domain, fetch=True - ) - if identity and identity.state == IdentityStates.outdated: - async_to_sync(identity.fetch_actor)() - else: - identity = None - if identity: - results.add(identity) - - else: - for identity in Identity.objects.filter(username=query)[:20]: - results.add(identity) - for identity in Identity.objects.filter(username__startswith=query)[:20]: - results.add(identity) - return results - - def search_url(self, query: str) -> Post | Identity | None: - """ - Searches for an identity or post by URL. - """ - - # Short circuit if it's obviously not for us - if "://" not in query: - return None - - # Clean up query - query = query.strip() - - # Fetch the provided URL as the system actor to retrieve the AP JSON - try: - response = async_to_sync(SystemActor().signed_request)( - method="get", uri=query - ) - except (httpx.RequestError, httpx.ConnectError): - return None - if response.status_code >= 400: - return None - document = canonicalise(response.json(), include_security=True) - type = document.get("type", "unknown").lower() - - # Is it an identity? - if type == "person": - # Try and retrieve the profile by actor URI - identity = Identity.by_actor_uri(document["id"], create=True) - if identity and identity.state == IdentityStates.outdated: - async_to_sync(identity.fetch_actor)() - return identity - - # Is it a post? - elif type == "note": - # Try and retrieve the post by URI - # (we do not trust the JSON we just got - fetch from source!) - try: - return Post.by_object_uri(document["id"], fetch=True) - except Post.DoesNotExist: - return None - - # Dunno what it is - else: - return None - - def search_hashtags(self, query: str): - """ - Searches for hashtags by their name - """ - - # Short circuit out if it's obviously not a hashtag - if "@" in query or "://" in query: - return set() - - results: set[Hashtag] = set() - query = query.lstrip("#") - for hashtag in Hashtag.objects.public().hashtag_or_alias(query)[:10]: - results.add(hashtag) - for hashtag in Hashtag.objects.public().filter(hashtag__startswith=query)[:10]: - results.add(hashtag) - return results - def form_valid(self, form): - query = form.cleaned_data["query"].lower() - results = { - "identities": self.search_identities_handle(query), - "hashtags": self.search_hashtags(query), - "posts": set(), - } - - url_result = self.search_url(query) - if isinstance(url_result, Identity): - results["identities"].add(url_result) - if isinstance(url_result, Post): - results["posts"].add(url_result) - + searcher = Searcher(form.cleaned_data["query"], self.request.identity) # Render results context = self.get_context_data(form=form) - context["results"] = results + context["results"] = searcher.search_all() return self.render_to_response(context) -- cgit v1.2.3