# mautrix-signal - A Matrix-Signal puppeting bridge # Copyright (C) 2022 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from __future__ import annotations from typing import Awaitable import base64 import json from mausignald.errors import UnknownIdentityKey, UnregisteredUserError from mausignald.types import Address, GroupID, TrustLevel from mautrix.bridge import RejectMatrixInvite from mautrix.bridge.commands import SECTION_ADMIN, HelpSection, command_handler from mautrix.types import ( ContentURI, EventID, EventType, JoinRule, PowerLevelStateEventContent, RoomID, ) from mautrix.util import background_task from .. import portal as po, puppet as pu from ..util import normalize_number, user_has_power_level from .auth import make_qr from .typehint import CommandEvent from .util import get_initial_state try: import PIL as _ import qrcode except ImportError: qrcode = None SECTION_SIGNAL = HelpSection("Signal actions", 20, "") async def _get_puppet_from_cmd(evt: CommandEvent) -> pu.Puppet | None: try: phone = normalize_number("".join(evt.args)) except Exception: await evt.reply( f"**Usage:** `$cmdprefix+sp {evt.command} ` " "(enter phone number in international format)" ) return None puppet: pu.Puppet = await pu.Puppet.get_by_number(phone) if not puppet: if not evt.sender.username: await evt.reply("UUID of user not known") return None try: uuid = await evt.bridge.signal.find_uuid(evt.sender.username, phone) except UnregisteredUserError: await evt.reply("User not registered") return None if uuid: puppet = await pu.Puppet.get_by_uuid(uuid) else: await evt.reply("UUID of user not found") return None return puppet def _format_safety_number(number: str) -> str: line_size = 20 chunk_size = 5 return "\n".join( " ".join( [ number[chunk : chunk + chunk_size] for chunk in range(line, line + line_size, chunk_size) ] ) for line in range(0, len(number), line_size) ) def _pill(puppet: "pu.Puppet") -> str: return f"[{puppet.name}](https://matrix.to/#/{puppet.mxid})" @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Open a private chat portal with a specific phone number", help_args="<_phone_>", ) async def pm(evt: CommandEvent) -> None: puppet = await _get_puppet_from_cmd(evt) if not puppet: return portal = await po.Portal.get_by_chat_id(puppet.uuid, receiver=evt.sender.username, create=True) if portal.mxid: await evt.reply( f"You already have a private chat with {puppet.name}: " f"[{portal.mxid}](https://matrix.to/#/{portal.mxid})" ) await portal.main_intent.invite_user(portal.mxid, evt.sender.mxid) return await portal.create_matrix_room(evt.sender, puppet.address) await evt.reply(f"Created a portal room with {_pill(puppet)} and invited you to it") @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Join a Signal group with an invite link", help_args="<_link_>", ) async def join(evt: CommandEvent) -> EventID: if len(evt.args) == 0: return await evt.reply("**Usage:** `$cmdprefix+sp join `") try: resp = await evt.bridge.signal.join_group(evt.sender.username, evt.args[0]) if resp.pending_admin_approval: return await evt.reply( f"Successfully requested to join {resp.title}, waiting for admin approval." ) else: return await evt.reply(f"Successfully joined {resp.title}") except Exception: evt.log.exception("Error trying to join group") await evt.reply("Failed to join group (see logs for more details)") @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Get the invite link to the current group", ) async def invite_link(evt: CommandEvent) -> EventID: if not evt.is_portal: return await evt.reply("This is not a portal room.") group = await evt.bridge.signal.get_group( evt.sender.username, evt.portal.chat_id, evt.portal.revision ) if not group: await evt.reply("Failed to get group info") elif not group.invite_link: await evt.reply("Invite link not available") else: await evt.reply(group.invite_link) @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="View the safety number of a specific user", help_args="[--qr] [_phone_]", ) async def safety_number(evt: CommandEvent) -> None: show_qr = evt.args and evt.args[0].lower() == "--qr" if show_qr: if not qrcode: await evt.reply("Can't generate QR code: qrcode and/or PIL not installed") return evt.args = evt.args[1:] if len(evt.args) == 0 and evt.portal and evt.portal.is_direct: puppet = await evt.portal.get_dm_puppet() else: puppet = await _get_puppet_from_cmd(evt) if not puppet: return resp = await evt.bridge.signal.get_identities(evt.sender.username, puppet.address) if not resp.identities: await evt.reply(f"No identities found for {_pill(puppet)}") return most_recent = resp.identities[0] for identity in resp.identities: if identity.added > most_recent.added: most_recent = identity uuid = resp.address.uuid or "unknown" await evt.reply( f"### {puppet.name}\n\n" f"**UUID:** {uuid} \n" f"**Trust level:** {most_recent.trust_level} \n" f"**Safety number:**\n" f"```\n{_format_safety_number(most_recent.safety_number)}\n```" ) if show_qr and most_recent.qr_code_data: data = base64.b64decode(most_recent.qr_code_data) content = await make_qr(evt.main_intent, data, "verification-qr.png") await evt.main_intent.send_message(evt.room_id, content) @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Set your Signal profile name", help_args="<_name_>", ) async def set_profile_name(evt: CommandEvent) -> None: await evt.bridge.signal.set_profile(evt.sender.username, name=" ".join(evt.args)) await evt.reply("Successfully updated profile name") _trust_levels = [x.value for x in TrustLevel] @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Mark another user's safety number as trusted", help_args="<_recipient phone_> [_level_] <_safety number_>", ) async def mark_trusted(evt: CommandEvent) -> EventID: if len(evt.args) < 2: return await evt.reply( "**Usage:** `$cmdprefix+sp mark-trusted [level] `" ) number = normalize_number(evt.args[0]) remaining_args = evt.args[1:] trust_level = TrustLevel.TRUSTED_VERIFIED if len(evt.args) > 2 and evt.args[1].upper() in _trust_levels: trust_level = TrustLevel(evt.args[1]) remaining_args = evt.args[2:] safety_num = "".join(remaining_args).replace("\n", "") if len(safety_num) != 60 or not safety_num.isdecimal(): return await evt.reply("That doesn't look like a valid safety number") try: await evt.bridge.signal.trust( evt.sender.username, Address(number=number), safety_number=safety_num, trust_level=trust_level, ) except UnknownIdentityKey as e: return await evt.reply(f"Failed to mark {number} as {trust_level.human_str}: {e}") return await evt.reply(f"Successfully marked {number} as {trust_level.human_str}") @command_handler( needs_admin=False, needs_auth=True, help_section=SECTION_SIGNAL, help_text="Sync data from Signal", ) async def sync(evt: CommandEvent) -> None: await evt.sender.sync() await evt.reply("Sync complete") @command_handler( needs_admin=True, needs_auth=False, help_section=SECTION_ADMIN, help_text="Send raw requests to signald", help_args="[--user] <_json_>", ) async def raw(evt: CommandEvent) -> None: add_username = False while True: flag = evt.args[0].lower() if flag == "--user": add_username = True else: break evt.args = evt.args[1:] type = evt.args[0] version = "v0" if "." in type: version, type = type.split(".", 1) try: args = json.loads(" ".join(evt.args[1:])) except json.JSONDecodeError as e: await evt.reply(f"JSON decode error: {e}") return if add_username: if version == "v0" or (version == "v1" and type in ("send", "react")): args["username"] = evt.sender.username else: args["account"] = evt.sender.username if version: args["version"] = version try: resp_type, resp_data = await evt.bridge.signal._raw_request(type, **args) except Exception as e: await evt.reply(f"Error sending request: {e}") else: if resp_data is None: await evt.reply(f"Got reply `{resp_type}` with no content") else: await evt.reply( f"Got reply `{resp_type}`:\n\n```json\n{json.dumps(resp_data, indent=2)}\n```" ) missing_power_warning = ( "Warning: The bridge bot ([{bot_mxid}](https://matrix.to/#/{bot_mxid})) does not have " "sufficient privileges to change power levels on Matrix. Power level changes will not be " "bridged." ) low_power_warning = ( "Warning: The bridge bot ([{bot_mxid}](https://matrix.to/#/{bot_mxid})) has a power level " "below or equal to 50. Bridged moderator rights are currently hardcoded to PL 50, so the " "bridge bot must have a higher level to properly bridge them." ) meta_power_warning = ( "Warning: Permissions for changing name, topic and avatar cannot be set separately on Signal. " "Changes to those may not be bridged properly, unless the permissions are set to the same " "level or lower than state_default." ) @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Create a Signal group for the current Matrix room.", ) async def create(evt: CommandEvent) -> EventID: if evt.portal: return await evt.reply("This is already a portal room.") title, about, levels, encrypted, avatar_url, join_rule = await get_initial_state( evt.az.intent, evt.room_id ) if not title: return await evt.reply("Please set a room name before creating a Signal group.") portal = po.Portal( chat_id=GroupID(""), mxid=evt.room_id, name=title, topic=about or "", encrypted=encrypted, receiver="", avatar_url=avatar_url, ) await warn_missing_power(levels, evt) await portal.create_signal_group(evt.sender, levels, join_rule) @command_handler( name="id", needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Get the ID of the Signal chat where this room is bridged.", ) async def get_id(evt: CommandEvent) -> EventID: if evt.portal: return await evt.reply(f"This room is bridged to Signal chat ID `{evt.portal.chat_id}`.") await evt.reply("This is not a portal room.") @command_handler( needs_auth=True, management_only=False, help_section=SECTION_SIGNAL, help_text="Bridge the current Matrix room to the Signal chat with the given ID.", help_args=" [matrix room ID]", ) async def bridge(evt: CommandEvent) -> EventID: if len(evt.args) == 0: return await evt.reply( "**Usage:** `$cmdprefix+sp bridge [matrix room ID]`" ) room_id = RoomID(evt.args[1]) if len(evt.args) > 1 else evt.room_id that_this = "This" if room_id == evt.room_id else "That" portal = await po.Portal.get_by_mxid(room_id) if portal: return await evt.reply(f"{that_this} room is already a portal room.") if not await user_has_power_level(room_id, evt.az.intent, evt.sender, "bridge"): return await evt.reply(f"You do not have the permissions to bridge {that_this} room.") portal = await po.Portal.get_by_chat_id(GroupID(evt.args[0]), create=True) if portal.mxid: has_portal_message = ( "That Signal chat already has a portal at " f"[{portal.mxid}](https://matrix.to/#/{portal.mxid}). " ) if not await user_has_power_level(portal.mxid, evt.az.intent, evt.sender, "unbridge"): return await evt.reply( f"{has_portal_message}" "Additionally, you do not have the permissions to unbridge that room." ) evt.sender.command_status = { "next": confirm_bridge, "action": "Room bridging", "mxid": portal.mxid, "bridge_to_mxid": room_id, "chat_id": portal.chat_id, } return await evt.reply( f"{has_portal_message}" "However, you have the permissions to unbridge that room.\n\n" "To delete that portal completely and continue bridging, use " "`$cmdprefix+sp delete-and-continue`. To unbridge the portal " "without kicking Matrix users, use `$cmdprefix+sp unbridge-and-" "continue`. To cancel, use `$cmdprefix+sp cancel`" ) evt.sender.command_status = { "next": confirm_bridge, "action": "Room bridging", "bridge_to_mxid": room_id, "chat_id": portal.chat_id, } return await evt.reply( "That Signal chat has no existing portal. To confirm bridging the " "chat to this room, use `$cmdprefix+sp continue`" ) async def cleanup_old_portal_while_bridging( evt: CommandEvent, portal: po.Portal ) -> tuple[bool, Awaitable[None] | None]: if not portal.mxid: await evt.reply( "The portal seems to have lost its Matrix room between you" "calling `$cmdprefix+sp bridge` and this command.\n\n" "Continuing without touching previous Matrix room..." ) return True, None elif evt.args[0] == "delete-and-continue": return True, portal.cleanup_portal("Portal deleted (moving to another room)") elif evt.args[0] == "unbridge-and-continue": return True, portal.cleanup_portal( "Room unbridged (portal moving to another room)", puppets_only=True ) else: await evt.reply( "The chat you were trying to bridge already has a Matrix portal room.\n\n" "Please use `$cmdprefix+sp delete-and-continue` or `$cmdprefix+sp unbridge-and-" "continue` to either delete or unbridge the existing room (respectively) and " "continue with the bridging.\n\n" "If you changed your mind, use `$cmdprefix+sp cancel` to cancel." ) return False, None async def confirm_bridge(evt: CommandEvent) -> EventID | None: status = evt.sender.command_status try: portal = await po.Portal.get_by_chat_id(status["chat_id"]) bridge_to_mxid = status["bridge_to_mxid"] except KeyError: evt.sender.command_status = None return await evt.reply( "Fatal error: chat_id missing from command_status. " "This shouldn't happen unless you're messing with the command handler code." ) is_logged_in = await evt.sender.is_logged_in() if "mxid" in status: ok, coro = await cleanup_old_portal_while_bridging(evt, portal) if not ok: return None elif coro: await evt.reply("Cleaning up previous portal room...") await coro elif portal.mxid: evt.sender.command_status = None return await evt.reply( "The portal seems to have created a Matrix room between you " "calling `$cmdprefix+sp bridge` and this command.\n\n" "Please start over by calling the bridge command again." ) elif evt.args[0] != "continue": return await evt.reply( "Please use `$cmdprefix+sp continue` to confirm the bridging or " "`$cmdprefix+sp cancel` to cancel." ) evt.sender.command_status = None async with portal._create_room_lock: await _locked_confirm_bridge( evt, portal=portal, room_id=bridge_to_mxid, is_logged_in=is_logged_in ) async def _locked_confirm_bridge( evt: CommandEvent, portal: po.Portal, room_id: RoomID, is_logged_in: bool ) -> EventID | None: try: group = await evt.bridge.signal.get_group( evt.sender.username, portal.chat_id, portal.revision ) except Exception: evt.log.exception("Failed to get_group(%s) for manual bridging.", portal.chat_id) if is_logged_in: return await evt.reply( "Failed to get info of signal chat. You are logged in, are you in that chat?" ) else: return await evt.reply( "Failed to get info of signal chat. " "You're not logged in, this should not happen." ) portal.mxid = room_id portal.by_mxid[portal.mxid] = portal ( portal.title, portal.about, levels, portal.encrypted, portal.photo_id, join_rule, ) = await get_initial_state(evt.az.intent, evt.room_id) await portal.save() await portal.update_bridge_info() background_task.create(portal.update_matrix_room(evt.sender, group)) await warn_missing_power(levels, evt) return await evt.reply("Bridging complete. Portal synchronization should begin momentarily.") async def warn_missing_power(levels: PowerLevelStateEventContent, evt: CommandEvent) -> None: bot_pl = levels.get_user_level(evt.az.bot_mxid) if bot_pl < levels.get_event_level(EventType.ROOM_POWER_LEVELS): await evt.reply(missing_power_warning.format(bot_mxid=evt.az.bot_mxid)) elif bot_pl <= 50: await evt.reply(low_power_warning.format(bot_mxid=evt.az.bot_mxid)) if levels.state_default < 50 and ( levels.events[EventType.ROOM_NAME] >= 50 or levels.events[EventType.ROOM_AVATAR] >= 50 or levels.events[EventType.ROOM_TOPIC] >= 50 ): await evt.reply(meta_power_warning) @command_handler( needs_auth=False, management_only=False, help_section=SECTION_SIGNAL, help_text="Invite a Signal user by phone number", help_args="<_phone_>", ) async def invite(evt: CommandEvent) -> EventID | None: if not evt.is_portal: return await evt.reply("This is not a portal room.") portal = evt.portal puppet = await _get_puppet_from_cmd(evt) if not puppet: return None levels = await portal.main_intent.get_power_levels(portal.mxid) if levels.get_user_level(puppet.mxid) < levels.invite: return await evt.reply("You do not have permissions to invite users to this room") try: info = await portal.handle_matrix_invite(evt.sender, puppet) sender, is_relay = await portal.get_relay_sender(evt.sender, "updating info") await portal.update_info(sender, info) except RejectMatrixInvite as e: return await evt.reply(f"Failed to invite {puppet.name}: {e}")