Browse Source

Copy dynamic proxy support from mautrix-facebook (#59)

Alejandro Herrera 2 năm trước cách đây
mục cha
commit
05ab1ca5a7

+ 3 - 0
Dockerfile

@@ -11,6 +11,9 @@ RUN apk add --no-cache \
       py3-commonmark \
       #py3-prometheus-client \
       py3-paho-mqtt \
+      # proxy support
+      py3-aiohttp-socks \
+      py3-pysocks \
       # Other dependencies
       ca-certificates \
       su-exec \

+ 1 - 0
mauigpapi/__init__.py

@@ -1,3 +1,4 @@
 from .http import AndroidAPI
 from .mqtt import AndroidMQTT, GraphQLSubscription, SkywalkerSubscription
+from .proxy import ProxyHandler
 from .state import AndroidState

+ 30 - 4
mauigpapi/http/base.py

@@ -21,7 +21,7 @@ import logging
 import random
 import time
 
-from aiohttp import ClientResponse, ClientSession, ContentTypeError
+from aiohttp import ClientResponse, ClientSession, ContentTypeError, CookieJar
 from yarl import URL
 
 from mautrix.types import JSON, Serializable
@@ -48,8 +48,14 @@ from ..errors import (
     IGUnknownError,
     IGUserHasLoggedOutError,
 )
+from ..proxy import ProxyHandler
 from ..state import AndroidState
 
+try:
+    from aiohttp_socks import ProxyConnector
+except ImportError:
+    ProxyConnector = None
+
 T = TypeVar("T")
 
 
@@ -65,11 +71,19 @@ class BaseAndroidAPI:
     state: AndroidState
     log: TraceLogger
 
-    def __init__(self, state: AndroidState, log: TraceLogger | None = None) -> None:
-        self.http = ClientSession(cookie_jar=state.cookies.jar)
-        self.state = state
+    def __init__(
+        self,
+        state: AndroidState,
+        log: TraceLogger | None = None,
+        proxy_handler: ProxyHandler | None = None,
+    ) -> None:
         self.log = log or logging.getLogger("mauigpapi.http")
 
+        self.proxy_handler = proxy_handler
+        self.setup_http(cookie_jar=state.cookies.jar)
+
+        self.state = state
+
     @staticmethod
     def sign(req: Any, filter_nulls: bool = False) -> dict[str, str]:
         if isinstance(req, Serializable):
@@ -123,6 +137,18 @@ class BaseAndroidAPI:
         }
         return {k: v for k, v in headers.items() if v is not None}
 
+    def setup_http(self, cookie_jar: CookieJar) -> None:
+        connector = None
+        http_proxy = self.proxy_handler.get_proxy_url()
+        if http_proxy:
+            if ProxyConnector:
+                connector = ProxyConnector.from_url(http_proxy)
+            else:
+                self.log.warning("http_proxy is set, but aiohttp-socks is not installed")
+
+        self.http = ClientSession(connector=connector, cookie_jar=cookie_jar)
+        return None
+
     def raw_http_get(self, url: URL | str):
         if isinstance(url, str):
             url = URL(url, encoded=True)

+ 1 - 1
mauigpapi/mqtt/__init__.py

@@ -1,3 +1,3 @@
 from .conn import AndroidMQTT
-from .events import Connect, Disconnect, NewSequenceID
+from .events import Connect, Disconnect, NewSequenceID, ProxyUpdate
 from .subscription import GraphQLSubscription, SkywalkerSubscription

+ 30 - 21
mauigpapi/mqtt/conn.py

@@ -37,6 +37,7 @@ from ..errors import (
     MQTTNotConnected,
     MQTTNotLoggedIn,
 )
+from ..proxy import ProxyHandler
 from ..state import AndroidState
 from ..types import (
     AppPresenceEventPayload,
@@ -59,7 +60,7 @@ from ..types import (
     ThreadSyncEvent,
     TypingStatus,
 )
-from .events import Connect, Disconnect, NewSequenceID
+from .events import Connect, Disconnect, NewSequenceID, ProxyUpdate
 from .otclient import MQTToTClient
 from .subscription import GraphQLQueryID, RealtimeTopic, everclear_subscriptions
 from .thrift import ForegroundStateConfig, IncomingMessage, RealtimeClientInfo, RealtimeConfig
@@ -103,6 +104,7 @@ class AndroidMQTT:
         state: AndroidState,
         loop: asyncio.AbstractEventLoop | None = None,
         log: TraceLogger | None = None,
+        proxy_handler: ProxyHandler | None = None,
     ) -> None:
         self._graphql_subs = set()
         self._skywalker_subs = set()
@@ -125,26 +127,8 @@ class AndroidMQTT:
             protocol=pmc.MQTTv31,
             transport="tcp",
         )
-        try:
-            http_proxy = urllib.request.getproxies()["http"]
-        except KeyError:
-            http_proxy = None
-        if http_proxy and socks and URL:
-            proxy_url = URL(http_proxy)
-            proxy_type = {
-                "http": socks.HTTP,
-                "https": socks.HTTP,
-                "socks": socks.SOCKS5,
-                "socks5": socks.SOCKS5,
-                "socks4": socks.SOCKS4,
-            }[proxy_url.scheme]
-            self._client.proxy_set(
-                proxy_type=proxy_type,
-                proxy_addr=proxy_url.host,
-                proxy_port=proxy_url.port,
-                proxy_username=proxy_url.user,
-                proxy_password=proxy_url.password,
-            )
+        self.proxy_handler = proxy_handler
+        self.setup_proxy()
         self._client.enable_logger()
         self._client.tls_set()
         # mqtt.max_inflight_messages_set(20)  # The rest will get queued
@@ -161,6 +145,28 @@ class AndroidMQTT:
         self._client.on_socket_register_write = self._on_socket_register_write
         self._client.on_socket_unregister_write = self._on_socket_unregister_write
 
+    def setup_proxy(self):
+        http_proxy = self.proxy_handler.get_proxy_url()
+        if http_proxy:
+            if not socks:
+                self.log.warning("http_proxy is set, but pysocks is not installed")
+            else:
+                proxy_url = URL(http_proxy)
+                proxy_type = {
+                    "http": socks.HTTP,
+                    "https": socks.HTTP,
+                    "socks": socks.SOCKS5,
+                    "socks5": socks.SOCKS5,
+                    "socks4": socks.SOCKS4,
+                }[proxy_url.scheme]
+                self._client.proxy_set(
+                    proxy_type=proxy_type,
+                    proxy_addr=proxy_url.host,
+                    proxy_port=proxy_url.port,
+                    proxy_username=proxy_url.user,
+                    proxy_password=proxy_url.password,
+                )
+
     def _clear_response_waiters(self) -> None:
         for waiter in self._response_waiters.values():
             if not waiter.done():
@@ -572,6 +578,9 @@ class AndroidMQTT:
                 elif rc == pmc.MQTT_ERR_NO_CONN:
                     if connection_retries > retry_limit:
                         raise MQTTNotConnected(f"Connection failed {connection_retries} times")
+                    if self.proxy_handler.update_proxy_url():
+                        self.setup_proxy()
+                        await self._dispatch(ProxyUpdate())
                     sleep = connection_retries * 2
                     await self._dispatch(
                         Disconnect(

+ 5 - 0
mauigpapi/mqtt/events.py

@@ -30,3 +30,8 @@ class Disconnect:
 class NewSequenceID:
     seq_id: int
     snapshot_at_ms: int
+
+
+@dataclass
+class ProxyUpdate:
+    pass

+ 52 - 0
mauigpapi/proxy.py

@@ -0,0 +1,52 @@
+from __future__ import annotations
+
+import json
+import logging
+import urllib.request
+
+
+class ProxyHandler:
+    current_proxy_url: str | None = None
+    log = logging.getLogger("mauigpapi.proxy")
+
+    def __init__(self, api_url: str | None) -> None:
+        self.api_url = api_url
+
+    def get_proxy_url_from_api(self) -> str | None:
+        assert self.api_url is not None
+
+        request = urllib.request.Request(self.api_url, method="GET")
+        self.log.debug("Requesting proxy from: %s", self.api_url)
+
+        try:
+            with urllib.request.urlopen(request) as f:
+                response = json.loads(f.read().decode())
+        except Exception:
+            self.log.exception("Failed to retrieve proxy from API")
+        else:
+            return response["proxy_url"]
+
+        return None
+
+    def update_proxy_url(self) -> bool:
+        old_proxy = self.current_proxy_url
+        new_proxy = None
+
+        if self.api_url is not None:
+            new_proxy = self.get_proxy_url_from_api()
+        else:
+            new_proxy = urllib.request.getproxies().get("http")
+
+        if old_proxy != new_proxy:
+            self.log.debug("Set new proxy URL: %s", new_proxy)
+            self.current_proxy_url = new_proxy
+            return True
+
+        self.log.debug("Got same proxy URL: %s", new_proxy)
+        return False
+
+    def get_proxy_url(self) -> str | None:
+        if not self.current_proxy_url:
+            self.update_proxy_url()
+
+        return self.current_proxy_url

+ 1 - 1
mautrix_instagram/commands/auth.py

@@ -45,7 +45,7 @@ async def get_login_state(user: u.User, seed: str) -> tuple[AndroidAPI, AndroidS
         state = AndroidState()
         seed = hmac.new(seed.encode("utf-8"), user.mxid.encode("utf-8"), hashlib.sha256).digest()
         state.device.generate(seed)
-        api = AndroidAPI(state, log=user.api_log)
+        api = AndroidAPI(state, log=user.api_log, proxy_handler=user.proxy_handler)
         await api.qe_sync_login_experiments()
         user.command_status = {
             "action": "Login",

+ 2 - 0
mautrix_instagram/config.py

@@ -89,6 +89,8 @@ class Config(BaseBridgeConfig):
 
         copy("bridge.command_prefix")
 
+        copy("bridge.get_proxy_api_url")
+
         copy_dict("bridge.permissions")
 
     def _get_permissions(self, key: str) -> Permissions:

+ 4 - 0
mautrix_instagram/example-config.yaml

@@ -167,6 +167,10 @@ bridge:
         resync: true
         # Should even disconnected users be reconnected?
         always: false
+
+    # URL to call to retrieve a proxy URL from (defaults to the http_proxy environment variable).
+    get_proxy_api_url: null
+
     # End-to-bridge encryption support options.
     #
     # See https://docs.mau.fi/bridges/general/end-to-bridge-encryption.html for more info.

+ 23 - 3
mautrix_instagram/user.py

@@ -20,7 +20,7 @@ import asyncio
 import logging
 import time
 
-from mauigpapi import AndroidAPI, AndroidMQTT, AndroidState
+from mauigpapi import AndroidAPI, AndroidMQTT, AndroidState, ProxyHandler
 from mauigpapi.errors import (
     IGChallengeError,
     IGCheckpointError,
@@ -38,6 +38,7 @@ from mauigpapi.mqtt import (
     Disconnect,
     GraphQLSubscription,
     NewSequenceID,
+    ProxyUpdate,
     SkywalkerSubscription,
 )
 from mauigpapi.types import (
@@ -144,6 +145,10 @@ class User(DBUser, BaseUser):
         self.remote_typing_status = None
         self._seq_id_save_task = None
 
+        self.proxy_handler = ProxyHandler(
+            api_url=self.config["bridge.get_proxy_api_url"],
+        )
+
     @classmethod
     def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]:
         cls.bridge = bridge
@@ -197,7 +202,11 @@ class User(DBUser, BaseUser):
         if not self.state:
             await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error="logged-out")
             return
-        client = AndroidAPI(self.state, log=self.api_log)
+        client = AndroidAPI(
+            self.state,
+            log=self.api_log,
+            proxy_handler=self.proxy_handler,
+        )
 
         if not user:
             try:
@@ -222,7 +231,10 @@ class User(DBUser, BaseUser):
         self.by_igpk[self.igpk] = self
 
         self.mqtt = AndroidMQTT(
-            self.state, loop=self.loop, log=self.ig_base_log.getChild("mqtt").getChild(self.mxid)
+            self.state,
+            loop=self.loop,
+            log=self.ig_base_log.getChild("mqtt").getChild(self.mxid),
+            proxy_handler=self.proxy_handler,
         )
         self.mqtt.add_event_handler(Connect, self.on_connect)
         self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
@@ -230,6 +242,7 @@ class User(DBUser, BaseUser):
         self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
         self.mqtt.add_event_handler(ThreadSyncEvent, self.handle_thread_sync)
         self.mqtt.add_event_handler(RealtimeDirectEvent, self.handle_rtd)
+        self.mqtt.add_event_handler(ProxyUpdate, self.on_proxy_update)
 
         await self.update()
 
@@ -252,6 +265,10 @@ class User(DBUser, BaseUser):
         self._track_metric(METRIC_CONNECTED, False)
         self._is_connected = False
 
+    async def on_proxy_update(self, evt: ProxyUpdate | None = None) -> None:
+        if self.client:
+            self.client.setup_http(self.state.cookies.jar)
+
     # TODO this stuff could probably be moved to mautrix-python
     async def get_notice_room(self) -> RoomID:
         if not self.notice_room:
@@ -395,6 +412,7 @@ class User(DBUser, BaseUser):
                 self.start_listen()
         finally:
             self._is_refreshing = False
+            self.proxy_handler.update_proxy_url()
 
     async def _handle_checkpoint(
         self,
@@ -531,6 +549,8 @@ class User(DBUser, BaseUser):
             return
         else:
             self.log.debug(f"Confirmed current user {resp.user.pk}")
+            self.proxy_handler.update_proxy_url()
+            await self.on_proxy_update()
             self.start_listen()
 
     async def _listen(self, seq_id: int, snapshot_at_ms: int, is_after_sync: bool) -> None:

+ 4 - 0
optional-requirements.txt

@@ -8,6 +8,10 @@ unpaddedbase64>=1,<3
 #/metrics
 prometheus_client>=0.6,<0.16
 
+#/proxy
+pysocks
+aiohttp-socks
+
 #/imageconvert
 pillow>=4,<10