|
@@ -1451,7 +1451,7 @@ class Portal(DBPortal, BasePortal):
|
|
# endregion
|
|
# endregion
|
|
# region Backfilling
|
|
# region Backfilling
|
|
|
|
|
|
- async def backfill(self, source: u.User, is_initial: bool = False) -> None:
|
|
|
|
|
|
+ async def backfill(self, source: u.User, thread: Thread, is_initial: bool = False) -> None:
|
|
limit = (
|
|
limit = (
|
|
self.config["bridge.backfill.initial_limit"]
|
|
self.config["bridge.backfill.initial_limit"]
|
|
if is_initial
|
|
if is_initial
|
|
@@ -1462,12 +1462,14 @@ class Portal(DBPortal, BasePortal):
|
|
elif limit < 0:
|
|
elif limit < 0:
|
|
limit = None
|
|
limit = None
|
|
with self.backfill_lock:
|
|
with self.backfill_lock:
|
|
- await self._backfill(source, is_initial, limit)
|
|
|
|
|
|
+ await self._backfill(source, thread, is_initial, limit)
|
|
|
|
|
|
- async def _backfill(self, source: u.User, is_initial: bool, limit: int) -> None:
|
|
|
|
|
|
+ async def _backfill(
|
|
|
|
+ self, source: u.User, thread: Thread, is_initial: bool, limit: int
|
|
|
|
+ ) -> None:
|
|
self.log.debug("Backfilling history through %s", source.mxid)
|
|
self.log.debug("Backfilling history through %s", source.mxid)
|
|
|
|
|
|
- entries = await self._fetch_backfill_items(source, is_initial, limit)
|
|
|
|
|
|
+ entries = await self._fetch_backfill_items(source, thread, is_initial, limit)
|
|
if not entries:
|
|
if not entries:
|
|
self.log.debug("Didn't get any items to backfill from server")
|
|
self.log.debug("Didn't get any items to backfill from server")
|
|
return
|
|
return
|
|
@@ -1486,11 +1488,11 @@ class Portal(DBPortal, BasePortal):
|
|
self.log.info("Backfilled %d messages through %s", len(entries), source.mxid)
|
|
self.log.info("Backfilled %d messages through %s", len(entries), source.mxid)
|
|
|
|
|
|
async def _fetch_backfill_items(
|
|
async def _fetch_backfill_items(
|
|
- self, source: u.User, is_initial: bool, limit: int
|
|
|
|
|
|
+ self, source: u.User, thread: Thread, is_initial: bool, limit: int
|
|
) -> list[ThreadItem]:
|
|
) -> list[ThreadItem]:
|
|
items = []
|
|
items = []
|
|
self.log.debug("Fetching up to %d messages through %s", limit, source.igpk)
|
|
self.log.debug("Fetching up to %d messages through %s", limit, source.igpk)
|
|
- async for item in source.client.iter_thread(self.thread_id):
|
|
|
|
|
|
+ async for item in source.client.iter_thread(self.thread_id, start_at=thread):
|
|
if len(items) >= limit:
|
|
if len(items) >= limit:
|
|
self.log.debug(f"Fetched {len(items)} messages (the limit)")
|
|
self.log.debug(f"Fetched {len(items)} messages (the limit)")
|
|
break
|
|
break
|
|
@@ -1597,7 +1599,7 @@ class Portal(DBPortal, BasePortal):
|
|
f"Last permanent item ({info.last_permanent_item.item_id})"
|
|
f"Last permanent item ({info.last_permanent_item.item_id})"
|
|
" not found in database, starting backfilling"
|
|
" not found in database, starting backfilling"
|
|
)
|
|
)
|
|
- await self.backfill(source, is_initial=False)
|
|
|
|
|
|
+ await self.backfill(source, thread=info, is_initial=False)
|
|
await self._update_read_receipts(info.last_seen_at)
|
|
await self._update_read_receipts(info.last_seen_at)
|
|
|
|
|
|
async def _create_matrix_room(self, source: u.User, info: Thread) -> RoomID | None:
|
|
async def _create_matrix_room(self, source: u.User, info: Thread) -> RoomID | None:
|
|
@@ -1677,7 +1679,7 @@ class Portal(DBPortal, BasePortal):
|
|
await self._update_participants(info.users, source)
|
|
await self._update_participants(info.users, source)
|
|
|
|
|
|
try:
|
|
try:
|
|
- await self.backfill(source, is_initial=True)
|
|
|
|
|
|
+ await self.backfill(source, thread=info, is_initial=True)
|
|
except Exception:
|
|
except Exception:
|
|
self.log.exception("Failed to backfill new portal")
|
|
self.log.exception("Failed to backfill new portal")
|
|
await self._update_read_receipts(info.last_seen_at)
|
|
await self._update_read_receipts(info.last_seen_at)
|