|
@@ -371,8 +371,10 @@ class Portal(DBPortal, BasePortal):
|
|
try:
|
|
try:
|
|
retry_count = await self._signal_send_with_retries(
|
|
retry_count = await self._signal_send_with_retries(
|
|
sender,
|
|
sender,
|
|
- message,
|
|
|
|
event_id,
|
|
event_id,
|
|
|
|
+ message_type=message.msgtype,
|
|
|
|
+ send_fn=lambda *args, **kwargs: self.signal.send(**kwargs),
|
|
|
|
+ event_type=EventType.ROOM_MESSAGE,
|
|
username=sender.username,
|
|
username=sender.username,
|
|
recipient=self.chat_id,
|
|
recipient=self.chat_id,
|
|
body=text,
|
|
body=text,
|
|
@@ -420,14 +422,20 @@ class Portal(DBPortal, BasePortal):
|
|
await Portal._expire_event(dm.room_id, dm.mxid)
|
|
await Portal._expire_event(dm.room_id, dm.mxid)
|
|
|
|
|
|
async def _signal_send_with_retries(
|
|
async def _signal_send_with_retries(
|
|
- self, sender: u.User, message: MessageEventContent, event_id: EventID, **send_args
|
|
|
|
|
|
+ self,
|
|
|
|
+ sender: u.User,
|
|
|
|
+ event_id: EventID,
|
|
|
|
+ send_fn: Callable,
|
|
|
|
+ event_type: EventType,
|
|
|
|
+ message_type: MessageType | None = None,
|
|
|
|
+ **send_args,
|
|
) -> int:
|
|
) -> int:
|
|
retry_count = 7
|
|
retry_count = 7
|
|
retry_message_event_id = None
|
|
retry_message_event_id = None
|
|
for retry_num in range(retry_count):
|
|
for retry_num in range(retry_count):
|
|
try:
|
|
try:
|
|
self.log.info(f"Send attempt {retry_num}")
|
|
self.log.info(f"Send attempt {retry_num}")
|
|
- await self.signal.send(**send_args)
|
|
|
|
|
|
+ await send_fn(sender, event_id, **send_args)
|
|
|
|
|
|
# It was successful.
|
|
# It was successful.
|
|
if retry_message_event_id is not None:
|
|
if retry_message_event_id is not None:
|
|
@@ -443,8 +451,8 @@ class Portal(DBPortal, BasePortal):
|
|
MessageSendCheckpointStatus.WILL_RETRY,
|
|
MessageSendCheckpointStatus.WILL_RETRY,
|
|
event_id,
|
|
event_id,
|
|
self.mxid,
|
|
self.mxid,
|
|
- EventType.ROOM_MESSAGE,
|
|
|
|
- message.msgtype,
|
|
|
|
|
|
+ event_type,
|
|
|
|
+ message_type=message_type,
|
|
error=msg,
|
|
error=msg,
|
|
retry_num=retry_num,
|
|
retry_num=retry_num,
|
|
)
|
|
)
|
|
@@ -476,11 +484,41 @@ class Portal(DBPortal, BasePortal):
|
|
|
|
|
|
# Signal doesn't seem to use variation selectors at all
|
|
# Signal doesn't seem to use variation selectors at all
|
|
emoji = variation_selector.remove(emoji)
|
|
emoji = variation_selector.remove(emoji)
|
|
|
|
+ try:
|
|
|
|
+ retry_count = await self._signal_send_with_retries(
|
|
|
|
+ sender,
|
|
|
|
+ event_id,
|
|
|
|
+ send_fn=self._handle_matrix_reaction,
|
|
|
|
+ event_type=EventType.REACTION,
|
|
|
|
+ reacting_to=reacting_to,
|
|
|
|
+ emoji=emoji,
|
|
|
|
+ )
|
|
|
|
+ except Exception as e:
|
|
|
|
+ self.log.exception("Sending reaction failed")
|
|
|
|
+ sender.send_remote_checkpoint(
|
|
|
|
+ MessageSendCheckpointStatus.PERM_FAILURE,
|
|
|
|
+ event_id,
|
|
|
|
+ self.mxid,
|
|
|
|
+ EventType.REACTION,
|
|
|
|
+ error=e,
|
|
|
|
+ )
|
|
|
|
+ else:
|
|
|
|
+ sender.send_remote_checkpoint(
|
|
|
|
+ MessageSendCheckpointStatus.SUCCESS,
|
|
|
|
+ event_id,
|
|
|
|
+ self.mxid,
|
|
|
|
+ EventType.REACTION,
|
|
|
|
+ retry_num=retry_count,
|
|
|
|
+ )
|
|
|
|
+ await self._send_delivery_receipt(event_id)
|
|
|
|
|
|
|
|
+ async def _handle_matrix_reaction(
|
|
|
|
+ self, sender: u.User, event_id: EventID, reacting_to: EventID, emoji: str
|
|
|
|
+ ) -> None:
|
|
message = await DBMessage.get_by_mxid(reacting_to, self.mxid)
|
|
message = await DBMessage.get_by_mxid(reacting_to, self.mxid)
|
|
if not message:
|
|
if not message:
|
|
self.log.debug(f"Ignoring reaction to unknown event {reacting_to}")
|
|
self.log.debug(f"Ignoring reaction to unknown event {reacting_to}")
|
|
- return
|
|
|
|
|
|
+ raise NotConnected(f"Ignoring reaction to unknown event {reacting_to}")
|
|
|
|
|
|
existing = await DBReaction.get_by_signal_id(
|
|
existing = await DBReaction.get_by_signal_id(
|
|
self.chat_id, self.receiver, message.sender, message.timestamp, sender.address
|
|
self.chat_id, self.receiver, message.sender, message.timestamp, sender.address
|
|
@@ -497,30 +535,10 @@ class Portal(DBPortal, BasePortal):
|
|
target_author=message.sender,
|
|
target_author=message.sender,
|
|
target_sent_timestamp=message.timestamp,
|
|
target_sent_timestamp=message.timestamp,
|
|
)
|
|
)
|
|
- try:
|
|
|
|
- await self.signal.react(
|
|
|
|
- username=sender.username, recipient=self.chat_id, reaction=reaction
|
|
|
|
- )
|
|
|
|
- except Exception as e:
|
|
|
|
- sender.send_remote_checkpoint(
|
|
|
|
- MessageSendCheckpointStatus.PERM_FAILURE,
|
|
|
|
- event_id,
|
|
|
|
- self.mxid,
|
|
|
|
- EventType.REACTION,
|
|
|
|
- error=e,
|
|
|
|
- )
|
|
|
|
- else:
|
|
|
|
- self.log.trace(f"{sender.mxid} reacted to {message.timestamp} with {emoji}")
|
|
|
|
- sender.send_remote_checkpoint(
|
|
|
|
- MessageSendCheckpointStatus.SUCCESS,
|
|
|
|
- event_id,
|
|
|
|
- self.mxid,
|
|
|
|
- EventType.REACTION,
|
|
|
|
- )
|
|
|
|
- await self._upsert_reaction(
|
|
|
|
- existing, self.main_intent, event_id, sender, message, emoji
|
|
|
|
- )
|
|
|
|
- await self._send_delivery_receipt(event_id)
|
|
|
|
|
|
+ self.log.trace(f"{sender.mxid} reacted to {message.timestamp} with {emoji}")
|
|
|
|
+ await self.signal.react(sender.username, recipient=self.chat_id, reaction=reaction)
|
|
|
|
+
|
|
|
|
+ await self._upsert_reaction(existing, self.main_intent, event_id, sender, message, emoji)
|
|
|
|
|
|
async def handle_matrix_redaction(
|
|
async def handle_matrix_redaction(
|
|
self, sender: u.User, event_id: EventID, redaction_event_id: EventID
|
|
self, sender: u.User, event_id: EventID, redaction_event_id: EventID
|