# mautrix-signal - A Matrix-Signal puppeting bridge
# Copyright (C) 2022 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
import base64
import json
from mausignald.errors import UnknownIdentityKey, UnregisteredUserError
from mausignald.types import Address, GroupID, TrustLevel
from mautrix.appservice import IntentAPI
from mautrix.bridge.commands import SECTION_ADMIN, HelpSection, command_handler
from mautrix.types import ContentURI, EventID, EventType, PowerLevelStateEventContent, RoomID
from .. import portal as po, puppet as pu
from ..util import normalize_number, id_to_str
from .auth import make_qr
from .typehint import CommandEvent
try:
import PIL as _
import qrcode
except ImportError:
qrcode = None
# Help section under which the Signal-specific commands in this module are
# registered (passed as ``help_section`` to ``command_handler`` below).
SECTION_SIGNAL = HelpSection("Signal actions", 20, "")
async def _get_puppet_from_cmd(evt: CommandEvent) -> pu.Puppet | None:
    """Resolve the phone number given as command arguments to a puppet.

    All command arguments are concatenated before normalization, so a number
    typed with spaces between digit groups is accepted as a single number.

    Args:
        evt: The command event whose arguments contain the phone number.

    Returns:
        The puppet for the number, or ``None`` if the number could not be
        normalized or the user is not registered. In both ``None`` cases an
        explanatory reply has already been sent to the user.
    """
    try:
        phone = normalize_number("".join(evt.args))
    except Exception:
        # Any normalization failure is treated as bad user input and answered
        # with the usage text, naming the missing/invalid argument.
        await evt.reply(
            f"**Usage:** `$cmdprefix+sp {evt.command} <phone>` "
            "(enter phone number in international format)"
        )
        return None

    puppet: pu.Puppet = await pu.Puppet.get_by_address(Address(number=phone))
    # The UUID can only be looked up through the sender's own Signal account,
    # so skip the lookup when the sender has no username.
    if not puppet.uuid and evt.sender.username:
        try:
            uuid = await evt.bridge.signal.find_uuid(evt.sender.username, puppet.number)
        except UnregisteredUserError:
            await evt.reply("User not registered")
            return None
        if uuid:
            await puppet.handle_uuid_receive(uuid)
    return puppet
def _format_safety_number(number: str, *, line_size: int = 20, chunk_size: int = 5) -> str:
    """Lay out a safety number as lines of space-separated digit groups.

    With the defaults, a 60-character safety number becomes three lines of
    four 5-character groups each. Inputs whose length is not a multiple of
    ``line_size`` or ``chunk_size`` end with a shorter group; no empty groups
    or trailing spaces are produced.

    Args:
        number: The safety number to format.
        line_size: Number of characters of ``number`` placed on each line.
            Must be positive.
        chunk_size: Number of characters per group within a line. Must be
            positive.

    Returns:
        The formatted text, lines joined with ``"\\n"``; an empty string for
        empty input.
    """
    total = len(number)
    lines = []
    for line_start in range(0, total, line_size):
        # Clamp to the end of the input so a short final line does not yield
        # empty groups (which would show up as trailing spaces).
        line_end = min(line_start + line_size, total)
        lines.append(
            " ".join(
                # Also clamp each group to the line end so groups never spill
                # into the next line when line_size % chunk_size != 0.
                number[start : min(start + chunk_size, line_end)]
                for start in range(line_start, line_end, chunk_size)
            )
        )
    return "\n".join(lines)
def _pill(puppet: "pu.Puppet") -> str:
    """Return a Markdown matrix.to link labelled with the puppet's name."""
    return "[{}](https://matrix.to/#/{})".format(puppet.name, puppet.mxid)
@command_handler(
    needs_auth=True,
    management_only=False,
    help_section=SECTION_SIGNAL,
    help_text="Open a private chat portal with a specific phone number",
    help_args="<_phone_>",
)
async def pm(evt: CommandEvent) -> None:
    """Open (or re-invite the sender to) a private chat portal for a phone number."""
    target = await _get_puppet_from_cmd(evt)
    if not target:
        # _get_puppet_from_cmd has already told the user what went wrong.
        return

    chat = await po.Portal.get_by_chat_id(
        target.address, receiver=evt.sender.username, create=True
    )

    if not chat.mxid:
        # No Matrix room yet: create one for the sender.
        await chat.create_matrix_room(evt.sender, target.address)
        await evt.reply(f"Created a portal room with {_pill(target)} and invited you to it")
        return

    # The room already exists: point the user at it and invite them again.
    already_exists = (
        f"You already have a private chat with {target.name}: "
        f"[{chat.mxid}](https://matrix.to/#/{chat.mxid})"
    )
    await evt.reply(already_exists)
    await chat.main_intent.invite_user(chat.mxid, evt.sender.mxid)
