sync.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package gomatrix
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "runtime/debug"
  6. "time"
  7. )
  8. // Syncer represents an interface that must be satisfied in order to do /sync requests on a client.
  9. type Syncer interface {
  10. // Process the /sync response. The since parameter is the since= value that was used to produce the response.
  11. // This is useful for detecting the very first sync (since=""). If an error is return, Syncing will be stopped
  12. // permanently.
  13. ProcessResponse(resp *RespSync, since string) error
  14. // OnFailedSync returns either the time to wait before retrying or an error to stop syncing permanently.
  15. OnFailedSync(res *RespSync, err error) (time.Duration, error)
  16. // GetFilterJSON for the given user ID. NOT the filter ID.
  17. GetFilterJSON(userID string) json.RawMessage
  18. }
  19. // DefaultSyncer is the default syncing implementation. You can either write your own syncer, or selectively
  20. // replace parts of this default syncer (e.g. the ProcessResponse method). The default syncer uses the observer
  21. // pattern to notify callers about incoming events. See DefaultSyncer.OnEventType for more information.
  22. type DefaultSyncer struct {
  23. UserID string
  24. Store Storer
  25. listeners map[EventType][]OnEventListener // event type to listeners array
  26. }
  27. // OnEventListener can be used with DefaultSyncer.OnEventType to be informed of incoming events.
  28. type OnEventListener func(*Event)
  29. // NewDefaultSyncer returns an instantiated DefaultSyncer
  30. func NewDefaultSyncer(userID string, store Storer) *DefaultSyncer {
  31. return &DefaultSyncer{
  32. UserID: userID,
  33. Store: store,
  34. listeners: make(map[EventType][]OnEventListener),
  35. }
  36. }
  37. // ProcessResponse processes the /sync response in a way suitable for bots. "Suitable for bots" means a stream of
  38. // unrepeating events. Returns a fatal error if a listener panics.
  39. func (s *DefaultSyncer) ProcessResponse(res *RespSync, since string) (err error) {
  40. if !s.shouldProcessResponse(res, since) {
  41. return
  42. }
  43. defer func() {
  44. if r := recover(); r != nil {
  45. err = fmt.Errorf("ProcessResponse panicked! userID=%s since=%s panic=%s\n%s", s.UserID, since, r, debug.Stack())
  46. }
  47. }()
  48. for roomID, roomData := range res.Rooms.Join {
  49. room := s.getOrCreateRoom(roomID)
  50. for _, event := range roomData.State.Events {
  51. event.RoomID = roomID
  52. room.UpdateState(event)
  53. s.notifyListeners(event)
  54. }
  55. for _, event := range roomData.Timeline.Events {
  56. event.RoomID = roomID
  57. s.notifyListeners(event)
  58. }
  59. }
  60. for roomID, roomData := range res.Rooms.Invite {
  61. room := s.getOrCreateRoom(roomID)
  62. for _, event := range roomData.State.Events {
  63. event.RoomID = roomID
  64. room.UpdateState(event)
  65. s.notifyListeners(event)
  66. }
  67. }
  68. for roomID, roomData := range res.Rooms.Leave {
  69. room := s.getOrCreateRoom(roomID)
  70. for _, event := range roomData.Timeline.Events {
  71. if event.StateKey != nil {
  72. event.RoomID = roomID
  73. room.UpdateState(event)
  74. s.notifyListeners(event)
  75. }
  76. }
  77. }
  78. return
  79. }
  80. // OnEventType allows callers to be notified when there are new events for the given event type.
  81. // There are no duplicate checks.
  82. func (s *DefaultSyncer) OnEventType(eventType EventType, callback OnEventListener) {
  83. _, exists := s.listeners[eventType]
  84. if !exists {
  85. s.listeners[eventType] = []OnEventListener{}
  86. }
  87. s.listeners[eventType] = append(s.listeners[eventType], callback)
  88. }
  89. // shouldProcessResponse returns true if the response should be processed. May modify the response to remove
  90. // stuff that shouldn't be processed.
  91. func (s *DefaultSyncer) shouldProcessResponse(resp *RespSync, since string) bool {
  92. if since == "" {
  93. return false
  94. }
  95. // This is a horrible hack because /sync will return the most recent messages for a room
  96. // as soon as you /join it. We do NOT want to process those events in that particular room
  97. // because they may have already been processed (if you toggle the bot in/out of the room).
  98. //
  99. // Work around this by inspecting each room's timeline and seeing if an m.room.member event for us
  100. // exists and is "join" and then discard processing that room entirely if so.
  101. // TODO: We probably want to process messages from after the last join event in the timeline.
  102. for roomID, roomData := range resp.Rooms.Join {
  103. for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- {
  104. e := roomData.Timeline.Events[i]
  105. if e.Type == StateMember && e.GetStateKey() == s.UserID {
  106. if e.Content.Membership == "join" {
  107. _, ok := resp.Rooms.Join[roomID]
  108. if !ok {
  109. continue
  110. }
  111. delete(resp.Rooms.Join, roomID) // don't re-process messages
  112. delete(resp.Rooms.Invite, roomID) // don't re-process invites
  113. break
  114. }
  115. }
  116. }
  117. }
  118. return true
  119. }
  120. // getOrCreateRoom must only be called by the Sync() goroutine which calls ProcessResponse()
  121. func (s *DefaultSyncer) getOrCreateRoom(roomID string) *Room {
  122. room := s.Store.LoadRoom(roomID)
  123. if room == nil { // create a new Room
  124. room = NewRoom(roomID)
  125. s.Store.SaveRoom(room)
  126. }
  127. return room
  128. }
  129. func (s *DefaultSyncer) notifyListeners(event *Event) {
  130. listeners, exists := s.listeners[event.Type]
  131. if !exists {
  132. return
  133. }
  134. for _, fn := range listeners {
  135. fn(event)
  136. }
  137. }
  138. // OnFailedSync always returns a 10 second wait period between failed /syncs, never a fatal error.
  139. func (s *DefaultSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) {
  140. return 10 * time.Second, nil
  141. }
  142. // GetFilterJSON returns a filter with a timeline limit of 50.
  143. func (s *DefaultSyncer) GetFilterJSON(userID string) json.RawMessage {
  144. return json.RawMessage(`{"room":{"timeline":{"limit":50}}}`)
  145. }