|
@@ -559,7 +559,9 @@ class User(DBUser, BaseUser):
|
|
|
info = {"challenge": e.body.challenge.serialize() if e.body.challenge else None}
|
|
|
await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error=error_code, info=info)
|
|
|
|
|
|
- async def _sync_thread(self, thread: Thread) -> bool:
|
|
|
+ async def _sync_thread(
|
|
|
+ self, thread: Thread, enqueue_backfill: bool = True, portal: po.Portal | None = None
|
|
|
+ ) -> bool:
|
|
|
"""
|
|
|
Sync a specific thread. Returns whether the thread had messages after the last message in
|
|
|
the database before the sync.
|
|
@@ -569,8 +571,11 @@ class User(DBUser, BaseUser):
|
|
|
forward_messages = thread.items
|
|
|
|
|
|
assert self.client
|
|
|
- portal = await po.Portal.get_by_thread(thread, self.igpk)
|
|
|
- assert portal
|
|
|
+ if not portal:
|
|
|
+ portal = await po.Portal.get_by_thread(thread, self.igpk)
|
|
|
+ assert portal
|
|
|
+ else:
|
|
|
+ assert portal.thread_id == thread.thread_id
|
|
|
|
|
|
# Create or update the Matrix room
|
|
|
if not portal.mxid:
|
|
@@ -655,7 +660,7 @@ class User(DBUser, BaseUser):
|
|
|
|
|
|
await portal._update_read_receipts(thread.last_seen_at)
|
|
|
|
|
|
- if self.config["bridge.backfill.msc2716"]:
|
|
|
+ if self.config["bridge.backfill.msc2716"] and enqueue_backfill:
|
|
|
await portal.enqueue_immediate_backfill(self, 1)
|
|
|
return len(forward_messages) > 0
|
|
|
|
|
@@ -1077,11 +1082,12 @@ class User(DBUser, BaseUser):
|
|
|
async def handle_message(self, evt: MessageSyncEvent) -> None:
|
|
|
portal = await po.Portal.get_by_thread_id(evt.message.thread_id, receiver=self.igpk)
|
|
|
if not portal or not portal.mxid:
|
|
|
- self.log.debug("Got message in thread with no portal, getting info...")
|
|
|
+ self.log.debug(
|
|
|
+ "Got message in thread with no portal, getting info and syncing thread..."
|
|
|
+ )
|
|
|
resp = await self.client.get_thread(evt.message.thread_id)
|
|
|
portal = await po.Portal.get_by_thread(resp.thread, self.igpk)
|
|
|
- self.log.debug("Got info for unknown portal, creating room")
|
|
|
- await portal.create_matrix_room(self, resp.thread)
|
|
|
+ await self._sync_thread(resp.thread, enqueue_backfill=False, portal=portal)
|
|
|
if not portal.mxid:
|
|
|
self.log.warning(
|
|
|
"Room creation appears to have failed, "
|
|
@@ -1114,18 +1120,18 @@ class User(DBUser, BaseUser):
|
|
|
portal = await po.Portal.get_by_thread(evt, receiver=self.igpk)
|
|
|
if portal.mxid:
|
|
|
self.log.debug("Got thread sync event for %s with existing portal", portal.thread_id)
|
|
|
- await portal.update_matrix_room(self, evt)
|
|
|
elif evt.is_group:
|
|
|
self.log.debug(
|
|
|
"Got thread sync event for group %s without existing portal, creating room",
|
|
|
portal.thread_id,
|
|
|
)
|
|
|
- await portal.create_matrix_room(self, evt)
|
|
|
else:
|
|
|
self.log.debug(
|
|
|
"Got thread sync event for DM %s without existing portal, ignoring",
|
|
|
portal.thread_id,
|
|
|
)
|
|
|
+ return
|
|
|
+ await self._sync_thread(evt, enqueue_backfill=False, portal=portal)
|
|
|
|
|
|
async def handle_thread_remove(self, evt: ThreadRemoveEvent) -> None:
|
|
|
self.log.debug("Got thread remove event: %s", evt.serialize())
|