@command_handler(
    needs_auth=True,
    management_only=False,
    help_section=SECTION_SIGNAL,
    help_text="Join a Signal group with an invite link",
    help_args="<_link_>",
)
async def join(evt: CommandEvent) -> EventID:
    """Join a Signal group through the invite link given as the first argument.

    Returns:
        The event ID of the reply sent to the user (usage hint, success
        message, pending-approval notice or failure notice).
    """
    if len(evt.args) == 0:
        return await evt.reply("**Usage:** `$cmdprefix+sp join <link>`")

    # Only the join request itself is guarded: a failure while sending the
    # success reply must not be logged and reported as a failed join.
    try:
        resp = await evt.bridge.signal.join_group(evt.sender.username, evt.args[0])
    except Exception:
        evt.log.exception("Error trying to join group")
        return await evt.reply("Failed to join group (see logs for more details)")

    if resp.pending_admin_approval:
        return await evt.reply(
            f"Successfully requested to join {resp.title}, waiting for admin approval."
        )
    return await evt.reply(f"Successfully joined {resp.title}")
@command_handler(
    needs_auth=True,
    management_only=False,
    help_section=SECTION_SIGNAL,
    help_text="Get the invite link to the current group",
)
async def invite_link(evt: CommandEvent) -> EventID:
    """Reply with the invite link of the Signal group bridged to this room."""
    if not evt.is_portal:
        return await evt.reply("This is not a portal room.")

    info = await evt.bridge.signal.get_group(
        evt.sender.username, evt.portal.chat_id, evt.portal.revision
    )

    # Pick the single message to send, then send it once.
    if not info:
        answer = "Failed to get group info"
    elif not info.invite_link:
        answer = "Invite link not available"
    else:
        answer = info.invite_link
    await evt.reply(answer)
@command_handler(
    needs_auth=True,
    management_only=False,
    help_section=SECTION_SIGNAL,
    help_text="View the safety number of a specific user",
    help_args="[--qr] [_phone_]",
)
async def safety_number(evt: CommandEvent) -> None:
    """Show the most recently added identity (safety number) of a Signal user.

    The user is taken from the phone number argument, or from the current
    room when no number is given and the room is a direct-chat portal. With
    a leading ``--qr`` flag the verification QR code is sent as well.
    """
    want_qr = bool(evt.args) and evt.args[0].lower() == "--qr"
    if want_qr:
        if not qrcode:
            await evt.reply("Can't generate QR code: qrcode and/or PIL not installed")
            return
        # Drop the flag so only the optional phone number remains.
        evt.args = evt.args[1:]

    if not evt.args and evt.portal and evt.portal.is_direct:
        puppet = await pu.Puppet.get_by_address(evt.portal.chat_id)
    else:
        puppet = await _get_puppet_from_cmd(evt)
        if not puppet:
            return

    resp = await evt.bridge.signal.get_identities(evt.sender.username, puppet.address)
    if not resp.identities:
        await evt.reply(f"No identities found for {_pill(puppet)}")
        return

    # Select the identity with the latest ``added`` value; on ties the
    # earliest entry in the list wins.
    newest = resp.identities[0]
    for candidate in resp.identities:
        if candidate.added > newest.added:
            newest = candidate

    uuid = resp.address.uuid or "unknown"
    summary = (
        f"### {puppet.name}\n\n"
        f"**UUID:** {uuid} \n"
        f"**Trust level:** {newest.trust_level} \n"
        f"**Safety number:**\n"
        f"```\n{_format_safety_number(newest.safety_number)}\n```"
    )
    await evt.reply(summary)

    if want_qr and newest.qr_code_data:
        qr_bytes = base64.b64decode(newest.qr_code_data)
        qr_message = await make_qr(evt.main_intent, qr_bytes, "verification-qr.png")
        await evt.main_intent.send_message(evt.room_id, qr_message)
