user.py 13 KB


  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2020 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from typing import (Dict, Optional, AsyncIterable, Awaitable, AsyncGenerator, List, TYPE_CHECKING,
  17. cast)
  18. from collections import defaultdict
  19. import asyncio
  20. import logging
  21. import time
  22. from mauigpapi import AndroidAPI, AndroidState, AndroidMQTT
  23. from mauigpapi.mqtt import Connect, Disconnect, GraphQLSubscription, SkywalkerSubscription
  24. from mauigpapi.types import (CurrentUser, MessageSyncEvent, Operation, RealtimeDirectEvent,
  25. ActivityIndicatorData, TypingStatus)
  26. from mauigpapi.errors import IGNotLoggedInError
  27. from mautrix.bridge import BaseUser
  28. from mautrix.types import UserID, RoomID, EventID, TextMessageEventContent, MessageType
  29. from mautrix.appservice import AppService
  30. from mautrix.util.opt_prometheus import Summary, Gauge, async_time
  31. from .db import User as DBUser, Portal as DBPortal
  32. from .config import Config
  33. from . import puppet as pu, portal as po
  34. if TYPE_CHECKING:
  35. from .__main__ import InstagramBridge
  36. METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
  37. METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
  38. METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
  39. METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")
  40. class User(DBUser, BaseUser):
  41. _activity_indicator_ids: Dict[str, int] = {}
  42. by_mxid: Dict[UserID, 'User'] = {}
  43. by_igpk: Dict[int, 'User'] = {}
  44. config: Config
  45. az: AppService
  46. loop: asyncio.AbstractEventLoop
  47. client: Optional[AndroidAPI]
  48. mqtt: Optional[AndroidMQTT]
  49. _listen_task: Optional[asyncio.Task] = None
  50. permission_level: str
  51. username: Optional[str]
  52. _notice_room_lock: asyncio.Lock
  53. _notice_send_lock: asyncio.Lock
  54. _is_logged_in: bool
  55. def __init__(self, mxid: UserID, igpk: Optional[int] = None,
  56. state: Optional[AndroidState] = None, notice_room: Optional[RoomID] = None
  57. ) -> None:
  58. super().__init__(mxid=mxid, igpk=igpk, state=state, notice_room=notice_room)
  59. self._notice_room_lock = asyncio.Lock()
  60. self._notice_send_lock = asyncio.Lock()
  61. perms = self.config.get_permissions(mxid)
  62. self.is_whitelisted, self.is_admin, self.permission_level = perms
  63. self.log = self.log.getChild(self.mxid)
  64. self.client = None
  65. self.username = None
  66. self.dm_update_lock = asyncio.Lock()
  67. self._metric_value = defaultdict(lambda: False)
  68. self._is_logged_in = False
  69. self._listen_task = None
  70. self.command_status = None
  71. @classmethod
  72. def init_cls(cls, bridge: 'InstagramBridge') -> AsyncIterable[Awaitable[None]]:
  73. cls.bridge = bridge
  74. cls.config = bridge.config
  75. cls.az = bridge.az
  76. cls.loop = bridge.loop
  77. return (user.try_connect() async for user in cls.all_logged_in())
  78. # region Connection management
  79. async def is_logged_in(self) -> bool:
  80. return bool(self.client) and self._is_logged_in
  81. async def try_connect(self) -> None:
  82. try:
  83. await self.connect()
  84. except Exception:
  85. self.log.exception("Error while connecting to Instagram")
  86. async def connect(self) -> None:
  87. client = AndroidAPI(self.state)
  88. try:
  89. resp = await client.current_user()
  90. except IGNotLoggedInError as e:
  91. self.log.warning(f"Failed to connect to Instagram: {e}")
  92. # TODO show reason?
  93. await self.send_bridge_notice("You have been logged out of Instagram")
  94. return
  95. self.client = client
  96. self._is_logged_in = True
  97. self.igpk = resp.user.pk
  98. self.username = resp.user.username
  99. self._track_metric(METRIC_LOGGED_IN, True)
  100. self.by_igpk[self.igpk] = self
  101. self.mqtt = AndroidMQTT(self.state, loop=self.loop,
  102. log=logging.getLogger("mau.instagram.mqtt").getChild(self.mxid))
  103. self.mqtt.add_event_handler(Connect, self.on_connect)
  104. self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
  105. self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
  106. self.mqtt.add_event_handler(RealtimeDirectEvent, self.handle_rtd)
  107. await self.update()
  108. self.loop.create_task(self._try_sync_puppet(resp.user))
  109. self.loop.create_task(self._try_sync())
  110. async def on_connect(self, evt: Connect) -> None:
  111. self._track_metric(METRIC_CONNECTED, True)
  112. async def on_disconnect(self, evt: Disconnect) -> None:
  113. self._track_metric(METRIC_CONNECTED, False)
  114. # TODO this stuff could probably be moved to mautrix-python
  115. async def get_notice_room(self) -> RoomID:
  116. if not self.notice_room:
  117. async with self._notice_room_lock:
  118. # If someone already created the room while this call was waiting,
  119. # don't make a new room
  120. if self.notice_room:
  121. return self.notice_room
  122. self.notice_room = await self.az.intent.create_room(
  123. is_direct=True, invitees=[self.mxid],
  124. topic="Instagram bridge notices")
  125. await self.update()
  126. return self.notice_room
  127. async def send_bridge_notice(self, text: str, edit: Optional[EventID] = None,
  128. important: bool = False) -> Optional[EventID]:
  129. event_id = None
  130. try:
  131. self.log.debug("Sending bridge notice: %s", text)
  132. content = TextMessageEventContent(body=text, msgtype=(MessageType.TEXT if important
  133. else MessageType.NOTICE))
  134. if edit:
  135. content.set_edit(edit)
  136. # This is locked to prevent notices going out in the wrong order
  137. async with self._notice_send_lock:
  138. event_id = await self.az.intent.send_message(await self.get_notice_room(), content)
  139. except Exception:
  140. self.log.warning("Failed to send bridge notice", exc_info=True)
  141. return edit or event_id
  142. async def _try_sync_puppet(self, user_info: CurrentUser) -> None:
  143. puppet = await pu.Puppet.get_by_pk(self.igpk)
  144. try:
  145. await puppet.update_info(user_info, self)
  146. except Exception:
  147. self.log.exception("Failed to update own puppet info")
  148. try:
  149. if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid):
  150. self.log.info(f"Automatically enabling custom puppet")
  151. await puppet.switch_mxid(access_token="auto", mxid=self.mxid)
  152. except Exception:
  153. self.log.exception("Failed to automatically enable custom puppet")
  154. async def _try_sync(self) -> None:
  155. try:
  156. await self.sync()
  157. except Exception:
  158. self.log.exception("Exception while syncing")
  159. async def get_direct_chats(self) -> Dict[UserID, List[RoomID]]:
  160. return {
  161. pu.Puppet.get_mxid_from_id(portal.other_user_pk): [portal.mxid]
  162. for portal in await DBPortal.find_private_chats_of(self.igpk)
  163. if portal.mxid
  164. }
  165. async def sync(self) -> None:
  166. resp = await self.client.get_inbox()
  167. limit = self.config["bridge.initial_conversation_sync"]
  168. threads = sorted(resp.inbox.threads, key=lambda thread: thread.last_activity_at)
  169. if limit < 0:
  170. limit = len(threads)
  171. for i, thread in enumerate(threads):
  172. portal = await po.Portal.get_by_thread(thread, self.igpk)
  173. if portal.mxid:
  174. await portal.update_matrix_room(self, thread, backfill=True)
  175. elif i < limit:
  176. await portal.create_matrix_room(self, thread)
  177. await self.update_direct_chats()
  178. self._listen_task = self.loop.create_task(self.mqtt.listen(
  179. graphql_subs={GraphQLSubscription.app_presence(),
  180. GraphQLSubscription.direct_typing(self.state.user_id),
  181. GraphQLSubscription.direct_status()},
  182. skywalker_subs={SkywalkerSubscription.direct_sub(self.state.user_id),
  183. SkywalkerSubscription.live_sub(self.state.user_id)},
  184. seq_id=resp.seq_id, snapshot_at_ms=resp.snapshot_at_ms))
  185. async def stop(self) -> None:
  186. if self.mqtt:
  187. self.mqtt.disconnect()
  188. self._track_metric(METRIC_CONNECTED, False)
  189. await self.update()
  190. async def logout(self) -> None:
  191. if self.mqtt:
  192. self.mqtt.disconnect()
  193. self._track_metric(METRIC_CONNECTED, False)
  194. self._track_metric(METRIC_LOGGED_IN, False)
  195. puppet = await pu.Puppet.get_by_pk(self.igpk, create=False)
  196. if puppet and puppet.is_real_user:
  197. await puppet.switch_mxid(None, None)
  198. try:
  199. del self.by_igpk[self.igpk]
  200. except KeyError:
  201. pass
  202. self.client = None
  203. self.mqtt = None
  204. self.state = None
  205. self._is_logged_in = False
  206. await self.update()
  207. # endregion
  208. # region Event handlers
  209. @async_time(METRIC_MESSAGE)
  210. async def handle_message(self, evt: MessageSyncEvent) -> None:
  211. portal = await po.Portal.get_by_thread_id(evt.message.thread_id, receiver=self.igpk)
  212. if not portal:
  213. # TODO try to find the thread?
  214. self.log.warning(f"Ignoring message to unknown thread {evt.message.thread_id}")
  215. return
  216. elif not portal.mxid:
  217. # TODO create portal room?
  218. self.log.warning(f"Ignoring message to thread with no room {evt.message.thread_id}")
  219. return
  220. self.log.trace(f"Received message sync event {evt.message}")
  221. sender = await pu.Puppet.get_by_pk(evt.message.user_id) if evt.message.user_id else None
  222. if evt.message.op == Operation.ADD:
  223. if not sender:
  224. # I don't think we care about adds with no sender
  225. return
  226. await portal.handle_instagram_item(self, sender, evt.message)
  227. elif evt.message.op == Operation.REMOVE:
  228. # Removes don't have a sender, only the message sender can unsend messages anyway
  229. await portal.handle_instagram_remove(evt.message.item_id)
  230. elif evt.message.op == Operation.REPLACE:
  231. await portal.handle_instagram_update(evt.message)
  232. @async_time(METRIC_RTD)
  233. async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
  234. if not isinstance(evt.value, ActivityIndicatorData):
  235. return
  236. now = int(time.time() * 1000)
  237. date = int(evt.value.timestamp) // 1000
  238. expiry = date + evt.value.ttl
  239. if expiry < now:
  240. return
  241. if evt.activity_indicator_id in self._activity_indicator_ids:
  242. return
  243. # TODO clear expired items from this dict
  244. self._activity_indicator_ids[evt.activity_indicator_id] = expiry
  245. puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
  246. portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
  247. if not puppet or not portal:
  248. return
  249. is_typing = evt.value.activity_status != TypingStatus.OFF
  250. await puppet.intent_for(portal).set_typing(portal.mxid, is_typing=is_typing,
  251. timeout=evt.value.ttl)
  252. # endregion
  253. # region Database getters
  254. def _add_to_cache(self) -> None:
  255. self.by_mxid[self.mxid] = self
  256. if self.igpk:
  257. self.by_igpk[self.igpk] = self
  258. @classmethod
  259. async def get_by_mxid(cls, mxid: UserID, create: bool = True) -> Optional['User']:
  260. # Never allow ghosts to be users
  261. if pu.Puppet.get_id_from_mxid(mxid):
  262. return None
  263. try:
  264. return cls.by_mxid[mxid]
  265. except KeyError:
  266. pass
  267. user = cast(cls, await super().get_by_mxid(mxid))
  268. if user is not None:
  269. user._add_to_cache()
  270. return user
  271. if create:
  272. user = cls(mxid)
  273. await user.insert()
  274. user._add_to_cache()
  275. return user
  276. return None
  277. @classmethod
  278. async def get_by_igpk(cls, igpk: int) -> Optional['User']:
  279. try:
  280. return cls.by_igpk[igpk]
  281. except KeyError:
  282. pass
  283. user = cast(cls, await super().get_by_igpk(igpk))
  284. if user is not None:
  285. user._add_to_cache()
  286. return user
  287. return None
  288. @classmethod
  289. async def all_logged_in(cls) -> AsyncGenerator['User', None]:
  290. users = await super().all_logged_in()
  291. user: cls
  292. for index, user in enumerate(users):
  293. try:
  294. yield cls.by_mxid[user.mxid]
  295. except KeyError:
  296. user._add_to_cache()
  297. yield user
  298. # endregion