user.go 36 KB


  1. // mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
  2. // Copyright (C) 2020 Tulir Asokan
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "encoding/json"
  19. "errors"
  20. "fmt"
  21. "net/http"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "sync"
  26. "time"
  27. "github.com/skip2/go-qrcode"
  28. log "maunium.net/go/maulogger/v2"
  29. "github.com/Rhymen/go-whatsapp"
  30. waBinary "github.com/Rhymen/go-whatsapp/binary"
  31. waProto "github.com/Rhymen/go-whatsapp/binary/proto"
  32. "maunium.net/go/mautrix"
  33. "maunium.net/go/mautrix/event"
  34. "maunium.net/go/mautrix/format"
  35. "maunium.net/go/mautrix/id"
  36. "maunium.net/go/mautrix-whatsapp/database"
  37. "maunium.net/go/mautrix-whatsapp/types"
  38. "maunium.net/go/mautrix-whatsapp/whatsapp-ext"
  39. )
  40. type User struct {
  41. *database.User
  42. Conn *whatsappExt.ExtendedConn
  43. bridge *Bridge
  44. log log.Logger
  45. Admin bool
  46. Whitelisted bool
  47. RelaybotWhitelisted bool
  48. IsRelaybot bool
  49. ConnectionErrors int
  50. CommunityID string
  51. cleanDisconnection bool
  52. batteryWarningsSent int
  53. lastReconnection int64
  54. chatListReceived chan struct{}
  55. syncPortalsDone chan struct{}
  56. messageInput chan PortalMessage
  57. messageOutput chan PortalMessage
  58. syncStart chan struct{}
  59. syncWait sync.WaitGroup
  60. mgmtCreateLock sync.Mutex
  61. connLock sync.Mutex
  62. }
  63. func (bridge *Bridge) GetUserByMXID(userID id.UserID) *User {
  64. _, isPuppet := bridge.ParsePuppetMXID(userID)
  65. if isPuppet || userID == bridge.Bot.UserID {
  66. return nil
  67. }
  68. bridge.usersLock.Lock()
  69. defer bridge.usersLock.Unlock()
  70. user, ok := bridge.usersByMXID[userID]
  71. if !ok {
  72. return bridge.loadDBUser(bridge.DB.User.GetByMXID(userID), &userID)
  73. }
  74. return user
  75. }
  76. func (bridge *Bridge) GetUserByJID(userID types.WhatsAppID) *User {
  77. bridge.usersLock.Lock()
  78. defer bridge.usersLock.Unlock()
  79. user, ok := bridge.usersByJID[userID]
  80. if !ok {
  81. return bridge.loadDBUser(bridge.DB.User.GetByJID(userID), nil)
  82. }
  83. return user
  84. }
  85. func (user *User) addToJIDMap() {
  86. user.bridge.usersLock.Lock()
  87. user.bridge.usersByJID[user.JID] = user
  88. user.bridge.usersLock.Unlock()
  89. }
  90. func (user *User) removeFromJIDMap() {
  91. user.bridge.usersLock.Lock()
  92. jidUser, ok := user.bridge.usersByJID[user.JID]
  93. if ok && user == jidUser {
  94. delete(user.bridge.usersByJID, user.JID)
  95. }
  96. user.bridge.usersLock.Unlock()
  97. user.bridge.Metrics.TrackLoginState(user.JID, false)
  98. }
  99. func (bridge *Bridge) GetAllUsers() []*User {
  100. bridge.usersLock.Lock()
  101. defer bridge.usersLock.Unlock()
  102. dbUsers := bridge.DB.User.GetAll()
  103. output := make([]*User, len(dbUsers))
  104. for index, dbUser := range dbUsers {
  105. user, ok := bridge.usersByMXID[dbUser.MXID]
  106. if !ok {
  107. user = bridge.loadDBUser(dbUser, nil)
  108. }
  109. output[index] = user
  110. }
  111. return output
  112. }
  113. func (bridge *Bridge) loadDBUser(dbUser *database.User, mxid *id.UserID) *User {
  114. if dbUser == nil {
  115. if mxid == nil {
  116. return nil
  117. }
  118. dbUser = bridge.DB.User.New()
  119. dbUser.MXID = *mxid
  120. dbUser.Insert()
  121. }
  122. user := bridge.NewUser(dbUser)
  123. bridge.usersByMXID[user.MXID] = user
  124. if len(user.JID) > 0 {
  125. bridge.usersByJID[user.JID] = user
  126. }
  127. if len(user.ManagementRoom) > 0 {
  128. bridge.managementRooms[user.ManagementRoom] = user
  129. }
  130. return user
  131. }
  132. func (user *User) GetPortals() []*Portal {
  133. keys := user.User.GetPortalKeys()
  134. portals := make([]*Portal, len(keys))
  135. user.bridge.portalsLock.Lock()
  136. for i, key := range keys {
  137. portal, ok := user.bridge.portalsByJID[key]
  138. if !ok {
  139. portal = user.bridge.loadDBPortal(user.bridge.DB.Portal.GetByJID(key), &key)
  140. }
  141. portals[i] = portal
  142. }
  143. user.bridge.portalsLock.Unlock()
  144. return portals
  145. }
  146. func (bridge *Bridge) NewUser(dbUser *database.User) *User {
  147. user := &User{
  148. User: dbUser,
  149. bridge: bridge,
  150. log: bridge.Log.Sub("User").Sub(string(dbUser.MXID)),
  151. IsRelaybot: false,
  152. chatListReceived: make(chan struct{}, 1),
  153. syncPortalsDone: make(chan struct{}, 1),
  154. syncStart: make(chan struct{}, 1),
  155. messageInput: make(chan PortalMessage),
  156. messageOutput: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer),
  157. }
  158. user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID)
  159. user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
  160. user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
  161. go user.handleMessageLoop()
  162. go user.runMessageRingBuffer()
  163. return user
  164. }
  165. func (user *User) GetManagementRoom() id.RoomID {
  166. if len(user.ManagementRoom) == 0 {
  167. user.mgmtCreateLock.Lock()
  168. defer user.mgmtCreateLock.Unlock()
  169. if len(user.ManagementRoom) > 0 {
  170. return user.ManagementRoom
  171. }
  172. resp, err := user.bridge.Bot.CreateRoom(&mautrix.ReqCreateRoom{
  173. Topic: "WhatsApp bridge notices",
  174. IsDirect: true,
  175. })
  176. if err != nil {
  177. user.log.Errorln("Failed to auto-create management room:", err)
  178. } else {
  179. user.SetManagementRoom(resp.RoomID)
  180. }
  181. }
  182. return user.ManagementRoom
  183. }
  184. func (user *User) SetManagementRoom(roomID id.RoomID) {
  185. existingUser, ok := user.bridge.managementRooms[roomID]
  186. if ok {
  187. existingUser.ManagementRoom = ""
  188. existingUser.Update()
  189. }
  190. user.ManagementRoom = roomID
  191. user.bridge.managementRooms[user.ManagementRoom] = user
  192. user.Update()
  193. }
  194. func (user *User) SetSession(session *whatsapp.Session) {
  195. if session == nil {
  196. user.Session = nil
  197. user.LastConnection = 0
  198. } else if len(session.Wid) > 0 {
  199. user.Session = session
  200. } else {
  201. return
  202. }
  203. user.Update()
  204. }
  205. func (user *User) Connect(evenIfNoSession bool) bool {
  206. user.connLock.Lock()
  207. if user.Conn != nil && user.Conn.IsConnected() {
  208. user.connLock.Unlock()
  209. return true
  210. } else if !evenIfNoSession && user.Session == nil {
  211. user.connLock.Unlock()
  212. return false
  213. }
  214. if user.Conn != nil {
  215. user.Disconnect()
  216. }
  217. user.log.Debugln("Connecting to WhatsApp")
  218. timeout := time.Duration(user.bridge.Config.Bridge.ConnectionTimeout)
  219. if timeout == 0 {
  220. timeout = 20
  221. }
  222. conn, err := whatsapp.NewConnWithOptions(&whatsapp.Options{
  223. Timeout: timeout * time.Second,
  224. LongClientName: user.bridge.Config.WhatsApp.OSName,
  225. ShortClientName: user.bridge.Config.WhatsApp.BrowserName,
  226. ClientVersion: WAVersion,
  227. })
  228. if err != nil {
  229. user.log.Errorln("Failed to connect to WhatsApp:", err)
  230. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp server. " +
  231. "This indicates a network problem on the bridge server. See bridge logs for more info.")
  232. user.connLock.Unlock()
  233. return false
  234. }
  235. user.Conn = whatsappExt.ExtendConn(conn)
  236. user.log.Debugln("WhatsApp connection successful")
  237. user.Conn.AddHandler(user)
  238. user.connLock.Unlock()
  239. return user.RestoreSession()
  240. }
  241. func (user *User) Disconnect() {
  242. sess, err := user.Conn.Disconnect()
  243. if err != nil && err != whatsapp.ErrNotConnected {
  244. user.log.Warnln("Error disconnecting: %v", err)
  245. }
  246. user.SetSession(&sess)
  247. user.Conn.RemoveHandlers()
  248. user.Conn = nil
  249. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  250. }
  251. func (user *User) RestoreSession() bool {
  252. if user.Session != nil {
  253. sess, err := user.Conn.RestoreWithSession(*user.Session)
  254. if err == whatsapp.ErrAlreadyLoggedIn {
  255. return true
  256. } else if err != nil {
  257. user.log.Errorln("Failed to restore session:", err)
  258. if errors.Is(err, whatsapp.ErrUnpaired) {
  259. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
  260. "To re-pair your phone, log in again.")
  261. user.Disconnect()
  262. user.removeFromJIDMap()
  263. //user.JID = ""
  264. user.SetSession(nil)
  265. return false
  266. } else {
  267. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
  268. "on your phone is reachable and use `reconnect` to try connecting again.")
  269. }
  270. user.log.Debugln("Disconnecting due to failed session restore...")
  271. _, err := user.Conn.Disconnect()
  272. if err != nil {
  273. user.log.Errorln("Failed to disconnect after failed session restore:", err)
  274. }
  275. return false
  276. }
  277. user.ConnectionErrors = 0
  278. user.SetSession(&sess)
  279. user.log.Debugln("Session restored successfully")
  280. user.PostLogin()
  281. }
  282. return true
  283. }
  284. func (user *User) HasSession() bool {
  285. return user.Session != nil
  286. }
  287. func (user *User) IsConnected() bool {
  288. return user.Conn != nil && user.Conn.IsConnected() && user.Conn.IsLoggedIn()
  289. }
  290. func (user *User) IsLoginInProgress() bool {
  291. return user.Conn != nil && user.Conn.IsLoginInProgress()
  292. }
  293. func (user *User) loginQrChannel(ce *CommandEvent, qrChan <-chan string, eventIDChan chan<- id.EventID) {
  294. var qrEventID id.EventID
  295. for code := range qrChan {
  296. if code == "stop" {
  297. return
  298. }
  299. qrCode, err := qrcode.Encode(code, qrcode.Low, 256)
  300. if err != nil {
  301. user.log.Errorln("Failed to encode QR code:", err)
  302. ce.Reply("Failed to encode QR code: %v", err)
  303. return
  304. }
  305. bot := user.bridge.AS.BotClient()
  306. resp, err := bot.UploadBytes(qrCode, "image/png")
  307. if err != nil {
  308. user.log.Errorln("Failed to upload QR code:", err)
  309. ce.Reply("Failed to upload QR code: %v", err)
  310. return
  311. }
  312. if qrEventID == "" {
  313. sendResp, err := bot.SendImage(ce.RoomID, code, resp.ContentURI)
  314. if err != nil {
  315. user.log.Errorln("Failed to send QR code to user:", err)
  316. return
  317. }
  318. qrEventID = sendResp.EventID
  319. eventIDChan <- qrEventID
  320. } else {
  321. _, err = bot.SendMessageEvent(ce.RoomID, event.EventMessage, &event.MessageEventContent{
  322. MsgType: event.MsgImage,
  323. Body: code,
  324. URL: resp.ContentURI.CUString(),
  325. NewContent: &event.MessageEventContent{
  326. MsgType: event.MsgImage,
  327. Body: code,
  328. URL: resp.ContentURI.CUString(),
  329. },
  330. RelatesTo: &event.RelatesTo{
  331. Type: event.RelReplace,
  332. EventID: qrEventID,
  333. },
  334. })
  335. if err != nil {
  336. user.log.Errorln("Failed to send edited QR code to user:", err)
  337. }
  338. }
  339. }
  340. }
  341. func (user *User) Login(ce *CommandEvent) {
  342. qrChan := make(chan string, 3)
  343. eventIDChan := make(chan id.EventID, 1)
  344. go user.loginQrChannel(ce, qrChan, eventIDChan)
  345. session, err := user.Conn.LoginWithRetry(qrChan, nil, user.bridge.Config.Bridge.LoginQRRegenCount)
  346. qrChan <- "stop"
  347. if err != nil {
  348. var eventID id.EventID
  349. select {
  350. case eventID = <-eventIDChan:
  351. default:
  352. }
  353. reply := event.MessageEventContent{
  354. MsgType: event.MsgText,
  355. }
  356. if err == whatsapp.ErrAlreadyLoggedIn {
  357. reply.Body = "You're already logged in"
  358. } else if err == whatsapp.ErrLoginInProgress {
  359. reply.Body = "You have a login in progress already."
  360. } else if err == whatsapp.ErrLoginTimedOut {
  361. reply.Body = "QR code scan timed out. Please try again."
  362. } else {
  363. user.log.Warnln("Failed to log in:", err)
  364. reply.Body = fmt.Sprintf("Unknown error while logging in: %v", err)
  365. }
  366. msg := reply
  367. if eventID != "" {
  368. msg.NewContent = &reply
  369. msg.RelatesTo = &event.RelatesTo{
  370. Type: event.RelReplace,
  371. EventID: eventID,
  372. }
  373. }
  374. _, _ = ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &msg)
  375. return
  376. }
  377. // TODO there's a bit of duplication between this and the provisioning API login method
  378. // Also between the two logout methods (commands.go and provisioning.go)
  379. user.ConnectionErrors = 0
  380. user.JID = strings.Replace(user.Conn.Info.Wid, whatsappExt.OldUserSuffix, whatsappExt.NewUserSuffix, 1)
  381. user.addToJIDMap()
  382. user.SetSession(&session)
  383. ce.Reply("Successfully logged in, synchronizing chats...")
  384. user.PostLogin()
  385. }
  386. type Chat struct {
  387. Portal *Portal
  388. LastMessageTime uint64
  389. Contact whatsapp.Contact
  390. }
  391. type ChatList []Chat
  392. func (cl ChatList) Len() int {
  393. return len(cl)
  394. }
  395. func (cl ChatList) Less(i, j int) bool {
  396. return cl[i].LastMessageTime > cl[j].LastMessageTime
  397. }
  398. func (cl ChatList) Swap(i, j int) {
  399. cl[i], cl[j] = cl[j], cl[i]
  400. }
  401. func (user *User) PostLogin() {
  402. user.bridge.Metrics.TrackConnectionState(user.JID, true)
  403. user.bridge.Metrics.TrackLoginState(user.JID, true)
  404. user.bridge.Metrics.TrackBufferLength(user.MXID, 0)
  405. user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
  406. user.syncWait.Add(1)
  407. user.syncStart <- struct{}{}
  408. go user.intPostLogin(user.Conn)
  409. }
  410. func (user *User) tryAutomaticDoublePuppeting() {
  411. if len(user.bridge.Config.Bridge.LoginSharedSecret) == 0 {
  412. // Automatic login not enabled
  413. return
  414. } else if _, homeserver, _ := user.MXID.Parse(); homeserver != user.bridge.Config.Homeserver.Domain {
  415. // user is on another homeserver
  416. return
  417. }
  418. user.log.Debugln("Checking if double puppeting needs to be enabled")
  419. puppet := user.bridge.GetPuppetByJID(user.JID)
  420. if len(puppet.CustomMXID) > 0 {
  421. user.log.Debugln("User already has double-puppeting enabled")
  422. // Custom puppet already enabled
  423. return
  424. }
  425. accessToken, err := puppet.loginWithSharedSecret(user.MXID)
  426. if err != nil {
  427. user.log.Warnln("Failed to login with shared secret:", err)
  428. return
  429. }
  430. err = puppet.SwitchCustomMXID(accessToken, user.MXID)
  431. if err != nil {
  432. puppet.log.Warnln("Failed to switch to auto-logined custom puppet:", err)
  433. return
  434. }
  435. user.log.Infoln("Successfully automatically enabled custom puppet")
  436. }
  437. func (user *User) sendBridgeNotice(formatString string, args ...interface{}) {
  438. notice := fmt.Sprintf(formatString, args...)
  439. _, err := user.bridge.Bot.SendNotice(user.GetManagementRoom(), notice)
  440. if err != nil {
  441. user.log.Warnf("Failed to send bridge notice \"%s\": %v", notice, err)
  442. }
  443. }
  444. func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface{}) {
  445. notice := fmt.Sprintf(formatString, args...)
  446. content := format.RenderMarkdown(notice, true, false)
  447. _, err := user.bridge.Bot.SendMessageEvent(user.GetManagementRoom(), event.EventMessage, content)
  448. if err != nil {
  449. user.log.Warnf("Failed to send bridge alert \"%s\": %v", notice, err)
  450. }
  451. }
  452. func (user *User) postConnPing(conn *whatsappExt.ExtendedConn) bool {
  453. if user.Conn != conn {
  454. user.log.Warnln("Connection changed before scheduled post-connection ping, canceling ping")
  455. return false
  456. }
  457. user.log.Debugln("Making post-connection ping")
  458. err := conn.AdminTest()
  459. if err != nil {
  460. user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err)
  461. sess, disconnectErr := conn.Disconnect()
  462. if disconnectErr != nil {
  463. user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
  464. } else {
  465. user.Session = &sess
  466. }
  467. user.bridge.Metrics.TrackDisconnection(user.MXID)
  468. go func() {
  469. time.Sleep(1 * time.Second)
  470. user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err))
  471. }()
  472. return false
  473. } else {
  474. user.log.Debugln("Post-connection ping OK")
  475. return true
  476. }
  477. }
  478. func (user *User) intPostLogin(conn *whatsappExt.ExtendedConn) {
  479. defer user.syncWait.Done()
  480. user.lastReconnection = time.Now().Unix()
  481. user.createCommunity()
  482. user.tryAutomaticDoublePuppeting()
  483. user.log.Debugln("Waiting for chat list receive confirmation")
  484. select {
  485. case <-user.chatListReceived:
  486. user.log.Debugln("Chat list receive confirmation received in PostLogin")
  487. case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
  488. user.log.Warnln("Timed out waiting for chat list to arrive!")
  489. user.postConnPing(conn)
  490. return
  491. }
  492. if !user.postConnPing(conn) {
  493. user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
  494. return
  495. }
  496. user.log.Debugln("Waiting for portal sync complete confirmation")
  497. select {
  498. case <-user.syncPortalsDone:
  499. user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.")
  500. // TODO this is too short, maybe a per-portal duration?
  501. case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
  502. user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
  503. }
  504. }
  505. func (user *User) HandleStreamEvent(evt whatsappExt.StreamEvent) {
  506. if evt.Type == whatsappExt.StreamSleep {
  507. if user.lastReconnection+60 > time.Now().Unix() {
  508. user.lastReconnection = 0
  509. user.log.Infoln("Stream went to sleep soon after reconnection, making new post-connection ping in 20 seconds")
  510. conn := user.Conn
  511. go func() {
  512. time.Sleep(20 * time.Second)
  513. // TODO if this happens during the post-login sync, it can get stuck forever
  514. user.postConnPing(conn)
  515. }()
  516. }
  517. } else {
  518. user.log.Infofln("Stream event: %+v", evt)
  519. }
  520. }
  521. func (user *User) HandleChatList(chats []whatsapp.Chat) {
  522. user.log.Infoln("Chat list received")
  523. chatMap := make(map[string]whatsapp.Chat)
  524. for _, chat := range user.Conn.Store.Chats {
  525. chatMap[chat.Jid] = chat
  526. }
  527. for _, chat := range chats {
  528. chatMap[chat.Jid] = chat
  529. }
  530. select {
  531. case user.chatListReceived <- struct{}{}:
  532. default:
  533. }
  534. go user.syncPortals(chatMap, false)
  535. }
  536. func (user *User) syncPortals(chatMap map[string]whatsapp.Chat, createAll bool) {
  537. if chatMap == nil {
  538. chatMap = user.Conn.Store.Chats
  539. }
  540. user.log.Infoln("Reading chat list")
  541. chats := make(ChatList, 0, len(chatMap))
  542. existingKeys := user.GetInCommunityMap()
  543. portalKeys := make([]database.PortalKeyWithMeta, 0, len(chatMap))
  544. for _, chat := range chatMap {
  545. ts, err := strconv.ParseUint(chat.LastMessageTime, 10, 64)
  546. if err != nil {
  547. user.log.Warnfln("Non-integer last message time in %s: %s", chat.Jid, chat.LastMessageTime)
  548. continue
  549. }
  550. portal := user.GetPortalByJID(chat.Jid)
  551. chats = append(chats, Chat{
  552. Portal: portal,
  553. Contact: user.Conn.Store.Contacts[chat.Jid],
  554. LastMessageTime: ts,
  555. })
  556. var inCommunity, ok bool
  557. if inCommunity, ok = existingKeys[portal.Key]; !ok || !inCommunity {
  558. inCommunity = user.addPortalToCommunity(portal)
  559. if portal.IsPrivateChat() {
  560. puppet := user.bridge.GetPuppetByJID(portal.Key.JID)
  561. user.addPuppetToCommunity(puppet)
  562. }
  563. }
  564. portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: portal.Key, InCommunity: inCommunity})
  565. }
  566. user.log.Infoln("Read chat list, updating user-portal mapping")
  567. err := user.SetPortalKeys(portalKeys)
  568. if err != nil {
  569. user.log.Warnln("Failed to update user-portal mapping:", err)
  570. }
  571. sort.Sort(chats)
  572. limit := user.bridge.Config.Bridge.InitialChatSync
  573. if limit < 0 {
  574. limit = len(chats)
  575. }
  576. now := uint64(time.Now().Unix())
  577. user.log.Infoln("Syncing portals")
  578. for i, chat := range chats {
  579. if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
  580. break
  581. }
  582. create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
  583. if len(chat.Portal.MXID) > 0 || create || createAll {
  584. // Don't sync unless chat meta sync is enabled or portal doesn't exist
  585. if user.bridge.Config.Bridge.ChatMetaSync || len(chat.Portal.MXID) == 0 {
  586. chat.Portal.Sync(user, chat.Contact)
  587. }
  588. err = chat.Portal.BackfillHistory(user, chat.LastMessageTime)
  589. if err != nil {
  590. chat.Portal.log.Errorln("Error backfilling history:", err)
  591. }
  592. }
  593. }
  594. user.UpdateDirectChats(nil)
  595. user.log.Infoln("Finished syncing portals")
  596. select {
  597. case user.syncPortalsDone <- struct{}{}:
  598. default:
  599. }
  600. }
  601. func (user *User) getDirectChats() map[id.UserID][]id.RoomID {
  602. res := make(map[id.UserID][]id.RoomID)
  603. privateChats := user.bridge.DB.Portal.FindPrivateChats(user.JID)
  604. for _, portal := range privateChats {
  605. if len(portal.MXID) > 0 {
  606. res[user.bridge.FormatPuppetMXID(portal.Key.JID)] = []id.RoomID{portal.MXID}
  607. }
  608. }
  609. return res
  610. }
  611. func (user *User) UpdateDirectChats(chats map[id.UserID][]id.RoomID) {
  612. if !user.bridge.Config.Bridge.SyncDirectChatList {
  613. return
  614. }
  615. puppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  616. if puppet == nil || puppet.CustomIntent() == nil {
  617. return
  618. }
  619. intent := puppet.CustomIntent()
  620. method := http.MethodPatch
  621. if chats == nil {
  622. chats = user.getDirectChats()
  623. method = http.MethodPut
  624. }
  625. user.log.Debugln("Updating m.direct list on homeserver")
  626. var err error
  627. if user.bridge.Config.Homeserver.Asmux {
  628. urlPath := intent.BuildBaseURL("_matrix", "client", "unstable", "net.maunium.asmux", "dms")
  629. _, err = intent.MakeFullRequest(method, urlPath, http.Header{
  630. "X-Asmux-Auth": {user.bridge.AS.Registration.AppToken},
  631. }, chats, nil)
  632. } else {
  633. existingChats := make(map[id.UserID][]id.RoomID)
  634. err = intent.GetAccountData(event.AccountDataDirectChats.Type, &existingChats)
  635. if err != nil {
  636. user.log.Warnln("Failed to get m.direct list to update it:", err)
  637. return
  638. }
  639. for userID, rooms := range existingChats {
  640. if _, ok := user.bridge.ParsePuppetMXID(userID); !ok {
  641. // This is not a ghost user, include it in the new list
  642. chats[userID] = rooms
  643. } else if _, ok := chats[userID]; !ok && method == http.MethodPatch {
  644. // This is a ghost user, but we're not replacing the whole list, so include it too
  645. chats[userID] = rooms
  646. }
  647. }
  648. err = intent.SetAccountData(event.AccountDataDirectChats.Type, &chats)
  649. }
  650. if err != nil {
  651. user.log.Warnln("Failed to update m.direct list:", err)
  652. }
  653. }
  654. func (user *User) HandleContactList(contacts []whatsapp.Contact) {
  655. contactMap := make(map[string]whatsapp.Contact)
  656. for _, contact := range contacts {
  657. contactMap[contact.Jid] = contact
  658. }
  659. go user.syncPuppets(contactMap)
  660. }
  661. func (user *User) syncPuppets(contacts map[string]whatsapp.Contact) {
  662. if contacts == nil {
  663. contacts = user.Conn.Store.Contacts
  664. }
  665. user.log.Infoln("Syncing puppet info from contacts")
  666. for jid, contact := range contacts {
  667. if strings.HasSuffix(jid, whatsappExt.NewUserSuffix) {
  668. puppet := user.bridge.GetPuppetByJID(contact.Jid)
  669. puppet.Sync(user, contact)
  670. }
  671. }
  672. user.log.Infoln("Finished syncing puppet info from contacts")
  673. }
  674. func (user *User) updateLastConnectionIfNecessary() {
  675. if user.LastConnection+60 < uint64(time.Now().Unix()) {
  676. user.UpdateLastConnection()
  677. }
  678. }
  679. func (user *User) HandleError(err error) {
  680. if !errors.Is(err, whatsapp.ErrInvalidWsData) {
  681. user.log.Errorfln("WhatsApp error: %v", err)
  682. }
  683. if closed, ok := err.(*whatsapp.ErrConnectionClosed); ok {
  684. user.bridge.Metrics.TrackDisconnection(user.MXID)
  685. if closed.Code == 1000 && user.cleanDisconnection {
  686. user.cleanDisconnection = false
  687. if !user.bridge.Config.Bridge.AggressiveReconnect {
  688. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  689. user.log.Infoln("Clean disconnection by server")
  690. return
  691. } else {
  692. user.log.Debugln("Clean disconnection by server, but aggressive reconnection is enabled")
  693. }
  694. }
  695. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection was closed with websocket status code %d", closed.Code))
  696. } else if failed, ok := err.(*whatsapp.ErrConnectionFailed); ok {
  697. user.bridge.Metrics.TrackDisconnection(user.MXID)
  698. user.ConnectionErrors++
  699. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", failed.Err))
  700. }
  701. // Otherwise unknown error, probably mostly harmless
  702. }
  703. func (user *User) tryReconnect(msg string) {
  704. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  705. if user.ConnectionErrors > user.bridge.Config.Bridge.MaxConnectionAttempts {
  706. user.sendMarkdownBridgeAlert("%s. Use the `reconnect` command to reconnect.", msg)
  707. return
  708. }
  709. if user.bridge.Config.Bridge.ReportConnectionRetry {
  710. user.sendBridgeNotice("%s. Reconnecting...", msg)
  711. // Don't want the same error to be repeated
  712. msg = ""
  713. }
  714. var tries uint
  715. var exponentialBackoff bool
  716. baseDelay := time.Duration(user.bridge.Config.Bridge.ConnectionRetryDelay)
  717. if baseDelay < 0 {
  718. exponentialBackoff = true
  719. baseDelay = -baseDelay + 1
  720. }
  721. delay := baseDelay
  722. conn := user.Conn
  723. for user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  724. if user.Conn != conn {
  725. user.log.Debugln("Connection was recreated, aborting reconnection attempts")
  726. return
  727. }
  728. err := conn.Restore()
  729. if err == nil {
  730. user.ConnectionErrors = 0
  731. if user.bridge.Config.Bridge.ReportConnectionRetry {
  732. user.sendBridgeNotice("Reconnected successfully")
  733. }
  734. user.PostLogin()
  735. return
  736. } else if errors.Is(err, whatsapp.ErrBadRequest) {
  737. user.log.Infoln("Got init 400 error when trying to reconnect, resetting connection...")
  738. sess, err := conn.Disconnect()
  739. if err != nil {
  740. user.log.Debugln("Error while disconnecting for connection reset:", err)
  741. }
  742. user.SetSession(&sess)
  743. }
  744. user.log.Errorln("Error while trying to reconnect after disconnection:", err)
  745. tries++
  746. user.ConnectionErrors++
  747. if user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  748. if exponentialBackoff {
  749. delay = (1 << tries) + baseDelay
  750. }
  751. if user.bridge.Config.Bridge.ReportConnectionRetry {
  752. user.sendBridgeNotice("Reconnection attempt failed: %v. Retrying in %d seconds...", err, delay)
  753. }
  754. time.Sleep(delay * time.Second)
  755. }
  756. }
  757. if user.bridge.Config.Bridge.ReportConnectionRetry {
  758. user.sendMarkdownBridgeAlert("%d reconnection attempts failed. Use the `reconnect` command to try to reconnect manually.", tries)
  759. } else {
  760. user.sendMarkdownBridgeAlert("\u26a0 %s. Additionally, %d reconnection attempts failed. Use the `reconnect` command to try to reconnect.", msg, tries)
  761. }
  762. }
  763. func (user *User) ShouldCallSynchronously() bool {
  764. return true
  765. }
  766. func (user *User) HandleJSONParseError(err error) {
  767. user.log.Errorln("WhatsApp JSON parse error:", err)
  768. }
  769. func (user *User) PortalKey(jid types.WhatsAppID) database.PortalKey {
  770. return database.NewPortalKey(jid, user.JID)
  771. }
  772. func (user *User) GetPortalByJID(jid types.WhatsAppID) *Portal {
  773. return user.bridge.GetPortalByJID(user.PortalKey(jid))
  774. }
  775. func (user *User) runMessageRingBuffer() {
  776. for msg := range user.messageInput {
  777. select {
  778. case user.messageOutput <- msg:
  779. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  780. default:
  781. dropped := <-user.messageOutput
  782. user.log.Warnln("Buffer is full, dropping message in", dropped.chat)
  783. user.messageOutput <- msg
  784. }
  785. }
  786. }
  787. func (user *User) handleMessageLoop() {
  788. for {
  789. select {
  790. case msg := <-user.messageOutput:
  791. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  792. user.GetPortalByJID(msg.chat).messages <- msg
  793. case <-user.syncStart:
  794. user.log.Debugln("Processing of incoming messages is locked")
  795. user.bridge.Metrics.TrackSyncLock(user.JID, true)
  796. user.syncWait.Wait()
  797. user.bridge.Metrics.TrackSyncLock(user.JID, false)
  798. user.log.Debugln("Processing of incoming messages unlocked")
  799. }
  800. }
  801. }
  802. func (user *User) HandleNewContact(contact whatsapp.Contact) {
  803. user.log.Debugfln("Contact message: %+v", contact)
  804. go func() {
  805. if strings.HasSuffix(contact.Jid, whatsappExt.OldUserSuffix) {
  806. contact.Jid = strings.Replace(contact.Jid, whatsappExt.OldUserSuffix, whatsappExt.NewUserSuffix, -1)
  807. }
  808. puppet := user.bridge.GetPuppetByJID(contact.Jid)
  809. puppet.UpdateName(user, contact)
  810. }()
  811. }
  812. func (user *User) HandleBatteryMessage(battery whatsapp.BatteryMessage) {
  813. user.log.Debugfln("Battery message: %+v", battery)
  814. var notice string
  815. if !battery.Plugged && battery.Percentage < 15 && user.batteryWarningsSent < 1 {
  816. notice = fmt.Sprintf("Phone battery low (%d %% remaining)", battery.Percentage)
  817. user.batteryWarningsSent = 1
  818. } else if !battery.Plugged && battery.Percentage < 5 && user.batteryWarningsSent < 2 {
  819. notice = fmt.Sprintf("Phone battery very low (%d %% remaining)", battery.Percentage)
  820. user.batteryWarningsSent = 2
  821. } else if battery.Percentage > 15 || battery.Plugged {
  822. user.batteryWarningsSent = 0
  823. }
  824. if notice != "" {
  825. go user.sendBridgeNotice("%s", notice)
  826. }
  827. }
  828. func (user *User) HandleTextMessage(message whatsapp.TextMessage) {
  829. user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
  830. }
  831. func (user *User) HandleImageMessage(message whatsapp.ImageMessage) {
  832. user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
  833. }
  834. func (user *User) HandleStickerMessage(message whatsapp.StickerMessage) {
  835. user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
  836. }
  837. func (user *User) HandleVideoMessage(message whatsapp.VideoMessage) {
  838. user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
  839. }
  840. func (user *User) HandleAudioMessage(message whatsapp.AudioMessage) {
  841. user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
  842. }
  843. func (user *User) HandleDocumentMessage(message whatsapp.DocumentMessage) {
  844. user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
  845. }
  846. func (user *User) HandleContactMessage(message whatsapp.ContactMessage) {
  847. user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
  848. }
  849. func (user *User) HandleStubMessage(message whatsapp.StubMessage) {
  850. user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
  851. }
  852. func (user *User) HandleLocationMessage(message whatsapp.LocationMessage) {
  853. user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
  854. }
  855. func (user *User) HandleMessageRevoke(message whatsappExt.MessageRevocation) {
  856. user.messageInput <- PortalMessage{message.RemoteJid, user, message, 0}
  857. }
  858. type FakeMessage struct {
  859. Text string
  860. ID string
  861. Alert bool
  862. }
  863. func (user *User) HandleCallInfo(info whatsappExt.CallInfo) {
  864. if info.Data != nil {
  865. return
  866. }
  867. data := FakeMessage{
  868. ID: info.ID,
  869. }
  870. switch info.Type {
  871. case whatsappExt.CallOffer:
  872. if !user.bridge.Config.Bridge.CallNotices.Start {
  873. return
  874. }
  875. data.Text = "Incoming call"
  876. data.Alert = true
  877. case whatsappExt.CallOfferVideo:
  878. if !user.bridge.Config.Bridge.CallNotices.Start {
  879. return
  880. }
  881. data.Text = "Incoming video call"
  882. data.Alert = true
  883. case whatsappExt.CallTerminate:
  884. if !user.bridge.Config.Bridge.CallNotices.End {
  885. return
  886. }
  887. data.Text = "Call ended"
  888. data.ID += "E"
  889. default:
  890. return
  891. }
  892. portal := user.GetPortalByJID(info.From)
  893. if portal != nil {
  894. portal.messages <- PortalMessage{info.From, user, data, 0}
  895. }
  896. }
  897. func (user *User) HandlePresence(info whatsappExt.Presence) {
  898. puppet := user.bridge.GetPuppetByJID(info.SenderJID)
  899. switch info.Status {
  900. case whatsapp.PresenceUnavailable:
  901. _ = puppet.DefaultIntent().SetPresence("offline")
  902. case whatsapp.PresenceAvailable:
  903. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  904. portal := user.bridge.GetPortalByMXID(puppet.typingIn)
  905. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  906. puppet.typingIn = ""
  907. puppet.typingAt = 0
  908. } else {
  909. _ = puppet.DefaultIntent().SetPresence("online")
  910. }
  911. case whatsapp.PresenceComposing:
  912. portal := user.GetPortalByJID(info.JID)
  913. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  914. if puppet.typingIn == portal.MXID {
  915. return
  916. }
  917. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  918. }
  919. puppet.typingIn = portal.MXID
  920. puppet.typingAt = time.Now().Unix()
  921. _, _ = puppet.IntentFor(portal).UserTyping(portal.MXID, true, 15*1000)
  922. }
  923. }
  924. func (user *User) HandleMsgInfo(info whatsappExt.MsgInfo) {
  925. if (info.Command == whatsappExt.MsgInfoCommandAck || info.Command == whatsappExt.MsgInfoCommandAcks) && info.Acknowledgement == whatsappExt.AckMessageRead {
  926. portal := user.GetPortalByJID(info.ToJID)
  927. if len(portal.MXID) == 0 {
  928. return
  929. }
  930. go func() {
  931. intent := user.bridge.GetPuppetByJID(info.SenderJID).IntentFor(portal)
  932. for _, msgID := range info.IDs {
  933. msg := user.bridge.DB.Message.GetByJID(portal.Key, msgID)
  934. if msg == nil || msg.IsFakeMXID() {
  935. continue
  936. }
  937. err := intent.MarkRead(portal.MXID, msg.MXID)
  938. if err != nil {
  939. user.log.Warnln("Failed to mark message %s as read by %s: %v", msg.MXID, info.SenderJID, err)
  940. }
  941. }
  942. }()
  943. }
  944. }
  945. func (user *User) HandleReceivedMessage(received whatsapp.ReceivedMessage) {
  946. if received.Type == "read" {
  947. user.markSelfRead(received.Jid, received.Index)
  948. } else {
  949. user.log.Debugfln("Unknown received message type: %+v", received)
  950. }
  951. }
  952. func (user *User) HandleReadMessage(read whatsapp.ReadMessage) {
  953. user.log.Debugfln("Received chat read message: %+v", read)
  954. user.markSelfRead(read.Jid, "")
  955. }
  956. func (user *User) markSelfRead(jid, messageID string) {
  957. if strings.HasSuffix(jid, whatsappExt.OldUserSuffix) {
  958. jid = strings.Replace(jid, whatsappExt.OldUserSuffix, whatsappExt.NewUserSuffix, -1)
  959. }
  960. puppet := user.bridge.GetPuppetByJID(user.JID)
  961. if puppet == nil {
  962. return
  963. }
  964. intent := puppet.CustomIntent()
  965. if intent == nil {
  966. return
  967. }
  968. portal := user.GetPortalByJID(jid)
  969. if portal == nil {
  970. return
  971. }
  972. var message *database.Message
  973. if messageID == "" {
  974. message = user.bridge.DB.Message.GetLastInChat(portal.Key)
  975. if message == nil {
  976. return
  977. }
  978. user.log.Debugfln("User read chat %s/%s in WhatsApp mobile (last known event: %s/%s)", portal.Key.JID, portal.MXID, message.JID, message.MXID)
  979. } else {
  980. message = user.bridge.DB.Message.GetByJID(portal.Key, messageID)
  981. if message == nil || message.IsFakeMXID() {
  982. return
  983. }
  984. user.log.Debugfln("User read message %s/%s in %s/%s in WhatsApp mobile", message.JID, message.MXID, portal.Key.JID, portal.MXID)
  985. }
  986. err := intent.MarkRead(portal.MXID, message.MXID)
  987. if err != nil {
  988. user.log.Warnfln("Failed to bridge own read receipt in %s: %v", jid, err)
  989. }
  990. }
  991. func (user *User) HandleCommand(cmd whatsappExt.Command) {
  992. switch cmd.Type {
  993. case whatsappExt.CommandPicture:
  994. if strings.HasSuffix(cmd.JID, whatsappExt.NewUserSuffix) {
  995. puppet := user.bridge.GetPuppetByJID(cmd.JID)
  996. go puppet.UpdateAvatar(user, cmd.ProfilePicInfo)
  997. } else if user.bridge.Config.Bridge.ChatMetaSync {
  998. portal := user.GetPortalByJID(cmd.JID)
  999. go portal.UpdateAvatar(user, cmd.ProfilePicInfo, true)
  1000. }
  1001. case whatsappExt.CommandDisconnect:
  1002. if cmd.Kind == "replaced" {
  1003. user.cleanDisconnection = true
  1004. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server because you opened another WhatsApp Web client.\n\n" +
  1005. "Use the `reconnect` command to disconnect the other client and resume bridging.")
  1006. } else {
  1007. user.log.Warnln("Unknown kind of disconnect:", string(cmd.Raw))
  1008. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server (reason code: %s).\n\n"+
  1009. "Use the `reconnect` command to reconnect.", cmd.Kind)
  1010. }
  1011. }
  1012. }
  1013. func (user *User) HandleChatUpdate(cmd whatsappExt.ChatUpdate) {
  1014. if cmd.Command != whatsappExt.ChatUpdateCommandAction {
  1015. return
  1016. }
  1017. portal := user.GetPortalByJID(cmd.JID)
  1018. if len(portal.MXID) == 0 {
  1019. if cmd.Data.Action == whatsappExt.ChatActionIntroduce || cmd.Data.Action == whatsappExt.ChatActionCreate {
  1020. go func() {
  1021. err := portal.CreateMatrixRoom(user)
  1022. if err != nil {
  1023. user.log.Errorln("Failed to create portal room after receiving join event:", err)
  1024. }
  1025. }()
  1026. }
  1027. return
  1028. }
  1029. // These don't come down the message history :(
  1030. switch cmd.Data.Action {
  1031. case whatsappExt.ChatActionAddTopic:
  1032. go portal.UpdateTopic(cmd.Data.AddTopic.Topic, cmd.Data.SenderJID, nil,true)
  1033. case whatsappExt.ChatActionRemoveTopic:
  1034. go portal.UpdateTopic("", cmd.Data.SenderJID, nil,true)
  1035. }
  1036. if !user.bridge.Config.Bridge.ChatMetaSync {
  1037. // Ignore chat update commands, we're relying on the message history.
  1038. return
  1039. }
  1040. switch cmd.Data.Action {
  1041. case whatsappExt.ChatActionNameChange:
  1042. go portal.UpdateName(cmd.Data.NameChange.Name, cmd.Data.SenderJID, nil, true)
  1043. case whatsappExt.ChatActionPromote:
  1044. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, true)
  1045. case whatsappExt.ChatActionDemote:
  1046. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, false)
  1047. case whatsappExt.ChatActionAnnounce:
  1048. go portal.RestrictMessageSending(cmd.Data.Announce)
  1049. case whatsappExt.ChatActionRestrict:
  1050. go portal.RestrictMetadataChanges(cmd.Data.Restrict)
  1051. case whatsappExt.ChatActionRemove:
  1052. go portal.HandleWhatsAppKick(cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1053. case whatsappExt.ChatActionAdd:
  1054. go portal.HandleWhatsAppInvite(cmd.Data.SenderJID, nil, cmd.Data.UserChange.JIDs)
  1055. case whatsappExt.ChatActionIntroduce:
  1056. if cmd.Data.SenderJID != "unknown" {
  1057. go portal.Sync(user, whatsapp.Contact{Jid: portal.Key.JID})
  1058. }
  1059. }
  1060. }
  1061. func (user *User) HandleJsonMessage(message string) {
  1062. var msg json.RawMessage
  1063. err := json.Unmarshal([]byte(message), &msg)
  1064. if err != nil {
  1065. return
  1066. }
  1067. user.log.Debugln("JSON message:", message)
  1068. user.updateLastConnectionIfNecessary()
  1069. }
  1070. func (user *User) HandleRawMessage(message *waProto.WebMessageInfo) {
  1071. user.updateLastConnectionIfNecessary()
  1072. }
  1073. func (user *User) HandleUnknownBinaryNode(node *waBinary.Node) {
  1074. user.log.Debugfln("Unknown binary message: %+v", node)
  1075. }
  1076. func (user *User) NeedsRelaybot(portal *Portal) bool {
  1077. return !user.HasSession() || !user.IsInPortal(portal.Key)
  1078. }