feat: restore XMPP bridge to use direct client connection instead of bridge user
Some checks are pending
ci / plugin-ci (push) Waiting to run

- Replace bridgeUser with bridgeClient (*xmppClient.Client) in XMPP bridge
- Update createXMPPClient to return XMPP client with TLS configuration
- Migrate connection, disconnection, and room operations to use bridgeClient
- Update Ping() and RoomExists() methods to use client methods directly
- Maintain bridge-agnostic user management system for additional users
- Fix formatting and import organization across bridge components

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Felipe M 2025-08-04 18:04:10 +02:00
parent db8037ffbf
commit 65038fb7a2
No known key found for this signature in database
GPG key ID: 52E5D65FCF99808A
6 changed files with 101 additions and 87 deletions

View file

@ -194,13 +194,13 @@ func (u *MattermostUser) Ping() error {
if u.api == nil {
return fmt.Errorf("Mattermost API not initialized for user %s", u.id)
}
// Test API connectivity by getting server version
version := u.api.GetServerVersion()
if version == "" {
return fmt.Errorf("Mattermost API ping returned empty server version for user %s", u.id)
}
return nil
}
@ -209,7 +209,7 @@ func (u *MattermostUser) CheckChannelExists(channelID string) (bool, error) {
if u.api == nil {
return false, fmt.Errorf("Mattermost API not initialized for user %s", u.id)
}
// Try to get the channel by ID
_, appErr := u.api.GetChannel(channelID)
if appErr != nil {
@ -219,7 +219,7 @@ func (u *MattermostUser) CheckChannelExists(channelID string) (bool, error) {
}
return false, fmt.Errorf("failed to check channel existence: %w", appErr)
}
return true, nil
}
@ -297,4 +297,4 @@ func (u *MattermostUser) GetEmail() string {
// GetAPI returns the Mattermost API instance (for advanced operations)
func (u *MattermostUser) GetAPI() plugin.API {
return u.api
}
}

View file

