# mautrix-instagram - A Matrix-Instagram puppeting bridge. # Copyright (C) 2022 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from __future__ import annotations from typing import TYPE_CHECKING, Any, AsyncGenerator, AsyncIterable, Awaitable, cast import os.path from yarl import URL from mauigpapi.types import BaseResponseUser from mautrix.appservice import IntentAPI from mautrix.bridge import BasePuppet, async_getter_lock from mautrix.types import ContentURI, RoomID, SyncToken, UserID from mautrix.util.simple_template import SimpleTemplate from . import portal as p, user as u from .config import Config from .db import Puppet as DBPuppet if TYPE_CHECKING: from .__main__ import InstagramBridge class Puppet(DBPuppet, BasePuppet): by_pk: dict[int, Puppet] = {} by_custom_mxid: dict[UserID, Puppet] = {} hs_domain: str mxid_template: SimpleTemplate[int] bridge: InstagramBridge config: Config default_mxid_intent: IntentAPI default_mxid: UserID def __init__( self, pk: int, name: str | None = None, username: str | None = None, photo_id: str | None = None, photo_mxc: ContentURI | None = None, name_set: bool = False, avatar_set: bool = False, contact_info_set: bool = False, is_registered: bool = False, custom_mxid: UserID | None = None, access_token: str | None = None, next_batch: SyncToken | None = None, base_url: URL | None = None, ) -> None: super().__init__( pk=pk, name=name, username=username, photo_id=photo_id, name_set=name_set, photo_mxc=photo_mxc, avatar_set=avatar_set, contact_info_set=contact_info_set, is_registered=is_registered, custom_mxid=custom_mxid, access_token=access_token, next_batch=next_batch, base_url=base_url, ) self.log = self.log.getChild(str(pk)) self.default_mxid = self.get_mxid_from_id(pk) self.default_mxid_intent = self.az.intent.user(self.default_mxid) self.intent = self._fresh_intent() @classmethod def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]: cls.bridge = bridge cls.config = bridge.config cls.loop = bridge.loop cls.mx = bridge.matrix cls.az = bridge.az cls.hs_domain = cls.config["homeserver.domain"] cls.mxid_template = SimpleTemplate( cls.config["bridge.username_template"], "userid", prefix="@", suffix=f":{cls.hs_domain}", type=int, ) cls.sync_with_custom_puppets = cls.config["bridge.sync_with_custom_puppets"] cls.homeserver_url_map = { server: URL(url) for server, url in cls.config["bridge.double_puppet_server_map"].items() } cls.allow_discover_url = cls.config["bridge.double_puppet_allow_discovery"] cls.login_shared_secret_map = { server: secret.encode("utf-8") for server, secret in cls.config["bridge.login_shared_secret_map"].items() } cls.login_device_name = "Instagram Bridge" return (puppet.try_start() async for puppet in cls.all_with_custom_mxid()) @property def igpk(self) -> int: return self.pk def intent_for(self, portal: p.Portal) -> IntentAPI: if portal.other_user_pk == self.pk: return self.default_mxid_intent return self.intent def need_backfill_invite(self, portal: p.Portal) -> bool: return ( portal.other_user_pk != self.pk and (self.is_real_user or portal.is_direct) and self.config["bridge.backfill.invite_own_puppet"] ) async def update_info(self, info: BaseResponseUser, source: u.User) -> None: update = False if info.username and self.username != info.username: self.username = info.username update = True update = await self.update_contact_info() or update update = await self._update_name(info) or update update = await self._update_avatar(info, source) or update if update: await self.update() async def update_contact_info(self) -> bool: if not self.bridge.homeserver_software.is_hungry: return False if self.contact_info_set: return False try: contact_info: dict[str, Any] = { "com.beeper.bridge.remote_id": str(self.igpk), "com.beeper.bridge.service": self.bridge.beeper_service_name, "com.beeper.bridge.network": self.bridge.beeper_network_name, } if self.username: contact_info["com.beeper.bridge.identifiers"] = [f"instagram:{self.username}"] await self.default_mxid_intent.beeper_update_profile(contact_info) self.contact_info_set = True except Exception: self.log.exception("Error updating contact info") self.contact_info_set = False return True @classmethod def _get_displayname(cls, info: BaseResponseUser) -> str: return cls.config["bridge.displayname_template"].format( displayname=info.full_name or info.username, id=info.pk, username=info.username ) async def _update_name(self, info: BaseResponseUser) -> bool: name = self._get_displayname(info) if name != self.name: self.name = name try: await self.default_mxid_intent.set_displayname(self.name) self.name_set = True except Exception: self.log.exception("Failed to update displayname") self.name_set = False return True return False async def _update_avatar(self, info: BaseResponseUser, source: u.User) -> bool: pic_id = ( f"id_{info.profile_pic_id}.jpg" if info.profile_pic_id else os.path.basename(URL(info.profile_pic_url).path) ) if pic_id != self.photo_id or not self.avatar_set: self.photo_id = pic_id if info.has_anonymous_profile_picture: mxc = "" else: resp = await source.client.proxy_with_retry( "Puppet._update_avatar", lambda: source.client.raw_http_get(info.profile_pic_url), ) content_type = resp.headers["Content-Type"] resp_data = await resp.read() mxc = await self.default_mxid_intent.upload_media( data=resp_data, mime_type=content_type, filename=pic_id, async_upload=self.config["homeserver.async_media"], ) try: await self.default_mxid_intent.set_avatar_url(mxc) self.avatar_set = True self.photo_mxc = mxc except Exception: self.log.exception("Failed to update avatar") self.avatar_set = False return True return False async def default_puppet_should_leave_room(self, room_id: RoomID) -> bool: portal = await p.Portal.get_by_mxid(room_id) return portal and portal.other_user_pk != self.pk # region Database getters def _add_to_cache(self) -> None: self.by_pk[self.pk] = self if self.custom_mxid: self.by_custom_mxid[self.custom_mxid] = self async def save(self) -> None: await self.update() @classmethod async def get_by_mxid(cls, mxid: UserID, create: bool = True) -> Puppet | None: pk = cls.get_id_from_mxid(mxid) if pk: return await cls.get_by_pk(pk, create=create) return None @classmethod @async_getter_lock async def get_by_custom_mxid(cls, mxid: UserID) -> Puppet | None: try: return cls.by_custom_mxid[mxid] except KeyError: pass puppet = cast(cls, await super().get_by_custom_mxid(mxid)) if puppet: puppet._add_to_cache() return puppet return None @classmethod def get_id_from_mxid(cls, mxid: UserID) -> int | None: return cls.mxid_template.parse(mxid) @classmethod def get_mxid_from_id(cls, pk: int) -> UserID: return UserID(cls.mxid_template.format_full(pk)) @classmethod @async_getter_lock async def get_by_pk(cls, pk: int, *, create: bool = True) -> Puppet | None: try: return cls.by_pk[pk] except KeyError: pass puppet = cast(cls, await super().get_by_pk(pk)) if puppet is not None: puppet._add_to_cache() return puppet if create: puppet = cls(pk) await puppet.insert() puppet._add_to_cache() return puppet return None @classmethod async def all_with_custom_mxid(cls) -> AsyncGenerator[Puppet, None]: puppets = await super().all_with_custom_mxid() puppet: cls for puppet in puppets: try: yield cls.by_pk[puppet.pk] except KeyError: puppet._add_to_cache() yield puppet @classmethod async def get_all(cls) -> AsyncGenerator[Puppet, None]: puppets = await super().get_all() puppet: cls for puppet in puppets: try: yield cls.by_pk[puppet.pk] except KeyError: puppet._add_to_cache() yield puppet # endregion