user.py 30 KB


  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2022 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, cast
  18. import asyncio
  19. import logging
  20. import time
  21. from mauigpapi import AndroidAPI, AndroidMQTT, AndroidState
  22. from mauigpapi.errors import (
  23. IGChallengeError,
  24. IGCheckpointError,
  25. IGConsentRequiredError,
  26. IGNotLoggedInError,
  27. IGRateLimitError,
  28. IGUserIDNotFoundError,
  29. IrisSubscribeError,
  30. MQTTNotConnected,
  31. MQTTNotLoggedIn,
  32. )
  33. from mauigpapi.mqtt import (
  34. Connect,
  35. Disconnect,
  36. GraphQLSubscription,
  37. NewSequenceID,
  38. SkywalkerSubscription,
  39. )
  40. from mauigpapi.types import (
  41. ActivityIndicatorData,
  42. CurrentUser,
  43. MessageSyncEvent,
  44. Operation,
  45. RealtimeDirectEvent,
  46. Thread,
  47. ThreadSyncEvent,
  48. TypingStatus,
  49. )
  50. from mautrix.appservice import AppService
  51. from mautrix.bridge import BaseUser, async_getter_lock
  52. from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID
  53. from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
  54. from mautrix.util.logging import TraceLogger
  55. from mautrix.util.opt_prometheus import Gauge, Summary, async_time
  56. from . import portal as po, puppet as pu
  57. from .config import Config
  58. from .db import Portal as DBPortal, User as DBUser
  59. if TYPE_CHECKING:
  60. from .__main__ import InstagramBridge
  61. METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
  62. METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync")
  63. METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
  64. METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
  65. METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")
  66. BridgeState.human_readable_errors.update(
  67. {
  68. "ig-connection-error": "Instagram disconnected unexpectedly",
  69. "ig-refresh-connection-error": "Reconnecting failed again after refresh: {message}",
  70. "ig-connection-fatal-error": "Instagram disconnected unexpectedly",
  71. "ig-auth-error": "Authentication error from Instagram: {message}",
  72. "ig-checkpoint": "Instagram checkpoint error. Please check the Instagram website.",
  73. "ig-consent-required": "Instagram requires a consent update. Please check the Instagram website.",
  74. "ig-checkpoint-locked": "Instagram checkpoint error. Please check the Instagram website.",
  75. "ig-rate-limit": "Got Instagram ratelimit error, waiting a few minutes before retrying...",
  76. "ig-disconnected": None,
  77. "ig-no-mqtt": "You're not connected to Instagram",
  78. "logged-out": "You're not logged into Instagram",
  79. }
  80. )
