|
@@ -139,9 +139,11 @@ class Portal(DBPortal, BasePortal):
|
|
async def _send_bridge_error(self, sender: 'u.User', err: Union[Exception, str],
|
|
async def _send_bridge_error(self, sender: 'u.User', err: Union[Exception, str],
|
|
event_id: EventID, event_type: EventType,
|
|
event_id: EventID, event_type: EventType,
|
|
message_type: Optional[MessageType] = None,
|
|
message_type: Optional[MessageType] = None,
|
|
- msg: Optional[str] = None, confirmed: bool = False) -> None:
|
|
|
|
|
|
+ msg: Optional[str] = None, confirmed: bool = False,
|
|
|
|
+ status: MessageSendCheckpointStatus = MessageSendCheckpointStatus.PERM_FAILURE,
|
|
|
|
+ ) -> None:
|
|
sender.send_remote_checkpoint(
|
|
sender.send_remote_checkpoint(
|
|
- MessageSendCheckpointStatus.PERM_FAILURE,
|
|
|
|
|
|
+ status,
|
|
event_id,
|
|
event_id,
|
|
self.mxid,
|
|
self.mxid,
|
|
event_type,
|
|
event_type,
|
|
@@ -176,6 +178,11 @@ class Portal(DBPortal, BasePortal):
|
|
# endregion
|
|
# endregion
|
|
# region Matrix event handling
|
|
# region Matrix event handling
|
|
|
|
|
|
|
|
+ def _status_from_exception(self, e: Exception) -> MessageSendCheckpointStatus:
|
|
|
|
+ if isinstance(e, NotImplementedError):
|
|
|
|
+ return MessageSendCheckpointStatus.UNSUPPORTED
|
|
|
|
+ return MessageSendCheckpointStatus.PERM_FAILURE
|
|
|
|
+
|
|
async def handle_matrix_message(self, sender: 'u.User', message: MessageEventContent,
|
|
async def handle_matrix_message(self, sender: 'u.User', message: MessageEventContent,
|
|
event_id: EventID) -> None:
|
|
event_id: EventID) -> None:
|
|
try:
|
|
try:
|
|
@@ -188,33 +195,17 @@ class Portal(DBPortal, BasePortal):
|
|
event_id,
|
|
event_id,
|
|
EventType.ROOM_MESSAGE,
|
|
EventType.ROOM_MESSAGE,
|
|
message_type=message.msgtype,
|
|
message_type=message.msgtype,
|
|
- msg="Fatal error in message handling (see logs for more details)",
|
|
|
|
|
|
+ status=self._status_from_exception(e),
|
|
|
|
+ confirmed=True,
|
|
)
|
|
)
|
|
|
|
|
|
async def _handle_matrix_message(self, orig_sender: 'u.User', message: MessageEventContent,
|
|
async def _handle_matrix_message(self, orig_sender: 'u.User', message: MessageEventContent,
|
|
event_id: EventID) -> None:
|
|
event_id: EventID) -> None:
|
|
sender, is_relay = await self.get_relay_sender(orig_sender, f"message {event_id}")
|
|
sender, is_relay = await self.get_relay_sender(orig_sender, f"message {event_id}")
|
|
- if not sender:
|
|
|
|
- orig_sender.send_remote_checkpoint(
|
|
|
|
- status=MessageSendCheckpointStatus.PERM_FAILURE,
|
|
|
|
- event_id=event_id,
|
|
|
|
- room_id=self.mxid,
|
|
|
|
- event_type=EventType.ROOM_MESSAGE,
|
|
|
|
- message_type=message.msgtype,
|
|
|
|
- error="user is not logged in",
|
|
|
|
- )
|
|
|
|
- return
|
|
|
|
- elif not sender.is_connected:
|
|
|
|
- await self._send_bridge_error(
|
|
|
|
- sender,
|
|
|
|
- "You're not connected to Instagram",
|
|
|
|
- event_id,
|
|
|
|
- EventType.ROOM_MESSAGE,
|
|
|
|
- message_type=message.msgtype,
|
|
|
|
- confirmed=True,
|
|
|
|
- )
|
|
|
|
- return
|
|
|
|
- elif is_relay:
|
|
|
|
|
|
+ assert sender, "user is not logged in"
|
|
|
|
+ assert sender.is_connected, "You're not connected to Instagram"
|
|
|
|
+
|
|
|
|
+ if is_relay:
|
|
await self.apply_relay_message_format(orig_sender, message)
|
|
await self.apply_relay_message_format(orig_sender, message)
|
|
|
|
|
|
request_id = sender.state.gen_client_context()
|
|
request_id = sender.state.gen_client_context()
|
|
@@ -254,30 +245,14 @@ class Portal(DBPortal, BasePortal):
|
|
upload_id=upload_resp.upload_id,
|
|
upload_id=upload_resp.upload_id,
|
|
allow_full_aspect_ratio="1")
|
|
allow_full_aspect_ratio="1")
|
|
else:
|
|
else:
|
|
- await self._send_bridge_error(
|
|
|
|
- sender,
|
|
|
|
- "Non-image files are currently not supported",
|
|
|
|
- event_id,
|
|
|
|
- EventType.ROOM_MESSAGE,
|
|
|
|
- message_type=message.msgtype,
|
|
|
|
- confirmed=True,
|
|
|
|
- )
|
|
|
|
- return
|
|
|
|
|
|
+ raise NotImplementedError("Non-image files are currently not supported")
|
|
else:
|
|
else:
|
|
- self.log.debug(f"Unhandled Matrix message {event_id}: "
|
|
|
|
- f"unknown msgtype {message.msgtype}")
|
|
|
|
- return
|
|
|
|
|
|
+ raise NotImplementedError(f"Unknown message type {message.msgtype}")
|
|
|
|
+
|
|
self.log.trace(f"Got response to message send {request_id}: {resp}")
|
|
self.log.trace(f"Got response to message send {request_id}: {resp}")
|
|
if resp.status != "ok":
|
|
if resp.status != "ok":
|
|
self.log.warning(f"Failed to handle {event_id}: {resp}")
|
|
self.log.warning(f"Failed to handle {event_id}: {resp}")
|
|
- await self._send_bridge_error(
|
|
|
|
- sender,
|
|
|
|
- resp.payload.message,
|
|
|
|
- event_id,
|
|
|
|
- EventType.ROOM_MESSAGE,
|
|
|
|
- message_type=message.msgtype,
|
|
|
|
- confirmed=True,
|
|
|
|
- )
|
|
|
|
|
|
+ raise Exception(f"Failed to handle event. Error: {resp.payload.message}")
|
|
else:
|
|
else:
|
|
sender.send_remote_checkpoint(
|
|
sender.send_remote_checkpoint(
|
|
status=MessageSendCheckpointStatus.SUCCESS,
|
|
status=MessageSendCheckpointStatus.SUCCESS,
|
|
@@ -300,6 +275,22 @@ class Portal(DBPortal, BasePortal):
|
|
|
|
|
|
async def handle_matrix_reaction(self, sender: 'u.User', event_id: EventID,
|
|
async def handle_matrix_reaction(self, sender: 'u.User', event_id: EventID,
|
|
reacting_to: EventID, emoji: str) -> None:
|
|
reacting_to: EventID, emoji: str) -> None:
|
|
|
|
+ try:
|
|
|
|
+ await self._handle_matrix_reaction(sender, event_id, reacting_to, emoji)
|
|
|
|
+ except Exception as e:
|
|
|
|
+ self.log.exception(f"Fatal error handling Matrix event {event_id}: {e}")
|
|
|
|
+ await self._send_bridge_error(
|
|
|
|
+ sender,
|
|
|
|
+ e,
|
|
|
|
+ event_id,
|
|
|
|
+ EventType.REACTION,
|
|
|
|
+ status=self._status_from_exception(e),
|
|
|
|
+ confirmed=True,
|
|
|
|
+ msg="Fatal error handling reaction (see logs for more details)",
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ async def _handle_matrix_reaction(self, sender: 'u.User', event_id: EventID,
|
|
|
|
+ reacting_to: EventID, emoji: str) -> None:
|
|
message = await DBMessage.get_by_mxid(reacting_to, self.mxid)
|
|
message = await DBMessage.get_by_mxid(reacting_to, self.mxid)
|
|
if not message or message.is_internal:
|
|
if not message or message.is_internal:
|
|
self.log.debug(f"Ignoring reaction to unknown event {reacting_to}")
|
|
self.log.debug(f"Ignoring reaction to unknown event {reacting_to}")
|
|
@@ -311,6 +302,12 @@ class Portal(DBPortal, BasePortal):
|
|
|
|
|
|
existing = await DBReaction.get_by_item_id(message.item_id, message.receiver, sender.igpk)
|
|
existing = await DBReaction.get_by_item_id(message.item_id, message.receiver, sender.igpk)
|
|
if existing and existing.reaction == emoji:
|
|
if existing and existing.reaction == emoji:
|
|
|
|
+ sender.send_remote_checkpoint(
|
|
|
|
+ status=MessageSendCheckpointStatus.SUCCESS,
|
|
|
|
+ event_id=event_id,
|
|
|
|
+ room_id=self.mxid,
|
|
|
|
+ event_type=EventType.REACTION,
|
|
|
|
+ )
|
|
return
|
|
return
|
|
|
|
|
|
dedup_id = (message.item_id, sender.igpk, emoji)
|
|
dedup_id = (message.item_id, sender.igpk, emoji)
|
|
@@ -323,13 +320,7 @@ class Portal(DBPortal, BasePortal):
|
|
raise Exception(f"Failed to react to {event_id}: {resp}")
|
|
raise Exception(f"Failed to react to {event_id}: {resp}")
|
|
except Exception as e:
|
|
except Exception as e:
|
|
self.log.exception(f"Failed to handle {event_id}: {e}")
|
|
self.log.exception(f"Failed to handle {event_id}: {e}")
|
|
- await self._send_bridge_error(
|
|
|
|
- sender,
|
|
|
|
- e,
|
|
|
|
- event_id,
|
|
|
|
- EventType.REACTION,
|
|
|
|
- confirmed=True,
|
|
|
|
- )
|
|
|
|
|
|
+ raise
|
|
else:
|
|
else:
|
|
sender.send_remote_checkpoint(
|
|
sender.send_remote_checkpoint(
|
|
status=MessageSendCheckpointStatus.SUCCESS,
|
|
status=MessageSendCheckpointStatus.SUCCESS,
|
|
@@ -344,25 +335,28 @@ class Portal(DBPortal, BasePortal):
|
|
|
|
|
|
async def handle_matrix_redaction(self, orig_sender: 'u.User', event_id: EventID,
|
|
async def handle_matrix_redaction(self, orig_sender: 'u.User', event_id: EventID,
|
|
redaction_event_id: EventID) -> None:
|
|
redaction_event_id: EventID) -> None:
|
|
- sender, _ = await self.get_relay_sender(orig_sender, f"redaction {event_id}")
|
|
|
|
- if not sender:
|
|
|
|
- orig_sender.send_remote_checkpoint(
|
|
|
|
- status=MessageSendCheckpointStatus.PERM_FAILURE,
|
|
|
|
- event_id=redaction_event_id,
|
|
|
|
- room_id=self.mxid,
|
|
|
|
- event_type=EventType.ROOM_REDACTION,
|
|
|
|
- error="user is not logged in",
|
|
|
|
- )
|
|
|
|
- return
|
|
|
|
- elif not sender.is_connected:
|
|
|
|
|
|
+ sender = None
|
|
|
|
+ try:
|
|
|
|
+ sender, _ = await self.get_relay_sender(orig_sender, f"redaction {event_id}")
|
|
|
|
+ if not sender:
|
|
|
|
+ raise Exception("User is not logged in")
|
|
|
|
+
|
|
|
|
+ await self._handle_matrix_redaction(sender, event_id, redaction_event_id)
|
|
|
|
+ except Exception as e:
|
|
|
|
+ self.log.exception(f"Fatal error handling Matrix event {event_id}: {e}")
|
|
await self._send_bridge_error(
|
|
await self._send_bridge_error(
|
|
- sender,
|
|
|
|
- "You're not connected to Instagram",
|
|
|
|
|
|
+ sender or orig_sender,
|
|
|
|
+ e,
|
|
redaction_event_id,
|
|
redaction_event_id,
|
|
EventType.ROOM_REDACTION,
|
|
EventType.ROOM_REDACTION,
|
|
|
|
+ status=self._status_from_exception(e),
|
|
confirmed=True,
|
|
confirmed=True,
|
|
)
|
|
)
|
|
- return
|
|
|
|
|
|
+
|
|
|
|
+ async def _handle_matrix_redaction(self, sender: 'u.User', event_id: EventID,
|
|
|
|
+ redaction_event_id: EventID) -> None:
|
|
|
|
+ if not sender.is_connected:
|
|
|
|
+ raise Exception("You're not connected to Instagram")
|
|
|
|
|
|
reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
|
|
reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
|
|
if reaction:
|
|
if reaction:
|
|
@@ -371,15 +365,7 @@ class Portal(DBPortal, BasePortal):
|
|
await sender.mqtt.send_reaction(self.thread_id, item_id=reaction.ig_item_id,
|
|
await sender.mqtt.send_reaction(self.thread_id, item_id=reaction.ig_item_id,
|
|
reaction_status=ReactionStatus.DELETED, emoji="")
|
|
reaction_status=ReactionStatus.DELETED, emoji="")
|
|
except Exception as e:
|
|
except Exception as e:
|
|
- self.log.exception("Removing reaction failed")
|
|
|
|
- await self._send_bridge_error(
|
|
|
|
- sender,
|
|
|
|
- e,
|
|
|
|
- redaction_event_id,
|
|
|
|
- EventType.ROOM_REDACTION,
|
|
|
|
- confirmed=True,
|
|
|
|
- msg=f"Removing reaction {reaction.reaction} from {event_id} failed.",
|
|
|
|
- )
|
|
|
|
|
|
+ raise Exception(f"Removing reaction failed: {e}")
|
|
else:
|
|
else:
|
|
sender.send_remote_checkpoint(
|
|
sender.send_remote_checkpoint(
|
|
status=MessageSendCheckpointStatus.SUCCESS,
|
|
status=MessageSendCheckpointStatus.SUCCESS,
|
|
@@ -398,15 +384,7 @@ class Portal(DBPortal, BasePortal):
|
|
await sender.client.delete_item(self.thread_id, message.item_id)
|
|
await sender.client.delete_item(self.thread_id, message.item_id)
|
|
self.log.trace(f"Removed {message} after Matrix redaction")
|
|
self.log.trace(f"Removed {message} after Matrix redaction")
|
|
except Exception as e:
|
|
except Exception as e:
|
|
- self.log.exception("Removing message failed")
|
|
|
|
- await self._send_bridge_error(
|
|
|
|
- sender,
|
|
|
|
- e,
|
|
|
|
- redaction_event_id,
|
|
|
|
- EventType.ROOM_REDACTION,
|
|
|
|
- confirmed=True,
|
|
|
|
- msg=f"Removing message {event_id} failed.",
|
|
|
|
- )
|
|
|
|
|
|
+ raise Exception(f"Removing message failed: {e}")
|
|
else:
|
|
else:
|
|
sender.send_remote_checkpoint(
|
|
sender.send_remote_checkpoint(
|
|
status=MessageSendCheckpointStatus.SUCCESS,
|
|
status=MessageSendCheckpointStatus.SUCCESS,
|
|
@@ -418,13 +396,7 @@ class Portal(DBPortal, BasePortal):
|
|
self.log.trace(f"Removed {reaction} after Matrix redaction")
|
|
self.log.trace(f"Removed {reaction} after Matrix redaction")
|
|
return
|
|
return
|
|
|
|
|
|
- sender.send_remote_checkpoint(
|
|
|
|
- MessageSendCheckpointStatus.PERM_FAILURE,
|
|
|
|
- redaction_event_id,
|
|
|
|
- self.mxid,
|
|
|
|
- EventType.ROOM_REDACTION,
|
|
|
|
- error=Exception("No message or reaction found for redaction"),
|
|
|
|
- )
|
|
|
|
|
|
+ raise Exception("No message or reaction found for redaction")
|
|
|
|
|
|
async def handle_matrix_typing(self, users: Set[UserID]) -> None:
|
|
async def handle_matrix_typing(self, users: Set[UserID]) -> None:
|
|
if users == self._typing:
|
|
if users == self._typing:
|