Преглед изворни кода

Try to mark reactions as read on Matrix when backfilling

Tulir Asokan пре 3 година
родитељ
комит
3abf4c3ec3

+ 1 - 1
mauigpapi/mqtt/conn.py

@@ -448,7 +448,7 @@ class AndroidMQTT:
             try:
                 await handler(evt)
             except Exception:
-                self.log.exception(f"Error in {type(evt)} handler")
+                self.log.exception(f"Error in {type(evt).__name__} handler")
 
     def disconnect(self) -> None:
         self._client.disconnect()

+ 4 - 0
mauigpapi/types/thread.py

@@ -42,6 +42,10 @@ class ThreadUserLastSeenAt(SerializableAttrs):
     item_id: str
     shh_seen_state: Dict[str, Any]
 
+    @property
+    def timestamp_ms(self) -> int:
+        return int(self.timestamp) // 1000
+
 
 @dataclass(kw_only=True)
 class Thread(SerializableAttrs):

+ 5 - 1
mauigpapi/types/thread_item.py

@@ -383,10 +383,14 @@ class AnimatedMediaItem(SerializableAttrs):
 class Reaction(SerializableAttrs):
     sender_id: int
     timestamp: int
-    client_context: int
+    client_context: Optional[str]
     emoji: str = "❤️"
     super_react_type: Optional[str] = None
 
+    @property
+    def timestamp_ms(self) -> int:
+        return self.timestamp // 1000
+
 
 @dataclass
 class Reactions(SerializableAttrs):

+ 4 - 0
mautrix_instagram/db/message.py

@@ -37,6 +37,10 @@ class Message:
     sender: int
     ig_timestamp: int | None
 
+    @property
+    def ig_timestamp_ms(self) -> int:
+        return (self.ig_timestamp // 1000) if self.ig_timestamp else 0
+
     async def insert(self) -> None:
         q = """
             INSERT INTO message (mxid, mx_room, item_id, client_context, receiver, sender,

+ 37 - 20
mautrix_instagram/db/reaction.py

@@ -35,12 +35,14 @@ class Reaction:
     ig_receiver: int
     ig_sender: int
     reaction: str
+    mx_timestamp: int | None
 
     async def insert(self) -> None:
-        q = (
-            "INSERT INTO reaction (mxid, mx_room, ig_item_id, ig_receiver, ig_sender, reaction) "
-            "VALUES ($1, $2, $3, $4, $5, $6)"
-        )
+        q = """
+        INSERT INTO reaction (mxid, mx_room, ig_item_id, ig_receiver, ig_sender, reaction,
+                              mx_timestamp)
+        VALUES ($1, $2, $3, $4, $5, $6, $7)
+        """
         await self.db.execute(
             q,
             self.mxid,
@@ -49,27 +51,34 @@ class Reaction:
             self.ig_receiver,
             self.ig_sender,
             self.reaction,
+            self.mx_timestamp,
         )
 
-    async def edit(self, mx_room: RoomID, mxid: EventID, reaction: str) -> None:
-        q = (
-            "UPDATE reaction SET mxid=$1, mx_room=$2, reaction=$3 "
-            "WHERE ig_item_id=$4 AND ig_receiver=$5 AND ig_sender=$6"
-        )
+    async def edit(self, mx_room: RoomID, mxid: EventID, reaction: str, mx_timestamp: int) -> None:
+        q = """
+        UPDATE reaction SET mxid=$1, mx_room=$2, reaction=$3, mx_timestamp=$4
+        WHERE ig_item_id=$5 AND ig_receiver=$6 AND ig_sender=$7
+        """
         await self.db.execute(
-            q, mxid, mx_room, reaction, self.ig_item_id, self.ig_receiver, self.ig_sender
+            q,
+            mxid,
+            mx_room,
+            reaction,
+            mx_timestamp,
+            self.ig_item_id,
+            self.ig_receiver,
+            self.ig_sender,
         )
 
     async def delete(self) -> None:
         q = "DELETE FROM reaction WHERE ig_item_id=$1 AND ig_receiver=$2 AND ig_sender=$3"
         await self.db.execute(q, self.ig_item_id, self.ig_receiver, self.ig_sender)
 
+    _columns = "mxid, mx_room, ig_item_id, ig_receiver, ig_sender, reaction, mx_timestamp"
+
     @classmethod
     async def get_by_mxid(cls, mxid: EventID, mx_room: RoomID) -> Reaction | None:
-        q = (
-            "SELECT mxid, mx_room, ig_item_id, ig_receiver, ig_sender, reaction "
-            "FROM reaction WHERE mxid=$1 AND mx_room=$2"
-        )
+        q = f"SELECT {cls._columns} FROM reaction WHERE mxid=$1 AND mx_room=$2"
         row = await cls.db.fetchrow(q, mxid, mx_room)
         if not row:
             return None
@@ -80,8 +89,8 @@ class Reaction:
         cls, ig_item_id: str, ig_receiver: int, ig_sender: int
     ) -> Reaction | None:
         q = (
-            "SELECT mxid, mx_room, ig_item_id, ig_receiver, ig_sender, reaction "
-            "FROM reaction WHERE ig_item_id=$1 AND ig_sender=$2 AND ig_receiver=$3"
+            f"SELECT {cls._columns} FROM reaction"
+            " WHERE ig_item_id=$1 AND ig_sender=$2 AND ig_receiver=$3"
         )
         row = await cls.db.fetchrow(q, ig_item_id, ig_sender, ig_receiver)
         if not row:
@@ -95,9 +104,17 @@ class Reaction:
 
     @classmethod
     async def get_all_by_item_id(cls, ig_item_id: str, ig_receiver: int) -> list[Reaction]:
-        q = (
-            "SELECT mxid, mx_room, ig_item_id, ig_receiver, ig_sender, reaction "
-            "FROM reaction WHERE ig_item_id=$1 AND ig_receiver=$2"
-        )
+        q = f"SELECT {cls._columns} FROM reaction WHERE ig_item_id=$1 AND ig_receiver=$2"
         rows = await cls.db.fetch(q, ig_item_id, ig_receiver)
         return [cls(**row) for row in rows]
+
+    @classmethod
+    async def get_closest(cls, mx_room: RoomID, before_ts: int) -> Reaction | None:
+        q = f"""
+        SELECT {cls._columns} FROM reaction WHERE mx_room=$1 AND mx_timestamp<=$2
+        ORDER BY mx_timestamp DESC LIMIT 1
+        """
+        row = await cls.db.fetchrow(q, mx_room, before_ts)
+        if not row:
+            return None
+        return cls(**row)

+ 5 - 0
mautrix_instagram/db/upgrade.py

@@ -120,3 +120,8 @@ async def upgrade_v5(conn: Connection) -> None:
 @upgrade_table.register(description="Allow hidden events in message table")
 async def upgrade_v6(conn: Connection) -> None:
     await conn.execute("ALTER TABLE message ALTER COLUMN mxid DROP NOT NULL")
+
+
+@upgrade_table.register(description="Store reaction timestamps")
+async def upgrade_v7(conn: Connection) -> None:
+    await conn.execute("ALTER TABLE reaction ADD COLUMN mx_timestamp BIGINT")

+ 10 - 3
mautrix_instagram/matrix.py

@@ -94,7 +94,12 @@ class MatrixHandler(BaseMatrixHandler):
 
     @classmethod
     async def handle_reaction(
-        cls, room_id: RoomID, user_id: UserID, event_id: EventID, content: ReactionEventContent
+        cls,
+        room_id: RoomID,
+        user_id: UserID,
+        event_id: EventID,
+        content: ReactionEventContent,
+        timestamp: int,
     ) -> None:
         if content.relates_to.rel_type != RelationType.ANNOTATION:
             cls.log.debug(
@@ -111,7 +116,7 @@ class MatrixHandler(BaseMatrixHandler):
             return
 
         await portal.handle_matrix_reaction(
-            user, event_id, content.relates_to.event_id, content.relates_to.key
+            user, event_id, content.relates_to.event_id, content.relates_to.key, timestamp
         )
 
     async def handle_read_receipt(
@@ -143,7 +148,9 @@ class MatrixHandler(BaseMatrixHandler):
             await self.handle_redaction(evt.room_id, evt.sender, evt.redacts, evt.event_id)
         elif evt.type == EventType.REACTION:
             evt: ReactionEvent
-            await self.handle_reaction(evt.room_id, evt.sender, evt.event_id, evt.content)
+            await self.handle_reaction(
+                evt.room_id, evt.sender, evt.event_id, evt.content, evt.timestamp
+            )
 
     async def handle_ephemeral_event(
         self, evt: ReceiptEvent | PresenceEvent | TypingEvent

+ 40 - 17
mautrix_instagram/portal.py

@@ -22,6 +22,7 @@ import asyncio
 import json
 import mimetypes
 import re
+import time
 
 import asyncpg
 import magic
@@ -30,7 +31,6 @@ from mauigpapi.types import (
     AnimatedMediaItem,
     CommandResponse,
     ExpiredMediaItem,
-    LinkContext,
     MediaShareItem,
     MediaType,
     MessageSyncMessage,
@@ -241,6 +241,7 @@ class Portal(DBPortal, BasePortal):
         message: DBMessage,
         sender: u.User | p.Puppet,
         reaction: str,
+        mx_timestamp: int,
     ) -> None:
         if existing:
             self.log.debug(
@@ -248,7 +249,9 @@ class Portal(DBPortal, BasePortal):
                 f" (message: {message.mxid})"
             )
             await intent.redact(existing.mx_room, existing.mxid)
-            await existing.edit(reaction=reaction, mxid=mxid, mx_room=message.mx_room)
+            await existing.edit(
+                reaction=reaction, mxid=mxid, mx_room=message.mx_room, mx_timestamp=mx_timestamp
+            )
         else:
             self.log.debug(f"_upsert_reaction inserting {mxid} (message: {message.mxid})")
             await DBReaction(
@@ -258,6 +261,7 @@ class Portal(DBPortal, BasePortal):
                 ig_receiver=self.receiver,
                 ig_sender=sender.igpk,
                 reaction=reaction,
+                mx_timestamp=mx_timestamp,
             ).insert()
 
     # endregion
@@ -521,10 +525,10 @@ class Portal(DBPortal, BasePortal):
             )
 
     async def handle_matrix_reaction(
-        self, sender: u.User, event_id: EventID, reacting_to: EventID, emoji: str
+        self, sender: u.User, event_id: EventID, reacting_to: EventID, emoji: str, timestamp: int
     ) -> None:
         try:
-            await self._handle_matrix_reaction(sender, event_id, reacting_to, emoji)
+            await self._handle_matrix_reaction(sender, event_id, reacting_to, emoji, timestamp)
         except Exception as e:
             self.log.exception(f"Fatal error handling Matrix event {event_id}: {e}")
             message = "Fatal error handling reaction (see logs for more details)"
@@ -542,7 +546,7 @@ class Portal(DBPortal, BasePortal):
             )
 
     async def _handle_matrix_reaction(
-        self, sender: u.User, event_id: EventID, reacting_to: EventID, emoji: str
+        self, sender: u.User, event_id: EventID, reacting_to: EventID, emoji: str, timestamp: int
     ) -> None:
         message = await DBMessage.get_by_mxid(reacting_to, self.mxid)
         if not message or message.is_internal:
@@ -592,7 +596,7 @@ class Portal(DBPortal, BasePortal):
                 await self._send_delivery_receipt(event_id)
                 self.log.trace(f"{sender.mxid} reacted to {message.item_id} with {emoji}")
                 await self._upsert_reaction(
-                    existing, self.main_intent, event_id, message, sender, emoji
+                    existing, self.main_intent, event_id, message, sender, emoji, timestamp
                 )
 
     async def handle_matrix_redaction(
@@ -1260,39 +1264,51 @@ class Portal(DBPortal, BasePortal):
                 f"(type {item.item_type} -> fallback error {event_id})"
             )
         if is_backfill and item.reactions:
-            await self._handle_instagram_reactions(msg, item.reactions.emojis, item.timestamp_ms)
+            await self._handle_instagram_reactions(msg, item.reactions.emojis, is_backfill=True)
 
     async def handle_instagram_remove(self, item_id: str) -> None:
         message = await DBMessage.get_by_item_id(item_id, self.receiver)
         if message is None:
             return
         await message.delete()
-        sender = await p.Puppet.get_by_pk(message.sender)
-        try:
-            await sender.intent_for(self).redact(self.mxid, message.mxid)
-        except MForbidden:
-            await self.main_intent.redact(self.mxid, message.mxid)
-        self.log.debug(f"Redacted {message.mxid} after Instagram unsend")
+        if message.mxid:
+            sender = await p.Puppet.get_by_pk(message.sender)
+            try:
+                await sender.intent_for(self).redact(self.mxid, message.mxid)
+            except MForbidden:
+                await self.main_intent.redact(self.mxid, message.mxid)
+            self.log.debug(f"Redacted {message.mxid} after Instagram unsend")
 
     async def _handle_instagram_reactions(
-        self, message: DBMessage, reactions: list[Reaction], timestamp: int | None = None
+        self, message: DBMessage, reactions: list[Reaction], is_backfill: bool = False
     ) -> None:
         old_reactions: dict[int, DBReaction]
         old_reactions = {
             reaction.ig_sender: reaction
             for reaction in await DBReaction.get_all_by_item_id(message.item_id, self.receiver)
         }
+        timestamp_deduplicator = 1
         for new_reaction in reactions:
             old_reaction = old_reactions.pop(new_reaction.sender_id, None)
             if old_reaction and old_reaction.reaction == new_reaction.emoji:
                 continue
             puppet = await p.Puppet.get_by_pk(new_reaction.sender_id)
             intent = puppet.intent_for(self)
+            timestamp = new_reaction.timestamp_ms if is_backfill else int(time.time() * 1000)
+            if is_backfill:
+                timestamp += timestamp_deduplicator
+                timestamp_deduplicator += 1
             reaction_event_id = await intent.react(
                 self.mxid, message.mxid, new_reaction.emoji, timestamp=timestamp
             )
             await self._upsert_reaction(
-                old_reaction, intent, reaction_event_id, message, puppet, new_reaction.emoji
+                old_reaction,
+                intent,
+                reaction_event_id,
+                message,
+                puppet,
+                new_reaction.emoji,
+                timestamp,
             )
         for old_reaction in old_reactions.values():
             await old_reaction.delete()
@@ -1403,14 +1419,21 @@ class Portal(DBPortal, BasePortal):
 
     async def _update_read_receipts(self, receipts: dict[int | str, ThreadUserLastSeenAt]) -> None:
         for user_id, receipt in receipts.items():
+            message: DBMessage | DBReaction
             message = await DBMessage.get_by_item_id(receipt.item_id, self.receiver)
             if not message:
-                message = await DBMessage.get_closest(self.mxid, int(receipt.timestamp))
-                if not message:
+                reaction: DBReaction
+                message, reaction = await asyncio.gather(
+                    DBMessage.get_closest(self.mxid, int(receipt.timestamp)),
+                    DBReaction.get_closest(self.mxid, receipt.timestamp_ms),
+                )
+                if (not message or not message.mxid) and not reaction:
                     self.log.debug(
                         "Couldn't find message %s to mark as read by %s", receipt, user_id
                     )
                     continue
+                elif not message or (reaction and reaction.mx_timestamp > message.ig_timestamp_ms):
+                    message = reaction
             puppet = await p.Puppet.get_by_pk(int(user_id), create=False)
             if not puppet:
                 continue