summaryrefslogtreecommitdiffstats
path: root/activities/views/search.py
blob: f7ab237b2c1c20341f85e2d02c2ac2de8e76b246 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import httpx
from asgiref.sync import async_to_sync
from django import forms
from django.views.generic import FormView

from activities.models import Hashtag, Post
from core.ld import canonicalise
from users.models import Domain, Identity, IdentityStates
from users.models.system_actor import SystemActor


class Search(FormView):
    """
    Search page that looks up identities, hashtags and posts from one query.

    The submitted query is tried as a handle, as a hashtag and as a URL; each
    ``search_*`` method returns nothing when the query is obviously not in
    the format it handles.
    """

    template_name = "activities/search.html"

    class form_class(forms.Form):
        query = forms.CharField(
            help_text="Search for:\nA user by @username@domain or their profile URL\nA hashtag by #tagname\nA post by its URL",
            widget=forms.TextInput(attrs={"type": "search", "autofocus": "autofocus"}),
        )

    def search_identities_handle(self, query: str) -> set[Identity]:
        """
        Searches for identities by their handles.

        A query of the form ``username@domain`` is resolved to at most one
        identity (fetched remotely only when the request has an identity);
        any other query is matched against usernames, exactly and by prefix.
        Returns an empty set for URLs and for queries that are empty once
        the leading "@" is removed.
        """
        results: set[Identity] = set()

        # Short circuit if it's obviously not for us
        if "://" in query:
            return results

        query = query.lstrip("@")
        # A bare "@" leaves nothing to search for; without this guard the
        # prefix filter below would match every username.
        if not query:
            return results

        if "@" not in query:
            # Exact matches are queried separately so that they are included
            # even when the prefix query is truncated by its limit.
            results.update(Identity.objects.filter(username=query)[:20])
            results.update(Identity.objects.filter(username__startswith=query)[:20])
            return results

        username, domain = query.split("@", 1)
        # An incomplete handle ("user@" or "@@domain") cannot match anything
        if not username or not domain:
            return results

        # Resolve the domain to the display domain
        domain_instance = Domain.get_domain(domain)
        try:
            if domain_instance is None:
                raise Identity.DoesNotExist()
            identity = Identity.objects.get(domain=domain_instance, username=username)
        except Identity.DoesNotExist:
            identity = None
            if self.request.identity is not None:
                # Allow authenticated users to fetch remote
                identity = Identity.by_username_and_domain(
                    username, domain, fetch=True
                )
                if identity and identity.state == IdentityStates.outdated:
                    async_to_sync(identity.fetch_actor)()
        if identity:
            results.add(identity)
        return results

    def search_url(self, query: str) -> Post | Identity | None:
        """
        Searches for an identity or post by URL.

        The URL is fetched with a request signed by the system actor and the
        response is interpreted as an ActivityPub document. Returns None when
        the query is not a URL, the request fails, the response is an error
        or is not a JSON object, or the document is neither a "person" nor a
        "note" with an "id".
        """

        # Short circuit if it's obviously not for us
        if "://" not in query:
            return None

        # Clean up query
        query = query.strip()

        # Fetch the provided URL as the system actor to retrieve the AP JSON
        try:
            response = async_to_sync(SystemActor().signed_request)(
                method="get", uri=query
            )
        except httpx.RequestError:
            # httpx.ConnectError is a subclass of RequestError
            return None
        if response.status_code >= 400:
            return None

        # The URL may well point at an HTML page or other non-JSON content;
        # that is "no result", not a server error.
        try:
            payload = response.json()
        except ValueError:
            return None
        if not isinstance(payload, dict):
            return None

        document = canonicalise(payload, include_security=True)
        object_type = document.get("type", "unknown")
        object_uri = document.get("id")
        if not isinstance(object_type, str) or not object_uri:
            return None
        object_type = object_type.lower()

        # Is it an identity?
        if object_type == "person":
            # Try and retrieve the profile by actor URI
            identity = Identity.by_actor_uri(object_uri, create=True)
            if identity and identity.state == IdentityStates.outdated:
                async_to_sync(identity.fetch_actor)()
            return identity

        # Is it a post?
        if object_type == "note":
            # Try and retrieve the post by URI
            # (we do not trust the JSON we just got - fetch from source!)
            try:
                return Post.by_object_uri(object_uri, fetch=True)
            except Post.DoesNotExist:
                return None

        # Dunno what it is
        return None

    def search_hashtags(self, query: str) -> set[Hashtag]:
        """
        Searches for hashtags by their name.

        Matches public hashtags by name or alias and by name prefix. Returns
        an empty set for handles, URLs, and queries that are empty once the
        leading "#" is removed.
        """
        results: set[Hashtag] = set()

        # Short circuit out if it's obviously not a hashtag
        if "@" in query or "://" in query:
            return results

        query = query.lstrip("#")
        # A bare "#" leaves nothing to search for; without this guard the
        # prefix filter below would match every hashtag.
        if not query:
            return results

        results.update(Hashtag.objects.public().hashtag_or_alias(query)[:10])
        results.update(Hashtag.objects.public().filter(hashtag__startswith=query)[:10])
        return results

    def form_valid(self, form):
        """
        Runs every search against the submitted query and renders the results.

        The template context gains a "results" dict with "identities",
        "hashtags" and "posts" sets.
        """
        raw_query = form.cleaned_data["query"]
        # Handles and hashtags are matched in lowercase, but URL paths are
        # case-sensitive, so the URL lookup must see the query as typed.
        query = raw_query.lower()
        results = {
            "identities": self.search_identities_handle(query),
            "hashtags": self.search_hashtags(query),
            "posts": set(),
        }

        url_result = self.search_url(raw_query)
        if isinstance(url_result, Identity):
            results["identities"].add(url_result)
        elif isinstance(url_result, Post):
            results["posts"].add(url_result)

        # Render results
        context = self.get_context_data(form=form)
        context["results"] = results
        return self.render_to_response(context)