|
@@ -29,7 +29,6 @@ import (
|
|
|
|
|
|
"maunium.net/go/mautrix"
|
|
|
"maunium.net/go/mautrix/appservice"
|
|
|
- "maunium.net/go/mautrix/bridge/bridgeconfig"
|
|
|
"maunium.net/go/mautrix/event"
|
|
|
"maunium.net/go/mautrix/id"
|
|
|
"maunium.net/go/mautrix/util/dbutil"
|
|
@@ -242,8 +241,8 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
|
|
portal.backfillLock.Lock()
|
|
|
defer portal.backfillLock.Unlock()
|
|
|
|
|
|
- if !user.shouldCreatePortalForHistorySync(conv, portal) {
|
|
|
- return
|
|
|
+ if len(portal.MXID) > 0 && !user.bridge.AS.StateStore.IsInRoom(portal.MXID, user.MXID) {
|
|
|
+ portal.ensureUserInvited(user)
|
|
|
}
|
|
|
|
|
|
backfillState := user.bridge.DB.Backfill.GetBackfillState(user.MXID, &portal.Key)
|
|
@@ -253,19 +252,17 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
|
|
backfillState.SetProcessingBatch(true)
|
|
|
defer backfillState.SetProcessingBatch(false)
|
|
|
|
|
|
- var forwardPrevID id.EventID
|
|
|
var timeEnd *time.Time
|
|
|
- var isLatestEvents, shouldMarkAsRead, shouldAtomicallyMarkAsRead bool
|
|
|
+ var forward, shouldMarkAsRead bool
|
|
|
portal.latestEventBackfillLock.Lock()
|
|
|
if req.BackfillType == database.BackfillForward {
|
|
|
// TODO this overrides the TimeStart set when enqueuing the backfill
|
|
|
// maybe the enqueue should instead include the prev event ID
|
|
|
lastMessage := portal.bridge.DB.Message.GetLastInChat(portal.Key)
|
|
|
- forwardPrevID = lastMessage.MXID
|
|
|
start := lastMessage.Timestamp.Add(1 * time.Second)
|
|
|
req.TimeStart = &start
|
|
|
// Sending events at the end of the room (= latest events)
|
|
|
- isLatestEvents = true
|
|
|
+ forward = true
|
|
|
} else {
|
|
|
firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key)
|
|
|
if firstMessage != nil {
|
|
@@ -274,10 +271,10 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
|
|
user.log.Debugfln("Limiting backfill to end at %v", end)
|
|
|
} else {
|
|
|
// Portal is empty -> events are latest
|
|
|
- isLatestEvents = true
|
|
|
+ forward = true
|
|
|
}
|
|
|
}
|
|
|
- if !isLatestEvents {
|
|
|
+ if !forward {
|
|
|
// We'll use normal batch sending, so no need to keep blocking new message processing
|
|
|
portal.latestEventBackfillLock.Unlock()
|
|
|
} else {
|
|
@@ -288,7 +285,6 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
|
|
isUnread := conv.MarkedAsUnread || conv.UnreadCount > 0
|
|
|
isTooOld := user.bridge.Config.Bridge.HistorySync.UnreadHoursThreshold > 0 && conv.LastMessageTimestamp.Before(time.Now().Add(time.Duration(-user.bridge.Config.Bridge.HistorySync.UnreadHoursThreshold)*time.Hour))
|
|
|
shouldMarkAsRead = !isUnread || isTooOld
|
|
|
- shouldAtomicallyMarkAsRead = shouldMarkAsRead && user.bridge.Config.Homeserver.Software == bridgeconfig.SoftwareHungry
|
|
|
}
|
|
|
allMsgs := user.bridge.DB.HistorySync.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, timeEnd, req.MaxTotalEvents)
|
|
|
|
|
@@ -347,7 +343,6 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
|
|
|
|
|
user.log.Infofln("Backfilling %d messages in %s, %d messages at a time (queue ID: %d)", len(allMsgs), portal.Key.JID, req.MaxBatchEvents, req.QueueID)
|
|
|
toBackfill := allMsgs[0:]
|
|
|
- var insertionEventIds []id.EventID
|
|
|
for len(toBackfill) > 0 {
|
|
|
var msgs []*waProto.WebMessageInfo
|
|
|
if len(toBackfill) <= req.MaxBatchEvents || req.MaxBatchEvents < 0 {
|
|
@@ -361,19 +356,10 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
|
|
if len(msgs) > 0 {
|
|
|
time.Sleep(time.Duration(req.BatchDelay) * time.Second)
|
|
|
user.log.Debugfln("Backfilling %d messages in %s (queue ID: %d)", len(msgs), portal.Key.JID, req.QueueID)
|
|
|
- resp := portal.backfill(user, msgs, req.BackfillType == database.BackfillForward, isLatestEvents, shouldAtomicallyMarkAsRead, forwardPrevID)
|
|
|
- if resp != nil && (resp.BaseInsertionEventID != "" || !isLatestEvents) {
|
|
|
- insertionEventIds = append(insertionEventIds, resp.BaseInsertionEventID)
|
|
|
- }
|
|
|
+ portal.backfill(user, msgs, forward, shouldMarkAsRead)
|
|
|
}
|
|
|
}
|
|
|
user.log.Debugfln("Finished backfilling %d messages in %s (queue ID: %d)", len(allMsgs), portal.Key.JID, req.QueueID)
|
|
|
- if len(insertionEventIds) > 0 {
|
|
|
- portal.sendPostBackfillDummy(
|
|
|
- time.Unix(int64(allMsgs[0].GetMessageTimestamp()), 0),
|
|
|
- insertionEventIds[0])
|
|
|
- }
|
|
|
- user.log.Debugfln("Deleting %d history sync messages after backfilling (queue ID: %d)", len(allMsgs), req.QueueID)
|
|
|
err := user.bridge.DB.HistorySync.DeleteMessages(user.MXID, conv.ConversationID, allMsgs)
|
|
|
if err != nil {
|
|
|
user.log.Warnfln("Failed to delete %d history sync messages after backfilling (queue ID: %d): %v", len(allMsgs), req.QueueID, err)
|
|
@@ -399,30 +385,6 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
|
|
backfillState.Upsert()
|
|
|
portal.updateBackfillStatus(backfillState)
|
|
|
}
|
|
|
-
|
|
|
- if isLatestEvents && !shouldAtomicallyMarkAsRead {
|
|
|
- if shouldMarkAsRead {
|
|
|
- user.markSelfReadFull(portal)
|
|
|
- } else if conv.MarkedAsUnread && user.bridge.Config.Bridge.SyncManualMarkedUnread {
|
|
|
- user.markUnread(portal, true)
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func (user *User) shouldCreatePortalForHistorySync(conv *database.HistorySyncConversation, portal *Portal) bool {
|
|
|
- if len(portal.MXID) > 0 {
|
|
|
- if !user.bridge.AS.StateStore.IsInRoom(portal.MXID, user.MXID) {
|
|
|
- portal.ensureUserInvited(user)
|
|
|
- }
|
|
|
- // Portal exists, let backfill continue
|
|
|
- return true
|
|
|
- } else if !user.bridge.Config.Bridge.HistorySync.CreatePortals {
|
|
|
- user.log.Debugfln("Not creating portal for %s: creating rooms from history sync is disabled", portal.Key.JID)
|
|
|
- return false
|
|
|
- } else {
|
|
|
- // Portal doesn't exist, but should be created
|
|
|
- return true
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
func (user *User) storeHistorySync(evt *waProto.HistorySync) {
|
|
@@ -627,69 +589,20 @@ func (portal *Portal) deterministicEventID(sender types.JID, messageID types.Mes
|
|
|
|
|
|
var (
|
|
|
PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType}
|
|
|
- PreBackfillDummyEvent = event.Type{Type: "fi.mau.dummy.pre_backfill", Class: event.MessageEventType}
|
|
|
-
|
|
|
- HistorySyncMarker = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType}
|
|
|
|
|
|
BackfillStatusEvent = event.Type{Type: "com.beeper.backfill_status", Class: event.StateEventType}
|
|
|
)
|
|
|
|
|
|
-func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, isForward, isLatest, atomicMarkAsRead bool, prevEventID id.EventID) *mautrix.RespBatchSend {
|
|
|
- var req mautrix.ReqBatchSend
|
|
|
+func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, isForward, atomicMarkAsRead bool) *mautrix.RespBeeperBatchSend {
|
|
|
+ var req mautrix.ReqBeeperBatchSend
|
|
|
var infos []*wrappedInfo
|
|
|
|
|
|
- if !isForward {
|
|
|
- if portal.FirstEventID != "" || portal.NextBatchID != "" {
|
|
|
- req.PrevEventID = portal.FirstEventID
|
|
|
- req.BatchID = portal.NextBatchID
|
|
|
- } else {
|
|
|
- portal.log.Warnfln("Can't backfill %d messages through %s to chat: first event ID not known", len(messages), source.MXID)
|
|
|
- return nil
|
|
|
- }
|
|
|
- } else {
|
|
|
- req.PrevEventID = prevEventID
|
|
|
- }
|
|
|
- req.BeeperNewMessages = isLatest && req.BatchID == ""
|
|
|
+ req.Forward = isForward
|
|
|
if atomicMarkAsRead {
|
|
|
- req.BeeperMarkReadBy = source.MXID
|
|
|
+ req.MarkReadBy = source.MXID
|
|
|
}
|
|
|
|
|
|
- beforeFirstMessageTimestampMillis := (int64(messages[len(messages)-1].GetMessageTimestamp()) * 1000) - 1
|
|
|
- req.StateEventsAtStart = make([]*event.Event, 0)
|
|
|
-
|
|
|
- addedMembers := make(map[id.UserID]struct{})
|
|
|
- addMember := func(puppet *Puppet) {
|
|
|
- if portal.bridge.Config.Homeserver.Software == bridgeconfig.SoftwareHungry {
|
|
|
- // Hungryserv doesn't need state_events_at_start, it can figure out memberships automatically
|
|
|
- return
|
|
|
- } else if _, alreadyAdded := addedMembers[puppet.MXID]; alreadyAdded {
|
|
|
- return
|
|
|
- }
|
|
|
- mxid := puppet.MXID.String()
|
|
|
- content := event.MemberEventContent{
|
|
|
- Membership: event.MembershipJoin,
|
|
|
- Displayname: puppet.Displayname,
|
|
|
- AvatarURL: puppet.AvatarURL.CUString(),
|
|
|
- }
|
|
|
- inviteContent := content
|
|
|
- inviteContent.Membership = event.MembershipInvite
|
|
|
- req.StateEventsAtStart = append(req.StateEventsAtStart, &event.Event{
|
|
|
- Type: event.StateMember,
|
|
|
- Sender: portal.MainIntent().UserID,
|
|
|
- StateKey: &mxid,
|
|
|
- Timestamp: beforeFirstMessageTimestampMillis,
|
|
|
- Content: event.Content{Parsed: &inviteContent},
|
|
|
- }, &event.Event{
|
|
|
- Type: event.StateMember,
|
|
|
- Sender: puppet.MXID,
|
|
|
- StateKey: &mxid,
|
|
|
- Timestamp: beforeFirstMessageTimestampMillis,
|
|
|
- Content: event.Content{Parsed: &content},
|
|
|
- })
|
|
|
- addedMembers[puppet.MXID] = struct{}{}
|
|
|
- }
|
|
|
-
|
|
|
- portal.log.Infofln("Processing history sync with %d messages (forward: %t, latest: %t, prev: %s, batch: %s)", len(messages), isForward, isLatest, req.PrevEventID, req.BatchID)
|
|
|
+ portal.log.Infofln("Processing history sync with %d messages (forward: %t)", len(messages), isForward)
|
|
|
// The messages are ordered newest to oldest, so iterate them in reverse order.
|
|
|
for i := len(messages) - 1; i >= 0; i-- {
|
|
|
webMsg := messages[i]
|
|
@@ -720,19 +633,12 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo,
|
|
|
if puppet == nil {
|
|
|
continue
|
|
|
}
|
|
|
- intent := puppet.IntentFor(portal)
|
|
|
- if intent.IsCustomPuppet && !portal.bridge.Config.CanDoublePuppetBackfill(puppet.CustomMXID) {
|
|
|
- intent = puppet.DefaultIntent()
|
|
|
- }
|
|
|
|
|
|
- converted := portal.convertMessage(intent, source, &msgEvt.Info, msgEvt.Message, true)
|
|
|
+ converted := portal.convertMessage(puppet.IntentFor(portal), source, &msgEvt.Info, msgEvt.Message, true)
|
|
|
if converted == nil {
|
|
|
portal.log.Debugfln("Skipping unsupported message %s in backfill", msgEvt.Info.ID)
|
|
|
continue
|
|
|
}
|
|
|
- if !intent.IsCustomPuppet && !portal.bridge.StateStore.IsInRoom(portal.MXID, puppet.MXID) {
|
|
|
- addMember(puppet)
|
|
|
- }
|
|
|
if converted.ReplyTo != nil {
|
|
|
portal.SetReply(converted.Content, converted.ReplyTo, true)
|
|
|
}
|
|
@@ -747,15 +653,7 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo,
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
- if len(req.BatchID) == 0 || isForward {
|
|
|
- portal.log.Debugln("Sending a dummy event to avoid forward extremity errors with backfill")
|
|
|
- _, err := portal.MainIntent().SendMessageEvent(portal.MXID, PreBackfillDummyEvent, struct{}{})
|
|
|
- if err != nil {
|
|
|
- portal.log.Warnln("Error sending pre-backfill dummy event:", err)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- resp, err := portal.MainIntent().BatchSend(portal.MXID, &req)
|
|
|
+ resp, err := portal.MainIntent().BeeperBatchSend(portal.MXID, &req)
|
|
|
if err != nil {
|
|
|
portal.log.Errorln("Error batch sending messages:", err)
|
|
|
return nil
|
|
@@ -766,12 +664,7 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo,
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
- // Do the following block in the transaction
|
|
|
- {
|
|
|
- portal.finishBatch(txn, resp.EventIDs, infos)
|
|
|
- portal.NextBatchID = resp.NextBatchID
|
|
|
- portal.Update(txn)
|
|
|
- }
|
|
|
+ portal.finishBatch(txn, resp.EventIDs, infos)
|
|
|
|
|
|
err = txn.Commit()
|
|
|
if err != nil {
|
|
@@ -846,19 +739,16 @@ func (portal *Portal) appendBatchEvents(source *User, converted *ConvertedMessag
|
|
|
*infoArray = append(*infoArray, nil)
|
|
|
}
|
|
|
}
|
|
|
- // Sending reactions in the same batch requires deterministic event IDs, so only do it on hungryserv
|
|
|
- if portal.bridge.Config.Homeserver.Software == bridgeconfig.SoftwareHungry {
|
|
|
- for _, reaction := range raw.GetReactions() {
|
|
|
- reactionEvent, reactionInfo := portal.wrapBatchReaction(source, reaction, mainEvt.ID, info.Timestamp)
|
|
|
- if reactionEvent != nil {
|
|
|
- *eventsArray = append(*eventsArray, reactionEvent)
|
|
|
- *infoArray = append(*infoArray, &wrappedInfo{
|
|
|
- MessageInfo: reactionInfo,
|
|
|
- SenderMXID: reactionEvent.Sender,
|
|
|
- ReactionTarget: info.ID,
|
|
|
- Type: database.MsgReaction,
|
|
|
- })
|
|
|
- }
|
|
|
+ for _, reaction := range raw.GetReactions() {
|
|
|
+ reactionEvent, reactionInfo := portal.wrapBatchReaction(source, reaction, mainEvt.ID, info.Timestamp)
|
|
|
+ if reactionEvent != nil {
|
|
|
+ *eventsArray = append(*eventsArray, reactionEvent)
|
|
|
+ *infoArray = append(*infoArray, &wrappedInfo{
|
|
|
+ MessageInfo: reactionInfo,
|
|
|
+ SenderMXID: reactionEvent.Sender,
|
|
|
+ ReactionTarget: info.ID,
|
|
|
+ Type: database.MsgReaction,
|
|
|
+ })
|
|
|
}
|
|
|
}
|
|
|
return nil
|
|
@@ -923,13 +813,8 @@ func (portal *Portal) wrapBatchEvent(info *types.MessageInfo, intent *appservice
|
|
|
return nil, err
|
|
|
}
|
|
|
intent.AddDoublePuppetValue(&wrappedContent)
|
|
|
- var eventID id.EventID
|
|
|
- if portal.bridge.Config.Homeserver.Software == bridgeconfig.SoftwareHungry {
|
|
|
- eventID = portal.deterministicEventID(info.Sender, info.ID, partName)
|
|
|
- }
|
|
|
-
|
|
|
return &event.Event{
|
|
|
- ID: eventID,
|
|
|
+ ID: portal.deterministicEventID(info.Sender, info.ID, partName),
|
|
|
Sender: intent.UserID,
|
|
|
Type: newEventType,
|
|
|
Timestamp: info.Timestamp.UnixMilli(),
|
|
@@ -956,33 +841,13 @@ func (portal *Portal) finishBatch(txn dbutil.Transaction, eventIDs []id.EventID,
|
|
|
portal.log.Infofln("Successfully sent %d events", len(eventIDs))
|
|
|
}
|
|
|
|
|
|
-func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time, insertionEventId id.EventID) {
|
|
|
- resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, HistorySyncMarker, map[string]interface{}{
|
|
|
- "org.matrix.msc2716.marker.insertion": insertionEventId,
|
|
|
- //"m.marker.insertion": insertionEventId,
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- portal.log.Errorln("Error sending post-backfill dummy event:", err)
|
|
|
- return
|
|
|
- }
|
|
|
- msg := portal.bridge.DB.Message.New()
|
|
|
- msg.Chat = portal.Key
|
|
|
- msg.MXID = resp.EventID
|
|
|
- msg.SenderMXID = portal.MainIntent().UserID
|
|
|
- msg.JID = types.MessageID(resp.EventID)
|
|
|
- msg.Timestamp = lastTimestamp.Add(1 * time.Second)
|
|
|
- msg.Sent = true
|
|
|
- msg.Type = database.MsgFake
|
|
|
- msg.Insert(nil)
|
|
|
-}
|
|
|
-
|
|
|
func (portal *Portal) updateBackfillStatus(backfillState *database.BackfillState) {
|
|
|
backfillStatus := "backfilling"
|
|
|
if backfillState.BackfillComplete {
|
|
|
backfillStatus = "complete"
|
|
|
}
|
|
|
|
|
|
- _, err := portal.MainIntent().SendStateEvent(portal.MXID, BackfillStatusEvent, "", map[string]interface{}{
|
|
|
+ _, err := portal.bridge.Bot.SendStateEvent(portal.MXID, BackfillStatusEvent, "", map[string]interface{}{
|
|
|
"status": backfillStatus,
|
|
|
"first_timestamp": backfillState.FirstExpectedTimestamp * 1000,
|
|
|
})
|