user.py 15 KB


  1. # mautrix-signal - A Matrix-Signal puppeting bridge
  2. # Copyright (C) 2021 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from typing import TYPE_CHECKING, AsyncGenerator, Dict, List, Optional, Union, cast
  17. from asyncio.tasks import sleep
  18. from datetime import datetime
  19. from uuid import UUID
  20. import asyncio
  21. from mautrix.appservice import AppService
  22. from mautrix.bridge import AutologinError, BaseUser, async_getter_lock
  23. from mautrix.types import RoomID, UserID
  24. from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
  25. from mautrix.util.opt_prometheus import Gauge
  26. from mausignald.types import (
  27. Account,
  28. Address,
  29. Group,
  30. GroupV2,
  31. Profile,
  32. WebsocketConnectionState,
  33. WebsocketConnectionStateChangeEvent,
  34. )
  35. from . import portal as po
  36. from . import puppet as pu
  37. from .config import Config
  38. from .db import User as DBUser
  39. if TYPE_CHECKING:
  40. from .__main__ import SignalBridge
  41. METRIC_CONNECTED = Gauge("bridge_connected", "Bridge users connected to Signal")
  42. METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Bridge users logged into Signal")
  43. BridgeState.human_readable_errors.update(
  44. {
  45. "logged-out": "You're not logged into Signal",
  46. "signal-not-connected": None,
  47. }
  48. )
  49. class User(DBUser, BaseUser):
  50. by_mxid: Dict[UserID, "User"] = {}
  51. by_username: Dict[str, "User"] = {}
  52. by_uuid: Dict[UUID, "User"] = {}
  53. config: Config
  54. az: AppService
  55. loop: asyncio.AbstractEventLoop
  56. bridge: "SignalBridge"
  57. relay_whitelisted: bool
  58. is_admin: bool
  59. permission_level: str
  60. _sync_lock: asyncio.Lock
  61. _notice_room_lock: asyncio.Lock
  62. _connected: bool
  63. _websocket_connection_state: Optional[BridgeStateEvent]
  64. _latest_non_transient_disconnect_state: Optional[datetime]
  65. def __init__(
  66. self,
  67. mxid: UserID,
  68. username: Optional[str] = None,
  69. uuid: Optional[UUID] = None,
  70. notice_room: Optional[RoomID] = None,
  71. ) -> None:
  72. super().__init__(mxid=mxid, username=username, uuid=uuid, notice_room=notice_room)
  73. BaseUser.__init__(self)
  74. self._notice_room_lock = asyncio.Lock()
  75. self._sync_lock = asyncio.Lock()
  76. self._connected = False
  77. self._websocket_connection_state = None
  78. perms = self.config.get_permissions(mxid)
  79. self.relay_whitelisted, self.is_whitelisted, self.is_admin, self.permission_level = perms
  80. @classmethod
  81. def init_cls(cls, bridge: "SignalBridge") -> None:
  82. cls.bridge = bridge
  83. cls.config = bridge.config
  84. cls.az = bridge.az
  85. cls.loop = bridge.loop
  86. @property
  87. def address(self) -> Optional[Address]:
  88. if not self.username:
  89. return None
  90. return Address(uuid=self.uuid, number=self.username)
  91. async def is_logged_in(self) -> bool:
  92. return bool(self.username)
  93. async def needs_relay(self, portal: "po.Portal") -> bool:
  94. return not await self.is_logged_in() or (
  95. portal.is_direct and portal.receiver != self.username
  96. )
  97. async def logout(self) -> None:
  98. if not self.username:
  99. return
  100. username = self.username
  101. if self.uuid and self.by_uuid.get(self.uuid) == self:
  102. del self.by_uuid[self.uuid]
  103. if self.username and self.by_username.get(self.username) == self:
  104. del self.by_username[self.username]
  105. self.username = None
  106. self.uuid = None
  107. await self.update()
  108. await self.bridge.signal.unsubscribe(username)
  109. # Wait a while for signald to finish disconnecting
  110. await asyncio.sleep(1)
  111. await self.bridge.signal.delete_account(username)
  112. self._track_metric(METRIC_LOGGED_IN, False)
  113. await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT, remote_id=username)
  114. async def fill_bridge_state(self, state: BridgeState) -> None:
  115. await super().fill_bridge_state(state)
  116. if not state.remote_id:
  117. state.remote_id = self.username
  118. if self.address:
  119. puppet = await self.get_puppet()
  120. state.remote_name = puppet.name or self.username
  121. async def get_bridge_states(self) -> List[BridgeState]:
  122. if not self.username:
  123. return []
  124. state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
  125. if self.bridge.signal.is_connected and self._connected:
  126. state.state_event = BridgeStateEvent.CONNECTED
  127. else:
  128. state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
  129. return [state]
  130. async def get_puppet(self) -> Optional["pu.Puppet"]:
  131. if not self.address:
  132. return None
  133. return await pu.Puppet.get_by_address(self.address)
  134. async def on_signin(self, account: Account) -> None:
  135. self.username = account.account_id
  136. self.uuid = account.address.uuid
  137. self._add_to_cache()
  138. await self.update()
  139. await self.bridge.signal.subscribe(self.username)
  140. asyncio.create_task(self.sync())
  141. self._track_metric(METRIC_LOGGED_IN, True)
  142. def on_websocket_connection_state_change(
  143. self, evt: WebsocketConnectionStateChangeEvent
  144. ) -> None:
  145. if evt.state == WebsocketConnectionState.CONNECTED:
  146. self.log.info("Connected to Signal")
  147. self._track_metric(METRIC_CONNECTED, True)
  148. self._track_metric(METRIC_LOGGED_IN, True)
  149. self._connected = True
  150. else:
  151. self.log.warning(
  152. f"New websocket state from signald: {evt.state}. Error: {evt.exception}"
  153. )
  154. self._track_metric(METRIC_CONNECTED, False)
  155. self._connected = False
  156. bridge_state = {
  157. # Signald disconnected
  158. WebsocketConnectionState.SOCKET_DISCONNECTED: BridgeStateEvent.TRANSIENT_DISCONNECT,
  159. # Websocket state reported by signald
  160. WebsocketConnectionState.DISCONNECTED: (
  161. None
  162. if self._websocket_connection_state == BridgeStateEvent.BAD_CREDENTIALS
  163. else BridgeStateEvent.TRANSIENT_DISCONNECT
  164. ),
  165. WebsocketConnectionState.CONNECTING: BridgeStateEvent.CONNECTING,
  166. WebsocketConnectionState.CONNECTED: BridgeStateEvent.CONNECTED,
  167. WebsocketConnectionState.RECONNECTING: BridgeStateEvent.TRANSIENT_DISCONNECT,
  168. WebsocketConnectionState.DISCONNECTING: BridgeStateEvent.TRANSIENT_DISCONNECT,
  169. WebsocketConnectionState.AUTHENTICATION_FAILED: BridgeStateEvent.BAD_CREDENTIALS,
  170. WebsocketConnectionState.FAILED: BridgeStateEvent.TRANSIENT_DISCONNECT,
  171. }.get(evt.state)
  172. if bridge_state is None:
  173. self.log.info(f"Websocket state {evt.state} seen. Will not report new Bridge State")
  174. return
  175. now = datetime.now()
  176. if bridge_state == BridgeStateEvent.TRANSIENT_DISCONNECT:
  177. async def wait_report_transient_disconnect():
  178. # Wait for 10 seconds (that should be enough for the bridge to get connected)
  179. # before sending a TRANSIENT_DISCONNECT.
  180. # self._latest_non_transient_disconnect_state will only be None if the bridge is
  181. # still starting.
  182. if self._latest_non_transient_disconnect_state is None:
  183. await sleep(15)
  184. if self._latest_non_transient_disconnect_state is None:
  185. asyncio.create_task(self.push_bridge_state(bridge_state))
  186. # Wait for another minute. If the bridge stays in TRANSIENT_DISCONNECT for that
  187. # long, something terrible has happened (signald failed to restart, the internet
  188. # broke, etc.)
  189. await sleep(60)
  190. if (
  191. self._latest_non_transient_disconnect_state
  192. and now > self._latest_non_transient_disconnect_state
  193. ):
  194. asyncio.create_task(self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR))
  195. else:
  196. self.log.info(
  197. "New state since last TRANSIENT_DISCONNECT push. "
  198. "Not transitioning to UNKNOWN_ERROR."
  199. )
  200. asyncio.create_task(wait_report_transient_disconnect())
  201. else:
  202. asyncio.create_task(self.push_bridge_state(bridge_state))
  203. self._latest_non_transient_disconnect_state = now
  204. self._websocket_connection_state = bridge_state
  205. async def _sync_puppet(self) -> None:
  206. puppet = await pu.Puppet.get_by_address(self.address)
  207. if puppet.uuid and not self.uuid:
  208. self.uuid = puppet.uuid
  209. self.by_uuid[self.uuid] = self
  210. if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
  211. self.log.info("Automatically enabling custom puppet")
  212. try:
  213. await puppet.switch_mxid(access_token="auto", mxid=self.mxid)
  214. except AutologinError as e:
  215. self.log.warning(f"Failed to enable custom puppet: {e}")
  216. async def sync(self) -> None:
  217. await self.sync_puppet()
  218. await self.sync_contacts()
  219. await self.sync_groups()
  220. async def sync_puppet(self) -> None:
  221. try:
  222. async with self._sync_lock:
  223. await self._sync_puppet()
  224. except Exception:
  225. self.log.exception("Error while syncing own puppet")
  226. async def sync_contacts(self) -> None:
  227. try:
  228. async with self._sync_lock:
  229. await self._sync_contacts()
  230. except Exception:
  231. self.log.exception("Error while syncing contacts")
  232. async def sync_groups(self) -> None:
  233. try:
  234. async with self._sync_lock:
  235. await self._sync_groups()
  236. except Exception:
  237. self.log.exception("Error while syncing groups")
  238. async def sync_contact(
  239. self, contact: Union[Profile, Address], create_portals: bool = False
  240. ) -> None:
  241. self.log.trace("Syncing contact %s", contact)
  242. if isinstance(contact, Address):
  243. address = contact
  244. profile = await self.bridge.signal.get_profile(self.username, address, use_cache=True)
  245. if profile and profile.name:
  246. self.log.trace("Got profile for %s: %s", address, profile)
  247. else:
  248. address = contact.address
  249. profile = contact
  250. puppet = await pu.Puppet.get_by_address(address)
  251. await puppet.update_info(profile)
  252. if create_portals:
  253. portal = await po.Portal.get_by_chat_id(
  254. puppet.address, receiver=self.username, create=True
  255. )
  256. await portal.create_matrix_room(self, profile)
  257. async def _sync_group(self, group: Group, create_portals: bool) -> None:
  258. self.log.trace("Syncing group %s", group)
  259. portal = await po.Portal.get_by_chat_id(group.group_id, create=True)
  260. if create_portals:
  261. await portal.create_matrix_room(self, group)
  262. elif portal.mxid:
  263. await portal.update_matrix_room(self, group)
  264. async def _sync_group_v2(self, group: GroupV2, create_portals: bool) -> None:
  265. self.log.trace("Syncing group %s", group.id)
  266. portal = await po.Portal.get_by_chat_id(group.id, create=True)
  267. if create_portals:
  268. await portal.create_matrix_room(self, group)
  269. elif portal.mxid:
  270. await portal.update_matrix_room(self, group)
  271. async def _sync_contacts(self) -> None:
  272. create_contact_portal = self.config["bridge.autocreate_contact_portal"]
  273. for contact in await self.bridge.signal.list_contacts(self.username):
  274. try:
  275. await self.sync_contact(contact, create_contact_portal)
  276. except Exception:
  277. self.log.exception(f"Failed to sync contact {contact.address}")
  278. async def _sync_groups(self) -> None:
  279. create_group_portal = self.config["bridge.autocreate_group_portal"]
  280. for group in await self.bridge.signal.list_groups(self.username):
  281. group_id = group.group_id if isinstance(group, Group) else group.id
  282. try:
  283. if isinstance(group, Group):
  284. await self._sync_group(group, create_group_portal)
  285. elif isinstance(group, GroupV2):
  286. await self._sync_group_v2(group, create_group_portal)
  287. else:
  288. self.log.warning("Unknown return type in list_groups: %s", type(group))
  289. except Exception:
  290. self.log.exception(f"Failed to sync group {group_id}")
  291. # region Database getters
  292. def _add_to_cache(self) -> None:
  293. self.by_mxid[self.mxid] = self
  294. if self.username:
  295. self.by_username[self.username] = self
  296. if self.uuid:
  297. self.by_uuid[self.uuid] = self
  298. @classmethod
  299. @async_getter_lock
  300. async def get_by_mxid(cls, mxid: UserID, create: bool = True) -> Optional["User"]:
  301. # Never allow ghosts to be users
  302. if pu.Puppet.get_id_from_mxid(mxid):
  303. return None
  304. try:
  305. return cls.by_mxid[mxid]
  306. except KeyError:
  307. pass
  308. user = cast(cls, await super().get_by_mxid(mxid))
  309. if user is not None:
  310. user._add_to_cache()
  311. return user
  312. if create:
  313. user = cls(mxid)
  314. await user.insert()
  315. user._add_to_cache()
  316. return user
  317. return None
  318. @classmethod
  319. @async_getter_lock
  320. async def get_by_username(cls, username: str) -> Optional["User"]:
  321. try:
  322. return cls.by_username[username]
  323. except KeyError:
  324. pass
  325. user = cast(cls, await super().get_by_username(username))
  326. if user is not None:
  327. user._add_to_cache()
  328. return user
  329. return None
  330. @classmethod
  331. @async_getter_lock
  332. async def get_by_uuid(cls, uuid: UUID) -> Optional["User"]:
  333. try:
  334. return cls.by_uuid[uuid]
  335. except KeyError:
  336. pass
  337. user = cast(cls, await super().get_by_uuid(uuid))
  338. if user is not None:
  339. user._add_to_cache()
  340. return user
  341. return None
  342. @classmethod
  343. async def get_by_address(cls, address: Address) -> Optional["User"]:
  344. if address.uuid:
  345. return await cls.get_by_uuid(address.uuid)
  346. elif address.number:
  347. return await cls.get_by_username(address.number)
  348. else:
  349. raise ValueError("Given address is blank")
  350. @classmethod
  351. async def all_logged_in(cls) -> AsyncGenerator["User", None]:
  352. users = await super().all_logged_in()
  353. user: cls
  354. for user in users:
  355. try:
  356. yield cls.by_mxid[user.mxid]
  357. except KeyError:
  358. user._add_to_cache()
  359. yield user
  360. # endregion