Преглед изворни кода

Add support for xma_media_share items

Tulir Asokan пре 2 година
родитељ
комит
e8264354bb
3 измењених фајлова са 99 додато и 2 уклоњено
  1. 1 0
      mauigpapi/types/__init__.py
  2. 18 0
      mauigpapi/types/thread_item.py
  3. 80 2
      mautrix_instagram/portal.py

+ 1 - 0
mauigpapi/types/__init__.py

@@ -97,6 +97,7 @@ from .thread_item import (
     VisualMedia,
     VoiceMediaData,
     VoiceMediaItem,
+    XMAMediaShareItem,
 )
 from .upload import FinishUploadResponse, UploadPhotoResponse, UploadVideoResponse
 from .user import SearchResultUser, UserSearchResponse

+ 18 - 0
mauigpapi/types/thread_item.py

@@ -77,6 +77,7 @@ class ThreadItemType(ExtensibleEnum):
     REACTION = "reaction"
     CLIP = "clip"
     GUIDE_SHARE = "guide_share"
+    XMA_MEDIA_SHARE = "xma_media_share"
 
 
 @dataclass(kw_only=True)
@@ -493,6 +494,22 @@ class DirectMediaShareItem(SerializableAttrs):
     media: MediaShareItem
 
 
+@dataclass
+class XMAMediaShareItem(SerializableAttrs):
+    xma_layout_type: int
+
+    target_url: str
+
+    title_text: str
+    header_title_text: str
+    # subtitle_text: Optional[str]
+
+    preview_url: str
+    preview_url_mime_type: str
+    preview_width: int
+    preview_height: int
+
+
 @dataclass
 class ClipItem(SerializableAttrs):
     # TODO there are some additional fields in clips
@@ -532,6 +549,7 @@ class ThreadItem(SerializableAttrs):
     visual_media: Optional[VisualMedia] = None
     media_share: Optional[MediaShareItem] = None
     direct_media_share: Optional[DirectMediaShareItem] = None
+    xma_media_share: Optional[List[XMAMediaShareItem]] = None
     reel_share: Optional[ReelShareItem] = None
     story_share: Optional[StoryShareItem] = None
     location: Optional[Location] = None

+ 80 - 2
mautrix_instagram/portal.py

@@ -19,12 +19,14 @@ from typing import TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable, Unio
 from collections import deque
 from io import BytesIO
 import asyncio
+import html
 import json
 import mimetypes
 import re
 import sqlite3
 import time
 
+from yarl import URL
 import asyncpg
 import magic
 
@@ -47,6 +49,7 @@ from mauigpapi.types import (
     ThreadUserLastSeenAt,
     TypingStatus,
     VoiceMediaItem,
+    XMAMediaShareItem,
 )
 from mautrix.appservice import AppService, IntentAPI
 from mautrix.bridge import BasePortal, NotificationDisabler, async_getter_lock
@@ -103,6 +106,7 @@ MediaData = Union[
     ReelMediaShareItem,
     RegularMediaItem,
     VoiceMediaItem,
+    XMAMediaShareItem,
 ]
 MediaUploadFunc = Callable[["u.User", MediaData, IntentAPI], Awaitable[MediaMessageEventContent]]
 
@@ -761,6 +765,17 @@ class Portal(DBPortal, BasePortal):
         )
         return await self._reupload_instagram_file(source, url, MessageType.IMAGE, info, intent)
 
+    async def _reupload_instagram_xma(
+        self, source: u.User, media: XMAMediaShareItem, intent: IntentAPI
+    ) -> MediaMessageEventContent:
+        url = media.preview_url
+        info = ImageInfo(
+            height=media.preview_height,
+            width=media.preview_width,
+            mimetype=media.preview_url_mime_type,
+        )
+        return await self._reupload_instagram_file(source, url, MessageType.IMAGE, info, intent)
+
     async def _reupload_instagram_voice(
         self, source: u.User, media: VoiceMediaItem, intent: IntentAPI
     ) -> MediaMessageEventContent:
@@ -866,6 +881,9 @@ class Portal(DBPortal, BasePortal):
         elif item.voice_media:
             media_data = item.voice_media
             method = self._reupload_instagram_voice
