user.go 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423
  1. // mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
  2. // Copyright (C) 2020 Tulir Asokan
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "context"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "net/http"
  23. "sort"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/skip2/go-qrcode"
  29. log "maunium.net/go/maulogger/v2"
  30. "maunium.net/go/mautrix/appservice"
  31. "maunium.net/go/mautrix/pushrules"
  32. "github.com/Rhymen/go-whatsapp"
  33. waBinary "github.com/Rhymen/go-whatsapp/binary"
  34. waProto "github.com/Rhymen/go-whatsapp/binary/proto"
  35. "maunium.net/go/mautrix"
  36. "maunium.net/go/mautrix/event"
  37. "maunium.net/go/mautrix/format"
  38. "maunium.net/go/mautrix/id"
  39. "maunium.net/go/mautrix-whatsapp/database"
  40. )
// User is a Matrix user known to the bridge together with the runtime state
// of that user's WhatsApp connection. It embeds the *database.User record.
type User struct {
	*database.User
	// Conn is the WhatsApp connection. Connect creates it and
	// DeleteConnection resets it to nil.
	Conn *whatsapp.Conn
	bridge *Bridge
	log log.Logger
	// Admin, Whitelisted and RelaybotWhitelisted are filled in by NewUser
	// from the bridge permission config.
	Admin bool
	Whitelisted bool
	RelaybotWhitelisted bool
	IsRelaybot bool
	// ConnectionErrors is reset to 0 after a successful login or session restore.
	ConnectionErrors int
	CommunityID string
	cleanDisconnection bool
	batteryWarningsSent int
	// lastReconnection is a Unix timestamp (seconds) written by intPostLogin
	// and checked by HandleStreamEvent.
	lastReconnection int64
	pushName string
	// chatListReceived is signalled by HandleChatList and awaited by
	// intPostLogin; PostLogin replaces it with a fresh channel on every sync.
	chatListReceived chan struct{}
	// syncPortalsDone is awaited by intPostLogin; PostLogin replaces it with
	// a fresh channel on every sync.
	syncPortalsDone chan struct{}
	// messageInput receives the PortalMessages produced by HandleEvent.
	messageInput chan PortalMessage
	// messageOutput is buffered with the configured UserMessageBuffer size.
	messageOutput chan PortalMessage
	// syncStart receives a value from PostLogin whenever a post-login sync starts.
	syncStart chan struct{}
	// syncWait is incremented by PostLogin and released by intPostLogin.
	syncWait sync.WaitGroup
	// syncing is a 0/1 flag accessed atomically; 1 means a post-login sync is running.
	syncing int32
	// mgmtCreateLock serializes automatic management room creation.
	mgmtCreateLock sync.Mutex
	// connLock is held while Conn is created (Connect) or torn down (DeleteConnection).
	connLock sync.Mutex
	cancelReconnect func()
	prevBridgeStatus *BridgeState
}
  68. func (bridge *Bridge) GetUserByMXID(userID id.UserID) *User {
  69. _, isPuppet := bridge.ParsePuppetMXID(userID)
  70. if isPuppet || userID == bridge.Bot.UserID {
  71. return nil
  72. }
  73. bridge.usersLock.Lock()
  74. defer bridge.usersLock.Unlock()
  75. user, ok := bridge.usersByMXID[userID]
  76. if !ok {
  77. return bridge.loadDBUser(bridge.DB.User.GetByMXID(userID), &userID)
  78. }
  79. return user
  80. }
  81. func (bridge *Bridge) GetUserByJID(userID whatsapp.JID) *User {
  82. bridge.usersLock.Lock()
  83. defer bridge.usersLock.Unlock()
  84. user, ok := bridge.usersByJID[userID]
  85. if !ok {
  86. return bridge.loadDBUser(bridge.DB.User.GetByJID(userID), nil)
  87. }
  88. return user
  89. }
  90. func (user *User) addToJIDMap() {
  91. user.bridge.usersLock.Lock()
  92. user.bridge.usersByJID[user.JID] = user
  93. user.bridge.usersLock.Unlock()
  94. }
  95. func (user *User) removeFromJIDMap(state BridgeStateEvent) {
  96. user.bridge.usersLock.Lock()
  97. jidUser, ok := user.bridge.usersByJID[user.JID]
  98. if ok && user == jidUser {
  99. delete(user.bridge.usersByJID, user.JID)
  100. }
  101. user.bridge.usersLock.Unlock()
  102. user.bridge.Metrics.TrackLoginState(user.JID, false)
  103. user.sendBridgeState(BridgeState{StateEvent: state, Error: WANotLoggedIn})
  104. }
  105. func (bridge *Bridge) GetAllUsers() []*User {
  106. bridge.usersLock.Lock()
  107. defer bridge.usersLock.Unlock()
  108. dbUsers := bridge.DB.User.GetAll()
  109. output := make([]*User, len(dbUsers))
  110. for index, dbUser := range dbUsers {
  111. user, ok := bridge.usersByMXID[dbUser.MXID]
  112. if !ok {
  113. user = bridge.loadDBUser(dbUser, nil)
  114. }
  115. output[index] = user
  116. }
  117. return output
  118. }
  119. func (bridge *Bridge) loadDBUser(dbUser *database.User, mxid *id.UserID) *User {
  120. if dbUser == nil {
  121. if mxid == nil {
  122. return nil
  123. }
  124. dbUser = bridge.DB.User.New()
  125. dbUser.MXID = *mxid
  126. dbUser.Insert()
  127. }
  128. user := bridge.NewUser(dbUser)
  129. bridge.usersByMXID[user.MXID] = user
  130. if len(user.JID) > 0 {
  131. bridge.usersByJID[user.JID] = user
  132. }
  133. if len(user.ManagementRoom) > 0 {
  134. bridge.managementRooms[user.ManagementRoom] = user
  135. }
  136. return user
  137. }
  138. func (user *User) GetPortals() []*Portal {
  139. keys := user.User.GetPortalKeys()
  140. portals := make([]*Portal, len(keys))
  141. user.bridge.portalsLock.Lock()
  142. for i, key := range keys {
  143. portal, ok := user.bridge.portalsByJID[key]
  144. if !ok {
  145. portal = user.bridge.loadDBPortal(user.bridge.DB.Portal.GetByJID(key), &key)
  146. }
  147. portals[i] = portal
  148. }
  149. user.bridge.portalsLock.Unlock()
  150. return portals
  151. }
  152. func (bridge *Bridge) NewUser(dbUser *database.User) *User {
  153. user := &User{
  154. User: dbUser,
  155. bridge: bridge,
  156. log: bridge.Log.Sub("User").Sub(string(dbUser.MXID)),
  157. IsRelaybot: false,
  158. chatListReceived: make(chan struct{}, 1),
  159. syncPortalsDone: make(chan struct{}, 1),
  160. syncStart: make(chan struct{}, 1),
  161. messageInput: make(chan PortalMessage),
  162. messageOutput: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer),
  163. }
  164. user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID)
  165. user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
  166. user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
  167. go user.handleMessageLoop()
  168. go user.runMessageRingBuffer()
  169. return user
  170. }
  171. func (user *User) GetManagementRoom() id.RoomID {
  172. if len(user.ManagementRoom) == 0 {
  173. user.mgmtCreateLock.Lock()
  174. defer user.mgmtCreateLock.Unlock()
  175. if len(user.ManagementRoom) > 0 {
  176. return user.ManagementRoom
  177. }
  178. resp, err := user.bridge.Bot.CreateRoom(&mautrix.ReqCreateRoom{
  179. Topic: "WhatsApp bridge notices",
  180. IsDirect: true,
  181. })
  182. if err != nil {
  183. user.log.Errorln("Failed to auto-create management room:", err)
  184. } else {
  185. user.SetManagementRoom(resp.RoomID)
  186. }
  187. }
  188. return user.ManagementRoom
  189. }
  190. func (user *User) SetManagementRoom(roomID id.RoomID) {
  191. existingUser, ok := user.bridge.managementRooms[roomID]
  192. if ok {
  193. existingUser.ManagementRoom = ""
  194. existingUser.Update()
  195. }
  196. user.ManagementRoom = roomID
  197. user.bridge.managementRooms[user.ManagementRoom] = user
  198. user.Update()
  199. }
  200. func (user *User) SetSession(session *whatsapp.Session) {
  201. if session == nil {
  202. user.Session = nil
  203. user.LastConnection = 0
  204. } else if len(session.Wid) > 0 {
  205. user.Session = session
  206. } else {
  207. return
  208. }
  209. user.Update()
  210. }
  211. func (user *User) Connect(evenIfNoSession bool) bool {
  212. user.connLock.Lock()
  213. if user.Conn != nil {
  214. user.connLock.Unlock()
  215. if user.Conn.IsConnected() {
  216. return true
  217. } else {
  218. return user.RestoreSession()
  219. }
  220. } else if !evenIfNoSession && user.Session == nil {
  221. user.connLock.Unlock()
  222. return false
  223. }
  224. user.log.Debugln("Connecting to WhatsApp")
  225. if user.Session != nil {
  226. user.sendBridgeState(BridgeState{StateEvent: StateConnecting, Error: WAConnecting})
  227. }
  228. timeout := time.Duration(user.bridge.Config.Bridge.ConnectionTimeout)
  229. if timeout == 0 {
  230. timeout = 20
  231. }
  232. user.Conn = whatsapp.NewConn(&whatsapp.Options{
  233. Timeout: timeout * time.Second,
  234. LongClientName: user.bridge.Config.WhatsApp.OSName,
  235. ShortClientName: user.bridge.Config.WhatsApp.BrowserName,
  236. ClientVersion: WAVersion,
  237. Log: user.log.Sub("Conn"),
  238. Handler: []whatsapp.Handler{user},
  239. })
  240. user.setupAdminTestHooks()
  241. user.connLock.Unlock()
  242. return user.RestoreSession()
  243. }
  244. func (user *User) DeleteConnection() {
  245. user.connLock.Lock()
  246. if user.Conn == nil {
  247. user.connLock.Unlock()
  248. return
  249. }
  250. err := user.Conn.Disconnect()
  251. if err != nil && err != whatsapp.ErrNotConnected {
  252. user.log.Warnln("Error disconnecting: %v", err)
  253. }
  254. user.Conn.RemoveHandlers()
  255. user.Conn = nil
  256. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  257. user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
  258. user.connLock.Unlock()
  259. }
  260. func (user *User) RestoreSession() bool {
  261. if user.Session != nil {
  262. user.Conn.SetSession(*user.Session)
  263. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
  264. defer cancel()
  265. err := user.Conn.Restore(true, ctx)
  266. if err == whatsapp.ErrAlreadyLoggedIn {
  267. return true
  268. } else if err != nil {
  269. user.log.Errorln("Failed to restore session:", err)
  270. if errors.Is(err, whatsapp.ErrUnpaired) {
  271. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
  272. "To re-pair your phone, log in again.")
  273. user.removeFromJIDMap(StateBadCredentials)
  274. //user.JID = ""
  275. user.SetSession(nil)
  276. user.DeleteConnection()
  277. return false
  278. } else {
  279. user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
  280. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
  281. "on your phone is reachable and use `reconnect` to try connecting again.")
  282. }
  283. user.log.Debugln("Disconnecting due to failed session restore...")
  284. err = user.Conn.Disconnect()
  285. if err != nil {
  286. user.log.Errorln("Failed to disconnect after failed session restore:", err)
  287. }
  288. return false
  289. }
  290. user.ConnectionErrors = 0
  291. user.log.Debugln("Session restored successfully")
  292. user.PostLogin()
  293. }
  294. return true
  295. }
