# mautrix-instagram - A Matrix-Instagram puppeting bridge.
# Copyright (C) 2020 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Union, Set, Optional, Any, Dict, Awaitable, Type, List, TypeVar, Callable
from collections import defaultdict
from socket import socket, error as SocketError
from uuid import uuid4
import logging
import urllib.request
import asyncio
import zlib
import time
import json
import re
import paho.mqtt.client
from paho.mqtt.client import MQTTMessage, WebsocketConnectionError
from yarl import URL
from mautrix.util.logging import TraceLogger
from ..errors import NotLoggedIn, NotConnected
from ..state import AndroidState
from ..types import (CommandResponse, ThreadItemType, ThreadAction, ReactionStatus, TypingStatus,
IrisPayload, PubsubPayload, AppPresenceEventPayload, RealtimeDirectEvent,
RealtimeZeroProvisionPayload, ClientConfigUpdatePayload, MessageSyncEvent,
MessageSyncMessage, LiveVideoCommentPayload, PubsubEvent)
from .thrift import RealtimeConfig, RealtimeClientInfo, ForegroundStateConfig, IncomingMessage
from .otclient import MQTToTClient
from .subscription import everclear_subscriptions, RealtimeTopic, GraphQLQueryID
from .events import Connect, Disconnect
try:
import socks
except ImportError:
socks = None
# Generic type variable tying an event class to the handlers registered for it.
T = TypeVar('T')
# Matches paths of the form
# /direct_v2/threads/<thread id>/activity_indicator_id/<indicator id>,
# capturing the thread ID (group 1) and the activity indicator ID (group 2).
ACTIVITY_INDICATOR_REGEX = re.compile(
    r"/direct_v2/threads/([\w_]+)/activity_indicator_id/([\w_]+)")
class AndroidMQTT:
    """Asyncio wrapper around an ``MQTToTClient`` connection used for Instagram realtime events.

    Incoming messages are parsed into event objects and passed to the handlers
    registered with :meth:`add_event_handler`.
    """
    # Event loop that socket callbacks and dispatch tasks are registered on.
    _loop: asyncio.AbstractEventLoop
    # Underlying MQTT client; created and configured in __init__.
    _client: MQTToTClient
    log: TraceLogger
    # Session state; cookies, device, user and application info are read from it
    # when the connect payload is built.
    state: AndroidState
    # Subscription sets that are re-sent after each successful connect.
    _graphql_subs: Set[str]
    _skywalker_subs: Set[str]
    # Iris position passed to listen(); re-sent after connect when seq_id is truthy.
    _iris_seq_id: Optional[int]
    _iris_snapshot_at_ms: Optional[int]
    # Futures returned by publish(), keyed by MQTT message ID.
    _publish_waiters: Dict[int, asyncio.Future]
    # Futures awaiting a response message, keyed by the response topic.
    _response_waiters: Dict[RealtimeTopic, asyncio.Future]
    # One lock per response topic, held by request() while it waits for the response.
    _response_waiter_locks: Dict[RealtimeTopic, asyncio.Lock]
    # If set when the listen() loop exits, it is raised from listen() and cleared.
    _disconnect_error: Optional[Exception]
    # Registered async handlers, keyed by event class.
    _event_handlers: Dict[Type[T], List[Callable[[T], Awaitable[None]]]]
