12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251 |
- # mautrix-instagram - A Matrix-Instagram puppeting bridge.
- # Copyright (C) 2023 Tulir Asokan
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU Affero General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <https://www.gnu.org/licenses/>.
- from __future__ import annotations
- from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, Callable, cast
- from datetime import datetime, timedelta
- from functools import partial
- import asyncio
- import logging
- import time
- from mauigpapi import AndroidAPI, AndroidMQTT, AndroidState
- from mauigpapi.errors import (
- IGChallengeError,
- IGCheckpointError,
- IGConsentRequiredError,
- IGNotLoggedInError,
- IGRateLimitError,
- IGResponseError,
- IGUnknownError,
- IGUserIDNotFoundError,
- IrisSubscribeError,
- MQTTConnectionUnauthorized,
- MQTTNotConnected,
- MQTTNotLoggedIn,
- MQTTReconnectionError,
- )
- from mauigpapi.mqtt import (
- Connect,
- Disconnect,
- GraphQLSubscription,
- NewSequenceID,
- ProxyUpdate,
- SkywalkerSubscription,
- )
- from mauigpapi.types import (
- ActivityIndicatorData,
- CurrentUser,
- MessageSyncEvent,
- Operation,
- RealtimeDirectEvent,
- Thread,
- ThreadRemoveEvent,
- ThreadSyncEvent,
- TypingStatus,
- )
- from mauigpapi.types.direct_inbox import DMInbox, DMInboxResponse
- from mautrix.appservice import AppService
- from mautrix.bridge import BaseUser, async_getter_lock
- from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID
- from mautrix.util import background_task
- from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
- from mautrix.util.logging import TraceLogger
- from mautrix.util.opt_prometheus import Gauge, Summary, async_time
- from mautrix.util.proxy import RETRYABLE_PROXY_EXCEPTIONS, ProxyHandler
- from mautrix.util.simple_lock import SimpleLock
- from . import portal as po, puppet as pu
- from .config import Config
- from .db import Backfill, Message as DBMessage, Portal as DBPortal, User as DBUser
- if TYPE_CHECKING:
- from .__main__ import InstagramBridge
# Metrics from mautrix.util.opt_prometheus: three Summary objects described as
# counting calls to the handle_message / handle_thread_sync / handle_rtd handlers,
# and two Gauge objects for logged-in and connected users.
- METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
- METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync")
- METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
- METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
- METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")
# Register the human-readable text for this bridge's error codes on BridgeState.
# Some templates contain a "{message}" placeholder; "ig-disconnected" maps to None
# instead of a message string.
- BridgeState.human_readable_errors.update(
- {
- "ig-connection-error": "Instagram disconnected unexpectedly",
- "ig-refresh-connection-error": "Reconnecting failed again after refresh: {message}",
- "ig-connection-fatal-error": "Instagram disconnected unexpectedly",
- "ig-auth-error": "Authentication error from Instagram: {message}, please login again to continue",
- "ig-checkpoint": "Instagram checkpoint error. Please check the Instagram website.",
- "ig-consent-required": "Instagram requires a consent update. Please check the Instagram website.",
- "ig-checkpoint-locked": "Instagram checkpoint error. Please check the Instagram website.",
- "ig-rate-limit": "Got Instagram ratelimit error, waiting a few minutes before retrying...",
- "ig-disconnected": None,
- "logged-out": "You've been logged out of instagram, please login again to continue",
- }
- )
# Bridge user type: inherits from both the DBUser database model and mautrix's BaseUser.
- class User(DBUser, BaseUser):
# Base logger; per-user HTTP and MQTT loggers are derived from it with getChild().
- ig_base_log: TraceLogger = logging.getLogger("mau.instagram")
# NOTE: assigned at class level, so this one dict object is shared by every instance.
- _activity_indicator_ids: dict[str, int] = {}
# Class-level lookup tables (shared by all instances). connect() stores users in
# by_igpk under their Instagram pk; by_mxid is keyed by UserID per its annotation.
- by_mxid: dict[UserID, User] = {}
- by_igpk: dict[int, User] = {}
# config, az and loop are assigned on the class by init_cls().
- config: Config
- az: AppService
- loop: asyncio.AbstractEventLoop
# Both are None until connect() succeeds; _handle_checkpoint() resets them to None.
- client: AndroidAPI | None
- mqtt: AndroidMQTT | None
# Task running _listen(), created by start_listen().
- _listen_task: asyncio.Task | None = None
- _sync_lock: SimpleLock
- _backfill_loop_task: asyncio.Task | None
- _thread_sync_task: asyncio.Task | None
- _seq_id_save_task: asyncio.Task | None
- _message_error_login_last_recheck: float
- permission_level: str
- username: str | None
# _notice_room_lock guards notice room creation; _notice_send_lock keeps notices ordered.
- _notice_room_lock: asyncio.Lock
- _notice_send_lock: asyncio.Lock
- _is_logged_in: bool
- _is_connected: bool
- shutdown: bool
- remote_typing_status: TypingStatus | None
def __init__(
    self,
    mxid: UserID,
    igpk: int | None = None,
    state: AndroidState | None = None,
    notice_room: RoomID | None = None,
    seq_id: int | None = None,
    snapshot_at_ms: int | None = None,
    oldest_cursor: str | None = None,
    total_backfilled_portals: int | None = None,
    thread_sync_completed: bool = False,
) -> None:
    """Build a user from its stored fields and reset all runtime-only state.

    Every argument is forwarded by keyword to ``super().__init__``; afterwards
    ``BaseUser.__init__`` is called explicitly. Permission flags are read from the
    class-level config for ``mxid``. No API or MQTT client is created here.
    """
    stored_fields = {
        "mxid": mxid,
        "igpk": igpk,
        "state": state,
        "notice_room": notice_room,
        "seq_id": seq_id,
        "snapshot_at_ms": snapshot_at_ms,
        "oldest_cursor": oldest_cursor,
        "total_backfilled_portals": total_backfilled_portals,
        "thread_sync_completed": thread_sync_completed,
    }
    super().__init__(**stored_fields)
    BaseUser.__init__(self)

    # Locks used by get_notice_room() and send_bridge_notice().
    self._notice_room_lock = asyncio.Lock()
    self._notice_send_lock = asyncio.Lock()
    self.command_status = None

    (
        self.relay_whitelisted,
        self.is_whitelisted,
        self.is_admin,
        self.permission_level,
    ) = self.config.get_permissions(mxid)

    # Nothing is connected yet.
    self.client = None
    self.mqtt = None
    self.username = None
    self._message_error_login_last_recheck = 0
    self._is_logged_in = False
    self._is_connected = False
    self._is_refreshing = False
    self.shutdown = False
    self._sync_lock = SimpleLock(
        "Waiting for thread sync to finish before handling %s", log=self.log
    )

    # Background tasks are created lazily by the connection code.
    self._listen_task = None
    self._thread_sync_task = None
    self._backfill_loop_task = None
    self.remote_typing_status = None
    self._seq_id_save_task = None

    self.proxy_handler = ProxyHandler(
        api_url=self.config["bridge.get_proxy_api_url"],
    )
@classmethod
def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]:
    """Store the bridge and its config/appservice/loop on the class.

    Returns an async generator that yields one ``try_connect()`` awaitable for each
    user produced by ``cls.all_logged_in()``; nothing is awaited here.
    """
    cls.bridge = bridge
    cls.config, cls.az, cls.loop = bridge.config, bridge.az, bridge.loop
    return (user.try_connect() async for user in cls.all_logged_in())
