123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- # mautrix-instagram - A Matrix-Instagram puppeting bridge.
- # Copyright (C) 2020 Tulir Asokan
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU Affero General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <https://www.gnu.org/licenses/>.
- from typing import Optional, Dict, Any
- import random
- import time
- import json
- from aiohttp import ClientSession, ClientResponse
- from yarl import URL
- from mautrix.types import JSON, Serializable
- from ..state import AndroidState
- from ..errors import (IGActionSpamError, IGNotFoundError, IGRateLimitError, IGCheckpointError,
- IGUserHasLoggedOutError, IGLoginRequiredError, IGPrivateUserError,
- IGSentryBlockError, IGInactiveUserError, IGResponseError,
- IGLoginBadPasswordError, IGLoginInvalidUserError,
- IGLoginTwoFactorRequiredError)
- class BaseAndroidAPI:
- url = URL("https://i.instagram.com")
- http: ClientSession
- state: AndroidState
- def __init__(self, state: AndroidState) -> None:
- self.http = ClientSession(cookie_jar=state.cookies.jar)
- self.state = state
- @staticmethod
- def sign(req: Any, filter_nulls: bool = False) -> Dict[str, str]:
- if isinstance(req, Serializable):
- req = req.serialize()
- if isinstance(req, dict):
- def remove_nulls(d: dict) -> dict:
- return {k: v for k, v in d.items() if v is not None}
- req = json.dumps(req, object_hook=remove_nulls if filter_nulls else None)
- return {"signed_body": f"SIGNATURE.{req}"}
- @property
- def headers(self) -> Dict[str, str]:
- headers = {
- "User-Agent": self.state.user_agent,
- "X-Ads-Opt-Out": str(int(self.state.session.ads_opt_out)),
- # "X-DEVICE--ID": self.state.device.uuid,
- "X-CM-Bandwidth-KBPS": "-1.000",
- "X-CM-Latency": "-1.000",
- "X-IG-App-Locale": self.state.device.language,
- "X-IG-Device-Locale": self.state.device.language,
- "X-Pigeon-Session-Id": self.state.pigeon_session_id,
- "X-Pigeon-Rawclienttime": str(round(time.time(), 3)),
- "X-IG-Connection-Speed": f"{random.randint(1000, 3700)}kbps",
- "X-IG-Bandwidth-Speed-KBPS": "-1.000",
- "X-IG-Bandwidth-TotalBytes-B": "0",
- "X-IG-Bandwidth-TotalTime-MS": "0",
- "X-IG-EU-DC-ENABLED": (str(self.state.session.eu_dc_enabled).lower()
- if self.state.session.eu_dc_enabled is not None else None),
- "X-IG-Extended-CDN-Thumbnail-Cache-Busting-Value":
- str(self.state.session.thumbnail_cache_busting_value),
- "X-Bloks-Version-Id": self.state.application.BLOKS_VERSION_ID,
- "X-MID": self.state.cookies.get_value("mid"),
- "X-IG-WWW-Claim": self.state.session.ig_www_claim or "0",
- "X-Bloks-Is-Layout-RTL": str(self.state.device.is_layout_rtl).lower(),
- "X-IG-Connection-Type": self.state.device.connection_type,
- "X-Ig-Capabilities": self.state.application.CAPABILITIES,
- "X-IG-App-Id": self.state.application.FACEBOOK_ANALYTICS_APPLICATION_ID,
- "X-IG-Device-ID": self.state.device.uuid,
- "X-IG-Android-ID": self.state.device.id,
- "Accept-Language": self.state.device.language.replace("_", "-"),
- "X-FB-HTTP-Engine": "Liger",
- "Authorization": self.state.session.authorization,
- "Accept-Encoding": "gzip",
- "Connection": "close",
- }
- return {k: v for k, v in headers.items() if v is not None}
- async def handle_response(self, resp: ClientResponse) -> JSON:
- self._handle_response_headers(resp)
- body = await resp.json()
- if body["status"] == "ok":
- return body
- else:
- await self._raise_response_error(resp)
- async def _raise_response_error(self, resp: ClientResponse) -> None:
- try:
- data = await resp.json()
- except json.JSONDecodeError:
- data = {}
- if data.get("spam", False):
- raise IGActionSpamError(resp, data)
- elif data.get("two_factor_required", False):
- raise IGLoginTwoFactorRequiredError(resp, data)
- elif resp.status == 404:
- raise IGNotFoundError(resp, data)
- elif resp.status == 429:
- raise IGRateLimitError(resp, data)
- message = data.get("message")
- if isinstance(message, str):
- if message == "challenge_required":
- err = IGCheckpointError(resp, data)
- self.state.challenge_path = err.url
- raise err
- elif message == "user_has_logged_out":
- raise IGUserHasLoggedOutError(resp, data)
- elif message == "login_required":
- raise IGLoginRequiredError(resp, data)
- elif message.lower() == "not authorized to view user":
- raise IGPrivateUserError(resp, data)
- error_type = data.get("error_type")
- if error_type == "sentry_block":
- raise IGSentryBlockError(resp, data)
- elif error_type == "inactive_user":
- raise IGInactiveUserError(resp, data)
- elif error_type == "bad_password":
- raise IGLoginBadPasswordError(resp, data)
- elif error_type == "invalid_user":
- raise IGLoginInvalidUserError(resp, data)
- raise IGResponseError(resp, data)
- def _handle_response_headers(self, resp: ClientResponse) -> None:
- fields = {
- "X-IG-Set-WWW-Claim": "ig_www_claim",
- "IG-Set-Authorization": "authorization",
- "IG-Set-Password-Encryption-Key-ID": "password_encryption_key_id",
- "IG-Set-Password-Encryption-Pub-Key": "password_encryption_pubkey",
- "IG-Set-IG-U-IG-Direct-Region-Hint": "region_hint"
- }
- for header, field in fields.items():
- try:
- value = resp.headers[header]
- except KeyError:
- pass
- else:
- if value:
- setattr(self.state.session, field, value)
|