refactor: improve XMPP client/bridge architecture separation
- Move protocol normalization methods to XMPP client (ExtractChannelID, ExtractUserInfo, ExtractMessageBody) - Replace message channel aggregation with direct handler delegation pattern - XMPP client now focuses purely on protocol concerns (connection, deduplication, normalization) - XMPP bridge handles business logic (BridgeMessage creation, routing) - Add SourceRemoteID field to BridgeMessage for better message tracking - Remove unused message channel infrastructure in favor of mux.MessageHandlerFunc pattern 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
8e9d87b176
commit
11a32afc53
3 changed files with 127 additions and 125 deletions
|
@ -16,6 +16,8 @@ import (
|
||||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
|
||||||
xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
|
xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
|
||||||
"github.com/mattermost/mattermost/server/public/plugin"
|
"github.com/mattermost/mattermost/server/public/plugin"
|
||||||
|
"mellium.im/xmlstream"
|
||||||
|
"mellium.im/xmpp/stanza"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -179,8 +181,6 @@ func (b *xmppBridge) Start() error {
|
||||||
// Start connection monitor
|
// Start connection monitor
|
||||||
go b.connectionMonitor()
|
go b.connectionMonitor()
|
||||||
|
|
||||||
// Start message aggregation
|
|
||||||
go b.startMessageAggregation()
|
|
||||||
|
|
||||||
b.logger.LogInfo("Mattermost to XMPP bridge started successfully")
|
b.logger.LogInfo("Mattermost to XMPP bridge started successfully")
|
||||||
return nil
|
return nil
|
||||||
|
@ -230,6 +230,8 @@ func (b *xmppBridge) connectToXMPP() error {
|
||||||
b.logger.LogDebug("Set bridge client online presence")
|
b.logger.LogDebug("Set bridge client online presence")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b.bridgeClient.SetMessageHandler(b.handleIncomingXMPPMessage)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -572,35 +574,6 @@ func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager {
|
||||||
return b.userManager
|
return b.userManager
|
||||||
}
|
}
|
||||||
|
|
||||||
// startMessageAggregation starts the message aggregation goroutine
|
|
||||||
func (b *xmppBridge) startMessageAggregation() {
|
|
||||||
clientChannel := b.bridgeClient.GetMessageChannel()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-b.ctx.Done():
|
|
||||||
b.logger.LogDebug("Stopping XMPP message aggregation")
|
|
||||||
return
|
|
||||||
case msg, ok := <-clientChannel:
|
|
||||||
if !ok {
|
|
||||||
b.logger.LogDebug("Bridge client message channel closed")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Forward to our bridge's message channel
|
|
||||||
select {
|
|
||||||
case b.incomingMessages <- msg:
|
|
||||||
// Message forwarded successfully
|
|
||||||
case <-b.ctx.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
b.logger.LogWarn("Bridge message channel full, dropping message",
|
|
||||||
"source_channel", msg.SourceChannelID,
|
|
||||||
"user_id", msg.SourceUserID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetMessageChannel returns the channel for incoming messages from XMPP
|
// GetMessageChannel returns the channel for incoming messages from XMPP
|
||||||
func (b *xmppBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage {
|
func (b *xmppBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage {
|
||||||
|
@ -631,3 +604,72 @@ func (b *xmppBridge) GetRemoteID() string {
|
||||||
func (b *xmppBridge) ID() string {
|
func (b *xmppBridge) ID() string {
|
||||||
return b.bridgeID
|
return b.bridgeID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleIncomingXMPPMessage handles incoming XMPP messages and converts them to bridge messages
|
||||||
|
func (b *xmppBridge) handleIncomingXMPPMessage(msg stanza.Message, t xmlstream.TokenReadEncoder) error {
|
||||||
|
b.logger.LogDebug("XMPP bridge handling incoming message",
|
||||||
|
"from", msg.From.String(),
|
||||||
|
"to", msg.To.String(),
|
||||||
|
"type", fmt.Sprintf("%v", msg.Type))
|
||||||
|
|
||||||
|
// Only process groupchat messages for now (MUC messages from channels)
|
||||||
|
if msg.Type != stanza.GroupChatMessage {
|
||||||
|
b.logger.LogDebug("Ignoring non-groupchat message", "type", fmt.Sprintf("%v", msg.Type))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract message body using client method
|
||||||
|
messageBody, err := b.bridgeClient.ExtractMessageBody(t)
|
||||||
|
if err != nil {
|
||||||
|
b.logger.LogWarn("Failed to extract message body", "error", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if messageBody == "" {
|
||||||
|
b.logger.LogDebug("Ignoring message with empty body")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use client methods for protocol normalization
|
||||||
|
channelID, err := b.bridgeClient.ExtractChannelID(msg.From)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to extract channel ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
userID, displayName := b.bridgeClient.ExtractUserInfo(msg.From)
|
||||||
|
|
||||||
|
// Create bridge message
|
||||||
|
bridgeMessage := &pluginModel.BridgeMessage{
|
||||||
|
SourceBridge: b.bridgeID,
|
||||||
|
SourceChannelID: channelID,
|
||||||
|
SourceUserID: userID,
|
||||||
|
SourceUserName: displayName,
|
||||||
|
SourceRemoteID: b.remoteID,
|
||||||
|
Content: messageBody,
|
||||||
|
MessageType: "text",
|
||||||
|
Timestamp: time.Now(), // TODO: Parse timestamp from message if available
|
||||||
|
MessageID: msg.ID,
|
||||||
|
TargetBridges: []string{"mattermost"}, // Route to Mattermost
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create directional message for incoming (XMPP -> Mattermost)
|
||||||
|
directionalMessage := &pluginModel.DirectionalMessage{
|
||||||
|
BridgeMessage: bridgeMessage,
|
||||||
|
Direction: pluginModel.DirectionIncoming,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send to bridge's message channel
|
||||||
|
select {
|
||||||
|
case b.incomingMessages <- directionalMessage:
|
||||||
|
b.logger.LogDebug("XMPP message queued for processing",
|
||||||
|
"channel_id", channelID,
|
||||||
|
"user_id", userID,
|
||||||
|
"message_id", msg.ID)
|
||||||
|
default:
|
||||||
|
b.logger.LogWarn("Bridge message channel full, dropping message",
|
||||||
|
"channel_id", channelID,
|
||||||
|
"user_id", userID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -15,10 +15,11 @@ const (
|
||||||
// BridgeMessage represents a message that can be passed between any bridge types
|
// BridgeMessage represents a message that can be passed between any bridge types
|
||||||
type BridgeMessage struct {
|
type BridgeMessage struct {
|
||||||
// Source information
|
// Source information
|
||||||
SourceBridge string // "xmpp", "mattermost", "slack", etc.
|
SourceBridge string // "xmpp", "mattermost", "slack", etc.
|
||||||
SourceChannelID string // Channel ID in source system
|
SourceChannelID string // Channel ID in source system
|
||||||
SourceUserID string // User ID in source system (JID, user ID, etc.)
|
SourceUserID string // User ID in source system (JID, user ID, etc.)
|
||||||
SourceUserName string // Display name in source system
|
SourceUserName string // Display name in source system
|
||||||
|
SourceRemoteID string // Remote ID of the bridge instance that created this message
|
||||||
|
|
||||||
// Message content (standardized on Markdown)
|
// Message content (standardized on Markdown)
|
||||||
Content string // Markdown formatted message content
|
Content string // Markdown formatted message content
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
|
|
||||||
"github.com/jellydator/ttlcache/v3"
|
"github.com/jellydator/ttlcache/v3"
|
||||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
||||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
|
||||||
"mellium.im/sasl"
|
"mellium.im/sasl"
|
||||||
"mellium.im/xmlstream"
|
"mellium.im/xmlstream"
|
||||||
"mellium.im/xmpp"
|
"mellium.im/xmpp"
|
||||||
|
@ -27,9 +26,6 @@ const (
|
||||||
// defaultOperationTimeout is the default timeout for XMPP operations
|
// defaultOperationTimeout is the default timeout for XMPP operations
|
||||||
defaultOperationTimeout = 5 * time.Second
|
defaultOperationTimeout = 5 * time.Second
|
||||||
|
|
||||||
// msgBufferSize is the buffer size for incoming message channels
|
|
||||||
msgBufferSize = 1000
|
|
||||||
|
|
||||||
// messageDedupeTTL is the TTL for message deduplication cache
|
// messageDedupeTTL is the TTL for message deduplication cache
|
||||||
messageDedupeTTL = 30 * time.Second
|
messageDedupeTTL = 30 * time.Second
|
||||||
)
|
)
|
||||||
|
@ -56,7 +52,7 @@ type Client struct {
|
||||||
sessionServing bool
|
sessionServing bool
|
||||||
|
|
||||||
// Message handling for bridge integration
|
// Message handling for bridge integration
|
||||||
incomingMessages chan *model.DirectionalMessage
|
messageHandler mux.MessageHandlerFunc // Bridge handler for incoming messages
|
||||||
|
|
||||||
// Message deduplication cache to handle XMPP server duplicates
|
// Message deduplication cache to handle XMPP server duplicates
|
||||||
dedupeCache *ttlcache.Cache[string, time.Time]
|
dedupeCache *ttlcache.Cache[string, time.Time]
|
||||||
|
@ -92,6 +88,12 @@ type XMPPMessage struct {
|
||||||
Body MessageBody `xml:"body"`
|
Body MessageBody `xml:"body"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MessageWithBody represents a message stanza with body for parsing
|
||||||
|
type MessageWithBody struct {
|
||||||
|
stanza.Message
|
||||||
|
Body string `xml:"body"`
|
||||||
|
}
|
||||||
|
|
||||||
// GhostUser represents an XMPP ghost user
|
// GhostUser represents an XMPP ghost user
|
||||||
type GhostUser struct {
|
type GhostUser struct {
|
||||||
JID string `json:"jid"`
|
JID string `json:"jid"`
|
||||||
|
@ -126,7 +128,6 @@ func NewClient(serverURL, username, password, resource, remoteID string, logger
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
sessionReady: make(chan struct{}),
|
sessionReady: make(chan struct{}),
|
||||||
incomingMessages: make(chan *model.DirectionalMessage, msgBufferSize),
|
|
||||||
dedupeCache: dedupeCache,
|
dedupeCache: dedupeCache,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,6 +156,11 @@ func (c *Client) SetServerDomain(domain string) {
|
||||||
c.serverDomain = domain
|
c.serverDomain = domain
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetMessageHandler sets the bridge message handler for incoming XMPP messages
|
||||||
|
func (c *Client) SetMessageHandler(handler mux.MessageHandlerFunc) {
|
||||||
|
c.messageHandler = handler
|
||||||
|
}
|
||||||
|
|
||||||
// parseServerAddress parses a server URL and returns a host:port address
|
// parseServerAddress parses a server URL and returns a host:port address
|
||||||
func (c *Client) parseServerAddress(serverURL string) (string, error) {
|
func (c *Client) parseServerAddress(serverURL string) (string, error) {
|
||||||
// Handle simple host:port format (e.g., "localhost:5222")
|
// Handle simple host:port format (e.g., "localhost:5222")
|
||||||
|
@ -354,6 +360,38 @@ func (c *Client) Disconnect() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExtractChannelID extracts the channel ID (room bare JID) from a message JID
|
||||||
|
func (c *Client) ExtractChannelID(from jid.JID) (string, error) {
|
||||||
|
// For MUC messages, the channel ID is the bare JID (without resource/nickname)
|
||||||
|
return from.Bare().String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExtractUserInfo extracts user ID and display name from a message JID
|
||||||
|
func (c *Client) ExtractUserInfo(from jid.JID) (string, string) {
|
||||||
|
// For MUC messages, the resource part is the nickname
|
||||||
|
nickname := from.Resourcepart()
|
||||||
|
|
||||||
|
// Use the full JID as user ID for XMPP
|
||||||
|
userID := from.String()
|
||||||
|
|
||||||
|
// Use nickname as display name if available, otherwise use full JID
|
||||||
|
displayName := nickname
|
||||||
|
if displayName == "" {
|
||||||
|
displayName = from.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
return userID, displayName
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExtractMessageBody extracts the message body from an XMPP token stream
|
||||||
|
func (c *Client) ExtractMessageBody(t xmlstream.TokenReadEncoder) (string, error) {
|
||||||
|
var fullMsg MessageWithBody
|
||||||
|
if err := xml.NewTokenDecoder(t).DecodeElement(&fullMsg, nil); err != nil {
|
||||||
|
return "", fmt.Errorf("failed to decode message body: %w", err)
|
||||||
|
}
|
||||||
|
return fullMsg.Body, nil
|
||||||
|
}
|
||||||
|
|
||||||
// JoinRoom joins an XMPP Multi-User Chat room
|
// JoinRoom joins an XMPP Multi-User Chat room
|
||||||
func (c *Client) JoinRoom(roomJID string) error {
|
func (c *Client) JoinRoom(roomJID string) error {
|
||||||
if c.session == nil {
|
if c.session == nil {
|
||||||
|
@ -693,10 +731,6 @@ func (c *Client) Ping() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMessageChannel returns the channel for incoming messages (Bridge interface)
|
|
||||||
func (c *Client) GetMessageChannel() <-chan *model.DirectionalMessage {
|
|
||||||
return c.incomingMessages
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleIncomingMessage processes incoming XMPP message stanzas
|
// handleIncomingMessage processes incoming XMPP message stanzas
|
||||||
func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenReadEncoder) error {
|
func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenReadEncoder) error {
|
||||||
|
@ -711,28 +745,11 @@ func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenRead
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse the message body from the token reader
|
|
||||||
var msgWithBody struct {
|
|
||||||
stanza.Message
|
|
||||||
Body string `xml:"body"`
|
|
||||||
}
|
|
||||||
msgWithBody.Message = msg
|
|
||||||
|
|
||||||
d := xml.NewTokenDecoder(t)
|
|
||||||
if err := d.DecodeElement(&msgWithBody, nil); err != nil {
|
|
||||||
c.logger.LogError("Failed to decode message body", "error", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if msgWithBody.Body == "" {
|
|
||||||
c.logger.LogDebug("Message has no body, ignoring")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deduplicate messages using message ID and TTL cache
|
// Deduplicate messages using message ID and TTL cache
|
||||||
if msg.ID != "" {
|
if msg.ID != "" {
|
||||||
// Check if this message ID is already in the cache (indicates duplicate)
|
// Check if this message ID is already in the cache (indicates duplicate)
|
||||||
if c.dedupeCache.Has(msg.ID) {
|
if c.dedupeCache.Has(msg.ID) {
|
||||||
|
c.logger.LogDebug("Skipping duplicate message", "message_id", msg.ID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -740,70 +757,12 @@ func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenRead
|
||||||
c.dedupeCache.Set(msg.ID, time.Now(), ttlcache.DefaultTTL)
|
c.dedupeCache.Set(msg.ID, time.Now(), ttlcache.DefaultTTL)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract channel and user information from JIDs
|
// Delegate to bridge handler if set
|
||||||
channelID, err := c.extractChannelID(msg.From)
|
if c.messageHandler != nil {
|
||||||
if err != nil {
|
return c.messageHandler(msg, t)
|
||||||
c.logger.LogError("Failed to extract channel ID from JID", "from", msg.From.String(), "error", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
userID, userName := c.extractUserInfo(msg.From)
|
|
||||||
|
|
||||||
// Create BridgeMessage
|
|
||||||
bridgeMsg := &model.BridgeMessage{
|
|
||||||
SourceBridge: "xmpp",
|
|
||||||
SourceChannelID: channelID,
|
|
||||||
SourceUserID: userID,
|
|
||||||
SourceUserName: userName,
|
|
||||||
Content: msgWithBody.Body, // Already Markdown compatible
|
|
||||||
MessageType: "text",
|
|
||||||
Timestamp: time.Now(), // XMPP doesn't always provide timestamps
|
|
||||||
MessageID: msg.ID,
|
|
||||||
TargetBridges: []string{}, // Will be routed to all other bridges
|
|
||||||
Metadata: map[string]any{
|
|
||||||
"xmpp_from": msg.From.String(),
|
|
||||||
"xmpp_to": msg.To.String(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wrap in directional message
|
|
||||||
directionalMsg := &model.DirectionalMessage{
|
|
||||||
BridgeMessage: bridgeMsg,
|
|
||||||
Direction: model.DirectionIncoming,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send to message channel (non-blocking)
|
|
||||||
select {
|
|
||||||
case c.incomingMessages <- directionalMsg:
|
|
||||||
// Message queued successfully
|
|
||||||
default:
|
|
||||||
c.logger.LogWarn("Message channel full, dropping message",
|
|
||||||
"channel_id", channelID,
|
|
||||||
"user_id", userID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.logger.LogDebug("No message handler set, ignoring message")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// extractChannelID extracts the channel ID (room bare JID) from a message JID
|
|
||||||
func (c *Client) extractChannelID(from jid.JID) (string, error) {
|
|
||||||
// For MUC messages, the channel ID is the bare JID (without resource/nickname)
|
|
||||||
return from.Bare().String(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// extractUserInfo extracts user ID and display name from a message JID
|
|
||||||
func (c *Client) extractUserInfo(from jid.JID) (string, string) {
|
|
||||||
// For MUC messages, the resource part is the nickname
|
|
||||||
nickname := from.Resourcepart()
|
|
||||||
|
|
||||||
// Use the full JID as user ID for XMPP
|
|
||||||
userID := from.String()
|
|
||||||
|
|
||||||
// Use nickname as display name if available, otherwise use full JID
|
|
||||||
displayName := nickname
|
|
||||||
if displayName == "" {
|
|
||||||
displayName = from.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
return userID, displayName
|
|
||||||
}
|
|
||||||
|
|
Reference in a new issue