Sfoglia il codice sorgente

Properly refetch proxy URL on reconnects and connection errors

Tulir Asokan 2 anni fa
parent
commit
21364c4ad3
1 ha cambiato i file con 76 aggiunte e 23 eliminazioni
  1. 76 23
      mautrix_instagram/user.py

+ 76 - 23
mautrix_instagram/user.py

@@ -20,6 +20,8 @@ import asyncio
 import logging
 import time
 
+from aiohttp import ClientConnectionError
+
 from mauigpapi import AndroidAPI, AndroidMQTT, AndroidState, ProxyHandler
 from mauigpapi.errors import (
     IGChallengeError,
@@ -65,6 +67,15 @@ from .db import Portal as DBPortal, User as DBUser
 if TYPE_CHECKING:
     from .__main__ import InstagramBridge
 
+try:
+    from aiohttp_socks import ProxyConnectionError, ProxyError, ProxyTimeoutError
+except ImportError:
+
+    class ProxyError(Exception):
+        pass
+
+    ProxyConnectionError = ProxyTimeoutError = ProxyError
+
 METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
 METRIC_THREAD_SYNC = Summary("bridge_on_thread_sync", "calls to handle_thread_sync")
 METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
@@ -464,12 +475,35 @@ class User(DBUser, BaseUser):
         else:
             self.log.debug(f"{thread.thread_id} is not active and doesn't have a portal")
 
+    async def _maybe_update_proxy(self, source: str) -> None:
+        if not self._listen_task:
+            self.proxy_handler.update_proxy_url()
+            await self.on_proxy_update()
+        else:
+            self.log.debug(f"Not updating proxy: listen_task is still running? (caller: {source})")
+
     async def sync(self) -> None:
         sleep_minutes = 2
+        errors = 0
         while True:
             try:
                 resp = await self.client.get_inbox()
                 break
+            except (
+                ProxyError,
+                ProxyTimeoutError,
+                ProxyConnectionError,
+                ClientConnectionError,
+                ConnectionError,
+                asyncio.TimeoutError,
+            ) as e:
+                errors += 1
+                wait = min(errors * 10, 60)
+                self.log.warning(
+                    f"{e.__class__.__name__} while trying to sync, retrying in {wait} seconds: {e}"
+                )
+                await asyncio.sleep(wait)
+                await self._maybe_update_proxy("sync error")
             except IGNotLoggedInError as e:
                 self.log.exception("Got not logged in error while syncing")
                 await self.logout(error=e)
@@ -530,28 +564,45 @@ class User(DBUser, BaseUser):
 
     async def fetch_user_and_reconnect(self) -> None:
         self.log.debug("Refetching current user after disconnection")
-        try:
-            resp = await self.client.current_user()
-        except IGNotLoggedInError as e:
-            self.log.warning(f"Failed to reconnect to Instagram: {e}, logging out")
-            await self.logout(error=e)
-            return
-        except (IGChallengeError, IGConsentRequiredError) as e:
-            await self._handle_checkpoint(e, on="reconnect")
-            return
-        except Exception as e:
-            self.log.exception("Error while reconnecting to Instagram")
-            if isinstance(e, IGCheckpointError):
-                self.log.debug("Checkpoint error content: %s", e.body)
-            await self.push_bridge_state(
-                BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
-            )
-            return
-        else:
-            self.log.debug(f"Confirmed current user {resp.user.pk}")
-            self.proxy_handler.update_proxy_url()
-            await self.on_proxy_update()
-            self.start_listen()
+        errors = 0
+        while True:
+            try:
+                resp = await self.client.current_user()
+            except (
+                ProxyError,
+                ProxyTimeoutError,
+                ProxyConnectionError,
+                ClientConnectionError,
+                ConnectionError,
+                asyncio.TimeoutError,
+            ) as e:
+                errors += 1
+                wait = min(errors * 10, 60)
+                self.log.warning(
+                    f"{e.__class__.__name__} while trying to check user for reconnection, "
+                    f"retrying in {wait} seconds: {e}"
+                )
+                await asyncio.sleep(wait)
+                await self._maybe_update_proxy("fetch_user_and_reconnect error")
+            except IGNotLoggedInError as e:
+                self.log.warning(f"Failed to reconnect to Instagram: {e}, logging out")
+                await self.logout(error=e)
+                return
+            except (IGChallengeError, IGConsentRequiredError) as e:
+                await self._handle_checkpoint(e, on="reconnect")
+                return
+            except Exception as e:
+                self.log.exception("Error while reconnecting to Instagram")
+                if isinstance(e, IGCheckpointError):
+                    self.log.debug("Checkpoint error content: %s", e.body)
+                await self.push_bridge_state(
+                    BridgeStateEvent.UNKNOWN_ERROR, info={"python_error": str(e)}
+                )
+                return
+            else:
+                self.log.debug(f"Confirmed current user {resp.user.pk}")
+                self.start_listen()
+                return
 
     async def _listen(self, seq_id: int, snapshot_at_ms: int, is_after_sync: bool) -> None:
         try:
@@ -583,7 +634,9 @@ class User(DBUser, BaseUser):
                 self.log.warning(f"Got IrisSubscribeError {e}, refreshing...")
                 asyncio.create_task(self.refresh())
         except (MQTTNotConnected, MQTTNotLoggedIn, MQTTConnectionUnauthorized) as e:
-            self.log.warning(f"Unexpected connection error: {e}")
+            self.log.warning(
+                f"Unexpected connection error: {e}", exc_info="MQTT reconnection failed" in str(e)
+            )
             await self.send_bridge_notice(
                 f"Error in listener: {e}",
                 important=True,