2019-08-25-move-state-store-to-db.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package upgrades
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "os"
  8. "strings"
  9. "maunium.net/go/mautrix/event"
  10. )
  11. func init() {
  12. migrateRegistrations := func(tx *sql.Tx, registrations map[string]bool) error {
  13. if len(registrations) == 0 {
  14. return nil
  15. }
  16. executeBatch := func(tx *sql.Tx, valueStrings []string, values ...interface{}) error {
  17. valueString := strings.Join(valueStrings, ",")
  18. _, err := tx.Exec("INSERT INTO mx_registrations (user_id) VALUES "+valueString, values...)
  19. return err
  20. }
  21. batchSize := 100
  22. values := make([]interface{}, 0, batchSize)
  23. valueStrings := make([]string, 0, batchSize)
  24. i := 1
  25. for userID, registered := range registrations {
  26. if i == batchSize {
  27. err := executeBatch(tx, valueStrings, values...)
  28. if err != nil {
  29. return err
  30. }
  31. i = 1
  32. values = make([]interface{}, 0, batchSize)
  33. valueStrings = make([]string, 0, batchSize)
  34. }
  35. if registered {
  36. values = append(values, userID)
  37. valueStrings = append(valueStrings, fmt.Sprintf("($%d)", i))
  38. i++
  39. }
  40. }
  41. return executeBatch(tx, valueStrings, values...)
  42. }
  43. migrateMemberships := func(tx *sql.Tx, rooms map[string]map[string]event.Membership) error {
  44. for roomID, members := range rooms {
  45. if len(members) == 0 {
  46. continue
  47. }
  48. var values []interface{}
  49. var valueStrings []string
  50. i := 1
  51. for userID, membership := range members {
  52. values = append(values, roomID, userID, membership)
  53. valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", i, i+1, i+2))
  54. i += 3
  55. }
  56. valueString := strings.Join(valueStrings, ",")
  57. _, err := tx.Exec("INSERT INTO mx_user_profile (room_id, user_id, membership) VALUES "+valueString, values...)
  58. if err != nil {
  59. return err
  60. }
  61. }
  62. return nil
  63. }
  64. migratePowerLevels := func(tx *sql.Tx, rooms map[string]*event.PowerLevelsEventContent) error {
  65. if len(rooms) == 0 {
  66. return nil
  67. }
  68. var values []interface{}
  69. var valueStrings []string
  70. i := 1
  71. for roomID, powerLevels := range rooms {
  72. powerLevelBytes, err := json.Marshal(powerLevels)
  73. if err != nil {
  74. return err
  75. }
  76. values = append(values, roomID, powerLevelBytes)
  77. valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d)", i, i+1))
  78. i += 2
  79. }
  80. valueString := strings.Join(valueStrings, ",")
  81. _, err := tx.Exec("INSERT INTO mx_room_state (room_id, power_levels) VALUES "+valueString, values...)
  82. return err
  83. }
  84. userProfileTable := `CREATE TABLE mx_user_profile (
  85. room_id VARCHAR(255),
  86. user_id VARCHAR(255),
  87. membership VARCHAR(15) NOT NULL,
  88. PRIMARY KEY (room_id, user_id)
  89. )`
  90. roomStateTable := `CREATE TABLE mx_room_state (
  91. room_id VARCHAR(255) PRIMARY KEY,
  92. power_levels TEXT
  93. )`
  94. registrationsTable := `CREATE TABLE mx_registrations (
  95. user_id VARCHAR(255) PRIMARY KEY
  96. )`
  97. type TempStateStore struct {
  98. Registrations map[string]bool `json:"registrations"`
  99. Members map[string]map[string]event.Membership `json:"memberships"`
  100. PowerLevels map[string]*event.PowerLevelsEventContent `json:"power_levels"`
  101. }
  102. upgrades[9] = upgrade{"Move state store to main DB", func(tx *sql.Tx, ctx context) error {
  103. if ctx.dialect == Postgres {
  104. roomStateTable = strings.Replace(roomStateTable, "TEXT", "JSONB", 1)
  105. }
  106. var store TempStateStore
  107. if _, err := tx.Exec(userProfileTable); err != nil {
  108. return err
  109. } else if _, err = tx.Exec(roomStateTable); err != nil {
  110. return err
  111. } else if _, err = tx.Exec(registrationsTable); err != nil {
  112. return err
  113. } else if data, err := ioutil.ReadFile("mx-state.json"); err != nil {
  114. ctx.log.Debugln("mx-state.json not found, not migrating state store")
  115. } else if err = json.Unmarshal(data, &store); err != nil {
  116. return err
  117. } else if err = migrateRegistrations(tx, store.Registrations); err != nil {
  118. return err
  119. } else if err = migrateMemberships(tx, store.Members); err != nil {
  120. return err
  121. } else if err = migratePowerLevels(tx, store.PowerLevels); err != nil {
  122. return err
  123. } else if err = os.Rename("mx-state.json", "mx-state.json.bak"); err != nil {
  124. return err
  125. }
  126. return nil
  127. }}
  128. }