challenge.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2020 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from typing import Union
  17. from .base import BaseAndroidAPI
  18. from ..types import ChallengeStateResponse
  19. from ..errors import IGResponseError, IGChallengeWrongCodeError
  20. class ChallengeAPI(BaseAndroidAPI):
  21. @property
  22. def __path(self) -> str:
  23. return f"/api/v1{self.state.challenge_path}"
  24. async def challenge_get_state(self) -> ChallengeStateResponse:
  25. query = {
  26. "guid": self.state.device.uuid,
  27. "device_id": self.state.device.id,
  28. }
  29. return self.__handle_resp(await self.std_http_get(self.__path, query=query,
  30. response_type=ChallengeStateResponse))
  31. async def challenge_select_method(self, choice: str, is_replay: bool = False
  32. ) -> ChallengeStateResponse:
  33. path = self.__path
  34. if is_replay:
  35. path = path.replace("/challenge/", "/challenge/replay/")
  36. req = {
  37. "choice": choice,
  38. "_csrftoken": self.state.cookies.csrf_token,
  39. "guid": self.state.device.uuid,
  40. "device_id": self.state.device.id,
  41. }
  42. return self.__handle_resp(await self.std_http_post(path, data=req,
  43. response_type=ChallengeStateResponse))
  44. async def challenge_delta_review(self, was_me: bool = True) -> ChallengeStateResponse:
  45. return await self.challenge_select_method("0" if was_me else "1")
  46. async def challenge_send_phone_number(self, phone_number: str) -> ChallengeStateResponse:
  47. req = {
  48. "phone_number": phone_number,
  49. "_csrftoken": self.state.cookies.csrf_token,
  50. "guid": self.state.device.uuid,
  51. "device_id": self.state.device.id,
  52. }
  53. return self.__handle_resp(await self.std_http_post(self.__path, data=req,
  54. response_type=ChallengeStateResponse))
  55. async def challenge_send_security_code(self, code: Union[str, int]) -> ChallengeStateResponse:
  56. req = {
  57. "security_code": code,
  58. "_csrftoken": self.state.cookies.csrf_token,
  59. "guid": self.state.device.uuid,
  60. "device_id": self.state.device.id,
  61. }
  62. try:
  63. return self.__handle_resp(await self.std_http_post(
  64. self.__path, data=req, response_type=ChallengeStateResponse))
  65. except IGResponseError as e:
  66. if e.response.status == 400:
  67. raise IGChallengeWrongCodeError((await e.response.json())["message"]) from e
  68. raise
  69. async def challenge_reset(self) -> ChallengeStateResponse:
  70. req = {
  71. "_csrftoken": self.state.cookies.csrf_token,
  72. "guid": self.state.device.uuid,
  73. "device_id": self.state.device.id,
  74. }
  75. return self.__handle_resp(await self.std_http_post(
  76. self.__path.replace("/challenge/", "/challenge/reset/"),
  77. data=req, response_type=ChallengeStateResponse))
  78. async def challenge_auto(self, reset: bool = False) -> ChallengeStateResponse:
  79. if reset:
  80. await self.challenge_reset()
  81. challenge = self.state.challenge or await self.challenge_get_state()
  82. if challenge.step_name == "select_verify_method":
  83. return await self.challenge_select_method(challenge.step_data.choice)
  84. elif challenge.step_name == "delta_login_review":
  85. return await self.challenge_delta_review(was_me=True)
  86. return challenge
  87. def __handle_resp(self, resp: ChallengeStateResponse) -> ChallengeStateResponse:
  88. if resp.action == "close":
  89. self.state.challenge = None
  90. self.state.challenge_path = None
  91. else:
  92. self.state.challenge = resp
  93. return resp