@ -2,6 +2,7 @@ package xmpp
import (
"context"
"crypto/tls"
"sync"
"sync/atomic"
"time"
@ -11,19 +12,19 @@ import (
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/bridge"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
"github.com/mattermost/mattermost/server/public/plugin"
)
// xmppBridge handles syncing messages between Mattermost and XMPP
type xmppBridge struct {
logger logger.Logger
api plugin.API
kvstore kvstore.KVStore
bridgeUser model.BridgeUser // Handles the bridge user and main bridge XMPP connection
userManager pluginModel.BridgeUserManager
logger logger.Logger
api plugin.API
kvstore kvstore.KVStore
bridgeClient *xmppClient.Client // Main bridge XMPP client connection
userManager pluginModel.BridgeUserManager
// Connection management
connected atomic.Bool
@ -55,15 +56,28 @@ func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *
// Initialize XMPP client with configuration
if cfg.EnableSync && cfg.XMPPServerURL != "" && cfg.XMPPUsername != "" && cfg.XMPPPassword != "" {
b.bridgeUser = b.createXMPPClient(cfg)
b.bridgeClient = b.createXMPPClient(cfg)
}
return b
}
// createXMPPClient creates an XMPP client with the given configuration
func (b *xmppBridge) createXMPPClient(cfg *config.Configuration) model.BridgeUser {
return NewXMPPUser("_bridge_", "Bridge User", cfg.XMPPUsername, cfg, b.logger)
func (b *xmppBridge) createXMPPClient(cfg *config.Configuration) *xmppClient.Client {
// Create TLS config based on certificate verification setting
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify,
}
return xmppClient.NewClientWithTLS(
cfg.XMPPServerURL,
cfg.XMPPUsername,
cfg.XMPPPassword,
cfg.GetXMPPResource(),
"", // remoteID not needed for bridge client
tlsConfig,
b.logger,
)
}
// UpdateConfiguration updates the bridge configuration
@ -86,9 +100,9 @@ func (b *xmppBridge) UpdateConfiguration(newConfig any) error {
return fmt.Errorf("XMPP server URL, username, and password are required when sync is enabled")
}
b.bridgeUser = b.createXMPPClient(cfg)
b.bridgeClient = b.createXMPPClient(cfg)
} else {
b.bridgeUser = nil
b.bridgeClient = nil
}
// Check if we need to restart the bridge due to configuration changes
@ -164,8 +178,8 @@ func (b *xmppBridge) Stop() error {
b.cancel()
}
if b.bridgeUser != nil {
if err := b.bridgeUser.Disconnect(); err != nil {
if b.bridgeClient != nil {
if err := b.bridgeClient.Disconnect(); err != nil {
b.logger.LogWarn("Error disconnecting from XMPP server", "error", err)
}
}
@ -177,13 +191,13 @@ func (b *xmppBridge) Stop() error {
// connectToXMPP establishes connection to the XMPP server
func (b *xmppBridge) connectToXMPP() error {
if b.bridgeUser == nil {
if b.bridgeClient == nil {
return fmt.Errorf("XMPP client is not initialized")
}
b.logger.LogDebug("Connecting to XMPP server")
err := b.bridgeUser.Connect()
err := b.bridgeClient.Connect()
if err != nil {
b.connected.Store(false)
return fmt.Errorf("failed to connect to XMPP server: %w", err)
@ -193,11 +207,11 @@ func (b *xmppBridge) connectToXMPP() error {
b.logger.LogInfo("Successfully connected to XMPP server")
// Set online presence after successful connection
if err := b.bridgeUser.SetState(pluginModel.UserStateOnline); err != nil {
if err := b.bridgeClient.SetOnlinePresence(); err != nil {
b.logger.LogWarn("Failed to set online presence", "error", err)
// Don't fail the connection for presence issues
} else {
b.logger.LogDebug("Set bridge user online presence")
b.logger.LogDebug("Set bridge client online presence")
}
return nil
@ -236,7 +250,7 @@ func (b *xmppBridge) joinXMPPRoom(channelID, roomJID string) error {
return fmt.Errorf("not connected to XMPP server")
}
err := b.bridgeUser.JoinChannel(roomJID)
err := b.bridgeClient.JoinRoom(roomJID)
if err != nil {
return fmt.Errorf("failed to join XMPP room: %w", err)
}
@ -319,8 +333,8 @@ func (b *xmppBridge) handleReconnection() {
b.logger.LogInfo("Attempting to reconnect to XMPP server")
b.connected.Store(false)
if b.bridgeUser != nil {
_ = b.bridgeUser.Disconnect()
if b.bridgeClient != nil {
_ = b.bridgeClient.Disconnect()
}
// Retry connection with exponential backoff
@ -363,14 +377,14 @@ func (b *xmppBridge) Ping() error {
return fmt.Errorf("XMPP bridge is not connected")
}
if b.bridgeUser == nil {
if b.bridgeClient == nil {
return fmt.Errorf("XMPP client not initialized")
}
b.logger.LogDebug("Testing XMPP bridge connectivity with ping")
// Use the XMPP user's ping method
if err := b.bridgeUser.Ping(); err != nil {
// Use the XMPP client's ping method
if err := b.bridgeClient.Ping(); err != nil {
b.logger.LogWarn("XMPP bridge ping failed", "error", err)
return fmt.Errorf("XMPP bridge ping failed: %w", err)
}
@ -397,7 +411,7 @@ func (b *xmppBridge) CreateChannelMapping(channelID, roomJID string) error {
// Join the room if connected
if b.connected.Load() {
if err := b.bridgeUser.JoinChannel(roomJID); err != nil {
if err := b.bridgeClient.JoinRoom(roomJID); err != nil {
b.logger.LogWarn("Failed to join newly mapped room", "channel_id", channelID, "room_jid", roomJID, "error", err)
}
}
@ -463,8 +477,8 @@ func (b *xmppBridge) DeleteChannelMapping(channelID string) error {
b.mappingsMu.Unlock()
// Leave the room if connected
if b.connected.Load() && b.bridgeUser != nil {
if err := b.bridgeUser.LeaveChannel(roomJID); err != nil {
if b.connected.Load() && b.bridgeClient != nil {
if err := b.bridgeClient.LeaveRoom(roomJID); err != nil {
b.logger.LogWarn("Failed to leave unmapped room", "channel_id", channelID, "room_jid", roomJID, "error", err)
// Don't fail the entire operation if leaving the room fails
} else {
@ -482,14 +496,14 @@ func (b *xmppBridge) RoomExists(roomID string) (bool, error) {
return false, fmt.Errorf("not connected to XMPP server")
}
if b.bridgeUser == nil {
if b.bridgeClient == nil {
return false, fmt.Errorf("XMPP client not initialized")
}
b.logger.LogDebug("Checking if XMPP room exists", "room_jid", roomID)
// Use the XMPP user to check room existence
exists, err := b.bridgeUser.CheckChannelExists(roomID)
// Use the XMPP client to check room existence
exists, err := b.bridgeClient.CheckRoomExists(roomID)
if err != nil {
b.logger.LogError("Failed to check room existence", "room_jid", roomID, "error", err)
return false, fmt.Errorf("failed to check room existence: %w", err)

View file

@ -43,7 +43,7 @@ type XMPPUser struct {
// NewXMPPUser creates a new XMPP user
func NewXMPPUser(id, displayName, jid string, cfg *config.Configuration, logger logger.Logger) *XMPPUser {
ctx, cancel := context.WithCancel(context.Background())
// Create TLS config based on certificate verification setting
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify,
@ -112,13 +112,13 @@ func (u *XMPPUser) GetState() model.UserState {
func (u *XMPPUser) SetState(state model.UserState) error {
u.stateMu.Lock()
defer u.stateMu.Unlock()
u.logger.LogDebug("Changing XMPP user state", "user_id", u.id, "old_state", u.state, "new_state", state)
u.state = state
// TODO: Send presence update to XMPP server based on state
// This would involve mapping UserState to XMPP presence types
return nil
}
@ -127,15 +127,15 @@ func (u *XMPPUser) JoinChannel(channelID string) error {
if !u.connected.Load() {
return fmt.Errorf("user %s is not connected", u.id)
}
u.logger.LogDebug("XMPP user joining channel", "user_id", u.id, "channel_id", channelID)
// For XMPP, channelID is the room JID
err := u.client.JoinRoom(channelID)
if err != nil {
return fmt.Errorf("failed to join XMPP room %s: %w", channelID, err)
}
u.logger.LogInfo("XMPP user joined channel", "user_id", u.id, "channel_id", channelID)
return nil
}
@ -144,15 +144,15 @@ func (u *XMPPUser) LeaveChannel(channelID string) error {
if !u.connected.Load() {
return fmt.Errorf("user %s is not connected", u.id)
}
u.logger.LogDebug("XMPP user leaving channel", "user_id", u.id, "channel_id", channelID)
// For XMPP, channelID is the room JID
err := u.client.LeaveRoom(channelID)
if err != nil {
return fmt.Errorf("failed to leave XMPP room %s: %w", channelID, err)
}
u.logger.LogInfo("XMPP user left channel", "user_id", u.id, "channel_id", channelID)
return nil
}
@ -161,21 +161,21 @@ func (u *XMPPUser) SendMessageToChannel(channelID, message string) error {
if !u.connected.Load() {
return fmt.Errorf("user %s is not connected", u.id)
}
u.logger.LogDebug("XMPP user sending message to channel", "user_id", u.id, "channel_id", channelID)
// Create message request for XMPP
req := xmppClient.MessageRequest{
RoomJID: channelID,
GhostUserJID: u.jid,
Message: message,
}
_, err := u.client.SendMessage(req)
if err != nil {
return fmt.Errorf("failed to send message to XMPP room %s: %w", channelID, err)
}
u.logger.LogDebug("XMPP user sent message to channel", "user_id", u.id, "channel_id", channelID)
return nil
}
@ -183,43 +183,43 @@ func (u *XMPPUser) SendMessageToChannel(channelID, message string) error {
// Connection lifecycle
func (u *XMPPUser) Connect() error {
u.logger.LogDebug("Connecting XMPP user", "user_id", u.id, "jid", u.jid)
err := u.client.Connect()
if err != nil {
u.connected.Store(false)
return fmt.Errorf("failed to connect XMPP user %s: %w", u.id, err)
}
u.connected.Store(true)
u.logger.LogInfo("XMPP user connected", "user_id", u.id, "jid", u.jid)
// Set online presence after successful connection
if err := u.client.SetOnlinePresence(); err != nil {
u.logger.LogWarn("Failed to set online presence for XMPP user", "user_id", u.id, "error", err)
// Don't fail the connection for presence issues
}
// Update state to online
_ = u.SetState(model.UserStateOnline)
return nil
}
func (u *XMPPUser) Disconnect() error {
u.logger.LogDebug("Disconnecting XMPP user", "user_id", u.id, "jid", u.jid)
if u.client == nil {
return nil
}
err := u.client.Disconnect()
if err != nil {
u.logger.LogWarn("Error disconnecting XMPP user", "user_id", u.id, "error", err)
}
u.connected.Store(false)
_ = u.SetState(model.UserStateOffline)
u.logger.LogInfo("XMPP user disconnected", "user_id", u.id, "jid", u.jid)
return err
}
@ -232,11 +232,11 @@ func (u *XMPPUser) Ping() error {
if !u.connected.Load() {
return fmt.Errorf("XMPP user %s is not connected", u.id)
}
if u.client == nil {
return fmt.Errorf("XMPP client not initialized for user %s", u.id)
}
return u.client.Ping()
}
@ -245,46 +245,46 @@ func (u *XMPPUser) CheckChannelExists(channelID string) (bool, error) {
if !u.connected.Load() {
return false, fmt.Errorf("XMPP user %s is not connected", u.id)
}
if u.client == nil {
return false, fmt.Errorf("XMPP client not initialized for user %s", u.id)
}
return u.client.CheckRoomExists(channelID)
}
// Goroutine lifecycle
func (u *XMPPUser) Start(ctx context.Context) error {
u.logger.LogDebug("Starting XMPP user", "user_id", u.id, "jid", u.jid)
// Update context
u.ctx = ctx
// Connect to XMPP server
if err := u.Connect(); err != nil {
return fmt.Errorf("failed to start XMPP user %s: %w", u.id, err)
}
// Start connection monitoring in a goroutine
go u.connectionMonitor()
u.logger.LogInfo("XMPP user started", "user_id", u.id, "jid", u.jid)
return nil
}
func (u *XMPPUser) Stop() error {
u.logger.LogDebug("Stopping XMPP user", "user_id", u.id, "jid", u.jid)
// Cancel context to stop goroutines
if u.cancel != nil {
u.cancel()
}
// Disconnect from XMPP server
if err := u.Disconnect(); err != nil {
u.logger.LogWarn("Error disconnecting XMPP user during stop", "user_id", u.id, "error", err)
}
u.logger.LogInfo("XMPP user stopped", "user_id", u.id, "jid", u.jid)
return nil
}
@ -292,7 +292,7 @@ func (u *XMPPUser) Stop() error {
// connectionMonitor monitors the XMPP connection for this user
func (u *XMPPUser) connectionMonitor() {
u.logger.LogDebug("Starting connection monitor for XMPP user", "user_id", u.id)
// Simple monitoring - check connection periodically
for {
select {
@ -306,13 +306,13 @@ func (u *XMPPUser) connectionMonitor() {
u.logger.LogWarn("Connection check failed for XMPP user", "user_id", u.id, "error", err)
u.connected.Store(false)
_ = u.SetState(model.UserStateOffline)
// TODO: Implement reconnection logic if needed
}
}
// Wait before next check
timeoutCtx, cancel := context.WithTimeout(u.ctx, 30 * time.Second) // 30 seconds
timeoutCtx, cancel := context.WithTimeout(u.ctx, 30*time.Second) // 30 seconds
select {
case <-u.ctx.Done():
cancel()
@ -333,4 +333,4 @@ func (u *XMPPUser) GetJID() string {
// GetClient returns the underlying XMPP client (for advanced operations)
func (u *XMPPUser) GetClient() *xmppClient.Client {
return u.client
}
}

View file

@ -19,13 +19,13 @@ const DefaultXMPPUsernamePrefix = "xmpp"
// If you add non-reference types to your configuration struct, be sure to rewrite Clone as a deep
// copy appropriate for your types.
type Configuration struct {
XMPPServerURL string `json:"XMPPServerURL"`
XMPPUsername string `json:"XMPPUsername"`
XMPPPassword string `json:"XMPPPassword"`
EnableSync bool `json:"EnableSync"`
XMPPUsernamePrefix string `json:"XMPPUsernamePrefix"`
XMPPResource string `json:"XMPPResource"`
XMPPInsecureSkipVerify bool `json:"XMPPInsecureSkipVerify"`
XMPPServerURL string `json:"XMPPServerURL"`
XMPPUsername string `json:"XMPPUsername"`
XMPPPassword string `json:"XMPPPassword"`
EnableSync bool `json:"EnableSync"`
XMPPUsernamePrefix string `json:"XMPPUsernamePrefix"`
XMPPResource string `json:"XMPPResource"`
XMPPInsecureSkipVerify bool `json:"XMPPInsecureSkipVerify"`
}
// Equals compares two configuration structs
@ -95,4 +95,4 @@ func (c *Configuration) IsValid() error {
}
return nil
}
}

View file

@ -38,4 +38,4 @@ func (l *PluginAPILogger) LogWarn(message string, keyValuePairs ...any) {
// LogError logs an error message
func (l *PluginAPILogger) LogError(message string, keyValuePairs ...any) {
l.api.LogError(message, keyValuePairs...)
}
}