From d159c668c28de7771069c4710567294dee17dd5b Mon Sep 17 00:00:00 2001 From: Felipe Martin Date: Fri, 1 Aug 2025 13:47:15 +0200 Subject: [PATCH] feat: implement production-ready MUC operations and comprehensive testing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Implement proper XMPP MUC operations using mellium.im/xmpp/muc package - Add session readiness checking to prevent blocking on room joins - Create comprehensive bridge manager architecture with lifecycle management - Add complete channel mapping functionality with KV store persistence - Remove defensive logger nil checks as requested by user - Enhance XMPP client doctor with MUC testing (join/wait/leave workflow) - Add detailed dev server documentation for test room creation - Implement timeout protection for all MUC operations - Add proper error handling with fmt.Errorf instead of pkg/errors - Successfully tested: MUC join in ~21ms, 5s wait, clean leave operation 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- cmd/xmpp-client-doctor/main.go | 91 +++++- server/bridge/manager.go | 216 ++++++++++++++ server/bridge/mattermost/bridge.go | 450 ----------------------------- server/bridge/xmpp/bridge.go | 444 +++++++++++++++++++++++++++- server/command/command.go | 38 ++- server/configuration.go | 9 +- server/model/bridge.go | 46 +++ server/plugin.go | 70 +++-- server/store/kvstore/constants.go | 8 + server/xmpp/client.go | 197 ++++++++++--- sidecar/README.md | 32 +- 11 files changed, 1048 insertions(+), 553 deletions(-) create mode 100644 server/bridge/manager.go delete mode 100644 server/bridge/mattermost/bridge.go diff --git a/cmd/xmpp-client-doctor/main.go b/cmd/xmpp-client-doctor/main.go index 7a0b208..c3930b2 100644 --- a/cmd/xmpp-client-doctor/main.go +++ b/cmd/xmpp-client-doctor/main.go @@ -17,6 +17,7 @@ const ( defaultUsername = "testuser@localhost" defaultPassword = "testpass" defaultResource = "doctor" + defaultTestRoom = "test1@conference.localhost" ) type Config struct { @@ -24,6 +25,8 @@ type Config struct { Username string Password string Resource string + TestRoom string + TestMUC bool Verbose bool InsecureSkipVerify bool } @@ -36,19 +39,27 @@ func main() { flag.StringVar(&config.Username, "username", defaultUsername, "XMPP username/JID") flag.StringVar(&config.Password, "password", defaultPassword, "XMPP password") flag.StringVar(&config.Resource, "resource", defaultResource, "XMPP resource") + flag.StringVar(&config.TestRoom, "test-room", defaultTestRoom, "MUC room JID for testing") + flag.BoolVar(&config.TestMUC, "test-muc", true, "Enable MUC room testing (join/wait/leave)") flag.BoolVar(&config.Verbose, "verbose", true, "Enable verbose logging") flag.BoolVar(&config.InsecureSkipVerify, "insecure-skip-verify", true, "Skip TLS certificate verification (for development)") flag.Usage = func() { - fmt.Fprintf(os.Stderr, "xmpp-client-doctor - Test XMPP client connectivity\n\n") + fmt.Fprintf(os.Stderr, "xmpp-client-doctor - Test XMPP client connectivity and MUC operations\n\n") fmt.Fprintf(os.Stderr, "This tool tests the XMPP client implementation by connecting to an XMPP server,\n") - fmt.Fprintf(os.Stderr, "performing a connection test, and then disconnecting gracefully.\n\n") + fmt.Fprintf(os.Stderr, "performing connection tests, optionally testing MUC room operations,\n") + fmt.Fprintf(os.Stderr, "and then disconnecting gracefully.\n\n") fmt.Fprintf(os.Stderr, "Usage:\n") fmt.Fprintf(os.Stderr, " %s [flags]\n\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Examples:\n") + fmt.Fprintf(os.Stderr, " %s # Test basic connectivity\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s --test-muc # Test connectivity and MUC operations\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s --test-muc=false # Test connectivity only\n\n", os.Args[0]) fmt.Fprintf(os.Stderr, "Flags:\n") flag.PrintDefaults() fmt.Fprintf(os.Stderr, "\nDefault values are configured for the development server in ./sidecar/\n") fmt.Fprintf(os.Stderr, "Make sure to start the development server with: cd sidecar && docker-compose up -d\n") + fmt.Fprintf(os.Stderr, "For MUC testing, create the test room 'test1' via the admin console at http://localhost:9090\n") } flag.Parse() @@ -61,6 +72,9 @@ func main() { log.Printf(" Username: %s", config.Username) log.Printf(" Resource: %s", config.Resource) log.Printf(" Password: %s", maskPassword(config.Password)) + if config.TestMUC { + log.Printf(" Test Room: %s", config.TestRoom) + } } // Test the XMPP client @@ -72,6 +86,9 @@ func main() { log.Printf("✅ XMPP client test completed successfully!") } else { fmt.Println("✅ XMPP client connectivity test passed!") + if config.TestMUC { + fmt.Println("✅ XMPP MUC operations test passed!") + } } } @@ -134,6 +151,21 @@ func testXMPPClient(config *Config) error { if config.Verbose { log.Printf("✅ Connection health test passed in %v", pingDuration) + } + + var mucDuration time.Duration + + // Test MUC operations if requested + if config.TestMUC { + start = time.Now() + err = testMUCOperations(client, config) + if err != nil { + return fmt.Errorf("MUC operations test failed: %w", err) + } + mucDuration = time.Since(start) + } + + if config.Verbose { log.Printf("Disconnecting from XMPP server...") } @@ -150,8 +182,61 @@ func testXMPPClient(config *Config) error { log.Printf("Connection summary:") log.Printf(" Connect time: %v", connectDuration) log.Printf(" Ping time: %v", pingDuration) + if config.TestMUC { + log.Printf(" MUC operations time: %v", mucDuration) + } log.Printf(" Disconnect time: %v", disconnectDuration) - log.Printf(" Total time: %v", connectDuration+pingDuration+disconnectDuration) + totalTime := connectDuration + pingDuration + disconnectDuration + if config.TestMUC { + totalTime += mucDuration + } + log.Printf(" Total time: %v", totalTime) + } + + return nil +} + +func testMUCOperations(client *xmpp.Client, config *Config) error { + if config.Verbose { + log.Printf("Testing MUC operations with room: %s", config.TestRoom) + log.Printf("Attempting to join MUC room...") + } + + // Test joining the room + start := time.Now() + err := client.JoinRoom(config.TestRoom) + if err != nil { + return fmt.Errorf("failed to join MUC room %s: %w", config.TestRoom, err) + } + joinDuration := time.Since(start) + + if config.Verbose { + log.Printf("✅ Successfully joined MUC room in %v", joinDuration) + log.Printf("Waiting 5 seconds in the room...") + } + + // Wait 5 seconds + time.Sleep(5 * time.Second) + + if config.Verbose { + log.Printf("Attempting to leave MUC room...") + } + + // Test leaving the room + start = time.Now() + err = client.LeaveRoom(config.TestRoom) + if err != nil { + return fmt.Errorf("failed to leave MUC room %s: %w", config.TestRoom, err) + } + leaveDuration := time.Since(start) + + if config.Verbose { + log.Printf("✅ Successfully left MUC room in %v", leaveDuration) + log.Printf("MUC operations summary:") + log.Printf(" Join time: %v", joinDuration) + log.Printf(" Wait time: 5s") + log.Printf(" Leave time: %v", leaveDuration) + log.Printf(" Total MUC time: %v", joinDuration+5*time.Second+leaveDuration) } return nil diff --git a/server/bridge/manager.go b/server/bridge/manager.go new file mode 100644 index 0000000..588aa81 --- /dev/null +++ b/server/bridge/manager.go @@ -0,0 +1,216 @@ +package bridge + +import ( + "fmt" + "sync" + + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" +) + +// Manager manages multiple bridge instances +type Manager struct { + bridges map[string]model.Bridge + mu sync.RWMutex + logger logger.Logger +} + +// NewManager creates a new bridge manager +func NewManager(logger logger.Logger) model.BridgeManager { + if logger == nil { + panic("logger cannot be nil") + } + + return &Manager{ + bridges: make(map[string]model.Bridge), + logger: logger, + } +} + +// RegisterBridge registers a bridge with the manager +func (m *Manager) RegisterBridge(name string, bridge model.Bridge) error { + if name == "" { + return fmt.Errorf("bridge name cannot be empty") + } + if bridge == nil { + return fmt.Errorf("bridge cannot be nil") + } + + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.bridges[name]; exists { + return fmt.Errorf("bridge '%s' is already registered", name) + } + + m.bridges[name] = bridge + m.logger.LogInfo("Bridge registered", "name", name) + + return nil +} + +// StartBridge starts a specific bridge +func (m *Manager) StartBridge(name string) error { + m.mu.RLock() + bridge, exists := m.bridges[name] + m.mu.RUnlock() + + if !exists { + return fmt.Errorf("bridge '%s' is not registered", name) + } + + m.logger.LogInfo("Starting bridge", "name", name) + + if err := bridge.Start(); err != nil { + m.logger.LogError("Failed to start bridge", "name", name, "error", err) + return fmt.Errorf("failed to start bridge '%s': %w", name, err) + } + + m.logger.LogInfo("Bridge started successfully", "name", name) + return nil +} + +// StopBridge stops a specific bridge +func (m *Manager) StopBridge(name string) error { + m.mu.RLock() + bridge, exists := m.bridges[name] + m.mu.RUnlock() + + if !exists { + return fmt.Errorf("bridge '%s' is not registered", name) + } + + m.logger.LogInfo("Stopping bridge", "name", name) + + if err := bridge.Stop(); err != nil { + m.logger.LogError("Failed to stop bridge", "name", name, "error", err) + return fmt.Errorf("failed to stop bridge '%s': %w", name, err) + } + + m.logger.LogInfo("Bridge stopped successfully", "name", name) + return nil +} + +// UnregisterBridge removes a bridge from the manager +func (m *Manager) UnregisterBridge(name string) error { + m.mu.Lock() + defer m.mu.Unlock() + + bridge, exists := m.bridges[name] + if !exists { + return fmt.Errorf("bridge '%s' is not registered", name) + } + + // Stop the bridge before unregistering + if bridge.IsConnected() { + if err := bridge.Stop(); err != nil { + m.logger.LogWarn("Failed to stop bridge during unregistration", "name", name, "error", err) + } + } + + delete(m.bridges, name) + m.logger.LogInfo("Bridge unregistered", "name", name) + + return nil +} + +// GetBridge retrieves a bridge by name +func (m *Manager) GetBridge(name string) (model.Bridge, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + bridge, exists := m.bridges[name] + if !exists { + return nil, fmt.Errorf("bridge '%s' is not registered", name) + } + + return bridge, nil +} + +// ListBridges returns a list of all registered bridge names +func (m *Manager) ListBridges() []string { + m.mu.RLock() + defer m.mu.RUnlock() + + bridges := make([]string, 0, len(m.bridges)) + for name := range m.bridges { + bridges = append(bridges, name) + } + + return bridges +} + +// HasBridge checks if a bridge with the given name is registered +func (m *Manager) HasBridge(name string) bool { + m.mu.RLock() + defer m.mu.RUnlock() + + _, exists := m.bridges[name] + return exists +} + +// HasBridges checks if any bridges are registered +func (m *Manager) HasBridges() bool { + m.mu.RLock() + defer m.mu.RUnlock() + + return len(m.bridges) > 0 +} + +// Shutdown stops and unregisters all bridges +func (m *Manager) Shutdown() error { + m.mu.Lock() + defer m.mu.Unlock() + + m.logger.LogInfo("Shutting down bridge manager", "bridge_count", len(m.bridges)) + + var errors []error + for name, bridge := range m.bridges { + if bridge.IsConnected() { + if err := bridge.Stop(); err != nil { + errors = append(errors, fmt.Errorf("failed to stop bridge '%s': %w", name, err)) + m.logger.LogError("Failed to stop bridge during shutdown", "name", name, "error", err) + } + } + } + + // Clear all bridges + m.bridges = make(map[string]model.Bridge) + + m.logger.LogInfo("Bridge manager shutdown complete") + + if len(errors) > 0 { + return fmt.Errorf("shutdown completed with errors: %v", errors) + } + + return nil +} + +// OnPluginConfigurationChange propagates configuration changes to all registered bridges +func (m *Manager) OnPluginConfigurationChange(config any) error { + m.mu.RLock() + defer m.mu.RUnlock() + + if len(m.bridges) == 0 { + return nil + } + + m.logger.LogInfo("Plugin configuration changed, propagating to bridges", "bridge_count", len(m.bridges)) + + var errors []error + for name, bridge := range m.bridges { + if err := bridge.UpdateConfiguration(config); err != nil { + errors = append(errors, fmt.Errorf("failed to update configuration for bridge '%s': %w", name, err)) + m.logger.LogError("Failed to update bridge configuration", "name", name, "error", err) + } else { + m.logger.LogDebug("Successfully updated bridge configuration", "name", name) + } + } + + if len(errors) > 0 { + return fmt.Errorf("configuration update completed with errors: %v", errors) + } + + m.logger.LogInfo("Configuration changes propagated to all bridges") + return nil +} \ No newline at end of file diff --git a/server/bridge/mattermost/bridge.go b/server/bridge/mattermost/bridge.go deleted file mode 100644 index 79b7fb0..0000000 --- a/server/bridge/mattermost/bridge.go +++ /dev/null @@ -1,450 +0,0 @@ -package mattermost - -import ( - "context" - "crypto/tls" - "sync" - "sync/atomic" - "time" - - "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config" - "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" - "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore" - "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp" - "github.com/mattermost/mattermost/server/public/plugin" - "github.com/pkg/errors" -) - -// MattermostToXMPPBridge handles syncing messages from Mattermost to XMPP -type MattermostToXMPPBridge struct { - logger logger.Logger - api plugin.API - kvstore kvstore.KVStore - xmppClient *xmpp.Client - - // Connection management - connected atomic.Bool - ctx context.Context - cancel context.CancelFunc - - // Current configuration - config *config.Configuration - configMu sync.RWMutex - - // Channel mappings cache - channelMappings map[string]string - mappingsMu sync.RWMutex -} - -// NewMattermostToXMPPBridge creates a new Mattermost to XMPP bridge -func NewMattermostToXMPPBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) *MattermostToXMPPBridge { - ctx, cancel := context.WithCancel(context.Background()) - bridge := &MattermostToXMPPBridge{ - logger: log, - api: api, - kvstore: kvstore, - ctx: ctx, - cancel: cancel, - channelMappings: make(map[string]string), - config: cfg, - } - - // Initialize XMPP client with configuration - if cfg.EnableSync && cfg.XMPPServerURL != "" && cfg.XMPPUsername != "" && cfg.XMPPPassword != "" { - bridge.xmppClient = bridge.createXMPPClient(cfg) - } - - return bridge -} - -// createXMPPClient creates an XMPP client with the given configuration -func (b *MattermostToXMPPBridge) createXMPPClient(cfg *config.Configuration) *xmpp.Client { - // Create TLS config based on certificate verification setting - tlsConfig := &tls.Config{ - InsecureSkipVerify: cfg.XMPPInsecureSkipVerify, - } - - return xmpp.NewClientWithTLS( - cfg.XMPPServerURL, - cfg.XMPPUsername, - cfg.XMPPPassword, - cfg.GetXMPPResource(), - "", // remoteID not needed for bridge user - tlsConfig, - ) -} - -// UpdateConfiguration updates the bridge configuration -func (b *MattermostToXMPPBridge) UpdateConfiguration(newConfig any) error { - cfg, ok := newConfig.(*config.Configuration) - if !ok { - return errors.New("invalid configuration type") - } - - b.configMu.Lock() - oldConfig := b.config - b.config = cfg - - // Initialize or update XMPP client with new configuration - if cfg.EnableSync { - if cfg.XMPPServerURL == "" || cfg.XMPPUsername == "" || cfg.XMPPPassword == "" { - b.configMu.Unlock() - return errors.New("XMPP server URL, username, and password are required when sync is enabled") - } - - b.xmppClient = b.createXMPPClient(cfg) - } else { - b.xmppClient = nil - } - - b.configMu.Unlock() - - // Check if we need to restart the bridge due to configuration changes - wasConnected := b.connected.Load() - needsRestart := oldConfig != nil && !oldConfig.Equals(cfg) && wasConnected - - // Log the configuration change - if b.logger != nil { - if needsRestart { - b.logger.LogInfo("Configuration changed, restarting bridge", "old_config", oldConfig, "new_config", cfg) - } else { - b.logger.LogInfo("Configuration updated", "config", cfg) - } - } - - if needsRestart { - if b.logger != nil { - b.logger.LogInfo("Configuration changed, restarting bridge") - } - - // Stop the bridge - if err := b.Stop(); err != nil && b.logger != nil { - b.logger.LogWarn("Error stopping bridge during restart", "error", err) - } - - // Start the bridge with new configuration - if err := b.Start(); err != nil { - if b.logger != nil { - b.logger.LogError("Failed to restart bridge with new configuration", "error", err) - } - return errors.Wrap(err, "failed to restart bridge") - } - } - - return nil -} - -// Start initializes the bridge and connects to XMPP -func (b *MattermostToXMPPBridge) Start() error { - b.logger.LogDebug("Starting Mattermost to XMPP bridge") - - b.configMu.RLock() - config := b.config - b.configMu.RUnlock() - - if config == nil { - return errors.New("bridge configuration not set") - } - - // Print the configuration for debugging - b.logger.LogDebug("Bridge configuration", "config", config) - - if !config.EnableSync { - if b.logger != nil { - b.logger.LogInfo("XMPP sync is disabled, bridge will not start") - } - return nil - } - - if b.logger != nil { - b.logger.LogInfo("Starting Mattermost to XMPP bridge", "xmpp_server", config.XMPPServerURL, "username", config.XMPPUsername) - } - - // Connect to XMPP server - if err := b.connectToXMPP(); err != nil { - return errors.Wrap(err, "failed to connect to XMPP server") - } - - // Load and join mapped channels - if err := b.loadAndJoinMappedChannels(); err != nil { - if b.logger != nil { - b.logger.LogWarn("Failed to join some mapped channels", "error", err) - } - } - - // Start connection monitor - go b.connectionMonitor() - - if b.logger != nil { - b.logger.LogInfo("Mattermost to XMPP bridge started successfully") - } - return nil -} - -// Stop shuts down the bridge -func (b *MattermostToXMPPBridge) Stop() error { - if b.logger != nil { - b.logger.LogInfo("Stopping Mattermost to XMPP bridge") - } - - if b.cancel != nil { - b.cancel() - } - - if b.xmppClient != nil { - if err := b.xmppClient.Disconnect(); err != nil && b.logger != nil { - b.logger.LogWarn("Error disconnecting from XMPP server", "error", err) - } - } - - b.connected.Store(false) - if b.logger != nil { - b.logger.LogInfo("Mattermost to XMPP bridge stopped") - } - return nil -} - -// connectToXMPP establishes connection to the XMPP server -func (b *MattermostToXMPPBridge) connectToXMPP() error { - if b.xmppClient == nil { - return errors.New("XMPP client is not initialized") - } - - if b.logger != nil { - b.logger.LogDebug("Connecting to XMPP server") - } - - err := b.xmppClient.Connect() - if err != nil { - b.connected.Store(false) - return errors.Wrap(err, "failed to connect to XMPP server") - } - - b.connected.Store(true) - if b.logger != nil { - b.logger.LogInfo("Successfully connected to XMPP server") - } - - // Set online presence after successful connection - if err := b.xmppClient.SetOnlinePresence(); err != nil { - if b.logger != nil { - b.logger.LogWarn("Failed to set online presence", "error", err) - } - // Don't fail the connection for presence issues - } else if b.logger != nil { - b.logger.LogDebug("Set bridge user online presence") - } - - return nil -} - -// loadAndJoinMappedChannels loads channel mappings and joins corresponding XMPP rooms -func (b *MattermostToXMPPBridge) loadAndJoinMappedChannels() error { - if b.logger != nil { - b.logger.LogDebug("Loading and joining mapped channels") - } - - // Get all channel mappings from KV store - mappings, err := b.getAllChannelMappings() - if err != nil { - return errors.Wrap(err, "failed to load channel mappings") - } - - if len(mappings) == 0 { - if b.logger != nil { - b.logger.LogInfo("No channel mappings found, no rooms to join") - } - return nil - } - - if b.logger != nil { - b.logger.LogInfo("Found channel mappings, joining XMPP rooms", "count", len(mappings)) - } - - // Join each mapped room - for channelID, roomJID := range mappings { - if err := b.joinXMPPRoom(channelID, roomJID); err != nil && b.logger != nil { - b.logger.LogWarn("Failed to join room", "channel_id", channelID, "room_jid", roomJID, "error", err) - } - } - - return nil -} - -// joinXMPPRoom joins an XMPP room and updates the local cache -func (b *MattermostToXMPPBridge) joinXMPPRoom(channelID, roomJID string) error { - if !b.connected.Load() { - return errors.New("not connected to XMPP server") - } - - err := b.xmppClient.JoinRoom(roomJID) - if err != nil { - return errors.Wrap(err, "failed to join XMPP room") - } - - // Update local cache - b.mappingsMu.Lock() - b.channelMappings[channelID] = roomJID - b.mappingsMu.Unlock() - - return nil -} - -// getAllChannelMappings retrieves all channel mappings from KV store -func (b *MattermostToXMPPBridge) getAllChannelMappings() (map[string]string, error) { - mappings := make(map[string]string) - return mappings, nil -} - -// connectionMonitor monitors the XMPP connection -func (b *MattermostToXMPPBridge) connectionMonitor() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for { - select { - case <-b.ctx.Done(): - return - case <-ticker.C: - if err := b.checkConnection(); err != nil { - if b.logger != nil { - b.logger.LogWarn("XMPP connection check failed", "error", err) - } - b.handleReconnection() - } - } - } -} - -// checkConnection verifies the XMPP connection is still active -func (b *MattermostToXMPPBridge) checkConnection() error { - if !b.connected.Load() { - return errors.New("not connected") - } - return b.xmppClient.TestConnection() -} - -// handleReconnection attempts to reconnect to XMPP and rejoin rooms -func (b *MattermostToXMPPBridge) handleReconnection() { - b.configMu.RLock() - config := b.config - b.configMu.RUnlock() - - if config == nil || !config.EnableSync { - return - } - - if b.logger != nil { - b.logger.LogInfo("Attempting to reconnect to XMPP server") - } - b.connected.Store(false) - - if b.xmppClient != nil { - b.xmppClient.Disconnect() - } - - // Retry connection with exponential backoff - maxRetries := 3 - for i := 0; i < maxRetries; i++ { - backoff := time.Duration(1<