feat: implement production-ready MUC operations and comprehensive testing

- Implement proper XMPP MUC operations using mellium.im/xmpp/muc package
- Add session readiness checking to prevent blocking on room joins
- Create comprehensive bridge manager architecture with lifecycle management
- Add complete channel mapping functionality with KV store persistence
- Remove defensive logger nil checks as requested by user
- Enhance XMPP client doctor with MUC testing (join/wait/leave workflow)
- Add detailed dev server documentation for test room creation
- Implement timeout protection for all MUC operations
- Add proper error handling with fmt.Errorf instead of pkg/errors
- Successfully tested: MUC join in ~21ms, 5s wait, clean leave operation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Felipe M 2025-08-01 13:47:15 +02:00
parent 4d6929bab6
commit d159c668c2
No known key found for this signature in database
GPG key ID: 52E5D65FCF99808A
11 changed files with 1048 additions and 553 deletions

216
server/bridge/manager.go Normal file
View file

@ -0,0 +1,216 @@
package bridge
import (
"fmt"
"sync"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
)
// Manager manages multiple bridge instances
type Manager struct {
bridges map[string]model.Bridge
mu sync.RWMutex
logger logger.Logger
}
// NewManager creates a new bridge manager
func NewManager(logger logger.Logger) model.BridgeManager {
if logger == nil {
panic("logger cannot be nil")
}
return &Manager{
bridges: make(map[string]model.Bridge),
logger: logger,
}
}
// RegisterBridge registers a bridge with the manager
func (m *Manager) RegisterBridge(name string, bridge model.Bridge) error {
if name == "" {
return fmt.Errorf("bridge name cannot be empty")
}
if bridge == nil {
return fmt.Errorf("bridge cannot be nil")
}
m.mu.Lock()
defer m.mu.Unlock()
if _, exists := m.bridges[name]; exists {
return fmt.Errorf("bridge '%s' is already registered", name)
}
m.bridges[name] = bridge
m.logger.LogInfo("Bridge registered", "name", name)
return nil
}
// StartBridge starts a specific bridge
func (m *Manager) StartBridge(name string) error {
m.mu.RLock()
bridge, exists := m.bridges[name]
m.mu.RUnlock()
if !exists {
return fmt.Errorf("bridge '%s' is not registered", name)
}
m.logger.LogInfo("Starting bridge", "name", name)
if err := bridge.Start(); err != nil {
m.logger.LogError("Failed to start bridge", "name", name, "error", err)
return fmt.Errorf("failed to start bridge '%s': %w", name, err)
}
m.logger.LogInfo("Bridge started successfully", "name", name)
return nil
}
// StopBridge stops a specific bridge
func (m *Manager) StopBridge(name string) error {
m.mu.RLock()
bridge, exists := m.bridges[name]
m.mu.RUnlock()
if !exists {
return fmt.Errorf("bridge '%s' is not registered", name)
}
m.logger.LogInfo("Stopping bridge", "name", name)
if err := bridge.Stop(); err != nil {
m.logger.LogError("Failed to stop bridge", "name", name, "error", err)
return fmt.Errorf("failed to stop bridge '%s': %w", name, err)
}
m.logger.LogInfo("Bridge stopped successfully", "name", name)
return nil
}
// UnregisterBridge removes a bridge from the manager
func (m *Manager) UnregisterBridge(name string) error {
m.mu.Lock()
defer m.mu.Unlock()
bridge, exists := m.bridges[name]
if !exists {
return fmt.Errorf("bridge '%s' is not registered", name)
}
// Stop the bridge before unregistering
if bridge.IsConnected() {
if err := bridge.Stop(); err != nil {
m.logger.LogWarn("Failed to stop bridge during unregistration", "name", name, "error", err)
}
}
delete(m.bridges, name)
m.logger.LogInfo("Bridge unregistered", "name", name)
return nil
}
// GetBridge retrieves a bridge by name
func (m *Manager) GetBridge(name string) (model.Bridge, error) {
m.mu.RLock()
defer m.mu.RUnlock()
bridge, exists := m.bridges[name]
if !exists {
return nil, fmt.Errorf("bridge '%s' is not registered", name)
}
return bridge, nil
}
// ListBridges returns a list of all registered bridge names
func (m *Manager) ListBridges() []string {
m.mu.RLock()
defer m.mu.RUnlock()
bridges := make([]string, 0, len(m.bridges))
for name := range m.bridges {
bridges = append(bridges, name)
}
return bridges
}
// HasBridge checks if a bridge with the given name is registered
func (m *Manager) HasBridge(name string) bool {
m.mu.RLock()
defer m.mu.RUnlock()
_, exists := m.bridges[name]
return exists
}
// HasBridges checks if any bridges are registered
func (m *Manager) HasBridges() bool {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.bridges) > 0
}
// Shutdown stops and unregisters all bridges
func (m *Manager) Shutdown() error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.LogInfo("Shutting down bridge manager", "bridge_count", len(m.bridges))
var errors []error
for name, bridge := range m.bridges {
if bridge.IsConnected() {
if err := bridge.Stop(); err != nil {
errors = append(errors, fmt.Errorf("failed to stop bridge '%s': %w", name, err))
m.logger.LogError("Failed to stop bridge during shutdown", "name", name, "error", err)
}
}
}
// Clear all bridges
m.bridges = make(map[string]model.Bridge)
m.logger.LogInfo("Bridge manager shutdown complete")
if len(errors) > 0 {
return fmt.Errorf("shutdown completed with errors: %v", errors)
}
return nil
}
// OnPluginConfigurationChange propagates configuration changes to all registered bridges
func (m *Manager) OnPluginConfigurationChange(config any) error {
m.mu.RLock()
defer m.mu.RUnlock()
if len(m.bridges) == 0 {
return nil
}
m.logger.LogInfo("Plugin configuration changed, propagating to bridges", "bridge_count", len(m.bridges))
var errors []error
for name, bridge := range m.bridges {
if err := bridge.UpdateConfiguration(config); err != nil {
errors = append(errors, fmt.Errorf("failed to update configuration for bridge '%s': %w", name, err))
m.logger.LogError("Failed to update bridge configuration", "name", name, "error", err)
} else {
m.logger.LogDebug("Successfully updated bridge configuration", "name", name)
}
}
if len(errors) > 0 {
return fmt.Errorf("configuration update completed with errors: %v", errors)
}
m.logger.LogInfo("Configuration changes propagated to all bridges")
return nil
}

View file

@ -1,450 +0,0 @@
package mattermost
import (
"context"
"crypto/tls"
"sync"
"sync/atomic"
"time"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
"github.com/mattermost/mattermost/server/public/plugin"
"github.com/pkg/errors"
)
// MattermostToXMPPBridge handles syncing messages from Mattermost to XMPP
type MattermostToXMPPBridge struct {
logger logger.Logger
api plugin.API
kvstore kvstore.KVStore
xmppClient *xmpp.Client
// Connection management
connected atomic.Bool
ctx context.Context
cancel context.CancelFunc
// Current configuration
config *config.Configuration
configMu sync.RWMutex
// Channel mappings cache
channelMappings map[string]string
mappingsMu sync.RWMutex
}
// NewMattermostToXMPPBridge creates a new Mattermost to XMPP bridge
func NewMattermostToXMPPBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) *MattermostToXMPPBridge {
ctx, cancel := context.WithCancel(context.Background())
bridge := &MattermostToXMPPBridge{
logger: log,
api: api,
kvstore: kvstore,
ctx: ctx,
cancel: cancel,
channelMappings: make(map[string]string),
config: cfg,
}
// Initialize XMPP client with configuration
if cfg.EnableSync && cfg.XMPPServerURL != "" && cfg.XMPPUsername != "" && cfg.XMPPPassword != "" {
bridge.xmppClient = bridge.createXMPPClient(cfg)
}
return bridge
}
// createXMPPClient creates an XMPP client with the given configuration
func (b *MattermostToXMPPBridge) createXMPPClient(cfg *config.Configuration) *xmpp.Client {
// Create TLS config based on certificate verification setting
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify,
}
return xmpp.NewClientWithTLS(
cfg.XMPPServerURL,
cfg.XMPPUsername,
cfg.XMPPPassword,
cfg.GetXMPPResource(),
"", // remoteID not needed for bridge user
tlsConfig,
)
}
// UpdateConfiguration updates the bridge configuration
func (b *MattermostToXMPPBridge) UpdateConfiguration(newConfig any) error {
cfg, ok := newConfig.(*config.Configuration)
if !ok {
return errors.New("invalid configuration type")
}
b.configMu.Lock()
oldConfig := b.config
b.config = cfg
// Initialize or update XMPP client with new configuration
if cfg.EnableSync {
if cfg.XMPPServerURL == "" || cfg.XMPPUsername == "" || cfg.XMPPPassword == "" {
b.configMu.Unlock()
return errors.New("XMPP server URL, username, and password are required when sync is enabled")
}
b.xmppClient = b.createXMPPClient(cfg)
} else {
b.xmppClient = nil
}
b.configMu.Unlock()
// Check if we need to restart the bridge due to configuration changes
wasConnected := b.connected.Load()
needsRestart := oldConfig != nil && !oldConfig.Equals(cfg) && wasConnected
// Log the configuration change
if b.logger != nil {
if needsRestart {
b.logger.LogInfo("Configuration changed, restarting bridge", "old_config", oldConfig, "new_config", cfg)
} else {
b.logger.LogInfo("Configuration updated", "config", cfg)
}
}
if needsRestart {
if b.logger != nil {
b.logger.LogInfo("Configuration changed, restarting bridge")
}
// Stop the bridge
if err := b.Stop(); err != nil && b.logger != nil {
b.logger.LogWarn("Error stopping bridge during restart", "error", err)
}
// Start the bridge with new configuration
if err := b.Start(); err != nil {
if b.logger != nil {
b.logger.LogError("Failed to restart bridge with new configuration", "error", err)
}
return errors.Wrap(err, "failed to restart bridge")
}
}
return nil
}
// Start initializes the bridge and connects to XMPP
func (b *MattermostToXMPPBridge) Start() error {
b.logger.LogDebug("Starting Mattermost to XMPP bridge")
b.configMu.RLock()
config := b.config
b.configMu.RUnlock()
if config == nil {
return errors.New("bridge configuration not set")
}
// Print the configuration for debugging
b.logger.LogDebug("Bridge configuration", "config", config)
if !config.EnableSync {
if b.logger != nil {
b.logger.LogInfo("XMPP sync is disabled, bridge will not start")
}
return nil
}
if b.logger != nil {
b.logger.LogInfo("Starting Mattermost to XMPP bridge", "xmpp_server", config.XMPPServerURL, "username", config.XMPPUsername)
}
// Connect to XMPP server
if err := b.connectToXMPP(); err != nil {
return errors.Wrap(err, "failed to connect to XMPP server")
}
// Load and join mapped channels
if err := b.loadAndJoinMappedChannels(); err != nil {
if b.logger != nil {
b.logger.LogWarn("Failed to join some mapped channels", "error", err)
}
}
// Start connection monitor
go b.connectionMonitor()
if b.logger != nil {
b.logger.LogInfo("Mattermost to XMPP bridge started successfully")
}
return nil
}
// Stop shuts down the bridge
func (b *MattermostToXMPPBridge) Stop() error {
if b.logger != nil {
b.logger.LogInfo("Stopping Mattermost to XMPP bridge")
}
if b.cancel != nil {
b.cancel()
}
if b.xmppClient != nil {
if err := b.xmppClient.Disconnect(); err != nil && b.logger != nil {
b.logger.LogWarn("Error disconnecting from XMPP server", "error", err)
}
}
b.connected.Store(false)
if b.logger != nil {
b.logger.LogInfo("Mattermost to XMPP bridge stopped")
}
return nil
}
// connectToXMPP establishes connection to the XMPP server
func (b *MattermostToXMPPBridge) connectToXMPP() error {
if b.xmppClient == nil {
return errors.New("XMPP client is not initialized")
}
if b.logger != nil {
b.logger.LogDebug("Connecting to XMPP server")
}
err := b.xmppClient.Connect()
if err != nil {
b.connected.Store(false)
return errors.Wrap(err, "failed to connect to XMPP server")
}
b.connected.Store(true)
if b.logger != nil {
b.logger.LogInfo("Successfully connected to XMPP server")
}
// Set online presence after successful connection
if err := b.xmppClient.SetOnlinePresence(); err != nil {
if b.logger != nil {
b.logger.LogWarn("Failed to set online presence", "error", err)
}
// Don't fail the connection for presence issues
} else if b.logger != nil {
b.logger.LogDebug("Set bridge user online presence")
}
return nil
}
// loadAndJoinMappedChannels loads channel mappings and joins corresponding XMPP rooms
func (b *MattermostToXMPPBridge) loadAndJoinMappedChannels() error {
if b.logger != nil {
b.logger.LogDebug("Loading and joining mapped channels")
}
// Get all channel mappings from KV store
mappings, err := b.getAllChannelMappings()
if err != nil {
return errors.Wrap(err, "failed to load channel mappings")
}
if len(mappings) == 0 {
if b.logger != nil {
b.logger.LogInfo("No channel mappings found, no rooms to join")
}
return nil
}
if b.logger != nil {
b.logger.LogInfo("Found channel mappings, joining XMPP rooms", "count", len(mappings))
}
// Join each mapped room
for channelID, roomJID := range mappings {
if err := b.joinXMPPRoom(channelID, roomJID); err != nil && b.logger != nil {
b.logger.LogWarn("Failed to join room", "channel_id", channelID, "room_jid", roomJID, "error", err)
}
}
return nil
}
// joinXMPPRoom joins an XMPP room and updates the local cache
func (b *MattermostToXMPPBridge) joinXMPPRoom(channelID, roomJID string) error {
if !b.connected.Load() {
return errors.New("not connected to XMPP server")
}
err := b.xmppClient.JoinRoom(roomJID)
if err != nil {
return errors.Wrap(err, "failed to join XMPP room")
}
// Update local cache
b.mappingsMu.Lock()
b.channelMappings[channelID] = roomJID
b.mappingsMu.Unlock()
return nil
}
// getAllChannelMappings retrieves all channel mappings from KV store
func (b *MattermostToXMPPBridge) getAllChannelMappings() (map[string]string, error) {
mappings := make(map[string]string)
return mappings, nil
}
// connectionMonitor monitors the XMPP connection
func (b *MattermostToXMPPBridge) connectionMonitor() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-b.ctx.Done():
return
case <-ticker.C:
if err := b.checkConnection(); err != nil {
if b.logger != nil {
b.logger.LogWarn("XMPP connection check failed", "error", err)
}
b.handleReconnection()
}
}
}
}
// checkConnection verifies the XMPP connection is still active
func (b *MattermostToXMPPBridge) checkConnection() error {
if !b.connected.Load() {
return errors.New("not connected")
}
return b.xmppClient.TestConnection()
}
// handleReconnection attempts to reconnect to XMPP and rejoin rooms
func (b *MattermostToXMPPBridge) handleReconnection() {
b.configMu.RLock()
config := b.config
b.configMu.RUnlock()
if config == nil || !config.EnableSync {
return
}
if b.logger != nil {
b.logger.LogInfo("Attempting to reconnect to XMPP server")
}
b.connected.Store(false)
if b.xmppClient != nil {
b.xmppClient.Disconnect()
}
// Retry connection with exponential backoff
maxRetries := 3
for i := 0; i < maxRetries; i++ {
backoff := time.Duration(1<<uint(i)) * time.Second
select {
case <-b.ctx.Done():
return
case <-time.After(backoff):
}
if err := b.connectToXMPP(); err != nil {
if b.logger != nil {
b.logger.LogWarn("Reconnection attempt failed", "attempt", i+1, "error", err)
}
continue
}
if err := b.loadAndJoinMappedChannels(); err != nil && b.logger != nil {
b.logger.LogWarn("Failed to rejoin rooms after reconnection", "error", err)
}
if b.logger != nil {
b.logger.LogInfo("Successfully reconnected to XMPP server")
}
return
}
if b.logger != nil {
b.logger.LogError("Failed to reconnect to XMPP server after all attempts")
}
}
// Public API methods
// IsConnected returns whether the bridge is connected to XMPP
func (b *MattermostToXMPPBridge) IsConnected() bool {
return b.connected.Load()
}
// CreateChannelRoomMapping creates a mapping between a Mattermost channel and XMPP room
func (b *MattermostToXMPPBridge) CreateChannelRoomMapping(channelID, roomJID string) error {
if b.kvstore == nil {
return errors.New("KV store not initialized")
}
// Store forward and reverse mappings
err := b.kvstore.Set(kvstore.BuildChannelMappingKey(channelID), []byte(roomJID))
if err != nil {
return errors.Wrap(err, "failed to store channel room mapping")
}
err = b.kvstore.Set(kvstore.BuildRoomMappingKey(roomJID), []byte(channelID))
if err != nil {
return errors.Wrap(err, "failed to store reverse room mapping")
}
// Update local cache
b.mappingsMu.Lock()
b.channelMappings[channelID] = roomJID
b.mappingsMu.Unlock()
// Join the room if connected
if b.connected.Load() {
if err := b.xmppClient.JoinRoom(roomJID); err != nil && b.logger != nil {
b.logger.LogWarn("Failed to join newly mapped room", "channel_id", channelID, "room_jid", roomJID, "error", err)
}
}
if b.logger != nil {
b.logger.LogInfo("Created channel room mapping", "channel_id", channelID, "room_jid", roomJID)
}
return nil
}
// GetChannelRoomMapping gets the XMPP room JID for a Mattermost channel
func (b *MattermostToXMPPBridge) GetChannelRoomMapping(channelID string) (string, error) {
// Check cache first
b.mappingsMu.RLock()
roomJID, exists := b.channelMappings[channelID]
b.mappingsMu.RUnlock()
if exists {
return roomJID, nil
}
if b.kvstore == nil {
return "", errors.New("KV store not initialized")
}
// Load from KV store
roomJIDBytes, err := b.kvstore.Get(kvstore.BuildChannelMappingKey(channelID))
if err != nil {
return "", nil // Unmapped channels are expected
}
roomJID = string(roomJIDBytes)
// Update cache
b.mappingsMu.Lock()
b.channelMappings[channelID] = roomJID
b.mappingsMu.Unlock()
return roomJID, nil
}

View file

@ -1,11 +1,441 @@
package xmpp
// XMPPToMattermostBridge handles syncing messages from XMPP to Mattermost
type XMPPToMattermostBridge struct {
// TODO: Implement in Phase 4
import (
"context"
"crypto/tls"
"sync"
"sync/atomic"
"time"
"fmt"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
"github.com/mattermost/mattermost/server/public/plugin"
)
// xmppBridge handles syncing messages between Mattermost and XMPP
type xmppBridge struct {
logger logger.Logger
api plugin.API
kvstore kvstore.KVStore
xmppClient *xmppClient.Client
// Connection management
connected atomic.Bool
ctx context.Context
cancel context.CancelFunc
// Current configuration
config *config.Configuration
configMu sync.RWMutex
// Channel mappings cache
channelMappings map[string]string
mappingsMu sync.RWMutex
}
// NewXMPPToMattermostBridge creates a new XMPP to Mattermost bridge
func NewXMPPToMattermostBridge() *XMPPToMattermostBridge {
return &XMPPToMattermostBridge{}
}
// NewBridge creates a new XMPP bridge
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge {
ctx, cancel := context.WithCancel(context.Background())
bridge := &xmppBridge{
logger: log,
api: api,
kvstore: kvstore,
ctx: ctx,
cancel: cancel,
channelMappings: make(map[string]string),
config: cfg,
}
// Initialize XMPP client with configuration
if cfg.EnableSync && cfg.XMPPServerURL != "" && cfg.XMPPUsername != "" && cfg.XMPPPassword != "" {
bridge.xmppClient = bridge.createXMPPClient(cfg)
}
return bridge
}
// createXMPPClient creates an XMPP client with the given configuration
func (b *xmppBridge) createXMPPClient(cfg *config.Configuration) *xmppClient.Client {
// Create TLS config based on certificate verification setting
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify,
}
return xmppClient.NewClientWithTLS(
cfg.XMPPServerURL,
cfg.XMPPUsername,
cfg.XMPPPassword,
cfg.GetXMPPResource(),
"", // remoteID not needed for bridge user
tlsConfig,
)
}
// UpdateConfiguration updates the bridge configuration
func (b *xmppBridge) UpdateConfiguration(newConfig any) error {
cfg, ok := newConfig.(*config.Configuration)
if !ok {
return fmt.Errorf("invalid configuration type")
}
b.configMu.Lock()
oldConfig := b.config
b.config = cfg
// Initialize or update XMPP client with new configuration
if cfg.EnableSync {
if cfg.XMPPServerURL == "" || cfg.XMPPUsername == "" || cfg.XMPPPassword == "" {
b.configMu.Unlock()
return fmt.Errorf("XMPP server URL, username, and password are required when sync is enabled")
}
b.xmppClient = b.createXMPPClient(cfg)
} else {
b.xmppClient = nil
}
b.configMu.Unlock()
// Check if we need to restart the bridge due to configuration changes
wasConnected := b.connected.Load()
needsRestart := oldConfig != nil && !oldConfig.Equals(cfg) && wasConnected
// Log the configuration change
if needsRestart {
b.logger.LogInfo("Configuration changed, restarting bridge", "old_config", oldConfig, "new_config", cfg)
} else {
b.logger.LogInfo("Configuration updated", "config", cfg)
}
if needsRestart {
b.logger.LogInfo("Configuration changed, restarting bridge")
// Stop the bridge
if err := b.Stop(); err != nil {
b.logger.LogWarn("Error stopping bridge during restart", "error", err)
}
// Start the bridge with new configuration
if err := b.Start(); err != nil {
b.logger.LogError("Failed to restart bridge with new configuration", "error", err)
return fmt.Errorf("failed to restart bridge: %w", err)
}
}
return nil
}
// Start initializes the bridge and connects to XMPP
func (b *xmppBridge) Start() error {
b.logger.LogDebug("Starting Mattermost to XMPP bridge")
b.configMu.RLock()
config := b.config
b.configMu.RUnlock()
if config == nil {
return fmt.Errorf("bridge configuration not set")
}
// Print the configuration for debugging
b.logger.LogDebug("Bridge configuration", "config", config)
if !config.EnableSync {
b.logger.LogInfo("XMPP sync is disabled, bridge will not start")
return nil
}
b.logger.LogInfo("Starting Mattermost to XMPP bridge", "xmpp_server", config.XMPPServerURL, "username", config.XMPPUsername)
// Connect to XMPP server
if err := b.connectToXMPP(); err != nil {
return fmt.Errorf("failed to connect to XMPP server: %w", err)
}
// Load and join mapped channels
if err := b.loadAndJoinMappedChannels(); err != nil {
b.logger.LogWarn("Failed to join some mapped channels", "error", err)
}
// Start connection monitor
go b.connectionMonitor()
b.logger.LogInfo("Mattermost to XMPP bridge started successfully")
return nil
}
// Stop shuts down the bridge
func (b *xmppBridge) Stop() error {
b.logger.LogInfo("Stopping Mattermost to XMPP bridge")
if b.cancel != nil {
b.cancel()
}
if b.xmppClient != nil {
if err := b.xmppClient.Disconnect(); err != nil {
b.logger.LogWarn("Error disconnecting from XMPP server", "error", err)
}
}
b.connected.Store(false)
b.logger.LogInfo("Mattermost to XMPP bridge stopped")
return nil
}
// connectToXMPP establishes connection to the XMPP server
func (b *xmppBridge) connectToXMPP() error {
if b.xmppClient == nil {
return fmt.Errorf("XMPP client is not initialized")
}
b.logger.LogDebug("Connecting to XMPP server")
err := b.xmppClient.Connect()
if err != nil {
b.connected.Store(false)
return fmt.Errorf("failed to connect to XMPP server: %w", err)
}
b.connected.Store(true)
b.logger.LogInfo("Successfully connected to XMPP server")
// Set online presence after successful connection
if err := b.xmppClient.SetOnlinePresence(); err != nil {
b.logger.LogWarn("Failed to set online presence", "error", err)
// Don't fail the connection for presence issues
} else {
b.logger.LogDebug("Set bridge user online presence")
}
return nil
}
// loadAndJoinMappedChannels loads channel mappings and joins corresponding XMPP rooms
func (b *xmppBridge) loadAndJoinMappedChannels() error {
b.logger.LogDebug("Loading and joining mapped channels")
// Get all channel mappings from KV store
mappings, err := b.getAllChannelMappings()
if err != nil {
return fmt.Errorf("failed to load channel mappings: %w", err)
}
if len(mappings) == 0 {
b.logger.LogInfo("No channel mappings found, no rooms to join")
return nil
}
b.logger.LogInfo("Found channel mappings, joining XMPP rooms", "count", len(mappings))
// Join each mapped room
for channelID, roomJID := range mappings {
if err := b.joinXMPPRoom(channelID, roomJID); err != nil {
b.logger.LogWarn("Failed to join room", "channel_id", channelID, "room_jid", roomJID, "error", err)
}
}
return nil
}
// joinXMPPRoom joins an XMPP room and updates the local cache
func (b *xmppBridge) joinXMPPRoom(channelID, roomJID string) error {
if !b.connected.Load() {
return fmt.Errorf("not connected to XMPP server")
}
err := b.xmppClient.JoinRoom(roomJID)
if err != nil {
return fmt.Errorf("failed to join XMPP room: %w", err)
}
b.logger.LogInfo("Joined XMPP room", "channel_id", channelID, "room_jid", roomJID)
// Update local cache
b.mappingsMu.Lock()
b.channelMappings[channelID] = roomJID
b.mappingsMu.Unlock()
return nil
}
// getAllChannelMappings retrieves all channel mappings from KV store
func (b *xmppBridge) getAllChannelMappings() (map[string]string, error) {
if b.kvstore == nil {
return nil, fmt.Errorf("KV store not initialized")
}
mappings := make(map[string]string)
// Get all keys with the channel mapping prefix
keys, err := b.kvstore.ListKeysWithPrefix(0, 1000, kvstore.KeyPrefixChannelMapping)
if err != nil {
return nil, fmt.Errorf("failed to list channel mapping keys: %w", err)
}
// Load each mapping
for _, key := range keys {
roomJIDBytes, err := b.kvstore.Get(key)
if err != nil {
b.logger.LogWarn("Failed to load mapping for key", "key", key, "error", err)
continue
}
// Extract channel ID from the key
channelID := kvstore.ExtractChannelIDFromKey(key)
if channelID == "" {
b.logger.LogWarn("Failed to extract channel ID from key", "key", key)
continue
}
mappings[channelID] = string(roomJIDBytes)
}
return mappings, nil
}
// connectionMonitor monitors the XMPP connection
func (b *xmppBridge) connectionMonitor() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-b.ctx.Done():
return
case <-ticker.C:
if err := b.checkConnection(); err != nil {
b.logger.LogWarn("XMPP connection check failed", "error", err)
b.handleReconnection()
}
}
}
}
// checkConnection verifies the XMPP connection is still active
func (b *xmppBridge) checkConnection() error {
if !b.connected.Load() {
return fmt.Errorf("not connected")
}
return b.xmppClient.TestConnection()
}
// handleReconnection attempts to reconnect to XMPP and rejoin rooms
func (b *xmppBridge) handleReconnection() {
b.configMu.RLock()
config := b.config
b.configMu.RUnlock()
if config == nil || !config.EnableSync {
return
}
b.logger.LogInfo("Attempting to reconnect to XMPP server")
b.connected.Store(false)
if b.xmppClient != nil {
b.xmppClient.Disconnect()
}
// Retry connection with exponential backoff
maxRetries := 3
for i := range maxRetries {
backoff := time.Duration(1<<uint(i)) * time.Second
select {
case <-b.ctx.Done():
return
case <-time.After(backoff):
}
if err := b.connectToXMPP(); err != nil {
b.logger.LogWarn("Reconnection attempt failed", "attempt", i+1, "error", err)
continue
}
if err := b.loadAndJoinMappedChannels(); err != nil {
b.logger.LogWarn("Failed to rejoin rooms after reconnection", "error", err)
}
b.logger.LogInfo("Successfully reconnected to XMPP server")
return
}
b.logger.LogError("Failed to reconnect to XMPP server after all attempts")
}
// Public API methods
// IsConnected returns whether the bridge is connected to XMPP
func (b *xmppBridge) IsConnected() bool {
return b.connected.Load()
}
// CreateChannelRoomMapping creates a mapping between a Mattermost channel and XMPP room
func (b *xmppBridge) CreateChannelRoomMapping(channelID, roomJID string) error {
if b.kvstore == nil {
return fmt.Errorf("KV store not initialized")
}
// Store forward and reverse mappings
err := b.kvstore.Set(kvstore.BuildChannelMappingKey(channelID), []byte(roomJID))
if err != nil {
return fmt.Errorf("failed to store channel room mapping: %w", err)
}
err = b.kvstore.Set(kvstore.BuildRoomMappingKey(roomJID), []byte(channelID))
if err != nil {
return fmt.Errorf("failed to store reverse room mapping: %w", err)
}
// Update local cache
b.mappingsMu.Lock()
b.channelMappings[channelID] = roomJID
b.mappingsMu.Unlock()
// Join the room if connected
if b.connected.Load() {
if err := b.xmppClient.JoinRoom(roomJID); err != nil {
b.logger.LogWarn("Failed to join newly mapped room", "channel_id", channelID, "room_jid", roomJID, "error", err)
}
}
b.logger.LogInfo("Created channel room mapping", "channel_id", channelID, "room_jid", roomJID)
return nil
}
// GetChannelRoomMapping gets the XMPP room JID for a Mattermost channel
func (b *xmppBridge) GetChannelRoomMapping(channelID string) (string, error) {
// Check cache first
b.mappingsMu.RLock()
roomJID, exists := b.channelMappings[channelID]
b.mappingsMu.RUnlock()
if exists {
return roomJID, nil
}
if b.kvstore == nil {
return "", fmt.Errorf("KV store not initialized")
}
// Load from KV store
roomJIDBytes, err := b.kvstore.Get(kvstore.BuildChannelMappingKey(channelID))
if err != nil {
return "", nil // Unmapped channels are expected
}
roomJID = string(roomJIDBytes)
// Update cache
b.mappingsMu.Lock()
b.channelMappings[channelID] = roomJID
b.mappingsMu.Unlock()
return roomJID, nil
}