user.py 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2023 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, Callable, cast
  18. from datetime import datetime, timedelta
  19. from functools import partial
  20. import asyncio
  21. import logging
  22. import time
  23. from mauigpapi import AndroidAPI, AndroidMQTT, AndroidState
  24. from mauigpapi.errors import (
  25. IGChallengeError,
  26. IGCheckpointError,
  27. IGConsentRequiredError,
  28. IGNotLoggedInError,
  29. IGRateLimitError,
  30. IGResponseError,
  31. IGUnknownError,
  32. IGUserIDNotFoundError,
  33. IrisSubscribeError,
  34. MQTTConnectionUnauthorized,
  35. MQTTNotConnected,
  36. MQTTNotLoggedIn,
  37. MQTTReconnectionError,
  38. )
  39. from mauigpapi.mqtt import (
  40. Connect,
  41. Disconnect,
  42. GraphQLSubscription,
  43. NewSequenceID,
  44. ProxyUpdate,
  45. SkywalkerSubscription,
  46. )
  47. from mauigpapi.types import (
  48. ActivityIndicatorData,
  49. CurrentUser,
  50. MessageSyncEvent,
  51. Operation,
  52. RealtimeDirectEvent,
  53. Thread,
  54. ThreadRemoveEvent,
  55. ThreadSyncEvent,
  56. TypingStatus,
  57. )
  58. from mauigpapi.types.direct_inbox import DMInbox, DMInboxResponse
  59. from mautrix.appservice import AppService
  60. from mautrix.bridge import BaseUser, async_getter_lock
  61. from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID
  62. from mautrix.util import background_task
  63. from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
  64. from mautrix.util.logging import TraceLogger
  65. from mautrix.util.opt_prometheus import Gauge, Summary, async_time
  66. from mautrix.util.proxy import RETRYABLE_PROXY_EXCEPTIONS, ProxyHandler
  67. from mautrix.util.simple_lock import SimpleLock
  68. from . import portal as po, puppet as pu
  69. from .config import Config
  70. from .db import Backfill, Message as DBMessage, Portal as DBPortal, User as DBUser
  71. if TYPE_CHECKING:
  72. from .__main__ import InstagramBridge
  # Prometheus metric objects for this module. The Summary help texts name the
  # handlers they refer to; the gauges are updated through User._track_metric().
  METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
  METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync")
  METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
  METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
  METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")

  # Register display texts for the Instagram-specific bridge state error codes.
  # NOTE(review): the "{message}" placeholders are presumably filled in by mautrix when the
  # state is rendered, and a None value presumably means "no text" - confirm against mautrix.
  BridgeState.human_readable_errors.update(
      {
          "ig-connection-error": "Instagram disconnected unexpectedly",
          "ig-refresh-connection-error": "Reconnecting failed again after refresh: {message}",
          "ig-connection-fatal-error": "Instagram disconnected unexpectedly",
          "ig-auth-error": "Authentication error from Instagram: {message}, please login again to continue",
          "ig-checkpoint": "Instagram checkpoint error. Please check the Instagram website.",
          "ig-consent-required": "Instagram requires a consent update. Please check the Instagram website.",
          "ig-checkpoint-locked": "Instagram checkpoint error. Please check the Instagram website.",
          "ig-rate-limit": "Got Instagram ratelimit error, waiting a few minutes before retrying...",
          "ig-disconnected": None,
          "logged-out": "You've been logged out of instagram, please login again to continue",
      }
  )
  class User(DBUser, BaseUser):
      """A Matrix user of the bridge together with their Instagram API and MQTT session."""

      # Base logger; per-user child loggers are derived from it (see api_log and connect()).
      ig_base_log: TraceLogger = logging.getLogger("mau.instagram")
      # The dicts below are class attributes, i.e. one shared mapping for all instances.
      _activity_indicator_ids: dict[str, int] = {}
      by_mxid: dict[UserID, User] = {}
      # connect() registers the instance here under its Instagram user ID.
      by_igpk: dict[int, User] = {}

      # Populated on the class by init_cls().
      config: Config
      az: AppService
      loop: asyncio.AbstractEventLoop

      # HTTP API client and MQTT client; both are None until connect() succeeds.
      client: AndroidAPI | None
      mqtt: AndroidMQTT | None

      # Background tasks and the lock that serialises thread syncs.
      _listen_task: asyncio.Task | None = None
      _sync_lock: SimpleLock
      _backfill_loop_task: asyncio.Task | None
      _thread_sync_task: asyncio.Task | None
      _seq_id_save_task: asyncio.Task | None

      permission_level: str
      username: str | None

      # _notice_room_lock guards notice room creation, _notice_send_lock keeps notices ordered.
      _notice_room_lock: asyncio.Lock
      _notice_send_lock: asyncio.Lock

      _is_logged_in: bool
      _is_connected: bool
      shutdown: bool
      remote_typing_status: TypingStatus | None
  def __init__(
      self,
      mxid: UserID,
      igpk: int | None = None,
      state: AndroidState | None = None,
      notice_room: RoomID | None = None,
      seq_id: int | None = None,
      snapshot_at_ms: int | None = None,
      oldest_cursor: str | None = None,
      total_backfilled_portals: int | None = None,
      thread_sync_completed: bool = False,
  ) -> None:
      """Create a user object from its persisted fields and reset all runtime state.

      The keyword arguments are passed unchanged to the first base class initialiser;
      everything else set here is in-memory connection state that starts out disconnected.
      """
      persisted_fields = {
          "mxid": mxid,
          "igpk": igpk,
          "state": state,
          "notice_room": notice_room,
          "seq_id": seq_id,
          "snapshot_at_ms": snapshot_at_ms,
          "oldest_cursor": oldest_cursor,
          "total_backfilled_portals": total_backfilled_portals,
          "thread_sync_completed": thread_sync_completed,
      }
      super().__init__(**persisted_fields)
      BaseUser.__init__(self)

      self._notice_room_lock = asyncio.Lock()
      self._notice_send_lock = asyncio.Lock()
      self.command_status = None

      # get_permissions() yields the four permission fields in exactly this order.
      (
          self.relay_whitelisted,
          self.is_whitelisted,
          self.is_admin,
          self.permission_level,
      ) = self.config.get_permissions(mxid)

      # No clients exist until connect() succeeds.
      self.client = None
      self.mqtt = None
      self.username = None

      self._is_logged_in = False
      self._is_connected = False
      self._is_refreshing = False
      self.shutdown = False

      sync_wait_message = "Waiting for thread sync to finish before handling %s"
      self._sync_lock = SimpleLock(sync_wait_message, log=self.log)

      self._listen_task = None
      self._thread_sync_task = None
      self._backfill_loop_task = None
      self.remote_typing_status = None
      self._seq_id_save_task = None

      proxy_api_url = self.config["bridge.get_proxy_api_url"]
      self.proxy_handler = ProxyHandler(api_url=proxy_api_url)
  @classmethod
  def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]:
      """Attach the bridge and its shared services to the class.

      Returns an async iterable that yields one not-yet-awaited ``try_connect()``
      awaitable per user produced by ``cls.all_logged_in()``.
      """
      cls.bridge = bridge
      cls.config, cls.az, cls.loop = bridge.config, bridge.az, bridge.loop

      # Obtain the user iterable up front, as a generator expression would.
      logged_in_users = cls.all_logged_in()

      async def startup_awaitables():
          async for user in logged_in_users:
              yield user.try_connect()

      return startup_awaitables()
  169. # region Connection management
  async def is_logged_in(self) -> bool:
      """Tell whether an API client exists and the logged-in flag is set."""
      if not self.client:
          return False
      return self._is_logged_in
  async def get_puppet(self) -> pu.Puppet | None:
      """Return the puppet for this user's own Instagram ID, or None when the ID is unset."""
      own_pk = self.igpk
      return await pu.Puppet.get_by_pk(own_pk) if own_pk else None
  async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None:
      """Find the private chat portal with ``puppet``, optionally creating the thread.

      Returns None when this user has no Instagram ID, or when no portal exists and
      ``create`` is false.
      """
      if not self.igpk:
          return None

      existing = await po.Portal.find_private_chat(self.igpk, puppet.pk)
      if existing:
          return existing
      if not create:
          return None

      # TODO add error handling somewhere
      new_thread = await self.client.create_group_thread([puppet.pk])
      created = await po.Portal.get_by_thread(new_thread, self.igpk)
      await created.update_info(new_thread, self)
      return created
  async def try_connect(self) -> None:
      """Run connect(), retrying forever on proxy errors and reporting any other failure.

      Non-proxy exceptions are logged and pushed as an UNKNOWN_ERROR bridge state
      instead of being raised.
      """
      while True:
          try:
              await self.connect()
          except RETRYABLE_PROXY_EXCEPTIONS as proxy_err:
              # These are retried by the client up to 10 times, but we actually want to retry
              # these indefinitely so we capture them here again and retry.
              self.log.warning(
                  f"Proxy error connecting to Instagram: {proxy_err}, retrying in 1 minute",
              )
              await asyncio.sleep(60)
          except Exception as err:
              self.log.exception("Error while connecting to Instagram")
              await self.push_bridge_state(
                  BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(err)}
              )
              return
          else:
              return
  @property
  def api_log(self) -> TraceLogger:
      """Per-user logger for HTTP API calls: ``<base logger>.http.<mxid>``."""
      http_log = self.ig_base_log.getChild("http")
      return http_log.getChild(self.mxid)
  @property
  def is_connected(self) -> bool:
      """True only when both clients exist and the connected flag is set."""
      if not self.client or not self.mqtt:
          return False
      return self._is_connected
  async def ensure_connected(self, max_wait_seconds: int = 5) -> None:
      """Wait until the user is connected, polling every 0.1 seconds.

      Raises:
          Exception: if the user is still not connected after roughly
              ``max_wait_seconds`` seconds of polling.
      """
      poll_interval = 0.1
      allowed_polls = max_wait_seconds / poll_interval
      polls_done = 0
      while not self.is_connected:
          polls_done += 1
          if polls_done > allowed_polls:
              raise Exception("You're not connected to instagram")
          await asyncio.sleep(poll_interval)
  async def connect(self, user: CurrentUser | None = None) -> None:
      """Create the API and MQTT clients from the stored state and start post-connect work.

      Without stored state a BAD_CREDENTIALS ("logged-out") bridge state is pushed and
      nothing else happens.

      Args:
          user: Already fetched current-user info. When omitted it is fetched with
              ``client.current_user()``, which also validates the session.

      Raises:
          IGCheckpointError: re-raised after its body has been logged.
          Exception: anything else ``current_user()`` raises that is not handled below.
      """
      if not self.state:
          await self.push_bridge_state(
              BridgeStateEvent.BAD_CREDENTIALS,
              error="logged-out",
              info={"cnd_action": "reauth"},
          )
          return

      client = AndroidAPI(
          self.state,
          log=self.api_log,
          proxy_handler=self.proxy_handler,
          on_proxy_update=self.on_proxy_update,
          on_response_error=self.on_response_error,
      )

      if not user:
          try:
              resp = await client.current_user()
              user = resp.user
          except IGNotLoggedInError as e:
              self.log.warning(f"Failed to connect to Instagram: {e}, logging out")
              await self.logout(error=e)
              return
          except IGCheckpointError as e:
              self.log.debug("Checkpoint error content: %s", e.body)
              raise
          except (IGChallengeError, IGConsentRequiredError) as e:
              # self.client is not set yet, so hand over the local client explicitly.
              await self._handle_checkpoint(e, on="connect", client=client)
              return

      # Only publish the client once the session is known to be usable.
      self.client = client
      self._is_logged_in = True
      self.igpk = user.pk
      self.username = user.username
      await self.push_bridge_state(BridgeStateEvent.CONNECTING)
      self._track_metric(METRIC_LOGGED_IN, True)
      self.by_igpk[self.igpk] = self

      self.mqtt = AndroidMQTT(
          self.state,
          log=self.ig_base_log.getChild("mqtt").getChild(self.mxid),
          proxy_handler=self.proxy_handler,
      )
      self.mqtt.add_event_handler(Connect, self.on_connect)
      self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
      self.mqtt.add_event_handler(NewSequenceID, self.update_seq_id)
      self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
      self.mqtt.add_event_handler(ThreadSyncEvent, self.handle_thread_sync)
      self.mqtt.add_event_handler(ThreadRemoveEvent, self.handle_thread_remove)
      self.mqtt.add_event_handler(RealtimeDirectEvent, self.handle_rtd)
      self.mqtt.add_event_handler(ProxyUpdate, self.on_proxy_update)

      await self.update()

      # The event loop only keeps weak references to tasks, so a bare create_task() result
      # that nobody holds may be collected before it finishes. background_task.create()
      # is used here so the tasks stay referenced while they run.
      background_task.create(self._try_sync_puppet(user))
      background_task.create(self._post_connect())
  async def _post_connect(self):
      """Start the background work that follows a successful connect().

      Starts the backfill request loop if it is not running, then either does a full
      sync (no stored seq_id) or starts listening directly, restarts the thread backfill
      task when backfill is enabled, and finally refreshes puppet contact info on
      homeservers flagged as ``is_hungry``.
      """
      # Backfill requests are handled synchronously so as not to overload the homeserver.
      # Users can configure their backfill stages to be more or less aggressive with backfilling
      # to try and avoid getting banned.
      if not self._backfill_loop_task or self._backfill_loop_task.done():
          self._backfill_loop_task = asyncio.create_task(self._handle_backfill_requests_loop())

      if not self.seq_id:
          await self._try_sync()
      else:
          self.log.debug("Connecting to MQTT directly as resync_on_startup is false")
          self.start_listen()

      if self.config["bridge.backfill.enable"]:
          if self._thread_sync_task and not self._thread_sync_task.done():
              self.log.warning("Cancelling existing background thread sync task")
              self._thread_sync_task.cancel()
          self._thread_sync_task = asyncio.create_task(self.backfill_threads())

      if self.bridge.homeserver_software.is_hungry:
          self.log.info("Updating contact info for all users")
          # Await the gather so its result is actually consumed. return_exceptions=True
          # keeps one failing puppet from hiding the outcome of the others.
          results = await asyncio.gather(
              *[puppet.update_contact_info() async for puppet in pu.Puppet.get_all()],
              return_exceptions=True,
          )
          for result in results:
              if isinstance(result, Exception):
                  self.log.warning("Failed to update puppet contact info", exc_info=result)
  async def _handle_backfill_requests_loop(self) -> None:
      """Process queued backfill requests for this user one at a time.

      Does nothing unless both backfill and MSC2716 are enabled. The loop ends when the
      user is logged out or hits a challenge; any other failure puts the request on a
      60 second cooldown and the loop carries on.
      """
      backfill_enabled = (
          self.config["bridge.backfill.enable"] and self.config["bridge.backfill.msc2716"]
      )
      if not backfill_enabled:
          return

      while True:
          await self._sync_lock.wait("backfill request")
          request = await Backfill.get_next(self.mxid)
          if not request:
              # Nothing queued, check again in half a minute.
              await asyncio.sleep(30)
              continue

          self.log.info("Backfill request %s", request)
          try:
              portal = await po.Portal.get_by_thread_id(
                  request.portal_thread_id, receiver=request.portal_receiver
              )
              await request.mark_dispatched()
              await portal.backfill(self, request)
              await request.mark_done()
          except IGNotLoggedInError as logout_err:
              self.log.exception("User got logged out during backfill loop")
              await self.logout(error=logout_err)
              break
          except (IGChallengeError, IGConsentRequiredError) as challenge_err:
              self.log.exception("User got a challenge during backfill loop")
              await self._handle_checkpoint(challenge_err, on="backfill")
              break
          except Exception as err:
              self.log.exception(
                  "Failed to backfill portal %s: %s", request.portal_thread_id, err
              )
              # Don't try again to backfill this portal for a minute.
              await request.set_cooldown_timeout(60)

      self._backfill_loop_task = None
  async def on_connect(self, evt: Connect) -> None:
      """MQTT connect handler: record the connection and announce it."""
      self.log.debug("Connected to Instagram")
      self._track_metric(METRIC_CONNECTED, True)
      self._is_connected = True

      connected_notice = "Connected to Instagram"
      await self.send_bridge_notice(connected_notice)
      await self.push_bridge_state(BridgeStateEvent.CONNECTED)
  async def on_disconnect(self, evt: Disconnect) -> None:
      """MQTT disconnect handler: clear the connected flag and metric."""
      self.log.debug("Disconnected from Instagram")
      now_connected = False
      self._track_metric(METRIC_CONNECTED, now_connected)
      self._is_connected = now_connected
  async def on_proxy_update(self, evt: ProxyUpdate | None = None) -> None:
      """Re-apply proxy settings to every client object that currently exists.

      Covers the API client, the MQTT client and the API object stored under
      ``command_status["api"]`` when a command status is set.
      """
      api, mqtt, pending = self.client, self.mqtt, self.command_status
      if api:
          api.setup_http(self.state.cookies.jar)
      if mqtt:
          mqtt.setup_proxy()
      if pending:
          pending_jar = pending["state"].cookies.jar
          pending["api"].setup_http(pending_jar)
  async def on_response_error(self, err: IGResponseError) -> None:
      """API error hook: log the user out when the error says they are not logged in."""
      if not isinstance(err, IGNotLoggedInError):
          return
      self.log.warning(f"Noticed logout in API error response: {err}")
      await self.logout(error=err)
  346. # TODO this stuff could probably be moved to mautrix-python
  async def get_notice_room(self) -> RoomID:
      """Return the bridge notice room for this user, creating it on first use."""
      if self.notice_room:
          return self.notice_room

      async with self._notice_room_lock:
          # If someone already created the room while this call was waiting,
          # don't make a new room
          if self.notice_room:
              return self.notice_room

          if self.config["bridge.federate_rooms"]:
              creation_content = {}
          else:
              creation_content = {"m.federate": False}
          self.notice_room = await self.az.intent.create_room(
              is_direct=True,
              invitees=[self.mxid],
              topic="Instagram bridge notices",
              creation_content=creation_content,
          )
          await self.update()
      return self.notice_room
  async def fill_bridge_state(self, state: BridgeState) -> None:
      """Fill in the remote ID and remote name of a bridge state for this user.

      The remote ID is only set when the base implementation left it empty: the
      Instagram user ID is preferred, falling back to the user ID stored in the
      login state. The remote name is set to ``@username`` when a username is known.
      """
      await super().fill_bridge_state(state)
      if not state.remote_id:
          if self.igpk:
              state.remote_id = str(self.igpk)
          elif self.state:
              # connect() pushes a bridge state when self.state is missing, so this
              # fallback must not assume that a login state exists.
              try:
                  state.remote_id = self.state.user_id
              except IGUserIDNotFoundError:
                  state.remote_id = None
      if self.username:
          state.remote_name = f"@{self.username}"
  async def get_bridge_states(self) -> list[BridgeState]:
      """Return the current bridge state as a single-item list, or [] without login state.

      The state is CONNECTED when connected, TRANSIENT_DISCONNECT while refreshing or
      while an MQTT client exists, and UNKNOWN_ERROR otherwise.
      """
      if not self.state:
          return []

      current = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
      if self.is_connected:
          current.state_event = BridgeStateEvent.CONNECTED
      elif self._is_refreshing or self.mqtt:
          current.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
      return [current]
  async def send_bridge_notice(
      self,
      text: str,
      edit: EventID | None = None,
      state_event: BridgeStateEvent | None = None,
      important: bool = False,
      error_code: str | None = None,
      error_message: str | None = None,
      info: dict | None = None,
  ) -> EventID | None:
      """Optionally push a bridge state, then send ``text`` to the user's notice room.

      Args:
          text: Notice body; also used as the bridge state message when no error code is given.
          edit: Event ID of an earlier notice to edit instead of posting a new message.
          state_event: When set, a bridge state with this event is pushed first.
          important: Send as a text message instead of a notice, and bypass the
              ``bridge.unimportant_bridge_notices`` filter.
          error_code: Error code for the pushed bridge state.
          error_message: Bridge state message used when ``error_code`` is set.
          info: Extra info for the pushed bridge state.

      Returns:
          ``edit`` if it was given, otherwise the ID of the sent event; None when notices
          are disabled or filtered out, or when sending failed and no ``edit`` was given.
      """
      if state_event:
          state_message = error_message if error_code else text
          await self.push_bridge_state(
              state_event,
              error=error_code,
              message=state_message,
              info=info,
          )

      if self.config["bridge.disable_bridge_notices"]:
          return None
      if not (important or self.config["bridge.unimportant_bridge_notices"]):
          self.log.debug("Not sending unimportant bridge notice: %s", text)
          return None

      sent_event_id = None
      try:
          self.log.debug("Sending bridge notice: %s", text)
          msgtype = MessageType.TEXT if important else MessageType.NOTICE
          content = TextMessageEventContent(body=text, msgtype=msgtype)
          if edit:
              content.set_edit(edit)
          # This is locked to prevent notices going out in the wrong order
          async with self._notice_send_lock:
              notice_room = await self.get_notice_room()
              sent_event_id = await self.az.intent.send_message(notice_room, content)
      except Exception:
          self.log.warning("Failed to send bridge notice", exc_info=True)
      return edit or sent_event_id
  async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
      """Best-effort update of the user's own puppet after connecting.

      Tries to enable the custom puppet automatically and then to update the puppet's
      info; each step logs its own failure and neither one raises.
      """
      own_puppet = await pu.Puppet.get_by_pk(self.igpk)

      try:
          not_yet_custom = own_puppet.custom_mxid != self.mxid
          if not_yet_custom and own_puppet.can_auto_login(self.mxid):
              self.log.info("Automatically enabling custom puppet")
              await own_puppet.switch_mxid(access_token="auto", mxid=self.mxid)
      except Exception:
          self.log.exception("Failed to automatically enable custom puppet")

      try:
          await own_puppet.update_info(user_info, self)
      except Exception:
          self.log.exception("Failed to update own puppet info")
  async def _try_sync(self) -> None:
      """Run sync() and turn any failure into a log entry plus an UNKNOWN_ERROR state."""
      try:
          await self.sync()
      except Exception as sync_err:
          self.log.exception("Exception while syncing")
          if isinstance(sync_err, IGCheckpointError):
              self.log.debug("Checkpoint error content: %s", sync_err.body)
          error_info = {"python_error": str(sync_err)}
          await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, info=error_info)
  async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
      """Map each private-chat partner's puppet MXID to a one-item list with the room ID.

      Portals without a Matrix room are left out.
      """
      direct_chats = {}
      for portal in await DBPortal.find_private_chats_of(self.igpk):
          if not portal.mxid:
              continue
          partner_mxid = pu.Puppet.get_mxid_from_id(portal.other_user_pk)
          direct_chats[partner_mxid] = [portal.mxid]
      return direct_chats
  async def refresh(self, resync: bool = True, update_proxy: bool = False) -> None:
      """Stop listening, reset the pigeon session and reconnect.

      Args:
          resync: When true, call sync() and keep retrying until it succeeds; otherwise
              just start listening again.
          update_proxy: When true, ask the proxy handler for a new proxy URL and re-apply
              proxy settings if it changed.

      The ``_is_refreshing`` flag is set for the whole duration of the call.
      """
      self._is_refreshing = True
      try:
          await self.stop_listen()
          self.state.reset_pigeon_session_id()
          if update_proxy and self.proxy_handler.update_proxy_url(reason="reconnect"):
              await self.on_proxy_update()

          if not resync:
              self.start_listen()
              return

          failures = 0
          wait_minutes = 1
          while True:
              try:
                  await self.sync()
                  return
              except Exception as sync_err:
                  # The delay stays at one minute for the first failures, then grows by
                  # a minute per failure up to ten minutes.
                  if failures >= 4 and wait_minutes < 10:
                      wait_minutes += 1
                  failures += 1
                  s = "" if wait_minutes == 1 else "s"
                  self.log.exception(
                      f"Error while syncing for refresh, retrying in {wait_minutes} minute{s}"
                  )
                  if isinstance(sync_err, IGCheckpointError):
                      self.log.debug("Checkpoint error content: %s", sync_err.body)
                  await self.push_bridge_state(
                      BridgeStateEvent.UNKNOWN_ERROR,
                      error="unknown-error",
                      message="An unknown error occurred while connecting to Instagram",
                      info={"python_error": str(sync_err)},
                  )
                  await asyncio.sleep(wait_minutes * 60)
      finally:
          self._is_refreshing = False
  async def _handle_checkpoint(
      self,
      e: IGChallengeError | IGConsentRequiredError,
      on: str,
      client: AndroidAPI | None = None,
  ) -> None:
      """Drop the clients and report a checkpoint or consent error as BAD_CREDENTIALS.

      Args:
          e: The challenge or consent error that was raised.
          on: Short label of the operation that failed; only used in the log message.
          client: API client to use for resetting the challenge; defaults to
              ``self.client`` as it was before being cleared.

      For consent errors the error body is pushed as-is. For challenges the challenge is
      reset to collect details; if that fails, only the challenge from the error body is
      reported.
      """
      self.log.warning(f"Got checkpoint error on {on}: {e.body.serialize()}")
      client = client or self.client
      self.client = None
      self.mqtt = None

      if isinstance(e, IGConsentRequiredError):
          await self.push_bridge_state(
              BridgeStateEvent.BAD_CREDENTIALS,
              error="ig-consent-required",
              info=e.body.serialize(),
          )
          return

      error_code = "ig-checkpoint"
      try:
          resp = await client.challenge_reset()
          # Only touch challenge_context when the raw string is present. An unguarded
          # access here would land in the except below and discard the collected info.
          # NOTE(review): presumably challenge_context parses challenge_context_str - confirm.
          challenge_context = resp.challenge_context if resp.challenge_context_str else None
          info = {
              "challenge_context": (
                  challenge_context.serialize() if challenge_context is not None else None
              ),
              "step_name": resp.step_name,
              "step_data": resp.step_data.serialize() if resp.step_data else None,
              "user_id": resp.user_id,
              "action": resp.action,
              "status": resp.status,
              "challenge": e.body.challenge.serialize() if e.body.challenge else None,
          }
          self.log.debug(f"Challenge state: {resp.serialize()}")
          if (
              challenge_context is not None
              and challenge_context.challenge_type_enum == "HACKED_LOCK"
          ):
              error_code = "ig-checkpoint-locked"
      except Exception:
          self.log.exception("Error resetting challenge state")
          info = {"challenge": e.body.challenge.serialize() if e.body.challenge else None}
      await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error=error_code, info=info)
  async def _sync_thread(
      self, thread: Thread, enqueue_backfill: bool = True, portal: po.Portal | None = None
  ) -> bool:
      """
      Sync a specific thread. Returns whether the thread had messages after the last message in
      the database before the sync.

      Args:
          thread: Thread data to sync; its ``items`` are the first page of messages.
          enqueue_backfill: Whether to enqueue an immediate backfill for the portal when
              MSC2716 backfill is enabled.
          portal: Portal for the thread, if the caller already has it. It must have the
              same thread ID as ``thread``.

      Note that True is also returned when initial backfill is disabled, and False when
      the portal has no messages and no known first event ID.
      """
      self.log.debug(f"Syncing thread {thread.thread_id}")
      forward_messages = thread.items
      assert self.client
      if not portal:
          portal = await po.Portal.get_by_thread(thread, self.igpk)
          assert portal
      else:
          assert portal.thread_id == thread.thread_id

      # Create or update the Matrix room
      if not portal.mxid:
          await portal.create_matrix_room(self, thread)
      else:
          await portal.update_matrix_room(self, thread)

      if not self.config["bridge.backfill.enable_initial"]:
          return True

      last_message = await DBMessage.get_last(portal.mxid)
      cursor = thread.oldest_cursor
      if last_message:
          # Keep only the messages newer than the last one stored in the database.
          original_number_of_messages = len(thread.items)
          new_messages = [
              m for m in thread.items if last_message.ig_timestamp_ms < m.timestamp_ms
          ]
          forward_messages = new_messages
          portal.log.debug(
              f"{len(new_messages)}/{original_number_of_messages} messages are after most recent"
              " message."
          )
          # Fetch more messages until we get back to messages that have been bridged already.
          # A page where every message is new means older pages may contain new ones too.
          while len(new_messages) > 0 and len(new_messages) == original_number_of_messages:
              await asyncio.sleep(self.config["bridge.backfill.incremental.page_delay"])
              portal.log.debug("Fetching more messages for forward backfill")
              resp = await self.client.get_thread(portal.thread_id, cursor=cursor)
              if len(resp.thread.items) == 0:
                  break
              original_number_of_messages = len(resp.thread.items)
              new_messages = [
                  m for m in resp.thread.items if last_message.ig_timestamp_ms < m.timestamp_ms
              ]
              # Older pages are prepended in front of the pages fetched before them.
              forward_messages = new_messages + forward_messages
              cursor = resp.thread.oldest_cursor
              portal.log.debug(
                  f"{len(new_messages)}/{original_number_of_messages} messages are after most "
                  "recent message."
              )
      elif not portal.first_event_id:
          self.log.debug(
              f"Skipping backfilling {portal.thread_id} as the first event ID is not known"
          )
          return False

      if forward_messages:
          portal.cursor = cursor
          await portal.update()
          # Mark as read if the thread's read_state is 0, or if the threshold is enabled
          # (> 0) and the first forward message is older than that many hours.
          mark_read = thread.read_state == 0 or (
              (hours := self.config["bridge.backfill.unread_hours_threshold"]) > 0
              and (
                  datetime.fromtimestamp(forward_messages[0].timestamp_ms / 1000)
                  < datetime.now() - timedelta(hours=hours)
              )
          )
          # The page is passed to the portal in reversed order.
          base_insertion_event_id = await portal.backfill_message_page(
              self,
              list(reversed(forward_messages)),
              forward=True,
              last_message=last_message,
              mark_read=mark_read,
          )
          if (
              not self.bridge.homeserver_software.is_hungry
              and self.config["bridge.backfill.msc2716"]
          ):
              await portal.send_post_backfill_dummy(
                  forward_messages[0].timestamp, base_insertion_event_id=base_insertion_event_id
              )
          if (
              mark_read
              and not self.bridge.homeserver_software.is_hungry
              and (puppet := await self.get_puppet())
          ):
              # Re-read the last message, the backfill above may have added newer ones.
              last_message = await DBMessage.get_last(portal.mxid)
              if last_message:
                  await puppet.intent_for(portal).mark_read(portal.mxid, last_message.mxid)
          # NOTE(review): nesting of this call under `if forward_messages` is assumed - confirm.
          await portal._update_read_receipts(thread.last_seen_at)

      if self.config["bridge.backfill.msc2716"] and enqueue_backfill:
          await portal.enqueue_immediate_backfill(self, 1)
      return len(forward_messages) > 0
  async def sync(self, increment_total_backfilled_portals: bool = False) -> None:
      """Run _sync() with the given flag through run_with_sync_lock()."""
      locked_sync = partial(self._sync, increment_total_backfilled_portals)
      await self.run_with_sync_lock(locked_sync)
  async def _sync(self, increment_total_backfilled_portals: bool = False) -> None:
      """Fetch the inbox, store the new sequence ID and sync the most recent threads.

      Args:
          increment_total_backfilled_portals: Passed through to _sync_threads_with_delay().

      Raises:
          IGCheckpointError: re-raised after its body has been logged.

      Returns early, without syncing threads, when the user turns out to be logged out,
      when a challenge or consent error is handled, or when the configured thread count is 0.
      """
      # The pigeon session ID is only reset when no listen task exists.
      if not self._listen_task:
          self.state.reset_pigeon_session_id()
      sleep_minutes = 2
      while True:
          try:
              resp = await self.client.get_inbox()
              break
          except IGNotLoggedInError as e:
              self.log.exception("Got not logged in error while syncing")
              await self.logout(error=e)
              return
          except IGRateLimitError as e:
              # Retry indefinitely, waiting two minutes longer after each rate limit error.
              self.log.error(
                  "Got ratelimit error while trying to get inbox (%s), retrying in %d minutes",
                  e.body,
                  sleep_minutes,
              )
              await self.push_bridge_state(
                  BridgeStateEvent.TRANSIENT_DISCONNECT, error="ig-rate-limit"
              )
              await asyncio.sleep(sleep_minutes * 60)
              sleep_minutes += 2
          except IGCheckpointError as e:
              self.log.debug("Checkpoint error content: %s", e.body)
              raise
          except (IGChallengeError, IGConsentRequiredError) as e:
              await self._handle_checkpoint(e, on="sync")
              return

      # Persist the sequence ID from the inbox response before starting to listen.
      self.seq_id = resp.seq_id
      self.snapshot_at_ms = resp.snapshot_at_ms
      await self.save_seq_id()
      if not self._listen_task:
          self.start_listen(is_after_sync=True)

      # NOTE(review): min() picks the negative value if either setting is negative, and a
      # negative count is treated as "no limit" below - confirm that this is intended.
      sync_count = min(
          self.config["bridge.backfill.max_conversations"],
          self.config["bridge.max_startup_thread_sync_count"],
      )
      self.log.debug(f"Fetching {sync_count} threads, 20 at a time...")
      local_limit: int | None = sync_count
      if sync_count == 0:
          return
      elif sync_count < 0:
          local_limit = None

      await self._sync_threads_with_delay(
          self.client.iter_inbox(
              self._update_seq_id_and_cursor, start_at=resp, local_limit=local_limit
          ),
          stop_when_threads_have_no_messages_to_backfill=True,
          increment_total_backfilled_portals=increment_total_backfilled_portals,
          local_limit=local_limit,
      )

      # A failure to update the direct chat list must not fail the whole sync.
      try:
          await self.update_direct_chats()
      except Exception:
          self.log.exception("Error updating direct chat list")
  async def backfill_threads(self):
      """Run _backfill_threads() through run_with_sync_lock(), logging instead of raising."""
      try:
          await self.run_with_sync_lock(self._backfill_threads)
      except Exception:
          self.log.exception("Error in thread backfill loop")
      return None
  async def _backfill_threads(self):
      """Sync further inbox threads, continuing from the stored oldest cursor.

      Does nothing when backfill is disabled, when the ``max_conversations`` limit has
      been reached, or when thread sync is already marked as completed.
      """
      assert self.client
      if not self.config["bridge.backfill.enable"]:
          return

      conversation_cap = self.config["bridge.backfill.max_conversations"] or 0
      already_backfilled = self.total_backfilled_portals or 0
      if 0 <= conversation_cap <= already_backfilled:
          self.log.info("Backfill max_conversations count reached, not syncing any more portals")
          return
      if self.thread_sync_completed:
          self.log.debug("Thread backfill is marked as completed, not syncing more portals")
          return

      # A negative cap means there is no limit on the number of threads.
      if conversation_cap >= 0:
          local_limit = conversation_cap - already_backfilled
      else:
          local_limit = None

      resume_from = None
      if self.oldest_cursor:
          # Placeholder inbox response that only carries the cursor to continue from.
          placeholder_inbox = DMInbox(
              threads=[],
              has_older=True,
              unseen_count=0,
              unseen_count_ts=0,
              blended_inbox_enabled=False,
              oldest_cursor=self.oldest_cursor,
          )
          resume_from = DMInboxResponse(
              status="",
              seq_id=self.seq_id,
              snapshot_at_ms=0,
              pending_requests_total=0,
              has_pending_top_requests=False,
              viewer=None,
              inbox=placeholder_inbox,
          )

      backoff = self.config.get("bridge.backfill.backoff.thread_list", 300)
      inbox_threads = self.client.iter_inbox(
          self._update_seq_id_and_cursor,
          start_at=resume_from,
          local_limit=local_limit,
          rate_limit_exceeded_backoff=backoff,
      )
      await self._sync_threads_with_delay(
          inbox_threads,
          increment_total_backfilled_portals=True,
          local_limit=local_limit,
      )
      await self.update_direct_chats()
  724. def _update_seq_id_and_cursor(self, seq_id: int, cursor: str | None):
  725. self.seq_id = seq_id
  726. if cursor:
  727. self.oldest_cursor = cursor
  async def _sync_threads_with_delay(
      self,
      threads: AsyncIterable[Thread],
      increment_total_backfilled_portals: bool = False,
      stop_when_threads_have_no_messages_to_backfill: bool = False,
      local_limit: int | None = None,
  ):
      """Sync each thread from ``threads``, spacing the syncs out in time.

      Consecutive syncs are kept at least ``bridge.backfill.min_sync_thread_delay``
      seconds apart. When the iterator is exhausted with no limit, or with fewer
      threads than ``local_limit``, the thread sync is marked as completed.

      :param threads: The threads to sync.
      :param increment_total_backfilled_portals: Bump and persist
          ``total_backfilled_portals`` after each synced thread.
      :param stop_when_threads_have_no_messages_to_backfill: Return as soon as
          ``_sync_thread`` returns a falsy value for a thread (without marking
          the sync as completed).
      :param local_limit: The limit the iterator was created with, if any.
      """
      min_gap = self.config["bridge.backfill.min_sync_thread_delay"]
      previous_sync_at = 0.0
      seen_threads = 0

      async for thread in threads:
          seen_threads += 1

          current = time.monotonic()
          earliest_next_sync = previous_sync_at + min_gap
          if current < earliest_next_sync:
              wait_for = earliest_next_sync - current
              self.log.debug("Thread sync is happening too quickly. Waiting for %ds", wait_for)
              await asyncio.sleep(wait_for)

          previous_sync_at = time.monotonic()
          had_new_messages = await self._sync_thread(thread)
          if stop_when_threads_have_no_messages_to_backfill and not had_new_messages:
              self.log.debug("Got to threads with no new messages. Stopping sync.")
              return

          if not increment_total_backfilled_portals:
              continue
          self.total_backfilled_portals = (self.total_backfilled_portals or 0) + 1
          await self.update()

      # The limit was filled, so there may be more threads left to fetch.
      if local_limit is not None and seen_threads >= local_limit:
          return

      if local_limit is None:
          self.log.info(
              "Reached end of thread list with no limit, marking thread sync as completed"
          )
      else:
          self.log.info(
              f"Reached end of thread list (got {seen_threads} with "
              f"limit {local_limit}), marking thread sync as completed"
          )
      self.thread_sync_completed = True
      await self.update()
  async def run_with_sync_lock(self, func: Callable[[], Awaitable]):
      """Run ``func`` while holding ``_sync_lock``, retrying up to five times.

      :param func: Zero-argument callable returning an awaitable; called once
          per attempt.

      An ``IGNotLoggedInError`` is not retried: the user is logged out with the
      error and the method returns. Any other ``Exception`` is logged and the
      call is retried after 30 seconds. After the fifth failure an error is
      logged and the method returns without raising.
      """
      max_attempts = 5
      with self._sync_lock:
          for attempt in range(1, max_attempts + 1):
              try:
                  await func()

                  # The sync was successful. Exit the loop.
                  return
              except IGNotLoggedInError as e:
                  await self.logout(error=e)
                  return
              except Exception:
                  if attempt == max_attempts:
                      # No retry follows, so don't hold the lock for another
                      # 30 seconds just to give up afterwards.
                      self.log.exception("Failed to sync threads.")
                      break
                  self.log.exception(
                      "Failed to sync threads. Waiting 30 seconds before retrying sync."
                  )
                  await asyncio.sleep(30)

          # If we get here, it means that the sync has failed five times. If this happens, most
          # likely something very bad has happened.
          self.log.error("Failed to sync threads five times. Will not retry.")
  def start_listen(self, is_after_sync: bool = False) -> None:
      """Clear the shutdown flag and start ``_listen`` as a task on ``self.loop``.

      :param is_after_sync: Passed through to ``_listen`` unchanged.
      """
      self.shutdown = False
      listen_args = {
          "seq_id": self.seq_id,
          "snapshot_at_ms": self.snapshot_at_ms,
          "is_after_sync": is_after_sync,
      }
      listener = self._listen(**listen_args)
      self._listen_task = self.loop.create_task(listener)
  async def delayed_start_listen(self, sleep: int) -> None:
      """Wait ``sleep`` seconds, then start listening unless already connected."""
      await asyncio.sleep(sleep)
      if not self.is_connected:
          self.log.debug("Reconnecting after MQTT connection error")
          self.start_listen()
          return
      # Something else reconnected while this coroutine was sleeping.
      self.log.debug("Already reconnected before delay after MQTT reconnection error finished")
  800. async def fetch_user_and_reconnect(self, sleep_first: int | None = None) -> None:
  801. if sleep_first:
  802. await asyncio.sleep(sleep_first)
  803. if self.is_connected:
  804. self.log.debug("Canceling user fetch, already reconnected")
  805. return
  806. self.log.debug("Refetching current user after disconnection")
  807. errors = 0
  808. while True:
  809. try:
  810. resp = await self.client.current_user()
  811. except RETRYABLE_PROXY_EXCEPTIONS as e:
  812. # These are retried by the client up to 10 times, but we actually want to retry
  813. # these indefinitely so we capture them here again and retry.
  814. self.log.warning(
  815. f"Proxy error fetching user from Instagram: {e}, retrying in 1 minute",
  816. )
  817. await asyncio.sleep(60)
  818. except IGNotLoggedInError as e:
  819. self.log.warning(f"Failed to reconnect to Instagram: {e}, logging out")
  820. await self.logout(error=e)
  821. return
  822. except (IGChallengeError, IGConsentRequiredError) as e:
  823. await self._handle_checkpoint(e, on="reconnect")
  824. return
  825. except IGUnknownError:
  826. errors += 1
  827. if errors > 10:
  828. raise
  829. self.log.warning(
  830. "Non-JSON body while trying to check user for reconnection, retrying in 10s"
  831. )
  832. await asyncio.sleep(10)
  833. except Exception as e:
  834. self.log.exception("Error while reconnecting to Instagram")
  835. if isinstance(e, IGCheckpointError):
  836. self.log.debug("Checkpoint error content: %s", e.body)
  837. await self.push_bridge_state(
  838. BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
  839. )
  840. return
  841. else:
  842. self.log.debug(f"Confirmed current user {resp.user.pk}")
  843. self.start_listen()
  844. return
  async def _listen(self, seq_id: int, snapshot_at_ms: int, is_after_sync: bool) -> None:
      """Run the MQTT listener until it stops, then react to how it stopped.

      :param seq_id: Passed to ``mqtt.listen``.
      :param snapshot_at_ms: Passed to ``mqtt.listen``.
      :param is_after_sync: Whether this listener was started right after a
          sync. An ``IrisSubscribeError`` then produces an error notice
          instead of scheduling another refresh.

      Whatever the outcome, the listen task reference and the connected
      flag/metric are reset on exit.
      """
      try:
          own_id = self.state.user_id
          await self.mqtt.listen(
              graphql_subs={
                  GraphQLSubscription.app_presence(),
                  GraphQLSubscription.direct_typing(own_id),
                  GraphQLSubscription.direct_status(),
              },
              skywalker_subs={
                  SkywalkerSubscription.direct_sub(own_id),
                  SkywalkerSubscription.live_sub(own_id),
              },
              seq_id=seq_id,
              snapshot_at_ms=snapshot_at_ms,
          )
      except IrisSubscribeError as sub_err:
          if not is_after_sync:
              self.log.warning(f"Got IrisSubscribeError {sub_err}, refreshing...")
              background_task.create(self.refresh())
          else:
              # A refresh was already done, so report the error instead of refreshing again.
              self.log.exception("Got IrisSubscribeError right after refresh")
              reason = str(sub_err)
              await self.send_bridge_notice(
                  f"Reconnecting failed again after refresh: {sub_err}",
                  important=True,
                  state_event=BridgeStateEvent.UNKNOWN_ERROR,
                  error_code="ig-refresh-connection-error",
                  error_message=reason,
                  info={"python_error": reason},
              )
      except MQTTReconnectionError as conn_err:
          self.log.warning(
              f"Unexpected connection error: {conn_err}, reconnecting in 1 minute",
              exc_info=True,
          )
          await self.send_bridge_notice(
              f"Error in listener: {conn_err}",
              important=True,
              state_event=BridgeStateEvent.TRANSIENT_DISCONNECT,
              error_code="ig-connection-error-socket",
          )
          self.mqtt.disconnect()
          background_task.create(self.delayed_start_listen(sleep=60))
      except (MQTTNotConnected, MQTTNotLoggedIn, MQTTConnectionUnauthorized) as auth_err:
          self.log.warning(
              f"Unexpected connection error: {auth_err}, checking auth and reconnecting"
          )
          await self.send_bridge_notice(
              f"Error in listener: {auth_err}",
              important=True,
              state_event=BridgeStateEvent.TRANSIENT_DISCONNECT,
              error_code="ig-connection-error-maybe-auth",
          )
          self.mqtt.disconnect()
          background_task.create(self.fetch_user_and_reconnect())
      except Exception as fatal_err:
          self.log.exception("Fatal error in listener, reconnecting in 5 minutes")
          await self.send_bridge_notice(
              "Fatal error in listener (see logs for more info)",
              state_event=BridgeStateEvent.UNKNOWN_ERROR,
              important=True,
              error_code="ig-unknown-connection-error",
              info={"python_error": str(fatal_err)},
          )
          self.mqtt.disconnect()
          background_task.create(self.fetch_user_and_reconnect(sleep_first=300))
      else:
          # The listener returned normally; stay quiet only if a shutdown was requested.
          if not self.shutdown:
              await self.send_bridge_notice(
                  "Instagram connection closed without error",
                  state_event=BridgeStateEvent.UNKNOWN_ERROR,
                  error_code="ig-disconnected",
              )
      finally:
          self._listen_task = None
          self._is_connected = False
          self._track_metric(METRIC_CONNECTED, False)
  async def stop_listen(self) -> None:
      """Disconnect MQTT, wait for the listen task to finish and persist the user.

      ``shutdown`` is set while disconnecting and reset afterwards. The
      connected metric and flag are cleared even when there is no MQTT client.
      """
      mqtt = self.mqtt
      if mqtt:
          self.shutdown = True
          mqtt.disconnect()
          pending_listener = self._listen_task
          if pending_listener:
              await pending_listener
          self.shutdown = False
      self._track_metric(METRIC_CONNECTED, False)
      self._is_connected = False
      await self.update()
  def stop_backfill_tasks(self) -> None:
      """Cancel and forget the backfill loop task and the thread sync task, if set."""
      for task_attr in ("_backfill_loop_task", "_thread_sync_task"):
          running = getattr(self, task_attr)
          if not running:
              continue
          running.cancel()
          setattr(self, task_attr, None)
  async def logout(self, error: IGNotLoggedInError | None = None) -> None:
      """Stop all activity for this user and clear the stored session.

      :param error: The auth error that forced the logout, if any. Without it
          the logout is also sent to Instagram, the puppet's custom mxid is
          removed if it is a real user, and ``igpk`` is cleared. With it, a
          ``BAD_CREDENTIALS`` notice is sent and ``igpk`` is kept.
      """
      await self.stop_listen()
      self.stop_backfill_tasks()

      forced = error is not None
      if not forced and self.client:
          try:
              await self.client.logout(one_tap_app_login=False)
          except Exception:
              # Best effort: the local session is cleared below either way.
              self.log.debug("Exception logging out", exc_info=True)
      if self.mqtt:
          self.mqtt.disconnect()
      self._track_metric(METRIC_CONNECTED, False)
      self._track_metric(METRIC_LOGGED_IN, False)

      if forced:
          self.log.debug("Auth error body: %s", error.body.serialize())
          await self.send_bridge_notice(
              f"You have been logged out of Instagram: {error.proper_message}",
              important=True,
              state_event=BridgeStateEvent.BAD_CREDENTIALS,
              error_code="ig-auth-error",
              error_message=error.proper_message,
              info={"cnd_action": "reauth"},
          )
      else:
          await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
          own_puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
          if own_puppet and own_puppet.is_real_user:
              await own_puppet.switch_mxid(None, None)
          try:
              del self.by_igpk[self.igpk]
          except KeyError:
              pass
          self.igpk = None

      self.client = None
      self.mqtt = None
      self.state = None
      self.seq_id = None
      save_task = self._seq_id_save_task
      if save_task and not save_task.done():
          save_task.cancel()
          self._seq_id_save_task = None
      self.snapshot_at_ms = None
      self.thread_sync_completed = False
      self._is_logged_in = False
      await self.update()
  978. # endregion
  979. # region Event handlers
  async def _save_seq_id_after_sleep(self) -> None:
      """Wait 120 seconds, then persist the current sequence ID.

      Does nothing after the wait if ``seq_id`` has been cleared in the
      meantime. Errors while saving are logged, not raised.
      """
      await asyncio.sleep(120)
      if self.seq_id is not None:
          # Clear the task reference before saving.
          self._seq_id_save_task = None
          self.log.trace("Saving sequence ID %d/%d", self.seq_id, self.snapshot_at_ms)
          try:
              await self.save_seq_id()
          except Exception:
              self.log.exception("Error saving sequence ID")
  async def update_seq_id(self, evt: NewSequenceID) -> None:
      """Record the new sequence ID in memory and make sure a delayed save is pending.

      A new save task is only started when none is currently running.
      """
      self.seq_id = evt.seq_id
      self.snapshot_at_ms = evt.snapshot_at_ms

      pending_save = self._seq_id_save_task
      if pending_save and not pending_save.done():
          self.log.trace("Not starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
          return
      self.log.trace("Starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
      self._seq_id_save_task = asyncio.create_task(self._save_seq_id_after_sleep())
  @async_time(METRIC_MESSAGE)
  async def handle_message(self, evt: MessageSyncEvent) -> None:
      """Dispatch a message sync event to the portal of its thread.

      If the thread has no portal room yet, the thread is fetched and synced
      first; the event is dropped when there is still no room afterwards.
      Reactions, thread image changes, adds, removes and replaces are each
      forwarded to the matching portal handler.
      """
      message = evt.message
      portal = await po.Portal.get_by_thread_id(message.thread_id, receiver=self.igpk)
      if not portal or not portal.mxid:
          self.log.debug(
              "Got message in thread with no portal, getting info and syncing thread..."
          )
          resp = await self.client.get_thread(message.thread_id)
          portal = await po.Portal.get_by_thread(resp.thread, self.igpk)
          await self._sync_thread(resp.thread, enqueue_backfill=False, portal=portal)
          if not portal.mxid:
              self.log.warning(
                  "Room creation appears to have failed, "
                  f"dropping message in {message.thread_id}"
              )
              return

      self.log.trace(f"Received message sync event {message}")

      if message.new_reaction:
          is_removal = message.op == Operation.REMOVE
          await portal.handle_instagram_reaction(message, remove=is_removal)
          return

      if message.user_id:
          sender = await pu.Puppet.get_by_pk(message.user_id)
      else:
          sender = None

      if message.is_thread_image:
          await portal.update_thread_image(self, message.thread_image, sender=sender)
          return
      if message.op == Operation.ADD:
          # I don't think we care about adds with no sender
          if sender:
              await portal.handle_instagram_item(self, sender, message)
          return
      if message.op == Operation.REMOVE:
          # Removes don't have a sender, only the message sender can unsend messages anyway
          await portal.handle_instagram_remove(message.item_id)
          return
      if message.op == Operation.REPLACE:
          await portal.handle_instagram_update(message)
  @async_time(METRIC_THREAD_SYNC)
  async def handle_thread_sync(self, evt: ThreadSyncEvent) -> None:
      """Sync a thread from a thread sync event.

      Threads that already have a portal room, and group threads without one,
      are synced. DM threads without a room are ignored.
      """
      self.log.trace("Thread sync event content: %s", evt)
      portal = await po.Portal.get_by_thread(evt, receiver=self.igpk)

      if not portal.mxid and not evt.is_group:
          self.log.debug(
              "Got thread sync event for DM %s without existing portal, ignoring",
              portal.thread_id,
          )
          return

      if portal.mxid:
          self.log.debug("Got thread sync event for %s with existing portal", portal.thread_id)
      else:
          self.log.debug(
              "Got thread sync event for group %s without existing portal, creating room",
              portal.thread_id,
          )
      await self._sync_thread(evt, enqueue_backfill=False, portal=portal)
  async def handle_thread_remove(self, evt: ThreadRemoveEvent) -> None:
      """Log a thread remove event; nothing else is done with it."""
      serialized = evt.serialize()
      self.log.debug("Got thread remove event: %s", serialized)
  @async_time(METRIC_RTD)
  async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
      """Bridge an activity indicator event as a typing notification.

      Events whose value is not ``ActivityIndicatorData``, that are already
      expired, or whose indicator ID was already handled are ignored. Expired
      indicator IDs are pruned from ``_activity_indicator_ids`` so the dict
      does not grow without bound.
      """
      if not isinstance(evt.value, ActivityIndicatorData):
          return

      now = int(time.time() * 1000)
      expiry = evt.value.timestamp_ms + evt.value.ttl
      if expiry < now:
          return

      seen = self._activity_indicator_ids
      if evt.activity_indicator_id in seen:
          return
      # Forget indicators that have expired. The key list is built first
      # because the dict cannot change size while it is being iterated.
      expired_ids = [key for key, expires_at in seen.items() if expires_at < now]
      for key in expired_ids:
          del seen[key]
      seen[evt.activity_indicator_id] = expiry

      puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
      portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
      if not puppet or not portal or not portal.mxid:
          return

      is_typing = evt.value.activity_status != TypingStatus.OFF
      if puppet.pk == self.igpk:
          self.remote_typing_status = TypingStatus.TEXT if is_typing else TypingStatus.OFF
      # NOTE(review): the ttl is sent as the timeout even when the status is OFF;
      # confirm that set_typing treats this as "stopped typing" in that case.
      await puppet.intent_for(portal).set_typing(portal.mxid, timeout=evt.value.ttl)
  1074. # endregion
  1075. # region Database getters
  def _add_to_cache(self) -> None:
      """Register this user in ``by_mxid`` and, when ``igpk`` is set, in ``by_igpk``."""
      self.by_mxid[self.mxid] = self
      igpk = self.igpk
      if not igpk:
          return
      self.by_igpk[igpk] = self
  @classmethod
  @async_getter_lock
  async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
      """Get the user with the given Matrix ID from the cache or the database.

      :param mxid: The Matrix user ID to look up.
      :param create: Insert a new user when none is found.
      :return: The user, or ``None`` if the ID belongs to a ghost or nothing
          was found and ``create`` is false.
      """
      # Never allow ghosts to be users
      if pu.Puppet.get_id_from_mxid(mxid):
          return None

      try:
          return cls.by_mxid[mxid]
      except KeyError:
          pass

      user = cast(cls, await super().get_by_mxid(mxid))
      if user is None:
          if not create:
              return None
          user = cls(mxid)
          await user.insert()
      user._add_to_cache()
      return user
  @classmethod
  @async_getter_lock
  async def get_by_igpk(cls, igpk: int) -> User | None:
      """Get the user with the given Instagram primary key from the cache or the database.

      :return: The user, or ``None`` if there is no such user.
      """
      try:
          return cls.by_igpk[igpk]
      except KeyError:
          pass

      user = cast(cls, await super().get_by_igpk(igpk))
      if user is None:
          return None
      user._add_to_cache()
      return user
  @classmethod
  async def all_logged_in(cls) -> AsyncGenerator[User, None]:
      """Yield every logged-in user, preferring instances that are already cached.

      Users that are not in ``by_mxid`` yet are added to the cache before
      being yielded.
      """
      users = await super().all_logged_in()
      for db_user in users:
          # Keep the yield outside the try block so that a KeyError thrown into
          # the generator is not mistaken for a cache miss.
          try:
              user = cls.by_mxid[db_user.mxid]
          except KeyError:
              db_user._add_to_cache()
              user = db_user
          yield user
  1122. # endregion