diff --git a/go.mod b/go.mod index efd6373..b1bc908 100644 --- a/go.mod +++ b/go.mod @@ -102,6 +102,7 @@ require ( github.com/hashicorp/yamux v0.1.2 // indirect github.com/hexops/gotextdiff v1.0.3 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/jellydator/ttlcache/v3 v3.4.0 // indirect github.com/jgautheron/goconst v1.7.1 // indirect github.com/jingyugao/rowserrcheck v1.1.1 // indirect github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect @@ -210,7 +211,7 @@ require ( golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect golang.org/x/mod v0.22.0 // indirect golang.org/x/net v0.34.0 // indirect - golang.org/x/sync v0.10.0 // indirect + golang.org/x/sync v0.15.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/term v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect diff --git a/go.sum b/go.sum index 9a27f5c..8d1f976 100644 --- a/go.sum +++ b/go.sum @@ -366,6 +366,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= +github.com/jellydator/ttlcache/v3 v3.4.0 h1:YS4P125qQS0tNhtL6aeYkheEaB/m8HCqdMMP4mnWdTY= +github.com/jellydator/ttlcache/v3 v3.4.0/go.mod h1:Hw9EgjymziQD3yGsQdf1FqFdpp7YjFMd4Srg5EJlgD4= github.com/jgautheron/goconst v1.7.1 h1:VpdAG7Ca7yvvJk5n8dMwQhfEZJh95kl/Hl9S1OI5Jkk= github.com/jgautheron/goconst v1.7.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= @@ -867,6 +869,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= +golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/server/bridge/manager.go b/server/bridge/manager.go index 1137368..9984410 100644 --- a/server/bridge/manager.go +++ b/server/bridge/manager.go @@ -1,9 +1,11 @@ package bridge import ( + "context" "fmt" "sync" + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" mmModel "github.com/mattermost/mattermost/server/public/model" @@ -12,11 +14,15 @@ import ( // BridgeManager manages multiple bridge instances type BridgeManager struct { - bridges map[string]model.Bridge - mu sync.RWMutex - logger logger.Logger - api plugin.API - remoteID string + bridges map[string]model.Bridge + mu sync.RWMutex + logger logger.Logger + api plugin.API + remoteID string + messageBus model.MessageBus + routingCtx context.Context + routingCancel context.CancelFunc + routingWg sync.WaitGroup } // NewBridgeManager creates a new bridge manager @@ -28,11 +34,16 @@ func NewBridgeManager(logger logger.Logger, api plugin.API, remoteID string) mod panic("plugin API cannot be nil") } + ctx, cancel := context.WithCancel(context.Background()) + return &BridgeManager{ - bridges: make(map[string]model.Bridge), - logger: logger, - api: api, - remoteID: remoteID, + bridges: make(map[string]model.Bridge), + logger: logger, + api: api, + remoteID: remoteID, + messageBus: NewMessageBus(logger), + routingCtx: ctx, + routingCancel: cancel, } } @@ -55,6 +66,9 @@ func (m *BridgeManager) RegisterBridge(name string, bridge model.Bridge) error { m.bridges[name] = bridge m.logger.LogInfo("Bridge registered", "name", name) + // Subscribe bridge to message bus + go m.startBridgeMessageHandler(name, bridge) + return nil } @@ -149,6 +163,20 @@ func (m *BridgeManager) ListBridges() []string { return bridges } +// Start starts the bridge manager and message routing system +func (m *BridgeManager) Start() error { + m.logger.LogInfo("Starting bridge manager") + + // Start the message routing system + if err := m.StartMessageRouting(); err != nil { + m.logger.LogError("Failed to start message routing", "error", err) + return fmt.Errorf("failed to start message routing: %w", err) + } + + m.logger.LogInfo("Bridge manager started successfully") + return nil +} + // HasBridge checks if a bridge with the given name is registered func (m *BridgeManager) HasBridge(name string) bool { m.mu.RLock() @@ -173,6 +201,11 @@ func (m *BridgeManager) Shutdown() error { m.logger.LogInfo("Shutting down bridge manager", "bridge_count", len(m.bridges)) + // Stop message routing first + if err := m.StopMessageRouting(); err != nil { + m.logger.LogError("Failed to stop message routing during shutdown", "error", err) + } + var errors []error for name, bridge := range m.bridges { if bridge.IsConnected() { @@ -196,7 +229,7 @@ func (m *BridgeManager) Shutdown() error { } // OnPluginConfigurationChange propagates configuration changes to all registered bridges -func (m *BridgeManager) OnPluginConfigurationChange(config any) error { +func (m *BridgeManager) OnPluginConfigurationChange(config *config.Configuration) error { m.mu.RLock() defer m.mu.RUnlock() @@ -231,7 +264,7 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque return fmt.Errorf("invalid mapping request: %w", err) } - m.logger.LogDebug("Creating channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "user_id", req.UserID, "team_id", req.TeamID) + m.logger.LogDebug("Creating channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "user_id", req.UserID, "team_id", req.TeamID) // Get the specific bridge bridge, err := m.GetBridge(req.BridgeName) @@ -246,41 +279,41 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque } // NEW: Check if room already mapped to another channel - existingChannelID, err := bridge.GetRoomMapping(req.BridgeRoomID) + existingChannelID, err := bridge.GetChannelMapping(req.BridgeChannelID) if err != nil { - m.logger.LogError("Failed to check room mapping", "bridge_room_id", req.BridgeRoomID, "error", err) - return fmt.Errorf("failed to check room mapping: %w", err) + m.logger.LogError("Failed to check channel mapping", "bridge_channel_id", req.BridgeChannelID, "error", err) + return fmt.Errorf("failed to check channel mapping: %w", err) } if existingChannelID != "" { - m.logger.LogWarn("Room already mapped to another channel", - "bridge_room_id", req.BridgeRoomID, + m.logger.LogWarn("Channel already mapped to another channel", + "bridge_channel_id", req.BridgeChannelID, "existing_channel_id", existingChannelID, "requested_channel_id", req.ChannelID) - return fmt.Errorf("room '%s' is already mapped to channel '%s'", req.BridgeRoomID, existingChannelID) + return fmt.Errorf("channel '%s' is already mapped to channel '%s'", req.BridgeChannelID, existingChannelID) } // NEW: Check if room exists on target bridge - roomExists, err := bridge.RoomExists(req.BridgeRoomID) + channelExists, err := bridge.ChannelMappingExists(req.BridgeChannelID) if err != nil { - m.logger.LogError("Failed to check room existence", "bridge_room_id", req.BridgeRoomID, "error", err) - return fmt.Errorf("failed to check room existence: %w", err) + m.logger.LogError("Failed to check channel existence", "bridge_channel_id", req.BridgeChannelID, "error", err) + return fmt.Errorf("failed to check channel existence: %w", err) } - if !roomExists { - m.logger.LogWarn("Room does not exist on bridge", - "bridge_room_id", req.BridgeRoomID, + if !channelExists { + m.logger.LogWarn("Channel does not exist on bridge", + "bridge_channel_id", req.BridgeChannelID, "bridge_name", req.BridgeName) - return fmt.Errorf("room '%s' does not exist on %s bridge", req.BridgeRoomID, req.BridgeName) + return fmt.Errorf("channel '%s' does not exist on %s bridge", req.BridgeChannelID, req.BridgeName) } - m.logger.LogDebug("Room validation passed", - "bridge_room_id", req.BridgeRoomID, + m.logger.LogDebug("Channel validation passed", + "bridge_channel_id", req.BridgeChannelID, "bridge_name", req.BridgeName, - "room_exists", roomExists, + "channel_exists", channelExists, "already_mapped", false) // Create the channel mapping on the receiving bridge - if err = bridge.CreateChannelMapping(req.ChannelID, req.BridgeRoomID); err != nil { - m.logger.LogError("Failed to create channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "error", err) + if err = bridge.CreateChannelMapping(req.ChannelID, req.BridgeChannelID); err != nil { + m.logger.LogError("Failed to create channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "error", err) return fmt.Errorf("failed to create channel mapping for bridge '%s': %w", req.BridgeName, err) } @@ -291,19 +324,19 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque } // Create the channel mapping in the Mattermost bridge - if err = mattermostBridge.CreateChannelMapping(req.ChannelID, req.BridgeRoomID); err != nil { - m.logger.LogError("Failed to create channel mapping in Mattermost bridge", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "error", err) + if err = mattermostBridge.CreateChannelMapping(req.ChannelID, req.BridgeChannelID); err != nil { + m.logger.LogError("Failed to create channel mapping in Mattermost bridge", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "error", err) return fmt.Errorf("failed to create channel mapping in Mattermost bridge: %w", err) } // Share the channel using Mattermost's shared channels API if err = m.shareChannel(req); err != nil { - m.logger.LogError("Failed to share channel", "channel_id", req.ChannelID, "bridge_room_id", req.BridgeRoomID, "error", err) + m.logger.LogError("Failed to share channel", "channel_id", req.ChannelID, "bridge_channel_id", req.BridgeChannelID, "error", err) // Don't fail the entire operation if sharing fails, but log the error m.logger.LogWarn("Channel mapping created but sharing failed - channel may not sync properly") } - m.logger.LogInfo("Successfully created channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID) + m.logger.LogInfo("Successfully created channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID) return nil } @@ -370,9 +403,9 @@ func (m *BridgeManager) shareChannel(req model.CreateChannelMappingRequest) erro TeamId: req.TeamID, Home: true, ReadOnly: false, - ShareName: model.SanitizeShareName(fmt.Sprintf("bridge-%s", req.BridgeRoomID)), - ShareDisplayName: fmt.Sprintf("Bridge: %s", req.BridgeRoomID), - SharePurpose: fmt.Sprintf("Shared channel bridged to %s", req.BridgeRoomID), + ShareName: model.SanitizeShareName(fmt.Sprintf("bridge-%s", req.BridgeChannelID)), + ShareDisplayName: fmt.Sprintf("Bridge: %s", req.BridgeChannelID), + SharePurpose: fmt.Sprintf("Shared channel bridged to %s", req.BridgeChannelID), ShareHeader: "test header", CreatorId: req.UserID, RemoteId: m.remoteID, @@ -404,3 +437,127 @@ func (m *BridgeManager) unshareChannel(channelID string) error { return nil } + +// startBridgeMessageHandler starts message handling for a specific bridge +func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge model.Bridge) { + m.logger.LogDebug("Starting message handler for bridge", "bridge", bridgeName) + + // Subscribe to message bus + messageChannel := m.messageBus.Subscribe(bridgeName) + + // Start message routing goroutine + m.routingWg.Add(1) + go func() { + defer m.routingWg.Done() + defer m.logger.LogDebug("Message handler stopped for bridge", "bridge", bridgeName) + + for { + select { + case msg, ok := <-messageChannel: + if !ok { + m.logger.LogDebug("Message channel closed for bridge", "bridge", bridgeName) + return + } + + if err := m.handleBridgeMessage(bridgeName, bridge, msg); err != nil { + m.logger.LogError("Failed to handle message for bridge", + "bridge", bridgeName, + "source_bridge", msg.SourceBridge, + "error", err) + } + + case <-m.routingCtx.Done(): + m.logger.LogDebug("Context cancelled, stopping message handler", "bridge", bridgeName) + return + } + } + }() + + // Listen to bridge's outgoing messages + m.routingWg.Add(1) + go func() { + defer m.routingWg.Done() + defer m.logger.LogDebug("Bridge message listener stopped", "bridge", bridgeName) + + bridgeMessageChannel := bridge.GetMessageChannel() + for { + select { + case msg, ok := <-bridgeMessageChannel: + if !ok { + m.logger.LogDebug("Bridge message channel closed", "bridge", bridgeName) + return + } + + if err := m.messageBus.Publish(msg); err != nil { + m.logger.LogError("Failed to publish message from bridge", + "bridge", bridgeName, + "direction", msg.Direction, + "error", err) + } + + case <-m.routingCtx.Done(): + m.logger.LogDebug("Context cancelled, stopping bridge listener", "bridge", bridgeName) + return + } + } + }() +} + +// handleBridgeMessage processes an incoming message for a specific bridge +func (m *BridgeManager) handleBridgeMessage(bridgeName string, bridge model.Bridge, msg *model.DirectionalMessage) error { + m.logger.LogDebug("Handling message for bridge", + "target_bridge", bridgeName, + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "channel_id", msg.SourceChannelID) + + // Get the bridge's message handler + handler := bridge.GetMessageHandler() + if handler == nil { + return fmt.Errorf("bridge %s does not have a message handler", bridgeName) + } + + // Check if the handler can process this message + if !handler.CanHandleMessage(msg.BridgeMessage) { + m.logger.LogDebug("Bridge cannot handle message", + "bridge", bridgeName, + "message_type", msg.MessageType) + return nil // Not an error, just skip + } + + // Process the message + return handler.ProcessMessage(msg) +} + +// StartMessageRouting starts the message bus and routing system +func (m *BridgeManager) StartMessageRouting() error { + m.logger.LogInfo("Starting message routing system") + return m.messageBus.Start() +} + +// StopMessageRouting stops the message bus and routing system +func (m *BridgeManager) StopMessageRouting() error { + m.logger.LogInfo("Stopping message routing system") + + // Cancel routing context + if m.routingCancel != nil { + m.routingCancel() + } + + // Wait for all routing goroutines to finish + m.routingWg.Wait() + + // Stop the message bus + return m.messageBus.Stop() +} + +// PublishMessage publishes a message to the message bus for routing to target bridges +func (m *BridgeManager) PublishMessage(msg *model.DirectionalMessage) error { + m.logger.LogDebug("Publishing message to message bus", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "target_bridges", msg.TargetBridges, + "message_id", msg.MessageID) + + return m.messageBus.Publish(msg) +} diff --git a/server/bridge/mattermost/bridge.go b/server/bridge/mattermost/bridge.go index 79baf6e..e618535 100644 --- a/server/bridge/mattermost/bridge.go +++ b/server/bridge/mattermost/bridge.go @@ -14,12 +14,24 @@ import ( "github.com/mattermost/mattermost/server/public/plugin" ) +const ( + // defaultMessageBufferSize is the buffer size for incoming message channels + defaultMessageBufferSize = 1000 +) + // mattermostBridge handles syncing messages between Mattermost instances type mattermostBridge struct { logger logger.Logger api plugin.API kvstore kvstore.KVStore userManager pluginModel.BridgeUserManager + botUserID string // Bot user ID for posting messages + remoteID string // Remote ID for shared channels + + // Message handling + messageHandler *mattermostMessageHandler + userResolver *mattermostUserResolver + incomingMessages chan *pluginModel.DirectionalMessage // Connection management connected atomic.Bool @@ -36,27 +48,41 @@ type mattermostBridge struct { } // NewBridge creates a new Mattermost bridge -func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge { +func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, botUserID, remoteID string) pluginModel.Bridge { ctx, cancel := context.WithCancel(context.Background()) - bridge := &mattermostBridge{ - logger: log, - api: api, - kvstore: kvstore, - ctx: ctx, - cancel: cancel, - channelMappings: make(map[string]string), - config: cfg, - userManager: bridge.NewUserManager("mattermost", log), + b := &mattermostBridge{ + logger: log, + api: api, + kvstore: kvstore, + botUserID: botUserID, + remoteID: remoteID, + ctx: ctx, + cancel: cancel, + channelMappings: make(map[string]string), + config: cfg, + userManager: bridge.NewUserManager("mattermost", log), + incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize), } - return bridge + // Initialize handlers after bridge is created + b.messageHandler = newMessageHandler(b) + b.userResolver = newUserResolver(b) + + return b +} + +// getConfiguration safely retrieves the current configuration +func (b *mattermostBridge) getConfiguration() *config.Configuration { + b.configMu.RLock() + defer b.configMu.RUnlock() + return b.config } // UpdateConfiguration updates the bridge configuration -func (b *mattermostBridge) UpdateConfiguration(newConfig any) error { - cfg, ok := newConfig.(*config.Configuration) - if !ok { - return fmt.Errorf("invalid configuration type") +func (b *mattermostBridge) UpdateConfiguration(cfg *config.Configuration) error { + // Validate configuration using built-in validation + if err := cfg.IsValid(); err != nil { + return fmt.Errorf("invalid configuration: %w", err) } b.configMu.Lock() @@ -282,8 +308,8 @@ func (b *mattermostBridge) DeleteChannelMapping(channelID string) error { return nil } -// RoomExists checks if a Mattermost channel exists on the server -func (b *mattermostBridge) RoomExists(roomID string) (bool, error) { +// ChannelMappingExists checks if a Mattermost channel exists on the server +func (b *mattermostBridge) ChannelMappingExists(roomID string) (bool, error) { if b.api == nil { return false, fmt.Errorf("Mattermost API not initialized") } @@ -332,7 +358,47 @@ func (b *mattermostBridge) GetRoomMapping(roomID string) (string, error) { return channelID, nil } +// GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge +func (b *mattermostBridge) GetChannelMappingForBridge(bridgeName, roomID string) (string, error) { + channelIDBytes, err := b.kvstore.Get(kvstore.BuildChannelMapKey(bridgeName, roomID)) + if err != nil { + // No mapping found is not an error, just return empty string + b.logger.LogDebug("No channel mapping found for bridge room", "bridge_name", bridgeName, "room_id", roomID) + return "", nil + } + + channelID := string(channelIDBytes) + b.logger.LogDebug("Found channel mapping for bridge room", "bridge_name", bridgeName, "room_id", roomID, "channel_id", channelID) + + return channelID, nil +} + // GetUserManager returns the user manager for this bridge func (b *mattermostBridge) GetUserManager() pluginModel.BridgeUserManager { return b.userManager } + +// GetMessageChannel returns the channel for incoming messages from Mattermost +func (b *mattermostBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage { + return b.incomingMessages +} + +// SendMessage sends a message to a Mattermost channel +func (b *mattermostBridge) SendMessage(msg *pluginModel.BridgeMessage) error { + return b.messageHandler.postMessageToMattermost(msg) +} + +// GetMessageHandler returns the message handler for this bridge +func (b *mattermostBridge) GetMessageHandler() pluginModel.MessageHandler { + return b.messageHandler +} + +// GetUserResolver returns the user resolver for this bridge +func (b *mattermostBridge) GetUserResolver() pluginModel.UserResolver { + return b.userResolver +} + +// GetRemoteID returns the remote ID used for shared channels registration +func (b *mattermostBridge) GetRemoteID() string { + return b.remoteID +} diff --git a/server/bridge/mattermost/message_handler.go b/server/bridge/mattermost/message_handler.go new file mode 100644 index 0000000..691ae7f --- /dev/null +++ b/server/bridge/mattermost/message_handler.go @@ -0,0 +1,208 @@ +package mattermost + +import ( + "fmt" + "strings" + + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" + mmModel "github.com/mattermost/mattermost/server/public/model" +) + +// mattermostMessageHandler handles incoming messages for the Mattermost bridge +type mattermostMessageHandler struct { + bridge *mattermostBridge + logger logger.Logger +} + +// newMessageHandler creates a new Mattermost message handler +func newMessageHandler(bridge *mattermostBridge) *mattermostMessageHandler { + return &mattermostMessageHandler{ + bridge: bridge, + logger: bridge.logger, + } +} + +// ProcessMessage processes an incoming message for the Mattermost bridge +func (h *mattermostMessageHandler) ProcessMessage(msg *pluginModel.DirectionalMessage) error { + h.logger.LogDebug("Processing message for Mattermost bridge", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "channel_id", msg.SourceChannelID) + + // Skip messages that originated from Mattermost to prevent loops + if msg.SourceBridge == "mattermost" { + h.logger.LogDebug("Skipping Mattermost-originated message to prevent loop") + return nil + } + + // For incoming messages to Mattermost, we post them to Mattermost channels + if msg.Direction == pluginModel.DirectionIncoming { + return h.postMessageToMattermost(msg.BridgeMessage) + } + + h.logger.LogDebug("Ignoring outgoing message for Mattermost bridge") + return nil +} + +// CanHandleMessage determines if this handler can process the message +func (h *mattermostMessageHandler) CanHandleMessage(msg *pluginModel.BridgeMessage) bool { + // Mattermost bridge can handle text messages that didn't originate from Mattermost + return msg.MessageType == "text" && msg.SourceBridge != "mattermost" +} + +// GetSupportedMessageTypes returns the message types this handler supports +func (h *mattermostMessageHandler) GetSupportedMessageTypes() []string { + return []string{"text"} +} + +// postMessageToMattermost posts a message to a Mattermost channel +func (h *mattermostMessageHandler) postMessageToMattermost(msg *pluginModel.BridgeMessage) error { + if h.bridge.api == nil { + return fmt.Errorf("Mattermost API not initialized") + } + + // Get the Mattermost channel ID from the channel mapping using the source bridge name + channelID, err := h.bridge.GetChannelMappingForBridge(msg.SourceBridge, msg.SourceChannelID) + if err != nil { + return fmt.Errorf("failed to get channel mapping: %w", err) + } + if channelID == "" { + // Check if the source channel ID is already a Mattermost channel ID + channelID = msg.SourceChannelID + } + + // Verify the channel exists + channel, appErr := h.bridge.api.GetChannel(channelID) + if appErr != nil { + return fmt.Errorf("failed to get channel %s: %w", channelID, appErr) + } + if channel == nil { + return fmt.Errorf("channel %s not found", channelID) + } + + // Format the message content + content := h.formatMessageContent(msg) + + // Create the post + post := &mmModel.Post{ + ChannelId: channelID, + UserId: h.bridge.botUserID, + Message: content, + Type: mmModel.PostTypeDefault, + Props: map[string]interface{}{ + "from_bridge": msg.SourceBridge, + "bridge_user_id": msg.SourceUserID, + "bridge_user_name": msg.SourceUserName, + "bridge_message_id": msg.MessageID, + "bridge_timestamp": msg.Timestamp.Unix(), + }, + } + + // Add thread ID if present + if msg.ThreadID != "" { + post.RootId = msg.ThreadID + } + + // Post the message as the plugin bot + createdPost, appErr := h.bridge.api.CreatePost(post) + if appErr != nil { + return fmt.Errorf("failed to create post in channel %s: %w", channelID, appErr) + } + + h.logger.LogDebug("Message posted to Mattermost channel", + "channel_id", channelID, + "post_id", createdPost.Id, + "source_bridge", msg.SourceBridge, + "content_length", len(content)) + + return nil +} + +// formatMessageContent formats the message content for Mattermost +func (h *mattermostMessageHandler) formatMessageContent(msg *pluginModel.BridgeMessage) string { + // For messages from other bridges, prefix with the bridge info and user name + if msg.SourceUserName != "" { + bridgeIcon := h.getBridgeIcon(msg.SourceBridge) + return fmt.Sprintf("%s **%s**: %s", bridgeIcon, msg.SourceUserName, msg.Content) + } + return msg.Content +} + +// getBridgeIcon returns an icon/emoji for the source bridge +func (h *mattermostMessageHandler) getBridgeIcon(bridgeType string) string { + switch bridgeType { + case "xmpp": + return ":speech_balloon:" // Chat bubble emoji for XMPP + case "slack": + return ":slack:" // Slack emoji if available + case "discord": + return ":discord:" // Discord emoji if available + default: + return ":bridge_at_night:" // Generic bridge emoji + } +} + +// mattermostUserResolver handles user resolution for the Mattermost bridge +type mattermostUserResolver struct { + bridge *mattermostBridge + logger logger.Logger +} + +// newUserResolver creates a new Mattermost user resolver +func newUserResolver(bridge *mattermostBridge) *mattermostUserResolver { + return &mattermostUserResolver{ + bridge: bridge, + logger: bridge.logger, + } +} + +// ResolveUser converts an external user ID to an ExternalUser +func (r *mattermostUserResolver) ResolveUser(externalUserID string) (*pluginModel.ExternalUser, error) { + r.logger.LogDebug("Resolving Mattermost user", "user_id", externalUserID) + + // For Mattermost, the external user ID is the Mattermost user ID + user, appErr := r.bridge.api.GetUser(externalUserID) + if appErr != nil { + return nil, fmt.Errorf("failed to get Mattermost user: %w", appErr) + } + + if user == nil { + return nil, fmt.Errorf("Mattermost user not found: %s", externalUserID) + } + + return &pluginModel.ExternalUser{ + BridgeType: "mattermost", + ExternalUserID: externalUserID, + DisplayName: r.GetDisplayName(externalUserID), + MattermostUserID: externalUserID, // Same as external ID for Mattermost + }, nil +} + +// FormatUserMention formats a user mention for Markdown content +func (r *mattermostUserResolver) FormatUserMention(user *pluginModel.ExternalUser) string { + // For Mattermost, use the standard @username format + return fmt.Sprintf("@%s", user.DisplayName) +} + +// GetDisplayName extracts display name from external user ID +func (r *mattermostUserResolver) GetDisplayName(externalUserID string) string { + // Try to get the actual username from Mattermost API + user, appErr := r.bridge.api.GetUser(externalUserID) + if appErr != nil || user == nil { + r.logger.LogWarn("Failed to get user for display name", "user_id", externalUserID) + return "Unknown User" + } + + // Prefer username, fallback to first name + last name, then to ID + if user.Username != "" { + return user.Username + } + + fullName := strings.TrimSpace(user.FirstName + " " + user.LastName) + if fullName != "" { + return fullName + } + + return user.Id[:8] // Show first 8 chars of ID as fallback +} diff --git a/server/bridge/messagebus.go b/server/bridge/messagebus.go new file mode 100644 index 0000000..6def69a --- /dev/null +++ b/server/bridge/messagebus.go @@ -0,0 +1,244 @@ +package bridge + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" +) + +const ( + // DefaultMessageBufferSize is the default buffer size for message channels + DefaultMessageBufferSize = 1000 + + // MessageDeliveryTimeout is the maximum time to wait for message delivery + MessageDeliveryTimeout = 5 * time.Second +) + +// messageBus implements the MessageBus interface +type messageBus struct { + // Core messaging + incomingMessages chan *model.DirectionalMessage + subscribers map[string]chan *model.DirectionalMessage + subscribersMu sync.RWMutex + + // Lifecycle management + ctx context.Context + cancel context.CancelFunc + logger logger.Logger + wg sync.WaitGroup + started bool + startMu sync.Mutex +} + +// NewMessageBus creates a new message bus instance +func NewMessageBus(logger logger.Logger) model.MessageBus { + ctx, cancel := context.WithCancel(context.Background()) + + return &messageBus{ + incomingMessages: make(chan *model.DirectionalMessage, DefaultMessageBufferSize), + subscribers: make(map[string]chan *model.DirectionalMessage), + ctx: ctx, + cancel: cancel, + logger: logger, + } +} + +// Subscribe returns a channel that receives messages for the specified bridge +func (mb *messageBus) Subscribe(bridgeName string) <-chan *model.DirectionalMessage { + mb.subscribersMu.Lock() + defer mb.subscribersMu.Unlock() + + // Create a buffered channel for this subscriber + ch := make(chan *model.DirectionalMessage, DefaultMessageBufferSize) + mb.subscribers[bridgeName] = ch + + mb.logger.LogDebug("Bridge subscribed to message bus", "bridge", bridgeName) + return ch +} + +// Publish sends a message to the message bus for routing +func (mb *messageBus) Publish(msg *model.DirectionalMessage) error { + if msg == nil { + return fmt.Errorf("message cannot be nil") + } + + if msg.BridgeMessage == nil { + return fmt.Errorf("bridge message cannot be nil") + } + + select { + case mb.incomingMessages <- msg: + mb.logger.LogDebug("Message published to bus", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "channel_id", msg.SourceChannelID) + return nil + case <-time.After(MessageDeliveryTimeout): + mb.logger.LogWarn("Message delivery timeout", + "source_bridge", msg.SourceBridge, + "channel_id", msg.SourceChannelID) + return fmt.Errorf("message delivery timeout") + case <-mb.ctx.Done(): + return fmt.Errorf("message bus is shutting down") + } +} + +// Start begins message routing +func (mb *messageBus) Start() error { + mb.startMu.Lock() + defer mb.startMu.Unlock() + + if mb.started { + return fmt.Errorf("message bus is already started") + } + + mb.logger.LogInfo("Starting message bus") + + // Start the message routing goroutine + mb.wg.Add(1) + go mb.routeMessages() + + mb.started = true + mb.logger.LogInfo("Message bus started successfully") + return nil +} + +// Stop ends message routing and cleans up resources +func (mb *messageBus) Stop() error { + mb.startMu.Lock() + defer mb.startMu.Unlock() + + if !mb.started { + return nil // Already stopped + } + + mb.logger.LogInfo("Stopping message bus") + + // Cancel context to signal shutdown + mb.cancel() + + // Wait for routing goroutine to finish + mb.wg.Wait() + + // Close all subscriber channels + mb.subscribersMu.Lock() + for bridgeName, ch := range mb.subscribers { + close(ch) + mb.logger.LogDebug("Closed subscriber channel", "bridge", bridgeName) + } + mb.subscribers = make(map[string]chan *model.DirectionalMessage) + mb.subscribersMu.Unlock() + + // Close incoming messages channel + close(mb.incomingMessages) + + mb.started = false + mb.logger.LogInfo("Message bus stopped successfully") + return nil +} + +// routeMessages handles the main message routing loop +func (mb *messageBus) routeMessages() { + defer mb.wg.Done() + + mb.logger.LogDebug("Message routing started") + + for { + select { + case msg, ok := <-mb.incomingMessages: + if !ok { + mb.logger.LogDebug("Incoming messages channel closed, stopping routing") + return + } + + if err := mb.routeMessage(msg); err != nil { + mb.logger.LogError("Failed to route message", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "error", err) + } + + case <-mb.ctx.Done(): + mb.logger.LogDebug("Context cancelled, stopping message routing") + return + } + } +} + +// routeMessage routes a single message to appropriate subscribers +func (mb *messageBus) routeMessage(msg *model.DirectionalMessage) error { + mb.subscribersMu.RLock() + defer mb.subscribersMu.RUnlock() + + routedCount := 0 + + // Route to specific target bridges if specified + if len(msg.TargetBridges) > 0 { + for _, targetBridge := range msg.TargetBridges { + if ch, exists := mb.subscribers[targetBridge]; exists { + if mb.deliverMessage(ch, msg, targetBridge) { + routedCount++ + } + } else { + mb.logger.LogWarn("Target bridge not subscribed", + "target_bridge", targetBridge, + "source_bridge", msg.SourceBridge) + } + } + } else { + // Route to all subscribers except the source bridge + for bridgeName, ch := range mb.subscribers { + if bridgeName != msg.SourceBridge { + if mb.deliverMessage(ch, msg, bridgeName) { + routedCount++ + } + } + } + } + + mb.logger.LogDebug("Message routed", + "source_bridge", msg.SourceBridge, + "routed_to_count", routedCount) + + return nil +} + +// deliverMessage attempts to deliver a message to a specific subscriber +func (mb *messageBus) deliverMessage(ch chan *model.DirectionalMessage, msg *model.DirectionalMessage, targetBridge string) bool { + select { + case ch <- msg: + return true + case <-time.After(MessageDeliveryTimeout): + mb.logger.LogWarn("Message delivery timeout to bridge", + "target_bridge", targetBridge, + "source_bridge", msg.SourceBridge) + return false + case <-mb.ctx.Done(): + return false + } +} + +// GetStats returns statistics about the message bus +func (mb *messageBus) GetStats() map[string]interface{} { + mb.subscribersMu.RLock() + defer mb.subscribersMu.RUnlock() + + stats := map[string]interface{}{ + "started": mb.started, + "subscriber_count": len(mb.subscribers), + "buffer_size": DefaultMessageBufferSize, + "pending_messages": len(mb.incomingMessages), + } + + subscribers := make([]string, 0, len(mb.subscribers)) + for bridgeName := range mb.subscribers { + subscribers = append(subscribers, bridgeName) + } + stats["subscribers"] = subscribers + + return stats +} diff --git a/server/bridge/xmpp/bridge.go b/server/bridge/xmpp/bridge.go index 4a1ffe6..902b8ae 100644 --- a/server/bridge/xmpp/bridge.go +++ b/server/bridge/xmpp/bridge.go @@ -18,6 +18,11 @@ import ( "github.com/mattermost/mattermost/server/public/plugin" ) +const ( + // defaultMessageBufferSize is the buffer size for incoming message channels + defaultMessageBufferSize = 1000 +) + // xmppBridge handles syncing messages between Mattermost and XMPP type xmppBridge struct { logger logger.Logger @@ -25,6 +30,12 @@ type xmppBridge struct { kvstore kvstore.KVStore bridgeClient *xmppClient.Client // Main bridge XMPP client connection userManager pluginModel.BridgeUserManager + remoteID string // Remote ID for shared channels + + // Message handling + messageHandler *xmppMessageHandler + userResolver *xmppUserResolver + incomingMessages chan *pluginModel.DirectionalMessage // Connection management connected atomic.Bool @@ -41,19 +52,25 @@ type xmppBridge struct { } // NewBridge creates a new XMPP bridge -func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge { +func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, remoteID string) pluginModel.Bridge { ctx, cancel := context.WithCancel(context.Background()) b := &xmppBridge{ - logger: log, - api: api, - kvstore: kvstore, - ctx: ctx, - cancel: cancel, - channelMappings: make(map[string]string), - config: cfg, - userManager: bridge.NewUserManager("xmpp", log), + logger: log, + api: api, + kvstore: kvstore, + ctx: ctx, + cancel: cancel, + channelMappings: make(map[string]string), + config: cfg, + userManager: bridge.NewUserManager("xmpp", log), + incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize), + remoteID: remoteID, } + // Initialize handlers after bridge is created + b.messageHandler = newMessageHandler(b) + b.userResolver = newUserResolver(b) + // Initialize XMPP client with configuration if cfg.EnableSync && cfg.XMPPServerURL != "" && cfg.XMPPUsername != "" && cfg.XMPPPassword != "" { b.bridgeClient = b.createXMPPClient(cfg) @@ -80,56 +97,50 @@ func (b *xmppBridge) createXMPPClient(cfg *config.Configuration) *xmppClient.Cli ) } +// getConfiguration safely retrieves the current configuration +func (b *xmppBridge) getConfiguration() *config.Configuration { + b.configMu.RLock() + defer b.configMu.RUnlock() + return b.config +} + // UpdateConfiguration updates the bridge configuration -func (b *xmppBridge) UpdateConfiguration(newConfig any) error { - cfg, ok := newConfig.(*config.Configuration) - if !ok { - return fmt.Errorf("invalid configuration type") +// It handles validation and reconnection logic when the configuration changes +func (b *xmppBridge) UpdateConfiguration(cfg *config.Configuration) error { + // Validate configuration using built-in validation + if err := cfg.IsValid(); err != nil { + return fmt.Errorf("invalid configuration: %w", err) } - b.configMu.Lock() - oldConfig := b.config - b.config = cfg - defer b.configMu.Unlock() + // Get current config to check if restart is needed + oldConfig := b.getConfiguration() - b.logger.LogInfo("XMPP bridge configuration updated") + // Update configuration under lock, then release immediately + b.configMu.Lock() + b.config = cfg // Initialize or update XMPP client with new configuration - if cfg.EnableSync { - if cfg.XMPPServerURL == "" || cfg.XMPPUsername == "" || cfg.XMPPPassword == "" { - return fmt.Errorf("XMPP server URL, username, and password are required when sync is enabled") + if !cfg.Equals(oldConfig) { + if b.bridgeClient != nil && b.bridgeClient.Disconnect() != nil { + b.logger.LogError("Failed to disconnect old XMPP bridge client") } - b.bridgeClient = b.createXMPPClient(cfg) - } else { - b.bridgeClient = nil + } + b.configMu.Unlock() + + // Stop the bridge + if err := b.Stop(); err != nil { + b.logger.LogWarn("Error stopping bridge during restart", "error", err) } - // Check if we need to restart the bridge due to configuration changes - wasConnected := b.connected.Load() - needsRestart := oldConfig != nil && !oldConfig.Equals(cfg) && wasConnected - - // Log the configuration change - if needsRestart { - b.logger.LogInfo("Configuration changed, restarting bridge") - } else { - b.logger.LogInfo("Configuration updated", "config", cfg) + // Start the bridge with new configuration + // Start() method already uses getConfiguration() safely + if err := b.Start(); err != nil { + b.logger.LogError("Failed to restart bridge with new configuration", "error", err) + return fmt.Errorf("failed to restart bridge: %w", err) } - if needsRestart { - b.logger.LogInfo("Configuration changed, restarting bridge") - - // Stop the bridge - if err := b.Stop(); err != nil { - b.logger.LogWarn("Error stopping bridge during restart", "error", err) - } - - // Start the bridge with new configuration - if err := b.Start(); err != nil { - b.logger.LogError("Failed to restart bridge with new configuration", "error", err) - return fmt.Errorf("failed to restart bridge: %w", err) - } - } + b.logger.LogDebug("XMPP bridge configuration updated successfully") return nil } @@ -166,6 +177,9 @@ func (b *xmppBridge) Start() error { // Start connection monitor go b.connectionMonitor() + // Start message aggregation + go b.startMessageAggregation() + b.logger.LogInfo("Mattermost to XMPP bridge started successfully") return nil } @@ -490,8 +504,8 @@ func (b *xmppBridge) DeleteChannelMapping(channelID string) error { return nil } -// RoomExists checks if an XMPP room exists on the remote service -func (b *xmppBridge) RoomExists(roomID string) (bool, error) { +// ChannelMappingExists checks if an XMPP room exists on the remote service +func (b *xmppBridge) ChannelMappingExists(roomID string) (bool, error) { if !b.connected.Load() { return false, fmt.Errorf("not connected to XMPP server") } @@ -535,7 +549,78 @@ func (b *xmppBridge) GetRoomMapping(roomID string) (string, error) { return channelID, nil } +// GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge +func (b *xmppBridge) GetChannelMappingForBridge(bridgeName, roomID string) (string, error) { + // Look up the channel ID using the bridge name and room ID as the key + channelIDBytes, err := b.kvstore.Get(kvstore.BuildChannelMapKey(bridgeName, roomID)) + if err != nil { + // No mapping found is not an error, just return empty string + b.logger.LogDebug("No channel mapping found for bridge room", "bridge_name", bridgeName, "room_id", roomID) + return "", nil + } + + channelID := string(channelIDBytes) + b.logger.LogDebug("Found channel mapping for bridge room", "bridge_name", bridgeName, "room_id", roomID, "channel_id", channelID) + + return channelID, nil +} + // GetUserManager returns the user manager for this bridge func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager { return b.userManager } + +// startMessageAggregation starts the message aggregation goroutine +func (b *xmppBridge) startMessageAggregation() { + clientChannel := b.bridgeClient.GetMessageChannel() + + for { + select { + case <-b.ctx.Done(): + b.logger.LogDebug("Stopping XMPP message aggregation") + return + case msg, ok := <-clientChannel: + if !ok { + b.logger.LogDebug("Bridge client message channel closed") + return + } + + // Forward to our bridge's message channel + select { + case b.incomingMessages <- msg: + // Message forwarded successfully + case <-b.ctx.Done(): + return + default: + b.logger.LogWarn("Bridge message channel full, dropping message", + "source_channel", msg.SourceChannelID, + "user_id", msg.SourceUserID) + } + } + } +} + +// GetMessageChannel returns the channel for incoming messages from XMPP +func (b *xmppBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage { + return b.incomingMessages +} + +// SendMessage sends a message to an XMPP room +func (b *xmppBridge) SendMessage(msg *pluginModel.BridgeMessage) error { + return b.messageHandler.sendMessageToXMPP(msg) +} + +// GetMessageHandler returns the message handler for this bridge +func (b *xmppBridge) GetMessageHandler() pluginModel.MessageHandler { + return b.messageHandler +} + +// GetUserResolver returns the user resolver for this bridge +func (b *xmppBridge) GetUserResolver() pluginModel.UserResolver { + return b.userResolver +} + +// GetRemoteID returns the remote ID used for shared channels registration +func (b *xmppBridge) GetRemoteID() string { + return b.remoteID +} diff --git a/server/bridge/xmpp/message_handler.go b/server/bridge/xmpp/message_handler.go new file mode 100644 index 0000000..4035901 --- /dev/null +++ b/server/bridge/xmpp/message_handler.go @@ -0,0 +1,166 @@ +package xmpp + +import ( + "fmt" + "strings" + + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" + xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp" +) + +// xmppMessageHandler handles incoming messages for the XMPP bridge +type xmppMessageHandler struct { + bridge *xmppBridge + logger logger.Logger +} + +// newMessageHandler creates a new XMPP message handler +func newMessageHandler(bridge *xmppBridge) *xmppMessageHandler { + return &xmppMessageHandler{ + bridge: bridge, + logger: bridge.logger, + } +} + +// ProcessMessage processes an incoming message for the XMPP bridge +func (h *xmppMessageHandler) ProcessMessage(msg *pluginModel.DirectionalMessage) error { + h.logger.LogDebug("Processing message for XMPP bridge", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "channel_id", msg.SourceChannelID) + + // Skip messages that originated from XMPP to prevent loops + if msg.SourceBridge == "xmpp" { + h.logger.LogDebug("Skipping XMPP-originated message to prevent loop") + return nil + } + + // For incoming messages to XMPP, we send them to XMPP rooms + if msg.Direction == pluginModel.DirectionIncoming { + return h.sendMessageToXMPP(msg.BridgeMessage) + } + + h.logger.LogDebug("Ignoring outgoing message for XMPP bridge") + return nil +} + +// CanHandleMessage determines if this handler can process the message +func (h *xmppMessageHandler) CanHandleMessage(msg *pluginModel.BridgeMessage) bool { + // XMPP bridge can handle text messages that didn't originate from XMPP + return msg.MessageType == "text" && msg.SourceBridge != "xmpp" +} + +// GetSupportedMessageTypes returns the message types this handler supports +func (h *xmppMessageHandler) GetSupportedMessageTypes() []string { + return []string{"text"} +} + +// sendMessageToXMPP sends a message to an XMPP room +func (h *xmppMessageHandler) sendMessageToXMPP(msg *pluginModel.BridgeMessage) error { + if h.bridge.bridgeClient == nil { + return fmt.Errorf("XMPP client not initialized") + } + + if !h.bridge.connected.Load() { + return fmt.Errorf("not connected to XMPP server") + } + + // Get the XMPP room JID from the channel mapping + roomJID, err := h.bridge.GetChannelMapping(msg.SourceChannelID) + if err != nil { + return fmt.Errorf("failed to get room mapping: %w", err) + } + if roomJID == "" { + return fmt.Errorf("channel is not mapped to any XMPP room") + } + + // Format the message content with user information + content := h.formatMessageContent(msg) + + // Create XMPP message request + req := xmppClient.MessageRequest{ + RoomJID: roomJID, + Message: content, + } + + // Send the message + _, err = h.bridge.bridgeClient.SendMessage(req) + if err != nil { + return fmt.Errorf("failed to send message to XMPP room: %w", err) + } + + h.logger.LogDebug("Message sent to XMPP room", + "channel_id", msg.SourceChannelID, + "room_jid", roomJID, + "content_length", len(content)) + + return nil +} + +// formatMessageContent formats the message content for XMPP +func (h *xmppMessageHandler) formatMessageContent(msg *pluginModel.BridgeMessage) string { + // For messages from other bridges, prefix with the user name + if msg.SourceUserName != "" { + return fmt.Sprintf("<%s> %s", msg.SourceUserName, msg.Content) + } + return msg.Content +} + +// xmppUserResolver handles user resolution for the XMPP bridge +type xmppUserResolver struct { + bridge *xmppBridge + logger logger.Logger +} + +// newUserResolver creates a new XMPP user resolver +func newUserResolver(bridge *xmppBridge) *xmppUserResolver { + return &xmppUserResolver{ + bridge: bridge, + logger: bridge.logger, + } +} + +// ResolveUser converts an external user ID to an ExternalUser +func (r *xmppUserResolver) ResolveUser(externalUserID string) (*pluginModel.ExternalUser, error) { + r.logger.LogDebug("Resolving XMPP user", "user_id", externalUserID) + + // For XMPP, the external user ID is typically the full JID + return &pluginModel.ExternalUser{ + BridgeType: "xmpp", + ExternalUserID: externalUserID, + DisplayName: r.GetDisplayName(externalUserID), + MattermostUserID: "", // Will be resolved by user mapping system + }, nil +} + +// FormatUserMention formats a user mention for Markdown content +func (r *xmppUserResolver) FormatUserMention(user *pluginModel.ExternalUser) string { + // For XMPP, we can format mentions as simple text with the display name + return fmt.Sprintf("@%s", user.DisplayName) +} + +// GetDisplayName extracts display name from external user ID +func (r *xmppUserResolver) GetDisplayName(externalUserID string) string { + // For XMPP JIDs, extract the local part or resource as display name + // Format: user@domain/resource -> use resource or user + if len(externalUserID) == 0 { + return "Unknown User" + } + + // Try to parse as JID and extract meaningful display name + parts := strings.Split(externalUserID, "/") + if len(parts) > 1 { + // Has resource part, use it as display name + return parts[1] + } + + // No resource, try to extract local part from user@domain + atIndex := strings.Index(externalUserID, "@") + if atIndex > 0 { + return externalUserID[:atIndex] + } + + // Fallback to the full ID + return externalUserID +} diff --git a/server/command/command.go b/server/command/command.go index d15b9f5..686eb97 100644 --- a/server/command/command.go +++ b/server/command/command.go @@ -6,11 +6,13 @@ import ( pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/plugin" "github.com/mattermost/mattermost/server/public/pluginapi" ) type Handler struct { client *pluginapi.Client + api plugin.API bridgeManager pluginModel.BridgeManager } @@ -22,7 +24,7 @@ type Command interface { const xmppBridgeCommandTrigger = "xmppbridge" // Register all your slash commands in the NewCommandHandler function. -func NewCommandHandler(client *pluginapi.Client, bridgeManager pluginModel.BridgeManager) Command { +func NewCommandHandler(client *pluginapi.Client, api plugin.API, bridgeManager pluginModel.BridgeManager) Command { // Register XMPP bridge command xmppBridgeData := model.NewAutocompleteData(xmppBridgeCommandTrigger, "", "Manage XMPP bridge") mapSubcommand := model.NewAutocompleteData("map", "[room_jid]", "Map current channel to XMPP room") @@ -35,11 +37,17 @@ func NewCommandHandler(client *pluginapi.Client, bridgeManager pluginModel.Bridg statusSubcommand := model.NewAutocompleteData("status", "", "Show bridge connection status") xmppBridgeData.AddCommand(statusSubcommand) + syncSubcommand := model.NewAutocompleteData("sync", "", "Force sync with shared channels if channel is mapped") + xmppBridgeData.AddCommand(syncSubcommand) + + syncResetSubcommand := model.NewAutocompleteData("sync-reset", "", "Reset sync cursor for channel if mapped") + xmppBridgeData.AddCommand(syncResetSubcommand) + err := client.SlashCommand.Register(&model.Command{ Trigger: xmppBridgeCommandTrigger, AutoComplete: true, AutoCompleteDesc: "Manage XMPP bridge mappings", - AutoCompleteHint: "[map|unmap|status]", + AutoCompleteHint: "[map|unmap|status|sync|sync-reset]", AutocompleteData: xmppBridgeData, }) if err != nil { @@ -48,6 +56,7 @@ func NewCommandHandler(client *pluginapi.Client, bridgeManager pluginModel.Bridg return &Handler{ client: client, + api: api, bridgeManager: bridgeManager, } } @@ -85,6 +94,8 @@ func (c *Handler) executeXMPPBridgeCommand(args *model.CommandArgs) *model.Comma - ` + "`/xmppbridge map `" + ` - Map current channel to XMPP room - ` + "`/xmppbridge unmap`" + ` - Unmap current channel from XMPP room - ` + "`/xmppbridge status`" + ` - Show bridge connection status +- ` + "`/xmppbridge sync`" + ` - Force sync with shared channels if channel is mapped +- ` + "`/xmppbridge sync-reset`" + ` - Reset sync cursor for channel if mapped **Example:** ` + "`/xmppbridge map general@conference.example.com`", @@ -99,6 +110,10 @@ func (c *Handler) executeXMPPBridgeCommand(args *model.CommandArgs) *model.Comma return c.executeUnmapCommand(args) case "status": return c.executeStatusCommand(args) + case "sync": + return c.executeSyncCommand(args) + case "sync-reset": + return c.executeSyncResetCommand(args) default: return &model.CommandResponse{ ResponseType: model.CommandResponseTypeEphemeral, @@ -161,11 +176,11 @@ func (c *Handler) executeMapCommand(args *model.CommandArgs, fields []string) *m // Create the mapping using BridgeManager mappingReq := pluginModel.CreateChannelMappingRequest{ - ChannelID: channelID, - BridgeName: "xmpp", - BridgeRoomID: roomJID, - UserID: args.UserId, - TeamID: args.TeamId, + ChannelID: channelID, + BridgeName: "xmpp", + BridgeChannelID: roomJID, + UserID: args.UserId, + TeamID: args.TeamId, } err = c.bridgeManager.CreateChannelMapping(mappingReq) @@ -284,6 +299,105 @@ func (c *Handler) isSystemAdmin(userID string) bool { } // formatMappingError provides user-friendly error messages for mapping operations +func (c *Handler) executeSyncCommand(args *model.CommandArgs) *model.CommandResponse { + channelID := args.ChannelId + + // Get the XMPP bridge to check if channel is mapped + bridge, err := c.bridgeManager.GetBridge("xmpp") + if err != nil { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: "❌ XMPP bridge is not available. Please check the plugin configuration.", + } + } + + // Check if channel is mapped to XMPP + roomJID, err := bridge.GetChannelMapping(channelID) + if err != nil { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: fmt.Sprintf("Error checking channel mapping: %v", err), + } + } + + if roomJID == "" { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: "❌ This channel is not mapped to any XMPP room. Use `/xmppbridge map ` to create a mapping first.", + } + } + + // Force sync with shared channels + if err := c.api.SyncSharedChannel(channelID); err != nil { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: fmt.Sprintf("❌ Failed to sync channel: %v", err), + } + } + + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: fmt.Sprintf("✅ Successfully triggered sync for channel mapped to XMPP room: `%s`", roomJID), + } +} + +func (c *Handler) executeSyncResetCommand(args *model.CommandArgs) *model.CommandResponse { + channelID := args.ChannelId + + // Get the XMPP bridge to check if channel is mapped + bridge, err := c.bridgeManager.GetBridge("xmpp") + if err != nil { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: "❌ XMPP bridge is not available. Please check the plugin configuration.", + } + } + + // Check if channel is mapped to XMPP + roomJID, err := bridge.GetChannelMapping(channelID) + if err != nil { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: fmt.Sprintf("Error checking channel mapping: %v", err), + } + } + + if roomJID == "" { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: "❌ This channel is not mapped to any XMPP room. Use `/xmppbridge map ` to create a mapping first.", + } + } + + // Get remoteID from bridge for cursor operations + remoteID := bridge.GetRemoteID() + if remoteID == "" { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: "❌ Bridge remote ID not available. Cannot reset sync cursor.", + } + } + + // Create empty cursor to reset to beginning + emptyCursor := model.GetPostsSinceForSyncCursor{ + LastPostUpdateAt: 1, + LastPostCreateAt: 1, + } + + // Reset sync cursor using UpdateSharedChannelCursor + if err := c.api.UpdateSharedChannelCursor(channelID, remoteID, emptyCursor); err != nil { + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: fmt.Sprintf("❌ Failed to reset sync cursor: %v", err), + } + } + + return &model.CommandResponse{ + ResponseType: model.CommandResponseTypeEphemeral, + Text: fmt.Sprintf("✅ Successfully reset sync cursor for channel mapped to XMPP room: `%s`", roomJID), + } +} + func (c *Handler) formatMappingError(operation, roomJID string, err error) *model.CommandResponse { errorMsg := err.Error() @@ -292,7 +406,7 @@ func (c *Handler) formatMappingError(operation, roomJID string, err error) *mode case strings.Contains(errorMsg, "already mapped to channel"): return &model.CommandResponse{ ResponseType: model.CommandResponseTypeEphemeral, - Text: fmt.Sprintf(`❌ **Room Already Mapped** + Text: fmt.Sprintf(`❌ **Channel Already Mapped** The XMPP room **%s** is already connected to another channel. @@ -305,7 +419,7 @@ The XMPP room **%s** is already connected to another channel. case strings.Contains(errorMsg, "does not exist"): return &model.CommandResponse{ ResponseType: model.CommandResponseTypeEphemeral, - Text: fmt.Sprintf(`❌ **Room Not Found** + Text: fmt.Sprintf(`❌ **Channel Not Found** The XMPP room **%s** doesn't exist or isn't accessible. diff --git a/server/configuration.go b/server/configuration.go index 79fa130..b6bad1f 100644 --- a/server/configuration.go +++ b/server/configuration.go @@ -57,7 +57,7 @@ func (p *Plugin) OnConfigurationChange() error { return errors.Wrap(err, "failed to load plugin configuration") } - p.API.LogDebug("Loaded configuration in OnConfigurationChange", "configuration", configuration) + p.API.LogDebug("Plugin configuration changed") // Validate the configuration if err := configuration.IsValid(); err != nil { diff --git a/server/hooks_sharedchannels.go b/server/hooks_sharedchannels.go index 798ba0f..e49df2b 100644 --- a/server/hooks_sharedchannels.go +++ b/server/hooks_sharedchannels.go @@ -1,18 +1,24 @@ package main -import "github.com/mattermost/mattermost/server/public/model" +import ( + "fmt" + "time" + + pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" + "github.com/mattermost/mattermost/server/public/model" +) // OnSharedChannelsPing is called to check if the bridge is healthy and ready to process messages func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool { config := p.getConfiguration() - p.logger.LogDebug("OnSharedChannelsPing called", "remote_cluster_id", remoteCluster.RemoteId) - var remoteClusterID string if remoteCluster != nil { remoteClusterID = remoteCluster.RemoteId } + p.logger.LogDebug("OnSharedChannelsPing called", "remote_cluster_id", remoteClusterID) + p.logger.LogDebug("Received shared channels ping", "remote_cluster_id", remoteClusterID) // If sync is disabled, we're still "healthy" but not actively processing @@ -44,3 +50,119 @@ func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool { p.logger.LogDebug("Shared channels ping successful - XMPP bridge is healthy", "remote_cluster_id", remoteClusterID) return true } + +// OnSharedChannelsSyncMsg processes sync messages from Mattermost shared channels and routes them to XMPP +func (p *Plugin) OnSharedChannelsSyncMsg(msg *model.SyncMsg, rc *model.RemoteCluster) (model.SyncResponse, error) { + config := p.getConfiguration() + + // Initialize sync response + now := model.GetMillis() + response := model.SyncResponse{ + PostsLastUpdateAt: now, + UsersLastUpdateAt: now, + ReactionsLastUpdateAt: now, + } + + var remoteClusterID string + if rc != nil { + remoteClusterID = rc.RemoteId + } + + p.logger.LogDebug("OnSharedChannelsSyncMsg called", + "remote_cluster_id", remoteClusterID, + "channel_id", msg.ChannelId, + "post_count", len(msg.Posts)) + + // If sync is disabled, return success but don't process + if !config.EnableSync { + p.logger.LogDebug("Sync message received but sync is disabled", "remote_cluster_id", remoteClusterID) + return response, nil + } + + // Process each post in the sync message + var processedCount int + var errors []string + + for _, post := range msg.Posts { + if err := p.processSyncPost(post, msg.ChannelId, msg.Users); err != nil { + errorMsg := fmt.Sprintf("failed to process post %s: %v", post.Id, err) + errors = append(errors, errorMsg) + p.logger.LogError("Failed to process sync post", "post_id", post.Id, "error", err) + } else { + processedCount++ + } + } + + p.logger.LogInfo("Processed sync message", + "remote_cluster_id", remoteClusterID, + "channel_id", msg.ChannelId, + "processed_posts", processedCount, + "failed_posts", len(errors)) + + // If we have errors, return them + if len(errors) > 0 { + return response, fmt.Errorf("failed to process %d posts: %v", len(errors), errors) + } + + return response, nil +} + +// processSyncPost converts a Mattermost post to a bridge message and routes it to XMPP +func (p *Plugin) processSyncPost(post *model.Post, channelID string, users map[string]*model.User) error { + // Find the user who created this post + var postUser *model.User + p.logger.LogInfo("Processing sync post", "post_id", post.UserId, "users", users) + if users != nil { + postUser = users[post.UserId] + } + + // If user not found in sync data, try to get from API + if postUser == nil { + var err error + postUser, err = p.API.GetUser(post.UserId) + if err != nil { + p.logger.LogWarn("Failed to get user for post", "user_id", post.UserId, "post_id", post.Id, "error", err) + // Create a placeholder user + postUser = &model.User{ + Id: post.UserId, + Username: "unknown-user", + } + } + } + + // Create bridge message from Mattermost post + bridgeMessage := &pluginModel.BridgeMessage{ + SourceBridge: "mattermost", + SourceChannelID: channelID, + SourceUserID: postUser.Id, + SourceUserName: postUser.Username, + Content: post.Message, + MessageType: "text", // TODO: Handle other message types + Timestamp: time.Unix(post.CreateAt/1000, 0), + MessageID: post.Id, + ThreadID: post.RootId, + TargetBridges: []string{"xmpp"}, // Route to XMPP + Metadata: map[string]any{ + "original_post": post, + "channel_id": channelID, + }, + } + + // Create directional message for outgoing (Mattermost -> XMPP) + directionalMessage := &pluginModel.DirectionalMessage{ + BridgeMessage: bridgeMessage, + Direction: pluginModel.DirectionOutgoing, + } + + // Publish the message to the message bus for routing to XMPP bridge + if err := p.bridgeManager.PublishMessage(directionalMessage); err != nil { + return fmt.Errorf("failed to publish sync message to message bus: %w", err) + } + + p.logger.LogDebug("Successfully published sync message to message bus", + "post_id", post.Id, + "channel_id", channelID, + "user", postUser.Username) + + return nil +} diff --git a/server/model/bridge.go b/server/model/bridge.go index bd00672..dcf67df 100644 --- a/server/model/bridge.go +++ b/server/model/bridge.go @@ -20,11 +20,11 @@ const ( // CreateChannelMappingRequest contains information needed to create a channel mapping type CreateChannelMappingRequest struct { - ChannelID string // Mattermost channel ID - BridgeName string // Name of the bridge (e.g., "xmpp") - BridgeRoomID string // Remote room/channel ID (e.g., JID for XMPP) - UserID string // ID of user who triggered the mapping creation - TeamID string // Team ID where the channel belongs + ChannelID string // Mattermost channel ID + BridgeName string // Name of the bridge (e.g., "xmpp") + BridgeChannelID string // Remote room/channel ID (e.g., JID for XMPP) + UserID string // ID of user who triggered the mapping creation + TeamID string // Team ID where the channel belongs } // Validate checks if all required fields are present and valid @@ -35,8 +35,8 @@ func (r CreateChannelMappingRequest) Validate() error { if r.BridgeName == "" { return fmt.Errorf("bridgeName cannot be empty") } - if r.BridgeRoomID == "" { - return fmt.Errorf("bridgeRoomID cannot be empty") + if r.BridgeChannelID == "" { + return fmt.Errorf("bridgeChannelID cannot be empty") } if r.UserID == "" { return fmt.Errorf("userID cannot be empty") @@ -73,6 +73,9 @@ func (r DeleteChannelMappingRequest) Validate() error { } type BridgeManager interface { + // Start starts the bridge manager and message routing system. + Start() error + // RegisterBridge registers a bridge with the given name. Returns an error if the name is empty, // the bridge is nil, or a bridge with the same name is already registered. RegisterBridge(name string, bridge Bridge) error @@ -110,18 +113,21 @@ type BridgeManager interface { // OnPluginConfigurationChange propagates configuration changes to all registered bridges. // Returns an error if any bridge fails to update its configuration, but continues to // attempt updating all bridges. - OnPluginConfigurationChange(config any) error + OnPluginConfigurationChange(config *config.Configuration) error // CreateChannelMapping is called when a channel mapping is created. CreateChannelMapping(req CreateChannelMappingRequest) error // DeleteChannepMapping is called when a channel mapping is deleted. DeleteChannepMapping(req DeleteChannelMappingRequest) error + + // PublishMessage publishes a message to the message bus for routing to target bridges + PublishMessage(msg *DirectionalMessage) error } type Bridge interface { // UpdateConfiguration updates the bridge configuration - UpdateConfiguration(config any) error + UpdateConfiguration(config *config.Configuration) error // Start starts the bridge Start() error @@ -129,20 +135,20 @@ type Bridge interface { // Stop stops the bridge Stop() error - // CreateChannelMapping creates a mapping between a Mattermost channel ID and an bridge room ID. + // CreateChannelMapping creates a mapping between a Mattermost channel ID and a bridge channel ID. CreateChannelMapping(channelID, roomJID string) error - // GetChannelMapping retrieves the bridge room ID for a given Mattermost channel ID. + // GetChannelMapping retrieves the bridge channel ID for a given Mattermost channel ID. GetChannelMapping(channelID string) (string, error) - // DeleteChannelMapping removes a mapping between a Mattermost channel ID and a bridge room ID. + // DeleteChannelMapping removes a mapping between a Mattermost channel ID and a bridge channel ID. DeleteChannelMapping(channelID string) error - // RoomExists checks if a room/channel exists on the remote service. - RoomExists(roomID string) (bool, error) + // ChannelMappingExists checks if a room/channel exists on the remote service. + ChannelMappingExists(roomID string) (bool, error) - // GetRoomMapping retrieves the Mattermost channel ID for a given room ID (reverse lookup). - GetRoomMapping(roomID string) (string, error) + // GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge. + GetChannelMappingForBridge(bridgeName, roomID string) (string, error) // IsConnected checks if the bridge is connected to the remote service. IsConnected() bool @@ -152,6 +158,15 @@ type Bridge interface { // GetUserManager returns the user manager for this bridge. GetUserManager() BridgeUserManager + + // Message handling for bidirectional communication + GetMessageChannel() <-chan *DirectionalMessage + SendMessage(msg *BridgeMessage) error + GetMessageHandler() MessageHandler + GetUserResolver() UserResolver + + // GetRemoteID returns the remote ID used for shared channels registration + GetRemoteID() string } // BridgeUser represents a user connected to any bridge service diff --git a/server/model/message.go b/server/model/message.go new file mode 100644 index 0000000..747b0f2 --- /dev/null +++ b/server/model/message.go @@ -0,0 +1,88 @@ +package model + +import ( + "time" +) + +// MessageDirection indicates the direction of message flow +type MessageDirection string + +const ( + DirectionIncoming MessageDirection = "incoming" // From external system to us + DirectionOutgoing MessageDirection = "outgoing" // From us to external system +) + +// BridgeMessage represents a message that can be passed between any bridge types +type BridgeMessage struct { + // Source information + SourceBridge string // "xmpp", "mattermost", "slack", etc. + SourceChannelID string // Channel ID in source system + SourceUserID string // User ID in source system (JID, user ID, etc.) + SourceUserName string // Display name in source system + + // Message content (standardized on Markdown) + Content string // Markdown formatted message content + MessageType string // "text", "image", "file", etc. + + // Metadata + Timestamp time.Time // When message was received + MessageID string // Unique message ID from source + ThreadID string // Thread/reply ID (if applicable) + + // Routing hints + TargetBridges []string // Which bridges should receive this + Metadata map[string]any // Bridge-specific metadata +} + +// DirectionalMessage wraps a BridgeMessage with direction information +type DirectionalMessage struct { + *BridgeMessage + Direction MessageDirection +} + +// ExternalUser represents a user from any bridge system +type ExternalUser struct { + BridgeType string // "xmpp", "slack", etc. + ExternalUserID string // JID, Slack user ID, etc. + DisplayName string // How to display this user + MattermostUserID string // Mapped Mattermost user (if exists) +} + +// UserResolver handles user resolution for a specific bridge +type UserResolver interface { + // ResolveUser converts an external user ID to an ExternalUser + ResolveUser(externalUserID string) (*ExternalUser, error) + + // FormatUserMention formats a user mention for Markdown content + FormatUserMention(user *ExternalUser) string + + // GetDisplayName extracts display name from external user ID + GetDisplayName(externalUserID string) string +} + +// MessageBus handles routing messages between bridges +type MessageBus interface { + // Subscribe returns a channel that receives messages for the specified bridge + Subscribe(bridgeName string) <-chan *DirectionalMessage + + // Publish sends a message to the message bus for routing + Publish(msg *DirectionalMessage) error + + // Start begins message routing + Start() error + + // Stop ends message routing and cleans up resources + Stop() error +} + +// MessageHandler processes incoming messages for a bridge +type MessageHandler interface { + // ProcessMessage handles an incoming message + ProcessMessage(msg *DirectionalMessage) error + + // CanHandleMessage determines if this handler can process the message + CanHandleMessage(msg *BridgeMessage) bool + + // GetSupportedMessageTypes returns the message types this handler supports + GetSupportedMessageTypes() []string +} diff --git a/server/plugin.go b/server/plugin.go index f7a5f81..97bb691 100644 --- a/server/plugin.go +++ b/server/plugin.go @@ -14,7 +14,6 @@ import ( "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore" - "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp" "github.com/mattermost/mattermost/server/public/model" "github.com/mattermost/mattermost/server/public/plugin" "github.com/mattermost/mattermost/server/public/pluginapi" @@ -35,9 +34,6 @@ type Plugin struct { // commandClient is the client used to register and execute slash commands. commandClient command.Command - // xmppClient is the client used to communicate with XMPP servers. - xmppClient *xmpp.Client - // logger is the main plugin logger logger logger.Logger @@ -70,15 +66,11 @@ func (p *Plugin) OnActivate() error { p.kvstore = kvstore.NewKVStore(p.client) - p.initXMPPClient() - // Load configuration directly cfg := p.getConfiguration() - p.logger.LogDebug("Loaded configuration in OnActivate", "config", cfg) // Register the plugin for shared channels if err := p.registerForSharedChannels(); err != nil { - p.logger.LogError("Failed to register for shared channels", "error", err) return fmt.Errorf("failed to register for shared channels: %w", err) } @@ -87,11 +79,15 @@ func (p *Plugin) OnActivate() error { // Initialize and register bridges with current configuration if err := p.initBridges(*cfg); err != nil { - p.logger.LogError("Failed to initialize bridges", "error", err) return fmt.Errorf("failed to initialize bridges: %w", err) } - p.commandClient = command.NewCommandHandler(p.client, p.bridgeManager) + p.commandClient = command.NewCommandHandler(p.client, p.API, p.bridgeManager) + + // Start the bridge manager (this starts message routing) + if err := p.bridgeManager.Start(); err != nil { + return fmt.Errorf("failed to start bridge manager: %w", err) + } // Start all bridges for _, bridgeName := range p.bridgeManager.ListBridges() { @@ -145,18 +141,6 @@ func (p *Plugin) ExecuteCommand(c *plugin.Context, args *model.CommandArgs) (*mo return response, nil } -func (p *Plugin) initXMPPClient() { - cfg := p.getConfiguration() - p.xmppClient = xmpp.NewClient( - cfg.XMPPServerURL, - cfg.XMPPUsername, - cfg.XMPPPassword, - cfg.GetXMPPResource(), - p.remoteID, - p.logger, - ) -} - func (p *Plugin) initBridges(cfg config.Configuration) error { // Create and register XMPP bridge xmppBridge := xmppbridge.NewBridge( @@ -164,6 +148,7 @@ func (p *Plugin) initBridges(cfg config.Configuration) error { p.API, p.kvstore, &cfg, + p.remoteID, ) if err := p.bridgeManager.RegisterBridge("xmpp", xmppBridge); err != nil { @@ -176,6 +161,8 @@ func (p *Plugin) initBridges(cfg config.Configuration) error { p.API, p.kvstore, &cfg, + p.botUserID, + "mattermost", ) if err := p.bridgeManager.RegisterBridge("mattermost", mattermostBridge); err != nil { @@ -203,7 +190,7 @@ func (p *Plugin) registerForSharedChannels() error { PluginID: manifest.Id, CreatorID: botUserID, AutoShareDMs: false, - AutoInvited: true, + AutoInvited: false, } remoteID, appErr := p.API.RegisterPluginForSharedChannels(opts) diff --git a/server/store/kvstore/constants.go b/server/store/kvstore/constants.go index 579bd5c..698a82c 100644 --- a/server/store/kvstore/constants.go +++ b/server/store/kvstore/constants.go @@ -7,72 +7,17 @@ import "strings" // to ensure consistency and avoid key conflicts. const ( - // CurrentKVStoreVersion is the current version requiring migrations - CurrentKVStoreVersion = 2 - // KeyPrefixXMPPUser is the prefix for XMPP user ID -> Mattermost user ID mappings - KeyPrefixXMPPUser = "xmpp_user_" - // KeyPrefixMattermostUser is the prefix for Mattermost user ID -> XMPP user ID mappings - KeyPrefixMattermostUser = "mattermost_user_" - // KeyPrefixChannelMap is the prefix for bridge-agnostic channel mappings KeyPrefixChannelMap = "channel_map_" - - // KeyPrefixGhostUser is the prefix for Mattermost user ID -> XMPP ghost user ID cache - KeyPrefixGhostUser = "ghost_user_" - // KeyPrefixGhostRoom is the prefix for ghost user room membership tracking - KeyPrefixGhostRoom = "ghost_room_" - - // KeyPrefixXMPPEventPost is the prefix for XMPP event ID -> Mattermost post ID mappings - KeyPrefixXMPPEventPost = "xmpp_event_post_" - // KeyPrefixXMPPReaction is the prefix for XMPP reaction event ID -> reaction info mappings - KeyPrefixXMPPReaction = "xmpp_reaction_" - - // KeyStoreVersion is the key for tracking the current KV store schema version - KeyStoreVersion = "kv_store_version" - - // KeyPrefixLegacyDMMapping was the old prefix for DM mappings - KeyPrefixLegacyDMMapping = "dm_mapping_" - // KeyPrefixLegacyXMPPDMMapping was the old prefix for XMPP DM mappings - KeyPrefixLegacyXMPPDMMapping = "xmpp_dm_mapping_" ) // Helper functions for building KV store keys -// BuildXMPPUserKey creates a key for XMPP user -> Mattermost user mapping -func BuildXMPPUserKey(xmppUserID string) string { - return KeyPrefixXMPPUser + xmppUserID -} - -// BuildMattermostUserKey creates a key for Mattermost user -> XMPP user mapping -func BuildMattermostUserKey(mattermostUserID string) string { - return KeyPrefixMattermostUser + mattermostUserID -} - // BuildChannelMapKey creates a bridge-agnostic key for channel mappings func BuildChannelMapKey(bridgeName, identifier string) string { return KeyPrefixChannelMap + bridgeName + "_" + identifier } -// BuildGhostUserKey creates a key for ghost user cache -func BuildGhostUserKey(mattermostUserID string) string { - return KeyPrefixGhostUser + mattermostUserID -} - -// BuildGhostRoomKey creates a key for ghost user room membership -func BuildGhostRoomKey(mattermostUserID, roomID string) string { - return KeyPrefixGhostRoom + mattermostUserID + "_" + roomID -} - -// BuildXMPPEventPostKey creates a key for XMPP event -> post mapping -func BuildXMPPEventPostKey(xmppEventID string) string { - return KeyPrefixXMPPEventPost + xmppEventID -} - -// BuildXMPPReactionKey creates a key for XMPP reaction storage -func BuildXMPPReactionKey(reactionEventID string) string { - return KeyPrefixXMPPReaction + reactionEventID -} - // ExtractIdentifierFromChannelMapKey extracts the identifier from a bridge-agnostic channel map key func ExtractIdentifierFromChannelMapKey(key, bridgeName string) string { expectedPrefix := KeyPrefixChannelMap + bridgeName + "_" @@ -83,4 +28,4 @@ func ExtractIdentifierFromChannelMapKey(key, bridgeName string) string { return "" } return key[len(expectedPrefix):] -} +} \ No newline at end of file diff --git a/server/xmpp/client.go b/server/xmpp/client.go index aad5e24..537711a 100644 --- a/server/xmpp/client.go +++ b/server/xmpp/client.go @@ -6,10 +6,15 @@ import ( "crypto/tls" "encoding/xml" "fmt" + "net" + "net/url" "time" + "github.com/jellydator/ttlcache/v3" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" "mellium.im/sasl" + "mellium.im/xmlstream" "mellium.im/xmpp" "mellium.im/xmpp/disco" "mellium.im/xmpp/jid" @@ -18,6 +23,17 @@ import ( "mellium.im/xmpp/stanza" ) +const ( + // defaultOperationTimeout is the default timeout for XMPP operations + defaultOperationTimeout = 5 * time.Second + + // msgBufferSize is the buffer size for incoming message channels + msgBufferSize = 1000 + + // messageDedupeTTL is the TTL for message deduplication cache + messageDedupeTTL = 30 * time.Second +) + // Client represents an XMPP client for communicating with XMPP servers. type Client struct { serverURL string @@ -38,6 +54,12 @@ type Client struct { mux *mux.ServeMux sessionReady chan struct{} sessionServing bool + + // Message handling for bridge integration + incomingMessages chan *model.DirectionalMessage + + // Message deduplication cache to handle XMPP server duplicates + dedupeCache *ttlcache.Cache[string, time.Time] } // MessageRequest represents a request to send a message. @@ -85,22 +107,40 @@ type UserProfile struct { // NewClient creates a new XMPP client. func NewClient(serverURL, username, password, resource, remoteID string, logger logger.Logger) *Client { ctx, cancel := context.WithCancel(context.Background()) - mucClient := &muc.Client{} - mux := mux.New("jabber:client", muc.HandleClient(mucClient)) - return &Client{ - serverURL: serverURL, - username: username, - password: password, - resource: resource, - remoteID: remoteID, - logger: logger, - ctx: ctx, - cancel: cancel, - mucClient: mucClient, - mux: mux, - sessionReady: make(chan struct{}), + // Create TTL cache for message deduplication + dedupeCache := ttlcache.New( + ttlcache.WithTTL[string, time.Time](messageDedupeTTL), + ) + + // Start automatic cleanup in background + go dedupeCache.Start() + + client := &Client{ + serverURL: serverURL, + username: username, + password: password, + resource: resource, + remoteID: remoteID, + logger: logger, + ctx: ctx, + cancel: cancel, + sessionReady: make(chan struct{}), + incomingMessages: make(chan *model.DirectionalMessage, msgBufferSize), + dedupeCache: dedupeCache, } + + // Create MUC client and set up message handling + mucClient := &muc.Client{} + client.mucClient = mucClient + + // Create mux with MUC client and our message handler + mux := mux.New("jabber:client", + muc.HandleClient(mucClient), + mux.MessageFunc(stanza.GroupChatMessage, xml.Name{}, client.handleIncomingMessage)) + client.mux = mux + + return client } // NewClientWithTLS creates a new XMPP client with custom TLS configuration. @@ -115,6 +155,44 @@ func (c *Client) SetServerDomain(domain string) { c.serverDomain = domain } +// parseServerAddress parses a server URL and returns a host:port address +func (c *Client) parseServerAddress(serverURL string) (string, error) { + // Handle simple host:port format (e.g., "localhost:5222") + if host, port, err := net.SplitHostPort(serverURL); err == nil { + // Already in host:port format, validate and return + if host == "" { + return "", fmt.Errorf("empty hostname in server URL: %s", serverURL) + } + if port == "" { + return "", fmt.Errorf("empty port in server URL: %s", serverURL) + } + return serverURL, nil + } + + // Try parsing as URL (e.g., "xmpp://localhost:5222" or "xmpps://localhost:5223") + parsedURL, err := url.Parse(serverURL) + if err != nil { + return "", fmt.Errorf("invalid server URL format: %w", err) + } + + host := parsedURL.Hostname() + if host == "" { + return "", fmt.Errorf("no hostname found in server URL: %s", serverURL) + } + + port := parsedURL.Port() + // Use default XMPP port if not specified + if port == "" { + if parsedURL.Scheme == "xmpps" { + port = "5223" + } else { + port = "5222" + } + } + + return host + ":" + port, nil +} + // Connect establishes connection to the XMPP server func (c *Client) Connect() error { if c.session != nil { @@ -150,15 +228,38 @@ func (c *Client) Connect() error { } } - // Use DialClientSession for proper SASL authentication - c.session, err = xmpp.DialClientSession( - c.ctx, + // Create a timeout context for the connection attempt (30 seconds) + connectCtx, connectCancel := context.WithTimeout(c.ctx, 30*time.Second) + defer connectCancel() + + c.logger.LogDebug("Connecting to XMPP server", "server_url", c.serverURL) + + // Parse server address for direct connection + // We use this instead of using the JID for the user to avoid SRV lookups that can fail + // depending on network/dns configuration and to ensure we connect directly to the specified + // server. + serverAddr, err := c.parseServerAddress(c.serverURL) + if err != nil { + return fmt.Errorf("failed to parse server URL %s: %w", c.serverURL, err) + } + + var d net.Dialer + conn, err := d.DialContext(connectCtx, "tcp", serverAddr) + if err != nil { + return fmt.Errorf("failed to dial XMPP server at %s: %w", serverAddr, err) + } + + // Create client session with direct connection + c.session, err = xmpp.NewClientSession( + connectCtx, c.jidAddr, + conn, xmpp.StartTLS(tlsConfig), xmpp.SASL("", c.password, sasl.Plain), xmpp.BindResource(), ) if err != nil { + conn.Close() return fmt.Errorf("failed to establish XMPP session: %w", err) } @@ -171,9 +272,12 @@ func (c *Client) Connect() error { if !c.sessionServing { return fmt.Errorf("failed to start session serving") } + c.logger.LogInfo("XMPP client connected successfully", "jid", c.jidAddr.String()) return nil - case <-time.After(5 * time.Second): + case <-time.After(10 * time.Second): return fmt.Errorf("timeout waiting for session to be ready") + case <-c.ctx.Done(): + return fmt.Errorf("connection cancelled: %w", c.ctx.Err()) } } @@ -206,18 +310,47 @@ func (c *Client) serveSession() { // Disconnect closes the XMPP connection func (c *Client) Disconnect() error { - if c.session != nil { - err := c.session.Close() + if c.session == nil { + return nil // Already disconnected + } + + c.logger.LogInfo("Disconnecting XMPP client", "jid", c.jidAddr.String()) + + // Send offline presence before disconnecting to properly leave rooms + if err := c.SetOfflinePresence(); err != nil { + c.logger.LogWarn("Failed to set offline presence before disconnect", "error", err) + // Don't fail the disconnect for presence issues + } + + // Close the session with a timeout to prevent hanging + sessionCloseCtx, cancel := context.WithTimeout(context.Background(), defaultOperationTimeout) + defer cancel() + + sessionCloseDone := make(chan error, 1) + go func() { + sessionCloseDone <- c.session.Close() + }() + + select { + case err := <-sessionCloseDone: c.session = nil if err != nil { + c.logger.LogWarn("Error closing XMPP session", "error", err) return fmt.Errorf("failed to close XMPP session: %w", err) } + case <-sessionCloseCtx.Done(): + c.logger.LogWarn("Timeout closing XMPP session, forcing disconnect") + c.session = nil + // Continue with cleanup even on timeout } - if c.cancel != nil { - c.cancel() - } + // Stop the TTL cache cleanup goroutine + c.dedupeCache.Stop() + // Cancel the client context + c.cancel() + + c.logger.LogInfo("XMPP client disconnected successfully") return nil } @@ -418,6 +551,30 @@ func (c *Client) SetOnlinePresence() error { return nil } +// SetOfflinePresence sends an offline presence stanza to indicate the client is going offline +func (c *Client) SetOfflinePresence() error { + if c.session == nil { + return fmt.Errorf("XMPP session not established") + } + + // Create presence stanza indicating we're unavailable + presence := stanza.Presence{ + Type: stanza.UnavailablePresence, + From: c.jidAddr, + } + + // Create a context with timeout for the presence update + ctx, cancel := context.WithTimeout(context.Background(), defaultOperationTimeout) + defer cancel() + + // Send the presence stanza + if err := c.session.Encode(ctx, presence); err != nil { + return fmt.Errorf("failed to send offline presence: %w", err) + } + + return nil +} + // CheckRoomExists verifies if an XMPP room exists and is accessible using disco#info func (c *Client) CheckRoomExists(roomJID string) (bool, error) { if c.session == nil { @@ -535,3 +692,118 @@ func (c *Client) Ping() error { c.logger.LogDebug("XMPP ping successful", "duration", duration) return nil } + +// GetMessageChannel returns the channel for incoming messages (Bridge interface) +func (c *Client) GetMessageChannel() <-chan *model.DirectionalMessage { + return c.incomingMessages +} + +// handleIncomingMessage processes incoming XMPP message stanzas +func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenReadEncoder) error { + c.logger.LogDebug("Received XMPP message", + "from", msg.From.String(), + "to", msg.To.String(), + "type", fmt.Sprintf("%v", msg.Type)) + + // Only process groupchat messages for now (MUC messages from channels) + if msg.Type != stanza.GroupChatMessage { + c.logger.LogDebug("Ignoring non-groupchat message", "type", fmt.Sprintf("%v", msg.Type)) + return nil + } + + // Parse the message body from the token reader + var msgWithBody struct { + stanza.Message + Body string `xml:"body"` + } + msgWithBody.Message = msg + + d := xml.NewTokenDecoder(t) + if err := d.DecodeElement(&msgWithBody, nil); err != nil { + c.logger.LogError("Failed to decode message body", "error", err) + return err + } + + if msgWithBody.Body == "" { + c.logger.LogDebug("Message has no body, ignoring") + return nil + } + + // Deduplicate messages using message ID and TTL cache + if msg.ID != "" { + // Check if this message ID is already in the cache (indicates duplicate) + if c.dedupeCache.Has(msg.ID) { + return nil + } + + // Record this message in the cache with TTL + c.dedupeCache.Set(msg.ID, time.Now(), ttlcache.DefaultTTL) + } + + // Extract channel and user information from JIDs + channelID, err := c.extractChannelID(msg.From) + if err != nil { + c.logger.LogError("Failed to extract channel ID from JID", "from", msg.From.String(), "error", err) + return nil + } + + userID, userName := c.extractUserInfo(msg.From) + + // Create BridgeMessage + bridgeMsg := &model.BridgeMessage{ + SourceBridge: "xmpp", + SourceChannelID: channelID, + SourceUserID: userID, + SourceUserName: userName, + Content: msgWithBody.Body, // Already Markdown compatible + MessageType: "text", + Timestamp: time.Now(), // XMPP doesn't always provide timestamps + MessageID: msg.ID, + TargetBridges: []string{}, // Will be routed to all other bridges + Metadata: map[string]any{ + "xmpp_from": msg.From.String(), + "xmpp_to": msg.To.String(), + }, + } + + // Wrap in directional message + directionalMsg := &model.DirectionalMessage{ + BridgeMessage: bridgeMsg, + Direction: model.DirectionIncoming, + } + + // Send to message channel (non-blocking) + select { + case c.incomingMessages <- directionalMsg: + // Message queued successfully + default: + c.logger.LogWarn("Message channel full, dropping message", + "channel_id", channelID, + "user_id", userID) + } + + return nil +} + +// extractChannelID extracts the channel ID (room bare JID) from a message JID +func (c *Client) extractChannelID(from jid.JID) (string, error) { + // For MUC messages, the channel ID is the bare JID (without resource/nickname) + return from.Bare().String(), nil +} + +// extractUserInfo extracts user ID and display name from a message JID +func (c *Client) extractUserInfo(from jid.JID) (string, string) { + // For MUC messages, the resource part is the nickname + nickname := from.Resourcepart() + + // Use the full JID as user ID for XMPP + userID := from.String() + + // Use nickname as display name if available, otherwise use full JID + displayName := nickname + if displayName == "" { + displayName = from.String() + } + + return userID, displayName +}