Tulir Asokan 4 жил өмнө
parent
commit
1459158bfa

+ 98 - 0
mautrix_instagram/__main__.py

@@ -0,0 +1,98 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2020 Tulir Asokan
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from mautrix.types import UserID, RoomID
+from mautrix.bridge import Bridge
+from mautrix.bridge.state_store.asyncpg import PgBridgeStateStore
+from mautrix.util.async_db import Database
+
+from .config import Config
+from .db import upgrade_table, init as init_db
+from .user import User
+from .portal import Portal
+from .puppet import Puppet
+from .matrix import MatrixHandler
+from .version import version, linkified_version
+
+
+class InstagramBridge(Bridge):
+    name = "mautrix-instagram"
+    module = "mautrix_instagram"
+    command = "python -m mautrix-instagram"
+    description = "A Matrix-Instagram DM puppeting bridge."
+    repo_url = "https://github.com/tulir/mautrix-instagram"
+    real_user_content_key = "net.maunium.instagram.puppet"
+    version = version
+    markdown_version = linkified_version
+    config_class = Config
+    matrix_class = MatrixHandler
+
+    db: Database
+    config: Config
+    matrix: MatrixHandler
+    state_store: PgBridgeStateStore
+
+    def make_state_store(self) -> None:
+        self.state_store = PgBridgeStateStore(self.db, self.get_puppet, self.get_double_puppet)
+
+    def prepare_db(self) -> None:
+        self.db = Database(self.config["appservice.database"], upgrade_table=upgrade_table,
+                           loop=self.loop)
+        init_db(self.db)
+
+    def prepare_bridge(self) -> None:
+        super().prepare_bridge()
+
+    async def start(self) -> None:
+        await self.db.start()
+        await self.state_store.upgrade_table.upgrade(self.db.pool)
+        self.add_startup_actions(User.init_cls(self))
+        self.add_startup_actions(Puppet.init_cls(self))
+        Portal.init_cls(self)
+        if self.config["bridge.resend_bridge_info"]:
+            self.add_startup_actions(self.resend_bridge_info())
+        await super().start()
+
+    def prepare_stop(self) -> None:
+        self.add_shutdown_actions(user.stop() for user in User.by_igpk.values())
+        self.log.debug("Stopping puppet syncers")
+        for puppet in Puppet.by_custom_mxid.values():
+            puppet.stop()
+
+    async def resend_bridge_info(self) -> None:
+        self.config["bridge.resend_bridge_info"] = False
+        self.config.save()
+        self.log.info("Re-sending bridge info state event to all portals")
+        async for portal in Portal.all_with_room():
+            await portal.update_bridge_info()
+        self.log.info("Finished re-sending bridge info state events")
+
+    async def get_portal(self, room_id: RoomID) -> Portal:
+        return await Portal.get_by_mxid(room_id)
+
+    async def get_puppet(self, user_id: UserID, create: bool = False) -> Puppet:
+        return await Puppet.get_by_mxid(user_id, create=create)
+
+    async def get_double_puppet(self, user_id: UserID) -> Puppet:
+        return await Puppet.get_by_custom_mxid(user_id)
+
+    async def get_user(self, user_id: UserID, create: bool = True) -> User:
+        return await User.get_by_mxid(user_id, create=create)
+
+    def is_bridge_ghost(self, user_id: UserID) -> bool:
+        return bool(Puppet.get_id_from_mxid(user_id))
+
+
+InstagramBridge().run()

+ 1 - 1
mautrix_instagram/db/__init__.py

@@ -13,4 +13,4 @@ def init(db: Database) -> None:
         table.db = db
 
 
-__all__ = ["upgrade_table", "User", "Puppet", "Portal", "Message", "Reaction"]
+__all__ = ["upgrade_table", "User", "Puppet", "Portal", "Message", "Reaction", "init"]

+ 5 - 1
mautrix_instagram/db/portal.py

@@ -61,9 +61,13 @@ class Portal:
         return cls._from_row(row)
 
     @classmethod
-    async def get_by_thread_id(cls, thread_id: str, receiver: int = 0) -> Optional['Portal']:
+    async def get_by_thread_id(cls, thread_id: str, receiver: int,
+                               rec_must_match: bool = True) -> Optional['Portal']:
         q = ("SELECT thread_id, receiver, other_user_pk, mxid, name, encrypted "
              "FROM portal WHERE thread_id=$1 AND receiver=$2")
+        if not rec_must_match:
+            q = ("SELECT thread_id, receiver, other_user_pk, mxid, name, encrypted "
+                 "FROM portal WHERE thread_id=$1 AND (receiver=$2 OR receiver=0)")
         row = await cls.db.fetchrow(q, thread_id, receiver)
         if not row:
             return None

+ 14 - 10
mautrix_instagram/db/puppet.py

@@ -34,6 +34,8 @@ class Puppet:
     username: Optional[str]
     photo_id: Optional[str]
     photo_mxc: Optional[ContentURI]
+    name_set: bool
+    avatar_set: bool
 
     is_registered: bool
 
@@ -43,20 +45,22 @@ class Puppet:
     base_url: Optional[URL]
 
     async def insert(self) -> None:
-        q = ("INSERT INTO puppet (pk, name, username, photo_id, photo_mxc, is_registered,"
-             "                    custom_mxid, access_token, next_batch, base_url) "
-             "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)")
+        q = ("INSERT INTO puppet (pk, name, username, photo_id, photo_mxc, name_set, avatar_set,"
+             "                    is_registered, custom_mxid, access_token, next_batch, base_url) "
+             "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)")
         await self.db.execute(q, self.pk, self.name, self.username, self.photo_id, self.photo_mxc,
                               self.is_registered, self.custom_mxid, self.access_token,
                               self.next_batch, str(self.base_url) if self.base_url else None)
 
     async def update(self) -> None:
-        q = ("UPDATE puppet SET name=$2, username=$3, photo_id=$4, photo_mxc=$5, is_registered=$6,"
-             "                  custom_mxid=$7, access_token=$8, next_batch=$9, base_url=$10 "
+        q = ("UPDATE puppet SET name=$2, username=$3, photo_id=$4, photo_mxc=$5, name_set=$6,"
+             "                  avatar_set=$7, is_registered=$8, custom_mxid=$9, access_token=$10,"
+             "                  next_batch=$11, base_url=$12 "
              "WHERE pk=$1")
         await self.db.execute(q, self.pk, self.name, self.username, self.photo_id, self.photo_mxc,
-                              self.is_registered, self.custom_mxid, self.access_token,
-                              self.next_batch, str(self.base_url) if self.base_url else None)
+                              self.name_set, self.avatar_set, self.is_registered, self.custom_mxid,
+                              self.access_token, self.next_batch,
+                              str(self.base_url) if self.base_url else None)
 
     @classmethod
     def _from_row(cls, row: asyncpg.Record) -> 'Puppet':
@@ -67,7 +71,7 @@ class Puppet:
 
     @classmethod
     async def get_by_pk(cls, pk: int) -> Optional['Puppet']:
-        q = ("SELECT pk, name, username, photo_id, photo_mxc, is_registered,"
+        q = ("SELECT pk, name, username, photo_id, photo_mxc, name_set, avatar_set, is_registered,"
              "       custom_mxid, access_token, next_batch, base_url "
              "FROM puppet WHERE igpk=$1")
         row = await cls.db.fetchrow(q, pk)
@@ -77,7 +81,7 @@ class Puppet:
 
     @classmethod
     async def get_by_custom_mxid(cls, mxid: UserID) -> Optional['Puppet']:
-        q = ("SELECT pk, name, username, photo_id, photo_mxc, is_registered,"
+        q = ("SELECT pk, name, username, photo_id, photo_mxc, name_set, avatar_set, is_registered,"
              "       custom_mxid, access_token, next_batch, base_url "
              "FROM puppet WHERE custom_mxid=$1")
         row = await cls.db.fetchrow(q, mxid)
@@ -87,7 +91,7 @@ class Puppet:
 
     @classmethod
     async def all_with_custom_mxid(cls) -> List['Puppet']:
-        q = ("SELECT pk, name, username, photo_id, photo_mxc, is_registered,"
+        q = ("SELECT pk, name, username, photo_id, photo_mxc, name_set, avatar_set, is_registered,"
              "       custom_mxid, access_token, next_batch, base_url "
              "FROM puppet WHERE custom_mxid IS NOT NULL")
         rows = await cls.db.fetch(q)

+ 2 - 0
mautrix_instagram/db/upgrade.py

@@ -43,6 +43,8 @@ async def upgrade_v1(conn: Connection) -> None:
         username      TEXT,
         photo_id      TEXT,
         photo_mxc     TEXT,
+        name_set      BOOLEAN NOT NULL DEFAULT false,
+        avatar_set    BOOLEAN NOT NULL DEFAULT false,
         is_registered BOOLEAN NOT NULL DEFAULT false,
         custom_mxid   TEXT,
         access_token  TEXT,

+ 126 - 0
mautrix_instagram/matrix.py

@@ -0,0 +1,126 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2020 Tulir Asokan
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from typing import List, Union, TYPE_CHECKING
+
+from mautrix.bridge import BaseMatrixHandler
+from mautrix.types import (Event, ReactionEvent, MessageEvent, StateEvent, EncryptedEvent, RoomID,
+                           EventID, UserID, ReactionEventContent, RelationType, EventType,
+                           ReceiptEvent, TypingEvent, PresenceEvent, RedactionEvent,
+                           SingleReceiptEventContent)
+
+from .db import Message as DBMessage
+from . import puppet as pu, portal as po, user as u
+
+if TYPE_CHECKING:
+    from .__main__ import InstagramBridge
+
+
+class MatrixHandler(BaseMatrixHandler):
+    def __init__(self, bridge: 'InstagramBridge') -> None:
+        prefix, suffix = bridge.config["bridge.username_template"].format(userid=":").split(":")
+        homeserver = bridge.config["homeserver.domain"]
+        self.user_id_prefix = f"@{prefix}"
+        self.user_id_suffix = f"{suffix}:{homeserver}"
+
+        super().__init__(bridge=bridge)
+
+    def filter_matrix_event(self, evt: Event) -> bool:
+        if isinstance(evt, (ReceiptEvent, TypingEvent)):
+            return False
+        elif not isinstance(evt, (ReactionEvent, MessageEvent, StateEvent, EncryptedEvent,
+                                RedactionEvent)):
+            return True
+        return (evt.sender == self.az.bot_mxid
+                or pu.Puppet.get_id_from_mxid(evt.sender) is not None)
+
+    async def send_welcome_message(self, room_id: RoomID, inviter: 'u.User') -> None:
+        await super().send_welcome_message(room_id, inviter)
+        if not inviter.notice_room:
+            inviter.notice_room = room_id
+            await inviter.update()
+            await self.az.intent.send_notice(room_id, "This room has been marked as your "
+                                                      "Instagram bridge notice room.")
+
+    async def handle_leave(self, room_id: RoomID, user_id: UserID, event_id: EventID) -> None:
+        portal = await po.Portal.get_by_mxid(room_id)
+        if not portal:
+            return
+
+        user = await u.User.get_by_mxid(user_id, create=False)
+        if not user:
+            return
+
+        await portal.handle_matrix_leave(user)
+
+    @staticmethod
+    async def handle_redaction(room_id: RoomID, user_id: UserID, event_id: EventID,
+                               redaction_event_id: EventID) -> None:
+        user = await u.User.get_by_mxid(user_id)
+        if not user:
+            return
+
+        portal = await po.Portal.get_by_mxid(room_id)
+        if not portal:
+            return
+
+        await portal.handle_matrix_redaction(user, event_id, redaction_event_id)
+
+    @classmethod
+    async def handle_reaction(cls, room_id: RoomID, user_id: UserID, event_id: EventID,
+                              content: ReactionEventContent) -> None:
+        if content.relates_to.rel_type != RelationType.ANNOTATION:
+            cls.log.debug(f"Ignoring m.reaction event in {room_id} from {user_id} with unexpected "
+                          f"relation type {content.relates_to.rel_type}")
+            return
+        user = await u.User.get_by_mxid(user_id)
+        if not user:
+            return
+
+        portal = await po.Portal.get_by_mxid(room_id)
+        if not portal:
+            return
+
+        await portal.handle_matrix_reaction(user, event_id, content.relates_to.event_id,
+                                            content.relates_to.key)
+
+    async def handle_read_receipt(self, user: 'u.User', portal: 'po.Portal', event_id: EventID,
+                                  data: SingleReceiptEventContent) -> None:
+        message = await DBMessage.get_by_mxid(event_id, portal.mxid)
+        if not message:
+            return
+        # TODO implement
+        # user.log.debug(f"Marking messages in {portal.thread_id} read up to {message.item_id}")
+        # await user.client.conversation(portal.thread_id).mark_read(message.item_id)
+
+    @staticmethod
+    async def handle_typing(room_id: RoomID, typing: List[UserID]) -> None:
+        # TODO implement
+        pass
+
+    async def handle_event(self, evt: Event) -> None:
+        if evt.type == EventType.ROOM_REDACTION:
+            evt: RedactionEvent
+            await self.handle_redaction(evt.room_id, evt.sender, evt.redacts, evt.event_id)
+        elif evt.type == EventType.REACTION:
+            evt: ReactionEvent
+            await self.handle_reaction(evt.room_id, evt.sender, evt.event_id, evt.content)
+
+    async def handle_ephemeral_event(self, evt: Union[ReceiptEvent, PresenceEvent, TypingEvent]
+                                     ) -> None:
+        if evt.type == EventType.TYPING:
+            await self.handle_typing(evt.room_id, evt.content.user_ids)
+        else:
+            await super().handle_ephemeral_event(evt)

+ 591 - 0
mautrix_instagram/portal.py

@@ -0,0 +1,591 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2020 Tulir Asokan
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from typing import (Dict, Tuple, Optional, List, Deque, Set, Any, Union, AsyncGenerator,
+                    Awaitable, TYPE_CHECKING, cast)
+from collections import deque
+from uuid import uuid4
+import asyncio
+
+import magic
+
+from mauigpapi.types import Thread, ThreadUser, ThreadItem
+from mautrix.appservice import AppService, IntentAPI
+from mautrix.bridge import BasePortal, NotificationDisabler
+from mautrix.types import (EventID, MessageEventContent, RoomID, EventType, MessageType,
+                           TextMessageEventContent)
+from mautrix.errors import MatrixError
+from mautrix.util.simple_lock import SimpleLock
+
+from .db import Portal as DBPortal, Message as DBMessage, Reaction as DBReaction
+from .config import Config
+from . import user as u, puppet as p, matrix as m
+
+if TYPE_CHECKING:
+    from .__main__ import InstagramBridge
+
+try:
+    from mautrix.crypto.attachments import encrypt_attachment, decrypt_attachment
+except ImportError:
+    encrypt_attachment = decrypt_attachment = None
+
+StateBridge = EventType.find("m.bridge", EventType.Class.STATE)
+StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE)
+
+
+class Portal(DBPortal, BasePortal):
+    by_mxid: Dict[RoomID, 'Portal'] = {}
+    by_thread_id: Dict[Tuple[str, int], 'Portal'] = {}
+    config: Config
+    matrix: 'm.MatrixHandler'
+    az: AppService
+    private_chat_portal_meta: bool
+
+    _main_intent: Optional[IntentAPI]
+    _create_room_lock: asyncio.Lock
+    backfill_lock: SimpleLock
+    _msgid_dedup: Deque[str]
+    _reqid_dedup: Set[str]
+    _reaction_dedup: Deque[Tuple[str, int, str]]
+
+    _main_intent: IntentAPI
+    _last_participant_update: Set[int]
+    _reaction_lock: asyncio.Lock
+
+    def __init__(self, thread_id: str, receiver: int, other_user_pk: Optional[int],
+                 mxid: Optional[RoomID] = None, name: Optional[str] = None, encrypted: bool = False
+                 ) -> None:
+        super().__init__(thread_id, receiver, other_user_pk, mxid, name, encrypted)
+        self._create_room_lock = asyncio.Lock()
+        self.log = self.log.getChild(thread_id)
+        self._msgid_dedup = deque(maxlen=100)
+        self._reaction_dedup = deque(maxlen=100)
+        self._reqid_dedup = set()
+        self._last_participant_update = set()
+
+        self.backfill_lock = SimpleLock("Waiting for backfilling to finish before handling %s",
+                                        log=self.log)
+        self._main_intent = None
+        self._reaction_lock = asyncio.Lock()
+
+    @property
+    def is_direct(self) -> bool:
+        return self.other_user_pk is not None
+
+    @property
+    def main_intent(self) -> IntentAPI:
+        if not self._main_intent:
+            raise ValueError("Portal must be postinit()ed before main_intent can be used")
+        return self._main_intent
+
+    @classmethod
+    def init_cls(cls, bridge: 'InstagramBridge') -> None:
+        cls.config = bridge.config
+        cls.matrix = bridge.matrix
+        cls.az = bridge.az
+        cls.loop = bridge.loop
+        cls.bridge = bridge
+        cls.private_chat_portal_meta = cls.config["bridge.private_chat_portal_meta"]
+        NotificationDisabler.puppet_cls = p.Puppet
+        NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"]
+
+    # region Misc
+
+    async def _send_delivery_receipt(self, event_id: EventID) -> None:
+        if event_id and self.config["bridge.delivery_receipts"]:
+            try:
+                await self.az.intent.mark_read(self.mxid, event_id)
+            except Exception:
+                self.log.exception("Failed to send delivery receipt for %s", event_id)
+
+    async def _upsert_reaction(self, existing: DBReaction, intent: IntentAPI, mxid: EventID,
+                               message: DBMessage, sender: Union['u.User', 'p.Puppet'],
+                               reaction: str) -> None:
+        if existing:
+            self.log.debug(f"_upsert_reaction redacting {existing.mxid} and inserting {mxid}"
+                           f" (message: {message.mxid})")
+            await intent.redact(existing.mx_room, existing.mxid)
+            await existing.edit(reaction=reaction, mxid=mxid, mx_room=message.mx_room)
+        else:
+            self.log.debug(f"_upsert_reaction inserting {mxid} (message: {message.mxid})")
+            await DBReaction(mxid=mxid, mx_room=message.mx_room, ig_item_id=message.item_id,
+                             ig_receiver=self.receiver, ig_sender=sender.pk, reaction=reaction
+                             ).insert()
+
+    # endregion
+    # region Matrix event handling
+
+    async def handle_matrix_message(self, sender: 'u.User', message: MessageEventContent,
+                                    event_id: EventID) -> None:
+        if not sender.client:
+            self.log.debug(f"Ignoring message {event_id} as user is not connected")
+            return
+        elif ((message.get(self.bridge.real_user_content_key,
+                           False) and await p.Puppet.get_by_custom_mxid(sender.mxid))):
+            self.log.debug(f"Ignoring puppet-sent message by confirmed puppet user {sender.mxid}")
+            return
+        request_id = str(uuid4())
+        self._reqid_dedup.add(request_id)
+        if message.msgtype in (MessageType.EMOTE, MessageType.TEXT):
+            text = message.body
+            if message.msgtype == MessageType.EMOTE:
+                text = f"/me {text}"
+            resp = await sender.mqtt.send_text(self.thread_id, text=text,
+                                               client_context=request_id)
+        elif message.msgtype.is_media:
+            if message.file and decrypt_attachment:
+                data = await self.main_intent.download_media(message.file.url)
+                data = decrypt_attachment(data, message.file.key.key,
+                                          message.file.hashes.get("sha256"), message.file.iv)
+            else:
+                data = await self.main_intent.download_media(message.url)
+            mime_type = message.info.mimetype or magic.from_buffer(data, mime=True)
+            if mime_type == "image/jpeg":
+                upload_resp = await sender.client.upload_jpeg_photo(data)
+                resp = await sender.mqtt.send_media(self.thread_id, upload_resp.upload_id,
+                                                    client_context=request_id)
+            else:
+                # TODO add link to media for unsupported file types
+                return
+        else:
+            return
+        if resp.status != "ok":
+            self.log.warning(f"Failed to handle {event_id}: {resp}")
+        else:
+            self._msgid_dedup.appendleft(resp.payload.item_id)
+            await DBMessage(mxid=event_id, mx_room=self.mxid, item_id=resp.payload.item_id,
+                            receiver=self.receiver).insert()
+            self._reqid_dedup.remove(request_id)
+            await self._send_delivery_receipt(event_id)
+            self.log.debug(f"Handled Matrix message {event_id} -> {resp.payload.item_id}")
+
+    async def handle_matrix_reaction(self, sender: 'u.User', event_id: EventID,
+                                     reacting_to: EventID, emoji: str) -> None:
+        message = await DBMessage.get_by_mxid(reacting_to, self.mxid)
+        if not message:
+            self.log.debug(f"Ignoring reaction to unknown event {reacting_to}")
+            return
+
+        existing = await DBReaction.get_by_item_id(message.item_id, message.receiver, sender.igpk)
+        if existing and existing.reaction == emoji:
+            return
+
+        dedup_id = (message.item_id, sender.igpk, emoji)
+        self._reaction_dedup.appendleft(dedup_id)
+        async with self._reaction_lock:
+            # TODO check response?
+            await sender.mqtt.send_reaction(self.thread_id, item_id=message.item_id, emoji=emoji)
+            await self._upsert_reaction(existing, self.main_intent, event_id, message, sender,
+                                        emoji)
+            self.log.trace(f"{sender.mxid} reacted to {message.item_id} with {emoji}")
+        await self._send_delivery_receipt(event_id)
+
+    async def handle_matrix_redaction(self, sender: 'u.User', event_id: EventID,
+                                      redaction_event_id: EventID) -> None:
+        if not self.mxid:
+            return
+
+        # TODO implement
+        # reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
+        # if reaction:
+        #     try:
+        #         await reaction.delete()
+        #         await sender.client.conversation(self.twid).delete_reaction(reaction.tw_msgid,
+        #                                                                     reaction.reaction)
+        #         await self._send_delivery_receipt(redaction_event_id)
+        #         self.log.trace(f"Removed {reaction} after Matrix redaction")
+        #     except Exception:
+        #         self.log.exception("Removing reaction failed")
+
+    async def handle_matrix_leave(self, user: 'u.User') -> None:
+        if self.is_direct:
+            self.log.info(f"{user.mxid} left private chat portal with {self.other_user_pk}")
+            if user.igpk == self.receiver:
+                self.log.info(f"{user.mxid} was the recipient of this portal. "
+                              "Cleaning up and deleting...")
+                await self.cleanup_and_delete()
+        else:
+            self.log.debug(f"{user.mxid} left portal to {self.thread_id}")
+            # TODO cleanup if empty
+
+    # endregion
+    # region Instagram event handling
+
+    async def handle_instagram_item(self, source: 'u.User', sender: 'p.Puppet', item: ThreadItem
+                                    ) -> None:
+        if item.client_context in self._reqid_dedup:
+            self.log.debug(f"Ignoring message {item.item_id} by {item.user_id}"
+                           " as it was sent by us (client_context in dedup queue)")
+        elif item.item_id in self._msgid_dedup:
+            self.log.debug(f"Ignoring message {item.item_id} by {item.user_id}"
+                           " as it was already handled (message.id in dedup queue)")
+        elif await DBMessage.get_by_item_id(item.item_id, self.receiver) is not None:
+            self.log.debug(f"Ignoring message {item.item_id} by {item.user_id}"
+                           " as it was already handled (message.id found in database)")
+        else:
+            self._msgid_dedup.appendleft(item.item_id)
+            intent = sender.intent_for(self)
+            event_id = None
+            if item.text:
+                content = TextMessageEventContent(msgtype=MessageType.TEXT, body=item.text)
+                # TODO timestamp is probably not milliseconds
+                event_id = await self._send_message(intent, content, timestamp=item.timestamp)
+            # TODO handle attachments and reactions
+            if event_id:
+                await DBMessage(mxid=event_id, mx_room=self.mxid, item_id=item.item_id,
+                                receiver=self.receiver).insert()
+                await self._send_delivery_receipt(event_id)
+                self.log.debug(f"Handled Instagram message {item.item_id} -> {event_id}")
+
+    # endregion
+    # region Updating portal info
+
+    async def update_info(self, thread: Thread) -> None:
+        changed = await self._update_name(thread.thread_title)
+        if changed:
+            await self.update_bridge_info()
+            await self.update()
+        await self._update_participants(thread.users)
+        # TODO update power levels with thread.admin_user_ids
+
+    async def _update_name(self, name: str) -> bool:
+        if self.name != name:
+            self.name = name
+            if self.mxid:
+                await self.main_intent.set_room_name(self.mxid, name)
+            return True
+        return False
+
+    async def _update_participants(self, users: List[ThreadUser]) -> None:
+        if not self.mxid:
+            return
+
+        # Make sure puppets who should be here are here
+        for user in users:
+            puppet = await p.Puppet.get_by_pk(user.pk)
+            await puppet.intent_for(self).ensure_joined(self.mxid)
+
+        # Kick puppets who shouldn't be here
+        current_members = {int(user.pk) for user in users}
+        for user_id in await self.main_intent.get_room_members(self.mxid):
+            pk = p.Puppet.get_id_from_mxid(user_id)
+            if pk and pk not in current_members:
+                await self.main_intent.kick_user(self.mxid, p.Puppet.get_mxid_from_id(pk),
+                                                 reason="User had left this Instagram DM")
+
+    # endregion
+    # region Backfilling
+
+    async def backfill(self, source: 'u.User', is_initial: bool = False) -> None:
+        if not is_initial:
+            raise RuntimeError("Non-initial backfilling is not supported")
+        limit = (self.config["bridge.backfill.initial_limit"] if is_initial
+                 else self.config["bridge.backfill.missed_limit"])
+        if limit == 0:
+            return
+        elif limit < 0:
+            limit = None
+        with self.backfill_lock:
+            await self._backfill(source, is_initial, limit)
+
+    async def _backfill(self, source: 'u.User', is_initial: bool, limit: int) -> None:
+        self.log.debug("Backfilling history through %s", source.mxid)
+
+        entries = await self._fetch_backfill_items(source, is_initial, limit)
+        if not entries:
+            self.log.debug("Didn't get any items to backfill from server")
+            return
+
+        self.log.debug("Got %d entries from server", len(entries))
+
+        backfill_leave = await self._invite_own_puppet_backfill(source)
+        async with NotificationDisabler(self.mxid, source):
+            for entry in reversed(entries):
+                sender = await p.Puppet.get_by_pk(int(entry.user_id))
+                await self.handle_instagram_item(source, sender, entry)
+        for intent in backfill_leave:
+            self.log.trace("Leaving room with %s post-backfill", intent.mxid)
+            await intent.leave_room(self.mxid)
+        self.log.info("Backfilled %d messages through %s", len(entries), source.mxid)
+
+    async def _fetch_backfill_items(self, source: 'u.User', is_initial: bool, limit: int
+                                    ) -> List[ThreadItem]:
+        items = []
+        self.log.debug("Fetching up to %d messages through %s", limit, source.igpk)
+        async for item in source.client.iter_thread(self.thread_id):
+            if len(items) >= limit:
+                self.log.debug(f"Fetched {len(items)} messages (the limit)")
+                break
+            elif not is_initial:
+                msg = await DBMessage.get_by_item_id(item.item_id, receiver=self.receiver)
+                if msg is not None:
+                    self.log.debug(f"Fetched {len(items)} messages and hit a message"
+                                   " that's already in the database.")
+                    break
+            items.append(item)
+        return items
+
+    async def _invite_own_puppet_backfill(self, source: 'u.User') -> Set[IntentAPI]:
+        backfill_leave = set()
+        # TODO we should probably only invite the puppet when needed
+        if self.config["bridge.backfill.invite_own_puppet"]:
+            self.log.debug("Adding %s's default puppet to room for backfilling", source.mxid)
+            sender = await p.Puppet.get_by_pk(source.igpk)
+            await self.main_intent.invite_user(self.mxid, sender.default_mxid)
+            await sender.default_mxid_intent.join_room_by_id(self.mxid)
+            backfill_leave.add(sender.default_mxid_intent)
+        return backfill_leave
+
+    # endregion
+    # region Bridge info state event
+
+    @property
+    def bridge_info_state_key(self) -> str:
+        return f"net.maunium.instagram://instagram/{self.thread_id}"
+
+    @property
+    def bridge_info(self) -> Dict[str, Any]:
+        return {
+            "bridgebot": self.az.bot_mxid,
+            "creator": self.main_intent.mxid,
+            "protocol": {
+                "id": "instagram",
+                "displayname": "Instagram DM",
+                "avatar_url": self.config["appservice.bot_avatar"],
+            },
+            "channel": {
+                "id": self.thread_id,
+                "displayname": self.name,
+            }
+        }
+
+    async def update_bridge_info(self) -> None:
+        if not self.mxid:
+            self.log.debug("Not updating bridge info: no Matrix room created")
+            return
+        try:
+            self.log.debug("Updating bridge info...")
+            await self.main_intent.send_state_event(self.mxid, StateBridge,
+                                                    self.bridge_info, self.bridge_info_state_key)
+            # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
+            await self.main_intent.send_state_event(self.mxid, StateHalfShotBridge,
+                                                    self.bridge_info, self.bridge_info_state_key)
+        except Exception:
+            self.log.warning("Failed to update bridge info", exc_info=True)
+
+    # endregion
+    # region Creating Matrix rooms
+
+    async def create_matrix_room(self, source: 'u.User', info: Thread) -> Optional[RoomID]:
+        if self.mxid:
+            try:
+                await self._update_matrix_room(source, info)
+            except Exception:
+                self.log.exception("Failed to update portal")
+            return self.mxid
+        async with self._create_room_lock:
+            return await self._create_matrix_room(source, info)
+
+    async def _update_matrix_room(self, source: 'u.User', info: Thread) -> None:
+        await self.main_intent.invite_user(self.mxid, source.mxid, check_cache=True)
+        puppet = await p.Puppet.get_by_custom_mxid(source.mxid)
+        if puppet:
+            did_join = await puppet.intent.ensure_joined(self.mxid)
+            if did_join and self.is_direct:
+                await source.update_direct_chats({self.main_intent.mxid: [self.mxid]})
+
+        await self.update_info(info)
+
+        # TODO
+        # up = DBUserPortal.get(source.fbid, self.fbid, self.fb_receiver)
+        # if not up:
+        #     in_community = await source._community_helper.add_room(source._community_id, self.mxid)
+        #     DBUserPortal(user=source.fbid, portal=self.fbid, portal_receiver=self.fb_receiver,
+        #                  in_community=in_community).insert()
+        # elif not up.in_community:
+        #     in_community = await source._community_helper.add_room(source._community_id, self.mxid)
+        #     up.edit(in_community=in_community)
+
+    async def _create_matrix_room(self, source: 'u.User', info: Thread) -> Optional[RoomID]:
+        if self.mxid:
+            await self._update_matrix_room(source, info)
+            return self.mxid
+        await self.update_info(info)
+        self.log.debug("Creating Matrix room")
+        name: Optional[str] = None
+        initial_state = [{
+            "type": str(StateBridge),
+            "state_key": self.bridge_info_state_key,
+            "content": self.bridge_info,
+        }, {
+            # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
+            "type": str(StateHalfShotBridge),
+            "state_key": self.bridge_info_state_key,
+            "content": self.bridge_info,
+        }]
+        invites = [source.mxid]
+        if self.config["bridge.encryption.default"] and self.matrix.e2ee:
+            self.encrypted = True
+            initial_state.append({
+                "type": "m.room.encryption",
+                "content": {"algorithm": "m.megolm.v1.aes-sha2"},
+            })
+            if self.is_direct:
+                invites.append(self.az.bot_mxid)
+        if self.encrypted or self.private_chat_portal_meta or not self.is_direct:
+            name = self.name
+        if self.config["appservice.community_id"]:
+            initial_state.append({
+                "type": "m.room.related_groups",
+                "content": {"groups": [self.config["appservice.community_id"]]},
+            })
+
+        # We lock backfill lock here so any messages that come between the room being created
+        # and the initial backfill finishing wouldn't be bridged before the backfill messages.
+        with self.backfill_lock:
+            self.mxid = await self.main_intent.create_room(name=name, is_direct=self.is_direct,
+                                                           initial_state=initial_state,
+                                                           invitees=invites)
+            if not self.mxid:
+                raise Exception("Failed to create room: no mxid returned")
+
+            if self.encrypted and self.matrix.e2ee and self.is_direct:
+                try:
+                    await self.az.intent.ensure_joined(self.mxid)
+                except Exception:
+                    self.log.warning("Failed to add bridge bot "
+                                     f"to new private chat {self.mxid}")
+
+            await self.update()
+            self.log.debug(f"Matrix room created: {self.mxid}")
+            self.by_mxid[self.mxid] = self
+            if not self.is_direct:
+                await self._update_participants(info.users)
+
+            puppet = await p.Puppet.get_by_custom_mxid(source.mxid)
+            if puppet:
+                try:
+                    await puppet.intent.join_room_by_id(self.mxid)
+                    if self.is_direct:
+                        await source.update_direct_chats({self.main_intent.mxid: [self.mxid]})
+                except MatrixError:
+                    self.log.debug("Failed to join custom puppet into newly created portal",
+                                   exc_info=True)
+
+            # TODO
+            # in_community = await source._community_helper.add_room(source._community_id, self.mxid)
+            # DBUserPortal(user=source.fbid, portal=self.fbid, portal_receiver=self.fb_receiver,
+            #              in_community=in_community).upsert()
+
+            try:
+                await self.backfill(source, is_initial=True)
+            except Exception:
+                self.log.exception("Failed to backfill new portal")
+
+        return self.mxid
+
+    # endregion
+    # region Database getters
+
+    async def postinit(self) -> None:
+        self.by_thread_id[(self.thread_id, self.receiver)] = self
+        if self.mxid:
+            self.by_mxid[self.mxid] = self
+        self._main_intent = ((await p.Puppet.get_by_pk(self.other_user_pk)).default_mxid_intent
+                             if self.other_user_pk else self.az.intent)
+
+    async def delete(self) -> None:
+        await DBMessage.delete_all(self.mxid)
+        self.by_mxid.pop(self.mxid, None)
+        self.mxid = None
+        self.encrypted = False
+        await self.update()
+
+    async def save(self) -> None:
+        await self.update()
+
+    @classmethod
+    def all_with_room(cls) -> AsyncGenerator['Portal', None]:
+        return cls._db_to_portals(super().all_with_room())
+
+    @classmethod
+    def find_private_chats_with(cls, other_user: int) -> AsyncGenerator['Portal', None]:
+        return cls._db_to_portals(super().find_private_chats_with(other_user))
+
+    @classmethod
+    async def _db_to_portals(cls, query: Awaitable[List['Portal']]
+                             ) -> AsyncGenerator['Portal', None]:
+        portals = await query
+        for index, portal in enumerate(portals):
+            try:
+                yield cls.by_thread_id[(portal.thread_id, portal.receiver)]
+            except KeyError:
+                await portal.postinit()
+                yield portal
+
+    @classmethod
+    async def get_by_mxid(cls, mxid: RoomID) -> Optional['Portal']:
+        try:
+            return cls.by_mxid[mxid]
+        except KeyError:
+            pass
+
+        portal = cast(cls, await super().get_by_mxid(mxid))
+        if portal is not None:
+            await portal.postinit()
+            return portal
+
+        return None
+
+    @classmethod
+    async def get_by_thread_id(cls, thread_id: str, receiver: int,
+                               is_group: Optional[bool] = None,
+                               other_user_pk: Optional[int] = None) -> Optional['Portal']:
+        if is_group and receiver != 0:
+            receiver = 0
+        try:
+            return cls.by_thread_id[(thread_id, receiver)]
+        except KeyError:
+            pass
+        if is_group is None and receiver != 0:
+            try:
+                return cls.by_thread_id[(thread_id, 0)]
+            except KeyError:
+                pass
+
+        portal = cast(cls, await super().get_by_thread_id(thread_id, receiver,
+                                                          rec_must_match=is_group is not None))
+        if portal is not None:
+            await portal.postinit()
+            return portal
+
+        if is_group is not None:
+            portal = cls(thread_id, receiver, other_user_pk=other_user_pk)
+            await portal.insert()
+            await portal.postinit()
+            return portal
+
+        return None
+
+    @classmethod
+    async def get_by_thread(cls, thread: Thread, receiver: int) -> Optional['Portal']:
+        if thread.is_group:
+            receiver = 0
+            other_user_pk = None
+        else:
+            other_user_pk = thread.users[0].pk
+        return await cls.get_by_thread_id(thread.thread_id, receiver, is_group=thread.is_group,
+                                          other_user_pk=other_user_pk)
+    # endregion

+ 205 - 0
mautrix_instagram/puppet.py

@@ -0,0 +1,205 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2020 Tulir Asokan
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from typing import Optional, Dict, AsyncIterable, Awaitable, AsyncGenerator, TYPE_CHECKING, cast
+
+from aiohttp import ClientSession
+from yarl import URL
+
+from mauigpapi.types import BaseResponseUser
+from mautrix.bridge import BasePuppet
+from mautrix.appservice import IntentAPI
+from mautrix.types import ContentURI, UserID, SyncToken, RoomID
+from mautrix.util.simple_template import SimpleTemplate
+
+from .db import Puppet as DBPuppet
+from .config import Config
+from . import portal as p
+
+if TYPE_CHECKING:
+    from .__main__ import InstagramBridge
+
+
+class Puppet(DBPuppet, BasePuppet):
+    by_pk: Dict[int, 'Puppet'] = {}
+    by_custom_mxid: Dict[UserID, 'Puppet'] = {}
+    hs_domain: str
+    mxid_template: SimpleTemplate[int]
+
+    config: Config
+
+    default_mxid_intent: IntentAPI
+    default_mxid: UserID
+
+    def __init__(self, pk: int, name: Optional[str] = None, username: Optional[str] = None,
+                 photo_id: Optional[str] = None, photo_mxc: Optional[ContentURI] = None,
+                 is_registered: bool = False, custom_mxid: Optional[UserID] = None,
+                 access_token: Optional[str] = None, next_batch: Optional[SyncToken] = None,
+                 base_url: Optional[URL] = None) -> None:
+        super().__init__(pk=pk, name=name, username=username, photo_id=photo_id, photo_mxc=photo_mxc,
+                         is_registered=is_registered, custom_mxid=custom_mxid,
+                         access_token=access_token, next_batch=next_batch, base_url=base_url)
+        self.log = self.log.getChild(str(pk))
+
+        self.default_mxid = self.get_mxid_from_id(pk)
+        self.default_mxid_intent = self.az.intent.user(self.default_mxid)
+        self.intent = self._fresh_intent()
+
+    @classmethod
+    def init_cls(cls, bridge: 'InstagramBridge') -> AsyncIterable[Awaitable[None]]:
+        cls.config = bridge.config
+        cls.loop = bridge.loop
+        cls.mx = bridge.matrix
+        cls.az = bridge.az
+        cls.hs_domain = cls.config["homeserver.domain"]
+        cls.mxid_template = SimpleTemplate(cls.config["bridge.username_template"], "userid",
+                                           prefix="@", suffix=f":{cls.hs_domain}", type=int)
+        cls.sync_with_custom_puppets = cls.config["bridge.sync_with_custom_puppets"]
+        cls.homeserver_url_map = {server: URL(url) for server, url
+                                  in cls.config["bridge.double_puppet_server_map"].items()}
+        cls.allow_discover_url = cls.config["bridge.double_puppet_allow_discovery"]
+        cls.login_shared_secret_map = {server: secret.encode("utf-8") for server, secret
+                                       in cls.config["bridge.login_shared_secret_map"].items()}
+        cls.login_device_name = "Instagram Bridge"
+        return (puppet.try_start() async for puppet in cls.all_with_custom_mxid())
+
+    def intent_for(self, portal: 'p.Portal') -> IntentAPI:
+        if portal.other_user_pk == self.pk or (self.config["bridge.backfill.invite_own_puppet"]
+                                               and portal.backfill_lock.locked):
+            return self.default_mxid_intent
+        return self.intent
+
+    async def update_info(self, info: BaseResponseUser) -> None:
+        update = False
+        update = await self._update_name(info) or update
+        update = await self._update_avatar(info) or update
+        if update:
+            await self.update()
+
+    @classmethod
+    def _get_displayname(cls, info: BaseResponseUser) -> str:
+        return cls.config["bridge.displayname_template"].format(displayname=info.full_name,
+                                                                id=info.pk, username=info.username)
+
+    async def _update_name(self, info: BaseResponseUser) -> bool:
+        name = self._get_displayname(info)
+        if name != self.name:
+            self.name = name
+            try:
+                await self.default_mxid_intent.set_displayname(self.name)
+                self.name_set = True
+            except Exception:
+                self.log.exception("Failed to update displayname")
+                self.name_set = False
+            return True
+        return False
+
+    async def _update_avatar(self, info: BaseResponseUser) -> bool:
+        if info.profile_pic_id != self.photo_id or not self.avatar_set:
+            self.photo_id = info.profile_pic_id
+            if info.profile_pic_id:
+                async with ClientSession() as sess, sess.get(info.profile_pic_url) as resp:
+                    content_type = resp.headers["Content-Type"]
+                    resp_data = await resp.read()
+                mxc = await self.default_mxid_intent.upload_media(data=resp_data,
+                                                                  mime_type=content_type,
+                                                                  filename=info.profile_pic_id)
+            else:
+                mxc = None
+            try:
+                await self.default_mxid_intent.set_avatar_url(mxc)
+                self.avatar_set = True
+                self.photo_mxc = mxc
+            except Exception:
+                self.log.exception("Failed to update avatar")
+                self.avatar_set = False
+            return True
+        return False
+
+    async def default_puppet_should_leave_room(self, room_id: RoomID) -> bool:
+        portal = await p.Portal.get_by_mxid(room_id)
+        return portal and portal.other_user_pk != self.pk
+
+    # region Database getters
+
+    def _add_to_cache(self) -> None:
+        self.by_pk[self.pk] = self
+        if self.custom_mxid:
+            self.by_custom_mxid[self.custom_mxid] = self
+
+    async def save(self) -> None:
+        await self.update()
+
+    @classmethod
+    async def get_by_mxid(cls, mxid: UserID, create: bool = True) -> Optional['Puppet']:
+        pk = cls.get_id_from_mxid(mxid)
+        if pk:
+            return await cls.get_by_pk(pk, create)
+        return None
+
+    @classmethod
+    async def get_by_custom_mxid(cls, mxid: UserID) -> Optional['Puppet']:
+        try:
+            return cls.by_custom_mxid[mxid]
+        except KeyError:
+            pass
+
+        puppet = cast(cls, await super().get_by_custom_mxid(mxid))
+        if puppet:
+            puppet._add_to_cache()
+            return puppet
+
+        return None
+
+    @classmethod
+    def get_id_from_mxid(cls, mxid: UserID) -> Optional[int]:
+        return cls.mxid_template.parse(mxid)
+
+    @classmethod
+    def get_mxid_from_id(cls, twid: int) -> UserID:
+        return UserID(cls.mxid_template.format_full(twid))
+
+    @classmethod
+    async def get_by_pk(cls, pk: int, create: bool = True) -> Optional['Puppet']:
+        try:
+            return cls.by_pk[pk]
+        except KeyError:
+            pass
+
+        puppet = cast(cls, await super().get_by_pk(pk))
+        if puppet is not None:
+            puppet._add_to_cache()
+            return puppet
+
+        if create:
+            puppet = cls(pk)
+            await puppet.insert()
+            puppet._add_to_cache()
+            return puppet
+
+        return None
+
+    @classmethod
+    async def all_with_custom_mxid(cls) -> AsyncGenerator['Puppet', None]:
+        puppets = await super().all_with_custom_mxid()
+        puppet: cls
+        for index, puppet in enumerate(puppets):
+            try:
+                yield cls.by_pk[puppet.pk]
+            except KeyError:
+                puppet._add_to_cache()
+                yield puppet
+
+    # endregion

+ 310 - 0
mautrix_instagram/user.py

@@ -0,0 +1,310 @@
+# mautrix-instagram - A Matrix-Instagram puppeting bridge.
+# Copyright (C) 2020 Tulir Asokan
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+from typing import (Dict, Optional, AsyncIterable, Awaitable, AsyncGenerator, List, TYPE_CHECKING,
+                    cast)
+from collections import defaultdict
+import asyncio
+import logging
+
+from mauigpapi.mqtt import (AndroidMQTT, Connect, Disconnect, GraphQLSubscription,
+                            SkywalkerSubscription)
+from mauigpapi.http import AndroidAPI
+from mauigpapi.state import AndroidState
+from mauigpapi.types import CurrentUser, MessageSyncEvent
+from mauigpapi.errors import IGNotLoggedInError
+from mautrix.bridge import BaseUser
+from mautrix.types import UserID, RoomID, EventID, TextMessageEventContent, MessageType
+from mautrix.appservice import AppService
+from mautrix.util.opt_prometheus import Summary, Gauge, async_time
+
+from .db import User as DBUser, Portal as DBPortal
+from .config import Config
+from . import puppet as pu, portal as po
+
+if TYPE_CHECKING:
+    from .__main__ import InstagramBridge
+
+METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
+METRIC_RECEIPT = Summary("bridge_on_receipt", "calls to handle_receipt")
+METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
+METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")
+
+
+class User(DBUser, BaseUser):
+    by_mxid: Dict[UserID, 'User'] = {}
+    by_igpk: Dict[int, 'User'] = {}
+    config: Config
+    az: AppService
+    loop: asyncio.AbstractEventLoop
+
+    client: Optional[AndroidAPI]
+    mqtt: Optional[AndroidMQTT]
+    _listen_task: Optional[asyncio.Task] = None
+
+    permission_level: str
+    username: Optional[str]
+
+    _notice_room_lock: asyncio.Lock
+    _notice_send_lock: asyncio.Lock
+    _is_logged_in: bool
+
+    def __init__(self, mxid: UserID, igpk: Optional[int] = None,
+                 state: Optional[AndroidState] = None, notice_room: Optional[RoomID] = None
+                 ) -> None:
+        super().__init__(mxid=mxid, igpk=igpk, state=state, notice_room=notice_room)
+        self._notice_room_lock = asyncio.Lock()
+        self._notice_send_lock = asyncio.Lock()
+        perms = self.config.get_permissions(mxid)
+        self.is_whitelisted, self.is_admin, self.permission_level = perms
+        self.log = self.log.getChild(self.mxid)
+        self.client = None
+        self.username = None
+        self.dm_update_lock = asyncio.Lock()
+        self._metric_value = defaultdict(lambda: False)
+        self._is_logged_in = False
+        self._listen_task = None
+
+    @classmethod
+    def init_cls(cls, bridge: 'InstagramBridge') -> AsyncIterable[Awaitable[None]]:
+        cls.bridge = bridge
+        cls.config = bridge.config
+        cls.az = bridge.az
+        cls.loop = bridge.loop
+        return (user.try_connect() async for user in cls.all_logged_in())
+
+    # region Connection management
+
+    async def is_logged_in(self) -> bool:
+        return bool(self.client) and self._is_logged_in
+
+    async def try_connect(self) -> None:
+        try:
+            await self.connect()
+        except Exception:
+            self.log.exception("Error while connecting to Instagram")
+
+    async def connect(self) -> None:
+        client = AndroidAPI(self.state)
+
+        try:
+            resp = await client.current_user()
+        except IGNotLoggedInError as e:
+            self.log.warning(f"Failed to connect to Instagram: {e}")
+            # TODO show reason?
+            await self.send_bridge_notice("You have been logged out of Instagram")
+            return
+        self.client = client
+        self.igpk = resp.user.pk
+        self.username = resp.user.username
+        self._track_metric(METRIC_LOGGED_IN, True)
+        self.by_igpk[self.igpk] = self
+
+        self.mqtt = AndroidMQTT(self.state, loop=self.loop,
+                                log=logging.getLogger("mau.instagram.mqtt").getChild(self.mxid))
+        self.mqtt.add_event_handler(Connect, self.on_connect)
+        self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
+        self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
+
+        await self.update()
+
+        self.loop.create_task(self._try_sync_puppet(resp.user))
+        self.loop.create_task(self._try_sync())
+
+    async def on_connect(self, evt: Connect) -> None:
+        self._track_metric(METRIC_CONNECTED, True)
+
+    async def on_disconnect(self, evt: Disconnect) -> None:
+        self._track_metric(METRIC_CONNECTED, False)
+
+    # TODO this stuff could probably be moved to mautrix-python
+    async def get_notice_room(self) -> RoomID:
+        if not self.notice_room:
+            async with self._notice_room_lock:
+                # If someone already created the room while this call was waiting,
+                # don't make a new room
+                if self.notice_room:
+                    return self.notice_room
+                self.notice_room = await self.az.intent.create_room(
+                    is_direct=True, invitees=[self.mxid],
+                    topic="Instagram bridge notices")
+                await self.update()
+        return self.notice_room
+
+    async def send_bridge_notice(self, text: str, edit: Optional[EventID] = None,
+                                 important: bool = False) -> Optional[EventID]:
+        event_id = None
+        try:
+            self.log.debug("Sending bridge notice: %s", text)
+            content = TextMessageEventContent(body=text, msgtype=(MessageType.TEXT if important
+                                                                  else MessageType.NOTICE))
+            if edit:
+                content.set_edit(edit)
+            # This is locked to prevent notices going out in the wrong order
+            async with self._notice_send_lock:
+                event_id = await self.az.intent.send_message(await self.get_notice_room(), content)
+        except Exception:
+            self.log.warning("Failed to send bridge notice", exc_info=True)
+        return edit or event_id
+
+    async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
+        puppet = await pu.Puppet.get_by_pk(self.igpk)
+        try:
+            await puppet.update_info(user_info)
+        except Exception:
+            self.log.exception("Failed to update own puppet info")
+        try:
+            if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
+                self.log.info(f"Automatically enabling custom puppet")
+                await puppet.switch_mxid(access_token="auto", mxid=self.mxid)
+        except Exception:
+            self.log.exception("Failed to automatically enable custom puppet")
+
+    async def _try_sync(self) -> None:
+        try:
+            await self.sync()
+        except Exception:
+            self.log.exception("Exception while syncing")
+
+    async def get_direct_chats(self) -> Dict[UserID, List[RoomID]]:
+        return {
+            pu.Puppet.get_mxid_from_id(portal.other_user_pk): [portal.mxid]
+            for portal in await DBPortal.find_private_chats_of(self.igpk)
+            if portal.mxid
+        }
+
+    async def sync(self) -> None:
+        resp = await self.client.get_inbox()
+        limit = self.config["bridge.initial_conversation_sync"]
+        threads = sorted(resp.inbox.threads, key=lambda thread: thread.last_activity_at)
+        if limit < 0:
+            limit = len(threads)
+        for i, thread in enumerate(threads):
+            portal = await po.Portal.get_by_thread(thread, self.igpk)
+            if portal.mxid or i < limit:
+                await portal.create_matrix_room(self, thread)
+        await self.update_direct_chats()
+
+        self._listen_task = self.loop.create_task(self.mqtt.listen(
+            graphql_subs={GraphQLSubscription.app_presence(),
+                          GraphQLSubscription.direct_typing(self.state.user_id),
+                          GraphQLSubscription.direct_status()},
+            skywalker_subs={SkywalkerSubscription.direct_sub(self.state.user_id),
+                            SkywalkerSubscription.live_sub(self.state.user_id)},
+            seq_id=resp.seq_id, snapshot_at_ms=resp.snapshot_at_ms))
+
+    async def stop(self) -> None:
+        if self.mqtt:
+            self.mqtt.disconnect()
+        self._track_metric(METRIC_CONNECTED, False)
+        await self.update()
+
+    async def logout(self) -> None:
+        if self.mqtt:
+            self.mqtt.disconnect()
+        self._track_metric(METRIC_CONNECTED, False)
+        self._track_metric(METRIC_LOGGED_IN, False)
+        puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
+        if puppet and puppet.is_real_user:
+            await puppet.switch_mxid(None, None)
+        try:
+            del self.by_igpk[self.igpk]
+        except KeyError:
+            pass
+        self.client = None
+        self.mqtt = None
+        self.state = None
+        self._is_logged_in = False
+        await self.update()
+
+    # endregion
+    # region Event handlers
+
+    @async_time(METRIC_MESSAGE)
+    async def handle_message(self, evt: MessageSyncEvent) -> None:
+        portal = await po.Portal.get_by_thread_id(evt.message.thread_id, receiver=self.igpk)
+        if not portal.mxid:
+            # TODO try to find the thread?
+            self.log.warning(f"Ignoring message to unknown thread {evt.message.thread_id}")
+            return
+        sender = await pu.Puppet.get_by_pk(evt.message.user_id)
+        await portal.handle_instagram_item(self, sender, evt.message)
+
+    # @async_time(METRIC_RECEIPT)
+    # async def handle_receipt(self, evt: ConversationReadEntry) -> None:
+    #     portal = await po.Portal.get_by_twid(evt.conversation_id, self.twid,
+    #                                          conv_type=evt.conversation.type)
+    #     if not portal.mxid:
+    #         return
+    #     sender = await pu.Puppet.get_by_twid(self.twid)
+    #     await portal.handle_twitter_receipt(sender, int(evt.last_read_event_id))
+
+    # endregion
+    # region Database getters
+
+    def _add_to_cache(self) -> None:
+        self.by_mxid[self.mxid] = self
+        if self.igpk:
+            self.by_igpk[self.igpk] = self
+
+    @classmethod
+    async def get_by_mxid(cls, mxid: UserID, create: bool = True) -> Optional['User']:
+        # Never allow ghosts to be users
+        if pu.Puppet.get_id_from_mxid(mxid):
+            return None
+        try:
+            return cls.by_mxid[mxid]
+        except KeyError:
+            pass
+
+        user = cast(cls, await super().get_by_mxid(mxid))
+        if user is not None:
+            user._add_to_cache()
+            return user
+
+        if create:
+            user = cls(mxid)
+            await user.insert()
+            user._add_to_cache()
+            return user
+
+        return None
+
+    @classmethod
+    async def get_by_igpk(cls, igpk: int) -> Optional['User']:
+        try:
+            return cls.by_igpk[igpk]
+        except KeyError:
+            pass
+
+        user = cast(cls, await super().get_by_igpk(igpk))
+        if user is not None:
+            user._add_to_cache()
+            return user
+
+        return None
+
+    @classmethod
+    async def all_logged_in(cls) -> AsyncGenerator['User', None]:
+        users = await super().all_logged_in()
+        user: cls
+        for index, user in enumerate(users):
+            try:
+                yield cls.by_mxid[user.mxid]
+            except KeyError:
+                user._add_to_cache()
+                yield user
+
+    # endregion