user.go 44 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411
  1. // mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
  2. // Copyright (C) 2020 Tulir Asokan
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "context"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "net/http"
  23. "sort"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/skip2/go-qrcode"
  29. log "maunium.net/go/maulogger/v2"
  30. "maunium.net/go/mautrix/appservice"
  31. "maunium.net/go/mautrix/pushrules"
  32. "github.com/Rhymen/go-whatsapp"
  33. waBinary "github.com/Rhymen/go-whatsapp/binary"
  34. waProto "github.com/Rhymen/go-whatsapp/binary/proto"
  35. "maunium.net/go/mautrix"
  36. "maunium.net/go/mautrix/event"
  37. "maunium.net/go/mautrix/format"
  38. "maunium.net/go/mautrix/id"
  39. "maunium.net/go/mautrix-whatsapp/database"
  40. )
  41. type User struct {
  42. *database.User
  43. Conn *whatsapp.Conn
  44. bridge *Bridge
  45. log log.Logger
  46. Admin bool
  47. Whitelisted bool
  48. RelaybotWhitelisted bool
  49. IsRelaybot bool
  50. ConnectionErrors int
  51. CommunityID string
  52. cleanDisconnection bool
  53. batteryWarningsSent int
  54. lastReconnection int64
  55. pushName string
  56. chatListReceived chan struct{}
  57. syncPortalsDone chan struct{}
  58. messageInput chan PortalMessage
  59. messageOutput chan PortalMessage
  60. syncStart chan struct{}
  61. syncWait sync.WaitGroup
  62. syncing int32
  63. mgmtCreateLock sync.Mutex
  64. connLock sync.Mutex
  65. cancelReconnect func()
  66. prevBridgeStatus *BridgeState
  67. }
  68. func (bridge *Bridge) GetUserByMXID(userID id.UserID) *User {
  69. _, isPuppet := bridge.ParsePuppetMXID(userID)
  70. if isPuppet || userID == bridge.Bot.UserID {
  71. return nil
  72. }
  73. bridge.usersLock.Lock()
  74. defer bridge.usersLock.Unlock()
  75. user, ok := bridge.usersByMXID[userID]
  76. if !ok {
  77. return bridge.loadDBUser(bridge.DB.User.GetByMXID(userID), &userID)
  78. }
  79. return user
  80. }
  81. func (bridge *Bridge) GetUserByJID(userID whatsapp.JID) *User {
  82. bridge.usersLock.Lock()
  83. defer bridge.usersLock.Unlock()
  84. user, ok := bridge.usersByJID[userID]
  85. if !ok {
  86. return bridge.loadDBUser(bridge.DB.User.GetByJID(userID), nil)
  87. }
  88. return user
  89. }
  90. func (user *User) addToJIDMap() {
  91. user.bridge.usersLock.Lock()
  92. user.bridge.usersByJID[user.JID] = user
  93. user.bridge.usersLock.Unlock()
  94. }
  95. func (user *User) removeFromJIDMap(state BridgeStateEvent) {
  96. user.bridge.usersLock.Lock()
  97. jidUser, ok := user.bridge.usersByJID[user.JID]
  98. if ok && user == jidUser {
  99. delete(user.bridge.usersByJID, user.JID)
  100. }
  101. user.bridge.usersLock.Unlock()
  102. user.bridge.Metrics.TrackLoginState(user.JID, false)
  103. user.sendBridgeState(BridgeState{StateEvent: state, Error: WANotLoggedIn})
  104. }
  105. func (bridge *Bridge) GetAllUsers() []*User {
  106. bridge.usersLock.Lock()
  107. defer bridge.usersLock.Unlock()
  108. dbUsers := bridge.DB.User.GetAll()
  109. output := make([]*User, len(dbUsers))
  110. for index, dbUser := range dbUsers {
  111. user, ok := bridge.usersByMXID[dbUser.MXID]
  112. if !ok {
  113. user = bridge.loadDBUser(dbUser, nil)
  114. }
  115. output[index] = user
  116. }
  117. return output
  118. }
  119. func (bridge *Bridge) loadDBUser(dbUser *database.User, mxid *id.UserID) *User {
  120. if dbUser == nil {
  121. if mxid == nil {
  122. return nil
  123. }
  124. dbUser = bridge.DB.User.New()
  125. dbUser.MXID = *mxid
  126. dbUser.Insert()
  127. }
  128. user := bridge.NewUser(dbUser)
  129. bridge.usersByMXID[user.MXID] = user
  130. if len(user.JID) > 0 {
  131. bridge.usersByJID[user.JID] = user
  132. }
  133. if len(user.ManagementRoom) > 0 {
  134. bridge.managementRooms[user.ManagementRoom] = user
  135. }
  136. return user
  137. }
  138. func (user *User) GetPortals() []*Portal {
  139. keys := user.User.GetPortalKeys()
  140. portals := make([]*Portal, len(keys))
  141. user.bridge.portalsLock.Lock()
  142. for i, key := range keys {
  143. portal, ok := user.bridge.portalsByJID[key]
  144. if !ok {
  145. portal = user.bridge.loadDBPortal(user.bridge.DB.Portal.GetByJID(key), &key)
  146. }
  147. portals[i] = portal
  148. }
  149. user.bridge.portalsLock.Unlock()
  150. return portals
  151. }
  152. func (bridge *Bridge) NewUser(dbUser *database.User) *User {
  153. user := &User{
  154. User: dbUser,
  155. bridge: bridge,
  156. log: bridge.Log.Sub("User").Sub(string(dbUser.MXID)),
  157. IsRelaybot: false,
  158. chatListReceived: make(chan struct{}, 1),
  159. syncPortalsDone: make(chan struct{}, 1),
  160. syncStart: make(chan struct{}, 1),
  161. messageInput: make(chan PortalMessage),
  162. messageOutput: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer),
  163. }
  164. user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID)
  165. user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
  166. user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
  167. go user.handleMessageLoop()
  168. go user.runMessageRingBuffer()
  169. return user
  170. }
  171. func (user *User) GetManagementRoom() id.RoomID {
  172. if len(user.ManagementRoom) == 0 {
  173. user.mgmtCreateLock.Lock()
  174. defer user.mgmtCreateLock.Unlock()
  175. if len(user.ManagementRoom) > 0 {
  176. return user.ManagementRoom
  177. }
  178. resp, err := user.bridge.Bot.CreateRoom(&mautrix.ReqCreateRoom{
  179. Topic: "WhatsApp bridge notices",
  180. IsDirect: true,
  181. })
  182. if err != nil {
  183. user.log.Errorln("Failed to auto-create management room:", err)
  184. } else {
  185. user.SetManagementRoom(resp.RoomID)
  186. }
  187. }
  188. return user.ManagementRoom
  189. }
  190. func (user *User) SetManagementRoom(roomID id.RoomID) {
  191. existingUser, ok := user.bridge.managementRooms[roomID]
  192. if ok {
  193. existingUser.ManagementRoom = ""
  194. existingUser.Update()
  195. }
  196. user.ManagementRoom = roomID
  197. user.bridge.managementRooms[user.ManagementRoom] = user
  198. user.Update()
  199. }
  200. func (user *User) SetSession(session *whatsapp.Session) {
  201. if session == nil {
  202. user.Session = nil
  203. user.LastConnection = 0
  204. } else if len(session.Wid) > 0 {
  205. user.Session = session
  206. } else {
  207. return
  208. }
  209. user.Update()
  210. }
  211. func (user *User) Connect(evenIfNoSession bool) bool {
  212. user.connLock.Lock()
  213. if user.Conn != nil {
  214. user.connLock.Unlock()
  215. if user.Conn.IsConnected() {
  216. return true
  217. } else {
  218. return user.RestoreSession()
  219. }
  220. } else if !evenIfNoSession && user.Session == nil {
  221. user.connLock.Unlock()
  222. return false
  223. }
  224. user.log.Debugln("Connecting to WhatsApp")
  225. if user.Session != nil {
  226. user.sendBridgeState(BridgeState{StateEvent: StateConnecting, Error: WAConnecting})
  227. }
  228. timeout := time.Duration(user.bridge.Config.Bridge.ConnectionTimeout)
  229. if timeout == 0 {
  230. timeout = 20
  231. }
  232. user.Conn = whatsapp.NewConn(&whatsapp.Options{
  233. Timeout: timeout * time.Second,
  234. LongClientName: user.bridge.Config.WhatsApp.OSName,
  235. ShortClientName: user.bridge.Config.WhatsApp.BrowserName,
  236. ClientVersion: WAVersion,
  237. Log: user.log.Sub("Conn"),
  238. Handler: []whatsapp.Handler{user},
  239. })
  240. user.setupAdminTestHooks()
  241. user.connLock.Unlock()
  242. return user.RestoreSession()
  243. }
  244. func (user *User) DeleteConnection() {
  245. user.connLock.Lock()
  246. if user.Conn == nil {
  247. user.connLock.Unlock()
  248. return
  249. }
  250. err := user.Conn.Disconnect()
  251. if err != nil && err != whatsapp.ErrNotConnected {
  252. user.log.Warnln("Error disconnecting: %v", err)
  253. }
  254. user.Conn.RemoveHandlers()
  255. user.Conn = nil
  256. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  257. user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
  258. user.connLock.Unlock()
  259. }
  260. func (user *User) RestoreSession() bool {
  261. if user.Session != nil {
  262. user.Conn.SetSession(*user.Session)
  263. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
  264. defer cancel()
  265. err := user.Conn.Restore(true, ctx)
  266. if err == whatsapp.ErrAlreadyLoggedIn {
  267. return true
  268. } else if err != nil {
  269. user.log.Errorln("Failed to restore session:", err)
  270. if errors.Is(err, whatsapp.ErrUnpaired) {
  271. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
  272. "To re-pair your phone, log in again.")
  273. user.removeFromJIDMap(StateBadCredentials)
  274. //user.JID = ""
  275. user.SetSession(nil)
  276. user.DeleteConnection()
  277. return false
  278. } else {
  279. user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
  280. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
  281. "on your phone is reachable and use `reconnect` to try connecting again.")
  282. }
  283. user.log.Debugln("Disconnecting due to failed session restore...")
  284. err = user.Conn.Disconnect()
  285. if err != nil {
  286. user.log.Errorln("Failed to disconnect after failed session restore:", err)
  287. }
  288. return false
  289. }
  290. user.ConnectionErrors = 0
  291. user.log.Debugln("Session restored successfully")
  292. user.PostLogin()
  293. }
  294. return true
  295. }
  296. func (user *User) HasSession() bool {
  297. return user.Session != nil
  298. }
  299. func (user *User) IsConnected() bool {
  300. return user.Conn != nil && user.Conn.IsConnected() && user.Conn.IsLoggedIn()
  301. }
  302. func (user *User) IsLoginInProgress() bool {
  303. return user.Conn != nil && user.Conn.IsLoginInProgress()
  304. }
  305. func (user *User) loginQrChannel(ce *CommandEvent, qrChan <-chan string, eventIDChan chan<- id.EventID) {
  306. var qrEventID id.EventID
  307. for code := range qrChan {
  308. if code == "stop" {
  309. return
  310. }
  311. qrCode, err := qrcode.Encode(code, qrcode.Low, 256)
  312. if err != nil {
  313. user.log.Errorln("Failed to encode QR code:", err)
  314. ce.Reply("Failed to encode QR code: %v", err)
  315. return
  316. }
  317. bot := user.bridge.AS.BotClient()
  318. resp, err := bot.UploadBytes(qrCode, "image/png")
  319. if err != nil {
  320. user.log.Errorln("Failed to upload QR code:", err)
  321. ce.Reply("Failed to upload QR code: %v", err)
  322. return
  323. }
  324. if qrEventID == "" {
  325. sendResp, err := bot.SendImage(ce.RoomID, code, resp.ContentURI)
  326. if err != nil {
  327. user.log.Errorln("Failed to send QR code to user:", err)
  328. return
  329. }
  330. qrEventID = sendResp.EventID
  331. eventIDChan <- qrEventID
  332. } else {
  333. _, err = bot.SendMessageEvent(ce.RoomID, event.EventMessage, &event.MessageEventContent{
  334. MsgType: event.MsgImage,
  335. Body: code,
  336. URL: resp.ContentURI.CUString(),
  337. NewContent: &event.MessageEventContent{
  338. MsgType: event.MsgImage,
  339. Body: code,
  340. URL: resp.ContentURI.CUString(),
  341. },
  342. RelatesTo: &event.RelatesTo{
  343. Type: event.RelReplace,
  344. EventID: qrEventID,
  345. },
  346. })
  347. if err != nil {
  348. user.log.Errorln("Failed to send edited QR code to user:", err)
  349. }
  350. }
  351. }
  352. }
  353. func (user *User) Login(ce *CommandEvent) {
  354. qrChan := make(chan string, 3)
  355. eventIDChan := make(chan id.EventID, 1)
  356. go user.loginQrChannel(ce, qrChan, eventIDChan)
  357. session, jid, err := user.Conn.Login(qrChan, nil)
  358. qrChan <- "stop"
  359. if err != nil {
  360. var eventID id.EventID
  361. select {
  362. case eventID = <-eventIDChan:
  363. default:
  364. }
  365. reply := event.MessageEventContent{
  366. MsgType: event.MsgText,
  367. }
  368. if err == whatsapp.ErrAlreadyLoggedIn {
  369. reply.Body = "You're already logged in"
  370. } else if err == whatsapp.ErrLoginInProgress {
  371. reply.Body = "You have a login in progress already."
  372. } else if err == whatsapp.ErrLoginTimedOut {
  373. reply.Body = "QR code scan timed out. Please try again."
  374. } else {
  375. user.log.Warnln("Failed to log in:", err)
  376. reply.Body = fmt.Sprintf("Unknown error while logging in: %v", err)
  377. }
  378. msg := reply
  379. if eventID != "" {
  380. msg.NewContent = &reply
  381. msg.RelatesTo = &event.RelatesTo{
  382. Type: event.RelReplace,
  383. EventID: eventID,
  384. }
  385. }
  386. _, _ = ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &msg)
  387. return
  388. }
  389. // TODO there's a bit of duplication between this and the provisioning API login method
  390. // Also between the two logout methods (commands.go and provisioning.go)
  391. user.log.Debugln("Successful login as", jid, "via command")
  392. user.ConnectionErrors = 0
  393. user.JID = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, 1)
  394. user.addToJIDMap()
  395. user.SetSession(&session)
  396. ce.Reply("Successfully logged in, synchronizing chats...")
  397. user.PostLogin()
  398. }
  399. type Chat struct {
  400. whatsapp.Chat
  401. Portal *Portal
  402. Contact whatsapp.Contact
  403. }
  404. type ChatList []Chat
  405. func (cl ChatList) Len() int {
  406. return len(cl)
  407. }
  408. func (cl ChatList) Less(i, j int) bool {
  409. return cl[i].LastMessageTime > cl[j].LastMessageTime
  410. }
  411. func (cl ChatList) Swap(i, j int) {
  412. cl[i], cl[j] = cl[j], cl[i]
  413. }
  414. func (user *User) PostLogin() {
  415. user.sendBridgeState(BridgeState{StateEvent: StateBackfilling})
  416. user.bridge.Metrics.TrackConnectionState(user.JID, true)
  417. user.bridge.Metrics.TrackLoginState(user.JID, true)
  418. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  419. if !atomic.CompareAndSwapInt32(&user.syncing, 0, 1) {
  420. // TODO we should cleanly stop the old sync and start a new one instead of not starting a new one
  421. user.log.Warnln("There seems to be a post-sync already in progress, not starting a new one")
  422. return
  423. }
  424. user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
  425. user.chatListReceived = make(chan struct{}, 1)
  426. user.syncPortalsDone = make(chan struct{}, 1)
  427. user.syncWait.Add(1)
  428. user.syncStart <- struct{}{}
  429. go user.intPostLogin()
  430. }
  431. func (user *User) tryAutomaticDoublePuppeting() {
  432. if !user.bridge.Config.CanDoublePuppet(user.MXID) {
  433. return
  434. }
  435. user.log.Debugln("Checking if double puppeting needs to be enabled")
  436. puppet := user.bridge.GetPuppetByJID(user.JID)
  437. if len(puppet.CustomMXID) > 0 {
  438. user.log.Debugln("User already has double-puppeting enabled")
  439. // Custom puppet already enabled
  440. return
  441. }
  442. accessToken, err := puppet.loginWithSharedSecret(user.MXID)
  443. if err != nil {
  444. user.log.Warnln("Failed to login with shared secret:", err)
  445. return
  446. }
  447. err = puppet.SwitchCustomMXID(accessToken, user.MXID)
  448. if err != nil {
  449. puppet.log.Warnln("Failed to switch to auto-logined custom puppet:", err)
  450. return
  451. }
  452. user.log.Infoln("Successfully automatically enabled custom puppet")
  453. }
  454. func (user *User) sendBridgeNotice(formatString string, args ...interface{}) {
  455. notice := fmt.Sprintf(formatString, args...)
  456. _, err := user.bridge.Bot.SendNotice(user.GetManagementRoom(), notice)
  457. if err != nil {
  458. user.log.Warnf("Failed to send bridge notice \"%s\": %v", notice, err)
  459. }
  460. }
  461. func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface{}) {
  462. notice := fmt.Sprintf(formatString, args...)
  463. content := format.RenderMarkdown(notice, true, false)
  464. _, err := user.bridge.Bot.SendMessageEvent(user.GetManagementRoom(), event.EventMessage, content)
  465. if err != nil {
  466. user.log.Warnf("Failed to send bridge alert \"%s\": %v", notice, err)
  467. }
  468. }
  469. func (user *User) postConnPing() bool {
  470. user.log.Debugln("Making post-connection ping")
  471. var err error
  472. for i := 0; ; i++ {
  473. err = user.Conn.AdminTest()
  474. if err == nil {
  475. user.log.Debugln("Post-connection ping OK")
  476. return true
  477. } else if errors.Is(err, whatsapp.ErrConnectionTimeout) && i < 5 {
  478. user.log.Warnfln("Post-connection ping timed out, sending new one")
  479. } else {
  480. break
  481. }
  482. }
  483. user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err)
  484. disconnectErr := user.Conn.Disconnect()
  485. if disconnectErr != nil {
  486. user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
  487. }
  488. user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
  489. user.bridge.Metrics.TrackDisconnection(user.MXID)
  490. go func() {
  491. time.Sleep(1 * time.Second)
  492. user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err))
  493. }()
  494. return false
  495. }
  496. func (user *User) intPostLogin() {
  497. defer atomic.StoreInt32(&user.syncing, 0)
  498. defer user.syncWait.Done()
  499. user.lastReconnection = time.Now().Unix()
  500. user.createCommunity()
  501. user.tryAutomaticDoublePuppeting()
  502. user.log.Debugln("Waiting for chat list receive confirmation")
  503. select {
  504. case <-user.chatListReceived:
  505. user.log.Debugln("Chat list receive confirmation received in PostLogin")
  506. case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
  507. user.log.Warnln("Timed out waiting for chat list to arrive!")
  508. user.postConnPing()
  509. return
  510. }
  511. if !user.postConnPing() {
  512. user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
  513. return
  514. }
  515. user.log.Debugln("Waiting for portal sync complete confirmation")
  516. select {
  517. case <-user.syncPortalsDone:
  518. user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.")
  519. // TODO this is too short, maybe a per-portal duration?
  520. case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
  521. user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
  522. }
  523. user.sendBridgeState(BridgeState{StateEvent: StateConnected})
  524. }
  525. type NormalMessage interface {
  526. GetInfo() whatsapp.MessageInfo
  527. }
  528. func (user *User) HandleEvent(event interface{}) {
  529. switch v := event.(type) {
  530. case NormalMessage:
  531. info := v.GetInfo()
  532. user.messageInput <- PortalMessage{info.RemoteJid, user, v, info.Timestamp}
  533. case whatsapp.MessageRevocation:
  534. user.messageInput <- PortalMessage{v.RemoteJid, user, v, 0}
  535. case whatsapp.StreamEvent:
  536. user.HandleStreamEvent(v)
  537. case []whatsapp.Chat:
  538. user.HandleChatList(v)
  539. case []whatsapp.Contact:
  540. user.HandleContactList(v)
  541. case error:
  542. user.HandleError(v)
  543. case whatsapp.Contact:
  544. go user.HandleNewContact(v)
  545. case whatsapp.BatteryMessage:
  546. user.HandleBatteryMessage(v)
  547. case whatsapp.CallInfo:
  548. user.HandleCallInfo(v)
  549. case whatsapp.PresenceEvent:
  550. go user.HandlePresence(v)
  551. case whatsapp.JSONMsgInfo:
  552. go user.HandleMsgInfo(v)
  553. case whatsapp.ReceivedMessage:
  554. user.HandleReceivedMessage(v)
  555. case whatsapp.ReadMessage:
  556. user.HandleReadMessage(v)
  557. case whatsapp.JSONCommand:
  558. user.HandleCommand(v)
  559. case whatsapp.ChatUpdate:
  560. user.HandleChatUpdate(v)
  561. case whatsapp.ConnInfo:
  562. user.HandleConnInfo(v)
  563. case whatsapp.MuteMessage:
  564. portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
  565. if portal != nil {
  566. go user.updateChatMute(nil, portal, v.MutedUntil)
  567. }
  568. case whatsapp.ArchiveMessage:
  569. portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
  570. if portal != nil {
  571. go user.updateChatTag(nil, portal, user.bridge.Config.Bridge.ArchiveTag, v.IsArchived)
  572. }
  573. case whatsapp.PinMessage:
  574. portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
  575. if portal != nil {
  576. go user.updateChatTag(nil, portal, user.bridge.Config.Bridge.PinnedTag, v.IsPinned)
  577. }
  578. case whatsapp.RawJSONMessage:
  579. user.HandleJSONMessage(v)
  580. case *waProto.WebMessageInfo:
  581. user.updateLastConnectionIfNecessary()
  582. // TODO trace log
  583. //user.log.Debugfln("WebMessageInfo: %+v", v)
  584. case *waBinary.Node:
  585. user.log.Debugfln("Unknown binary message: %+v", v)
  586. default:
  587. user.log.Debugfln("Unknown type of event in HandleEvent: %T", v)
  588. }
  589. }
  590. func (user *User) HandleStreamEvent(evt whatsapp.StreamEvent) {
  591. if evt.Type == whatsapp.StreamSleep {
  592. if user.lastReconnection+60 > time.Now().Unix() {
  593. user.lastReconnection = 0
  594. user.log.Infoln("Stream went to sleep soon after reconnection, making new post-connection ping in 20 seconds")
  595. go func() {
  596. time.Sleep(20 * time.Second)
  597. // TODO if this happens during the post-login sync, it can get stuck forever
  598. // TODO check if the above is still true
  599. user.postConnPing()
  600. }()
  601. }
  602. } else {
  603. user.log.Infofln("Stream event: %+v", evt)
  604. }
  605. }
  606. func (user *User) HandleChatList(chats []whatsapp.Chat) {
  607. user.log.Infoln("Chat list received")
  608. chatMap := make(map[string]whatsapp.Chat)
  609. user.Conn.Store.ChatsLock.RLock()
  610. for _, chat := range user.Conn.Store.Chats {
  611. chatMap[chat.JID] = chat
  612. }
  613. user.Conn.Store.ChatsLock.RUnlock()
  614. for _, chat := range chats {
  615. chatMap[chat.JID] = chat
  616. }
  617. select {
  618. case user.chatListReceived <- struct{}{}:
  619. user.log.Debugln("Sent chat list receive confirmation from HandleChatList")
  620. default:
  621. user.log.Debugln("Failed to send chat list receive confirmation from HandleChatList, channel probably full")
  622. }
  623. go user.syncPortals(chatMap, false)
  624. }
  625. func (user *User) updateChatMute(intent *appservice.IntentAPI, portal *Portal, mutedUntil int64) {
  626. if len(portal.MXID) == 0 || !user.bridge.Config.Bridge.MuteBridging {
  627. return
  628. } else if intent == nil {
  629. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  630. if doublePuppet == nil || doublePuppet.CustomIntent() == nil {
  631. return
  632. }
  633. intent = doublePuppet.CustomIntent()
  634. }
  635. var err error
  636. if mutedUntil != -1 && mutedUntil < time.Now().Unix() {
  637. user.log.Debugfln("Portal %s is muted until %d, unmuting...", portal.MXID, mutedUntil)
  638. err = intent.DeletePushRule("global", pushrules.RoomRule, string(portal.MXID))
  639. } else {
  640. user.log.Debugfln("Portal %s is muted until %d, muting...", portal.MXID, mutedUntil)
  641. err = intent.PutPushRule("global", pushrules.RoomRule, string(portal.MXID), &mautrix.ReqPutPushRule{
  642. Actions: []pushrules.PushActionType{pushrules.ActionDontNotify},
  643. })
  644. }
  645. if err != nil && !errors.Is(err, mautrix.MNotFound) {
  646. user.log.Warnfln("Failed to update push rule for %s through double puppet: %v", portal.MXID, err)
  647. }
  648. }
  649. type CustomTagData struct {
  650. Order json.Number `json:"order"`
  651. DoublePuppet bool `json:"net.maunium.whatsapp.puppet"`
  652. }
  653. type CustomTagEventContent struct {
  654. Tags map[string]CustomTagData `json:"tags"`
  655. }
  656. func (user *User) updateChatTag(intent *appservice.IntentAPI, portal *Portal, tag string, active bool) {
  657. if len(portal.MXID) == 0 || len(tag) == 0 {
  658. return
  659. } else if intent == nil {
  660. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  661. if doublePuppet == nil || doublePuppet.CustomIntent() == nil {
  662. return
  663. }
  664. intent = doublePuppet.CustomIntent()
  665. }
  666. var existingTags CustomTagEventContent
  667. err := intent.GetTagsWithCustomData(portal.MXID, &existingTags)
  668. if err != nil && !errors.Is(err, mautrix.MNotFound) {
  669. user.log.Warnfln("Failed to get tags of %s: %v", portal.MXID, err)
  670. }
  671. currentTag, ok := existingTags.Tags[tag]
  672. if active && !ok {
  673. user.log.Debugln("Adding tag", tag, "to", portal.MXID)
  674. data := CustomTagData{"0.5", true}
  675. err = intent.AddTagWithCustomData(portal.MXID, tag, &data)
  676. } else if !active && ok && currentTag.DoublePuppet {
  677. user.log.Debugln("Removing tag", tag, "from", portal.MXID)
  678. err = intent.RemoveTag(portal.MXID, tag)
  679. } else {
  680. err = nil
  681. }
  682. if err != nil {
  683. user.log.Warnfln("Failed to update tag %s for %s through double puppet: %v", tag, portal.MXID, err)
  684. }
  685. }
  686. type CustomReadReceipt struct {
  687. Timestamp int64 `json:"ts,omitempty"`
  688. DoublePuppet bool `json:"net.maunium.whatsapp.puppet,omitempty"`
  689. }
  690. func (user *User) syncChatDoublePuppetDetails(doublePuppet *Puppet, chat Chat, justCreated bool) {
  691. if doublePuppet == nil || doublePuppet.CustomIntent() == nil || len(chat.Portal.MXID) == 0 {
  692. return
  693. }
  694. intent := doublePuppet.CustomIntent()
  695. if chat.UnreadCount == 0 && (justCreated || !user.bridge.Config.Bridge.MarkReadOnlyOnCreate) {
  696. lastMessage := user.bridge.DB.Message.GetLastInChatBefore(chat.Portal.Key, chat.ReceivedAt.Unix())
  697. if lastMessage != nil {
  698. err := intent.MarkReadWithContent(chat.Portal.MXID, lastMessage.MXID, &CustomReadReceipt{DoublePuppet: true})
  699. if err != nil {
  700. user.log.Warnfln("Failed to mark %s in %s as read after backfill: %v", lastMessage.MXID, chat.Portal.MXID, err)
  701. }
  702. }
  703. } else if chat.UnreadCount == -1 {
  704. user.log.Debugfln("Invalid unread count (missing field?) in chat info %+v", chat.Source)
  705. }
  706. if justCreated || !user.bridge.Config.Bridge.TagOnlyOnCreate {
  707. user.updateChatMute(intent, chat.Portal, chat.MutedUntil)
  708. user.updateChatTag(intent, chat.Portal, user.bridge.Config.Bridge.ArchiveTag, chat.IsArchived)
  709. user.updateChatTag(intent, chat.Portal, user.bridge.Config.Bridge.PinnedTag, chat.IsPinned)
  710. }
  711. }
  712. func (user *User) syncPortal(chat Chat) {
  713. // Don't sync unless chat meta sync is enabled or portal doesn't exist
  714. if user.bridge.Config.Bridge.ChatMetaSync || len(chat.Portal.MXID) == 0 {
  715. failedToCreate := chat.Portal.Sync(user, chat.Contact)
  716. if failedToCreate {
  717. return
  718. }
  719. }
  720. err := chat.Portal.BackfillHistory(user, chat.LastMessageTime)
  721. if err != nil {
  722. chat.Portal.log.Errorln("Error backfilling history:", err)
  723. }
  724. }
  725. func (user *User) collectChatList(chatMap map[string]whatsapp.Chat) ChatList {
  726. if chatMap == nil {
  727. chatMap = user.Conn.Store.Chats
  728. }
  729. user.log.Infoln("Reading chat list")
  730. chats := make(ChatList, 0, len(chatMap))
  731. existingKeys := user.GetInCommunityMap()
  732. portalKeys := make([]database.PortalKeyWithMeta, 0, len(chatMap))
  733. for _, chat := range chatMap {
  734. portal := user.GetPortalByJID(chat.JID)
  735. user.Conn.Store.ContactsLock.RLock()
  736. contact, _ := user.Conn.Store.Contacts[chat.JID]
  737. user.Conn.Store.ContactsLock.RUnlock()
  738. chats = append(chats, Chat{
  739. Chat: chat,
  740. Portal: portal,
  741. Contact: contact,
  742. })
  743. var inCommunity, ok bool
  744. if inCommunity, ok = existingKeys[portal.Key]; !ok || !inCommunity {
  745. inCommunity = user.addPortalToCommunity(portal)
  746. if portal.IsPrivateChat() {
  747. puppet := user.bridge.GetPuppetByJID(portal.Key.JID)
  748. user.addPuppetToCommunity(puppet)
  749. }
  750. }
  751. portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: portal.Key, InCommunity: inCommunity})
  752. }
  753. user.log.Infoln("Read chat list, updating user-portal mapping")
  754. err := user.SetPortalKeys(portalKeys)
  755. if err != nil {
  756. user.log.Warnln("Failed to update user-portal mapping:", err)
  757. }
  758. sort.Sort(chats)
  759. return chats
  760. }
  761. func (user *User) syncPortals(chatMap map[string]whatsapp.Chat, createAll bool) {
  762. // TODO use contexts instead of checking if user.Conn is the same?
  763. connAtStart := user.Conn
  764. chats := user.collectChatList(chatMap)
  765. limit := user.bridge.Config.Bridge.InitialChatSync
  766. if limit < 0 {
  767. limit = len(chats)
  768. }
  769. if user.Conn != connAtStart {
  770. user.log.Debugln("Connection seems to have changed before sync, cancelling")
  771. return
  772. }
  773. now := time.Now().Unix()
  774. user.log.Infoln("Syncing portals")
  775. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  776. for i, chat := range chats {
  777. if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
  778. break
  779. }
  780. create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
  781. if len(chat.Portal.MXID) > 0 || create || createAll {
  782. user.log.Debugfln("Syncing chat %+v", chat.Chat.Source)
  783. justCreated := len(chat.Portal.MXID) == 0
  784. user.syncPortal(chat)
  785. user.syncChatDoublePuppetDetails(doublePuppet, chat, justCreated)
  786. }
  787. }
  788. if user.Conn != connAtStart {
  789. user.log.Debugln("Connection seems to have changed during sync, cancelling")
  790. return
  791. }
  792. user.UpdateDirectChats(nil)
  793. user.log.Infoln("Finished syncing portals")
  794. select {
  795. case user.syncPortalsDone <- struct{}{}:
  796. default:
  797. }
  798. }
  799. func (user *User) getDirectChats() map[id.UserID][]id.RoomID {
  800. res := make(map[id.UserID][]id.RoomID)
  801. privateChats := user.bridge.DB.Portal.FindPrivateChats(user.JID)
  802. for _, portal := range privateChats {
  803. if len(portal.MXID) > 0 {
  804. res[user.bridge.FormatPuppetMXID(portal.Key.JID)] = []id.RoomID{portal.MXID}
  805. }
  806. }
  807. return res
  808. }
  809. func (user *User) UpdateDirectChats(chats map[id.UserID][]id.RoomID) {
  810. if !user.bridge.Config.Bridge.SyncDirectChatList {
  811. return
  812. }
  813. puppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  814. if puppet == nil || puppet.CustomIntent() == nil {
  815. return
  816. }
  817. intent := puppet.CustomIntent()
  818. method := http.MethodPatch
  819. if chats == nil {
  820. chats = user.getDirectChats()
  821. method = http.MethodPut
  822. }
  823. user.log.Debugln("Updating m.direct list on homeserver")
  824. var err error
  825. if user.bridge.Config.Homeserver.Asmux {
  826. urlPath := intent.BuildBaseURL("_matrix", "client", "unstable", "com.beeper.asmux", "dms")
  827. _, err = intent.MakeFullRequest(mautrix.FullRequest{
  828. Method: method,
  829. URL: urlPath,
  830. Headers: http.Header{"X-Asmux-Auth": {user.bridge.AS.Registration.AppToken}},
  831. RequestJSON: chats,
  832. })
  833. } else {
  834. existingChats := make(map[id.UserID][]id.RoomID)
  835. err = intent.GetAccountData(event.AccountDataDirectChats.Type, &existingChats)
  836. if err != nil {
  837. user.log.Warnln("Failed to get m.direct list to update it:", err)
  838. return
  839. }
  840. for userID, rooms := range existingChats {
  841. if _, ok := user.bridge.ParsePuppetMXID(userID); !ok {
  842. // This is not a ghost user, include it in the new list
  843. chats[userID] = rooms
  844. } else if _, ok := chats[userID]; !ok && method == http.MethodPatch {
  845. // This is a ghost user, but we're not replacing the whole list, so include it too
  846. chats[userID] = rooms
  847. }
  848. }
  849. err = intent.SetAccountData(event.AccountDataDirectChats.Type, &chats)
  850. }
  851. if err != nil {
  852. user.log.Warnln("Failed to update m.direct list:", err)
  853. }
  854. }
  855. func (user *User) HandleContactList(contacts []whatsapp.Contact) {
  856. contactMap := make(map[whatsapp.JID]whatsapp.Contact)
  857. for _, contact := range contacts {
  858. contactMap[contact.JID] = contact
  859. }
  860. go user.syncPuppets(contactMap)
  861. }
  862. func (user *User) syncPuppets(contacts map[whatsapp.JID]whatsapp.Contact) {
  863. if contacts == nil {
  864. contacts = user.Conn.Store.Contacts
  865. }
  866. _, hasSelf := contacts[user.JID]
  867. if !hasSelf {
  868. contacts[user.JID] = whatsapp.Contact{
  869. Name: user.pushName,
  870. Notify: user.pushName,
  871. JID: user.JID,
  872. }
  873. }
  874. user.log.Infoln("Syncing puppet info from contacts")
  875. for jid, contact := range contacts {
  876. if strings.HasSuffix(jid, whatsapp.NewUserSuffix) {
  877. puppet := user.bridge.GetPuppetByJID(contact.JID)
  878. puppet.Sync(user, contact)
  879. } else if strings.HasSuffix(jid, whatsapp.BroadcastSuffix) {
  880. portal := user.GetPortalByJID(contact.JID)
  881. portal.Sync(user, contact)
  882. }
  883. }
  884. user.log.Infoln("Finished syncing puppet info from contacts")
  885. }
  886. func (user *User) updateLastConnectionIfNecessary() {
  887. if user.LastConnection+60 < time.Now().Unix() {
  888. user.UpdateLastConnection()
  889. }
  890. }
  891. func (user *User) HandleError(err error) {
  892. if !errors.Is(err, whatsapp.ErrInvalidWsData) {
  893. user.log.Errorfln("WhatsApp error: %v", err)
  894. }
  895. if closed, ok := err.(*whatsapp.ErrConnectionClosed); ok {
  896. user.bridge.Metrics.TrackDisconnection(user.MXID)
  897. if closed.Code == 1000 && user.cleanDisconnection {
  898. user.cleanDisconnection = false
  899. if !user.bridge.Config.Bridge.AggressiveReconnect {
  900. user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
  901. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  902. user.log.Infoln("Clean disconnection by server")
  903. return
  904. } else {
  905. user.log.Debugln("Clean disconnection by server, but aggressive reconnection is enabled")
  906. }
  907. }
  908. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection was closed with websocket status code %d", closed.Code))
  909. } else if failed, ok := err.(*whatsapp.ErrConnectionFailed); ok {
  910. disconnectErr := user.Conn.Disconnect()
  911. if disconnectErr != nil {
  912. user.log.Warnln("Failed to disconnect after connection fail:", disconnectErr)
  913. }
  914. user.bridge.Metrics.TrackDisconnection(user.MXID)
  915. user.ConnectionErrors++
  916. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", failed.Err))
  917. } else if err == whatsapp.ErrPingFalse || err == whatsapp.ErrWebsocketKeepaliveFailed {
  918. disconnectErr := user.Conn.Disconnect()
  919. if disconnectErr != nil {
  920. user.log.Warnln("Failed to disconnect after failed ping:", disconnectErr)
  921. }
  922. user.bridge.Metrics.TrackDisconnection(user.MXID)
  923. user.ConnectionErrors++
  924. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", err))
  925. }
  926. // Otherwise unknown error, probably mostly harmless
  927. }
  928. func (user *User) tryReconnect(msg string) {
  929. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  930. if user.ConnectionErrors > user.bridge.Config.Bridge.MaxConnectionAttempts {
  931. user.sendMarkdownBridgeAlert("%s. Use the `reconnect` command to reconnect.", msg)
  932. user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
  933. return
  934. }
  935. if user.bridge.Config.Bridge.ReportConnectionRetry {
  936. user.sendBridgeNotice("%s. Reconnecting...", msg)
  937. // Don't want the same error to be repeated
  938. msg = ""
  939. }
  940. var tries uint
  941. var exponentialBackoff bool
  942. baseDelay := time.Duration(user.bridge.Config.Bridge.ConnectionRetryDelay)
  943. if baseDelay < 0 {
  944. exponentialBackoff = true
  945. baseDelay = -baseDelay + 1
  946. }
  947. delay := baseDelay
  948. ctx, cancel := context.WithCancel(context.Background())
  949. defer cancel()
  950. user.cancelReconnect = cancel
  951. for user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  952. select {
  953. case <-ctx.Done():
  954. user.log.Debugln("tryReconnect context cancelled, aborting reconnection attempts")
  955. return
  956. default:
  957. }
  958. user.sendBridgeState(BridgeState{StateEvent: StateConnecting, Error: WAConnecting})
  959. err := user.Conn.Restore(true, ctx)
  960. if err == nil {
  961. user.ConnectionErrors = 0
  962. if user.bridge.Config.Bridge.ReportConnectionRetry {
  963. user.sendBridgeNotice("Reconnected successfully")
  964. }
  965. user.PostLogin()
  966. return
  967. } else if errors.Is(err, whatsapp.ErrBadRequest) {
  968. user.log.Warnln("Got init 400 error when trying to reconnect, resetting connection...")
  969. err = user.Conn.Disconnect()
  970. if err != nil {
  971. user.log.Debugln("Error while disconnecting for connection reset:", err)
  972. }
  973. } else if errors.Is(err, whatsapp.ErrUnpaired) {
  974. user.log.Errorln("Got init 401 (unpaired) error when trying to reconnect, not retrying")
  975. user.removeFromJIDMap(StateBadCredentials)
  976. //user.JID = ""
  977. user.SetSession(nil)
  978. user.DeleteConnection()
  979. user.sendMarkdownBridgeAlert("\u26a0 Failed to reconnect to WhatsApp: unpaired from phone. " +
  980. "To re-pair your phone, log in again.")
  981. return
  982. } else if errors.Is(err, whatsapp.ErrAlreadyLoggedIn) {
  983. user.log.Warnln("Reconnection said we're already logged in, not trying anymore")
  984. return
  985. } else {
  986. user.log.Errorln("Error while trying to reconnect after disconnection:", err)
  987. }
  988. tries++
  989. user.ConnectionErrors++
  990. if user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  991. if exponentialBackoff {
  992. delay = (1 << tries) + baseDelay
  993. }
  994. if user.bridge.Config.Bridge.ReportConnectionRetry {
  995. user.sendBridgeNotice("Reconnection attempt failed: %v. Retrying in %d seconds...", err, delay)
  996. }
  997. time.Sleep(delay * time.Second)
  998. }
  999. }
  1000. user.sendBridgeState(BridgeState{StateEvent: StateBadCredentials, Error: WANotConnected})
  1001. if user.bridge.Config.Bridge.ReportConnectionRetry {
  1002. user.sendMarkdownBridgeAlert("%d reconnection attempts failed. Use the `reconnect` command to try to reconnect manually.", tries)
  1003. } else {
  1004. user.sendMarkdownBridgeAlert("\u26a0 %s. Additionally, %d reconnection attempts failed. Use the `reconnect` command to try to reconnect.", msg, tries)
  1005. }
  1006. }
  1007. func (user *User) PortalKey(jid whatsapp.JID) database.PortalKey {
  1008. return database.NewPortalKey(jid, user.JID)
  1009. }
  1010. func (user *User) GetPortalByJID(jid whatsapp.JID) *Portal {
  1011. return user.bridge.GetPortalByJID(user.PortalKey(jid))
  1012. }
  1013. func (user *User) runMessageRingBuffer() {
  1014. for msg := range user.messageInput {
  1015. select {
  1016. case user.messageOutput <- msg:
  1017. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  1018. default:
  1019. dropped := <-user.messageOutput
  1020. user.log.Warnln("Buffer is full, dropping message in", dropped.chat)
  1021. user.messageOutput <- msg
  1022. }
  1023. }
  1024. }
  1025. func (user *User) handleMessageLoop() {
  1026. for {
  1027. select {
  1028. case msg := <-user.messageOutput:
  1029. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  1030. user.GetPortalByJID(msg.chat).messages <- msg
  1031. case <-user.syncStart:
  1032. user.log.Debugln("Processing of incoming messages is locked")
  1033. user.bridge.Metrics.TrackSyncLock(user.JID, true)
  1034. user.syncWait.Wait()
  1035. user.bridge.Metrics.TrackSyncLock(user.JID, false)
  1036. user.log.Debugln("Processing of incoming messages unlocked")
  1037. }
  1038. }
  1039. }
  1040. func (user *User) HandleNewContact(contact whatsapp.Contact) {
  1041. user.log.Debugfln("Contact message: %+v", contact)
  1042. if strings.HasSuffix(contact.JID, whatsapp.OldUserSuffix) {
  1043. contact.JID = strings.Replace(contact.JID, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  1044. }
  1045. if strings.HasSuffix(contact.JID, whatsapp.NewUserSuffix) {
  1046. puppet := user.bridge.GetPuppetByJID(contact.JID)
  1047. puppet.UpdateName(user, contact)
  1048. } else if strings.HasSuffix(contact.JID, whatsapp.BroadcastSuffix) {
  1049. portal := user.GetPortalByJID(contact.JID)
  1050. portal.UpdateName(contact.Name, "", nil, true)
  1051. }
  1052. }
  1053. func (user *User) HandleBatteryMessage(battery whatsapp.BatteryMessage) {
  1054. user.log.Debugfln("Battery message: %+v", battery)
  1055. var notice string
  1056. if !battery.Plugged && battery.Percentage < 15 && user.batteryWarningsSent < 1 {
  1057. notice = fmt.Sprintf("Phone battery low (%d %% remaining)", battery.Percentage)
  1058. user.batteryWarningsSent = 1
  1059. } else if !battery.Plugged && battery.Percentage < 5 && user.batteryWarningsSent < 2 {
  1060. notice = fmt.Sprintf("Phone battery very low (%d %% remaining)", battery.Percentage)
  1061. user.batteryWarningsSent = 2
  1062. } else if battery.Percentage > 15 || battery.Plugged {
  1063. user.batteryWarningsSent = 0
  1064. }
  1065. if notice != "" {
  1066. go user.sendBridgeNotice("%s", notice)
  1067. }
  1068. }
  1069. type FakeMessage struct {
  1070. Text string
  1071. ID string
  1072. Alert bool
  1073. }
  1074. func (user *User) HandleCallInfo(info whatsapp.CallInfo) {
  1075. if info.Data != nil {
  1076. return
  1077. }
  1078. data := FakeMessage{
  1079. ID: info.ID,
  1080. }
  1081. switch info.Type {
  1082. case whatsapp.CallOffer:
  1083. if !user.bridge.Config.Bridge.CallNotices.Start {
  1084. return
  1085. }
  1086. data.Text = "Incoming call"
  1087. data.Alert = true
  1088. case whatsapp.CallOfferVideo:
  1089. if !user.bridge.Config.Bridge.CallNotices.Start {
  1090. return
  1091. }
  1092. data.Text = "Incoming video call"
  1093. data.Alert = true
  1094. case whatsapp.CallTerminate:
  1095. if !user.bridge.Config.Bridge.CallNotices.End {
  1096. return
  1097. }
  1098. data.Text = "Call ended"
  1099. data.ID += "E"
  1100. default:
  1101. return
  1102. }
  1103. portal := user.GetPortalByJID(info.From)
  1104. if portal != nil {
  1105. portal.messages <- PortalMessage{info.From, user, data, 0}
  1106. }
  1107. }
  1108. func (user *User) HandlePresence(info whatsapp.PresenceEvent) {
  1109. puppet := user.bridge.GetPuppetByJID(info.SenderJID)
  1110. switch info.Status {
  1111. case whatsapp.PresenceUnavailable:
  1112. _ = puppet.DefaultIntent().SetPresence("offline")
  1113. case whatsapp.PresenceAvailable:
  1114. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  1115. portal := user.bridge.GetPortalByMXID(puppet.typingIn)
  1116. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  1117. puppet.typingIn = ""
  1118. puppet.typingAt = 0
  1119. } else {
  1120. _ = puppet.DefaultIntent().SetPresence("online")
  1121. }
  1122. case whatsapp.PresenceComposing:
  1123. portal := user.GetPortalByJID(info.JID)
  1124. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  1125. if puppet.typingIn == portal.MXID {
  1126. return
  1127. }
  1128. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  1129. }
  1130. puppet.typingIn = portal.MXID
  1131. puppet.typingAt = time.Now().Unix()
  1132. _, _ = puppet.IntentFor(portal).UserTyping(portal.MXID, true, 15*1000)
  1133. }
  1134. }
  1135. func (user *User) HandleMsgInfo(info whatsapp.JSONMsgInfo) {
  1136. if (info.Command == whatsapp.MsgInfoCommandAck || info.Command == whatsapp.MsgInfoCommandAcks) && info.Acknowledgement == whatsapp.AckMessageRead {
  1137. portal := user.GetPortalByJID(info.ToJID)
  1138. if len(portal.MXID) == 0 {
  1139. return
  1140. }
  1141. intent := user.bridge.GetPuppetByJID(info.SenderJID).IntentFor(portal)
  1142. for _, msgID := range info.IDs {
  1143. msg := user.bridge.DB.Message.GetByJID(portal.Key, msgID)
  1144. if msg == nil || msg.IsFakeMXID() {
  1145. continue
  1146. }
  1147. err := intent.MarkReadWithContent(portal.MXID, msg.MXID, &CustomReadReceipt{DoublePuppet: intent.IsCustomPuppet})
  1148. if err != nil {
  1149. user.log.Warnfln("Failed to mark message %s as read by %s: %v", msg.MXID, info.SenderJID, err)
  1150. }
  1151. }
  1152. }
  1153. }
  1154. func (user *User) HandleReceivedMessage(received whatsapp.ReceivedMessage) {
  1155. if received.Type == "read" {
  1156. go user.markSelfRead(received.Jid, received.Index)
  1157. } else {
  1158. user.log.Debugfln("Unknown received message type: %+v", received)
  1159. }
  1160. }
  1161. func (user *User) HandleReadMessage(read whatsapp.ReadMessage) {
  1162. user.log.Debugfln("Received chat read message: %+v", read)
  1163. go user.markSelfRead(read.Jid, "")
  1164. }
  1165. func (user *User) markSelfRead(jid, messageID string) {
  1166. if strings.HasSuffix(jid, whatsapp.OldUserSuffix) {
  1167. jid = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  1168. }
  1169. puppet := user.bridge.GetPuppetByJID(user.JID)
  1170. if puppet == nil {
  1171. return
  1172. }
  1173. intent := puppet.CustomIntent()
  1174. if intent == nil {
  1175. return
  1176. }
  1177. portal := user.GetPortalByJID(jid)
  1178. if portal == nil {
  1179. return
  1180. }
  1181. var message *database.Message
  1182. if messageID == "" {
  1183. message = user.bridge.DB.Message.GetLastInChat(portal.Key)
  1184. if message == nil {
  1185. return
  1186. }
  1187. user.log.Debugfln("User read chat %s/%s in WhatsApp mobile (last known event: %s/%s)", portal.Key.JID, portal.MXID, message.JID, message.MXID)
  1188. } else {
  1189. message = user.bridge.DB.Message.GetByJID(portal.Key, messageID)
  1190. if message == nil || message.IsFakeMXID() {
  1191. return
  1192. }
  1193. user.log.Debugfln("User read message %s/%s in %s/%s in WhatsApp mobile", message.JID, message.MXID, portal.Key.JID, portal.MXID)
  1194. }
  1195. err := intent.MarkReadWithContent(portal.MXID, message.MXID, &CustomReadReceipt{DoublePuppet: true})
  1196. if err != nil {
  1197. user.log.Warnfln("Failed to bridge own read receipt in %s: %v", jid, err)
  1198. }
  1199. }
  1200. func (user *User) HandleCommand(cmd whatsapp.JSONCommand) {
  1201. switch cmd.Type {
  1202. case whatsapp.CommandPicture:
  1203. if strings.HasSuffix(cmd.JID, whatsapp.NewUserSuffix) {
  1204. puppet := user.bridge.GetPuppetByJID(cmd.JID)
  1205. go puppet.UpdateAvatar(user, cmd.ProfilePicInfo)
  1206. } else if user.bridge.Config.Bridge.ChatMetaSync {
  1207. portal := user.GetPortalByJID(cmd.JID)
  1208. go portal.UpdateAvatar(user, cmd.ProfilePicInfo, true)
  1209. }
  1210. case whatsapp.CommandDisconnect:
  1211. if cmd.Kind == "replaced" {
  1212. user.cleanDisconnection = true
  1213. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server because you opened another WhatsApp Web client.\n\n" +
  1214. "Use the `reconnect` command to disconnect the other client and resume bridging.")
  1215. } else {
  1216. user.log.Warnln("Unknown kind of disconnect:", string(cmd.Raw))
  1217. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server (reason code: %s).\n\n"+
  1218. "Use the `reconnect` command to reconnect.", cmd.Kind)
  1219. }
  1220. }
  1221. }
  1222. func (user *User) HandleChatUpdate(cmd whatsapp.ChatUpdate) {
  1223. if cmd.Command != whatsapp.ChatUpdateCommandAction {
  1224. return
  1225. }
  1226. portal := user.GetPortalByJID(cmd.JID)
  1227. if len(portal.MXID) == 0 {
  1228. if cmd.Data.Action == whatsapp.ChatActionIntroduce || cmd.Data.Action == whatsapp.ChatActionCreate {
  1229. go func() {
  1230. err := portal.CreateMatrixRoom(user)
  1231. if err != nil {
  1232. user.log.Errorln("Failed to create portal room after receiving join event:", err)
  1233. }
  1234. }()
  1235. }
  1236. return
  1237. }
  1238. // These don't come down the message history :(
  1239. switch cmd.Data.Action {
  1240. case whatsapp.ChatActionAddTopic:
  1241. go portal.UpdateTopic(cmd.Data.AddTopic.Topic, cmd.Data.SenderJID, nil, true)
  1242. case whatsapp.ChatActionRemoveTopic:
  1243. go portal.UpdateTopic("", cmd.Data.SenderJID, nil, true)
  1244. case whatsapp.ChatActionRemove:
  1245. // We ignore leaving groups in the message history to avoid accidentally leaving rejoined groups,
  1246. // but if we get a real-time command that says we left, it should be safe to bridge it.
  1247. if !user.bridge.Config.Bridge.ChatMetaSync {
  1248. for _, jid := range cmd.Data.UserChange.JIDs {
  1249. if jid == user.JID {
  1250. go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1251. break
  1252. }
  1253. }
  1254. }
  1255. }
  1256. if !user.bridge.Config.Bridge.ChatMetaSync {
  1257. // Ignore chat update commands, we're relying on the message history.
  1258. return
  1259. }
  1260. switch cmd.Data.Action {
  1261. case whatsapp.ChatActionNameChange:
  1262. go portal.UpdateName(cmd.Data.NameChange.Name, cmd.Data.SenderJID, nil, true)
  1263. case whatsapp.ChatActionPromote:
  1264. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, true)
  1265. case whatsapp.ChatActionDemote:
  1266. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, false)
  1267. case whatsapp.ChatActionAnnounce:
  1268. go portal.RestrictMessageSending(cmd.Data.Announce)
  1269. case whatsapp.ChatActionRestrict:
  1270. go portal.RestrictMetadataChanges(cmd.Data.Restrict)
  1271. case whatsapp.ChatActionRemove:
  1272. go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1273. case whatsapp.ChatActionAdd:
  1274. go portal.HandleWhatsAppInvite(user, cmd.Data.SenderJID, nil, cmd.Data.UserChange.JIDs)
  1275. case whatsapp.ChatActionIntroduce:
  1276. if cmd.Data.SenderJID != "unknown" {
  1277. go portal.Sync(user, whatsapp.Contact{JID: portal.Key.JID})
  1278. }
  1279. }
  1280. }
  1281. func (user *User) HandleConnInfo(info whatsapp.ConnInfo) {
  1282. if user.Session != nil && info.Connected && len(info.ClientToken) > 0 {
  1283. user.log.Debugln("Received new tokens")
  1284. user.Session.ClientToken = info.ClientToken
  1285. user.Session.ServerToken = info.ServerToken
  1286. user.Session.Wid = info.WID
  1287. user.Update()
  1288. }
  1289. if len(info.PushName) > 0 {
  1290. user.pushName = info.PushName
  1291. }
  1292. }
  1293. func (user *User) HandleJSONMessage(evt whatsapp.RawJSONMessage) {
  1294. if !json.Valid(evt.RawMessage) {
  1295. return
  1296. }
  1297. user.log.Debugfln("JSON message with tag %s: %s", evt.Tag, evt.RawMessage)
  1298. user.updateLastConnectionIfNecessary()
  1299. }
  1300. func (user *User) NeedsRelaybot(portal *Portal) bool {
  1301. return !user.HasSession() || !user.IsInPortal(portal.Key)
  1302. }