diff --git a/go.mod b/go.mod index b1bc908..efd6373 100644 --- a/go.mod +++ b/go.mod @@ -102,7 +102,6 @@ require ( github.com/hashicorp/yamux v0.1.2 // indirect github.com/hexops/gotextdiff v1.0.3 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/jellydator/ttlcache/v3 v3.4.0 // indirect github.com/jgautheron/goconst v1.7.1 // indirect github.com/jingyugao/rowserrcheck v1.1.1 // indirect github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect @@ -211,7 +210,7 @@ require ( golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect golang.org/x/mod v0.22.0 // indirect golang.org/x/net v0.34.0 // indirect - golang.org/x/sync v0.15.0 // indirect + golang.org/x/sync v0.10.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/term v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect diff --git a/go.sum b/go.sum index 8d1f976..9a27f5c 100644 --- a/go.sum +++ b/go.sum @@ -366,8 +366,6 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= -github.com/jellydator/ttlcache/v3 v3.4.0 h1:YS4P125qQS0tNhtL6aeYkheEaB/m8HCqdMMP4mnWdTY= -github.com/jellydator/ttlcache/v3 v3.4.0/go.mod h1:Hw9EgjymziQD3yGsQdf1FqFdpp7YjFMd4Srg5EJlgD4= github.com/jgautheron/goconst v1.7.1 h1:VpdAG7Ca7yvvJk5n8dMwQhfEZJh95kl/Hl9S1OI5Jkk= github.com/jgautheron/goconst v1.7.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= @@ -869,8 +867,6 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/server/bridge/manager.go b/server/bridge/manager.go index 9984410..1137368 100644 --- a/server/bridge/manager.go +++ b/server/bridge/manager.go @@ -1,11 +1,9 @@ package bridge import ( - "context" "fmt" "sync" - "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" mmModel "github.com/mattermost/mattermost/server/public/model" @@ -14,15 +12,11 @@ import ( // BridgeManager manages multiple bridge instances type BridgeManager struct { - bridges map[string]model.Bridge - mu sync.RWMutex - logger logger.Logger - api plugin.API - remoteID string - messageBus model.MessageBus - routingCtx context.Context - routingCancel context.CancelFunc - routingWg sync.WaitGroup + bridges map[string]model.Bridge + mu sync.RWMutex + logger logger.Logger + api plugin.API + remoteID string } // NewBridgeManager creates a new bridge manager @@ -34,16 +28,11 @@ func NewBridgeManager(logger logger.Logger, api plugin.API, remoteID string) mod panic("plugin API cannot be nil") } - ctx, cancel := context.WithCancel(context.Background()) - return &BridgeManager{ - bridges: make(map[string]model.Bridge), - logger: logger, - api: api, - remoteID: remoteID, - messageBus: NewMessageBus(logger), - routingCtx: ctx, - routingCancel: cancel, + bridges: make(map[string]model.Bridge), + logger: logger, + api: api, + remoteID: remoteID, } } @@ -66,9 +55,6 @@ func (m *BridgeManager) RegisterBridge(name string, bridge model.Bridge) error { m.bridges[name] = bridge m.logger.LogInfo("Bridge registered", "name", name) - // Subscribe bridge to message bus - go m.startBridgeMessageHandler(name, bridge) - return nil } @@ -163,20 +149,6 @@ func (m *BridgeManager) ListBridges() []string { return bridges } -// Start starts the bridge manager and message routing system -func (m *BridgeManager) Start() error { - m.logger.LogInfo("Starting bridge manager") - - // Start the message routing system - if err := m.StartMessageRouting(); err != nil { - m.logger.LogError("Failed to start message routing", "error", err) - return fmt.Errorf("failed to start message routing: %w", err) - } - - m.logger.LogInfo("Bridge manager started successfully") - return nil -} - // HasBridge checks if a bridge with the given name is registered func (m *BridgeManager) HasBridge(name string) bool { m.mu.RLock() @@ -201,11 +173,6 @@ func (m *BridgeManager) Shutdown() error { m.logger.LogInfo("Shutting down bridge manager", "bridge_count", len(m.bridges)) - // Stop message routing first - if err := m.StopMessageRouting(); err != nil { - m.logger.LogError("Failed to stop message routing during shutdown", "error", err) - } - var errors []error for name, bridge := range m.bridges { if bridge.IsConnected() { @@ -229,7 +196,7 @@ func (m *BridgeManager) Shutdown() error { } // OnPluginConfigurationChange propagates configuration changes to all registered bridges -func (m *BridgeManager) OnPluginConfigurationChange(config *config.Configuration) error { +func (m *BridgeManager) OnPluginConfigurationChange(config any) error { m.mu.RLock() defer m.mu.RUnlock() @@ -264,7 +231,7 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque return fmt.Errorf("invalid mapping request: %w", err) } - m.logger.LogDebug("Creating channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "user_id", req.UserID, "team_id", req.TeamID) + m.logger.LogDebug("Creating channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "user_id", req.UserID, "team_id", req.TeamID) // Get the specific bridge bridge, err := m.GetBridge(req.BridgeName) @@ -279,41 +246,41 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque } // NEW: Check if room already mapped to another channel - existingChannelID, err := bridge.GetChannelMapping(req.BridgeChannelID) + existingChannelID, err := bridge.GetRoomMapping(req.BridgeRoomID) if err != nil { - m.logger.LogError("Failed to check channel mapping", "bridge_channel_id", req.BridgeChannelID, "error", err) - return fmt.Errorf("failed to check channel mapping: %w", err) + m.logger.LogError("Failed to check room mapping", "bridge_room_id", req.BridgeRoomID, "error", err) + return fmt.Errorf("failed to check room mapping: %w", err) } if existingChannelID != "" { - m.logger.LogWarn("Channel already mapped to another channel", - "bridge_channel_id", req.BridgeChannelID, + m.logger.LogWarn("Room already mapped to another channel", + "bridge_room_id", req.BridgeRoomID, "existing_channel_id", existingChannelID, "requested_channel_id", req.ChannelID) - return fmt.Errorf("channel '%s' is already mapped to channel '%s'", req.BridgeChannelID, existingChannelID) + return fmt.Errorf("room '%s' is already mapped to channel '%s'", req.BridgeRoomID, existingChannelID) } // NEW: Check if room exists on target bridge - channelExists, err := bridge.ChannelMappingExists(req.BridgeChannelID) + roomExists, err := bridge.RoomExists(req.BridgeRoomID) if err != nil { - m.logger.LogError("Failed to check channel existence", "bridge_channel_id", req.BridgeChannelID, "error", err) - return fmt.Errorf("failed to check channel existence: %w", err) + m.logger.LogError("Failed to check room existence", "bridge_room_id", req.BridgeRoomID, "error", err) + return fmt.Errorf("failed to check room existence: %w", err) } - if !channelExists { - m.logger.LogWarn("Channel does not exist on bridge", - "bridge_channel_id", req.BridgeChannelID, + if !roomExists { + m.logger.LogWarn("Room does not exist on bridge", + "bridge_room_id", req.BridgeRoomID, "bridge_name", req.BridgeName) - return fmt.Errorf("channel '%s' does not exist on %s bridge", req.BridgeChannelID, req.BridgeName) + return fmt.Errorf("room '%s' does not exist on %s bridge", req.BridgeRoomID, req.BridgeName) } - m.logger.LogDebug("Channel validation passed", - "bridge_channel_id", req.BridgeChannelID, + m.logger.LogDebug("Room validation passed", + "bridge_room_id", req.BridgeRoomID, "bridge_name", req.BridgeName, - "channel_exists", channelExists, + "room_exists", roomExists, "already_mapped", false) // Create the channel mapping on the receiving bridge - if err = bridge.CreateChannelMapping(req.ChannelID, req.BridgeChannelID); err != nil { - m.logger.LogError("Failed to create channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "error", err) + if err = bridge.CreateChannelMapping(req.ChannelID, req.BridgeRoomID); err != nil { + m.logger.LogError("Failed to create channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "error", err) return fmt.Errorf("failed to create channel mapping for bridge '%s': %w", req.BridgeName, err) } @@ -324,19 +291,19 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque } // Create the channel mapping in the Mattermost bridge - if err = mattermostBridge.CreateChannelMapping(req.ChannelID, req.BridgeChannelID); err != nil { - m.logger.LogError("Failed to create channel mapping in Mattermost bridge", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "error", err) + if err = mattermostBridge.CreateChannelMapping(req.ChannelID, req.BridgeRoomID); err != nil { + m.logger.LogError("Failed to create channel mapping in Mattermost bridge", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "error", err) return fmt.Errorf("failed to create channel mapping in Mattermost bridge: %w", err) } // Share the channel using Mattermost's shared channels API if err = m.shareChannel(req); err != nil { - m.logger.LogError("Failed to share channel", "channel_id", req.ChannelID, "bridge_channel_id", req.BridgeChannelID, "error", err) + m.logger.LogError("Failed to share channel", "channel_id", req.ChannelID, "bridge_room_id", req.BridgeRoomID, "error", err) // Don't fail the entire operation if sharing fails, but log the error m.logger.LogWarn("Channel mapping created but sharing failed - channel may not sync properly") } - m.logger.LogInfo("Successfully created channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID) + m.logger.LogInfo("Successfully created channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID) return nil } @@ -403,9 +370,9 @@ func (m *BridgeManager) shareChannel(req model.CreateChannelMappingRequest) erro TeamId: req.TeamID, Home: true, ReadOnly: false, - ShareName: model.SanitizeShareName(fmt.Sprintf("bridge-%s", req.BridgeChannelID)), - ShareDisplayName: fmt.Sprintf("Bridge: %s", req.BridgeChannelID), - SharePurpose: fmt.Sprintf("Shared channel bridged to %s", req.BridgeChannelID), + ShareName: model.SanitizeShareName(fmt.Sprintf("bridge-%s", req.BridgeRoomID)), + ShareDisplayName: fmt.Sprintf("Bridge: %s", req.BridgeRoomID), + SharePurpose: fmt.Sprintf("Shared channel bridged to %s", req.BridgeRoomID), ShareHeader: "test header", CreatorId: req.UserID, RemoteId: m.remoteID, @@ -437,127 +404,3 @@ func (m *BridgeManager) unshareChannel(channelID string) error { return nil } - -// startBridgeMessageHandler starts message handling for a specific bridge -func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge model.Bridge) { - m.logger.LogDebug("Starting message handler for bridge", "bridge", bridgeName) - - // Subscribe to message bus - messageChannel := m.messageBus.Subscribe(bridgeName) - - // Start message routing goroutine - m.routingWg.Add(1) - go func() { - defer m.routingWg.Done() - defer m.logger.LogDebug("Message handler stopped for bridge", "bridge", bridgeName) - - for { - select { - case msg, ok := <-messageChannel: - if !ok { - m.logger.LogDebug("Message channel closed for bridge", "bridge", bridgeName) - return - } - - if err := m.handleBridgeMessage(bridgeName, bridge, msg); err != nil { - m.logger.LogError("Failed to handle message for bridge", - "bridge", bridgeName, - "source_bridge", msg.SourceBridge, - "error", err) - } - - case <-m.routingCtx.Done(): - m.logger.LogDebug("Context cancelled, stopping message handler", "bridge", bridgeName) - return - } - } - }() - - // Listen to bridge's outgoing messages - m.routingWg.Add(1) - go func() { - defer m.routingWg.Done() - defer m.logger.LogDebug("Bridge message listener stopped", "bridge", bridgeName) - - bridgeMessageChannel := bridge.GetMessageChannel() - for { - select { - case msg, ok := <-bridgeMessageChannel: - if !ok { - m.logger.LogDebug("Bridge message channel closed", "bridge", bridgeName) - return - } - - if err := m.messageBus.Publish(msg); err != nil { - m.logger.LogError("Failed to publish message from bridge", - "bridge", bridgeName, - "direction", msg.Direction, - "error", err) - } - - case <-m.routingCtx.Done(): - m.logger.LogDebug("Context cancelled, stopping bridge listener", "bridge", bridgeName) - return - } - } - }() -} - -// handleBridgeMessage processes an incoming message for a specific bridge -func (m *BridgeManager) handleBridgeMessage(bridgeName string, bridge model.Bridge, msg *model.DirectionalMessage) error { - m.logger.LogDebug("Handling message for bridge", - "target_bridge", bridgeName, - "source_bridge", msg.SourceBridge, - "direction", msg.Direction, - "channel_id", msg.SourceChannelID) - - // Get the bridge's message handler - handler := bridge.GetMessageHandler() - if handler == nil { - return fmt.Errorf("bridge %s does not have a message handler", bridgeName) - } - - // Check if the handler can process this message - if !handler.CanHandleMessage(msg.BridgeMessage) { - m.logger.LogDebug("Bridge cannot handle message", - "bridge", bridgeName, - "message_type", msg.MessageType) - return nil // Not an error, just skip - } - - // Process the message - return handler.ProcessMessage(msg) -} - -// StartMessageRouting starts the message bus and routing system -func (m *BridgeManager) StartMessageRouting() error { - m.logger.LogInfo("Starting message routing system") - return m.messageBus.Start() -} - -// StopMessageRouting stops the message bus and routing system -func (m *BridgeManager) StopMessageRouting() error { - m.logger.LogInfo("Stopping message routing system") - - // Cancel routing context - if m.routingCancel != nil { - m.routingCancel() - } - - // Wait for all routing goroutines to finish - m.routingWg.Wait() - - // Stop the message bus - return m.messageBus.Stop() -} - -// PublishMessage publishes a message to the message bus for routing to target bridges -func (m *BridgeManager) PublishMessage(msg *model.DirectionalMessage) error { - m.logger.LogDebug("Publishing message to message bus", - "source_bridge", msg.SourceBridge, - "direction", msg.Direction, - "target_bridges", msg.TargetBridges, - "message_id", msg.MessageID) - - return m.messageBus.Publish(msg) -} diff --git a/server/bridge/mattermost/bridge.go b/server/bridge/mattermost/bridge.go index e618535..79baf6e 100644 --- a/server/bridge/mattermost/bridge.go +++ b/server/bridge/mattermost/bridge.go @@ -14,24 +14,12 @@ import ( "github.com/mattermost/mattermost/server/public/plugin" ) -const ( - // defaultMessageBufferSize is the buffer size for incoming message channels - defaultMessageBufferSize = 1000 -) - // mattermostBridge handles syncing messages between Mattermost instances type mattermostBridge struct { logger logger.Logger api plugin.API kvstore kvstore.KVStore userManager pluginModel.BridgeUserManager - botUserID string // Bot user ID for posting messages - remoteID string // Remote ID for shared channels - - // Message handling - messageHandler *mattermostMessageHandler - userResolver *mattermostUserResolver - incomingMessages chan *pluginModel.DirectionalMessage // Connection management connected atomic.Bool @@ -48,41 +36,27 @@ type mattermostBridge struct { } // NewBridge creates a new Mattermost bridge -func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, botUserID, remoteID string) pluginModel.Bridge { +func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge { ctx, cancel := context.WithCancel(context.Background()) - b := &mattermostBridge{ - logger: log, - api: api, - kvstore: kvstore, - botUserID: botUserID, - remoteID: remoteID, - ctx: ctx, - cancel: cancel, - channelMappings: make(map[string]string), - config: cfg, - userManager: bridge.NewUserManager("mattermost", log), - incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize), + bridge := &mattermostBridge{ + logger: log, + api: api, + kvstore: kvstore, + ctx: ctx, + cancel: cancel, + channelMappings: make(map[string]string), + config: cfg, + userManager: bridge.NewUserManager("mattermost", log), } - // Initialize handlers after bridge is created - b.messageHandler = newMessageHandler(b) - b.userResolver = newUserResolver(b) - - return b -} - -// getConfiguration safely retrieves the current configuration -func (b *mattermostBridge) getConfiguration() *config.Configuration { - b.configMu.RLock() - defer b.configMu.RUnlock() - return b.config + return bridge } // UpdateConfiguration updates the bridge configuration -func (b *mattermostBridge) UpdateConfiguration(cfg *config.Configuration) error { - // Validate configuration using built-in validation - if err := cfg.IsValid(); err != nil { - return fmt.Errorf("invalid configuration: %w", err) +func (b *mattermostBridge) UpdateConfiguration(newConfig any) error { + cfg, ok := newConfig.(*config.Configuration) + if !ok { + return fmt.Errorf("invalid configuration type") } b.configMu.Lock() @@ -308,8 +282,8 @@ func (b *mattermostBridge) DeleteChannelMapping(channelID string) error { return nil } -// ChannelMappingExists checks if a Mattermost channel exists on the server -func (b *mattermostBridge) ChannelMappingExists(roomID string) (bool, error) { +// RoomExists checks if a Mattermost channel exists on the server +func (b *mattermostBridge) RoomExists(roomID string) (bool, error) { if b.api == nil { return false, fmt.Errorf("Mattermost API not initialized") } @@ -358,47 +332,7 @@ func (b *mattermostBridge) GetRoomMapping(roomID string) (string, error) { return channelID, nil } -// GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge -func (b *mattermostBridge) GetChannelMappingForBridge(bridgeName, roomID string) (string, error) { - channelIDBytes, err := b.kvstore.Get(kvstore.BuildChannelMapKey(bridgeName, roomID)) - if err != nil { - // No mapping found is not an error, just return empty string - b.logger.LogDebug("No channel mapping found for bridge room", "bridge_name", bridgeName, "room_id", roomID) - return "", nil - } - - channelID := string(channelIDBytes) - b.logger.LogDebug("Found channel mapping for bridge room", "bridge_name", bridgeName, "room_id", roomID, "channel_id", channelID) - - return channelID, nil -} - // GetUserManager returns the user manager for this bridge func (b *mattermostBridge) GetUserManager() pluginModel.BridgeUserManager { return b.userManager } - -// GetMessageChannel returns the channel for incoming messages from Mattermost -func (b *mattermostBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage { - return b.incomingMessages -} - -// SendMessage sends a message to a Mattermost channel -func (b *mattermostBridge) SendMessage(msg *pluginModel.BridgeMessage) error { - return b.messageHandler.postMessageToMattermost(msg) -} - -// GetMessageHandler returns the message handler for this bridge -func (b *mattermostBridge) GetMessageHandler() pluginModel.MessageHandler { - return b.messageHandler -} - -// GetUserResolver returns the user resolver for this bridge -func (b *mattermostBridge) GetUserResolver() pluginModel.UserResolver { - return b.userResolver -} - -// GetRemoteID returns the remote ID used for shared channels registration -func (b *mattermostBridge) GetRemoteID() string { - return b.remoteID -} diff --git a/server/bridge/mattermost/message_handler.go b/server/bridge/mattermost/message_handler.go deleted file mode 100644 index 691ae7f..0000000 --- a/server/bridge/mattermost/message_handler.go +++ /dev/null @@ -1,208 +0,0 @@ -package mattermost - -import ( - "fmt" - "strings" - - "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" - pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" - mmModel "github.com/mattermost/mattermost/server/public/model" -) - -// mattermostMessageHandler handles incoming messages for the Mattermost bridge -type mattermostMessageHandler struct { - bridge *mattermostBridge - logger logger.Logger -} - -// newMessageHandler creates a new Mattermost message handler -func newMessageHandler(bridge *mattermostBridge) *mattermostMessageHandler { - return &mattermostMessageHandler{ - bridge: bridge, - logger: bridge.logger, - } -} - -// ProcessMessage processes an incoming message for the Mattermost bridge -func (h *mattermostMessageHandler) ProcessMessage(msg *pluginModel.DirectionalMessage) error { - h.logger.LogDebug("Processing message for Mattermost bridge", - "source_bridge", msg.SourceBridge, - "direction", msg.Direction, - "channel_id", msg.SourceChannelID) - - // Skip messages that originated from Mattermost to prevent loops - if msg.SourceBridge == "mattermost" { - h.logger.LogDebug("Skipping Mattermost-originated message to prevent loop") - return nil - } - - // For incoming messages to Mattermost, we post them to Mattermost channels - if msg.Direction == pluginModel.DirectionIncoming { - return h.postMessageToMattermost(msg.BridgeMessage) - } - - h.logger.LogDebug("Ignoring outgoing message for Mattermost bridge") - return nil -} - -// CanHandleMessage determines if this handler can process the message -func (h *mattermostMessageHandler) CanHandleMessage(msg *pluginModel.BridgeMessage) bool { - // Mattermost bridge can handle text messages that didn't originate from Mattermost - return msg.MessageType == "text" && msg.SourceBridge != "mattermost" -} - -// GetSupportedMessageTypes returns the message types this handler supports -func (h *mattermostMessageHandler) GetSupportedMessageTypes() []string { - return []string{"text"} -} - -// postMessageToMattermost posts a message to a Mattermost channel -func (h *mattermostMessageHandler) postMessageToMattermost(msg *pluginModel.BridgeMessage) error { - if h.bridge.api == nil { - return fmt.Errorf("Mattermost API not initialized") - } - - // Get the Mattermost channel ID from the channel mapping using the source bridge name - channelID, err := h.bridge.GetChannelMappingForBridge(msg.SourceBridge, msg.SourceChannelID) - if err != nil { - return fmt.Errorf("failed to get channel mapping: %w", err) - } - if channelID == "" { - // Check if the source channel ID is already a Mattermost channel ID - channelID = msg.SourceChannelID - } - - // Verify the channel exists - channel, appErr := h.bridge.api.GetChannel(channelID) - if appErr != nil { - return fmt.Errorf("failed to get channel %s: %w", channelID, appErr) - } - if channel == nil { - return fmt.Errorf("channel %s not found", channelID) - } - - // Format the message content - content := h.formatMessageContent(msg) - - // Create the post - post := &mmModel.Post{ - ChannelId: channelID, - UserId: h.bridge.botUserID, - Message: content, - Type: mmModel.PostTypeDefault, - Props: map[string]interface{}{ - "from_bridge": msg.SourceBridge, - "bridge_user_id": msg.SourceUserID, - "bridge_user_name": msg.SourceUserName, - "bridge_message_id": msg.MessageID, - "bridge_timestamp": msg.Timestamp.Unix(), - }, - } - - // Add thread ID if present - if msg.ThreadID != "" { - post.RootId = msg.ThreadID - } - - // Post the message as the plugin bot - createdPost, appErr := h.bridge.api.CreatePost(post) - if appErr != nil { - return fmt.Errorf("failed to create post in channel %s: %w", channelID, appErr) - } - - h.logger.LogDebug("Message posted to Mattermost channel", - "channel_id", channelID, - "post_id", createdPost.Id, - "source_bridge", msg.SourceBridge, - "content_length", len(content)) - - return nil -} - -// formatMessageContent formats the message content for Mattermost -func (h *mattermostMessageHandler) formatMessageContent(msg *pluginModel.BridgeMessage) string { - // For messages from other bridges, prefix with the bridge info and user name - if msg.SourceUserName != "" { - bridgeIcon := h.getBridgeIcon(msg.SourceBridge) - return fmt.Sprintf("%s **%s**: %s", bridgeIcon, msg.SourceUserName, msg.Content) - } - return msg.Content -} - -// getBridgeIcon returns an icon/emoji for the source bridge -func (h *mattermostMessageHandler) getBridgeIcon(bridgeType string) string { - switch bridgeType { - case "xmpp": - return ":speech_balloon:" // Chat bubble emoji for XMPP - case "slack": - return ":slack:" // Slack emoji if available - case "discord": - return ":discord:" // Discord emoji if available - default: - return ":bridge_at_night:" // Generic bridge emoji - } -} - -// mattermostUserResolver handles user resolution for the Mattermost bridge -type mattermostUserResolver struct { - bridge *mattermostBridge - logger logger.Logger -} - -// newUserResolver creates a new Mattermost user resolver -func newUserResolver(bridge *mattermostBridge) *mattermostUserResolver { - return &mattermostUserResolver{ - bridge: bridge, - logger: bridge.logger, - } -} - -// ResolveUser converts an external user ID to an ExternalUser -func (r *mattermostUserResolver) ResolveUser(externalUserID string) (*pluginModel.ExternalUser, error) { - r.logger.LogDebug("Resolving Mattermost user", "user_id", externalUserID) - - // For Mattermost, the external user ID is the Mattermost user ID - user, appErr := r.bridge.api.GetUser(externalUserID) - if appErr != nil { - return nil, fmt.Errorf("failed to get Mattermost user: %w", appErr) - } - - if user == nil { - return nil, fmt.Errorf("Mattermost user not found: %s", externalUserID) - } - - return &pluginModel.ExternalUser{ - BridgeType: "mattermost", - ExternalUserID: externalUserID, - DisplayName: r.GetDisplayName(externalUserID), - MattermostUserID: externalUserID, // Same as external ID for Mattermost - }, nil -} - -// FormatUserMention formats a user mention for Markdown content -func (r *mattermostUserResolver) FormatUserMention(user *pluginModel.ExternalUser) string { - // For Mattermost, use the standard @username format - return fmt.Sprintf("@%s", user.DisplayName) -} - -// GetDisplayName extracts display name from external user ID -func (r *mattermostUserResolver) GetDisplayName(externalUserID string) string { - // Try to get the actual username from Mattermost API - user, appErr := r.bridge.api.GetUser(externalUserID) - if appErr != nil || user == nil { - r.logger.LogWarn("Failed to get user for display name", "user_id", externalUserID) - return "Unknown User" - } - - // Prefer username, fallback to first name + last name, then to ID - if user.Username != "" { - return user.Username - } - - fullName := strings.TrimSpace(user.FirstName + " " + user.LastName) - if fullName != "" { - return fullName - } - - return user.Id[:8] // Show first 8 chars of ID as fallback -} diff --git a/server/bridge/messagebus.go b/server/bridge/messagebus.go deleted file mode 100644 index 6def69a..0000000 --- a/server/bridge/messagebus.go +++ /dev/null @@ -1,244 +0,0 @@ -package bridge - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" - "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" -) - -const ( - // DefaultMessageBufferSize is the default buffer size for message channels - DefaultMessageBufferSize = 1000 - - // MessageDeliveryTimeout is the maximum time to wait for message delivery - MessageDeliveryTimeout = 5 * time.Second -) - -// messageBus implements the MessageBus interface -type messageBus struct { - // Core messaging - incomingMessages chan *model.DirectionalMessage - subscribers map[string]chan *model.DirectionalMessage - subscribersMu sync.RWMutex - - // Lifecycle management - ctx context.Context - cancel context.CancelFunc - logger logger.Logger - wg sync.WaitGroup - started bool - startMu sync.Mutex -} - -// NewMessageBus creates a new message bus instance -func NewMessageBus(logger logger.Logger) model.MessageBus { - ctx, cancel := context.WithCancel(context.Background()) - - return &messageBus{ - incomingMessages: make(chan *model.DirectionalMessage, DefaultMessageBufferSize), - subscribers: make(map[string]chan *model.DirectionalMessage), - ctx: ctx, - cancel: cancel, - logger: logger, - } -} - -// Subscribe returns a channel that receives messages for the specified bridge -func (mb *messageBus) Subscribe(bridgeName string) <-chan *model.DirectionalMessage { - mb.subscribersMu.Lock() - defer mb.subscribersMu.Unlock() - - // Create a buffered channel for this subscriber - ch := make(chan *model.DirectionalMessage, DefaultMessageBufferSize) - mb.subscribers[bridgeName] = ch - - mb.logger.LogDebug("Bridge subscribed to message bus", "bridge", bridgeName) - return ch -} - -// Publish sends a message to the message bus for routing -func (mb *messageBus) Publish(msg *model.DirectionalMessage) error { - if msg == nil { - return fmt.Errorf("message cannot be nil") - } - - if msg.BridgeMessage == nil { - return fmt.Errorf("bridge message cannot be nil") - } - - select { - case mb.incomingMessages <- msg: - mb.logger.LogDebug("Message published to bus", - "source_bridge", msg.SourceBridge, - "direction", msg.Direction, - "channel_id", msg.SourceChannelID) - return nil - case <-time.After(MessageDeliveryTimeout): - mb.logger.LogWarn("Message delivery timeout", - "source_bridge", msg.SourceBridge, - "channel_id", msg.SourceChannelID) - return fmt.Errorf("message delivery timeout") - case <-mb.ctx.Done(): - return fmt.Errorf("message bus is shutting down") - } -} - -// Start begins message routing -func (mb *messageBus) Start() error { - mb.startMu.Lock() - defer mb.startMu.Unlock() - - if mb.started { - return fmt.Errorf("message bus is already started") - } - - mb.logger.LogInfo("Starting message bus") - - // Start the message routing goroutine - mb.wg.Add(1) - go mb.routeMessages() - - mb.started = true - mb.logger.LogInfo("Message bus started successfully") - return nil -} - -// Stop ends message routing and cleans up resources -func (mb *messageBus) Stop() error { - mb.startMu.Lock() - defer mb.startMu.Unlock() - - if !mb.started { - return nil // Already stopped - } - - mb.logger.LogInfo("Stopping message bus") - - // Cancel context to signal shutdown - mb.cancel() - - // Wait for routing goroutine to finish - mb.wg.Wait() - - // Close all subscriber channels - mb.subscribersMu.Lock() - for bridgeName, ch := range mb.subscribers { - close(ch) - mb.logger.LogDebug("Closed subscriber channel", "bridge", bridgeName) - } - mb.subscribers = make(map[string]chan *model.DirectionalMessage) - mb.subscribersMu.Unlock() - - // Close incoming messages channel - close(mb.incomingMessages) - - mb.started = false - mb.logger.LogInfo("Message bus stopped successfully") - return nil -} - -// routeMessages handles the main message routing loop -func (mb *messageBus) routeMessages() { - defer mb.wg.Done() - - mb.logger.LogDebug("Message routing started") - - for { - select { - case msg, ok := <-mb.incomingMessages: - if !ok { - mb.logger.LogDebug("Incoming messages channel closed, stopping routing") - return - } - - if err := mb.routeMessage(msg); err != nil { - mb.logger.LogError("Failed to route message", - "source_bridge", msg.SourceBridge, - "direction", msg.Direction, - "error", err) - } - - case <-mb.ctx.Done(): - mb.logger.LogDebug("Context cancelled, stopping message routing") - return - } - } -} - -// routeMessage routes a single message to appropriate subscribers -func (mb *messageBus) routeMessage(msg *model.DirectionalMessage) error { - mb.subscribersMu.RLock() - defer mb.subscribersMu.RUnlock() - - routedCount := 0 - - // Route to specific target bridges if specified - if len(msg.TargetBridges) > 0 { - for _, targetBridge := range msg.TargetBridges { - if ch, exists := mb.subscribers[targetBridge]; exists { - if mb.deliverMessage(ch, msg, targetBridge) { - routedCount++ - } - } else { - mb.logger.LogWarn("Target bridge not subscribed", - "target_bridge", targetBridge, - "source_bridge", msg.SourceBridge) - } - } - } else { - // Route to all subscribers except the source bridge - for bridgeName, ch := range mb.subscribers { - if bridgeName != msg.SourceBridge { - if mb.deliverMessage(ch, msg, bridgeName) { - routedCount++ - } - } - } - } - - mb.logger.LogDebug("Message routed", - "source_bridge", msg.SourceBridge, - "routed_to_count", routedCount) - - return nil -} - -// deliverMessage attempts to deliver a message to a specific subscriber -func (mb *messageBus) deliverMessage(ch chan *model.DirectionalMessage, msg *model.DirectionalMessage, targetBridge string) bool { - select { - case ch <- msg: - return true - case <-time.After(MessageDeliveryTimeout): - mb.logger.LogWarn("Message delivery timeout to bridge", - "target_bridge", targetBridge, - "source_bridge", msg.SourceBridge) - return false - case <-mb.ctx.Done(): - return false - } -} - -// GetStats returns statistics about the message bus -func (mb *messageBus) GetStats() map[string]interface{} { - mb.subscribersMu.RLock() - defer mb.subscribersMu.RUnlock() - - stats := map[string]interface{}{ - "started": mb.started, - "subscriber_count": len(mb.subscribers), - "buffer_size": DefaultMessageBufferSize, - "pending_messages": len(mb.incomingMessages), - } - - subscribers := make([]string, 0, len(mb.subscribers)) - for bridgeName := range mb.subscribers { - subscribers = append(subscribers, bridgeName) - } - stats["subscribers"] = subscribers - - return stats -} diff --git a/server/bridge/xmpp/bridge.go b/server/bridge/xmpp/bridge.go index 902b8ae..4a1ffe6 100644 --- a/server/bridge/xmpp/bridge.go +++ b/server/bridge/xmpp/bridge.go @@ -18,11 +18,6 @@ import ( "github.com/mattermost/mattermost/server/public/plugin" ) -const ( - // defaultMessageBufferSize is the buffer size for incoming message channels - defaultMessageBufferSize = 1000 -) - // xmppBridge handles syncing messages between Mattermost and XMPP type xmppBridge struct { logger logger.Logger @@ -30,12 +25,6 @@ type xmppBridge struct { kvstore kvstore.KVStore bridgeClient *xmppClient.Client // Main bridge XMPP client connection userManager pluginModel.BridgeUserManager - remoteID string // Remote ID for shared channels - - // Message handling - messageHandler *xmppMessageHandler - userResolver *xmppUserResolver - incomingMessages chan *pluginModel.DirectionalMessage // Connection management connected atomic.Bool @@ -52,25 +41,19 @@ type xmppBridge struct { } // NewBridge creates a new XMPP bridge -func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, remoteID string) pluginModel.Bridge { +func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge { ctx, cancel := context.WithCancel(context.Background()) b := &xmppBridge{ - logger: log, - api: api, - kvstore: kvstore, - ctx: ctx, - cancel: cancel, - channelMappings: make(map[string]string), - config: cfg, - userManager: bridge.NewUserManager("xmpp", log), - incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize), - remoteID: remoteID, + logger: log, + api: api, + kvstore: kvstore, + ctx: ctx, + cancel: cancel, + channelMappings: make(map[string]string), + config: cfg, + userManager: bridge.NewUserManager("xmpp", log), } - // Initialize handlers after bridge is created - b.messageHandler = newMessageHandler(b) - b.userResolver = newUserResolver(b) - // Initialize XMPP client with configuration if cfg.EnableSync && cfg.XMPPServerURL != "" && cfg.XMPPUsername != "" && cfg.XMPPPassword != "" { b.bridgeClient = b.createXMPPClient(cfg) @@ -97,50 +80,56 @@ func (b *xmppBridge) createXMPPClient(cfg *config.Configuration) *xmppClient.Cli ) } -// getConfiguration safely retrieves the current configuration -func (b *xmppBridge) getConfiguration() *config.Configuration { - b.configMu.RLock() - defer b.configMu.RUnlock() - return b.config -} - // UpdateConfiguration updates the bridge configuration -// It handles validation and reconnection logic when the configuration changes -func (b *xmppBridge) UpdateConfiguration(cfg *config.Configuration) error { - // Validate configuration using built-in validation - if err := cfg.IsValid(); err != nil { - return fmt.Errorf("invalid configuration: %w", err) +func (b *xmppBridge) UpdateConfiguration(newConfig any) error { + cfg, ok := newConfig.(*config.Configuration) + if !ok { + return fmt.Errorf("invalid configuration type") } - // Get current config to check if restart is needed - oldConfig := b.getConfiguration() - - // Update configuration under lock, then release immediately b.configMu.Lock() + oldConfig := b.config b.config = cfg + defer b.configMu.Unlock() + + b.logger.LogInfo("XMPP bridge configuration updated") // Initialize or update XMPP client with new configuration - if !cfg.Equals(oldConfig) { - if b.bridgeClient != nil && b.bridgeClient.Disconnect() != nil { - b.logger.LogError("Failed to disconnect old XMPP bridge client") + if cfg.EnableSync { + if cfg.XMPPServerURL == "" || cfg.XMPPUsername == "" || cfg.XMPPPassword == "" { + return fmt.Errorf("XMPP server URL, username, and password are required when sync is enabled") } + b.bridgeClient = b.createXMPPClient(cfg) - } - b.configMu.Unlock() - - // Stop the bridge - if err := b.Stop(); err != nil { - b.logger.LogWarn("Error stopping bridge during restart", "error", err) + } else { + b.bridgeClient = nil } - // Start the bridge with new configuration - // Start() method already uses getConfiguration() safely - if err := b.Start(); err != nil { - b.logger.LogError("Failed to restart bridge with new configuration", "error", err) - return fmt.Errorf("failed to restart bridge: %w", err) + // Check if we need to restart the bridge due to configuration changes + wasConnected := b.connected.Load() + needsRestart := oldConfig != nil && !oldConfig.Equals(cfg) && wasConnected + + // Log the configuration change + if needsRestart { + b.logger.LogInfo("Configuration changed, restarting bridge") + } else { + b.logger.LogInfo("Configuration updated", "config", cfg) } - b.logger.LogDebug("XMPP bridge configuration updated successfully") + if needsRestart { + b.logger.LogInfo("Configuration changed, restarting bridge") + + // Stop the bridge + if err := b.Stop(); err != nil { + b.logger.LogWarn("Error stopping bridge during restart", "error", err) + } + + // Start the bridge with new configuration + if err := b.Start(); err != nil { + b.logger.LogError("Failed to restart bridge with new configuration", "error", err) + return fmt.Errorf("failed to restart bridge: %w", err) + } + } return nil } @@ -177,9 +166,6 @@ func (b *xmppBridge) Start() error { // Start connection monitor go b.connectionMonitor() - // Start message aggregation - go b.startMessageAggregation() - b.logger.LogInfo("Mattermost to XMPP bridge started successfully") return nil } @@ -504,8 +490,8 @@ func (b *xmppBridge) DeleteChannelMapping(channelID string) error { return nil } -// ChannelMappingExists checks if an XMPP room exists on the remote service -func (b *xmppBridge) ChannelMappingExists(roomID string) (bool, error) { +// RoomExists checks if an XMPP room exists on the remote service +func (b *xmppBridge) RoomExists(roomID string) (bool, error) { if !b.connected.Load() { return false, fmt.Errorf("not connected to XMPP server") } @@ -549,78 +535,7 @@ func (b *xmppBridge) GetRoomMapping(roomID string) (string, error) { return channelID, nil } -// GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge -func (b *xmppBridge) GetChannelMappingForBridge(bridgeName, roomID string) (string, error) { - // Look up the channel ID using the bridge name and room ID as the key - channelIDBytes, err := b.kvstore.Get(kvstore.BuildChannelMapKey(bridgeName, roomID)) - if err != nil { - // No mapping found is not an error, just return empty string - b.logger.LogDebug("No channel mapping found for bridge room", "bridge_name", bridgeName, "room_id", roomID) - return "", nil - } - - channelID := string(channelIDBytes) - b.logger.LogDebug("Found channel mapping for bridge room", "bridge_name", bridgeName, "room_id", roomID, "channel_id", channelID) - - return channelID, nil -} - // GetUserManager returns the user manager for this bridge func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager { return b.userManager } - -// startMessageAggregation starts the message aggregation goroutine -func (b *xmppBridge) startMessageAggregation() { - clientChannel := b.bridgeClient.GetMessageChannel() - - for { - select { - case <-b.ctx.Done(): - b.logger.LogDebug("Stopping XMPP message aggregation") - return - case msg, ok := <-clientChannel: - if !ok { - b.logger.LogDebug("Bridge client message channel closed") - return - } - - // Forward to our bridge's message channel - select { - case b.incomingMessages <- msg: - // Message forwarded successfully - case <-b.ctx.Done(): - return - default: - b.logger.LogWarn("Bridge message channel full, dropping message", - "source_channel", msg.SourceChannelID, - "user_id", msg.SourceUserID) - } - } - } -} - -// GetMessageChannel returns the channel for incoming messages from XMPP -func (b *xmppBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage { - return b.incomingMessages -} - -// SendMessage sends a message to an XMPP room -func (b *xmppBridge) SendMessage(msg *pluginModel.BridgeMessage) error { - return b.messageHandler.sendMessageToXMPP(msg) -} - -// GetMessageHandler returns the message handler for this bridge -func (b *xmppBridge) GetMessageHandler() pluginModel.MessageHandler { - return b.messageHandler -} - -// GetUserResolver returns the user resolver for this bridge -func (b *xmppBridge) GetUserResolver() pluginModel.UserResolver { - return b.userResolver -} - -// GetRemoteID returns the remote ID used for shared channels registration -func (b *xmppBridge) GetRemoteID() string { - return b.remoteID -} diff --git a/server/bridge/xmpp/message_handler.go b/server/bridge/xmpp/message_handler.go deleted file mode 100644 index 4035901..0000000 --- a/server/bridge/xmpp/message_handler.go +++ /dev/null @@ -1,166 +0,0 @@ -package xmpp - -import ( - "fmt" - "strings" - - "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" - pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" - xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp" -) - -// xmppMessageHandler handles incoming messages for the XMPP bridge -type xmppMessageHandler struct { - bridge *xmppBridge - logger logger.Logger -} - -// newMessageHandler creates a new XMPP message handler -func newMessageHandler(bridge *xmppBridge) *xmppMessageHandler { - return &xmppMessageHandler{ - bridge: bridge, - logger: bridge.logger, - } -} - -// ProcessMessage processes an incoming message for the XMPP bridge -func (h *xmppMessageHandler) ProcessMessage(msg *pluginModel.DirectionalMessage) error { - h.logger.LogDebug("Processing message for XMPP bridge", - "source_bridge", msg.SourceBridge, - "direction", msg.Direction, - "channel_id", msg.SourceChannelID) - - // Skip messages that originated from XMPP to prevent loops - if msg.SourceBridge == "xmpp" { - h.logger.LogDebug("Skipping XMPP-originated message to prevent loop") - return nil - } - - // For incoming messages to XMPP, we send them to XMPP rooms - if msg.Direction == pluginModel.DirectionIncoming { - return h.sendMessageToXMPP(msg.BridgeMessage) - } - - h.logger.LogDebug("Ignoring outgoing message for XMPP bridge") - return nil -} - -// CanHandleMessage determines if this handler can process the message -func (h *xmppMessageHandler) CanHandleMessage(msg *pluginModel.BridgeMessage) bool { - // XMPP bridge can handle text messages that didn't originate from XMPP - return msg.MessageType == "text" && msg.SourceBridge != "xmpp" -} - -// GetSupportedMessageTypes returns the message types this handler supports -func (h *xmppMessageHandler) GetSupportedMessageTypes() []string { - return []string{"text"} -} - -// sendMessageToXMPP sends a message to an XMPP room -func (h *xmppMessageHandler) sendMessageToXMPP(msg *pluginModel.BridgeMessage) error { - if h.bridge.bridgeClient == nil { - return fmt.Errorf("XMPP client not initialized") - } - - if !h.bridge.connected.Load() { - return fmt.Errorf("not connected to XMPP server") - } - - // Get the XMPP room JID from the channel mapping - roomJID, err := h.bridge.GetChannelMapping(msg.SourceChannelID) - if err != nil { - return fmt.Errorf("failed to get room mapping: %w", err) - } - if roomJID == "" { - return fmt.Errorf("channel is not mapped to any XMPP room") - } - - // Format the message content with user information - content := h.formatMessageContent(msg) - - // Create XMPP message request - req := xmppClient.MessageRequest{ - RoomJID: roomJID, - Message: content, - } - - // Send the message - _, err = h.bridge.bridgeClient.SendMessage(req) - if err != nil { - return fmt.Errorf("failed to send message to XMPP room: %w", err) - } - - h.logger.LogDebug("Message sent to XMPP room", - "channel_id", msg.SourceChannelID, - "room_jid", roomJID, - "content_length", len(content)) - - return nil -} - -// formatMessageContent formats the message content for XMPP -func (h *xmppMessageHandler) formatMessageContent(msg *pluginModel.BridgeMessage) string { - // For messages from other bridges, prefix with the user name - if msg.SourceUserName != "" { - return fmt.Sprintf("<%s> %s", msg.SourceUserName, msg.Content) - } - return msg.Content -} - -// xmppUserResolver handles user resolution for the XMPP bridge -type xmppUserResolver struct { - bridge *xmppBridge - logger logger.Logger -} - -// newUserResolver creates a new XMPP user resolver -func newUserResolver(bridge *xmppBridge) *xmppUserResolver { - return &xmppUserResolver{ - bridge: bridge, - logger: bridge.logger, - } -} - -// ResolveUser converts an external user ID to an ExternalUser -func (r *xmppUserResolver) ResolveUser(externalUserID string) (*pluginModel.ExternalUser, error) { - r.logger.LogDebug("Resolving XMPP user", "user_id", externalUserID) - - // For XMPP, the external user ID is typically the full JID - return &pluginModel.ExternalUser{ - BridgeType: "xmpp", - ExternalUserID: externalUserID, - DisplayName: r.GetDisplayName(externalUserID), - MattermostUserID: "", // Will be resolved by user mapping system - }, nil -} - -// FormatUserMention formats a user mention for Markdown content -func (r *xmppUserResolver) FormatUserMention(user *pluginModel.ExternalUser) string { - // For XMPP, we can format mentions as simple text with the display name - return fmt.Sprintf("@%s", user.DisplayName) -} - -// GetDisplayName extracts display name from external user ID -func (r *xmppUserResolver) GetDisplayName(externalUserID string) string { - // For XMPP JIDs, extract the local part or resource as display name - // Format: user@domain/resource -> use resource or user - if len(externalUserID) == 0 { - return "Unknown User" - } - - // Try to parse as JID and extract meaningful display name - parts := strings.Split(externalUserID, "/") - if len(parts) > 1 { - // Has resource part, use it as display name - return parts[1] - } - - // No resource, try to extract local part from user@domain - atIndex := strings.Index(externalUserID, "@") - if atIndex > 0 { - return externalUserID[:atIndex] - } - - // Fallback to the full ID - return externalUserID -} diff --git a/server/command/command.go b/server/command/command.go index 686eb97..d15b9f5 100644 --- a/server/command/command.go +++ b/server/command/command.go @@ -6,13 +6,11 @@ import ( pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" "github.com/mattermost/mattermost/server/public/model" - "github.com/mattermost/mattermost/server/public/plugin" "github.com/mattermost/mattermost/server/public/pluginapi" ) type Handler struct { client *pluginapi.Client - api plugin.API bridgeManager pluginModel.BridgeManager } @@ -24,7 +22,7 @@ type Command interface { const xmppBridgeCommandTrigger = "xmppbridge" // Register all your slash commands in the NewCommandHandler function. -func NewCommandHandler(client *pluginapi.Client, api plugin.API, bridgeManager pluginModel.BridgeManager) Command { +func NewCommandHandler(client *pluginapi.Client, bridgeManager pluginModel.BridgeManager) Command { // Register XMPP bridge command xmppBridgeData := model.NewAutocompleteData(xmppBridgeCommandTrigger, "", "Manage XMPP bridge") mapSubcommand := model.NewAutocompleteData("map", "[room_jid]", "Map current channel to XMPP room") @@ -37,17 +35,11 @@ func NewCommandHandler(client *pluginapi.Client, api plugin.API, bridgeManager p statusSubcommand := model.NewAutocompleteData("status", "", "Show bridge connection status") xmppBridgeData.AddCommand(statusSubcommand) - syncSubcommand := model.NewAutocompleteData("sync", "", "Force sync with shared channels if channel is mapped") - xmppBridgeData.AddCommand(syncSubcommand) - - syncResetSubcommand := model.NewAutocompleteData("sync-reset", "", "Reset sync cursor for channel if mapped") - xmppBridgeData.AddCommand(syncResetSubcommand) - err := client.SlashCommand.Register(&model.Command{ Trigger: xmppBridgeCommandTrigger, AutoComplete: true, AutoCompleteDesc: "Manage XMPP bridge mappings", - AutoCompleteHint: "[map|unmap|status|sync|sync-reset]", + AutoCompleteHint: "[map|unmap|status]", AutocompleteData: xmppBridgeData, }) if err != nil { @@ -56,7 +48,6 @@ func NewCommandHandler(client *pluginapi.Client, api plugin.API, bridgeManager p return &Handler{ client: client, - api: api, bridgeManager: bridgeManager, } } @@ -94,8 +85,6 @@ func (c *Handler) executeXMPPBridgeCommand(args *model.CommandArgs) *model.Comma - ` + "`/xmppbridge map `" + ` - Map current channel to XMPP room - ` + "`/xmppbridge unmap`" + ` - Unmap current channel from XMPP room - ` + "`/xmppbridge status`" + ` - Show bridge connection status -- ` + "`/xmppbridge sync`" + ` - Force sync with shared channels if channel is mapped -- ` + "`/xmppbridge sync-reset`" + ` - Reset sync cursor for channel if mapped **Example:** ` + "`/xmppbridge map general@conference.example.com`", @@ -110,10 +99,6 @@ func (c *Handler) executeXMPPBridgeCommand(args *model.CommandArgs) *model.Comma return c.executeUnmapCommand(args) case "status": return c.executeStatusCommand(args) - case "sync": - return c.executeSyncCommand(args) - case "sync-reset": - return c.executeSyncResetCommand(args) default: return &model.CommandResponse{ ResponseType: model.CommandResponseTypeEphemeral, @@ -176,11 +161,11 @@ func (c *Handler) executeMapCommand(args *model.CommandArgs, fields []string) *m // Create the mapping using BridgeManager mappingReq := pluginModel.CreateChannelMappingRequest{ - ChannelID: channelID, - BridgeName: "xmpp", - BridgeChannelID: roomJID, - UserID: args.UserId, - TeamID: args.TeamId, + ChannelID: channelID, + BridgeName: "xmpp", + BridgeRoomID: roomJID, + UserID: args.UserId, + TeamID: args.TeamId, } err = c.bridgeManager.CreateChannelMapping(mappingReq) @@ -299,105 +284,6 @@ func (c *Handler) isSystemAdmin(userID string) bool { } // formatMappingError provides user-friendly error messages for mapping operations -func (c *Handler) executeSyncCommand(args *model.CommandArgs) *model.CommandResponse { - channelID := args.ChannelId - - // Get the XMPP bridge to check if channel is mapped - bridge, err := c.bridgeManager.GetBridge("xmpp") - if err != nil { - return &model.CommandResponse{ - ResponseType: model.CommandResponseTypeEphemeral, - Text: "❌ XMPP bridge is not available. Please check the plugin configuration.", - } - } - - // Check if channel is mapped to XMPP - roomJID, err := bridge.GetChannelMapping(channelID) - if err != nil { - return &model.CommandResponse{ - ResponseType: model.CommandResponseTypeEphemeral, - Text: fmt.Sprintf("Error checking channel mapping: %v", err), - } - } - - if roomJID == "" { - return &model.CommandResponse{ - ResponseType: model.CommandResponseTypeEphemeral, - Text: "❌ This channel is not mapped to any XMPP room. Use `/xmppbridge map ` to create a mapping first.", - } - } - - // Force sync with shared channels - if err := c.api.SyncSharedChannel(channelID); err != nil { - return &model.CommandResponse{ - ResponseType: model.CommandResponseTypeEphemeral, - Text: fmt.Sprintf("❌ Failed to sync channel: %v", err), - } - } - - return &model.CommandResponse{ - ResponseType: model.CommandResponseTypeEphemeral, - Text: fmt.Sprintf("✅ Successfully triggered sync for channel mapped to XMPP room: `%s`", roomJID), - } -} - -func (c *Handler) executeSyncResetCommand(args *model.CommandArgs) *model.CommandResponse { - channelID := args.ChannelId - - // Get the XMPP bridge to check if channel is mapped - bridge, err := c.bridgeManager.GetBridge("xmpp") - if err != nil { - return &model.CommandResponse{ - ResponseType: model.CommandResponseTypeEphemeral, - Text: "❌ XMPP bridge is not available. Please check the plugin configuration.", - } - } - - // Check if channel is mapped to XMPP - roomJID, err := bridge.GetChannelMapping(channelID) - if err != nil { - return &model.CommandResponse{ - ResponseType: model.CommandResponseTypeEphemeral, - Text: fmt.Sprintf("Error checking channel mapping: %v", err), - } - } - - if roomJID == "" { - return &model.CommandResponse{ - ResponseType: model.CommandResponseTypeEphemeral, - Text: "❌ This channel is not mapped to any XMPP room. Use `/xmppbridge map ` to create a mapping first.", - } - } - - // Get remoteID from bridge for cursor operations - remoteID := bridge.GetRemoteID() - if remoteID == "" { - return &model.CommandResponse{ - ResponseType: model.CommandResponseTypeEphemeral, - Text: "❌ Bridge remote ID not available. Cannot reset sync cursor.", - } - } - - // Create empty cursor to reset to beginning - emptyCursor := model.GetPostsSinceForSyncCursor{ - LastPostUpdateAt: 1, - LastPostCreateAt: 1, - } - - // Reset sync cursor using UpdateSharedChannelCursor - if err := c.api.UpdateSharedChannelCursor(channelID, remoteID, emptyCursor); err != nil { - return &model.CommandResponse{ - ResponseType: model.CommandResponseTypeEphemeral, - Text: fmt.Sprintf("❌ Failed to reset sync cursor: %v", err), - } - } - - return &model.CommandResponse{ - ResponseType: model.CommandResponseTypeEphemeral, - Text: fmt.Sprintf("✅ Successfully reset sync cursor for channel mapped to XMPP room: `%s`", roomJID), - } -} - func (c *Handler) formatMappingError(operation, roomJID string, err error) *model.CommandResponse { errorMsg := err.Error() @@ -406,7 +292,7 @@ func (c *Handler) formatMappingError(operation, roomJID string, err error) *mode case strings.Contains(errorMsg, "already mapped to channel"): return &model.CommandResponse{ ResponseType: model.CommandResponseTypeEphemeral, - Text: fmt.Sprintf(`❌ **Channel Already Mapped** + Text: fmt.Sprintf(`❌ **Room Already Mapped** The XMPP room **%s** is already connected to another channel. @@ -419,7 +305,7 @@ The XMPP room **%s** is already connected to another channel. case strings.Contains(errorMsg, "does not exist"): return &model.CommandResponse{ ResponseType: model.CommandResponseTypeEphemeral, - Text: fmt.Sprintf(`❌ **Channel Not Found** + Text: fmt.Sprintf(`❌ **Room Not Found** The XMPP room **%s** doesn't exist or isn't accessible. diff --git a/server/configuration.go b/server/configuration.go index b6bad1f..79fa130 100644 --- a/server/configuration.go +++ b/server/configuration.go @@ -57,7 +57,7 @@ func (p *Plugin) OnConfigurationChange() error { return errors.Wrap(err, "failed to load plugin configuration") } - p.API.LogDebug("Plugin configuration changed") + p.API.LogDebug("Loaded configuration in OnConfigurationChange", "configuration", configuration) // Validate the configuration if err := configuration.IsValid(); err != nil { diff --git a/server/hooks_sharedchannels.go b/server/hooks_sharedchannels.go index e49df2b..798ba0f 100644 --- a/server/hooks_sharedchannels.go +++ b/server/hooks_sharedchannels.go @@ -1,24 +1,18 @@ package main -import ( - "fmt" - "time" - - pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" - "github.com/mattermost/mattermost/server/public/model" -) +import "github.com/mattermost/mattermost/server/public/model" // OnSharedChannelsPing is called to check if the bridge is healthy and ready to process messages func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool { config := p.getConfiguration() + p.logger.LogDebug("OnSharedChannelsPing called", "remote_cluster_id", remoteCluster.RemoteId) + var remoteClusterID string if remoteCluster != nil { remoteClusterID = remoteCluster.RemoteId } - p.logger.LogDebug("OnSharedChannelsPing called", "remote_cluster_id", remoteClusterID) - p.logger.LogDebug("Received shared channels ping", "remote_cluster_id", remoteClusterID) // If sync is disabled, we're still "healthy" but not actively processing @@ -50,119 +44,3 @@ func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool { p.logger.LogDebug("Shared channels ping successful - XMPP bridge is healthy", "remote_cluster_id", remoteClusterID) return true } - -// OnSharedChannelsSyncMsg processes sync messages from Mattermost shared channels and routes them to XMPP -func (p *Plugin) OnSharedChannelsSyncMsg(msg *model.SyncMsg, rc *model.RemoteCluster) (model.SyncResponse, error) { - config := p.getConfiguration() - - // Initialize sync response - now := model.GetMillis() - response := model.SyncResponse{ - PostsLastUpdateAt: now, - UsersLastUpdateAt: now, - ReactionsLastUpdateAt: now, - } - - var remoteClusterID string - if rc != nil { - remoteClusterID = rc.RemoteId - } - - p.logger.LogDebug("OnSharedChannelsSyncMsg called", - "remote_cluster_id", remoteClusterID, - "channel_id", msg.ChannelId, - "post_count", len(msg.Posts)) - - // If sync is disabled, return success but don't process - if !config.EnableSync { - p.logger.LogDebug("Sync message received but sync is disabled", "remote_cluster_id", remoteClusterID) - return response, nil - } - - // Process each post in the sync message - var processedCount int - var errors []string - - for _, post := range msg.Posts { - if err := p.processSyncPost(post, msg.ChannelId, msg.Users); err != nil { - errorMsg := fmt.Sprintf("failed to process post %s: %v", post.Id, err) - errors = append(errors, errorMsg) - p.logger.LogError("Failed to process sync post", "post_id", post.Id, "error", err) - } else { - processedCount++ - } - } - - p.logger.LogInfo("Processed sync message", - "remote_cluster_id", remoteClusterID, - "channel_id", msg.ChannelId, - "processed_posts", processedCount, - "failed_posts", len(errors)) - - // If we have errors, return them - if len(errors) > 0 { - return response, fmt.Errorf("failed to process %d posts: %v", len(errors), errors) - } - - return response, nil -} - -// processSyncPost converts a Mattermost post to a bridge message and routes it to XMPP -func (p *Plugin) processSyncPost(post *model.Post, channelID string, users map[string]*model.User) error { - // Find the user who created this post - var postUser *model.User - p.logger.LogInfo("Processing sync post", "post_id", post.UserId, "users", users) - if users != nil { - postUser = users[post.UserId] - } - - // If user not found in sync data, try to get from API - if postUser == nil { - var err error - postUser, err = p.API.GetUser(post.UserId) - if err != nil { - p.logger.LogWarn("Failed to get user for post", "user_id", post.UserId, "post_id", post.Id, "error", err) - // Create a placeholder user - postUser = &model.User{ - Id: post.UserId, - Username: "unknown-user", - } - } - } - - // Create bridge message from Mattermost post - bridgeMessage := &pluginModel.BridgeMessage{ - SourceBridge: "mattermost", - SourceChannelID: channelID, - SourceUserID: postUser.Id, - SourceUserName: postUser.Username, - Content: post.Message, - MessageType: "text", // TODO: Handle other message types - Timestamp: time.Unix(post.CreateAt/1000, 0), - MessageID: post.Id, - ThreadID: post.RootId, - TargetBridges: []string{"xmpp"}, // Route to XMPP - Metadata: map[string]any{ - "original_post": post, - "channel_id": channelID, - }, - } - - // Create directional message for outgoing (Mattermost -> XMPP) - directionalMessage := &pluginModel.DirectionalMessage{ - BridgeMessage: bridgeMessage, - Direction: pluginModel.DirectionOutgoing, - } - - // Publish the message to the message bus for routing to XMPP bridge - if err := p.bridgeManager.PublishMessage(directionalMessage); err != nil { - return fmt.Errorf("failed to publish sync message to message bus: %w", err) - } - - p.logger.LogDebug("Successfully published sync message to message bus", - "post_id", post.Id, - "channel_id", channelID, - "user", postUser.Username) - - return nil -} diff --git a/server/model/bridge.go b/server/model/bridge.go index dcf67df..bd00672 100644 --- a/server/model/bridge.go +++ b/server/model/bridge.go @@ -20,11 +20,11 @@ const ( // CreateChannelMappingRequest contains information needed to create a channel mapping type CreateChannelMappingRequest struct { - ChannelID string // Mattermost channel ID - BridgeName string // Name of the bridge (e.g., "xmpp") - BridgeChannelID string // Remote room/channel ID (e.g., JID for XMPP) - UserID string // ID of user who triggered the mapping creation - TeamID string // Team ID where the channel belongs + ChannelID string // Mattermost channel ID + BridgeName string // Name of the bridge (e.g., "xmpp") + BridgeRoomID string // Remote room/channel ID (e.g., JID for XMPP) + UserID string // ID of user who triggered the mapping creation + TeamID string // Team ID where the channel belongs } // Validate checks if all required fields are present and valid @@ -35,8 +35,8 @@ func (r CreateChannelMappingRequest) Validate() error { if r.BridgeName == "" { return fmt.Errorf("bridgeName cannot be empty") } - if r.BridgeChannelID == "" { - return fmt.Errorf("bridgeChannelID cannot be empty") + if r.BridgeRoomID == "" { + return fmt.Errorf("bridgeRoomID cannot be empty") } if r.UserID == "" { return fmt.Errorf("userID cannot be empty") @@ -73,9 +73,6 @@ func (r DeleteChannelMappingRequest) Validate() error { } type BridgeManager interface { - // Start starts the bridge manager and message routing system. - Start() error - // RegisterBridge registers a bridge with the given name. Returns an error if the name is empty, // the bridge is nil, or a bridge with the same name is already registered. RegisterBridge(name string, bridge Bridge) error @@ -113,21 +110,18 @@ type BridgeManager interface { // OnPluginConfigurationChange propagates configuration changes to all registered bridges. // Returns an error if any bridge fails to update its configuration, but continues to // attempt updating all bridges. - OnPluginConfigurationChange(config *config.Configuration) error + OnPluginConfigurationChange(config any) error // CreateChannelMapping is called when a channel mapping is created. CreateChannelMapping(req CreateChannelMappingRequest) error // DeleteChannepMapping is called when a channel mapping is deleted. DeleteChannepMapping(req DeleteChannelMappingRequest) error - - // PublishMessage publishes a message to the message bus for routing to target bridges - PublishMessage(msg *DirectionalMessage) error } type Bridge interface { // UpdateConfiguration updates the bridge configuration - UpdateConfiguration(config *config.Configuration) error + UpdateConfiguration(config any) error // Start starts the bridge Start() error @@ -135,20 +129,20 @@ type Bridge interface { // Stop stops the bridge Stop() error - // CreateChannelMapping creates a mapping between a Mattermost channel ID and a bridge channel ID. + // CreateChannelMapping creates a mapping between a Mattermost channel ID and an bridge room ID. CreateChannelMapping(channelID, roomJID string) error - // GetChannelMapping retrieves the bridge channel ID for a given Mattermost channel ID. + // GetChannelMapping retrieves the bridge room ID for a given Mattermost channel ID. GetChannelMapping(channelID string) (string, error) - // DeleteChannelMapping removes a mapping between a Mattermost channel ID and a bridge channel ID. + // DeleteChannelMapping removes a mapping between a Mattermost channel ID and a bridge room ID. DeleteChannelMapping(channelID string) error - // ChannelMappingExists checks if a room/channel exists on the remote service. - ChannelMappingExists(roomID string) (bool, error) + // RoomExists checks if a room/channel exists on the remote service. + RoomExists(roomID string) (bool, error) - // GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge. - GetChannelMappingForBridge(bridgeName, roomID string) (string, error) + // GetRoomMapping retrieves the Mattermost channel ID for a given room ID (reverse lookup). + GetRoomMapping(roomID string) (string, error) // IsConnected checks if the bridge is connected to the remote service. IsConnected() bool @@ -158,15 +152,6 @@ type Bridge interface { // GetUserManager returns the user manager for this bridge. GetUserManager() BridgeUserManager - - // Message handling for bidirectional communication - GetMessageChannel() <-chan *DirectionalMessage - SendMessage(msg *BridgeMessage) error - GetMessageHandler() MessageHandler - GetUserResolver() UserResolver - - // GetRemoteID returns the remote ID used for shared channels registration - GetRemoteID() string } // BridgeUser represents a user connected to any bridge service diff --git a/server/model/message.go b/server/model/message.go deleted file mode 100644 index 747b0f2..0000000 --- a/server/model/message.go +++ /dev/null @@ -1,88 +0,0 @@ -package model - -import ( - "time" -) - -// MessageDirection indicates the direction of message flow -type MessageDirection string - -const ( - DirectionIncoming MessageDirection = "incoming" // From external system to us - DirectionOutgoing MessageDirection = "outgoing" // From us to external system -) - -// BridgeMessage represents a message that can be passed between any bridge types -type BridgeMessage struct { - // Source information - SourceBridge string // "xmpp", "mattermost", "slack", etc. - SourceChannelID string // Channel ID in source system - SourceUserID string // User ID in source system (JID, user ID, etc.) - SourceUserName string // Display name in source system - - // Message content (standardized on Markdown) - Content string // Markdown formatted message content - MessageType string // "text", "image", "file", etc. - - // Metadata - Timestamp time.Time // When message was received - MessageID string // Unique message ID from source - ThreadID string // Thread/reply ID (if applicable) - - // Routing hints - TargetBridges []string // Which bridges should receive this - Metadata map[string]any // Bridge-specific metadata -} - -// DirectionalMessage wraps a BridgeMessage with direction information -type DirectionalMessage struct { - *BridgeMessage - Direction MessageDirection -} - -// ExternalUser represents a user from any bridge system -type ExternalUser struct { - BridgeType string // "xmpp", "slack", etc. - ExternalUserID string // JID, Slack user ID, etc. - DisplayName string // How to display this user - MattermostUserID string // Mapped Mattermost user (if exists) -} - -// UserResolver handles user resolution for a specific bridge -type UserResolver interface { - // ResolveUser converts an external user ID to an ExternalUser - ResolveUser(externalUserID string) (*ExternalUser, error) - - // FormatUserMention formats a user mention for Markdown content - FormatUserMention(user *ExternalUser) string - - // GetDisplayName extracts display name from external user ID - GetDisplayName(externalUserID string) string -} - -// MessageBus handles routing messages between bridges -type MessageBus interface { - // Subscribe returns a channel that receives messages for the specified bridge - Subscribe(bridgeName string) <-chan *DirectionalMessage - - // Publish sends a message to the message bus for routing - Publish(msg *DirectionalMessage) error - - // Start begins message routing - Start() error - - // Stop ends message routing and cleans up resources - Stop() error -} - -// MessageHandler processes incoming messages for a bridge -type MessageHandler interface { - // ProcessMessage handles an incoming message - ProcessMessage(msg *DirectionalMessage) error - - // CanHandleMessage determines if this handler can process the message - CanHandleMessage(msg *BridgeMessage) bool - - // GetSupportedMessageTypes returns the message types this handler supports - GetSupportedMessageTypes() []string -} diff --git a/server/plugin.go b/server/plugin.go index 97bb691..f7a5f81 100644 --- a/server/plugin.go +++ b/server/plugin.go @@ -14,6 +14,7 @@ import ( "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore" + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp" "github.com/mattermost/mattermost/server/public/model" "github.com/mattermost/mattermost/server/public/plugin" "github.com/mattermost/mattermost/server/public/pluginapi" @@ -34,6 +35,9 @@ type Plugin struct { // commandClient is the client used to register and execute slash commands. commandClient command.Command + // xmppClient is the client used to communicate with XMPP servers. + xmppClient *xmpp.Client + // logger is the main plugin logger logger logger.Logger @@ -66,11 +70,15 @@ func (p *Plugin) OnActivate() error { p.kvstore = kvstore.NewKVStore(p.client) + p.initXMPPClient() + // Load configuration directly cfg := p.getConfiguration() + p.logger.LogDebug("Loaded configuration in OnActivate", "config", cfg) // Register the plugin for shared channels if err := p.registerForSharedChannels(); err != nil { + p.logger.LogError("Failed to register for shared channels", "error", err) return fmt.Errorf("failed to register for shared channels: %w", err) } @@ -79,15 +87,11 @@ func (p *Plugin) OnActivate() error { // Initialize and register bridges with current configuration if err := p.initBridges(*cfg); err != nil { + p.logger.LogError("Failed to initialize bridges", "error", err) return fmt.Errorf("failed to initialize bridges: %w", err) } - p.commandClient = command.NewCommandHandler(p.client, p.API, p.bridgeManager) - - // Start the bridge manager (this starts message routing) - if err := p.bridgeManager.Start(); err != nil { - return fmt.Errorf("failed to start bridge manager: %w", err) - } + p.commandClient = command.NewCommandHandler(p.client, p.bridgeManager) // Start all bridges for _, bridgeName := range p.bridgeManager.ListBridges() { @@ -141,6 +145,18 @@ func (p *Plugin) ExecuteCommand(c *plugin.Context, args *model.CommandArgs) (*mo return response, nil } +func (p *Plugin) initXMPPClient() { + cfg := p.getConfiguration() + p.xmppClient = xmpp.NewClient( + cfg.XMPPServerURL, + cfg.XMPPUsername, + cfg.XMPPPassword, + cfg.GetXMPPResource(), + p.remoteID, + p.logger, + ) +} + func (p *Plugin) initBridges(cfg config.Configuration) error { // Create and register XMPP bridge xmppBridge := xmppbridge.NewBridge( @@ -148,7 +164,6 @@ func (p *Plugin) initBridges(cfg config.Configuration) error { p.API, p.kvstore, &cfg, - p.remoteID, ) if err := p.bridgeManager.RegisterBridge("xmpp", xmppBridge); err != nil { @@ -161,8 +176,6 @@ func (p *Plugin) initBridges(cfg config.Configuration) error { p.API, p.kvstore, &cfg, - p.botUserID, - "mattermost", ) if err := p.bridgeManager.RegisterBridge("mattermost", mattermostBridge); err != nil { @@ -190,7 +203,7 @@ func (p *Plugin) registerForSharedChannels() error { PluginID: manifest.Id, CreatorID: botUserID, AutoShareDMs: false, - AutoInvited: false, + AutoInvited: true, } remoteID, appErr := p.API.RegisterPluginForSharedChannels(opts) diff --git a/server/store/kvstore/constants.go b/server/store/kvstore/constants.go index 698a82c..579bd5c 100644 --- a/server/store/kvstore/constants.go +++ b/server/store/kvstore/constants.go @@ -7,17 +7,72 @@ import "strings" // to ensure consistency and avoid key conflicts. const ( + // CurrentKVStoreVersion is the current version requiring migrations + CurrentKVStoreVersion = 2 + // KeyPrefixXMPPUser is the prefix for XMPP user ID -> Mattermost user ID mappings + KeyPrefixXMPPUser = "xmpp_user_" + // KeyPrefixMattermostUser is the prefix for Mattermost user ID -> XMPP user ID mappings + KeyPrefixMattermostUser = "mattermost_user_" + // KeyPrefixChannelMap is the prefix for bridge-agnostic channel mappings KeyPrefixChannelMap = "channel_map_" + + // KeyPrefixGhostUser is the prefix for Mattermost user ID -> XMPP ghost user ID cache + KeyPrefixGhostUser = "ghost_user_" + // KeyPrefixGhostRoom is the prefix for ghost user room membership tracking + KeyPrefixGhostRoom = "ghost_room_" + + // KeyPrefixXMPPEventPost is the prefix for XMPP event ID -> Mattermost post ID mappings + KeyPrefixXMPPEventPost = "xmpp_event_post_" + // KeyPrefixXMPPReaction is the prefix for XMPP reaction event ID -> reaction info mappings + KeyPrefixXMPPReaction = "xmpp_reaction_" + + // KeyStoreVersion is the key for tracking the current KV store schema version + KeyStoreVersion = "kv_store_version" + + // KeyPrefixLegacyDMMapping was the old prefix for DM mappings + KeyPrefixLegacyDMMapping = "dm_mapping_" + // KeyPrefixLegacyXMPPDMMapping was the old prefix for XMPP DM mappings + KeyPrefixLegacyXMPPDMMapping = "xmpp_dm_mapping_" ) // Helper functions for building KV store keys +// BuildXMPPUserKey creates a key for XMPP user -> Mattermost user mapping +func BuildXMPPUserKey(xmppUserID string) string { + return KeyPrefixXMPPUser + xmppUserID +} + +// BuildMattermostUserKey creates a key for Mattermost user -> XMPP user mapping +func BuildMattermostUserKey(mattermostUserID string) string { + return KeyPrefixMattermostUser + mattermostUserID +} + // BuildChannelMapKey creates a bridge-agnostic key for channel mappings func BuildChannelMapKey(bridgeName, identifier string) string { return KeyPrefixChannelMap + bridgeName + "_" + identifier } +// BuildGhostUserKey creates a key for ghost user cache +func BuildGhostUserKey(mattermostUserID string) string { + return KeyPrefixGhostUser + mattermostUserID +} + +// BuildGhostRoomKey creates a key for ghost user room membership +func BuildGhostRoomKey(mattermostUserID, roomID string) string { + return KeyPrefixGhostRoom + mattermostUserID + "_" + roomID +} + +// BuildXMPPEventPostKey creates a key for XMPP event -> post mapping +func BuildXMPPEventPostKey(xmppEventID string) string { + return KeyPrefixXMPPEventPost + xmppEventID +} + +// BuildXMPPReactionKey creates a key for XMPP reaction storage +func BuildXMPPReactionKey(reactionEventID string) string { + return KeyPrefixXMPPReaction + reactionEventID +} + // ExtractIdentifierFromChannelMapKey extracts the identifier from a bridge-agnostic channel map key func ExtractIdentifierFromChannelMapKey(key, bridgeName string) string { expectedPrefix := KeyPrefixChannelMap + bridgeName + "_" @@ -28,4 +83,4 @@ func ExtractIdentifierFromChannelMapKey(key, bridgeName string) string { return "" } return key[len(expectedPrefix):] -} \ No newline at end of file +} diff --git a/server/xmpp/client.go b/server/xmpp/client.go index 537711a..aad5e24 100644 --- a/server/xmpp/client.go +++ b/server/xmpp/client.go @@ -6,15 +6,10 @@ import ( "crypto/tls" "encoding/xml" "fmt" - "net" - "net/url" "time" - "github.com/jellydator/ttlcache/v3" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" - "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" "mellium.im/sasl" - "mellium.im/xmlstream" "mellium.im/xmpp" "mellium.im/xmpp/disco" "mellium.im/xmpp/jid" @@ -23,17 +18,6 @@ import ( "mellium.im/xmpp/stanza" ) -const ( - // defaultOperationTimeout is the default timeout for XMPP operations - defaultOperationTimeout = 5 * time.Second - - // msgBufferSize is the buffer size for incoming message channels - msgBufferSize = 1000 - - // messageDedupeTTL is the TTL for message deduplication cache - messageDedupeTTL = 30 * time.Second -) - // Client represents an XMPP client for communicating with XMPP servers. type Client struct { serverURL string @@ -54,12 +38,6 @@ type Client struct { mux *mux.ServeMux sessionReady chan struct{} sessionServing bool - - // Message handling for bridge integration - incomingMessages chan *model.DirectionalMessage - - // Message deduplication cache to handle XMPP server duplicates - dedupeCache *ttlcache.Cache[string, time.Time] } // MessageRequest represents a request to send a message. @@ -107,40 +85,22 @@ type UserProfile struct { // NewClient creates a new XMPP client. func NewClient(serverURL, username, password, resource, remoteID string, logger logger.Logger) *Client { ctx, cancel := context.WithCancel(context.Background()) - - // Create TTL cache for message deduplication - dedupeCache := ttlcache.New( - ttlcache.WithTTL[string, time.Time](messageDedupeTTL), - ) - - // Start automatic cleanup in background - go dedupeCache.Start() - - client := &Client{ - serverURL: serverURL, - username: username, - password: password, - resource: resource, - remoteID: remoteID, - logger: logger, - ctx: ctx, - cancel: cancel, - sessionReady: make(chan struct{}), - incomingMessages: make(chan *model.DirectionalMessage, msgBufferSize), - dedupeCache: dedupeCache, - } - - // Create MUC client and set up message handling mucClient := &muc.Client{} - client.mucClient = mucClient + mux := mux.New("jabber:client", muc.HandleClient(mucClient)) - // Create mux with MUC client and our message handler - mux := mux.New("jabber:client", - muc.HandleClient(mucClient), - mux.MessageFunc(stanza.GroupChatMessage, xml.Name{}, client.handleIncomingMessage)) - client.mux = mux - - return client + return &Client{ + serverURL: serverURL, + username: username, + password: password, + resource: resource, + remoteID: remoteID, + logger: logger, + ctx: ctx, + cancel: cancel, + mucClient: mucClient, + mux: mux, + sessionReady: make(chan struct{}), + } } // NewClientWithTLS creates a new XMPP client with custom TLS configuration. @@ -155,44 +115,6 @@ func (c *Client) SetServerDomain(domain string) { c.serverDomain = domain } -// parseServerAddress parses a server URL and returns a host:port address -func (c *Client) parseServerAddress(serverURL string) (string, error) { - // Handle simple host:port format (e.g., "localhost:5222") - if host, port, err := net.SplitHostPort(serverURL); err == nil { - // Already in host:port format, validate and return - if host == "" { - return "", fmt.Errorf("empty hostname in server URL: %s", serverURL) - } - if port == "" { - return "", fmt.Errorf("empty port in server URL: %s", serverURL) - } - return serverURL, nil - } - - // Try parsing as URL (e.g., "xmpp://localhost:5222" or "xmpps://localhost:5223") - parsedURL, err := url.Parse(serverURL) - if err != nil { - return "", fmt.Errorf("invalid server URL format: %w", err) - } - - host := parsedURL.Hostname() - if host == "" { - return "", fmt.Errorf("no hostname found in server URL: %s", serverURL) - } - - port := parsedURL.Port() - // Use default XMPP port if not specified - if port == "" { - if parsedURL.Scheme == "xmpps" { - port = "5223" - } else { - port = "5222" - } - } - - return host + ":" + port, nil -} - // Connect establishes connection to the XMPP server func (c *Client) Connect() error { if c.session != nil { @@ -228,38 +150,15 @@ func (c *Client) Connect() error { } } - // Create a timeout context for the connection attempt (30 seconds) - connectCtx, connectCancel := context.WithTimeout(c.ctx, 30*time.Second) - defer connectCancel() - - c.logger.LogDebug("Connecting to XMPP server", "server_url", c.serverURL) - - // Parse server address for direct connection - // We use this instead of using the JID for the user to avoid SRV lookups that can fail - // depending on network/dns configuration and to ensure we connect directly to the specified - // server. - serverAddr, err := c.parseServerAddress(c.serverURL) - if err != nil { - return fmt.Errorf("failed to parse server URL %s: %w", c.serverURL, err) - } - - var d net.Dialer - conn, err := d.DialContext(connectCtx, "tcp", serverAddr) - if err != nil { - return fmt.Errorf("failed to dial XMPP server at %s: %w", serverAddr, err) - } - - // Create client session with direct connection - c.session, err = xmpp.NewClientSession( - connectCtx, + // Use DialClientSession for proper SASL authentication + c.session, err = xmpp.DialClientSession( + c.ctx, c.jidAddr, - conn, xmpp.StartTLS(tlsConfig), xmpp.SASL("", c.password, sasl.Plain), xmpp.BindResource(), ) if err != nil { - conn.Close() return fmt.Errorf("failed to establish XMPP session: %w", err) } @@ -272,12 +171,9 @@ func (c *Client) Connect() error { if !c.sessionServing { return fmt.Errorf("failed to start session serving") } - c.logger.LogInfo("XMPP client connected successfully", "jid", c.jidAddr.String()) return nil - case <-time.After(10 * time.Second): + case <-time.After(5 * time.Second): return fmt.Errorf("timeout waiting for session to be ready") - case <-c.ctx.Done(): - return fmt.Errorf("connection cancelled: %w", c.ctx.Err()) } } @@ -310,47 +206,18 @@ func (c *Client) serveSession() { // Disconnect closes the XMPP connection func (c *Client) Disconnect() error { - if c.session == nil { - return nil // Already disconnected - } - - c.logger.LogInfo("Disconnecting XMPP client", "jid", c.jidAddr.String()) - - // Send offline presence before disconnecting to properly leave rooms - if err := c.SetOfflinePresence(); err != nil { - c.logger.LogWarn("Failed to set offline presence before disconnect", "error", err) - // Don't fail the disconnect for presence issues - } - - // Close the session with a timeout to prevent hanging - sessionCloseCtx, cancel := context.WithTimeout(context.Background(), defaultOperationTimeout) - defer cancel() - - sessionCloseDone := make(chan error, 1) - go func() { - sessionCloseDone <- c.session.Close() - }() - - select { - case err := <-sessionCloseDone: + if c.session != nil { + err := c.session.Close() c.session = nil if err != nil { - c.logger.LogWarn("Error closing XMPP session", "error", err) return fmt.Errorf("failed to close XMPP session: %w", err) } - case <-sessionCloseCtx.Done(): - c.logger.LogWarn("Timeout closing XMPP session, forcing disconnect") - c.session = nil - // Continue with cleanup even on timeout } - // Stop the TTL cache cleanup goroutine - c.dedupeCache.Stop() + if c.cancel != nil { + c.cancel() + } - // Cancel the client context - c.cancel() - - c.logger.LogInfo("XMPP client disconnected successfully") return nil } @@ -551,30 +418,6 @@ func (c *Client) SetOnlinePresence() error { return nil } -// SetOfflinePresence sends an offline presence stanza to indicate the client is going offline -func (c *Client) SetOfflinePresence() error { - if c.session == nil { - return fmt.Errorf("XMPP session not established") - } - - // Create presence stanza indicating we're unavailable - presence := stanza.Presence{ - Type: stanza.UnavailablePresence, - From: c.jidAddr, - } - - // Create a context with timeout for the presence update - ctx, cancel := context.WithTimeout(context.Background(), defaultOperationTimeout) - defer cancel() - - // Send the presence stanza - if err := c.session.Encode(ctx, presence); err != nil { - return fmt.Errorf("failed to send offline presence: %w", err) - } - - return nil -} - // CheckRoomExists verifies if an XMPP room exists and is accessible using disco#info func (c *Client) CheckRoomExists(roomJID string) (bool, error) { if c.session == nil { @@ -692,118 +535,3 @@ func (c *Client) Ping() error { c.logger.LogDebug("XMPP ping successful", "duration", duration) return nil } - -// GetMessageChannel returns the channel for incoming messages (Bridge interface) -func (c *Client) GetMessageChannel() <-chan *model.DirectionalMessage { - return c.incomingMessages -} - -// handleIncomingMessage processes incoming XMPP message stanzas -func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenReadEncoder) error { - c.logger.LogDebug("Received XMPP message", - "from", msg.From.String(), - "to", msg.To.String(), - "type", fmt.Sprintf("%v", msg.Type)) - - // Only process groupchat messages for now (MUC messages from channels) - if msg.Type != stanza.GroupChatMessage { - c.logger.LogDebug("Ignoring non-groupchat message", "type", fmt.Sprintf("%v", msg.Type)) - return nil - } - - // Parse the message body from the token reader - var msgWithBody struct { - stanza.Message - Body string `xml:"body"` - } - msgWithBody.Message = msg - - d := xml.NewTokenDecoder(t) - if err := d.DecodeElement(&msgWithBody, nil); err != nil { - c.logger.LogError("Failed to decode message body", "error", err) - return err - } - - if msgWithBody.Body == "" { - c.logger.LogDebug("Message has no body, ignoring") - return nil - } - - // Deduplicate messages using message ID and TTL cache - if msg.ID != "" { - // Check if this message ID is already in the cache (indicates duplicate) - if c.dedupeCache.Has(msg.ID) { - return nil - } - - // Record this message in the cache with TTL - c.dedupeCache.Set(msg.ID, time.Now(), ttlcache.DefaultTTL) - } - - // Extract channel and user information from JIDs - channelID, err := c.extractChannelID(msg.From) - if err != nil { - c.logger.LogError("Failed to extract channel ID from JID", "from", msg.From.String(), "error", err) - return nil - } - - userID, userName := c.extractUserInfo(msg.From) - - // Create BridgeMessage - bridgeMsg := &model.BridgeMessage{ - SourceBridge: "xmpp", - SourceChannelID: channelID, - SourceUserID: userID, - SourceUserName: userName, - Content: msgWithBody.Body, // Already Markdown compatible - MessageType: "text", - Timestamp: time.Now(), // XMPP doesn't always provide timestamps - MessageID: msg.ID, - TargetBridges: []string{}, // Will be routed to all other bridges - Metadata: map[string]any{ - "xmpp_from": msg.From.String(), - "xmpp_to": msg.To.String(), - }, - } - - // Wrap in directional message - directionalMsg := &model.DirectionalMessage{ - BridgeMessage: bridgeMsg, - Direction: model.DirectionIncoming, - } - - // Send to message channel (non-blocking) - select { - case c.incomingMessages <- directionalMsg: - // Message queued successfully - default: - c.logger.LogWarn("Message channel full, dropping message", - "channel_id", channelID, - "user_id", userID) - } - - return nil -} - -// extractChannelID extracts the channel ID (room bare JID) from a message JID -func (c *Client) extractChannelID(from jid.JID) (string, error) { - // For MUC messages, the channel ID is the bare JID (without resource/nickname) - return from.Bare().String(), nil -} - -// extractUserInfo extracts user ID and display name from a message JID -func (c *Client) extractUserInfo(from jid.JID) (string, string) { - // For MUC messages, the resource part is the nickname - nickname := from.Resourcepart() - - // Use the full JID as user ID for XMPP - userID := from.String() - - // Use nickname as display name if available, otherwise use full JID - displayName := nickname - if displayName == "" { - displayName = from.String() - } - - return userID, displayName -}