# region Initialization
def __init__(self, state: AndroidState, loop: Optional[asyncio.AbstractEventLoop] = None,
             log: Optional[TraceLogger] = None) -> None:
    """Initialise bookkeeping state and configure the underlying MQTT client.

    The client is created with the thrift connect payload as its client ID,
    optionally pointed at the system HTTP proxy (only when the ``socks``
    module could be imported), set up for TLS, and wired to this object's
    callbacks. ``connect_async`` is called for ``edge-mqtt.facebook.com:443``.

    :param state: Session state; read when building the connect payload.
    :param loop: Event loop to use; defaults to ``asyncio.get_event_loop()``.
    :param log: Logger to use; defaults to the ``mauigpapi`` logger.
    :raises KeyError: If the configured proxy URL has a scheme without a known mapping.
    """
    self.state = state
    self.log = log or logging.getLogger("mauigpapi")
    self._loop = loop or asyncio.get_event_loop()

    # Subscription and iris state, filled in by listen() and the subscribe methods.
    self._graphql_subs = set()
    self._skywalker_subs = set()
    self._iris_seq_id = None
    self._iris_snapshot_at_ms = None

    # Waiter tables and event handlers.
    self._publish_waiters = {}
    self._response_waiters = {}
    self._response_waiter_locks = defaultdict(asyncio.Lock)
    self._event_handlers = defaultdict(list)
    self._disconnect_error = None

    self._client = MQTToTClient(
        client_id=self._form_client_id(),
        clean_session=True,
        protocol=paho.mqtt.client.MQTTv31,
        transport="tcp",
    )

    # Route the connection through the system HTTP proxy when one is configured
    # and the optional socks module is available.
    http_proxy = urllib.request.getproxies().get("http")
    if http_proxy and socks:
        proxy_url = URL(http_proxy)
        proxy_types_by_scheme = {
            "http": socks.HTTP,
            "https": socks.HTTP,
            "socks": socks.SOCKS5,
            "socks5": socks.SOCKS5,
            "socks4": socks.SOCKS4,
        }
        self._client.proxy_set(
            proxy_type=proxy_types_by_scheme[proxy_url.scheme],
            proxy_addr=proxy_url.host,
            proxy_port=proxy_url.port,
            proxy_username=proxy_url.user,
            proxy_password=proxy_url.password,
        )

    self._client.enable_logger()
    self._client.tls_set()
    self._client.connect_async("edge-mqtt.facebook.com", 443, keepalive=60)

    # Note: no on_disconnect callback is registered.
    self._client.on_message = self._on_message_handler
    self._client.on_publish = self._on_publish_handler
    self._client.on_connect = self._on_connect_handler
    self._client.on_socket_open = self._on_socket_open
    self._client.on_socket_close = self._on_socket_close
    self._client.on_socket_register_write = self._on_socket_register_write
    self._client.on_socket_unregister_write = self._on_socket_unregister_write
def _form_client_id(self) -> bytes:
    """Build the zlib-compressed thrift connect payload that is used as the MQTT client ID.

    The payload carries the session cookie as password, device/user details
    from ``self.state`` and the list of topic IDs to subscribe to.

    :return: ``RealtimeConfig`` serialised with ``to_thrift()`` and compressed at level 9.
    """
    state = self.state
    topic_ids = [
        int(topic.encoded)
        for topic in (
            RealtimeTopic.PUBSUB,
            RealtimeTopic.SUB_IRIS_RESPONSE,
            RealtimeTopic.REALTIME_SUB,
            RealtimeTopic.REGION_HINT,
            RealtimeTopic.SEND_MESSAGE_RESPONSE,
            RealtimeTopic.MESSAGE_SYNC,
            RealtimeTopic.UNKNOWN_179,
            RealtimeTopic.UNKNOWN_PP,
        )
    ]
    session_password = f"sessionid={state.cookies['sessionid']}"
    client_info = RealtimeClientInfo(
        user_id=int(state.user_id),
        user_agent=state.user_agent,
        client_capabilities=0b10110111,
        endpoint_capabilities=0,
        publish_format=1,
        no_automatic_foreground=True,
        make_user_available_in_foreground=False,
        device_id=state.device.phone_id,
        is_initially_foreground=True,
        network_type=1,
        network_subtype=0,
        # Current time in milliseconds, truncated to 32 bits.
        client_mqtt_session_id=int(time.time() * 1000) & 0xffffffff,
        subscribe_topics=topic_ids,
        client_type="cookie_auth",
        app_id=567067343352427,
        region_preference=state.session.region_hint or "LLA",
        device_secret="",
        client_stack=3,
    )
    app_specific_info = {
        "app_version": state.application.APP_VERSION,
        "X-IG-Capabilities": state.application.CAPABILITIES,
        "everclear_subscriptions": json.dumps(everclear_subscriptions),
        "User-Agent": state.user_agent,
        "Accept-Language": state.device.language.replace("_", "-"),
        "platform": "android",
        "ig_mqtt_route": "django",
        "pubsub_msg_type_blacklist": "direct, typing_type",
        "auth_cache_enabled": "0",
    }
    config = RealtimeConfig(
        client_identifier=state.device.phone_id[:20],
        client_info=client_info,
        password=session_password,
        app_specific_info=app_specific_info,
    )
    return zlib.compress(config.to_thrift(), level=9)
# endregion
def _on_socket_open(self, client: MQTToTClient, _: Any, sock: socket) -> None:
    """Socket-open callback: have the event loop call ``client.loop_read`` when ``sock`` is readable."""
    read_callback = client.loop_read
    self._loop.add_reader(sock, read_callback)
def _on_socket_close(self, client: MQTToTClient, _: Any, sock: socket) -> None:
    """Socket-close callback: stop watching ``sock`` for readability on the event loop."""
    loop = self._loop
    loop.remove_reader(sock)
def _on_socket_register_write(self, client: MQTToTClient, _: Any, sock: socket) -> None:
    """Register-write callback: have the event loop call ``client.loop_write`` when ``sock`` is writable."""
    write_callback = client.loop_write
    self._loop.add_writer(sock, write_callback)
def _on_socket_unregister_write(self, client: MQTToTClient, _: Any, sock: socket) -> None:
    """Unregister-write callback: stop watching ``sock`` for writability on the event loop."""
    loop = self._loop
    loop.remove_writer(sock)
def _on_connect_handler(self, client: MQTToTClient, _: Any, flags: Dict[str, Any], rc: int
                        ) -> None:
    """Connect callback: on success schedule the post-connect re-subscription.

    A non-zero ``rc`` is logged as a connection error and nothing else happens.
    """
    if rc == 0:
        self._loop.create_task(self._post_connect())
        return
    reason = paho.mqtt.client.connack_string(rc)
    self.log.error("MQTT Connection Error: %s (%d)", reason, rc)
async def _post_connect(self) -> None:
    """Re-send the remembered subscriptions after a successful connect.

    GraphQL and skywalker subscriptions are sent only when their sets are
    non-empty; the iris subscription only when a truthy sequence ID is stored.
    """
    self.log.debug("Re-subscribing to things after connect")
    if self._graphql_subs:
        graphql_response = await self.graphql_subscribe(self._graphql_subs)
        self.log.trace("GraphQL subscribe response: %s", graphql_response)
    if self._skywalker_subs:
        skywalker_response = await self.skywalker_subscribe(self._skywalker_subs)
        self.log.trace("Skywalker subscribe response: %s", skywalker_response)
    if not self._iris_seq_id:
        return
    await self.iris_subscribe(self._iris_seq_id, self._iris_snapshot_at_ms)
def _on_publish_handler(self, client: MQTToTClient, _: Any, mid: int) -> None:
    """Publish callback: resolve the future that ``publish()`` registered for ``mid``.

    The waiter is removed from ``_publish_waiters`` so the table does not grow
    with every published message. Confirmations for unknown message IDs are
    ignored, as are waiters that are already done (for example cancelled by
    the caller), because calling ``set_result`` on them would raise.

    :param mid: MQTT message ID that was confirmed.
    """
    self.log.trace(f"Received publish confirmation for {mid}")
    waiter = self._publish_waiters.pop(mid, None)
    if waiter is None or waiter.done():
        return
    waiter.set_result(None)
# region Incoming event parsing
@staticmethod
def _parse_direct_thread_path(path: str) -> dict:
    """Extract the fields encoded in a ``/direct_v2/threads/<thread_id>/...`` path.

    The result always contains ``thread_id``. Depending on the path segment
    after the thread ID it may additionally contain:

    * ``admin_user_ids`` -- the segment following ``admin_user_ids``
    * ``approval_required_for_new_members`` -- ``True``
    * ``participants`` -- a one-entry dict built from the two following segments
    * ``item_id`` -- the segment following ``items``; when at least three more
      segments follow the item ID, they are stored as ``{seg1: {seg2: seg3}}``

    :param path: Slash-separated path starting with ``/direct_v2/threads/``.
    :return: Dict of the extracted fields.
    :raises ValueError: If the path has fewer than four slash-separated parts.
    :raises AssertionError: If the path does not start with ``/direct_v2/threads/``.
    """
    blank, direct_v2, threads, thread_id, *rest = path.split("/")
    assert blank == ""
    assert direct_v2 == "direct_v2"
    assert threads == "threads"
    additional = {
        "thread_id": thread_id
    }
    if rest:
        # rest[0] is a string, so it has to be compared against strings: the
        # previous list comparisons were always false, which made the
        # participants and items branches unreachable.
        if rest[0] == "admin_user_ids":
            additional["admin_user_ids"] = rest[1]
        elif rest[0] == "approval_required_for_new_members":
            additional["approval_required_for_new_members"] = True
        elif rest[0] == "participants" and len(rest) > 2:
            additional["participants"] = {rest[1]: rest[2]}
        elif rest[0] == "items" and len(rest) > 1:
            additional["item_id"] = rest[1]
            # TODO wtf is this?
            #      it has something to do with reactions
            if len(rest) > 4:
                additional[rest[2]] = {
                    rest[3]: rest[4],
                }
    return additional
