浏览代码

historysync: refactor to utilize backfill queue

Also sends the `m.room.marker` event when a backfill stage is complete.
Sumner Evans 3 年之前
父节点
当前提交
12a23e2ca5
共有 5 个文件被更改,包括 308 次插入179 次删除
  1. 69 0
      backfillqueue.go
  2. 4 1
      database/historysync.go
  3. 230 171
      historysync.go
  4. 3 0
      provisioning.go
  5. 2 7
      user.go

+ 69 - 0
backfillqueue.go

@@ -0,0 +1,69 @@
+// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
+// Copyright (C) 2021 Tulir Asokan, Sumner Evans
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+package main
+
+import (
+	"time"
+
+	log "maunium.net/go/maulogger/v2"
+	"maunium.net/go/mautrix-whatsapp/database"
+)
+
+type BackfillQueue struct {
+	BackfillQuery             *database.BackfillQuery
+	ImmediateBackfillRequests chan *database.Backfill
+	DeferredBackfillRequests  chan *database.Backfill
+	ReCheckQueue              chan bool
+
+	log log.Logger
+}
+
+func (bq *BackfillQueue) RunLoops(user *User) {
+	go bq.immediateBackfillLoop(user)
+	bq.deferredBackfillLoop(user)
+}
+
+func (bq *BackfillQueue) immediateBackfillLoop(user *User) {
+	for {
+		if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); backfill != nil {
+			bq.ImmediateBackfillRequests <- backfill
+			backfill.Delete()
+		} else {
+			select {
+			case <-bq.ReCheckQueue:
+			case <-time.After(10 * time.Second):
+			}
+		}
+	}
+}
+
+func (bq *BackfillQueue) deferredBackfillLoop(user *User) {
+	for {
+		// Finish all immediate backfills before doing the deferred ones.
+		if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil {
+			time.Sleep(10 * time.Second)
+			continue
+		}
+
+		if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil {
+			bq.DeferredBackfillRequests <- backfill
+			backfill.Delete()
+		} else {
+			time.Sleep(10 * time.Second)
+		}
+	}
+}

+ 4 - 1
database/historysync.go

@@ -117,7 +117,10 @@ func (hsc *HistorySyncConversation) Upsert() {
 		DO UPDATE SET
 		DO UPDATE SET
 			portal_jid=EXCLUDED.portal_jid,
 			portal_jid=EXCLUDED.portal_jid,
 			portal_receiver=EXCLUDED.portal_receiver,
 			portal_receiver=EXCLUDED.portal_receiver,
-			last_message_timestamp=EXCLUDED.last_message_timestamp,
+			last_message_timestamp=CASE
+				WHEN EXCLUDED.last_message_timestamp > history_sync_conversation.last_message_timestamp THEN EXCLUDED.last_message_timestamp
+				ELSE history_sync_conversation.last_message_timestamp
+			END,
 			archived=EXCLUDED.archived,
 			archived=EXCLUDED.archived,
 			pinned=EXCLUDED.pinned,
 			pinned=EXCLUDED.pinned,
 			mute_end_time=EXCLUDED.mute_end_time,
 			mute_end_time=EXCLUDED.mute_end_time,

+ 230 - 171
historysync.go

@@ -18,8 +18,6 @@ package main
 
 
 import (
 import (
 	"fmt"
 	"fmt"
-	"sort"
-	"sync"
 	"time"
 	"time"
 
 
 	waProto "go.mau.fi/whatsmeow/binary/proto"
 	waProto "go.mau.fi/whatsmeow/binary/proto"
@@ -35,12 +33,6 @@ import (
 
 
 // region User history sync handling
 // region User history sync handling
 
 
-type portalToBackfill struct {
-	portal *Portal
-	conv   *waProto.Conversation
-	msgs   []*waProto.WebMessageInfo
-}
-
 type wrappedInfo struct {
 type wrappedInfo struct {
 	*types.MessageInfo
 	*types.MessageInfo
 	Type  database.MessageType
 	Type  database.MessageType
@@ -50,134 +42,118 @@ type wrappedInfo struct {
 	ExpiresIn       uint32
 	ExpiresIn       uint32
 }
 }
 
 
-type conversationList []*waProto.Conversation
+func (user *User) handleHistorySyncsLoop() {
+	reCheckQueue := make(chan bool, 1)
+	if user.bridge.Config.Bridge.HistorySync.Backfill {
+		// Start the backfill queue.
+		user.BackfillQueue = &BackfillQueue{
+			BackfillQuery:             user.bridge.DB.BackfillQuery,
+			ImmediateBackfillRequests: make(chan *database.Backfill, 1),
+			DeferredBackfillRequests:  make(chan *database.Backfill, 1),
+			ReCheckQueue:              make(chan bool, 1),
+			log:                       user.log.Sub("BackfillQueue"),
+		}
+		reCheckQueue = user.BackfillQueue.ReCheckQueue
 
 
-var _ sort.Interface = (conversationList)(nil)
+		// Immediate backfills can be done in parallel
+		for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ {
+			go user.handleBackfillRequestsLoop(user.BackfillQueue.ImmediateBackfillRequests)
+		}
 
 
-func (c conversationList) Len() int {
-	return len(c)
-}
+		// Deferred backfills should be handled synchronously so as not to
+		// overload the homeserver. Users can configure their backfill stages
+		// to be more or less aggressive with backfilling at this stage.
+		go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests)
+		go user.BackfillQueue.RunLoops(user)
+	}
 
 
-func (c conversationList) Less(i, j int) bool {
-	return getConversationTimestamp(c[i]) < getConversationTimestamp(c[j])
+	// Always save the history syncs for the user. If they want to enable
+	// backfilling in the future, we will have it in the database.
+	for evt := range user.historySyncs {
+		user.handleHistorySync(reCheckQueue, evt.Data)
+	}
 }
 }
 
 
-func (c conversationList) Swap(i, j int) {
-	c[i], c[j] = c[j], c[i]
-}
+func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) {
+	for req := range backfillRequests {
+		user.log.Infof("Backfill request: %v", req)
+		conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal)
 
 
-func (user *User) handleHistorySyncsLoop() {
-	for evt := range user.historySyncs {
-		go user.sendBridgeState(BridgeState{StateEvent: StateBackfilling})
-		user.handleHistorySync(evt.Data)
-		if len(user.historySyncs) == 0 && user.IsConnected() {
-			go user.sendBridgeState(BridgeState{StateEvent: StateConnected})
+		// Update the client store with basic chat settings.
+		if conv.MuteEndTime.After(time.Now()) {
+			user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime)
+		}
+		if conv.Archived {
+			user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true)
+		}
+		if conv.Pinned > 0 {
+			user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true)
 		}
 		}
-	}
-}
 
 
-func (user *User) handleHistorySync(evt *waProto.HistorySync) {
-	if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME {
-		return
-	}
-	description := fmt.Sprintf("type %s, %d conversations, chunk order %d, progress %d%%", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder(), evt.GetProgress())
-	user.log.Infoln("Handling history sync with", description)
-
-	conversations := conversationList(evt.GetConversations())
-	// We want to handle recent conversations first
-	sort.Sort(sort.Reverse(conversations))
-	portalsToBackfill := make(chan portalToBackfill, len(conversations))
-
-	var backfillWait sync.WaitGroup
-	backfillWait.Add(1)
-	go user.backfillLoop(portalsToBackfill, backfillWait.Done)
-	for _, conv := range conversations {
-		user.handleHistorySyncConversation(conv, portalsToBackfill)
-	}
-	close(portalsToBackfill)
-	backfillWait.Wait()
-	user.log.Infoln("Finished handling history sync with", description)
-}
+		portal := user.GetPortalByJID(conv.PortalKey.JID)
+		if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration {
+			portal.ExpirationTime = *conv.EphemeralExpiration
+			portal.Update()
+		}
 
 
-func (user *User) backfillLoop(ch chan portalToBackfill, done func()) {
-	defer done()
-	for ptb := range ch {
-		if len(ptb.msgs) > 0 {
-			user.log.Debugln("Bridging history sync payload for", ptb.portal.Key.JID)
-			ptb.portal.backfill(user, ptb.msgs)
-		} else {
-			user.log.Debugfln("Not backfilling %s: no bridgeable messages found", ptb.portal.Key.JID)
+		if !user.shouldCreatePortalForHistorySync(conv, portal) {
+			continue
 		}
 		}
-		if !ptb.conv.GetMarkedAsUnread() && ptb.conv.GetUnreadCount() == 0 {
-			user.markSelfReadFull(ptb.portal)
+
+		if len(portal.MXID) == 0 {
+			user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
+			err := portal.CreateMatrixRoom(user, nil, true)
+			if err != nil {
+				user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err)
+				continue
+			}
+		} else {
+			portal.UpdateMatrixRoom(user, nil)
 		}
 		}
-	}
-}
 
 
-func (user *User) handleHistorySyncConversation(conv *waProto.Conversation, portalsToBackfill chan portalToBackfill) {
-	jid, err := types.ParseJID(conv.GetId())
-	if err != nil {
-		user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
-		return
-	}
+		allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents)
 
 
-	// Update the client store with basic chat settings.
-	muteEnd := time.Unix(int64(conv.GetMuteEndTime()), 0)
-	if muteEnd.After(time.Now()) {
-		_ = user.Client.Store.ChatSettings.PutMutedUntil(jid, muteEnd)
-	}
-	if conv.GetArchived() {
-		_ = user.Client.Store.ChatSettings.PutArchived(jid, true)
-	}
-	if conv.GetPinned() > 0 {
-		_ = user.Client.Store.ChatSettings.PutPinned(jid, true)
-	}
+		if len(allMsgs) > 0 {
+			user.log.Debugf("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents)
+			toBackfill := allMsgs[0:]
+			insertionEventIds := []id.EventID{}
+			for {
+				if len(toBackfill) == 0 {
+					break
+				}
 
 
-	portal := user.GetPortalByJID(jid)
-	if conv.EphemeralExpiration != nil && portal.ExpirationTime != conv.GetEphemeralExpiration() {
-		portal.ExpirationTime = conv.GetEphemeralExpiration()
-		portal.Update()
-	}
-	// Check if portal is too old or doesn't contain anything we can bridge.
-	if !user.shouldCreatePortalForHistorySync(conv, portal) {
-		return
-	}
+				time.Sleep(time.Duration(req.BatchDelay) * time.Second)
 
 
-	var msgs []*waProto.WebMessageInfo
-	if user.bridge.Config.Bridge.HistorySync.Backfill {
-		msgs = filterMessagesToBackfill(conv.GetMessages())
-	}
-	ptb := portalToBackfill{portal: portal, conv: conv, msgs: msgs}
-	if len(portal.MXID) == 0 {
-		user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
-		err = portal.CreateMatrixRoom(user, getPartialInfoFromConversation(jid, conv), false)
-		if err != nil {
-			user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err)
-			return
-		}
-	} else {
-		portal.UpdateMatrixRoom(user, nil)
-	}
-	if !user.bridge.Config.Bridge.HistorySync.Backfill {
-		user.log.Debugln("Backfill is disabled, not bridging history sync payload for", portal.Key.JID)
-	} else {
-		portalsToBackfill <- ptb
-	}
-}
+				var msgs []*waProto.WebMessageInfo
+				if len(toBackfill) <= req.MaxBatchEvents {
+					msgs = toBackfill
+					toBackfill = toBackfill[0:0]
+				} else {
+					msgs = toBackfill[len(toBackfill)-req.MaxBatchEvents:]
+					toBackfill = toBackfill[:len(toBackfill)-req.MaxBatchEvents]
+				}
 
 
-func getConversationTimestamp(conv *waProto.Conversation) uint64 {
-	convTs := conv.GetConversationTimestamp()
-	if convTs == 0 && len(conv.GetMessages()) > 0 {
-		convTs = conv.Messages[0].GetMessage().GetMessageTimestamp()
+				if len(msgs) > 0 {
+					user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID)
+					insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...)
+				}
+			}
+			user.log.Debugf("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID)
+			if len(insertionEventIds) > 0 {
+				portal.sendPostBackfillDummy(
+					time.Unix(int64(allMsgs[len(allMsgs)-1].GetMessageTimestamp()), 0),
+					insertionEventIds[0])
+			}
+		} else {
+			user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID)
+		}
+		if !conv.MarkedAsUnread && conv.UnreadCount == 0 {
+			user.markSelfReadFull(portal)
+		}
 	}
 	}
-	return convTs
 }
 }
 
 
-func (user *User) shouldCreatePortalForHistorySync(conv *waProto.Conversation, portal *Portal) bool {
-	maxAge := user.bridge.Config.Bridge.HistorySync.MaxAge
-	minLastMsgToCreate := time.Now().Add(-time.Duration(maxAge) * time.Second)
-	lastMsg := time.Unix(int64(getConversationTimestamp(conv)), 0)
-
+func (user *User) shouldCreatePortalForHistorySync(conv *database.HistorySyncConversation, portal *Portal) bool {
 	if len(portal.MXID) > 0 {
 	if len(portal.MXID) > 0 {
 		user.log.Debugfln("Portal for %s already exists, ensuring user is invited", portal.Key.JID)
 		user.log.Debugfln("Portal for %s already exists, ensuring user is invited", portal.Key.JID)
 		portal.ensureUserInvited(user)
 		portal.ensureUserInvited(user)
@@ -185,10 +161,6 @@ func (user *User) shouldCreatePortalForHistorySync(conv *waProto.Conversation, p
 		return true
 		return true
 	} else if !user.bridge.Config.Bridge.HistorySync.CreatePortals {
 	} else if !user.bridge.Config.Bridge.HistorySync.CreatePortals {
 		user.log.Debugfln("Not creating portal for %s: creating rooms from history sync is disabled", portal.Key.JID)
 		user.log.Debugfln("Not creating portal for %s: creating rooms from history sync is disabled", portal.Key.JID)
-	} else if !containsSupportedMessages(conv) {
-		user.log.Debugfln("Not creating portal for %s: no interesting messages found", portal.Key.JID)
-	} else if maxAge > 0 && !lastMsg.After(minLastMsgToCreate) {
-		user.log.Debugfln("Not creating portal for %s: last message older than limit (%s)", portal.Key.JID, lastMsg)
 	} else {
 	} else {
 		// Portal doesn't exist, but should be created
 		// Portal doesn't exist, but should be created
 		return true
 		return true
@@ -197,47 +169,123 @@ func (user *User) shouldCreatePortalForHistorySync(conv *waProto.Conversation, p
 	return false
 	return false
 }
 }
 
 
-func filterMessagesToBackfill(messages []*waProto.HistorySyncMsg) []*waProto.WebMessageInfo {
-	filtered := make([]*waProto.WebMessageInfo, 0, len(messages))
-	for _, msg := range messages {
-		wmi := msg.GetMessage()
-		msgType := getMessageType(wmi.GetMessage())
-		if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" {
+func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.HistorySync) {
+	if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME {
+		return
+	}
+	description := fmt.Sprintf("type %s, %d conversations, chunk order %d", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder())
+	user.log.Infoln("Storing history sync with", description)
+
+	for _, conv := range evt.GetConversations() {
+		jid, err := types.ParseJID(conv.GetId())
+		if err != nil {
+			user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
 			continue
 			continue
-		} else {
-			filtered = append(filtered, wmi)
+		}
+		portal := user.GetPortalByJID(jid)
+
+		historySyncConversation := user.bridge.DB.HistorySyncQuery.NewConversationWithValues(
+			user.MXID,
+			conv.GetId(),
+			&portal.Key,
+			getConversationTimestamp(conv),
+			conv.GetMuteEndTime(),
+			conv.GetArchived(),
+			conv.GetPinned(),
+			conv.GetDisappearingMode().GetInitiator(),
+			conv.GetEndOfHistoryTransferType(),
+			conv.EphemeralExpiration,
+			conv.GetMarkedAsUnread(),
+			conv.GetUnreadCount())
+		historySyncConversation.Upsert()
+
+		for _, msg := range conv.GetMessages() {
+			// Don't store messages that will just be skipped.
+			wmi := msg.GetMessage()
+			msgType := getMessageType(wmi.GetMessage())
+			if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" {
+				continue
+			}
+
+			// Don't store unsupported messages.
+			if !containsSupportedMessage(msg.GetMessage().GetMessage()) {
+				continue
+			}
+
+			message, err := user.bridge.DB.HistorySyncQuery.NewMessageWithValues(user.MXID, conv.GetId(), msg)
+			if err != nil {
+				user.log.Warnf("Failed to save message %s in %s. Error: %+v", msg.Message.Key.Id, conv.GetId(), err)
+				continue
+			}
+			message.Insert()
 		}
 		}
 	}
 	}
-	return filtered
-}
 
 
-func containsSupportedMessages(conv *waProto.Conversation) bool {
-	for _, msg := range conv.GetMessages() {
-		if containsSupportedMessage(msg.GetMessage().GetMessage()) {
-			return true
+	// If this was the initial bootstrap, enqueue immediate backfills for the
+	// most recent portals. If it's the last history sync event, start
+	// backfilling the rest of the history of the portals.
+	historySyncConfig := user.bridge.Config.Bridge.HistorySync
+	if historySyncConfig.Backfill && (evt.GetSyncType() == waProto.HistorySync_FULL || evt.GetSyncType() == waProto.HistorySync_INITIAL_BOOTSTRAP) {
+		nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations)
+		for i, conv := range nMostRecent {
+			jid, err := types.ParseJID(conv.ConversationID)
+			if err != nil {
+				user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err)
+				continue
+			}
+			portal := user.GetPortalByJID(jid)
+
+			switch evt.GetSyncType() {
+			case waProto.HistorySync_INITIAL_BOOTSTRAP:
+				// Enqueue immediate backfills for the most recent messages first.
+				maxMessages := historySyncConfig.Immediate.MaxEvents
+				initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, i, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
+				initialBackfill.Insert()
+
+			case waProto.HistorySync_FULL:
+				// Enqueue deferred backfills as configured.
+				for j, backfillStage := range historySyncConfig.Deferred {
+					var startDate *time.Time = nil
+					if backfillStage.StartDaysAgo > 0 {
+						startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
+						startDate = &startDaysAgo
+					}
+					backfill := user.bridge.DB.BackfillQuery.NewWithValues(
+						user.MXID, database.BackfillDeferred, j*len(nMostRecent)+i, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
+					backfill.Insert()
+				}
+			}
 		}
 		}
+
+		// Tell the queue to check for new backfill requests.
+		reCheckQueue <- true
 	}
 	}
-	return false
 }
 }
 
 
-func getPartialInfoFromConversation(jid types.JID, conv *waProto.Conversation) *types.GroupInfo {
-	// TODO broadcast list info?
-	if jid.Server != types.GroupServer {
-		return nil
+func getConversationTimestamp(conv *waProto.Conversation) uint64 {
+	convTs := conv.GetConversationTimestamp()
+	if convTs == 0 && len(conv.GetMessages()) > 0 {
+		convTs = conv.Messages[0].GetMessage().GetMessageTimestamp()
 	}
 	}
-	participants := make([]types.GroupParticipant, len(conv.GetParticipant()))
-	for i, pcp := range conv.GetParticipant() {
-		participantJID, _ := types.ParseJID(pcp.GetUserJid())
-		participants[i] = types.GroupParticipant{
-			JID:          participantJID,
-			IsAdmin:      pcp.GetRank() == waProto.GroupParticipant_ADMIN,
-			IsSuperAdmin: pcp.GetRank() == waProto.GroupParticipant_SUPERADMIN,
+	return convTs
+}
+
+func (user *User) EnqueueImmedateBackfill(portal *Portal, priority int) {
+	maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents
+	initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
+	initialBackfill.Insert()
+}
+
+func (user *User) EnqueueDeferredBackfills(portal *Portal, numConversations, priority int) {
+	for j, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred {
+		var startDate *time.Time = nil
+		if backfillStage.StartDaysAgo > 0 {
+			startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
+			startDate = &startDaysAgo
 		}
 		}
-	}
-	return &types.GroupInfo{
-		JID:          jid,
-		GroupName:    types.GroupName{Name: conv.GetName()},
-		Participants: participants,
+		backfill := user.bridge.DB.BackfillQuery.NewWithValues(
+			user.MXID, database.BackfillDeferred, j*numConversations+priority, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
+		backfill.Insert()
 	}
 	}
 }
 }
 
 
@@ -246,11 +294,15 @@ func getPartialInfoFromConversation(jid types.JID, conv *waProto.Conversation) *
 
 
 var (
 var (
 	PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType}
 	PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType}
-	BackfillEndDummyEvent    = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType}
 	PreBackfillDummyEvent    = event.Type{Type: "fi.mau.dummy.pre_backfill", Class: event.MessageEventType}
 	PreBackfillDummyEvent    = event.Type{Type: "fi.mau.dummy.pre_backfill", Class: event.MessageEventType}
