Selaa lähdekoodia

Add periodic reconnect option

Tulir Asokan 4 vuotta sitten
vanhempi
sitoutus
800a4cb979

+ 49 - 0
mautrix_instagram/__main__.py

@@ -13,6 +13,10 @@
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from typing import Optional
+import logging
+import asyncio
+
 from mautrix.types import UserID, RoomID
 from mautrix.bridge import Bridge
 from mautrix.bridge.state_store.asyncpg import PgBridgeStateStore
@@ -47,6 +51,12 @@ class InstagramBridge(Bridge):
     state_store: PgBridgeStateStore
     provisioning_api: ProvisioningAPI
 
+    periodic_reconnect_task: Optional[asyncio.Task]
+
+    def preinit(self) -> None:
+        self.periodic_reconnect_task = None
+        super().preinit()
+
     def make_state_store(self) -> None:
         self.state_store = PgBridgeStateStore(self.db, self.get_puppet, self.get_double_puppet)
 
@@ -71,8 +81,10 @@ class InstagramBridge(Bridge):
         if self.config["bridge.resend_bridge_info"]:
             self.add_startup_actions(self.resend_bridge_info())
         await super().start()
+        self.periodic_reconnect_task = self.loop.create_task(self._try_periodic_reconnect_loop())
 
     def prepare_stop(self) -> None:
+        self.periodic_reconnect_task.cancel()
         self.add_shutdown_actions(user.stop_listen() for user in User.by_igpk.values())
         self.log.debug("Stopping puppet syncers")
         for puppet in Puppet.by_custom_mxid.values():
@@ -86,6 +98,43 @@ class InstagramBridge(Bridge):
             await portal.update_bridge_info()
         self.log.info("Finished re-sending bridge info state events")
 
+    async def _try_periodic_reconnect_loop(self) -> None:
+        try:
+            await self._periodic_reconnect_loop()
+        except Exception:
+            self.log.exception("Fatal error in periodic reconnect loop")
+
+    async def _periodic_reconnect_loop(self) -> None:
+        log = logging.getLogger("mau.periodic_reconnect")
+        always_reconnect = self.config["bridge.periodic_reconnect.always"]
+        interval = self.config["bridge.periodic_reconnect.interval"]
+        if interval <= 0:
+            log.debug("Periodic reconnection is not enabled")
+            return
+        resync = bool(self.config["bridge.periodic_reconnect.resync"])
+        if interval < 600:
+            log.warning("Periodic reconnect interval is quite low (%d)", interval)
+        log.debug("Starting periodic reconnect loop")
+        while True:
+            try:
+                await asyncio.sleep(interval)
+            except asyncio.CancelledError:
+                log.debug("Periodic reconnect loop stopped")
+                return
+            log.info("Executing periodic reconnections")
+            for user in User.by_igpk.values():
+                if not user.is_connected and not always_reconnect:
+                    log.debug("Not reconnecting %s: not connected", user.mxid)
+                    continue
+                log.debug("Executing periodic reconnect for %s", user.mxid)
+                try:
+                    await user.refresh(resync=resync)
+                except asyncio.CancelledError:
+                    log.debug("Periodic reconnect loop stopped")
+                    return
+                except Exception:
+                    log.exception("Error while reconnecting", user.mxid)
+
     async def get_portal(self, room_id: RoomID) -> Portal:
         return await Portal.get_by_mxid(room_id)
 

+ 4 - 8
mautrix_instagram/commands/conn.py

@@ -55,9 +55,9 @@ async def ping(evt: CommandEvent) -> None:
 
 
 @command_handler(needs_auth=True, management_only=False, help_section=SECTION_CONNECTION,
-                 help_text="Synchronize portals")
-async def sync(evt: CommandEvent) -> None:
-    await evt.sender.sync()
+                 help_text="Reconnect to Instagram and synchronize portals", aliases=["sync"])
+async def refresh(evt: CommandEvent) -> None:
+    await evt.sender.refresh()
     await evt.reply("Synchronization complete")
 
 
