Browse Source

Split forward backfilling and do it in one batch

Multiple batches is complicated, as it would require figuring out where
the batch ended when handling the next batch of new-old messages.
Tulir Asokan 3 years ago
parent
commit
331138bc6b
1 changed files with 74 additions and 71 deletions
  1. 74 71
      historysync.go

+ 74 - 71
historysync.go

@@ -128,12 +128,12 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac
 				portal.Update()
 			}
 
-			user.createOrUpdatePortalAndBackfillWithLock(req, conv, portal)
+			user.backfillInChunks(req, conv, portal)
 		}
 	}
 }
 
-func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill, conv *database.HistorySyncConversation, portal *Portal) {
+func (user *User) backfillInChunks(req *database.Backfill, conv *database.HistorySyncConversation, portal *Portal) {
 	portal.backfillLock.Lock()
 	defer portal.backfillLock.Unlock()
 
@@ -141,6 +141,22 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill
 		return
 	}
 
+	var forwardPrevID id.EventID
+	if req.BackfillType == database.BackfillForward {
+		// TODO this overrides the TimeStart set when enqueuing the backfill
+		//      maybe the enqueue should instead include the prev event ID
+		lastMessage := portal.bridge.DB.Message.GetLastInChat(portal.Key)
+		forwardPrevID = lastMessage.MXID
+		start := lastMessage.Timestamp.Add(1 * time.Second)
+		req.TimeStart = &start
+	} else {
+		firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key)
+		if firstMessage != nil && (req.TimeEnd == nil || firstMessage.Timestamp.Before(*req.TimeEnd)) {
+			end := firstMessage.Timestamp.Add(-1 * time.Second)
+			req.TimeEnd = &end
+			user.log.Debugfln("Limiting backfill to end at %v", end)
+		}
+	}
 	allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents)
 	if len(allMsgs) == 0 {
 		user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID)
@@ -161,7 +177,7 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill
 	var insertionEventIds []id.EventID
 	for len(toBackfill) > 0 {
 		var msgs []*waProto.WebMessageInfo
-		if len(toBackfill) <= req.MaxBatchEvents {
+		if len(toBackfill) <= req.MaxBatchEvents || req.MaxBatchEvents < 0 {
 			msgs = toBackfill
 			toBackfill = nil
 		} else {
@@ -172,7 +188,10 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill
 		if len(msgs) > 0 {
 			time.Sleep(time.Duration(req.BatchDelay) * time.Second)
 			user.log.Debugfln("Backfilling %d messages in %s (queue ID: %d)", len(msgs), portal.Key.JID, req.QueueID)
-			insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...)
+			resp := portal.backfill(user, msgs, req.BackfillType == database.BackfillForward, forwardPrevID)
+			if resp != nil {
+				insertionEventIds = append(insertionEventIds, resp.BaseInsertionEventID)
+			}
 		}
 	}
 	user.log.Debugfln("Finished backfilling %d messages in %s (queue ID: %d)", len(allMsgs), portal.Key.JID, req.QueueID)
@@ -297,6 +316,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
 				// Enqueue immediate backfills for the most recent messages first.
 				user.EnqueueImmedateBackfills(portals)
 			case waProto.HistorySync_FULL, waProto.HistorySync_RECENT:
+				user.EnqueueForwardBackfills(portals)
 				// Enqueue deferred backfills as configured.
 				user.EnqueueDeferredBackfills(portals)
 				user.EnqueueMediaBackfills(portals)
@@ -340,6 +360,18 @@ func (user *User) EnqueueDeferredBackfills(portals []*Portal) {
 	}
 }
 
+func (user *User) EnqueueForwardBackfills(portals []*Portal) {
+	for priority, portal := range portals {
+		lastMsg := user.bridge.DB.Message.GetLastInChat(portal.Key)
+		if lastMsg == nil {
+			continue
+		}
+		backfill := user.bridge.DB.BackfillQuery.NewWithValues(
+			user.MXID, database.BackfillForward, priority, &portal.Key, &lastMsg.Timestamp, nil, -1, -1, 0)
+		backfill.Insert()
+	}
+}
+
 func (user *User) EnqueueMediaBackfills(portals []*Portal) {
 	numPortals := len(portals)
 	for stageIdx, backfillStage := range user.bridge.Config.Bridge.HistorySync.Media {
@@ -367,16 +399,26 @@ var (
 	HistorySyncMarker     = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType}
 )
 
-func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) []id.EventID {
-	var historyBatch, newBatch mautrix.ReqBatchSend
-	var historyBatchInfos, newBatchInfos []*wrappedInfo
+func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, isForward bool, prevEventID id.EventID) *mautrix.RespBatchSend {
+	var req mautrix.ReqBatchSend
+	var infos []*wrappedInfo
 
-	firstMsgTimestamp := time.Unix(int64(messages[len(messages)-1].GetMessageTimestamp()), 0)
+	if !isForward {
+		if portal.FirstEventID != "" || portal.NextBatchID != "" {
+			req.PrevEventID = portal.FirstEventID
+			req.BatchID = portal.NextBatchID
+		} else {
+			portal.log.Warnfln("Can't backfill %d messages through %s to chat: first event ID not known", len(messages), source.MXID)
+			return nil
+		}
+	} else {
+		req.PrevEventID = prevEventID
+	}
 
-	historyBatch.StateEventsAtStart = make([]*event.Event, 0)
-	newBatch.StateEventsAtStart = make([]*event.Event, 0)
+	beforeFirstMessageTimestampMillis := (int64(messages[len(messages)-1].GetMessageTimestamp()) * 1000) - 1
+	req.StateEventsAtStart = make([]*event.Event, 0)
 
-	addedMembers := make(map[id.UserID]*event.MemberEventContent)
+	addedMembers := make(map[id.UserID]struct{})
 	addMember := func(puppet *Puppet) {
 		if _, alreadyAdded := addedMembers[puppet.MXID]; alreadyAdded {
 			return
@@ -389,41 +431,23 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo)
 		}
 		inviteContent := content
 		inviteContent.Membership = event.MembershipInvite
-		historyBatch.StateEventsAtStart = append(historyBatch.StateEventsAtStart, &event.Event{
+		req.StateEventsAtStart = append(req.StateEventsAtStart, &event.Event{
 			Type:      event.StateMember,
 			Sender:    portal.MainIntent().UserID,
 			StateKey:  &mxid,
-			Timestamp: firstMsgTimestamp.UnixMilli(),
+			Timestamp: beforeFirstMessageTimestampMillis,
 			Content:   event.Content{Parsed: &inviteContent},
 		}, &event.Event{
 			Type:      event.StateMember,
 			Sender:    puppet.MXID,
 			StateKey:  &mxid,
-			Timestamp: firstMsgTimestamp.UnixMilli(),
+			Timestamp: beforeFirstMessageTimestampMillis,
 			Content:   event.Content{Parsed: &content},
 		})
-		addedMembers[puppet.MXID] = &content
-	}
-
-	firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key)
-	lastMessage := portal.bridge.DB.Message.GetLastInChat(portal.Key)
-	var historyMaxTs, newMinTs time.Time
-
-	if portal.FirstEventID != "" || portal.NextBatchID != "" {
-		historyBatch.PrevEventID = portal.FirstEventID
-		historyBatch.BatchID = portal.NextBatchID
-		if firstMessage == nil && lastMessage == nil {
-			historyMaxTs = time.Now()
-		} else {
-			historyMaxTs = firstMessage.Timestamp
-		}
-	}
-	if lastMessage != nil {
-		newBatch.PrevEventID = lastMessage.MXID
-		newMinTs = lastMessage.Timestamp
+		addedMembers[puppet.MXID] = struct{}{}
 	}
 
-	portal.log.Debugfln("Processing backfill with %d messages", len(messages))
+	portal.log.Infofln("Processing history sync with %d messages (forward: %t)", len(messages), isForward)
 	// The messages are ordered newest to oldest, so iterate them in reverse order.
 	for i := len(messages) - 1; i >= 0; i-- {
 		webMsg := messages[i]
@@ -446,15 +470,6 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo)
 		if info == nil {
 			continue
 		}
-		var batch *mautrix.ReqBatchSend
-		var infos *[]*wrappedInfo
-		if !historyMaxTs.IsZero() && info.Timestamp.Before(historyMaxTs) {
-			batch, infos = &historyBatch, &historyBatchInfos
-		} else if !newMinTs.IsZero() && info.Timestamp.After(newMinTs) {
-			batch, infos = &newBatch, &newBatchInfos
-		} else {
-			continue
-		}
 		if webMsg.GetPushName() != "" && webMsg.GetPushName() != "-" {
 			existingContact, _ := source.Client.Store.Contacts.GetContact(info.Sender)
 			if !existingContact.Found || existingContact.PushName == "" {
@@ -484,13 +499,18 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo)
 		if len(converted.ReplyTo) > 0 {
 			portal.SetReply(converted.Content, converted.ReplyTo)
 		}
-		err := portal.appendBatchEvents(converted, info, webMsg.GetEphemeralStartTimestamp(), &batch.Events, infos)
+		err := portal.appendBatchEvents(converted, info, webMsg.GetEphemeralStartTimestamp(), &req.Events, &infos)
 		if err != nil {
 			portal.log.Errorfln("Error handling message %s during backfill: %v", info.ID, err)
 		}
 	}
+	portal.log.Infofln("Made %d Matrix events from messages in batch", len(req.Events))
+
+	if len(req.Events) == 0 {
+		return nil
+	}
 
-	if (len(historyBatch.Events) > 0 && len(historyBatch.BatchID) == 0) || len(newBatch.Events) > 0 {
+	if len(req.BatchID) == 0 || isForward {
 		portal.log.Debugln("Sending a dummy event to avoid forward extremity errors with backfill")
 		_, err := portal.MainIntent().SendMessageEvent(portal.MXID, PreBackfillDummyEvent, struct{}{})
 		if err != nil {
@@ -498,33 +518,16 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo)
 		}
 	}
 
-	var insertionEventIds []id.EventID
-
-	if len(historyBatch.Events) > 0 && len(historyBatch.PrevEventID) > 0 {
-		portal.log.Infofln("Sending %d historical messages...", len(historyBatch.Events))
-		historyResp, err := portal.MainIntent().BatchSend(portal.MXID, &historyBatch)
-		if err != nil {
-			portal.log.Errorln("Error sending batch of historical messages:", err)
-		} else {
-			insertionEventIds = append(insertionEventIds, historyResp.BaseInsertionEventID)
-			portal.finishBatch(historyResp.EventIDs, historyBatchInfos)
-			portal.NextBatchID = historyResp.NextBatchID
-			portal.Update()
-		}
-	}
-
-	if len(newBatch.Events) > 0 && len(newBatch.PrevEventID) > 0 {
-		portal.log.Infofln("Sending %d new messages...", len(newBatch.Events))
-		newResp, err := portal.MainIntent().BatchSend(portal.MXID, &newBatch)
-		if err != nil {
-			portal.log.Errorln("Error sending batch of new messages:", err)
-		} else {
-			insertionEventIds = append(insertionEventIds, newResp.BaseInsertionEventID)
-			portal.finishBatch(newResp.EventIDs, newBatchInfos)
-		}
+	resp, err := portal.MainIntent().BatchSend(portal.MXID, &req)
+	if err != nil {
+		portal.log.Errorln("Error batch sending messages:", err)
+		return nil
+	} else {
+		portal.finishBatch(resp.EventIDs, infos)
+		portal.NextBatchID = resp.NextBatchID
+		portal.Update()
+		return resp
 	}
-
-	return insertionEventIds
 }
 
 func (portal *Portal) parseWebMessageInfo(source *User, webMsg *waProto.WebMessageInfo) *types.MessageInfo {