Browse Source

portal: refactor to convert to Matrix events rather than send

Signed-off-by: Sumner Evans <sumner@beeper.com>
Sumner Evans 2 years ago
parent
commit
8879a61673
1 changed files with 131 additions and 136 deletions
  1. 131 136
      mautrix_instagram/portal.py

+ 131 - 136
mautrix_instagram/portal.py

@@ -933,7 +933,7 @@ class Portal(DBPortal, BasePortal):
 
     async def _convert_instagram_media(
         self, source: u.User, intent: IntentAPI, item: ThreadItem
-    ) -> MessageEventContent:
+    ) -> ConvertedMessage:
         try:
             reupload_func, media_data = self._get_instagram_media_info(item)
             content = await reupload_func(source, media_data, intent)
@@ -946,12 +946,12 @@ class Portal(DBPortal, BasePortal):
             )
 
         await self._add_instagram_reply(content, item.replied_to_message)
-        return content
+        return EventType.ROOM_MESSAGE, content
 
     # TODO this might be unused
-    async def _handle_instagram_media_share(
+    async def _convert_instagram_media_share(
         self, source: u.User, intent: IntentAPI, item: ThreadItem
-    ) -> EventID | None:
+    ) -> list[ConvertedMessage]:
         item_type_name = None
         if item.media_share:
             share_item = item.media_share
@@ -966,7 +966,8 @@ class Portal(DBPortal, BasePortal):
         elif item.direct_media_share:
             share_item = item.direct_media_share.media
         else:
-            return None
+            self.log.debug("No media share to bridge")
+            return []
         item_type_name = item_type_name or share_item.media_type.human_name
         user_text = f"@{share_item.user.username}"
         user_link = (
@@ -985,7 +986,7 @@ class Portal(DBPortal, BasePortal):
             elif share_item.user.pk == source.igpk and tagged_user_id == self.other_user_pk:
                 prefix.body = prefix.formatted_body = "Tagged them in your post"
 
-        content = await self._convert_instagram_media(source, intent, item)
+        _, content = await self._convert_instagram_media(source, intent, item)
 
         external_url = f"https://www.instagram.com/p/{share_item.code}/"
         if share_item.caption:
@@ -1045,17 +1046,17 @@ class Portal(DBPortal, BasePortal):
                 }
                 combined["formatted_body"] = combined_formatted_body
 
-            event_id = await self._send_message(intent, combined, timestamp=item.timestamp_ms)
+            return [(EventType.ROOM_MESSAGE, combined)]
         else:
-            await self._send_message(intent, prefix, timestamp=item.timestamp_ms)
-            event_id = await self._send_message(intent, content, timestamp=item.timestamp_ms)
-            await self._send_message(intent, caption, timestamp=item.timestamp_ms)
+            return [
+                (EventType.ROOM_MESSAGE, prefix),
+                (EventType.ROOM_MESSAGE, content),
+                (EventType.ROOM_MESSAGE, caption),
+            ]
 
-        return event_id
-
-    async def _handle_instagram_xma_media_share(
+    async def _convert_instagram_xma_media_share(
         self, source: u.User, intent: IntentAPI, item: ThreadItem
-    ) -> EventID | None:
+    ) -> list[ConvertedMessage]:
         # N.B. _get_instagram_media_info also only supports downloading the first xma item
         xma_list = (
             item.xma_media_share
@@ -1069,7 +1070,7 @@ class Portal(DBPortal, BasePortal):
             self.log.warning(f"Item {item.item_id} has multiple xma media share parts")
         if media.xma_layout_type not in (0, 4):
             self.log.warning(f"Unrecognized xma layout type {media.xma_layout_type}")
-        content = await self._convert_instagram_media(source, intent, item)
+        _, content = await self._convert_instagram_media(source, intent, item)
 
         # Post shares (layout type 0): media title text
         # Reel shares/replies/reactions (layout type 4): item text
@@ -1158,17 +1159,14 @@ class Portal(DBPortal, BasePortal):
                     "org.matrix.msc1767.html": content["formatted_body"],
                 }
 
