1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
import httpx
from asgiref.sync import async_to_sync
from activities.models import Hashtag, Post
from core.ld import canonicalise
from users.models import Domain, Identity, IdentityStates
from users.models.system_actor import SystemActor
class Searcher:
"""
Captures the logic needed to search - reused in the UI and API
"""
def __init__(self, query: str, identity: Identity | None):
self.query = query.strip().lower()
self.identity = identity
def search_identities_handle(self) -> set[Identity]:
"""
Searches for identities by their handles
"""
# Short circuit if it's obviously not for us
if "://" in self.query:
return set()
# Try to fetch the user by handle
handle = self.query.lstrip("@")
results: set[Identity] = set()
if "@" in handle:
username, domain = handle.split("@", 1)
# Resolve the domain to the display domain
domain_instance = Domain.get_domain(domain)
try:
if domain_instance is None:
raise Identity.DoesNotExist()
identity = Identity.objects.get(
domain=domain_instance, username=username
)
except Identity.DoesNotExist:
if self.identity is not None:
# Allow authenticated users to fetch remote
identity = Identity.by_username_and_domain(
username, domain, fetch=True
)
if identity and identity.state == IdentityStates.outdated:
async_to_sync(identity.fetch_actor)()
else:
identity = None
if identity:
results.add(identity)
else:
for identity in Identity.objects.filter(username=handle)[:20]:
results.add(identity)
for identity in Identity.objects.filter(username__startswith=handle)[:20]:
results.add(identity)
return results
def search_url(self) -> Post | Identity | None:
"""
Searches for an identity or post by URL.
"""
# Short circuit if it's obviously not for us
if "://" not in self.query:
return None
# Fetch the provided URL as the system actor to retrieve the AP JSON
try:
response = async_to_sync(SystemActor().signed_request)(
method="get",
uri=self.query,
)
except httpx.RequestError:
return None
if response.status_code >= 400:
return None
document = canonicalise(response.json(), include_security=True)
type = document.get("type", "unknown").lower()
# Is it an identity?
if type == "person":
# Try and retrieve the profile by actor URI
identity = Identity.by_actor_uri(document["id"], create=True)
if identity and identity.state == IdentityStates.outdated:
async_to_sync(identity.fetch_actor)()
return identity
# Is it a post?
elif type == "note":
# Try and retrieve the post by URI
# (we do not trust the JSON we just got - fetch from source!)
try:
return Post.by_object_uri(document["id"], fetch=True)
except Post.DoesNotExist:
return None
# Dunno what it is
else:
return None
def search_hashtags(self) -> set[Hashtag]:
"""
Searches for hashtags by their name
"""
# Short circuit out if it's obviously not a hashtag
if "@" in self.query or "://" in self.query:
return set()
results: set[Hashtag] = set()
name = self.query.lstrip("#")
for hashtag in Hashtag.objects.public().hashtag_or_alias(name)[:10]:
results.add(hashtag)
for hashtag in Hashtag.objects.public().filter(hashtag__startswith=name)[:10]:
results.add(hashtag)
return results
def search_all(self):
"""
Returns all possible results for a search
"""
results = {
"identities": self.search_identities_handle(),
"hashtags": self.search_hashtags(),
"posts": set(),
}
url_result = self.search_url()
if isinstance(url_result, Identity):
results["identities"].add(url_result)
if isinstance(url_result, Post):
results["posts"].add(url_result)
return results
|