Browse Source

db/user: add columns to track thread sync status

Signed-off-by: Sumner Evans <sumner@beeper.com>
Sumner Evans 2 years ago
parent
commit
75c1d012a0

+ 1 - 0
mautrix_instagram/db/upgrade/__init__.py

@@ -28,4 +28,5 @@ from . import (
     v08_sync_sequence_id,
     v09_backfill_queue,
     v10_portal_infinite_backfill,
+    v11_per_user_thread_sync_status,
 )

+ 10 - 7
mautrix_instagram/db/upgrade/v00_latest_revision.py

@@ -18,7 +18,7 @@ from mautrix.util.async_db import Connection, Scheme
 from . import upgrade_table
 
 
-@upgrade_table.register(description="Latest revision", upgrades_to=10)
+@upgrade_table.register(description="Latest revision", upgrades_to=11)
 async def upgrade_latest(conn: Connection, scheme: Scheme) -> None:
     await conn.execute(
         """CREATE TABLE portal (
@@ -41,12 +41,15 @@ async def upgrade_latest(conn: Connection, scheme: Scheme) -> None:
     )
     await conn.execute(
         """CREATE TABLE "user" (
-            mxid           TEXT PRIMARY KEY,
-            igpk           BIGINT,
-            state          jsonb,
-            seq_id         BIGINT,
-            snapshot_at_ms BIGINT,
-            notice_room    TEXT
+            mxid                        TEXT PRIMARY KEY,
+            igpk                        BIGINT,
+            state                       jsonb,
+            seq_id                      BIGINT,
+            snapshot_at_ms              BIGINT,
+            notice_room                 TEXT,
+            oldest_cursor               TEXT,
+            total_backfilled_portals    INTEGER,
+            thread_sync_completed       BOOLEAN NOT NULL DEFAULT false
         )"""
     )
     await conn.execute(

+ 29 - 0
mautrix_instagram/db/upgrade/v11_per_user_thread_sync_status.py

@@ -0,0 +1,29 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2022 Tulir Asokan, Sumner Evans
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.util.async_db import Connection
+
+from . import upgrade_table
+
+
+@upgrade_table.register(
+    description="Add columns to the user table to track thread sync status for backfill"
+)
+async def upgrade_v12(conn: Connection) -> None:
+    await conn.execute('ALTER TABLE "user" ADD COLUMN oldest_cursor TEXT')
+    await conn.execute('ALTER TABLE "user" ADD COLUMN total_backfilled_portals INTEGER')
+    await conn.execute(
+        'ALTER TABLE "user" ADD COLUMN thread_sync_completed BOOLEAN NOT NULL DEFAULT false'
+    )

+ 26 - 6
mautrix_instagram/db/user.py

@@ -37,6 +37,9 @@ class User:
     notice_room: RoomID | None
     seq_id: int | None
     snapshot_at_ms: int | None
+    oldest_cursor: str | None
+    total_backfilled_portals: int | None
+    thread_sync_completed: bool
 
     @property
     def _values(self):
@@ -47,18 +50,37 @@ class User:
             self.notice_room,
             self.seq_id,
             self.snapshot_at_ms,
+            self.oldest_cursor,
+            self.total_backfilled_portals,
+            self.thread_sync_completed,
         )
 
+    _columns = ",".join(
+        (
+            "mxid",
+            "igpk",
+            "state",
+            "notice_room",
+            "seq_id",
+            "snapshot_at_ms",
+            "oldest_cursor",
+            "total_backfilled_portals",
+            "thread_sync_completed",
+        )
+    )
+
     async def insert(self) -> None:
-        q = """
-        INSERT INTO "user" (mxid, igpk, state, notice_room, seq_id, snapshot_at_ms)
-        VALUES ($1, $2, $3, $4, $5, $6)
+        q = f"""
+        INSERT INTO "user" ({self._columns})
+        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
         """
         await self.db.execute(q, *self._values)
 
     async def update(self) -> None:
         q = """
-        UPDATE "user" SET igpk=$2, state=$3, notice_room=$4, seq_id=$5, snapshot_at_ms=$6
+        UPDATE "user"
+        SET igpk=$2, state=$3, notice_room=$4, seq_id=$5, snapshot_at_ms=$6,
+            oldest_cursor=$7, total_backfilled_portals=$8, thread_sync_completed=$9
         WHERE mxid=$1
         """
         await self.db.execute(q, *self._values)
@@ -73,8 +95,6 @@ class User:
         state_str = data.pop("state")
         return cls(state=AndroidState.parse_json(state_str) if state_str else None, **data)
 
-    _columns = "mxid, igpk, state, notice_room, seq_id, snapshot_at_ms"
-
     @classmethod
     async def get_by_mxid(cls, mxid: UserID) -> User | None:
         q = f'SELECT {cls._columns} FROM "user" WHERE mxid=$1'