diff --git a/server/bridge/manager.go b/server/bridge/manager.go index 5dc1f9a..4259613 100644 --- a/server/bridge/manager.go +++ b/server/bridge/manager.go @@ -1,6 +1,7 @@ package bridge import ( + "context" "fmt" "sync" @@ -13,11 +14,15 @@ import ( // BridgeManager manages multiple bridge instances type BridgeManager struct { - bridges map[string]model.Bridge - mu sync.RWMutex - logger logger.Logger - api plugin.API - remoteID string + bridges map[string]model.Bridge + mu sync.RWMutex + logger logger.Logger + api plugin.API + remoteID string + messageBus model.MessageBus + routingCtx context.Context + routingCancel context.CancelFunc + routingWg sync.WaitGroup } // NewBridgeManager creates a new bridge manager @@ -29,11 +34,16 @@ func NewBridgeManager(logger logger.Logger, api plugin.API, remoteID string) mod panic("plugin API cannot be nil") } + ctx, cancel := context.WithCancel(context.Background()) + return &BridgeManager{ - bridges: make(map[string]model.Bridge), - logger: logger, - api: api, - remoteID: remoteID, + bridges: make(map[string]model.Bridge), + logger: logger, + api: api, + remoteID: remoteID, + messageBus: NewMessageBus(logger), + routingCtx: ctx, + routingCancel: cancel, } } @@ -56,6 +66,9 @@ func (m *BridgeManager) RegisterBridge(name string, bridge model.Bridge) error { m.bridges[name] = bridge m.logger.LogInfo("Bridge registered", "name", name) + // Subscribe bridge to message bus + go m.startBridgeMessageHandler(name, bridge) + return nil } @@ -405,3 +418,116 @@ func (m *BridgeManager) unshareChannel(channelID string) error { return nil } + +// startBridgeMessageHandler starts message handling for a specific bridge +func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge model.Bridge) { + m.logger.LogDebug("Starting message handler for bridge", "bridge", bridgeName) + + // Subscribe to message bus + messageChannel := m.messageBus.Subscribe(bridgeName) + + // Start message routing goroutine + m.routingWg.Add(1) + go func() { + defer m.routingWg.Done() + defer m.logger.LogDebug("Message handler stopped for bridge", "bridge", bridgeName) + + for { + select { + case msg, ok := <-messageChannel: + if !ok { + m.logger.LogDebug("Message channel closed for bridge", "bridge", bridgeName) + return + } + + if err := m.handleBridgeMessage(bridgeName, bridge, msg); err != nil { + m.logger.LogError("Failed to handle message for bridge", + "bridge", bridgeName, + "source_bridge", msg.SourceBridge, + "error", err) + } + + case <-m.routingCtx.Done(): + m.logger.LogDebug("Context cancelled, stopping message handler", "bridge", bridgeName) + return + } + } + }() + + // Listen to bridge's outgoing messages + m.routingWg.Add(1) + go func() { + defer m.routingWg.Done() + defer m.logger.LogDebug("Bridge message listener stopped", "bridge", bridgeName) + + bridgeMessageChannel := bridge.GetMessageChannel() + for { + select { + case msg, ok := <-bridgeMessageChannel: + if !ok { + m.logger.LogDebug("Bridge message channel closed", "bridge", bridgeName) + return + } + + if err := m.messageBus.Publish(msg); err != nil { + m.logger.LogError("Failed to publish message from bridge", + "bridge", bridgeName, + "direction", msg.Direction, + "error", err) + } + + case <-m.routingCtx.Done(): + m.logger.LogDebug("Context cancelled, stopping bridge listener", "bridge", bridgeName) + return + } + } + }() +} + +// handleBridgeMessage processes an incoming message for a specific bridge +func (m *BridgeManager) handleBridgeMessage(bridgeName string, bridge model.Bridge, msg *model.DirectionalMessage) error { + m.logger.LogDebug("Handling message for bridge", + "target_bridge", bridgeName, + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "channel_id", msg.SourceChannelID) + + // Get the bridge's message handler + handler := bridge.GetMessageHandler() + if handler == nil { + return fmt.Errorf("bridge %s does not have a message handler", bridgeName) + } + + // Check if the handler can process this message + if !handler.CanHandleMessage(msg.BridgeMessage) { + m.logger.LogDebug("Bridge cannot handle message", + "bridge", bridgeName, + "message_type", msg.MessageType) + return nil // Not an error, just skip + } + + // Process the message + return handler.ProcessMessage(msg) +} + +// StartMessageRouting starts the message bus and routing system +func (m *BridgeManager) StartMessageRouting() error { + m.logger.LogInfo("Starting message routing system") + return m.messageBus.Start() +} + +// StopMessageRouting stops the message bus and routing system +func (m *BridgeManager) StopMessageRouting() error { + m.logger.LogInfo("Stopping message routing system") + + // Cancel routing context + if m.routingCancel != nil { + m.routingCancel() + } + + // Wait for all routing goroutines to finish + m.routingWg.Wait() + + // Stop the message bus + return m.messageBus.Stop() +} diff --git a/server/bridge/mattermost/bridge.go b/server/bridge/mattermost/bridge.go index c2cdbec..7400b6c 100644 --- a/server/bridge/mattermost/bridge.go +++ b/server/bridge/mattermost/bridge.go @@ -14,6 +14,11 @@ import ( "github.com/mattermost/mattermost/server/public/plugin" ) +const ( + // defaultMessageBufferSize is the buffer size for incoming message channels + defaultMessageBufferSize = 1000 +) + // mattermostBridge handles syncing messages between Mattermost instances type mattermostBridge struct { logger logger.Logger @@ -21,6 +26,11 @@ type mattermostBridge struct { kvstore kvstore.KVStore userManager pluginModel.BridgeUserManager + // Message handling + messageHandler *mattermostMessageHandler + userResolver *mattermostUserResolver + incomingMessages chan *pluginModel.DirectionalMessage + // Connection management connected atomic.Bool ctx context.Context @@ -38,18 +48,23 @@ type mattermostBridge struct { // NewBridge creates a new Mattermost bridge func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge { ctx, cancel := context.WithCancel(context.Background()) - bridge := &mattermostBridge{ - logger: log, - api: api, - kvstore: kvstore, - ctx: ctx, - cancel: cancel, - channelMappings: make(map[string]string), - config: cfg, - userManager: bridge.NewUserManager("mattermost", log), + b := &mattermostBridge{ + logger: log, + api: api, + kvstore: kvstore, + ctx: ctx, + cancel: cancel, + channelMappings: make(map[string]string), + config: cfg, + userManager: bridge.NewUserManager("mattermost", log), + incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize), } - return bridge + // Initialize handlers after bridge is created + b.messageHandler = newMessageHandler(b) + b.userResolver = newUserResolver(b) + + return b } // getConfiguration safely retrieves the current configuration @@ -343,3 +358,23 @@ func (b *mattermostBridge) GetRoomMapping(roomID string) (string, error) { func (b *mattermostBridge) GetUserManager() pluginModel.BridgeUserManager { return b.userManager } + +// GetMessageChannel returns the channel for incoming messages from Mattermost +func (b *mattermostBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage { + return b.incomingMessages +} + +// SendMessage sends a message to a Mattermost channel +func (b *mattermostBridge) SendMessage(msg *pluginModel.BridgeMessage) error { + return b.messageHandler.postMessageToMattermost(msg) +} + +// GetMessageHandler returns the message handler for this bridge +func (b *mattermostBridge) GetMessageHandler() pluginModel.MessageHandler { + return b.messageHandler +} + +// GetUserResolver returns the user resolver for this bridge +func (b *mattermostBridge) GetUserResolver() pluginModel.UserResolver { + return b.userResolver +} diff --git a/server/bridge/mattermost/message_handler.go b/server/bridge/mattermost/message_handler.go new file mode 100644 index 0000000..925165f --- /dev/null +++ b/server/bridge/mattermost/message_handler.go @@ -0,0 +1,207 @@ +package mattermost + +import ( + "fmt" + "strings" + + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" + mmModel "github.com/mattermost/mattermost/server/public/model" +) + +// mattermostMessageHandler handles incoming messages for the Mattermost bridge +type mattermostMessageHandler struct { + bridge *mattermostBridge + logger logger.Logger +} + +// newMessageHandler creates a new Mattermost message handler +func newMessageHandler(bridge *mattermostBridge) *mattermostMessageHandler { + return &mattermostMessageHandler{ + bridge: bridge, + logger: bridge.logger, + } +} + +// ProcessMessage processes an incoming message for the Mattermost bridge +func (h *mattermostMessageHandler) ProcessMessage(msg *pluginModel.DirectionalMessage) error { + h.logger.LogDebug("Processing message for Mattermost bridge", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "channel_id", msg.SourceChannelID) + + // Skip messages that originated from Mattermost to prevent loops + if msg.SourceBridge == "mattermost" { + h.logger.LogDebug("Skipping Mattermost-originated message to prevent loop") + return nil + } + + // For incoming messages to Mattermost, we post them to Mattermost channels + if msg.Direction == pluginModel.DirectionIncoming { + return h.postMessageToMattermost(msg.BridgeMessage) + } + + h.logger.LogDebug("Ignoring outgoing message for Mattermost bridge") + return nil +} + +// CanHandleMessage determines if this handler can process the message +func (h *mattermostMessageHandler) CanHandleMessage(msg *pluginModel.BridgeMessage) bool { + // Mattermost bridge can handle text messages that didn't originate from Mattermost + return msg.MessageType == "text" && msg.SourceBridge != "mattermost" +} + +// GetSupportedMessageTypes returns the message types this handler supports +func (h *mattermostMessageHandler) GetSupportedMessageTypes() []string { + return []string{"text"} +} + +// postMessageToMattermost posts a message to a Mattermost channel +func (h *mattermostMessageHandler) postMessageToMattermost(msg *pluginModel.BridgeMessage) error { + if h.bridge.api == nil { + return fmt.Errorf("Mattermost API not initialized") + } + + // Get the Mattermost channel ID from the channel mapping + channelID, err := h.bridge.GetRoomMapping(msg.SourceChannelID) + if err != nil { + return fmt.Errorf("failed to get channel mapping: %w", err) + } + if channelID == "" { + // Check if the source channel ID is already a Mattermost channel ID + channelID = msg.SourceChannelID + } + + // Verify the channel exists + channel, appErr := h.bridge.api.GetChannel(channelID) + if appErr != nil { + return fmt.Errorf("failed to get channel %s: %w", channelID, appErr) + } + if channel == nil { + return fmt.Errorf("channel %s not found", channelID) + } + + // Format the message content + content := h.formatMessageContent(msg) + + // Create the post + post := &mmModel.Post{ + ChannelId: channelID, + Message: content, + Type: mmModel.PostTypeDefault, + Props: map[string]interface{}{ + "from_bridge": msg.SourceBridge, + "bridge_user_id": msg.SourceUserID, + "bridge_user_name": msg.SourceUserName, + "bridge_message_id": msg.MessageID, + "bridge_timestamp": msg.Timestamp.Unix(), + }, + } + + // Add thread ID if present + if msg.ThreadID != "" { + post.RootId = msg.ThreadID + } + + // Post the message as the plugin bot + createdPost, appErr := h.bridge.api.CreatePost(post) + if appErr != nil { + return fmt.Errorf("failed to create post in channel %s: %w", channelID, appErr) + } + + h.logger.LogDebug("Message posted to Mattermost channel", + "channel_id", channelID, + "post_id", createdPost.Id, + "source_bridge", msg.SourceBridge, + "content_length", len(content)) + + return nil +} + +// formatMessageContent formats the message content for Mattermost +func (h *mattermostMessageHandler) formatMessageContent(msg *pluginModel.BridgeMessage) string { + // For messages from other bridges, prefix with the bridge info and user name + if msg.SourceUserName != "" { + bridgeIcon := h.getBridgeIcon(msg.SourceBridge) + return fmt.Sprintf("%s **%s**: %s", bridgeIcon, msg.SourceUserName, msg.Content) + } + return msg.Content +} + +// getBridgeIcon returns an icon/emoji for the source bridge +func (h *mattermostMessageHandler) getBridgeIcon(bridgeType string) string { + switch bridgeType { + case "xmpp": + return ":speech_balloon:" // Chat bubble emoji for XMPP + case "slack": + return ":slack:" // Slack emoji if available + case "discord": + return ":discord:" // Discord emoji if available + default: + return ":bridge_at_night:" // Generic bridge emoji + } +} + +// mattermostUserResolver handles user resolution for the Mattermost bridge +type mattermostUserResolver struct { + bridge *mattermostBridge + logger logger.Logger +} + +// newUserResolver creates a new Mattermost user resolver +func newUserResolver(bridge *mattermostBridge) *mattermostUserResolver { + return &mattermostUserResolver{ + bridge: bridge, + logger: bridge.logger, + } +} + +// ResolveUser converts an external user ID to an ExternalUser +func (r *mattermostUserResolver) ResolveUser(externalUserID string) (*pluginModel.ExternalUser, error) { + r.logger.LogDebug("Resolving Mattermost user", "user_id", externalUserID) + + // For Mattermost, the external user ID is the Mattermost user ID + user, appErr := r.bridge.api.GetUser(externalUserID) + if appErr != nil { + return nil, fmt.Errorf("failed to get Mattermost user: %w", appErr) + } + + if user == nil { + return nil, fmt.Errorf("Mattermost user not found: %s", externalUserID) + } + + return &pluginModel.ExternalUser{ + BridgeType: "mattermost", + ExternalUserID: externalUserID, + DisplayName: r.GetDisplayName(externalUserID), + MattermostUserID: externalUserID, // Same as external ID for Mattermost + }, nil +} + +// FormatUserMention formats a user mention for Markdown content +func (r *mattermostUserResolver) FormatUserMention(user *pluginModel.ExternalUser) string { + // For Mattermost, use the standard @username format + return fmt.Sprintf("@%s", user.DisplayName) +} + +// GetDisplayName extracts display name from external user ID +func (r *mattermostUserResolver) GetDisplayName(externalUserID string) string { + // Try to get the actual username from Mattermost API + user, appErr := r.bridge.api.GetUser(externalUserID) + if appErr != nil || user == nil { + r.logger.LogWarn("Failed to get user for display name", "user_id", externalUserID) + return "Unknown User" + } + + // Prefer username, fallback to first name + last name, then to ID + if user.Username != "" { + return user.Username + } + + fullName := strings.TrimSpace(user.FirstName + " " + user.LastName) + if fullName != "" { + return fullName + } + + return user.Id[:8] // Show first 8 chars of ID as fallback +} \ No newline at end of file diff --git a/server/bridge/messagebus.go b/server/bridge/messagebus.go new file mode 100644 index 0000000..f51babd --- /dev/null +++ b/server/bridge/messagebus.go @@ -0,0 +1,244 @@ +package bridge + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" +) + +const ( + // DefaultMessageBufferSize is the default buffer size for message channels + DefaultMessageBufferSize = 1000 + + // MessageDeliveryTimeout is the maximum time to wait for message delivery + MessageDeliveryTimeout = 5 * time.Second +) + +// messageBus implements the MessageBus interface +type messageBus struct { + // Core messaging + incomingMessages chan *model.DirectionalMessage + subscribers map[string]chan *model.DirectionalMessage + subscribersMu sync.RWMutex + + // Lifecycle management + ctx context.Context + cancel context.CancelFunc + logger logger.Logger + wg sync.WaitGroup + started bool + startMu sync.Mutex +} + +// NewMessageBus creates a new message bus instance +func NewMessageBus(logger logger.Logger) model.MessageBus { + ctx, cancel := context.WithCancel(context.Background()) + + return &messageBus{ + incomingMessages: make(chan *model.DirectionalMessage, DefaultMessageBufferSize), + subscribers: make(map[string]chan *model.DirectionalMessage), + ctx: ctx, + cancel: cancel, + logger: logger, + } +} + +// Subscribe returns a channel that receives messages for the specified bridge +func (mb *messageBus) Subscribe(bridgeName string) <-chan *model.DirectionalMessage { + mb.subscribersMu.Lock() + defer mb.subscribersMu.Unlock() + + // Create a buffered channel for this subscriber + ch := make(chan *model.DirectionalMessage, DefaultMessageBufferSize) + mb.subscribers[bridgeName] = ch + + mb.logger.LogDebug("Bridge subscribed to message bus", "bridge", bridgeName) + return ch +} + +// Publish sends a message to the message bus for routing +func (mb *messageBus) Publish(msg *model.DirectionalMessage) error { + if msg == nil { + return fmt.Errorf("message cannot be nil") + } + + if msg.BridgeMessage == nil { + return fmt.Errorf("bridge message cannot be nil") + } + + select { + case mb.incomingMessages <- msg: + mb.logger.LogDebug("Message published to bus", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "channel_id", msg.SourceChannelID) + return nil + case <-time.After(MessageDeliveryTimeout): + mb.logger.LogWarn("Message delivery timeout", + "source_bridge", msg.SourceBridge, + "channel_id", msg.SourceChannelID) + return fmt.Errorf("message delivery timeout") + case <-mb.ctx.Done(): + return fmt.Errorf("message bus is shutting down") + } +} + +// Start begins message routing +func (mb *messageBus) Start() error { + mb.startMu.Lock() + defer mb.startMu.Unlock() + + if mb.started { + return fmt.Errorf("message bus is already started") + } + + mb.logger.LogInfo("Starting message bus") + + // Start the message routing goroutine + mb.wg.Add(1) + go mb.routeMessages() + + mb.started = true + mb.logger.LogInfo("Message bus started successfully") + return nil +} + +// Stop ends message routing and cleans up resources +func (mb *messageBus) Stop() error { + mb.startMu.Lock() + defer mb.startMu.Unlock() + + if !mb.started { + return nil // Already stopped + } + + mb.logger.LogInfo("Stopping message bus") + + // Cancel context to signal shutdown + mb.cancel() + + // Wait for routing goroutine to finish + mb.wg.Wait() + + // Close all subscriber channels + mb.subscribersMu.Lock() + for bridgeName, ch := range mb.subscribers { + close(ch) + mb.logger.LogDebug("Closed subscriber channel", "bridge", bridgeName) + } + mb.subscribers = make(map[string]chan *model.DirectionalMessage) + mb.subscribersMu.Unlock() + + // Close incoming messages channel + close(mb.incomingMessages) + + mb.started = false + mb.logger.LogInfo("Message bus stopped successfully") + return nil +} + +// routeMessages handles the main message routing loop +func (mb *messageBus) routeMessages() { + defer mb.wg.Done() + + mb.logger.LogDebug("Message routing started") + + for { + select { + case msg, ok := <-mb.incomingMessages: + if !ok { + mb.logger.LogDebug("Incoming messages channel closed, stopping routing") + return + } + + if err := mb.routeMessage(msg); err != nil { + mb.logger.LogError("Failed to route message", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "error", err) + } + + case <-mb.ctx.Done(): + mb.logger.LogDebug("Context cancelled, stopping message routing") + return + } + } +} + +// routeMessage routes a single message to appropriate subscribers +func (mb *messageBus) routeMessage(msg *model.DirectionalMessage) error { + mb.subscribersMu.RLock() + defer mb.subscribersMu.RUnlock() + + routedCount := 0 + + // Route to specific target bridges if specified + if len(msg.TargetBridges) > 0 { + for _, targetBridge := range msg.TargetBridges { + if ch, exists := mb.subscribers[targetBridge]; exists { + if mb.deliverMessage(ch, msg, targetBridge) { + routedCount++ + } + } else { + mb.logger.LogWarn("Target bridge not subscribed", + "target_bridge", targetBridge, + "source_bridge", msg.SourceBridge) + } + } + } else { + // Route to all subscribers except the source bridge + for bridgeName, ch := range mb.subscribers { + if bridgeName != msg.SourceBridge { + if mb.deliverMessage(ch, msg, bridgeName) { + routedCount++ + } + } + } + } + + mb.logger.LogDebug("Message routed", + "source_bridge", msg.SourceBridge, + "routed_to_count", routedCount) + + return nil +} + +// deliverMessage attempts to deliver a message to a specific subscriber +func (mb *messageBus) deliverMessage(ch chan *model.DirectionalMessage, msg *model.DirectionalMessage, targetBridge string) bool { + select { + case ch <- msg: + return true + case <-time.After(MessageDeliveryTimeout): + mb.logger.LogWarn("Message delivery timeout to bridge", + "target_bridge", targetBridge, + "source_bridge", msg.SourceBridge) + return false + case <-mb.ctx.Done(): + return false + } +} + +// GetStats returns statistics about the message bus +func (mb *messageBus) GetStats() map[string]interface{} { + mb.subscribersMu.RLock() + defer mb.subscribersMu.RUnlock() + + stats := map[string]interface{}{ + "started": mb.started, + "subscriber_count": len(mb.subscribers), + "buffer_size": DefaultMessageBufferSize, + "pending_messages": len(mb.incomingMessages), + } + + subscribers := make([]string, 0, len(mb.subscribers)) + for bridgeName := range mb.subscribers { + subscribers = append(subscribers, bridgeName) + } + stats["subscribers"] = subscribers + + return stats +} \ No newline at end of file diff --git a/server/bridge/xmpp/bridge.go b/server/bridge/xmpp/bridge.go index b375ce4..ca05e78 100644 --- a/server/bridge/xmpp/bridge.go +++ b/server/bridge/xmpp/bridge.go @@ -18,6 +18,11 @@ import ( "github.com/mattermost/mattermost/server/public/plugin" ) +const ( + // defaultMessageBufferSize is the buffer size for incoming message channels + defaultMessageBufferSize = 1000 +) + // xmppBridge handles syncing messages between Mattermost and XMPP type xmppBridge struct { logger logger.Logger @@ -26,6 +31,11 @@ type xmppBridge struct { bridgeClient *xmppClient.Client // Main bridge XMPP client connection userManager pluginModel.BridgeUserManager + // Message handling + messageHandler *xmppMessageHandler + userResolver *xmppUserResolver + incomingMessages chan *pluginModel.DirectionalMessage + // Connection management connected atomic.Bool ctx context.Context @@ -44,16 +54,21 @@ type xmppBridge struct { func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge { ctx, cancel := context.WithCancel(context.Background()) b := &xmppBridge{ - logger: log, - api: api, - kvstore: kvstore, - ctx: ctx, - cancel: cancel, - channelMappings: make(map[string]string), - config: cfg, - userManager: bridge.NewUserManager("xmpp", log), + logger: log, + api: api, + kvstore: kvstore, + ctx: ctx, + cancel: cancel, + channelMappings: make(map[string]string), + config: cfg, + userManager: bridge.NewUserManager("xmpp", log), + incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize), } + // Initialize handlers after bridge is created + b.messageHandler = newMessageHandler(b) + b.userResolver = newUserResolver(b) + // Initialize XMPP client with configuration if cfg.EnableSync && cfg.XMPPServerURL != "" && cfg.XMPPUsername != "" && cfg.XMPPPassword != "" { b.bridgeClient = b.createXMPPClient(cfg) @@ -160,6 +175,9 @@ func (b *xmppBridge) Start() error { // Start connection monitor go b.connectionMonitor() + // Start message aggregation + go b.startMessageAggregation() + b.logger.LogInfo("Mattermost to XMPP bridge started successfully") return nil } @@ -533,3 +551,67 @@ func (b *xmppBridge) GetRoomMapping(roomID string) (string, error) { func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager { return b.userManager } + +// startMessageAggregation starts the message aggregation goroutine +func (b *xmppBridge) startMessageAggregation() { + b.logger.LogDebug("Starting XMPP message aggregation") + + for { + select { + case <-b.ctx.Done(): + b.logger.LogDebug("Stopping XMPP message aggregation") + return + default: + // Aggregate messages from bridge client if available + if b.bridgeClient != nil { + clientChannel := b.bridgeClient.GetMessageChannel() + select { + case msg, ok := <-clientChannel: + if !ok { + b.logger.LogDebug("Bridge client message channel closed") + continue + } + + // Forward to our bridge's message channel + select { + case b.incomingMessages <- msg: + b.logger.LogDebug("Message forwarded from bridge client", + "source_channel", msg.SourceChannelID, + "user_id", msg.SourceUserID) + default: + b.logger.LogWarn("Bridge message channel full, dropping message", + "source_channel", msg.SourceChannelID, + "user_id", msg.SourceUserID) + } + case <-b.ctx.Done(): + return + default: + // No messages available, continue with other potential sources + } + } + + // TODO: Add aggregation from user client channels when implemented + // This is where we would aggregate from multiple XMPP user connections + } + } +} + +// GetMessageChannel returns the channel for incoming messages from XMPP +func (b *xmppBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage { + return b.incomingMessages +} + +// SendMessage sends a message to an XMPP room +func (b *xmppBridge) SendMessage(msg *pluginModel.BridgeMessage) error { + return b.messageHandler.sendMessageToXMPP(msg) +} + +// GetMessageHandler returns the message handler for this bridge +func (b *xmppBridge) GetMessageHandler() pluginModel.MessageHandler { + return b.messageHandler +} + +// GetUserResolver returns the user resolver for this bridge +func (b *xmppBridge) GetUserResolver() pluginModel.UserResolver { + return b.userResolver +} diff --git a/server/bridge/xmpp/message_handler.go b/server/bridge/xmpp/message_handler.go new file mode 100644 index 0000000..3916574 --- /dev/null +++ b/server/bridge/xmpp/message_handler.go @@ -0,0 +1,166 @@ +package xmpp + +import ( + "fmt" + "strings" + + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" + xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp" +) + +// xmppMessageHandler handles incoming messages for the XMPP bridge +type xmppMessageHandler struct { + bridge *xmppBridge + logger logger.Logger +} + +// newMessageHandler creates a new XMPP message handler +func newMessageHandler(bridge *xmppBridge) *xmppMessageHandler { + return &xmppMessageHandler{ + bridge: bridge, + logger: bridge.logger, + } +} + +// ProcessMessage processes an incoming message for the XMPP bridge +func (h *xmppMessageHandler) ProcessMessage(msg *pluginModel.DirectionalMessage) error { + h.logger.LogDebug("Processing message for XMPP bridge", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "channel_id", msg.SourceChannelID) + + // Skip messages that originated from XMPP to prevent loops + if msg.SourceBridge == "xmpp" { + h.logger.LogDebug("Skipping XMPP-originated message to prevent loop") + return nil + } + + // For incoming messages to XMPP, we send them to XMPP rooms + if msg.Direction == pluginModel.DirectionIncoming { + return h.sendMessageToXMPP(msg.BridgeMessage) + } + + h.logger.LogDebug("Ignoring outgoing message for XMPP bridge") + return nil +} + +// CanHandleMessage determines if this handler can process the message +func (h *xmppMessageHandler) CanHandleMessage(msg *pluginModel.BridgeMessage) bool { + // XMPP bridge can handle text messages that didn't originate from XMPP + return msg.MessageType == "text" && msg.SourceBridge != "xmpp" +} + +// GetSupportedMessageTypes returns the message types this handler supports +func (h *xmppMessageHandler) GetSupportedMessageTypes() []string { + return []string{"text"} +} + +// sendMessageToXMPP sends a message to an XMPP room +func (h *xmppMessageHandler) sendMessageToXMPP(msg *pluginModel.BridgeMessage) error { + if h.bridge.bridgeClient == nil { + return fmt.Errorf("XMPP client not initialized") + } + + if !h.bridge.connected.Load() { + return fmt.Errorf("not connected to XMPP server") + } + + // Get the XMPP room JID from the channel mapping + roomJID, err := h.bridge.GetChannelMapping(msg.SourceChannelID) + if err != nil { + return fmt.Errorf("failed to get room mapping: %w", err) + } + if roomJID == "" { + return fmt.Errorf("channel is not mapped to any XMPP room") + } + + // Format the message content with user information + content := h.formatMessageContent(msg) + + // Create XMPP message request + req := xmppClient.MessageRequest{ + RoomJID: roomJID, + Message: content, + } + + // Send the message + _, err = h.bridge.bridgeClient.SendMessage(req) + if err != nil { + return fmt.Errorf("failed to send message to XMPP room: %w", err) + } + + h.logger.LogDebug("Message sent to XMPP room", + "channel_id", msg.SourceChannelID, + "room_jid", roomJID, + "content_length", len(content)) + + return nil +} + +// formatMessageContent formats the message content for XMPP +func (h *xmppMessageHandler) formatMessageContent(msg *pluginModel.BridgeMessage) string { + // For messages from other bridges, prefix with the user name + if msg.SourceUserName != "" { + return fmt.Sprintf("<%s> %s", msg.SourceUserName, msg.Content) + } + return msg.Content +} + +// xmppUserResolver handles user resolution for the XMPP bridge +type xmppUserResolver struct { + bridge *xmppBridge + logger logger.Logger +} + +// newUserResolver creates a new XMPP user resolver +func newUserResolver(bridge *xmppBridge) *xmppUserResolver { + return &xmppUserResolver{ + bridge: bridge, + logger: bridge.logger, + } +} + +// ResolveUser converts an external user ID to an ExternalUser +func (r *xmppUserResolver) ResolveUser(externalUserID string) (*pluginModel.ExternalUser, error) { + r.logger.LogDebug("Resolving XMPP user", "user_id", externalUserID) + + // For XMPP, the external user ID is typically the full JID + return &pluginModel.ExternalUser{ + BridgeType: "xmpp", + ExternalUserID: externalUserID, + DisplayName: r.GetDisplayName(externalUserID), + MattermostUserID: "", // Will be resolved by user mapping system + }, nil +} + +// FormatUserMention formats a user mention for Markdown content +func (r *xmppUserResolver) FormatUserMention(user *pluginModel.ExternalUser) string { + // For XMPP, we can format mentions as simple text with the display name + return fmt.Sprintf("@%s", user.DisplayName) +} + +// GetDisplayName extracts display name from external user ID +func (r *xmppUserResolver) GetDisplayName(externalUserID string) string { + // For XMPP JIDs, extract the local part or resource as display name + // Format: user@domain/resource -> use resource or user + if len(externalUserID) == 0 { + return "Unknown User" + } + + // Try to parse as JID and extract meaningful display name + parts := strings.Split(externalUserID, "/") + if len(parts) > 1 { + // Has resource part, use it as display name + return parts[1] + } + + // No resource, try to extract local part from user@domain + atIndex := strings.Index(externalUserID, "@") + if atIndex > 0 { + return externalUserID[:atIndex] + } + + // Fallback to the full ID + return externalUserID +} \ No newline at end of file diff --git a/server/model/bridge.go b/server/model/bridge.go index a5df73e..f274e0e 100644 --- a/server/model/bridge.go +++ b/server/model/bridge.go @@ -152,6 +152,12 @@ type Bridge interface { // GetUserManager returns the user manager for this bridge. GetUserManager() BridgeUserManager + + // Message handling for bidirectional communication + GetMessageChannel() <-chan *DirectionalMessage + SendMessage(msg *BridgeMessage) error + GetMessageHandler() MessageHandler + GetUserResolver() UserResolver } // BridgeUser represents a user connected to any bridge service diff --git a/server/model/message.go b/server/model/message.go new file mode 100644 index 0000000..5b2a252 --- /dev/null +++ b/server/model/message.go @@ -0,0 +1,88 @@ +package model + +import ( + "time" +) + +// MessageDirection indicates the direction of message flow +type MessageDirection string + +const ( + DirectionIncoming MessageDirection = "incoming" // From external system to us + DirectionOutgoing MessageDirection = "outgoing" // From us to external system +) + +// BridgeMessage represents a message that can be passed between any bridge types +type BridgeMessage struct { + // Source information + SourceBridge string // "xmpp", "mattermost", "slack", etc. + SourceChannelID string // Channel ID in source system + SourceUserID string // User ID in source system (JID, user ID, etc.) + SourceUserName string // Display name in source system + + // Message content (standardized on Markdown) + Content string // Markdown formatted message content + MessageType string // "text", "image", "file", etc. + + // Metadata + Timestamp time.Time // When message was received + MessageID string // Unique message ID from source + ThreadID string // Thread/reply ID (if applicable) + + // Routing hints + TargetBridges []string // Which bridges should receive this + Metadata map[string]any // Bridge-specific metadata +} + +// DirectionalMessage wraps a BridgeMessage with direction information +type DirectionalMessage struct { + *BridgeMessage + Direction MessageDirection +} + +// ExternalUser represents a user from any bridge system +type ExternalUser struct { + BridgeType string // "xmpp", "slack", etc. + ExternalUserID string // JID, Slack user ID, etc. + DisplayName string // How to display this user + MattermostUserID string // Mapped Mattermost user (if exists) +} + +// UserResolver handles user resolution for a specific bridge +type UserResolver interface { + // ResolveUser converts an external user ID to an ExternalUser + ResolveUser(externalUserID string) (*ExternalUser, error) + + // FormatUserMention formats a user mention for Markdown content + FormatUserMention(user *ExternalUser) string + + // GetDisplayName extracts display name from external user ID + GetDisplayName(externalUserID string) string +} + +// MessageBus handles routing messages between bridges +type MessageBus interface { + // Subscribe returns a channel that receives messages for the specified bridge + Subscribe(bridgeName string) <-chan *DirectionalMessage + + // Publish sends a message to the message bus for routing + Publish(msg *DirectionalMessage) error + + // Start begins message routing + Start() error + + // Stop ends message routing and cleans up resources + Stop() error +} + +// MessageHandler processes incoming messages for a bridge +type MessageHandler interface { + // ProcessMessage handles an incoming message + ProcessMessage(msg *DirectionalMessage) error + + // CanHandleMessage determines if this handler can process the message + CanHandleMessage(msg *BridgeMessage) bool + + // GetSupportedMessageTypes returns the message types this handler supports + GetSupportedMessageTypes() []string +} \ No newline at end of file diff --git a/server/xmpp/client.go b/server/xmpp/client.go index 83f3709..7f32100 100644 --- a/server/xmpp/client.go +++ b/server/xmpp/client.go @@ -9,7 +9,9 @@ import ( "time" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" "mellium.im/sasl" + "mellium.im/xmlstream" "mellium.im/xmpp" "mellium.im/xmpp/disco" "mellium.im/xmpp/jid" @@ -21,6 +23,9 @@ import ( const ( // defaultOperationTimeout is the default timeout for XMPP operations defaultOperationTimeout = 5 * time.Second + + // msgBufferSize is the buffer size for incoming message channels + msgBufferSize = 1000 ) // Client represents an XMPP client for communicating with XMPP servers. @@ -43,6 +48,9 @@ type Client struct { mux *mux.ServeMux sessionReady chan struct{} sessionServing bool + + // Message handling for bridge integration + incomingMessages chan *model.DirectionalMessage } // MessageRequest represents a request to send a message. @@ -90,22 +98,31 @@ type UserProfile struct { // NewClient creates a new XMPP client. func NewClient(serverURL, username, password, resource, remoteID string, logger logger.Logger) *Client { ctx, cancel := context.WithCancel(context.Background()) - mucClient := &muc.Client{} - mux := mux.New("jabber:client", muc.HandleClient(mucClient)) - return &Client{ - serverURL: serverURL, - username: username, - password: password, - resource: resource, - remoteID: remoteID, - logger: logger, - ctx: ctx, - cancel: cancel, - mucClient: mucClient, - mux: mux, - sessionReady: make(chan struct{}), + client := &Client{ + serverURL: serverURL, + username: username, + password: password, + resource: resource, + remoteID: remoteID, + logger: logger, + ctx: ctx, + cancel: cancel, + sessionReady: make(chan struct{}), + incomingMessages: make(chan *model.DirectionalMessage, msgBufferSize), } + + // Create MUC client and set up message handling + mucClient := &muc.Client{} + client.mucClient = mucClient + + // Create mux with MUC client and our message handler + mux := mux.New("jabber:client", + muc.HandleClient(mucClient), + mux.MessageFunc(stanza.GroupChatMessage, xml.Name{}, client.handleIncomingMessage)) + client.mux = mux + + return client } // NewClientWithTLS creates a new XMPP client with custom TLS configuration. @@ -599,3 +616,110 @@ func (c *Client) Ping() error { c.logger.LogDebug("XMPP ping successful", "duration", duration) return nil } + +// GetMessageChannel returns the channel for incoming messages (Bridge interface) +func (c *Client) GetMessageChannel() <-chan *model.DirectionalMessage { + return c.incomingMessages +} + +// handleIncomingMessage processes incoming XMPP message stanzas +func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenReadEncoder) error { + c.logger.LogDebug("Received XMPP message", + "from", msg.From.String(), + "to", msg.To.String(), + "type", fmt.Sprintf("%v", msg.Type)) + + // Only process groupchat messages for now (MUC messages from channels) + if msg.Type != stanza.GroupChatMessage { + c.logger.LogDebug("Ignoring non-groupchat message", "type", fmt.Sprintf("%v", msg.Type)) + return nil + } + + // Parse the message body from the token reader + var msgWithBody struct { + stanza.Message + Body string `xml:"body"` + } + msgWithBody.Message = msg + + d := xml.NewTokenDecoder(t) + if err := d.DecodeElement(&msgWithBody, nil); err != nil { + c.logger.LogError("Failed to decode message body", "error", err) + return err + } + + if msgWithBody.Body == "" { + c.logger.LogDebug("Message has no body, ignoring") + return nil + } + + // Extract channel and user information from JIDs + channelID, err := c.extractChannelID(msg.From) + if err != nil { + c.logger.LogError("Failed to extract channel ID from JID", "from", msg.From.String(), "error", err) + return nil + } + + userID, userName := c.extractUserInfo(msg.From) + + // Create BridgeMessage + bridgeMsg := &model.BridgeMessage{ + SourceBridge: "xmpp", + SourceChannelID: channelID, + SourceUserID: userID, + SourceUserName: userName, + Content: msgWithBody.Body, // Already Markdown compatible + MessageType: "text", + Timestamp: time.Now(), // XMPP doesn't always provide timestamps + MessageID: msg.ID, + TargetBridges: []string{}, // Will be routed to all other bridges + Metadata: map[string]any{ + "xmpp_from": msg.From.String(), + "xmpp_to": msg.To.String(), + }, + } + + // Wrap in directional message + directionalMsg := &model.DirectionalMessage{ + BridgeMessage: bridgeMsg, + Direction: model.DirectionIncoming, + } + + // Send to message channel (non-blocking) + select { + case c.incomingMessages <- directionalMsg: + c.logger.LogDebug("Message queued for processing", + "channel_id", channelID, + "user_id", userID, + "content_length", len(msgWithBody.Body)) + default: + c.logger.LogWarn("Message channel full, dropping message", + "channel_id", channelID, + "user_id", userID) + } + + return nil +} + +// extractChannelID extracts the channel ID (room bare JID) from a message JID +func (c *Client) extractChannelID(from jid.JID) (string, error) { + // For MUC messages, the channel ID is the bare JID (without resource/nickname) + return from.Bare().String(), nil +} + +// extractUserInfo extracts user ID and display name from a message JID +func (c *Client) extractUserInfo(from jid.JID) (string, string) { + // For MUC messages, the resource part is the nickname + nickname := from.Resourcepart() + + // Use the full JID as user ID for XMPP + userID := from.String() + + // Use nickname as display name if available, otherwise use full JID + displayName := nickname + if displayName == "" { + displayName = from.String() + } + + return userID, displayName +}