ソースを参照

Merge pull request #155 from mautrix/sumner/bri-729-add-configuration-to-allow-bridge-to

Use message send checkpoints
Sumner Evans 3 年 前
コミット
2e57810e96
3 ファイル変更89 行追加26 行削除
  1. 2 0
      mautrix_signal/example-config.yaml
  2. 86 25
      mautrix_signal/portal.py
  3. 1 1
      requirements.txt

+ 2 - 0
mautrix_signal/example-config.yaml

@@ -14,6 +14,8 @@ homeserver:
     # If set, the bridge will make POST requests to this URL whenever a user's Signal connection state changes.
     # The bridge will use the appservice as_token to authorize requests.
     status_endpoint: null
+    # Endpoint for reporting per-message status.
+    message_send_checkpoint_endpoint: null
 
 # Application service host/registration related details
 # Changing these values requires regeneration of the registration.

+ 86 - 25
mautrix_signal/portal.py

@@ -13,7 +13,6 @@
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
-from mautrix.util.bridge_state import BridgeStateEvent
 from typing import (Dict, Tuple, Optional, List, Deque, Any, Union, AsyncGenerator, Awaitable, Set,
                     Callable, TYPE_CHECKING, cast)
 from html import escape as escape_html
@@ -38,6 +37,8 @@ from mautrix.types import (EventID, MessageEventContent, RoomID, EventType, Mess
                            MessageEvent, EncryptedEvent, ContentURI, MediaMessageEventContent,
                            TextMessageEventContent, ImageInfo, VideoInfo, FileInfo, AudioInfo,
                            PowerLevelStateEventContent, UserID)
+from mautrix.util.bridge_state import BridgeStateEvent
+from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
 from mautrix.errors import MatrixError, MForbidden, IntentError
 
 from .db import Portal as DBPortal, Message as DBMessage, Reaction as DBReaction
@@ -292,6 +293,14 @@ class Portal(DBPortal, BasePortal):
                                    mentions=mentions, quote=quote, attachments=attachments,
                                    timestamp=request_id)
         except Exception as e:
+            sender.send_remote_checkpoint(
+                MessageSendCheckpointStatus.PERM_FAILURE,
+                event_id,
+                self.mxid,
+                EventType.ROOM_MESSAGE,
+                message.msgtype,
+                error=e,
+            )
             auth_failed = (
                 "org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException"
             )
@@ -302,18 +311,26 @@ class Portal(DBPortal, BasePortal):
                 TextMessageEventContent(
                     msgtype=MessageType.NOTICE,
                     body=f"\u26a0 Your message was not bridged: {e}"))
-            return
-        msg = DBMessage(mxid=event_id, mx_room=self.mxid, sender=sender.address,
-                        timestamp=request_id,
-                        signal_chat_id=self.chat_id, signal_receiver=self.receiver)
-        await msg.insert()
-        await self._send_delivery_receipt(event_id)
-        self.log.debug(f"Handled Matrix message {event_id} -> {request_id}")
-        if attachment_path and self.config["signal.remove_file_after_handling"]:
-            try:
-                os.remove(attachment_path)
-            except FileNotFoundError:
-                pass
+        else:
+            sender.send_remote_checkpoint(
+                MessageSendCheckpointStatus.SUCCESS,
+                event_id,
+                self.mxid,
+                EventType.ROOM_MESSAGE,
+                message.msgtype,
+            )
+            await self._send_delivery_receipt(event_id)
+
+            msg = DBMessage(mxid=event_id, mx_room=self.mxid, sender=sender.address,
+                            timestamp=request_id,
+                            signal_chat_id=self.chat_id, signal_receiver=self.receiver)
+            await msg.insert()
+            self.log.debug(f"Handled Matrix message {event_id} -> {request_id}")
+            if attachment_path and self.config["signal.remove_file_after_handling"]:
+                try:
+                    os.remove(attachment_path)
+                except FileNotFoundError:
+                    pass
 
     async def handle_matrix_reaction(self, sender: 'u.User', event_id: EventID,
                                      reacting_to: EventID, emoji: str) -> None:
