Эх сурвалжийг харах

Make bridge state monitoring more generic

Tulir Asokan 4 жил өмнө
parent
commit
74e21b8e1d
7 өөрчлөгдсөн 219 нэмэгдсэн , 184 устгасан
  1. 0 165
      asmux.go
  2. 193 0
      bridgestate.go
  3. 1 1
      commands.go
  4. 6 5
      config/config.go
  5. 5 0
      example-config.yaml
  6. 2 1
      provisioning.go
  7. 12 12
      user.go

+ 0 - 165
asmux.go

@@ -1,165 +0,0 @@
-// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
-// Copyright (C) 2020 Tulir Asokan
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program.  If not, see <https://www.gnu.org/licenses/>.
-
-package main
-
-import (
-	"context"
-	"errors"
-	"net/http"
-	"sync/atomic"
-	"time"
-
-	"github.com/Rhymen/go-whatsapp"
-	"maunium.net/go/mautrix/id"
-)
-
-type AsmuxError string
-
-const (
-	AsmuxWANotLoggedIn  AsmuxError = "wa-not-logged-in"
-	AsmuxWANotConnected AsmuxError = "wa-not-connected"
-	AsmuxWAConnecting   AsmuxError = "wa-connecting"
-	AsmuxWATimeout      AsmuxError = "wa-timeout"
-	AsmuxWAPingFalse    AsmuxError = "wa-ping-false"
-	AsmuxWAPingError    AsmuxError = "wa-ping-error"
-)
-
-var asmuxHumanErrors = map[AsmuxError]string{
-	AsmuxWANotLoggedIn:  "You're not logged into WhatsApp",
-	AsmuxWANotConnected: "You're not connected to WhatsApp",
-	AsmuxWAConnecting:   "Trying to reconnect to WhatsApp. Please make sure WhatsApp is running on your phone and connected to the internet.",
-	AsmuxWATimeout:      "WhatsApp on your phone is not responding. Please make sure it is running and connected to the internet.",
-	AsmuxWAPingFalse:    "WhatsApp returned an error, reconnecting. Please make sure WhatsApp is running on your phone and connected to the internet.",
-	AsmuxWAPingError:    "WhatsApp returned an unknown error",
-}
-
-type AsmuxPong struct {
-	OK          bool       `json:"ok"`
-	Timestamp   int64      `json:"timestamp"`
-	TTL         int        `json:"ttl"`
-	ErrorSource string     `json:"error_source,omitempty"`
-	Error       AsmuxError `json:"error,omitempty"`
-	Message     string     `json:"message,omitempty"`
-}
-
-func (pong *AsmuxPong) fill() {
-	pong.Timestamp = time.Now().Unix()
-	if !pong.OK {
-		pong.TTL = 60
-		pong.ErrorSource = "bridge"
-		pong.Message = asmuxHumanErrors[pong.Error]
-	} else {
-		pong.TTL = 240
-	}
-}
-
-func (pong *AsmuxPong) shouldDeduplicate(newPong *AsmuxPong) bool {
-	if pong == nil || pong.OK != newPong.OK || pong.Error != newPong.Error {
-		return false
-	}
-	return pong.Timestamp+int64(pong.TTL/5) > time.Now().Unix()
-}
-
-func (user *User) setupAdminTestHooks() {
-	if !user.bridge.Config.Homeserver.Asmux {
-		return
-	}
-	user.Conn.AdminTestHook = func(err error) {
-		if errors.Is(err, whatsapp.ErrConnectionTimeout) {
-			user.sendBridgeStatus(AsmuxPong{Error: AsmuxWATimeout})
-		} else if errors.Is(err, whatsapp.ErrPingFalse) {
-			user.sendBridgeStatus(AsmuxPong{Error: AsmuxWAPingFalse})
-		} else if err == nil {
-			user.sendBridgeStatus(AsmuxPong{OK: true})
-		} else {
-			user.sendBridgeStatus(AsmuxPong{Error: AsmuxWAPingError})
-		}
-	}
-	user.Conn.CountTimeoutHook = func() {
-		user.sendBridgeStatus(AsmuxPong{Error: AsmuxWATimeout})
-	}
-}
-
-func (user *User) sendBridgeStatus(state AsmuxPong) {
-	if !user.bridge.Config.Homeserver.Asmux {
-		return
-	}
-	state.fill()
-	if user.prevBridgeStatus != nil && user.prevBridgeStatus.shouldDeduplicate(&state) {
-		return
-	}
-	cli := user.bridge.AS.BotClient()
-	url := cli.BuildBaseURL("_matrix", "client", "unstable", "com.beeper.asmux", "pong")
-	user.log.Debugfln("Sending bridge state to asmux: %+v", state)
-	_, err := cli.MakeRequest("POST", url, &state, nil)
-	if err != nil {
-		user.log.Warnln("Failed to update bridge state in asmux:", err)
-	} else {
-		user.prevBridgeStatus = &state
-	}
-}
-
-var asmuxPingID uint32 = 0
-
-func (prov *ProvisioningAPI) AsmuxPing(w http.ResponseWriter, r *http.Request) {
-	if !prov.bridge.AS.CheckServerToken(w, r) {
-		return
-	}
-	userID := r.URL.Query().Get("user_id")
-	user := prov.bridge.GetUserByMXID(id.UserID(userID))
-	var resp AsmuxPong
-	if user.Conn == nil {
-		if user.Session == nil {
-			resp.Error = AsmuxWANotLoggedIn
-		} else {
-			resp.Error = AsmuxWANotConnected
-		}
-	} else {
-		if user.Conn.IsConnected() && user.Conn.IsLoggedIn() {
-			pingID := atomic.AddUint32(&asmuxPingID, 1)
-			user.log.Debugfln("Pinging WhatsApp mobile due to asmux /ping API request (ID %d)", pingID)
-			err := user.Conn.AdminTestWithSuppress(true)
-			if errors.Is(r.Context().Err(), context.Canceled) {
-				user.log.Warnfln("Ping request %d was canceled before we responded (response was %v)", pingID, err)
-				user.prevBridgeStatus = nil
-				return
-			}
-			user.log.Debugfln("Ping %d response: %v", pingID, err)
-			if err == whatsapp.ErrPingFalse {
-				user.log.Debugln("Forwarding ping false error from provisioning API to HandleError")
-				go user.HandleError(err)
-				resp.Error = AsmuxWAPingFalse
-			} else if errors.Is(err, whatsapp.ErrConnectionTimeout) {
-				resp.Error = AsmuxWATimeout
-			} else if err != nil {
-				resp.Error = AsmuxWAPingError
-			} else {
-				resp.OK = true
-			}
-		} else if user.Conn.IsLoginInProgress() {
-			resp.Error = AsmuxWAConnecting
-		} else if user.Conn.IsConnected() {
-			resp.Error = AsmuxWANotLoggedIn
-		} else {
-			resp.Error = AsmuxWANotConnected
-		}
-	}
-	resp.fill()
-	user.log.Debugfln("Responding bridge state to asmux: %+v", resp)
-	jsonResponse(w, http.StatusOK, &resp)
-	user.prevBridgeStatus = &resp
-}

+ 193 - 0
bridgestate.go

@@ -0,0 +1,193 @@
+// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
+// Copyright (C) 2021 Tulir Asokan
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+package main
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"io/ioutil"
+	"net/http"
+	"sync/atomic"
+	"time"
+
+	"github.com/Rhymen/go-whatsapp"
+	"maunium.net/go/mautrix/id"
+)
+
+type BridgeErrorCode string
+
+const (
+	WANotLoggedIn  BridgeErrorCode = "wa-not-logged-in"
+	WANotConnected BridgeErrorCode = "wa-not-connected"
+	WAConnecting   BridgeErrorCode = "wa-connecting"
+	WATimeout      BridgeErrorCode = "wa-timeout"
+	WAPingFalse    BridgeErrorCode = "wa-ping-false"
+	WAPingError    BridgeErrorCode = "wa-ping-error"
+)
+
+var bridgeHumanErrors = map[BridgeErrorCode]string{
+	WANotLoggedIn:  "You're not logged into WhatsApp",
+	WANotConnected: "You're not connected to WhatsApp",
+	WAConnecting:   "Trying to reconnect to WhatsApp. Please make sure WhatsApp is running on your phone and connected to the internet.",
+	WATimeout:      "WhatsApp on your phone is not responding. Please make sure it is running and connected to the internet.",
+	WAPingFalse:    "WhatsApp returned an error, reconnecting. Please make sure WhatsApp is running on your phone and connected to the internet.",
+	WAPingError:    "WhatsApp returned an unknown error",
+}
+
+type BridgeState struct {
+	OK          bool            `json:"ok"`
+	Timestamp   int64           `json:"timestamp"`
+	TTL         int             `json:"ttl"`
+	ErrorSource string          `json:"error_source,omitempty"`
+	Error       BridgeErrorCode `json:"error,omitempty"`
+	Message     string          `json:"message,omitempty"`
+
+	UserID id.UserID `json:"user_id"`
+}
+
+func (pong *BridgeState) fill() {
+	pong.Timestamp = time.Now().Unix()
+	if !pong.OK {
+		pong.TTL = 60
+		pong.ErrorSource = "bridge"
+		pong.Message = bridgeHumanErrors[pong.Error]
+	} else {
+		pong.TTL = 240
+	}
+}
+
+func (pong *BridgeState) shouldDeduplicate(newPong *BridgeState) bool {
+	if pong == nil || pong.OK != newPong.OK || pong.Error != newPong.Error {
+		return false
+	}
+	return pong.Timestamp+int64(pong.TTL/5) > time.Now().Unix()
+}
+
+func (user *User) setupAdminTestHooks() {
+	if !user.bridge.Config.Homeserver.Asmux {
+		return
+	}
+	user.Conn.AdminTestHook = func(err error) {
+		if errors.Is(err, whatsapp.ErrConnectionTimeout) {
+			user.sendBridgeState(BridgeState{Error: WATimeout})
+		} else if errors.Is(err, whatsapp.ErrPingFalse) {
+			user.sendBridgeState(BridgeState{Error: WAPingFalse})
+		} else if err == nil {
+			user.sendBridgeState(BridgeState{OK: true})
+		} else {
+			user.sendBridgeState(BridgeState{Error: WAPingError})
+		}
+	}
+	user.Conn.CountTimeoutHook = func() {
+		user.sendBridgeState(BridgeState{Error: WATimeout})
+	}
+}
+
+func (user *User) sendBridgeState(state BridgeState) {
+	if len(user.bridge.Config.Homeserver.StatusEndpoint) == 0 {
+		return
+	}
+
+	state.UserID = user.MXID
+	state.fill()
+	if user.prevBridgeStatus != nil && user.prevBridgeStatus.shouldDeduplicate(&state) {
+		return
+	}
+
+	var body bytes.Buffer
+	var err error
+	if err = json.NewEncoder(&body).Encode(&state); err != nil {
+		user.log.Warnln("Failed to encode bridge state update JSON:", err)
+		return
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30 * time.Second)
+	defer cancel()
+
+	var req *http.Request
+	var resp *http.Response
+	if req, err = http.NewRequestWithContext(ctx, http.MethodPost, user.bridge.Config.Homeserver.StatusEndpoint, &body); err != nil {
+		user.log.Warnln("Failed to prepare bridge state update request:", err)
+	} else if resp, err = http.DefaultClient.Do(req); err != nil {
+		user.log.Warnln("Failed to send bridge state update:", err)
+	} else if resp.StatusCode < 200 || resp.StatusCode > 299 {
+		respBody, _ := ioutil.ReadAll(resp.Body)
+		if respBody != nil {
+			respBody = bytes.ReplaceAll(respBody, []byte("\n"), []byte("\\n"))
+		}
+		user.log.Warnfln("Unexpected status code %d sending bridge state update: %s", respBody)
+	} else {
+		user.prevBridgeStatus = &state
+	}
+	if resp != nil && resp.Body != nil {
+		_ = resp.Body.Close()
+	}
+}
+
+var bridgeStatePingID uint32 = 0
+
+func (prov *ProvisioningAPI) BridgeStatePing(w http.ResponseWriter, r *http.Request) {
+	if !prov.bridge.AS.CheckServerToken(w, r) {
+		return
+	}
+	userID := r.URL.Query().Get("user_id")
+	user := prov.bridge.GetUserByMXID(id.UserID(userID))
+	var resp BridgeState
+	if user.Conn == nil {
+		if user.Session == nil {
+			resp.Error = WANotLoggedIn
+		} else {
+			resp.Error = WANotConnected
+		}
+	} else {
+		if user.Conn.IsConnected() && user.Conn.IsLoggedIn() {
+			pingID := atomic.AddUint32(&bridgeStatePingID, 1)
+			user.log.Debugfln("Pinging WhatsApp mobile due to asmux /ping API request (ID %d)", pingID)
+			err := user.Conn.AdminTestWithSuppress(true)
+			if errors.Is(r.Context().Err(), context.Canceled) {
+				user.log.Warnfln("Ping request %d was canceled before we responded (response was %v)", pingID, err)
+				user.prevBridgeStatus = nil
+				return
+			}
+			user.log.Debugfln("Ping %d response: %v", pingID, err)
+			if err == whatsapp.ErrPingFalse {
+				user.log.Debugln("Forwarding ping false error from provisioning API to HandleError")
+				go user.HandleError(err)
+				resp.Error = WAPingFalse
+			} else if errors.Is(err, whatsapp.ErrConnectionTimeout) {
+				resp.Error = WATimeout
+			} else if err != nil {
+				resp.Error = WAPingError
+			} else {
+				resp.OK = true
+			}
+		} else if user.Conn.IsLoginInProgress() {
+			resp.Error = WAConnecting
+		} else if user.Conn.IsConnected() {
+			resp.Error = WANotLoggedIn
+		} else {
+			resp.Error = WANotConnected
+		}
+	}
+	resp.UserID = user.MXID
+	resp.fill()
+	user.log.Debugfln("Responding bridge state to asmux: %+v", resp)
+	jsonResponse(w, http.StatusOK, &resp)
+	user.prevBridgeStatus = &resp
+}

+ 1 - 1
commands.go

@@ -569,7 +569,7 @@ func (handler *CommandHandler) CommandDisconnect(ce *CommandEvent) {
 		return
 	}
 	ce.User.bridge.Metrics.TrackConnectionState(ce.User.JID, false)
-	ce.User.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
+	ce.User.sendBridgeState(BridgeState{Error: WANotConnected})
 	ce.Reply("Successfully disconnected. Use the `reconnect` command to reconnect.")
 }
 

+ 6 - 5
config/config.go

@@ -27,9 +27,10 @@ import (
 
 type Config struct {
 	Homeserver struct {
-		Address string `yaml:"address"`
-		Domain  string `yaml:"domain"`
-		Asmux   bool   `yaml:"asmux"`
+		Address        string `yaml:"address"`
+		Domain         string `yaml:"domain"`
+		Asmux          bool   `yaml:"asmux"`
+		StatusEndpoint string `yaml:"status_endpoint"`
 	} `yaml:"homeserver"`
 
 	AppService struct {
@@ -69,8 +70,8 @@ type Config struct {
 	} `yaml:"metrics"`
 
 	WhatsApp struct {
-		OSName string `yaml:"os_name"`
-		BrowserName  string `yaml:"browser_name"`
+		OSName      string `yaml:"os_name"`
+		BrowserName string `yaml:"browser_name"`
 	} `yaml:"whatsapp"`
 
 	Bridge BridgeConfig `yaml:"bridge"`

+ 5 - 0
example-config.yaml

@@ -5,6 +5,11 @@ homeserver:
     # The domain of the homeserver (for MXIDs, etc).
     domain: example.com
 
+    # The URL to push real-time bridge status to.
+    # If set, the bridge will make POST requests to this URL whenever a user's whatsapp connection state changes.
+    # The bridge will use the as_token to authorize requests.
+    status_endpoint: null
+
 # Application service host/registration related details.
 # Changing these values requires regeneration of the registration.
 appservice:

+ 2 - 1
provisioning.go

@@ -53,7 +53,8 @@ func (prov *ProvisioningAPI) Init() {
 	r.HandleFunc("/delete_connection", prov.DeleteConnection).Methods(http.MethodPost)
 	r.HandleFunc("/disconnect", prov.Disconnect).Methods(http.MethodPost)
 	r.HandleFunc("/reconnect", prov.Reconnect).Methods(http.MethodPost)
-	prov.bridge.AS.Router.HandleFunc("/_matrix/app/com.beeper.asmux/ping", prov.AsmuxPing).Methods(http.MethodPost)
+	prov.bridge.AS.Router.HandleFunc("/_matrix/app/com.beeper.asmux/ping", prov.BridgeStatePing).Methods(http.MethodPost)
+	prov.bridge.AS.Router.HandleFunc("/_matrix/app/com.beeper.bridge_state", prov.BridgeStatePing).Methods(http.MethodPost)
 }
 
 type responseWrap struct {

+ 12 - 12
user.go

@@ -81,7 +81,7 @@ type User struct {
 	connLock        sync.Mutex
 	cancelReconnect func()
 
-	prevBridgeStatus *AsmuxPong
+	prevBridgeStatus *BridgeState
 }
 
 func (bridge *Bridge) GetUserByMXID(userID id.UserID) *User {
@@ -122,7 +122,7 @@ func (user *User) removeFromJIDMap() {
 	}
 	user.bridge.usersLock.Unlock()
 	user.bridge.Metrics.TrackLoginState(user.JID, false)
-	user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotLoggedIn})
+	user.sendBridgeState(BridgeState{Error: WANotLoggedIn})
 }
 
 func (bridge *Bridge) GetAllUsers() []*User {
@@ -257,7 +257,7 @@ func (user *User) Connect(evenIfNoSession bool) bool {
 	}
 	user.log.Debugln("Connecting to WhatsApp")
 	if user.Session != nil {
-		user.sendBridgeStatus(AsmuxPong{Error: AsmuxWAConnecting})
+		user.sendBridgeState(BridgeState{Error: WAConnecting})
 	}
 	timeout := time.Duration(user.bridge.Config.Bridge.ConnectionTimeout)
 	if timeout == 0 {
@@ -289,7 +289,7 @@ func (user *User) DeleteConnection() {
 	user.Conn.RemoveHandlers()
 	user.Conn = nil
 	user.bridge.Metrics.TrackConnectionState(user.JID, false)
-	user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
+	user.sendBridgeState(BridgeState{Error: WANotConnected})
 	user.connLock.Unlock()
 }
 
@@ -312,7 +312,7 @@ func (user *User) RestoreSession() bool {
 				user.DeleteConnection()
 				return false
 			} else {
-				user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
+				user.sendBridgeState(BridgeState{Error: WANotConnected})
 				user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
 					"on your phone is reachable and use `reconnect` to try connecting again.")
 			}
@@ -462,7 +462,7 @@ func (cl ChatList) Swap(i, j int) {
 }
 
 func (user *User) PostLogin() {
-	user.sendBridgeStatus(AsmuxPong{OK: true})
+	user.sendBridgeState(BridgeState{OK: true})
 	user.bridge.Metrics.TrackConnectionState(user.JID, true)
 	user.bridge.Metrics.TrackLoginState(user.JID, true)
 	user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
@@ -539,7 +539,7 @@ func (user *User) postConnPing() bool {
 	if disconnectErr != nil {
 		user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
 	}
-	user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
+	user.sendBridgeState(BridgeState{Error: WANotConnected})
 	user.bridge.Metrics.TrackDisconnection(user.MXID)
 	go func() {
 		time.Sleep(1 * time.Second)
@@ -969,7 +969,7 @@ func (user *User) HandleError(err error) {
 		if closed.Code == 1000 && user.cleanDisconnection {
 			user.cleanDisconnection = false
 			if !user.bridge.Config.Bridge.AggressiveReconnect {
-				user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
+				user.sendBridgeState(BridgeState{Error: WANotConnected})
 				user.bridge.Metrics.TrackConnectionState(user.JID, false)
 				user.log.Infoln("Clean disconnection by server")
 				return
@@ -1002,7 +1002,7 @@ func (user *User) tryReconnect(msg string) {
 	user.bridge.Metrics.TrackConnectionState(user.JID, false)
 	if user.ConnectionErrors > user.bridge.Config.Bridge.MaxConnectionAttempts {
 		user.sendMarkdownBridgeAlert("%s. Use the `reconnect` command to reconnect.", msg)
-		user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
+		user.sendBridgeState(BridgeState{Error: WANotConnected})
 		return
 	}
 	if user.bridge.Config.Bridge.ReportConnectionRetry {
@@ -1028,7 +1028,7 @@ func (user *User) tryReconnect(msg string) {
 			return
 		default:
 		}
-		user.sendBridgeStatus(AsmuxPong{Error: AsmuxWAConnecting})
+		user.sendBridgeState(BridgeState{Error: WAConnecting})
 		err := user.Conn.Restore(true, ctx)
 		if err == nil {
 			user.ConnectionErrors = 0
@@ -1051,7 +1051,7 @@ func (user *User) tryReconnect(msg string) {
 			user.DeleteConnection()
 			user.sendMarkdownBridgeAlert("\u26a0 Failed to reconnect to WhatsApp: unpaired from phone. " +
 				"To re-pair your phone, log in again.")
-			user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotLoggedIn})
+			user.sendBridgeState(BridgeState{Error: WANotLoggedIn})
 			return
 		} else if errors.Is(err, whatsapp.ErrAlreadyLoggedIn) {
 			user.log.Warnln("Reconnection said we're already logged in, not trying anymore")
@@ -1072,7 +1072,7 @@ func (user *User) tryReconnect(msg string) {
 		}
 	}
 
-	user.sendBridgeStatus(AsmuxPong{Error: AsmuxWANotConnected})
+	user.sendBridgeState(BridgeState{Error: WANotConnected})
 	if user.bridge.Config.Bridge.ReportConnectionRetry {
 		user.sendMarkdownBridgeAlert("%d reconnection attempts failed. Use the `reconnect` command to try to reconnect manually.", tries)
 	} else {