user.py 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2022 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, cast
  18. import asyncio
  19. import logging
  20. import time
  21. from mauigpapi import AndroidAPI, AndroidMQTT, AndroidState, ProxyHandler
  22. from mauigpapi.errors import (
  23. IGChallengeError,
  24. IGCheckpointError,
  25. IGConsentRequiredError,
  26. IGNotLoggedInError,
  27. IGRateLimitError,
  28. IGUserIDNotFoundError,
  29. IrisSubscribeError,
  30. MQTTConnectionUnauthorized,
  31. MQTTNotConnected,
  32. MQTTNotLoggedIn,
  33. )
  34. from mauigpapi.mqtt import (
  35. Connect,
  36. Disconnect,
  37. GraphQLSubscription,
  38. NewSequenceID,
  39. ProxyUpdate,
  40. SkywalkerSubscription,
  41. )
  42. from mauigpapi.types import (
  43. ActivityIndicatorData,
  44. CurrentUser,
  45. MessageSyncEvent,
  46. Operation,
  47. RealtimeDirectEvent,
  48. Thread,
  49. ThreadSyncEvent,
  50. TypingStatus,
  51. )
  52. from mautrix.appservice import AppService
  53. from mautrix.bridge import BaseUser, async_getter_lock
  54. from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID
  55. from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
  56. from mautrix.util.logging import TraceLogger
  57. from mautrix.util.opt_prometheus import Gauge, Summary, async_time
  58. from . import portal as po, puppet as pu
  59. from .config import Config
  60. from .db import Portal as DBPortal, User as DBUser
  61. if TYPE_CHECKING:
  62. from .__main__ import InstagramBridge
  # Prometheus metrics: timing summaries for the MQTT event handlers below and
  # gauges that are toggled per user through ``_track_metric``.
  METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
  METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync")
  METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
  METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
  METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")

  # Human-readable messages for the bridge state error codes used by this module.
  BridgeState.human_readable_errors.update(
      {
          "ig-connection-error": "Instagram disconnected unexpectedly",
          "ig-refresh-connection-error": "Reconnecting failed again after refresh: {message}",
          "ig-connection-fatal-error": "Instagram disconnected unexpectedly",
          # User._listen() pushes this code for unexpected listener failures without
          # passing an explicit message, so it needs an entry of its own here.
          "ig-unknown-connection-error": "Instagram disconnected unexpectedly",
          "ig-auth-error": "Authentication error from Instagram: {message}",
          "ig-checkpoint": "Instagram checkpoint error. Please check the Instagram website.",
          "ig-consent-required": "Instagram requires a consent update. Please check the Instagram website.",
          "ig-checkpoint-locked": "Instagram checkpoint error. Please check the Instagram website.",
          "ig-rate-limit": "Got Instagram ratelimit error, waiting a few minutes before retrying...",
          "ig-disconnected": None,
          "ig-no-mqtt": "You're not connected to Instagram",
          "logged-out": "You're not logged into Instagram",
      }
  )
  class User(DBUser, BaseUser):
      """A bridge user: the stored database row plus its live Instagram API/MQTT clients."""

      # Parent logger; the per-user HTTP and MQTT loggers are children of it.
      ig_base_log: TraceLogger = logging.getLogger("mau.instagram")

      # Class-wide state shared by every instance.
      # Activity indicator ID -> expiry (ms); handle_rtd uses it to skip repeated events.
      _activity_indicator_ids: dict[str, int] = {}
      # Caches of loaded users, filled by _add_to_cache().
      by_mxid: dict[UserID, User] = {}
      by_igpk: dict[int, User] = {}

      # Assigned on the class by init_cls().
      config: Config
      az: AppService
      loop: asyncio.AbstractEventLoop

      # Instagram clients; both are None while the user is not connected.
      client: AndroidAPI | None
      mqtt: AndroidMQTT | None

      # Background tasks: the MQTT listen loop and the delayed sequence ID save.
      _listen_task: asyncio.Task | None = None
      _seq_id_save_task: asyncio.Task | None

      permission_level: str
      username: str | None

      # Locks guarding notice room creation and the ordering of sent notices.
      _notice_room_lock: asyncio.Lock
      _notice_send_lock: asyncio.Lock

      _is_logged_in: bool
      _is_connected: bool
      # True while stop_listen() is deliberately closing the MQTT connection.
      shutdown: bool
      remote_typing_status: TypingStatus | None
  def __init__(
      self,
      mxid: UserID,
      igpk: int | None = None,
      state: AndroidState | None = None,
      notice_room: RoomID | None = None,
      seq_id: int | None = None,
      snapshot_at_ms: int | None = None,
  ) -> None:
      """Create a user from its database fields and reset all runtime state.

      The keyword arguments are forwarded unchanged to the database model
      constructor; everything else (clients, flags, tasks, locks) starts out
      in the "not connected" state.
      """
      super().__init__(
          mxid=mxid,
          igpk=igpk,
          state=state,
          notice_room=notice_room,
          seq_id=seq_id,
          snapshot_at_ms=snapshot_at_ms,
      )
      BaseUser.__init__(self)

      self._notice_room_lock = asyncio.Lock()
      self._notice_send_lock = asyncio.Lock()

      # get_permissions() yields the four permission fields in this order.
      (
          self.relay_whitelisted,
          self.is_whitelisted,
          self.is_admin,
          self.permission_level,
      ) = self.config.get_permissions(mxid)

      # No clients and no account name until connect() succeeds.
      self.client = None
      self.mqtt = None
      self.username = None

      # Connection flags.
      self._is_logged_in = False
      self._is_connected = False
      self._is_refreshing = False
      self.shutdown = False

      # Background tasks and transient state.
      self._listen_task = None
      self.remote_typing_status = None
      self._seq_id_save_task = None

      self.proxy_handler = ProxyHandler(
          api_url=self.config["bridge.get_proxy_api_url"],
      )
  @classmethod
  def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]:
      """Store bridge-wide objects on the class and return the startup connections.

      Returns an async iterable that yields one un-awaited ``try_connect()``
      coroutine for each user produced by ``all_logged_in()``.
      """
      cls.bridge = bridge
      cls.config = bridge.config
      cls.az = bridge.az
      cls.loop = bridge.loop

      async def startup_connections():
          async for user in cls.all_logged_in():
              yield user.try_connect()

      return startup_connections()
  145. # region Connection management
  async def is_logged_in(self) -> bool:
      """Return the login flag, or False when there is no API client."""
      if not self.client:
          return False
      return self._is_logged_in
  async def get_puppet(self) -> pu.Puppet | None:
      """Return the puppet for this user's Instagram account, or None without an igpk."""
      return await pu.Puppet.get_by_pk(self.igpk) if self.igpk else None
  async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None:
      """Find the private chat portal with ``puppet``, optionally creating the thread.

      Returns None when this user has no igpk, or when no portal exists and
      ``create`` is false. Otherwise a missing thread is created through the
      API client and the resulting portal is updated with the thread info.
      """
      if not self.igpk:
          return None
      existing = await po.Portal.find_private_chat(self.igpk, puppet.pk)
      if existing:
          return existing
      if not create:
          return None
      # TODO add error handling somewhere
      thread = await self.client.create_group_thread([puppet.pk])
      created = await po.Portal.get_by_thread(thread, self.igpk)
      await created.update_info(thread, self)
      return created
  async def try_connect(self) -> None:
      """Run connect(); on any exception log it and push an UNKNOWN_ERROR bridge state."""
      try:
          await self.connect()
      except Exception as err:
          self.log.exception("Error while connecting to Instagram")
          error_info = {"python_error": str(err)}
          await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, info=error_info)
  @property
  def api_log(self) -> TraceLogger:
      """Logger for HTTP API calls: the ``http`` child of the base logger, then the mxid."""
      http_log = self.ig_base_log.getChild("http")
      return http_log.getChild(self.mxid)
  @property
  def is_connected(self) -> bool:
      """Return the connected flag, or False when the API or MQTT client is missing."""
      if not self.client or not self.mqtt:
          return False
      return self._is_connected
  async def connect(self, user: CurrentUser | None = None) -> None:
      """Create the API and MQTT clients from the stored state and start syncing.

      Without stored state a BAD_CREDENTIALS ("logged-out") bridge state is
      pushed and nothing else happens. When ``user`` is not given, the current
      user is fetched first: a not-logged-in error logs the user out, challenge
      and consent errors go to ``_handle_checkpoint``, and checkpoint errors
      are logged and re-raised.

      After that the MQTT client is created with its event handlers, the user
      is saved, and either a full sync is scheduled or the listener is started
      directly.
      """
      if not self.state:
          await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error="logged-out")
          return

      api = AndroidAPI(
          self.state,
          log=self.api_log,
          proxy_handler=self.proxy_handler,
      )
      if not user:
          try:
              user = (await api.current_user()).user
          except IGNotLoggedInError as err:
              self.log.warning(f"Failed to connect to Instagram: {err}, logging out")
              await self.logout(error=err)
              return
          except IGCheckpointError as err:
              self.log.debug("Checkpoint error content: %s", err.body)
              raise
          except (IGChallengeError, IGConsentRequiredError) as err:
              await self._handle_checkpoint(err, on="connect", client=api)
              return

      self.client = api
      self._is_logged_in = True
      self.igpk = user.pk
      self.username = user.username
      await self.push_bridge_state(BridgeStateEvent.CONNECTING)
      self._track_metric(METRIC_LOGGED_IN, True)
      self.by_igpk[self.igpk] = self

      self.mqtt = AndroidMQTT(
          self.state,
          loop=self.loop,
          log=self.ig_base_log.getChild("mqtt").getChild(self.mxid),
          proxy_handler=self.proxy_handler,
      )
      event_handlers = (
          (Connect, self.on_connect),
          (Disconnect, self.on_disconnect),
          (NewSequenceID, self.update_seq_id),
          (MessageSyncEvent, self.handle_message),
          (ThreadSyncEvent, self.handle_thread_sync),
          (RealtimeDirectEvent, self.handle_rtd),
          (ProxyUpdate, self.on_proxy_update),
      )
      for event_type, handler in event_handlers:
          self.mqtt.add_event_handler(event_type, handler)

      await self.update()
      self.loop.create_task(self._try_sync_puppet(user))
      if self.seq_id and not self.config["bridge.resync_on_startup"]:
          self.log.debug("Connecting to MQTT directly as resync_on_startup is false")
          self.start_listen()
      else:
          # sync() starts the listener itself once the inbox has been fetched.
          self.loop.create_task(self._try_sync())
  async def on_connect(self, evt: Connect) -> None:
      """MQTT Connect handler: mark the user connected and announce it."""
      self.log.debug("Connected to Instagram")
      self._track_metric(METRIC_CONNECTED, True)
      self._is_connected = True
      # Notice first, then the bridge state update.
      await self.send_bridge_notice("Connected to Instagram")
      await self.push_bridge_state(BridgeStateEvent.CONNECTED)
  async def on_disconnect(self, evt: Disconnect) -> None:
      """MQTT Disconnect handler: clear the connected metric and flag."""
      self.log.debug("Disconnected from Instagram")
      self._track_metric(METRIC_CONNECTED, False)
      self._is_connected = False
  async def on_proxy_update(self, evt: ProxyUpdate | None = None) -> None:
      """Re-run the API client's HTTP setup with the stored cookie jar, if a client exists."""
      if not self.client:
          return
      self.client.setup_http(self.state.cookies.jar)
  242. # TODO this stuff could probably be moved to mautrix-python
  async def get_notice_room(self) -> RoomID:
      """Return the bridge notice room, creating and saving it on first use."""
      if self.notice_room:
          return self.notice_room
      async with self._notice_room_lock:
          # Another caller may have created the room while we were waiting
          # for the lock; reuse it instead of creating a second one.
          if self.notice_room:
              return self.notice_room
          if self.config["bridge.federate_rooms"]:
              creation_content = {}
          else:
              creation_content = {"m.federate": False}
          self.notice_room = await self.az.intent.create_room(
              is_direct=True,
              invitees=[self.mxid],
              topic="Instagram bridge notices",
              creation_content=creation_content,
          )
          await self.update()
      return self.notice_room
  async def fill_bridge_state(self, state: BridgeState) -> None:
      """Add the Instagram account ID and username to an outgoing bridge state.

      ``remote_id`` is only filled when the parent implementation left it
      empty. It prefers ``igpk``, then the user ID stored in the login state,
      and is None when neither is available. ``remote_name`` is set to
      ``@username`` when a username is known.
      """
      await super().fill_bridge_state(state)
      if not state.remote_id:
          if self.igpk:
              state.remote_id = str(self.igpk)
          elif self.state:
              try:
                  state.remote_id = self.state.user_id
              except IGUserIDNotFoundError:
                  state.remote_id = None
          else:
              # self.state is None after logout() and before the first login;
              # bridge states are still pushed then (e.g. "logged-out" in
              # connect()), so there is nothing to read the ID from.
              state.remote_id = None
      if self.username:
          state.remote_name = f"@{self.username}"
  async def get_bridge_states(self) -> list[BridgeState]:
      """Return the current bridge state as a one-item list, or [] without login state.

      The event is CONNECTED while connected, TRANSIENT_DISCONNECT while a
      refresh is running or an MQTT client exists, and UNKNOWN_ERROR otherwise.
      """
      if not self.state:
          return []
      if self.is_connected:
          event = BridgeStateEvent.CONNECTED
      elif self._is_refreshing or self.mqtt:
          event = BridgeStateEvent.TRANSIENT_DISCONNECT
      else:
          event = BridgeStateEvent.UNKNOWN_ERROR
      return [BridgeState(state_event=event)]
  async def send_bridge_notice(
      self,
      text: str,
      edit: EventID | None = None,
      state_event: BridgeStateEvent | None = None,
      important: bool = False,
      error_code: str | None = None,
      error_message: str | None = None,
      info: dict | None = None,
  ) -> EventID | None:
      """Optionally push a bridge state, then send ``text`` to the notice room.

      The bridge state is pushed whenever ``state_event`` is given, regardless
      of whether notices are enabled. The notice itself is skipped when notices
      are disabled, or when it is unimportant and unimportant notices are
      turned off. Send failures are logged, not raised.

      Returns ``edit`` if it was given, otherwise the new event ID, or None
      when nothing was sent.
      """
      if state_event:
          await self.push_bridge_state(
              state_event,
              error=error_code,
              message=error_message if error_code else text,
              info=info,
          )
      if self.config["bridge.disable_bridge_notices"]:
          return None
      if not (important or self.config["bridge.unimportant_bridge_notices"]):
          self.log.debug("Not sending unimportant bridge notice: %s", text)
          return None

      sent_event_id = None
      try:
          self.log.debug("Sending bridge notice: %s", text)
          msgtype = MessageType.TEXT if important else MessageType.NOTICE
          content = TextMessageEventContent(body=text, msgtype=msgtype)
          if edit:
              content.set_edit(edit)
          # Hold the lock while sending so notices cannot overtake each other.
          async with self._notice_send_lock:
              room_id = await self.get_notice_room()
              sent_event_id = await self.az.intent.send_message(room_id, content)
      except Exception:
          self.log.warning("Failed to send bridge notice", exc_info=True)
      return edit or sent_event_id
  async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
      """Update this user's own puppet and try to enable the custom puppet.

      The two steps are independent: a failure in either is logged and does
      not stop the other.
      """
      own_puppet = await pu.Puppet.get_by_pk(self.igpk)
      try:
          await own_puppet.update_info(user_info, self)
      except Exception:
          self.log.exception("Failed to update own puppet info")
      try:
          needs_switch = own_puppet.custom_mxid != self.mxid
          if needs_switch and own_puppet.can_auto_login(self.mxid):
              self.log.info("Automatically enabling custom puppet")
              await own_puppet.switch_mxid(access_token="auto", mxid=self.mxid)
      except Exception:
          self.log.exception("Failed to automatically enable custom puppet")
  async def _try_sync(self) -> None:
      """Run sync(); on any exception log it and push an UNKNOWN_ERROR bridge state."""
      try:
          await self.sync()
      except Exception as err:
          self.log.exception("Exception while syncing")
          if isinstance(err, IGCheckpointError):
              self.log.debug("Checkpoint error content: %s", err.body)
          error_info = {"python_error": str(err)}
          await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, info=error_info)
  async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
      """Map each private chat partner's puppet mxid to a one-room list.

      Only private chat portals that have a Matrix room are included.
      """
      chats: dict[UserID, list[RoomID]] = {}
      for portal in await DBPortal.find_private_chats_of(self.igpk):
          if not portal.mxid:
              continue
          puppet_mxid = pu.Puppet.get_mxid_from_id(portal.other_user_pk)
          chats[puppet_mxid] = [portal.mxid]
      return chats
  async def refresh(self, resync: bool = True) -> None:
      """Stop the listener, then either resync until it succeeds or just listen again.

      With ``resync`` the inbox sync is retried forever. The wait between
      attempts starts at one minute and, from the fifth failure on, grows by
      one minute per failure up to ten minutes. Every failure is logged and
      pushed as an UNKNOWN_ERROR bridge state. Without ``resync`` the listener
      is simply restarted.
      """
      self._is_refreshing = True
      try:
          await self.stop_listen()
          if not resync:
              self.start_listen()
              return
          failures = 0
          delay_minutes = 1
          while True:
              try:
                  await self.sync()
                  return
              except Exception as err:
                  if failures >= 4 and delay_minutes < 10:
                      delay_minutes += 1
                  failures += 1
                  plural = "s" if delay_minutes != 1 else ""
                  self.log.exception(
                      f"Error while syncing for refresh, retrying in {delay_minutes} minute{plural}"
                  )
                  if isinstance(err, IGCheckpointError):
                      self.log.debug("Checkpoint error content: %s", err.body)
                  await self.push_bridge_state(
                      BridgeStateEvent.UNKNOWN_ERROR,
                      error="unknown-error",
                      message="An unknown error occurred while connecting to Instagram",
                      info={"python_error": str(err)},
                  )
                  await asyncio.sleep(delay_minutes * 60)
      finally:
          self._is_refreshing = False
          # NOTE(review): the source's indentation was lost; this call is assumed
          # to belong to the ``finally`` block - confirm against the repository.
          self.proxy_handler.update_proxy_url()
  async def _handle_checkpoint(
      self,
      e: IGChallengeError | IGConsentRequiredError,
      on: str,
      client: AndroidAPI | None = None,
  ) -> None:
      """Drop the clients and report a challenge/consent error as BAD_CREDENTIALS.

      Args:
          e: The challenge or consent error that was raised.
          on: Short label of the operation that failed, used in the log line.
          client: API client to reset the challenge with; defaults to
              ``self.client`` as it was before being cleared.

      Consent errors are reported as ``ig-consent-required`` right away. For
      challenges the challenge state is reset and its details are attached to
      the bridge state. The error code becomes ``ig-checkpoint-locked`` when
      the challenge type is ``HACKED_LOCK`` and ``ig-checkpoint`` otherwise.
      If the reset fails, only the challenge from the error body is attached.
      """
      self.log.warning(f"Got checkpoint error on {on}: {e.body.serialize()}")
      client = client or self.client
      self.client = None
      self.mqtt = None
      if isinstance(e, IGConsentRequiredError):
          await self.push_bridge_state(
              BridgeStateEvent.BAD_CREDENTIALS,
              error="ig-consent-required",
              info=e.body.serialize(),
          )
          return
      error_code = "ig-checkpoint"
      try:
          resp = await client.challenge_reset()
          # Only touch challenge_context when the raw context string is present.
          # Reading it unguarded could raise and throw away the info gathered
          # below by falling into the except branch.
          challenge_context = resp.challenge_context if resp.challenge_context_str else None
          info = {
              "challenge_context": (
                  challenge_context.serialize() if challenge_context is not None else None
              ),
              "step_name": resp.step_name,
              "step_data": resp.step_data.serialize() if resp.step_data else None,
              "user_id": resp.user_id,
              "action": resp.action,
              "status": resp.status,
              "challenge": e.body.challenge.serialize() if e.body.challenge else None,
          }
          self.log.debug(f"Challenge state: {resp.serialize()}")
          if (
              challenge_context is not None
              and challenge_context.challenge_type_enum == "HACKED_LOCK"
          ):
              error_code = "ig-checkpoint-locked"
      except Exception:
          self.log.exception("Error resetting challenge state")
          info = {"challenge": e.body.challenge.serialize() if e.body.challenge else None}
      await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error=error_code, info=info)
  async def _sync_thread(self, thread: Thread, allow_create: bool) -> None:
      """Sync one inbox thread into its portal.

      An existing Matrix room is updated and backfilled. Without a room, one
      is created only if ``allow_create`` is set; otherwise the thread is
      skipped.
      """
      portal = await po.Portal.get_by_thread(thread, self.igpk)
      if portal.mxid:
          self.log.debug(f"{thread.thread_id} has a portal, syncing and backfilling...")
          await portal.update_matrix_room(self, thread, backfill=True)
          return
      if allow_create:
          self.log.debug(f"{thread.thread_id} has been active recently, creating portal...")
          await portal.create_matrix_room(self, thread)
          return
      self.log.debug(f"{thread.thread_id} is not active and doesn't have a portal")
  async def sync(self) -> None:
      """Fetch the inbox, start listening if needed and sync recent threads.

      Fetching the inbox is retried on rate limits, waiting 2 minutes first
      and 2 more minutes after each further rate limit. A not-logged-in error
      logs the user out, challenge/consent errors go to ``_handle_checkpoint``,
      and checkpoint errors are logged and re-raised.

      After a successful fetch the sequence ID is saved, the listener is
      started if it is not running, and threads are synced until the
      configured sync limit is reached. Per-thread failures are logged and
      skipped. Finally the direct chat list is updated.
      """
      retry_delay_minutes = 2
      while True:
          try:
              inbox = await self.client.get_inbox()
              break
          except IGNotLoggedInError as err:
              self.log.exception("Got not logged in error while syncing")
              await self.logout(error=err)
              return
          except IGRateLimitError as err:
              self.log.error(
                  "Got ratelimit error while trying to get inbox (%s), retrying in %d minutes",
                  err.body,
                  retry_delay_minutes,
              )
              await self.push_bridge_state(
                  BridgeStateEvent.TRANSIENT_DISCONNECT, error="ig-rate-limit"
              )
              await asyncio.sleep(retry_delay_minutes * 60)
              retry_delay_minutes += 2
          except IGCheckpointError as err:
              self.log.debug("Checkpoint error content: %s", err.body)
              raise
          except (IGChallengeError, IGConsentRequiredError) as err:
              await self._handle_checkpoint(err, on="sync")
              return

      self.seq_id = inbox.seq_id
      self.snapshot_at_ms = inbox.snapshot_at_ms
      await self.save_seq_id()
      if not self._listen_task:
          self.start_listen(is_after_sync=True)

      # Both sides of the activity comparison are scaled by 1_000_000.
      max_age = self.config["bridge.portal_create_max_age"] * 1_000_000
      sync_limit = self.config["bridge.chat_sync_limit"]
      create_limit = self.config["bridge.chat_create_limit"]
      min_active_at = (time.time() * 1_000_000) - max_age
      handled = 0
      await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
      async for thread in self.client.iter_inbox(start_at=inbox):
          try:
              await self._sync_thread(
                  thread=thread,
                  allow_create=thread.last_activity_at > min_active_at and handled < create_limit,
              )
          except Exception:
              self.log.exception(f"Error syncing thread {thread.thread_id}")
          # Failed threads count towards the limits too.
          handled += 1
          if handled >= sync_limit:
              break

      try:
          await self.update_direct_chats()
      except Exception:
          self.log.exception("Error updating direct chat list")
  def start_listen(self, is_after_sync: bool = False) -> None:
      """Clear the shutdown flag and start the MQTT listen loop as a task.

      The loop is started with the current sequence ID and snapshot timestamp.
      """
      self.shutdown = False
      self._listen_task = self.loop.create_task(
          self._listen(
              seq_id=self.seq_id,
              snapshot_at_ms=self.snapshot_at_ms,
              is_after_sync=is_after_sync,
          )
      )
  async def fetch_user_and_reconnect(self) -> None:
      """Check the session by refetching the current user, then listen again.

      A not-logged-in error logs the user out, challenge/consent errors go to
      ``_handle_checkpoint``, and any other error is logged and pushed as an
      UNKNOWN_ERROR bridge state. In all of those cases the listener is not
      restarted. On success the proxy URL is updated, the HTTP client is set
      up again and the listener is started.
      """
      self.log.debug("Refetching current user after disconnection")
      try:
          current = await self.client.current_user()
      except IGNotLoggedInError as err:
          self.log.warning(f"Failed to reconnect to Instagram: {err}, logging out")
          await self.logout(error=err)
          return
      except (IGChallengeError, IGConsentRequiredError) as err:
          await self._handle_checkpoint(err, on="reconnect")
          return
      except Exception as err:
          self.log.exception("Error while reconnecting to Instagram")
          if isinstance(err, IGCheckpointError):
              self.log.debug("Checkpoint error content: %s", err.body)
          await self.push_bridge_state(
              BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(err)}
          )
          return

      # Every failure path above has returned, so the session is confirmed here.
      self.log.debug(f"Confirmed current user {current.user.pk}")
      self.proxy_handler.update_proxy_url()
      await self.on_proxy_update()
      self.start_listen()
  async def _listen(self, seq_id: int, snapshot_at_ms: int, is_after_sync: bool) -> None:
      """Run the MQTT listen loop and react to the way it ends.

      * IrisSubscribeError: right after a sync it is reported as an error
        notice; otherwise a refresh is scheduled.
      * MQTT not-connected / not-logged-in / unauthorized: reported, the
        client is disconnected and ``fetch_user_and_reconnect`` is scheduled.
      * Any other exception: reported as a fatal listener error and the
        client is disconnected.
      * Clean return: reported as a disconnect unless ``shutdown`` is set.

      In every case the listen task reference, the connected flag and the
      connected metric are cleared at the end.
      """
      try:
          graphql_subs = {
              GraphQLSubscription.app_presence(),
              GraphQLSubscription.direct_typing(self.state.user_id),
              GraphQLSubscription.direct_status(),
          }
          skywalker_subs = {
              SkywalkerSubscription.direct_sub(self.state.user_id),
              SkywalkerSubscription.live_sub(self.state.user_id),
          }
          await self.mqtt.listen(
              graphql_subs=graphql_subs,
              skywalker_subs=skywalker_subs,
              seq_id=seq_id,
              snapshot_at_ms=snapshot_at_ms,
          )
      except IrisSubscribeError as err:
          if not is_after_sync:
              self.log.warning(f"Got IrisSubscribeError {err}, refreshing...")
              asyncio.create_task(self.refresh())
          else:
              self.log.exception("Got IrisSubscribeError right after refresh")
              await self.send_bridge_notice(
                  f"Reconnecting failed again after refresh: {err}",
                  important=True,
                  state_event=BridgeStateEvent.UNKNOWN_ERROR,
                  error_code="ig-refresh-connection-error",
                  error_message=str(err),
                  info={"python_error": str(err)},
              )
      except (MQTTNotConnected, MQTTNotLoggedIn, MQTTConnectionUnauthorized) as err:
          self.log.warning(f"Unexpected connection error: {err}")
          await self.send_bridge_notice(
              f"Error in listener: {err}",
              important=True,
              state_event=BridgeStateEvent.UNKNOWN_ERROR,
              error_code="ig-connection-error",
          )
          self.mqtt.disconnect()
          asyncio.create_task(self.fetch_user_and_reconnect())
      except Exception as err:
          self.log.exception("Fatal error in listener")
          await self.send_bridge_notice(
              "Fatal error in listener (see logs for more info)",
              state_event=BridgeStateEvent.UNKNOWN_ERROR,
              important=True,
              error_code="ig-unknown-connection-error",
              info={"python_error": str(err)},
          )
          self.mqtt.disconnect()
      else:
          # stop_listen() sets ``shutdown`` before disconnecting, so a clean
          # return with the flag set is an intentional stop.
          if not self.shutdown:
              await self.send_bridge_notice(
                  "Instagram connection closed without error",
                  state_event=BridgeStateEvent.UNKNOWN_ERROR,
                  error_code="ig-disconnected",
              )
      finally:
          self._listen_task = None
          self._is_connected = False
          self._track_metric(METRIC_CONNECTED, False)
  async def stop_listen(self) -> None:
      """Disconnect MQTT, wait for the listen task to finish and save the user."""
      mqtt = self.mqtt
      if mqtt:
          # Mark the disconnect as intentional so _listen() does not report it.
          self.shutdown = True
          mqtt.disconnect()
          running = self._listen_task
          if running:
              await running
          self.shutdown = False
      self._track_metric(METRIC_CONNECTED, False)
      self._is_connected = False
      await self.update()
  async def logout(self, error: IGNotLoggedInError | None = None) -> None:
      """Log the user out and clear all session state.

      Without ``error`` this is a voluntary logout: the API logout is
      attempted (failures are only logged), a LOGGED_OUT bridge state is
      pushed, a real-user puppet is switched back, and the igpk is removed
      from the cache and cleared.

      With ``error`` the session was rejected by Instagram: no API logout is
      attempted, the igpk is kept, and the user is notified with a
      BAD_CREDENTIALS bridge state.

      In both cases the clients, login state and sequence ID are cleared and
      the user is saved.
      """
      voluntary = error is None
      if self.client and voluntary:
          try:
              await self.client.logout(one_tap_app_login=False)
          except Exception:
              self.log.debug("Exception logging out", exc_info=True)
      if self.mqtt:
          self.mqtt.disconnect()
      self._track_metric(METRIC_CONNECTED, False)
      self._track_metric(METRIC_LOGGED_IN, False)

      if voluntary:
          await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
          puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
          if puppet and puppet.is_real_user:
              await puppet.switch_mxid(None, None)
          # A missing cache entry is fine; there is nothing to remove then.
          self.by_igpk.pop(self.igpk, None)
          self.igpk = None
      else:
          self.log.debug("Auth error body: %s", error.body.serialize())
          await self.send_bridge_notice(
              f"You have been logged out of Instagram: {error.proper_message}",
              important=True,
              state_event=BridgeStateEvent.BAD_CREDENTIALS,
              error_code="ig-auth-error",
              error_message=error.proper_message,
          )

      self.client = None
      self.mqtt = None
      self.state = None
      self.seq_id = None
      self.snapshot_at_ms = None
      self._is_logged_in = False
      await self.update()
  616. # endregion
  617. # region Event handlers
  async def _save_seq_id_after_sleep(self) -> None:
      """Wait two minutes, then save the current sequence ID.

      The task slot is cleared before saving, so updates arriving during the
      save can schedule a new task. Save failures are logged, not raised.
      """
      await asyncio.sleep(120)
      self._seq_id_save_task = None
      self.log.trace("Saving sequence ID %d/%d", self.seq_id, self.snapshot_at_ms)
      try:
          await self.save_seq_id()
      except Exception:
          self.log.exception("Error saving sequence ID")
  async def update_seq_id(self, evt: NewSequenceID) -> None:
      """Record the new sequence ID and make sure a delayed save is scheduled."""
      self.seq_id = evt.seq_id
      self.snapshot_at_ms = evt.snapshot_at_ms
      pending = self._seq_id_save_task
      if pending and not pending.done():
          # A save is already scheduled; it will pick up the latest values.
          self.log.trace("Not starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
          return
      self.log.trace("Starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
      self._seq_id_save_task = asyncio.create_task(self._save_seq_id_after_sleep())
  @async_time(METRIC_MESSAGE)
  async def handle_message(self, evt: MessageSyncEvent) -> None:
      """Pass a message sync event on to the portal of its thread.

      If the thread has no portal or Matrix room yet, the thread is fetched
      and a room is created first; the message is dropped if that leaves the
      portal without a room. The handler then waits on the portal's backfill
      lock. Reactions are handled next, then ADD / REMOVE / REPLACE items.
      """
      msg = evt.message
      portal = await po.Portal.get_by_thread_id(msg.thread_id, receiver=self.igpk)
      if not portal or not portal.mxid:
          self.log.debug("Got message in thread with no portal, getting info...")
          resp = await self.client.get_thread(msg.thread_id)
          portal = await po.Portal.get_by_thread(resp.thread, self.igpk)
          self.log.debug("Got info for unknown portal, creating room")
          await portal.create_matrix_room(self, resp.thread)
          if not portal.mxid:
              self.log.warning(
                  "Room creation appears to have failed, "
                  f"dropping message in {msg.thread_id}"
              )
              return

      self.log.trace(f"Received message sync event {msg}")
      await portal.backfill_lock.wait(f"{msg.op} {msg.item_id}")

      if msg.new_reaction:
          is_removal = msg.op == Operation.REMOVE
          await portal.handle_instagram_reaction(msg, remove=is_removal)
          return

      sender = await pu.Puppet.get_by_pk(msg.user_id) if msg.user_id else None
      if msg.op == Operation.ADD:
          # Adds without a sender are ignored.
          if sender:
              await portal.handle_instagram_item(self, sender, msg)
      elif msg.op == Operation.REMOVE:
          # Removes carry no sender; only the author can unsend a message anyway.
          await portal.handle_instagram_remove(msg.item_id)
      elif msg.op == Operation.REPLACE:
          await portal.handle_instagram_update(msg)
  @async_time(METRIC_THREAD_SYNC)
  async def handle_thread_sync(self, evt: ThreadSyncEvent) -> None:
      """Apply a thread sync event to its portal.

      An existing room is updated, a group without a room gets one created,
      and a DM without a room is ignored.
      """
      self.log.trace("Thread sync event content: %s", evt)
      portal = await po.Portal.get_by_thread(evt, receiver=self.igpk)
      if portal.mxid:
          self.log.debug("Got thread sync event for %s with existing portal", portal.thread_id)
          await portal.update_matrix_room(self, evt)
          return
      if not evt.is_group:
          self.log.debug(
              "Got thread sync event for DM %s without existing portal, ignoring",
              portal.thread_id,
          )
          return
      self.log.debug(
          "Got thread sync event for group %s without existing portal, creating room",
          portal.thread_id,
      )
      await portal.create_matrix_room(self, evt)
  @async_time(METRIC_RTD)
  async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
      """Bridge a typing (activity indicator) event into the matching Matrix room.

      Events that are not activity indicators, that have already expired, or
      whose indicator ID has already been handled are ignored. Seen indicator
      IDs are remembered until they expire. The typing state is then set for
      the sender's puppet in the portal; if the sender is this user,
      ``remote_typing_status`` is updated as well.
      """
      if not isinstance(evt.value, ActivityIndicatorData):
          return

      now = int(time.time() * 1000)
      expiry = evt.value.timestamp_ms + evt.value.ttl
      if expiry < now:
          return

      seen = self._activity_indicator_ids
      # Forget indicators whose expiry has passed so the class-wide dict does
      # not grow forever. The dict is mutated in place because it is shared
      # between all users.
      for expired_id in [key for key, expires_at in seen.items() if expires_at < now]:
          del seen[expired_id]
      if evt.activity_indicator_id in seen:
          return
      seen[evt.activity_indicator_id] = expiry

      puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
      portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
      if not puppet or not portal or not portal.mxid:
          return

      is_typing = evt.value.activity_status != TypingStatus.OFF
      if puppet.pk == self.igpk:
          self.remote_typing_status = TypingStatus.TEXT if is_typing else TypingStatus.OFF
      await puppet.intent_for(portal).set_typing(
          portal.mxid, is_typing=is_typing, timeout=evt.value.ttl
      )
  708. # endregion
  709. # region Database getters
  def _add_to_cache(self) -> None:
      """Register this user in the mxid cache and, when igpk is set, the igpk cache."""
      self.by_mxid[self.mxid] = self
      if not self.igpk:
          return
      self.by_igpk[self.igpk] = self
  @classmethod
  @async_getter_lock
  async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
      """Look up a user by Matrix ID: cache first, then database, then optional insert.

      Returns None for puppet (ghost) Matrix IDs, and for unknown users when
      ``create`` is false.
      """
      # Ghosts must never be treated as bridge users.
      if pu.Puppet.get_id_from_mxid(mxid):
          return None

      if mxid in cls.by_mxid:
          return cls.by_mxid[mxid]

      user = cast(cls, await super().get_by_mxid(mxid))
      if user is None:
          if not create:
              return None
          user = cls(mxid)
          await user.insert()
      user._add_to_cache()
      return user
  @classmethod
  @async_getter_lock
  async def get_by_igpk(cls, igpk: int) -> User | None:
      """Look up a user by Instagram user ID: cache first, then database."""
      if igpk in cls.by_igpk:
          return cls.by_igpk[igpk]

      user = cast(cls, await super().get_by_igpk(igpk))
      if user is None:
          return None
      user._add_to_cache()
      return user
  @classmethod
  async def all_logged_in(cls) -> AsyncGenerator[User, None]:
      """Yield every user returned by the database model's ``all_logged_in()``.

      If a user with the same mxid is already cached, the cached instance is
      yielded. Otherwise the freshly loaded one is cached and yielded.
      """
      users = await super().all_logged_in()
      user: User
      for user in users:
          # Keep only the cache lookup inside the try. With the yield inside
          # it, a KeyError raised at the yield point would be mistaken for a
          # cache miss.
          try:
              cached = cls.by_mxid[user.mxid]
          except KeyError:
              user._add_to_cache()
              cached = user
          yield cached
  756. # endregion