user.go 36 KB


  1. // mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
  2. // Copyright (C) 2020 Tulir Asokan
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "encoding/json"
  19. "errors"
  20. "fmt"
  21. "net/http"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "sync"
  26. "time"
  27. "github.com/skip2/go-qrcode"
  28. log "maunium.net/go/maulogger/v2"
  29. "github.com/Rhymen/go-whatsapp"
  30. waBinary "github.com/Rhymen/go-whatsapp/binary"
  31. waProto "github.com/Rhymen/go-whatsapp/binary/proto"
  32. "maunium.net/go/mautrix"
  33. "maunium.net/go/mautrix/event"
  34. "maunium.net/go/mautrix/format"
  35. "maunium.net/go/mautrix/id"
  36. "maunium.net/go/mautrix-whatsapp/database"
  37. )
  38. type User struct {
  39. *database.User
  40. Conn *whatsapp.Conn
  41. bridge *Bridge
  42. log log.Logger
  43. Admin bool
  44. Whitelisted bool
  45. RelaybotWhitelisted bool
  46. IsRelaybot bool
  47. ConnectionErrors int
  48. CommunityID string
  49. cleanDisconnection bool
  50. batteryWarningsSent int
  51. lastReconnection int64
  52. chatListReceived chan struct{}
  53. syncPortalsDone chan struct{}
  54. messageInput chan PortalMessage
  55. messageOutput chan PortalMessage
  56. syncStart chan struct{}
  57. syncWait sync.WaitGroup
  58. mgmtCreateLock sync.Mutex
  59. connLock sync.Mutex
  60. }
  61. func (bridge *Bridge) GetUserByMXID(userID id.UserID) *User {
  62. _, isPuppet := bridge.ParsePuppetMXID(userID)
  63. if isPuppet || userID == bridge.Bot.UserID {
  64. return nil
  65. }
  66. bridge.usersLock.Lock()
  67. defer bridge.usersLock.Unlock()
  68. user, ok := bridge.usersByMXID[userID]
  69. if !ok {
  70. return bridge.loadDBUser(bridge.DB.User.GetByMXID(userID), &userID)
  71. }
  72. return user
  73. }
  74. func (bridge *Bridge) GetUserByJID(userID whatsapp.JID) *User {
  75. bridge.usersLock.Lock()
  76. defer bridge.usersLock.Unlock()
  77. user, ok := bridge.usersByJID[userID]
  78. if !ok {
  79. return bridge.loadDBUser(bridge.DB.User.GetByJID(userID), nil)
  80. }
  81. return user
  82. }
  83. func (user *User) addToJIDMap() {
  84. user.bridge.usersLock.Lock()
  85. user.bridge.usersByJID[user.JID] = user
  86. user.bridge.usersLock.Unlock()
  87. }
  88. func (user *User) removeFromJIDMap() {
  89. user.bridge.usersLock.Lock()
  90. jidUser, ok := user.bridge.usersByJID[user.JID]
  91. if ok && user == jidUser {
  92. delete(user.bridge.usersByJID, user.JID)
  93. }
  94. user.bridge.usersLock.Unlock()
  95. user.bridge.Metrics.TrackLoginState(user.JID, false)
  96. }
  97. func (bridge *Bridge) GetAllUsers() []*User {
  98. bridge.usersLock.Lock()
  99. defer bridge.usersLock.Unlock()
  100. dbUsers := bridge.DB.User.GetAll()
  101. output := make([]*User, len(dbUsers))
  102. for index, dbUser := range dbUsers {
  103. user, ok := bridge.usersByMXID[dbUser.MXID]
  104. if !ok {
  105. user = bridge.loadDBUser(dbUser, nil)
  106. }
  107. output[index] = user
  108. }
  109. return output
  110. }
  111. func (bridge *Bridge) loadDBUser(dbUser *database.User, mxid *id.UserID) *User {
  112. if dbUser == nil {
  113. if mxid == nil {
  114. return nil
  115. }
  116. dbUser = bridge.DB.User.New()
  117. dbUser.MXID = *mxid
  118. dbUser.Insert()
  119. }
  120. user := bridge.NewUser(dbUser)
  121. bridge.usersByMXID[user.MXID] = user
  122. if len(user.JID) > 0 {
  123. bridge.usersByJID[user.JID] = user
  124. }
  125. if len(user.ManagementRoom) > 0 {
  126. bridge.managementRooms[user.ManagementRoom] = user
  127. }
  128. return user
  129. }
  130. func (user *User) GetPortals() []*Portal {
  131. keys := user.User.GetPortalKeys()
  132. portals := make([]*Portal, len(keys))
  133. user.bridge.portalsLock.Lock()
  134. for i, key := range keys {
  135. portal, ok := user.bridge.portalsByJID[key]
  136. if !ok {
  137. portal = user.bridge.loadDBPortal(user.bridge.DB.Portal.GetByJID(key), &key)
  138. }
  139. portals[i] = portal
  140. }
  141. user.bridge.portalsLock.Unlock()
  142. return portals
  143. }
  144. func (bridge *Bridge) NewUser(dbUser *database.User) *User {
  145. user := &User{
  146. User: dbUser,
  147. bridge: bridge,
  148. log: bridge.Log.Sub("User").Sub(string(dbUser.MXID)),
  149. IsRelaybot: false,
  150. chatListReceived: make(chan struct{}, 1),
  151. syncPortalsDone: make(chan struct{}, 1),
  152. syncStart: make(chan struct{}, 1),
  153. messageInput: make(chan PortalMessage),
  154. messageOutput: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer),
  155. }
  156. user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID)
  157. user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
  158. user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
  159. go user.handleMessageLoop()
  160. go user.runMessageRingBuffer()
  161. return user
  162. }
  163. func (user *User) GetManagementRoom() id.RoomID {
  164. if len(user.ManagementRoom) == 0 {
  165. user.mgmtCreateLock.Lock()
  166. defer user.mgmtCreateLock.Unlock()
  167. if len(user.ManagementRoom) > 0 {
  168. return user.ManagementRoom
  169. }
  170. resp, err := user.bridge.Bot.CreateRoom(&mautrix.ReqCreateRoom{
  171. Topic: "WhatsApp bridge notices",
  172. IsDirect: true,
  173. })
  174. if err != nil {
  175. user.log.Errorln("Failed to auto-create management room:", err)
  176. } else {
  177. user.SetManagementRoom(resp.RoomID)
  178. }
  179. }
  180. return user.ManagementRoom
  181. }
  182. func (user *User) SetManagementRoom(roomID id.RoomID) {
  183. existingUser, ok := user.bridge.managementRooms[roomID]
  184. if ok {
  185. existingUser.ManagementRoom = ""
  186. existingUser.Update()
  187. }
  188. user.ManagementRoom = roomID
  189. user.bridge.managementRooms[user.ManagementRoom] = user
  190. user.Update()
  191. }
  192. func (user *User) SetSession(session *whatsapp.Session) {
  193. if session == nil {
  194. user.Session = nil
  195. user.LastConnection = 0
  196. } else if len(session.Wid) > 0 {
  197. user.Session = session
  198. } else {
  199. return
  200. }
  201. user.Update()
  202. }
  203. func (user *User) Connect(evenIfNoSession bool) bool {
  204. user.connLock.Lock()
  205. if user.Conn != nil && user.Conn.IsConnected() {
  206. user.connLock.Unlock()
  207. return true
  208. } else if !evenIfNoSession && user.Session == nil {
  209. user.connLock.Unlock()
  210. return false
  211. }
  212. if user.Conn != nil {
  213. user.Disconnect()
  214. }
  215. user.log.Debugln("Connecting to WhatsApp")
  216. timeout := time.Duration(user.bridge.Config.Bridge.ConnectionTimeout)
  217. if timeout == 0 {
  218. timeout = 20
  219. }
  220. conn, err := whatsapp.NewConnWithOptions(&whatsapp.Options{
  221. Timeout: timeout * time.Second,
  222. LongClientName: user.bridge.Config.WhatsApp.OSName,
  223. ShortClientName: user.bridge.Config.WhatsApp.BrowserName,
  224. ClientVersion: WAVersion,
  225. })
  226. if err != nil {
  227. user.log.Errorln("Failed to connect to WhatsApp:", err)
  228. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp server. " +
  229. "This indicates a network problem on the bridge server. See bridge logs for more info.")
  230. user.connLock.Unlock()
  231. return false
  232. }
  233. user.Conn = conn
  234. user.log.Debugln("WhatsApp connection successful")
  235. user.Conn.AddHandler(user)
  236. user.connLock.Unlock()
  237. return user.RestoreSession()
  238. }
  239. func (user *User) Disconnect() {
  240. sess, err := user.Conn.Disconnect()
  241. if err != nil && err != whatsapp.ErrNotConnected {
  242. user.log.Warnln("Error disconnecting: %v", err)
  243. }
  244. user.SetSession(&sess)
  245. user.Conn.RemoveHandlers()
  246. user.Conn = nil
  247. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  248. }
  249. func (user *User) RestoreSession() bool {
  250. if user.Session != nil {
  251. sess, err := user.Conn.RestoreWithSession(*user.Session)
  252. if err == whatsapp.ErrAlreadyLoggedIn {
  253. return true
  254. } else if err != nil {
  255. user.log.Errorln("Failed to restore session:", err)
  256. if errors.Is(err, whatsapp.ErrUnpaired) {
  257. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
  258. "To re-pair your phone, log in again.")
  259. user.Disconnect()
  260. user.removeFromJIDMap()
  261. //user.JID = ""
  262. user.SetSession(nil)
  263. return false
  264. } else {
  265. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
  266. "on your phone is reachable and use `reconnect` to try connecting again.")
  267. }
  268. user.log.Debugln("Disconnecting due to failed session restore...")
  269. _, err := user.Conn.Disconnect()
  270. if err != nil {
  271. user.log.Errorln("Failed to disconnect after failed session restore:", err)
  272. }
  273. return false
  274. }
  275. user.ConnectionErrors = 0
  276. user.SetSession(&sess)
  277. user.log.Debugln("Session restored successfully")
  278. user.PostLogin()
  279. }
  280. return true
  281. }
  282. func (user *User) HasSession() bool {
  283. return user.Session != nil
  284. }
  285. func (user *User) IsConnected() bool {
  286. return user.Conn != nil && user.Conn.IsConnected() && user.Conn.IsLoggedIn()
  287. }
  288. func (user *User) IsLoginInProgress() bool {
  289. return user.Conn != nil && user.Conn.IsLoginInProgress()
  290. }
  291. func (user *User) loginQrChannel(ce *CommandEvent, qrChan <-chan string, eventIDChan chan<- id.EventID) {
  292. var qrEventID id.EventID
  293. for code := range qrChan {
  294. if code == "stop" {
  295. return
  296. }
  297. qrCode, err := qrcode.Encode(code, qrcode.Low, 256)
  298. if err != nil {
  299. user.log.Errorln("Failed to encode QR code:", err)
  300. ce.Reply("Failed to encode QR code: %v", err)
  301. return
  302. }
  303. bot := user.bridge.AS.BotClient()
  304. resp, err := bot.UploadBytes(qrCode, "image/png")
  305. if err != nil {
  306. user.log.Errorln("Failed to upload QR code:", err)
  307. ce.Reply("Failed to upload QR code: %v", err)
  308. return
  309. }
  310. if qrEventID == "" {
  311. sendResp, err := bot.SendImage(ce.RoomID, code, resp.ContentURI)
  312. if err != nil {
  313. user.log.Errorln("Failed to send QR code to user:", err)
  314. return
  315. }
  316. qrEventID = sendResp.EventID
  317. eventIDChan <- qrEventID
  318. } else {
  319. _, err = bot.SendMessageEvent(ce.RoomID, event.EventMessage, &event.MessageEventContent{
  320. MsgType: event.MsgImage,
  321. Body: code,
  322. URL: resp.ContentURI.CUString(),
  323. NewContent: &event.MessageEventContent{
  324. MsgType: event.MsgImage,
  325. Body: code,
  326. URL: resp.ContentURI.CUString(),
  327. },
  328. RelatesTo: &event.RelatesTo{
  329. Type: event.RelReplace,
  330. EventID: qrEventID,
  331. },
  332. })
  333. if err != nil {
  334. user.log.Errorln("Failed to send edited QR code to user:", err)
  335. }
  336. }
  337. }
  338. }
  339. func (user *User) Login(ce *CommandEvent) {
  340. qrChan := make(chan string, 3)
  341. eventIDChan := make(chan id.EventID, 1)
  342. go user.loginQrChannel(ce, qrChan, eventIDChan)
  343. session, err := user.Conn.LoginWithRetry(qrChan, nil, user.bridge.Config.Bridge.LoginQRRegenCount)
  344. qrChan <- "stop"
  345. if err != nil {
  346. var eventID id.EventID
  347. select {
  348. case eventID = <-eventIDChan:
  349. default:
  350. }
  351. reply := event.MessageEventContent{
  352. MsgType: event.MsgText,
  353. }
  354. if err == whatsapp.ErrAlreadyLoggedIn {
  355. reply.Body = "You're already logged in"
  356. } else if err == whatsapp.ErrLoginInProgress {
  357. reply.Body = "You have a login in progress already."
  358. } else if err == whatsapp.ErrLoginTimedOut {
  359. reply.Body = "QR code scan timed out. Please try again."
  360. } else {
  361. user.log.Warnln("Failed to log in:", err)
  362. reply.Body = fmt.Sprintf("Unknown error while logging in: %v", err)
  363. }
  364. msg := reply
  365. if eventID != "" {
  366. msg.NewContent = &reply
  367. msg.RelatesTo = &event.RelatesTo{
  368. Type: event.RelReplace,
  369. EventID: eventID,
  370. }
  371. }
  372. _, _ = ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &msg)
  373. return
  374. }
  375. // TODO there's a bit of duplication between this and the provisioning API login method
  376. // Also between the two logout methods (commands.go and provisioning.go)
  377. user.ConnectionErrors = 0
  378. user.JID = strings.Replace(user.Conn.Info.Wid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, 1)
  379. user.addToJIDMap()
  380. user.SetSession(&session)
  381. ce.Reply("Successfully logged in, synchronizing chats...")
  382. user.PostLogin()
  383. }
  384. type Chat struct {
  385. Portal *Portal
  386. LastMessageTime uint64
  387. Contact whatsapp.Contact
  388. }
  389. type ChatList []Chat
  390. func (cl ChatList) Len() int {
  391. return len(cl)
  392. }
  393. func (cl ChatList) Less(i, j int) bool {
  394. return cl[i].LastMessageTime > cl[j].LastMessageTime
  395. }
  396. func (cl ChatList) Swap(i, j int) {
  397. cl[i], cl[j] = cl[j], cl[i]
  398. }
  399. func (user *User) PostLogin() {
  400. user.bridge.Metrics.TrackConnectionState(user.JID, true)
  401. user.bridge.Metrics.TrackLoginState(user.JID, true)
  402. user.bridge.Metrics.TrackBufferLength(user.MXID, 0)
  403. user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
  404. // TODO can an old sync still be ongoing when PostLogin is called again?
  405. // TODO 2: can the new sync happen before this happens?
  406. user.chatListReceived = make(chan struct{}, 1)
  407. user.syncPortalsDone = make(chan struct{}, 1)
  408. user.syncWait.Add(1)
  409. user.syncStart <- struct{}{}
  410. go user.intPostLogin(user.Conn)
  411. }
  412. func (user *User) tryAutomaticDoublePuppeting() {
  413. if !user.bridge.Config.CanDoublePuppet(user.MXID) {
  414. return
  415. }
  416. user.log.Debugln("Checking if double puppeting needs to be enabled")
  417. puppet := user.bridge.GetPuppetByJID(user.JID)
  418. if len(puppet.CustomMXID) > 0 {
  419. user.log.Debugln("User already has double-puppeting enabled")
  420. // Custom puppet already enabled
  421. return
  422. }
  423. accessToken, err := puppet.loginWithSharedSecret(user.MXID)
  424. if err != nil {
  425. user.log.Warnln("Failed to login with shared secret:", err)
  426. return
  427. }
  428. err = puppet.SwitchCustomMXID(accessToken, user.MXID)
  429. if err != nil {
  430. puppet.log.Warnln("Failed to switch to auto-logined custom puppet:", err)
  431. return
  432. }
  433. user.log.Infoln("Successfully automatically enabled custom puppet")
  434. }
  435. func (user *User) sendBridgeNotice(formatString string, args ...interface{}) {
  436. notice := fmt.Sprintf(formatString, args...)
  437. _, err := user.bridge.Bot.SendNotice(user.GetManagementRoom(), notice)
  438. if err != nil {
  439. user.log.Warnf("Failed to send bridge notice \"%s\": %v", notice, err)
  440. }
  441. }
  442. func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface{}) {
  443. notice := fmt.Sprintf(formatString, args...)
  444. content := format.RenderMarkdown(notice, true, false)
  445. _, err := user.bridge.Bot.SendMessageEvent(user.GetManagementRoom(), event.EventMessage, content)
  446. if err != nil {
  447. user.log.Warnf("Failed to send bridge alert \"%s\": %v", notice, err)
  448. }
  449. }
  450. func (user *User) postConnPing(conn *whatsapp.Conn) bool {
  451. if user.Conn != conn {
  452. user.log.Warnln("Connection changed before scheduled post-connection ping, canceling ping")
  453. return false
  454. }
  455. user.log.Debugln("Making post-connection ping")
  456. err := conn.AdminTest()
  457. if err != nil {
  458. user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err)
  459. sess, disconnectErr := conn.Disconnect()
  460. if disconnectErr != nil {
  461. user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
  462. } else {
  463. user.Session = &sess
  464. }
  465. user.bridge.Metrics.TrackDisconnection(user.MXID)
  466. go func() {
  467. time.Sleep(1 * time.Second)
  468. user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err))
  469. }()
  470. return false
  471. } else {
  472. user.log.Debugln("Post-connection ping OK")
  473. return true
  474. }
  475. }
  476. func (user *User) intPostLogin(conn *whatsapp.Conn) {
  477. defer user.syncWait.Done()
  478. user.lastReconnection = time.Now().Unix()
  479. user.createCommunity()
  480. user.tryAutomaticDoublePuppeting()
  481. user.log.Debugln("Waiting for chat list receive confirmation")
  482. select {
  483. case <-user.chatListReceived:
  484. user.log.Debugln("Chat list receive confirmation received in PostLogin")
  485. case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
  486. user.log.Warnln("Timed out waiting for chat list to arrive!")
  487. user.postConnPing(conn)
  488. return
  489. }
  490. if !user.postConnPing(conn) {
  491. user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
  492. return
  493. }
  494. user.log.Debugln("Waiting for portal sync complete confirmation")
  495. select {
  496. case <-user.syncPortalsDone:
  497. user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.")
  498. // TODO this is too short, maybe a per-portal duration?
  499. case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
  500. user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
  501. }
  502. }
  503. type InfoGetter interface {
  504. GetInfo() whatsapp.MessageInfo
  505. }
  506. func (user *User) HandleEvent(event interface{}) {
  507. switch v := event.(type) {
  508. case whatsapp.TextMessage, whatsapp.ImageMessage, whatsapp.StickerMessage, whatsapp.VideoMessage,
  509. whatsapp.AudioMessage, whatsapp.DocumentMessage, whatsapp.ContactMessage, whatsapp.StubMessage,
  510. whatsapp.LocationMessage:
  511. info := v.(InfoGetter).GetInfo()
  512. user.messageInput <- PortalMessage{info.RemoteJid, user, v, info.Timestamp}
  513. case whatsapp.MessageRevocation:
  514. user.messageInput <- PortalMessage{v.RemoteJid, user, v, 0}
  515. case whatsapp.StreamEvent:
  516. user.HandleStreamEvent(v)
  517. case []whatsapp.Chat:
  518. user.HandleChatList(v)
  519. case []whatsapp.Contact:
  520. user.HandleContactList(v)
  521. case error:
  522. user.HandleError(v)
  523. case whatsapp.Contact:
  524. go user.HandleNewContact(v)
  525. case whatsapp.BatteryMessage:
  526. user.HandleBatteryMessage(v)
  527. case whatsapp.CallInfo:
  528. user.HandleCallInfo(v)
  529. case whatsapp.PresenceEvent:
  530. go user.HandlePresence(v)
  531. case whatsapp.JSONMsgInfo:
  532. go user.HandleMsgInfo(v)
  533. case whatsapp.ReceivedMessage:
  534. user.HandleReceivedMessage(v)
  535. case whatsapp.ReadMessage:
  536. user.HandleReadMessage(v)
  537. case whatsapp.JSONCommand:
  538. user.HandleCommand(v)
  539. case whatsapp.ChatUpdate:
  540. user.HandleChatUpdate(v)
  541. case json.RawMessage:
  542. user.HandleJSONMessage(v)
  543. case *waProto.WebMessageInfo:
  544. user.updateLastConnectionIfNecessary()
  545. // TODO trace log
  546. user.log.Debugfln("WebMessageInfo: %+v", v)
  547. case *waBinary.Node:
  548. user.log.Debugfln("Unknown binary message: %+v", v)
  549. default:
  550. user.log.Debugfln("Unknown type of event in HandleEvent: %T", v)
  551. }
  552. }
  553. func (user *User) HandleStreamEvent(evt whatsapp.StreamEvent) {
  554. if evt.Type == whatsapp.StreamSleep {
  555. if user.lastReconnection+60 > time.Now().Unix() {
  556. user.lastReconnection = 0
  557. user.log.Infoln("Stream went to sleep soon after reconnection, making new post-connection ping in 20 seconds")
  558. conn := user.Conn
  559. go func() {
  560. time.Sleep(20 * time.Second)
  561. // TODO if this happens during the post-login sync, it can get stuck forever
  562. user.postConnPing(conn)
  563. }()
  564. }
  565. } else {
  566. user.log.Infofln("Stream event: %+v", evt)
  567. }
  568. }
  569. func (user *User) HandleChatList(chats []whatsapp.Chat) {
  570. user.log.Infoln("Chat list received")
  571. chatMap := make(map[string]whatsapp.Chat)
  572. for _, chat := range user.Conn.Store.Chats {
  573. chatMap[chat.Jid] = chat
  574. }
  575. for _, chat := range chats {
  576. chatMap[chat.Jid] = chat
  577. }
  578. select {
  579. case user.chatListReceived <- struct{}{}:
  580. default:
  581. }
  582. go user.syncPortals(chatMap, false)
  583. }
  584. func (user *User) syncPortals(chatMap map[string]whatsapp.Chat, createAll bool) {
  585. // TODO use contexts instead of checking if user.Conn is the same?
  586. connAtStart := user.Conn
  587. if chatMap == nil {
  588. chatMap = user.Conn.Store.Chats
  589. }
  590. user.log.Infoln("Reading chat list")
  591. chats := make(ChatList, 0, len(chatMap))
  592. existingKeys := user.GetInCommunityMap()
  593. portalKeys := make([]database.PortalKeyWithMeta, 0, len(chatMap))
  594. for _, chat := range chatMap {
  595. ts, err := strconv.ParseUint(chat.LastMessageTime, 10, 64)
  596. if err != nil {
  597. user.log.Warnfln("Non-integer last message time in %s: %s", chat.Jid, chat.LastMessageTime)
  598. continue
  599. }
  600. portal := user.GetPortalByJID(chat.Jid)
  601. chats = append(chats, Chat{
  602. Portal: portal,
  603. Contact: user.Conn.Store.Contacts[chat.Jid],
  604. LastMessageTime: ts,
  605. })
  606. var inCommunity, ok bool
  607. if inCommunity, ok = existingKeys[portal.Key]; !ok || !inCommunity {
  608. inCommunity = user.addPortalToCommunity(portal)
  609. if portal.IsPrivateChat() {
  610. puppet := user.bridge.GetPuppetByJID(portal.Key.JID)
  611. user.addPuppetToCommunity(puppet)
  612. }
  613. }
  614. portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: portal.Key, InCommunity: inCommunity})
  615. }
  616. user.log.Infoln("Read chat list, updating user-portal mapping")
  617. err := user.SetPortalKeys(portalKeys)
  618. if err != nil {
  619. user.log.Warnln("Failed to update user-portal mapping:", err)
  620. }
  621. sort.Sort(chats)
  622. limit := user.bridge.Config.Bridge.InitialChatSync
  623. if limit < 0 {
  624. limit = len(chats)
  625. }
  626. if user.Conn != connAtStart {
  627. user.log.Debugln("Connection seems to have changed before sync, cancelling")
  628. return
  629. }
  630. now := uint64(time.Now().Unix())
  631. user.log.Infoln("Syncing portals")
  632. for i, chat := range chats {
  633. if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
  634. break
  635. }
  636. create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
  637. if len(chat.Portal.MXID) > 0 || create || createAll {
  638. // Don't sync unless chat meta sync is enabled or portal doesn't exist
  639. if user.bridge.Config.Bridge.ChatMetaSync || len(chat.Portal.MXID) == 0 {
  640. chat.Portal.Sync(user, chat.Contact)
  641. }
  642. err = chat.Portal.BackfillHistory(user, chat.LastMessageTime)
  643. if err != nil {
  644. chat.Portal.log.Errorln("Error backfilling history:", err)
  645. }
  646. }
  647. }
  648. if user.Conn != connAtStart {
  649. user.log.Debugln("Connection seems to have changed during sync, cancelling")
  650. return
  651. }
  652. user.UpdateDirectChats(nil)
  653. user.log.Infoln("Finished syncing portals")
  654. select {
  655. case user.syncPortalsDone <- struct{}{}:
  656. default:
  657. }
  658. }
  659. func (user *User) getDirectChats() map[id.UserID][]id.RoomID {
  660. res := make(map[id.UserID][]id.RoomID)
  661. privateChats := user.bridge.DB.Portal.FindPrivateChats(user.JID)
  662. for _, portal := range privateChats {
  663. if len(portal.MXID) > 0 {
  664. res[user.bridge.FormatPuppetMXID(portal.Key.JID)] = []id.RoomID{portal.MXID}
  665. }
  666. }
  667. return res
  668. }
  669. func (user *User) UpdateDirectChats(chats map[id.UserID][]id.RoomID) {
  670. if !user.bridge.Config.Bridge.SyncDirectChatList {
  671. return
  672. }
  673. puppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  674. if puppet == nil || puppet.CustomIntent() == nil {
  675. return
  676. }
  677. intent := puppet.CustomIntent()
  678. method := http.MethodPatch
  679. if chats == nil {
  680. chats = user.getDirectChats()
  681. method = http.MethodPut
  682. }
  683. user.log.Debugln("Updating m.direct list on homeserver")
  684. var err error
  685. if user.bridge.Config.Homeserver.Asmux {
  686. urlPath := intent.BuildBaseURL("_matrix", "client", "unstable", "net.maunium.asmux", "dms")
  687. _, err = intent.MakeFullRequest(method, urlPath, http.Header{
  688. "X-Asmux-Auth": {user.bridge.AS.Registration.AppToken},
  689. }, chats, nil)
  690. } else {
  691. existingChats := make(map[id.UserID][]id.RoomID)
  692. err = intent.GetAccountData(event.AccountDataDirectChats.Type, &existingChats)
  693. if err != nil {
  694. user.log.Warnln("Failed to get m.direct list to update it:", err)
  695. return
  696. }
  697. for userID, rooms := range existingChats {
  698. if _, ok := user.bridge.ParsePuppetMXID(userID); !ok {
  699. // This is not a ghost user, include it in the new list
  700. chats[userID] = rooms
  701. } else if _, ok := chats[userID]; !ok && method == http.MethodPatch {
  702. // This is a ghost user, but we're not replacing the whole list, so include it too
  703. chats[userID] = rooms
  704. }
  705. }
  706. err = intent.SetAccountData(event.AccountDataDirectChats.Type, &chats)
  707. }
  708. if err != nil {
  709. user.log.Warnln("Failed to update m.direct list:", err)
  710. }
  711. }
  712. func (user *User) HandleContactList(contacts []whatsapp.Contact) {
  713. contactMap := make(map[string]whatsapp.Contact)
  714. for _, contact := range contacts {
  715. contactMap[contact.Jid] = contact
  716. }
  717. go user.syncPuppets(contactMap)
  718. }
  719. func (user *User) syncPuppets(contacts map[string]whatsapp.Contact) {
  720. if contacts == nil {
  721. contacts = user.Conn.Store.Contacts
  722. }
  723. user.log.Infoln("Syncing puppet info from contacts")
  724. for jid, contact := range contacts {
  725. if strings.HasSuffix(jid, whatsapp.NewUserSuffix) {
  726. puppet := user.bridge.GetPuppetByJID(contact.Jid)
  727. puppet.Sync(user, contact)
  728. }
  729. }
  730. user.log.Infoln("Finished syncing puppet info from contacts")
  731. }
  732. func (user *User) updateLastConnectionIfNecessary() {
  733. if user.LastConnection+60 < uint64(time.Now().Unix()) {
  734. user.UpdateLastConnection()
  735. }
  736. }
  737. func (user *User) HandleError(err error) {
  738. if !errors.Is(err, whatsapp.ErrInvalidWsData) {
  739. user.log.Errorfln("WhatsApp error: %v", err)
  740. }
  741. if closed, ok := err.(*whatsapp.ErrConnectionClosed); ok {
  742. user.bridge.Metrics.TrackDisconnection(user.MXID)
  743. if closed.Code == 1000 && user.cleanDisconnection {
  744. user.cleanDisconnection = false
  745. if !user.bridge.Config.Bridge.AggressiveReconnect {
  746. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  747. user.log.Infoln("Clean disconnection by server")
  748. return
  749. } else {
  750. user.log.Debugln("Clean disconnection by server, but aggressive reconnection is enabled")
  751. }
  752. }
  753. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection was closed with websocket status code %d", closed.Code))
  754. } else if failed, ok := err.(*whatsapp.ErrConnectionFailed); ok {
  755. user.bridge.Metrics.TrackDisconnection(user.MXID)
  756. user.ConnectionErrors++
  757. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", failed.Err))
  758. }
  759. // Otherwise unknown error, probably mostly harmless
  760. }
  761. func (user *User) tryReconnect(msg string) {
  762. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  763. if user.ConnectionErrors > user.bridge.Config.Bridge.MaxConnectionAttempts {
  764. user.sendMarkdownBridgeAlert("%s. Use the `reconnect` command to reconnect.", msg)
  765. return
  766. }
  767. if user.bridge.Config.Bridge.ReportConnectionRetry {
  768. user.sendBridgeNotice("%s. Reconnecting...", msg)
  769. // Don't want the same error to be repeated
  770. msg = ""
  771. }
  772. var tries uint
  773. var exponentialBackoff bool
  774. baseDelay := time.Duration(user.bridge.Config.Bridge.ConnectionRetryDelay)
  775. if baseDelay < 0 {
  776. exponentialBackoff = true
  777. baseDelay = -baseDelay + 1
  778. }
  779. delay := baseDelay
  780. conn := user.Conn
  781. takeover := false
  782. for user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  783. if user.Conn != conn {
  784. user.log.Debugln("Connection was recreated, aborting reconnection attempts")
  785. return
  786. }
  787. err := conn.Restore(takeover)
  788. takeover = true
  789. if err == nil {
  790. user.ConnectionErrors = 0
  791. if user.bridge.Config.Bridge.ReportConnectionRetry {
  792. user.sendBridgeNotice("Reconnected successfully")
  793. }
  794. user.PostLogin()
  795. return
  796. } else if errors.Is(err, whatsapp.ErrBadRequest) {
  797. user.log.Infoln("Got init 400 error when trying to reconnect, resetting connection...")
  798. sess, err := conn.Disconnect()
  799. if err != nil {
  800. user.log.Debugln("Error while disconnecting for connection reset:", err)
  801. }
  802. user.SetSession(&sess)
  803. }
  804. user.log.Errorln("Error while trying to reconnect after disconnection:", err)
  805. tries++
  806. user.ConnectionErrors++
  807. if user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  808. if exponentialBackoff {
  809. delay = (1 << tries) + baseDelay
  810. }
  811. if user.bridge.Config.Bridge.ReportConnectionRetry {
  812. user.sendBridgeNotice("Reconnection attempt failed: %v. Retrying in %d seconds...", err, delay)
  813. }
  814. time.Sleep(delay * time.Second)
  815. }
  816. }
  817. if user.bridge.Config.Bridge.ReportConnectionRetry {
  818. user.sendMarkdownBridgeAlert("%d reconnection attempts failed. Use the `reconnect` command to try to reconnect manually.", tries)
  819. } else {
  820. user.sendMarkdownBridgeAlert("\u26a0 %s. Additionally, %d reconnection attempts failed. Use the `reconnect` command to try to reconnect.", msg, tries)
  821. }
  822. }
  823. func (user *User) PortalKey(jid whatsapp.JID) database.PortalKey {
  824. return database.NewPortalKey(jid, user.JID)
  825. }
  826. func (user *User) GetPortalByJID(jid whatsapp.JID) *Portal {
  827. return user.bridge.GetPortalByJID(user.PortalKey(jid))
  828. }
  829. func (user *User) runMessageRingBuffer() {
  830. for msg := range user.messageInput {
  831. select {
  832. case user.messageOutput <- msg:
  833. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  834. default:
  835. dropped := <-user.messageOutput
  836. user.log.Warnln("Buffer is full, dropping message in", dropped.chat)
  837. user.messageOutput <- msg
  838. }
  839. }
  840. }
  841. func (user *User) handleMessageLoop() {
  842. for {
  843. select {
  844. case msg := <-user.messageOutput:
  845. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  846. user.GetPortalByJID(msg.chat).messages <- msg
  847. case <-user.syncStart:
  848. user.log.Debugln("Processing of incoming messages is locked")
  849. user.bridge.Metrics.TrackSyncLock(user.JID, true)
  850. user.syncWait.Wait()
  851. user.bridge.Metrics.TrackSyncLock(user.JID, false)
  852. user.log.Debugln("Processing of incoming messages unlocked")
  853. }
  854. }
  855. }
  856. func (user *User) HandleNewContact(contact whatsapp.Contact) {
  857. user.log.Debugfln("Contact message: %+v", contact)
  858. if strings.HasSuffix(contact.Jid, whatsapp.OldUserSuffix) {
  859. contact.Jid = strings.Replace(contact.Jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  860. }
  861. puppet := user.bridge.GetPuppetByJID(contact.Jid)
  862. puppet.UpdateName(user, contact)
  863. }
  864. func (user *User) HandleBatteryMessage(battery whatsapp.BatteryMessage) {
  865. user.log.Debugfln("Battery message: %+v", battery)
  866. var notice string
  867. if !battery.Plugged && battery.Percentage < 15 && user.batteryWarningsSent < 1 {
  868. notice = fmt.Sprintf("Phone battery low (%d %% remaining)", battery.Percentage)
  869. user.batteryWarningsSent = 1
  870. } else if !battery.Plugged && battery.Percentage < 5 && user.batteryWarningsSent < 2 {
  871. notice = fmt.Sprintf("Phone battery very low (%d %% remaining)", battery.Percentage)
  872. user.batteryWarningsSent = 2
  873. } else if battery.Percentage > 15 || battery.Plugged {
  874. user.batteryWarningsSent = 0
  875. }
  876. if notice != "" {
  877. go user.sendBridgeNotice("%s", notice)
  878. }
  879. }
  880. type FakeMessage struct {
  881. Text string
  882. ID string
  883. Alert bool
  884. }
  885. func (user *User) HandleCallInfo(info whatsapp.CallInfo) {
  886. if info.Data != nil {
  887. return
  888. }
  889. data := FakeMessage{
  890. ID: info.ID,
  891. }
  892. switch info.Type {
  893. case whatsapp.CallOffer:
  894. if !user.bridge.Config.Bridge.CallNotices.Start {
  895. return
  896. }
  897. data.Text = "Incoming call"
  898. data.Alert = true
  899. case whatsapp.CallOfferVideo:
  900. if !user.bridge.Config.Bridge.CallNotices.Start {
  901. return
  902. }
  903. data.Text = "Incoming video call"
  904. data.Alert = true
  905. case whatsapp.CallTerminate:
  906. if !user.bridge.Config.Bridge.CallNotices.End {
  907. return
  908. }
  909. data.Text = "Call ended"
  910. data.ID += "E"
  911. default:
  912. return
  913. }
  914. portal := user.GetPortalByJID(info.From)
  915. if portal != nil {
  916. portal.messages <- PortalMessage{info.From, user, data, 0}
  917. }
  918. }
  919. func (user *User) HandlePresence(info whatsapp.PresenceEvent) {
  920. puppet := user.bridge.GetPuppetByJID(info.SenderJID)
  921. switch info.Status {
  922. case whatsapp.PresenceUnavailable:
  923. _ = puppet.DefaultIntent().SetPresence("offline")
  924. case whatsapp.PresenceAvailable:
  925. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  926. portal := user.bridge.GetPortalByMXID(puppet.typingIn)
  927. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  928. puppet.typingIn = ""
  929. puppet.typingAt = 0
  930. } else {
  931. _ = puppet.DefaultIntent().SetPresence("online")
  932. }
  933. case whatsapp.PresenceComposing:
  934. portal := user.GetPortalByJID(info.JID)
  935. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  936. if puppet.typingIn == portal.MXID {
  937. return
  938. }
  939. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  940. }
  941. puppet.typingIn = portal.MXID
  942. puppet.typingAt = time.Now().Unix()
  943. _, _ = puppet.IntentFor(portal).UserTyping(portal.MXID, true, 15*1000)
  944. }
  945. }
  946. func (user *User) HandleMsgInfo(info whatsapp.JSONMsgInfo) {
  947. if (info.Command == whatsapp.MsgInfoCommandAck || info.Command == whatsapp.MsgInfoCommandAcks) && info.Acknowledgement == whatsapp.AckMessageRead {
  948. portal := user.GetPortalByJID(info.ToJID)
  949. if len(portal.MXID) == 0 {
  950. return
  951. }
  952. intent := user.bridge.GetPuppetByJID(info.SenderJID).IntentFor(portal)
  953. for _, msgID := range info.IDs {
  954. msg := user.bridge.DB.Message.GetByJID(portal.Key, msgID)
  955. if msg == nil || msg.IsFakeMXID() {
  956. continue
  957. }
  958. err := intent.MarkRead(portal.MXID, msg.MXID)
  959. if err != nil {
  960. user.log.Warnln("Failed to mark message %s as read by %s: %v", msg.MXID, info.SenderJID, err)
  961. }
  962. }
  963. }
  964. }
  965. func (user *User) HandleReceivedMessage(received whatsapp.ReceivedMessage) {
  966. if received.Type == "read" {
  967. go user.markSelfRead(received.Jid, received.Index)
  968. } else {
  969. user.log.Debugfln("Unknown received message type: %+v", received)
  970. }
  971. }
  972. func (user *User) HandleReadMessage(read whatsapp.ReadMessage) {
  973. user.log.Debugfln("Received chat read message: %+v", read)
  974. go user.markSelfRead(read.Jid, "")
  975. }
  976. func (user *User) markSelfRead(jid, messageID string) {
  977. if strings.HasSuffix(jid, whatsapp.OldUserSuffix) {
  978. jid = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  979. }
  980. puppet := user.bridge.GetPuppetByJID(user.JID)
  981. if puppet == nil {
  982. return
  983. }
  984. intent := puppet.CustomIntent()
  985. if intent == nil {
  986. return
  987. }
  988. portal := user.GetPortalByJID(jid)
  989. if portal == nil {
  990. return
  991. }
  992. var message *database.Message
  993. if messageID == "" {
  994. message = user.bridge.DB.Message.GetLastInChat(portal.Key)
  995. if message == nil {
  996. return
  997. }
  998. user.log.Debugfln("User read chat %s/%s in WhatsApp mobile (last known event: %s/%s)", portal.Key.JID, portal.MXID, message.JID, message.MXID)
  999. } else {
  1000. message = user.bridge.DB.Message.GetByJID(portal.Key, messageID)
  1001. if message == nil || message.IsFakeMXID() {
  1002. return
  1003. }
  1004. user.log.Debugfln("User read message %s/%s in %s/%s in WhatsApp mobile", message.JID, message.MXID, portal.Key.JID, portal.MXID)
  1005. }
  1006. err := intent.MarkRead(portal.MXID, message.MXID)
  1007. if err != nil {
  1008. user.log.Warnfln("Failed to bridge own read receipt in %s: %v", jid, err)
  1009. }
  1010. }
  1011. func (user *User) HandleCommand(cmd whatsapp.JSONCommand) {
  1012. switch cmd.Type {
  1013. case whatsapp.CommandPicture:
  1014. if strings.HasSuffix(cmd.JID, whatsapp.NewUserSuffix) {
  1015. puppet := user.bridge.GetPuppetByJID(cmd.JID)
  1016. go puppet.UpdateAvatar(user, cmd.ProfilePicInfo)
  1017. } else if user.bridge.Config.Bridge.ChatMetaSync {
  1018. portal := user.GetPortalByJID(cmd.JID)
  1019. go portal.UpdateAvatar(user, cmd.ProfilePicInfo, true)
  1020. }
  1021. case whatsapp.CommandDisconnect:
  1022. if cmd.Kind == "replaced" {
  1023. user.cleanDisconnection = true
  1024. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server because you opened another WhatsApp Web client.\n\n" +
  1025. "Use the `reconnect` command to disconnect the other client and resume bridging.")
  1026. } else {
  1027. user.log.Warnln("Unknown kind of disconnect:", string(cmd.Raw))
  1028. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server (reason code: %s).\n\n"+
  1029. "Use the `reconnect` command to reconnect.", cmd.Kind)
  1030. }
  1031. }
  1032. }
  1033. func (user *User) HandleChatUpdate(cmd whatsapp.ChatUpdate) {
  1034. if cmd.Command != whatsapp.ChatUpdateCommandAction {
  1035. return
  1036. }
  1037. portal := user.GetPortalByJID(cmd.JID)
  1038. if len(portal.MXID) == 0 {
  1039. if cmd.Data.Action == whatsapp.ChatActionIntroduce || cmd.Data.Action == whatsapp.ChatActionCreate {
  1040. go func() {
  1041. err := portal.CreateMatrixRoom(user)
  1042. if err != nil {
  1043. user.log.Errorln("Failed to create portal room after receiving join event:", err)
  1044. }
  1045. }()
  1046. }
  1047. return
  1048. }
  1049. // These don't come down the message history :(
  1050. switch cmd.Data.Action {
  1051. case whatsapp.ChatActionAddTopic:
  1052. go portal.UpdateTopic(cmd.Data.AddTopic.Topic, cmd.Data.SenderJID, nil, true)
  1053. case whatsapp.ChatActionRemoveTopic:
  1054. go portal.UpdateTopic("", cmd.Data.SenderJID, nil, true)
  1055. case whatsapp.ChatActionRemove:
  1056. // We ignore leaving groups in the message history to avoid accidentally leaving rejoined groups,
  1057. // but if we get a real-time command that says we left, it should be safe to bridge it.
  1058. if !user.bridge.Config.Bridge.ChatMetaSync {
  1059. for _, jid := range cmd.Data.UserChange.JIDs {
  1060. if jid == user.JID {
  1061. go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1062. break
  1063. }
  1064. }
  1065. }
  1066. }
  1067. if !user.bridge.Config.Bridge.ChatMetaSync {
  1068. // Ignore chat update commands, we're relying on the message history.
  1069. return
  1070. }
  1071. switch cmd.Data.Action {
  1072. case whatsapp.ChatActionNameChange:
  1073. go portal.UpdateName(cmd.Data.NameChange.Name, cmd.Data.SenderJID, nil, true)
  1074. case whatsapp.ChatActionPromote:
  1075. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, true)
  1076. case whatsapp.ChatActionDemote:
  1077. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, false)
  1078. case whatsapp.ChatActionAnnounce:
  1079. go portal.RestrictMessageSending(cmd.Data.Announce)
  1080. case whatsapp.ChatActionRestrict:
  1081. go portal.RestrictMetadataChanges(cmd.Data.Restrict)
  1082. case whatsapp.ChatActionRemove:
  1083. go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1084. case whatsapp.ChatActionAdd:
  1085. go portal.HandleWhatsAppInvite(cmd.Data.SenderJID, nil, cmd.Data.UserChange.JIDs)
  1086. case whatsapp.ChatActionIntroduce:
  1087. if cmd.Data.SenderJID != "unknown" {
  1088. go portal.Sync(user, whatsapp.Contact{Jid: portal.Key.JID})
  1089. }
  1090. }
  1091. }
  1092. func (user *User) HandleJSONMessage(message json.RawMessage) {
  1093. if !json.Valid(message) {
  1094. return
  1095. }
  1096. user.log.Debugfln("JSON message: %s", message)
  1097. user.updateLastConnectionIfNecessary()
  1098. }
  1099. func (user *User) NeedsRelaybot(portal *Portal) bool {
  1100. return !user.HasSession() || !user.IsInPortal(portal.Key)
  1101. }