Browse Source

Add support for Instagram->Matrix read receipts and typing notifications

Tulir Asokan 4 years ago
parent
commit
127f67bae5
5 changed files with 54 additions and 28 deletions
  1. 2 2
      ROADMAP.md
  2. 8 7
      mauigpapi/mqtt/conn.py
  3. 4 4
      mauigpapi/types/mqtt.py
  4. 10 5
      mautrix_instagram/portal.py
  5. 30 10
      mautrix_instagram/user.py

+ 2 - 2
ROADMAP.md

@@ -28,8 +28,8 @@
   * [x] Message reactions
   * [x] Message history
   * [ ] Presence
-  * [ ] Typing notifications
-  * [ ] Read receipts
+  * [x] Typing notifications
+  * [x] Read receipts
   * [x] User metadata
     * [x] Name
     * [x] Avatar

+ 8 - 7
mauigpapi/mqtt/conn.py

@@ -222,8 +222,8 @@ class AndroidMQTT:
             subitem_key = rest[0]
             if subitem_key == "approval_required_for_new_members":
                 additional["approval_required_for_new_members"] = True
-            elif subitem_key == "participants":
-                additional["participants"] = {rest[1]: rest[2]}
+            elif subitem_key == "participants" and len(rest) > 2 and rest[2] == "has_seen":
+                additional["has_seen"] = int(rest[1])
             elif subitem_key == "items":
                 additional["item_id"] = rest[1]
                 # TODO wtf is this?
@@ -232,8 +232,10 @@ class AndroidMQTT:
                     additional[rest[2]] = {
                         rest[3]: rest[4],
                     }
-            elif subitem_key in ("admin_user_ids", "activity_indicator_id"):
-                additional[subitem_key] = rest[1]
+            elif subitem_key in "admin_user_ids":
+                additional["admin_user_id"] = int(rest[1])
+            elif subitem_key == "activity_indicator_id":
+                additional["activity_indicator_id"] = rest[1]
         print("Parsed path", path, "->", additional)
         return additional
 
@@ -261,8 +263,8 @@ class AndroidMQTT:
 
     def _on_pubsub(self, payload: bytes) -> None:
         parsed_thrift = IncomingMessage.from_thrift(payload)
+        self.log.trace(f"Got pubsub event {parsed_thrift.topic} / {parsed_thrift.payload}")
         message = PubsubPayload.parse_json(parsed_thrift.payload)
-        self.log.trace(f"Got pubsub event with topic {parsed_thrift.topic}: {message}")
         for data in message.data:
             match = ACTIVITY_INDICATOR_REGEX.match(data.path)
             if match:
@@ -299,14 +301,13 @@ class AndroidMQTT:
             topic = GraphQLQueryID(parsed_thrift.topic)
         except ValueError:
             topic = parsed_thrift.topic
+        self.log.trace(f"Got realtime sub event {topic} / {parsed_thrift.payload}")
         allowed = ("direct", GraphQLQueryID.APP_PRESENCE, GraphQLQueryID.ZERO_PROVISION,
                    GraphQLQueryID.CLIENT_CONFIG_UPDATE, GraphQLQueryID.LIVE_REALTIME_COMMENTS)
         if topic not in allowed:
-            self.log.debug(f"Got unknown realtime sub event {topic}: {parsed_thrift.payload}")
             return
         parsed_json = json.loads(parsed_thrift.payload)
         for evt in self._parse_realtime_sub_item(topic, parsed_json):
-            self.log.trace(f"Got realtime sub event with topic {topic}: {evt}")
             self._loop.create_task(self._dispatch(evt))
 
     def _on_message_handler(self, client: MQTToTClient, _: Any, message: MQTTMessage) -> None:

+ 4 - 4
mauigpapi/types/mqtt.py

@@ -13,7 +13,7 @@
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
-from typing import Dict, Any, List, Optional
+from typing import Any, List, Optional, Union
 import json
 
 from attr import dataclass
@@ -91,9 +91,9 @@ class MessageSyncMessage(ThreadItem, SerializableAttrs['MessageSyncMessage']):
     op: Operation = Operation.ADD
 
     # These come from parsing the path
-    admin_user_ids: Optional[int] = None
+    admin_user_id: Optional[int] = None
     approval_required_for_new_members: Optional[bool] = None
-    participants: Optional[Dict[str, str]] = None
+    has_seen: Optional[int] = None
     thread_id: Optional[str] = None
 
 
@@ -119,7 +119,7 @@ class PubsubBasePayload(SerializableAttrs['PubsubBasePayload']):
 
 @dataclass(kw_only=True)
 class ActivityIndicatorData(SerializableAttrs['ActivityIndicatorData']):
-    timestamp: str
+    timestamp: Union[int, str]
     sender_id: str
     ttl: int
     activity_status: TypingStatus

+ 10 - 5
mautrix_instagram/portal.py

@@ -24,7 +24,7 @@ import magic
 
 from mauigpapi.types import (Thread, ThreadUser, ThreadItem, RegularMediaItem, MediaType,
                              ReactionStatus, Reaction, AnimatedMediaItem, ThreadItemType,
-                             VoiceMediaItem, ExpiredMediaItem, MediaShareItem, ReelShareType)
+                             VoiceMediaItem, ExpiredMediaItem, MessageSyncMessage, ReelShareType)
 from mautrix.appservice import AppService, IntentAPI
 from mautrix.bridge import BasePortal, NotificationDisabler
 from mautrix.types import (EventID, MessageEventContent, RoomID, EventType, MessageType, ImageInfo,
@@ -501,13 +501,18 @@ class Portal(DBPortal, BasePortal):
             puppet = await p.Puppet.get_by_pk(old_reaction.ig_sender)
             await puppet.intent_for(self).redact(self.mxid, old_reaction.mxid)
 
-    async def handle_instagram_update(self, item: ThreadItem) -> None:
+    async def handle_instagram_update(self, item: MessageSyncMessage) -> None:
         message = await DBMessage.get_by_item_id(item.item_id, self.receiver)
         if not message:
             return
-        async with self._reaction_lock:
-            await self._handle_instagram_reactions(message, (item.reactions.emojis
-                                                             if item.reactions else []))
+        if item.has_seen:
+            puppet = await p.Puppet.get_by_pk(item.has_seen, create=False)
+            if puppet:
+                await puppet.intent_for(self).mark_read(self.mxid, message.mxid)
+        else:
+            async with self._reaction_lock:
+                await self._handle_instagram_reactions(message, (item.reactions.emojis
+                                                                 if item.reactions else []))
 
     # endregion
     # region Updating portal info

+ 30 - 10
mautrix_instagram/user.py

@@ -18,10 +18,12 @@ from typing import (Dict, Optional, AsyncIterable, Awaitable, AsyncGenerator, Li
 from collections import defaultdict
 import asyncio
 import logging
+import time
 
 from mauigpapi import AndroidAPI, AndroidState, AndroidMQTT
 from mauigpapi.mqtt import Connect, Disconnect, GraphQLSubscription, SkywalkerSubscription
-from mauigpapi.types import CurrentUser, MessageSyncEvent, Operation
+from mauigpapi.types import (CurrentUser, MessageSyncEvent, Operation, RealtimeDirectEvent,
+                             ActivityIndicatorData, TypingStatus)
 from mauigpapi.errors import IGNotLoggedInError
 from mautrix.bridge import BaseUser
 from mautrix.types import UserID, RoomID, EventID, TextMessageEventContent, MessageType
@@ -36,12 +38,13 @@ if TYPE_CHECKING:
     from .__main__ import InstagramBridge
 
 METRIC_MESSAGE = Summary("bridge_on_message", "calls to handle_message")
-METRIC_RECEIPT = Summary("bridge_on_receipt", "calls to handle_receipt")
+METRIC_RTD = Summary("bridge_on_rtd", "calls to handle_rtd")
 METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
 METRIC_CONNECTED = Gauge("bridge_connected", "Bridged users connected to Instagram")
 
 
 class User(DBUser, BaseUser):
+    _activity_indicator_ids: Dict[str, int] = {}
     by_mxid: Dict[UserID, 'User'] = {}
     by_igpk: Dict[int, 'User'] = {}
     config: Config
@@ -117,6 +120,7 @@ class User(DBUser, BaseUser):
         self.mqtt.add_event_handler(Connect, self.on_connect)
         self.mqtt.add_event_handler(Disconnect, self.on_disconnect)
         self.mqtt.add_event_handler(MessageSyncEvent, self.handle_message)
+        self.mqtt.add_event_handler(RealtimeDirectEvent, self.handle_rtd)
 
         await self.update()
 
@@ -258,14 +262,30 @@ class User(DBUser, BaseUser):
         elif evt.message.op == Operation.REPLACE:
             await portal.handle_instagram_update(evt.message)
 
-    # @async_time(METRIC_RECEIPT)
-    # async def handle_receipt(self, evt: ConversationReadEntry) -> None:
-    #     portal = await po.Portal.get_by_twid(evt.conversation_id, self.twid,
-    #                                          conv_type=evt.conversation.type)
-    #     if not portal.mxid:
-    #         return
-    #     sender = await pu.Puppet.get_by_twid(self.twid)
-    #     await portal.handle_twitter_receipt(sender, int(evt.last_read_event_id))
+    @async_time(METRIC_RTD)
+    async def handle_rtd(self, evt: RealtimeDirectEvent) -> None:
+        if not isinstance(evt.value, ActivityIndicatorData):
+            return
+
+        now = int(time.time() * 1000)
+        date = int(evt.value.timestamp) // 1000
+        expiry = date + evt.value.ttl
+        if expiry < now:
+            return
+
+        if evt.activity_indicator_id in self._activity_indicator_ids:
+            return
+        # TODO clear expired items from this dict
+        self._activity_indicator_ids[evt.activity_indicator_id] = expiry
+
+        puppet = await pu.Puppet.get_by_pk(int(evt.value.sender_id))
+        portal = await po.Portal.get_by_thread_id(evt.thread_id, receiver=self.igpk)
+        if not puppet or not portal:
+            return
+
+        is_typing = evt.value.activity_status != TypingStatus.OFF
+        await puppet.intent_for(portal).set_typing(portal.mxid, is_typing=is_typing,
+                                                   timeout=evt.value.ttl)
 
     # endregion
     # region Database getters