- # region Connection management
async def is_logged_in(self) -> bool:
    """Return True only when an API client exists and the logged-in flag is set."""
    has_client = bool(self.client)
    return has_client and self._is_logged_in
async def get_puppet(self) -> pu.Puppet | None:
    """Return the puppet looked up by this user's ``igpk``, or None if ``igpk`` is unset."""
    own_pk = self.igpk
    if own_pk:
        return await pu.Puppet.get_by_pk(own_pk)
    return None
async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None:
    """Find the private chat portal between this user and ``puppet``.

    Returns None when this user has no ``igpk``, or when no portal exists and
    ``create`` is false. Otherwise a thread is created through the API client,
    the portal for it is fetched and updated with the thread info, and returned.
    """
    if not self.igpk:
        return None

    existing = await po.Portal.find_private_chat(self.igpk, puppet.pk)
    if existing:
        return existing
    if not create:
        return None

    # TODO add error handling somewhere
    new_thread = await self.client.create_group_thread([puppet.pk])
    created = await po.Portal.get_by_thread(new_thread, self.igpk)
    await created.update_info(new_thread, self)
    return created
async def try_connect(self) -> None:
    """Call ``connect()``, retrying forever on retryable proxy errors.

    Any other exception is logged and reported as an UNKNOWN_ERROR bridge state,
    after which the method gives up.
    """
    while True:
        try:
            await self.connect()
            return
        except RETRYABLE_PROXY_EXCEPTIONS as e:
            # These are retried by the client up to 10 times, but we actually want to retry
            # these indefinitely so we capture them here again and retry.
            self.log.warning(
                f"Proxy error connecting to Instagram: {e}, retrying in 1 minute",
            )
            await asyncio.sleep(60)
        except Exception as e:
            self.log.exception("Error while connecting to Instagram")
            await self.push_bridge_state(
                BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
            )
            return
@property
def api_log(self) -> TraceLogger:
    """Logger for this user's HTTP API client: ``<ig_base_log>.http.<mxid>``."""
    http_log = self.ig_base_log.getChild("http")
    return http_log.getChild(self.mxid)
@property
def is_connected(self) -> bool:
    """True when the API client and the MQTT client both exist and the connected flag is set."""
    if not self.client or not self.mqtt:
        return False
    return self._is_connected
async def connect(self, user: CurrentUser | None = None) -> None:
    """Create the API and MQTT clients for this user and schedule post-connect work.

    Without a stored ``state`` a BAD_CREDENTIALS ("logged-out") bridge state is
    pushed and nothing else happens. When ``user`` is not supplied, the current
    user is fetched first: a not-logged-in error triggers ``logout()``, challenge
    and consent errors go to ``_handle_checkpoint()``, and a checkpoint error is
    logged and re-raised.
    """
    if not self.state:
        await self.push_bridge_state(
            BridgeStateEvent.BAD_CREDENTIALS,
            error="logged-out",
            info={"cnd_action": "reauth"},
        )
        return

    api = AndroidAPI(
        self.state,
        log=self.api_log,
        proxy_handler=self.proxy_handler,
        on_proxy_update=self.on_proxy_update,
        on_response_error=self.on_response_error,
    )

    if not user:
        try:
            user = (await api.current_user()).user
        except IGNotLoggedInError as e:
            self.log.warning(f"Failed to connect to Instagram: {e}, logging out")
            await self.logout(error=e)
            return
        except IGCheckpointError as e:
            self.log.debug("Checkpoint error content: %s", e.body)
            raise
        except (IGChallengeError, IGConsentRequiredError) as e:
            await self._handle_checkpoint(e, on="connect", client=api)
            return

    # The login is confirmed: record the identity and announce that we are connecting.
    self.client = api
    self._is_logged_in = True
    self.igpk = user.pk
    self.username = user.username
    self._message_error_login_last_recheck = 0
    await self.push_bridge_state(BridgeStateEvent.CONNECTING)
    self._track_metric(METRIC_LOGGED_IN, True)
    self.by_igpk[self.igpk] = self

    self.mqtt = AndroidMQTT(
        self.state,
        log=self.ig_base_log.getChild("mqtt").getChild(self.mxid),
        proxy_handler=self.proxy_handler,
        mqtt_keepalive=self.config["instagram.mqtt_keepalive"],
    )
    event_handlers = (
        (Connect, self.on_connect),
        (Disconnect, self.on_disconnect),
        (NewSequenceID, self.update_seq_id),
        (MessageSyncEvent, self.handle_message),
        (ThreadSyncEvent, self.handle_thread_sync),
        (ThreadRemoveEvent, self.handle_thread_remove),
        (RealtimeDirectEvent, self.handle_rtd),
        (ProxyUpdate, self.on_proxy_update),
    )
    for event_type, handler in event_handlers:
        self.mqtt.add_event_handler(event_type, handler)

    await self.update()

    # Puppet sync and the post-connect steps run as separate tasks on the bridge loop.
    for coro in (self._try_sync_puppet(user), self._post_connect()):
        self.loop.create_task(coro)
async def _post_connect(self):
    """Start the background work that follows a successful ``connect()``.

    Steps, in order:
    1. start the backfill request loop unless one is already running;
    2. run a full sync when no ``seq_id`` is stored, otherwise start listening
       right away;
    3. when backfill is enabled, (re)start the background thread backfill task,
       cancelling an unfinished previous one;
    4. on "hungry" homeservers, kick off a contact info update for all puppets
       (fire-and-forget: the gathered future is not awaited).
    """
    # Backfill requests are handled synchronously so as not to overload the homeserver.
    # Users can configure their backfill stages to be more or less aggressive with backfilling
    # to try and avoid getting banned.
    if not self._backfill_loop_task or self._backfill_loop_task.done():
        self._backfill_loop_task = asyncio.create_task(self._handle_backfill_requests_loop())

    if not self.seq_id:
        await self._try_sync()
    else:
        # The branch is selected by the stored sequence ID, so say so in the log
        # (the previous message referred to a resync_on_startup option that this
        # condition does not consult).
        self.log.debug("Connecting to MQTT directly as a sequence ID is already stored")
        self.start_listen()

    if self.config["bridge.backfill.enable"]:
        if self._thread_sync_task and not self._thread_sync_task.done():
            self.log.warning("Cancelling existing background thread sync task")
            self._thread_sync_task.cancel()
        self._thread_sync_task = asyncio.create_task(self.backfill_threads())

    if self.bridge.homeserver_software.is_hungry:
        self.log.info("Updating contact info for all users")
        asyncio.gather(*[puppet.update_contact_info() async for puppet in pu.Puppet.get_all()])
async def _handle_backfill_requests_loop(self) -> None:
    """Process queued backfill requests for this user one at a time.

    Does nothing unless both ``bridge.backfill.enable`` and
    ``bridge.backfill.msc2716`` are set. The loop waits on the sync lock before
    each request, sleeps 30 seconds when the queue is empty, and stops when the
    user is logged out or hits a challenge/consent error. Any other failure puts
    the request on a 60 second cooldown and the loop continues.
    """
    cfg = self.config
    if not (cfg["bridge.backfill.enable"] and cfg["bridge.backfill.msc2716"]):
        return

    while True:
        await self._sync_lock.wait("backfill request")

        request = await Backfill.get_next(self.mxid)
        if not request:
            await asyncio.sleep(30)
            continue
        self.log.info("Backfill request %s", request)

        try:
            portal = await po.Portal.get_by_thread_id(
                request.portal_thread_id, receiver=request.portal_receiver
            )
            await request.mark_dispatched()
            await portal.backfill(self, request)
            await request.mark_done()
        except IGNotLoggedInError:
            self.log.exception("User got logged out during backfill loop")
            break
        except (IGChallengeError, IGConsentRequiredError) as err:
            self.log.exception("User got a challenge during backfill loop")
            await self._handle_checkpoint(err, on="backfill")
            break
        except Exception as err:
            self.log.exception(
                "Failed to backfill portal %s: %s", request.portal_thread_id, err
            )
            # Don't try again to backfill this portal for a minute.
            await request.set_cooldown_timeout(60)

    self._backfill_loop_task = None
async def on_connect(self, evt: Connect) -> None:
    """Handle the MQTT ``Connect`` event: record the connection and announce it."""
    self.log.debug("Connected to Instagram")

    # Update the metric and the flag before anything is sent out.
    self._track_metric(METRIC_CONNECTED, True)
    self._is_connected = True

    notice_text = "Connected to Instagram"
    await self.send_bridge_notice(notice_text)
    await self.push_bridge_state(BridgeStateEvent.CONNECTED)
async def on_disconnect(self, evt: Disconnect) -> None:
    """Handle the MQTT ``Disconnect`` event: log it and clear the connected metric and flag."""
    self.log.debug("Disconnected from Instagram")

    connected = False
    self._track_metric(METRIC_CONNECTED, connected)
    self._is_connected = connected
async def on_proxy_update(self, evt: ProxyUpdate | None = None) -> None:
    """Re-run HTTP/proxy setup on every client object that currently exists.

    Covers the API client, the MQTT client, and the API object stored in
    ``command_status`` (with the cookie jar of its own state) when one is set.
    The ``evt`` argument is not used.
    """
    api = self.client
    if api:
        api.setup_http(self.state.cookies.jar)

    mqtt = self.mqtt
    if mqtt:
        mqtt.setup_proxy()

    pending = self.command_status
    if pending:
        pending["api"].setup_http(pending["state"].cookies.jar)
async def on_response_error(self, err: IGResponseError) -> None:
    """Log the user out when an API error response shows the session is gone.

    Only acts on ``IGNotLoggedInError`` and only while ``is_logged_in()`` still
    reports true; every other error is ignored here.
    """
    if not isinstance(err, IGNotLoggedInError):
        return
    if not await self.is_logged_in():
        return
    self.log.warning(f"Noticed logout in API error response: {err}")
    await self.logout(error=err)
- # TODO this stuff could probably be moved to mautrix-python
async def get_notice_room(self) -> RoomID:
    """Return the bridge notice room ID, creating the room on first use.

    Creation happens under ``_notice_room_lock``; the new room is a direct chat
    inviting this user, and federation is disabled in the creation content when
    ``bridge.federate_rooms`` is false. The new ID is persisted via ``update()``.
    """
    if self.notice_room:
        return self.notice_room

    async with self._notice_room_lock:
        # If someone already created the room while this call was waiting,
        # don't make a new room
        if self.notice_room:
            return self.notice_room

        creation_content = {}
        if not self.config["bridge.federate_rooms"]:
            creation_content["m.federate"] = False

        self.notice_room = await self.az.intent.create_room(
            is_direct=True,
            invitees=[self.mxid],
            topic="Instagram bridge notices",
            creation_content=creation_content,
        )
        await self.update()

    return self.notice_room
async def fill_bridge_state(self, state: BridgeState) -> None:
    """Fill in the remote ID and remote name of a bridge state for this user.

    After the parent implementation has run, a missing ``remote_id`` is taken
    from ``igpk`` when set, otherwise from the stored login state's ``user_id``.
    It is set to None when there is no stored state at all, or when the state
    raises ``IGUserIDNotFoundError``. ``remote_name`` becomes ``@<username>``
    when a username is known.
    """
    await super().fill_bridge_state(state)
    if not state.remote_id:
        if self.igpk:
            state.remote_id = str(self.igpk)
        elif not self.state:
            # connect() pushes a "logged-out" bridge state precisely when there is no
            # stored state, so this path must not dereference self.state.
            state.remote_id = None
        else:
            try:
                state.remote_id = self.state.user_id
            except IGUserIDNotFoundError:
                state.remote_id = None
    if self.username:
        state.remote_name = f"@{self.username}"
async def get_bridge_states(self) -> list[BridgeState]:
    """Return the current bridge state as a single-item list, or [] without stored state.

    The state is CONNECTED when ``is_connected`` is true, TRANSIENT_DISCONNECT
    while refreshing or when an MQTT client exists, and UNKNOWN_ERROR otherwise.
    """
    if not self.state:
        return []

    if self.is_connected:
        event = BridgeStateEvent.CONNECTED
    elif self._is_refreshing or self.mqtt:
        event = BridgeStateEvent.TRANSIENT_DISCONNECT
    else:
        event = BridgeStateEvent.UNKNOWN_ERROR
    return [BridgeState(state_event=event)]
async def send_bridge_notice(
    self,
    text: str,
    edit: EventID | None = None,
    state_event: BridgeStateEvent | None = None,
    important: bool = False,
    error_code: str | None = None,
    error_message: str | None = None,
    info: dict | None = None,
) -> EventID | None:
    """Optionally push a bridge state, then send ``text`` to the notice room.

    When ``state_event`` is given it is pushed first; its message is
    ``error_message`` if an ``error_code`` was supplied and ``text`` otherwise.
    The notice is skipped (returning None) when bridge notices are disabled, or
    when it is unimportant and unimportant notices are turned off. Important
    notices are sent as text messages, others as notices. Send failures are
    logged and swallowed.

    Returns ``edit`` when it was given, otherwise the ID of the sent event (None
    if sending failed).
    """
    if state_event:
        state_message = error_message if error_code else text
        await self.push_bridge_state(
            state_event,
            error=error_code,
            message=state_message,
            info=info,
        )

    if self.config["bridge.disable_bridge_notices"]:
        return None
    if not (important or self.config["bridge.unimportant_bridge_notices"]):
        self.log.debug("Not sending unimportant bridge notice: %s", text)
        return None

    sent_event_id = None
    try:
        self.log.debug("Sending bridge notice: %s", text)
        msgtype = MessageType.TEXT if important else MessageType.NOTICE
        content = TextMessageEventContent(body=text, msgtype=msgtype)
        if edit:
            content.set_edit(edit)
        # This is locked to prevent notices going out in the wrong order
        async with self._notice_send_lock:
            room_id = await self.get_notice_room()
            sent_event_id = await self.az.intent.send_message(room_id, content)
    except Exception:
        self.log.warning("Failed to send bridge notice", exc_info=True)
    return edit or sent_event_id
async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
    """Best-effort update of this user's own puppet.

    First tries to enable the custom puppet automatically when it is not already
    bound to this Matrix ID and auto-login is possible, then updates the puppet
    info from ``user_info``. Each step logs its own failure and never raises.
    """
    own_puppet = await pu.Puppet.get_by_pk(self.igpk)

    try:
        needs_switch = own_puppet.custom_mxid != self.mxid
        if needs_switch and own_puppet.can_auto_login(self.mxid):
            self.log.info("Automatically enabling custom puppet")
            await own_puppet.switch_mxid(access_token="auto", mxid=self.mxid)
    except Exception:
        self.log.exception("Failed to automatically enable custom puppet")

    try:
        await own_puppet.update_info(user_info, self)
    except Exception:
        self.log.exception("Failed to update own puppet info")
async def _try_sync(self) -> None:
    """Run ``sync()`` and turn any failure into an UNKNOWN_ERROR bridge state.

    The exception is logged (with the response body as well for checkpoint
    errors) and not re-raised.
    """
    try:
        await self.sync()
    except Exception as err:
        self.log.exception("Exception while syncing")
        if isinstance(err, IGCheckpointError):
            self.log.debug("Checkpoint error content: %s", err.body)
        error_info = {"python_error": str(err)}
        await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, info=error_info)