-            event_id = await self._send_message(intent, content, timestamp=item.timestamp_ms)
+            return [(EventType.ROOM_MESSAGE, content)]
         else:
-            event_id = await self._send_message(intent, content, timestamp=item.timestamp_ms)
-            await self._send_message(intent, caption, timestamp=item.timestamp_ms)
-
-        return event_id
+            return [(EventType.ROOM_MESSAGE, content), (EventType.ROOM_MESSAGE, caption)]
 
     # TODO this is probably unused
-    async def _handle_instagram_reel_share(
+    async def _convert_instagram_reel_share(
         self, source: u.User, intent: IntentAPI, item: ThreadItem
-    ) -> EventID | None:
+    ) -> list[ConvertedMessage]:
         assert item.reel_share
         media = item.reel_share.media
         prefix_html = None
@@ -1194,7 +1192,7 @@ class Portal(DBPortal, BasePortal):
                 prefix = "You mentioned them in your story"
         else:
             self.log.debug(f"Unsupported reel share type {item.reel_share.type}")
-            return None
+            return []
         prefix_content = TextMessageEventContent(msgtype=MessageType.NOTICE, body=prefix)
         if prefix_html:
             prefix_content.format = Format.HTML
@@ -1220,7 +1218,7 @@ class Portal(DBPortal, BasePortal):
                 # use a Matrix reply instead of reposting the image.
                 caption_content.set_reply(existing.mxid)
             else:
-                media_content = await self._convert_instagram_media(source, intent, item)
+                _, media_content = await self._convert_instagram_media(source, intent, item)
 
         if self.bridge.config["bridge.caption_in_message"]:
             if media_content:
@@ -1261,33 +1259,21 @@ class Portal(DBPortal, BasePortal):
             else:
                 combined = caption_content
 
-            event_id = await self._send_message(intent, combined, timestamp=item.timestamp_ms)
+            return [(EventType.ROOM_MESSAGE, combined)]
         else:
             await self._send_message(intent, prefix_content, timestamp=item.timestamp_ms)
+            converted: list[ConvertedMessage] = []
             if media_content:
-                media_event_id = await self._send_message(
-                    intent, media_content, timestamp=item.timestamp_ms
-                )
-                await DBMessage(
-                    mxid=media_event_id,
-                    mx_room=self.mxid,
-                    item_id=fake_item_id,
-                    client_context=None,
-                    receiver=self.receiver,
-                    sender=media.user.pk,
-                    ig_timestamp=None,
-                ).insert()
-            event_id = await self._send_message(
-                intent, caption_content, timestamp=item.timestamp_ms
-            )
-        return event_id
+                converted.append((EventType.ROOM_MESSAGE, media_content))
+            converted.append((EventType.ROOM_MESSAGE, caption_content))
+            return converted
 
-    async def _handle_instagram_link(
+    async def _convert_instagram_link(
         self,
         source: u.User,
         intent: IntentAPI,
         item: ThreadItem,
-    ) -> EventID:
+    ) -> ConvertedMessage:
         content = TextMessageEventContent(msgtype=MessageType.TEXT, body=item.link.text)
         link = item.link.link_context
         preview = {
@@ -1309,28 +1295,24 @@ class Portal(DBPortal, BasePortal):
         preview = {k: v for k, v in preview.items() if v}
         content["com.beeper.linkpreviews"] = [preview] if "og:title" in preview else []
         await self._add_instagram_reply(content, item.replied_to_message)
-        return await self._send_message(intent, content, timestamp=item.timestamp_ms)
+        return EventType.ROOM_MESSAGE, content
 
-    async def _handle_instagram_text(
-        self, intent: IntentAPI, item: ThreadItem, text: str
-    ) -> EventID:
+    async def _convert_instagram_text(self, item: ThreadItem, text: str) -> ConvertedMessage:
         content = TextMessageEventContent(msgtype=MessageType.TEXT, body=text)
         content["com.beeper.linkpreviews"] = []
         await self._add_instagram_reply(content, item.replied_to_message)
-        return await self._send_message(intent, content, timestamp=item.timestamp_ms)
+        return EventType.ROOM_MESSAGE, content
 
-    async def _send_instagram_unhandled(self, intent: IntentAPI, item: ThreadItem) -> EventID:
+    async def _convert_instagram_unhandled(self, item: ThreadItem) -> ConvertedMessage:
         content = TextMessageEventContent(
             msgtype=MessageType.NOTICE, body=f"Unsupported message type {item.item_type.value}"
         )
         await self._add_instagram_reply(content, item.replied_to_message)
-        return await self._send_message(intent, content, timestamp=item.timestamp_ms)
+        return EventType.ROOM_MESSAGE, content
 
-    async def _handle_instagram_location(
-        self, intent: IntentAPI, item: ThreadItem
-    ) -> EventID | None:
+    async def _convert_instagram_location(self, item: ThreadItem) -> ConvertedMessage | None:
         loc = item.location
-        if not loc.lng or not loc.lat:
+        if not loc or not loc.lng or not loc.lat:
             # TODO handle somehow
             return None
         long_char = "E" if loc.lng > 0 else "W"
@@ -1356,12 +1338,9 @@ class Portal(DBPortal, BasePortal):
         content["formatted_body"] = f"Location: <a href='{url}'>{body}</a>"
 
         await self._add_instagram_reply(content, item.replied_to_message)
+        return EventType.ROOM_MESSAGE, content
 
-        return await self._send_message(intent, content, timestamp=item.timestamp_ms)
-
-    async def _handle_instagram_profile(
-        self, intent: IntentAPI, item: ThreadItem
-    ) -> EventID | None:
+    async def _convert_instagram_profile(self, item: ThreadItem) -> ConvertedMessage:
         username = item.profile.username
         user_link = f'<a href="https://www.instagram.com/{username}/">@{username}</a>'
         text = f"Shared @{username}'s profile"
@@ -1370,16 +1349,7 @@ class Portal(DBPortal, BasePortal):
             msgtype=MessageType.TEXT, format=Format.HTML, body=text, formatted_body=html
         )
         await self._add_instagram_reply(content, item.replied_to_message)
-        return await self._send_message(intent, content, timestamp=item.timestamp_ms)
-
-    async def handle_instagram_item(
-        self, source: u.User, sender: p.Puppet, item: ThreadItem, is_backfill: bool = False
-    ) -> None:
-        try:
-            await self._handle_instagram_item(source, sender, item, is_backfill)
-        except Exception:
-            self.log.exception("Fatal error handling Instagram item")
-            self.log.trace("Item content: %s", item.serialize())
+        return EventType.ROOM_MESSAGE, content
 
     async def _add_instagram_reply(
         self, content: MessageEventContent, reply_to: ThreadItem | None
@@ -1413,13 +1383,7 @@ class Portal(DBPortal, BasePortal):
 
         content.set_reply(evt)
 
-    async def _handle_instagram_item(
-        self, source: u.User, sender: p.Puppet, item: ThreadItem, is_backfill: bool = False
-    ) -> None:
-        if not isinstance(item, ThreadItem):
-            # Parsing these items failed, they should have been logged already
-            return
-
+    async def handle_instagram_item(self, source: u.User, sender: p.Puppet, item: ThreadItem):
         client_context = item.client_context
         link_client_context = item.link.client_context if item.link else None
         cc = client_context
@@ -1433,48 +1397,94 @@ class Portal(DBPortal, BasePortal):
                 f"Ignoring message {item.item_id} ({cc}) by {item.user_id}"
                 " as it was sent by us (client_context in dedup queue)"
             )
-            return
+            return []
         elif link_client_context and link_client_context in self._reqid_dedup:
             self.log.debug(
                 f"Ignoring message {item.item_id} ({cc}) by {item.user_id}"
                 " as it was sent by us (link.client_context in dedup queue)"
             )
-            return
+            return []
 
+        # Check in-memory queues for duplicates
         if item.item_id in self._msgid_dedup:
             self.log.debug(
-                f"Ignoring message {item.item_id} ({cc}) by {item.user_id}"
+                f"Ignoring message {item.item_id} ({item.client_context}) by {item.user_id}"
                 " as it was already handled (message.id in dedup queue)"
             )
             return
         self._msgid_dedup.appendleft(item.item_id)
 
+        # Check database for duplicates
         if await DBMessage.get_by_item_id(item.item_id, self.receiver) is not None:
             self.log.debug(
-                f"Ignoring message {item.item_id} ({cc}) by {item.user_id}"
+                f"Ignoring message {item.item_id} ({item.client_context}) by {item.user_id}"
                 " as it was already handled (message.id in database)"
             )
             return
 
-        self.log.debug(f"Starting handling of message {item.item_id} ({cc}) by {item.user_id}")
-        asyncio.create_task(sender.intent_for(self).set_typing(self.mxid, is_typing=False))
-        await self._handle_deduplicated_instagram_item(source, sender, item, is_backfill)
+        self.log.debug(
+            f"Handling Instagram message {item.item_id} ({item.client_context}) by {item.user_id}"
+        )
+        if not self.mxid:
+            # TODO figure out where to get the info from
+            mxid = await self.create_matrix_room(source)
+            if not mxid:
+                # Failed to create
+                return
 
-    async def _handle_deduplicated_instagram_item(
-        self, source: u.User, sender: p.Puppet, item: ThreadItem, is_backfill: bool = False
-    ) -> None:
-        if self.backfill_lock.locked and sender.need_backfill_invite(self):
-            self.log.debug("Adding %s's default puppet to room for backfilling", sender.mxid)
-            if self.is_direct:
-                await self.main_intent.invite_user(self.mxid, sender.default_mxid)
-            intent = sender.default_mxid_intent
-            await intent.ensure_joined(self.mxid)
-            self._backfill_leave.add(intent)
-        else:
-            intent = sender.intent_for(self)
-        event_id = None
-        needs_handling = True
-        allow_text_handle = True
+            if self.config["bridge.backfill.enable"]:
+                await self.enqueue_immediate_backfill(source, 0)
+
+        intent = sender.intent_for(self)
+        asyncio.create_task(intent.set_typing(self.mxid, is_typing=False))
+        event_ids = []
+        for event_type, content in await self.convert_instagram_item(source, sender, item):
+            event_ids.append(
+                await self._send_message(
+                    intent, content, event_type=event_type, timestamp=item.timestamp_ms
+                )
+            )
+        event_ids = [event_id for event_id in event_ids if event_id]
+        if not event_ids:
+            self.log.warning(f"Unhandled Instagram message {item.item_id}")
+            return
+        self.log.debug(f"Handled Instagram message {item.item_id} -> {event_ids}")
+        await DBMessage(
+            mxid=event_ids[-1],
+            mx_room=self.mxid,
+            item_id=item.item_id,
+            client_context=item.client_context,
+            receiver=self.receiver,
+            sender=sender.igpk,
+            ig_timestamp=item.timestamp_ms,
+        ).insert()
+        await self._send_delivery_receipt(event_ids[-1])
+
+        # TODO handle reactions
+        return
+        if isinstance(message, graphql.Message) and message.message_reactions:
+            await self._handle_graphql_reactions(
+                source, created_msgs[0], message.message_reactions, timestamp
+            )
+
+    async def convert_instagram_item(
+        self, source: u.User, sender: p.Puppet, item: ThreadItem
+    ) -> list[ConvertedMessage]:
+        if not isinstance(item, ThreadItem):
+            # Parsing these items failed, they should have been logged already
+            return []
+
+        try:
+            return await self._convert_instagram_item(source, sender, item)
+        except Exception:
+            self.log.exception("Fatal error converting Instagram item")
+            self.log.trace("Item content: %s", item.serialize())
+            return []
+
+    async def _convert_instagram_item(
+        self, source: u.User, sender: p.Puppet, item: ThreadItem
+    ) -> list[ConvertedMessage]:
+        intent = sender.intent_for(self)
         if (
             item.xma_media_share
             or item.xma_reel_share
@@ -1482,17 +1492,19 @@ class Portal(DBPortal, BasePortal):
             or item.xma_story_share
             or item.generic_xma
         ):