def _on_message_sync(self, payload: bytes) -> None:
    """Parse a message sync payload and dispatch one ``MessageSyncEvent`` per data part.

    The payload is a UTF-8 JSON list. For every part, the fields parsed from
    its path are combined with its JSON-decoded value; when the value is not
    valid JSON it is stored verbatim under ``value`` instead.
    """
    sync_items = json.loads(payload.decode("utf-8"))
    self.log.trace("Got message sync event: %s", sync_items)
    for sync_item in sync_items:
        iris = IrisPayload.deserialize(sync_item)
        for part in iris.data:
            fields = {"path": part.path, "op": part.op}
            fields.update(self._parse_direct_thread_path(part.path))
            try:
                decoded_value = json.loads(part.value)
            except json.JSONDecodeError:
                fields["value"] = part.value
            else:
                fields = {**fields, **decoded_value}
            message = MessageSyncMessage.deserialize(fields)
            event = MessageSyncEvent(iris=iris, message=message)
            self._loop.create_task(self._dispatch(event))
def _on_pubsub(self, payload: bytes) -> None:
    """Parse a pubsub payload and dispatch a ``PubsubEvent`` for each activity indicator entry.

    Entries whose path does not match ``ACTIVITY_INDICATOR_REGEX`` are only logged.
    """
    incoming = IncomingMessage.from_thrift(payload)
    message = PubsubPayload.parse_json(incoming.payload)
    self.log.trace(f"Got pubsub event with topic {incoming.topic}: {message}")
    for data in message.data:
        match = ACTIVITY_INDICATOR_REGEX.match(data.path)
        if not match:
            if data.double_publish:
                self.log.debug("Pubsub: double publish: %s", data.path)
            else:
                self.log.debug("Pubsub: no activity indicator on data: %s", data)
            continue
        thread_id, indicator_id = match.group(1), match.group(2)
        event = PubsubEvent(data=data, base=message, thread_id=thread_id,
                            activity_indicator_id=indicator_id)
        self._loop.create_task(self._dispatch(event))
@staticmethod
def _parse_realtime_sub_item(topic: str, raw: dict) -> Any:
    """Deserialize one realtime sub item according to its topic.

    :param topic: GraphQL query ID or the literal ``"direct"``.
    :param raw: Raw item data.
    :return: The deserialized event, or ``None`` when the topic is not recognised.
    """
    if topic == GraphQLQueryID.APP_PRESENCE:
        return AppPresenceEventPayload.deserialize(raw).presence_event
    if topic == GraphQLQueryID.ZERO_PROVISION:
        return RealtimeZeroProvisionPayload.deserialize(raw).zero_product_provisioning_event
    if topic == GraphQLQueryID.CLIENT_CONFIG_UPDATE:
        return ClientConfigUpdatePayload.deserialize(raw).client_config_update_event
    if topic == GraphQLQueryID.LIVE_REALTIME_COMMENTS:
        return LiveVideoCommentPayload.deserialize(raw).live_video_comment_event
    if topic == "direct":
        return RealtimeDirectEvent.deserialize(raw)
    return None
def _on_realtime_sub(self, payload: bytes) -> None:
    """Parse a realtime sub payload and dispatch one event per item in its ``data`` list.

    Unknown topics are logged, but their items are still parsed and dispatched.
    """
    parsed_thrift = IncomingMessage.from_thrift(payload)
    topic = parsed_thrift.topic
    known_topics = (
        "direct",
        GraphQLQueryID.APP_PRESENCE,
        GraphQLQueryID.ZERO_PROVISION,
        GraphQLQueryID.CLIENT_CONFIG_UPDATE,
        GraphQLQueryID.LIVE_REALTIME_COMMENTS,
    )
    if topic not in known_topics:
        self.log.debug(f"Got unknown realtime sub event {topic}: {parsed_thrift.payload}")
    body = json.loads(parsed_thrift.payload)
    event = body.get("event", "")
    for item in body["data"]:
        evt = self._parse_realtime_sub_item(topic, item)
        self.log.trace(f"Got realtime sub event with topic {topic}/{event}: {evt}")
        self._loop.create_task(self._dispatch(evt))
def _on_message_handler(self, client: MQTToTClient, _: Any, message: MQTTMessage) -> None:
    """Message callback: decompress an incoming message and route it by topic.

    Message sync, pubsub and realtime sub messages go to their parsers. Any
    other topic is treated as a response: the future registered for it in
    ``_response_waiters`` is resolved with the message. Exceptions are logged
    rather than propagated to the MQTT client.

    Payloads are written to the logger instead of stdout, and a response
    waiter that is already done (for example cancelled) is skipped, because
    ``set_result`` would raise on it.
    """
    try:
        topic = RealtimeTopic.decode(message.topic)
        # Instagram Android MQTT messages are always compressed
        message.payload = zlib.decompress(message.payload)
        if topic == RealtimeTopic.MESSAGE_SYNC:
            self._on_message_sync(message.payload)
        elif topic == RealtimeTopic.PUBSUB:
            self._on_pubsub(message.payload)
        elif topic == RealtimeTopic.REALTIME_SUB:
            self._on_realtime_sub(message.payload)
        else:
            self.log.trace("Got other MQTT message in %s: %s", topic.value, message.payload)
            try:
                waiter = self._response_waiters.pop(topic)
            except KeyError:
                self.log.debug("No handler for MQTT message in %s: %s",
                               topic.value, message.payload)
            else:
                if waiter.done():
                    self.log.debug("Response waiter for %s is already done, dropping message",
                                   topic.value)
                else:
                    waiter.set_result(message)
    except Exception:
        self.log.exception("Error in incoming MQTT message handler")
        self.log.trace("Payload of the failed MQTT message: %s", message.payload)
