Browse Source

Make backfilling work

Tulir Asokan 4 years ago
parent
commit
2f34fb689e

+ 1 - 1
ROADMAP.md

@@ -24,7 +24,7 @@
       * [ ] Locations
   * [x] Message unsend
   * [ ] Message reactions
-  * [ ] Message history
+  * [x] Message history
   * [ ] Presence
   * [ ] Typing notifications
   * [ ] Read receipts

+ 0 - 1
mauigpapi/mqtt/conn.py

@@ -201,7 +201,6 @@ class AndroidMQTT:
             await self.iris_subscribe(self._iris_seq_id, self._iris_snapshot_at_ms)
 
     def _on_publish_handler(self, client: MQTToTClient, _: Any, mid: int) -> None:
-        self.log.trace(f"Received publish confirmation for {mid}")
         try:
             waiter = self._publish_waiters[mid]
         except KeyError:

+ 1 - 1
mautrix_instagram/db/message.py

@@ -55,7 +55,7 @@ class Message:
         return cls(**row)
 
     @classmethod
-    async def get_by_item_id(cls, item_id: str, receiver: int = 0) -> Optional['Message']:
+    async def get_by_item_id(cls, item_id: str, receiver: int) -> Optional['Message']:
         row = await cls.db.fetchrow("SELECT mxid, mx_room, item_id, receiver, sender "
                                     "FROM message WHERE item_id=$1 AND receiver=$2",
                                     item_id, receiver)

+ 26 - 19
mautrix_instagram/portal.py

@@ -71,6 +71,7 @@ class Portal(DBPortal, BasePortal):
     _main_intent: IntentAPI
     _last_participant_update: Set[int]
     _reaction_lock: asyncio.Lock
+    _backfill_leave: Optional[Set[IntentAPI]]
 
     def __init__(self, thread_id: str, receiver: int, other_user_pk: Optional[int],
                  mxid: Optional[RoomID] = None, name: Optional[str] = None, encrypted: bool = False
@@ -85,6 +86,7 @@ class Portal(DBPortal, BasePortal):
 
         self.backfill_lock = SimpleLock("Waiting for backfilling to finish before handling %s",
                                         log=self.log)
+        self._backfill_leave = None
         self._main_intent = None
         self._reaction_lock = asyncio.Lock()
 
@@ -308,7 +310,15 @@ class Portal(DBPortal, BasePortal):
                            " as it was already handled (message.id found in database)")
         else:
             self._msgid_dedup.appendleft(item.item_id)
-            intent = sender.intent_for(self)
+            if self.backfill_lock.locked and sender.need_backfill_invite(self):
+                self.log.debug("Adding %s's default puppet to room for backfilling", sender.mxid)
+                if self.is_direct:
+                    await self.main_intent.invite_user(self.mxid, sender.default_mxid)
+                intent = sender.default_mxid_intent
+                await intent.ensure_joined(self.mxid)
+                self._backfill_leave.add(intent)
+            else:
+                intent = sender.intent_for(self)
             event_id = None
             if item.media:
                 event_id = await self._handle_instagram_media(source, intent, item)
@@ -376,8 +386,6 @@ class Portal(DBPortal, BasePortal):
     # region Backfilling
 
     async def backfill(self, source: 'u.User', is_initial: bool = False) -> None:
-        if not is_initial:
-            raise RuntimeError("Non-initial backfilling is not supported")
         limit = (self.config["bridge.backfill.initial_limit"] if is_initial
                  else self.config["bridge.backfill.missed_limit"])
         if limit == 0:
@@ -397,14 +405,15 @@ class Portal(DBPortal, BasePortal):
 
         self.log.debug("Got %d entries from server", len(entries))
 
-        backfill_leave = await self._invite_own_puppet_backfill(source)
+        self._backfill_leave = set()
         async with NotificationDisabler(self.mxid, source):
             for entry in reversed(entries):
                 sender = await p.Puppet.get_by_pk(int(entry.user_id))
                 await self.handle_instagram_item(source, sender, entry)
-        for intent in backfill_leave:
+        for intent in self._backfill_leave:
             self.log.trace("Leaving room with %s post-backfill", intent.mxid)
             await intent.leave_room(self.mxid)
