Forráskód Böngészése

Add bridge_connected metric

Tulir Asokan 4 éve
szülő
commit
544aee708c
4 módosított fájl, 56 hozzáadás és 3 törlés
  1. 18 1
      mausignald/signald.py
  2. 12 0
      mausignald/types.py
  3. 7 1
      mautrix_signal/signal.py
  4. 19 1
      mautrix_signal/user.py

+ 18 - 1
mausignald/signald.py

@@ -12,7 +12,7 @@ from mautrix.util.logging import TraceLogger
 from .rpc import SignaldRPCClient
 from .rpc import SignaldRPCClient
 from .errors import UnexpectedError, UnexpectedResponse, make_linking_error
 from .errors import UnexpectedError, UnexpectedResponse, make_linking_error
 from .types import (Address, Quote, Attachment, Reaction, Account, Message, Contact, Group,
 from .types import (Address, Quote, Attachment, Reaction, Account, Message, Contact, Group,
-                    Profile, GroupID, Identity, GetIdentitiesResponse)
+                    Profile, GroupID, GetIdentitiesResponse, ListenEvent, ListenAction)
 
 
 T = TypeVar('T')
 T = TypeVar('T')
 EventHandler = Callable[[T], Awaitable[None]]
 EventHandler = Callable[[T], Awaitable[None]]
@@ -27,6 +27,9 @@ class SignaldClient(SignaldRPCClient):
         super().__init__(socket_path, log, loop)
         super().__init__(socket_path, log, loop)
         self._event_handlers = {}
         self._event_handlers = {}
         self.add_rpc_handler("message", self._parse_message)
         self.add_rpc_handler("message", self._parse_message)
+        self.add_rpc_handler("listen_started", self._parse_listen_start)
+        self.add_rpc_handler("listen_stopped", self._parse_listen_stop)
+        self.add_rpc_handler("version", self._log_version)
 
 
     def add_event_handler(self, event_class: Type[T], handler: EventHandler) -> None:
     def add_event_handler(self, event_class: Type[T], handler: EventHandler) -> None:
         self._event_handlers.setdefault(event_class, []).append(handler)
         self._event_handlers.setdefault(event_class, []).append(handler)
@@ -55,6 +58,20 @@ class SignaldClient(SignaldRPCClient):
         event = event_class.deserialize(event_data)
         event = event_class.deserialize(event_data)
         await self._run_event_handler(event)
         await self._run_event_handler(event)
 
 
+    async def _log_version(self, data: Dict[str, Any]) -> None:
+        name = data["data"]["name"]
+        version = data["data"]["version"]
+        self.log.info(f"Connected to {name} v{version}")
+
+    async def _parse_listen_start(self, data: Dict[str, Any]) -> None:
+        evt = ListenEvent(action=ListenAction.STARTED, username=data["data"])
+        await self._run_event_handler(evt)
+
+    async def _parse_listen_stop(self, data: Dict[str, Any]) -> None:
+        evt = ListenEvent(action=ListenAction.STARTED, username=data["data"],
+                          exception=data.get("exception", None))
+        await self._run_event_handler(evt)
+
     async def subscribe(self, username: str) -> bool:
     async def subscribe(self, username: str) -> bool:
         try:
         try:
             await self.request("subscribe", "subscribed", username=username)
             await self.request("subscribe", "subscribed", username=username)

+ 12 - 0
mausignald/types.py

@@ -251,3 +251,15 @@ class Message(SerializableAttrs['Message']):
     sync_message: Optional[SyncMessage] = attr.ib(default=None, metadata={"json": "syncMessage"})
     sync_message: Optional[SyncMessage] = attr.ib(default=None, metadata={"json": "syncMessage"})
     typing: Optional[TypingNotification] = None
     typing: Optional[TypingNotification] = None
     receipt: Optional[Receipt] = None
     receipt: Optional[Receipt] = None
+
+
+class ListenAction(SerializableEnum):
+    STARTED = "started"
+    STOPPED = "stopped"
+
+
+@dataclass
+class ListenEvent(SerializableAttrs['ListenEvent']):
+    action: ListenAction
+    username: str
+    exception: Optional[str] = None

+ 7 - 1
mautrix_signal/signal.py

@@ -19,7 +19,7 @@ import logging
 
 
 from mausignald import SignaldClient
 from mausignald import SignaldClient
 from mausignald.types import (Message, MessageData, Address, TypingNotification, TypingAction,
 from mausignald.types import (Message, MessageData, Address, TypingNotification, TypingAction,
-                              OwnReadReceipt, Receipt, ReceiptType)
+                              OwnReadReceipt, Receipt, ReceiptType, ListenEvent)
 from mautrix.util.logging import TraceLogger
 from mautrix.util.logging import TraceLogger
 
 
 from .db import Message as DBMessage
 from .db import Message as DBMessage
@@ -39,6 +39,7 @@ class SignalHandler(SignaldClient):
     def __init__(self, bridge: 'SignalBridge') -> None:
     def __init__(self, bridge: 'SignalBridge') -> None:
         super().__init__(bridge.config["signal.socket_path"], loop=bridge.loop)
         super().__init__(bridge.config["signal.socket_path"], loop=bridge.loop)
         self.add_event_handler(Message, self.on_message)
         self.add_event_handler(Message, self.on_message)
+        self.add_event_handler(ListenEvent, self.on_listen)
 
 
     async def on_message(self, evt: Message) -> None:
     async def on_message(self, evt: Message) -> None:
         sender = await pu.Puppet.get_by_address(evt.source)
         sender = await pu.Puppet.get_by_address(evt.source)
@@ -64,6 +65,11 @@ class SignalHandler(SignaldClient):
                 # Typing notification from own device
                 # Typing notification from own device
                 pass
                 pass
 
 
+    @staticmethod
+    async def on_listen(evt: ListenEvent) -> None:
+        user = await u.User.get_by_username(evt.username)
+        user.on_listen(evt)
+
     @staticmethod
     @staticmethod
     async def handle_message(user: 'u.User', sender: 'pu.Puppet', msg: MessageData,
     async def handle_message(user: 'u.User', sender: 'pu.Puppet', msg: MessageData,
                              addr_override: Optional[Address] = None) -> None:
                              addr_override: Optional[Address] = None) -> None:

+ 19 - 1
mautrix_signal/user.py

@@ -14,13 +14,15 @@
 # You should have received a copy of the GNU Affero General Public License
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 from typing import Dict, Optional, AsyncGenerator, TYPE_CHECKING, cast
 from typing import Dict, Optional, AsyncGenerator, TYPE_CHECKING, cast
+from collections import defaultdict
 from uuid import UUID
 from uuid import UUID
 import asyncio
 import asyncio
 
 
-from mausignald.types import Account, Address, Contact, Group
+from mausignald.types import Account, Address, Contact, Group, ListenEvent, ListenAction
 from mautrix.bridge import BaseUser
 from mautrix.bridge import BaseUser
 from mautrix.types import UserID, RoomID
 from mautrix.types import UserID, RoomID
 from mautrix.appservice import AppService
 from mautrix.appservice import AppService
+from mautrix.util.opt_prometheus import Gauge
 
 
 from .db import User as DBUser
 from .db import User as DBUser
 from .config import Config
 from .config import Config
@@ -29,6 +31,8 @@ from . import puppet as pu, portal as po
 if TYPE_CHECKING:
 if TYPE_CHECKING:
     from .__main__ import SignalBridge
     from .__main__ import SignalBridge
 
 
+METRIC_CONNECTED = Gauge('bridge_connected', 'Bridge users connected to Signal')
+
 
 
 class User(DBUser, BaseUser):
 class User(DBUser, BaseUser):
     by_mxid: Dict[UserID, 'User'] = {}
     by_mxid: Dict[UserID, 'User'] = {}
@@ -52,6 +56,7 @@ class User(DBUser, BaseUser):
         self.log = self.log.getChild(self.mxid)
         self.log = self.log.getChild(self.mxid)
         self.dm_update_lock = asyncio.Lock()
         self.dm_update_lock = asyncio.Lock()
         self.command_status = None
         self.command_status = None
+        self._metric_value = defaultdict(lambda: False)
 
 
     @classmethod
     @classmethod
     def init_cls(cls, bridge: 'SignalBridge') -> None:
     def init_cls(cls, bridge: 'SignalBridge') -> None:
@@ -76,6 +81,19 @@ class User(DBUser, BaseUser):
         await self.bridge.signal.subscribe(self.username)
         await self.bridge.signal.subscribe(self.username)
         self.loop.create_task(self.sync())
         self.loop.create_task(self.sync())
 
 
+    def on_listen(self, evt: ListenEvent) -> None:
+        if evt.action == ListenAction.STARTED:
+            self.log.info("Connected to Signal")
+            self._track_metric(METRIC_CONNECTED, True)
+        elif evt.action == ListenAction.STOPPED:
+            if evt.exception:
+                self.log.warning(f"Disconnected from Signal: {evt.exception}")
+            else:
+                self.log.info("Disconnected from Signal")
+            self._track_metric(METRIC_CONNECTED, False)
+        else:
+            self.log.warning(f"Unrecognized listen action {evt.action}")
+
     async def _sync_puppet(self) -> None:
     async def _sync_puppet(self) -> None:
         puppet = await pu.Puppet.get_by_address(self.address)
         puppet = await pu.Puppet.get_by_address(self.address)
         if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
         if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):