|
@@ -114,6 +114,12 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents)
|
|
|
|
+ if len(allMsgs) == 0 {
|
|
|
|
+ user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
if len(portal.MXID) == 0 {
|
|
if len(portal.MXID) == 0 {
|
|
user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
|
|
user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
|
|
err := portal.CreateMatrixRoom(user, nil, true, false)
|
|
err := portal.CreateMatrixRoom(user, nil, true, false)
|
|
@@ -123,46 +129,41 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents)
|
|
|
|
-
|
|
|
|
- if len(allMsgs) > 0 {
|
|
|
|
- user.log.Debugfln("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents)
|
|
|
|
- toBackfill := allMsgs[0:]
|
|
|
|
- insertionEventIds := []id.EventID{}
|
|
|
|
- for {
|
|
|
|
- if len(toBackfill) == 0 {
|
|
|
|
- break
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- var msgs []*waProto.WebMessageInfo
|
|
|
|
- if len(toBackfill) <= req.MaxBatchEvents {
|
|
|
|
- msgs = toBackfill
|
|
|
|
- toBackfill = toBackfill[0:0]
|
|
|
|
- } else {
|
|
|
|
- msgs = toBackfill[len(toBackfill)-req.MaxBatchEvents:]
|
|
|
|
- toBackfill = toBackfill[:len(toBackfill)-req.MaxBatchEvents]
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if len(msgs) > 0 {
|
|
|
|
- time.Sleep(time.Duration(req.BatchDelay) * time.Second)
|
|
|
|
- user.log.Debugfln("Backfilling %d messages in %s", len(msgs), portal.Key.JID)
|
|
|
|
- insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...)
|
|
|
|
- }
|
|
|
|
|
|
+ user.log.Debugfln("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents)
|
|
|
|
+ toBackfill := allMsgs[0:]
|
|
|
|
+ var insertionEventIds []id.EventID
|
|
|
|
+ for {
|
|
|
|
+ if len(toBackfill) == 0 {
|
|
|
|
+ break
|
|
}
|
|
}
|
|
- user.log.Debugfln("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID)
|
|
|
|
- if len(insertionEventIds) > 0 {
|
|
|
|
- portal.sendPostBackfillDummy(
|
|
|
|
- time.Unix(int64(allMsgs[len(allMsgs)-1].GetMessageTimestamp()), 0),
|
|
|
|
- insertionEventIds[0])
|
|
|
|
|
|
+
|
|
|
|
+ var msgs []*waProto.WebMessageInfo
|
|
|
|
+ if len(toBackfill) <= req.MaxBatchEvents {
|
|
|
|
+ msgs = toBackfill
|
|
|
|
+ toBackfill = toBackfill[0:0]
|
|
|
|
+ } else {
|
|
|
|
+ msgs = toBackfill[len(toBackfill)-req.MaxBatchEvents:]
|
|
|
|
+ toBackfill = toBackfill[:len(toBackfill)-req.MaxBatchEvents]
|
|
}
|
|
}
|
|
- user.log.Debugfln("Deleting %d history sync messages after backfilling", len(allMsgs))
|
|
|
|
- err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs)
|
|
|
|
- if err != nil {
|
|
|
|
- user.log.Warnfln("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err)
|
|
|
|
|
|
+
|
|
|
|
+ if len(msgs) > 0 {
|
|
|
|
+ time.Sleep(time.Duration(req.BatchDelay) * time.Second)
|
|
|
|
+ user.log.Debugfln("Backfilling %d messages in %s (queue ID: %d)", len(msgs), portal.Key.JID, req.QueueID)
|
|
|
|
+ insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...)
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
- user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID)
|
|
|
|
}
|
|
}
|
|
|
|
+ user.log.Debugfln("Finished backfilling %d messages in %s (queue ID: %d)", len(allMsgs), portal.Key.JID, req.QueueID)
|
|
|
|
+ if len(insertionEventIds) > 0 {
|
|
|
|
+ portal.sendPostBackfillDummy(
|
|
|
|
+ time.Unix(int64(allMsgs[len(allMsgs)-1].GetMessageTimestamp()), 0),
|
|
|
|
+ insertionEventIds[0])
|
|
|
|
+ }
|
|
|
|
+ user.log.Debugfln("Deleting %d history sync messages after backfilling", len(allMsgs))
|
|
|
|
+ err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs)
|
|
|
|
+ if err != nil {
|
|
|
|
+ user.log.Warnfln("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err)
|
|
|
|
+ }
|
|
|
|
+
|
|
if !conv.MarkedAsUnread && conv.UnreadCount == 0 {
|
|
if !conv.MarkedAsUnread && conv.UnreadCount == 0 {
|
|
user.markSelfReadFull(portal)
|
|
user.markSelfReadFull(portal)
|
|
}
|
|
}
|