user.go 42 KB


  1. // mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
  2. // Copyright (C) 2020 Tulir Asokan
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "context"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "net/http"
  23. "sort"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/skip2/go-qrcode"
  29. log "maunium.net/go/maulogger/v2"
  30. "maunium.net/go/mautrix/appservice"
  31. "maunium.net/go/mautrix/pushrules"
  32. "github.com/Rhymen/go-whatsapp"
  33. waBinary "github.com/Rhymen/go-whatsapp/binary"
  34. waProto "github.com/Rhymen/go-whatsapp/binary/proto"
  35. "maunium.net/go/mautrix"
  36. "maunium.net/go/mautrix/event"
  37. "maunium.net/go/mautrix/format"
  38. "maunium.net/go/mautrix/id"
  39. "maunium.net/go/mautrix-whatsapp/database"
  40. )
  41. type User struct {
  42. *database.User
  43. Conn *whatsapp.Conn
  44. bridge *Bridge
  45. log log.Logger
  46. Admin bool
  47. Whitelisted bool
  48. RelaybotWhitelisted bool
  49. IsRelaybot bool
  50. ConnectionErrors int
  51. CommunityID string
  52. cleanDisconnection bool
  53. batteryWarningsSent int
  54. lastReconnection int64
  55. pushName string
  56. chatListReceived chan struct{}
  57. syncPortalsDone chan struct{}
  58. messageInput chan PortalMessage
  59. messageOutput chan PortalMessage
  60. syncStart chan struct{}
  61. syncWait sync.WaitGroup
  62. syncing int32
  63. mgmtCreateLock sync.Mutex
  64. connLock sync.Mutex
  65. cancelReconnect func()
  66. prevBridgeStatus *AsmuxPong
  67. }
  68. func (bridge *Bridge) GetUserByMXID(userID id.UserID) *User {
  69. _, isPuppet := bridge.ParsePuppetMXID(userID)
  70. if isPuppet || userID == bridge.Bot.UserID {
  71. return nil
  72. }
  73. bridge.usersLock.Lock()
  74. defer bridge.usersLock.Unlock()
  75. user, ok := bridge.usersByMXID[userID]
  76. if !ok {
  77. return bridge.loadDBUser(bridge.DB.User.GetByMXID(userID), &userID)
  78. }
  79. return user
  80. }
  81. func (bridge *Bridge) GetUserByJID(userID whatsapp.JID) *User {
  82. bridge.usersLock.Lock()
  83. defer bridge.usersLock.Unlock()
  84. user, ok := bridge.usersByJID[userID]
  85. if !ok {
  86. return bridge.loadDBUser(bridge.DB.User.GetByJID(userID), nil)
  87. }
  88. return user
  89. }
  90. func (user *User) addToJIDMap() {
  91. user.bridge.usersLock.Lock()
  92. user.bridge.usersByJID[user.JID] = user
  93. user.bridge.usersLock.Unlock()
  94. }
  95. func (user *User) removeFromJIDMap() {
  96. user.bridge.usersLock.Lock()
  97. jidUser, ok := user.bridge.usersByJID[user.JID]
  98. if ok && user == jidUser {
  99. delete(user.bridge.usersByJID, user.JID)
  100. }
  101. user.bridge.usersLock.Unlock()
  102. user.bridge.Metrics.TrackLoginState(user.JID, false)
  103. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotLoggedIn})
  104. }
  105. func (bridge *Bridge) GetAllUsers() []*User {
  106. bridge.usersLock.Lock()
  107. defer bridge.usersLock.Unlock()
  108. dbUsers := bridge.DB.User.GetAll()
  109. output := make([]*User, len(dbUsers))
  110. for index, dbUser := range dbUsers {
  111. user, ok := bridge.usersByMXID[dbUser.MXID]
  112. if !ok {
  113. user = bridge.loadDBUser(dbUser, nil)
  114. }
  115. output[index] = user
  116. }
  117. return output
  118. }
  119. func (bridge *Bridge) loadDBUser(dbUser *database.User, mxid *id.UserID) *User {
  120. if dbUser == nil {
  121. if mxid == nil {
  122. return nil
  123. }
  124. dbUser = bridge.DB.User.New()
  125. dbUser.MXID = *mxid
  126. dbUser.Insert()
  127. }
  128. user := bridge.NewUser(dbUser)
  129. bridge.usersByMXID[user.MXID] = user
  130. if len(user.JID) > 0 {
  131. bridge.usersByJID[user.JID] = user
  132. }
  133. if len(user.ManagementRoom) > 0 {
  134. bridge.managementRooms[user.ManagementRoom] = user
  135. }
  136. return user
  137. }
  138. func (user *User) GetPortals() []*Portal {
  139. keys := user.User.GetPortalKeys()
  140. portals := make([]*Portal, len(keys))
  141. user.bridge.portalsLock.Lock()
  142. for i, key := range keys {
  143. portal, ok := user.bridge.portalsByJID[key]
  144. if !ok {
  145. portal = user.bridge.loadDBPortal(user.bridge.DB.Portal.GetByJID(key), &key)
  146. }
  147. portals[i] = portal
  148. }
  149. user.bridge.portalsLock.Unlock()
  150. return portals
  151. }
  152. func (bridge *Bridge) NewUser(dbUser *database.User) *User {
  153. user := &User{
  154. User: dbUser,
  155. bridge: bridge,
  156. log: bridge.Log.Sub("User").Sub(string(dbUser.MXID)),
  157. IsRelaybot: false,
  158. chatListReceived: make(chan struct{}, 1),
  159. syncPortalsDone: make(chan struct{}, 1),
  160. syncStart: make(chan struct{}, 1),
  161. messageInput: make(chan PortalMessage),
  162. messageOutput: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer),
  163. }
  164. user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID)
  165. user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
  166. user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
  167. go user.handleMessageLoop()
  168. go user.runMessageRingBuffer()
  169. return user
  170. }
  171. func (user *User) GetManagementRoom() id.RoomID {
  172. if len(user.ManagementRoom) == 0 {
  173. user.mgmtCreateLock.Lock()
  174. defer user.mgmtCreateLock.Unlock()
  175. if len(user.ManagementRoom) > 0 {
  176. return user.ManagementRoom
  177. }
  178. resp, err := user.bridge.Bot.CreateRoom(&mautrix.ReqCreateRoom{
  179. Topic: "WhatsApp bridge notices",
  180. IsDirect: true,
  181. })
  182. if err != nil {
  183. user.log.Errorln("Failed to auto-create management room:", err)
  184. } else {
  185. user.SetManagementRoom(resp.RoomID)
  186. }
  187. }
  188. return user.ManagementRoom
  189. }
  190. func (user *User) SetManagementRoom(roomID id.RoomID) {
  191. existingUser, ok := user.bridge.managementRooms[roomID]
  192. if ok {
  193. existingUser.ManagementRoom = ""
  194. existingUser.Update()
  195. }
  196. user.ManagementRoom = roomID
  197. user.bridge.managementRooms[user.ManagementRoom] = user
  198. user.Update()
  199. }
  200. func (user *User) SetSession(session *whatsapp.Session) {
  201. if session == nil {
  202. user.Session = nil
  203. user.LastConnection = 0
  204. } else if len(session.Wid) > 0 {
  205. user.Session = session
  206. } else {
  207. return
  208. }
  209. user.Update()
  210. }
  211. func (user *User) Connect(evenIfNoSession bool) bool {
  212. user.connLock.Lock()
  213. if user.Conn != nil {
  214. user.connLock.Unlock()
  215. if user.Conn.IsConnected() {
  216. return true
  217. } else {
  218. return user.RestoreSession()
  219. }
  220. } else if !evenIfNoSession && user.Session == nil {
  221. user.connLock.Unlock()
  222. return false
  223. }
  224. user.log.Debugln("Connecting to WhatsApp")
  225. if user.Session != nil {
  226. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWAConnecting})
  227. }
  228. timeout := time.Duration(user.bridge.Config.Bridge.ConnectionTimeout)
  229. if timeout == 0 {
  230. timeout = 20
  231. }
  232. user.Conn = whatsapp.NewConn(&whatsapp.Options{
  233. Timeout: timeout * time.Second,
  234. LongClientName: user.bridge.Config.WhatsApp.OSName,
  235. ShortClientName: user.bridge.Config.WhatsApp.BrowserName,
  236. ClientVersion: WAVersion,
  237. Log: user.log.Sub("Conn"),
  238. Handler: []whatsapp.Handler{user},
  239. })
  240. user.setupAdminTestHooks()
  241. user.connLock.Unlock()
  242. return user.RestoreSession()
  243. }
  244. func (user *User) DeleteConnection() {
  245. user.connLock.Lock()
  246. if user.Conn == nil {
  247. user.connLock.Unlock()
  248. return
  249. }
  250. err := user.Conn.Disconnect()
  251. if err != nil && err != whatsapp.ErrNotConnected {
  252. user.log.Warnln("Error disconnecting: %v", err)
  253. }
  254. user.Conn.RemoveHandlers()
  255. user.Conn = nil
  256. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  257. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  258. user.connLock.Unlock()
  259. }
  260. func (user *User) RestoreSession() bool {
  261. if user.Session != nil {
  262. user.Conn.SetSession(*user.Session)
  263. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
  264. defer cancel()
  265. err := user.Conn.Restore(true, ctx)
  266. if err == whatsapp.ErrAlreadyLoggedIn {
  267. return true
  268. } else if err != nil {
  269. user.log.Errorln("Failed to restore session:", err)
  270. if errors.Is(err, whatsapp.ErrUnpaired) {
  271. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
  272. "To re-pair your phone, log in again.")
  273. user.removeFromJIDMap()
  274. //user.JID = ""
  275. user.SetSession(nil)
  276. user.DeleteConnection()
  277. return false
  278. } else {
  279. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  280. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
  281. "on your phone is reachable and use `reconnect` to try connecting again.")
  282. }
  283. user.log.Debugln("Disconnecting due to failed session restore...")
  284. err = user.Conn.Disconnect()
  285. if err != nil {
  286. user.log.Errorln("Failed to disconnect after failed session restore:", err)
  287. }
  288. return false
  289. }
  290. user.ConnectionErrors = 0
  291. user.log.Debugln("Session restored successfully")
  292. user.PostLogin()
  293. }
  294. return true
  295. }
  296. func (user *User) HasSession() bool {
  297. return user.Session != nil
  298. }
  299. func (user *User) IsConnected() bool {
  300. return user.Conn != nil && user.Conn.IsConnected() && user.Conn.IsLoggedIn()
  301. }
  302. func (user *User) IsLoginInProgress() bool {
  303. return user.Conn != nil && user.Conn.IsLoginInProgress()
  304. }
  305. func (user *User) loginQrChannel(ce *CommandEvent, qrChan <-chan string, eventIDChan chan<- id.EventID) {
  306. var qrEventID id.EventID
  307. for code := range qrChan {
  308. if code == "stop" {
  309. return
  310. }
  311. qrCode, err := qrcode.Encode(code, qrcode.Low, 256)
  312. if err != nil {
  313. user.log.Errorln("Failed to encode QR code:", err)
  314. ce.Reply("Failed to encode QR code: %v", err)
  315. return
  316. }
  317. bot := user.bridge.AS.BotClient()
  318. resp, err := bot.UploadBytes(qrCode, "image/png")
  319. if err != nil {
  320. user.log.Errorln("Failed to upload QR code:", err)
  321. ce.Reply("Failed to upload QR code: %v", err)
  322. return
  323. }
  324. if qrEventID == "" {
  325. sendResp, err := bot.SendImage(ce.RoomID, code, resp.ContentURI)
  326. if err != nil {
  327. user.log.Errorln("Failed to send QR code to user:", err)
  328. return
  329. }
  330. qrEventID = sendResp.EventID
  331. eventIDChan <- qrEventID
  332. } else {
  333. _, err = bot.SendMessageEvent(ce.RoomID, event.EventMessage, &event.MessageEventContent{
  334. MsgType: event.MsgImage,
  335. Body: code,
  336. URL: resp.ContentURI.CUString(),
  337. NewContent: &event.MessageEventContent{
  338. MsgType: event.MsgImage,
  339. Body: code,
  340. URL: resp.ContentURI.CUString(),
  341. },
  342. RelatesTo: &event.RelatesTo{
  343. Type: event.RelReplace,
  344. EventID: qrEventID,
  345. },
  346. })
  347. if err != nil {
  348. user.log.Errorln("Failed to send edited QR code to user:", err)
  349. }
  350. }
  351. }
  352. }
  353. func (user *User) Login(ce *CommandEvent) {
  354. qrChan := make(chan string, 3)
  355. eventIDChan := make(chan id.EventID, 1)
  356. go user.loginQrChannel(ce, qrChan, eventIDChan)
  357. session, jid, err := user.Conn.Login(qrChan, nil, user.bridge.Config.Bridge.LoginQRRegenCount)
  358. qrChan <- "stop"
  359. if err != nil {
  360. var eventID id.EventID
  361. select {
  362. case eventID = <-eventIDChan:
  363. default:
  364. }
  365. reply := event.MessageEventContent{
  366. MsgType: event.MsgText,
  367. }
  368. if err == whatsapp.ErrAlreadyLoggedIn {
  369. reply.Body = "You're already logged in"
  370. } else if err == whatsapp.ErrLoginInProgress {
  371. reply.Body = "You have a login in progress already."
  372. } else if err == whatsapp.ErrLoginTimedOut {
  373. reply.Body = "QR code scan timed out. Please try again."
  374. } else {
  375. user.log.Warnln("Failed to log in:", err)
  376. reply.Body = fmt.Sprintf("Unknown error while logging in: %v", err)
  377. }
  378. msg := reply
  379. if eventID != "" {
  380. msg.NewContent = &reply
  381. msg.RelatesTo = &event.RelatesTo{
  382. Type: event.RelReplace,
  383. EventID: eventID,
  384. }
  385. }
  386. _, _ = ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &msg)
  387. return
  388. }
  389. // TODO there's a bit of duplication between this and the provisioning API login method
  390. // Also between the two logout methods (commands.go and provisioning.go)
  391. user.log.Debugln("Successful login as", jid, "via command")
  392. user.ConnectionErrors = 0
  393. user.JID = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, 1)
  394. user.addToJIDMap()
  395. user.SetSession(&session)
  396. ce.Reply("Successfully logged in, synchronizing chats...")
  397. user.PostLogin()
  398. }
  399. type Chat struct {
  400. whatsapp.Chat
  401. Portal *Portal
  402. Contact whatsapp.Contact
  403. }
  404. type ChatList []Chat
  405. func (cl ChatList) Len() int {
  406. return len(cl)
  407. }
  408. func (cl ChatList) Less(i, j int) bool {
  409. return cl[i].LastMessageTime > cl[j].LastMessageTime
  410. }
  411. func (cl ChatList) Swap(i, j int) {
  412. cl[i], cl[j] = cl[j], cl[i]
  413. }
  414. func (user *User) PostLogin() {
  415. user.sendBridgeStatus(AsmuxPong{OK: true})
  416. user.bridge.Metrics.TrackConnectionState(user.JID, true)
  417. user.bridge.Metrics.TrackLoginState(user.JID, true)
  418. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  419. if !atomic.CompareAndSwapInt32(&user.syncing, 0, 1) {
  420. // TODO we should cleanly stop the old sync and start a new one instead of not starting a new one
  421. user.log.Warnln("There seems to be a post-sync already in progress, not starting a new one")
  422. return
  423. }
  424. user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
  425. user.chatListReceived = make(chan struct{}, 1)
  426. user.syncPortalsDone = make(chan struct{}, 1)
  427. user.syncWait.Add(1)
  428. user.syncStart <- struct{}{}
  429. go user.intPostLogin()
  430. }
  431. func (user *User) tryAutomaticDoublePuppeting() {
  432. if !user.bridge.Config.CanDoublePuppet(user.MXID) {
  433. return
  434. }
  435. user.log.Debugln("Checking if double puppeting needs to be enabled")
  436. puppet := user.bridge.GetPuppetByJID(user.JID)
  437. if len(puppet.CustomMXID) > 0 {
  438. user.log.Debugln("User already has double-puppeting enabled")
  439. // Custom puppet already enabled
  440. return
  441. }
  442. accessToken, err := puppet.loginWithSharedSecret(user.MXID)
  443. if err != nil {
  444. user.log.Warnln("Failed to login with shared secret:", err)
  445. return
  446. }
  447. err = puppet.SwitchCustomMXID(accessToken, user.MXID)
  448. if err != nil {
  449. puppet.log.Warnln("Failed to switch to auto-logined custom puppet:", err)
  450. return
  451. }
  452. user.log.Infoln("Successfully automatically enabled custom puppet")
  453. }
  454. func (user *User) sendBridgeNotice(formatString string, args ...interface{}) {
  455. notice := fmt.Sprintf(formatString, args...)
  456. _, err := user.bridge.Bot.SendNotice(user.GetManagementRoom(), notice)
  457. if err != nil {
  458. user.log.Warnf("Failed to send bridge notice \"%s\": %v", notice, err)
  459. }
  460. }
  461. func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface{}) {
  462. notice := fmt.Sprintf(formatString, args...)
  463. content := format.RenderMarkdown(notice, true, false)
  464. _, err := user.bridge.Bot.SendMessageEvent(user.GetManagementRoom(), event.EventMessage, content)
  465. if err != nil {
  466. user.log.Warnf("Failed to send bridge alert \"%s\": %v", notice, err)
  467. }
  468. }
  469. func (user *User) postConnPing() bool {
  470. user.log.Debugln("Making post-connection ping")
  471. var err error
  472. for i := 0; ; i++ {
  473. err = user.Conn.AdminTest()
  474. if err == nil {
  475. user.log.Debugln("Post-connection ping OK")
  476. return true
  477. } else if errors.Is(err, whatsapp.ErrConnectionTimeout) && i < 5 {
  478. user.log.Warnfln("Post-connection ping timed out, sending new one")
  479. } else {
  480. break
  481. }
  482. }
  483. user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err)
  484. disconnectErr := user.Conn.Disconnect()
  485. if disconnectErr != nil {
  486. user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
  487. }
  488. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  489. user.bridge.Metrics.TrackDisconnection(user.MXID)
  490. go func() {
  491. time.Sleep(1 * time.Second)
  492. user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err))
  493. }()
  494. return false
  495. }
  496. func (user *User) intPostLogin() {
  497. defer atomic.StoreInt32(&user.syncing, 0)
  498. defer user.syncWait.Done()
  499. user.lastReconnection = time.Now().Unix()
  500. user.createCommunity()
  501. user.tryAutomaticDoublePuppeting()
  502. user.log.Debugln("Waiting for chat list receive confirmation")
  503. select {
  504. case <-user.chatListReceived:
  505. user.log.Debugln("Chat list receive confirmation received in PostLogin")
  506. case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
  507. user.log.Warnln("Timed out waiting for chat list to arrive!")
  508. user.postConnPing()
  509. return
  510. }
  511. if !user.postConnPing() {
  512. user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
  513. return
  514. }
  515. user.log.Debugln("Waiting for portal sync complete confirmation")
  516. select {
  517. case <-user.syncPortalsDone:
  518. user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.")
  519. // TODO this is too short, maybe a per-portal duration?
  520. case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
  521. user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
  522. }
  523. }
  524. type NormalMessage interface {
  525. GetInfo() whatsapp.MessageInfo
  526. }
  527. func (user *User) HandleEvent(event interface{}) {
  528. switch v := event.(type) {
  529. case NormalMessage:
  530. info := v.GetInfo()
  531. user.messageInput <- PortalMessage{info.RemoteJid, user, v, info.Timestamp}
  532. case whatsapp.MessageRevocation:
  533. user.messageInput <- PortalMessage{v.RemoteJid, user, v, 0}
  534. case whatsapp.StreamEvent:
  535. user.HandleStreamEvent(v)
  536. case []whatsapp.Chat:
  537. user.HandleChatList(v)
  538. case []whatsapp.Contact:
  539. user.HandleContactList(v)
  540. case error:
  541. user.HandleError(v)
  542. case whatsapp.Contact:
  543. go user.HandleNewContact(v)
  544. case whatsapp.BatteryMessage:
  545. user.HandleBatteryMessage(v)
  546. case whatsapp.CallInfo:
  547. user.HandleCallInfo(v)
  548. case whatsapp.PresenceEvent:
  549. go user.HandlePresence(v)
  550. case whatsapp.JSONMsgInfo:
  551. go user.HandleMsgInfo(v)
  552. case whatsapp.ReceivedMessage:
  553. user.HandleReceivedMessage(v)
  554. case whatsapp.ReadMessage:
  555. user.HandleReadMessage(v)
  556. case whatsapp.JSONCommand:
  557. user.HandleCommand(v)
  558. case whatsapp.ChatUpdate:
  559. user.HandleChatUpdate(v)
  560. case whatsapp.ConnInfo:
  561. user.HandleConnInfo(v)
  562. case whatsapp.MuteMessage:
  563. portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
  564. if portal != nil {
  565. go user.updateChatMute(nil, portal, v.MutedUntil)
  566. }
  567. case whatsapp.ArchiveMessage:
  568. portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
  569. if portal != nil {
  570. go user.updateChatArchive(nil, portal, v.IsArchived)
  571. }
  572. case json.RawMessage:
  573. user.HandleJSONMessage(v)
  574. case *waProto.WebMessageInfo:
  575. user.updateLastConnectionIfNecessary()
  576. // TODO trace log
  577. //user.log.Debugfln("WebMessageInfo: %+v", v)
  578. case *waBinary.Node:
  579. user.log.Debugfln("Unknown binary message: %+v", v)
  580. default:
  581. user.log.Debugfln("Unknown type of event in HandleEvent: %T", v)
  582. }
  583. }
  584. func (user *User) HandleStreamEvent(evt whatsapp.StreamEvent) {
  585. if evt.Type == whatsapp.StreamSleep {
  586. if user.lastReconnection+60 > time.Now().Unix() {
  587. user.lastReconnection = 0
  588. user.log.Infoln("Stream went to sleep soon after reconnection, making new post-connection ping in 20 seconds")
  589. go func() {
  590. time.Sleep(20 * time.Second)
  591. // TODO if this happens during the post-login sync, it can get stuck forever
  592. // TODO check if the above is still true
  593. user.postConnPing()
  594. }()
  595. }
  596. } else {
  597. user.log.Infofln("Stream event: %+v", evt)
  598. }
  599. }
  600. func (user *User) HandleChatList(chats []whatsapp.Chat) {
  601. user.log.Infoln("Chat list received")
  602. chatMap := make(map[string]whatsapp.Chat)
  603. for _, chat := range user.Conn.Store.Chats {
  604. chatMap[chat.JID] = chat
  605. }
  606. for _, chat := range chats {
  607. chatMap[chat.JID] = chat
  608. }
  609. select {
  610. case user.chatListReceived <- struct{}{}:
  611. default:
  612. }
  613. go user.syncPortals(chatMap, false)
  614. }
  615. func (user *User) updateChatMute(intent *appservice.IntentAPI, portal *Portal, mutedUntil int64) {
  616. if len(portal.MXID) == 0 || !user.bridge.Config.Bridge.MuteBridging {
  617. return
  618. } else if intent == nil {
  619. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  620. if doublePuppet == nil || doublePuppet.CustomIntent() == nil {
  621. return
  622. }
  623. intent = doublePuppet.CustomIntent()
  624. }
  625. var err error
  626. if mutedUntil < time.Now().Unix() {
  627. user.log.Debugln("Unmuting", portal.MXID)
  628. err = intent.DeletePushRule("global", pushrules.RoomRule, string(portal.MXID))
  629. } else {
  630. user.log.Debugln("Muting", portal.MXID)
  631. err = intent.PutPushRule("global", pushrules.RoomRule, string(portal.MXID), &mautrix.ReqPutPushRule{
  632. Actions: []pushrules.PushActionType{pushrules.ActionDontNotify},
  633. })
  634. }
  635. if err != nil && !errors.Is(err, mautrix.MNotFound) {
  636. user.log.Warnfln("Failed to update push rule for %s through double puppet: %v", portal.MXID, err)
  637. }
  638. }
  639. func (user *User) updateChatArchive(intent *appservice.IntentAPI, portal *Portal, archived bool) {
  640. if len(portal.MXID) == 0 || len(user.bridge.Config.Bridge.ArchiveTag) == 0 {
  641. return
  642. } else if intent == nil {
  643. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  644. if doublePuppet == nil || doublePuppet.CustomIntent() == nil {
  645. return
  646. }
  647. intent = doublePuppet.CustomIntent()
  648. }
  649. var err error
  650. if archived {
  651. user.log.Debugln("Adding tag", user.bridge.Config.Bridge.ArchiveTag, "to", portal.MXID)
  652. err = intent.AddTag(portal.MXID, user.bridge.Config.Bridge.ArchiveTag, 0.5)
  653. } else {
  654. user.log.Debugln("Removing tag", user.bridge.Config.Bridge.ArchiveTag, "from", portal.MXID)
  655. err = intent.RemoveTag(portal.MXID, user.bridge.Config.Bridge.ArchiveTag)
  656. }
  657. if err != nil {
  658. user.log.Warnfln("Failed to update tag for %s through double puppet: %v", portal.MXID, err)
  659. }
  660. }
  661. func (user *User) syncChatDoublePuppetDetails(doublePuppet *Puppet, chat Chat) {
  662. if doublePuppet == nil || doublePuppet.CustomIntent() == nil {
  663. return
  664. }
  665. intent := doublePuppet.CustomIntent()
  666. if chat.UnreadCount == 0 {
  667. lastMessage := user.bridge.DB.Message.GetLastInChat(chat.Portal.Key)
  668. if lastMessage != nil {
  669. err := intent.MarkRead(chat.Portal.MXID, lastMessage.MXID)
  670. if err != nil {
  671. user.log.Warnln("Failed to mark %s in %s as read after backfill: %v", lastMessage.MXID, chat.Portal.MXID, err)
  672. }
  673. }
  674. }
  675. user.updateChatMute(intent, chat.Portal, chat.MutedUntil)
  676. user.updateChatArchive(intent, chat.Portal, chat.IsArchived)
  677. }
  678. func (user *User) syncPortal(chat Chat) {
  679. // Don't sync unless chat meta sync is enabled or portal doesn't exist
  680. if user.bridge.Config.Bridge.ChatMetaSync || len(chat.Portal.MXID) == 0 {
  681. chat.Portal.Sync(user, chat.Contact)
  682. }
  683. err := chat.Portal.BackfillHistory(user, chat.LastMessageTime)
  684. if err != nil {
  685. chat.Portal.log.Errorln("Error backfilling history:", err)
  686. }
  687. }
  688. func (user *User) collectChatList(chatMap map[string]whatsapp.Chat) ChatList {
  689. if chatMap == nil {
  690. chatMap = user.Conn.Store.Chats
  691. }
  692. user.log.Infoln("Reading chat list")
  693. chats := make(ChatList, 0, len(chatMap))
  694. existingKeys := user.GetInCommunityMap()
  695. portalKeys := make([]database.PortalKeyWithMeta, 0, len(chatMap))
  696. for _, chat := range chatMap {
  697. portal := user.GetPortalByJID(chat.JID)
  698. chats = append(chats, Chat{
  699. Chat: chat,
  700. Portal: portal,
  701. Contact: user.Conn.Store.Contacts[chat.JID],
  702. })
  703. var inCommunity, ok bool
  704. if inCommunity, ok = existingKeys[portal.Key]; !ok || !inCommunity {
  705. inCommunity = user.addPortalToCommunity(portal)
  706. if portal.IsPrivateChat() {
  707. puppet := user.bridge.GetPuppetByJID(portal.Key.JID)
  708. user.addPuppetToCommunity(puppet)
  709. }
  710. }
  711. portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: portal.Key, InCommunity: inCommunity})
  712. }
  713. user.log.Infoln("Read chat list, updating user-portal mapping")
  714. err := user.SetPortalKeys(portalKeys)
  715. if err != nil {
  716. user.log.Warnln("Failed to update user-portal mapping:", err)
  717. }
  718. sort.Sort(chats)
  719. return chats
  720. }
  721. func (user *User) syncPortals(chatMap map[string]whatsapp.Chat, createAll bool) {
  722. // TODO use contexts instead of checking if user.Conn is the same?
  723. connAtStart := user.Conn
  724. chats := user.collectChatList(chatMap)
  725. limit := user.bridge.Config.Bridge.InitialChatSync
  726. if limit < 0 {
  727. limit = len(chats)
  728. }
  729. if user.Conn != connAtStart {
  730. user.log.Debugln("Connection seems to have changed before sync, cancelling")
  731. return
  732. }
  733. now := time.Now().Unix()
  734. user.log.Infoln("Syncing portals")
  735. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  736. for i, chat := range chats {
  737. if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
  738. break
  739. }
  740. create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
  741. if len(chat.Portal.MXID) > 0 || create || createAll {
  742. user.syncPortal(chat)
  743. user.syncChatDoublePuppetDetails(doublePuppet, chat)
  744. }
  745. }
  746. if user.Conn != connAtStart {
  747. user.log.Debugln("Connection seems to have changed during sync, cancelling")
  748. return
  749. }
  750. user.UpdateDirectChats(nil)
  751. user.log.Infoln("Finished syncing portals")
  752. select {
  753. case user.syncPortalsDone <- struct{}{}:
  754. default:
  755. }
  756. }
  757. func (user *User) getDirectChats() map[id.UserID][]id.RoomID {
  758. res := make(map[id.UserID][]id.RoomID)
  759. privateChats := user.bridge.DB.Portal.FindPrivateChats(user.JID)
  760. for _, portal := range privateChats {
  761. if len(portal.MXID) > 0 {
  762. res[user.bridge.FormatPuppetMXID(portal.Key.JID)] = []id.RoomID{portal.MXID}
  763. }
  764. }
  765. return res
  766. }
  767. func (user *User) UpdateDirectChats(chats map[id.UserID][]id.RoomID) {
  768. if !user.bridge.Config.Bridge.SyncDirectChatList {
  769. return
  770. }
  771. puppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  772. if puppet == nil || puppet.CustomIntent() == nil {
  773. return
  774. }
  775. intent := puppet.CustomIntent()
  776. method := http.MethodPatch
  777. if chats == nil {
  778. chats = user.getDirectChats()
  779. method = http.MethodPut
  780. }
  781. user.log.Debugln("Updating m.direct list on homeserver")
  782. var err error
  783. if user.bridge.Config.Homeserver.Asmux {
  784. urlPath := intent.BuildBaseURL("_matrix", "client", "unstable", "com.beeper.asmux", "dms")
  785. _, err = intent.MakeFullRequest(mautrix.FullRequest{
  786. Method: method,
  787. URL: urlPath,
  788. Headers: http.Header{"X-Asmux-Auth": {user.bridge.AS.Registration.AppToken}},
  789. RequestJSON: chats,
  790. })
  791. } else {
  792. existingChats := make(map[id.UserID][]id.RoomID)
  793. err = intent.GetAccountData(event.AccountDataDirectChats.Type, &existingChats)
  794. if err != nil {
  795. user.log.Warnln("Failed to get m.direct list to update it:", err)
  796. return
  797. }
  798. for userID, rooms := range existingChats {
  799. if _, ok := user.bridge.ParsePuppetMXID(userID); !ok {
  800. // This is not a ghost user, include it in the new list
  801. chats[userID] = rooms
  802. } else if _, ok := chats[userID]; !ok && method == http.MethodPatch {
  803. // This is a ghost user, but we're not replacing the whole list, so include it too
  804. chats[userID] = rooms
  805. }
  806. }
  807. err = intent.SetAccountData(event.AccountDataDirectChats.Type, &chats)
  808. }
  809. if err != nil {
  810. user.log.Warnln("Failed to update m.direct list:", err)
  811. }
  812. }
  813. func (user *User) HandleContactList(contacts []whatsapp.Contact) {
  814. contactMap := make(map[whatsapp.JID]whatsapp.Contact)
  815. for _, contact := range contacts {
  816. contactMap[contact.JID] = contact
  817. }
  818. go user.syncPuppets(contactMap)
  819. }
  820. func (user *User) syncPuppets(contacts map[whatsapp.JID]whatsapp.Contact) {
  821. if contacts == nil {
  822. contacts = user.Conn.Store.Contacts
  823. }
  824. _, hasSelf := contacts[user.JID]
  825. if !hasSelf {
  826. contacts[user.JID] = whatsapp.Contact{
  827. Name: user.pushName,
  828. Notify: user.pushName,
  829. JID: user.JID,
  830. }
  831. }
  832. user.log.Infoln("Syncing puppet info from contacts")
  833. for jid, contact := range contacts {
  834. if strings.HasSuffix(jid, whatsapp.NewUserSuffix) {
  835. puppet := user.bridge.GetPuppetByJID(contact.JID)
  836. puppet.Sync(user, contact)
  837. } else if strings.HasSuffix(jid, whatsapp.BroadcastSuffix) {
  838. portal := user.GetPortalByJID(contact.JID)
  839. portal.Sync(user, contact)
  840. }
  841. }
  842. user.log.Infoln("Finished syncing puppet info from contacts")
  843. }
  844. func (user *User) updateLastConnectionIfNecessary() {
  845. if user.LastConnection+60 < time.Now().Unix() {
  846. user.UpdateLastConnection()
  847. }
  848. }
  849. func (user *User) HandleError(err error) {
  850. if !errors.Is(err, whatsapp.ErrInvalidWsData) {
  851. user.log.Errorfln("WhatsApp error: %v", err)
  852. }
  853. if closed, ok := err.(*whatsapp.ErrConnectionClosed); ok {
  854. user.bridge.Metrics.TrackDisconnection(user.MXID)
  855. if closed.Code == 1000 && user.cleanDisconnection {
  856. user.cleanDisconnection = false
  857. if !user.bridge.Config.Bridge.AggressiveReconnect {
  858. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  859. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  860. user.log.Infoln("Clean disconnection by server")
  861. return
  862. } else {
  863. user.log.Debugln("Clean disconnection by server, but aggressive reconnection is enabled")
  864. }
  865. }
  866. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection was closed with websocket status code %d", closed.Code))
  867. } else if failed, ok := err.(*whatsapp.ErrConnectionFailed); ok {
  868. disconnectErr := user.Conn.Disconnect()
  869. if disconnectErr != nil {
  870. user.log.Warnln("Failed to disconnect after connection fail:", disconnectErr)
  871. }
  872. user.bridge.Metrics.TrackDisconnection(user.MXID)
  873. user.ConnectionErrors++
  874. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", failed.Err))
  875. } else if err == whatsapp.ErrPingFalse {
  876. disconnectErr := user.Conn.Disconnect()
  877. if disconnectErr != nil {
  878. user.log.Warnln("Failed to disconnect after failed ping:", disconnectErr)
  879. }
  880. user.bridge.Metrics.TrackDisconnection(user.MXID)
  881. user.ConnectionErrors++
  882. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", err))
  883. }
  884. // Otherwise unknown error, probably mostly harmless
  885. }
  886. func (user *User) tryReconnect(msg string) {
  887. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  888. if user.ConnectionErrors > user.bridge.Config.Bridge.MaxConnectionAttempts {
  889. user.sendMarkdownBridgeAlert("%s. Use the `reconnect` command to reconnect.", msg)
  890. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  891. return
  892. }
  893. if user.bridge.Config.Bridge.ReportConnectionRetry {
  894. user.sendBridgeNotice("%s. Reconnecting...", msg)
  895. // Don't want the same error to be repeated
  896. msg = ""
  897. }
  898. var tries uint
  899. var exponentialBackoff bool
  900. baseDelay := time.Duration(user.bridge.Config.Bridge.ConnectionRetryDelay)
  901. if baseDelay < 0 {
  902. exponentialBackoff = true
  903. baseDelay = -baseDelay + 1
  904. }
  905. delay := baseDelay
  906. ctx, cancel := context.WithCancel(context.Background())
  907. defer cancel()
  908. user.cancelReconnect = cancel
  909. for user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  910. select {
  911. case <-ctx.Done():
  912. user.log.Debugln("tryReconnect context cancelled, aborting reconnection attempts")
  913. return
  914. default:
  915. }
  916. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWAConnecting})
  917. err := user.Conn.Restore(true, ctx)
  918. if err == nil {
  919. user.ConnectionErrors = 0
  920. if user.bridge.Config.Bridge.ReportConnectionRetry {
  921. user.sendBridgeNotice("Reconnected successfully")
  922. }
  923. user.PostLogin()
  924. return
  925. } else if errors.Is(err, whatsapp.ErrBadRequest) {
  926. user.log.Warnln("Got init 400 error when trying to reconnect, resetting connection...")
  927. err = user.Conn.Disconnect()
  928. if err != nil {
  929. user.log.Debugln("Error while disconnecting for connection reset:", err)
  930. }
  931. } else if errors.Is(err, whatsapp.ErrUnpaired) {
  932. user.log.Errorln("Got init 401 (unpaired) error when trying to reconnect, not retrying")
  933. user.removeFromJIDMap()
  934. //user.JID = ""
  935. user.SetSession(nil)
  936. user.DeleteConnection()
  937. user.sendMarkdownBridgeAlert("\u26a0 Failed to reconnect to WhatsApp: unpaired from phone. " +
  938. "To re-pair your phone, log in again.")
  939. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotLoggedIn})
  940. return
  941. } else if errors.Is(err, whatsapp.ErrAlreadyLoggedIn) {
  942. user.log.Warnln("Reconnection said we're already logged in, not trying anymore")
  943. return
  944. } else {
  945. user.log.Errorln("Error while trying to reconnect after disconnection:", err)
  946. }
  947. tries++
  948. user.ConnectionErrors++
  949. if user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  950. if exponentialBackoff {
  951. delay = (1 << tries) + baseDelay
  952. }
  953. if user.bridge.Config.Bridge.ReportConnectionRetry {
  954. user.sendBridgeNotice("Reconnection attempt failed: %v. Retrying in %d seconds...", err, delay)
  955. }
  956. time.Sleep(delay * time.Second)
  957. }
  958. }
  959. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  960. if user.bridge.Config.Bridge.ReportConnectionRetry {
  961. user.sendMarkdownBridgeAlert("%d reconnection attempts failed. Use the `reconnect` command to try to reconnect manually.", tries)
  962. } else {
  963. user.sendMarkdownBridgeAlert("\u26a0 %s. Additionally, %d reconnection attempts failed. Use the `reconnect` command to try to reconnect.", msg, tries)
  964. }
  965. }
  966. func (user *User) PortalKey(jid whatsapp.JID) database.PortalKey {
  967. return database.NewPortalKey(jid, user.JID)
  968. }
  969. func (user *User) GetPortalByJID(jid whatsapp.JID) *Portal {
  970. return user.bridge.GetPortalByJID(user.PortalKey(jid))
  971. }
  972. func (user *User) runMessageRingBuffer() {
  973. for msg := range user.messageInput {
  974. select {
  975. case user.messageOutput <- msg:
  976. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  977. default:
  978. dropped := <-user.messageOutput
  979. user.log.Warnln("Buffer is full, dropping message in", dropped.chat)
  980. user.messageOutput <- msg
  981. }
  982. }
  983. }
  984. func (user *User) handleMessageLoop() {
  985. for {
  986. select {
  987. case msg := <-user.messageOutput:
  988. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  989. user.GetPortalByJID(msg.chat).messages <- msg
  990. case <-user.syncStart:
  991. user.log.Debugln("Processing of incoming messages is locked")
  992. user.bridge.Metrics.TrackSyncLock(user.JID, true)
  993. user.syncWait.Wait()
  994. user.bridge.Metrics.TrackSyncLock(user.JID, false)
  995. user.log.Debugln("Processing of incoming messages unlocked")
  996. }
  997. }
  998. }
  999. func (user *User) HandleNewContact(contact whatsapp.Contact) {
  1000. user.log.Debugfln("Contact message: %+v", contact)
  1001. if strings.HasSuffix(contact.JID, whatsapp.OldUserSuffix) {
  1002. contact.JID = strings.Replace(contact.JID, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  1003. }
  1004. if strings.HasSuffix(contact.JID, whatsapp.NewUserSuffix) {
  1005. puppet := user.bridge.GetPuppetByJID(contact.JID)
  1006. puppet.UpdateName(user, contact)
  1007. } else if strings.HasSuffix(contact.JID, whatsapp.BroadcastSuffix) {
  1008. portal := user.GetPortalByJID(contact.JID)
  1009. portal.UpdateName(contact.Name, "", nil, true)
  1010. }
  1011. }
  1012. func (user *User) HandleBatteryMessage(battery whatsapp.BatteryMessage) {
  1013. user.log.Debugfln("Battery message: %+v", battery)
  1014. var notice string
  1015. if !battery.Plugged && battery.Percentage < 15 && user.batteryWarningsSent < 1 {
  1016. notice = fmt.Sprintf("Phone battery low (%d %% remaining)", battery.Percentage)
  1017. user.batteryWarningsSent = 1
  1018. } else if !battery.Plugged && battery.Percentage < 5 && user.batteryWarningsSent < 2 {
  1019. notice = fmt.Sprintf("Phone battery very low (%d %% remaining)", battery.Percentage)
  1020. user.batteryWarningsSent = 2
  1021. } else if battery.Percentage > 15 || battery.Plugged {
  1022. user.batteryWarningsSent = 0
  1023. }
  1024. if notice != "" {
  1025. go user.sendBridgeNotice("%s", notice)
  1026. }
  1027. }
  1028. type FakeMessage struct {
  1029. Text string
  1030. ID string
  1031. Alert bool
  1032. }
  1033. func (user *User) HandleCallInfo(info whatsapp.CallInfo) {
  1034. if info.Data != nil {
  1035. return
  1036. }
  1037. data := FakeMessage{
  1038. ID: info.ID,
  1039. }
  1040. switch info.Type {
  1041. case whatsapp.CallOffer:
  1042. if !user.bridge.Config.Bridge.CallNotices.Start {
  1043. return
  1044. }
  1045. data.Text = "Incoming call"
  1046. data.Alert = true
  1047. case whatsapp.CallOfferVideo:
  1048. if !user.bridge.Config.Bridge.CallNotices.Start {
  1049. return
  1050. }
  1051. data.Text = "Incoming video call"
  1052. data.Alert = true
  1053. case whatsapp.CallTerminate:
  1054. if !user.bridge.Config.Bridge.CallNotices.End {
  1055. return
  1056. }
  1057. data.Text = "Call ended"
  1058. data.ID += "E"
  1059. default:
  1060. return
  1061. }
  1062. portal := user.GetPortalByJID(info.From)
  1063. if portal != nil {
  1064. portal.messages <- PortalMessage{info.From, user, data, 0}
  1065. }
  1066. }
  1067. func (user *User) HandlePresence(info whatsapp.PresenceEvent) {
  1068. puppet := user.bridge.GetPuppetByJID(info.SenderJID)
  1069. switch info.Status {
  1070. case whatsapp.PresenceUnavailable:
  1071. _ = puppet.DefaultIntent().SetPresence("offline")
  1072. case whatsapp.PresenceAvailable:
  1073. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  1074. portal := user.bridge.GetPortalByMXID(puppet.typingIn)
  1075. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  1076. puppet.typingIn = ""
  1077. puppet.typingAt = 0
  1078. } else {
  1079. _ = puppet.DefaultIntent().SetPresence("online")
  1080. }
  1081. case whatsapp.PresenceComposing:
  1082. portal := user.GetPortalByJID(info.JID)
  1083. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  1084. if puppet.typingIn == portal.MXID {
  1085. return
  1086. }
  1087. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  1088. }
  1089. puppet.typingIn = portal.MXID
  1090. puppet.typingAt = time.Now().Unix()
  1091. _, _ = puppet.IntentFor(portal).UserTyping(portal.MXID, true, 15*1000)
  1092. }
  1093. }
  1094. func (user *User) HandleMsgInfo(info whatsapp.JSONMsgInfo) {
  1095. if (info.Command == whatsapp.MsgInfoCommandAck || info.Command == whatsapp.MsgInfoCommandAcks) && info.Acknowledgement == whatsapp.AckMessageRead {
  1096. portal := user.GetPortalByJID(info.ToJID)
  1097. if len(portal.MXID) == 0 {
  1098. return
  1099. }
  1100. intent := user.bridge.GetPuppetByJID(info.SenderJID).IntentFor(portal)
  1101. for _, msgID := range info.IDs {
  1102. msg := user.bridge.DB.Message.GetByJID(portal.Key, msgID)
  1103. if msg == nil || msg.IsFakeMXID() {
  1104. continue
  1105. }
  1106. err := intent.MarkRead(portal.MXID, msg.MXID)
  1107. if err != nil {
  1108. user.log.Warnln("Failed to mark message %s as read by %s: %v", msg.MXID, info.SenderJID, err)
  1109. }
  1110. }
  1111. }
  1112. }
  1113. func (user *User) HandleReceivedMessage(received whatsapp.ReceivedMessage) {
  1114. if received.Type == "read" {
  1115. go user.markSelfRead(received.Jid, received.Index)
  1116. } else {
  1117. user.log.Debugfln("Unknown received message type: %+v", received)
  1118. }
  1119. }
  1120. func (user *User) HandleReadMessage(read whatsapp.ReadMessage) {
  1121. user.log.Debugfln("Received chat read message: %+v", read)
  1122. go user.markSelfRead(read.Jid, "")
  1123. }
  1124. func (user *User) markSelfRead(jid, messageID string) {
  1125. if strings.HasSuffix(jid, whatsapp.OldUserSuffix) {
  1126. jid = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  1127. }
  1128. puppet := user.bridge.GetPuppetByJID(user.JID)
  1129. if puppet == nil {
  1130. return
  1131. }
  1132. intent := puppet.CustomIntent()
  1133. if intent == nil {
  1134. return
  1135. }
  1136. portal := user.GetPortalByJID(jid)
  1137. if portal == nil {
  1138. return
  1139. }
  1140. var message *database.Message
  1141. if messageID == "" {
  1142. message = user.bridge.DB.Message.GetLastInChat(portal.Key)
  1143. if message == nil {
  1144. return
  1145. }
  1146. user.log.Debugfln("User read chat %s/%s in WhatsApp mobile (last known event: %s/%s)", portal.Key.JID, portal.MXID, message.JID, message.MXID)
  1147. } else {
  1148. message = user.bridge.DB.Message.GetByJID(portal.Key, messageID)
  1149. if message == nil || message.IsFakeMXID() {
  1150. return
  1151. }
  1152. user.log.Debugfln("User read message %s/%s in %s/%s in WhatsApp mobile", message.JID, message.MXID, portal.Key.JID, portal.MXID)
  1153. }
  1154. err := intent.MarkRead(portal.MXID, message.MXID)
  1155. if err != nil {
  1156. user.log.Warnfln("Failed to bridge own read receipt in %s: %v", jid, err)
  1157. }
  1158. }
  1159. func (user *User) HandleCommand(cmd whatsapp.JSONCommand) {
  1160. switch cmd.Type {
  1161. case whatsapp.CommandPicture:
  1162. if strings.HasSuffix(cmd.JID, whatsapp.NewUserSuffix) {
  1163. puppet := user.bridge.GetPuppetByJID(cmd.JID)
  1164. go puppet.UpdateAvatar(user, cmd.ProfilePicInfo)
  1165. } else if user.bridge.Config.Bridge.ChatMetaSync {
  1166. portal := user.GetPortalByJID(cmd.JID)
  1167. go portal.UpdateAvatar(user, cmd.ProfilePicInfo, true)
  1168. }
  1169. case whatsapp.CommandDisconnect:
  1170. if cmd.Kind == "replaced" {
  1171. user.cleanDisconnection = true
  1172. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server because you opened another WhatsApp Web client.\n\n" +
  1173. "Use the `reconnect` command to disconnect the other client and resume bridging.")
  1174. } else {
  1175. user.log.Warnln("Unknown kind of disconnect:", string(cmd.Raw))
  1176. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server (reason code: %s).\n\n"+
  1177. "Use the `reconnect` command to reconnect.", cmd.Kind)
  1178. }
  1179. }
  1180. }
  1181. func (user *User) HandleChatUpdate(cmd whatsapp.ChatUpdate) {
  1182. if cmd.Command != whatsapp.ChatUpdateCommandAction {
  1183. return
  1184. }
  1185. portal := user.GetPortalByJID(cmd.JID)
  1186. if len(portal.MXID) == 0 {
  1187. if cmd.Data.Action == whatsapp.ChatActionIntroduce || cmd.Data.Action == whatsapp.ChatActionCreate {
  1188. go func() {
  1189. err := portal.CreateMatrixRoom(user)
  1190. if err != nil {
  1191. user.log.Errorln("Failed to create portal room after receiving join event:", err)
  1192. }
  1193. }()
  1194. }
  1195. return
  1196. }
  1197. // These don't come down the message history :(
  1198. switch cmd.Data.Action {
  1199. case whatsapp.ChatActionAddTopic:
  1200. go portal.UpdateTopic(cmd.Data.AddTopic.Topic, cmd.Data.SenderJID, nil, true)
  1201. case whatsapp.ChatActionRemoveTopic:
  1202. go portal.UpdateTopic("", cmd.Data.SenderJID, nil, true)
  1203. case whatsapp.ChatActionRemove:
  1204. // We ignore leaving groups in the message history to avoid accidentally leaving rejoined groups,
  1205. // but if we get a real-time command that says we left, it should be safe to bridge it.
  1206. if !user.bridge.Config.Bridge.ChatMetaSync {
  1207. for _, jid := range cmd.Data.UserChange.JIDs {
  1208. if jid == user.JID {
  1209. go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1210. break
  1211. }
  1212. }
  1213. }
  1214. }
  1215. if !user.bridge.Config.Bridge.ChatMetaSync {
  1216. // Ignore chat update commands, we're relying on the message history.
  1217. return
  1218. }
  1219. switch cmd.Data.Action {
  1220. case whatsapp.ChatActionNameChange:
  1221. go portal.UpdateName(cmd.Data.NameChange.Name, cmd.Data.SenderJID, nil, true)
  1222. case whatsapp.ChatActionPromote:
  1223. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, true)
  1224. case whatsapp.ChatActionDemote:
  1225. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, false)
  1226. case whatsapp.ChatActionAnnounce:
  1227. go portal.RestrictMessageSending(cmd.Data.Announce)
  1228. case whatsapp.ChatActionRestrict:
  1229. go portal.RestrictMetadataChanges(cmd.Data.Restrict)
  1230. case whatsapp.ChatActionRemove:
  1231. go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1232. case whatsapp.ChatActionAdd:
  1233. go portal.HandleWhatsAppInvite(cmd.Data.SenderJID, nil, cmd.Data.UserChange.JIDs)
  1234. case whatsapp.ChatActionIntroduce:
  1235. if cmd.Data.SenderJID != "unknown" {
  1236. go portal.Sync(user, whatsapp.Contact{JID: portal.Key.JID})
  1237. }
  1238. }
  1239. }
  1240. func (user *User) HandleConnInfo(info whatsapp.ConnInfo) {
  1241. if user.Session != nil && info.Connected && len(info.ClientToken) > 0 {
  1242. user.log.Debugln("Received new tokens")
  1243. user.Session.ClientToken = info.ClientToken
  1244. user.Session.ServerToken = info.ServerToken
  1245. user.Session.Wid = info.WID
  1246. user.Update()
  1247. }
  1248. if len(info.PushName) > 0 {
  1249. user.pushName = info.PushName
  1250. }
  1251. }
  1252. func (user *User) HandleJSONMessage(message json.RawMessage) {
  1253. if !json.Valid(message) {
  1254. return
  1255. }
  1256. user.log.Debugfln("JSON message: %s", message)
  1257. user.updateLastConnectionIfNecessary()
  1258. }
  1259. func (user *User) NeedsRelaybot(portal *Portal) bool {
  1260. return !user.HasSession() || !user.IsInPortal(portal.Key)
  1261. }