From 69a67704f431e734ed916a79e04d160c8192b579 Mon Sep 17 00:00:00 2001 From: Felipe Martin Date: Mon, 4 Aug 2025 19:04:43 +0200 Subject: [PATCH 1/7] fix: prevent dangling XMPP connections during configuration updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add getConfiguration() methods to both bridges for thread-safe config access - Refactor UpdateConfiguration() methods to prevent mutex deadlock by releasing lock before blocking operations - Fix XMPP bridge to properly disconnect existing bridgeClient before creating new one - Add comprehensive timeout support to XMPP client (30s connection, 10s operations, 5s ping) - Implement proper disconnection with offline presence - Update all interfaces to use *config.Configuration for type safety 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- server/bridge/manager.go | 3 +- server/bridge/mattermost/bridge.go | 15 ++++-- server/bridge/xmpp/bridge.go | 70 ++++++++++++------------- server/configuration.go | 2 +- server/model/bridge.go | 4 +- server/xmpp/client.go | 82 ++++++++++++++++++++++++++---- 6 files changed, 121 insertions(+), 55 deletions(-) diff --git a/server/bridge/manager.go b/server/bridge/manager.go index 1137368..5dc1f9a 100644 --- a/server/bridge/manager.go +++ b/server/bridge/manager.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" mmModel "github.com/mattermost/mattermost/server/public/model" @@ -196,7 +197,7 @@ func (m *BridgeManager) Shutdown() error { } // OnPluginConfigurationChange propagates configuration changes to all registered bridges -func (m *BridgeManager) OnPluginConfigurationChange(config any) error { +func (m *BridgeManager) OnPluginConfigurationChange(config *config.Configuration) error { m.mu.RLock() defer m.mu.RUnlock() diff --git a/server/bridge/mattermost/bridge.go b/server/bridge/mattermost/bridge.go index 79baf6e..c2cdbec 100644 --- a/server/bridge/mattermost/bridge.go +++ b/server/bridge/mattermost/bridge.go @@ -52,11 +52,18 @@ func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg * return bridge } +// getConfiguration safely retrieves the current configuration +func (b *mattermostBridge) getConfiguration() *config.Configuration { + b.configMu.RLock() + defer b.configMu.RUnlock() + return b.config +} + // UpdateConfiguration updates the bridge configuration -func (b *mattermostBridge) UpdateConfiguration(newConfig any) error { - cfg, ok := newConfig.(*config.Configuration) - if !ok { - return fmt.Errorf("invalid configuration type") +func (b *mattermostBridge) UpdateConfiguration(cfg *config.Configuration) error { + // Validate configuration using built-in validation + if err := cfg.IsValid(); err != nil { + return fmt.Errorf("invalid configuration: %w", err) } b.configMu.Lock() diff --git a/server/bridge/xmpp/bridge.go b/server/bridge/xmpp/bridge.go index 4a1ffe6..b375ce4 100644 --- a/server/bridge/xmpp/bridge.go +++ b/server/bridge/xmpp/bridge.go @@ -80,56 +80,50 @@ func (b *xmppBridge) createXMPPClient(cfg *config.Configuration) *xmppClient.Cli ) } +// getConfiguration safely retrieves the current configuration +func (b *xmppBridge) getConfiguration() *config.Configuration { + b.configMu.RLock() + defer b.configMu.RUnlock() + return b.config +} + // UpdateConfiguration updates the bridge configuration -func (b *xmppBridge) UpdateConfiguration(newConfig any) error { - cfg, ok := newConfig.(*config.Configuration) - if !ok { - return fmt.Errorf("invalid configuration type") +// It handles validation and reconnection logic when the configuration changes +func (b *xmppBridge) UpdateConfiguration(cfg *config.Configuration) error { + // Validate configuration using built-in validation + if err := cfg.IsValid(); err != nil { + return fmt.Errorf("invalid configuration: %w", err) } - b.configMu.Lock() - oldConfig := b.config - b.config = cfg - defer b.configMu.Unlock() + // Get current config to check if restart is needed + oldConfig := b.getConfiguration() - b.logger.LogInfo("XMPP bridge configuration updated") + // Update configuration under lock, then release immediately + b.configMu.Lock() + b.config = cfg // Initialize or update XMPP client with new configuration - if cfg.EnableSync { - if cfg.XMPPServerURL == "" || cfg.XMPPUsername == "" || cfg.XMPPPassword == "" { - return fmt.Errorf("XMPP server URL, username, and password are required when sync is enabled") + if !cfg.Equals(oldConfig) { + if b.bridgeClient != nil && b.bridgeClient.Disconnect() != nil { + b.logger.LogError("Failed to disconnect old XMPP bridge client") } - b.bridgeClient = b.createXMPPClient(cfg) - } else { - b.bridgeClient = nil + } + b.configMu.Unlock() + + // Stop the bridge + if err := b.Stop(); err != nil { + b.logger.LogWarn("Error stopping bridge during restart", "error", err) } - // Check if we need to restart the bridge due to configuration changes - wasConnected := b.connected.Load() - needsRestart := oldConfig != nil && !oldConfig.Equals(cfg) && wasConnected - - // Log the configuration change - if needsRestart { - b.logger.LogInfo("Configuration changed, restarting bridge") - } else { - b.logger.LogInfo("Configuration updated", "config", cfg) + // Start the bridge with new configuration + // Start() method already uses getConfiguration() safely + if err := b.Start(); err != nil { + b.logger.LogError("Failed to restart bridge with new configuration", "error", err) + return fmt.Errorf("failed to restart bridge: %w", err) } - if needsRestart { - b.logger.LogInfo("Configuration changed, restarting bridge") - - // Stop the bridge - if err := b.Stop(); err != nil { - b.logger.LogWarn("Error stopping bridge during restart", "error", err) - } - - // Start the bridge with new configuration - if err := b.Start(); err != nil { - b.logger.LogError("Failed to restart bridge with new configuration", "error", err) - return fmt.Errorf("failed to restart bridge: %w", err) - } - } + b.logger.LogDebug("XMPP bridge configuration updated successfully") return nil } diff --git a/server/configuration.go b/server/configuration.go index 79fa130..b6bad1f 100644 --- a/server/configuration.go +++ b/server/configuration.go @@ -57,7 +57,7 @@ func (p *Plugin) OnConfigurationChange() error { return errors.Wrap(err, "failed to load plugin configuration") } - p.API.LogDebug("Loaded configuration in OnConfigurationChange", "configuration", configuration) + p.API.LogDebug("Plugin configuration changed") // Validate the configuration if err := configuration.IsValid(); err != nil { diff --git a/server/model/bridge.go b/server/model/bridge.go index bd00672..a5df73e 100644 --- a/server/model/bridge.go +++ b/server/model/bridge.go @@ -110,7 +110,7 @@ type BridgeManager interface { // OnPluginConfigurationChange propagates configuration changes to all registered bridges. // Returns an error if any bridge fails to update its configuration, but continues to // attempt updating all bridges. - OnPluginConfigurationChange(config any) error + OnPluginConfigurationChange(config *config.Configuration) error // CreateChannelMapping is called when a channel mapping is created. CreateChannelMapping(req CreateChannelMappingRequest) error @@ -121,7 +121,7 @@ type BridgeManager interface { type Bridge interface { // UpdateConfiguration updates the bridge configuration - UpdateConfiguration(config any) error + UpdateConfiguration(config *config.Configuration) error // Start starts the bridge Start() error diff --git a/server/xmpp/client.go b/server/xmpp/client.go index aad5e24..83f3709 100644 --- a/server/xmpp/client.go +++ b/server/xmpp/client.go @@ -18,6 +18,11 @@ import ( "mellium.im/xmpp/stanza" ) +const ( + // defaultOperationTimeout is the default timeout for XMPP operations + defaultOperationTimeout = 5 * time.Second +) + // Client represents an XMPP client for communicating with XMPP servers. type Client struct { serverURL string @@ -150,9 +155,13 @@ func (c *Client) Connect() error { } } - // Use DialClientSession for proper SASL authentication + // Create a timeout context for the connection attempt (30 seconds) + connectCtx, connectCancel := context.WithTimeout(c.ctx, 30*time.Second) + defer connectCancel() + + // Use DialClientSession for proper SASL authentication with timeout c.session, err = xmpp.DialClientSession( - c.ctx, + connectCtx, c.jidAddr, xmpp.StartTLS(tlsConfig), xmpp.SASL("", c.password, sasl.Plain), @@ -171,9 +180,12 @@ func (c *Client) Connect() error { if !c.sessionServing { return fmt.Errorf("failed to start session serving") } + c.logger.LogInfo("XMPP client connected successfully", "jid", c.jidAddr.String()) return nil - case <-time.After(5 * time.Second): + case <-time.After(10 * time.Second): return fmt.Errorf("timeout waiting for session to be ready") + case <-c.ctx.Done(): + return fmt.Errorf("connection cancelled: %w", c.ctx.Err()) } } @@ -206,18 +218,46 @@ func (c *Client) serveSession() { // Disconnect closes the XMPP connection func (c *Client) Disconnect() error { - if c.session != nil { - err := c.session.Close() - c.session = nil - if err != nil { - return fmt.Errorf("failed to close XMPP session: %w", err) - } + if c.session == nil { + return nil // Already disconnected } + c.logger.LogInfo("Disconnecting XMPP client", "jid", c.jidAddr.String()) + + // Send offline presence before disconnecting to properly leave rooms + if err := c.SetOfflinePresence(); err != nil { + c.logger.LogWarn("Failed to set offline presence before disconnect", "error", err) + // Don't fail the disconnect for presence issues + } + + // Close the session with a timeout to prevent hanging + sessionCloseCtx, cancel := context.WithTimeout(context.Background(), defaultOperationTimeout) + defer cancel() + + sessionCloseDone := make(chan error, 1) + go func() { + sessionCloseDone <- c.session.Close() + }() + + select { + case err := <-sessionCloseDone: + c.session = nil + if err != nil { + c.logger.LogWarn("Error closing XMPP session", "error", err) + return fmt.Errorf("failed to close XMPP session: %w", err) + } + case <-sessionCloseCtx.Done(): + c.logger.LogWarn("Timeout closing XMPP session, forcing disconnect") + c.session = nil + // Continue with cleanup even on timeout + } + + // Cancel the client context if c.cancel != nil { c.cancel() } + c.logger.LogInfo("XMPP client disconnected successfully") return nil } @@ -418,6 +458,30 @@ func (c *Client) SetOnlinePresence() error { return nil } +// SetOfflinePresence sends an offline presence stanza to indicate the client is going offline +func (c *Client) SetOfflinePresence() error { + if c.session == nil { + return fmt.Errorf("XMPP session not established") + } + + // Create presence stanza indicating we're unavailable + presence := stanza.Presence{ + Type: stanza.UnavailablePresence, + From: c.jidAddr, + } + + // Create a context with timeout for the presence update + ctx, cancel := context.WithTimeout(context.Background(), defaultOperationTimeout) + defer cancel() + + // Send the presence stanza + if err := c.session.Encode(ctx, presence); err != nil { + return fmt.Errorf("failed to send offline presence: %w", err) + } + + return nil +} + // CheckRoomExists verifies if an XMPP room exists and is accessible using disco#info func (c *Client) CheckRoomExists(roomJID string) (bool, error) { if c.session == nil { From 7b56cb34c6f61abff9fe30e1178f39f9e52d965e Mon Sep 17 00:00:00 2001 From: Felipe Martin Date: Mon, 4 Aug 2025 21:52:28 +0200 Subject: [PATCH 2/7] feat: implement bidirectional message bridge system with XMPP-Mattermost integration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit implements a comprehensive bridge-agnostic message routing system that enables real-time bidirectional message synchronization between XMPP and Mattermost platforms. Key features: - Bridge-agnostic message types and structures for extensibility - Central message bus system with publisher-subscriber pattern - Complete Bridge interface implementation for both XMPP and Mattermost - Message aggregation from multiple sources for scalability - Loop prevention mechanisms to avoid infinite message cycles - Buffered channels for high-performance message processing Architecture highlights: - Producer-consumer pattern for message routing between bridges - Thread-safe goroutine lifecycle management with context cancellation - Message handlers separated into dedicated files for maintainability - Support for future bridge implementations (Slack, Discord, etc.) - Markdown content standardization across all bridges Files added: - server/model/message.go: Core bridge-agnostic message structures - server/bridge/messagebus.go: Central message routing system - server/bridge/mattermost/message_handler.go: Mattermost-specific message processing - server/bridge/xmpp/message_handler.go: XMPP-specific message processing Files modified: - server/bridge/manager.go: Integration with message bus and routing - server/bridge/mattermost/bridge.go: Complete Bridge interface implementation - server/bridge/xmpp/bridge.go: Message aggregation and interface completion - server/model/bridge.go: Extended Bridge interface for bidirectional messaging - server/xmpp/client.go: Enhanced message listening with mellium.im/xmpp 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- server/bridge/manager.go | 144 +++++++++++- server/bridge/mattermost/bridge.go | 55 ++++- server/bridge/mattermost/message_handler.go | 207 +++++++++++++++++ server/bridge/messagebus.go | 244 ++++++++++++++++++++ server/bridge/xmpp/bridge.go | 98 +++++++- server/bridge/xmpp/message_handler.go | 166 +++++++++++++ server/model/bridge.go | 6 + server/model/message.go | 88 +++++++ server/xmpp/client.go | 152 ++++++++++-- 9 files changed, 1119 insertions(+), 41 deletions(-) create mode 100644 server/bridge/mattermost/message_handler.go create mode 100644 server/bridge/messagebus.go create mode 100644 server/bridge/xmpp/message_handler.go create mode 100644 server/model/message.go diff --git a/server/bridge/manager.go b/server/bridge/manager.go index 5dc1f9a..4259613 100644 --- a/server/bridge/manager.go +++ b/server/bridge/manager.go @@ -1,6 +1,7 @@ package bridge import ( + "context" "fmt" "sync" @@ -13,11 +14,15 @@ import ( // BridgeManager manages multiple bridge instances type BridgeManager struct { - bridges map[string]model.Bridge - mu sync.RWMutex - logger logger.Logger - api plugin.API - remoteID string + bridges map[string]model.Bridge + mu sync.RWMutex + logger logger.Logger + api plugin.API + remoteID string + messageBus model.MessageBus + routingCtx context.Context + routingCancel context.CancelFunc + routingWg sync.WaitGroup } // NewBridgeManager creates a new bridge manager @@ -29,11 +34,16 @@ func NewBridgeManager(logger logger.Logger, api plugin.API, remoteID string) mod panic("plugin API cannot be nil") } + ctx, cancel := context.WithCancel(context.Background()) + return &BridgeManager{ - bridges: make(map[string]model.Bridge), - logger: logger, - api: api, - remoteID: remoteID, + bridges: make(map[string]model.Bridge), + logger: logger, + api: api, + remoteID: remoteID, + messageBus: NewMessageBus(logger), + routingCtx: ctx, + routingCancel: cancel, } } @@ -56,6 +66,9 @@ func (m *BridgeManager) RegisterBridge(name string, bridge model.Bridge) error { m.bridges[name] = bridge m.logger.LogInfo("Bridge registered", "name", name) + // Subscribe bridge to message bus + go m.startBridgeMessageHandler(name, bridge) + return nil } @@ -405,3 +418,116 @@ func (m *BridgeManager) unshareChannel(channelID string) error { return nil } + +// startBridgeMessageHandler starts message handling for a specific bridge +func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge model.Bridge) { + m.logger.LogDebug("Starting message handler for bridge", "bridge", bridgeName) + + // Subscribe to message bus + messageChannel := m.messageBus.Subscribe(bridgeName) + + // Start message routing goroutine + m.routingWg.Add(1) + go func() { + defer m.routingWg.Done() + defer m.logger.LogDebug("Message handler stopped for bridge", "bridge", bridgeName) + + for { + select { + case msg, ok := <-messageChannel: + if !ok { + m.logger.LogDebug("Message channel closed for bridge", "bridge", bridgeName) + return + } + + if err := m.handleBridgeMessage(bridgeName, bridge, msg); err != nil { + m.logger.LogError("Failed to handle message for bridge", + "bridge", bridgeName, + "source_bridge", msg.SourceBridge, + "error", err) + } + + case <-m.routingCtx.Done(): + m.logger.LogDebug("Context cancelled, stopping message handler", "bridge", bridgeName) + return + } + } + }() + + // Listen to bridge's outgoing messages + m.routingWg.Add(1) + go func() { + defer m.routingWg.Done() + defer m.logger.LogDebug("Bridge message listener stopped", "bridge", bridgeName) + + bridgeMessageChannel := bridge.GetMessageChannel() + for { + select { + case msg, ok := <-bridgeMessageChannel: + if !ok { + m.logger.LogDebug("Bridge message channel closed", "bridge", bridgeName) + return + } + + if err := m.messageBus.Publish(msg); err != nil { + m.logger.LogError("Failed to publish message from bridge", + "bridge", bridgeName, + "direction", msg.Direction, + "error", err) + } + + case <-m.routingCtx.Done(): + m.logger.LogDebug("Context cancelled, stopping bridge listener", "bridge", bridgeName) + return + } + } + }() +} + +// handleBridgeMessage processes an incoming message for a specific bridge +func (m *BridgeManager) handleBridgeMessage(bridgeName string, bridge model.Bridge, msg *model.DirectionalMessage) error { + m.logger.LogDebug("Handling message for bridge", + "target_bridge", bridgeName, + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "channel_id", msg.SourceChannelID) + + // Get the bridge's message handler + handler := bridge.GetMessageHandler() + if handler == nil { + return fmt.Errorf("bridge %s does not have a message handler", bridgeName) + } + + // Check if the handler can process this message + if !handler.CanHandleMessage(msg.BridgeMessage) { + m.logger.LogDebug("Bridge cannot handle message", + "bridge", bridgeName, + "message_type", msg.MessageType) + return nil // Not an error, just skip + } + + // Process the message + return handler.ProcessMessage(msg) +} + +// StartMessageRouting starts the message bus and routing system +func (m *BridgeManager) StartMessageRouting() error { + m.logger.LogInfo("Starting message routing system") + return m.messageBus.Start() +} + +// StopMessageRouting stops the message bus and routing system +func (m *BridgeManager) StopMessageRouting() error { + m.logger.LogInfo("Stopping message routing system") + + // Cancel routing context + if m.routingCancel != nil { + m.routingCancel() + } + + // Wait for all routing goroutines to finish + m.routingWg.Wait() + + // Stop the message bus + return m.messageBus.Stop() +} diff --git a/server/bridge/mattermost/bridge.go b/server/bridge/mattermost/bridge.go index c2cdbec..7400b6c 100644 --- a/server/bridge/mattermost/bridge.go +++ b/server/bridge/mattermost/bridge.go @@ -14,6 +14,11 @@ import ( "github.com/mattermost/mattermost/server/public/plugin" ) +const ( + // defaultMessageBufferSize is the buffer size for incoming message channels + defaultMessageBufferSize = 1000 +) + // mattermostBridge handles syncing messages between Mattermost instances type mattermostBridge struct { logger logger.Logger @@ -21,6 +26,11 @@ type mattermostBridge struct { kvstore kvstore.KVStore userManager pluginModel.BridgeUserManager + // Message handling + messageHandler *mattermostMessageHandler + userResolver *mattermostUserResolver + incomingMessages chan *pluginModel.DirectionalMessage + // Connection management connected atomic.Bool ctx context.Context @@ -38,18 +48,23 @@ type mattermostBridge struct { // NewBridge creates a new Mattermost bridge func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge { ctx, cancel := context.WithCancel(context.Background()) - bridge := &mattermostBridge{ - logger: log, - api: api, - kvstore: kvstore, - ctx: ctx, - cancel: cancel, - channelMappings: make(map[string]string), - config: cfg, - userManager: bridge.NewUserManager("mattermost", log), + b := &mattermostBridge{ + logger: log, + api: api, + kvstore: kvstore, + ctx: ctx, + cancel: cancel, + channelMappings: make(map[string]string), + config: cfg, + userManager: bridge.NewUserManager("mattermost", log), + incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize), } - return bridge + // Initialize handlers after bridge is created + b.messageHandler = newMessageHandler(b) + b.userResolver = newUserResolver(b) + + return b } // getConfiguration safely retrieves the current configuration @@ -343,3 +358,23 @@ func (b *mattermostBridge) GetRoomMapping(roomID string) (string, error) { func (b *mattermostBridge) GetUserManager() pluginModel.BridgeUserManager { return b.userManager } + +// GetMessageChannel returns the channel for incoming messages from Mattermost +func (b *mattermostBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage { + return b.incomingMessages +} + +// SendMessage sends a message to a Mattermost channel +func (b *mattermostBridge) SendMessage(msg *pluginModel.BridgeMessage) error { + return b.messageHandler.postMessageToMattermost(msg) +} + +// GetMessageHandler returns the message handler for this bridge +func (b *mattermostBridge) GetMessageHandler() pluginModel.MessageHandler { + return b.messageHandler +} + +// GetUserResolver returns the user resolver for this bridge +func (b *mattermostBridge) GetUserResolver() pluginModel.UserResolver { + return b.userResolver +} diff --git a/server/bridge/mattermost/message_handler.go b/server/bridge/mattermost/message_handler.go new file mode 100644 index 0000000..925165f --- /dev/null +++ b/server/bridge/mattermost/message_handler.go @@ -0,0 +1,207 @@ +package mattermost + +import ( + "fmt" + "strings" + + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" + mmModel "github.com/mattermost/mattermost/server/public/model" +) + +// mattermostMessageHandler handles incoming messages for the Mattermost bridge +type mattermostMessageHandler struct { + bridge *mattermostBridge + logger logger.Logger +} + +// newMessageHandler creates a new Mattermost message handler +func newMessageHandler(bridge *mattermostBridge) *mattermostMessageHandler { + return &mattermostMessageHandler{ + bridge: bridge, + logger: bridge.logger, + } +} + +// ProcessMessage processes an incoming message for the Mattermost bridge +func (h *mattermostMessageHandler) ProcessMessage(msg *pluginModel.DirectionalMessage) error { + h.logger.LogDebug("Processing message for Mattermost bridge", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "channel_id", msg.SourceChannelID) + + // Skip messages that originated from Mattermost to prevent loops + if msg.SourceBridge == "mattermost" { + h.logger.LogDebug("Skipping Mattermost-originated message to prevent loop") + return nil + } + + // For incoming messages to Mattermost, we post them to Mattermost channels + if msg.Direction == pluginModel.DirectionIncoming { + return h.postMessageToMattermost(msg.BridgeMessage) + } + + h.logger.LogDebug("Ignoring outgoing message for Mattermost bridge") + return nil +} + +// CanHandleMessage determines if this handler can process the message +func (h *mattermostMessageHandler) CanHandleMessage(msg *pluginModel.BridgeMessage) bool { + // Mattermost bridge can handle text messages that didn't originate from Mattermost + return msg.MessageType == "text" && msg.SourceBridge != "mattermost" +} + +// GetSupportedMessageTypes returns the message types this handler supports +func (h *mattermostMessageHandler) GetSupportedMessageTypes() []string { + return []string{"text"} +} + +// postMessageToMattermost posts a message to a Mattermost channel +func (h *mattermostMessageHandler) postMessageToMattermost(msg *pluginModel.BridgeMessage) error { + if h.bridge.api == nil { + return fmt.Errorf("Mattermost API not initialized") + } + + // Get the Mattermost channel ID from the channel mapping + channelID, err := h.bridge.GetRoomMapping(msg.SourceChannelID) + if err != nil { + return fmt.Errorf("failed to get channel mapping: %w", err) + } + if channelID == "" { + // Check if the source channel ID is already a Mattermost channel ID + channelID = msg.SourceChannelID + } + + // Verify the channel exists + channel, appErr := h.bridge.api.GetChannel(channelID) + if appErr != nil { + return fmt.Errorf("failed to get channel %s: %w", channelID, appErr) + } + if channel == nil { + return fmt.Errorf("channel %s not found", channelID) + } + + // Format the message content + content := h.formatMessageContent(msg) + + // Create the post + post := &mmModel.Post{ + ChannelId: channelID, + Message: content, + Type: mmModel.PostTypeDefault, + Props: map[string]interface{}{ + "from_bridge": msg.SourceBridge, + "bridge_user_id": msg.SourceUserID, + "bridge_user_name": msg.SourceUserName, + "bridge_message_id": msg.MessageID, + "bridge_timestamp": msg.Timestamp.Unix(), + }, + } + + // Add thread ID if present + if msg.ThreadID != "" { + post.RootId = msg.ThreadID + } + + // Post the message as the plugin bot + createdPost, appErr := h.bridge.api.CreatePost(post) + if appErr != nil { + return fmt.Errorf("failed to create post in channel %s: %w", channelID, appErr) + } + + h.logger.LogDebug("Message posted to Mattermost channel", + "channel_id", channelID, + "post_id", createdPost.Id, + "source_bridge", msg.SourceBridge, + "content_length", len(content)) + + return nil +} + +// formatMessageContent formats the message content for Mattermost +func (h *mattermostMessageHandler) formatMessageContent(msg *pluginModel.BridgeMessage) string { + // For messages from other bridges, prefix with the bridge info and user name + if msg.SourceUserName != "" { + bridgeIcon := h.getBridgeIcon(msg.SourceBridge) + return fmt.Sprintf("%s **%s**: %s", bridgeIcon, msg.SourceUserName, msg.Content) + } + return msg.Content +} + +// getBridgeIcon returns an icon/emoji for the source bridge +func (h *mattermostMessageHandler) getBridgeIcon(bridgeType string) string { + switch bridgeType { + case "xmpp": + return ":speech_balloon:" // Chat bubble emoji for XMPP + case "slack": + return ":slack:" // Slack emoji if available + case "discord": + return ":discord:" // Discord emoji if available + default: + return ":bridge_at_night:" // Generic bridge emoji + } +} + +// mattermostUserResolver handles user resolution for the Mattermost bridge +type mattermostUserResolver struct { + bridge *mattermostBridge + logger logger.Logger +} + +// newUserResolver creates a new Mattermost user resolver +func newUserResolver(bridge *mattermostBridge) *mattermostUserResolver { + return &mattermostUserResolver{ + bridge: bridge, + logger: bridge.logger, + } +} + +// ResolveUser converts an external user ID to an ExternalUser +func (r *mattermostUserResolver) ResolveUser(externalUserID string) (*pluginModel.ExternalUser, error) { + r.logger.LogDebug("Resolving Mattermost user", "user_id", externalUserID) + + // For Mattermost, the external user ID is the Mattermost user ID + user, appErr := r.bridge.api.GetUser(externalUserID) + if appErr != nil { + return nil, fmt.Errorf("failed to get Mattermost user: %w", appErr) + } + + if user == nil { + return nil, fmt.Errorf("Mattermost user not found: %s", externalUserID) + } + + return &pluginModel.ExternalUser{ + BridgeType: "mattermost", + ExternalUserID: externalUserID, + DisplayName: r.GetDisplayName(externalUserID), + MattermostUserID: externalUserID, // Same as external ID for Mattermost + }, nil +} + +// FormatUserMention formats a user mention for Markdown content +func (r *mattermostUserResolver) FormatUserMention(user *pluginModel.ExternalUser) string { + // For Mattermost, use the standard @username format + return fmt.Sprintf("@%s", user.DisplayName) +} + +// GetDisplayName extracts display name from external user ID +func (r *mattermostUserResolver) GetDisplayName(externalUserID string) string { + // Try to get the actual username from Mattermost API + user, appErr := r.bridge.api.GetUser(externalUserID) + if appErr != nil || user == nil { + r.logger.LogWarn("Failed to get user for display name", "user_id", externalUserID) + return "Unknown User" + } + + // Prefer username, fallback to first name + last name, then to ID + if user.Username != "" { + return user.Username + } + + fullName := strings.TrimSpace(user.FirstName + " " + user.LastName) + if fullName != "" { + return fullName + } + + return user.Id[:8] // Show first 8 chars of ID as fallback +} \ No newline at end of file diff --git a/server/bridge/messagebus.go b/server/bridge/messagebus.go new file mode 100644 index 0000000..f51babd --- /dev/null +++ b/server/bridge/messagebus.go @@ -0,0 +1,244 @@ +package bridge + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" +) + +const ( + // DefaultMessageBufferSize is the default buffer size for message channels + DefaultMessageBufferSize = 1000 + + // MessageDeliveryTimeout is the maximum time to wait for message delivery + MessageDeliveryTimeout = 5 * time.Second +) + +// messageBus implements the MessageBus interface +type messageBus struct { + // Core messaging + incomingMessages chan *model.DirectionalMessage + subscribers map[string]chan *model.DirectionalMessage + subscribersMu sync.RWMutex + + // Lifecycle management + ctx context.Context + cancel context.CancelFunc + logger logger.Logger + wg sync.WaitGroup + started bool + startMu sync.Mutex +} + +// NewMessageBus creates a new message bus instance +func NewMessageBus(logger logger.Logger) model.MessageBus { + ctx, cancel := context.WithCancel(context.Background()) + + return &messageBus{ + incomingMessages: make(chan *model.DirectionalMessage, DefaultMessageBufferSize), + subscribers: make(map[string]chan *model.DirectionalMessage), + ctx: ctx, + cancel: cancel, + logger: logger, + } +} + +// Subscribe returns a channel that receives messages for the specified bridge +func (mb *messageBus) Subscribe(bridgeName string) <-chan *model.DirectionalMessage { + mb.subscribersMu.Lock() + defer mb.subscribersMu.Unlock() + + // Create a buffered channel for this subscriber + ch := make(chan *model.DirectionalMessage, DefaultMessageBufferSize) + mb.subscribers[bridgeName] = ch + + mb.logger.LogDebug("Bridge subscribed to message bus", "bridge", bridgeName) + return ch +} + +// Publish sends a message to the message bus for routing +func (mb *messageBus) Publish(msg *model.DirectionalMessage) error { + if msg == nil { + return fmt.Errorf("message cannot be nil") + } + + if msg.BridgeMessage == nil { + return fmt.Errorf("bridge message cannot be nil") + } + + select { + case mb.incomingMessages <- msg: + mb.logger.LogDebug("Message published to bus", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "channel_id", msg.SourceChannelID) + return nil + case <-time.After(MessageDeliveryTimeout): + mb.logger.LogWarn("Message delivery timeout", + "source_bridge", msg.SourceBridge, + "channel_id", msg.SourceChannelID) + return fmt.Errorf("message delivery timeout") + case <-mb.ctx.Done(): + return fmt.Errorf("message bus is shutting down") + } +} + +// Start begins message routing +func (mb *messageBus) Start() error { + mb.startMu.Lock() + defer mb.startMu.Unlock() + + if mb.started { + return fmt.Errorf("message bus is already started") + } + + mb.logger.LogInfo("Starting message bus") + + // Start the message routing goroutine + mb.wg.Add(1) + go mb.routeMessages() + + mb.started = true + mb.logger.LogInfo("Message bus started successfully") + return nil +} + +// Stop ends message routing and cleans up resources +func (mb *messageBus) Stop() error { + mb.startMu.Lock() + defer mb.startMu.Unlock() + + if !mb.started { + return nil // Already stopped + } + + mb.logger.LogInfo("Stopping message bus") + + // Cancel context to signal shutdown + mb.cancel() + + // Wait for routing goroutine to finish + mb.wg.Wait() + + // Close all subscriber channels + mb.subscribersMu.Lock() + for bridgeName, ch := range mb.subscribers { + close(ch) + mb.logger.LogDebug("Closed subscriber channel", "bridge", bridgeName) + } + mb.subscribers = make(map[string]chan *model.DirectionalMessage) + mb.subscribersMu.Unlock() + + // Close incoming messages channel + close(mb.incomingMessages) + + mb.started = false + mb.logger.LogInfo("Message bus stopped successfully") + return nil +} + +// routeMessages handles the main message routing loop +func (mb *messageBus) routeMessages() { + defer mb.wg.Done() + + mb.logger.LogDebug("Message routing started") + + for { + select { + case msg, ok := <-mb.incomingMessages: + if !ok { + mb.logger.LogDebug("Incoming messages channel closed, stopping routing") + return + } + + if err := mb.routeMessage(msg); err != nil { + mb.logger.LogError("Failed to route message", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "error", err) + } + + case <-mb.ctx.Done(): + mb.logger.LogDebug("Context cancelled, stopping message routing") + return + } + } +} + +// routeMessage routes a single message to appropriate subscribers +func (mb *messageBus) routeMessage(msg *model.DirectionalMessage) error { + mb.subscribersMu.RLock() + defer mb.subscribersMu.RUnlock() + + routedCount := 0 + + // Route to specific target bridges if specified + if len(msg.TargetBridges) > 0 { + for _, targetBridge := range msg.TargetBridges { + if ch, exists := mb.subscribers[targetBridge]; exists { + if mb.deliverMessage(ch, msg, targetBridge) { + routedCount++ + } + } else { + mb.logger.LogWarn("Target bridge not subscribed", + "target_bridge", targetBridge, + "source_bridge", msg.SourceBridge) + } + } + } else { + // Route to all subscribers except the source bridge + for bridgeName, ch := range mb.subscribers { + if bridgeName != msg.SourceBridge { + if mb.deliverMessage(ch, msg, bridgeName) { + routedCount++ + } + } + } + } + + mb.logger.LogDebug("Message routed", + "source_bridge", msg.SourceBridge, + "routed_to_count", routedCount) + + return nil +} + +// deliverMessage attempts to deliver a message to a specific subscriber +func (mb *messageBus) deliverMessage(ch chan *model.DirectionalMessage, msg *model.DirectionalMessage, targetBridge string) bool { + select { + case ch <- msg: + return true + case <-time.After(MessageDeliveryTimeout): + mb.logger.LogWarn("Message delivery timeout to bridge", + "target_bridge", targetBridge, + "source_bridge", msg.SourceBridge) + return false + case <-mb.ctx.Done(): + return false + } +} + +// GetStats returns statistics about the message bus +func (mb *messageBus) GetStats() map[string]interface{} { + mb.subscribersMu.RLock() + defer mb.subscribersMu.RUnlock() + + stats := map[string]interface{}{ + "started": mb.started, + "subscriber_count": len(mb.subscribers), + "buffer_size": DefaultMessageBufferSize, + "pending_messages": len(mb.incomingMessages), + } + + subscribers := make([]string, 0, len(mb.subscribers)) + for bridgeName := range mb.subscribers { + subscribers = append(subscribers, bridgeName) + } + stats["subscribers"] = subscribers + + return stats +} \ No newline at end of file diff --git a/server/bridge/xmpp/bridge.go b/server/bridge/xmpp/bridge.go index b375ce4..ca05e78 100644 --- a/server/bridge/xmpp/bridge.go +++ b/server/bridge/xmpp/bridge.go @@ -18,6 +18,11 @@ import ( "github.com/mattermost/mattermost/server/public/plugin" ) +const ( + // defaultMessageBufferSize is the buffer size for incoming message channels + defaultMessageBufferSize = 1000 +) + // xmppBridge handles syncing messages between Mattermost and XMPP type xmppBridge struct { logger logger.Logger @@ -26,6 +31,11 @@ type xmppBridge struct { bridgeClient *xmppClient.Client // Main bridge XMPP client connection userManager pluginModel.BridgeUserManager + // Message handling + messageHandler *xmppMessageHandler + userResolver *xmppUserResolver + incomingMessages chan *pluginModel.DirectionalMessage + // Connection management connected atomic.Bool ctx context.Context @@ -44,16 +54,21 @@ type xmppBridge struct { func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge { ctx, cancel := context.WithCancel(context.Background()) b := &xmppBridge{ - logger: log, - api: api, - kvstore: kvstore, - ctx: ctx, - cancel: cancel, - channelMappings: make(map[string]string), - config: cfg, - userManager: bridge.NewUserManager("xmpp", log), + logger: log, + api: api, + kvstore: kvstore, + ctx: ctx, + cancel: cancel, + channelMappings: make(map[string]string), + config: cfg, + userManager: bridge.NewUserManager("xmpp", log), + incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize), } + // Initialize handlers after bridge is created + b.messageHandler = newMessageHandler(b) + b.userResolver = newUserResolver(b) + // Initialize XMPP client with configuration if cfg.EnableSync && cfg.XMPPServerURL != "" && cfg.XMPPUsername != "" && cfg.XMPPPassword != "" { b.bridgeClient = b.createXMPPClient(cfg) @@ -160,6 +175,9 @@ func (b *xmppBridge) Start() error { // Start connection monitor go b.connectionMonitor() + // Start message aggregation + go b.startMessageAggregation() + b.logger.LogInfo("Mattermost to XMPP bridge started successfully") return nil } @@ -533,3 +551,67 @@ func (b *xmppBridge) GetRoomMapping(roomID string) (string, error) { func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager { return b.userManager } + +// startMessageAggregation starts the message aggregation goroutine +func (b *xmppBridge) startMessageAggregation() { + b.logger.LogDebug("Starting XMPP message aggregation") + + for { + select { + case <-b.ctx.Done(): + b.logger.LogDebug("Stopping XMPP message aggregation") + return + default: + // Aggregate messages from bridge client if available + if b.bridgeClient != nil { + clientChannel := b.bridgeClient.GetMessageChannel() + select { + case msg, ok := <-clientChannel: + if !ok { + b.logger.LogDebug("Bridge client message channel closed") + continue + } + + // Forward to our bridge's message channel + select { + case b.incomingMessages <- msg: + b.logger.LogDebug("Message forwarded from bridge client", + "source_channel", msg.SourceChannelID, + "user_id", msg.SourceUserID) + default: + b.logger.LogWarn("Bridge message channel full, dropping message", + "source_channel", msg.SourceChannelID, + "user_id", msg.SourceUserID) + } + case <-b.ctx.Done(): + return + default: + // No messages available, continue with other potential sources + } + } + + // TODO: Add aggregation from user client channels when implemented + // This is where we would aggregate from multiple XMPP user connections + } + } +} + +// GetMessageChannel returns the channel for incoming messages from XMPP +func (b *xmppBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage { + return b.incomingMessages +} + +// SendMessage sends a message to an XMPP room +func (b *xmppBridge) SendMessage(msg *pluginModel.BridgeMessage) error { + return b.messageHandler.sendMessageToXMPP(msg) +} + +// GetMessageHandler returns the message handler for this bridge +func (b *xmppBridge) GetMessageHandler() pluginModel.MessageHandler { + return b.messageHandler +} + +// GetUserResolver returns the user resolver for this bridge +func (b *xmppBridge) GetUserResolver() pluginModel.UserResolver { + return b.userResolver +} diff --git a/server/bridge/xmpp/message_handler.go b/server/bridge/xmpp/message_handler.go new file mode 100644 index 0000000..3916574 --- /dev/null +++ b/server/bridge/xmpp/message_handler.go @@ -0,0 +1,166 @@ +package xmpp + +import ( + "fmt" + "strings" + + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" + xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp" +) + +// xmppMessageHandler handles incoming messages for the XMPP bridge +type xmppMessageHandler struct { + bridge *xmppBridge + logger logger.Logger +} + +// newMessageHandler creates a new XMPP message handler +func newMessageHandler(bridge *xmppBridge) *xmppMessageHandler { + return &xmppMessageHandler{ + bridge: bridge, + logger: bridge.logger, + } +} + +// ProcessMessage processes an incoming message for the XMPP bridge +func (h *xmppMessageHandler) ProcessMessage(msg *pluginModel.DirectionalMessage) error { + h.logger.LogDebug("Processing message for XMPP bridge", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "channel_id", msg.SourceChannelID) + + // Skip messages that originated from XMPP to prevent loops + if msg.SourceBridge == "xmpp" { + h.logger.LogDebug("Skipping XMPP-originated message to prevent loop") + return nil + } + + // For incoming messages to XMPP, we send them to XMPP rooms + if msg.Direction == pluginModel.DirectionIncoming { + return h.sendMessageToXMPP(msg.BridgeMessage) + } + + h.logger.LogDebug("Ignoring outgoing message for XMPP bridge") + return nil +} + +// CanHandleMessage determines if this handler can process the message +func (h *xmppMessageHandler) CanHandleMessage(msg *pluginModel.BridgeMessage) bool { + // XMPP bridge can handle text messages that didn't originate from XMPP + return msg.MessageType == "text" && msg.SourceBridge != "xmpp" +} + +// GetSupportedMessageTypes returns the message types this handler supports +func (h *xmppMessageHandler) GetSupportedMessageTypes() []string { + return []string{"text"} +} + +// sendMessageToXMPP sends a message to an XMPP room +func (h *xmppMessageHandler) sendMessageToXMPP(msg *pluginModel.BridgeMessage) error { + if h.bridge.bridgeClient == nil { + return fmt.Errorf("XMPP client not initialized") + } + + if !h.bridge.connected.Load() { + return fmt.Errorf("not connected to XMPP server") + } + + // Get the XMPP room JID from the channel mapping + roomJID, err := h.bridge.GetChannelMapping(msg.SourceChannelID) + if err != nil { + return fmt.Errorf("failed to get room mapping: %w", err) + } + if roomJID == "" { + return fmt.Errorf("channel is not mapped to any XMPP room") + } + + // Format the message content with user information + content := h.formatMessageContent(msg) + + // Create XMPP message request + req := xmppClient.MessageRequest{ + RoomJID: roomJID, + Message: content, + } + + // Send the message + _, err = h.bridge.bridgeClient.SendMessage(req) + if err != nil { + return fmt.Errorf("failed to send message to XMPP room: %w", err) + } + + h.logger.LogDebug("Message sent to XMPP room", + "channel_id", msg.SourceChannelID, + "room_jid", roomJID, + "content_length", len(content)) + + return nil +} + +// formatMessageContent formats the message content for XMPP +func (h *xmppMessageHandler) formatMessageContent(msg *pluginModel.BridgeMessage) string { + // For messages from other bridges, prefix with the user name + if msg.SourceUserName != "" { + return fmt.Sprintf("<%s> %s", msg.SourceUserName, msg.Content) + } + return msg.Content +} + +// xmppUserResolver handles user resolution for the XMPP bridge +type xmppUserResolver struct { + bridge *xmppBridge + logger logger.Logger +} + +// newUserResolver creates a new XMPP user resolver +func newUserResolver(bridge *xmppBridge) *xmppUserResolver { + return &xmppUserResolver{ + bridge: bridge, + logger: bridge.logger, + } +} + +// ResolveUser converts an external user ID to an ExternalUser +func (r *xmppUserResolver) ResolveUser(externalUserID string) (*pluginModel.ExternalUser, error) { + r.logger.LogDebug("Resolving XMPP user", "user_id", externalUserID) + + // For XMPP, the external user ID is typically the full JID + return &pluginModel.ExternalUser{ + BridgeType: "xmpp", + ExternalUserID: externalUserID, + DisplayName: r.GetDisplayName(externalUserID), + MattermostUserID: "", // Will be resolved by user mapping system + }, nil +} + +// FormatUserMention formats a user mention for Markdown content +func (r *xmppUserResolver) FormatUserMention(user *pluginModel.ExternalUser) string { + // For XMPP, we can format mentions as simple text with the display name + return fmt.Sprintf("@%s", user.DisplayName) +} + +// GetDisplayName extracts display name from external user ID +func (r *xmppUserResolver) GetDisplayName(externalUserID string) string { + // For XMPP JIDs, extract the local part or resource as display name + // Format: user@domain/resource -> use resource or user + if len(externalUserID) == 0 { + return "Unknown User" + } + + // Try to parse as JID and extract meaningful display name + parts := strings.Split(externalUserID, "/") + if len(parts) > 1 { + // Has resource part, use it as display name + return parts[1] + } + + // No resource, try to extract local part from user@domain + atIndex := strings.Index(externalUserID, "@") + if atIndex > 0 { + return externalUserID[:atIndex] + } + + // Fallback to the full ID + return externalUserID +} \ No newline at end of file diff --git a/server/model/bridge.go b/server/model/bridge.go index a5df73e..f274e0e 100644 --- a/server/model/bridge.go +++ b/server/model/bridge.go @@ -152,6 +152,12 @@ type Bridge interface { // GetUserManager returns the user manager for this bridge. GetUserManager() BridgeUserManager + + // Message handling for bidirectional communication + GetMessageChannel() <-chan *DirectionalMessage + SendMessage(msg *BridgeMessage) error + GetMessageHandler() MessageHandler + GetUserResolver() UserResolver } // BridgeUser represents a user connected to any bridge service diff --git a/server/model/message.go b/server/model/message.go new file mode 100644 index 0000000..5b2a252 --- /dev/null +++ b/server/model/message.go @@ -0,0 +1,88 @@ +package model + +import ( + "time" +) + +// MessageDirection indicates the direction of message flow +type MessageDirection string + +const ( + DirectionIncoming MessageDirection = "incoming" // From external system to us + DirectionOutgoing MessageDirection = "outgoing" // From us to external system +) + +// BridgeMessage represents a message that can be passed between any bridge types +type BridgeMessage struct { + // Source information + SourceBridge string // "xmpp", "mattermost", "slack", etc. + SourceChannelID string // Channel ID in source system + SourceUserID string // User ID in source system (JID, user ID, etc.) + SourceUserName string // Display name in source system + + // Message content (standardized on Markdown) + Content string // Markdown formatted message content + MessageType string // "text", "image", "file", etc. + + // Metadata + Timestamp time.Time // When message was received + MessageID string // Unique message ID from source + ThreadID string // Thread/reply ID (if applicable) + + // Routing hints + TargetBridges []string // Which bridges should receive this + Metadata map[string]any // Bridge-specific metadata +} + +// DirectionalMessage wraps a BridgeMessage with direction information +type DirectionalMessage struct { + *BridgeMessage + Direction MessageDirection +} + +// ExternalUser represents a user from any bridge system +type ExternalUser struct { + BridgeType string // "xmpp", "slack", etc. + ExternalUserID string // JID, Slack user ID, etc. + DisplayName string // How to display this user + MattermostUserID string // Mapped Mattermost user (if exists) +} + +// UserResolver handles user resolution for a specific bridge +type UserResolver interface { + // ResolveUser converts an external user ID to an ExternalUser + ResolveUser(externalUserID string) (*ExternalUser, error) + + // FormatUserMention formats a user mention for Markdown content + FormatUserMention(user *ExternalUser) string + + // GetDisplayName extracts display name from external user ID + GetDisplayName(externalUserID string) string +} + +// MessageBus handles routing messages between bridges +type MessageBus interface { + // Subscribe returns a channel that receives messages for the specified bridge + Subscribe(bridgeName string) <-chan *DirectionalMessage + + // Publish sends a message to the message bus for routing + Publish(msg *DirectionalMessage) error + + // Start begins message routing + Start() error + + // Stop ends message routing and cleans up resources + Stop() error +} + +// MessageHandler processes incoming messages for a bridge +type MessageHandler interface { + // ProcessMessage handles an incoming message + ProcessMessage(msg *DirectionalMessage) error + + // CanHandleMessage determines if this handler can process the message + CanHandleMessage(msg *BridgeMessage) bool + + // GetSupportedMessageTypes returns the message types this handler supports + GetSupportedMessageTypes() []string +} \ No newline at end of file diff --git a/server/xmpp/client.go b/server/xmpp/client.go index 83f3709..7f32100 100644 --- a/server/xmpp/client.go +++ b/server/xmpp/client.go @@ -9,7 +9,9 @@ import ( "time" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" "mellium.im/sasl" + "mellium.im/xmlstream" "mellium.im/xmpp" "mellium.im/xmpp/disco" "mellium.im/xmpp/jid" @@ -21,6 +23,9 @@ import ( const ( // defaultOperationTimeout is the default timeout for XMPP operations defaultOperationTimeout = 5 * time.Second + + // msgBufferSize is the buffer size for incoming message channels + msgBufferSize = 1000 ) // Client represents an XMPP client for communicating with XMPP servers. @@ -43,6 +48,9 @@ type Client struct { mux *mux.ServeMux sessionReady chan struct{} sessionServing bool + + // Message handling for bridge integration + incomingMessages chan *model.DirectionalMessage } // MessageRequest represents a request to send a message. @@ -90,22 +98,31 @@ type UserProfile struct { // NewClient creates a new XMPP client. func NewClient(serverURL, username, password, resource, remoteID string, logger logger.Logger) *Client { ctx, cancel := context.WithCancel(context.Background()) - mucClient := &muc.Client{} - mux := mux.New("jabber:client", muc.HandleClient(mucClient)) - return &Client{ - serverURL: serverURL, - username: username, - password: password, - resource: resource, - remoteID: remoteID, - logger: logger, - ctx: ctx, - cancel: cancel, - mucClient: mucClient, - mux: mux, - sessionReady: make(chan struct{}), + client := &Client{ + serverURL: serverURL, + username: username, + password: password, + resource: resource, + remoteID: remoteID, + logger: logger, + ctx: ctx, + cancel: cancel, + sessionReady: make(chan struct{}), + incomingMessages: make(chan *model.DirectionalMessage, msgBufferSize), } + + // Create MUC client and set up message handling + mucClient := &muc.Client{} + client.mucClient = mucClient + + // Create mux with MUC client and our message handler + mux := mux.New("jabber:client", + muc.HandleClient(mucClient), + mux.MessageFunc(stanza.GroupChatMessage, xml.Name{}, client.handleIncomingMessage)) + client.mux = mux + + return client } // NewClientWithTLS creates a new XMPP client with custom TLS configuration. @@ -599,3 +616,110 @@ func (c *Client) Ping() error { c.logger.LogDebug("XMPP ping successful", "duration", duration) return nil } + +// GetMessageChannel returns the channel for incoming messages (Bridge interface) +func (c *Client) GetMessageChannel() <-chan *model.DirectionalMessage { + return c.incomingMessages +} + +// handleIncomingMessage processes incoming XMPP message stanzas +func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenReadEncoder) error { + c.logger.LogDebug("Received XMPP message", + "from", msg.From.String(), + "to", msg.To.String(), + "type", fmt.Sprintf("%v", msg.Type)) + + // Only process groupchat messages for now (MUC messages from channels) + if msg.Type != stanza.GroupChatMessage { + c.logger.LogDebug("Ignoring non-groupchat message", "type", fmt.Sprintf("%v", msg.Type)) + return nil + } + + // Parse the message body from the token reader + var msgWithBody struct { + stanza.Message + Body string `xml:"body"` + } + msgWithBody.Message = msg + + d := xml.NewTokenDecoder(t) + if err := d.DecodeElement(&msgWithBody, nil); err != nil { + c.logger.LogError("Failed to decode message body", "error", err) + return err + } + + if msgWithBody.Body == "" { + c.logger.LogDebug("Message has no body, ignoring") + return nil + } + + // Extract channel and user information from JIDs + channelID, err := c.extractChannelID(msg.From) + if err != nil { + c.logger.LogError("Failed to extract channel ID from JID", "from", msg.From.String(), "error", err) + return nil + } + + userID, userName := c.extractUserInfo(msg.From) + + // Create BridgeMessage + bridgeMsg := &model.BridgeMessage{ + SourceBridge: "xmpp", + SourceChannelID: channelID, + SourceUserID: userID, + SourceUserName: userName, + Content: msgWithBody.Body, // Already Markdown compatible + MessageType: "text", + Timestamp: time.Now(), // XMPP doesn't always provide timestamps + MessageID: msg.ID, + TargetBridges: []string{}, // Will be routed to all other bridges + Metadata: map[string]any{ + "xmpp_from": msg.From.String(), + "xmpp_to": msg.To.String(), + }, + } + + // Wrap in directional message + directionalMsg := &model.DirectionalMessage{ + BridgeMessage: bridgeMsg, + Direction: model.DirectionIncoming, + } + + // Send to message channel (non-blocking) + select { + case c.incomingMessages <- directionalMsg: + c.logger.LogDebug("Message queued for processing", + "channel_id", channelID, + "user_id", userID, + "content_length", len(msgWithBody.Body)) + default: + c.logger.LogWarn("Message channel full, dropping message", + "channel_id", channelID, + "user_id", userID) + } + + return nil +} + +// extractChannelID extracts the channel ID (room bare JID) from a message JID +func (c *Client) extractChannelID(from jid.JID) (string, error) { + // For MUC messages, the channel ID is the bare JID (without resource/nickname) + return from.Bare().String(), nil +} + +// extractUserInfo extracts user ID and display name from a message JID +func (c *Client) extractUserInfo(from jid.JID) (string, string) { + // For MUC messages, the resource part is the nickname + nickname := from.Resourcepart() + + // Use the full JID as user ID for XMPP + userID := from.String() + + // Use nickname as display name if available, otherwise use full JID + displayName := nickname + if displayName == "" { + displayName = from.String() + } + + return userID, displayName +} From eb852662f70f09bf750d023e98257288287e32ec Mon Sep 17 00:00:00 2001 From: Felipe Martin Date: Tue, 5 Aug 2025 12:19:44 +0200 Subject: [PATCH 3/7] feat: implement TTL cache for message deduplication and remove debug logging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace manual map-based deduplication with jellydator/ttlcache/v3 - Add automatic cache eviction with 30-second TTL to prevent memory bloat - Implement proper cache lifecycle management (start/stop) - Remove emoji debug logs from bridge system and XMPP client - Clean up verbose logging while maintaining essential error handling - Update bridge interface method names for consistency 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- go.mod | 3 +- go.sum | 4 ++ server/bridge/manager.go | 77 +++++++++++++-------- server/bridge/mattermost/bridge.go | 23 +++++- server/bridge/mattermost/message_handler.go | 17 ++--- server/bridge/xmpp/bridge.go | 66 +++++++++--------- server/model/bridge.go | 11 +-- server/plugin.go | 27 ++------ server/xmpp/client.go | 40 +++++++++-- 9 files changed, 163 insertions(+), 105 deletions(-) diff --git a/go.mod b/go.mod index efd6373..b1bc908 100644 --- a/go.mod +++ b/go.mod @@ -102,6 +102,7 @@ require ( github.com/hashicorp/yamux v0.1.2 // indirect github.com/hexops/gotextdiff v1.0.3 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/jellydator/ttlcache/v3 v3.4.0 // indirect github.com/jgautheron/goconst v1.7.1 // indirect github.com/jingyugao/rowserrcheck v1.1.1 // indirect github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect @@ -210,7 +211,7 @@ require ( golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect golang.org/x/mod v0.22.0 // indirect golang.org/x/net v0.34.0 // indirect - golang.org/x/sync v0.10.0 // indirect + golang.org/x/sync v0.15.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/term v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect diff --git a/go.sum b/go.sum index 9a27f5c..8d1f976 100644 --- a/go.sum +++ b/go.sum @@ -366,6 +366,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= +github.com/jellydator/ttlcache/v3 v3.4.0 h1:YS4P125qQS0tNhtL6aeYkheEaB/m8HCqdMMP4mnWdTY= +github.com/jellydator/ttlcache/v3 v3.4.0/go.mod h1:Hw9EgjymziQD3yGsQdf1FqFdpp7YjFMd4Srg5EJlgD4= github.com/jgautheron/goconst v1.7.1 h1:VpdAG7Ca7yvvJk5n8dMwQhfEZJh95kl/Hl9S1OI5Jkk= github.com/jgautheron/goconst v1.7.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= @@ -867,6 +869,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= +golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/server/bridge/manager.go b/server/bridge/manager.go index 4259613..d839708 100644 --- a/server/bridge/manager.go +++ b/server/bridge/manager.go @@ -14,15 +14,15 @@ import ( // BridgeManager manages multiple bridge instances type BridgeManager struct { - bridges map[string]model.Bridge - mu sync.RWMutex - logger logger.Logger - api plugin.API - remoteID string - messageBus model.MessageBus - routingCtx context.Context + bridges map[string]model.Bridge + mu sync.RWMutex + logger logger.Logger + api plugin.API + remoteID string + messageBus model.MessageBus + routingCtx context.Context routingCancel context.CancelFunc - routingWg sync.WaitGroup + routingWg sync.WaitGroup } // NewBridgeManager creates a new bridge manager @@ -163,6 +163,20 @@ func (m *BridgeManager) ListBridges() []string { return bridges } +// Start starts the bridge manager and message routing system +func (m *BridgeManager) Start() error { + m.logger.LogInfo("Starting bridge manager") + + // Start the message routing system + if err := m.StartMessageRouting(); err != nil { + m.logger.LogError("Failed to start message routing", "error", err) + return fmt.Errorf("failed to start message routing: %w", err) + } + + m.logger.LogInfo("Bridge manager started successfully") + return nil +} + // HasBridge checks if a bridge with the given name is registered func (m *BridgeManager) HasBridge(name string) bool { m.mu.RLock() @@ -187,6 +201,11 @@ func (m *BridgeManager) Shutdown() error { m.logger.LogInfo("Shutting down bridge manager", "bridge_count", len(m.bridges)) + // Stop message routing first + if err := m.StopMessageRouting(); err != nil { + m.logger.LogError("Failed to stop message routing during shutdown", "error", err) + } + var errors []error for name, bridge := range m.bridges { if bridge.IsConnected() { @@ -260,7 +279,7 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque } // NEW: Check if room already mapped to another channel - existingChannelID, err := bridge.GetRoomMapping(req.BridgeRoomID) + existingChannelID, err := bridge.GetChannelMapping(req.BridgeRoomID) if err != nil { m.logger.LogError("Failed to check room mapping", "bridge_room_id", req.BridgeRoomID, "error", err) return fmt.Errorf("failed to check room mapping: %w", err) @@ -274,7 +293,7 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque } // NEW: Check if room exists on target bridge - roomExists, err := bridge.RoomExists(req.BridgeRoomID) + roomExists, err := bridge.ChannelMappingExists(req.BridgeRoomID) if err != nil { m.logger.LogError("Failed to check room existence", "bridge_room_id", req.BridgeRoomID, "error", err) return fmt.Errorf("failed to check room existence: %w", err) @@ -422,16 +441,16 @@ func (m *BridgeManager) unshareChannel(channelID string) error { // startBridgeMessageHandler starts message handling for a specific bridge func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge model.Bridge) { m.logger.LogDebug("Starting message handler for bridge", "bridge", bridgeName) - + // Subscribe to message bus messageChannel := m.messageBus.Subscribe(bridgeName) - + // Start message routing goroutine m.routingWg.Add(1) go func() { defer m.routingWg.Done() defer m.logger.LogDebug("Message handler stopped for bridge", "bridge", bridgeName) - + for { select { case msg, ok := <-messageChannel: @@ -439,27 +458,27 @@ func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge mode m.logger.LogDebug("Message channel closed for bridge", "bridge", bridgeName) return } - + if err := m.handleBridgeMessage(bridgeName, bridge, msg); err != nil { - m.logger.LogError("Failed to handle message for bridge", + m.logger.LogError("Failed to handle message for bridge", "bridge", bridgeName, "source_bridge", msg.SourceBridge, "error", err) } - + case <-m.routingCtx.Done(): m.logger.LogDebug("Context cancelled, stopping message handler", "bridge", bridgeName) return } } }() - + // Listen to bridge's outgoing messages m.routingWg.Add(1) go func() { defer m.routingWg.Done() defer m.logger.LogDebug("Bridge message listener stopped", "bridge", bridgeName) - + bridgeMessageChannel := bridge.GetMessageChannel() for { select { @@ -468,14 +487,14 @@ func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge mode m.logger.LogDebug("Bridge message channel closed", "bridge", bridgeName) return } - + if err := m.messageBus.Publish(msg); err != nil { - m.logger.LogError("Failed to publish message from bridge", + m.logger.LogError("Failed to publish message from bridge", "bridge", bridgeName, "direction", msg.Direction, "error", err) } - + case <-m.routingCtx.Done(): m.logger.LogDebug("Context cancelled, stopping bridge listener", "bridge", bridgeName) return @@ -486,26 +505,26 @@ func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge mode // handleBridgeMessage processes an incoming message for a specific bridge func (m *BridgeManager) handleBridgeMessage(bridgeName string, bridge model.Bridge, msg *model.DirectionalMessage) error { - m.logger.LogDebug("Handling message for bridge", + m.logger.LogDebug("Handling message for bridge", "target_bridge", bridgeName, "source_bridge", msg.SourceBridge, "direction", msg.Direction, "channel_id", msg.SourceChannelID) - + // Get the bridge's message handler handler := bridge.GetMessageHandler() if handler == nil { return fmt.Errorf("bridge %s does not have a message handler", bridgeName) } - + // Check if the handler can process this message if !handler.CanHandleMessage(msg.BridgeMessage) { - m.logger.LogDebug("Bridge cannot handle message", + m.logger.LogDebug("Bridge cannot handle message", "bridge", bridgeName, "message_type", msg.MessageType) return nil // Not an error, just skip } - + // Process the message return handler.ProcessMessage(msg) } @@ -519,15 +538,15 @@ func (m *BridgeManager) StartMessageRouting() error { // StopMessageRouting stops the message bus and routing system func (m *BridgeManager) StopMessageRouting() error { m.logger.LogInfo("Stopping message routing system") - + // Cancel routing context if m.routingCancel != nil { m.routingCancel() } - + // Wait for all routing goroutines to finish m.routingWg.Wait() - + // Stop the message bus return m.messageBus.Stop() } diff --git a/server/bridge/mattermost/bridge.go b/server/bridge/mattermost/bridge.go index 7400b6c..8ff59d3 100644 --- a/server/bridge/mattermost/bridge.go +++ b/server/bridge/mattermost/bridge.go @@ -25,6 +25,7 @@ type mattermostBridge struct { api plugin.API kvstore kvstore.KVStore userManager pluginModel.BridgeUserManager + botUserID string // Bot user ID for posting messages // Message handling messageHandler *mattermostMessageHandler @@ -46,12 +47,13 @@ type mattermostBridge struct { } // NewBridge creates a new Mattermost bridge -func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge { +func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, botUserID string) pluginModel.Bridge { ctx, cancel := context.WithCancel(context.Background()) b := &mattermostBridge{ logger: log, api: api, kvstore: kvstore, + botUserID: botUserID, ctx: ctx, cancel: cancel, channelMappings: make(map[string]string), @@ -304,8 +306,8 @@ func (b *mattermostBridge) DeleteChannelMapping(channelID string) error { return nil } -// RoomExists checks if a Mattermost channel exists on the server -func (b *mattermostBridge) RoomExists(roomID string) (bool, error) { +// ChannelMappingExists checks if a Mattermost channel exists on the server +func (b *mattermostBridge) ChannelMappingExists(roomID string) (bool, error) { if b.api == nil { return false, fmt.Errorf("Mattermost API not initialized") } @@ -354,6 +356,21 @@ func (b *mattermostBridge) GetRoomMapping(roomID string) (string, error) { return channelID, nil } +// GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge +func (b *mattermostBridge) GetChannelMappingForBridge(bridgeName, roomID string) (string, error) { + channelIDBytes, err := b.kvstore.Get(kvstore.BuildChannelMapKey(bridgeName, roomID)) + if err != nil { + // No mapping found is not an error, just return empty string + b.logger.LogDebug("No channel mapping found for bridge room", "bridge_name", bridgeName, "room_id", roomID) + return "", nil + } + + channelID := string(channelIDBytes) + b.logger.LogDebug("Found channel mapping for bridge room", "bridge_name", bridgeName, "room_id", roomID, "channel_id", channelID) + + return channelID, nil +} + // GetUserManager returns the user manager for this bridge func (b *mattermostBridge) GetUserManager() pluginModel.BridgeUserManager { return b.userManager diff --git a/server/bridge/mattermost/message_handler.go b/server/bridge/mattermost/message_handler.go index 925165f..691ae7f 100644 --- a/server/bridge/mattermost/message_handler.go +++ b/server/bridge/mattermost/message_handler.go @@ -62,8 +62,8 @@ func (h *mattermostMessageHandler) postMessageToMattermost(msg *pluginModel.Brid return fmt.Errorf("Mattermost API not initialized") } - // Get the Mattermost channel ID from the channel mapping - channelID, err := h.bridge.GetRoomMapping(msg.SourceChannelID) + // Get the Mattermost channel ID from the channel mapping using the source bridge name + channelID, err := h.bridge.GetChannelMappingForBridge(msg.SourceBridge, msg.SourceChannelID) if err != nil { return fmt.Errorf("failed to get channel mapping: %w", err) } @@ -87,14 +87,15 @@ func (h *mattermostMessageHandler) postMessageToMattermost(msg *pluginModel.Brid // Create the post post := &mmModel.Post{ ChannelId: channelID, + UserId: h.bridge.botUserID, Message: content, Type: mmModel.PostTypeDefault, Props: map[string]interface{}{ - "from_bridge": msg.SourceBridge, - "bridge_user_id": msg.SourceUserID, - "bridge_user_name": msg.SourceUserName, - "bridge_message_id": msg.MessageID, - "bridge_timestamp": msg.Timestamp.Unix(), + "from_bridge": msg.SourceBridge, + "bridge_user_id": msg.SourceUserID, + "bridge_user_name": msg.SourceUserName, + "bridge_message_id": msg.MessageID, + "bridge_timestamp": msg.Timestamp.Unix(), }, } @@ -204,4 +205,4 @@ func (r *mattermostUserResolver) GetDisplayName(externalUserID string) string { } return user.Id[:8] // Show first 8 chars of ID as fallback -} \ No newline at end of file +} diff --git a/server/bridge/xmpp/bridge.go b/server/bridge/xmpp/bridge.go index ca05e78..d0ed6a3 100644 --- a/server/bridge/xmpp/bridge.go +++ b/server/bridge/xmpp/bridge.go @@ -502,8 +502,8 @@ func (b *xmppBridge) DeleteChannelMapping(channelID string) error { return nil } -// RoomExists checks if an XMPP room exists on the remote service -func (b *xmppBridge) RoomExists(roomID string) (bool, error) { +// ChannelMappingExists checks if an XMPP room exists on the remote service +func (b *xmppBridge) ChannelMappingExists(roomID string) (bool, error) { if !b.connected.Load() { return false, fmt.Errorf("not connected to XMPP server") } @@ -547,6 +547,22 @@ func (b *xmppBridge) GetRoomMapping(roomID string) (string, error) { return channelID, nil } +// GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge +func (b *xmppBridge) GetChannelMappingForBridge(bridgeName, roomID string) (string, error) { + // Look up the channel ID using the bridge name and room ID as the key + channelIDBytes, err := b.kvstore.Get(kvstore.BuildChannelMapKey(bridgeName, roomID)) + if err != nil { + // No mapping found is not an error, just return empty string + b.logger.LogDebug("No channel mapping found for bridge room", "bridge_name", bridgeName, "room_id", roomID) + return "", nil + } + + channelID := string(channelIDBytes) + b.logger.LogDebug("Found channel mapping for bridge room", "bridge_name", bridgeName, "room_id", roomID, "channel_id", channelID) + + return channelID, nil +} + // GetUserManager returns the user manager for this bridge func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager { return b.userManager @@ -554,44 +570,30 @@ func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager { // startMessageAggregation starts the message aggregation goroutine func (b *xmppBridge) startMessageAggregation() { - b.logger.LogDebug("Starting XMPP message aggregation") + clientChannel := b.bridgeClient.GetMessageChannel() for { select { case <-b.ctx.Done(): b.logger.LogDebug("Stopping XMPP message aggregation") return - default: - // Aggregate messages from bridge client if available - if b.bridgeClient != nil { - clientChannel := b.bridgeClient.GetMessageChannel() - select { - case msg, ok := <-clientChannel: - if !ok { - b.logger.LogDebug("Bridge client message channel closed") - continue - } - - // Forward to our bridge's message channel - select { - case b.incomingMessages <- msg: - b.logger.LogDebug("Message forwarded from bridge client", - "source_channel", msg.SourceChannelID, - "user_id", msg.SourceUserID) - default: - b.logger.LogWarn("Bridge message channel full, dropping message", - "source_channel", msg.SourceChannelID, - "user_id", msg.SourceUserID) - } - case <-b.ctx.Done(): - return - default: - // No messages available, continue with other potential sources - } + case msg, ok := <-clientChannel: + if !ok { + b.logger.LogDebug("Bridge client message channel closed") + return } - // TODO: Add aggregation from user client channels when implemented - // This is where we would aggregate from multiple XMPP user connections + // Forward to our bridge's message channel + select { + case b.incomingMessages <- msg: + // Message forwarded successfully + case <-b.ctx.Done(): + return + default: + b.logger.LogWarn("Bridge message channel full, dropping message", + "source_channel", msg.SourceChannelID, + "user_id", msg.SourceUserID) + } } } } diff --git a/server/model/bridge.go b/server/model/bridge.go index f274e0e..6405439 100644 --- a/server/model/bridge.go +++ b/server/model/bridge.go @@ -73,6 +73,9 @@ func (r DeleteChannelMappingRequest) Validate() error { } type BridgeManager interface { + // Start starts the bridge manager and message routing system. + Start() error + // RegisterBridge registers a bridge with the given name. Returns an error if the name is empty, // the bridge is nil, or a bridge with the same name is already registered. RegisterBridge(name string, bridge Bridge) error @@ -138,11 +141,11 @@ type Bridge interface { // DeleteChannelMapping removes a mapping between a Mattermost channel ID and a bridge room ID. DeleteChannelMapping(channelID string) error - // RoomExists checks if a room/channel exists on the remote service. - RoomExists(roomID string) (bool, error) + // ChannelMappingExists checks if a room/channel exists on the remote service. + ChannelMappingExists(roomID string) (bool, error) - // GetRoomMapping retrieves the Mattermost channel ID for a given room ID (reverse lookup). - GetRoomMapping(roomID string) (string, error) + // GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge. + GetChannelMappingForBridge(bridgeName, roomID string) (string, error) // IsConnected checks if the bridge is connected to the remote service. IsConnected() bool diff --git a/server/plugin.go b/server/plugin.go index f7a5f81..646286c 100644 --- a/server/plugin.go +++ b/server/plugin.go @@ -14,7 +14,6 @@ import ( "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore" - "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp" "github.com/mattermost/mattermost/server/public/model" "github.com/mattermost/mattermost/server/public/plugin" "github.com/mattermost/mattermost/server/public/pluginapi" @@ -35,9 +34,6 @@ type Plugin struct { // commandClient is the client used to register and execute slash commands. commandClient command.Command - // xmppClient is the client used to communicate with XMPP servers. - xmppClient *xmpp.Client - // logger is the main plugin logger logger logger.Logger @@ -70,15 +66,11 @@ func (p *Plugin) OnActivate() error { p.kvstore = kvstore.NewKVStore(p.client) - p.initXMPPClient() - // Load configuration directly cfg := p.getConfiguration() - p.logger.LogDebug("Loaded configuration in OnActivate", "config", cfg) // Register the plugin for shared channels if err := p.registerForSharedChannels(); err != nil { - p.logger.LogError("Failed to register for shared channels", "error", err) return fmt.Errorf("failed to register for shared channels: %w", err) } @@ -87,12 +79,16 @@ func (p *Plugin) OnActivate() error { // Initialize and register bridges with current configuration if err := p.initBridges(*cfg); err != nil { - p.logger.LogError("Failed to initialize bridges", "error", err) return fmt.Errorf("failed to initialize bridges: %w", err) } p.commandClient = command.NewCommandHandler(p.client, p.bridgeManager) + // Start the bridge manager (this starts message routing) + if err := p.bridgeManager.Start(); err != nil { + return fmt.Errorf("failed to start bridge manager: %w", err) + } + // Start all bridges for _, bridgeName := range p.bridgeManager.ListBridges() { if err := p.bridgeManager.StartBridge(bridgeName); err != nil { @@ -145,18 +141,6 @@ func (p *Plugin) ExecuteCommand(c *plugin.Context, args *model.CommandArgs) (*mo return response, nil } -func (p *Plugin) initXMPPClient() { - cfg := p.getConfiguration() - p.xmppClient = xmpp.NewClient( - cfg.XMPPServerURL, - cfg.XMPPUsername, - cfg.XMPPPassword, - cfg.GetXMPPResource(), - p.remoteID, - p.logger, - ) -} - func (p *Plugin) initBridges(cfg config.Configuration) error { // Create and register XMPP bridge xmppBridge := xmppbridge.NewBridge( @@ -176,6 +160,7 @@ func (p *Plugin) initBridges(cfg config.Configuration) error { p.API, p.kvstore, &cfg, + p.botUserID, ) if err := p.bridgeManager.RegisterBridge("mattermost", mattermostBridge); err != nil { diff --git a/server/xmpp/client.go b/server/xmpp/client.go index 7f32100..ebe16dd 100644 --- a/server/xmpp/client.go +++ b/server/xmpp/client.go @@ -8,6 +8,7 @@ import ( "fmt" "time" + "github.com/jellydator/ttlcache/v3" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" "mellium.im/sasl" @@ -26,6 +27,9 @@ const ( // msgBufferSize is the buffer size for incoming message channels msgBufferSize = 1000 + + // messageDedupeTTL is the TTL for message deduplication cache + messageDedupeTTL = 30 * time.Second ) // Client represents an XMPP client for communicating with XMPP servers. @@ -51,6 +55,9 @@ type Client struct { // Message handling for bridge integration incomingMessages chan *model.DirectionalMessage + + // Message deduplication cache to handle XMPP server duplicates + dedupeCache *ttlcache.Cache[string, time.Time] } // MessageRequest represents a request to send a message. @@ -99,6 +106,14 @@ type UserProfile struct { func NewClient(serverURL, username, password, resource, remoteID string, logger logger.Logger) *Client { ctx, cancel := context.WithCancel(context.Background()) + // Create TTL cache for message deduplication + dedupeCache := ttlcache.New( + ttlcache.WithTTL[string, time.Time](messageDedupeTTL), + ) + + // Start automatic cleanup in background + go dedupeCache.Start() + client := &Client{ serverURL: serverURL, username: username, @@ -110,6 +125,7 @@ func NewClient(serverURL, username, password, resource, remoteID string, logger cancel: cancel, sessionReady: make(chan struct{}), incomingMessages: make(chan *model.DirectionalMessage, msgBufferSize), + dedupeCache: dedupeCache, } // Create MUC client and set up message handling @@ -139,6 +155,7 @@ func (c *Client) SetServerDomain(domain string) { // Connect establishes connection to the XMPP server func (c *Client) Connect() error { + if c.session != nil { return nil // Already connected } @@ -269,10 +286,11 @@ func (c *Client) Disconnect() error { // Continue with cleanup even on timeout } + // Stop the TTL cache cleanup goroutine + c.dedupeCache.Stop() + // Cancel the client context - if c.cancel != nil { - c.cancel() - } + c.cancel() c.logger.LogInfo("XMPP client disconnected successfully") return nil @@ -653,6 +671,17 @@ func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenRead return nil } + // Deduplicate messages using message ID and TTL cache + if msg.ID != "" { + // Check if this message ID is already in the cache (indicates duplicate) + if c.dedupeCache.Has(msg.ID) { + return nil + } + + // Record this message in the cache with TTL + c.dedupeCache.Set(msg.ID, time.Now(), ttlcache.DefaultTTL) + } + // Extract channel and user information from JIDs channelID, err := c.extractChannelID(msg.From) if err != nil { @@ -688,10 +717,7 @@ func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenRead // Send to message channel (non-blocking) select { case c.incomingMessages <- directionalMsg: - c.logger.LogDebug("Message queued for processing", - "channel_id", channelID, - "user_id", userID, - "content_length", len(msgWithBody.Body)) + // Message queued successfully default: c.logger.LogWarn("Message channel full, dropping message", "channel_id", channelID, From 4c6aeb2392d9daeecb5e06b876b1bc79c4d2a3c3 Mon Sep 17 00:00:00 2001 From: Felipe Martin Date: Tue, 5 Aug 2025 12:33:19 +0200 Subject: [PATCH 4/7] refactor: standardize bridge-agnostic terminology and remove unused kvstore functions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace "Room" with "Channel" in bridge-agnostic contexts throughout codebase - Update BridgeRoomID → BridgeChannelID in model structs and all references - Change error messages to use consistent "Channel" terminology for user-facing text - Update log keys: bridge_room_id → bridge_channel_id for consistency - Clean up kvstore constants file by removing unused functions and constants: - Removed BuildXMPPUserKey, BuildMattermostUserKey, BuildGhostUserKey - Removed BuildXMPPEventPostKey, BuildXMPPReactionKey functions - Removed unused constants: KeyPrefixXMPPUser, KeyPrefixMattermostUser, etc. - Keep only actively used BuildChannelMapKey and ExtractIdentifierFromChannelMapKey - Preserve XMPP-specific "Room" terminology in appropriate contexts (client methods, JIDs) 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- server/bridge/manager.go | 52 +++++++-------- server/bridge/messagebus.go | 92 +++++++++++++-------------- server/bridge/xmpp/message_handler.go | 2 +- server/command/command.go | 14 ++-- server/model/bridge.go | 20 +++--- server/model/message.go | 6 +- server/store/kvstore/constants.go | 57 +---------------- server/xmpp/client.go | 1 - 8 files changed, 94 insertions(+), 150 deletions(-) diff --git a/server/bridge/manager.go b/server/bridge/manager.go index d839708..be26322 100644 --- a/server/bridge/manager.go +++ b/server/bridge/manager.go @@ -264,7 +264,7 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque return fmt.Errorf("invalid mapping request: %w", err) } - m.logger.LogDebug("Creating channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "user_id", req.UserID, "team_id", req.TeamID) + m.logger.LogDebug("Creating channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "user_id", req.UserID, "team_id", req.TeamID) // Get the specific bridge bridge, err := m.GetBridge(req.BridgeName) @@ -279,41 +279,41 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque } // NEW: Check if room already mapped to another channel - existingChannelID, err := bridge.GetChannelMapping(req.BridgeRoomID) + existingChannelID, err := bridge.GetChannelMapping(req.BridgeChannelID) if err != nil { - m.logger.LogError("Failed to check room mapping", "bridge_room_id", req.BridgeRoomID, "error", err) - return fmt.Errorf("failed to check room mapping: %w", err) + m.logger.LogError("Failed to check channel mapping", "bridge_channel_id", req.BridgeChannelID, "error", err) + return fmt.Errorf("failed to check channel mapping: %w", err) } if existingChannelID != "" { - m.logger.LogWarn("Room already mapped to another channel", - "bridge_room_id", req.BridgeRoomID, + m.logger.LogWarn("Channel already mapped to another channel", + "bridge_channel_id", req.BridgeChannelID, "existing_channel_id", existingChannelID, "requested_channel_id", req.ChannelID) - return fmt.Errorf("room '%s' is already mapped to channel '%s'", req.BridgeRoomID, existingChannelID) + return fmt.Errorf("channel '%s' is already mapped to channel '%s'", req.BridgeChannelID, existingChannelID) } // NEW: Check if room exists on target bridge - roomExists, err := bridge.ChannelMappingExists(req.BridgeRoomID) + channelExists, err := bridge.ChannelMappingExists(req.BridgeChannelID) if err != nil { - m.logger.LogError("Failed to check room existence", "bridge_room_id", req.BridgeRoomID, "error", err) - return fmt.Errorf("failed to check room existence: %w", err) + m.logger.LogError("Failed to check channel existence", "bridge_channel_id", req.BridgeChannelID, "error", err) + return fmt.Errorf("failed to check channel existence: %w", err) } - if !roomExists { - m.logger.LogWarn("Room does not exist on bridge", - "bridge_room_id", req.BridgeRoomID, + if !channelExists { + m.logger.LogWarn("Channel does not exist on bridge", + "bridge_channel_id", req.BridgeChannelID, "bridge_name", req.BridgeName) - return fmt.Errorf("room '%s' does not exist on %s bridge", req.BridgeRoomID, req.BridgeName) + return fmt.Errorf("channel '%s' does not exist on %s bridge", req.BridgeChannelID, req.BridgeName) } - m.logger.LogDebug("Room validation passed", - "bridge_room_id", req.BridgeRoomID, + m.logger.LogDebug("Channel validation passed", + "bridge_channel_id", req.BridgeChannelID, "bridge_name", req.BridgeName, - "room_exists", roomExists, + "channel_exists", channelExists, "already_mapped", false) // Create the channel mapping on the receiving bridge - if err = bridge.CreateChannelMapping(req.ChannelID, req.BridgeRoomID); err != nil { - m.logger.LogError("Failed to create channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "error", err) + if err = bridge.CreateChannelMapping(req.ChannelID, req.BridgeChannelID); err != nil { + m.logger.LogError("Failed to create channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "error", err) return fmt.Errorf("failed to create channel mapping for bridge '%s': %w", req.BridgeName, err) } @@ -324,19 +324,19 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque } // Create the channel mapping in the Mattermost bridge - if err = mattermostBridge.CreateChannelMapping(req.ChannelID, req.BridgeRoomID); err != nil { - m.logger.LogError("Failed to create channel mapping in Mattermost bridge", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "error", err) + if err = mattermostBridge.CreateChannelMapping(req.ChannelID, req.BridgeChannelID); err != nil { + m.logger.LogError("Failed to create channel mapping in Mattermost bridge", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "error", err) return fmt.Errorf("failed to create channel mapping in Mattermost bridge: %w", err) } // Share the channel using Mattermost's shared channels API if err = m.shareChannel(req); err != nil { - m.logger.LogError("Failed to share channel", "channel_id", req.ChannelID, "bridge_room_id", req.BridgeRoomID, "error", err) + m.logger.LogError("Failed to share channel", "channel_id", req.ChannelID, "bridge_channel_id", req.BridgeChannelID, "error", err) // Don't fail the entire operation if sharing fails, but log the error m.logger.LogWarn("Channel mapping created but sharing failed - channel may not sync properly") } - m.logger.LogInfo("Successfully created channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID) + m.logger.LogInfo("Successfully created channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID) return nil } @@ -403,9 +403,9 @@ func (m *BridgeManager) shareChannel(req model.CreateChannelMappingRequest) erro TeamId: req.TeamID, Home: true, ReadOnly: false, - ShareName: model.SanitizeShareName(fmt.Sprintf("bridge-%s", req.BridgeRoomID)), - ShareDisplayName: fmt.Sprintf("Bridge: %s", req.BridgeRoomID), - SharePurpose: fmt.Sprintf("Shared channel bridged to %s", req.BridgeRoomID), + ShareName: model.SanitizeShareName(fmt.Sprintf("bridge-%s", req.BridgeChannelID)), + ShareDisplayName: fmt.Sprintf("Bridge: %s", req.BridgeChannelID), + SharePurpose: fmt.Sprintf("Shared channel bridged to %s", req.BridgeChannelID), ShareHeader: "test header", CreatorId: req.UserID, RemoteId: m.remoteID, diff --git a/server/bridge/messagebus.go b/server/bridge/messagebus.go index f51babd..6def69a 100644 --- a/server/bridge/messagebus.go +++ b/server/bridge/messagebus.go @@ -13,7 +13,7 @@ import ( const ( // DefaultMessageBufferSize is the default buffer size for message channels DefaultMessageBufferSize = 1000 - + // MessageDeliveryTimeout is the maximum time to wait for message delivery MessageDeliveryTimeout = 5 * time.Second ) @@ -24,20 +24,20 @@ type messageBus struct { incomingMessages chan *model.DirectionalMessage subscribers map[string]chan *model.DirectionalMessage subscribersMu sync.RWMutex - + // Lifecycle management - ctx context.Context - cancel context.CancelFunc - logger logger.Logger - wg sync.WaitGroup - started bool - startMu sync.Mutex + ctx context.Context + cancel context.CancelFunc + logger logger.Logger + wg sync.WaitGroup + started bool + startMu sync.Mutex } // NewMessageBus creates a new message bus instance func NewMessageBus(logger logger.Logger) model.MessageBus { ctx, cancel := context.WithCancel(context.Background()) - + return &messageBus{ incomingMessages: make(chan *model.DirectionalMessage, DefaultMessageBufferSize), subscribers: make(map[string]chan *model.DirectionalMessage), @@ -51,11 +51,11 @@ func NewMessageBus(logger logger.Logger) model.MessageBus { func (mb *messageBus) Subscribe(bridgeName string) <-chan *model.DirectionalMessage { mb.subscribersMu.Lock() defer mb.subscribersMu.Unlock() - + // Create a buffered channel for this subscriber ch := make(chan *model.DirectionalMessage, DefaultMessageBufferSize) mb.subscribers[bridgeName] = ch - + mb.logger.LogDebug("Bridge subscribed to message bus", "bridge", bridgeName) return ch } @@ -65,20 +65,20 @@ func (mb *messageBus) Publish(msg *model.DirectionalMessage) error { if msg == nil { return fmt.Errorf("message cannot be nil") } - + if msg.BridgeMessage == nil { return fmt.Errorf("bridge message cannot be nil") } - + select { case mb.incomingMessages <- msg: - mb.logger.LogDebug("Message published to bus", + mb.logger.LogDebug("Message published to bus", "source_bridge", msg.SourceBridge, "direction", msg.Direction, "channel_id", msg.SourceChannelID) return nil case <-time.After(MessageDeliveryTimeout): - mb.logger.LogWarn("Message delivery timeout", + mb.logger.LogWarn("Message delivery timeout", "source_bridge", msg.SourceBridge, "channel_id", msg.SourceChannelID) return fmt.Errorf("message delivery timeout") @@ -91,17 +91,17 @@ func (mb *messageBus) Publish(msg *model.DirectionalMessage) error { func (mb *messageBus) Start() error { mb.startMu.Lock() defer mb.startMu.Unlock() - + if mb.started { return fmt.Errorf("message bus is already started") } - + mb.logger.LogInfo("Starting message bus") - + // Start the message routing goroutine mb.wg.Add(1) go mb.routeMessages() - + mb.started = true mb.logger.LogInfo("Message bus started successfully") return nil @@ -111,19 +111,19 @@ func (mb *messageBus) Start() error { func (mb *messageBus) Stop() error { mb.startMu.Lock() defer mb.startMu.Unlock() - + if !mb.started { return nil // Already stopped } - + mb.logger.LogInfo("Stopping message bus") - + // Cancel context to signal shutdown mb.cancel() - + // Wait for routing goroutine to finish mb.wg.Wait() - + // Close all subscriber channels mb.subscribersMu.Lock() for bridgeName, ch := range mb.subscribers { @@ -132,10 +132,10 @@ func (mb *messageBus) Stop() error { } mb.subscribers = make(map[string]chan *model.DirectionalMessage) mb.subscribersMu.Unlock() - + // Close incoming messages channel close(mb.incomingMessages) - + mb.started = false mb.logger.LogInfo("Message bus stopped successfully") return nil @@ -144,9 +144,9 @@ func (mb *messageBus) Stop() error { // routeMessages handles the main message routing loop func (mb *messageBus) routeMessages() { defer mb.wg.Done() - + mb.logger.LogDebug("Message routing started") - + for { select { case msg, ok := <-mb.incomingMessages: @@ -154,14 +154,14 @@ func (mb *messageBus) routeMessages() { mb.logger.LogDebug("Incoming messages channel closed, stopping routing") return } - + if err := mb.routeMessage(msg); err != nil { - mb.logger.LogError("Failed to route message", + mb.logger.LogError("Failed to route message", "source_bridge", msg.SourceBridge, "direction", msg.Direction, "error", err) } - + case <-mb.ctx.Done(): mb.logger.LogDebug("Context cancelled, stopping message routing") return @@ -173,9 +173,9 @@ func (mb *messageBus) routeMessages() { func (mb *messageBus) routeMessage(msg *model.DirectionalMessage) error { mb.subscribersMu.RLock() defer mb.subscribersMu.RUnlock() - + routedCount := 0 - + // Route to specific target bridges if specified if len(msg.TargetBridges) > 0 { for _, targetBridge := range msg.TargetBridges { @@ -184,7 +184,7 @@ func (mb *messageBus) routeMessage(msg *model.DirectionalMessage) error { routedCount++ } } else { - mb.logger.LogWarn("Target bridge not subscribed", + mb.logger.LogWarn("Target bridge not subscribed", "target_bridge", targetBridge, "source_bridge", msg.SourceBridge) } @@ -199,11 +199,11 @@ func (mb *messageBus) routeMessage(msg *model.DirectionalMessage) error { } } } - - mb.logger.LogDebug("Message routed", + + mb.logger.LogDebug("Message routed", "source_bridge", msg.SourceBridge, "routed_to_count", routedCount) - + return nil } @@ -213,7 +213,7 @@ func (mb *messageBus) deliverMessage(ch chan *model.DirectionalMessage, msg *mod case ch <- msg: return true case <-time.After(MessageDeliveryTimeout): - mb.logger.LogWarn("Message delivery timeout to bridge", + mb.logger.LogWarn("Message delivery timeout to bridge", "target_bridge", targetBridge, "source_bridge", msg.SourceBridge) return false @@ -226,19 +226,19 @@ func (mb *messageBus) deliverMessage(ch chan *model.DirectionalMessage, msg *mod func (mb *messageBus) GetStats() map[string]interface{} { mb.subscribersMu.RLock() defer mb.subscribersMu.RUnlock() - + stats := map[string]interface{}{ - "started": mb.started, - "subscriber_count": len(mb.subscribers), - "buffer_size": DefaultMessageBufferSize, - "pending_messages": len(mb.incomingMessages), + "started": mb.started, + "subscriber_count": len(mb.subscribers), + "buffer_size": DefaultMessageBufferSize, + "pending_messages": len(mb.incomingMessages), } - + subscribers := make([]string, 0, len(mb.subscribers)) for bridgeName := range mb.subscribers { subscribers = append(subscribers, bridgeName) } stats["subscribers"] = subscribers - + return stats -} \ No newline at end of file +} diff --git a/server/bridge/xmpp/message_handler.go b/server/bridge/xmpp/message_handler.go index 3916574..4035901 100644 --- a/server/bridge/xmpp/message_handler.go +++ b/server/bridge/xmpp/message_handler.go @@ -163,4 +163,4 @@ func (r *xmppUserResolver) GetDisplayName(externalUserID string) string { // Fallback to the full ID return externalUserID -} \ No newline at end of file +} diff --git a/server/command/command.go b/server/command/command.go index d15b9f5..95ceac8 100644 --- a/server/command/command.go +++ b/server/command/command.go @@ -161,11 +161,11 @@ func (c *Handler) executeMapCommand(args *model.CommandArgs, fields []string) *m // Create the mapping using BridgeManager mappingReq := pluginModel.CreateChannelMappingRequest{ - ChannelID: channelID, - BridgeName: "xmpp", - BridgeRoomID: roomJID, - UserID: args.UserId, - TeamID: args.TeamId, + ChannelID: channelID, + BridgeName: "xmpp", + BridgeChannelID: roomJID, + UserID: args.UserId, + TeamID: args.TeamId, } err = c.bridgeManager.CreateChannelMapping(mappingReq) @@ -292,7 +292,7 @@ func (c *Handler) formatMappingError(operation, roomJID string, err error) *mode case strings.Contains(errorMsg, "already mapped to channel"): return &model.CommandResponse{ ResponseType: model.CommandResponseTypeEphemeral, - Text: fmt.Sprintf(`❌ **Room Already Mapped** + Text: fmt.Sprintf(`❌ **Channel Already Mapped** The XMPP room **%s** is already connected to another channel. @@ -305,7 +305,7 @@ The XMPP room **%s** is already connected to another channel. case strings.Contains(errorMsg, "does not exist"): return &model.CommandResponse{ ResponseType: model.CommandResponseTypeEphemeral, - Text: fmt.Sprintf(`❌ **Room Not Found** + Text: fmt.Sprintf(`❌ **Channel Not Found** The XMPP room **%s** doesn't exist or isn't accessible. diff --git a/server/model/bridge.go b/server/model/bridge.go index 6405439..21729fa 100644 --- a/server/model/bridge.go +++ b/server/model/bridge.go @@ -20,11 +20,11 @@ const ( // CreateChannelMappingRequest contains information needed to create a channel mapping type CreateChannelMappingRequest struct { - ChannelID string // Mattermost channel ID - BridgeName string // Name of the bridge (e.g., "xmpp") - BridgeRoomID string // Remote room/channel ID (e.g., JID for XMPP) - UserID string // ID of user who triggered the mapping creation - TeamID string // Team ID where the channel belongs + ChannelID string // Mattermost channel ID + BridgeName string // Name of the bridge (e.g., "xmpp") + BridgeChannelID string // Remote room/channel ID (e.g., JID for XMPP) + UserID string // ID of user who triggered the mapping creation + TeamID string // Team ID where the channel belongs } // Validate checks if all required fields are present and valid @@ -35,8 +35,8 @@ func (r CreateChannelMappingRequest) Validate() error { if r.BridgeName == "" { return fmt.Errorf("bridgeName cannot be empty") } - if r.BridgeRoomID == "" { - return fmt.Errorf("bridgeRoomID cannot be empty") + if r.BridgeChannelID == "" { + return fmt.Errorf("bridgeChannelID cannot be empty") } if r.UserID == "" { return fmt.Errorf("userID cannot be empty") @@ -132,13 +132,13 @@ type Bridge interface { // Stop stops the bridge Stop() error - // CreateChannelMapping creates a mapping between a Mattermost channel ID and an bridge room ID. + // CreateChannelMapping creates a mapping between a Mattermost channel ID and a bridge channel ID. CreateChannelMapping(channelID, roomJID string) error - // GetChannelMapping retrieves the bridge room ID for a given Mattermost channel ID. + // GetChannelMapping retrieves the bridge channel ID for a given Mattermost channel ID. GetChannelMapping(channelID string) (string, error) - // DeleteChannelMapping removes a mapping between a Mattermost channel ID and a bridge room ID. + // DeleteChannelMapping removes a mapping between a Mattermost channel ID and a bridge channel ID. DeleteChannelMapping(channelID string) error // ChannelMappingExists checks if a room/channel exists on the remote service. diff --git a/server/model/message.go b/server/model/message.go index 5b2a252..747b0f2 100644 --- a/server/model/message.go +++ b/server/model/message.go @@ -30,8 +30,8 @@ type BridgeMessage struct { ThreadID string // Thread/reply ID (if applicable) // Routing hints - TargetBridges []string // Which bridges should receive this - Metadata map[string]any // Bridge-specific metadata + TargetBridges []string // Which bridges should receive this + Metadata map[string]any // Bridge-specific metadata } // DirectionalMessage wraps a BridgeMessage with direction information @@ -85,4 +85,4 @@ type MessageHandler interface { // GetSupportedMessageTypes returns the message types this handler supports GetSupportedMessageTypes() []string -} \ No newline at end of file +} diff --git a/server/store/kvstore/constants.go b/server/store/kvstore/constants.go index 579bd5c..698a82c 100644 --- a/server/store/kvstore/constants.go +++ b/server/store/kvstore/constants.go @@ -7,72 +7,17 @@ import "strings" // to ensure consistency and avoid key conflicts. const ( - // CurrentKVStoreVersion is the current version requiring migrations - CurrentKVStoreVersion = 2 - // KeyPrefixXMPPUser is the prefix for XMPP user ID -> Mattermost user ID mappings - KeyPrefixXMPPUser = "xmpp_user_" - // KeyPrefixMattermostUser is the prefix for Mattermost user ID -> XMPP user ID mappings - KeyPrefixMattermostUser = "mattermost_user_" - // KeyPrefixChannelMap is the prefix for bridge-agnostic channel mappings KeyPrefixChannelMap = "channel_map_" - - // KeyPrefixGhostUser is the prefix for Mattermost user ID -> XMPP ghost user ID cache - KeyPrefixGhostUser = "ghost_user_" - // KeyPrefixGhostRoom is the prefix for ghost user room membership tracking - KeyPrefixGhostRoom = "ghost_room_" - - // KeyPrefixXMPPEventPost is the prefix for XMPP event ID -> Mattermost post ID mappings - KeyPrefixXMPPEventPost = "xmpp_event_post_" - // KeyPrefixXMPPReaction is the prefix for XMPP reaction event ID -> reaction info mappings - KeyPrefixXMPPReaction = "xmpp_reaction_" - - // KeyStoreVersion is the key for tracking the current KV store schema version - KeyStoreVersion = "kv_store_version" - - // KeyPrefixLegacyDMMapping was the old prefix for DM mappings - KeyPrefixLegacyDMMapping = "dm_mapping_" - // KeyPrefixLegacyXMPPDMMapping was the old prefix for XMPP DM mappings - KeyPrefixLegacyXMPPDMMapping = "xmpp_dm_mapping_" ) // Helper functions for building KV store keys -// BuildXMPPUserKey creates a key for XMPP user -> Mattermost user mapping -func BuildXMPPUserKey(xmppUserID string) string { - return KeyPrefixXMPPUser + xmppUserID -} - -// BuildMattermostUserKey creates a key for Mattermost user -> XMPP user mapping -func BuildMattermostUserKey(mattermostUserID string) string { - return KeyPrefixMattermostUser + mattermostUserID -} - // BuildChannelMapKey creates a bridge-agnostic key for channel mappings func BuildChannelMapKey(bridgeName, identifier string) string { return KeyPrefixChannelMap + bridgeName + "_" + identifier } -// BuildGhostUserKey creates a key for ghost user cache -func BuildGhostUserKey(mattermostUserID string) string { - return KeyPrefixGhostUser + mattermostUserID -} - -// BuildGhostRoomKey creates a key for ghost user room membership -func BuildGhostRoomKey(mattermostUserID, roomID string) string { - return KeyPrefixGhostRoom + mattermostUserID + "_" + roomID -} - -// BuildXMPPEventPostKey creates a key for XMPP event -> post mapping -func BuildXMPPEventPostKey(xmppEventID string) string { - return KeyPrefixXMPPEventPost + xmppEventID -} - -// BuildXMPPReactionKey creates a key for XMPP reaction storage -func BuildXMPPReactionKey(reactionEventID string) string { - return KeyPrefixXMPPReaction + reactionEventID -} - // ExtractIdentifierFromChannelMapKey extracts the identifier from a bridge-agnostic channel map key func ExtractIdentifierFromChannelMapKey(key, bridgeName string) string { expectedPrefix := KeyPrefixChannelMap + bridgeName + "_" @@ -83,4 +28,4 @@ func ExtractIdentifierFromChannelMapKey(key, bridgeName string) string { return "" } return key[len(expectedPrefix):] -} +} \ No newline at end of file diff --git a/server/xmpp/client.go b/server/xmpp/client.go index ebe16dd..5889225 100644 --- a/server/xmpp/client.go +++ b/server/xmpp/client.go @@ -155,7 +155,6 @@ func (c *Client) SetServerDomain(domain string) { // Connect establishes connection to the XMPP server func (c *Client) Connect() error { - if c.session != nil { return nil // Already connected } From 5d81ca2154c2b4fda2a0a173b86b20f13a11714d Mon Sep 17 00:00:00 2001 From: Felipe Martin Date: Tue, 5 Aug 2025 19:37:23 +0200 Subject: [PATCH 5/7] fix: implement direct TCP connection to bypass DNS SRV discovery issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add parseServerAddress helper to handle both URL and host:port formats - Replace DialClientSession with NewClientSession and direct net.Dialer - Bypass DNS SRV record lookups that were causing connection timeouts - Support both simple host:port (localhost:5222) and URL formats - Add proper error handling and connection cleanup 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- server/xmpp/client.go | 63 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/server/xmpp/client.go b/server/xmpp/client.go index 5889225..537711a 100644 --- a/server/xmpp/client.go +++ b/server/xmpp/client.go @@ -6,6 +6,8 @@ import ( "crypto/tls" "encoding/xml" "fmt" + "net" + "net/url" "time" "github.com/jellydator/ttlcache/v3" @@ -153,6 +155,44 @@ func (c *Client) SetServerDomain(domain string) { c.serverDomain = domain } +// parseServerAddress parses a server URL and returns a host:port address +func (c *Client) parseServerAddress(serverURL string) (string, error) { + // Handle simple host:port format (e.g., "localhost:5222") + if host, port, err := net.SplitHostPort(serverURL); err == nil { + // Already in host:port format, validate and return + if host == "" { + return "", fmt.Errorf("empty hostname in server URL: %s", serverURL) + } + if port == "" { + return "", fmt.Errorf("empty port in server URL: %s", serverURL) + } + return serverURL, nil + } + + // Try parsing as URL (e.g., "xmpp://localhost:5222" or "xmpps://localhost:5223") + parsedURL, err := url.Parse(serverURL) + if err != nil { + return "", fmt.Errorf("invalid server URL format: %w", err) + } + + host := parsedURL.Hostname() + if host == "" { + return "", fmt.Errorf("no hostname found in server URL: %s", serverURL) + } + + port := parsedURL.Port() + // Use default XMPP port if not specified + if port == "" { + if parsedURL.Scheme == "xmpps" { + port = "5223" + } else { + port = "5222" + } + } + + return host + ":" + port, nil +} + // Connect establishes connection to the XMPP server func (c *Client) Connect() error { if c.session != nil { @@ -192,15 +232,34 @@ func (c *Client) Connect() error { connectCtx, connectCancel := context.WithTimeout(c.ctx, 30*time.Second) defer connectCancel() - // Use DialClientSession for proper SASL authentication with timeout - c.session, err = xmpp.DialClientSession( + c.logger.LogDebug("Connecting to XMPP server", "server_url", c.serverURL) + + // Parse server address for direct connection + // We use this instead of using the JID for the user to avoid SRV lookups that can fail + // depending on network/dns configuration and to ensure we connect directly to the specified + // server. + serverAddr, err := c.parseServerAddress(c.serverURL) + if err != nil { + return fmt.Errorf("failed to parse server URL %s: %w", c.serverURL, err) + } + + var d net.Dialer + conn, err := d.DialContext(connectCtx, "tcp", serverAddr) + if err != nil { + return fmt.Errorf("failed to dial XMPP server at %s: %w", serverAddr, err) + } + + // Create client session with direct connection + c.session, err = xmpp.NewClientSession( connectCtx, c.jidAddr, + conn, xmpp.StartTLS(tlsConfig), xmpp.SASL("", c.password, sasl.Plain), xmpp.BindResource(), ) if err != nil { + conn.Close() return fmt.Errorf("failed to establish XMPP session: %w", err) } From d21dcd2dd13a83cc3c2d9561d2f8f9fe64b15c32 Mon Sep 17 00:00:00 2001 From: Felipe Martin Date: Tue, 5 Aug 2025 19:39:01 +0200 Subject: [PATCH 6/7] feat: implement sync and sync-reset commands for shared channel management MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add GetRemoteID() method to Bridge interface for cursor operations - Update bridge constructors to accept and store remoteID parameter - Implement executeSyncCommand handler for forcing shared channel sync - Implement executeSyncResetCommand handler for resetting sync cursor - Add command registration for 'sync' and 'sync-reset' subcommands - Enhance command handler with direct plugin API access for shared channel operations - Add comprehensive validation and error handling for unmapped channels - Support both SyncSharedChannel and UpdateSharedChannelCursor API methods 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- server/bridge/mattermost/bridge.go | 9 ++- server/bridge/xmpp/bridge.go | 9 ++- server/command/command.go | 118 ++++++++++++++++++++++++++++- server/model/bridge.go | 3 + server/plugin.go | 6 +- 5 files changed, 139 insertions(+), 6 deletions(-) diff --git a/server/bridge/mattermost/bridge.go b/server/bridge/mattermost/bridge.go index 8ff59d3..e618535 100644 --- a/server/bridge/mattermost/bridge.go +++ b/server/bridge/mattermost/bridge.go @@ -26,6 +26,7 @@ type mattermostBridge struct { kvstore kvstore.KVStore userManager pluginModel.BridgeUserManager botUserID string // Bot user ID for posting messages + remoteID string // Remote ID for shared channels // Message handling messageHandler *mattermostMessageHandler @@ -47,13 +48,14 @@ type mattermostBridge struct { } // NewBridge creates a new Mattermost bridge -func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, botUserID string) pluginModel.Bridge { +func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, botUserID, remoteID string) pluginModel.Bridge { ctx, cancel := context.WithCancel(context.Background()) b := &mattermostBridge{ logger: log, api: api, kvstore: kvstore, botUserID: botUserID, + remoteID: remoteID, ctx: ctx, cancel: cancel, channelMappings: make(map[string]string), @@ -395,3 +397,8 @@ func (b *mattermostBridge) GetMessageHandler() pluginModel.MessageHandler { func (b *mattermostBridge) GetUserResolver() pluginModel.UserResolver { return b.userResolver } + +// GetRemoteID returns the remote ID used for shared channels registration +func (b *mattermostBridge) GetRemoteID() string { + return b.remoteID +} diff --git a/server/bridge/xmpp/bridge.go b/server/bridge/xmpp/bridge.go index d0ed6a3..902b8ae 100644 --- a/server/bridge/xmpp/bridge.go +++ b/server/bridge/xmpp/bridge.go @@ -30,6 +30,7 @@ type xmppBridge struct { kvstore kvstore.KVStore bridgeClient *xmppClient.Client // Main bridge XMPP client connection userManager pluginModel.BridgeUserManager + remoteID string // Remote ID for shared channels // Message handling messageHandler *xmppMessageHandler @@ -51,7 +52,7 @@ type xmppBridge struct { } // NewBridge creates a new XMPP bridge -func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge { +func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, remoteID string) pluginModel.Bridge { ctx, cancel := context.WithCancel(context.Background()) b := &xmppBridge{ logger: log, @@ -63,6 +64,7 @@ func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg * config: cfg, userManager: bridge.NewUserManager("xmpp", log), incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize), + remoteID: remoteID, } // Initialize handlers after bridge is created @@ -617,3 +619,8 @@ func (b *xmppBridge) GetMessageHandler() pluginModel.MessageHandler { func (b *xmppBridge) GetUserResolver() pluginModel.UserResolver { return b.userResolver } + +// GetRemoteID returns the remote ID used for shared channels registration +func (b *xmppBridge) GetRemoteID() string { + return b.remoteID +} diff --git a/server/command/command.go b/server/command/command.go index 95ceac8..686eb97 100644 --- a/server/command/command.go +++ b/server/command/command.go @@ -6,11 +6,13 @@ import ( pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/plugin" "github.com/mattermost/mattermost/server/public/pluginapi" ) type Handler struct { client *pluginapi.Client + api plugin.API bridgeManager pluginModel.BridgeManager } @@ -22,7 +24,7 @@ type Command interface { const xmppBridgeCommandTrigger = "xmppbridge" // Register all your slash commands in the NewCommandHandler function. -func NewCommandHandler(client *pluginapi.Client, bridgeManager pluginModel.BridgeManager) Command { +func NewCommandHandler(client *pluginapi.Client, api plugin.API, bridgeManager pluginModel.BridgeManager) Command { // Register XMPP bridge command xmppBridgeData := model.NewAutocompleteData(xmppBridgeCommandTrigger, "", "Manage XMPP bridge") mapSubcommand := model.NewAutocompleteData("map", "[room_jid]", "Map current channel to XMPP room") @@ -35,11 +37,17 @@ func NewCommandHandler(client *pluginapi.Client, bridgeManager pluginModel.Bridg statusSubcommand := model.NewAutocompleteData("status", "", "Show bridge connection status") xmppBridgeData.AddCommand(statusSubcommand) + syncSubcommand := model.NewAutocompleteData("sync", "", "Force sync with shared channels if channel is mapped") + xmppBridgeData.AddCommand(syncSubcommand) + + syncResetSubcommand := model.NewAutocompleteData("sync-reset", "", "Reset sync cursor for channel if mapped") + xmppBridgeData.AddCommand(syncResetSubcommand) + err := client.SlashCommand.Register(&model.Command{ Trigger: xmppBridgeCommandTrigger, AutoComplete: true, AutoCompleteDesc: "Manage XMPP bridge mappings", - AutoCompleteHint: "[map|unmap|status]", + AutoCompleteHint: "[map|unmap|status|sync|sync-reset]", AutocompleteData: xmppBridgeData, }) if err != nil { @@ -48,6 +56,7 @@ func NewCommandHandler(client *pluginapi.Client, bridgeManager pluginModel.Bridg return &Handler{ client: client, + api: api, bridgeManager: bridgeManager, } } @@ -85,6 +94,8 @@ func (c *Handler) executeXMPPBridgeCommand(args *model.CommandArgs) *model.Comma - ` + "`/xmppbridge map `" + ` - Map current channel to XMPP room - ` + "`/xmppbridge unmap`" + ` - Unmap current channel from XMPP room - ` + "`/xmppbridge status`" + ` - Show bridge connection status +- ` + "`/xmppbridge sync`" + ` - Force sync with shared channels if channel is mapped +- ` + "`/xmppbridge sync-reset`" + ` - Reset sync cursor for channel if mapped **Example:** ` + "`/xmppbridge map general@conference.example.com`", @@ -99,6 +110,10 @@ func (c *Handler) executeXMPPBridgeCommand(args *model.CommandArgs) *model.Comma return c.executeUnmapCommand(args) case "status": return c.executeStatusCommand(args) + case "sync": + return c.executeSyncCommand(args) + case "sync-reset": + return c.executeSyncResetCommand(args) default: return &model.CommandResponse{ ResponseType: model.CommandResponseTypeEphemeral, @@ -284,6 +299,105 @@ func (c *Handler) isSystemAdmin(userID string) bool { } // formatMappingError provides user-friendly error messages for mapping operations +func (c *Handler) executeSyncCommand(args *model.CommandArgs) *model.CommandResponse { + channelID := args.ChannelId + + // Get the XMPP bridge to check if channel is mapped + bridge, err := c.bridgeManager.GetBridge("xmpp") + if err != nil { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: "❌ XMPP bridge is not available. Please check the plugin configuration.", + } + } + + // Check if channel is mapped to XMPP + roomJID, err := bridge.GetChannelMapping(channelID) + if err != nil { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: fmt.Sprintf("Error checking channel mapping: %v", err), + } + } + + if roomJID == "" { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: "❌ This channel is not mapped to any XMPP room. Use `/xmppbridge map ` to create a mapping first.", + } + } + + // Force sync with shared channels + if err := c.api.SyncSharedChannel(channelID); err != nil { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: fmt.Sprintf("❌ Failed to sync channel: %v", err), + } + } + + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: fmt.Sprintf("✅ Successfully triggered sync for channel mapped to XMPP room: `%s`", roomJID), + } +} + +func (c *Handler) executeSyncResetCommand(args *model.CommandArgs) *model.CommandResponse { + channelID := args.ChannelId + + // Get the XMPP bridge to check if channel is mapped + bridge, err := c.bridgeManager.GetBridge("xmpp") + if err != nil { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: "❌ XMPP bridge is not available. Please check the plugin configuration.", + } + } + + // Check if channel is mapped to XMPP + roomJID, err := bridge.GetChannelMapping(channelID) + if err != nil { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: fmt.Sprintf("Error checking channel mapping: %v", err), + } + } + + if roomJID == "" { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: "❌ This channel is not mapped to any XMPP room. Use `/xmppbridge map ` to create a mapping first.", + } + } + + // Get remoteID from bridge for cursor operations + remoteID := bridge.GetRemoteID() + if remoteID == "" { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: "❌ Bridge remote ID not available. Cannot reset sync cursor.", + } + } + + // Create empty cursor to reset to beginning + emptyCursor := model.GetPostsSinceForSyncCursor{ + LastPostUpdateAt: 1, + LastPostCreateAt: 1, + } + + // Reset sync cursor using UpdateSharedChannelCursor + if err := c.api.UpdateSharedChannelCursor(channelID, remoteID, emptyCursor); err != nil { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: fmt.Sprintf("❌ Failed to reset sync cursor: %v", err), + } + } + + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: fmt.Sprintf("✅ Successfully reset sync cursor for channel mapped to XMPP room: `%s`", roomJID), + } +} + func (c *Handler) formatMappingError(operation, roomJID string, err error) *model.CommandResponse { errorMsg := err.Error() diff --git a/server/model/bridge.go b/server/model/bridge.go index 21729fa..73da3ec 100644 --- a/server/model/bridge.go +++ b/server/model/bridge.go @@ -161,6 +161,9 @@ type Bridge interface { SendMessage(msg *BridgeMessage) error GetMessageHandler() MessageHandler GetUserResolver() UserResolver + + // GetRemoteID returns the remote ID used for shared channels registration + GetRemoteID() string } // BridgeUser represents a user connected to any bridge service diff --git a/server/plugin.go b/server/plugin.go index 646286c..97bb691 100644 --- a/server/plugin.go +++ b/server/plugin.go @@ -82,7 +82,7 @@ func (p *Plugin) OnActivate() error { return fmt.Errorf("failed to initialize bridges: %w", err) } - p.commandClient = command.NewCommandHandler(p.client, p.bridgeManager) + p.commandClient = command.NewCommandHandler(p.client, p.API, p.bridgeManager) // Start the bridge manager (this starts message routing) if err := p.bridgeManager.Start(); err != nil { @@ -148,6 +148,7 @@ func (p *Plugin) initBridges(cfg config.Configuration) error { p.API, p.kvstore, &cfg, + p.remoteID, ) if err := p.bridgeManager.RegisterBridge("xmpp", xmppBridge); err != nil { @@ -161,6 +162,7 @@ func (p *Plugin) initBridges(cfg config.Configuration) error { p.kvstore, &cfg, p.botUserID, + "mattermost", ) if err := p.bridgeManager.RegisterBridge("mattermost", mattermostBridge); err != nil { @@ -188,7 +190,7 @@ func (p *Plugin) registerForSharedChannels() error { PluginID: manifest.Id, CreatorID: botUserID, AutoShareDMs: false, - AutoInvited: true, + AutoInvited: false, } remoteID, appErr := p.API.RegisterPluginForSharedChannels(opts) From b1c6f21ea3bb3c7328d3286512192ef729851de4 Mon Sep 17 00:00:00 2001 From: Felipe Martin Date: Tue, 5 Aug 2025 19:40:12 +0200 Subject: [PATCH 7/7] feat: implement OnSharedChannelsSyncMsg hook for bidirectional sync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add PublishMessage method to BridgeManager interface and implementation - Implement OnSharedChannelsSyncMsg hook to process Mattermost shared channel sync messages - Add processSyncPost helper to convert Mattermost posts to bridge messages - Route sync messages from Mattermost shared channels to XMPP bridge via message bus - Handle user resolution with fallback to API calls for missing users - Add comprehensive logging and error handling for sync operations - Support routing of text messages from shared channels to XMPP rooms 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- server/bridge/manager.go | 11 +++ server/hooks_sharedchannels.go | 128 ++++++++++++++++++++++++++++++++- server/model/bridge.go | 3 + 3 files changed, 139 insertions(+), 3 deletions(-) diff --git a/server/bridge/manager.go b/server/bridge/manager.go index be26322..9984410 100644 --- a/server/bridge/manager.go +++ b/server/bridge/manager.go @@ -550,3 +550,14 @@ func (m *BridgeManager) StopMessageRouting() error { // Stop the message bus return m.messageBus.Stop() } + +// PublishMessage publishes a message to the message bus for routing to target bridges +func (m *BridgeManager) PublishMessage(msg *model.DirectionalMessage) error { + m.logger.LogDebug("Publishing message to message bus", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "target_bridges", msg.TargetBridges, + "message_id", msg.MessageID) + + return m.messageBus.Publish(msg) +} diff --git a/server/hooks_sharedchannels.go b/server/hooks_sharedchannels.go index 798ba0f..e49df2b 100644 --- a/server/hooks_sharedchannels.go +++ b/server/hooks_sharedchannels.go @@ -1,18 +1,24 @@ package main -import "github.com/mattermost/mattermost/server/public/model" +import ( + "fmt" + "time" + + pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" + "github.com/mattermost/mattermost/server/public/model" +) // OnSharedChannelsPing is called to check if the bridge is healthy and ready to process messages func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool { config := p.getConfiguration() - p.logger.LogDebug("OnSharedChannelsPing called", "remote_cluster_id", remoteCluster.RemoteId) - var remoteClusterID string if remoteCluster != nil { remoteClusterID = remoteCluster.RemoteId } + p.logger.LogDebug("OnSharedChannelsPing called", "remote_cluster_id", remoteClusterID) + p.logger.LogDebug("Received shared channels ping", "remote_cluster_id", remoteClusterID) // If sync is disabled, we're still "healthy" but not actively processing @@ -44,3 +50,119 @@ func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool { p.logger.LogDebug("Shared channels ping successful - XMPP bridge is healthy", "remote_cluster_id", remoteClusterID) return true } + +// OnSharedChannelsSyncMsg processes sync messages from Mattermost shared channels and routes them to XMPP +func (p *Plugin) OnSharedChannelsSyncMsg(msg *model.SyncMsg, rc *model.RemoteCluster) (model.SyncResponse, error) { + config := p.getConfiguration() + + // Initialize sync response + now := model.GetMillis() + response := model.SyncResponse{ + PostsLastUpdateAt: now, + UsersLastUpdateAt: now, + ReactionsLastUpdateAt: now, + } + + var remoteClusterID string + if rc != nil { + remoteClusterID = rc.RemoteId + } + + p.logger.LogDebug("OnSharedChannelsSyncMsg called", + "remote_cluster_id", remoteClusterID, + "channel_id", msg.ChannelId, + "post_count", len(msg.Posts)) + + // If sync is disabled, return success but don't process + if !config.EnableSync { + p.logger.LogDebug("Sync message received but sync is disabled", "remote_cluster_id", remoteClusterID) + return response, nil + } + + // Process each post in the sync message + var processedCount int + var errors []string + + for _, post := range msg.Posts { + if err := p.processSyncPost(post, msg.ChannelId, msg.Users); err != nil { + errorMsg := fmt.Sprintf("failed to process post %s: %v", post.Id, err) + errors = append(errors, errorMsg) + p.logger.LogError("Failed to process sync post", "post_id", post.Id, "error", err) + } else { + processedCount++ + } + } + + p.logger.LogInfo("Processed sync message", + "remote_cluster_id", remoteClusterID, + "channel_id", msg.ChannelId, + "processed_posts", processedCount, + "failed_posts", len(errors)) + + // If we have errors, return them + if len(errors) > 0 { + return response, fmt.Errorf("failed to process %d posts: %v", len(errors), errors) + } + + return response, nil +} + +// processSyncPost converts a Mattermost post to a bridge message and routes it to XMPP +func (p *Plugin) processSyncPost(post *model.Post, channelID string, users map[string]*model.User) error { + // Find the user who created this post + var postUser *model.User + p.logger.LogInfo("Processing sync post", "post_id", post.UserId, "users", users) + if users != nil { + postUser = users[post.UserId] + } + + // If user not found in sync data, try to get from API + if postUser == nil { + var err error + postUser, err = p.API.GetUser(post.UserId) + if err != nil { + p.logger.LogWarn("Failed to get user for post", "user_id", post.UserId, "post_id", post.Id, "error", err) + // Create a placeholder user + postUser = &model.User{ + Id: post.UserId, + Username: "unknown-user", + } + } + } + + // Create bridge message from Mattermost post + bridgeMessage := &pluginModel.BridgeMessage{ + SourceBridge: "mattermost", + SourceChannelID: channelID, + SourceUserID: postUser.Id, + SourceUserName: postUser.Username, + Content: post.Message, + MessageType: "text", // TODO: Handle other message types + Timestamp: time.Unix(post.CreateAt/1000, 0), + MessageID: post.Id, + ThreadID: post.RootId, + TargetBridges: []string{"xmpp"}, // Route to XMPP + Metadata: map[string]any{ + "original_post": post, + "channel_id": channelID, + }, + } + + // Create directional message for outgoing (Mattermost -> XMPP) + directionalMessage := &pluginModel.DirectionalMessage{ + BridgeMessage: bridgeMessage, + Direction: pluginModel.DirectionOutgoing, + } + + // Publish the message to the message bus for routing to XMPP bridge + if err := p.bridgeManager.PublishMessage(directionalMessage); err != nil { + return fmt.Errorf("failed to publish sync message to message bus: %w", err) + } + + p.logger.LogDebug("Successfully published sync message to message bus", + "post_id", post.Id, + "channel_id", channelID, + "user", postUser.Username) + + return nil +} diff --git a/server/model/bridge.go b/server/model/bridge.go index 73da3ec..dcf67df 100644 --- a/server/model/bridge.go +++ b/server/model/bridge.go @@ -120,6 +120,9 @@ type BridgeManager interface { // DeleteChannepMapping is called when a channel mapping is deleted. DeleteChannepMapping(req DeleteChannelMappingRequest) error + + // PublishMessage publishes a message to the message bus for routing to target bridges + PublishMessage(msg *DirectionalMessage) error } type Bridge interface {