auth.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. # mautrix-instagram - A Matrix-Instagram DM puppeting bridge
  2. # Copyright (C) 2020 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from typing import Tuple, TYPE_CHECKING
  17. import hashlib
  18. import hmac
  19. from mautrix.bridge.commands import HelpSection, command_handler
  20. from mauigpapi.state import AndroidState
  21. from mauigpapi.http import AndroidAPI
  22. from mauigpapi.errors import (IGLoginTwoFactorRequiredError, IGLoginBadPasswordError,
  23. IGLoginInvalidUserError, IGBad2FACodeError, IGCheckpointError,
  24. IGChallengeWrongCodeError)
  25. from mauigpapi.types import BaseResponseUser
  26. from .typehint import CommandEvent
  27. if TYPE_CHECKING:
  28. from ..user import User
# Help section under which this module's commands (login, logout) are registered.
# NOTE(review): the second and third arguments look like a sort order and a
# description - confirm against mautrix's HelpSection definition.
SECTION_AUTH = HelpSection("Authentication", 10, "")
  30. async def get_login_state(user: 'User', username: str, seed: str
  31. ) -> Tuple[AndroidAPI, AndroidState]:
  32. if user.command_status and user.command_status["action"] == "Login":
  33. api: AndroidAPI = user.command_status["api"]
  34. state: AndroidState = user.command_status["state"]
  35. else:
  36. state = AndroidState()
  37. seed = hmac.new(seed.encode("utf-8"), username.encode("utf-8"), hashlib.sha256).digest()
  38. state.device.generate(seed)
  39. api = AndroidAPI(state, log=user.api_log)
  40. await api.qe_sync_login_experiments()
  41. user.command_status = {
  42. "action": "Login",
  43. "state": state,
  44. "api": api,
  45. }
  46. return api, state
  47. @command_handler(needs_auth=False, management_only=True, help_section=SECTION_AUTH,
  48. help_text="Log in to Instagram", help_args="<_username_> <_password_>")
  49. async def login(evt: CommandEvent) -> None:
  50. if await evt.sender.is_logged_in():
  51. await evt.reply("You're already logged in")
  52. return
  53. elif len(evt.args) < 2:
  54. await evt.reply("**Usage:** `$cmdprefix+sp login <username> <password>`")
  55. return
  56. username = evt.args[0]
  57. password = " ".join(evt.args[1:])
  58. api, state = await get_login_state(evt.sender, username, evt.config["instagram.device_seed"])
  59. try:
  60. resp = await api.login(username, password)
  61. except IGLoginTwoFactorRequiredError as e:
  62. tfa_info = e.body.two_factor_info
  63. msg = "Username and password accepted, but you have two-factor authentication enabled.\n"
  64. if tfa_info.totp_two_factor_on:
  65. msg += "Send the code from your authenticator app here."
  66. elif tfa_info.sms_two_factor_on:
  67. msg += f"Send the code sent to {tfa_info.obfuscated_phone_number} here."
  68. else:
  69. msg += ("Unfortunately, none of your two-factor authentication methods are currently "
  70. "supported by the bridge.")
  71. return
  72. evt.sender.command_status = {
  73. **evt.sender.command_status,
  74. "next": enter_login_2fa,
  75. "username": tfa_info.username,
  76. "is_totp": tfa_info.totp_two_factor_on,
  77. "2fa_identifier": tfa_info.two_factor_identifier,
  78. }
  79. await evt.reply(msg)
  80. except IGCheckpointError:
  81. await api.challenge_auto(reset=True)
  82. evt.sender.command_status = {
  83. **evt.sender.command_status,
  84. "next": enter_login_security_code,
  85. }
  86. await evt.reply("Username and password accepted, but Instagram wants to verify it's really"
  87. " you. Please confirm the login and enter the security code here.")
  88. except IGLoginInvalidUserError:
  89. await evt.reply("Invalid username")
  90. except IGLoginBadPasswordError:
  91. await evt.reply("Incorrect password")
  92. except Exception as e:
  93. evt.log.exception("Failed to log in")
  94. await evt.reply(f"Failed to log in: {e}")
  95. else:
  96. await _post_login(evt, state, resp.logged_in_user)
  97. async def enter_login_2fa(evt: CommandEvent) -> None:
  98. api: AndroidAPI = evt.sender.command_status["api"]
  99. state: AndroidState = evt.sender.command_status["state"]
  100. identifier = evt.sender.command_status["2fa_identifier"]
  101. username = evt.sender.command_status["username"]
  102. is_totp = evt.sender.command_status["is_totp"]
  103. try:
  104. resp = await api.two_factor_login(username, code="".join(evt.args), identifier=identifier,
  105. is_totp=is_totp)
  106. except IGBad2FACodeError:
  107. await evt.reply("Invalid 2-factor authentication code. Please try again "
  108. "or use `$cmdprefix+sp cancel` to cancel.")
  109. except IGCheckpointError:
  110. await api.challenge_auto(reset=True)
  111. evt.sender.command_status = {
  112. **evt.sender.command_status,
  113. "next": enter_login_security_code,
  114. }
  115. await evt.reply("2-factor authentication code accepted, but Instagram wants to verify it's"
  116. " really you. Please confirm the login and enter the security code here.")
  117. except Exception as e:
  118. evt.log.exception("Failed to log in")
  119. await evt.reply(f"Failed to log in: {e}")
  120. evt.sender.command_status = None
  121. else:
  122. evt.sender.command_status = None
  123. await _post_login(evt, state, resp.logged_in_user)
  124. async def enter_login_security_code(evt: CommandEvent) -> None:
  125. api: AndroidAPI = evt.sender.command_status["api"]
  126. state: AndroidState = evt.sender.command_status["state"]
  127. try:
  128. resp = await api.challenge_send_security_code("".join(evt.args))
  129. except IGChallengeWrongCodeError as e:
  130. await evt.reply(f"Incorrect security code: {e}")
  131. except Exception as e:
  132. evt.log.exception("Failed to log in")
  133. await evt.reply(f"Failed to log in: {e}")
  134. evt.sender.command_status = None
  135. else:
  136. if not resp.logged_in_user:
  137. evt.log.error(f"Didn't get logged_in_user in challenge response "
  138. f"after entering security code: {resp.serialize()}")
  139. await evt.reply("An unknown error occurred. Please check the bridge logs.")
  140. return
  141. evt.sender.command_status = None
  142. await _post_login(evt, state, resp.logged_in_user)
  143. async def _post_login(evt: CommandEvent, state: AndroidState, user: BaseResponseUser) -> None:
  144. evt.sender.state = state
  145. pl = state.device.payload
  146. manufacturer, model = pl["manufacturer"], pl["model"]
  147. await evt.reply(f"Successfully logged in as {user.full_name} ([@{user.username}]"
  148. f"(https://instagram.com/{user.username}), user ID: {user.pk}).\n\n"
  149. f"The bridge will show up on Instagram as {manufacturer} {model}.")
  150. await evt.sender.try_connect()
  151. @command_handler(needs_auth=True, help_section=SECTION_AUTH, help_text="Disconnect the bridge from"
  152. "your Instagram account")
  153. async def logout(evt: CommandEvent) -> None:
  154. await evt.sender.logout()
  155. await evt.reply("Successfully logged out")