Эх сурвалжийг харах

Send dummy event after backfill

This is necessary in case there aren't any real messages before the next
backfill. Otherwise the next backfill would go before the old backfill
(at the top of the room) rather than at the bottom of the room.
Tulir Asokan 3 жил өмнө
parent
commit
b874d324eb
2 өөрчлөгдсөн 48 нэмэгдсэн , 197 устгасан
  1. 6 1
      database/message.go
  2. 42 196
      portal.go

+ 6 - 1
database/message.go

@@ -123,10 +123,15 @@ func (msg *Message) Scan(row Scannable) *Message {
 }
 
 func (msg *Message) Insert() {
+	var sender interface{} = msg.Sender
+	// Slightly hacky hack to allow inserting empty senders (used for post-backfill dummy events)
+	if msg.Sender.IsEmpty() {
+		sender = ""
+	}
 	_, err := msg.db.Exec(`INSERT INTO message
 			(chat_jid, chat_receiver, jid, mxid, sender, timestamp, sent)
 			VALUES ($1, $2, $3, $4, $5, $6, $7)`,
-		msg.Chat.JID, msg.Chat.Receiver, msg.JID, msg.MXID, msg.Sender, msg.Timestamp.Unix(), msg.Sent)
+		msg.Chat.JID, msg.Chat.Receiver, msg.JID, msg.MXID, sender, msg.Timestamp.Unix(), msg.Sent)
 	if err != nil {
 		msg.log.Warnfln("Failed to insert %s@%s: %v", msg.Chat, msg.JID, err)
 	}

+ 42 - 196
portal.go

@@ -54,7 +54,6 @@ import (
 	"maunium.net/go/mautrix/crypto/attachment"
 	"maunium.net/go/mautrix/event"
 	"maunium.net/go/mautrix/id"
-	"maunium.net/go/mautrix/pushrules"
 
 	"maunium.net/go/mautrix-whatsapp/database"
 )
@@ -263,7 +262,7 @@ func (portal *Portal) getMessageType(waMsg *waProto.Message) string {
 		switch waMsg.GetProtocolMessage().GetType() {
 		case waProto.ProtocolMessage_REVOKE:
 			return "revoke"
-		case waProto.ProtocolMessage_APP_STATE_SYNC_KEY_SHARE, waProto.ProtocolMessage_HISTORY_SYNC_NOTIFICATION:
+		case waProto.ProtocolMessage_APP_STATE_SYNC_KEY_SHARE, waProto.ProtocolMessage_HISTORY_SYNC_NOTIFICATION, waProto.ProtocolMessage_INITIAL_SECURITY_NOTIFICATION_SETTING_SYNC:
 			return "ignore"
 		default:
 			return "unknown_protocol"
@@ -917,14 +916,24 @@ func (portal *Portal) backfill(source *User, messages []*waProto.HistorySyncMsg)
 	var historyBatch, newBatch mautrix.ReqBatchSend
 	var historyBatchInfos, newBatchInfos []*types.MessageInfo
 
-	historyBatch.StateEventsAtStart = make([]*event.Event, 0)
-	newBatch.StateEventsAtStart = make([]*event.Event, 0)
-	removeMembersAtEnd := make(map[id.UserID]*event.MemberEventContent)
-
 	firstMsgTimestamp := time.Unix(int64(messages[len(messages)-1].GetMessage().GetMessageTimestamp()), 0)
 
+	historyBatch.StateEventsAtStart = make([]*event.Event, 1)
+	newBatch.StateEventsAtStart = make([]*event.Event, 1)
+	emptyStr := ""
+	dummyStateEvent := event.Event{
+		Type:      BackfillDummyStateEvent,
+		Sender:    portal.MainIntent().UserID,
+		StateKey:  &emptyStr,
+		Timestamp: firstMsgTimestamp.UnixMilli(),
+		Content:   event.Content{},
+	}
+	historyBatch.StateEventsAtStart[0] = &dummyStateEvent
+	newBatch.StateEventsAtStart[0] = &dummyStateEvent
+	addedMembers := make(map[id.UserID]*event.MemberEventContent)
+
 	addMember := func(puppet *Puppet) {
-		if _, alreadyAdded := removeMembersAtEnd[puppet.MXID]; alreadyAdded {
+		if _, alreadyAdded := addedMembers[puppet.MXID]; alreadyAdded {
 			return
 		}
 		mxid := puppet.MXID.String()
@@ -948,7 +957,7 @@ func (portal *Portal) backfill(source *User, messages []*waProto.HistorySyncMsg)
 			Timestamp: firstMsgTimestamp.UnixMilli(),
 			Content:   event.Content{Parsed: &content},
 		})
-		removeMembersAtEnd[puppet.MXID] = &content
+		addedMembers[puppet.MXID] = &content
 	}
 
 	firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key)