async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
    """Map each private chat partner's puppet MXID to a one-item list with the room ID.

    Only private chat portals of this user that have a Matrix room are included.
    """
    direct_chats = {}
    for portal in await DBPortal.find_private_chats_of(self.igpk):
        if not portal.mxid:
            continue
        puppet_mxid = pu.Puppet.get_mxid_from_id(portal.other_user_pk)
        direct_chats[puppet_mxid] = [portal.mxid]
    return direct_chats
async def refresh(self, resync: bool = True, update_proxy: bool = False) -> None:
    """Stop listening, reset the pigeon session ID and reconnect.

    With ``update_proxy`` the proxy URL is refreshed first and, if it changed,
    re-applied to the clients. With ``resync`` the inbox is synced again,
    retrying forever: the wait starts at one minute and, from the fifth failure
    on, grows by a minute per failure up to ten. Without ``resync`` listening is
    simply restarted. ``_is_refreshing`` is true for the whole duration.
    """
    self._is_refreshing = True
    try:
        await self.stop_listen()
        self.state.reset_pigeon_session_id()
        if update_proxy and self.proxy_handler.update_proxy_url(reason="reconnect"):
            await self.on_proxy_update()

        if not resync:
            self.start_listen()
            return

        retry_count = 0
        minutes = 1
        while True:
            try:
                await self.sync()
                return
            except Exception as e:
                if retry_count >= 4 and minutes < 10:
                    minutes += 1
                retry_count += 1
                s = "" if minutes == 1 else "s"
                self.log.exception(
                    f"Error while syncing for refresh, retrying in {minutes} minute{s}"
                )
                if isinstance(e, IGCheckpointError):
                    self.log.debug("Checkpoint error content: %s", e.body)
                await self.push_bridge_state(
                    BridgeStateEvent.UNKNOWN_ERROR,
                    error="unknown-error",
                    message="An unknown error occurred while connecting to Instagram",
                    info={"python_error": str(e)},
                )
                await asyncio.sleep(minutes * 60)
    finally:
        self._is_refreshing = False
async def _handle_checkpoint(
    self,
    e: IGChallengeError | IGConsentRequiredError,
    on: str,
    client: AndroidAPI | None = None,
) -> None:
    """Drop the clients and report a challenge/consent error as BAD_CREDENTIALS.

    ``on`` names the operation that failed and is only used in the log line.
    ``client`` overrides the API client used for the challenge reset; it defaults
    to ``self.client`` as it was before being cleared.

    Consent errors are reported as "ig-consent-required" with the serialized
    response body. For challenges the challenge state is reset through the API
    and included in the info; the error code is "ig-checkpoint", or
    "ig-checkpoint-locked" for a HACKED_LOCK challenge. If the reset fails, only
    the serialized challenge from the original error is reported.
    """
    self.log.warning(f"Got checkpoint error on {on}: {e.body.serialize()}")
    api = client or self.client
    self.client = None
    self.mqtt = None

    if isinstance(e, IGConsentRequiredError):
        await self.push_bridge_state(
            BridgeStateEvent.BAD_CREDENTIALS,
            error="ig-consent-required",
            info=e.body.serialize(),
        )
        return

    def serialized_challenge():
        # Serialized challenge from the original error, if it carried one.
        return e.body.challenge.serialize() if e.body.challenge else None

    error_code = "ig-checkpoint"
    try:
        resp = await api.challenge_reset()
        info = {
            "challenge_context": (
                resp.challenge_context.serialize() if resp.challenge_context_str else None
            ),
            "step_name": resp.step_name,
            "step_data": resp.step_data.serialize() if resp.step_data else None,
            "user_id": resp.user_id,
            "action": resp.action,
            "status": resp.status,
            "challenge": serialized_challenge(),
        }
        self.log.debug(f"Challenge state: {resp.serialize()}")
        if resp.challenge_context.challenge_type_enum == "HACKED_LOCK":
            error_code = "ig-checkpoint-locked"
    except Exception:
        self.log.exception("Error resetting challenge state")
        info = {"challenge": serialized_challenge()}

    await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error=error_code, info=info)
# NOTE(review): indentation has been stripped from this copy of the file, so the
# nesting of the statements below is not visible here; the comments describe each
# statement on its own and should be re-checked against a properly indented copy.
# Parameters: `thread` is the thread to sync; `enqueue_backfill` gates the
# enqueue_immediate_backfill() call near the end; `portal`, when given, must have
# the same thread_id as `thread` (asserted), otherwise it is looked up from `thread`.
- async def _sync_thread(
- self, thread: Thread, enqueue_backfill: bool = True, portal: po.Portal | None = None
- ) -> bool:
- """
- Sync a specific thread. Returns whether the thread had messages after the last message in
- the database before the sync.
- """
- self.log.debug(f"Syncing thread {thread.thread_id}")
# Start from every item in the thread; narrowed below when a stored message exists.
- forward_messages = thread.items
- assert self.client
- if not portal:
- portal = await po.Portal.get_by_thread(thread, self.igpk)
- assert portal
- else:
- assert portal.thread_id == thread.thread_id
- # Create or update the Matrix room
- if not portal.mxid:
- await portal.create_matrix_room(self, thread)
- else:
- await portal.update_matrix_room(self, thread)
# With bridge.backfill.enable_initial off, True is returned without bridging messages.
- if not self.config["bridge.backfill.enable_initial"]:
- return True
- last_message = await DBMessage.get_last(portal.mxid)
- cursor = thread.oldest_cursor
- if last_message:
- original_number_of_messages = len(thread.items)
# Keep only items with a timestamp strictly after the last stored message.
- new_messages = [
- m for m in thread.items if last_message.ig_timestamp_ms < m.timestamp_ms
- ]
- forward_messages = new_messages
- portal.log.debug(
- f"{len(new_messages)}/{original_number_of_messages} messages are after most recent"
- " message."
- )
- # Fetch more messages until we get back to messages that have been bridged already.
# The loop continues only while every item of the latest page was new.
- while len(new_messages) > 0 and len(new_messages) == original_number_of_messages:
- await asyncio.sleep(self.config["bridge.backfill.incremental.page_delay"])
- portal.log.debug("Fetching more messages for forward backfill")
- resp = await self.client.get_thread(portal.thread_id, cursor=cursor)
- if len(resp.thread.items) == 0:
- break
- original_number_of_messages = len(resp.thread.items)
- new_messages = [
- m for m in resp.thread.items if last_message.ig_timestamp_ms < m.timestamp_ms
- ]
# Each newly fetched page is placed in front of what has been collected so far.
- forward_messages = new_messages + forward_messages
- cursor = resp.thread.oldest_cursor
- portal.log.debug(
- f"{len(new_messages)}/{original_number_of_messages} messages are after most "
- "recent message."
- )
- elif not portal.first_event_id:
- self.log.debug(
- f"Skipping backfilling {portal.thread_id} as the first event ID is not known"
- )
- return False
- if forward_messages:
- portal.cursor = cursor
- await portal.update()
# mark_read is true when thread.read_state is 0, or when unread_hours_threshold is
# positive and forward_messages[0] is older than that many hours (naive local time
# on both sides of the comparison).
- mark_read = thread.read_state == 0 or (
- (hours := self.config["bridge.backfill.unread_hours_threshold"]) > 0
- and (
- datetime.fromtimestamp(forward_messages[0].timestamp_ms / 1000)
- < datetime.now() - timedelta(hours=hours)
- )
- )
# The collected messages are passed to the portal in reversed order.
- base_insertion_event_id = await portal.backfill_message_page(
- self,
- list(reversed(forward_messages)),
- forward=True,
- last_message=last_message,
- mark_read=mark_read,
- )
- if (
- not self.bridge.homeserver_software.is_hungry
- and self.config["bridge.backfill.msc2716"]
- ):
- await portal.send_post_backfill_dummy(
- forward_messages[0].timestamp, base_insertion_event_id=base_insertion_event_id
- )
- if (
- mark_read
- and not self.bridge.homeserver_software.is_hungry
- and (puppet := await self.get_puppet())
- ):
# Re-read the last stored message and mark it read with this user's own puppet.
- last_message = await DBMessage.get_last(portal.mxid)
- if last_message:
- await puppet.intent_for(portal).mark_read(portal.mxid, last_message.mxid)
- await portal._update_read_receipts(thread.last_seen_at)
- if self.config["bridge.backfill.msc2716"] and enqueue_backfill:
- await portal.enqueue_immediate_backfill(self, 1)
- return len(forward_messages) > 0
async def sync(self, increment_total_backfilled_portals: bool = False) -> None:
    """Run ``_sync`` through ``run_with_sync_lock``.

    ``increment_total_backfilled_portals`` is bound to ``_sync`` as its first
    positional argument.
    """
    locked_sync = partial(self._sync, increment_total_backfilled_portals)
    await self.run_with_sync_lock(locked_sync)
async def _sync(self, increment_total_backfilled_portals: bool = False) -> None:
    """Fetch the inbox, store the new sequence ID and sync the most recent threads.

    Rate limit errors are retried forever with a growing delay (2, 4, 6, ...
    minutes) while a TRANSIENT_DISCONNECT state is pushed. A not-logged-in error
    ends the sync quietly, challenge/consent errors are handed to
    ``_handle_checkpoint()``, and checkpoint errors are logged and re-raised.

    The number of threads synced is the smaller of
    ``bridge.backfill.max_conversations`` and
    ``bridge.max_startup_thread_sync_count``: zero syncs nothing, a negative
    value means no limit.
    """
    if not self._listen_task:
        self.state.reset_pigeon_session_id()

    sleep_minutes = 2
    while True:
        try:
            resp = await self.client.get_inbox()
            break
        except IGNotLoggedInError:
            self.log.exception("Got not logged in error while syncing")
            return
        except IGRateLimitError as e:
            self.log.error(
                "Got ratelimit error while trying to get inbox (%s), retrying in %d minutes",
                e.body,
                sleep_minutes,
            )
            await self.push_bridge_state(
                BridgeStateEvent.TRANSIENT_DISCONNECT, error="ig-rate-limit"
            )
            await asyncio.sleep(sleep_minutes * 60)
            sleep_minutes += 2
        except IGCheckpointError as e:
            self.log.debug("Checkpoint error content: %s", e.body)
            raise
        except (IGChallengeError, IGConsentRequiredError) as e:
            await self._handle_checkpoint(e, on="sync")
            return

    self.seq_id = resp.seq_id
    self.snapshot_at_ms = resp.snapshot_at_ms
    await self.save_seq_id()

    if not self._listen_task:
        self.start_listen(is_after_sync=True)

    sync_count = min(
        self.config["bridge.backfill.max_conversations"],
        self.config["bridge.max_startup_thread_sync_count"],
    )
    self.log.debug(f"Fetching {sync_count} threads, 20 at a time...")
    if sync_count == 0:
        return
    # A negative count means "no limit".
    local_limit: int | None = sync_count if sync_count > 0 else None

    inbox_threads = self.client.iter_inbox(
        self._update_seq_id_and_cursor, start_at=resp, local_limit=local_limit
    )
    await self._sync_threads_with_delay(
        inbox_threads,
        stop_when_threads_have_no_messages_to_backfill=True,
        increment_total_backfilled_portals=increment_total_backfilled_portals,
        local_limit=local_limit,
    )

    try:
        await self.update_direct_chats()
    except Exception:
        self.log.exception("Error updating direct chat list")
async def backfill_threads(self):
    """Run ``_backfill_threads`` under the sync lock, logging any exception instead of raising."""
    backfill = self._backfill_threads
    try:
        await self.run_with_sync_lock(backfill)
    except Exception:
        self.log.exception("Error in thread backfill loop")
async def _backfill_threads(self):
    """Continue syncing older inbox threads, up to ``bridge.backfill.max_conversations``.

    Returns early when backfill is disabled, when the number of already
    backfilled portals has reached a non-negative limit, or when thread sync was
    already marked as completed. If an ``oldest_cursor`` is stored, iteration
    resumes from it through a placeholder inbox response with no threads.
    """
    assert self.client
    if not self.config["bridge.backfill.enable"]:
        return

    max_conversations = self.config["bridge.backfill.max_conversations"] or 0
    already_backfilled = self.total_backfilled_portals or 0
    if 0 <= max_conversations <= already_backfilled:
        self.log.info("Backfill max_conversations count reached, not syncing any more portals")
        return
    if self.thread_sync_completed:
        self.log.debug("Thread backfill is marked as completed, not syncing more portals")
        return

    # A negative max_conversations means there is no limit.
    if max_conversations >= 0:
        local_limit = max_conversations - already_backfilled
    else:
        local_limit = None

    start_at = None
    if self.oldest_cursor:
        resume_inbox = DMInbox(
            threads=[],
            has_older=True,
            unseen_count=0,
            unseen_count_ts=0,
            blended_inbox_enabled=False,
            oldest_cursor=self.oldest_cursor,
        )
        start_at = DMInboxResponse(
            status="",
            seq_id=self.seq_id,
            snapshot_at_ms=0,
            pending_requests_total=0,
            has_pending_top_requests=False,
            viewer=None,
            inbox=resume_inbox,
        )

    backoff = self.config.get("bridge.backfill.backoff.thread_list", 300)
    inbox_threads = self.client.iter_inbox(
        self._update_seq_id_and_cursor,
        start_at=start_at,
        local_limit=local_limit,
        rate_limit_exceeded_backoff=backoff,
    )
    await self._sync_threads_with_delay(
        inbox_threads,
        increment_total_backfilled_portals=True,
        local_limit=local_limit,
    )
    await self.update_direct_chats()
def _update_seq_id_and_cursor(self, seq_id: int, cursor: str | None):
    """Store ``seq_id`` unconditionally; replace ``oldest_cursor`` only with a truthy cursor."""
    self.seq_id = seq_id
    if not cursor:
        return
    self.oldest_cursor = cursor
async def _sync_threads_with_delay(
    self,
    threads: AsyncIterable[Thread],
    increment_total_backfilled_portals: bool = False,
    stop_when_threads_have_no_messages_to_backfill: bool = False,
    local_limit: int | None = None,
):
    """Sync every thread from ``threads``, spacing the syncs out.

    At least ``bridge.backfill.min_sync_thread_delay`` seconds pass between the
    starts of two consecutive thread syncs. With
    ``stop_when_threads_have_no_messages_to_backfill`` the method returns at the
    first thread that had no new messages. With
    ``increment_total_backfilled_portals`` the counter is bumped and saved after
    each synced thread.

    When the iterator is exhausted and there was no ``local_limit``, or fewer
    threads than the limit were seen, thread sync is marked as completed.
    """
    sync_delay = self.config["bridge.backfill.min_sync_thread_delay"]
    last_thread_sync_ts = 0.0
    found_thread_count = 0

    async for thread in threads:
        found_thread_count += 1

        delay = last_thread_sync_ts + sync_delay - time.monotonic()
        if delay > 0:
            self.log.debug("Thread sync is happening too quickly. Waiting for %ds", delay)
            await asyncio.sleep(delay)
        last_thread_sync_ts = time.monotonic()

        had_new_messages = await self._sync_thread(thread)
        if stop_when_threads_have_no_messages_to_backfill and not had_new_messages:
            self.log.debug("Got to threads with no new messages. Stopping sync.")
            return

        if increment_total_backfilled_portals:
            self.total_backfilled_portals = (self.total_backfilled_portals or 0) + 1
            await self.update()

    limit_reached = local_limit is not None and found_thread_count >= local_limit
    if limit_reached:
        return

    if local_limit is None:
        self.log.info(
            "Reached end of thread list with no limit, marking thread sync as completed"
        )
    else:
        self.log.info(
            f"Reached end of thread list (got {found_thread_count} with "
            f"limit {local_limit}), marking thread sync as completed"
        )
    self.thread_sync_completed = True
    await self.update()
