Browse Source

Merge pull request #461 from mautrix/sumner/bri-2227

history sync: implement prioritized backfill
Sumner Evans 3 years ago
parent
commit
37b8065db5

+ 69 - 0
backfillqueue.go

@@ -0,0 +1,69 @@
+// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
+// Copyright (C) 2021 Tulir Asokan, Sumner Evans
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+package main
+
+import (
+	"time"
+
+	log "maunium.net/go/maulogger/v2"
+	"maunium.net/go/mautrix-whatsapp/database"
+)
+
+// BackfillQueue pulls queued backfills out of the database and dispatches
+// them to the backfill workers via channels.
+type BackfillQueue struct {
+	BackfillQuery             *database.BackfillQuery
+	// Channels consumed by the immediate/deferred backfill workers.
+	ImmediateBackfillRequests chan *database.Backfill
+	DeferredBackfillRequests  chan *database.Backfill
+	// ReCheckQueue is signalled when a new backfill is inserted so the
+	// loops re-check the database immediately instead of waiting for the
+	// next poll interval.
+	ReCheckQueue              chan bool
+
+	log log.Logger
+}
+
+// RunLoops starts the immediate backfill loop on a new goroutine and runs
+// the deferred loop on the calling goroutine. Note that this call blocks
+// forever (both loops are infinite).
+func (bq *BackfillQueue) RunLoops(user *User) {
+	go bq.immediateBackfillLoop(user)
+	bq.deferredBackfillLoop(user)
+}
+
+// immediateBackfillLoop continuously feeds the user's pending immediate
+// backfills into ImmediateBackfillRequests. When the queue is empty, it
+// waits for a ReCheckQueue signal or re-polls after 10 seconds.
+func (bq *BackfillQueue) immediateBackfillLoop(user *User) {
+	for {
+		if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); backfill != nil {
+			// The send blocks until the request is handed off (channel has
+			// capacity 1); only then is the row marked completed so it is
+			// not returned by GetNext again.
+			bq.ImmediateBackfillRequests <- backfill
+			backfill.MarkDone()
+		} else {
+			select {
+			case <-bq.ReCheckQueue:
+			case <-time.After(10 * time.Second):
+			}
+		}
+	}
+}
+
+// deferredBackfillLoop feeds the user's deferred backfills into
+// DeferredBackfillRequests, but only while no immediate backfills are
+// pending. It polls the database every 10 seconds while idle or blocked
+// behind immediate backfills.
+func (bq *BackfillQueue) deferredBackfillLoop(user *User) {
+	for {
+		// Finish all immediate backfills before doing the deferred ones.
+		if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil {
+			time.Sleep(10 * time.Second)
+			continue
+		}
+
+		if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil {
+			bq.DeferredBackfillRequests <- backfill
+			backfill.MarkDone()
+		} else {
+			time.Sleep(10 * time.Second)
+		}
+	}
+}

+ 40 - 2
commands.go

@@ -31,6 +31,7 @@ import (
 	"github.com/tidwall/gjson"
 
 	"maunium.net/go/maulogger/v2"
+	"maunium.net/go/mautrix-whatsapp/database"
 
 	"go.mau.fi/whatsmeow"
 	"go.mau.fi/whatsmeow/appstate"
@@ -140,7 +141,7 @@ func (handler *CommandHandler) CommandMux(ce *CommandEvent) {
 		handler.CommandLogout(ce)
 	case "toggle":
 		handler.CommandToggle(ce)
-	case "set-relay", "unset-relay", "login-matrix", "sync", "list", "search", "open", "pm", "invite-link", "resolve", "resolve-link", "join", "create", "accept":
+	case "set-relay", "unset-relay", "login-matrix", "sync", "list", "search", "open", "pm", "invite-link", "resolve", "resolve-link", "join", "create", "accept", "backfill":
 		if !ce.User.HasSession() {
 			ce.Reply("You are not logged in. Use the `login` command to log into WhatsApp.")
 			return
@@ -176,6 +177,8 @@ func (handler *CommandHandler) CommandMux(ce *CommandEvent) {
 			handler.CommandCreate(ce)
 		case "accept":
 			handler.CommandAccept(ce)
+		case "backfill":
+			handler.CommandBackfill(ce)
 		}
 	default:
 		ce.Reply("Unknown command, use the `help` command for help.")
@@ -745,6 +748,7 @@ func (handler *CommandHandler) CommandHelp(ce *CommandEvent) {
 		cmdPrefix + cmdSetPowerLevelHelp,
 		cmdPrefix + cmdDeletePortalHelp,
 		cmdPrefix + cmdDeleteAllPortalsHelp,
+		cmdPrefix + cmdBackfillHelp,
 	}, "\n* "))
 }
 
@@ -835,6 +839,40 @@ func (handler *CommandHandler) CommandDeleteAllPortals(ce *CommandEvent) {
 	}()
 }
 
