feat: implement comprehensive bridge-agnostic user management system
This commit implements a complete multi-user bridge management system that allows bridges to control multiple users with async goroutine management and convenience methods for channel operations. Key features: - Bridge-agnostic BridgeUser interface with validation, identity, state management, channel operations, connection lifecycle, and goroutine lifecycle methods - BridgeUserManager interface for user lifecycle management with bridge type identification - XMPPUser implementation for XMPP bridge with XMPP client integration, connection monitoring, and room operations - MattermostUser implementation for Mattermost bridge with API integration and channel management - Updated Bridge interface to include GetUserManager() method - Base UserManager implementation with generic user management logic - Added Ping() and CheckChannelExists() methods to BridgeUser interface for health checking and room validation - Updated bridge manager naming from Manager to BridgeManager for clarity The system enables bridges to manage multiple users (like "Mattermost Bridge" user in XMPP) with proper state management, connection monitoring, and channel operations abstracted across different bridge protocols. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
ea1711e94c
commit
db8037ffbf
8 changed files with 949 additions and 94 deletions
|
@ -10,8 +10,8 @@ import (
|
|||
"github.com/mattermost/mattermost/server/public/plugin"
|
||||
)
|
||||
|
||||
// Manager manages multiple bridge instances
|
||||
type Manager struct {
|
||||
// BridgeManager manages multiple bridge instances
|
||||
type BridgeManager struct {
|
||||
bridges map[string]model.Bridge
|
||||
mu sync.RWMutex
|
||||
logger logger.Logger
|
||||
|
@ -19,8 +19,8 @@ type Manager struct {
|
|||
remoteID string
|
||||
}
|
||||
|
||||
// NewManager creates a new bridge manager
|
||||
func NewManager(logger logger.Logger, api plugin.API, remoteID string) model.BridgeManager {
|
||||
// NewBridgeManager creates a new bridge manager
|
||||
func NewBridgeManager(logger logger.Logger, api plugin.API, remoteID string) model.BridgeManager {
|
||||
if logger == nil {
|
||||
panic("logger cannot be nil")
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ func NewManager(logger logger.Logger, api plugin.API, remoteID string) model.Bri
|
|||
panic("plugin API cannot be nil")
|
||||
}
|
||||
|
||||
return &Manager{
|
||||
return &BridgeManager{
|
||||
bridges: make(map[string]model.Bridge),
|
||||
logger: logger,
|
||||
api: api,
|
||||
|
@ -37,7 +37,7 @@ func NewManager(logger logger.Logger, api plugin.API, remoteID string) model.Bri
|
|||
}
|
||||
|
||||
// RegisterBridge registers a bridge with the manager
|
||||
func (m *Manager) RegisterBridge(name string, bridge model.Bridge) error {
|
||||
func (m *BridgeManager) RegisterBridge(name string, bridge model.Bridge) error {
|
||||
if name == "" {
|
||||
return fmt.Errorf("bridge name cannot be empty")
|
||||
}
|
||||
|
@ -59,7 +59,7 @@ func (m *Manager) RegisterBridge(name string, bridge model.Bridge) error {
|
|||
}
|
||||
|
||||
// StartBridge starts a specific bridge
|
||||
func (m *Manager) StartBridge(name string) error {
|
||||
func (m *BridgeManager) StartBridge(name string) error {
|
||||
m.mu.RLock()
|
||||
bridge, exists := m.bridges[name]
|
||||
m.mu.RUnlock()
|
||||
|
@ -80,7 +80,7 @@ func (m *Manager) StartBridge(name string) error {
|
|||
}
|
||||
|
||||
// StopBridge stops a specific bridge
|
||||
func (m *Manager) StopBridge(name string) error {
|
||||
func (m *BridgeManager) StopBridge(name string) error {
|
||||
m.mu.RLock()
|
||||
bridge, exists := m.bridges[name]
|
||||
m.mu.RUnlock()
|
||||
|
@ -101,7 +101,7 @@ func (m *Manager) StopBridge(name string) error {
|
|||
}
|
||||
|
||||
// UnregisterBridge removes a bridge from the manager
|
||||
func (m *Manager) UnregisterBridge(name string) error {
|
||||
func (m *BridgeManager) UnregisterBridge(name string) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
|
@ -124,7 +124,7 @@ func (m *Manager) UnregisterBridge(name string) error {
|
|||
}
|
||||
|
||||
// GetBridge retrieves a bridge by name
|
||||
func (m *Manager) GetBridge(name string) (model.Bridge, error) {
|
||||
func (m *BridgeManager) GetBridge(name string) (model.Bridge, error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
|
@ -137,7 +137,7 @@ func (m *Manager) GetBridge(name string) (model.Bridge, error) {
|
|||
}
|
||||
|
||||
// ListBridges returns a list of all registered bridge names
|
||||
func (m *Manager) ListBridges() []string {
|
||||
func (m *BridgeManager) ListBridges() []string {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
|
@ -150,7 +150,7 @@ func (m *Manager) ListBridges() []string {
|
|||
}
|
||||
|
||||
// HasBridge checks if a bridge with the given name is registered
|
||||
func (m *Manager) HasBridge(name string) bool {
|
||||
func (m *BridgeManager) HasBridge(name string) bool {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
|
@ -159,7 +159,7 @@ func (m *Manager) HasBridge(name string) bool {
|
|||
}
|
||||
|
||||
// HasBridges checks if any bridges are registered
|
||||
func (m *Manager) HasBridges() bool {
|
||||
func (m *BridgeManager) HasBridges() bool {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
|
@ -167,7 +167,7 @@ func (m *Manager) HasBridges() bool {
|
|||
}
|
||||
|
||||
// Shutdown stops and unregisters all bridges
|
||||
func (m *Manager) Shutdown() error {
|
||||
func (m *BridgeManager) Shutdown() error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
|
@ -196,7 +196,7 @@ func (m *Manager) Shutdown() error {
|
|||
}
|
||||
|
||||
// OnPluginConfigurationChange propagates configuration changes to all registered bridges
|
||||
func (m *Manager) OnPluginConfigurationChange(config any) error {
|
||||
func (m *BridgeManager) OnPluginConfigurationChange(config any) error {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
|
@ -225,7 +225,7 @@ func (m *Manager) OnPluginConfigurationChange(config any) error {
|
|||
}
|
||||
|
||||
// CreateChannelMapping handles the creation of a channel mapping by calling the appropriate bridge
|
||||
func (m *Manager) CreateChannelMapping(req model.CreateChannelMappingRequest) error {
|
||||
func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingRequest) error {
|
||||
// Validate request
|
||||
if err := req.Validate(); err != nil {
|
||||
return fmt.Errorf("invalid mapping request: %w", err)
|
||||
|
@ -308,7 +308,7 @@ func (m *Manager) CreateChannelMapping(req model.CreateChannelMappingRequest) er
|
|||
}
|
||||
|
||||
// DeleteChannepMapping handles the deletion of a channel mapping by calling the appropriate bridges
|
||||
func (m *Manager) DeleteChannepMapping(req model.DeleteChannelMappingRequest) error {
|
||||
func (m *BridgeManager) DeleteChannepMapping(req model.DeleteChannelMappingRequest) error {
|
||||
// Validate request
|
||||
if err := req.Validate(); err != nil {
|
||||
return fmt.Errorf("invalid delete request: %w", err)
|
||||
|
@ -359,7 +359,7 @@ func (m *Manager) DeleteChannepMapping(req model.DeleteChannelMappingRequest) er
|
|||
}
|
||||
|
||||
// shareChannel creates a shared channel configuration using the Mattermost API
|
||||
func (m *Manager) shareChannel(req model.CreateChannelMappingRequest) error {
|
||||
func (m *BridgeManager) shareChannel(req model.CreateChannelMappingRequest) error {
|
||||
if m.remoteID == "" {
|
||||
return fmt.Errorf("remote ID not set - plugin not registered for shared channels")
|
||||
}
|
||||
|
@ -389,7 +389,7 @@ func (m *Manager) shareChannel(req model.CreateChannelMappingRequest) error {
|
|||
}
|
||||
|
||||
// unshareChannel removes shared channel configuration using the Mattermost API
|
||||
func (m *Manager) unshareChannel(channelID string) error {
|
||||
func (m *BridgeManager) unshareChannel(channelID string) error {
|
||||
// Unshare the channel
|
||||
unshared, err := m.api.UnshareChannel(channelID)
|
||||
if err != nil {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/bridge"
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
||||
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||
|
@ -15,9 +16,10 @@ import (
|
|||
|
||||
// mattermostBridge handles syncing messages between Mattermost instances
|
||||
type mattermostBridge struct {
|
||||
logger logger.Logger
|
||||
api plugin.API
|
||||
kvstore kvstore.KVStore
|
||||
logger logger.Logger
|
||||
api plugin.API
|
||||
kvstore kvstore.KVStore
|
||||
userManager pluginModel.BridgeUserManager
|
||||
|
||||
// Connection management
|
||||
connected atomic.Bool
|
||||
|
@ -44,6 +46,7 @@ func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *
|
|||
cancel: cancel,
|
||||
channelMappings: make(map[string]string),
|
||||
config: cfg,
|
||||
userManager: bridge.NewUserManager("mattermost", log),
|
||||
}
|
||||
|
||||
return bridge
|
||||
|
@ -328,3 +331,8 @@ func (b *mattermostBridge) GetRoomMapping(roomID string) (string, error) {
|
|||
|
||||
return channelID, nil
|
||||
}
|
||||
|
||||
// GetUserManager returns the user manager for this bridge
|
||||
func (b *mattermostBridge) GetUserManager() pluginModel.BridgeUserManager {
|
||||
return b.userManager
|
||||
}
|
||||
|
|
300
server/bridge/mattermost/user.go
Normal file
300
server/bridge/mattermost/user.go
Normal file
|
@ -0,0 +1,300 @@
|
|||
package mattermost
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||
mmModel "github.com/mattermost/mattermost/server/public/model"
|
||||
"github.com/mattermost/mattermost/server/public/plugin"
|
||||
)
|
||||
|
||||
// MattermostUser represents a Mattermost user that implements the BridgeUser interface
|
||||
type MattermostUser struct {
|
||||
// User identity
|
||||
id string
|
||||
displayName string
|
||||
username string
|
||||
email string
|
||||
|
||||
// Mattermost API
|
||||
api plugin.API
|
||||
|
||||
// State management
|
||||
state model.UserState
|
||||
stateMu sync.RWMutex
|
||||
|
||||
// Configuration
|
||||
config *config.Configuration
|
||||
|
||||
// Goroutine lifecycle
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
// Logger
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
// NewMattermostUser creates a new Mattermost user
|
||||
func NewMattermostUser(id, displayName, username, email string, api plugin.API, cfg *config.Configuration, logger logger.Logger) *MattermostUser {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
return &MattermostUser{
|
||||
id: id,
|
||||
displayName: displayName,
|
||||
username: username,
|
||||
email: email,
|
||||
api: api,
|
||||
state: model.UserStateOffline,
|
||||
config: cfg,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Validation
|
||||
func (u *MattermostUser) Validate() error {
|
||||
if u.id == "" {
|
||||
return fmt.Errorf("user ID cannot be empty")
|
||||
}
|
||||
if u.username == "" {
|
||||
return fmt.Errorf("username cannot be empty")
|
||||
}
|
||||
if u.config == nil {
|
||||
return fmt.Errorf("configuration cannot be nil")
|
||||
}
|
||||
if u.api == nil {
|
||||
return fmt.Errorf("Mattermost API cannot be nil")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Identity (bridge-agnostic)
|
||||
func (u *MattermostUser) GetID() string {
|
||||
return u.id
|
||||
}
|
||||
|
||||
func (u *MattermostUser) GetDisplayName() string {
|
||||
return u.displayName
|
||||
}
|
||||
|
||||
// State management
|
||||
func (u *MattermostUser) GetState() model.UserState {
|
||||
u.stateMu.RLock()
|
||||
defer u.stateMu.RUnlock()
|
||||
return u.state
|
||||
}
|
||||
|
||||
func (u *MattermostUser) SetState(state model.UserState) error {
|
||||
u.stateMu.Lock()
|
||||
defer u.stateMu.Unlock()
|
||||
|
||||
u.logger.LogDebug("Changing Mattermost user state", "user_id", u.id, "old_state", u.state, "new_state", state)
|
||||
u.state = state
|
||||
|
||||
// TODO: Update user status in Mattermost if needed
|
||||
// This could involve setting custom status or presence indicators
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Channel operations (abstracted from rooms/channels/groups)
|
||||
func (u *MattermostUser) JoinChannel(channelID string) error {
|
||||
u.logger.LogDebug("Mattermost user joining channel", "user_id", u.id, "channel_id", channelID)
|
||||
|
||||
// Add user to channel
|
||||
_, appErr := u.api.AddUserToChannel(channelID, u.id, "")
|
||||
if appErr != nil {
|
||||
return fmt.Errorf("failed to add Mattermost user %s to channel %s: %w", u.id, channelID, appErr)
|
||||
}
|
||||
|
||||
u.logger.LogInfo("Mattermost user joined channel", "user_id", u.id, "channel_id", channelID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *MattermostUser) LeaveChannel(channelID string) error {
|
||||
u.logger.LogDebug("Mattermost user leaving channel", "user_id", u.id, "channel_id", channelID)
|
||||
|
||||
// Remove user from channel
|
||||
appErr := u.api.DeleteChannelMember(channelID, u.id)
|
||||
if appErr != nil {
|
||||
return fmt.Errorf("failed to remove Mattermost user %s from channel %s: %w", u.id, channelID, appErr)
|
||||
}
|
||||
|
||||
u.logger.LogInfo("Mattermost user left channel", "user_id", u.id, "channel_id", channelID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *MattermostUser) SendMessageToChannel(channelID, message string) error {
|
||||
u.logger.LogDebug("Mattermost user sending message to channel", "user_id", u.id, "channel_id", channelID)
|
||||
|
||||
// Create post
|
||||
post := &mmModel.Post{
|
||||
UserId: u.id,
|
||||
ChannelId: channelID,
|
||||
Message: message,
|
||||
}
|
||||
|
||||
// Send post
|
||||
_, appErr := u.api.CreatePost(post)
|
||||
if appErr != nil {
|
||||
return fmt.Errorf("failed to send message to Mattermost channel %s: %w", channelID, appErr)
|
||||
}
|
||||
|
||||
u.logger.LogDebug("Mattermost user sent message to channel", "user_id", u.id, "channel_id", channelID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Connection lifecycle
|
||||
func (u *MattermostUser) Connect() error {
|
||||
u.logger.LogDebug("Connecting Mattermost user", "user_id", u.id, "username", u.username)
|
||||
|
||||
// For Mattermost users, "connecting" means verifying the user exists and is accessible
|
||||
user, appErr := u.api.GetUser(u.id)
|
||||
if appErr != nil {
|
||||
return fmt.Errorf("failed to verify Mattermost user %s: %w", u.id, appErr)
|
||||
}
|
||||
|
||||
// Update user information if it has changed
|
||||
if user.GetDisplayName("") != u.displayName {
|
||||
u.displayName = user.GetDisplayName("")
|
||||
u.logger.LogDebug("Updated Mattermost user display name", "user_id", u.id, "display_name", u.displayName)
|
||||
}
|
||||
|
||||
u.logger.LogInfo("Mattermost user connected", "user_id", u.id, "username", u.username)
|
||||
|
||||
// Update state to online
|
||||
_ = u.SetState(model.UserStateOnline)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *MattermostUser) Disconnect() error {
|
||||
u.logger.LogDebug("Disconnecting Mattermost user", "user_id", u.id, "username", u.username)
|
||||
|
||||
// For Mattermost users, "disconnecting" is mostly a state change
|
||||
// The user still exists in Mattermost, but we're not actively managing them
|
||||
|
||||
_ = u.SetState(model.UserStateOffline)
|
||||
|
||||
u.logger.LogInfo("Mattermost user disconnected", "user_id", u.id, "username", u.username)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *MattermostUser) IsConnected() bool {
|
||||
return u.GetState() == model.UserStateOnline
|
||||
}
|
||||
|
||||
func (u *MattermostUser) Ping() error {
|
||||
if u.api == nil {
|
||||
return fmt.Errorf("Mattermost API not initialized for user %s", u.id)
|
||||
}
|
||||
|
||||
// Test API connectivity by getting server version
|
||||
version := u.api.GetServerVersion()
|
||||
if version == "" {
|
||||
return fmt.Errorf("Mattermost API ping returned empty server version for user %s", u.id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CheckChannelExists checks if a Mattermost channel exists
|
||||
func (u *MattermostUser) CheckChannelExists(channelID string) (bool, error) {
|
||||
if u.api == nil {
|
||||
return false, fmt.Errorf("Mattermost API not initialized for user %s", u.id)
|
||||
}
|
||||
|
||||
// Try to get the channel by ID
|
||||
_, appErr := u.api.GetChannel(channelID)
|
||||
if appErr != nil {
|
||||
// Check if it's a "not found" error
|
||||
if appErr.StatusCode == 404 {
|
||||
return false, nil // Channel doesn't exist
|
||||
}
|
||||
return false, fmt.Errorf("failed to check channel existence: %w", appErr)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Goroutine lifecycle
|
||||
func (u *MattermostUser) Start(ctx context.Context) error {
|
||||
u.logger.LogDebug("Starting Mattermost user", "user_id", u.id, "username", u.username)
|
||||
|
||||
// Update context
|
||||
u.ctx = ctx
|
||||
|
||||
// Connect to verify user exists
|
||||
if err := u.Connect(); err != nil {
|
||||
return fmt.Errorf("failed to start Mattermost user %s: %w", u.id, err)
|
||||
}
|
||||
|
||||
// Start monitoring in a goroutine
|
||||
go u.monitor()
|
||||
|
||||
u.logger.LogInfo("Mattermost user started", "user_id", u.id, "username", u.username)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *MattermostUser) Stop() error {
|
||||
u.logger.LogDebug("Stopping Mattermost user", "user_id", u.id, "username", u.username)
|
||||
|
||||
// Cancel context to stop goroutines
|
||||
if u.cancel != nil {
|
||||
u.cancel()
|
||||
}
|
||||
|
||||
// Disconnect
|
||||
if err := u.Disconnect(); err != nil {
|
||||
u.logger.LogWarn("Error disconnecting Mattermost user during stop", "user_id", u.id, "error", err)
|
||||
}
|
||||
|
||||
u.logger.LogInfo("Mattermost user stopped", "user_id", u.id, "username", u.username)
|
||||
return nil
|
||||
}
|
||||
|
||||
// monitor periodically checks the user's status and updates information
|
||||
func (u *MattermostUser) monitor() {
|
||||
u.logger.LogDebug("Starting monitor for Mattermost user", "user_id", u.id)
|
||||
|
||||
// Simple monitoring - check user exists periodically
|
||||
for {
|
||||
select {
|
||||
case <-u.ctx.Done():
|
||||
u.logger.LogDebug("Monitor stopped for Mattermost user", "user_id", u.id)
|
||||
return
|
||||
default:
|
||||
// Wait before next check
|
||||
timeoutCtx, cancel := context.WithTimeout(u.ctx, 60*time.Second)
|
||||
select {
|
||||
case <-u.ctx.Done():
|
||||
cancel()
|
||||
return
|
||||
case <-timeoutCtx.Done():
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetUsername returns the Mattermost username for this user (Mattermost-specific method)
|
||||
func (u *MattermostUser) GetUsername() string {
|
||||
return u.username
|
||||
}
|
||||
|
||||
// GetEmail returns the Mattermost email for this user (Mattermost-specific method)
|
||||
func (u *MattermostUser) GetEmail() string {
|
||||
return u.email
|
||||
}
|
||||
|
||||
// GetAPI returns the Mattermost API instance (for advanced operations)
|
||||
func (u *MattermostUser) GetAPI() plugin.API {
|
||||
return u.api
|
||||
}
|
188
server/bridge/user.go
Normal file
188
server/bridge/user.go
Normal file
|
@ -0,0 +1,188 @@
|
|||
package bridge
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||
)
|
||||
|
||||
// Manager implements the BridgeUserManager interface with bridge-agnostic logic
|
||||
type UserManager struct {
|
||||
bridgeType string
|
||||
logger logger.Logger
|
||||
users map[string]model.BridgeUser
|
||||
mu sync.RWMutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewUserManager creates a new user manager for a specific bridge type
|
||||
func NewUserManager(bridgeType string, logger logger.Logger) model.BridgeUserManager {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &UserManager{
|
||||
bridgeType: bridgeType,
|
||||
logger: logger,
|
||||
users: make(map[string]model.BridgeUser),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
// CreateUser adds a user to the bridge system
|
||||
func (m *UserManager) CreateUser(user model.BridgeUser) error {
|
||||
// Validate the user first
|
||||
if err := user.Validate(); err != nil {
|
||||
return fmt.Errorf("invalid user: %w", err)
|
||||
}
|
||||
|
||||
userID := user.GetID()
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
// Check if user already exists
|
||||
if _, exists := m.users[userID]; exists {
|
||||
return fmt.Errorf("user %s already exists", userID)
|
||||
}
|
||||
|
||||
m.logger.LogDebug("Adding bridge user", "bridge_type", m.bridgeType, "user_id", userID, "display_name", user.GetDisplayName())
|
||||
|
||||
// Store the user
|
||||
m.users[userID] = user
|
||||
|
||||
m.logger.LogInfo("Bridge user added successfully", "bridge_type", m.bridgeType, "user_id", userID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetUser retrieves a user by ID
|
||||
func (m *UserManager) GetUser(userID string) (model.BridgeUser, error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
user, exists := m.users[userID]
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("user %s not found", userID)
|
||||
}
|
||||
|
||||
return user, nil
|
||||
}
|
||||
|
||||
// DeleteUser removes a user from the bridge system
|
||||
func (m *UserManager) DeleteUser(userID string) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
user, exists := m.users[userID]
|
||||
if !exists {
|
||||
return fmt.Errorf("user %s not found", userID)
|
||||
}
|
||||
|
||||
m.logger.LogDebug("Deleting bridge user", "bridge_type", m.bridgeType, "user_id", userID)
|
||||
|
||||
// Stop the user first
|
||||
if err := user.Stop(); err != nil {
|
||||
m.logger.LogWarn("Error stopping user during deletion", "bridge_type", m.bridgeType, "user_id", userID, "error", err)
|
||||
}
|
||||
|
||||
// Disconnect if still connected
|
||||
if user.IsConnected() {
|
||||
if err := user.Disconnect(); err != nil {
|
||||
m.logger.LogWarn("Error disconnecting user during deletion", "bridge_type", m.bridgeType, "user_id", userID, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from map
|
||||
delete(m.users, userID)
|
||||
|
||||
m.logger.LogInfo("Bridge user deleted successfully", "bridge_type", m.bridgeType, "user_id", userID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListUsers returns a list of all users
|
||||
func (m *UserManager) ListUsers() []model.BridgeUser {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
users := make([]model.BridgeUser, 0, len(m.users))
|
||||
for _, user := range m.users {
|
||||
users = append(users, user)
|
||||
}
|
||||
|
||||
return users
|
||||
}
|
||||
|
||||
// HasUser checks if a user exists
|
||||
func (m *UserManager) HasUser(userID string) bool {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
_, exists := m.users[userID]
|
||||
return exists
|
||||
}
|
||||
|
||||
// Start initializes the user manager
|
||||
func (m *UserManager) Start(ctx context.Context) error {
|
||||
m.logger.LogDebug("Starting user manager", "bridge_type", m.bridgeType)
|
||||
|
||||
// Update context
|
||||
m.ctx = ctx
|
||||
|
||||
// Start all existing users
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
for userID, user := range m.users {
|
||||
if err := user.Start(ctx); err != nil {
|
||||
m.logger.LogWarn("Failed to start user during manager startup", "bridge_type", m.bridgeType, "user_id", userID, "error", err)
|
||||
// Continue starting other users even if one fails
|
||||
}
|
||||
}
|
||||
|
||||
m.logger.LogInfo("User manager started", "bridge_type", m.bridgeType, "user_count", len(m.users))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop shuts down the user manager
|
||||
func (m *UserManager) Stop() error {
|
||||
m.logger.LogDebug("Stopping user manager", "bridge_type", m.bridgeType)
|
||||
|
||||
if m.cancel != nil {
|
||||
m.cancel()
|
||||
}
|
||||
|
||||
// Stop all users
|
||||
m.mu.RLock()
|
||||
users := make([]model.BridgeUser, 0, len(m.users))
|
||||
for _, user := range m.users {
|
||||
users = append(users, user)
|
||||
}
|
||||
m.mu.RUnlock()
|
||||
|
||||
for _, user := range users {
|
||||
if err := user.Stop(); err != nil {
|
||||
m.logger.LogWarn("Error stopping user during manager shutdown", "bridge_type", m.bridgeType, "user_id", user.GetID(), "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
m.logger.LogInfo("User manager stopped", "bridge_type", m.bridgeType)
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateConfiguration updates configuration for all users
|
||||
func (m *UserManager) UpdateConfiguration(cfg *config.Configuration) error {
|
||||
m.logger.LogDebug("Updating configuration for user manager", "bridge_type", m.bridgeType)
|
||||
|
||||
// For now, we don't propagate config changes to individual users
|
||||
// This can be extended later if needed
|
||||
m.logger.LogInfo("User manager configuration updated", "bridge_type", m.bridgeType)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetBridgeType returns the bridge type this manager handles
|
||||
func (m *UserManager) GetBridgeType() string {
|
||||
return m.bridgeType
|
||||
}
|
|
@ -2,27 +2,28 @@ package xmpp
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"fmt"
|
||||
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/bridge"
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
|
||||
xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
|
||||
"github.com/mattermost/mattermost/server/public/plugin"
|
||||
)
|
||||
|
||||
// xmppBridge handles syncing messages between Mattermost and XMPP
|
||||
type xmppBridge struct {
|
||||
logger logger.Logger
|
||||
api plugin.API
|
||||
kvstore kvstore.KVStore
|
||||
xmppClient *xmppClient.Client
|
||||
logger logger.Logger
|
||||
api plugin.API
|
||||
kvstore kvstore.KVStore
|
||||
bridgeUser model.BridgeUser // Handles the bridge user and main bridge XMPP connection
|
||||
userManager pluginModel.BridgeUserManager
|
||||
|
||||
// Connection management
|
||||
connected atomic.Bool
|
||||
|
@ -41,7 +42,7 @@ type xmppBridge struct {
|
|||
// NewBridge creates a new XMPP bridge
|
||||
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
bridge := &xmppBridge{
|
||||
b := &xmppBridge{
|
||||
logger: log,
|
||||
api: api,
|
||||
kvstore: kvstore,
|
||||
|
@ -49,32 +50,20 @@ func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *
|
|||
cancel: cancel,
|
||||
channelMappings: make(map[string]string),
|
||||
config: cfg,
|
||||
userManager: bridge.NewUserManager("xmpp", log),
|
||||
}
|
||||
|
||||
// Initialize XMPP client with configuration
|
||||
if cfg.EnableSync && cfg.XMPPServerURL != "" && cfg.XMPPUsername != "" && cfg.XMPPPassword != "" {
|
||||
bridge.xmppClient = bridge.createXMPPClient(cfg)
|
||||
b.bridgeUser = b.createXMPPClient(cfg)
|
||||
}
|
||||
|
||||
return bridge
|
||||
return b
|
||||
}
|
||||
|
||||
// createXMPPClient creates an XMPP client with the given configuration
|
||||
func (b *xmppBridge) createXMPPClient(cfg *config.Configuration) *xmppClient.Client {
|
||||
// Create TLS config based on certificate verification setting
|
||||
tlsConfig := &tls.Config{
|
||||
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify,
|
||||
}
|
||||
|
||||
return xmppClient.NewClientWithTLS(
|
||||
cfg.XMPPServerURL,
|
||||
cfg.XMPPUsername,
|
||||
cfg.XMPPPassword,
|
||||
cfg.GetXMPPResource(),
|
||||
"", // remoteID not needed for bridge user
|
||||
tlsConfig,
|
||||
b.logger,
|
||||
)
|
||||
func (b *xmppBridge) createXMPPClient(cfg *config.Configuration) model.BridgeUser {
|
||||
return NewXMPPUser("_bridge_", "Bridge User", cfg.XMPPUsername, cfg, b.logger)
|
||||
}
|
||||
|
||||
// UpdateConfiguration updates the bridge configuration
|
||||
|
@ -97,9 +86,9 @@ func (b *xmppBridge) UpdateConfiguration(newConfig any) error {
|
|||
return fmt.Errorf("XMPP server URL, username, and password are required when sync is enabled")
|
||||
}
|
||||
|
||||
b.xmppClient = b.createXMPPClient(cfg)
|
||||
b.bridgeUser = b.createXMPPClient(cfg)
|
||||
} else {
|
||||
b.xmppClient = nil
|
||||
b.bridgeUser = nil
|
||||
}
|
||||
|
||||
// Check if we need to restart the bridge due to configuration changes
|
||||
|
@ -108,7 +97,7 @@ func (b *xmppBridge) UpdateConfiguration(newConfig any) error {
|
|||
|
||||
// Log the configuration change
|
||||
if needsRestart {
|
||||
b.logger.LogInfo("Configuration changed, restarting bridge", "old_config", oldConfig, "new_config", cfg)
|
||||
b.logger.LogInfo("Configuration changed, restarting bridge")
|
||||
} else {
|
||||
b.logger.LogInfo("Configuration updated", "config", cfg)
|
||||
}
|
||||
|
@ -175,8 +164,8 @@ func (b *xmppBridge) Stop() error {
|
|||
b.cancel()
|
||||
}
|
||||
|
||||
if b.xmppClient != nil {
|
||||
if err := b.xmppClient.Disconnect(); err != nil {
|
||||
if b.bridgeUser != nil {
|
||||
if err := b.bridgeUser.Disconnect(); err != nil {
|
||||
b.logger.LogWarn("Error disconnecting from XMPP server", "error", err)
|
||||
}
|
||||
}
|
||||
|
@ -188,13 +177,13 @@ func (b *xmppBridge) Stop() error {
|
|||
|
||||
// connectToXMPP establishes connection to the XMPP server
|
||||
func (b *xmppBridge) connectToXMPP() error {
|
||||
if b.xmppClient == nil {
|
||||
if b.bridgeUser == nil {
|
||||
return fmt.Errorf("XMPP client is not initialized")
|
||||
}
|
||||
|
||||
b.logger.LogDebug("Connecting to XMPP server")
|
||||
|
||||
err := b.xmppClient.Connect()
|
||||
err := b.bridgeUser.Connect()
|
||||
if err != nil {
|
||||
b.connected.Store(false)
|
||||
return fmt.Errorf("failed to connect to XMPP server: %w", err)
|
||||
|
@ -204,7 +193,7 @@ func (b *xmppBridge) connectToXMPP() error {
|
|||
b.logger.LogInfo("Successfully connected to XMPP server")
|
||||
|
||||
// Set online presence after successful connection
|
||||
if err := b.xmppClient.SetOnlinePresence(); err != nil {
|
||||
if err := b.bridgeUser.SetState(pluginModel.UserStateOnline); err != nil {
|
||||
b.logger.LogWarn("Failed to set online presence", "error", err)
|
||||
// Don't fail the connection for presence issues
|
||||
} else {
|
||||
|
@ -247,7 +236,7 @@ func (b *xmppBridge) joinXMPPRoom(channelID, roomJID string) error {
|
|||
return fmt.Errorf("not connected to XMPP server")
|
||||
}
|
||||
|
||||
err := b.xmppClient.JoinRoom(roomJID)
|
||||
err := b.bridgeUser.JoinChannel(roomJID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to join XMPP room: %w", err)
|
||||
}
|
||||
|
@ -309,7 +298,7 @@ func (b *xmppBridge) connectionMonitor() {
|
|||
case <-b.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := b.checkConnection(); err != nil {
|
||||
if err := b.Ping(); err != nil {
|
||||
b.logger.LogWarn("XMPP connection check failed", "error", err)
|
||||
b.handleReconnection()
|
||||
}
|
||||
|
@ -317,14 +306,6 @@ func (b *xmppBridge) connectionMonitor() {
|
|||
}
|
||||
}
|
||||
|
||||
// checkConnection verifies the XMPP connection is still active
|
||||
func (b *xmppBridge) checkConnection() error {
|
||||
if !b.connected.Load() {
|
||||
return fmt.Errorf("not connected")
|
||||
}
|
||||
return b.xmppClient.Ping()
|
||||
}
|
||||
|
||||
// handleReconnection attempts to reconnect to XMPP and rejoin rooms
|
||||
func (b *xmppBridge) handleReconnection() {
|
||||
b.configMu.RLock()
|
||||
|
@ -338,8 +319,8 @@ func (b *xmppBridge) handleReconnection() {
|
|||
b.logger.LogInfo("Attempting to reconnect to XMPP server")
|
||||
b.connected.Store(false)
|
||||
|
||||
if b.xmppClient != nil {
|
||||
b.xmppClient.Disconnect()
|
||||
if b.bridgeUser != nil {
|
||||
_ = b.bridgeUser.Disconnect()
|
||||
}
|
||||
|
||||
// Retry connection with exponential backoff
|
||||
|
@ -382,14 +363,14 @@ func (b *xmppBridge) Ping() error {
|
|||
return fmt.Errorf("XMPP bridge is not connected")
|
||||
}
|
||||
|
||||
if b.xmppClient == nil {
|
||||
if b.bridgeUser == nil {
|
||||
return fmt.Errorf("XMPP client not initialized")
|
||||
}
|
||||
|
||||
b.logger.LogDebug("Testing XMPP bridge connectivity with ping")
|
||||
|
||||
// Use the XMPP client's ping method
|
||||
if err := b.xmppClient.Ping(); err != nil {
|
||||
// Use the XMPP user's ping method
|
||||
if err := b.bridgeUser.Ping(); err != nil {
|
||||
b.logger.LogWarn("XMPP bridge ping failed", "error", err)
|
||||
return fmt.Errorf("XMPP bridge ping failed: %w", err)
|
||||
}
|
||||
|
@ -416,7 +397,7 @@ func (b *xmppBridge) CreateChannelMapping(channelID, roomJID string) error {
|
|||
|
||||
// Join the room if connected
|
||||
if b.connected.Load() {
|
||||
if err := b.xmppClient.JoinRoom(roomJID); err != nil {
|
||||
if err := b.bridgeUser.JoinChannel(roomJID); err != nil {
|
||||
b.logger.LogWarn("Failed to join newly mapped room", "channel_id", channelID, "room_jid", roomJID, "error", err)
|
||||
}
|
||||
}
|
||||
|
@ -482,8 +463,8 @@ func (b *xmppBridge) DeleteChannelMapping(channelID string) error {
|
|||
b.mappingsMu.Unlock()
|
||||
|
||||
// Leave the room if connected
|
||||
if b.connected.Load() && b.xmppClient != nil {
|
||||
if err := b.xmppClient.LeaveRoom(roomJID); err != nil {
|
||||
if b.connected.Load() && b.bridgeUser != nil {
|
||||
if err := b.bridgeUser.LeaveChannel(roomJID); err != nil {
|
||||
b.logger.LogWarn("Failed to leave unmapped room", "channel_id", channelID, "room_jid", roomJID, "error", err)
|
||||
// Don't fail the entire operation if leaving the room fails
|
||||
} else {
|
||||
|
@ -501,14 +482,14 @@ func (b *xmppBridge) RoomExists(roomID string) (bool, error) {
|
|||
return false, fmt.Errorf("not connected to XMPP server")
|
||||
}
|
||||
|
||||
if b.xmppClient == nil {
|
||||
if b.bridgeUser == nil {
|
||||
return false, fmt.Errorf("XMPP client not initialized")
|
||||
}
|
||||
|
||||
b.logger.LogDebug("Checking if XMPP room exists", "room_jid", roomID)
|
||||
|
||||
// Use the XMPP client to check room existence
|
||||
exists, err := b.xmppClient.CheckRoomExists(roomID)
|
||||
// Use the XMPP user to check room existence
|
||||
exists, err := b.bridgeUser.CheckChannelExists(roomID)
|
||||
if err != nil {
|
||||
b.logger.LogError("Failed to check room existence", "room_jid", roomID, "error", err)
|
||||
return false, fmt.Errorf("failed to check room existence: %w", err)
|
||||
|
@ -539,3 +520,8 @@ func (b *xmppBridge) GetRoomMapping(roomID string) (string, error) {
|
|||
|
||||
return channelID, nil
|
||||
}
|
||||
|
||||
// GetUserManager returns the user manager for this bridge
|
||||
func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager {
|
||||
return b.userManager
|
||||
}
|
||||
|
|
336
server/bridge/xmpp/user.go
Normal file
336
server/bridge/xmpp/user.go
Normal file
|
@ -0,0 +1,336 @@
|
|||
package xmpp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||
xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
|
||||
)
|
||||
|
||||
// XMPPUser represents an XMPP user that implements the BridgeUser interface
|
||||
type XMPPUser struct {
|
||||
// User identity
|
||||
id string
|
||||
displayName string
|
||||
jid string
|
||||
|
||||
// XMPP client
|
||||
client *xmppClient.Client
|
||||
|
||||
// State management
|
||||
state model.UserState
|
||||
stateMu sync.RWMutex
|
||||
connected atomic.Bool
|
||||
|
||||
// Configuration
|
||||
config *config.Configuration
|
||||
|
||||
// Goroutine lifecycle
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
// Logger
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
// NewXMPPUser creates a new XMPP user
|
||||
func NewXMPPUser(id, displayName, jid string, cfg *config.Configuration, logger logger.Logger) *XMPPUser {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// Create TLS config based on certificate verification setting
|
||||
tlsConfig := &tls.Config{
|
||||
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify,
|
||||
}
|
||||
|
||||
// Create XMPP client for this user
|
||||
client := xmppClient.NewClientWithTLS(
|
||||
cfg.XMPPServerURL,
|
||||
jid,
|
||||
cfg.XMPPPassword, // This might need to be user-specific in the future
|
||||
cfg.GetXMPPResource(),
|
||||
id, // Use user ID as remote ID
|
||||
tlsConfig,
|
||||
logger,
|
||||
)
|
||||
|
||||
return &XMPPUser{
|
||||
id: id,
|
||||
displayName: displayName,
|
||||
jid: jid,
|
||||
client: client,
|
||||
state: model.UserStateOffline,
|
||||
config: cfg,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Validation
|
||||
func (u *XMPPUser) Validate() error {
|
||||
if u.id == "" {
|
||||
return fmt.Errorf("user ID cannot be empty")
|
||||
}
|
||||
if u.jid == "" {
|
||||
return fmt.Errorf("JID cannot be empty")
|
||||
}
|
||||
if u.config == nil {
|
||||
return fmt.Errorf("configuration cannot be nil")
|
||||
}
|
||||
if u.config.XMPPServerURL == "" {
|
||||
return fmt.Errorf("XMPP server URL cannot be empty")
|
||||
}
|
||||
if u.client == nil {
|
||||
return fmt.Errorf("XMPP client cannot be nil")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Identity (bridge-agnostic)
|
||||
func (u *XMPPUser) GetID() string {
|
||||
return u.id
|
||||
}
|
||||
|
||||
func (u *XMPPUser) GetDisplayName() string {
|
||||
return u.displayName
|
||||
}
|
||||
|
||||
// State management
|
||||
func (u *XMPPUser) GetState() model.UserState {
|
||||
u.stateMu.RLock()
|
||||
defer u.stateMu.RUnlock()
|
||||
return u.state
|
||||
}
|
||||
|
||||
func (u *XMPPUser) SetState(state model.UserState) error {
|
||||
u.stateMu.Lock()
|
||||
defer u.stateMu.Unlock()
|
||||
|
||||
u.logger.LogDebug("Changing XMPP user state", "user_id", u.id, "old_state", u.state, "new_state", state)
|
||||
u.state = state
|
||||
|
||||
// TODO: Send presence update to XMPP server based on state
|
||||
// This would involve mapping UserState to XMPP presence types
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Channel operations
|
||||
func (u *XMPPUser) JoinChannel(channelID string) error {
|
||||
if !u.connected.Load() {
|
||||
return fmt.Errorf("user %s is not connected", u.id)
|
||||
}
|
||||
|
||||
u.logger.LogDebug("XMPP user joining channel", "user_id", u.id, "channel_id", channelID)
|
||||
|
||||
// For XMPP, channelID is the room JID
|
||||
err := u.client.JoinRoom(channelID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to join XMPP room %s: %w", channelID, err)
|
||||
}
|
||||
|
||||
u.logger.LogInfo("XMPP user joined channel", "user_id", u.id, "channel_id", channelID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *XMPPUser) LeaveChannel(channelID string) error {
|
||||
if !u.connected.Load() {
|
||||
return fmt.Errorf("user %s is not connected", u.id)
|
||||
}
|
||||
|
||||
u.logger.LogDebug("XMPP user leaving channel", "user_id", u.id, "channel_id", channelID)
|
||||
|
||||
// For XMPP, channelID is the room JID
|
||||
err := u.client.LeaveRoom(channelID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to leave XMPP room %s: %w", channelID, err)
|
||||
}
|
||||
|
||||
u.logger.LogInfo("XMPP user left channel", "user_id", u.id, "channel_id", channelID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *XMPPUser) SendMessageToChannel(channelID, message string) error {
|
||||
if !u.connected.Load() {
|
||||
return fmt.Errorf("user %s is not connected", u.id)
|
||||
}
|
||||
|
||||
u.logger.LogDebug("XMPP user sending message to channel", "user_id", u.id, "channel_id", channelID)
|
||||
|
||||
// Create message request for XMPP
|
||||
req := xmppClient.MessageRequest{
|
||||
RoomJID: channelID,
|
||||
GhostUserJID: u.jid,
|
||||
Message: message,
|
||||
}
|
||||
|
||||
_, err := u.client.SendMessage(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send message to XMPP room %s: %w", channelID, err)
|
||||
}
|
||||
|
||||
u.logger.LogDebug("XMPP user sent message to channel", "user_id", u.id, "channel_id", channelID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Connection lifecycle
|
||||
func (u *XMPPUser) Connect() error {
|
||||
u.logger.LogDebug("Connecting XMPP user", "user_id", u.id, "jid", u.jid)
|
||||
|
||||
err := u.client.Connect()
|
||||
if err != nil {
|
||||
u.connected.Store(false)
|
||||
return fmt.Errorf("failed to connect XMPP user %s: %w", u.id, err)
|
||||
}
|
||||
|
||||
u.connected.Store(true)
|
||||
u.logger.LogInfo("XMPP user connected", "user_id", u.id, "jid", u.jid)
|
||||
|
||||
// Set online presence after successful connection
|
||||
if err := u.client.SetOnlinePresence(); err != nil {
|
||||
u.logger.LogWarn("Failed to set online presence for XMPP user", "user_id", u.id, "error", err)
|
||||
// Don't fail the connection for presence issues
|
||||
}
|
||||
|
||||
// Update state to online
|
||||
_ = u.SetState(model.UserStateOnline)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *XMPPUser) Disconnect() error {
|
||||
u.logger.LogDebug("Disconnecting XMPP user", "user_id", u.id, "jid", u.jid)
|
||||
|
||||
if u.client == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := u.client.Disconnect()
|
||||
if err != nil {
|
||||
u.logger.LogWarn("Error disconnecting XMPP user", "user_id", u.id, "error", err)
|
||||
}
|
||||
|
||||
u.connected.Store(false)
|
||||
_ = u.SetState(model.UserStateOffline)
|
||||
|
||||
u.logger.LogInfo("XMPP user disconnected", "user_id", u.id, "jid", u.jid)
|
||||
return err
|
||||
}
|
||||
|
||||
func (u *XMPPUser) IsConnected() bool {
|
||||
return u.connected.Load()
|
||||
}
|
||||
|
||||
func (u *XMPPUser) Ping() error {
|
||||
if !u.connected.Load() {
|
||||
return fmt.Errorf("XMPP user %s is not connected", u.id)
|
||||
}
|
||||
|
||||
if u.client == nil {
|
||||
return fmt.Errorf("XMPP client not initialized for user %s", u.id)
|
||||
}
|
||||
|
||||
return u.client.Ping()
|
||||
}
|
||||
|
||||
// CheckChannelExists checks if an XMPP room/channel exists
|
||||
func (u *XMPPUser) CheckChannelExists(channelID string) (bool, error) {
|
||||
if !u.connected.Load() {
|
||||
return false, fmt.Errorf("XMPP user %s is not connected", u.id)
|
||||
}
|
||||
|
||||
if u.client == nil {
|
||||
return false, fmt.Errorf("XMPP client not initialized for user %s", u.id)
|
||||
}
|
||||
|
||||
return u.client.CheckRoomExists(channelID)
|
||||
}
|
||||
|
||||
// Goroutine lifecycle
|
||||
func (u *XMPPUser) Start(ctx context.Context) error {
|
||||
u.logger.LogDebug("Starting XMPP user", "user_id", u.id, "jid", u.jid)
|
||||
|
||||
// Update context
|
||||
u.ctx = ctx
|
||||
|
||||
// Connect to XMPP server
|
||||
if err := u.Connect(); err != nil {
|
||||
return fmt.Errorf("failed to start XMPP user %s: %w", u.id, err)
|
||||
}
|
||||
|
||||
// Start connection monitoring in a goroutine
|
||||
go u.connectionMonitor()
|
||||
|
||||
u.logger.LogInfo("XMPP user started", "user_id", u.id, "jid", u.jid)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *XMPPUser) Stop() error {
|
||||
u.logger.LogDebug("Stopping XMPP user", "user_id", u.id, "jid", u.jid)
|
||||
|
||||
// Cancel context to stop goroutines
|
||||
if u.cancel != nil {
|
||||
u.cancel()
|
||||
}
|
||||
|
||||
// Disconnect from XMPP server
|
||||
if err := u.Disconnect(); err != nil {
|
||||
u.logger.LogWarn("Error disconnecting XMPP user during stop", "user_id", u.id, "error", err)
|
||||
}
|
||||
|
||||
u.logger.LogInfo("XMPP user stopped", "user_id", u.id, "jid", u.jid)
|
||||
return nil
|
||||
}
|
||||
|
||||
// connectionMonitor monitors the XMPP connection for this user
|
||||
func (u *XMPPUser) connectionMonitor() {
|
||||
u.logger.LogDebug("Starting connection monitor for XMPP user", "user_id", u.id)
|
||||
|
||||
// Simple monitoring - check connection periodically
|
||||
for {
|
||||
select {
|
||||
case <-u.ctx.Done():
|
||||
u.logger.LogDebug("Connection monitor stopped for XMPP user", "user_id", u.id)
|
||||
return
|
||||
default:
|
||||
// Check connection every 30 seconds
|
||||
if u.connected.Load() {
|
||||
if err := u.client.Ping(); err != nil {
|
||||
u.logger.LogWarn("Connection check failed for XMPP user", "user_id", u.id, "error", err)
|
||||
u.connected.Store(false)
|
||||
_ = u.SetState(model.UserStateOffline)
|
||||
|
||||
// TODO: Implement reconnection logic if needed
|
||||
}
|
||||
}
|
||||
|
||||
// Wait before next check
|
||||
timeoutCtx, cancel := context.WithTimeout(u.ctx, 30 * time.Second) // 30 seconds
|
||||
select {
|
||||
case <-u.ctx.Done():
|
||||
cancel()
|
||||
return
|
||||
case <-timeoutCtx.Done():
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetJID returns the XMPP JID for this user (XMPP-specific method)
|
||||
func (u *XMPPUser) GetJID() string {
|
||||
return u.jid
|
||||
}
|
||||
|
||||
// GetClient returns the underlying XMPP client (for advanced operations)
|
||||
func (u *XMPPUser) GetClient() *xmppClient.Client {
|
||||
return u.client
|
||||
}
|
|
@ -1,6 +1,11 @@
|
|||
package model
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
|
||||
)
|
||||
|
||||
type BridgeID string
|
||||
|
||||
|
@ -144,27 +149,59 @@ type Bridge interface {
|
|||
|
||||
// Ping actively tests the bridge connection health by sending a lightweight request.
|
||||
Ping() error
|
||||
|
||||
// GetUserManager returns the user manager for this bridge.
|
||||
GetUserManager() BridgeUserManager
|
||||
}
|
||||
|
||||
// BridgeUser represents a user connected to any bridge service
|
||||
type BridgeUser interface {
|
||||
// Validation
|
||||
Validate() error
|
||||
|
||||
// Identity (bridge-agnostic)
|
||||
GetID() string
|
||||
GetDisplayName() string
|
||||
|
||||
// State management
|
||||
GetState() UserState
|
||||
SetState(state UserState) error
|
||||
|
||||
// Channel operations (abstracted from rooms/channels/groups)
|
||||
JoinChannel(channelID string) error
|
||||
LeaveChannel(channelID string) error
|
||||
SendMessageToChannel(channelID, message string) error
|
||||
|
||||
// Connection lifecycle
|
||||
Connect() error
|
||||
Disconnect() error
|
||||
IsConnected() bool
|
||||
Ping() error
|
||||
|
||||
// Channel existence check
|
||||
CheckChannelExists(channelID string) (bool, error)
|
||||
|
||||
// Goroutine lifecycle
|
||||
Start(ctx context.Context) error
|
||||
Stop() error
|
||||
}
|
||||
|
||||
// BridgeUserManager manages users for a specific bridge
|
||||
type BridgeUserManager interface {
|
||||
// CreateUser creates a new user in the bridge system.
|
||||
CreateUser(userID string, userData any) error
|
||||
|
||||
// GetUser retrieves user data for a given user ID.
|
||||
GetUser(userID string) (any, error)
|
||||
|
||||
// UpdateUser updates user data for a given user ID.
|
||||
UpdateUser(userID string, userData any) error
|
||||
|
||||
// DeleteUser removes a user from the bridge system.
|
||||
// User lifecycle
|
||||
CreateUser(user BridgeUser) error
|
||||
GetUser(userID string) (BridgeUser, error)
|
||||
DeleteUser(userID string) error
|
||||
|
||||
// ListUsers returns a list of all users in the bridge system.
|
||||
ListUsers() ([]string, error)
|
||||
|
||||
// HasUser checks if a user exists in the bridge system.
|
||||
ListUsers() []BridgeUser
|
||||
HasUser(userID string) bool
|
||||
|
||||
// OnUserStateChange is called when a user's state changes (e.g., online, away, offline).
|
||||
OnUserStateChange(userID string, state UserState) error
|
||||
// Manager lifecycle
|
||||
Start(ctx context.Context) error
|
||||
Stop() error
|
||||
|
||||
// Configuration updates
|
||||
UpdateConfiguration(config *config.Configuration) error
|
||||
|
||||
// Bridge type identification
|
||||
GetBridgeType() string
|
||||
}
|
||||
|
|
|
@ -83,7 +83,7 @@ func (p *Plugin) OnActivate() error {
|
|||
}
|
||||
|
||||
// Initialize bridge manager
|
||||
p.bridgeManager = bridge.NewManager(p.logger, p.API, p.remoteID)
|
||||
p.bridgeManager = bridge.NewBridgeManager(p.logger, p.API, p.remoteID)
|
||||
|
||||
// Initialize and register bridges with current configuration
|
||||
if err := p.initBridges(*cfg); err != nil {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue