فهرست منبع

Merge branch 'sumner/bri-2379'

Sumner Evans 2 سال پیش
والد
کامیت
6556cbb73e

+ 31 - 25
mauigpapi/http/thread.py

@@ -15,16 +15,18 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 from __future__ import annotations
 
-from typing import AsyncIterable, Type
+from typing import AsyncIterable, Callable, Type
+import asyncio
 import json
 
+from mauigpapi.errors.response import IGRateLimitError
+
 from ..types import (
     CommandResponse,
     DMInboxResponse,
     DMThreadResponse,
     Thread,
     ThreadAction,
-    ThreadItem,
     ThreadItemType,
 )
 from .base import BaseAndroidAPI, T
@@ -55,25 +57,50 @@ class ThreadAPI(BaseAndroidAPI):
         )
 
     async def iter_inbox(
-        self, start_at: DMInboxResponse | None = None, message_limit: int = 10
+        self,
+        update_seq_id_and_cursor: Callable[[int, str | None], None],
+        start_at: DMInboxResponse | None = None,
+        local_limit: int | None = None,
+        rate_limit_exceeded_backoff: float = 60.0,
     ) -> AsyncIterable[Thread]:
+        thread_counter = 0
         if start_at:
             cursor = start_at.inbox.oldest_cursor
             seq_id = start_at.seq_id
             has_more = start_at.inbox.has_older
             for thread in start_at.inbox.threads:
                 yield thread
+                thread_counter += 1
+                if local_limit and thread_counter >= local_limit:
+                    return
+            update_seq_id_and_cursor(seq_id, cursor)
         else:
             cursor = None
             seq_id = None
             has_more = True
         while has_more:
-            resp = await self.get_inbox(message_limit=message_limit, cursor=cursor, seq_id=seq_id)
+            try:
+                resp = await self.get_inbox(message_limit=10, cursor=cursor, seq_id=seq_id)
+            except IGRateLimitError:
+                self.log.warning(
+                    "Fetching more threads failed due to rate limit. Waiting for "
+                    f"{rate_limit_exceeded_backoff} seconds before resuming."
+                )
+                await asyncio.sleep(rate_limit_exceeded_backoff)
+                continue
+            except Exception:
+                self.log.exception("Failed to fetch more threads")
+                raise
+
             seq_id = resp.seq_id
             cursor = resp.inbox.oldest_cursor
             has_more = resp.inbox.has_older
             for thread in resp.inbox.threads:
                 yield thread
+                thread_counter += 1
+                if local_limit and thread_counter >= local_limit:
+                    return
+            update_seq_id_and_cursor(seq_id, cursor)
 
     async def get_thread(
         self,
@@ -94,27 +121,6 @@ class ThreadAPI(BaseAndroidAPI):
             f"/api/v1/direct_v2/threads/{thread_id}/", query=query, response_type=DMThreadResponse
         )
 
-    async def iter_thread(
-        self,
-        thread_id: str,
-        seq_id: int | None = None,
-        cursor: str | None = None,
-        start_at: Thread | None = None,
-    ) -> AsyncIterable[ThreadItem]:
-        if start_at:
-            for item in start_at.items:
-                yield item
-            cursor = start_at.oldest_cursor
-            has_more = start_at.has_older
-        else:
-            has_more = True
-        while has_more:
-            resp = await self.get_thread(thread_id, seq_id=seq_id, cursor=cursor)
-            cursor = resp.thread.oldest_cursor
-            has_more = resp.thread.has_older
-            for item in resp.thread.items:
-                yield item
-
     async def create_group_thread(self, recipient_users: list[int | str]) -> Thread:
         return await self.std_http_post(
             "/api/v1/direct_v2/create_group_thread/",

+ 1 - 0
mauigpapi/types/thread_item.py

@@ -81,6 +81,7 @@ class ThreadItemType(ExtensibleEnum):
     XMA_REEL_SHARE = "xma_reel_share"
     XMA_STORY_SHARE = "xma_story_share"
     XMA_REEL_MENTION = "xma_reel_mention"
+    EXPIRED_PLACEHOLDER = "expired_placeholder"
 
 
 @dataclass(kw_only=True)

+ 19 - 8
mautrix_instagram/config.py

@@ -53,20 +53,31 @@ class Config(BaseBridgeConfig):
 
         copy("bridge.displayname_max_length")
 
-        copy("bridge.portal_create_max_age")
-        copy("bridge.chat_sync_limit")
-        copy("bridge.chat_create_limit")
-        copy("bridge.resync_on_startup")
+        copy("bridge.max_startup_thread_sync_count")
         copy("bridge.sync_with_custom_puppets")
         copy("bridge.sync_direct_chat_list")
         copy("bridge.double_puppet_server_map")
         copy("bridge.double_puppet_allow_discovery")
         copy("bridge.login_shared_secret_map")
         copy("bridge.federate_rooms")
-        copy("bridge.backfill.invite_own_puppet")
-        copy("bridge.backfill.initial_limit")
-        copy("bridge.backfill.missed_limit")
-        copy("bridge.backfill.disable_notifications")
+        copy("bridge.backfill.enable")
+        copy("bridge.backfill.msc2716")
+        copy("bridge.backfill.double_puppet_backfill")
+        if "bridge.initial_chat_sync" in self:
+            initial_chat_sync = self["bridge.initial_chat_sync"]
+            base["bridge.backfill.max_conversations"] = self.get(
+                "bridge.backfill.max_conversations", initial_chat_sync
+            )
+        else:
+            copy("bridge.backfill.max_conversations")
+        copy("bridge.backfill.min_sync_thread_delay")
+        copy("bridge.backfill.unread_hours_threshold")
+        copy("bridge.backfill.backoff.thread_list")
+        copy("bridge.backfill.backoff.message_history")
+        copy("bridge.backfill.incremental.max_pages")
+        copy("bridge.backfill.incremental.max_total_pages")
+        copy("bridge.backfill.incremental.page_delay")
+        copy("bridge.backfill.incremental.post_batch_delay")
         copy("bridge.periodic_reconnect.interval")
         copy("bridge.periodic_reconnect.resync")
         copy("bridge.periodic_reconnect.always")

+ 3 - 2
mautrix_instagram/db/__init__.py

@@ -1,5 +1,6 @@
 from mautrix.util.async_db import Database
 
+from .backfill_queue import Backfill
 from .message import Message
 from .portal import Portal
 from .puppet import Puppet
@@ -9,8 +10,8 @@ from .user import User
 
 
 def init(db: Database) -> None:
-    for table in (User, Puppet, Portal, Message, Reaction):
+    for table in (User, Puppet, Portal, Message, Reaction, Backfill):
         table.db = db
 
 
-__all__ = ["upgrade_table", "User", "Puppet", "Portal", "Message", "Reaction", "init"]
+__all__ = ["upgrade_table", "User", "Puppet", "Portal", "Message", "Reaction", "Backfill", "init"]

+ 182 - 0
mautrix_instagram/db/backfill_queue.py

@@ -0,0 +1,182 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, ClassVar
+from datetime import datetime, timedelta
+
+from asyncpg import Record
+from attr import dataclass
+
+from mautrix.types import RoomID, UserID
+from mautrix.util.async_db import Database
+
+fake_db = Database.create("") if TYPE_CHECKING else None
+
+
+@dataclass
+class Backfill:
+    db: ClassVar[Database] = fake_db
+
+    queue_id: int | None
+    user_mxid: UserID
+    priority: int
+    portal_thread_id: str
+    portal_receiver: int
+    num_pages: int
+    page_delay: int
+    post_batch_delay: int
+    max_total_pages: int
+    dispatch_time: datetime | None
+    completed_at: datetime | None
+    cooldown_timeout: datetime | None
+
+    @staticmethod
+    def new(
+        user_mxid: UserID,
+        priority: int,
+        portal_thread_id: str,
+        portal_receiver: int,
+        num_pages: int,
+        page_delay: int = 0,
+        post_batch_delay: int = 0,
+        max_total_pages: int = -1,
+    ) -> "Backfill":
+        return Backfill(
+            queue_id=None,
+            user_mxid=user_mxid,
+            priority=priority,
+            portal_thread_id=portal_thread_id,
+            portal_receiver=portal_receiver,
+            num_pages=num_pages,
+            page_delay=page_delay,
+            post_batch_delay=post_batch_delay,
+            max_total_pages=max_total_pages,
+            dispatch_time=None,
+            completed_at=None,
+            cooldown_timeout=None,
+        )
+
+    @classmethod
+    def _from_row(cls, row: Record | None) -> Backfill | None:
+        if row is None:
+            return None
+        return cls(**row)
+
+    columns = [
+        "user_mxid",
+        "priority",
+        "portal_thread_id",
+        "portal_receiver",
+        "num_pages",
+        "page_delay",
+        "post_batch_delay",
+        "max_total_pages",
+        "dispatch_time",
+        "completed_at",
+        "cooldown_timeout",
+    ]
+    columns_str = ",".join(columns)
+
+    @classmethod
+    async def get_next(cls, user_mxid: UserID) -> Backfill | None:
+        q = f"""
+        SELECT queue_id, {cls.columns_str}
+        FROM backfill_queue
+        WHERE user_mxid=$1
+            AND (
+                dispatch_time IS NULL
+                OR (
+                    dispatch_time < $2
+                    AND completed_at IS NULL
+                )
+            )
+            AND (
+                cooldown_timeout IS NULL
+                OR cooldown_timeout < current_timestamp
+            )
+        ORDER BY priority, queue_id
+        LIMIT 1
+        """
+        return cls._from_row(
+            await cls.db.fetchrow(q, user_mxid, datetime.now() - timedelta(minutes=15))
+        )
+
+    @classmethod
+    async def get(
+        cls,
+        user_mxid: UserID,
+        portal_thread_id: str,
+        portal_receiver: int,
+    ) -> Backfill | None:
+        q = f"""
+        SELECT queue_id, {cls.columns_str}
+        FROM backfill_queue
+        WHERE user_mxid=$1
+          AND portal_thread_id=$2
+          AND portal_receiver=$3
+        ORDER BY priority, queue_id
+        LIMIT 1
+        """
+        return cls._from_row(
+            await cls.db.fetchrow(q, user_mxid, portal_thread_id, portal_receiver)
+        )
+
+    @classmethod
+    async def delete_all(cls, user_mxid: UserID) -> None:
+        await cls.db.execute("DELETE FROM backfill_queue WHERE user_mxid=$1", user_mxid)
+
+    @classmethod
+    async def delete_for_portal(cls, portal_thread_id: str, portal_receiver: int) -> None:
+        q = "DELETE FROM backfill_queue WHERE portal_thread_id=$1 AND portal_receiver=$2"
+        await cls.db.execute(q, portal_thread_id, portal_receiver)
+
+    async def insert(self) -> None:
+        q = f"""
+        INSERT INTO backfill_queue ({self.columns_str})
+        VALUES ({','.join(f'${i+1}' for i in range(len(self.columns)))})
+        RETURNING queue_id
+        """
+        row = await self.db.fetchrow(
+            q,
+            self.user_mxid,
+            self.priority,
+            self.portal_thread_id,
+            self.portal_receiver,
+            self.num_pages,
+            self.page_delay,
+            self.post_batch_delay,
+            self.max_total_pages,
+            self.dispatch_time,
+            self.completed_at,
+            self.cooldown_timeout,
+        )
+        self.queue_id = row["queue_id"]
+
+    async def mark_dispatched(self) -> None:
+        q = "UPDATE backfill_queue SET dispatch_time=$1 WHERE queue_id=$2"
+        await self.db.execute(q, datetime.now(), self.queue_id)
+
+    async def mark_done(self) -> None:
+        q = "UPDATE backfill_queue SET completed_at=$1 WHERE queue_id=$2"
+        await self.db.execute(q, datetime.now(), self.queue_id)
+
+    async def set_cooldown_timeout(self, timeout) -> None:
+        """
+        Set the backfill request to cooldown for ``timeout`` seconds.
+        """
+        q = "UPDATE backfill_queue SET cooldown_timeout=$1 WHERE queue_id=$2"
+        await self.db.execute(q, datetime.now() + timedelta(seconds=timeout), self.queue_id)

+ 16 - 9
mautrix_instagram/db/message.py

@@ -18,9 +18,10 @@ from __future__ import annotations
 from typing import TYPE_CHECKING, ClassVar
 
 from attr import dataclass
+import attr
 
 from mautrix.types import EventID, RoomID
-from mautrix.util.async_db import Database
+from mautrix.util.async_db import Database, Scheme
 
 fake_db = Database.create("") if TYPE_CHECKING else None
 
@@ -37,18 +38,16 @@ class Message:
     sender: int
     ig_timestamp: int | None
 
+    _columns = "mxid, mx_room, item_id, client_context, receiver, sender, ig_timestamp"
+    _insert_query = f"INSERT INTO message ({_columns}) VALUES ($1, $2, $3, $4, $5, $6, $7)"
+
     @property
     def ig_timestamp_ms(self) -> int:
         return (self.ig_timestamp // 1000) if self.ig_timestamp else 0
 
     async def insert(self) -> None:
-        q = """
-            INSERT INTO message (mxid, mx_room, item_id, client_context, receiver, sender,
-                                 ig_timestamp)
-            VALUES ($1, $2, $3, $4, $5, $6, $7)
-        """
         await self.db.execute(
-            q,
+            self._insert_query,
             self.mxid,
             self.mx_room,
             self.item_id,
@@ -58,6 +57,16 @@ class Message:
             self.ig_timestamp,
         )
 
+    @classmethod
+    async def bulk_insert(cls, messages: list[Message]) -> None:
+        columns = cls._columns.split(", ")
+        records = [attr.astuple(message) for message in messages]
+        async with cls.db.acquire() as conn, conn.transaction():
+            if cls.db.scheme == Scheme.POSTGRES:
+                await conn.copy_records_to_table("message", records=records, columns=columns)
+            else:
+                await conn.executemany(cls._insert_query, records)
+
     async def delete(self) -> None:
         q = "DELETE FROM message WHERE item_id=$1 AND receiver=$2"
         await self.db.execute(q, self.item_id, self.receiver)
@@ -66,8 +75,6 @@ class Message:
     async def delete_all(cls, room_id: RoomID) -> None:
         await cls.db.execute("DELETE FROM message WHERE mx_room=$1", room_id)
 
-    _columns = "mxid, mx_room, item_id, client_context, receiver, sender, ig_timestamp"
-
     @classmethod
     async def get_by_mxid(cls, mxid: EventID, mx_room: RoomID) -> Message | None:
         q = f"SELECT {cls._columns} FROM message WHERE mxid=$1 AND mx_room=$2"

+ 61 - 41
mautrix_instagram/db/portal.py

@@ -17,10 +17,10 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING, ClassVar
 
+from asyncpg import Record
 from attr import dataclass
-import asyncpg
 
-from mautrix.types import ContentURI, RoomID, UserID
+from mautrix.types import BatchID, ContentURI, EventID, RoomID, UserID
 from mautrix.util.async_db import Database
 
 fake_db = Database.create("") if TYPE_CHECKING else None
@@ -40,6 +40,10 @@ class Portal:
     name_set: bool
     avatar_set: bool
     relay_user_id: UserID | None
+    first_event_id: EventID | None
+    next_batch_id: BatchID | None
+    historical_base_insertion_event_id: EventID | None
+    cursor: str | None
 
     @property
     def _values(self):
@@ -54,77 +58,93 @@ class Portal:
             self.name_set,
             self.avatar_set,
             self.relay_user_id,
+            self.first_event_id,
+            self.next_batch_id,
+            self.historical_base_insertion_event_id,
+            self.cursor,
         )
 
+    column_names = ",".join(
+        (
+            "thread_id",
+            "receiver",
+            "other_user_pk",
+            "mxid",
+            "name",
+            "avatar_url",
+            "encrypted",
+            "name_set",
+            "avatar_set",
+            "relay_user_id",
+            "first_event_id",
+            "next_batch_id",
+            "historical_base_insertion_event_id",
+            "cursor",
+        )
+    )
+
     async def insert(self) -> None:
         q = (
-            "INSERT INTO portal (thread_id, receiver, other_user_pk, mxid, name, avatar_url, "
-            "                    encrypted, name_set, avatar_set, relay_user_id) "
-            "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"
+            f"INSERT INTO portal ({self.column_names}) "
+            "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)"
         )
         await self.db.execute(q, *self._values)
 
     async def update(self) -> None:
         q = (
             "UPDATE portal SET other_user_pk=$3, mxid=$4, name=$5, avatar_url=$6, encrypted=$7,"
-            "                  name_set=$8, avatar_set=$9, relay_user_id=$10 "
+            "                  name_set=$8, avatar_set=$9, relay_user_id=$10, first_event_id=$11,"
+            "                  next_batch_id=$12, historical_base_insertion_event_id=$13,"
+            "                  cursor=$14 "
             "WHERE thread_id=$1 AND receiver=$2"
         )
         await self.db.execute(q, *self._values)
 
     @classmethod
-    def _from_row(cls, row: asyncpg.Record) -> Portal:
+    def _from_row(cls, row: Record | None) -> Portal | None:
+        if row is None:
+            return None
         return cls(**row)
 
     @classmethod
     async def get_by_mxid(cls, mxid: RoomID) -> Portal | None:
-        q = (
-            "SELECT thread_id, receiver, other_user_pk, mxid, name, avatar_url, encrypted, "
-            "       name_set, avatar_set, relay_user_id "
-            "FROM portal WHERE mxid=$1"
-        )
+        q = f"SELECT {cls.column_names} FROM portal WHERE mxid=$1"
         row = await cls.db.fetchrow(q, mxid)
-        if not row:
-            return None
         return cls._from_row(row)
 
     @classmethod
     async def get_by_thread_id(
         cls, thread_id: str, receiver: int, rec_must_match: bool = True
     ) -> Portal | None:
-        q = (
-            "SELECT thread_id, receiver, other_user_pk, mxid, name, avatar_url, encrypted, "
-            "       name_set, avatar_set, relay_user_id "
-            "FROM portal WHERE thread_id=$1 AND receiver=$2"
-        )
+        q = f"SELECT {cls.column_names} FROM portal WHERE thread_id=$1 AND receiver=$2"
         if not rec_must_match:
-            q = (
-                "SELECT thread_id, receiver, other_user_pk, mxid, name, avatar_url, encrypted, "
-                "       name_set, avatar_set "
-                "FROM portal WHERE thread_id=$1 AND (receiver=$2 OR receiver=0)"
-            )
+            q = f"""
+                SELECT {cls.column_names}
+                FROM portal
+                WHERE thread_id=$1
+                    AND (receiver=$2 OR receiver=0)
+            """
         row = await cls.db.fetchrow(q, thread_id, receiver)
-        if not row:
-            return None
         return cls._from_row(row)
 
     @classmethod
     async def find_private_chats_of(cls, receiver: int) -> list[Portal]:
-        q = (
-            "SELECT thread_id, receiver, other_user_pk, mxid, name, avatar_url, encrypted, "
-            "       name_set, avatar_set, relay_user_id "
-            "FROM portal WHERE receiver=$1 AND other_user_pk IS NOT NULL"
-        )
+        q = f"""
+            SELECT {cls.column_names}
+            FROM portal
+            WHERE receiver=$1
+                AND other_user_pk IS NOT NULL
+        """
         rows = await cls.db.fetch(q, receiver)
         return [cls._from_row(row) for row in rows]
 
     @classmethod
     async def find_private_chats_with(cls, other_user: int) -> list[Portal]:
-        q = (
-            "SELECT thread_id, receiver, other_user_pk, mxid, name, avatar_url, encrypted, "
-            "       name_set, avatar_set, relay_user_id "
-            "FROM portal WHERE other_user_pk=$1"
-        )
+        q = f"""
+            SELECT {cls.column_names}
+            FROM portal
+            WHERE other_user_pk=$1
+        """
         rows = await cls.db.fetch(q, other_user)
         return [cls._from_row(row) for row in rows]
 
@@ -135,10 +155,10 @@ class Portal:
 
     @classmethod
     async def all_with_room(cls) -> list[Portal]:
-        q = (
-            "SELECT thread_id, receiver, other_user_pk, mxid, name, avatar_url, encrypted, "
-            "       name_set, avatar_set, relay_user_id "
-            "FROM portal WHERE mxid IS NOT NULL"
-        )
+        q = f"""
+            SELECT {cls.column_names}
+            FROM portal
+            WHERE mxid IS NOT NULL
+        """
         rows = await cls.db.fetch(q)
         return [cls._from_row(row) for row in rows]

+ 32 - 0
mautrix_instagram/db/upgrade/__init__.py

@@ -0,0 +1,32 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.util.async_db import UpgradeTable
+
+upgrade_table = UpgradeTable()
+
+from . import (
+    v00_latest_revision,
+    v02_name_avatar_set,
+    v03_relay_portal,
+    v04_message_client_content,
+    v05_message_ig_timestamp,
+    v06_hidden_events,
+    v07_reaction_timestamps,
+    v08_sync_sequence_id,
+    v09_backfill_queue,
+    v10_portal_infinite_backfill,
+    v11_per_user_thread_sync_status,
+)

+ 53 - 61
mautrix_instagram/db/upgrade.py → mautrix_instagram/db/upgrade/v00_latest_revision.py

@@ -1,5 +1,5 @@
 # mautrix-instagram - A Matrix-Instagram puppeting bridge.
-# Copyright (C) 2020 Tulir Asokan
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
@@ -13,38 +13,43 @@
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
-from asyncpg import Connection
+from mautrix.util.async_db import Connection, Scheme
 
-from mautrix.util.async_db import UpgradeTable
+from . import upgrade_table
 
-upgrade_table = UpgradeTable()
 
-
-@upgrade_table.register(description="Latest revision", upgrades_to=8)
-async def upgrade_latest(conn: Connection) -> None:
+@upgrade_table.register(description="Latest revision", upgrades_to=11)
+async def upgrade_latest(conn: Connection, scheme: Scheme) -> None:
     await conn.execute(
         """CREATE TABLE portal (
-            thread_id     TEXT,
-            receiver      BIGINT,
-            other_user_pk BIGINT,
-            mxid          TEXT,
-            name          TEXT,
-            avatar_url    TEXT,
-            name_set      BOOLEAN NOT NULL DEFAULT false,
-            avatar_set    BOOLEAN NOT NULL DEFAULT false,
-            encrypted     BOOLEAN NOT NULL DEFAULT false,
-            relay_user_id TEXT,
+            thread_id                           TEXT,
+            receiver                            BIGINT,
+            other_user_pk                       BIGINT,
+            mxid                                TEXT,
+            name                                TEXT,
+            avatar_url                          TEXT,
+            name_set                            BOOLEAN NOT NULL DEFAULT false,
+            avatar_set                          BOOLEAN NOT NULL DEFAULT false,
+            encrypted                           BOOLEAN NOT NULL DEFAULT false,
+            relay_user_id                       TEXT,
+            first_event_id                      TEXT,
+            next_batch_id                       TEXT,
+            historical_base_insertion_event_id  TEXT,
+            cursor                              TEXT,
             PRIMARY KEY (thread_id, receiver)
         )"""
     )
     await conn.execute(
         """CREATE TABLE "user" (
-            mxid           TEXT PRIMARY KEY,
-            igpk           BIGINT,
-            state          jsonb,
-            seq_id         BIGINT,
-            snapshot_at_ms BIGINT,
-            notice_room    TEXT
+            mxid                        TEXT PRIMARY KEY,
+            igpk                        BIGINT,
+            state                       jsonb,
+            seq_id                      BIGINT,
+            snapshot_at_ms              BIGINT,
+            notice_room                 TEXT,
+            oldest_cursor               TEXT,
+            total_backfilled_portals    INTEGER,
+            thread_sync_completed       BOOLEAN NOT NULL DEFAULT false
         )"""
     )
     await conn.execute(
@@ -103,41 +108,28 @@ async def upgrade_latest(conn: Connection) -> None:
         )"""
     )
 
-
-@upgrade_table.register(description="Add name_set and avatar_set to portal table")
-async def upgrade_v2(conn: Connection) -> None:
-    await conn.execute("ALTER TABLE portal ADD COLUMN avatar_url TEXT")
-    await conn.execute("ALTER TABLE portal ADD COLUMN name_set BOOLEAN NOT NULL DEFAULT false")
-    await conn.execute("ALTER TABLE portal ADD COLUMN avatar_set BOOLEAN NOT NULL DEFAULT false")
-    await conn.execute("UPDATE portal SET name_set=true WHERE name<>''")
-
-
-@upgrade_table.register(description="Add relay user field to portal table")
-async def upgrade_v3(conn: Connection) -> None:
-    await conn.execute("ALTER TABLE portal ADD COLUMN relay_user_id TEXT")
-
-
-@upgrade_table.register(description="Add client context field to message table")
-async def upgrade_v4(conn: Connection) -> None:
-    await conn.execute("ALTER TABLE message ADD COLUMN client_context TEXT")
-
-
-@upgrade_table.register(description="Add ig_timestamp field to message table")
-async def upgrade_v5(conn: Connection) -> None:
-    await conn.execute("ALTER TABLE message ADD COLUMN ig_timestamp BIGINT")
-
-
-@upgrade_table.register(description="Allow hidden events in message table")
-async def upgrade_v6(conn: Connection) -> None:
-    await conn.execute("ALTER TABLE message ALTER COLUMN mxid DROP NOT NULL")
-
-
-@upgrade_table.register(description="Store reaction timestamps")
-async def upgrade_v7(conn: Connection) -> None:
-    await conn.execute("ALTER TABLE reaction ADD COLUMN mx_timestamp BIGINT")
-
-
-@upgrade_table.register(description="Store sync sequence ID in user table")
-async def upgrade_v8(conn: Connection) -> None:
-    await conn.execute('ALTER TABLE "user" ADD COLUMN seq_id BIGINT')
-    await conn.execute('ALTER TABLE "user" ADD COLUMN snapshot_at_ms BIGINT')
+    gen = ""
+    if scheme in (Scheme.POSTGRES, Scheme.COCKROACH):
+        gen = "GENERATED ALWAYS AS IDENTITY"
+    await conn.execute(
+        f"""
+        CREATE TABLE backfill_queue (
+            queue_id            INTEGER PRIMARY KEY {gen},
+            user_mxid           TEXT,
+            priority            INTEGER NOT NULL,
+            portal_thread_id    TEXT,
+            portal_receiver     BIGINT,
+            num_pages           INTEGER NOT NULL,
+            page_delay          INTEGER NOT NULL,
+            post_batch_delay    INTEGER NOT NULL,
+            max_total_pages     INTEGER NOT NULL,
+            dispatch_time       TIMESTAMP,
+            completed_at        TIMESTAMP,
+            cooldown_timeout    TIMESTAMP,
+
+            FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
+            FOREIGN KEY (portal_thread_id, portal_receiver)
+                REFERENCES portal(thread_id, receiver) ON DELETE CASCADE
+        )
+        """
+    )

+ 26 - 0
mautrix_instagram/db/upgrade/v02_name_avatar_set.py

@@ -0,0 +1,26 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.util.async_db import Connection
+
+from . import upgrade_table
+
+
+@upgrade_table.register(description="Add name_set and avatar_set to portal table")
+async def upgrade_v2(conn: Connection) -> None:
+    await conn.execute("ALTER TABLE portal ADD COLUMN avatar_url TEXT")
+    await conn.execute("ALTER TABLE portal ADD COLUMN name_set BOOLEAN NOT NULL DEFAULT false")
+    await conn.execute("ALTER TABLE portal ADD COLUMN avatar_set BOOLEAN NOT NULL DEFAULT false")
+    await conn.execute("UPDATE portal SET name_set=true WHERE name<>''")

+ 23 - 0
mautrix_instagram/db/upgrade/v03_relay_portal.py

@@ -0,0 +1,23 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.util.async_db import Connection
+
+from . import upgrade_table
+
+
+@upgrade_table.register(description="Add relay user field to portal table")
+async def upgrade_v3(conn: Connection) -> None:
+    await conn.execute("ALTER TABLE portal ADD COLUMN relay_user_id TEXT")

+ 23 - 0
mautrix_instagram/db/upgrade/v04_message_client_content.py

@@ -0,0 +1,23 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.util.async_db import Connection
+
+from . import upgrade_table
+
+
+@upgrade_table.register(description="Add client context field to message table")
+async def upgrade_v4(conn: Connection) -> None:
+    await conn.execute("ALTER TABLE message ADD COLUMN client_context TEXT")

+ 23 - 0
mautrix_instagram/db/upgrade/v05_message_ig_timestamp.py

@@ -0,0 +1,23 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.util.async_db import Connection
+
+from . import upgrade_table
+
+
+@upgrade_table.register(description="Add ig_timestamp field to message table")
+async def upgrade_v5(conn: Connection) -> None:
+    await conn.execute("ALTER TABLE message ADD COLUMN ig_timestamp BIGINT")

+ 23 - 0
mautrix_instagram/db/upgrade/v06_hidden_events.py

@@ -0,0 +1,23 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.util.async_db import Connection
+
+from . import upgrade_table
+
+
+@upgrade_table.register(description="Allow hidden events in message table")
+async def upgrade_v6(conn: Connection) -> None:
+    await conn.execute("ALTER TABLE message ALTER COLUMN mxid DROP NOT NULL")

+ 23 - 0
mautrix_instagram/db/upgrade/v07_reaction_timestamps.py

@@ -0,0 +1,23 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.util.async_db import Connection
+
+from . import upgrade_table
+
+
+@upgrade_table.register(description="Store reaction timestamps")
+async def upgrade_v7(conn: Connection) -> None:
+    await conn.execute("ALTER TABLE reaction ADD COLUMN mx_timestamp BIGINT")

+ 24 - 0
mautrix_instagram/db/upgrade/v08_sync_sequence_id.py

@@ -0,0 +1,24 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.util.async_db import Connection
+
+from . import upgrade_table
+
+
+@upgrade_table.register(description="Store sync sequence ID in user table")
+async def upgrade_v8(conn: Connection) -> None:
+    await conn.execute('ALTER TABLE "user" ADD COLUMN seq_id BIGINT')
+    await conn.execute('ALTER TABLE "user" ADD COLUMN snapshot_at_ms BIGINT')

+ 47 - 0
mautrix_instagram/db/upgrade/v09_backfill_queue.py

@@ -0,0 +1,47 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.util.async_db import Connection, Scheme
+
+from . import upgrade_table
+
+
+@upgrade_table.register(description="Add the backfill queue table")
+async def upgrade_v9(conn: Connection, scheme: Scheme) -> None:
+    gen = ""
+    if scheme in (Scheme.POSTGRES, Scheme.COCKROACH):
+        gen = "GENERATED ALWAYS AS IDENTITY"
+    await conn.execute(
+        f"""
+        CREATE TABLE backfill_queue (
+            queue_id            INTEGER PRIMARY KEY {gen},
+            user_mxid           TEXT,
+            priority            INTEGER NOT NULL,
+            portal_thread_id    TEXT,
+            portal_receiver     BIGINT,
+            num_pages           INTEGER NOT NULL,
+            page_delay          INTEGER NOT NULL,
+            post_batch_delay    INTEGER NOT NULL,
+            max_total_pages     INTEGER NOT NULL,
+            dispatch_time       TIMESTAMP,
+            completed_at        TIMESTAMP,
+            cooldown_timeout    TIMESTAMP,
+
+            FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
+            FOREIGN KEY (portal_thread_id, portal_receiver)
+                REFERENCES portal(thread_id, receiver) ON DELETE CASCADE
+        )
+        """
+    )

+ 26 - 0
mautrix_instagram/db/upgrade/v10_portal_infinite_backfill.py

@@ -0,0 +1,26 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.util.async_db import Connection
+
+from . import upgrade_table
+
+
+@upgrade_table.register(description="Add columns to store infinite backfill pointers for portals")
+async def upgrade_v10(conn: Connection) -> None:
+    await conn.execute("ALTER TABLE portal ADD COLUMN first_event_id TEXT")
+    await conn.execute("ALTER TABLE portal ADD COLUMN next_batch_id TEXT")
+    await conn.execute("ALTER TABLE portal ADD COLUMN historical_base_insertion_event_id TEXT")
+    await conn.execute("ALTER TABLE portal ADD COLUMN cursor TEXT")

+ 29 - 0
mautrix_instagram/db/upgrade/v11_per_user_thread_sync_status.py

@@ -0,0 +1,29 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.util.async_db import Connection
+
+from . import upgrade_table
+
+
+@upgrade_table.register(
+    description="Add columns to the user table to track thread sync status for backfill"
+)
+async def upgrade_v11(conn: Connection) -> None:
+    await conn.execute('ALTER TABLE "user" ADD COLUMN oldest_cursor TEXT')
+    await conn.execute('ALTER TABLE "user" ADD COLUMN total_backfilled_portals INTEGER')
+    await conn.execute(
+        'ALTER TABLE "user" ADD COLUMN thread_sync_completed BOOLEAN NOT NULL DEFAULT false'
+    )

+ 26 - 6
mautrix_instagram/db/user.py

@@ -37,6 +37,9 @@ class User:
     notice_room: RoomID | None
     seq_id: int | None
     snapshot_at_ms: int | None
+    oldest_cursor: str | None
+    total_backfilled_portals: int | None
+    thread_sync_completed: bool
 
     @property
     def _values(self):
@@ -47,18 +50,37 @@ class User:
             self.notice_room,
             self.seq_id,
             self.snapshot_at_ms,
+            self.oldest_cursor,
+            self.total_backfilled_portals,
+            self.thread_sync_completed,
         )
 
+    _columns = ",".join(
+        (
+            "mxid",
+            "igpk",
+            "state",
+            "notice_room",
+            "seq_id",
+            "snapshot_at_ms",
+            "oldest_cursor",
+            "total_backfilled_portals",
+            "thread_sync_completed",
+        )
+    )
+
     async def insert(self) -> None:
-        q = """
-        INSERT INTO "user" (mxid, igpk, state, notice_room, seq_id, snapshot_at_ms)
-        VALUES ($1, $2, $3, $4, $5, $6)
+        q = f"""
+        INSERT INTO "user" ({self._columns})
+        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
         """
         await self.db.execute(q, *self._values)
 
     async def update(self) -> None:
         q = """
-        UPDATE "user" SET igpk=$2, state=$3, notice_room=$4, seq_id=$5, snapshot_at_ms=$6
+        UPDATE "user"
+        SET igpk=$2, state=$3, notice_room=$4, seq_id=$5, snapshot_at_ms=$6,
+            oldest_cursor=$7, total_backfilled_portals=$8, thread_sync_completed=$9
         WHERE mxid=$1
         """
         await self.db.execute(q, *self._values)
@@ -73,8 +95,6 @@ class User:
         state_str = data.pop("state")
         return cls(state=AndroidState.parse_json(state_str) if state_str else None, **data)
 
-    _columns = "mxid, igpk, state, notice_room, seq_id, snapshot_at_ms"
-
     @classmethod
     async def get_by_mxid(cls, mxid: UserID) -> User | None:
         q = f'SELECT {cls._columns} FROM "user" WHERE mxid=$1'

+ 70 - 23
mautrix_instagram/example-config.yaml

@@ -109,15 +109,9 @@ bridge:
     # Maximum length of displayname
     displayname_max_length: 100
 
-    # Maximum number of seconds since the last activity in a chat to automatically create portals.
-    portal_create_max_age: 259200
-    # Maximum number of chats to fetch for startup sync
-    chat_sync_limit: 20
-    # Maximum number of chats to create during startup sync
-    chat_create_limit: 10
-    # Should the chat list be synced on startup?
-    # If false, the bridge will try to reconnect to MQTT directly and ask the server to send missed events.
-    resync_on_startup: true
+    # The maximum number of conversations that should be synced when we get a
+    # message sync error. In general, 1 page (20) is sufficient.
+    max_startup_thread_sync_count: 20
     # Whether or not to use /sync to get read receipts and typing notifications
     # when double puppeting is enabled
     sync_with_custom_puppets: false
@@ -144,20 +138,73 @@ bridge:
     federate_rooms: true
     # Settings for backfilling messages from Instagram.
     backfill:
-        # Whether or not the Instagram users of logged in Matrix users should be
-        # invited to private chats when backfilling history from Instagram. This is
-        # usually needed to prevent rate limits and to allow timestamp massaging.
-        invite_own_puppet: true
-        # Maximum number of messages to backfill initially.
-        # Set to 0 to disable backfilling when creating portal.
-        initial_limit: 0
-        # Maximum number of messages to backfill if messages were missed while
-        # the bridge was disconnected.
-        # Set to 0 to disable backfilling missed messages.
-        missed_limit: 1000
-        # If using double puppeting, should notifications be disabled
-        # while the initial backfill is in progress?
-        disable_notifications: false
+        # Whether to enable backfilling at all.
+        #
+        # This requires a server with MSC2716 support, which is currently an
+        # experimental feature in synapse. It can be enabled by setting
+        # experimental_features -> msc2716_enabled to true in homeserver.yaml.
+        #
+        # Note that prior to Synapse 1.49, there were some bugs with the
+        # implementation, especially if using event persistence workers. There
+        # are also still some issues in Synapse's federation implementation.
+        enable: false
+        # Use MSC2716 for backfilling? If this is disabled, backfilling only happens when syncing threads,
+        # and the incremental settings below don't apply.
+        #
+        # This requires a server with MSC2716 support, which is currently an experimental feature in Synapse.
+        # It can be enabled by setting experimental_features -> msc2716_enabled to true in homeserver.yaml.
+        msc2716: false
+        # Use double puppets for backfilling?
+        # In order to use this, the double puppets must be in the appservice's user ID namespace
+        # (because the bridge can't use the double puppet access token with batch sending).
+        # This only affects double puppets on the local server, double puppets on other servers will never be used.
+        double_puppet_backfill: false
+        # The maximum number of conversations that should be synced.
+        # Other conversations will be backfilled on demand when the start PM
+        # provisioning endpoint is used or when a message comes in from that
+        # chat.
+        # If set to -1, all conversations will be synced.
+        max_conversations: 20
+        # The minimum amount of time to wait between syncing each thread. This
+        # helps avoid situations where you sync too quickly.
+        min_sync_thread_delay: 5
+        # If this value is greater than 0, then if the conversation's last
+        # message was more than this number of hours ago, then the conversation
+        # will automatically be marked as read.
+        # Conversations that have a last message that is less than this number
+        # of hours ago will have their unread status synced from Instagram.
+        unread_hours_threshold: 0
+
+        # Settings for how quickly to backoff when rate-limits are encountered
+        # while backfilling.
+        backoff:
+            # How many seconds to wait after getting rate limited during a
+            # thread list fetch.
+            thread_list: 300
+            # How many seconds to wait after getting rate limited during a
+            # message history fetch.
+            message_history: 300
+
+        # Settings for backfills.
+        #
+        # During initial/incremental sync, the entirety of the thread that is
+        # available will be backfilled. For example, on initial sync, about 20
+        # messages are included for each thread in the thread list returned by
+        # the server. After that, incremental backfills will be run for each of
+        # the portals in a round-robin fashion until all portals have been
+        # backfilled as configured below.
+        incremental:
+            # The maximum number of pages to backfill per batch.
+            max_pages: 10
+            # The maximum number of total pages to backfill per portal.
+            # If set to -1, infinite pages will be synced.
+            max_total_pages: -1
+            # The number of seconds to wait between backfilling each page.
+            page_delay: 5
+            # The number of seconds to wait after backfilling the batch of
+            # messages.
+            post_batch_delay: 20
+
     periodic_reconnect:
         # Interval in seconds in which to automatically reconnect all users.
         # This can be used to automatically mitigate the bug where Instagram stops sending messages.

+ 16 - 0
mautrix_instagram/matrix.py

@@ -16,6 +16,7 @@
 from __future__ import annotations
 
 from typing import TYPE_CHECKING
+import sys
 
 from mautrix.bridge import BaseMatrixHandler
 from mautrix.types import (
@@ -51,6 +52,21 @@ class MatrixHandler(BaseMatrixHandler):
 
         super().__init__(bridge=bridge)
 
+    async def check_versions(self) -> None:
+        await super().check_versions()
+        if self.config["bridge.backfill.enable"] and not (
+            support := self.versions.supports("org.matrix.msc2716")
+        ):
+            self.log.fatal(
+                "Backfilling is enabled in bridge config, but "
+                + (
+                    "MSC2716 batch sending is not enabled on homeserver"
+                    if support is False
+                    else "homeserver does not support MSC2716 batch sending"
+                )
+            )
+            sys.exit(18)
+
     async def send_welcome_message(self, room_id: RoomID, inviter: u.User) -> None:
         await super().send_welcome_message(room_id, inviter)
         if not inviter.notice_room:

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 644 - 213
mautrix_instagram/portal.py


+ 329 - 30
mautrix_instagram/user.py

@@ -15,7 +15,9 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, cast
+from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, Callable, cast
+from datetime import datetime, timedelta
+from functools import partial
 import asyncio
 import logging
 import time
@@ -27,6 +29,7 @@ from mauigpapi.errors import (
     IGChallengeError,
     IGCheckpointError,
     IGConsentRequiredError,
+    IGLoginRequiredError,
     IGNotLoggedInError,
     IGRateLimitError,
     IGUserIDNotFoundError,
@@ -53,16 +56,18 @@ from mauigpapi.types import (
     ThreadSyncEvent,
     TypingStatus,
 )
+from mauigpapi.types.direct_inbox import DMInbox, DMInboxResponse
 from mautrix.appservice import AppService
 from mautrix.bridge import BaseUser, async_getter_lock
 from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID
 from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
 from mautrix.util.logging import TraceLogger
 from mautrix.util.opt_prometheus import Gauge, Summary, async_time
+from mautrix.util.simple_lock import SimpleLock
 
 from . import portal as po, puppet as pu
 from .config import Config
-from .db import Portal as DBPortal, User as DBUser
+from .db import Backfill, Message as DBMessage, Portal as DBPortal, User as DBUser
 
 if TYPE_CHECKING:
     from .__main__ import InstagramBridge
@@ -111,6 +116,9 @@ class User(DBUser, BaseUser):
     client: AndroidAPI | None
     mqtt: AndroidMQTT | None
     _listen_task: asyncio.Task | None = None
+    _sync_lock: SimpleLock
+    _backfill_loop_task: asyncio.Task | None
+    _thread_sync_task: asyncio.Task | None
     _seq_id_save_task: asyncio.Task | None
 
     permission_level: str
@@ -131,6 +139,9 @@ class User(DBUser, BaseUser):
         notice_room: RoomID | None = None,
         seq_id: int | None = None,
         snapshot_at_ms: int | None = None,
+        oldest_cursor: str | None = None,
+        total_backfilled_portals: int | None = None,
+        thread_sync_completed: bool = False,
     ) -> None:
         super().__init__(
             mxid=mxid,
@@ -139,6 +150,9 @@ class User(DBUser, BaseUser):
             notice_room=notice_room,
             seq_id=seq_id,
             snapshot_at_ms=snapshot_at_ms,
+            oldest_cursor=oldest_cursor,
+            total_backfilled_portals=total_backfilled_portals,
+            thread_sync_completed=thread_sync_completed,
         )
         BaseUser.__init__(self)
         self._notice_room_lock = asyncio.Lock()
@@ -152,7 +166,12 @@ class User(DBUser, BaseUser):
         self._is_connected = False
         self._is_refreshing = False
         self.shutdown = False
+        self._sync_lock = SimpleLock(
+            "Waiting for thread sync to finish before handling %s", log=self.log
+        )
         self._listen_task = None
+        self._thread_sync_task = None
+        self._backfill_loop_task = None
         self.remote_typing_status = None
         self._seq_id_save_task = None
 
@@ -258,12 +277,62 @@ class User(DBUser, BaseUser):
         await self.update()
 
         self.loop.create_task(self._try_sync_puppet(user))
-        if not self.seq_id or self.config["bridge.resync_on_startup"]:
-            self.loop.create_task(self._try_sync())
+        self.loop.create_task(self._post_connect())
+
+    async def _post_connect(self):
+        # Backfill requests are handled synchronously so as not to overload the homeserver.
+        # Users can configure their backfill stages to be more or less aggressive with backfilling
+        # to try and avoid getting banned.
+        if not self._backfill_loop_task or self._backfill_loop_task.done():
+            self._backfill_loop_task = asyncio.create_task(self._handle_backfill_requests_loop())
+
+        if not self.seq_id:
+            await self._try_sync()
         else:
             self.log.debug("Connecting to MQTT directly as seq_id is already known")
             self.start_listen()
 
+        if self.config["bridge.backfill.enable"]:
+            if self._thread_sync_task and not self._thread_sync_task.done():
+                self.log.warning("Cancelling existing background thread sync task")
+                self._thread_sync_task.cancel()
+            self._thread_sync_task = asyncio.create_task(self.backfill_threads())
+
+    async def _handle_backfill_requests_loop(self) -> None:
+        while True:
+            await self._sync_lock.wait("backfill request")
+            req = await Backfill.get_next(self.mxid)
+            if not req:
+                await asyncio.sleep(30)
+                continue
+            self.log.info("Backfill request %s", req)
+            try:
+                portal = await po.Portal.get_by_thread_id(
+                    req.portal_thread_id, receiver=req.portal_receiver
+                )
+                await req.mark_dispatched()
+                await portal.backfill(self, req)
+                await req.mark_done()
+            except IGLoginRequiredError as e:
+                self.log.exception(
+                    "User is logged out. Stopping backfill requests loop and forcing refresh."
+                )
+                await self.refresh(resync=False)
+                break
+            except IGChallengeError as e:
+                self.log.exception(
+                    "User received a challenge. Stopping backfill requests loop and "
+                    "forcing refresh."
+                )
+                await self.refresh(resync=False)
+                break
+            except Exception as e:
+                self.log.exception("Failed to backfill portal %s: %s", req.portal_thread_id, e)
+
+                # Don't try again to backfill this portal for a minute.
+                await req.set_cooldown_timeout(60)
+        self._backfill_loop_task = None
+
     async def on_connect(self, evt: Connect) -> None:
         self.log.debug("Connected to Instagram")
         self._track_metric(METRIC_CONNECTED, True)
@@ -464,16 +533,99 @@ class User(DBUser, BaseUser):
             info = {"challenge": e.body.challenge.serialize() if e.body.challenge else None}
         await self.push_bridge_state(BridgeStateEvent.BAD_CREDENTIALS, error=error_code, info=info)
 
-    async def _sync_thread(self, thread: Thread, allow_create: bool) -> None:
+    async def _sync_thread(self, thread: Thread) -> bool:
+        """
+        Sync a specific thread. Returns whether the thread had messages after the last message in
+        the database before the sync.
+        """
+        self.log.debug(f"Syncing thread {thread.thread_id}")
+
+        forward_messages = thread.items
+
+        assert self.client
         portal = await po.Portal.get_by_thread(thread, self.igpk)
-        if portal.mxid:
-            self.log.debug(f"{thread.thread_id} has a portal, syncing and backfilling...")
-            await portal.update_matrix_room(self, thread, backfill=True)
-        elif allow_create:
-            self.log.debug(f"{thread.thread_id} has been active recently, creating portal...")
+        assert portal
+
+        # Create or update the Matrix room
+        if not portal.mxid:
             await portal.create_matrix_room(self, thread)
         else:
-            self.log.debug(f"{thread.thread_id} is not active and doesn't have a portal")
+            await portal.update_matrix_room(self, thread)
+
+        last_message = await DBMessage.get_last(portal.mxid)
+        cursor = thread.oldest_cursor
+        if last_message:
+            original_number_of_messages = len(thread.items)
+            new_messages = [
+                m for m in thread.items if last_message.ig_timestamp_ms < m.timestamp_ms
+            ]
+            forward_messages = new_messages
+
+            portal.log.debug(
+                f"{len(new_messages)}/{original_number_of_messages} messages are after most recent"
+                " message."
+            )
+
+            # Fetch more messages until we get back to messages that have been bridged already.
+            while len(new_messages) > 0 and len(new_messages) == original_number_of_messages:
+                await asyncio.sleep(self.config["bridge.backfill.incremental.page_delay"])
+
+                portal.log.debug("Fetching more messages for forward backfill")
+                resp = await self.client.get_thread(portal.thread_id, cursor=cursor)
+                if len(resp.thread.items) == 0:
+                    break
+                original_number_of_messages = len(resp.thread.items)
+                new_messages = [
+                    m for m in resp.thread.items if last_message.ig_timestamp_ms < m.timestamp_ms
+                ]
+                forward_messages = new_messages + forward_messages
+                cursor = resp.thread.oldest_cursor
+                portal.log.debug(
+                    f"{len(new_messages)}/{original_number_of_messages} messages are after most "
+                    "recent message."
+                )
+        elif not portal.first_event_id:
+            self.log.debug(
+                f"Skipping backfilling {portal.thread_id} as the first event ID is not known"
+            )
+            return False
+
+        if forward_messages:
+            portal.cursor = cursor
+            await portal.update()
+
+            mark_read = thread.read_state == 0 or (
+                (hours := self.config["bridge.backfill.unread_hours_threshold"]) > 0
+                and (
+                    datetime.fromtimestamp(forward_messages[0].timestamp_ms / 1000)
+                    < datetime.now() - timedelta(hours=hours)
+                )
+            )
+            base_insertion_event_id = await portal.backfill_message_page(
+                self,
+                list(reversed(forward_messages)),
+                forward=True,
+                last_message=last_message,
+                mark_read=mark_read,
+            )
+            if not self.bridge.homeserver_software.is_hungry:
+                await portal.send_post_backfill_dummy(
+                    forward_messages[0].timestamp, base_insertion_event_id=base_insertion_event_id
+                )
+            if (
+                mark_read
+                and not self.bridge.homeserver_software.is_hungry
+                and (puppet := await self.get_puppet())
+            ):
+                last_message = await DBMessage.get_last(portal.mxid)
+                if last_message:
+                    await puppet.intent_for(portal).mark_read(portal.mxid, last_message.mxid)
+
+            await portal._update_read_receipts(thread.last_seen_at)
+
+        if self.config["bridge.backfill.msc2716"]:
+            await portal.enqueue_immediate_backfill(self, 1)
+        return len(forward_messages) > 0
 
     async def _maybe_update_proxy(self, source: str) -> None:
         if not self._listen_task:
@@ -482,7 +634,10 @@ class User(DBUser, BaseUser):
         else:
             self.log.debug(f"Not updating proxy: listen_task is still running? (caller: {source})")
 
-    async def sync(self) -> None:
+    async def sync(self, increment_total_backfilled_portals: bool = False) -> None:
+        await self.run_with_sync_lock(partial(self._sync, increment_total_backfilled_portals))
+
+    async def _sync(self, increment_total_backfilled_portals: bool = False) -> None:
         sleep_minutes = 2
         errors = 0
         while True:
@@ -533,28 +688,162 @@ class User(DBUser, BaseUser):
         if not self._listen_task:
             self.start_listen(is_after_sync=True)
 
-        max_age = self.config["bridge.portal_create_max_age"] * 1_000_000
-        limit = self.config["bridge.chat_sync_limit"]
-        create_limit = self.config["bridge.chat_create_limit"]
-        min_active_at = (time.time() * 1_000_000) - max_age
-        i = 0
-        await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
-        async for thread in self.client.iter_inbox(start_at=resp):
-            try:
-                await self._sync_thread(
-                    thread=thread,
-                    allow_create=thread.last_activity_at > min_active_at and i < create_limit,
-                )
-            except Exception:
-                self.log.exception(f"Error syncing thread {thread.thread_id}")
-            i += 1
-            if i >= limit:
-                break
+        sync_count = min(
+            self.config["bridge.backfill.max_conversations"],
+            self.config["bridge.max_startup_thread_sync_count"],
+        )
+        self.log.debug(f"Fetching {sync_count} threads, 20 at a time...")
+
+        local_limit: int | None = sync_count
+        if sync_count == 0:
+            return
+        elif sync_count < 0:
+            local_limit = None
+
+        await self._sync_threads_with_delay(
+            self.client.iter_inbox(
+                self._update_seq_id_and_cursor, start_at=resp, local_limit=local_limit
+            ),
+            stop_when_threads_have_no_messages_to_backfill=True,
+            increment_total_backfilled_portals=increment_total_backfilled_portals,
+            local_limit=local_limit,
+        )
+
         try:
             await self.update_direct_chats()
         except Exception:
             self.log.exception("Error updating direct chat list")
 
+    async def backfill_threads(self):
+        # Top-level task body for historical thread backfill. Runs the real work
+        # (_backfill_threads) while holding the sync lock; any unexpected error
+        # is logged here instead of propagating, so the surrounding asyncio task
+        # does not die silently.
+        try:
+            await self.run_with_sync_lock(self._backfill_threads)
+        except Exception:
+            self.log.exception("Error in thread backfill loop")
+
+    async def _backfill_threads(self):
+        # Backfill historical threads until either the configured conversation
+        # budget is exhausted or the inbox has no older threads left.
+        assert self.client
+        if not self.config["bridge.backfill.enable"]:
+            return
+
+        # NOTE(review): `or 0` maps a null config value to 0, and 0 is treated as
+        # "budget already reached" below — confirm that 0/None is not meant to
+        # mean "unlimited" (only negative values disable the limit here).
+        max_conversations = self.config["bridge.backfill.max_conversations"] or 0
+        if 0 <= max_conversations <= (self.total_backfilled_portals or 0):
+            self.log.info("Backfill max_conversations count reached, not syncing any more portals")
+            return
+        elif self.thread_sync_completed:
+            self.log.debug("Thread backfill is marked as completed, not syncing more portals")
+            return
+        # Remaining budget; None means no limit (negative max_conversations).
+        local_limit = (
+            max_conversations - (self.total_backfilled_portals or 0)
+            if max_conversations >= 0
+            else None
+        )
+
+        # Resume pagination from the stored cursor by synthesizing a minimal
+        # DMInboxResponse with an empty thread list and has_older=True, so
+        # iter_inbox starts fetching at self.oldest_cursor.
+        start_at = None
+        if self.oldest_cursor:
+            start_at = DMInboxResponse(
+                status="",
+                seq_id=self.seq_id,
+                snapshot_at_ms=0,
+                pending_requests_total=0,
+                has_pending_top_requests=False,
+                viewer=None,
+                inbox=DMInbox(
+                    threads=[],
+                    has_older=True,
+                    unseen_count=0,
+                    unseen_count_ts=0,
+                    blended_inbox_enabled=False,
+                    oldest_cursor=self.oldest_cursor,
+                ),
+            )
+        # Backoff (seconds) applied by iter_inbox when thread-list fetches hit
+        # Instagram's rate limit; defaults to 300 if unset.
+        backoff = self.config.get("bridge.backfill.backoff.thread_list", 300)
+        await self._sync_threads_with_delay(
+            self.client.iter_inbox(
+                self._update_seq_id_and_cursor,
+                start_at=start_at,
+                local_limit=local_limit,
+                rate_limit_exceeded_backoff=backoff,
+            ),
+            increment_total_backfilled_portals=True,
+            local_limit=local_limit,
+        )
+        await self.update_direct_chats()
+
+    def _update_seq_id_and_cursor(self, seq_id: int, cursor: str | None):
+        # Callback handed to client.iter_inbox: stores pagination state on the
+        # user after each inbox page so a later backfill run can resume.
+        self.seq_id = seq_id
+        # Only overwrite the cursor when the page actually provided one; a
+        # None/empty cursor would otherwise lose our place in the inbox.
+        if cursor:
+            self.oldest_cursor = cursor
+
+    async def _sync_threads_with_delay(
+        self,
+        threads: AsyncIterable[Thread],
+        increment_total_backfilled_portals: bool = False,
+        stop_when_threads_have_no_messages_to_backfill: bool = False,
+        local_limit: int | None = None,
+    ):
+        sync_delay = self.config["bridge.backfill.min_sync_thread_delay"]
+        last_thread_sync_ts = 0.0
+        found_thread_count = 0
+        async for thread in threads:
+            found_thread_count += 1
+            now = time.monotonic()
+            if now < last_thread_sync_ts + sync_delay:
+                delay = last_thread_sync_ts + sync_delay - now
+                self.log.debug("Thread sync is happening too quickly. Waiting for %ds", delay)
+                await asyncio.sleep(delay)
+
+            last_thread_sync_ts = time.monotonic()
+            had_new_messages = await self._sync_thread(thread)
+            if not had_new_messages and stop_when_threads_have_no_messages_to_backfill:
+                self.log.debug("Got to threads with no new messages. Stopping sync.")
+                return
+
+            if increment_total_backfilled_portals:
+                self.total_backfilled_portals = (self.total_backfilled_portals or 0) + 1
+            await self.update()
+        if local_limit is None or found_thread_count < local_limit:
+            if local_limit is None:
+                self.log.info(
+                    "Reached end of thread list with no limit, marking thread sync as completed"
+                )
+            else:
+                self.log.info(
+                    f"Reached end of thread list (got {found_thread_count} with "
+                    f"limit {local_limit}), marking thread sync as completed"
+                )
+            self.thread_sync_completed = True
+        await self.update()
+
+    async def run_with_sync_lock(self, func: Callable[[], Awaitable]):
+        # Run `func` while holding the user's sync lock, retrying transient
+        # failures up to five times with a 30-second pause between attempts.
+        # A logout error aborts immediately (credentials are gone, retrying is
+        # pointless); any other exception is logged and retried.
+        # NOTE(review): `with self._sync_lock:` requires a *synchronous* context
+        # manager — if _sync_lock is an asyncio.Lock this must be
+        # `async with self._sync_lock:` instead; confirm the lock type where it
+        # is declared.
+        with self._sync_lock:
+            retry_count = 0
+            while retry_count < 5:
+                try:
+                    retry_count += 1
+                    await func()
+
+                    # The sync was successful. Exit the loop.
+                    return
+                except IGNotLoggedInError as e:
+                    await self.send_bridge_notice(
+                        f"You have been logged out of Instagram: {e!s}",
+                        important=True,
+                        state_event=BridgeStateEvent.BAD_CREDENTIALS,
+                        error_code="ig-auth-error",
+                        error_message=str(e),
+                    )
+                    await self.logout(error=e)
+                    return
+                except Exception:
+                    self.log.exception(
+                        "Failed to sync threads. Waiting 30 seconds before retrying sync."
+                    )
+                    await asyncio.sleep(30)
+
+            # If we get here, it means that the sync has failed five times. If this happens, most
+            # likely something very bad has happened.
+            self.log.error("Failed to sync threads five times. Will not retry.")
+
     def start_listen(self, is_after_sync: bool = False) -> None:
         self.shutdown = False
         task = self._listen(
@@ -678,7 +967,17 @@ class User(DBUser, BaseUser):
         self._is_connected = False
         await self.update()
 
+    def stop_backfill_tasks(self) -> None:
+        # Cancel and clear both background backfill tasks, if running. Clearing
+        # the references lets later code treat "task is None" as "not running".
+        if self._backfill_loop_task:
+            self._backfill_loop_task.cancel()
+            self._backfill_loop_task = None
+        if self._thread_sync_task:
+            self._thread_sync_task.cancel()
+            self._thread_sync_task = None
+
     async def logout(self, error: IGNotLoggedInError | None = None) -> None:
+        await self.stop_listen()
+        self.stop_backfill_tasks()
         if self.client and error is None:
             try:
                 await self.client.logout(one_tap_app_login=False)
@@ -712,6 +1011,7 @@ class User(DBUser, BaseUser):
         self.state = None
         self.seq_id = None
         self.snapshot_at_ms = None
+        self.thread_sync_completed = False
         self._is_logged_in = False
         await self.update()
 
@@ -752,7 +1052,6 @@ class User(DBUser, BaseUser):
                 )
                 return
         self.log.trace(f"Received message sync event {evt.message}")
-        await portal.backfill_lock.wait(f"{evt.message.op} {evt.message.item_id}")
         if evt.message.new_reaction:
             await portal.handle_instagram_reaction(
                 evt.message, remove=evt.message.op == Operation.REMOVE

+ 1 - 1
requirements.txt

@@ -4,7 +4,7 @@ commonmark>=0.8,<0.10
 aiohttp>=3,<4
 yarl>=1,<2
 attrs>=20.1
-mautrix>=0.18.6,<0.19
+mautrix>=0.18.9,<0.19
 asyncpg>=0.20,<0.28
 pycryptodome>=3,<4
 paho-mqtt>=1.5,<2

برخی فایل ها در این مقایسه diff نمایش داده نمی شوند زیرا تعداد فایل ها بسیار زیاد است