user.go 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292
  1. // mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
  2. // Copyright (C) 2020 Tulir Asokan
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "context"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "net/http"
  23. "sort"
  24. "strconv"
  25. "strings"
  26. "sync"
  27. "sync/atomic"
  28. "time"
  29. "github.com/skip2/go-qrcode"
  30. log "maunium.net/go/maulogger/v2"
  31. "github.com/Rhymen/go-whatsapp"
  32. waBinary "github.com/Rhymen/go-whatsapp/binary"
  33. waProto "github.com/Rhymen/go-whatsapp/binary/proto"
  34. "maunium.net/go/mautrix"
  35. "maunium.net/go/mautrix/event"
  36. "maunium.net/go/mautrix/format"
  37. "maunium.net/go/mautrix/id"
  38. "maunium.net/go/mautrix-whatsapp/database"
  39. )
  40. type User struct {
  41. *database.User
  42. Conn *whatsapp.Conn
  43. bridge *Bridge
  44. log log.Logger
  45. Admin bool
  46. Whitelisted bool
  47. RelaybotWhitelisted bool
  48. IsRelaybot bool
  49. ConnectionErrors int
  50. CommunityID string
  51. cleanDisconnection bool
  52. batteryWarningsSent int
  53. lastReconnection int64
  54. pushName string
  55. chatListReceived chan struct{}
  56. syncPortalsDone chan struct{}
  57. messageInput chan PortalMessage
  58. messageOutput chan PortalMessage
  59. syncStart chan struct{}
  60. syncWait sync.WaitGroup
  61. syncing int32
  62. mgmtCreateLock sync.Mutex
  63. connLock sync.Mutex
  64. cancelReconnect func()
  65. prevBridgeStatus *AsmuxPong
  66. }
  67. func (bridge *Bridge) GetUserByMXID(userID id.UserID) *User {
  68. _, isPuppet := bridge.ParsePuppetMXID(userID)
  69. if isPuppet || userID == bridge.Bot.UserID {
  70. return nil
  71. }
  72. bridge.usersLock.Lock()
  73. defer bridge.usersLock.Unlock()
  74. user, ok := bridge.usersByMXID[userID]
  75. if !ok {
  76. return bridge.loadDBUser(bridge.DB.User.GetByMXID(userID), &userID)
  77. }
  78. return user
  79. }
  80. func (bridge *Bridge) GetUserByJID(userID whatsapp.JID) *User {
  81. bridge.usersLock.Lock()
  82. defer bridge.usersLock.Unlock()
  83. user, ok := bridge.usersByJID[userID]
  84. if !ok {
  85. return bridge.loadDBUser(bridge.DB.User.GetByJID(userID), nil)
  86. }
  87. return user
  88. }
  89. func (user *User) addToJIDMap() {
  90. user.bridge.usersLock.Lock()
  91. user.bridge.usersByJID[user.JID] = user
  92. user.bridge.usersLock.Unlock()
  93. }
  94. func (user *User) removeFromJIDMap() {
  95. user.bridge.usersLock.Lock()
  96. jidUser, ok := user.bridge.usersByJID[user.JID]
  97. if ok && user == jidUser {
  98. delete(user.bridge.usersByJID, user.JID)
  99. }
  100. user.bridge.usersLock.Unlock()
  101. user.bridge.Metrics.TrackLoginState(user.JID, false)
  102. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotLoggedIn})
  103. }
  104. func (bridge *Bridge) GetAllUsers() []*User {
  105. bridge.usersLock.Lock()
  106. defer bridge.usersLock.Unlock()
  107. dbUsers := bridge.DB.User.GetAll()
  108. output := make([]*User, len(dbUsers))
  109. for index, dbUser := range dbUsers {
  110. user, ok := bridge.usersByMXID[dbUser.MXID]
  111. if !ok {
  112. user = bridge.loadDBUser(dbUser, nil)
  113. }
  114. output[index] = user
  115. }
  116. return output
  117. }
  118. func (bridge *Bridge) loadDBUser(dbUser *database.User, mxid *id.UserID) *User {
  119. if dbUser == nil {
  120. if mxid == nil {
  121. return nil
  122. }
  123. dbUser = bridge.DB.User.New()
  124. dbUser.MXID = *mxid
  125. dbUser.Insert()
  126. }
  127. user := bridge.NewUser(dbUser)
  128. bridge.usersByMXID[user.MXID] = user
  129. if len(user.JID) > 0 {
  130. bridge.usersByJID[user.JID] = user
  131. }
  132. if len(user.ManagementRoom) > 0 {
  133. bridge.managementRooms[user.ManagementRoom] = user
  134. }
  135. return user
  136. }
  137. func (user *User) GetPortals() []*Portal {
  138. keys := user.User.GetPortalKeys()
  139. portals := make([]*Portal, len(keys))
  140. user.bridge.portalsLock.Lock()
  141. for i, key := range keys {
  142. portal, ok := user.bridge.portalsByJID[key]
  143. if !ok {
  144. portal = user.bridge.loadDBPortal(user.bridge.DB.Portal.GetByJID(key), &key)
  145. }
  146. portals[i] = portal
  147. }
  148. user.bridge.portalsLock.Unlock()
  149. return portals
  150. }
  151. func (bridge *Bridge) NewUser(dbUser *database.User) *User {
  152. user := &User{
  153. User: dbUser,
  154. bridge: bridge,
  155. log: bridge.Log.Sub("User").Sub(string(dbUser.MXID)),
  156. IsRelaybot: false,
  157. chatListReceived: make(chan struct{}, 1),
  158. syncPortalsDone: make(chan struct{}, 1),
  159. syncStart: make(chan struct{}, 1),
  160. messageInput: make(chan PortalMessage),
  161. messageOutput: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer),
  162. }
  163. user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID)
  164. user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
  165. user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
  166. go user.handleMessageLoop()
  167. go user.runMessageRingBuffer()
  168. return user
  169. }
  170. func (user *User) GetManagementRoom() id.RoomID {
  171. if len(user.ManagementRoom) == 0 {
  172. user.mgmtCreateLock.Lock()
  173. defer user.mgmtCreateLock.Unlock()
  174. if len(user.ManagementRoom) > 0 {
  175. return user.ManagementRoom
  176. }
  177. resp, err := user.bridge.Bot.CreateRoom(&mautrix.ReqCreateRoom{
  178. Topic: "WhatsApp bridge notices",
  179. IsDirect: true,
  180. })
  181. if err != nil {
  182. user.log.Errorln("Failed to auto-create management room:", err)
  183. } else {
  184. user.SetManagementRoom(resp.RoomID)
  185. }
  186. }
  187. return user.ManagementRoom
  188. }
  189. func (user *User) SetManagementRoom(roomID id.RoomID) {
  190. existingUser, ok := user.bridge.managementRooms[roomID]
  191. if ok {
  192. existingUser.ManagementRoom = ""
  193. existingUser.Update()
  194. }
  195. user.ManagementRoom = roomID
  196. user.bridge.managementRooms[user.ManagementRoom] = user
  197. user.Update()
  198. }
  199. func (user *User) SetSession(session *whatsapp.Session) {
  200. if session == nil {
  201. user.Session = nil
  202. user.LastConnection = 0
  203. } else if len(session.Wid) > 0 {
  204. user.Session = session
  205. } else {
  206. return
  207. }
  208. user.Update()
  209. }
  210. func (user *User) Connect(evenIfNoSession bool) bool {
  211. user.connLock.Lock()
  212. if user.Conn != nil {
  213. user.connLock.Unlock()
  214. if user.Conn.IsConnected() {
  215. return true
  216. } else {
  217. return user.RestoreSession()
  218. }
  219. } else if !evenIfNoSession && user.Session == nil {
  220. user.connLock.Unlock()
  221. return false
  222. }
  223. user.log.Debugln("Connecting to WhatsApp")
  224. if user.Session != nil {
  225. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWAConnecting})
  226. }
  227. timeout := time.Duration(user.bridge.Config.Bridge.ConnectionTimeout)
  228. if timeout == 0 {
  229. timeout = 20
  230. }
  231. user.Conn = whatsapp.NewConn(&whatsapp.Options{
  232. Timeout: timeout * time.Second,
  233. LongClientName: user.bridge.Config.WhatsApp.OSName,
  234. ShortClientName: user.bridge.Config.WhatsApp.BrowserName,
  235. ClientVersion: WAVersion,
  236. Log: user.log.Sub("Conn"),
  237. Handler: []whatsapp.Handler{user},
  238. })
  239. user.setupAdminTestHooks()
  240. user.connLock.Unlock()
  241. return user.RestoreSession()
  242. }
  243. func (user *User) DeleteConnection() {
  244. user.connLock.Lock()
  245. if user.Conn == nil {
  246. user.connLock.Unlock()
  247. return
  248. }
  249. err := user.Conn.Disconnect()
  250. if err != nil && err != whatsapp.ErrNotConnected {
  251. user.log.Warnln("Error disconnecting: %v", err)
  252. }
  253. user.Conn.RemoveHandlers()
  254. user.Conn = nil
  255. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  256. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  257. user.connLock.Unlock()
  258. }
  259. func (user *User) RestoreSession() bool {
  260. if user.Session != nil {
  261. user.Conn.SetSession(*user.Session)
  262. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
  263. defer cancel()
  264. err := user.Conn.Restore(true, ctx)
  265. if err == whatsapp.ErrAlreadyLoggedIn {
  266. return true
  267. } else if err != nil {
  268. user.log.Errorln("Failed to restore session:", err)
  269. if errors.Is(err, whatsapp.ErrUnpaired) {
  270. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
  271. "To re-pair your phone, log in again.")
  272. user.removeFromJIDMap()
  273. //user.JID = ""
  274. user.SetSession(nil)
  275. user.DeleteConnection()
  276. return false
  277. } else {
  278. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  279. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
  280. "on your phone is reachable and use `reconnect` to try connecting again.")
  281. }
  282. user.log.Debugln("Disconnecting due to failed session restore...")
  283. err = user.Conn.Disconnect()
  284. if err != nil {
  285. user.log.Errorln("Failed to disconnect after failed session restore:", err)
  286. }
  287. return false
  288. }
  289. user.ConnectionErrors = 0
  290. user.log.Debugln("Session restored successfully")
  291. user.PostLogin()
  292. }
  293. return true
  294. }
  295. func (user *User) HasSession() bool {
  296. return user.Session != nil
  297. }
  298. func (user *User) IsConnected() bool {
  299. return user.Conn != nil && user.Conn.IsConnected() && user.Conn.IsLoggedIn()
  300. }
  301. func (user *User) IsLoginInProgress() bool {
  302. return user.Conn != nil && user.Conn.IsLoginInProgress()
  303. }
  304. func (user *User) loginQrChannel(ce *CommandEvent, qrChan <-chan string, eventIDChan chan<- id.EventID) {
  305. var qrEventID id.EventID
  306. for code := range qrChan {
  307. if code == "stop" {
  308. return
  309. }
  310. qrCode, err := qrcode.Encode(code, qrcode.Low, 256)
  311. if err != nil {
  312. user.log.Errorln("Failed to encode QR code:", err)
  313. ce.Reply("Failed to encode QR code: %v", err)
  314. return
  315. }
  316. bot := user.bridge.AS.BotClient()
  317. resp, err := bot.UploadBytes(qrCode, "image/png")
  318. if err != nil {
  319. user.log.Errorln("Failed to upload QR code:", err)
  320. ce.Reply("Failed to upload QR code: %v", err)
  321. return
  322. }
  323. if qrEventID == "" {
  324. sendResp, err := bot.SendImage(ce.RoomID, code, resp.ContentURI)
  325. if err != nil {
  326. user.log.Errorln("Failed to send QR code to user:", err)
  327. return
  328. }
  329. qrEventID = sendResp.EventID
  330. eventIDChan <- qrEventID
  331. } else {
  332. _, err = bot.SendMessageEvent(ce.RoomID, event.EventMessage, &event.MessageEventContent{
  333. MsgType: event.MsgImage,
  334. Body: code,
  335. URL: resp.ContentURI.CUString(),
  336. NewContent: &event.MessageEventContent{
  337. MsgType: event.MsgImage,
  338. Body: code,
  339. URL: resp.ContentURI.CUString(),
  340. },
  341. RelatesTo: &event.RelatesTo{
  342. Type: event.RelReplace,
  343. EventID: qrEventID,
  344. },
  345. })
  346. if err != nil {
  347. user.log.Errorln("Failed to send edited QR code to user:", err)
  348. }
  349. }
  350. }
  351. }
  352. func (user *User) Login(ce *CommandEvent) {
  353. qrChan := make(chan string, 3)
  354. eventIDChan := make(chan id.EventID, 1)
  355. go user.loginQrChannel(ce, qrChan, eventIDChan)
  356. session, jid, err := user.Conn.Login(qrChan, nil, user.bridge.Config.Bridge.LoginQRRegenCount)
  357. qrChan <- "stop"
  358. if err != nil {
  359. var eventID id.EventID
  360. select {
  361. case eventID = <-eventIDChan:
  362. default:
  363. }
  364. reply := event.MessageEventContent{
  365. MsgType: event.MsgText,
  366. }
  367. if err == whatsapp.ErrAlreadyLoggedIn {
  368. reply.Body = "You're already logged in"
  369. } else if err == whatsapp.ErrLoginInProgress {
  370. reply.Body = "You have a login in progress already."
  371. } else if err == whatsapp.ErrLoginTimedOut {
  372. reply.Body = "QR code scan timed out. Please try again."
  373. } else {
  374. user.log.Warnln("Failed to log in:", err)
  375. reply.Body = fmt.Sprintf("Unknown error while logging in: %v", err)
  376. }
  377. msg := reply
  378. if eventID != "" {
  379. msg.NewContent = &reply
  380. msg.RelatesTo = &event.RelatesTo{
  381. Type: event.RelReplace,
  382. EventID: eventID,
  383. }
  384. }
  385. _, _ = ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &msg)
  386. return
  387. }
  388. // TODO there's a bit of duplication between this and the provisioning API login method
  389. // Also between the two logout methods (commands.go and provisioning.go)
  390. user.log.Debugln("Successful login as", jid, "via command")
  391. user.ConnectionErrors = 0
  392. user.JID = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, 1)
  393. user.addToJIDMap()
  394. user.SetSession(&session)
  395. ce.Reply("Successfully logged in, synchronizing chats...")
  396. user.PostLogin()
  397. }
  398. type Chat struct {
  399. Portal *Portal
  400. LastMessageTime uint64
  401. MarkAsRead bool
  402. Contact whatsapp.Contact
  403. }
  404. type ChatList []Chat
  405. func (cl ChatList) Len() int {
  406. return len(cl)
  407. }
  408. func (cl ChatList) Less(i, j int) bool {
  409. return cl[i].LastMessageTime > cl[j].LastMessageTime
  410. }
  411. func (cl ChatList) Swap(i, j int) {
  412. cl[i], cl[j] = cl[j], cl[i]
  413. }
  414. func (user *User) PostLogin() {
  415. user.sendBridgeStatus(AsmuxPong{OK: true})
  416. user.bridge.Metrics.TrackConnectionState(user.JID, true)
  417. user.bridge.Metrics.TrackLoginState(user.JID, true)
  418. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  419. if !atomic.CompareAndSwapInt32(&user.syncing, 0, 1) {
  420. // TODO we should cleanly stop the old sync and start a new one instead of not starting a new one
  421. user.log.Warnln("There seems to be a post-sync already in progress, not starting a new one")
  422. return
  423. }
  424. user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
  425. user.chatListReceived = make(chan struct{}, 1)
  426. user.syncPortalsDone = make(chan struct{}, 1)
  427. user.syncWait.Add(1)
  428. user.syncStart <- struct{}{}
  429. go user.intPostLogin()
  430. }
  431. func (user *User) tryAutomaticDoublePuppeting() {
  432. if !user.bridge.Config.CanDoublePuppet(user.MXID) {
  433. return
  434. }
  435. user.log.Debugln("Checking if double puppeting needs to be enabled")
  436. puppet := user.bridge.GetPuppetByJID(user.JID)
  437. if len(puppet.CustomMXID) > 0 {
  438. user.log.Debugln("User already has double-puppeting enabled")
  439. // Custom puppet already enabled
  440. return
  441. }
  442. accessToken, err := puppet.loginWithSharedSecret(user.MXID)
  443. if err != nil {
  444. user.log.Warnln("Failed to login with shared secret:", err)
  445. return
  446. }
  447. err = puppet.SwitchCustomMXID(accessToken, user.MXID)
  448. if err != nil {
  449. puppet.log.Warnln("Failed to switch to auto-logined custom puppet:", err)
  450. return
  451. }
  452. user.log.Infoln("Successfully automatically enabled custom puppet")
  453. }
  454. func (user *User) sendBridgeNotice(formatString string, args ...interface{}) {
  455. notice := fmt.Sprintf(formatString, args...)
  456. _, err := user.bridge.Bot.SendNotice(user.GetManagementRoom(), notice)
  457. if err != nil {
  458. user.log.Warnf("Failed to send bridge notice \"%s\": %v", notice, err)
  459. }
  460. }
  461. func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface{}) {
  462. notice := fmt.Sprintf(formatString, args...)
  463. content := format.RenderMarkdown(notice, true, false)
  464. _, err := user.bridge.Bot.SendMessageEvent(user.GetManagementRoom(), event.EventMessage, content)
  465. if err != nil {
  466. user.log.Warnf("Failed to send bridge alert \"%s\": %v", notice, err)
  467. }
  468. }
  469. func (user *User) postConnPing() bool {
  470. user.log.Debugln("Making post-connection ping")
  471. var err error
  472. for i := 0; ; i++ {
  473. err = user.Conn.AdminTest()
  474. if err == nil {
  475. user.log.Debugln("Post-connection ping OK")
  476. return true
  477. } else if errors.Is(err, whatsapp.ErrConnectionTimeout) && i < 5 {
  478. user.log.Warnfln("Post-connection ping timed out, sending new one")
  479. } else {
  480. break
  481. }
  482. }
  483. user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err)
  484. disconnectErr := user.Conn.Disconnect()
  485. if disconnectErr != nil {
  486. user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
  487. }
  488. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  489. user.bridge.Metrics.TrackDisconnection(user.MXID)
  490. go func() {
  491. time.Sleep(1 * time.Second)
  492. user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err))
  493. }()
  494. return false
  495. }
  496. func (user *User) intPostLogin() {
  497. defer atomic.StoreInt32(&user.syncing, 0)
  498. defer user.syncWait.Done()
  499. user.lastReconnection = time.Now().Unix()
  500. user.createCommunity()
  501. user.tryAutomaticDoublePuppeting()
  502. user.log.Debugln("Waiting for chat list receive confirmation")
  503. select {
  504. case <-user.chatListReceived:
  505. user.log.Debugln("Chat list receive confirmation received in PostLogin")
  506. case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
  507. user.log.Warnln("Timed out waiting for chat list to arrive!")
  508. user.postConnPing()
  509. return
  510. }
  511. if !user.postConnPing() {
  512. user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
  513. return
  514. }
  515. user.log.Debugln("Waiting for portal sync complete confirmation")
  516. select {
  517. case <-user.syncPortalsDone:
  518. user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.")
  519. // TODO this is too short, maybe a per-portal duration?
  520. case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
  521. user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
  522. }
  523. }
  524. type NormalMessage interface {
  525. GetInfo() whatsapp.MessageInfo
  526. }
  527. func (user *User) HandleEvent(event interface{}) {
  528. switch v := event.(type) {
  529. case NormalMessage:
  530. info := v.GetInfo()
  531. user.messageInput <- PortalMessage{info.RemoteJid, user, v, info.Timestamp}
  532. case whatsapp.MessageRevocation:
  533. user.messageInput <- PortalMessage{v.RemoteJid, user, v, 0}
  534. case whatsapp.StreamEvent:
  535. user.HandleStreamEvent(v)
  536. case []whatsapp.Chat:
  537. user.HandleChatList(v)
  538. case []whatsapp.Contact:
  539. user.HandleContactList(v)
  540. case error:
  541. user.HandleError(v)
  542. case whatsapp.Contact:
  543. go user.HandleNewContact(v)
  544. case whatsapp.BatteryMessage:
  545. user.HandleBatteryMessage(v)
  546. case whatsapp.CallInfo:
  547. user.HandleCallInfo(v)
  548. case whatsapp.PresenceEvent:
  549. go user.HandlePresence(v)
  550. case whatsapp.JSONMsgInfo:
  551. go user.HandleMsgInfo(v)
  552. case whatsapp.ReceivedMessage:
  553. user.HandleReceivedMessage(v)
  554. case whatsapp.ReadMessage:
  555. user.HandleReadMessage(v)
  556. case whatsapp.JSONCommand:
  557. user.HandleCommand(v)
  558. case whatsapp.ChatUpdate:
  559. user.HandleChatUpdate(v)
  560. case whatsapp.ConnInfo:
  561. user.HandleConnInfo(v)
  562. case json.RawMessage:
  563. user.HandleJSONMessage(v)
  564. case *waProto.WebMessageInfo:
  565. user.updateLastConnectionIfNecessary()
  566. // TODO trace log
  567. //user.log.Debugfln("WebMessageInfo: %+v", v)
  568. case *waBinary.Node:
  569. user.log.Debugfln("Unknown binary message: %+v", v)
  570. default:
  571. user.log.Debugfln("Unknown type of event in HandleEvent: %T", v)
  572. }
  573. }
  574. func (user *User) HandleStreamEvent(evt whatsapp.StreamEvent) {
  575. if evt.Type == whatsapp.StreamSleep {
  576. if user.lastReconnection+60 > time.Now().Unix() {
  577. user.lastReconnection = 0
  578. user.log.Infoln("Stream went to sleep soon after reconnection, making new post-connection ping in 20 seconds")
  579. go func() {
  580. time.Sleep(20 * time.Second)
  581. // TODO if this happens during the post-login sync, it can get stuck forever
  582. // TODO check if the above is still true
  583. user.postConnPing()
  584. }()
  585. }
  586. } else {
  587. user.log.Infofln("Stream event: %+v", evt)
  588. }
  589. }
  590. func (user *User) HandleChatList(chats []whatsapp.Chat) {
  591. user.log.Infoln("Chat list received")
  592. chatMap := make(map[string]whatsapp.Chat)
  593. for _, chat := range user.Conn.Store.Chats {
  594. chatMap[chat.JID] = chat
  595. }
  596. for _, chat := range chats {
  597. chatMap[chat.JID] = chat
  598. }
  599. select {
  600. case user.chatListReceived <- struct{}{}:
  601. default:
  602. }
  603. go user.syncPortals(chatMap, false)
  604. }
  605. func (user *User) syncPortals(chatMap map[string]whatsapp.Chat, createAll bool) {
  606. // TODO use contexts instead of checking if user.Conn is the same?
  607. connAtStart := user.Conn
  608. if chatMap == nil {
  609. chatMap = user.Conn.Store.Chats
  610. }
  611. user.log.Infoln("Reading chat list")
  612. chats := make(ChatList, 0, len(chatMap))
  613. existingKeys := user.GetInCommunityMap()
  614. portalKeys := make([]database.PortalKeyWithMeta, 0, len(chatMap))
  615. for _, chat := range chatMap {
  616. ts, err := strconv.ParseUint(chat.LastMessageTime, 10, 64)
  617. if err != nil {
  618. user.log.Warnfln("Non-integer last message time in %s: %s", chat.JID, chat.LastMessageTime)
  619. continue
  620. }
  621. portal := user.GetPortalByJID(chat.JID)
  622. chats = append(chats, Chat{
  623. Portal: portal,
  624. Contact: user.Conn.Store.Contacts[chat.JID],
  625. LastMessageTime: ts,
  626. MarkAsRead: chat.Unread == "0",
  627. })
  628. var inCommunity, ok bool
  629. if inCommunity, ok = existingKeys[portal.Key]; !ok || !inCommunity {
  630. inCommunity = user.addPortalToCommunity(portal)
  631. if portal.IsPrivateChat() {
  632. puppet := user.bridge.GetPuppetByJID(portal.Key.JID)
  633. user.addPuppetToCommunity(puppet)
  634. }
  635. }
  636. portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: portal.Key, InCommunity: inCommunity})
  637. }
  638. user.log.Infoln("Read chat list, updating user-portal mapping")
  639. err := user.SetPortalKeys(portalKeys)
  640. if err != nil {
  641. user.log.Warnln("Failed to update user-portal mapping:", err)
  642. }
  643. sort.Sort(chats)
  644. limit := user.bridge.Config.Bridge.InitialChatSync
  645. if limit < 0 {
  646. limit = len(chats)
  647. }
  648. if user.Conn != connAtStart {
  649. user.log.Debugln("Connection seems to have changed before sync, cancelling")
  650. return
  651. }
  652. now := uint64(time.Now().Unix())
  653. user.log.Infoln("Syncing portals")
  654. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  655. for i, chat := range chats {
  656. if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
  657. break
  658. }
  659. create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
  660. if len(chat.Portal.MXID) > 0 || create || createAll {
  661. // Don't sync unless chat meta sync is enabled or portal doesn't exist
  662. if user.bridge.Config.Bridge.ChatMetaSync || len(chat.Portal.MXID) == 0 {
  663. chat.Portal.Sync(user, chat.Contact)
  664. }
  665. err = chat.Portal.BackfillHistory(user, chat.LastMessageTime)
  666. if err != nil {
  667. chat.Portal.log.Errorln("Error backfilling history:", err)
  668. }
  669. if chat.MarkAsRead && doublePuppet != nil && doublePuppet.CustomIntent() != nil {
  670. lastMessage := user.bridge.DB.Message.GetLastInChat(chat.Portal.Key)
  671. if lastMessage != nil {
  672. err = doublePuppet.CustomIntent().MarkRead(chat.Portal.MXID, lastMessage.MXID)
  673. if err != nil {
  674. user.log.Warnln("Failed to mark %s in %s as read after backfill: %v", lastMessage.MXID, chat.Portal.MXID, err)
  675. }
  676. }
  677. }
  678. }
  679. }
  680. if user.Conn != connAtStart {
  681. user.log.Debugln("Connection seems to have changed during sync, cancelling")
  682. return
  683. }
  684. user.UpdateDirectChats(nil)
  685. user.log.Infoln("Finished syncing portals")
  686. select {
  687. case user.syncPortalsDone <- struct{}{}:
  688. default:
  689. }
  690. }
  691. func (user *User) getDirectChats() map[id.UserID][]id.RoomID {
  692. res := make(map[id.UserID][]id.RoomID)
  693. privateChats := user.bridge.DB.Portal.FindPrivateChats(user.JID)
  694. for _, portal := range privateChats {
  695. if len(portal.MXID) > 0 {
  696. res[user.bridge.FormatPuppetMXID(portal.Key.JID)] = []id.RoomID{portal.MXID}
  697. }
  698. }
  699. return res
  700. }
  701. func (user *User) UpdateDirectChats(chats map[id.UserID][]id.RoomID) {
  702. if !user.bridge.Config.Bridge.SyncDirectChatList {
  703. return
  704. }
  705. puppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  706. if puppet == nil || puppet.CustomIntent() == nil {
  707. return
  708. }
  709. intent := puppet.CustomIntent()
  710. method := http.MethodPatch
  711. if chats == nil {
  712. chats = user.getDirectChats()
  713. method = http.MethodPut
  714. }
  715. user.log.Debugln("Updating m.direct list on homeserver")
  716. var err error
  717. if user.bridge.Config.Homeserver.Asmux {
  718. urlPath := intent.BuildBaseURL("_matrix", "client", "unstable", "com.beeper.asmux", "dms")
  719. _, err = intent.MakeFullRequest(mautrix.FullRequest{
  720. Method: method,
  721. URL: urlPath,
  722. Headers: http.Header{"X-Asmux-Auth": {user.bridge.AS.Registration.AppToken}},
  723. RequestJSON: chats,
  724. })
  725. } else {
  726. existingChats := make(map[id.UserID][]id.RoomID)
  727. err = intent.GetAccountData(event.AccountDataDirectChats.Type, &existingChats)
  728. if err != nil {
  729. user.log.Warnln("Failed to get m.direct list to update it:", err)
  730. return
  731. }
  732. for userID, rooms := range existingChats {
  733. if _, ok := user.bridge.ParsePuppetMXID(userID); !ok {
  734. // This is not a ghost user, include it in the new list
  735. chats[userID] = rooms
  736. } else if _, ok := chats[userID]; !ok && method == http.MethodPatch {
  737. // This is a ghost user, but we're not replacing the whole list, so include it too
  738. chats[userID] = rooms
  739. }
  740. }
  741. err = intent.SetAccountData(event.AccountDataDirectChats.Type, &chats)
  742. }
  743. if err != nil {
  744. user.log.Warnln("Failed to update m.direct list:", err)
  745. }
  746. }
  747. func (user *User) HandleContactList(contacts []whatsapp.Contact) {
  748. contactMap := make(map[whatsapp.JID]whatsapp.Contact)
  749. for _, contact := range contacts {
  750. contactMap[contact.JID] = contact
  751. }
  752. go user.syncPuppets(contactMap)
  753. }
  754. func (user *User) syncPuppets(contacts map[whatsapp.JID]whatsapp.Contact) {
  755. if contacts == nil {
  756. contacts = user.Conn.Store.Contacts
  757. }
  758. _, hasSelf := contacts[user.JID]
  759. if !hasSelf {
  760. contacts[user.JID] = whatsapp.Contact{
  761. Name: user.pushName,
  762. Notify: user.pushName,
  763. JID: user.JID,
  764. }
  765. }
  766. user.log.Infoln("Syncing puppet info from contacts")
  767. for jid, contact := range contacts {
  768. if strings.HasSuffix(jid, whatsapp.NewUserSuffix) {
  769. puppet := user.bridge.GetPuppetByJID(contact.JID)
  770. puppet.Sync(user, contact)
  771. } else if strings.HasSuffix(jid, whatsapp.BroadcastSuffix) {
  772. portal := user.GetPortalByJID(contact.JID)
  773. portal.Sync(user, contact)
  774. }
  775. }
  776. user.log.Infoln("Finished syncing puppet info from contacts")
  777. }
  778. func (user *User) updateLastConnectionIfNecessary() {
  779. if user.LastConnection+60 < uint64(time.Now().Unix()) {
  780. user.UpdateLastConnection()
  781. }
  782. }
  783. func (user *User) HandleError(err error) {
  784. if !errors.Is(err, whatsapp.ErrInvalidWsData) {
  785. user.log.Errorfln("WhatsApp error: %v", err)
  786. }
  787. if closed, ok := err.(*whatsapp.ErrConnectionClosed); ok {
  788. user.bridge.Metrics.TrackDisconnection(user.MXID)
  789. if closed.Code == 1000 && user.cleanDisconnection {
  790. user.cleanDisconnection = false
  791. if !user.bridge.Config.Bridge.AggressiveReconnect {
  792. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  793. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  794. user.log.Infoln("Clean disconnection by server")
  795. return
  796. } else {
  797. user.log.Debugln("Clean disconnection by server, but aggressive reconnection is enabled")
  798. }
  799. }
  800. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection was closed with websocket status code %d", closed.Code))
  801. } else if failed, ok := err.(*whatsapp.ErrConnectionFailed); ok {
  802. disconnectErr := user.Conn.Disconnect()
  803. if disconnectErr != nil {
  804. user.log.Warnln("Failed to disconnect after connection fail:", disconnectErr)
  805. }
  806. user.bridge.Metrics.TrackDisconnection(user.MXID)
  807. user.ConnectionErrors++
  808. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", failed.Err))
  809. } else if err == whatsapp.ErrPingFalse {
  810. disconnectErr := user.Conn.Disconnect()
  811. if disconnectErr != nil {
  812. user.log.Warnln("Failed to disconnect after failed ping:", disconnectErr)
  813. }
  814. user.bridge.Metrics.TrackDisconnection(user.MXID)
  815. user.ConnectionErrors++
  816. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", err))
  817. }
  818. // Otherwise unknown error, probably mostly harmless
  819. }
  820. func (user *User) tryReconnect(msg string) {
  821. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  822. if user.ConnectionErrors > user.bridge.Config.Bridge.MaxConnectionAttempts {
  823. user.sendMarkdownBridgeAlert("%s. Use the `reconnect` command to reconnect.", msg)
  824. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  825. return
  826. }
  827. if user.bridge.Config.Bridge.ReportConnectionRetry {
  828. user.sendBridgeNotice("%s. Reconnecting...", msg)
  829. // Don't want the same error to be repeated
  830. msg = ""
  831. }
  832. var tries uint
  833. var exponentialBackoff bool
  834. baseDelay := time.Duration(user.bridge.Config.Bridge.ConnectionRetryDelay)
  835. if baseDelay < 0 {
  836. exponentialBackoff = true
  837. baseDelay = -baseDelay + 1
  838. }
  839. delay := baseDelay
  840. ctx, cancel := context.WithCancel(context.Background())
  841. defer cancel()
  842. user.cancelReconnect = cancel
  843. for user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  844. select {
  845. case <-ctx.Done():
  846. user.log.Debugln("tryReconnect context cancelled, aborting reconnection attempts")
  847. return
  848. default:
  849. }
  850. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWAConnecting})
  851. err := user.Conn.Restore(true, ctx)
  852. if err == nil {
  853. user.ConnectionErrors = 0
  854. if user.bridge.Config.Bridge.ReportConnectionRetry {
  855. user.sendBridgeNotice("Reconnected successfully")
  856. }
  857. user.PostLogin()
  858. return
  859. } else if errors.Is(err, whatsapp.ErrBadRequest) {
  860. user.log.Warnln("Got init 400 error when trying to reconnect, resetting connection...")
  861. err = user.Conn.Disconnect()
  862. if err != nil {
  863. user.log.Debugln("Error while disconnecting for connection reset:", err)
  864. }
  865. } else if errors.Is(err, whatsapp.ErrUnpaired) {
  866. user.log.Errorln("Got init 401 (unpaired) error when trying to reconnect, not retrying")
  867. user.removeFromJIDMap()
  868. //user.JID = ""
  869. user.SetSession(nil)
  870. user.DeleteConnection()
  871. user.sendMarkdownBridgeAlert("\u26a0 Failed to reconnect to WhatsApp: unpaired from phone. " +
  872. "To re-pair your phone, log in again.")
  873. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotLoggedIn})
  874. return
  875. } else if errors.Is(err, whatsapp.ErrAlreadyLoggedIn) {
  876. user.log.Warnln("Reconnection said we're already logged in, not trying anymore")
  877. return
  878. } else {
  879. user.log.Errorln("Error while trying to reconnect after disconnection:", err)
  880. }
  881. tries++
  882. user.ConnectionErrors++
  883. if user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  884. if exponentialBackoff {
  885. delay = (1 << tries) + baseDelay
  886. }
  887. if user.bridge.Config.Bridge.ReportConnectionRetry {
  888. user.sendBridgeNotice("Reconnection attempt failed: %v. Retrying in %d seconds...", err, delay)
  889. }
  890. time.Sleep(delay * time.Second)
  891. }
  892. }
  893. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  894. if user.bridge.Config.Bridge.ReportConnectionRetry {
  895. user.sendMarkdownBridgeAlert("%d reconnection attempts failed. Use the `reconnect` command to try to reconnect manually.", tries)
  896. } else {
  897. user.sendMarkdownBridgeAlert("\u26a0 %s. Additionally, %d reconnection attempts failed. Use the `reconnect` command to try to reconnect.", msg, tries)
  898. }
  899. }
  900. func (user *User) PortalKey(jid whatsapp.JID) database.PortalKey {
  901. return database.NewPortalKey(jid, user.JID)
  902. }
  903. func (user *User) GetPortalByJID(jid whatsapp.JID) *Portal {
  904. return user.bridge.GetPortalByJID(user.PortalKey(jid))
  905. }
  906. func (user *User) runMessageRingBuffer() {
  907. for msg := range user.messageInput {
  908. select {
  909. case user.messageOutput <- msg:
  910. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  911. default:
  912. dropped := <-user.messageOutput
  913. user.log.Warnln("Buffer is full, dropping message in", dropped.chat)
  914. user.messageOutput <- msg
  915. }
  916. }
  917. }
  918. func (user *User) handleMessageLoop() {
  919. for {
  920. select {
  921. case msg := <-user.messageOutput:
  922. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  923. user.GetPortalByJID(msg.chat).messages <- msg
  924. case <-user.syncStart:
  925. user.log.Debugln("Processing of incoming messages is locked")
  926. user.bridge.Metrics.TrackSyncLock(user.JID, true)
  927. user.syncWait.Wait()
  928. user.bridge.Metrics.TrackSyncLock(user.JID, false)
  929. user.log.Debugln("Processing of incoming messages unlocked")
  930. }
  931. }
  932. }
  933. func (user *User) HandleNewContact(contact whatsapp.Contact) {
  934. user.log.Debugfln("Contact message: %+v", contact)
  935. if strings.HasSuffix(contact.JID, whatsapp.OldUserSuffix) {
  936. contact.JID = strings.Replace(contact.JID, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  937. }
  938. if strings.HasSuffix(contact.JID, whatsapp.NewUserSuffix) {
  939. puppet := user.bridge.GetPuppetByJID(contact.JID)
  940. puppet.UpdateName(user, contact)
  941. } else if strings.HasSuffix(contact.JID, whatsapp.BroadcastSuffix) {
  942. portal := user.GetPortalByJID(contact.JID)
  943. portal.UpdateName(contact.Name, "", nil, true)
  944. }
  945. }
  946. func (user *User) HandleBatteryMessage(battery whatsapp.BatteryMessage) {
  947. user.log.Debugfln("Battery message: %+v", battery)
  948. var notice string
  949. if !battery.Plugged && battery.Percentage < 15 && user.batteryWarningsSent < 1 {
  950. notice = fmt.Sprintf("Phone battery low (%d %% remaining)", battery.Percentage)
  951. user.batteryWarningsSent = 1
  952. } else if !battery.Plugged && battery.Percentage < 5 && user.batteryWarningsSent < 2 {
  953. notice = fmt.Sprintf("Phone battery very low (%d %% remaining)", battery.Percentage)
  954. user.batteryWarningsSent = 2
  955. } else if battery.Percentage > 15 || battery.Plugged {
  956. user.batteryWarningsSent = 0
  957. }
  958. if notice != "" {
  959. go user.sendBridgeNotice("%s", notice)
  960. }
  961. }
  // FakeMessage is a bridge-generated message that does not correspond to a real
  // WhatsApp message. HandleCallInfo uses it to deliver call notices to portals.
  type FakeMessage struct {
  	// Text is the message text; ID is the identifier given to the message.
  	Text, ID string
  	// Alert is set for notices that should alert the user, such as incoming calls.
  	Alert bool
  }
  967. func (user *User) HandleCallInfo(info whatsapp.CallInfo) {
  968. if info.Data != nil {
  969. return
  970. }
  971. data := FakeMessage{
  972. ID: info.ID,
  973. }
  974. switch info.Type {
  975. case whatsapp.CallOffer:
  976. if !user.bridge.Config.Bridge.CallNotices.Start {
  977. return
  978. }
  979. data.Text = "Incoming call"
  980. data.Alert = true
  981. case whatsapp.CallOfferVideo:
  982. if !user.bridge.Config.Bridge.CallNotices.Start {
  983. return
  984. }
  985. data.Text = "Incoming video call"
  986. data.Alert = true
  987. case whatsapp.CallTerminate:
  988. if !user.bridge.Config.Bridge.CallNotices.End {
  989. return
  990. }
  991. data.Text = "Call ended"
  992. data.ID += "E"
  993. default:
  994. return
  995. }
  996. portal := user.GetPortalByJID(info.From)
  997. if portal != nil {
  998. portal.messages <- PortalMessage{info.From, user, data, 0}
  999. }
  1000. }
  1001. func (user *User) HandlePresence(info whatsapp.PresenceEvent) {
  1002. puppet := user.bridge.GetPuppetByJID(info.SenderJID)
  1003. switch info.Status {
  1004. case whatsapp.PresenceUnavailable:
  1005. _ = puppet.DefaultIntent().SetPresence("offline")
  1006. case whatsapp.PresenceAvailable:
  1007. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  1008. portal := user.bridge.GetPortalByMXID(puppet.typingIn)
  1009. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  1010. puppet.typingIn = ""
  1011. puppet.typingAt = 0
  1012. } else {
  1013. _ = puppet.DefaultIntent().SetPresence("online")
  1014. }
  1015. case whatsapp.PresenceComposing:
  1016. portal := user.GetPortalByJID(info.JID)
  1017. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  1018. if puppet.typingIn == portal.MXID {
  1019. return
  1020. }
  1021. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  1022. }
  1023. puppet.typingIn = portal.MXID
  1024. puppet.typingAt = time.Now().Unix()
  1025. _, _ = puppet.IntentFor(portal).UserTyping(portal.MXID, true, 15*1000)
  1026. }
  1027. }
  1028. func (user *User) HandleMsgInfo(info whatsapp.JSONMsgInfo) {
  1029. if (info.Command == whatsapp.MsgInfoCommandAck || info.Command == whatsapp.MsgInfoCommandAcks) && info.Acknowledgement == whatsapp.AckMessageRead {
  1030. portal := user.GetPortalByJID(info.ToJID)
  1031. if len(portal.MXID) == 0 {
  1032. return
  1033. }
  1034. intent := user.bridge.GetPuppetByJID(info.SenderJID).IntentFor(portal)
  1035. for _, msgID := range info.IDs {
  1036. msg := user.bridge.DB.Message.GetByJID(portal.Key, msgID)
  1037. if msg == nil || msg.IsFakeMXID() {
  1038. continue
  1039. }
  1040. err := intent.MarkRead(portal.MXID, msg.MXID)
  1041. if err != nil {
  1042. user.log.Warnln("Failed to mark message %s as read by %s: %v", msg.MXID, info.SenderJID, err)
  1043. }
  1044. }
  1045. }
  1046. }
  1047. func (user *User) HandleReceivedMessage(received whatsapp.ReceivedMessage) {
  1048. if received.Type == "read" {
  1049. go user.markSelfRead(received.Jid, received.Index)
  1050. } else {
  1051. user.log.Debugfln("Unknown received message type: %+v", received)
  1052. }
  1053. }
  1054. func (user *User) HandleReadMessage(read whatsapp.ReadMessage) {
  1055. user.log.Debugfln("Received chat read message: %+v", read)
  1056. go user.markSelfRead(read.Jid, "")
  1057. }
  1058. func (user *User) markSelfRead(jid, messageID string) {
  1059. if strings.HasSuffix(jid, whatsapp.OldUserSuffix) {
  1060. jid = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  1061. }
  1062. puppet := user.bridge.GetPuppetByJID(user.JID)
  1063. if puppet == nil {
  1064. return
  1065. }
  1066. intent := puppet.CustomIntent()
  1067. if intent == nil {
  1068. return
  1069. }
  1070. portal := user.GetPortalByJID(jid)
  1071. if portal == nil {
  1072. return
  1073. }
  1074. var message *database.Message
  1075. if messageID == "" {
  1076. message = user.bridge.DB.Message.GetLastInChat(portal.Key)
  1077. if message == nil {
  1078. return
  1079. }
  1080. user.log.Debugfln("User read chat %s/%s in WhatsApp mobile (last known event: %s/%s)", portal.Key.JID, portal.MXID, message.JID, message.MXID)
  1081. } else {
  1082. message = user.bridge.DB.Message.GetByJID(portal.Key, messageID)
  1083. if message == nil || message.IsFakeMXID() {
  1084. return
  1085. }
  1086. user.log.Debugfln("User read message %s/%s in %s/%s in WhatsApp mobile", message.JID, message.MXID, portal.Key.JID, portal.MXID)
  1087. }
  1088. err := intent.MarkRead(portal.MXID, message.MXID)
  1089. if err != nil {
  1090. user.log.Warnfln("Failed to bridge own read receipt in %s: %v", jid, err)
  1091. }
  1092. }
  1093. func (user *User) HandleCommand(cmd whatsapp.JSONCommand) {
  1094. switch cmd.Type {
  1095. case whatsapp.CommandPicture:
  1096. if strings.HasSuffix(cmd.JID, whatsapp.NewUserSuffix) {
  1097. puppet := user.bridge.GetPuppetByJID(cmd.JID)
  1098. go puppet.UpdateAvatar(user, cmd.ProfilePicInfo)
  1099. } else if user.bridge.Config.Bridge.ChatMetaSync {
  1100. portal := user.GetPortalByJID(cmd.JID)
  1101. go portal.UpdateAvatar(user, cmd.ProfilePicInfo, true)
  1102. }
  1103. case whatsapp.CommandDisconnect:
  1104. if cmd.Kind == "replaced" {
  1105. user.cleanDisconnection = true
  1106. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server because you opened another WhatsApp Web client.\n\n" +
  1107. "Use the `reconnect` command to disconnect the other client and resume bridging.")
  1108. } else {
  1109. user.log.Warnln("Unknown kind of disconnect:", string(cmd.Raw))
  1110. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server (reason code: %s).\n\n"+
  1111. "Use the `reconnect` command to reconnect.", cmd.Kind)
  1112. }
  1113. }
  1114. }
  1115. func (user *User) HandleChatUpdate(cmd whatsapp.ChatUpdate) {
  1116. if cmd.Command != whatsapp.ChatUpdateCommandAction {
  1117. return
  1118. }
  1119. portal := user.GetPortalByJID(cmd.JID)
  1120. if len(portal.MXID) == 0 {
  1121. if cmd.Data.Action == whatsapp.ChatActionIntroduce || cmd.Data.Action == whatsapp.ChatActionCreate {
  1122. go func() {
  1123. err := portal.CreateMatrixRoom(user)
  1124. if err != nil {
  1125. user.log.Errorln("Failed to create portal room after receiving join event:", err)
  1126. }
  1127. }()
  1128. }
  1129. return
  1130. }
  1131. // These don't come down the message history :(
  1132. switch cmd.Data.Action {
  1133. case whatsapp.ChatActionAddTopic:
  1134. go portal.UpdateTopic(cmd.Data.AddTopic.Topic, cmd.Data.SenderJID, nil, true)
  1135. case whatsapp.ChatActionRemoveTopic:
  1136. go portal.UpdateTopic("", cmd.Data.SenderJID, nil, true)
  1137. case whatsapp.ChatActionRemove:
  1138. // We ignore leaving groups in the message history to avoid accidentally leaving rejoined groups,
  1139. // but if we get a real-time command that says we left, it should be safe to bridge it.
  1140. if !user.bridge.Config.Bridge.ChatMetaSync {
  1141. for _, jid := range cmd.Data.UserChange.JIDs {
  1142. if jid == user.JID {
  1143. go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1144. break
  1145. }
  1146. }
  1147. }
  1148. }
  1149. if !user.bridge.Config.Bridge.ChatMetaSync {
  1150. // Ignore chat update commands, we're relying on the message history.
  1151. return
  1152. }
  1153. switch cmd.Data.Action {
  1154. case whatsapp.ChatActionNameChange:
  1155. go portal.UpdateName(cmd.Data.NameChange.Name, cmd.Data.SenderJID, nil, true)
  1156. case whatsapp.ChatActionPromote:
  1157. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, true)
  1158. case whatsapp.ChatActionDemote:
  1159. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, false)
  1160. case whatsapp.ChatActionAnnounce:
  1161. go portal.RestrictMessageSending(cmd.Data.Announce)
  1162. case whatsapp.ChatActionRestrict:
  1163. go portal.RestrictMetadataChanges(cmd.Data.Restrict)
  1164. case whatsapp.ChatActionRemove:
  1165. go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1166. case whatsapp.ChatActionAdd:
  1167. go portal.HandleWhatsAppInvite(cmd.Data.SenderJID, nil, cmd.Data.UserChange.JIDs)
  1168. case whatsapp.ChatActionIntroduce:
  1169. if cmd.Data.SenderJID != "unknown" {
  1170. go portal.Sync(user, whatsapp.Contact{JID: portal.Key.JID})
  1171. }
  1172. }
  1173. }
  1174. func (user *User) HandleConnInfo(info whatsapp.ConnInfo) {
  1175. if user.Session != nil && info.Connected && len(info.ClientToken) > 0 {
  1176. user.log.Debugln("Received new tokens")
  1177. user.Session.ClientToken = info.ClientToken
  1178. user.Session.ServerToken = info.ServerToken
  1179. user.Session.Wid = info.WID
  1180. user.Update()
  1181. }
  1182. if len(info.PushName) > 0 {
  1183. user.pushName = info.PushName
  1184. }
  1185. }
  1186. func (user *User) HandleJSONMessage(message json.RawMessage) {
  1187. if !json.Valid(message) {
  1188. return
  1189. }
  1190. user.log.Debugfln("JSON message: %s", message)
  1191. user.updateLastConnectionIfNecessary()
  1192. }
  1193. func (user *User) NeedsRelaybot(portal *Portal) bool {
  1194. return !user.HasSession() || !user.IsInPortal(portal.Key)
  1195. }