# mautrix-instagram - A Matrix-Instagram puppeting bridge.
# Copyright (C) 2020 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
  16. from typing import Optional
  17. import base64
  18. import struct
  19. import time
  20. import json
  21. import io
  22. from Crypto.PublicKey import RSA
  23. from Crypto.Cipher import PKCS1_OAEP, AES
  24. from Crypto.Random import get_random_bytes
  25. from ..types import LoginResponse, LoginResponseUser, LogoutResponse
  26. from .base import BaseAndroidAPI
  27. class LoginAPI(BaseAndroidAPI):
  28. async def login(self, username: str, password: Optional[str] = None,
  29. encrypted_password: Optional[str] = None) -> LoginResponse:
  30. if password:
  31. if encrypted_password:
  32. raise ValueError("Only one of password or encrypted_password must be provided")
  33. encrypted_password = self._encrypt_password(password)
  34. elif not encrypted_password:
  35. raise ValueError("One of password or encrypted_password is required")
  36. req = {
  37. "username": username,
  38. "enc_password": encrypted_password,
  39. "guid": self.state.device.uuid,
  40. "phone_id": self.state.device.phone_id,
  41. "_csrftoken": self.state.cookies.csrf_token,
  42. "device_id": self.state.device.id,
  43. "adid": "", # not set on pre-login
  44. "google_tokens": "[]",
  45. "login_attempt_count": 0, # TODO maybe cache this somewhere?
  46. "country_codes": json.dumps([{"country_code": "1", "source": "default"}]),
  47. "jazoest": self._jazoest,
  48. }
  49. resp = await self.http.post(url=self.url / "api/v1/accounts/login/", data=self.sign(req))
  50. return LoginResponse.deserialize(await self.handle_response(resp))
  51. async def one_tap_app_login(self, user_id: str, nonce: str) -> LoginResponse:
  52. req = {
  53. "phone_id": self.state.device.phone_id,
  54. "_csrftoken": self.state.cookies.csrf_token,
  55. "user_id": user_id,
  56. "adid": self.state.device.adid,
  57. "guid": self.state.device.uuid,
  58. "device_id": self.state.device.id,
  59. "login_nonce": nonce,
  60. }
  61. resp = await self.http.post(url=self.url / "api/v1/accounts/one_tap_app_login/",
  62. data=self.sign(req))
  63. return LoginResponse.deserialize(await self.handle_response(resp))
  64. async def two_factor_login(self, username: str, code: str, identifier: str,
  65. trust_device: bool = True, method: Optional[str] = "1"
  66. ) -> LoginResponseUser:
  67. req = {
  68. "verification_code": code,
  69. "_csrftoken": self.state.cookies.csrf_token,
  70. "two_factor_identifier": identifier,
  71. "username": username,
  72. "trust_this_device": "1" if trust_device else "0",
  73. "guid": self.state.device.uuid,
  74. "device_id": self.state.device.id,
  75. "verification_method": method,
  76. }
  77. resp = await self.http.post(url=self.url / "api/v1/accounts/one_tap_app_login/",
  78. data=self.sign(req))
  79. return LoginResponseUser.deserialize(await self.handle_response(resp))
  80. async def logout(self, one_tap_app_login: Optional[bool] = None) -> LogoutResponse:
  81. req = {
  82. "guid": self.state.device.uuid,
  83. "phone_id": self.state.device.phone_id,
  84. "_csrftoken": self.state.cookies.csrf_token,
  85. "device_id": self.state.device.id,
  86. "_uuid": self.state.device.uuid,
  87. "one_tap_app_login": one_tap_app_login,
  88. }
  89. resp = await self.http.post(url=self.url / "api/v1/accounts/logout/",
  90. data=self.sign(req))
  91. return LogoutResponse.deserialize(await self.handle_response(resp))
  92. async def change_password(self, old_password: str, new_password: str):
  93. return self.change_password_encrypted(old_password=self._encrypt_password(old_password),
  94. new_password1=self._encrypt_password(new_password),
  95. new_password2=self._encrypt_password(new_password))
  96. async def change_password_encrypted(self, old_password: str, new_password1: str,
  97. new_password2: str):
  98. req = {
  99. "_csrftoken": self.state.cookies.csrf_token,
  100. "_uid": self.state.cookies.user_id,
  101. "_uuid": self.state.device.uuid,
  102. "enc_old_password": old_password,
  103. "enc_new_password1": new_password1,
  104. "enc_new_password2": new_password2,
  105. }
  106. resp = await self.http.post(self.url / "api/v1/accounts/change_password/",
  107. data=self.sign(req))
  108. # TODO parse response content
  109. return await self.handle_response(resp)
  110. def _encrypt_password(self, password: str) -> str:
  111. # Key and IV for AES encryption
  112. rand_key = get_random_bytes(32)
  113. iv = get_random_bytes(12)
  114. # Encrypt AES key with Instagram's RSA public key
  115. pubkey_bytes = base64.b64decode(self.state.session.password_encryption_pubkey)
  116. pubkey = RSA.import_key(pubkey_bytes)
  117. cipher_rsa = PKCS1_OAEP.new(pubkey)
  118. encrypted_rand_key = cipher_rsa.encrypt(rand_key)
  119. cipher_aes = AES.new(rand_key, AES.MODE_GCM, iv=iv)
  120. # Add the current time to the additional authenticated data (AAD) section
  121. current_time = int(time.time())
  122. cipher_aes.update(str(current_time).encode("utf-8"))
  123. # Encrypt the password and get the AES MAC auth tag
  124. encrypted_passwd, auth_tag = cipher_aes.encrypt_and_digest(password.encode("utf-8"))
  125. buf = io.BytesIO()
  126. # 1 is presumably the version
  127. buf.write(bytes([1, int(self.state.session.password_encryption_key_id)]))
  128. buf.write(iv)
  129. # Length of the encrypted AES key as a little-endian 16-bit int
  130. buf.write(struct.pack("<h", len(encrypted_rand_key)))
  131. buf.write(encrypted_rand_key)
  132. buf.write(auth_tag)
  133. buf.write(encrypted_passwd)
  134. encoded = base64.b64encode(buf.getvalue())
  135. return f"#PWD_INSTAGRAM:4:{current_time}:{encoded}"
  136. @property
  137. def _jazoest(self) -> str:
  138. return f"2{sum(ord(i) for i in self.state.device.phone_id)}"