portal.py 65 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2022 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from typing import TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable, Union, cast
  18. from collections import deque
  19. from io import BytesIO
  20. import asyncio
  21. import json
  22. import mimetypes
  23. import asyncpg
  24. import magic
  25. from mauigpapi.types import (
  26. AnimatedMediaItem,
  27. CommandResponse,
  28. ExpiredMediaItem,
  29. MediaShareItem,
  30. MediaType,
  31. MessageSyncMessage,
  32. Reaction,
  33. ReactionStatus,
  34. ReelMediaShareItem,
  35. ReelShareType,
  36. RegularMediaItem,
  37. Thread,
  38. ThreadItem,
  39. ThreadItemType,
  40. ThreadUser,
  41. ThreadUserLastSeenAt,
  42. TypingStatus,
  43. VoiceMediaItem,
  44. )
  45. from mautrix.appservice import AppService, IntentAPI
  46. from mautrix.bridge import BasePortal, NotificationDisabler, async_getter_lock
  47. from mautrix.errors import MatrixError, MForbidden, MNotFound, SessionNotFound
  48. from mautrix.types import (
  49. AudioInfo,
  50. ContentURI,
  51. EventID,
  52. EventType,
  53. Format,
  54. ImageInfo,
  55. LocationMessageEventContent,
  56. MediaMessageEventContent,
  57. MessageEventContent,
  58. MessageType,
  59. RoomID,
  60. TextMessageEventContent,
  61. UserID,
  62. VideoInfo,
  63. )
  64. from mautrix.util import ffmpeg
  65. from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
  66. from mautrix.util.simple_lock import SimpleLock
  67. from . import matrix as m, puppet as p, user as u
  68. from .config import Config
  69. from .db import Message as DBMessage, Portal as DBPortal, Reaction as DBReaction
  70. if TYPE_CHECKING:
  71. from .__main__ import InstagramBridge
  72. try:
  73. from mautrix.crypto.attachments import decrypt_attachment, encrypt_attachment
  74. except ImportError:
  75. encrypt_attachment = decrypt_attachment = None
  76. try:
  77. from PIL import Image
  78. except ImportError:
  79. Image = None
