Browse Source

Make message buffers bigger and make locking message handling less hacky

Tulir Asokan 5 years ago
parent
commit
0430446d6d
4 changed files with 25 additions and 12 deletions
  1. 4 0
      config/bridge.go
  2. 2 0
      example-config.yaml
  3. 2 2
      portal.go
  4. 17 10
      user.go

+ 4 - 0
config/bridge.go

@@ -44,6 +44,8 @@ type BridgeConfig struct {
 	ReportConnectionRetry bool `yaml:"report_connection_retry"`
 	ChatListWait          int  `yaml:"chat_list_wait"`
 	PortalSyncWait        int  `yaml:"portal_sync_wait"`
+	UserMessageBuffer     int  `yaml:"user_message_buffer"`
+	PortalMessageBuffer   int  `yaml:"portal_message_buffer"`
 
 	CallNotices struct {
 		Start bool `yaml:"start"`
@@ -96,6 +98,8 @@ func (bc *BridgeConfig) setDefaults() {
 	bc.ReportConnectionRetry = true
 	bc.ChatListWait = 30
 	bc.PortalSyncWait = 600
+	bc.UserMessageBuffer = 1024
+	bc.PortalMessageBuffer = 128
 
 	bc.CallNotices.Start = true
 	bc.CallNotices.End = true

+ 2 - 0
example-config.yaml

@@ -109,6 +109,8 @@ bridge:
     # Maximum number of seconds to wait to sync portals before force unlocking message processing.
     # If this is too low and you have lots of chats, it could cause backfilling to fail.
     portal_sync_wait: 600
+    user_message_buffer: 1024
+    portal_message_buffer: 128
 
     # Whether or not to send call start/end notices to Matrix.
     call_notices:

+ 2 - 2
portal.go

@@ -133,7 +133,7 @@ func (bridge *Bridge) NewManualPortal(key database.PortalKey) *Portal {
 
 		recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{},
 
-		messages: make(chan PortalMessage, 128),
+		messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
 	}
 	portal.Key = key
 	go portal.handleMessageLoop()
@@ -148,7 +148,7 @@ func (bridge *Bridge) NewPortal(dbPortal *database.Portal) *Portal {
 
 		recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{},
 
-		messages: make(chan PortalMessage, 128),
+		messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
 	}
 	go portal.handleMessageLoop()
 	return portal

+ 17 - 10
user.go

@@ -65,7 +65,9 @@ type User struct {
 	syncPortalsDone  chan struct{}
 
 	messages chan PortalMessage
-	syncLock sync.Mutex
+
+	syncStart chan struct{}
+	syncWait  sync.WaitGroup
 
 	mgmtCreateLock sync.Mutex
 }
@@ -167,7 +169,8 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User {
 
 		chatListReceived: make(chan struct{}, 1),
 		syncPortalsDone:  make(chan struct{}, 1),
-		messages:         make(chan PortalMessage, 256),
+		syncStart:        make(chan struct{}, 1),
+		messages:         make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer),
 	}
 	user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID)
 	user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
@@ -399,7 +402,8 @@ func (cl ChatList) Swap(i, j int) {
 
 func (user *User) PostLogin() {
 	user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
-	user.syncLock.Lock()
+	user.syncWait.Add(1)
+	user.syncStart <- struct{}{}
 	go user.intPostLogin()
 }
 
@@ -431,7 +435,7 @@ func (user *User) tryAutomaticDoublePuppeting() {
 }
 
 func (user *User) intPostLogin() {
-	defer user.syncLock.Unlock()
+	defer user.syncWait.Done()
 	user.createCommunity()
 	user.tryAutomaticDoublePuppeting()
 
@@ -439,14 +443,14 @@ func (user *User) intPostLogin() {
 	case <-user.chatListReceived:
 		user.log.Debugln("Chat list receive confirmation received in PostLogin")
 	case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
-		user.log.Warnln("Timed out waiting for chat list to arrive! Unlocking processing of incoming messages.")
+		user.log.Warnln("Timed out waiting for chat list to arrive!")
 		return
 	}
 	select {
 	case <-user.syncPortalsDone:
 		user.log.Debugln("Post-login portal sync complete, unlocking processing of incoming messages.")
 	case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
-		user.log.Warnln("Timed out waiting for chat list to arrive! Unlocking processing of incoming messages.")
+		user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
 	}
 }
 
@@ -658,10 +662,13 @@ func (user *User) GetPortalByJID(jid types.WhatsAppID) *Portal {
 }
 
 func (user *User) handleMessageLoop() {
-	for msg := range user.messages {
-		user.syncLock.Lock()
-		user.GetPortalByJID(msg.chat).messages <- msg
-		user.syncLock.Unlock()
+	for {
+		select {
+		case msg := <-user.messages:
+			user.GetPortalByJID(msg.chat).messages <- msg
+		case <-user.syncStart:
+			user.syncWait.Wait()
+		}
 	}
 }