Forráskód Böngészése

Prioritize backfilling recent messages and fix some bugs

Tulir Asokan 3 éve
szülő
commit
752aff48f1
8 módosított fájl, 573 hozzáadás és 419 törlés
  1. 6 0
      crypto.go
  2. 2 1
      database/message.go
  3. 0 4
      database/portal.go
  4. 2 2
      go.mod
  5. 4 4
      go.sum
  6. 555 0
      historysync.go
  7. 3 280
      portal.go
  8. 1 128
      user.go

+ 6 - 0
crypto.go

@@ -23,6 +23,8 @@ import (
 	"runtime/debug"
 	"time"
 
+	"github.com/lib/pq"
+
 	"maunium.net/go/maulogger/v2"
 
 	"maunium.net/go/mautrix"
@@ -50,6 +52,10 @@ type CryptoHelper struct {
 	baseLog maulogger.Logger
 }
 
+func init() {
+	crypto.PostgresArrayWrapper = pq.Array
+}
+
 func NewCryptoHelper(bridge *Bridge) Crypto {
 	if !bridge.Config.Bridge.Encryption.Allow {
 		bridge.Log.Debugln("Bridge built with end-to-bridge encryption, but disabled in config")

+ 2 - 1
database/message.go

@@ -18,6 +18,7 @@ package database
 
 import (
 	"database/sql"
+	"errors"
 	"strings"
 	"time"
 
@@ -132,7 +133,7 @@ func (msg *Message) Scan(row Scannable) *Message {
 	var ts int64
 	err := row.Scan(&msg.Chat.JID, &msg.Chat.Receiver, &msg.JID, &msg.MXID, &msg.Sender, &ts, &msg.Sent, &msg.DecryptionError)
 	if err != nil {
-		if err != sql.ErrNoRows {
+		if !errors.Is(err, sql.ErrNoRows) {
 			msg.log.Errorln("Database scan failed:", err)
 		}
 		return nil

+ 0 - 4
database/portal.go

@@ -31,10 +31,6 @@ type PortalKey struct {
 	Receiver types.JID
 }
 
-func GroupPortalKey(jid types.JID) PortalKey {
-	return NewPortalKey(jid, jid)
-}
-
 func NewPortalKey(jid, receiver types.JID) PortalKey {
 	if jid.Server == types.GroupServer {
 		receiver = jid

+ 2 - 2
go.mod

@@ -8,13 +8,13 @@ require (
 	github.com/mattn/go-sqlite3 v1.14.9
 	github.com/prometheus/client_golang v1.11.0
 	github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
-	go.mau.fi/whatsmeow v0.0.0-20211103085107-c2cda88e7160
+	go.mau.fi/whatsmeow v0.0.0-20211103125516-00d0df2dd132
 	golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d
 	google.golang.org/protobuf v1.27.1
 	gopkg.in/yaml.v2 v2.4.0
 	maunium.net/go/mauflag v1.0.0
 	maunium.net/go/maulogger/v2 v2.3.1
-	maunium.net/go/mautrix v0.10.0
+	maunium.net/go/mautrix v0.10.1-0.20211103193019-010782c6021e
 )
 
 require (

+ 4 - 4
go.sum

@@ -139,8 +139,8 @@ github.com/tidwall/sjson v1.2.3 h1:5+deguEhHSEjmuICXZ21uSSsXotWMA0orU783+Z7Cp8=
 github.com/tidwall/sjson v1.2.3/go.mod h1:5WdjKx3AQMvCJ4RG6/2UYT7dLrGvJUV1x4jdTAyGvZs=
 go.mau.fi/libsignal v0.0.0-20211024113310-f9fc6a1855f2 h1:xpQTMgJGGaF+c8jV/LA/FVXAPJxZbSAGeflOc+Ly6uQ=
 go.mau.fi/libsignal v0.0.0-20211024113310-f9fc6a1855f2/go.mod h1:3XlVlwOfp8f9Wri+C1D4ORqgUsN4ZvunJOoPjQMBhos=
-go.mau.fi/whatsmeow v0.0.0-20211103085107-c2cda88e7160 h1:fts4reZEaE04eqhpKqcWgqnVPcHLlqewy7dcvPBtEtQ=
-go.mau.fi/whatsmeow v0.0.0-20211103085107-c2cda88e7160/go.mod h1:ODEmmqeUn9eBDQHFc1S902YA3YFLtmaBujYRRFl53jI=
+go.mau.fi/whatsmeow v0.0.0-20211103125516-00d0df2dd132 h1:wgpiQPdoKCTyhK/GfNfeNhbXjG5A3Fjkbfsmww6ojcY=
+go.mau.fi/whatsmeow v0.0.0-20211103125516-00d0df2dd132/go.mod h1:ODEmmqeUn9eBDQHFc1S902YA3YFLtmaBujYRRFl53jI=
 golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -219,5 +219,5 @@ maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
 maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
 maunium.net/go/maulogger/v2 v2.3.1 h1:fwBYJne0pHvJrrIPHK+TAPfyxxbBEz46oVGez2x0ODE=
 maunium.net/go/maulogger/v2 v2.3.1/go.mod h1:TYWy7wKwz/tIXTpsx8G3mZseIRiC5DoMxSZazOHy68A=
-maunium.net/go/mautrix v0.10.0 h1:DMymRnen+E2FWyhhRACJ8LCqiwIetVj2C2676eWMSTw=
-maunium.net/go/mautrix v0.10.0/go.mod h1:3U7pOAx4bxdIVJuunLDAToI+M7YwxcGMm74zBmX5aY0=
+maunium.net/go/mautrix v0.10.1-0.20211103193019-010782c6021e h1:yvKCw2P/nHEfjvKAVA8aokbe1UYIgJcswnv50EZaSRU=
+maunium.net/go/mautrix v0.10.1-0.20211103193019-010782c6021e/go.mod h1:k4Ng5oci83MEbqPL6KOjPdbU7f8v01KlMjR/zTQ+7mA=

+ 555 - 0
historysync.go

@@ -0,0 +1,555 @@
+// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
+// Copyright (C) 2021 Tulir Asokan
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+package main
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	waProto "go.mau.fi/whatsmeow/binary/proto"
+	"go.mau.fi/whatsmeow/types"
+	"maunium.net/go/mautrix"
+	"maunium.net/go/mautrix/appservice"
+	"maunium.net/go/mautrix/event"
+	"maunium.net/go/mautrix/id"
+)
+
+// region User history sync handling
+
+const (
+	FastBackfillPortalCount  = 20
+	FastBackfillMessageCount = 20
+	FastBackfillMessageCap   = 30
+)
+
+type portalToBackfill struct {
+	portal *Portal
+	conv   *waProto.Conversation
+	msgs   []*waProto.WebMessageInfo
+}
+
+type conversationList []*waProto.Conversation
+
+var _ sort.Interface = (conversationList)(nil)
+
+func (c conversationList) Len() int {
+	return len(c)
+}
+
+func (c conversationList) Less(i, j int) bool {
+	return c[i].GetConversationTimestamp() < c[j].GetConversationTimestamp()
+}
+
+func (c conversationList) Swap(i, j int) {
+	c[i], c[j] = c[j], c[i]
+}
+
+func (user *User) handleHistorySyncsLoop() {
+	for evt := range user.historySyncs {
+		user.handleHistorySync(evt.Data)
+	}
+}
+
+func (user *User) handleHistorySync(evt *waProto.HistorySync) {
+	if evt.GetSyncType() != waProto.HistorySync_RECENT && evt.GetSyncType() != waProto.HistorySync_FULL {
+		return
+	}
+	user.log.Infofln("Handling history sync with type %s, chunk order %d, progress %d%%", evt.GetSyncType(), evt.GetChunkOrder(), evt.GetProgress())
+
+	conversations := conversationList(evt.GetConversations())
+	// We want to handle recent conversations first
+	sort.Sort(sort.Reverse(conversations))
+	portalsToBackfill := make(chan portalToBackfill, len(conversations))
+
+	var backfillWait, fastBackfillWait sync.WaitGroup
+	var fastBackfillWaitDoneOnce sync.Once
+	// We have to add 1 to the waitgroup beforehand to make sure the wait in the goroutine doesn't finish
+	// before we add the actual numbers.
+	fastBackfillWait.Add(1)
+	backfillWait.Add(1)
+	go func() {
+		// Wait for the fast parallelized backfill to complete, then start the slow backfill loop (see explanation below)
+		fastBackfillWait.Wait()
+		user.slowBackfillLoop(portalsToBackfill, backfillWait.Done)
+	}()
+	for i, conv := range conversations {
+		// This will create portals and start backfilling for them.
+		//
+		// The first 20 (FastBackfillPortalCount) portals will be parallelized, where the portal is
+		// created and recent messages are backfilled in parallel. Other portals will be created
+		// synchronously (and this will only return when they're created).
+		//
+		// For said other portals, and older messages in the parallelized portals, backfilling also
+		// happens synchronously: the portals and messages to backfill are added to the
+		// portalsToBackfill channel, which is consumed one-by-one in the slowBackfillLoop method.
+		// That loop is only started after the fast parallelized backfill is completed.
+		user.handleHistorySyncConversation(i, conv, &fastBackfillWait, portalsToBackfill)
+		if i == FastBackfillPortalCount {
+			// There won't be any more portals going the fast backfill route, so remove the 1 item
+			// that was added to the wait group at the beginning.
+			fastBackfillWaitDoneOnce.Do(fastBackfillWait.Done)
+		}
+	}
+	fastBackfillWaitDoneOnce.Do(fastBackfillWait.Done)
+	// Wait for fast backfill to complete to make sure everything necessary is in the slow backfill queue,
+	// then close the slow backfill queue and wait for the loop to finish handling the queue.
+	fastBackfillWait.Wait()
+	close(portalsToBackfill)
+	backfillWait.Wait()
+	user.log.Infofln("Finished handling history sync with type %s, chunk order %d, progress %d%%", evt.GetSyncType(), evt.GetChunkOrder(), evt.GetProgress())
+}
+
+func (user *User) slowBackfillLoop(ch chan portalToBackfill, done func()) {
+	defer done()
+	for ptb := range ch {
+		user.log.Debugln("Bridging history sync payload for", ptb.portal.Key.JID)
+		ptb.portal.backfill(user, ptb.msgs)
+		if !ptb.conv.GetMarkedAsUnread() && ptb.conv.GetUnreadCount() == 0 {
+			user.markSelfReadFull(ptb.portal)
+		}
+	}
+}
+
+func (user *User) handleHistorySyncConversation(index int, conv *waProto.Conversation, fastBackfillWait *sync.WaitGroup, portalsToBackfill chan portalToBackfill) {
+	jid, err := types.ParseJID(conv.GetId())
+	if err != nil {
+		user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
+		return
+	}
+
+	// Update the client store with basic chat settings.
+	muteEnd := time.Unix(int64(conv.GetMuteEndTime()), 0)
+	if muteEnd.After(time.Now()) {
+		_ = user.Client.Store.ChatSettings.PutMutedUntil(jid, muteEnd)
+	}
+	if conv.GetArchived() {
+		_ = user.Client.Store.ChatSettings.PutArchived(jid, true)
+	}
+	if conv.GetPinned() > 0 {
+		_ = user.Client.Store.ChatSettings.PutPinned(jid, true)
+	}
+
+	portal := user.GetPortalByJID(jid)
+	// Check if portal is too old or doesn't contain anything we can bridge.
+	if !user.shouldCreatePortalForHistorySync(conv, portal) {
+		return
+	}
+
+	var msgs []*waProto.WebMessageInfo
+	if user.bridge.Config.Bridge.HistorySync.Backfill {
+		msgs = filterMessagesToBackfill(conv.GetMessages())
+	}
+	ptb := portalToBackfill{portal: portal, conv: conv, msgs: msgs}
+	if len(portal.MXID) == 0 {
+		// For the first few chats, do the portal creation and some backfilling in parallel to populate the chat list ASAP
+		if index < FastBackfillPortalCount {
+			fastBackfillWait.Add(1)
+			go user.fastBackfillRoutine(ptb, fastBackfillWait.Done, portalsToBackfill)
+			return
+		}
+		user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
+		err = portal.CreateMatrixRoom(user, getPartialInfoFromConversation(jid, conv), false)
+		if err != nil {
+			user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err)
+			return
+		}
+	}
+	if !user.bridge.Config.Bridge.HistorySync.Backfill {
+		user.log.Debugln("Backfill is disabled, not bridging history sync payload for", portal.Key.JID)
+	} else {
+		portalsToBackfill <- ptb
+	}
+}
+
+func (user *User) shouldCreatePortalForHistorySync(conv *waProto.Conversation, portal *Portal) bool {
+	maxAge := user.bridge.Config.Bridge.HistorySync.MaxAge
+	minLastMsgToCreate := time.Now().Add(-time.Duration(maxAge) * time.Second)
+	lastMsg := time.Unix(int64(conv.GetConversationTimestamp()), 0)
+
+	if len(portal.MXID) > 0 {
+		user.log.Debugfln("Portal for %s already exists, ensuring user is invited", portal.Key.JID)
+		portal.ensureUserInvited(user)
+		// Portal exists, let backfill continue
+		return true
+	} else if !user.bridge.Config.Bridge.HistorySync.CreatePortals {
+		user.log.Debugfln("Not creating portal for %s: creating rooms from history sync is disabled", portal.Key.JID)
+	} else if !containsSupportedMessages(conv) {
+		user.log.Debugfln("Not creating portal for %s: no interesting messages found", portal.Key.JID)
+	} else if maxAge > 0 && !lastMsg.After(minLastMsgToCreate) {
+		user.log.Debugfln("Not creating portal for %s: last message older than limit (%s)", portal.Key.JID, lastMsg)
+	} else {
+		// Portal doesn't exist, but should be created
+		return true
+	}
+	// Portal shouldn't be created, reason logged above
+	return false
+}
+
+func (user *User) fastBackfillRoutine(ptb portalToBackfill, done func(), slowBackfillChan chan portalToBackfill) {
+	defer done()
+
+	user.log.Debugln("Asynchronously creating portal for", ptb.portal.Key.JID, "as part of history sync handling")
+	err := ptb.portal.CreateMatrixRoom(user, getPartialInfoFromConversation(ptb.portal.Key.JID, ptb.conv), false)
+	if err != nil {
+		user.log.Warnfln("Failed to create room for %s during backfill: %v", ptb.portal.Key.JID, err)
+	}
+
+	if user.bridge.Config.Bridge.HistorySync.Backfill {
+		if len(ptb.msgs) > FastBackfillMessageCap {
+			user.log.Debugln("Bridging first 20 messages of history sync payload for", ptb.portal.Key.JID, "(async)")
+			ptb.portal.backfill(user, ptb.msgs[:FastBackfillMessageCount])
+			// Send the rest of the messages off to the slow backfill queue
+			ptb.msgs = ptb.msgs[FastBackfillMessageCount:]
+			slowBackfillChan <- ptb
+		} else {
+			user.log.Debugln("Bridging history sync payload for", ptb.portal.Key.JID, "(async)")
+			ptb.portal.backfill(user, ptb.msgs)
+		}
+	} else {
+		user.log.Debugln("Backfill is disabled, not bridging history sync payload for", ptb.portal.Key.JID)
+	}
+}
+
+func filterMessagesToBackfill(messages []*waProto.HistorySyncMsg) []*waProto.WebMessageInfo {
+	filtered := make([]*waProto.WebMessageInfo, 0, len(messages))
+	for _, msg := range messages {
+		wmi := msg.GetMessage()
+		msgType := getMessageType(wmi.GetMessage())
+		if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" {
+			continue
+		} else {
+			filtered = append(filtered, wmi)
+		}
+	}
+	return filtered
+}
+
+func containsSupportedMessages(conv *waProto.Conversation) bool {
+	for _, msg := range conv.GetMessages() {
+		if containsSupportedMessage(msg.GetMessage().GetMessage()) {
+			return true
+		}
+	}
+	return false
+}
+
+func getPartialInfoFromConversation(jid types.JID, conv *waProto.Conversation) *types.GroupInfo {
+	// TODO broadcast list info?
+	if jid.Server != types.GroupServer {
+		return nil
+	}
+	participants := make([]types.GroupParticipant, len(conv.GetParticipant()))
+	for i, pcp := range conv.GetParticipant() {
+		participantJID, _ := types.ParseJID(pcp.GetUserJid())
+		participants[i] = types.GroupParticipant{
+			JID:          participantJID,
+			IsAdmin:      pcp.GetRank() == waProto.GroupParticipant_ADMIN,
+			IsSuperAdmin: pcp.GetRank() == waProto.GroupParticipant_SUPERADMIN,
+		}
+	}
+	return &types.GroupInfo{
+		JID:          jid,
+		GroupName:    types.GroupName{Name: conv.GetName()},
+		Participants: participants,
+	}
+}
+
+// endregion
+// region Portal backfilling
+
+var (
+	PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType}
+	BackfillDummyStateEvent  = event.Type{Type: "fi.mau.dummy.blank_backfill_state", Class: event.StateEventType}
+	BackfillEndDummyEvent    = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType}
+	PreBackfillDummyEvent    = event.Type{Type: "fi.mau.dummy.pre_backfill", Class: event.MessageEventType}
+)
+
+func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) {
+	portal.backfillLock.Lock()
+	defer portal.backfillLock.Unlock()
+
+	var historyBatch, newBatch mautrix.ReqBatchSend
+	var historyBatchInfos, newBatchInfos []*types.MessageInfo
+
+	firstMsgTimestamp := time.Unix(int64(messages[len(messages)-1].GetMessageTimestamp()), 0)
+
+	historyBatch.StateEventsAtStart = make([]*event.Event, 1)
+	newBatch.StateEventsAtStart = make([]*event.Event, 1)
+
+	// TODO remove the dummy state events after https://github.com/matrix-org/synapse/pull/11188
+	emptyStr := ""
+	dummyStateEvent := event.Event{
+		Type:      BackfillDummyStateEvent,
+		Sender:    portal.MainIntent().UserID,
+		StateKey:  &emptyStr,
+		Timestamp: firstMsgTimestamp.UnixMilli(),
+		Content:   event.Content{},
+	}
+	historyBatch.StateEventsAtStart[0] = &dummyStateEvent
+	newBatch.StateEventsAtStart[0] = &dummyStateEvent
+
+	addedMembers := make(map[id.UserID]*event.MemberEventContent)
+	addMember := func(puppet *Puppet) {
+		if _, alreadyAdded := addedMembers[puppet.MXID]; alreadyAdded {
+			return
+		}
+		mxid := puppet.MXID.String()
+		content := event.MemberEventContent{
+			Membership:  event.MembershipJoin,
+			Displayname: puppet.Displayname,
+			AvatarURL:   puppet.AvatarURL.CUString(),
+		}
+		inviteContent := content
+		inviteContent.Membership = event.MembershipInvite
+		historyBatch.StateEventsAtStart = append(historyBatch.StateEventsAtStart, &event.Event{
+			Type:      event.StateMember,
+			Sender:    portal.MainIntent().UserID,
+			StateKey:  &mxid,
+			Timestamp: firstMsgTimestamp.UnixMilli(),
+			Content:   event.Content{Parsed: &inviteContent},
+		}, &event.Event{
+			Type:      event.StateMember,
+			Sender:    puppet.MXID,
+			StateKey:  &mxid,
+			Timestamp: firstMsgTimestamp.UnixMilli(),
+			Content:   event.Content{Parsed: &content},
+		})
+		addedMembers[puppet.MXID] = &content
+	}
+
+	firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key)
+	lastMessage := portal.bridge.DB.Message.GetLastInChat(portal.Key)
+	var historyMaxTs, newMinTs time.Time
+
+	if portal.FirstEventID != "" || portal.NextBatchID != "" {
+		historyBatch.PrevEventID = portal.FirstEventID
+		historyBatch.BatchID = portal.NextBatchID
+		if firstMessage == nil && lastMessage == nil {
+			historyMaxTs = time.Now()
+		} else {
+			historyMaxTs = firstMessage.Timestamp
+		}
+	}
+	if lastMessage != nil {
+		newBatch.PrevEventID = lastMessage.MXID
+		newMinTs = lastMessage.Timestamp
+	}
+
+	portal.log.Infofln("Processing history sync with %d messages", len(messages))
+	// The messages are ordered newest to oldest, so iterate them in reverse order.
+	for i := len(messages) - 1; i >= 0; i-- {
+		webMsg := messages[i]
+		msgType := getMessageType(webMsg.GetMessage())
+		if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" {
+			if msgType != "ignore" {
+				portal.log.Debugfln("Skipping message %s with unknown type in backfill", webMsg.GetKey().GetId())
+			}
+			continue
+		}
+		info := portal.parseWebMessageInfo(webMsg)
+		if info == nil {
+			continue
+		}
+		var batch *mautrix.ReqBatchSend
+		var infos *[]*types.MessageInfo
+		var history bool
+		if !historyMaxTs.IsZero() && info.Timestamp.Before(historyMaxTs) {
+			batch, infos, history = &historyBatch, &historyBatchInfos, true
+		} else if !newMinTs.IsZero() && info.Timestamp.After(newMinTs) {
+			batch, infos = &newBatch, &newBatchInfos
+		} else {
+			continue
+		}
+		puppet := portal.getMessagePuppet(source, info)
+		var intent *appservice.IntentAPI
+		if portal.Key.JID == puppet.JID {
+			intent = puppet.DefaultIntent()
+		} else {
+			intent = puppet.IntentFor(portal)
+			if intent.IsCustomPuppet && !portal.bridge.Config.Bridge.HistorySync.DoublePuppetBackfill {
+				intent = puppet.DefaultIntent()
+				addMember(puppet)
+			}
+		}
+		converted := portal.convertMessage(intent, source, info, webMsg.GetMessage())
+		if converted == nil {
+			portal.log.Debugfln("Skipping unsupported message %s in backfill", info.ID)
+			continue
+		}
+		if history && !portal.IsPrivateChat() && !intent.IsCustomPuppet && !portal.bridge.StateStore.IsInRoom(portal.MXID, puppet.MXID) {
+			addMember(puppet)
+		}
+		err := portal.appendBatchEvents(converted, info, &batch.Events, infos)
+		if err != nil {
+			portal.log.Errorfln("Error handling message %s during backfill: %v", info.ID, err)
+		}
+	}
+
+	if (len(historyBatch.Events) > 0 && len(historyBatch.BatchID) == 0) || len(newBatch.Events) > 0 {
+		portal.log.Debugln("Sending a dummy event to avoid forward extremity errors with backfill")
+		_, err := portal.MainIntent().SendMessageEvent(portal.MXID, PreBackfillDummyEvent, struct{}{})
+		if err != nil {
+			portal.log.Warnln("Error sending pre-backfill dummy event:", err)
+		}
+	}
+
+	if len(historyBatch.Events) > 0 && len(historyBatch.PrevEventID) > 0 {
+		portal.log.Infofln("Sending %d historical messages...", len(historyBatch.Events))
+		historyResp, err := portal.MainIntent().BatchSend(portal.MXID, &historyBatch)
+		if err != nil {
+			portal.log.Errorln("Error sending batch of historical messages:", err)
+		} else {
+			portal.finishBatch(historyResp.EventIDs, historyBatchInfos)
+			portal.NextBatchID = historyResp.NextBatchID
+			portal.Update()
+			// If batchID is non-empty, it means this is backfilling very old messages, and we don't need a post-backfill dummy.
+			if historyBatch.BatchID == "" {
+				portal.sendPostBackfillDummy(time.UnixMilli(historyBatch.Events[len(historyBatch.Events)-1].Timestamp))
+			}
+		}
+	}
+
+	if len(newBatch.Events) > 0 && len(newBatch.PrevEventID) > 0 {
+		portal.log.Infofln("Sending %d new messages...", len(newBatch.Events))
+		newResp, err := portal.MainIntent().BatchSend(portal.MXID, &newBatch)
+		if err != nil {
+			portal.log.Errorln("Error sending batch of new messages:", err)
+		} else {
+			portal.finishBatch(newResp.EventIDs, newBatchInfos)
+			portal.sendPostBackfillDummy(time.UnixMilli(newBatch.Events[len(newBatch.Events)-1].Timestamp))
+		}
+	}
+}
+
+func (portal *Portal) parseWebMessageInfo(webMsg *waProto.WebMessageInfo) *types.MessageInfo {
+	info := types.MessageInfo{
+		MessageSource: types.MessageSource{
+			Chat:     portal.Key.JID,
+			IsFromMe: webMsg.GetKey().GetFromMe(),
+			IsGroup:  false,
+		},
+		ID:        webMsg.GetKey().GetId(),
+		PushName:  webMsg.GetPushName(),
+		Timestamp: time.Unix(int64(webMsg.GetMessageTimestamp()), 0),
+	}
+	var err error
+	if info.IsFromMe {
+		info.Sender = portal.Key.Receiver
+	} else if portal.IsPrivateChat() {
+		info.Sender = portal.Key.JID
+	} else if webMsg.GetParticipant() != "" {
+		info.Sender, err = types.ParseJID(webMsg.GetParticipant())
+	} else if webMsg.GetKey().GetParticipant() != "" {
+		info.Sender, err = types.ParseJID(webMsg.GetKey().GetParticipant())
+	}
+	if info.Sender.IsEmpty() {
+		portal.log.Warnfln("Failed to get sender of message %s (parse error: %v)", info.ID, err)
+		return nil
+	}
+	return &info
+}
+
+func (portal *Portal) appendBatchEvents(converted *ConvertedMessage, info *types.MessageInfo, eventsArray *[]*event.Event, infoArray *[]*types.MessageInfo) error {
+	mainEvt, err := portal.wrapBatchEvent(info, converted.Intent, converted.Type, converted.Content, converted.Extra)
+	if err != nil {
+		return err
+	}
+	if converted.Caption != nil {
+		captionEvt, err := portal.wrapBatchEvent(info, converted.Intent, converted.Type, converted.Caption, nil)
+		if err != nil {
+			return err
+		}
+		*eventsArray = append(*eventsArray, mainEvt, captionEvt)
+		*infoArray = append(*infoArray, nil, info)
+	} else {
+		*eventsArray = append(*eventsArray, mainEvt)
+		*infoArray = append(*infoArray, info)
+	}
+	return nil
+}
+
+const backfillIDField = "fi.mau.whatsapp.backfill_msg_id"
+
+func (portal *Portal) wrapBatchEvent(info *types.MessageInfo, intent *appservice.IntentAPI, eventType event.Type, content *event.MessageEventContent, extraContent map[string]interface{}) (*event.Event, error) {
+	if extraContent == nil {
+		extraContent = map[string]interface{}{}
+	}
+	extraContent[backfillIDField] = info.ID
+	if intent.IsCustomPuppet {
+		extraContent[doublePuppetField] = intent.IsCustomPuppet
+	}
+	wrappedContent := event.Content{
+		Parsed: content,
+		Raw:    extraContent,
+	}
+	newEventType, err := portal.encrypt(&wrappedContent, eventType)
+	if err != nil {
+		return nil, err
+	}
+	return &event.Event{
+		Sender:    intent.UserID,
+		Type:      newEventType,
+		Timestamp: info.Timestamp.UnixMilli(),
+		Content:   wrappedContent,
+	}, nil
+}
+
+func (portal *Portal) finishBatch(eventIDs []id.EventID, infos []*types.MessageInfo) {
+	if len(eventIDs) != len(infos) {
+		portal.log.Errorfln("Length of event IDs (%d) and message infos (%d) doesn't match! Using slow path for mapping event IDs", len(eventIDs), len(infos))
+		infoMap := make(map[types.MessageID]*types.MessageInfo, len(infos))
+		for _, info := range infos {
+			infoMap[info.ID] = info
+		}
+		for _, eventID := range eventIDs {
+			if evt, err := portal.MainIntent().GetEvent(portal.MXID, eventID); err != nil {
+				portal.log.Warnfln("Failed to get event %s to register it in the database: %v", eventID, err)
+			} else if msgID, ok := evt.Content.Raw[backfillIDField].(string); !ok {
+				portal.log.Warnfln("Event %s doesn't include the WhatsApp message ID", eventID)
+			} else if info, ok := infoMap[types.MessageID(msgID)]; !ok {
+				portal.log.Warnfln("Didn't find info of message %s (event %s) to register it in the database", msgID, eventID)
+			} else {
+				portal.markHandled(nil, info, eventID, true, false, false)
+			}
+		}
+	} else {
+		for i := 0; i < len(infos); i++ {
+			if infos[i] != nil {
+				portal.markHandled(nil, infos[i], eventIDs[i], true, false, false)
+			}
+		}
+		portal.log.Infofln("Successfully sent %d events", len(eventIDs))
+	}
+}
+
+func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time) {
+	resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, BackfillEndDummyEvent, struct{}{})
+	if err != nil {
+		portal.log.Errorln("Error sending post-backfill dummy event:", err)
+		return
+	}
+	msg := portal.bridge.DB.Message.New()
+	msg.Chat = portal.Key
+	msg.MXID = resp.EventID
+	msg.JID = types.MessageID(resp.EventID)
+	msg.Timestamp = lastTimestamp.Add(1 * time.Second)
+	msg.Sent = true
+	msg.Insert()
+}
+
+// endregion

