challenge.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2022 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from ..errors import IGChallengeWrongCodeError, IGResponseError
  18. from ..types import ChallengeStateResponse
  19. from .base import BaseAndroidAPI
  class ChallengeAPI(BaseAndroidAPI):
      """Requests against the challenge endpoint named by ``self.state.challenge_path``.

      Every ``ChallengeStateResponse`` obtained here is passed through
      ``__handle_resp`` so that ``self.state.challenge`` tracks the most recent
      response.
      """

      @property
      def __path(self) -> str:
          """Return the stored challenge path prefixed with ``/api/v1``."""
          # NOTE(review): ``__handle_resp`` sets ``challenge_path`` to None after a
          # "close" response; this f-string would then render "/api/v1None".
          # Presumably callers stop issuing challenge requests at that point -- confirm.
          return f"/api/v1{self.state.challenge_path}"

      def __device_ids(self) -> dict:
          """Build the ``guid``/``device_id`` fields sent with every challenge request."""
          device = self.state.device
          return {"guid": device.uuid, "device_id": device.id}

      async def challenge_get_state(self) -> ChallengeStateResponse:
          """GET the challenge path and record the returned state.

          Returns:
              The parsed ``ChallengeStateResponse``.
          """
          params = self.__device_ids()
          state_resp = await self.std_http_get(
              self.__path,
              query=params,
              response_type=ChallengeStateResponse,
          )
          return self.__handle_resp(state_resp)

      async def challenge_select_method(
          self, choice: str, is_replay: bool = False
      ) -> ChallengeStateResponse:
          """POST a ``choice`` value to the challenge path.

          Args:
              choice: Value sent in the ``choice`` form field.
              is_replay: When true, ``/challenge/`` in the path is rewritten to
                  ``/challenge/replay/`` before the request is made.

          Returns:
              The parsed ``ChallengeStateResponse``.
          """
          base_path = self.__path
          endpoint = (
              base_path.replace("/challenge/", "/challenge/replay/") if is_replay else base_path
          )
          fields = {
              "choice": choice,
              "_csrftoken": self.state.cookies.csrf_token,
              **self.__device_ids(),
          }
          state_resp = await self.std_http_post(
              endpoint, data=fields, response_type=ChallengeStateResponse
          )
          return self.__handle_resp(state_resp)

      async def challenge_delta_review(self, was_me: bool = True) -> ChallengeStateResponse:
          """Select choice ``"0"`` when ``was_me`` is true, otherwise ``"1"``."""
          selected = "0" if was_me else "1"
          return await self.challenge_select_method(selected)

      async def challenge_send_phone_number(self, phone_number: str) -> ChallengeStateResponse:
          """POST ``phone_number`` to the challenge path and record the returned state."""
          fields = {
              "phone_number": phone_number,
              "_csrftoken": self.state.cookies.csrf_token,
              **self.__device_ids(),
          }
          state_resp = await self.std_http_post(
              self.__path, data=fields, response_type=ChallengeStateResponse
          )
          return self.__handle_resp(state_resp)

      async def challenge_send_security_code(self, code: str | int) -> ChallengeStateResponse:
          """POST ``security_code`` to the challenge path and record the returned state.

          Raises:
              IGChallengeWrongCodeError: If the request fails with an
                  ``IGResponseError`` whose response status is 400. The error text
                  is the ``message`` field of the response JSON.
              IGResponseError: Re-raised unchanged for any other status.
          """
          fields = {
              "security_code": code,
              "_csrftoken": self.state.cookies.csrf_token,
              **self.__device_ids(),
          }
          try:
              state_resp = await self.std_http_post(
                  self.__path, data=fields, response_type=ChallengeStateResponse
              )
              return self.__handle_resp(state_resp)
          except IGResponseError as err:
              if err.response.status != 400:
                  raise
              error_body = await err.response.json()
              raise IGChallengeWrongCodeError(error_body["message"]) from err

      async def challenge_reset(self) -> ChallengeStateResponse:
          """POST to the ``/challenge/reset/`` variant of the challenge path."""
          fields = {
              "_csrftoken": self.state.cookies.csrf_token,
              **self.__device_ids(),
          }
          reset_endpoint = self.__path.replace("/challenge/", "/challenge/reset/")
          state_resp = await self.std_http_post(
              reset_endpoint,
              data=fields,
              response_type=ChallengeStateResponse,
          )
          return self.__handle_resp(state_resp)

      async def challenge_auto(self, reset: bool = False) -> ChallengeStateResponse:
          """Answer the current challenge step automatically where a default exists.

          Args:
              reset: When true, ``challenge_reset`` is called first.

          Returns:
              The response of the follow-up request for the
              ``select_verify_method`` step (answered with ``step_data.choice``)
              or the ``delta_login_review`` step (answered with ``was_me=True``).
              For any other step the current challenge state is returned as-is.
          """
          if reset:
              await self.challenge_reset()

          # Use the stored state when there is one; only fetch when it is missing.
          current = self.state.challenge
          if not current:
              current = await self.challenge_get_state()

          step = current.step_name
          if step == "select_verify_method":
              return await self.challenge_select_method(current.step_data.choice)
          if step == "delta_login_review":
              return await self.challenge_delta_review(was_me=True)
          return current

      def __handle_resp(self, resp: ChallengeStateResponse) -> ChallengeStateResponse:
          """Mirror ``resp`` into ``self.state`` and hand it back to the caller.

          An ``action`` of ``"close"`` clears both ``challenge`` and
          ``challenge_path`` on the state; any other response is stored as the
          current challenge.
          """
          is_closed = resp.action == "close"
          self.state.challenge = None if is_closed else resp
          if is_closed:
              self.state.challenge_path = None
          return resp