# mautrix-signal - A Matrix-Signal puppeting bridge # Copyright (C) 2022 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from __future__ import annotations from typing import Awaitable import asyncio import base64 import json from mausignald.errors import UnknownIdentityKey, UnregisteredUserError from mausignald.types import Address, GroupID, TrustLevel from mautrix.appservice import IntentAPI from mautrix.bridge.commands import SECTION_ADMIN, HelpSection, command_handler from mautrix.types import ( ContentURI, EventID, EventType, JoinRule, PowerLevelStateEventContent, RoomID, ) from .. import portal as po, puppet as pu from ..util import normalize_number, user_has_power_level from .auth import make_qr from .typehint import CommandEvent try: import PIL as _ import qrcode except ImportError: qrcode = None SECTION_SIGNAL = HelpSection("Signal actions", 20, "") async def _get_puppet_from_cmd(evt: CommandEvent) -> pu.Puppet | None: try: phone = normalize_number("".join(evt.args)) except Exception: await evt.reply( f"**Usage:** `$cmdprefix+sp {evt.command} ` " "(enter phone number in international format)" ) return None puppet: pu.Puppet = await pu.Puppet.get_by_number(phone) if not puppet: if not evt.sender.username: await evt.reply("UUID of user not known") return None try: uuid = await evt.bridge.signal.find_uuid(evt.sender.username, puppet.number) except UnregisteredUserError: await evt.reply("User not registered") return None if uuid: puppet = await pu.Puppet.get_by_uuid(uuid) else: await evt.reply("UUID of user not found") return None return puppet def _format_safety_number(number: str) -> str: line_size = 20 chunk_size = 5 return "\n".join( " ".join( [ number[chunk : chunk + chunk_size] for chunk in range(line, line + line_size, chunk_size) ] ) for line in range(0, len(number), line_size) ) def _pill(puppet: "pu.Puppet") -> str: return f"[{puppet.name}](https://matrix.to/#/{puppet.mxid})" @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Open a private chat portal with a specific phone number", help_args="<_phone_>", ) async def pm(evt: CommandEvent) -> None: puppet = await _get_puppet_from_cmd(evt) if not puppet: return portal = await po.Portal.get_by_chat_id( puppet.address, receiver=evt.sender.username, create=True ) if portal.mxid: await evt.reply( f"You already have a private chat with {puppet.name}: " f"[{portal.mxid}](https://matrix.to/#/{portal.mxid})" ) await portal.main_intent.invite_user(portal.mxid, evt.sender.mxid) return await portal.create_matrix_room(evt.sender, puppet.address) await evt.reply(f"Created a portal room with {_pill(puppet)} and invited you to it") @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Join a Signal group with an invite link", help_args="<_link_>", ) async def join(evt: CommandEvent) -> EventID: if len(evt.args) == 0: return await evt.reply("**Usage:** `$cmdprefix+sp join `") try: resp = await evt.bridge.signal.join_group(evt.sender.username, evt.args[0]) if resp.pending_admin_approval: return await evt.reply( f"Successfully requested to join {resp.title}, waiting for admin approval." ) else: return await evt.reply(f"Successfully joined {resp.title}") except Exception: evt.log.exception("Error trying to join group") await evt.reply("Failed to join group (see logs for more details)") @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Get the invite link to the current group", ) async def invite_link(evt: CommandEvent) -> EventID: if not evt.is_portal: return await evt.reply("This is not a portal room.") group = await evt.bridge.signal.get_group( evt.sender.username, evt.portal.chat_id, evt.portal.revision ) if not group: await evt.reply("Failed to get group info") elif not group.invite_link: await evt.reply("Invite link not available") else: await evt.reply(group.invite_link) @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="View the safety number of a specific user", help_args="[--qr] [_phone_]", ) async def safety_number(evt: CommandEvent) -> None: show_qr = evt.args and evt.args[0].lower() == "--qr" if show_qr: if not qrcode: await evt.reply("Can't generate QR code: qrcode and/or PIL not installed") return evt.args = evt.args[1:] if len(evt.args) == 0 and evt.portal and evt.portal.is_direct: puppet = await pu.Puppet.get_by_uuid(evt.portal.chat_id.uuid) else: puppet = await _get_puppet_from_cmd(evt) if not puppet: return resp = await evt.bridge.signal.get_identities(evt.sender.username, puppet.address) if not resp.identities: await evt.reply(f"No identities found for {_pill(puppet)}") return most_recent = resp.identities[0] for identity in resp.identities: if identity.added > most_recent.added: most_recent = identity uuid = resp.address.uuid or "unknown" await evt.reply( f"### {puppet.name}\n\n" f"**UUID:** {uuid} \n" f"**Trust level:** {most_recent.trust_level} \n" f"**Safety number:**\n" f"```\n{_format_safety_number(most_recent.safety_number)}\n```" ) if show_qr and most_recent.qr_code_data: data = base64.b64decode(most_recent.qr_code_data) content = await make_qr(evt.main_intent, data, "verification-qr.png") await evt.main_intent.send_message(evt.room_id, content) @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Set your Signal profile name", help_args="<_name_>", ) async def set_profile_name(evt: CommandEvent) -> None: await evt.bridge.signal.set_profile(evt.sender.username, name=" ".join(evt.args)) await evt.reply("Successfully updated profile name") _trust_levels = [x.value for x in TrustLevel] @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Mark another user's safety number as trusted", help_args="<_recipient phone_> [_level_] <_safety number_>", ) async def mark_trusted(evt: CommandEvent) -> EventID: if len(evt.args) < 2: return await evt.reply( "**Usage:** `$cmdprefix+sp mark-trusted [level] `" ) number = normalize_number(evt.args[0]) remaining_args = evt.args[1:] trust_level = TrustLevel.TRUSTED_VERIFIED if len(evt.args) > 2 and evt.args[1].upper() in _trust_levels: trust_level = TrustLevel(evt.args[1]) remaining_args = evt.args[2:] safety_num = "".join(remaining_args).replace("\n", "") if len(safety_num) != 60 or not safety_num.isdecimal(): return await evt.reply("That doesn't look like a valid safety number") try: await evt.bridge.signal.trust( evt.sender.username, Address(number=number), safety_number=safety_num, trust_level=trust_level, ) except UnknownIdentityKey as e: return await evt.reply(f"Failed to mark {number} as {trust_level.human_str}: {e}") return await evt.reply(f"Successfully marked {number} as {trust_level.human_str}") @command_handler( needs_admin=False, needs_auth=True, help_section=SECTION_SIGNAL, help_text="Sync data from Signal", ) async def sync(evt: CommandEvent) -> None: await evt.sender.sync() await evt.reply("Sync complete") @command_handler( needs_admin=True, needs_auth=False, help_section=SECTION_ADMIN, help_text="Send raw requests to signald", help_args="[--user] <_json_>", ) async def raw(evt: CommandEvent) -> None: add_username = False while True: flag = evt.args[0].lower() if flag == "--user": add_username = True else: break evt.args = evt.args[1:] type = evt.args[0] version = "v0" if "." in type: version, type = type.split(".", 1) try: args = json.loads(" ".join(evt.args[1:])) except json.JSONDecodeError as e: await evt.reply(f"JSON decode error: {e}") return if add_username: if version == "v0" or (version == "v1" and type in ("send", "react")): args["username"] = evt.sender.username else: args["account"] = evt.sender.username if version: args["version"] = version try: resp_type, resp_data = await evt.bridge.signal._raw_request(type, **args) except Exception as e: await evt.reply(f"Error sending request: {e}") else: if resp_data is None: await evt.reply(f"Got reply `{resp_type}` with no content") else: await evt.reply( f"Got reply `{resp_type}`:\n\n```json\n{json.dumps(resp_data, indent=2)}\n```" ) missing_power_warning = ( "Warning: The bridge bot ([{bot_mxid}](https://matrix.to/#/{bot_mxid})) does not have " "sufficient privileges to change power levels on Matrix. Power level changes will not be " "bridged." ) low_power_warning = ( "Warning: The bridge bot ([{bot_mxid}](https://matrix.to/#/{bot_mxid})) has a power level " "below or equal to 50. Bridged moderator rights are currently hardcoded to PL 50, so the " "bridge bot must have a higher level to properly bridge them." ) meta_power_warning = ( "Warning: Permissions for changing name, topic and avatar cannot be set separately on Signal. " "Changes to those may not be bridged properly, unless the permissions are set to the same " "level or lower than state_default." ) @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Create a Signal group for the current Matrix room.", ) async def create(evt: CommandEvent) -> EventID: if evt.portal: return await evt.reply("This is already a portal room.") title, about, levels, encrypted, avatar_url, join_rule = await get_initial_state( evt.az.intent, evt.room_id ) portal = po.Portal( chat_id=GroupID(""), mxid=evt.room_id, name=title, topic=about or "", encrypted=encrypted, receiver="", avatar_url=avatar_url, ) await warn_missing_power(levels, evt) await portal.create_signal_group(evt.sender, levels, join_rule) await evt.reply(f"Signal chat created. ID: {portal.chat_id}") @command_handler( name="id", needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Get the ID of the Signal chat where this room is bridged.", ) async def get_id(evt: CommandEvent) -> EventID: if evt.portal: return await evt.reply(f"This room is bridged to Signal chat ID `{evt.portal.chat_id}`.") await evt.reply("This is not a portal room.") @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Bridge the current Matrix room to the Signal chat with the given ID.", help_args=" [matrix room ID]", ) async def bridge(evt: CommandEvent) -> EventID: if len(evt.args) == 0: return await evt.reply( "**Usage:** `$cmdprefix+sp bridge [matrix room ID]`" ) room_id = RoomID(evt.args[1]) if len(evt.args) > 1 else evt.room_id that_this = "This" if room_id == evt.room_id else "That" portal = await po.Portal.get_by_mxid(room_id) if portal: return await evt.reply(f"{that_this} room is already a portal room.") if not await user_has_power_level(room_id, evt.az.intent, evt.sender, "bridge"): return await evt.reply(f"You do not have the permissions to bridge {that_this} room.") portal = await po.Portal.get_by_chat_id(GroupID(evt.args[0]), create=True) if portal.mxid: has_portal_message = ( "That Signal chat already has a portal at " f"[{portal.mxid}](https://matrix.to/#/{portal.mxid}). " ) if not await user_has_power_level(portal.mxid, evt.az.intent, evt.sender, "unbridge"): return await evt.reply( f"{has_portal_message}" "Additionally, you do not have the permissions to unbridge that room." ) evt.sender.command_status = { "next": confirm_bridge, "action": "Room bridging", "mxid": portal.mxid, "bridge_to_mxid": room_id, "chat_id": portal.chat_id, } return await evt.reply( f"{has_portal_message}" "However, you have the permissions to unbridge that room.\n\n" "To delete that portal completely and continue bridging, use " "`$cmdprefix+sp delete-and-continue`. To unbridge the portal " "without kicking Matrix users, use `$cmdprefix+sp unbridge-and-" "continue`. To cancel, use `$cmdprefix+sp cancel`" ) evt.sender.command_status = { "next": confirm_bridge, "action": "Room bridging", "bridge_to_mxid": room_id, "chat_id": portal.chat_id, } return await evt.reply( "That Signal chat has no existing portal. To confirm bridging the " "chat to this room, use `$cmdprefix+sp continue`" ) async def cleanup_old_portal_while_bridging( evt: CommandEvent, portal: po.Portal ) -> tuple[bool, Awaitable[None] | None]: if not portal.mxid: await evt.reply( "The portal seems to have lost its Matrix room between you" "calling `$cmdprefix+sp bridge` and this command.\n\n" "Continuing without touching previous Matrix room..." ) return True, None elif evt.args[0] == "delete-and-continue": return True, portal.cleanup_portal("Portal deleted (moving to another room)") elif evt.args[0] == "unbridge-and-continue": return True, portal.cleanup_portal( "Room unbridged (portal moving to another room)", puppets_only=True ) else: await evt.reply( "The chat you were trying to bridge already has a Matrix portal room.\n\n" "Please use `$cmdprefix+sp delete-and-continue` or `$cmdprefix+sp unbridge-and-" "continue` to either delete or unbridge the existing room (respectively) and " "continue with the bridging.\n\n" "If you changed your mind, use `$cmdprefix+sp cancel` to cancel." ) return False, None async def confirm_bridge(evt: CommandEvent) -> EventID | None: status = evt.sender.command_status try: portal = await po.Portal.get_by_chat_id(status["chat_id"]) bridge_to_mxid = status["bridge_to_mxid"] except KeyError: evt.sender.command_status = None return await evt.reply( "Fatal error: chat_id missing from command_status. " "This shouldn't happen unless you're messing with the command handler code." ) is_logged_in = await evt.sender.is_logged_in() if "mxid" in status: ok, coro = await cleanup_old_portal_while_bridging(evt, portal) if not ok: return None elif coro: await evt.reply("Cleaning up previous portal room...") await coro elif portal.mxid: evt.sender.command_status = None return await evt.reply( "The portal seems to have created a Matrix room between you " "calling `$cmdprefix+sp bridge` and this command.\n\n" "Please start over by calling the bridge command again." ) elif evt.args[0] != "continue": return await evt.reply( "Please use `$cmdprefix+sp continue` to confirm the bridging or " "`$cmdprefix+sp cancel` to cancel." ) evt.sender.command_status = None async with portal._create_room_lock: await _locked_confirm_bridge( evt, portal=portal, room_id=bridge_to_mxid, is_logged_in=is_logged_in ) async def _locked_confirm_bridge( evt: CommandEvent, portal: po.Portal, room_id: RoomID, is_logged_in: bool ) -> EventID | None: try: group = await evt.bridge.signal.get_group( evt.sender.username, portal.chat_id, portal.revision ) except Exception: evt.log.exception("Failed to get_group(%s) for manual bridging.", portal.chat_id) if is_logged_in: return await evt.reply( "Failed to get info of signal chat. You are logged in, are you in that chat?" ) else: return await evt.reply( "Failed to get info of signal chat. " "You're not logged in, this should not happen." ) portal.mxid = room_id portal.by_mxid[portal.mxid] = portal ( portal.title, portal.about, levels, portal.encrypted, portal.photo_id, ) = await get_initial_state(evt.az.intent, evt.room_id) await portal.save() await portal.update_bridge_info() asyncio.create_task(portal.update_matrix_room(evt.sender, group)) await warn_missing_power(levels, evt) return await evt.reply("Bridging complete. Portal synchronization should begin momentarily.") async def get_initial_state( intent: IntentAPI, room_id: RoomID ) -> tuple[ str | None, str | None, PowerLevelStateEventContent | None, bool, ContentURI | None, JoinRule | None, ]: state = await intent.get_state(room_id) title: str | None = None about: str | None = None levels: PowerLevelStateEventContent | None = None encrypted: bool = False avatar_url: ContentURI | None = None join_rule: JoinRule | None = None for event in state: try: if event.type == EventType.ROOM_NAME: title = event.content.name elif event.type == EventType.ROOM_TOPIC: about = event.content.topic elif event.type == EventType.ROOM_POWER_LEVELS: levels = event.content elif event.type == EventType.ROOM_CANONICAL_ALIAS: title = title or event.content.canonical_alias elif event.type == EventType.ROOM_ENCRYPTION: encrypted = True elif event.type == EventType.ROOM_AVATAR: avatar_url = event.content.url elif event.type == EventType.ROOM_JOIN_RULES: join_rule = event.content.join_rule except KeyError: # Some state event probably has empty content pass return title, about, levels, encrypted, avatar_url, join_rule async def warn_missing_power(levels: PowerLevelStateEventContent, evt: CommandEvent) -> None: bot_pl = levels.get_user_level(evt.az.bot_mxid) if bot_pl < levels.get_event_level(EventType.ROOM_POWER_LEVELS): await evt.reply(missing_power_warning.format(bot_mxid=evt.az.bot_mxid)) elif bot_pl <= 50: await evt.reply(low_power_warning.format(bot_mxid=evt.az.bot_mxid)) if levels.state_default < 50 and ( levels.events[EventType.ROOM_NAME] >= 50 or levels.events[EventType.ROOM_AVATAR] >= 50 or levels.events[EventType.ROOM_TOPIC] >= 50 ): await evt.reply(meta_power_warning)