|
@@ -110,6 +110,10 @@ ChatInfo = Union[Group, GroupV2, GroupV2ID, Contact, Profile, Address]
|
|
MAX_MATRIX_MESSAGE_SIZE = 60000
|
|
MAX_MATRIX_MESSAGE_SIZE = 60000
|
|
|
|
|
|
|
|
|
|
|
|
+class UnknownReactionTarget(Exception):
|
|
|
|
+ pass
|
|
|
|
+
|
|
|
|
+
|
|
class Portal(DBPortal, BasePortal):
|
|
class Portal(DBPortal, BasePortal):
|
|
by_mxid: dict[RoomID, Portal] = {}
|
|
by_mxid: dict[RoomID, Portal] = {}
|
|
by_chat_id: dict[tuple[str, str], Portal] = {}
|
|
by_chat_id: dict[tuple[str, str], Portal] = {}
|
|
@@ -371,8 +375,10 @@ class Portal(DBPortal, BasePortal):
|
|
try:
|
|
try:
|
|
retry_count = await self._signal_send_with_retries(
|
|
retry_count = await self._signal_send_with_retries(
|
|
sender,
|
|
sender,
|
|
- message,
|
|
|
|
event_id,
|
|
event_id,
|
|
|
|
+ message_type=message.msgtype,
|
|
|
|
+ send_fn=lambda *args, **kwargs: self.signal.send(**kwargs),
|
|
|
|
+ event_type=EventType.ROOM_MESSAGE,
|
|
username=sender.username,
|
|
username=sender.username,
|
|
recipient=self.chat_id,
|
|
recipient=self.chat_id,
|
|
body=text,
|
|
body=text,
|
|
@@ -420,41 +426,55 @@ class Portal(DBPortal, BasePortal):
|
|
await Portal._expire_event(dm.room_id, dm.mxid)
|
|
await Portal._expire_event(dm.room_id, dm.mxid)
|
|
|
|
|
|
async def _signal_send_with_retries(
|
|
async def _signal_send_with_retries(
|
|
- self, sender: u.User, message: MessageEventContent, event_id: EventID, **send_args
|
|
|
|
|
|
+ self,
|
|
|
|
+ sender: u.User,
|
|
|
|
+ event_id: EventID,
|
|
|
|
+ send_fn: Callable,
|
|
|
|
+ event_type: EventType,
|
|
|
|
+ message_type: MessageType | None = None,
|
|
|
|
+ **send_args,
|
|
) -> int:
|
|
) -> int:
|
|
retry_count = 7
|
|
retry_count = 7
|
|
retry_message_event_id = None
|
|
retry_message_event_id = None
|
|
for retry_num in range(retry_count):
|
|
for retry_num in range(retry_count):
|
|
try:
|
|
try:
|
|
self.log.info(f"Send attempt {retry_num}")
|
|
self.log.info(f"Send attempt {retry_num}")
|
|
- await self.signal.send(**send_args)
|
|
|
|
|
|
+ await send_fn(sender, event_id, **send_args)
|
|
|
|
|
|
# It was successful.
|
|
# It was successful.
|
|
if retry_message_event_id is not None:
|
|
if retry_message_event_id is not None:
|
|
await self.main_intent.redact(self.mxid, retry_message_event_id)
|
|
await self.main_intent.redact(self.mxid, retry_message_event_id)
|
|
return retry_num
|
|
return retry_num
|
|
- except NotConnected as e:
|
|
|
|
- # Only handle NotConnected exceptions so that other exceptions actually continue to
|
|
|
|
- # error.
|
|
|
|
|
|
+ except (NotConnected, UnknownReactionTarget) as e:
|
|
|
|
+ # Only handle NotConnected and UnknownReactionTarget exceptions so that other
|
|
|
|
+ # exceptions actually continue to error.
|
|
sleep_seconds = (retry_num + 1) ** 2
|
|
sleep_seconds = (retry_num + 1) ** 2
|
|
- msg = f"Not connected to signald. Going to sleep for {sleep_seconds}s. Error: {e}"
|
|
|
|
|
|
+ msg = (
|
|
|
|
+ f"Not connected to signald. Going to sleep for {sleep_seconds}s. Error: {e}"
|
|
|
|
+ if isinstance(e, NotConnected)
|
|
|
|
+ else f"UnknownReactionTarget: Going to sleep for {sleep_seconds}s. Error: {e}"
|
|
|
|
+ )
|
|
self.log.exception(msg)
|
|
self.log.exception(msg)
|
|
sender.send_remote_checkpoint(
|
|
sender.send_remote_checkpoint(
|
|
MessageSendCheckpointStatus.WILL_RETRY,
|
|
MessageSendCheckpointStatus.WILL_RETRY,
|
|
event_id,
|
|
event_id,
|
|
self.mxid,
|
|
self.mxid,
|
|
- EventType.ROOM_MESSAGE,
|
|
|
|
- message.msgtype,
|
|
|
|
|
|
+ event_type,
|
|
|
|
+ message_type=message_type,
|
|
error=msg,
|
|
error=msg,
|
|
retry_num=retry_num,
|
|
retry_num=retry_num,
|
|
)
|
|
)
|
|
|
|
|
|
if retry_num > 2:
|
|
if retry_num > 2:
|
|
# User has waited > ~15 seconds, send a notice that we are retrying.
|
|
# User has waited > ~15 seconds, send a notice that we are retrying.
|
|
|
|
+ user_friendly_message = (
|
|
|
|
+ "There was an error connecting to signald."
|
|
|
|
+ if isinstance(e, NotConnected)
|
|
|
|
+ else "Could not find message to react to on Signal."
|
|
|
|
+ )
|
|
event_content = TextMessageEventContent(
|
|
event_content = TextMessageEventContent(
|
|
MessageType.NOTICE,
|
|
MessageType.NOTICE,
|
|
- f"There was an error connecting to signald. Waiting for {sleep_seconds} "
|
|
|
|
- "before retrying.",
|
|
|
|
|
|
+ f"{user_friendly_message} Waiting for {sleep_seconds} before retrying.",
|
|
)
|
|
)
|
|
if retry_message_event_id is not None:
|
|
if retry_message_event_id is not None:
|
|
event_content.set_edit(retry_message_event_id)
|
|
event_content.set_edit(retry_message_event_id)
|
|
@@ -465,7 +485,11 @@ class Portal(DBPortal, BasePortal):
|
|
|
|
|
|
if retry_message_event_id is not None:
|
|
if retry_message_event_id is not None:
|
|
await self.main_intent.redact(self.mxid, retry_message_event_id)
|
|
await self.main_intent.redact(self.mxid, retry_message_event_id)
|
|
- raise NotConnected(f"Connection to signald still did not work after {retry_count} retries")
|
|
|
|
|
|
+ event_type_name = {
|
|
|
|
+ EventType.ROOM_MESSAGE: "message",
|
|
|
|
+ EventType.REACTION: "reaction",
|
|
|
|
+ }.get(event_type, str(event_type))
|
|
|
|
+ raise NotConnected(f"Failed to send {event_type_name} after {retry_count} retries.")
|
|
|
|
|
|
async def handle_matrix_reaction(
|
|
async def handle_matrix_reaction(
|
|
self, sender: u.User, event_id: EventID, reacting_to: EventID, emoji: str
|
|
self, sender: u.User, event_id: EventID, reacting_to: EventID, emoji: str
|
|
@@ -476,11 +500,41 @@ class Portal(DBPortal, BasePortal):
|
|
|
|
|
|
# Signal doesn't seem to use variation selectors at all
|
|
# Signal doesn't seem to use variation selectors at all
|
|
emoji = variation_selector.remove(emoji)
|
|
emoji = variation_selector.remove(emoji)
|
|
|
|
+ try:
|
|
|
|
+ retry_count = await self._signal_send_with_retries(
|
|
|
|
+ sender,
|
|
|
|
+ event_id,
|
|
|
|
+ send_fn=self._handle_matrix_reaction,
|
|
|
|
+ event_type=EventType.REACTION,
|
|
|
|
+ reacting_to=reacting_to,
|
|
|
|
+ emoji=emoji,
|
|
|
|
+ )
|
|
|
|
+ except Exception as e:
|
|
|
|
+ self.log.exception("Sending reaction failed")
|
|
|
|
+ sender.send_remote_checkpoint(
|
|
|
|
+ MessageSendCheckpointStatus.PERM_FAILURE,
|
|
|
|
+ event_id,
|
|
|
|
+ self.mxid,
|
|
|
|
+ EventType.REACTION,
|
|
|
|
+ error=e,
|
|
|
|
+ )
|
|
|
|
+ else:
|
|
|
|
+ sender.send_remote_checkpoint(
|
|
|
|
+ MessageSendCheckpointStatus.SUCCESS,
|
|
|
|
+ event_id,
|
|
|
|
+ self.mxid,
|
|
|
|
+ EventType.REACTION,
|
|
|
|
+ retry_num=retry_count,
|
|
|
|
+ )
|
|
|
|
+ await self._send_delivery_receipt(event_id)
|
|
|
|
|
|
|
|
+ async def _handle_matrix_reaction(
|
|
|
|
+ self, sender: u.User, event_id: EventID, reacting_to: EventID, emoji: str
|
|
|
|
+ ) -> None:
|
|
message = await DBMessage.get_by_mxid(reacting_to, self.mxid)
|
|
message = await DBMessage.get_by_mxid(reacting_to, self.mxid)
|
|
if not message:
|
|
if not message:
|
|
self.log.debug(f"Ignoring reaction to unknown event {reacting_to}")
|
|
self.log.debug(f"Ignoring reaction to unknown event {reacting_to}")
|
|
- return
|
|
|
|
|
|
+ raise UnknownReactionTarget(f"Ignoring reaction to unknown event {reacting_to}")
|
|
|
|
|
|
existing = await DBReaction.get_by_signal_id(
|
|
existing = await DBReaction.get_by_signal_id(
|
|
self.chat_id, self.receiver, message.sender, message.timestamp, sender.address
|
|
self.chat_id, self.receiver, message.sender, message.timestamp, sender.address
|
|
@@ -497,30 +551,10 @@ class Portal(DBPortal, BasePortal):
|
|
target_author=message.sender,
|
|
target_author=message.sender,
|
|
target_sent_timestamp=message.timestamp,
|
|
target_sent_timestamp=message.timestamp,
|
|
)
|
|
)
|
|
- try:
|
|
|
|
- await self.signal.react(
|
|
|
|
- username=sender.username, recipient=self.chat_id, reaction=reaction
|
|
|
|
- )
|
|
|
|
- except Exception as e:
|
|
|
|
- sender.send_remote_checkpoint(
|
|
|
|
- MessageSendCheckpointStatus.PERM_FAILURE,
|
|
|
|
- event_id,
|
|
|
|
- self.mxid,
|
|
|
|
- EventType.REACTION,
|
|
|
|
- error=e,
|
|
|
|
- )
|
|
|
|
- else:
|
|
|
|
- self.log.trace(f"{sender.mxid} reacted to {message.timestamp} with {emoji}")
|
|
|
|
- sender.send_remote_checkpoint(
|
|
|
|
- MessageSendCheckpointStatus.SUCCESS,
|
|
|
|
- event_id,
|
|
|
|
- self.mxid,
|
|
|
|
- EventType.REACTION,
|
|
|
|
- )
|
|
|
|
- await self._upsert_reaction(
|
|
|
|
- existing, self.main_intent, event_id, sender, message, emoji
|
|
|
|
- )
|
|
|
|
- await self._send_delivery_receipt(event_id)
|
|
|
|
|
|
+ self.log.trace(f"{sender.mxid} reacted to {message.timestamp} with {emoji}")
|
|
|
|
+ await self.signal.react(sender.username, recipient=self.chat_id, reaction=reaction)
|
|
|
|
+
|
|
|
|
+ await self._upsert_reaction(existing, self.main_intent, event_id, sender, message, emoji)
|
|
|
|
|
|
async def handle_matrix_redaction(
|
|
async def handle_matrix_redaction(
|
|
self, sender: u.User, event_id: EventID, redaction_event_id: EventID
|
|
self, sender: u.User, event_id: EventID, redaction_event_id: EventID
|