Compare commits

..

8 commits

Author SHA1 Message Date
0442bd7b72
feat: implement connection caching for ghost user lifecycle management
Some checks failed
ci / plugin-ci (push) Has been cancelled
Implement comprehensive connection caching system to prevent XMPP connection leaks and support HA environments:

- Add node-specific XMPP resources using format "{baseResource}-node-{diagnosticId[:8]}" for HA compatibility
- Implement thread-safe connection cache with mutex protection in UserManager
- Add cache-first lookup in GetUser/GetOrCreateUser methods to prevent duplicate connections
- Update lifecycle manager to efficiently check cached users instead of expensive KV store queries
- Add graceful shutdown cleanup to properly disconnect all cached connections
- Implement cache management methods: getCachedUser, cacheUser, removeCachedUser, getCachedUsers
- Update activity tracking to work with cached connections
- Add proper cache cleanup when users are disconnected or deleted

This prevents connection leaks identified in previous implementation while maintaining efficient ghost user lifecycle management with 30-minute inactivity timeout.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-12 21:18:35 +02:00
22f8c97a25
feat: improve doctor command logging and automatic server capability detection
- Replace SimpleLogger with structured logging using slog and tint library
- Add colorized output with proper log levels (DEBUG/INFO/WARN/ERROR)
- Remove manual DetectServerCapabilities() calls from doctor command
- Server capabilities are now automatically detected on client connection
- Update default test credentials to admin@localhost/admin for consistency
- Improve testXEP0077 to use fail-fast approach with proper ghost user workflow
- Add structured logging throughout all test functions with timing information

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-12 19:08:49 +02:00
96d8b84dcb
fix: properly check responses for registration and cancellation requests 2025-08-12 18:56:06 +02:00
9bd0071b4a
chore: missing fields in plugin.json 2025-08-12 18:44:22 +02:00
b7fd8ddb54
chore: remove optional service in sidecar 2025-08-12 18:44:09 +02:00
109b491a0a
chore: disable webapp 2025-08-12 18:43:50 +02:00
b99b412692
fix: properly send inbandregistration requests 2025-08-12 18:43:21 +02:00
4e4a290813
chore: detect server capabilities on connect 2025-08-12 18:42:55 +02:00
12 changed files with 769 additions and 479 deletions

View file

@ -9,8 +9,8 @@ This plugin provides bidirectional message synchronization between Mattermost an
- Bidirectional message synchronization (Mattermost ↔ XMPP) - Bidirectional message synchronization (Mattermost ↔ XMPP)
- XMPP Multi-User Chat (MUC) support - XMPP Multi-User Chat (MUC) support
- Configurable username prefixes for XMPP users in Mattermost - Configurable username prefixes for XMPP users
- Ghost user management for cross-platform user representation - Ghost user management for cross-platform user representation on the XMPP server with connection lifecycle management (**XEP-0077 only**)
- Comprehensive XMPP client with SASL Plain authentication - Comprehensive XMPP client with SASL Plain authentication
To learn more about plugins, see [our plugin documentation](https://developers.mattermost.com/extend/plugins/). To learn more about plugins, see [our plugin documentation](https://developers.mattermost.com/extend/plugins/).
@ -72,7 +72,7 @@ To avoid having to manually install your plugin, build and deploy your plugin us
} }
``` ```
### Development guidance ### Development guidance
1. Fewer packages is better: default to the main package unless there's good reason for a new package. 1. Fewer packages is better: default to the main package unless there's good reason for a new package.
@ -84,7 +84,7 @@ To avoid having to manually install your plugin, build and deploy your plugin us
### Modifying the server boilerplate ### Modifying the server boilerplate
The server code comes with some boilerplate for creating an api, using slash commands, accessing the kvstore and using the cluster package for jobs. The server code comes with some boilerplate for creating an api, using slash commands, accessing the kvstore and using the cluster package for jobs.
#### Api #### Api

View file

