Przeglądaj źródła

Add metric for tracking buffer size

Tulir Asokan 4 lat temu
rodzic
commit
7d54aca762
3 zmienionych plików z 51 dodań i 20 usunięć
  1. 12 0
      metrics.go
  2. 36 20
      portal.go
  3. 3 0
      user.go

+ 12 - 0
metrics.go

@@ -61,6 +61,7 @@ type MetricsHandler struct {
 	loggedInState   map[types.WhatsAppID]bool
 	loggedInState   map[types.WhatsAppID]bool
 	syncLocked      prometheus.Gauge
 	syncLocked      prometheus.Gauge
 	syncLockedState map[types.WhatsAppID]bool
 	syncLockedState map[types.WhatsAppID]bool
+	bufferLength    *prometheus.GaugeVec
 }
 }
 
 
 func NewMetricsHandler(address string, log log.Logger, db *database.Database) *MetricsHandler {
 func NewMetricsHandler(address string, log log.Logger, db *database.Database) *MetricsHandler {
@@ -119,6 +120,10 @@ func NewMetricsHandler(address string, log log.Logger, db *database.Database) *M
 			Help: "Bridge users locked in post-login sync",
 			Help: "Bridge users locked in post-login sync",
 		}),
 		}),
 		syncLockedState: make(map[types.WhatsAppID]bool),
 		syncLockedState: make(map[types.WhatsAppID]bool),
+		bufferLength: promauto.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "bridge_buffer_size",
+			Help: "Number of messages in buffer",
+		}, []string{"user_id"}),
 	}
 	}
 }
 }
 
 
@@ -189,6 +194,13 @@ func (mh *MetricsHandler) TrackSyncLock(jid types.WhatsAppID, locked bool) {
 	}
 	}
 }
 }
 
 
+func (mh *MetricsHandler) TrackBufferLength(id id.UserID, length int) {
+	if !mh.running {
+		return
+	}
+	mh.bufferLength.With(prometheus.Labels{"user_id": string(id)}).Set(float64(length))
+}
+
 func (mh *MetricsHandler) updateStats() {
 func (mh *MetricsHandler) updateStats() {
 	start := time.Now()
 	start := time.Now()
 	var puppetCount int
 	var puppetCount int

+ 36 - 20
portal.go

@@ -133,7 +133,8 @@ func (bridge *Bridge) NewManualPortal(key database.PortalKey) *Portal {
 
 
 		recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{},
 		recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{},
 
 
-		messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
+		messages:      make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
+		backfillStart: make(chan struct{}, 1),
 	}
 	}
 	portal.Key = key
 	portal.Key = key
 	go portal.handleMessageLoop()
 	go portal.handleMessageLoop()
@@ -148,7 +149,8 @@ func (bridge *Bridge) NewPortal(dbPortal *database.Portal) *Portal {
 
 
 		recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{},
 		recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{},
 
 
-		messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
+		messages:      make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
+		backfillStart: make(chan struct{}, 1),
 	}
 	}
 	go portal.handleMessageLoop()
 	go portal.handleMessageLoop()
 	return portal
 	return portal
