|
@@ -13,7 +13,7 @@
|
|
#
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
-from typing import Awaitable, Dict
|
|
|
|
|
|
+from typing import Awaitable, Dict, Tuple
|
|
import logging
|
|
import logging
|
|
import asyncio
|
|
import asyncio
|
|
import json
|
|
import json
|
|
@@ -23,8 +23,9 @@ from aiohttp import web
|
|
from mauigpapi import AndroidState, AndroidAPI
|
|
from mauigpapi import AndroidState, AndroidAPI
|
|
from mauigpapi.types import BaseResponseUser
|
|
from mauigpapi.types import BaseResponseUser
|
|
from mauigpapi.errors import (IGLoginTwoFactorRequiredError, IGLoginBadPasswordError,
|
|
from mauigpapi.errors import (IGLoginTwoFactorRequiredError, IGLoginBadPasswordError,
|
|
- IGLoginInvalidUserError, IGBad2FACodeError, IGNotLoggedInError)
|
|
|
|
-from mautrix.types import UserID
|
|
|
|
|
|
+ IGLoginInvalidUserError, IGBad2FACodeError, IGNotLoggedInError,
|
|
|
|
+ IGCheckpointError, IGChallengeWrongCodeError)
|
|
|
|
+from mautrix.types import UserID, JSON
|
|
from mautrix.util.logging import TraceLogger
|
|
from mautrix.util.logging import TraceLogger
|
|
|
|
|
|
from ..commands.auth import get_login_state
|
|
from ..commands.auth import get_login_state
|
|
@@ -104,12 +105,7 @@ class ProvisioningAPI:
|
|
return web.json_response(data, headers=self._acao_headers)
|
|
return web.json_response(data, headers=self._acao_headers)
|
|
|
|
|
|
async def login(self, request: web.Request) -> web.Response:
|
|
async def login(self, request: web.Request) -> web.Response:
|
|
- user = await self.check_token(request)
|
|
|
|
-
|
|
|
|
- try:
|
|
|
|
- data = await request.json()
|
|
|
|
- except json.JSONDecodeError:
|
|
|
|
- raise web.HTTPBadRequest(text='{"error": "Malformed JSON"}', headers=self._headers)
|
|
|
|
|
|
+ user, data = await self._get_user(request, check_state=False)
|
|
|
|
|
|
try:
|
|
try:
|
|
username = data["username"]
|
|
username = data["username"]
|
|
@@ -124,18 +120,25 @@ class ProvisioningAPI:
|
|
return web.json_response(data={
|
|
return web.json_response(data={
|
|
"status": "two-factor",
|
|
"status": "two-factor",
|
|
"response": e.body.serialize(),
|
|
"response": e.body.serialize(),
|
|
- }, status=202, headers=self._headers)
|
|
|
|
|
|
+ }, status=202, headers=self._acao_headers)
|
|
|
|
+ except IGCheckpointError as e:
|
|
|
|
+ await api.challenge_auto(reset=True)
|
|
|
|
+ return web.json_response(data={
|
|
|
|
+ "status": "checkpoint",
|
|
|
|
+ "response": e.body.serialize(),
|
|
|
|
+ }, status=202, headers=self._acao_headers)
|
|
except IGLoginInvalidUserError:
|
|
except IGLoginInvalidUserError:
|
|
return web.json_response(data={"status": "invalid-username"},
|
|
return web.json_response(data={"status": "invalid-username"},
|
|
- status=404, headers=self._headers)
|
|
|
|
|
|
+ status=404, headers=self._acao_headers)
|
|
except IGLoginBadPasswordError:
|
|
except IGLoginBadPasswordError:
|
|
return web.json_response(data={"status": "incorrect-password"},
|
|
return web.json_response(data={"status": "incorrect-password"},
|
|
- status=403, headers=self._headers)
|
|
|
|
|
|
+ status=403, headers=self._acao_headers)
|
|
return await self._finish_login(user, api, state, resp.logged_in_user)
|
|
return await self._finish_login(user, api, state, resp.logged_in_user)
|
|
|
|
|
|
- async def login_2fa(self, request: web.Request) -> web.Response:
|
|
|
|
|
|
+ async def _get_user(self, request: web.Request, check_state: bool = False
|
|
|
|
+ ) -> Tuple['u.User', JSON]:
|
|
user = await self.check_token(request)
|
|
user = await self.check_token(request)
|
|
- if not user.command_status or user.command_status["action"] != "Login":
|
|
|
|
|
|
+ if check_state and (not user.command_status or user.command_status["action"] != "Login"):
|
|
raise web.HTTPNotFound(text='{"error": "No 2-factor login in progress}',
|
|
raise web.HTTPNotFound(text='{"error": "No 2-factor login in progress}',
|
|
headers=self._headers)
|
|
headers=self._headers)
|
|
|
|
|
|
@@ -143,6 +146,10 @@ class ProvisioningAPI:
|
|
data = await request.json()
|
|
data = await request.json()
|
|
except json.JSONDecodeError:
|
|
except json.JSONDecodeError:
|
|
raise web.HTTPBadRequest(text='{"error": "Malformed JSON"}', headers=self._headers)
|
|
raise web.HTTPBadRequest(text='{"error": "Malformed JSON"}', headers=self._headers)
|
|
|
|
+ return user, data
|
|
|
|
+
|
|
|
|
+ async def login_2fa(self, request: web.Request) -> web.Response:
|
|
|
|
+ user, data = await self._get_user(request, check_state=True)
|
|
|
|
|
|
try:
|
|
try:
|
|
username = data["username"]
|
|
username = data["username"]
|
|
@@ -157,14 +164,39 @@ class ProvisioningAPI:
|
|
try:
|
|
try:
|
|
resp = await api.two_factor_login(username, code=code, identifier=identifier,
|
|
resp = await api.two_factor_login(username, code=code, identifier=identifier,
|
|
is_totp=is_totp)
|
|
is_totp=is_totp)
|
|
- except IGBad2FACodeError as e:
|
|
|
|
|
|
+ except IGBad2FACodeError:
|
|
return web.json_response(data={
|
|
return web.json_response(data={
|
|
"status": "incorrect-2fa-code",
|
|
"status": "incorrect-2fa-code",
|
|
}, status=403, headers=self._acao_headers)
|
|
}, status=403, headers=self._acao_headers)
|
|
|
|
+ except IGCheckpointError as e:
|
|
|
|
+ await api.challenge_auto(reset=True)
|
|
|
|
+ return web.json_response(data={
|
|
|
|
+ "status": "checkpoint",
|
|
|
|
+ "response": e.body.serialize(),
|
|
|
|
+ }, status=202, headers=self._acao_headers)
|
|
|
|
+ return await self._finish_login(user, api, state, resp.logged_in_user)
|
|
|
|
+
|
|
|
|
+ async def login_checkpoint(self, request: web.Request) -> web.Response:
|
|
|
|
+ user, data = await self._get_user(request, check_state=True)
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ code = data["code"]
|
|
|
|
+ except KeyError:
|
|
|
|
+ raise web.HTTPBadRequest(text='{"error": "Missing keys"}', headers=self._headers)
|
|
|
|
+
|
|
|
|
+ api: AndroidAPI = user.command_status["api"]
|
|
|
|
+ state: AndroidState = user.command_status["state"]
|
|
|
|
+ try:
|
|
|
|
+ resp = await api.challenge_send_security_code(code=code)
|
|
|
|
+ except IGChallengeWrongCodeError:
|
|
|
|
+ return web.json_response(data={
|
|
|
|
+ "status": "incorrect-challenge-code",
|
|
|
|
+ }, status=403, headers=self._acao_headers)
|
|
return await self._finish_login(user, api, state, resp.logged_in_user)
|
|
return await self._finish_login(user, api, state, resp.logged_in_user)
|
|
|
|
|
|
async def _finish_login(self, user: 'u.User', api: AndroidAPI, state: AndroidState,
|
|
async def _finish_login(self, user: 'u.User', api: AndroidAPI, state: AndroidState,
|
|
resp_user: BaseResponseUser) -> web.Response:
|
|
resp_user: BaseResponseUser) -> web.Response:
|
|
|
|
+ await api.simulate_post_login_flow()
|
|
user.state = state
|
|
user.state = state
|
|
pl = state.device.payload
|
|
pl = state.device.payload
|
|
manufacturer, model = pl["manufacturer"], pl["model"]
|
|
manufacturer, model = pl["manufacturer"], pl["model"]
|