user.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2022 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, cast
  18. import asyncio
  19. import logging
  20. import time
  21. from mauigpapi import AndroidAPI, AndroidMQTT, AndroidState
  22. from mauigpapi.errors import (
  23. IGNotLoggedInError,
  24. IGUserIDNotFoundError,
  25. IrisSubscribeError,
  26. MQTTNotConnected,
  27. MQTTNotLoggedIn,
  28. )
  29. from mauigpapi.mqtt import Connect, Disconnect, GraphQLSubscription, SkywalkerSubscription
  30. from mauigpapi.types import (
  31. ActivityIndicatorData,
  32. CurrentUser,
  33. MessageSyncEvent,
  34. Operation,
  35. RealtimeDirectEvent,
  36. Thread,
  37. ThreadSyncEvent,
  38. TypingStatus,
  39. )
  40. from mautrix.appservice import AppService
  41. from mautrix.bridge import BaseUser, async_getter_lock
  42. from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID
  43. from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
  44. from mautrix.util.logging import TraceLogger
  45. from mautrix.util.opt_prometheus import Gauge, Summary, async_time
  46. from . import portal as po, puppet as pu
  47. from .config import Config
  48. from .db import Portal as DBPortal, User as DBUser
  49. if TYPE_CHECKING:
  50. from .__main__ import InstagramBridge
# Prometheus instruments used by the User class in this module: the summaries
# are attached to the event handlers via ``async_time`` and the gauges are
# passed to ``_track_metric``.
METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync")
METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")