async def run_with_sync_lock(self, func: Callable[[], Awaitable]):
    """Await ``func()`` while holding the sync lock, with up to five attempts.

    Returns as soon as an attempt succeeds or raises ``IGNotLoggedInError``. Any
    other exception is logged and followed by a 30 second wait. After the fifth
    failure an error is logged and the method returns without raising.
    """
    max_attempts = 5
    with self._sync_lock:
        for _attempt in range(max_attempts):
            try:
                await func()
                # The sync was successful. Exit the loop.
                return
            except IGNotLoggedInError:
                return
            except Exception:
                self.log.exception(
                    "Failed to sync threads. Waiting 30 seconds before retrying sync."
                )
                await asyncio.sleep(30)

        # If we get here, it means that the sync has failed five times. If this happens, most
        # likely something very bad has happened.
        self.log.error("Failed to sync threads five times. Will not retry.")
def start_listen(self, is_after_sync: bool = False) -> None:
    """Clear the shutdown flag and start ``_listen`` as a task on the bridge loop.

    The current ``seq_id`` and ``snapshot_at_ms`` are passed to ``_listen``; the
    resulting task is stored in ``_listen_task``.
    """
    self.shutdown = False
    listen_kwargs = {
        "seq_id": self.seq_id,
        "snapshot_at_ms": self.snapshot_at_ms,
        "is_after_sync": is_after_sync,
    }
    self._listen_task = self.loop.create_task(self._listen(**listen_kwargs))
async def delayed_start_listen(self, sleep: int) -> None:
    """Wait ``sleep`` seconds, then restart the listener unless already connected.

    :param sleep: Delay passed to ``asyncio.sleep`` before checking the connection.
    """
    await asyncio.sleep(sleep)
    if not self.is_connected:
        self.log.debug("Reconnecting after MQTT connection error")
        self.start_listen()
        return
    # Something else reconnected while this coroutine was sleeping.
    self.log.debug("Already reconnected before delay after MQTT reconnection error finished")
async def fetch_user_and_reconnect(self, sleep_first: int | None = None) -> None:
    """Verify the session with ``client.current_user()`` and restart the listener.

    When ``sleep_first`` is truthy the method first sleeps that many seconds
    and gives up if the connection came back in the meantime. It then calls
    ``current_user()`` in a loop:

    * proxy errors are retried after one minute, without limit;
    * ``IGUnknownError`` is retried after ten seconds, and re-raised once it
      has happened more than ten times;
    * ``IGNotLoggedInError``, checkpoint/consent errors and any other
      exception end the attempt without reconnecting;
    * on success the listener is started via ``start_listen()``.

    :param sleep_first: Optional delay in seconds before doing anything.
    """
    if sleep_first:
        await asyncio.sleep(sleep_first)
        if self.is_connected:
            self.log.debug("Canceling user fetch, already reconnected")
            return

    self.log.debug("Refetching current user after disconnection")
    unknown_failures = 0
    while True:
        try:
            current = await self.client.current_user()
        except RETRYABLE_PROXY_EXCEPTIONS as err:
            # These are retried by the client up to 10 times, but we actually want to retry
            # these indefinitely so we capture them here again and retry.
            self.log.warning(
                f"Proxy error fetching user from Instagram: {err}, retrying in 1 minute",
            )
            await asyncio.sleep(60)
            continue
        except IGNotLoggedInError as err:
            self.log.warning(f"Failed to reconnect to Instagram: {err}, logging out")
            return
        except (IGChallengeError, IGConsentRequiredError) as err:
            await self._handle_checkpoint(err, on="reconnect")
            return
        except IGUnknownError:
            unknown_failures += 1
            if unknown_failures > 10:
                raise
            self.log.warning(
                "Non-JSON body while trying to check user for reconnection, retrying in 10s"
            )
            await asyncio.sleep(10)
            continue
        except Exception as err:
            self.log.exception("Error while reconnecting to Instagram")
            if isinstance(err, IGCheckpointError):
                self.log.debug("Checkpoint error content: %s", err.body)
            await self.push_bridge_state(
                BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(err)}
            )
            return

        # The request succeeded, so the session is still valid.
        self.log.debug(f"Confirmed current user {current.user.pk}")
        self.start_listen()
        return
async def _listen(self, seq_id: int, snapshot_at_ms: int, is_after_sync: bool) -> None:
    """Run the MQTT listener and react to the way it terminates.

    ``self.mqtt.listen`` is awaited with the GraphQL and Skywalker
    subscriptions for ``self.state.user_id``. Depending on how it ends:

    * ``IrisSubscribeError``: a refresh is scheduled, unless this listener
      was itself started right after a sync, in which case a bridge notice
      is sent instead;
    * ``MQTTReconnectionError``: a notice is sent and a reconnect is
      scheduled after 60 seconds;
    * ``MQTTNotConnected`` / ``MQTTNotLoggedIn`` / ``MQTTConnectionUnauthorized``:
      a notice is sent and a user re-check plus reconnect is scheduled after
      60 seconds;
    * any other exception: same as above, but after 300 seconds;
    * clean return: a notice is sent unless ``self.shutdown`` is set.

    In every case the listen task reference, the connected flag and the
    connected metric are reset afterwards.

    :param seq_id: Sequence ID forwarded to ``mqtt.listen``.
    :param snapshot_at_ms: Snapshot timestamp forwarded to ``mqtt.listen``.
    :param is_after_sync: Whether this listener was started right after a sync.
    """
    try:
        await self.mqtt.listen(
            graphql_subs={
                GraphQLSubscription.app_presence(),
                GraphQLSubscription.direct_typing(self.state.user_id),
                GraphQLSubscription.direct_status(),
            },
            skywalker_subs={
                SkywalkerSubscription.direct_sub(self.state.user_id),
                SkywalkerSubscription.live_sub(self.state.user_id),
            },
            seq_id=seq_id,
            snapshot_at_ms=snapshot_at_ms,
        )
    except IrisSubscribeError as err:
        if not is_after_sync:
            self.log.warning(f"Got IrisSubscribeError {err}, refreshing...")
            background_task.create(self.refresh())
        else:
            # A refresh already happened, so refreshing again is not attempted.
            self.log.exception("Got IrisSubscribeError right after refresh")
            reason = str(err)
            await self.send_bridge_notice(
                f"Reconnecting failed again after refresh: {reason}",
                important=True,
                state_event=BridgeStateEvent.UNKNOWN_ERROR,
                error_code="ig-refresh-connection-error",
                error_message=reason,
                info={"python_error": reason},
            )
    except MQTTReconnectionError as err:
        self.log.warning(
            f"Unexpected connection error: {err}, reconnecting in 1 minute", exc_info=True
        )
        await self.send_bridge_notice(
            f"Error in listener: {err}",
            important=True,
            state_event=BridgeStateEvent.TRANSIENT_DISCONNECT,
            error_code="ig-connection-error-socket",
        )
        self.mqtt.disconnect()
        background_task.create(self.delayed_start_listen(sleep=60))
    except (MQTTNotConnected, MQTTNotLoggedIn, MQTTConnectionUnauthorized) as err:
        self.log.warning(f"Unexpected connection error: {err}, checking auth and reconnecting")
        await self.send_bridge_notice(
            f"Error in listener: {err}",
            important=True,
            state_event=BridgeStateEvent.TRANSIENT_DISCONNECT,
            error_code="ig-connection-error-maybe-auth",
        )
        self.mqtt.disconnect()
        background_task.create(self.fetch_user_and_reconnect(sleep_first=60))
    except Exception as err:
        self.log.exception("Fatal error in listener, reconnecting in 5 minutes")
        await self.send_bridge_notice(
            "Fatal error in listener (see logs for more info)",
            state_event=BridgeStateEvent.UNKNOWN_ERROR,
            important=True,
            error_code="ig-unknown-connection-error",
            info={"python_error": str(err)},
        )
        self.mqtt.disconnect()
        background_task.create(self.fetch_user_and_reconnect(sleep_first=300))
    else:
        # The listener returned normally; only report it when nobody asked for a shutdown.
        if not self.shutdown:
            await self.send_bridge_notice(
                "Instagram connection closed without error",
                state_event=BridgeStateEvent.UNKNOWN_ERROR,
                error_code="ig-disconnected",
            )
    finally:
        self._listen_task = None
        self._is_connected = False
        self._track_metric(METRIC_CONNECTED, False)
