|
@@ -1,5 +1,5 @@
|
|
|
# mautrix-instagram - A Matrix-Instagram puppeting bridge.
|
|
|
-# Copyright (C) 2020 Tulir Asokan
|
|
|
+# Copyright (C) 2022 Tulir Asokan
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
@@ -16,10 +16,15 @@
|
|
|
from __future__ import annotations
|
|
|
|
|
|
from typing import Awaitable
|
|
|
+from uuid import uuid4
|
|
|
import json
|
|
|
import logging
|
|
|
+import random
|
|
|
+import string
|
|
|
+import time
|
|
|
|
|
|
from aiohttp import web
|
|
|
+from yarl import URL
|
|
|
|
|
|
from mauigpapi import AndroidAPI, AndroidState
|
|
|
from mauigpapi.errors import (
|
|
@@ -51,10 +56,13 @@ class ProvisioningAPI:
|
|
|
self.app.router.add_options("/api/login", self.login_options)
|
|
|
self.app.router.add_options("/api/login/2fa", self.login_options)
|
|
|
self.app.router.add_options("/api/login/checkpoint", self.login_options)
|
|
|
+ self.app.router.add_options("/api/login/fb", self.login_options)
|
|
|
self.app.router.add_options("/api/logout", self.login_options)
|
|
|
self.app.router.add_post("/api/login", self.login)
|
|
|
self.app.router.add_post("/api/login/2fa", self.login_2fa)
|
|
|
self.app.router.add_post("/api/login/checkpoint", self.login_checkpoint)
|
|
|
+ self.app.router.add_get("/api/login/fb", self.get_fb_login_url)
|
|
|
+ self.app.router.add_post("/api/login/fb", self.post_fb_login_token)
|
|
|
self.app.router.add_post("/api/logout", self.logout)
|
|
|
|
|
|
@property
|
|
@@ -62,7 +70,7 @@ class ProvisioningAPI:
|
|
|
return {
|
|
|
"Access-Control-Allow-Origin": "*",
|
|
|
"Access-Control-Allow-Headers": "Authorization, Content-Type",
|
|
|
- "Access-Control-Allow-Methods": "POST, OPTIONS",
|
|
|
+ "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
|
|
|
}
|
|
|
|
|
|
@property
|
|
@@ -145,7 +153,7 @@ class ProvisioningAPI:
|
|
|
raise self._missing_key_error(e)
|
|
|
|
|
|
self.log.debug("%s is attempting to log in as %s", user.mxid, username)
|
|
|
- api, state = await get_login_state(user, username, self.device_seed)
|
|
|
+ api, state = await get_login_state(user, self.device_seed)
|
|
|
try:
|
|
|
resp = await api.login(username, password)
|
|
|
except IGLoginTwoFactorRequiredError as e:
|
|
@@ -178,7 +186,7 @@ class ProvisioningAPI:
|
|
|
return await self._finish_login(user, state, api, login_resp=resp, after="password")
|
|
|
|
|
|
async def _get_user(
|
|
|
- self, request: web.Request, check_state: bool = False
|
|
|
+ self, request: web.Request, check_state: bool = False, read_body: bool = True
|
|
|
) -> tuple[u.User, JSON]:
|
|
|
user = await self.check_token(request)
|
|
|
if check_state and (not user.command_status or user.command_status["action"] != "Login"):
|
|
@@ -186,10 +194,13 @@ class ProvisioningAPI:
|
|
|
text='{"error": "No 2-factor login in progress"}', headers=self._headers
|
|
|
)
|
|
|
|
|
|
- try:
|
|
|
- data = await request.json()
|
|
|
- except json.JSONDecodeError:
|
|
|
- raise web.HTTPBadRequest(text='{"error": "Malformed JSON"}', headers=self._headers)
|
|
|
+ if read_body:
|
|
|
+ try:
|
|
|
+ data = await request.json()
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ raise web.HTTPBadRequest(text='{"error": "Malformed JSON"}', headers=self._headers)
|
|
|
+ else:
|
|
|
+ data = None
|
|
|
return user, data
|
|
|
|
|
|
async def login_2fa(self, request: web.Request) -> web.Response:
|
|
@@ -320,7 +331,7 @@ class ProvisioningAPI:
|
|
|
raise
|
|
|
self.log.debug("%s got a checkpoint after a login that looked successful", user.mxid)
|
|
|
return await self.start_checkpoint(user, api, e)
|
|
|
- await user.connect()
|
|
|
+ await user.connect(user=resp.user)
|
|
|
return web.json_response(
|
|
|
data={
|
|
|
"status": "logged-in",
|
|
@@ -335,3 +346,48 @@ class ProvisioningAPI:
|
|
|
user = await self.check_token(request)
|
|
|
await user.logout()
|
|
|
return web.json_response({}, headers=self._acao_headers)
|
|
|
+
|
|
|
+ async def get_fb_login_url(self, request: web.Request) -> web.Response:
|
|
|
+ user, _ = await self._get_user(request, check_state=False, read_body=False)
|
|
|
+ timestamp = int(time.time() * 1000)
|
|
|
+ logger_id = str(uuid4())
|
|
|
+ query: dict[str, str] = {
|
|
|
+ "app_id": "124024574287414",
|
|
|
+ "cbt": str(timestamp),
|
|
|
+ "e2e": json.dumps({"init": timestamp}, separators=(",", ":")),
|
|
|
+ "sso": "chrome_custom_tab",
|
|
|
+ "scope": "email",
|
|
|
+ "state": json.dumps(
|
|
|
+ {
|
|
|
+ "0_auth_logger_id": logger_id,
|
|
|
+ "7_challenge": "".join(random.choices(string.ascii_lowercase, k=20)),
|
|
|
+ "3_method": "custom_tab",
|
|
|
+ },
|
|
|
+ separators=(",", ":"),
|
|
|
+ ),
|
|
|
+ "redirect_uri": "fbconnect://cct.com.instagram.android",
|
|
|
+ "response_type": "token,signed_request,graph_domain,granted_scopes",
|
|
|
+ "return_scopes": "true",
|
|
|
+ }
|
|
|
+ self.log.debug("%s requested a Facebook login URL (logger ID %s)", user.mxid, logger_id)
|
|
|
+ return web.json_response(
|
|
|
+ {
|
|
|
+ "url": str(URL("https://m.facebook.com/v2.3/dialog/oauth").with_query(query)),
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ async def post_fb_login_token(self, request: web.Request) -> web.Response:
|
|
|
+ user, data = await self._get_user(request, check_state=False)
|
|
|
+
|
|
|
+ try:
|
|
|
+ fb_access_token = data["access_token"]
|
|
|
+ logger_id = data["state"]["0_auth_logger_id"]
|
|
|
+ except KeyError as e:
|
|
|
+ raise self._missing_key_error(e)
|
|
|
+
|
|
|
+ self.log.debug(
|
|
|
+ "%s is attempting to log in with Facebook token (logger ID %s)", user.mxid, logger_id
|
|
|
+ )
|
|
|
+ api, state = await get_login_state(user, self.device_seed)
|
|
|
+ resp = await api.facebook_signup(fb_access_token)
|
|
|
+ return await self._finish_login(user, state, api, login_resp=resp, after="facebook auth")
|