|
@@ -1900,29 +1900,32 @@ class Portal(DBPortal, BasePortal):
|
|
|
await asyncio.sleep(backoff)
|
|
|
raise
|
|
|
|
|
|
- messages = []
|
|
|
- # Sometimes (seems like on Facebook chats) it fetches the first message in the chat over
|
|
|
- # and over again.
|
|
|
- for item in resp.thread.items:
|
|
|
- # Check in-memory queues for duplicates
|
|
|
- if item.item_id in self._msgid_dedup:
|
|
|
- self.log.debug(
|
|
|
- f"Ignoring message {item.item_id} ({item.client_context}) by {item.user_id}"
|
|
|
- " as it was already handled (message.id in dedup queue)"
|
|
|
- )
|
|
|
- continue
|
|
|
- self._msgid_dedup.appendleft(item.item_id)
|
|
|
+ async def dedup_messages(messages: list[ThreadItem]) -> list[ThreadItem]:
|
|
|
+ deduped = []
|
|
|
+ # Sometimes (seems like on Facebook chats) it fetches the first message in the chat over
|
|
|
+ # and over again.
|
|
|
+ for item in messages:
|
|
|
+ # Check in-memory queues for duplicates
|
|
|
+ if item.item_id in self._msgid_dedup:
|
|
|
+ self.log.debug(
|
|
|
+ f"Ignoring message {item.item_id} ({item.client_context}) by {item.user_id}"
|
|
|
+ " as it was already handled (message.id in dedup queue)"
|
|
|
+ )
|
|
|
+ continue
|
|
|
+ self._msgid_dedup.appendleft(item.item_id)
|
|
|
|
|
|
- # Check database for duplicates
|
|
|
- if await DBMessage.get_by_item_id(item.item_id, self.receiver) is not None:
|
|
|
- self.log.debug(
|
|
|
- f"Ignoring message {item.item_id} ({item.client_context}) by {item.user_id}"
|
|
|
- " as it was already handled (message.id in database)"
|
|
|
- )
|
|
|
- continue
|
|
|
+ # Check database for duplicates
|
|
|
+ if await DBMessage.get_by_item_id(item.item_id, self.receiver) is not None:
|
|
|
+ self.log.debug(
|
|
|
+ f"Ignoring message {item.item_id} ({item.client_context}) by {item.user_id}"
|
|
|
+ " as it was already handled (message.id in database)"
|
|
|
+ )
|
|
|
+ continue
|
|
|
|
|
|
- messages.append(item)
|
|
|
+ deduped.append(item)
|
|
|
+ return deduped
|
|
|
|
|
|
+ messages = await dedup_messages(resp.thread.items)
|
|
|
cursor = resp.thread.oldest_cursor
|
|
|
backfill_more = resp.thread.has_older
|
|
|
if len(messages) == 0:
|
|
@@ -1957,7 +1960,7 @@ class Portal(DBPortal, BasePortal):
|
|
|
resp = await source.client.get_thread(
|
|
|
self.thread_id, seq_id=source.seq_id, cursor=self.cursor
|
|
|
)
|
|
|
- messages = resp.thread.items
|
|
|
+ messages = await dedup_messages(resp.thread.items)
|
|
|
cursor = resp.thread.oldest_cursor
|
|
|
backfill_more &= resp.thread.has_older
|
|
|
except IGRateLimitError as e:
|
|
@@ -2160,6 +2163,8 @@ class Portal(DBPortal, BasePortal):
|
|
|
self.next_batch_id,
|
|
|
prev_event_id,
|
|
|
)
|
|
|
+ if self.bridge.homeserver_software.is_hungry:
|
|
|
+ self.log.debug("Batch message event IDs %s", [m.event_id for m in batch_messages])
|
|
|
|
|
|
base_insertion_event_id = None
|
|
|
if self.config["bridge.backfill.msc2716"]:
|