auth.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2023 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. import base64
  18. import hashlib
  19. import hmac
  20. import zlib
  21. from mauigpapi.errors import (
  22. IGBad2FACodeError,
  23. IGChallengeError,
  24. IGChallengeWrongCodeError,
  25. IGLoginBadPasswordError,
  26. IGLoginInvalidCredentialsError,
  27. IGLoginInvalidUserError,
  28. IGLoginTwoFactorRequiredError,
  29. )
  30. from mauigpapi.http import AndroidAPI
  31. from mauigpapi.state import AndroidState
  32. from mauigpapi.types import BaseResponseUser
  33. from mautrix.bridge.commands import HelpSection, command_handler
  34. from mautrix.types import EventID
  35. from .. import user as u
  36. from .typehint import CommandEvent
# Help section shared by the authentication commands in this module; it is
# passed as ``help_section`` to the ``login``, ``logout`` and ``login_blob``
# command handlers.
# NOTE(review): the positional arguments after the "Authentication" title
# (10 and "") are presumably the section's sort order and description --
# confirm against mautrix's HelpSection definition.
SECTION_AUTH = HelpSection("Authentication", 10, "")
  38. async def get_login_state(user: u.User, seed: str) -> tuple[AndroidAPI, AndroidState]:
  39. if user.command_status and user.command_status["action"] == "Login":
  40. api: AndroidAPI = user.command_status["api"]
  41. state: AndroidState = user.command_status["state"]
  42. else:
  43. state = AndroidState()
  44. seed = hmac.new(seed.encode("utf-8"), user.mxid.encode("utf-8"), hashlib.sha256).digest()
  45. state.device.generate(seed)
  46. api = AndroidAPI(
  47. state,
  48. log=user.api_log,
  49. proxy_handler=user.proxy_handler,
  50. on_proxy_update=user.on_proxy_update,
  51. on_response_error=user.on_response_error,
  52. )
  53. await api.get_mobile_config()
  54. user.command_status = {
  55. "action": "Login",
  56. "state": state,
  57. "api": api,
  58. }
  59. return api, state
  60. @command_handler(
  61. needs_auth=False,
  62. management_only=True,
  63. help_section=SECTION_AUTH,
  64. help_text="Log into Instagram",
  65. help_args="<_username_> <_password_>",
  66. )
  67. async def login(evt: CommandEvent) -> None:
  68. if await evt.sender.is_logged_in():
  69. await evt.reply("You're already logged in")
  70. return
  71. elif len(evt.args) < 2:
  72. await evt.reply("**Usage:** `$cmdprefix+sp login <username> <password>`")
  73. return
  74. username = evt.args[0]
  75. password = " ".join(evt.args[1:])
  76. await evt.redact()
  77. api, state = await get_login_state(evt.sender, evt.config["instagram.device_seed"])
  78. try:
  79. resp = await api.login(username, password)
  80. except IGLoginTwoFactorRequiredError as e:
  81. tfa_info = e.body.two_factor_info
  82. msg = "Username and password accepted, but you have two-factor authentication enabled.\n"
  83. if tfa_info.totp_two_factor_on:
  84. msg += "Send the code from your authenticator app here."
  85. if tfa_info.sms_two_factor_on:
  86. msg += f" Alternatively, send `resend-sms` to get an SMS code to •••{tfa_info.obfuscated_phone_number}"
  87. elif tfa_info.sms_two_factor_on:
  88. msg += (
  89. f"Send the code sent to •••{tfa_info.obfuscated_phone_number} here."
  90. " You can also send `resend-sms` if you didn't receive the code."
  91. )
  92. else:
  93. msg += (
  94. "Unfortunately, none of your two-factor authentication methods are currently "
  95. "supported by the bridge."
  96. )
  97. return
  98. evt.sender.command_status = {
  99. **evt.sender.command_status,
  100. "next": enter_login_2fa,
  101. "username": tfa_info.username,
  102. "is_totp": tfa_info.totp_two_factor_on,
  103. "has_sms": tfa_info.sms_two_factor_on,
  104. "2fa_identifier": tfa_info.two_factor_identifier,
  105. }
  106. await evt.reply(msg)
  107. except IGChallengeError:
  108. await evt.reply(
  109. "Login challenges aren't currently supported. "
  110. "Please set up real two-factor authentication."
  111. )
  112. await api.challenge_auto()
  113. evt.sender.command_status = {
  114. **evt.sender.command_status,
  115. "next": enter_login_security_code,
  116. }
  117. await evt.reply(
  118. "Username and password accepted, but Instagram wants to verify it's really"
  119. " you. Please confirm the login and enter the security code here."
  120. )
  121. except IGLoginInvalidUserError:
  122. await evt.reply("Invalid username")
  123. except IGLoginBadPasswordError:
  124. await evt.reply("Incorrect password")
  125. except IGLoginInvalidCredentialsError:
  126. await evt.reply("Incorrect username or password")
  127. except Exception as e:
  128. evt.log.exception("Failed to log in")
  129. await evt.reply(f"Failed to log in: {e}")
  130. else:
  131. await _post_login(evt, state, resp.logged_in_user)
  132. async def enter_login_2fa(evt: CommandEvent) -> None:
  133. api: AndroidAPI = evt.sender.command_status["api"]
  134. state: AndroidState = evt.sender.command_status["state"]
  135. identifier = evt.sender.command_status["2fa_identifier"]
  136. username = evt.sender.command_status["username"]
  137. is_totp = evt.sender.command_status["is_totp"]
  138. has_sms = evt.sender.command_status["has_sms"]
  139. code = "".join(evt.args).lower()
  140. if has_sms and code == "resend-sms":
  141. try:
  142. resp = await api.send_two_factor_login_sms(username, identifier=identifier)
  143. except Exception as e:
  144. evt.log.exception("Failed to re-request SMS code")
  145. await evt.reply(f"Failed to re-request SMS code: {e}")
  146. else:
  147. await evt.reply(
  148. f"Re-requested SMS code to {resp.two_factor_info.obfuscated_phone_number}"
  149. )
  150. evt.sender.command_status[
  151. "2fa_identifier"
  152. ] = resp.two_factor_info.two_factor_identifier
  153. evt.sender.command_status["is_totp"] = False
  154. return
  155. try:
  156. resp = await api.two_factor_login(
  157. username, code=code, identifier=identifier, is_totp=is_totp
  158. )
  159. except IGBad2FACodeError:
  160. await evt.reply(
  161. "Invalid 2-factor authentication code. Please try again "
  162. "or use `$cmdprefix+sp cancel` to cancel."
  163. )
  164. except IGChallengeError:
  165. await api.challenge_auto(reset=True)
  166. evt.sender.command_status = {
  167. **evt.sender.command_status,
  168. "next": enter_login_security_code,
  169. }
  170. await evt.reply(
  171. "2-factor authentication code accepted, but Instagram wants to verify it's"
  172. " really you. Please confirm the login and enter the security code here."
  173. )
  174. except Exception as e:
  175. evt.log.exception("Failed to log in")
  176. await evt.reply(f"Failed to log in: {e}")
  177. evt.sender.command_status = None
  178. else:
  179. evt.sender.command_status = None
  180. await _post_login(evt, state, resp.logged_in_user)
  181. async def enter_login_security_code(evt: CommandEvent) -> None:
  182. api: AndroidAPI = evt.sender.command_status["api"]
  183. state: AndroidState = evt.sender.command_status["state"]
  184. try:
  185. resp = await api.challenge_send_security_code("".join(evt.args))
  186. except IGChallengeWrongCodeError as e:
  187. await evt.reply(f"Incorrect security code: {e}")
  188. except Exception as e:
  189. evt.log.exception("Failed to log in")
  190. await evt.reply(f"Failed to log in: {e}")
  191. evt.sender.command_status = None
  192. else:
  193. if not resp.logged_in_user:
  194. evt.log.error(
  195. f"Didn't get logged_in_user in challenge response "
  196. f"after entering security code: {resp.serialize()}"
  197. )
  198. await evt.reply("An unknown error occurred. Please check the bridge logs.")
  199. return
  200. evt.sender.command_status = None
  201. await _post_login(evt, state, resp.logged_in_user)
  202. async def _post_login(evt: CommandEvent, state: AndroidState, user: BaseResponseUser) -> None:
  203. evt.sender.state = state
  204. pl = state.device.payload
  205. manufacturer, model = pl["manufacturer"], pl["model"]
  206. await evt.reply(
  207. f"Successfully logged in as {user.full_name} ([@{user.username}]"
  208. f"(https://instagram.com/{user.username}), user ID: {user.pk}).\n\n"
  209. f"The bridge will show up on Instagram as {manufacturer} {model}."
  210. )
  211. await evt.sender.try_connect()
  212. @command_handler(
  213. needs_auth=True,
  214. help_section=SECTION_AUTH,
  215. help_text="Disconnect the bridge from your Instagram account",
  216. )
  217. async def logout(evt: CommandEvent) -> None:
  218. await evt.sender.logout()
  219. await evt.reply("Successfully logged out")
  220. @command_handler(
  221. needs_auth=False,
  222. management_only=True,
  223. help_section=SECTION_AUTH,
  224. help_text="Log into Instagram with a pre-generated session blob",
  225. help_args="<_blob_>",
  226. )
  227. async def login_blob(evt: CommandEvent) -> EventID:
  228. if await evt.sender.is_logged_in():
  229. return await evt.reply("You're already logged in")
  230. elif len(evt.args) < 1:
  231. return await evt.reply("**Usage:** `$cmdprefix+sp login-blob <blob>`")
  232. await evt.redact()
  233. try:
  234. state = AndroidState.parse_json(zlib.decompress(base64.b64decode("".join(evt.args))))
  235. except Exception:
  236. evt.log.exception(f"{evt.sender} provided an invalid login blob")
  237. return await evt.reply("Invalid blob")
  238. evt.sender.state = state
  239. await evt.reply("Connecting...")
  240. await evt.sender.try_connect()
  241. await evt.reply("Maybe connected now, try pinging?")