package xmpp

import (
	"context"
	"crypto/tls"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/bridge"
	"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
	"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
	pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
	"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
	xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
	"github.com/mattermost/mattermost/server/public/plugin"
	"mellium.im/xmlstream"
	"mellium.im/xmpp/stanza"
)

const (
	// defaultMessageBufferSize is the buffer size for incoming message channels
	defaultMessageBufferSize = 1000
)

// xmppBridge handles syncing messages between Mattermost and XMPP
type xmppBridge struct {
	logger       logger.Logger
	api          plugin.API
	kvstore      kvstore.KVStore
	bridgeClient *xmppClient.Client // Main bridge XMPP client connection
	userManager  pluginModel.BridgeUserManager
	bridgeID     string // Bridge identifier used for registration
	remoteID     string // Remote ID for shared channels

	// Message handling
	messageHandler   *xmppMessageHandler
	userResolver     *xmppUserResolver
	incomingMessages chan *pluginModel.DirectionalMessage

	// Connection management
	connected atomic.Bool
	ctx       context.Context
	cancel    context.CancelFunc

	// Current configuration
	config   *config.Configuration
	configMu sync.RWMutex

	// Channel mappings cache (Mattermost channel ID -> XMPP room JID)
	channelMappings map[string]string
	mappingsMu      sync.RWMutex
}

// NewBridge creates a new XMPP bridge.
//
// The bridge is returned unstarted. The XMPP client is created only when cfg
// enables sync and carries a server URL, username and password; otherwise
// bridgeClient stays nil until UpdateConfiguration supplies a new
// configuration.
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, bridgeID, remoteID string) pluginModel.Bridge {
	ctx, cancel := context.WithCancel(context.Background())

	b := &xmppBridge{
		logger:           log,
		api:              api,
		kvstore:          kvstore,
		ctx:              ctx,
		cancel:           cancel,
		channelMappings:  make(map[string]string),
		config:           cfg,
		userManager:      bridge.NewUserManager(bridgeID, log),
		incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize),
		bridgeID:         bridgeID,
		remoteID:         remoteID,
	}

	// Initialize handlers after bridge is created
	b.messageHandler = newMessageHandler(b)
	b.userResolver = newUserResolver(b)

	// Initialize XMPP client with configuration. A nil cfg is tolerated here
	// because Start already reports a missing configuration as an error
	// instead of panicking.
	if cfg != nil && cfg.EnableSync && cfg.XMPPServerURL != "" && cfg.XMPPUsername != "" && cfg.XMPPPassword != "" {
		b.bridgeClient = b.createXMPPClient(cfg)
	}

	return b
}

// createXMPPClient creates an XMPP client with the given configuration.
// cfg must be non-nil.
func (b *xmppBridge) createXMPPClient(cfg *config.Configuration) *xmppClient.Client {
	// Certificate verification is skipped only when the configuration asks
	// for it.
	tlsConfig := &tls.Config{
		InsecureSkipVerify: cfg.XMPPInsecureSkipVerify,
	}

	return xmppClient.NewClientWithTLS(
		cfg.XMPPServerURL,
		cfg.XMPPUsername,
		cfg.XMPPPassword,
		cfg.GetXMPPResource(),
		"", // remoteID not needed for bridge client
		tlsConfig,
		b.logger,
	)
}

// getConfiguration safely retrieves the current configuration
func (b *xmppBridge) getConfiguration() *config.Configuration {
	b.configMu.RLock()
	defer b.configMu.RUnlock()
	return b.config
}

// UpdateConfiguration updates the bridge configuration.
//
// It validates cfg, stores it, replaces the XMPP client when cfg differs from
// the previous configuration, and then restarts the bridge (Stop followed by
// Start). It returns an error when cfg is nil or invalid, or when the bridge
// fails to start again.
func (b *xmppBridge) UpdateConfiguration(cfg *config.Configuration) error {
	if cfg == nil {
		return fmt.Errorf("invalid configuration: configuration is nil")
	}

	// Validate configuration using built-in validation
	if err := cfg.IsValid(); err != nil {
		return fmt.Errorf("invalid configuration: %w", err)
	}

	// Read the previous configuration and install the new one under a single
	// write lock so no other update can slip in between the two steps.
	b.configMu.Lock()
	oldConfig := b.config
	b.config = cfg

	// Initialize or update XMPP client with new configuration
	if !cfg.Equals(oldConfig) {
		if b.bridgeClient != nil {
			if err := b.bridgeClient.Disconnect(); err != nil {
				b.logger.LogError("Failed to disconnect old XMPP bridge client", "error", err)
			}
		}
		b.bridgeClient = b.createXMPPClient(cfg)
	}
	b.configMu.Unlock()

	// Stop the bridge
	if err := b.Stop(); err != nil {
		b.logger.LogWarn("Error stopping bridge during restart", "error", err)
	}

	// Start the bridge with new configuration
	if err := b.Start(); err != nil {
		b.logger.LogError("Failed to restart bridge with new configuration", "error", err)
		return fmt.Errorf("failed to restart bridge: %w", err)
	}

	b.logger.LogDebug("XMPP bridge configuration updated successfully")
	return nil
}

// Start initializes the bridge and connects to XMPP.
//
// It returns an error when no configuration is set or the connection fails,
// and returns nil without connecting when sync is disabled. Failing to join
// mapped rooms is logged but does not fail the start.
func (b *xmppBridge) Start() error {
	b.logger.LogDebug("Starting Mattermost to XMPP bridge")

	cfg := b.getConfiguration()
	if cfg == nil {
		return fmt.Errorf("bridge configuration not set")
	}

	if !cfg.EnableSync {
		b.logger.LogInfo("XMPP sync is disabled, bridge will not start")
		return nil
	}

	b.logger.LogInfo("Starting Mattermost to XMPP bridge", "xmpp_server", cfg.XMPPServerURL, "username", cfg.XMPPUsername)

	// Stop cancels b.ctx and a cancelled context cannot be revived. Without a
	// fresh one, the monitor goroutine started below would exit immediately
	// whenever the bridge is restarted (as UpdateConfiguration does).
	if b.ctx == nil || b.ctx.Err() != nil {
		b.ctx, b.cancel = context.WithCancel(context.Background())
	}

	// Connect to XMPP server
	if err := b.connectToXMPP(); err != nil {
		return fmt.Errorf("failed to connect to XMPP server: %w", err)
	}

	// Load and join mapped channels
	if err := b.loadAndJoinMappedChannels(); err != nil {
		b.logger.LogWarn("Failed to join some mapped channels", "error", err)
	}

	// Start connection monitor
	go b.connectionMonitor()

	b.logger.LogInfo("Mattermost to XMPP bridge started successfully")
	return nil
}

// Stop shuts down the bridge.
//
// It cancels the bridge context, disconnects the XMPP client if one exists and
// marks the bridge as disconnected. A disconnect error is only logged; Stop
// always returns nil.
func (b *xmppBridge) Stop() error {
	b.logger.LogInfo("Stopping Mattermost to XMPP bridge")

	if b.cancel != nil {
		b.cancel()
	}

	if b.bridgeClient != nil {
		if err := b.bridgeClient.Disconnect(); err != nil {
			b.logger.LogWarn("Error disconnecting from XMPP server", "error", err)
		}
	}

	b.connected.Store(false)
	b.logger.LogInfo("Mattermost to XMPP bridge stopped")
	return nil
}

// connectToXMPP establishes connection to the XMPP server.
//
// On success it marks the bridge connected, tries to set online presence and
// registers handleIncomingXMPPMessage as the client's message handler. It
// returns an error when the client is nil or the connection attempt fails.
func (b *xmppBridge) connectToXMPP() error {
	if b.bridgeClient == nil {
		return fmt.Errorf("XMPP client is not initialized")
	}

	b.logger.LogDebug("Connecting to XMPP server")

	err := b.bridgeClient.Connect()
	if err != nil {
		b.connected.Store(false)
		return fmt.Errorf("failed to connect to XMPP server: %w", err)
	}

	b.connected.Store(true)
	b.logger.LogInfo("Successfully connected to XMPP server")

	// Set online presence after successful connection
	if err := b.bridgeClient.SetOnlinePresence(); err != nil {
		b.logger.LogWarn("Failed to set online presence", "error", err)
		// Don't fail the connection for presence issues
	} else {
		b.logger.LogDebug("Set bridge client online presence")
	}

	b.bridgeClient.SetMessageHandler(b.handleIncomingXMPPMessage)

	return nil
}

// loadAndJoinMappedChannels loads channel mappings and joins corresponding
// XMPP rooms.
//
// It returns an error only when the mappings cannot be loaded; a room that
// cannot be joined is logged and skipped.
func (b *xmppBridge) loadAndJoinMappedChannels() error {
	b.logger.LogDebug("Loading and joining mapped channels")

	// Get all channel mappings from KV store
	mappings, err := b.getAllChannelMappings()
	if err != nil {
		return fmt.Errorf("failed to load channel mappings: %w", err)
	}

	if len(mappings) == 0 {
		b.logger.LogInfo("No channel mappings found, no rooms to join")
		return nil
	}

	b.logger.LogInfo("Found channel mappings, joining XMPP rooms", "count", len(mappings))

	// Join each mapped room
	for channelID, roomJID := range mappings {
		if err := b.joinXMPPRoom(channelID, roomJID); err != nil {
			b.logger.LogWarn("Failed to join room", "channel_id", channelID, "room_jid", roomJID, "error", err)
		}
	}

	return nil
}

// joinXMPPRoom joins an XMPP room and updates the local cache.
//
// It returns an error when the bridge is not connected or the join fails; the
// cache is updated only after a successful join.
func (b *xmppBridge) joinXMPPRoom(channelID, roomJID string) error {
	if !b.connected.Load() {
		return fmt.Errorf("not connected to XMPP server")
	}

	err := b.bridgeClient.JoinRoom(roomJID)
	if err != nil {
		return fmt.Errorf("failed to join XMPP room: %w", err)
	}

	b.logger.LogInfo("Joined XMPP room", "channel_id", channelID, "room_jid", roomJID)

	// Update local cache
	b.mappingsMu.Lock()
	b.channelMappings[channelID] = roomJID
	b.mappingsMu.Unlock()

	return nil
}

// getAllChannelMappings retrieves all channel mappings from KV store.
//
// The result maps Mattermost channel ID to XMPP room JID. Keys whose value
// cannot be loaded, whose room JID cannot be extracted, or whose stored channel
// ID is empty are logged and skipped. It returns an error when the KV store is
// nil or the key listing fails.
func (b *xmppBridge) getAllChannelMappings() (map[string]string, error) {
	if b.kvstore == nil {
		return nil, fmt.Errorf("KV store not initialized")
	}

	// Get all keys with the XMPP room mapping prefix to find all mapped rooms
	xmppPrefix := kvstore.KeyPrefixChannelMap + "xmpp_"
	// NOTE(review): only a single ListKeysWithPrefix(0, 1000, ...) call is
	// made. Presumably the arguments are (page, perPage), in which case
	// mappings beyond the first 1000 keys are never loaded - confirm against
	// the KVStore contract and paginate if so.
	keys, err := b.kvstore.ListKeysWithPrefix(0, 1000, xmppPrefix)
	if err != nil {
		return nil, fmt.Errorf("failed to list XMPP room mapping keys: %w", err)
	}

	mappings := make(map[string]string, len(keys))

	// Load each mapping
	for _, key := range keys {
		channelIDBytes, err := b.kvstore.Get(key)
		if err != nil {
			b.logger.LogWarn("Failed to load mapping for key", "key", key, "error", err)
			continue
		}

		// Extract room JID from the key
		roomJID := kvstore.ExtractIdentifierFromChannelMapKey(key, "xmpp")
		if roomJID == "" {
			b.logger.LogWarn("Failed to extract room JID from key", "key", key)
			continue
		}

		// An empty value would otherwise register a mapping under the empty
		// channel ID and trigger a room join for a channel that does not
		// exist.
		channelID := string(channelIDBytes)
		if channelID == "" {
			b.logger.LogWarn("Empty channel ID for mapping key", "key", key)
			continue
		}

		mappings[channelID] = roomJID
	}

	return mappings, nil
}

// connectionMonitor monitors the XMPP connection.
//
// Every 30 seconds it calls Ping and, when that fails, logs the error and
// calls handleReconnection. It returns once the bridge context that was
// current when the monitor started is cancelled.
func (b *xmppBridge) connectionMonitor() {
	// Capture the context once: Start may install a new context after a
	// restart, and a monitor belonging to the previous run must keep watching
	// (and exit on) the context it was started with.
	ctx := b.ctx

	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := b.Ping(); err != nil {
				b.logger.LogWarn("XMPP connection check failed", "error", err)
				b.handleReconnection()
			}
		}
	}
}

// NOTE(review): everything from handleReconnection onward was damaged before
// this file reached review: the text between `time.Duration(1<` and
// `Mattermost)` is missing. The gap swallowed the rest of handleReconnection,
// the head of handleIncomingXMPPMessage and whatever was declared between them
// (Ping, which connectionMonitor calls, is not defined anywhere in the visible
// text, and the xmlstream and stanza imports are not used by it either). The
// surviving text is kept verbatim below, commented out so that the rest of the
// file parses. Restore these definitions from version control rather than
// reconstructing them by hand.
//
//   // handleReconnection attempts to reconnect to XMPP and rejoin rooms func (b
//   *xmppBridge) handleReconnection() { b.configMu.RLock() config := b.config
//   b.configMu.RUnlock() if config == nil || !config.EnableSync { return }
//   b.logger.LogInfo("Attempting to reconnect to XMPP server")
//   b.connected.Store(false) if b.bridgeClient != nil { _ =
//   b.bridgeClient.Disconnect() } // Retry connection with exponential backoff
//   maxRetries := 3 for i := range maxRetries { backoff := time.Duration(1<
//   Mattermost) directionalMessage := &pluginModel.DirectionalMessage{
//   BridgeMessage: bridgeMessage, Direction: pluginModel.DirectionIncoming, } //
//   Send to bridge's message channel select { case b.incomingMessages <-
//   directionalMessage: b.logger.LogDebug("XMPP message queued for processing",
//   "channel_id", channelID, "user_id", userID, "message_id", msg.ID) default:
//   b.logger.LogWarn("Bridge message channel full, dropping message",
//   "channel_id", channelID, "user_id", userID) } return nil }