auth.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2022 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. import hashlib
  18. import hmac
  19. from mauigpapi.errors import (
  20. IGBad2FACodeError,
  21. IGChallengeError,
  22. IGChallengeWrongCodeError,
  23. IGLoginBadPasswordError,
  24. IGLoginInvalidUserError,
  25. IGLoginTwoFactorRequiredError,
  26. )
  27. from mauigpapi.http import AndroidAPI
  28. from mauigpapi.state import AndroidState
  29. from mauigpapi.types import BaseResponseUser
  30. from mautrix.bridge.commands import HelpSection, command_handler
  31. from .. import user as u
  32. from .typehint import CommandEvent
# Help section shared by the authentication commands in this module; the
# ``login`` and ``logout`` handlers pass it as ``help_section``.
# NOTE(review): the 10 and "" are presumably the section's sort order and
# description -- confirm against mautrix's HelpSection definition.
SECTION_AUTH = HelpSection("Authentication", 10, "")
  34. async def get_login_state(user: u.User, seed: str) -> tuple[AndroidAPI, AndroidState]:
  35. if user.command_status and user.command_status["action"] == "Login":
  36. api: AndroidAPI = user.command_status["api"]
  37. state: AndroidState = user.command_status["state"]
  38. else:
  39. state = AndroidState()
  40. seed = hmac.new(seed.encode("utf-8"), user.mxid.encode("utf-8"), hashlib.sha256).digest()
  41. state.device.generate(seed)
  42. api = AndroidAPI(state, log=user.api_log, proxy_handler=user.proxy_handler)
  43. await api.qe_sync_login_experiments()
  44. user.command_status = {
  45. "action": "Login",
  46. "state": state,
  47. "api": api,
  48. }
  49. return api, state
  50. @command_handler(
  51. needs_auth=False,
  52. management_only=True,
  53. help_section=SECTION_AUTH,
  54. help_text="Log in to Instagram",
  55. help_args="<_username_> <_password_>",
  56. )
  57. async def login(evt: CommandEvent) -> None:
  58. if await evt.sender.is_logged_in():
  59. await evt.reply("You're already logged in")
  60. return
  61. elif len(evt.args) < 2:
  62. await evt.reply("**Usage:** `$cmdprefix+sp login <username> <password>`")
  63. return
  64. username = evt.args[0]
  65. password = " ".join(evt.args[1:])
  66. await evt.redact()
  67. api, state = await get_login_state(evt.sender, evt.config["instagram.device_seed"])
  68. try:
  69. resp = await api.login(username, password)
  70. except IGLoginTwoFactorRequiredError as e:
  71. tfa_info = e.body.two_factor_info
  72. msg = "Username and password accepted, but you have two-factor authentication enabled.\n"
  73. if tfa_info.totp_two_factor_on:
  74. msg += "Send the code from your authenticator app here."
  75. elif tfa_info.sms_two_factor_on:
  76. msg += f"Send the code sent to {tfa_info.obfuscated_phone_number} here."
  77. else:
  78. msg += (
  79. "Unfortunately, none of your two-factor authentication methods are currently "
  80. "supported by the bridge."
  81. )
  82. return
  83. evt.sender.command_status = {
  84. **evt.sender.command_status,
  85. "next": enter_login_2fa,
  86. "username": tfa_info.username,
  87. "is_totp": tfa_info.totp_two_factor_on,
  88. "2fa_identifier": tfa_info.two_factor_identifier,
  89. }
  90. await evt.reply(msg)
  91. except IGChallengeError:
  92. await api.challenge_auto(reset=True)
  93. evt.sender.command_status = {
  94. **evt.sender.command_status,
  95. "next": enter_login_security_code,
  96. }
  97. await evt.reply(
  98. "Username and password accepted, but Instagram wants to verify it's really"
  99. " you. Please confirm the login and enter the security code here."
  100. )
  101. except IGLoginInvalidUserError:
  102. await evt.reply("Invalid username")
  103. except IGLoginBadPasswordError:
  104. await evt.reply("Incorrect password")
  105. except Exception as e:
  106. evt.log.exception("Failed to log in")
  107. await evt.reply(f"Failed to log in: {e}")
  108. else:
  109. await _post_login(evt, state, resp.logged_in_user)
  110. async def enter_login_2fa(evt: CommandEvent) -> None:
  111. api: AndroidAPI = evt.sender.command_status["api"]
  112. state: AndroidState = evt.sender.command_status["state"]
  113. identifier = evt.sender.command_status["2fa_identifier"]
  114. username = evt.sender.command_status["username"]
  115. is_totp = evt.sender.command_status["is_totp"]
  116. try:
  117. resp = await api.two_factor_login(
  118. username, code="".join(evt.args), identifier=identifier, is_totp=is_totp
  119. )
  120. except IGBad2FACodeError:
  121. await evt.reply(
  122. "Invalid 2-factor authentication code. Please try again "
  123. "or use `$cmdprefix+sp cancel` to cancel."
  124. )
  125. except IGChallengeError:
  126. await api.challenge_auto(reset=True)
  127. evt.sender.command_status = {
  128. **evt.sender.command_status,
  129. "next": enter_login_security_code,
  130. }
  131. await evt.reply(
  132. "2-factor authentication code accepted, but Instagram wants to verify it's"
  133. " really you. Please confirm the login and enter the security code here."
  134. )
  135. except Exception as e:
  136. evt.log.exception("Failed to log in")
  137. await evt.reply(f"Failed to log in: {e}")
  138. evt.sender.command_status = None
  139. else:
  140. evt.sender.command_status = None
  141. await _post_login(evt, state, resp.logged_in_user)
  142. async def enter_login_security_code(evt: CommandEvent) -> None:
  143. api: AndroidAPI = evt.sender.command_status["api"]
  144. state: AndroidState = evt.sender.command_status["state"]
  145. try:
  146. resp = await api.challenge_send_security_code("".join(evt.args))
  147. except IGChallengeWrongCodeError as e:
  148. await evt.reply(f"Incorrect security code: {e}")
  149. except Exception as e:
  150. evt.log.exception("Failed to log in")
  151. await evt.reply(f"Failed to log in: {e}")
  152. evt.sender.command_status = None
  153. else:
  154. if not resp.logged_in_user:
  155. evt.log.error(
  156. f"Didn't get logged_in_user in challenge response "
  157. f"after entering security code: {resp.serialize()}"
  158. )
  159. await evt.reply("An unknown error occurred. Please check the bridge logs.")
  160. return
  161. evt.sender.command_status = None
  162. await _post_login(evt, state, resp.logged_in_user)
  163. async def _post_login(evt: CommandEvent, state: AndroidState, user: BaseResponseUser) -> None:
  164. evt.sender.state = state
  165. pl = state.device.payload
  166. manufacturer, model = pl["manufacturer"], pl["model"]
  167. await evt.reply(
  168. f"Successfully logged in as {user.full_name} ([@{user.username}]"
  169. f"(https://instagram.com/{user.username}), user ID: {user.pk}).\n\n"
  170. f"The bridge will show up on Instagram as {manufacturer} {model}."
  171. )
  172. await evt.sender.try_connect()
  173. @command_handler(
  174. needs_auth=True,
  175. help_section=SECTION_AUTH,
  176. help_text="Disconnect the bridge from your Instagram account",
  177. )
  178. async def logout(evt: CommandEvent) -> None:
  179. await evt.sender.logout()
  180. await evt.reply("Successfully logged out")