user.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2020 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from typing import (Dict, Optional, AsyncIterable, Awaitable, AsyncGenerator, List, TYPE_CHECKING,
  17. cast)
  18. import asyncio
  19. import logging
  20. import time
  21. from mauigpapi import AndroidAPI, AndroidState, AndroidMQTT
  22. from mauigpapi.mqtt import Connect, Disconnect, GraphQLSubscription, SkywalkerSubscription
  23. from mauigpapi.types import (CurrentUser, MessageSyncEvent, Operation, RealtimeDirectEvent,
  24. ActivityIndicatorData, TypingStatus, ThreadSyncEvent, Thread)
  25. from mauigpapi.errors import (IGNotLoggedInError, MQTTNotLoggedIn, MQTTNotConnected,
  26. IrisSubscribeError, IGUserIDNotFoundError)
  27. from mautrix.bridge import BaseUser, BridgeState, async_getter_lock
  28. from mautrix.types import UserID, RoomID, EventID, TextMessageEventContent, MessageType
  29. from mautrix.appservice import AppService
  30. from mautrix.util.bridge_state import BridgeStateEvent
  31. from mautrix.util.logging import TraceLogger
  32. from mautrix.util.opt_prometheus import Summary, Gauge, async_time
  33. from .db import User as DBUser, Portal as DBPortal
  34. from .config import Config
  35. from . import puppet as pu, portal as po
  36. if TYPE_CHECKING:
  37. from .__main__ import InstagramBridge
  # Metric objects from mautrix.util.opt_prometheus. The Summary objects are used
  # with @async_time on the event handlers below; the Gauge objects are passed to
  # self._track_metric().
  METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
  METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync")
  METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
  METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
  METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")

  # Register the human-readable text for the Instagram-specific bridge state error
  # codes used in this module. "ig-disconnected" is deliberately mapped to None.
  BridgeState.human_readable_errors.update((
      ("ig-connection-error", "Instagram disconnected unexpectedly"),
      ("ig-auth-error", "Authentication error from Instagram: {message}"),
      ("ig-disconnected", None),
      ("ig-no-mqtt", "You're not connected to Instagram"),
      ("logged-out", "You're not logged into Instagram"),
  ))
  class User(DBUser, BaseUser):
      """A Matrix user of the bridge together with their Instagram session.

      Combines the database row (``DBUser``) with the bridge-side behaviour of
      ``BaseUser`` and owns the Instagram HTTP client (``client``) and the MQTT
      listener (``mqtt``) for the account.
      """
      ig_base_log: TraceLogger = logging.getLogger("mau.instagram")
      # Activity indicator ID -> expiry timestamp in milliseconds. This dict is defined
      # on the class, so it is shared by every User instance.
      _activity_indicator_ids: Dict[str, int] = {}
      # In-memory caches of User objects, filled by _add_to_cache().
      by_mxid: Dict[UserID, 'User'] = {}
      by_igpk: Dict[int, 'User'] = {}
      config: Config
      az: AppService
      loop: asyncio.AbstractEventLoop

      client: Optional[AndroidAPI]
      mqtt: Optional[AndroidMQTT]
      _listen_task: Optional[asyncio.Task] = None

      permission_level: str
      username: Optional[str]

      _notice_room_lock: asyncio.Lock
      _notice_send_lock: asyncio.Lock
      _is_logged_in: bool
      _is_connected: bool
      _is_refreshing: bool
      shutdown: bool
      remote_typing_status: Optional[TypingStatus]

      def __init__(self, mxid: UserID, igpk: Optional[int] = None,
                   state: Optional[AndroidState] = None, notice_room: Optional[RoomID] = None
                   ) -> None:
          """Create a user object with no active Instagram client or MQTT connection.

          :param mxid: Matrix user ID of the user.
          :param igpk: Instagram primary key of the linked account, if any.
          :param state: Stored Android client state, if any.
          :param notice_room: Room used for bridge notices, if already created.
          """
          super().__init__(mxid=mxid, igpk=igpk, state=state, notice_room=notice_room)
          BaseUser.__init__(self)
          self._notice_room_lock = asyncio.Lock()
          self._notice_send_lock = asyncio.Lock()
          perms = self.config.get_permissions(mxid)
          self.is_whitelisted, self.is_admin, self.permission_level = perms
          self.client = None
          self.mqtt = None
          self.username = None
          self._is_logged_in = False
          self._is_connected = False
          self._is_refreshing = False
          self.shutdown = False
          self._listen_task = None
          self.remote_typing_status = None

      @classmethod
      def init_cls(cls, bridge: 'InstagramBridge') -> AsyncIterable[Awaitable[None]]:
          """Store bridge-wide references on the class.

          :returns: An async iterable yielding one ``try_connect()`` awaitable per user
                    returned by ``all_logged_in()``.
          """
          cls.bridge = bridge
          cls.config = bridge.config
          cls.az = bridge.az
          cls.loop = bridge.loop
          return (user.try_connect() async for user in cls.all_logged_in())

      # region Connection management

      async def is_logged_in(self) -> bool:
          """Return whether an API client exists and the login flag is set."""
          return bool(self.client) and self._is_logged_in

      async def try_connect(self) -> None:
          """Call ``connect()``, logging any exception instead of raising it."""
          try:
              await self.connect()
          except Exception:
              self.log.exception("Error while connecting to Instagram")

      @property
      def api_log(self) -> TraceLogger:
          """Per-user child logger passed to the HTTP API client."""
          return self.ig_base_log.getChild("http").getChild(self.mxid)

      @property
      def is_connected(self) -> bool:
          """Whether both clients exist and the MQTT connection flag is set."""
          return bool(self.client) and bool(self.mqtt) and self._is_connected

      async def connect(self) -> None:
          """Check the stored session against Instagram and prepare the MQTT client.

          If Instagram reports the session as not logged in, the user is notified and
          logged out. Otherwise the API and MQTT clients are stored, event handlers are
          registered, and the puppet sync and inbox sync are scheduled as background
          tasks.
          """
          client = AndroidAPI(self.state, log=self.api_log)

          try:
              resp = await client.current_user()
          except IGNotLoggedInError as e:
              self.log.warning(f"Failed to connect to Instagram: {e}, logging out")
              await self.send_bridge_notice(f"You have been logged out of Instagram: {e!s}",
                                            important=True, error_code="ig-auth-error",
                                            error_message=str(e))
              await self.logout(from_error=True)
              return
          self.client = client
          self._is_logged_in = True
          self.igpk = resp.user.pk
          self.username = resp.user.username
          self._track_metric(METRIC_LOGGED_IN, True)
          self.by_igpk[self.igpk] = self

          self.mqtt = AndroidMQTT(self.state, loop=self.loop,
                                  log=self.ig_base_log.getChild("mqtt").getChild(self.mxid))
          self.mqtt.add_event_handler(Connect, self.on_connect)
          self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
          self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
          self.mqtt.add_event_handler(ThreadSyncEvent, self.handle_thread_sync)
          self.mqtt.add_event_handler(RealtimeDirectEvent, self.handle_rtd)

          await self.update()

          self.loop.create_task(self._try_sync_puppet(resp.user))
          self.loop.create_task(self._try_sync())

      async def on_connect(self, evt: Connect) -> None:
          """MQTT ``Connect`` handler: mark the user connected and report it."""
          self.log.debug("Connected to Instagram")
          self._track_metric(METRIC_CONNECTED, True)
          self._is_connected = True
          await self.send_bridge_notice("Connected to Instagram")
          await self.push_bridge_state(BridgeStateEvent.CONNECTED)

      async def on_disconnect(self, evt: Disconnect) -> None:
          """MQTT ``Disconnect`` handler: mark the user disconnected."""
          self.log.debug("Disconnected from Instagram")
          self._track_metric(METRIC_CONNECTED, False)
          self._is_connected = False

      # TODO this stuff could probably be moved to mautrix-python
      async def get_notice_room(self) -> RoomID:
          """Return the bridge notice room, creating and saving it if needed."""
          if not self.notice_room:
              async with self._notice_room_lock:
                  # If someone already created the room while this call was waiting,
                  # don't make a new room
                  if self.notice_room:
                      return self.notice_room
                  self.notice_room = await self.az.intent.create_room(
                      is_direct=True, invitees=[self.mxid],
                      topic="Instagram bridge notices")
                  await self.update()
          return self.notice_room

      async def fill_bridge_state(self, state: BridgeState) -> None:
          """Fill in ``remote_id`` and ``remote_name`` on a bridge state object.

          ``remote_id`` is only set when the parent implementation left it empty. It
          comes from ``igpk`` when available, otherwise from the stored client state,
          or ``None`` if that raises ``IGUserIDNotFoundError``.
          """
          await super().fill_bridge_state(state)
          if not state.remote_id:
              if self.igpk:
                  state.remote_id = str(self.igpk)
              else:
                  try:
                      state.remote_id = self.state.user_id
                  except IGUserIDNotFoundError:
                      state.remote_id = None
          if self.username:
              state.remote_name = f"@{self.username}"

      async def get_bridge_states(self) -> List[BridgeState]:
          """Return the current bridge state, or an empty list without stored state.

          The state is ``CONNECTED`` when connected, ``TRANSIENT_DISCONNECT`` while a
          refresh is running or an MQTT client exists, and ``UNKNOWN_ERROR`` otherwise.
          """
          if not self.state:
              return []
          state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
          if self.is_connected:
              state.state_event = BridgeStateEvent.CONNECTED
          elif self._is_refreshing or self.mqtt:
              state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
          return [state]

      async def send_bridge_notice(self, text: str, edit: Optional[EventID] = None,
                                   state_event: Optional[BridgeStateEvent] = None,
                                   important: bool = False, error_code: Optional[str] = None,
                                   error_message: Optional[str] = None) -> Optional[EventID]:
          """Send (or edit) a message in the user's notice room.

          :param text: Body of the notice.
          :param edit: Event ID of an earlier notice to edit instead of sending anew.
          :param state_event: If set, this bridge state is pushed before anything else.
          :param important: Important notices are sent as text messages rather than
                            notices, and are not subject to the unimportant-notice
                            config option.
          :param error_code: Error code passed along with the pushed bridge state.
          :param error_message: Message for the pushed bridge state; only used when
                                ``error_code`` is set, otherwise ``text`` is used.
          :returns: ``edit`` if given, else the new event ID, or ``None`` when the
                    notice was suppressed by config or sending failed.
          """
          if state_event:
              await self.push_bridge_state(state_event, error=error_code,
                                           message=error_message if error_code else text)
          if self.config["bridge.disable_bridge_notices"]:
              return None
          if not important and not self.config["bridge.unimportant_bridge_notices"]:
              self.log.debug("Not sending unimportant bridge notice: %s", text)
              return None
          event_id = None
          try:
              self.log.debug("Sending bridge notice: %s", text)
              content = TextMessageEventContent(body=text, msgtype=(MessageType.TEXT if important
                                                                    else MessageType.NOTICE))
              if edit:
                  content.set_edit(edit)
              # This is locked to prevent notices going out in the wrong order
              async with self._notice_send_lock:
                  event_id = await self.az.intent.send_message(await self.get_notice_room(),
                                                               content)
          except Exception:
              # Notices are best-effort: a failure is logged but never raised.
              self.log.warning("Failed to send bridge notice", exc_info=True)
          return edit or event_id

      async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
          """Update the user's own puppet and try to enable automatic custom puppeting.

          The two steps are attempted independently; failures are only logged.
          """
          puppet = await pu.Puppet.get_by_pk(self.igpk)
          try:
              await puppet.update_info(user_info, self)
          except Exception:
              self.log.exception("Failed to update own puppet info")
          try:
              if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
                  self.log.info("Automatically enabling custom puppet")
                  await puppet.switch_mxid(access_token="auto", mxid=self.mxid)
          except Exception:
              self.log.exception("Failed to automatically enable custom puppet")

      async def _try_sync(self) -> None:
          """Run ``sync()``; on failure log it and push ``UNKNOWN_ERROR``."""
          try:
              await self.sync()
          except Exception:
              self.log.exception("Exception while syncing")
              await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR)

      async def get_direct_chats(self) -> Dict[UserID, List[RoomID]]:
          """Map each private chat partner's puppet MXID to a one-room list.

          Only portals that already have a Matrix room are included.
          """
          return {
              pu.Puppet.get_mxid_from_id(portal.other_user_pk): [portal.mxid]
              for portal in await DBPortal.find_private_chats_of(self.igpk)
              if portal.mxid
          }

      async def refresh(self, resync: bool = True) -> None:
          """Stop the listener and bring the connection back up.

          :param resync: When true, run a full ``sync()`` (which restarts the listener
                         if none is running). The sync is attempted up to five times
                         with a one minute pause between attempts, and the last
                         failure is re-raised. When false, only the listener is
                         restarted.
          """
          self._is_refreshing = True
          try:
              await self.stop_listen()
              if resync:
                  retry_count = 0
                  while True:
                      try:
                          await self.sync()
                          return
                      except Exception:
                          if retry_count >= 4:
                              raise
                          retry_count += 1
                          self.log.exception("Error while syncing for refresh, "
                                             "retrying in 1 minute")
                          await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR)
                          await asyncio.sleep(60)
              else:
                  await self.start_listen()
          finally:
              self._is_refreshing = False

      async def _sync_thread(self, thread: Thread, min_active_at: int) -> None:
          """Sync one inbox thread.

          Existing rooms are updated and backfilled. A thread without a room gets one
          only if its last activity is newer than ``min_active_at``.
          """
          portal = await po.Portal.get_by_thread(thread, self.igpk)
          if portal.mxid:
              self.log.debug(f"{thread.thread_id} has a portal, syncing and backfilling...")
              await portal.update_matrix_room(self, thread, backfill=True)
          elif thread.last_activity_at > min_active_at:
              self.log.debug(f"{thread.thread_id} has been active recently, creating portal...")
              await portal.create_matrix_room(self, thread)
          else:
              self.log.debug(f"{thread.thread_id} is not active and doesn't have a portal")

      async def sync(self) -> None:
          """Sync inbox threads, update the direct chat list and ensure we're listening.

          At most ``bridge.chat_sync_limit`` threads are processed. Errors in a single
          thread or in the direct chat update are logged and do not abort the sync.
          """
          resp = await self.client.get_inbox()
          # The config value is scaled by 1_000_000 to match the time.time() * 1_000_000
          # scale that last_activity_at is compared against.
          max_age = self.config["bridge.portal_create_max_age"] * 1_000_000
          limit = self.config["bridge.chat_sync_limit"]
          min_active_at = (time.time() * 1_000_000) - max_age
          i = 0
          await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
          async for thread in self.client.iter_inbox(start_at=resp):
              try:
                  await self._sync_thread(thread, min_active_at)
              except Exception:
                  self.log.exception(f"Error syncing thread {thread.thread_id}")
              i += 1
              if i >= limit:
                  break
          try:
              await self.update_direct_chats()
          except Exception:
              self.log.exception("Error updating direct chat list")

          if not self._listen_task:
              await self.start_listen(resp.seq_id, resp.snapshot_at_ms)

      async def start_listen(self, seq_id: Optional[int] = None,
                             snapshot_at_ms: Optional[int] = None) -> None:
          """Start ``listen()`` as a background task.

          If ``seq_id`` is not given, both values are fetched from the inbox first.
          """
          self.shutdown = False
          if not seq_id:
              resp = await self.client.get_inbox(limit=1)
              seq_id, snapshot_at_ms = resp.seq_id, resp.snapshot_at_ms
          task = self.listen(seq_id=seq_id, snapshot_at_ms=snapshot_at_ms)
          self._listen_task = self.loop.create_task(task)

      async def listen(self, seq_id: int, snapshot_at_ms: int) -> None:
          """Run the MQTT listener until it stops, reporting how it ended.

          Whatever the outcome, the listen task reference and connected state are
          cleared when this coroutine finishes.
          """
          try:
              await self.mqtt.listen(
                  graphql_subs={GraphQLSubscription.app_presence(),
                                GraphQLSubscription.direct_typing(self.state.user_id),
                                GraphQLSubscription.direct_status()},
                  skywalker_subs={SkywalkerSubscription.direct_sub(self.state.user_id),
                                  SkywalkerSubscription.live_sub(self.state.user_id)},
                  seq_id=seq_id, snapshot_at_ms=snapshot_at_ms)
          except IrisSubscribeError as e:
              self.log.warning(f"Got IrisSubscribeError {e}, refreshing...")
              # refresh() calls stop_listen(), which awaits self._listen_task. That is
              # the task running this coroutine, and a task awaiting itself raises
              # RuntimeError, so the refresh has to run in a task of its own. It starts
              # after the finally block below has cleared _listen_task.
              self.loop.create_task(self.refresh())
          except (MQTTNotConnected, MQTTNotLoggedIn) as e:
              await self.send_bridge_notice(f"Error in listener: {e}", important=True,
                                            state_event=BridgeStateEvent.UNKNOWN_ERROR,
                                            error_code="ig-connection-error")
              self.mqtt.disconnect()
          except Exception:
              self.log.exception("Fatal error in listener")
              await self.send_bridge_notice("Fatal error in listener (see logs for more info)",
                                            state_event=BridgeStateEvent.UNKNOWN_ERROR,
                                            important=True, error_code="ig-connection-error")
              self.mqtt.disconnect()
          else:
              # A clean return is only expected when stop_listen() requested it.
              if not self.shutdown:
                  await self.send_bridge_notice("Instagram connection closed without error",
                                                state_event=BridgeStateEvent.UNKNOWN_ERROR,
                                                error_code="ig-disconnected")
          finally:
              self._listen_task = None
              self._is_connected = False
              self._track_metric(METRIC_CONNECTED, False)

      async def stop_listen(self) -> None:
          """Disconnect MQTT, wait for the listen task to end and save the user.

          ``shutdown`` is set while waiting so that ``listen()`` does not report the
          intentional disconnect as an error.
          """
          if self.mqtt:
              self.shutdown = True
              self.mqtt.disconnect()
              if self._listen_task:
                  await self._listen_task
              self.shutdown = False
          self._track_metric(METRIC_CONNECTED, False)
          self._is_connected = False
          await self.update()

      async def logout(self, from_error: bool = False) -> None:
          """Log out of Instagram and clear the stored session.

          :param from_error: True when the logout is a reaction to an authentication
                             failure. ``BAD_CREDENTIALS`` is then pushed and ``igpk`` is
                             kept. Otherwise ``LOGGED_OUT`` is pushed, a real-user
                             puppet is switched back, and ``igpk`` is cleared.
          """
          if self.client:
              try:
                  await self.client.logout(one_tap_app_login=False)
              except Exception:
                  # Failing to tell Instagram about the logout doesn't stop local cleanup.
                  self.log.debug("Exception logging out", exc_info=True)
          if self.mqtt:
              self.mqtt.disconnect()
          self._track_metric(METRIC_CONNECTED, False)
          self._track_metric(METRIC_LOGGED_IN, False)
          if not from_error:
              await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
              puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
              if puppet and puppet.is_real_user:
                  await puppet.switch_mxid(None, None)
              try:
                  del self.by_igpk[self.igpk]
              except KeyError:
                  pass
              self.igpk = None
          else:
              await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS)
          self.client = None
          self.mqtt = None
          self.state = None
          self._is_logged_in = False
          await self.update()

      # endregion
      # region Event handlers

      @async_time(METRIC_MESSAGE)
      async def handle_message(self, evt: MessageSyncEvent) -> None:
          """Dispatch a message sync event to the portal of its thread.

          If the thread has no portal room yet, the thread info is fetched and a room
          is created first; the event is dropped if that does not yield a room.
          """
          portal = await po.Portal.get_by_thread_id(evt.message.thread_id, receiver=self.igpk)
          if not portal or not portal.mxid:
              self.log.debug("Got message in thread with no portal, getting info...")
              resp = await self.client.get_thread(evt.message.thread_id)
              portal = await po.Portal.get_by_thread(resp.thread, self.igpk)
              self.log.debug("Got info for unknown portal, creating room")
              await portal.create_matrix_room(self, resp.thread)
              if not portal.mxid:
                  self.log.warning("Room creation appears to have failed, "
                                   f"dropping message in {evt.message.thread_id}")
                  return
          self.log.trace(f"Received message sync event {evt.message}")
          sender = await pu.Puppet.get_by_pk(evt.message.user_id) if evt.message.user_id else None
          if evt.message.op == Operation.ADD:
              if not sender:
                  # I don't think we care about adds with no sender
                  return
              await portal.handle_instagram_item(self, sender, evt.message)
          elif evt.message.op == Operation.REMOVE:
              # Removes don't have a sender, only the message sender can unsend messages anyway
              await portal.handle_instagram_remove(evt.message.item_id)
          elif evt.message.op == Operation.REPLACE:
              await portal.handle_instagram_update(evt.message)

      @async_time(METRIC_THREAD_SYNC)
      async def handle_thread_sync(self, evt: ThreadSyncEvent) -> None:
          """Look up the portal for a thread sync event and create its Matrix room."""
          self.log.trace("Received thread sync event %s", evt)
          portal = await po.Portal.get_by_thread(evt, receiver=self.igpk)
          await portal.create_matrix_room(self, evt)

      @async_time(METRIC_RTD)
      async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
          """Bridge a typing notification from a realtime direct event.

          Events that are not activity indicators, have already expired, or carry an
          indicator ID that was already handled are ignored.
          """
          if not isinstance(evt.value, ActivityIndicatorData):
              return

          now = int(time.time() * 1000)
          # Forget indicator IDs whose expiry has passed so the shared dict doesn't grow
          # forever. A late duplicate of a forgotten ID is still dropped by the expiry
          # check below.
          expired_ids = [indicator_id
                         for indicator_id, expires_at in self._activity_indicator_ids.items()
                         if expires_at < now]
          for indicator_id in expired_ids:
              self._activity_indicator_ids.pop(indicator_id, None)

          # timestamp // 1000 is compared against a millisecond clock, with ttl added
          # directly; presumably timestamp is in microseconds and ttl in milliseconds.
          date = int(evt.value.timestamp) // 1000
          expiry = date + evt.value.ttl
          if expiry < now:
              return

          if evt.activity_indicator_id in self._activity_indicator_ids:
              return
          self._activity_indicator_ids[evt.activity_indicator_id] = expiry

          puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
          portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
          if not puppet or not portal or not portal.mxid:
              return

          is_typing = evt.value.activity_status != TypingStatus.OFF
          if puppet.pk == self.igpk:
              # The indicator is about this user's own account; remember its state.
              self.remote_typing_status = TypingStatus.TEXT if is_typing else TypingStatus.OFF
          await puppet.intent_for(portal).set_typing(portal.mxid, is_typing=is_typing,
                                                     timeout=evt.value.ttl)

      # endregion
      # region Database getters

      def _add_to_cache(self) -> None:
          """Register this user in ``by_mxid`` and, if ``igpk`` is set, ``by_igpk``."""
          self.by_mxid[self.mxid] = self
          if self.igpk:
              self.by_igpk[self.igpk] = self

      @classmethod
      @async_getter_lock
      async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> Optional['User']:
          """Get a user by Matrix ID from the cache or database.

          :param create: Insert a new user when none exists.
          :returns: The user, or ``None`` if the MXID belongs to a puppet or the user
                    doesn't exist and ``create`` is false.
          """
          # Never allow ghosts to be users
          if pu.Puppet.get_id_from_mxid(mxid):
              return None
          try:
              return cls.by_mxid[mxid]
          except KeyError:
              pass

          user = cast(cls, await super().get_by_mxid(mxid))
          if user is not None:
              user._add_to_cache()
              return user

          if create:
              user = cls(mxid)
              await user.insert()
              user._add_to_cache()
              return user

          return None

      @classmethod
      @async_getter_lock
      async def get_by_igpk(cls, igpk: int) -> Optional['User']:
          """Get a user by Instagram primary key from the cache or database."""
          try:
              return cls.by_igpk[igpk]
          except KeyError:
              pass

          user = cast(cls, await super().get_by_igpk(igpk))
          if user is not None:
              user._add_to_cache()
              return user

          return None

      @classmethod
      async def all_logged_in(cls) -> AsyncGenerator['User', None]:
          """Yield every user returned by the database's ``all_logged_in()`` query.

          An already cached instance is preferred over the freshly loaded one; users
          not yet cached are added to the cache before being yielded.
          """
          users = await super().all_logged_in()
          user: cls
          for user in users:
              # Do the lookup before yielding so that a KeyError thrown into the
              # generator by the consumer isn't mistaken for a cache miss.
              try:
                  cached = cls.by_mxid[user.mxid]
              except KeyError:
                  user._add_to_cache()
                  cached = user
              yield cached

      # endregion