# user.py (47 KB)
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2023 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, Callable, cast
  18. from datetime import datetime, timedelta
  19. from functools import partial
  20. import asyncio
  21. import logging
  22. import time
  23. from mauigpapi import (
  24. RETRYABLE_PROXY_EXCEPTIONS,
  25. AndroidAPI,
  26. AndroidMQTT,
  27. AndroidState,
  28. ProxyHandler,
  29. )
  30. from mauigpapi.errors import (
  31. IGChallengeError,
  32. IGCheckpointError,
  33. IGConsentRequiredError,
  34. IGNotLoggedInError,
  35. IGRateLimitError,
  36. IGUserIDNotFoundError,
  37. IrisSubscribeError,
  38. MQTTConnectionUnauthorized,
  39. MQTTNotConnected,
  40. MQTTNotLoggedIn,
  41. MQTTReconnectionError,
  42. )
  43. from mauigpapi.mqtt import (
  44. Connect,
  45. Disconnect,
  46. GraphQLSubscription,
  47. NewSequenceID,
  48. ProxyUpdate,
  49. SkywalkerSubscription,
  50. )
  51. from mauigpapi.types import (
  52. ActivityIndicatorData,
  53. CurrentUser,
  54. MessageSyncEvent,
  55. Operation,
  56. RealtimeDirectEvent,
  57. Thread,
  58. ThreadRemoveEvent,
  59. ThreadSyncEvent,
  60. TypingStatus,
  61. )
  62. from mauigpapi.types.direct_inbox import DMInbox, DMInboxResponse
  63. from mautrix.appservice import AppService
  64. from mautrix.bridge import BaseUser, async_getter_lock
  65. from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID
  66. from mautrix.util import background_task
  67. from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
  68. from mautrix.util.logging import TraceLogger
  69. from mautrix.util.opt_prometheus import Gauge, Summary, async_time
  70. from mautrix.util.simple_lock import SimpleLock
  71. from . import portal as po, puppet as pu
  72. from .config import Config
  73. from .db import Backfill, Message as DBMessage, Portal as DBPortal, User as DBUser
  74. if TYPE_CHECKING:
  75. from .__main__ import InstagramBridge
  try:
      from aiohttp_socks import ProxyConnectionError, ProxyError, ProxyTimeoutError
  except ImportError:
      # aiohttp_socks could not be imported: define a stand-in exception type so
      # the three names are still bound in this module.

      class ProxyError(Exception):
          """Stand-in used when aiohttp_socks cannot be imported."""

      ProxyConnectionError = ProxyError
      ProxyTimeoutError = ProxyError
  # Metrics for the event handlers (one summary per handler) ...
  METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
  METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync")
  METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
  # ... and gauges tracking how many users are logged in / connected.
  METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
  METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")

  # Register the human-readable text for every bridge-state error code listed here.
  # A value of None registers the code without any text.
  BridgeState.human_readable_errors.update(
      {
          # Connection problems
          "ig-connection-error": "Instagram disconnected unexpectedly",
          "ig-refresh-connection-error": "Reconnecting failed again after refresh: {message}",
          "ig-connection-fatal-error": "Instagram disconnected unexpectedly",
          # Authentication / checkpoint problems
          "ig-auth-error": "Authentication error from Instagram: {message}, please login again to continue",
          "ig-checkpoint": "Instagram checkpoint error. Please check the Instagram website.",
          "ig-consent-required": "Instagram requires a consent update. Please check the Instagram website.",
          "ig-checkpoint-locked": "Instagram checkpoint error. Please check the Instagram website.",
          # Miscellaneous
          "ig-rate-limit": "Got Instagram ratelimit error, waiting a few minutes before retrying...",
          "ig-disconnected": None,
          "ig-no-mqtt": "You're not connected to Instagram",
          "logged-out": "You've been logged out of instagram, please login again to continue",
      }
  )
  102. class User(DBUser, BaseUser):
  # --- Class-level state shared by all users ---
  ig_base_log: TraceLogger = logging.getLogger("mau.instagram")
  _activity_indicator_ids: dict[str, int] = {}
  # In-memory lookup tables; connect() stores each connected user in by_igpk.
  by_mxid: dict[UserID, User] = {}
  by_igpk: dict[int, User] = {}

  # --- Set once on the class by init_cls() ---
  config: Config
  az: AppService
  loop: asyncio.AbstractEventLoop

  # --- Per-user API handles; None until connect() succeeds ---
  client: AndroidAPI | None
  mqtt: AndroidMQTT | None

  # --- Background tasks and the lock coordinating thread sync ---
  _listen_task: asyncio.Task | None = None
  _sync_lock: SimpleLock
  _backfill_loop_task: asyncio.Task | None
  _thread_sync_task: asyncio.Task | None
  _seq_id_save_task: asyncio.Task | None

  # --- Miscellaneous per-user runtime state ---
  permission_level: str
  username: str | None
  _notice_room_lock: asyncio.Lock
  _notice_send_lock: asyncio.Lock
  _is_logged_in: bool
  _is_connected: bool
  shutdown: bool
  remote_typing_status: TypingStatus | None
  def __init__(
      self,
      mxid: UserID,
      igpk: int | None = None,
      state: AndroidState | None = None,
      notice_room: RoomID | None = None,
      seq_id: int | None = None,
      snapshot_at_ms: int | None = None,
      oldest_cursor: str | None = None,
      total_backfilled_portals: int | None = None,
      thread_sync_completed: bool = False,
  ) -> None:
      """Build a user from its stored fields and reset all in-memory runtime state.

      Every argument is forwarded by keyword to ``super().__init__``; the rest of
      the attributes assigned here start out in their "not connected" state.
      """
      super().__init__(
          mxid=mxid,
          igpk=igpk,
          state=state,
          notice_room=notice_room,
          seq_id=seq_id,
          snapshot_at_ms=snapshot_at_ms,
          oldest_cursor=oldest_cursor,
          total_backfilled_portals=total_backfilled_portals,
          thread_sync_completed=thread_sync_completed,
      )
      BaseUser.__init__(self)

      # Locks guarding notice room creation and notice ordering.
      self._notice_room_lock = asyncio.Lock()
      self._notice_send_lock = asyncio.Lock()

      # Permission flags come from the config, looked up by Matrix user ID.
      (
          self.relay_whitelisted,
          self.is_whitelisted,
          self.is_admin,
          self.permission_level,
      ) = self.config.get_permissions(mxid)

      # API handles and identity are filled in by connect().
      self.client = None
      self.mqtt = None
      self.username = None

      # Connection status flags.
      self._is_logged_in = False
      self._is_connected = False
      self._is_refreshing = False
      self.shutdown = False

      self._sync_lock = SimpleLock(
          "Waiting for thread sync to finish before handling %s", log=self.log
      )

      # No background tasks are running yet.
      self._listen_task = None
      self._thread_sync_task = None
      self._backfill_loop_task = None
      self.remote_typing_status = None
      self._seq_id_save_task = None

      self.proxy_handler = ProxyHandler(
          api_url=self.config["bridge.get_proxy_api_url"],
      )
  @classmethod
  def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]:
      """Attach the bridge and its shared services to the class.

      Returns an async generator yielding one ``try_connect()`` awaitable per
      user returned by ``cls.all_logged_in()``.
      """
      cls.bridge = bridge
      cls.config, cls.az, cls.loop = bridge.config, bridge.az, bridge.loop
      return (user.try_connect() async for user in cls.all_logged_in())
  178. # region Connection management
  async def is_logged_in(self) -> bool:
      """Return whether an API client exists and the logged-in flag is set."""
      if not self.client:
          return False
      return self._is_logged_in
  async def get_puppet(self) -> pu.Puppet | None:
      """Return the puppet for this user's own Instagram pk, or None if no pk is set."""
      if self.igpk:
          return await pu.Puppet.get_by_pk(self.igpk)
      return None
  async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None:
      """Find the private chat portal with ``puppet``, optionally creating the thread.

      Returns None when this user has no Instagram pk, or when no portal exists
      and ``create`` is false.
      """
      if not self.igpk:
          return None
      existing = await po.Portal.find_private_chat(self.igpk, puppet.pk)
      if existing:
          return existing
      if not create:
          return None
      # TODO add error handling somewhere
      thread = await self.client.create_group_thread([puppet.pk])
      created = await po.Portal.get_by_thread(thread, self.igpk)
      await created.update_info(thread, self)
      return created
  async def try_connect(self) -> None:
      """Run ``connect()``; on any exception, log it and push an UNKNOWN_ERROR bridge state."""
      try:
          await self.connect()
      except Exception as err:
          self.log.exception("Error while connecting to Instagram")
          error_info = {"python_error": str(err)}
          await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, info=error_info)
  @property
  def api_log(self) -> TraceLogger:
      """Per-user logger: the ``http`` child of the base logger, then a child named by mxid."""
      http_log = self.ig_base_log.getChild("http")
      return http_log.getChild(self.mxid)
  @property
  def is_connected(self) -> bool:
      """True only when both the API client and MQTT client exist and the connected flag is set."""
      if not self.client or not self.mqtt:
          return False
      return self._is_connected
  async def connect(self, user: CurrentUser | None = None) -> None:
      """Set up the API and MQTT clients for this user and start post-connect work.

      Args:
          user: Already-fetched account info. When omitted, it is fetched with
              ``client.current_user()``.

      Raises:
          IGCheckpointError: Re-raised (after logging its body) if fetching the
              current user hits a checkpoint.

      If there is no stored state, a BAD_CREDENTIALS bridge state is pushed and
      nothing else happens. A not-logged-in error logs the user out; challenge and
      consent errors are passed to ``_handle_checkpoint``.
      """
      if not self.state:
          await self.push_bridge_state(
              BridgeStateEvent.BAD_CREDENTIALS,
              error="logged-out",
              info={"cnd_action": "reauth"},
          )
          return
      client = AndroidAPI(
          self.state,
          log=self.api_log,
          proxy_handler=self.proxy_handler,
      )

      if not user:
          try:
              resp = await client.current_user()
              user = resp.user
          except IGNotLoggedInError as e:
              self.log.warning(f"Failed to connect to Instagram: {e}, logging out")
              await self.logout(error=e)
              return
          except IGCheckpointError as e:
              self.log.debug("Checkpoint error content: %s", e.body)
              raise
          except (IGChallengeError, IGConsentRequiredError) as e:
              await self._handle_checkpoint(e, on="connect", client=client)
              return

      # Only publish the client on the instance once the credentials are known to work.
      self.client = client
      self._is_logged_in = True
      self.igpk = user.pk
      self.username = user.username
      await self.push_bridge_state(BridgeStateEvent.CONNECTING)
      self._track_metric(METRIC_LOGGED_IN, True)
      self.by_igpk[self.igpk] = self

      self.mqtt = AndroidMQTT(
          self.state,
          log=self.ig_base_log.getChild("mqtt").getChild(self.mxid),
          proxy_handler=self.proxy_handler,
      )
      handlers = (
          (Connect, self.on_connect),
          (Disconnect, self.on_disconnect),
          (NewSequenceID, self.update_seq_id),
          (MessageSyncEvent, self.handle_message),
          (ThreadSyncEvent, self.handle_thread_sync),
          (ThreadRemoveEvent, self.handle_thread_remove),
          (RealtimeDirectEvent, self.handle_rtd),
          (ProxyUpdate, self.on_proxy_update),
      )
      for event_type, handler in handlers:
          self.mqtt.add_event_handler(event_type, handler)

      await self.update()

      # Fire-and-forget work. The event loop only keeps weak references to tasks,
      # so a task whose handle is dropped can be garbage-collected mid-execution.
      # background_task.create keeps a reference to the task it creates.
      background_task.create(self._try_sync_puppet(user))
      background_task.create(self._post_connect())
  async def _post_connect(self):
      """Start the backfill loop, do the initial sync or listen, then start thread backfill."""
      # Backfill requests are handled synchronously so as not to overload the homeserver.
      # Users can configure their backfill stages to be more or less aggressive with backfilling
      # to try and avoid getting banned.
      loop_task = self._backfill_loop_task
      if loop_task is None or loop_task.done():
          self._backfill_loop_task = asyncio.create_task(self._handle_backfill_requests_loop())

      if self.seq_id:
          # NOTE(review): the message mentions resync_on_startup, but this branch
          # is selected purely by a stored seq_id being present.
          self.log.debug("Connecting to MQTT directly as resync_on_startup is false")
          self.start_listen()
      else:
          await self._try_sync()

      if not self.config["bridge.backfill.enable"]:
          return
      # Replace any thread sync task that is still running with a fresh one.
      if self._thread_sync_task and not self._thread_sync_task.done():
          self.log.warning("Cancelling existing background thread sync task")
          self._thread_sync_task.cancel()
      self._thread_sync_task = asyncio.create_task(self.backfill_threads())
  async def _handle_backfill_requests_loop(self) -> None:
      """Process queued backfill requests for this user one at a time.

      Does nothing unless both ``bridge.backfill.enable`` and
      ``bridge.backfill.msc2716`` are set. The loop ends when the user is logged
      out or hits a challenge; any other failure puts the request on a 60 second
      cooldown and the loop continues.
      """
      backfill_cfg_on = (
          self.config["bridge.backfill.enable"] and self.config["bridge.backfill.msc2716"]
      )
      if not backfill_cfg_on:
          return

      while True:
          # Wait until no thread sync holds the sync lock.
          await self._sync_lock.wait("backfill request")

          req = await Backfill.get_next(self.mxid)
          if not req:
              # Nothing queued right now; poll again in half a minute.
              await asyncio.sleep(30)
              continue

          self.log.info("Backfill request %s", req)
          try:
              portal = await po.Portal.get_by_thread_id(
                  req.portal_thread_id, receiver=req.portal_receiver
              )
              await req.mark_dispatched()
              await portal.backfill(self, req)
              await req.mark_done()
          except IGNotLoggedInError as e:
              self.log.exception("User got logged out during backfill loop")
              await self.logout(error=e)
              break
          except (IGChallengeError, IGConsentRequiredError) as e:
              self.log.exception("User got a challenge during backfill loop")
              await self._handle_checkpoint(e, on="backfill")
              break
          except Exception as e:
              self.log.exception("Failed to backfill portal %s: %s", req.portal_thread_id, e)
              # Don't try again to backfill this portal for a minute.
              await req.set_cooldown_timeout(60)

      self._backfill_loop_task = None
  async def on_connect(self, evt: Connect) -> None:
      """Handle the MQTT Connect event: mark connected, then notify the user and push state."""
      connected_text = "Connected to Instagram"
      self.log.debug(connected_text)
      self._track_metric(METRIC_CONNECTED, True)
      self._is_connected = True
      await self.send_bridge_notice(connected_text)
      await self.push_bridge_state(BridgeStateEvent.CONNECTED)
  async def on_disconnect(self, evt: Disconnect) -> None:
      """Handle the MQTT Disconnect event by clearing the connected flag and metric."""
      self.log.debug("Disconnected from Instagram")
      self._track_metric(METRIC_CONNECTED, False)
      self._is_connected = False
  async def on_proxy_update(self, evt: ProxyUpdate | None = None) -> None:
      """Re-run the API client's HTTP setup with the stored cookie jar, if a client exists."""
      if not self.client:
          return
      cookie_jar = self.state.cookies.jar
      self.client.setup_http(cookie_jar)
  321. # TODO this stuff could probably be moved to mautrix-python
  async def get_notice_room(self) -> RoomID:
      """Return the bridge notice room for this user, creating and saving it if needed."""
      if self.notice_room:
          return self.notice_room

      async with self._notice_room_lock:
          # If someone already created the room while this call was waiting,
          # don't make a new room
          if self.notice_room:
              return self.notice_room

          # Only set m.federate when federation of rooms is turned off in the config.
          if self.config["bridge.federate_rooms"]:
              creation_content = {}
          else:
              creation_content = {"m.federate": False}

          self.notice_room = await self.az.intent.create_room(
              is_direct=True,
              invitees=[self.mxid],
              topic="Instagram bridge notices",
              creation_content=creation_content,
          )
          await self.update()
      return self.notice_room
  async def fill_bridge_state(self, state: BridgeState) -> None:
      """Fill in the remote ID and remote name of a bridge state for this user.

      The remote ID is the Instagram pk when known; otherwise it falls back to
      the user ID from the stored login state. It is set to None when there is
      no stored state or the state has no user ID.
      """
      await super().fill_bridge_state(state)
      if not state.remote_id:
          if self.igpk:
              state.remote_id = str(self.igpk)
          elif self.state:
              try:
                  state.remote_id = self.state.user_id
              except IGUserIDNotFoundError:
                  state.remote_id = None
          else:
              # connect() and get_bridge_states() both allow for an empty
              # self.state; don't dereference it here either.
              state.remote_id = None
      if self.username:
          state.remote_name = f"@{self.username}"
  async def get_bridge_states(self) -> list[BridgeState]:
      """Return the current bridge state as a one-item list, or an empty list without stored state."""
      if not self.state:
          return []

      if self.is_connected:
          event = BridgeStateEvent.CONNECTED
      elif self._is_refreshing or self.mqtt:
          # Not connected, but a refresh is in progress or an MQTT client exists.
          event = BridgeStateEvent.TRANSIENT_DISCONNECT
      else:
          event = BridgeStateEvent.UNKNOWN_ERROR
      return [BridgeState(state_event=event)]
  async def send_bridge_notice(
      self,
      text: str,
      edit: EventID | None = None,
      state_event: BridgeStateEvent | None = None,
      important: bool = False,
      error_code: str | None = None,
      error_message: str | None = None,
      info: dict | None = None,
  ) -> EventID | None:
      """Optionally push a bridge state, then send ``text`` to the user's notice room.

      Args:
          text: Notice body.
          edit: Event ID of an earlier notice to edit instead of sending a new one.
          state_event: When given, a bridge state with this event is pushed first.
          important: Important notices are sent as text messages instead of
              notices, and are not filtered by ``bridge.unimportant_bridge_notices``.
          error_code: Error code for the pushed bridge state.
          error_message: Bridge state message used when ``error_code`` is set
              (otherwise ``text`` is used).
          info: Extra info for the pushed bridge state.

      Returns:
          ``edit`` if it was given, else the ID of the sent event; None when the
          notice was suppressed by config or sending failed.
      """
      if state_event:
          state_message = error_message if error_code else text
          await self.push_bridge_state(
              state_event,
              error=error_code,
              message=state_message,
              info=info,
          )

      if self.config["bridge.disable_bridge_notices"]:
          return None
      if not (important or self.config["bridge.unimportant_bridge_notices"]):
          self.log.debug("Not sending unimportant bridge notice: %s", text)
          return None

      sent_event_id = None
      try:
          self.log.debug("Sending bridge notice: %s", text)
          msgtype = MessageType.TEXT if important else MessageType.NOTICE
          content = TextMessageEventContent(body=text, msgtype=msgtype)
          if edit:
              content.set_edit(edit)
          # This is locked to prevent notices going out in the wrong order
          async with self._notice_send_lock:
              room_id = await self.get_notice_room()
              sent_event_id = await self.az.intent.send_message(room_id, content)
      except Exception:
          # Best effort: a failed notice is logged and otherwise ignored.
          self.log.warning("Failed to send bridge notice", exc_info=True)
      return edit or sent_event_id
  async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
      """Update this user's own puppet and try to auto-enable it as a custom puppet.

      Both steps are best-effort: failures are logged and not propagated.
      """
      own_puppet = await pu.Puppet.get_by_pk(self.igpk)

      try:
          await own_puppet.update_info(user_info, self)
      except Exception:
          self.log.exception("Failed to update own puppet info")

      try:
          needs_switch = own_puppet.custom_mxid != self.mxid
          if needs_switch and own_puppet.can_auto_login(self.mxid):
              self.log.info("Automatically enabling custom puppet")
              await own_puppet.switch_mxid(access_token="auto", mxid=self.mxid)
      except Exception:
          self.log.exception("Failed to automatically enable custom puppet")
  async def _try_sync(self) -> None:
      """Run ``sync()``; on any exception, log it and push an UNKNOWN_ERROR bridge state."""
      try:
          await self.sync()
      except Exception as err:
          self.log.exception("Exception while syncing")
          if isinstance(err, IGCheckpointError):
              self.log.debug("Checkpoint error content: %s", err.body)
          error_info = {"python_error": str(err)}
          await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, info=error_info)
  async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
      """Map each private chat partner's puppet mxid to a one-item list with the portal room.

      Portals that have no Matrix room are left out.
      """
      direct_chats: dict[UserID, list[RoomID]] = {}
      for portal in await DBPortal.find_private_chats_of(self.igpk):
          if not portal.mxid:
              continue
          partner_mxid = pu.Puppet.get_mxid_from_id(portal.other_user_pk)
          direct_chats[partner_mxid] = [portal.mxid]
      return direct_chats
  async def refresh(self, resync: bool = True) -> None:
      """Stop listening, reset the pigeon session ID, then resync or restart listening.

      With ``resync`` set, ``sync()`` is retried forever until it succeeds: the
      wait starts at one minute and, from the fifth failure on, grows by a minute
      per failure up to ten minutes. Without it, listening is simply restarted.
      """
      self._is_refreshing = True
      try:
          await self.stop_listen()
          self.state.reset_pigeon_session_id()

          if not resync:
              self.start_listen()
              return

          failures = 0
          wait_minutes = 1
          while True:
              try:
                  await self.sync()
                  return
              except Exception as e:
                  if failures >= 4 and wait_minutes < 10:
                      wait_minutes += 1
                  failures += 1
                  s = "" if wait_minutes == 1 else "s"
                  self.log.exception(
                      f"Error while syncing for refresh, retrying in {wait_minutes} minute{s}"
                  )
                  if isinstance(e, IGCheckpointError):
                      self.log.debug("Checkpoint error content: %s", e.body)
                  await self.push_bridge_state(
                      BridgeStateEvent.UNKNOWN_ERROR,
                      error="unknown-error",
                      message="An unknown error occurred while connecting to Instagram",
                      info={"python_error": str(e)},
                  )
                  await asyncio.sleep(wait_minutes * 60)
      finally:
          self._is_refreshing = False
          # NOTE(review): assumed to belong to the finally block (the pasted
          # source lost its indentation) - confirm against the repository.
          self.proxy_handler.update_proxy_url()
  async def _handle_checkpoint(
      self,
      e: IGChallengeError | IGConsentRequiredError,
      on: str,
      client: AndroidAPI | None = None,
  ) -> None:
      """Drop the clients and report a checkpoint/consent error as BAD_CREDENTIALS.

      Args:
          e: The challenge or consent-required error that was raised.
          on: Short label of the operation that failed, used in the log line.
          client: API client to reset the challenge with; defaults to ``self.client``.
      """
      self.log.warning(f"Got checkpoint error on {on}: {e.body.serialize()}")
      # Grab the client before clearing it from the instance.
      client = client or self.client
      self.client = None
      self.mqtt = None

      if isinstance(e, IGConsentRequiredError):
          await self.push_bridge_state(
              BridgeStateEvent.BAD_CREDENTIALS,
              error="ig-consent-required",
              info=e.body.serialize(),
          )
          return

      error_code = "ig-checkpoint"
      try:
          resp = await client.challenge_reset()
          if resp.challenge_context_str:
              challenge_context = resp.challenge_context.serialize()
          else:
              challenge_context = None
          info = {
              "challenge_context": challenge_context,
              "step_name": resp.step_name,
              "step_data": resp.step_data.serialize() if resp.step_data else None,
              "user_id": resp.user_id,
              "action": resp.action,
              "status": resp.status,
              "challenge": e.body.challenge.serialize() if e.body.challenge else None,
          }
          self.log.debug(f"Challenge state: {resp.serialize()}")
          if resp.challenge_context.challenge_type_enum == "HACKED_LOCK":
              error_code = "ig-checkpoint-locked"
      except Exception:
          # Fall back to reporting only what the original error carried.
          self.log.exception("Error resetting challenge state")
          info = {"challenge": e.body.challenge.serialize() if e.body.challenge else None}

      await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error=error_code, info=info)
  async def _sync_thread(self, thread: Thread) -> bool:
      """
      Sync a specific thread. Returns whether the thread had messages after the last message in
      the database before the sync.
      """
      self.log.debug(f"Syncing thread {thread.thread_id}")

      forward_messages = thread.items

      assert self.client
      portal = await po.Portal.get_by_thread(thread, self.igpk)
      assert portal

      # Create or update the Matrix room
      if portal.mxid:
          await portal.update_matrix_room(self, thread)
      else:
          await portal.create_matrix_room(self, thread)

      if not self.config["bridge.backfill.enable_initial"]:
          return True

      last_message = await DBMessage.get_last(portal.mxid)
      cursor = thread.oldest_cursor
      if last_message:
          page_size = len(thread.items)
          newer = [m for m in thread.items if last_message.ig_timestamp_ms < m.timestamp_ms]
          forward_messages = newer

          portal.log.debug(
              f"{len(newer)}/{page_size} messages are after most recent"
              " message."
          )

          # Fetch more messages until we get back to messages that have been bridged already.
          # (A page made up entirely of new messages means older pages may hold more.)
          while len(newer) > 0 and len(newer) == page_size:
              await asyncio.sleep(self.config["bridge.backfill.incremental.page_delay"])

              portal.log.debug("Fetching more messages for forward backfill")
              resp = await self.client.get_thread(portal.thread_id, cursor=cursor)
              if len(resp.thread.items) == 0:
                  break
              page_size = len(resp.thread.items)
              newer = [
                  m for m in resp.thread.items if last_message.ig_timestamp_ms < m.timestamp_ms
              ]
              forward_messages = newer + forward_messages
              cursor = resp.thread.oldest_cursor
              portal.log.debug(
                  f"{len(newer)}/{page_size} messages are after most "
                  "recent message."
              )
      elif not portal.first_event_id:
          self.log.debug(
              f"Skipping backfilling {portal.thread_id} as the first event ID is not known"
          )
          return False

      if forward_messages:
          portal.cursor = cursor
          await portal.update()

          # Mark as read when the thread's read_state is 0, or when the first
          # forward message is older than the configured threshold (if enabled).
          if thread.read_state == 0:
              mark_read = True
          else:
              hours = self.config["bridge.backfill.unread_hours_threshold"]
              mark_read = hours > 0 and (
                  datetime.fromtimestamp(forward_messages[0].timestamp_ms / 1000)
                  < datetime.now() - timedelta(hours=hours)
              )

          base_insertion_event_id = await portal.backfill_message_page(
              self,
              list(reversed(forward_messages)),
              forward=True,
              last_message=last_message,
              mark_read=mark_read,
          )

          is_hungry = self.bridge.homeserver_software.is_hungry
          if not is_hungry and self.config["bridge.backfill.msc2716"]:
              await portal.send_post_backfill_dummy(
                  forward_messages[0].timestamp, base_insertion_event_id=base_insertion_event_id
              )
          if (
              mark_read
              and not self.bridge.homeserver_software.is_hungry
              and (puppet := await self.get_puppet())
          ):
              last_message = await DBMessage.get_last(portal.mxid)
              if last_message:
                  await puppet.intent_for(portal).mark_read(portal.mxid, last_message.mxid)

          await portal._update_read_receipts(thread.last_seen_at)

      if self.config["bridge.backfill.msc2716"]:
          await portal.enqueue_immediate_backfill(self, 1)
      return len(forward_messages) > 0
  async def _maybe_update_proxy(self, source: str) -> None:
      """Update the proxy URL and re-run HTTP setup, but only when no listen task is set."""
      if self._listen_task:
          self.log.debug(f"Not updating proxy: listen_task is still running? (caller: {source})")
          return
      self.proxy_handler.update_proxy_url(source)
      await self.on_proxy_update()
  async def sync(self, increment_total_backfilled_portals: bool = False) -> None:
      """Run ``_sync`` through ``run_with_sync_lock``, passing the flag along."""
      locked_sync = partial(self._sync, increment_total_backfilled_portals)
      await self.run_with_sync_lock(locked_sync)
  async def _sync(self, increment_total_backfilled_portals: bool = False) -> None:
      """Fetch the inbox (retrying on proxy and ratelimit errors) and sync its threads.

      Proxy errors are retried after 10-60 seconds, ratelimit errors after a wait
      that starts at two minutes and grows by two each time. A not-logged-in error
      logs the user out; challenge/consent errors go to ``_handle_checkpoint``;
      checkpoint errors are re-raised.
      """
      if not self._listen_task:
          self.state.reset_pigeon_session_id()

      ratelimit_minutes = 2
      proxy_errors = 0
      while True:
          try:
              resp = await self.client.get_inbox()
              break
          except RETRYABLE_PROXY_EXCEPTIONS as e:
              proxy_errors += 1
              wait = min(proxy_errors * 10, 60)
              self.log.warning(
                  f"{e.__class__.__name__} while trying to sync, retrying in {wait} seconds: {e}"
              )
              await asyncio.sleep(wait)
              # From the second consecutive failure on, also try switching proxy.
              if proxy_errors > 1:
                  await self._maybe_update_proxy("sync error")
          except IGNotLoggedInError as e:
              self.log.exception("Got not logged in error while syncing")
              await self.logout(error=e)
              return
          except IGRateLimitError as e:
              self.log.error(
                  "Got ratelimit error while trying to get inbox (%s), retrying in %d minutes",
                  e.body,
                  ratelimit_minutes,
              )
              await self.push_bridge_state(
                  BridgeStateEvent.TRANSIENT_DISCONNECT, error="ig-rate-limit"
              )
              await asyncio.sleep(ratelimit_minutes * 60)
              ratelimit_minutes += 2
          except IGCheckpointError as e:
              self.log.debug("Checkpoint error content: %s", e.body)
              raise
          except (IGChallengeError, IGConsentRequiredError) as e:
              await self._handle_checkpoint(e, on="sync")
              return

      self.seq_id = resp.seq_id
      self.snapshot_at_ms = resp.snapshot_at_ms
      await self.save_seq_id()

      if not self._listen_task:
          self.start_listen(is_after_sync=True)

      sync_count = min(
          self.config["bridge.backfill.max_conversations"],
          self.config["bridge.max_startup_thread_sync_count"],
      )
      self.log.debug(f"Fetching {sync_count} threads, 20 at a time...")
      if sync_count == 0:
          return
      # A negative count means "no limit".
      local_limit: int | None = None if sync_count < 0 else sync_count

      inbox_threads = self.client.iter_inbox(
          self._update_seq_id_and_cursor, start_at=resp, local_limit=local_limit
      )
      await self._sync_threads_with_delay(
          inbox_threads,
          stop_when_threads_have_no_messages_to_backfill=True,
          increment_total_backfilled_portals=increment_total_backfilled_portals,
          local_limit=local_limit,
      )

      try:
          await self.update_direct_chats()
      except Exception:
          self.log.exception("Error updating direct chat list")
  async def backfill_threads(self):
      """Run ``_backfill_threads`` through ``run_with_sync_lock``, logging any exception."""
      try:
          await self.run_with_sync_lock(self._backfill_threads)
      except Exception:
          # Swallowed after logging: nothing is propagated to the caller.
          self.log.exception("Error in thread backfill loop")
  async def _backfill_threads(self):
      """Continue syncing inbox threads from the stored cursor, up to the configured limit.

      Returns early when backfill is disabled, the ``max_conversations`` count has
      been reached, or thread sync is already marked as completed. A negative
      ``max_conversations`` means no limit.
      """
      assert self.client
      if not self.config["bridge.backfill.enable"]:
          return

      max_conversations = self.config["bridge.backfill.max_conversations"] or 0
      already_backfilled = self.total_backfilled_portals or 0
      if 0 <= max_conversations <= already_backfilled:
          self.log.info("Backfill max_conversations count reached, not syncing any more portals")
          return
      if self.thread_sync_completed:
          self.log.debug("Thread backfill is marked as completed, not syncing more portals")
          return

      if max_conversations >= 0:
          local_limit = max_conversations - (self.total_backfilled_portals or 0)
      else:
          local_limit = None

      start_at = None
      if self.oldest_cursor:
          # Build a placeholder inbox response with no threads, so that iteration
          # resumes from the stored cursor.
          resume_inbox = DMInbox(
              threads=[],
              has_older=True,
              unseen_count=0,
              unseen_count_ts=0,
              blended_inbox_enabled=False,
              oldest_cursor=self.oldest_cursor,
          )
          start_at = DMInboxResponse(
              status="",
              seq_id=self.seq_id,
              snapshot_at_ms=0,
              pending_requests_total=0,
              has_pending_top_requests=False,
              viewer=None,
              inbox=resume_inbox,
          )

      backoff = self.config.get("bridge.backfill.backoff.thread_list", 300)
      inbox_threads = self.client.iter_inbox(
          self._update_seq_id_and_cursor,
          start_at=start_at,
          local_limit=local_limit,
          rate_limit_exceeded_backoff=backoff,
      )
      await self._sync_threads_with_delay(
          inbox_threads,
          increment_total_backfilled_portals=True,
          local_limit=local_limit,
      )
      await self.update_direct_chats()
  def _update_seq_id_and_cursor(self, seq_id: int, cursor: str | None):
      """Store the given sequence ID and, if one is provided, the inbox cursor.

      A falsy cursor (``None`` or an empty string) leaves ``oldest_cursor``
      unchanged; ``seq_id`` is always overwritten.
      """
      self.seq_id = seq_id
      if not cursor:
          return
      self.oldest_cursor = cursor
  async def _sync_threads_with_delay(
      self,
      threads: AsyncIterable[Thread],
      increment_total_backfilled_portals: bool = False,
      stop_when_threads_have_no_messages_to_backfill: bool = False,
      local_limit: int | None = None,
  ):
      """Sync each thread from ``threads``, spacing the syncs apart in time.

      Consecutive syncs are kept at least
      ``bridge.backfill.min_sync_thread_delay`` seconds apart (measured with
      the monotonic clock, from the start of the previous sync).

      :param threads: Asynchronous source of threads to sync.
      :param increment_total_backfilled_portals: When true, bump and persist
          ``total_backfilled_portals`` after every synced thread.
      :param stop_when_threads_have_no_messages_to_backfill: When true,
          return as soon as a thread sync reports no new messages. Thread
          sync is not marked as completed in that case.
      :param local_limit: Number of threads the caller asked for, or ``None``
          for no limit. If the source is exhausted without a limit, or after
          yielding fewer threads than the limit, thread sync is marked as
          completed and persisted.
      """
      sync_delay = self.config["bridge.backfill.min_sync_thread_delay"]
      previous_sync_at = 0.0
      found_thread_count = 0

      async for thread in threads:
          found_thread_count += 1

          # Time still to wait before the minimum gap has elapsed.
          remaining = previous_sync_at + sync_delay - time.monotonic()
          if remaining > 0:
              self.log.debug("Thread sync is happening too quickly. Waiting for %ds", remaining)
              await asyncio.sleep(remaining)
          previous_sync_at = time.monotonic()

          got_new_messages = await self._sync_thread(thread)
          if stop_when_threads_have_no_messages_to_backfill and not got_new_messages:
              self.log.debug("Got to threads with no new messages. Stopping sync.")
              return

          if increment_total_backfilled_portals:
              self.total_backfilled_portals = (self.total_backfilled_portals or 0) + 1
              await self.update()

      # The limit was hit, so there may be more threads left to fetch later.
      if local_limit is not None and found_thread_count >= local_limit:
          return

      if local_limit is None:
          self.log.info(
              "Reached end of thread list with no limit, marking thread sync as completed"
          )
      else:
          self.log.info(
              f"Reached end of thread list (got {found_thread_count} with "
              f"limit {local_limit}), marking thread sync as completed"
          )
      self.thread_sync_completed = True
      await self.update()
  async def run_with_sync_lock(
      self,
      func: Callable[[], Awaitable],
      *,
      max_attempts: int = 5,
      retry_delay: float = 30,
  ):
      """Run ``func`` while holding the sync lock, retrying on failure.

      :param func: Zero-argument callable returning an awaitable; it is
          called once per attempt.
      :param max_attempts: Maximum number of times ``func`` is tried.
      :param retry_delay: Seconds to wait between a failed attempt and the
          next one.

      Returns as soon as one attempt succeeds. An ``IGNotLoggedInError``
      triggers ``logout(error=...)`` and stops retrying. Any other exception
      is logged; the lock stays held during the wait before the next attempt.
      No wait happens after the last attempt, since nothing would follow it.
      """
      with self._sync_lock:
          for attempt in range(1, max_attempts + 1):
              try:
                  await func()
              except IGNotLoggedInError as e:
                  await self.logout(error=e)
                  return
              except Exception:
                  if attempt >= max_attempts:
                      # Out of attempts: don't sleep (and hold the lock) for nothing.
                      self.log.exception("Failed to sync threads.")
                      break
                  self.log.exception(
                      "Failed to sync threads. Waiting %s seconds before retrying sync.",
                      retry_delay,
                  )
                  await asyncio.sleep(retry_delay)
              else:
                  # The sync was successful. Exit the loop.
                  return
          # If we get here, it means that every attempt has failed. If this happens, most
          # likely something very bad has happened.
          self.log.error("Failed to sync threads %d times. Will not retry.", max_attempts)
  def start_listen(self, is_after_sync: bool = False) -> None:
      """Schedule the listener on the event loop and remember its task.

      Clears the ``shutdown`` flag, then creates a task running ``_listen``
      with the currently stored ``seq_id`` and ``snapshot_at_ms``.

      :param is_after_sync: Passed through to ``_listen`` unchanged.
      """
      self.shutdown = False
      listener = self._listen(
          seq_id=self.seq_id,
          snapshot_at_ms=self.snapshot_at_ms,
          is_after_sync=is_after_sync,
      )
      self._listen_task = self.loop.create_task(listener)
  async def delayed_start_listen(self, sleep: int) -> None:
      """Wait ``sleep`` seconds, then start listening unless already connected.

      :param sleep: Delay in seconds before the connection state is checked.
      """
      await asyncio.sleep(sleep)
      if not self.is_connected:
          self.log.debug("Reconnecting after MQTT connection error")
          self.start_listen()
          return
      # Something else reconnected while we were sleeping; nothing to do.
      self.log.debug(
          "Already reconnected before delay after MQTT reconnection error finished"
      )
  async def fetch_user_and_reconnect(self, sleep_first: int | None = None) -> None:
      """Verify the session by fetching the current user, then restart listening.

      :param sleep_first: Optional delay in seconds before doing anything. If
          the user is already connected once the delay is over, the fetch is
          skipped.

      Outcomes of the fetch:

      * success: ``start_listen()`` is called;
      * a retryable proxy error: wait (10 s per failure so far, capped at
        60 s), ask for a proxy update from the second failure on, and retry;
      * not logged in: log out with the error;
      * challenge / consent required: hand over to ``_handle_checkpoint``;
      * anything else: log it and push an ``UNKNOWN_ERROR`` bridge state.
      """
      if sleep_first:
          await asyncio.sleep(sleep_first)
          if self.is_connected:
              self.log.debug("Canceling user fetch, already reconnected")
              return

      self.log.debug("Refetching current user after disconnection")
      errors = 0
      while True:
          try:
              resp = await self.client.current_user()
          except RETRYABLE_PROXY_EXCEPTIONS as e:
              errors += 1
              wait = min(errors * 10, 60)
              self.log.warning(
                  f"{e.__class__.__name__} while trying to check user for reconnection, "
                  f"retrying in {wait} seconds: {e}"
              )
              await asyncio.sleep(wait)
              if errors > 1:
                  await self._maybe_update_proxy("fetch_user_and_reconnect error")
              # Only this branch loops; every other outcome returns.
              continue
          except IGNotLoggedInError as e:
              self.log.warning(f"Failed to reconnect to Instagram: {e}, logging out")
              await self.logout(error=e)
              return
          except (IGChallengeError, IGConsentRequiredError) as e:
              await self._handle_checkpoint(e, on="reconnect")
              return
          except Exception as e:
              self.log.exception("Error while reconnecting to Instagram")
              if isinstance(e, IGCheckpointError):
                  self.log.debug("Checkpoint error content: %s", e.body)
              await self.push_bridge_state(
                  BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
              )
              return

          self.log.debug(f"Confirmed current user {resp.user.pk}")
          self.start_listen()
          return
  async def _listen(self, seq_id: int, snapshot_at_ms: int, is_after_sync: bool) -> None:
      """Run the MQTT listener and react to the way it terminates.

      :param seq_id: Sequence ID passed to ``mqtt.listen``.
      :param snapshot_at_ms: Snapshot timestamp passed to ``mqtt.listen``.
      :param is_after_sync: Whether this listen attempt directly follows a
          refresh; decides how an ``IrisSubscribeError`` is handled.

      Handling by outcome:

      * ``IrisSubscribeError``: refresh in the background, unless this attempt
        already followed a refresh, in which case a bridge notice is sent.
      * ``MQTTReconnectionError``: notify, disconnect, retry listening in 60 s.
      * not connected / not logged in / unauthorized: notify, disconnect, then
        re-check the user and reconnect.
      * any other exception: notify, disconnect, re-check the user after 300 s.
      * clean return while not shutting down: notify about the disconnect.

      In every case the listen task reference and the connected state are
      cleared afterwards.
      """
      try:
          await self.mqtt.listen(
              graphql_subs={
                  GraphQLSubscription.app_presence(),
                  GraphQLSubscription.direct_typing(self.state.user_id),
                  GraphQLSubscription.direct_status(),
              },
              skywalker_subs={
                  SkywalkerSubscription.direct_sub(self.state.user_id),
                  SkywalkerSubscription.live_sub(self.state.user_id),
              },
              seq_id=seq_id,
              snapshot_at_ms=snapshot_at_ms,
          )
      except IrisSubscribeError as e:
          if not is_after_sync:
              self.log.warning(f"Got IrisSubscribeError {e}, refreshing...")
              background_task.create(self.refresh())
          else:
              # A refresh was just done and it still failed: report instead of looping.
              self.log.exception("Got IrisSubscribeError right after refresh")
              error_text = str(e)
              await self.send_bridge_notice(
                  f"Reconnecting failed again after refresh: {e}",
                  important=True,
                  state_event=BridgeStateEvent.UNKNOWN_ERROR,
                  error_code="ig-refresh-connection-error",
                  error_message=error_text,
                  info={"python_error": error_text},
              )
      except MQTTReconnectionError as e:
          self.log.warning(
              f"Unexpected connection error: {e}, reconnecting in 1 minute", exc_info=True
          )
          await self.send_bridge_notice(
              f"Error in listener: {e}",
              important=True,
              state_event=BridgeStateEvent.TRANSIENT_DISCONNECT,
              error_code="ig-connection-error-socket",
          )
          self.mqtt.disconnect()
          retry_later = self.delayed_start_listen(sleep=60)
          background_task.create(retry_later)
      except (MQTTNotConnected, MQTTNotLoggedIn, MQTTConnectionUnauthorized) as e:
          self.log.warning(f"Unexpected connection error: {e}, checking auth and reconnecting")
          await self.send_bridge_notice(
              f"Error in listener: {e}",
              important=True,
              state_event=BridgeStateEvent.UNKNOWN_ERROR,
              error_code="ig-connection-error-maybe-auth",
          )
          self.mqtt.disconnect()
          recheck_now = self.fetch_user_and_reconnect()
          background_task.create(recheck_now)
      except Exception as e:
          self.log.exception("Fatal error in listener, reconnecting in 5 minutes")
          await self.send_bridge_notice(
              "Fatal error in listener (see logs for more info)",
              state_event=BridgeStateEvent.UNKNOWN_ERROR,
              important=True,
              error_code="ig-unknown-connection-error",
              info={"python_error": str(e)},
          )
          self.mqtt.disconnect()
          recheck_later = self.fetch_user_and_reconnect(sleep_first=300)
          background_task.create(recheck_later)
      else:
          # listen() returned normally; only report it if nobody asked for a shutdown.
          if not self.shutdown:
              await self.send_bridge_notice(
                  "Instagram connection closed without error",
                  state_event=BridgeStateEvent.UNKNOWN_ERROR,
                  error_code="ig-disconnected",
              )
      finally:
          self._listen_task = None
          self._is_connected = False
          self._track_metric(METRIC_CONNECTED, False)
  async def stop_listen(self) -> None:
      """Disconnect MQTT, wait for the listener to finish and persist the state.

      While the disconnect is in progress the ``shutdown`` flag is set, and it
      is cleared again once the listen task (if any) has finished. The
      connected metric and flag are reset regardless of whether an MQTT
      client exists.
      """
      mqtt = self.mqtt
      if mqtt:
          self.shutdown = True
          mqtt.disconnect()
          pending_listener = self._listen_task
          if pending_listener:
              await pending_listener
          self.shutdown = False

      self._track_metric(METRIC_CONNECTED, False)
      self._is_connected = False
      await self.update()
  def stop_backfill_tasks(self) -> None:
      """Cancel the backfill loop task and the thread sync task, if present.

      Each task reference that is set is cancelled and then reset to ``None``.
      """
      for attr in ("_backfill_loop_task", "_thread_sync_task"):
          task = getattr(self, attr)
          if task:
              task.cancel()
              setattr(self, attr, None)
  async def logout(self, error: IGNotLoggedInError | None = None) -> None:
      """Tear down the Instagram session and reset the stored login state.

      :param error: The auth error that forced the logout, or ``None`` when
          the logout was requested deliberately.

      Without an error, the client is asked to log out (failures are only
      debug-logged), a ``LOGGED_OUT`` bridge state is pushed, a real-user
      puppet is detached from its Matrix ID and this user is removed from
      the ``by_igpk`` cache. With an error, the error body is logged and a
      ``BAD_CREDENTIALS`` notice is sent instead; ``igpk`` is kept.

      In both cases listening and backfill tasks are stopped first, and the
      client, MQTT, state and sequence fields are cleared and saved at the end.
      """
      await self.stop_listen()
      self.stop_backfill_tasks()

      voluntary = error is None
      if self.client and voluntary:
          try:
              await self.client.logout(one_tap_app_login=False)
          except Exception:
              self.log.debug("Exception logging out", exc_info=True)
      if self.mqtt:
          self.mqtt.disconnect()
      self._track_metric(METRIC_CONNECTED, False)
      self._track_metric(METRIC_LOGGED_IN, False)

      if not voluntary:
          self.log.debug("Auth error body: %s", error.body.serialize())
          await self.send_bridge_notice(
              f"You have been logged out of Instagram: {error.proper_message}",
              important=True,
              state_event=BridgeStateEvent.BAD_CREDENTIALS,
              error_code="ig-auth-error",
              error_message=error.proper_message,
              info={"cnd_action": "reauth"},
          )
      else:
          await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
          puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
          if puppet and puppet.is_real_user:
              await puppet.switch_mxid(None, None)
          # Missing cache entries are fine; just make sure none is left behind.
          self.by_igpk.pop(self.igpk, None)
          self.igpk = None

      self.client = self.mqtt = self.state = self.seq_id = self.snapshot_at_ms = None
      self.thread_sync_completed = False
      self._is_logged_in = False
      await self.update()
  955. # endregion
  956. # region Event handlers
  async def _save_seq_id_after_sleep(self) -> None:
      """Wait 120 seconds, then persist the current sequence ID.

      The reference to the save task is cleared before saving, and a failure
      to save is logged rather than raised.
      """
      await asyncio.sleep(120)
      self._seq_id_save_task = None
      seq_id, snapshot_at_ms = self.seq_id, self.snapshot_at_ms
      self.log.trace("Saving sequence ID %d/%d", seq_id, snapshot_at_ms)
      try:
          await self.save_seq_id()
      except Exception:
          self.log.exception("Error saving sequence ID")
  async def update_seq_id(self, evt: NewSequenceID) -> None:
      """Record a new sequence ID and make sure a delayed save is scheduled.

      The values from ``evt`` are stored immediately. A new save task is
      only created when there is no save task yet or the previous one has
      already finished.
      """
      self.seq_id = evt.seq_id
      self.snapshot_at_ms = evt.snapshot_at_ms

      save_task = self._seq_id_save_task
      if save_task and not save_task.done():
          # A save is already pending; it will pick up the values stored above.
          self.log.trace("Not starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
          return

      self.log.trace("Starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
      self._seq_id_save_task = asyncio.create_task(self._save_seq_id_after_sleep())
  @async_time(METRIC_MESSAGE)
  async def handle_message(self, evt: MessageSyncEvent) -> None:
      """Route a message sync event to the portal of its thread.

      If no portal with a Matrix room exists for the thread, the thread info
      is fetched and a room is created first; the event is dropped when that
      still leaves the portal without a room.

      Reaction events go to ``handle_instagram_reaction``. Other events are
      dispatched on the operation: ``ADD`` (only when a sender is known),
      ``REMOVE`` and ``REPLACE``.
      """
      message = evt.message
      portal = await po.Portal.get_by_thread_id(message.thread_id, receiver=self.igpk)
      if not (portal and portal.mxid):
          self.log.debug("Got message in thread with no portal, getting info...")
          resp = await self.client.get_thread(message.thread_id)
          portal = await po.Portal.get_by_thread(resp.thread, self.igpk)
          self.log.debug("Got info for unknown portal, creating room")
          await portal.create_matrix_room(self, resp.thread)
          if not portal.mxid:
              self.log.warning(
                  "Room creation appears to have failed, "
                  f"dropping message in {evt.message.thread_id}"
              )
              return

      self.log.trace(f"Received message sync event {evt.message}")
      if message.new_reaction:
          is_removal = message.op == Operation.REMOVE
          await portal.handle_instagram_reaction(message, remove=is_removal)
          return

      if message.user_id:
          sender = await pu.Puppet.get_by_pk(message.user_id)
      else:
          sender = None

      operation = message.op
      if operation == Operation.ADD:
          # I don't think we care about adds with no sender
          if sender:
              await portal.handle_instagram_item(self, sender, message)
      elif operation == Operation.REMOVE:
          # Removes don't have a sender, only the message sender can unsend messages anyway
          await portal.handle_instagram_remove(message.item_id)
      elif operation == Operation.REPLACE:
          await portal.handle_instagram_update(message)
  @async_time(METRIC_THREAD_SYNC)
  async def handle_thread_sync(self, evt: ThreadSyncEvent) -> None:
      """Apply a thread sync event to the matching portal.

      An existing Matrix room is updated. Without a room, one is created for
      group threads only; events for DMs without a room are ignored.
      """
      self.log.trace("Thread sync event content: %s", evt)
      portal = await po.Portal.get_by_thread(evt, receiver=self.igpk)

      if portal.mxid:
          self.log.debug("Got thread sync event for %s with existing portal", portal.thread_id)
          await portal.update_matrix_room(self, evt)
          return

      if not evt.is_group:
          self.log.debug(
              "Got thread sync event for DM %s without existing portal, ignoring",
              portal.thread_id,
          )
          return

      self.log.debug(
          "Got thread sync event for group %s without existing portal, creating room",
          portal.thread_id,
      )
      await portal.create_matrix_room(self, evt)
  async def handle_thread_remove(self, evt: ThreadRemoveEvent) -> None:
      """Log a thread remove event; no other action is taken."""
      serialized = evt.serialize()
      self.log.debug("Got thread remove event: %s", serialized)
  @async_time(METRIC_RTD)
  async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
      """Bridge an activity indicator (typing) event to the Matrix room.

      Events are ignored when their value is not ``ActivityIndicatorData``,
      when ``timestamp_ms + ttl`` is already in the past relative to the
      current wall-clock time in milliseconds, or when the indicator ID has
      been handled before.

      Handled indicator IDs are remembered together with their expiry.
      Entries whose expiry has passed are dropped whenever a new one is
      recorded, so the lookup table does not grow without bound.
      """
      if not isinstance(evt.value, ActivityIndicatorData):
          return

      now = int(time.time() * 1000)
      date = evt.value.timestamp_ms
      expiry = date + evt.value.ttl
      if expiry < now:
          return

      seen_indicators = self._activity_indicator_ids
      if evt.activity_indicator_id in seen_indicators:
          return

      # Forget indicators that have expired. An event reusing such an ID is
      # normally rejected by the expiry check above anyway.
      expired_ids = [
          indicator_id
          for indicator_id, expires_at in seen_indicators.items()
          if expires_at < now
      ]
      for indicator_id in expired_ids:
          del seen_indicators[indicator_id]
      seen_indicators[evt.activity_indicator_id] = expiry

      puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
      portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
      if not puppet or not portal or not portal.mxid:
          return

      is_typing = evt.value.activity_status != TypingStatus.OFF
      if puppet.pk == self.igpk:
          self.remote_typing_status = TypingStatus.TEXT if is_typing else TypingStatus.OFF
      # NOTE(review): the ttl is sent as the typing timeout even when the
      # status is OFF - confirm whether a stop (timeout 0) is expected here.
      await puppet.intent_for(portal).set_typing(portal.mxid, timeout=evt.value.ttl)
  1046. # endregion
  1047. # region Database getters
  def _add_to_cache(self) -> None:
      """Register this user in the lookup caches.

      The user is always stored in ``by_mxid``; it is stored in ``by_igpk``
      only when ``igpk`` is set (truthy).
      """
      self.by_mxid[self.mxid] = self
      if not self.igpk:
          return
      self.by_igpk[self.igpk] = self
  @classmethod
  @async_getter_lock
  async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
      """Look up a user by Matrix ID, optionally creating it.

      The cache is consulted first, then the parent class lookup. When
      nothing is found and ``create`` is true, a new user is inserted.
      Users obtained from the lookup or by creation are added to the cache.

      :param mxid: Matrix user ID to look up.
      :param create: Whether to create and insert the user when missing.
      :return: The user, or ``None`` for puppet (ghost) IDs and for unknown
          users when ``create`` is false.
      """
      # Never allow ghosts to be users
      if pu.Puppet.get_id_from_mxid(mxid):
          return None

      if mxid in cls.by_mxid:
          return cls.by_mxid[mxid]

      user = cast(cls, await super().get_by_mxid(mxid))
      if user is None and create:
          user = cls(mxid)
          await user.insert()
      if user is None:
          return None

      user._add_to_cache()
      return user
  @classmethod
  @async_getter_lock
  async def get_by_igpk(cls, igpk: int) -> User | None:
      """Look up a user by Instagram primary key.

      The cache is consulted first, then the parent class lookup; a user
      found by the lookup is added to the cache.

      :return: The user, or ``None`` if no user has this key.
      """
      if igpk in cls.by_igpk:
          return cls.by_igpk[igpk]

      user = cast(cls, await super().get_by_igpk(igpk))
      if user is None:
          return None

      user._add_to_cache()
      return user
  @classmethod
  async def all_logged_in(cls) -> AsyncGenerator[User, None]:
      """Yield every logged-in user, preferring already cached instances.

      The users come from the parent class's ``all_logged_in``. For each
      one, the instance cached under the same Matrix ID is yielded if there
      is one; otherwise the fetched instance is added to the cache and
      yielded.
      """
      users = await super().all_logged_in()
      user: cls
      for user in users:
          # Keep the lookup out of any try block around the yield, so an
          # exception thrown into the generator is never mistaken for a miss.
          if user.mxid in cls.by_mxid:
              yield cls.by_mxid[user.mxid]
          else:
              user._add_to_cache()
              yield user
  1094. # endregion