user.py 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2022 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, Callable, cast
  18. from datetime import datetime, timedelta
  19. from functools import partial
  20. import asyncio
  21. import logging
  22. import time
  23. from aiohttp import ClientConnectionError
  24. from mauigpapi import AndroidAPI, AndroidMQTT, AndroidState, ProxyHandler
  25. from mauigpapi.errors import (
  26. IGChallengeError,
  27. IGCheckpointError,
  28. IGConsentRequiredError,
  29. IGNotLoggedInError,
  30. IGRateLimitError,
  31. IGUserIDNotFoundError,
  32. IrisSubscribeError,
  33. MQTTConnectionUnauthorized,
  34. MQTTNotConnected,
  35. MQTTNotLoggedIn,
  36. )
  37. from mauigpapi.mqtt import (
  38. Connect,
  39. Disconnect,
  40. GraphQLSubscription,
  41. NewSequenceID,
  42. ProxyUpdate,
  43. SkywalkerSubscription,
  44. )
  45. from mauigpapi.types import (
  46. ActivityIndicatorData,
  47. CurrentUser,
  48. MessageSyncEvent,
  49. Operation,
  50. RealtimeDirectEvent,
  51. Thread,
  52. ThreadSyncEvent,
  53. TypingStatus,
  54. )
  55. from mauigpapi.types.direct_inbox import DMInbox, DMInboxResponse
  56. from mautrix.appservice import AppService
  57. from mautrix.bridge import BaseUser, async_getter_lock
  58. from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID
  59. from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
  60. from mautrix.util.logging import TraceLogger
  61. from mautrix.util.opt_prometheus import Gauge, Summary, async_time
  62. from mautrix.util.simple_lock import SimpleLock
  63. from . import portal as po, puppet as pu
  64. from .config import Config
  65. from .db import Backfill, Message as DBMessage, Portal as DBPortal, User as DBUser
  66. if TYPE_CHECKING:
  67. from .__main__ import InstagramBridge
  68. try:
  69. from aiohttp_socks import ProxyConnectionError, ProxyError, ProxyTimeoutError
  70. except ImportError:
  71. class ProxyError(Exception):
  72. pass
  73. ProxyConnectionError = ProxyTimeoutError = ProxyError
  # Prometheus metrics for the Instagram event handlers and per-user connection state.
  METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
  METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync")
  METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
  METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
  METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")

  # Register user-facing texts for the error codes this bridge reports in bridge states.
  # Entries keep their original insertion order; "{message}" is a format placeholder.
  BridgeState.human_readable_errors.update(
      {
          # MQTT / connection failures
          "ig-connection-error": "Instagram disconnected unexpectedly",
          "ig-refresh-connection-error": "Reconnecting failed again after refresh: {message}",
          "ig-connection-fatal-error": "Instagram disconnected unexpectedly",
          # Authentication, checkpoints and consent
          "ig-auth-error": "Authentication error from Instagram: {message}",
          "ig-checkpoint": "Instagram checkpoint error. Please check the Instagram website.",
          "ig-consent-required": "Instagram requires a consent update. Please check the Instagram website.",
          "ig-checkpoint-locked": "Instagram checkpoint error. Please check the Instagram website.",
          # Throttling
          "ig-rate-limit": "Got Instagram ratelimit error, waiting a few minutes before retrying...",
          # Disconnected / logged-out states
          "ig-disconnected": None,
          "ig-no-mqtt": "You're not connected to Instagram",
          "logged-out": "You're not logged into Instagram",
      }
  )
  94. class User(DBUser, BaseUser):
  95. ig_base_log: TraceLogger = logging.getLogger("mau.instagram")
  96. _activity_indicator_ids: dict[str, int] = {}
  97. by_mxid: dict[UserID, User] = {}
  98. by_igpk: dict[int, User] = {}
  99. config: Config
  100. az: AppService
  101. loop: asyncio.AbstractEventLoop
  102. client: AndroidAPI | None
  103. mqtt: AndroidMQTT | None
  104. _listen_task: asyncio.Task | None = None
  105. _sync_lock: SimpleLock
  106. _backfill_loop_task: asyncio.Task | None
  107. _thread_sync_task: asyncio.Task | None
  108. _seq_id_save_task: asyncio.Task | None
  109. permission_level: str
  110. username: str | None
  111. _notice_room_lock: asyncio.Lock
  112. _notice_send_lock: asyncio.Lock
  113. _is_logged_in: bool
  114. _is_connected: bool
  115. shutdown: bool
  116. remote_typing_status: TypingStatus | None
  def __init__(
      self,
      mxid: UserID,
      igpk: int | None = None,
      state: AndroidState | None = None,
      notice_room: RoomID | None = None,
      seq_id: int | None = None,
      snapshot_at_ms: int | None = None,
      oldest_cursor: str | None = None,
      total_backfilled_portals: int | None = None,
      thread_sync_completed: bool = False,
  ) -> None:
      """Build a bridge user from its stored fields.

      No network activity happens here: the API and MQTT clients start out as ``None`` and are
      created by ``connect``. Permission flags are resolved from the config for ``mxid``.
      """
      super().__init__(
          mxid=mxid,
          igpk=igpk,
          state=state,
          notice_room=notice_room,
          seq_id=seq_id,
          snapshot_at_ms=snapshot_at_ms,
          oldest_cursor=oldest_cursor,
          total_backfilled_portals=total_backfilled_portals,
          thread_sync_completed=thread_sync_completed,
      )
      BaseUser.__init__(self)

      # Locks guarding notice room creation and the ordering of sent notices.
      self._notice_room_lock = asyncio.Lock()
      self._notice_send_lock = asyncio.Lock()

      (
          self.relay_whitelisted,
          self.is_whitelisted,
          self.is_admin,
          self.permission_level,
      ) = self.config.get_permissions(mxid)

      # Connection state; set later by connect() and the connection handlers.
      self.client = None
      self.mqtt = None
      self.username = None
      self._is_logged_in = False
      self._is_connected = False
      self._is_refreshing = False
      self.shutdown = False

      # Held while threads are being synced; other work waits on it before proceeding.
      self._sync_lock = SimpleLock(
          "Waiting for thread sync to finish before handling %s", log=self.log
      )

      # Background tasks: none are running yet.
      self._listen_task = None
      self._thread_sync_task = None
      self._backfill_loop_task = None
      self._seq_id_save_task = None
      self.remote_typing_status = None

      self.proxy_handler = ProxyHandler(
          api_url=self.config["bridge.get_proxy_api_url"],
      )
  @classmethod
  def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]:
      """Bind bridge-wide resources to the class and prepare connections for stored users.

      Returns an async generator that yields one ``try_connect()`` coroutine for every user
      produced by ``all_logged_in()``; the caller drives and awaits them.
      """
      cls.bridge = bridge
      cls.config, cls.az, cls.loop = bridge.config, bridge.az, bridge.loop
      return (user.try_connect() async for user in cls.all_logged_in())
  # region Connection management

  async def is_logged_in(self) -> bool:
      """Return whether an API client exists and its login has been confirmed."""
      if not self.client:
          return False
      return self._is_logged_in
  async def get_puppet(self) -> pu.Puppet | None:
      """Return the puppet of this user's own Instagram account, or None if the pk is unknown."""
      return await pu.Puppet.get_by_pk(self.igpk) if self.igpk else None
  async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None:
      """Find the private chat portal between this user and ``puppet``.

      If none exists and ``create`` is true, a new Instagram thread with the puppet is created
      and its portal info is updated. Returns None when this user's pk is unknown, or when no
      portal exists and ``create`` is false.
      """
      if not self.igpk:
          return None
      existing = await po.Portal.find_private_chat(self.igpk, puppet.pk)
      if existing:
          return existing
      if not create:
          return None
      # TODO add error handling somewhere
      thread = await self.client.create_group_thread([puppet.pk])
      new_portal = await po.Portal.get_by_thread(thread, self.igpk)
      await new_portal.update_info(thread, self)
      return new_portal
  async def try_connect(self) -> None:
      """Run ``connect``; any exception is logged and reported as an UNKNOWN_ERROR state."""
      try:
          await self.connect()
      except Exception as err:
          self.log.exception("Error while connecting to Instagram")
          error_info = {"python_error": str(err)}
          await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, info=error_info)
  @property
  def api_log(self) -> TraceLogger:
      """Logger for HTTP API traffic, namespaced under ``http`` and this user's Matrix ID."""
      http_log = self.ig_base_log.getChild("http")
      return http_log.getChild(self.mxid)
  @property
  def is_connected(self) -> bool:
      """Whether both clients exist and MQTT has reported a connection (see ``on_connect``)."""
      if not (self.client and self.mqtt):
          return False
      return self._is_connected
  async def connect(self, user: CurrentUser | None = None) -> None:
      """Connect this user to Instagram using the stored ``AndroidState``.

      Verifies the session (fetching the current user unless ``user`` is given), creates the
      MQTT client and registers its event handlers, schedules an update of the user's own
      puppet and starts the backfill request loop. If no sequence ID is stored yet, an inbox
      sync is run (the sync starts listening once the inbox is fetched). Otherwise MQTT
      listening starts directly. When backfill is enabled, the background thread backfill task
      is (re)started.

      Returns early in three cases:
      - after pushing a BAD_CREDENTIALS state when there is no stored state;
      - after logging out on ``IGNotLoggedInError``;
      - after ``_handle_checkpoint`` for challenge/consent errors.

      ``IGCheckpointError`` is re-raised.
      """
      if not self.state:
          await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error="logged-out")
          return
      client = AndroidAPI(
          self.state,
          log=self.api_log,
          proxy_handler=self.proxy_handler,
      )
      if not user:
          try:
              resp = await client.current_user()
              user = resp.user
          except IGNotLoggedInError as e:
              self.log.warning(f"Failed to connect to Instagram: {e}, logging out")
              await self.logout(error=e)
              return
          except IGCheckpointError as e:
              self.log.debug("Checkpoint error content: %s", e.body)
              raise
          except (IGChallengeError, IGConsentRequiredError) as e:
              await self._handle_checkpoint(e, on="connect", client=client)
              return
      self.client = client
      self._is_logged_in = True
      self.igpk = user.pk
      self.username = user.username
      await self.push_bridge_state(BridgeStateEvent.CONNECTING)
      self._track_metric(METRIC_LOGGED_IN, True)
      self.by_igpk[self.igpk] = self

      self.mqtt = AndroidMQTT(
          self.state,
          loop=self.loop,
          log=self.ig_base_log.getChild("mqtt").getChild(self.mxid),
          proxy_handler=self.proxy_handler,
      )
      self.mqtt.add_event_handler(Connect, self.on_connect)
      self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
      self.mqtt.add_event_handler(NewSequenceID, self.update_seq_id)
      self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
      self.mqtt.add_event_handler(ThreadSyncEvent, self.handle_thread_sync)
      self.mqtt.add_event_handler(RealtimeDirectEvent, self.handle_rtd)
      self.mqtt.add_event_handler(ProxyUpdate, self.on_proxy_update)

      await self.update()

      # Keep a reference to the background task: the event loop only holds weak references to
      # tasks, so an unreferenced fire-and-forget task can be garbage collected mid-execution.
      self._puppet_sync_task = self.loop.create_task(self._try_sync_puppet(user))

      # Backfill requests are handled synchronously so as not to overload the homeserver.
      # Users can configure their backfill stages to be more or less aggressive with backfilling
      # to try and avoid getting banned.
      if not self._backfill_loop_task or self._backfill_loop_task.done():
          self._backfill_loop_task = asyncio.create_task(self._handle_backfill_requests_loop())

      if not self.seq_id:
          await self._try_sync()
      else:
          # The decision is based on whether a sequence ID is already stored, not on config.
          self.log.debug("Connecting to MQTT directly as a sequence ID is already stored")
          self.start_listen()

      if self.config["bridge.backfill.enable"]:
          if self._thread_sync_task and not self._thread_sync_task.done():
              self.log.warning("Cancelling existing background thread sync task")
              self._thread_sync_task.cancel()
          self._thread_sync_task = asyncio.create_task(self.backfill_threads())
  async def _handle_backfill_requests_loop(self) -> None:
      """Process this user's queued backfill requests one at a time, forever.

      Before each request it waits until no thread sync holds the sync lock. When the queue is
      empty it polls again after 30 seconds. A request that fails is logged and put on a
      60-second cooldown.
      """
      while True:
          await self._sync_lock.wait("backfill request")
          pending = await Backfill.get_next(self.mxid)
          if pending:
              self.log.info("Backfill request %s", pending)
              try:
                  target = await po.Portal.get_by_thread_id(
                      pending.portal_thread_id, receiver=pending.portal_receiver
                  )
                  await pending.mark_dispatched()
                  await target.backfill(self, pending)
                  await pending.mark_done()
              except Exception as err:
                  self.log.exception(
                      "Failed to backfill portal %s: %s", pending.portal_thread_id, err
                  )
                  # Don't try again to backfill this portal for a minute.
                  await pending.set_cooldown_timeout(60)
          else:
              await asyncio.sleep(30)
  async def on_connect(self, evt: Connect) -> None:
      """MQTT ``Connect`` handler: mark the user connected, notify them and push CONNECTED."""
      message = "Connected to Instagram"
      self.log.debug(message)
      self._track_metric(METRIC_CONNECTED, True)
      self._is_connected = True
      await self.send_bridge_notice(message)
      await self.push_bridge_state(BridgeStateEvent.CONNECTED)
  async def on_disconnect(self, evt: Disconnect) -> None:
      """MQTT ``Disconnect`` handler: clear the connected flag and the connection metric."""
      connected = False
      self.log.debug("Disconnected from Instagram")
      self._track_metric(METRIC_CONNECTED, connected)
      self._is_connected = connected
  async def on_proxy_update(self, evt: ProxyUpdate | None = None) -> None:
      """Re-run the API client's HTTP setup with the current cookie jar after a proxy change."""
      if not self.client:
          return
      self.client.setup_http(self.state.cookies.jar)
  # TODO this stuff could probably be moved to mautrix-python

  async def get_notice_room(self) -> RoomID:
      """Return this user's bridge notice room, creating and persisting it on first use.

      Creation happens under ``_notice_room_lock`` so concurrent callers end up sharing a
      single room.
      """
      if self.notice_room:
          return self.notice_room
      async with self._notice_room_lock:
          # Another caller may have created the room while this one waited for the lock.
          if self.notice_room:
              return self.notice_room
          creation_content = (
              {} if self.config["bridge.federate_rooms"] else {"m.federate": False}
          )
          self.notice_room = await self.az.intent.create_room(
              is_direct=True,
              invitees=[self.mxid],
              topic="Instagram bridge notices",
              creation_content=creation_content,
          )
          await self.update()
      return self.notice_room
  async def fill_bridge_state(self, state: BridgeState) -> None:
      """Add the Instagram account ID and ``@username`` to an outgoing bridge state.

      An already-set ``remote_id`` is kept. Otherwise the known pk is used, falling back to
      the user ID in the stored state, or None if that is unavailable.
      """
      await super().fill_bridge_state(state)
      if not state.remote_id and self.igpk:
          state.remote_id = str(self.igpk)
      elif not state.remote_id:
          try:
              state.remote_id = self.state.user_id
          except IGUserIDNotFoundError:
              state.remote_id = None
      if self.username:
          state.remote_name = f"@{self.username}"
  async def get_bridge_states(self) -> list[BridgeState]:
      """Report one bridge state derived from the connection status, or none without state.

      States map as follows:
      - connected -> CONNECTED;
      - refreshing or MQTT client present -> TRANSIENT_DISCONNECT;
      - otherwise -> UNKNOWN_ERROR.
      """
      if not self.state:
          return []
      if self.is_connected:
          event = BridgeStateEvent.CONNECTED
      elif self._is_refreshing or self.mqtt:
          event = BridgeStateEvent.TRANSIENT_DISCONNECT
      else:
          event = BridgeStateEvent.UNKNOWN_ERROR
      return [BridgeState(state_event=event)]
  async def send_bridge_notice(
      self,
      text: str,
      edit: EventID | None = None,
      state_event: BridgeStateEvent | None = None,
      important: bool = False,
      error_code: str | None = None,
      error_message: str | None = None,
      info: dict | None = None,
  ) -> EventID | None:
      """Optionally push a bridge state, then send (or edit) a message in the notice room.

      Important notices are sent as ``m.text``, others as ``m.notice``. Unimportant notices are
      skipped unless enabled in the config, and all notices are skipped when disabled.

      Returns:
          None if the notice was not sent because of config. Otherwise ``edit`` when given,
          else the new event ID (None if sending failed; failures are only logged).
      """
      if state_event:
          await self.push_bridge_state(
              state_event,
              error=error_code,
              message=error_message if error_code else text,
              info=info,
          )
      if self.config["bridge.disable_bridge_notices"]:
          return None
      if not (important or self.config["bridge.unimportant_bridge_notices"]):
          self.log.debug("Not sending unimportant bridge notice: %s", text)
          return None
      sent_id = None
      try:
          self.log.debug("Sending bridge notice: %s", text)
          msgtype = MessageType.TEXT if important else MessageType.NOTICE
          content = TextMessageEventContent(body=text, msgtype=msgtype)
          if edit:
              content.set_edit(edit)
          # Serialize sends so notices reach the room in the order they were issued.
          async with self._notice_send_lock:
              room_id = await self.get_notice_room()
              sent_id = await self.az.intent.send_message(room_id, content)
      except Exception:
          self.log.warning("Failed to send bridge notice", exc_info=True)
      return edit or sent_id
  async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
      """Best-effort update of this user's own puppet; every failure is logged, never raised.

      ``connect`` schedules this as a background task without awaiting it, so an exception
      escaping here would go unobserved. That includes a failure to look up the puppet, which
      is therefore guarded too. Also enables the custom (double) puppet automatically when it
      is allowed and not already set to this user.
      """
      try:
          puppet = await pu.Puppet.get_by_pk(self.igpk)
      except Exception:
          self.log.exception("Failed to get own puppet")
          return
      try:
          await puppet.update_info(user_info, self)
      except Exception:
          self.log.exception("Failed to update own puppet info")
      try:
          if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
              self.log.info("Automatically enabling custom puppet")
              await puppet.switch_mxid(access_token="auto", mxid=self.mxid)
      except Exception:
          self.log.exception("Failed to automatically enable custom puppet")
  async def _try_sync(self) -> None:
      """Run ``sync``; any exception is logged and reported as an UNKNOWN_ERROR state."""
      try:
          await self.sync()
      except Exception as err:
          self.log.exception("Exception while syncing")
          if isinstance(err, IGCheckpointError):
              self.log.debug("Checkpoint error content: %s", err.body)
          error_info = {"python_error": str(err)}
          await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, info=error_info)
  async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
      """Map each DM partner's puppet MXID to the Matrix room of the private chat with them.

      Private chats that do not have a Matrix room yet are left out.
      """
      chats = await DBPortal.find_private_chats_of(self.igpk)
      direct: dict[UserID, list[RoomID]] = {}
      for chat in chats:
          if chat.mxid:
              direct[pu.Puppet.get_mxid_from_id(chat.other_user_pk)] = [chat.mxid]
      return direct
  async def refresh(self, resync: bool = True) -> None:
      """Stop listening and reconnect, optionally re-syncing the inbox first.

      With ``resync`` the sync is retried until it succeeds. The first four failures wait one
      minute each; after that the wait grows by a minute per failure, up to ten minutes. Each
      failure is also reported as an UNKNOWN_ERROR bridge state. Without ``resync``, listening
      restarts directly. In all cases the refreshing flag is cleared and the proxy URL updated
      at the end.
      """
      self._is_refreshing = True
      try:
          await self.stop_listen()
          if not resync:
              self.start_listen()
              return
          failures = 0
          delay_minutes = 1
          while True:
              try:
                  await self.sync()
              except Exception as err:
                  if failures >= 4 and delay_minutes < 10:
                      delay_minutes += 1
                  failures += 1
                  plural = "" if delay_minutes == 1 else "s"
                  self.log.exception(
                      f"Error while syncing for refresh, retrying in {delay_minutes} minute{plural}"
                  )
                  if isinstance(err, IGCheckpointError):
                      self.log.debug("Checkpoint error content: %s", err.body)
                  await self.push_bridge_state(
                      BridgeStateEvent.UNKNOWN_ERROR,
                      error="unknown-error",
                      message="An unknown error occurred while connecting to Instagram",
                      info={"python_error": str(err)},
                  )
                  await asyncio.sleep(delay_minutes * 60)
              else:
                  return
      finally:
          self._is_refreshing = False
          self.proxy_handler.update_proxy_url()
  async def _handle_checkpoint(
      self,
      e: IGChallengeError | IGConsentRequiredError,
      on: str,
      client: AndroidAPI | None = None,
  ) -> None:
      """Drop the current clients and report a checkpoint/consent error as BAD_CREDENTIALS.

      Consent errors are reported as ``ig-consent-required`` with the serialized response body.

      For challenges, the challenge state is reset through ``client`` (falling back to the
      current client) so that its details can be included in the bridge state. A
      ``HACKED_LOCK`` challenge is reported as ``ig-checkpoint-locked``, anything else as
      ``ig-checkpoint``. If the reset fails, only the original challenge info is reported.

      Args:
          e: The challenge or consent error.
          on: Name of the operation that hit the error, used in the log message.
          client: The API client to reset the challenge with, if not ``self.client``.
      """
      self.log.warning(f"Got checkpoint error on {on}: {e.body.serialize()}")
      client = client or self.client
      self.client = None
      self.mqtt = None
      if isinstance(e, IGConsentRequiredError):
          await self.push_bridge_state(
              BridgeStateEvent.BAD_CREDENTIALS,
              error="ig-consent-required",
              info=e.body.serialize(),
          )
          return
      error_code = "ig-checkpoint"
      challenge = e.body.challenge.serialize() if e.body.challenge else None
      try:
          resp = await client.challenge_reset()
          # Only touch the challenge context when the response actually carries one, and use
          # that same guarded value for the HACKED_LOCK check. Dereferencing it unconditionally
          # could raise here and throw away all the info gathered below.
          context = resp.challenge_context if resp.challenge_context_str else None
          info = {
              "challenge_context": context.serialize() if context is not None else None,
              "step_name": resp.step_name,
              "step_data": resp.step_data.serialize() if resp.step_data else None,
              "user_id": resp.user_id,
              "action": resp.action,
              "status": resp.status,
              "challenge": challenge,
          }
          self.log.debug(f"Challenge state: {resp.serialize()}")
          if context is not None and context.challenge_type_enum == "HACKED_LOCK":
              error_code = "ig-checkpoint-locked"
      except Exception:
          self.log.exception("Error resetting challenge state")
          info = {"challenge": challenge}
      await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error=error_code, info=info)
  async def _sync_thread(self, thread: Thread) -> bool:
      """
      Sync a specific thread. Returns whether the thread had messages after the last message in
      the database before the sync.

      The Matrix room is created if missing (otherwise updated). Then every message newer than
      the most recent bridged one is forward-backfilled, paging back through older history
      while entire pages are still new. A thread with no bridged messages and no known first
      event ID is skipped.
      """
      self.log.debug(f"Syncing thread {thread.thread_id}")
      # Thread items are handled newest first: the list is reversed before the forward
      # backfill below, and its first element is treated as the newest message.
      forward_messages = thread.items
      assert self.client
      portal = await po.Portal.get_by_thread(thread, self.igpk)
      assert portal

      # Create or update the Matrix room
      if not portal.mxid:
          await portal.create_matrix_room(self, thread)
      else:
          await portal.update_matrix_room(self, thread)

      last_message = await DBMessage.get_last(portal.mxid)
      if last_message:
          original_number_of_messages = len(thread.items)
          new_messages = [
              m for m in thread.items if last_message.ig_timestamp_ms < m.timestamp_ms
          ]
          forward_messages = new_messages
          portal.log.debug(
              f"{len(new_messages)}/{original_number_of_messages} messages are after most recent"
              " message."
          )

          # Fetch more messages until we get back to messages that have been bridged already.
          cursor = thread.prev_cursor
          while len(new_messages) > 0 and len(new_messages) == original_number_of_messages:
              if not cursor:
                  # There is no older page to request. Calling get_thread without a cursor
                  # would presumably return the newest page again and never terminate.
                  break
              await asyncio.sleep(self.config["bridge.backfill.incremental.page_delay"])

              portal.log.debug("Fetching more messages for forward backfill")
              resp = await self.client.get_thread(portal.thread_id, cursor=cursor)
              if len(resp.thread.items) == 0:
                  break
              original_number_of_messages = len(resp.thread.items)
              new_messages = [
                  m for m in resp.thread.items if last_message.ig_timestamp_ms < m.timestamp_ms
              ]
              # The fetched page is older than everything collected so far. Append it so the
              # list stays newest first; prepending would scramble the chronological order
              # once the list is reversed.
              forward_messages = forward_messages + new_messages
              cursor = resp.thread.prev_cursor
              portal.log.debug(
                  f"{len(new_messages)}/{original_number_of_messages} messages are after most "
                  "recent message."
              )
          portal.cursor = cursor
          await portal.update()
      elif not portal.first_event_id:
          self.log.debug(
              f"Skipping backfilling {portal.thread_id} as the first event ID is not known"
          )
          return False

      if forward_messages:
          # Mark as read if Instagram says the thread is read, or if the newest message is
          # older than the configured unread threshold.
          mark_read = thread.read_state == 0 or (
              (hours := self.config["bridge.backfill.unread_hours_threshold"]) > 0
              and (
                  datetime.fromtimestamp(forward_messages[0].timestamp_ms / 1000)
                  < datetime.now() - timedelta(hours=hours)
              )
          )
          base_insertion_event_id = await portal.backfill_message_page(
              self,
              list(reversed(forward_messages)),
              forward=True,
              last_message=last_message,
              mark_read=mark_read,
          )
          if not self.bridge.homeserver_software.is_hungry:
              await portal.send_post_backfill_dummy(
                  forward_messages[0].timestamp, base_insertion_event_id=base_insertion_event_id
              )
          if (
              mark_read
              and not self.bridge.homeserver_software.is_hungry
              and (puppet := await self.get_puppet())
          ):
              last_message = await DBMessage.get_last(portal.mxid)
              if last_message:
                  await puppet.intent_for(portal).mark_read(portal.mxid, last_message.mxid)

      if self.config["bridge.backfill.msc2716"]:
          await portal.enqueue_immediate_backfill(self, 1)
      return len(forward_messages) > 0
  async def _maybe_update_proxy(self, source: str) -> None:
      """Update the proxy URL and HTTP setup, unless a listen task is currently set.

      ``source`` only identifies the caller in the log message.
      """
      if self._listen_task:
          self.log.debug(f"Not updating proxy: listen_task is still running? (caller: {source})")
          return
      self.proxy_handler.update_proxy_url()
      await self.on_proxy_update()
  async def sync(self, increment_total_backfilled_portals: bool = False) -> None:
      """Run ``_sync`` through ``run_with_sync_lock`` (sync lock plus retry handling)."""
      sync_job = partial(self._sync, increment_total_backfilled_portals)
      await self.run_with_sync_lock(sync_job)
  async def _sync(self, increment_total_backfilled_portals: bool = False) -> None:
      """Fetch the inbox, store the sequence ID, start listening and sync recent threads.

      Inbox fetch errors are handled as follows:
      - network/proxy errors: retried after 10 s per failure so far (capped at 60 s), trying a
        proxy update in between;
      - rate limits: waited out for 2 minutes, growing by 2 minutes each time;
      - not-logged-in errors: the user is logged out;
      - challenge/consent errors: passed to ``_handle_checkpoint``;
      - checkpoint errors: re-raised.

      The number of threads synced is the smaller of the two configured limits. Zero means
      none, and a negative value means no limit.
      """
      ratelimit_wait_minutes = 2
      network_failures = 0
      while True:
          try:
              resp = await self.client.get_inbox()
          except (
              ProxyError,
              ProxyTimeoutError,
              ProxyConnectionError,
              ClientConnectionError,
              ConnectionError,
              asyncio.TimeoutError,
          ) as e:
              network_failures += 1
              wait = min(network_failures * 10, 60)
              self.log.warning(
                  f"{e.__class__.__name__} while trying to sync, retrying in {wait} seconds: {e}"
              )
              await asyncio.sleep(wait)
              await self._maybe_update_proxy("sync error")
          except IGNotLoggedInError as e:
              self.log.exception("Got not logged in error while syncing")
              await self.logout(error=e)
              return
          except IGRateLimitError as e:
              self.log.error(
                  "Got ratelimit error while trying to get inbox (%s), retrying in %d minutes",
                  e.body,
                  ratelimit_wait_minutes,
              )
              await self.push_bridge_state(
                  BridgeStateEvent.TRANSIENT_DISCONNECT, error="ig-rate-limit"
              )
              await asyncio.sleep(ratelimit_wait_minutes * 60)
              ratelimit_wait_minutes += 2
          except IGCheckpointError as e:
              self.log.debug("Checkpoint error content: %s", e.body)
              raise
          except (IGChallengeError, IGConsentRequiredError) as e:
              await self._handle_checkpoint(e, on="sync")
              return
          else:
              break

      self.seq_id = resp.seq_id
      self.snapshot_at_ms = resp.snapshot_at_ms
      await self.save_seq_id()

      if not self._listen_task:
          self.start_listen(is_after_sync=True)

      sync_count = min(
          self.config["bridge.backfill.max_conversations"],
          self.config["bridge.max_startup_thread_sync_count"],
      )
      self.log.debug(f"Fetching {sync_count} threads, 20 at a time...")
      if sync_count == 0:
          return
      local_limit: int | None = None if sync_count < 0 else sync_count

      await self._sync_threads_with_delay(
          self.client.iter_inbox(start_at=resp, local_limit=local_limit),
          stop_when_threads_have_no_messages_to_backfill=True,
          increment_total_backfilled_portals=increment_total_backfilled_portals,
          local_limit=local_limit,
      )

      try:
          await self.update_direct_chats()
      except Exception:
          self.log.exception("Error updating direct chat list")
  async def backfill_threads(self):
      """Background entry point for thread backfill; failures are logged instead of raised."""
      job = self._backfill_threads
      try:
          await self.run_with_sync_lock(job)
      except Exception:
          self.log.exception("Error in thread backfill loop")
  async def _backfill_threads(self):
      """Backfill older inbox threads, resuming from the stored oldest cursor if there is one.

      Does nothing if backfill is disabled, if the ``max_conversations`` budget is already used
      up, or if the thread list was previously synced to the end. A negative
      ``max_conversations`` means no limit.
      """
      assert self.client
      if not self.config["bridge.backfill.enable"]:
          return
      max_conversations = self.config["bridge.backfill.max_conversations"] or 0
      already_backfilled = self.total_backfilled_portals or 0
      if 0 <= max_conversations <= already_backfilled:
          self.log.info("Backfill max_conversations count reached, not syncing any more portals")
          return
      if self.thread_sync_completed:
          self.log.debug("Thread backfill is marked as completed, not syncing more portals")
          return

      local_limit = max_conversations - already_backfilled if max_conversations >= 0 else None

      # A synthetic inbox response lets iter_inbox continue from the saved cursor.
      start_at = (
          DMInboxResponse(
              status="",
              seq_id=self.seq_id,
              snapshot_at_ms=0,
              pending_requests_total=0,
              has_pending_top_requests=False,
              viewer=None,
              inbox=DMInbox(
                  threads=[],
                  has_older=True,
                  unseen_count=0,
                  unseen_count_ts=0,
                  blended_inbox_enabled=False,
                  oldest_cursor=self.oldest_cursor,
              ),
          )
          if self.oldest_cursor
          else None
      )

      backoff = self.config.get("bridge.backfill.backoff.thread_list", 300)
      inbox_threads = self.client.iter_inbox(
          start_at,
          local_limit=local_limit,
          rate_limit_exceeded_backoff=backoff,
      )
      await self._sync_threads_with_delay(
          inbox_threads,
          increment_total_backfilled_portals=True,
          local_limit=local_limit,
      )
      await self.update_direct_chats()
  async def _sync_threads_with_delay(
      self,
      threads: AsyncIterable[tuple[Thread, int | None, str | None]],
      increment_total_backfilled_portals: bool = False,
      stop_when_threads_have_no_messages_to_backfill: bool = False,
      local_limit: int | None = None,
  ):
      """Sync threads one by one, starting successive syncs at least ``min_sync_thread_delay``
      seconds apart.

      After each thread, the sequence ID and cursor yielded with it are stored and persisted.
      The backfilled-portal counter is also incremented when requested. If the iterator runs
      out before ``local_limit`` threads were seen (or there is no limit), the thread sync is
      marked as completed.

      With ``stop_when_threads_have_no_messages_to_backfill``, the first thread without new
      messages ends the loop early, and completion is then not recorded.
      """
      sync_delay = self.config["bridge.backfill.min_sync_thread_delay"]
      # None until the first thread starts syncing, so the first thread is never delayed.
      last_thread_sync_ts: float | None = None
      found_thread_count = 0
      async for thread, seq_id, cursor in threads:
          found_thread_count += 1
          now = time.monotonic()
          if last_thread_sync_ts is not None and now < last_thread_sync_ts + sync_delay:
              delay = last_thread_sync_ts + sync_delay - now
              self.log.debug("Thread sync is happening too quickly. Waiting for %.1fs", delay)
              await asyncio.sleep(delay)
          # Record when this sync actually starts (after any wait). Using the pre-sleep
          # timestamp would let the next thread start sooner than the configured delay.
          last_thread_sync_ts = time.monotonic()

          had_new_messages = await self._sync_thread(thread)
          if not had_new_messages and stop_when_threads_have_no_messages_to_backfill:
              self.log.debug("Got to threads with no new messages. Stopping sync.")
              return

          if increment_total_backfilled_portals:
              self.total_backfilled_portals = (self.total_backfilled_portals or 0) + 1
          if seq_id:
              self.seq_id = seq_id
          if cursor:
              self.oldest_cursor = cursor
          await self.update()

      if local_limit is None or found_thread_count < local_limit:
          if local_limit is None:
              self.log.info(
                  "Reached end of thread list with no limit, marking thread sync as completed"
              )
          else:
              self.log.info(
                  f"Reached end of thread list (got {found_thread_count} with "
                  f"limit {local_limit}), marking thread sync as completed"
              )
          self.thread_sync_completed = True
      await self.update()
  async def run_with_sync_lock(self, func: Callable[[], Awaitable]):
      """Run ``func`` while holding ``self._sync_lock``, retrying failures.

      ``func`` is attempted up to five times, waiting 30 seconds between attempts.
      ``IGNotLoggedInError`` is not retried: the user is logged out via
      ``self.logout(error=e)``. That call sends the "logged out" BAD_CREDENTIALS bridge
      notice itself, so no separate notice is sent here.
      """
      max_attempts = 5
      with self._sync_lock:
          for attempt in range(1, max_attempts + 1):
              try:
                  await func()
              except IGNotLoggedInError as e:
                  await self.logout(error=e)
                  return
              except Exception:
                  if attempt == max_attempts:
                      # Most likely something very bad has happened. Give up right
                      # away instead of holding the lock for another 30 seconds.
                      self.log.exception("Failed to sync threads five times. Will not retry.")
                      return
                  self.log.exception(
                      "Failed to sync threads. Waiting 30 seconds before retrying sync."
                  )
                  await asyncio.sleep(30)
              else:
                  # The sync was successful.
                  return
  def start_listen(self, is_after_sync: bool = False) -> None:
      """Start the MQTT listener as a task on ``self.loop``.

      The listener resumes from the stored ``seq_id``/``snapshot_at_ms``. The task
      handle is kept in ``self._listen_task``.
      """
      self.shutdown = False
      listener_coro = self._listen(
          seq_id=self.seq_id,
          snapshot_at_ms=self.snapshot_at_ms,
          is_after_sync=is_after_sync,
      )
      self._listen_task = self.loop.create_task(listener_coro)
  async def fetch_user_and_reconnect(self) -> None:
      """Confirm the Instagram session still works, then restart the listener.

      Network/proxy errors are retried forever, with a linearly growing wait capped at
      60 seconds. Auth errors log the user out, and challenge/consent errors go to the
      checkpoint handler. Any other error is reported as UNKNOWN_ERROR bridge state.
      """
      self.log.debug("Refetching current user after disconnection")
      failures = 0
      while True:
          try:
              resp = await self.client.current_user()
          except (
              ProxyError,
              ProxyTimeoutError,
              ProxyConnectionError,
              ClientConnectionError,
              ConnectionError,
              asyncio.TimeoutError,
          ) as err:
              failures += 1
              wait = min(failures * 10, 60)
              self.log.warning(
                  f"{err.__class__.__name__} while trying to check user for reconnection, "
                  f"retrying in {wait} seconds: {err}"
              )
              await asyncio.sleep(wait)
              await self._maybe_update_proxy("fetch_user_and_reconnect error")
              continue
          except IGNotLoggedInError as err:
              self.log.warning(f"Failed to reconnect to Instagram: {err}, logging out")
              await self.logout(error=err)
              return
          except (IGChallengeError, IGConsentRequiredError) as err:
              await self._handle_checkpoint(err, on="reconnect")
              return
          except Exception as err:
              self.log.exception("Error while reconnecting to Instagram")
              if isinstance(err, IGCheckpointError):
                  self.log.debug("Checkpoint error content: %s", err.body)
              await self.push_bridge_state(
                  BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(err)}
              )
              return
          # Only reached when current_user() succeeded.
          self.log.debug(f"Confirmed current user {resp.user.pk}")
          self.start_listen()
          return
  async def _listen(self, seq_id: int, snapshot_at_ms: int, is_after_sync: bool) -> None:
      """Run the MQTT listener until it stops, then report why it stopped.

      Subscribes to the presence/typing/status GraphQL topics and the direct/live
      Skywalker topics for ``self.state.user_id``, resuming from ``seq_id`` and
      ``snapshot_at_ms``. Listener errors become bridge notices, and some of them
      schedule a refresh or reconnect. On every exit path the listener task handle and
      connected state are reset.
      """
      try:
          listen = self.mqtt.listen
          graphql_topics = {
              GraphQLSubscription.app_presence(),
              GraphQLSubscription.direct_typing(self.state.user_id),
              GraphQLSubscription.direct_status(),
          }
          skywalker_topics = {
              SkywalkerSubscription.direct_sub(self.state.user_id),
              SkywalkerSubscription.live_sub(self.state.user_id),
          }
          await listen(
              graphql_subs=graphql_topics,
              skywalker_subs=skywalker_topics,
              seq_id=seq_id,
              snapshot_at_ms=snapshot_at_ms,
          )
      except IrisSubscribeError as err:
          if not is_after_sync:
              self.log.warning(f"Got IrisSubscribeError {err}, refreshing...")
              # NOTE(review): the Task returned by create_task is not stored anywhere.
              # asyncio keeps only weak references to tasks, so consider holding one.
              asyncio.create_task(self.refresh())
          else:
              # A refresh already happened and subscribing still fails: surface it.
              self.log.exception("Got IrisSubscribeError right after refresh")
              await self.send_bridge_notice(
                  f"Reconnecting failed again after refresh: {err}",
                  important=True,
                  state_event=BridgeStateEvent.UNKNOWN_ERROR,
                  error_code="ig-refresh-connection-error",
                  error_message=str(err),
                  info={"python_error": str(err)},
              )
      except (MQTTNotConnected, MQTTNotLoggedIn, MQTTConnectionUnauthorized) as err:
          with_traceback = "MQTT reconnection failed" in str(err)
          self.log.warning(f"Unexpected connection error: {err}", exc_info=with_traceback)
          await self.send_bridge_notice(
              f"Error in listener: {err}",
              important=True,
              state_event=BridgeStateEvent.UNKNOWN_ERROR,
              error_code="ig-connection-error",
          )
          self.mqtt.disconnect()
          # NOTE(review): fire-and-forget task, see the note above.
          asyncio.create_task(self.fetch_user_and_reconnect())
      except Exception as err:
          self.log.exception("Fatal error in listener")
          await self.send_bridge_notice(
              "Fatal error in listener (see logs for more info)",
              state_event=BridgeStateEvent.UNKNOWN_ERROR,
              important=True,
              error_code="ig-unknown-connection-error",
              info={"python_error": str(err)},
          )
          self.mqtt.disconnect()
      else:
          # listen() returned normally. Only unexpected if we did not ask it to stop.
          if not self.shutdown:
              await self.send_bridge_notice(
                  "Instagram connection closed without error",
                  state_event=BridgeStateEvent.UNKNOWN_ERROR,
                  error_code="ig-disconnected",
              )
      finally:
          self._listen_task = None
          self._is_connected = False
          self._track_metric(METRIC_CONNECTED, False)
  async def stop_listen(self) -> None:
      """Disconnect MQTT, wait for the listener task to finish, and persist the state.

      ``self.shutdown`` is raised only while waiting, so the listener does not report
      the disconnect as an unexpected close.
      """
      if self.mqtt:
          self.shutdown = True
          self.mqtt.disconnect()
          pending_listener = self._listen_task
          if pending_listener:
              await pending_listener
          self.shutdown = False
      self._track_metric(METRIC_CONNECTED, False)
      self._is_connected = False
      await self.update()
  def stop_backfill_tasks(self) -> None:
      """Cancel the backfill-loop and thread-sync tasks, if set, and forget them."""
      for task_attr in ("_backfill_loop_task", "_thread_sync_task"):
          task = getattr(self, task_attr)
          if not task:
              continue
          task.cancel()
          setattr(self, task_attr, None)
  async def logout(self, error: IGNotLoggedInError | None = None) -> None:
      """Stop all activity for this user and clear the Instagram session state.

      Without ``error`` this is a voluntary logout. The Instagram session is logged out
      remotely (failures are only logged), LOGGED_OUT bridge state is pushed, a real-user
      puppet is unlinked from its Matrix ID, and the ``by_igpk`` cache entry and
      ``igpk`` are cleared. With ``error`` (an auth failure) no remote logout is tried.
      A BAD_CREDENTIALS notice is sent instead and ``igpk`` is kept.
      """
      await self.stop_listen()
      self.stop_backfill_tasks()
      if self.client and error is None:
          try:
              await self.client.logout(one_tap_app_login=False)
          except Exception:
              # Best effort: the local state is cleared regardless.
              self.log.debug("Exception logging out", exc_info=True)
      if self.mqtt:
          self.mqtt.disconnect()
      self._track_metric(METRIC_CONNECTED, False)
      self._track_metric(METRIC_LOGGED_IN, False)
      if error is not None:
          self.log.debug("Auth error body: %s", error.body.serialize())
          await self.send_bridge_notice(
              f"You have been logged out of Instagram: {error.proper_message}",
              important=True,
              state_event=BridgeStateEvent.BAD_CREDENTIALS,
              error_code="ig-auth-error",
              error_message=error.proper_message,
          )
      else:
          await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
          own_puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
          if own_puppet and own_puppet.is_real_user:
              await own_puppet.switch_mxid(None, None)
          self.by_igpk.pop(self.igpk, None)
          self.igpk = None
      # Reset the session state in both cases.
      self.client = None
      self.mqtt = None
      self.state = None
      self.seq_id = None
      self.snapshot_at_ms = None
      self.thread_sync_completed = False
      self._is_logged_in = False
      await self.update()
  911. # endregion
  912. # region Event handlers
  async def _save_seq_id_after_sleep(self) -> None:
      """Persist the current sequence ID after a 120-second delay.

      The task slot is cleared before saving, so ``update_seq_id`` can schedule another
      save while this one runs. Save failures are logged, not raised.
      """
      debounce_seconds = 120
      await asyncio.sleep(debounce_seconds)
      self._seq_id_save_task = None
      self.log.trace("Saving sequence ID %d/%d", self.seq_id, self.snapshot_at_ms)
      try:
          await self.save_seq_id()
      except Exception:
          self.log.exception("Error saving sequence ID")
  async def update_seq_id(self, evt: NewSequenceID) -> None:
      """Record the new sequence ID and schedule a delayed save unless one is pending."""
      self.seq_id = evt.seq_id
      self.snapshot_at_ms = evt.snapshot_at_ms
      pending_save = self._seq_id_save_task
      if pending_save and not pending_save.done():
          self.log.trace("Not starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
          return
      self.log.trace("Starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
      self._seq_id_save_task = asyncio.create_task(self._save_seq_id_after_sleep())
  @async_time(METRIC_MESSAGE)
  async def handle_message(self, evt: MessageSyncEvent) -> None:
      """Apply an Instagram message sync event (add/remove/replace/reaction) to its portal.

      If the thread has no Matrix room yet, the thread is fetched and a room is created.
      The message is dropped if room creation does not produce a room ID.
      """
      message = evt.message
      portal = await po.Portal.get_by_thread_id(message.thread_id, receiver=self.igpk)
      if not portal or not portal.mxid:
          self.log.debug("Got message in thread with no portal, getting info...")
          resp = await self.client.get_thread(message.thread_id)
          portal = await po.Portal.get_by_thread(resp.thread, self.igpk)
          self.log.debug("Got info for unknown portal, creating room")
          await portal.create_matrix_room(self, resp.thread)
          if not portal.mxid:
              self.log.warning(
                  "Room creation appears to have failed, "
                  f"dropping message in {message.thread_id}"
              )
              return
      self.log.trace(f"Received message sync event {message}")
      if message.new_reaction:
          await portal.handle_instagram_reaction(message, remove=message.op == Operation.REMOVE)
          return
      sender = await pu.Puppet.get_by_pk(message.user_id) if message.user_id else None
      if message.op == Operation.ADD:
          # Adds without a sender are ignored.
          if sender:
              await portal.handle_instagram_item(self, sender, message)
      elif message.op == Operation.REMOVE:
          # Removes carry no sender; only the original sender can unsend anyway.
          await portal.handle_instagram_remove(message.item_id)
      elif message.op == Operation.REPLACE:
          await portal.handle_instagram_update(message)
  @async_time(METRIC_THREAD_SYNC)
  async def handle_thread_sync(self, evt: ThreadSyncEvent) -> None:
      """Update the portal of a synced thread, creating rooms only for new group threads."""
      self.log.trace("Thread sync event content: %s", evt)
      portal = await po.Portal.get_by_thread(evt, receiver=self.igpk)
      if portal.mxid:
          self.log.debug("Got thread sync event for %s with existing portal", portal.thread_id)
          await portal.update_matrix_room(self, evt)
          return
      if not evt.is_group:
          self.log.debug(
              "Got thread sync event for DM %s without existing portal, ignoring",
              portal.thread_id,
          )
          return
      self.log.debug(
          "Got thread sync event for group %s without existing portal, creating room",
          portal.thread_id,
      )
      await portal.create_matrix_room(self, evt)
  @async_time(METRIC_RTD)
  async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
      """Mirror an Instagram typing (activity indicator) event as Matrix typing.

      Only ``ActivityIndicatorData`` payloads are handled. Indicators that have already
      expired (``timestamp_ms + ttl`` before now, both in milliseconds), or whose
      ``activity_indicator_id`` was already seen and has not expired, are ignored.
      """
      if not isinstance(evt.value, ActivityIndicatorData):
          return
      now = int(time.time() * 1000)
      expiry = evt.value.timestamp_ms + evt.value.ttl
      if expiry < now:
          return
      seen_ids = self._activity_indicator_ids
      # Drop expired indicator IDs so the dedup dict only holds live indicators.
      # Without this it grows for as long as the user is connected.
      expired_ids = [indicator_id for indicator_id, exp in seen_ids.items() if exp < now]
      for indicator_id in expired_ids:
          del seen_ids[indicator_id]
      if evt.activity_indicator_id in seen_ids:
          return
      seen_ids[evt.activity_indicator_id] = expiry
      puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
      portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
      if not puppet or not portal or not portal.mxid:
          return
      is_typing = evt.value.activity_status != TypingStatus.OFF
      if puppet.pk == self.igpk:
          # Remember our own remote typing state.
          self.remote_typing_status = TypingStatus.TEXT if is_typing else TypingStatus.OFF
      await puppet.intent_for(portal).set_typing(
          portal.mxid, is_typing=is_typing, timeout=evt.value.ttl
      )
  1002. # endregion
  1003. # region Database getters
  def _add_to_cache(self) -> None:
      """Register this user in the ``by_mxid`` cache, and in ``by_igpk`` when igpk is set."""
      self.by_mxid[self.mxid] = self
      instagram_pk = self.igpk
      if instagram_pk:
          self.by_igpk[instagram_pk] = self
  @classmethod
  @async_getter_lock
  async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
      """Return the user for a Matrix ID from the cache or the database.

      If the user is in neither and ``create`` is true, a new user is inserted.
      Ghost (puppet) MXIDs never resolve to a user.
      """
      if pu.Puppet.get_id_from_mxid(mxid):
          return None
      if mxid in cls.by_mxid:
          return cls.by_mxid[mxid]
      user = cast(cls, await super().get_by_mxid(mxid))
      if user is None:
          if not create:
              return None
          user = cls(mxid)
          await user.insert()
      user._add_to_cache()
      return user
  @classmethod
  @async_getter_lock
  async def get_by_igpk(cls, igpk: int) -> User | None:
      """Return the user with the given Instagram PK from the cache, else the database."""
      if igpk in cls.by_igpk:
          return cls.by_igpk[igpk]
      user = cast(cls, await super().get_by_igpk(igpk))
      if user is None:
          return None
      user._add_to_cache()
      return user
  @classmethod
  async def all_logged_in(cls) -> AsyncGenerator[User, None]:
      """Yield each user returned by the database ``all_logged_in`` query.

      If a user with the same MXID is already cached, the cached instance is yielded.
      Otherwise the loaded user is added to the cache first.
      """
      users = await super().all_logged_in()
      user: cls
      for user in users:
          # Keep the try around the lookup only. Wrapping the yield would also catch a
          # KeyError thrown into the generator by its consumer.
          try:
              cached = cls.by_mxid[user.mxid]
          except KeyError:
              user._add_to_cache()
              cached = user
          yield cached
  1050. # endregion