Compare commits

...

8 commits

Author SHA1 Message Date
0442bd7b72
feat: implement connection caching for ghost user lifecycle management
Some checks failed
ci / plugin-ci (push) Has been cancelled
Implement comprehensive connection caching system to prevent XMPP connection leaks and support HA environments:

- Add node-specific XMPP resources using format "{baseResource}-node-{diagnosticId[:8]}" for HA compatibility
- Implement thread-safe connection cache with mutex protection in UserManager
- Add cache-first lookup in GetUser/GetOrCreateUser methods to prevent duplicate connections
- Update lifecycle manager to efficiently check cached users instead of expensive KV store queries
- Add graceful shutdown cleanup to properly disconnect all cached connections
- Implement cache management methods: getCachedUser, cacheUser, removeCachedUser, getCachedUsers
- Update activity tracking to work with cached connections
- Add proper cache cleanup when users are disconnected or deleted

This prevents connection leaks identified in previous implementation while maintaining efficient ghost user lifecycle management with 30-minute inactivity timeout.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-12 21:18:35 +02:00
22f8c97a25
feat: improve doctor command logging and automatic server capability detection
- Replace SimpleLogger with structured logging using slog and tint library
- Add colorized output with proper log levels (DEBUG/INFO/WARN/ERROR)
- Remove manual DetectServerCapabilities() calls from doctor command
- Server capabilities are now automatically detected on client connection
- Update default test credentials to admin@localhost/admin for consistency
- Improve testXEP0077 to use fail-fast approach with proper ghost user workflow
- Add structured logging throughout all test functions with timing information

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-12 19:08:49 +02:00
96d8b84dcb
fix: properly check responses for registration and cancellation requests 2025-08-12 18:56:06 +02:00
9bd0071b4a
chore: missing fields in plugin.json 2025-08-12 18:44:22 +02:00
b7fd8ddb54
chore: remove optional service in sidecar 2025-08-12 18:44:09 +02:00
109b491a0a
chore: disable webapp 2025-08-12 18:43:50 +02:00
b99b412692
fix: properly send inbandregistration requests 2025-08-12 18:43:21 +02:00
4e4a290813
chore: detect server capabilities on connect 2025-08-12 18:42:55 +02:00
12 changed files with 769 additions and 479 deletions

View file

@ -9,8 +9,8 @@ This plugin provides bidirectional message synchronization between Mattermost an
- Bidirectional message synchronization (Mattermost ↔ XMPP)
- XMPP Multi-User Chat (MUC) support
- Configurable username prefixes for XMPP users in Mattermost
- Ghost user management for cross-platform user representation
- Configurable username prefixes for XMPP users
- Ghost user management for cross-platform user representation on the XMPP server with connection lifecycle management (**XEP-0077 only**)
- Comprehensive XMPP client with SASL Plain authentication
To learn more about plugins, see [our plugin documentation](https://developers.mattermost.com/extend/plugins/).
@ -72,7 +72,7 @@ To avoid having to manually install your plugin, build and deploy your plugin us
}
```
### Development guidance
### Development guidance
1. Fewer packages is better: default to the main package unless there's good reason for a new package.
@ -84,7 +84,7 @@ To avoid having to manually install your plugin, build and deploy your plugin us
### Modifying the server boilerplate
The server code comes with some boilerplate for creating an api, using slash commands, accessing the kvstore and using the cluster package for jobs.
The server code comes with some boilerplate for creating an api, using slash commands, accessing the kvstore and using the cluster package for jobs.
#### Api

View file

@ -4,18 +4,20 @@ import (
"crypto/tls"
"flag"
"fmt"
"log"
"log/slog"
"os"
"time"
"github.com/lmittmann/tint"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
)
const (
// Default values for development server (sidecar)
defaultServer = "localhost:5222"
defaultUsername = "testuser@localhost"
defaultPassword = "testpass"
defaultUsername = "admin@localhost"
defaultPassword = "admin"
defaultResource = "doctor"
defaultTestRoom = "test1@conference.localhost"
)
@ -46,8 +48,8 @@ func main() {
flag.BoolVar(&config.TestMUC, "test-muc", true, "Enable MUC room testing (join/wait/leave)")
flag.BoolVar(&config.TestDirectMessage, "test-dm", true, "Enable direct message testing (send message to admin user)")
flag.BoolVar(&config.TestRoomExists, "test-room-exists", true, "Enable room existence testing using disco#info")
flag.BoolVar(&config.TestXEP0077, "test-xep0077", true, "Enable XEP-0077 In-Band Registration testing (required if enabled)")
flag.BoolVar(&config.Verbose, "verbose", true, "Enable verbose logging")
flag.BoolVar(&config.TestXEP0077, "test-xep0077", true, "Enable XEP-0077 In-Band Registration testing with ghost user message test")
flag.BoolVar(&config.Verbose, "verbose", false, "Enable verbose logging")
flag.BoolVar(&config.InsecureSkipVerify, "insecure-skip-verify", true, "Skip TLS certificate verification (for development)")
flag.Usage = func() {
@ -71,66 +73,44 @@ func main() {
flag.Parse()
if config.Verbose {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Printf("Starting XMPP client doctor...")
log.Printf("Configuration:")
log.Printf(" Server: %s", config.Server)
log.Printf(" Username: %s", config.Username)
log.Printf(" Resource: %s", config.Resource)
log.Printf(" Password: %s", maskPassword(config.Password))
if config.TestMUC {
log.Printf(" Test Room: %s", config.TestRoom)
}
if config.TestDirectMessage {
log.Printf(" Test Direct Messages: enabled")
}
if config.TestRoomExists {
log.Printf(" Test Room Existence: enabled")
}
if config.TestXEP0077 {
log.Printf(" Test XEP-0077 In-Band Registration: enabled")
}
}
// Create the main logger
mainLogger := NewStructuredLogger(config.Verbose)
mainLogger.LogInfo("Starting XMPP client doctor")
mainLogger.LogInfo("Configuration",
"server", config.Server,
"username", config.Username,
"resource", config.Resource,
"password", maskPassword(config.Password),
"test_room", config.TestRoom,
"test_muc", config.TestMUC,
"test_direct_message", config.TestDirectMessage,
"test_room_exists", config.TestRoomExists,
"test_xep0077", config.TestXEP0077)
// Test the XMPP client
if err := testXMPPClient(config); err != nil {
log.Fatalf("❌ XMPP client test failed: %v", err)
if err := testXMPPClient(config, mainLogger); err != nil {
mainLogger.LogError("XMPP client test failed", "error", err)
os.Exit(1)
}
if config.Verbose {
log.Printf("✅ XMPP client test completed successfully!")
} else {
fmt.Println("✅ XMPP client connectivity test passed!")
if config.TestXEP0077 {
fmt.Println("✅ XMPP XEP-0077 In-Band Registration test passed!")
}
if config.TestMUC {
fmt.Println("✅ XMPP MUC operations test passed!")
}
if config.TestDirectMessage {
fmt.Println("✅ XMPP direct message test passed!")
}
if config.TestRoomExists {
fmt.Println("✅ XMPP room existence test passed!")
}
}
mainLogger.LogInfo("XMPP client test completed successfully",
"xep0077_test", config.TestXEP0077,
"muc_test", config.TestMUC,
"direct_message_test", config.TestDirectMessage,
"room_exists_test", config.TestRoomExists)
}
func testXMPPClient(config *Config) error {
if config.Verbose {
log.Printf("Creating XMPP client...")
}
func testXMPPClient(config *Config, logger *StructuredLogger) error {
logger.LogDebug("Creating XMPP client")
// Create a simple logger for the XMPP client
doctorLogger := &SimpleLogger{verbose: config.Verbose}
// Create a structured logger for the XMPP client (reuse the passed logger)
doctorLogger := logger
// Create XMPP client with optional TLS configuration
var client *xmpp.Client
if config.InsecureSkipVerify {
if config.Verbose {
log.Printf("Using insecure TLS configuration (skipping certificate verification)")
}
logger.LogDebug("Using insecure TLS configuration", "skip_verify", true)
tlsConfig := &tls.Config{
InsecureSkipVerify: true, //nolint:gosec // This is a testing tool for development environments
}
@ -154,9 +134,7 @@ func testXMPPClient(config *Config) error {
)
}
if config.Verbose {
log.Printf("Attempting to connect to XMPP server...")
}
logger.LogDebug("Attempting to connect to XMPP server", "server", config.Server)
// Test connection
start := time.Now()
@ -166,10 +144,8 @@ func testXMPPClient(config *Config) error {
}
connectDuration := time.Since(start)
if config.Verbose {
log.Printf("✅ Connected to XMPP server in %v", connectDuration)
log.Printf("Testing connection health...")
}
logger.LogInfo("Connected to XMPP server", "duration", connectDuration)
logger.LogDebug("Testing connection health")
// Test connection health
start = time.Now()
@ -179,9 +155,7 @@ func testXMPPClient(config *Config) error {
}
pingDuration := time.Since(start)
if config.Verbose {
log.Printf("✅ Connection health test passed in %v", pingDuration)
}
logger.LogInfo("Connection health test passed", "duration", pingDuration)
var xep0077Duration time.Duration
var mucDuration time.Duration
@ -191,7 +165,7 @@ func testXMPPClient(config *Config) error {
// Test XEP-0077 In-Band Registration if requested
if config.TestXEP0077 {
start = time.Now()
err = testXEP0077(client, config)
err = testXEP0077(client, config, logger)
if err != nil {
return fmt.Errorf("XEP-0077 In-Band Registration test failed: %w", err)
}
@ -201,7 +175,7 @@ func testXMPPClient(config *Config) error {
// Test MUC operations if requested
if config.TestMUC {
start = time.Now()
err = testMUCOperations(client, config)
err = testMUCOperations(client, config, logger)
if err != nil {
return fmt.Errorf("MUC operations test failed: %w", err)
}
@ -211,7 +185,7 @@ func testXMPPClient(config *Config) error {
// Test direct message if requested
if config.TestDirectMessage {
start = time.Now()
err = testDirectMessage(client, config)
err = testDirectMessage(client, config, logger)
if err != nil {
return fmt.Errorf("direct message test failed: %w", err)
}
@ -221,16 +195,14 @@ func testXMPPClient(config *Config) error {
// Test room existence if requested
if config.TestRoomExists {
start = time.Now()
err = testRoomExists(client, config)
err = testRoomExists(client, config, logger)
if err != nil {
return fmt.Errorf("room existence test failed: %w", err)
}
roomExistsDuration = time.Since(start)
}
if config.Verbose {
log.Printf("Disconnecting from XMPP server...")
}
logger.LogDebug("Disconnecting from XMPP server")
// Disconnect
start = time.Now()
@ -240,48 +212,37 @@ func testXMPPClient(config *Config) error {
}
disconnectDuration := time.Since(start)
if config.Verbose {
log.Printf("✅ Disconnected from XMPP server in %v", disconnectDuration)
log.Printf("Connection summary:")
log.Printf(" Connect time: %v", connectDuration)
log.Printf(" Ping time: %v", pingDuration)
if config.TestXEP0077 {
log.Printf(" XEP-0077 test time: %v", xep0077Duration)
}
if config.TestMUC {
log.Printf(" MUC operations time: %v", mucDuration)
}
if config.TestDirectMessage {
log.Printf(" Direct message time: %v", dmDuration)
}
if config.TestRoomExists {
log.Printf(" Room existence check time: %v", roomExistsDuration)
}
log.Printf(" Disconnect time: %v", disconnectDuration)
totalTime := connectDuration + pingDuration + disconnectDuration
if config.TestXEP0077 {
totalTime += xep0077Duration
}
if config.TestMUC {
totalTime += mucDuration
}
if config.TestDirectMessage {
totalTime += dmDuration
}
if config.TestRoomExists {
totalTime += roomExistsDuration
}
log.Printf(" Total time: %v", totalTime)
totalTime := connectDuration + pingDuration + disconnectDuration
if config.TestXEP0077 {
totalTime += xep0077Duration
}
if config.TestMUC {
totalTime += mucDuration
}
if config.TestDirectMessage {
totalTime += dmDuration
}
if config.TestRoomExists {
totalTime += roomExistsDuration
}
logger.LogInfo("Disconnected from XMPP server", "disconnect_duration", disconnectDuration)
logger.LogInfo("Connection summary",
"connect_time", connectDuration,
"ping_time", pingDuration,
"xep0077_time", xep0077Duration,
"muc_time", mucDuration,
"direct_message_time", dmDuration,
"room_exists_time", roomExistsDuration,
"disconnect_time", disconnectDuration,
"total_time", totalTime)
return nil
}
func testMUCOperations(client *xmpp.Client, config *Config) error {
if config.Verbose {
log.Printf("Testing MUC operations with room: %s", config.TestRoom)
log.Printf("First checking if room exists...")
}
func testMUCOperations(client *xmpp.Client, config *Config, logger *StructuredLogger) error {
logger.LogInfo("Testing MUC operations", "room", config.TestRoom)
logger.LogDebug("Checking if room exists first")
// Check if room exists before attempting to join
start := time.Now()
@ -291,18 +252,13 @@ func testMUCOperations(client *xmpp.Client, config *Config) error {
}
checkDuration := time.Since(start)
if config.Verbose {
log.Printf("✅ Room existence check completed in %v", checkDuration)
log.Printf("Room %s exists: %t", config.TestRoom, exists)
}
logger.LogInfo("Room existence check completed", "duration", checkDuration, "room", config.TestRoom, "exists", exists)
if !exists {
return fmt.Errorf("cannot test MUC operations: room %s does not exist or is not accessible", config.TestRoom)
}
if config.Verbose {
log.Printf("Room exists, proceeding to join...")
}
logger.LogDebug("Room exists, proceeding to join")
// Test joining the room
start = time.Now()
@ -314,10 +270,8 @@ func testMUCOperations(client *xmpp.Client, config *Config) error {
var sendDuration time.Duration
if config.Verbose {
log.Printf("✅ Successfully joined MUC room in %v", joinDuration)
log.Printf("Sending test message to room...")
}
logger.LogInfo("Successfully joined MUC room", "duration", joinDuration)
logger.LogDebug("Sending test message to room")
// Send a test message
testMessage := fmt.Sprintf("Test message from XMPP doctor at %s", time.Now().Format("15:04:05"))
@ -333,18 +287,13 @@ func testMUCOperations(client *xmpp.Client, config *Config) error {
}
sendDuration = time.Since(start)
if config.Verbose {
log.Printf("✅ Successfully sent message in %v", sendDuration)
log.Printf("Message: %s", testMessage)
log.Printf("Waiting 5 seconds in the room...")
}
logger.LogInfo("Successfully sent message", "duration", sendDuration, "message", testMessage)
logger.LogDebug("Waiting 5 seconds in the room")
// Wait 5 seconds
time.Sleep(5 * time.Second)
if config.Verbose {
log.Printf("Attempting to leave MUC room...")
}
logger.LogDebug("Attempting to leave MUC room")
// Test leaving the room
start = time.Now()
@ -354,25 +303,21 @@ func testMUCOperations(client *xmpp.Client, config *Config) error {
}
leaveDuration := time.Since(start)
if config.Verbose {
log.Printf("✅ Successfully left MUC room in %v", leaveDuration)
log.Printf("MUC operations summary:")
log.Printf(" Room existence check time: %v", checkDuration)
log.Printf(" Join time: %v", joinDuration)
log.Printf(" Send message time: %v", sendDuration)
log.Printf(" Wait time: 5s")
log.Printf(" Leave time: %v", leaveDuration)
log.Printf(" Total MUC time: %v", checkDuration+joinDuration+sendDuration+5*time.Second+leaveDuration)
}
logger.LogInfo("Successfully left MUC room", "duration", leaveDuration)
logger.LogInfo("MUC operations summary",
"room_check_time", checkDuration,
"join_time", joinDuration,
"send_time", sendDuration,
"wait_time", "5s",
"leave_time", leaveDuration,
"total_time", checkDuration+joinDuration+sendDuration+5*time.Second+leaveDuration)
return nil
}
func testDirectMessage(client *xmpp.Client, config *Config) error {
if config.Verbose {
log.Printf("Testing direct message functionality...")
log.Printf("Sending test message to admin user...")
}
func testDirectMessage(client *xmpp.Client, config *Config, logger *StructuredLogger) error {
logger.LogInfo("Testing direct message functionality")
logger.LogDebug("Sending test message to admin user")
// Send a test message to the admin user
testMessage := fmt.Sprintf("Test direct message from XMPP doctor at %s", time.Now().Format("15:04:05"))
@ -385,22 +330,18 @@ func testDirectMessage(client *xmpp.Client, config *Config) error {
}
sendDuration := time.Since(start)
if config.Verbose {
log.Printf("✅ Successfully sent direct message in %v", sendDuration)
log.Printf("Message: %s", testMessage)
log.Printf("Recipient: %s", adminJID)
log.Printf("Direct message test summary:")
log.Printf(" Send message time: %v", sendDuration)
}
logger.LogInfo("Successfully sent direct message",
"duration", sendDuration,
"message", testMessage,
"recipient", adminJID)
logger.LogInfo("Direct message test summary", "send_time", sendDuration)
return nil
}
func testRoomExists(client *xmpp.Client, config *Config) error {
if config.Verbose {
log.Printf("Testing room existence functionality...")
log.Printf("Checking if test room exists: %s", config.TestRoom)
}
func testRoomExists(client *xmpp.Client, config *Config, logger *StructuredLogger) error {
logger.LogInfo("Testing room existence functionality")
logger.LogDebug("Checking if test room exists", "room", config.TestRoom)
// Test room existence check
start := time.Now()
@ -410,16 +351,11 @@ func testRoomExists(client *xmpp.Client, config *Config) error {
}
checkDuration := time.Since(start)
if config.Verbose {
log.Printf("✅ Room existence check completed in %v", checkDuration)
log.Printf("Room %s exists: %t", config.TestRoom, exists)
}
logger.LogInfo("Room existence check completed", "duration", checkDuration, "room", config.TestRoom, "exists", exists)
// Test with a non-existent room to verify negative case
nonExistentRoom := "nonexistent-room-12345@conference.localhost"
if config.Verbose {
log.Printf("Testing negative case with non-existent room: %s", nonExistentRoom)
}
logger.LogDebug("Testing negative case with non-existent room", "room", nonExistentRoom)
start = time.Now()
existsNegative, err := client.CheckRoomExists(nonExistentRoom)
@ -428,14 +364,15 @@ func testRoomExists(client *xmpp.Client, config *Config) error {
}
checkNegativeDuration := time.Since(start)
if config.Verbose {
log.Printf("✅ Negative room existence check completed in %v", checkNegativeDuration)
log.Printf("Non-existent room %s exists: %t (should be false)", nonExistentRoom, existsNegative)
log.Printf("Room existence test summary:")
log.Printf(" Test room check time: %v", checkDuration)
log.Printf(" Negative case check time: %v", checkNegativeDuration)
log.Printf(" Total room existence test time: %v", checkDuration+checkNegativeDuration)
}
logger.LogInfo("Negative room existence check completed",
"duration", checkNegativeDuration,
"room", nonExistentRoom,
"exists", existsNegative,
"expected_false", true)
logger.LogInfo("Room existence test summary",
"test_room_time", checkDuration,
"negative_case_time", checkNegativeDuration,
"total_time", checkDuration+checkNegativeDuration)
return nil
}
@ -447,44 +384,60 @@ func maskPassword(password string) string {
return password[:2] + "****"
}
// SimpleLogger provides basic logging functionality for the doctor command
type SimpleLogger struct {
// StructuredLogger provides structured logging functionality for the doctor command using slog
type StructuredLogger struct {
logger *slog.Logger
verbose bool
}
// LogDebug logs debug messages if verbose mode is enabled
func (l *SimpleLogger) LogDebug(msg string, args ...interface{}) {
if l.verbose {
log.Printf("[DEBUG] "+msg, args...)
// NewStructuredLogger creates a new structured logger with colorized output
func NewStructuredLogger(verbose bool) *StructuredLogger {
// Configure log level based on verbose flag
level := slog.LevelInfo
if verbose {
level = slog.LevelDebug
}
// Create tinted handler for colorized output
handler := tint.NewHandler(os.Stdout, &tint.Options{
Level: level,
TimeFormat: "15:04:05.000", // More concise time format
AddSource: false, // Don't show source file info
})
// Create logger with the handler
logger := slog.New(handler)
return &StructuredLogger{
logger: logger,
verbose: verbose,
}
}
// LogInfo logs info messages
func (l *SimpleLogger) LogInfo(msg string, args ...interface{}) {
log.Printf("[INFO] "+msg, args...)
// LogDebug logs debug messages with structured key-value pairs
func (l *StructuredLogger) LogDebug(msg string, keyValuePairs ...any) {
l.logger.Debug(msg, keyValuePairs...)
}
// LogWarn logs warning messages
func (l *SimpleLogger) LogWarn(msg string, args ...interface{}) {
log.Printf("[WARN] "+msg, args...)
// LogInfo logs info messages with structured key-value pairs
func (l *StructuredLogger) LogInfo(msg string, keyValuePairs ...any) {
l.logger.Info(msg, keyValuePairs...)
}
// LogError logs error messages
func (l *SimpleLogger) LogError(msg string, args ...interface{}) {
log.Printf("[ERROR] "+msg, args...)
// LogWarn logs warning messages with structured key-value pairs
func (l *StructuredLogger) LogWarn(msg string, keyValuePairs ...any) {
l.logger.Warn(msg, keyValuePairs...)
}
// testXEP0077 tests XEP-0077 In-Band Registration functionality by creating and deleting a test user
func testXEP0077(client *xmpp.Client, config *Config) error {
if config.Verbose {
log.Printf("Testing XEP-0077 In-Band Registration functionality...")
}
// LogError logs error messages with structured key-value pairs
func (l *StructuredLogger) LogError(msg string, keyValuePairs ...any) {
l.logger.Error(msg, keyValuePairs...)
}
// First, wait for server capability detection to complete
// This is handled asynchronously in the client Connect method
time.Sleep(2 * time.Second)
// testXEP0077 tests XEP-0077 In-Band Registration by creating a ghost user and testing message sending
func testXEP0077(client *xmpp.Client, config *Config, logger *StructuredLogger) error {
logger.LogInfo("Testing XEP-0077 In-Band Registration with ghost user messaging")
// Check if server supports XEP-0077
inBandReg, err := client.GetInBandRegistration()
if err != nil {
return fmt.Errorf("server does not support XEP-0077 In-Band Registration: %w", err)
@ -494,99 +447,95 @@ func testXEP0077(client *xmpp.Client, config *Config) error {
return fmt.Errorf("XEP-0077 In-Band Registration is not enabled on this server")
}
if config.Verbose {
log.Printf("✅ Server supports XEP-0077 In-Band Registration")
}
serverJID := client.GetJID().Domain()
// Step 1: Test registration fields discovery
start := time.Now()
if config.Verbose {
log.Printf("Testing registration fields discovery for server: %s", serverJID.String())
// Step 1: Create ghost user with admin client
ghostUsername := fmt.Sprintf("ghost_test_%d", time.Now().Unix())
ghostPassword := "testpass123"
ghostJID := fmt.Sprintf("%s@%s", ghostUsername, serverJID.String())
logger.LogInfo("Creating ghost user", "username", ghostUsername)
ghostRegistrationRequest := &xmpp.RegistrationRequest{
Username: ghostUsername,
Password: ghostPassword,
Email: fmt.Sprintf("%s@localhost", ghostUsername),
}
fields, err := inBandReg.GetRegistrationFields(serverJID)
ghostRegResponse, err := inBandReg.RegisterAccount(serverJID, ghostRegistrationRequest)
if err != nil {
return fmt.Errorf("failed to get registration fields from server: %w", err)
return fmt.Errorf("failed to register ghost user: %w", err)
}
fieldsDuration := time.Since(start)
if config.Verbose {
log.Printf("✅ Registration fields discovery completed in %v", fieldsDuration)
log.Printf("Registration fields: required=%v, available=%d", fields.Required, len(fields.Fields))
if !ghostRegResponse.Success {
return fmt.Errorf("ghost user registration failed: %s", ghostRegResponse.Error)
}
// Step 2: Create test user
testUsername := fmt.Sprintf("xmpptest%d", time.Now().Unix())
testPassword := "testpass123"
testEmail := fmt.Sprintf("%s@localhost", testUsername)
logger.LogInfo("Ghost user created successfully", "username", ghostUsername)
if config.Verbose {
log.Printf("Creating test user: %s", testUsername)
}
registrationRequest := &xmpp.RegistrationRequest{
Username: testUsername,
Password: testPassword,
Email: testEmail,
}
start = time.Now()
regResponse, err := inBandReg.RegisterAccount(serverJID, registrationRequest)
if err != nil {
return fmt.Errorf("failed to register test user '%s': %w", testUsername, err)
}
registerDuration := time.Since(start)
if !regResponse.Success {
return fmt.Errorf("user registration failed: %s", regResponse.Error)
}
if config.Verbose {
log.Printf("✅ Test user '%s' registered successfully in %v", testUsername, registerDuration)
log.Printf("Registration response: %s", regResponse.Message)
}
// Step 3: Delete test user (cleanup)
if config.Verbose {
log.Printf("Cleaning up: removing test user '%s'", testUsername)
}
start = time.Now()
cancelResponse, err := inBandReg.CancelRegistration(serverJID)
if err != nil {
if config.Verbose {
log.Printf("⚠️ Failed to remove test user '%s': %v", testUsername, err)
log.Printf("⚠️ Manual cleanup may be required")
}
// Step 2-7: Use ghost client for all operations
var ghostClient *xmpp.Client
if config.InsecureSkipVerify {
tlsConfig := &tls.Config{InsecureSkipVerify: true} //nolint:gosec // Testing tool
ghostClient = xmpp.NewClientWithTLS(config.Server, ghostJID, ghostPassword, "ghost_doctor", "ghost-remote-id", tlsConfig, logger)
} else {
cancelDuration := time.Since(start)
if cancelResponse.Success {
if config.Verbose {
log.Printf("✅ Test user '%s' removed successfully in %v", testUsername, cancelDuration)
}
} else {
if config.Verbose {
log.Printf("⚠️ User removal may have failed: %s", cancelResponse.Error)
}
}
ghostClient = xmpp.NewClient(config.Server, ghostJID, ghostPassword, "ghost_doctor", "ghost-remote-id", logger)
}
if config.Verbose {
log.Printf("XEP-0077 test summary:")
log.Printf(" Server support check: ✅")
log.Printf(" Registration fields discovery time: %v", fieldsDuration)
log.Printf(" User registration time: %v", registerDuration)
log.Printf(" Test username: %s", testUsername)
log.Printf(" Required fields count: %d", len(fields.Required))
log.Printf(" User creation: ✅")
if err == nil && cancelResponse.Success {
log.Printf(" User cleanup: ✅")
} else {
log.Printf(" User cleanup: ⚠️")
}
// Step 2: Connect ghost client
if err := ghostClient.Connect(); err != nil {
return fmt.Errorf("failed to connect ghost user: %w", err)
}
logger.LogInfo("Ghost user connected")
// Step 3: Check test room exists
exists, err := ghostClient.CheckRoomExists(config.TestRoom)
if err != nil {
return fmt.Errorf("failed to check room existence: %w", err)
}
if !exists {
return fmt.Errorf("test room %s does not exist", config.TestRoom)
}
// Step 4: Join test room
if err := ghostClient.JoinRoom(config.TestRoom); err != nil {
return fmt.Errorf("failed to join room: %w", err)
}
logger.LogInfo("Ghost user joined room", "room", config.TestRoom)
// Step 5: Send message to test room
testMessage := fmt.Sprintf("Test ghost user message from %s at %s", ghostUsername, time.Now().Format("15:04:05"))
messageReq := xmpp.MessageRequest{
RoomJID: config.TestRoom,
GhostUserJID: ghostJID,
Message: testMessage,
}
if _, err := ghostClient.SendMessage(&messageReq); err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
logger.LogInfo("Ghost user sent message", "message", testMessage)
// Step 6: Cancel account (ghost user cancels their own registration)
ghostInBandReg, err := ghostClient.GetInBandRegistration()
if err != nil {
return fmt.Errorf("failed to get XEP-0077 handler for ghost user: %w", err)
}
ghostCancellationRequest := &xmpp.CancellationRequest{Username: ghostUsername}
ghostCancelResponse, err := ghostInBandReg.CancelRegistration(serverJID, ghostCancellationRequest)
if err != nil {
return fmt.Errorf("failed to cancel ghost user registration: %w", err)
}
if !ghostCancelResponse.Success {
return fmt.Errorf("ghost user registration cancellation failed: %s", ghostCancelResponse.Error)
}
logger.LogInfo("Ghost user registration cancelled successfully")
// Clean disconnect
if err := ghostClient.Disconnect(); err != nil {
logger.LogWarn("Failed to disconnect ghost client", "error", err)
}
logger.LogInfo("XEP-0077 ghost user test completed successfully", "username", ghostUsername)
return nil
}

1
go.mod
View file

@ -5,6 +5,7 @@ go 1.24.3
require (
github.com/gorilla/mux v1.8.1
github.com/jellydator/ttlcache/v3 v3.4.0
github.com/lmittmann/tint v1.1.2
github.com/mattermost/mattermost/server/public v0.1.10
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.10.0

2
go.sum
View file

@ -425,6 +425,8 @@ github.com/leonklingele/grouper v1.1.2 h1:o1ARBDLOmmasUaNDesWqWCIFH3u7hoFlM84Yrj
github.com/leonklingele/grouper v1.1.2/go.mod h1:6D0M/HVkhs2yRKRFZUoGjeDy7EZTfFBE9gl4kjmIGkA=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lmittmann/tint v1.1.2 h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w=
github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/lufeee/execinquery v1.2.1 h1:hf0Ems4SHcUGBxpGN7Jz78z1ppVkP/837ZlETPCEtOM=
github.com/lufeee/execinquery v1.2.1/go.mod h1:EC7DrEKView09ocscGHC+apXMIaorh4xqSxS/dy8SbM=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=

View file

@ -17,9 +17,6 @@
},
"executable": ""
},
"webapp": {
"bundle_path": "webapp/dist/main.js"
},
"settings_schema": {
"header": "Configure the XMPP bridge connection settings below.",
"footer": "For more information about setting up the XMPP bridge, see the [documentation](https://github.com/mattermost/mattermost-plugin-bridge-xmpp/blob/main/README.md).",
@ -65,12 +62,42 @@
"secret": false
},
{
"key": "XMPPUsernamePrefix",
"display_name": "XMPP Username Prefix",
"key": "EnableXMPPGhostUsers",
"display_name": "Enable XMPP Ghost Users",
"type": "bool",
"help_text": "When enabled, individual XMPP accounts will be created for each Mattermost user using XEP-0077 In-Band Registration. If disabled or unsupported, the bridge user will be used for all communications.",
"placeholder": "",
"default": false,
"hosting": "",
"secret": false
},
{
"key": "XMPPGhostUserPrefix",
"display_name": "XMPP Ghost User Prefix",
"type": "text",
"help_text": "Prefix for XMPP users in Mattermost (e.g., 'xmpp' creates usernames like 'xmpp:user@domain')",
"placeholder": "xmpp",
"default": "xmpp",
"help_text": "Prefix for ghost user accounts created on the XMPP server (e.g., 'mm_' creates users like 'mm_john@xmpp.example.com'). Required when ghost users are enabled.",
"placeholder": "mm_",
"default": null,
"hosting": "",
"secret": false
},
{
"key": "XMPPGhostUserDomain",
"display_name": "XMPP Ghost User Domain",
"type": "text",
"help_text": "Domain for ghost user accounts on the XMPP server. If not specified, uses the domain from the XMPP Server URL.",
"placeholder": "xmpp.example.com",
"default": null,
"hosting": "",
"secret": false
},
{
"key": "XMPPGhostUserCleanup",
"display_name": "Enable Ghost User Cleanup",
"type": "bool",
"help_text": "When enabled, ghost user accounts will be automatically removed from the XMPP server when Mattermost users are deleted. Disable to preserve accounts.",
"placeholder": "",
"default": true,
"hosting": "",
"secret": false
},
@ -102,4 +129,4 @@
"version": "v0.1.4"
}
}
}
}

View file

@ -134,20 +134,6 @@ func (b *xmppBridge) createUserManager(cfg *config.Configuration, bridgeID strin
return NewXMPPUserManager(bridgeID, log, store, b.api, cfg, b.bridgeClient)
}
// waitForCapabilityDetection waits for server capability detection to complete
func (b *xmppBridge) waitForCapabilityDetection() error {
if b.bridgeClient == nil {
return fmt.Errorf("bridge client not available")
}
// Trigger capability detection synchronously
if err := b.bridgeClient.DetectServerCapabilities(); err != nil {
return fmt.Errorf("failed to detect server capabilities: %w", err)
}
return nil
}
// checkXEP0077Support checks if the XMPP server supports XEP-0077 In-Band Registration
func (b *xmppBridge) checkXEP0077Support() (bool, error) {
if b.bridgeClient == nil {
@ -237,14 +223,14 @@ func (b *xmppBridge) Start() error {
return fmt.Errorf("failed to connect to XMPP server: %w", err)
}
// Wait for server capability detection to complete before creating user manager
if err := b.waitForCapabilityDetection(); err != nil {
return fmt.Errorf("failed to detect server capabilities: %w", err)
}
// Initialize proper user manager now that we're connected and server capabilities are detected
b.userManager = b.createUserManager(cfg, b.bridgeID, b.logger, b.kvstore)
// Start the user manager to enable lifecycle management
if err := b.userManager.Start(b.ctx); err != nil {
return fmt.Errorf("failed to start user manager: %w", err)
}
// Load and join mapped channels
if err := b.loadAndJoinMappedChannels(); err != nil {
b.logger.LogWarn("Failed to join some mapped channels", "error", err)
@ -265,6 +251,11 @@ func (b *xmppBridge) Stop() error {
b.cancel()
}
// Stop the user manager to stop lifecycle management
if err := b.userManager.Stop(); err != nil {
b.logger.LogWarn("Error stopping user manager", "error", err)
}
if b.bridgeClient != nil {
if err := b.bridgeClient.Disconnect(); err != nil {
b.logger.LogWarn("Error disconnecting from XMPP server", "error", err)

View file

@ -123,6 +123,12 @@ func (h *xmppMessageHandler) sendMessageViaGhostUser(xmppUserManager *UserManage
return h.sendMessageViaBridgeUser(msg, roomJID)
}
// Update user activity in KV store after successful message send
if err := xmppUserManager.UpdateUserActivity(msg.SourceUserID); err != nil {
h.logger.LogError("Failed to update user activity after message send, user may never disconnect", "user_id", msg.SourceUserID, "error", err)
// Don't fail the message send for activity update failures
}
h.logger.LogDebug("Message sent via ghost user",
"source_user_id", msg.SourceUserID,
"ghost_jid", xmppUser.GetJID(),

View file

@ -14,6 +14,14 @@ import (
xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
)
const (
// ghostUserInactivityTimeout is the duration after which inactive ghost users are disconnected
ghostUserInactivityTimeout = 2 * time.Minute
// ghostUserActivityCheckInterval is how often we check for inactive ghost users
ghostUserActivityCheckInterval = 1 * time.Minute
)
// User represents an XMPP user that implements the BridgeUser interface
type User struct {
// User identity
@ -29,6 +37,11 @@ type User struct {
stateMu sync.RWMutex
connected atomic.Bool
// Activity tracking for lifecycle management
lastActivity time.Time
activityMu sync.RWMutex
enableLifecycleCheck bool // Whether this user should be subject to inactivity disconnection
// Configuration
config *config.Configuration
@ -42,6 +55,11 @@ type User struct {
// NewXMPPUser creates a new XMPP user with specific credentials
func NewXMPPUser(id, displayName, jid, password string, cfg *config.Configuration, log logger.Logger) *User {
return NewXMPPUserWithActivity(id, displayName, jid, password, cfg, log, time.Now(), false)
}
// NewXMPPUserWithActivity creates a new XMPP user with specific credentials, last activity time, and lifecycle setting
func NewXMPPUserWithActivity(id, displayName, jid, password string, cfg *config.Configuration, log logger.Logger, lastActivity time.Time, enableLifecycle bool) *User {
ctx, cancel := context.WithCancel(context.Background())
// Create TLS config based on certificate verification setting
@ -61,15 +79,17 @@ func NewXMPPUser(id, displayName, jid, password string, cfg *config.Configuratio
)
return &User{
id: id,
displayName: displayName,
jid: jid,
client: client,
state: model.UserStateOffline,
config: cfg,
ctx: ctx,
cancel: cancel,
logger: log,
id: id,
displayName: displayName,
jid: jid,
client: client,
state: model.UserStateOffline,
config: cfg,
ctx: ctx,
cancel: cancel,
logger: log,
lastActivity: lastActivity, // Use provided activity time
enableLifecycleCheck: enableLifecycle, // Use provided lifecycle setting
}
}
@ -164,6 +184,9 @@ func (u *User) SendMessageToChannel(channelID, message string) error {
u.logger.LogDebug("XMPP user sending message to channel", "user_id", u.id, "channel_id", channelID)
// Update activity timestamp for this user interaction
u.UpdateLastActivity()
// Ensure we're joined to the room before sending the message
if err := u.EnsureJoinedToRoom(channelID); err != nil {
return fmt.Errorf("failed to ensure joined to room before sending message: %w", err)
@ -368,46 +391,41 @@ func (u *User) GetClient() *xmppClient.Client {
return u.client
}
// UpdateCredentials updates the user's JID and password for ghost user mode
// This creates a new XMPP client with the updated credentials
func (u *User) UpdateCredentials(newJID, newPassword string) error {
u.logger.LogDebug("Updating XMPP user credentials", "user_id", u.id, "old_jid", u.jid, "new_jid", newJID)
// Activity tracking methods
// Disconnect existing client if connected
wasConnected := u.IsConnected()
if wasConnected {
if err := u.Disconnect(); err != nil {
u.logger.LogWarn("Error disconnecting before credential update", "user_id", u.id, "error", err)
}
}
// Create TLS config based on certificate verification setting
tlsConfig := &tls.Config{
InsecureSkipVerify: u.config.XMPPInsecureSkipVerify, //nolint:gosec // Allow insecure TLS for testing environments
}
// Create new XMPP client with updated credentials
newClient := xmppClient.NewClientWithTLS(
u.config.XMPPServerURL,
newJID,
newPassword,
u.config.GetXMPPResource(),
u.id, // Use user ID as remote ID
tlsConfig,
u.logger,
)
// Update user fields
u.jid = newJID
u.client = newClient
// Reconnect if we were previously connected
if wasConnected {
if err := u.Connect(); err != nil {
return fmt.Errorf("failed to reconnect after credential update: %w", err)
}
}
u.logger.LogInfo("XMPP user credentials updated successfully", "user_id", u.id, "new_jid", newJID)
return nil
// UpdateLastActivity updates the last activity timestamp for this user
func (u *User) UpdateLastActivity() {
u.activityMu.Lock()
defer u.activityMu.Unlock()
u.lastActivity = time.Now()
u.logger.LogDebug("Updated last activity for user", "user_id", u.id, "timestamp", u.lastActivity)
}
// GetLastActivity returns the last activity timestamp
func (u *User) GetLastActivity() time.Time {
u.activityMu.RLock()
defer u.activityMu.RUnlock()
return u.lastActivity
}
// IsInactive returns true if the user has been inactive longer than the specified duration
func (u *User) IsInactive(inactivityThreshold time.Duration) bool {
u.activityMu.RLock()
defer u.activityMu.RUnlock()
return time.Since(u.lastActivity) > inactivityThreshold
}
// SetLifecycleManagement enables or disables lifecycle management for this user
func (u *User) SetLifecycleManagement(enabled bool) {
u.activityMu.Lock()
defer u.activityMu.Unlock()
u.enableLifecycleCheck = enabled
u.logger.LogDebug("Lifecycle management setting changed", "user_id", u.id, "enabled", enabled)
}
// IsLifecycleManaged returns true if this user is subject to lifecycle management
func (u *User) IsLifecycleManaged() bool {
u.activityMu.RLock()
defer u.activityMu.RUnlock()
return u.enableLifecycleCheck
}

View file

@ -3,9 +3,11 @@ package xmpp
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/mattermost/mattermost/server/public/plugin"
"mellium.im/xmpp/jid"
@ -33,6 +35,8 @@ type GhostUserData struct {
GhostJID string `json:"ghost_jid"` // XMPP JID of the ghost user
GhostPassword string `json:"ghost_password"` // XMPP password for the ghost user
Created int64 `json:"created"` // Timestamp when ghost was created
LastActivity int64 `json:"last_activity"` // Unix timestamp of last activity
LifecycleEnabled bool `json:"lifecycle_enabled"` // Whether this user should be subject to inactivity checking
}
// UserManager manages XMPP users using XEP-0077 ghost users ONLY
@ -47,11 +51,23 @@ type UserManager struct {
bridgeClient *xmppClient.Client
ctx context.Context
cancel context.CancelFunc
// Node identification for HA environments
nodeID string // Unique identifier for this Mattermost node (from api.GetDiagnosticId)
// Connection caching to prevent connection leaks
activeUsers map[string]*User // Cache of connected users on THIS node
activeUsersMu sync.RWMutex // Protects activeUsers map
}
// NewXMPPUserManager creates a new XMPP-specific user manager for ghost users only
func NewXMPPUserManager(bridgeType string, log logger.Logger, store kvstore.KVStore, api plugin.API, cfg *config.Configuration, bridgeClient *xmppClient.Client) model.BridgeUserManager {
ctx, cancel := context.WithCancel(context.Background())
// Get unique node ID from Mattermost API for HA environments
nodeID := api.GetDiagnosticId()
log.LogDebug("Initializing XMPP user manager", "bridge_type", bridgeType, "node_id", nodeID[:8])
return &UserManager{
bridgeType: bridgeType,
logger: log,
@ -61,6 +77,8 @@ func NewXMPPUserManager(bridgeType string, log logger.Logger, store kvstore.KVSt
bridgeClient: bridgeClient,
ctx: ctx,
cancel: cancel,
nodeID: nodeID,
activeUsers: make(map[string]*User), // Initialize connection cache
}
}
@ -153,34 +171,61 @@ func (m *UserManager) CreateUser(user model.BridgeUser) error {
return nil
}
// GetUser retrieves a user by Mattermost user ID, creating XMPPUser from ghost data
// GetUser retrieves a user by Mattermost user ID, checking cache first, then creating XMPPUser from ghost data
func (m *UserManager) GetUser(mattermostUserID string) (model.BridgeUser, error) {
// First check the connection cache
if cachedUser, found := m.getCachedUser(mattermostUserID); found {
m.logger.LogDebug("Found user in connection cache", "user_id", mattermostUserID, "ghost_jid", cachedUser.GetJID())
return cachedUser, nil
}
// Check if ghost user data exists
ghostData, err := m.loadGhostUserData(mattermostUserID)
if err != nil {
return nil, fmt.Errorf("ghost user not found for Mattermost user %s: %w", mattermostUserID, err)
}
// Create XMPPUser directly with ghost credentials
// Create XMPPUser directly with ghost credentials and activity data
m.configMu.RLock()
cfg := m.config
m.configMu.RUnlock()
user := NewXMPPUser(mattermostUserID, mattermostUserID, ghostData.GhostJID, ghostData.GhostPassword, cfg, m.logger)
// Handle migration of existing users without activity data
lastActivity := time.Now()
if ghostData.LastActivity > 0 {
lastActivity = time.Unix(ghostData.LastActivity, 0)
} else {
// Update the KV store with the migration data
ghostData.LastActivity = lastActivity.Unix()
ghostData.LifecycleEnabled = true
_ = m.storeGhostUserData(mattermostUserID, ghostData) // Don't fail if storage update fails
}
user := m.createXMPPUserWithActivity(mattermostUserID, mattermostUserID, ghostData.GhostJID, ghostData.GhostPassword, cfg, m.logger, lastActivity, ghostData.LifecycleEnabled)
// Ensure the user is connected
if err := m.ensureUserConnected(user, mattermostUserID); err != nil {
return nil, fmt.Errorf("failed to ensure ghost user is connected: %w", err)
}
// Cache the connected user to prevent connection leaks
m.cacheUser(mattermostUserID, user)
return user, nil
}
// GetOrCreateUser retrieves a user by Mattermost user ID, creating a new ghost user if it doesn't exist
func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (model.BridgeUser, error) {
// Try to get existing user first
// First check the connection cache
if cachedUser, found := m.getCachedUser(mattermostUserID); found {
m.logger.LogDebug("Found user in connection cache", "user_id", mattermostUserID, "ghost_jid", cachedUser.GetJID())
return cachedUser, nil
}
// Try to get existing user from KV store
user, err := m.GetUser(mattermostUserID)
if err == nil {
// GetUser already cached the user, so just return it
return user, nil
}
@ -200,15 +245,20 @@ func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (mod
return nil, fmt.Errorf("failed to register ghost user: %w", err)
}
// Create XMPPUser instance with the correct ghost credentials
xmppUser := NewXMPPUser(mattermostUserID, displayName, ghostJID, ghostPassword, cfg, m.logger)
// Initialize activity data for new user
now := time.Now()
// Store ghost user data
// Create XMPPUser instance with the correct ghost credentials and activity data
xmppUser := m.createXMPPUserWithActivity(mattermostUserID, displayName, ghostJID, ghostPassword, cfg, m.logger, now, true)
// Store ghost user data with activity tracking
ghostData := &GhostUserData{
MattermostUserID: mattermostUserID,
GhostJID: ghostJID,
GhostPassword: ghostPassword,
Created: m.getCurrentTimestamp(),
Created: now.Unix(),
LastActivity: now.Unix(),
LifecycleEnabled: true,
}
if err := m.storeGhostUserData(mattermostUserID, ghostData); err != nil {
@ -221,6 +271,9 @@ func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (mod
return nil, fmt.Errorf("failed to connect newly created ghost user: %w", err)
}
// Cache the connected user to prevent connection leaks
m.cacheUser(mattermostUserID, xmppUser)
m.logger.LogInfo("Ghost user created and connected successfully", "mattermost_user_id", mattermostUserID, "ghost_jid", ghostJID)
return xmppUser, nil
}
@ -234,6 +287,15 @@ func (m *UserManager) DeleteUser(mattermostUserID string) error {
return fmt.Errorf("ghost user not found for Mattermost user %s", mattermostUserID)
}
// Disconnect and remove from cache if user is currently active
if cachedUser, found := m.getCachedUser(mattermostUserID); found {
if err := cachedUser.Disconnect(); err != nil {
m.logger.LogWarn("Failed to disconnect cached user during deletion", "mattermost_user_id", mattermostUserID, "error", err)
}
m.removeCachedUser(mattermostUserID)
m.logger.LogDebug("Disconnected and removed user from cache during deletion", "mattermost_user_id", mattermostUserID)
}
// Clean up ghost user account if cleanup is enabled
m.configMu.RLock()
shouldCleanup := m.config.IsGhostUserCleanupEnabled()
@ -301,9 +363,16 @@ func (m *UserManager) Start(ctx context.Context) error {
// Continue starting other users even if one fails
} else {
startedCount++
// Enable lifecycle management for ghost users
if xmppUser, ok := user.(*User); ok {
xmppUser.SetLifecycleManagement(true)
}
}
}
// Start the lifecycle management goroutine
go m.lifecycleManager()
m.logger.LogInfo("XMPP ghost user manager started", "bridge_type", m.bridgeType, "user_count", startedCount)
return nil
}
@ -312,19 +381,43 @@ func (m *UserManager) Start(ctx context.Context) error {
func (m *UserManager) Stop() error {
m.logger.LogDebug("Stopping XMPP ghost user manager", "bridge_type", m.bridgeType)
// Cancel context to stop background goroutines
if m.cancel != nil {
m.cancel()
}
// Get all users from KV store and stop them
users := m.ListUsers()
for _, user := range users {
// Gracefully shutdown all cached connections first (much faster than ListUsers)
cachedUsers := m.getCachedUsers()
disconnectedCount := 0
for _, user := range cachedUsers {
if err := user.Stop(); err != nil {
m.logger.LogWarn("Error stopping ghost user during manager shutdown", "bridge_type", m.bridgeType, "user_id", user.GetID(), "error", err)
m.logger.LogWarn("Error stopping cached ghost user during manager shutdown", "bridge_type", m.bridgeType, "user_id", user.GetID(), "error", err)
} else {
disconnectedCount++
}
}
m.logger.LogInfo("XMPP ghost user manager stopped", "bridge_type", m.bridgeType)
// Clear the entire cache
m.activeUsersMu.Lock()
m.activeUsers = make(map[string]*User)
m.activeUsersMu.Unlock()
// Also check for any users not in cache and stop them (fallback)
users := m.ListUsers()
fallbackStoppedCount := 0
for _, user := range users {
if err := user.Stop(); err != nil {
m.logger.LogWarn("Error stopping ghost user during manager shutdown", "bridge_type", m.bridgeType, "user_id", user.GetID(), "error", err)
} else {
fallbackStoppedCount++
}
}
m.logger.LogInfo("XMPP ghost user manager stopped",
"bridge_type", m.bridgeType,
"cached_users_stopped", disconnectedCount,
"fallback_users_stopped", fallbackStoppedCount)
return nil
}
@ -376,6 +469,30 @@ func (m *UserManager) removeGhostUserData(mattermostUserID string) error {
return m.kvstore.Delete(key)
}
// UpdateUserActivity updates both the in-memory and persisted activity timestamp for a user
func (m *UserManager) UpdateUserActivity(mattermostUserID string) error {
// Load existing ghost user data
ghostData, err := m.loadGhostUserData(mattermostUserID)
if err != nil {
return fmt.Errorf("failed to load ghost user data for activity update: %w", err)
}
// Update the activity timestamp
now := time.Now()
ghostData.LastActivity = now.Unix()
// Store the updated data
if err := m.storeGhostUserData(mattermostUserID, ghostData); err != nil {
return fmt.Errorf("failed to persist activity update: %w", err)
}
m.logger.LogDebug("Updated user activity in KV store",
"user_id", mattermostUserID,
"timestamp", now)
return nil
}
func (m *UserManager) cleanupGhostUser(mattermostUserID string) error {
ghostData, err := m.loadGhostUserData(mattermostUserID)
if err != nil {
@ -396,7 +513,10 @@ func (m *UserManager) cleanupGhostUser(mattermostUserID string) error {
}
// Unregister the ghost user account via XEP-0077
response, err := regHandler.CancelRegistration(ghostJIDParsed.Domain())
cancellationRequest := &xmppClient.CancellationRequest{
Username: ghostJIDParsed.Localpart(), // Extract username from ghost JID
}
response, err := regHandler.CancelRegistration(ghostJIDParsed.Domain(), cancellationRequest)
if err != nil {
return fmt.Errorf("failed to cancel registration for ghost user %s: %w", ghostData.GhostJID, err)
}
@ -446,6 +566,168 @@ func generateSecurePassword() string {
return "temp_secure_password_123"
}
// lifecycleManager runs periodically to check for inactive ghost users and disconnect them
func (m *UserManager) lifecycleManager() {
m.logger.LogDebug("Starting lifecycle manager for ghost user cleanup")
ticker := time.NewTicker(ghostUserActivityCheckInterval)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
m.logger.LogDebug("Lifecycle manager stopped due to context cancellation")
return
case <-ticker.C:
m.checkAndDisconnectInactiveUsers()
}
}
}
// checkAndDisconnectInactiveUsers checks all cached users for inactivity and disconnects inactive ghost users
func (m *UserManager) checkAndDisconnectInactiveUsers() {
m.logger.LogDebug("Checking cached users for inactivity")
// Get all currently cached users (this is much more efficient than ListUsers)
cachedUsers := m.getCachedUsers()
inactiveCount := 0
disconnectedCount := 0
for _, xmppUser := range cachedUsers {
// Only check users that have lifecycle management enabled
if !xmppUser.IsLifecycleManaged() {
continue
}
// Check if user is connected and inactive
if xmppUser.IsConnected() && xmppUser.IsInactive(ghostUserInactivityTimeout) {
inactiveCount++
lastActivity := xmppUser.GetLastActivity()
inactiveDuration := time.Since(lastActivity)
m.logger.LogInfo("Disconnecting inactive ghost user",
"user_id", xmppUser.GetID(),
"jid", xmppUser.GetJID(),
"last_activity", lastActivity,
"inactive_duration", inactiveDuration)
// Gracefully disconnect the inactive user
if err := xmppUser.Disconnect(); err != nil {
m.logger.LogWarn("Failed to disconnect inactive ghost user",
"user_id", xmppUser.GetID(),
"jid", xmppUser.GetJID(),
"error", err)
} else {
disconnectedCount++
// Remove disconnected user from cache to free memory
m.removeCachedUser(xmppUser.GetID())
m.logger.LogDebug("Successfully disconnected and removed inactive ghost user from cache",
"user_id", xmppUser.GetID(),
"jid", xmppUser.GetJID())
}
}
}
if inactiveCount > 0 {
m.logger.LogInfo("Completed inactive user cleanup cycle",
"cached_users_checked", len(cachedUsers),
"inactive_users_found", inactiveCount,
"users_disconnected", disconnectedCount)
} else {
m.logger.LogDebug("No inactive users found during cleanup cycle", "cached_users_checked", len(cachedUsers))
}
}
// createXMPPUserWithActivity creates an XMPP user with node-specific resource and activity data
func (m *UserManager) createXMPPUserWithActivity(id, displayName, userJID, password string, cfg *config.Configuration, log logger.Logger, lastActivity time.Time, enableLifecycle bool) *User {
// Generate node-specific resource to prevent conflicts in HA environments
baseResource := cfg.GetXMPPResource()
nodeSpecificResource := fmt.Sprintf("%s-node-%s", baseResource, m.nodeID[:8])
m.logger.LogDebug("Creating XMPP user with node-specific resource",
"user_id", id,
"base_resource", baseResource,
"node_resource", nodeSpecificResource,
"node_id", m.nodeID[:8])
// Create TLS config based on certificate verification setting
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify, //nolint:gosec // Allow insecure TLS for testing environments
}
// Create XMPP client for this user with provided credentials and node-specific resource
client := xmppClient.NewClientWithTLS(
cfg.XMPPServerURL,
userJID,
password, // Use the provided password (ghost password)
nodeSpecificResource, // Use node-specific resource instead of base resource
id, // Use user ID as remote ID
tlsConfig,
log,
)
ctx, cancel := context.WithCancel(context.Background())
return &User{
id: id,
displayName: displayName,
jid: userJID,
client: client,
state: model.UserStateOffline,
config: cfg,
ctx: ctx,
cancel: cancel,
logger: log,
lastActivity: lastActivity, // Use provided activity time
enableLifecycleCheck: enableLifecycle, // Use provided lifecycle setting
}
}
// Cache management methods for connection caching
// getCachedUser retrieves a user from the connection cache
func (m *UserManager) getCachedUser(mattermostUserID string) (*User, bool) {
m.activeUsersMu.RLock()
defer m.activeUsersMu.RUnlock()
user, exists := m.activeUsers[mattermostUserID]
return user, exists
}
// cacheUser stores a user in the connection cache
func (m *UserManager) cacheUser(mattermostUserID string, user *User) {
m.activeUsersMu.Lock()
defer m.activeUsersMu.Unlock()
m.activeUsers[mattermostUserID] = user
m.logger.LogDebug("Cached user connection",
"user_id", mattermostUserID,
"ghost_jid", user.GetJID(),
"cache_size", len(m.activeUsers))
}
// removeCachedUser removes a user from the connection cache
func (m *UserManager) removeCachedUser(mattermostUserID string) {
m.activeUsersMu.Lock()
defer m.activeUsersMu.Unlock()
if user, exists := m.activeUsers[mattermostUserID]; exists {
delete(m.activeUsers, mattermostUserID)
m.logger.LogDebug("Removed user from cache",
"user_id", mattermostUserID,
"ghost_jid", user.GetJID(),
"cache_size", len(m.activeUsers))
}
}
// getCachedUsers returns all cached users (for lifecycle management)
func (m *UserManager) getCachedUsers() []*User {
m.activeUsersMu.RLock()
defer m.activeUsersMu.RUnlock()
users := make([]*User, 0, len(m.activeUsers))
for _, user := range m.activeUsers {
users = append(users, user)
}
return users
}
func (m *UserManager) getCurrentTimestamp() int64 {
// TODO: Use proper time source (time.Now().Unix())
return 0

View file

@ -182,11 +182,6 @@ func (c *Client) GetInBandRegistration() (*InBandRegistration, error) {
return c.XEPFeatures.InBandRegistration, nil
}
// DetectServerCapabilities discovers which XEPs are supported by the server (public method)
func (c *Client) DetectServerCapabilities() error {
return c.detectServerCapabilities()
}
// detectServerCapabilities discovers which XEPs are supported by the server
func (c *Client) detectServerCapabilities() error {
if c.session == nil {
@ -360,6 +355,13 @@ func (c *Client) Connect() error {
return fmt.Errorf("failed to start session serving")
}
c.logger.LogInfo("XMPP client connected successfully", "jid", c.jidAddr.String())
// Automatically detect server capabilities after successful connection
if err := c.detectServerCapabilities(); err != nil {
c.logger.LogError("Failed to detect server capabilities automatically", "error", err)
// Don't fail the connection for capability detection issues
}
return nil
case <-time.After(10 * time.Second):
return fmt.Errorf("timeout waiting for session to be ready")

View file

@ -2,6 +2,7 @@
package xmpp
import (
"bytes"
"context"
"encoding/xml"
"fmt"
@ -25,8 +26,8 @@ type InBandRegistration struct {
enabled bool
}
// RegistrationQuery represents the <query xmlns='jabber:iq:register'> element
type RegistrationQuery struct {
// InBandRegistrationQuery represents the <query xmlns='jabber:iq:register'> element
type InBandRegistrationQuery struct {
XMLName xml.Name `xml:"jabber:iq:register query"`
Instructions string `xml:"instructions,omitempty"`
Username string `xml:"username,omitempty"`
@ -65,8 +66,13 @@ type RegistrationRequest struct {
AdditionalFields map[string]string `json:"additional_fields,omitempty"`
}
// RegistrationResponse represents the result of a registration operation
type RegistrationResponse struct {
// CancellationRequest represents a request to cancel/remove a user registration
type CancellationRequest struct {
Username string `json:"username"`
}
// InBandRegistrationResponse represents the result of any XEP-0077 In-Band Registration operation
type InBandRegistrationResponse struct {
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Message string `json:"message,omitempty"`
@ -114,7 +120,7 @@ func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*Registra
To: serverJID,
}
query := RegistrationQuery{}
query := InBandRegistrationQuery{}
ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second)
defer cancel()
@ -138,7 +144,7 @@ func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*Registra
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"`
Query InBandRegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
@ -162,13 +168,13 @@ func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*Registra
}
// RegisterAccount registers a new account with the server
func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *RegistrationRequest) (*RegistrationResponse, error) {
func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *RegistrationRequest) (*InBandRegistrationResponse, error) {
if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established")
}
if request.Username == "" || request.Password == "" {
return &RegistrationResponse{
return &InBandRegistrationResponse{
Success: false,
Error: "username and password are required",
}, nil
@ -180,7 +186,7 @@ func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *Registr
To: serverJID,
}
query := RegistrationQuery{
query := InBandRegistrationQuery{
Username: request.Username,
Password: request.Password,
Email: request.Email,
@ -207,57 +213,66 @@ func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *Registr
r.logger.LogInfo("Registering new account", "server", serverJID.String(), "username", request.Username)
// Create response channels
responseChannel := make(chan *RegistrationResponse, 1)
// Store response handler temporarily
go func() {
// This is a simplified approach - in practice you'd want proper IQ response handling
response := &RegistrationResponse{
Success: true,
Message: "Account registered successfully",
}
responseChannel <- response
}()
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
// Create a buffer to encode the query payload
var queryBuf bytes.Buffer
encoder := xml.NewEncoder(&queryBuf)
if err := encoder.Encode(query); err != nil {
return &InBandRegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to encode registration query: %v", err),
}, nil
}
encoder.Flush()
// Encode and send the registration IQ
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil {
return &RegistrationResponse{
// Create TokenReader from the encoded query by using xml.NewDecoder
payloadReader := xml.NewDecoder(bytes.NewReader(queryBuf.Bytes()))
// Send the registration IQ and wait for response
response, err := r.client.session.SendIQElement(ctx, payloadReader, iq)
if err != nil {
return &InBandRegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to send registration request: %v", err),
}, nil
}
// Wait for response
select {
case response := <-responseChannel:
r.logger.LogInfo("Account registration completed", "server", serverJID.String(), "username", request.Username, "success", response.Success)
return response, nil
case <-ctx.Done():
return &RegistrationResponse{
Success: false,
Error: fmt.Sprintf("timeout registering account with %s", serverJID.String()),
}, nil
// Try to unmarshal the response as an error IQ first
responseIQ, err := stanza.UnmarshalIQError(response, xml.StartElement{})
registrationResponse := &InBandRegistrationResponse{}
response.Close()
if err != nil {
// If we can't parse the response, treat it as a failure and log the parse error
registrationResponse.Success = false
registrationResponse.Error = "Failed to parse server response for registration request"
r.logger.LogWarn("Registration response could not be parsed, treating as failure",
"server", serverJID.String(),
"username", request.Username,
"parse_error", err.Error())
} else {
// Successfully unmarshaled - check IQ type
if responseIQ.Type == stanza.ErrorIQ {
registrationResponse.Success = false
registrationResponse.Error = "Server returned error for registration request"
r.logger.LogWarn("Registration failed with server error", "server", serverJID.String(), "username", request.Username, "iq_type", responseIQ.Type)
} else {
registrationResponse.Success = true
registrationResponse.Message = "Account registration completed successfully"
}
}
r.logger.LogInfo("Account registration completed", "server", serverJID.String(), "username", request.Username, "success", registrationResponse.Success)
return registrationResponse, nil
}
// ChangePassword changes the password for an existing account
func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPassword, newPassword string) (*RegistrationResponse, error) {
func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPassword, newPassword string) (*InBandRegistrationResponse, error) {
if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established")
}
if username == "" || oldPassword == "" || newPassword == "" {
return &RegistrationResponse{
return &InBandRegistrationResponse{
Success: false,
Error: "username, old password, and new password are required",
}, nil
@ -269,7 +284,7 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass
To: serverJID,
}
query := RegistrationQuery{
query := InBandRegistrationQuery{
Username: username,
Password: newPassword,
}
@ -282,7 +297,7 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"`
Query InBandRegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
@ -290,14 +305,14 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass
// Send the password change IQ
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil {
return &RegistrationResponse{
return &InBandRegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to send password change request: %v", err),
}, nil
}
// In practice, you'd wait for the IQ response here
response := &RegistrationResponse{
response := &InBandRegistrationResponse{
Success: true,
Message: "Password changed successfully",
}
@ -306,50 +321,83 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass
return response, nil
}
// CancelRegistration cancels/removes an existing registration
func (r *InBandRegistration) CancelRegistration(serverJID jid.JID) (*RegistrationResponse, error) {
// CancelRegistration cancels/removes an existing registration for the specified user
func (r *InBandRegistration) CancelRegistration(serverJID jid.JID, request *CancellationRequest) (*InBandRegistrationResponse, error) {
if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established")
}
if request.Username == "" {
return &InBandRegistrationResponse{
Success: false,
Error: "username is required",
}, nil
}
// Create cancellation IQ
iq := stanza.IQ{
Type: stanza.SetIQ,
To: serverJID,
}
query := RegistrationQuery{
Remove: &struct{}{}, // Empty struct indicates removal
query := InBandRegistrationQuery{
Username: request.Username, // Specify which user to remove
Remove: &struct{}{}, // Removal flag
}
ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second)
defer cancel()
r.logger.LogInfo("Cancelling registration", "server", serverJID.String())
r.logger.LogInfo("Cancelling registration", "server", serverJID.String(), "username", request.Username)
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
// Create a buffer to encode the query payload
var queryBuf bytes.Buffer
encoder := xml.NewEncoder(&queryBuf)
if err := encoder.Encode(query); err != nil {
return &InBandRegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to encode cancellation query: %v", err),
}, nil
}
encoder.Flush()
// Send the cancellation IQ
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil {
return &RegistrationResponse{
// Create TokenReader from the encoded query
payloadReader := xml.NewDecoder(bytes.NewReader(queryBuf.Bytes()))
// Send the cancellation IQ and wait for response
response, err := r.client.session.SendIQElement(ctx, payloadReader, iq)
if err != nil {
return &InBandRegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to send registration cancellation request: %v", err),
}, nil
}
// In practice, you'd wait for the IQ response here
response := &RegistrationResponse{
Success: true,
Message: "Registration cancelled successfully",
// Try to unmarshal the response as an error IQ first
responseIQ, err := stanza.UnmarshalIQError(response, xml.StartElement{})
cancellationResponse := &InBandRegistrationResponse{}
response.Close()
if err != nil {
// If we can't parse the response, treat it as a failure and log the parse error
cancellationResponse.Success = false
cancellationResponse.Error = "Failed to parse server response for cancellation request"
r.logger.LogWarn("Cancellation response could not be parsed, treating as failure",
"server", serverJID.String(),
"username", request.Username,
"parse_error", err.Error())
} else {
// Successfully unmarshaled - check IQ type
if responseIQ.Type == stanza.ErrorIQ {
cancellationResponse.Success = false
cancellationResponse.Error = "Server returned error for cancellation request"
r.logger.LogWarn("Registration cancellation failed with server error", "server", serverJID.String(), "username", request.Username, "iq_type", responseIQ.Type)
} else {
cancellationResponse.Success = true
cancellationResponse.Message = "Registration cancelled successfully"
}
}
r.logger.LogInfo("Registration cancellation completed", "server", serverJID.String())
return response, nil
r.logger.LogInfo("Registration cancellation completed", "server", serverJID.String(), "username", request.Username, "success", cancellationResponse.Success)
return cancellationResponse, nil
}

View file

@ -14,43 +14,7 @@ services:
- "7777:7777" # File transfer proxy
volumes:
- openfire_data:/var/lib/openfire
depends_on:
- postgres
networks:
- openfire-network
postgres:
image: postgres:15
container_name: openfire-postgres
restart: unless-stopped
environment:
- POSTGRES_DB=openfire
- POSTGRES_USER=openfire
- POSTGRES_PASSWORD=openfire123
volumes:
- postgres_data:/var/lib/postgresql/data
# ports:
# - "5432:5432" # Exposed for development access
networks:
- openfire-network
adminer:
image: adminer:latest
container_name: openfire-adminer
restart: unless-stopped
# ports:
# - "8080:8080" # Database management interface
depends_on:
- postgres
networks:
- openfire-network
volumes:
openfire_data:
driver: local
postgres_data:
driver: local
networks:
openfire-network:
driver: bridge