summaryrefslogtreecommitdiffstats
path: root/users/models/inbox_message.py
blob: 0bf6851b0d31ef76163d698ffb756f777bd3f0ab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from asgiref.sync import sync_to_async
from django.db import models

from core.models import Config
from stator.models import State, StateField, StateGraph, StatorModel


class InboxMessageStates(StateGraph):
    received = State(try_interval=300)
    processed = State(try_interval=86400, attempt_immediately=False)
    purged = State()  # This is actually deletion, it will never get here

    received.transitions_to(processed)
    processed.transitions_to(purged)

    @classmethod
    async def handle_received(cls, instance: "InboxMessage"):
        from activities.models import Post, PostInteraction
        from users.models import Follow, Identity

        match instance.message_type:
            case "follow":
                await sync_to_async(Follow.handle_request_ap)(instance.message)
            case "announce":
                await sync_to_async(PostInteraction.handle_ap)(instance.message)
            case "like":
                await sync_to_async(PostInteraction.handle_ap)(instance.message)
            case "create":
                match instance.message_object_type:
                    case "note":
                        await sync_to_async(Post.handle_create_ap)(instance.message)
                    case unknown:
                        raise ValueError(
                            f"Cannot handle activity of type create.{unknown}"
                        )
            case "update":
                match instance.message_object_type:
                    case "note":
                        await sync_to_async(Post.handle_update_ap)(instance.message)
                    case "person":
                        await sync_to_async(Identity.handle_update_ap)(instance.message)
                    case unknown:
                        raise ValueError(
                            f"Cannot handle activity of type update.{unknown}"
                        )
            case "accept":
                match instance.message_object_type:
                    case "follow":
                        await sync_to_async(Follow.handle_accept_ap)(instance.message)
                    case unknown:
                        raise ValueError(
                            f"Cannot handle activity of type accept.{unknown}"
                        )
            case "undo":
                match instance.message_object_type:
                    case "follow":
                        await sync_to_async(Follow.handle_undo_ap)(instance.message)
                    case "like":
                        await sync_to_async(PostInteraction.handle_undo_ap)(
                            instance.message
                        )
                    case "announce":
                        await sync_to_async(PostInteraction.handle_undo_ap)(
                            instance.message
                        )
                    case unknown:
                        raise ValueError(
                            f"Cannot handle activity of type undo.{unknown}"
                        )
            case "delete":
                # If there is no object type, it's probably a profile
                if not isinstance(instance.message["object"], dict):
                    await sync_to_async(Identity.handle_delete_ap)(instance.message)
                else:
                    match instance.message_object_type:
                        case "tombstone":
                            await sync_to_async(Post.handle_delete_ap)(instance.message)
                        case unknown:
                            raise ValueError(
                                f"Cannot handle activity of type delete.{unknown}"
                            )
            case "add":
                # We are ignoring these right now (probably pinned items)
                pass
            case "remove":
                # We are ignoring these right now (probably pinned items)
                pass
            case unknown:
                raise ValueError(f"Cannot handle activity of type {unknown}")
        return cls.processed

    @classmethod
    async def handle_processed(cls, instance: "InboxMessage"):
        if instance.state_age > Config.system.inbox_message_purge_after:
            await InboxMessage.objects.filter(pk=instance.pk).adelete()


class InboxMessage(StatorModel):
    """
    an incoming inbox message that needs processing.

    Yes, this is kind of its own message queue built on the state graph system.
    It's fine. It'll scale up to a decent point.
    """

    message = models.JSONField()

    state = StateField(InboxMessageStates)

    @property
    def message_type(self):
        return self.message["type"].lower()

    @property
    def message_object_type(self):
        return self.message["object"]["type"].lower()

    @property
    def message_actor(self):
        return self.message.get("actor")