user.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2020 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from typing import (Dict, Optional, AsyncIterable, Awaitable, AsyncGenerator, List, TYPE_CHECKING,
  17. cast)
  18. from collections import defaultdict
  19. import asyncio
  20. import logging
  21. from mauigpapi import AndroidAPI, AndroidState, AndroidMQTT
  22. from mauigpapi.mqtt import Connect, Disconnect, GraphQLSubscription, SkywalkerSubscription
  23. from mauigpapi.types import CurrentUser, MessageSyncEvent, Operation
  24. from mauigpapi.errors import IGNotLoggedInError
  25. from mautrix.bridge import BaseUser
  26. from mautrix.types import UserID, RoomID, EventID, TextMessageEventContent, MessageType
  27. from mautrix.appservice import AppService
  28. from mautrix.util.opt_prometheus import Summary, Gauge, async_time
  29. from .db import User as DBUser, Portal as DBPortal
  30. from .config import Config
  31. from . import puppet as pu, portal as po
  32. if TYPE_CHECKING:
  33. from .__main__ import InstagramBridge
  34. METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
  35. METRIC_RECEIPT = Summary("bridge_on_receipt", "calls to handle_receipt")
  36. METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
  37. METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")
  38. class User(DBUser, BaseUser):
  39. by_mxid: Dict[UserID, 'User'] = {}
  40. by_igpk: Dict[int, 'User'] = {}
  41. config: Config
  42. az: AppService
  43. loop: asyncio.AbstractEventLoop
  44. client: Optional[AndroidAPI]
  45. mqtt: Optional[AndroidMQTT]
  46. _listen_task: Optional[asyncio.Task] = None
  47. permission_level: str
  48. username: Optional[str]
  49. _notice_room_lock: asyncio.Lock
  50. _notice_send_lock: asyncio.Lock
  51. _is_logged_in: bool
  52. def __init__(self, mxid: UserID, igpk: Optional[int] = None,
  53. state: Optional[AndroidState] = None, notice_room: Optional[RoomID] = None
  54. ) -> None:
  55. super().__init__(mxid=mxid, igpk=igpk, state=state, notice_room=notice_room)
  56. self._notice_room_lock = asyncio.Lock()
  57. self._notice_send_lock = asyncio.Lock()
  58. perms = self.config.get_permissions(mxid)
  59. self.is_whitelisted, self.is_admin, self.permission_level = perms
  60. self.log = self.log.getChild(self.mxid)
  61. self.client = None
  62. self.username = None
  63. self.dm_update_lock = asyncio.Lock()
  64. self._metric_value = defaultdict(lambda: False)
  65. self._is_logged_in = False
  66. self._listen_task = None
  67. self.command_status = None
  68. @classmethod
  69. def init_cls(cls, bridge: 'InstagramBridge') -> AsyncIterable[Awaitable[None]]:
  70. cls.bridge = bridge
  71. cls.config = bridge.config
  72. cls.az = bridge.az
  73. cls.loop = bridge.loop
  74. return (user.try_connect() async for user in cls.all_logged_in())
  75. # region Connection management
  76. async def is_logged_in(self) -> bool:
  77. return bool(self.client) and self._is_logged_in
  78. async def try_connect(self) -> None:
  79. try:
  80. await self.connect()
  81. except Exception:
  82. self.log.exception("Error while connecting to Instagram")
  83. async def connect(self) -> None:
  84. client = AndroidAPI(self.state)
  85. try:
  86. resp = await client.current_user()
  87. except IGNotLoggedInError as e:
  88. self.log.warning(f"Failed to connect to Instagram: {e}")
  89. # TODO show reason?
  90. await self.send_bridge_notice("You have been logged out of Instagram")
  91. return
  92. self.client = client
  93. self._is_logged_in = True
  94. self.igpk = resp.user.pk
  95. self.username = resp.user.username
  96. self._track_metric(METRIC_LOGGED_IN, True)
  97. self.by_igpk[self.igpk] = self
  98. self.mqtt = AndroidMQTT(self.state, loop=self.loop,
  99. log=logging.getLogger("mau.instagram.mqtt").getChild(self.mxid))
  100. self.mqtt.add_event_handler(Connect, self.on_connect)
  101. self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
  102. self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
  103. await self.update()
  104. self.loop.create_task(self._try_sync_puppet(resp.user))
  105. self.loop.create_task(self._try_sync())
  106. async def on_connect(self, evt: Connect) -> None:
  107. self._track_metric(METRIC_CONNECTED, True)
  108. async def on_disconnect(self, evt: Disconnect) -> None:
  109. self._track_metric(METRIC_CONNECTED, False)
  110. # TODO this stuff could probably be moved to mautrix-python
  111. async def get_notice_room(self) -> RoomID:
  112. if not self.notice_room:
  113. async with self._notice_room_lock:
  114. # If someone already created the room while this call was waiting,
  115. # don't make a new room
  116. if self.notice_room:
  117. return self.notice_room
  118. self.notice_room = await self.az.intent.create_room(
  119. is_direct=True, invitees=[self.mxid],
  120. topic="Instagram bridge notices")
  121. await self.update()
  122. return self.notice_room
  123. async def send_bridge_notice(self, text: str, edit: Optional[EventID] = None,
  124. important: bool = False) -> Optional[EventID]:
  125. event_id = None
  126. try:
  127. self.log.debug("Sending bridge notice: %s", text)
  128. content = TextMessageEventContent(body=text, msgtype=(MessageType.TEXT if important
  129. else MessageType.NOTICE))
  130. if edit:
  131. content.set_edit(edit)
  132. # This is locked to prevent notices going out in the wrong order
  133. async with self._notice_send_lock:
  134. event_id = await self.az.intent.send_message(await self.get_notice_room(), content)
  135. except Exception:
  136. self.log.warning("Failed to send bridge notice", exc_info=True)
  137. return edit or event_id
  138. async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
  139. puppet = await pu.Puppet.get_by_pk(self.igpk)
  140. try:
  141. await puppet.update_info(user_info)
  142. except Exception:
  143. self.log.exception("Failed to update own puppet info")
  144. try:
  145. if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
  146. self.log.info(f"Automatically enabling custom puppet")
  147. await puppet.switch_mxid(access_token="auto", mxid=self.mxid)
  148. except Exception:
  149. self.log.exception("Failed to automatically enable custom puppet")
  150. async def _try_sync(self) -> None:
  151. try:
  152. await self.sync()
  153. except Exception:
  154. self.log.exception("Exception while syncing")
  155. async def get_direct_chats(self) -> Dict[UserID, List[RoomID]]:
  156. return {
  157. pu.Puppet.get_mxid_from_id(portal.other_user_pk): [portal.mxid]
  158. for portal in await DBPortal.find_private_chats_of(self.igpk)
  159. if portal.mxid
  160. }
  161. async def sync(self) -> None:
  162. resp = await self.client.get_inbox()
  163. limit = self.config["bridge.initial_conversation_sync"]
  164. threads = sorted(resp.inbox.threads, key=lambda thread: thread.last_activity_at)
  165. if limit < 0:
  166. limit = len(threads)
  167. for i, thread in enumerate(threads):
  168. portal = await po.Portal.get_by_thread(thread, self.igpk)
  169. if portal.mxid:
  170. await portal.update_matrix_room(self, thread, backfill=True)
  171. elif i < limit:
  172. await portal.create_matrix_room(self, thread)
  173. await self.update_direct_chats()
  174. self._listen_task = self.loop.create_task(self.mqtt.listen(
  175. graphql_subs={GraphQLSubscription.app_presence(),
  176. GraphQLSubscription.direct_typing(self.state.user_id),
  177. GraphQLSubscription.direct_status()},
  178. skywalker_subs={SkywalkerSubscription.direct_sub(self.state.user_id),
  179. SkywalkerSubscription.live_sub(self.state.user_id)},
  180. seq_id=resp.seq_id, snapshot_at_ms=resp.snapshot_at_ms))
  181. async def stop(self) -> None:
  182. if self.mqtt:
  183. self.mqtt.disconnect()
  184. self._track_metric(METRIC_CONNECTED, False)
  185. await self.update()
  186. async def logout(self) -> None:
  187. if self.mqtt:
  188. self.mqtt.disconnect()
  189. self._track_metric(METRIC_CONNECTED, False)
  190. self._track_metric(METRIC_LOGGED_IN, False)
  191. puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
  192. if puppet and puppet.is_real_user:
  193. await puppet.switch_mxid(None, None)
  194. try:
  195. del self.by_igpk[self.igpk]
  196. except KeyError:
  197. pass
  198. self.client = None
  199. self.mqtt = None
  200. self.state = None
  201. self._is_logged_in = False
  202. await self.update()
  203. # endregion
  204. # region Event handlers
  205. @async_time(METRIC_MESSAGE)
  206. async def handle_message(self, evt: MessageSyncEvent) -> None:
  207. portal = await po.Portal.get_by_thread_id(evt.message.thread_id, receiver=self.igpk)
  208. if not portal.mxid:
  209. # TODO try to find the thread?
  210. self.log.warning(f"Ignoring message to unknown thread {evt.message.thread_id}")
  211. return
  212. self.log.trace(f"Received message sync event {evt.message}")
  213. sender = await pu.Puppet.get_by_pk(evt.message.user_id) if evt.message.user_id else None
  214. if evt.message.op == Operation.ADD:
  215. if not sender:
  216. # I don't think we care about adds with no sender
  217. return
  218. await portal.handle_instagram_item(self, sender, evt.message)
  219. elif evt.message.op == Operation.REMOVE:
  220. # Removes don't have a sender, only the message sender can unsend messages anyway
  221. await portal.handle_instagram_remove(evt.message.item_id)
  222. elif evt.message.op == Operation.REPLACE:
  223. await portal.handle_instagram_update(evt.message)
  224. # @async_time(METRIC_RECEIPT)
  225. # async def handle_receipt(self, evt: ConversationReadEntry) -> None:
  226. # portal = await po.Portal.get_by_twid(evt.conversation_id, self.twid,
  227. # conv_type=evt.conversation.type)
  228. # if not portal.mxid:
  229. # return
  230. # sender = await pu.Puppet.get_by_twid(self.twid)
  231. # await portal.handle_twitter_receipt(sender, int(evt.last_read_event_id))
  232. # endregion
  233. # region Database getters
  234. def _add_to_cache(self) -> None:
  235. self.by_mxid[self.mxid] = self
  236. if self.igpk:
  237. self.by_igpk[self.igpk] = self
  238. @classmethod
  239. async def get_by_mxid(cls, mxid: UserID, create: bool = True) -> Optional['User']:
  240. # Never allow ghosts to be users
  241. if pu.Puppet.get_id_from_mxid(mxid):
  242. return None
  243. try:
  244. return cls.by_mxid[mxid]
  245. except KeyError:
  246. pass
  247. user = cast(cls, await super().get_by_mxid(mxid))
  248. if user is not None:
  249. user._add_to_cache()
  250. return user
  251. if create:
  252. user = cls(mxid)
  253. await user.insert()
  254. user._add_to_cache()
  255. return user
  256. return None
  257. @classmethod
  258. async def get_by_igpk(cls, igpk: int) -> Optional['User']:
  259. try:
  260. return cls.by_igpk[igpk]
  261. except KeyError:
  262. pass
  263. user = cast(cls, await super().get_by_igpk(igpk))
  264. if user is not None:
  265. user._add_to_cache()
  266. return user
  267. return None
  268. @classmethod
  269. async def all_logged_in(cls) -> AsyncGenerator['User', None]:
  270. users = await super().all_logged_in()
  271. user: cls
  272. for index, user in enumerate(users):
  273. try:
  274. yield cls.by_mxid[user.mxid]
  275. except KeyError:
  276. user._add_to_cache()
  277. yield user
  278. # endregion