ソースを参照

Merge pull request #484 from mautrix/sumner/bri-3057

backfill: fixed bug where the media backfill loop would sleep too often
Sumner Evans 3 年 前
コミット
86840dec59
2 ファイル変更20 行追加33 行削除
  1. 9 23
      backfillqueue.go
  2. 11 10
      historysync.go

+ 9 - 23
backfillqueue.go

@@ -32,30 +32,13 @@ type BackfillQueue struct {
 	log log.Logger
 }
 
-func (bq *BackfillQueue) RunLoops(user *User) {
-	go bq.immediateBackfillLoop(user)
-	bq.deferredBackfillLoop(user)
-}
-
-func (bq *BackfillQueue) immediateBackfillLoop(user *User) {
+// Immediate backfills should happen first, then deferred backfills and lastly
+// media backfills.
+func (bq *BackfillQueue) RunLoop(user *User) {
 	for {
-		if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); backfill != nil {
-			bq.ImmediateBackfillRequests <- backfill
-			backfill.MarkDone()
-		} else {
-			select {
-			case <-bq.ReCheckQueue:
-			case <-time.After(10 * time.Second):
-			}
-		}
-	}
-}
-
-func (bq *BackfillQueue) deferredBackfillLoop(user *User) {
-	for {
-		// Finish all immediate backfills before doing the deferred ones.
 		if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil {
-			time.Sleep(10 * time.Second)
+			bq.ImmediateBackfillRequests <- immediate
+			immediate.MarkDone()
 		} else if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil {
 			bq.DeferredBackfillRequests <- backfill
 			backfill.MarkDone()
@@ -63,7 +46,10 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) {
 			bq.DeferredBackfillRequests <- mediaBackfill
 			mediaBackfill.MarkDone()
 		} else {
-			time.Sleep(10 * time.Second)
+			select {
+			case <-bq.ReCheckQueue:
+			case <-time.After(time.Minute):
+			}
 		}
 	}
 }

+ 11 - 10
historysync.go

@@ -67,7 +67,7 @@ func (user *User) handleHistorySyncsLoop() {
 	// overload the homeserver. Users can configure their backfill stages
 	// to be more or less aggressive with backfilling at this stage.
 	go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests)
-	go user.BackfillQueue.RunLoops(user)
+	go user.BackfillQueue.RunLoop(user)
 
 	// Always save the history syncs for the user. If they want to enable
 	// backfilling in the future, we will have it in the database.
@@ -78,7 +78,7 @@ func (user *User) handleHistorySyncsLoop() {
 
 func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) {
 	for req := range backfillRequests {
-		user.log.Debugfln("Handling backfill request %s", req)
+		user.log.Infofln("Handling backfill request %s", req)
 		conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal)
 		if conv == nil {
 			user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String())
@@ -96,16 +96,17 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac
 				endTime = *req.TimeEnd
 			}
 
-			user.log.Debugfln("Backfilling media from %v to %v for %s", startTime, endTime, portal.Key.String())
+			user.log.Infofln("Backfilling media from %v to %v for %s", startTime, endTime, portal.Key.String())
 
 			// Go through all of the messages in the given time range,
 			// requesting any media that errored.
 			requested := 0
 			for _, msg := range user.bridge.DB.Message.GetMessagesBetween(portal.Key, startTime, endTime) {
-				if requested > 0 && requested%req.MaxBatchEvents == 0 {
-					time.Sleep(time.Duration(req.BatchDelay) * time.Second)
-				}
 				if msg.Error == database.MsgErrMediaNotFound {
+					if requested > 0 && requested%req.MaxBatchEvents == 0 {
+						time.Sleep(time.Duration(req.BatchDelay) * time.Second)
+					}
+
 					portal.requestMediaRetry(user, msg.MXID)
 					requested += 1
 				}
@@ -155,7 +156,7 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill
 		}
 	}
 
-	user.log.Debugfln("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents)
+	user.log.Infofln("Backfilling %d messages in %s, %d messages at a time (queue ID: %d)", len(allMsgs), portal.Key.JID, req.MaxBatchEvents, req.QueueID)
 	toBackfill := allMsgs[0:]
 	var insertionEventIds []id.EventID
 	for len(toBackfill) > 0 {
@@ -180,10 +181,10 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill
 			time.Unix(int64(allMsgs[0].GetMessageTimestamp()), 0),
 			insertionEventIds[0])
 	}
-	user.log.Debugfln("Deleting %d history sync messages after backfilling", len(allMsgs))
+	user.log.Debugfln("Deleting %d history sync messages after backfilling (queue ID: %d)", len(allMsgs), req.QueueID)
 	err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs)
 	if err != nil {
-		user.log.Warnfln("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err)
+		user.log.Warnfln("Failed to delete %d history sync messages after backfilling (queue ID: %d): %v", len(allMsgs), req.QueueID, err)
 	}
 
 	if !conv.MarkedAsUnread && conv.UnreadCount == 0 {
@@ -422,7 +423,7 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo)
 		newMinTs = lastMessage.Timestamp
 	}
 
-	portal.log.Infofln("Processing history sync with %d messages", len(messages))
+	portal.log.Debugfln("Processing backfill with %d messages", len(messages))
 	// The messages are ordered newest to oldest, so iterate them in reverse order.
 	for i := len(messages) - 1; i >= 0; i-- {
 		webMsg := messages[i]