+ 3 - 280
portal.go

@@ -908,281 +908,6 @@ func (portal *Portal) RestrictMetadataChanges(restrict bool) id.EventID {
 	return ""
 }
 
-func (portal *Portal) parseWebMessageInfo(webMsg *waProto.WebMessageInfo) *types.MessageInfo {
-	info := types.MessageInfo{
-		MessageSource: types.MessageSource{
-			Chat:     portal.Key.JID,
-			IsFromMe: webMsg.GetKey().GetFromMe(),
-			IsGroup:  false,
-		},
-		ID:        webMsg.GetKey().GetId(),
-		PushName:  webMsg.GetPushName(),
-		Timestamp: time.Unix(int64(webMsg.GetMessageTimestamp()), 0),
-	}
-	var err error
-	if info.IsFromMe {
-		info.Sender = portal.Key.Receiver
-	} else if portal.IsPrivateChat() {
-		info.Sender = portal.Key.JID
-	} else if webMsg.GetParticipant() != "" {
-		info.Sender, err = types.ParseJID(webMsg.GetParticipant())
-	} else if webMsg.GetKey().GetParticipant() != "" {
-		info.Sender, err = types.ParseJID(webMsg.GetKey().GetParticipant())
-	}
-	if info.Sender.IsEmpty() {
-		portal.log.Warnfln("Failed to get sender of message %s (parse error: %v)", info.ID, err)
-		return nil
-	}
-	return &info
-}
-
-const backfillIDField = "fi.mau.whatsapp.backfill_msg_id"
-const doublePuppetField = "net.maunium.whatsapp.puppet"
-
-func (portal *Portal) wrapBatchEvent(info *types.MessageInfo, intent *appservice.IntentAPI, eventType event.Type, content *event.MessageEventContent, extraContent map[string]interface{}) (*event.Event, error) {
-	if extraContent == nil {
-		extraContent = map[string]interface{}{}
-	}
-	extraContent[backfillIDField] = info.ID
-	if intent.IsCustomPuppet {
-		extraContent[doublePuppetField] = intent.IsCustomPuppet
-	}
-	wrappedContent := event.Content{
-		Parsed: content,
-		Raw:    extraContent,
-	}
-	newEventType, err := portal.encrypt(&wrappedContent, eventType)
-	if err != nil {
-		return nil, err
-	}
-	return &event.Event{
-		Sender:    intent.UserID,
-		Type:      newEventType,
-		Timestamp: info.Timestamp.UnixMilli(),
-		Content:   wrappedContent,
-	}, nil
-}
-
-func (portal *Portal) appendBatchEvents(converted *ConvertedMessage, info *types.MessageInfo, eventsArray *[]*event.Event, infoArray *[]*types.MessageInfo) error {
-	mainEvt, err := portal.wrapBatchEvent(info, converted.Intent, converted.Type, converted.Content, converted.Extra)
-	if err != nil {
-		return err
-	}
-	if converted.Caption != nil {
-		captionEvt, err := portal.wrapBatchEvent(info, converted.Intent, converted.Type, converted.Caption, nil)
-		if err != nil {
-			return err
-		}
-		*eventsArray = append(*eventsArray, mainEvt, captionEvt)
-		*infoArray = append(*infoArray, nil, info)
-	} else {
-		*eventsArray = append(*eventsArray, mainEvt)
-		*infoArray = append(*infoArray, info)
-	}
-	return nil
-}
-
-func (portal *Portal) finishBatch(eventIDs []id.EventID, infos []*types.MessageInfo) {
-	if len(eventIDs) != len(infos) {
-		portal.log.Errorfln("Length of event IDs (%d) and message infos (%d) doesn't match! Using slow path for mapping event IDs", len(eventIDs), len(infos))
-		infoMap := make(map[types.MessageID]*types.MessageInfo, len(infos))
-		for _, info := range infos {
-			infoMap[info.ID] = info
-		}
-		for _, eventID := range eventIDs {
-			if evt, err := portal.MainIntent().GetEvent(portal.MXID, eventID); err != nil {
-				portal.log.Warnfln("Failed to get event %s to register it in the database: %v", eventID, err)
-			} else if msgID, ok := evt.Content.Raw[backfillIDField].(string); !ok {
-				portal.log.Warnfln("Event %s doesn't include the WhatsApp message ID", eventID)
-			} else if info, ok := infoMap[types.MessageID(msgID)]; !ok {
-				portal.log.Warnfln("Didn't find info of message %s (event %s) to register it in the database", msgID, eventID)
-			} else {
-				portal.markHandled(nil, info, eventID, true, false, false)
-			}
-		}
-	} else {
-		for i := 0; i < len(infos); i++ {
-			if infos[i] != nil {
-				portal.markHandled(nil, infos[i], eventIDs[i], true, false, false)
-			}
-		}
-		portal.log.Infofln("Successfully sent %d events", len(eventIDs))
-	}
-}
-
-func (portal *Portal) backfill(source *User, messages []*waProto.HistorySyncMsg) {
-	portal.backfillLock.Lock()
-	defer portal.backfillLock.Unlock()
-
-	var historyBatch, newBatch mautrix.ReqBatchSend
-	var historyBatchInfos, newBatchInfos []*types.MessageInfo
-
-	firstMsgTimestamp := time.Unix(int64(messages[len(messages)-1].GetMessage().GetMessageTimestamp()), 0)
-
-	historyBatch.StateEventsAtStart = make([]*event.Event, 1)
-	newBatch.StateEventsAtStart = make([]*event.Event, 1)
-
-	// TODO remove the dummy state events after https://github.com/matrix-org/synapse/pull/11188
-	emptyStr := ""
-	dummyStateEvent := event.Event{
-		Type:      BackfillDummyStateEvent,
-		Sender:    portal.MainIntent().UserID,
-		StateKey:  &emptyStr,
-		Timestamp: firstMsgTimestamp.UnixMilli(),
-		Content:   event.Content{},
-	}
-	historyBatch.StateEventsAtStart[0] = &dummyStateEvent
-	newBatch.StateEventsAtStart[0] = &dummyStateEvent
-
-	addedMembers := make(map[id.UserID]*event.MemberEventContent)
-	addMember := func(puppet *Puppet) {
-		if _, alreadyAdded := addedMembers[puppet.MXID]; alreadyAdded {
-			return
-		}
-		mxid := puppet.MXID.String()
-		content := event.MemberEventContent{
-			Membership:  event.MembershipJoin,
-			Displayname: puppet.Displayname,
-			AvatarURL:   puppet.AvatarURL.CUString(),
-		}
-		inviteContent := content
-		inviteContent.Membership = event.MembershipInvite
-		historyBatch.StateEventsAtStart = append(historyBatch.StateEventsAtStart, &event.Event{
-			Type:      event.StateMember,
-			Sender:    portal.MainIntent().UserID,
-			StateKey:  &mxid,
-			Timestamp: firstMsgTimestamp.UnixMilli(),
-			Content:   event.Content{Parsed: &inviteContent},
-		}, &event.Event{
-			Type:      event.StateMember,
-			Sender:    puppet.MXID,
-			StateKey:  &mxid,
-			Timestamp: firstMsgTimestamp.UnixMilli(),
-			Content:   event.Content{Parsed: &content},
-		})
-		addedMembers[puppet.MXID] = &content
-	}
-
-	firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key)
-	lastMessage := portal.bridge.DB.Message.GetLastInChat(portal.Key)
-	var historyMaxTs, newMinTs time.Time
-
-	if portal.FirstEventID != "" || portal.NextBatchID != "" {
-		historyBatch.PrevEventID = portal.FirstEventID
-		historyBatch.BatchID = portal.NextBatchID
-		if firstMessage == nil && lastMessage == nil {
-			historyMaxTs = time.Now()
-		} else {
-			historyMaxTs = firstMessage.Timestamp
-		}
-	}
-	if lastMessage != nil {
-		newBatch.PrevEventID = lastMessage.MXID
-		newMinTs = lastMessage.Timestamp
-	}
-
-	portal.log.Infofln("Processing history sync with %d messages", len(messages))
-	// The messages are ordered newest to oldest, so iterate them in reverse order.
-	for i := len(messages) - 1; i >= 0; i-- {
-		wrappedMsg := messages[i]
-		webMsg := wrappedMsg.GetMessage()
-		msgType := getMessageType(webMsg.GetMessage())
-		if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" {
-			if msgType != "ignore" {
-				portal.log.Debugfln("Skipping message %s with unknown type in backfill", webMsg.GetKey().GetId())
-			}
-			continue
-		}
-		info := portal.parseWebMessageInfo(webMsg)
-		if info == nil {
-			continue
-		}
-		var batch *mautrix.ReqBatchSend
-		var infos *[]*types.MessageInfo
-		var history bool
-		if !historyMaxTs.IsZero() && info.Timestamp.Before(historyMaxTs) {
-			batch, infos, history = &historyBatch, &historyBatchInfos, true
-		} else if !newMinTs.IsZero() && info.Timestamp.After(newMinTs) {
-			batch, infos = &newBatch, &newBatchInfos
-		} else {
-			continue
-		}
-		puppet := portal.getMessagePuppet(source, info)
-		var intent *appservice.IntentAPI
-		if portal.Key.JID == puppet.JID {
-			intent = puppet.DefaultIntent()
-		} else {
-			intent = puppet.IntentFor(portal)
-			if intent.IsCustomPuppet && !portal.bridge.Config.Bridge.HistorySync.DoublePuppetBackfill {
-				intent = puppet.DefaultIntent()
-				addMember(puppet)
-			}
-		}
-		converted := portal.convertMessage(intent, source, info, webMsg.GetMessage())
-		if converted == nil {
-			portal.log.Debugfln("Skipping unsupported message %s in backfill", info.ID)
-			continue
-		}
-		if history && !portal.IsPrivateChat() && !intent.IsCustomPuppet && !portal.bridge.StateStore.IsInRoom(portal.MXID, puppet.MXID) {
-			addMember(puppet)
-		}
-		err := portal.appendBatchEvents(converted, info, &batch.Events, infos)
-		if err != nil {
-			portal.log.Errorfln("Error handling message %s during backfill: %v", info.ID, err)
-		}
-	}
-
-	if (len(historyBatch.Events) > 0 && len(historyBatch.BatchID) == 0) || len(newBatch.Events) > 0 {
-		portal.log.Debugln("Sending a dummy event to avoid forward extremity errors with backfill")
-		_, err := portal.MainIntent().SendMessageEvent(portal.MXID, PreBackfillDummyEvent, struct{}{})
-		if err != nil {
-			portal.log.Warnln("Error sending pre-backfill dummy event:", err)
-		}
-	}
-
-	if len(historyBatch.Events) > 0 && len(historyBatch.PrevEventID) > 0 {
-		portal.log.Infofln("Sending %d historical messages...", len(historyBatch.Events))
-		historyResp, err := portal.MainIntent().BatchSend(portal.MXID, &historyBatch)
-		if err != nil {
-			portal.log.Errorln("Error sending batch of historical messages:", err)
-		} else {
-			portal.finishBatch(historyResp.EventIDs, historyBatchInfos)
-			portal.NextBatchID = historyResp.NextBatchID
-			portal.Update()
-			// If batchID is non-empty, it means this is backfilling very old messages, and we don't need a post-backfill dummy.
-			if historyBatch.BatchID == "" {
-				portal.sendPostBackfillDummy(time.UnixMilli(historyBatch.Events[len(historyBatch.Events)-1].Timestamp))
-			}
-		}
-	}
-
-	if len(newBatch.Events) > 0 && len(newBatch.PrevEventID) > 0 {
-		portal.log.Infofln("Sending %d new messages...", len(newBatch.Events))
-		newResp, err := portal.MainIntent().BatchSend(portal.MXID, &newBatch)
-		if err != nil {
-			portal.log.Errorln("Error sending batch of new messages:", err)
-		} else {
-			portal.finishBatch(newResp.EventIDs, newBatchInfos)
-			portal.sendPostBackfillDummy(time.UnixMilli(newBatch.Events[len(newBatch.Events)-1].Timestamp))
-		}
-	}
-}
-
-func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time) {
-	resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, BackfillEndDummyEvent, struct{}{})
-	if err != nil {
-		portal.log.Errorln("Error sending post-backfill dummy event:", err)
-		return
-	}
-	msg := portal.bridge.DB.Message.New()
-	msg.Chat = portal.Key
-	msg.MXID = resp.EventID
-	msg.JID = types.MessageID(resp.EventID)
-	msg.Timestamp = lastTimestamp.Add(1 * time.Second)
-	msg.Sent = true
-	msg.Insert()
-}
-
 func (portal *Portal) getBridgeInfo() (string, event.BridgeEventContent) {
 	bridgeInfo := event.BridgeEventContent{
 		BridgeBot: portal.bridge.Bot.UserID,
@@ -1221,11 +946,6 @@ func (portal *Portal) UpdateBridgeInfo() {
 	}
 }
 
-var PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType}
-var BackfillDummyStateEvent = event.Type{Type: "fi.mau.dummy.blank_backfill_state", Class: event.StateEventType}
-var BackfillEndDummyEvent = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType}
-var PreBackfillDummyEvent = event.Type{Type: "fi.mau.dummy.pre_backfill", Class: event.MessageEventType}
-
 func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, isFullInfo bool) error {
 	portal.roomCreateLock.Lock()
 	defer portal.roomCreateLock.Unlock()
@@ -1283,6 +1003,7 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i
 				portal.log.Warnfln("Failed to get group info through %s: %v", user.JID, err)
 			} else {
 				groupInfo = foundInfo
+				isFullInfo = true
 			}
 		}
 		if groupInfo != nil {
@@ -1495,6 +1216,8 @@ func (portal *Portal) encrypt(content *event.Content, eventType event.Type) (eve
 	return eventType, nil
 }
 
+const doublePuppetField = "net.maunium.whatsapp.puppet"
+
 func (portal *Portal) sendMessage(intent *appservice.IntentAPI, eventType event.Type, content *event.MessageEventContent, extraContent map[string]interface{}, timestamp int64) (*mautrix.RespSendEvent, error) {
 	wrappedContent := event.Content{Parsed: content, Raw: extraContent}
 	if timestamp != 0 && intent.IsCustomPuppet {

+ 1 - 128
user.go

@@ -22,14 +22,12 @@ import (
 	"errors"
 	"fmt"
 	"net/http"
-	"sort"
 	"sync"
 	"time"
 
 	log "maunium.net/go/maulogger/v2"
 
 	"go.mau.fi/whatsmeow/appstate"
-	waProto "go.mau.fi/whatsmeow/binary/proto"
 	"maunium.net/go/mautrix/appservice"
 	"maunium.net/go/mautrix/pushrules"
 
@@ -164,14 +162,7 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User {
 	user.RelayWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelayWhitelisted(user.MXID)
 	user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
 	user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
-	go func() {
-		for evt := range user.historySyncs {
-			if evt == nil {
-				return
-			}
-			user.handleHistorySync(evt.Data)
-		}
-	}()
+	go user.handleHistorySyncsLoop()
 	return user
 }
 
@@ -348,124 +339,6 @@ func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface
 	}
 }
 
-func containsSupportedMessages(conv *waProto.Conversation) bool {
-	for _, msg := range conv.GetMessages() {
-		if containsSupportedMessage(msg.GetMessage().GetMessage()) {
-			return true
-		}
-	}
-	return false
-}
-
-type portalToBackfill struct {
-	portal *Portal
-	conv   *waProto.Conversation
-}
-
-type conversationList []*waProto.Conversation
-
-var _ sort.Interface = (conversationList)(nil)
-
-func (c conversationList) Len() int {
-	return len(c)
-}
-
-func (c conversationList) Less(i, j int) bool {
-	return c[i].GetConversationTimestamp() < c[j].GetConversationTimestamp()
-}
-
-func (c conversationList) Swap(i, j int) {
-	c[i], c[j] = c[j], c[i]
-}
-
-func (user *User) handleHistorySync(evt *waProto.HistorySync) {
-	if evt.GetSyncType() != waProto.HistorySync_RECENT && evt.GetSyncType() != waProto.HistorySync_FULL {
-		return
-	}
-	user.log.Infofln("Handling history sync with type %s, chunk order %d, progress %d%%", evt.GetSyncType(), evt.GetChunkOrder(), evt.GetProgress())
-	maxAge := user.bridge.Config.Bridge.HistorySync.MaxAge
-	minLastMsgToCreate := time.Now().Add(-time.Duration(maxAge) * time.Second)
-	createRooms := user.bridge.Config.Bridge.HistorySync.CreatePortals
-
-	conversations := conversationList(evt.GetConversations())
-	sort.Sort(sort.Reverse(conversations))
-	portalsToBackfill := make(chan portalToBackfill, len(conversations))
-
-	var backfillWait sync.WaitGroup
-	backfillWait.Add(1)
-	go func() {
-		for ptb := range portalsToBackfill {
-			user.log.Debugln("Bridging history sync payload for", ptb.portal.Key.JID)
-			ptb.portal.backfill(user, ptb.conv.GetMessages())
-			if !ptb.conv.GetMarkedAsUnread() && ptb.conv.GetUnreadCount() == 0 {
-				user.markSelfReadFull(ptb.portal)
-			}
-		}
-		backfillWait.Done()
-	}()
-
-	for _, conv := range conversations {
-		jid, err := types.ParseJID(conv.GetId())
-		if err != nil {
-			user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
-			continue
-		}
-
-		muteEnd := time.Unix(int64(conv.GetMuteEndTime()), 0)
-		if muteEnd.After(time.Now()) {
-			_ = user.Client.Store.ChatSettings.PutMutedUntil(jid, muteEnd)
-		}
-		if conv.GetArchived() {
-			_ = user.Client.Store.ChatSettings.PutArchived(jid, true)
-		}
-		if conv.GetPinned() > 0 {
-			_ = user.Client.Store.ChatSettings.PutPinned(jid, true)
-		}
-
-		lastMsg := time.Unix(int64(conv.GetConversationTimestamp()), 0)
-		portal := user.GetPortalByJID(jid)
-		if createRooms && len(portal.MXID) == 0 {
-			if !containsSupportedMessages(conv) {
-				user.log.Debugfln("Not creating portal for %s: no interesting messages found", portal.Key.JID)
-				continue
-			} else if maxAge > 0 && !lastMsg.After(minLastMsgToCreate) {
-				user.log.Debugfln("Not creating portal for %s: last message older than limit (%s)", portal.Key.JID, lastMsg)
-				continue
-			}
-			user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
-			participants := make([]types.GroupParticipant, len(conv.GetParticipant()))
-			for i, pcp := range conv.GetParticipant() {
-				participantJID, _ := types.ParseJID(pcp.GetUserJid())
-				participants[i] = types.GroupParticipant{
-					JID:          participantJID,
-					IsAdmin:      pcp.GetRank() == waProto.GroupParticipant_ADMIN,
-					IsSuperAdmin: pcp.GetRank() == waProto.GroupParticipant_SUPERADMIN,
-				}
-			}
-			partialInfo := types.GroupInfo{
-				JID:                  jid,
-				GroupName:            types.GroupName{Name: conv.GetName()},
-				Participants:         participants,
-			}
-			err = portal.CreateMatrixRoom(user, &partialInfo, false)
-			if err != nil {
-				user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err)
-				continue
-			}
-		}
-		if len(portal.MXID) == 0 {
-			user.log.Debugln("No room created, not bridging history sync payload for", portal.Key.JID)
-		} else if !user.bridge.Config.Bridge.HistorySync.Backfill {
-			user.log.Debugln("Backfill is disabled, not bridging history sync payload for", portal.Key.JID)
-		} else {
-			portalsToBackfill <- portalToBackfill{portal, conv}
-		}
-	}
-	close(portalsToBackfill)
-	backfillWait.Wait()
-	user.log.Infofln("Finished handling history sync with type %s, chunk order %d, progress %d%%", evt.GetSyncType(), evt.GetChunkOrder(), evt.GetProgress())
-}
-
 const callEventMaxAge = 15 * time.Minute
 
 func (user *User) handleCallStart(sender types.JID, id, callType string, ts time.Time) {