|
@@ -28,6 +28,7 @@ import (
|
|
|
"maunium.net/go/mautrix/event"
|
|
|
"maunium.net/go/mautrix/id"
|
|
|
|
|
|
+ "maunium.net/go/mautrix-whatsapp/config"
|
|
|
"maunium.net/go/mautrix-whatsapp/database"
|
|
|
)
|
|
|
|
|
@@ -52,7 +53,7 @@ func (user *User) handleHistorySyncsLoop() {
|
|
|
reCheckQueue := make(chan bool, 1)
|
|
|
// Start the backfill queue.
|
|
|
user.BackfillQueue = &BackfillQueue{
|
|
|
- BackfillQuery: user.bridge.DB.BackfillQuery,
|
|
|
+ BackfillQuery: user.bridge.DB.Backfill,
|
|
|
ImmediateBackfillRequests: make(chan *database.Backfill, 1),
|
|
|
DeferredBackfillRequests: make(chan *database.Backfill, 1),
|
|
|
ReCheckQueue: make(chan bool, 1),
|
|
@@ -71,6 +72,11 @@ func (user *User) handleHistorySyncsLoop() {
|
|
|
go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests)
|
|
|
go user.BackfillQueue.RunLoop(user)
|
|
|
|
|
|
+ if user.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia &&
|
|
|
+ user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod == config.MediaRequestMethodLocalTime {
|
|
|
+ go user.dailyMediaRequestLoop()
|
|
|
+ }
|
|
|
+
|
|
|
// Always save the history syncs for the user. If they want to enable
|
|
|
// backfilling in the future, we will have it in the database.
|
|
|
for evt := range user.historySyncs {
|
|
@@ -78,10 +84,56 @@ func (user *User) handleHistorySyncsLoop() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (user *User) dailyMediaRequestLoop() {
|
|
|
+ // Calculate when to do the first set of media retry requests
|
|
|
+ now := time.Now()
|
|
|
+ userTz, err := time.LoadLocation(user.Timezone)
|
|
|
+ if err != nil {
|
|
|
+ userTz = now.Local().Location()
|
|
|
+ }
|
|
|
+ tonightMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, userTz)
|
|
|
+ midnightOffset := time.Duration(user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestLocalTime) * time.Minute
|
|
|
+ requestStartTime := tonightMidnight.Add(midnightOffset)
|
|
|
+
|
|
|
+ // If the request time for today has already happened, we need to start the
|
|
|
+ // request loop tomorrow instead.
|
|
|
+ if requestStartTime.Before(now) {
|
|
|
+ requestStartTime = requestStartTime.AddDate(0, 0, 1)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Wait to start the loop
|
|
|
+ user.log.Infof("Waiting until %s to do media retry requests", requestStartTime)
|
|
|
+ time.Sleep(time.Until(requestStartTime))
|
|
|
+
|
|
|
+ for {
|
|
|
+ mediaBackfillRequests := user.bridge.DB.MediaBackfillRequest.GetMediaBackfillRequestsForUser(user.MXID)
|
|
|
+ user.log.Infof("Sending %d media retry requests", len(mediaBackfillRequests))
|
|
|
+
|
|
|
+ // Send all of the media backfill requests for the user at once
|
|
|
+ for _, req := range mediaBackfillRequests {
|
|
|
+ portal := user.GetPortalByJID(req.PortalKey.JID)
|
|
|
+ _, err := portal.requestMediaRetry(user, req.EventID, req.MediaKey)
|
|
|
+ if err != nil {
|
|
|
+ user.log.Warnf("Failed to send media retry request for %s / %s", req.PortalKey.String(), req.EventID)
|
|
|
+ req.Status = database.MediaBackfillRequestStatusRequestFailed
|
|
|
+ req.Error = err.Error()
|
|
|
+ } else {
|
|
|
+ user.log.Debugfln("Sent media retry request for %s / %s", req.PortalKey.String(), req.EventID)
|
|
|
+ req.Status = database.MediaBackfillRequestStatusRequested
|
|
|
+ }
|
|
|
+ req.MediaKey = nil
|
|
|
+ req.Upsert()
|
|
|
+ }
|
|
|
+
|
|
|
+ // Wait for 24 hours before making requests again
|
|
|
+ time.Sleep(24 * time.Hour)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) {
|
|
|
for req := range backfillRequests {
|
|
|
user.log.Infofln("Handling backfill request %s", req)
|
|
|
- conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal)
|
|
|
+ conv := user.bridge.DB.HistorySync.GetConversation(user.MXID, req.Portal)
|
|
|
if conv == nil {
|
|
|
user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String())
|
|
|
continue
|
|
@@ -132,7 +184,7 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
|
|
user.log.Debugfln("Limiting backfill to end at %v", end)
|
|
|
}
|
|
|
}
|
|
|
- allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents)
|
|
|
+ allMsgs := user.bridge.DB.HistorySync.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents)
|
|
|
|
|
|
sendDisappearedNotice := false
|
|
|
// If expired messages are on, and a notice has not been sent to this chat
|
|
@@ -211,7 +263,7 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
|
|
insertionEventIds[0])
|
|
|
}
|
|
|
user.log.Debugfln("Deleting %d history sync messages after backfilling (queue ID: %d)", len(allMsgs), req.QueueID)
|
|
|
- err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs)
|
|
|
+ err := user.bridge.DB.HistorySync.DeleteMessages(user.MXID, conv.ConversationID, allMsgs)
|
|
|
if err != nil {
|
|
|
user.log.Warnfln("Failed to delete %d history sync messages after backfilling (queue ID: %d): %v", len(allMsgs), req.QueueID, err)
|
|
|
}
|
|
@@ -255,7 +307,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
|
|
|
}
|
|
|
portal := user.GetPortalByJID(jid)
|
|
|
|
|
|
- historySyncConversation := user.bridge.DB.HistorySyncQuery.NewConversationWithValues(
|
|
|
+ historySyncConversation := user.bridge.DB.HistorySync.NewConversationWithValues(
|
|
|
user.MXID,
|
|
|
conv.GetId(),
|
|
|
&portal.Key,
|
|
@@ -291,7 +343,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- message, err := user.bridge.DB.HistorySyncQuery.NewMessageWithValues(user.MXID, conv.GetId(), wmi.GetKey().GetId(), rawMsg)
|
|
|
+ message, err := user.bridge.DB.HistorySync.NewMessageWithValues(user.MXID, conv.GetId(), wmi.GetKey().GetId(), rawMsg)
|
|
|
if err != nil {
|
|
|
user.log.Warnfln("Failed to save message %s in %s. Error: %+v", wmi.GetKey().Id, conv.GetId(), err)
|
|
|
continue
|
|
@@ -308,7 +360,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations)
|
|
|
+ nMostRecent := user.bridge.DB.HistorySync.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations)
|
|
|
if len(nMostRecent) > 0 {
|
|
|
// Find the portals for all of the conversations.
|
|
|
portals := []*Portal{}
|
|
@@ -348,7 +400,7 @@ func getConversationTimestamp(conv *waProto.Conversation) uint64 {
|
|
|
func (user *User) EnqueueImmedateBackfills(portals []*Portal) {
|
|
|
for priority, portal := range portals {
|
|
|
maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents
|
|
|
- initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
|
|
|
+ initialBackfill := user.bridge.DB.Backfill.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
|
|
|
initialBackfill.Insert()
|
|
|
}
|
|
|
}
|
|
@@ -362,7 +414,7 @@ func (user *User) EnqueueDeferredBackfills(portals []*Portal) {
|
|
|
startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
|
|
|
startDate = &startDaysAgo
|
|
|
}
|
|
|
- backfillMessages := user.bridge.DB.BackfillQuery.NewWithValues(
|
|
|
+ backfillMessages := user.bridge.DB.Backfill.NewWithValues(
|
|
|
user.MXID, database.BackfillDeferred, stageIdx*numPortals+portalIdx, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
|
|
|
backfillMessages.Insert()
|
|
|
}
|
|
@@ -375,7 +427,7 @@ func (user *User) EnqueueForwardBackfills(portals []*Portal) {
|
|
|
if lastMsg == nil {
|
|
|
continue
|
|
|
}
|
|
|
- backfill := user.bridge.DB.BackfillQuery.NewWithValues(
|
|
|
+ backfill := user.bridge.DB.Backfill.NewWithValues(
|
|
|
user.MXID, database.BackfillForward, priority, &portal.Key, &lastMsg.Timestamp, nil, -1, -1, 0)
|
|
|
backfill.Insert()
|
|
|
}
|
|
@@ -519,21 +571,27 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo,
|
|
|
portal.finishBatch(resp.EventIDs, infos)
|
|
|
portal.NextBatchID = resp.NextBatchID
|
|
|
portal.Update()
|
|
|
- if portal.bridge.Config.Bridge.HistorySync.AutoRequestMedia {
|
|
|
- go portal.requestMediaRetries(source, infos)
|
|
|
+ if portal.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia {
|
|
|
+ go portal.requestMediaRetries(source, resp.EventIDs, infos)
|
|
|
}
|
|
|
return resp
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (portal *Portal) requestMediaRetries(source *User, infos []*wrappedInfo) {
|
|
|
- for _, info := range infos {
|
|
|
+func (portal *Portal) requestMediaRetries(source *User, eventIDs []id.EventID, infos []*wrappedInfo) {
|
|
|
+ for i, info := range infos {
|
|
|
if info != nil && info.Error == database.MsgErrMediaNotFound && info.MediaKey != nil {
|
|
|
- err := source.Client.SendMediaRetryReceipt(info.MessageInfo, info.MediaKey)
|
|
|
- if err != nil {
|
|
|
- portal.log.Warnfln("Failed to send post-backfill media retry request for %s: %v", info.ID, err)
|
|
|
- } else {
|
|
|
- portal.log.Debugfln("Sent post-backfill media retry request for %s", info.ID)
|
|
|
+ switch portal.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod {
|
|
|
+ case config.MediaRequestMethodImmediate:
|
|
|
+ err := source.Client.SendMediaRetryReceipt(info.MessageInfo, info.MediaKey)
|
|
|
+ if err != nil {
|
|
|
+ portal.log.Warnfln("Failed to send post-backfill media retry request for %s: %v", info.ID, err)
|
|
|
+ } else {
|
|
|
+ portal.log.Debugfln("Sent post-backfill media retry request for %s", info.ID)
|
|
|
+ }
|
|
|
+ case config.MediaRequestMethodLocalTime:
|
|
|
+ req := portal.bridge.DB.MediaBackfillRequest.NewMediaBackfillRequestWithValues(source.MXID, &portal.Key, eventIDs[i], info.MediaKey)
|
|
|
+ req.Upsert()
|
|
|
}
|
|
|
}
|
|
|
}
|