123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- # mautrix-instagram - A Matrix-Instagram puppeting bridge.
- # Copyright (C) 2022 Tulir Asokan
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU Affero General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <https://www.gnu.org/licenses/>.
- from __future__ import annotations
- from typing import TYPE_CHECKING, Any, AsyncGenerator, AsyncIterable, Awaitable, cast
- import os.path
- from yarl import URL
- from mauigpapi.types import BaseResponseUser
- from mautrix.appservice import IntentAPI
- from mautrix.bridge import BasePuppet, async_getter_lock
- from mautrix.types import ContentURI, RoomID, SyncToken, UserID
- from mautrix.util.simple_template import SimpleTemplate
- from . import portal as p, user as u
- from .config import Config
- from .db import Puppet as DBPuppet
- if TYPE_CHECKING:
- from .__main__ import InstagramBridge
- class Puppet(DBPuppet, BasePuppet):
- by_pk: dict[int, Puppet] = {}
- by_custom_mxid: dict[UserID, Puppet] = {}
- hs_domain: str
- mxid_template: SimpleTemplate[int]
- bridge: InstagramBridge
- config: Config
- default_mxid_intent: IntentAPI
- default_mxid: UserID
- def __init__(
- self,
- pk: int,
- name: str | None = None,
- username: str | None = None,
- photo_id: str | None = None,
- photo_mxc: ContentURI | None = None,
- name_set: bool = False,
- avatar_set: bool = False,
- contact_info_set: bool = False,
- is_registered: bool = False,
- custom_mxid: UserID | None = None,
- access_token: str | None = None,
- next_batch: SyncToken | None = None,
- base_url: URL | None = None,
- ) -> None:
- super().__init__(
- pk=pk,
- name=name,
- username=username,
- photo_id=photo_id,
- name_set=name_set,
- photo_mxc=photo_mxc,
- avatar_set=avatar_set,
- contact_info_set=contact_info_set,
- is_registered=is_registered,
- custom_mxid=custom_mxid,
- access_token=access_token,
- next_batch=next_batch,
- base_url=base_url,
- )
- self.log = self.log.getChild(str(pk))
- self.default_mxid = self.get_mxid_from_id(pk)
- self.default_mxid_intent = self.az.intent.user(self.default_mxid)
- self.intent = self._fresh_intent()
- @classmethod
- def init_cls(cls, bridge: "InstagramBridge") -> AsyncIterable[Awaitable[None]]:
- cls.bridge = bridge
- cls.config = bridge.config
- cls.loop = bridge.loop
- cls.mx = bridge.matrix
- cls.az = bridge.az
- cls.hs_domain = cls.config["homeserver.domain"]
- cls.mxid_template = SimpleTemplate(
- cls.config["bridge.username_template"],
- "userid",
- prefix="@",
- suffix=f":{cls.hs_domain}",
- type=int,
- )
- cls.sync_with_custom_puppets = cls.config["bridge.sync_with_custom_puppets"]
- cls.homeserver_url_map = {
- server: URL(url)
- for server, url in cls.config["bridge.double_puppet_server_map"].items()
- }
- cls.allow_discover_url = cls.config["bridge.double_puppet_allow_discovery"]
- cls.login_shared_secret_map = {
- server: secret.encode("utf-8")
- for server, secret in cls.config["bridge.login_shared_secret_map"].items()
- }
- cls.login_device_name = "Instagram Bridge"
- return (puppet.try_start() async for puppet in cls.all_with_custom_mxid())
- @property
- def igpk(self) -> int:
- return self.pk
- def intent_for(self, portal: p.Portal) -> IntentAPI:
- if portal.other_user_pk == self.pk:
- return self.default_mxid_intent
- return self.intent
- def need_backfill_invite(self, portal: p.Portal) -> bool:
- return (
- portal.other_user_pk != self.pk
- and (self.is_real_user or portal.is_direct)
- and self.config["bridge.backfill.invite_own_puppet"]
- )
- async def update_info(self, info: BaseResponseUser, source: u.User) -> None:
- update = False
- if info.username and self.username != info.username:
- self.username = info.username
- update = True
- update = await self.update_contact_info() or update
- update = await self._update_name(info) or update
- update = await self._update_avatar(info, source) or update
- if update:
- await self.update()
- async def update_contact_info(self) -> bool:
- if not self.bridge.homeserver_software.is_hungry:
- return False
- if self.contact_info_set:
- return False
- try:
- contact_info: dict[str, Any] = {
- "com.beeper.bridge.remote_id": str(self.igpk),
- "com.beeper.bridge.service": self.bridge.beeper_service_name,
- "com.beeper.bridge.network": self.bridge.beeper_network_name,
- }
- if self.username:
- contact_info["com.beeper.bridge.identifiers"] = [f"instagram:{self.username}"]
- await self.default_mxid_intent.beeper_update_profile(contact_info)
- self.contact_info_set = True
- except Exception:
- self.log.exception("Error updating contact info")
- self.contact_info_set = False
- return True
- @classmethod
- def _get_displayname(cls, info: BaseResponseUser) -> str:
- return cls.config["bridge.displayname_template"].format(
- displayname=info.full_name or info.username, id=info.pk, username=info.username
- )
- async def _update_name(self, info: BaseResponseUser) -> bool:
- name = self._get_displayname(info)
- if name != self.name:
- self.name = name
- try:
- await self.default_mxid_intent.set_displayname(self.name)
- self.name_set = True
- except Exception:
- self.log.exception("Failed to update displayname")
- self.name_set = False
- return True
- return False
- async def _update_avatar(self, info: BaseResponseUser, source: u.User) -> bool:
- pic_id = (
- f"id_{info.profile_pic_id}.jpg"
- if info.profile_pic_id
- else os.path.basename(URL(info.profile_pic_url).path)
- )
- if pic_id != self.photo_id or not self.avatar_set:
- self.photo_id = pic_id
- if info.has_anonymous_profile_picture:
- mxc = ""
- else:
- resp = await source.client.proxy_with_retry(
- "Puppet._update_avatar",
- lambda: source.client.raw_http_get(info.profile_pic_url),
- )
- content_type = resp.headers["Content-Type"]
- resp_data = await resp.read()
- mxc = await self.default_mxid_intent.upload_media(
- data=resp_data,
- mime_type=content_type,
- filename=pic_id,
- async_upload=self.config["homeserver.async_media"],
- )
- try:
- await self.default_mxid_intent.set_avatar_url(mxc)
- self.avatar_set = True
- self.photo_mxc = mxc
- except Exception:
- self.log.exception("Failed to update avatar")
- self.avatar_set = False
- return True
- return False
- async def default_puppet_should_leave_room(self, room_id: RoomID) -> bool:
- portal = await p.Portal.get_by_mxid(room_id)
- return portal and portal.other_user_pk != self.pk
- # region Database getters
- def _add_to_cache(self) -> None:
- self.by_pk[self.pk] = self
- if self.custom_mxid:
- self.by_custom_mxid[self.custom_mxid] = self
- async def save(self) -> None:
- await self.update()
- @classmethod
- async def get_by_mxid(cls, mxid: UserID, create: bool = True) -> Puppet | None:
- pk = cls.get_id_from_mxid(mxid)
- if pk:
- return await cls.get_by_pk(pk, create=create)
- return None
- @classmethod
- @async_getter_lock
- async def get_by_custom_mxid(cls, mxid: UserID) -> Puppet | None:
- try:
- return cls.by_custom_mxid[mxid]
- except KeyError:
- pass
- puppet = cast(cls, await super().get_by_custom_mxid(mxid))
- if puppet:
- puppet._add_to_cache()
- return puppet
- return None
- @classmethod
- def get_id_from_mxid(cls, mxid: UserID) -> int | None:
- return cls.mxid_template.parse(mxid)
- @classmethod
- def get_mxid_from_id(cls, pk: int) -> UserID:
- return UserID(cls.mxid_template.format_full(pk))
- @classmethod
- @async_getter_lock
- async def get_by_pk(cls, pk: int, *, create: bool = True) -> Puppet | None:
- try:
- return cls.by_pk[pk]
- except KeyError:
- pass
- puppet = cast(cls, await super().get_by_pk(pk))
- if puppet is not None:
- puppet._add_to_cache()
- return puppet
- if create:
- puppet = cls(pk)
- await puppet.insert()
- puppet._add_to_cache()
- return puppet
- return None
- @classmethod
- async def all_with_custom_mxid(cls) -> AsyncGenerator[Puppet, None]:
- puppets = await super().all_with_custom_mxid()
- puppet: cls
- for puppet in puppets:
- try:
- yield cls.by_pk[puppet.pk]
- except KeyError:
- puppet._add_to_cache()
- yield puppet
- @classmethod
- async def get_all(cls) -> AsyncGenerator[Puppet, None]:
- puppets = await super().get_all()
- puppet: cls
- for puppet in puppets:
- try:
- yield cls.by_pk[puppet.pk]
- except KeyError:
- puppet._add_to_cache()
- yield puppet
- # endregion
|