+const cmdBackfillHelp = `backfill [batch size] [batch delay] - Backfill all messages in the portal.`
+
+// CommandBackfill handles the `backfill` command: it queues an immediate
+// backfill of the full history of the current portal room.
+//
+// Optional args: [batch size] (events per batch, default 100) and
+// [batch delay] (seconds between batches, default 5).
+func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) {
+	if ce.Portal == nil {
+		ce.Reply("This is not a portal room")
+		return
+	}
+	if !ce.Bridge.Config.Bridge.HistorySync.Backfill {
+		ce.Reply("Backfill is not enabled for this bridge.")
+		return
+	}
+	batchSize := 100
+	batchDelay := 5
+	if len(ce.Args) >= 1 {
+		var err error
+		batchSize, err = strconv.Atoi(ce.Args[0])
+		if err != nil || batchSize < 1 {
+			ce.Reply("\"%s\" isn't a valid batch size", ce.Args[0])
+			return
+		}
+	}
+	if len(ce.Args) >= 2 {
+		var err error
+		// Parse and validate the *second* argument. The original code
+		// re-parsed ce.Args[0] and validated batchSize here.
+		batchDelay, err = strconv.Atoi(ce.Args[1])
+		if err != nil || batchDelay < 0 {
+			ce.Reply("\"%s\" isn't a valid batch delay", ce.Args[1])
+			return
+		}
+	}
+	// nil time bounds -> full history; max_total_events -1 -> no limit.
+	backfill := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay)
+	backfill.Insert()
+	// Wake the backfill queue loop so it picks up the new entry promptly.
+	ce.User.BackfillQueue.ReCheckQueue <- true
+}
+
 const cmdListHelp = `list <contacts|groups> [page] [items per page] - Get a list of all contacts and groups.`
 
 func matchesQuery(str string, query string) bool {
@@ -1015,7 +1053,7 @@ func (handler *CommandHandler) CommandOpen(ce *CommandEvent) {
 		portal.UpdateMatrixRoom(ce.User, info)
 		ce.Reply("Portal room synced.")
 	} else {
-		err = portal.CreateMatrixRoom(ce.User, info, true)
+		err = portal.CreateMatrixRoom(ce.User, info, true, true)
 		if err != nil {
 			ce.Reply("Failed to create room: %v", err)
 		} else {

+ 18 - 5
config/bridge.go

@@ -28,6 +28,12 @@ import (
 	"maunium.net/go/mautrix/id"
 )
 
+// DeferredConfig describes one "stage" of deferred backfill
+// (see the deferred section of example-config.yaml).
+type DeferredConfig struct {
+	// StartDaysAgo is how many days back this stage starts; -1 appears to
+	// mean "the start of time" per the example config — confirm in backfill logic.
+	StartDaysAgo   int `yaml:"start_days_ago"`
+	// MaxBatchEvents is the number of events to send per batch.
+	MaxBatchEvents int `yaml:"max_batch_events"`
+	// BatchDelay is the number of seconds to wait between batches.
+	BatchDelay     int `yaml:"batch_delay"`
+}
+
 type BridgeConfig struct {
 	UsernameTemplate    string `yaml:"username_template"`
 	DisplaynameTemplate string `yaml:"displayname_template"`
@@ -40,11 +46,18 @@ type BridgeConfig struct {
 	IdentityChangeNotices bool `yaml:"identity_change_notices"`
 
 	HistorySync struct {
-		CreatePortals        bool  `yaml:"create_portals"`
-		MaxAge               int64 `yaml:"max_age"`
-		Backfill             bool  `yaml:"backfill"`
-		DoublePuppetBackfill bool  `yaml:"double_puppet_backfill"`
-		RequestFullSync      bool  `yaml:"request_full_sync"`
+		CreatePortals           bool `yaml:"create_portals"`
+		Backfill                bool `yaml:"backfill"`
+		DoublePuppetBackfill    bool `yaml:"double_puppet_backfill"`
+		RequestFullSync         bool `yaml:"request_full_sync"`
+		MaxInitialConversations int  `yaml:"max_initial_conversations"`
+
+		Immediate struct {
+			WorkerCount int `yaml:"worker_count"`
+			MaxEvents   int `yaml:"max_events"`
+		} `yaml:"immediate"`
+
+		Deferred []DeferredConfig `yaml:"deferred"`
 	} `yaml:"history_sync"`
 	UserAvatarSync    bool `yaml:"user_avatar_sync"`
 	BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"`

+ 4 - 1
config/upgrade.go

@@ -78,10 +78,13 @@ func (helper *UpgradeHelper) doUpgrade() {
 	helper.Copy(Bool, "bridge", "call_start_notices")
 	helper.Copy(Bool, "bridge", "identity_change_notices")
 	helper.Copy(Bool, "bridge", "history_sync", "create_portals")
-	helper.Copy(Int, "bridge", "history_sync", "max_age")
 	helper.Copy(Bool, "bridge", "history_sync", "backfill")
 	helper.Copy(Bool, "bridge", "history_sync", "double_puppet_backfill")
 	helper.Copy(Bool, "bridge", "history_sync", "request_full_sync")
+	helper.Copy(Int, "bridge", "history_sync", "max_initial_conversations")
+	helper.Copy(Int, "bridge", "history_sync", "immediate", "worker_count")
+	helper.Copy(Int, "bridge", "history_sync", "immediate", "max_events")
+	helper.Copy(List, "bridge", "history_sync", "deferred")
 	helper.Copy(Bool, "bridge", "user_avatar_sync")
 	helper.Copy(Bool, "bridge", "bridge_matrix_leave")
 	helper.Copy(Bool, "bridge", "sync_with_custom_puppets")

+ 151 - 0
database/backfillqueue.go

@@ -0,0 +1,151 @@
+// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
+// Copyright (C) 2021 Tulir Asokan, Sumner Evans
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+package database
+
+import (
+	"database/sql"
+	"errors"
+	"time"
+
+	log "maunium.net/go/maulogger/v2"
+	"maunium.net/go/mautrix/id"
+)
+
+// BackfillType categorizes entries in the backfill queue and determines
+// which loop (immediate or deferred) picks them up.
+type BackfillType int
+
+const (
+	// BackfillImmediate entries are processed first, by the immediate workers.
+	BackfillImmediate BackfillType = iota
+	// BackfillDeferred entries are processed only when no immediate
+	// backfills are pending. Using iota gives this constant the
+	// BackfillType type too; the original `= 1` made it an untyped int.
+	BackfillDeferred
+)
+
+// BackfillQuery provides access to the backfill_queue table.
+type BackfillQuery struct {
+	db  *Database
+	log log.Logger
+}
+
+// New returns an empty Backfill, ready to be populated by Scan.
+func (bq *BackfillQuery) New() *Backfill {
+	return &Backfill{
+		db:     bq.db,
+		log:    bq.log,
+		Portal: &PortalKey{},
+	}
+}
+
+// NewWithValues returns a Backfill with all queue fields populated.
+// The entry is not persisted until Insert is called.
+func (bq *BackfillQuery) NewWithValues(userID id.UserID, backfillType BackfillType, priority int, portal *PortalKey, timeStart *time.Time, timeEnd *time.Time, maxBatchEvents, maxTotalEvents, batchDelay int) *Backfill {
+	return &Backfill{
+		db:             bq.db,
+		log:            bq.log,
+		UserID:         userID,
+		BackfillType:   backfillType,
+		Priority:       priority,
+		Portal:         portal,
+		TimeStart:      timeStart,
+		TimeEnd:        timeEnd,
+		MaxBatchEvents: maxBatchEvents,
+		MaxTotalEvents: maxTotalEvents,
+		BatchDelay:     batchDelay,
+	}
+}
+
+const (
+	// Selects the highest-priority (lowest number), oldest incomplete
+	// backfill for a user and type. Column order must match Backfill.Scan.
+	getNextBackfillQuery = `
+		SELECT queue_id, user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay
+		  FROM backfill_queue
+		 WHERE user_mxid=$1
+		   AND type=$2
+		   AND completed_at IS NULL
+	  ORDER BY priority, queue_id
+	     LIMIT 1
+	`
+)
+
+// GetNext returns the next backfill to perform for the given user and type,
+// or nil if the queue is empty or the query fails.
+func (bq *BackfillQuery) GetNext(userID id.UserID, backfillType BackfillType) (backfill *Backfill) {
+	rows, err := bq.db.Query(getNextBackfillQuery, userID, backfillType)
+	// Check the error before touching rows: the original deferred
+	// rows.Close() first, which panics on a nil *sql.Rows when Query fails.
+	if err != nil || rows == nil {
+		bq.log.Error(err)
+		return
+	}
+	defer rows.Close()
+	if rows.Next() {
+		backfill = bq.New().Scan(rows)
+	}
+	return
+}
+
+// DeleteAll removes every backfill queue entry for the given user.
+func (bq *BackfillQuery) DeleteAll(userID id.UserID) error {
+	_, err := bq.db.Exec("DELETE FROM backfill_queue WHERE user_mxid=$1", userID)
+	return err
+}
+
+// Backfill represents one row of the backfill_queue table.
+type Backfill struct {
+	db  *Database
+	log log.Logger
+
+	// Fields
+	QueueID        int
+	UserID         id.UserID
+	BackfillType   BackfillType
+	Priority       int
+	Portal         *PortalKey
+	// Optional time bounds for the backfill; nil means unbounded.
+	TimeStart      *time.Time
+	TimeEnd        *time.Time
+	MaxBatchEvents int
+	MaxTotalEvents int
+	BatchDelay     int
+	// CompletedAt is set by MarkDone once the backfill has been dispatched.
+	CompletedAt    *time.Time
+}
+
+// Scan reads one backfill_queue row into b (column order per
+// getNextBackfillQuery). Returns nil if the scan fails.
+func (b *Backfill) Scan(row Scannable) *Backfill {
+	err := row.Scan(&b.QueueID, &b.UserID, &b.BackfillType, &b.Priority, &b.Portal.JID, &b.Portal.Receiver, &b.TimeStart, &b.TimeEnd, &b.MaxBatchEvents, &b.MaxTotalEvents, &b.BatchDelay)
+	if err != nil {
+		if !errors.Is(err, sql.ErrNoRows) {
+			b.log.Errorln("Database scan failed:", err)
+		}
+		return nil
+	}
+	return b
+}
+
+// Insert adds the backfill to the queue and stores the generated queue_id
+// back into b.QueueID.
+func (b *Backfill) Insert() {
+	rows, err := b.db.Query(`
+		INSERT INTO backfill_queue
+			(user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay, completed_at)
+		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
+		RETURNING queue_id
+	`, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.CompletedAt)
+	// Check the error before deferring rows.Close(): rows is nil when the
+	// query fails, and the original deferred close would panic.
+	if err != nil || rows == nil {
+		b.log.Warnfln("Failed to insert %v/%s with priority %d: %v", b.BackfillType, b.Portal.JID, b.Priority, err)
+		return
+	}
+	defer rows.Close()
+	if !rows.Next() {
+		b.log.Warnfln("Failed to insert %v/%s with priority %d: %v", b.BackfillType, b.Portal.JID, b.Priority, err)
+		return
+	}
+	err = rows.Scan(&b.QueueID)
+	if err != nil {
+		// %d for the int priority (the original used %s here).
+		b.log.Warnfln("Failed to read queue_id for %v/%s with priority %d: %v", b.BackfillType, b.Portal.JID, b.Priority, err)
+	}
+}
+
+// MarkDone marks the backfill as completed by setting completed_at, which
+// excludes it from future GetNext results.
+func (b *Backfill) MarkDone() {
+	if b.QueueID == 0 {
+		// The original message said "Cannot delete", but this method marks
+		// the row as done rather than deleting it.
+		b.log.Errorf("Cannot mark backfill as done without queue_id. Maybe it wasn't actually inserted in the database?")
+		return
+	}
+	_, err := b.db.Exec("UPDATE backfill_queue SET completed_at=$1 WHERE queue_id=$2", time.Now(), b.QueueID)
+	if err != nil {
+		// %d verbs: BackfillType and Priority are both ints (original used %s).
+		b.log.Warnfln("Failed to mark %d/%d as complete: %v", b.BackfillType, b.Priority, err)
+	}
+}

+ 10 - 0
database/database.go

@@ -46,6 +46,8 @@ type Database struct {
 	Reaction *ReactionQuery
 
 	DisappearingMessage *DisappearingMessageQuery
+	BackfillQuery       *BackfillQuery
+	HistorySyncQuery    *HistorySyncQuery
 }
 
 func New(cfg config.DatabaseConfig, baseLog log.Logger) (*Database, error) {
@@ -83,6 +85,14 @@ func New(cfg config.DatabaseConfig, baseLog log.Logger) (*Database, error) {
 		db:  db,
 		log: db.log.Sub("DisappearingMessage"),
 	}
+	db.BackfillQuery = &BackfillQuery{
+		db:  db,
+		log: db.log.Sub("Backfill"),
+	}
+	db.HistorySyncQuery = &HistorySyncQuery{
+		db:  db,
+		log: db.log.Sub("HistorySync"),
+	}
 
 	db.SetMaxOpenConns(cfg.MaxOpenConns)
 	db.SetMaxIdleConns(cfg.MaxIdleConns)

+ 317 - 0
database/historysync.go

@@ -0,0 +1,317 @@
+// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
+// Copyright (C) 2022 Tulir Asokan, Sumner Evans
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+package database
+
+import (
+	"database/sql"
+	"errors"
+	"fmt"
+	"strings"
+	"time"
+
+	waProto "go.mau.fi/whatsmeow/binary/proto"
+	"google.golang.org/protobuf/proto"
+
+	_ "github.com/mattn/go-sqlite3"
+	log "maunium.net/go/maulogger/v2"
+	"maunium.net/go/mautrix/id"
+)
+
+// HistorySyncQuery provides access to the history_sync_conversation and
+// history_sync_message tables.
+type HistorySyncQuery struct {
+	db  *Database
+	log log.Logger
+}
+
+// HistorySyncConversation represents one row of the
+// history_sync_conversation table: per-chat metadata received in a
+// WhatsApp history sync payload.
+type HistorySyncConversation struct {
+	db  *Database
+	log log.Logger
+
+	UserID                   id.UserID
+	ConversationID           string
+	PortalKey                *PortalKey
+	LastMessageTimestamp     time.Time
+	MuteEndTime              time.Time
+	Archived                 bool
+	Pinned                   uint32
+	DisappearingMode         waProto.DisappearingMode_DisappearingModeInitiator
+	EndOfHistoryTransferType waProto.Conversation_ConversationEndOfHistoryTransferType
+	EphemeralExpiration      *uint32
+	MarkedAsUnread           bool
+	UnreadCount              uint32
+}
+
+func (hsq *HistorySyncQuery) NewConversation() *HistorySyncConversation {
+	return &HistorySyncConversation{
+		db:        hsq.db,
+		log:       hsq.log,
+		PortalKey: &PortalKey{},
+	}
+}
+
+func (hsq *HistorySyncQuery) NewConversationWithValues(
+	userID id.UserID,
+	conversationID string,
+	portalKey *PortalKey,
+	lastMessageTimestamp,
+	muteEndTime uint64,
+	archived bool,
+	pinned uint32,
+	disappearingMode waProto.DisappearingMode_DisappearingModeInitiator,
+	endOfHistoryTransferType waProto.Conversation_ConversationEndOfHistoryTransferType,
+	ephemeralExpiration *uint32,
+	markedAsUnread bool,
+	unreadCount uint32) *HistorySyncConversation {
+	return &HistorySyncConversation{
+		db:                       hsq.db,
+		log:                      hsq.log,
+		UserID:                   userID,
+		ConversationID:           conversationID,
+		PortalKey:                portalKey,
+		LastMessageTimestamp:     time.Unix(int64(lastMessageTimestamp), 0),
+		MuteEndTime:              time.Unix(int64(muteEndTime), 0),
+		Archived:                 archived,
+		Pinned:                   pinned,
+		DisappearingMode:         disappearingMode,
+		EndOfHistoryTransferType: endOfHistoryTransferType,
+		EphemeralExpiration:      ephemeralExpiration,
+		MarkedAsUnread:           markedAsUnread,
+		UnreadCount:              unreadCount,
+	}
+}
+
+const (
+	// Column order in both queries must match HistorySyncConversation.Scan.
+	getNMostRecentConversations = `
+		SELECT user_mxid, conversation_id, portal_jid, portal_receiver, last_message_timestamp, archived, pinned, mute_end_time, disappearing_mode, end_of_history_transfer_type, ephemeral_expiration, marked_as_unread, unread_count
+		  FROM history_sync_conversation
+		 WHERE user_mxid=$1
+		 ORDER BY last_message_timestamp DESC
+		 LIMIT $2
+	`
+	getConversationByPortal = `
+		SELECT user_mxid, conversation_id, portal_jid, portal_receiver, last_message_timestamp, archived, pinned, mute_end_time, disappearing_mode, end_of_history_transfer_type, ephemeral_expiration, marked_as_unread, unread_count
+		  FROM history_sync_conversation
+		 WHERE user_mxid=$1
+		   AND portal_jid=$2
+		   AND portal_receiver=$3
+	`
+)
+
+// Upsert inserts the conversation, or updates the existing row keyed by
+// (user_mxid, conversation_id). last_message_timestamp only ever moves
+// forward: an older incoming timestamp does not overwrite a newer one.
+func (hsc *HistorySyncConversation) Upsert() {
+	_, err := hsc.db.Exec(`
+		INSERT INTO history_sync_conversation (user_mxid, conversation_id, portal_jid, portal_receiver, last_message_timestamp, archived, pinned, mute_end_time, disappearing_mode, end_of_history_transfer_type, ephemeral_expiration, marked_as_unread, unread_count)
+		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
+		ON CONFLICT (user_mxid, conversation_id)
+		DO UPDATE SET
+			portal_jid=EXCLUDED.portal_jid,
+			portal_receiver=EXCLUDED.portal_receiver,
+			last_message_timestamp=CASE
+				WHEN EXCLUDED.last_message_timestamp > history_sync_conversation.last_message_timestamp THEN EXCLUDED.last_message_timestamp
+				ELSE history_sync_conversation.last_message_timestamp
+			END,
+			archived=EXCLUDED.archived,
+			pinned=EXCLUDED.pinned,
+			mute_end_time=EXCLUDED.mute_end_time,
+			disappearing_mode=EXCLUDED.disappearing_mode,
+			end_of_history_transfer_type=EXCLUDED.end_of_history_transfer_type,
+			ephemeral_expiration=EXCLUDED.ephemeral_expiration,
+			marked_as_unread=EXCLUDED.marked_as_unread,
+			unread_count=EXCLUDED.unread_count
+	`,
+		hsc.UserID,
+		hsc.ConversationID,
+		hsc.PortalKey.JID.String(),
+		hsc.PortalKey.Receiver.String(),
+		hsc.LastMessageTimestamp,
+		hsc.Archived,
+		hsc.Pinned,
+		hsc.MuteEndTime,
+		hsc.DisappearingMode,
+		hsc.EndOfHistoryTransferType,
+		hsc.EphemeralExpiration,
+		hsc.MarkedAsUnread,
+		hsc.UnreadCount)
+	if err != nil {
+		hsc.log.Warnfln("Failed to insert history sync conversation %s/%s: %v", hsc.UserID, hsc.ConversationID, err)
+	}
+}
+
+// Scan reads one history_sync_conversation row into hsc (column order per
+// the query constants above). Returns nil if the scan fails.
+func (hsc *HistorySyncConversation) Scan(row Scannable) *HistorySyncConversation {
+	err := row.Scan(
+		&hsc.UserID,
+		&hsc.ConversationID,
+		&hsc.PortalKey.JID,
+		&hsc.PortalKey.Receiver,
+		&hsc.LastMessageTimestamp,
+		&hsc.Archived,
+		&hsc.Pinned,
+		&hsc.MuteEndTime,
+		&hsc.DisappearingMode,
+		&hsc.EndOfHistoryTransferType,
+		&hsc.EphemeralExpiration,
+		&hsc.MarkedAsUnread,
+		&hsc.UnreadCount)
+	if err != nil {
+		if !errors.Is(err, sql.ErrNoRows) {
+			hsc.log.Errorln("Database scan failed:", err)
+		}
+		return nil
+	}
+	return hsc
+}
+
+// GetNMostRecentConversations returns up to n conversations for the user,
+// ordered by last message timestamp descending. Returns nil on query error.
+func (hsq *HistorySyncQuery) GetNMostRecentConversations(userID id.UserID, n int) (conversations []*HistorySyncConversation) {
+	rows, err := hsq.db.Query(getNMostRecentConversations, userID, n)
+	// Check the error before deferring rows.Close(): rows is nil when the
+	// query fails, and the original deferred close would panic.
+	if err != nil || rows == nil {
+		return nil
+	}
+	defer rows.Close()
+	for rows.Next() {
+		conversations = append(conversations, hsq.NewConversation().Scan(rows))
+	}
+	return
+}
+
+// GetConversation returns the stored history sync conversation for the
+// given portal, or nil if none exists or the query fails.
+func (hsq *HistorySyncQuery) GetConversation(userID id.UserID, portalKey *PortalKey) (conversation *HistorySyncConversation) {
+	rows, err := hsq.db.Query(getConversationByPortal, userID, portalKey.JID, portalKey.Receiver)
+	// Check the error before deferring rows.Close(): rows is nil when the
+	// query fails, and the original deferred close would panic.
+	if err != nil || rows == nil {
+		return nil
+	}
+	defer rows.Close()
+	if rows.Next() {
+		conversation = hsq.NewConversation().Scan(rows)
+	}
+	return
+}
+
+// DeleteAllConversations removes all stored history sync conversations for
+// the given user.
+func (hsq *HistorySyncQuery) DeleteAllConversations(userID id.UserID) error {
+	_, err := hsq.db.Exec("DELETE FROM history_sync_conversation WHERE user_mxid=$1", userID)
+	return err
+}
+
+const (
+	// getMessagesBetween takes two format placeholders: extra WHERE clauses
+	// (time bounds) and an optional LIMIT clause, filled in by
+	// GetMessagesBetween.
+	getMessagesBetween = `
+		SELECT data
+		  FROM history_sync_message
+		 WHERE user_mxid=$1
+		   AND conversation_id=$2
+		 %s
+		 ORDER BY timestamp DESC
+		 %s
+	`
+	// deleteMessages takes a format placeholder for the full WHERE
+	// condition, built by DeleteMessages.
+	deleteMessages = `
+		DELETE FROM history_sync_message
+		 WHERE %s
+	`
+)
+
+// HistorySyncMessage represents one row of the history_sync_message table.
+// Data is the protobuf-marshalled waProto.HistorySyncMsg.
+type HistorySyncMessage struct {
+	db  *Database
+	log log.Logger
+
+	UserID         id.UserID
+	ConversationID string
+	MessageID      string
+	Timestamp      time.Time
+	Data           []byte
+}
+
+// NewMessageWithValues returns a HistorySyncMessage with the message
+// serialized to protobuf bytes and its timestamp extracted. Returns an
+// error if the message cannot be marshalled. The row is not persisted
+// until Insert is called.
+func (hsq *HistorySyncQuery) NewMessageWithValues(userID id.UserID, conversationID, messageID string, message *waProto.HistorySyncMsg) (*HistorySyncMessage, error) {
+	msgData, err := proto.Marshal(message)
+	if err != nil {
+		return nil, err
+	}
+	return &HistorySyncMessage{
+		db:             hsq.db,
+		log:            hsq.log,
+		UserID:         userID,
+		ConversationID: conversationID,
+		MessageID:      messageID,
+		Timestamp:      time.Unix(int64(message.Message.GetMessageTimestamp()), 0),
+		Data:           msgData,
+	}, nil
+}
+
+// Insert stores the message. Duplicate (user, conversation, message) keys
+// are silently ignored via ON CONFLICT DO NOTHING.
+func (hsm *HistorySyncMessage) Insert() {
+	_, err := hsm.db.Exec(`
+		INSERT INTO history_sync_message (user_mxid, conversation_id, message_id, timestamp, data)
+		VALUES ($1, $2, $3, $4, $5)
+		ON CONFLICT (user_mxid, conversation_id, message_id) DO NOTHING
+	`, hsm.UserID, hsm.ConversationID, hsm.MessageID, hsm.Timestamp, hsm.Data)
+	if err != nil {
+		hsm.log.Warnfln("Failed to insert history sync message %s/%s: %v", hsm.ConversationID, hsm.Timestamp, err)
+	}
+}
+
+// GetMessagesBetween returns the stored history sync messages for a
+// conversation, newest first, optionally bounded by inclusive start/end
+// timestamps (nil means unbounded) and a result limit (limit <= 0 means
+// no limit). Rows that fail to scan or unmarshal are skipped.
+func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID string, startTime, endTime *time.Time, limit int) (messages []*waProto.WebMessageInfo) {
+	whereClauses := ""
+	args := []interface{}{userID, conversationID}
+	argNum := 3
+	if startTime != nil {
+		whereClauses += fmt.Sprintf(" AND timestamp >= $%d", argNum)
+		args = append(args, startTime)
+		argNum++
+	}
+	if endTime != nil {
+		whereClauses += fmt.Sprintf(" AND timestamp <= $%d", argNum)
+		args = append(args, endTime)
+	}
+
+	limitClause := ""
+	if limit > 0 {
+		limitClause = fmt.Sprintf("LIMIT %d", limit)
+	}
+
+	rows, err := hsq.db.Query(fmt.Sprintf(getMessagesBetween, whereClauses, limitClause), args...)
+	// Check the error before deferring rows.Close(): rows is nil when the
+	// query fails, and the original deferred close would panic.
+	if err != nil || rows == nil {
+		return nil
+	}
+	defer rows.Close()
+
+	var msgData []byte
+	for rows.Next() {
+		err = rows.Scan(&msgData)
+		if err != nil {
+			// Errorfln formats its arguments; plain Error (as in the
+			// original) would have printed the format string literally.
+			hsq.log.Errorfln("Database scan failed: %v", err)
+			continue
+		}
+		var historySyncMsg waProto.HistorySyncMsg
+		err = proto.Unmarshal(msgData, &historySyncMsg)
+		if err != nil {
+			hsq.log.Errorf("Failed to unmarshal history sync message: %v", err)
+			continue
+		}
+		messages = append(messages, historySyncMsg.Message)
+	}
+	return
+}
+
+// DeleteMessages deletes the given messages from the history sync store,
+// matching on each message's key ID.
+func (hsq *HistorySyncQuery) DeleteMessages(userID id.UserID, conversationID string, messages []*waProto.WebMessageInfo) error {
+	if len(messages) == 0 {
+		// Guard against generating "DELETE ... WHERE " with an empty
+		// condition, which is a SQL syntax error.
+		return nil
+	}
+	whereClauses := []string{}
+	preparedStatementArgs := []interface{}{userID, conversationID}
+	for i, msg := range messages {
+		whereClauses = append(whereClauses, fmt.Sprintf("(user_mxid=$1 AND conversation_id=$2 AND message_id=$%d)", i+3))
+		preparedStatementArgs = append(preparedStatementArgs, msg.GetKey().GetId())
+	}
+
+	_, err := hsq.db.Exec(fmt.Sprintf(deleteMessages, strings.Join(whereClauses, " OR ")), preparedStatementArgs...)
+	return err
+}
+
+// DeleteAllMessages removes all stored history sync messages for the
+// given user.
+func (hsq *HistorySyncQuery) DeleteAllMessages(userID id.UserID) error {
+	_, err := hsq.db.Exec("DELETE FROM history_sync_message WHERE user_mxid=$1", userID)
+	return err
+}

+ 45 - 0
database/upgrades/2022-03-15-prioritized-backfill.go

@@ -0,0 +1,45 @@
+package upgrades
+
+import (
+	"database/sql"
+	"fmt"
+)
+
+func init() {
+	// Upgrade 39: create the backfill_queue table used by BackfillQuery.
+	upgrades[39] = upgrade{"Add backfill queue", func(tx *sql.Tx, ctx context) error {
+		// The queue_id needs to auto-increment every insertion. For SQLite,
+		// INTEGER PRIMARY KEY is an alias for the ROWID, so it will
+		// auto-increment. See https://sqlite.org/lang_createtable.html#rowid
+		// For Postgres, we need to add GENERATED ALWAYS AS IDENTITY for the
+		// same functionality.
+		queueIDColumnTypeModifier := ""
+		if ctx.dialect == Postgres {
+			queueIDColumnTypeModifier = "GENERATED ALWAYS AS IDENTITY"
+		}
+
+		_, err := tx.Exec(fmt.Sprintf(`
+			CREATE TABLE backfill_queue (
+				queue_id            INTEGER PRIMARY KEY %s,
+				user_mxid           TEXT,
+				type                INTEGER NOT NULL,
+				priority            INTEGER NOT NULL,
+				portal_jid          TEXT,
+				portal_receiver     TEXT,
+				time_start          TIMESTAMP,
+				time_end            TIMESTAMP,
+				max_batch_events    INTEGER NOT NULL,
+				max_total_events    INTEGER,
+				batch_delay         INTEGER,
+				completed_at        TIMESTAMP,
+
+				FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
+				FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE
+			)
+		`, queueIDColumnTypeModifier))
+		// The original checked err and then returned err again either way;
+		// a single return is equivalent.
+		return err
+	}}
+}

+ 52 - 0
database/upgrades/2022-03-18-historysync-store.go

@@ -0,0 +1,52 @@
+package upgrades
+
+import (
+	"database/sql"
+)
+
+func init() {
+	// Upgrade 40: create the tables used by HistorySyncQuery to store
+	// history sync payloads for later (deferred) backfills.
+	upgrades[40] = upgrade{"Store history syncs for later backfills", func(tx *sql.Tx, ctx context) error {
+		// Per-chat metadata from history sync payloads.
+		_, err := tx.Exec(`
+			CREATE TABLE history_sync_conversation (
+				user_mxid                       TEXT,
+				conversation_id                 TEXT,
+				portal_jid                      TEXT,
+				portal_receiver                 TEXT,
+				last_message_timestamp          TIMESTAMP,
+				archived                        BOOLEAN,
+				pinned                          INTEGER,
+				mute_end_time                   TIMESTAMP,
+				disappearing_mode               INTEGER,
+				end_of_history_transfer_type    INTEGER,
+				ephemeral_expiration            INTEGER,
+				marked_as_unread                BOOLEAN,
+				unread_count                    INTEGER,
+
+				PRIMARY KEY (user_mxid, conversation_id),
+				FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
+				FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE
+			)
+		`)
+		if err != nil {
+			return err
+		}
+		// Raw protobuf message data, keyed per conversation; cascades away
+		// when the parent conversation row is deleted.
+		_, err = tx.Exec(`
+			CREATE TABLE history_sync_message (
+				user_mxid                TEXT,
+				conversation_id          TEXT,
+				message_id               TEXT,
+				timestamp                TIMESTAMP,
+				data                     BYTEA,
+
+				PRIMARY KEY (user_mxid, conversation_id, message_id),
+				FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
+				FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE
+			)
+		`)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	}}
+}

+ 1 - 1
database/upgrades/upgrades.go

@@ -40,7 +40,7 @@ type upgrade struct {
 	fn      upgradeFunc
 }
 
-const NumberOfUpgrades = 39
+const NumberOfUpgrades = 41
 
 var upgrades [NumberOfUpgrades]upgrade
 

+ 47 - 5
example-config.yaml

@@ -115,14 +115,10 @@ bridge:
     # Should another user's cryptographic identity changing send a message to Matrix?
     identity_change_notices: false
     portal_message_buffer: 128
-    # Settings for handling history sync payloads. These settings only apply right after login,
-    # because the phone only sends the history sync data once, and there's no way to re-request it
-    # (other than logging out and back in again).
+    # Settings for handling history sync payloads.
     history_sync:
         # Should the bridge create portals for chats in the history sync payload?
         create_portals: true
-        # Maximum age of chats in seconds to create portals for. Set to 0 to create portals for all chats in sync payload.
-        max_age: 604800
         # Enable backfilling history sync payloads from WhatsApp using batch sending?
         # This requires a server with MSC2716 support, which is currently an experimental feature in synapse.
         # It can be enabled by setting experimental_features -> msc2716_enabled to true in homeserver.yaml.
@@ -137,6 +133,52 @@ bridge:
         # Should the bridge request a full sync from the phone when logging in?
         # This bumps the size of history syncs from 3 months to 1 year.
         request_full_sync: false
+        # The maximum number of initial conversations that should be synced.
+        # Other conversations will be backfilled on demand when the start PM
+        # provisioning endpoint is used or when a message comes in from that
+        # chat.
+        max_initial_conversations: 10
+        # Settings for immediate backfills. These backfills should generally be
+        # small and their main purpose is to populate each of the initial chats
+        # (as configured by max_initial_conversations) with a few messages so
+        # that you can continue conversations without losing context.
+        immediate:
+            # The number of concurrent backfill workers to create for immediate
+            # backfills. Note that using more than one worker could cause the
+            # room list to jump around since there are no guarantees about the
+            # order in which the backfills will complete.
+            worker_count: 1
+            # The maximum number of events to backfill initially.
+            max_events: 10
+        # Settings for deferred backfills. The purpose of these backfills are
+        # to fill in the rest of the chat history that was not covered by the
+        # immediate backfills. These backfills generally should happen at a
+        # slower pace so as not to overload the homeserver.
+        # Each deferred backfill config should define a "stage" of backfill
+        # (i.e. the last week of messages). The fields are as follows:
+        # - start_days_ago: the number of days ago to start backfilling from.
+        #   To indicate the start of time, use -1. For example, for a week ago,
+        #   use 7.
+        # - max_batch_events: the number of events to send per batch.
+        # - batch_delay: the number of seconds to wait before backfilling each
+        #   batch.
+        deferred:
+            # Last Week
+            - start_days_ago: 7
+              max_batch_events: 20
+              batch_delay: 5
+            # Last Month
+            - start_days_ago: 30
+              max_batch_events: 50
+              batch_delay: 10
+            # Last 3 months
+            - start_days_ago: 90
+              max_batch_events: 100
+              batch_delay: 10
+            # The start of time
+            - start_days_ago: -1
+              max_batch_events: 500
+              batch_delay: 10
     # Should puppet avatars be fetched from the server even if an avatar is already set?
     user_avatar_sync: true
     # Should Matrix users leaving groups be bridged to WhatsApp?

+ 231 - 160
historysync.go

@@ -18,8 +18,6 @@ package main
 
 import (
 	"fmt"
-	"sort"
-	"sync"
 	"time"
 
 	waProto "go.mau.fi/whatsmeow/binary/proto"
@@ -35,12 +33,6 @@ import (
 
 // region User history sync handling
 
-type portalToBackfill struct {
-	portal *Portal
-	conv   *waProto.Conversation
-	msgs   []*waProto.WebMessageInfo
-}
-
 type wrappedInfo struct {
 	*types.MessageInfo
 	Type  database.MessageType
@@ -50,107 +42,81 @@ type wrappedInfo struct {
 	ExpiresIn       uint32
 }
 
-type conversationList []*waProto.Conversation
-
-var _ sort.Interface = (conversationList)(nil)
-
-func (c conversationList) Len() int {
-	return len(c)
-}
-
-func (c conversationList) Less(i, j int) bool {
-	return getConversationTimestamp(c[i]) < getConversationTimestamp(c[j])
-}
-
-func (c conversationList) Swap(i, j int) {
-	c[i], c[j] = c[j], c[i]
-}
-
 func (user *User) handleHistorySyncsLoop() {
-	for evt := range user.historySyncs {
-		go user.sendBridgeState(BridgeState{StateEvent: StateBackfilling})
-		user.handleHistorySync(evt.Data)
-		if len(user.historySyncs) == 0 && user.IsConnected() {
-			go user.sendBridgeState(BridgeState{StateEvent: StateConnected})
-		}
+	if !user.bridge.Config.Bridge.HistorySync.Backfill {
+		return
 	}
-}
 
-func (user *User) handleHistorySync(evt *waProto.HistorySync) {
-	if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME {
-		return
+	reCheckQueue := make(chan bool, 1)
+	// Start the backfill queue.
+	user.BackfillQueue = &BackfillQueue{
+		BackfillQuery:             user.bridge.DB.BackfillQuery,
+		ImmediateBackfillRequests: make(chan *database.Backfill, 1),
+		DeferredBackfillRequests:  make(chan *database.Backfill, 1),
+		ReCheckQueue:              make(chan bool, 1),
+		log:                       user.log.Sub("BackfillQueue"),
 	}
-	description := fmt.Sprintf("type %s, %d conversations, chunk order %d, progress %d%%", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder(), evt.GetProgress())
-	user.log.Infoln("Handling history sync with", description)
+	reCheckQueue = user.BackfillQueue.ReCheckQueue
 
-	conversations := conversationList(evt.GetConversations())
-	// We want to handle recent conversations first
-	sort.Sort(sort.Reverse(conversations))
-	portalsToBackfill := make(chan portalToBackfill, len(conversations))
+	// Immediate backfills can be done in parallel
+	for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ {
+		go user.handleBackfillRequestsLoop(user.BackfillQueue.ImmediateBackfillRequests)
+	}
 
-	var backfillWait sync.WaitGroup
-	backfillWait.Add(1)
-	go user.backfillLoop(portalsToBackfill, backfillWait.Done)
-	for _, conv := range conversations {
-		user.handleHistorySyncConversation(conv, portalsToBackfill)
+	// Deferred backfills should be handled synchronously so as not to
+	// overload the homeserver. Users can configure their backfill stages
+	// to be more or less aggressive with backfilling at this stage.
+	go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests)
+	go user.BackfillQueue.RunLoops(user)
+
+	// Always save the history syncs for the user. If they want to enable
+	// backfilling in the future, we will have it in the database.
+	for evt := range user.historySyncs {
+		user.handleHistorySync(reCheckQueue, evt.Data)
 	}
-	close(portalsToBackfill)
-	backfillWait.Wait()
-	user.log.Infoln("Finished handling history sync with", description)
 }
 
-func (user *User) backfillLoop(ch chan portalToBackfill, done func()) {
-	defer done()
-	for ptb := range ch {
-		if len(ptb.msgs) > 0 {
-			user.log.Debugln("Bridging history sync payload for", ptb.portal.Key.JID)
-			ptb.portal.backfill(user, ptb.msgs)
-		} else {
-			user.log.Debugfln("Not backfilling %s: no bridgeable messages found", ptb.portal.Key.JID)
+// handleBackfillRequestsLoop drains backfill requests from the given channel
+// and processes them one at a time for this user.
+func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) {
+	for req := range backfillRequests {
+		user.log.Infof("Backfill request: %v", req)
+		conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal)
+		if conv == nil {
+			user.log.Errorf("Could not find conversation for %s in %s", user.MXID, req.Portal.String())
+			continue
 		}
-		if !ptb.conv.GetMarkedAsUnread() && ptb.conv.GetUnreadCount() == 0 {
-			user.markSelfReadFull(ptb.portal)
+
+		// Update the client store with basic chat settings.
+		// NOTE(review): errors from these Put* calls are silently discarded
+		// (the removed code at least made that explicit with `_ =`).
+		if conv.MuteEndTime.After(time.Now()) {
+			user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime)
+		}
+		if conv.Archived {
+			user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true)
+		}
+		if conv.Pinned > 0 {
+			user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true)
 		}
-	}
-}
 
-func (user *User) handleHistorySyncConversation(conv *waProto.Conversation, portalsToBackfill chan portalToBackfill) {
-	jid, err := types.ParseJID(conv.GetId())
-	if err != nil {
-		user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
-		return
-	}
+		// Keep the portal's disappearing-message timer in sync with the
+		// stored conversation settings.
+		portal := user.GetPortalByJID(conv.PortalKey.JID)
+		if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration {
+			portal.ExpirationTime = *conv.EphemeralExpiration
+			portal.Update()
+		}
 
-	// Update the client store with basic chat settings.
-	muteEnd := time.Unix(int64(conv.GetMuteEndTime()), 0)
-	if muteEnd.After(time.Now()) {
-		_ = user.Client.Store.ChatSettings.PutMutedUntil(jid, muteEnd)
-	}
-	if conv.GetArchived() {
-		_ = user.Client.Store.ChatSettings.PutArchived(jid, true)
-	}
-	if conv.GetPinned() > 0 {
-		_ = user.Client.Store.ChatSettings.PutPinned(jid, true)
+		user.createOrUpdatePortalAndBackfillWithLock(req, conv, portal)
 	}
+}
+
+// createOrUpdatePortalAndBackfillWithLock creates or updates the Matrix room
+// for the conversation and bridges the stored history sync messages in
+// batches, all while holding the portal's backfill lock.
+func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill, conv *database.HistorySyncConversation, portal *Portal) {
+	portal.backfillLock.Lock()
+	defer portal.backfillLock.Unlock()
 
-	portal := user.GetPortalByJID(jid)
-	if conv.EphemeralExpiration != nil && portal.ExpirationTime != conv.GetEphemeralExpiration() {
-		portal.ExpirationTime = conv.GetEphemeralExpiration()
-		portal.Update()
-	}
-	// Check if portal is too old or doesn't contain anything we can bridge.
 	if !user.shouldCreatePortalForHistorySync(conv, portal) {
 		return
 	}
 
-	var msgs []*waProto.WebMessageInfo
-	if user.bridge.Config.Bridge.HistorySync.Backfill {
-		msgs = filterMessagesToBackfill(conv.GetMessages())
-	}
-	ptb := portalToBackfill{portal: portal, conv: conv, msgs: msgs}
 	if len(portal.MXID) == 0 {
 		user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
-		err = portal.CreateMatrixRoom(user, getPartialInfoFromConversation(jid, conv), false)
+		err := portal.CreateMatrixRoom(user, nil, true, false)
 		if err != nil {
 			user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err)
 			return
@@ -158,26 +124,53 @@ func (user *User) handleHistorySyncConversation(conv *waProto.Conversation, port
 	} else {
 		portal.UpdateMatrixRoom(user, nil)
 	}
-	if !user.bridge.Config.Bridge.HistorySync.Backfill {
-		user.log.Debugln("Backfill is disabled, not bridging history sync payload for", portal.Key.JID)
+
+	allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents)
+
+	if len(allMsgs) > 0 {
+		user.log.Debugf("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents)
+		toBackfill := allMsgs[0:]
+		insertionEventIds := []id.EventID{}
+		// Batches are sliced off the end of the message list, so the tail
+		// of allMsgs is bridged first (assumes GetMessagesBetween returns
+		// messages in a fixed chronological order — TODO confirm).
+		for {
+			if len(toBackfill) == 0 {
+				break
+			}
+
+			var msgs []*waProto.WebMessageInfo
+			if len(toBackfill) <= req.MaxBatchEvents {
+				msgs = toBackfill
+				toBackfill = toBackfill[0:0]
+			} else {
+				msgs = toBackfill[len(toBackfill)-req.MaxBatchEvents:]
+				toBackfill = toBackfill[:len(toBackfill)-req.MaxBatchEvents]
+			}
+
+			if len(msgs) > 0 {
+				// NOTE(review): this delay also applies before the very
+				// first batch, not only between batches.
+				time.Sleep(time.Duration(req.BatchDelay) * time.Second)
+				user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID)
+				insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...)
+			}
+		}
+		user.log.Debugf("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID)
+		if len(insertionEventIds) > 0 {
+			// The marker points at the insertion event of the first batch
+			// that was sent.
+			portal.sendPostBackfillDummy(
+				time.Unix(int64(allMsgs[len(allMsgs)-1].GetMessageTimestamp()), 0),
+				insertionEventIds[0])
+		}
+		// Backfilled messages are no longer needed in the staging table.
+		user.log.Debugf("Deleting %d history sync messages after backfilling", len(allMsgs))
+		err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs)
+		if err != nil {
+			user.log.Warnf("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err)
+		}
 	} else {
-		portalsToBackfill <- ptb
+		user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID)
 	}
-}
-
-func getConversationTimestamp(conv *waProto.Conversation) uint64 {
-	convTs := conv.GetConversationTimestamp()
-	if convTs == 0 && len(conv.GetMessages()) > 0 {
-		convTs = conv.Messages[0].GetMessage().GetMessageTimestamp()
+	if !conv.MarkedAsUnread && conv.UnreadCount == 0 {
+		user.markSelfReadFull(portal)
 	}
-	return convTs
 }
 
-func (user *User) shouldCreatePortalForHistorySync(conv *waProto.Conversation, portal *Portal) bool {
-	maxAge := user.bridge.Config.Bridge.HistorySync.MaxAge
-	minLastMsgToCreate := time.Now().Add(-time.Duration(maxAge) * time.Second)
-	lastMsg := time.Unix(int64(getConversationTimestamp(conv)), 0)
-
+// shouldCreatePortalForHistorySync reports whether a Matrix room should exist
+// for the conversation: true when the portal already exists (the user is
+// re-invited) or when creating portals from history syncs is enabled.
+func (user *User) shouldCreatePortalForHistorySync(conv *database.HistorySyncConversation, portal *Portal) bool {
 	if len(portal.MXID) > 0 {
 		user.log.Debugfln("Portal for %s already exists, ensuring user is invited", portal.Key.JID)
 		portal.ensureUserInvited(user)
@@ -185,10 +178,6 @@ func (user *User) shouldCreatePortalForHistorySync(conv *waProto.Conversation, p
 		return true
 	} else if !user.bridge.Config.Bridge.HistorySync.CreatePortals {
 		user.log.Debugfln("Not creating portal for %s: creating rooms from history sync is disabled", portal.Key.JID)
-	} else if !containsSupportedMessages(conv) {
-		user.log.Debugfln("Not creating portal for %s: no interesting messages found", portal.Key.JID)
-	} else if maxAge > 0 && !lastMsg.After(minLastMsgToCreate) {
-		user.log.Debugfln("Not creating portal for %s: last message older than limit (%s)", portal.Key.JID, lastMsg)
 	} else {
 		// Portal doesn't exist, but should be created
 		return true
@@ -197,47 +186,112 @@ func (user *User) shouldCreatePortalForHistorySync(conv *waProto.Conversation, p
 	return false
 }
 
-func filterMessagesToBackfill(messages []*waProto.HistorySyncMsg) []*waProto.WebMessageInfo {
-	filtered := make([]*waProto.WebMessageInfo, 0, len(messages))
-	for _, msg := range messages {
-		wmi := msg.GetMessage()
-		msgType := getMessageType(wmi.GetMessage())
-		if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" {
+// handleHistorySync stores the conversations and bridgeable messages from a
+// history sync event in the database, then enqueues backfills for the most
+// recent conversations and pokes the queue via reCheckQueue.
+func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.HistorySync) {
+	if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME {
+		return
+	}
+	description := fmt.Sprintf("type %s, %d conversations, chunk order %d", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder())
+	user.log.Infoln("Storing history sync with", description)
+
+	for _, conv := range evt.GetConversations() {
+		jid, err := types.ParseJID(conv.GetId())
+		if err != nil {
+			user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
 			continue
-		} else {
-			filtered = append(filtered, wmi)
+		}
+		portal := user.GetPortalByJID(jid)
+
+		// Upsert the conversation metadata (mute/archive/pin state etc.)
+		// so later backfill requests can look it up.
+		historySyncConversation := user.bridge.DB.HistorySyncQuery.NewConversationWithValues(
+			user.MXID,
+			conv.GetId(),
+			&portal.Key,
+			getConversationTimestamp(conv),
+			conv.GetMuteEndTime(),
+			conv.GetArchived(),
+			conv.GetPinned(),
+			conv.GetDisappearingMode().GetInitiator(),
+			conv.GetEndOfHistoryTransferType(),
+			conv.EphemeralExpiration,
+			conv.GetMarkedAsUnread(),
+			conv.GetUnreadCount())
+		historySyncConversation.Upsert()
+
+		for _, msg := range conv.GetMessages() {
+			// Don't store messages that will just be skipped.
+			wmi := msg.GetMessage()
+			msgType := getMessageType(wmi.GetMessage())
+			if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" {
+				continue
+			}
+
+			// Don't store unsupported messages.
+			if !containsSupportedMessage(msg.GetMessage().GetMessage()) {
+				continue
+			}
+
+			message, err := user.bridge.DB.HistorySyncQuery.NewMessageWithValues(user.MXID, conv.GetId(), msg.Message.GetKey().GetId(), msg)
+			if err != nil {
+				user.log.Warnf("Failed to save message %s in %s. Error: %+v", msg.Message.Key.Id, conv.GetId(), err)
+				continue
+			}
+			message.Insert()
 		}
 	}
-	return filtered
-}
 
-func containsSupportedMessages(conv *waProto.Conversation) bool {
-	for _, msg := range conv.GetMessages() {
-		if containsSupportedMessage(msg.GetMessage().GetMessage()) {
-			return true
+	// If this was the initial bootstrap, enqueue immediate backfills for the
+	// most recent portals. If it's the last history sync event, start
+	// backfilling the rest of the history of the portals.
+	if user.bridge.Config.Bridge.HistorySync.Backfill && (evt.GetSyncType() == waProto.HistorySync_INITIAL_BOOTSTRAP || evt.GetSyncType() == waProto.HistorySync_FULL || evt.GetSyncType() == waProto.HistorySync_RECENT) {
+		nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations)
+		for i, conv := range nMostRecent {
+			jid, err := types.ParseJID(conv.ConversationID)
+			if err != nil {
+				user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err)
+				continue
+			}
+			portal := user.GetPortalByJID(jid)
+
+			switch evt.GetSyncType() {
+			case waProto.HistorySync_INITIAL_BOOTSTRAP:
+				// Enqueue immediate backfills for the most recent messages first.
+				user.EnqueueImmedateBackfill(portal, i)
+			case waProto.HistorySync_FULL, waProto.HistorySync_RECENT:
+				// 99%+ progress is treated as "the sync is finished".
+				if evt.GetProgress() >= 99 {
+					// Enqueue deferred backfills as configured.
+					user.EnqueueDeferredBackfills(portal, len(nMostRecent), i)
+				}
+			}
 		}
+
+		// Tell the queue to check for new backfill requests.
+		reCheckQueue <- true
 	}
-	return false
 }
 
-func getPartialInfoFromConversation(jid types.JID, conv *waProto.Conversation) *types.GroupInfo {
-	// TODO broadcast list info?
-	if jid.Server != types.GroupServer {
-		return nil
+// getConversationTimestamp returns the conversation-level timestamp, falling
+// back to the timestamp of the first entry in Messages when it is unset
+// (presumably the most recent message — TODO confirm ordering).
+func getConversationTimestamp(conv *waProto.Conversation) uint64 {
+	convTs := conv.GetConversationTimestamp()
+	if convTs == 0 && len(conv.GetMessages()) > 0 {
+		convTs = conv.Messages[0].GetMessage().GetMessageTimestamp()
 	}
-	participants := make([]types.GroupParticipant, len(conv.GetParticipant()))
-	for i, pcp := range conv.GetParticipant() {
-		participantJID, _ := types.ParseJID(pcp.GetUserJid())
-		participants[i] = types.GroupParticipant{
-			JID:          participantJID,
-			IsAdmin:      pcp.GetRank() == waProto.GroupParticipant_ADMIN,
-			IsSuperAdmin: pcp.GetRank() == waProto.GroupParticipant_SUPERADMIN,
+	return convTs
+}
+
+// EnqueueImmedateBackfill queues a high-priority backfill of the most recent
+// messages in the portal. The immediate max_events setting is used both as
+// the batch size and the total message cap, with no delay between batches.
+// NOTE(review): "Immedate" is a typo for "Immediate", but renaming this
+// exported method would break its callers elsewhere in the bridge.
+func (user *User) EnqueueImmedateBackfill(portal *Portal, priority int) {
+	maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents
+	initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
+	initialBackfill.Insert()
+}
+
+// EnqueueDeferredBackfills queues one low-priority backfill per configured
+// deferred stage. A stage with start_days_ago <= 0 leaves the start date nil,
+// which reaches back to the beginning of time.
+func (user *User) EnqueueDeferredBackfills(portal *Portal, numConversations, priority int) {
+	for j, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred {
+		// Pointer zero value is already nil; no explicit "= nil" needed.
+		var startDate *time.Time
+		if backfillStage.StartDaysAgo > 0 {
+			startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
+			startDate = &startDaysAgo
+		}
-	}
-	return &types.GroupInfo{
-		JID:          jid,
-		GroupName:    types.GroupName{Name: conv.GetName()},
-		Participants: participants,
+		// Priorities interleave so every conversation finishes stage j
+		// before any conversation starts stage j+1.
+		backfill := user.bridge.DB.BackfillQuery.NewWithValues(
+			user.MXID, database.BackfillDeferred, j*numConversations+priority, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
+		backfill.Insert()
 	}
 }
 
@@ -246,14 +300,15 @@ func getPartialInfoFromConversation(jid types.JID, conv *waProto.Conversation) *
 
 var (
 	PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType}
-	BackfillEndDummyEvent    = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType}
 	PreBackfillDummyEvent    = event.Type{Type: "fi.mau.dummy.pre_backfill", Class: event.MessageEventType}
-)
 
-func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) {
-	portal.backfillLock.Lock()
-	defer portal.backfillLock.Unlock()
+	// Marker events for when a backfill finishes. Both the "m.room.marker"
+	// name and the unstable MSC2716-prefixed name are sent, presumably for
+	// compatibility with clients watching either type — TODO confirm.
+	BackfillEndDummyEvent = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType}
+	RoomMarker            = event.Type{Type: "m.room.marker", Class: event.MessageEventType}
+	MSC2716Marker         = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType}
+)
 
+// backfill bridges the given WhatsApp messages into the portal room via
+// MSC2716 batch sending and returns the insertion event IDs of the batches
+// that were sent successfully.
+func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) []id.EventID {
 	var historyBatch, newBatch mautrix.ReqBatchSend
 	var historyBatchInfos, newBatchInfos []*wrappedInfo
 
@@ -375,32 +430,33 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo)
 		}
 	}
 
+	insertionEventIds := []id.EventID{}
+
 	if len(historyBatch.Events) > 0 && len(historyBatch.PrevEventID) > 0 {
 		portal.log.Infofln("Sending %d historical messages...", len(historyBatch.Events))
 		historyResp, err := portal.MainIntent().BatchSend(portal.MXID, &historyBatch)
 		if err != nil {
 			portal.log.Errorln("Error sending batch of historical messages:", err)
 		} else {
+			// Only read the response after the error check: BatchSend
+			// returns a nil response on failure, so touching
+			// BaseInsertionEventID before checking err would panic.
+			insertionEventIds = append(insertionEventIds, historyResp.BaseInsertionEventID)
 			portal.finishBatch(historyResp.EventIDs, historyBatchInfos)
 			portal.NextBatchID = historyResp.NextBatchID
 			portal.Update()
-			// If batchID is non-empty, it means this is backfilling very old messages, and we don't need a post-backfill dummy.
-			if historyBatch.BatchID == "" {
-				portal.sendPostBackfillDummy(time.UnixMilli(historyBatch.Events[len(historyBatch.Events)-1].Timestamp))
-			}
 		}
 	}
 
 	if len(newBatch.Events) > 0 && len(newBatch.PrevEventID) > 0 {
 		portal.log.Infofln("Sending %d new messages...", len(newBatch.Events))
 		newResp, err := portal.MainIntent().BatchSend(portal.MXID, &newBatch)
 		if err != nil {
 			portal.log.Errorln("Error sending batch of new messages:", err)
 		} else {
+			// Same as above: newResp is only valid when err == nil.
+			insertionEventIds = append(insertionEventIds, newResp.BaseInsertionEventID)
 			portal.finishBatch(newResp.EventIDs, newBatchInfos)
-			portal.sendPostBackfillDummy(time.UnixMilli(newBatch.Events[len(newBatch.Events)-1].Timestamp))
 		}
 	}
+
+	return insertionEventIds
 }
 
 func (portal *Portal) parseWebMessageInfo(source *User, webMsg *waProto.WebMessageInfo) *types.MessageInfo {
@@ -478,6 +534,15 @@ func (portal *Portal) wrapBatchEvent(info *types.MessageInfo, intent *appservice
 	if err != nil {
 		return nil, err
 	}
+
+	if eventType == event.EventEncrypted {
+		// Clear other custom keys if the event was encrypted, but keep the double puppet identifier
+		// NOTE(review): this replaces Raw wholesale, dropping any other
+		// cleartext keys set earlier — confirm none are needed afterwards.
+		wrappedContent.Raw = map[string]interface{}{backfillIDField: info.ID}
+		if intent.IsCustomPuppet {
+			wrappedContent.Raw[doublePuppetKey] = doublePuppetValue
+		}
+	}
+
 	return &event.Event{
 		Sender:    intent.UserID,
 		Type:      newEventType,
@@ -530,19 +595,25 @@ func (portal *Portal) finishBatchEvt(info *wrappedInfo, eventID id.EventID) {
 	}
 }
 
-func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time) {
-	resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, BackfillEndDummyEvent, struct{}{})
-	if err != nil {
-		portal.log.Errorln("Error sending post-backfill dummy event:", err)
-		return
+// sendPostBackfillDummy sends one event of each marker type (the legacy
+// dummy, m.room.marker, and the unstable MSC2716 name) pointing at the given
+// insertion event, recording each in the message table. A send error aborts
+// the remaining marker types.
+func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time, insertionEventId id.EventID) {
+	for _, evtType := range []event.Type{BackfillEndDummyEvent, RoomMarker, MSC2716Marker} {
+		resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, evtType, map[string]interface{}{
+			"org.matrix.msc2716.marker.insertion": insertionEventId,
+			"m.marker.insertion":                  insertionEventId,
+		})
+		if err != nil {
+			portal.log.Errorln("Error sending post-backfill dummy event:", err)
+			return
+		}
+		// Store the marker as a message dated one second after the last
+		// backfilled message so later events sort after it.
+		msg := portal.bridge.DB.Message.New()
+		msg.Chat = portal.Key
+		msg.MXID = resp.EventID
+		msg.JID = types.MessageID(resp.EventID)
+		msg.Timestamp = lastTimestamp.Add(1 * time.Second)
+		msg.Sent = true
+		msg.Insert()
+
 	}
-	msg := portal.bridge.DB.Message.New()
-	msg.Chat = portal.Key
-	msg.MXID = resp.EventID
-	msg.JID = types.MessageID(resp.EventID)
-	msg.Timestamp = lastTimestamp.Add(1 * time.Second)
-	msg.Sent = true
-	msg.Insert()
 }
 
 // endregion

+ 19 - 9
portal.go

@@ -237,7 +237,7 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) {
 			return
 		}
 		portal.log.Debugln("Creating Matrix room from incoming message")
-		err := portal.CreateMatrixRoom(msg.source, nil, false)
+		err := portal.CreateMatrixRoom(msg.source, nil, false, true)
 		if err != nil {
 			portal.log.Errorln("Failed to create portal room:", err)
 			return
@@ -1164,7 +1164,7 @@ func (portal *Portal) UpdateBridgeInfo() {
 	}
 }
 
-func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, isFullInfo bool) error {
+// The new backfill parameter controls whether creating the room should also
+// enqueue history backfills for it.
+func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, isFullInfo, backfill bool) error {
 	portal.roomCreateLock.Lock()
 	defer portal.roomCreateLock.Unlock()
 	if len(portal.MXID) > 0 {
@@ -1337,6 +1337,12 @@
 		portal.FirstEventID = firstEventResp.EventID
 		portal.Update()
 	}
+
+	if user.bridge.Config.Bridge.HistorySync.Backfill && backfill {
+		// NOTE(review): user.BackfillQueue is initialized asynchronously by
+		// the handleHistorySyncsLoop goroutine; if this runs before that
+		// goroutine has set it, the send below panics on a nil queue —
+		// confirm the ordering is guaranteed.
+		user.EnqueueImmedateBackfill(portal, 0)
+		user.EnqueueDeferredBackfills(portal, 1, 0)
+		user.BackfillQueue.ReCheckQueue <- true
+	}
 	return nil
 }
 
@@ -1895,7 +1901,7 @@ func shallowCopyMap(data map[string]interface{}) map[string]interface{} {
 	return newMap
 }
 
-func (portal *Portal) makeMediaBridgeFailureMessage(info *types.MessageInfo, bridgeErr error, converted *ConvertedMessage, keys *FailedMediaKeys) *ConvertedMessage {
+// makeMediaBridgeFailureMessage turns a failed media conversion into a notice
+// message. userFriendlyError overrides the notice text; when empty, a generic
+// message built from bridgeErr is used instead.
+func (portal *Portal) makeMediaBridgeFailureMessage(info *types.MessageInfo, bridgeErr error, converted *ConvertedMessage, keys *FailedMediaKeys, userFriendlyError string) *ConvertedMessage {
 	portal.log.Errorfln("Failed to bridge media for %s: %v", info.ID, bridgeErr)
 	if keys != nil {
 		meta := &FailedMediaMeta{
@@ -1908,9 +1914,13 @@ func (portal *Portal) makeMediaBridgeFailureMessage(info *types.MessageInfo, bri
 		portal.mediaErrorCache[info.ID] = meta
 	}
 	converted.Type = event.EventMessage
+	// Fall back to the generic error text when no override was given.
+	body := userFriendlyError
+	if body == "" {
+		body = fmt.Sprintf("Failed to bridge media: %v", bridgeErr)
+	}
 	converted.Content = &event.MessageEventContent{
 		MsgType: event.MsgNotice,
-		Body:    fmt.Sprintf("Failed to bridge media: %v", bridgeErr),
+		Body:    body,
 	}
 	return converted
 }
@@ -2159,24 +2169,24 @@ func (portal *Portal) convertMediaMessage(intent *appservice.IntentAPI, source *
 			Type:      whatsmeow.GetMediaType(msg),
 			SHA256:    msg.GetFileSha256(),
 			EncSHA256: msg.GetFileEncSha256(),
+			// The string below is the user-visible placeholder shown for
+			// media that could not be fetched yet.
-		})
+		}, "Old photo or attachment. This will sync in a future update.")
 	} else if errors.Is(err, whatsmeow.ErrNoURLPresent) {
 		portal.log.Debugfln("No URL present error for media message %s, ignoring...", info.ID)
 		return nil
 	} else if errors.Is(err, whatsmeow.ErrFileLengthMismatch) || errors.Is(err, whatsmeow.ErrInvalidMediaSHA256) {
 		portal.log.Warnfln("Mismatching media checksums in %s: %v. Ignoring because WhatsApp seems to ignore them too", info.ID, err)
 	} else if err != nil {
+		// Empty string = use the generic "Failed to bridge media" text.
-		return portal.makeMediaBridgeFailureMessage(info, err, converted, nil)
+		return portal.makeMediaBridgeFailureMessage(info, err, converted, nil, "")
 	}
 
 	err = portal.uploadMedia(intent, data, converted.Content)
 	if err != nil {
 		if errors.Is(err, mautrix.MTooLarge) {
-			return portal.makeMediaBridgeFailureMessage(info, errors.New("homeserver rejected too large file"), converted, nil)
+			return portal.makeMediaBridgeFailureMessage(info, errors.New("homeserver rejected too large file"), converted, nil, "")
 		} else if httpErr, ok := err.(mautrix.HTTPError); ok && httpErr.IsStatus(413) {
-			return portal.makeMediaBridgeFailureMessage(info, errors.New("proxy rejected too large file"), converted, nil)
+			return portal.makeMediaBridgeFailureMessage(info, errors.New("proxy rejected too large file"), converted, nil, "")
 		} else {
-			return portal.makeMediaBridgeFailureMessage(info, fmt.Errorf("failed to upload media: %w", err), converted, nil)
+			return portal.makeMediaBridgeFailureMessage(info, fmt.Errorf("failed to upload media: %w", err), converted, nil, "")
 		}
 	}
 	return converted

+ 1 - 1
provisioning.go

@@ -346,7 +346,7 @@ func (prov *ProvisioningAPI) OpenGroup(w http.ResponseWriter, r *http.Request) {
 		portal := user.GetPortalByJID(info.JID)
 		status := http.StatusOK
 		if len(portal.MXID) == 0 {
-			err = portal.CreateMatrixRoom(user, info, true)
+			// Final argument: also enqueue history backfill for the new room.
+			err = portal.CreateMatrixRoom(user, info, true, true)
 			if err != nil {
 				jsonResponse(w, http.StatusInternalServerError, Error{
 					Error: fmt.Sprintf("Failed to create portal: %v", err),

+ 16 - 5
user.go

@@ -74,6 +74,8 @@ type User struct {
 	groupListCache     []*types.GroupInfo
 	groupListCacheLock sync.Mutex
 	groupListCacheTime time.Time
+
+	// BackfillQueue is initialized by handleHistorySyncsLoop; it stays nil
+	// while history sync backfill is disabled (or before that loop runs).
+	BackfillQueue *BackfillQueue
 }
 
 func (bridge *Bridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User {
@@ -186,7 +188,9 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User {
 	user.RelayWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelayWhitelisted(user.MXID)
 	user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
 	user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
-	go user.handleHistorySyncsLoop()
+	// Only spin up the history sync loop when backfill is enabled.
+	if user.bridge.Config.Bridge.HistorySync.Backfill {
+		go user.handleHistorySyncsLoop()
+	}
 	return user
 }
 
@@ -410,6 +414,11 @@ func (user *User) DeleteSession() {
 		user.JID = types.EmptyJID
 		user.Update()
 	}
+
+	// Delete all of the backfill and history sync data, so that a future
+	// relogin starts from a clean slate.
+	user.bridge.DB.BackfillQuery.DeleteAll(user.MXID)
+	user.bridge.DB.HistorySyncQuery.DeleteAllConversations(user.MXID)
+	user.bridge.DB.HistorySyncQuery.DeleteAllMessages(user.MXID)
 }
 
 func (user *User) IsConnected() bool {
@@ -685,7 +694,9 @@ func (user *User) HandleEvent(event interface{}) {
 		portal := user.GetPortalByMessageSource(v.Info.MessageSource)
 		portal.messages <- PortalMessage{undecryptable: v, source: user}
 	case *events.HistorySync:
-		user.historySyncs <- v
+		// NOTE(review): history syncs are dropped entirely while backfill is
+		// disabled, so they will not be available if backfill is enabled
+		// later — confirm that is intended.
+		if user.bridge.Config.Bridge.HistorySync.Backfill {
+			user.historySyncs <- v
+		}
 	case *events.Mute:
 		portal := user.GetPortalByJID(v.JID)
 		if portal != nil {
@@ -942,7 +953,7 @@ func (user *User) ResyncGroups(createPortals bool) error {
 		portal := user.GetPortalByJID(group.JID)
 		if len(portal.MXID) == 0 {
 			if createPortals {
-				err = portal.CreateMatrixRoom(user, group, true)
+				err = portal.CreateMatrixRoom(user, group, true, true)
 				if err != nil {
 					return fmt.Errorf("failed to create room for %s: %w", group.JID, err)
 				}
@@ -1025,7 +1036,7 @@ func (user *User) markSelfReadFull(portal *Portal) {
 func (user *User) handleGroupCreate(evt *events.JoinedGroup) {
 	portal := user.GetPortalByJID(evt.JID)
 	if len(portal.MXID) == 0 {
-		err := portal.CreateMatrixRoom(user, &evt.GroupInfo, true)
+		err := portal.CreateMatrixRoom(user, &evt.GroupInfo, true, true)
 		if err != nil {
 			user.log.Errorln("Failed to create Matrix room after join notification: %v", err)
+			// NOTE(review): Errorln does no formatting — the %v above is
+			// printed literally; Errorfln would be the right call.
 		}
@@ -1093,7 +1104,7 @@ func (user *User) StartPM(jid types.JID, reason string) (*Portal, *Puppet, bool,
 			return portal, puppet, false, nil
 		}
 	}
-	err := portal.CreateMatrixRoom(user, nil, false)
+	err := portal.CreateMatrixRoom(user, nil, false, true)
 	return portal, puppet, true, err
 }