+
+	// Marker events for when a backfill finishes
+	BackfillEndDummyEvent = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType}
+	RoomMarker            = event.Type{Type: "m.room.marker", Class: event.MessageEventType}
+	MSC2716Marker         = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType}
 )
 )
 
 
-func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) {
+func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) []id.EventID {
 	portal.backfillLock.Lock()
 	portal.backfillLock.Lock()
 	defer portal.backfillLock.Unlock()
 	defer portal.backfillLock.Unlock()
 
 
@@ -375,32 +427,33 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo)
 		}
 		}
 	}
 	}
 
 
+	insertionEventIds := []id.EventID{}
+
 	if len(historyBatch.Events) > 0 && len(historyBatch.PrevEventID) > 0 {
 	if len(historyBatch.Events) > 0 && len(historyBatch.PrevEventID) > 0 {
 		portal.log.Infofln("Sending %d historical messages...", len(historyBatch.Events))
 		portal.log.Infofln("Sending %d historical messages...", len(historyBatch.Events))
 		historyResp, err := portal.MainIntent().BatchSend(portal.MXID, &historyBatch)
 		historyResp, err := portal.MainIntent().BatchSend(portal.MXID, &historyBatch)
+		insertionEventIds = append(insertionEventIds, historyResp.BaseInsertionEventID)
 		if err != nil {
 		if err != nil {
 			portal.log.Errorln("Error sending batch of historical messages:", err)
 			portal.log.Errorln("Error sending batch of historical messages:", err)
 		} else {
 		} else {
 			portal.finishBatch(historyResp.EventIDs, historyBatchInfos)
 			portal.finishBatch(historyResp.EventIDs, historyBatchInfos)
 			portal.NextBatchID = historyResp.NextBatchID
 			portal.NextBatchID = historyResp.NextBatchID
 			portal.Update()
 			portal.Update()
-			// If batchID is non-empty, it means this is backfilling very old messages, and we don't need a post-backfill dummy.
-			if historyBatch.BatchID == "" {
-				portal.sendPostBackfillDummy(time.UnixMilli(historyBatch.Events[len(historyBatch.Events)-1].Timestamp))
-			}
 		}
 		}
 	}
 	}
 
 
 	if len(newBatch.Events) > 0 && len(newBatch.PrevEventID) > 0 {
 	if len(newBatch.Events) > 0 && len(newBatch.PrevEventID) > 0 {
 		portal.log.Infofln("Sending %d new messages...", len(newBatch.Events))
 		portal.log.Infofln("Sending %d new messages...", len(newBatch.Events))
 		newResp, err := portal.MainIntent().BatchSend(portal.MXID, &newBatch)
 		newResp, err := portal.MainIntent().BatchSend(portal.MXID, &newBatch)
+		insertionEventIds = append(insertionEventIds, newResp.BaseInsertionEventID)
 		if err != nil {
 		if err != nil {
 			portal.log.Errorln("Error sending batch of new messages:", err)
 			portal.log.Errorln("Error sending batch of new messages:", err)
 		} else {
 		} else {
 			portal.finishBatch(newResp.EventIDs, newBatchInfos)
 			portal.finishBatch(newResp.EventIDs, newBatchInfos)
-			portal.sendPostBackfillDummy(time.UnixMilli(newBatch.Events[len(newBatch.Events)-1].Timestamp))
 		}
 		}
 	}
 	}
