user.go 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409
  1. // mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
  2. // Copyright (C) 2020 Tulir Asokan
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "context"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "net/http"
  23. "sort"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/skip2/go-qrcode"
  29. log "maunium.net/go/maulogger/v2"
  30. "maunium.net/go/mautrix/appservice"
  31. "maunium.net/go/mautrix/pushrules"
  32. "github.com/Rhymen/go-whatsapp"
  33. waBinary "github.com/Rhymen/go-whatsapp/binary"
  34. waProto "github.com/Rhymen/go-whatsapp/binary/proto"
  35. "maunium.net/go/mautrix"
  36. "maunium.net/go/mautrix/event"
  37. "maunium.net/go/mautrix/format"
  38. "maunium.net/go/mautrix/id"
  39. "maunium.net/go/mautrix-whatsapp/database"
  40. )
  41. type User struct {
  42. *database.User
  43. Conn *whatsapp.Conn
  44. bridge *Bridge
  45. log log.Logger
  46. Admin bool
  47. Whitelisted bool
  48. RelaybotWhitelisted bool
  49. IsRelaybot bool
  50. ConnectionErrors int
  51. CommunityID string
  52. cleanDisconnection bool
  53. batteryWarningsSent int
  54. lastReconnection int64
  55. pushName string
  56. chatListReceived chan struct{}
  57. syncPortalsDone chan struct{}
  58. messageInput chan PortalMessage
  59. messageOutput chan PortalMessage
  60. syncStart chan struct{}
  61. syncWait sync.WaitGroup
  62. syncing int32
  63. mgmtCreateLock sync.Mutex
  64. connLock sync.Mutex
  65. cancelReconnect func()
  66. prevBridgeStatus *BridgeState
  67. }
  68. func (bridge *Bridge) GetUserByMXID(userID id.UserID) *User {
  69. _, isPuppet := bridge.ParsePuppetMXID(userID)
  70. if isPuppet || userID == bridge.Bot.UserID {
  71. return nil
  72. }
  73. bridge.usersLock.Lock()
  74. defer bridge.usersLock.Unlock()
  75. user, ok := bridge.usersByMXID[userID]
  76. if !ok {
  77. return bridge.loadDBUser(bridge.DB.User.GetByMXID(userID), &userID)
  78. }
  79. return user
  80. }
  81. func (bridge *Bridge) GetUserByJID(userID whatsapp.JID) *User {
  82. bridge.usersLock.Lock()
  83. defer bridge.usersLock.Unlock()
  84. user, ok := bridge.usersByJID[userID]
  85. if !ok {
  86. return bridge.loadDBUser(bridge.DB.User.GetByJID(userID), nil)
  87. }
  88. return user
  89. }
  90. func (user *User) addToJIDMap() {
  91. user.bridge.usersLock.Lock()
  92. user.bridge.usersByJID[user.JID] = user
  93. user.bridge.usersLock.Unlock()
  94. }
  95. func (user *User) removeFromJIDMap() {
  96. user.bridge.usersLock.Lock()
  97. jidUser, ok := user.bridge.usersByJID[user.JID]
  98. if ok && user == jidUser {
  99. delete(user.bridge.usersByJID, user.JID)
  100. }
  101. user.bridge.usersLock.Unlock()
  102. user.bridge.Metrics.TrackLoginState(user.JID, false)
  103. user.sendBridgeState(BridgeState{Error: WANotLoggedIn})
  104. }
  105. func (bridge *Bridge) GetAllUsers() []*User {
  106. bridge.usersLock.Lock()
  107. defer bridge.usersLock.Unlock()
  108. dbUsers := bridge.DB.User.GetAll()
  109. output := make([]*User, len(dbUsers))
  110. for index, dbUser := range dbUsers {
  111. user, ok := bridge.usersByMXID[dbUser.MXID]
  112. if !ok {
  113. user = bridge.loadDBUser(dbUser, nil)
  114. }
  115. output[index] = user
  116. }
  117. return output
  118. }
  119. func (bridge *Bridge) loadDBUser(dbUser *database.User, mxid *id.UserID) *User {
  120. if dbUser == nil {
  121. if mxid == nil {
  122. return nil
  123. }
  124. dbUser = bridge.DB.User.New()
  125. dbUser.MXID = *mxid
  126. dbUser.Insert()
  127. }
  128. user := bridge.NewUser(dbUser)
  129. bridge.usersByMXID[user.MXID] = user
  130. if len(user.JID) > 0 {
  131. bridge.usersByJID[user.JID] = user
  132. }
  133. if len(user.ManagementRoom) > 0 {
  134. bridge.managementRooms[user.ManagementRoom] = user
  135. }
  136. return user
  137. }
  138. func (user *User) GetPortals() []*Portal {
  139. keys := user.User.GetPortalKeys()
  140. portals := make([]*Portal, len(keys))
  141. user.bridge.portalsLock.Lock()
  142. for i, key := range keys {
  143. portal, ok := user.bridge.portalsByJID[key]
  144. if !ok {
  145. portal = user.bridge.loadDBPortal(user.bridge.DB.Portal.GetByJID(key), &key)
  146. }
  147. portals[i] = portal
  148. }
  149. user.bridge.portalsLock.Unlock()
  150. return portals
  151. }
  152. func (bridge *Bridge) NewUser(dbUser *database.User) *User {
  153. user := &User{
  154. User: dbUser,
  155. bridge: bridge,
  156. log: bridge.Log.Sub("User").Sub(string(dbUser.MXID)),
  157. IsRelaybot: false,
  158. chatListReceived: make(chan struct{}, 1),
  159. syncPortalsDone: make(chan struct{}, 1),
  160. syncStart: make(chan struct{}, 1),
  161. messageInput: make(chan PortalMessage),
  162. messageOutput: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer),
  163. }
  164. user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID)
  165. user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
  166. user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
  167. go user.handleMessageLoop()
  168. go user.runMessageRingBuffer()
  169. return user
  170. }
  171. func (user *User) GetManagementRoom() id.RoomID {
  172. if len(user.ManagementRoom) == 0 {
  173. user.mgmtCreateLock.Lock()
  174. defer user.mgmtCreateLock.Unlock()
  175. if len(user.ManagementRoom) > 0 {
  176. return user.ManagementRoom
  177. }
  178. resp, err := user.bridge.Bot.CreateRoom(&mautrix.ReqCreateRoom{
  179. Topic: "WhatsApp bridge notices",
  180. IsDirect: true,
  181. })
  182. if err != nil {
  183. user.log.Errorln("Failed to auto-create management room:", err)
  184. } else {
  185. user.SetManagementRoom(resp.RoomID)
  186. }
  187. }
  188. return user.ManagementRoom
  189. }
  190. func (user *User) SetManagementRoom(roomID id.RoomID) {
  191. existingUser, ok := user.bridge.managementRooms[roomID]
  192. if ok {
  193. existingUser.ManagementRoom = ""
  194. existingUser.Update()
  195. }
  196. user.ManagementRoom = roomID
  197. user.bridge.managementRooms[user.ManagementRoom] = user
  198. user.Update()
  199. }
  200. func (user *User) SetSession(session *whatsapp.Session) {
  201. if session == nil {
  202. user.Session = nil
  203. user.LastConnection = 0
  204. } else if len(session.Wid) > 0 {
  205. user.Session = session
  206. } else {
  207. return
  208. }
  209. user.Update()
  210. }
  211. func (user *User) Connect(evenIfNoSession bool) bool {
  212. user.connLock.Lock()
  213. if user.Conn != nil {
  214. user.connLock.Unlock()
  215. if user.Conn.IsConnected() {
  216. return true
  217. } else {
  218. return user.RestoreSession()
  219. }
  220. } else if !evenIfNoSession && user.Session == nil {
  221. user.connLock.Unlock()
  222. return false
  223. }
  224. user.log.Debugln("Connecting to WhatsApp")
  225. if user.Session != nil {
  226. user.sendBridgeState(BridgeState{Error: WAConnecting})
  227. }
  228. timeout := time.Duration(user.bridge.Config.Bridge.ConnectionTimeout)
  229. if timeout == 0 {
  230. timeout = 20
  231. }
  232. user.Conn = whatsapp.NewConn(&whatsapp.Options{
  233. Timeout: timeout * time.Second,
  234. LongClientName: user.bridge.Config.WhatsApp.OSName,
  235. ShortClientName: user.bridge.Config.WhatsApp.BrowserName,
  236. ClientVersion: WAVersion,
  237. Log: user.log.Sub("Conn"),
  238. Handler: []whatsapp.Handler{user},
  239. })
  240. user.setupAdminTestHooks()
  241. user.connLock.Unlock()
  242. return user.RestoreSession()
  243. }
  244. func (user *User) DeleteConnection() {
  245. user.connLock.Lock()
  246. if user.Conn == nil {
  247. user.connLock.Unlock()
  248. return
  249. }
  250. err := user.Conn.Disconnect()
  251. if err != nil && err != whatsapp.ErrNotConnected {
  252. user.log.Warnln("Error disconnecting: %v", err)
  253. }
  254. user.Conn.RemoveHandlers()
  255. user.Conn = nil
  256. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  257. user.sendBridgeState(BridgeState{Error: WANotConnected})
  258. user.connLock.Unlock()
  259. }
  260. func (user *User) RestoreSession() bool {
  261. if user.Session != nil {
  262. user.Conn.SetSession(*user.Session)
  263. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
  264. defer cancel()
  265. err := user.Conn.Restore(true, ctx)
  266. if err == whatsapp.ErrAlreadyLoggedIn {
  267. return true
  268. } else if err != nil {
  269. user.log.Errorln("Failed to restore session:", err)
  270. if errors.Is(err, whatsapp.ErrUnpaired) {
  271. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
  272. "To re-pair your phone, log in again.")
  273. user.removeFromJIDMap()
  274. //user.JID = ""
  275. user.SetSession(nil)
  276. user.DeleteConnection()
  277. return false
  278. } else {
  279. user.sendBridgeState(BridgeState{Error: WANotConnected})
  280. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
  281. "on your phone is reachable and use `reconnect` to try connecting again.")
  282. }
  283. user.log.Debugln("Disconnecting due to failed session restore...")
  284. err = user.Conn.Disconnect()
  285. if err != nil {
  286. user.log.Errorln("Failed to disconnect after failed session restore:", err)
  287. }
  288. return false
  289. }
  290. user.ConnectionErrors = 0
  291. user.log.Debugln("Session restored successfully")
  292. user.PostLogin()
  293. }
  294. return true
  295. }
  296. func (user *User) HasSession() bool {
  297. return user.Session != nil
  298. }
  299. func (user *User) IsConnected() bool {
  300. return user.Conn != nil && user.Conn.IsConnected() && user.Conn.IsLoggedIn()
  301. }
  302. func (user *User) IsLoginInProgress() bool {
  303. return user.Conn != nil && user.Conn.IsLoginInProgress()
  304. }
  305. func (user *User) loginQrChannel(ce *CommandEvent, qrChan <-chan string, eventIDChan chan<- id.EventID) {
  306. var qrEventID id.EventID
  307. for code := range qrChan {
  308. if code == "stop" {
  309. return
  310. }
  311. qrCode, err := qrcode.Encode(code, qrcode.Low, 256)
  312. if err != nil {
  313. user.log.Errorln("Failed to encode QR code:", err)
  314. ce.Reply("Failed to encode QR code: %v", err)
  315. return
  316. }
  317. bot := user.bridge.AS.BotClient()
  318. resp, err := bot.UploadBytes(qrCode, "image/png")
  319. if err != nil {
  320. user.log.Errorln("Failed to upload QR code:", err)
  321. ce.Reply("Failed to upload QR code: %v", err)
  322. return
  323. }
  324. if qrEventID == "" {
  325. sendResp, err := bot.SendImage(ce.RoomID, code, resp.ContentURI)
  326. if err != nil {
  327. user.log.Errorln("Failed to send QR code to user:", err)
  328. return
  329. }
  330. qrEventID = sendResp.EventID
  331. eventIDChan <- qrEventID
  332. } else {
  333. _, err = bot.SendMessageEvent(ce.RoomID, event.EventMessage, &event.MessageEventContent{
  334. MsgType: event.MsgImage,
  335. Body: code,
  336. URL: resp.ContentURI.CUString(),
  337. NewContent: &event.MessageEventContent{
  338. MsgType: event.MsgImage,
  339. Body: code,
  340. URL: resp.ContentURI.CUString(),
  341. },
  342. RelatesTo: &event.RelatesTo{
  343. Type: event.RelReplace,
  344. EventID: qrEventID,
  345. },
  346. })
  347. if err != nil {
  348. user.log.Errorln("Failed to send edited QR code to user:", err)
  349. }
  350. }
  351. }
  352. }
  353. func (user *User) Login(ce *CommandEvent) {
  354. qrChan := make(chan string, 3)
  355. eventIDChan := make(chan id.EventID, 1)
  356. go user.loginQrChannel(ce, qrChan, eventIDChan)
  357. session, jid, err := user.Conn.Login(qrChan, nil)
  358. qrChan <- "stop"
  359. if err != nil {
  360. var eventID id.EventID
  361. select {
  362. case eventID = <-eventIDChan:
  363. default:
  364. }
  365. reply := event.MessageEventContent{
  366. MsgType: event.MsgText,
  367. }
  368. if err == whatsapp.ErrAlreadyLoggedIn {
  369. reply.Body = "You're already logged in"
  370. } else if err == whatsapp.ErrLoginInProgress {
  371. reply.Body = "You have a login in progress already."
  372. } else if err == whatsapp.ErrLoginTimedOut {
  373. reply.Body = "QR code scan timed out. Please try again."
  374. } else {
  375. user.log.Warnln("Failed to log in:", err)
  376. reply.Body = fmt.Sprintf("Unknown error while logging in: %v", err)
  377. }
  378. msg := reply
  379. if eventID != "" {
  380. msg.NewContent = &reply
  381. msg.RelatesTo = &event.RelatesTo{
  382. Type: event.RelReplace,
  383. EventID: eventID,
  384. }
  385. }
  386. _, _ = ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &msg)
  387. return
  388. }
  389. // TODO there's a bit of duplication between this and the provisioning API login method
  390. // Also between the two logout methods (commands.go and provisioning.go)
  391. user.log.Debugln("Successful login as", jid, "via command")
  392. user.ConnectionErrors = 0
  393. user.JID = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, 1)
  394. user.addToJIDMap()
  395. user.SetSession(&session)
  396. ce.Reply("Successfully logged in, synchronizing chats...")
  397. user.PostLogin()
  398. }
  399. type Chat struct {
  400. whatsapp.Chat
  401. Portal *Portal
  402. Contact whatsapp.Contact
  403. }
  404. type ChatList []Chat
  405. func (cl ChatList) Len() int {
  406. return len(cl)
  407. }
  408. func (cl ChatList) Less(i, j int) bool {
  409. return cl[i].LastMessageTime > cl[j].LastMessageTime
  410. }
  411. func (cl ChatList) Swap(i, j int) {
  412. cl[i], cl[j] = cl[j], cl[i]
  413. }
  414. func (user *User) PostLogin() {
  415. user.sendBridgeState(BridgeState{OK: true})
  416. user.bridge.Metrics.TrackConnectionState(user.JID, true)
  417. user.bridge.Metrics.TrackLoginState(user.JID, true)
  418. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  419. if !atomic.CompareAndSwapInt32(&user.syncing, 0, 1) {
  420. // TODO we should cleanly stop the old sync and start a new one instead of not starting a new one
  421. user.log.Warnln("There seems to be a post-sync already in progress, not starting a new one")
  422. return
  423. }
  424. user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
  425. user.chatListReceived = make(chan struct{}, 1)
  426. user.syncPortalsDone = make(chan struct{}, 1)
  427. user.syncWait.Add(1)
  428. user.syncStart <- struct{}{}
  429. go user.intPostLogin()
  430. }
  431. func (user *User) tryAutomaticDoublePuppeting() {
  432. if !user.bridge.Config.CanDoublePuppet(user.MXID) {
  433. return
  434. }
  435. user.log.Debugln("Checking if double puppeting needs to be enabled")
  436. puppet := user.bridge.GetPuppetByJID(user.JID)
  437. if len(puppet.CustomMXID) > 0 {
  438. user.log.Debugln("User already has double-puppeting enabled")
  439. // Custom puppet already enabled
  440. return
  441. }
  442. accessToken, err := puppet.loginWithSharedSecret(user.MXID)
  443. if err != nil {
  444. user.log.Warnln("Failed to login with shared secret:", err)
  445. return
  446. }
  447. err = puppet.SwitchCustomMXID(accessToken, user.MXID)
  448. if err != nil {
  449. puppet.log.Warnln("Failed to switch to auto-logined custom puppet:", err)
  450. return
  451. }
  452. user.log.Infoln("Successfully automatically enabled custom puppet")
  453. }
  454. func (user *User) sendBridgeNotice(formatString string, args ...interface{}) {
  455. notice := fmt.Sprintf(formatString, args...)
  456. _, err := user.bridge.Bot.SendNotice(user.GetManagementRoom(), notice)
  457. if err != nil {
  458. user.log.Warnf("Failed to send bridge notice \"%s\": %v", notice, err)
  459. }
  460. }
  461. func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface{}) {
  462. notice := fmt.Sprintf(formatString, args...)
  463. content := format.RenderMarkdown(notice, true, false)
  464. _, err := user.bridge.Bot.SendMessageEvent(user.GetManagementRoom(), event.EventMessage, content)
  465. if err != nil {
  466. user.log.Warnf("Failed to send bridge alert \"%s\": %v", notice, err)
  467. }
  468. }
  469. func (user *User) postConnPing() bool {
  470. user.log.Debugln("Making post-connection ping")
  471. var err error
  472. for i := 0; ; i++ {
  473. err = user.Conn.AdminTest()
  474. if err == nil {
  475. user.log.Debugln("Post-connection ping OK")
  476. return true
  477. } else if errors.Is(err, whatsapp.ErrConnectionTimeout) && i < 5 {
  478. user.log.Warnfln("Post-connection ping timed out, sending new one")
  479. } else {
  480. break
  481. }
  482. }
  483. user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err)
  484. disconnectErr := user.Conn.Disconnect()
  485. if disconnectErr != nil {
  486. user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
  487. }
  488. user.sendBridgeState(BridgeState{Error: WANotConnected})
  489. user.bridge.Metrics.TrackDisconnection(user.MXID)
  490. go func() {
  491. time.Sleep(1 * time.Second)
  492. user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err))
  493. }()
  494. return false
  495. }
  496. func (user *User) intPostLogin() {
  497. defer atomic.StoreInt32(&user.syncing, 0)
  498. defer user.syncWait.Done()
  499. user.lastReconnection = time.Now().Unix()
  500. user.createCommunity()
  501. user.tryAutomaticDoublePuppeting()
  502. user.log.Debugln("Waiting for chat list receive confirmation")
  503. select {
  504. case <-user.chatListReceived:
  505. user.log.Debugln("Chat list receive confirmation received in PostLogin")
  506. case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
  507. user.log.Warnln("Timed out waiting for chat list to arrive!")
  508. user.postConnPing()
  509. return
  510. }
  511. if !user.postConnPing() {
  512. user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
  513. return
  514. }
  515. user.log.Debugln("Waiting for portal sync complete confirmation")
  516. select {
  517. case <-user.syncPortalsDone:
  518. user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.")
  519. // TODO this is too short, maybe a per-portal duration?
  520. case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
  521. user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
  522. }
  523. }
  524. type NormalMessage interface {
  525. GetInfo() whatsapp.MessageInfo
  526. }
  527. func (user *User) HandleEvent(event interface{}) {
  528. switch v := event.(type) {
  529. case NormalMessage:
  530. info := v.GetInfo()
  531. user.messageInput <- PortalMessage{info.RemoteJid, user, v, info.Timestamp}
  532. case whatsapp.MessageRevocation:
  533. user.messageInput <- PortalMessage{v.RemoteJid, user, v, 0}
  534. case whatsapp.StreamEvent:
  535. user.HandleStreamEvent(v)
  536. case []whatsapp.Chat:
  537. user.HandleChatList(v)
  538. case []whatsapp.Contact:
  539. user.HandleContactList(v)
  540. case error:
  541. user.HandleError(v)
  542. case whatsapp.Contact:
  543. go user.HandleNewContact(v)
  544. case whatsapp.BatteryMessage:
  545. user.HandleBatteryMessage(v)
  546. case whatsapp.CallInfo:
  547. user.HandleCallInfo(v)
  548. case whatsapp.PresenceEvent:
  549. go user.HandlePresence(v)
  550. case whatsapp.JSONMsgInfo:
  551. go user.HandleMsgInfo(v)
  552. case whatsapp.ReceivedMessage:
  553. user.HandleReceivedMessage(v)
  554. case whatsapp.ReadMessage:
  555. user.HandleReadMessage(v)
  556. case whatsapp.JSONCommand:
  557. user.HandleCommand(v)
  558. case whatsapp.ChatUpdate:
  559. user.HandleChatUpdate(v)
  560. case whatsapp.ConnInfo:
  561. user.HandleConnInfo(v)
  562. case whatsapp.MuteMessage:
  563. portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
  564. if portal != nil {
  565. go user.updateChatMute(nil, portal, v.MutedUntil)
  566. }
  567. case whatsapp.ArchiveMessage:
  568. portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
  569. if portal != nil {
  570. go user.updateChatTag(nil, portal, user.bridge.Config.Bridge.ArchiveTag, v.IsArchived)
  571. }
  572. case whatsapp.PinMessage:
  573. portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
  574. if portal != nil {
  575. go user.updateChatTag(nil, portal, user.bridge.Config.Bridge.PinnedTag, v.IsPinned)
  576. }
  577. case whatsapp.RawJSONMessage:
  578. user.HandleJSONMessage(v)
  579. case *waProto.WebMessageInfo:
  580. user.updateLastConnectionIfNecessary()
  581. // TODO trace log
  582. //user.log.Debugfln("WebMessageInfo: %+v", v)
  583. case *waBinary.Node:
  584. user.log.Debugfln("Unknown binary message: %+v", v)
  585. default:
  586. user.log.Debugfln("Unknown type of event in HandleEvent: %T", v)
  587. }
  588. }
  589. func (user *User) HandleStreamEvent(evt whatsapp.StreamEvent) {
  590. if evt.Type == whatsapp.StreamSleep {
  591. if user.lastReconnection+60 > time.Now().Unix() {
  592. user.lastReconnection = 0
  593. user.log.Infoln("Stream went to sleep soon after reconnection, making new post-connection ping in 20 seconds")
  594. go func() {
  595. time.Sleep(20 * time.Second)
  596. // TODO if this happens during the post-login sync, it can get stuck forever
  597. // TODO check if the above is still true
  598. user.postConnPing()
  599. }()
  600. }
  601. } else {
  602. user.log.Infofln("Stream event: %+v", evt)
  603. }
  604. }
  605. func (user *User) HandleChatList(chats []whatsapp.Chat) {
  606. user.log.Infoln("Chat list received")
  607. chatMap := make(map[string]whatsapp.Chat)
  608. user.Conn.Store.ChatsLock.RLock()
  609. for _, chat := range user.Conn.Store.Chats {
  610. chatMap[chat.JID] = chat
  611. }
  612. user.Conn.Store.ChatsLock.RUnlock()
  613. for _, chat := range chats {
  614. chatMap[chat.JID] = chat
  615. }
  616. select {
  617. case user.chatListReceived <- struct{}{}:
  618. default:
  619. }
  620. go user.syncPortals(chatMap, false)
  621. }
  622. func (user *User) updateChatMute(intent *appservice.IntentAPI, portal *Portal, mutedUntil int64) {
  623. if len(portal.MXID) == 0 || !user.bridge.Config.Bridge.MuteBridging {
  624. return
  625. } else if intent == nil {
  626. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  627. if doublePuppet == nil || doublePuppet.CustomIntent() == nil {
  628. return
  629. }
  630. intent = doublePuppet.CustomIntent()
  631. }
  632. var err error
  633. if mutedUntil != -1 && mutedUntil < time.Now().Unix() {
  634. user.log.Debugfln("Portal %s is muted until %d, unmuting...", portal.MXID, mutedUntil)
  635. err = intent.DeletePushRule("global", pushrules.RoomRule, string(portal.MXID))
  636. } else {
  637. user.log.Debugfln("Portal %s is muted until %d, muting...", portal.MXID, mutedUntil)
  638. err = intent.PutPushRule("global", pushrules.RoomRule, string(portal.MXID), &mautrix.ReqPutPushRule{
  639. Actions: []pushrules.PushActionType{pushrules.ActionDontNotify},
  640. })
  641. }
  642. if err != nil && !errors.Is(err, mautrix.MNotFound) {
  643. user.log.Warnfln("Failed to update push rule for %s through double puppet: %v", portal.MXID, err)
  644. }
  645. }
  646. type CustomTagData struct {
  647. Order json.Number `json:"order"`
  648. DoublePuppet bool `json:"net.maunium.whatsapp.puppet"`
  649. }
  650. type CustomTagEventContent struct {
  651. Tags map[string]CustomTagData `json:"tags"`
  652. }
  653. func (user *User) updateChatTag(intent *appservice.IntentAPI, portal *Portal, tag string, active bool) {
  654. if len(portal.MXID) == 0 || len(tag) == 0 {
  655. return
  656. } else if intent == nil {
  657. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  658. if doublePuppet == nil || doublePuppet.CustomIntent() == nil {
  659. return
  660. }
  661. intent = doublePuppet.CustomIntent()
  662. }
  663. var existingTags CustomTagEventContent
  664. err := intent.GetTagsWithCustomData(portal.MXID, &existingTags)
  665. if err != nil && !errors.Is(err, mautrix.MNotFound) {
  666. user.log.Warnfln("Failed to get tags of %s: %v", portal.MXID, err)
  667. }
  668. currentTag, ok := existingTags.Tags[tag]
  669. if active && !ok {
  670. user.log.Debugln("Adding tag", tag, "to", portal.MXID)
  671. data := CustomTagData{"0.5", true}
  672. err = intent.AddTagWithCustomData(portal.MXID, tag, &data)
  673. } else if !active && ok && currentTag.DoublePuppet {
  674. user.log.Debugln("Removing tag", tag, "from", portal.MXID)
  675. err = intent.RemoveTag(portal.MXID, tag)
  676. } else {
  677. err = nil
  678. }
  679. if err != nil {
  680. user.log.Warnfln("Failed to update tag %s for %s through double puppet: %v", tag, portal.MXID, err)
  681. }
  682. }
  683. type CustomReadReceipt struct {
  684. Timestamp int64 `json:"ts,omitempty"`
  685. DoublePuppet bool `json:"net.maunium.whatsapp.puppet,omitempty"`
  686. }
  687. func (user *User) syncChatDoublePuppetDetails(doublePuppet *Puppet, chat Chat, justCreated bool) {
  688. if doublePuppet == nil || doublePuppet.CustomIntent() == nil || len(chat.Portal.MXID) == 0 {
  689. return
  690. }
  691. intent := doublePuppet.CustomIntent()
  692. if chat.UnreadCount == 0 && (justCreated || !user.bridge.Config.Bridge.MarkReadOnlyOnCreate) {
  693. lastMessage := user.bridge.DB.Message.GetLastInChatBefore(chat.Portal.Key, chat.ReceivedAt.Unix())
  694. if lastMessage != nil {
  695. err := intent.MarkReadWithContent(chat.Portal.MXID, lastMessage.MXID, &CustomReadReceipt{DoublePuppet: true})
  696. if err != nil {
  697. user.log.Warnfln("Failed to mark %s in %s as read after backfill: %v", lastMessage.MXID, chat.Portal.MXID, err)
  698. }
  699. }
  700. } else if chat.UnreadCount == -1 {
  701. user.log.Debugfln("Invalid unread count (missing field?) in chat info %+v", chat.Source)
  702. }
  703. if justCreated || !user.bridge.Config.Bridge.TagOnlyOnCreate {
  704. user.updateChatMute(intent, chat.Portal, chat.MutedUntil)
  705. user.updateChatTag(intent, chat.Portal, user.bridge.Config.Bridge.ArchiveTag, chat.IsArchived)
  706. user.updateChatTag(intent, chat.Portal, user.bridge.Config.Bridge.PinnedTag, chat.IsPinned)
  707. }
  708. }
  709. func (user *User) syncPortal(chat Chat) {
  710. // Don't sync unless chat meta sync is enabled or portal doesn't exist
  711. if user.bridge.Config.Bridge.ChatMetaSync || len(chat.Portal.MXID) == 0 {
  712. failedToCreate := chat.Portal.Sync(user, chat.Contact)
  713. if failedToCreate {
  714. return
  715. }
  716. }
  717. err := chat.Portal.BackfillHistory(user, chat.LastMessageTime)
  718. if err != nil {
  719. chat.Portal.log.Errorln("Error backfilling history:", err)
  720. }
  721. }
  722. func (user *User) collectChatList(chatMap map[string]whatsapp.Chat) ChatList {
  723. if chatMap == nil {
  724. chatMap = user.Conn.Store.Chats
  725. }
  726. user.log.Infoln("Reading chat list")
  727. chats := make(ChatList, 0, len(chatMap))
  728. existingKeys := user.GetInCommunityMap()
  729. portalKeys := make([]database.PortalKeyWithMeta, 0, len(chatMap))
  730. for _, chat := range chatMap {
  731. portal := user.GetPortalByJID(chat.JID)
  732. user.Conn.Store.ContactsLock.RLock()
  733. contact, _ := user.Conn.Store.Contacts[chat.JID]
  734. user.Conn.Store.ContactsLock.RUnlock()
  735. chats = append(chats, Chat{
  736. Chat: chat,
  737. Portal: portal,
  738. Contact: contact,
  739. })
  740. var inCommunity, ok bool
  741. if inCommunity, ok = existingKeys[portal.Key]; !ok || !inCommunity {
  742. inCommunity = user.addPortalToCommunity(portal)
  743. if portal.IsPrivateChat() {
  744. puppet := user.bridge.GetPuppetByJID(portal.Key.JID)
  745. user.addPuppetToCommunity(puppet)
  746. }
  747. }
  748. portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: portal.Key, InCommunity: inCommunity})
  749. }
  750. user.log.Infoln("Read chat list, updating user-portal mapping")
  751. err := user.SetPortalKeys(portalKeys)
  752. if err != nil {
  753. user.log.Warnln("Failed to update user-portal mapping:", err)
  754. }
  755. sort.Sort(chats)
  756. return chats
  757. }
  758. func (user *User) syncPortals(chatMap map[string]whatsapp.Chat, createAll bool) {
  759. // TODO use contexts instead of checking if user.Conn is the same?
  760. connAtStart := user.Conn
  761. chats := user.collectChatList(chatMap)
  762. limit := user.bridge.Config.Bridge.InitialChatSync
  763. if limit < 0 {
  764. limit = len(chats)
  765. }
  766. if user.Conn != connAtStart {
  767. user.log.Debugln("Connection seems to have changed before sync, cancelling")
  768. return
  769. }
  770. now := time.Now().Unix()
  771. user.log.Infoln("Syncing portals")
  772. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  773. for i, chat := range chats {
  774. if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
  775. break
  776. }
  777. create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
  778. if len(chat.Portal.MXID) > 0 || create || createAll {
  779. user.log.Debugfln("Syncing chat %+v", chat.Chat.Source)
  780. justCreated := len(chat.Portal.MXID) == 0
  781. user.syncPortal(chat)
  782. user.syncChatDoublePuppetDetails(doublePuppet, chat, justCreated)
  783. }
  784. }
  785. if user.Conn != connAtStart {
  786. user.log.Debugln("Connection seems to have changed during sync, cancelling")
  787. return
  788. }
  789. user.UpdateDirectChats(nil)
  790. user.log.Infoln("Finished syncing portals")
  791. select {
  792. case user.syncPortalsDone <- struct{}{}:
  793. default:
  794. }
  795. }
  796. func (user *User) getDirectChats() map[id.UserID][]id.RoomID {
  797. res := make(map[id.UserID][]id.RoomID)
  798. privateChats := user.bridge.DB.Portal.FindPrivateChats(user.JID)
  799. for _, portal := range privateChats {
  800. if len(portal.MXID) > 0 {
  801. res[user.bridge.FormatPuppetMXID(portal.Key.JID)] = []id.RoomID{portal.MXID}
  802. }
  803. }
  804. return res
  805. }
  806. func (user *User) UpdateDirectChats(chats map[id.UserID][]id.RoomID) {
  807. if !user.bridge.Config.Bridge.SyncDirectChatList {
  808. return
  809. }
  810. puppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  811. if puppet == nil || puppet.CustomIntent() == nil {
  812. return
  813. }
  814. intent := puppet.CustomIntent()
  815. method := http.MethodPatch
  816. if chats == nil {
  817. chats = user.getDirectChats()
  818. method = http.MethodPut
  819. }
  820. user.log.Debugln("Updating m.direct list on homeserver")
  821. var err error
  822. if user.bridge.Config.Homeserver.Asmux {
  823. urlPath := intent.BuildBaseURL("_matrix", "client", "unstable", "com.beeper.asmux", "dms")
  824. _, err = intent.MakeFullRequest(mautrix.FullRequest{
  825. Method: method,
  826. URL: urlPath,
  827. Headers: http.Header{"X-Asmux-Auth": {user.bridge.AS.Registration.AppToken}},
  828. RequestJSON: chats,
  829. })
  830. } else {
  831. existingChats := make(map[id.UserID][]id.RoomID)
  832. err = intent.GetAccountData(event.AccountDataDirectChats.Type, &existingChats)
  833. if err != nil {
  834. user.log.Warnln("Failed to get m.direct list to update it:", err)
  835. return
  836. }
  837. for userID, rooms := range existingChats {
  838. if _, ok := user.bridge.ParsePuppetMXID(userID); !ok {
  839. // This is not a ghost user, include it in the new list
  840. chats[userID] = rooms
  841. } else if _, ok := chats[userID]; !ok && method == http.MethodPatch {
  842. // This is a ghost user, but we're not replacing the whole list, so include it too
  843. chats[userID] = rooms
  844. }
  845. }
  846. err = intent.SetAccountData(event.AccountDataDirectChats.Type, &chats)
  847. }
  848. if err != nil {
  849. user.log.Warnln("Failed to update m.direct list:", err)
  850. }
  851. }
  852. func (user *User) HandleContactList(contacts []whatsapp.Contact) {
  853. contactMap := make(map[whatsapp.JID]whatsapp.Contact)
  854. for _, contact := range contacts {
  855. contactMap[contact.JID] = contact
  856. }
  857. go user.syncPuppets(contactMap)
  858. }
  859. func (user *User) syncPuppets(contacts map[whatsapp.JID]whatsapp.Contact) {
  860. if contacts == nil {
  861. contacts = user.Conn.Store.Contacts
  862. }
  863. _, hasSelf := contacts[user.JID]
  864. if !hasSelf {
  865. contacts[user.JID] = whatsapp.Contact{
  866. Name: user.pushName,
  867. Notify: user.pushName,
  868. JID: user.JID,
  869. }
  870. }
  871. user.log.Infoln("Syncing puppet info from contacts")
  872. for jid, contact := range contacts {
  873. if strings.HasSuffix(jid, whatsapp.NewUserSuffix) {
  874. puppet := user.bridge.GetPuppetByJID(contact.JID)
  875. puppet.Sync(user, contact)
  876. } else if strings.HasSuffix(jid, whatsapp.BroadcastSuffix) {
  877. portal := user.GetPortalByJID(contact.JID)
  878. portal.Sync(user, contact)
  879. }
  880. }
  881. user.log.Infoln("Finished syncing puppet info from contacts")
  882. }
  883. func (user *User) updateLastConnectionIfNecessary() {
  884. if user.LastConnection+60 < time.Now().Unix() {
  885. user.UpdateLastConnection()
  886. }
  887. }
  888. func (user *User) HandleError(err error) {
  889. if !errors.Is(err, whatsapp.ErrInvalidWsData) {
  890. user.log.Errorfln("WhatsApp error: %v", err)
  891. }
  892. if closed, ok := err.(*whatsapp.ErrConnectionClosed); ok {
  893. user.bridge.Metrics.TrackDisconnection(user.MXID)
  894. if closed.Code == 1000 && user.cleanDisconnection {
  895. user.cleanDisconnection = false
  896. if !user.bridge.Config.Bridge.AggressiveReconnect {
  897. user.sendBridgeState(BridgeState{Error: WANotConnected})
  898. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  899. user.log.Infoln("Clean disconnection by server")
  900. return
  901. } else {
  902. user.log.Debugln("Clean disconnection by server, but aggressive reconnection is enabled")
  903. }
  904. }
  905. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection was closed with websocket status code %d", closed.Code))
  906. } else if failed, ok := err.(*whatsapp.ErrConnectionFailed); ok {
  907. disconnectErr := user.Conn.Disconnect()
  908. if disconnectErr != nil {
  909. user.log.Warnln("Failed to disconnect after connection fail:", disconnectErr)
  910. }
  911. user.bridge.Metrics.TrackDisconnection(user.MXID)
  912. user.ConnectionErrors++
  913. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", failed.Err))
  914. } else if err == whatsapp.ErrPingFalse || err == whatsapp.ErrWebsocketKeepaliveFailed {
  915. disconnectErr := user.Conn.Disconnect()
  916. if disconnectErr != nil {
  917. user.log.Warnln("Failed to disconnect after failed ping:", disconnectErr)
  918. }
  919. user.bridge.Metrics.TrackDisconnection(user.MXID)
  920. user.ConnectionErrors++
  921. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", err))
  922. }
  923. // Otherwise unknown error, probably mostly harmless
  924. }
  925. func (user *User) tryReconnect(msg string) {
  926. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  927. if user.ConnectionErrors > user.bridge.Config.Bridge.MaxConnectionAttempts {
  928. user.sendMarkdownBridgeAlert("%s. Use the `reconnect` command to reconnect.", msg)
  929. user.sendBridgeState(BridgeState{Error: WANotConnected})
  930. return
  931. }
  932. if user.bridge.Config.Bridge.ReportConnectionRetry {
  933. user.sendBridgeNotice("%s. Reconnecting...", msg)
  934. // Don't want the same error to be repeated
  935. msg = ""
  936. }
  937. var tries uint
  938. var exponentialBackoff bool
  939. baseDelay := time.Duration(user.bridge.Config.Bridge.ConnectionRetryDelay)
  940. if baseDelay < 0 {
  941. exponentialBackoff = true
  942. baseDelay = -baseDelay + 1
  943. }
  944. delay := baseDelay
  945. ctx, cancel := context.WithCancel(context.Background())
  946. defer cancel()
  947. user.cancelReconnect = cancel
  948. for user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  949. select {
  950. case <-ctx.Done():
  951. user.log.Debugln("tryReconnect context cancelled, aborting reconnection attempts")
  952. return
  953. default:
  954. }
  955. user.sendBridgeState(BridgeState{Error: WAConnecting})
  956. err := user.Conn.Restore(true, ctx)
  957. if err == nil {
  958. user.ConnectionErrors = 0
  959. if user.bridge.Config.Bridge.ReportConnectionRetry {
  960. user.sendBridgeNotice("Reconnected successfully")
  961. }
  962. user.PostLogin()
  963. return
  964. } else if errors.Is(err, whatsapp.ErrBadRequest) {
  965. user.log.Warnln("Got init 400 error when trying to reconnect, resetting connection...")
  966. err = user.Conn.Disconnect()
  967. if err != nil {
  968. user.log.Debugln("Error while disconnecting for connection reset:", err)
  969. }
  970. } else if errors.Is(err, whatsapp.ErrUnpaired) {
  971. user.log.Errorln("Got init 401 (unpaired) error when trying to reconnect, not retrying")
  972. user.removeFromJIDMap()
  973. //user.JID = ""
  974. user.SetSession(nil)
  975. user.DeleteConnection()
  976. user.sendMarkdownBridgeAlert("\u26a0 Failed to reconnect to WhatsApp: unpaired from phone. " +
  977. "To re-pair your phone, log in again.")
  978. user.sendBridgeState(BridgeState{Error: WANotLoggedIn})
  979. return
  980. } else if errors.Is(err, whatsapp.ErrAlreadyLoggedIn) {
  981. user.log.Warnln("Reconnection said we're already logged in, not trying anymore")
  982. return
  983. } else {
  984. user.log.Errorln("Error while trying to reconnect after disconnection:", err)
  985. }
  986. tries++
  987. user.ConnectionErrors++
  988. if user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  989. if exponentialBackoff {
  990. delay = (1 << tries) + baseDelay
  991. }
  992. if user.bridge.Config.Bridge.ReportConnectionRetry {
  993. user.sendBridgeNotice("Reconnection attempt failed: %v. Retrying in %d seconds...", err, delay)
  994. }
  995. time.Sleep(delay * time.Second)
  996. }
  997. }
  998. user.sendBridgeState(BridgeState{Error: WANotConnected})
  999. if user.bridge.Config.Bridge.ReportConnectionRetry {
  1000. user.sendMarkdownBridgeAlert("%d reconnection attempts failed. Use the `reconnect` command to try to reconnect manually.", tries)
  1001. } else {
  1002. user.sendMarkdownBridgeAlert("\u26a0 %s. Additionally, %d reconnection attempts failed. Use the `reconnect` command to try to reconnect.", msg, tries)
  1003. }
  1004. }
  1005. func (user *User) PortalKey(jid whatsapp.JID) database.PortalKey {
  1006. return database.NewPortalKey(jid, user.JID)
  1007. }
  1008. func (user *User) GetPortalByJID(jid whatsapp.JID) *Portal {
  1009. return user.bridge.GetPortalByJID(user.PortalKey(jid))
  1010. }
  1011. func (user *User) runMessageRingBuffer() {
  1012. for msg := range user.messageInput {
  1013. select {
  1014. case user.messageOutput <- msg:
  1015. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  1016. default:
  1017. dropped := <-user.messageOutput
  1018. user.log.Warnln("Buffer is full, dropping message in", dropped.chat)
  1019. user.messageOutput <- msg
  1020. }
  1021. }
  1022. }
  1023. func (user *User) handleMessageLoop() {
  1024. for {
  1025. select {
  1026. case msg := <-user.messageOutput:
  1027. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  1028. user.GetPortalByJID(msg.chat).messages <- msg
  1029. case <-user.syncStart:
  1030. user.log.Debugln("Processing of incoming messages is locked")
  1031. user.bridge.Metrics.TrackSyncLock(user.JID, true)
  1032. user.syncWait.Wait()
  1033. user.bridge.Metrics.TrackSyncLock(user.JID, false)
  1034. user.log.Debugln("Processing of incoming messages unlocked")
  1035. }
  1036. }
  1037. }
  1038. func (user *User) HandleNewContact(contact whatsapp.Contact) {
  1039. user.log.Debugfln("Contact message: %+v", contact)
  1040. if strings.HasSuffix(contact.JID, whatsapp.OldUserSuffix) {
  1041. contact.JID = strings.Replace(contact.JID, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  1042. }
  1043. if strings.HasSuffix(contact.JID, whatsapp.NewUserSuffix) {
  1044. puppet := user.bridge.GetPuppetByJID(contact.JID)
  1045. puppet.UpdateName(user, contact)
  1046. } else if strings.HasSuffix(contact.JID, whatsapp.BroadcastSuffix) {
  1047. portal := user.GetPortalByJID(contact.JID)
  1048. portal.UpdateName(contact.Name, "", nil, true)
  1049. }
  1050. }
  1051. func (user *User) HandleBatteryMessage(battery whatsapp.BatteryMessage) {
  1052. user.log.Debugfln("Battery message: %+v", battery)
  1053. var notice string
  1054. if !battery.Plugged && battery.Percentage < 15 && user.batteryWarningsSent < 1 {
  1055. notice = fmt.Sprintf("Phone battery low (%d %% remaining)", battery.Percentage)
  1056. user.batteryWarningsSent = 1
  1057. } else if !battery.Plugged && battery.Percentage < 5 && user.batteryWarningsSent < 2 {
  1058. notice = fmt.Sprintf("Phone battery very low (%d %% remaining)", battery.Percentage)
  1059. user.batteryWarningsSent = 2
  1060. } else if battery.Percentage > 15 || battery.Plugged {
  1061. user.batteryWarningsSent = 0
  1062. }
  1063. if notice != "" {
  1064. go user.sendBridgeNotice("%s", notice)
  1065. }
  1066. }
  1067. type FakeMessage struct {
  1068. Text string
  1069. ID string
  1070. Alert bool
  1071. }
  1072. func (user *User) HandleCallInfo(info whatsapp.CallInfo) {
  1073. if info.Data != nil {
  1074. return
  1075. }
  1076. data := FakeMessage{
  1077. ID: info.ID,
  1078. }
  1079. switch info.Type {
  1080. case whatsapp.CallOffer:
  1081. if !user.bridge.Config.Bridge.CallNotices.Start {
  1082. return
  1083. }
  1084. data.Text = "Incoming call"
  1085. data.Alert = true
  1086. case whatsapp.CallOfferVideo:
  1087. if !user.bridge.Config.Bridge.CallNotices.Start {
  1088. return
  1089. }
  1090. data.Text = "Incoming video call"
  1091. data.Alert = true
  1092. case whatsapp.CallTerminate:
  1093. if !user.bridge.Config.Bridge.CallNotices.End {
  1094. return
  1095. }
  1096. data.Text = "Call ended"
  1097. data.ID += "E"
  1098. default:
  1099. return
  1100. }
  1101. portal := user.GetPortalByJID(info.From)
  1102. if portal != nil {
  1103. portal.messages <- PortalMessage{info.From, user, data, 0}
  1104. }
  1105. }
  1106. func (user *User) HandlePresence(info whatsapp.PresenceEvent) {
  1107. puppet := user.bridge.GetPuppetByJID(info.SenderJID)
  1108. switch info.Status {
  1109. case whatsapp.PresenceUnavailable:
  1110. _ = puppet.DefaultIntent().SetPresence("offline")
  1111. case whatsapp.PresenceAvailable:
  1112. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  1113. portal := user.bridge.GetPortalByMXID(puppet.typingIn)
  1114. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  1115. puppet.typingIn = ""
  1116. puppet.typingAt = 0
  1117. } else {
  1118. _ = puppet.DefaultIntent().SetPresence("online")
  1119. }
  1120. case whatsapp.PresenceComposing:
  1121. portal := user.GetPortalByJID(info.JID)
  1122. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  1123. if puppet.typingIn == portal.MXID {
  1124. return
  1125. }
  1126. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  1127. }
  1128. puppet.typingIn = portal.MXID
  1129. puppet.typingAt = time.Now().Unix()
  1130. _, _ = puppet.IntentFor(portal).UserTyping(portal.MXID, true, 15*1000)
  1131. }
  1132. }
  1133. func (user *User) HandleMsgInfo(info whatsapp.JSONMsgInfo) {
  1134. if (info.Command == whatsapp.MsgInfoCommandAck || info.Command == whatsapp.MsgInfoCommandAcks) && info.Acknowledgement == whatsapp.AckMessageRead {
  1135. portal := user.GetPortalByJID(info.ToJID)
  1136. if len(portal.MXID) == 0 {
  1137. return
  1138. }
  1139. intent := user.bridge.GetPuppetByJID(info.SenderJID).IntentFor(portal)
  1140. for _, msgID := range info.IDs {
  1141. msg := user.bridge.DB.Message.GetByJID(portal.Key, msgID)
  1142. if msg == nil || msg.IsFakeMXID() {
  1143. continue
  1144. }
  1145. err := intent.MarkReadWithContent(portal.MXID, msg.MXID, &CustomReadReceipt{DoublePuppet: intent.IsCustomPuppet})
  1146. if err != nil {
  1147. user.log.Warnfln("Failed to mark message %s as read by %s: %v", msg.MXID, info.SenderJID, err)
  1148. }
  1149. }
  1150. }
  1151. }
  1152. func (user *User) HandleReceivedMessage(received whatsapp.ReceivedMessage) {
  1153. if received.Type == "read" {
  1154. go user.markSelfRead(received.Jid, received.Index)
  1155. } else {
  1156. user.log.Debugfln("Unknown received message type: %+v", received)
  1157. }
  1158. }
  1159. func (user *User) HandleReadMessage(read whatsapp.ReadMessage) {
  1160. user.log.Debugfln("Received chat read message: %+v", read)
  1161. go user.markSelfRead(read.Jid, "")
  1162. }
  1163. func (user *User) markSelfRead(jid, messageID string) {
  1164. if strings.HasSuffix(jid, whatsapp.OldUserSuffix) {
  1165. jid = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  1166. }
  1167. puppet := user.bridge.GetPuppetByJID(user.JID)
  1168. if puppet == nil {
  1169. return
  1170. }
  1171. intent := puppet.CustomIntent()
  1172. if intent == nil {
  1173. return
  1174. }
  1175. portal := user.GetPortalByJID(jid)
  1176. if portal == nil {
  1177. return
  1178. }
  1179. var message *database.Message
  1180. if messageID == "" {
  1181. message = user.bridge.DB.Message.GetLastInChat(portal.Key)
  1182. if message == nil {
  1183. return
  1184. }
  1185. user.log.Debugfln("User read chat %s/%s in WhatsApp mobile (last known event: %s/%s)", portal.Key.JID, portal.MXID, message.JID, message.MXID)
  1186. } else {
  1187. message = user.bridge.DB.Message.GetByJID(portal.Key, messageID)
  1188. if message == nil || message.IsFakeMXID() {
  1189. return
  1190. }
  1191. user.log.Debugfln("User read message %s/%s in %s/%s in WhatsApp mobile", message.JID, message.MXID, portal.Key.JID, portal.MXID)
  1192. }
  1193. err := intent.MarkReadWithContent(portal.MXID, message.MXID, &CustomReadReceipt{DoublePuppet: true})
  1194. if err != nil {
  1195. user.log.Warnfln("Failed to bridge own read receipt in %s: %v", jid, err)
  1196. }
  1197. }
  1198. func (user *User) HandleCommand(cmd whatsapp.JSONCommand) {
  1199. switch cmd.Type {
  1200. case whatsapp.CommandPicture:
  1201. if strings.HasSuffix(cmd.JID, whatsapp.NewUserSuffix) {
  1202. puppet := user.bridge.GetPuppetByJID(cmd.JID)
  1203. go puppet.UpdateAvatar(user, cmd.ProfilePicInfo)
  1204. } else if user.bridge.Config.Bridge.ChatMetaSync {
  1205. portal := user.GetPortalByJID(cmd.JID)
  1206. go portal.UpdateAvatar(user, cmd.ProfilePicInfo, true)
  1207. }
  1208. case whatsapp.CommandDisconnect:
  1209. if cmd.Kind == "replaced" {
  1210. user.cleanDisconnection = true
  1211. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server because you opened another WhatsApp Web client.\n\n" +
  1212. "Use the `reconnect` command to disconnect the other client and resume bridging.")
  1213. } else {
  1214. user.log.Warnln("Unknown kind of disconnect:", string(cmd.Raw))
  1215. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server (reason code: %s).\n\n"+
  1216. "Use the `reconnect` command to reconnect.", cmd.Kind)
  1217. }
  1218. }
  1219. }
  1220. func (user *User) HandleChatUpdate(cmd whatsapp.ChatUpdate) {
  1221. if cmd.Command != whatsapp.ChatUpdateCommandAction {
  1222. return
  1223. }
  1224. portal := user.GetPortalByJID(cmd.JID)
  1225. if len(portal.MXID) == 0 {
  1226. if cmd.Data.Action == whatsapp.ChatActionIntroduce || cmd.Data.Action == whatsapp.ChatActionCreate {
  1227. go func() {
  1228. err := portal.CreateMatrixRoom(user)
  1229. if err != nil {
  1230. user.log.Errorln("Failed to create portal room after receiving join event:", err)
  1231. }
  1232. }()
  1233. }
  1234. return
  1235. }
  1236. // These don't come down the message history :(
  1237. switch cmd.Data.Action {
  1238. case whatsapp.ChatActionAddTopic:
  1239. go portal.UpdateTopic(cmd.Data.AddTopic.Topic, cmd.Data.SenderJID, nil, true)
  1240. case whatsapp.ChatActionRemoveTopic:
  1241. go portal.UpdateTopic("", cmd.Data.SenderJID, nil, true)
  1242. case whatsapp.ChatActionRemove:
  1243. // We ignore leaving groups in the message history to avoid accidentally leaving rejoined groups,
  1244. // but if we get a real-time command that says we left, it should be safe to bridge it.
  1245. if !user.bridge.Config.Bridge.ChatMetaSync {
  1246. for _, jid := range cmd.Data.UserChange.JIDs {
  1247. if jid == user.JID {
  1248. go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1249. break
  1250. }
  1251. }
  1252. }
  1253. }
  1254. if !user.bridge.Config.Bridge.ChatMetaSync {
  1255. // Ignore chat update commands, we're relying on the message history.
  1256. return
  1257. }
  1258. switch cmd.Data.Action {
  1259. case whatsapp.ChatActionNameChange:
  1260. go portal.UpdateName(cmd.Data.NameChange.Name, cmd.Data.SenderJID, nil, true)
  1261. case whatsapp.ChatActionPromote:
  1262. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, true)
  1263. case whatsapp.ChatActionDemote:
  1264. go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, false)
  1265. case whatsapp.ChatActionAnnounce:
  1266. go portal.RestrictMessageSending(cmd.Data.Announce)
  1267. case whatsapp.ChatActionRestrict:
  1268. go portal.RestrictMetadataChanges(cmd.Data.Restrict)
  1269. case whatsapp.ChatActionRemove:
  1270. go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
  1271. case whatsapp.ChatActionAdd:
  1272. go portal.HandleWhatsAppInvite(user, cmd.Data.SenderJID, nil, cmd.Data.UserChange.JIDs)
  1273. case whatsapp.ChatActionIntroduce:
  1274. if cmd.Data.SenderJID != "unknown" {
  1275. go portal.Sync(user, whatsapp.Contact{JID: portal.Key.JID})
  1276. }
  1277. }
  1278. }
  1279. func (user *User) HandleConnInfo(info whatsapp.ConnInfo) {
  1280. if user.Session != nil && info.Connected && len(info.ClientToken) > 0 {
  1281. user.log.Debugln("Received new tokens")
  1282. user.Session.ClientToken = info.ClientToken
  1283. user.Session.ServerToken = info.ServerToken
  1284. user.Session.Wid = info.WID
  1285. user.Update()
  1286. }
  1287. if len(info.PushName) > 0 {
  1288. user.pushName = info.PushName
  1289. }
  1290. }
  1291. func (user *User) HandleJSONMessage(evt whatsapp.RawJSONMessage) {
  1292. if !json.Valid(evt.RawMessage) {
  1293. return
  1294. }
  1295. user.log.Debugfln("JSON message with tag %s: %s", evt.Tag, evt.RawMessage)
  1296. user.updateLastConnectionIfNecessary()
  1297. }
  1298. func (user *User) NeedsRelaybot(portal *Portal) bool {
  1299. return !user.HasSession() || !user.IsInPortal(portal.Key)
  1300. }