# mautrix-instagram - A Matrix-Instagram puppeting bridge.
# Copyright (C) 2020 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see
{share_item.caption.user.username}" f" {share_item.caption.text}" f'instagram.com/p/{share_item.code}') caption = TextMessageEventContent(msgtype=MessageType.TEXT, body=body, formatted_body=formatted_body, format=Format.HTML, external_url=external_url) await self._send_message(intent, caption, timestamp=item.timestamp // 1000) return event_id async def _handle_instagram_reel_share(self, source: 'u.User', intent: IntentAPI, item: ThreadItem) -> Optional[EventID]: media = item.reel_share.media prefix_html = None if item.reel_share.type == ReelShareType.REPLY: if item.reel_share.reel_owner_id == source.igpk: prefix = "Replied to your story" else: username = media.user.username prefix = f"Sent @{username}'s story" user_link = f'@{username}' prefix_html = f"Sent {user_link}'s story" elif item.reel_share.type == ReelShareType.REACTION: prefix = "Reacted to your story" else: self.log.debug(f"Unsupported reel share type {item.reel_share.type}") return None prefix_content = TextMessageEventContent(msgtype=MessageType.NOTICE, body=prefix) if prefix_html: prefix_content.format = Format.HTML prefix_content.formatted_body = prefix_html content = TextMessageEventContent(msgtype=MessageType.TEXT, body=item.reel_share.text) await self._send_message(intent, prefix_content, timestamp=item.timestamp // 1000) if isinstance(media, ExpiredMediaItem): # TODO send message about expired story pass else: fake_item_id = f"fi.mau.instagram.reel_share.{item.user_id}.{media.pk}" existing = await DBMessage.get_by_item_id(fake_item_id, self.receiver) if existing: # If the user already reacted or replied to the same reel share item, # use a Matrix reply instead of reposting the image. content.set_reply(existing.mxid) else: media_event_id = await self._handle_instagram_media(source, intent, item) await DBMessage(mxid=media_event_id, mx_room=self.mxid, item_id=fake_item_id, receiver=self.receiver, sender=media.user.pk).insert() return await self._send_message(intent, content, timestamp=item.timestamp // 1000) async def _handle_instagram_text(self, intent: IntentAPI, item: ThreadItem, text: str, ) -> EventID: content = TextMessageEventContent(msgtype=MessageType.TEXT, body=text) await self._add_instagram_reply(content, item.replied_to_message) return await self._send_message(intent, content, timestamp=item.timestamp // 1000) async def _handle_instagram_location(self, intent: IntentAPI, item: ThreadItem) -> EventID: loc = item.location long_char = "E" if loc.lng > 0 else "W" lat_char = "N" if loc.lat > 0 else "S" body = (f"{loc.name} - {round(abs(loc.lat), 4)}° {lat_char}, " f"{round(abs(loc.lng), 4)}° {long_char}") url = f"https://www.openstreetmap.org/#map=15/{loc.lat}/{loc.lng}" external_url = None if loc.external_source == "facebook_places": external_url = f"https://www.facebook.com/{loc.short_name}-{loc.facebook_places_id}" content = LocationMessageEventContent( msgtype=MessageType.LOCATION, geo_uri=f"geo:{loc.lat},{loc.lng}", body=f"Location: {body}\n{url}", external_url=external_url) content["format"] = str(Format.HTML) content["formatted_body"] = f"Location: {body}" await self._add_instagram_reply(content, item.replied_to_message) return await self._send_message(intent, content, timestamp=item.timestamp // 1000) async def handle_instagram_item(self, source: 'u.User', sender: 'p.Puppet', item: ThreadItem, is_backfill: bool = False) -> None: try: await self._handle_instagram_item(source, sender, item, is_backfill) except Exception: self.log.exception("Fatal error handling Instagram item") self.log.trace("Item content: %s", item.serialize()) async def _add_instagram_reply(self, content: MessageEventContent, reply_to: Optional[ThreadItem]) -> None: if not reply_to: return message = await DBMessage.get_by_item_id(reply_to.item_id, self.receiver) if not message: return content.set_reply(message.mxid) if not isinstance(content, TextMessageEventContent): return try: evt = await self.main_intent.get_event(message.mx_room, message.mxid) except (MNotFound, MForbidden): evt = None if not evt: return if evt.type == EventType.ROOM_ENCRYPTED: try: evt = await self.matrix.e2ee.decrypt(evt, wait_session_timeout=0) except SessionNotFound: return if isinstance(evt.content, TextMessageEventContent): evt.content.trim_reply_fallback() content.set_reply(evt) async def _handle_instagram_item(self, source: 'u.User', sender: 'p.Puppet', item: ThreadItem, is_backfill: bool = False) -> None: if not isinstance(item, ThreadItem): # Parsing these items failed, they should have been logged already return elif item.client_context in self._reqid_dedup: self.log.debug(f"Ignoring message {item.item_id} by {item.user_id}" " as it was sent by us (client_context in dedup queue)") elif item.item_id in self._msgid_dedup: self.log.debug(f"Ignoring message {item.item_id} by {item.user_id}" " as it was already handled (message.id in dedup queue)") elif await DBMessage.get_by_item_id(item.item_id, self.receiver) is not None: self.log.debug(f"Ignoring message {item.item_id} by {item.user_id}" " as it was already handled (message.id found in database)") else: self._msgid_dedup.appendleft(item.item_id) if self.backfill_lock.locked and sender.need_backfill_invite(self): self.log.debug("Adding %s's default puppet to room for backfilling", sender.mxid) if self.is_direct: await self.main_intent.invite_user(self.mxid, sender.default_mxid) intent = sender.default_mxid_intent await intent.ensure_joined(self.mxid) self._backfill_leave.add(intent) else: intent = sender.intent_for(self) event_id = None if item.media or item.animated_media or item.voice_media or item.visual_media: event_id = await self._handle_instagram_media(source, intent, item) elif item.location: event_id = await self._handle_instagram_location(intent, item) elif item.reel_share: event_id = await self._handle_instagram_reel_share(source, intent, item) elif item.media_share or item.story_share: event_id = await self._handle_instagram_media_share(source, intent, item) if item.text: event_id = await self._handle_instagram_text(intent, item, item.text) elif item.like: # We handle likes as text because Matrix clients do big emoji on their own. event_id = await self._handle_instagram_text(intent, item, item.like) elif item.link: event_id = await self._handle_instagram_text(intent, item, item.link.text) if event_id: msg = DBMessage(mxid=event_id, mx_room=self.mxid, item_id=item.item_id, receiver=self.receiver, sender=sender.pk) await msg.insert() await self._send_delivery_receipt(event_id) self.log.debug(f"Handled Instagram message {item.item_id} -> {event_id}") if is_backfill and item.reactions: await self._handle_instagram_reactions(msg, item.reactions.emojis) else: self.log.debug(f"Unhandled Instagram message {item.item_id}") async def handle_instagram_remove(self, item_id: str) -> None: message = await DBMessage.get_by_item_id(item_id, self.receiver) if message is None: return await message.delete() sender = await p.Puppet.get_by_pk(message.sender) try: await sender.intent_for(self).redact(self.mxid, message.mxid) except MForbidden: await self.main_intent.redact(self.mxid, message.mxid) self.log.debug(f"Redacted {message.mxid} after Instagram unsend") async def _handle_instagram_reactions(self, message: DBMessage, reactions: List[Reaction] ) -> None: old_reactions: Dict[int, DBReaction] old_reactions = {reaction.ig_sender: reaction for reaction in await DBReaction.get_all_by_item_id(message.item_id, self.receiver)} for new_reaction in reactions: old_reaction = old_reactions.pop(new_reaction.sender_id, None) if old_reaction and old_reaction.reaction == new_reaction.emoji: continue puppet = await p.Puppet.get_by_pk(new_reaction.sender_id) intent = puppet.intent_for(self) reaction_event_id = await intent.react(self.mxid, message.mxid, new_reaction.emoji) await self._upsert_reaction(old_reaction, intent, reaction_event_id, message, puppet, new_reaction.emoji) for old_reaction in old_reactions.values(): await old_reaction.delete() puppet = await p.Puppet.get_by_pk(old_reaction.ig_sender) await puppet.intent_for(self).redact(self.mxid, old_reaction.mxid) async def handle_instagram_update(self, item: MessageSyncMessage) -> None: message = await DBMessage.get_by_item_id(item.item_id, self.receiver) if not message: return if item.has_seen: puppet = await p.Puppet.get_by_pk(item.has_seen, create=False) if puppet: await puppet.intent_for(self).mark_read(self.mxid, message.mxid) else: async with self._reaction_lock: await self._handle_instagram_reactions(message, (item.reactions.emojis if item.reactions else [])) # endregion # region Updating portal info def _get_thread_name(self, thread: Thread) -> str: if self.is_direct: if self.other_user_pk == thread.viewer_id and len(thread.users) == 0: return "Instagram chat with yourself" elif len(thread.users) == 1: tpl = self.config["bridge.private_chat_name_template"] ui = thread.users[0] return tpl.format(displayname=ui.full_name, id=ui.pk, username=ui.username) pass elif thread.thread_title: return self.config["bridge.group_chat_name_template"].format(name=thread.thread_title) else: return "" async def update_info(self, thread: Thread, source: 'u.User') -> None: if self.is_direct and self.other_user_pk == source.igpk and not thread.thread_title: thread.thread_title = "Instagram chat with yourself" changed = await self._update_name(self._get_thread_name(thread)) if changed: await self.update_bridge_info() await self.update() await self._update_participants(thread.users, source) # TODO update power levels with thread.admin_user_ids async def _update_name(self, name: str) -> bool: if self.name != name and name: self.name = name if self.mxid: await self.main_intent.set_room_name(self.mxid, name) return True return False async def _update_participants(self, users: List[ThreadUser], source: 'u.User') -> None: if not self.mxid: return # Make sure puppets who should be here are here for user in users: puppet = await p.Puppet.get_by_pk(user.pk) await puppet.update_info(user, source) await puppet.intent_for(self).ensure_joined(self.mxid) # Kick puppets who shouldn't be here current_members = {int(user.pk) for user in users} for user_id in await self.main_intent.get_room_members(self.mxid): pk = p.Puppet.get_id_from_mxid(user_id) if pk and pk not in current_members and pk != self.other_user_pk: await self.main_intent.kick_user(self.mxid, p.Puppet.get_mxid_from_id(pk), reason="User had left this Instagram DM") # endregion # region Backfilling async def backfill(self, source: 'u.User', is_initial: bool = False) -> None: limit = (self.config["bridge.backfill.initial_limit"] if is_initial else self.config["bridge.backfill.missed_limit"]) if limit == 0: return elif limit < 0: limit = None with self.backfill_lock: await self._backfill(source, is_initial, limit) async def _backfill(self, source: 'u.User', is_initial: bool, limit: int) -> None: self.log.debug("Backfilling history through %s", source.mxid) entries = await self._fetch_backfill_items(source, is_initial, limit) if not entries: self.log.debug("Didn't get any items to backfill from server") return self.log.debug("Got %d entries from server", len(entries)) self._backfill_leave = set() async with NotificationDisabler(self.mxid, source): for entry in reversed(entries): sender = await p.Puppet.get_by_pk(int(entry.user_id)) await self.handle_instagram_item(source, sender, entry, is_backfill=True) for intent in self._backfill_leave: self.log.trace("Leaving room with %s post-backfill", intent.mxid) await intent.leave_room(self.mxid) self._backfill_leave = None self.log.info("Backfilled %d messages through %s", len(entries), source.mxid) async def _fetch_backfill_items(self, source: 'u.User', is_initial: bool, limit: int ) -> List[ThreadItem]: items = [] self.log.debug("Fetching up to %d messages through %s", limit, source.igpk) async for item in source.client.iter_thread(self.thread_id): if len(items) >= limit: self.log.debug(f"Fetched {len(items)} messages (the limit)") break elif not is_initial: msg = await DBMessage.get_by_item_id(item.item_id, receiver=self.receiver) if msg is not None: self.log.debug(f"Fetched {len(items)} messages and hit a message" " that's already in the database.") break items.append(item) return items # endregion # region Bridge info state event @property def bridge_info_state_key(self) -> str: return f"net.maunium.instagram://instagram/{self.thread_id}" @property def bridge_info(self) -> Dict[str, Any]: return { "bridgebot": self.az.bot_mxid, "creator": self.main_intent.mxid, "protocol": { "id": "instagram", "displayname": "Instagram DM", "avatar_url": self.config["appservice.bot_avatar"], }, "channel": { "id": self.thread_id, "displayname": self.name, } } async def update_bridge_info(self) -> None: if not self.mxid: self.log.debug("Not updating bridge info: no Matrix room created") return try: self.log.debug("Updating bridge info...") await self.main_intent.send_state_event(self.mxid, StateBridge, self.bridge_info, self.bridge_info_state_key) # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec await self.main_intent.send_state_event(self.mxid, StateHalfShotBridge, self.bridge_info, self.bridge_info_state_key) except Exception: self.log.warning("Failed to update bridge info", exc_info=True) # endregion # region Creating Matrix rooms async def create_matrix_room(self, source: 'u.User', info: Thread) -> Optional[RoomID]: if self.mxid: try: await self.update_matrix_room(source, info) except Exception: self.log.exception("Failed to update portal") return self.mxid async with self._create_room_lock: return await self._create_matrix_room(source, info) async def update_matrix_room(self, source: 'u.User', info: Thread, backfill: bool = False ) -> None: await self.main_intent.invite_user(self.mxid, source.mxid, check_cache=True) puppet = await p.Puppet.get_by_custom_mxid(source.mxid) if puppet: did_join = await puppet.intent.ensure_joined(self.mxid) if did_join and self.is_direct: await source.update_direct_chats({self.main_intent.mxid: [self.mxid]}) await self.update_info(info, source) if backfill: last_msg = await DBMessage.get_by_item_id(info.last_permanent_item.item_id, receiver=self.receiver) if last_msg is None: self.log.debug(f"Last permanent item ({info.last_permanent_item.item_id})" " not found in database, starting backfilling") await self.backfill(source, is_initial=False) # TODO # up = DBUserPortal.get(source.fbid, self.fbid, self.fb_receiver) # if not up: # in_community = await source._community_helper.add_room(source._community_id, self.mxid) # DBUserPortal(user=source.fbid, portal=self.fbid, portal_receiver=self.fb_receiver, # in_community=in_community).insert() # elif not up.in_community: # in_community = await source._community_helper.add_room(source._community_id, self.mxid) # up.edit(in_community=in_community) async def _create_matrix_room(self, source: 'u.User', info: Thread) -> Optional[RoomID]: if self.mxid: await self.update_matrix_room(source, info) return self.mxid await self.update_info(info, source) self.log.debug("Creating Matrix room") name: Optional[str] = None initial_state = [{ "type": str(StateBridge), "state_key": self.bridge_info_state_key, "content": self.bridge_info, }, { # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec "type": str(StateHalfShotBridge), "state_key": self.bridge_info_state_key, "content": self.bridge_info, }] invites = [source.mxid] if self.config["bridge.encryption.default"] and self.matrix.e2ee: self.encrypted = True initial_state.append({ "type": "m.room.encryption", "content": {"algorithm": "m.megolm.v1.aes-sha2"}, }) if self.is_direct: invites.append(self.az.bot_mxid) if self.encrypted or self.private_chat_portal_meta or not self.is_direct: name = self.name if self.config["appservice.community_id"]: initial_state.append({ "type": "m.room.related_groups", "content": {"groups": [self.config["appservice.community_id"]]}, }) # We lock backfill lock here so any messages that come between the room being created # and the initial backfill finishing wouldn't be bridged before the backfill messages. with self.backfill_lock: self.mxid = await self.main_intent.create_room(name=name, is_direct=self.is_direct, initial_state=initial_state, invitees=invites) if not self.mxid: raise Exception("Failed to create room: no mxid returned") if self.encrypted and self.matrix.e2ee and self.is_direct: try: await self.az.intent.ensure_joined(self.mxid) except Exception: self.log.warning("Failed to add bridge bot " f"to new private chat {self.mxid}") await self.update() self.log.debug(f"Matrix room created: {self.mxid}") self.by_mxid[self.mxid] = self await self._update_participants(info.users, source) puppet = await p.Puppet.get_by_custom_mxid(source.mxid) if puppet: try: await puppet.intent.join_room_by_id(self.mxid) if self.is_direct: await source.update_direct_chats({self.main_intent.mxid: [self.mxid]}) except MatrixError: self.log.debug("Failed to join custom puppet into newly created portal", exc_info=True) # TODO # in_community = await source._community_helper.add_room(source._community_id, self.mxid) # DBUserPortal(user=source.fbid, portal=self.fbid, portal_receiver=self.fb_receiver, # in_community=in_community).upsert() try: await self.backfill(source, is_initial=True) except Exception: self.log.exception("Failed to backfill new portal") return self.mxid # endregion # region Database getters async def postinit(self) -> None: self.by_thread_id[(self.thread_id, self.receiver)] = self if self.mxid: self.by_mxid[self.mxid] = self self._main_intent = ((await p.Puppet.get_by_pk(self.other_user_pk)).default_mxid_intent if self.other_user_pk else self.az.intent) async def delete(self) -> None: await DBMessage.delete_all(self.mxid) self.by_mxid.pop(self.mxid, None) self.mxid = None self.encrypted = False await self.update() async def save(self) -> None: await self.update() @classmethod def all_with_room(cls) -> AsyncGenerator['Portal', None]: return cls._db_to_portals(super().all_with_room()) @classmethod def find_private_chats_with(cls, other_user: int) -> AsyncGenerator['Portal', None]: return cls._db_to_portals(super().find_private_chats_with(other_user)) @classmethod async def _db_to_portals(cls, query: Awaitable[List['Portal']] ) -> AsyncGenerator['Portal', None]: portals = await query for index, portal in enumerate(portals): try: yield cls.by_thread_id[(portal.thread_id, portal.receiver)] except KeyError: await portal.postinit() yield portal @classmethod @async_getter_lock async def get_by_mxid(cls, mxid: RoomID) -> Optional['Portal']: try: return cls.by_mxid[mxid] except KeyError: pass portal = cast(cls, await super().get_by_mxid(mxid)) if portal is not None: await portal.postinit() return portal return None @classmethod @async_getter_lock async def get_by_thread_id(cls, thread_id: str, *, receiver: int, is_group: Optional[bool] = None, other_user_pk: Optional[int] = None) -> Optional['Portal']: if is_group and receiver != 0: receiver = 0 try: return cls.by_thread_id[(thread_id, receiver)] except KeyError: pass if is_group is None and receiver != 0: try: return cls.by_thread_id[(thread_id, 0)] except KeyError: pass portal = cast(cls, await super().get_by_thread_id(thread_id, receiver=receiver, rec_must_match=is_group is not None)) if portal is not None: await portal.postinit() return portal if is_group is not None: portal = cls(thread_id, receiver, other_user_pk=other_user_pk) await portal.insert() await portal.postinit() return portal return None @classmethod async def get_by_thread(cls, thread: Thread, receiver: int) -> Optional['Portal']: if thread.is_group: receiver = 0 other_user_pk = None else: if len(thread.users) == 0: other_user_pk = receiver else: other_user_pk = thread.users[0].pk return await cls.get_by_thread_id(thread.thread_id, receiver=receiver, is_group=thread.is_group, other_user_pk=other_user_pk) # endregion