+
+	return insertionEventIds
 }
 }
 
 
 func (portal *Portal) parseWebMessageInfo(source *User, webMsg *waProto.WebMessageInfo) *types.MessageInfo {
 func (portal *Portal) parseWebMessageInfo(source *User, webMsg *waProto.WebMessageInfo) *types.MessageInfo {
@@ -530,19 +583,25 @@ func (portal *Portal) finishBatchEvt(info *wrappedInfo, eventID id.EventID) {
 	}
 	}
 }
 }
 
 
-func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time) {
-	resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, BackfillEndDummyEvent, struct{}{})
-	if err != nil {
-		portal.log.Errorln("Error sending post-backfill dummy event:", err)
-		return
+func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time, insertionEventId id.EventID) {
+	for _, evtType := range []event.Type{BackfillEndDummyEvent, RoomMarker, MSC2716Marker} {
+		resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, evtType, map[string]interface{}{
+			"org.matrix.msc2716.marker.insertion": insertionEventId,
+			"m.marker.insertion":                  insertionEventId,
+		})
+		if err != nil {
+			portal.log.Errorln("Error sending post-backfill dummy event:", err)
+			return
+		}
+		msg := portal.bridge.DB.Message.New()
+		msg.Chat = portal.Key
+		msg.MXID = resp.EventID
+		msg.JID = types.MessageID(resp.EventID)
+		msg.Timestamp = lastTimestamp.Add(1 * time.Second)
+		msg.Sent = true
+		msg.Insert()
+
 	}
 	}