@command_handler(
    needs_auth=True,
    management_only=False,
    help_section=SECTION_SIGNAL,
    help_text="Set your Signal profile name",
    help_args="<_name_>",
)
async def set_profile_name(evt: CommandEvent) -> None:
    """Set the sender's Signal profile name to the space-joined arguments."""
    new_name = " ".join(evt.args)
    await evt.bridge.signal.set_profile(evt.sender.username, name=new_name)
    await evt.reply("Successfully updated profile name")
# Raw values of every TrustLevel member; mark_trusted matches the upper-cased
# optional level argument against this list.
_trust_levels = [x.value for x in TrustLevel]
@command_handler(
    needs_auth=True,
    management_only=False,
    help_section=SECTION_SIGNAL,
    help_text="Mark another user's safety number as trusted",
    help_args="<_recipient phone_> [_level_] <_safety number_>",
)
async def mark_trusted(evt: CommandEvent) -> EventID:
    """Set the trust level for a recipient's safety number.

    Arguments are the recipient's phone number, an optional trust level
    (matched case-insensitively against the ``TrustLevel`` values, defaulting
    to ``TRUSTED_VERIFIED``) and the 60-digit safety number, which may be
    split over several arguments and lines.

    Returns:
        The event ID of the reply sent to the user.
    """
    if len(evt.args) < 2:
        return await evt.reply(
            "**Usage:** `$cmdprefix+sp mark-trusted <recipient phone> [level] <safety number>`"
        )

    number = normalize_number(evt.args[0])
    remaining_args = evt.args[1:]
    trust_level = TrustLevel.TRUSTED_VERIFIED
    if len(evt.args) > 2:
        # Use the same upper-cased value for both the membership check and the
        # enum lookup; looking up the raw argument raises ValueError for a
        # lower-case level that passed the check.
        level_arg = evt.args[1].upper()
        if level_arg in _trust_levels:
            trust_level = TrustLevel(level_arg)
            remaining_args = evt.args[2:]

    # The safety number is usually pasted in groups; glue the pieces together.
    safety_num = "".join(remaining_args).replace("\n", "")
    if len(safety_num) != 60 or not safety_num.isdecimal():
        return await evt.reply("That doesn't look like a valid safety number")

    try:
        await evt.bridge.signal.trust(
            evt.sender.username,
            Address(number=number),
            safety_number=safety_num,
            trust_level=trust_level,
        )
    except UnknownIdentityKey as e:
        return await evt.reply(f"Failed to mark {number} as {trust_level.human_str}: {e}")
    return await evt.reply(f"Successfully marked {number} as {trust_level.human_str}")
@command_handler(
    needs_admin=False,
    needs_auth=True,
    help_section=SECTION_SIGNAL,
    help_text="Sync data from Signal",
)
async def sync(evt: CommandEvent) -> None:
    """Run the sender's sync and confirm once it has finished."""
    user = evt.sender
    await user.sync()
    await evt.reply("Sync complete")
@command_handler(
    needs_admin=True,
    needs_auth=False,
    help_section=SECTION_ADMIN,
    help_text="Send raw requests to signald",
    help_args="[--user] <_json_>",
)
async def raw(evt: CommandEvent) -> None:
    """Send an arbitrary request to signald and reply with the response.

    Arguments: optional ``--user`` flag(s), the request type (optionally
    prefixed with ``<version>.``, default version ``v0``) and a JSON object
    with the request fields. With ``--user`` the sender's username is added
    to the request under ``username`` or ``account`` depending on the
    version and request type.
    """
    add_username = False
    # Consume leading flags; stop as soon as the arguments run out so a bare
    # `raw` or `raw --user` ends in the usage message instead of IndexError.
    while evt.args and evt.args[0].lower() == "--user":
        add_username = True
        evt.args = evt.args[1:]
    if not evt.args:
        await evt.reply("**Usage:** `$cmdprefix+sp raw [--user] <type> <json>`")
        return

    req_type = evt.args[0]
    version = "v0"
    if "." in req_type:
        version, req_type = req_type.split(".", 1)

    try:
        args = json.loads(" ".join(evt.args[1:]))
    except json.JSONDecodeError as e:
        await evt.reply(f"JSON decode error: {e}")
        return
    if not isinstance(args, dict):
        # The fields are expanded as keyword arguments below, so only a JSON
        # object is usable as the request body.
        await evt.reply("JSON decode error: the request body must be a JSON object")
        return

    if add_username:
        if version == "v0" or (version == "v1" and req_type in ("send", "react")):
            args["username"] = evt.sender.username
        else:
            args["account"] = evt.sender.username
    if version:
        args["version"] = version

    try:
        resp_type, resp_data = await evt.bridge.signal._raw_request(req_type, **args)
    except Exception as e:
        await evt.reply(f"Error sending request: {e}")
    else:
        if resp_data is None:
            await evt.reply(f"Got reply `{resp_type}` with no content")
        else:
            await evt.reply(
                f"Got reply `{resp_type}`:\n\n```json\n{json.dumps(resp_data, indent=2)}\n```"
            )
# Warning templates used by the `create` and `bridge` commands. `{bot_mxid}`
# is filled in with str.format before sending.

# Sent when the bot's power level is below the level required to send
# m.room.power_levels events in the room.
missing_power_warning = (
"Warning: The bridge bot ([{bot_mxid}](https://matrix.to/#/{bot_mxid})) does not have "
"sufficient privileges to change power levels on Matrix. Power level changes will not be "
"bridged."
)
# Sent when the bot may change power levels but its own level is 50 or lower.
low_power_warning = (
"Warning: The bridge bot ([{bot_mxid}](https://matrix.to/#/{bot_mxid})) has a power level "
"below or equal to 50. Bridged moderator rights are currently hardcoded to PL 50, so the "
"bridge bot must have a higher level to properly bridge them."
)
# Sent when state_default is below 50 while the room name, avatar or topic
# requires level 50 or higher.
meta_power_warning = (
"Warning: Permissions for changing name, topic and avatar cannot be set separately on Signal. "
"Changes to those may not be bridged properly, unless the permissions are set to the same "
"level or lower than state_default."
)
@command_handler(
    needs_auth=True,
    management_only=False,
    help_section=SECTION_SIGNAL,
    help_text="Create a Signal group for the current Matrix room.",
)
async def create(evt: CommandEvent) -> EventID:
    """Create a new Signal group backed by the current (unbridged) Matrix room.

    The room's name, topic, avatar, encryption flag and power levels are read
    from its current state. Warnings are sent when the bot's power level or
    the room's permission layout will prevent proper bridging; the group is
    created regardless.
    """
    if evt.portal:
        return await evt.reply("This is already a portal room.")

    title, about, levels, encrypted, avatar_url = await get_initial_state(
        evt.az.intent, evt.room_id
    )
    portal = po.Portal(
        chat_id=GroupID(""),
        mxid=evt.room_id,
        name=title,
        topic=about or "",
        encrypted=encrypted,
        receiver="",
        avatar_url=avatar_url,
    )

    # NOTE(review): get_initial_state is annotated as possibly returning
    # levels=None (no power level event in the state); that case is not
    # handled here and would fail on the attribute access below.
    bot_pl = levels.get_user_level(evt.az.bot_mxid)
    if bot_pl < levels.get_event_level(EventType.ROOM_POWER_LEVELS):
        await evt.reply(missing_power_warning.format(bot_mxid=evt.az.bot_mxid))
    elif bot_pl <= 50:
        await evt.reply(low_power_warning.format(bot_mxid=evt.az.bot_mxid))

    # Use get_event_level rather than indexing levels.events directly, so an
    # event type without an explicit override does not raise KeyError.
    meta_events = (EventType.ROOM_NAME, EventType.ROOM_AVATAR, EventType.ROOM_TOPIC)
    if levels.state_default < 50 and any(
        levels.get_event_level(event_type) >= 50 for event_type in meta_events
    ):
        await evt.reply(meta_power_warning)

    await portal.create_signal_group(evt.sender, levels)
    await evt.reply(f"Signal chat created. ID: {portal.chat_id}")
@command_handler(
    needs_auth=True,
    management_only=False,
    help_section=SECTION_SIGNAL,
    help_text="Get the current Signal group ID.",
)
async def get_id(evt: CommandEvent) -> EventID:
    """Reply with the Signal chat ID of the portal this command was sent in."""
    if not evt.portal:
        await evt.reply("This is not a portal room.")
        return
    return await evt.reply(f"ID: {evt.portal.chat_id}")
