123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- # mautrix-instagram - A Matrix-Instagram puppeting bridge.
- # Copyright (C) 2023 Tulir Asokan
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU Affero General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <https://www.gnu.org/licenses/>.
- from __future__ import annotations
- import base64
- import hashlib
- import hmac
- import zlib
- from mauigpapi.errors import (
- IGBad2FACodeError,
- IGChallengeError,
- IGChallengeWrongCodeError,
- IGLoginBadPasswordError,
- IGLoginInvalidCredentialsError,
- IGLoginInvalidUserError,
- IGLoginTwoFactorRequiredError,
- )
- from mauigpapi.http import AndroidAPI
- from mauigpapi.state import AndroidState
- from mauigpapi.types import BaseResponseUser
- from mautrix.bridge.commands import HelpSection, command_handler
- from mautrix.types import EventID
- from mautrix.util.proxy import proxy_with_retry
- from .. import user as u
- from .typehint import CommandEvent
- SECTION_AUTH = HelpSection("Authentication", 10, "")
- async def get_login_state(user: u.User, seed: str) -> tuple[AndroidAPI, AndroidState]:
- if user.command_status and user.command_status["action"] == "Login":
- api: AndroidAPI = user.command_status["api"]
- state: AndroidState = user.command_status["state"]
- else:
- state = AndroidState()
- seed = hmac.new(seed.encode("utf-8"), user.mxid.encode("utf-8"), hashlib.sha256).digest()
- state.device.generate(seed)
- api = AndroidAPI(state, log=user.api_log, proxy_handler=user.proxy_handler)
- await proxy_with_retry(
- "get_login_state",
- lambda: api.get_mobile_config(),
- logger=user.log,
- proxy_handler=user.proxy_handler,
- on_proxy_change=user.on_proxy_update,
- )
- user.command_status = {
- "action": "Login",
- "state": state,
- "api": api,
- }
- return api, state
- @command_handler(
- needs_auth=False,
- management_only=True,
- help_section=SECTION_AUTH,
- help_text="Log into Instagram",
- help_args="<_username_> <_password_>",
- )
- async def login(evt: CommandEvent) -> None:
- if await evt.sender.is_logged_in():
- await evt.reply("You're already logged in")
- return
- elif len(evt.args) < 2:
- await evt.reply("**Usage:** `$cmdprefix+sp login <username> <password>`")
- return
- username = evt.args[0]
- password = " ".join(evt.args[1:])
- await evt.redact()
- api, state = await get_login_state(evt.sender, evt.config["instagram.device_seed"])
- try:
- resp = await api.login(username, password)
- except IGLoginTwoFactorRequiredError as e:
- tfa_info = e.body.two_factor_info
- msg = "Username and password accepted, but you have two-factor authentication enabled.\n"
- if tfa_info.totp_two_factor_on:
- msg += "Send the code from your authenticator app here."
- if tfa_info.sms_two_factor_on:
- msg += f" Alternatively, send `resend-sms` to get an SMS code to •••{tfa_info.obfuscated_phone_number}"
- elif tfa_info.sms_two_factor_on:
- msg += (
- f"Send the code sent to •••{tfa_info.obfuscated_phone_number} here."
- " You can also send `resend-sms` if you didn't receive the code."
- )
- else:
- msg += (
- "Unfortunately, none of your two-factor authentication methods are currently "
- "supported by the bridge."
- )
- return
- evt.sender.command_status = {
- **evt.sender.command_status,
- "next": enter_login_2fa,
- "username": tfa_info.username,
- "is_totp": tfa_info.totp_two_factor_on,
- "has_sms": tfa_info.sms_two_factor_on,
- "2fa_identifier": tfa_info.two_factor_identifier,
- }
- await evt.reply(msg)
- except IGChallengeError:
- await evt.reply(
- "Login challenges aren't currently supported. "
- "Please set up real two-factor authentication."
- )
- await api.challenge_auto()
- evt.sender.command_status = {
- **evt.sender.command_status,
- "next": enter_login_security_code,
- }
- await evt.reply(
- "Username and password accepted, but Instagram wants to verify it's really"
- " you. Please confirm the login and enter the security code here."
- )
- except IGLoginInvalidUserError:
- await evt.reply("Invalid username")
- except IGLoginBadPasswordError:
- await evt.reply("Incorrect password")
- except IGLoginInvalidCredentialsError:
- await evt.reply("Incorrect username or password")
- except Exception as e:
- evt.log.exception("Failed to log in")
- await evt.reply(f"Failed to log in: {e}")
- else:
- await _post_login(evt, state, resp.logged_in_user)
- async def enter_login_2fa(evt: CommandEvent) -> None:
- api: AndroidAPI = evt.sender.command_status["api"]
- state: AndroidState = evt.sender.command_status["state"]
- identifier = evt.sender.command_status["2fa_identifier"]
- username = evt.sender.command_status["username"]
- is_totp = evt.sender.command_status["is_totp"]
- has_sms = evt.sender.command_status["has_sms"]
- code = "".join(evt.args).lower()
- if has_sms and code == "resend-sms":
- try:
- resp = await api.send_two_factor_login_sms(username, identifier=identifier)
- except Exception as e:
- evt.log.exception("Failed to re-request SMS code")
- await evt.reply(f"Failed to re-request SMS code: {e}")
- else:
- await evt.reply(
- f"Re-requested SMS code to {resp.two_factor_info.obfuscated_phone_number}"
- )
- evt.sender.command_status[
- "2fa_identifier"
- ] = resp.two_factor_info.two_factor_identifier
- evt.sender.command_status["is_totp"] = False
- return
- try:
- resp = await api.two_factor_login(
- username, code=code, identifier=identifier, is_totp=is_totp
- )
- except IGBad2FACodeError:
- await evt.reply(
- "Invalid 2-factor authentication code. Please try again "
- "or use `$cmdprefix+sp cancel` to cancel."
- )
- except IGChallengeError:
- await api.challenge_auto(reset=True)
- evt.sender.command_status = {
- **evt.sender.command_status,
- "next": enter_login_security_code,
- }
- await evt.reply(
- "2-factor authentication code accepted, but Instagram wants to verify it's"
- " really you. Please confirm the login and enter the security code here."
- )
- except Exception as e:
- evt.log.exception("Failed to log in")
- await evt.reply(f"Failed to log in: {e}")
- evt.sender.command_status = None
- else:
- evt.sender.command_status = None
- await _post_login(evt, state, resp.logged_in_user)
- async def enter_login_security_code(evt: CommandEvent) -> None:
- api: AndroidAPI = evt.sender.command_status["api"]
- state: AndroidState = evt.sender.command_status["state"]
- try:
- resp = await api.challenge_send_security_code("".join(evt.args))
- except IGChallengeWrongCodeError as e:
- await evt.reply(f"Incorrect security code: {e}")
- except Exception as e:
- evt.log.exception("Failed to log in")
- await evt.reply(f"Failed to log in: {e}")
- evt.sender.command_status = None
- else:
- if not resp.logged_in_user:
- evt.log.error(
- f"Didn't get logged_in_user in challenge response "
- f"after entering security code: {resp.serialize()}"
- )
- await evt.reply("An unknown error occurred. Please check the bridge logs.")
- return
- evt.sender.command_status = None
- await _post_login(evt, state, resp.logged_in_user)
- async def _post_login(evt: CommandEvent, state: AndroidState, user: BaseResponseUser) -> None:
- evt.sender.state = state
- pl = state.device.payload
- manufacturer, model = pl["manufacturer"], pl["model"]
- await evt.reply(
- f"Successfully logged in as {user.full_name} ([@{user.username}]"
- f"(https://instagram.com/{user.username}), user ID: {user.pk}).\n\n"
- f"The bridge will show up on Instagram as {manufacturer} {model}."
- )
- await evt.sender.try_connect()
- @command_handler(
- needs_auth=True,
- help_section=SECTION_AUTH,
- help_text="Disconnect the bridge from your Instagram account",
- )
- async def logout(evt: CommandEvent) -> None:
- await evt.sender.logout()
- await evt.reply("Successfully logged out")
- @command_handler(
- needs_auth=False,
- management_only=True,
- help_section=SECTION_AUTH,
- help_text="Log into Instagram with a pre-generated session blob",
- help_args="<_blob_>",
- )
- async def login_blob(evt: CommandEvent) -> EventID:
- if await evt.sender.is_logged_in():
- return await evt.reply("You're already logged in")
- elif len(evt.args) < 1:
- return await evt.reply("**Usage:** `$cmdprefix+sp login-blob <blob>`")
- await evt.redact()
- try:
- state = AndroidState.parse_json(zlib.decompress(base64.b64decode("".join(evt.args))))
- except Exception:
- evt.log.exception(f"{evt.sender} provided an invalid login blob")
- return await evt.reply("Invalid blob")
- evt.sender.state = state
- await evt.reply("Connecting...")
- await evt.sender.try_connect()
- await evt.reply("Maybe connected now, try pinging?")
|