|
@@ -102,18 +102,27 @@ func (bridge *Bridge) GetAllPortals() []*Portal {
|
|
|
}
|
|
|
|
|
|
func (bridge *Bridge) NewPortal(dbPortal *database.Portal) *Portal {
|
|
|
- return &Portal{
|
|
|
+ portal := &Portal{
|
|
|
Portal: dbPortal,
|
|
|
bridge: bridge,
|
|
|
log: bridge.Log.Sub(fmt.Sprintf("Portal/%s", dbPortal.Key)),
|
|
|
|
|
|
messageLocks: make(map[types.WhatsAppMessageID]sync.Mutex),
|
|
|
recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{},
|
|
|
+
|
|
|
+ messages: make(chan PortalMessage, 128),
|
|
|
}
|
|
|
+ go portal.handleMessageLoop()
|
|
|
+ return portal
|
|
|
}
|
|
|
|
|
|
const recentlyHandledLength = 100
|
|
|
|
|
|
+type PortalMessage struct {
|
|
|
+ source *User
|
|
|
+ data interface{}
|
|
|
+}
|
|
|
+
|
|
|
type Portal struct {
|
|
|
*database.Portal
|
|
|
|
|
@@ -128,9 +137,47 @@ type Portal struct {
|
|
|
recentlyHandledLock sync.Mutex
|
|
|
recentlyHandledIndex uint8
|
|
|
|
|
|
+ lastMessageTs uint64
|
|
|
+
|
|
|
+ messages chan PortalMessage
|
|
|
+
|
|
|
isPrivate *bool
|
|
|
}
|
|
|
|
|
|
+func (portal *Portal) handleMessageLoop() {
|
|
|
+ for msg := range portal.messages {
|
|
|
+ if len(portal.MXID) == 0 {
|
|
|
+ _, isRevocation := msg.data.(whatsappExt.MessageRevocation)
|
|
|
+ if isRevocation {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ err := portal.CreateMatrixRoom(msg.source)
|
|
|
+ if err != nil {
|
|
|
+ portal.log.Errorln("Failed to create portal room:", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ portal.handleMessage(msg)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (portal *Portal) handleMessage(msg PortalMessage) {
|
|
|
+ switch data := msg.data.(type) {
|
|
|
+ case whatsapp.TextMessage:
|
|
|
+ portal.HandleTextMessage(msg.source, data)
|
|
|
+ case whatsapp.ImageMessage:
|
|
|
+ portal.HandleMediaMessage(msg.source, data.Download, data.Thumbnail, data.Info, data.Type, data.Caption)
|
|
|
+ case whatsapp.VideoMessage:
|
|
|
+ portal.HandleMediaMessage(msg.source, data.Download, data.Thumbnail, data.Info, data.Type, data.Caption)
|
|
|
+ case whatsapp.AudioMessage:
|
|
|
+ portal.HandleMediaMessage(msg.source, data.Download, nil, data.Info, data.Type, "")
|
|
|
+ case whatsapp.DocumentMessage:
|
|
|
+ portal.HandleMediaMessage(msg.source, data.Download, data.Thumbnail, data.Info, data.Type, data.Title)
|
|
|
+ case whatsappExt.MessageRevocation:
|
|
|
+ portal.HandleMessageRevoke(msg.source, data)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (portal *Portal) getMessageLock(messageID types.WhatsAppMessageID) sync.Mutex {
|
|
|
portal.messageLocksLock.Lock()
|
|
|
defer portal.messageLocksLock.Unlock()
|
|
@@ -194,16 +241,17 @@ func (portal *Portal) markHandled(source *User, message *waProto.WebMessageInfo,
|
|
|
portal.recentlyHandled[index] = msg.JID
|
|
|
}
|
|
|
|
|
|
-func (portal *Portal) startHandling(id types.WhatsAppMessageID) (*sync.Mutex, bool) {
|
|
|
- if portal.isRecentlyHandled(id) {
|
|
|
+func (portal *Portal) startHandling(info whatsapp.MessageInfo) (*sync.Mutex, bool) {
|
|
|
+ if portal.lastMessageTs > info.Timestamp+1 || portal.isRecentlyHandled(info.Id) {
|
|
|
return nil, false
|
|
|
}
|
|
|
- lock := portal.getMessageLock(id)
|
|
|
+ lock := portal.getMessageLock(info.Id)
|
|
|
lock.Lock()
|
|
|
- if portal.isDuplicate(id) {
|
|
|
+ if portal.isDuplicate(info.Id) {
|
|
|
lock.Unlock()
|
|
|
return nil, false
|
|
|
}
|
|
|
+ portal.lastMessageTs = info.Timestamp
|
|
|
return &lock, true
|
|
|
}
|
|
|
|
|
@@ -476,6 +524,27 @@ func (portal *Portal) RestrictMetadataChanges(restrict bool) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (portal *Portal) FillHistory(user *User) error {
|
|
|
+ resp, err := user.Conn.LoadMessages(portal.Key.JID, "", 50)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ messages, ok := resp.Content.([]interface{})
|
|
|
+ if !ok {
|
|
|
+ return fmt.Errorf("history response not list")
|
|
|
+ }
|
|
|
+ for _, rawMessage := range messages {
|
|
|
+ message, ok := rawMessage.(*waProto.WebMessageInfo)
|
|
|
+ if !ok {
|
|
|
+ portal.log.Warnln("Unexpected non-WebMessageInfo item in history response:", rawMessage)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ fmt.Println("Filling history", message.GetKey(), message.GetMessageTimestamp())
|
|
|
+ portal.handleMessage(PortalMessage{user, whatsapp.ParseProtoMessage(message)})
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
func (portal *Portal) CreateMatrixRoom(user *User) error {
|
|
|
portal.roomCreateLock.Lock()
|
|
|
defer portal.roomCreateLock.Unlock()
|
|
@@ -523,6 +592,10 @@ func (portal *Portal) CreateMatrixRoom(user *User) error {
|
|
|
}
|
|
|
portal.MXID = resp.RoomID
|
|
|
portal.Update()
|
|
|
+ err = portal.FillHistory(user)
|
|
|
+ if err != nil {
|
|
|
+ portal.log.Errorln("Failed to fill history:", err)
|
|
|
+ }
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -605,17 +678,15 @@ func (portal *Portal) HandleMessageRevoke(user *User, message whatsappExt.Messag
|
|
|
}
|
|
|
|
|
|
func (portal *Portal) HandleTextMessage(source *User, message whatsapp.TextMessage) {
|
|
|
- lock, ok := portal.startHandling(message.Info.Id)
|
|
|
- if !ok {
|
|
|
+ if len(portal.MXID) == 0 {
|
|
|
return
|
|
|
}
|
|
|
- defer lock.Unlock()
|
|
|
|
|
|
- err := portal.CreateMatrixRoom(source)
|
|
|
- if err != nil {
|
|
|
- portal.log.Errorln("Failed to create portal room:", err)
|
|
|
+ lock, ok := portal.startHandling(message.Info)
|
|
|
+ if !ok {
|
|
|
return
|
|
|
}
|
|
|
+ defer lock.Unlock()
|
|
|
|
|
|
intent := portal.GetMessageIntent(source, message.Info)
|
|
|
if intent == nil {
|
|
@@ -640,17 +711,15 @@ func (portal *Portal) HandleTextMessage(source *User, message whatsapp.TextMessa
|
|
|
}
|
|
|
|
|
|
func (portal *Portal) HandleMediaMessage(source *User, download func() ([]byte, error), thumbnail []byte, info whatsapp.MessageInfo, mimeType, caption string) {
|
|
|
- lock, ok := portal.startHandling(info.Id)
|
|
|
- if !ok {
|
|
|
+ if len(portal.MXID) == 0 {
|
|
|
return
|
|
|
}
|
|
|
- defer lock.Unlock()
|
|
|
|
|
|
- err := portal.CreateMatrixRoom(source)
|
|
|
- if err != nil {
|
|
|
- portal.log.Errorln("Failed to create portal room:", err)
|
|
|
+ lock, ok := portal.startHandling(info)
|
|
|
+ if !ok {
|
|
|
return
|
|
|
}
|
|
|
+ defer lock.Unlock()
|
|
|
|
|
|
intent := portal.GetMessageIntent(source, info)
|
|
|
if intent == nil {
|
|
@@ -1018,6 +1087,9 @@ func (portal *Portal) Cleanup(puppetsOnly bool) {
|
|
|
return
|
|
|
}
|
|
|
for member, _ := range members.Joined {
|
|
|
+ if member == intent.UserID {
|
|
|
+ continue
|
|
|
+ }
|
|
|
puppet := portal.bridge.GetPuppetByMXID(member)
|
|
|
if puppet != nil {
|
|
|
_, err = puppet.Intent().LeaveRoom(portal.MXID)
|
|
@@ -1031,6 +1103,10 @@ func (portal *Portal) Cleanup(puppetsOnly bool) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ _, err = intent.LeaveRoom(portal.MXID)
|
|
|
+ if err != nil {
|
|
|
+ portal.log.Errorln("Error leaving with main intent while cleaning up portal:", err)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (portal *Portal) HandleMatrixLeave(sender *User) {
|