@ -4,18 +4,20 @@ import (
"crypto/tls" "crypto/tls"
"flag" "flag"
"fmt" "fmt"
"log" "log/slog"
"os" "os"
"time" "time"
"github.com/lmittmann/tint"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
) )
const ( const (
// Default values for development server (sidecar) // Default values for development server (sidecar)
defaultServer = "localhost:5222" defaultServer = "localhost:5222"
defaultUsername = "testuser@localhost" defaultUsername = "admin@localhost"
defaultPassword = "testpass" defaultPassword = "admin"
defaultResource = "doctor" defaultResource = "doctor"
defaultTestRoom = "test1@conference.localhost" defaultTestRoom = "test1@conference.localhost"
) )
@ -46,8 +48,8 @@ func main() {
flag.BoolVar(&config.TestMUC, "test-muc", true, "Enable MUC room testing (join/wait/leave)") flag.BoolVar(&config.TestMUC, "test-muc", true, "Enable MUC room testing (join/wait/leave)")
flag.BoolVar(&config.TestDirectMessage, "test-dm", true, "Enable direct message testing (send message to admin user)") flag.BoolVar(&config.TestDirectMessage, "test-dm", true, "Enable direct message testing (send message to admin user)")
flag.BoolVar(&config.TestRoomExists, "test-room-exists", true, "Enable room existence testing using disco#info") flag.BoolVar(&config.TestRoomExists, "test-room-exists", true, "Enable room existence testing using disco#info")
flag.BoolVar(&config.TestXEP0077, "test-xep0077", true, "Enable XEP-0077 In-Band Registration testing (required if enabled)") flag.BoolVar(&config.TestXEP0077, "test-xep0077", true, "Enable XEP-0077 In-Band Registration testing with ghost user message test")
flag.BoolVar(&config.Verbose, "verbose", true, "Enable verbose logging") flag.BoolVar(&config.Verbose, "verbose", false, "Enable verbose logging")
flag.BoolVar(&config.InsecureSkipVerify, "insecure-skip-verify", true, "Skip TLS certificate verification (for development)") flag.BoolVar(&config.InsecureSkipVerify, "insecure-skip-verify", true, "Skip TLS certificate verification (for development)")
flag.Usage = func() { flag.Usage = func() {
@ -71,66 +73,44 @@ func main() {
flag.Parse() flag.Parse()
if config.Verbose { // Create the main logger
log.SetFlags(log.LstdFlags | log.Lmicroseconds) mainLogger := NewStructuredLogger(config.Verbose)
log.Printf("Starting XMPP client doctor...")
log.Printf("Configuration:") mainLogger.LogInfo("Starting XMPP client doctor")
log.Printf(" Server: %s", config.Server) mainLogger.LogInfo("Configuration",
log.Printf(" Username: %s", config.Username) "server", config.Server,
log.Printf(" Resource: %s", config.Resource) "username", config.Username,
log.Printf(" Password: %s", maskPassword(config.Password)) "resource", config.Resource,
if config.TestMUC { "password", maskPassword(config.Password),
log.Printf(" Test Room: %s", config.TestRoom) "test_room", config.TestRoom,
} "test_muc", config.TestMUC,
if config.TestDirectMessage { "test_direct_message", config.TestDirectMessage,
log.Printf(" Test Direct Messages: enabled") "test_room_exists", config.TestRoomExists,
} "test_xep0077", config.TestXEP0077)
if config.TestRoomExists {
log.Printf(" Test Room Existence: enabled")
}
if config.TestXEP0077 {
log.Printf(" Test XEP-0077 In-Band Registration: enabled")
}
}
// Test the XMPP client // Test the XMPP client
if err := testXMPPClient(config); err != nil { if err := testXMPPClient(config, mainLogger); err != nil {
log.Fatalf("❌ XMPP client test failed: %v", err) mainLogger.LogError("XMPP client test failed", "error", err)
os.Exit(1)
} }
if config.Verbose { mainLogger.LogInfo("XMPP client test completed successfully",
log.Printf("✅ XMPP client test completed successfully!") "xep0077_test", config.TestXEP0077,
} else { "muc_test", config.TestMUC,
fmt.Println("✅ XMPP client connectivity test passed!") "direct_message_test", config.TestDirectMessage,
if config.TestXEP0077 { "room_exists_test", config.TestRoomExists)
fmt.Println("✅ XMPP XEP-0077 In-Band Registration test passed!")
}
if config.TestMUC {
fmt.Println("✅ XMPP MUC operations test passed!")
}
if config.TestDirectMessage {
fmt.Println("✅ XMPP direct message test passed!")
}
if config.TestRoomExists {
fmt.Println("✅ XMPP room existence test passed!")
}
}
} }
func testXMPPClient(config *Config) error { func testXMPPClient(config *Config, logger *StructuredLogger) error {
if config.Verbose { logger.LogDebug("Creating XMPP client")
log.Printf("Creating XMPP client...")
}
// Create a simple logger for the XMPP client // Create a structured logger for the XMPP client (reuse the passed logger)
doctorLogger := &SimpleLogger{verbose: config.Verbose} doctorLogger := logger
// Create XMPP client with optional TLS configuration // Create XMPP client with optional TLS configuration
var client *xmpp.Client var client *xmpp.Client
if config.InsecureSkipVerify { if config.InsecureSkipVerify {
if config.Verbose { logger.LogDebug("Using insecure TLS configuration", "skip_verify", true)
log.Printf("Using insecure TLS configuration (skipping certificate verification)")
}
tlsConfig := &tls.Config{ tlsConfig := &tls.Config{
InsecureSkipVerify: true, //nolint:gosec // This is a testing tool for development environments InsecureSkipVerify: true, //nolint:gosec // This is a testing tool for development environments
} }
@ -154,9 +134,7 @@ func testXMPPClient(config *Config) error {
) )
} }
if config.Verbose { logger.LogDebug("Attempting to connect to XMPP server", "server", config.Server)
log.Printf("Attempting to connect to XMPP server...")
}
// Test connection // Test connection
start := time.Now() start := time.Now()
@ -166,10 +144,8 @@ func testXMPPClient(config *Config) error {
} }
connectDuration := time.Since(start) connectDuration := time.Since(start)
if config.Verbose { logger.LogInfo("Connected to XMPP server", "duration", connectDuration)
log.Printf("✅ Connected to XMPP server in %v", connectDuration) logger.LogDebug("Testing connection health")
log.Printf("Testing connection health...")
}
// Test connection health // Test connection health
start = time.Now() start = time.Now()
@ -179,9 +155,7 @@ func testXMPPClient(config *Config) error {
} }
pingDuration := time.Since(start) pingDuration := time.Since(start)
if config.Verbose { logger.LogInfo("Connection health test passed", "duration", pingDuration)
log.Printf("✅ Connection health test passed in %v", pingDuration)
}
var xep0077Duration time.Duration var xep0077Duration time.Duration
var mucDuration time.Duration var mucDuration time.Duration
@ -191,7 +165,7 @@ func testXMPPClient(config *Config) error {
// Test XEP-0077 In-Band Registration if requested // Test XEP-0077 In-Band Registration if requested
if config.TestXEP0077 { if config.TestXEP0077 {
start = time.Now() start = time.Now()
err = testXEP0077(client, config) err = testXEP0077(client, config, logger)
if err != nil { if err != nil {
return fmt.Errorf("XEP-0077 In-Band Registration test failed: %w", err) return fmt.Errorf("XEP-0077 In-Band Registration test failed: %w", err)
} }
@ -201,7 +175,7 @@ func testXMPPClient(config *Config) error {
// Test MUC operations if requested // Test MUC operations if requested
if config.TestMUC { if config.TestMUC {
start = time.Now() start = time.Now()
err = testMUCOperations(client, config) err = testMUCOperations(client, config, logger)
if err != nil { if err != nil {
return fmt.Errorf("MUC operations test failed: %w", err) return fmt.Errorf("MUC operations test failed: %w", err)
} }
@ -211,7 +185,7 @@ func testXMPPClient(config *Config) error {
// Test direct message if requested // Test direct message if requested
if config.TestDirectMessage { if config.TestDirectMessage {
start = time.Now() start = time.Now()
err = testDirectMessage(client, config) err = testDirectMessage(client, config, logger)
if err != nil { if err != nil {
return fmt.Errorf("direct message test failed: %w", err) return fmt.Errorf("direct message test failed: %w", err)
} }
@ -221,16 +195,14 @@ func testXMPPClient(config *Config) error {
// Test room existence if requested // Test room existence if requested
if config.TestRoomExists { if config.TestRoomExists {
start = time.Now() start = time.Now()
err = testRoomExists(client, config) err = testRoomExists(client, config, logger)
if err != nil { if err != nil {
return fmt.Errorf("room existence test failed: %w", err) return fmt.Errorf("room existence test failed: %w", err)
} }
roomExistsDuration = time.Since(start) roomExistsDuration = time.Since(start)
} }
if config.Verbose { logger.LogDebug("Disconnecting from XMPP server")
log.Printf("Disconnecting from XMPP server...")
}
// Disconnect // Disconnect
start = time.Now() start = time.Now()
@ -240,48 +212,37 @@ func testXMPPClient(config *Config) error {
} }
disconnectDuration := time.Since(start) disconnectDuration := time.Since(start)
if config.Verbose { totalTime := connectDuration + pingDuration + disconnectDuration
log.Printf("✅ Disconnected from XMPP server in %v", disconnectDuration) if config.TestXEP0077 {
log.Printf("Connection summary:") totalTime += xep0077Duration
log.Printf(" Connect time: %v", connectDuration)
log.Printf(" Ping time: %v", pingDuration)
if config.TestXEP0077 {
log.Printf(" XEP-0077 test time: %v", xep0077Duration)
}
if config.TestMUC {
log.Printf(" MUC operations time: %v", mucDuration)
}
if config.TestDirectMessage {
log.Printf(" Direct message time: %v", dmDuration)
}
if config.TestRoomExists {
log.Printf(" Room existence check time: %v", roomExistsDuration)
}
log.Printf(" Disconnect time: %v", disconnectDuration)
totalTime := connectDuration + pingDuration + disconnectDuration
if config.TestXEP0077 {
totalTime += xep0077Duration
}
if config.TestMUC {
totalTime += mucDuration
}
if config.TestDirectMessage {
totalTime += dmDuration
}
if config.TestRoomExists {
totalTime += roomExistsDuration
}
log.Printf(" Total time: %v", totalTime)
} }
if config.TestMUC {
totalTime += mucDuration
}
if config.TestDirectMessage {
totalTime += dmDuration
}
if config.TestRoomExists {
totalTime += roomExistsDuration
}
logger.LogInfo("Disconnected from XMPP server", "disconnect_duration", disconnectDuration)
logger.LogInfo("Connection summary",
"connect_time", connectDuration,
"ping_time", pingDuration,
"xep0077_time", xep0077Duration,
"muc_time", mucDuration,
"direct_message_time", dmDuration,
"room_exists_time", roomExistsDuration,
"disconnect_time", disconnectDuration,
"total_time", totalTime)
return nil return nil
} }
func testMUCOperations(client *xmpp.Client, config *Config) error { func testMUCOperations(client *xmpp.Client, config *Config, logger *StructuredLogger) error {
if config.Verbose { logger.LogInfo("Testing MUC operations", "room", config.TestRoom)
log.Printf("Testing MUC operations with room: %s", config.TestRoom) logger.LogDebug("Checking if room exists first")
log.Printf("First checking if room exists...")
}
// Check if room exists before attempting to join // Check if room exists before attempting to join
start := time.Now() start := time.Now()
@ -291,18 +252,13 @@ func testMUCOperations(client *xmpp.Client, config *Config) error {
} }
checkDuration := time.Since(start) checkDuration := time.Since(start)
if config.Verbose { logger.LogInfo("Room existence check completed", "duration", checkDuration, "room", config.TestRoom, "exists", exists)
log.Printf("✅ Room existence check completed in %v", checkDuration)
log.Printf("Room %s exists: %t", config.TestRoom, exists)
}
if !exists { if !exists {
return fmt.Errorf("cannot test MUC operations: room %s does not exist or is not accessible", config.TestRoom) return fmt.Errorf("cannot test MUC operations: room %s does not exist or is not accessible", config.TestRoom)
} }
if config.Verbose { logger.LogDebug("Room exists, proceeding to join")
log.Printf("Room exists, proceeding to join...")
}
// Test joining the room // Test joining the room
start = time.Now() start = time.Now()
@ -314,10 +270,8 @@ func testMUCOperations(client *xmpp.Client, config *Config) error {
var sendDuration time.Duration var sendDuration time.Duration
if config.Verbose { logger.LogInfo("Successfully joined MUC room", "duration", joinDuration)
log.Printf("✅ Successfully joined MUC room in %v", joinDuration) logger.LogDebug("Sending test message to room")
log.Printf("Sending test message to room...")
}
// Send a test message // Send a test message
testMessage := fmt.Sprintf("Test message from XMPP doctor at %s", time.Now().Format("15:04:05")) testMessage := fmt.Sprintf("Test message from XMPP doctor at %s", time.Now().Format("15:04:05"))
@ -333,18 +287,13 @@ func testMUCOperations(client *xmpp.Client, config *Config) error {
} }
sendDuration = time.Since(start) sendDuration = time.Since(start)
if config.Verbose { logger.LogInfo("Successfully sent message", "duration", sendDuration, "message", testMessage)
log.Printf("✅ Successfully sent message in %v", sendDuration) logger.LogDebug("Waiting 5 seconds in the room")
log.Printf("Message: %s", testMessage)
log.Printf("Waiting 5 seconds in the room...")
}
// Wait 5 seconds // Wait 5 seconds
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
if config.Verbose { logger.LogDebug("Attempting to leave MUC room")
log.Printf("Attempting to leave MUC room...")
}
// Test leaving the room // Test leaving the room
start = time.Now() start = time.Now()
@ -354,25 +303,21 @@ func testMUCOperations(client *xmpp.Client, config *Config) error {
} }
leaveDuration := time.Since(start) leaveDuration := time.Since(start)
if config.Verbose { logger.LogInfo("Successfully left MUC room", "duration", leaveDuration)
log.Printf("✅ Successfully left MUC room in %v", leaveDuration) logger.LogInfo("MUC operations summary",
log.Printf("MUC operations summary:") "room_check_time", checkDuration,
log.Printf(" Room existence check time: %v", checkDuration) "join_time", joinDuration,
log.Printf(" Join time: %v", joinDuration) "send_time", sendDuration,
log.Printf(" Send message time: %v", sendDuration) "wait_time", "5s",
log.Printf(" Wait time: 5s") "leave_time", leaveDuration,
log.Printf(" Leave time: %v", leaveDuration) "total_time", checkDuration+joinDuration+sendDuration+5*time.Second+leaveDuration)
log.Printf(" Total MUC time: %v", checkDuration+joinDuration+sendDuration+5*time.Second+leaveDuration)
}
return nil return nil
} }
func testDirectMessage(client *xmpp.Client, config *Config) error { func testDirectMessage(client *xmpp.Client, config *Config, logger *StructuredLogger) error {
if config.Verbose { logger.LogInfo("Testing direct message functionality")
log.Printf("Testing direct message functionality...") logger.LogDebug("Sending test message to admin user")
log.Printf("Sending test message to admin user...")
}
// Send a test message to the admin user // Send a test message to the admin user
testMessage := fmt.Sprintf("Test direct message from XMPP doctor at %s", time.Now().Format("15:04:05")) testMessage := fmt.Sprintf("Test direct message from XMPP doctor at %s", time.Now().Format("15:04:05"))
@ -385,22 +330,18 @@ func testDirectMessage(client *xmpp.Client, config *Config) error {
} }
sendDuration := time.Since(start) sendDuration := time.Since(start)
if config.Verbose { logger.LogInfo("Successfully sent direct message",
log.Printf("✅ Successfully sent direct message in %v", sendDuration) "duration", sendDuration,
log.Printf("Message: %s", testMessage) "message", testMessage,
log.Printf("Recipient: %s", adminJID) "recipient", adminJID)
log.Printf("Direct message test summary:") logger.LogInfo("Direct message test summary", "send_time", sendDuration)
log.Printf(" Send message time: %v", sendDuration)
}
return nil return nil
} }
func testRoomExists(client *xmpp.Client, config *Config) error { func testRoomExists(client *xmpp.Client, config *Config, logger *StructuredLogger) error {
if config.Verbose { logger.LogInfo("Testing room existence functionality")
log.Printf("Testing room existence functionality...") logger.LogDebug("Checking if test room exists", "room", config.TestRoom)
log.Printf("Checking if test room exists: %s", config.TestRoom)
}
// Test room existence check // Test room existence check
start := time.Now() start := time.Now()
@ -410,16 +351,11 @@ func testRoomExists(client *xmpp.Client, config *Config) error {
} }
checkDuration := time.Since(start) checkDuration := time.Since(start)
if config.Verbose { logger.LogInfo("Room existence check completed", "duration", checkDuration, "room", config.TestRoom, "exists", exists)
log.Printf("✅ Room existence check completed in %v", checkDuration)
log.Printf("Room %s exists: %t", config.TestRoom, exists)
}
// Test with a non-existent room to verify negative case // Test with a non-existent room to verify negative case
nonExistentRoom := "nonexistent-room-12345@conference.localhost" nonExistentRoom := "nonexistent-room-12345@conference.localhost"
if config.Verbose { logger.LogDebug("Testing negative case with non-existent room", "room", nonExistentRoom)
log.Printf("Testing negative case with non-existent room: %s", nonExistentRoom)
}
start = time.Now() start = time.Now()
existsNegative, err := client.CheckRoomExists(nonExistentRoom) existsNegative, err := client.CheckRoomExists(nonExistentRoom)
@ -428,14 +364,15 @@ func testRoomExists(client *xmpp.Client, config *Config) error {
} }
checkNegativeDuration := time.Since(start) checkNegativeDuration := time.Since(start)
if config.Verbose { logger.LogInfo("Negative room existence check completed",
log.Printf("✅ Negative room existence check completed in %v", checkNegativeDuration) "duration", checkNegativeDuration,
log.Printf("Non-existent room %s exists: %t (should be false)", nonExistentRoom, existsNegative) "room", nonExistentRoom,
log.Printf("Room existence test summary:") "exists", existsNegative,
log.Printf(" Test room check time: %v", checkDuration) "expected_false", true)
log.Printf(" Negative case check time: %v", checkNegativeDuration) logger.LogInfo("Room existence test summary",
log.Printf(" Total room existence test time: %v", checkDuration+checkNegativeDuration) "test_room_time", checkDuration,
} "negative_case_time", checkNegativeDuration,
"total_time", checkDuration+checkNegativeDuration)
return nil return nil
} }
@ -447,44 +384,60 @@ func maskPassword(password string) string {
return password[:2] + "****" return password[:2] + "****"
} }
// SimpleLogger provides basic logging functionality for the doctor command // StructuredLogger provides structured logging functionality for the doctor command using slog
type SimpleLogger struct { type StructuredLogger struct {
logger *slog.Logger
verbose bool verbose bool
} }
// LogDebug logs debug messages if verbose mode is enabled // NewStructuredLogger creates a new structured logger with colorized output
func (l *SimpleLogger) LogDebug(msg string, args ...interface{}) { func NewStructuredLogger(verbose bool) *StructuredLogger {
if l.verbose { // Configure log level based on verbose flag
log.Printf("[DEBUG] "+msg, args...) level := slog.LevelInfo
if verbose {
level = slog.LevelDebug
}
// Create tinted handler for colorized output
handler := tint.NewHandler(os.Stdout, &tint.Options{
Level: level,
TimeFormat: "15:04:05.000", // More concise time format
AddSource: false, // Don't show source file info
})
// Create logger with the handler
logger := slog.New(handler)
return &StructuredLogger{
logger: logger,
verbose: verbose,
} }
} }
// LogInfo logs info messages // LogDebug logs debug messages with structured key-value pairs
func (l *SimpleLogger) LogInfo(msg string, args ...interface{}) { func (l *StructuredLogger) LogDebug(msg string, keyValuePairs ...any) {
log.Printf("[INFO] "+msg, args...) l.logger.Debug(msg, keyValuePairs...)
} }
// LogWarn logs warning messages // LogInfo logs info messages with structured key-value pairs
func (l *SimpleLogger) LogWarn(msg string, args ...interface{}) { func (l *StructuredLogger) LogInfo(msg string, keyValuePairs ...any) {
log.Printf("[WARN] "+msg, args...) l.logger.Info(msg, keyValuePairs...)
} }
// LogError logs error messages // LogWarn logs warning messages with structured key-value pairs
func (l *SimpleLogger) LogError(msg string, args ...interface{}) { func (l *StructuredLogger) LogWarn(msg string, keyValuePairs ...any) {
log.Printf("[ERROR] "+msg, args...) l.logger.Warn(msg, keyValuePairs...)
} }
// testXEP0077 tests XEP-0077 In-Band Registration functionality by creating and deleting a test user // LogError logs error messages with structured key-value pairs
func testXEP0077(client *xmpp.Client, config *Config) error { func (l *StructuredLogger) LogError(msg string, keyValuePairs ...any) {
if config.Verbose { l.logger.Error(msg, keyValuePairs...)
log.Printf("Testing XEP-0077 In-Band Registration functionality...") }
}
// First, wait for server capability detection to complete // testXEP0077 tests XEP-0077 In-Band Registration by creating a ghost user and testing message sending
// This is handled asynchronously in the client Connect method func testXEP0077(client *xmpp.Client, config *Config, logger *StructuredLogger) error {
time.Sleep(2 * time.Second) logger.LogInfo("Testing XEP-0077 In-Band Registration with ghost user messaging")
// Check if server supports XEP-0077
inBandReg, err := client.GetInBandRegistration() inBandReg, err := client.GetInBandRegistration()
if err != nil { if err != nil {
return fmt.Errorf("server does not support XEP-0077 In-Band Registration: %w", err) return fmt.Errorf("server does not support XEP-0077 In-Band Registration: %w", err)
@ -494,99 +447,95 @@ func testXEP0077(client *xmpp.Client, config *Config) error {
return fmt.Errorf("XEP-0077 In-Band Registration is not enabled on this server") return fmt.Errorf("XEP-0077 In-Band Registration is not enabled on this server")
} }
if config.Verbose {
log.Printf("✅ Server supports XEP-0077 In-Band Registration")
}
serverJID := client.GetJID().Domain() serverJID := client.GetJID().Domain()
// Step 1: Test registration fields discovery // Step 1: Create ghost user with admin client
start := time.Now() ghostUsername := fmt.Sprintf("ghost_test_%d", time.Now().Unix())
if config.Verbose { ghostPassword := "testpass123"
log.Printf("Testing registration fields discovery for server: %s", serverJID.String()) ghostJID := fmt.Sprintf("%s@%s", ghostUsername, serverJID.String())
logger.LogInfo("Creating ghost user", "username", ghostUsername)
ghostRegistrationRequest := &xmpp.RegistrationRequest{
Username: ghostUsername,
Password: ghostPassword,
Email: fmt.Sprintf("%s@localhost", ghostUsername),
} }
fields, err := inBandReg.GetRegistrationFields(serverJID) ghostRegResponse, err := inBandReg.RegisterAccount(serverJID, ghostRegistrationRequest)
if err != nil { if err != nil {
return fmt.Errorf("failed to get registration fields from server: %w", err) return fmt.Errorf("failed to register ghost user: %w", err)
} }
fieldsDuration := time.Since(start) if !ghostRegResponse.Success {
return fmt.Errorf("ghost user registration failed: %s", ghostRegResponse.Error)
if config.Verbose {
log.Printf("✅ Registration fields discovery completed in %v", fieldsDuration)
log.Printf("Registration fields: required=%v, available=%d", fields.Required, len(fields.Fields))
} }
// Step 2: Create test user logger.LogInfo("Ghost user created successfully", "username", ghostUsername)
testUsername := fmt.Sprintf("xmpptest%d", time.Now().Unix())
testPassword := "testpass123"
testEmail := fmt.Sprintf("%s@localhost", testUsername)
if config.Verbose { // Step 2-7: Use ghost client for all operations
log.Printf("Creating test user: %s", testUsername) var ghostClient *xmpp.Client
} if config.InsecureSkipVerify {
tlsConfig := &tls.Config{InsecureSkipVerify: true} //nolint:gosec // Testing tool
registrationRequest := &xmpp.RegistrationRequest{ ghostClient = xmpp.NewClientWithTLS(config.Server, ghostJID, ghostPassword, "ghost_doctor", "ghost-remote-id", tlsConfig, logger)
Username: testUsername,
Password: testPassword,
Email: testEmail,
}
start = time.Now()
regResponse, err := inBandReg.RegisterAccount(serverJID, registrationRequest)
if err != nil {
return fmt.Errorf("failed to register test user '%s': %w", testUsername, err)
}
registerDuration := time.Since(start)
if !regResponse.Success {
return fmt.Errorf("user registration failed: %s", regResponse.Error)
}
if config.Verbose {
log.Printf("✅ Test user '%s' registered successfully in %v", testUsername, registerDuration)
log.Printf("Registration response: %s", regResponse.Message)
}
// Step 3: Delete test user (cleanup)
if config.Verbose {
log.Printf("Cleaning up: removing test user '%s'", testUsername)
}
start = time.Now()
cancelResponse, err := inBandReg.CancelRegistration(serverJID)
if err != nil {
if config.Verbose {
log.Printf("⚠️ Failed to remove test user '%s': %v", testUsername, err)
log.Printf("⚠️ Manual cleanup may be required")
}
} else { } else {
cancelDuration := time.Since(start) ghostClient = xmpp.NewClient(config.Server, ghostJID, ghostPassword, "ghost_doctor", "ghost-remote-id", logger)
if cancelResponse.Success {
if config.Verbose {
log.Printf("✅ Test user '%s' removed successfully in %v", testUsername, cancelDuration)
}
} else {
if config.Verbose {
log.Printf("⚠️ User removal may have failed: %s", cancelResponse.Error)
}
}
} }
if config.Verbose { // Step 2: Connect ghost client
log.Printf("XEP-0077 test summary:") if err := ghostClient.Connect(); err != nil {
log.Printf(" Server support check: ✅") return fmt.Errorf("failed to connect ghost user: %w", err)
log.Printf(" Registration fields discovery time: %v", fieldsDuration) }
log.Printf(" User registration time: %v", registerDuration) logger.LogInfo("Ghost user connected")
log.Printf(" Test username: %s", testUsername)
log.Printf(" Required fields count: %d", len(fields.Required)) // Step 3: Check test room exists
log.Printf(" User creation: ✅") exists, err := ghostClient.CheckRoomExists(config.TestRoom)
if err == nil && cancelResponse.Success { if err != nil {
log.Printf(" User cleanup: ✅") return fmt.Errorf("failed to check room existence: %w", err)
} else { }
log.Printf(" User cleanup: ⚠️") if !exists {
} return fmt.Errorf("test room %s does not exist", config.TestRoom)
} }
// Step 4: Join test room
if err := ghostClient.JoinRoom(config.TestRoom); err != nil {
return fmt.Errorf("failed to join room: %w", err)
}
logger.LogInfo("Ghost user joined room", "room", config.TestRoom)
// Step 5: Send message to test room
testMessage := fmt.Sprintf("Test ghost user message from %s at %s", ghostUsername, time.Now().Format("15:04:05"))
messageReq := xmpp.MessageRequest{
RoomJID: config.TestRoom,
GhostUserJID: ghostJID,
Message: testMessage,
}
if _, err := ghostClient.SendMessage(&messageReq); err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
logger.LogInfo("Ghost user sent message", "message", testMessage)
// Step 6: Cancel account (ghost user cancels their own registration)
ghostInBandReg, err := ghostClient.GetInBandRegistration()
if err != nil {
return fmt.Errorf("failed to get XEP-0077 handler for ghost user: %w", err)
}
ghostCancellationRequest := &xmpp.CancellationRequest{Username: ghostUsername}
ghostCancelResponse, err := ghostInBandReg.CancelRegistration(serverJID, ghostCancellationRequest)
if err != nil {
return fmt.Errorf("failed to cancel ghost user registration: %w", err)
}
if !ghostCancelResponse.Success {
return fmt.Errorf("ghost user registration cancellation failed: %s", ghostCancelResponse.Error)
}
logger.LogInfo("Ghost user registration cancelled successfully")
// Clean disconnect
if err := ghostClient.Disconnect(); err != nil {
logger.LogWarn("Failed to disconnect ghost client", "error", err)
}
logger.LogInfo("XEP-0077 ghost user test completed successfully", "username", ghostUsername)
return nil return nil
} }

1
go.mod
View file

@ -5,6 +5,7 @@ go 1.24.3
require ( require (
github.com/gorilla/mux v1.8.1 github.com/gorilla/mux v1.8.1
github.com/jellydator/ttlcache/v3 v3.4.0 github.com/jellydator/ttlcache/v3 v3.4.0
github.com/lmittmann/tint v1.1.2
github.com/mattermost/mattermost/server/public v0.1.10 github.com/mattermost/mattermost/server/public v0.1.10
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.10.0 github.com/stretchr/testify v1.10.0

2
go.sum
View file

@ -425,6 +425,8 @@ github.com/leonklingele/grouper v1.1.2 h1:o1ARBDLOmmasUaNDesWqWCIFH3u7hoFlM84Yrj
github.com/leonklingele/grouper v1.1.2/go.mod h1:6D0M/HVkhs2yRKRFZUoGjeDy7EZTfFBE9gl4kjmIGkA= github.com/leonklingele/grouper v1.1.2/go.mod h1:6D0M/HVkhs2yRKRFZUoGjeDy7EZTfFBE9gl4kjmIGkA=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lmittmann/tint v1.1.2 h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w=
github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/lufeee/execinquery v1.2.1 h1:hf0Ems4SHcUGBxpGN7Jz78z1ppVkP/837ZlETPCEtOM= github.com/lufeee/execinquery v1.2.1 h1:hf0Ems4SHcUGBxpGN7Jz78z1ppVkP/837ZlETPCEtOM=
github.com/lufeee/execinquery v1.2.1/go.mod h1:EC7DrEKView09ocscGHC+apXMIaorh4xqSxS/dy8SbM= github.com/lufeee/execinquery v1.2.1/go.mod h1:EC7DrEKView09ocscGHC+apXMIaorh4xqSxS/dy8SbM=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=

View file

@ -17,9 +17,6 @@
}, },
"executable": "" "executable": ""
}, },
"webapp": {
"bundle_path": "webapp/dist/main.js"
},
"settings_schema": { "settings_schema": {
"header": "Configure the XMPP bridge connection settings below.", "header": "Configure the XMPP bridge connection settings below.",
"footer": "For more information about setting up the XMPP bridge, see the [documentation](https://github.com/mattermost/mattermost-plugin-bridge-xmpp/blob/main/README.md).", "footer": "For more information about setting up the XMPP bridge, see the [documentation](https://github.com/mattermost/mattermost-plugin-bridge-xmpp/blob/main/README.md).",
@ -65,12 +62,42 @@
"secret": false "secret": false
}, },
{ {
"key": "XMPPUsernamePrefix", "key": "EnableXMPPGhostUsers",
"display_name": "XMPP Username Prefix", "display_name": "Enable XMPP Ghost Users",
"type": "bool",
"help_text": "When enabled, individual XMPP accounts will be created for each Mattermost user using XEP-0077 In-Band Registration. If disabled or unsupported, the bridge user will be used for all communications.",
"placeholder": "",
"default": false,
"hosting": "",
"secret": false
},
{
"key": "XMPPGhostUserPrefix",
"display_name": "XMPP Ghost User Prefix",
"type": "text", "type": "text",
"help_text": "Prefix for XMPP users in Mattermost (e.g., 'xmpp' creates usernames like 'xmpp:user@domain')", "help_text": "Prefix for ghost user accounts created on the XMPP server (e.g., 'mm_' creates users like 'mm_john@xmpp.example.com'). Required when ghost users are enabled.",
"placeholder": "xmpp", "placeholder": "mm_",
"default": "xmpp", "default": null,
"hosting": "",
"secret": false
},
{
"key": "XMPPGhostUserDomain",
"display_name": "XMPP Ghost User Domain",
"type": "text",
"help_text": "Domain for ghost user accounts on the XMPP server. If not specified, uses the domain from the XMPP Server URL.",
"placeholder": "xmpp.example.com",
"default": null,
"hosting": "",
"secret": false
},
{
"key": "XMPPGhostUserCleanup",
"display_name": "Enable Ghost User Cleanup",
"type": "bool",
"help_text": "When enabled, ghost user accounts will be automatically removed from the XMPP server when Mattermost users are deleted. Disable to preserve accounts.",
"placeholder": "",
"default": true,
"hosting": "", "hosting": "",
"secret": false "secret": false
}, },
@ -102,4 +129,4 @@
"version": "v0.1.4" "version": "v0.1.4"
} }
} }
} }

View file

@ -134,20 +134,6 @@ func (b *xmppBridge) createUserManager(cfg *config.Configuration, bridgeID strin
return NewXMPPUserManager(bridgeID, log, store, b.api, cfg, b.bridgeClient) return NewXMPPUserManager(bridgeID, log, store, b.api, cfg, b.bridgeClient)
} }
// waitForCapabilityDetection waits for server capability detection to complete
func (b *xmppBridge) waitForCapabilityDetection() error {
if b.bridgeClient == nil {
return fmt.Errorf("bridge client not available")
}
// Trigger capability detection synchronously
if err := b.bridgeClient.DetectServerCapabilities(); err != nil {
return fmt.Errorf("failed to detect server capabilities: %w", err)
}
return nil
}
// checkXEP0077Support checks if the XMPP server supports XEP-0077 In-Band Registration // checkXEP0077Support checks if the XMPP server supports XEP-0077 In-Band Registration
func (b *xmppBridge) checkXEP0077Support() (bool, error) { func (b *xmppBridge) checkXEP0077Support() (bool, error) {
if b.bridgeClient == nil { if b.bridgeClient == nil {
@ -237,14 +223,14 @@ func (b *xmppBridge) Start() error {
return fmt.Errorf("failed to connect to XMPP server: %w", err) return fmt.Errorf("failed to connect to XMPP server: %w", err)
} }
// Wait for server capability detection to complete before creating user manager
if err := b.waitForCapabilityDetection(); err != nil {
return fmt.Errorf("failed to detect server capabilities: %w", err)
}
// Initialize proper user manager now that we're connected and server capabilities are detected // Initialize proper user manager now that we're connected and server capabilities are detected
b.userManager = b.createUserManager(cfg, b.bridgeID, b.logger, b.kvstore) b.userManager = b.createUserManager(cfg, b.bridgeID, b.logger, b.kvstore)
// Start the user manager to enable lifecycle management
if err := b.userManager.Start(b.ctx); err != nil {
return fmt.Errorf("failed to start user manager: %w", err)
}
// Load and join mapped channels // Load and join mapped channels
if err := b.loadAndJoinMappedChannels(); err != nil { if err := b.loadAndJoinMappedChannels(); err != nil {
b.logger.LogWarn("Failed to join some mapped channels", "error", err) b.logger.LogWarn("Failed to join some mapped channels", "error", err)
@ -265,6 +251,11 @@ func (b *xmppBridge) Stop() error {
b.cancel() b.cancel()
} }
// Stop the user manager to stop lifecycle management
if err := b.userManager.Stop(); err != nil {
b.logger.LogWarn("Error stopping user manager", "error", err)
}
if b.bridgeClient != nil { if b.bridgeClient != nil {
if err := b.bridgeClient.Disconnect(); err != nil { if err := b.bridgeClient.Disconnect(); err != nil {
b.logger.LogWarn("Error disconnecting from XMPP server", "error", err) b.logger.LogWarn("Error disconnecting from XMPP server", "error", err)

View file

@ -123,6 +123,12 @@ func (h *xmppMessageHandler) sendMessageViaGhostUser(xmppUserManager *UserManage
return h.sendMessageViaBridgeUser(msg, roomJID) return h.sendMessageViaBridgeUser(msg, roomJID)
} }
// Update user activity in KV store after successful message send
if err := xmppUserManager.UpdateUserActivity(msg.SourceUserID); err != nil {
h.logger.LogError("Failed to update user activity after message send, user may never disconnect", "user_id", msg.SourceUserID, "error", err)
// Don't fail the message send for activity update failures
}
h.logger.LogDebug("Message sent via ghost user", h.logger.LogDebug("Message sent via ghost user",
"source_user_id", msg.SourceUserID, "source_user_id", msg.SourceUserID,
"ghost_jid", xmppUser.GetJID(), "ghost_jid", xmppUser.GetJID(),

View file

@ -14,6 +14,14 @@ import (
xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp" xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
) )
const (
// ghostUserInactivityTimeout is the duration after which inactive ghost users are disconnected
ghostUserInactivityTimeout = 2 * time.Minute
// ghostUserActivityCheckInterval is how often we check for inactive ghost users
ghostUserActivityCheckInterval = 1 * time.Minute
)
// User represents an XMPP user that implements the BridgeUser interface // User represents an XMPP user that implements the BridgeUser interface
type User struct { type User struct {
// User identity // User identity
@ -29,6 +37,11 @@ type User struct {
stateMu sync.RWMutex stateMu sync.RWMutex
connected atomic.Bool connected atomic.Bool
// Activity tracking for lifecycle management
lastActivity time.Time
activityMu sync.RWMutex
enableLifecycleCheck bool // Whether this user should be subject to inactivity disconnection
// Configuration // Configuration
config *config.Configuration config *config.Configuration
@ -42,6 +55,11 @@ type User struct {
// NewXMPPUser creates a new XMPP user with specific credentials // NewXMPPUser creates a new XMPP user with specific credentials
func NewXMPPUser(id, displayName, jid, password string, cfg *config.Configuration, log logger.Logger) *User { func NewXMPPUser(id, displayName, jid, password string, cfg *config.Configuration, log logger.Logger) *User {
return NewXMPPUserWithActivity(id, displayName, jid, password, cfg, log, time.Now(), false)
}
// NewXMPPUserWithActivity creates a new XMPP user with specific credentials, last activity time, and lifecycle setting
func NewXMPPUserWithActivity(id, displayName, jid, password string, cfg *config.Configuration, log logger.Logger, lastActivity time.Time, enableLifecycle bool) *User {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// Create TLS config based on certificate verification setting // Create TLS config based on certificate verification setting
@ -61,15 +79,17 @@ func NewXMPPUser(id, displayName, jid, password string, cfg *config.Configuratio
) )
return &User{ return &User{
id: id, id: id,
displayName: displayName, displayName: displayName,
jid: jid, jid: jid,
client: client, client: client,
state: model.UserStateOffline, state: model.UserStateOffline,
config: cfg, config: cfg,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
logger: log, logger: log,
lastActivity: lastActivity, // Use provided activity time
enableLifecycleCheck: enableLifecycle, // Use provided lifecycle setting
} }
} }
@ -164,6 +184,9 @@ func (u *User) SendMessageToChannel(channelID, message string) error {
u.logger.LogDebug("XMPP user sending message to channel", "user_id", u.id, "channel_id", channelID) u.logger.LogDebug("XMPP user sending message to channel", "user_id", u.id, "channel_id", channelID)
// Update activity timestamp for this user interaction
u.UpdateLastActivity()
// Ensure we're joined to the room before sending the message // Ensure we're joined to the room before sending the message
if err := u.EnsureJoinedToRoom(channelID); err != nil { if err := u.EnsureJoinedToRoom(channelID); err != nil {
return fmt.Errorf("failed to ensure joined to room before sending message: %w", err) return fmt.Errorf("failed to ensure joined to room before sending message: %w", err)
@ -368,46 +391,41 @@ func (u *User) GetClient() *xmppClient.Client {
return u.client return u.client
} }
// UpdateCredentials updates the user's JID and password for ghost user mode // Activity tracking methods
// This creates a new XMPP client with the updated credentials
func (u *User) UpdateCredentials(newJID, newPassword string) error {
u.logger.LogDebug("Updating XMPP user credentials", "user_id", u.id, "old_jid", u.jid, "new_jid", newJID)
// Disconnect existing client if connected // UpdateLastActivity updates the last activity timestamp for this user
wasConnected := u.IsConnected() func (u *User) UpdateLastActivity() {
if wasConnected { u.activityMu.Lock()
if err := u.Disconnect(); err != nil { defer u.activityMu.Unlock()
u.logger.LogWarn("Error disconnecting before credential update", "user_id", u.id, "error", err) u.lastActivity = time.Now()
} u.logger.LogDebug("Updated last activity for user", "user_id", u.id, "timestamp", u.lastActivity)
} }
// Create TLS config based on certificate verification setting // GetLastActivity returns the last activity timestamp
tlsConfig := &tls.Config{ func (u *User) GetLastActivity() time.Time {
InsecureSkipVerify: u.config.XMPPInsecureSkipVerify, //nolint:gosec // Allow insecure TLS for testing environments u.activityMu.RLock()
} defer u.activityMu.RUnlock()
return u.lastActivity
// Create new XMPP client with updated credentials }
newClient := xmppClient.NewClientWithTLS(
u.config.XMPPServerURL, // IsInactive returns true if the user has been inactive longer than the specified duration
newJID, func (u *User) IsInactive(inactivityThreshold time.Duration) bool {
newPassword, u.activityMu.RLock()
u.config.GetXMPPResource(), defer u.activityMu.RUnlock()
u.id, // Use user ID as remote ID return time.Since(u.lastActivity) > inactivityThreshold
tlsConfig, }
u.logger,
) // SetLifecycleManagement enables or disables lifecycle management for this user
func (u *User) SetLifecycleManagement(enabled bool) {
// Update user fields u.activityMu.Lock()
u.jid = newJID defer u.activityMu.Unlock()
u.client = newClient u.enableLifecycleCheck = enabled
u.logger.LogDebug("Lifecycle management setting changed", "user_id", u.id, "enabled", enabled)
// Reconnect if we were previously connected }
if wasConnected {
if err := u.Connect(); err != nil { // IsLifecycleManaged returns true if this user is subject to lifecycle management
return fmt.Errorf("failed to reconnect after credential update: %w", err) func (u *User) IsLifecycleManaged() bool {
} u.activityMu.RLock()
} defer u.activityMu.RUnlock()
return u.enableLifecycleCheck
u.logger.LogInfo("XMPP user credentials updated successfully", "user_id", u.id, "new_jid", newJID)
return nil
} }

View file

@ -3,9 +3,11 @@ package xmpp
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/tls"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/mattermost/mattermost/server/public/plugin" "github.com/mattermost/mattermost/server/public/plugin"
"mellium.im/xmpp/jid" "mellium.im/xmpp/jid"
@ -33,6 +35,8 @@ type GhostUserData struct {
GhostJID string `json:"ghost_jid"` // XMPP JID of the ghost user GhostJID string `json:"ghost_jid"` // XMPP JID of the ghost user
GhostPassword string `json:"ghost_password"` // XMPP password for the ghost user GhostPassword string `json:"ghost_password"` // XMPP password for the ghost user
Created int64 `json:"created"` // Timestamp when ghost was created Created int64 `json:"created"` // Timestamp when ghost was created
LastActivity int64 `json:"last_activity"` // Unix timestamp of last activity
LifecycleEnabled bool `json:"lifecycle_enabled"` // Whether this user should be subject to inactivity checking
} }
// UserManager manages XMPP users using XEP-0077 ghost users ONLY // UserManager manages XMPP users using XEP-0077 ghost users ONLY
@ -47,11 +51,23 @@ type UserManager struct {
bridgeClient *xmppClient.Client bridgeClient *xmppClient.Client
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
// Node identification for HA environments
nodeID string // Unique identifier for this Mattermost node (from api.GetDiagnosticId)
// Connection caching to prevent connection leaks
activeUsers map[string]*User // Cache of connected users on THIS node
activeUsersMu sync.RWMutex // Protects activeUsers map
} }
// NewXMPPUserManager creates a new XMPP-specific user manager for ghost users only // NewXMPPUserManager creates a new XMPP-specific user manager for ghost users only
func NewXMPPUserManager(bridgeType string, log logger.Logger, store kvstore.KVStore, api plugin.API, cfg *config.Configuration, bridgeClient *xmppClient.Client) model.BridgeUserManager { func NewXMPPUserManager(bridgeType string, log logger.Logger, store kvstore.KVStore, api plugin.API, cfg *config.Configuration, bridgeClient *xmppClient.Client) model.BridgeUserManager {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// Get unique node ID from Mattermost API for HA environments
nodeID := api.GetDiagnosticId()
log.LogDebug("Initializing XMPP user manager", "bridge_type", bridgeType, "node_id", nodeID[:8])
return &UserManager{ return &UserManager{
bridgeType: bridgeType, bridgeType: bridgeType,
logger: log, logger: log,
@ -61,6 +77,8 @@ func NewXMPPUserManager(bridgeType string, log logger.Logger, store kvstore.KVSt
bridgeClient: bridgeClient, bridgeClient: bridgeClient,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
nodeID: nodeID,
activeUsers: make(map[string]*User), // Initialize connection cache
} }
} }
@ -153,34 +171,61 @@ func (m *UserManager) CreateUser(user model.BridgeUser) error {
return nil return nil
} }
// GetUser retrieves a user by Mattermost user ID, creating XMPPUser from ghost data // GetUser retrieves a user by Mattermost user ID, checking cache first, then creating XMPPUser from ghost data
func (m *UserManager) GetUser(mattermostUserID string) (model.BridgeUser, error) { func (m *UserManager) GetUser(mattermostUserID string) (model.BridgeUser, error) {
// First check the connection cache
if cachedUser, found := m.getCachedUser(mattermostUserID); found {
m.logger.LogDebug("Found user in connection cache", "user_id", mattermostUserID, "ghost_jid", cachedUser.GetJID())
return cachedUser, nil
}
// Check if ghost user data exists // Check if ghost user data exists
ghostData, err := m.loadGhostUserData(mattermostUserID) ghostData, err := m.loadGhostUserData(mattermostUserID)
if err != nil { if err != nil {
return nil, fmt.Errorf("ghost user not found for Mattermost user %s: %w", mattermostUserID, err) return nil, fmt.Errorf("ghost user not found for Mattermost user %s: %w", mattermostUserID, err)
} }
// Create XMPPUser directly with ghost credentials // Create XMPPUser directly with ghost credentials and activity data
m.configMu.RLock() m.configMu.RLock()
cfg := m.config cfg := m.config
m.configMu.RUnlock() m.configMu.RUnlock()
user := NewXMPPUser(mattermostUserID, mattermostUserID, ghostData.GhostJID, ghostData.GhostPassword, cfg, m.logger) // Handle migration of existing users without activity data
lastActivity := time.Now()
if ghostData.LastActivity > 0 {
lastActivity = time.Unix(ghostData.LastActivity, 0)
} else {
// Update the KV store with the migration data
ghostData.LastActivity = lastActivity.Unix()
ghostData.LifecycleEnabled = true
_ = m.storeGhostUserData(mattermostUserID, ghostData) // Don't fail if storage update fails
}
user := m.createXMPPUserWithActivity(mattermostUserID, mattermostUserID, ghostData.GhostJID, ghostData.GhostPassword, cfg, m.logger, lastActivity, ghostData.LifecycleEnabled)
// Ensure the user is connected // Ensure the user is connected
if err := m.ensureUserConnected(user, mattermostUserID); err != nil { if err := m.ensureUserConnected(user, mattermostUserID); err != nil {
return nil, fmt.Errorf("failed to ensure ghost user is connected: %w", err) return nil, fmt.Errorf("failed to ensure ghost user is connected: %w", err)
} }
// Cache the connected user to prevent connection leaks
m.cacheUser(mattermostUserID, user)
return user, nil return user, nil
} }
// GetOrCreateUser retrieves a user by Mattermost user ID, creating a new ghost user if it doesn't exist // GetOrCreateUser retrieves a user by Mattermost user ID, creating a new ghost user if it doesn't exist
func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (model.BridgeUser, error) { func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (model.BridgeUser, error) {
// Try to get existing user first // First check the connection cache
if cachedUser, found := m.getCachedUser(mattermostUserID); found {
m.logger.LogDebug("Found user in connection cache", "user_id", mattermostUserID, "ghost_jid", cachedUser.GetJID())
return cachedUser, nil
}
// Try to get existing user from KV store
user, err := m.GetUser(mattermostUserID) user, err := m.GetUser(mattermostUserID)
if err == nil { if err == nil {
// GetUser already cached the user, so just return it
return user, nil return user, nil
} }
@ -200,15 +245,20 @@ func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (mod
return nil, fmt.Errorf("failed to register ghost user: %w", err) return nil, fmt.Errorf("failed to register ghost user: %w", err)
} }
// Create XMPPUser instance with the correct ghost credentials // Initialize activity data for new user
xmppUser := NewXMPPUser(mattermostUserID, displayName, ghostJID, ghostPassword, cfg, m.logger) now := time.Now()
// Store ghost user data // Create XMPPUser instance with the correct ghost credentials and activity data
xmppUser := m.createXMPPUserWithActivity(mattermostUserID, displayName, ghostJID, ghostPassword, cfg, m.logger, now, true)
// Store ghost user data with activity tracking
ghostData := &GhostUserData{ ghostData := &GhostUserData{
MattermostUserID: mattermostUserID, MattermostUserID: mattermostUserID,
GhostJID: ghostJID, GhostJID: ghostJID,
GhostPassword: ghostPassword, GhostPassword: ghostPassword,
Created: m.getCurrentTimestamp(), Created: now.Unix(),
LastActivity: now.Unix(),
LifecycleEnabled: true,
} }
if err := m.storeGhostUserData(mattermostUserID, ghostData); err != nil { if err := m.storeGhostUserData(mattermostUserID, ghostData); err != nil {
@ -221,6 +271,9 @@ func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (mod
return nil, fmt.Errorf("failed to connect newly created ghost user: %w", err) return nil, fmt.Errorf("failed to connect newly created ghost user: %w", err)
} }
// Cache the connected user to prevent connection leaks
m.cacheUser(mattermostUserID, xmppUser)
m.logger.LogInfo("Ghost user created and connected successfully", "mattermost_user_id", mattermostUserID, "ghost_jid", ghostJID) m.logger.LogInfo("Ghost user created and connected successfully", "mattermost_user_id", mattermostUserID, "ghost_jid", ghostJID)
return xmppUser, nil return xmppUser, nil
} }
@ -234,6 +287,15 @@ func (m *UserManager) DeleteUser(mattermostUserID string) error {
return fmt.Errorf("ghost user not found for Mattermost user %s", mattermostUserID) return fmt.Errorf("ghost user not found for Mattermost user %s", mattermostUserID)
} }
// Disconnect and remove from cache if user is currently active
if cachedUser, found := m.getCachedUser(mattermostUserID); found {
if err := cachedUser.Disconnect(); err != nil {
m.logger.LogWarn("Failed to disconnect cached user during deletion", "mattermost_user_id", mattermostUserID, "error", err)
}
m.removeCachedUser(mattermostUserID)
m.logger.LogDebug("Disconnected and removed user from cache during deletion", "mattermost_user_id", mattermostUserID)
}
// Clean up ghost user account if cleanup is enabled // Clean up ghost user account if cleanup is enabled
m.configMu.RLock() m.configMu.RLock()
shouldCleanup := m.config.IsGhostUserCleanupEnabled() shouldCleanup := m.config.IsGhostUserCleanupEnabled()
@ -301,9 +363,16 @@ func (m *UserManager) Start(ctx context.Context) error {
// Continue starting other users even if one fails // Continue starting other users even if one fails
} else { } else {
startedCount++ startedCount++
// Enable lifecycle management for ghost users
if xmppUser, ok := user.(*User); ok {
xmppUser.SetLifecycleManagement(true)
}
} }
} }
// Start the lifecycle management goroutine
go m.lifecycleManager()
m.logger.LogInfo("XMPP ghost user manager started", "bridge_type", m.bridgeType, "user_count", startedCount) m.logger.LogInfo("XMPP ghost user manager started", "bridge_type", m.bridgeType, "user_count", startedCount)
return nil return nil
} }
@ -312,19 +381,43 @@ func (m *UserManager) Start(ctx context.Context) error {
func (m *UserManager) Stop() error { func (m *UserManager) Stop() error {
m.logger.LogDebug("Stopping XMPP ghost user manager", "bridge_type", m.bridgeType) m.logger.LogDebug("Stopping XMPP ghost user manager", "bridge_type", m.bridgeType)
// Cancel context to stop background goroutines
if m.cancel != nil { if m.cancel != nil {
m.cancel() m.cancel()
} }
// Get all users from KV store and stop them // Gracefully shutdown all cached connections first (much faster than ListUsers)
users := m.ListUsers() cachedUsers := m.getCachedUsers()
for _, user := range users { disconnectedCount := 0
for _, user := range cachedUsers {
if err := user.Stop(); err != nil { if err := user.Stop(); err != nil {
m.logger.LogWarn("Error stopping ghost user during manager shutdown", "bridge_type", m.bridgeType, "user_id", user.GetID(), "error", err) m.logger.LogWarn("Error stopping cached ghost user during manager shutdown", "bridge_type", m.bridgeType, "user_id", user.GetID(), "error", err)
} else {
disconnectedCount++
} }
} }
m.logger.LogInfo("XMPP ghost user manager stopped", "bridge_type", m.bridgeType) // Clear the entire cache
m.activeUsersMu.Lock()
m.activeUsers = make(map[string]*User)
m.activeUsersMu.Unlock()
// Also check for any users not in cache and stop them (fallback)
users := m.ListUsers()
fallbackStoppedCount := 0
for _, user := range users {
if err := user.Stop(); err != nil {
m.logger.LogWarn("Error stopping ghost user during manager shutdown", "bridge_type", m.bridgeType, "user_id", user.GetID(), "error", err)
} else {
fallbackStoppedCount++
}
}
m.logger.LogInfo("XMPP ghost user manager stopped",
"bridge_type", m.bridgeType,
"cached_users_stopped", disconnectedCount,
"fallback_users_stopped", fallbackStoppedCount)
return nil return nil
} }
@ -376,6 +469,30 @@ func (m *UserManager) removeGhostUserData(mattermostUserID string) error {
return m.kvstore.Delete(key) return m.kvstore.Delete(key)
} }
// UpdateUserActivity updates both the in-memory and persisted activity timestamp for a user
func (m *UserManager) UpdateUserActivity(mattermostUserID string) error {
// Load existing ghost user data
ghostData, err := m.loadGhostUserData(mattermostUserID)
if err != nil {
return fmt.Errorf("failed to load ghost user data for activity update: %w", err)
}
// Update the activity timestamp
now := time.Now()
ghostData.LastActivity = now.Unix()
// Store the updated data
if err := m.storeGhostUserData(mattermostUserID, ghostData); err != nil {
return fmt.Errorf("failed to persist activity update: %w", err)
}
m.logger.LogDebug("Updated user activity in KV store",
"user_id", mattermostUserID,
"timestamp", now)
return nil
}
func (m *UserManager) cleanupGhostUser(mattermostUserID string) error { func (m *UserManager) cleanupGhostUser(mattermostUserID string) error {
ghostData, err := m.loadGhostUserData(mattermostUserID) ghostData, err := m.loadGhostUserData(mattermostUserID)
if err != nil { if err != nil {
@ -396,7 +513,10 @@ func (m *UserManager) cleanupGhostUser(mattermostUserID string) error {
} }
// Unregister the ghost user account via XEP-0077 // Unregister the ghost user account via XEP-0077
response, err := regHandler.CancelRegistration(ghostJIDParsed.Domain()) cancellationRequest := &xmppClient.CancellationRequest{
Username: ghostJIDParsed.Localpart(), // Extract username from ghost JID
}
response, err := regHandler.CancelRegistration(ghostJIDParsed.Domain(), cancellationRequest)
if err != nil { if err != nil {
return fmt.Errorf("failed to cancel registration for ghost user %s: %w", ghostData.GhostJID, err) return fmt.Errorf("failed to cancel registration for ghost user %s: %w", ghostData.GhostJID, err)
} }
@ -446,6 +566,168 @@ func generateSecurePassword() string {
return "temp_secure_password_123" return "temp_secure_password_123"
} }
// lifecycleManager runs periodically to check for inactive ghost users and disconnect them
func (m *UserManager) lifecycleManager() {
m.logger.LogDebug("Starting lifecycle manager for ghost user cleanup")
ticker := time.NewTicker(ghostUserActivityCheckInterval)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
m.logger.LogDebug("Lifecycle manager stopped due to context cancellation")
return
case <-ticker.C:
m.checkAndDisconnectInactiveUsers()
}
}
}
// checkAndDisconnectInactiveUsers checks all cached users for inactivity and disconnects inactive ghost users
func (m *UserManager) checkAndDisconnectInactiveUsers() {
m.logger.LogDebug("Checking cached users for inactivity")
// Get all currently cached users (this is much more efficient than ListUsers)
cachedUsers := m.getCachedUsers()
inactiveCount := 0
disconnectedCount := 0
for _, xmppUser := range cachedUsers {
// Only check users that have lifecycle management enabled
if !xmppUser.IsLifecycleManaged() {
continue
}
// Check if user is connected and inactive
if xmppUser.IsConnected() && xmppUser.IsInactive(ghostUserInactivityTimeout) {
inactiveCount++
lastActivity := xmppUser.GetLastActivity()
inactiveDuration := time.Since(lastActivity)
m.logger.LogInfo("Disconnecting inactive ghost user",
"user_id", xmppUser.GetID(),
"jid", xmppUser.GetJID(),
"last_activity", lastActivity,
"inactive_duration", inactiveDuration)
// Gracefully disconnect the inactive user
if err := xmppUser.Disconnect(); err != nil {
m.logger.LogWarn("Failed to disconnect inactive ghost user",
"user_id", xmppUser.GetID(),
"jid", xmppUser.GetJID(),
"error", err)
} else {
disconnectedCount++
// Remove disconnected user from cache to free memory
m.removeCachedUser(xmppUser.GetID())
m.logger.LogDebug("Successfully disconnected and removed inactive ghost user from cache",
"user_id", xmppUser.GetID(),
"jid", xmppUser.GetJID())
}
}
}
if inactiveCount > 0 {
m.logger.LogInfo("Completed inactive user cleanup cycle",
"cached_users_checked", len(cachedUsers),
"inactive_users_found", inactiveCount,
"users_disconnected", disconnectedCount)
} else {
m.logger.LogDebug("No inactive users found during cleanup cycle", "cached_users_checked", len(cachedUsers))
}
}
// createXMPPUserWithActivity creates an XMPP user with node-specific resource and activity data
func (m *UserManager) createXMPPUserWithActivity(id, displayName, userJID, password string, cfg *config.Configuration, log logger.Logger, lastActivity time.Time, enableLifecycle bool) *User {
// Generate node-specific resource to prevent conflicts in HA environments
baseResource := cfg.GetXMPPResource()
nodeSpecificResource := fmt.Sprintf("%s-node-%s", baseResource, m.nodeID[:8])
m.logger.LogDebug("Creating XMPP user with node-specific resource",
"user_id", id,
"base_resource", baseResource,
"node_resource", nodeSpecificResource,
"node_id", m.nodeID[:8])
// Create TLS config based on certificate verification setting
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify, //nolint:gosec // Allow insecure TLS for testing environments
}
// Create XMPP client for this user with provided credentials and node-specific resource
client := xmppClient.NewClientWithTLS(
cfg.XMPPServerURL,
userJID,
password, // Use the provided password (ghost password)
nodeSpecificResource, // Use node-specific resource instead of base resource
id, // Use user ID as remote ID
tlsConfig,
log,
)
ctx, cancel := context.WithCancel(context.Background())
return &User{
id: id,
displayName: displayName,
jid: userJID,
client: client,
state: model.UserStateOffline,
config: cfg,
ctx: ctx,
cancel: cancel,
logger: log,
lastActivity: lastActivity, // Use provided activity time
enableLifecycleCheck: enableLifecycle, // Use provided lifecycle setting
}
}
// Cache management methods for connection caching
// getCachedUser retrieves a user from the connection cache
func (m *UserManager) getCachedUser(mattermostUserID string) (*User, bool) {
m.activeUsersMu.RLock()
defer m.activeUsersMu.RUnlock()
user, exists := m.activeUsers[mattermostUserID]
return user, exists
}
// cacheUser stores a user in the connection cache
func (m *UserManager) cacheUser(mattermostUserID string, user *User) {
m.activeUsersMu.Lock()
defer m.activeUsersMu.Unlock()
m.activeUsers[mattermostUserID] = user
m.logger.LogDebug("Cached user connection",
"user_id", mattermostUserID,
"ghost_jid", user.GetJID(),
"cache_size", len(m.activeUsers))
}
// removeCachedUser removes a user from the connection cache
func (m *UserManager) removeCachedUser(mattermostUserID string) {
m.activeUsersMu.Lock()
defer m.activeUsersMu.Unlock()
if user, exists := m.activeUsers[mattermostUserID]; exists {
delete(m.activeUsers, mattermostUserID)
m.logger.LogDebug("Removed user from cache",
"user_id", mattermostUserID,
"ghost_jid", user.GetJID(),
"cache_size", len(m.activeUsers))
}
}
// getCachedUsers returns all cached users (for lifecycle management)
func (m *UserManager) getCachedUsers() []*User {
m.activeUsersMu.RLock()
defer m.activeUsersMu.RUnlock()
users := make([]*User, 0, len(m.activeUsers))
for _, user := range m.activeUsers {
users = append(users, user)
}
return users
}
func (m *UserManager) getCurrentTimestamp() int64 { func (m *UserManager) getCurrentTimestamp() int64 {
// TODO: Use proper time source (time.Now().Unix()) // TODO: Use proper time source (time.Now().Unix())
return 0 return 0

View file

@ -182,11 +182,6 @@ func (c *Client) GetInBandRegistration() (*InBandRegistration, error) {
return c.XEPFeatures.InBandRegistration, nil return c.XEPFeatures.InBandRegistration, nil
} }
// DetectServerCapabilities discovers which XEPs are supported by the server (public method)
func (c *Client) DetectServerCapabilities() error {
return c.detectServerCapabilities()
}
// detectServerCapabilities discovers which XEPs are supported by the server // detectServerCapabilities discovers which XEPs are supported by the server
func (c *Client) detectServerCapabilities() error { func (c *Client) detectServerCapabilities() error {
if c.session == nil { if c.session == nil {
@ -360,6 +355,13 @@ func (c *Client) Connect() error {
return fmt.Errorf("failed to start session serving") return fmt.Errorf("failed to start session serving")
} }
c.logger.LogInfo("XMPP client connected successfully", "jid", c.jidAddr.String()) c.logger.LogInfo("XMPP client connected successfully", "jid", c.jidAddr.String())
// Automatically detect server capabilities after successful connection
if err := c.detectServerCapabilities(); err != nil {
c.logger.LogError("Failed to detect server capabilities automatically", "error", err)
// Don't fail the connection for capability detection issues
}
return nil return nil
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
return fmt.Errorf("timeout waiting for session to be ready") return fmt.Errorf("timeout waiting for session to be ready")

View file

@ -2,6 +2,7 @@
package xmpp package xmpp
import ( import (
"bytes"
"context" "context"
"encoding/xml" "encoding/xml"
"fmt" "fmt"
@ -25,8 +26,8 @@ type InBandRegistration struct {
enabled bool enabled bool
} }
// RegistrationQuery represents the <query xmlns='jabber:iq:register'> element // InBandRegistrationQuery represents the <query xmlns='jabber:iq:register'> element
type RegistrationQuery struct { type InBandRegistrationQuery struct {
XMLName xml.Name `xml:"jabber:iq:register query"` XMLName xml.Name `xml:"jabber:iq:register query"`
Instructions string `xml:"instructions,omitempty"` Instructions string `xml:"instructions,omitempty"`
Username string `xml:"username,omitempty"` Username string `xml:"username,omitempty"`
@ -65,8 +66,13 @@ type RegistrationRequest struct {
AdditionalFields map[string]string `json:"additional_fields,omitempty"` AdditionalFields map[string]string `json:"additional_fields,omitempty"`
} }
// RegistrationResponse represents the result of a registration operation // CancellationRequest represents a request to cancel/remove a user registration
type RegistrationResponse struct { type CancellationRequest struct {
Username string `json:"username"`
}
// InBandRegistrationResponse represents the result of any XEP-0077 In-Band Registration operation
type InBandRegistrationResponse struct {
Success bool `json:"success"` Success bool `json:"success"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
Message string `json:"message,omitempty"` Message string `json:"message,omitempty"`
@ -114,7 +120,7 @@ func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*Registra
To: serverJID, To: serverJID,
} }
query := RegistrationQuery{} query := InBandRegistrationQuery{}
ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second) ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second)
defer cancel() defer cancel()
@ -138,7 +144,7 @@ func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*Registra
// Create the IQ with query payload // Create the IQ with query payload
iqWithQuery := struct { iqWithQuery := struct {
stanza.IQ stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"` Query InBandRegistrationQuery `xml:"jabber:iq:register query"`
}{ }{
IQ: iq, IQ: iq,
Query: query, Query: query,
@ -162,13 +168,13 @@ func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*Registra
} }
// RegisterAccount registers a new account with the server // RegisterAccount registers a new account with the server
func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *RegistrationRequest) (*RegistrationResponse, error) { func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *RegistrationRequest) (*InBandRegistrationResponse, error) {
if r.client.session == nil { if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established") return nil, fmt.Errorf("XMPP session not established")
} }
if request.Username == "" || request.Password == "" { if request.Username == "" || request.Password == "" {
return &RegistrationResponse{ return &InBandRegistrationResponse{
Success: false, Success: false,
Error: "username and password are required", Error: "username and password are required",
}, nil }, nil
@ -180,7 +186,7 @@ func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *Registr
To: serverJID, To: serverJID,
} }
query := RegistrationQuery{ query := InBandRegistrationQuery{
Username: request.Username, Username: request.Username,
Password: request.Password, Password: request.Password,
Email: request.Email, Email: request.Email,
@ -207,57 +213,66 @@ func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *Registr
r.logger.LogInfo("Registering new account", "server", serverJID.String(), "username", request.Username) r.logger.LogInfo("Registering new account", "server", serverJID.String(), "username", request.Username)
// Create response channels // Create a buffer to encode the query payload
responseChannel := make(chan *RegistrationResponse, 1) var queryBuf bytes.Buffer
encoder := xml.NewEncoder(&queryBuf)
// Store response handler temporarily if err := encoder.Encode(query); err != nil {
go func() { return &InBandRegistrationResponse{
// This is a simplified approach - in practice you'd want proper IQ response handling Success: false,
response := &RegistrationResponse{ Error: fmt.Sprintf("failed to encode registration query: %v", err),
Success: true, }, nil
Message: "Account registered successfully",
}
responseChannel <- response
}()
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
} }
encoder.Flush()
// Encode and send the registration IQ // Create TokenReader from the encoded query by using xml.NewDecoder
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil { payloadReader := xml.NewDecoder(bytes.NewReader(queryBuf.Bytes()))
return &RegistrationResponse{
// Send the registration IQ and wait for response
response, err := r.client.session.SendIQElement(ctx, payloadReader, iq)
if err != nil {
return &InBandRegistrationResponse{
Success: false, Success: false,
Error: fmt.Sprintf("failed to send registration request: %v", err), Error: fmt.Sprintf("failed to send registration request: %v", err),
}, nil }, nil
} }
// Wait for response // Try to unmarshal the response as an error IQ first
select { responseIQ, err := stanza.UnmarshalIQError(response, xml.StartElement{})
case response := <-responseChannel: registrationResponse := &InBandRegistrationResponse{}
r.logger.LogInfo("Account registration completed", "server", serverJID.String(), "username", request.Username, "success", response.Success) response.Close()
return response, nil
case <-ctx.Done(): if err != nil {
return &RegistrationResponse{ // If we can't parse the response, treat it as a failure and log the parse error
Success: false, registrationResponse.Success = false
Error: fmt.Sprintf("timeout registering account with %s", serverJID.String()), registrationResponse.Error = "Failed to parse server response for registration request"
}, nil r.logger.LogWarn("Registration response could not be parsed, treating as failure",
"server", serverJID.String(),
"username", request.Username,
"parse_error", err.Error())
} else {
// Successfully unmarshaled - check IQ type
if responseIQ.Type == stanza.ErrorIQ {
registrationResponse.Success = false
registrationResponse.Error = "Server returned error for registration request"
r.logger.LogWarn("Registration failed with server error", "server", serverJID.String(), "username", request.Username, "iq_type", responseIQ.Type)
} else {
registrationResponse.Success = true
registrationResponse.Message = "Account registration completed successfully"
}
} }
r.logger.LogInfo("Account registration completed", "server", serverJID.String(), "username", request.Username, "success", registrationResponse.Success)
return registrationResponse, nil
} }
// ChangePassword changes the password for an existing account // ChangePassword changes the password for an existing account
func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPassword, newPassword string) (*RegistrationResponse, error) { func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPassword, newPassword string) (*InBandRegistrationResponse, error) {
if r.client.session == nil { if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established") return nil, fmt.Errorf("XMPP session not established")
} }
if username == "" || oldPassword == "" || newPassword == "" { if username == "" || oldPassword == "" || newPassword == "" {
return &RegistrationResponse{ return &InBandRegistrationResponse{
Success: false, Success: false,
Error: "username, old password, and new password are required", Error: "username, old password, and new password are required",
}, nil }, nil
@ -269,7 +284,7 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass
To: serverJID, To: serverJID,
} }
query := RegistrationQuery{ query := InBandRegistrationQuery{
Username: username, Username: username,
Password: newPassword, Password: newPassword,
} }
@ -282,7 +297,7 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass
// Create the IQ with query payload // Create the IQ with query payload
iqWithQuery := struct { iqWithQuery := struct {
stanza.IQ stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"` Query InBandRegistrationQuery `xml:"jabber:iq:register query"`
}{ }{
IQ: iq, IQ: iq,
Query: query, Query: query,
@ -290,14 +305,14 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass
// Send the password change IQ // Send the password change IQ
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil { if err := r.client.session.Encode(ctx, iqWithQuery); err != nil {
return &RegistrationResponse{ return &InBandRegistrationResponse{
Success: false, Success: false,
Error: fmt.Sprintf("failed to send password change request: %v", err), Error: fmt.Sprintf("failed to send password change request: %v", err),
}, nil }, nil
} }
// In practice, you'd wait for the IQ response here // In practice, you'd wait for the IQ response here
response := &RegistrationResponse{ response := &InBandRegistrationResponse{
Success: true, Success: true,
Message: "Password changed successfully", Message: "Password changed successfully",
} }
@ -306,50 +321,83 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass
return response, nil return response, nil
} }
// CancelRegistration cancels/removes an existing registration // CancelRegistration cancels/removes an existing registration for the specified user
func (r *InBandRegistration) CancelRegistration(serverJID jid.JID) (*RegistrationResponse, error) { func (r *InBandRegistration) CancelRegistration(serverJID jid.JID, request *CancellationRequest) (*InBandRegistrationResponse, error) {
if r.client.session == nil { if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established") return nil, fmt.Errorf("XMPP session not established")
} }
if request.Username == "" {
return &InBandRegistrationResponse{
Success: false,
Error: "username is required",
}, nil
}
// Create cancellation IQ // Create cancellation IQ
iq := stanza.IQ{ iq := stanza.IQ{
Type: stanza.SetIQ, Type: stanza.SetIQ,
To: serverJID, To: serverJID,
} }
query := RegistrationQuery{ query := InBandRegistrationQuery{
Remove: &struct{}{}, // Empty struct indicates removal Username: request.Username, // Specify which user to remove
Remove: &struct{}{}, // Removal flag
} }
ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second) ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second)
defer cancel() defer cancel()
r.logger.LogInfo("Cancelling registration", "server", serverJID.String()) r.logger.LogInfo("Cancelling registration", "server", serverJID.String(), "username", request.Username)
// Create the IQ with query payload // Create a buffer to encode the query payload
iqWithQuery := struct { var queryBuf bytes.Buffer
stanza.IQ encoder := xml.NewEncoder(&queryBuf)
Query RegistrationQuery `xml:"jabber:iq:register query"` if err := encoder.Encode(query); err != nil {
}{ return &InBandRegistrationResponse{
IQ: iq, Success: false,
Query: query, Error: fmt.Sprintf("failed to encode cancellation query: %v", err),
}, nil
} }
encoder.Flush()
// Send the cancellation IQ // Create TokenReader from the encoded query
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil { payloadReader := xml.NewDecoder(bytes.NewReader(queryBuf.Bytes()))
return &RegistrationResponse{
// Send the cancellation IQ and wait for response
response, err := r.client.session.SendIQElement(ctx, payloadReader, iq)
if err != nil {
return &InBandRegistrationResponse{
Success: false, Success: false,
Error: fmt.Sprintf("failed to send registration cancellation request: %v", err), Error: fmt.Sprintf("failed to send registration cancellation request: %v", err),
}, nil }, nil
} }
// In practice, you'd wait for the IQ response here // Try to unmarshal the response as an error IQ first
response := &RegistrationResponse{ responseIQ, err := stanza.UnmarshalIQError(response, xml.StartElement{})
Success: true, cancellationResponse := &InBandRegistrationResponse{}
Message: "Registration cancelled successfully", response.Close()
if err != nil {
// If we can't parse the response, treat it as a failure and log the parse error
cancellationResponse.Success = false
cancellationResponse.Error = "Failed to parse server response for cancellation request"
r.logger.LogWarn("Cancellation response could not be parsed, treating as failure",
"server", serverJID.String(),
"username", request.Username,
"parse_error", err.Error())
} else {
// Successfully unmarshaled - check IQ type
if responseIQ.Type == stanza.ErrorIQ {
cancellationResponse.Success = false
cancellationResponse.Error = "Server returned error for cancellation request"
r.logger.LogWarn("Registration cancellation failed with server error", "server", serverJID.String(), "username", request.Username, "iq_type", responseIQ.Type)
} else {
cancellationResponse.Success = true
cancellationResponse.Message = "Registration cancelled successfully"
}
} }
r.logger.LogInfo("Registration cancellation completed", "server", serverJID.String()) r.logger.LogInfo("Registration cancellation completed", "server", serverJID.String(), "username", request.Username, "success", cancellationResponse.Success)
return response, nil return cancellationResponse, nil
} }

View file

@ -14,43 +14,7 @@ services:
- "7777:7777" # File transfer proxy - "7777:7777" # File transfer proxy
volumes: volumes:
- openfire_data:/var/lib/openfire - openfire_data:/var/lib/openfire
depends_on:
- postgres
networks:
- openfire-network
postgres:
image: postgres:15
container_name: openfire-postgres
restart: unless-stopped
environment:
- POSTGRES_DB=openfire
- POSTGRES_USER=openfire
- POSTGRES_PASSWORD=openfire123
volumes:
- postgres_data:/var/lib/postgresql/data
# ports:
# - "5432:5432" # Exposed for development access
networks:
- openfire-network
adminer:
image: adminer:latest
container_name: openfire-adminer
restart: unless-stopped
# ports:
# - "8080:8080" # Database management interface
depends_on:
- postgres
networks:
- openfire-network
volumes: volumes:
openfire_data: openfire_data:
driver: local driver: local
postgres_data:
driver: local
networks:
openfire-network:
driver: bridge