// HasSession reports whether a WhatsApp session is stored for the user.
func (user *User) HasSession() bool {
	return user.Session != nil
}
  299. func (user *User) IsConnected() bool {
  300. return user.Conn != nil && user.Conn.IsConnected() && user.Conn.IsLoggedIn()
  301. }
  302. func (user *User) IsLoginInProgress() bool {
  303. return user.Conn != nil && user.Conn.IsLoginInProgress()
  304. }
  305. func (user *User) loginQrChannel(ce *CommandEvent, qrChan <-chan string, eventIDChan chan<- id.EventID) {
  306. var qrEventID id.EventID
  307. for code := range qrChan {
  308. if code == "stop" {
  309. return
  310. }
  311. qrCode, err := qrcode.Encode(code, qrcode.Low, 256)
  312. if err != nil {
  313. user.log.Errorln("Failed to encode QR code:", err)
  314. ce.Reply("Failed to encode QR code: %v", err)
  315. return
  316. }
  317. bot := user.bridge.AS.BotClient()
  318. resp, err := bot.UploadBytes(qrCode, "image/png")
  319. if err != nil {
  320. user.log.Errorln("Failed to upload QR code:", err)
  321. ce.Reply("Failed to upload QR code: %v", err)
  322. return
  323. }
  324. if qrEventID == "" {
  325. sendResp, err := bot.SendImage(ce.RoomID, code, resp.ContentURI)
  326. if err != nil {
  327. user.log.Errorln("Failed to send QR code to user:", err)
  328. return
  329. }
  330. qrEventID = sendResp.EventID
  331. eventIDChan <- qrEventID
  332. } else {
  333. _, err = bot.SendMessageEvent(ce.RoomID, event.EventMessage, &event.MessageEventContent{
  334. MsgType: event.MsgImage,
  335. Body: code,
  336. URL: resp.ContentURI.CUString(),
  337. NewContent: &event.MessageEventContent{
  338. MsgType: event.MsgImage,
  339. Body: code,
  340. URL: resp.ContentURI.CUString(),
  341. },
  342. RelatesTo: &event.RelatesTo{
  343. Type: event.RelReplace,
  344. EventID: qrEventID,
  345. },
  346. })
  347. if err != nil {
  348. user.log.Errorln("Failed to send edited QR code to user:", err)
  349. }
  350. }
  351. }
  352. }
  353. func (user *User) Login(ce *CommandEvent) {
  354. qrChan := make(chan string, 3)
  355. eventIDChan := make(chan id.EventID, 1)
  356. go user.loginQrChannel(ce, qrChan, eventIDChan)
  357. session, jid, err := user.Conn.Login(qrChan, nil)
  358. qrChan <- "stop"
  359. if err != nil {
  360. var eventID id.EventID
  361. select {
  362. case eventID = <-eventIDChan:
  363. default:
  364. }
  365. reply := event.MessageEventContent{
  366. MsgType: event.MsgText,
  367. }
  368. if err == whatsapp.ErrAlreadyLoggedIn {
  369. reply.Body = "You're already logged in"
  370. } else if err == whatsapp.ErrLoginInProgress {
  371. reply.Body = "You have a login in progress already."
  372. } else if err == whatsapp.ErrLoginTimedOut {
  373. reply.Body = "QR code scan timed out. Please try again."
  374. } else if errors.Is(err, whatsapp.ErrMultiDeviceNotSupported) {
  375. reply.Body = "WhatsApp multi-device is not currently supported. Please disable it and try again."
  376. } else {
  377. user.log.Warnln("Failed to log in:", err)
  378. reply.Body = fmt.Sprintf("Unknown error while logging in: %v", err)
  379. }
  380. msg := reply
  381. if eventID != "" {
  382. msg.NewContent = &reply
  383. msg.RelatesTo = &event.RelatesTo{
  384. Type: event.RelReplace,
  385. EventID: eventID,
  386. }
  387. }
  388. _, _ = ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &msg)
  389. return
  390. }
  391. // TODO there's a bit of duplication between this and the provisioning API login method
  392. // Also between the two logout methods (commands.go and provisioning.go)
  393. user.log.Debugln("Successful login as", jid, "via command")
  394. user.ConnectionErrors = 0
  395. user.JID = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, 1)
  396. user.addToJIDMap()
  397. user.SetSession(&session)
  398. ce.Reply("Successfully logged in, synchronizing chats...")
  399. user.PostLogin()
  400. }