@@ -67,10 +67,7 @@ async def connect(evt: CommandEvent) -> None:
     if evt.sender.is_connected:
         await evt.reply("You're already connected to Instagram.")
         return
-    # TODO backfill when reconnecting
-    await evt.sender.stop_listen()
-    evt.sender.shutdown = False
-    await evt.sender.start_listen()
+    await evt.sender.refresh(resync=False)
     await evt.reply("Restarted connection to Instagram.")
 
 
@@ -80,5 +77,4 @@ async def disconnect(evt: CommandEvent) -> None:
     if not evt.sender.mqtt:
         await evt.reply("You're not connected to Instagram.")
     await evt.sender.stop_listen()
-    evt.sender.shutdown = False
     await evt.reply("Successfully disconnected from Instagram.")

+ 3 - 0
mautrix_instagram/config.py

@@ -71,6 +71,9 @@ class Config(BaseBridgeConfig):
         copy("bridge.backfill.initial_limit")
         copy("bridge.backfill.missed_limit")
         copy("bridge.backfill.disable_notifications")
+        copy("bridge.periodic_reconnect.interval")
+        copy("bridge.periodic_reconnect.resync")
+        copy("bridge.periodic_reconnect.always")
         copy("bridge.encryption.allow")
         copy("bridge.encryption.default")
         copy("bridge.encryption.key_sharing.allow")

+ 9 - 0
mautrix_instagram/example-config.yaml

@@ -120,6 +120,15 @@ bridge:
         # If using double puppeting, should notifications be disabled
         # while the initial backfill is in progress?
         disable_notifications: false
+    periodic_reconnect:
+        # Interval in seconds in which to automatically reconnect all users.
+        # This can be used to automatically mitigate the bug where Instagram stops sending messages.
+        # Set to -1 to disable periodic reconnections entirely.
+        interval: -1
+        # Whether or not the bridge should backfill chats when reconnecting.
+        resync: true
+        # Should even disconnected users be reconnected?
+        always: false
     # End-to-bridge encryption support options. You must install the e2be optional dependency for
     # this to work. See https://github.com/tulir/mautrix-telegram/wiki/End‐to‐bridge-encryption
     encryption:

+ 12 - 2
mautrix_instagram/user.py

@@ -217,6 +217,13 @@ class User(DBUser, BaseUser):
             if portal.mxid
         }
 
+    async def refresh(self, resync: bool = True) -> None:
+        await self.stop_listen()
+        if resync:
+            await self.sync()
+        else:
+            await self.start_listen()
+
     async def sync(self) -> None:
         resp = await self.client.get_inbox()
         max_age = self.config["bridge.portal_create_max_age"] * 1_000_000
@@ -241,7 +248,9 @@ class User(DBUser, BaseUser):
         if not self._listen_task:
             await self.start_listen(resp.seq_id, resp.snapshot_at_ms)
 
-    async def start_listen(self, seq_id: Optional[int] = None, snapshot_at_ms: Optional[int] = None) -> None:
+    async def start_listen(self, seq_id: Optional[int] = None, snapshot_at_ms: Optional[int] = None
+                           ) -> None:
+        self.shutdown = False
         if not seq_id:
             resp = await self.client.get_inbox(limit=1)
             seq_id, snapshot_at_ms = resp.seq_id, resp.snapshot_at_ms
@@ -274,11 +283,12 @@ class User(DBUser, BaseUser):
             self._track_metric(METRIC_CONNECTED, False)
 
     async def stop_listen(self) -> None:
-        self.shutdown = True
         if self.mqtt:
+            self.shutdown = True
             self.mqtt.disconnect()
             if self._listen_task:
                 await self._listen_task
+            self.shutdown = False
         self._track_metric(METRIC_CONNECTED, False)
         self._is_connected = False
         await self.update()