historysync.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. // mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
  2. // Copyright (C) 2022 Tulir Asokan, Sumner Evans
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package database
  17. import (
  18. "database/sql"
  19. "errors"
  20. "fmt"
  21. "time"
  22. waProto "go.mau.fi/whatsmeow/binary/proto"
  23. "google.golang.org/protobuf/proto"
  24. _ "github.com/mattn/go-sqlite3"
  25. log "maunium.net/go/maulogger/v2"
  26. "maunium.net/go/mautrix/id"
  27. )
  28. type HistorySyncQuery struct {
  29. db *Database
  30. log log.Logger
  31. }
  32. type HistorySyncConversation struct {
  33. db *Database
  34. log log.Logger
  35. UserID id.UserID
  36. ConversationID string
  37. PortalKey *PortalKey
  38. LastMessageTimestamp time.Time
  39. MuteEndTime time.Time
  40. Archived bool
  41. Pinned uint32
  42. DisappearingMode waProto.DisappearingMode_DisappearingModeInitiator
  43. EndOfHistoryTransferType waProto.Conversation_ConversationEndOfHistoryTransferType
  44. EphemeralExpiration *uint32
  45. MarkedAsUnread bool
  46. UnreadCount uint32
  47. }
  48. func (hsq *HistorySyncQuery) NewConversation() *HistorySyncConversation {
  49. return &HistorySyncConversation{
  50. db: hsq.db,
  51. log: hsq.log,
  52. PortalKey: &PortalKey{},
  53. }
  54. }
  55. func (hsq *HistorySyncQuery) NewConversationWithValues(
  56. userID id.UserID,
  57. conversationID string,
  58. portalKey *PortalKey,
  59. lastMessageTimestamp,
  60. muteEndTime uint64,
  61. archived bool,
  62. pinned uint32,
  63. disappearingMode waProto.DisappearingMode_DisappearingModeInitiator,
  64. endOfHistoryTransferType waProto.Conversation_ConversationEndOfHistoryTransferType,
  65. ephemeralExpiration *uint32,
  66. markedAsUnread bool,
  67. unreadCount uint32) *HistorySyncConversation {
  68. return &HistorySyncConversation{
  69. db: hsq.db,
  70. log: hsq.log,
  71. UserID: userID,
  72. ConversationID: conversationID,
  73. PortalKey: portalKey,
  74. LastMessageTimestamp: time.Unix(int64(lastMessageTimestamp), 0),
  75. MuteEndTime: time.Unix(int64(muteEndTime), 0),
  76. Archived: archived,
  77. Pinned: pinned,
  78. DisappearingMode: disappearingMode,
  79. EndOfHistoryTransferType: endOfHistoryTransferType,
  80. EphemeralExpiration: ephemeralExpiration,
  81. MarkedAsUnread: markedAsUnread,
  82. UnreadCount: unreadCount,
  83. }
  84. }
  85. const (
  86. getNMostRecentConversations = `
  87. SELECT user_mxid, conversation_id, portal_jid, portal_receiver, last_message_timestamp, archived, pinned, mute_end_time, disappearing_mode, end_of_history_transfer_type, ephemeral_expiration, marked_as_unread, unread_count
  88. FROM history_sync_conversation
  89. WHERE user_mxid=$1
  90. ORDER BY last_message_timestamp DESC
  91. LIMIT $2
  92. `
  93. getConversationByPortal = `
  94. SELECT user_mxid, conversation_id, portal_jid, portal_receiver, last_message_timestamp, archived, pinned, mute_end_time, disappearing_mode, end_of_history_transfer_type, ephemeral_expiration, marked_as_unread, unread_count
  95. FROM history_sync_conversation
  96. WHERE user_mxid=$1
  97. AND portal_jid=$2
  98. AND portal_receiver=$3
  99. `
  100. )
  101. func (hsc *HistorySyncConversation) Upsert() {
  102. _, err := hsc.db.Exec(`
  103. INSERT INTO history_sync_conversation (user_mxid, conversation_id, portal_jid, portal_receiver, last_message_timestamp, archived, pinned, mute_end_time, disappearing_mode, end_of_history_transfer_type, ephemeral_expiration, marked_as_unread, unread_count)
  104. VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
  105. ON CONFLICT (user_mxid, conversation_id)
  106. DO UPDATE SET
  107. portal_jid=EXCLUDED.portal_jid,
  108. portal_receiver=EXCLUDED.portal_receiver,
  109. last_message_timestamp=EXCLUDED.last_message_timestamp,
  110. archived=EXCLUDED.archived,
  111. pinned=EXCLUDED.pinned,
  112. mute_end_time=EXCLUDED.mute_end_time,
  113. disappearing_mode=EXCLUDED.disappearing_mode,
  114. end_of_history_transfer_type=EXCLUDED.end_of_history_transfer_type,
  115. ephemeral_expiration=EXCLUDED.ephemeral_expiration,
  116. marked_as_unread=EXCLUDED.marked_as_unread,
  117. unread_count=EXCLUDED.unread_count
  118. `,
  119. hsc.UserID,
  120. hsc.ConversationID,
  121. hsc.PortalKey.JID.String(),
  122. hsc.PortalKey.Receiver.String(),
  123. hsc.LastMessageTimestamp,
  124. hsc.Archived,
  125. hsc.Pinned,
  126. hsc.MuteEndTime,
  127. hsc.DisappearingMode,
  128. hsc.EndOfHistoryTransferType,
  129. hsc.EphemeralExpiration,
  130. hsc.MarkedAsUnread,
  131. hsc.UnreadCount)
  132. if err != nil {
  133. hsc.log.Warnfln("Failed to insert history sync conversation %s/%s: %v", hsc.UserID, hsc.ConversationID, err)
  134. }
  135. }
  136. func (hsc *HistorySyncConversation) Scan(row Scannable) *HistorySyncConversation {
  137. err := row.Scan(
  138. &hsc.UserID,
  139. &hsc.ConversationID,
  140. &hsc.PortalKey.JID,
  141. &hsc.PortalKey.Receiver,
  142. &hsc.LastMessageTimestamp,
  143. &hsc.Archived,
  144. &hsc.Pinned,
  145. &hsc.MuteEndTime,
  146. &hsc.DisappearingMode,
  147. &hsc.EndOfHistoryTransferType,
  148. &hsc.EphemeralExpiration,
  149. &hsc.MarkedAsUnread,
  150. &hsc.UnreadCount)
  151. if err != nil {
  152. if !errors.Is(err, sql.ErrNoRows) {
  153. hsc.log.Errorln("Database scan failed:", err)
  154. }
  155. return nil
  156. }
  157. return hsc
  158. }
  159. func (hsq *HistorySyncQuery) GetNMostRecentConversations(userID id.UserID, n int) (conversations []*HistorySyncConversation) {
  160. rows, err := hsq.db.Query(getNMostRecentConversations, userID, n)
  161. defer rows.Close()
  162. if err != nil || rows == nil {
  163. return nil
  164. }
  165. for rows.Next() {
  166. conversations = append(conversations, hsq.NewConversation().Scan(rows))
  167. }
  168. return
  169. }
  170. func (hsq *HistorySyncQuery) GetConversation(userID id.UserID, portalKey *PortalKey) (conversation *HistorySyncConversation) {
  171. rows, err := hsq.db.Query(getConversationByPortal, userID, portalKey.JID, portalKey.Receiver)
  172. defer rows.Close()
  173. if err != nil || rows == nil {
  174. return nil
  175. }
  176. if rows.Next() {
  177. conversation = hsq.NewConversation().Scan(rows)
  178. }
  179. return
  180. }
  181. func (hsq *HistorySyncQuery) DeleteAllConversations(userID id.UserID) error {
  182. _, err := hsq.db.Exec("DELETE FROM history_sync_conversation WHERE user_mxid=$1", userID)
  183. return err
  184. }
  185. const (
  186. getMessagesBetween = `
  187. SELECT data
  188. FROM history_sync_message
  189. WHERE user_mxid=$1
  190. AND conversation_id=$2
  191. %s
  192. ORDER BY timestamp DESC
  193. %s
  194. `
  195. )
  196. type HistorySyncMessage struct {
  197. db *Database
  198. log log.Logger
  199. UserID id.UserID
  200. ConversationID string
  201. Timestamp time.Time
  202. Data []byte
  203. }
  204. func (hsq *HistorySyncQuery) NewMessageWithValues(userID id.UserID, conversationID string, message *waProto.HistorySyncMsg) (*HistorySyncMessage, error) {
  205. msgData, err := proto.Marshal(message)
  206. if err != nil {
  207. return nil, err
  208. }
  209. return &HistorySyncMessage{
  210. db: hsq.db,
  211. log: hsq.log,
  212. UserID: userID,
  213. ConversationID: conversationID,
  214. Timestamp: time.Unix(int64(message.Message.GetMessageTimestamp()), 0),
  215. Data: msgData,
  216. }, nil
  217. }
  218. func (hsm *HistorySyncMessage) Insert() {
  219. _, err := hsm.db.Exec(`
  220. INSERT INTO history_sync_message (user_mxid, conversation_id, timestamp, data)
  221. VALUES ($1, $2, $3, $4)
  222. `, hsm.UserID, hsm.ConversationID, hsm.Timestamp, hsm.Data)
  223. if err != nil {
  224. hsm.log.Warnfln("Failed to insert history sync message %s/%s: %v", hsm.ConversationID, hsm.Timestamp, err)
  225. }
  226. }
  227. func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID string, startTime, endTime *time.Time, limit int) (messages []*waProto.WebMessageInfo) {
  228. whereClauses := ""
  229. args := []interface{}{userID, conversationID}
  230. argNum := 3
  231. if startTime != nil {
  232. whereClauses += fmt.Sprintf(" AND timestamp >= $%d", argNum)
  233. args = append(args, startTime)
  234. argNum++
  235. }
  236. if endTime != nil {
  237. whereClauses += fmt.Sprintf(" AND timestamp <= $%d", argNum)
  238. args = append(args, endTime)
  239. }
  240. limitClause := ""
  241. if limit > 0 {
  242. limitClause = fmt.Sprintf("LIMIT %d", limit)
  243. }
  244. rows, err := hsq.db.Query(fmt.Sprintf(getMessagesBetween, whereClauses, limitClause), args...)
  245. defer rows.Close()
  246. if err != nil || rows == nil {
  247. return nil
  248. }
  249. var msgData []byte
  250. for rows.Next() {
  251. err := rows.Scan(&msgData)
  252. if err != nil {
  253. hsq.log.Error("Database scan failed: %v", err)
  254. continue
  255. }
  256. var historySyncMsg waProto.HistorySyncMsg
  257. err = proto.Unmarshal(msgData, &historySyncMsg)
  258. if err != nil {
  259. hsq.log.Errorf("Failed to unmarshal history sync message: %v", err)
  260. continue
  261. }
  262. messages = append(messages, historySyncMsg.Message)
  263. }
  264. return
  265. }
  266. func (hsq *HistorySyncQuery) DeleteAllMessages(userID id.UserID) error {
  267. _, err := hsq.db.Exec("DELETE FROM history_sync_message WHERE user_mxid=$1", userID)
  268. return err
  269. }