formatter.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. # mautrix-signal - A Matrix-Signal puppeting bridge
  2. # Copyright (C) 2021 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from typing import cast
  18. import html
  19. import struct
  20. from mausignald.types import Address, Mention, MessageData
  21. from mautrix.types import Format, MessageType, TextMessageEventContent, UserID
  22. from mautrix.util.formatter import (
  23. EntityString,
  24. EntityType,
  25. MarkdownString,
  26. MatrixParser as BaseMatrixParser,
  27. SemiAbstractEntity,
  28. )
  29. from . import puppet as pu, user as u
# Helper methods from https://github.com/LonamiWebs/Telethon/blob/master/telethon/helpers.py
# I don't know if this is how Signal actually calculates lengths,
# but it seems to work better than plain len()
  33. def add_surrogate(text: str) -> str:
  34. return "".join(
  35. "".join(chr(y) for y in struct.unpack("<HH", x.encode("utf-16le")))
  36. if (0x10000 <= ord(x) <= 0x10FFFF)
  37. else x
  38. for x in text
  39. )
  40. def del_surrogate(text: str) -> str:
  41. return text.encode("utf-16", "surrogatepass").decode("utf-16")
  42. async def signal_to_matrix(message: MessageData) -> TextMessageEventContent:
  43. content = TextMessageEventContent(msgtype=MessageType.TEXT, body=message.body)
  44. surrogated_text = add_surrogate(message.body)
  45. if message.mentions:
  46. text_chunks = []
  47. html_chunks = []
  48. last_offset = 0
  49. for mention in message.mentions:
  50. before = surrogated_text[last_offset : mention.start]
  51. last_offset = mention.start + mention.length
  52. text_chunks.append(before)
  53. html_chunks.append(html.escape(before))
  54. puppet = await pu.Puppet.get_by_uuid(mention.uuid)
  55. name = add_surrogate(puppet.name or puppet.mxid)
  56. text_chunks.append(name)
  57. html_chunks.append(f'<a href="https://matrix.to/#/{puppet.mxid}">{name}</a>')
  58. end = surrogated_text[last_offset:]
  59. text_chunks.append(end)
  60. html_chunks.append(html.escape(end))
  61. content.body = del_surrogate("".join(text_chunks))
  62. content.format = Format.HTML
  63. content.formatted_body = del_surrogate("".join(html_chunks))
  64. return content
  65. class MentionEntity(Mention, SemiAbstractEntity):
  66. @property
  67. def offset(self) -> int:
  68. return self.start
  69. @offset.setter
  70. def offset(self, val: int) -> None:
  71. self.start = val
  72. def copy(self) -> MentionEntity:
  73. return MentionEntity(uuid=self.uuid, length=self.length, start=self.start)
  74. # TODO this has a lot of duplication with mautrix-facebook, maybe move to mautrix-python
  75. class SignalFormatString(EntityString[MentionEntity, EntityType], MarkdownString):
  76. def format(self, entity_type: EntityType, **kwargs) -> SignalFormatString:
  77. prefix = suffix = ""
  78. if entity_type == EntityType.USER_MENTION:
  79. self.entities.append(
  80. MentionEntity(uuid=kwargs["uuid"], start=0, length=len(self.text)),
  81. )
  82. return self
  83. elif entity_type == EntityType.BOLD:
  84. prefix = suffix = "**"
  85. elif entity_type == EntityType.ITALIC:
  86. prefix = suffix = "_"
  87. elif entity_type == EntityType.STRIKETHROUGH:
  88. prefix = suffix = "~~"
  89. elif entity_type == EntityType.URL:
  90. if kwargs["url"] != self.text:
  91. suffix = f" ({kwargs['url']})"
  92. elif entity_type == EntityType.PREFORMATTED:
  93. prefix = f"```{kwargs['language']}\n"
  94. suffix = "\n```"
  95. elif entity_type == EntityType.INLINE_CODE:
  96. prefix = suffix = "`"
  97. elif entity_type == EntityType.BLOCKQUOTE:
  98. children = self.trim().split("\n")
  99. children = [child.prepend("> ") for child in children]
  100. return self.join(children, "\n")
  101. elif entity_type == EntityType.HEADER:
  102. prefix = "#" * kwargs["size"] + " "
  103. else:
  104. return self
  105. self._offset_entities(len(prefix))
  106. self.text = f"{prefix}{self.text}{suffix}"
  107. return self
  108. class MatrixParser(BaseMatrixParser[SignalFormatString]):
  109. fs = SignalFormatString
  110. async def user_pill_to_fstring(
  111. self, msg: SignalFormatString, user_id: UserID
  112. ) -> SignalFormatString:
  113. user = await u.User.get_by_mxid(user_id, create=False)
  114. if user and user.uuid:
  115. uuid = user.uuid
  116. else:
  117. puppet = await pu.Puppet.get_by_mxid(user_id, create=False)
  118. if puppet:
  119. uuid = puppet.uuid
  120. else:
  121. return msg
  122. return msg.format(self.e.USER_MENTION, uuid=uuid)
  123. async def parse(self, data: str) -> SignalFormatString:
  124. return cast(SignalFormatString, await super().parse(data))
  125. async def matrix_to_signal(content: TextMessageEventContent) -> tuple[str, list[Mention]]:
  126. if content.msgtype == MessageType.EMOTE:
  127. content.body = f"/me {content.body}"
  128. if content.formatted_body:
  129. content.formatted_body = f"/me {content.formatted_body}"
  130. if content.format == Format.HTML and content.formatted_body:
  131. parsed = await MatrixParser().parse(add_surrogate(content.formatted_body))
  132. text, mentions = del_surrogate(parsed.text), parsed.entities
  133. else:
  134. text, mentions = content.body, []
  135. return text, mentions