@@ -1029,211 +1038,45 @@ func (portal *Portal) backfill(source *User, messages []*waProto.HistorySyncMsg)
 			portal.finishBatch(historyResp.EventIDs, historyBatchInfos)
 			portal.NextBatchID = historyResp.NextBatchID
 			portal.Update()
+			// If batchID is non-empty, it means this is backfilling very old messages, and we don't need a post-backfill dummy.
+			if historyBatch.BatchID == "" {
+				portal.sendPostBackfillDummy(time.UnixMilli(historyBatch.Events[len(historyBatch.Events)-1].Timestamp))
+			}
 		}
 	}
+
 	if len(newBatch.Events) > 0 {
+		portal.log.Debugln("Sending a dummy event to avoid forward extremity errors with forward backfill")
+		_, err := portal.MainIntent().SendMessageEvent(portal.MXID, ForwardBackfillDummyEvent, struct{}{})
+		if err != nil {
+			portal.log.Warnln("Error sending dummy event for forward backfill:", err)
+		}
 		portal.log.Infofln("Sending %d new messages...", len(newBatch.Events))
 		newResp, err := portal.MainIntent().BatchSend(portal.MXID, &newBatch)
 		if err != nil {
 			portal.log.Errorln("Error sending batch of new messages:", err)
 		} else {
 			portal.finishBatch(newResp.EventIDs, newBatchInfos)
+			portal.sendPostBackfillDummy(time.UnixMilli(newBatch.Events[len(newBatch.Events)-1].Timestamp))
 		}
 	}
 }
 