// Chat bundles a WhatsApp chat with the bridge portal and the WhatsApp
// contact that belong to it.
type Chat struct {
	whatsapp.Chat
	Portal *Portal
	Contact whatsapp.Contact
}
  406. type ChatList []Chat
  407. func (cl ChatList) Len() int {
  408. return len(cl)
  409. }
  410. func (cl ChatList) Less(i, j int) bool {
  411. return cl[i].LastMessageTime > cl[j].LastMessageTime
  412. }
  413. func (cl ChatList) Swap(i, j int) {
  414. cl[i], cl[j] = cl[j], cl[i]
  415. }
  416. func (user *User) PostLogin() {
  417. user.sendBridgeState(BridgeState{StateEvent: StateBackfilling})
  418. user.bridge.Metrics.TrackConnectionState(user.JID, true)
  419. user.bridge.Metrics.TrackLoginState(user.JID, true)
  420. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  421. if !atomic.CompareAndSwapInt32(&user.syncing, 0, 1) {
  422. // TODO we should cleanly stop the old sync and start a new one instead of not starting a new one
  423. user.log.Warnln("There seems to be a post-sync already in progress, not starting a new one")
  424. return
  425. }
  426. user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
  427. user.chatListReceived = make(chan struct{}, 1)
  428. user.syncPortalsDone = make(chan struct{}, 1)
  429. user.syncWait.Add(1)
  430. user.syncStart <- struct{}{}
  431. go user.intPostLogin()
  432. }
  433. func (user *User) tryAutomaticDoublePuppeting() {
  434. if !user.bridge.Config.CanDoublePuppet(user.MXID) {
  435. return
  436. }
  437. user.log.Debugln("Checking if double puppeting needs to be enabled")
  438. puppet := user.bridge.GetPuppetByJID(user.JID)
  439. if len(puppet.CustomMXID) > 0 {
  440. user.log.Debugln("User already has double-puppeting enabled")
  441. // Custom puppet already enabled
  442. return
  443. }
  444. accessToken, err := puppet.loginWithSharedSecret(user.MXID)
  445. if err != nil {
  446. user.log.Warnln("Failed to login with shared secret:", err)
  447. return
  448. }
  449. err = puppet.SwitchCustomMXID(accessToken, user.MXID)
  450. if err != nil {
  451. puppet.log.Warnln("Failed to switch to auto-logined custom puppet:", err)
  452. return
  453. }
  454. user.log.Infoln("Successfully automatically enabled custom puppet")
  455. }
  456. func (user *User) sendBridgeNotice(formatString string, args ...interface{}) {
  457. notice := fmt.Sprintf(formatString, args...)
  458. _, err := user.bridge.Bot.SendNotice(user.GetManagementRoom(), notice)
  459. if err != nil {
  460. user.log.Warnf("Failed to send bridge notice \"%s\": %v", notice, err)
  461. }
  462. }
  463. func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface{}) {
  464. notice := fmt.Sprintf(formatString, args...)
  465. content := format.RenderMarkdown(notice, true, false)
  466. _, err := user.bridge.Bot.SendMessageEvent(user.GetManagementRoom(), event.EventMessage, content)
  467. if err != nil {
  468. user.log.Warnf("Failed to send bridge alert \"%s\": %v", notice, err)
  469. }
  470. }
  471. func (user *User) postConnPing() bool {
  472. user.log.Debugln("Making post-connection ping")
  473. var err error
  474. for i := 0; ; i++ {
  475. err = user.Conn.AdminTest()
  476. if err == nil {
  477. user.log.Debugln("Post-connection ping OK")
  478. return true
  479. } else if errors.Is(err, whatsapp.ErrConnectionTimeout) && i < 5 {
  480. user.log.Warnfln("Post-connection ping timed out, sending new one")
  481. } else {
  482. break
  483. }
  484. }
  485. user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err)
  486. disconnectErr := user.Conn.Disconnect()
  487. if disconnectErr != nil {
  488. user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
  489. }
  490. user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
  491. user.bridge.Metrics.TrackDisconnection(user.MXID)
  492. go func() {
  493. time.Sleep(1 * time.Second)
  494. user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err))
  495. }()
  496. return false
  497. }
  498. func (user *User) intPostLogin() {
  499. defer atomic.StoreInt32(&user.syncing, 0)
  500. defer user.syncWait.Done()
  501. user.lastReconnection = time.Now().Unix()
  502. user.createCommunity()
  503. user.tryAutomaticDoublePuppeting()
  504. user.log.Debugln("Waiting for chat list receive confirmation")
  505. select {
  506. case <-user.chatListReceived:
  507. user.log.Debugln("Chat list receive confirmation received in PostLogin")
  508. case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
  509. user.log.Warnln("Timed out waiting for chat list to arrive!")
  510. user.postConnPing()
  511. return
  512. }
  513. if !user.postConnPing() {
  514. user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
  515. return
  516. }
  517. user.log.Debugln("Waiting for portal sync complete confirmation")
  518. select {
  519. case <-user.syncPortalsDone:
  520. user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.")
  521. // TODO this is too short, maybe a per-portal duration?
  522. case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
  523. user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
  524. }
  525. user.sendBridgeState(BridgeState{StateEvent: StateConnected})
  526. }
