Browse Source

Fix parsing realtime sub events

Tulir Asokan 4 years ago
parent
commit
b8e5589b38
2 changed files with 37 additions and 20 deletions
  1. 31 20
      mauigpapi/mqtt/conn.py
  2. 6 0
      mauigpapi/types/mqtt.py

+ 31 - 20
mauigpapi/mqtt/conn.py

@@ -13,7 +13,8 @@
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
-from typing import Union, Set, Optional, Any, Dict, Awaitable, Type, List, TypeVar, Callable
+from typing import (Union, Set, Optional, Any, Dict, Awaitable, Type, List, TypeVar, Callable,
+                    Iterable)
 from collections import defaultdict
 from socket import socket, error as SocketError
 from uuid import uuid4
@@ -219,13 +220,12 @@ class AndroidMQTT:
             "thread_id": thread_id
         }
         if rest:
-            if rest[0] == "admin_user_ids":
-                additional["admin_user_ids"] = rest[1]
-            elif rest[0] == "approval_required_for_new_members":
+            subitem_key = rest[0]
+            if subitem_key == "approval_required_for_new_members":
                 additional["approval_required_for_new_members"] = True
-            elif rest[0] == ["participants"]:
+            elif subitem_key == ["participants"]:
                 additional["participants"] = {rest[1]: rest[2]}
-            elif rest[0] == ["items"]:
+            elif subitem_key == ["items"]:
                 additional["item_id"] = rest[1]
                 # TODO wtf is this?
                 #      it has something to do with reactions
@@ -233,6 +233,8 @@ class AndroidMQTT:
                     additional[rest[2]] = {
                         rest[3]: rest[4],
                     }
+            elif subitem_key in ("admin_user_ids", "activity_indicator_id"):
+                additional[subitem_key] = rest[1]
         return additional
 
     def _on_message_sync(self, payload: bytes) -> None:
@@ -272,30 +274,39 @@ class AndroidMQTT:
             else:
                 self.log.debug("Pubsub: double publish: %s", data.path)
 
-    @staticmethod
-    def _parse_realtime_sub_item(topic: str, raw: dict) -> Any:
+    def _parse_realtime_sub_item(self, topic: Union[str, GraphQLQueryID], raw: dict
+                                 ) -> Iterable[Any]:
         if topic == GraphQLQueryID.APP_PRESENCE:
-            return AppPresenceEventPayload.deserialize(raw).presence_event
+            yield AppPresenceEventPayload.deserialize(raw).presence_event
         elif topic == GraphQLQueryID.ZERO_PROVISION:
-            return RealtimeZeroProvisionPayload.deserialize(raw).zero_product_provisioning_event
+            yield RealtimeZeroProvisionPayload.deserialize(raw).zero_product_provisioning_event
         elif topic == GraphQLQueryID.CLIENT_CONFIG_UPDATE:
-            return ClientConfigUpdatePayload.deserialize(raw).client_config_update_event
+            yield ClientConfigUpdatePayload.deserialize(raw).client_config_update_event
         elif topic == GraphQLQueryID.LIVE_REALTIME_COMMENTS:
-            return LiveVideoCommentPayload.deserialize(raw).live_video_comment_event
+            yield LiveVideoCommentPayload.deserialize(raw).live_video_comment_event
         elif topic == "direct":
-            return RealtimeDirectEvent.deserialize(raw)
+            event = raw["event"]
+            for item in raw["data"]:
+                yield RealtimeDirectEvent.deserialize({
+                    "event": event,
+                    **self._parse_direct_thread_path(item["path"]),
+                    **item,
+                })
 
     def _on_realtime_sub(self, payload: bytes) -> None:
         parsed_thrift = IncomingMessage.from_thrift(payload)
-        topic = parsed_thrift.topic
-        if topic not in ("direct", GraphQLQueryID.APP_PRESENCE, GraphQLQueryID.ZERO_PROVISION,
-                         GraphQLQueryID.CLIENT_CONFIG_UPDATE, GraphQLQueryID.LIVE_REALTIME_COMMENTS):
+        try:
+            topic = GraphQLQueryID(parsed_thrift.topic)
+        except ValueError:
+            topic = parsed_thrift.topic
+        allowed = ("direct", GraphQLQueryID.APP_PRESENCE, GraphQLQueryID.ZERO_PROVISION,
+                   GraphQLQueryID.CLIENT_CONFIG_UPDATE, GraphQLQueryID.LIVE_REALTIME_COMMENTS)
+        if topic not in allowed:
             self.log.debug(f"Got unknown realtime sub event {topic}: {parsed_thrift.payload}")
+            return
         parsed_json = json.loads(parsed_thrift.payload)
-        event = parsed_json.get("event", "<no event type>")
-        for item in parsed_json["data"]:
-            evt = self._parse_realtime_sub_item(topic, item)
-            self.log.trace(f"Got realtime sub event with topic {topic}/{event}: {evt}")
+        for evt in self._parse_realtime_sub_item(topic, parsed_json):
+            self.log.trace(f"Got realtime sub event with topic {topic}: {evt}")
             self._loop.create_task(self._dispatch(evt))
 
     def _on_message_handler(self, client: MQTToTClient, _: Any, message: MQTTMessage) -> None:

+ 6 - 0
mauigpapi/types/mqtt.py

@@ -201,6 +201,12 @@ class RealtimeDirectEvent(SerializableAttrs['RealtimeDirectEvent']):
     op: Operation
     path: str
     value: RealtimeDirectData
+    # Comes from the parent object
+    # TODO many places have this kind of event, it's usually "patch", might need an enum
+    event: Optional[str] = None
+    # Parsed from path
+    thread_id: Optional[str] = None
+    activity_indicator_id: Optional[str] = None
 
 
 @dataclass(kw_only=True)