conn.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2020 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from typing import (Union, Set, Optional, Any, Dict, Awaitable, Type, List, TypeVar, Callable,
  17. Iterable)
  18. from collections import defaultdict
  19. from socket import socket, error as SocketError
  20. from uuid import uuid4
  21. import logging
  22. import urllib.request
  23. import asyncio
  24. import zlib
  25. import time
  26. import json
  27. import re
  28. import paho.mqtt.client
  29. from paho.mqtt.client import MQTTMessage, WebsocketConnectionError
  30. from yarl import URL
  31. from mautrix.util.logging import TraceLogger
  32. from ..errors import MQTTNotLoggedIn, MQTTNotConnected
  33. from ..state import AndroidState
  34. from ..types import (CommandResponse, ThreadItemType, ThreadAction, ReactionStatus, TypingStatus,
  35. IrisPayload, PubsubPayload, AppPresenceEventPayload, RealtimeDirectEvent,
  36. RealtimeZeroProvisionPayload, ClientConfigUpdatePayload, MessageSyncEvent,
  37. MessageSyncMessage, LiveVideoCommentPayload, PubsubEvent, IrisPayloadData,
  38. ThreadSyncEvent)
  39. from .thrift import RealtimeConfig, RealtimeClientInfo, ForegroundStateConfig, IncomingMessage
  40. from .otclient import MQTToTClient
  41. from .subscription import everclear_subscriptions, RealtimeTopic, GraphQLQueryID
  42. from .events import Connect, Disconnect
  43. try:
  44. import socks
  45. except ImportError:
  46. socks = None
  # Generic type variable; used in the event-handler annotations of AndroidMQTT.
  47. T = TypeVar('T')
  # Matches an activity-indicator path of a direct thread. Group 1 is the thread ID
  # segment and group 2 is the activity indicator ID segment.
  48. ACTIVITY_INDICATOR_REGEX = re.compile(
  49. r"/direct_v2/threads/([\w_]+)/activity_indicator_id/([\w_]+)")
  # Matches an inbox thread path; group 1 is the thread ID segment.
  50. INBOX_THREAD_REGEX = re.compile(
  51. r"/direct_v2/inbox/threads/([\w_]+)")
  # MQTT connection to Instagram's realtime service, serviced from an asyncio event loop.
  52. class AndroidMQTT:
  # Event loop used to watch the MQTT socket and to schedule event dispatch tasks.
  53. _loop: asyncio.AbstractEventLoop
  # Underlying MQTT client object; its callbacks are wired to methods of this class.
  54. _client: MQTToTClient
  # Logger for this connection.
  55. log: TraceLogger
  # Account/device state; read when building the MQTT client ID.
  56. state: AndroidState
  # GraphQL and Skywalker subscriptions that are re-sent after every successful connect.
  57. _graphql_subs: Set[str]
  58. _skywalker_subs: Set[str]
  # Iris parameters passed to iris_subscribe() after connecting (skipped while the
  # seq ID is falsy).
  59. _iris_seq_id: Optional[int]
  60. _iris_snapshot_at_ms: Optional[int]
  # Futures keyed by MQTT message ID; resolved by the on_publish callback.
  61. _publish_waiters: Dict[int, asyncio.Future]
  # Futures keyed by response topic; resolved when a message arrives on that topic.
  62. _response_waiters: Dict[RealtimeTopic, asyncio.Future]
  # One lock per response topic, held by request() while it waits for the response.
  63. _response_waiter_locks: Dict[RealtimeTopic, asyncio.Lock]
  # Exception that listen() raises (and clears) when it is set.
  64. _disconnect_error: Optional[Exception]
  # Registered async handlers, keyed by event class.
  65. _event_handlers: Dict[Type[T], List[Callable[[T], Awaitable[None]]]]
  66. # region Initialization
  def __init__(self, state: AndroidState, loop: Optional[asyncio.AbstractEventLoop] = None,
               log: Optional[TraceLogger] = None) -> None:
      """
      Set up bookkeeping and configure the MQTT client.

      The connection target is registered with ``connect_async``; :meth:`listen`
      later calls :meth:`_reconnect` on the client.

      Args:
          state: Account/device state used to build the MQTT client ID.
          loop: Event loop to attach to. Defaults to ``asyncio.get_event_loop()``.
          log: Logger to use. Defaults to the ``mauigpapi.mqtt`` logger.

      Raises:
          KeyError: If an HTTP proxy is configured with a scheme that has no
              entry in the proxy type table below.
      """
      self.state = state
      self._loop = loop or asyncio.get_event_loop()
      self.log = log or logging.getLogger("mauigpapi.mqtt")

      # Subscriptions and iris position that _post_connect() replays after connecting.
      self._graphql_subs = set()
      self._skywalker_subs = set()
      self._iris_seq_id = None
      self._iris_snapshot_at_ms = None

      # Bookkeeping for in-flight publishes, request/response pairs and event handlers.
      self._publish_waiters = {}
      self._response_waiters = {}
      self._response_waiter_locks = defaultdict(asyncio.Lock)
      self._disconnect_error = None
      self._event_handlers = defaultdict(list)

      self._client = MQTToTClient(
          client_id=self._form_client_id(),
          clean_session=True,
          protocol=paho.mqtt.client.MQTTv31,
          transport="tcp",
      )

      # Route the connection through the system HTTP proxy when PySocks is available.
      http_proxy = urllib.request.getproxies().get("http")
      if http_proxy and socks and URL:
          proxy_url = URL(http_proxy)
          socks_types = {
              "http": socks.HTTP,
              "https": socks.HTTP,
              "socks": socks.SOCKS5,
              "socks5": socks.SOCKS5,
              "socks4": socks.SOCKS4,
          }
          self._client.proxy_set(
              proxy_type=socks_types[proxy_url.scheme],
              proxy_addr=proxy_url.host,
              proxy_port=proxy_url.port,
              proxy_username=proxy_url.user,
              proxy_password=proxy_url.password,
          )

      self._client.enable_logger()
      self._client.tls_set()
      # Tuning knobs that are currently left at the client's defaults:
      # mqtt.max_inflight_messages_set(20) # The rest will get queued
      # mqtt.max_queued_messages_set(0) # Unlimited messages can be queued
      # mqtt.message_retry_set(20) # Retry sending for at least 20 seconds
      # mqtt.reconnect_delay_set(min_delay=1, max_delay=120)
      self._client.connect_async("edge-mqtt.facebook.com", 443, keepalive=60)

      mqtt = self._client
      mqtt.on_message = self._on_message_handler
      mqtt.on_publish = self._on_publish_handler
      mqtt.on_connect = self._on_connect_handler
      # mqtt.on_disconnect = self._on_disconnect_handler
      mqtt.on_socket_open = self._on_socket_open
      mqtt.on_socket_close = self._on_socket_close
      mqtt.on_socket_register_write = self._on_socket_register_write
      mqtt.on_socket_unregister_write = self._on_socket_unregister_write
  def _form_client_id(self) -> bytes:
      """
      Build the value used as the MQTT client ID.

      Returns:
          The Thrift-serialized realtime config, zlib-compressed at level 9.

      Raises:
          KeyError: If the state has no ``sessionid`` cookie.
      """
      device = self.state.device
      # Topics the server should subscribe this client to right at connect time.
      subscribe_topic_ids = [
          int(topic.encoded)
          for topic in (
              RealtimeTopic.PUBSUB,
              RealtimeTopic.SUB_IRIS_RESPONSE,
              RealtimeTopic.REALTIME_SUB,
              RealtimeTopic.REGION_HINT,
              RealtimeTopic.SEND_MESSAGE_RESPONSE,
              RealtimeTopic.MESSAGE_SYNC,
              RealtimeTopic.UNKNOWN_179,
              RealtimeTopic.UNKNOWN_PP,
          )
      ]
      password = f"sessionid={self.state.cookies['sessionid']}"
      client_info = RealtimeClientInfo(
          user_id=int(self.state.user_id),
          user_agent=self.state.user_agent,
          client_capabilities=0b10110111,
          endpoint_capabilities=0,
          publish_format=1,
          no_automatic_foreground=True,
          make_user_available_in_foreground=False,
          device_id=device.phone_id,
          is_initially_foreground=True,
          network_type=1,
          network_subtype=0,
          # Current time in milliseconds, truncated to 32 bits.
          client_mqtt_session_id=int(time.time() * 1000) & 0xffffffff,
          subscribe_topics=subscribe_topic_ids,
          client_type="cookie_auth",
          app_id=567067343352427,
          region_preference=self.state.session.region_hint or "LLA",
          device_secret="",
          client_stack=3,
      )
      app_specific_info = {
          "app_version": self.state.application.APP_VERSION,
          "X-IG-Capabilities": self.state.application.CAPABILITIES,
          "everclear_subscriptions": json.dumps(everclear_subscriptions),
          "User-Agent": self.state.user_agent,
          "Accept-Language": device.language.replace("_", "-"),
          "platform": "android",
          "ig_mqtt_route": "django",
          "pubsub_msg_type_blacklist": "direct, typing_type",
          "auth_cache_enabled": "0",
      }
      cfg = RealtimeConfig(
          client_identifier=device.phone_id[:20],
          client_info=client_info,
          password=password,
          app_specific_info=app_specific_info,
      )
      return zlib.compress(cfg.to_thrift(), level=9)
  161. # endregion
  def _on_socket_open(self, client: MQTToTClient, _: Any, sock: socket) -> None:
      """Socket-open callback: make the event loop call ``client.loop_read`` on readability."""
      self._loop.add_reader(sock, client.loop_read)
  def _on_socket_close(self, client: MQTToTClient, _: Any, sock: socket) -> None:
      """Socket-close callback: stop watching the socket for readability."""
      self._loop.remove_reader(sock)
  def _on_socket_register_write(self, client: MQTToTClient, _: Any, sock: socket) -> None:
      """Write-register callback: make the event loop call ``client.loop_write`` on writability."""
      self._loop.add_writer(sock, client.loop_write)
  def _on_socket_unregister_write(self, client: MQTToTClient, _: Any, sock: socket) -> None:
      """Write-unregister callback: stop watching the socket for writability."""
      self._loop.remove_writer(sock)
  def _on_connect_handler(self, client: MQTToTClient, _: Any, flags: Dict[str, Any], rc: int
                          ) -> None:
      """
      Connect callback of the MQTT client.

      A zero result code schedules :meth:`_post_connect` on the event loop; any
      other code is only logged as an error.
      """
      if rc == 0:
          self._loop.create_task(self._post_connect())
          return
      err = paho.mqtt.client.connack_string(rc)
      self.log.error("MQTT Connection Error: %s (%d)", err, rc)
  async def _post_connect(self) -> None:
      """Dispatch a ``Connect`` event, then re-send the stored subscriptions."""
      await self._dispatch(Connect())
      self.log.debug("Re-subscribing to things after connect")

      if self._graphql_subs:
          graphql_resp = await self.graphql_subscribe(self._graphql_subs)
          self.log.trace("GraphQL subscribe response: %s", graphql_resp)

      if self._skywalker_subs:
          skywalker_resp = await self.skywalker_subscribe(self._skywalker_subs)
          self.log.trace("Skywalker subscribe response: %s", skywalker_resp)

      # A falsy seq ID (None or 0) means there is nothing to resume from.
      if self._iris_seq_id:
          await self.iris_subscribe(self._iris_seq_id, self._iris_snapshot_at_ms)
  def _on_publish_handler(self, client: MQTToTClient, _: Any, mid: int) -> None:
      """
      Publish callback of the MQTT client: resolve the future waiting on ``mid``.

      The waiter is removed from ``_publish_waiters`` so the mapping does not keep
      finished futures around. A waiter that is already done (for example because
      the awaiting task was cancelled) is left untouched, since calling
      ``set_result`` on it would raise ``InvalidStateError`` inside the callback.

      Args:
          client: The MQTT client invoking the callback (unused).
          _: User data passed by the client (unused).
          mid: Message ID of the published message.
      """
      waiter = self._publish_waiters.pop(mid, None)
      if waiter is None or waiter.done():
          return
      waiter.set_result(None)
  194. # region Incoming event parsing
  def _parse_direct_thread_path(self, path: str) -> dict:
      """
      Extract the identifiers encoded in a ``/direct_v2/threads/...`` path.

      Args:
          path: Slash-separated path starting with ``/direct_v2/threads/<thread_id>``.

      Returns:
          A dict that always contains ``thread_id`` and, depending on the segment
          after the thread ID, one of ``approval_required_for_new_members``,
          ``has_seen``, ``item_id`` (plus an optional nested sub-item),
          ``admin_user_id`` or ``activity_indicator_id``.

      Raises:
          AssertionError: If the path does not start with ``/direct_v2/threads``.
          ValueError: If the path has too few segments or a numeric segment is
              not an integer.
          IndexError: If a recognized sub-item key is not followed by a value.
      """
      try:
          blank, direct_v2, threads, thread_id, *rest = path.split("/")
          assert blank == ""
          assert direct_v2 == "direct_v2"
          assert threads == "threads"
      except (AssertionError, ValueError, IndexError) as e:
          self.log.debug(f"Got {e!r} while parsing path {path}")
          raise
      additional = {
          "thread_id": thread_id
      }
      if rest:
          subitem_key = rest[0]
          if subitem_key == "approval_required_for_new_members":
              additional["approval_required_for_new_members"] = True
          elif subitem_key == "participants" and len(rest) > 2 and rest[2] == "has_seen":
              additional["has_seen"] = int(rest[1])
          elif subitem_key == "items":
              additional["item_id"] = rest[1]
              # TODO wtf is this?
              # it has something to do with reactions
              if len(rest) > 4:
                  additional[rest[2]] = {
                      rest[3]: rest[4],
                  }
          # Compare for equality: a substring test would also match the empty
          # segment left by a trailing slash and fragments such as "user".
          elif subitem_key == "admin_user_ids":
              additional["admin_user_id"] = int(rest[1])
          elif subitem_key == "activity_indicator_id":
              additional["activity_indicator_id"] = rest[1]
      self.log.trace("Parsed path %s -> %s", path, additional)
      return additional
  def _on_messager_sync_item(self, part: IrisPayloadData, parsed_item: IrisPayload) -> None:
      """
      Turn one data part of an iris payload into an event and schedule its dispatch.

      Paths under ``/direct_v2/threads/`` become a ``MessageSyncEvent``, paths under
      ``/direct_v2/inbox/threads/`` become a ``ThreadSyncEvent``; anything else is
      logged and dropped.

      Args:
          part: The data part to handle.
          parsed_item: The iris payload that ``part`` belongs to.
      """
      is_thread = part.path.startswith("/direct_v2/threads/")
      if not is_thread and not part.path.startswith("/direct_v2/inbox/threads/"):
          self.log.warning(f"Unsupported path {part.path}")
          return

      if is_thread:
          raw_message = {
              "path": part.path,
              "op": part.op,
              **self._parse_direct_thread_path(part.path),
          }
          try:
              raw_message = {**raw_message, **json.loads(part.value)}
          except (json.JSONDecodeError, TypeError):
              # Value is not a JSON object: pass it through unparsed.
              raw_message["value"] = part.value
          evt = MessageSyncEvent(iris=parsed_item,
                                 message=MessageSyncMessage.deserialize(raw_message))
      else:
          evt = ThreadSyncEvent.deserialize({
              "path": part.path,
              "op": part.op,
              **json.loads(part.value),
          })
      self._loop.create_task(self._dispatch(evt))
  def _on_message_sync(self, payload: bytes) -> None:
      """
      Handle a message sync payload.

      Args:
          payload: UTF-8 encoded JSON array of iris payloads.
      """
      sync_items = json.loads(payload.decode("utf-8"))
      self.log.trace("Got message sync event: %s", sync_items)
      for raw_item in sync_items:
          iris = IrisPayload.deserialize(raw_item)
          for part in iris.data:
              self._on_messager_sync_item(part, iris)
  def _on_pubsub(self, payload: bytes) -> None:
      """
      Handle a pubsub payload and dispatch a ``PubsubEvent`` per activity indicator.

      Args:
          payload: Thrift-serialized incoming message whose payload is JSON.
      """
      parsed_thrift = IncomingMessage.from_thrift(payload)
      self.log.trace(f"Got pubsub event {parsed_thrift.topic} / {parsed_thrift.payload}")
      message = PubsubPayload.parse_json(parsed_thrift.payload)
      for data in message.data:
          indicator = ACTIVITY_INDICATOR_REGEX.match(data.path)
          if not indicator:
              if data.double_publish:
                  self.log.debug("Pubsub: double publish: %s", data.path)
              else:
                  self.log.debug("Pubsub: no activity indicator on data: %s", data)
              continue
          evt = PubsubEvent(data=data, base=message, thread_id=indicator.group(1),
                            activity_indicator_id=indicator.group(2))
          self._loop.create_task(self._dispatch(evt))
  def _parse_realtime_sub_item(self, topic: Union[str, GraphQLQueryID], raw: dict
                               ) -> Iterable[Any]:
      """
      Lazily convert a realtime sub payload into event objects.

      Args:
          topic: Either a known GraphQL query ID or the raw topic string.
          raw: The decoded JSON payload.

      Yields:
          One event for the GraphQL topics, or one ``RealtimeDirectEvent`` per entry
          of ``raw["data"]`` for the ``"direct"`` topic. Other topics yield nothing.
      """
      if topic == GraphQLQueryID.APP_PRESENCE:
          yield AppPresenceEventPayload.deserialize(raw).presence_event
      elif topic == GraphQLQueryID.ZERO_PROVISION:
          yield RealtimeZeroProvisionPayload.deserialize(raw).zero_product_provisioning_event
      elif topic == GraphQLQueryID.CLIENT_CONFIG_UPDATE:
          yield ClientConfigUpdatePayload.deserialize(raw).client_config_update_event
      elif topic == GraphQLQueryID.LIVE_REALTIME_COMMENTS:
          yield LiveVideoCommentPayload.deserialize(raw).live_video_comment_event
      elif topic == "direct":
          event = raw["event"]
          # Item fields take precedence over the values parsed from the path.
          yield from (
              RealtimeDirectEvent.deserialize({
                  "event": event,
                  **self._parse_direct_thread_path(item["path"]),
                  **item,
              })
              for item in raw["data"]
          )
  def _on_realtime_sub(self, payload: bytes) -> None:
      """
      Handle a realtime sub payload and schedule dispatch of the events it contains.

      Args:
          payload: Thrift-serialized incoming message whose payload is JSON.
      """
      parsed_thrift = IncomingMessage.from_thrift(payload)
      try:
          topic = GraphQLQueryID(parsed_thrift.topic)
      except ValueError:
          # Not a known GraphQL query ID: keep the raw topic value.
          topic = parsed_thrift.topic
      self.log.trace(f"Got realtime sub event {topic} / {parsed_thrift.payload}")

      allowed = ("direct", GraphQLQueryID.APP_PRESENCE, GraphQLQueryID.ZERO_PROVISION,
                 GraphQLQueryID.CLIENT_CONFIG_UPDATE, GraphQLQueryID.LIVE_REALTIME_COMMENTS)
      if topic in allowed:
          parsed_json = json.loads(parsed_thrift.payload)
          for evt in self._parse_realtime_sub_item(topic, parsed_json):
              self._loop.create_task(self._dispatch(evt))
  def _on_message_handler(self, client: MQTToTClient, _: Any, message: MQTTMessage) -> None:
      """
      Message callback of the MQTT client: decompress and route an incoming message.

      Message sync, pubsub and realtime sub topics go to their parsers. Any other
      topic resolves the pending response waiter for that topic, if there is one.
      Exceptions are logged and never propagate to the MQTT client.
      """
      try:
          topic = RealtimeTopic.decode(message.topic)
          # Instagram Android MQTT messages are always compressed
          message.payload = zlib.decompress(message.payload)
          if topic == RealtimeTopic.MESSAGE_SYNC:
              self._on_message_sync(message.payload)
          elif topic == RealtimeTopic.PUBSUB:
              self._on_pubsub(message.payload)
          elif topic == RealtimeTopic.REALTIME_SUB:
              self._on_realtime_sub(message.payload)
          else:
              self.log.trace("Other message payload: %s", message.payload)
              waiter = self._response_waiters.pop(topic, None)
              if waiter is None:
                  self.log.debug("No handler for MQTT message in %s: %s",
                                 topic.value, message.payload)
              else:
                  # The waiter receives the message with its payload already decompressed.
                  waiter.set_result(message)
      except Exception:
          self.log.exception("Error in incoming MQTT message handler")
          self.log.trace("Errored MQTT payload: %s", message.payload)
  330. # endregion
  async def _reconnect(self) -> None:
      """
      Ask the MQTT client to reconnect.

      Raises:
          MQTTNotLoggedIn: If the reconnect attempt fails with a socket, OS or
              websocket connection error (chained as the cause).
      """
      try:
          self._client.reconnect()
      except (SocketError, OSError, WebsocketConnectionError) as err:
          # TODO custom class
          raise MQTTNotLoggedIn("MQTT reconnection failed") from err
  def add_event_handler(self, evt_type: Type[T], handler: Callable[[T], Awaitable[None]]
                        ) -> None:
      """
      Register an async handler for events of exactly the given class.

      Args:
          evt_type: Event class to listen for.
          handler: Coroutine function called with each dispatched event of that class.
      """
      handlers = self._event_handlers[evt_type]
      handlers.append(handler)
  async def _dispatch(self, evt: T) -> None:
      """
      Call every handler registered for the exact class of ``evt``, in order.

      A handler that raises is logged and does not stop the remaining handlers.
      """
      registered = self._event_handlers[type(evt)]
      for handle in registered:
          try:
              await handle(evt)
          except Exception:
              self.log.exception(f"Error in {type(evt)} handler")
  def disconnect(self) -> None:
      """Ask the underlying MQTT client to disconnect."""
      self._client.disconnect()
  # Connect to Instagram MQTT and keep servicing the connection until disconnect() is
  # called, the task is cancelled, or a fatal error is raised.
  #
  # Args:
  #     graphql_subs: GraphQL subscriptions to send after connecting (None -> empty set).
  #     skywalker_subs: Skywalker subscriptions to send after connecting (None -> empty set).
  #     seq_id, snapshot_at_ms: Iris parameters stored for the post-connect iris subscribe.
  #
  # Raises:
  #     MQTTNotLoggedIn: When the connection is refused, or when _reconnect() fails.
  #     MQTTNotConnected: When "no connection" is reported again right after a
  #         reconnect attempt.
  #
  # NOTE(review): the indentation of this copy of the file is lost, so the nesting of the
  # statements below cannot be verified here; the comments describe the apparent intent.
  348. async def listen(self, graphql_subs: Set[str] = None, skywalker_subs: Set[str] = None,
  349. seq_id: int = None, snapshot_at_ms: int = None) -> None:
  # Remember what _post_connect() should subscribe to once the connection is up.
  350. self._graphql_subs = graphql_subs or set()
  351. self._skywalker_subs = skywalker_subs or set()
  352. self._iris_seq_id = seq_id
  353. self._iris_snapshot_at_ms = snapshot_at_ms
  354. self.log.debug("Connecting to Instagram MQTT")
  355. await self._reconnect()
  # Set after a reconnect attempt and cleared again on the next successful poll; while it
  # is set, a "no connection" result raises instead of triggering another retry.
  356. exit_if_not_connected = False
  357. while True:
  # Poll once per second; cancelling this task disconnects and leaves the loop.
  358. try:
  359. await asyncio.sleep(1)
  360. except asyncio.CancelledError:
  361. self.disconnect()
  362. # this might not be necessary
  363. self._client.loop_misc()
  364. break
  # The return code of loop_misc() drives the error handling below.
  365. rc = self._client.loop_misc()
  366. # If disconnect() has been called
  367. # Beware, internal API, may have to change this to something more stable!
  368. if self._client._state == paho.mqtt.client.mqtt_cs_disconnecting:
  369. break # Stop listening
  370. if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
  371. # If known/expected error
  372. if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
  373. await self._dispatch(Disconnect(reason="Connection lost, retrying"))
  374. elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
  375. # This error is wrongly classified
  376. # See https://github.com/eclipse/paho.mqtt.python/issues/340
  377. await self._dispatch(Disconnect(reason="Connection lost, retrying"))
  378. elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
  379. raise MQTTNotLoggedIn("MQTT connection refused")
  380. elif rc == paho.mqtt.client.MQTT_ERR_NO_CONN:
  381. if exit_if_not_connected:
  382. raise MQTTNotConnected("MQTT error: no connection")
  383. await self._dispatch(Disconnect(reason="MQTT Error: no connection, retrying"))
  384. else:
  385. err = paho.mqtt.client.error_string(rc)
  386. self.log.error("MQTT Error: %s", err)
  387. await self._dispatch(Disconnect(reason=f"MQTT Error: {err}, retrying"))
  # Every error branch that did not raise ends in a reconnect attempt.
  388. await self._reconnect()
  389. exit_if_not_connected = True
  390. else:
  391. exit_if_not_connected = False
  # Presumably runs once the loop has exited (verify against the original file):
  # re-raise and clear an error stored in _disconnect_error.
  392. if self._disconnect_error:
  393. self.log.info("disconnect_error is set, raising and clearing variable")
  394. err = self._disconnect_error
  395. self._disconnect_error = None
  396. raise err
  397. # region Basic outgoing MQTT
  def publish(self, topic: RealtimeTopic, payload: Union[str, bytes, dict]
              ) -> asyncio.Future:
      """
      Publish a zlib-compressed payload with QoS 1.

      Args:
          topic: Topic to publish to; its ``encoded`` form is sent to the client.
          payload: A dict (serialized as JSON), a string (encoded as UTF-8) or bytes.

      Returns:
          A future stored under the message ID, which the publish callback resolves.
      """
      serialized = json.dumps(payload) if isinstance(payload, dict) else payload
      raw = serialized.encode("utf-8") if isinstance(serialized, str) else serialized
      compressed = zlib.compress(raw, level=9)
      info = self._client.publish(topic.encoded, compressed, qos=1)
      waiter = asyncio.Future()
      self._publish_waiters[info.mid] = waiter
      return waiter
  async def request(self, topic: RealtimeTopic, response: RealtimeTopic,
                    payload: Union[str, bytes, dict]) -> MQTTMessage:
      """
      Publish a payload and wait for the next message on the response topic.

      Requests that share a response topic are serialized with a per-topic lock.

      Args:
          topic: Topic to publish the request to.
          response: Topic on which the response is expected.
          payload: Request payload, as accepted by :meth:`publish`.

      Returns:
          The message that arrived on the response topic.
      """
      topic_lock = self._response_waiter_locks[response]
      async with topic_lock:
          # Register the waiter before publishing so the response cannot be missed.
          response_waiter = asyncio.Future()
          self._response_waiters[response] = response_waiter
          await self.publish(topic, payload)
          return await response_waiter
  async def iris_subscribe(self, seq_id: int, snapshot_at_ms: int) -> None:
      """
      Send an iris subscribe request and log the response.

      Args:
          seq_id: Value sent as ``seq_id``.
          snapshot_at_ms: Value sent as ``snapshot_at_ms``.
      """
      self.log.debug(f"Requesting iris subscribe {seq_id}/{snapshot_at_ms}")
      body = {"seq_id": seq_id, "snapshot_at_ms": snapshot_at_ms}
      resp = await self.request(RealtimeTopic.SUB_IRIS, RealtimeTopic.SUB_IRIS_RESPONSE, body)
      # TODO check succeeded and raise error if needed
      self.log.debug("Iris subscribe response: %s", resp.payload.decode("utf-8"))
  def graphql_subscribe(self, subs: Set[str]) -> asyncio.Future:
      """Remember the given GraphQL subscriptions and publish a ``sub`` request for them."""
      self._graphql_subs |= subs
      body = {"sub": list(subs)}
      return self.publish(RealtimeTopic.REALTIME_SUB, body)
  def graphql_unsubscribe(self, subs: Set[str]) -> asyncio.Future:
      """Forget the given GraphQL subscriptions and publish an ``unsub`` request for them."""
      self._graphql_subs -= subs
      body = {"unsub": list(subs)}
      return self.publish(RealtimeTopic.REALTIME_SUB, body)
  def skywalker_subscribe(self, subs: Set[str]) -> asyncio.Future:
      """Remember the given Skywalker subscriptions and publish a ``sub`` request for them."""
      self._skywalker_subs |= subs
      body = {"sub": list(subs)}
      return self.publish(RealtimeTopic.PUBSUB, body)
  def skywalker_unsubscribe(self, subs: Set[str]) -> asyncio.Future:
      """Forget the given Skywalker subscriptions and publish an ``unsub`` request for them."""
      self._skywalker_subs -= subs
      body = {"unsub": list(subs)}
      return self.publish(RealtimeTopic.PUBSUB, body)
  434. # endregion
  435. # region Actually sending messages and stuff
  async def send_foreground_state(self, state: ForegroundStateConfig) -> None:
      """
      Publish a foreground state update and adopt its keepalive timeout, if any.

      Args:
          state: Foreground state config, serialized with Thrift before sending.
      """
      self.log.debug("Updating foreground state: %s", state)
      # NOTE(review): publish() compresses its payload as well, so this body appears
      # to be compressed twice - confirm that this is what the server expects.
      body = zlib.compress(state.to_thrift(), level=9)
      await self.publish(RealtimeTopic.FOREGROUND_STATE, body)
      timeout = state.keep_alive_timeout
      if timeout:
          # Writes a private attribute of the MQTT client.
          self._client._keepalive = timeout
  async def send_command(self, thread_id: str, action: ThreadAction,
                         client_context: Optional[str] = None,
                         offline_threading_id: Optional[str] = None, **kwargs: Any
                         ) -> CommandResponse:
      """
      Send a thread command and wait for the parsed response.

      Args:
          thread_id: Thread the command applies to.
          action: Action to perform; its ``value`` is sent.
          client_context: Client context; a random UUID4 string when omitted.
          offline_threading_id: Offline threading ID; defaults to the client context.
          **kwargs: Extra fields appended to the request as-is.

      Returns:
          The response parsed from the message on the send-message response topic.
      """
      context = client_context or str(uuid4())
      req = {
          "thread_id": thread_id,
          "client_context": context,
          "offline_threading_id": offline_threading_id or context,
          "action": action.value,
          # "device_id": self.state.cookies["ig_did"],
      }
      req.update(kwargs)
      resp = await self.request(RealtimeTopic.SEND_MESSAGE, RealtimeTopic.SEND_MESSAGE_RESPONSE,
                                payload=req)
      return CommandResponse.parse_json(resp.payload.decode("utf-8"))
  def send_item(self, thread_id: str, item_type: ThreadItemType, shh_mode: bool = False,
                client_context: Optional[str] = None, offline_threading_id: Optional[str] = None,
                **kwargs: Any) -> Awaitable[CommandResponse]:
      """
      Send an item of the given type to a thread.

      ``shh_mode`` is sent as the string ``"0"`` or ``"1"`` in ``is_shh_mode``;
      ``kwargs`` are appended to the request after the item type fields.
      """
      return self.send_command(thread_id, ThreadAction.SEND_ITEM, client_context,
                               offline_threading_id, item_type=item_type.value,
                               is_shh_mode=str(int(shh_mode)), **kwargs)
  def send_hashtag(self, thread_id: str, hashtag: str, text: str = "", shh_mode: bool = False,
                   client_context: Optional[str] = None,
                   offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
      """Send a hashtag item; the hashtag is sent as ``item_id`` along with ``text``."""
      return self.send_item(thread_id, ThreadItemType.HASHTAG, shh_mode, client_context,
                            offline_threading_id, text=text, item_id=hashtag)
  def send_like(self, thread_id: str, shh_mode: bool = False,
                client_context: Optional[str] = None, offline_threading_id: Optional[str] = None,
                ) -> Awaitable[CommandResponse]:
      """Send a like item to a thread."""
      return self.send_item(thread_id, ThreadItemType.LIKE, shh_mode, client_context,
                            offline_threading_id)
  def send_location(self, thread_id: str, venue_id: str, text: str = "",
                    shh_mode: bool = False, client_context: Optional[str] = None,
                    offline_threading_id: Optional[str] = None,
                    ) -> Awaitable[CommandResponse]:
      """Send a location item; the venue ID is sent as ``item_id`` along with ``text``."""
      return self.send_item(thread_id, ThreadItemType.LOCATION, shh_mode, client_context,
                            offline_threading_id, text=text, item_id=venue_id)
  def send_media(self, thread_id: str, media_id: str, text: str = "", shh_mode: bool = False,
                 client_context: Optional[str] = None,
                 offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
      """Send a media share item; the media is referenced by ``media_id`` along with ``text``."""
      return self.send_item(thread_id, ThreadItemType.MEDIA_SHARE, shh_mode, client_context,
                            offline_threading_id, text=text, media_id=media_id)
  def send_profile(self, thread_id: str, user_id: str, text: str = "", shh_mode: bool = False,
                   client_context: Optional[str] = None,
                   offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
      """Send a profile item; the user ID is sent as ``item_id`` along with ``text``."""
      return self.send_item(thread_id, ThreadItemType.PROFILE, shh_mode, client_context,
                            offline_threading_id, text=text, item_id=user_id)
  def send_reaction(self, thread_id: str, emoji: str, item_id: str,
                    reaction_status: ReactionStatus = ReactionStatus.CREATED,
                    target_item_type: ThreadItemType = ThreadItemType.TEXT,
                    shh_mode: bool = False, client_context: Optional[str] = None,
                    offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
      """
      Send a reaction item targeting another item in the thread.

      Args:
          thread_id: Thread containing the target item.
          emoji: Emoji to react with.
          item_id: ID of the item being reacted to.
          reaction_status: Status to send; its ``value`` goes into the request.
          target_item_type: Type of the target item; its ``value`` goes into the request.
          shh_mode: Forwarded to :meth:`send_item`.
          client_context: Forwarded to :meth:`send_item`.
          offline_threading_id: Forwarded to :meth:`send_item`.
      """
      return self.send_item(thread_id, ThreadItemType.REACTION, shh_mode, client_context,
                            offline_threading_id, reaction_status=reaction_status.value,
                            node_type="item", reaction_type="like",
                            target_item_type=target_item_type.value, emoji=emoji,
                            item_id=item_id, reaction_action_source="double_tap")
  def send_user_story(self, thread_id: str, media_id: str, text: str = "",
                      shh_mode: bool = False, client_context: Optional[str] = None,
                      offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
      """Send a reel share item; the media ID is sent as ``item_id`` along with ``text``."""
      return self.send_item(thread_id, ThreadItemType.REEL_SHARE, shh_mode, client_context,
                            offline_threading_id, text=text, item_id=media_id)
  def send_text(self, thread_id: str, text: str = "", shh_mode: bool = False,
                client_context: Optional[str] = None, offline_threading_id: Optional[str] = None
                ) -> Awaitable[CommandResponse]:
      """Send a text item to a thread."""
      return self.send_item(thread_id, ThreadItemType.TEXT, shh_mode, client_context,
                            offline_threading_id, text=text)
  def mark_seen(self, thread_id: str, item_id: str, client_context: Optional[str] = None,
                offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
      """Send a mark-seen command for an item in a thread."""
      return self.send_command(thread_id, ThreadAction.MARK_SEEN, client_context,
                               offline_threading_id, item_id=item_id)
  def mark_visual_item_seen(self, thread_id: str, item_id: str,
                            client_context: Optional[str] = None,
                            offline_threading_id: Optional[str] = None
                            ) -> Awaitable[CommandResponse]:
      """Send a mark-visual-item-seen command for an item in a thread."""
      return self.send_command(thread_id, ThreadAction.MARK_VISUAL_ITEM_SEEN, client_context,
                               offline_threading_id, item_id=item_id)
  def indicate_activity(self, thread_id: str, activity_status: TypingStatus = TypingStatus.TEXT,
                        client_context: Optional[str] = None,
                        offline_threading_id: Optional[str] = None
                        ) -> Awaitable[CommandResponse]:
      """Send an indicate-activity command; the status ``value`` is sent as ``activity_status``."""
      return self.send_command(thread_id, ThreadAction.INDICATE_ACTIVITY, client_context,
                               offline_threading_id, activity_status=activity_status.value)
  540. # endregion