-	msg := portal.bridge.DB.Message.New()
-	msg.Chat = portal.Key
-	msg.MXID = resp.EventID
-	msg.JID = types.MessageID(resp.EventID)
-	msg.Timestamp = lastTimestamp.Add(1 * time.Second)
-	msg.Sent = true
-	msg.Insert()
 }
 }
 
 
 // endregion
 // endregion

+ 3 - 0
provisioning.go

@@ -436,6 +436,9 @@ func (prov *ProvisioningAPI) Logout(w http.ResponseWriter, r *http.Request) {
 	user.bridge.Metrics.TrackConnectionState(user.JID, false)
 	user.bridge.Metrics.TrackConnectionState(user.JID, false)
 	user.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut})
 	user.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut})
 	user.DeleteSession()
 	user.DeleteSession()
+	prov.bridge.DB.BackfillQuery.DeleteAll(user.MXID)
+	prov.bridge.DB.HistorySyncQuery.DeleteAllConversations(user.MXID)
+	prov.bridge.DB.HistorySyncQuery.DeleteAllMessages(user.MXID)
 	jsonResponse(w, http.StatusOK, Response{true, "Logged out successfully."})
 	jsonResponse(w, http.StatusOK, Response{true, "Logged out successfully."})
 }
 }
 
 

+ 2 - 7
user.go

@@ -74,6 +74,8 @@ type User struct {
 	groupListCache     []*types.GroupInfo
 	groupListCache     []*types.GroupInfo
 	groupListCacheLock sync.Mutex
 	groupListCacheLock sync.Mutex
 	groupListCacheTime time.Time
 	groupListCacheTime time.Time
+
+	BackfillQueue *BackfillQueue
 }
 }
 
 
 func (bridge *Bridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User {
 func (bridge *Bridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User {
@@ -561,18 +563,11 @@ func (user *User) HandleEvent(event interface{}) {
 		go user.tryAutomaticDoublePuppeting()
 		go user.tryAutomaticDoublePuppeting()
 	case *events.OfflineSyncPreview:
 	case *events.OfflineSyncPreview:
 		user.log.Infofln("Server says it's going to send %d messages and %d receipts that were missed during downtime", v.Messages, v.Receipts)
 		user.log.Infofln("Server says it's going to send %d messages and %d receipts that were missed during downtime", v.Messages, v.Receipts)
-		go user.sendBridgeState(BridgeState{
-			StateEvent: StateBackfilling,
-			Message:    fmt.Sprintf("backfilling %d messages and %d receipts", v.Messages, v.Receipts),
-		})
 	case *events.OfflineSyncCompleted:
 	case *events.OfflineSyncCompleted:
 		if !user.PhoneRecentlySeen(true) {
 		if !user.PhoneRecentlySeen(true) {
 			user.log.Infofln("Offline sync completed, but phone last seen date is still %s - sending phone offline bridge status", user.PhoneLastSeen)
 			user.log.Infofln("Offline sync completed, but phone last seen date is still %s - sending phone offline bridge status", user.PhoneLastSeen)
 			go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAPhoneOffline})
 			go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAPhoneOffline})
 		} else {
 		} else {
-			if user.GetPrevBridgeState().StateEvent == StateBackfilling {
-				user.log.Infoln("Offline sync completed")
-			}
 			go user.sendBridgeState(BridgeState{StateEvent: StateConnected})
 			go user.sendBridgeState(BridgeState{StateEvent: StateConnected})
 		}
 		}
 	case *events.AppStateSyncComplete:
 	case *events.AppStateSyncComplete: