Эх сурвалжийг харах

Catch individual thread sync errors while syncing

Tulir Asokan 4 жил өмнө
parent
commit
dc38859b2e

+ 1 - 1
mautrix_instagram/__main__.py

@@ -135,7 +135,7 @@ class InstagramBridge(Bridge):
                     log.debug("Periodic reconnect loop stopped")
                     log.debug("Periodic reconnect loop stopped")
                     return
                     return
                 except Exception:
                 except Exception:
-                    log.exception("Error while reconnecting", user.mxid)
+                    log.exception("Error while reconnecting %s", user.mxid)
 
 
     async def get_portal(self, room_id: RoomID) -> Portal:
     async def get_portal(self, room_id: RoomID) -> Portal:
         return await Portal.get_by_mxid(room_id)
         return await Portal.get_by_mxid(room_id)

+ 20 - 11
mautrix_instagram/user.py

@@ -23,7 +23,7 @@ import time
 from mauigpapi import AndroidAPI, AndroidState, AndroidMQTT
 from mauigpapi import AndroidAPI, AndroidState, AndroidMQTT
 from mauigpapi.mqtt import Connect, Disconnect, GraphQLSubscription, SkywalkerSubscription
 from mauigpapi.mqtt import Connect, Disconnect, GraphQLSubscription, SkywalkerSubscription
 from mauigpapi.types import (CurrentUser, MessageSyncEvent, Operation, RealtimeDirectEvent,
 from mauigpapi.types import (CurrentUser, MessageSyncEvent, Operation, RealtimeDirectEvent,
-                             ActivityIndicatorData, TypingStatus, ThreadSyncEvent)
+                             ActivityIndicatorData, TypingStatus, ThreadSyncEvent, Thread)
 from mauigpapi.errors import IGNotLoggedInError, MQTTNotLoggedIn, MQTTNotConnected
 from mauigpapi.errors import IGNotLoggedInError, MQTTNotLoggedIn, MQTTNotConnected
 from mautrix.bridge import BaseUser, async_getter_lock
 from mautrix.bridge import BaseUser, async_getter_lock
 from mautrix.types import UserID, RoomID, EventID, TextMessageEventContent, MessageType
 from mautrix.types import UserID, RoomID, EventID, TextMessageEventContent, MessageType
@@ -224,6 +224,17 @@ class User(DBUser, BaseUser):
         else:
         else:
             await self.start_listen()
             await self.start_listen()
 
 
+    async def _sync_thread(self, thread: Thread, min_active_at: int) -> None:
+        portal = await po.Portal.get_by_thread(thread, self.igpk)
+        if portal.mxid:
+            self.log.debug(f"{thread.thread_id} has a portal, syncing and backfilling...")
+            await portal.update_matrix_room(self, thread, backfill=True)
+        elif thread.last_activity_at > min_active_at:
+            self.log.debug(f"{thread.thread_id} has been active recently, creating portal...")
+            await portal.create_matrix_room(self, thread)
+        else:
+            self.log.debug(f"{thread.thread_id} is not active and doesn't have a portal")
+
     async def sync(self) -> None:
     async def sync(self) -> None:
         resp = await self.client.get_inbox()
         resp = await self.client.get_inbox()
         max_age = self.config["bridge.portal_create_max_age"] * 1_000_000
         max_age = self.config["bridge.portal_create_max_age"] * 1_000_000
@@ -231,19 +242,17 @@ class User(DBUser, BaseUser):
         min_active_at = (time.time() * 1_000_000) - max_age
         min_active_at = (time.time() * 1_000_000) - max_age
         i = 0
         i = 0
         async for thread in self.client.iter_inbox(start_at=resp):
         async for thread in self.client.iter_inbox(start_at=resp):
-            portal = await po.Portal.get_by_thread(thread, self.igpk)
-            if portal.mxid:
-                self.log.debug(f"{thread.thread_id} has a portal, syncing and backfilling...")
-                await portal.update_matrix_room(self, thread, backfill=True)
-            elif thread.last_activity_at > min_active_at:
-                self.log.debug(f"{thread.thread_id} has been active recently, creating portal...")
-                await portal.create_matrix_room(self, thread)
-            else:
-                self.log.debug(f"{thread.thread_id} is not active and doesn't have a portal")
+            try:
+                await self._sync_thread(thread, min_active_at)
+            except Exception:
+                self.log.exception(f"Error syncing thread {thread.thread_id}")
             i += 1
             i += 1
             if i >= limit:
             if i >= limit:
                 break
                 break
-        await self.update_direct_chats()
+        try:
+            await self.update_direct_chats()
+        except Exception:
+            self.log.exception("Error updating direct chat list")
 
 
         if not self._listen_task:
         if not self._listen_task:
             await self.start_listen(resp.seq_id, resp.snapshot_at_ms)
             await self.start_listen(resp.seq_id, resp.snapshot_at_ms)