user.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395
  1. // mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
  2. // Copyright (C) 2020 Tulir Asokan
  3. //
  4. // This program is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // This program is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "context"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "net/http"
  23. "sort"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/skip2/go-qrcode"
  29. log "maunium.net/go/maulogger/v2"
  30. "maunium.net/go/mautrix/appservice"
  31. "maunium.net/go/mautrix/pushrules"
  32. "github.com/Rhymen/go-whatsapp"
  33. waBinary "github.com/Rhymen/go-whatsapp/binary"
  34. waProto "github.com/Rhymen/go-whatsapp/binary/proto"
  35. "maunium.net/go/mautrix"
  36. "maunium.net/go/mautrix/event"
  37. "maunium.net/go/mautrix/format"
  38. "maunium.net/go/mautrix/id"
  39. "maunium.net/go/mautrix-whatsapp/database"
  40. )
// User is the in-memory state of one Matrix user of the bridge. It embeds the
// persisted database.User row and adds the WhatsApp connection plus the
// bookkeeping used for login, reconnection and post-login synchronization.
type User struct {
	*database.User
	// Conn is the WhatsApp connection; created in Connect and reset to nil in DeleteConnection.
	Conn *whatsapp.Conn

	bridge *Bridge
	log    log.Logger

	// Admin, Whitelisted and RelaybotWhitelisted are computed from the
	// bridge permission config in NewUser.
	Admin               bool
	Whitelisted         bool
	RelaybotWhitelisted bool

	IsRelaybot bool

	// ConnectionErrors is reset to 0 after a successful login or session restore.
	ConnectionErrors int
	CommunityID      string

	cleanDisconnection  bool
	batteryWarningsSent int
	// lastReconnection is a Unix timestamp (seconds) set when the post-login
	// routine starts; HandleStreamEvent zeroes it after scheduling a re-ping.
	lastReconnection int64
	pushName         string

	// chatListReceived is signalled by HandleChatList and awaited by intPostLogin.
	chatListReceived chan struct{}
	// syncPortalsDone is awaited by intPostLogin before it finishes.
	syncPortalsDone chan struct{}

	// messageInput receives PortalMessages from HandleEvent.
	messageInput chan PortalMessage
	// messageOutput is buffered with the configured UserMessageBuffer size.
	messageOutput chan PortalMessage
	// syncStart is sent to by PostLogin when a post-login sync begins.
	syncStart chan struct{}
	// syncWait is incremented by PostLogin and released by intPostLogin.
	syncWait sync.WaitGroup
	// syncing is a 0/1 flag updated atomically so only one post-login sync runs at a time.
	syncing int32

	// mgmtCreateLock serializes auto-creation of the management room.
	mgmtCreateLock sync.Mutex
	// connLock is held while Conn is being created or torn down.
	connLock        sync.Mutex
	cancelReconnect func()

	prevBridgeStatus *AsmuxPong
}
  68. func (bridge *Bridge) GetUserByMXID(userID id.UserID) *User {
  69. _, isPuppet := bridge.ParsePuppetMXID(userID)
  70. if isPuppet || userID == bridge.Bot.UserID {
  71. return nil
  72. }
  73. bridge.usersLock.Lock()
  74. defer bridge.usersLock.Unlock()
  75. user, ok := bridge.usersByMXID[userID]
  76. if !ok {
  77. return bridge.loadDBUser(bridge.DB.User.GetByMXID(userID), &userID)
  78. }
  79. return user
  80. }
  81. func (bridge *Bridge) GetUserByJID(userID whatsapp.JID) *User {
  82. bridge.usersLock.Lock()
  83. defer bridge.usersLock.Unlock()
  84. user, ok := bridge.usersByJID[userID]
  85. if !ok {
  86. return bridge.loadDBUser(bridge.DB.User.GetByJID(userID), nil)
  87. }
  88. return user
  89. }
  90. func (user *User) addToJIDMap() {
  91. user.bridge.usersLock.Lock()
  92. user.bridge.usersByJID[user.JID] = user
  93. user.bridge.usersLock.Unlock()
  94. }
  95. func (user *User) removeFromJIDMap() {
  96. user.bridge.usersLock.Lock()
  97. jidUser, ok := user.bridge.usersByJID[user.JID]
  98. if ok && user == jidUser {
  99. delete(user.bridge.usersByJID, user.JID)
  100. }
  101. user.bridge.usersLock.Unlock()
  102. user.bridge.Metrics.TrackLoginState(user.JID, false)
  103. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotLoggedIn})
  104. }
  105. func (bridge *Bridge) GetAllUsers() []*User {
  106. bridge.usersLock.Lock()
  107. defer bridge.usersLock.Unlock()
  108. dbUsers := bridge.DB.User.GetAll()
  109. output := make([]*User, len(dbUsers))
  110. for index, dbUser := range dbUsers {
  111. user, ok := bridge.usersByMXID[dbUser.MXID]
  112. if !ok {
  113. user = bridge.loadDBUser(dbUser, nil)
  114. }
  115. output[index] = user
  116. }
  117. return output
  118. }
  119. func (bridge *Bridge) loadDBUser(dbUser *database.User, mxid *id.UserID) *User {
  120. if dbUser == nil {
  121. if mxid == nil {
  122. return nil
  123. }
  124. dbUser = bridge.DB.User.New()
  125. dbUser.MXID = *mxid
  126. dbUser.Insert()
  127. }
  128. user := bridge.NewUser(dbUser)
  129. bridge.usersByMXID[user.MXID] = user
  130. if len(user.JID) > 0 {
  131. bridge.usersByJID[user.JID] = user
  132. }
  133. if len(user.ManagementRoom) > 0 {
  134. bridge.managementRooms[user.ManagementRoom] = user
  135. }
  136. return user
  137. }
  138. func (user *User) GetPortals() []*Portal {
  139. keys := user.User.GetPortalKeys()
  140. portals := make([]*Portal, len(keys))
  141. user.bridge.portalsLock.Lock()
  142. for i, key := range keys {
  143. portal, ok := user.bridge.portalsByJID[key]
  144. if !ok {
  145. portal = user.bridge.loadDBPortal(user.bridge.DB.Portal.GetByJID(key), &key)
  146. }
  147. portals[i] = portal
  148. }
  149. user.bridge.portalsLock.Unlock()
  150. return portals
  151. }
  152. func (bridge *Bridge) NewUser(dbUser *database.User) *User {
  153. user := &User{
  154. User: dbUser,
  155. bridge: bridge,
  156. log: bridge.Log.Sub("User").Sub(string(dbUser.MXID)),
  157. IsRelaybot: false,
  158. chatListReceived: make(chan struct{}, 1),
  159. syncPortalsDone: make(chan struct{}, 1),
  160. syncStart: make(chan struct{}, 1),
  161. messageInput: make(chan PortalMessage),
  162. messageOutput: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer),
  163. }
  164. user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID)
  165. user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
  166. user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
  167. go user.handleMessageLoop()
  168. go user.runMessageRingBuffer()
  169. return user
  170. }
  171. func (user *User) GetManagementRoom() id.RoomID {
  172. if len(user.ManagementRoom) == 0 {
  173. user.mgmtCreateLock.Lock()
  174. defer user.mgmtCreateLock.Unlock()
  175. if len(user.ManagementRoom) > 0 {
  176. return user.ManagementRoom
  177. }
  178. resp, err := user.bridge.Bot.CreateRoom(&mautrix.ReqCreateRoom{
  179. Topic: "WhatsApp bridge notices",
  180. IsDirect: true,
  181. })
  182. if err != nil {
  183. user.log.Errorln("Failed to auto-create management room:", err)
  184. } else {
  185. user.SetManagementRoom(resp.RoomID)
  186. }
  187. }
  188. return user.ManagementRoom
  189. }
  190. func (user *User) SetManagementRoom(roomID id.RoomID) {
  191. existingUser, ok := user.bridge.managementRooms[roomID]
  192. if ok {
  193. existingUser.ManagementRoom = ""
  194. existingUser.Update()
  195. }
  196. user.ManagementRoom = roomID
  197. user.bridge.managementRooms[user.ManagementRoom] = user
  198. user.Update()
  199. }
  200. func (user *User) SetSession(session *whatsapp.Session) {
  201. if session == nil {
  202. user.Session = nil
  203. user.LastConnection = 0
  204. } else if len(session.Wid) > 0 {
  205. user.Session = session
  206. } else {
  207. return
  208. }
  209. user.Update()
  210. }
  211. func (user *User) Connect(evenIfNoSession bool) bool {
  212. user.connLock.Lock()
  213. if user.Conn != nil {
  214. user.connLock.Unlock()
  215. if user.Conn.IsConnected() {
  216. return true
  217. } else {
  218. return user.RestoreSession()
  219. }
  220. } else if !evenIfNoSession && user.Session == nil {
  221. user.connLock.Unlock()
  222. return false
  223. }
  224. user.log.Debugln("Connecting to WhatsApp")
  225. if user.Session != nil {
  226. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWAConnecting})
  227. }
  228. timeout := time.Duration(user.bridge.Config.Bridge.ConnectionTimeout)
  229. if timeout == 0 {
  230. timeout = 20
  231. }
  232. user.Conn = whatsapp.NewConn(&whatsapp.Options{
  233. Timeout: timeout * time.Second,
  234. LongClientName: user.bridge.Config.WhatsApp.OSName,
  235. ShortClientName: user.bridge.Config.WhatsApp.BrowserName,
  236. ClientVersion: WAVersion,
  237. Log: user.log.Sub("Conn"),
  238. Handler: []whatsapp.Handler{user},
  239. })
  240. user.setupAdminTestHooks()
  241. user.connLock.Unlock()
  242. return user.RestoreSession()
  243. }
  244. func (user *User) DeleteConnection() {
  245. user.connLock.Lock()
  246. if user.Conn == nil {
  247. user.connLock.Unlock()
  248. return
  249. }
  250. err := user.Conn.Disconnect()
  251. if err != nil && err != whatsapp.ErrNotConnected {
  252. user.log.Warnln("Error disconnecting: %v", err)
  253. }
  254. user.Conn.RemoveHandlers()
  255. user.Conn = nil
  256. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  257. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  258. user.connLock.Unlock()
  259. }
  260. func (user *User) RestoreSession() bool {
  261. if user.Session != nil {
  262. user.Conn.SetSession(*user.Session)
  263. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
  264. defer cancel()
  265. err := user.Conn.Restore(true, ctx)
  266. if err == whatsapp.ErrAlreadyLoggedIn {
  267. return true
  268. } else if err != nil {
  269. user.log.Errorln("Failed to restore session:", err)
  270. if errors.Is(err, whatsapp.ErrUnpaired) {
  271. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
  272. "To re-pair your phone, log in again.")
  273. user.removeFromJIDMap()
  274. //user.JID = ""
  275. user.SetSession(nil)
  276. user.DeleteConnection()
  277. return false
  278. } else {
  279. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  280. user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
  281. "on your phone is reachable and use `reconnect` to try connecting again.")
  282. }
  283. user.log.Debugln("Disconnecting due to failed session restore...")
  284. err = user.Conn.Disconnect()
  285. if err != nil {
  286. user.log.Errorln("Failed to disconnect after failed session restore:", err)
  287. }
  288. return false
  289. }
  290. user.ConnectionErrors = 0
  291. user.log.Debugln("Session restored successfully")
  292. user.PostLogin()
  293. }
  294. return true
  295. }
