formatter.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. # mautrix-signal - A Matrix-Signal puppeting bridge
  2. # Copyright (C) 2021 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from typing import cast
  18. import html
  19. import struct
  20. from mautrix.types import Format, MessageType, TextMessageEventContent, UserID
  21. from mautrix.util.formatter import (
  22. EntityString,
  23. EntityType,
  24. MarkdownString,
  25. MatrixParser as BaseMatrixParser,
  26. SemiAbstractEntity,
  27. )
  28. from mausignald.types import Address, Mention, MessageData
  29. from . import puppet as pu, user as u
  30. # Helper methods from https://github.com/LonamiWebs/Telethon/blob/master/telethon/helpers.py
  31. # I don't know if this is how Signal actually calculates lengths,
  32. # but it seems to work better than plain len()
  33. def add_surrogate(text: str) -> str:
  34. return "".join(
  35. "".join(chr(y) for y in struct.unpack("<HH", x.encode("utf-16le")))
  36. if (0x10000 <= ord(x) <= 0x10FFFF)
  37. else x
  38. for x in text
  39. )
  40. def del_surrogate(text: str) -> str:
  41. return text.encode("utf-16", "surrogatepass").decode("utf-16")
  42. async def signal_to_matrix(message: MessageData) -> TextMessageEventContent:
  43. content = TextMessageEventContent(msgtype=MessageType.TEXT, body=message.body)
  44. surrogated_text = add_surrogate(message.body)
  45. if message.mentions:
  46. text_chunks = []
  47. html_chunks = []
  48. last_offset = 0
  49. for mention in message.mentions:
  50. before = surrogated_text[last_offset : mention.start]
  51. last_offset = mention.start + mention.length
  52. text_chunks.append(before)
  53. html_chunks.append(html.escape(before))
  54. puppet = await pu.Puppet.get_by_address(Address(uuid=mention.uuid))
  55. name = add_surrogate(puppet.name or puppet.mxid)
  56. text_chunks.append(name)
  57. html_chunks.append(f'<a href="https://matrix.to/#/{puppet.mxid}">{name}</a>')
  58. end = surrogated_text[last_offset:]
  59. text_chunks.append(end)
  60. html_chunks.append(html.escape(end))
  61. content.body = del_surrogate("".join(text_chunks))
  62. content.format = Format.HTML
  63. content.formatted_body = del_surrogate("".join(html_chunks))
  64. return content
  65. class MentionEntity(Mention, SemiAbstractEntity):
  66. @property
  67. def offset(self) -> int:
  68. return self.start
  69. @offset.setter
  70. def offset(self, val: int) -> None:
  71. self.start = val
  72. def copy(self) -> MentionEntity:
  73. return MentionEntity(uuid=self.uuid, length=self.length, start=self.start)
  74. # TODO this has a lot of duplication with mautrix-facebook, maybe move to mautrix-python
  75. class SignalFormatString(EntityString[MentionEntity, EntityType], MarkdownString):
  76. def format(self, entity_type: EntityType, **kwargs) -> SignalFormatString:
  77. prefix = suffix = ""
  78. if entity_type == EntityType.USER_MENTION:
  79. self.entities.append(
  80. MentionEntity(uuid=kwargs["uuid"], start=0, length=len(self.text)),
  81. )
  82. return self
  83. elif entity_type == EntityType.BOLD:
  84. prefix = suffix = "**"
  85. elif entity_type == EntityType.ITALIC:
  86. prefix = suffix = "_"
  87. elif entity_type == EntityType.STRIKETHROUGH:
  88. prefix = suffix = "~~"
  89. elif entity_type == EntityType.URL:
  90. if kwargs["url"] != self.text:
  91. suffix = f" ({kwargs['url']})"
  92. elif entity_type == EntityType.PREFORMATTED:
  93. prefix = f"```{kwargs['language']}\n"
  94. suffix = "\n```"
  95. elif entity_type == EntityType.INLINE_CODE:
  96. prefix = suffix = "`"
  97. elif entity_type == EntityType.BLOCKQUOTE:
  98. children = self.trim().split("\n")
  99. children = [child.prepend("> ") for child in children]
  100. return self.join(children, "\n")
  101. elif entity_type == EntityType.HEADER:
  102. prefix = "#" * kwargs["size"] + " "
  103. else:
  104. return self
  105. self._offset_entities(len(prefix))
  106. self.text = f"{prefix}{self.text}{suffix}"
  107. return self
  108. class MatrixParser(BaseMatrixParser[SignalFormatString]):
  109. fs = SignalFormatString
  110. async def user_pill_to_fstring(
  111. self, msg: SignalFormatString, user_id: UserID
  112. ) -> SignalFormatString:
  113. user = await u.User.get_by_mxid(user_id, create=False)
  114. if user and user.uuid:
  115. uuid = user.uuid
  116. else:
  117. puppet = await pu.Puppet.get_by_mxid(user_id, create=False)
  118. if puppet:
  119. uuid = puppet.uuid
  120. else:
  121. return msg
  122. return msg.format(self.e.USER_MENTION, uuid=uuid)
  123. async def parse(self, data: str) -> SignalFormatString:
  124. return cast(SignalFormatString, await super().parse(data))
  125. async def matrix_to_signal(content: TextMessageEventContent) -> tuple[str, list[Mention]]:
  126. if content.msgtype == MessageType.EMOTE:
  127. content.body = f"/me {content.body}"
  128. if content.formatted_body:
  129. content.formatted_body = f"/me {content.formatted_body}"
  130. if content.format == Format.HTML and content.formatted_body:
  131. parsed = await MatrixParser().parse(add_surrogate(content.formatted_body))
  132. text, mentions = del_surrogate(parsed.text), parsed.entities
  133. else:
  134. text, mentions = content.body, []
  135. return text, mentions