Browse Source

Merge pull request #20 from mautrix/sumner/bri-827-add-bridge-and-remote-message-tracking

Tulir Asokan 3 years ago
parent
commit
13b6b157b6

+ 8 - 4
mautrix_instagram/__main__.py

@@ -61,8 +61,8 @@ class InstagramBridge(Bridge):
         self.state_store = PgBridgeStateStore(self.db, self.get_puppet, self.get_double_puppet)
         self.state_store = PgBridgeStateStore(self.db, self.get_puppet, self.get_double_puppet)
 
 
     def prepare_db(self) -> None:
     def prepare_db(self) -> None:
-        self.db = Database(self.config["appservice.database"], upgrade_table=upgrade_table,
-                           loop=self.loop, db_args=self.config["appservice.database_opts"])
+        self.db = Database.create(self.config["appservice.database"], upgrade_table=upgrade_table,
+                                  db_args=self.config["appservice.database_opts"])
         init_db(self.db)
         init_db(self.db)
 
 
     def prepare_bridge(self) -> None:
     def prepare_bridge(self) -> None:
@@ -74,9 +74,9 @@ class InstagramBridge(Bridge):
 
 
     async def start(self) -> None:
     async def start(self) -> None:
         await self.db.start()
         await self.db.start()
-        await self.state_store.upgrade_table.upgrade(self.db.pool)
+        await self.state_store.upgrade_table.upgrade(self.db)
         if self.matrix.e2ee:
         if self.matrix.e2ee:
-            self.matrix.e2ee.crypto_db.override_pool(self.db.pool)
+            self.matrix.e2ee.crypto_db.override_pool(self.db)
         self.add_startup_actions(User.init_cls(self))
         self.add_startup_actions(User.init_cls(self))
         self.add_startup_actions(Puppet.init_cls(self))
         self.add_startup_actions(Puppet.init_cls(self))
         Portal.init_cls(self)
         Portal.init_cls(self)
@@ -92,6 +92,10 @@ class InstagramBridge(Bridge):
         for puppet in Puppet.by_custom_mxid.values():
         for puppet in Puppet.by_custom_mxid.values():
             puppet.stop()
             puppet.stop()
 
 
+    async def stop(self) -> None:
+        await super().stop()
+        await self.db.stop()
+
     async def resend_bridge_info(self) -> None:
     async def resend_bridge_info(self) -> None:
         self.config["bridge.resend_bridge_info"] = False
         self.config["bridge.resend_bridge_info"] = False
         self.config.save()
         self.config.save()

+ 3 - 1
mautrix_instagram/example-config.yaml

@@ -11,9 +11,11 @@ homeserver:
     # Number of retries for all HTTP requests if the homeserver isn't reachable.
     # Number of retries for all HTTP requests if the homeserver isn't reachable.
     http_retry_count: 4
     http_retry_count: 4
     # The URL to push real-time bridge status to.
     # The URL to push real-time bridge status to.
-    # If set, the bridge will make POST requests to this URL whenever a user's Facebook MQTT connection state changes.
+    # If set, the bridge will make POST requests to this URL whenever a user's Instagram MQTT connection state changes.
     # The bridge will use the appservice as_token to authorize requests.
     # The bridge will use the appservice as_token to authorize requests.
     status_endpoint: null
     status_endpoint: null
+    # Endpoint for reporting per-message status.
+    message_send_checkpoint_endpoint: null
 
 
 # Application service host/registration related details
 # Application service host/registration related details
 # Changing these values requires regeneration of the registration.
 # Changing these values requires regeneration of the registration.

+ 131 - 27
mautrix_instagram/portal.py

@@ -22,6 +22,7 @@ import asyncio
 
 
 import asyncpg
 import asyncpg
 import magic
 import magic
+from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
 
 
 from mauigpapi.types import (Thread, ThreadUser, ThreadItem, RegularMediaItem, MediaType,
 from mauigpapi.types import (Thread, ThreadUser, ThreadItem, RegularMediaItem, MediaType,
                              ReactionStatus, Reaction, AnimatedMediaItem, ThreadItemType,
                              ReactionStatus, Reaction, AnimatedMediaItem, ThreadItemType,
@@ -133,17 +134,32 @@ class Portal(DBPortal, BasePortal):
             except Exception:
             except Exception:
                 self.log.exception("Failed to send delivery receipt for %s", event_id)
                 self.log.exception("Failed to send delivery receipt for %s", event_id)
 
 
-    async def _send_bridge_error(self, msg: str, event_type: str = "message",
-                                 confirmed: bool = False) -> None:
+    async def _send_bridge_error(self, sender: 'u.User', err: Exception, event_id: EventID,
+                                 event_type: EventType,
+                                 message_type: Optional[MessageType] = None,
+                                 msg: Optional[str] = None, confirmed: bool = False) -> None:
+        sender.send_remote_checkpoint(
+            MessageSendCheckpointStatus.PERM_FAILURE,
+            event_id,
+            self.mxid,
+            event_type,
+            message_type=message_type,
+            error=err
+        )
+
         if self.config["bridge.delivery_error_reports"]:
         if self.config["bridge.delivery_error_reports"]:
+            event_type_str = {
+                EventType.REACTION: "reaction",
+                EventType.ROOM_REDACTION: "redaction",
+            }.get(event_type, "message")
             error_type = "was not" if confirmed else "may not have been"
             error_type = "was not" if confirmed else "may not have been"
             await self._send_message(self.main_intent, TextMessageEventContent(
             await self._send_message(self.main_intent, TextMessageEventContent(
                 msgtype=MessageType.NOTICE,
                 msgtype=MessageType.NOTICE,
-                body=f"\u26a0 Your {event_type} {error_type} bridged: {msg}"))
+                body=f"\u26a0 Your {event_type_str} {error_type} bridged: {msg or str(err)}"))
 
 
-    async def _upsert_reaction(self, existing: DBReaction, intent: IntentAPI, mxid: EventID,
-                               message: DBMessage, sender: Union['u.User', 'p.Puppet'],
-                               reaction: str) -> None:
+    async def _upsert_reaction(self, existing: Optional[DBReaction], intent: IntentAPI,
+                               mxid: EventID, message: DBMessage,
+                               sender: Union['u.User', 'p.Puppet'], reaction: str) -> None:
         if existing:
         if existing:
             self.log.debug(f"_upsert_reaction redacting {existing.mxid} and inserting {mxid}"
             self.log.debug(f"_upsert_reaction redacting {existing.mxid} and inserting {mxid}"
                            f" (message: {message.mxid})")
                            f" (message: {message.mxid})")
@@ -162,10 +178,16 @@ class Portal(DBPortal, BasePortal):
                                     event_id: EventID) -> None:
                                     event_id: EventID) -> None:
         try:
         try:
             await self._handle_matrix_message(sender, message, event_id)
             await self._handle_matrix_message(sender, message, event_id)
-        except Exception:
-            self.log.exception(f"Fatal error handling Matrix event {event_id}")
-            await self._send_bridge_error("Fatal error in message handling "
-                                          "(see logs for more details)")
+        except Exception as e:
+            self.log.exception(f"Fatal error handling Matrix event {event_id}: {e}")
+            await self._send_bridge_error(
+                sender,
+                e,
+                event_id,
+                EventType.ROOM_MESSAGE,
+                message_type=message.msgtype,
+                msg="Fatal error in message handling (see logs for more details)",
+            )
 
 
     async def _handle_matrix_message(self, sender: 'u.User', message: MessageEventContent,
     async def _handle_matrix_message(self, sender: 'u.User', message: MessageEventContent,
                                      event_id: EventID) -> None:
                                      event_id: EventID) -> None:
@@ -174,7 +196,14 @@ class Portal(DBPortal, BasePortal):
             self.log.debug(f"Ignoring puppet-sent message by confirmed puppet user {sender.mxid}")
             self.log.debug(f"Ignoring puppet-sent message by confirmed puppet user {sender.mxid}")
             return
             return
         elif not sender.is_connected:
         elif not sender.is_connected:
-            await self._send_bridge_error("You're not connected to Instagram", confirmed=True)
+            await self._send_bridge_error(
+                sender,
+                Exception("You're not connected to Instagram"),
+                event_id,
+                EventType.ROOM_MESSAGE,
+                message_type=message.msgtype,
+                confirmed=True,
+            )
             return
             return
         request_id = sender.state.gen_client_context()
         request_id = sender.state.gen_client_context()
         self._reqid_dedup.add(request_id)
         self._reqid_dedup.add(request_id)
@@ -213,8 +242,14 @@ class Portal(DBPortal, BasePortal):
                                                      upload_id=upload_resp.upload_id,
                                                      upload_id=upload_resp.upload_id,
                                                      allow_full_aspect_ratio="1")
                                                      allow_full_aspect_ratio="1")
             else:
             else:
-                await self._send_bridge_error("Non-image files are currently not supported",
-                                              confirmed=True)
+                await self._send_bridge_error(
+                    sender,
+                    Exception("Non-image files are currently not supported"),
+                    event_id,
+                    EventType.ROOM_MESSAGE,
+                    message_type=message.msgtype,
+                    confirmed=True,
+                )
                 return
                 return
         else:
         else:
             self.log.debug(f"Unhandled Matrix message {event_id}: "
             self.log.debug(f"Unhandled Matrix message {event_id}: "
@@ -223,8 +258,23 @@ class Portal(DBPortal, BasePortal):
         self.log.trace(f"Got response to message send {request_id}: {resp}")
         self.log.trace(f"Got response to message send {request_id}: {resp}")
         if resp.status != "ok":
         if resp.status != "ok":
             self.log.warning(f"Failed to handle {event_id}: {resp}")
             self.log.warning(f"Failed to handle {event_id}: {resp}")
-            await self._send_bridge_error(resp.payload.message)
+            await self._send_bridge_error(
+                sender,
+                Exception(resp.payload.message),
+                event_id,
+                EventType.ROOM_MESSAGE,
+                message_type=message.msgtype,
+                confirmed=True,
+            )
         else:
         else:
+            sender.send_remote_checkpoint(
+                status=MessageSendCheckpointStatus.SUCCESS,
+                event_id=event_id,
+                room_id=self.mxid,
+                event_type=EventType.ROOM_MESSAGE,
+                message_type=message.msgtype,
+            )
+            await self._send_delivery_receipt(event_id)
             self._msgid_dedup.appendleft(resp.payload.item_id)
             self._msgid_dedup.appendleft(resp.payload.item_id)
             try:
             try:
                 await DBMessage(mxid=event_id, mx_room=self.mxid, item_id=resp.payload.item_id,
                 await DBMessage(mxid=event_id, mx_room=self.mxid, item_id=resp.payload.item_id,
@@ -233,7 +283,6 @@ class Portal(DBPortal, BasePortal):
                 self.log.warning(f"Error while persisting {event_id} "
                 self.log.warning(f"Error while persisting {event_id} "
                                  f"({resp.payload.client_context}) -> {resp.payload.item_id}: {e}")
                                  f"({resp.payload.client_context}) -> {resp.payload.item_id}: {e}")
             self._reqid_dedup.remove(request_id)
             self._reqid_dedup.remove(request_id)
-            await self._send_delivery_receipt(event_id)
             self.log.debug(f"Handled Matrix message {event_id} ({resp.payload.client_context}) "
             self.log.debug(f"Handled Matrix message {event_id} ({resp.payload.client_context}) "
                            f"-> {resp.payload.item_id}")
                            f"-> {resp.payload.item_id}")
 
 
@@ -251,33 +300,71 @@ class Portal(DBPortal, BasePortal):
         dedup_id = (message.item_id, sender.igpk, emoji)
         dedup_id = (message.item_id, sender.igpk, emoji)
         self._reaction_dedup.appendleft(dedup_id)
         self._reaction_dedup.appendleft(dedup_id)
         async with self._reaction_lock:
         async with self._reaction_lock:
-            # TODO check response?
-            await sender.mqtt.send_reaction(self.thread_id, item_id=message.item_id, emoji=emoji)
-            await self._upsert_reaction(existing, self.main_intent, event_id, message, sender,
-                                        emoji)
-            self.log.trace(f"{sender.mxid} reacted to {message.item_id} with {emoji}")
-        await self._send_delivery_receipt(event_id)
+            try:
+                resp = await sender.mqtt.send_reaction(self.thread_id, item_id=message.item_id,
+                                                       emoji=emoji)
+                if resp.status != "ok":
+                    raise Exception(f"Failed to react to {event_id}: {resp}")
+            except Exception as e:
+                self.log.exception(f"Failed to handle {event_id}: {e}")
+                await self._send_bridge_error(
+                    sender,
+                    e,
+                    event_id,
+                    EventType.REACTION,
+                    confirmed=True,
+                )
+            else:
+                sender.send_remote_checkpoint(
+                    status=MessageSendCheckpointStatus.SUCCESS,
+                    event_id=event_id,
+                    room_id=self.mxid,
+                    event_type=EventType.REACTION,
+                )
+                await self._send_delivery_receipt(event_id)
+                self.log.trace(f"{sender.mxid} reacted to {message.item_id} with {emoji}")
+                await self._upsert_reaction(existing, self.main_intent, event_id, message, sender,
+                                            emoji)
 
 
     async def handle_matrix_redaction(self, sender: 'u.User', event_id: EventID,
     async def handle_matrix_redaction(self, sender: 'u.User', event_id: EventID,
                                       redaction_event_id: EventID) -> None:
                                       redaction_event_id: EventID) -> None:
         if not self.mxid:
         if not self.mxid:
             return
             return
         elif not sender.is_connected:
         elif not sender.is_connected:
-            await self._send_bridge_error("You're not connected to Instagram",
-                                          event_type="redaction", confirmed=True)
+            await self._send_bridge_error(
+                sender,
+                Exception("You're not connected to Instagram"),
+                event_id,
+                EventType.ROOM_REDACTION,
+                confirmed=True,
+            )
             return
             return
 
 
-        # TODO implement
         reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
         reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
         if reaction:
         if reaction:
             try:
             try:
                 await reaction.delete()
                 await reaction.delete()
                 await sender.mqtt.send_reaction(self.thread_id, item_id=reaction.ig_item_id,
                 await sender.mqtt.send_reaction(self.thread_id, item_id=reaction.ig_item_id,
                                                 reaction_status=ReactionStatus.DELETED, emoji="")
                                                 reaction_status=ReactionStatus.DELETED, emoji="")
+            except Exception as e:
+                self.log.exception("Removing reaction failed")
+                await self._send_bridge_error(
+                    sender,
+                    e,
+                    event_id,
+                    EventType.ROOM_REDACTION,
+                    confirmed=True,
+                    msg=f"Remvoving reaction {reaction.reaction} from {event_id} failed.",
+                )
+            else:
+                sender.send_remote_checkpoint(
+                    status=MessageSendCheckpointStatus.SUCCESS,
+                    event_id=event_id,
+                    room_id=self.mxid,
+                    event_type=EventType.ROOM_REDACTION,
+                )
                 await self._send_delivery_receipt(redaction_event_id)
                 await self._send_delivery_receipt(redaction_event_id)
                 self.log.trace(f"Removed {reaction} after Matrix redaction")
                 self.log.trace(f"Removed {reaction} after Matrix redaction")
-            except Exception:
-                self.log.exception("Removing reaction failed")
             return
             return
 
 
         message = await DBMessage.get_by_mxid(event_id, self.mxid)
         message = await DBMessage.get_by_mxid(event_id, self.mxid)
@@ -286,8 +373,25 @@ class Portal(DBPortal, BasePortal):
                 await message.delete()
                 await message.delete()
                 await sender.client.delete_item(self.thread_id, message.item_id)
                 await sender.client.delete_item(self.thread_id, message.item_id)
                 self.log.trace(f"Removed {message} after Matrix redaction")
                 self.log.trace(f"Removed {message} after Matrix redaction")
-            except Exception:
+            except Exception as e:
                 self.log.exception("Removing message failed")
                 self.log.exception("Removing message failed")
+                await self._send_bridge_error(
+                    sender,
+                    e,
+                    event_id,
+                    EventType.ROOM_REDACTION,
+                    confirmed=True,
+                    msg=f"Remvoving message {event_id} failed.",
+                )
+            else:
+                sender.send_remote_checkpoint(
+                    status=MessageSendCheckpointStatus.SUCCESS,
+                    event_id=event_id,
+                    room_id=self.mxid,
+                    event_type=EventType.ROOM_REDACTION,
+                )
+                await self._send_delivery_receipt(redaction_event_id)
+                self.log.trace(f"Removed {reaction} after Matrix redaction")
 
 
     async def handle_matrix_typing(self, users: Set[UserID]) -> None:
     async def handle_matrix_typing(self, users: Set[UserID]) -> None:
         if users == self._typing:
         if users == self._typing:

+ 1 - 1
requirements.txt

@@ -4,7 +4,7 @@ commonmark>=0.8,<0.10
 aiohttp>=3,<4
 aiohttp>=3,<4
 yarl>=1,<2
 yarl>=1,<2
 attrs>=20.1
 attrs>=20.1
-mautrix>=0.10.5,<0.11
+mautrix>=0.11.2,<0.12
 asyncpg>=0.20,<0.25
 asyncpg>=0.20,<0.25
 pycryptodome>=3,<4
 pycryptodome>=3,<4
 paho-mqtt>=1.5,<2
 paho-mqtt>=1.5,<2