|
@@ -30,6 +30,7 @@ from yarl import URL
|
|
|
import asyncpg
|
|
|
import magic
|
|
|
|
|
|
+from mauigpapi.errors import IGRateLimitError
|
|
|
from mauigpapi.types import (
|
|
|
AnimatedMediaItem,
|
|
|
CommandResponse,
|
|
@@ -51,11 +52,14 @@ from mauigpapi.types import (
|
|
|
VoiceMediaItem,
|
|
|
XMAMediaShareItem,
|
|
|
)
|
|
|
-from mautrix.appservice import AppService, IntentAPI
|
|
|
-from mautrix.bridge import BasePortal, NotificationDisabler, async_getter_lock
|
|
|
+from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, IntentAPI
|
|
|
+from mautrix.bridge import BasePortal, async_getter_lock
|
|
|
from mautrix.errors import MatrixError, MForbidden, MNotFound, SessionNotFound
|
|
|
from mautrix.types import (
|
|
|
AudioInfo,
|
|
|
+ BatchID,
|
|
|
+ BatchSendEvent,
|
|
|
+ BatchSendStateEvent,
|
|
|
BeeperMessageStatusEventContent,
|
|
|
ContentURI,
|
|
|
EventID,
|
|
@@ -64,6 +68,8 @@ from mautrix.types import (
|
|
|
ImageInfo,
|
|
|
LocationMessageEventContent,
|
|
|
MediaMessageEventContent,
|
|
|
+ Membership,
|
|
|
+ MemberStateEventContent,
|
|
|
MessageEventContent,
|
|
|
MessageStatus,
|
|
|
MessageStatusReason,
|
|
@@ -77,11 +83,10 @@ from mautrix.types import (
|
|
|
)
|
|
|
from mautrix.util import ffmpeg
|
|
|
from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
|
|
|
-from mautrix.util.simple_lock import SimpleLock
|
|
|
|
|
|
from . import matrix as m, puppet as p, user as u
|
|
|
from .config import Config
|
|
|
-from .db import Message as DBMessage, Portal as DBPortal, Reaction as DBReaction
|
|
|
+from .db import Backfill, Message as DBMessage, Portal as DBPortal, Reaction as DBReaction
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
|
from .__main__ import InstagramBridge
|
|
@@ -110,6 +115,10 @@ MediaData = Union[
|
|
|
]
|
|
|
MediaUploadFunc = Callable[["u.User", MediaData, IntentAPI], Awaitable[MediaMessageEventContent]]
|
|
|
|
|
|
+PortalCreateDummy = EventType.find("fi.mau.dummy.portal_created", EventType.Class.MESSAGE)
|
|
|
+HistorySyncMarkerMessage = EventType.find("org.matrix.msc2716.marker", EventType.Class.MESSAGE)
|
|
|
+ConvertedMessage = tuple[EventType, MessageEventContent]
|
|
|
+
|
|
|
# This doesn't need to capture all valid URLs, it's enough to catch most of them.
|
|
|
# False negatives simply mean the link won't be linkified on Instagram,
|
|
|
# but false positives will cause the message to fail to send.
|
|
@@ -127,18 +136,15 @@ class Portal(DBPortal, BasePortal):
|
|
|
by_thread_id: dict[tuple[str, int], Portal] = {}
|
|
|
config: Config
|
|
|
matrix: m.MatrixHandler
|
|
|
- az: AppService
|
|
|
private_chat_portal_meta: bool
|
|
|
|
|
|
_main_intent: IntentAPI | None
|
|
|
_create_room_lock: asyncio.Lock
|
|
|
- backfill_lock: SimpleLock
|
|
|
_msgid_dedup: deque[str]
|
|
|
_reqid_dedup: set[str]
|
|
|
|
|
|
_last_participant_update: set[int]
|
|
|
_reaction_lock: asyncio.Lock
|
|
|
- _backfill_leave: set[IntentAPI] | None
|
|
|
_typing: set[UserID]
|
|
|
|
|
|
def __init__(
|
|
@@ -156,6 +162,7 @@ class Portal(DBPortal, BasePortal):
|
|
|
first_event_id: EventID | None = None,
|
|
|
next_batch_id: BatchID | None = None,
|
|
|
historical_base_insertion_event_id: EventID | None = None,
|
|
|
+ cursor: str | None = None,
|
|
|
) -> None:
|
|
|
super().__init__(
|
|
|
thread_id,
|
|
@@ -171,6 +178,7 @@ class Portal(DBPortal, BasePortal):
|
|
|
first_event_id,
|
|
|
next_batch_id,
|
|
|
historical_base_insertion_event_id,
|
|
|
+ cursor,
|
|
|
)
|
|
|
self._create_room_lock = asyncio.Lock()
|
|
|
self.log = self.log.getChild(thread_id)
|
|
@@ -178,10 +186,6 @@ class Portal(DBPortal, BasePortal):
|
|
|
self._reqid_dedup = set()
|
|
|
self._last_participant_update = set()
|
|
|
|
|
|
- self.backfill_lock = SimpleLock(
|
|
|
- "Waiting for backfilling to finish before handling %s", log=self.log
|
|
|
- )
|
|
|
- self._backfill_leave = None
|
|
|
self._main_intent = None
|
|
|
self._reaction_lock = asyncio.Lock()
|
|
|
self._typing = set()
|
|
@@ -205,8 +209,6 @@ class Portal(DBPortal, BasePortal):
|
|
|
cls.loop = bridge.loop
|
|
|
cls.bridge = bridge
|
|
|
cls.private_chat_portal_meta = cls.config["bridge.private_chat_portal_meta"]
|
|
|
- NotificationDisabler.puppet_cls = p.Puppet
|
|
|
- NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"]
|
|
|
|
|
|
# region Misc
|
|
|
|
|
@@ -1761,70 +1763,382 @@ class Portal(DBPortal, BasePortal):
|
|
|
return await p.Puppet.get_by_pk(self.other_user_pk)
|
|
|
|
|
|
# endregion
|
|
|
- # region Backfilling
|
|
|
+ # region Backfill
|
|
|
+
|
|
|
+ async def enqueue_immediate_backfill(self, source: u.User, priority: int) -> None:
|
|
|
+ assert self.config["bridge.backfill.msc2716"]
|
|
|
+ if not await Backfill.get(source.mxid, self.thread_id, self.receiver):
|
|
|
+ await Backfill.new(
|
|
|
+ source.mxid,
|
|
|
+ priority,
|
|
|
+ self.fbid,
|
|
|
+ self.fb_receiver,
|
|
|
+ self.config["bridge.backfill.incremental.max_pages"],
|
|
|
+ self.config["bridge.backfill.incremental.page_delay"],
|
|
|
+ self.config["bridge.backfill.incremental.post_batch_delay"],
|
|
|
+ self.config["bridge.backfill.incremental.max_total_pages"],
|
|
|
+ ).insert()
|
|
|
|
|
|
- async def backfill(self, source: u.User, thread: Thread, is_initial: bool = False) -> None:
|
|
|
- limit = (
|
|
|
- self.config["bridge.backfill.initial_limit"]
|
|
|
- if is_initial
|
|
|
- else self.config["bridge.backfill.missed_limit"]
|
|
|
+ async def backfill(self, source: u.User, backfill_request: Backfill) -> None:
|
|
|
+ try:
|
|
|
+ last_message_timestamp = await self._backfill(source, backfill_request)
|
|
|
+ if last_message_timestamp is not None:
|
|
|
+ await self.send_post_backfill_dummy(last_message_timestamp)
|
|
|
+ finally:
|
|
|
+ # Always sleep after the backfill request is finished processing, even if it errors.
|
|
|
+ await asyncio.sleep(backfill_request.post_batch_delay)
|
|
|
+
|
|
|
+ async def _backfill(self, source: u.User, backfill_request: Backfill) -> int | None:
|
|
|
+ assert source.client
|
|
|
+ self.log.debug("Backfill request: %s", backfill_request)
|
|
|
+
|
|
|
+ num_pages = backfill_request.num_pages
|
|
|
+ self.log.debug(
|
|
|
+ "Backfilling up to %d pages of history in %s through %s",
|
|
|
+ num_pages,
|
|
|
+ self.mxid,
|
|
|
+ source.mxid,
|
|
|
)
|
|
|
- if limit == 0:
|
|
|
- return
|
|
|
- elif limit < 0:
|
|
|
- limit = None
|
|
|
- with self.backfill_lock:
|
|
|
- await self._backfill(source, thread, is_initial, limit)
|
|
|
|
|
|
- async def _backfill(
|
|
|
- self, source: u.User, thread: Thread, is_initial: bool, limit: int
|
|
|
- ) -> None:
|
|
|
- self.log.debug("Backfilling history through %s", source.mxid)
|
|
|
+ try:
|
|
|
+ if self.cursor:
|
|
|
+ self.log.debug("There is a cursor for the chat, fetching messages before it")
|
|
|
+ resp = await source.client.get_thread(
|
|
|
+ self.thread_id, seq_id=source.seq_id, cursor=self.cursor
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ self.log.debug(
|
|
|
+ "There is no first message in the chat, starting with the most recent messages"
|
|
|
+ )
|
|
|
+ resp = await source.client.get_thread(self.thread_id, seq_id=source.seq_id)
|
|
|
+ except IGRateLimitError as e:
|
|
|
+ backoff = self.config.get("bridge.backfill.backoff.message_history", 300)
|
|
|
+ self.log.warning(
|
|
|
+ f"Backfilling failed due to rate limit. Waiting for {backoff} seconds before "
|
|
|
+ f"resuming. Error: {e}"
|
|
|
+ )
|
|
|
+ await asyncio.sleep(backoff)
|
|
|
+ raise
|
|
|
+
|
|
|
+ messages = resp.thread.items
|
|
|
+ cursor = resp.thread.oldest_cursor
|
|
|
+ backfill_more = resp.thread.has_older
|
|
|
+ if len(messages) == 0:
|
|
|
+ self.log.debug("No messages to backfill.")
|
|
|
+ return None
|
|
|
|
|
|
- entries = await self._fetch_backfill_items(source, thread, is_initial, limit)
|
|
|
- if not entries:
|
|
|
- self.log.debug("Didn't get any items to backfill from server")
|
|
|
- return
|
|
|
+ last_message_timestamp = messages[-1].timestamp
|
|
|
|
|
|
- self.log.debug("Got %d entries from server", len(entries))
|
|
|
-
|
|
|
- self._backfill_leave = set()
|
|
|
- async with NotificationDisabler(self.mxid, source):
|
|
|
- for entry in reversed(entries):
|
|
|
- sender = await p.Puppet.get_by_pk(int(entry.user_id))
|
|
|
- await self.handle_instagram_item(source, sender, entry, is_backfill=True)
|
|
|
- for intent in self._backfill_leave:
|
|
|
- self.log.trace("Leaving room with %s post-backfill", intent.mxid)
|
|
|
- await intent.leave_room(self.mxid)
|
|
|
- self._backfill_leave = None
|
|
|
- self.log.info("Backfilled %d messages through %s", len(entries), source.mxid)
|
|
|
-
|
|
|
- async def _fetch_backfill_items(
|
|
|
- self, source: u.User, thread: Thread, is_initial: bool, limit: int
|
|
|
- ) -> list[ThreadItem]:
|
|
|
- items = []
|
|
|
- excluded_count = 0
|
|
|
- self.log.debug("Fetching up to %d messages through %s", limit, source.igpk)
|
|
|
- async for item in source.client.iter_thread(self.thread_id, start_at=thread):
|
|
|
- if len(items) - excluded_count >= limit:
|
|
|
- self.log.debug(f"Fetched {len(items)} messages (the limit)")
|
|
|
- break
|
|
|
- elif not is_initial:
|
|
|
- msg = await DBMessage.get_by_item_id(item.item_id, receiver=self.receiver)
|
|
|
- if msg is not None:
|
|
|
- self.log.debug(
|
|
|
- f"Fetched {len(items)} messages and hit a message"
|
|
|
- " that's already in the database."
|
|
|
+ pages_to_backfill = backfill_request.num_pages
|
|
|
+ if backfill_request.max_total_pages > -1:
|
|
|
+ pages_to_backfill = min(pages_to_backfill, backfill_request.max_total_pages)
|
|
|
+
|
|
|
+ pages_backfilled = 0
|
|
|
+ for i in range(pages_to_backfill):
|
|
|
+ base_insertion_event_id = await self.backfill_message_page(source, messages)
|
|
|
+ self.cursor = cursor
|
|
|
+ await self.save()
|
|
|
+ pages_backfilled += 1
|
|
|
+
|
|
|
+ if base_insertion_event_id:
|
|
|
+ self.historical_base_insertion_event_id = base_insertion_event_id
|
|
|
+ await self.save()
|
|
|
+
|
|
|
+ if backfill_more and i < pages_to_backfill - 1:
|
|
|
+ # Sleep before fetching another page of messages.
|
|
|
+ await asyncio.sleep(backfill_request.page_delay)
|
|
|
+
|
|
|
+ # Fetch more messages
|
|
|
+ try:
|
|
|
+ resp = await source.client.get_thread(
|
|
|
+ self.thread_id, seq_id=source.seq_id, cursor=self.cursor
|
|
|
)
|
|
|
+ messages = resp.thread.items
|
|
|
+ cursor = resp.thread.oldest_cursor
|
|
|
+ backfill_more &= resp.thread.has_older
|
|
|
+ except IGRateLimitError as e:
|
|
|
+ backoff = self.config.get("bridge.backfill.backoff.message_history", 300)
|
|
|
+ self.log.warning(
|
|
|
+ f"Backfilling failed due to rate limit. Waiting for {backoff} seconds "
|
|
|
+ "before resuming."
|
|
|
+ )
|
|
|
+ await asyncio.sleep(backoff)
|
|
|
+
|
|
|
+ # If we hit the rate limit, then we will want to give up for now, but enqueue
|
|
|
+ # additional backfill to do later.
|
|
|
break
|
|
|
- elif not item.is_handleable:
|
|
|
- self.log.debug(
|
|
|
- f"Not counting {item.unhandleable_type} item {item.item_id}"
|
|
|
- " against backfill limit"
|
|
|
+
|
|
|
+ if backfill_request.max_total_pages == -1:
|
|
|
+ new_max_total_pages = -1
|
|
|
+ else:
|
|
|
+ new_max_total_pages = backfill_request.max_total_pages - pages_backfilled
|
|
|
+ if new_max_total_pages <= 0:
|
|
|
+ backfill_more = False
|
|
|
+
|
|
|
+ if backfill_more:
|
|
|
+ self.log.debug("Enqueueing more backfill")
|
|
|
+ await Backfill.new(
|
|
|
+ source.mxid,
|
|
|
+ # Always enqueue subsequent backfills at the lowest priority
|
|
|
+ 2,
|
|
|
+ self.fbid,
|
|
|
+ self.fb_receiver,
|
|
|
+ backfill_request.num_pages,
|
|
|
+ backfill_request.page_delay,
|
|
|
+ backfill_request.post_batch_delay,
|
|
|
+ new_max_total_pages,
|
|
|
+ ).insert()
|
|
|
+ else:
|
|
|
+ self.log.debug("No more messages to backfill")
|
|
|
+
|
|
|
+ return last_message_timestamp
|
|
|
+
|
|
|
+ async def backfill_message_page(
|
|
|
+ self,
|
|
|
+ source: u.User,
|
|
|
+ message_page: list[ThreadItem],
|
|
|
+ forward: bool = False,
|
|
|
+ last_message: DBMessage | None = None,
|
|
|
+ mark_read: bool = False,
|
|
|
+ ) -> EventID | None:
|
|
|
+ """
|
|
|
+ Backfills a page of messages to Matrix. The messages should be in order from oldest to
|
|
|
+ newest.
|
|
|
+
|
|
|
+ Returns: a tuple containing the number of messages that were actually bridged, the
|
|
|
+ timestamp of the oldest bridged message and the base insertion event ID if it exists.
|
|
|
+ """
|
|
|
+ assert source.client
|
|
|
+ if len(message_page) == 0:
|
|
|
+ return None
|
|
|
+
|
|
|
+ if forward:
|
|
|
+ assert (last_message and last_message.mxid) or self.first_event_id
|
|
|
+ prev_event_id = last_message.mxid if last_message else self.first_event_id
|
|
|
+ else:
|
|
|
+ assert self.config["bridge.backfill.msc2716"]
|
|
|
+ assert self.first_event_id
|
|
|
+ prev_event_id = self.first_event_id
|
|
|
+
|
|
|
+ assert self.mxid
|
|
|
+
|
|
|
+ oldest_message_in_page = message_page[0]
|
|
|
+ oldest_msg_timestamp = oldest_message_in_page.timestamp_ms
|
|
|
+
|
|
|
+ batch_messages: list[BatchSendEvent] = []
|
|
|
+ state_events_at_start: list[BatchSendStateEvent] = []
|
|
|
+
|
|
|
+ added_members = set()
|
|
|
+ current_members = await self.main_intent.state_store.get_members(
|
|
|
+ self.mxid, memberships=(Membership.JOIN,)
|
|
|
+ )
|
|
|
+
|
|
|
+ def add_member(puppet: p.Puppet, mxid: UserID):
|
|
|
+ assert self.mxid
|
|
|
+ if mxid in added_members:
|
|
|
+ return
|
|
|
+ if (
|
|
|
+ self.bridge.homeserver_software.is_hungry
|
|
|
+ or not self.config["bridge.backfill.msc2716"]
|
|
|
+ ):
|
|
|
+ # Hungryserv doesn't expect or check state events at start.
|
|
|
+ added_members.add(mxid)
|
|
|
+ return
|
|
|
+
|
|
|
+ content_args = {"avatar_url": puppet.photo_mxc, "displayname": puppet.name}
|
|
|
+ state_events_at_start.extend(
|
|
|
+ [
|
|
|
+ BatchSendStateEvent(
|
|
|
+ content=MemberStateEventContent(Membership.INVITE, **content_args),
|
|
|
+ type=EventType.ROOM_MEMBER,
|
|
|
+ sender=self.main_intent.mxid,
|
|
|
+ state_key=mxid,
|
|
|
+ timestamp=oldest_msg_timestamp,
|
|
|
+ ),
|
|
|
+ BatchSendStateEvent(
|
|
|
+ content=MemberStateEventContent(Membership.JOIN, **content_args),
|
|
|
+ type=EventType.ROOM_MEMBER,
|
|
|
+ sender=mxid,
|
|
|
+ state_key=mxid,
|
|
|
+ timestamp=oldest_msg_timestamp,
|
|
|
+ ),
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ added_members.add(mxid)
|
|
|
+
|
|
|
+ message_infos: list[tuple[ThreadItem, int]] = []
|
|
|
+ intents: list[IntentAPI] = []
|
|
|
+ last_message_timestamp = 0
|
|
|
+
|
|
|
+ for message in message_page:
|
|
|
+ last_message_timestamp = max(last_message_timestamp, message.timestamp_ms)
|
|
|
+
|
|
|
+ puppet: p.Puppet = await p.Puppet.get_by_pk(message.user_id)
|
|
|
+ if puppet:
|
|
|
+ intent = puppet.intent_for(self)
|
|
|
+ if not puppet.name:
|
|
|
+ # TODO where to get "info"
|
|
|
+ await puppet.update_info(info, source)
|
|
|
+ else:
|
|
|
+ intent = self.main_intent
|
|
|
+ if puppet.is_real_user and not self._can_double_puppet_backfill(intent.mxid):
|
|
|
+ intent = puppet.default_mxid_intent
|
|
|
+
|
|
|
+ # Convert the message
|
|
|
+ converted = await self.convert_instagram_item(source, puppet, message)
|
|
|
+ if not converted:
|
|
|
+ self.log.debug(f"Skipping unsupported message in backfill {message.item_id}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ if intent.mxid not in current_members:
|
|
|
+ add_member(puppet, intent.mxid)
|
|
|
+
|
|
|
+ for index, (event_type, content) in enumerate(converted):
|
|
|
+ if self.encrypted and self.matrix.e2ee:
|
|
|
+ event_type, content = await self.matrix.e2ee.encrypt(
|
|
|
+ self.mxid, event_type, content
|
|
|
+ )
|
|
|
+ if intent.api.is_real_user and intent.api.bridge_name is not None:
|
|
|
+ content[DOUBLE_PUPPET_SOURCE_KEY] = intent.api.bridge_name
|
|
|
+
|
|
|
+ message_infos.append((message, index))
|
|
|
+ batch_messages.append(
|
|
|
+ BatchSendEvent(
|
|
|
+ content=content,
|
|
|
+ type=event_type,
|
|
|
+ sender=intent.mxid,
|
|
|
+ timestamp=message.timestamp,
|
|
|
+ )
|
|
|
+ )
|
|
|
+ intents.append(intent)
|
|
|
+
|
|
|
+ if not batch_messages:
|
|
|
+ return None
|
|
|
+
|
|
|
+ if not self.bridge.homeserver_software.is_hungry and (
|
|
|
+ forward or self.next_batch_id is None
|
|
|
+ ):
|
|
|
+ self.log.debug("Sending dummy event to avoid forward extremity errors")
|
|
|
+ await self.az.intent.send_message_event(
|
|
|
+ self.mxid, EventType("fi.mau.dummy.pre_backfill", EventType.Class.MESSAGE), {}
|
|
|
+ )
|
|
|
+
|
|
|
+ self.log.info(
|
|
|
+ "Sending %d %s messages to %s with batch ID %s and previous event ID %s",
|
|
|
+ len(batch_messages),
|
|
|
+ "new" if forward else "historical",
|
|
|
+ self.mxid,
|
|
|
+ self.next_batch_id,
|
|
|
+ prev_event_id,
|
|
|
+ )
|
|
|
+
|
|
|
+ base_insertion_event_id = None
|
|
|
+ if self.config["bridge.backfill.msc2716"]:
|
|
|
+ batch_send_resp = await self.main_intent.batch_send(
|
|
|
+ self.mxid,
|
|
|
+ prev_event_id,
|
|
|
+ batch_id=self.next_batch_id,
|
|
|
+ events=batch_messages,
|
|
|
+ state_events_at_start=state_events_at_start,
|
|
|
+ beeper_new_messages=forward,
|
|
|
+ beeper_mark_read_by=source.mxid if mark_read else None,
|
|
|
+ )
|
|
|
+ base_insertion_event_id = batch_send_resp.base_insertion_event_id
|
|
|
+ event_ids = batch_send_resp.event_ids
|
|
|
+ else:
|
|
|
+ batch_send_resp = None
|
|
|
+ event_ids = [
|
|
|
+ await intent.send_message_event(
|
|
|
+ self.mxid, evt.type, evt.content, timestamp=evt.timestamp
|
|
|
)
|
|
|
- excluded_count += 1
|
|
|
- items.append(item)
|
|
|
- return items
|
|
|
+ for evt, intent in zip(reversed(batch_messages), reversed(intents))
|
|
|
+ ]
|
|
|
+ await self._finish_batch(event_ids, message_infos)
|
|
|
+ if not forward:
|
|
|
+ assert batch_send_resp
|
|
|
+ self.log.debug("Got next batch ID %s for %s", batch_send_resp.next_batch_id, self.mxid)
|
|
|
+ self.next_batch_id = batch_send_resp.next_batch_id
|
|
|
+ await self.save()
|
|
|
+
|
|
|
+ return base_insertion_event_id
|
|
|
+
|
|
|
+ def _can_double_puppet_backfill(self, custom_mxid: UserID) -> bool:
|
|
|
+ return self.config["bridge.backfill.double_puppet_backfill"] and (
|
|
|
+ # Hungryserv can batch send any users
|
|
|
+ self.bridge.homeserver_software.is_hungry
|
|
|
+ # Non-MSC2716 backfill can use any double puppet
|
|
|
+ or not self.config["bridge.backfill.msc2716"]
|
|
|
+ # Local users can be double puppeted even with MSC2716
|
|
|
+ or (custom_mxid[custom_mxid.index(":") + 1 :] == self.config["homeserver.domain"])
|
|
|
+ )
|
|
|
+
|
|
|
+ async def _finish_batch(
|
|
|
+ self, event_ids: list[EventID], message_infos: list[tuple[ThreadItem, int]]
|
|
|
+ ):
|
|
|
+ # We have to do this slightly annoying processing of the event IDs and message infos so
|
|
|
+ # that we only map the last event ID to the message.
|
|
|
+ # When inline captions are enabled, this will have no effect since index will always be 0
|
|
|
+ # since there's only ever one event per message.
|
|
|
+ current_message = None
|
|
|
+ messages = []
|
|
|
+ for event_id, (message, index) in zip(event_ids, message_infos):
|
|
|
+ if index == 0 and current_message:
|
|
|
+ # This means that all of the events for the previous message have been processed,
|
|
|
+ # and the current_message is the most recent event for that message.
|
|
|
+ messages.append(current_message)
|
|
|
+
|
|
|
+ current_message = DBMessage(
|
|
|
+ mxid=event_id,
|
|
|
+ mx_room=self.mxid,
|
|
|
+ item_id=message.item_id,
|
|
|
+ client_context=message.client_context,
|
|
|
+ receiver=self.receiver,
|
|
|
+ sender=message.user_id,
|
|
|
+ ig_timestamp=message.timestamp_ms,
|
|
|
+ )
|
|
|
+
|
|
|
+ if current_message:
|
|
|
+ messages.append(current_message)
|
|
|
+
|
|
|
+ try:
|
|
|
+ await DBMessage.bulk_insert(messages)
|
|
|
+ except Exception:
|
|
|
+ self.log.exception("Failed to store batch message IDs")
|
|
|
+
|
|
|
+ async def send_post_backfill_dummy(
|
|
|
+ self,
|
|
|
+ last_message_timestamp: int,
|
|
|
+ base_insertion_event_id: EventID | None = None,
|
|
|
+ ):
|
|
|
+ assert self.mxid
|
|
|
+
|
|
|
+ if not base_insertion_event_id:
|
|
|
+ base_insertion_event_id = self.historical_base_insertion_event_id
|
|
|
+
|
|
|
+ if not base_insertion_event_id:
|
|
|
+ self.log.debug(
|
|
|
+ "No base insertion event ID in database or from batch send response. Not sending"
|
|
|
+ " dummy event."
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
+ event_id = await self.main_intent.send_message_event(
|
|
|
+ self.mxid,
|
|
|
+ event_type=HistorySyncMarkerMessage,
|
|
|
+ content={
|
|
|
+ "org.matrix.msc2716.marker.insertion": base_insertion_event_id,
|
|
|
+ "m.marker.insertion": base_insertion_event_id,
|
|
|
+ },
|
|
|
+ )
|
|
|
+ await DBMessage(
|
|
|
+ mxid=event_id,
|
|
|
+ mx_room=self.mxid,
|
|
|
+ item_id="",
|
|
|
+ client_context=None,
|
|
|
+ receiver=self.receiver,
|
|
|
+ sender=0,
|
|
|
+ ig_timestamp=last_message_timestamp,
|
|
|
+ ).insert()
|
|
|
|
|
|
# endregion
|
|
|
# region Bridge info state event
|
|
@@ -1877,7 +2191,11 @@ class Portal(DBPortal, BasePortal):
|
|
|
self.log.exception("Failed to update portal")
|
|
|
return self.mxid
|
|
|
async with self._create_room_lock:
|
|
|
- return await self._create_matrix_room(source, info)
|
|
|
+ try:
|
|
|
+ return await self._create_matrix_room(source, info)
|
|
|
+ except Exception:
|
|
|
+ self.log.exception("Failed to create portal")
|
|
|
+ return None
|
|
|
|
|
|
def _get_invite_content(self, double_puppet: p.Puppet | None) -> dict[str, bool]:
|
|
|
invite_content = {}
|
|
@@ -1887,9 +2205,7 @@ class Portal(DBPortal, BasePortal):
|
|
|
invite_content["is_direct"] = True
|
|
|
return invite_content
|
|
|
|
|
|
- async def update_matrix_room(
|
|
|
- self, source: u.User, info: Thread, backfill: bool = False
|
|
|
- ) -> None:
|
|
|
+ async def update_matrix_room(self, source: u.User, info: Thread) -> None:
|
|
|
puppet = await p.Puppet.get_by_custom_mxid(source.mxid)
|
|
|
await self.main_intent.invite_user(
|
|
|
self.mxid,
|
|
@@ -1903,17 +2219,6 @@ class Portal(DBPortal, BasePortal):
|
|
|
await source.update_direct_chats({self.main_intent.mxid: [self.mxid]})
|
|
|
|
|
|
await self.update_info(info, source)
|
|
|
-
|
|
|
- if backfill:
|
|
|
- last_msg = await DBMessage.get_by_item_id(
|
|
|
- info.last_permanent_item.item_id, receiver=self.receiver
|
|
|
- )
|
|
|
- if last_msg is None:
|
|
|
- self.log.debug(
|
|
|
- f"Last permanent item ({info.last_permanent_item.item_id})"
|
|
|
- " not found in database, starting backfilling"
|
|
|
- )
|
|
|
- await self.backfill(source, thread=info, is_initial=False)
|
|
|
await self._update_read_receipts(info.last_seen_at)
|
|
|
|
|
|
async def _create_matrix_room(self, source: u.User, info: Thread) -> RoomID | None:
|
|
@@ -1950,54 +2255,52 @@ class Portal(DBPortal, BasePortal):
|
|
|
if self.encrypted or self.private_chat_portal_meta or not self.is_direct:
|
|
|
name = self.name
|
|
|
|
|
|
- # We lock backfill lock here so any messages that come between the room being created
|
|
|
- # and the initial backfill finishing wouldn't be bridged before the backfill messages.
|
|
|
- with self.backfill_lock:
|
|
|
- creation_content = {}
|
|
|
- if not self.config["bridge.federate_rooms"]:
|
|
|
- creation_content["m.federate"] = False
|
|
|
- self.mxid = await self.main_intent.create_room(
|
|
|
- name=name,
|
|
|
- is_direct=self.is_direct,
|
|
|
- initial_state=initial_state,
|
|
|
- invitees=invites,
|
|
|
- creation_content=creation_content,
|
|
|
- )
|
|
|
- if not self.mxid:
|
|
|
- raise Exception("Failed to create room: no mxid returned")
|
|
|
+ creation_content = {}
|
|
|
+ if not self.config["bridge.federate_rooms"]:
|
|
|
+ creation_content["m.federate"] = False
|
|
|
+ self.mxid = await self.main_intent.create_room(
|
|
|
+ name=name,
|
|
|
+ is_direct=self.is_direct,
|
|
|
+ initial_state=initial_state,
|
|
|
+ invitees=invites,
|
|
|
+ creation_content=creation_content,
|
|
|
+ )
|
|
|
+ if not self.mxid:
|
|
|
+ raise Exception("Failed to create room: no mxid returned")
|
|
|
|
|
|
- if self.encrypted and self.matrix.e2ee and self.is_direct:
|
|
|
- try:
|
|
|
- await self.az.intent.ensure_joined(self.mxid)
|
|
|
- except Exception:
|
|
|
- self.log.warning(f"Failed to add bridge bot to new private chat {self.mxid}")
|
|
|
+ if self.encrypted and self.matrix.e2ee and self.is_direct:
|
|
|
+ try:
|
|
|
+ await self.az.intent.ensure_joined(self.mxid)
|
|
|
+ except Exception:
|
|
|
+ self.log.warning(f"Failed to add bridge bot to new private chat {self.mxid}")
|
|
|
|
|
|
- await self.update()
|
|
|
- self.log.debug(f"Matrix room created: {self.mxid}")
|
|
|
- self.by_mxid[self.mxid] = self
|
|
|
+ await self.update()
|
|
|
+ self.log.debug(f"Matrix room created: {self.mxid}")
|
|
|
+ self.by_mxid[self.mxid] = self
|
|
|
|
|
|
- puppet = await p.Puppet.get_by_custom_mxid(source.mxid)
|
|
|
- await self.main_intent.invite_user(
|
|
|
- self.mxid, source.mxid, extra_content=self._get_invite_content(puppet)
|
|
|
- )
|
|
|
- if puppet:
|
|
|
- try:
|
|
|
- if self.is_direct:
|
|
|
- await source.update_direct_chats({self.main_intent.mxid: [self.mxid]})
|
|
|
- await puppet.intent.join_room_by_id(self.mxid)
|
|
|
- except MatrixError:
|
|
|
- self.log.debug(
|
|
|
- "Failed to join custom puppet into newly created portal", exc_info=True
|
|
|
- )
|
|
|
+ puppet = await p.Puppet.get_by_custom_mxid(source.mxid)
|
|
|
+ await self.main_intent.invite_user(
|
|
|
+ self.mxid, source.mxid, extra_content=self._get_invite_content(puppet)
|
|
|
+ )
|
|
|
+ if puppet:
|
|
|
+ try:
|
|
|
+ if self.is_direct:
|
|
|
+ await source.update_direct_chats({self.main_intent.mxid: [self.mxid]})
|
|
|
+ await puppet.intent.join_room_by_id(self.mxid)
|
|
|
+ except MatrixError:
|
|
|
+ self.log.debug(
|
|
|
+ "Failed to join custom puppet into newly created portal", exc_info=True
|
|
|
+ )
|
|
|
|
|
|
- await self._update_participants(info.users, source)
|
|
|
+ await self._update_participants(info.users, source)
|
|
|
|
|
|
- try:
|
|
|
- await self.backfill(source, thread=info, is_initial=True)
|
|
|
- except Exception:
|
|
|
- self.log.exception("Failed to backfill new portal")
|
|
|
- await self._update_read_receipts(info.last_seen_at)
|
|
|
+ self.log.trace("Sending portal post-create dummy event")
|
|
|
+ self.first_event_id = await self.az.intent.send_message_event(
|
|
|
+ self.mxid, PortalCreateDummy, {}
|
|
|
+ )
|
|
|
+ await self.update()
|
|
|
|
|
|
+ await self._update_read_receipts(info.last_seen_at)
|
|
|
return self.mxid
|
|
|
|
|
|
# endregion
|