conn.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2020 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from typing import Union, Set, Optional, Any, Dict, Awaitable, Type, List, TypeVar, Callable
  17. from collections import defaultdict
  18. from socket import socket, error as SocketError
  19. from uuid import uuid4
  20. import logging
  21. import urllib.request
  22. import asyncio
  23. import zlib
  24. import time
  25. import json
  26. import re
  27. import paho.mqtt.client
  28. from paho.mqtt.client import MQTTMessage, WebsocketConnectionError
  29. from yarl import URL
  30. from mautrix.util.logging import TraceLogger
  31. from ..errors import NotLoggedIn, NotConnected
  32. from ..state import AndroidState
  33. from .thrift import RealtimeConfig, RealtimeClientInfo, ForegroundStateConfig, IncomingMessage
  34. from .otclient import MQTToTClient
  35. from .subscription import everclear_subscriptions
  36. from .types import (RealtimeTopic, CommandResponse, ThreadItemType, ThreadAction, ReactionStatus,
  37. TypingStatus, IrisPayload, PubsubPayload, AppPresenceEventPayload,
  38. RealtimeDirectEvent, RealtimeZeroProvisionPayload, ClientConfigUpdatePayload,
  39. LiveVideoCommentPayload, PubsubEvent, MessageSyncEvent, MessageSyncMessage)
  40. from .subscription import GraphQLQueryID
  41. from .events import Connect, Disconnect
  42. try:
  43. import socks
  44. except ImportError:
  45. socks = None
  46. T = TypeVar('T')
  47. ACTIVITY_INDICATOR_REGEX = re.compile(
  48. r"/direct_v2/threads/([\w_]+)/activity_indicator_id/([\w_]+)")
  49. class AndroidMQTT:
  50. _loop: asyncio.AbstractEventLoop
  51. _client: MQTToTClient
  52. log: TraceLogger
  53. state: AndroidState
  54. _graphql_subs: Set[str]
  55. _skywalker_subs: Set[str]
  56. _iris_seq_id: Optional[int]
  57. _iris_snapshot_at_ms: Optional[int]
  58. _publish_waiters: Dict[int, asyncio.Future]
  59. _response_waiters: Dict[RealtimeTopic, asyncio.Future]
  60. _response_waiter_locks: Dict[RealtimeTopic, asyncio.Lock]
  61. _disconnect_error: Optional[Exception]
  62. _event_handlers: Dict[Type[T], List[Callable[[T], Awaitable[None]]]]
  63. # region Initialization
  64. def __init__(self, state: AndroidState, loop: Optional[asyncio.AbstractEventLoop] = None,
  65. log: Optional[TraceLogger] = None) -> None:
  66. self._graphql_subs = set()
  67. self._skywalker_subs = set()
  68. self._iris_seq_id = None
  69. self._iris_snapshot_at_ms = None
  70. self._publish_waiters = {}
  71. self._response_waiters = {}
  72. self._disconnect_error = None
  73. self._response_waiter_locks = defaultdict(lambda: asyncio.Lock())
  74. self._event_handlers = defaultdict(lambda: [])
  75. self.log = log or logging.getLogger("mauigpapi")
  76. self._loop = loop or asyncio.get_event_loop()
  77. self.state = state
  78. self._client = MQTToTClient(
  79. client_id=self._form_client_id(),
  80. clean_session=True,
  81. protocol=paho.mqtt.client.MQTTv31,
  82. transport="tcp",
  83. )
  84. try:
  85. http_proxy = urllib.request.getproxies()["http"]
  86. except KeyError:
  87. http_proxy = None
  88. if http_proxy and socks and URL:
  89. proxy_url = URL(http_proxy)
  90. proxy_type = {
  91. "http": socks.HTTP,
  92. "https": socks.HTTP,
  93. "socks": socks.SOCKS5,
  94. "socks5": socks.SOCKS5,
  95. "socks4": socks.SOCKS4,
  96. }[proxy_url.scheme]
  97. self._client.proxy_set(proxy_type=proxy_type, proxy_addr=proxy_url.host,
  98. proxy_port=proxy_url.port, proxy_username=proxy_url.user,
  99. proxy_password=proxy_url.password)
  100. self._client.enable_logger()
  101. self._client.tls_set()
  102. # mqtt.max_inflight_messages_set(20) # The rest will get queued
  103. # mqtt.max_queued_messages_set(0) # Unlimited messages can be queued
  104. # mqtt.message_retry_set(20) # Retry sending for at least 20 seconds
  105. # mqtt.reconnect_delay_set(min_delay=1, max_delay=120)
  106. self._client.connect_async("edge-mqtt.facebook.com", 443, keepalive=60)
  107. self._client.on_message = self._on_message_handler
  108. self._client.on_publish = self._on_publish_handler
  109. self._client.on_connect = self._on_connect_handler
  110. self._client.on_disconnect = self._on_disconnect_handler
  111. self._client.on_socket_open = self._on_socket_open
  112. self._client.on_socket_close = self._on_socket_close
  113. self._client.on_socket_register_write = self._on_socket_register_write
  114. self._client.on_socket_unregister_write = self._on_socket_unregister_write
  115. def _form_client_id(self) -> bytes:
  116. subscribe_topics = [RealtimeTopic.PUBSUB, RealtimeTopic.SUB_IRIS_RESPONSE,
  117. RealtimeTopic.REALTIME_SUB, RealtimeTopic.REGION_HINT,
  118. RealtimeTopic.SEND_MESSAGE_RESPONSE, RealtimeTopic.MESSAGE_SYNC,
  119. RealtimeTopic.UNKNOWN_179, RealtimeTopic.UNKNOWN_PP]
  120. subscribe_topic_ids = [int(topic.encoded) for topic in subscribe_topics]
  121. password = f"sessionid={self.state.cookies['sessionid']}"
  122. cfg = RealtimeConfig(
  123. client_identifier=self.state.device.phone_id[:20],
  124. client_info=RealtimeClientInfo(
  125. user_id=int(self.state.user_id),
  126. user_agent=self.state.user_agent,
  127. client_capabilities=0b10110111,
  128. endpoint_capabilities=0,
  129. publish_format=1,
  130. no_automatic_foreground=True,
  131. make_user_available_in_foreground=False,
  132. device_id=self.state.device.phone_id,
  133. is_initially_foreground=True,
  134. network_type=1,
  135. network_subtype=0,
  136. client_mqtt_session_id=int(time.time() * 1000) & 0xffffffff,
  137. subscribe_topics=subscribe_topic_ids,
  138. client_type="cookie_auth",
  139. app_id=567067343352427,
  140. region_preference=self.state.session.region_hint or "LLA",
  141. device_secret="",
  142. client_stack=3,
  143. ),
  144. password=password,
  145. app_specific_info={
  146. "app_version": self.state.application.APP_VERSION,
  147. "X-IG-Capabilities": self.state.application.CAPABILITIES,
  148. "everclear_subscriptions": json.dumps(everclear_subscriptions),
  149. "User-Agent": self.state.user_agent,
  150. "Accept-Language": self.state.device.language.replace("_", "-"),
  151. "platform": "android",
  152. "ig_mqtt_route": "django",
  153. "pubsub_msg_type_blacklist": "direct, typing_type",
  154. "auth_cache_enabled": "0",
  155. },
  156. )
  157. return zlib.compress(cfg.to_thrift(), level=9)
  158. # endregion
  159. def _on_socket_open(self, client: MQTToTClient, _: Any, sock: socket) -> None:
  160. self._loop.add_reader(sock, client.loop_read)
  161. def _on_socket_close(self, client: MQTToTClient, _: Any, sock: socket) -> None:
  162. self._loop.remove_reader(sock)
  163. def _on_socket_register_write(self, client: MQTToTClient, _: Any, sock: socket) -> None:
  164. self._loop.add_writer(sock, client.loop_write)
  165. def _on_socket_unregister_write(self, client: MQTToTClient, _: Any, sock: socket) -> None:
  166. self._loop.remove_writer(sock)
  167. def _on_disconnect_handler(self, client: MQTToTClient, _: Any, rc: int) -> None:
  168. print(f"_on_disconnect_handler({rc})")
  169. def _on_connect_handler(self, client: MQTToTClient, _: Any, flags: Dict[str, Any], rc: int
  170. ) -> None:
  171. print(f"_on_connect_handler({flags}, {rc})")
  172. if rc != 0:
  173. err = paho.mqtt.client.connack_string(rc)
  174. self.log.error("MQTT Connection Error: %s (%d)", err, rc)
  175. return
  176. self._loop.create_task(self._post_connect())
  177. async def _post_connect(self) -> None:
  178. self.log.debug("Re-subscribing to things after connect")
  179. if self._graphql_subs:
  180. res = await self.graphql_subscribe(self._graphql_subs)
  181. self.log.trace("GraphQL subscribe response: %s", res)
  182. if self._skywalker_subs:
  183. res = await self.skywalker_subscribe(self._skywalker_subs)
  184. self.log.trace("Skywalker subscribe response: %s", res)
  185. if self._iris_seq_id:
  186. await self.iris_subscribe(self._iris_seq_id, self._iris_snapshot_at_ms)
  187. def _on_publish_handler(self, client: MQTToTClient, _: Any, mid: int) -> None:
  188. self.log.trace(f"Received publish confirmation for {mid}")
  189. try:
  190. waiter = self._publish_waiters[mid]
  191. except KeyError:
  192. return
  193. waiter.set_result(None)
  194. # region Incoming event parsing
  195. @staticmethod
  196. def _parse_direct_thread_path(path: str) -> dict:
  197. blank, direct_v2, threads, thread_id, *rest = path.split("/")
  198. assert blank == ""
  199. assert direct_v2 == "direct_v2"
  200. assert threads == "threads"
  201. additional = {
  202. "thread_id": thread_id
  203. }
  204. if rest:
  205. if rest[0] == "admin_user_ids":
  206. additional["admin_user_ids"] = rest[1]
  207. elif rest[0] == "approval_required_for_new_members":
  208. additional["approval_required_for_new_members"] = True
  209. elif rest[0] == ["participants"]:
  210. additional["participants"] = {rest[1]: rest[2]}
  211. elif rest[0] == ["items"]:
  212. additional["item_id"] = rest[1]
  213. # TODO wtf is this?
  214. # it has something to do with reactions
  215. if len(rest) > 4:
  216. additional[rest[2]] = {
  217. rest[3]: rest[4],
  218. }
  219. return additional
  220. def _on_message_sync(self, payload: bytes) -> None:
  221. parsed = json.loads(payload.decode("utf-8"))
  222. self.log.trace("Got message sync event: %s", parsed)
  223. for sync_item in parsed:
  224. parsed_item = IrisPayload.deserialize(sync_item)
  225. for part in parsed_item.data:
  226. raw_message = {
  227. "path": part.path,
  228. "op": part.op,
  229. **self._parse_direct_thread_path(part.path),
  230. }
  231. try:
  232. raw_message = {
  233. **raw_message,
  234. **json.loads(part.value),
  235. }
  236. except json.JSONDecodeError:
  237. raw_message["value"] = part.value
  238. message = MessageSyncMessage.deserialize(raw_message)
  239. evt = MessageSyncEvent(iris=parsed_item, message=message)
  240. self._loop.create_task(self._dispatch(evt))
  241. def _on_pubsub(self, payload: bytes) -> None:
  242. parsed_thrift = IncomingMessage.from_thrift(payload)
  243. message = PubsubPayload.parse_json(parsed_thrift.payload)
  244. self.log.trace(f"Got pubsub event with topic {parsed_thrift.topic}: {message}")
  245. for data in message.data:
  246. match = ACTIVITY_INDICATOR_REGEX.match(data.path)
  247. if match:
  248. evt = PubsubEvent(data=data, base=message, thread_id=match.group(1),
  249. activity_indicator_id=match.group(2))
  250. self._loop.create_task(self._dispatch(evt))
  251. elif not data.double_publish:
  252. self.log.debug("Pubsub: no activity indicator on data: %s", data)
  253. else:
  254. self.log.debug("Pubsub: double publish: %s", data.path)
  255. @staticmethod
  256. def _parse_realtime_sub_item(topic: str, raw: dict) -> Any:
  257. if topic == GraphQLQueryID.appPresence:
  258. return AppPresenceEventPayload.deserialize(raw).presence_event
  259. elif topic == GraphQLQueryID.zeroProvision:
  260. return RealtimeZeroProvisionPayload.deserialize(raw).zero_product_provisioning_event
  261. elif topic == GraphQLQueryID.clientConfigUpdate:
  262. return ClientConfigUpdatePayload.deserialize(raw).client_config_update_event
  263. elif topic == GraphQLQueryID.liveRealtimeComments:
  264. return LiveVideoCommentPayload.deserialize(raw).live_video_comment_event
  265. elif topic == "direct":
  266. return RealtimeDirectEvent.deserialize(raw)
  267. def _on_realtime_sub(self, payload: bytes) -> None:
  268. parsed_thrift = IncomingMessage.from_thrift(payload)
  269. topic = parsed_thrift.topic
  270. if topic not in ("direct", GraphQLQueryID.appPresence, GraphQLQueryID.zeroProvision,
  271. GraphQLQueryID.clientConfigUpdate, GraphQLQueryID.liveRealtimeComments):
  272. self.log.debug(f"Got unknown realtime sub event {topic}: {parsed_thrift.payload}")
  273. parsed_json = json.loads(parsed_thrift.payload)
  274. event = parsed_json["event"]
  275. for item in parsed_json["data"]:
  276. evt = self._parse_realtime_sub_item(topic, item)
  277. self.log.trace(f"Got realtime sub event with topic {topic}/{event}: {evt}")
  278. self._loop.create_task(self._dispatch(evt))
  279. def _on_message_handler(self, client: MQTToTClient, _: Any, message: MQTTMessage) -> None:
  280. try:
  281. topic = RealtimeTopic.decode(message.topic)
  282. # Instagram Android MQTT messages are always compressed
  283. message.payload = zlib.decompress(message.payload)
  284. if topic == RealtimeTopic.MESSAGE_SYNC:
  285. self._on_message_sync(message.payload)
  286. elif topic == RealtimeTopic.PUBSUB:
  287. self._on_pubsub(message.payload)
  288. elif topic == RealtimeTopic.REALTIME_SUB:
  289. self._on_realtime_sub(message.payload)
  290. else:
  291. print("other message", message.payload)
  292. try:
  293. waiter = self._response_waiters.pop(topic)
  294. except KeyError:
  295. self.log.debug("No handler for MQTT message in %s: %s",
  296. topic.value, message.payload)
  297. else:
  298. waiter.set_result(message)
  299. except Exception:
  300. self.log.exception("Error in incoming MQTT message handler")
  301. print(message.payload)
  302. # endregion
  303. async def _reconnect(self) -> None:
  304. try:
  305. print("Reconnecting")
  306. self._client.reconnect()
  307. except (SocketError, OSError, WebsocketConnectionError) as e:
  308. # TODO custom class
  309. raise NotLoggedIn("MQTT reconnection failed") from e
  310. def add_event_handler(self, evt_type: Type[T], handler: Callable[[T], Awaitable[None]]
  311. ) -> None:
  312. self._event_handlers[evt_type].append(handler)
  313. async def _dispatch(self, evt: T) -> None:
  314. for handler in self._event_handlers[type(evt)]:
  315. try:
  316. await handler(evt)
  317. except Exception:
  318. self.log.exception(f"Error in {type(evt)} handler")
  319. def disconnect(self) -> None:
  320. self._client.disconnect()
  321. async def listen(self, graphql_subs: Set[str] = None, skywalker_subs: Set[str] = None,
  322. seq_id: int = None, snapshot_at_ms: int = None) -> None:
  323. self._graphql_subs = graphql_subs or set()
  324. self._skywalker_subs = skywalker_subs or set()
  325. self._iris_seq_id = seq_id
  326. self._iris_snapshot_at_ms = snapshot_at_ms
  327. await self._reconnect()
  328. await self._dispatch(Connect())
  329. exit_if_not_connected = False
  330. while True:
  331. try:
  332. await asyncio.sleep(1)
  333. except asyncio.CancelledError:
  334. self.disconnect()
  335. # this might not be necessary
  336. self._client.loop_misc()
  337. break
  338. rc = self._client.loop_misc()
  339. # If disconnect() has been called
  340. # Beware, internal API, may have to change this to something more stable!
  341. if self._client._state == paho.mqtt.client.mqtt_cs_disconnecting:
  342. break # Stop listening
  343. if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
  344. # If known/expected error
  345. if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
  346. await self._dispatch(Disconnect(reason="Connection lost, retrying"))
  347. elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
  348. # This error is wrongly classified
  349. # See https://github.com/eclipse/paho.mqtt.python/issues/340
  350. await self._dispatch(Disconnect(reason="Connection lost, retrying"))
  351. elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
  352. raise NotLoggedIn("MQTT connection refused")
  353. elif rc == paho.mqtt.client.MQTT_ERR_NO_CONN:
  354. if exit_if_not_connected:
  355. raise NotConnected("MQTT error: no connection")
  356. await self._dispatch(Disconnect(reason="MQTT Error: no connection, retrying"))
  357. else:
  358. err = paho.mqtt.client.error_string(rc)
  359. self.log.error("MQTT Error: %s", err)
  360. await self._dispatch(Disconnect(reason=f"MQTT Error: {err}, retrying"))
  361. await self._reconnect()
  362. exit_if_not_connected = True
  363. await self._dispatch(Connect())
  364. else:
  365. exit_if_not_connected = False
  366. if self._disconnect_error:
  367. self.log.info("disconnect_error is set, raising and clearing variable")
  368. err = self._disconnect_error
  369. self._disconnect_error = None
  370. raise err
  371. # region Basic outgoing MQTT
  372. def publish(self, topic: RealtimeTopic, payload: Union[str, bytes, dict]
  373. ) -> asyncio.Future:
  374. if isinstance(payload, dict):
  375. payload = json.dumps(payload)
  376. if isinstance(payload, str):
  377. payload = payload.encode("utf-8")
  378. payload = zlib.compress(payload, level=9)
  379. info = self._client.publish(topic.encoded, payload, qos=1)
  380. fut = asyncio.Future()
  381. self._publish_waiters[info.mid] = fut
  382. return fut
  383. async def request(self, topic: RealtimeTopic, response: RealtimeTopic,
  384. payload: Union[str, bytes, dict]) -> MQTTMessage:
  385. async with self._response_waiter_locks[response]:
  386. fut = asyncio.Future()
  387. self._response_waiters[response] = fut
  388. await self.publish(topic, payload)
  389. return await fut
  390. async def iris_subscribe(self, seq_id: int, snapshot_at_ms: int) -> None:
  391. resp = await self.request(RealtimeTopic.SUB_IRIS, RealtimeTopic.SUB_IRIS_RESPONSE,
  392. {"seq_id": seq_id, "snapshot_at_ms": snapshot_at_ms,
  393. "snapshot_app_version": "message"})
  394. self.log.debug("Iris subscribe response: %s", resp.payload.decode("utf-8"))
  395. def graphql_subscribe(self, subs: Set[str]) -> asyncio.Future:
  396. self._graphql_subs |= subs
  397. return self.publish(RealtimeTopic.REALTIME_SUB, {"sub": list(subs)})
  398. def graphql_unsubscribe(self, subs: Set[str]) -> asyncio.Future:
  399. self._graphql_subs -= subs
  400. return self.publish(RealtimeTopic.REALTIME_SUB, {"unsub": list(subs)})
  401. def skywalker_subscribe(self, subs: Set[str]) -> asyncio.Future:
  402. self._skywalker_subs |= subs
  403. return self.publish(RealtimeTopic.PUBSUB, {"sub": list(subs)})
  404. def skywalker_unsubscribe(self, subs: Set[str]) -> asyncio.Future:
  405. self._skywalker_subs -= subs
  406. return self.publish(RealtimeTopic.PUBSUB, {"unsub": list(subs)})
  407. # endregion
  408. # region Actually sending messages and stuff
  409. async def send_foreground_state(self, state: ForegroundStateConfig) -> None:
  410. self.log.debug("Updating foreground state: %s", state)
  411. await self.publish(RealtimeTopic.FOREGROUND_STATE,
  412. zlib.compress(state.to_thrift(), level=9))
  413. if state.keep_alive_timeout:
  414. self._client._keepalive = state.keep_alive_timeout
  415. async def send_command(self, thread_id: str, action: ThreadAction,
  416. client_context: Optional[str] = None,
  417. offline_threading_id: Optional[str] = None, **kwargs: Any
  418. ) -> CommandResponse:
  419. client_context = client_context or str(uuid4())
  420. req = {
  421. "thread_id": thread_id,
  422. "client_context": client_context,
  423. "offline_threading_id": offline_threading_id or client_context,
  424. "action": action.value,
  425. # "device_id": self.state.cookies["ig_did"],
  426. **kwargs,
  427. }
  428. resp = await self.request(RealtimeTopic.SEND_MESSAGE, RealtimeTopic.SEND_MESSAGE_RESPONSE,
  429. payload=req)
  430. return CommandResponse.parse_json(resp.payload.decode("utf-8"))
  431. def send_item(self, thread_id: str, item_type: ThreadItemType, shh_mode: bool = False,
  432. client_context: Optional[str] = None, offline_threading_id: Optional[str] = None,
  433. **kwargs: Any) -> Awaitable[CommandResponse]:
  434. return self.send_command(thread_id, item_type=item_type.value,
  435. is_shh_mode=str(int(shh_mode)), action=ThreadAction.SEND_ITEM,
  436. client_context=client_context,
  437. offline_threading_id=offline_threading_id, **kwargs)
  438. def send_hashtag(self, thread_id: str, hashtag: str, text: str = "", shh_mode: bool = False,
  439. client_context: Optional[str] = None,
  440. offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
  441. return self.send_item(thread_id, text=text, item_id=hashtag, shh_mode=shh_mode,
  442. item_type=ThreadItemType.HASHTAG, client_context=client_context,
  443. offline_threading_id=offline_threading_id)
  444. def send_like(self, thread_id: str, shh_mode: bool = False,
  445. client_context: Optional[str] = None, offline_threading_id: Optional[str] = None,
  446. ) -> Awaitable[CommandResponse]:
  447. return self.send_item(thread_id, shh_mode=shh_mode, item_type=ThreadItemType.LIKE,
  448. client_context=client_context,
  449. offline_threading_id=offline_threading_id)
  450. def send_location(self, thread_id: str, venue_id: str, text: str = "",
  451. shh_mode: bool = False, client_context: Optional[str] = None,
  452. offline_threading_id: Optional[str] = None,
  453. ) -> Awaitable[CommandResponse]:
  454. return self.send_item(thread_id, text=text, item_id=venue_id, shh_mode=shh_mode,
  455. item_type=ThreadItemType.LOCATION, client_context=client_context,
  456. offline_threading_id=offline_threading_id)
  457. def send_media(self, thread_id: str, media_id: str, text: str = "", shh_mode: bool = False,
  458. client_context: Optional[str] = None,
  459. offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
  460. return self.send_item(thread_id, text=text, item_id=media_id, shh_mode=shh_mode,
  461. item_type=ThreadItemType.MEDIA_SHARE, client_context=client_context,
  462. offline_threading_id=offline_threading_id)
  463. def send_profile(self, thread_id: str, user_id: str, text: str = "", shh_mode: bool = False,
  464. client_context: Optional[str] = None,
  465. offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
  466. return self.send_item(thread_id, text=text, item_id=user_id, shh_mode=shh_mode,
  467. item_type=ThreadItemType.PROFILE, client_context=client_context,
  468. offline_threading_id=offline_threading_id)
  469. def send_reaction(self, thread_id: str, emoji: str, item_id: str,
  470. reaction_status: ReactionStatus = ReactionStatus.CREATED,
  471. target_item_type: ThreadItemType = ThreadItemType.TEXT,
  472. shh_mode: bool = False, client_context: Optional[str] = None,
  473. offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
  474. return self.send_item(thread_id, reaction_status=reaction_status.value, node_type="item",
  475. reaction_type="like", target_item_type=target_item_type.value,
  476. emoji=emoji, item_id=item_id, reaction_action_source="double_tap",
  477. shh_mode=shh_mode, item_type=ThreadItemType.REACTION,
  478. client_context=client_context,
  479. offline_threading_id=offline_threading_id)
  480. def send_user_story(self, thread_id: str, media_id: str, text: str = "",
  481. shh_mode: bool = False, client_context: Optional[str] = None,
  482. offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
  483. return self.send_item(thread_id, text=text, item_id=media_id, shh_mode=shh_mode,
  484. item_type=ThreadItemType.REEL_SHARE, client_context=client_context,
  485. offline_threading_id=offline_threading_id)
  486. def send_text(self, thread_id: str, text: str = "", shh_mode: bool = False,
  487. client_context: Optional[str] = None, offline_threading_id: Optional[str] = None
  488. ) -> Awaitable[CommandResponse]:
  489. return self.send_item(thread_id, text=text, shh_mode=shh_mode,
  490. item_type=ThreadItemType.TEXT, client_context=client_context,
  491. offline_threading_id=offline_threading_id)
  492. def mark_seen(self, thread_id: str, item_id: str, client_context: Optional[str] = None,
  493. offline_threading_id: Optional[str] = None) -> Awaitable[CommandResponse]:
  494. return self.send_command(thread_id, item_id=item_id, action=ThreadAction.MARK_SEEN,
  495. client_context=client_context,
  496. offline_threading_id=offline_threading_id)
  497. def mark_visual_item_seen(self, thread_id: str, item_id: str,
  498. client_context: Optional[str] = None,
  499. offline_threading_id: Optional[str] = None
  500. ) -> Awaitable[CommandResponse]:
  501. return self.send_command(thread_id, item_id=item_id,
  502. action=ThreadAction.MARK_VISUAL_ITEM_SEEN,
  503. client_context=client_context,
  504. offline_threading_id=offline_threading_id)
  505. def indicate_activity(self, thread_id: str, activity_status: TypingStatus = TypingStatus.TEXT,
  506. client_context: Optional[str] = None,
  507. offline_threading_id: Optional[str] = None
  508. ) -> Awaitable[CommandResponse]:
  509. return self.send_command(thread_id, activity_status=activity_status.value,
  510. action=ThreadAction.INDICATE_ACTIVITY,
  511. client_context=client_context,
  512. offline_threading_id=offline_threading_id)
  513. # endregion