|
@@ -279,28 +279,31 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
|
|
|
}
|
|
|
|
|
|
nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations)
|
|
|
- var priorityCounter int
|
|
|
- for _, conv := range nMostRecent {
|
|
|
- jid, err := types.ParseJID(conv.ConversationID)
|
|
|
- if err != nil {
|
|
|
- user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err)
|
|
|
- continue
|
|
|
+ if len(nMostRecent) > 0 {
|
|
|
+ // Find the portals for all of the conversations.
|
|
|
+ portals := []*Portal{}
|
|
|
+ for _, conv := range nMostRecent {
|
|
|
+ jid, err := types.ParseJID(conv.ConversationID)
|
|
|
+ if err != nil {
|
|
|
+ user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ portals = append(portals, user.GetPortalByJID(jid))
|
|
|
}
|
|
|
- portal := user.GetPortalByJID(jid)
|
|
|
|
|
|
switch evt.GetSyncType() {
|
|
|
case waProto.HistorySync_INITIAL_BOOTSTRAP:
|
|
|
// Enqueue immediate backfills for the most recent messages first.
|
|
|
- user.EnqueueImmedateBackfill(portal, &priorityCounter)
|
|
|
+ user.EnqueueImmedateBackfills(portals)
|
|
|
case waProto.HistorySync_FULL, waProto.HistorySync_RECENT:
|
|
|
// Enqueue deferred backfills as configured.
|
|
|
- user.EnqueueDeferredBackfills(portal, &priorityCounter)
|
|
|
- user.EnqueueMediaBackfills(portal, &priorityCounter)
|
|
|
+ user.EnqueueDeferredBackfills(portals)
|
|
|
+ user.EnqueueMediaBackfills(portals)
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- // Tell the queue to check for new backfill requests.
|
|
|
- reCheckQueue <- true
|
|
|
+ // Tell the queue to check for new backfill requests.
|
|
|
+ reCheckQueue <- true
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -312,38 +315,43 @@ func getConversationTimestamp(conv *waProto.Conversation) uint64 {
|
|
|
return convTs
|
|
|
}
|
|
|
|
|
|
-func (user *User) EnqueueImmedateBackfill(portal *Portal, priorityCounter *int) {
|
|
|
- maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents
|
|
|
- initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, *priorityCounter, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
|
|
|
- initialBackfill.Insert()
|
|
|
- *priorityCounter++
|
|
|
+func (user *User) EnqueueImmedateBackfills(portals []*Portal) {
|
|
|
+ for priority, portal := range portals {
|
|
|
+ maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents
|
|
|
+ initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
|
|
|
+ initialBackfill.Insert()
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-func (user *User) EnqueueDeferredBackfills(portal *Portal, priorityCounter *int) {
|
|
|
- for _, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred {
|
|
|
- var startDate *time.Time = nil
|
|
|
- if backfillStage.StartDaysAgo > 0 {
|
|
|
- startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
|
|
|
- startDate = &startDaysAgo
|
|
|
+func (user *User) EnqueueDeferredBackfills(portals []*Portal) {
|
|
|
+ numPortals := len(portals)
|
|
|
+ for stageIdx, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred {
|
|
|
+ for portalIdx, portal := range portals {
|
|
|
+ var startDate *time.Time = nil
|
|
|
+ if backfillStage.StartDaysAgo > 0 {
|
|
|
+ startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
|
|
|
+ startDate = &startDaysAgo
|
|
|
+ }
|
|
|
+ backfillMessages := user.bridge.DB.BackfillQuery.NewWithValues(
|
|
|
+ user.MXID, database.BackfillDeferred, stageIdx*numPortals+portalIdx, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
|
|
|
+ backfillMessages.Insert()
|
|
|
}
|
|
|
- backfillMessages := user.bridge.DB.BackfillQuery.NewWithValues(
|
|
|
- user.MXID, database.BackfillDeferred, *priorityCounter, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
|
|
|
- backfillMessages.Insert()
|
|
|
- *priorityCounter++
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (user *User) EnqueueMediaBackfills(portal *Portal, priorityCounter *int) {
|
|
|
- for _, backfillStage := range user.bridge.Config.Bridge.HistorySync.Media {
|
|
|
- var startDate *time.Time = nil
|
|
|
- if backfillStage.StartDaysAgo > 0 {
|
|
|
- startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
|
|
|
- startDate = &startDaysAgo
|
|
|
- }
|
|
|
- backfill := user.bridge.DB.BackfillQuery.NewWithValues(
|
|
|
- user.MXID, database.BackfillMedia, *priorityCounter, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
|
|
|
- backfill.Insert()
|
|
|
- *priorityCounter++
|
|
|
+func (user *User) EnqueueMediaBackfills(portals []*Portal) {
|
|
|
+ numPortals := len(portals)
|
|
|
+ for stageIdx, backfillStage := range user.bridge.Config.Bridge.HistorySync.Media {
|
|
|
+ for portalIdx, portal := range portals {
|
|
|
+ var startDate *time.Time = nil
|
|
|
+ if backfillStage.StartDaysAgo > 0 {
|
|
|
+ startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
|
|
|
+ startDate = &startDaysAgo
|
|
|
+ }
|
|
|
+ backfill := user.bridge.DB.BackfillQuery.NewWithValues(
|
|
|
+ user.MXID, database.BackfillMedia, stageIdx*numPortals+portalIdx, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
|
|
|
+ backfill.Insert()
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|