|
@@ -22,6 +22,7 @@ import (
|
|
|
"fmt"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
|
|
|
log "maunium.net/go/maulogger/v2"
|
|
@@ -51,6 +52,8 @@ func (bt BackfillType) String() string {
|
|
|
type BackfillQuery struct {
|
|
|
db *Database
|
|
|
log log.Logger
|
|
|
+
|
|
|
+ backfillQueryLock sync.Mutex
|
|
|
}
|
|
|
|
|
|
func (bq *BackfillQuery) New() *Backfill {
|
|
@@ -82,7 +85,13 @@ const (
|
|
|
FROM backfill_queue
|
|
|
WHERE user_mxid=$1
|
|
|
AND type IN (%s)
|
|
|
- AND dispatch_time IS NULL
|
|
|
+ AND (
|
|
|
+ dispatch_time IS NULL
|
|
|
+ OR (
|
|
|
+ dispatch_time < current_timestamp - interval '15 minutes'
|
|
|
+ AND completed_at IS NULL
|
|
|
+ )
|
|
|
+ )
|
|
|
ORDER BY type, priority, queue_id
|
|
|
LIMIT 1
|
|
|
`
|
|
@@ -90,6 +99,9 @@ const (
|
|
|
|
|
|
// GetNext returns the next backfill to perform
|
|
|
func (bq *BackfillQuery) GetNext(userID id.UserID, backfillTypes []BackfillType) (backfill *Backfill) {
|
|
|
+ bq.backfillQueryLock.Lock()
|
|
|
+ defer bq.backfillQueryLock.Unlock()
|
|
|
+
|
|
|
types := []string{}
|
|
|
for _, backfillType := range backfillTypes {
|
|
|
types = append(types, strconv.Itoa(int(backfillType)))
|
|
@@ -107,6 +119,8 @@ func (bq *BackfillQuery) GetNext(userID id.UserID, backfillTypes []BackfillType)
|
|
|
}
|
|
|
|
|
|
func (bq *BackfillQuery) DeleteAll(userID id.UserID) {
|
|
|
+ bq.backfillQueryLock.Lock()
|
|
|
+ defer bq.backfillQueryLock.Unlock()
|
|
|
_, err := bq.db.Exec("DELETE FROM backfill_queue WHERE user_mxid=$1", userID)
|
|
|
if err != nil {
|
|
|
bq.log.Warnfln("Failed to delete backfill queue items for %s: %v", userID, err)
|
|
@@ -114,6 +128,8 @@ func (bq *BackfillQuery) DeleteAll(userID id.UserID) {
|
|
|
}
|
|
|
|
|
|
func (bq *BackfillQuery) DeleteAllForPortal(userID id.UserID, portalKey PortalKey) {
|
|
|
+ bq.backfillQueryLock.Lock()
|
|
|
+ defer bq.backfillQueryLock.Unlock()
|
|
|
_, err := bq.db.Exec(`
|
|
|
DELETE FROM backfill_queue
|
|
|
WHERE user_mxid=$1
|
|
@@ -161,6 +177,9 @@ func (b *Backfill) Scan(row Scannable) *Backfill {
|
|
|
}
|
|
|
|
|
|
func (b *Backfill) Insert() {
|
|
|
+ b.db.Backfill.backfillQueryLock.Lock()
|
|
|
+ defer b.db.Backfill.backfillQueryLock.Unlock()
|
|
|
+
|
|
|
rows, err := b.db.Query(`
|
|
|
INSERT INTO backfill_queue
|
|
|
(user_mxid, type, priority, portal_jid, portal_receiver, time_start, max_batch_events, max_total_events, batch_delay, dispatch_time, completed_at)
|
|
@@ -179,6 +198,9 @@ func (b *Backfill) Insert() {
|
|
|
}
|
|
|
|
|
|
func (b *Backfill) MarkDispatched() {
|
|
|
+ b.db.Backfill.backfillQueryLock.Lock()
|
|
|
+ defer b.db.Backfill.backfillQueryLock.Unlock()
|
|
|
+
|
|
|
if b.QueueID == 0 {
|
|
|
b.log.Errorf("Cannot mark backfill as dispatched without queue_id. Maybe it wasn't actually inserted in the database?")
|
|
|
return
|
|
@@ -190,6 +212,9 @@ func (b *Backfill) MarkDispatched() {
|
|
|
}
|
|
|
|
|
|
func (b *Backfill) MarkDone() {
|
|
|
+ b.db.Backfill.backfillQueryLock.Lock()
|
|
|
+ defer b.db.Backfill.backfillQueryLock.Unlock()
|
|
|
+
|
|
|
if b.QueueID == 0 {
|
|
|
b.log.Errorf("Cannot mark backfill done without queue_id. Maybe it wasn't actually inserted in the database?")
|
|
|
return
|
|
@@ -199,3 +224,79 @@ func (b *Backfill) MarkDone() {
|
|
|
b.log.Warnfln("Failed to mark %s/%s as complete: %v", b.BackfillType, b.Priority, err)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+func (bq *BackfillQuery) NewBackfillState(userID id.UserID, portalKey *PortalKey) *BackfillState {
|
|
|
+ return &BackfillState{
|
|
|
+ db: bq.db,
|
|
|
+ log: bq.log,
|
|
|
+ UserID: userID,
|
|
|
+ Portal: portalKey,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+const (
|
|
|
+ getBackfillState = `
|
|
|
+ SELECT user_mxid, portal_jid, portal_receiver, processing_batch, backfill_complete, first_expected_ts
|
|
|
+ FROM backfill_state
|
|
|
+ WHERE user_mxid=$1
|
|
|
+ AND portal_jid=$2
|
|
|
+ AND portal_receiver=$3
|
|
|
+ `
|
|
|
+)
|
|
|
+
|
|
|
+type BackfillState struct {
|
|
|
+ db *Database
|
|
|
+ log log.Logger
|
|
|
+
|
|
|
+ // Fields
|
|
|
+ UserID id.UserID
|
|
|
+ Portal *PortalKey
|
|
|
+ ProcessingBatch bool
|
|
|
+ BackfillComplete bool
|
|
|
+ FirstExpectedTimestamp uint64
|
|
|
+}
|
|
|
+
|
|
|
+func (b *BackfillState) Scan(row Scannable) *BackfillState {
|
|
|
+ err := row.Scan(&b.UserID, &b.Portal.JID, &b.Portal.Receiver, &b.ProcessingBatch, &b.BackfillComplete, &b.FirstExpectedTimestamp)
|
|
|
+ if err != nil {
|
|
|
+ if !errors.Is(err, sql.ErrNoRows) {
|
|
|
+ b.log.Errorln("Database scan failed:", err)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return b
|
|
|
+}
|
|
|
+
|
|
|
+func (b *BackfillState) Upsert() {
|
|
|
+ _, err := b.db.Exec(`
|
|
|
+ INSERT INTO backfill_state
|
|
|
+ (user_mxid, portal_jid, portal_receiver, processing_batch, backfill_complete, first_expected_ts)
|
|
|
+ VALUES ($1, $2, $3, $4, $5, $6)
|
|
|
+ ON CONFLICT (user_mxid, portal_jid, portal_receiver)
|
|
|
+ DO UPDATE SET
|
|
|
+ processing_batch=EXCLUDED.processing_batch,
|
|
|
+ backfill_complete=EXCLUDED.backfill_complete,
|
|
|
+ first_expected_ts=EXCLUDED.first_expected_ts`,
|
|
|
+ b.UserID, b.Portal.JID, b.Portal.Receiver, b.ProcessingBatch, b.BackfillComplete, b.FirstExpectedTimestamp)
|
|
|
+ if err != nil {
|
|
|
+ b.log.Warnfln("Failed to insert backfill state for %s: %v", b.Portal.JID, err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (b *BackfillState) SetProcessingBatch(processing bool) {
|
|
|
+ b.ProcessingBatch = processing
|
|
|
+ b.Upsert()
|
|
|
+}
|
|
|
+
|
|
|
+func (bq *BackfillQuery) GetBackfillState(userID id.UserID, portalKey *PortalKey) (backfillState *BackfillState) {
|
|
|
+ rows, err := bq.db.Query(getBackfillState, userID, portalKey.JID, portalKey.Receiver)
|
|
|
+ if err != nil || rows == nil {
|
|
|
+ bq.log.Error(err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ defer rows.Close()
|
|
|
+ if rows.Next() {
|
|
|
+ backfillState = bq.NewBackfillState(userID, portalKey).Scan(rows)
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|