user.py 34 KB


  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2022 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, cast
  18. import asyncio
  19. import logging
  20. import time
  21. from aiohttp import ClientConnectionError
  22. from mauigpapi import AndroidAPI, AndroidMQTT, AndroidState, ProxyHandler
  23. from mauigpapi.errors import (
  24. IGChallengeError,
  25. IGCheckpointError,
  26. IGConsentRequiredError,
  27. IGNotLoggedInError,
  28. IGRateLimitError,
  29. IGUserIDNotFoundError,
  30. IrisSubscribeError,
  31. MQTTConnectionUnauthorized,
  32. MQTTNotConnected,
  33. MQTTNotLoggedIn,
  34. )
  35. from mauigpapi.mqtt import (
  36. Connect,
  37. Disconnect,
  38. GraphQLSubscription,
  39. NewSequenceID,
  40. ProxyUpdate,
  41. SkywalkerSubscription,
  42. )
  43. from mauigpapi.types import (
  44. ActivityIndicatorData,
  45. CurrentUser,
  46. MessageSyncEvent,
  47. Operation,
  48. RealtimeDirectEvent,
  49. Thread,
  50. ThreadSyncEvent,
  51. TypingStatus,
  52. )
  53. from mautrix.appservice import AppService
  54. from mautrix.bridge import BaseUser, async_getter_lock
  55. from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID
  56. from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
  57. from mautrix.util.logging import TraceLogger
  58. from mautrix.util.opt_prometheus import Gauge, Summary, async_time
  59. from . import portal as po, puppet as pu
  60. from .config import Config
  61. from .db import Portal as DBPortal, User as DBUser
  62. if TYPE_CHECKING:
  63. from .__main__ import InstagramBridge
  64. try:
  65. from aiohttp_socks import ProxyConnectionError, ProxyError, ProxyTimeoutError
  66. except ImportError:
  67. class ProxyError(Exception):
  68. pass
  69. ProxyConnectionError = ProxyTimeoutError = ProxyError
  70. METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
  71. METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync")
  72. METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
  73. METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
  74. METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")
  75. BridgeState.human_readable_errors.update(
  76. {
  77. "ig-connection-error": "Instagram disconnected unexpectedly",
  78. "ig-refresh-connection-error": "Reconnecting failed again after refresh: {message}",
  79. "ig-connection-fatal-error": "Instagram disconnected unexpectedly",
  80. "ig-auth-error": "Authentication error from Instagram: {message}",
  81. "ig-checkpoint": "Instagram checkpoint error. Please check the Instagram website.",
  82. "ig-consent-required": "Instagram requires a consent update. Please check the Instagram website.",
  83. "ig-checkpoint-locked": "Instagram checkpoint error. Please check the Instagram website.",
  84. "ig-rate-limit": "Got Instagram ratelimit error, waiting a few minutes before retrying...",
  85. "ig-disconnected": None,
  86. "ig-no-mqtt": "You're not connected to Instagram",
  87. "logged-out": "You're not logged into Instagram",
  88. }
  89. )
  90. class User(DBUser, BaseUser):
  91. ig_base_log: TraceLogger = logging.getLogger("mau.instagram")
  92. _activity_indicator_ids: dict[str, int] = {}
  93. by_mxid: dict[UserID, User] = {}
  94. by_igpk: dict[int, User] = {}
  95. config: Config
  96. az: AppService
  97. loop: asyncio.AbstractEventLoop
  98. client: AndroidAPI | None
  99. mqtt: AndroidMQTT | None
  100. _listen_task: asyncio.Task | None = None
  101. _seq_id_save_task: asyncio.Task | None
  102. permission_level: str
  103. username: str | None
  104. _notice_room_lock: asyncio.Lock
  105. _notice_send_lock: asyncio.Lock
  106. _is_logged_in: bool
  107. _is_connected: bool
  108. shutdown: bool
  109. remote_typing_status: TypingStatus | None
  110. def __init__(
  111. self,
  112. mxid: UserID,
  113. igpk: int | None = None,
  114. state: AndroidState | None = None,
  115. notice_room: RoomID | None = None,
  116. seq_id: int | None = None,
  117. snapshot_at_ms: int | None = None,
  118. ) -> None:
  119. super().__init__(
  120. mxid=mxid,
  121. igpk=igpk,
  122. state=state,
  123. notice_room=notice_room,
  124. seq_id=seq_id,
  125. snapshot_at_ms=snapshot_at_ms,
  126. )
  127. BaseUser.__init__(self)
  128. self._notice_room_lock = asyncio.Lock()
  129. self._notice_send_lock = asyncio.Lock()
  130. perms = self.config.get_permissions(mxid)
  131. self.relay_whitelisted, self.is_whitelisted, self.is_admin, self.permission_level = perms
  132. self.client = None
  133. self.mqtt = None
  134. self.username = None
  135. self._is_logged_in = False
  136. self._is_connected = False
  137. self._is_refreshing = False
  138. self.shutdown = False
  139. self._listen_task = None
  140. self.remote_typing_status = None
  141. self._seq_id_save_task = None
  142. self.proxy_handler = ProxyHandler(
  143. api_url=self.config["bridge.get_proxy_api_url"],
  144. )
  145. @classmethod
  146. def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]:
  147. cls.bridge = bridge
  148. cls.config = bridge.config
  149. cls.az = bridge.az
  150. cls.loop = bridge.loop
  151. return (user.try_connect() async for user in cls.all_logged_in())
  152. # region Connection management
  153. async def is_logged_in(self) -> bool:
  154. return bool(self.client) and self._is_logged_in
  155. async def get_puppet(self) -> pu.Puppet | None:
  156. if not self.igpk:
  157. return None
  158. return await pu.Puppet.get_by_pk(self.igpk)
  159. async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None:
  160. if not self.igpk:
  161. return None
  162. portal = await po.Portal.find_private_chat(self.igpk, puppet.pk)
  163. if portal:
  164. return portal
  165. if create:
  166. # TODO add error handling somewhere
  167. thread = await self.client.create_group_thread([puppet.pk])
  168. portal = await po.Portal.get_by_thread(thread, self.igpk)
  169. await portal.update_info(thread, self)
  170. return portal
  171. return None
  172. async def try_connect(self) -> None:
  173. try:
  174. await self.connect()
  175. except Exception as e:
  176. self.log.exception("Error while connecting to Instagram")
  177. await self.push_bridge_state(
  178. BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
  179. )
  180. @property
  181. def api_log(self) -> TraceLogger:
  182. return self.ig_base_log.getChild("http").getChild(self.mxid)
  183. @property
  184. def is_connected(self) -> bool:
  185. return bool(self.client) and bool(self.mqtt) and self._is_connected
  186. async def connect(self, user: CurrentUser | None = None) -> None:
  187. if not self.state:
  188. await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error="logged-out")
  189. return
  190. client = AndroidAPI(
  191. self.state,
  192. log=self.api_log,
  193. proxy_handler=self.proxy_handler,
  194. )
  195. if not user:
  196. try:
  197. resp = await client.current_user()
  198. user = resp.user
  199. except IGNotLoggedInError as e:
  200. self.log.warning(f"Failed to connect to Instagram: {e}, logging out")
  201. await self.logout(error=e)
  202. return
  203. except IGCheckpointError as e:
  204. self.log.debug("Checkpoint error content: %s", e.body)
  205. raise
  206. except (IGChallengeError, IGConsentRequiredError) as e:
  207. await self._handle_checkpoint(e, on="connect", client=client)
  208. return
  209. self.client = client
  210. self._is_logged_in = True
  211. self.igpk = user.pk
  212. self.username = user.username
  213. await self.push_bridge_state(BridgeStateEvent.CONNECTING)
  214. self._track_metric(METRIC_LOGGED_IN, True)
  215. self.by_igpk[self.igpk] = self
  216. self.mqtt = AndroidMQTT(
  217. self.state,
  218. loop=self.loop,
  219. log=self.ig_base_log.getChild("mqtt").getChild(self.mxid),
  220. proxy_handler=self.proxy_handler,
  221. )
  222. self.mqtt.add_event_handler(Connect, self.on_connect)
  223. self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
  224. self.mqtt.add_event_handler(NewSequenceID, self.update_seq_id)
  225. self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
  226. self.mqtt.add_event_handler(ThreadSyncEvent, self.handle_thread_sync)
  227. self.mqtt.add_event_handler(RealtimeDirectEvent, self.handle_rtd)
  228. self.mqtt.add_event_handler(ProxyUpdate, self.on_proxy_update)
  229. await self.update()
  230. self.loop.create_task(self._try_sync_puppet(user))
  231. if not self.seq_id or self.config["bridge.max_startup_thread_sync_count"]:
  232. self.loop.create_task(self._try_sync())
  233. else:
  234. self.log.debug("Connecting to MQTT directly as resync_on_startup is false")
  235. self.start_listen()
  236. async def on_connect(self, evt: Connect) -> None:
  237. self.log.debug("Connected to Instagram")
  238. self._track_metric(METRIC_CONNECTED, True)
  239. self._is_connected = True
  240. await self.send_bridge_notice("Connected to Instagram")
  241. await self.push_bridge_state(BridgeStateEvent.CONNECTED)
  242. async def on_disconnect(self, evt: Disconnect) -> None:
  243. self.log.debug("Disconnected from Instagram")
  244. self._track_metric(METRIC_CONNECTED, False)
  245. self._is_connected = False
  246. async def on_proxy_update(self, evt: ProxyUpdate | None = None) -> None:
  247. if self.client:
  248. self.client.setup_http(self.state.cookies.jar)
  249. # TODO this stuff could probably be moved to mautrix-python
  250. async def get_notice_room(self) -> RoomID:
  251. if not self.notice_room:
  252. async with self._notice_room_lock:
  253. # If someone already created the room while this call was waiting,
  254. # don't make a new room
  255. if self.notice_room:
  256. return self.notice_room
  257. creation_content = {}
  258. if not self.config["bridge.federate_rooms"]:
  259. creation_content["m.federate"] = False
  260. self.notice_room = await self.az.intent.create_room(
  261. is_direct=True,
  262. invitees=[self.mxid],
  263. topic="Instagram bridge notices",
  264. creation_content=creation_content,
  265. )
  266. await self.update()
  267. return self.notice_room
  268. async def fill_bridge_state(self, state: BridgeState) -> None:
  269. await super().fill_bridge_state(state)
  270. if not state.remote_id:
  271. if self.igpk:
  272. state.remote_id = str(self.igpk)
  273. else:
  274. try:
  275. state.remote_id = self.state.user_id
  276. except IGUserIDNotFoundError:
  277. state.remote_id = None
  278. if self.username:
  279. state.remote_name = f"@{self.username}"
  280. async def get_bridge_states(self) -> list[BridgeState]:
  281. if not self.state:
  282. return []
  283. state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
  284. if self.is_connected:
  285. state.state_event = BridgeStateEvent.CONNECTED
  286. elif self._is_refreshing or self.mqtt:
  287. state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
  288. return [state]
  289. async def send_bridge_notice(
  290. self,
  291. text: str,
  292. edit: EventID | None = None,
  293. state_event: BridgeStateEvent | None = None,
  294. important: bool = False,
  295. error_code: str | None = None,
  296. error_message: str | None = None,
  297. info: dict | None = None,
  298. ) -> EventID | None:
  299. if state_event:
  300. await self.push_bridge_state(
  301. state_event,
  302. error=error_code,
  303. message=error_message if error_code else text,
  304. info=info,
  305. )
  306. if self.config["bridge.disable_bridge_notices"]:
  307. return None
  308. if not important and not self.config["bridge.unimportant_bridge_notices"]:
  309. self.log.debug("Not sending unimportant bridge notice: %s", text)
  310. return None
  311. event_id = None
  312. try:
  313. self.log.debug("Sending bridge notice: %s", text)
  314. content = TextMessageEventContent(
  315. body=text, msgtype=(MessageType.TEXT if important else MessageType.NOTICE)
  316. )
  317. if edit:
  318. content.set_edit(edit)
  319. # This is locked to prevent notices going out in the wrong order
  320. async with self._notice_send_lock:
  321. event_id = await self.az.intent.send_message(await self.get_notice_room(), content)
  322. except Exception:
  323. self.log.warning("Failed to send bridge notice", exc_info=True)
  324. return edit or event_id
  325. async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
  326. puppet = await pu.Puppet.get_by_pk(self.igpk)
  327. try:
  328. await puppet.update_info(user_info, self)
  329. except Exception:
  330. self.log.exception("Failed to update own puppet info")
  331. try:
  332. if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
  333. self.log.info(f"Automatically enabling custom puppet")
  334. await puppet.switch_mxid(access_token="auto", mxid=self.mxid)
  335. except Exception:
  336. self.log.exception("Failed to automatically enable custom puppet")
  337. async def _try_sync(self) -> None:
  338. try:
  339. await self.sync()
  340. except Exception as e:
  341. self.log.exception("Exception while syncing")
  342. if isinstance(e, IGCheckpointError):
  343. self.log.debug("Checkpoint error content: %s", e.body)
  344. await self.push_bridge_state(
  345. BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
  346. )
  347. async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
  348. return {
  349. pu.Puppet.get_mxid_from_id(portal.other_user_pk): [portal.mxid]
  350. for portal in await DBPortal.find_private_chats_of(self.igpk)
  351. if portal.mxid
  352. }
  353. async def refresh(self, resync: bool = True) -> None:
  354. self._is_refreshing = True
  355. try:
  356. await self.stop_listen()
  357. if resync:
  358. retry_count = 0
  359. minutes = 1
  360. while True:
  361. try:
  362. await self.sync()
  363. return
  364. except Exception as e:
  365. if retry_count >= 4 and minutes < 10:
  366. minutes += 1
  367. retry_count += 1
  368. s = "s" if minutes != 1 else ""
  369. self.log.exception(
  370. f"Error while syncing for refresh, retrying in {minutes} minute{s}"
  371. )
  372. if isinstance(e, IGCheckpointError):
  373. self.log.debug("Checkpoint error content: %s", e.body)
  374. await self.push_bridge_state(
  375. BridgeStateEvent.UNKNOWN_ERROR,
  376. error="unknown-error",
  377. message="An unknown error occurred while connecting to Instagram",
  378. info={"python_error": str(e)},
  379. )
  380. await asyncio.sleep(minutes * 60)
  381. else:
  382. self.start_listen()
  383. finally:
  384. self._is_refreshing = False
  385. self.proxy_handler.update_proxy_url()
  386. async def _handle_checkpoint(
  387. self,
  388. e: IGChallengeError | IGConsentRequiredError,
  389. on: str,
  390. client: AndroidAPI | None = None,
  391. ) -> None:
  392. self.log.warning(f"Got checkpoint error on {on}: {e.body.serialize()}")
  393. client = client or self.client
  394. self.client = None
  395. self.mqtt = None
  396. if isinstance(e, IGConsentRequiredError):
  397. await self.push_bridge_state(
  398. BridgeStateEvent.BAD_CREDENTIALS,
  399. error="ig-consent-required",
  400. info=e.body.serialize(),
  401. )
  402. return
  403. error_code = "ig-checkpoint"
  404. try:
  405. resp = await client.challenge_reset()
  406. info = {
  407. "challenge_context": (
  408. resp.challenge_context.serialize() if resp.challenge_context_str else None
  409. ),
  410. "step_name": resp.step_name,
  411. "step_data": resp.step_data.serialize() if resp.step_data else None,
  412. "user_id": resp.user_id,
  413. "action": resp.action,
  414. "status": resp.status,
  415. "challenge": e.body.challenge.serialize() if e.body.challenge else None,
  416. }
  417. self.log.debug(f"Challenge state: {resp.serialize()}")
  418. if resp.challenge_context.challenge_type_enum == "HACKED_LOCK":
  419. error_code = "ig-checkpoint-locked"
  420. except Exception:
  421. self.log.exception("Error resetting challenge state")
  422. info = {"challenge": e.body.challenge.serialize() if e.body.challenge else None}
  423. await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error=error_code, info=info)
  424. async def _sync_thread(self, thread: Thread, allow_create: bool) -> None:
  425. portal = await po.Portal.get_by_thread(thread, self.igpk)
  426. if portal.mxid:
  427. self.log.debug(f"{thread.thread_id} has a portal, syncing and backfilling...")
  428. await portal.update_matrix_room(self, thread)
  429. elif allow_create:
  430. self.log.debug(f"{thread.thread_id} has been active recently, creating portal...")
  431. await portal.create_matrix_room(self, thread)
  432. else:
  433. self.log.debug(f"{thread.thread_id} is not active and doesn't have a portal")
  434. async def _maybe_update_proxy(self, source: str) -> None:
  435. if not self._listen_task:
  436. self.proxy_handler.update_proxy_url()
  437. await self.on_proxy_update()
  438. else:
  439. self.log.debug(f"Not updating proxy: listen_task is still running? (caller: {source})")
  440. async def sync(self) -> None:
  441. sleep_minutes = 2
  442. errors = 0
  443. while True:
  444. try:
  445. resp = await self.client.get_inbox()
  446. break
  447. except (
  448. ProxyError,
  449. ProxyTimeoutError,
  450. ProxyConnectionError,
  451. ClientConnectionError,
  452. ConnectionError,
  453. asyncio.TimeoutError,
  454. ) as e:
  455. errors += 1
  456. wait = min(errors * 10, 60)
  457. self.log.warning(
  458. f"{e.__class__.__name__} while trying to sync, retrying in {wait} seconds: {e}"
  459. )
  460. await asyncio.sleep(wait)
  461. await self._maybe_update_proxy("sync error")
  462. except IGNotLoggedInError as e:
  463. self.log.exception("Got not logged in error while syncing")
  464. await self.logout(error=e)
  465. return
  466. except IGRateLimitError as e:
  467. self.log.error(
  468. "Got ratelimit error while trying to get inbox (%s), retrying in %d minutes",
  469. e.body,
  470. sleep_minutes,
  471. )
  472. await self.push_bridge_state(
  473. BridgeStateEvent.TRANSIENT_DISCONNECT, error="ig-rate-limit"
  474. )
  475. await asyncio.sleep(sleep_minutes * 60)
  476. sleep_minutes += 2
  477. except IGCheckpointError as e:
  478. self.log.debug("Checkpoint error content: %s", e.body)
  479. raise
  480. except (IGChallengeError, IGConsentRequiredError) as e:
  481. await self._handle_checkpoint(e, on="sync")
  482. return
  483. self.seq_id = resp.seq_id
  484. self.snapshot_at_ms = resp.snapshot_at_ms
  485. await self.save_seq_id()
  486. if not self._listen_task:
  487. self.start_listen(is_after_sync=True)
  488. max_age = self.config["bridge.portal_create_max_age"] * 1_000_000
  489. limit = self.config["bridge.backfill.max_conversations"] # TODO
  490. create_limit = self.config["bridge.backfill.max_conversations"] # TODO
  491. min_active_at = (time.time() * 1_000_000) - max_age
  492. i = 0
  493. await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
  494. async for thread in self.client.iter_inbox(start_at=resp):
  495. try:
  496. await self._sync_thread(
  497. thread=thread,
  498. allow_create=thread.last_activity_at > min_active_at and i < create_limit,
  499. )
  500. except Exception:
  501. self.log.exception(f"Error syncing thread {thread.thread_id}")
  502. i += 1
  503. if i >= limit:
  504. break
  505. try:
  506. await self.update_direct_chats()
  507. except Exception:
  508. self.log.exception("Error updating direct chat list")
  509. def start_listen(self, is_after_sync: bool = False) -> None:
  510. self.shutdown = False
  511. task = self._listen(
  512. seq_id=self.seq_id, snapshot_at_ms=self.snapshot_at_ms, is_after_sync=is_after_sync
  513. )
  514. self._listen_task = self.loop.create_task(task)
  515. async def fetch_user_and_reconnect(self) -> None:
  516. self.log.debug("Refetching current user after disconnection")
  517. errors = 0
  518. while True:
  519. try:
  520. resp = await self.client.current_user()
  521. except (
  522. ProxyError,
  523. ProxyTimeoutError,
  524. ProxyConnectionError,
  525. ClientConnectionError,
  526. ConnectionError,
  527. asyncio.TimeoutError,
  528. ) as e:
  529. errors += 1
  530. wait = min(errors * 10, 60)
  531. self.log.warning(
  532. f"{e.__class__.__name__} while trying to check user for reconnection, "
  533. f"retrying in {wait} seconds: {e}"
  534. )
  535. await asyncio.sleep(wait)
  536. await self._maybe_update_proxy("fetch_user_and_reconnect error")
  537. except IGNotLoggedInError as e:
  538. self.log.warning(f"Failed to reconnect to Instagram: {e}, logging out")
  539. await self.logout(error=e)
  540. return
  541. except (IGChallengeError, IGConsentRequiredError) as e:
  542. await self._handle_checkpoint(e, on="reconnect")
  543. return
  544. except Exception as e:
  545. self.log.exception("Error while reconnecting to Instagram")
  546. if isinstance(e, IGCheckpointError):
  547. self.log.debug("Checkpoint error content: %s", e.body)
  548. await self.push_bridge_state(
  549. BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
  550. )
  551. return
  552. else:
  553. self.log.debug(f"Confirmed current user {resp.user.pk}")
  554. self.start_listen()
  555. return
  556. async def _listen(self, seq_id: int, snapshot_at_ms: int, is_after_sync: bool) -> None:
  557. try:
  558. await self.mqtt.listen(
  559. graphql_subs={
  560. GraphQLSubscription.app_presence(),
  561. GraphQLSubscription.direct_typing(self.state.user_id),
  562. GraphQLSubscription.direct_status(),
  563. },
  564. skywalker_subs={
  565. SkywalkerSubscription.direct_sub(self.state.user_id),
  566. SkywalkerSubscription.live_sub(self.state.user_id),
  567. },
  568. seq_id=seq_id,
  569. snapshot_at_ms=snapshot_at_ms,
  570. )
  571. except IrisSubscribeError as e:
  572. if is_after_sync:
  573. self.log.exception("Got IrisSubscribeError right after refresh")
  574. await self.send_bridge_notice(
  575. f"Reconnecting failed again after refresh: {e}",
  576. important=True,
  577. state_event=BridgeStateEvent.UNKNOWN_ERROR,
  578. error_code="ig-refresh-connection-error",
  579. error_message=str(e),
  580. info={"python_error": str(e)},
  581. )
  582. else:
  583. self.log.warning(f"Got IrisSubscribeError {e}, refreshing...")
  584. asyncio.create_task(self.refresh())
  585. except (MQTTNotConnected, MQTTNotLoggedIn, MQTTConnectionUnauthorized) as e:
  586. self.log.warning(
  587. f"Unexpected connection error: {e}", exc_info="MQTT reconnection failed" in str(e)
  588. )
  589. await self.send_bridge_notice(
  590. f"Error in listener: {e}",
  591. important=True,
  592. state_event=BridgeStateEvent.UNKNOWN_ERROR,
  593. error_code="ig-connection-error",
  594. )
  595. self.mqtt.disconnect()
  596. asyncio.create_task(self.fetch_user_and_reconnect())
  597. except Exception as e:
  598. self.log.exception("Fatal error in listener")
  599. await self.send_bridge_notice(
  600. "Fatal error in listener (see logs for more info)",
  601. state_event=BridgeStateEvent.UNKNOWN_ERROR,
  602. important=True,
  603. error_code="ig-unknown-connection-error",
  604. info={"python_error": str(e)},
  605. )
  606. self.mqtt.disconnect()
  607. else:
  608. if not self.shutdown:
  609. await self.send_bridge_notice(
  610. "Instagram connection closed without error",
  611. state_event=BridgeStateEvent.UNKNOWN_ERROR,
  612. error_code="ig-disconnected",
  613. )
  614. finally:
  615. self._listen_task = None
  616. self._is_connected = False
  617. self._track_metric(METRIC_CONNECTED, False)
  618. async def stop_listen(self) -> None:
  619. if self.mqtt:
  620. self.shutdown = True
  621. self.mqtt.disconnect()
  622. if self._listen_task:
  623. await self._listen_task
  624. self.shutdown = False
  625. self._track_metric(METRIC_CONNECTED, False)
  626. self._is_connected = False
  627. await self.update()
  628. async def logout(self, error: IGNotLoggedInError | None = None) -> None:
  629. if self.client and error is None:
  630. try:
  631. await self.client.logout(one_tap_app_login=False)
  632. except Exception:
  633. self.log.debug("Exception logging out", exc_info=True)
  634. if self.mqtt:
  635. self.mqtt.disconnect()
  636. self._track_metric(METRIC_CONNECTED, False)
  637. self._track_metric(METRIC_LOGGED_IN, False)
  638. if error is None:
  639. await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
  640. puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
  641. if puppet and puppet.is_real_user:
  642. await puppet.switch_mxid(None, None)
  643. try:
  644. del self.by_igpk[self.igpk]
  645. except KeyError:
  646. pass
  647. self.igpk = None
  648. else:
  649. self.log.debug("Auth error body: %s", error.body.serialize())
  650. await self.send_bridge_notice(
  651. f"You have been logged out of Instagram: {error.proper_message}",
  652. important=True,
  653. state_event=BridgeStateEvent.BAD_CREDENTIALS,
  654. error_code="ig-auth-error",
  655. error_message=error.proper_message,
  656. )
  657. self.client = None
  658. self.mqtt = None
  659. self.state = None
  660. self.seq_id = None
  661. self.snapshot_at_ms = None
  662. self._is_logged_in = False
  663. await self.update()
  664. # endregion
  665. # region Event handlers
  666. async def _save_seq_id_after_sleep(self) -> None:
  667. await asyncio.sleep(120)
  668. self._seq_id_save_task = None
  669. self.log.trace("Saving sequence ID %d/%d", self.seq_id, self.snapshot_at_ms)
  670. try:
  671. await self.save_seq_id()
  672. except Exception:
  673. self.log.exception("Error saving sequence ID")
  674. async def update_seq_id(self, evt: NewSequenceID) -> None:
  675. self.seq_id = evt.seq_id
  676. self.snapshot_at_ms = evt.snapshot_at_ms
  677. if not self._seq_id_save_task or self._seq_id_save_task.done():
  678. self.log.trace("Starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
  679. self._seq_id_save_task = asyncio.create_task(self._save_seq_id_after_sleep())
  680. else:
  681. self.log.trace("Not starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
  682. @async_time(METRIC_MESSAGE)
  683. async def handle_message(self, evt: MessageSyncEvent) -> None:
  684. portal = await po.Portal.get_by_thread_id(evt.message.thread_id, receiver=self.igpk)
  685. if not portal or not portal.mxid:
  686. self.log.debug("Got message in thread with no portal, getting info...")
  687. resp = await self.client.get_thread(evt.message.thread_id)
  688. portal = await po.Portal.get_by_thread(resp.thread, self.igpk)
  689. self.log.debug("Got info for unknown portal, creating room")
  690. await portal.create_matrix_room(self, resp.thread)
  691. if not portal.mxid:
  692. self.log.warning(
  693. "Room creation appears to have failed, "
  694. f"dropping message in {evt.message.thread_id}"
  695. )
  696. return
  697. self.log.trace(f"Received message sync event {evt.message}")
  698. if evt.message.new_reaction:
  699. await portal.handle_instagram_reaction(
  700. evt.message, remove=evt.message.op == Operation.REMOVE
  701. )
  702. return
  703. sender = await pu.Puppet.get_by_pk(evt.message.user_id) if evt.message.user_id else None
  704. if evt.message.op == Operation.ADD:
  705. if not sender:
  706. # I don't think we care about adds with no sender
  707. return
  708. await portal.handle_instagram_item(self, sender, evt.message)
  709. elif evt.message.op == Operation.REMOVE:
  710. # Removes don't have a sender, only the message sender can unsend messages anyway
  711. await portal.handle_instagram_remove(evt.message.item_id)
  712. elif evt.message.op == Operation.REPLACE:
  713. await portal.handle_instagram_update(evt.message)
  714. @async_time(METRIC_THREAD_SYNC)
  715. async def handle_thread_sync(self, evt: ThreadSyncEvent) -> None:
  716. self.log.trace("Thread sync event content: %s", evt)
  717. portal = await po.Portal.get_by_thread(evt, receiver=self.igpk)
  718. if portal.mxid:
  719. self.log.debug("Got thread sync event for %s with existing portal", portal.thread_id)
  720. await portal.update_matrix_room(self, evt)
  721. elif evt.is_group:
  722. self.log.debug(
  723. "Got thread sync event for group %s without existing portal, creating room",
  724. portal.thread_id,
  725. )
  726. await portal.create_matrix_room(self, evt)
  727. else:
  728. self.log.debug(
  729. "Got thread sync event for DM %s without existing portal, ignoring",
  730. portal.thread_id,
  731. )
  732. @async_time(METRIC_RTD)
  733. async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
  734. if not isinstance(evt.value, ActivityIndicatorData):
  735. return
  736. now = int(time.time() * 1000)
  737. date = evt.value.timestamp_ms
  738. expiry = date + evt.value.ttl
  739. if expiry < now:
  740. return
  741. if evt.activity_indicator_id in self._activity_indicator_ids:
  742. return
  743. # TODO clear expired items from this dict
  744. self._activity_indicator_ids[evt.activity_indicator_id] = expiry
  745. puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
  746. portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
  747. if not puppet or not portal or not portal.mxid:
  748. return
  749. is_typing = evt.value.activity_status != TypingStatus.OFF
  750. if puppet.pk == self.igpk:
  751. self.remote_typing_status = TypingStatus.TEXT if is_typing else TypingStatus.OFF
  752. await puppet.intent_for(portal).set_typing(
  753. portal.mxid, is_typing=is_typing, timeout=evt.value.ttl
  754. )
  755. # endregion
  756. # region Database getters
  757. def _add_to_cache(self) -> None:
  758. self.by_mxid[self.mxid] = self
  759. if self.igpk:
  760. self.by_igpk[self.igpk] = self
  761. @classmethod
  762. @async_getter_lock
  763. async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
  764. # Never allow ghosts to be users
  765. if pu.Puppet.get_id_from_mxid(mxid):
  766. return None
  767. try:
  768. return cls.by_mxid[mxid]
  769. except KeyError:
  770. pass
  771. user = cast(cls, await super().get_by_mxid(mxid))
  772. if user is not None:
  773. user._add_to_cache()
  774. return user
  775. if create:
  776. user = cls(mxid)
  777. await user.insert()
  778. user._add_to_cache()
  779. return user
  780. return None
  781. @classmethod
  782. @async_getter_lock
  783. async def get_by_igpk(cls, igpk: int) -> User | None:
  784. try:
  785. return cls.by_igpk[igpk]
  786. except KeyError:
  787. pass
  788. user = cast(cls, await super().get_by_igpk(igpk))
  789. if user is not None:
  790. user._add_to_cache()
  791. return user
  792. return None
  793. @classmethod
  794. async def all_logged_in(cls) -> AsyncGenerator[User, None]:
  795. users = await super().all_logged_in()
  796. user: cls
  797. for index, user in enumerate(users):
  798. try:
  799. yield cls.by_mxid[user.mxid]
  800. except KeyError:
  801. user._add_to_cache()
  802. yield user
  803. # endregion