+        self._backfill_leave = None
         self.log.info("Backfilled %d messages through %s", len(entries), source.mxid)
 
     async def _fetch_backfill_items(self, source: 'u.User', is_initial: bool, limit: int
@@ -424,17 +433,6 @@ class Portal(DBPortal, BasePortal):
             items.append(item)
         return items
 
-    async def _invite_own_puppet_backfill(self, source: 'u.User') -> Set[IntentAPI]:
-        backfill_leave = set()
-        # TODO we should probably only invite the puppet when needed
-        if self.config["bridge.backfill.invite_own_puppet"]:
-            self.log.debug("Adding %s's default puppet to room for backfilling", source.mxid)
-            sender = await p.Puppet.get_by_pk(source.igpk)
-            await self.main_intent.invite_user(self.mxid, sender.default_mxid)
-            await sender.default_mxid_intent.join_room_by_id(self.mxid)
-            backfill_leave.add(sender.default_mxid_intent)
-        return backfill_leave
-
     # endregion
     # region Bridge info state event
 
@@ -478,14 +476,15 @@ class Portal(DBPortal, BasePortal):
     async def create_matrix_room(self, source: 'u.User', info: Thread) -> Optional[RoomID]:
         if self.mxid:
             try:
-                await self._update_matrix_room(source, info)
+                await self.update_matrix_room(source, info)
             except Exception:
                 self.log.exception("Failed to update portal")
             return self.mxid
         async with self._create_room_lock:
             return await self._create_matrix_room(source, info)
 
-    async def _update_matrix_room(self, source: 'u.User', info: Thread) -> None:
+    async def update_matrix_room(self, source: 'u.User', info: Thread, backfill: bool = False
+                                 ) -> None:
         await self.main_intent.invite_user(self.mxid, source.mxid, check_cache=True)
         puppet = await p.Puppet.get_by_custom_mxid(source.mxid)
         if puppet:
@@ -495,6 +494,14 @@ class Portal(DBPortal, BasePortal):
 
         await self.update_info(info)
 
+        if backfill:
+            last_msg = await DBMessage.get_by_item_id(info.last_permanent_item.item_id,
+                                                      receiver=self.receiver)
+            if last_msg is None:
+                self.log.debug(f"Last permanent item ({info.last_permanent_item.item_id})"
+                               " not found in database, starting backfilling")
+                await self.backfill(source, is_initial=False)
+
         # TODO
         # up = DBUserPortal.get(source.fbid, self.fbid, self.fb_receiver)
         # if not up:
@@ -507,7 +514,7 @@ class Portal(DBPortal, BasePortal):
 
     async def _create_matrix_room(self, source: 'u.User', info: Thread) -> Optional[RoomID]:
         if self.mxid:
-            await self._update_matrix_room(source, info)
+            await self.update_matrix_room(source, info)
             return self.mxid
         await self.update_info(info)
         self.log.debug("Creating Matrix room")

+ 5 - 2
mautrix_instagram/puppet.py

@@ -81,11 +81,14 @@ class Puppet(DBPuppet, BasePuppet):
         return self.pk
 
     def intent_for(self, portal: 'p.Portal') -> IntentAPI:
-        if portal.other_user_pk == self.pk or (self.config["bridge.backfill.invite_own_puppet"]
-                                               and portal.backfill_lock.locked):
+        if portal.other_user_pk == self.pk:
             return self.default_mxid_intent
         return self.intent
 
+    def need_backfill_invite(self, portal: 'p.Portal') -> bool:
+        return (portal.other_user_pk != self.pk and self.is_real_user
+                and self.config["bridge.backfill.invite_own_puppet"])
+
     async def update_info(self, info: BaseResponseUser) -> None:
         update = False
         update = await self._update_name(info) or update

+ 3 - 1
mautrix_instagram/user.py

@@ -193,7 +193,9 @@ class User(DBUser, BaseUser):
             limit = len(threads)
         for i, thread in enumerate(threads):
             portal = await po.Portal.get_by_thread(thread, self.igpk)
-            if portal.mxid or i < limit:
+            if portal.mxid:
+                await portal.update_matrix_room(self, thread, backfill=True)
+            elif i < limit:
                 await portal.create_matrix_room(self, thread)
         await self.update_direct_chats()