瀏覽代碼

backfill queue: don't delete, just mark as complete

Sumner Evans 3 年之前
父節點
當前提交
005fbb09f8
共有 3 個文件被更改,包括 11 次插入8 次删除
  1. 2 2
      backfillqueue.go
  2. 8 6
      database/backfillqueue.go
  3. 1 0
      database/upgrades/2022-03-15-prioritized-backfill.go

+ 2 - 2
backfillqueue.go

@@ -41,7 +41,7 @@ func (bq *BackfillQueue) immediateBackfillLoop(user *User) {
 	for {
 		if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); backfill != nil {
 			bq.ImmediateBackfillRequests <- backfill
-			backfill.Delete()
+			backfill.MarkDone()
 		} else {
 			select {
 			case <-bq.ReCheckQueue:
@@ -61,7 +61,7 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) {
 
 		if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil {
 			bq.DeferredBackfillRequests <- backfill
-			backfill.Delete()
+			backfill.MarkDone()
 		} else {
 			time.Sleep(10 * time.Second)
 		}

+ 8 - 6
database/backfillqueue.go

@@ -67,6 +67,7 @@ const (
 		  FROM backfill_queue
 		 WHERE user_mxid=$1
 		   AND type=$2
+		   AND completed_at IS NULL
 	  ORDER BY priority, queue_id
 	     LIMIT 1
 	`
@@ -106,6 +107,7 @@ type Backfill struct {
 	MaxBatchEvents int
 	MaxTotalEvents int
 	BatchDelay     int
+	CompletedAt    *time.Time
 }
 
 func (b *Backfill) Scan(row Scannable) *Backfill {
@@ -122,10 +124,10 @@ func (b *Backfill) Scan(row Scannable) *Backfill {
 func (b *Backfill) Insert() {
 	rows, err := b.db.Query(`
 		INSERT INTO backfill_queue
-			(user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay)
-		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+			(user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay, completed_at)
+		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
 		RETURNING queue_id
-	`, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay)
+	`, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.CompletedAt)
 	defer rows.Close()
 	if err != nil || !rows.Next() {
 		b.log.Warnfln("Failed to insert %v/%s with priority %d: %v", b.BackfillType, b.Portal.JID, b.Priority, err)
@@ -137,13 +139,13 @@ func (b *Backfill) Insert() {
 	}
 }
 
-func (b *Backfill) Delete() {
+func (b *Backfill) MarkDone() {
 	if b.QueueID == 0 {
 		b.log.Errorf("Cannot delete backfill without queue_id. Maybe it wasn't actually inserted in the database?")
 		return
 	}
-	_, err := b.db.Exec("DELETE FROM backfill_queue WHERE queue_id=$1", b.QueueID)
+	_, err := b.db.Exec("UPDATE backfill_queue SET completed_at=$1 WHERE queue_id=$2", time.Now(), b.QueueID)
 	if err != nil {
-		b.log.Warnfln("Failed to delete %s/%s: %v", b.BackfillType, b.Priority, err)
+		b.log.Warnfln("Failed to mark %s/%s as complete: %v", b.BackfillType, b.Priority, err)
 	}
 }

+ 1 - 0
database/upgrades/2022-03-15-prioritized-backfill.go

@@ -17,6 +17,7 @@ func init() {
 				max_batch_events    INTEGER NOT NULL,
 				max_total_events    INTEGER,
 				batch_delay         INTEGER,
+				completed_at        TIMESTAMP,
 
 				FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
 				FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE