Ver Fonte

backfill: only update cursor after processing whole page

Signed-off-by: Sumner Evans <sumner@beeper.com>
Sumner Evans há 2 anos atrás
pai
commit
427fd6950e
2 ficheiros alterados com 19 adições e 12 exclusões
  1. 7 4
      mauigpapi/http/thread.py
  2. 12 8
      mautrix_instagram/user.py

+ 7 - 4
mauigpapi/http/thread.py

@@ -15,7 +15,7 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 from __future__ import annotations
 
-from typing import AsyncIterable, Type
+from typing import AsyncIterable, Callable, Type
 import asyncio
 import json
 
@@ -58,20 +58,22 @@ class ThreadAPI(BaseAndroidAPI):
 
     async def iter_inbox(
         self,
+        update_seq_id_and_cursor: Callable[[int, str | None], None],
         start_at: DMInboxResponse | None = None,
         local_limit: int | None = None,
         rate_limit_exceeded_backoff: float = 60.0,
-    ) -> AsyncIterable[tuple[Thread, int | None, str | None]]:
+    ) -> AsyncIterable[Thread]:
         thread_counter = 0
         if start_at:
             cursor = start_at.inbox.oldest_cursor
             seq_id = start_at.seq_id
             has_more = start_at.inbox.has_older
             for thread in start_at.inbox.threads:
-                yield thread, seq_id, cursor
+                yield thread
                 thread_counter += 1
                 if local_limit and thread_counter >= local_limit:
                     return
+            update_seq_id_and_cursor(seq_id, cursor)
         else:
             cursor = None
             seq_id = None
@@ -94,10 +96,11 @@ class ThreadAPI(BaseAndroidAPI):
             cursor = resp.inbox.oldest_cursor
             has_more = resp.inbox.has_older
             for thread in resp.inbox.threads:
-                yield thread, seq_id, cursor
+                yield thread
                 thread_counter += 1
                 if local_limit and thread_counter >= local_limit:
                     return
+            update_seq_id_and_cursor(seq_id, cursor)
 
     async def get_thread(
         self,

+ 12 - 8
mautrix_instagram/user.py

@@ -682,7 +682,9 @@ class User(DBUser, BaseUser):
             local_limit = None
 
         await self._sync_threads_with_delay(
-            self.client.iter_inbox(start_at=resp, local_limit=local_limit),
+            self.client.iter_inbox(
+                self._update_seq_id_and_cursor, start_at=resp, local_limit=local_limit
+            ),
             stop_when_threads_have_no_messages_to_backfill=True,
             increment_total_backfilled_portals=increment_total_backfilled_portals,
             local_limit=local_limit,
@@ -738,7 +740,8 @@ class User(DBUser, BaseUser):
         backoff = self.config.get("bridge.backfill.backoff.thread_list", 300)
         await self._sync_threads_with_delay(
             self.client.iter_inbox(
-                start_at,
+                self._update_seq_id_and_cursor,
+                start_at=start_at,
                 local_limit=local_limit,
                 rate_limit_exceeded_backoff=backoff,
             ),
@@ -747,9 +750,14 @@ class User(DBUser, BaseUser):
         )
         await self.update_direct_chats()
 
+    def _update_seq_id_and_cursor(self, seq_id: int, cursor: str | None):
+        self.seq_id = seq_id
+        if cursor:
+            self.oldest_cursor = cursor
+
     async def _sync_threads_with_delay(
         self,
-        threads: AsyncIterable[tuple[Thread, int | None, str | None]],
+        threads: AsyncIterable[Thread],
         increment_total_backfilled_portals: bool = False,
         stop_when_threads_have_no_messages_to_backfill: bool = False,
         local_limit: int | None = None,
@@ -757,7 +765,7 @@ class User(DBUser, BaseUser):
         sync_delay = self.config["bridge.backfill.min_sync_thread_delay"]
         last_thread_sync_ts = 0.0
         found_thread_count = 0
-        async for thread, seq_id, cursor in threads:
+        async for thread in threads:
             found_thread_count += 1
             now = time.monotonic()
             if now < last_thread_sync_ts + sync_delay:
@@ -773,10 +781,6 @@ class User(DBUser, BaseUser):
 
             if increment_total_backfilled_portals:
                 self.total_backfilled_portals = (self.total_backfilled_portals or 0) + 1
-            if seq_id:
-                self.seq_id = seq_id
-            if cursor:
-                self.oldest_cursor = cursor
             await self.update()
         if local_limit is None or found_thread_count < local_limit:
             if local_limit is None: