backfillqueue.go 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. // mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
  2. // Copyright (C) 2021 Tulir Asokan, Sumner Evans
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "time"
  19. log "maunium.net/go/maulogger/v2"
  20. "maunium.net/go/mautrix/id"
  21. "maunium.net/go/mautrix-whatsapp/database"
  22. )
  23. type BackfillQueue struct {
  24. BackfillQuery *database.BackfillQuery
  25. reCheckChannels []chan bool
  26. log log.Logger
  27. }
  28. func (bq *BackfillQueue) ReCheck() {
  29. bq.log.Info("Sending re-checks to %d channels", len(bq.reCheckChannels))
  30. for _, channel := range bq.reCheckChannels {
  31. go func(c chan bool) {
  32. c <- true
  33. }(channel)
  34. }
  35. }
  36. func (bq *BackfillQueue) GetNextBackfill(userID id.UserID, backfillTypes []database.BackfillType, reCheckChannel chan bool) *database.Backfill {
  37. for {
  38. if backfill := bq.BackfillQuery.GetNext(userID, backfillTypes); backfill != nil {
  39. backfill.MarkDispatched()
  40. return backfill
  41. } else {
  42. select {
  43. case <-reCheckChannel:
  44. case <-time.After(time.Minute):
  45. }
  46. }
  47. }
  48. }
  49. func (user *User) HandleBackfillRequestsLoop(backfillTypes []database.BackfillType) {
  50. reCheckChannel := make(chan bool)
  51. user.BackfillQueue.reCheckChannels = append(user.BackfillQueue.reCheckChannels, reCheckChannel)
  52. for {
  53. req := user.BackfillQueue.GetNextBackfill(user.MXID, backfillTypes, reCheckChannel)
  54. user.log.Infofln("Handling backfill request %s", req)
  55. conv := user.bridge.DB.HistorySync.GetConversation(user.MXID, req.Portal)
  56. if conv == nil {
  57. user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String())
  58. req.MarkDone()
  59. continue
  60. }
  61. portal := user.GetPortalByJID(conv.PortalKey.JID)
  62. // Update the client store with basic chat settings.
  63. if conv.MuteEndTime.After(time.Now()) {
  64. user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime)
  65. }
  66. if conv.Archived {
  67. user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true)
  68. }
  69. if conv.Pinned > 0 {
  70. user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true)
  71. }
  72. if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration {
  73. portal.ExpirationTime = *conv.EphemeralExpiration
  74. portal.Update(nil)
  75. }
  76. user.backfillInChunks(req, conv, portal)
  77. req.MarkDone()
  78. }
  79. }