Parcourir la source

Fix backfill queue query. Fixes #517

Tulir Asokan il y a 3 ans
Parent
commit
8012368de5
2 fichiers modifiés avec 16 ajouts et 7 suppressions
  1. 13 7
      database/backfill.go
  2. 3 0
      database/historysync.go

+ 13 - 7
database/backfill.go

@@ -90,7 +90,7 @@ const (
 			AND (
 				dispatch_time IS NULL
 				OR (
-					dispatch_time < current_timestamp - interval '15 minutes'
+					dispatch_time < $2
 					AND completed_at IS NULL
 				)
 			)
@@ -112,13 +112,13 @@ func (bq *BackfillQuery) GetNext(userID id.UserID, backfillTypes []BackfillType)
 	bq.backfillQueryLock.Lock()
 	defer bq.backfillQueryLock.Unlock()
 
-	types := []string{}
+	var types []string
 	for _, backfillType := range backfillTypes {
 		types = append(types, strconv.Itoa(int(backfillType)))
 	}
-	rows, err := bq.db.Query(fmt.Sprintf(getNextBackfillQuery, strings.Join(types, ",")), userID)
+	rows, err := bq.db.Query(fmt.Sprintf(getNextBackfillQuery, strings.Join(types, ",")), userID, time.Now().Add(-15*time.Minute))
 	if err != nil || rows == nil {
-		bq.log.Error(err)
+		bq.log.Errorfln("Failed to query next backfill queue job: %v", err)
 		return
 	}
 	defer rows.Close()
@@ -142,6 +142,9 @@ func (bq *BackfillQuery) HasUnstartedOrInFlightOfType(userID id.UserID, backfill
 	}
 	rows, err := bq.db.Query(fmt.Sprintf(getUnstartedOrInFlightQuery, strings.Join(types, ",")), userID)
 	if err != nil || rows == nil {
+		if err != nil && !errors.Is(err, sql.ErrNoRows) {
+			bq.log.Warnfln("Failed to query backfill queue jobs: %v", err)
+		}
 		// No rows means that there are no unstarted or in flight backfill
 		// requests.
 		return false
@@ -198,13 +201,16 @@ func (b *Backfill) String() string {
 }
 
 func (b *Backfill) Scan(row dbutil.Scannable) *Backfill {
-	err := row.Scan(&b.QueueID, &b.UserID, &b.BackfillType, &b.Priority, &b.Portal.JID, &b.Portal.Receiver, &b.TimeStart, &b.MaxBatchEvents, &b.MaxTotalEvents, &b.BatchDelay)
+	var maxTotalEvents, batchDelay sql.NullInt32
+	err := row.Scan(&b.QueueID, &b.UserID, &b.BackfillType, &b.Priority, &b.Portal.JID, &b.Portal.Receiver, &b.TimeStart, &b.MaxBatchEvents, &maxTotalEvents, &batchDelay)
 	if err != nil {
 		if !errors.Is(err, sql.ErrNoRows) {
 			b.log.Errorln("Database scan failed:", err)
 		}
 		return nil
 	}
+	b.MaxTotalEvents = int(maxTotalEvents.Int32)
+	b.BatchDelay = int(batchDelay.Int32)
 	return b
 }
 
@@ -234,7 +240,7 @@ func (b *Backfill) MarkDispatched() {
 	defer b.db.Backfill.backfillQueryLock.Unlock()
 
 	if b.QueueID == 0 {
-		b.log.Errorf("Cannot mark backfill as dispatched without queue_id. Maybe it wasn't actually inserted in the database?")
+		b.log.Errorfln("Cannot mark backfill as dispatched without queue_id. Maybe it wasn't actually inserted in the database?")
 		return
 	}
 	_, err := b.db.Exec("UPDATE backfill_queue SET dispatch_time=$1 WHERE queue_id=$2", time.Now(), b.QueueID)
@@ -248,7 +254,7 @@ func (b *Backfill) MarkDone() {
 	defer b.db.Backfill.backfillQueryLock.Unlock()
 
 	if b.QueueID == 0 {
-		b.log.Errorf("Cannot mark backfill done without queue_id. Maybe it wasn't actually inserted in the database?")
+		b.log.Errorfln("Cannot mark backfill done without queue_id. Maybe it wasn't actually inserted in the database?")
 		return
 	}
 	_, err := b.db.Exec("UPDATE backfill_queue SET completed_at=$1 WHERE queue_id=$2", time.Now(), b.QueueID)

+ 3 - 0
database/historysync.go

@@ -275,6 +275,9 @@ func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID
 	rows, err := hsq.db.Query(fmt.Sprintf(getMessagesBetween, whereClauses, limitClause), args...)
 	defer rows.Close()
 	if err != nil || rows == nil {
+		if err != nil && !errors.Is(err, sql.ErrNoRows) {
+			hsq.log.Warnfln("Failed to query messages between range: %v", err)
+		}
 		return nil
 	}