@@ -175,7 +177,8 @@ type Portal struct {
 	recentlyHandledLock  sync.Mutex
 	recentlyHandledLock  sync.Mutex
 	recentlyHandledIndex uint8
 	recentlyHandledIndex uint8
 
 
-	backfillLock  sync.Mutex
+	backfillStart chan struct{}
+	backfillWait  sync.WaitGroup
 	backfilling   bool
 	backfilling   bool
 	lastMessageTs uint64
 	lastMessageTs uint64
 
 
@@ -190,23 +193,32 @@ type Portal struct {
 const MaxMessageAgeToCreatePortal = 5 * 60 // 5 minutes
 const MaxMessageAgeToCreatePortal = 5 * 60 // 5 minutes
 
 
 func (portal *Portal) handleMessageLoop() {
 func (portal *Portal) handleMessageLoop() {
-	for msg := range portal.messages {
-		if len(portal.MXID) == 0 {
-			if msg.timestamp+MaxMessageAgeToCreatePortal < uint64(time.Now().Unix()) {
-				portal.log.Debugln("Not creating portal room for incoming message as the message is too old.")
-				continue
-			}
-			portal.log.Debugln("Creating Matrix room from incoming message")
-			err := portal.CreateMatrixRoom(msg.source)
-			if err != nil {
-				portal.log.Errorln("Failed to create portal room:", err)
-				return
-			}
+	for {
+		select {
+		case msg := <-portal.messages:
+			portal.handleNewMessage(msg)
+		case <-portal.backfillStart:
+			portal.log.Debugln("Processing of incoming messages is locked for backfilling")
+			portal.backfillWait.Wait()
+			portal.log.Debugln("Processing of incoming messages unlocked after backfilling")
+		}
+	}
+}
+
+func (portal *Portal) handleNewMessage(msg PortalMessage) {
+	if len(portal.MXID) == 0 {
+		if msg.timestamp+MaxMessageAgeToCreatePortal < uint64(time.Now().Unix()) {
+			portal.log.Debugln("Not creating portal room for incoming message as the message is too old.")
+			return
+		}
+		portal.log.Debugln("Creating Matrix room from incoming message")
+		err := portal.CreateMatrixRoom(msg.source)
+		if err != nil {
+			portal.log.Errorln("Failed to create portal room:", err)
+			return
 		}
 		}
-		portal.backfillLock.Lock()
-		portal.handleMessage(msg)
-		portal.backfillLock.Unlock()
 	}
 	}
+	portal.handleMessage(msg)
 }
 }
 
 
 func (portal *Portal) handleMessage(msg PortalMessage) {
 func (portal *Portal) handleMessage(msg PortalMessage) {
@@ -729,7 +741,11 @@ func (portal *Portal) BackfillHistory(user *User, lastMessageTime uint64) error
 }
 }
 
 
 func (portal *Portal) beginBackfill() func() {
 func (portal *Portal) beginBackfill() func() {
-	portal.backfillLock.Lock()
+	portal.backfillWait.Add(1)
+	select {
+	case portal.backfillStart <- struct{}{}:
+	default:
+	}
 	portal.backfilling = true
 	portal.backfilling = true
 	var privateChatPuppetInvited bool
 	var privateChatPuppetInvited bool
 	var privateChatPuppet *Puppet
 	var privateChatPuppet *Puppet
@@ -747,7 +763,7 @@ func (portal *Portal) beginBackfill() func() {
 	return func() {
 	return func() {
 		portal.backfilling = false
 		portal.backfilling = false
 		portal.privateChatBackfillInvitePuppet = nil
 		portal.privateChatBackfillInvitePuppet = nil
-		portal.backfillLock.Unlock()
+		portal.backfillWait.Done()
 		if privateChatPuppet != nil && privateChatPuppetInvited {
 		if privateChatPuppet != nil && privateChatPuppetInvited {
 			_, _ = privateChatPuppet.DefaultIntent().LeaveRoom(portal.MXID)
 			_, _ = privateChatPuppet.DefaultIntent().LeaveRoom(portal.MXID)
 		}
 		}

+ 3 - 0
user.go

@@ -416,6 +416,7 @@ func (cl ChatList) Swap(i, j int) {
 func (user *User) PostLogin() {
 func (user *User) PostLogin() {
 	user.bridge.Metrics.TrackConnectionState(user.JID, true)
 	user.bridge.Metrics.TrackConnectionState(user.JID, true)
 	user.bridge.Metrics.TrackLoginState(user.JID, true)
 	user.bridge.Metrics.TrackLoginState(user.JID, true)
+	user.bridge.Metrics.TrackBufferLength(user.MXID, 0)
 	user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
 	user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
 	user.syncWait.Add(1)
 	user.syncWait.Add(1)
 	user.syncStart <- struct{}{}
 	user.syncStart <- struct{}{}
@@ -799,6 +800,7 @@ func (user *User) runMessageRingBuffer() {
 	for msg := range user.messageInput {
 	for msg := range user.messageInput {
 		select {
 		select {
 		case user.messageOutput <- msg:
 		case user.messageOutput <- msg:
+			user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
 		default:
 		default:
 			dropped := <-user.messageOutput
 			dropped := <-user.messageOutput
 			user.log.Warnln("Buffer is full, dropping message in", dropped.chat)
 			user.log.Warnln("Buffer is full, dropping message in", dropped.chat)
@@ -811,6 +813,7 @@ func (user *User) handleMessageLoop() {
 	for {
 	for {
 		select {
 		select {
 		case msg := <-user.messageOutput:
 		case msg := <-user.messageOutput:
+			user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
 			user.GetPortalByJID(msg.chat).messages <- msg
 			user.GetPortalByJID(msg.chat).messages <- msg
 		case <-user.syncStart:
 		case <-user.syncStart:
 			user.log.Debugln("Processing of incoming messages is locked")
 			user.log.Debugln("Processing of incoming messages is locked")