// NormalMessage is implemented by events that can report their
// whatsapp.MessageInfo. HandleEvent uses that info to queue them as portal
// messages.
type NormalMessage interface {
	GetInfo() whatsapp.MessageInfo
}
// HandleEvent dispatches an event by its dynamic type. The user registers
// itself as the connection's handler in Connect.
//
// Messages and revocations are pushed onto messageInput. Most other event
// types are passed to a dedicated Handle* method, some of them in a new
// goroutine. Unrecognized types are logged at debug level.
//
// Note that the parameter name shadows the imported event package inside
// this function.
func (user *User) HandleEvent(event interface{}) {
	// Case order matters: NormalMessage is an interface and is matched first.
	switch v := event.(type) {
	case NormalMessage:
		info := v.GetInfo()
		user.messageInput <- PortalMessage{info.RemoteJid, user, v, info.Timestamp}
	case whatsapp.MessageRevocation:
		// Revocations are queued with a zero timestamp.
		user.messageInput <- PortalMessage{v.RemoteJid, user, v, 0}
	case whatsapp.StreamEvent:
		user.HandleStreamEvent(v)
	case []whatsapp.Chat:
		user.HandleChatList(v)
	case []whatsapp.Contact:
		user.HandleContactList(v)
	case error:
		user.HandleError(v)
	case whatsapp.Contact:
		go user.HandleNewContact(v)
	case whatsapp.BatteryMessage:
		user.HandleBatteryMessage(v)
	case whatsapp.CallInfo:
		user.HandleCallInfo(v)
	case whatsapp.PresenceEvent:
		go user.HandlePresence(v)
	case whatsapp.JSONMsgInfo:
		go user.HandleMsgInfo(v)
	case whatsapp.ReceivedMessage:
		user.HandleReceivedMessage(v)
	case whatsapp.ReadMessage:
		user.HandleReadMessage(v)
	case whatsapp.JSONCommand:
		user.HandleCommand(v)
	case whatsapp.ChatUpdate:
		user.HandleChatUpdate(v)
	case whatsapp.ConnInfo:
		user.HandleConnInfo(v)
	// Mute, archive and pin changes are applied to the chat's portal, if
	// there is one. A nil intent makes the update helpers look up the user's
	// double puppet themselves.
	case whatsapp.MuteMessage:
		portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
		if portal != nil {
			go user.updateChatMute(nil, portal, v.MutedUntil)
		}
	case whatsapp.ArchiveMessage:
		portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
		if portal != nil {
			go user.updateChatTag(nil, portal, user.bridge.Config.Bridge.ArchiveTag, v.IsArchived)
		}
	case whatsapp.PinMessage:
		portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
		if portal != nil {
			go user.updateChatTag(nil, portal, user.bridge.Config.Bridge.PinnedTag, v.IsPinned)
		}
	case whatsapp.RawJSONMessage:
		user.HandleJSONMessage(v)
	case *waProto.WebMessageInfo:
		user.updateLastConnectionIfNecessary()
		// TODO trace log
		//user.log.Debugfln("WebMessageInfo: %+v", v)
	case *waBinary.Node:
		user.log.Debugfln("Unknown binary message: %+v", v)
	default:
		user.log.Debugfln("Unknown type of event in HandleEvent: %T", v)
	}
}
  592. func (user *User) HandleStreamEvent(evt whatsapp.StreamEvent) {
  593. if evt.Type == whatsapp.StreamSleep {
  594. if user.lastReconnection+60 > time.Now().Unix() {
  595. user.lastReconnection = 0
  596. user.log.Infoln("Stream went to sleep soon after reconnection, making new post-connection ping in 20 seconds")
  597. go func() {
  598. time.Sleep(20 * time.Second)
  599. // TODO if this happens during the post-login sync, it can get stuck forever
  600. // TODO check if the above is still true
  601. user.postConnPing()
  602. }()
  603. }
  604. } else {
  605. user.log.Infofln("Stream event: %+v", evt)
  606. }
  607. }
  608. func (user *User) HandleChatList(chats []whatsapp.Chat) {
  609. user.log.Infoln("Chat list received")
  610. chatMap := make(map[string]whatsapp.Chat)
  611. user.Conn.Store.ChatsLock.RLock()
  612. for _, chat := range user.Conn.Store.Chats {
  613. chatMap[chat.JID] = chat
  614. }
  615. user.Conn.Store.ChatsLock.RUnlock()
  616. for _, chat := range chats {
  617. chatMap[chat.JID] = chat
  618. }
  619. select {
  620. case user.chatListReceived <- struct{}{}:
  621. user.log.Debugln("Sent chat list receive confirmation from HandleChatList")
  622. default:
  623. user.log.Debugln("Failed to send chat list receive confirmation from HandleChatList, channel probably full")
  624. }
  625. go user.syncPortals(chatMap, false)
  626. }
  627. func (user *User) updateChatMute(intent *appservice.IntentAPI, portal *Portal, mutedUntil int64) {
  628. if len(portal.MXID) == 0 || !user.bridge.Config.Bridge.MuteBridging {
  629. return
  630. } else if intent == nil {
  631. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  632. if doublePuppet == nil || doublePuppet.CustomIntent() == nil {
  633. return
  634. }
  635. intent = doublePuppet.CustomIntent()
  636. }
  637. var err error
  638. if mutedUntil != -1 && mutedUntil < time.Now().Unix() {
  639. user.log.Debugfln("Portal %s is muted until %d, unmuting...", portal.MXID, mutedUntil)
  640. err = intent.DeletePushRule("global", pushrules.RoomRule, string(portal.MXID))
  641. } else {
  642. user.log.Debugfln("Portal %s is muted until %d, muting...", portal.MXID, mutedUntil)
  643. err = intent.PutPushRule("global", pushrules.RoomRule, string(portal.MXID), &mautrix.ReqPutPushRule{
  644. Actions: []pushrules.PushActionType{pushrules.ActionDontNotify},
  645. })
  646. }
  647. if err != nil && !errors.Is(err, mautrix.MNotFound) {
  648. user.log.Warnfln("Failed to update push rule for %s through double puppet: %v", portal.MXID, err)
  649. }
  650. }
// CustomTagData is the per-tag content used for room tags: the tag's order
// plus a bridge-specific marker field.
type CustomTagData struct {
	Order json.Number `json:"order"`
	// DoublePuppet is set to true on tags added by updateChatTag, which only
	// removes tags that carry this marker.
	DoublePuppet bool `json:"net.maunium.whatsapp.puppet"`
}
// CustomTagEventContent holds a room's tags, keyed by tag name, with
// CustomTagData as the per-tag content.
type CustomTagEventContent struct {
	Tags map[string]CustomTagData `json:"tags"`
}
  658. func (user *User) updateChatTag(intent *appservice.IntentAPI, portal *Portal, tag string, active bool) {
  659. if len(portal.MXID) == 0 || len(tag) == 0 {
  660. return
  661. } else if intent == nil {
  662. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  663. if doublePuppet == nil || doublePuppet.CustomIntent() == nil {
  664. return
  665. }
  666. intent = doublePuppet.CustomIntent()
  667. }
  668. var existingTags CustomTagEventContent
  669. err := intent.GetTagsWithCustomData(portal.MXID, &existingTags)
  670. if err != nil && !errors.Is(err, mautrix.MNotFound) {
  671. user.log.Warnfln("Failed to get tags of %s: %v", portal.MXID, err)
  672. }
  673. currentTag, ok := existingTags.Tags[tag]
  674. if active && !ok {
  675. user.log.Debugln("Adding tag", tag, "to", portal.MXID)
  676. data := CustomTagData{"0.5", true}
  677. err = intent.AddTagWithCustomData(portal.MXID, tag, &data)
  678. } else if !active && ok && currentTag.DoublePuppet {
  679. user.log.Debugln("Removing tag", tag, "from", portal.MXID)
  680. err = intent.RemoveTag(portal.MXID, tag)
  681. } else {
  682. err = nil
  683. }
  684. if err != nil {
  685. user.log.Warnfln("Failed to update tag %s for %s through double puppet: %v", tag, portal.MXID, err)
  686. }
  687. }
// CustomReadReceipt is the content passed along with read receipts sent
// through a double puppet. Both fields are omitted from the JSON when they
// hold their zero value.
type CustomReadReceipt struct {
	Timestamp int64 `json:"ts,omitempty"`
	// DoublePuppet is set to true by syncChatDoublePuppetDetails.
	DoublePuppet bool `json:"net.maunium.whatsapp.puppet,omitempty"`
}
  692. func (user *User) syncChatDoublePuppetDetails(doublePuppet *Puppet, chat Chat, justCreated bool) {
  693. if doublePuppet == nil || doublePuppet.CustomIntent() == nil || len(chat.Portal.MXID) == 0 {
  694. return
  695. }
  696. intent := doublePuppet.CustomIntent()
  697. if chat.UnreadCount == 0 && (justCreated || !user.bridge.Config.Bridge.MarkReadOnlyOnCreate) {
  698. lastMessage := user.bridge.DB.Message.GetLastInChatBefore(chat.Portal.Key, chat.ReceivedAt.Unix())
  699. if lastMessage != nil {
  700. err := intent.MarkReadWithContent(chat.Portal.MXID, lastMessage.MXID, &CustomReadReceipt{DoublePuppet: true})
  701. if err != nil {
  702. user.log.Warnfln("Failed to mark %s in %s as read after backfill: %v", lastMessage.MXID, chat.Portal.MXID, err)
  703. }
  704. }
  705. } else if chat.UnreadCount == -1 {
  706. user.log.Debugfln("Invalid unread count (missing field?) in chat info %+v", chat.Source)
  707. }
  708. if justCreated || !user.bridge.Config.Bridge.TagOnlyOnCreate {
  709. user.updateChatMute(intent, chat.Portal, chat.MutedUntil)
  710. user.updateChatTag(intent, chat.Portal, user.bridge.Config.Bridge.ArchiveTag, chat.IsArchived)
  711. user.updateChatTag(intent, chat.Portal, user.bridge.Config.Bridge.PinnedTag, chat.IsPinned)
  712. }
  713. }
  714. func (user *User) syncPortal(chat Chat) {
  715. // Don't sync unless chat meta sync is enabled or portal doesn't exist
  716. if user.bridge.Config.Bridge.ChatMetaSync || len(chat.Portal.MXID) == 0 {
  717. failedToCreate := chat.Portal.Sync(user, chat.Contact)
  718. if failedToCreate {
  719. return
  720. }
  721. }
  722. err := chat.Portal.BackfillHistory(user, chat.LastMessageTime)
  723. if err != nil {
  724. chat.Portal.log.Errorln("Error backfilling history:", err)
  725. }
  726. }
  727. func (user *User) collectChatList(chatMap map[string]whatsapp.Chat) ChatList {
  728. if chatMap == nil {
  729. chatMap = user.Conn.Store.Chats
  730. }
  731. user.log.Infoln("Reading chat list")
  732. chats := make(ChatList, 0, len(chatMap))
  733. existingKeys := user.GetInCommunityMap()
  734. portalKeys := make([]database.PortalKeyWithMeta, 0, len(chatMap))
  735. for _, chat := range chatMap {
  736. portal := user.GetPortalByJID(chat.JID)
  737. user.Conn.Store.ContactsLock.RLock()
  738. contact, _ := user.Conn.Store.Contacts[chat.JID]
  739. user.Conn.Store.ContactsLock.RUnlock()
  740. chats = append(chats, Chat{
  741. Chat: chat,
  742. Portal: portal,
  743. Contact: contact,
  744. })
  745. var inCommunity, ok bool
  746. if inCommunity, ok = existingKeys[portal.Key]; !ok || !inCommunity {
  747. inCommunity = user.addPortalToCommunity(portal)
  748. if portal.IsPrivateChat() {
  749. puppet := user.bridge.GetPuppetByJID(portal.Key.JID)
  750. user.addPuppetToCommunity(puppet)
  751. }
  752. }
  753. portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: portal.Key, InCommunity: inCommunity})
  754. }
  755. user.log.Infoln("Read chat list, updating user-portal mapping")
  756. err := user.SetPortalKeys(portalKeys)
  757. if err != nil {
  758. user.log.Warnln("Failed to update user-portal mapping:", err)
  759. }
  760. sort.Sort(chats)
  761. return chats
  762. }
  763. func (user *User) syncPortals(chatMap map[string]whatsapp.Chat, createAll bool) {
  764. // TODO use contexts instead of checking if user.Conn is the same?
  765. connAtStart := user.Conn
  766. chats := user.collectChatList(chatMap)
  767. limit := user.bridge.Config.Bridge.InitialChatSync
  768. if limit < 0 {
  769. limit = len(chats)
  770. }
  771. if user.Conn != connAtStart {
  772. user.log.Debugln("Connection seems to have changed before sync, cancelling")
  773. return
  774. }
  775. now := time.Now().Unix()
  776. user.log.Infoln("Syncing portals")
  777. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  778. for i, chat := range chats {
  779. if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
  780. break
  781. }
  782. create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
  783. if len(chat.Portal.MXID) > 0 || create || createAll {
  784. user.log.Debugfln("Syncing chat %+v", chat.Chat.Source)
  785. justCreated := len(chat.Portal.MXID) == 0
  786. user.syncPortal(chat)
  787. user.syncChatDoublePuppetDetails(doublePuppet, chat, justCreated)
  788. }
  789. }
  790. if user.Conn != connAtStart {
  791. user.log.Debugln("Connection seems to have changed during sync, cancelling")
  792. return
  793. }
  794. user.UpdateDirectChats(nil)
  795. user.log.Infoln("Finished syncing portals")
  796. select {
  797. case user.syncPortalsDone <- struct{}{}:
  798. default:
  799. }
  800. }
  801. func (user *User) getDirectChats() map[id.UserID][]id.RoomID {
  802. res := make(map[id.UserID][]id.RoomID)
  803. privateChats := user.bridge.DB.Portal.FindPrivateChats(user.JID)
  804. for _, portal := range privateChats {
  805. if len(portal.MXID) > 0 {
  806. res[user.bridge.FormatPuppetMXID(portal.Key.JID)] = []id.RoomID{portal.MXID}
  807. }
  808. }
  809. return res
  810. }
  811. func (user *User) UpdateDirectChats(chats map[id.UserID][]id.RoomID) {
  812. if !user.bridge.Config.Bridge.SyncDirectChatList {
  813. return
  814. }
  815. puppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  816. if puppet == nil || puppet.CustomIntent() == nil {
  817. return
  818. }
  819. intent := puppet.CustomIntent()
  820. method := http.MethodPatch
  821. if chats == nil {
  822. chats = user.getDirectChats()
  823. method = http.MethodPut
  824. }
  825. user.log.Debugln("Updating m.direct list on homeserver")
  826. var err error
  827. if user.bridge.Config.Homeserver.Asmux {
  828. urlPath := intent.BuildBaseURL("_matrix", "client", "unstable", "com.beeper.asmux", "dms")
  829. _, err = intent.MakeFullRequest(mautrix.FullRequest{
  830. Method: method,
  831. URL: urlPath,
  832. Headers: http.Header{"X-Asmux-Auth": {user.bridge.AS.Registration.AppToken}},
  833. RequestJSON: chats,
  834. })
  835. } else {
  836. existingChats := make(map[id.UserID][]id.RoomID)
  837. err = intent.GetAccountData(event.AccountDataDirectChats.Type, &existingChats)
  838. if err != nil {
  839. user.log.Warnln("Failed to get m.direct list to update it:", err)
  840. return
  841. }
  842. for userID, rooms := range existingChats {
  843. if _, ok := user.bridge.ParsePuppetMXID(userID); !ok {
  844. // This is not a ghost user, include it in the new list
  845. chats[userID] = rooms
  846. } else if _, ok := chats[userID]; !ok && method == http.MethodPatch {
  847. // This is a ghost user, but we're not replacing the whole list, so include it too
  848. chats[userID] = rooms
  849. }
  850. }
  851. err = intent.SetAccountData(event.AccountDataDirectChats.Type, &chats)
  852. }
  853. if err != nil {
  854. user.log.Warnln("Failed to update m.direct list:", err)
  855. }
  856. }
  857. func (user *User) HandleContactList(contacts []whatsapp.Contact) {
  858. contactMap := make(map[whatsapp.JID]whatsapp.Contact)
  859. for _, contact := range contacts {
  860. contactMap[contact.JID] = contact
  861. }
  862. go user.syncPuppets(contactMap)
  863. }
  864. func (user *User) syncPuppets(contacts map[whatsapp.JID]whatsapp.Contact) {
  865. if contacts == nil {
  866. contacts = user.Conn.Store.Contacts
  867. }
  868. _, hasSelf := contacts[user.JID]
  869. if !hasSelf {
  870. contacts[user.JID] = whatsapp.Contact{
  871. Name: user.pushName,
  872. Notify: user.pushName,
  873. JID: user.JID,
  874. }
  875. }
  876. user.log.Infoln("Syncing puppet info from contacts")
  877. for jid, contact := range contacts {
  878. if strings.HasSuffix(jid, whatsapp.NewUserSuffix) {
  879. puppet := user.bridge.GetPuppetByJID(contact.JID)
  880. puppet.Sync(user, contact)
  881. } else if strings.HasSuffix(jid, whatsapp.BroadcastSuffix) {
  882. portal := user.GetPortalByJID(contact.JID)
  883. portal.Sync(user, contact)
  884. }
  885. }
  886. user.log.Infoln("Finished syncing puppet info from contacts")
  887. }
  888. func (user *User) updateLastConnectionIfNecessary() {
  889. if user.LastConnection+60 < time.Now().Unix() {
  890. user.UpdateLastConnection()
  891. }
  892. }
  893. func (user *User) HandleError(err error) {
  894. if !errors.Is(err, whatsapp.ErrInvalidWsData) {
  895. user.log.Errorfln("WhatsApp error: %v", err)
  896. }
  897. if closed, ok := err.(*whatsapp.ErrConnectionClosed); ok {
  898. if user.Session == nil {
  899. user.log.Debugln("Websocket disconnected, but no session stored, not trying to reconnect")
  900. return
  901. }
  902. user.bridge.Metrics.TrackDisconnection(user.MXID)
  903. if closed.Code == 1000 && user.cleanDisconnection {
  904. user.cleanDisconnection = false
  905. if !user.bridge.Config.Bridge.AggressiveReconnect {
  906. user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
  907. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  908. user.log.Infoln("Clean disconnection by server")
  909. return
  910. } else {
  911. user.log.Debugln("Clean disconnection by server, but aggressive reconnection is enabled")
  912. }
  913. }
  914. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection was closed with websocket status code %d", closed.Code))
  915. } else if failed, ok := err.(*whatsapp.ErrConnectionFailed); ok {
  916. disconnectErr := user.Conn.Disconnect()
  917. if disconnectErr != nil {
  918. user.log.Warnln("Failed to disconnect after connection fail:", disconnectErr)
  919. }
  920. user.bridge.Metrics.TrackDisconnection(user.MXID)
  921. user.ConnectionErrors++
  922. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", failed.Err))
  923. } else if err == whatsapp.ErrPingFalse || err == whatsapp.ErrWebsocketKeepaliveFailed {
  924. disconnectErr := user.Conn.Disconnect()
  925. if disconnectErr != nil {
  926. user.log.Warnln("Failed to disconnect after failed ping:", disconnectErr)
  927. }
  928. user.bridge.Metrics.TrackDisconnection(user.MXID)
  929. user.ConnectionErrors++
  930. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", err))
  931. }
  932. // Otherwise unknown error, probably mostly harmless
  933. }
  934. func (user *User) tryReconnect(msg string) {
  935. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  936. if user.ConnectionErrors > user.bridge.Config.Bridge.MaxConnectionAttempts {
  937. user.sendMarkdownBridgeAlert("%s. Use the `reconnect` command to reconnect.", msg)
  938. user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
  939. return
  940. }
  941. if user.bridge.Config.Bridge.ReportConnectionRetry {
  942. user.sendBridgeNotice("%s. Reconnecting...", msg)
  943. // Don't want the same error to be repeated
  944. msg = ""
  945. }
  946. var tries uint
  947. var exponentialBackoff bool
  948. baseDelay := time.Duration(user.bridge.Config.Bridge.ConnectionRetryDelay)
  949. if baseDelay < 0 {
  950. exponentialBackoff = true
  951. baseDelay = -baseDelay + 1
  952. }
  953. delay := baseDelay
  954. ctx, cancel := context.WithCancel(context.Background())
  955. defer cancel()
  956. user.cancelReconnect = cancel
  957. for user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  958. select {
  959. case <-ctx.Done():
  960. user.log.Debugln("tryReconnect context cancelled, aborting reconnection attempts")
  961. return
  962. default:
  963. }
  964. user.sendBridgeState(BridgeState{StateEvent: StateConnecting, Error: WAConnecting})
  965. err := user.Conn.Restore(true, ctx)
  966. if err == nil {
  967. user.ConnectionErrors = 0
  968. if user.bridge.Config.Bridge.ReportConnectionRetry {
  969. user.sendBridgeNotice("Reconnected successfully")
  970. }
  971. user.PostLogin()
  972. return
  973. } else if errors.Is(err, whatsapp.ErrBadRequest) {
  974. user.log.Warnln("Got init 400 error when trying to reconnect, resetting connection...")
  975. err = user.Conn.Disconnect()
  976. if err != nil {
  977. user.log.Debugln("Error while disconnecting for connection reset:", err)
  978. }
  979. } else if errors.Is(err, whatsapp.ErrUnpaired) || errors.Is(err, whatsapp.ErrInvalidSession) {
  980. user.log.Errorfln("Got init %s error when trying to reconnect, not retrying", err)
  981. user.removeFromJIDMap(StateBadCredentials)
  982. //user.JID = ""
  983. user.SetSession(nil)
  984. user.DeleteConnection()
  985. errMsg := "unpaired from phone"
  986. if errors.Is(err, whatsapp.ErrInvalidSession) {
  987. errMsg = "invalid session"
  988. }
  989. user.sendMarkdownBridgeAlert("\u26a0 Failed to reconnect to WhatsApp: %s. " +
  990. "To re-pair your phone, log in again.", errMsg)
  991. return
  992. } else if errors.Is(err, whatsapp.ErrAlreadyLoggedIn) {
  993. user.log.Warnln("Reconnection said we're already logged in, not trying anymore")
  994. return
  995. } else {
  996. user.log.Errorln("Error while trying to reconnect after disconnection:", err)
  997. }
  998. tries++
  999. user.ConnectionErrors++
  1000. if user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  1001. if exponentialBackoff {
  1002. delay = (1 << tries) + baseDelay
  1003. }
  1004. if user.bridge.Config.Bridge.ReportConnectionRetry {
  1005. user.sendBridgeNotice("Reconnection attempt failed: %v. Retrying in %d seconds...", err, delay)
  1006. }
  1007. time.Sleep(delay * time.Second)
  1008. }
  1009. }
  1010. user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
  1011. if user.bridge.Config.Bridge.ReportConnectionRetry {
  1012. user.sendMarkdownBridgeAlert("%d reconnection attempts failed. Use the `reconnect` command to try to reconnect manually.", tries)
  1013. } else {
  1014. user.sendMarkdownBridgeAlert("\u26a0 %s. Additionally, %d reconnection attempts failed. Use the `reconnect` command to try to reconnect.", msg, tries)
  1015. }
  1016. }
  1017. func (user *User) PortalKey(jid whatsapp.JID) database.PortalKey {
  1018. return database.NewPortalKey(jid, user.JID)
  1019. }
  1020. func (user *User) GetPortalByJID(jid whatsapp.JID) *Portal {
  1021. return user.bridge.GetPortalByJID(user.PortalKey(jid))
  1022. }
  1023. func (user *User) runMessageRingBuffer() {
  1024. for msg := range user.messageInput {
  1025. select {
  1026. case user.messageOutput <- msg:
  1027. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  1028. default:
  1029. dropped := <-user.messageOutput
  1030. user.log.Warnln("Buffer is full, dropping message in", dropped.chat)
  1031. user.messageOutput <- msg
  1032. }
  1033. }
  1034. }
  1035. func (user *User) handleMessageLoop() {
  1036. for {
  1037. select {
  1038. case msg := <-user.messageOutput:
  1039. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  1040. user.GetPortalByJID(msg.chat).messages <- msg
  1041. case <-user.syncStart:
  1042. user.log.Debugln("Processing of incoming messages is locked")
  1043. user.bridge.Metrics.TrackSyncLock(user.JID, true)
  1044. user.syncWait.Wait()
  1045. user.bridge.Metrics.TrackSyncLock(user.JID, false)
  1046. user.log.Debugln("Processing of incoming messages unlocked")
  1047. }
  1048. }
  1049. }
  1050. func (user *User) HandleNewContact(contact whatsapp.Contact) {
  1051. user.log.Debugfln("Contact message: %+v", contact)
  1052. if strings.HasSuffix(contact.JID, whatsapp.OldUserSuffix) {
  1053. contact.JID = strings.Replace(contact.JID, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  1054. }
  1055. if strings.HasSuffix(contact.JID, whatsapp.NewUserSuffix) {
  1056. puppet := user.bridge.GetPuppetByJID(contact.JID)
  1057. puppet.UpdateName(user, contact)
  1058. } else if strings.HasSuffix(contact.JID, whatsapp.BroadcastSuffix) {
  1059. portal := user.GetPortalByJID(contact.JID)
  1060. portal.UpdateName(contact.Name, "", nil, true)
  1061. }
  1062. }
  1063. func (user *User) HandleBatteryMessage(battery whatsapp.BatteryMessage) {
  1064. user.log.Debugfln("Battery message: %+v", battery)
  1065. var notice string
  1066. if !battery.Plugged && battery.Percentage < 15 && user.batteryWarningsSent < 1 {
  1067. notice = fmt.Sprintf("Phone battery low (%d %% remaining)", battery.Percentage)
  1068. user.batteryWarningsSent = 1
  1069. } else if !battery.Plugged && battery.Percentage < 5 && user.batteryWarningsSent < 2 {
  1070. notice = fmt.Sprintf("Phone battery very low (%d %% remaining)", battery.Percentage)
  1071. user.batteryWarningsSent = 2
  1072. } else if battery.Percentage > 15 || battery.Plugged {
  1073. user.batteryWarningsSent = 0
  1074. }
  1075. if notice != "" {
  1076. go user.sendBridgeNotice("%s", notice)
  1077. }
  1078. }
  1079. type FakeMessage struct {
  1080. Text string
  1081. ID string
  1082. Alert bool
  1083. }
  1084. func (user *User) HandleCallInfo(info whatsapp.CallInfo) {
  1085. if info.Data != nil {
  1086. return
  1087. }
  1088. data := FakeMessage{
  1089. ID: info.ID,
  1090. }
  1091. switch info.Type {
  1092. case whatsapp.CallOffer:
  1093. if !user.bridge.Config.Bridge.CallNotices.Start {
  1094. return
  1095. }
  1096. data.Text = "Incoming call"
  1097. data.Alert = true
  1098. case whatsapp.CallOfferVideo:
  1099. if !user.bridge.Config.Bridge.CallNotices.Start {
  1100. return
  1101. }
  1102. data.Text = "Incoming video call"
  1103. data.Alert = true
  1104. case whatsapp.CallTerminate:
  1105. if !user.bridge.Config.Bridge.CallNotices.End {
  1106. return
  1107. }
  1108. data.Text = "Call ended"
  1109. data.ID += "E"
  1110. default:
  1111. return
  1112. }
  1113. portal := user.GetPortalByJID(info.From)
  1114. if portal != nil {
  1115. portal.messages <- PortalMessage{info.From, user, data, 0}
  1116. }
  1117. }
  1118. func (user *User) HandlePresence(info whatsapp.PresenceEvent) {
  1119. puppet := user.bridge.GetPuppetByJID(info.SenderJID)
  1120. switch info.Status {
  1121. case whatsapp.PresenceUnavailable:
  1122. _ = puppet.DefaultIntent().SetPresence("offline")
  1123. case whatsapp.PresenceAvailable:
  1124. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  1125. portal := user.bridge.GetPortalByMXID(puppet.typingIn)
  1126. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  1127. puppet.typingIn = ""
  1128. puppet.typingAt = 0
  1129. } else {
  1130. _ = puppet.DefaultIntent().SetPresence("online")
  1131. }
  1132. case whatsapp.PresenceComposing:
  1133. portal := user.GetPortalByJID(info.JID)
  1134. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  1135. if puppet.typingIn == portal.MXID {
  1136. return
  1137. }
  1138. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  1139. }
  1140. if len(portal.MXID) > 0 {
  1141. puppet.typingIn = portal.MXID
  1142. puppet.typingAt = time.Now().Unix()
  1143. _, _ = puppet.IntentFor(portal).UserTyping(portal.MXID, true, 15*1000)
  1144. }
  1145. }
  1146. }
  1147. func (user *User) HandleMsgInfo(info whatsapp.JSONMsgInfo) {
  1148. if (info.Command == whatsapp.MsgInfoCommandAck || info.Command == whatsapp.MsgInfoCommandAcks) && info.Acknowledgement == whatsapp.AckMessageRead {
  1149. portal := user.GetPortalByJID(info.ToJID)
  1150. if len(portal.MXID) == 0 {
  1151. return
  1152. }
  1153. intent := user.bridge.GetPuppetByJID(info.SenderJID).IntentFor(portal)
  1154. for _, msgID := range info.IDs {
  1155. msg := user.bridge.DB.Message.GetByJID(portal.Key, msgID)
  1156. if msg == nil || msg.IsFakeMXID() {
  1157. continue
  1158. }
  1159. err := intent.MarkReadWithContent(portal.MXID, msg.MXID, &CustomReadReceipt{DoublePuppet: intent.IsCustomPuppet})
  1160. if err != nil {
  1161. user.log.Warnfln("Failed to mark message %s as read by %s: %v", msg.MXID, info.SenderJID, err)
  1162. }
  1163. }
  1164. }
  1165. }
  1166. func (user *User) HandleReceivedMessage(received whatsapp.ReceivedMessage) {
  1167. if received.Type == "read" {
  1168. go user.markSelfRead(received.Jid, received.Index)
  1169. } else {
  1170. user.log.Debugfln("Unknown received message type: %+v", received)
  1171. }
  1172. }
  1173. func (user *User) HandleReadMessage(read whatsapp.ReadMessage) {
  1174. user.log.Debugfln("Received chat read message: %+v", read)
  1175. go user.markSelfRead(read.Jid, "")
  1176. }
  1177. func (user *User) markSelfRead(jid, messageID string) {
  1178. if strings.HasSuffix(jid, whatsapp.OldUserSuffix) {
  1179. jid = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  1180. }
  1181. puppet := user.bridge.GetPuppetByJID(user.JID)
  1182. if puppet == nil {
  1183. return
  1184. }
  1185. intent := puppet.CustomIntent()
  1186. if intent == nil {
  1187. return
  1188. }
  1189. portal := user.GetPortalByJID(jid)
  1190. if portal == nil {
  1191. return
  1192. }
  1193. var message *database.Message
  1194. if messageID == "" {
  1195. message = user.bridge.DB.Message.GetLastInChat(portal.Key)
  1196. if message == nil {
  1197. return
  1198. }
  1199. user.log.Debugfln("User read chat %s/%s in WhatsApp mobile (last known event: %s/%s)", portal.Key.JID, portal.MXID, message.JID, message.MXID)
  1200. } else {
  1201. message = user.bridge.DB.Message.GetByJID(portal.Key, messageID)
  1202. if message == nil || message.IsFakeMXID() {
  1203. return
  1204. }
  1205. user.log.Debugfln("User read message %s/%s in %s/%s in WhatsApp mobile", message.JID, message.MXID, portal.Key.JID, portal.MXID)
  1206. }
  1207. err := intent.MarkReadWithContent(portal.MXID, message.MXID, &CustomReadReceipt{DoublePuppet: true})
  1208. if err != nil {
  1209. user.log.Warnfln("Failed to bridge own read receipt in %s: %v", jid, err)
  1210. }
  1211. }
  1212. func (user *User) HandleCommand(cmd whatsapp.JSONCommand) {
  1213. switch cmd.Type {
  1214. case whatsapp.CommandPicture:
  1215. if strings.HasSuffix(cmd.JID, whatsapp.NewUserSuffix) {
  1216. puppet := user.bridge.GetPuppetByJID(cmd.JID)
  1217. go puppet.UpdateAvatar(user, cmd.ProfilePicInfo)
  1218. } else if user.bridge.Config.Bridge.ChatMetaSync {
  1219. portal := user.GetPortalByJID(cmd.JID)
  1220. go portal.UpdateAvatar(user, cmd.ProfilePicInfo, true)
  1221. }
  1222. case whatsapp.CommandDisconnect:
  1223. if cmd.Kind == "replaced" {
  1224. user.cleanDisconnection = true
  1225. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server because you opened another WhatsApp Web client.\n\n" +
  1226. "Use the `reconnect` command to disconnect the other client and resume bridging.")
  1227. } else {
  1228. user.log.Warnln("Unknown kind of disconnect:", string(cmd.Raw))
  1229. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server (reason code: %s).\n\n"+
  1230. "Use the `reconnect` command to reconnect.", cmd.Kind)
  1231. }
  1232. }
  1233. }
  1234. func (user *User) HandleChatUpdate(cmd whatsapp.ChatUpdate) {
  1235. if cmd.Command != whatsapp.ChatUpdateCommandAction {
  1236. return
  1237. }
  1238. portal := user.GetPortalByJID(cmd.JID)
  1239. if len(portal.MXID) == 0 {
  1240. if cmd.Data.Action == whatsapp.ChatActionIntroduce || cmd.Data.Action == whatsapp.ChatActionCreate {
  1241. go func() {
  1242. err := portal.CreateMatrixRoom(user)
  1243. if err != nil {
  1244. user.log.Errorln("Failed to create portal room after receiving join event:", err)
  1245. }
  1246. }()
  1247. }
  1248. return
  1249. }
  1250. // These don't come down the message history :(
  1251. switch cmd.Data.Action {
  1252. case whatsapp.ChatActionAddTopic:
  1253. go portal.UpdateTopic(cmd.Data.AddTopic.Topic, cmd.Data.SenderJID, nil, true)
  1254. case whatsapp.ChatActionRemoveTopic:
  1255. go portal.UpdateTopic("", cmd.Data.SenderJID, nil, true)
  1256. case whatsapp.ChatActionRemove:
  1257. // We ignore leaving groups in the message history to avoid accidentally leaving rejoined groups,
  1258. // but if we get a real-time command that says we left, it should be safe to bridge it.
  1259. if !user.bridge.Config.Bridge.ChatMetaSync {
  1260. for _, jid := range cmd.Data.UserChange.JIDs {
  1261. if jid == user.JID {
  1262. go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1263. break
  1264. }
  1265. }
  1266. }
  1267. }
  1268. if !user.bridge.Config.Bridge.ChatMetaSync {
  1269. // Ignore chat update commands, we're relying on the message history.
  1270. return
  1271. }
  1272. switch cmd.Data.Action {
  1273. case whatsapp.ChatActionNameChange:
  1274. go portal.UpdateName(cmd.Data.NameChange.Name, cmd.Data.SenderJID, nil, true)
  1275. case whatsapp.ChatActionPromote:
  1276. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, true)
  1277. case whatsapp.ChatActionDemote:
  1278. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, false)
  1279. case whatsapp.ChatActionAnnounce:
  1280. go portal.RestrictMessageSending(cmd.Data.Announce)
  1281. case whatsapp.ChatActionRestrict:
  1282. go portal.RestrictMetadataChanges(cmd.Data.Restrict)
  1283. case whatsapp.ChatActionRemove:
  1284. go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1285. case whatsapp.ChatActionAdd:
  1286. go portal.HandleWhatsAppInvite(user, cmd.Data.SenderJID, nil, cmd.Data.UserChange.JIDs)
  1287. case whatsapp.ChatActionIntroduce:
  1288. if cmd.Data.SenderJID != "unknown" {
  1289. go portal.Sync(user, whatsapp.Contact{JID: portal.Key.JID})
  1290. }
  1291. }
  1292. }
  1293. func (user *User) HandleConnInfo(info whatsapp.ConnInfo) {
  1294. if user.Session != nil && info.Connected && len(info.ClientToken) > 0 {
  1295. user.log.Debugln("Received new tokens")
  1296. user.Session.ClientToken = info.ClientToken
  1297. user.Session.ServerToken = info.ServerToken
  1298. user.Session.Wid = info.WID
  1299. user.Update()
  1300. }
  1301. if len(info.PushName) > 0 {
  1302. user.pushName = info.PushName
  1303. }
  1304. }
  1305. func (user *User) HandleJSONMessage(evt whatsapp.RawJSONMessage) {
  1306. if !json.Valid(evt.RawMessage) {
  1307. return
  1308. }
  1309. user.log.Debugfln("JSON message with tag %s: %s", evt.Tag, evt.RawMessage)
  1310. user.updateLastConnectionIfNecessary()
  1311. }
  1312. func (user *User) NeedsRelaybot(portal *Portal) bool {
  1313. return !user.HasSession() || !user.IsInPortal(portal.Key)
  1314. }