eventprocessor.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package appservice
  2. import (
  3. "maunium.net/go/gomatrix"
  4. log "maunium.net/go/maulogger"
  5. )
  6. type ExecMode uint8
  7. const (
  8. AsyncHandlers ExecMode = iota
  9. AsyncLoop
  10. Sync
  11. )
  12. type EventProcessor struct {
  13. ExecMode ExecMode
  14. as *AppService
  15. log log.Logger
  16. stop chan struct{}
  17. handlers map[gomatrix.EventType][]gomatrix.OnEventListener
  18. }
  19. func NewEventProcessor(as *AppService) *EventProcessor {
  20. return &EventProcessor{
  21. ExecMode: AsyncHandlers,
  22. as: as,
  23. log: as.Log.Sub("Events"),
  24. stop: make(chan struct{}, 1),
  25. handlers: make(map[gomatrix.EventType][]gomatrix.OnEventListener),
  26. }
  27. }
  28. func (ep *EventProcessor) On(evtType gomatrix.EventType, handler gomatrix.OnEventListener) {
  29. handlers, ok := ep.handlers[evtType]
  30. if !ok {
  31. handlers = []gomatrix.OnEventListener{handler}
  32. } else {
  33. handlers = append(handlers, handler)
  34. }
  35. ep.handlers[evtType] = handlers
  36. }
  37. func (ep *EventProcessor) Start() {
  38. for {
  39. select {
  40. case evt := <-ep.as.Events:
  41. handlers, ok := ep.handlers[evt.Type]
  42. if !ok {
  43. continue
  44. }
  45. switch ep.ExecMode {
  46. case AsyncHandlers:
  47. for _, handler := range handlers {
  48. go handler(evt)
  49. }
  50. case AsyncLoop:
  51. go func() {
  52. for _, handler := range handlers {
  53. handler(evt)
  54. }
  55. }()
  56. case Sync:
  57. for _, handler := range handlers {
  58. handler(evt)
  59. }
  60. }
  61. case <-ep.stop:
  62. return
  63. }
  64. }
  65. }
  66. func (ep *EventProcessor) Stop() {
  67. ep.stop <- struct{}{}
  68. }