class User(DBUser, BaseUser):
    """A Matrix user of the bridge together with their Instagram session.

    Combines the database model (``DBUser``) with the generic bridge user
    behaviour (``BaseUser``) and holds the Instagram HTTP client and MQTT
    connection used for this account.
    """

    # Base logger; the per-user HTTP and MQTT loggers are children of it.
    ig_base_log: TraceLogger = logging.getLogger("mau.instagram")
    # Activity indicator ID -> expiry timestamp in milliseconds. Defined on the
    # class, so the same dict is shared by every User instance.
    _activity_indicator_ids: dict[str, int] = {}
    # In-memory caches of loaded users, keyed by Matrix ID and Instagram user pk.
    by_mxid: dict[UserID, User] = {}
    by_igpk: dict[int, User] = {}

    # Class-wide references assigned in init_cls().
    config: Config
    az: AppService
    loop: asyncio.AbstractEventLoop

    # Instagram HTTP API client and MQTT connection; None while not logged in.
    client: AndroidAPI | None
    mqtt: AndroidMQTT | None
    # Task running _listen(), and the pending delayed sequence ID save task.
    _listen_task: asyncio.Task | None = None
    _seq_id_save_task: asyncio.Task | None

    permission_level: str
    username: str | None

    # Locks guarding notice room creation and the ordering of sent notices.
    _notice_room_lock: asyncio.Lock
    _notice_send_lock: asyncio.Lock

    _is_logged_in: bool
    _is_connected: bool
    # Set while stop_listen() waits for the listener, so that a clean exit of
    # the listener is not reported as an error.
    shutdown: bool
    remote_typing_status: TypingStatus | None
  101. def __init__(
  102. self,
  103. mxid: UserID,
  104. igpk: int | None = None,
  105. state: AndroidState | None = None,
  106. notice_room: RoomID | None = None,
  107. seq_id: int | None = None,
  108. snapshot_at_ms: int | None = None,
  109. ) -> None:
  110. super().__init__(
  111. mxid=mxid,
  112. igpk=igpk,
  113. state=state,
  114. notice_room=notice_room,
  115. seq_id=seq_id,
  116. snapshot_at_ms=snapshot_at_ms,
  117. )
  118. BaseUser.__init__(self)
  119. self._notice_room_lock = asyncio.Lock()
  120. self._notice_send_lock = asyncio.Lock()
  121. perms = self.config.get_permissions(mxid)
  122. self.relay_whitelisted, self.is_whitelisted, self.is_admin, self.permission_level = perms
  123. self.client = None
  124. self.mqtt = None
  125. self.username = None
  126. self._is_logged_in = False
  127. self._is_connected = False
  128. self._is_refreshing = False
  129. self.shutdown = False
  130. self._listen_task = None
  131. self.remote_typing_status = None
  132. self._seq_id_save_task = None
  133. @classmethod
  134. def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]:
  135. cls.bridge = bridge
  136. cls.config = bridge.config
  137. cls.az = bridge.az
  138. cls.loop = bridge.loop
  139. return (user.try_connect() async for user in cls.all_logged_in())
  140. # region Connection management
  141. async def is_logged_in(self) -> bool:
  142. return bool(self.client) and self._is_logged_in
  143. async def get_puppet(self) -> pu.Puppet | None:
  144. if not self.igpk:
  145. return None
  146. return await pu.Puppet.get_by_pk(self.igpk)
  147. async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None:
  148. if not self.igpk:
  149. return None
  150. portal = await po.Portal.find_private_chat(self.igpk, puppet.pk)
  151. if portal:
  152. return portal
  153. if create:
  154. # TODO add error handling somewhere
  155. thread = await self.client.create_group_thread([puppet.pk])
  156. portal = await po.Portal.get_by_thread(thread, self.igpk)
  157. await portal.update_info(thread, self)
  158. return portal
  159. return None
  160. async def try_connect(self) -> None:
  161. try:
  162. await self.connect()
  163. except Exception as e:
  164. self.log.exception("Error while connecting to Instagram")
  165. await self.push_bridge_state(
  166. BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
  167. )
  168. @property
  169. def api_log(self) -> TraceLogger:
  170. return self.ig_base_log.getChild("http").getChild(self.mxid)
  171. @property
  172. def is_connected(self) -> bool:
  173. return bool(self.client) and bool(self.mqtt) and self._is_connected
    async def connect(self, user: CurrentUser | None = None) -> None:
        """Set up the API client and MQTT connection from the stored state.

        :param user: Current user info, if the caller already has it. When
            omitted, it is fetched with ``current_user()``.
        :raises IGCheckpointError: Re-raised after logging the error body.
        """
        if not self.state:
            # No stored login state: report bad credentials and do nothing else.
            await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error="logged-out")
            return
        client = AndroidAPI(self.state, log=self.api_log)

        if not user:
            try:
                resp = await client.current_user()
                user = resp.user
            except IGNotLoggedInError as e:
                self.log.warning(f"Failed to connect to Instagram: {e}, logging out")
                await self.logout(error=e)
                return
            except IGCheckpointError as e:
                self.log.debug("Checkpoint error content: %s", e.body)
                raise
            except (IGChallengeError, IGConsentRequiredError) as e:
                # self.client is not assigned yet, so the local client is passed
                # for the challenge handling.
                await self._handle_checkpoint(e, on="connect", client=client)
                return
        # The login is valid: store the client and the account details.
        self.client = client
        self._is_logged_in = True
        self.igpk = user.pk
        self.username = user.username
        await self.push_bridge_state(BridgeStateEvent.CONNECTING)
        self._track_metric(METRIC_LOGGED_IN, True)
        self.by_igpk[self.igpk] = self

        # Create the MQTT client and register the event handlers.
        self.mqtt = AndroidMQTT(
            self.state, loop=self.loop, log=self.ig_base_log.getChild("mqtt").getChild(self.mxid)
        )
        self.mqtt.add_event_handler(Connect, self.on_connect)
        self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
        self.mqtt.add_event_handler(NewSequenceID, self.update_seq_id)
        self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
        self.mqtt.add_event_handler(ThreadSyncEvent, self.handle_thread_sync)
        self.mqtt.add_event_handler(RealtimeDirectEvent, self.handle_rtd)

        await self.update()

        # The puppet sync and the inbox sync run as background tasks.
        self.loop.create_task(self._try_sync_puppet(user))
        if not self.seq_id or self.config["bridge.resync_on_startup"]:
            # sync() starts the listener itself once it has the sequence ID.
            self.loop.create_task(self._try_sync())
        else:
            self.log.debug("Connecting to MQTT directly as resync_on_startup is false")
            self.start_listen()
  216. async def on_connect(self, evt: Connect) -> None:
  217. self.log.debug("Connected to Instagram")
  218. self._track_metric(METRIC_CONNECTED, True)
  219. self._is_connected = True
  220. await self.send_bridge_notice("Connected to Instagram")
  221. await self.push_bridge_state(BridgeStateEvent.CONNECTED)
  222. async def on_disconnect(self, evt: Disconnect) -> None:
  223. self.log.debug("Disconnected from Instagram")
  224. self._track_metric(METRIC_CONNECTED, False)
  225. self._is_connected = False
  226. # TODO this stuff could probably be moved to mautrix-python
  227. async def get_notice_room(self) -> RoomID:
  228. if not self.notice_room:
  229. async with self._notice_room_lock:
  230. # If someone already created the room while this call was waiting,
  231. # don't make a new room
  232. if self.notice_room:
  233. return self.notice_room
  234. creation_content = {}
  235. if not self.config["bridge.federate_rooms"]:
  236. creation_content["m.federate"] = False
  237. self.notice_room = await self.az.intent.create_room(
  238. is_direct=True,
  239. invitees=[self.mxid],
  240. topic="Instagram bridge notices",
  241. creation_content=creation_content,
  242. )
  243. await self.update()
  244. return self.notice_room
  245. async def fill_bridge_state(self, state: BridgeState) -> None:
  246. await super().fill_bridge_state(state)
  247. if not state.remote_id:
  248. if self.igpk:
  249. state.remote_id = str(self.igpk)
  250. else:
  251. try:
  252. state.remote_id = self.state.user_id
  253. except IGUserIDNotFoundError:
  254. state.remote_id = None
  255. if self.username:
  256. state.remote_name = f"@{self.username}"
  257. async def get_bridge_states(self) -> list[BridgeState]:
  258. if not self.state:
  259. return []
  260. state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
  261. if self.is_connected:
  262. state.state_event = BridgeStateEvent.CONNECTED
  263. elif self._is_refreshing or self.mqtt:
  264. state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
  265. return [state]
  266. async def send_bridge_notice(
  267. self,
  268. text: str,
  269. edit: EventID | None = None,
  270. state_event: BridgeStateEvent | None = None,
  271. important: bool = False,
  272. error_code: str | None = None,
  273. error_message: str | None = None,
  274. info: dict | None = None,
  275. ) -> EventID | None:
  276. if state_event:
  277. await self.push_bridge_state(
  278. state_event,
  279. error=error_code,
  280. message=error_message if error_code else text,
  281. info=info,
  282. )
  283. if self.config["bridge.disable_bridge_notices"]:
  284. return None
  285. if not important and not self.config["bridge.unimportant_bridge_notices"]:
  286. self.log.debug("Not sending unimportant bridge notice: %s", text)
  287. return None
  288. event_id = None
  289. try:
  290. self.log.debug("Sending bridge notice: %s", text)
  291. content = TextMessageEventContent(
  292. body=text, msgtype=(MessageType.TEXT if important else MessageType.NOTICE)
  293. )
  294. if edit:
  295. content.set_edit(edit)
  296. # This is locked to prevent notices going out in the wrong order
  297. async with self._notice_send_lock:
  298. event_id = await self.az.intent.send_message(await self.get_notice_room(), content)
  299. except Exception:
  300. self.log.warning("Failed to send bridge notice", exc_info=True)
  301. return edit or event_id
  302. async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
  303. puppet = await pu.Puppet.get_by_pk(self.igpk)
  304. try:
  305. await puppet.update_info(user_info, self)
  306. except Exception:
  307. self.log.exception("Failed to update own puppet info")
  308. try:
  309. if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
  310. self.log.info(f"Automatically enabling custom puppet")
  311. await puppet.switch_mxid(access_token="auto", mxid=self.mxid)
  312. except Exception:
  313. self.log.exception("Failed to automatically enable custom puppet")
  314. async def _try_sync(self) -> None:
  315. try:
  316. await self.sync()
  317. except Exception as e:
  318. self.log.exception("Exception while syncing")
  319. if isinstance(e, IGCheckpointError):
  320. self.log.debug("Checkpoint error content: %s", e.body)
  321. await self.push_bridge_state(
  322. BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
  323. )
  324. async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
  325. return {
  326. pu.Puppet.get_mxid_from_id(portal.other_user_pk): [portal.mxid]
  327. for portal in await DBPortal.find_private_chats_of(self.igpk)
  328. if portal.mxid
  329. }
  330. async def refresh(self, resync: bool = True) -> None:
  331. self._is_refreshing = True
  332. try:
  333. await self.stop_listen()
  334. if resync:
  335. retry_count = 0
  336. minutes = 1
  337. while True:
  338. try:
  339. await self.sync()
  340. return
  341. except Exception as e:
  342. if retry_count >= 4 and minutes < 10:
  343. minutes += 1
  344. retry_count += 1
  345. s = "s" if minutes != 1 else ""
  346. self.log.exception(
  347. f"Error while syncing for refresh, retrying in {minutes} minute{s}"
  348. )
  349. if isinstance(e, IGCheckpointError):
  350. self.log.debug("Checkpoint error content: %s", e.body)
  351. await self.push_bridge_state(
  352. BridgeStateEvent.UNKNOWN_ERROR,
  353. error="unknown-error",
  354. message="An unknown error occurred while connecting to Instagram",
  355. info={"python_error": str(e)},
  356. )
  357. await asyncio.sleep(minutes * 60)
  358. else:
  359. self.start_listen()
  360. finally:
  361. self._is_refreshing = False
  362. async def _handle_checkpoint(
  363. self,
  364. e: IGChallengeError | IGConsentRequiredError,
  365. on: str,
  366. client: AndroidAPI | None = None,
  367. ) -> None:
  368. self.log.warning(f"Got checkpoint error on {on}: {e.body.serialize()}")
  369. client = client or self.client
  370. self.client = None
  371. self.mqtt = None
  372. if isinstance(e, IGConsentRequiredError):
  373. await self.push_bridge_state(
  374. BridgeStateEvent.BAD_CREDENTIALS,
  375. error="ig-consent-required",
  376. info=e.body.serialize(),
  377. )
  378. return
  379. error_code = "ig-checkpoint"
  380. try:
  381. resp = await client.challenge_reset()
  382. info = {
  383. "challenge_context": (
  384. resp.challenge_context.serialize() if resp.challenge_context_str else None
  385. ),
  386. "step_name": resp.step_name,
  387. "step_data": resp.step_data.serialize() if resp.step_data else None,
  388. "user_id": resp.user_id,
  389. "action": resp.action,
  390. "status": resp.status,
  391. "challenge": e.body.challenge.serialize() if e.body.challenge else None,
  392. }
  393. self.log.debug(f"Challenge state: {resp.serialize()}")
  394. if resp.challenge_context.challenge_type_enum == "HACKED_LOCK":
  395. error_code = "ig-checkpoint-locked"
  396. except Exception:
  397. self.log.exception("Error resetting challenge state")
  398. info = {"challenge": e.body.challenge.serialize() if e.body.challenge else None}
  399. await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error=error_code, info=info)
  400. async def _sync_thread(self, thread: Thread, allow_create: bool) -> None:
  401. portal = await po.Portal.get_by_thread(thread, self.igpk)
  402. if portal.mxid:
  403. self.log.debug(f"{thread.thread_id} has a portal, syncing and backfilling...")
  404. await portal.update_matrix_room(self, thread, backfill=True)
  405. elif allow_create:
  406. self.log.debug(f"{thread.thread_id} has been active recently, creating portal...")
  407. await portal.create_matrix_room(self, thread)
  408. else:
  409. self.log.debug(f"{thread.thread_id} is not active and doesn't have a portal")
    async def sync(self) -> None:
        """Fetch the inbox, start the listener if needed and sync threads.

        :raises IGCheckpointError: Re-raised after logging the error body.
        """
        sleep_minutes = 2
        # Fetch the first inbox page; only rate limit errors are retried.
        while True:
            try:
                resp = await self.client.get_inbox()
                break
            except IGNotLoggedInError as e:
                self.log.exception("Got not logged in error while syncing")
                await self.logout(error=e)
                return
            except IGRateLimitError as e:
                self.log.error(
                    "Got ratelimit error while trying to get inbox (%s), retrying in %d minutes",
                    e.body,
                    sleep_minutes,
                )
                await self.push_bridge_state(
                    BridgeStateEvent.TRANSIENT_DISCONNECT, error="ig-rate-limit"
                )
                await asyncio.sleep(sleep_minutes * 60)
                # Wait two minutes longer after every rate limit error.
                sleep_minutes += 2
            except IGCheckpointError as e:
                self.log.debug("Checkpoint error content: %s", e.body)
                raise
            except (IGChallengeError, IGConsentRequiredError) as e:
                await self._handle_checkpoint(e, on="sync")
                return

        # Store the sequence ID from the inbox response before listening.
        self.seq_id = resp.seq_id
        self.snapshot_at_ms = resp.snapshot_at_ms
        await self.save_seq_id()

        if not self._listen_task:
            self.start_listen(is_after_sync=True)

        # NOTE(review): the * 1_000_000 factors suggest last_activity_at is in
        # microseconds and portal_create_max_age in seconds - confirm.
        max_age = self.config["bridge.portal_create_max_age"] * 1_000_000
        limit = self.config["bridge.chat_sync_limit"]
        create_limit = self.config["bridge.chat_create_limit"]
        min_active_at = (time.time() * 1_000_000) - max_age
        i = 0
        await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
        async for thread in self.client.iter_inbox(start_at=resp):
            try:
                # Rooms are only created for recently active threads among the
                # first create_limit threads.
                await self._sync_thread(
                    thread=thread,
                    allow_create=thread.last_activity_at > min_active_at and i < create_limit,
                )
            except Exception:
                self.log.exception(f"Error syncing thread {thread.thread_id}")
            # Failed threads count towards the limit too.
            i += 1
            if i >= limit:
                break
        try:
            await self.update_direct_chats()
        except Exception:
            self.log.exception("Error updating direct chat list")
  463. def start_listen(self, is_after_sync: bool = False) -> None:
  464. self.shutdown = False
  465. task = self._listen(
  466. seq_id=self.seq_id, snapshot_at_ms=self.snapshot_at_ms, is_after_sync=is_after_sync
  467. )
  468. self._listen_task = self.loop.create_task(task)
    async def _listen(self, seq_id: int, snapshot_at_ms: int, is_after_sync: bool) -> None:
        """Run the MQTT listener until it exits and report how it ended.

        :param seq_id: Sequence ID passed to ``mqtt.listen()``.
        :param snapshot_at_ms: Snapshot timestamp passed to ``mqtt.listen()``.
        :param is_after_sync: Whether the listener was started right after a
            sync. An IrisSubscribeError is then reported instead of triggering
            another refresh.
        """
        try:
            await self.mqtt.listen(
                graphql_subs={
                    GraphQLSubscription.app_presence(),
                    GraphQLSubscription.direct_typing(self.state.user_id),
                    GraphQLSubscription.direct_status(),
                },
                skywalker_subs={
                    SkywalkerSubscription.direct_sub(self.state.user_id),
                    SkywalkerSubscription.live_sub(self.state.user_id),
                },
                seq_id=seq_id,
                snapshot_at_ms=snapshot_at_ms,
            )
        except IrisSubscribeError as e:
            if is_after_sync:
                # A sync just ran, so another refresh is not attempted.
                self.log.exception("Got IrisSubscribeError right after refresh")
                await self.send_bridge_notice(
                    f"Reconnecting failed again after refresh: {e}",
                    important=True,
                    state_event=BridgeStateEvent.UNKNOWN_ERROR,
                    error_code="ig-refresh-connection-error",
                    error_message=str(e),
                    info={"python_error": str(e)},
                )
            else:
                self.log.warning(f"Got IrisSubscribeError {e}, refreshing...")
                # The refresh runs as a separate task; this coroutine continues
                # to the finally block below.
                asyncio.create_task(self.refresh())
        except (MQTTNotConnected, MQTTNotLoggedIn) as e:
            await self.send_bridge_notice(
                f"Error in listener: {e}",
                important=True,
                state_event=BridgeStateEvent.UNKNOWN_ERROR,
                error_code="ig-connection-error",
            )
            self.mqtt.disconnect()
        except Exception as e:
            self.log.exception("Fatal error in listener")
            await self.send_bridge_notice(
                "Fatal error in listener (see logs for more info)",
                state_event=BridgeStateEvent.UNKNOWN_ERROR,
                important=True,
                error_code="ig-unknown-connection-error",
                info={"python_error": str(e)},
            )
            self.mqtt.disconnect()
        else:
            # listen() returned without raising. This is only reported when
            # stop_listen() did not set the shutdown flag.
            if not self.shutdown:
                await self.send_bridge_notice(
                    "Instagram connection closed without error",
                    state_event=BridgeStateEvent.UNKNOWN_ERROR,
                    error_code="ig-disconnected",
                )
        finally:
            # However the listener ended, it is no longer running.
            self._listen_task = None
            self._is_connected = False
            self._track_metric(METRIC_CONNECTED, False)
  527. async def stop_listen(self) -> None:
  528. if self.mqtt:
  529. self.shutdown = True
  530. self.mqtt.disconnect()
  531. if self._listen_task:
  532. await self._listen_task
  533. self.shutdown = False
  534. self._track_metric(METRIC_CONNECTED, False)
  535. self._is_connected = False
  536. await self.update()
  537. async def logout(self, error: IGNotLoggedInError | None = None) -> None:
  538. if self.client and error is None:
  539. try:
  540. await self.client.logout(one_tap_app_login=False)
  541. except Exception:
  542. self.log.debug("Exception logging out", exc_info=True)
  543. if self.mqtt:
  544. self.mqtt.disconnect()
  545. self._track_metric(METRIC_CONNECTED, False)
  546. self._track_metric(METRIC_LOGGED_IN, False)
  547. if error is None:
  548. await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
  549. puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
  550. if puppet and puppet.is_real_user:
  551. await puppet.switch_mxid(None, None)
  552. try:
  553. del self.by_igpk[self.igpk]
  554. except KeyError:
  555. pass
  556. self.igpk = None
  557. else:
  558. self.log.debug("Auth error body: %s", error.body.serialize())
  559. await self.send_bridge_notice(
  560. f"You have been logged out of Instagram: {error.proper_message}",
  561. important=True,
  562. state_event=BridgeStateEvent.BAD_CREDENTIALS,
  563. error_code="ig-auth-error",
  564. error_message=error.proper_message,
  565. )
  566. self.client = None
  567. self.mqtt = None
  568. self.state = None
  569. self.seq_id = None
  570. self.snapshot_at_ms = None
  571. self._is_logged_in = False
  572. await self.update()
  573. # endregion
  574. # region Event handlers
  575. async def _save_seq_id_after_sleep(self) -> None:
  576. await asyncio.sleep(120)
  577. self._seq_id_save_task = None
  578. self.log.trace("Saving sequence ID %d/%d", self.seq_id, self.snapshot_at_ms)
  579. try:
  580. await self.save_seq_id()
  581. except Exception:
  582. self.log.exception("Error saving sequence ID")
  583. async def update_seq_id(self, evt: NewSequenceID) -> None:
  584. self.seq_id = evt.seq_id
  585. self.snapshot_at_ms = evt.snapshot_at_ms
  586. if not self._seq_id_save_task or self._seq_id_save_task.done():
  587. self.log.trace("Starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
  588. self._seq_id_save_task = asyncio.create_task(self._save_seq_id_after_sleep())
  589. else:
  590. self.log.trace("Not starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
  591. @async_time(METRIC_MESSAGE)
  592. async def handle_message(self, evt: MessageSyncEvent) -> None:
  593. portal = await po.Portal.get_by_thread_id(evt.message.thread_id, receiver=self.igpk)
  594. if not portal or not portal.mxid:
  595. self.log.debug("Got message in thread with no portal, getting info...")
  596. resp = await self.client.get_thread(evt.message.thread_id)
  597. portal = await po.Portal.get_by_thread(resp.thread, self.igpk)
  598. self.log.debug("Got info for unknown portal, creating room")
  599. await portal.create_matrix_room(self, resp.thread)
  600. if not portal.mxid:
  601. self.log.warning(
  602. "Room creation appears to have failed, "
  603. f"dropping message in {evt.message.thread_id}"
  604. )
  605. return
  606. self.log.trace(f"Received message sync event {evt.message}")
  607. sender = await pu.Puppet.get_by_pk(evt.message.user_id) if evt.message.user_id else None
  608. if evt.message.op == Operation.ADD:
  609. if not sender:
  610. # I don't think we care about adds with no sender
  611. return
  612. await portal.handle_instagram_item(self, sender, evt.message)
  613. elif evt.message.op == Operation.REMOVE:
  614. # Removes don't have a sender, only the message sender can unsend messages anyway
  615. await portal.handle_instagram_remove(evt.message.item_id)
  616. elif evt.message.op == Operation.REPLACE:
  617. await portal.handle_instagram_update(evt.message)
  618. @async_time(METRIC_THREAD_SYNC)
  619. async def handle_thread_sync(self, evt: ThreadSyncEvent) -> None:
  620. self.log.trace("Thread sync event content: %s", evt)
  621. portal = await po.Portal.get_by_thread(evt, receiver=self.igpk)
  622. if portal.mxid:
  623. self.log.debug("Got thread sync event for %s with existing portal", portal.thread_id)
  624. await portal.update_matrix_room(self, evt)
  625. elif evt.is_group:
  626. self.log.debug(
  627. "Got thread sync event for group %s without existing portal, creating room",
  628. portal.thread_id,
  629. )
  630. await portal.create_matrix_room(self, evt)
  631. else:
  632. self.log.debug(
  633. "Got thread sync event for DM %s without existing portal, ignoring",
  634. portal.thread_id,
  635. )
  636. @async_time(METRIC_RTD)
  637. async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
  638. if not isinstance(evt.value, ActivityIndicatorData):
  639. return
  640. now = int(time.time() * 1000)
  641. date = evt.value.timestamp_ms
  642. expiry = date + evt.value.ttl
  643. if expiry < now:
  644. return
  645. if evt.activity_indicator_id in self._activity_indicator_ids:
  646. return
  647. # TODO clear expired items from this dict
  648. self._activity_indicator_ids[evt.activity_indicator_id] = expiry
  649. puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
  650. portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
  651. if not puppet or not portal or not portal.mxid:
  652. return
  653. is_typing = evt.value.activity_status != TypingStatus.OFF
  654. if puppet.pk == self.igpk:
  655. self.remote_typing_status = TypingStatus.TEXT if is_typing else TypingStatus.OFF
  656. await puppet.intent_for(portal).set_typing(
  657. portal.mxid, is_typing=is_typing, timeout=evt.value.ttl
  658. )
  659. # endregion
  660. # region Database getters
  661. def _add_to_cache(self) -> None:
  662. self.by_mxid[self.mxid] = self
  663. if self.igpk:
  664. self.by_igpk[self.igpk] = self
  665. @classmethod
  666. @async_getter_lock
  667. async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
  668. # Never allow ghosts to be users
  669. if pu.Puppet.get_id_from_mxid(mxid):
  670. return None
  671. try:
  672. return cls.by_mxid[mxid]
  673. except KeyError:
  674. pass
  675. user = cast(cls, await super().get_by_mxid(mxid))
  676. if user is not None:
  677. user._add_to_cache()
  678. return user
  679. if create:
  680. user = cls(mxid)
  681. await user.insert()
  682. user._add_to_cache()
  683. return user
  684. return None
  685. @classmethod
  686. @async_getter_lock
  687. async def get_by_igpk(cls, igpk: int) -> User | None:
  688. try:
  689. return cls.by_igpk[igpk]
  690. except KeyError:
  691. pass
  692. user = cast(cls, await super().get_by_igpk(igpk))
  693. if user is not None:
  694. user._add_to_cache()
  695. return user
  696. return None
  697. @classmethod
  698. async def all_logged_in(cls) -> AsyncGenerator[User, None]:
  699. users = await super().all_logged_in()
  700. user: cls
  701. for index, user in enumerate(users):
  702. try:
  703. yield cls.by_mxid[user.mxid]
  704. except KeyError:
  705. user._add_to_cache()
  706. yield user
  707. # endregion