{share_item.caption.user.username}" f" {share_item.caption.text}" f'instagram.com/p/{share_item.code}' ) else: caption_body = external_url caption_formatted_body = ( f'instagram.com/p/{share_item.code}' ) caption = TextMessageEventContent( msgtype=MessageType.TEXT, body=caption_body, formatted_body=caption_formatted_body, format=Format.HTML, external_url=external_url, ) if self.bridge.config["bridge.caption_in_message"]: if isinstance(content, TextMessageEventContent): content.ensure_has_html() prefix.ensure_has_html() caption.ensure_has_html() combined = TextMessageEventContent( msgtype=MessageType.TEXT, body="\n".join((content.body, prefix.body, caption.body)), formatted_body=( f"
{content.formatted_body}
" f"{prefix.formatted_body}
" f"{caption.formatted_body}
" ), format=Format.HTML, external_url=external_url, ) else: prefix.ensure_has_html() caption.ensure_has_html() combined_body = "\n".join((prefix.body, caption.body)) combined_formatted_body = ( f"{prefix.formatted_body}
{caption.formatted_body}
" ) combined = content combined["filename"] = content.body combined.body = combined_body combined["format"] = str(Format.HTML) combined["org.matrix.msc1767.caption"] = { "org.matrix.msc1767.text": combined_body, "org.matrix.msc1767.html": combined_formatted_body, } combined["formatted_body"] = combined_formatted_body event_id = await self._send_message(intent, combined, timestamp=item.timestamp_ms) else: await self._send_message(intent, prefix, timestamp=item.timestamp_ms) event_id = await self._send_message(intent, content, timestamp=item.timestamp_ms) await self._send_message(intent, caption, timestamp=item.timestamp_ms) return event_id async def _handle_instagram_reel_share( self, source: u.User, intent: IntentAPI, item: ThreadItem ) -> EventID | None: assert item.reel_share media = item.reel_share.media prefix_html = None if item.reel_share.type == ReelShareType.REPLY: if item.reel_share.reel_owner_id == source.igpk: prefix = "Replied to your story" else: username = media.user.username prefix = f"Sent @{username}'s story" user_link = f'@{username}' prefix_html = f"Sent {user_link}'s story" elif item.reel_share.type == ReelShareType.REACTION: if item.reel_share.reel_owner_id == source.igpk: prefix = "Reacted to your story" elif item.user_id == source.igpk: prefix = "You reacted to their story" else: prefix = "Reacted to a story" elif item.reel_share.type == ReelShareType.MENTION: if item.reel_share.mentioned_user_id == source.igpk: prefix = "Mentioned you in their story" else: prefix = "You mentioned them in your story" else: self.log.debug(f"Unsupported reel share type {item.reel_share.type}") return None prefix_content = TextMessageEventContent(msgtype=MessageType.NOTICE, body=prefix) if prefix_html: prefix_content.format = Format.HTML prefix_content.formatted_body = prefix_html caption_content = TextMessageEventContent( msgtype=MessageType.TEXT, body=item.reel_share.text ) if not caption_content.body and isinstance(media, MediaShareItem): caption_content.body = media.caption.text if media.caption else "" if not caption_content.body: caption_content.body = "{media_content.formatted_body}
" f"{prefix_content.formatted_body}
" f"{caption_content.formatted_body}
", ), format=Format.HTML, ) else: prefix_content.ensure_has_html() caption_content.ensure_has_html() combined_body = "\n".join((prefix_content.body, caption_content.body)) combined_formatted_body = ( f"{prefix_content.formatted_body}
" f"{caption_content.formatted_body}
" ) combined = media_content combined["filename"] = combined.body combined.body = combined_body combined["format"] = str(Format.HTML) combined["org.matrix.msc1767.caption"] = { "org.matrix.msc1767.text": combined_body, "org.matrix.msc1767.html": combined_formatted_body, } combined["formatted_body"] = combined_formatted_body else: combined = caption_content event_id = await self._send_message(intent, combined, timestamp=item.timestamp_ms) else: await self._send_message(intent, prefix_content, timestamp=item.timestamp_ms) if media_content: media_event_id = await self._send_message( intent, media_content, timestamp=item.timestamp_ms ) await DBMessage( mxid=media_event_id, mx_room=self.mxid, item_id=fake_item_id, client_context=None, receiver=self.receiver, sender=media.user.pk, ig_timestamp=None, ).insert() event_id = await self._send_message( intent, caption_content, timestamp=item.timestamp_ms ) return event_id async def _handle_instagram_link( self, source: u.User, intent: IntentAPI, item: ThreadItem, ) -> EventID: content = TextMessageEventContent(msgtype=MessageType.TEXT, body=item.link.text) link = item.link.link_context preview = { "og:url": link.link_url, "og:title": link.link_title, "og:description": link.link_summary, } if link.link_image_url: reuploaded = await self._reupload_instagram_file( source, link.link_image_url, msgtype=None, info=ImageInfo(), intent=intent ) preview["og:image"] = reuploaded.url preview["og:image:type"] = reuploaded.info.mimetype preview["og:image:width"] = reuploaded.info.width preview["og:image:height"] = reuploaded.info.height preview["matrix:image:size"] = reuploaded.info.size if reuploaded.file: preview["beeper:image:encryption"] = reuploaded.file.serialize() preview = {k: v for k, v in preview.items() if v} content["com.beeper.linkpreviews"] = [preview] if "og:title" in preview else [] await self._add_instagram_reply(content, item.replied_to_message) return await self._send_message(intent, content, timestamp=item.timestamp_ms) async def _handle_instagram_text( self, intent: IntentAPI, item: ThreadItem, text: str ) -> EventID: content = TextMessageEventContent(msgtype=MessageType.TEXT, body=text) content["com.beeper.linkpreviews"] = [] await self._add_instagram_reply(content, item.replied_to_message) return await self._send_message(intent, content, timestamp=item.timestamp_ms) async def _send_instagram_unhandled(self, intent: IntentAPI, item: ThreadItem) -> EventID: content = TextMessageEventContent( msgtype=MessageType.NOTICE, body=f"Unsupported message type {item.item_type.value}" ) await self._add_instagram_reply(content, item.replied_to_message) return await self._send_message(intent, content, timestamp=item.timestamp_ms) async def _handle_instagram_location( self, intent: IntentAPI, item: ThreadItem ) -> EventID | None: loc = item.location if not loc.lng or not loc.lat: # TODO handle somehow return None long_char = "E" if loc.lng > 0 else "W" lat_char = "N" if loc.lat > 0 else "S" body = ( f"{loc.name} - {round(abs(loc.lat), 4)}° {lat_char}, " f"{round(abs(loc.lng), 4)}° {long_char}" ) url = f"https://www.openstreetmap.org/#map=15/{loc.lat}/{loc.lng}" external_url = None if loc.external_source == "facebook_places": external_url = f"https://www.facebook.com/{loc.short_name}-{loc.facebook_places_id}" content = LocationMessageEventContent( msgtype=MessageType.LOCATION, geo_uri=f"geo:{loc.lat},{loc.lng}", body=f"Location: {body}\n{url}", external_url=external_url, ) content["format"] = str(Format.HTML) content["formatted_body"] = f"Location: {body}" await self._add_instagram_reply(content, item.replied_to_message) return await self._send_message(intent, content, timestamp=item.timestamp_ms) async def _handle_instagram_profile( self, intent: IntentAPI, item: ThreadItem ) -> EventID | None: username = item.profile.username user_link = f'@{username}' text = f"Shared @{username}'s profile" html = f"Shared {user_link}'s profile" content = TextMessageEventContent( msgtype=MessageType.TEXT, format=Format.HTML, body=text, formatted_body=html ) await self._add_instagram_reply(content, item.replied_to_message) return await self._send_message(intent, content, timestamp=item.timestamp_ms) async def handle_instagram_item( self, source: u.User, sender: p.Puppet, item: ThreadItem, is_backfill: bool = False ) -> None: try: await self._handle_instagram_item(source, sender, item, is_backfill) except Exception: self.log.exception("Fatal error handling Instagram item") self.log.trace("Item content: %s", item.serialize()) async def _add_instagram_reply( self, content: MessageEventContent, reply_to: ThreadItem | None ) -> None: if not reply_to: return message = await DBMessage.get_by_item_id(reply_to.item_id, self.receiver) if not message: return content.set_reply(message.mxid) if not isinstance(content, TextMessageEventContent): return try: evt = await self.main_intent.get_event(message.mx_room, message.mxid) except (MNotFound, MForbidden): evt = None if not evt: return if evt.type == EventType.ROOM_ENCRYPTED: try: evt = await self.matrix.e2ee.decrypt(evt, wait_session_timeout=0) except SessionNotFound: return if isinstance(evt.content, TextMessageEventContent): evt.content.trim_reply_fallback() content.set_reply(evt) async def _handle_instagram_item( self, source: u.User, sender: p.Puppet, item: ThreadItem, is_backfill: bool = False ) -> None: if not isinstance(item, ThreadItem): # Parsing these items failed, they should have been logged already return client_context = item.client_context link_client_context = item.link.client_context if item.link else None cc = client_context if link_client_context: if not client_context: cc = f"link:{link_client_context}" elif client_context != link_client_context: cc = f"{client_context}/link:{link_client_context}" if client_context and client_context in self._reqid_dedup: self.log.debug( f"Ignoring message {item.item_id} ({cc}) by {item.user_id}" " as it was sent by us (client_context in dedup queue)" ) return elif link_client_context and link_client_context in self._reqid_dedup: self.log.debug( f"Ignoring message {item.item_id} ({cc}) by {item.user_id}" " as it was sent by us (link.client_context in dedup queue)" ) return if item.item_id in self._msgid_dedup: self.log.debug( f"Ignoring message {item.item_id} ({cc}) by {item.user_id}" " as it was already handled (message.id in dedup queue)" ) return self._msgid_dedup.appendleft(item.item_id) if await DBMessage.get_by_item_id(item.item_id, self.receiver) is not None: self.log.debug( f"Ignoring message {item.item_id} ({cc}) by {item.user_id}" " as it was already handled (message.id in database)" ) return self.log.debug(f"Starting handling of message {item.item_id} ({cc}) by {item.user_id}") asyncio.create_task(sender.intent_for(self).set_typing(self.mxid, is_typing=False)) await self._handle_deduplicated_instagram_item(source, sender, item, is_backfill) async def _handle_deduplicated_instagram_item( self, source: u.User, sender: p.Puppet, item: ThreadItem, is_backfill: bool = False ) -> None: if self.backfill_lock.locked and sender.need_backfill_invite(self): self.log.debug("Adding %s's default puppet to room for backfilling", sender.mxid) if self.is_direct: await self.main_intent.invite_user(self.mxid, sender.default_mxid) intent = sender.default_mxid_intent await intent.ensure_joined(self.mxid) self._backfill_leave.add(intent) else: intent = sender.intent_for(self) event_id = None needs_handling = True if item.media or item.animated_media or item.voice_media or item.visual_media: content = await self._convert_instagram_media(source, intent, item) event_id = await self._send_message(intent, content, timestamp=item.timestamp_ms) elif item.location: event_id = await self._handle_instagram_location(intent, item) elif item.profile: event_id = await self._handle_instagram_profile(intent, item) elif item.reel_share: event_id = await self._handle_instagram_reel_share(source, intent, item) elif ( item.media_share or item.direct_media_share or item.story_share or item.clip or item.felix_share ): event_id = await self._handle_instagram_media_share(source, intent, item) elif item.action_log: # These probably don't need to be bridged needs_handling = False self.log.debug(f"Ignoring action log message {item.item_id}") # TODO handle item.clip? if item.text: event_id = await self._handle_instagram_text(intent, item, item.text) elif item.like: # We handle likes as text because Matrix clients do big emoji on their own. event_id = await self._handle_instagram_text(intent, item, item.like) elif item.link: event_id = await self._handle_instagram_link(source, intent, item) handled = bool(event_id) if not event_id and needs_handling: self.log.debug(f"Unhandled Instagram message {item.item_id}") event_id = await self._send_instagram_unhandled(intent, item) cc = item.client_context if not cc and item.link and item.link.client_context: cc = item.link.client_context msg = DBMessage( mxid=event_id, mx_room=self.mxid, item_id=item.item_id, client_context=cc, receiver=self.receiver, sender=sender.pk, ig_timestamp=item.timestamp, ) await msg.insert() await self._send_delivery_receipt(event_id) if handled: self.log.debug(f"Handled Instagram message {item.item_id} -> {event_id}") elif needs_handling: self.log.debug( f"Unhandled Instagram message {item.item_id} " f"(type {item.item_type} -> fallback error {event_id})" ) if is_backfill and item.reactions: await self._handle_instagram_reactions(msg, item.reactions.emojis, is_backfill=True) async def handle_instagram_remove(self, item_id: str) -> None: message = await DBMessage.get_by_item_id(item_id, self.receiver) if message is None: return await message.delete() if message.mxid: sender = await p.Puppet.get_by_pk(message.sender) try: await sender.intent_for(self).redact(self.mxid, message.mxid) except MForbidden: await self.main_intent.redact(self.mxid, message.mxid) self.log.debug(f"Redacted {message.mxid} after Instagram unsend") async def handle_instagram_reaction(self, item: ThreadItem, remove: bool) -> None: sender = await p.Puppet.get_by_pk(item.new_reaction.sender_id) message = await DBMessage.get_by_item_id(item.item_id, self.receiver) if not message: self.log.debug(f"Dropping reaction by {sender.pk} to unknown message {item.item_id}") return emoji = item.new_reaction.emoji async with self._reaction_lock: existing = await DBReaction.get_by_item_id(item.item_id, self.receiver, sender.pk) if not existing and remove: self.log.debug( f"Ignoring duplicate reaction removal by {sender.pk} to {item.item_id}" ) return elif not remove and existing and existing.reaction == emoji: self.log.debug(f"Ignoring duplicate reaction by {sender.pk} to {item.item_id}") return intent = sender.intent_for(self) if remove: await existing.delete() await intent.redact(self.mxid, existing.mxid) self.log.debug( f"Removed {sender.pk}'s reaction to {item.item_id} (redacted {existing.mxid})" ) else: timestamp = item.new_reaction.timestamp_ms reaction_event_id = await intent.react( self.mxid, message.mxid, key=emoji, timestamp=timestamp ) await self._upsert_reaction( existing, intent, reaction_event_id, message, sender, emoji, timestamp ) self.log.debug( f"Handled {sender.pk}'s reaction to {item.item_id} -> {reaction_event_id}" ) async def _handle_instagram_reactions( self, message: DBMessage, reactions: list[Reaction], is_backfill: bool = False ) -> None: old_reactions: dict[int, DBReaction] old_reactions = { reaction.ig_sender: reaction for reaction in await DBReaction.get_all_by_item_id(message.item_id, self.receiver) } timestamp_deduplicator = 1 for new_reaction in reactions: old_reaction = old_reactions.pop(new_reaction.sender_id, None) if old_reaction and old_reaction.reaction == new_reaction.emoji: continue puppet = await p.Puppet.get_by_pk(new_reaction.sender_id) intent = puppet.intent_for(self) timestamp = new_reaction.timestamp_ms if is_backfill else int(time.time() * 1000) if is_backfill: timestamp += timestamp_deduplicator timestamp_deduplicator += 1 reaction_event_id = await intent.react( self.mxid, message.mxid, new_reaction.emoji, timestamp=timestamp ) await self._upsert_reaction( old_reaction, intent, reaction_event_id, message, puppet, new_reaction.emoji, timestamp, ) for old_reaction in old_reactions.values(): await old_reaction.delete() puppet = await p.Puppet.get_by_pk(old_reaction.ig_sender) await puppet.intent_for(self).redact(self.mxid, old_reaction.mxid) async def handle_instagram_update(self, item: MessageSyncMessage) -> None: message = await DBMessage.get_by_item_id(item.item_id, self.receiver) if not message: return if item.has_seen: puppet = await p.Puppet.get_by_pk(item.has_seen, create=False) if puppet: await puppet.intent_for(self).mark_read(self.mxid, message.mxid) else: async with self._reaction_lock: await self._handle_instagram_reactions( message, (item.reactions.emojis if item.reactions else []) ) # endregion # region Updating portal info def _get_thread_name(self, thread: Thread) -> str: if self.is_direct: if self.other_user_pk == thread.viewer_id and len(thread.users) == 0: return "Instagram chat with yourself" elif len(thread.users) == 1: tpl = self.config["bridge.private_chat_name_template"] ui = thread.users[0] return tpl.format(displayname=ui.full_name, id=ui.pk, username=ui.username) pass elif thread.thread_title: return self.config["bridge.group_chat_name_template"].format(name=thread.thread_title) else: return "" async def update_info(self, thread: Thread, source: u.User) -> None: changed = await self._update_name(self._get_thread_name(thread)) changed = await self._update_participants(thread.users, source) or changed if changed: await self.update_bridge_info() await self.update() # TODO update power levels with thread.admin_user_ids async def update_info_from_puppet(self, puppet: p.Puppet | None = None) -> None: if not self.is_direct: return if not puppet: puppet = await self.get_dm_puppet() await self._update_photo_from_puppet(puppet) if self.name and not self.name_set: await self._update_name(self.name) async def _update_name(self, name: str) -> bool: if name and (self.name != name or not self.name_set): self.name = name if self.mxid: try: await self.main_intent.set_room_name(self.mxid, name) self.name_set = True except Exception: self.log.exception("Failed to update name") self.name_set = False return True return False async def _update_photo_from_puppet(self, puppet: p.Puppet) -> bool: if not self.private_chat_portal_meta and not self.encrypted: return False if self.avatar_set and self.avatar_url == puppet.photo_mxc: return False self.avatar_url = puppet.photo_mxc if self.mxid: try: await self.main_intent.set_room_avatar(self.mxid, puppet.photo_mxc) self.avatar_set = True except Exception: self.log.exception("Failed to set room avatar") self.avatar_set = False return True async def _update_participants(self, users: list[ThreadUser], source: u.User) -> bool: meta_changed = False # Make sure puppets who should be here are here for user in users: puppet = await p.Puppet.get_by_pk(user.pk) await puppet.update_info(user, source) if self.mxid: await puppet.intent_for(self).ensure_joined(self.mxid) if puppet.pk == self.other_user_pk: meta_changed = await self._update_photo_from_puppet(puppet) if self.mxid: # Kick puppets who shouldn't be here current_members = {int(user.pk) for user in users} for user_id in await self.main_intent.get_room_members(self.mxid): pk = p.Puppet.get_id_from_mxid(user_id) if pk and pk not in current_members and pk != self.other_user_pk: await self.main_intent.kick_user( self.mxid, p.Puppet.get_mxid_from_id(pk), reason="User had left this Instagram DM", ) return meta_changed async def _update_read_receipts(self, receipts: dict[int | str, ThreadUserLastSeenAt]) -> None: for user_id, receipt in receipts.items(): message: DBMessage | DBReaction message = await DBMessage.get_by_item_id(receipt.item_id, self.receiver) if not message: reaction: DBReaction message, reaction = await asyncio.gather( DBMessage.get_closest(self.mxid, int(receipt.timestamp)), DBReaction.get_closest(self.mxid, receipt.timestamp_ms), ) if (not message or not message.mxid) and not reaction: self.log.debug( "Couldn't find message %s to mark as read by %s", receipt, user_id ) continue elif not message or (reaction and reaction.mx_timestamp > message.ig_timestamp_ms): message = reaction puppet = await p.Puppet.get_by_pk(int(user_id), create=False) if not puppet: continue try: await puppet.intent_for(self).mark_read(message.mx_room, message.mxid) except Exception: self.log.warning( f"Failed to mark {message.mxid} in {message.mx_room} " f"as read by {puppet.intent.mxid}", exc_info=True, ) async def get_dm_puppet(self) -> p.Puppet | None: if not self.is_direct: return None return await p.Puppet.get_by_pk(self.other_user_pk) # endregion # region Backfilling async def backfill(self, source: u.User, thread: Thread, is_initial: bool = False) -> None: limit = ( self.config["bridge.backfill.initial_limit"] if is_initial else self.config["bridge.backfill.missed_limit"] ) if limit == 0: return elif limit < 0: limit = None with self.backfill_lock: await self._backfill(source, thread, is_initial, limit) async def _backfill( self, source: u.User, thread: Thread, is_initial: bool, limit: int ) -> None: self.log.debug("Backfilling history through %s", source.mxid) entries = await self._fetch_backfill_items(source, thread, is_initial, limit) if not entries: self.log.debug("Didn't get any items to backfill from server") return self.log.debug("Got %d entries from server", len(entries)) self._backfill_leave = set() async with NotificationDisabler(self.mxid, source): for entry in reversed(entries): sender = await p.Puppet.get_by_pk(int(entry.user_id)) await self.handle_instagram_item(source, sender, entry, is_backfill=True) for intent in self._backfill_leave: self.log.trace("Leaving room with %s post-backfill", intent.mxid) await intent.leave_room(self.mxid) self._backfill_leave = None self.log.info("Backfilled %d messages through %s", len(entries), source.mxid) async def _fetch_backfill_items( self, source: u.User, thread: Thread, is_initial: bool, limit: int ) -> list[ThreadItem]: items = [] excluded_count = 0 self.log.debug("Fetching up to %d messages through %s", limit, source.igpk) async for item in source.client.iter_thread(self.thread_id, start_at=thread): if len(items) - excluded_count >= limit: self.log.debug(f"Fetched {len(items)} messages (the limit)") break elif not is_initial: msg = await DBMessage.get_by_item_id(item.item_id, receiver=self.receiver) if msg is not None: self.log.debug( f"Fetched {len(items)} messages and hit a message" " that's already in the database." ) break elif not item.is_handleable: self.log.debug( f"Not counting {item.unhandleable_type} item {item.item_id}" " against backfill limit" ) excluded_count += 1 items.append(item) return items # endregion # region Bridge info state event @property def bridge_info_state_key(self) -> str: return f"net.maunium.instagram://instagram/{self.thread_id}" @property def bridge_info(self) -> dict[str, Any]: return { "bridgebot": self.az.bot_mxid, "creator": self.main_intent.mxid, "protocol": { "id": "instagram", "displayname": "Instagram DM", "avatar_url": self.config["appservice.bot_avatar"], }, "channel": { "id": self.thread_id, "displayname": self.name, "avatar_url": self.avatar_url, }, } async def update_bridge_info(self) -> None: if not self.mxid: self.log.debug("Not updating bridge info: no Matrix room created") return try: self.log.debug("Updating bridge info...") await self.main_intent.send_state_event( self.mxid, StateBridge, self.bridge_info, self.bridge_info_state_key ) # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec await self.main_intent.send_state_event( self.mxid, StateHalfShotBridge, self.bridge_info, self.bridge_info_state_key ) except Exception: self.log.warning("Failed to update bridge info", exc_info=True) # endregion # region Creating Matrix rooms async def create_matrix_room(self, source: u.User, info: Thread) -> RoomID | None: if self.mxid: try: await self.update_matrix_room(source, info) except Exception: self.log.exception("Failed to update portal") return self.mxid async with self._create_room_lock: return await self._create_matrix_room(source, info) def _get_invite_content(self, double_puppet: p.Puppet | None) -> dict[str, bool]: invite_content = {} if double_puppet: invite_content["fi.mau.will_auto_accept"] = True if self.is_direct: invite_content["is_direct"] = True return invite_content async def update_matrix_room( self, source: u.User, info: Thread, backfill: bool = False ) -> None: puppet = await p.Puppet.get_by_custom_mxid(source.mxid) await self.main_intent.invite_user( self.mxid, source.mxid, check_cache=True, extra_content=self._get_invite_content(puppet), ) if puppet: did_join = await puppet.intent.ensure_joined(self.mxid) if did_join and self.is_direct: await source.update_direct_chats({self.main_intent.mxid: [self.mxid]}) await self.update_info(info, source) if backfill: last_msg = await DBMessage.get_by_item_id( info.last_permanent_item.item_id, receiver=self.receiver ) if last_msg is None: self.log.debug( f"Last permanent item ({info.last_permanent_item.item_id})" " not found in database, starting backfilling" ) await self.backfill(source, thread=info, is_initial=False) await self._update_read_receipts(info.last_seen_at) async def _create_matrix_room(self, source: u.User, info: Thread) -> RoomID | None: if self.mxid: await self.update_matrix_room(source, info) return self.mxid await self.update_info(info, source) self.log.debug("Creating Matrix room") name: str | None = None initial_state = [ { "type": str(StateBridge), "state_key": self.bridge_info_state_key, "content": self.bridge_info, }, # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec { "type": str(StateHalfShotBridge), "state_key": self.bridge_info_state_key, "content": self.bridge_info, }, ] invites = [] if self.config["bridge.encryption.default"] and self.matrix.e2ee: self.encrypted = True initial_state.append( { "type": "m.room.encryption", "content": self.get_encryption_state_event_json(), } ) if self.is_direct: invites.append(self.az.bot_mxid) if self.encrypted or self.private_chat_portal_meta or not self.is_direct: name = self.name # We lock backfill lock here so any messages that come between the room being created # and the initial backfill finishing wouldn't be bridged before the backfill messages. with self.backfill_lock: creation_content = {} if not self.config["bridge.federate_rooms"]: creation_content["m.federate"] = False self.mxid = await self.main_intent.create_room( name=name, is_direct=self.is_direct, initial_state=initial_state, invitees=invites, creation_content=creation_content, ) if not self.mxid: raise Exception("Failed to create room: no mxid returned") if self.encrypted and self.matrix.e2ee and self.is_direct: try: await self.az.intent.ensure_joined(self.mxid) except Exception: self.log.warning(f"Failed to add bridge bot to new private chat {self.mxid}") await self.update() self.log.debug(f"Matrix room created: {self.mxid}") self.by_mxid[self.mxid] = self puppet = await p.Puppet.get_by_custom_mxid(source.mxid) await self.main_intent.invite_user( self.mxid, source.mxid, extra_content=self._get_invite_content(puppet) ) if puppet: try: if self.is_direct: await source.update_direct_chats({self.main_intent.mxid: [self.mxid]}) await puppet.intent.join_room_by_id(self.mxid) except MatrixError: self.log.debug( "Failed to join custom puppet into newly created portal", exc_info=True ) await self._update_participants(info.users, source) try: await self.backfill(source, thread=info, is_initial=True) except Exception: self.log.exception("Failed to backfill new portal") await self._update_read_receipts(info.last_seen_at) return self.mxid # endregion # region Database getters async def postinit(self) -> None: self.by_thread_id[(self.thread_id, self.receiver)] = self if self.mxid: self.by_mxid[self.mxid] = self self._main_intent = ( (await p.Puppet.get_by_pk(self.other_user_pk)).default_mxid_intent if self.other_user_pk else self.az.intent ) async def delete(self) -> None: await DBMessage.delete_all(self.mxid) self.by_mxid.pop(self.mxid, None) self.mxid = None self.encrypted = False await self.update() async def save(self) -> None: await self.update() @classmethod def all_with_room(cls) -> AsyncGenerator[Portal, None]: return cls._db_to_portals(super().all_with_room()) @classmethod def find_private_chats_with(cls, other_user: int) -> AsyncGenerator[Portal, None]: return cls._db_to_portals(super().find_private_chats_with(other_user)) @classmethod async def find_private_chat(cls, receiver: int, other_user: int) -> Portal | None: thread_id = await super().find_private_chat_id(receiver, other_user) if not thread_id: return None return await cls.get_by_thread_id(thread_id, receiver=receiver, is_group=False) @classmethod async def _db_to_portals(cls, query: Awaitable[list[Portal]]) -> AsyncGenerator[Portal, None]: portals = await query for index, portal in enumerate(portals): try: yield cls.by_thread_id[(portal.thread_id, portal.receiver)] except KeyError: await portal.postinit() yield portal @classmethod @async_getter_lock async def get_by_mxid(cls, mxid: RoomID) -> Portal | None: try: return cls.by_mxid[mxid] except KeyError: pass portal = cast(cls, await super().get_by_mxid(mxid)) if portal is not None: await portal.postinit() return portal return None @classmethod @async_getter_lock async def get_by_thread_id( cls, thread_id: str, *, receiver: int, is_group: bool | None = None, other_user_pk: int | None = None, ) -> Portal | None: if is_group and receiver != 0: receiver = 0 try: return cls.by_thread_id[(thread_id, receiver)] except KeyError: pass if is_group is None and receiver != 0: try: return cls.by_thread_id[(thread_id, 0)] except KeyError: pass portal = cast( cls, await super().get_by_thread_id( thread_id, receiver=receiver, rec_must_match=is_group is not None ), ) if portal is not None: await portal.postinit() return portal if is_group is not None: portal = cls(thread_id, receiver, other_user_pk=other_user_pk) await portal.insert() await portal.postinit() return portal return None @classmethod async def get_by_thread(cls, thread: Thread, receiver: int) -> Portal | None: if thread.is_group: receiver = 0 other_user_pk = None else: if len(thread.users) == 0: other_user_pk = receiver else: other_user_pk = thread.users[0].pk return await cls.get_by_thread_id( thread.thread_id, receiver=receiver, is_group=thread.is_group, other_user_pk=other_user_pk, ) # endregion