فهرست منبع

Add locking for whatsapp->matrix messages for desegregated group chats

Tulir Asokan 6 سال پیش
والد
کامیت
79851a62b4
5فایلهای تغییر یافته به همراه104 افزوده شده و 28 حذف شده
  1. 3 3
      Gopkg.lock
  2. 1 1
      example-config.yaml
  3. 88 24
      portal.go
  4. 10 0
      vendor/maunium.net/go/mautrix-appservice/intent.go
  5. 2 0
      vendor/maunium.net/go/mautrix-appservice/statestore.go

+ 3 - 3
Gopkg.lock

@@ -87,7 +87,7 @@
     "curve25519",
     "hkdf"
   ]
-  revision = "614d502a4dac94afa3a6ce146bd1736da82514c6"
+  revision = "182538f80094b6a8efaade63a8fd8e0d9d5843dd"
 
 [[projects]]
   branch = "master"
@@ -102,7 +102,7 @@
   branch = "master"
   name = "golang.org/x/sys"
   packages = ["unix"]
-  revision = "d99a578cf41bfccdeaf48b0845c823a4b8b0ad5e"
+  revision = "49385e6e15226593f68b26af201feec29d5bba22"
 
 [[projects]]
   name = "gopkg.in/russross/blackfriday.v2"
@@ -141,7 +141,7 @@
   branch = "master"
   name = "maunium.net/go/mautrix-appservice"
   packages = ["."]
-  revision = "37d4449056015cea5d0a4420bba595c61ad32007"
+  revision = "fb756247f82716de7698b8200f28f16b4fd04a6b"
 
 [solve-meta]
   analyzer-name = "dep"

+ 1 - 1
example-config.yaml

@@ -64,7 +64,7 @@ bridge:
   #   domain - All users on that homeserver
   #     mxid - Specific user
   permissions:
-    "example.com": full
+    "example.com": user
     "@admin:example.com": admin
 
 # Logging config.

+ 88 - 24
portal.go

@@ -102,6 +102,9 @@ func (bridge *Bridge) NewPortal(dbPortal *database.Portal) *Portal {
 		Portal: dbPortal,
 		bridge: bridge,
 		log:    bridge.Log.Sub(fmt.Sprintf("Portal/%s", dbPortal.Key)),
+
+		messageLocks:    make(map[types.WhatsAppMessageID]sync.Mutex),
+		recentlyHandled: [20]types.WhatsAppMessageID{},
 	}
 }
 
@@ -111,7 +114,80 @@ type Portal struct {
 	bridge *Bridge
 	log    log.Logger
 
-	roomCreateLock sync.Mutex
+	roomCreateLock   sync.Mutex
+	messageLocksLock sync.Mutex
+	messageLocks     map[types.WhatsAppMessageID]sync.Mutex
+
+	recentlyHandled      [20]types.WhatsAppMessageID
+	recentlyHandledLock  sync.Mutex
+	recentlyHandledIndex uint8
+}
+
+func (portal *Portal) getMessageLock(messageID types.WhatsAppMessageID) sync.Mutex {
+	portal.messageLocksLock.Lock()
+	defer portal.messageLocksLock.Unlock()
+	lock, ok := portal.messageLocks[messageID]
+	if !ok {
+		portal.messageLocks[messageID] = lock
+	}
+	return lock
+}
+
+func (portal *Portal) deleteMessageLock(messageID types.WhatsAppMessageID) {
+	portal.messageLocksLock.Lock()
+	delete(portal.messageLocks, messageID)
+	portal.messageLocksLock.Unlock()
+}
+
+func (portal *Portal) isRecentlyHandled(id types.WhatsAppMessageID) bool {
+	start := portal.recentlyHandledIndex
+	for i := start; i != start; i = (i - 1) % 20 {
+		if portal.recentlyHandled[i] == id {
+			return true
+		}
+	}
+	return false
+}
+
+func (portal *Portal) isDuplicate(id types.WhatsAppMessageID) bool {
+	msg := portal.bridge.DB.Message.GetByJID(portal.Key, id)
+	if msg != nil {
+		return true
+	}
+	return false
+}
+
+func (portal *Portal) markHandled(jid types.WhatsAppMessageID, mxid types.MatrixEventID) {
+	msg := portal.bridge.DB.Message.New()
+	msg.Chat = portal.Key
+	msg.JID = jid
+	msg.MXID = mxid
+	msg.Insert()
+
+	portal.recentlyHandledLock.Lock()
+	index := portal.recentlyHandledIndex
+	portal.recentlyHandledIndex = (portal.recentlyHandledIndex + 1) % 20
+	portal.recentlyHandledLock.Unlock()
+	portal.recentlyHandled[index] = jid
+}
+
+func (portal *Portal) startHandling(id types.WhatsAppMessageID) (*sync.Mutex, bool) {
+	if portal.isRecentlyHandled(id) {
+		return nil, false
+	}
+	lock := portal.getMessageLock(id)
+	lock.Lock()
+	if portal.isDuplicate(id) {
+		lock.Unlock()
+		return nil, false
+	}
+	return &lock, true
+}
+
+func (portal *Portal) finishHandling(id types.WhatsAppMessageID, mxid types.MatrixEventID) {
+	portal.markHandled(id, mxid)
+	portal.deleteMessageLock(id)
+	portal.log.Debugln("Handled message", id, "->", mxid)
 }
 
 func (portal *Portal) SyncParticipants(metadata *whatsappExt.GroupInfo) {
@@ -238,6 +314,8 @@ func (portal *Portal) Sync(user *User, contact whatsapp.Contact) {
 			portal.log.Errorln("Failed to create portal room:", err)
 			return
 		}
+	} else {
+		portal.MainIntent().EnsureInvited(portal.MXID, user.MXID)
 	}
 
 	update := false
