1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
from asgiref.sync import async_to_sync
from django.contrib import admin
from activities.models import (
FanOut,
Hashtag,
Post,
PostAttachment,
PostInteraction,
TimelineEvent,
)
@admin.register(Hashtag)
class HashtagAdmin(admin.ModelAdmin):
    """Admin configuration for ``Hashtag`` records."""

    list_display = [
        "hashtag",
        "name_override",
        "state",
        "stats_updated",
        "created",
    ]
    readonly_fields = [
        "created",
        "updated",
        "stats_updated",
    ]
    actions = ["force_execution"]

    @admin.action(description="Force Execution")
    def force_execution(self, request, queryset):
        """Call ``transition_perform("outdated")`` on each selected hashtag."""
        for hashtag in queryset:
            hashtag.transition_perform("outdated")
class PostAttachmentInline(admin.StackedInline):
    """Stacked inline for ``PostAttachment`` rows, with no blank extra forms."""

    model = PostAttachment
    # Only show existing attachments; do not render empty "add" forms.
    extra = 0
@admin.register(Post)
class PostAdmin(admin.ModelAdmin):
    """Admin configuration for ``Post`` records, including maintenance actions."""

    list_display = [
        "id",
        "state",
        "author",
        "created",
    ]
    search_fields = ["content"]
    raw_id_fields = [
        "to",
        "mentions",
        "author",
    ]
    readonly_fields = [
        "created",
        "updated",
        "object_json",
    ]
    inlines = [PostAttachmentInline]
    actions = [
        "force_fetch",
        "reparse_hashtags",
    ]

    @admin.action(description="Force Fetch")
    def force_fetch(self, request, queryset):
        """Call ``debug_fetch()`` on each selected post."""
        for post in queryset:
            post.debug_fetch()

    @admin.action(description="Reprocess content for hashtags")
    def reparse_hashtags(self, request, queryset):
        """
        Recompute the hashtags of each selected post from its content,
        save the post, then run its ``ensure_hashtags`` coroutine.
        """
        for post in queryset:
            parsed = Hashtag.hashtags_from_content(post.content)
            # A falsy parse result is stored as None rather than as-is.
            post.hashtags = parsed or None
            post.save()
            # ensure_hashtags is awaited synchronously via asgiref.
            ensure = async_to_sync(post.ensure_hashtags)
            ensure()

    @admin.display(description="ActivityPub JSON")
    def object_json(self, instance):
        """Return the result of ``instance.to_ap()`` for read-only display."""
        return instance.to_ap()

    def has_add_permission(self, request, obj=None):
        """
        Always deny adding posts through the admin, since creating
        them here would skip steps.
        """
        return False
@admin.register(TimelineEvent)
class TimelineEventAdmin(admin.ModelAdmin):
    """Admin configuration for ``TimelineEvent`` records."""

    list_display = [
        "id",
        "identity",
        "created",
        "type",
    ]
    raw_id_fields = [
        "identity",
        "subject_post",
        "subject_identity",
        "subject_post_interaction",
    ]
    readonly_fields = ["created"]

    def has_add_permission(self, request, obj=None):
        """Always deny adding timeline events through the admin."""
        return False
@admin.register(FanOut)
class FanOutAdmin(admin.ModelAdmin):
    """Admin configuration for ``FanOut`` records."""

    list_display = [
        "id",
        "state",
        "state_attempted",
        "type",
        "identity",
    ]
    raw_id_fields = [
        "identity",
        "subject_post",
        "subject_post_interaction",
    ]
    readonly_fields = [
        "created",
        "updated",
    ]
    actions = ["force_execution"]

    @admin.action(description="Force Execution")
    def force_execution(self, request, queryset):
        """Call ``transition_perform("new")`` on each selected fan-out."""
        for fan_out in queryset:
            fan_out.transition_perform("new")

    def has_add_permission(self, request, obj=None):
        """Always deny adding fan-outs through the admin."""
        return False
@admin.register(PostInteraction)
class PostInteractionAdmin(admin.ModelAdmin):
    """Admin configuration for ``PostInteraction`` records."""

    list_display = [
        "id",
        "state",
        "state_attempted",
        "type",
        "identity",
        "post",
    ]
    raw_id_fields = [
        "identity",
        "post",
    ]

    def has_add_permission(self, request, obj=None):
        """Always deny adding post interactions through the admin."""
        return False
|