# Register human-readable texts for the Instagram-specific bridge state error
# codes. ``{message}`` is a placeholder inside the registered text.
BridgeState.human_readable_errors.update(
    {
        "ig-connection-error": "Instagram disconnected unexpectedly",
        "ig-auth-error": "Authentication error from Instagram: {message}",
        # No text is registered for this code.
        "ig-disconnected": None,
        "ig-no-mqtt": "You're not connected to Instagram",
        "logged-out": "You're not logged into Instagram",
    }
)
class User(DBUser, BaseUser):
    """A Matrix user of the bridge together with their Instagram session state."""

    # Base logger; the per-user HTTP and MQTT loggers are children of it.
    ig_base_log: TraceLogger = logging.getLogger("mau.instagram")
    # Activity indicator ID -> expiry, used by handle_rtd to skip indicators it
    # has already processed. Defined on the class, so shared by all users.
    _activity_indicator_ids: dict[str, int] = {}
    # In-memory caches of User objects, filled by _add_to_cache. Also defined
    # on the class and therefore shared by all instances.
    by_mxid: dict[UserID, User] = {}
    by_igpk: dict[int, User] = {}

    # Set once for the whole class by init_cls.
    config: Config
    az: AppService
    loop: asyncio.AbstractEventLoop

    # API and MQTT clients; None until connect() succeeds and again after logout().
    client: AndroidAPI | None
    mqtt: AndroidMQTT | None
    # Task running listen(); created by start_listen and cleared when listen() ends.
    _listen_task: asyncio.Task | None = None

    # Taken from config.get_permissions() in __init__.
    permission_level: str
    # Instagram username, filled in by connect().
    username: str | None

    # Guards creation of the notice room (get_notice_room).
    _notice_room_lock: asyncio.Lock
    # Serialises sending of bridge notices (send_bridge_notice).
    _notice_send_lock: asyncio.Lock

    _is_logged_in: bool
    _is_connected: bool
    # True while stop_listen() is deliberately disconnecting, so that listen()
    # does not report the clean exit as an error.
    shutdown: bool
    # Updated by handle_rtd when the typing indicator comes from this user's own account.
    remote_typing_status: TypingStatus | None
  84. def __init__(
  85. self,
  86. mxid: UserID,
  87. igpk: int | None = None,
  88. state: AndroidState | None = None,
  89. notice_room: RoomID | None = None,
  90. ) -> None:
  91. super().__init__(mxid=mxid, igpk=igpk, state=state, notice_room=notice_room)
  92. BaseUser.__init__(self)
  93. self._notice_room_lock = asyncio.Lock()
  94. self._notice_send_lock = asyncio.Lock()
  95. perms = self.config.get_permissions(mxid)
  96. self.relay_whitelisted, self.is_whitelisted, self.is_admin, self.permission_level = perms
  97. self.client = None
  98. self.mqtt = None
  99. self.username = None
  100. self._is_logged_in = False
  101. self._is_connected = False
  102. self._is_refreshing = False
  103. self.shutdown = False
  104. self._listen_task = None
  105. self.remote_typing_status = None
  106. @classmethod
  107. def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]:
  108. cls.bridge = bridge
  109. cls.config = bridge.config
  110. cls.az = bridge.az
  111. cls.loop = bridge.loop
  112. return (user.try_connect() async for user in cls.all_logged_in())
  113. # region Connection management
  114. async def is_logged_in(self) -> bool:
  115. return bool(self.client) and self._is_logged_in
  116. async def get_puppet(self) -> pu.Puppet | None:
  117. if not self.igpk:
  118. return None
  119. return await pu.Puppet.get_by_pk(self.igpk)
    async def try_connect(self) -> None:
        """Run :meth:`connect`, logging any exception instead of propagating it."""
        try:
            await self.connect()
        except Exception:
            self.log.exception("Error while connecting to Instagram")
  125. @property
  126. def api_log(self) -> TraceLogger:
  127. return self.ig_base_log.getChild("http").getChild(self.mxid)
  128. @property
  129. def is_connected(self) -> bool:
  130. return bool(self.client) and bool(self.mqtt) and self._is_connected
  131. async def connect(self) -> None:
  132. client = AndroidAPI(self.state, log=self.api_log)
  133. try:
  134. resp = await client.current_user()
  135. except IGNotLoggedInError as e:
  136. self.log.warning(f"Failed to connect to Instagram: {e}, logging out")
  137. await self.send_bridge_notice(
  138. f"You have been logged out of Instagram: {e!s}",
  139. important=True,
  140. error_code="ig-auth-error",
  141. error_message=str(e),
  142. )
  143. await self.logout(from_error=True)
  144. return
  145. self.client = client
  146. self._is_logged_in = True
  147. self.igpk = resp.user.pk
  148. self.username = resp.user.username
  149. self._track_metric(METRIC_LOGGED_IN, True)
  150. self.by_igpk[self.igpk] = self
  151. self.mqtt = AndroidMQTT(
  152. self.state, loop=self.loop, log=self.ig_base_log.getChild("mqtt").getChild(self.mxid)
  153. )
  154. self.mqtt.add_event_handler(Connect, self.on_connect)
  155. self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
  156. self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
  157. self.mqtt.add_event_handler(ThreadSyncEvent, self.handle_thread_sync)
  158. self.mqtt.add_event_handler(RealtimeDirectEvent, self.handle_rtd)
  159. await self.update()
  160. self.loop.create_task(self._try_sync_puppet(resp.user))
  161. self.loop.create_task(self._try_sync())
    async def on_connect(self, evt: Connect) -> None:
        """Handle the MQTT ``Connect`` event: mark the user connected and announce it."""
        self.log.debug("Connected to Instagram")
        self._track_metric(METRIC_CONNECTED, True)
        self._is_connected = True
        await self.send_bridge_notice("Connected to Instagram")
        await self.push_bridge_state(BridgeStateEvent.CONNECTED)
    async def on_disconnect(self, evt: Disconnect) -> None:
        """Handle the MQTT ``Disconnect`` event: clear the connected metric and flag."""
        self.log.debug("Disconnected from Instagram")
        self._track_metric(METRIC_CONNECTED, False)
        self._is_connected = False
  172. # TODO this stuff could probably be moved to mautrix-python
  173. async def get_notice_room(self) -> RoomID:
  174. if not self.notice_room:
  175. async with self._notice_room_lock:
  176. # If someone already created the room while this call was waiting,
  177. # don't make a new room
  178. if self.notice_room:
  179. return self.notice_room
  180. creation_content = {}
  181. if not self.config["bridge.federate_rooms"]:
  182. creation_content["m.federate"] = False
  183. self.notice_room = await self.az.intent.create_room(
  184. is_direct=True,
  185. invitees=[self.mxid],
  186. topic="Instagram bridge notices",
  187. creation_content=creation_content,
  188. )
  189. await self.update()
  190. return self.notice_room
  191. async def fill_bridge_state(self, state: BridgeState) -> None:
  192. await super().fill_bridge_state(state)
  193. if not state.remote_id:
  194. if self.igpk:
  195. state.remote_id = str(self.igpk)
  196. else:
  197. try:
  198. state.remote_id = self.state.user_id
  199. except IGUserIDNotFoundError:
  200. state.remote_id = None
  201. if self.username:
  202. state.remote_name = f"@{self.username}"
  203. async def get_bridge_states(self) -> list[BridgeState]:
  204. if not self.state:
  205. return []
  206. state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
  207. if self.is_connected:
  208. state.state_event = BridgeStateEvent.CONNECTED
  209. elif self._is_refreshing or self.mqtt:
  210. state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
  211. return [state]
  212. async def send_bridge_notice(
  213. self,
  214. text: str,
  215. edit: EventID | None = None,
  216. state_event: BridgeStateEvent | None = None,
  217. important: bool = False,
  218. error_code: str | None = None,
  219. error_message: str | None = None,
  220. ) -> EventID | None:
  221. if state_event:
  222. await self.push_bridge_state(
  223. state_event, error=error_code, message=error_message if error_code else text
  224. )
  225. if self.config["bridge.disable_bridge_notices"]:
  226. return None
  227. if not important and not self.config["bridge.unimportant_bridge_notices"]:
  228. self.log.debug("Not sending unimportant bridge notice: %s", text)
  229. return None
  230. event_id = None
  231. try:
  232. self.log.debug("Sending bridge notice: %s", text)
  233. content = TextMessageEventContent(
  234. body=text, msgtype=(MessageType.TEXT if important else MessageType.NOTICE)
  235. )
  236. if edit:
  237. content.set_edit(edit)
  238. # This is locked to prevent notices going out in the wrong order
  239. async with self._notice_send_lock:
  240. event_id = await self.az.intent.send_message(await self.get_notice_room(), content)
  241. except Exception:
  242. self.log.warning("Failed to send bridge notice", exc_info=True)
  243. return edit or event_id
  244. async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
  245. puppet = await pu.Puppet.get_by_pk(self.igpk)
  246. try:
  247. await puppet.update_info(user_info, self)
  248. except Exception:
  249. self.log.exception("Failed to update own puppet info")
  250. try:
  251. if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
  252. self.log.info(f"Automatically enabling custom puppet")
  253. await puppet.switch_mxid(access_token="auto", mxid=self.mxid)
  254. except Exception:
  255. self.log.exception("Failed to automatically enable custom puppet")
    async def _try_sync(self) -> None:
        """Run :meth:`sync`; on failure log it and push an UNKNOWN_ERROR bridge state."""
        try:
            await self.sync()
        except Exception:
            self.log.exception("Exception while syncing")
            await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR)
  262. async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
  263. return {
  264. pu.Puppet.get_mxid_from_id(portal.other_user_pk): [portal.mxid]
  265. for portal in await DBPortal.find_private_chats_of(self.igpk)
  266. if portal.mxid
  267. }
  268. async def refresh(self, resync: bool = True) -> None:
  269. self._is_refreshing = True
  270. try:
  271. await self.stop_listen()
  272. if resync:
  273. retry_count = 0
  274. while True:
  275. try:
  276. await self.sync()
  277. return
  278. except Exception:
  279. if retry_count >= 4:
  280. raise
  281. retry_count += 1
  282. self.log.exception("Error while syncing for refresh, retrying in 1 minute")
  283. await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR)
  284. await asyncio.sleep(60)
  285. else:
  286. await self.start_listen()
  287. finally:
  288. self._is_refreshing = False
  289. async def _sync_thread(self, thread: Thread, min_active_at: int) -> None:
  290. portal = await po.Portal.get_by_thread(thread, self.igpk)
  291. if portal.mxid:
  292. self.log.debug(f"{thread.thread_id} has a portal, syncing and backfilling...")
  293. await portal.update_matrix_room(self, thread, backfill=True)
  294. elif thread.last_activity_at > min_active_at:
  295. self.log.debug(f"{thread.thread_id} has been active recently, creating portal...")
  296. await portal.create_matrix_room(self, thread)
  297. else:
  298. self.log.debug(f"{thread.thread_id} is not active and doesn't have a portal")
  299. async def sync(self) -> None:
  300. resp = await self.client.get_inbox()
  301. max_age = self.config["bridge.portal_create_max_age"] * 1_000_000
  302. limit = self.config["bridge.chat_sync_limit"]
  303. min_active_at = (time.time() * 1_000_000) - max_age
  304. i = 0
  305. await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
  306. async for thread in self.client.iter_inbox(start_at=resp):
  307. try:
  308. await self._sync_thread(thread, min_active_at)
  309. except Exception:
  310. self.log.exception(f"Error syncing thread {thread.thread_id}")
  311. i += 1
  312. if i >= limit:
  313. break
  314. try:
  315. await self.update_direct_chats()
  316. except Exception:
  317. self.log.exception("Error updating direct chat list")
  318. if not self._listen_task:
  319. await self.start_listen(resp.seq_id, resp.snapshot_at_ms)
  320. async def start_listen(
  321. self, seq_id: int | None = None, snapshot_at_ms: int | None = None
  322. ) -> None:
  323. self.shutdown = False
  324. if not seq_id:
  325. resp = await self.client.get_inbox(limit=1)
  326. seq_id, snapshot_at_ms = resp.seq_id, resp.snapshot_at_ms
  327. task = self.listen(seq_id=seq_id, snapshot_at_ms=snapshot_at_ms)
  328. self._listen_task = self.loop.create_task(task)
  329. async def listen(self, seq_id: int, snapshot_at_ms: int) -> None:
  330. try:
  331. await self.mqtt.listen(
  332. graphql_subs={
  333. GraphQLSubscription.app_presence(),
  334. GraphQLSubscription.direct_typing(self.state.user_id),
  335. GraphQLSubscription.direct_status(),
  336. },
  337. skywalker_subs={
  338. SkywalkerSubscription.direct_sub(self.state.user_id),
  339. SkywalkerSubscription.live_sub(self.state.user_id),
  340. },
  341. seq_id=seq_id,
  342. snapshot_at_ms=snapshot_at_ms,
  343. )
  344. except IrisSubscribeError as e:
  345. self.log.warning(f"Got IrisSubscribeError {e}, refreshing...")
  346. await self.refresh()
  347. except (MQTTNotConnected, MQTTNotLoggedIn) as e:
  348. await self.send_bridge_notice(
  349. f"Error in listener: {e}",
  350. important=True,
  351. state_event=BridgeStateEvent.UNKNOWN_ERROR,
  352. error_code="ig-connection-error",
  353. )
  354. self.mqtt.disconnect()
  355. except Exception:
  356. self.log.exception("Fatal error in listener")
  357. await self.send_bridge_notice(
  358. "Fatal error in listener (see logs for more info)",
  359. state_event=BridgeStateEvent.UNKNOWN_ERROR,
  360. important=True,
  361. error_code="ig-connection-error",
  362. )
  363. self.mqtt.disconnect()
  364. else:
  365. if not self.shutdown:
  366. await self.send_bridge_notice(
  367. "Instagram connection closed without error",
  368. state_event=BridgeStateEvent.UNKNOWN_ERROR,
  369. error_code="ig-disconnected",
  370. )
  371. finally:
  372. self._listen_task = None
  373. self._is_connected = False
  374. self._track_metric(METRIC_CONNECTED, False)
  375. async def stop_listen(self) -> None:
  376. if self.mqtt:
  377. self.shutdown = True
  378. self.mqtt.disconnect()
  379. if self._listen_task:
  380. await self._listen_task
  381. self.shutdown = False
  382. self._track_metric(METRIC_CONNECTED, False)
  383. self._is_connected = False
  384. await self.update()
  385. async def logout(self, from_error: bool = False) -> None:
  386. if self.client:
  387. try:
  388. await self.client.logout(one_tap_app_login=False)
  389. except Exception:
  390. self.log.debug("Exception logging out", exc_info=True)
  391. if self.mqtt:
  392. self.mqtt.disconnect()
  393. self._track_metric(METRIC_CONNECTED, False)
  394. self._track_metric(METRIC_LOGGED_IN, False)
  395. if not from_error:
  396. await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
  397. puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
  398. if puppet and puppet.is_real_user:
  399. await puppet.switch_mxid(None, None)
  400. try:
  401. del self.by_igpk[self.igpk]
  402. except KeyError:
  403. pass
  404. self.igpk = None
  405. else:
  406. await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS)
  407. self.client = None
  408. self.mqtt = None
  409. self.state = None
  410. self._is_logged_in = False
  411. await self.update()
  412. # endregion
  413. # region Event handlers
  414. @async_time(METRIC_MESSAGE)
  415. async def handle_message(self, evt: MessageSyncEvent) -> None:
  416. portal = await po.Portal.get_by_thread_id(evt.message.thread_id, receiver=self.igpk)
  417. if not portal or not portal.mxid:
  418. self.log.debug("Got message in thread with no portal, getting info...")
  419. resp = await self.client.get_thread(evt.message.thread_id)
  420. portal = await po.Portal.get_by_thread(resp.thread, self.igpk)
  421. self.log.debug("Got info for unknown portal, creating room")
  422. await portal.create_matrix_room(self, resp.thread)
  423. if not portal.mxid:
  424. self.log.warning(
  425. "Room creation appears to have failed, "
  426. f"dropping message in {evt.message.thread_id}"
  427. )
  428. return
  429. self.log.trace(f"Received message sync event {evt.message}")
  430. sender = await pu.Puppet.get_by_pk(evt.message.user_id) if evt.message.user_id else None
  431. if evt.message.op == Operation.ADD:
  432. if not sender:
  433. # I don't think we care about adds with no sender
  434. return
  435. await portal.handle_instagram_item(self, sender, evt.message)
  436. elif evt.message.op == Operation.REMOVE:
  437. # Removes don't have a sender, only the message sender can unsend messages anyway
  438. await portal.handle_instagram_remove(evt.message.item_id)
  439. elif evt.message.op == Operation.REPLACE:
  440. await portal.handle_instagram_update(evt.message)
    @async_time(METRIC_THREAD_SYNC)
    async def handle_thread_sync(self, evt: ThreadSyncEvent) -> None:
        """Handle a thread sync event by calling ``create_matrix_room`` on its portal."""
        self.log.trace("Received thread sync event %s", evt)
        # The event itself is passed wherever a thread object is expected.
        portal = await po.Portal.get_by_thread(evt, receiver=self.igpk)
        await portal.create_matrix_room(self, evt)
  446. @async_time(METRIC_RTD)
  447. async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
  448. if not isinstance(evt.value, ActivityIndicatorData):
  449. return
  450. now = int(time.time() * 1000)
  451. date = int(evt.value.timestamp) // 1000
  452. expiry = date + evt.value.ttl
  453. if expiry < now:
  454. return
  455. if evt.activity_indicator_id in self._activity_indicator_ids:
  456. return
  457. # TODO clear expired items from this dict
  458. self._activity_indicator_ids[evt.activity_indicator_id] = expiry
  459. puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
  460. portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
  461. if not puppet or not portal or not portal.mxid:
  462. return
  463. is_typing = evt.value.activity_status != TypingStatus.OFF
  464. if puppet.pk == self.igpk:
  465. self.remote_typing_status = TypingStatus.TEXT if is_typing else TypingStatus.OFF
  466. await puppet.intent_for(portal).set_typing(
  467. portal.mxid, is_typing=is_typing, timeout=evt.value.ttl
  468. )
  469. # endregion
  470. # region Database getters
    def _add_to_cache(self) -> None:
        """Register this user in ``by_mxid`` and, if a pk is set, in ``by_igpk``."""
        self.by_mxid[self.mxid] = self
        if self.igpk:
            self.by_igpk[self.igpk] = self
  475. @classmethod
  476. @async_getter_lock
  477. async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
  478. # Never allow ghosts to be users
  479. if pu.Puppet.get_id_from_mxid(mxid):
  480. return None
  481. try:
  482. return cls.by_mxid[mxid]
  483. except KeyError:
  484. pass
  485. user = cast(cls, await super().get_by_mxid(mxid))
  486. if user is not None:
  487. user._add_to_cache()
  488. return user
  489. if create:
  490. user = cls(mxid)
  491. await user.insert()
  492. user._add_to_cache()
  493. return user
  494. return None
  495. @classmethod
  496. @async_getter_lock
  497. async def get_by_igpk(cls, igpk: int) -> User | None:
  498. try:
  499. return cls.by_igpk[igpk]
  500. except KeyError:
  501. pass
  502. user = cast(cls, await super().get_by_igpk(igpk))
  503. if user is not None:
  504. user._add_to_cache()
  505. return user
  506. return None
  507. @classmethod
  508. async def all_logged_in(cls) -> AsyncGenerator[User, None]:
  509. users = await super().all_logged_in()
  510. user: cls
  511. for index, user in enumerate(users):
  512. try:
  513. yield cls.by_mxid[user.mxid]
  514. except KeyError:
  515. user._add_to_cache()
  516. yield user
  517. # endregion