浏览代码

Add real-time bridge status push option

Tulir Asokan 4 年之前
父节点
当前提交
f837c60c06

+ 1 - 0
mautrix_instagram/config.py

@@ -86,6 +86,7 @@ class Config(BaseBridgeConfig):
         copy("bridge.delivery_error_reports")
         copy("bridge.delivery_error_reports")
         copy("bridge.resend_bridge_info")
         copy("bridge.resend_bridge_info")
         copy("bridge.unimportant_bridge_notices")
         copy("bridge.unimportant_bridge_notices")
+        copy("bridge.disable_bridge_notices")
 
 
         copy("bridge.provisioning.enabled")
         copy("bridge.provisioning.enabled")
         copy("bridge.provisioning.prefix")
         copy("bridge.provisioning.prefix")

+ 6 - 0
mautrix_instagram/example-config.yaml

@@ -10,6 +10,10 @@ homeserver:
     asmux: false
     asmux: false
     # Number of retries for all HTTP requests if the homeserver isn't reachable.
     # Number of retries for all HTTP requests if the homeserver isn't reachable.
     http_retry_count: 4
     http_retry_count: 4
+    # The URL to push real-time bridge status to.
+    # If set, the bridge will make POST requests to this URL whenever a user's Facebook MQTT connection state changes.
+    # The bridge will use the appservice as_token to authorize requests.
+    status_endpoint: null
 
 
 # Application service host/registration related details
 # Application service host/registration related details
 # Changing these values requires regeneration of the registration.
 # Changing these values requires regeneration of the registration.
@@ -175,6 +179,8 @@ bridge:
     # Whether or not unimportant bridge notices should be sent to the user.
     # Whether or not unimportant bridge notices should be sent to the user.
     # (e.g. connected, disconnected but will retry)
     # (e.g. connected, disconnected but will retry)
     unimportant_bridge_notices: true
     unimportant_bridge_notices: true
+    # Disable bridge notices entirely
+    disable_bridge_notices: false
 
 
     # Provisioning API part of the web server for automated portal creation and fetching information.
     # Provisioning API part of the web server for automated portal creation and fetching information.
     # Used by things like mautrix-manager (https://github.com/tulir/mautrix-manager).
     # Used by things like mautrix-manager (https://github.com/tulir/mautrix-manager).

+ 2 - 1
mautrix_instagram/portal.py

@@ -706,7 +706,8 @@ class Portal(DBPortal, BasePortal):
                 await self.main_intent.kick_user(self.mxid, p.Puppet.get_mxid_from_id(pk),
                 await self.main_intent.kick_user(self.mxid, p.Puppet.get_mxid_from_id(pk),
                                                  reason="User had left this Instagram DM")
                                                  reason="User had left this Instagram DM")
 
 
-    async def _update_read_receipts(self, receipts: Dict[int, ThreadUserLastSeenAt]) -> None:
+    async def _update_read_receipts(self, receipts: Dict[Union[int, str], ThreadUserLastSeenAt]
+                                    ) -> None:
         for user_id, receipt in receipts.items():
         for user_id, receipt in receipts.items():
             message = await DBMessage.get_by_item_id(receipt.item_id, self.receiver)
             message = await DBMessage.get_by_item_id(receipt.item_id, self.receiver)
             if not message:
             if not message:

+ 53 - 25
mautrix_instagram/user.py

@@ -15,7 +15,6 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 from typing import (Dict, Optional, AsyncIterable, Awaitable, AsyncGenerator, List, TYPE_CHECKING,
 from typing import (Dict, Optional, AsyncIterable, Awaitable, AsyncGenerator, List, TYPE_CHECKING,
                     cast)
                     cast)
-from collections import defaultdict
 import asyncio
 import asyncio
 import logging
 import logging
 import time
 import time
@@ -24,8 +23,9 @@ from mauigpapi import AndroidAPI, AndroidState, AndroidMQTT
 from mauigpapi.mqtt import Connect, Disconnect, GraphQLSubscription, SkywalkerSubscription
 from mauigpapi.mqtt import Connect, Disconnect, GraphQLSubscription, SkywalkerSubscription
 from mauigpapi.types import (CurrentUser, MessageSyncEvent, Operation, RealtimeDirectEvent,
 from mauigpapi.types import (CurrentUser, MessageSyncEvent, Operation, RealtimeDirectEvent,
                              ActivityIndicatorData, TypingStatus, ThreadSyncEvent, Thread)
                              ActivityIndicatorData, TypingStatus, ThreadSyncEvent, Thread)
-from mauigpapi.errors import IGNotLoggedInError, MQTTNotLoggedIn, MQTTNotConnected
-from mautrix.bridge import BaseUser, async_getter_lock
+from mauigpapi.errors import (IGNotLoggedInError, MQTTNotLoggedIn, MQTTNotConnected,
+                              IrisSubscribeError)
+from mautrix.bridge import BaseUser, BridgeState, async_getter_lock
 from mautrix.types import UserID, RoomID, EventID, TextMessageEventContent, MessageType
 from mautrix.types import UserID, RoomID, EventID, TextMessageEventContent, MessageType
 from mautrix.appservice import AppService
 from mautrix.appservice import AppService
 from mautrix.util.opt_prometheus import Summary, Gauge, async_time
 from mautrix.util.opt_prometheus import Summary, Gauge, async_time
@@ -44,6 +44,15 @@ METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
 METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
 METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
 METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")
 METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")
 
 
+BridgeState.human_readable_errors.update({
+    "ig-connection-error": "Instagram disconnected unexpectedly",
+    "ig-logged-out": "You logged out from Instagram",
+    "ig-auth-error": "Authentication error from Instagram: {message}",
+    "ig-disconnected": None,
+    "ig-no-mqtt": "You're not connected to Instagram",
+    "ig-not-logged-in": "You're not logged into Instagram",
+})
+
 
 
 class User(DBUser, BaseUser):
 class User(DBUser, BaseUser):
     ig_base_log: TraceLogger = logging.getLogger("mau.instagram")
     ig_base_log: TraceLogger = logging.getLogger("mau.instagram")
@@ -72,21 +81,18 @@ class User(DBUser, BaseUser):
                  state: Optional[AndroidState] = None, notice_room: Optional[RoomID] = None
                  state: Optional[AndroidState] = None, notice_room: Optional[RoomID] = None
                  ) -> None:
                  ) -> None:
         super().__init__(mxid=mxid, igpk=igpk, state=state, notice_room=notice_room)
         super().__init__(mxid=mxid, igpk=igpk, state=state, notice_room=notice_room)
+        BaseUser.__init__(self)
         self._notice_room_lock = asyncio.Lock()
         self._notice_room_lock = asyncio.Lock()
         self._notice_send_lock = asyncio.Lock()
         self._notice_send_lock = asyncio.Lock()
         perms = self.config.get_permissions(mxid)
         perms = self.config.get_permissions(mxid)
         self.is_whitelisted, self.is_admin, self.permission_level = perms
         self.is_whitelisted, self.is_admin, self.permission_level = perms
-        self.log = self.log.getChild(self.mxid)
         self.client = None
         self.client = None
         self.mqtt = None
         self.mqtt = None
         self.username = None
         self.username = None
-        self.dm_update_lock = asyncio.Lock()
-        self._metric_value = defaultdict(lambda: False)
         self._is_logged_in = False
         self._is_logged_in = False
         self._is_connected = False
         self._is_connected = False
         self.shutdown = False
         self.shutdown = False
         self._listen_task = None
         self._listen_task = None
-        self.command_status = None
         self.remote_typing_status = None
         self.remote_typing_status = None
 
 
     @classmethod
     @classmethod
@@ -122,10 +128,11 @@ class User(DBUser, BaseUser):
         try:
         try:
             resp = await client.current_user()
             resp = await client.current_user()
         except IGNotLoggedInError as e:
         except IGNotLoggedInError as e:
-            self.log.warning(f"Failed to connect to Instagram: {e}")
-            # TODO show reason?
-            await self.send_bridge_notice("You have been logged out of Instagram",
-                                          important=True)
+            self.log.warning(f"Failed to connect to Instagram: {e}, logging out")
+            await self.send_bridge_notice(f"You have been logged out of Instagram: {e!s}",
+                                          important=True, error_code="ig-auth-error",
+                                          error_message=str(e))
+            await self.logout(from_error=True)
             return
             return
         self.client = client
         self.client = client
         self._is_logged_in = True
         self._is_logged_in = True
@@ -152,6 +159,7 @@ class User(DBUser, BaseUser):
         self._track_metric(METRIC_CONNECTED, True)
         self._track_metric(METRIC_CONNECTED, True)
         self._is_connected = True
         self._is_connected = True
         await self.send_bridge_notice("Connected to Instagram")
         await self.send_bridge_notice("Connected to Instagram")
+        await self.push_bridge_state(ok=True)
 
 
     async def on_disconnect(self, evt: Disconnect) -> None:
     async def on_disconnect(self, evt: Disconnect) -> None:
         self.log.debug("Disconnected from Instagram")
         self.log.debug("Disconnected from Instagram")
@@ -172,11 +180,23 @@ class User(DBUser, BaseUser):
                 await self.update()
                 await self.update()
         return self.notice_room
         return self.notice_room
 
 
+    async def get_bridge_state(self) -> BridgeState:
+        if not self.client:
+            return BridgeState(ok=False, error="ig-not-logged-in")
+        elif not self._listen_task or self._listen_task.done() or not self.is_connected:
+            return BridgeState(ok=False, error="ig-no-mqtt")
+        return BridgeState(ok=True)
+
     async def send_bridge_notice(self, text: str, edit: Optional[EventID] = None,
     async def send_bridge_notice(self, text: str, edit: Optional[EventID] = None,
-                                 important: bool = False) -> Optional[EventID]:
+                                 important: bool = False, error_code: Optional[str] = None,
+                                 error_message: Optional[str] = None) -> Optional[EventID]:
+        if error_code:
+            await self.push_bridge_state(ok=False, error=error_code, message=error_message)
+        if self.config["bridge.disable_bridge_notices"]:
+            return None
         if not important and not self.config["bridge.unimportant_bridge_notices"]:
         if not important and not self.config["bridge.unimportant_bridge_notices"]:
             self.log.debug("Not sending unimportant bridge notice: %s", text)
             self.log.debug("Not sending unimportant bridge notice: %s", text)
-            return
+            return None
         event_id = None
         event_id = None
         try:
         try:
             self.log.debug("Sending bridge notice: %s", text)
             self.log.debug("Sending bridge notice: %s", text)
@@ -285,17 +305,22 @@ class User(DBUser, BaseUser):
                 skywalker_subs={SkywalkerSubscription.direct_sub(self.state.user_id),
                 skywalker_subs={SkywalkerSubscription.direct_sub(self.state.user_id),
                                 SkywalkerSubscription.live_sub(self.state.user_id)},
                                 SkywalkerSubscription.live_sub(self.state.user_id)},
                 seq_id=seq_id, snapshot_at_ms=snapshot_at_ms)
                 seq_id=seq_id, snapshot_at_ms=snapshot_at_ms)
+        except IrisSubscribeError as e:
+            self.log.warning(f"Got IrisSubscribeError {e}, refreshing...")
+            await self.refresh()
         except (MQTTNotConnected, MQTTNotLoggedIn) as e:
         except (MQTTNotConnected, MQTTNotLoggedIn) as e:
-            await self.send_bridge_notice(f"Error in listener: {e}", important=True)
+            await self.send_bridge_notice(f"Error in listener: {e}", important=True,
+                                          error_code="ig-connection-error")
             self.mqtt.disconnect()
             self.mqtt.disconnect()
         except Exception:
         except Exception:
             self.log.exception("Fatal error in listener")
             self.log.exception("Fatal error in listener")
             await self.send_bridge_notice("Fatal error in listener (see logs for more info)",
             await self.send_bridge_notice("Fatal error in listener (see logs for more info)",
-                                          important=True)
+                                          important=True, error_code="ig-connection-error")
             self.mqtt.disconnect()
             self.mqtt.disconnect()
         else:
         else:
             if not self.shutdown:
             if not self.shutdown:
-                await self.send_bridge_notice("Instagram connection closed without error")
+                await self.send_bridge_notice("Instagram connection closed without error",
+                                              error_code="ig-disconnected")
         finally:
         finally:
             self._listen_task = None
             self._listen_task = None
             self._is_connected = False
             self._is_connected = False
@@ -312,7 +337,7 @@ class User(DBUser, BaseUser):
         self._is_connected = False
         self._is_connected = False
         await self.update()
         await self.update()
 
 
-    async def logout(self) -> None:
+    async def logout(self, from_error: bool = False) -> None:
         if self.client:
         if self.client:
             try:
             try:
                 await self.client.logout(one_tap_app_login=False)
                 await self.client.logout(one_tap_app_login=False)
@@ -322,17 +347,20 @@ class User(DBUser, BaseUser):
             self.mqtt.disconnect()
             self.mqtt.disconnect()
         self._track_metric(METRIC_CONNECTED, False)
         self._track_metric(METRIC_CONNECTED, False)
         self._track_metric(METRIC_LOGGED_IN, False)
         self._track_metric(METRIC_LOGGED_IN, False)
-        puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
-        if puppet and puppet.is_real_user:
-            await puppet.switch_mxid(None, None)
-        try:
-            del self.by_igpk[self.igpk]
-        except KeyError:
-            pass
+        if not from_error:
+            puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
+            if puppet and puppet.is_real_user:
+                await puppet.switch_mxid(None, None)
+            try:
+                del self.by_igpk[self.igpk]
+            except KeyError:
+                pass
+            self.igpk = None
+        else:
+            await self.push_bridge_state(ok=False, error="ig-logged-out")
         self.client = None
         self.client = None
         self.mqtt = None
         self.mqtt = None
         self.state = None
         self.state = None
-        self.igpk = None
         self._is_logged_in = False
         self._is_logged_in = False
         await self.update()
         await self.update()
 
 

+ 4 - 1
mautrix_instagram/web/provisioning_api.py

@@ -102,7 +102,10 @@ class ProvisioningAPI:
                 # TODO maybe don't always log out?
                 # TODO maybe don't always log out?
                 self.log.exception(f"Got error checking current user for %s, logging out. %s",
                 self.log.exception(f"Got error checking current user for %s, logging out. %s",
                                    user.mxid, e.body.json())
                                    user.mxid, e.body.json())
-                await user.logout()
+                await user.send_bridge_notice(f"You have been logged out of Instagram: {e!s}",
+                                              important=True, error_code="ig-auth-error",
+                                              error_message=str(e))
+                await user.logout(from_error=True)
             else:
             else:
                 data["instagram"] = resp.user.serialize()
                 data["instagram"] = resp.user.serialize()
                 pl = user.state.device.payload
                 pl = user.state.device.payload

+ 2 - 2
requirements.txt

@@ -4,7 +4,7 @@ commonmark>=0.8,<0.10
 aiohttp>=3,<4
 aiohttp>=3,<4
 yarl>=1,<2
 yarl>=1,<2
 attrs>=20.1
 attrs>=20.1
-mautrix>=0.9,<0.10
-asyncpg>=0.20,<0.23
+mautrix>=0.9.3,<0.10
+asyncpg>=0.20,<0.24
 pycryptodome>=3,<4
 pycryptodome>=3,<4
 paho-mqtt>=1.5,<2
 paho-mqtt>=1.5,<2