user.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2022 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, cast
  18. import asyncio
  19. import logging
  20. import time
  21. from mauigpapi import AndroidAPI, AndroidMQTT, AndroidState
  22. from mauigpapi.errors import (
  23. IGCheckpointError,
  24. IGConsentRequiredError,
  25. IGNotLoggedInError,
  26. IGRateLimitError,
  27. IGUserIDNotFoundError,
  28. IrisSubscribeError,
  29. MQTTNotConnected,
  30. MQTTNotLoggedIn,
  31. )
  32. from mauigpapi.mqtt import Connect, Disconnect, GraphQLSubscription, SkywalkerSubscription
  33. from mauigpapi.types import (
  34. ActivityIndicatorData,
  35. CurrentUser,
  36. MessageSyncEvent,
  37. Operation,
  38. RealtimeDirectEvent,
  39. Thread,
  40. ThreadSyncEvent,
  41. TypingStatus,
  42. )
  43. from mautrix.appservice import AppService
  44. from mautrix.bridge import BaseUser, async_getter_lock
  45. from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID
  46. from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
  47. from mautrix.util.logging import TraceLogger
  48. from mautrix.util.opt_prometheus import Gauge, Summary, async_time
  49. from . import portal as po, puppet as pu
  50. from .config import Config
  51. from .db import Portal as DBPortal, User as DBUser
  52. if TYPE_CHECKING:
  53. from .__main__ import InstagramBridge
  54. METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
  55. METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync")
  56. METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
  57. METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
  58. METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")
  59. BridgeState.human_readable_errors.update(
  60. {
  61. "ig-connection-error": "Instagram disconnected unexpectedly",
  62. "ig-auth-error": "Authentication error from Instagram: {message}",
  63. "ig-checkpoint": "Instagram checkpoint error. Please check the Instagram website.",
  64. "ig-consent-required": "Instagram requires a consent update. Please check the Instagram website.",
  65. "ig-checkpoint-locked": "Instagram checkpoint error. Please check the Instagram website.",
  66. "ig-rate-limit": "Got Instagram ratelimit error, waiting a few minutes before retrying...",
  67. "ig-disconnected": None,
  68. "ig-no-mqtt": "You're not connected to Instagram",
  69. "logged-out": "You're not logged into Instagram",
  70. }
  71. )
  72. class User(DBUser, BaseUser):
  73. ig_base_log: TraceLogger = logging.getLogger("mau.instagram")
  74. _activity_indicator_ids: dict[str, int] = {}
  75. by_mxid: dict[UserID, User] = {}
  76. by_igpk: dict[int, User] = {}
  77. config: Config
  78. az: AppService
  79. loop: asyncio.AbstractEventLoop
  80. client: AndroidAPI | None
  81. mqtt: AndroidMQTT | None
  82. _listen_task: asyncio.Task | None = None
  83. permission_level: str
  84. username: str | None
  85. _notice_room_lock: asyncio.Lock
  86. _notice_send_lock: asyncio.Lock
  87. _is_logged_in: bool
  88. _is_connected: bool
  89. shutdown: bool
  90. remote_typing_status: TypingStatus | None
  91. def __init__(
  92. self,
  93. mxid: UserID,
  94. igpk: int | None = None,
  95. state: AndroidState | None = None,
  96. notice_room: RoomID | None = None,
  97. ) -> None:
  98. super().__init__(mxid=mxid, igpk=igpk, state=state, notice_room=notice_room)
  99. BaseUser.__init__(self)
  100. self._notice_room_lock = asyncio.Lock()
  101. self._notice_send_lock = asyncio.Lock()
  102. perms = self.config.get_permissions(mxid)
  103. self.relay_whitelisted, self.is_whitelisted, self.is_admin, self.permission_level = perms
  104. self.client = None
  105. self.mqtt = None
  106. self.username = None
  107. self._is_logged_in = False
  108. self._is_connected = False
  109. self._is_refreshing = False
  110. self.shutdown = False
  111. self._listen_task = None
  112. self.remote_typing_status = None
  113. @classmethod
  114. def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]:
  115. cls.bridge = bridge
  116. cls.config = bridge.config
  117. cls.az = bridge.az
  118. cls.loop = bridge.loop
  119. return (user.try_connect() async for user in cls.all_logged_in())
  120. # region Connection management
  121. async def is_logged_in(self) -> bool:
  122. return bool(self.client) and self._is_logged_in
  123. async def get_puppet(self) -> pu.Puppet | None:
  124. if not self.igpk:
  125. return None
  126. return await pu.Puppet.get_by_pk(self.igpk)
  127. async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None:
  128. if not self.igpk:
  129. return None
  130. portal = await po.Portal.find_private_chat(self.igpk, puppet.pk)
  131. if portal:
  132. return portal
  133. if create:
  134. # TODO add error handling somewhere
  135. thread = await self.client.create_group_thread([puppet.pk])
  136. portal = await po.Portal.get_by_thread(thread, self.igpk)
  137. await portal.update_info(thread, self)
  138. return portal
  139. return None
  140. async def try_connect(self) -> None:
  141. try:
  142. await self.connect()
  143. except Exception as e:
  144. self.log.exception("Error while connecting to Instagram")
  145. await self.push_bridge_state(
  146. BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
  147. )
  148. @property
  149. def api_log(self) -> TraceLogger:
  150. return self.ig_base_log.getChild("http").getChild(self.mxid)
  151. @property
  152. def is_connected(self) -> bool:
  153. return bool(self.client) and bool(self.mqtt) and self._is_connected
  154. async def connect(self, user: CurrentUser | None = None) -> None:
  155. if not self.state:
  156. await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error="logged-out")
  157. return
  158. client = AndroidAPI(self.state, log=self.api_log)
  159. if not user:
  160. try:
  161. resp = await client.current_user()
  162. user = resp.user
  163. except IGNotLoggedInError as e:
  164. self.log.warning(f"Failed to connect to Instagram: {e}, logging out")
  165. await self.logout(error=e)
  166. return
  167. except (IGCheckpointError, IGConsentRequiredError) as e:
  168. await self._handle_checkpoint(e, on="connect", client=client)
  169. return
  170. self.client = client
  171. self._is_logged_in = True
  172. self.igpk = user.pk
  173. self.username = user.username
  174. await self.push_bridge_state(BridgeStateEvent.CONNECTING)
  175. self._track_metric(METRIC_LOGGED_IN, True)
  176. self.by_igpk[self.igpk] = self
  177. self.mqtt = AndroidMQTT(
  178. self.state, loop=self.loop, log=self.ig_base_log.getChild("mqtt").getChild(self.mxid)
  179. )
  180. self.mqtt.add_event_handler(Connect, self.on_connect)
  181. self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
  182. self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
  183. self.mqtt.add_event_handler(ThreadSyncEvent, self.handle_thread_sync)
  184. self.mqtt.add_event_handler(RealtimeDirectEvent, self.handle_rtd)
  185. await self.update()
  186. self.loop.create_task(self._try_sync_puppet(user))
  187. self.loop.create_task(self._try_sync())
  188. async def on_connect(self, evt: Connect) -> None:
  189. self.log.debug("Connected to Instagram")
  190. self._track_metric(METRIC_CONNECTED, True)
  191. self._is_connected = True
  192. await self.send_bridge_notice("Connected to Instagram")
  193. await self.push_bridge_state(BridgeStateEvent.CONNECTED)
  194. async def on_disconnect(self, evt: Disconnect) -> None:
  195. self.log.debug("Disconnected from Instagram")
  196. self._track_metric(METRIC_CONNECTED, False)
  197. self._is_connected = False
  198. # TODO this stuff could probably be moved to mautrix-python
  199. async def get_notice_room(self) -> RoomID:
  200. if not self.notice_room:
  201. async with self._notice_room_lock:
  202. # If someone already created the room while this call was waiting,
  203. # don't make a new room
  204. if self.notice_room:
  205. return self.notice_room
  206. creation_content = {}
  207. if not self.config["bridge.federate_rooms"]:
  208. creation_content["m.federate"] = False
  209. self.notice_room = await self.az.intent.create_room(
  210. is_direct=True,
  211. invitees=[self.mxid],
  212. topic="Instagram bridge notices",
  213. creation_content=creation_content,
  214. )
  215. await self.update()
  216. return self.notice_room
  217. async def fill_bridge_state(self, state: BridgeState) -> None:
  218. await super().fill_bridge_state(state)
  219. if not state.remote_id:
  220. if self.igpk:
  221. state.remote_id = str(self.igpk)
  222. else:
  223. try:
  224. state.remote_id = self.state.user_id
  225. except IGUserIDNotFoundError:
  226. state.remote_id = None
  227. if self.username:
  228. state.remote_name = f"@{self.username}"
  229. async def get_bridge_states(self) -> list[BridgeState]:
  230. if not self.state:
  231. return []
  232. state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
  233. if self.is_connected:
  234. state.state_event = BridgeStateEvent.CONNECTED
  235. elif self._is_refreshing or self.mqtt:
  236. state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
  237. return [state]
  238. async def send_bridge_notice(
  239. self,
  240. text: str,
  241. edit: EventID | None = None,
  242. state_event: BridgeStateEvent | None = None,
  243. important: bool = False,
  244. error_code: str | None = None,
  245. error_message: str | None = None,
  246. info: dict | None = None,
  247. ) -> EventID | None:
  248. if state_event:
  249. await self.push_bridge_state(
  250. state_event,
  251. error=error_code,
  252. message=error_message if error_code else text,
  253. info=info,
  254. )
  255. if self.config["bridge.disable_bridge_notices"]:
  256. return None
  257. if not important and not self.config["bridge.unimportant_bridge_notices"]:
  258. self.log.debug("Not sending unimportant bridge notice: %s", text)
  259. return None
  260. event_id = None
  261. try:
  262. self.log.debug("Sending bridge notice: %s", text)
  263. content = TextMessageEventContent(
  264. body=text, msgtype=(MessageType.TEXT if important else MessageType.NOTICE)
  265. )
  266. if edit:
  267. content.set_edit(edit)
  268. # This is locked to prevent notices going out in the wrong order
  269. async with self._notice_send_lock:
  270. event_id = await self.az.intent.send_message(await self.get_notice_room(), content)
  271. except Exception:
  272. self.log.warning("Failed to send bridge notice", exc_info=True)
  273. return edit or event_id
  274. async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
  275. puppet = await pu.Puppet.get_by_pk(self.igpk)
  276. try:
  277. await puppet.update_info(user_info, self)
  278. except Exception:
  279. self.log.exception("Failed to update own puppet info")
  280. try:
  281. if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
  282. self.log.info(f"Automatically enabling custom puppet")
  283. await puppet.switch_mxid(access_token="auto", mxid=self.mxid)
  284. except Exception:
  285. self.log.exception("Failed to automatically enable custom puppet")
  286. async def _try_sync(self) -> None:
  287. try:
  288. await self.sync()
  289. except Exception as e:
  290. self.log.exception("Exception while syncing")
  291. await self.push_bridge_state(
  292. BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
  293. )
  294. async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
  295. return {
  296. pu.Puppet.get_mxid_from_id(portal.other_user_pk): [portal.mxid]
  297. for portal in await DBPortal.find_private_chats_of(self.igpk)
  298. if portal.mxid
  299. }
  300. async def refresh(self, resync: bool = True) -> None:
  301. self._is_refreshing = True
  302. try:
  303. await self.stop_listen()
  304. if resync:
  305. retry_count = 0
  306. minutes = 1
  307. while True:
  308. try:
  309. await self.sync()
  310. return
  311. except Exception as e:
  312. if retry_count >= 4 and minutes < 10:
  313. minutes += 1
  314. retry_count += 1
  315. s = "s" if minutes != 1 else ""
  316. self.log.exception(
  317. f"Error while syncing for refresh, retrying in {minutes} minute{s}"
  318. )
  319. await self.push_bridge_state(
  320. BridgeStateEvent.UNKNOWN_ERROR,
  321. error="unknown-error",
  322. message="An unknown error occurred while connecting to Instagram",
  323. info={"python_error": str(e)},
  324. )
  325. await asyncio.sleep(minutes * 60)
  326. else:
  327. await self.start_listen()
  328. finally:
  329. self._is_refreshing = False
  330. async def _handle_checkpoint(
  331. self,
  332. e: IGCheckpointError | IGConsentRequiredError,
  333. on: str,
  334. client: AndroidAPI | None = None,
  335. ) -> None:
  336. self.log.warning(f"Got checkpoint error on {on}: {e.body.serialize()}")
  337. client = client or self.client
  338. self.client = None
  339. self.mqtt = None
  340. if isinstance(e, IGConsentRequiredError):
  341. await self.push_bridge_state(
  342. BridgeStateEvent.BAD_CREDENTIALS,
  343. error="ig-consent-required",
  344. info=e.body.serialize(),
  345. )
  346. return
  347. error_code = "ig-checkpoint"
  348. try:
  349. resp = await client.challenge_reset()
  350. info = {
  351. "challenge_context": (
  352. resp.challenge_context.serialize() if resp.challenge_context_str else None
  353. ),
  354. "step_name": resp.step_name,
  355. "step_data": resp.step_data.serialize() if resp.step_data else None,
  356. "user_id": resp.user_id,
  357. "action": resp.action,
  358. "status": resp.status,
  359. "challenge": e.body.challenge.serialize() if e.body.challenge else None,
  360. }
  361. self.log.debug(f"Challenge state: {resp.serialize()}")
  362. if resp.challenge_context.challenge_type_enum == "HACKED_LOCK":
  363. error_code = "ig-checkpoint-locked"
  364. except Exception:
  365. self.log.exception("Error resetting challenge state")
  366. info = {"challenge": e.body.challenge.serialize() if e.body.challenge else None}
  367. await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error=error_code, info=info)
  368. async def _sync_thread(self, thread: Thread, min_active_at: int) -> None:
  369. portal = await po.Portal.get_by_thread(thread, self.igpk)
  370. if portal.mxid:
  371. self.log.debug(f"{thread.thread_id} has a portal, syncing and backfilling...")
  372. await portal.update_matrix_room(self, thread, backfill=True)
  373. elif thread.last_activity_at > min_active_at:
  374. self.log.debug(f"{thread.thread_id} has been active recently, creating portal...")
  375. await portal.create_matrix_room(self, thread)
  376. else:
  377. self.log.debug(f"{thread.thread_id} is not active and doesn't have a portal")
  378. async def sync(self) -> None:
  379. sleep_minutes = 2
  380. while True:
  381. try:
  382. resp = await self.client.get_inbox()
  383. break
  384. except IGNotLoggedInError as e:
  385. self.log.exception("Got not logged in error while syncing")
  386. await self.logout(error=e)
  387. return
  388. except IGRateLimitError as e:
  389. self.log.error(
  390. "Got ratelimit error while trying to get inbox (%s), retrying in %d minutes",
  391. e.body,
  392. sleep_minutes,
  393. )
  394. await self.push_bridge_state(
  395. BridgeStateEvent.TRANSIENT_DISCONNECT, error="ig-rate-limit"
  396. )
  397. await asyncio.sleep(sleep_minutes * 60)
  398. sleep_minutes += 2
  399. except IGCheckpointError as e:
  400. await self._handle_checkpoint(e, on="sync")
  401. return
  402. if not self._listen_task:
  403. await self.start_listen(resp.seq_id, resp.snapshot_at_ms)
  404. max_age = self.config["bridge.portal_create_max_age"] * 1_000_000
  405. limit = self.config["bridge.chat_sync_limit"]
  406. min_active_at = (time.time() * 1_000_000) - max_age
  407. i = 0
  408. await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
  409. async for thread in self.client.iter_inbox(start_at=resp):
  410. try:
  411. await self._sync_thread(thread, min_active_at)
  412. except Exception:
  413. self.log.exception(f"Error syncing thread {thread.thread_id}")
  414. i += 1
  415. if i >= limit:
  416. break
  417. try:
  418. await self.update_direct_chats()
  419. except Exception:
  420. self.log.exception("Error updating direct chat list")
  421. async def start_listen(
  422. self, seq_id: int | None = None, snapshot_at_ms: int | None = None
  423. ) -> None:
  424. self.shutdown = False
  425. if not seq_id:
  426. resp = await self.client.get_inbox(limit=1)
  427. seq_id, snapshot_at_ms = resp.seq_id, resp.snapshot_at_ms
  428. task = self.listen(seq_id=seq_id, snapshot_at_ms=snapshot_at_ms)
  429. self._listen_task = self.loop.create_task(task)
  430. async def listen(self, seq_id: int, snapshot_at_ms: int) -> None:
  431. try:
  432. await self.mqtt.listen(
  433. graphql_subs={
  434. GraphQLSubscription.app_presence(),
  435. GraphQLSubscription.direct_typing(self.state.user_id),
  436. GraphQLSubscription.direct_status(),
  437. },
  438. skywalker_subs={
  439. SkywalkerSubscription.direct_sub(self.state.user_id),
  440. SkywalkerSubscription.live_sub(self.state.user_id),
  441. },
  442. seq_id=seq_id,
  443. snapshot_at_ms=snapshot_at_ms,
  444. )
  445. except IrisSubscribeError as e:
  446. self.log.warning(f"Got IrisSubscribeError {e}, refreshing...")
  447. await self.refresh()
  448. except (MQTTNotConnected, MQTTNotLoggedIn) as e:
  449. await self.send_bridge_notice(
  450. f"Error in listener: {e}",
  451. important=True,
  452. state_event=BridgeStateEvent.UNKNOWN_ERROR,
  453. error_code="ig-connection-error",
  454. )
  455. self.mqtt.disconnect()
  456. except Exception as e:
  457. self.log.exception("Fatal error in listener")
  458. await self.send_bridge_notice(
  459. "Fatal error in listener (see logs for more info)",
  460. state_event=BridgeStateEvent.UNKNOWN_ERROR,
  461. important=True,
  462. error_code="ig-connection-error",
  463. info={"python_error": str(e)},
  464. )
  465. self.mqtt.disconnect()
  466. else:
  467. if not self.shutdown:
  468. await self.send_bridge_notice(
  469. "Instagram connection closed without error",
  470. state_event=BridgeStateEvent.UNKNOWN_ERROR,
  471. error_code="ig-disconnected",
  472. )
  473. finally:
  474. self._listen_task = None
  475. self._is_connected = False
  476. self._track_metric(METRIC_CONNECTED, False)
  477. async def stop_listen(self) -> None:
  478. if self.mqtt:
  479. self.shutdown = True
  480. self.mqtt.disconnect()
  481. if self._listen_task:
  482. await self._listen_task
  483. self.shutdown = False
  484. self._track_metric(METRIC_CONNECTED, False)
  485. self._is_connected = False
  486. await self.update()
  487. async def logout(self, error: IGNotLoggedInError | None = None) -> None:
  488. if self.client and error is None:
  489. try:
  490. await self.client.logout(one_tap_app_login=False)
  491. except Exception:
  492. self.log.debug("Exception logging out", exc_info=True)
  493. if self.mqtt:
  494. self.mqtt.disconnect()
  495. self._track_metric(METRIC_CONNECTED, False)
  496. self._track_metric(METRIC_LOGGED_IN, False)
  497. if error is None:
  498. await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
  499. puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
  500. if puppet and puppet.is_real_user:
  501. await puppet.switch_mxid(None, None)
  502. try:
  503. del self.by_igpk[self.igpk]
  504. except KeyError:
  505. pass
  506. self.igpk = None
  507. else:
  508. self.log.debug("Auth error body: %s", error.body.serialize())
  509. await self.send_bridge_notice(
  510. f"You have been logged out of Instagram: {error.proper_message}",
  511. important=True,
  512. state_event=BridgeStateEvent.BAD_CREDENTIALS,
  513. error_code="ig-auth-error",
  514. error_message=error.proper_message,
  515. )
  516. self.client = None
  517. self.mqtt = None
  518. self.state = None
  519. self._is_logged_in = False
  520. await self.update()
  521. # endregion
  522. # region Event handlers
  523. @async_time(METRIC_MESSAGE)
  524. async def handle_message(self, evt: MessageSyncEvent) -> None:
  525. portal = await po.Portal.get_by_thread_id(evt.message.thread_id, receiver=self.igpk)
  526. if not portal or not portal.mxid:
  527. self.log.debug("Got message in thread with no portal, getting info...")
  528. resp = await self.client.get_thread(evt.message.thread_id)
  529. portal = await po.Portal.get_by_thread(resp.thread, self.igpk)
  530. self.log.debug("Got info for unknown portal, creating room")
  531. await portal.create_matrix_room(self, resp.thread)
  532. if not portal.mxid:
  533. self.log.warning(
  534. "Room creation appears to have failed, "
  535. f"dropping message in {evt.message.thread_id}"
  536. )
  537. return
  538. self.log.trace(f"Received message sync event {evt.message}")
  539. sender = await pu.Puppet.get_by_pk(evt.message.user_id) if evt.message.user_id else None
  540. if evt.message.op == Operation.ADD:
  541. if not sender:
  542. # I don't think we care about adds with no sender
  543. return
  544. await portal.handle_instagram_item(self, sender, evt.message)
  545. elif evt.message.op == Operation.REMOVE:
  546. # Removes don't have a sender, only the message sender can unsend messages anyway
  547. await portal.handle_instagram_remove(evt.message.item_id)
  548. elif evt.message.op == Operation.REPLACE:
  549. await portal.handle_instagram_update(evt.message)
  550. @async_time(METRIC_THREAD_SYNC)
  551. async def handle_thread_sync(self, evt: ThreadSyncEvent) -> None:
  552. self.log.trace("Received thread sync event %s", evt)
  553. portal = await po.Portal.get_by_thread(evt, receiver=self.igpk)
  554. await portal.create_matrix_room(self, evt)
  555. @async_time(METRIC_RTD)
  556. async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
  557. if not isinstance(evt.value, ActivityIndicatorData):
  558. return
  559. now = int(time.time() * 1000)
  560. date = evt.value.timestamp_ms
  561. expiry = date + evt.value.ttl
  562. if expiry < now:
  563. return
  564. if evt.activity_indicator_id in self._activity_indicator_ids:
  565. return
  566. # TODO clear expired items from this dict
  567. self._activity_indicator_ids[evt.activity_indicator_id] = expiry
  568. puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
  569. portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
  570. if not puppet or not portal or not portal.mxid:
  571. return
  572. is_typing = evt.value.activity_status != TypingStatus.OFF
  573. if puppet.pk == self.igpk:
  574. self.remote_typing_status = TypingStatus.TEXT if is_typing else TypingStatus.OFF
  575. await puppet.intent_for(portal).set_typing(
  576. portal.mxid, is_typing=is_typing, timeout=evt.value.ttl
  577. )
  578. # endregion
  579. # region Database getters
  580. def _add_to_cache(self) -> None:
  581. self.by_mxid[self.mxid] = self
  582. if self.igpk:
  583. self.by_igpk[self.igpk] = self
  584. @classmethod
  585. @async_getter_lock
  586. async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
  587. # Never allow ghosts to be users
  588. if pu.Puppet.get_id_from_mxid(mxid):
  589. return None
  590. try:
  591. return cls.by_mxid[mxid]
  592. except KeyError:
  593. pass
  594. user = cast(cls, await super().get_by_mxid(mxid))
  595. if user is not None:
  596. user._add_to_cache()
  597. return user
  598. if create:
  599. user = cls(mxid)
  600. await user.insert()
  601. user._add_to_cache()
  602. return user
  603. return None
  604. @classmethod
  605. @async_getter_lock
  606. async def get_by_igpk(cls, igpk: int) -> User | None:
  607. try:
  608. return cls.by_igpk[igpk]
  609. except KeyError:
  610. pass
  611. user = cast(cls, await super().get_by_igpk(igpk))
  612. if user is not None:
  613. user._add_to_cache()
  614. return user
  615. return None
  616. @classmethod
  617. async def all_logged_in(cls) -> AsyncGenerator[User, None]:
  618. users = await super().all_logged_in()
  619. user: cls
  620. for index, user in enumerate(users):
  621. try:
  622. yield cls.by_mxid[user.mxid]
  623. except KeyError:
  624. user._add_to_cache()
  625. yield user
  626. # endregion