login.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2022 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. import base64
  18. import io
  19. import json
  20. import struct
  21. import time
  22. import uuid
  23. from Crypto.Cipher import AES, PKCS1_v1_5
  24. from Crypto.PublicKey import RSA
  25. from Crypto.Random import get_random_bytes
  26. from ..types import FacebookLoginResponse, LoginErrorResponse, LoginResponse, LogoutResponse
  27. from .base import BaseAndroidAPI
  28. class LoginAPI(BaseAndroidAPI):
  29. async def get_mobile_config(self) -> None:
  30. req = {
  31. "bool_opt_policy": "0",
  32. "mobileconfigsessionless": "",
  33. "api_version": "3",
  34. "unit_type": "1",
  35. "query_hash": "dae17f1d3276207ebfe78f7a67cc9a04d4b88ff8c88dfc17e148fafb3f655b8e",
  36. "device_id": self.state.device.id,
  37. "fetch_type": "ASYNC_FULL",
  38. "family_device_id": self.state.device.fdid.upper(),
  39. }
  40. await self.std_http_post("/api/v1/launcher/mobileconfig/", data=req)
  41. async def login(
  42. self,
  43. username: str,
  44. password: str | None = None,
  45. encrypted_password: str | None = None,
  46. ) -> LoginResponse:
  47. if password:
  48. if encrypted_password:
  49. raise ValueError("Only one of password or encrypted_password must be provided")
  50. encrypted_password = self._encrypt_password(password)
  51. elif not encrypted_password:
  52. raise ValueError("One of password or encrypted_password is required")
  53. req = {
  54. "username": username,
  55. "enc_password": encrypted_password,
  56. "guid": self.state.device.uuid,
  57. "phone_id": self.state.device.phone_id,
  58. "device_id": self.state.device.id,
  59. "adid": self.state.device.adid,
  60. "google_tokens": "[]",
  61. "login_attempt_count": "0", # TODO maybe cache this somewhere?
  62. "country_codes": json.dumps([{"country_code": "1", "source": "default"}]),
  63. "jazoest": self._jazoest,
  64. }
  65. return await self.std_http_post(
  66. "/api/v1/accounts/login/", data=req, response_type=LoginResponse
  67. )
  68. async def one_tap_app_login(self, user_id: str, nonce: str) -> LoginResponse:
  69. req = {
  70. "phone_id": self.state.device.phone_id,
  71. "user_id": user_id,
  72. "adid": self.state.device.adid,
  73. "guid": self.state.device.uuid,
  74. "device_id": self.state.device.id,
  75. "login_nonce": nonce,
  76. }
  77. return await self.std_http_post(
  78. "/api/v1/accounts/one_tap_app_login/", data=req, response_type=LoginResponse
  79. )
  80. async def send_two_factor_login_sms(
  81. self, username: str, identifier: str
  82. ) -> LoginErrorResponse:
  83. req = {
  84. "two_factor_identifier": identifier,
  85. "username": username,
  86. "guid": self.state.device.uuid,
  87. "device_id": self.state.device.id,
  88. }
  89. return await self.std_http_post(
  90. "/api/v1/accounts/send_two_factor_login_sms/",
  91. data=req,
  92. response_type=LoginErrorResponse,
  93. )
  94. async def two_factor_login(
  95. self,
  96. username: str,
  97. code: str,
  98. identifier: str,
  99. trust_device: bool = True,
  100. is_totp: bool = True,
  101. ) -> LoginResponse:
  102. req = {
  103. "verification_code": code,
  104. "two_factor_identifier": identifier,
  105. "username": username,
  106. "trust_this_device": "1" if trust_device else "0",
  107. "guid": self.state.device.uuid,
  108. "device_id": self.state.device.id,
  109. # TOTP = 3, Backup code = 2, SMS = 1
  110. "verification_method": "3" if is_totp else "1",
  111. }
  112. return await self.std_http_post(
  113. "/api/v1/accounts/two_factor_login/", data=req, response_type=LoginResponse
  114. )
  115. # async def two_factor_trusted_status(self, username: str, identifier: str, polling_nonce: str):
  116. # pass
  117. async def facebook_signup(self, fb_access_token: str) -> FacebookLoginResponse:
  118. req = {
  119. "jazoest": self._jazoest,
  120. "dryrun": "true",
  121. "fb_req_flag": "false",
  122. "phone_id": self.state.device.phone_id,
  123. "force_signup_with_fb_after_cp_claiming": "false",
  124. "adid": self.state.device.adid,
  125. "guid": self.state.device.uuid,
  126. "device_id": self.state.device.id,
  127. # "waterfall_id": uuid4(),
  128. "fb_access_token": fb_access_token,
  129. }
  130. return await self.std_http_post(
  131. "/api/v1/fb/facebook_signup/", data=req, response_type=FacebookLoginResponse
  132. )
  133. async def logout(self, one_tap_app_login: bool | None = None) -> LogoutResponse:
  134. req = {
  135. "guid": self.state.device.uuid,
  136. "phone_id": self.state.device.phone_id,
  137. "device_id": self.state.device.id,
  138. "_uuid": self.state.device.uuid,
  139. "one_tap_app_login": one_tap_app_login,
  140. }
  141. return await self.std_http_post(
  142. "/api/v1/accounts/logout/", data=req, response_type=LogoutResponse
  143. )
  144. async def change_password(self, old_password: str, new_password: str):
  145. return self.change_password_encrypted(
  146. old_password=self._encrypt_password(old_password),
  147. new_password1=self._encrypt_password(new_password),
  148. new_password2=self._encrypt_password(new_password),
  149. )
  150. async def change_password_encrypted(
  151. self, old_password: str, new_password1: str, new_password2: str
  152. ):
  153. req = {
  154. "_csrftoken": self.state.cookies.csrf_token,
  155. "_uid": self.state.session.ds_user_id,
  156. "_uuid": self.state.device.uuid,
  157. "enc_old_password": old_password,
  158. "enc_new_password1": new_password1,
  159. "enc_new_password2": new_password2,
  160. }
  161. # TODO parse response content
  162. return await self.std_http_post("/api/v1/accounts/change_password/", data=req)
  163. def _encrypt_password(self, password: str) -> str:
  164. # Key and IV for AES encryption
  165. rand_key = get_random_bytes(32)
  166. iv = get_random_bytes(12)
  167. # Encrypt AES key with Instagram's RSA public key
  168. pubkey_bytes = base64.b64decode(self.state.session.password_encryption_pubkey)
  169. pubkey = RSA.import_key(pubkey_bytes)
  170. cipher_rsa = PKCS1_v1_5.new(pubkey)
  171. encrypted_rand_key = cipher_rsa.encrypt(rand_key)
  172. cipher_aes = AES.new(rand_key, AES.MODE_GCM, nonce=iv)
  173. # Add the current time to the additional authenticated data (AAD) section
  174. current_time = int(time.time())
  175. cipher_aes.update(str(current_time).encode("utf-8"))
  176. # Encrypt the password and get the AES MAC auth tag
  177. encrypted_passwd, auth_tag = cipher_aes.encrypt_and_digest(password.encode("utf-8"))
  178. buf = io.BytesIO()
  179. # 1 is presumably the version
  180. buf.write(bytes([1, int(self.state.session.password_encryption_key_id)]))
  181. buf.write(iv)
  182. # Length of the encrypted AES key as a little-endian 16-bit int
  183. buf.write(struct.pack("<h", len(encrypted_rand_key)))
  184. buf.write(encrypted_rand_key)
  185. buf.write(auth_tag)
  186. buf.write(encrypted_passwd)
  187. encoded = base64.b64encode(buf.getvalue()).decode("utf-8")
  188. return f"#PWD_INSTAGRAM:4:{current_time}:{encoded}"
  189. @property
  190. def _jazoest(self) -> str:
  191. return f"2{sum(ord(i) for i in self.state.device.phone_id)}"