|
@@ -1822,7 +1822,29 @@ class Portal(DBPortal, BasePortal):
|
|
await asyncio.sleep(backoff)
|
|
await asyncio.sleep(backoff)
|
|
raise
|
|
raise
|
|
|
|
|
|
- messages = resp.thread.items
|
|
|
|
|
|
+ messages = []
|
|
|
|
+ # Sometimes (seems like on Facebook chats) it fetches the first message in the chat over
|
|
|
|
+ # and over again.
|
|
|
|
+ for item in resp.thread.items:
|
|
|
|
+ # Check in-memory queues for duplicates
|
|
|
|
+ if item.item_id not in self._msgid_dedup:
|
|
|
|
+ self.log.debug(
|
|
|
|
+ f"Ignoring message {item.item_id} ({item.client_context}) by {item.user_id}"
|
|
|
|
+ " as it was already handled (message.id in dedup queue)"
|
|
|
|
+ )
|
|
|
|
+ continue
|
|
|
|
+ self._msgid_dedup.appendleft(item.item_id)
|
|
|
|
+
|
|
|
|
+ # Check database for duplicates
|
|
|
|
+ if await DBMessage.get_by_item_id(item.item_id, self.receiver) is not None:
|
|
|
|
+ self.log.debug(
|
|
|
|
+ f"Ignoring message {item.item_id} ({item.client_context}) by {item.user_id}"
|
|
|
|
+ " as it was already handled (message.id in database)"
|
|
|
|
+ )
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+ messages.append(item)
|
|
|
|
+
|
|
cursor = resp.thread.oldest_cursor
|
|
cursor = resp.thread.oldest_cursor
|
|
backfill_more = resp.thread.has_older
|
|
backfill_more = resp.thread.has_older
|
|
if len(messages) == 0:
|
|
if len(messages) == 0:
|