|
@@ -55,6 +55,7 @@ from mautrix.bridge import BasePortal, RejectMatrixInvite, async_getter_lock
|
|
|
from mautrix.errors import IntentError, MatrixError, MForbidden
|
|
|
from mautrix.types import (
|
|
|
AudioInfo,
|
|
|
+ BeeperMessageStatusEventContent,
|
|
|
ContentURI,
|
|
|
EncryptedEvent,
|
|
|
EncryptedFile,
|
|
@@ -66,8 +67,11 @@ from mautrix.types import (
|
|
|
Membership,
|
|
|
MessageEvent,
|
|
|
MessageEventContent,
|
|
|
+ MessageStatusReason,
|
|
|
MessageType,
|
|
|
PowerLevelStateEventContent,
|
|
|
+ RelatesTo,
|
|
|
+ RelationType,
|
|
|
RoomID,
|
|
|
TextMessageEventContent,
|
|
|
UserID,
|
|
@@ -320,12 +324,53 @@ class Portal(DBPortal, BasePortal):
|
|
|
status, event_id, self.mxid, EventType.ROOM_MESSAGE, message.msgtype, error=e
|
|
|
)
|
|
|
await sender.handle_auth_failure(e)
|
|
|
- await self._send_message(
|
|
|
- self.main_intent,
|
|
|
- TextMessageEventContent(
|
|
|
- msgtype=MessageType.NOTICE, body=f"\u26a0 Your message was not bridged: {e}"
|
|
|
- ),
|
|
|
- )
|
|
|
+ await self._send_error_notice("message", e)
|
|
|
+ asyncio.create_task(self._send_message_status(event_id, e))
|
|
|
+
|
|
|
+ async def _send_error_notice(self, type_name: str, err: Exception) -> None:
|
|
|
+ if not self.config["bridge.delivery_error_reports"]:
|
|
|
+ return
|
|
|
+ message = f"{type(err).__name__}: {err}"
|
|
|
+ if isinstance(err, NotConnected):
|
|
|
+ message = "There was an error connecting to signald."
|
|
|
+ elif isinstance(err, UnknownReactionTarget):
|
|
|
+ message = "Could not find message to react to on Signal."
|
|
|
+ await self._send_message(
|
|
|
+ self.main_intent,
|
|
|
+ TextMessageEventContent(
|
|
|
+ msgtype=MessageType.NOTICE,
|
|
|
+ body=f"\u26a0 Your {type_name} was not bridged: {message}",
|
|
|
+ ),
|
|
|
+ )
|
|
|
+
|
|
|
+ async def _send_message_status(self, event_id: EventID, err: Exception | None) -> None:
|
|
|
+ if not self.config["bridge.message_status_events"]:
|
|
|
+ return
|
|
|
+ intent = self.az.intent if self.encrypted else self.main_intent
|
|
|
+ status = BeeperMessageStatusEventContent(
|
|
|
+ network=self.bridge_info_state_key,
|
|
|
+ relates_to=RelatesTo(
|
|
|
+ rel_type=RelationType.REFERENCE,
|
|
|
+ event_id=event_id,
|
|
|
+ ),
|
|
|
+ success=err is None,
|
|
|
+ )
|
|
|
+ if err:
|
|
|
+ status.reason = MessageStatusReason.GENERIC_ERROR
|
|
|
+ status.error = str(err)
|
|
|
+ status.is_certain = True
|
|
|
+ status.can_retry = True
|
|
|
+ if isinstance(err, AttachmentTooLargeError):
|
|
|
+ status.reason = MessageStatusReason.UNSUPPORTED
|
|
|
+ status.can_retry = False
|
|
|
+ elif isinstance(err, UnknownReactionTarget):
|
|
|
+ status.can_retry = False
|
|
|
+
|
|
|
+ await intent.send_message_event(
|
|
|
+ room_id=self.mxid,
|
|
|
+ event_type=EventType.BEEPER_MESSAGE_STATUS,
|
|
|
+ content=status,
|
|
|
+ )
|
|
|
|
|
|
async def _beeper_link_preview_to_signal(
|
|
|
self, beeper_link_preview: dict[str, Any]
|
|
@@ -406,58 +451,55 @@ class Portal(DBPortal, BasePortal):
|
|
|
return
|
|
|
|
|
|
self.log.debug(f"Sending Matrix message {event_id} to Signal with timestamp {request_id}")
|
|
|
- try:
|
|
|
- retry_count = await self._signal_send_with_retries(
|
|
|
- sender,
|
|
|
- event_id,
|
|
|
- message_type=message.msgtype,
|
|
|
- send_fn=lambda *args, **kwargs: self.signal.send(**kwargs),
|
|
|
- event_type=EventType.ROOM_MESSAGE,
|
|
|
- username=sender.username,
|
|
|
- recipient=self.chat_id,
|
|
|
- body=text,
|
|
|
- mentions=mentions,
|
|
|
- previews=link_previews,
|
|
|
- quote=quote,
|
|
|
- attachments=attachments,
|
|
|
- timestamp=request_id,
|
|
|
- )
|
|
|
- except Exception:
|
|
|
- self.log.exception("Sending message failed")
|
|
|
- raise
|
|
|
- else:
|
|
|
- sender.send_remote_checkpoint(
|
|
|
- MessageSendCheckpointStatus.SUCCESS,
|
|
|
- event_id,
|
|
|
- self.mxid,
|
|
|
- EventType.ROOM_MESSAGE,
|
|
|
- message.msgtype,
|
|
|
- retry_num=retry_count,
|
|
|
- )
|
|
|
- await self._send_delivery_receipt(event_id)
|
|
|
+ retry_count = await self._signal_send_with_retries(
|
|
|
+ sender,
|
|
|
+ event_id,
|
|
|
+ message_type=message.msgtype,
|
|
|
+ send_fn=lambda *args, **kwargs: self.signal.send(**kwargs),
|
|
|
+ event_type=EventType.ROOM_MESSAGE,
|
|
|
+ username=sender.username,
|
|
|
+ recipient=self.chat_id,
|
|
|
+ body=text,
|
|
|
+ mentions=mentions,
|
|
|
+ previews=link_previews,
|
|
|
+ quote=quote,
|
|
|
+ attachments=attachments,
|
|
|
+ timestamp=request_id,
|
|
|
+ )
|
|
|
|
|
|
- msg = DBMessage(
|
|
|
- mxid=event_id,
|
|
|
- mx_room=self.mxid,
|
|
|
- sender=sender.address,
|
|
|
- timestamp=request_id,
|
|
|
- signal_chat_id=self.chat_id,
|
|
|
- signal_receiver=self.receiver,
|
|
|
- )
|
|
|
- await msg.insert()
|
|
|
- self.log.debug(f"Handled Matrix message {event_id} -> {request_id}")
|
|
|
- if attachment_path and self.config["signal.remove_file_after_handling"]:
|
|
|
- try:
|
|
|
- os.remove(attachment_path)
|
|
|
- except FileNotFoundError:
|
|
|
- pass
|
|
|
+ msg = DBMessage(
|
|
|
+ mxid=event_id,
|
|
|
+ mx_room=self.mxid,
|
|
|
+ sender=sender.address,
|
|
|
+ timestamp=request_id,
|
|
|
+ signal_chat_id=self.chat_id,
|
|
|
+ signal_receiver=self.receiver,
|
|
|
+ )
|
|
|
+ await msg.insert()
|
|
|
+ self.log.debug(f"Handled Matrix message {event_id} -> {request_id}")
|
|
|
+ if attachment_path and self.config["signal.remove_file_after_handling"]:
|
|
|
+ try:
|
|
|
+ os.remove(attachment_path)
|
|
|
+ except FileNotFoundError:
|
|
|
+ pass
|
|
|
|
|
|
- # Handle disappearing messages
|
|
|
- if self.expiration_time and self.disappearing_enabled:
|
|
|
- dm = DisappearingMessage(self.mxid, event_id, self.expiration_time)
|
|
|
- dm.start_timer()
|
|
|
- await dm.insert()
|
|
|
- await self._disappear_event(dm)
|
|
|
+ # Handle disappearing messages
|
|
|
+ if self.expiration_time and self.disappearing_enabled:
|
|
|
+ dm = DisappearingMessage(self.mxid, event_id, self.expiration_time)
|
|
|
+ dm.start_timer()
|
|
|
+ await dm.insert()
|
|
|
+ await self._disappear_event(dm)
|
|
|
+
|
|
|
+ sender.send_remote_checkpoint(
|
|
|
+ MessageSendCheckpointStatus.SUCCESS,
|
|
|
+ event_id,
|
|
|
+ self.mxid,
|
|
|
+ EventType.ROOM_MESSAGE,
|
|
|
+ message.msgtype,
|
|
|
+ retry_num=retry_count,
|
|
|
+ )
|
|
|
+ await self._send_delivery_receipt(event_id)
|
|
|
+ asyncio.create_task(self._send_message_status(event_id, err=None))
|
|
|
|
|
|
async def _signal_send_with_retries(
|
|
|
self,
|
|
@@ -468,8 +510,8 @@ class Portal(DBPortal, BasePortal):
|
|
|
message_type: MessageType | None = None,
|
|
|
**send_args,
|
|
|
) -> int:
|
|
|
- retry_count = 7
|
|
|
- retry_message_event_id = None
|
|
|
+ retry_count = 4
|
|
|
+ last_error_type = NotConnected
|
|
|
for retry_num in range(retry_count):
|
|
|
try:
|
|
|
req_id = uuid4()
|
|
@@ -477,15 +519,14 @@ class Portal(DBPortal, BasePortal):
|
|
|
f"Send attempt {retry_num}. Attempting to send {event_id} with {req_id}"
|
|
|
)
|
|
|
await send_fn(sender, event_id, req_id=req_id, **send_args)
|
|
|
-
|
|
|
- # It was successful.
|
|
|
- if retry_message_event_id is not None:
|
|
|
- await self.main_intent.redact(self.mxid, retry_message_event_id)
|
|
|
return retry_num
|
|
|
except (NotConnected, UnknownReactionTarget) as e:
|
|
|
+ if retry_num >= retry_count - 1:
|
|
|
+ break
|
|
|
+ last_error_type = type(e)
|
|
|
# Only handle NotConnected and UnknownReactionTarget exceptions so that other
|
|
|
# exceptions actually continue to error.
|
|
|
- sleep_seconds = (retry_num + 1) ** 2
|
|
|
+ sleep_seconds = retry_num * 2 + 1
|
|
|
msg = (
|
|
|
f"Not connected to signald. Going to sleep for {sleep_seconds}s. Error: {e}"
|
|
|
if isinstance(e, NotConnected)
|
|
@@ -502,34 +543,15 @@ class Portal(DBPortal, BasePortal):
|
|
|
retry_num=retry_num,
|
|
|
)
|
|
|
|
|
|
- if retry_num > 2:
|
|
|
- # User has waited > ~15 seconds, send a notice that we are retrying.
|
|
|
- user_friendly_message = (
|
|
|
- "There was an error connecting to signald."
|
|
|
- if isinstance(e, NotConnected)
|
|
|
- else "Could not find message to react to on Signal."
|
|
|
- )
|
|
|
- event_content = TextMessageEventContent(
|
|
|
- MessageType.NOTICE,
|
|
|
- f"{user_friendly_message} Waiting for {sleep_seconds} before retrying.",
|
|
|
- )
|
|
|
- if retry_message_event_id is not None:
|
|
|
- event_content.set_edit(retry_message_event_id)
|
|
|
- new_event_id = await self.main_intent.send_message(self.mxid, event_content)
|
|
|
- retry_message_event_id = retry_message_event_id or new_event_id
|
|
|
-
|
|
|
await asyncio.sleep(sleep_seconds)
|
|
|
except Exception as e:
|
|
|
await sender.handle_auth_failure(e)
|
|
|
raise
|
|
|
-
|
|
|
- if retry_message_event_id is not None:
|
|
|
- await self.main_intent.redact(self.mxid, retry_message_event_id)
|
|
|
event_type_name = {
|
|
|
EventType.ROOM_MESSAGE: "message",
|
|
|
EventType.REACTION: "reaction",
|
|
|
}.get(event_type, str(event_type))
|
|
|
- raise NotConnected(f"Failed to send {event_type_name} after {retry_count} retries.")
|
|
|
+ raise last_error_type(f"Failed to send {event_type_name} after {retry_count} retries.")
|
|
|
|
|
|
async def handle_matrix_reaction(
|
|
|
self, sender: u.User, event_id: EventID, reacting_to: EventID, emoji: str
|
|
@@ -550,7 +572,7 @@ class Portal(DBPortal, BasePortal):
|
|
|
emoji=emoji,
|
|
|
)
|
|
|
except Exception as e:
|
|
|
- self.log.exception("Sending reaction failed")
|
|
|
+ self.log.exception(f"Failed to handle Matrix reaction {event_id} to {reacting_to}")
|
|
|
sender.send_remote_checkpoint(
|
|
|
MessageSendCheckpointStatus.PERM_FAILURE,
|
|
|
event_id,
|
|
@@ -558,7 +580,9 @@ class Portal(DBPortal, BasePortal):
|
|
|
EventType.REACTION,
|
|
|
error=e,
|
|
|
)
|
|
|
+ await self._send_error_notice("reaction", e)
|
|
|
await sender.handle_auth_failure(e)
|
|
|
+ asyncio.create_task(self._send_message_status(event_id, e))
|
|
|
else:
|
|
|
sender.send_remote_checkpoint(
|
|
|
MessageSendCheckpointStatus.SUCCESS,
|
|
@@ -568,6 +592,7 @@ class Portal(DBPortal, BasePortal):
|
|
|
retry_num=retry_count,
|
|
|
)
|
|
|
await self._send_delivery_receipt(event_id)
|
|
|
+ asyncio.create_task(self._send_message_status(event_id, err=None))
|
|
|
|
|
|
async def _handle_matrix_reaction(
|
|
|
self,
|
|
@@ -621,7 +646,10 @@ class Portal(DBPortal, BasePortal):
|
|
|
sender.username, recipient=self.chat_id, timestamp=message.timestamp
|
|
|
)
|
|
|
except Exception as e:
|
|
|
- self.log.exception("Removing message failed")
|
|
|
+ self.log.exception(
|
|
|
+ f"Failed to handle Matrix redaction {redaction_event_id} of "
|
|
|
+ f"message {event_id} ({message.timestamp})"
|
|
|
+ )
|
|
|
sender.send_remote_checkpoint(
|
|
|
MessageSendCheckpointStatus.PERM_FAILURE,
|
|
|
redaction_event_id,
|
|
@@ -630,6 +658,8 @@ class Portal(DBPortal, BasePortal):
|
|
|
error=e,
|
|
|
)
|
|
|
await sender.handle_auth_failure(e)
|
|
|
+ asyncio.create_task(self._send_error_notice("message deletion", e))
|
|
|
+ asyncio.create_task(self._send_message_status(event_id, e))
|
|
|
else:
|
|
|
self.log.trace(f"Removed {message} after Matrix redaction")
|
|
|
sender.send_remote_checkpoint(
|
|
@@ -639,6 +669,7 @@ class Portal(DBPortal, BasePortal):
|
|
|
EventType.ROOM_REDACTION,
|
|
|
)
|
|
|
await self._send_delivery_receipt(redaction_event_id)
|
|
|
+ asyncio.create_task(self._send_message_status(redaction_event_id, err=None))
|
|
|
return
|
|
|
|
|
|
reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
|
|
@@ -655,7 +686,10 @@ class Portal(DBPortal, BasePortal):
|
|
|
username=sender.username, recipient=self.chat_id, reaction=remove_reaction
|
|
|
)
|
|
|
except Exception as e:
|
|
|
- self.log.exception("Removing reaction failed")
|
|
|
+ self.log.exception(
|
|
|
+ f"Failed to handle Matrix redaction {redaction_event_id} of "
|
|
|
+ f"reaction {event_id} to {reaction.msg_timestamp}"
|
|
|
+ )
|
|
|
sender.send_remote_checkpoint(
|
|
|
MessageSendCheckpointStatus.PERM_FAILURE,
|
|
|
redaction_event_id,
|
|
@@ -664,6 +698,8 @@ class Portal(DBPortal, BasePortal):
|
|
|
error=e,
|
|
|
)
|
|
|
await sender.handle_auth_failure(e)
|
|
|
+ asyncio.create_task(self._send_error_notice("reaction deletion", e))
|
|
|
+ asyncio.create_task(self._send_message_status(event_id, e))
|
|
|
else:
|
|
|
self.log.trace(f"Removed {reaction} after Matrix redaction")
|
|
|
sender.send_remote_checkpoint(
|
|
@@ -673,6 +709,7 @@ class Portal(DBPortal, BasePortal):
|
|
|
EventType.ROOM_REDACTION,
|
|
|
)
|
|
|
await self._send_delivery_receipt(redaction_event_id)
|
|
|
+ asyncio.create_task(self._send_message_status(redaction_event_id, err=None))
|
|
|
return
|
|
|
|
|
|
sender.send_remote_checkpoint(
|
|
@@ -680,8 +717,10 @@ class Portal(DBPortal, BasePortal):
|
|
|
redaction_event_id,
|
|
|
self.mxid,
|
|
|
EventType.ROOM_REDACTION,
|
|
|
- error=f"No message or reaction found for redaction",
|
|
|
+ error="No message or reaction found for redaction",
|
|
|
)
|
|
|
+ status_err = UnknownReactionTarget("No message or reaction found for redaction")
|
|
|
+ asyncio.create_task(self._send_message_status(redaction_event_id, err=status_err))
|
|
|
|
|
|
async def handle_matrix_join(self, user: u.User) -> None:
|
|
|
if self.is_direct or not await user.is_logged_in():
|
|
@@ -735,7 +774,7 @@ class Portal(DBPortal, BasePortal):
|
|
|
invited_by.username, self.chat_id, add_members=[user.address]
|
|
|
)
|
|
|
except RPCError as e:
|
|
|
- raise RejectMatrixInvite(e.message) from e
|
|
|
+ raise RejectMatrixInvite(str(e)) from e
|
|
|
power_levels = await self.main_intent.get_power_levels(self.mxid)
|
|
|
invitee_pl = power_levels.get_user_level(user.mxid)
|
|
|
if invitee_pl >= 50:
|