Ver código fonte

portal: report message send checkpoint on all message types

Sumner Evans 3 anos atrás
pai
commit
7fd7bc7a11
1 arquivos alterados com 132 adições e 27 exclusões
  1. 132 27
      mautrix_instagram/portal.py

+ 132 - 27
mautrix_instagram/portal.py

@@ -22,6 +22,7 @@ import asyncio
 
 import asyncpg
 import magic
+from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
 
 from mauigpapi.types import (Thread, ThreadUser, ThreadItem, RegularMediaItem, MediaType,
                              ReactionStatus, Reaction, AnimatedMediaItem, ThreadItemType,
@@ -133,17 +134,33 @@ class Portal(DBPortal, BasePortal):
             except Exception:
                 self.log.exception("Failed to send delivery receipt for %s", event_id)
 
-    async def _send_bridge_error(self, msg: str, event_type: str = "message",
-                                 confirmed: bool = False) -> None:
+    async def _send_bridge_error(self, sender: 'u.User', err: Exception, event_id: EventID,
+                                 event_type: EventType,
+                                 message_type: Optional[MessageType] = None,
+                                 msg: Optional[str] = None, confirmed: bool = False) -> None:
+        if self.config["homeserver.message_send_checkpoint_endpoint"]:
+            sender.send_remote_checkpoint(
+                MessageSendCheckpointStatus.PERM_FAILURE,
+                event_id,
+                self.mxid,
+                event_type,
+                message_type=message_type,
+                error=err
+            )
+
         if self.config["bridge.delivery_error_reports"]:
+            event_type_str = {
+                EventType.REACTION: "reaction",
+                EventType.ROOM_REDACTION: "redaction",
+            }.get(event_type, "message")
             error_type = "was not" if confirmed else "may not have been"
             await self._send_message(self.main_intent, TextMessageEventContent(
                 msgtype=MessageType.NOTICE,
-                body=f"\u26a0 Your {event_type} {error_type} bridged: {msg}"))
+                body=f"\u26a0 Your {event_type_str} {error_type} bridged: {msg or str(err)}"))
 
-    async def _upsert_reaction(self, existing: DBReaction, intent: IntentAPI, mxid: EventID,
-                               message: DBMessage, sender: Union['u.User', 'p.Puppet'],
-                               reaction: str) -> None:
+    async def _upsert_reaction(self, existing: Optional[DBReaction], intent: IntentAPI,
+                               mxid: EventID, message: DBMessage,
+                               sender: Union['u.User', 'p.Puppet'], reaction: str) -> None:
         if existing:
             self.log.debug(f"_upsert_reaction redacting {existing.mxid} and inserting {mxid}"
                            f" (message: {message.mxid})")
@@ -162,10 +179,16 @@ class Portal(DBPortal, BasePortal):
                                     event_id: EventID) -> None:
         try:
             await self._handle_matrix_message(sender, message, event_id)
-        except Exception:
-            self.log.exception(f"Fatal error handling Matrix event {event_id}")
-            await self._send_bridge_error("Fatal error in message handling "
-                                          "(see logs for more details)")
+        except Exception as e:
+            self.log.exception(f"Fatal error handling Matrix event {event_id}: {e}")
+            await self._send_bridge_error(
+                sender,
+                e,
+                event_id,
+                EventType.ROOM_MESSAGE,
+                message_type=message.msgtype,
+                msg="Fatal error in message handling (see logs for more details)",
+            )
 
     async def _handle_matrix_message(self, sender: 'u.User', message: MessageEventContent,
                                      event_id: EventID) -> None:
@@ -174,7 +197,14 @@ class Portal(DBPortal, BasePortal):
             self.log.debug(f"Ignoring puppet-sent message by confirmed puppet user {sender.mxid}")
             return
         elif not sender.is_connected:
-            await self._send_bridge_error("You're not connected to Instagram", confirmed=True)
+            await self._send_bridge_error(
+                sender,
+                Exception("You're not connected to Instagram"),
+                event_id,
+                EventType.ROOM_MESSAGE,
+                message_type=message.msgtype,
+                confirmed=True,
+            )
             return
         request_id = sender.state.gen_client_context()
         self._reqid_dedup.add(request_id)
@@ -213,8 +243,14 @@ class Portal(DBPortal, BasePortal):
                                                      upload_id=upload_resp.upload_id,
                                                      allow_full_aspect_ratio="1")
             else:
-                await self._send_bridge_error("Non-image files are currently not supported",
-                                              confirmed=True)
+                await self._send_bridge_error(
+                    sender,
+                    Exception("Non-image files are currently not supported"),
+                    event_id,
+                    EventType.ROOM_MESSAGE,
+                    message_type=message.msgtype,
+                    confirmed=True,
+                )
                 return
         else:
             self.log.debug(f"Unhandled Matrix message {event_id}: "
@@ -223,8 +259,23 @@ class Portal(DBPortal, BasePortal):
         self.log.trace(f"Got response to message send {request_id}: {resp}")
         if resp.status != "ok":
             self.log.warning(f"Failed to handle {event_id}: {resp}")
-            await self._send_bridge_error(resp.payload.message)
+            await self._send_bridge_error(
+                sender,
+                Exception(resp.payload.message),
+                event_id,
+                EventType.ROOM_MESSAGE,
+                message_type=message.msgtype,
+                confirmed=True,
+            )
         else:
+            sender.send_remote_checkpoint(
+                status=MessageSendCheckpointStatus.SUCCESS,
+                event_id=event_id,
+                room_id=self.mxid,
+                event_type=EventType.ROOM_MESSAGE,
+                message_type=message.msgtype,
+            )
+            await self._send_delivery_receipt(event_id)
             self._msgid_dedup.appendleft(resp.payload.item_id)
             try:
                 await DBMessage(mxid=event_id, mx_room=self.mxid, item_id=resp.payload.item_id,
@@ -233,7 +284,6 @@ class Portal(DBPortal, BasePortal):
                 self.log.warning(f"Error while persisting {event_id} "
                                  f"({resp.payload.client_context}) -> {resp.payload.item_id}: {e}")
             self._reqid_dedup.remove(request_id)
-            await self._send_delivery_receipt(event_id)
             self.log.debug(f"Handled Matrix message {event_id} ({resp.payload.client_context}) "
                            f"-> {resp.payload.item_id}")
 
@@ -251,33 +301,71 @@ class Portal(DBPortal, BasePortal):
         dedup_id = (message.item_id, sender.igpk, emoji)
         self._reaction_dedup.appendleft(dedup_id)
         async with self._reaction_lock:
-            # TODO check response?
-            await sender.mqtt.send_reaction(self.thread_id, item_id=message.item_id, emoji=emoji)
-            await self._upsert_reaction(existing, self.main_intent, event_id, message, sender,
-                                        emoji)
-            self.log.trace(f"{sender.mxid} reacted to {message.item_id} with {emoji}")
-        await self._send_delivery_receipt(event_id)
+            try:
+                resp = await sender.mqtt.send_reaction(self.thread_id, item_id=message.item_id,
+                                                       emoji=emoji)
+                if resp.status != "ok":
+                    raise Exception(f"Failed to react to {event_id}: {resp}")
+            except Exception as e:
+                self.log.exception(f"Failed to handle {event_id}: {e}")
+                await self._send_bridge_error(
+                    sender,
+                    e,
+                    event_id,
+                    EventType.REACTION,
+                    confirmed=True,
+                )
+            else:
+                sender.send_remote_checkpoint(
+                    status=MessageSendCheckpointStatus.SUCCESS,
+                    event_id=event_id,
+                    room_id=self.mxid,
+                    event_type=EventType.REACTION,
+                )
+                await self._send_delivery_receipt(event_id)
+                self.log.trace(f"{sender.mxid} reacted to {message.item_id} with {emoji}")
+                await self._upsert_reaction(existing, self.main_intent, event_id, message, sender,
+                                            emoji)
 
     async def handle_matrix_redaction(self, sender: 'u.User', event_id: EventID,
                                       redaction_event_id: EventID) -> None:
         if not self.mxid:
             return
         elif not sender.is_connected:
-            await self._send_bridge_error("You're not connected to Instagram",
-                                          event_type="redaction", confirmed=True)
+            await self._send_bridge_error(
+                sender,
+                Exception("You're not connected to Instagram"),
+                event_id,
+                EventType.ROOM_REDACTION,
+                confirmed=True,
+            )
             return
 
-        # TODO implement
         reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
         if reaction:
             try:
                 await reaction.delete()
                 await sender.mqtt.send_reaction(self.thread_id, item_id=reaction.ig_item_id,
                                                 reaction_status=ReactionStatus.DELETED, emoji="")
+            except Exception as e:
+                self.log.exception("Removing reaction failed")
+                await self._send_bridge_error(
+                    sender,
+                    e,
+                    event_id,
+                    EventType.ROOM_REDACTION,
+                    confirmed=True,
+                    msg=f"Remvoving reaction {reaction.reaction} from {event_id} failed.",
+                )
+            else:
+                sender.send_remote_checkpoint(
+                    status=MessageSendCheckpointStatus.SUCCESS,
+                    event_id=event_id,
+                    room_id=self.mxid,
+                    event_type=EventType.ROOM_REDACTION,
+                )
                 await self._send_delivery_receipt(redaction_event_id)
                 self.log.trace(f"Removed {reaction} after Matrix redaction")
-            except Exception:
-                self.log.exception("Removing reaction failed")
             return
 
         message = await DBMessage.get_by_mxid(event_id, self.mxid)
@@ -286,8 +374,25 @@ class Portal(DBPortal, BasePortal):
                 await message.delete()
                 await sender.client.delete_item(self.thread_id, message.item_id)
                 self.log.trace(f"Removed {message} after Matrix redaction")
-            except Exception:
+            except Exception as e:
                 self.log.exception("Removing message failed")
+                await self._send_bridge_error(
+                    sender,
+                    e,
+                    event_id,
+                    EventType.ROOM_REDACTION,
+                    confirmed=True,
+                    msg=f"Remvoving message {event_id} failed.",
+                )
+            else:
+                sender.send_remote_checkpoint(
+                    status=MessageSendCheckpointStatus.SUCCESS,
+                    event_id=event_id,
+                    room_id=self.mxid,
+                    event_type=EventType.ROOM_REDACTION,
+                )
+                await self._send_delivery_receipt(redaction_event_id)
+                self.log.trace(f"Removed {reaction} after Matrix redaction")
 
     async def handle_matrix_typing(self, users: Set[UserID]) -> None:
         if users == self._typing: