# mautrix-instagram - A Matrix-Instagram puppeting bridge. # Copyright (C) 2023 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see https://www.gnu.org/licenses/. from __future__ import annotations from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, Callable, cast from datetime import datetime, timedelta from functools import partial import asyncio import logging import time from mauigpapi import AndroidAPI, AndroidMQTT, AndroidState from mauigpapi.errors import ( IGChallengeError, IGCheckpointError, IGConsentRequiredError, IGNotLoggedInError, IGRateLimitError, IGResponseError, IGUnknownError, IGUserIDNotFoundError, IrisSubscribeError, MQTTConnectionUnauthorized, MQTTNotConnected, MQTTNotLoggedIn, MQTTReconnectionError, ) from mauigpapi.mqtt import ( Connect, Disconnect, GraphQLSubscription, NewSequenceID, ProxyUpdate, SkywalkerSubscription, ) from mauigpapi.types import ( ActivityIndicatorData, CurrentUser, MessageSyncEvent, Operation, RealtimeDirectEvent, Thread, ThreadRemoveEvent, ThreadSyncEvent, TypingStatus, ) from mauigpapi.types.direct_inbox import DMInbox, DMInboxResponse from mautrix.appservice import AppService from mautrix.bridge import BaseUser, async_getter_lock from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID from mautrix.util import background_task from mautrix.util.bridge_state import BridgeState, BridgeStateEvent from mautrix.util.logging 
import TraceLogger from mautrix.util.opt_prometheus import Gauge, Summary, async_time from mautrix.util.proxy import RETRYABLE_PROXY_EXCEPTIONS, ProxyHandler from mautrix.util.simple_lock import SimpleLock from . import portal as po, puppet as pu from .config import Config from .db import Backfill, Message as DBMessage, Portal as DBPortal, User as DBUser if TYPE_CHECKING: from .__main__ import InstagramBridge METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message") METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync") METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd") METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge") METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram") BridgeState.human_readable_errors.update( { "ig-connection-error": "Instagram disconnected unexpectedly", "ig-refresh-connection-error": "Reconnecting failed again after refresh: {message}", "ig-connection-fatal-error": "Instagram disconnected unexpectedly", "ig-auth-error": "Authentication error from Instagram: {message}, please login again to continue", "ig-checkpoint": "Instagram checkpoint error. Please check the Instagram website.", "ig-consent-required": "Instagram requires a consent update. Please check the Instagram website.", "ig-checkpoint-locked": "Instagram checkpoint error. 
Please check the Instagram website.", "ig-rate-limit": "Got Instagram ratelimit error, waiting a few minutes before retrying...", "ig-disconnected": None, "logged-out": "You've been logged out of instagram, please login again to continue", } ) class User(DBUser, BaseUser): ig_base_log: TraceLogger = logging.getLogger("mau.instagram") _activity_indicator_ids: dict[str, int] = {} by_mxid: dict[UserID, User] = {} by_igpk: dict[int, User] = {} config: Config az: AppService loop: asyncio.AbstractEventLoop client: AndroidAPI | None mqtt: AndroidMQTT | None _listen_task: asyncio.Task | None = None _sync_lock: SimpleLock _backfill_loop_task: asyncio.Task | None _thread_sync_task: asyncio.Task | None _seq_id_save_task: asyncio.Task | None _message_error_login_last_recheck: float permission_level: str username: str | None _notice_room_lock: asyncio.Lock _notice_send_lock: asyncio.Lock _is_logged_in: bool _is_connected: bool shutdown: bool remote_typing_status: TypingStatus | None def __init__( self, mxid: UserID, igpk: int | None = None, state: AndroidState | None = None, notice_room: RoomID | None = None, seq_id: int | None = None, snapshot_at_ms: int | None = None, oldest_cursor: str | None = None, total_backfilled_portals: int | None = None, thread_sync_completed: bool = False, ) -> None: super().__init__( mxid=mxid, igpk=igpk, state=state, notice_room=notice_room, seq_id=seq_id, snapshot_at_ms=snapshot_at_ms, oldest_cursor=oldest_cursor, total_backfilled_portals=total_backfilled_portals, thread_sync_completed=thread_sync_completed, ) BaseUser.__init__(self) self._notice_room_lock = asyncio.Lock() self._notice_send_lock = asyncio.Lock() self.command_status = None perms = self.config.get_permissions(mxid) self.relay_whitelisted, self.is_whitelisted, self.is_admin, self.permission_level = perms self.client = None self.mqtt = None self.username = None self._message_error_login_last_recheck = 0 self._is_logged_in = False self._is_connected = False self._is_refreshing = False 
self.shutdown = False self._sync_lock = SimpleLock( "Waiting for thread sync to finish before handling %s", log=self.log ) self._listen_task = None self._thread_sync_task = None self._backfill_loop_task = None self.remote_typing_status = None self._seq_id_save_task = None self._message_error_login_last_recheck = 0 self.proxy_handler = ProxyHandler( api_url=self.config["bridge.get_proxy_api_url"], ) @classmethod def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]: cls.bridge = bridge cls.config = bridge.config cls.az = bridge.az cls.loop = bridge.loop return (user.try_connect() async for user in cls.all_logged_in()) # region Connection management async def is_logged_in(self) -> bool: return bool(self.client) and self._is_logged_in async def get_puppet(self) -> pu.Puppet | None: if not self.igpk: return None return await pu.Puppet.get_by_pk(self.igpk) async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None: if not self.igpk: return None portal = await po.Portal.find_private_chat(self.igpk, puppet.pk) if portal: return portal if create: # TODO add error handling somewhere thread = await self.client.create_group_thread([puppet.pk]) portal = await po.Portal.get_by_thread(thread, self.igpk) await portal.update_info(thread, self) return portal return None async def try_connect(self) -> None: while True: try: await self.connect() except RETRYABLE_PROXY_EXCEPTIONS as e: # These are retried by the client up to 10 times, but we actually want to retry # these indefinitely so we capture them here again and retry. 
self.log.warning( f"Proxy error connecting to Instagram: {e}, retrying in 1 minute", ) await asyncio.sleep(60) continue except Exception as e: self.log.exception("Error while connecting to Instagram") await self.push_bridge_state( BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)} ) return @property def api_log(self) -> TraceLogger: return self.ig_base_log.getChild("http").getChild(self.mxid) @property def is_connected(self) -> bool: return bool(self.client) and bool(self.mqtt) and self._is_connected async def connect(self, user: CurrentUser | None = None) -> None: if not self.state: await self.push_bridge_state( BridgeStateEvent.BAD_CREDENTIALS, error="logged-out", info={"cnd_action": "reauth"}, ) return client = AndroidAPI( self.state, log=self.api_log, proxy_handler=self.proxy_handler, on_proxy_update=self.on_proxy_update, on_response_error=self.on_response_error, ) if not user: try: resp = await client.current_user() user = resp.user except IGNotLoggedInError as e: self.log.warning(f"Failed to connect to Instagram: {e}, logging out") await self.logout(error=e) return except IGCheckpointError as e: self.log.debug("Checkpoint error content: %s", e.body) raise except (IGChallengeError, IGConsentRequiredError) as e: await self._handle_checkpoint(e, on="connect", client=client) return self.client = client self._is_logged_in = True self.igpk = user.pk self.username = user.username self._message_error_login_last_recheck = 0 await self.push_bridge_state(BridgeStateEvent.CONNECTING) self._track_metric(METRIC_LOGGED_IN, True) self.by_igpk[self.igpk] = self self.mqtt = AndroidMQTT( self.state, log=self.ig_base_log.getChild("mqtt").getChild(self.mxid), proxy_handler=self.proxy_handler, mqtt_keepalive=self.config["instagram.mqtt_keepalive"], ) self.mqtt.add_event_handler(Connect, self.on_connect) self.mqtt.add_event_handler(Disconnect, self.on_disconnect) self.mqtt.add_event_handler(NewSequenceID, self.update_seq_id) self.mqtt.add_event_handler(MessageSyncEvent, 
self.handle_message) self.mqtt.add_event_handler(ThreadSyncEvent, self.handle_thread_sync) self.mqtt.add_event_handler(ThreadRemoveEvent, self.handle_thread_remove) self.mqtt.add_event_handler(RealtimeDirectEvent, self.handle_rtd) self.mqtt.add_event_handler(ProxyUpdate, self.on_proxy_update) await self.update() self.loop.create_task(self._try_sync_puppet(user)) self.loop.create_task(self._post_connect()) async def _post_connect(self): # Backfill requests are handled synchronously so as not to overload the homeserver. # Users can configure their backfill stages to be more or less aggressive with backfilling # to try and avoid getting banned. if not self._backfill_loop_task or self._backfill_loop_task.done(): self._backfill_loop_task = asyncio.create_task(self._handle_backfill_requests_loop()) if not self.seq_id: await self._try_sync() else: self.log.debug("Connecting to MQTT directly as resync_on_startup is false") self.start_listen() if self.config["bridge.backfill.enable"]: if self._thread_sync_task and not self._thread_sync_task.done(): self.log.warning("Cancelling existing background thread sync task") self._thread_sync_task.cancel() self._thread_sync_task = asyncio.create_task(self.backfill_threads()) if self.bridge.homeserver_software.is_hungry: self.log.info("Updating contact info for all users") asyncio.gather(*[puppet.update_contact_info() async for puppet in pu.Puppet.get_all()]) async def _handle_backfill_requests_loop(self) -> None: if not self.config["bridge.backfill.enable"] or not self.config["bridge.backfill.msc2716"]: return while True: await self._sync_lock.wait("backfill request") req = await Backfill.get_next(self.mxid) if not req: await asyncio.sleep(30) continue self.log.info("Backfill request %s", req) try: portal = await po.Portal.get_by_thread_id( req.portal_thread_id, receiver=req.portal_receiver ) await req.mark_dispatched() await portal.backfill(self, req) await req.mark_done() except IGNotLoggedInError as e: self.log.exception("User got 
logged out during backfill loop") break except (IGChallengeError, IGConsentRequiredError) as e: self.log.exception("User got a challenge during backfill loop") await self._handle_checkpoint(e, on="backfill") break except Exception as e: self.log.exception("Failed to backfill portal %s: %s", req.portal_thread_id, e) # Don't try again to backfill this portal for a minute. await req.set_cooldown_timeout(60) self._backfill_loop_task = None async def on_connect(self, evt: Connect) -> None: self.log.debug("Connected to Instagram") self._track_metric(METRIC_CONNECTED, True) self._is_connected = True await self.send_bridge_notice("Connected to Instagram") await self.push_bridge_state(BridgeStateEvent.CONNECTED) async def on_disconnect(self, evt: Disconnect) -> None: self.log.debug("Disconnected from Instagram") self._track_metric(METRIC_CONNECTED, False) self._is_connected = False async def on_proxy_update(self, evt: ProxyUpdate | None = None) -> None: if self.client: self.client.setup_http(self.state.cookies.jar) if self.mqtt: self.mqtt.setup_proxy() if self.command_status: self.command_status["api"].setup_http(self.command_status["state"].cookies.jar) async def on_response_error(self, err: IGResponseError) -> None: if isinstance(err, IGNotLoggedInError) and (await self.is_logged_in()): self.log.warning(f"Noticed logout in API error response: {err}") await self.logout(error=err) # TODO this stuff could probably be moved to mautrix-python async def get_notice_room(self) -> RoomID: if not self.notice_room: async with self._notice_room_lock: # If someone already created the room while this call was waiting, # don't make a new room if self.notice_room: return self.notice_room creation_content = {} if not self.config["bridge.federate_rooms"]: creation_content["m.federate"] = False self.notice_room = await self.az.intent.create_room( is_direct=True, invitees=[self.mxid], topic="Instagram bridge notices", creation_content=creation_content, ) await self.update() return 
self.notice_room async def fill_bridge_state(self, state: BridgeState) -> None: await super().fill_bridge_state(state) if not state.remote_id: if self.igpk: state.remote_id = str(self.igpk) else: try: state.remote_id = self.state.user_id except IGUserIDNotFoundError: state.remote_id = None if self.username: state.remote_name = f"@{self.username}" async def get_bridge_states(self) -> list[BridgeState]: if not self.state: return [] state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR) if self.is_connected: state.state_event = BridgeStateEvent.CONNECTED elif self._is_refreshing or self.mqtt: state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT return [state] async def send_bridge_notice( self, text: str, edit: EventID | None = None, state_event: BridgeStateEvent | None = None, important: bool = False, error_code: str | None = None, error_message: str | None = None, info: dict | None = None, ) -> EventID | None: if state_event: await self.push_bridge_state( state_event, error=error_code, message=error_message if error_code else text, info=info, ) if self.config["bridge.disable_bridge_notices"]: return None if not important and not self.config["bridge.unimportant_bridge_notices"]: self.log.debug("Not sending unimportant bridge notice: %s", text) return None event_id = None try: self.log.debug("Sending bridge notice: %s", text) content = TextMessageEventContent( body=text, msgtype=(MessageType.TEXT if important else MessageType.NOTICE) ) if edit: content.set_edit(edit) # This is locked to prevent notices going out in the wrong order async with self._notice_send_lock: event_id = await self.az.intent.send_message(await self.get_notice_room(), content) except Exception: self.log.warning("Failed to send bridge notice", exc_info=True) return edit or event_id async def _try_sync_puppet(self, user_info: CurrentUser) -> None: puppet = await pu.Puppet.get_by_pk(self.igpk) try: if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid): 
self.log.info("Automatically enabling custom puppet") await puppet.switch_mxid(access_token="auto", mxid=self.mxid) except Exception: self.log.exception("Failed to automatically enable custom puppet") try: await puppet.update_info(user_info, self) except Exception: self.log.exception("Failed to update own puppet info") async def _try_sync(self) -> None: try: await self.sync() except Exception as e: self.log.exception("Exception while syncing") if isinstance(e, IGCheckpointError): self.log.debug("Checkpoint error content: %s", e.body) await self.push_bridge_state( BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)} ) async def get_direct_chats(self) -> dict[UserID, list[RoomID]]: return { pu.Puppet.get_mxid_from_id(portal.other_user_pk): [portal.mxid] for portal in await DBPortal.find_private_chats_of(self.igpk) if portal.mxid } async def refresh(self, resync: bool = True, update_proxy: bool = False) -> None: self._is_refreshing = True try: await self.stop_listen() self.state.reset_pigeon_session_id() if update_proxy and self.proxy_handler.update_proxy_url(reason="reconnect"): await self.on_proxy_update() if resync: retry_count = 0 minutes = 1 while True: try: await self.sync() return except Exception as e: if retry_count >= 4 and minutes < 10: minutes += 1 retry_count += 1 s = "s" if minutes != 1 else "" self.log.exception( f"Error while syncing for refresh, retrying in {minutes} minute{s}" ) if isinstance(e, IGCheckpointError): self.log.debug("Checkpoint error content: %s", e.body) await self.push_bridge_state( BridgeStateEvent.UNKNOWN_ERROR, error="unknown-error", message="An unknown error occurred while connecting to Instagram", info={"python_error": str(e)}, ) await asyncio.sleep(minutes * 60) else: self.start_listen() finally: self._is_refreshing = False async def _handle_checkpoint( self, e: IGChallengeError | IGConsentRequiredError, on: str, client: AndroidAPI | None = None, ) -> None: self.log.warning(f"Got checkpoint error on {on}: 
{e.body.serialize()}") client = client or self.client self.client = None self.mqtt = None if isinstance(e, IGConsentRequiredError): await self.push_bridge_state( BridgeStateEvent.BAD_CREDENTIALS, error="ig-consent-required", info=e.body.serialize(), ) return error_code = "ig-checkpoint" try: resp = await client.challenge_reset() info = { "challenge_context": ( resp.challenge_context.serialize() if resp.challenge_context_str else None ), "step_name": resp.step_name, "step_data": resp.step_data.serialize() if resp.step_data else None, "user_id": resp.user_id, "action": resp.action, "status": resp.status, "challenge": e.body.challenge.serialize() if e.body.challenge else None, } self.log.debug(f"Challenge state: {resp.serialize()}") if resp.challenge_context.challenge_type_enum == "HACKED_LOCK": error_code = "ig-checkpoint-locked" except Exception: self.log.exception("Error resetting challenge state") info = {"challenge": e.body.challenge.serialize() if e.body.challenge else None} await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error=error_code, info=info) async def _sync_thread( self, thread: Thread, enqueue_backfill: bool = True, portal: po.Portal | None = None ) -> bool: """ Sync a specific thread. Returns whether the thread had messages after the last message in the database before the sync. 
""" self.log.debug(f"Syncing thread {thread.thread_id}") forward_messages = thread.items assert self.client if not portal: portal = await po.Portal.get_by_thread(thread, self.igpk) assert portal else: assert portal.thread_id == thread.thread_id # Create or update the Matrix room if not portal.mxid: await portal.create_matrix_room(self, thread) else: await portal.update_matrix_room(self, thread) if not self.config["bridge.backfill.enable_initial"]: return True last_message = await DBMessage.get_last(portal.mxid) cursor = thread.oldest_cursor if last_message: original_number_of_messages = len(thread.items) new_messages = [ m for m in thread.items if last_message.ig_timestamp_ms < m.timestamp_ms ] forward_messages = new_messages portal.log.debug( f"{len(new_messages)}/{original_number_of_messages} messages are after most recent" " message." ) # Fetch more messages until we get back to messages that have been bridged already. while len(new_messages) > 0 and len(new_messages) == original_number_of_messages: await asyncio.sleep(self.config["bridge.backfill.incremental.page_delay"]) portal.log.debug("Fetching more messages for forward backfill") resp = await self.client.get_thread(portal.thread_id, cursor=cursor) if len(resp.thread.items) == 0: break original_number_of_messages = len(resp.thread.items) new_messages = [ m for m in resp.thread.items if last_message.ig_timestamp_ms < m.timestamp_ms ] forward_messages = new_messages + forward_messages cursor = resp.thread.oldest_cursor portal.log.debug( f"{len(new_messages)}/{original_number_of_messages} messages are after most " "recent message." 
) elif not portal.first_event_id: self.log.debug( f"Skipping backfilling {portal.thread_id} as the first event ID is not known" ) return False if forward_messages: portal.cursor = cursor await portal.update() mark_read = thread.read_state == 0 or ( (hours := self.config["bridge.backfill.unread_hours_threshold"]) > 0 and ( datetime.fromtimestamp(forward_messages[0].timestamp_ms / 1000) < datetime.now() - timedelta(hours=hours) ) ) base_insertion_event_id = await portal.backfill_message_page( self, list(reversed(forward_messages)), forward=True, last_message=last_message, mark_read=mark_read, ) if ( not self.bridge.homeserver_software.is_hungry and self.config["bridge.backfill.msc2716"] ): await portal.send_post_backfill_dummy( forward_messages[0].timestamp, base_insertion_event_id=base_insertion_event_id ) if ( mark_read and not self.bridge.homeserver_software.is_hungry and (puppet := await self.get_puppet()) ): last_message = await DBMessage.get_last(portal.mxid) if last_message: await puppet.intent_for(portal).mark_read(portal.mxid, last_message.mxid) await portal._update_read_receipts(thread.last_seen_at) if self.config["bridge.backfill.msc2716"] and enqueue_backfill: await portal.enqueue_immediate_backfill(self, 1) return len(forward_messages) > 0 async def sync(self, increment_total_backfilled_portals: bool = False) -> None: await self.run_with_sync_lock(partial(self._sync, increment_total_backfilled_portals)) async def _sync(self, increment_total_backfilled_portals: bool = False) -> None: if not self._listen_task: self.state.reset_pigeon_session_id() sleep_minutes = 2 while True: try: resp = await self.client.get_inbox() break except IGNotLoggedInError as e: self.log.exception("Got not logged in error while syncing") return except IGRateLimitError as e: self.log.error( "Got ratelimit error while trying to get inbox (%s), retrying in %d minutes", e.body, sleep_minutes, ) await self.push_bridge_state( BridgeStateEvent.TRANSIENT_DISCONNECT, error="ig-rate-limit" 
) await asyncio.sleep(sleep_minutes * 60) sleep_minutes += 2 except IGCheckpointError as e: self.log.debug("Checkpoint error content: %s", e.body) raise except (IGChallengeError, IGConsentRequiredError) as e: await self._handle_checkpoint(e, on="sync") return self.seq_id = resp.seq_id self.snapshot_at_ms = resp.snapshot_at_ms await self.save_seq_id() if not self._listen_task: self.start_listen(is_after_sync=True) sync_count = min( self.config["bridge.backfill.max_conversations"], self.config["bridge.max_startup_thread_sync_count"], ) self.log.debug(f"Fetching {sync_count} threads, 20 at a time...") local_limit: int | None = sync_count if sync_count == 0: return elif sync_count < 0: local_limit = None await self._sync_threads_with_delay( self.client.iter_inbox( self._update_seq_id_and_cursor, start_at=resp, local_limit=local_limit ), stop_when_threads_have_no_messages_to_backfill=True, increment_total_backfilled_portals=increment_total_backfilled_portals, local_limit=local_limit, ) try: await self.update_direct_chats() except Exception: self.log.exception("Error updating direct chat list") async def backfill_threads(self): try: await self.run_with_sync_lock(self._backfill_threads) except Exception: self.log.exception("Error in thread backfill loop") async def _backfill_threads(self): assert self.client if not self.config["bridge.backfill.enable"]: return max_conversations = self.config["bridge.backfill.max_conversations"] or 0 if 0 <= max_conversations <= (self.total_backfilled_portals or 0): self.log.info("Backfill max_conversations count reached, not syncing any more portals") return elif self.thread_sync_completed: self.log.debug("Thread backfill is marked as completed, not syncing more portals") return local_limit = ( max_conversations - (self.total_backfilled_portals or 0) if max_conversations >= 0 else None ) start_at = None if self.oldest_cursor: start_at = DMInboxResponse( status="", seq_id=self.seq_id, snapshot_at_ms=0, pending_requests_total=0, 
has_pending_top_requests=False, viewer=None, inbox=DMInbox( threads=[], has_older=True, unseen_count=0, unseen_count_ts=0, blended_inbox_enabled=False, oldest_cursor=self.oldest_cursor, ), ) backoff = self.config.get("bridge.backfill.backoff.thread_list", 300) await self._sync_threads_with_delay( self.client.iter_inbox( self._update_seq_id_and_cursor, start_at=start_at, local_limit=local_limit, rate_limit_exceeded_backoff=backoff, ), increment_total_backfilled_portals=True, local_limit=local_limit, ) await self.update_direct_chats() def _update_seq_id_and_cursor(self, seq_id: int, cursor: str | None): self.seq_id = seq_id if cursor: self.oldest_cursor = cursor async def _sync_threads_with_delay( self, threads: AsyncIterable[Thread], increment_total_backfilled_portals: bool = False, stop_when_threads_have_no_messages_to_backfill: bool = False, local_limit: int | None = None, ): sync_delay = self.config["bridge.backfill.min_sync_thread_delay"] last_thread_sync_ts = 0.0 found_thread_count = 0 async for thread in threads: found_thread_count += 1 now = time.monotonic() if now < last_thread_sync_ts + sync_delay: delay = last_thread_sync_ts + sync_delay - now self.log.debug("Thread sync is happening too quickly. Waiting for %ds", delay) await asyncio.sleep(delay) last_thread_sync_ts = time.monotonic() had_new_messages = await self._sync_thread(thread) if not had_new_messages and stop_when_threads_have_no_messages_to_backfill: self.log.debug("Got to threads with no new messages. 
Stopping sync.") return if increment_total_backfilled_portals: self.total_backfilled_portals = (self.total_backfilled_portals or 0) + 1 await self.update() if local_limit is None or found_thread_count < local_limit: if local_limit is None: self.log.info( "Reached end of thread list with no limit, marking thread sync as completed" ) else: self.log.info( f"Reached end of thread list (got {found_thread_count} with " f"limit {local_limit}), marking thread sync as completed" ) self.thread_sync_completed = True await self.update() async def run_with_sync_lock(self, func: Callable[[], Awaitable]): with self._sync_lock: retry_count = 0 while retry_count < 5: try: retry_count += 1 await func() # The sync was successful. Exit the loop. return except IGNotLoggedInError as e: return except Exception: self.log.exception( "Failed to sync threads. Waiting 30 seconds before retrying sync." ) await asyncio.sleep(30) # If we get here, it means that the sync has failed five times. If this happens, most # likely something very bad has happened. self.log.error("Failed to sync threads five times. 
Will not retry.") def start_listen(self, is_after_sync: bool = False) -> None: self.shutdown = False task = self._listen( seq_id=self.seq_id, snapshot_at_ms=self.snapshot_at_ms, is_after_sync=is_after_sync ) self._listen_task = self.loop.create_task(task) async def delayed_start_listen(self, sleep: int) -> None: await asyncio.sleep(sleep) if self.is_connected: self.log.debug( "Already reconnected before delay after MQTT reconnection error finished" ) else: self.log.debug("Reconnecting after MQTT connection error") self.start_listen() async def fetch_user_and_reconnect(self, sleep_first: int | None = None) -> None: if sleep_first: await asyncio.sleep(sleep_first) if self.is_connected: self.log.debug("Canceling user fetch, already reconnected") return self.log.debug("Refetching current user after disconnection") errors = 0 while True: try: resp = await self.client.current_user() except RETRYABLE_PROXY_EXCEPTIONS as e: # These are retried by the client up to 10 times, but we actually want to retry # these indefinitely so we capture them here again and retry. 
self.log.warning( f"Proxy error fetching user from Instagram: {e}, retrying in 1 minute", ) await asyncio.sleep(60) except IGNotLoggedInError as e: self.log.warning(f"Failed to reconnect to Instagram: {e}, logging out") return except (IGChallengeError, IGConsentRequiredError) as e: await self._handle_checkpoint(e, on="reconnect") return except IGUnknownError: errors += 1 if errors > 10: raise self.log.warning( "Non-JSON body while trying to check user for reconnection, retrying in 10s" ) await asyncio.sleep(10) except Exception as e: self.log.exception("Error while reconnecting to Instagram") if isinstance(e, IGCheckpointError): self.log.debug("Checkpoint error content: %s", e.body) await self.push_bridge_state( BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)} ) return else: self.log.debug(f"Confirmed current user {resp.user.pk}") self.start_listen() return async def _listen(self, seq_id: int, snapshot_at_ms: int, is_after_sync: bool) -> None: try: await self.mqtt.listen( graphql_subs={ GraphQLSubscription.app_presence(), GraphQLSubscription.direct_typing(self.state.user_id), GraphQLSubscription.direct_status(), }, skywalker_subs={ SkywalkerSubscription.direct_sub(self.state.user_id), SkywalkerSubscription.live_sub(self.state.user_id), }, seq_id=seq_id, snapshot_at_ms=snapshot_at_ms, ) except IrisSubscribeError as e: if is_after_sync: self.log.exception("Got IrisSubscribeError right after refresh") await self.send_bridge_notice( f"Reconnecting failed again after refresh: {e}", important=True, state_event=BridgeStateEvent.UNKNOWN_ERROR, error_code="ig-refresh-connection-error", error_message=str(e), info={"python_error": str(e)}, ) else: self.log.warning(f"Got IrisSubscribeError {e}, refreshing...") background_task.create(self.refresh()) except MQTTReconnectionError as e: self.log.warning( f"Unexpected connection error: {e}, reconnecting in 1 minute", exc_info=True ) await self.send_bridge_notice( f"Error in listener: {e}", important=True, 
state_event=BridgeStateEvent.TRANSIENT_DISCONNECT, error_code="ig-connection-error-socket", ) self.mqtt.disconnect() background_task.create(self.delayed_start_listen(sleep=60)) except (MQTTNotConnected, MQTTNotLoggedIn, MQTTConnectionUnauthorized) as e: self.log.warning(f"Unexpected connection error: {e}, checking auth and reconnecting") await self.send_bridge_notice( f"Error in listener: {e}", important=True, state_event=BridgeStateEvent.TRANSIENT_DISCONNECT, error_code="ig-connection-error-maybe-auth", ) self.mqtt.disconnect() background_task.create(self.fetch_user_and_reconnect(sleep_first=60)) except Exception as e: self.log.exception("Fatal error in listener, reconnecting in 5 minutes") await self.send_bridge_notice( "Fatal error in listener (see logs for more info)", state_event=BridgeStateEvent.UNKNOWN_ERROR, important=True, error_code="ig-unknown-connection-error", info={"python_error": str(e)}, ) self.mqtt.disconnect() background_task.create(self.fetch_user_and_reconnect(sleep_first=300)) else: if not self.shutdown: await self.send_bridge_notice( "Instagram connection closed without error", state_event=BridgeStateEvent.UNKNOWN_ERROR, error_code="ig-disconnected", ) finally: self._listen_task = None self._is_connected = False self._track_metric(METRIC_CONNECTED, False) async def stop_listen(self) -> None: if self.mqtt: self.shutdown = True self.mqtt.disconnect() if self._listen_task: await self._listen_task self.shutdown = False self._track_metric(METRIC_CONNECTED, False) self._is_connected = False await self.update() def stop_backfill_tasks(self) -> None: if self._backfill_loop_task: self._backfill_loop_task.cancel() self._backfill_loop_task = None if self._thread_sync_task: self._thread_sync_task.cancel() self._thread_sync_task = None async def message_fail_login_check(self) -> None: if self._message_error_login_last_recheck + 300 > time.monotonic(): self.log.warning( "Not rechecking login as it's less than 5 minutes since the last check" ) return 
self._message_error_login_last_recheck = time.monotonic() try: user = await self.client.current_user() except Exception as e: if isinstance(e, IGNotLoggedInError): self.log.info(f"Got ThreadUserIdDoesNotExist and whoami failed as expected") # This is handled by on_response_error else: self.log.warning( f"Got ThreadUserIdDoesNotExist and whoami failed, but with unexpected error", exc_info=True, ) if isinstance(e, IGResponseError): await self.logout(e) else: self.log.warning( f"Got ThreadUserIdDoesNotExist error, but whoami call is fine ({user.user.pk}" ) async def logout(self, error: IGResponseError | None = None) -> None: await self.stop_listen() self.stop_backfill_tasks() if self.client and error is None: try: await self.client.logout(one_tap_app_login=False) except Exception: self.log.debug("Exception logging out", exc_info=True) if self.mqtt: self.mqtt.disconnect() self._track_metric(METRIC_CONNECTED, False) self._track_metric(METRIC_LOGGED_IN, False) if error is None: await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT) puppet = await pu.Puppet.get_by_pk(self.igpk, create=False) if puppet and puppet.is_real_user: await puppet.switch_mxid(None, None) try: del self.by_igpk[self.igpk] except KeyError: pass self.igpk = None else: self.log.debug("Auth error body: %s", error.body.serialize()) message = ( error.proper_message if isinstance(error, IGNotLoggedInError) else "unknown error" ) await self.send_bridge_notice( f"You have been logged out of Instagram: {message}", important=True, state_event=BridgeStateEvent.BAD_CREDENTIALS, error_code="ig-auth-error", error_message=message, info={"cnd_action": "reauth"}, ) self.client = None self.mqtt = None self.state = None self.seq_id = None if self._seq_id_save_task and not self._seq_id_save_task.done(): self._seq_id_save_task.cancel() self._seq_id_save_task = None self.snapshot_at_ms = None self.thread_sync_completed = False self._is_logged_in = False await self.update() # endregion # region Event handlers async def 
_save_seq_id_after_sleep(self) -> None: await asyncio.sleep(120) if self.seq_id is None: return self._seq_id_save_task = None self.log.trace("Saving sequence ID %d/%d", self.seq_id, self.snapshot_at_ms) try: await self.save_seq_id() except Exception: self.log.exception("Error saving sequence ID") async def update_seq_id(self, evt: NewSequenceID) -> None: self.seq_id = evt.seq_id self.snapshot_at_ms = evt.snapshot_at_ms if not self._seq_id_save_task or self._seq_id_save_task.done(): self.log.trace("Starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms) self._seq_id_save_task = asyncio.create_task(self._save_seq_id_after_sleep()) else: self.log.trace("Not starting seq id save task (%d/%d)", evt.seq_id, evt.snapshot_at_ms) @async_time(METRIC_MESSAGE) async def handle_message(self, evt: MessageSyncEvent) -> None: portal = await po.Portal.get_by_thread_id(evt.message.thread_id, receiver=self.igpk) if not portal or not portal.mxid: self.log.debug( "Got message in thread with no portal, getting info and syncing thread..." 
) resp = await self.client.get_thread(evt.message.thread_id) portal = await po.Portal.get_by_thread(resp.thread, self.igpk) await self._sync_thread(resp.thread, enqueue_backfill=False, portal=portal) if not portal.mxid: self.log.warning( "Room creation appears to have failed, " f"dropping message in {evt.message.thread_id}" ) return self.log.trace(f"Received message sync event {evt.message}") if evt.message.new_reaction: await portal.handle_instagram_reaction( evt.message, remove=evt.message.op == Operation.REMOVE ) return sender = await pu.Puppet.get_by_pk(evt.message.user_id) if evt.message.user_id else None if evt.message.is_thread_image: await portal.update_thread_image(self, evt.message.thread_image, sender=sender) elif evt.message.op == Operation.ADD: if not sender: # I don't think we care about adds with no sender return await portal.handle_instagram_item(self, sender, evt.message) elif evt.message.op == Operation.REMOVE: # Removes don't have a sender, only the message sender can unsend messages anyway await portal.handle_instagram_remove(evt.message.item_id) elif evt.message.op == Operation.REPLACE: await portal.handle_instagram_update(evt.message) @async_time(METRIC_THREAD_SYNC) async def handle_thread_sync(self, evt: ThreadSyncEvent) -> None: self.log.trace("Thread sync event content: %s", evt) portal = await po.Portal.get_by_thread(evt, receiver=self.igpk) if portal.mxid: self.log.debug("Got thread sync event for %s with existing portal", portal.thread_id) elif evt.is_group: self.log.debug( "Got thread sync event for group %s without existing portal, creating room", portal.thread_id, ) else: self.log.debug( "Got thread sync event for DM %s without existing portal, ignoring", portal.thread_id, ) return await self._sync_thread(evt, enqueue_backfill=False, portal=portal) async def handle_thread_remove(self, evt: ThreadRemoveEvent) -> None: self.log.debug("Got thread remove event: %s", evt.serialize()) @async_time(METRIC_RTD) async def handle_rtd(self, evt: 
RealtimeDirectEvent) -> None: if not isinstance(evt.value, ActivityIndicatorData): return now = int(time.time() * 1000) date = evt.value.timestamp_ms expiry = date + evt.value.ttl if expiry < now: return if evt.activity_indicator_id in self._activity_indicator_ids: return # TODO clear expired items from this dict self._activity_indicator_ids[evt.activity_indicator_id] = expiry puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id)) portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk) if not puppet or not portal or not portal.mxid: return is_typing = evt.value.activity_status != TypingStatus.OFF if puppet.pk == self.igpk: self.remote_typing_status = TypingStatus.TEXT if is_typing else TypingStatus.OFF await puppet.intent_for(portal).set_typing(portal.mxid, timeout=evt.value.ttl) # endregion # region Database getters def _add_to_cache(self) -> None: self.by_mxid[self.mxid] = self if self.igpk: self.by_igpk[self.igpk] = self @classmethod @async_getter_lock async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None: # Never allow ghosts to be users if pu.Puppet.get_id_from_mxid(mxid): return None try: return cls.by_mxid[mxid] except KeyError: pass user = cast(cls, await super().get_by_mxid(mxid)) if user is not None: user._add_to_cache() return user if create: user = cls(mxid) await user.insert() user._add_to_cache() return user return None @classmethod @async_getter_lock async def get_by_igpk(cls, igpk: int) -> User | None: try: return cls.by_igpk[igpk] except KeyError: pass user = cast(cls, await super().get_by_igpk(igpk)) if user is not None: user._add_to_cache() return user return None @classmethod async def all_logged_in(cls) -> AsyncGenerator[User, None]: users = await super().all_logged_in() user: cls for index, user in enumerate(users): try: yield cls.by_mxid[user.mxid] except KeyError: user._add_to_cache() yield user # endregion