challenge.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2022 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from ..errors import IGChallengeWrongCodeError, IGResponseError
  18. from ..types import ChallengeStateResponse
  19. from .base import BaseAndroidAPI
  20. class ChallengeAPI(BaseAndroidAPI):
  21. @property
  22. def __path(self) -> str:
  23. return f"/api/v1{self.state.challenge_path}"
  24. async def challenge_get_state(self) -> ChallengeStateResponse:
  25. query = {
  26. "guid": self.state.device.uuid,
  27. "device_id": self.state.device.id,
  28. }
  29. self.log.debug("Fetching current challenge state")
  30. return self.__handle_resp(
  31. await self.std_http_get(self.__path, query=query, response_type=ChallengeStateResponse)
  32. )
  33. async def challenge_select_method(
  34. self, choice: str, is_replay: bool = False
  35. ) -> ChallengeStateResponse:
  36. path = self.__path
  37. if is_replay:
  38. path = path.replace("/challenge/", "/challenge/replay/")
  39. req = {
  40. "choice": choice,
  41. "_csrftoken": self.state.cookies.csrf_token,
  42. "guid": self.state.device.uuid,
  43. "device_id": self.state.device.id,
  44. }
  45. self.log.debug(f"Selecting challenge method {choice} (replay: {is_replay})")
  46. return self.__handle_resp(
  47. await self.std_http_post(path, data=req, response_type=ChallengeStateResponse)
  48. )
  49. async def challenge_delta_review(self, was_me: bool = True) -> ChallengeStateResponse:
  50. return await self.challenge_select_method("0" if was_me else "1")
  51. async def challenge_send_phone_number(self, phone_number: str) -> ChallengeStateResponse:
  52. req = {
  53. "phone_number": phone_number,
  54. "_csrftoken": self.state.cookies.csrf_token,
  55. "guid": self.state.device.uuid,
  56. "device_id": self.state.device.id,
  57. }
  58. self.log.debug("Sending challenge phone number")
  59. return self.__handle_resp(
  60. await self.std_http_post(self.__path, data=req, response_type=ChallengeStateResponse)
  61. )
  62. async def challenge_send_security_code(self, code: str | int) -> ChallengeStateResponse:
  63. req = {
  64. "security_code": code,
  65. "_csrftoken": self.state.cookies.csrf_token,
  66. "guid": self.state.device.uuid,
  67. "device_id": self.state.device.id,
  68. }
  69. try:
  70. self.log.debug("Sending challenge security code")
  71. return self.__handle_resp(
  72. await self.std_http_post(
  73. self.__path, data=req, response_type=ChallengeStateResponse
  74. )
  75. )
  76. except IGResponseError as e:
  77. if e.response.status == 400:
  78. raise IGChallengeWrongCodeError((await e.response.json())["message"]) from e
  79. raise
  80. async def challenge_reset(self) -> ChallengeStateResponse:
  81. req = {
  82. "_csrftoken": self.state.cookies.csrf_token,
  83. "guid": self.state.device.uuid,
  84. "device_id": self.state.device.id,
  85. }
  86. self.log.debug("Resetting challenge")
  87. return self.__handle_resp(
  88. await self.std_http_post(
  89. self.__path.replace("/challenge/", "/challenge/reset/"),
  90. data=req,
  91. response_type=ChallengeStateResponse,
  92. )
  93. )
  94. async def challenge_auto(self, reset: bool = False) -> ChallengeStateResponse:
  95. if reset:
  96. await self.challenge_reset()
  97. challenge = self.state.challenge or await self.challenge_get_state()
  98. if challenge.step_name == "select_verify_method":
  99. self.log.debug(
  100. "Got select_verify_method challenge step, "
  101. f"auto-selecting {challenge.step_data.choice}"
  102. )
  103. return await self.challenge_select_method(challenge.step_data.choice)
  104. elif challenge.step_name == "delta_login_review":
  105. self.log.debug("Got delta_login_review challenge step, auto-selecting was_me=True")
  106. return await self.challenge_delta_review(was_me=True)
  107. else:
  108. self.log.debug(f"Got unknown challenge step {challenge.step_name}, not doing anything")
  109. return challenge
  110. def __handle_resp(self, resp: ChallengeStateResponse) -> ChallengeStateResponse:
  111. if resp.action == "close":
  112. self.log.debug(
  113. f"Challenge closed (step: {resp.step_name}, has user: {bool(resp.logged_in_user)})"
  114. )
  115. self.state.challenge = None
  116. self.state.challenge_path = None
  117. else:
  118. self.state.challenge = resp
  119. return resp