# Custom state event types for the `m.bridge` and `uk.half-shot.bridge` events.
StateBridge = EventType.find("m.bridge", EventType.Class.STATE)
StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE)
# Every Instagram media item type that a MediaUploadFunc accepts.
MediaData = Union[
    AnimatedMediaItem,
    ExpiredMediaItem,
    MediaShareItem,
    ReelMediaShareItem,
    RegularMediaItem,
    VoiceMediaItem,
]
# Signature of a media re-upload coroutine:
# (source user, media item, sending intent) -> Matrix media event content.
MediaUploadFunc = Callable[["u.User", MediaData, IntentAPI], Awaitable[MediaMessageEventContent]]
  91. class Portal(DBPortal, BasePortal):
  92. by_mxid: dict[RoomID, Portal] = {}
  93. by_thread_id: dict[tuple[str, int], Portal] = {}
  94. config: Config
  95. matrix: m.MatrixHandler
  96. az: AppService
  97. private_chat_portal_meta: bool
  98. _main_intent: IntentAPI | None
  99. _create_room_lock: asyncio.Lock
  100. backfill_lock: SimpleLock
  101. _msgid_dedup: deque[str]
  102. _reqid_dedup: set[str]
  103. _reaction_dedup: deque[tuple[str, int, str]]
  104. _main_intent: IntentAPI
  105. _last_participant_update: set[int]
  106. _reaction_lock: asyncio.Lock
  107. _backfill_leave: set[IntentAPI] | None
  108. _typing: set[UserID]
  109. def __init__(
  110. self,
  111. thread_id: str,
  112. receiver: int,
  113. other_user_pk: int | None,
  114. mxid: RoomID | None = None,
  115. name: str | None = None,
  116. avatar_url: ContentURI | None = None,
  117. encrypted: bool = False,
  118. name_set: bool = False,
  119. avatar_set: bool = False,
  120. relay_user_id: UserID | None = None,
  121. ) -> None:
  122. super().__init__(
  123. thread_id,
  124. receiver,
  125. other_user_pk,
  126. mxid,
  127. name,
  128. avatar_url,
  129. encrypted,
  130. name_set,
  131. avatar_set,
  132. relay_user_id,
  133. )
  134. self._create_room_lock = asyncio.Lock()
  135. self.log = self.log.getChild(thread_id)
  136. self._msgid_dedup = deque(maxlen=100)
  137. self._reaction_dedup = deque(maxlen=100)
  138. self._reqid_dedup = set()
  139. self._last_participant_update = set()
  140. self.backfill_lock = SimpleLock(
  141. "Waiting for backfilling to finish before handling %s", log=self.log
  142. )
  143. self._backfill_leave = None
  144. self._main_intent = None
  145. self._reaction_lock = asyncio.Lock()
  146. self._typing = set()
  147. self._relay_user = None
  148. @property
  149. def is_direct(self) -> bool:
  150. return self.other_user_pk is not None
  151. @property
  152. def main_intent(self) -> IntentAPI:
  153. if not self._main_intent:
  154. raise ValueError("Portal must be postinit()ed before main_intent can be used")
  155. return self._main_intent
  156. @classmethod
  157. def init_cls(cls, bridge: "InstagramBridge") -> None:
  158. cls.config = bridge.config
  159. cls.matrix = bridge.matrix
  160. cls.az = bridge.az
  161. cls.loop = bridge.loop
  162. cls.bridge = bridge
  163. cls.private_chat_portal_meta = cls.config["bridge.private_chat_portal_meta"]
  164. NotificationDisabler.puppet_cls = p.Puppet
  165. NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"]
  166. # region Misc
  167. async def _send_delivery_receipt(self, event_id: EventID) -> None:
  168. if event_id and self.config["bridge.delivery_receipts"]:
  169. try:
  170. await self.az.intent.mark_read(self.mxid, event_id)
  171. except Exception:
  172. self.log.exception("Failed to send delivery receipt for %s", event_id)
  173. async def _send_bridge_error(
  174. self,
  175. sender: u.User,
  176. err: Exception | str,
  177. event_id: EventID,
  178. event_type: EventType,
  179. message_type: MessageType | None = None,
  180. msg: str | None = None,
  181. confirmed: bool = False,
  182. status: MessageSendCheckpointStatus = MessageSendCheckpointStatus.PERM_FAILURE,
  183. ) -> None:
  184. sender.send_remote_checkpoint(
  185. status,
  186. event_id,
  187. self.mxid,
  188. event_type,
  189. message_type=message_type,
  190. error=err,
  191. )
  192. if self.config["bridge.delivery_error_reports"]:
  193. event_type_str = {
  194. EventType.REACTION: "reaction",
  195. EventType.ROOM_REDACTION: "redaction",
  196. }.get(event_type, "message")
  197. error_type = "was not" if confirmed else "may not have been"
  198. await self._send_message(
  199. self.main_intent,
  200. TextMessageEventContent(
  201. msgtype=MessageType.NOTICE,
  202. body=f"\u26a0 Your {event_type_str} {error_type} bridged: {msg or str(err)}",
  203. ),
  204. )
  205. async def _upsert_reaction(
  206. self,
  207. existing: DBReaction | None,
  208. intent: IntentAPI,
  209. mxid: EventID,
  210. message: DBMessage,
  211. sender: u.User | p.Puppet,
  212. reaction: str,
  213. ) -> None:
  214. if existing:
  215. self.log.debug(
  216. f"_upsert_reaction redacting {existing.mxid} and inserting {mxid}"
  217. f" (message: {message.mxid})"
  218. )
  219. await intent.redact(existing.mx_room, existing.mxid)
  220. await existing.edit(reaction=reaction, mxid=mxid, mx_room=message.mx_room)
  221. else:
  222. self.log.debug(f"_upsert_reaction inserting {mxid} (message: {message.mxid})")
  223. await DBReaction(
  224. mxid=mxid,
  225. mx_room=message.mx_room,
  226. ig_item_id=message.item_id,
  227. ig_receiver=self.receiver,
  228. ig_sender=sender.igpk,
  229. reaction=reaction,
  230. ).insert()
  231. # endregion
  232. # region Matrix event handling
  233. @staticmethod
  234. def _status_from_exception(e: Exception) -> MessageSendCheckpointStatus:
  235. if isinstance(e, NotImplementedError):
  236. return MessageSendCheckpointStatus.UNSUPPORTED
  237. return MessageSendCheckpointStatus.PERM_FAILURE
  238. async def handle_matrix_message(
  239. self, sender: u.User, message: MessageEventContent, event_id: EventID
  240. ) -> None:
  241. try:
  242. await self._handle_matrix_message(sender, message, event_id)
  243. except Exception as e:
  244. self.log.exception(f"Fatal error handling Matrix event {event_id}: {e}")
  245. await self._send_bridge_error(
  246. sender,
  247. e,
  248. event_id,
  249. EventType.ROOM_MESSAGE,
  250. message_type=message.msgtype,
  251. status=self._status_from_exception(e),
  252. confirmed=True,
  253. )
  254. async def _handle_matrix_image(
  255. self,
  256. sender: u.User,
  257. event_id: EventID,
  258. request_id: str,
  259. data: bytes,
  260. mime_type: str,
  261. width: int | None = None,
  262. height: int | None = None,
  263. ) -> CommandResponse:
  264. if mime_type != "image/jpeg":
  265. with BytesIO(data) as inp, BytesIO() as out:
  266. img = Image.open(inp)
  267. img.convert("RGB").save(out, format="JPEG", quality=80)
  268. data = out.getvalue()
  269. mime_type = "image/jpeg"
  270. self.log.trace(f"Uploading photo from {event_id} (mime: {mime_type})")
  271. upload_resp = await sender.client.upload_photo(
  272. data, mime=mime_type, width=width, height=height
  273. )
  274. self.log.trace(f"Broadcasting uploaded photo with request ID {request_id}")
  275. return await sender.client.broadcast(
  276. self.thread_id,
  277. ThreadItemType.CONFIGURE_PHOTO,
  278. client_context=request_id,
  279. upload_id=upload_resp.upload_id,
  280. allow_full_aspect_ratio="true",
  281. )
  282. async def _handle_matrix_video(
  283. self,
  284. sender: u.User,
  285. event_id: EventID,
  286. request_id: str,
  287. data: bytes,
  288. mime_type: str,
  289. duration: int | None = None,
  290. width: int | None = None,
  291. height: int | None = None,
  292. ) -> CommandResponse:
  293. if mime_type != "video/mp4":
  294. data = await ffmpeg.convert_bytes(
  295. data,
  296. output_extension=".mp4",
  297. output_args=("-c:v", "libx264", "-c:a", "aac"),
  298. input_mime=mime_type,
  299. )
  300. self.log.trace(f"Uploading video from {event_id}")
  301. _, upload_id = await sender.client.upload_mp4(
  302. data, duration_ms=duration, width=width, height=height
  303. )
  304. self.log.trace(f"Broadcasting uploaded video with request ID {request_id}")
  305. return await sender.client.broadcast(
  306. self.thread_id,
  307. ThreadItemType.CONFIGURE_VIDEO,
  308. client_context=request_id,
  309. upload_id=upload_id,
  310. video_result="",
  311. )
  312. async def _handle_matrix_audio(
  313. self,
  314. sender: u.User,
  315. event_id: EventID,
  316. request_id: str,
  317. data: bytes,
  318. mime_type: str,
  319. waveform: list[int],
  320. duration: int | None = None,
  321. ) -> CommandResponse:
  322. if mime_type != "audio/mp4":
  323. data = await ffmpeg.convert_bytes(
  324. data, output_extension=".m4a", output_args=("-c:a", "aac"), input_mime=mime_type
  325. )
  326. self.log.trace(f"Uploading audio from {event_id}")
  327. _, upload_id = await sender.client.upload_mp4(data, audio=True, duration_ms=duration)
  328. self.log.trace(f"Broadcasting uploaded audio with request ID {request_id}")
  329. return await sender.client.broadcast_audio(
  330. self.thread_id,
  331. is_direct=self.is_direct,
  332. client_context=request_id,
  333. upload_id=upload_id,
  334. waveform=json.dumps([(part or 0) / 1024 for part in waveform], separators=(",", ":")),
  335. waveform_sampling_frequency_hz="10",
  336. )
  337. async def _handle_matrix_message(
  338. self, orig_sender: u.User, message: MessageEventContent, event_id: EventID
  339. ) -> None:
  340. sender, is_relay = await self.get_relay_sender(orig_sender, f"message {event_id}")
  341. assert sender, "user is not logged in"
  342. assert sender.is_connected, "You're not connected to Instagram"
  343. if is_relay:
  344. await self.apply_relay_message_format(orig_sender, message)
  345. reply_to = {}
  346. if message.get_reply_to():
  347. msg = await DBMessage.get_by_mxid(message.get_reply_to(), self.mxid)
  348. if msg and msg.client_context:
  349. reply_to = {
  350. "replied_to_item_id": msg.item_id,
  351. "replied_to_client_context": msg.client_context,
  352. }
  353. request_id = sender.state.gen_client_context()
  354. self._reqid_dedup.add(request_id)
  355. self.log.debug(
  356. f"Handling Matrix message {event_id} from {sender.mxid}/{sender.igpk} "
  357. f"with request ID {request_id}"
  358. )
  359. if message.msgtype in (MessageType.EMOTE, MessageType.TEXT, MessageType.NOTICE):
  360. text = message.body
  361. if message.msgtype == MessageType.EMOTE:
  362. text = f"/me {text}"
  363. self.log.trace(f"Sending Matrix text from {event_id} with request ID {request_id}")
  364. resp = await sender.mqtt.send_text(
  365. self.thread_id, text=text, client_context=request_id, **reply_to
  366. )
  367. elif message.msgtype.is_media:
  368. if message.file and decrypt_attachment:
  369. data = await self.main_intent.download_media(message.file.url)
  370. data = decrypt_attachment(
  371. data, message.file.key.key, message.file.hashes.get("sha256"), message.file.iv
  372. )
  373. else:
  374. data = await self.main_intent.download_media(message.url)
  375. mime_type = message.info.mimetype or magic.from_buffer(data, mime=True)
  376. if message.msgtype == MessageType.IMAGE:
  377. resp = await self._handle_matrix_image(
  378. sender,
  379. event_id,
  380. request_id,
  381. data,
  382. mime_type,
  383. width=message.info.width,
  384. height=message.info.height,
  385. )
  386. elif message.msgtype == MessageType.AUDIO:
  387. waveform = message.get("org.matrix.msc1767.audio", {}).get("waveform", [0] * 30)
  388. resp = await self._handle_matrix_audio(
  389. sender,
  390. event_id,
  391. request_id,
  392. data,
  393. mime_type,
  394. waveform,
  395. duration=message.info.duration,
  396. )
  397. elif message.msgtype == MessageType.VIDEO:
  398. resp = await self._handle_matrix_video(
  399. sender,
  400. event_id,
  401. request_id,
  402. data,
  403. mime_type,
  404. duration=message.info.duration,
  405. width=message.info.width,
  406. height=message.info.height,
  407. )
  408. else:
  409. raise NotImplementedError(
  410. "Non-image/video/audio files are currently not supported"
  411. )
  412. else:
  413. raise NotImplementedError(f"Unknown message type {message.msgtype}")
  414. self.log.trace(f"Got response to message send {request_id}: {resp}")
  415. if resp.status != "ok":
  416. self.log.warning(f"Failed to handle {event_id}: {resp}")
  417. raise Exception(f"Failed to handle event. Error: {resp.payload.message}")
  418. else:
  419. sender.send_remote_checkpoint(
  420. status=MessageSendCheckpointStatus.SUCCESS,
  421. event_id=event_id,
  422. room_id=self.mxid,
  423. event_type=EventType.ROOM_MESSAGE,
  424. message_type=message.msgtype,
  425. )
  426. await self._send_delivery_receipt(event_id)
  427. self._msgid_dedup.appendleft(resp.payload.item_id)
  428. try:
  429. await DBMessage(
  430. mxid=event_id,
  431. mx_room=self.mxid,
  432. item_id=resp.payload.item_id,
  433. client_context=resp.payload.client_context,
  434. receiver=self.receiver,
  435. sender=sender.igpk,
  436. ).insert()
  437. except asyncpg.UniqueViolationError as e:
  438. self.log.warning(
  439. f"Error while persisting {event_id} ({resp.payload.client_context}) "
  440. f"-> {resp.payload.item_id}: {e}"
  441. )
  442. self._reqid_dedup.remove(request_id)
  443. self.log.debug(
  444. f"Handled Matrix message {event_id} ({resp.payload.client_context}) "
  445. f"-> {resp.payload.item_id}"
  446. )
  447. async def handle_matrix_reaction(
  448. self, sender: u.User, event_id: EventID, reacting_to: EventID, emoji: str
  449. ) -> None:
  450. try:
  451. await self._handle_matrix_reaction(sender, event_id, reacting_to, emoji)
  452. except Exception as e:
  453. self.log.exception(f"Fatal error handling Matrix event {event_id}: {e}")
  454. message = "Fatal error handling reaction (see logs for more details)"
  455. if isinstance(e, NotImplementedError):
  456. message = None
  457. await self._send_bridge_error(
  458. sender,
  459. e,
  460. event_id,
  461. EventType.REACTION,
  462. status=self._status_from_exception(e),
  463. confirmed=True,
  464. msg=message,
  465. )
  466. async def _handle_matrix_reaction(
  467. self, sender: u.User, event_id: EventID, reacting_to: EventID, emoji: str
  468. ) -> None:
  469. message = await DBMessage.get_by_mxid(reacting_to, self.mxid)
  470. if not message or message.is_internal:
  471. self.log.debug(f"Ignoring reaction to unknown event {reacting_to}")
  472. return
  473. if not await sender.is_logged_in():
  474. self.log.debug(f"Ignoring reaction by non-logged-in user {sender.mxid}")
  475. return
  476. existing = await DBReaction.get_by_item_id(message.item_id, message.receiver, sender.igpk)
  477. if existing and existing.reaction == emoji:
  478. sender.send_remote_checkpoint(
  479. status=MessageSendCheckpointStatus.SUCCESS,
  480. event_id=event_id,
  481. room_id=self.mxid,
  482. event_type=EventType.REACTION,
  483. )
  484. return
  485. dedup_id = (message.item_id, sender.igpk, emoji)
  486. self._reaction_dedup.appendleft(dedup_id)
  487. async with self._reaction_lock:
  488. try:
  489. resp = await sender.mqtt.send_reaction(
  490. self.thread_id, item_id=message.item_id, emoji=emoji
  491. )
  492. if resp.status != "ok":
  493. if resp.payload.message == "invalid unicode emoji":
  494. # Instagram doesn't support this reaction. Notify the user, and redact it
  495. # so that it doesn't get confusing.
  496. await self.main_intent.redact(
  497. self.mxid, event_id, reason="Unsupported emoji"
  498. )
  499. raise NotImplementedError(f"Instagram does not support the {emoji} emoji.")
  500. raise Exception(f"Failed to react to {event_id}: {resp}")
  501. except Exception as e:
  502. self.log.exception(f"Failed to handle {event_id}: {e}")
  503. raise
  504. else:
  505. sender.send_remote_checkpoint(
  506. status=MessageSendCheckpointStatus.SUCCESS,
  507. event_id=event_id,
  508. room_id=self.mxid,
  509. event_type=EventType.REACTION,
  510. )
  511. await self._send_delivery_receipt(event_id)
  512. self.log.trace(f"{sender.mxid} reacted to {message.item_id} with {emoji}")
  513. await self._upsert_reaction(
  514. existing, self.main_intent, event_id, message, sender, emoji
  515. )
  516. async def handle_matrix_redaction(
  517. self, orig_sender: u.User, event_id: EventID, redaction_event_id: EventID
  518. ) -> None:
  519. sender = None
  520. try:
  521. sender, _ = await self.get_relay_sender(orig_sender, f"redaction {event_id}")
  522. if not sender:
  523. raise Exception("User is not logged in")
  524. await self._handle_matrix_redaction(sender, event_id, redaction_event_id)
  525. except Exception as e:
  526. self.log.exception(f"Fatal error handling Matrix event {event_id}: {e}")
  527. await self._send_bridge_error(
  528. sender or orig_sender,
  529. e,
  530. redaction_event_id,
  531. EventType.ROOM_REDACTION,
  532. status=self._status_from_exception(e),
  533. confirmed=True,
  534. )
  535. async def _handle_matrix_redaction(
  536. self, sender: u.User, event_id: EventID, redaction_event_id: EventID
  537. ) -> None:
  538. if not sender.is_connected:
  539. raise Exception("You're not connected to Instagram")
  540. reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
  541. if reaction:
  542. try:
  543. await reaction.delete()
  544. await sender.mqtt.send_reaction(
  545. self.thread_id,
  546. item_id=reaction.ig_item_id,
  547. reaction_status=ReactionStatus.DELETED,
  548. emoji="",
  549. )
  550. except Exception as e:
  551. raise Exception(f"Removing reaction failed: {e}")
  552. else:
  553. sender.send_remote_checkpoint(
  554. status=MessageSendCheckpointStatus.SUCCESS,
  555. event_id=redaction_event_id,
  556. room_id=self.mxid,
  557. event_type=EventType.ROOM_REDACTION,
  558. )
  559. await self._send_delivery_receipt(redaction_event_id)
  560. self.log.trace(f"Removed {reaction} after Matrix redaction")
  561. return
  562. message = await DBMessage.get_by_mxid(event_id, self.mxid)
  563. if message and not message.is_internal:
  564. try:
  565. await message.delete()
  566. await sender.client.delete_item(self.thread_id, message.item_id)
  567. self.log.trace(f"Removed {message} after Matrix redaction")
  568. except Exception as e:
  569. raise Exception(f"Removing message failed: {e}")
  570. else:
  571. sender.send_remote_checkpoint(
  572. status=MessageSendCheckpointStatus.SUCCESS,
  573. event_id=redaction_event_id,
  574. room_id=self.mxid,
  575. event_type=EventType.ROOM_REDACTION,
  576. )
  577. await self._send_delivery_receipt(redaction_event_id)
  578. self.log.trace(f"Removed {reaction} after Matrix redaction")
  579. return
  580. raise Exception("No message or reaction found for redaction")
  581. async def handle_matrix_typing(self, users: set[UserID]) -> None:
  582. if users == self._typing:
  583. return
  584. old_typing = self._typing
  585. self._typing = users
  586. await self._handle_matrix_typing(old_typing - users, TypingStatus.OFF)
  587. await self._handle_matrix_typing(users - old_typing, TypingStatus.TEXT)
  588. async def _handle_matrix_typing(self, users: set[UserID], status: TypingStatus) -> None:
  589. for mxid in users:
  590. user = await u.User.get_by_mxid(mxid, create=False)
  591. if (
  592. not user
  593. or not await user.is_logged_in()
  594. or user.remote_typing_status == status
  595. or not user.is_connected
  596. ):
  597. continue
  598. user.remote_typing_status = None
  599. await user.mqtt.indicate_activity(self.thread_id, status)
  600. async def handle_matrix_leave(self, user: u.User) -> None:
  601. if not await user.is_logged_in():
  602. return
  603. if self.is_direct:
  604. self.log.info(f"{user.mxid} left private chat portal with {self.other_user_pk}")
  605. if user.igpk == self.receiver:
  606. self.log.info(
  607. f"{user.mxid} was the recipient of this portal. Cleaning up and deleting..."
  608. )
  609. await self.cleanup_and_delete()
  610. else:
  611. self.log.debug(f"{user.mxid} left portal to {self.thread_id}")
  612. # TODO cleanup if empty
  613. # endregion
  614. # region Instagram event handling
  615. async def _reupload_instagram_media(
  616. self, source: u.User, media: RegularMediaItem, intent: IntentAPI
  617. ) -> MediaMessageEventContent:
  618. if media.media_type == MediaType.IMAGE:
  619. image = media.best_image
  620. if not image:
  621. raise ValueError("Attachment not available: didn't find photo URL")
  622. url = image.url
  623. msgtype = MessageType.IMAGE
  624. info = ImageInfo(height=image.height, width=image.width)
  625. elif media.media_type == MediaType.VIDEO:
  626. video = media.best_video
  627. if not video:
  628. raise ValueError("Attachment not available: didn't find video URL")
  629. url = video.url
  630. msgtype = MessageType.VIDEO
  631. info = VideoInfo(height=video.height, width=video.width)
  632. elif media.media_type == MediaType.CAROUSEL:
  633. raise ValueError(
  634. "Carousel media is not currently supported, "
  635. "please view the post on Instagram via the link below"
  636. )
  637. else:
  638. raise ValueError(
  639. f"Attachment not available: unsupported media type {media.media_type.human_name}"
  640. )
  641. return await self._reupload_instagram_file(source, url, msgtype, info, intent)
  642. async def _reupload_instagram_animated(
  643. self, source: u.User, media: AnimatedMediaItem, intent: IntentAPI
  644. ) -> MediaMessageEventContent:
  645. url = media.images.fixed_height.webp
  646. info = ImageInfo(
  647. height=int(media.images.fixed_height.height),
  648. width=int(media.images.fixed_height.width),
  649. )
  650. return await self._reupload_instagram_file(source, url, MessageType.IMAGE, info, intent)
  651. async def _reupload_instagram_voice(
  652. self, source: u.User, media: VoiceMediaItem, intent: IntentAPI
  653. ) -> MediaMessageEventContent:
  654. async def convert_to_ogg(data, mimetype):
  655. converted = await ffmpeg.convert_bytes(
  656. data, ".ogg", output_args=("-c:a", "libopus"), input_mime=mimetype
  657. )
  658. return converted, "audio/ogg"
  659. url = media.media.audio.audio_src
  660. info = AudioInfo(duration=media.media.audio.duration)
  661. waveform = [int(p * 1024) for p in media.media.audio.waveform_data]
  662. content = await self._reupload_instagram_file(
  663. source, url, MessageType.AUDIO, info, intent, convert_to_ogg
  664. )
  665. content["org.matrix.msc1767.audio"] = {
  666. "duration": media.media.audio.duration,
  667. "waveform": waveform,
  668. }
  669. content["org.matrix.msc3245.voice"] = {}
  670. return content
  async def _reupload_instagram_file(
      self,
      source: u.User,
      url: str,
      msgtype: MessageType,
      info: ImageInfo | VideoInfo | AudioInfo,
      intent: IntentAPI,
      convert_fn: Callable[[bytes, str], Awaitable[tuple[bytes, str]]] | None = None,
  ) -> MediaMessageEventContent:
      """
      Download a file from Instagram and reupload it to the Matrix media repository.

      ``info`` is updated in place with the final mimetype and size. If the portal is
      encrypted and attachment encryption is available, the data is encrypted before the
      upload and the returned content carries the decryption info in ``file`` instead of
      a plain ``url``.

      Args:
          source: The user whose Instagram client performs the download.
          url: The URL to download.
          msgtype: The Matrix message type of the resulting event. Its value with the
              first two characters removed is used as the base of the file name.
          info: The media info object to fill in and attach to the event.
          intent: The intent used for uploading the media.
          convert_fn: Optional coroutine function that receives the downloaded bytes and
              their mimetype and returns the converted bytes and the new mimetype.

      Returns:
          The media message content pointing at the uploaded file.

      Raises:
          ValueError: If the file exceeds the configured upload size limit. The message
              is meant to be shown to the user as-is.
      """
      max_size = self.matrix.media_config.upload_size
      async with source.client.raw_http_get(url) as resp:
          try:
              length = int(resp.headers["Content-Length"])
          except KeyError:
              # TODO can the download be short-circuited if there's too much data?
              self.log.warning(
                  "Got file download response with no Content-Length header, "
                  "reading data dangerously"
              )
              length = 0
          if length > max_size:
              self.log.debug(f"{url} was too large ({length} > {max_size})")
              raise ValueError("Attachment not available: too large")
          data = await resp.read()
          # A missing Content-Type header must fall through to content sniffing
          # instead of raising KeyError.
          content_type = resp.headers.get("Content-Type")

      # The Content-Length header may be missing, so enforce the limit on the
      # data that was actually received as well.
      if len(data) > max_size:
          self.log.debug(f"{url} was too large ({len(data)} > {max_size})")
          raise ValueError("Attachment not available: too large")

      info.mimetype = content_type or magic.from_buffer(data, mime=True)

      # Run the conversion function on the data.
      if convert_fn is not None:
          data, info.mimetype = await convert_fn(data, info.mimetype)

      info.size = len(data)
      # Prefer a fixed extension for the common types, then fall back to the stdlib guess.
      extension = {
          "image/webp": ".webp",
          "image/jpeg": ".jpg",
          "video/mp4": ".mp4",
          "audio/mp4": ".m4a",
          "audio/ogg": ".ogg",
      }.get(info.mimetype)
      extension = extension or mimetypes.guess_extension(info.mimetype) or ""
      file_name = f"{msgtype.value[2:]}{extension}"

      upload_mime_type = info.mimetype
      upload_file_name = file_name
      decryption_info = None
      if self.encrypted and encrypt_attachment:
          # Encrypted uploads must not reveal the real mimetype or file name.
          data, decryption_info = encrypt_attachment(data)
          upload_mime_type = "application/octet-stream"
          upload_file_name = None

      mxc = await intent.upload_media(
          data, mime_type=upload_mime_type, filename=upload_file_name
      )

      if decryption_info:
          # For encrypted files the URL lives inside the decryption info.
          decryption_info.url = mxc
          mxc = None

      return MediaMessageEventContent(
          body=file_name,
          external_url=url,
          url=mxc,
          file=decryption_info,
          info=info,
          msgtype=msgtype,
      )
  def _get_instagram_media_info(self, item: ThreadItem) -> tuple[MediaUploadFunc, MediaData]:
      """
      Pick the media payload of a thread item and the method that can reupload it.

      The media-carrying fields of the item are checked in a fixed priority order and
      the first truthy one wins. Animated and voice media use dedicated upload methods,
      every other kind uses ``_reupload_instagram_media``.

      Raises:
          ValueError: With a user-presentable message if no known media field is set,
              the selected payload is empty, or the media has expired.
      """
      # TODO maybe use a dict and item.item_type instead of a ton of ifs
      default_upload = self._reupload_instagram_media

      def checked(payload, upload=default_upload):
          # Shared validation for whichever payload was selected below.
          if not payload:
              self.log.debug(f"Didn't get media_data in {item}")
              raise ValueError("Attachment not available: unsupported media type")
          if isinstance(payload, ExpiredMediaItem):
              self.log.debug(f"Expired media in item {item}")
              raise ValueError("Attachment not available: media expired")
          return upload, payload

      if item.media:
          return checked(item.media)
      if item.visual_media:
          return checked(item.visual_media.media)
      if item.animated_media:
          return checked(item.animated_media, self._reupload_instagram_animated)
      if item.voice_media:
          return checked(item.voice_media, self._reupload_instagram_voice)
      if item.reel_share:
          return checked(item.reel_share.media)
      if item.story_share:
          return checked(item.story_share.media)
      if item.clip:
          return checked(item.clip.clip)
      if item.felix_share and item.felix_share.video:
          return checked(item.felix_share.video)
      if item.media_share:
          return checked(item.media_share)
      if item.direct_media_share:
          return checked(item.direct_media_share.media)

      self.log.debug(f"Unknown media type in {item}")
      raise ValueError("Attachment not available: unsupported media type")
  async def _handle_instagram_media(
      self, source: u.User, intent: IntentAPI, item: ThreadItem
  ) -> EventID | None:
      """
      Bridge the media of a thread item to Matrix.

      If the media can't be reuploaded, a notice explaining why is sent in its place.
      Returns whatever ``_send_message`` returns for the event that was sent.
      """
      failure_notice = None
      try:
          upload, payload = self._get_instagram_media_info(item)
          content = await upload(source, payload, intent)
      except ValueError as err:
          # ValueErrors raised by the media helpers carry a user-presentable message.
          failure_notice = str(err)
      except Exception:
          self.log.warning("Failed to upload media", exc_info=True)
          failure_notice = "Attachment not available: failed to copy file"

      if failure_notice is not None:
          content = TextMessageEventContent(body=failure_notice, msgtype=MessageType.NOTICE)

      await self._add_instagram_reply(content, item.replied_to_message)
      sent_at = item.timestamp // 1000
      return await self._send_message(intent, content, timestamp=sent_at)
  async def _handle_instagram_media_share(
      self, source: u.User, intent: IntentAPI, item: ThreadItem
  ) -> EventID | None:
      """
      Bridge a shared post, clip, IGTV video or story to Matrix.

      Three events are sent: a notice saying whose media was shared, the media itself
      and finally a text message with the caption (if any) and a link to the post.

      Returns:
          The event ID of the media message, or ``None`` if the item has no supported
          share field.
      """
      # Local import so this block does not depend on the file's import section.
      from html import escape as escape_html

      item_type_name = None
      if item.media_share:
          share_item = item.media_share
      elif item.clip:
          share_item = item.clip.clip
          item_type_name = "clip"
      elif item.felix_share and item.felix_share.video:
          share_item = item.felix_share.video
      elif item.story_share:
          share_item = item.story_share.media
          item_type_name = "story"
      elif item.direct_media_share:
          share_item = item.direct_media_share.media
      else:
          return None
      item_type_name = item_type_name or share_item.media_type.human_name

      username = share_item.user.username
      # Everything interpolated into formatted_body comes from Instagram and must be
      # escaped so it can't inject markup into the HTML.
      safe_username = escape_html(username)
      user_text = f"@{username}"
      user_link = f'<a href="https://www.instagram.com/{safe_username}/">@{safe_username}</a>'
      prefix = TextMessageEventContent(
          msgtype=MessageType.NOTICE,
          format=Format.HTML,
          body=f"Sent {user_text}'s {item_type_name}",
          formatted_body=f"Sent {user_link}'s {item_type_name}",
      )
      if item.direct_media_share and item.direct_media_share.media_share_type == "tag":
          tagged_user_id = item.direct_media_share.tagged_user_id
          if tagged_user_id == source.igpk and share_item.user.pk == self.other_user_pk:
              prefix.body = prefix.formatted_body = "Tagged you in their post"
          elif share_item.user.pk == source.igpk and tagged_user_id == self.other_user_pk:
              prefix.body = prefix.formatted_body = "Tagged them in your post"

      await self._send_message(intent, prefix, timestamp=item.timestamp // 1000)
      event_id = await self._handle_instagram_media(source, intent, item)

      external_url = f"https://www.instagram.com/p/{share_item.code}/"
      link_html = f'<a href="{external_url}">instagram.com/p/{share_item.code}</a>'
      if share_item.caption:
          caption_body = (
              f"> {share_item.caption.user.username}: {share_item.caption.text}\n\n"
              f"{external_url}"
          )
          caption_formatted_body = (
              f"<blockquote><strong>{escape_html(share_item.caption.user.username)}</strong>"
              f" {escape_html(share_item.caption.text)}</blockquote>"
              f"{link_html}"
          )
      else:
          caption_body = external_url
          caption_formatted_body = link_html

      caption = TextMessageEventContent(
          msgtype=MessageType.TEXT,
          body=caption_body,
          formatted_body=caption_formatted_body,
          format=Format.HTML,
          external_url=external_url,
      )
      await self._send_message(intent, caption, timestamp=item.timestamp // 1000)
      return event_id
  async def _handle_instagram_reel_share(
      self, source: u.User, intent: IntentAPI, item: ThreadItem
  ) -> EventID | None:
      """
      Bridge a story reply, reaction or mention to Matrix.

      A notice describing the interaction is sent first. Unless the story has expired,
      the story media is bridged once per (user, story) pair and later interactions
      with the same story reply to that media event. The text of the interaction is
      sent last.

      Returns:
          The return value of sending the text message, or ``None`` if the reel share
          type is not supported.
      """
      share = item.reel_share
      media = share.media
      sent_at = item.timestamp // 1000

      prefix_html = None
      share_type = share.type
      if share_type == ReelShareType.REPLY:
          if share.reel_owner_id == source.igpk:
              prefix = "Replied to your story"
          else:
              username = media.user.username
              user_link = f'<a href="https://www.instagram.com/{username}/">@{username}</a>'
              prefix = f"Sent @{username}'s story"
              prefix_html = f"Sent {user_link}'s story"
      elif share_type == ReelShareType.REACTION:
          prefix = "Reacted to your story"
      elif share_type == ReelShareType.MENTION:
          mentioned_us = share.mentioned_user_id == source.igpk
          prefix = (
              "Mentioned you in their story"
              if mentioned_us
              else "You mentioned them in your story"
          )
      else:
          self.log.debug(f"Unsupported reel share type {item.reel_share.type}")
          return None

      prefix_content = TextMessageEventContent(msgtype=MessageType.NOTICE, body=prefix)
      if prefix_html:
          prefix_content.format = Format.HTML
          prefix_content.formatted_body = prefix_html

      # Fall back to the media caption and finally to a placeholder if there's no text.
      text = share.text
      if not text and isinstance(media, MediaShareItem):
          text = media.caption.text if media.caption else ""
      if not text:
          text = "<no caption>"
      content = TextMessageEventContent(msgtype=MessageType.TEXT, body=text)

      await self._send_message(intent, prefix_content, timestamp=sent_at)

      # TODO send message about expired story
      if not isinstance(media, ExpiredMediaItem):
          fake_item_id = f"fi.mau.instagram.reel_share.{item.user_id}.{media.pk}"
          existing = await DBMessage.get_by_item_id(fake_item_id, self.receiver)
          if existing:
              # If the user already reacted or replied to the same reel share item,
              # use a Matrix reply instead of reposting the image.
              content.set_reply(existing.mxid)
          else:
              media_event_id = await self._handle_instagram_media(source, intent, item)
              media_record = DBMessage(
                  mxid=media_event_id,
                  mx_room=self.mxid,
                  item_id=fake_item_id,
                  client_context=None,
                  receiver=self.receiver,
                  sender=media.user.pk,
              )
              await media_record.insert()

      return await self._send_message(intent, content, timestamp=sent_at)
  async def _handle_instagram_text(
      self, intent: IntentAPI, item: ThreadItem, text: str
  ) -> EventID:
      """Send ``text`` to Matrix as a text message, replying to the item's reply target."""
      message = TextMessageEventContent(msgtype=MessageType.TEXT, body=text)
      await self._add_instagram_reply(message, item.replied_to_message)
      sent_at = item.timestamp // 1000
      return await self._send_message(intent, message, timestamp=sent_at)
  async def _send_instagram_unhandled(self, intent: IntentAPI, item: ThreadItem) -> EventID:
      """Send a notice telling the room that the item's message type isn't supported."""
      notice_text = f"Unsupported message type {item.item_type.value}"
      notice = TextMessageEventContent(msgtype=MessageType.NOTICE, body=notice_text)
      await self._add_instagram_reply(notice, item.replied_to_message)
      sent_at = item.timestamp // 1000
      return await self._send_message(intent, notice, timestamp=sent_at)
  async def _handle_instagram_location(
      self, intent: IntentAPI, item: ThreadItem
  ) -> EventID | None:
      """
      Bridge a shared location to Matrix as a location message.

      The plain-text body contains the location name, the coordinates rounded to four
      decimals and an OpenStreetMap link; the HTML body wraps the description in that
      link.

      Returns:
          The return value of sending the message, or ``None`` if the location has no
          (or falsy) coordinates.
      """
      # Local import so this block does not depend on the file's import section.
      from html import escape as escape_html

      loc = item.location
      if not loc.lng or not loc.lat:
          # TODO handle somehow
          return None
      long_char = "E" if loc.lng > 0 else "W"
      lat_char = "N" if loc.lat > 0 else "S"

      body = (
          f"{loc.name} - {round(abs(loc.lat), 4)}° {lat_char}, "
          f"{round(abs(loc.lng), 4)}° {long_char}"
      )
      url = f"https://www.openstreetmap.org/#map=15/{loc.lat}/{loc.lng}"

      external_url = None
      if loc.external_source == "facebook_places":
          external_url = f"https://www.facebook.com/{loc.short_name}-{loc.facebook_places_id}"

      content = LocationMessageEventContent(
          msgtype=MessageType.LOCATION,
          geo_uri=f"geo:{loc.lat},{loc.lng}",
          body=f"Location: {body}\n{url}",
          external_url=external_url,
      )
      content["format"] = str(Format.HTML)
      # The location name comes from Instagram, so escape it before embedding it in HTML.
      content["formatted_body"] = f"Location: <a href='{url}'>{escape_html(body)}</a>"

      await self._add_instagram_reply(content, item.replied_to_message)
      return await self._send_message(intent, content, timestamp=item.timestamp // 1000)
  async def _handle_instagram_profile(
      self, intent: IntentAPI, item: ThreadItem
  ) -> EventID | None:
      """Bridge a shared profile as a text message that links to the profile."""
      username = item.profile.username
      profile_link = f'<a href="https://www.instagram.com/{username}/">@{username}</a>'
      content = TextMessageEventContent(
          msgtype=MessageType.TEXT,
          format=Format.HTML,
          body=f"Shared @{username}'s profile",
          formatted_body=f"Shared {profile_link}'s profile",
      )
      await self._add_instagram_reply(content, item.replied_to_message)
      sent_at = item.timestamp // 1000
      return await self._send_message(intent, content, timestamp=sent_at)
  async def handle_instagram_item(
      self, source: u.User, sender: p.Puppet, item: ThreadItem, is_backfill: bool = False
  ) -> None:
      """
      Bridge an Instagram thread item to Matrix without letting errors propagate.

      Any exception raised while handling the item is logged together with the
      serialized item instead of being raised to the caller.
      """
      try:
          await self._handle_instagram_item(source, sender, item, is_backfill)
      except Exception:
          self.log.exception("Fatal error handling Instagram item")
          serialized = item.serialize()
          self.log.trace("Item content: %s", serialized)
  async def _add_instagram_reply(
      self, content: MessageEventContent, reply_to: ThreadItem | None
  ) -> None:
      """
      Mark ``content`` as a Matrix reply to the bridged counterpart of ``reply_to``.

      Does nothing if there is no reply target or it isn't in the message database.
      For text content the target event is additionally fetched (and decrypted when
      necessary) so the reply can be set from the full event; if that fails, the reply
      set from the event ID alone is kept.
      """
      if not reply_to:
          return

      target = await DBMessage.get_by_item_id(reply_to.item_id, self.receiver)
      if not target:
          return

      # Always set the basic reply relation first.
      content.set_reply(target.mxid)
      if not isinstance(content, TextMessageEventContent):
          return

      try:
          target_evt = await self.main_intent.get_event(target.mx_room, target.mxid)
      except (MNotFound, MForbidden):
          return
      if not target_evt:
          return

      if target_evt.type == EventType.ROOM_ENCRYPTED:
          try:
              target_evt = await self.matrix.e2ee.decrypt(target_evt, wait_session_timeout=0)
          except SessionNotFound:
              return

      if isinstance(target_evt.content, TextMessageEventContent):
          # Avoid nesting the target's own reply fallback inside ours.
          target_evt.content.trim_reply_fallback()
      content.set_reply(target_evt)
  async def _handle_instagram_item(
      self, source: u.User, sender: p.Puppet, item: ThreadItem, is_backfill: bool = False
  ) -> None:
      """
      Deduplicate an Instagram thread item and bridge it to Matrix.

      Items that failed to parse, were sent by the bridge itself or were already
      handled are ignored. Everything else is dispatched to the handler matching its
      content, stored in the message database and acknowledged with a delivery
      receipt. Items that no handler produced an event for get an "unsupported
      message" notice instead. When backfilling, reactions on the item are bridged too.
      """
      if not isinstance(item, ThreadItem):
          # Parsing these items failed, they should have been logged already
          return
      if item.client_context in self._reqid_dedup:
          self.log.debug(
              f"Ignoring message {item.item_id} ({item.client_context}) by "
              f"{item.user_id} as it was sent by us (client_context in dedup queue)"
          )
          return
      if item.item_id in self._msgid_dedup:
          self.log.debug(
              f"Ignoring message {item.item_id} ({item.client_context}) by "
              f"{item.user_id} as it was already handled (message.id in dedup queue)"
          )
          return
      if await DBMessage.get_by_item_id(item.item_id, self.receiver) is not None:
          self.log.debug(
              f"Ignoring message {item.item_id} ({item.client_context}) by "
              f"{item.user_id} as it was already handled (message.id in database)"
          )
          return

      self.log.debug(
          f"Starting handling of message {item.item_id} ({item.client_context}) "
          f"by {item.user_id}"
      )
      self._msgid_dedup.appendleft(item.item_id)

      if self.backfill_lock.locked and sender.need_backfill_invite(self):
          self.log.debug("Adding %s's default puppet to room for backfilling", sender.mxid)
          if self.is_direct:
              await self.main_intent.invite_user(self.mxid, sender.default_mxid)
          intent = sender.default_mxid_intent
          await intent.ensure_joined(self.mxid)
          # Remember the intent so it can leave again once the backfill is done.
          self._backfill_leave.add(intent)
      else:
          intent = sender.intent_for(self)

      event_id = None
      has_share = (
          item.media_share
          or item.direct_media_share
          or item.story_share
          or item.clip
          or item.felix_share
      )
      if item.media or item.animated_media or item.voice_media or item.visual_media:
          event_id = await self._handle_instagram_media(source, intent, item)
      elif item.location:
          event_id = await self._handle_instagram_location(intent, item)
      elif item.profile:
          event_id = await self._handle_instagram_profile(intent, item)
      elif item.reel_share:
          event_id = await self._handle_instagram_reel_share(source, intent, item)
      elif has_share:
          event_id = await self._handle_instagram_media_share(source, intent, item)
      elif item.action_log:
          # These probably don't need to be bridged
          self.log.debug(f"Ignoring action log message {item.item_id}")
          return

      # TODO handle item.clip?
      # Text-like content is bridged in addition to whatever was handled above.
      if item.text:
          event_id = await self._handle_instagram_text(intent, item, item.text)
      elif item.like:
          # We handle likes as text because Matrix clients do big emoji on their own.
          event_id = await self._handle_instagram_text(intent, item, item.like)
      elif item.link:
          event_id = await self._handle_instagram_text(intent, item, item.link.text)

      handled = bool(event_id)
      if not handled:
          self.log.debug(f"Unhandled Instagram message {item.item_id}")
          event_id = await self._send_instagram_unhandled(intent, item)

      msg = DBMessage(
          mxid=event_id,
          mx_room=self.mxid,
          item_id=item.item_id,
          client_context=item.client_context,
          receiver=self.receiver,
          sender=sender.pk,
      )
      await msg.insert()
      await self._send_delivery_receipt(event_id)

      if handled:
          self.log.debug(f"Handled Instagram message {item.item_id} -> {event_id}")
      else:
          self.log.debug(
              f"Unhandled Instagram message {item.item_id} "
              f"(type {item.item_type} -> fallback error {event_id})"
          )

      if is_backfill and item.reactions:
          await self._handle_instagram_reactions(msg, item.reactions.emojis)
  async def handle_instagram_remove(self, item_id: str) -> None:
      """
      Redact the Matrix event of an Instagram message that was unsent.

      The database entry is deleted first. The redaction is attempted as the original
      sender and retried with the portal's main intent if that is forbidden.
      """
      record = await DBMessage.get_by_item_id(item_id, self.receiver)
      if record is None:
          return

      await record.delete()
      sender = await p.Puppet.get_by_pk(record.sender)
      try:
          await sender.intent_for(self).redact(self.mxid, record.mxid)
      except MForbidden:
          await self.main_intent.redact(self.mxid, record.mxid)
      self.log.debug(f"Redacted {record.mxid} after Instagram unsend")
  async def _handle_instagram_reactions(
      self, message: DBMessage, reactions: list[Reaction]
  ) -> None:
      """
      Sync the Matrix reactions on ``message`` with the given Instagram reactions.

      New or changed reactions are sent and stored via ``_upsert_reaction``. Stored
      reactions whose sender isn't in ``reactions`` are deleted from the database and
      redacted.
      """
      stored = await DBReaction.get_all_by_item_id(message.item_id, self.receiver)
      # Keyed by Instagram sender; entries still left after the first loop are stale.
      leftover: dict[int, DBReaction] = {entry.ig_sender: entry for entry in stored}

      for incoming in reactions:
          previous = leftover.pop(incoming.sender_id, None)
          if previous and previous.reaction == incoming.emoji:
              # Already bridged with the same emoji, nothing to do.
              continue
          puppet = await p.Puppet.get_by_pk(incoming.sender_id)
          intent = puppet.intent_for(self)
          reaction_event_id = await intent.react(self.mxid, message.mxid, incoming.emoji)
          await self._upsert_reaction(
              previous, intent, reaction_event_id, message, puppet, incoming.emoji
          )

      for stale in leftover.values():
          await stale.delete()
          puppet = await p.Puppet.get_by_pk(stale.ig_sender)
          await puppet.intent_for(self).redact(self.mxid, stale.mxid)
  async def handle_instagram_update(self, item: MessageSyncMessage) -> None:
      """
      Handle an update to an already bridged message.

      An update with ``has_seen`` set is bridged as a read receipt from that user (if a
      puppet for them exists). Any other update is treated as a reaction change and
      synced while holding the reaction lock.
      """
      message = await DBMessage.get_by_item_id(item.item_id, self.receiver)
      if not message:
          return

      if item.has_seen:
          puppet = await p.Puppet.get_by_pk(item.has_seen, create=False)
          if puppet:
              await puppet.intent_for(self).mark_read(self.mxid, message.mxid)
          return

      async with self._reaction_lock:
          emojis = item.reactions.emojis if item.reactions else []
          await self._handle_instagram_reactions(message, emojis)
  1115. # endregion
  1116. # region Updating portal info
  1117. def _get_thread_name(self, thread: Thread) -> str:
  1118. if self.is_direct:
  1119. if self.other_user_pk == thread.viewer_id and len(thread.users) == 0:
  1120. return "Instagram chat with yourself"
  1121. elif len(thread.users) == 1:
  1122. tpl = self.config["bridge.private_chat_name_template"]
  1123. ui = thread.users[0]
  1124. return tpl.format(displayname=ui.full_name, id=ui.pk, username=ui.username)
  1125. pass
  1126. elif thread.thread_title:
  1127. return self.config["bridge.group_chat_name_template"].format(name=thread.thread_title)
  1128. else:
  1129. return ""
  async def update_info(self, thread: Thread, source: u.User) -> None:
      """
      Update the portal's name and participants from the given thread.

      If either update reports a change, the bridge info state events are resent and
      the portal is saved.
      """
      name_changed = await self._update_name(self._get_thread_name(thread))
      participants_changed = await self._update_participants(thread.users, source)
      if participants_changed or name_changed:
          await self.update_bridge_info()
          await self.update()
      # TODO update power levels with thread.admin_user_ids
  1137. async def _update_name(self, name: str) -> bool:
  1138. if name and (self.name != name or not self.name_set):
  1139. self.name = name
  1140. if self.mxid:
  1141. try:
  1142. await self.main_intent.set_room_name(self.mxid, name)
  1143. self.name_set = True
  1144. except Exception:
  1145. self.log.exception("Failed to update name")
  1146. self.name_set = False
  1147. return True
  1148. return False
  1149. async def _update_photo_from_puppet(self, puppet: p.Puppet) -> bool:
  1150. if not self.private_chat_portal_meta and not self.encrypted:
  1151. return False
  1152. if self.avatar_set and self.avatar_url == puppet.photo_mxc:
  1153. return False
  1154. self.avatar_url = puppet.photo_mxc
  1155. if self.mxid:
  1156. try:
  1157. await self.main_intent.set_room_avatar(self.mxid, puppet.photo_mxc)
  1158. self.avatar_set = True
  1159. except Exception:
  1160. self.log.exception("Failed to set room avatar")
  1161. self.avatar_set = False
  1162. return True
  async def _update_participants(self, users: list[ThreadUser], source: u.User) -> bool:
      """
      Sync the puppets in the Matrix room with the users of the Instagram thread.

      Every listed user's puppet has its info updated and, if the room exists, is
      joined to it. Puppets found in the room that are neither listed nor the other
      user of the portal are kicked.

      Returns:
          The result of updating the portal photo from the other user's puppet, or
          ``False`` if that puppet wasn't among ``users``.
      """
      meta_changed = False

      # Make sure puppets who should be here are here
      for thread_user in users:
          puppet = await p.Puppet.get_by_pk(thread_user.pk)
          await puppet.update_info(thread_user, source)
          if self.mxid:
              await puppet.intent_for(self).ensure_joined(self.mxid)
          if puppet.pk == self.other_user_pk:
              meta_changed = await self._update_photo_from_puppet(puppet)

      if not self.mxid:
          return meta_changed

      # Kick puppets who shouldn't be here
      expected_pks = {int(thread_user.pk) for thread_user in users}
      for member_mxid in await self.main_intent.get_room_members(self.mxid):
          member_pk = p.Puppet.get_id_from_mxid(member_mxid)
          if not member_pk or member_pk in expected_pks or member_pk == self.other_user_pk:
              continue
          await self.main_intent.kick_user(
              self.mxid,
              p.Puppet.get_mxid_from_id(member_pk),
              reason="User had left this Instagram DM",
          )

      return meta_changed
  async def _update_read_receipts(self, receipts: dict[int | str, ThreadUserLastSeenAt]) -> None:
      """
      Bridge Instagram "last seen" markers to Matrix read receipts.

      Receipts whose message isn't in the database or whose user has no puppet are
      skipped. Failures to mark a message as read are logged and don't stop the loop.
      """
      for raw_user_id, receipt in receipts.items():
          message = await DBMessage.get_by_item_id(receipt.item_id, self.receiver)
          if not message:
              continue

          puppet = await p.Puppet.get_by_pk(int(raw_user_id), create=False)
          if not puppet:
              continue

          try:
              reader = puppet.intent_for(self)
              await reader.mark_read(message.mx_room, message.mxid)
          except Exception:
              self.log.warning(
                  f"Failed to mark {message.mxid} in {message.mx_room} "
                  f"as read by {puppet.intent.mxid}",
                  exc_info=True,
              )
  1201. # endregion
  1202. # region Backfilling
  async def backfill(self, source: u.User, is_initial: bool = False) -> None:
      """
      Backfill messages into the portal while holding the backfill lock.

      The message limit comes from ``bridge.backfill.initial_limit`` or
      ``bridge.backfill.missed_limit`` depending on ``is_initial``. A limit of zero
      disables backfilling and a negative limit is passed on as ``None``.
      """
      limit_key = (
          "bridge.backfill.initial_limit" if is_initial else "bridge.backfill.missed_limit"
      )
      limit = self.config[limit_key]
      if limit == 0:
          return
      if limit < 0:
          limit = None

      with self.backfill_lock:
          await self._backfill(source, is_initial, limit)
  async def _backfill(self, source: u.User, is_initial: bool, limit: int) -> None:
      """
      Fetch missed thread items and bridge them oldest-first.

      Notifications are disabled for the source user while the items are handled.
      Intents collected in ``_backfill_leave`` during the backfill leave the room
      again afterwards.
      """
      self.log.debug("Backfilling history through %s", source.mxid)

      entries = await self._fetch_backfill_items(source, is_initial, limit)
      if not entries:
          self.log.debug("Didn't get any items to backfill from server")
          return
      self.log.debug("Got %d entries from server", len(entries))

      self._backfill_leave = set()
      async with NotificationDisabler(self.mxid, source):
          # The entries are handled in the reverse of the order they were fetched in.
          for entry in entries[::-1]:
              sender = await p.Puppet.get_by_pk(int(entry.user_id))
              await self.handle_instagram_item(source, sender, entry, is_backfill=True)

      for leaving_intent in self._backfill_leave:
          self.log.trace("Leaving room with %s post-backfill", leaving_intent.mxid)
          await leaving_intent.leave_room(self.mxid)
      self._backfill_leave = None

      self.log.info("Backfilled %d messages through %s", len(entries), source.mxid)
  1232. async def _fetch_backfill_items(
  1233. self, source: u.User, is_initial: bool, limit: int
  1234. ) -> list[ThreadItem]:
  1235. items = []
  1236. self.log.debug("Fetching up to %d messages through %s", limit, source.igpk)
  1237. async for item in source.client.iter_thread(self.thread_id):
  1238. if len(items) >= limit:
  1239. self.log.debug(f"Fetched {len(items)} messages (the limit)")
  1240. break
  1241. elif not is_initial:
  1242. msg = await DBMessage.get_by_item_id(item.item_id, receiver=self.receiver)
  1243. if msg is not None:
  1244. self.log.debug(
  1245. f"Fetched {len(items)} messages and hit a message"
  1246. " that's already in the database."
  1247. )
  1248. break
  1249. items.append(item)
  1250. return items
  1251. # endregion
  1252. # region Bridge info state event
  @property
  def bridge_info_state_key(self) -> str:
      """The state key used for this portal's bridge info state events."""
      thread_id = self.thread_id
      return f"net.maunium.instagram://instagram/{thread_id}"
  @property
  def bridge_info(self) -> dict[str, Any]:
      """The content of this portal's bridge info state events."""
      protocol = {
          "id": "instagram",
          "displayname": "Instagram DM",
          "avatar_url": self.config["appservice.bot_avatar"],
      }
      channel = {
          "id": self.thread_id,
          "displayname": self.name,
          "avatar_url": self.avatar_url,
      }
      return {
          "bridgebot": self.az.bot_mxid,
          "creator": self.main_intent.mxid,
          "protocol": protocol,
          "channel": channel,
      }
  async def update_bridge_info(self) -> None:
      """
      Send the bridge info state events to the Matrix room.

      Does nothing if no room has been created. Failures are logged as warnings and
      not raised.
      """
      if not self.mxid:
          self.log.debug("Not updating bridge info: no Matrix room created")
          return

      try:
          self.log.debug("Updating bridge info...")
          # TODO remove the second event type once
          #      https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
          for event_type in (StateBridge, StateHalfShotBridge):
              await self.main_intent.send_state_event(
                  self.mxid, event_type, self.bridge_info, self.bridge_info_state_key
              )
      except Exception:
          self.log.warning("Failed to update bridge info", exc_info=True)
  1287. # endregion
  1288. # region Creating Matrix rooms
  async def create_matrix_room(self, source: u.User, info: Thread) -> RoomID | None:
      """
      Make sure the portal has a Matrix room and return its ID.

      An existing room is updated instead, with errors during the update logged rather
      than raised. Room creation is serialized with the create room lock.
      """
      if not self.mxid:
          async with self._create_room_lock:
              return await self._create_matrix_room(source, info)

      try:
          await self.update_matrix_room(source, info)
      except Exception:
          self.log.exception("Failed to update portal")
      return self.mxid
  1298. def _get_invite_content(self, double_puppet: p.Puppet | None) -> dict[str, bool]:
  1299. invite_content = {}
  1300. if double_puppet:
  1301. invite_content["fi.mau.will_auto_accept"] = True
  1302. if self.is_direct:
  1303. invite_content["is_direct"] = True
  1304. return invite_content
  async def update_matrix_room(
      self, source: u.User, info: Thread, backfill: bool = False
  ) -> None:
      """
      Bring an existing Matrix room up to date with the Instagram thread.

      The user is (re)invited, their double puppet (if any) is joined, and the portal
      info is updated. With ``backfill`` set, missed messages are backfilled when the
      thread's last permanent item isn't in the database. Read receipts are synced at
      the end.
      """
      double_puppet = await p.Puppet.get_by_custom_mxid(source.mxid)
      await self.main_intent.invite_user(
          self.mxid,
          source.mxid,
          check_cache=True,
          extra_content=self._get_invite_content(double_puppet),
      )
      if double_puppet:
          did_join = await double_puppet.intent.ensure_joined(self.mxid)
          if did_join and self.is_direct:
              await source.update_direct_chats({self.main_intent.mxid: [self.mxid]})

      await self.update_info(info, source)

      if backfill:
          last_item_id = info.last_permanent_item.item_id
          last_msg = await DBMessage.get_by_item_id(last_item_id, receiver=self.receiver)
          if last_msg is None:
              self.log.debug(
                  f"Last permanent item ({last_item_id})"
                  " not found in database, starting backfilling"
              )
              await self.backfill(source, is_initial=False)

      await self._update_read_receipts(info.last_seen_at)
  async def _create_matrix_room(self, source: u.User, info: Thread) -> RoomID | None:
      """
      Create the Matrix room for this portal, invite the user and run the initial backfill.

      If the room already exists it is updated instead. The backfill lock is held from
      room creation until the initial backfill and read receipt sync have finished.

      Raises:
          Exception: If room creation doesn't return a room ID.
      """
      if self.mxid:
          await self.update_matrix_room(source, info)
          return self.mxid

      await self.update_info(info, source)
      self.log.debug("Creating Matrix room")

      # TODO remove StateHalfShotBridge once
      #      https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
      initial_state = [
          {
              "type": str(event_type),
              "state_key": self.bridge_info_state_key,
              "content": self.bridge_info,
          }
          for event_type in (StateBridge, StateHalfShotBridge)
      ]
      invites = []
      if self.config["bridge.encryption.default"] and self.matrix.e2ee:
          self.encrypted = True
          initial_state.append(
              {
                  "type": "m.room.encryption",
                  "content": {"algorithm": "m.megolm.v1.aes-sha2"},
              }
          )
          if self.is_direct:
              invites.append(self.az.bot_mxid)

      name: str | None = None
      if self.encrypted or self.private_chat_portal_meta or not self.is_direct:
          name = self.name

      if self.config["appservice.community_id"]:
          initial_state.append(
              {
                  "type": "m.room.related_groups",
                  "content": {"groups": [self.config["appservice.community_id"]]},
              }
          )

      # We lock backfill lock here so any messages that come between the room being created
      # and the initial backfill finishing wouldn't be bridged before the backfill messages.
      with self.backfill_lock:
          creation_content = {}
          if not self.config["bridge.federate_rooms"]:
              creation_content["m.federate"] = False

          self.mxid = await self.main_intent.create_room(
              name=name,
              is_direct=self.is_direct,
              initial_state=initial_state,
              invitees=invites,
              creation_content=creation_content,
          )
          if not self.mxid:
              raise Exception("Failed to create room: no mxid returned")

          if self.encrypted and self.matrix.e2ee and self.is_direct:
              try:
                  await self.az.intent.ensure_joined(self.mxid)
              except Exception:
                  self.log.warning(f"Failed to add bridge bot to new private chat {self.mxid}")

          await self.update()
          self.log.debug(f"Matrix room created: {self.mxid}")
          self.by_mxid[self.mxid] = self

          double_puppet = await p.Puppet.get_by_custom_mxid(source.mxid)
          await self.main_intent.invite_user(
              self.mxid, source.mxid, extra_content=self._get_invite_content(double_puppet)
          )
          if double_puppet:
              try:
                  if self.is_direct:
                      await source.update_direct_chats({self.main_intent.mxid: [self.mxid]})
                  await double_puppet.intent.join_room_by_id(self.mxid)
              except MatrixError:
                  self.log.debug(
                      "Failed to join custom puppet into newly created portal", exc_info=True
                  )

          await self._update_participants(info.users, source)

          try:
              await self.backfill(source, is_initial=True)
          except Exception:
              self.log.exception("Failed to backfill new portal")
          await self._update_read_receipts(info.last_seen_at)

      return self.mxid
  1414. # endregion
  1415. # region Database getters
  async def postinit(self) -> None:
      """
      Register the portal in the in-memory caches and set up its main intent.

      The main intent is the default intent of the other user's puppet when the portal
      has an ``other_user_pk``, and the bridge bot's intent otherwise.
      """
      cache_key = (self.thread_id, self.receiver)
      self.by_thread_id[cache_key] = self
      if self.mxid:
          self.by_mxid[self.mxid] = self

      if self.other_user_pk:
          other_puppet = await p.Puppet.get_by_pk(self.other_user_pk)
          self._main_intent = other_puppet.default_mxid_intent
      else:
          self._main_intent = self.az.intent
  async def delete(self) -> None:
      """
      Detach the portal from its Matrix room.

      All messages stored for the room are deleted, the room is dropped from the
      ``by_mxid`` cache, and the portal is saved with no room ID and encryption off.
      """
      old_mxid = self.mxid
      await DBMessage.delete_all(old_mxid)
      self.by_mxid.pop(old_mxid, None)
      self.mxid, self.encrypted = None, False
      await self.update()
  async def save(self) -> None:
      """Persist the portal by delegating to ``update``."""
      await self.update()
  @classmethod
  def all_with_room(cls) -> AsyncGenerator[Portal, None]:
      """Iterate over the portals returned by the parent class's ``all_with_room`` query."""
      query = super().all_with_room()
      return cls._db_to_portals(query)
  @classmethod
  def find_private_chats_with(cls, other_user: int) -> AsyncGenerator[Portal, None]:
      """Iterate over the portals the parent class's query finds for ``other_user``."""
      query = super().find_private_chats_with(other_user)
      return cls._db_to_portals(query)
  @classmethod
  async def _db_to_portals(cls, query: Awaitable[list[Portal]]) -> AsyncGenerator[Portal, None]:
      """
      Await a database query and yield the resulting portals.

      For each row, the instance already cached in ``by_thread_id`` is yielded if there
      is one; otherwise the freshly loaded portal is initialized with ``postinit`` and
      yielded.
      """
      portals = await query
      for portal in portals:
          try:
              yield cls.by_thread_id[(portal.thread_id, portal.receiver)]
          except KeyError:
              await portal.postinit()
              yield portal
  @classmethod
  @async_getter_lock
  async def get_by_mxid(cls, mxid: RoomID) -> Portal | None:
      """Look up a portal by its Matrix room ID.

      Args:
          mxid: The Matrix room ID to search for.

      Returns:
          The cached portal if ``mxid`` is in ``by_mxid``; otherwise the portal
          loaded through the parent class's ``get_by_mxid`` (after running
          ``postinit()`` on it), or ``None`` if that lookup finds nothing.
      """
      try:
          return cls.by_mxid[mxid]
      except KeyError:
          # Not cached yet; fall through to the parent class's lookup.
          pass

      loaded = cast(cls, await super().get_by_mxid(mxid))
      if loaded is None:
          return None
      await loaded.postinit()
      return loaded
  @classmethod
  @async_getter_lock
  async def get_by_thread_id(
      cls,
      thread_id: str,
      *,
      receiver: int,
      is_group: bool | None = None,
      other_user_pk: int | None = None,
  ) -> Portal | None:
      """Find a portal by thread ID, optionally creating it when missing.

      Args:
          thread_id: Thread identifier of the portal.
          receiver: Receiver part of the cache/database key. Forced to ``0``
              when ``is_group`` is truthy.
          is_group: Whether the thread is a group. ``None`` means "unknown":
              the receiver-less cache entry is also tried, the parent lookup
              is called with ``rec_must_match=False``, and no new portal is
              created on a miss.
          other_user_pk: Passed to the constructor if a new portal is created.

      Returns:
          The cached, loaded or newly inserted portal, or ``None`` when nothing
          was found and ``is_group`` is ``None``.
      """
      if is_group and receiver != 0:
          receiver = 0

      # Cache keys to probe, in order: the exact receiver first, then the
      # receiver-less key when the thread type is unknown.
      receivers_to_try = [receiver]
      if is_group is None and receiver != 0:
          receivers_to_try.append(0)
      for candidate in receivers_to_try:
          try:
              return cls.by_thread_id[(thread_id, candidate)]
          except KeyError:
              continue

      # The receiver only has to match when the thread type is known.
      rec_must_match = is_group is not None
      loaded = cast(
          cls,
          await super().get_by_thread_id(
              thread_id, receiver=receiver, rec_must_match=rec_must_match
          ),
      )
      if loaded is not None:
          await loaded.postinit()
          return loaded

      if is_group is None:
          # Unknown thread type: do not create a new portal.
          return None

      created = cls(thread_id, receiver, other_user_pk=other_user_pk)
      await created.insert()
      await created.postinit()
      return created
  @classmethod
  async def get_by_thread(cls, thread: Thread, receiver: int) -> Portal | None:
      """Resolve the portal for ``thread`` via ``get_by_thread_id``.

      Args:
          thread: Thread object; its ``is_group``, ``users`` and ``thread_id``
              attributes are read.
          receiver: Receiver to use for non-group threads. Group threads always
              use receiver ``0``.

      Returns:
          Whatever ``get_by_thread_id`` returns for the derived arguments.
      """
      if thread.is_group:
          receiver, other_user_pk = 0, None
      elif len(thread.users) == 0:
          # No other participants listed: the receiver is used as the other
          # user (presumably a chat with oneself -- TODO confirm).
          other_user_pk = receiver
      else:
          other_user_pk = thread.users[0].pk

      return await cls.get_by_thread_id(
          thread.thread_id,
          receiver=receiver,
          is_group=thread.is_group,
          other_user_pk=other_user_pk,
      )
  1512. # endregion