async def stop_listen(self) -> None:
    """Disconnect MQTT, wait for the listen task and persist the disconnected state.

    While the disconnect is in progress ``self.shutdown`` is set to ``True``;
    it is set back to ``False`` once the listen task (if any) has been
    awaited. Afterwards the connected metric and flag are cleared and
    ``self.update()`` is awaited.
    """
    mqtt = self.mqtt
    if mqtt:
        self.shutdown = True
        mqtt.disconnect()
        running = self._listen_task
        if running:
            await running
        self.shutdown = False

    self._track_metric(METRIC_CONNECTED, False)
    self._is_connected = False
    await self.update()
def stop_backfill_tasks(self) -> None:
    """Cancel the backfill loop task and the thread sync task, if present.

    Each task attribute that holds a truthy value is cancelled and then
    reset to ``None``.
    """
    for attr in ("_backfill_loop_task", "_thread_sync_task"):
        task = getattr(self, attr)
        if task:
            task.cancel()
            setattr(self, attr, None)
async def message_fail_login_check(self) -> None:
    """Re-check the login by calling ``client.current_user()``, at most every 5 minutes.

    If the previous check was less than 300 seconds ago (measured with
    ``time.monotonic()``), nothing is done apart from logging a warning.
    Otherwise the check time is recorded and ``current_user()`` is awaited:

    * ``IGNotLoggedInError`` is only logged here;
    * any other exception is logged with its traceback, and ``logout`` is
      awaited with it when it is an ``IGResponseError``;
    * success is logged together with the returned user's ``pk``.
    """
    if self._message_error_login_last_recheck + 300 > time.monotonic():
        self.log.warning(
            "Not rechecking login as it's less than 5 minutes since the last check"
        )
        return
    self._message_error_login_last_recheck = time.monotonic()
    try:
        user = await self.client.current_user()
    except IGNotLoggedInError:
        self.log.info("Got ThreadUserIdDoesNotExist and whoami failed as expected")
        # This is handled by on_response_error
    except Exception as e:
        self.log.warning(
            "Got ThreadUserIdDoesNotExist and whoami failed, but with unexpected error",
            exc_info=True,
        )
        if isinstance(e, IGResponseError):
            await self.logout(e)
    else:
        # The original message was missing its closing parenthesis.
        self.log.warning(
            f"Got ThreadUserIdDoesNotExist error, but whoami call is fine ({user.user.pk})"
        )
async def logout(self, error: IGResponseError | None = None) -> None:
    """Tear down the Instagram session and reset all session-related state.

    The listener and the backfill tasks are stopped first. Without an
    ``error`` this is treated as a regular logout: the client's ``logout`` is
    called (failures are only logged at debug level), the ``LOGGED_OUT``
    bridge state is pushed, a real-user puppet is detached from its mxid,
    and the user is removed from the ``by_igpk`` cache. With an ``error`` the
    error body is logged and a ``BAD_CREDENTIALS`` bridge notice is sent.

    In both cases the client, MQTT, state and sequence tracking attributes are
    cleared afterwards and ``self.update()`` is awaited.

    :param error: The response error that forced the logout, if any.
    """
    await self.stop_listen()
    self.stop_backfill_tasks()

    if self.client and error is None:
        try:
            await self.client.logout(one_tap_app_login=False)
        except Exception:
            self.log.debug("Exception logging out", exc_info=True)
    if self.mqtt:
        self.mqtt.disconnect()

    self._track_metric(METRIC_CONNECTED, False)
    self._track_metric(METRIC_LOGGED_IN, False)

    if error is not None:
        self.log.debug("Auth error body: %s", error.body.serialize())
        if isinstance(error, IGNotLoggedInError):
            message = error.proper_message
        else:
            message = "unknown error"
        await self.send_bridge_notice(
            f"You have been logged out of Instagram: {message}",
            important=True,
            state_event=BridgeStateEvent.BAD_CREDENTIALS,
            error_code="ig-auth-error",
            error_message=message,
            info={"cnd_action": "reauth"},
        )
    else:
        await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
        puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
        if puppet and puppet.is_real_user:
            await puppet.switch_mxid(None, None)
        # Drop the cache entry; it is fine if there was none.
        self.by_igpk.pop(self.igpk, None)
        self.igpk = None

    self.client = None
    self.mqtt = None
    self.state = None
    self.seq_id = None
    pending_save = self._seq_id_save_task
    if pending_save and not pending_save.done():
        pending_save.cancel()
        self._seq_id_save_task = None
    self.snapshot_at_ms = None
    self.thread_sync_completed = False
    self._is_logged_in = False
    await self.update()
- # endregion
- # region Event handlers
async def _save_seq_id_after_sleep(self) -> None:
    """Sleep 120 seconds, then save the current sequence ID.

    Nothing is saved when ``self.seq_id`` is ``None`` after the sleep.
    Otherwise the save task reference is cleared and ``save_seq_id()`` is
    awaited; a failure there is logged and not propagated.
    """
    await asyncio.sleep(120)
    if self.seq_id is not None:
        self._seq_id_save_task = None
        self.log.trace("Saving sequence ID %d/%d", self.seq_id, self.snapshot_at_ms)
        try:
            await self.save_seq_id()
        except Exception:
            self.log.exception("Error saving sequence ID")
