# path: root/activities/search.py
# blob: 5a43156b86129acc98be838ba63a02b924b8659d (plain)
import httpx
from asgiref.sync import async_to_sync

from activities.models import Hashtag, Post
from core.ld import canonicalise
from users.models import Domain, Identity, IdentityStates
from users.models.system_actor import SystemActor


class Searcher:
    """
    Captures the logic needed to search - reused in the UI and API.

    Two forms of the search text are kept:

    * ``query`` - stripped and lower-cased; used for handle and hashtag
      matching.
    * ``raw_query`` - stripped only; used when the text is fetched as a URL,
      because URL paths and query strings are case-sensitive.
    """

    def __init__(self, query: str, identity: Identity | None):
        """
        Stores the search text and the identity performing the search
        (``None`` when there is no requesting identity).
        """
        stripped = query.strip()
        self.raw_query = stripped
        self.query = stripped.lower()
        self.identity = identity

    def search_identities_handle(self) -> set[Identity]:
        """
        Searches for identities by their handles.

        A ``user@domain`` query is looked up exactly; when it is not found
        locally and a requesting identity was supplied, a remote fetch is
        attempted. A bare username is matched exactly and by prefix, with
        each lookup capped at 20 rows. Returns an empty set for URLs and for
        queries with no usable handle.
        """

        # Short circuit if it's obviously not for us
        if "://" in self.query:
            return set()

        handle = self.query.lstrip("@")
        results: set[Identity] = set()

        # An empty handle would make the prefix lookup below match every row
        if not handle:
            return results

        if "@" in handle:
            username, domain = handle.split("@", 1)

            # Half-empty handles ("user@") cannot resolve to anything
            if not username or not domain:
                return results

            # Resolve the domain to the display domain
            domain_instance = Domain.get_domain(domain)
            try:
                if domain_instance is None:
                    raise Identity.DoesNotExist()
                identity = Identity.objects.get(
                    domain=domain_instance, username=username
                )
            except Identity.DoesNotExist:
                identity = None
                if self.identity is not None:
                    # Allow authenticated users to fetch remote
                    identity = Identity.by_username_and_domain(
                        username, domain, fetch=True
                    )
                    if identity and identity.state == IdentityStates.outdated:
                        async_to_sync(identity.fetch_actor)()
            if identity:
                results.add(identity)
        else:
            results.update(Identity.objects.filter(username=handle)[:20])
            results.update(
                Identity.objects.filter(username__startswith=handle)[:20]
            )
        return results

    def search_url(self) -> Post | Identity | None:
        """
        Searches for an identity or post by URL.

        The URL is fetched as the system actor and the returned document's
        ``type`` decides what is looked up. Returns ``None`` when the query
        is not a URL, the request fails or returns a status >= 400, the body
        is not valid JSON, or the document is not a usable person or note.
        """

        # Short circuit if it's obviously not for us
        if "://" not in self.query:
            return None

        # Fetch the provided URL as the system actor to retrieve the AP JSON.
        # The case-preserved query is used: lower-casing a URL can change
        # which resource it points at.
        try:
            response = async_to_sync(SystemActor().signed_request)(
                method="get",
                uri=self.raw_query,
            )
        except httpx.RequestError:
            return None
        if response.status_code >= 400:
            return None

        # The URL may point at something that is not JSON (e.g. an HTML
        # page); JSON decode errors are ValueError subclasses.
        try:
            document = canonicalise(response.json(), include_security=True)
        except ValueError:
            return None

        doc_type = document.get("type", "unknown")
        object_id = document.get("id")
        # Without a string type and an id there is nothing we can look up
        if not isinstance(doc_type, str) or not object_id:
            return None
        doc_type = doc_type.lower()

        # Is it an identity?
        if doc_type == "person":
            # Try and retrieve the profile by actor URI
            identity = Identity.by_actor_uri(object_id, create=True)
            if identity and identity.state == IdentityStates.outdated:
                async_to_sync(identity.fetch_actor)()
            return identity

        # Is it a post?
        if doc_type == "note":
            # Try and retrieve the post by URI
            # (we do not trust the JSON we just got - fetch from source!)
            try:
                return Post.by_object_uri(object_id, fetch=True)
            except Post.DoesNotExist:
                return None

        # Dunno what it is
        return None

    def search_hashtags(self) -> set[Hashtag]:
        """
        Searches for hashtags by their name.

        Looks up public hashtags by exact name or alias and by name prefix,
        with each lookup capped at 10 rows. Returns an empty set for handles,
        URLs and empty names.
        """

        # Short circuit out if it's obviously not a hashtag
        if "@" in self.query or "://" in self.query:
            return set()

        results: set[Hashtag] = set()
        name = self.query.lstrip("#")

        # An empty name would make the prefix lookup below match every row
        if not name:
            return results

        results.update(Hashtag.objects.public().hashtag_or_alias(name)[:10])
        results.update(
            Hashtag.objects.public().filter(hashtag__startswith=name)[:10]
        )
        return results

    def search_all(self):
        """
        Returns all possible results for a search.

        The result is a dict with ``identities``, ``hashtags`` and ``posts``
        keys, each holding a set. A URL hit is added to ``identities`` or
        ``posts`` depending on what it resolved to.
        """
        results = {
            "identities": self.search_identities_handle(),
            "hashtags": self.search_hashtags(),
            "posts": set(),
        }
        url_result = self.search_url()
        if url_result is None:
            return results
        if isinstance(url_result, Identity):
            results["identities"].add(url_result)
        if isinstance(url_result, Post):
            results["posts"].add(url_result)
        return results