challenge.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
# mautrix-instagram - A Matrix-Instagram puppeting bridge.
# Copyright (C) 2022 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from ..errors import IGChallengeWrongCodeError, IGResponseError
  18. from ..types import ChallengeStateResponse
  19. from .base import BaseAndroidAPI
  20. class ChallengeAPI(BaseAndroidAPI):
  21. @property
  22. def __path(self) -> str:
  23. return f"/api/v1{self.state.challenge_path}"
  24. async def challenge_get_state(self) -> ChallengeStateResponse:
  25. query = {
  26. "guid": self.state.device.uuid,
  27. "device_id": self.state.device.id,
  28. }
  29. if self.state.challenge_context:
  30. query["challenge_context"] = self.state.challenge_context.json()
  31. self.log.debug("Fetching current challenge state")
  32. return self.__handle_resp(
  33. await self.std_http_get(self.__path, query=query, response_type=ChallengeStateResponse)
  34. )
  35. async def challenge_select_method(
  36. self, choice: str, is_replay: bool = False
  37. ) -> ChallengeStateResponse:
  38. path = self.__path
  39. if is_replay:
  40. path = path.replace("/challenge/", "/challenge/replay/")
  41. req = {
  42. "choice": choice,
  43. "_csrftoken": self.state.cookies.csrf_token,
  44. "guid": self.state.device.uuid,
  45. "device_id": self.state.device.id,
  46. }
  47. self.log.debug(f"Selecting challenge method {choice} (replay: {is_replay})")
  48. return self.__handle_resp(
  49. await self.std_http_post(path, data=req, response_type=ChallengeStateResponse)
  50. )
  51. async def challenge_delta_review(self, was_me: bool = True) -> ChallengeStateResponse:
  52. return await self.challenge_select_method("0" if was_me else "1")
  53. async def challenge_send_phone_number(self, phone_number: str) -> ChallengeStateResponse:
  54. req = {
  55. "phone_number": phone_number,
  56. "_csrftoken": self.state.cookies.csrf_token,
  57. "guid": self.state.device.uuid,
  58. "device_id": self.state.device.id,
  59. }
  60. self.log.debug("Sending challenge phone number")
  61. return self.__handle_resp(
  62. await self.std_http_post(self.__path, data=req, response_type=ChallengeStateResponse)
  63. )
  64. async def challenge_send_security_code(self, code: str | int) -> ChallengeStateResponse:
  65. req = {
  66. "security_code": code,
  67. "_csrftoken": self.state.cookies.csrf_token,
  68. "guid": self.state.device.uuid,
  69. "device_id": self.state.device.id,
  70. }
  71. try:
  72. self.log.debug("Sending challenge security code")
  73. return self.__handle_resp(
  74. await self.std_http_post(
  75. self.__path, data=req, response_type=ChallengeStateResponse
  76. )
  77. )
  78. except IGResponseError as e:
  79. if e.response.status == 400:
  80. raise IGChallengeWrongCodeError((await e.response.json())["message"]) from e
  81. raise
  82. async def challenge_reset(self) -> ChallengeStateResponse:
  83. req = {
  84. "_csrftoken": self.state.cookies.csrf_token,
  85. "guid": self.state.device.uuid,
  86. "device_id": self.state.device.id,
  87. }
  88. self.log.debug("Resetting challenge")
  89. return self.__handle_resp(
  90. await self.std_http_post(
  91. self.__path.replace("/challenge/", "/challenge/reset/"),
  92. data=req,
  93. response_type=ChallengeStateResponse,
  94. )
  95. )
  96. async def challenge_auto(self, reset: bool = False) -> ChallengeStateResponse:
  97. if reset:
  98. await self.challenge_reset()
  99. challenge = self.state.challenge or await self.challenge_get_state()
  100. if challenge.step_name == "select_verify_method":
  101. self.log.debug(
  102. "Got select_verify_method challenge step, "
  103. f"auto-selecting {challenge.step_data.choice}"
  104. )
  105. return await self.challenge_select_method(challenge.step_data.choice)
  106. elif challenge.step_name == "delta_login_review":
  107. self.log.debug("Got delta_login_review challenge step, auto-selecting was_me=True")
  108. return await self.challenge_delta_review(was_me=True)
  109. else:
  110. self.log.debug(f"Got unknown challenge step {challenge.step_name}, not doing anything")
  111. return challenge
  112. def __handle_resp(self, resp: ChallengeStateResponse) -> ChallengeStateResponse:
  113. if resp.action == "close":
  114. self.log.debug(
  115. f"Challenge closed (step: {resp.step_name}, has user: {bool(resp.logged_in_user)})"
  116. )
  117. self.state.challenge = None
  118. self.state.challenge_context = None
  119. self.state.challenge_path = None
  120. else:
  121. self.state.challenge = resp
  122. return resp