backfillqueue.go 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. // mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
  2. // Copyright (C) 2021 Tulir Asokan, Sumner Evans
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "time"
  19. log "maunium.net/go/maulogger/v2"
  20. "maunium.net/go/mautrix/id"
  21. "maunium.net/go/mautrix-whatsapp/database"
  22. )
  23. type BackfillQueue struct {
  24. BackfillQuery *database.BackfillQuery
  25. reCheckChannels []chan bool
  26. log log.Logger
  27. }
  28. func (bq *BackfillQueue) ReCheck() {
  29. bq.log.Infofln("Sending re-checks to %d channels", len(bq.reCheckChannels))
  30. for _, channel := range bq.reCheckChannels {
  31. go func(c chan bool) {
  32. c <- true
  33. }(channel)
  34. }
  35. }
  36. func (bq *BackfillQueue) GetNextBackfill(userID id.UserID, backfillTypes []database.BackfillType, waitForBackfillTypes []database.BackfillType, reCheckChannel chan bool) *database.Backfill {
  37. for {
  38. if !bq.BackfillQuery.HasUnstartedOrInFlightOfType(userID, waitForBackfillTypes) {
  39. // check for immediate when dealing with deferred
  40. if backfill := bq.BackfillQuery.GetNext(userID, backfillTypes); backfill != nil {
  41. backfill.MarkDispatched()
  42. return backfill
  43. }
  44. }
  45. select {
  46. case <-reCheckChannel:
  47. case <-time.After(time.Minute):
  48. }
  49. }
  50. }
  51. func (user *User) HandleBackfillRequestsLoop(backfillTypes []database.BackfillType, waitForBackfillTypes []database.BackfillType) {
  52. reCheckChannel := make(chan bool)
  53. user.BackfillQueue.reCheckChannels = append(user.BackfillQueue.reCheckChannels, reCheckChannel)
  54. for {
  55. req := user.BackfillQueue.GetNextBackfill(user.MXID, backfillTypes, waitForBackfillTypes, reCheckChannel)
  56. user.log.Infofln("Handling backfill request %s", req)
  57. conv := user.bridge.DB.HistorySync.GetConversation(user.MXID, *req.Portal)
  58. if conv == nil {
  59. user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String())
  60. req.MarkDone()
  61. continue
  62. }
  63. portal := user.GetPortalByJID(conv.PortalKey.JID)
  64. // Update the client store with basic chat settings.
  65. if conv.MuteEndTime.After(time.Now()) {
  66. user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime)
  67. }
  68. if conv.Archived {
  69. user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true)
  70. }
  71. if conv.Pinned > 0 {
  72. user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true)
  73. }
  74. if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration {
  75. portal.ExpirationTime = *conv.EphemeralExpiration
  76. portal.Update(nil)
  77. }
  78. user.backfillInChunks(req, conv, portal)
  79. req.MarkDone()
  80. }
  81. }