@@ -340,12 +357,28 @@ class Portal(DBPortal, BasePortal):
             reaction = Reaction(emoji=emoji, remove=False,
                                 target_author=message.sender,
                                 target_sent_timestamp=message.timestamp)
-            await self.signal.react(username=sender.username, recipient=self.chat_id,
-                                    reaction=reaction)
-            await self._upsert_reaction(existing, self.main_intent, event_id, sender, message,
-                                        emoji)
-            self.log.trace(f"{sender.mxid} reacted to {message.timestamp} with {emoji}")
-        await self._send_delivery_receipt(event_id)
+            try:
+                await self.signal.react(username=sender.username, recipient=self.chat_id,
+                                        reaction=reaction)
+            except Exception as e:
+                sender.send_remote_checkpoint(
+                    MessageSendCheckpointStatus.PERM_FAILURE,
+                    event_id,
+                    self.mxid,
+                    EventType.REACTION,
+                    error=e,
+                )
+            else:
+                self.log.trace(f"{sender.mxid} reacted to {message.timestamp} with {emoji}")
+                sender.send_remote_checkpoint(
+                    MessageSendCheckpointStatus.SUCCESS,
+                    event_id,
+                    self.mxid,
+                    EventType.REACTION,
+                )
+                await self._upsert_reaction(existing, self.main_intent, event_id, sender, message,
+                                            emoji)
+                await self._send_delivery_receipt(event_id)
 
     async def handle_matrix_redaction(self, sender: 'u.User', event_id: EventID,
                                       redaction_event_id: EventID) -> None:
@@ -358,10 +391,24 @@ class Portal(DBPortal, BasePortal):
                 await message.delete()
                 await self.signal.remote_delete(sender.username, recipient=self.chat_id,
                                                 timestamp=message.timestamp)
-                await self._send_delivery_receipt(redaction_event_id)
-                self.log.trace(f"Removed {message} after Matrix redaction")
-            except Exception:
+            except Exception as e:
                 self.log.exception("Removing message failed")
+                sender.send_remote_checkpoint(
+                    MessageSendCheckpointStatus.PERM_FAILURE,
+                    event_id,
+                    self.mxid,
+                    EventType.ROOM_REDACTION,
+                    error=e,
+                )
+            else:
+                self.log.trace(f"Removed {message} after Matrix redaction")
+                sender.send_remote_checkpoint(
+                    MessageSendCheckpointStatus.SUCCESS,
+                    event_id,
+                    self.mxid,
+                    EventType.ROOM_REDACTION,
+                )
+                await self._send_delivery_receipt(redaction_event_id)
 
         reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
         if reaction:
@@ -372,10 +419,24 @@ class Portal(DBPortal, BasePortal):
                                            target_sent_timestamp=reaction.msg_timestamp)
                 await self.signal.react(username=sender.username, recipient=self.chat_id,
                                         reaction=remove_reaction)
-                await self._send_delivery_receipt(redaction_event_id)
-                self.log.trace(f"Removed {reaction} after Matrix redaction")
-            except Exception:
+            except Exception as e:
                 self.log.exception("Removing reaction failed")
+                sender.send_remote_checkpoint(
+                    MessageSendCheckpointStatus.PERM_FAILURE,
+                    event_id,
+                    self.mxid,
+                    EventType.ROOM_REDACTION,
+                    error=e,
+                )
+            else:
+                self.log.trace(f"Removed {reaction} after Matrix redaction")
+                sender.send_remote_checkpoint(
+                    MessageSendCheckpointStatus.SUCCESS,
+                    event_id,
+                    self.mxid,
+                    EventType.ROOM_REDACTION,
+                )
+                await self._send_delivery_receipt(redaction_event_id)
 
     async def handle_matrix_join(self, user: 'u.User') -> None:
         if self.is_direct or not await user.is_logged_in():

+ 1 - 1
requirements.txt

@@ -4,5 +4,5 @@ commonmark>=0.8,<0.10
 aiohttp>=3,<4
 yarl>=1,<2
 attrs>=19.1
-mautrix>=0.11.1,<0.12
+mautrix>=0.11.2,<0.12
 asyncpg>=0.20,<0.25