diff --git a/README.md b/README.md index 9e24249..072b947 100644 --- a/README.md +++ b/README.md @@ -9,8 +9,8 @@ This plugin provides bidirectional message synchronization between Mattermost an - Bidirectional message synchronization (Mattermost ↔ XMPP) - XMPP Multi-User Chat (MUC) support -- Configurable username prefixes for XMPP users in Mattermost -- Ghost user management for cross-platform user representation +- Configurable username prefixes for XMPP users +- Ghost user management for cross-platform user representation on the XMPP server with connection lifecycle management (**XEP-0077 only**) - Comprehensive XMPP client with SASL Plain authentication To learn more about plugins, see [our plugin documentation](https://developers.mattermost.com/extend/plugins/). @@ -72,7 +72,7 @@ To avoid having to manually install your plugin, build and deploy your plugin us } ``` -### Development guidance +### Development guidance 1. Fewer packages is better: default to the main package unless there's good reason for a new package. @@ -84,7 +84,7 @@ To avoid having to manually install your plugin, build and deploy your plugin us ### Modifying the server boilerplate -The server code comes with some boilerplate for creating an api, using slash commands, accessing the kvstore and using the cluster package for jobs. +The server code comes with some boilerplate for creating an api, using slash commands, accessing the kvstore and using the cluster package for jobs. #### Api diff --git a/cmd/xmpp-client-doctor/main.go b/cmd/xmpp-client-doctor/main.go index 35a0228..3b1c26f 100644 --- a/cmd/xmpp-client-doctor/main.go +++ b/cmd/xmpp-client-doctor/main.go @@ -4,18 +4,20 @@ import ( "crypto/tls" "flag" "fmt" - "log" + "log/slog" "os" "time" + "github.com/lmittmann/tint" + "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp" ) const ( // Default values for development server (sidecar) defaultServer = "localhost:5222" - defaultUsername = "testuser@localhost" - defaultPassword = "testpass" + defaultUsername = "admin@localhost" + defaultPassword = "admin" defaultResource = "doctor" defaultTestRoom = "test1@conference.localhost" ) @@ -46,8 +48,8 @@ func main() { flag.BoolVar(&config.TestMUC, "test-muc", true, "Enable MUC room testing (join/wait/leave)") flag.BoolVar(&config.TestDirectMessage, "test-dm", true, "Enable direct message testing (send message to admin user)") flag.BoolVar(&config.TestRoomExists, "test-room-exists", true, "Enable room existence testing using disco#info") - flag.BoolVar(&config.TestXEP0077, "test-xep0077", true, "Enable XEP-0077 In-Band Registration testing (required if enabled)") - flag.BoolVar(&config.Verbose, "verbose", true, "Enable verbose logging") + flag.BoolVar(&config.TestXEP0077, "test-xep0077", true, "Enable XEP-0077 In-Band Registration testing with ghost user message test") + flag.BoolVar(&config.Verbose, "verbose", false, "Enable verbose logging") flag.BoolVar(&config.InsecureSkipVerify, "insecure-skip-verify", true, "Skip TLS certificate verification (for development)") flag.Usage = func() { @@ -71,66 +73,44 @@ func main() { flag.Parse() - if config.Verbose { - log.SetFlags(log.LstdFlags | log.Lmicroseconds) - log.Printf("Starting XMPP client doctor...") - log.Printf("Configuration:") - log.Printf(" Server: %s", config.Server) - log.Printf(" Username: %s", config.Username) - log.Printf(" Resource: %s", config.Resource) - log.Printf(" Password: %s", maskPassword(config.Password)) - if config.TestMUC { - log.Printf(" Test Room: %s", config.TestRoom) - } - if config.TestDirectMessage { - log.Printf(" Test Direct Messages: enabled") - } - if config.TestRoomExists { - log.Printf(" Test Room Existence: enabled") - } - if config.TestXEP0077 { - log.Printf(" Test XEP-0077 In-Band Registration: enabled") - } - } + // Create the main logger + mainLogger := NewStructuredLogger(config.Verbose) + + mainLogger.LogInfo("Starting XMPP client doctor") + mainLogger.LogInfo("Configuration", + "server", config.Server, + "username", config.Username, + "resource", config.Resource, + "password", maskPassword(config.Password), + "test_room", config.TestRoom, + "test_muc", config.TestMUC, + "test_direct_message", config.TestDirectMessage, + "test_room_exists", config.TestRoomExists, + "test_xep0077", config.TestXEP0077) // Test the XMPP client - if err := testXMPPClient(config); err != nil { - log.Fatalf("❌ XMPP client test failed: %v", err) + if err := testXMPPClient(config, mainLogger); err != nil { + mainLogger.LogError("XMPP client test failed", "error", err) + os.Exit(1) } - if config.Verbose { - log.Printf("✅ XMPP client test completed successfully!") - } else { - fmt.Println("✅ XMPP client connectivity test passed!") - if config.TestXEP0077 { - fmt.Println("✅ XMPP XEP-0077 In-Band Registration test passed!") - } - if config.TestMUC { - fmt.Println("✅ XMPP MUC operations test passed!") - } - if config.TestDirectMessage { - fmt.Println("✅ XMPP direct message test passed!") - } - if config.TestRoomExists { - fmt.Println("✅ XMPP room existence test passed!") - } - } + mainLogger.LogInfo("XMPP client test completed successfully", + "xep0077_test", config.TestXEP0077, + "muc_test", config.TestMUC, + "direct_message_test", config.TestDirectMessage, + "room_exists_test", config.TestRoomExists) } -func testXMPPClient(config *Config) error { - if config.Verbose { - log.Printf("Creating XMPP client...") - } +func testXMPPClient(config *Config, logger *StructuredLogger) error { + logger.LogDebug("Creating XMPP client") - // Create a simple logger for the XMPP client - doctorLogger := &SimpleLogger{verbose: config.Verbose} + // Create a structured logger for the XMPP client (reuse the passed logger) + doctorLogger := logger // Create XMPP client with optional TLS configuration var client *xmpp.Client if config.InsecureSkipVerify { - if config.Verbose { - log.Printf("Using insecure TLS configuration (skipping certificate verification)") - } + logger.LogDebug("Using insecure TLS configuration", "skip_verify", true) tlsConfig := &tls.Config{ InsecureSkipVerify: true, //nolint:gosec // This is a testing tool for development environments } @@ -154,9 +134,7 @@ func testXMPPClient(config *Config) error { ) } - if config.Verbose { - log.Printf("Attempting to connect to XMPP server...") - } + logger.LogDebug("Attempting to connect to XMPP server", "server", config.Server) // Test connection start := time.Now() @@ -166,10 +144,8 @@ func testXMPPClient(config *Config) error { } connectDuration := time.Since(start) - if config.Verbose { - log.Printf("✅ Connected to XMPP server in %v", connectDuration) - log.Printf("Testing connection health...") - } + logger.LogInfo("Connected to XMPP server", "duration", connectDuration) + logger.LogDebug("Testing connection health") // Test connection health start = time.Now() @@ -179,9 +155,7 @@ func testXMPPClient(config *Config) error { } pingDuration := time.Since(start) - if config.Verbose { - log.Printf("✅ Connection health test passed in %v", pingDuration) - } + logger.LogInfo("Connection health test passed", "duration", pingDuration) var xep0077Duration time.Duration var mucDuration time.Duration @@ -191,7 +165,7 @@ func testXMPPClient(config *Config) error { // Test XEP-0077 In-Band Registration if requested if config.TestXEP0077 { start = time.Now() - err = testXEP0077(client, config) + err = testXEP0077(client, config, logger) if err != nil { return fmt.Errorf("XEP-0077 In-Band Registration test failed: %w", err) } @@ -201,7 +175,7 @@ func testXMPPClient(config *Config) error { // Test MUC operations if requested if config.TestMUC { start = time.Now() - err = testMUCOperations(client, config) + err = testMUCOperations(client, config, logger) if err != nil { return fmt.Errorf("MUC operations test failed: %w", err) } @@ -211,7 +185,7 @@ func testXMPPClient(config *Config) error { // Test direct message if requested if config.TestDirectMessage { start = time.Now() - err = testDirectMessage(client, config) + err = testDirectMessage(client, config, logger) if err != nil { return fmt.Errorf("direct message test failed: %w", err) } @@ -221,16 +195,14 @@ func testXMPPClient(config *Config) error { // Test room existence if requested if config.TestRoomExists { start = time.Now() - err = testRoomExists(client, config) + err = testRoomExists(client, config, logger) if err != nil { return fmt.Errorf("room existence test failed: %w", err) } roomExistsDuration = time.Since(start) } - if config.Verbose { - log.Printf("Disconnecting from XMPP server...") - } + logger.LogDebug("Disconnecting from XMPP server") // Disconnect start = time.Now() @@ -240,48 +212,37 @@ func testXMPPClient(config *Config) error { } disconnectDuration := time.Since(start) - if config.Verbose { - log.Printf("✅ Disconnected from XMPP server in %v", disconnectDuration) - log.Printf("Connection summary:") - log.Printf(" Connect time: %v", connectDuration) - log.Printf(" Ping time: %v", pingDuration) - if config.TestXEP0077 { - log.Printf(" XEP-0077 test time: %v", xep0077Duration) - } - if config.TestMUC { - log.Printf(" MUC operations time: %v", mucDuration) - } - if config.TestDirectMessage { - log.Printf(" Direct message time: %v", dmDuration) - } - if config.TestRoomExists { - log.Printf(" Room existence check time: %v", roomExistsDuration) - } - log.Printf(" Disconnect time: %v", disconnectDuration) - totalTime := connectDuration + pingDuration + disconnectDuration - if config.TestXEP0077 { - totalTime += xep0077Duration - } - if config.TestMUC { - totalTime += mucDuration - } - if config.TestDirectMessage { - totalTime += dmDuration - } - if config.TestRoomExists { - totalTime += roomExistsDuration - } - log.Printf(" Total time: %v", totalTime) + totalTime := connectDuration + pingDuration + disconnectDuration + if config.TestXEP0077 { + totalTime += xep0077Duration } + if config.TestMUC { + totalTime += mucDuration + } + if config.TestDirectMessage { + totalTime += dmDuration + } + if config.TestRoomExists { + totalTime += roomExistsDuration + } + + logger.LogInfo("Disconnected from XMPP server", "disconnect_duration", disconnectDuration) + logger.LogInfo("Connection summary", + "connect_time", connectDuration, + "ping_time", pingDuration, + "xep0077_time", xep0077Duration, + "muc_time", mucDuration, + "direct_message_time", dmDuration, + "room_exists_time", roomExistsDuration, + "disconnect_time", disconnectDuration, + "total_time", totalTime) return nil } -func testMUCOperations(client *xmpp.Client, config *Config) error { - if config.Verbose { - log.Printf("Testing MUC operations with room: %s", config.TestRoom) - log.Printf("First checking if room exists...") - } +func testMUCOperations(client *xmpp.Client, config *Config, logger *StructuredLogger) error { + logger.LogInfo("Testing MUC operations", "room", config.TestRoom) + logger.LogDebug("Checking if room exists first") // Check if room exists before attempting to join start := time.Now() @@ -291,18 +252,13 @@ func testMUCOperations(client *xmpp.Client, config *Config) error { } checkDuration := time.Since(start) - if config.Verbose { - log.Printf("✅ Room existence check completed in %v", checkDuration) - log.Printf("Room %s exists: %t", config.TestRoom, exists) - } + logger.LogInfo("Room existence check completed", "duration", checkDuration, "room", config.TestRoom, "exists", exists) if !exists { return fmt.Errorf("cannot test MUC operations: room %s does not exist or is not accessible", config.TestRoom) } - if config.Verbose { - log.Printf("Room exists, proceeding to join...") - } + logger.LogDebug("Room exists, proceeding to join") // Test joining the room start = time.Now() @@ -314,10 +270,8 @@ func testMUCOperations(client *xmpp.Client, config *Config) error { var sendDuration time.Duration - if config.Verbose { - log.Printf("✅ Successfully joined MUC room in %v", joinDuration) - log.Printf("Sending test message to room...") - } + logger.LogInfo("Successfully joined MUC room", "duration", joinDuration) + logger.LogDebug("Sending test message to room") // Send a test message testMessage := fmt.Sprintf("Test message from XMPP doctor at %s", time.Now().Format("15:04:05")) @@ -333,18 +287,13 @@ func testMUCOperations(client *xmpp.Client, config *Config) error { } sendDuration = time.Since(start) - if config.Verbose { - log.Printf("✅ Successfully sent message in %v", sendDuration) - log.Printf("Message: %s", testMessage) - log.Printf("Waiting 5 seconds in the room...") - } + logger.LogInfo("Successfully sent message", "duration", sendDuration, "message", testMessage) + logger.LogDebug("Waiting 5 seconds in the room") // Wait 5 seconds time.Sleep(5 * time.Second) - if config.Verbose { - log.Printf("Attempting to leave MUC room...") - } + logger.LogDebug("Attempting to leave MUC room") // Test leaving the room start = time.Now() @@ -354,25 +303,21 @@ func testMUCOperations(client *xmpp.Client, config *Config) error { } leaveDuration := time.Since(start) - if config.Verbose { - log.Printf("✅ Successfully left MUC room in %v", leaveDuration) - log.Printf("MUC operations summary:") - log.Printf(" Room existence check time: %v", checkDuration) - log.Printf(" Join time: %v", joinDuration) - log.Printf(" Send message time: %v", sendDuration) - log.Printf(" Wait time: 5s") - log.Printf(" Leave time: %v", leaveDuration) - log.Printf(" Total MUC time: %v", checkDuration+joinDuration+sendDuration+5*time.Second+leaveDuration) - } + logger.LogInfo("Successfully left MUC room", "duration", leaveDuration) + logger.LogInfo("MUC operations summary", + "room_check_time", checkDuration, + "join_time", joinDuration, + "send_time", sendDuration, + "wait_time", "5s", + "leave_time", leaveDuration, + "total_time", checkDuration+joinDuration+sendDuration+5*time.Second+leaveDuration) return nil } -func testDirectMessage(client *xmpp.Client, config *Config) error { - if config.Verbose { - log.Printf("Testing direct message functionality...") - log.Printf("Sending test message to admin user...") - } +func testDirectMessage(client *xmpp.Client, config *Config, logger *StructuredLogger) error { + logger.LogInfo("Testing direct message functionality") + logger.LogDebug("Sending test message to admin user") // Send a test message to the admin user testMessage := fmt.Sprintf("Test direct message from XMPP doctor at %s", time.Now().Format("15:04:05")) @@ -385,22 +330,18 @@ func testDirectMessage(client *xmpp.Client, config *Config) error { } sendDuration := time.Since(start) - if config.Verbose { - log.Printf("✅ Successfully sent direct message in %v", sendDuration) - log.Printf("Message: %s", testMessage) - log.Printf("Recipient: %s", adminJID) - log.Printf("Direct message test summary:") - log.Printf(" Send message time: %v", sendDuration) - } + logger.LogInfo("Successfully sent direct message", + "duration", sendDuration, + "message", testMessage, + "recipient", adminJID) + logger.LogInfo("Direct message test summary", "send_time", sendDuration) return nil } -func testRoomExists(client *xmpp.Client, config *Config) error { - if config.Verbose { - log.Printf("Testing room existence functionality...") - log.Printf("Checking if test room exists: %s", config.TestRoom) - } +func testRoomExists(client *xmpp.Client, config *Config, logger *StructuredLogger) error { + logger.LogInfo("Testing room existence functionality") + logger.LogDebug("Checking if test room exists", "room", config.TestRoom) // Test room existence check start := time.Now() @@ -410,16 +351,11 @@ func testRoomExists(client *xmpp.Client, config *Config) error { } checkDuration := time.Since(start) - if config.Verbose { - log.Printf("✅ Room existence check completed in %v", checkDuration) - log.Printf("Room %s exists: %t", config.TestRoom, exists) - } + logger.LogInfo("Room existence check completed", "duration", checkDuration, "room", config.TestRoom, "exists", exists) // Test with a non-existent room to verify negative case nonExistentRoom := "nonexistent-room-12345@conference.localhost" - if config.Verbose { - log.Printf("Testing negative case with non-existent room: %s", nonExistentRoom) - } + logger.LogDebug("Testing negative case with non-existent room", "room", nonExistentRoom) start = time.Now() existsNegative, err := client.CheckRoomExists(nonExistentRoom) @@ -428,14 +364,15 @@ func testRoomExists(client *xmpp.Client, config *Config) error { } checkNegativeDuration := time.Since(start) - if config.Verbose { - log.Printf("✅ Negative room existence check completed in %v", checkNegativeDuration) - log.Printf("Non-existent room %s exists: %t (should be false)", nonExistentRoom, existsNegative) - log.Printf("Room existence test summary:") - log.Printf(" Test room check time: %v", checkDuration) - log.Printf(" Negative case check time: %v", checkNegativeDuration) - log.Printf(" Total room existence test time: %v", checkDuration+checkNegativeDuration) - } + logger.LogInfo("Negative room existence check completed", + "duration", checkNegativeDuration, + "room", nonExistentRoom, + "exists", existsNegative, + "expected_false", true) + logger.LogInfo("Room existence test summary", + "test_room_time", checkDuration, + "negative_case_time", checkNegativeDuration, + "total_time", checkDuration+checkNegativeDuration) return nil } @@ -447,44 +384,60 @@ func maskPassword(password string) string { return password[:2] + "****" } -// SimpleLogger provides basic logging functionality for the doctor command -type SimpleLogger struct { +// StructuredLogger provides structured logging functionality for the doctor command using slog +type StructuredLogger struct { + logger *slog.Logger verbose bool } -// LogDebug logs debug messages if verbose mode is enabled -func (l *SimpleLogger) LogDebug(msg string, args ...interface{}) { - if l.verbose { - log.Printf("[DEBUG] "+msg, args...) +// NewStructuredLogger creates a new structured logger with colorized output +func NewStructuredLogger(verbose bool) *StructuredLogger { + // Configure log level based on verbose flag + level := slog.LevelInfo + if verbose { + level = slog.LevelDebug + } + + // Create tinted handler for colorized output + handler := tint.NewHandler(os.Stdout, &tint.Options{ + Level: level, + TimeFormat: "15:04:05.000", // More concise time format + AddSource: false, // Don't show source file info + }) + + // Create logger with the handler + logger := slog.New(handler) + + return &StructuredLogger{ + logger: logger, + verbose: verbose, } } -// LogInfo logs info messages -func (l *SimpleLogger) LogInfo(msg string, args ...interface{}) { - log.Printf("[INFO] "+msg, args...) +// LogDebug logs debug messages with structured key-value pairs +func (l *StructuredLogger) LogDebug(msg string, keyValuePairs ...any) { + l.logger.Debug(msg, keyValuePairs...) } -// LogWarn logs warning messages -func (l *SimpleLogger) LogWarn(msg string, args ...interface{}) { - log.Printf("[WARN] "+msg, args...) +// LogInfo logs info messages with structured key-value pairs +func (l *StructuredLogger) LogInfo(msg string, keyValuePairs ...any) { + l.logger.Info(msg, keyValuePairs...) } -// LogError logs error messages -func (l *SimpleLogger) LogError(msg string, args ...interface{}) { - log.Printf("[ERROR] "+msg, args...) +// LogWarn logs warning messages with structured key-value pairs +func (l *StructuredLogger) LogWarn(msg string, keyValuePairs ...any) { + l.logger.Warn(msg, keyValuePairs...) } -// testXEP0077 tests XEP-0077 In-Band Registration functionality by creating and deleting a test user -func testXEP0077(client *xmpp.Client, config *Config) error { - if config.Verbose { - log.Printf("Testing XEP-0077 In-Band Registration functionality...") - } +// LogError logs error messages with structured key-value pairs +func (l *StructuredLogger) LogError(msg string, keyValuePairs ...any) { + l.logger.Error(msg, keyValuePairs...) +} - // First, wait for server capability detection to complete - // This is handled asynchronously in the client Connect method - time.Sleep(2 * time.Second) +// testXEP0077 tests XEP-0077 In-Band Registration by creating a ghost user and testing message sending +func testXEP0077(client *xmpp.Client, config *Config, logger *StructuredLogger) error { + logger.LogInfo("Testing XEP-0077 In-Band Registration with ghost user messaging") - // Check if server supports XEP-0077 inBandReg, err := client.GetInBandRegistration() if err != nil { return fmt.Errorf("server does not support XEP-0077 In-Band Registration: %w", err) @@ -494,99 +447,95 @@ func testXEP0077(client *xmpp.Client, config *Config) error { return fmt.Errorf("XEP-0077 In-Band Registration is not enabled on this server") } - if config.Verbose { - log.Printf("✅ Server supports XEP-0077 In-Band Registration") - } - serverJID := client.GetJID().Domain() - // Step 1: Test registration fields discovery - start := time.Now() - if config.Verbose { - log.Printf("Testing registration fields discovery for server: %s", serverJID.String()) + // Step 1: Create ghost user with admin client + ghostUsername := fmt.Sprintf("ghost_test_%d", time.Now().Unix()) + ghostPassword := "testpass123" + ghostJID := fmt.Sprintf("%s@%s", ghostUsername, serverJID.String()) + + logger.LogInfo("Creating ghost user", "username", ghostUsername) + + ghostRegistrationRequest := &xmpp.RegistrationRequest{ + Username: ghostUsername, + Password: ghostPassword, + Email: fmt.Sprintf("%s@localhost", ghostUsername), } - fields, err := inBandReg.GetRegistrationFields(serverJID) + ghostRegResponse, err := inBandReg.RegisterAccount(serverJID, ghostRegistrationRequest) if err != nil { - return fmt.Errorf("failed to get registration fields from server: %w", err) + return fmt.Errorf("failed to register ghost user: %w", err) } - fieldsDuration := time.Since(start) - - if config.Verbose { - log.Printf("✅ Registration fields discovery completed in %v", fieldsDuration) - log.Printf("Registration fields: required=%v, available=%d", fields.Required, len(fields.Fields)) + if !ghostRegResponse.Success { + return fmt.Errorf("ghost user registration failed: %s", ghostRegResponse.Error) } - // Step 2: Create test user - testUsername := fmt.Sprintf("xmpptest%d", time.Now().Unix()) - testPassword := "testpass123" - testEmail := fmt.Sprintf("%s@localhost", testUsername) + logger.LogInfo("Ghost user created successfully", "username", ghostUsername) - if config.Verbose { - log.Printf("Creating test user: %s", testUsername) - } - - registrationRequest := &xmpp.RegistrationRequest{ - Username: testUsername, - Password: testPassword, - Email: testEmail, - } - - start = time.Now() - regResponse, err := inBandReg.RegisterAccount(serverJID, registrationRequest) - if err != nil { - return fmt.Errorf("failed to register test user '%s': %w", testUsername, err) - } - registerDuration := time.Since(start) - - if !regResponse.Success { - return fmt.Errorf("user registration failed: %s", regResponse.Error) - } - - if config.Verbose { - log.Printf("✅ Test user '%s' registered successfully in %v", testUsername, registerDuration) - log.Printf("Registration response: %s", regResponse.Message) - } - - // Step 3: Delete test user (cleanup) - if config.Verbose { - log.Printf("Cleaning up: removing test user '%s'", testUsername) - } - - start = time.Now() - cancelResponse, err := inBandReg.CancelRegistration(serverJID) - if err != nil { - if config.Verbose { - log.Printf("⚠️ Failed to remove test user '%s': %v", testUsername, err) - log.Printf("⚠️ Manual cleanup may be required") - } + // Step 2-7: Use ghost client for all operations + var ghostClient *xmpp.Client + if config.InsecureSkipVerify { + tlsConfig := &tls.Config{InsecureSkipVerify: true} //nolint:gosec // Testing tool + ghostClient = xmpp.NewClientWithTLS(config.Server, ghostJID, ghostPassword, "ghost_doctor", "ghost-remote-id", tlsConfig, logger) } else { - cancelDuration := time.Since(start) - if cancelResponse.Success { - if config.Verbose { - log.Printf("✅ Test user '%s' removed successfully in %v", testUsername, cancelDuration) - } - } else { - if config.Verbose { - log.Printf("⚠️ User removal may have failed: %s", cancelResponse.Error) - } - } + ghostClient = xmpp.NewClient(config.Server, ghostJID, ghostPassword, "ghost_doctor", "ghost-remote-id", logger) } - if config.Verbose { - log.Printf("XEP-0077 test summary:") - log.Printf(" Server support check: ✅") - log.Printf(" Registration fields discovery time: %v", fieldsDuration) - log.Printf(" User registration time: %v", registerDuration) - log.Printf(" Test username: %s", testUsername) - log.Printf(" Required fields count: %d", len(fields.Required)) - log.Printf(" User creation: ✅") - if err == nil && cancelResponse.Success { - log.Printf(" User cleanup: ✅") - } else { - log.Printf(" User cleanup: ⚠️") - } + // Step 2: Connect ghost client + if err := ghostClient.Connect(); err != nil { + return fmt.Errorf("failed to connect ghost user: %w", err) + } + logger.LogInfo("Ghost user connected") + + // Step 3: Check test room exists + exists, err := ghostClient.CheckRoomExists(config.TestRoom) + if err != nil { + return fmt.Errorf("failed to check room existence: %w", err) + } + if !exists { + return fmt.Errorf("test room %s does not exist", config.TestRoom) } + // Step 4: Join test room + if err := ghostClient.JoinRoom(config.TestRoom); err != nil { + return fmt.Errorf("failed to join room: %w", err) + } + logger.LogInfo("Ghost user joined room", "room", config.TestRoom) + + // Step 5: Send message to test room + testMessage := fmt.Sprintf("Test ghost user message from %s at %s", ghostUsername, time.Now().Format("15:04:05")) + messageReq := xmpp.MessageRequest{ + RoomJID: config.TestRoom, + GhostUserJID: ghostJID, + Message: testMessage, + } + + if _, err := ghostClient.SendMessage(&messageReq); err != nil { + return fmt.Errorf("failed to send message: %w", err) + } + logger.LogInfo("Ghost user sent message", "message", testMessage) + + // Step 6: Cancel account (ghost user cancels their own registration) + ghostInBandReg, err := ghostClient.GetInBandRegistration() + if err != nil { + return fmt.Errorf("failed to get XEP-0077 handler for ghost user: %w", err) + } + + ghostCancellationRequest := &xmpp.CancellationRequest{Username: ghostUsername} + ghostCancelResponse, err := ghostInBandReg.CancelRegistration(serverJID, ghostCancellationRequest) + if err != nil { + return fmt.Errorf("failed to cancel ghost user registration: %w", err) + } + if !ghostCancelResponse.Success { + return fmt.Errorf("ghost user registration cancellation failed: %s", ghostCancelResponse.Error) + } + logger.LogInfo("Ghost user registration cancelled successfully") + + // Clean disconnect + if err := ghostClient.Disconnect(); err != nil { + logger.LogWarn("Failed to disconnect ghost client", "error", err) + } + + logger.LogInfo("XEP-0077 ghost user test completed successfully", "username", ghostUsername) return nil } diff --git a/go.mod b/go.mod index b44aed8..58ad65f 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24.3 require ( github.com/gorilla/mux v1.8.1 github.com/jellydator/ttlcache/v3 v3.4.0 + github.com/lmittmann/tint v1.1.2 github.com/mattermost/mattermost/server/public v0.1.10 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.10.0 diff --git a/go.sum b/go.sum index 8aa93af..96f05a2 100644 --- a/go.sum +++ b/go.sum @@ -425,6 +425,8 @@ github.com/leonklingele/grouper v1.1.2 h1:o1ARBDLOmmasUaNDesWqWCIFH3u7hoFlM84Yrj github.com/leonklingele/grouper v1.1.2/go.mod h1:6D0M/HVkhs2yRKRFZUoGjeDy7EZTfFBE9gl4kjmIGkA= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lmittmann/tint v1.1.2 h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w= +github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/lufeee/execinquery v1.2.1 h1:hf0Ems4SHcUGBxpGN7Jz78z1ppVkP/837ZlETPCEtOM= github.com/lufeee/execinquery v1.2.1/go.mod h1:EC7DrEKView09ocscGHC+apXMIaorh4xqSxS/dy8SbM= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= diff --git a/plugin.json b/plugin.json index b6dce3d..dada123 100644 --- a/plugin.json +++ b/plugin.json @@ -17,9 +17,6 @@ }, "executable": "" }, - "webapp": { - "bundle_path": "webapp/dist/main.js" - }, "settings_schema": { "header": "Configure the XMPP bridge connection settings below.", "footer": "For more information about setting up the XMPP bridge, see the [documentation](https://github.com/mattermost/mattermost-plugin-bridge-xmpp/blob/main/README.md).", @@ -65,12 +62,42 @@ "secret": false }, { - "key": "XMPPUsernamePrefix", - "display_name": "XMPP Username Prefix", + "key": "EnableXMPPGhostUsers", + "display_name": "Enable XMPP Ghost Users", + "type": "bool", + "help_text": "When enabled, individual XMPP accounts will be created for each Mattermost user using XEP-0077 In-Band Registration. If disabled or unsupported, the bridge user will be used for all communications.", + "placeholder": "", + "default": false, + "hosting": "", + "secret": false + }, + { + "key": "XMPPGhostUserPrefix", + "display_name": "XMPP Ghost User Prefix", "type": "text", - "help_text": "Prefix for XMPP users in Mattermost (e.g., 'xmpp' creates usernames like 'xmpp:user@domain')", - "placeholder": "xmpp", - "default": "xmpp", + "help_text": "Prefix for ghost user accounts created on the XMPP server (e.g., 'mm_' creates users like 'mm_john@xmpp.example.com'). Required when ghost users are enabled.", + "placeholder": "mm_", + "default": null, + "hosting": "", + "secret": false + }, + { + "key": "XMPPGhostUserDomain", + "display_name": "XMPP Ghost User Domain", + "type": "text", + "help_text": "Domain for ghost user accounts on the XMPP server. If not specified, uses the domain from the XMPP Server URL.", + "placeholder": "xmpp.example.com", + "default": null, + "hosting": "", + "secret": false + }, + { + "key": "XMPPGhostUserCleanup", + "display_name": "Enable Ghost User Cleanup", + "type": "bool", + "help_text": "When enabled, ghost user accounts will be automatically removed from the XMPP server when Mattermost users are deleted. Disable to preserve accounts.", + "placeholder": "", + "default": true, "hosting": "", "secret": false }, @@ -102,4 +129,4 @@ "version": "v0.1.4" } } -} \ No newline at end of file +} diff --git a/server/bridge/xmpp/bridge.go b/server/bridge/xmpp/bridge.go index 1cba16f..3ec5139 100644 --- a/server/bridge/xmpp/bridge.go +++ b/server/bridge/xmpp/bridge.go @@ -134,20 +134,6 @@ func (b *xmppBridge) createUserManager(cfg *config.Configuration, bridgeID strin return NewXMPPUserManager(bridgeID, log, store, b.api, cfg, b.bridgeClient) } -// waitForCapabilityDetection waits for server capability detection to complete -func (b *xmppBridge) waitForCapabilityDetection() error { - if b.bridgeClient == nil { - return fmt.Errorf("bridge client not available") - } - - // Trigger capability detection synchronously - if err := b.bridgeClient.DetectServerCapabilities(); err != nil { - return fmt.Errorf("failed to detect server capabilities: %w", err) - } - - return nil -} - // checkXEP0077Support checks if the XMPP server supports XEP-0077 In-Band Registration func (b *xmppBridge) checkXEP0077Support() (bool, error) { if b.bridgeClient == nil { @@ -237,14 +223,14 @@ func (b *xmppBridge) Start() error { return fmt.Errorf("failed to connect to XMPP server: %w", err) } - // Wait for server capability detection to complete before creating user manager - if err := b.waitForCapabilityDetection(); err != nil { - return fmt.Errorf("failed to detect server capabilities: %w", err) - } - // Initialize proper user manager now that we're connected and server capabilities are detected b.userManager = b.createUserManager(cfg, b.bridgeID, b.logger, b.kvstore) + // Start the user manager to enable lifecycle management + if err := b.userManager.Start(b.ctx); err != nil { + return fmt.Errorf("failed to start user manager: %w", err) + } + // Load and join mapped channels if err := b.loadAndJoinMappedChannels(); err != nil { b.logger.LogWarn("Failed to join some mapped channels", "error", err) @@ -265,6 +251,11 @@ func (b *xmppBridge) Stop() error { b.cancel() } + // Stop the user manager to stop lifecycle management + if err := b.userManager.Stop(); err != nil { + b.logger.LogWarn("Error stopping user manager", "error", err) + } + if b.bridgeClient != nil { if err := b.bridgeClient.Disconnect(); err != nil { b.logger.LogWarn("Error disconnecting from XMPP server", "error", err) diff --git a/server/bridge/xmpp/message_handler.go b/server/bridge/xmpp/message_handler.go index 79fcc43..ec38b80 100644 --- a/server/bridge/xmpp/message_handler.go +++ b/server/bridge/xmpp/message_handler.go @@ -123,6 +123,12 @@ func (h *xmppMessageHandler) sendMessageViaGhostUser(xmppUserManager *UserManage return h.sendMessageViaBridgeUser(msg, roomJID) } + // Update user activity in KV store after successful message send + if err := xmppUserManager.UpdateUserActivity(msg.SourceUserID); err != nil { + h.logger.LogError("Failed to update user activity after message send, user may never disconnect", "user_id", msg.SourceUserID, "error", err) + // Don't fail the message send for activity update failures + } + h.logger.LogDebug("Message sent via ghost user", "source_user_id", msg.SourceUserID, "ghost_jid", xmppUser.GetJID(), diff --git a/server/bridge/xmpp/user.go b/server/bridge/xmpp/user.go index a76a676..b7c4eef 100644 --- a/server/bridge/xmpp/user.go +++ b/server/bridge/xmpp/user.go @@ -14,6 +14,14 @@ import ( xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp" ) +const ( + // ghostUserInactivityTimeout is the duration after which inactive ghost users are disconnected + ghostUserInactivityTimeout = 2 * time.Minute + + // ghostUserActivityCheckInterval is how often we check for inactive ghost users + ghostUserActivityCheckInterval = 1 * time.Minute +) + // User represents an XMPP user that implements the BridgeUser interface type User struct { // User identity @@ -29,6 +37,11 @@ type User struct { stateMu sync.RWMutex connected atomic.Bool + // Activity tracking for lifecycle management + lastActivity time.Time + activityMu sync.RWMutex + enableLifecycleCheck bool // Whether this user should be subject to inactivity disconnection + // Configuration config *config.Configuration @@ -42,6 +55,11 @@ type User struct { // NewXMPPUser creates a new XMPP user with specific credentials func NewXMPPUser(id, displayName, jid, password string, cfg *config.Configuration, log logger.Logger) *User { + return NewXMPPUserWithActivity(id, displayName, jid, password, cfg, log, time.Now(), false) +} + +// NewXMPPUserWithActivity creates a new XMPP user with specific credentials, last activity time, and lifecycle setting +func NewXMPPUserWithActivity(id, displayName, jid, password string, cfg *config.Configuration, log logger.Logger, lastActivity time.Time, enableLifecycle bool) *User { ctx, cancel := context.WithCancel(context.Background()) // Create TLS config based on certificate verification setting @@ -61,15 +79,17 @@ func NewXMPPUser(id, displayName, jid, password string, cfg *config.Configuratio ) return &User{ - id: id, - displayName: displayName, - jid: jid, - client: client, - state: model.UserStateOffline, - config: cfg, - ctx: ctx, - cancel: cancel, - logger: log, + id: id, + displayName: displayName, + jid: jid, + client: client, + state: model.UserStateOffline, + config: cfg, + ctx: ctx, + cancel: cancel, + logger: log, + lastActivity: lastActivity, // Use provided activity time + enableLifecycleCheck: enableLifecycle, // Use provided lifecycle setting } } @@ -164,6 +184,9 @@ func (u *User) SendMessageToChannel(channelID, message string) error { u.logger.LogDebug("XMPP user sending message to channel", "user_id", u.id, "channel_id", channelID) + // Update activity timestamp for this user interaction + u.UpdateLastActivity() + // Ensure we're joined to the room before sending the message if err := u.EnsureJoinedToRoom(channelID); err != nil { return fmt.Errorf("failed to ensure joined to room before sending message: %w", err) @@ -368,46 +391,41 @@ func (u *User) GetClient() *xmppClient.Client { return u.client } -// UpdateCredentials updates the user's JID and password for ghost user mode -// This creates a new XMPP client with the updated credentials -func (u *User) UpdateCredentials(newJID, newPassword string) error { - u.logger.LogDebug("Updating XMPP user credentials", "user_id", u.id, "old_jid", u.jid, "new_jid", newJID) +// Activity tracking methods - // Disconnect existing client if connected - wasConnected := u.IsConnected() - if wasConnected { - if err := u.Disconnect(); err != nil { - u.logger.LogWarn("Error disconnecting before credential update", "user_id", u.id, "error", err) - } - } - - // Create TLS config based on certificate verification setting - tlsConfig := &tls.Config{ - InsecureSkipVerify: u.config.XMPPInsecureSkipVerify, //nolint:gosec // Allow insecure TLS for testing environments - } - - // Create new XMPP client with updated credentials - newClient := xmppClient.NewClientWithTLS( - u.config.XMPPServerURL, - newJID, - newPassword, - u.config.GetXMPPResource(), - u.id, // Use user ID as remote ID - tlsConfig, - u.logger, - ) - - // Update user fields - u.jid = newJID - u.client = newClient - - // Reconnect if we were previously connected - if wasConnected { - if err := u.Connect(); err != nil { - return fmt.Errorf("failed to reconnect after credential update: %w", err) - } - } - - u.logger.LogInfo("XMPP user credentials updated successfully", "user_id", u.id, "new_jid", newJID) - return nil +// UpdateLastActivity updates the last activity timestamp for this user +func (u *User) UpdateLastActivity() { + u.activityMu.Lock() + defer u.activityMu.Unlock() + u.lastActivity = time.Now() + u.logger.LogDebug("Updated last activity for user", "user_id", u.id, "timestamp", u.lastActivity) +} + +// GetLastActivity returns the last activity timestamp +func (u *User) GetLastActivity() time.Time { + u.activityMu.RLock() + defer u.activityMu.RUnlock() + return u.lastActivity +} + +// IsInactive returns true if the user has been inactive longer than the specified duration +func (u *User) IsInactive(inactivityThreshold time.Duration) bool { + u.activityMu.RLock() + defer u.activityMu.RUnlock() + return time.Since(u.lastActivity) > inactivityThreshold +} + +// SetLifecycleManagement enables or disables lifecycle management for this user +func (u *User) SetLifecycleManagement(enabled bool) { + u.activityMu.Lock() + defer u.activityMu.Unlock() + u.enableLifecycleCheck = enabled + u.logger.LogDebug("Lifecycle management setting changed", "user_id", u.id, "enabled", enabled) +} + +// IsLifecycleManaged returns true if this user is subject to lifecycle management +func (u *User) IsLifecycleManaged() bool { + u.activityMu.RLock() + defer u.activityMu.RUnlock() + return u.enableLifecycleCheck } diff --git a/server/bridge/xmpp/user_manager.go b/server/bridge/xmpp/user_manager.go index fe7bdd4..3f88dfa 100644 --- a/server/bridge/xmpp/user_manager.go +++ b/server/bridge/xmpp/user_manager.go @@ -3,9 +3,11 @@ package xmpp import ( "bytes" "context" + "crypto/tls" "encoding/json" "fmt" "sync" + "time" "github.com/mattermost/mattermost/server/public/plugin" "mellium.im/xmpp/jid" @@ -33,6 +35,8 @@ type GhostUserData struct { GhostJID string `json:"ghost_jid"` // XMPP JID of the ghost user GhostPassword string `json:"ghost_password"` // XMPP password for the ghost user Created int64 `json:"created"` // Timestamp when ghost was created + LastActivity int64 `json:"last_activity"` // Unix timestamp of last activity + LifecycleEnabled bool `json:"lifecycle_enabled"` // Whether this user should be subject to inactivity checking } // UserManager manages XMPP users using XEP-0077 ghost users ONLY @@ -47,11 +51,23 @@ type UserManager struct { bridgeClient *xmppClient.Client ctx context.Context cancel context.CancelFunc + + // Node identification for HA environments + nodeID string // Unique identifier for this Mattermost node (from api.GetDiagnosticId) + + // Connection caching to prevent connection leaks + activeUsers map[string]*User // Cache of connected users on THIS node + activeUsersMu sync.RWMutex // Protects activeUsers map } // NewXMPPUserManager creates a new XMPP-specific user manager for ghost users only func NewXMPPUserManager(bridgeType string, log logger.Logger, store kvstore.KVStore, api plugin.API, cfg *config.Configuration, bridgeClient *xmppClient.Client) model.BridgeUserManager { ctx, cancel := context.WithCancel(context.Background()) + + // Get unique node ID from Mattermost API for HA environments + nodeID := api.GetDiagnosticId() + log.LogDebug("Initializing XMPP user manager", "bridge_type", bridgeType, "node_id", nodeID[:8]) + return &UserManager{ bridgeType: bridgeType, logger: log, @@ -61,6 +77,8 @@ func NewXMPPUserManager(bridgeType string, log logger.Logger, store kvstore.KVSt bridgeClient: bridgeClient, ctx: ctx, cancel: cancel, + nodeID: nodeID, + activeUsers: make(map[string]*User), // Initialize connection cache } } @@ -153,34 +171,61 @@ func (m *UserManager) CreateUser(user model.BridgeUser) error { return nil } -// GetUser retrieves a user by Mattermost user ID, creating XMPPUser from ghost data +// GetUser retrieves a user by Mattermost user ID, checking cache first, then creating XMPPUser from ghost data func (m *UserManager) GetUser(mattermostUserID string) (model.BridgeUser, error) { + // First check the connection cache + if cachedUser, found := m.getCachedUser(mattermostUserID); found { + m.logger.LogDebug("Found user in connection cache", "user_id", mattermostUserID, "ghost_jid", cachedUser.GetJID()) + return cachedUser, nil + } + // Check if ghost user data exists ghostData, err := m.loadGhostUserData(mattermostUserID) if err != nil { return nil, fmt.Errorf("ghost user not found for Mattermost user %s: %w", mattermostUserID, err) } - // Create XMPPUser directly with ghost credentials + // Create XMPPUser directly with ghost credentials and activity data m.configMu.RLock() cfg := m.config m.configMu.RUnlock() - user := NewXMPPUser(mattermostUserID, mattermostUserID, ghostData.GhostJID, ghostData.GhostPassword, cfg, m.logger) + // Handle migration of existing users without activity data + lastActivity := time.Now() + if ghostData.LastActivity > 0 { + lastActivity = time.Unix(ghostData.LastActivity, 0) + } else { + // Update the KV store with the migration data + ghostData.LastActivity = lastActivity.Unix() + ghostData.LifecycleEnabled = true + _ = m.storeGhostUserData(mattermostUserID, ghostData) // Don't fail if storage update fails + } + + user := m.createXMPPUserWithActivity(mattermostUserID, mattermostUserID, ghostData.GhostJID, ghostData.GhostPassword, cfg, m.logger, lastActivity, ghostData.LifecycleEnabled) // Ensure the user is connected if err := m.ensureUserConnected(user, mattermostUserID); err != nil { return nil, fmt.Errorf("failed to ensure ghost user is connected: %w", err) } + // Cache the connected user to prevent connection leaks + m.cacheUser(mattermostUserID, user) + return user, nil } // GetOrCreateUser retrieves a user by Mattermost user ID, creating a new ghost user if it doesn't exist func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (model.BridgeUser, error) { - // Try to get existing user first + // First check the connection cache + if cachedUser, found := m.getCachedUser(mattermostUserID); found { + m.logger.LogDebug("Found user in connection cache", "user_id", mattermostUserID, "ghost_jid", cachedUser.GetJID()) + return cachedUser, nil + } + + // Try to get existing user from KV store user, err := m.GetUser(mattermostUserID) if err == nil { + // GetUser already cached the user, so just return it return user, nil } @@ -200,15 +245,20 @@ func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (mod return nil, fmt.Errorf("failed to register ghost user: %w", err) } - // Create XMPPUser instance with the correct ghost credentials - xmppUser := NewXMPPUser(mattermostUserID, displayName, ghostJID, ghostPassword, cfg, m.logger) + // Initialize activity data for new user + now := time.Now() - // Store ghost user data + // Create XMPPUser instance with the correct ghost credentials and activity data + xmppUser := m.createXMPPUserWithActivity(mattermostUserID, displayName, ghostJID, ghostPassword, cfg, m.logger, now, true) + + // Store ghost user data with activity tracking ghostData := &GhostUserData{ MattermostUserID: mattermostUserID, GhostJID: ghostJID, GhostPassword: ghostPassword, - Created: m.getCurrentTimestamp(), + Created: now.Unix(), + LastActivity: now.Unix(), + LifecycleEnabled: true, } if err := m.storeGhostUserData(mattermostUserID, ghostData); err != nil { @@ -221,6 +271,9 @@ func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (mod return nil, fmt.Errorf("failed to connect newly created ghost user: %w", err) } + // Cache the connected user to prevent connection leaks + m.cacheUser(mattermostUserID, xmppUser) + m.logger.LogInfo("Ghost user created and connected successfully", "mattermost_user_id", mattermostUserID, "ghost_jid", ghostJID) return xmppUser, nil } @@ -234,6 +287,15 @@ func (m *UserManager) DeleteUser(mattermostUserID string) error { return fmt.Errorf("ghost user not found for Mattermost user %s", mattermostUserID) } + // Disconnect and remove from cache if user is currently active + if cachedUser, found := m.getCachedUser(mattermostUserID); found { + if err := cachedUser.Disconnect(); err != nil { + m.logger.LogWarn("Failed to disconnect cached user during deletion", "mattermost_user_id", mattermostUserID, "error", err) + } + m.removeCachedUser(mattermostUserID) + m.logger.LogDebug("Disconnected and removed user from cache during deletion", "mattermost_user_id", mattermostUserID) + } + // Clean up ghost user account if cleanup is enabled m.configMu.RLock() shouldCleanup := m.config.IsGhostUserCleanupEnabled() @@ -301,9 +363,16 @@ func (m *UserManager) Start(ctx context.Context) error { // Continue starting other users even if one fails } else { startedCount++ + // Enable lifecycle management for ghost users + if xmppUser, ok := user.(*User); ok { + xmppUser.SetLifecycleManagement(true) + } } } + // Start the lifecycle management goroutine + go m.lifecycleManager() + m.logger.LogInfo("XMPP ghost user manager started", "bridge_type", m.bridgeType, "user_count", startedCount) return nil } @@ -312,19 +381,43 @@ func (m *UserManager) Start(ctx context.Context) error { func (m *UserManager) Stop() error { m.logger.LogDebug("Stopping XMPP ghost user manager", "bridge_type", m.bridgeType) + // Cancel context to stop background goroutines if m.cancel != nil { m.cancel() } - // Get all users from KV store and stop them - users := m.ListUsers() - for _, user := range users { + // Gracefully shutdown all cached connections first (much faster than ListUsers) + cachedUsers := m.getCachedUsers() + disconnectedCount := 0 + + for _, user := range cachedUsers { if err := user.Stop(); err != nil { - m.logger.LogWarn("Error stopping ghost user during manager shutdown", "bridge_type", m.bridgeType, "user_id", user.GetID(), "error", err) + m.logger.LogWarn("Error stopping cached ghost user during manager shutdown", "bridge_type", m.bridgeType, "user_id", user.GetID(), "error", err) + } else { + disconnectedCount++ } } - m.logger.LogInfo("XMPP ghost user manager stopped", "bridge_type", m.bridgeType) + // Clear the entire cache + m.activeUsersMu.Lock() + m.activeUsers = make(map[string]*User) + m.activeUsersMu.Unlock() + + // Also check for any users not in cache and stop them (fallback) + users := m.ListUsers() + fallbackStoppedCount := 0 + for _, user := range users { + if err := user.Stop(); err != nil { + m.logger.LogWarn("Error stopping ghost user during manager shutdown", "bridge_type", m.bridgeType, "user_id", user.GetID(), "error", err) + } else { + fallbackStoppedCount++ + } + } + + m.logger.LogInfo("XMPP ghost user manager stopped", + "bridge_type", m.bridgeType, + "cached_users_stopped", disconnectedCount, + "fallback_users_stopped", fallbackStoppedCount) return nil } @@ -376,6 +469,30 @@ func (m *UserManager) removeGhostUserData(mattermostUserID string) error { return m.kvstore.Delete(key) } +// UpdateUserActivity updates both the in-memory and persisted activity timestamp for a user +func (m *UserManager) UpdateUserActivity(mattermostUserID string) error { + // Load existing ghost user data + ghostData, err := m.loadGhostUserData(mattermostUserID) + if err != nil { + return fmt.Errorf("failed to load ghost user data for activity update: %w", err) + } + + // Update the activity timestamp + now := time.Now() + ghostData.LastActivity = now.Unix() + + // Store the updated data + if err := m.storeGhostUserData(mattermostUserID, ghostData); err != nil { + return fmt.Errorf("failed to persist activity update: %w", err) + } + + m.logger.LogDebug("Updated user activity in KV store", + "user_id", mattermostUserID, + "timestamp", now) + + return nil +} + func (m *UserManager) cleanupGhostUser(mattermostUserID string) error { ghostData, err := m.loadGhostUserData(mattermostUserID) if err != nil { @@ -396,7 +513,10 @@ func (m *UserManager) cleanupGhostUser(mattermostUserID string) error { } // Unregister the ghost user account via XEP-0077 - response, err := regHandler.CancelRegistration(ghostJIDParsed.Domain()) + cancellationRequest := &xmppClient.CancellationRequest{ + Username: ghostJIDParsed.Localpart(), // Extract username from ghost JID + } + response, err := regHandler.CancelRegistration(ghostJIDParsed.Domain(), cancellationRequest) if err != nil { return fmt.Errorf("failed to cancel registration for ghost user %s: %w", ghostData.GhostJID, err) } @@ -446,6 +566,168 @@ func generateSecurePassword() string { return "temp_secure_password_123" } +// lifecycleManager runs periodically to check for inactive ghost users and disconnect them +func (m *UserManager) lifecycleManager() { + m.logger.LogDebug("Starting lifecycle manager for ghost user cleanup") + + ticker := time.NewTicker(ghostUserActivityCheckInterval) + defer ticker.Stop() + + for { + select { + case <-m.ctx.Done(): + m.logger.LogDebug("Lifecycle manager stopped due to context cancellation") + return + case <-ticker.C: + m.checkAndDisconnectInactiveUsers() + } + } +} + +// checkAndDisconnectInactiveUsers checks all cached users for inactivity and disconnects inactive ghost users +func (m *UserManager) checkAndDisconnectInactiveUsers() { + m.logger.LogDebug("Checking cached users for inactivity") + + // Get all currently cached users (this is much more efficient than ListUsers) + cachedUsers := m.getCachedUsers() + inactiveCount := 0 + disconnectedCount := 0 + + for _, xmppUser := range cachedUsers { + // Only check users that have lifecycle management enabled + if !xmppUser.IsLifecycleManaged() { + continue + } + + // Check if user is connected and inactive + if xmppUser.IsConnected() && xmppUser.IsInactive(ghostUserInactivityTimeout) { + inactiveCount++ + lastActivity := xmppUser.GetLastActivity() + inactiveDuration := time.Since(lastActivity) + + m.logger.LogInfo("Disconnecting inactive ghost user", + "user_id", xmppUser.GetID(), + "jid", xmppUser.GetJID(), + "last_activity", lastActivity, + "inactive_duration", inactiveDuration) + + // Gracefully disconnect the inactive user + if err := xmppUser.Disconnect(); err != nil { + m.logger.LogWarn("Failed to disconnect inactive ghost user", + "user_id", xmppUser.GetID(), + "jid", xmppUser.GetJID(), + "error", err) + } else { + disconnectedCount++ + // Remove disconnected user from cache to free memory + m.removeCachedUser(xmppUser.GetID()) + m.logger.LogDebug("Successfully disconnected and removed inactive ghost user from cache", + "user_id", xmppUser.GetID(), + "jid", xmppUser.GetJID()) + } + } + } + + if inactiveCount > 0 { + m.logger.LogInfo("Completed inactive user cleanup cycle", + "cached_users_checked", len(cachedUsers), + "inactive_users_found", inactiveCount, + "users_disconnected", disconnectedCount) + } else { + m.logger.LogDebug("No inactive users found during cleanup cycle", "cached_users_checked", len(cachedUsers)) + } +} + +// createXMPPUserWithActivity creates an XMPP user with node-specific resource and activity data +func (m *UserManager) createXMPPUserWithActivity(id, displayName, userJID, password string, cfg *config.Configuration, log logger.Logger, lastActivity time.Time, enableLifecycle bool) *User { + // Generate node-specific resource to prevent conflicts in HA environments + baseResource := cfg.GetXMPPResource() + nodeSpecificResource := fmt.Sprintf("%s-node-%s", baseResource, m.nodeID[:8]) + + m.logger.LogDebug("Creating XMPP user with node-specific resource", + "user_id", id, + "base_resource", baseResource, + "node_resource", nodeSpecificResource, + "node_id", m.nodeID[:8]) + + // Create TLS config based on certificate verification setting + tlsConfig := &tls.Config{ + InsecureSkipVerify: cfg.XMPPInsecureSkipVerify, //nolint:gosec // Allow insecure TLS for testing environments + } + + // Create XMPP client for this user with provided credentials and node-specific resource + client := xmppClient.NewClientWithTLS( + cfg.XMPPServerURL, + userJID, + password, // Use the provided password (ghost password) + nodeSpecificResource, // Use node-specific resource instead of base resource + id, // Use user ID as remote ID + tlsConfig, + log, + ) + + ctx, cancel := context.WithCancel(context.Background()) + + return &User{ + id: id, + displayName: displayName, + jid: userJID, + client: client, + state: model.UserStateOffline, + config: cfg, + ctx: ctx, + cancel: cancel, + logger: log, + lastActivity: lastActivity, // Use provided activity time + enableLifecycleCheck: enableLifecycle, // Use provided lifecycle setting + } +} + +// Cache management methods for connection caching + +// getCachedUser retrieves a user from the connection cache +func (m *UserManager) getCachedUser(mattermostUserID string) (*User, bool) { + m.activeUsersMu.RLock() + defer m.activeUsersMu.RUnlock() + user, exists := m.activeUsers[mattermostUserID] + return user, exists +} + +// cacheUser stores a user in the connection cache +func (m *UserManager) cacheUser(mattermostUserID string, user *User) { + m.activeUsersMu.Lock() + defer m.activeUsersMu.Unlock() + m.activeUsers[mattermostUserID] = user + m.logger.LogDebug("Cached user connection", + "user_id", mattermostUserID, + "ghost_jid", user.GetJID(), + "cache_size", len(m.activeUsers)) +} + +// removeCachedUser removes a user from the connection cache +func (m *UserManager) removeCachedUser(mattermostUserID string) { + m.activeUsersMu.Lock() + defer m.activeUsersMu.Unlock() + if user, exists := m.activeUsers[mattermostUserID]; exists { + delete(m.activeUsers, mattermostUserID) + m.logger.LogDebug("Removed user from cache", + "user_id", mattermostUserID, + "ghost_jid", user.GetJID(), + "cache_size", len(m.activeUsers)) + } +} + +// getCachedUsers returns all cached users (for lifecycle management) +func (m *UserManager) getCachedUsers() []*User { + m.activeUsersMu.RLock() + defer m.activeUsersMu.RUnlock() + users := make([]*User, 0, len(m.activeUsers)) + for _, user := range m.activeUsers { + users = append(users, user) + } + return users +} + func (m *UserManager) getCurrentTimestamp() int64 { // TODO: Use proper time source (time.Now().Unix()) return 0 diff --git a/server/xmpp/client.go b/server/xmpp/client.go index d7cdc37..27ae214 100644 --- a/server/xmpp/client.go +++ b/server/xmpp/client.go @@ -182,11 +182,6 @@ func (c *Client) GetInBandRegistration() (*InBandRegistration, error) { return c.XEPFeatures.InBandRegistration, nil } -// DetectServerCapabilities discovers which XEPs are supported by the server (public method) -func (c *Client) DetectServerCapabilities() error { - return c.detectServerCapabilities() -} - // detectServerCapabilities discovers which XEPs are supported by the server func (c *Client) detectServerCapabilities() error { if c.session == nil { @@ -360,6 +355,13 @@ func (c *Client) Connect() error { return fmt.Errorf("failed to start session serving") } c.logger.LogInfo("XMPP client connected successfully", "jid", c.jidAddr.String()) + + // Automatically detect server capabilities after successful connection + if err := c.detectServerCapabilities(); err != nil { + c.logger.LogError("Failed to detect server capabilities automatically", "error", err) + // Don't fail the connection for capability detection issues + } + return nil case <-time.After(10 * time.Second): return fmt.Errorf("timeout waiting for session to be ready") diff --git a/server/xmpp/xep_0077.go b/server/xmpp/xep_0077.go index 9716fd7..bb02f31 100644 --- a/server/xmpp/xep_0077.go +++ b/server/xmpp/xep_0077.go @@ -2,6 +2,7 @@ package xmpp import ( + "bytes" "context" "encoding/xml" "fmt" @@ -25,8 +26,8 @@ type InBandRegistration struct { enabled bool } -// RegistrationQuery represents the element -type RegistrationQuery struct { +// InBandRegistrationQuery represents the element +type InBandRegistrationQuery struct { XMLName xml.Name `xml:"jabber:iq:register query"` Instructions string `xml:"instructions,omitempty"` Username string `xml:"username,omitempty"` @@ -65,8 +66,13 @@ type RegistrationRequest struct { AdditionalFields map[string]string `json:"additional_fields,omitempty"` } -// RegistrationResponse represents the result of a registration operation -type RegistrationResponse struct { +// CancellationRequest represents a request to cancel/remove a user registration +type CancellationRequest struct { + Username string `json:"username"` +} + +// InBandRegistrationResponse represents the result of any XEP-0077 In-Band Registration operation +type InBandRegistrationResponse struct { Success bool `json:"success"` Error string `json:"error,omitempty"` Message string `json:"message,omitempty"` @@ -114,7 +120,7 @@ func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*Registra To: serverJID, } - query := RegistrationQuery{} + query := InBandRegistrationQuery{} ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second) defer cancel() @@ -138,7 +144,7 @@ func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*Registra // Create the IQ with query payload iqWithQuery := struct { stanza.IQ - Query RegistrationQuery `xml:"jabber:iq:register query"` + Query InBandRegistrationQuery `xml:"jabber:iq:register query"` }{ IQ: iq, Query: query, @@ -162,13 +168,13 @@ func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*Registra } // RegisterAccount registers a new account with the server -func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *RegistrationRequest) (*RegistrationResponse, error) { +func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *RegistrationRequest) (*InBandRegistrationResponse, error) { if r.client.session == nil { return nil, fmt.Errorf("XMPP session not established") } if request.Username == "" || request.Password == "" { - return &RegistrationResponse{ + return &InBandRegistrationResponse{ Success: false, Error: "username and password are required", }, nil @@ -180,7 +186,7 @@ func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *Registr To: serverJID, } - query := RegistrationQuery{ + query := InBandRegistrationQuery{ Username: request.Username, Password: request.Password, Email: request.Email, @@ -207,57 +213,66 @@ func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *Registr r.logger.LogInfo("Registering new account", "server", serverJID.String(), "username", request.Username) - // Create response channels - responseChannel := make(chan *RegistrationResponse, 1) - - // Store response handler temporarily - go func() { - // This is a simplified approach - in practice you'd want proper IQ response handling - response := &RegistrationResponse{ - Success: true, - Message: "Account registered successfully", - } - responseChannel <- response - }() - - // Create the IQ with query payload - iqWithQuery := struct { - stanza.IQ - Query RegistrationQuery `xml:"jabber:iq:register query"` - }{ - IQ: iq, - Query: query, + // Create a buffer to encode the query payload + var queryBuf bytes.Buffer + encoder := xml.NewEncoder(&queryBuf) + if err := encoder.Encode(query); err != nil { + return &InBandRegistrationResponse{ + Success: false, + Error: fmt.Sprintf("failed to encode registration query: %v", err), + }, nil } + encoder.Flush() - // Encode and send the registration IQ - if err := r.client.session.Encode(ctx, iqWithQuery); err != nil { - return &RegistrationResponse{ + // Create TokenReader from the encoded query by using xml.NewDecoder + payloadReader := xml.NewDecoder(bytes.NewReader(queryBuf.Bytes())) + + // Send the registration IQ and wait for response + response, err := r.client.session.SendIQElement(ctx, payloadReader, iq) + if err != nil { + return &InBandRegistrationResponse{ Success: false, Error: fmt.Sprintf("failed to send registration request: %v", err), }, nil } - // Wait for response - select { - case response := <-responseChannel: - r.logger.LogInfo("Account registration completed", "server", serverJID.String(), "username", request.Username, "success", response.Success) - return response, nil - case <-ctx.Done(): - return &RegistrationResponse{ - Success: false, - Error: fmt.Sprintf("timeout registering account with %s", serverJID.String()), - }, nil + // Try to unmarshal the response as an error IQ first + responseIQ, err := stanza.UnmarshalIQError(response, xml.StartElement{}) + registrationResponse := &InBandRegistrationResponse{} + response.Close() + + if err != nil { + // If we can't parse the response, treat it as a failure and log the parse error + registrationResponse.Success = false + registrationResponse.Error = "Failed to parse server response for registration request" + r.logger.LogWarn("Registration response could not be parsed, treating as failure", + "server", serverJID.String(), + "username", request.Username, + "parse_error", err.Error()) + } else { + // Successfully unmarshaled - check IQ type + if responseIQ.Type == stanza.ErrorIQ { + registrationResponse.Success = false + registrationResponse.Error = "Server returned error for registration request" + r.logger.LogWarn("Registration failed with server error", "server", serverJID.String(), "username", request.Username, "iq_type", responseIQ.Type) + } else { + registrationResponse.Success = true + registrationResponse.Message = "Account registration completed successfully" + } } + + r.logger.LogInfo("Account registration completed", "server", serverJID.String(), "username", request.Username, "success", registrationResponse.Success) + return registrationResponse, nil } // ChangePassword changes the password for an existing account -func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPassword, newPassword string) (*RegistrationResponse, error) { +func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPassword, newPassword string) (*InBandRegistrationResponse, error) { if r.client.session == nil { return nil, fmt.Errorf("XMPP session not established") } if username == "" || oldPassword == "" || newPassword == "" { - return &RegistrationResponse{ + return &InBandRegistrationResponse{ Success: false, Error: "username, old password, and new password are required", }, nil @@ -269,7 +284,7 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass To: serverJID, } - query := RegistrationQuery{ + query := InBandRegistrationQuery{ Username: username, Password: newPassword, } @@ -282,7 +297,7 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass // Create the IQ with query payload iqWithQuery := struct { stanza.IQ - Query RegistrationQuery `xml:"jabber:iq:register query"` + Query InBandRegistrationQuery `xml:"jabber:iq:register query"` }{ IQ: iq, Query: query, @@ -290,14 +305,14 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass // Send the password change IQ if err := r.client.session.Encode(ctx, iqWithQuery); err != nil { - return &RegistrationResponse{ + return &InBandRegistrationResponse{ Success: false, Error: fmt.Sprintf("failed to send password change request: %v", err), }, nil } // In practice, you'd wait for the IQ response here - response := &RegistrationResponse{ + response := &InBandRegistrationResponse{ Success: true, Message: "Password changed successfully", } @@ -306,50 +321,83 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass return response, nil } -// CancelRegistration cancels/removes an existing registration -func (r *InBandRegistration) CancelRegistration(serverJID jid.JID) (*RegistrationResponse, error) { +// CancelRegistration cancels/removes an existing registration for the specified user +func (r *InBandRegistration) CancelRegistration(serverJID jid.JID, request *CancellationRequest) (*InBandRegistrationResponse, error) { if r.client.session == nil { return nil, fmt.Errorf("XMPP session not established") } + if request.Username == "" { + return &InBandRegistrationResponse{ + Success: false, + Error: "username is required", + }, nil + } + // Create cancellation IQ iq := stanza.IQ{ Type: stanza.SetIQ, To: serverJID, } - query := RegistrationQuery{ - Remove: &struct{}{}, // Empty struct indicates removal + query := InBandRegistrationQuery{ + Username: request.Username, // Specify which user to remove + Remove: &struct{}{}, // Removal flag } ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second) defer cancel() - r.logger.LogInfo("Cancelling registration", "server", serverJID.String()) + r.logger.LogInfo("Cancelling registration", "server", serverJID.String(), "username", request.Username) - // Create the IQ with query payload - iqWithQuery := struct { - stanza.IQ - Query RegistrationQuery `xml:"jabber:iq:register query"` - }{ - IQ: iq, - Query: query, + // Create a buffer to encode the query payload + var queryBuf bytes.Buffer + encoder := xml.NewEncoder(&queryBuf) + if err := encoder.Encode(query); err != nil { + return &InBandRegistrationResponse{ + Success: false, + Error: fmt.Sprintf("failed to encode cancellation query: %v", err), + }, nil } + encoder.Flush() - // Send the cancellation IQ - if err := r.client.session.Encode(ctx, iqWithQuery); err != nil { - return &RegistrationResponse{ + // Create TokenReader from the encoded query + payloadReader := xml.NewDecoder(bytes.NewReader(queryBuf.Bytes())) + + // Send the cancellation IQ and wait for response + response, err := r.client.session.SendIQElement(ctx, payloadReader, iq) + if err != nil { + return &InBandRegistrationResponse{ Success: false, Error: fmt.Sprintf("failed to send registration cancellation request: %v", err), }, nil } - // In practice, you'd wait for the IQ response here - response := &RegistrationResponse{ - Success: true, - Message: "Registration cancelled successfully", + // Try to unmarshal the response as an error IQ first + responseIQ, err := stanza.UnmarshalIQError(response, xml.StartElement{}) + cancellationResponse := &InBandRegistrationResponse{} + response.Close() + + if err != nil { + // If we can't parse the response, treat it as a failure and log the parse error + cancellationResponse.Success = false + cancellationResponse.Error = "Failed to parse server response for cancellation request" + r.logger.LogWarn("Cancellation response could not be parsed, treating as failure", + "server", serverJID.String(), + "username", request.Username, + "parse_error", err.Error()) + } else { + // Successfully unmarshaled - check IQ type + if responseIQ.Type == stanza.ErrorIQ { + cancellationResponse.Success = false + cancellationResponse.Error = "Server returned error for cancellation request" + r.logger.LogWarn("Registration cancellation failed with server error", "server", serverJID.String(), "username", request.Username, "iq_type", responseIQ.Type) + } else { + cancellationResponse.Success = true + cancellationResponse.Message = "Registration cancelled successfully" + } } - r.logger.LogInfo("Registration cancellation completed", "server", serverJID.String()) - return response, nil + r.logger.LogInfo("Registration cancellation completed", "server", serverJID.String(), "username", request.Username, "success", cancellationResponse.Success) + return cancellationResponse, nil } diff --git a/sidecar/docker-compose.yml b/sidecar/docker-compose.yml index b0f765b..68c218b 100644 --- a/sidecar/docker-compose.yml +++ b/sidecar/docker-compose.yml @@ -14,43 +14,7 @@ services: - "7777:7777" # File transfer proxy volumes: - openfire_data:/var/lib/openfire - depends_on: - - postgres - networks: - - openfire-network - - postgres: - image: postgres:15 - container_name: openfire-postgres - restart: unless-stopped - environment: - - POSTGRES_DB=openfire - - POSTGRES_USER=openfire - - POSTGRES_PASSWORD=openfire123 - volumes: - - postgres_data:/var/lib/postgresql/data - # ports: - # - "5432:5432" # Exposed for development access - networks: - - openfire-network - - adminer: - image: adminer:latest - container_name: openfire-adminer - restart: unless-stopped - # ports: - # - "8080:8080" # Database management interface - depends_on: - - postgres - networks: - - openfire-network volumes: openfire_data: driver: local - postgres_data: - driver: local - -networks: - openfire-network: - driver: bridge