# endregion
async def _reconnect(self) -> None:
    """Reconnect the underlying MQTT client.

    :raises NotLoggedIn: If the reconnect fails with a socket, OS or websocket
        connection error; the original error is chained as the cause.
    """
    try:
        self._client.reconnect()
    except (SocketError, OSError, WebsocketConnectionError) as err:
        # TODO custom class
        raise NotLoggedIn("MQTT reconnection failed") from err
def add_event_handler(self, evt_type: Type[T], handler: Callable[[T], Awaitable[None]]
                      ) -> None:
    """Register an async ``handler`` to be called for events whose class is ``evt_type``.

    Handlers for the same type are called in registration order.
    """
    handlers = self._event_handlers[evt_type]
    handlers.append(handler)
async def _dispatch(self, evt: T) -> None:
    """Await every handler registered for the exact class of ``evt``, one after another.

    An exception raised by a handler is logged and does not stop later handlers.
    """
    evt_type = type(evt)
    for handler in self._event_handlers[evt_type]:
        try:
            await handler(evt)
        except Exception:
            self.log.exception(f"Error in {evt_type} handler")
def disconnect(self) -> None:
    """Ask the underlying MQTT client to disconnect."""
    client = self._client
    client.disconnect()
async def listen(self, graphql_subs: Set[str] = None, skywalker_subs: Set[str] = None,
                 seq_id: int = None, snapshot_at_ms: int = None) -> None:
    """Connect and run the MQTT maintenance loop until disconnected or cancelled.

    The given subscriptions and iris position are stored so they can be sent
    after connecting. The loop then calls ``loop_misc()`` once per second and
    reconnects when it reports an error, dispatching ``Disconnect`` and
    ``Connect`` events around each reconnect.

    :param graphql_subs: GraphQL subscriptions to remember (empty set if omitted).
    :param skywalker_subs: Skywalker subscriptions to remember (empty set if omitted).
    :param seq_id: Iris sequence ID to remember.
    :param snapshot_at_ms: Iris snapshot timestamp to remember.
    :raises NotLoggedIn: If the connection is refused or a reconnect fails.
    :raises NotConnected: If "no connection" is reported again before any
        successful iteration has followed a reconnect.
    :raises Exception: Whatever is stored in ``_disconnect_error`` once the loop exits.
    """
    self._graphql_subs = graphql_subs or set()
    self._skywalker_subs = skywalker_subs or set()
    self._iris_seq_id = seq_id
    self._iris_snapshot_at_ms = snapshot_at_ms

    self.log.debug("Connecting to Instagram MQTT")
    await self._reconnect()
    await self._dispatch(Connect())
    exit_if_not_connected = False

    while True:
        try:
            await asyncio.sleep(1)
        except asyncio.CancelledError:
            self.disconnect()
            # this might not be necessary
            self._client.loop_misc()
            break
        rc = self._client.loop_misc()

        # If disconnect() has been called
        # Beware, internal API, may have to change this to something more stable!
        if self._client._state == paho.mqtt.client.mqtt_cs_disconnecting:
            break  # Stop listening

        if rc == paho.mqtt.client.MQTT_ERR_SUCCESS:
            exit_if_not_connected = False
            continue

        # Work out why we are reconnecting; some errors are fatal instead.
        if rc in (paho.mqtt.client.MQTT_ERR_CONN_LOST, paho.mqtt.client.MQTT_ERR_NOMEM):
            # MQTT_ERR_NOMEM is wrongly classified and handled like a lost connection.
            # See https://github.com/eclipse/paho.mqtt.python/issues/340
            reason = "Connection lost, retrying"
        elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
            raise NotLoggedIn("MQTT connection refused")
        elif rc == paho.mqtt.client.MQTT_ERR_NO_CONN:
            if exit_if_not_connected:
                raise NotConnected("MQTT error: no connection")
            reason = "MQTT Error: no connection, retrying"
        else:
            err = paho.mqtt.client.error_string(rc)
            self.log.error("MQTT Error: %s", err)
            reason = f"MQTT Error: {err}, retrying"

        await self._dispatch(Disconnect(reason=reason))
        await self._reconnect()
        exit_if_not_connected = True
        await self._dispatch(Connect())

    if self._disconnect_error:
        self.log.info("disconnect_error is set, raising and clearing variable")
        err = self._disconnect_error
        self._disconnect_error = None
        raise err
# region Basic outgoing MQTT
def publish(self, topic: RealtimeTopic, payload: Union[str, bytes, dict]
            ) -> asyncio.Future:
    """Compress and publish ``payload`` to ``topic`` with QoS 1.

    Dicts are JSON-encoded and strings UTF-8 encoded before the payload is
    zlib-compressed at level 9.

    The returned future is created on ``self._loop`` (the loop the client
    callbacks are registered on) instead of whichever loop happens to be
    current, and is resolved by the publish callback for the message ID.

    :param topic: Topic to publish to; its ``encoded`` form is sent.
    :param payload: Data to send.
    :return: Future that completes once the publish has been confirmed.
    """
    if isinstance(payload, dict):
        payload = json.dumps(payload)
    if isinstance(payload, str):
        payload = payload.encode("utf-8")
    payload = zlib.compress(payload, level=9)
    info = self._client.publish(topic.encoded, payload, qos=1)
    fut = self._loop.create_future()
    self._publish_waiters[info.mid] = fut
    return fut
async def request(self, topic: RealtimeTopic, response: RealtimeTopic,
                  payload: Union[str, bytes, dict]) -> MQTTMessage:
    """Publish ``payload`` to ``topic`` and wait for the next message on ``response``.

    Requests for the same response topic are serialised with a per-topic lock.
    The waiter future is created on ``self._loop`` and is removed again if the
    publish fails or the request is cancelled, so that a late response is not
    delivered to a future nobody is waiting on.

    :param topic: Topic to publish the request to.
    :param response: Topic on which the response is expected.
    :param payload: Request data, passed on to :meth:`publish`.
    :return: The response message.
    """
    async with self._response_waiter_locks[response]:
        fut = self._loop.create_future()
        self._response_waiters[response] = fut
        try:
            await self.publish(topic, payload)
            return await fut
        finally:
            # On success the message handler has already popped the waiter;
            # only remove it if it is still ours.
            if self._response_waiters.get(response) is fut:
                del self._response_waiters[response]
async def iris_subscribe(self, seq_id: int, snapshot_at_ms: int) -> None:
    """Send an iris subscribe request and log the decoded response.

    :param seq_id: Sequence ID to subscribe from.
    :param snapshot_at_ms: Snapshot timestamp sent along with the sequence ID.
    """
    subscription = {
        "seq_id": seq_id,
        "snapshot_at_ms": snapshot_at_ms,
        "snapshot_app_version": "message",
    }
    resp = await self.request(RealtimeTopic.SUB_IRIS, RealtimeTopic.SUB_IRIS_RESPONSE,
                              subscription)
    response_text = resp.payload.decode("utf-8")
    self.log.debug("Iris subscribe response: %s", response_text)
def graphql_subscribe(self, subs: Set[str]) -> asyncio.Future:
    """Remember ``subs`` as active GraphQL subscriptions and publish a subscribe request.

    :return: The future returned by :meth:`publish`.
    """
    self._graphql_subs |= subs
    payload = {"sub": list(subs)}
    return self.publish(RealtimeTopic.REALTIME_SUB, payload)
def graphql_unsubscribe(self, subs: Set[str]) -> asyncio.Future:
    """Forget ``subs`` as active GraphQL subscriptions and publish an unsubscribe request.

    :return: The future returned by :meth:`publish`.
    """
    self._graphql_subs -= subs
    payload = {"unsub": list(subs)}
    return self.publish(RealtimeTopic.REALTIME_SUB, payload)
def skywalker_subscribe(self, subs: Set[str]) -> asyncio.Future:
    """Remember ``subs`` as active skywalker subscriptions and publish a subscribe request.

    :return: The future returned by :meth:`publish`.
    """
    self._skywalker_subs |= subs
    payload = {"sub": list(subs)}
    return self.publish(RealtimeTopic.PUBSUB, payload)
def skywalker_unsubscribe(self, subs: Set[str]) -> asyncio.Future:
    """Forget ``subs`` as active skywalker subscriptions and publish an unsubscribe request.

    :return: The future returned by :meth:`publish`.
    """
    self._skywalker_subs -= subs
    payload = {"unsub": list(subs)}
    return self.publish(RealtimeTopic.PUBSUB, payload)
# endregion
# region Actually sending messages and stuff
async def send_foreground_state(self, state: ForegroundStateConfig) -> None:
    """Publish a foreground state update and adopt its keepalive timeout, if it has one.

    :param state: Foreground state to send, serialised with ``to_thrift()``.
    """
    self.log.debug("Updating foreground state: %s", state)
    # NOTE(review): publish() zlib-compresses its payload as well, so this data
    # ends up compressed twice -- confirm that this is what the server expects.
    compressed_state = zlib.compress(state.to_thrift(), level=9)
    await self.publish(RealtimeTopic.FOREGROUND_STATE, compressed_state)
    timeout = state.keep_alive_timeout
    if timeout:
        # Writes the client's private keepalive attribute directly.
        self._client._keepalive = timeout
async def send_command(self, thread_id: str, action: ThreadAction,
                       client_context: Optional[str] = None,
                       offline_threading_id: Optional[str] = None, **kwargs: Any
                       ) -> CommandResponse:
    """Send a thread command over MQTT and parse the response.

    :param thread_id: Thread the command applies to.
    :param action: Action to perform; its ``value`` is sent.
    :param client_context: Client context ID; a random UUID4 string when omitted.
    :param offline_threading_id: Offline threading ID; defaults to the client context.
    :param kwargs: Extra fields added to the request after the standard ones.
    :return: The parsed command response.
    """
    context = client_context or str(uuid4())
    request_body = {
        "thread_id": thread_id,
        "client_context": context,
        "offline_threading_id": offline_threading_id or context,
        "action": action.value,
    }
    request_body.update(kwargs)
    resp = await self.request(RealtimeTopic.SEND_MESSAGE, RealtimeTopic.SEND_MESSAGE_RESPONSE,
                              payload=request_body)
    response_text = resp.payload.decode("utf-8")
    return CommandResponse.parse_json(response_text)
def send_item(self, thread_id: str, item_type: ThreadItemType, shh_mode: bool = False,
              client_context: Optional[str] = None, offline_threading_id: Optional[str] = None,
              **kwargs: Any) -> Awaitable[CommandResponse]:
    """Send an item to a thread using the ``SEND_ITEM`` action.

    ``shh_mode`` is sent as ``is_shh_mode`` with the value ``"0"`` or ``"1"``;
    extra keyword arguments are forwarded as additional request fields.

    :return: Awaitable resolving to the command response.
    """
    shh_flag = str(int(shh_mode))
    return self.send_command(
        thread_id,
        action=ThreadAction.SEND_ITEM,
        client_context=client_context,
        offline_threading_id=offline_threading_id,
        item_type=item_type.value,
        is_shh_mode=shh_flag,
        **kwargs,
    )