@@ -382,22 +460,6 @@ func (portal *Portal) MainIntent() *appservice.IntentAPI {
 	return portal.bridge.Bot
 }
 
-func (portal *Portal) IsDuplicate(id types.WhatsAppMessageID) bool {
-	msg := portal.bridge.DB.Message.GetByJID(portal.Key, id)
-	if msg != nil {
-		return true
-	}
-	return false
-}
-
-func (portal *Portal) MarkHandled(jid types.WhatsAppMessageID, mxid types.MatrixEventID) {
-	msg := portal.bridge.DB.Message.New()
-	msg.Chat = portal.Key
-	msg.JID = jid
-	msg.MXID = mxid
-	msg.Insert()
-}
-
 func (portal *Portal) GetMessageIntent(user *User, info whatsapp.MessageInfo) *appservice.IntentAPI {
 	if info.FromMe {
 		if portal.IsPrivateChat() {
@@ -434,9 +496,11 @@ func (portal *Portal) SetReply(content *gomatrix.Content, info whatsapp.MessageI
 }
 
 func (portal *Portal) HandleTextMessage(source *User, message whatsapp.TextMessage) {
-	if portal.IsDuplicate(message.Info.Id) {
+	lock, ok := portal.startHandling(message.Info.Id)
+	if !ok {
 		return
 	}
+	defer lock.Unlock()
 
 	err := portal.CreateMatrixRoom([]string{source.MXID})
 	if err != nil {
@@ -463,14 +527,15 @@ func (portal *Portal) HandleTextMessage(source *User, message whatsapp.TextMessa
 		portal.log.Errorfln("Failed to handle message %s: %v", message.Info.Id, err)
 		return
 	}
-	portal.MarkHandled(message.Info.Id, resp.EventID)
-	portal.log.Debugln("Handled message", message.Info.Id, "->", resp.EventID)
+	portal.finishHandling(message.Info.Id, resp.EventID)
 }
 
 func (portal *Portal) HandleMediaMessage(source *User, download func() ([]byte, error), thumbnail []byte, info whatsapp.MessageInfo, mimeType, caption string) {
-	if portal.IsDuplicate(info.Id) {
+	lock, ok := portal.startHandling(info.Id)
+	if !ok {
 		return
 	}
+	defer lock.Unlock()
 
 	err := portal.CreateMatrixRoom([]string{source.MXID})
 	if err != nil {
@@ -563,8 +628,7 @@ func (portal *Portal) HandleMediaMessage(source *User, download func() ([]byte,
 		// TODO store caption mxid?
 	}
 
-	portal.MarkHandled(info.Id, resp.EventID)
-	portal.log.Debugln("Handled message", info.Id, "->", resp.EventID)
+	portal.finishHandling(info.Id, resp.EventID)
 }
 
 func makeMessageID() *string {
@@ -799,8 +863,8 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *gomatrix.Event) {
 		portal.log.Debugln("Unhandled Matrix event:", evt)
 		return
 	}
+	portal.markHandled(info.GetKey().GetId(), evt.ID)
 	err = sender.Conn.Send(info)
-	portal.MarkHandled(info.GetKey().GetId(), evt.ID)
 	if err != nil {
 		portal.log.Errorfln("Error handling Matrix event %s: %v", evt.ID, err)
 	} else {

+ 10 - 0
vendor/maunium.net/go/mautrix-appservice/intent.go

@@ -231,3 +231,13 @@ func (intent *IntentAPI) SetAvatarURL(avatarURL string) error {
 	}
 	return intent.Client.SetAvatarURL(avatarURL)
 }
+
+func (intent *IntentAPI) EnsureInvited(roomID, userID string) error {
+	if !intent.as.StateStore.IsInvited(roomID, userID) {
+		_, err := intent.Client.InviteUser(roomID, &gomatrix.ReqInviteUser{
+			UserID: userID,
+		})
+		return err
+	}
+	return nil
+}

+ 2 - 0
vendor/maunium.net/go/mautrix-appservice/statestore.go

@@ -30,6 +30,8 @@ func (as *AppService) UpdateState(evt *gomatrix.Event) {
 	switch evt.Type {
 	case gomatrix.StateMember:
 		as.StateStore.SetMembership(evt.RoomID, evt.GetStateKey(), evt.Content.Membership)
+	case gomatrix.StatePowerLevels:
+		as.StateStore.SetPowerLevels(evt.RoomID, &evt.Content.PowerLevels)
 	}
 }