1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
import httpx
from asgiref.sync import async_to_sync
from django import forms
from django.views.generic import FormView
from activities.models import Hashtag, Post
from core.ld import canonicalise
from users.models import Domain, Identity, IdentityStates
from users.models.system_actor import SystemActor
class Search(FormView):
template_name = "activities/search.html"
class form_class(forms.Form):
query = forms.CharField(
help_text="Search for:\nA user by @username@domain or their profile URL\nA hashtag by #tagname\nA post by its URL",
widget=forms.TextInput(attrs={"type": "search", "autofocus": "autofocus"}),
)
def search_identities_handle(self, query: str):
"""
Searches for identities by their handles
"""
# Short circuit if it's obviously not for us
if "://" in query:
return set()
# Try to fetch the user by handle
query = query.lstrip("@")
results: set[Identity] = set()
if "@" in query:
username, domain = query.split("@", 1)
# Resolve the domain to the display domain
domain_instance = Domain.get_domain(domain)
try:
if domain_instance is None:
raise Identity.DoesNotExist()
identity = Identity.objects.get(
domain=domain_instance, username=username
)
except Identity.DoesNotExist:
if self.request.identity is not None:
# Allow authenticated users to fetch remote
identity = Identity.by_username_and_domain(
username, domain, fetch=True
)
if identity and identity.state == IdentityStates.outdated:
async_to_sync(identity.fetch_actor)()
else:
identity = None
if identity:
results.add(identity)
else:
for identity in Identity.objects.filter(username=query)[:20]:
results.add(identity)
for identity in Identity.objects.filter(username__startswith=query)[:20]:
results.add(identity)
return results
def search_url(self, query: str) -> Post | Identity | None:
"""
Searches for an identity or post by URL.
"""
# Short circuit if it's obviously not for us
if "://" not in query:
return None
# Clean up query
query = query.strip()
# Fetch the provided URL as the system actor to retrieve the AP JSON
try:
response = async_to_sync(SystemActor().signed_request)(
method="get", uri=query
)
except (httpx.RequestError, httpx.ConnectError):
return None
if response.status_code >= 400:
return None
document = canonicalise(response.json(), include_security=True)
type = document.get("type", "unknown").lower()
# Is it an identity?
if type == "person":
# Try and retrieve the profile by actor URI
identity = Identity.by_actor_uri(document["id"], create=True)
if identity and identity.state == IdentityStates.outdated:
async_to_sync(identity.fetch_actor)()
return identity
# Is it a post?
elif type == "note":
# Try and retrieve the post by URI
# (we do not trust the JSON we just got - fetch from source!)
try:
return Post.by_object_uri(document["id"], fetch=True)
except Post.DoesNotExist:
return None
# Dunno what it is
else:
return None
def search_hashtags(self, query: str):
"""
Searches for hashtags by their name
"""
# Short circuit out if it's obviously not a hashtag
if "@" in query or "://" in query:
return set()
results: set[Hashtag] = set()
query = query.lstrip("#")
for hashtag in Hashtag.objects.public().hashtag_or_alias(query)[:10]:
results.add(hashtag)
for hashtag in Hashtag.objects.public().filter(hashtag__startswith=query)[:10]:
results.add(hashtag)
return results
def form_valid(self, form):
query = form.cleaned_data["query"].lower()
results = {
"identities": self.search_identities_handle(query),
"hashtags": self.search_hashtags(query),
"posts": set(),
}
url_result = self.search_url(query)
if isinstance(url_result, Identity):
results["identities"].add(url_result)
if isinstance(url_result, Post):
results["posts"].add(url_result)
# Render results
context = self.get_context_data(form=form)
context["results"] = results
return self.render_to_response(context)
|