@command_handler(
    needs_auth=True,
    management_only=False,
    help_section=SECTION_SIGNAL,
    help_text="Bridge the current Matrix room to the Signal chat with the given ID",
    help_args="<_group ID_>",
)
async def bridge(evt: CommandEvent) -> EventID:
    """Bridge the current (unbridged) Matrix room to an existing Signal group.

    The group ID is the first command argument. If a portal with a Matrix
    room already exists for that group, the sender is pointed at and invited
    to that room instead. Otherwise the room's current state is used to set
    up the portal, with warnings when the bot's power level or the room's
    permission layout will prevent proper bridging.
    """
    if evt.portal:
        return await evt.reply("This is already a portal room.")
    if len(evt.args) == 0:
        # Without this guard the evt.args[0] access below raises IndexError.
        return await evt.reply("**Usage:** `$cmdprefix+sp bridge <group ID>`")

    chat_id = GroupID(evt.args[0])
    title, about, levels, encrypted, avatar_url = await get_initial_state(
        evt.az.intent, evt.room_id
    )
    portal = await po.Portal.get_by_chat_id(chat_id, create=True)
    if portal.mxid:
        await evt.reply(
            f"You already have a chat with the groupID {id_to_str(chat_id)}: "
            f"[{portal.mxid}](https://matrix.to/#/{portal.mxid})"
        )
        await portal.main_intent.invite_user(portal.mxid, evt.sender.mxid)
        return

    portal = po.Portal(
        chat_id=chat_id,
        mxid=evt.room_id,
        name=title,
        topic=about or "",
        encrypted=encrypted,
        receiver="",
        avatar_url=avatar_url,
    )

    # NOTE(review): get_initial_state is annotated as possibly returning
    # levels=None (no power level event in the state); that case is not
    # handled here and would fail on the attribute access below.
    bot_pl = levels.get_user_level(evt.az.bot_mxid)
    if bot_pl < levels.get_event_level(EventType.ROOM_POWER_LEVELS):
        await evt.reply(missing_power_warning.format(bot_mxid=evt.az.bot_mxid))
    elif bot_pl <= 50:
        await evt.reply(low_power_warning.format(bot_mxid=evt.az.bot_mxid))

    # Use get_event_level rather than indexing levels.events directly, so an
    # event type without an explicit override does not raise KeyError.
    meta_events = (EventType.ROOM_NAME, EventType.ROOM_AVATAR, EventType.ROOM_TOPIC)
    if levels.state_default < 50 and any(
        levels.get_event_level(event_type) >= 50 for event_type in meta_events
    ):
        await evt.reply(meta_power_warning)

    await portal.bridge_signal_group(evt.sender, levels)
    await evt.reply(f"Signal chat bridged. ID: {portal.chat_id}")
async def get_initial_state(
    intent: IntentAPI, room_id: RoomID
) -> tuple[str | None, str | None, PowerLevelStateEventContent | None, bool, ContentURI | None]:
    """Collect the bridging-relevant parts of a room's current state.

    Returns:
        A ``(title, about, levels, encrypted, avatar_url)`` tuple. ``title``
        is the room name, falling back to the canonical alias while no
        non-empty name has been seen; ``encrypted`` is True when an
        encryption state event is present. Values with no matching state
        event stay ``None`` (``False`` for ``encrypted``).
    """
    room_state = await intent.get_state(room_id)

    name: str | None = None
    topic: str | None = None
    power_levels: PowerLevelStateEventContent | None = None
    is_encrypted: bool = False
    avatar: ContentURI | None = None

    for state_event in room_state:
        try:
            kind = state_event.type
            if kind == EventType.ROOM_NAME:
                name = state_event.content.name
            elif kind == EventType.ROOM_TOPIC:
                topic = state_event.content.topic
            elif kind == EventType.ROOM_POWER_LEVELS:
                power_levels = state_event.content
            elif kind == EventType.ROOM_CANONICAL_ALIAS:
                # Only a fallback: an already known, non-empty name wins.
                name = name or state_event.content.canonical_alias
            elif kind == EventType.ROOM_ENCRYPTION:
                is_encrypted = True
            elif kind == EventType.ROOM_AVATAR:
                avatar = state_event.content.url
        except KeyError:
            # Some state event probably has empty content
            continue

    return name, topic, power_levels, is_encrypted, avatar