|
@@ -78,10 +78,10 @@ func (user *User) handleHistorySyncsLoop() {
|
|
|
|
|
|
func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) {
|
|
func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) {
|
|
for req := range backfillRequests {
|
|
for req := range backfillRequests {
|
|
- user.log.Infof("Backfill request: %v", req)
|
|
|
|
|
|
+ user.log.Debugfln("Handling backfill request %#v", req)
|
|
conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal)
|
|
conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal)
|
|
if conv == nil {
|
|
if conv == nil {
|
|
- user.log.Errorf("Could not find conversation for %s in %s", user.MXID, req.Portal.String())
|
|
|
|
|
|
+ user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String())
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
|
|
|
|
@@ -118,17 +118,15 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill
|
|
user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
|
|
user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
|
|
err := portal.CreateMatrixRoom(user, nil, true, false)
|
|
err := portal.CreateMatrixRoom(user, nil, true, false)
|
|
if err != nil {
|
|
if err != nil {
|
|
- user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err)
|
|
|
|
|
|
+ user.log.Errorfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
- portal.UpdateMatrixRoom(user, nil)
|
|
|
|
}
|
|
}
|
|
|
|
|
|
allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents)
|
|
allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents)
|
|
|
|
|
|
if len(allMsgs) > 0 {
|
|
if len(allMsgs) > 0 {
|
|
- user.log.Debugf("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents)
|
|
|
|
|
|
+ user.log.Debugfln("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents)
|
|
toBackfill := allMsgs[0:]
|
|
toBackfill := allMsgs[0:]
|
|
insertionEventIds := []id.EventID{}
|
|
insertionEventIds := []id.EventID{}
|
|
for {
|
|
for {
|
|
@@ -147,20 +145,20 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill
|
|
|
|
|
|
if len(msgs) > 0 {
|
|
if len(msgs) > 0 {
|
|
time.Sleep(time.Duration(req.BatchDelay) * time.Second)
|
|
time.Sleep(time.Duration(req.BatchDelay) * time.Second)
|
|
- user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID)
|
|
|
|
|
|
+ user.log.Debugfln("Backfilling %d messages in %s", len(msgs), portal.Key.JID)
|
|
insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...)
|
|
insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- user.log.Debugf("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID)
|
|
|
|
|
|
+ user.log.Debugfln("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID)
|
|
if len(insertionEventIds) > 0 {
|
|
if len(insertionEventIds) > 0 {
|
|
portal.sendPostBackfillDummy(
|
|
portal.sendPostBackfillDummy(
|
|
time.Unix(int64(allMsgs[len(allMsgs)-1].GetMessageTimestamp()), 0),
|
|
time.Unix(int64(allMsgs[len(allMsgs)-1].GetMessageTimestamp()), 0),
|
|
insertionEventIds[0])
|
|
insertionEventIds[0])
|
|
}
|
|
}
|
|
- user.log.Debugf("Deleting %d history sync messages after backfilling", len(allMsgs))
|
|
|
|
|
|
+ user.log.Debugfln("Deleting %d history sync messages after backfilling", len(allMsgs))
|
|
err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs)
|
|
err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs)
|
|
if err != nil {
|
|
if err != nil {
|
|
- user.log.Warnf("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err)
|
|
|
|
|
|
+ user.log.Warnfln("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err)
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID)
|
|
user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID)
|
|
@@ -198,6 +196,9 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
|
|
if err != nil {
|
|
if err != nil {
|
|
user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
|
|
user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
|
|
continue
|
|
continue
|
|
|
|
+ } else if jid.Server == types.BroadcastServer {
|
|
|
|
+ user.log.Debugfln("Skipping broadcast list %s in history sync", jid)
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
portal := user.GetPortalByJID(jid)
|
|
portal := user.GetPortalByJID(jid)
|
|
|
|
|
|
@@ -231,7 +232,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
|
|
|
|
|
|
message, err := user.bridge.DB.HistorySyncQuery.NewMessageWithValues(user.MXID, conv.GetId(), msg.Message.GetKey().GetId(), msg)
|
|
message, err := user.bridge.DB.HistorySyncQuery.NewMessageWithValues(user.MXID, conv.GetId(), msg.Message.GetKey().GetId(), msg)
|
|
if err != nil {
|
|
if err != nil {
|
|
- user.log.Warnf("Failed to save message %s in %s. Error: %+v", msg.Message.Key.Id, conv.GetId(), err)
|
|
|
|
|
|
+ user.log.Warnfln("Failed to save message %s in %s. Error: %+v", msg.Message.Key.Id, conv.GetId(), err)
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
message.Insert()
|
|
message.Insert()
|
|
@@ -304,10 +305,8 @@ var (
|
|
PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType}
|
|
PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType}
|
|
PreBackfillDummyEvent = event.Type{Type: "fi.mau.dummy.pre_backfill", Class: event.MessageEventType}
|
|
PreBackfillDummyEvent = event.Type{Type: "fi.mau.dummy.pre_backfill", Class: event.MessageEventType}
|
|
|
|
|
|
- // Marker events for when a backfill finishes
|
|
|
|
BackfillEndDummyEvent = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType}
|
|
BackfillEndDummyEvent = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType}
|
|
- RoomMarker = event.Type{Type: "m.room.marker", Class: event.MessageEventType}
|
|
|
|
- MSC2716Marker = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType}
|
|
|
|
|
|
+ HistorySyncMarker = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType}
|
|
)
|
|
)
|
|
|
|
|
|
func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) []id.EventID {
|
|
func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) []id.EventID {
|
|
@@ -537,7 +536,7 @@ func (portal *Portal) wrapBatchEvent(info *types.MessageInfo, intent *appservice
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
|
|
- if eventType == event.EventEncrypted {
|
|
|
|
|
|
+ if newEventType == event.EventEncrypted {
|
|
// Clear other custom keys if the event was encrypted, but keep the double puppet identifier
|
|
// Clear other custom keys if the event was encrypted, but keep the double puppet identifier
|
|
wrappedContent.Raw = map[string]interface{}{backfillIDField: info.ID}
|
|
wrappedContent.Raw = map[string]interface{}{backfillIDField: info.ID}
|
|
if intent.IsCustomPuppet {
|
|
if intent.IsCustomPuppet {
|
|
@@ -598,10 +597,10 @@ func (portal *Portal) finishBatchEvt(info *wrappedInfo, eventID id.EventID) {
|
|
}
|
|
}
|
|
|
|
|
|
func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time, insertionEventId id.EventID) {
|
|
func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time, insertionEventId id.EventID) {
|
|
- for _, evtType := range []event.Type{BackfillEndDummyEvent, RoomMarker, MSC2716Marker} {
|
|
|
|
|
|
+ for _, evtType := range []event.Type{BackfillEndDummyEvent, HistorySyncMarker} {
|
|
resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, evtType, map[string]interface{}{
|
|
resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, evtType, map[string]interface{}{
|
|
"org.matrix.msc2716.marker.insertion": insertionEventId,
|
|
"org.matrix.msc2716.marker.insertion": insertionEventId,
|
|
- "m.marker.insertion": insertionEventId,
|
|
|
|
|
|
+ //"m.marker.insertion": insertionEventId,
|
|
})
|
|
})
|
|
if err != nil {
|
|
if err != nil {
|
|
portal.log.Errorln("Error sending post-backfill dummy event:", err)
|
|
portal.log.Errorln("Error sending post-backfill dummy event:", err)
|