user.py 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2023 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, Callable, cast
  18. from datetime import datetime, timedelta
  19. from functools import partial
  20. import asyncio
  21. import logging
  22. import time
  23. from mauigpapi import (
  24. RETRYABLE_PROXY_EXCEPTIONS,
  25. AndroidAPI,
  26. AndroidMQTT,
  27. AndroidState,
  28. ProxyHandler,
  29. )
  30. from mauigpapi.errors import (
  31. IGChallengeError,
  32. IGCheckpointError,
  33. IGConsentRequiredError,
  34. IGNotLoggedInError,
  35. IGRateLimitError,
  36. IGUserIDNotFoundError,
  37. IrisSubscribeError,
  38. MQTTConnectionUnauthorized,
  39. MQTTNotConnected,
  40. MQTTNotLoggedIn,
  41. MQTTReconnectionError,
  42. )
  43. from mauigpapi.mqtt import (
  44. Connect,
  45. Disconnect,
  46. GraphQLSubscription,
  47. NewSequenceID,
  48. ProxyUpdate,
  49. SkywalkerSubscription,
  50. )
  51. from mauigpapi.types import (
  52. ActivityIndicatorData,
  53. CurrentUser,
  54. MessageSyncEvent,
  55. Operation,
  56. RealtimeDirectEvent,
  57. Thread,
  58. ThreadRemoveEvent,
  59. ThreadSyncEvent,
  60. TypingStatus,
  61. )
  62. from mauigpapi.types.direct_inbox import DMInbox, DMInboxResponse
  63. from mautrix.appservice import AppService
  64. from mautrix.bridge import BaseUser, async_getter_lock
  65. from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID
  66. from mautrix.util import background_task
  67. from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
  68. from mautrix.util.logging import TraceLogger
  69. from mautrix.util.opt_prometheus import Gauge, Summary, async_time
  70. from mautrix.util.simple_lock import SimpleLock
  71. from . import portal as po, puppet as pu
  72. from .config import Config
  73. from .db import Backfill, Message as DBMessage, Portal as DBPortal, User as DBUser
  74. if TYPE_CHECKING:
  75. from .__main__ import InstagramBridge
  # Import the proxy exception types from aiohttp_socks when it is installed.
  # If the import fails, define a placeholder ProxyError and alias the other two
  # names to it so all three names still exist at module level.
  try:
      from aiohttp_socks import ProxyConnectionError, ProxyError, ProxyTimeoutError
  except ImportError:

      class ProxyError(Exception):
          pass

      ProxyConnectionError = ProxyTimeoutError = ProxyError
  # Prometheus metrics. The Summary objects are named after the handlers they are
  # meant to time; the Gauge objects are updated through self._track_metric().
  METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
  METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync")
  METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
  METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
  METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")

  # Register human-readable texts for the error codes passed as `error=` to
  # push_bridge_state() / send_bridge_notice() in this module.
  # NOTE(review): "{message}" is presumably substituted by mautrix with the state's
  # message field -- confirm against BridgeState.
  BridgeState.human_readable_errors.update(
      {
          "ig-connection-error": "Instagram disconnected unexpectedly",
          "ig-refresh-connection-error": "Reconnecting failed again after refresh: {message}",
          "ig-connection-fatal-error": "Instagram disconnected unexpectedly",
          "ig-auth-error": "Authentication error from Instagram: {message}",
          "ig-checkpoint": "Instagram checkpoint error. Please check the Instagram website.",
          "ig-consent-required": "Instagram requires a consent update. Please check the Instagram website.",
          "ig-checkpoint-locked": "Instagram checkpoint error. Please check the Instagram website.",
          "ig-rate-limit": "Got Instagram ratelimit error, waiting a few minutes before retrying...",
          "ig-disconnected": None,
          "ig-no-mqtt": "You're not connected to Instagram",
          "logged-out": "You're not logged into Instagram",
      }
  )
  102. class User(DBUser, BaseUser):
  103. ig_base_log: TraceLogger = logging.getLogger("mau.instagram")
  104. _activity_indicator_ids: dict[str, int] = {}
  105. by_mxid: dict[UserID, User] = {}
  106. by_igpk: dict[int, User] = {}
  107. config: Config
  108. az: AppService
  109. loop: asyncio.AbstractEventLoop
  110. client: AndroidAPI | None
  111. mqtt: AndroidMQTT | None
  112. _listen_task: asyncio.Task | None = None
  113. _sync_lock: SimpleLock
  114. _backfill_loop_task: asyncio.Task | None
  115. _thread_sync_task: asyncio.Task | None
  116. _seq_id_save_task: asyncio.Task | None
  117. permission_level: str
  118. username: str | None
  119. _notice_room_lock: asyncio.Lock
  120. _notice_send_lock: asyncio.Lock
  121. _is_logged_in: bool
  122. _is_connected: bool
  123. shutdown: bool
  124. remote_typing_status: TypingStatus | None
  def __init__(
      self,
      mxid: UserID,
      igpk: int | None = None,
      state: AndroidState | None = None,
      notice_room: RoomID | None = None,
      seq_id: int | None = None,
      snapshot_at_ms: int | None = None,
      oldest_cursor: str | None = None,
      total_backfilled_portals: int | None = None,
      thread_sync_completed: bool = False,
  ) -> None:
      """
      Create the in-memory user object.

      The persisted fields are forwarded unchanged to the database base class;
      everything else (clients, locks, task handles, status flags) starts out in
      its disconnected default state.
      """
      persisted_fields = dict(
          mxid=mxid,
          igpk=igpk,
          state=state,
          notice_room=notice_room,
          seq_id=seq_id,
          snapshot_at_ms=snapshot_at_ms,
          oldest_cursor=oldest_cursor,
          total_backfilled_portals=total_backfilled_portals,
          thread_sync_completed=thread_sync_completed,
      )
      super().__init__(**persisted_fields)
      BaseUser.__init__(self)

      # Locks guarding notice room creation and notice ordering.
      self._notice_room_lock = asyncio.Lock()
      self._notice_send_lock = asyncio.Lock()

      # Permission flags come from the bridge config, keyed by Matrix user ID.
      (
          self.relay_whitelisted,
          self.is_whitelisted,
          self.is_admin,
          self.permission_level,
      ) = self.config.get_permissions(mxid)

      # No API/MQTT client and no known username until connect() succeeds.
      self.client = None
      self.mqtt = None
      self.username = None

      # Connection status flags.
      self._is_logged_in = False
      self._is_connected = False
      self._is_refreshing = False
      self.shutdown = False

      self._sync_lock = SimpleLock(
          "Waiting for thread sync to finish before handling %s", log=self.log
      )

      # Background task handles, populated lazily.
      self._listen_task = None
      self._thread_sync_task = None
      self._backfill_loop_task = None
      self.remote_typing_status = None
      self._seq_id_save_task = None

      self.proxy_handler = ProxyHandler(
          api_url=self.config["bridge.get_proxy_api_url"],
      )
  @classmethod
  def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]:
      """
      Store the bridge-wide singletons on the class and return an async iterable
      that yields one ``try_connect()`` awaitable per logged-in user.
      """
      cls.bridge = bridge
      cls.config, cls.az, cls.loop = bridge.config, bridge.az, bridge.loop

      # Obtain the iterable up front, then wrap it in an explicit async generator.
      logged_in_users = cls.all_logged_in()

      async def _connect_each() -> AsyncGenerator[Awaitable[None], None]:
          async for logged_in_user in logged_in_users:
              yield logged_in_user.try_connect()

      return _connect_each()
  178. # region Connection management
  179. async def is_logged_in(self) -> bool:
  180. return bool(self.client) and self._is_logged_in
  181. async def get_puppet(self) -> pu.Puppet | None:
  182. if not self.igpk:
  183. return None
  184. return await pu.Puppet.get_by_pk(self.igpk)
  185. async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None:
  186. if not self.igpk:
  187. return None
  188. portal = await po.Portal.find_private_chat(self.igpk, puppet.pk)
  189. if portal:
  190. return portal
  191. if create:
  192. # TODO add error handling somewhere
  193. thread = await self.client.create_group_thread([puppet.pk])
  194. portal = await po.Portal.get_by_thread(thread, self.igpk)
  195. await portal.update_info(thread, self)
  196. return portal
  197. return None
  async def try_connect(self) -> None:
      """
      Call ``connect()`` and never let an exception escape: failures are logged
      and reported as an UNKNOWN_ERROR bridge state carrying the error text.
      """
      try:
          await self.connect()
      except Exception as err:
          self.log.exception("Error while connecting to Instagram")
          error_info = {"python_error": str(err)}
          await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, info=error_info)
  206. @property
  207. def api_log(self) -> TraceLogger:
  208. return self.ig_base_log.getChild("http").getChild(self.mxid)
  209. @property
  210. def is_connected(self) -> bool:
  211. return bool(self.client) and bool(self.mqtt) and self._is_connected
  async def connect(self, user: CurrentUser | None = None) -> None:
      """
      Create the API and MQTT clients from the stored state and kick off syncing.

      :param user: Already-fetched current user info. When omitted, it is fetched
          with ``current_user()`` on the new API client.
      :raises IGCheckpointError: Re-raised after logging the error body.

      Without a stored state a ``logged-out`` BAD_CREDENTIALS bridge state is
      pushed and nothing else happens. A not-logged-in response logs the user
      out; a challenge or consent error is passed to ``_handle_checkpoint``. In
      both cases the method returns without connecting.
      """
      if not self.state:
          await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error="logged-out")
          return
      client = AndroidAPI(
          self.state,
          log=self.api_log,
          proxy_handler=self.proxy_handler,
      )
      if not user:
          try:
              resp = await client.current_user()
              user = resp.user
          except IGNotLoggedInError as e:
              self.log.warning(f"Failed to connect to Instagram: {e}, logging out")
              await self.logout(error=e)
              return
          except IGCheckpointError as e:
              self.log.debug("Checkpoint error content: %s", e.body)
              raise
          except (IGChallengeError, IGConsentRequiredError) as e:
              await self._handle_checkpoint(e, on="connect", client=client)
              return
      # Only publish the client on the instance once the login has been verified.
      self.client = client
      self._is_logged_in = True
      self.igpk = user.pk
      self.username = user.username
      await self.push_bridge_state(BridgeStateEvent.CONNECTING)
      self._track_metric(METRIC_LOGGED_IN, True)
      self.by_igpk[self.igpk] = self

      self.mqtt = AndroidMQTT(
          self.state,
          log=self.ig_base_log.getChild("mqtt").getChild(self.mxid),
          proxy_handler=self.proxy_handler,
      )
      self.mqtt.add_event_handler(Connect, self.on_connect)
      self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
      self.mqtt.add_event_handler(NewSequenceID, self.update_seq_id)
      self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
      self.mqtt.add_event_handler(ThreadSyncEvent, self.handle_thread_sync)
      self.mqtt.add_event_handler(ThreadRemoveEvent, self.handle_thread_remove)
      self.mqtt.add_event_handler(RealtimeDirectEvent, self.handle_rtd)
      self.mqtt.add_event_handler(ProxyUpdate, self.on_proxy_update)

      await self.update()

      # Fire-and-forget follow-ups. background_task.create keeps a reference to the
      # task until it finishes; a bare loop.create_task() result that nobody holds
      # may be garbage-collected before it completes.
      background_task.create(self._try_sync_puppet(user))
      background_task.create(self._post_connect())
  async def _post_connect(self):
      """
      Start the background work that follows a successful ``connect()``: the
      backfill request loop, the initial sync (or the listener directly when a
      sequence ID is already stored) and, if enabled, the thread backfill task.
      """
      # Backfill requests are handled synchronously so as not to overload the homeserver.
      # Users can configure their backfill stages to be more or less aggressive with backfilling
      # to try and avoid getting banned.
      loop_task = self._backfill_loop_task
      if not loop_task or loop_task.done():
          self._backfill_loop_task = asyncio.create_task(self._handle_backfill_requests_loop())

      if self.seq_id:
          self.log.debug("Connecting to MQTT directly as resync_on_startup is false")
          self.start_listen()
      else:
          await self._try_sync()

      if not self.config["bridge.backfill.enable"]:
          return

      # Only one thread backfill task may run at a time; replace any live one.
      running_sync = self._thread_sync_task
      if running_sync and not running_sync.done():
          self.log.warning("Cancelling existing background thread sync task")
          running_sync.cancel()
      self._thread_sync_task = asyncio.create_task(self.backfill_threads())
  async def _handle_backfill_requests_loop(self) -> None:
      """
      Process this user's queued backfill requests one at a time.

      Returns immediately unless both ``bridge.backfill.enable`` and
      ``bridge.backfill.msc2716`` are set. Otherwise it loops: wait on the sync
      lock, fetch the next request (sleeping 30 seconds when there is none) and
      run it. The loop ends when the user turns out to be logged out or hits a
      challenge/consent error; any other failure puts the request on a 60 second
      cooldown and the loop carries on.
      """
      if not self.config["bridge.backfill.enable"] or not self.config["bridge.backfill.msc2716"]:
          return
      while True:
          # Don't start a backfill while a thread sync is holding the lock.
          await self._sync_lock.wait("backfill request")
          req = await Backfill.get_next(self.mxid)
          if not req:
              # Nothing queued; poll again later.
              await asyncio.sleep(30)
              continue
          self.log.info("Backfill request %s", req)
          try:
              portal = await po.Portal.get_by_thread_id(
                  req.portal_thread_id, receiver=req.portal_receiver
              )
              await req.mark_dispatched()
              await portal.backfill(self, req)
              await req.mark_done()
          except IGNotLoggedInError as e:
              self.log.exception("User got logged out during backfill loop")
              await self.logout(error=e)
              break
          except (IGChallengeError, IGConsentRequiredError) as e:
              self.log.exception("User got a challenge during backfill loop")
              await self._handle_checkpoint(e, on="backfill")
              break
          except Exception as e:
              self.log.exception("Failed to backfill portal %s: %s", req.portal_thread_id, e)
              # Don't try again to backfill this portal for a minute.
              await req.set_cooldown_timeout(60)
      # Reached only via `break`: clear the handle so _post_connect() can start a new loop.
      self._backfill_loop_task = None
  async def on_connect(self, evt: Connect) -> None:
      """
      Handler registered for the MQTT ``Connect`` event: mark the user as
      connected, send a bridge notice and push the CONNECTED bridge state.
      """
      self.log.debug("Connected to Instagram")
      self._track_metric(METRIC_CONNECTED, True)
      self._is_connected = True
      await self.send_bridge_notice("Connected to Instagram")
      await self.push_bridge_state(BridgeStateEvent.CONNECTED)
  async def on_disconnect(self, evt: Disconnect) -> None:
      """
      Handler registered for the MQTT ``Disconnect`` event: update the connected
      metric and clear the connected flag. No bridge state is pushed here.
      """
      self.log.debug("Disconnected from Instagram")
      self._track_metric(METRIC_CONNECTED, False)
      self._is_connected = False
  314. async def on_proxy_update(self, evt: ProxyUpdate | None = None) -> None:
  315. if self.client:
  316. self.client.setup_http(self.state.cookies.jar)
  317. # TODO this stuff could probably be moved to mautrix-python
  318. async def get_notice_room(self) -> RoomID:
  319. if not self.notice_room:
  320. async with self._notice_room_lock:
  321. # If someone already created the room while this call was waiting,
  322. # don't make a new room
  323. if self.notice_room:
  324. return self.notice_room
  325. creation_content = {}
  326. if not self.config["bridge.federate_rooms"]:
  327. creation_content["m.federate"] = False
  328. self.notice_room = await self.az.intent.create_room(
  329. is_direct=True,
  330. invitees=[self.mxid],
  331. topic="Instagram bridge notices",
  332. creation_content=creation_content,
  333. )
  334. await self.update()
  335. return self.notice_room
  async def fill_bridge_state(self, state: BridgeState) -> None:
      """
      Fill in the remote ID and remote name of a bridge state.

      The remote ID is only set when the base implementation left it empty: the
      Instagram pk is preferred, then the user ID from the stored login state.
      It is set to None when neither is available.

      :param state: The bridge state to fill in place.
      """
      await super().fill_bridge_state(state)
      if not state.remote_id:
          if self.igpk:
              state.remote_id = str(self.igpk)
          elif self.state:
              try:
                  state.remote_id = self.state.user_id
              except IGUserIDNotFoundError:
                  state.remote_id = None
          else:
              # Bridge states are also pushed for users without a stored login
              # state (e.g. "logged-out" from connect()); there is nothing to
              # read an ID from in that case.
              state.remote_id = None
      if self.username:
          state.remote_name = f"@{self.username}"
  348. async def get_bridge_states(self) -> list[BridgeState]:
  349. if not self.state:
  350. return []
  351. state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
  352. if self.is_connected:
  353. state.state_event = BridgeStateEvent.CONNECTED
  354. elif self._is_refreshing or self.mqtt:
  355. state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
  356. return [state]
  async def send_bridge_notice(
      self,
      text: str,
      edit: EventID | None = None,
      state_event: BridgeStateEvent | None = None,
      important: bool = False,
      error_code: str | None = None,
      error_message: str | None = None,
      info: dict | None = None,
  ) -> EventID | None:
      """
      Optionally push a bridge state, then send ``text`` to the notice room.

      :param text: Notice body. Also used as the bridge state message when no
          ``error_code`` is given.
      :param edit: Event ID of an earlier notice to edit instead of posting anew.
      :param state_event: When set, a bridge state with this event is pushed first.
      :param important: Important notices are sent as text messages rather than
          notices and bypass the ``bridge.unimportant_bridge_notices`` switch.
      :param error_code: Error code for the bridge state.
      :param error_message: Bridge state message used when ``error_code`` is set.
      :param info: Extra info for the bridge state.
      :return: ``edit`` if given, otherwise the ID of the sent event; None when
          the notice was suppressed or sending failed.
      """
      if state_event:
          state_message = error_message if error_code else text
          await self.push_bridge_state(
              state_event,
              error=error_code,
              message=state_message,
              info=info,
          )

      if self.config["bridge.disable_bridge_notices"]:
          return None
      if not (important or self.config["bridge.unimportant_bridge_notices"]):
          self.log.debug("Not sending unimportant bridge notice: %s", text)
          return None

      sent_event_id = None
      try:
          self.log.debug("Sending bridge notice: %s", text)
          msgtype = MessageType.TEXT if important else MessageType.NOTICE
          content = TextMessageEventContent(body=text, msgtype=msgtype)
          if edit:
              content.set_edit(edit)
          # This is locked to prevent notices going out in the wrong order
          async with self._notice_send_lock:
              notice_room_id = await self.get_notice_room()
              sent_event_id = await self.az.intent.send_message(notice_room_id, content)
      except Exception:
          # Best effort: a failed notice must not break the caller.
          self.log.warning("Failed to send bridge notice", exc_info=True)
      return edit or sent_event_id
  async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
      """
      Update this user's own puppet from ``user_info`` and, when possible, switch
      it to the user's Matrix account automatically.

      Both steps are best effort: each failure is logged and swallowed so that
      one failing does not prevent the other.

      :param user_info: Current user info as returned by Instagram.
      """
      puppet = await pu.Puppet.get_by_pk(self.igpk)
      try:
          await puppet.update_info(user_info, self)
      except Exception:
          self.log.exception("Failed to update own puppet info")
      try:
          if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
              # Plain string: the original used an f-string with no placeholders.
              self.log.info("Automatically enabling custom puppet")
              await puppet.switch_mxid(access_token="auto", mxid=self.mxid)
      except Exception:
          self.log.exception("Failed to automatically enable custom puppet")
  async def _try_sync(self) -> None:
      """
      Run ``sync()`` and swallow any exception: it is logged and reported as an
      UNKNOWN_ERROR bridge state carrying the error text.
      """
      try:
          await self.sync()
      except Exception as err:
          self.log.exception("Exception while syncing")
          if isinstance(err, IGCheckpointError):
              self.log.debug("Checkpoint error content: %s", err.body)
          failure_info = {"python_error": str(err)}
          await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, info=failure_info)
  async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
      """
      Map the Matrix ID of each private chat partner to a one-element list with
      the portal room, skipping portals that have no Matrix room.
      """
      direct_chats: dict[UserID, list[RoomID]] = {}
      for private_portal in await DBPortal.find_private_chats_of(self.igpk):
          if not private_portal.mxid:
              continue
          partner_mxid = pu.Puppet.get_mxid_from_id(private_portal.other_user_pk)
          direct_chats[partner_mxid] = [private_portal.mxid]
      return direct_chats
  async def refresh(self, resync: bool = True) -> None:
      """
      Stop listening, reset the pigeon session ID and reconnect.

      :param resync: When true, call ``sync()`` and keep retrying until it
          succeeds; when false, only restart the listener.

      While resyncing, every failure is logged and pushed as an UNKNOWN_ERROR
      bridge state. The first four failures wait one minute before retrying;
      from then on the wait grows by one minute per failure, up to ten minutes.
      """
      # Flag read by get_bridge_states() to report a transient disconnect.
      self._is_refreshing = True
      try:
          await self.stop_listen()
          self.state.reset_pigeon_session_id()
          if resync:
              retry_count = 0
              minutes = 1
              while True:
                  try:
                      await self.sync()
                      return
                  except Exception as e:
                      if retry_count >= 4 and minutes < 10:
                          minutes += 1
                      retry_count += 1
                      s = "s" if minutes != 1 else ""
                      self.log.exception(
                          f"Error while syncing for refresh, retrying in {minutes} minute{s}"
                      )
                      if isinstance(e, IGCheckpointError):
                          self.log.debug("Checkpoint error content: %s", e.body)
                      await self.push_bridge_state(
                          BridgeStateEvent.UNKNOWN_ERROR,
                          error="unknown-error",
                          message="An unknown error occurred while connecting to Instagram",
                          info={"python_error": str(e)},
                      )
                      await asyncio.sleep(minutes * 60)
          else:
              self.start_listen()
      finally:
          self._is_refreshing = False
          # NOTE(review): the indentation of this file was lost in transit; this call
          # is assumed to belong to the `finally` block -- confirm against the
          # original source.
          self.proxy_handler.update_proxy_url()
  async def _handle_checkpoint(
      self,
      e: IGChallengeError | IGConsentRequiredError,
      on: str,
      client: AndroidAPI | None = None,
  ) -> None:
      """
      React to a challenge or consent-required error from Instagram.

      Drops the API and MQTT clients from the instance and pushes a
      BAD_CREDENTIALS bridge state. For consent errors the state carries the
      serialized error body. For challenges, the challenge is reset through the
      API and the response details are attached as info.

      :param e: The error that was raised.
      :param on: Short description of the operation that failed, used in the log.
      :param client: API client to use for the challenge reset; defaults to
          ``self.client`` as it was before being cleared.
      """
      self.log.warning(f"Got checkpoint error on {on}: {e.body.serialize()}")
      # Keep a local reference before clearing the instance attributes.
      client = client or self.client
      self.client = None
      self.mqtt = None
      if isinstance(e, IGConsentRequiredError):
          await self.push_bridge_state(
              BridgeStateEvent.BAD_CREDENTIALS,
              error="ig-consent-required",
              info=e.body.serialize(),
          )
          return
      error_code = "ig-checkpoint"
      try:
          resp = await client.challenge_reset()
          info = {
              "challenge_context": (
                  resp.challenge_context.serialize() if resp.challenge_context_str else None
              ),
              "step_name": resp.step_name,
              "step_data": resp.step_data.serialize() if resp.step_data else None,
              "user_id": resp.user_id,
              "action": resp.action,
              "status": resp.status,
              "challenge": e.body.challenge.serialize() if e.body.challenge else None,
          }
          self.log.debug(f"Challenge state: {resp.serialize()}")
          if resp.challenge_context.challenge_type_enum == "HACKED_LOCK":
              error_code = "ig-checkpoint-locked"
      except Exception:
          # Any failure above (including a missing client) falls back to reporting
          # only the challenge from the original error.
          self.log.exception("Error resetting challenge state")
          info = {"challenge": e.body.challenge.serialize() if e.body.challenge else None}
      await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error=error_code, info=info)
  async def _sync_thread(self, thread: Thread) -> bool:
      """
      Sync a specific thread. Returns whether the thread had messages after the last message in
      the database before the sync.

      Creates or updates the Matrix room, then (when ``bridge.backfill.enable_initial``
      is set) forward-backfills the messages that are newer than the last bridged
      one, fetching further pages as long as every message of a page is new.
      Returns True without backfilling when initial backfill is disabled, and False
      when nothing has been bridged yet and the portal has no first event ID.

      NOTE(review): the indentation of this file was lost in transit; the nesting
      below was reconstructed from the syntax -- confirm against the original source.
      """
      self.log.debug(f"Syncing thread {thread.thread_id}")

      forward_messages = thread.items

      assert self.client
      portal = await po.Portal.get_by_thread(thread, self.igpk)
      assert portal

      # Create or update the Matrix room
      if not portal.mxid:
          await portal.create_matrix_room(self, thread)
      else:
          await portal.update_matrix_room(self, thread)

      if not self.config["bridge.backfill.enable_initial"]:
          return True

      last_message = await DBMessage.get_last(portal.mxid)
      cursor = thread.oldest_cursor
      if last_message:
          original_number_of_messages = len(thread.items)
          new_messages = [
              m for m in thread.items if last_message.ig_timestamp_ms < m.timestamp_ms
          ]
          forward_messages = new_messages

          portal.log.debug(
              f"{len(new_messages)}/{original_number_of_messages} messages are after most recent"
              " message."
          )

          # Fetch more messages until we get back to messages that have been bridged already.
          while len(new_messages) > 0 and len(new_messages) == original_number_of_messages:
              await asyncio.sleep(self.config["bridge.backfill.incremental.page_delay"])

              portal.log.debug("Fetching more messages for forward backfill")
              resp = await self.client.get_thread(portal.thread_id, cursor=cursor)
              if len(resp.thread.items) == 0:
                  break
              original_number_of_messages = len(resp.thread.items)
              new_messages = [
                  m for m in resp.thread.items if last_message.ig_timestamp_ms < m.timestamp_ms
              ]
              # Newly fetched messages are prepended to the ones collected so far.
              forward_messages = new_messages + forward_messages
              cursor = resp.thread.oldest_cursor
              portal.log.debug(
                  f"{len(new_messages)}/{original_number_of_messages} messages are after most "
                  "recent message."
              )
      elif not portal.first_event_id:
          self.log.debug(
              f"Skipping backfilling {portal.thread_id} as the first event ID is not known"
          )
          return False

      if forward_messages:
          portal.cursor = cursor
          await portal.update()

          # Mark as read when the thread's read_state is 0, or when the first
          # forward message is older than the configured threshold (if positive).
          mark_read = thread.read_state == 0 or (
              (hours := self.config["bridge.backfill.unread_hours_threshold"]) > 0
              and (
                  datetime.fromtimestamp(forward_messages[0].timestamp_ms / 1000)
                  < datetime.now() - timedelta(hours=hours)
              )
          )
          # The page is handed over in reversed order.
          base_insertion_event_id = await portal.backfill_message_page(
              self,
              list(reversed(forward_messages)),
              forward=True,
              last_message=last_message,
              mark_read=mark_read,
          )
          if (
              not self.bridge.homeserver_software.is_hungry
              and self.config["bridge.backfill.msc2716"]
          ):
              await portal.send_post_backfill_dummy(
                  forward_messages[0].timestamp, base_insertion_event_id=base_insertion_event_id
              )
          if (
              mark_read
              and not self.bridge.homeserver_software.is_hungry
              and (puppet := await self.get_puppet())
          ):
              # Re-read the last message: the backfill above may have added newer ones.
              last_message = await DBMessage.get_last(portal.mxid)
              if last_message:
                  await puppet.intent_for(portal).mark_read(portal.mxid, last_message.mxid)

          await portal._update_read_receipts(thread.last_seen_at)

      if self.config["bridge.backfill.msc2716"]:
          await portal.enqueue_immediate_backfill(self, 1)
      return len(forward_messages) > 0
  async def _maybe_update_proxy(self, source: str) -> None:
      """
      Switch to a fresh proxy URL and re-run the proxy update handler, but only
      when no listen task is set.

      :param source: Description of the caller, included in the debug log when
          the update is skipped.
      """
      if self._listen_task:
          self.log.debug(f"Not updating proxy: listen_task is still running? (caller: {source})")
          return
      self.proxy_handler.update_proxy_url()
      await self.on_proxy_update()
  async def sync(self, increment_total_backfilled_portals: bool = False) -> None:
      """
      Run ``_sync`` through ``run_with_sync_lock``.

      :param increment_total_backfilled_portals: Passed through to ``_sync``.
      """
      locked_sync = partial(self._sync, increment_total_backfilled_portals)
      await self.run_with_sync_lock(locked_sync)
  async def _sync(self, increment_total_backfilled_portals: bool = False) -> None:
      """
      Fetch the inbox, store the new sequence ID and sync the most recent threads.

      The inbox request is retried on retryable proxy errors (waiting 10 more
      seconds per failure, capped at 60) and on rate limits (waiting 2 minutes,
      then 2 more after each further rate limit). A not-logged-in error logs the
      user out and a challenge/consent error is passed to ``_handle_checkpoint``;
      both end the sync early.

      :param increment_total_backfilled_portals: Passed through to
          ``_sync_threads_with_delay``.
      :raises IGCheckpointError: Re-raised after logging the error body.
      """
      if not self._listen_task:
          self.state.reset_pigeon_session_id()
      sleep_minutes = 2
      errors = 0
      while True:
          try:
              resp = await self.client.get_inbox()
              break
          except RETRYABLE_PROXY_EXCEPTIONS as e:
              errors += 1
              wait = min(errors * 10, 60)
              self.log.warning(
                  f"{e.__class__.__name__} while trying to sync, retrying in {wait} seconds: {e}"
              )
              await asyncio.sleep(wait)
              # From the second failure on, also try switching to another proxy.
              if errors > 1:
                  await self._maybe_update_proxy("sync error")
          except IGNotLoggedInError as e:
              self.log.exception("Got not logged in error while syncing")
              await self.logout(error=e)
              return
          except IGRateLimitError as e:
              self.log.error(
                  "Got ratelimit error while trying to get inbox (%s), retrying in %d minutes",
                  e.body,
                  sleep_minutes,
              )
              await self.push_bridge_state(
                  BridgeStateEvent.TRANSIENT_DISCONNECT, error="ig-rate-limit"
              )
              await asyncio.sleep(sleep_minutes * 60)
              sleep_minutes += 2
          except IGCheckpointError as e:
              self.log.debug("Checkpoint error content: %s", e.body)
              raise
          except (IGChallengeError, IGConsentRequiredError) as e:
              await self._handle_checkpoint(e, on="sync")
              return

      self.seq_id = resp.seq_id
      self.snapshot_at_ms = resp.snapshot_at_ms
      await self.save_seq_id()

      if not self._listen_task:
          self.start_listen(is_after_sync=True)

      sync_count = min(
          self.config["bridge.backfill.max_conversations"],
          self.config["bridge.max_startup_thread_sync_count"],
      )
      self.log.debug(f"Fetching {sync_count} threads, 20 at a time...")

      # Zero means "sync no threads"; a negative count means "no limit".
      local_limit: int | None = sync_count
      if sync_count == 0:
          return
      elif sync_count < 0:
          local_limit = None

      await self._sync_threads_with_delay(
          self.client.iter_inbox(
              self._update_seq_id_and_cursor, start_at=resp, local_limit=local_limit
          ),
          stop_when_threads_have_no_messages_to_backfill=True,
          increment_total_backfilled_portals=increment_total_backfilled_portals,
          local_limit=local_limit,
      )

      # Best effort: a failure here must not fail the whole sync.
      try:
          await self.update_direct_chats()
      except Exception:
          self.log.exception("Error updating direct chat list")
  async def backfill_threads(self):
      """
      Run ``_backfill_threads`` through ``run_with_sync_lock``, logging any
      exception instead of raising it (this coroutine is started as a
      background task by ``_post_connect``).
      """
      try:
          await self.run_with_sync_lock(self._backfill_threads)
      except Exception:
          self.log.exception("Error in thread backfill loop")
  async def _backfill_threads(self):
      """
      Continue syncing older inbox threads from where the last run stopped.

      Does nothing when backfill is disabled, when the configured
      ``max_conversations`` count has been reached, or when the thread sync is
      already marked as completed. A negative ``max_conversations`` means there
      is no limit.
      """
      assert self.client
      if not self.config["bridge.backfill.enable"]:
          return

      max_conversations = self.config["bridge.backfill.max_conversations"] or 0
      already_backfilled = self.total_backfilled_portals or 0
      if 0 <= max_conversations <= already_backfilled:
          self.log.info("Backfill max_conversations count reached, not syncing any more portals")
          return
      if self.thread_sync_completed:
          self.log.debug("Thread backfill is marked as completed, not syncing more portals")
          return

      if max_conversations >= 0:
          local_limit = max_conversations - already_backfilled
      else:
          local_limit = None

      # With a stored cursor, build an empty placeholder response so that the
      # inbox iteration resumes from that cursor.
      start_at = None
      if self.oldest_cursor:
          resume_inbox = DMInbox(
              threads=[],
              has_older=True,
              unseen_count=0,
              unseen_count_ts=0,
              blended_inbox_enabled=False,
              oldest_cursor=self.oldest_cursor,
          )
          start_at = DMInboxResponse(
              status="",
              seq_id=self.seq_id,
              snapshot_at_ms=0,
              pending_requests_total=0,
              has_pending_top_requests=False,
              viewer=None,
              inbox=resume_inbox,
          )

      backoff = self.config.get("bridge.backfill.backoff.thread_list", 300)
      inbox_threads = self.client.iter_inbox(
          self._update_seq_id_and_cursor,
          start_at=start_at,
          local_limit=local_limit,
          rate_limit_exceeded_backoff=backoff,
      )
      await self._sync_threads_with_delay(
          inbox_threads,
          increment_total_backfilled_portals=True,
          local_limit=local_limit,
      )
      await self.update_direct_chats()
  705. def _update_seq_id_and_cursor(self, seq_id: int, cursor: str | None):
  706. self.seq_id = seq_id
  707. if cursor:
  708. self.oldest_cursor = cursor
  async def _sync_threads_with_delay(
      self,
      threads: AsyncIterable[Thread],
      increment_total_backfilled_portals: bool = False,
      stop_when_threads_have_no_messages_to_backfill: bool = False,
      local_limit: int | None = None,
  ):
      """Sync each thread from ``threads``, spacing the syncs out in time.

      Args:
          threads: Async iterable of threads to sync, in order.
          increment_total_backfilled_portals: When true, bump
              ``total_backfilled_portals`` and persist after every thread.
          stop_when_threads_have_no_messages_to_backfill: When true, return
              as soon as ``_sync_thread`` reports a falsy result for a thread.
          local_limit: Number of threads the caller asked for, or None for
              no limit. Seeing fewer threads than this (or having no limit)
              marks the thread sync as completed.
      """
      min_gap = self.config["bridge.backfill.min_sync_thread_delay"]
      previous_sync_at = 0.0
      seen = 0
      async for thread in threads:
          seen += 1

          # Keep at least ``min_gap`` seconds between consecutive thread syncs.
          earliest_next = previous_sync_at + min_gap
          now = time.monotonic()
          if now < earliest_next:
              delay = earliest_next - now
              self.log.debug("Thread sync is happening too quickly. Waiting for %ds", delay)
              await asyncio.sleep(delay)
          previous_sync_at = time.monotonic()

          had_new_messages = await self._sync_thread(thread)
          if stop_when_threads_have_no_messages_to_backfill and not had_new_messages:
              self.log.debug("Got to threads with no new messages. Stopping sync.")
              return

          if increment_total_backfilled_portals:
              self.total_backfilled_portals = (self.total_backfilled_portals or 0) + 1
              await self.update()

      # The iterator ran dry: decide whether that means there is nothing left.
      if local_limit is None:
          self.log.info(
              "Reached end of thread list with no limit, marking thread sync as completed"
          )
          self.thread_sync_completed = True
      elif seen < local_limit:
          self.log.info(
              f"Reached end of thread list (got {seen} with "
              f"limit {local_limit}), marking thread sync as completed"
          )
          self.thread_sync_completed = True
      # Persist in every case; presumably the seq_id/cursor fields were also
      # updated while iterating - TODO confirm against the iterator callback.
      await self.update()
  async def run_with_sync_lock(self, func: Callable[[], Awaitable]):
      """Run ``func`` while holding the sync lock, retrying on failure.

      ``func`` is awaited up to five times. A successful call ends the loop.
      ``IGNotLoggedInError`` logs the user out (passing the error on) and
      stops retrying. Any other ``Exception`` is logged; between attempts
      the coroutine sleeps for 30 seconds. After the final failed attempt no
      sleep happens, since nothing would follow it and the lock would only
      be held longer.

      Args:
          func: Zero-argument callable returning an awaitable.
      """
      max_attempts = 5
      retry_delay = 30
      with self._sync_lock:
          for attempt in range(1, max_attempts + 1):
              try:
                  await func()
              except IGNotLoggedInError as e:
                  await self.logout(error=e)
                  return
              except Exception:
                  if attempt == max_attempts:
                      # Last attempt: there is no retry to wait for.
                      self.log.exception("Failed to sync threads.")
                      break
                  self.log.exception(
                      "Failed to sync threads. Waiting 30 seconds before retrying sync."
                  )
                  await asyncio.sleep(retry_delay)
              else:
                  # The sync was successful. Exit the loop.
                  return

          # If we get here, it means that the sync has failed five times. If this happens, most
          # likely something very bad has happened.
          self.log.error("Failed to sync threads five times. Will not retry.")
  def start_listen(self, is_after_sync: bool = False) -> None:
      """Schedule the listener coroutine on the event loop.

      Clears the ``shutdown`` flag, builds ``_listen`` with the currently
      stored ``seq_id`` / ``snapshot_at_ms`` and stores the created task in
      ``_listen_task``.

      Args:
          is_after_sync: Forwarded unchanged to ``_listen``.
      """
      self.shutdown = False
      listener = self._listen(
          seq_id=self.seq_id,
          snapshot_at_ms=self.snapshot_at_ms,
          is_after_sync=is_after_sync,
      )
      self._listen_task = self.loop.create_task(listener)
  async def delayed_start_listen(self, sleep: int) -> None:
      """Wait ``sleep`` seconds, then start listening unless already connected.

      Args:
          sleep: Delay in seconds before the connection state is checked.
      """
      await asyncio.sleep(sleep)
      if not self.is_connected:
          self.log.debug("Reconnecting after MQTT connection error")
          self.start_listen()
          return
      # Something else reconnected while this coroutine was sleeping.
      self.log.debug("Already reconnected before delay after MQTT reconnection error finished")
  781. async def fetch_user_and_reconnect(self, sleep_first: int | None = None) -> None:
  782. if sleep_first:
  783. await asyncio.sleep(sleep_first)
  784. if self.is_connected:
  785. self.log.debug("Canceling user fetch, already reconnected")
  786. return
  787. self.log.debug("Refetching current user after disconnection")
  788. errors = 0
  789. while True:
  790. try:
  791. resp = await self.client.current_user()
  792. except RETRYABLE_PROXY_EXCEPTIONS as e:
  793. errors += 1
  794. wait = min(errors * 10, 60)
  795. self.log.warning(
  796. f"{e.__class__.__name__} while trying to check user for reconnection, "
  797. f"retrying in {wait} seconds: {e}"
  798. )
  799. await asyncio.sleep(wait)
  800. if errors > 1:
  801. await self._maybe_update_proxy("fetch_user_and_reconnect error")
  802. except IGNotLoggedInError as e:
  803. self.log.warning(f"Failed to reconnect to Instagram: {e}, logging out")
  804. await self.logout(error=e)
  805. return
  806. except (IGChallengeError, IGConsentRequiredError) as e:
  807. await self._handle_checkpoint(e, on="reconnect")
  808. return
  809. except Exception as e:
  810. self.log.exception("Error while reconnecting to Instagram")
  811. if isinstance(e, IGCheckpointError):
  812. self.log.debug("Checkpoint error content: %s", e.body)
  813. await self.push_bridge_state(
  814. BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
  815. )
  816. return
  817. else:
  818. self.log.debug(f"Confirmed current user {resp.user.pk}")
  819. self.start_listen()
  820. return
  async def _listen(self, seq_id: int, snapshot_at_ms: int, is_after_sync: bool) -> None:
      """Run the MQTT listener until it stops, then react to how it stopped.

      Args:
          seq_id: Sequence ID handed to ``mqtt.listen``.
          snapshot_at_ms: Snapshot timestamp handed to ``mqtt.listen``.
          is_after_sync: Whether this listen attempt directly follows a
              refresh; decides how an ``IrisSubscribeError`` is handled.

      Whatever happens, the ``finally`` clause clears ``_listen_task``, marks
      the user as disconnected and updates the connected metric.
      """
      try:
          await self.mqtt.listen(
              graphql_subs={
                  GraphQLSubscription.app_presence(),
                  GraphQLSubscription.direct_typing(self.state.user_id),
                  GraphQLSubscription.direct_status(),
              },
              skywalker_subs={
                  SkywalkerSubscription.direct_sub(self.state.user_id),
                  SkywalkerSubscription.live_sub(self.state.user_id),
              },
              seq_id=seq_id,
              snapshot_at_ms=snapshot_at_ms,
          )
      except IrisSubscribeError as e:
          if not is_after_sync:
              # First failure: try a refresh in the background.
              self.log.warning(f"Got IrisSubscribeError {e}, refreshing...")
              background_task.create(self.refresh())
          else:
              # Failed again right after a refresh: report it instead of looping.
              self.log.exception("Got IrisSubscribeError right after refresh")
              await self.send_bridge_notice(
                  f"Reconnecting failed again after refresh: {e}",
                  important=True,
                  state_event=BridgeStateEvent.UNKNOWN_ERROR,
                  error_code="ig-refresh-connection-error",
                  error_message=str(e),
                  info={"python_error": str(e)},
              )
      except MQTTReconnectionError as e:
          self.log.warning(
              f"Unexpected connection error: {e}, reconnecting in 1 minute", exc_info=True
          )
          await self.send_bridge_notice(
              f"Error in listener: {e}",
              important=True,
              state_event=BridgeStateEvent.TRANSIENT_DISCONNECT,
              error_code="ig-connection-error-socket",
          )
          self.mqtt.disconnect()
          background_task.create(self.delayed_start_listen(sleep=60))
      except (MQTTNotConnected, MQTTNotLoggedIn, MQTTConnectionUnauthorized) as e:
          self.log.warning(f"Unexpected connection error: {e}, checking auth and reconnecting")
          await self.send_bridge_notice(
              f"Error in listener: {e}",
              important=True,
              state_event=BridgeStateEvent.UNKNOWN_ERROR,
              error_code="ig-connection-error-maybe-auth",
          )
          self.mqtt.disconnect()
          background_task.create(self.fetch_user_and_reconnect())
      except Exception as e:
          self.log.exception("Fatal error in listener, reconnecting in 5 minutes")
          await self.send_bridge_notice(
              "Fatal error in listener (see logs for more info)",
              state_event=BridgeStateEvent.UNKNOWN_ERROR,
              important=True,
              error_code="ig-unknown-connection-error",
              info={"python_error": str(e)},
          )
          self.mqtt.disconnect()
          background_task.create(self.fetch_user_and_reconnect(sleep_first=300))
      else:
          # The listener returned normally; only a problem if nobody asked for it.
          if not self.shutdown:
              await self.send_bridge_notice(
                  "Instagram connection closed without error",
                  state_event=BridgeStateEvent.UNKNOWN_ERROR,
                  error_code="ig-disconnected",
              )
      finally:
          self._listen_task = None
          self._is_connected = False
          self._track_metric(METRIC_CONNECTED, False)
  async def stop_listen(self) -> None:
      """Disconnect the MQTT client, wait for the listener and persist state.

      ``shutdown`` is set only for the duration of the disconnect, so the
      listener can tell a requested stop from an unexpected one. The
      connected metric and flag are reset and ``update()`` is awaited whether
      or not an MQTT client exists.
      """
      mqtt = self.mqtt
      if mqtt:
          self.shutdown = True
          mqtt.disconnect()
          listener = self._listen_task
          if listener:
              await listener
          self.shutdown = False

      self._track_metric(METRIC_CONNECTED, False)
      self._is_connected = False
      await self.update()
  def stop_backfill_tasks(self) -> None:
      """Cancel the backfill loop and thread sync tasks, if any are set.

      Each task attribute that holds a truthy value is cancelled and then
      reset to None; falsy attributes are left untouched.
      """
      loop_task = self._backfill_loop_task
      if loop_task:
          loop_task.cancel()
          self._backfill_loop_task = None

      sync_task = self._thread_sync_task
      if sync_task:
          sync_task.cancel()
          self._thread_sync_task = None
  async def logout(self, error: IGNotLoggedInError | None = None) -> None:
      """Tear down the Instagram session and reset the stored login state.

      Args:
          error: The auth error that forced the logout, or None when the
              logout is voluntary. With None, the client is asked to log out
              remotely, a ``LOGGED_OUT`` bridge state is pushed and the
              user's ``igpk`` mapping is dropped. With an error, the remote
              logout is skipped and a ``BAD_CREDENTIALS`` notice is sent.
      """
      voluntary = error is None

      await self.stop_listen()
      self.stop_backfill_tasks()
      if self.client and voluntary:
          try:
              await self.client.logout(one_tap_app_login=False)
          except Exception:
              # Best effort: local cleanup below happens either way.
              self.log.debug("Exception logging out", exc_info=True)
      if self.mqtt:
          self.mqtt.disconnect()
      self._track_metric(METRIC_CONNECTED, False)
      self._track_metric(METRIC_LOGGED_IN, False)

      if voluntary:
          await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
          puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
          if puppet and puppet.is_real_user:
              await puppet.switch_mxid(None, None)
          # Forget the cache entry; a missing key is not an error.
          self.by_igpk.pop(self.igpk, None)
          self.igpk = None
      else:
          self.log.debug("Auth error body: %s", error.body.serialize())
          await self.send_bridge_notice(
              f"You have been logged out of Instagram: {error.proper_message}",
              important=True,
              state_event=BridgeStateEvent.BAD_CREDENTIALS,
              error_code="ig-auth-error",
              error_message=error.proper_message,
          )

      # Reset all session-bound fields and persist the result.
      self.client = None
      self.mqtt = None
      self.state = None
      self.seq_id = None
      self.snapshot_at_ms = None
      self.thread_sync_completed = False
      self._is_logged_in = False
      await self.update()
  950. # endregion
  951. # region Event handlers
  async def _save_seq_id_after_sleep(self) -> None:
      """Sleep for 120 seconds, then save the current sequence ID.

      ``_seq_id_save_task`` is cleared after the sleep and before saving.
      A failure in ``save_seq_id`` is logged and not re-raised.
      """
      await asyncio.sleep(120)
      self._seq_id_save_task = None
      self.log.trace("Saving sequence ID %d/%d", self.seq_id, self.snapshot_at_ms)
      try:
          await self.save_seq_id()
      except Exception:
          self.log.exception("Error saving sequence ID")
  async def update_seq_id(self, evt: NewSequenceID) -> None:
      """Store the new sequence ID and make sure a delayed save is scheduled.

      Args:
          evt: Event carrying the new ``seq_id`` and ``snapshot_at_ms``.

      At most one save task is kept pending: a new one is created only when
      none exists or the previous one has finished.
      """
      self.seq_id = evt.seq_id
      self.snapshot_at_ms = evt.snapshot_at_ms

      pending = self._seq_id_save_task
      if pending and not pending.done():
          self.log.trace("Not starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
          return

      self.log.trace("Starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
      self._seq_id_save_task = asyncio.create_task(self._save_seq_id_after_sleep())
  @async_time(METRIC_MESSAGE)
  async def handle_message(self, evt: MessageSyncEvent) -> None:
      """Route an incoming message sync event to the portal of its thread.

      If no portal with a Matrix room exists for the thread yet, the thread
      is fetched and a room is created first; the event is dropped when that
      still leaves the portal without a room. Reactions are handled before
      the add/remove/replace dispatch.
      """
      message = evt.message
      portal = await po.Portal.get_by_thread_id(message.thread_id, receiver=self.igpk)
      if not portal or not portal.mxid:
          self.log.debug("Got message in thread with no portal, getting info...")
          resp = await self.client.get_thread(message.thread_id)
          portal = await po.Portal.get_by_thread(resp.thread, self.igpk)
          self.log.debug("Got info for unknown portal, creating room")
          await portal.create_matrix_room(self, resp.thread)
          if not portal.mxid:
              self.log.warning(
                  "Room creation appears to have failed, "
                  f"dropping message in {message.thread_id}"
              )
              return

      self.log.trace(f"Received message sync event {message}")
      if message.new_reaction:
          await portal.handle_instagram_reaction(
              message, remove=message.op == Operation.REMOVE
          )
          return

      sender = await pu.Puppet.get_by_pk(message.user_id) if message.user_id else None
      operation = message.op
      if operation == Operation.ADD:
          if not sender:
              # I don't think we care about adds with no sender
              return
          await portal.handle_instagram_item(self, sender, message)
      elif operation == Operation.REMOVE:
          # Removes don't have a sender, only the message sender can unsend messages anyway
          await portal.handle_instagram_remove(message.item_id)
      elif operation == Operation.REPLACE:
          await portal.handle_instagram_update(message)
  @async_time(METRIC_THREAD_SYNC)
  async def handle_thread_sync(self, evt: ThreadSyncEvent) -> None:
      """Apply a thread sync event to the matching portal.

      An existing room is updated. Without a room, one is created only for
      group threads; DM threads without a room are ignored.
      """
      self.log.trace("Thread sync event content: %s", evt)
      portal = await po.Portal.get_by_thread(evt, receiver=self.igpk)

      if portal.mxid:
          self.log.debug("Got thread sync event for %s with existing portal", portal.thread_id)
          await portal.update_matrix_room(self, evt)
          return

      if not evt.is_group:
          self.log.debug(
              "Got thread sync event for DM %s without existing portal, ignoring",
              portal.thread_id,
          )
          return

      self.log.debug(
          "Got thread sync event for group %s without existing portal, creating room",
          portal.thread_id,
      )
      await portal.create_matrix_room(self, evt)
  async def handle_thread_remove(self, evt: ThreadRemoveEvent) -> None:
      """Log a thread remove event; nothing else is done with it here."""
      serialized = evt.serialize()
      self.log.debug("Got thread remove event: %s", serialized)
  @async_time(METRIC_RTD)
  async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
      """Bridge a realtime activity-indicator (typing) event to Matrix.

      Ignored are events whose value is not ``ActivityIndicatorData``, events
      that have already expired, and events whose indicator ID was already
      seen. Seen IDs are remembered together with their expiry, and expired
      entries are pruned whenever a new ID is stored so that the map cannot
      grow without bound.
      """
      if not isinstance(evt.value, ActivityIndicatorData):
          return

      now = int(time.time() * 1000)
      # presumably ttl is in milliseconds like timestamp_ms - TODO confirm
      expiry = evt.value.timestamp_ms + evt.value.ttl
      if expiry < now:
          return

      seen = self._activity_indicator_ids
      if evt.activity_indicator_id in seen:
          return
      # Drop entries that are past their expiry before remembering this one.
      for stale_id in [key for key, expires_at in seen.items() if expires_at < now]:
          del seen[stale_id]
      seen[evt.activity_indicator_id] = expiry

      puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
      portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
      if not puppet or not portal or not portal.mxid:
          return

      is_typing = evt.value.activity_status != TypingStatus.OFF
      if puppet.pk == self.igpk:
          self.remote_typing_status = TypingStatus.TEXT if is_typing else TypingStatus.OFF
      # NOTE(review): is_typing is not forwarded here, so an OFF status still
      # sends the ttl as timeout - confirm that this is intended.
      await puppet.intent_for(portal).set_typing(portal.mxid, timeout=evt.value.ttl)
  1041. # endregion
  1042. # region Database getters
  1043. def _add_to_cache(self) -> None:
  1044. self.by_mxid[self.mxid] = self
  1045. if self.igpk:
  1046. self.by_igpk[self.igpk] = self
  @classmethod
  @async_getter_lock
  async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
      """Look up a user by Matrix ID: cache first, then the parent-class getter.

      Args:
          mxid: Matrix user ID to look up.
          create: When true and no user is found, insert and return a new one.

      Returns:
          The user, or None when ``mxid`` belongs to a ghost or when nothing
          was found and ``create`` is false.
      """
      # Never allow ghosts to be users
      if pu.Puppet.get_id_from_mxid(mxid):
          return None

      try:
          return cls.by_mxid[mxid]
      except KeyError:
          pass

      stored = cast(cls, await super().get_by_mxid(mxid))
      if stored is not None:
          stored._add_to_cache()
          return stored

      if not create:
          return None

      fresh = cls(mxid)
      await fresh.insert()
      fresh._add_to_cache()
      return fresh
  @classmethod
  @async_getter_lock
  async def get_by_igpk(cls, igpk: int) -> User | None:
      """Look up a user by Instagram primary key: cache first, then the parent-class getter.

      Returns:
          The cached or loaded user, or None when no such user is found.
          A loaded user is added to the caches before being returned.
      """
      try:
          return cls.by_igpk[igpk]
      except KeyError:
          pass

      stored = cast(cls, await super().get_by_igpk(igpk))
      if stored is None:
          return None
      stored._add_to_cache()
      return stored
  @classmethod
  async def all_logged_in(cls) -> AsyncGenerator[User, None]:
      """Yield every logged-in user, preferring already cached instances.

      The users come from the parent-class ``all_logged_in``. For each one,
      the instance cached under the same ``mxid`` is yielded if there is
      one; otherwise the loaded instance is added to the caches and yielded.
      """
      users = await super().all_logged_in()
      for user in users:
          # Keep the try body to the lookup only, so that a KeyError raised
          # elsewhere is never mistaken for a cache miss.
          try:
              cached = cls.by_mxid[user.mxid]
          except KeyError:
              user._add_to_cache()
              cached = user
          yield cached
  1089. # endregion