Эх сурвалжийг харах

backfill: backfill conversation when started

This applies to when the conversation is started via the provisioning
API (start new chat) or when a new message comes in to that portal.
Sumner Evans 3 жил өмнө
parent
commit
8a49fea812
5 өөрчлөгдсөн 24 нэмэгдсэн , 32 устгасан
  1. 1 1
      commands.go
  2. 11 25
      historysync.go
  3. 8 2
      portal.go
  4. 1 1
      provisioning.go
  5. 3 3
      user.go

+ 1 - 1
commands.go

@@ -1059,7 +1059,7 @@ func (handler *CommandHandler) CommandOpen(ce *CommandEvent) {
 		portal.UpdateMatrixRoom(ce.User, info)
 		ce.Reply("Portal room synced.")
 	} else {
-		err = portal.CreateMatrixRoom(ce.User, info, true)
+		err = portal.CreateMatrixRoom(ce.User, info, true, true)
 		if err != nil {
 			ce.Reply("Failed to create room: %v", err)
 		} else {

+ 11 - 25
historysync.go

@@ -78,6 +78,10 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac
 	for req := range backfillRequests {
 		user.log.Infof("Backfill request: %v", req)
 		conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal)
+		if conv == nil {
+			user.log.Errorf("Could not find conversation for %s in %s", user.MXID, req.Portal.String())
+			continue
+		}
 
 		// Update the client store with basic chat settings.
 		if conv.MuteEndTime.After(time.Now()) {
@@ -102,7 +106,7 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac
 
 		if len(portal.MXID) == 0 {
 			user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
-			err := portal.CreateMatrixRoom(user, nil, true)
+			err := portal.CreateMatrixRoom(user, nil, true, false)
 			if err != nil {
 				user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err)
 				continue
@@ -122,8 +126,6 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac
 					break
 				}
 
-				time.Sleep(time.Duration(req.BatchDelay) * time.Second)
-
 				var msgs []*waProto.WebMessageInfo
 				if len(toBackfill) <= req.MaxBatchEvents {
 					msgs = toBackfill
@@ -134,6 +136,7 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac
 				}
 
 				if len(msgs) > 0 {
+					time.Sleep(time.Duration(req.BatchDelay) * time.Second)
 					user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID)
 					insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...)
 				}
@@ -224,8 +227,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
 	// If this was the initial bootstrap, enqueue immediate backfills for the
 	// most recent portals. If it's the last history sync event, start
 	// backfilling the rest of the history of the portals.
-	historySyncConfig := user.bridge.Config.Bridge.HistorySync
-	if historySyncConfig.Backfill && (evt.GetSyncType() == waProto.HistorySync_FULL || evt.GetSyncType() == waProto.HistorySync_INITIAL_BOOTSTRAP) {
+	if user.bridge.Config.Bridge.HistorySync.Backfill && evt.GetSyncType() == waProto.HistorySync_FULL {
 		nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations)
 		for i, conv := range nMostRecent {
 			jid, err := types.ParseJID(conv.ConversationID)
@@ -235,26 +237,10 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
 			}
 			portal := user.GetPortalByJID(jid)
 
-			switch evt.GetSyncType() {
-			case waProto.HistorySync_INITIAL_BOOTSTRAP:
-				// Enqueue immediate backfills for the most recent messages first.
-				maxMessages := historySyncConfig.Immediate.MaxEvents
-				initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, i, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
-				initialBackfill.Insert()
-
-			case waProto.HistorySync_FULL:
-				// Enqueue deferred backfills as configured.
-				for j, backfillStage := range historySyncConfig.Deferred {
-					var startDate *time.Time = nil
-					if backfillStage.StartDaysAgo > 0 {
-						startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
-						startDate = &startDaysAgo
-					}
-					backfill := user.bridge.DB.BackfillQuery.NewWithValues(
-						user.MXID, database.BackfillDeferred, j*len(nMostRecent)+i, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
-					backfill.Insert()
-				}
-			}
+			// Enqueue immediate backfills for the most recent messages first.
+			user.EnqueueImmedateBackfill(portal, i)
+			// Enqueue deferred backfills as configured.
+			user.EnqueueDeferredBackfills(portal, len(nMostRecent), i)
 		}
 
 		// Tell the queue to check for new backfill requests.

+ 8 - 2
portal.go

@@ -237,7 +237,7 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) {
 			return
 		}
 		portal.log.Debugln("Creating Matrix room from incoming message")
-		err := portal.CreateMatrixRoom(msg.source, nil, false)
+		err := portal.CreateMatrixRoom(msg.source, nil, false, true)
 		if err != nil {
 			portal.log.Errorln("Failed to create portal room:", err)
 			return
@@ -1163,7 +1163,7 @@ func (portal *Portal) UpdateBridgeInfo() {
 	}
 }
 
-func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, isFullInfo bool) error {
+func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, isFullInfo, backfill bool) error {
 	portal.roomCreateLock.Lock()
 	defer portal.roomCreateLock.Unlock()
 	if len(portal.MXID) > 0 {
@@ -1336,6 +1336,12 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i
 		portal.FirstEventID = firstEventResp.EventID
 		portal.Update()
 	}
+
+	if user.bridge.Config.Bridge.HistorySync.Backfill && backfill {
+		user.EnqueueImmedateBackfill(portal, 0)
+		user.EnqueueDeferredBackfills(portal, 1, 0)
+		user.BackfillQueue.ReCheckQueue <- true
+	}
 	return nil
 }
 

+ 1 - 1
provisioning.go

@@ -346,7 +346,7 @@ func (prov *ProvisioningAPI) OpenGroup(w http.ResponseWriter, r *http.Request) {
 		portal := user.GetPortalByJID(info.JID)
 		status := http.StatusOK
 		if len(portal.MXID) == 0 {
-			err = portal.CreateMatrixRoom(user, info, true)
+			err = portal.CreateMatrixRoom(user, info, true, true)
 			if err != nil {
 				jsonResponse(w, http.StatusInternalServerError, Error{
 					Error: fmt.Sprintf("Failed to create portal: %v", err),

+ 3 - 3
user.go

@@ -937,7 +937,7 @@ func (user *User) ResyncGroups(createPortals bool) error {
 		portal := user.GetPortalByJID(group.JID)
 		if len(portal.MXID) == 0 {
 			if createPortals {
-				err = portal.CreateMatrixRoom(user, group, true)
+				err = portal.CreateMatrixRoom(user, group, true, true)
 				if err != nil {
 					return fmt.Errorf("failed to create room for %s: %w", group.JID, err)
 				}
@@ -1020,7 +1020,7 @@ func (user *User) markSelfReadFull(portal *Portal) {
 func (user *User) handleGroupCreate(evt *events.JoinedGroup) {
 	portal := user.GetPortalByJID(evt.JID)
 	if len(portal.MXID) == 0 {
-		err := portal.CreateMatrixRoom(user, &evt.GroupInfo, true)
+		err := portal.CreateMatrixRoom(user, &evt.GroupInfo, true, true)
 		if err != nil {
 			user.log.Errorln("Failed to create Matrix room after join notification: %v", err)
 		}
@@ -1088,7 +1088,7 @@ func (user *User) StartPM(jid types.JID, reason string) (*Portal, *Puppet, bool,
 			return portal, puppet, false, nil
 		}
 	}
-	err := portal.CreateMatrixRoom(user, nil, false)
+	err := portal.CreateMatrixRoom(user, nil, false, true)
 	return portal, puppet, true, err
 }