|
@@ -22,6 +22,7 @@ import (
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
"net/http"
|
|
|
+ "sort"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
@@ -361,6 +362,22 @@ type portalToBackfill struct {
|
|
|
conv *waProto.Conversation
|
|
|
}
|
|
|
|
|
|
+type conversationList []*waProto.Conversation
|
|
|
+
|
|
|
+var _ sort.Interface = (conversationList)(nil)
|
|
|
+
|
|
|
+func (c conversationList) Len() int {
|
|
|
+ return len(c)
|
|
|
+}
|
|
|
+
|
|
|
+func (c conversationList) Less(i, j int) bool {
|
|
|
+ return c[i].GetConversationTimestamp() < c[j].GetConversationTimestamp()
|
|
|
+}
|
|
|
+
|
|
|
+func (c conversationList) Swap(i, j int) {
|
|
|
+ c[i], c[j] = c[j], c[i]
|
|
|
+}
|
|
|
+
|
|
|
func (user *User) handleHistorySync(evt *waProto.HistorySync) {
|
|
|
if evt.GetSyncType() != waProto.HistorySync_RECENT && evt.GetSyncType() != waProto.HistorySync_FULL {
|
|
|
return
|
|
@@ -369,8 +386,25 @@ func (user *User) handleHistorySync(evt *waProto.HistorySync) {
|
|
|
maxAge := user.bridge.Config.Bridge.HistorySync.MaxAge
|
|
|
minLastMsgToCreate := time.Now().Add(-time.Duration(maxAge) * time.Second)
|
|
|
createRooms := user.bridge.Config.Bridge.HistorySync.CreatePortals
|
|
|
- portalsToBackfill := make([]portalToBackfill, 0)
|
|
|
- for _, conv := range evt.GetConversations() {
|
|
|
+
|
|
|
+ conversations := conversationList(evt.GetConversations())
|
|
|
+ sort.Sort(sort.Reverse(conversations))
|
|
|
+ portalsToBackfill := make(chan portalToBackfill, len(conversations))
|
|
|
+
|
|
|
+ var backfillWait sync.WaitGroup
|
|
|
+ backfillWait.Add(1)
|
|
|
+ go func() {
|
|
|
+ for ptb := range portalsToBackfill {
|
|
|
+ user.log.Debugln("Bridging history sync payload for", ptb.portal.Key.JID)
|
|
|
+ ptb.portal.backfill(user, ptb.conv.GetMessages())
|
|
|
+ if !ptb.conv.GetMarkedAsUnread() && ptb.conv.GetUnreadCount() == 0 {
|
|
|
+ user.markSelfReadFull(ptb.portal)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ backfillWait.Done()
|
|
|
+ }()
|
|
|
+
|
|
|
+ for _, conv := range conversations {
|
|
|
jid, err := types.ParseJID(conv.GetId())
|
|
|
if err != nil {
|
|
|
user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
|
|
@@ -410,16 +444,11 @@ func (user *User) handleHistorySync(evt *waProto.HistorySync) {
|
|
|
} else if !user.bridge.Config.Bridge.HistorySync.Backfill {
|
|
|
user.log.Debugln("Backfill is disabled, not bridging history sync payload for", portal.Key.JID)
|
|
|
} else {
|
|
|
- portalsToBackfill = append(portalsToBackfill, portalToBackfill{portal, conv})
|
|
|
- }
|
|
|
- }
|
|
|
- for _, ptb := range portalsToBackfill {
|
|
|
- user.log.Debugln("Bridging history sync payload for", ptb.portal.Key.JID)
|
|
|
- ptb.portal.backfill(user, ptb.conv.GetMessages())
|
|
|
- if !ptb.conv.GetMarkedAsUnread() && ptb.conv.GetUnreadCount() == 0 {
|
|
|
- user.markSelfReadFull(ptb.portal)
|
|
|
+ portalsToBackfill <- portalToBackfill{portal, conv}
|
|
|
}
|
|
|
}
|
|
|
+ close(portalsToBackfill)
|
|
|
+ backfillWait.Wait()
|
|
|
user.log.Infofln("Finished handling history sync with type %s, chunk order %d, progress %d%%", evt.GetSyncType(), evt.GetChunkOrder(), evt.GetProgress())
|
|
|
}
|
|
|
|