-            event_id = await self._handle_instagram_xma_media_share(source, intent, item)
-            allow_text_handle = False
-        elif item.media or item.animated_media or item.voice_media or item.visual_media:
-            content = await self._convert_instagram_media(source, intent, item)
-            event_id = await self._send_message(intent, content, timestamp=item.timestamp_ms)
+            return await self._convert_instagram_xma_media_share(source, intent, item)
+
+        converted: list[ConvertedMessage] = []
+
+        if item.media or item.animated_media or item.voice_media or item.visual_media:
+            converted.append(await self._convert_instagram_media(source, intent, item))
         elif item.location:
-            event_id = await self._handle_instagram_location(intent, item)
+            if loc_content := await self._convert_instagram_location(item):
+                converted.append(loc_content)
         elif item.profile:
-            event_id = await self._handle_instagram_profile(intent, item)
+            converted.append(await self._convert_instagram_profile(item))
         elif item.reel_share:
-            event_id = await self._handle_instagram_reel_share(source, intent, item)
+            converted.extend(await self._convert_instagram_reel_share(source, intent, item))
         elif (
             item.media_share
             or item.direct_media_share
@@ -1500,45 +1512,28 @@ class Portal(DBPortal, BasePortal):
             or item.clip
             or item.felix_share
         ):
-            event_id = await self._handle_instagram_media_share(source, intent, item)
+            converted.extend(await self._convert_instagram_media_share(source, intent, item))
         elif item.action_log:
             # These probably don't need to be bridged
-            needs_handling = False
             self.log.debug(f"Ignoring action log message {item.item_id}")
+            return []
+
         # TODO handle item.clip?
-        if item.text and allow_text_handle:
-            event_id = await self._handle_instagram_text(intent, item, item.text)
+        # TODO should these be put into a caption?
+        if item.text:
+            converted.append(await self._convert_instagram_text(item, item.text))
         elif item.like:
             # We handle likes as text because Matrix clients do big emoji on their own.
-            event_id = await self._handle_instagram_text(intent, item, item.like)
+            converted.append(await self._convert_instagram_text(item, item.like))
         elif item.link:
-            event_id = await self._handle_instagram_link(source, intent, item)
-        handled = bool(event_id)
-        if not event_id and needs_handling:
+            converted.append(await self._convert_instagram_link(source, intent, item))
+
+        if len(converted) == 0:
             self.log.debug(f"Unhandled Instagram message {item.item_id}")
-            event_id = await self._send_instagram_unhandled(intent, item)
+            converted.append(await self._convert_instagram_unhandled(item))
 
-        cc = item.client_context
-        if not cc and item.link and item.link.client_context:
-            cc = item.link.client_context
-        msg = DBMessage(
-            mxid=event_id,
-            mx_room=self.mxid,
-            item_id=item.item_id,
-            client_context=cc,
-            receiver=self.receiver,
-            sender=sender.pk,
-            ig_timestamp=item.timestamp,
-        )
-        await msg.insert()
-        await self._send_delivery_receipt(event_id)
-        if handled:
-            self.log.debug(f"Handled Instagram message {item.item_id} -> {event_id}")
-        elif needs_handling:
-            self.log.debug(
-                f"Unhandled Instagram message {item.item_id} "
-                f"(type {item.item_type} -> fallback error {event_id})"
-            )
+        return converted
+        # TODO HANDLE REACTIONS
         if is_backfill and item.reactions:
             await self._handle_instagram_reactions(msg, item.reactions.emojis, is_backfill=True)