async def update_seq_id(self, evt: NewSequenceID) -> None:
    """Record the new sequence ID and make sure a delayed save task is pending.

    ``seq_id`` and ``snapshot_at_ms`` are copied from ``evt``. A new
    ``_save_seq_id_after_sleep`` task is created only when there is no save
    task yet or the previous one has finished.

    :param evt: Event carrying the new ``seq_id`` and ``snapshot_at_ms``.
    """
    self.seq_id = evt.seq_id
    self.snapshot_at_ms = evt.snapshot_at_ms

    pending = self._seq_id_save_task
    if pending and not pending.done():
        # A save is already scheduled and will pick up the values stored above.
        self.log.trace("Not starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
        return

    self.log.trace("Starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms)
    self._seq_id_save_task = asyncio.create_task(self._save_seq_id_after_sleep())
@async_time(METRIC_MESSAGE)
async def handle_message(self, evt: MessageSyncEvent) -> None:
    """Dispatch a message sync event to the portal of its thread.

    If no portal with a Matrix room exists for the thread, the thread is
    fetched and synced first; the event is dropped when there is still no
    room afterwards. Reactions, thread image changes and add / remove /
    replace operations are then forwarded to the matching portal handler.
    For added messages a delivery receipt is sent in the background.

    :param evt: The message sync event to handle.
    """
    message = evt.message
    thread_id = message.thread_id

    portal = await po.Portal.get_by_thread_id(thread_id, receiver=self.igpk)
    if not (portal and portal.mxid):
        self.log.debug(
            "Got message in thread with no portal, getting info and syncing thread..."
        )
        resp = await self.client.get_thread(thread_id)
        portal = await po.Portal.get_by_thread(resp.thread, self.igpk)
        await self._sync_thread(resp.thread, enqueue_backfill=False, portal=portal)
        if not portal.mxid:
            self.log.warning(
                f"Room creation appears to have failed, dropping message in {thread_id}"
            )
            return

    self.log.trace(f"Received message sync event {message}")

    if message.new_reaction:
        is_removal = message.op == Operation.REMOVE
        await portal.handle_instagram_reaction(message, remove=is_removal)
        return

    sender = None
    if message.user_id:
        sender = await pu.Puppet.get_by_pk(message.user_id)

    if message.is_thread_image:
        await portal.update_thread_image(self, message.thread_image, sender=sender)
        return

    operation = message.op
    if operation == Operation.ADD:
        if not sender:
            # I don't think we care about adds with no sender
            return
        receipt = self.client.send_delivery_receipt(portal.thread_id, sender.pk, message.item_id)
        background_task.create(receipt)
        await portal.handle_instagram_item(self, sender, message)
    elif operation == Operation.REMOVE:
        # Removes don't have a sender, only the message sender can unsend messages anyway
        await portal.handle_instagram_remove(message.item_id)
    elif operation == Operation.REPLACE:
        await portal.handle_instagram_update(message)
@async_time(METRIC_THREAD_SYNC)
async def handle_thread_sync(self, evt: ThreadSyncEvent) -> None:
    """Sync a thread in response to a thread sync event.

    Threads that already have a Matrix room, and group threads without one,
    are passed to ``_sync_thread``. A DM without an existing room is ignored.

    :param evt: The thread sync event to handle.
    """
    self.log.trace("Thread sync event content: %s", evt)
    portal = await po.Portal.get_by_thread(evt, receiver=self.igpk)

    if not portal.mxid and not evt.is_group:
        self.log.debug(
            "Got thread sync event for DM %s without existing portal, ignoring",
            portal.thread_id,
        )
        return

    if portal.mxid:
        self.log.debug("Got thread sync event for %s with existing portal", portal.thread_id)
    else:
        self.log.debug(
            "Got thread sync event for group %s without existing portal, creating room",
            portal.thread_id,
        )
    await self._sync_thread(evt, enqueue_backfill=False, portal=portal)
async def handle_thread_remove(self, evt: ThreadRemoveEvent) -> None:
    """Log a thread remove event; no other action is taken.

    :param evt: The thread remove event, logged in serialized form.
    """
    serialized = evt.serialize()
    self.log.debug("Got thread remove event: %s", serialized)
@async_time(METRIC_RTD)
async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
    """Bridge an activity indicator (typing) event to Matrix.

    Events whose value is not ``ActivityIndicatorData`` are ignored, as are
    indicators that have already expired (``timestamp_ms + ttl`` is in the
    past) or whose ID has been handled before. When both the sender's puppet
    and a portal with a Matrix room exist, the typing state is forwarded to
    the room. For the user's own puppet, ``remote_typing_status`` is updated
    as well.

    :param evt: The realtime direct event to handle.
    """
    if not isinstance(evt.value, ActivityIndicatorData):
        return

    now = int(time.time() * 1000)
    expiry = evt.value.timestamp_ms + evt.value.ttl
    if expiry < now:
        return

    # Drop bookkeeping for indicators that have expired, so the dedup dict
    # does not grow without bound. An expired indicator would be rejected by
    # the expiry check above anyway, so forgetting it cannot cause a replay.
    expired_ids = [
        indicator_id
        for indicator_id, expires_at in self._activity_indicator_ids.items()
        if expires_at < now
    ]
    for indicator_id in expired_ids:
        del self._activity_indicator_ids[indicator_id]

    if evt.activity_indicator_id in self._activity_indicator_ids:
        return
    self._activity_indicator_ids[evt.activity_indicator_id] = expiry

    puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
    portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
    if not puppet or not portal or not portal.mxid:
        return

    is_typing = evt.value.activity_status != TypingStatus.OFF
    if puppet.pk == self.igpk:
        self.remote_typing_status = TypingStatus.TEXT if is_typing else TypingStatus.OFF
    # NOTE(review): relies on set_typing treating timeout=0 as "stop typing";
    # passing the TTL for an OFF event would start a typing notification instead.
    await puppet.intent_for(portal).set_typing(
        portal.mxid, timeout=evt.value.ttl if is_typing else 0
    )
- # endregion
- # region Database getters
def _add_to_cache(self) -> None:
    """Register this user in ``by_mxid`` and, when ``igpk`` is set, in ``by_igpk``."""
    self.by_mxid[self.mxid] = self
    igpk = self.igpk
    if igpk:
        self.by_igpk[igpk] = self
@classmethod
@async_getter_lock
async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
    """Look up a user by Matrix ID, optionally creating it.

    Puppet (ghost) IDs never resolve to a user. The ``by_mxid`` cache is
    checked first, then the parent class lookup. When nothing is found and
    ``create`` is true, a new user is inserted. Every user returned from the
    lookup or created here is added to the cache.

    :param mxid: The Matrix user ID to look up.
    :param create: Whether to insert a new user when none exists.
    :return: The user, or ``None`` for ghosts and for misses with ``create=False``.
    """
    # Never allow ghosts to be users
    if pu.Puppet.get_id_from_mxid(mxid):
        return None

    if mxid in cls.by_mxid:
        return cls.by_mxid[mxid]

    user = cast(cls, await super().get_by_mxid(mxid))
    if user is None:
        if not create:
            return None
        user = cls(mxid)
        await user.insert()

    user._add_to_cache()
    return user
@classmethod
@async_getter_lock
async def get_by_igpk(cls, igpk: int) -> User | None:
    """Look up a user by Instagram primary key.

    The ``by_igpk`` cache is checked first, then the parent class lookup; a
    user found there is added to the cache before being returned.

    :param igpk: The Instagram primary key to look up.
    :return: The matching user, or ``None`` when there is none.
    """
    if igpk in cls.by_igpk:
        return cls.by_igpk[igpk]

    user = cast(cls, await super().get_by_igpk(igpk))
    if user is None:
        return None

    user._add_to_cache()
    return user
@classmethod
async def all_logged_in(cls) -> AsyncGenerator[User, None]:
    """Yield every user returned by the parent class's ``all_logged_in``.

    For each user, the instance already present in the ``by_mxid`` cache is
    yielded when there is one; otherwise the freshly loaded instance is added
    to the cache and yielded.
    """
    users = await super().all_logged_in()
    user: cls
    for user in users:
        # Only the cache lookup is guarded, so a KeyError raised elsewhere
        # cannot be mistaken for a cache miss.
        try:
            cached = cls.by_mxid[user.mxid]
        except KeyError:
            user._add_to_cache()
            cached = user
        yield cached
- # endregion
|