+        elif item.xma_media_share:
+            media_data = item.xma_media_share[0]
+            method = self._reupload_instagram_xma
         elif item.reel_share:
             media_data = item.reel_share.media
         elif item.story_share:
@@ -952,7 +970,7 @@ class Portal(DBPortal, BasePortal):
             )
             caption_formatted_body = (
                 f"<blockquote><strong>{share_item.caption.user.username}</strong>"
-                f" {share_item.caption.text}</blockquote>"
+                f" {html.escape(share_item.caption.text)}</blockquote>"
                 f'<a href="{external_url}">instagram.com/p/{share_item.code}</a>'
             )
         else:
@@ -1010,6 +1028,64 @@ class Portal(DBPortal, BasePortal):
 
         return event_id
 
+    async def _handle_instagram_xma_media_share(
+        self, source: u.User, intent: IntentAPI, item: ThreadItem
+    ) -> EventID | None:
+        # N.B. _get_instagram_media_info also only supports downloading the first xma item
+        media = item.xma_media_share[0]
+        if len(item.xma_media_share) != 1:
+            self.log.warning(f"Item {item.item_id} has multiple xma media share parts")
+        if media.xma_layout_type != 0:
+            self.log.warning(f"Unrecognized xma layout type {media.xma_layout_type}")
+        content = await self._convert_instagram_media(source, intent, item)
+
+        caption_body = f"> {media.title_text}\n\n" f"{media.target_url}"
+        target_url_pretty = str(URL(media.target_url).with_query(None)).replace("https://www.", "")
+        escaped_title_text = html.escape(media.title_text)
+        escaped_header_text = html.escape(media.header_title_text)
+        if escaped_title_text.startswith(escaped_header_text):
+            escaped_title_text = (
+                f"<strong>{escaped_header_text}</strong>"
+                f"{escaped_title_text[len(escaped_header_text):]}"
+            )
+        caption_formatted_body = (
+            f"<blockquote>{escaped_title_text}</blockquote>"
+            f'<a href="{media.target_url}">{target_url_pretty}</a>'
+        )
+        content.external_url = media.target_url
+        caption = TextMessageEventContent(
+            msgtype=MessageType.TEXT,
+            body=caption_body,
+            formatted_body=caption_formatted_body,
+            format=Format.HTML,
+            external_url=media.target_url,
+        )
+
+        if self.bridge.config["bridge.caption_in_message"]:
+            if isinstance(content, TextMessageEventContent):
+                content.ensure_has_html()
+                caption.ensure_has_html()
+                content.body += f"\n\n{caption.body}"
+                content.formatted_body = (
+                    f"<p><b>{content.formatted_body}</b></p><p>{caption.formatted_body}</p>"
+                )
+            else:
+                content["filename"] = content.body
+                content.body = caption.body
+                content["format"] = str(Format.HTML)
+                content["formatted_body"] = caption.formatted_body
+                content["org.matrix.msc1767.caption"] = {
+                    "org.matrix.msc1767.text": content.body,
+                    "org.matrix.msc1767.html": content["formatted_body"],
+                }
+
+            event_id = await self._send_message(intent, content, timestamp=item.timestamp_ms)
+        else:
+            event_id = await self._send_message(intent, content, timestamp=item.timestamp_ms)
+            await self._send_message(intent, caption, timestamp=item.timestamp_ms)
+
+        return event_id
+
     async def _handle_instagram_reel_share(
         self, source: u.User, intent: IntentAPI, item: ThreadItem
     ) -> EventID | None:
@@ -1080,7 +1156,7 @@ class Portal(DBPortal, BasePortal):
                         formatted_body=(
                             f"<p><b>{media_content.formatted_body}</b></p>"
                             f"<p><i>{prefix_content.formatted_body}</i></p>"
-                            f"<p>{caption_content.formatted_body}</p>",
+                            f"<p>{caption_content.formatted_body}</p>"
                         ),
                         format=Format.HTML,
                     )
@@ -1335,6 +1411,8 @@ class Portal(DBPortal, BasePortal):
             or item.felix_share
         ):
             event_id = await self._handle_instagram_media_share(source, intent, item)
+        elif item.xma_media_share:
+            event_id = await self._handle_instagram_xma_media_share(source, intent, item)
         elif item.action_log:
             # These probably don't need to be bridged
             needs_handling = False