// HasSession reports whether a WhatsApp session is stored for the user.
// It does not check whether that session is currently connected.
func (user *User) HasSession() bool {
	return user.Session != nil
}
  299. func (user *User) IsConnected() bool {
  300. return user.Conn != nil && user.Conn.IsConnected() && user.Conn.IsLoggedIn()
  301. }
  302. func (user *User) IsLoginInProgress() bool {
  303. return user.Conn != nil && user.Conn.IsLoginInProgress()
  304. }
  305. func (user *User) loginQrChannel(ce *CommandEvent, qrChan <-chan string, eventIDChan chan<- id.EventID) {
  306. var qrEventID id.EventID
  307. for code := range qrChan {
  308. if code == "stop" {
  309. return
  310. }
  311. qrCode, err := qrcode.Encode(code, qrcode.Low, 256)
  312. if err != nil {
  313. user.log.Errorln("Failed to encode QR code:", err)
  314. ce.Reply("Failed to encode QR code: %v", err)
  315. return
  316. }
  317. bot := user.bridge.AS.BotClient()
  318. resp, err := bot.UploadBytes(qrCode, "image/png")
  319. if err != nil {
  320. user.log.Errorln("Failed to upload QR code:", err)
  321. ce.Reply("Failed to upload QR code: %v", err)
  322. return
  323. }
  324. if qrEventID == "" {
  325. sendResp, err := bot.SendImage(ce.RoomID, code, resp.ContentURI)
  326. if err != nil {
  327. user.log.Errorln("Failed to send QR code to user:", err)
  328. return
  329. }
  330. qrEventID = sendResp.EventID
  331. eventIDChan <- qrEventID
  332. } else {
  333. _, err = bot.SendMessageEvent(ce.RoomID, event.EventMessage, &event.MessageEventContent{
  334. MsgType: event.MsgImage,
  335. Body: code,
  336. URL: resp.ContentURI.CUString(),
  337. NewContent: &event.MessageEventContent{
  338. MsgType: event.MsgImage,
  339. Body: code,
  340. URL: resp.ContentURI.CUString(),
  341. },
  342. RelatesTo: &event.RelatesTo{
  343. Type: event.RelReplace,
  344. EventID: qrEventID,
  345. },
  346. })
  347. if err != nil {
  348. user.log.Errorln("Failed to send edited QR code to user:", err)
  349. }
  350. }
  351. }
  352. }
  353. func (user *User) Login(ce *CommandEvent) {
  354. qrChan := make(chan string, 3)
  355. eventIDChan := make(chan id.EventID, 1)
  356. go user.loginQrChannel(ce, qrChan, eventIDChan)
  357. session, jid, err := user.Conn.Login(qrChan, nil)
  358. qrChan <- "stop"
  359. if err != nil {
  360. var eventID id.EventID
  361. select {
  362. case eventID = <-eventIDChan:
  363. default:
  364. }
  365. reply := event.MessageEventContent{
  366. MsgType: event.MsgText,
  367. }
  368. if err == whatsapp.ErrAlreadyLoggedIn {
  369. reply.Body = "You're already logged in"
  370. } else if err == whatsapp.ErrLoginInProgress {
  371. reply.Body = "You have a login in progress already."
  372. } else if err == whatsapp.ErrLoginTimedOut {
  373. reply.Body = "QR code scan timed out. Please try again."
  374. } else {
  375. user.log.Warnln("Failed to log in:", err)
  376. reply.Body = fmt.Sprintf("Unknown error while logging in: %v", err)
  377. }
  378. msg := reply
  379. if eventID != "" {
  380. msg.NewContent = &reply
  381. msg.RelatesTo = &event.RelatesTo{
  382. Type: event.RelReplace,
  383. EventID: eventID,
  384. }
  385. }
  386. _, _ = ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &msg)
  387. return
  388. }
  389. // TODO there's a bit of duplication between this and the provisioning API login method
  390. // Also between the two logout methods (commands.go and provisioning.go)
  391. user.log.Debugln("Successful login as", jid, "via command")
  392. user.ConnectionErrors = 0
  393. user.JID = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, 1)
  394. user.addToJIDMap()
  395. user.SetSession(&session)
  396. ce.Reply("Successfully logged in, synchronizing chats...")
  397. user.PostLogin()
  398. }
// Chat bundles a WhatsApp chat with the bridge portal for that chat and the
// contact entry stored under the chat's JID.
type Chat struct {
	whatsapp.Chat
	Portal  *Portal
	Contact whatsapp.Contact
}
  404. type ChatList []Chat
  405. func (cl ChatList) Len() int {
  406. return len(cl)
  407. }
  408. func (cl ChatList) Less(i, j int) bool {
  409. return cl[i].LastMessageTime > cl[j].LastMessageTime
  410. }
  411. func (cl ChatList) Swap(i, j int) {
  412. cl[i], cl[j] = cl[j], cl[i]
  413. }
  414. func (user *User) PostLogin() {
  415. user.sendBridgeStatus(AsmuxPong{OK: true})
  416. user.bridge.Metrics.TrackConnectionState(user.JID, true)
  417. user.bridge.Metrics.TrackLoginState(user.JID, true)
  418. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  419. if !atomic.CompareAndSwapInt32(&user.syncing, 0, 1) {
  420. // TODO we should cleanly stop the old sync and start a new one instead of not starting a new one
  421. user.log.Warnln("There seems to be a post-sync already in progress, not starting a new one")
  422. return
  423. }
  424. user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
  425. user.chatListReceived = make(chan struct{}, 1)
  426. user.syncPortalsDone = make(chan struct{}, 1)
  427. user.syncWait.Add(1)
  428. user.syncStart <- struct{}{}
  429. go user.intPostLogin()
  430. }
  431. func (user *User) tryAutomaticDoublePuppeting() {
  432. if !user.bridge.Config.CanDoublePuppet(user.MXID) {
  433. return
  434. }
  435. user.log.Debugln("Checking if double puppeting needs to be enabled")
  436. puppet := user.bridge.GetPuppetByJID(user.JID)
  437. if len(puppet.CustomMXID) > 0 {
  438. user.log.Debugln("User already has double-puppeting enabled")
  439. // Custom puppet already enabled
  440. return
  441. }
  442. accessToken, err := puppet.loginWithSharedSecret(user.MXID)
  443. if err != nil {
  444. user.log.Warnln("Failed to login with shared secret:", err)
  445. return
  446. }
  447. err = puppet.SwitchCustomMXID(accessToken, user.MXID)
  448. if err != nil {
  449. puppet.log.Warnln("Failed to switch to auto-logined custom puppet:", err)
  450. return
  451. }
  452. user.log.Infoln("Successfully automatically enabled custom puppet")
  453. }
  454. func (user *User) sendBridgeNotice(formatString string, args ...interface{}) {
  455. notice := fmt.Sprintf(formatString, args...)
  456. _, err := user.bridge.Bot.SendNotice(user.GetManagementRoom(), notice)
  457. if err != nil {
  458. user.log.Warnf("Failed to send bridge notice \"%s\": %v", notice, err)
  459. }
  460. }
  461. func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface{}) {
  462. notice := fmt.Sprintf(formatString, args...)
  463. content := format.RenderMarkdown(notice, true, false)
  464. _, err := user.bridge.Bot.SendMessageEvent(user.GetManagementRoom(), event.EventMessage, content)
  465. if err != nil {
  466. user.log.Warnf("Failed to send bridge alert \"%s\": %v", notice, err)
  467. }
  468. }
  469. func (user *User) postConnPing() bool {
  470. user.log.Debugln("Making post-connection ping")
  471. var err error
  472. for i := 0; ; i++ {
  473. err = user.Conn.AdminTest()
  474. if err == nil {
  475. user.log.Debugln("Post-connection ping OK")
  476. return true
  477. } else if errors.Is(err, whatsapp.ErrConnectionTimeout) && i < 5 {
  478. user.log.Warnfln("Post-connection ping timed out, sending new one")
  479. } else {
  480. break
  481. }
  482. }
  483. user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err)
  484. disconnectErr := user.Conn.Disconnect()
  485. if disconnectErr != nil {
  486. user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
  487. }
  488. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  489. user.bridge.Metrics.TrackDisconnection(user.MXID)
  490. go func() {
  491. time.Sleep(1 * time.Second)
  492. user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err))
  493. }()
  494. return false
  495. }
  496. func (user *User) intPostLogin() {
  497. defer atomic.StoreInt32(&user.syncing, 0)
  498. defer user.syncWait.Done()
  499. user.lastReconnection = time.Now().Unix()
  500. user.createCommunity()
  501. user.tryAutomaticDoublePuppeting()
  502. user.log.Debugln("Waiting for chat list receive confirmation")
  503. select {
  504. case <-user.chatListReceived:
  505. user.log.Debugln("Chat list receive confirmation received in PostLogin")
  506. case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
  507. user.log.Warnln("Timed out waiting for chat list to arrive!")
  508. user.postConnPing()
  509. return
  510. }
  511. if !user.postConnPing() {
  512. user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
  513. return
  514. }
  515. user.log.Debugln("Waiting for portal sync complete confirmation")
  516. select {
  517. case <-user.syncPortalsDone:
  518. user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.")
  519. // TODO this is too short, maybe a per-portal duration?
  520. case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
  521. user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
  522. }
  523. }
// NormalMessage is implemented by WhatsApp message values that expose their
// whatsapp.MessageInfo. HandleEvent uses it to route such messages, with their
// remote JID and timestamp, into the user's message queue.
type NormalMessage interface {
	GetInfo() whatsapp.MessageInfo
}
// HandleEvent dispatches an event coming from the WhatsApp connection to the
// matching handler based on its dynamic type.
//
// The order of the cases matters: NormalMessage is an interface case and is
// checked first, so any value providing GetInfo is queued as a portal message
// before the concrete-type cases are considered. Some handlers are called
// inline and others are started in their own goroutine, as written below.
// Unrecognized event types are only logged.
func (user *User) HandleEvent(event interface{}) {
	switch v := event.(type) {
	case NormalMessage:
		// Regular messages are queued on messageInput together with their timestamp.
		info := v.GetInfo()
		user.messageInput <- PortalMessage{info.RemoteJid, user, v, info.Timestamp}
	case whatsapp.MessageRevocation:
		// Revocations are queued the same way, with a zero timestamp.
		user.messageInput <- PortalMessage{v.RemoteJid, user, v, 0}
	case whatsapp.StreamEvent:
		user.HandleStreamEvent(v)
	case []whatsapp.Chat:
		user.HandleChatList(v)
	case []whatsapp.Contact:
		user.HandleContactList(v)
	case error:
		user.HandleError(v)
	case whatsapp.Contact:
		go user.HandleNewContact(v)
	case whatsapp.BatteryMessage:
		user.HandleBatteryMessage(v)
	case whatsapp.CallInfo:
		user.HandleCallInfo(v)
	case whatsapp.PresenceEvent:
		go user.HandlePresence(v)
	case whatsapp.JSONMsgInfo:
		go user.HandleMsgInfo(v)
	case whatsapp.ReceivedMessage:
		user.HandleReceivedMessage(v)
	case whatsapp.ReadMessage:
		user.HandleReadMessage(v)
	case whatsapp.JSONCommand:
		user.HandleCommand(v)
	case whatsapp.ChatUpdate:
		user.HandleChatUpdate(v)
	case whatsapp.ConnInfo:
		user.HandleConnInfo(v)
	case whatsapp.MuteMessage:
		// Mute, archive and pin changes are only applied to portals that resolve
		// to a non-nil value; the nil intent makes the update helpers look up
		// the user's double puppet themselves.
		portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
		if portal != nil {
			go user.updateChatMute(nil, portal, v.MutedUntil)
		}
	case whatsapp.ArchiveMessage:
		portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
		if portal != nil {
			go user.updateChatTag(nil, portal, user.bridge.Config.Bridge.ArchiveTag, v.IsArchived)
		}
	case whatsapp.PinMessage:
		portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
		if portal != nil {
			go user.updateChatTag(nil, portal, user.bridge.Config.Bridge.PinnedTag, v.IsPinned)
		}
	case whatsapp.RawJSONMessage:
		user.HandleJSONMessage(v)
	case *waProto.WebMessageInfo:
		user.updateLastConnectionIfNecessary()
		// TODO trace log
		//user.log.Debugfln("WebMessageInfo: %+v", v)
	case *waBinary.Node:
		user.log.Debugfln("Unknown binary message: %+v", v)
	default:
		user.log.Debugfln("Unknown type of event in HandleEvent: %T", v)
	}
}
  589. func (user *User) HandleStreamEvent(evt whatsapp.StreamEvent) {
  590. if evt.Type == whatsapp.StreamSleep {
  591. if user.lastReconnection+60 > time.Now().Unix() {
  592. user.lastReconnection = 0
  593. user.log.Infoln("Stream went to sleep soon after reconnection, making new post-connection ping in 20 seconds")
  594. go func() {
  595. time.Sleep(20 * time.Second)
  596. // TODO if this happens during the post-login sync, it can get stuck forever
  597. // TODO check if the above is still true
  598. user.postConnPing()
  599. }()
  600. }
  601. } else {
  602. user.log.Infofln("Stream event: %+v", evt)
  603. }
  604. }
  605. func (user *User) HandleChatList(chats []whatsapp.Chat) {
  606. user.log.Infoln("Chat list received")
  607. chatMap := make(map[string]whatsapp.Chat)
  608. for _, chat := range user.Conn.Store.Chats {
  609. chatMap[chat.JID] = chat
  610. }
  611. for _, chat := range chats {
  612. chatMap[chat.JID] = chat
  613. }
  614. select {
  615. case user.chatListReceived <- struct{}{}:
  616. default:
  617. }
  618. go user.syncPortals(chatMap, false)
  619. }
  620. func (user *User) updateChatMute(intent *appservice.IntentAPI, portal *Portal, mutedUntil int64) {
  621. if len(portal.MXID) == 0 || !user.bridge.Config.Bridge.MuteBridging {
  622. return
  623. } else if intent == nil {
  624. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  625. if doublePuppet == nil || doublePuppet.CustomIntent() == nil {
  626. return
  627. }
  628. intent = doublePuppet.CustomIntent()
  629. }
  630. var err error
  631. if mutedUntil != -1 && mutedUntil < time.Now().Unix() {
  632. user.log.Debugfln("Portal %s is muted until %d, unmuting...", portal.MXID, mutedUntil)
  633. err = intent.DeletePushRule("global", pushrules.RoomRule, string(portal.MXID))
  634. } else {
  635. user.log.Debugfln("Portal %s is muted until %d, muting...", portal.MXID, mutedUntil)
  636. err = intent.PutPushRule("global", pushrules.RoomRule, string(portal.MXID), &mautrix.ReqPutPushRule{
  637. Actions: []pushrules.PushActionType{pushrules.ActionDontNotify},
  638. })
  639. }
  640. if err != nil && !errors.Is(err, mautrix.MNotFound) {
  641. user.log.Warnfln("Failed to update push rule for %s through double puppet: %v", portal.MXID, err)
  642. }
  643. }
// CustomTagData is the per-tag payload stored with a room tag. In addition to
// the tag's order it carries a bridge-specific flag, which updateChatTag sets
// on tags it adds and checks before removing a tag.
type CustomTagData struct {
	Order        json.Number `json:"order"`
	DoublePuppet bool        `json:"net.maunium.whatsapp.puppet"`
}
// CustomTagEventContent is the room tags content decoded with CustomTagData
// values, keyed by tag name.
type CustomTagEventContent struct {
	Tags map[string]CustomTagData `json:"tags"`
}
  651. func (user *User) updateChatTag(intent *appservice.IntentAPI, portal *Portal, tag string, active bool) {
  652. if len(portal.MXID) == 0 || len(tag) == 0 {
  653. return
  654. } else if intent == nil {
  655. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  656. if doublePuppet == nil || doublePuppet.CustomIntent() == nil {
  657. return
  658. }
  659. intent = doublePuppet.CustomIntent()
  660. }
  661. var existingTags CustomTagEventContent
  662. err := intent.GetTagsWithCustomData(portal.MXID, &existingTags)
  663. if err != nil && !errors.Is(err, mautrix.MNotFound) {
  664. user.log.Warnfln("Failed to get tags of %s: %v", portal.MXID, err)
  665. }
  666. currentTag, ok := existingTags.Tags[tag]
  667. if active && !ok {
  668. user.log.Debugln("Adding tag", tag, "to", portal.MXID)
  669. data := CustomTagData{"0.5", true}
  670. err = intent.AddTagWithCustomData(portal.MXID, tag, &data)
  671. } else if !active && ok && currentTag.DoublePuppet {
  672. user.log.Debugln("Removing tag", tag, "from", portal.MXID)
  673. err = intent.RemoveTag(portal.MXID, tag)
  674. } else {
  675. err = nil
  676. }
  677. if err != nil {
  678. user.log.Warnfln("Failed to update tag %s for %s through double puppet: %v", tag, portal.MXID, err)
  679. }
  680. }
  681. func (user *User) syncChatDoublePuppetDetails(doublePuppet *Puppet, chat Chat, justCreated bool) {
  682. if doublePuppet == nil || doublePuppet.CustomIntent() == nil {
  683. return
  684. }
  685. intent := doublePuppet.CustomIntent()
  686. if chat.UnreadCount == 0 {
  687. lastMessage := user.bridge.DB.Message.GetLastInChat(chat.Portal.Key)
  688. if lastMessage != nil {
  689. err := intent.MarkRead(chat.Portal.MXID, lastMessage.MXID)
  690. if err != nil {
  691. user.log.Warnln("Failed to mark %s in %s as read after backfill: %v", lastMessage.MXID, chat.Portal.MXID, err)
  692. }
  693. }
  694. } else if chat.UnreadCount == -1 {
  695. user.log.Debugfln("Invalid unread count (missing field?) in chat info %+v", chat.Source)
  696. }
  697. if justCreated || !user.bridge.Config.Bridge.TagOnlyOnCreate {
  698. user.updateChatMute(intent, chat.Portal, chat.MutedUntil)
  699. user.updateChatTag(intent, chat.Portal, user.bridge.Config.Bridge.ArchiveTag, chat.IsArchived)
  700. user.updateChatTag(intent, chat.Portal, user.bridge.Config.Bridge.PinnedTag, chat.IsPinned)
  701. }
  702. }
  703. func (user *User) syncPortal(chat Chat) {
  704. // Don't sync unless chat meta sync is enabled or portal doesn't exist
  705. if user.bridge.Config.Bridge.ChatMetaSync || len(chat.Portal.MXID) == 0 {
  706. chat.Portal.Sync(user, chat.Contact)
  707. }
  708. err := chat.Portal.BackfillHistory(user, chat.LastMessageTime)
  709. if err != nil {
  710. chat.Portal.log.Errorln("Error backfilling history:", err)
  711. }
  712. }
  713. func (user *User) collectChatList(chatMap map[string]whatsapp.Chat) ChatList {
  714. if chatMap == nil {
  715. chatMap = user.Conn.Store.Chats
  716. }
  717. user.log.Infoln("Reading chat list")
  718. chats := make(ChatList, 0, len(chatMap))
  719. existingKeys := user.GetInCommunityMap()
  720. portalKeys := make([]database.PortalKeyWithMeta, 0, len(chatMap))
  721. for _, chat := range chatMap {
  722. portal := user.GetPortalByJID(chat.JID)
  723. chats = append(chats, Chat{
  724. Chat: chat,
  725. Portal: portal,
  726. Contact: user.Conn.Store.Contacts[chat.JID],
  727. })
  728. var inCommunity, ok bool
  729. if inCommunity, ok = existingKeys[portal.Key]; !ok || !inCommunity {
  730. inCommunity = user.addPortalToCommunity(portal)
  731. if portal.IsPrivateChat() {
  732. puppet := user.bridge.GetPuppetByJID(portal.Key.JID)
  733. user.addPuppetToCommunity(puppet)
  734. }
  735. }
  736. portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: portal.Key, InCommunity: inCommunity})
  737. }
  738. user.log.Infoln("Read chat list, updating user-portal mapping")
  739. err := user.SetPortalKeys(portalKeys)
  740. if err != nil {
  741. user.log.Warnln("Failed to update user-portal mapping:", err)
  742. }
  743. sort.Sort(chats)
  744. return chats
  745. }
  746. func (user *User) syncPortals(chatMap map[string]whatsapp.Chat, createAll bool) {
  747. // TODO use contexts instead of checking if user.Conn is the same?
  748. connAtStart := user.Conn
  749. chats := user.collectChatList(chatMap)
  750. limit := user.bridge.Config.Bridge.InitialChatSync
  751. if limit < 0 {
  752. limit = len(chats)
  753. }
  754. if user.Conn != connAtStart {
  755. user.log.Debugln("Connection seems to have changed before sync, cancelling")
  756. return
  757. }
  758. now := time.Now().Unix()
  759. user.log.Infoln("Syncing portals")
  760. doublePuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  761. for i, chat := range chats {
  762. if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
  763. break
  764. }
  765. create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
  766. if len(chat.Portal.MXID) > 0 || create || createAll {
  767. user.log.Debugfln("Syncing chat %+v", chat.Chat.Source)
  768. justCreated := len(chat.Portal.MXID) == 0
  769. user.syncPortal(chat)
  770. user.syncChatDoublePuppetDetails(doublePuppet, chat, justCreated)
  771. }
  772. }
  773. if user.Conn != connAtStart {
  774. user.log.Debugln("Connection seems to have changed during sync, cancelling")
  775. return
  776. }
  777. user.UpdateDirectChats(nil)
  778. user.log.Infoln("Finished syncing portals")
  779. select {
  780. case user.syncPortalsDone <- struct{}{}:
  781. default:
  782. }
  783. }
  784. func (user *User) getDirectChats() map[id.UserID][]id.RoomID {
  785. res := make(map[id.UserID][]id.RoomID)
  786. privateChats := user.bridge.DB.Portal.FindPrivateChats(user.JID)
  787. for _, portal := range privateChats {
  788. if len(portal.MXID) > 0 {
  789. res[user.bridge.FormatPuppetMXID(portal.Key.JID)] = []id.RoomID{portal.MXID}
  790. }
  791. }
  792. return res
  793. }
  794. func (user *User) UpdateDirectChats(chats map[id.UserID][]id.RoomID) {
  795. if !user.bridge.Config.Bridge.SyncDirectChatList {
  796. return
  797. }
  798. puppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
  799. if puppet == nil || puppet.CustomIntent() == nil {
  800. return
  801. }
  802. intent := puppet.CustomIntent()
  803. method := http.MethodPatch
  804. if chats == nil {
  805. chats = user.getDirectChats()
  806. method = http.MethodPut
  807. }
  808. user.log.Debugln("Updating m.direct list on homeserver")
  809. var err error
  810. if user.bridge.Config.Homeserver.Asmux {
  811. urlPath := intent.BuildBaseURL("_matrix", "client", "unstable", "com.beeper.asmux", "dms")
  812. _, err = intent.MakeFullRequest(mautrix.FullRequest{
  813. Method: method,
  814. URL: urlPath,
  815. Headers: http.Header{"X-Asmux-Auth": {user.bridge.AS.Registration.AppToken}},
  816. RequestJSON: chats,
  817. })
  818. } else {
  819. existingChats := make(map[id.UserID][]id.RoomID)
  820. err = intent.GetAccountData(event.AccountDataDirectChats.Type, &existingChats)
  821. if err != nil {
  822. user.log.Warnln("Failed to get m.direct list to update it:", err)
  823. return
  824. }
  825. for userID, rooms := range existingChats {
  826. if _, ok := user.bridge.ParsePuppetMXID(userID); !ok {
  827. // This is not a ghost user, include it in the new list
  828. chats[userID] = rooms
  829. } else if _, ok := chats[userID]; !ok && method == http.MethodPatch {
  830. // This is a ghost user, but we're not replacing the whole list, so include it too
  831. chats[userID] = rooms
  832. }
  833. }
  834. err = intent.SetAccountData(event.AccountDataDirectChats.Type, &chats)
  835. }
  836. if err != nil {
  837. user.log.Warnln("Failed to update m.direct list:", err)
  838. }
  839. }
  840. func (user *User) HandleContactList(contacts []whatsapp.Contact) {
  841. contactMap := make(map[whatsapp.JID]whatsapp.Contact)
  842. for _, contact := range contacts {
  843. contactMap[contact.JID] = contact
  844. }
  845. go user.syncPuppets(contactMap)
  846. }
  847. func (user *User) syncPuppets(contacts map[whatsapp.JID]whatsapp.Contact) {
  848. if contacts == nil {
  849. contacts = user.Conn.Store.Contacts
  850. }
  851. _, hasSelf := contacts[user.JID]
  852. if !hasSelf {
  853. contacts[user.JID] = whatsapp.Contact{
  854. Name: user.pushName,
  855. Notify: user.pushName,
  856. JID: user.JID,
  857. }
  858. }
  859. user.log.Infoln("Syncing puppet info from contacts")
  860. for jid, contact := range contacts {
  861. if strings.HasSuffix(jid, whatsapp.NewUserSuffix) {
  862. puppet := user.bridge.GetPuppetByJID(contact.JID)
  863. puppet.Sync(user, contact)
  864. } else if strings.HasSuffix(jid, whatsapp.BroadcastSuffix) {
  865. portal := user.GetPortalByJID(contact.JID)
  866. portal.Sync(user, contact)
  867. }
  868. }
  869. user.log.Infoln("Finished syncing puppet info from contacts")
  870. }
  871. func (user *User) updateLastConnectionIfNecessary() {
  872. if user.LastConnection+60 < time.Now().Unix() {
  873. user.UpdateLastConnection()
  874. }
  875. }
  876. func (user *User) HandleError(err error) {
  877. if !errors.Is(err, whatsapp.ErrInvalidWsData) {
  878. user.log.Errorfln("WhatsApp error: %v", err)
  879. }
  880. if closed, ok := err.(*whatsapp.ErrConnectionClosed); ok {
  881. user.bridge.Metrics.TrackDisconnection(user.MXID)
  882. if closed.Code == 1000 && user.cleanDisconnection {
  883. user.cleanDisconnection = false
  884. if !user.bridge.Config.Bridge.AggressiveReconnect {
  885. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  886. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  887. user.log.Infoln("Clean disconnection by server")
  888. return
  889. } else {
  890. user.log.Debugln("Clean disconnection by server, but aggressive reconnection is enabled")
  891. }
  892. }
  893. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection was closed with websocket status code %d", closed.Code))
  894. } else if failed, ok := err.(*whatsapp.ErrConnectionFailed); ok {
  895. disconnectErr := user.Conn.Disconnect()
  896. if disconnectErr != nil {
  897. user.log.Warnln("Failed to disconnect after connection fail:", disconnectErr)
  898. }
  899. user.bridge.Metrics.TrackDisconnection(user.MXID)
  900. user.ConnectionErrors++
  901. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", failed.Err))
  902. } else if err == whatsapp.ErrPingFalse {
  903. disconnectErr := user.Conn.Disconnect()
  904. if disconnectErr != nil {
  905. user.log.Warnln("Failed to disconnect after failed ping:", disconnectErr)
  906. }
  907. user.bridge.Metrics.TrackDisconnection(user.MXID)
  908. user.ConnectionErrors++
  909. go user.tryReconnect(fmt.Sprintf("Your WhatsApp connection failed: %v", err))
  910. }
  911. // Otherwise unknown error, probably mostly harmless
  912. }
  913. func (user *User) tryReconnect(msg string) {
  914. user.bridge.Metrics.TrackConnectionState(user.JID, false)
  915. if user.ConnectionErrors > user.bridge.Config.Bridge.MaxConnectionAttempts {
  916. user.sendMarkdownBridgeAlert("%s. Use the `reconnect` command to reconnect.", msg)
  917. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  918. return
  919. }
  920. if user.bridge.Config.Bridge.ReportConnectionRetry {
  921. user.sendBridgeNotice("%s. Reconnecting...", msg)
  922. // Don't want the same error to be repeated
  923. msg = ""
  924. }
  925. var tries uint
  926. var exponentialBackoff bool
  927. baseDelay := time.Duration(user.bridge.Config.Bridge.ConnectionRetryDelay)
  928. if baseDelay < 0 {
  929. exponentialBackoff = true
  930. baseDelay = -baseDelay + 1
  931. }
  932. delay := baseDelay
  933. ctx, cancel := context.WithCancel(context.Background())
  934. defer cancel()
  935. user.cancelReconnect = cancel
  936. for user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  937. select {
  938. case <-ctx.Done():
  939. user.log.Debugln("tryReconnect context cancelled, aborting reconnection attempts")
  940. return
  941. default:
  942. }
  943. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWAConnecting})
  944. err := user.Conn.Restore(true, ctx)
  945. if err == nil {
  946. user.ConnectionErrors = 0
  947. if user.bridge.Config.Bridge.ReportConnectionRetry {
  948. user.sendBridgeNotice("Reconnected successfully")
  949. }
  950. user.PostLogin()
  951. return
  952. } else if errors.Is(err, whatsapp.ErrBadRequest) {
  953. user.log.Warnln("Got init 400 error when trying to reconnect, resetting connection...")
  954. err = user.Conn.Disconnect()
  955. if err != nil {
  956. user.log.Debugln("Error while disconnecting for connection reset:", err)
  957. }
  958. } else if errors.Is(err, whatsapp.ErrUnpaired) {
  959. user.log.Errorln("Got init 401 (unpaired) error when trying to reconnect, not retrying")
  960. user.removeFromJIDMap()
  961. //user.JID = ""
  962. user.SetSession(nil)
  963. user.DeleteConnection()
  964. user.sendMarkdownBridgeAlert("\u26a0 Failed to reconnect to WhatsApp: unpaired from phone. " +
  965. "To re-pair your phone, log in again.")
  966. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotLoggedIn})
  967. return
  968. } else if errors.Is(err, whatsapp.ErrAlreadyLoggedIn) {
  969. user.log.Warnln("Reconnection said we're already logged in, not trying anymore")
  970. return
  971. } else {
  972. user.log.Errorln("Error while trying to reconnect after disconnection:", err)
  973. }
  974. tries++
  975. user.ConnectionErrors++
  976. if user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
  977. if exponentialBackoff {
  978. delay = (1 << tries) + baseDelay
  979. }
  980. if user.bridge.Config.Bridge.ReportConnectionRetry {
  981. user.sendBridgeNotice("Reconnection attempt failed: %v. Retrying in %d seconds...", err, delay)
  982. }
  983. time.Sleep(delay * time.Second)
  984. }
  985. }
  986. user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
  987. if user.bridge.Config.Bridge.ReportConnectionRetry {
  988. user.sendMarkdownBridgeAlert("%d reconnection attempts failed. Use the `reconnect` command to try to reconnect manually.", tries)
  989. } else {
  990. user.sendMarkdownBridgeAlert("\u26a0 %s. Additionally, %d reconnection attempts failed. Use the `reconnect` command to try to reconnect.", msg, tries)
  991. }
  992. }
  993. func (user *User) PortalKey(jid whatsapp.JID) database.PortalKey {
  994. return database.NewPortalKey(jid, user.JID)
  995. }
  996. func (user *User) GetPortalByJID(jid whatsapp.JID) *Portal {
  997. return user.bridge.GetPortalByJID(user.PortalKey(jid))
  998. }
  999. func (user *User) runMessageRingBuffer() {
  1000. for msg := range user.messageInput {
  1001. select {
  1002. case user.messageOutput <- msg:
  1003. user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
  1004. default:
  1005. dropped := <-user.messageOutput
  1006. user.log.Warnln("Buffer is full, dropping message in", dropped.chat)
  1007. user.messageOutput <- msg
  1008. }
  1009. }
  1010. }
// handleMessageLoop runs forever, forwarding each message received from
// user.messageOutput to the messages channel of the portal for that message's
// chat. When a value arrives on user.syncStart, forwarding is paused until
// user.syncWait.Wait() returns.
func (user *User) handleMessageLoop() {
	for {
		select {
		case msg := <-user.messageOutput:
			user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
			user.GetPortalByJID(msg.chat).messages <- msg
		case <-user.syncStart:
			// Block message processing while the sync lock is held.
			user.log.Debugln("Processing of incoming messages is locked")
			user.bridge.Metrics.TrackSyncLock(user.JID, true)
			user.syncWait.Wait()
			user.bridge.Metrics.TrackSyncLock(user.JID, false)
			user.log.Debugln("Processing of incoming messages unlocked")
		}
	}
}
  1026. func (user *User) HandleNewContact(contact whatsapp.Contact) {
  1027. user.log.Debugfln("Contact message: %+v", contact)
  1028. if strings.HasSuffix(contact.JID, whatsapp.OldUserSuffix) {
  1029. contact.JID = strings.Replace(contact.JID, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  1030. }
  1031. if strings.HasSuffix(contact.JID, whatsapp.NewUserSuffix) {
  1032. puppet := user.bridge.GetPuppetByJID(contact.JID)
  1033. puppet.UpdateName(user, contact)
  1034. } else if strings.HasSuffix(contact.JID, whatsapp.BroadcastSuffix) {
  1035. portal := user.GetPortalByJID(contact.JID)
  1036. portal.UpdateName(contact.Name, "", nil, true)
  1037. }
  1038. }
  1039. func (user *User) HandleBatteryMessage(battery whatsapp.BatteryMessage) {
  1040. user.log.Debugfln("Battery message: %+v", battery)
  1041. var notice string
  1042. if !battery.Plugged && battery.Percentage < 15 && user.batteryWarningsSent < 1 {
  1043. notice = fmt.Sprintf("Phone battery low (%d %% remaining)", battery.Percentage)
  1044. user.batteryWarningsSent = 1
  1045. } else if !battery.Plugged && battery.Percentage < 5 && user.batteryWarningsSent < 2 {
  1046. notice = fmt.Sprintf("Phone battery very low (%d %% remaining)", battery.Percentage)
  1047. user.batteryWarningsSent = 2
  1048. } else if battery.Percentage > 15 || battery.Plugged {
  1049. user.batteryWarningsSent = 0
  1050. }
  1051. if notice != "" {
  1052. go user.sendBridgeNotice("%s", notice)
  1053. }
  1054. }
// FakeMessage is a locally generated message that is sent to a portal's
// messages channel, e.g. for call notices (see HandleCallInfo).
type FakeMessage struct {
	// Text is the message text, e.g. "Incoming call".
	Text string
	// ID identifies the message. HandleCallInfo uses the call ID, with "E"
	// appended for the call-ended notice.
	ID string
	// Alert is set for incoming call notices.
	// NOTE(review): presumably controls whether the notice notifies the user — confirm in the portal code.
	Alert bool
}
  1060. func (user *User) HandleCallInfo(info whatsapp.CallInfo) {
  1061. if info.Data != nil {
  1062. return
  1063. }
  1064. data := FakeMessage{
  1065. ID: info.ID,
  1066. }
  1067. switch info.Type {
  1068. case whatsapp.CallOffer:
  1069. if !user.bridge.Config.Bridge.CallNotices.Start {
  1070. return
  1071. }
  1072. data.Text = "Incoming call"
  1073. data.Alert = true
  1074. case whatsapp.CallOfferVideo:
  1075. if !user.bridge.Config.Bridge.CallNotices.Start {
  1076. return
  1077. }
  1078. data.Text = "Incoming video call"
  1079. data.Alert = true
  1080. case whatsapp.CallTerminate:
  1081. if !user.bridge.Config.Bridge.CallNotices.End {
  1082. return
  1083. }
  1084. data.Text = "Call ended"
  1085. data.ID += "E"
  1086. default:
  1087. return
  1088. }
  1089. portal := user.GetPortalByJID(info.From)
  1090. if portal != nil {
  1091. portal.messages <- PortalMessage{info.From, user, data, 0}
  1092. }
  1093. }
  1094. func (user *User) HandlePresence(info whatsapp.PresenceEvent) {
  1095. puppet := user.bridge.GetPuppetByJID(info.SenderJID)
  1096. switch info.Status {
  1097. case whatsapp.PresenceUnavailable:
  1098. _ = puppet.DefaultIntent().SetPresence("offline")
  1099. case whatsapp.PresenceAvailable:
  1100. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  1101. portal := user.bridge.GetPortalByMXID(puppet.typingIn)
  1102. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  1103. puppet.typingIn = ""
  1104. puppet.typingAt = 0
  1105. } else {
  1106. _ = puppet.DefaultIntent().SetPresence("online")
  1107. }
  1108. case whatsapp.PresenceComposing:
  1109. portal := user.GetPortalByJID(info.JID)
  1110. if len(puppet.typingIn) > 0 && puppet.typingAt+15 > time.Now().Unix() {
  1111. if puppet.typingIn == portal.MXID {
  1112. return
  1113. }
  1114. _, _ = puppet.IntentFor(portal).UserTyping(puppet.typingIn, false, 0)
  1115. }
  1116. puppet.typingIn = portal.MXID
  1117. puppet.typingAt = time.Now().Unix()
  1118. _, _ = puppet.IntentFor(portal).UserTyping(portal.MXID, true, 15*1000)
  1119. }
  1120. }
  1121. func (user *User) HandleMsgInfo(info whatsapp.JSONMsgInfo) {
  1122. if (info.Command == whatsapp.MsgInfoCommandAck || info.Command == whatsapp.MsgInfoCommandAcks) && info.Acknowledgement == whatsapp.AckMessageRead {
  1123. portal := user.GetPortalByJID(info.ToJID)
  1124. if len(portal.MXID) == 0 {
  1125. return
  1126. }
  1127. intent := user.bridge.GetPuppetByJID(info.SenderJID).IntentFor(portal)
  1128. for _, msgID := range info.IDs {
  1129. msg := user.bridge.DB.Message.GetByJID(portal.Key, msgID)
  1130. if msg == nil || msg.IsFakeMXID() {
  1131. continue
  1132. }
  1133. err := intent.MarkRead(portal.MXID, msg.MXID)
  1134. if err != nil {
  1135. user.log.Warnln("Failed to mark message %s as read by %s: %v", msg.MXID, info.SenderJID, err)
  1136. }
  1137. }
  1138. }
  1139. }
  1140. func (user *User) HandleReceivedMessage(received whatsapp.ReceivedMessage) {
  1141. if received.Type == "read" {
  1142. go user.markSelfRead(received.Jid, received.Index)
  1143. } else {
  1144. user.log.Debugfln("Unknown received message type: %+v", received)
  1145. }
  1146. }
  1147. func (user *User) HandleReadMessage(read whatsapp.ReadMessage) {
  1148. user.log.Debugfln("Received chat read message: %+v", read)
  1149. go user.markSelfRead(read.Jid, "")
  1150. }
  1151. func (user *User) markSelfRead(jid, messageID string) {
  1152. if strings.HasSuffix(jid, whatsapp.OldUserSuffix) {
  1153. jid = strings.Replace(jid, whatsapp.OldUserSuffix, whatsapp.NewUserSuffix, -1)
  1154. }
  1155. puppet := user.bridge.GetPuppetByJID(user.JID)
  1156. if puppet == nil {
  1157. return
  1158. }
  1159. intent := puppet.CustomIntent()
  1160. if intent == nil {
  1161. return
  1162. }
  1163. portal := user.GetPortalByJID(jid)
  1164. if portal == nil {
  1165. return
  1166. }
  1167. var message *database.Message
  1168. if messageID == "" {
  1169. message = user.bridge.DB.Message.GetLastInChat(portal.Key)
  1170. if message == nil {
  1171. return
  1172. }
  1173. user.log.Debugfln("User read chat %s/%s in WhatsApp mobile (last known event: %s/%s)", portal.Key.JID, portal.MXID, message.JID, message.MXID)
  1174. } else {
  1175. message = user.bridge.DB.Message.GetByJID(portal.Key, messageID)
  1176. if message == nil || message.IsFakeMXID() {
  1177. return
  1178. }
  1179. user.log.Debugfln("User read message %s/%s in %s/%s in WhatsApp mobile", message.JID, message.MXID, portal.Key.JID, portal.MXID)
  1180. }
  1181. err := intent.MarkRead(portal.MXID, message.MXID)
  1182. if err != nil {
  1183. user.log.Warnfln("Failed to bridge own read receipt in %s: %v", jid, err)
  1184. }
  1185. }
  1186. func (user *User) HandleCommand(cmd whatsapp.JSONCommand) {
  1187. switch cmd.Type {
  1188. case whatsapp.CommandPicture:
  1189. if strings.HasSuffix(cmd.JID, whatsapp.NewUserSuffix) {
  1190. puppet := user.bridge.GetPuppetByJID(cmd.JID)
  1191. go puppet.UpdateAvatar(user, cmd.ProfilePicInfo)
  1192. } else if user.bridge.Config.Bridge.ChatMetaSync {
  1193. portal := user.GetPortalByJID(cmd.JID)
  1194. go portal.UpdateAvatar(user, cmd.ProfilePicInfo, true)
  1195. }
  1196. case whatsapp.CommandDisconnect:
  1197. if cmd.Kind == "replaced" {
  1198. user.cleanDisconnection = true
  1199. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server because you opened another WhatsApp Web client.\n\n" +
  1200. "Use the `reconnect` command to disconnect the other client and resume bridging.")
  1201. } else {
  1202. user.log.Warnln("Unknown kind of disconnect:", string(cmd.Raw))
  1203. go user.sendMarkdownBridgeAlert("\u26a0 Your WhatsApp connection was closed by the server (reason code: %s).\n\n"+
  1204. "Use the `reconnect` command to reconnect.", cmd.Kind)
  1205. }
  1206. }
  1207. }
// HandleChatUpdate processes a real-time chat update from WhatsApp. Only
// updates whose command is ChatUpdateCommandAction are handled.
//
// If the portal has no MXID yet, a Matrix room is created (in a new goroutine)
// for introduce/create actions and every other action is ignored. Otherwise
// topic changes are always bridged, as is the removal of the user themselves.
// All remaining actions are only bridged when chat meta sync is enabled.
func (user *User) HandleChatUpdate(cmd whatsapp.ChatUpdate) {
	if cmd.Command != whatsapp.ChatUpdateCommandAction {
		return
	}
	portal := user.GetPortalByJID(cmd.JID)
	if len(portal.MXID) == 0 {
		if cmd.Data.Action == whatsapp.ChatActionIntroduce || cmd.Data.Action == whatsapp.ChatActionCreate {
			go func() {
				err := portal.CreateMatrixRoom(user)
				if err != nil {
					user.log.Errorln("Failed to create portal room after receiving join event:", err)
				}
			}()
		}
		return
	}
	// These don't come down the message history :(
	switch cmd.Data.Action {
	case whatsapp.ChatActionAddTopic:
		go portal.UpdateTopic(cmd.Data.AddTopic.Topic, cmd.Data.SenderJID, nil, true)
	case whatsapp.ChatActionRemoveTopic:
		go portal.UpdateTopic("", cmd.Data.SenderJID, nil, true)
	case whatsapp.ChatActionRemove:
		// We ignore leaving groups in the message history to avoid accidentally leaving rejoined groups,
		// but if we get a real-time command that says we left, it should be safe to bridge it.
		// With chat meta sync enabled, the second switch below handles removals instead.
		if !user.bridge.Config.Bridge.ChatMetaSync {
			for _, jid := range cmd.Data.UserChange.JIDs {
				if jid == user.JID {
					go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
					break
				}
			}
		}
	}
	if !user.bridge.Config.Bridge.ChatMetaSync {
		// Ignore chat update commands, we're relying on the message history.
		return
	}
	// Chat meta sync is enabled: bridge the remaining metadata and membership changes.
	switch cmd.Data.Action {
	case whatsapp.ChatActionNameChange:
		go portal.UpdateName(cmd.Data.NameChange.Name, cmd.Data.SenderJID, nil, true)
	case whatsapp.ChatActionPromote:
		go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, true)
	case whatsapp.ChatActionDemote:
		go portal.ChangeAdminStatus(cmd.Data.UserChange.JIDs, false)
	case whatsapp.ChatActionAnnounce:
		go portal.RestrictMessageSending(cmd.Data.Announce)
	case whatsapp.ChatActionRestrict:
		go portal.RestrictMetadataChanges(cmd.Data.Restrict)
	case whatsapp.ChatActionRemove:
		go portal.HandleWhatsAppKick(nil, cmd.Data.SenderJID, cmd.Data.UserChange.JIDs)
	case whatsapp.ChatActionAdd:
		go portal.HandleWhatsAppInvite(cmd.Data.SenderJID, nil, cmd.Data.UserChange.JIDs)
	case whatsapp.ChatActionIntroduce:
		if cmd.Data.SenderJID != "unknown" {
			go portal.Sync(user, whatsapp.Contact{JID: portal.Key.JID})
		}
	}
}
  1267. func (user *User) HandleConnInfo(info whatsapp.ConnInfo) {
  1268. if user.Session != nil && info.Connected && len(info.ClientToken) > 0 {
  1269. user.log.Debugln("Received new tokens")
  1270. user.Session.ClientToken = info.ClientToken
  1271. user.Session.ServerToken = info.ServerToken
  1272. user.Session.Wid = info.WID
  1273. user.Update()
  1274. }
  1275. if len(info.PushName) > 0 {
  1276. user.pushName = info.PushName
  1277. }
  1278. }
  1279. func (user *User) HandleJSONMessage(evt whatsapp.RawJSONMessage) {
  1280. if !json.Valid(evt.RawMessage) {
  1281. return
  1282. }
  1283. user.log.Debugfln("JSON message with tag %s: %s", evt.Tag, evt.RawMessage)
  1284. user.updateLastConnectionIfNecessary()
  1285. }
  1286. func (user *User) NeedsRelaybot(portal *Portal) bool {
  1287. return !user.HasSession() || !user.IsInPortal(portal.Key)
  1288. }