|
@@ -36,7 +36,8 @@ from ..state import AndroidState
|
|
|
from ..types import (CommandResponse, ThreadItemType, ThreadAction, ReactionStatus, TypingStatus,
|
|
|
IrisPayload, PubsubPayload, AppPresenceEventPayload, RealtimeDirectEvent,
|
|
|
RealtimeZeroProvisionPayload, ClientConfigUpdatePayload, MessageSyncEvent,
|
|
|
- MessageSyncMessage, LiveVideoCommentPayload, PubsubEvent)
|
|
|
+ MessageSyncMessage, LiveVideoCommentPayload, PubsubEvent, IrisPayloadData,
|
|
|
+ ThreadSyncEvent)
|
|
|
from .thrift import RealtimeConfig, RealtimeClientInfo, ForegroundStateConfig, IncomingMessage
|
|
|
from .otclient import MQTToTClient
|
|
|
from .subscription import everclear_subscriptions, RealtimeTopic, GraphQLQueryID
|
|
@@ -52,6 +53,9 @@ T = TypeVar('T')
|
|
|
ACTIVITY_INDICATOR_REGEX = re.compile(
|
|
|
r"/direct_v2/threads/([\w_]+)/activity_indicator_id/([\w_]+)")
|
|
|
|
|
|
+INBOX_THREAD_REGEX = re.compile(
|
|
|
+ r"/direct_v2/inbox/threads/([\w_]+)")
|
|
|
+
|
|
|
|
|
|
class AndroidMQTT:
|
|
|
_loop: asyncio.AbstractEventLoop
|
|
@@ -216,8 +220,8 @@ class AndroidMQTT:
|
|
|
assert blank == ""
|
|
|
assert direct_v2 == "direct_v2"
|
|
|
assert threads == "threads"
|
|
|
- except (AssertionError, ValueError) as e:
|
|
|
- self.log.debug(f"Got {e} while parsing path {path}")
|
|
|
+ except (AssertionError, ValueError, IndexError) as e:
|
|
|
+ self.log.debug(f"Got {e!r} while parsing path {path}")
|
|
|
raise
|
|
|
additional = {
|
|
|
"thread_id": thread_id
|
|
@@ -243,27 +247,41 @@ class AndroidMQTT:
|
|
|
self.log.trace("Parsed path %s -> %s", path, additional)
|
|
|
return additional
|
|
|
|
|
|
+ def _on_messager_sync_item(self, part: IrisPayloadData, parsed_item: IrisPayload) -> None:
|
|
|
+ if part.path.startswith("/direct_v2/threads/"):
|
|
|
+ raw_message = {
|
|
|
+ "path": part.path,
|
|
|
+ "op": part.op,
|
|
|
+ **self._parse_direct_thread_path(part.path),
|
|
|
+ }
|
|
|
+ try:
|
|
|
+ raw_message = {
|
|
|
+ **raw_message,
|
|
|
+ **json.loads(part.value),
|
|
|
+ }
|
|
|
+ except (json.JSONDecodeError, TypeError):
|
|
|
+ raw_message["value"] = part.value
|
|
|
+ message = MessageSyncMessage.deserialize(raw_message)
|
|
|
+ evt = MessageSyncEvent(iris=parsed_item, message=message)
|
|
|
+ elif part.path.startswith("/direct_v2/inbox/threads/"):
|
|
|
+ raw_message = {
|
|
|
+ "path": part.path,
|
|
|
+ "op": part.op,
|
|
|
+ **json.loads(part.value),
|
|
|
+ }
|
|
|
+ evt = ThreadSyncEvent.deserialize(raw_message)
|
|
|
+ else:
|
|
|
+ self.log.warning(f"Unsupported path {part.path}")
|
|
|
+ return
|
|
|
+ self._loop.create_task(self._dispatch(evt))
|
|
|
+
|
|
|
def _on_message_sync(self, payload: bytes) -> None:
|
|
|
parsed = json.loads(payload.decode("utf-8"))
|
|
|
self.log.trace("Got message sync event: %s", parsed)
|
|
|
for sync_item in parsed:
|
|
|
parsed_item = IrisPayload.deserialize(sync_item)
|
|
|
for part in parsed_item.data:
|
|
|
- raw_message = {
|
|
|
- "path": part.path,
|
|
|
- "op": part.op,
|
|
|
- **self._parse_direct_thread_path(part.path),
|
|
|
- }
|
|
|
- try:
|
|
|
- raw_message = {
|
|
|
- **raw_message,
|
|
|
- **json.loads(part.value),
|
|
|
- }
|
|
|
- except (json.JSONDecodeError, TypeError):
|
|
|
- raw_message["value"] = part.value
|
|
|
- message = MessageSyncMessage.deserialize(raw_message)
|
|
|
- evt = MessageSyncEvent(iris=parsed_item, message=message)
|
|
|
- self._loop.create_task(self._dispatch(evt))
|
|
|
+ self._on_messager_sync_item(part, parsed_item)
|
|
|
|
|
|
def _on_pubsub(self, payload: bytes) -> None:
|
|
|
parsed_thrift = IncomingMessage.from_thrift(payload)
|