def send_hashtag(self, thread_id: str, hashtag: str, text: str = "", shh_mode: bool = False,
                 client_context: Optional[str] = None,
                 offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
    """Send a hashtag item to a thread; ``hashtag`` is sent as the item ID.

    :return: Awaitable resolving to the command response.
    """
    return self.send_item(
        thread_id,
        item_type=ThreadItemType.HASHTAG,
        shh_mode=shh_mode,
        client_context=client_context,
        offline_threading_id=offline_threading_id,
        text=text,
        item_id=hashtag,
    )
def send_like(self, thread_id: str, shh_mode: bool = False,
              client_context: Optional[str] = None, offline_threading_id: Optional[str] = None,
              ) -> Awaitable[CommandResponse]:
    """Send a like item to a thread.

    :return: Awaitable resolving to the command response.
    """
    return self.send_item(
        thread_id,
        item_type=ThreadItemType.LIKE,
        shh_mode=shh_mode,
        client_context=client_context,
        offline_threading_id=offline_threading_id,
    )
def send_location(self, thread_id: str, venue_id: str, text: str = "",
                  shh_mode: bool = False, client_context: Optional[str] = None,
                  offline_threading_id: Optional[str] = None,
                  ) -> Awaitable[CommandResponse]:
    """Send a location item to a thread; ``venue_id`` is sent as the item ID.

    :return: Awaitable resolving to the command response.
    """
    return self.send_item(
        thread_id,
        item_type=ThreadItemType.LOCATION,
        shh_mode=shh_mode,
        client_context=client_context,
        offline_threading_id=offline_threading_id,
        text=text,
        item_id=venue_id,
    )
def send_media(self, thread_id: str, media_id: str, text: str = "", shh_mode: bool = False,
               client_context: Optional[str] = None,
               offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
    """Send a media share item to a thread; ``media_id`` is sent as the item ID.

    :return: Awaitable resolving to the command response.
    """
    return self.send_item(
        thread_id,
        item_type=ThreadItemType.MEDIA_SHARE,
        shh_mode=shh_mode,
        client_context=client_context,
        offline_threading_id=offline_threading_id,
        text=text,
        item_id=media_id,
    )
def send_profile(self, thread_id: str, user_id: str, text: str = "", shh_mode: bool = False,
                 client_context: Optional[str] = None,
                 offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
    """Send a profile item to a thread; ``user_id`` is sent as the item ID.

    :return: Awaitable resolving to the command response.
    """
    return self.send_item(
        thread_id,
        item_type=ThreadItemType.PROFILE,
        shh_mode=shh_mode,
        client_context=client_context,
        offline_threading_id=offline_threading_id,
        text=text,
        item_id=user_id,
    )
def send_reaction(self, thread_id: str, emoji: str, item_id: str,
                  reaction_status: ReactionStatus = ReactionStatus.CREATED,
                  target_item_type: ThreadItemType = ThreadItemType.TEXT,
                  shh_mode: bool = False, client_context: Optional[str] = None,
                  offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
    """Send a reaction item targeting ``item_id`` in a thread.

    The reaction is always sent with node type ``item``, reaction type
    ``like`` and action source ``double_tap``.

    :param emoji: Emoji to react with.
    :param item_id: ID of the item being reacted to.
    :param reaction_status: Status to send; its ``value`` is used.
    :param target_item_type: Type of the target item; its ``value`` is used.
    :return: Awaitable resolving to the command response.
    """
    return self.send_item(
        thread_id,
        item_type=ThreadItemType.REACTION,
        shh_mode=shh_mode,
        client_context=client_context,
        offline_threading_id=offline_threading_id,
        reaction_status=reaction_status.value,
        node_type="item",
        reaction_type="like",
        target_item_type=target_item_type.value,
        emoji=emoji,
        item_id=item_id,
        reaction_action_source="double_tap",
    )
def send_user_story(self, thread_id: str, media_id: str, text: str = "",
                    shh_mode: bool = False, client_context: Optional[str] = None,
                    offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
    """Send a reel share item to a thread; ``media_id`` is sent as the item ID.

    :return: Awaitable resolving to the command response.
    """
    return self.send_item(
        thread_id,
        item_type=ThreadItemType.REEL_SHARE,
        shh_mode=shh_mode,
        client_context=client_context,
        offline_threading_id=offline_threading_id,
        text=text,
        item_id=media_id,
    )
def send_text(self, thread_id: str, text: str = "", shh_mode: bool = False,
              client_context: Optional[str] = None, offline_threading_id: Optional[str] = None
              ) -> Awaitable[CommandResponse]:
    """Send a text item to a thread.

    :return: Awaitable resolving to the command response.
    """
    return self.send_item(
        thread_id,
        item_type=ThreadItemType.TEXT,
        shh_mode=shh_mode,
        client_context=client_context,
        offline_threading_id=offline_threading_id,
        text=text,
    )
def mark_seen(self, thread_id: str, item_id: str, client_context: Optional[str] = None,
              offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
    """Send the ``MARK_SEEN`` action for ``item_id`` in a thread.

    :return: Awaitable resolving to the command response.
    """
    return self.send_command(
        thread_id,
        action=ThreadAction.MARK_SEEN,
        client_context=client_context,
        offline_threading_id=offline_threading_id,
        item_id=item_id,
    )
def mark_visual_item_seen(self, thread_id: str, item_id: str,
                          client_context: Optional[str] = None,
                          offline_threading_id: Optional[str] = None
                          ) -> Awaitable[CommandResponse]:
    """Send the ``MARK_VISUAL_ITEM_SEEN`` action for ``item_id`` in a thread.

    :return: Awaitable resolving to the command response.
    """
    return self.send_command(
        thread_id,
        action=ThreadAction.MARK_VISUAL_ITEM_SEEN,
        client_context=client_context,
        offline_threading_id=offline_threading_id,
        item_id=item_id,
    )
def indicate_activity(self, thread_id: str, activity_status: TypingStatus = TypingStatus.TEXT,
                      client_context: Optional[str] = None,
                      offline_threading_id: Optional[str] = None
                      ) -> Awaitable[CommandResponse]:
    """Send the ``INDICATE_ACTIVITY`` action with the given activity status to a thread.

    :param activity_status: Status to send; its ``value`` is used.
    :return: Awaitable resolving to the command response.
    """
    return self.send_command(
        thread_id,
        action=ThreadAction.INDICATE_ACTIVITY,
        client_context=client_context,
        offline_threading_id=offline_threading_id,
        activity_status=activity_status.value,
    )
# endregion