-//func (portal *Portal) BackfillHistory(user *User, lastMessageTime int64) error {
-//	if !portal.bridge.Config.Bridge.RecoverHistory {
-//		return nil
-//	}
-//
-//	endBackfill := portal.beginBackfill()
-//	defer endBackfill()
-//
-//	lastMessage := portal.bridge.DB.Message.GetLastInChat(portal.Key)
-//	if lastMessage == nil {
-//		return nil
-//	}
-//	if lastMessage.Timestamp >= lastMessageTime {
-//		portal.log.Debugln("Not backfilling: no new messages")
-//		return nil
-//	}
-//
-//	lastMessageID := lastMessage.JID
-//	lastMessageFromMe := lastMessage.Sender == user.JID
-//	portal.log.Infoln("Backfilling history since", lastMessageID, "for", user.MXID)
-//	for len(lastMessageID) > 0 {
-//		portal.log.Debugln("Fetching 50 messages of history after", lastMessageID)
-//		resp, err := user.Conn.LoadMessagesAfter(portal.Key.JID, lastMessageID, lastMessageFromMe, 50)
-//		if err == whatsapp.ErrServerRespondedWith404 {
-//			portal.log.Warnln("Got 404 response trying to fetch messages to backfill. Fetching latest messages as fallback.")
-//			resp, err = user.Conn.LoadMessagesBefore(portal.Key.JID, "", true, 50)
-//		}
-//		if err != nil {
-//			return err
-//		}
-//		messages, ok := resp.Content.([]interface{})
-//		if !ok || len(messages) == 0 {
-//			portal.log.Debugfln("Didn't get more messages to backfill (resp.Content is %T)", resp.Content)
-//			break
-//		}
-//
-//		portal.handleHistory(user, messages)
-//
-//		lastMessageProto, ok := messages[len(messages)-1].(*waProto.WebMessageInfo)
-//		if ok {
-//			lastMessageID = lastMessageProto.GetKey().GetId()
-//			lastMessageFromMe = lastMessageProto.GetKey().GetFromMe()
-//		}
-//	}
-//	portal.log.Infoln("Backfilling finished")
-//	return nil
-//}
-//
-//func (portal *Portal) beginBackfill() func() {
-//	portal.backfillLock.Lock()
-//	portal.backfilling = true
-//	var privateChatPuppetInvited bool
-//	var privateChatPuppet *Puppet
-//	if portal.IsPrivateChat() && portal.bridge.Config.Bridge.InviteOwnPuppetForBackfilling && portal.Key.JID != portal.Key.Receiver {
-//		privateChatPuppet = portal.bridge.GetPuppetByJID(portal.Key.Receiver)
-//		portal.privateChatBackfillInvitePuppet = func() {
-//			if privateChatPuppetInvited {
-//				return
-//			}
-//			privateChatPuppetInvited = true
-//			_, _ = portal.MainIntent().InviteUser(portal.MXID, &mautrix.ReqInviteUser{UserID: privateChatPuppet.MXID})
-//			_ = privateChatPuppet.DefaultIntent().EnsureJoined(portal.MXID)
-//		}
-//	}
-//	return func() {
-//		portal.backfilling = false
-//		portal.privateChatBackfillInvitePuppet = nil
-//		portal.backfillLock.Unlock()
-//		if privateChatPuppet != nil && privateChatPuppetInvited {
-//			_, _ = privateChatPuppet.DefaultIntent().LeaveRoom(portal.MXID)
-//		}
-//	}
-//}
-
-func (portal *Portal) disableNotifications(user *User) {
-	if !portal.bridge.Config.Bridge.HistoryDisableNotifs {
-		return
-	}
-	puppet := portal.bridge.GetPuppetByCustomMXID(user.MXID)
-	if puppet == nil || puppet.customIntent == nil {
-		return
-	}
-	portal.log.Debugfln("Disabling notifications for %s for backfilling", user.MXID)
-	ruleID := fmt.Sprintf("net.maunium.silence_while_backfilling.%s", portal.MXID)
-	err := puppet.customIntent.PutPushRule("global", pushrules.OverrideRule, ruleID, &mautrix.ReqPutPushRule{
-		Actions: []pushrules.PushActionType{pushrules.ActionDontNotify},
-		Conditions: []pushrules.PushCondition{{
-			Kind:    pushrules.KindEventMatch,
-			Key:     "room_id",
-			Pattern: string(portal.MXID),
-		}},
-	})
+func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time) {
+	resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, BackfillEndDummyEvent, struct{}{})
 	if err != nil {
-		portal.log.Warnfln("Failed to disable notifications for %s while backfilling: %v", user.MXID, err)
-	}
-}
-
-func (portal *Portal) enableNotifications(user *User) {
-	if !portal.bridge.Config.Bridge.HistoryDisableNotifs {
+		portal.log.Errorln("Error sending post-backfill dummy event:", err)
 		return
 	}
-	puppet := portal.bridge.GetPuppetByCustomMXID(user.MXID)
-	if puppet == nil || puppet.customIntent == nil {
-		return
-	}
-	portal.log.Debugfln("Re-enabling notifications for %s after backfilling", user.MXID)
-	ruleID := fmt.Sprintf("net.maunium.silence_while_backfilling.%s", portal.MXID)
-	err := puppet.customIntent.DeletePushRule("global", pushrules.OverrideRule, ruleID)
-	if err != nil {
-		portal.log.Warnfln("Failed to re-enable notifications for %s after backfilling: %v", user.MXID, err)
-	}
+	msg := portal.bridge.DB.Message.New()
+	msg.Chat = portal.Key
+	msg.MXID = resp.EventID
+	msg.JID = types.MessageID(resp.EventID)
+	msg.Timestamp = lastTimestamp.Add(1 * time.Second)
+	msg.Sent = true
+	msg.Insert()
 }
 
-//func (portal *Portal) FillInitialHistory(user *User) error {
-//	if portal.bridge.Config.Bridge.InitialHistoryFill == 0 {
-//		return nil
-//	}
-//	endBackfill := portal.beginBackfill()
-//	defer endBackfill()
-//	if portal.privateChatBackfillInvitePuppet != nil {
-//		portal.privateChatBackfillInvitePuppet()
-//	}
-//
-//	n := portal.bridge.Config.Bridge.InitialHistoryFill
-//	portal.log.Infoln("Filling initial history, maximum", n, "messages")
-//	var messages []interface{}
-//	before := ""
-//	fromMe := true
-//	chunkNum := 0
-//	for n > 0 {
-//		chunkNum += 1
-//		count := 50
-//		if n < count {
-//			count = n
-//		}
-//		portal.log.Debugfln("Fetching chunk %d (%d messages / %d cap) before message %s", chunkNum, count, n, before)
-//		resp, err := user.Conn.LoadMessagesBefore(portal.Key.JID, before, fromMe, count)
-//		if err != nil {
-//			return err
-//		}
-//		chunk, ok := resp.Content.([]interface{})
-//		if !ok || len(chunk) == 0 {
-//			portal.log.Infoln("Chunk empty, starting handling of loaded messages")
-//			break
-//		}
-//
-//		messages = append(chunk, messages...)
-//
-//		portal.log.Debugfln("Fetched chunk and received %d messages", len(chunk))
-//
-//		n -= len(chunk)
-//		key := chunk[0].(*waProto.WebMessageInfo).GetKey()
-//		before = key.GetId()
-//		fromMe = key.GetFromMe()
-//		if len(before) == 0 {
-//			portal.log.Infoln("No message ID for first message, starting handling of loaded messages")
-//			break
-//		}
-//	}
-//	portal.disableNotifications(user)
-//	portal.handleHistory(user, messages)
-//	portal.enableNotifications(user)
-//	portal.log.Infoln("Initial history fill complete")
-//	return nil
-//}
-//
-//func (portal *Portal) handleHistory(user *User, messages []interface{}) {
-//	portal.log.Infoln("Handling", len(messages), "messages of history")
-//	for _, rawMessage := range messages {
-//		message, ok := rawMessage.(*waProto.WebMessageInfo)
-//		if !ok {
-//			portal.log.Warnln("Unexpected non-WebMessageInfo item in history response:", rawMessage)
-//			continue
-//		}
-//		data := whatsapp.ParseProtoMessage(message)
-//		if data == nil || data == whatsapp.ErrMessageTypeNotImplemented {
-//			st := message.GetMessageStubType()
-//			// Ignore some types that are known to fail
-//			if st == waProto.WebMessageInfo_CALL_MISSED_VOICE || st == waProto.WebMessageInfo_CALL_MISSED_VIDEO ||
-//				st == waProto.WebMessageInfo_CALL_MISSED_GROUP_VOICE || st == waProto.WebMessageInfo_CALL_MISSED_GROUP_VIDEO {
-//				continue
-//			}
-//			portal.log.Warnln("Message", message.GetKey().GetId(), "failed to parse during backfilling")
-//			continue
-//		}
-//		if portal.privateChatBackfillInvitePuppet != nil && message.GetKey().GetFromMe() && portal.IsPrivateChat() {
-//			portal.privateChatBackfillInvitePuppet()
-//		}
-//		portal.handleMessage(PortalMessage{portal.Key.JID, user, data, message.GetMessageTimestamp()}, true)
-//	}
-//}
-
 type BridgeInfoSection struct {
 	ID          string              `json:"id"`
 	DisplayName string              `json:"displayname,omitempty"`
@@ -1291,7 +1134,10 @@ func (portal *Portal) UpdateBridgeInfo() {
 	}
 }
 
-var PortalCreationDummyEvent = event.Type{Type: "fi.mau.portal_created", Class: event.MessageEventType}
+var PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType}
+var BackfillDummyStateEvent = event.Type{Type: "fi.mau.dummy.blank_backfill_state", Class: event.StateEventType}
+var BackfillEndDummyEvent = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType}
+var ForwardBackfillDummyEvent = event.Type{Type: "fi.mau.dummy.pre_forward_backfill", Class: event.MessageEventType}
 
 func (portal *Portal) CreateMatrixRoom(user *User) error {
 	portal.roomCreateLock.Lock()