Compare commits

..

No commits in common. "0442bd7b72f0077eb9927b95427ea2c18e066e6e" and "f31b80d92bb4fff52ae533a0d677570e67c87b2a" have entirely different histories.

12 changed files with 474 additions and 764 deletions

View file

@ -9,8 +9,8 @@ This plugin provides bidirectional message synchronization between Mattermost an
- Bidirectional message synchronization (Mattermost ↔ XMPP)
- XMPP Multi-User Chat (MUC) support
- Configurable username prefixes for XMPP users
- Ghost user management for cross-platform user representation on the XMPP server with connection lifecycle management (**XEP-0077 only**)
- Configurable username prefixes for XMPP users in Mattermost
- Ghost user management for cross-platform user representation
- Comprehensive XMPP client with SASL Plain authentication
To learn more about plugins, see [our plugin documentation](https://developers.mattermost.com/extend/plugins/).
@ -72,7 +72,7 @@ To avoid having to manually install your plugin, build and deploy your plugin us
}
```
### Development guidance
### Development guidance
1. Fewer packages is better: default to the main package unless there's good reason for a new package.
@ -84,7 +84,7 @@ To avoid having to manually install your plugin, build and deploy your plugin us
### Modifying the server boilerplate
The server code comes with some boilerplate for creating an api, using slash commands, accessing the kvstore and using the cluster package for jobs.
The server code comes with some boilerplate for creating an api, using slash commands, accessing the kvstore and using the cluster package for jobs.
#### Api

View file

@ -4,20 +4,18 @@ import (
"crypto/tls"
"flag"
"fmt"
"log/slog"
"log"
"os"
"time"
"github.com/lmittmann/tint"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
)
const (
// Default values for development server (sidecar)
defaultServer = "localhost:5222"
defaultUsername = "admin@localhost"
defaultPassword = "admin"
defaultUsername = "testuser@localhost"
defaultPassword = "testpass"
defaultResource = "doctor"
defaultTestRoom = "test1@conference.localhost"
)
@ -48,8 +46,8 @@ func main() {
flag.BoolVar(&config.TestMUC, "test-muc", true, "Enable MUC room testing (join/wait/leave)")
flag.BoolVar(&config.TestDirectMessage, "test-dm", true, "Enable direct message testing (send message to admin user)")
flag.BoolVar(&config.TestRoomExists, "test-room-exists", true, "Enable room existence testing using disco#info")
flag.BoolVar(&config.TestXEP0077, "test-xep0077", true, "Enable XEP-0077 In-Band Registration testing with ghost user message test")
flag.BoolVar(&config.Verbose, "verbose", false, "Enable verbose logging")
flag.BoolVar(&config.TestXEP0077, "test-xep0077", true, "Enable XEP-0077 In-Band Registration testing (required if enabled)")
flag.BoolVar(&config.Verbose, "verbose", true, "Enable verbose logging")
flag.BoolVar(&config.InsecureSkipVerify, "insecure-skip-verify", true, "Skip TLS certificate verification (for development)")
flag.Usage = func() {
@ -73,44 +71,66 @@ func main() {
flag.Parse()
// Create the main logger
mainLogger := NewStructuredLogger(config.Verbose)
mainLogger.LogInfo("Starting XMPP client doctor")
mainLogger.LogInfo("Configuration",
"server", config.Server,
"username", config.Username,
"resource", config.Resource,
"password", maskPassword(config.Password),
"test_room", config.TestRoom,
"test_muc", config.TestMUC,
"test_direct_message", config.TestDirectMessage,
"test_room_exists", config.TestRoomExists,
"test_xep0077", config.TestXEP0077)
// Test the XMPP client
if err := testXMPPClient(config, mainLogger); err != nil {
mainLogger.LogError("XMPP client test failed", "error", err)
os.Exit(1)
if config.Verbose {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Printf("Starting XMPP client doctor...")
log.Printf("Configuration:")
log.Printf(" Server: %s", config.Server)
log.Printf(" Username: %s", config.Username)
log.Printf(" Resource: %s", config.Resource)
log.Printf(" Password: %s", maskPassword(config.Password))
if config.TestMUC {
log.Printf(" Test Room: %s", config.TestRoom)
}
if config.TestDirectMessage {
log.Printf(" Test Direct Messages: enabled")
}
if config.TestRoomExists {
log.Printf(" Test Room Existence: enabled")
}
if config.TestXEP0077 {
log.Printf(" Test XEP-0077 In-Band Registration: enabled")
}
}
mainLogger.LogInfo("XMPP client test completed successfully",
"xep0077_test", config.TestXEP0077,
"muc_test", config.TestMUC,
"direct_message_test", config.TestDirectMessage,
"room_exists_test", config.TestRoomExists)
// Test the XMPP client
if err := testXMPPClient(config); err != nil {
log.Fatalf("❌ XMPP client test failed: %v", err)
}
if config.Verbose {
log.Printf("✅ XMPP client test completed successfully!")
} else {
fmt.Println("✅ XMPP client connectivity test passed!")
if config.TestXEP0077 {
fmt.Println("✅ XMPP XEP-0077 In-Band Registration test passed!")
}
if config.TestMUC {
fmt.Println("✅ XMPP MUC operations test passed!")
}
if config.TestDirectMessage {
fmt.Println("✅ XMPP direct message test passed!")
}
if config.TestRoomExists {
fmt.Println("✅ XMPP room existence test passed!")
}
}
}
func testXMPPClient(config *Config, logger *StructuredLogger) error {
logger.LogDebug("Creating XMPP client")
func testXMPPClient(config *Config) error {
if config.Verbose {
log.Printf("Creating XMPP client...")
}
// Create a structured logger for the XMPP client (reuse the passed logger)
doctorLogger := logger
// Create a simple logger for the XMPP client
doctorLogger := &SimpleLogger{verbose: config.Verbose}
// Create XMPP client with optional TLS configuration
var client *xmpp.Client
if config.InsecureSkipVerify {
logger.LogDebug("Using insecure TLS configuration", "skip_verify", true)
if config.Verbose {
log.Printf("Using insecure TLS configuration (skipping certificate verification)")
}
tlsConfig := &tls.Config{
InsecureSkipVerify: true, //nolint:gosec // This is a testing tool for development environments
}
@ -134,7 +154,9 @@ func testXMPPClient(config *Config, logger *StructuredLogger) error {
)
}
logger.LogDebug("Attempting to connect to XMPP server", "server", config.Server)
if config.Verbose {
log.Printf("Attempting to connect to XMPP server...")
}
// Test connection
start := time.Now()
@ -144,8 +166,10 @@ func testXMPPClient(config *Config, logger *StructuredLogger) error {
}
connectDuration := time.Since(start)
logger.LogInfo("Connected to XMPP server", "duration", connectDuration)
logger.LogDebug("Testing connection health")
if config.Verbose {
log.Printf("✅ Connected to XMPP server in %v", connectDuration)
log.Printf("Testing connection health...")
}
// Test connection health
start = time.Now()
@ -155,7 +179,9 @@ func testXMPPClient(config *Config, logger *StructuredLogger) error {
}
pingDuration := time.Since(start)
logger.LogInfo("Connection health test passed", "duration", pingDuration)
if config.Verbose {
log.Printf("✅ Connection health test passed in %v", pingDuration)
}
var xep0077Duration time.Duration
var mucDuration time.Duration
@ -165,7 +191,7 @@ func testXMPPClient(config *Config, logger *StructuredLogger) error {
// Test XEP-0077 In-Band Registration if requested
if config.TestXEP0077 {
start = time.Now()
err = testXEP0077(client, config, logger)
err = testXEP0077(client, config)
if err != nil {
return fmt.Errorf("XEP-0077 In-Band Registration test failed: %w", err)
}
@ -175,7 +201,7 @@ func testXMPPClient(config *Config, logger *StructuredLogger) error {
// Test MUC operations if requested
if config.TestMUC {
start = time.Now()
err = testMUCOperations(client, config, logger)
err = testMUCOperations(client, config)
if err != nil {
return fmt.Errorf("MUC operations test failed: %w", err)
}
@ -185,7 +211,7 @@ func testXMPPClient(config *Config, logger *StructuredLogger) error {
// Test direct message if requested
if config.TestDirectMessage {
start = time.Now()
err = testDirectMessage(client, config, logger)
err = testDirectMessage(client, config)
if err != nil {
return fmt.Errorf("direct message test failed: %w", err)
}
@ -195,14 +221,16 @@ func testXMPPClient(config *Config, logger *StructuredLogger) error {
// Test room existence if requested
if config.TestRoomExists {
start = time.Now()
err = testRoomExists(client, config, logger)
err = testRoomExists(client, config)
if err != nil {
return fmt.Errorf("room existence test failed: %w", err)
}
roomExistsDuration = time.Since(start)
}
logger.LogDebug("Disconnecting from XMPP server")
if config.Verbose {
log.Printf("Disconnecting from XMPP server...")
}
// Disconnect
start = time.Now()
@ -212,37 +240,48 @@ func testXMPPClient(config *Config, logger *StructuredLogger) error {
}
disconnectDuration := time.Since(start)
totalTime := connectDuration + pingDuration + disconnectDuration
if config.TestXEP0077 {
totalTime += xep0077Duration
if config.Verbose {
log.Printf("✅ Disconnected from XMPP server in %v", disconnectDuration)
log.Printf("Connection summary:")
log.Printf(" Connect time: %v", connectDuration)
log.Printf(" Ping time: %v", pingDuration)
if config.TestXEP0077 {
log.Printf(" XEP-0077 test time: %v", xep0077Duration)
}
if config.TestMUC {
log.Printf(" MUC operations time: %v", mucDuration)
}
if config.TestDirectMessage {
log.Printf(" Direct message time: %v", dmDuration)
}
if config.TestRoomExists {
log.Printf(" Room existence check time: %v", roomExistsDuration)
}
log.Printf(" Disconnect time: %v", disconnectDuration)
totalTime := connectDuration + pingDuration + disconnectDuration
if config.TestXEP0077 {
totalTime += xep0077Duration
}
if config.TestMUC {
totalTime += mucDuration
}
if config.TestDirectMessage {
totalTime += dmDuration
}
if config.TestRoomExists {
totalTime += roomExistsDuration
}
log.Printf(" Total time: %v", totalTime)
}
if config.TestMUC {
totalTime += mucDuration
}
if config.TestDirectMessage {
totalTime += dmDuration
}
if config.TestRoomExists {
totalTime += roomExistsDuration
}
logger.LogInfo("Disconnected from XMPP server", "disconnect_duration", disconnectDuration)
logger.LogInfo("Connection summary",
"connect_time", connectDuration,
"ping_time", pingDuration,
"xep0077_time", xep0077Duration,
"muc_time", mucDuration,
"direct_message_time", dmDuration,
"room_exists_time", roomExistsDuration,
"disconnect_time", disconnectDuration,
"total_time", totalTime)
return nil
}
func testMUCOperations(client *xmpp.Client, config *Config, logger *StructuredLogger) error {
logger.LogInfo("Testing MUC operations", "room", config.TestRoom)
logger.LogDebug("Checking if room exists first")
func testMUCOperations(client *xmpp.Client, config *Config) error {
if config.Verbose {
log.Printf("Testing MUC operations with room: %s", config.TestRoom)
log.Printf("First checking if room exists...")
}
// Check if room exists before attempting to join
start := time.Now()
@ -252,13 +291,18 @@ func testMUCOperations(client *xmpp.Client, config *Config, logger *StructuredLo
}
checkDuration := time.Since(start)
logger.LogInfo("Room existence check completed", "duration", checkDuration, "room", config.TestRoom, "exists", exists)
if config.Verbose {
log.Printf("✅ Room existence check completed in %v", checkDuration)
log.Printf("Room %s exists: %t", config.TestRoom, exists)
}
if !exists {
return fmt.Errorf("cannot test MUC operations: room %s does not exist or is not accessible", config.TestRoom)
}
logger.LogDebug("Room exists, proceeding to join")
if config.Verbose {
log.Printf("Room exists, proceeding to join...")
}
// Test joining the room
start = time.Now()
@ -270,8 +314,10 @@ func testMUCOperations(client *xmpp.Client, config *Config, logger *StructuredLo
var sendDuration time.Duration
logger.LogInfo("Successfully joined MUC room", "duration", joinDuration)
logger.LogDebug("Sending test message to room")
if config.Verbose {
log.Printf("✅ Successfully joined MUC room in %v", joinDuration)
log.Printf("Sending test message to room...")
}
// Send a test message
testMessage := fmt.Sprintf("Test message from XMPP doctor at %s", time.Now().Format("15:04:05"))
@ -287,13 +333,18 @@ func testMUCOperations(client *xmpp.Client, config *Config, logger *StructuredLo
}
sendDuration = time.Since(start)
logger.LogInfo("Successfully sent message", "duration", sendDuration, "message", testMessage)
logger.LogDebug("Waiting 5 seconds in the room")
if config.Verbose {
log.Printf("✅ Successfully sent message in %v", sendDuration)
log.Printf("Message: %s", testMessage)
log.Printf("Waiting 5 seconds in the room...")
}
// Wait 5 seconds
time.Sleep(5 * time.Second)
logger.LogDebug("Attempting to leave MUC room")
if config.Verbose {
log.Printf("Attempting to leave MUC room...")
}
// Test leaving the room
start = time.Now()
@ -303,21 +354,25 @@ func testMUCOperations(client *xmpp.Client, config *Config, logger *StructuredLo
}
leaveDuration := time.Since(start)
logger.LogInfo("Successfully left MUC room", "duration", leaveDuration)
logger.LogInfo("MUC operations summary",
"room_check_time", checkDuration,
"join_time", joinDuration,
"send_time", sendDuration,
"wait_time", "5s",
"leave_time", leaveDuration,
"total_time", checkDuration+joinDuration+sendDuration+5*time.Second+leaveDuration)
if config.Verbose {
log.Printf("✅ Successfully left MUC room in %v", leaveDuration)
log.Printf("MUC operations summary:")
log.Printf(" Room existence check time: %v", checkDuration)
log.Printf(" Join time: %v", joinDuration)
log.Printf(" Send message time: %v", sendDuration)
log.Printf(" Wait time: 5s")
log.Printf(" Leave time: %v", leaveDuration)
log.Printf(" Total MUC time: %v", checkDuration+joinDuration+sendDuration+5*time.Second+leaveDuration)
}
return nil
}
func testDirectMessage(client *xmpp.Client, config *Config, logger *StructuredLogger) error {
logger.LogInfo("Testing direct message functionality")
logger.LogDebug("Sending test message to admin user")
func testDirectMessage(client *xmpp.Client, config *Config) error {
if config.Verbose {
log.Printf("Testing direct message functionality...")
log.Printf("Sending test message to admin user...")
}
// Send a test message to the admin user
testMessage := fmt.Sprintf("Test direct message from XMPP doctor at %s", time.Now().Format("15:04:05"))
@ -330,18 +385,22 @@ func testDirectMessage(client *xmpp.Client, config *Config, logger *StructuredLo
}
sendDuration := time.Since(start)
logger.LogInfo("Successfully sent direct message",
"duration", sendDuration,
"message", testMessage,
"recipient", adminJID)
logger.LogInfo("Direct message test summary", "send_time", sendDuration)
if config.Verbose {
log.Printf("✅ Successfully sent direct message in %v", sendDuration)
log.Printf("Message: %s", testMessage)
log.Printf("Recipient: %s", adminJID)
log.Printf("Direct message test summary:")
log.Printf(" Send message time: %v", sendDuration)
}
return nil
}
func testRoomExists(client *xmpp.Client, config *Config, logger *StructuredLogger) error {
logger.LogInfo("Testing room existence functionality")
logger.LogDebug("Checking if test room exists", "room", config.TestRoom)
func testRoomExists(client *xmpp.Client, config *Config) error {
if config.Verbose {
log.Printf("Testing room existence functionality...")
log.Printf("Checking if test room exists: %s", config.TestRoom)
}
// Test room existence check
start := time.Now()
@ -351,11 +410,16 @@ func testRoomExists(client *xmpp.Client, config *Config, logger *StructuredLogge
}
checkDuration := time.Since(start)
logger.LogInfo("Room existence check completed", "duration", checkDuration, "room", config.TestRoom, "exists", exists)
if config.Verbose {
log.Printf("✅ Room existence check completed in %v", checkDuration)
log.Printf("Room %s exists: %t", config.TestRoom, exists)
}
// Test with a non-existent room to verify negative case
nonExistentRoom := "nonexistent-room-12345@conference.localhost"
logger.LogDebug("Testing negative case with non-existent room", "room", nonExistentRoom)
if config.Verbose {
log.Printf("Testing negative case with non-existent room: %s", nonExistentRoom)
}
start = time.Now()
existsNegative, err := client.CheckRoomExists(nonExistentRoom)
@ -364,15 +428,14 @@ func testRoomExists(client *xmpp.Client, config *Config, logger *StructuredLogge
}
checkNegativeDuration := time.Since(start)
logger.LogInfo("Negative room existence check completed",
"duration", checkNegativeDuration,
"room", nonExistentRoom,
"exists", existsNegative,
"expected_false", true)
logger.LogInfo("Room existence test summary",
"test_room_time", checkDuration,
"negative_case_time", checkNegativeDuration,
"total_time", checkDuration+checkNegativeDuration)
if config.Verbose {
log.Printf("✅ Negative room existence check completed in %v", checkNegativeDuration)
log.Printf("Non-existent room %s exists: %t (should be false)", nonExistentRoom, existsNegative)
log.Printf("Room existence test summary:")
log.Printf(" Test room check time: %v", checkDuration)
log.Printf(" Negative case check time: %v", checkNegativeDuration)
log.Printf(" Total room existence test time: %v", checkDuration+checkNegativeDuration)
}
return nil
}
@ -384,60 +447,44 @@ func maskPassword(password string) string {
return password[:2] + "****"
}
// StructuredLogger provides structured logging functionality for the doctor command using slog
type StructuredLogger struct {
logger *slog.Logger
// SimpleLogger provides basic logging functionality for the doctor command
type SimpleLogger struct {
verbose bool
}
// NewStructuredLogger creates a new structured logger with colorized output
func NewStructuredLogger(verbose bool) *StructuredLogger {
// Configure log level based on verbose flag
level := slog.LevelInfo
if verbose {
level = slog.LevelDebug
}
// Create tinted handler for colorized output
handler := tint.NewHandler(os.Stdout, &tint.Options{
Level: level,
TimeFormat: "15:04:05.000", // More concise time format
AddSource: false, // Don't show source file info
})
// Create logger with the handler
logger := slog.New(handler)
return &StructuredLogger{
logger: logger,
verbose: verbose,
// LogDebug logs debug messages if verbose mode is enabled
func (l *SimpleLogger) LogDebug(msg string, args ...interface{}) {
if l.verbose {
log.Printf("[DEBUG] "+msg, args...)
}
}
// LogDebug logs debug messages with structured key-value pairs
func (l *StructuredLogger) LogDebug(msg string, keyValuePairs ...any) {
l.logger.Debug(msg, keyValuePairs...)
// LogInfo logs info messages
func (l *SimpleLogger) LogInfo(msg string, args ...interface{}) {
log.Printf("[INFO] "+msg, args...)
}
// LogInfo logs info messages with structured key-value pairs
func (l *StructuredLogger) LogInfo(msg string, keyValuePairs ...any) {
l.logger.Info(msg, keyValuePairs...)
// LogWarn logs warning messages
func (l *SimpleLogger) LogWarn(msg string, args ...interface{}) {
log.Printf("[WARN] "+msg, args...)
}
// LogWarn logs warning messages with structured key-value pairs
func (l *StructuredLogger) LogWarn(msg string, keyValuePairs ...any) {
l.logger.Warn(msg, keyValuePairs...)
// LogError logs error messages
func (l *SimpleLogger) LogError(msg string, args ...interface{}) {
log.Printf("[ERROR] "+msg, args...)
}
// LogError logs error messages with structured key-value pairs
func (l *StructuredLogger) LogError(msg string, keyValuePairs ...any) {
l.logger.Error(msg, keyValuePairs...)
}
// testXEP0077 tests XEP-0077 In-Band Registration functionality by creating and deleting a test user
func testXEP0077(client *xmpp.Client, config *Config) error {
if config.Verbose {
log.Printf("Testing XEP-0077 In-Band Registration functionality...")
}
// testXEP0077 tests XEP-0077 In-Band Registration by creating a ghost user and testing message sending
func testXEP0077(client *xmpp.Client, config *Config, logger *StructuredLogger) error {
logger.LogInfo("Testing XEP-0077 In-Band Registration with ghost user messaging")
// First, wait for server capability detection to complete
// This is handled asynchronously in the client Connect method
time.Sleep(2 * time.Second)
// Check if server supports XEP-0077
inBandReg, err := client.GetInBandRegistration()
if err != nil {
return fmt.Errorf("server does not support XEP-0077 In-Band Registration: %w", err)
@ -447,95 +494,99 @@ func testXEP0077(client *xmpp.Client, config *Config, logger *StructuredLogger)
return fmt.Errorf("XEP-0077 In-Band Registration is not enabled on this server")
}
if config.Verbose {
log.Printf("✅ Server supports XEP-0077 In-Band Registration")
}
serverJID := client.GetJID().Domain()
// Step 1: Create ghost user with admin client
ghostUsername := fmt.Sprintf("ghost_test_%d", time.Now().Unix())
ghostPassword := "testpass123"
ghostJID := fmt.Sprintf("%s@%s", ghostUsername, serverJID.String())
logger.LogInfo("Creating ghost user", "username", ghostUsername)
ghostRegistrationRequest := &xmpp.RegistrationRequest{
Username: ghostUsername,
Password: ghostPassword,
Email: fmt.Sprintf("%s@localhost", ghostUsername),
// Step 1: Test registration fields discovery
start := time.Now()
if config.Verbose {
log.Printf("Testing registration fields discovery for server: %s", serverJID.String())
}
ghostRegResponse, err := inBandReg.RegisterAccount(serverJID, ghostRegistrationRequest)
fields, err := inBandReg.GetRegistrationFields(serverJID)
if err != nil {
return fmt.Errorf("failed to register ghost user: %w", err)
return fmt.Errorf("failed to get registration fields from server: %w", err)
}
if !ghostRegResponse.Success {
return fmt.Errorf("ghost user registration failed: %s", ghostRegResponse.Error)
fieldsDuration := time.Since(start)
if config.Verbose {
log.Printf("✅ Registration fields discovery completed in %v", fieldsDuration)
log.Printf("Registration fields: required=%v, available=%d", fields.Required, len(fields.Fields))
}
logger.LogInfo("Ghost user created successfully", "username", ghostUsername)
// Step 2: Create test user
testUsername := fmt.Sprintf("xmpptest%d", time.Now().Unix())
testPassword := "testpass123"
testEmail := fmt.Sprintf("%s@localhost", testUsername)
// Step 2-7: Use ghost client for all operations
var ghostClient *xmpp.Client
if config.InsecureSkipVerify {
tlsConfig := &tls.Config{InsecureSkipVerify: true} //nolint:gosec // Testing tool
ghostClient = xmpp.NewClientWithTLS(config.Server, ghostJID, ghostPassword, "ghost_doctor", "ghost-remote-id", tlsConfig, logger)
if config.Verbose {
log.Printf("Creating test user: %s", testUsername)
}
registrationRequest := &xmpp.RegistrationRequest{
Username: testUsername,
Password: testPassword,
Email: testEmail,
}
start = time.Now()
regResponse, err := inBandReg.RegisterAccount(serverJID, registrationRequest)
if err != nil {
return fmt.Errorf("failed to register test user '%s': %w", testUsername, err)
}
registerDuration := time.Since(start)
if !regResponse.Success {
return fmt.Errorf("user registration failed: %s", regResponse.Error)
}
if config.Verbose {
log.Printf("✅ Test user '%s' registered successfully in %v", testUsername, registerDuration)
log.Printf("Registration response: %s", regResponse.Message)
}
// Step 3: Delete test user (cleanup)
if config.Verbose {
log.Printf("Cleaning up: removing test user '%s'", testUsername)
}
start = time.Now()
cancelResponse, err := inBandReg.CancelRegistration(serverJID)
if err != nil {
if config.Verbose {
log.Printf("⚠️ Failed to remove test user '%s': %v", testUsername, err)
log.Printf("⚠️ Manual cleanup may be required")
}
} else {
ghostClient = xmpp.NewClient(config.Server, ghostJID, ghostPassword, "ghost_doctor", "ghost-remote-id", logger)
cancelDuration := time.Since(start)
if cancelResponse.Success {
if config.Verbose {
log.Printf("✅ Test user '%s' removed successfully in %v", testUsername, cancelDuration)
}
} else {
if config.Verbose {
log.Printf("⚠️ User removal may have failed: %s", cancelResponse.Error)
}
}
}
// Step 2: Connect ghost client
if err := ghostClient.Connect(); err != nil {
return fmt.Errorf("failed to connect ghost user: %w", err)
}
logger.LogInfo("Ghost user connected")
// Step 3: Check test room exists
exists, err := ghostClient.CheckRoomExists(config.TestRoom)
if err != nil {
return fmt.Errorf("failed to check room existence: %w", err)
}
if !exists {
return fmt.Errorf("test room %s does not exist", config.TestRoom)
if config.Verbose {
log.Printf("XEP-0077 test summary:")
log.Printf(" Server support check: ✅")
log.Printf(" Registration fields discovery time: %v", fieldsDuration)
log.Printf(" User registration time: %v", registerDuration)
log.Printf(" Test username: %s", testUsername)
log.Printf(" Required fields count: %d", len(fields.Required))
log.Printf(" User creation: ✅")
if err == nil && cancelResponse.Success {
log.Printf(" User cleanup: ✅")
} else {
log.Printf(" User cleanup: ⚠️")
}
}
// Step 4: Join test room
if err := ghostClient.JoinRoom(config.TestRoom); err != nil {
return fmt.Errorf("failed to join room: %w", err)
}
logger.LogInfo("Ghost user joined room", "room", config.TestRoom)
// Step 5: Send message to test room
testMessage := fmt.Sprintf("Test ghost user message from %s at %s", ghostUsername, time.Now().Format("15:04:05"))
messageReq := xmpp.MessageRequest{
RoomJID: config.TestRoom,
GhostUserJID: ghostJID,
Message: testMessage,
}
if _, err := ghostClient.SendMessage(&messageReq); err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
logger.LogInfo("Ghost user sent message", "message", testMessage)
// Step 6: Cancel account (ghost user cancels their own registration)
ghostInBandReg, err := ghostClient.GetInBandRegistration()
if err != nil {
return fmt.Errorf("failed to get XEP-0077 handler for ghost user: %w", err)
}
ghostCancellationRequest := &xmpp.CancellationRequest{Username: ghostUsername}
ghostCancelResponse, err := ghostInBandReg.CancelRegistration(serverJID, ghostCancellationRequest)
if err != nil {
return fmt.Errorf("failed to cancel ghost user registration: %w", err)
}
if !ghostCancelResponse.Success {
return fmt.Errorf("ghost user registration cancellation failed: %s", ghostCancelResponse.Error)
}
logger.LogInfo("Ghost user registration cancelled successfully")
// Clean disconnect
if err := ghostClient.Disconnect(); err != nil {
logger.LogWarn("Failed to disconnect ghost client", "error", err)
}
logger.LogInfo("XEP-0077 ghost user test completed successfully", "username", ghostUsername)
return nil
}

1
go.mod
View file

@ -5,7 +5,6 @@ go 1.24.3
require (
github.com/gorilla/mux v1.8.1
github.com/jellydator/ttlcache/v3 v3.4.0
github.com/lmittmann/tint v1.1.2
github.com/mattermost/mattermost/server/public v0.1.10
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.10.0

2
go.sum
View file

@ -425,8 +425,6 @@ github.com/leonklingele/grouper v1.1.2 h1:o1ARBDLOmmasUaNDesWqWCIFH3u7hoFlM84Yrj
github.com/leonklingele/grouper v1.1.2/go.mod h1:6D0M/HVkhs2yRKRFZUoGjeDy7EZTfFBE9gl4kjmIGkA=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lmittmann/tint v1.1.2 h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w=
github.com/lmittmann/tint v1.1.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/lufeee/execinquery v1.2.1 h1:hf0Ems4SHcUGBxpGN7Jz78z1ppVkP/837ZlETPCEtOM=
github.com/lufeee/execinquery v1.2.1/go.mod h1:EC7DrEKView09ocscGHC+apXMIaorh4xqSxS/dy8SbM=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=

View file

@ -17,6 +17,9 @@
},
"executable": ""
},
"webapp": {
"bundle_path": "webapp/dist/main.js"
},
"settings_schema": {
"header": "Configure the XMPP bridge connection settings below.",
"footer": "For more information about setting up the XMPP bridge, see the [documentation](https://github.com/mattermost/mattermost-plugin-bridge-xmpp/blob/main/README.md).",
@ -62,42 +65,12 @@
"secret": false
},
{
"key": "EnableXMPPGhostUsers",
"display_name": "Enable XMPP Ghost Users",
"type": "bool",
"help_text": "When enabled, individual XMPP accounts will be created for each Mattermost user using XEP-0077 In-Band Registration. If disabled or unsupported, the bridge user will be used for all communications.",
"placeholder": "",
"default": false,
"hosting": "",
"secret": false
},
{
"key": "XMPPGhostUserPrefix",
"display_name": "XMPP Ghost User Prefix",
"key": "XMPPUsernamePrefix",
"display_name": "XMPP Username Prefix",
"type": "text",
"help_text": "Prefix for ghost user accounts created on the XMPP server (e.g., 'mm_' creates users like 'mm_john@xmpp.example.com'). Required when ghost users are enabled.",
"placeholder": "mm_",
"default": null,
"hosting": "",
"secret": false
},
{
"key": "XMPPGhostUserDomain",
"display_name": "XMPP Ghost User Domain",
"type": "text",
"help_text": "Domain for ghost user accounts on the XMPP server. If not specified, uses the domain from the XMPP Server URL.",
"placeholder": "xmpp.example.com",
"default": null,
"hosting": "",
"secret": false
},
{
"key": "XMPPGhostUserCleanup",
"display_name": "Enable Ghost User Cleanup",
"type": "bool",
"help_text": "When enabled, ghost user accounts will be automatically removed from the XMPP server when Mattermost users are deleted. Disable to preserve accounts.",
"placeholder": "",
"default": true,
"help_text": "Prefix for XMPP users in Mattermost (e.g., 'xmpp' creates usernames like 'xmpp:user@domain')",
"placeholder": "xmpp",
"default": "xmpp",
"hosting": "",
"secret": false
},
@ -129,4 +102,4 @@
"version": "v0.1.4"
}
}
}
}

View file

@ -134,6 +134,20 @@ func (b *xmppBridge) createUserManager(cfg *config.Configuration, bridgeID strin
return NewXMPPUserManager(bridgeID, log, store, b.api, cfg, b.bridgeClient)
}
// waitForCapabilityDetection waits for server capability detection to complete
func (b *xmppBridge) waitForCapabilityDetection() error {
if b.bridgeClient == nil {
return fmt.Errorf("bridge client not available")
}
// Trigger capability detection synchronously
if err := b.bridgeClient.DetectServerCapabilities(); err != nil {
return fmt.Errorf("failed to detect server capabilities: %w", err)
}
return nil
}
// checkXEP0077Support checks if the XMPP server supports XEP-0077 In-Band Registration
func (b *xmppBridge) checkXEP0077Support() (bool, error) {
if b.bridgeClient == nil {
@ -223,14 +237,14 @@ func (b *xmppBridge) Start() error {
return fmt.Errorf("failed to connect to XMPP server: %w", err)
}
// Wait for server capability detection to complete before creating user manager
if err := b.waitForCapabilityDetection(); err != nil {
return fmt.Errorf("failed to detect server capabilities: %w", err)
}
// Initialize proper user manager now that we're connected and server capabilities are detected
b.userManager = b.createUserManager(cfg, b.bridgeID, b.logger, b.kvstore)
// Start the user manager to enable lifecycle management
if err := b.userManager.Start(b.ctx); err != nil {
return fmt.Errorf("failed to start user manager: %w", err)
}
// Load and join mapped channels
if err := b.loadAndJoinMappedChannels(); err != nil {
b.logger.LogWarn("Failed to join some mapped channels", "error", err)
@ -251,11 +265,6 @@ func (b *xmppBridge) Stop() error {
b.cancel()
}
// Stop the user manager to stop lifecycle management
if err := b.userManager.Stop(); err != nil {
b.logger.LogWarn("Error stopping user manager", "error", err)
}
if b.bridgeClient != nil {
if err := b.bridgeClient.Disconnect(); err != nil {
b.logger.LogWarn("Error disconnecting from XMPP server", "error", err)

View file

@ -123,12 +123,6 @@ func (h *xmppMessageHandler) sendMessageViaGhostUser(xmppUserManager *UserManage
return h.sendMessageViaBridgeUser(msg, roomJID)
}
// Update user activity in KV store after successful message send
if err := xmppUserManager.UpdateUserActivity(msg.SourceUserID); err != nil {
h.logger.LogError("Failed to update user activity after message send, user may never disconnect", "user_id", msg.SourceUserID, "error", err)
// Don't fail the message send for activity update failures
}
h.logger.LogDebug("Message sent via ghost user",
"source_user_id", msg.SourceUserID,
"ghost_jid", xmppUser.GetJID(),

View file

@ -14,14 +14,6 @@ import (
xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
)
const (
// ghostUserInactivityTimeout is the duration after which inactive ghost users are disconnected
ghostUserInactivityTimeout = 2 * time.Minute
// ghostUserActivityCheckInterval is how often we check for inactive ghost users
ghostUserActivityCheckInterval = 1 * time.Minute
)
// User represents an XMPP user that implements the BridgeUser interface
type User struct {
// User identity
@ -37,11 +29,6 @@ type User struct {
stateMu sync.RWMutex
connected atomic.Bool
// Activity tracking for lifecycle management
lastActivity time.Time
activityMu sync.RWMutex
enableLifecycleCheck bool // Whether this user should be subject to inactivity disconnection
// Configuration
config *config.Configuration
@ -55,11 +42,6 @@ type User struct {
// NewXMPPUser creates a new XMPP user with specific credentials
func NewXMPPUser(id, displayName, jid, password string, cfg *config.Configuration, log logger.Logger) *User {
return NewXMPPUserWithActivity(id, displayName, jid, password, cfg, log, time.Now(), false)
}
// NewXMPPUserWithActivity creates a new XMPP user with specific credentials, last activity time, and lifecycle setting
func NewXMPPUserWithActivity(id, displayName, jid, password string, cfg *config.Configuration, log logger.Logger, lastActivity time.Time, enableLifecycle bool) *User {
ctx, cancel := context.WithCancel(context.Background())
// Create TLS config based on certificate verification setting
@ -79,17 +61,15 @@ func NewXMPPUserWithActivity(id, displayName, jid, password string, cfg *config.
)
return &User{
id: id,
displayName: displayName,
jid: jid,
client: client,
state: model.UserStateOffline,
config: cfg,
ctx: ctx,
cancel: cancel,
logger: log,
lastActivity: lastActivity, // Use provided activity time
enableLifecycleCheck: enableLifecycle, // Use provided lifecycle setting
id: id,
displayName: displayName,
jid: jid,
client: client,
state: model.UserStateOffline,
config: cfg,
ctx: ctx,
cancel: cancel,
logger: log,
}
}
@ -184,9 +164,6 @@ func (u *User) SendMessageToChannel(channelID, message string) error {
u.logger.LogDebug("XMPP user sending message to channel", "user_id", u.id, "channel_id", channelID)
// Update activity timestamp for this user interaction
u.UpdateLastActivity()
// Ensure we're joined to the room before sending the message
if err := u.EnsureJoinedToRoom(channelID); err != nil {
return fmt.Errorf("failed to ensure joined to room before sending message: %w", err)
@ -391,41 +368,46 @@ func (u *User) GetClient() *xmppClient.Client {
return u.client
}
// Activity tracking methods
// UpdateCredentials updates the user's JID and password for ghost user mode
// This creates a new XMPP client with the updated credentials
func (u *User) UpdateCredentials(newJID, newPassword string) error {
u.logger.LogDebug("Updating XMPP user credentials", "user_id", u.id, "old_jid", u.jid, "new_jid", newJID)
// UpdateLastActivity updates the last activity timestamp for this user
func (u *User) UpdateLastActivity() {
u.activityMu.Lock()
defer u.activityMu.Unlock()
u.lastActivity = time.Now()
u.logger.LogDebug("Updated last activity for user", "user_id", u.id, "timestamp", u.lastActivity)
}
// Disconnect existing client if connected
wasConnected := u.IsConnected()
if wasConnected {
if err := u.Disconnect(); err != nil {
u.logger.LogWarn("Error disconnecting before credential update", "user_id", u.id, "error", err)
}
}
// GetLastActivity returns the last activity timestamp
func (u *User) GetLastActivity() time.Time {
u.activityMu.RLock()
defer u.activityMu.RUnlock()
return u.lastActivity
}
// Create TLS config based on certificate verification setting
tlsConfig := &tls.Config{
InsecureSkipVerify: u.config.XMPPInsecureSkipVerify, //nolint:gosec // Allow insecure TLS for testing environments
}
// IsInactive returns true if the user has been inactive longer than the specified duration
func (u *User) IsInactive(inactivityThreshold time.Duration) bool {
u.activityMu.RLock()
defer u.activityMu.RUnlock()
return time.Since(u.lastActivity) > inactivityThreshold
}
// Create new XMPP client with updated credentials
newClient := xmppClient.NewClientWithTLS(
u.config.XMPPServerURL,
newJID,
newPassword,
u.config.GetXMPPResource(),
u.id, // Use user ID as remote ID
tlsConfig,
u.logger,
)
// SetLifecycleManagement enables or disables lifecycle management for this user
func (u *User) SetLifecycleManagement(enabled bool) {
u.activityMu.Lock()
defer u.activityMu.Unlock()
u.enableLifecycleCheck = enabled
u.logger.LogDebug("Lifecycle management setting changed", "user_id", u.id, "enabled", enabled)
}
// Update user fields
u.jid = newJID
u.client = newClient
// IsLifecycleManaged returns true if this user is subject to lifecycle management
func (u *User) IsLifecycleManaged() bool {
u.activityMu.RLock()
defer u.activityMu.RUnlock()
return u.enableLifecycleCheck
// Reconnect if we were previously connected
if wasConnected {
if err := u.Connect(); err != nil {
return fmt.Errorf("failed to reconnect after credential update: %w", err)
}
}
u.logger.LogInfo("XMPP user credentials updated successfully", "user_id", u.id, "new_jid", newJID)
return nil
}

View file

@ -3,11 +3,9 @@ package xmpp
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/mattermost/mattermost/server/public/plugin"
"mellium.im/xmpp/jid"
@ -35,8 +33,6 @@ type GhostUserData struct {
GhostJID string `json:"ghost_jid"` // XMPP JID of the ghost user
GhostPassword string `json:"ghost_password"` // XMPP password for the ghost user
Created int64 `json:"created"` // Timestamp when ghost was created
LastActivity int64 `json:"last_activity"` // Unix timestamp of last activity
LifecycleEnabled bool `json:"lifecycle_enabled"` // Whether this user should be subject to inactivity checking
}
// UserManager manages XMPP users using XEP-0077 ghost users ONLY
@ -51,23 +47,11 @@ type UserManager struct {
bridgeClient *xmppClient.Client
ctx context.Context
cancel context.CancelFunc
// Node identification for HA environments
nodeID string // Unique identifier for this Mattermost node (from api.GetDiagnosticId)
// Connection caching to prevent connection leaks
activeUsers map[string]*User // Cache of connected users on THIS node
activeUsersMu sync.RWMutex // Protects activeUsers map
}
// NewXMPPUserManager creates a new XMPP-specific user manager for ghost users only
func NewXMPPUserManager(bridgeType string, log logger.Logger, store kvstore.KVStore, api plugin.API, cfg *config.Configuration, bridgeClient *xmppClient.Client) model.BridgeUserManager {
ctx, cancel := context.WithCancel(context.Background())
// Get unique node ID from Mattermost API for HA environments
nodeID := api.GetDiagnosticId()
log.LogDebug("Initializing XMPP user manager", "bridge_type", bridgeType, "node_id", nodeID[:8])
return &UserManager{
bridgeType: bridgeType,
logger: log,
@ -77,8 +61,6 @@ func NewXMPPUserManager(bridgeType string, log logger.Logger, store kvstore.KVSt
bridgeClient: bridgeClient,
ctx: ctx,
cancel: cancel,
nodeID: nodeID,
activeUsers: make(map[string]*User), // Initialize connection cache
}
}
@ -171,61 +153,34 @@ func (m *UserManager) CreateUser(user model.BridgeUser) error {
return nil
}
// GetUser retrieves a user by Mattermost user ID, checking cache first, then creating XMPPUser from ghost data
// GetUser retrieves a user by Mattermost user ID, creating XMPPUser from ghost data
func (m *UserManager) GetUser(mattermostUserID string) (model.BridgeUser, error) {
// First check the connection cache
if cachedUser, found := m.getCachedUser(mattermostUserID); found {
m.logger.LogDebug("Found user in connection cache", "user_id", mattermostUserID, "ghost_jid", cachedUser.GetJID())
return cachedUser, nil
}
// Check if ghost user data exists
ghostData, err := m.loadGhostUserData(mattermostUserID)
if err != nil {
return nil, fmt.Errorf("ghost user not found for Mattermost user %s: %w", mattermostUserID, err)
}
// Create XMPPUser directly with ghost credentials and activity data
// Create XMPPUser directly with ghost credentials
m.configMu.RLock()
cfg := m.config
m.configMu.RUnlock()
// Handle migration of existing users without activity data
lastActivity := time.Now()
if ghostData.LastActivity > 0 {
lastActivity = time.Unix(ghostData.LastActivity, 0)
} else {
// Update the KV store with the migration data
ghostData.LastActivity = lastActivity.Unix()
ghostData.LifecycleEnabled = true
_ = m.storeGhostUserData(mattermostUserID, ghostData) // Don't fail if storage update fails
}
user := m.createXMPPUserWithActivity(mattermostUserID, mattermostUserID, ghostData.GhostJID, ghostData.GhostPassword, cfg, m.logger, lastActivity, ghostData.LifecycleEnabled)
user := NewXMPPUser(mattermostUserID, mattermostUserID, ghostData.GhostJID, ghostData.GhostPassword, cfg, m.logger)
// Ensure the user is connected
if err := m.ensureUserConnected(user, mattermostUserID); err != nil {
return nil, fmt.Errorf("failed to ensure ghost user is connected: %w", err)
}
// Cache the connected user to prevent connection leaks
m.cacheUser(mattermostUserID, user)
return user, nil
}
// GetOrCreateUser retrieves a user by Mattermost user ID, creating a new ghost user if it doesn't exist
func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (model.BridgeUser, error) {
// First check the connection cache
if cachedUser, found := m.getCachedUser(mattermostUserID); found {
m.logger.LogDebug("Found user in connection cache", "user_id", mattermostUserID, "ghost_jid", cachedUser.GetJID())
return cachedUser, nil
}
// Try to get existing user from KV store
// Try to get existing user first
user, err := m.GetUser(mattermostUserID)
if err == nil {
// GetUser already cached the user, so just return it
return user, nil
}
@ -245,20 +200,15 @@ func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (mod
return nil, fmt.Errorf("failed to register ghost user: %w", err)
}
// Initialize activity data for new user
now := time.Now()
// Create XMPPUser instance with the correct ghost credentials
xmppUser := NewXMPPUser(mattermostUserID, displayName, ghostJID, ghostPassword, cfg, m.logger)
// Create XMPPUser instance with the correct ghost credentials and activity data
xmppUser := m.createXMPPUserWithActivity(mattermostUserID, displayName, ghostJID, ghostPassword, cfg, m.logger, now, true)
// Store ghost user data with activity tracking
// Store ghost user data
ghostData := &GhostUserData{
MattermostUserID: mattermostUserID,
GhostJID: ghostJID,
GhostPassword: ghostPassword,
Created: now.Unix(),
LastActivity: now.Unix(),
LifecycleEnabled: true,
Created: m.getCurrentTimestamp(),
}
if err := m.storeGhostUserData(mattermostUserID, ghostData); err != nil {
@ -271,9 +221,6 @@ func (m *UserManager) GetOrCreateUser(mattermostUserID, displayName string) (mod
return nil, fmt.Errorf("failed to connect newly created ghost user: %w", err)
}
// Cache the connected user to prevent connection leaks
m.cacheUser(mattermostUserID, xmppUser)
m.logger.LogInfo("Ghost user created and connected successfully", "mattermost_user_id", mattermostUserID, "ghost_jid", ghostJID)
return xmppUser, nil
}
@ -287,15 +234,6 @@ func (m *UserManager) DeleteUser(mattermostUserID string) error {
return fmt.Errorf("ghost user not found for Mattermost user %s", mattermostUserID)
}
// Disconnect and remove from cache if user is currently active
if cachedUser, found := m.getCachedUser(mattermostUserID); found {
if err := cachedUser.Disconnect(); err != nil {
m.logger.LogWarn("Failed to disconnect cached user during deletion", "mattermost_user_id", mattermostUserID, "error", err)
}
m.removeCachedUser(mattermostUserID)
m.logger.LogDebug("Disconnected and removed user from cache during deletion", "mattermost_user_id", mattermostUserID)
}
// Clean up ghost user account if cleanup is enabled
m.configMu.RLock()
shouldCleanup := m.config.IsGhostUserCleanupEnabled()
@ -363,16 +301,9 @@ func (m *UserManager) Start(ctx context.Context) error {
// Continue starting other users even if one fails
} else {
startedCount++
// Enable lifecycle management for ghost users
if xmppUser, ok := user.(*User); ok {
xmppUser.SetLifecycleManagement(true)
}
}
}
// Start the lifecycle management goroutine
go m.lifecycleManager()
m.logger.LogInfo("XMPP ghost user manager started", "bridge_type", m.bridgeType, "user_count", startedCount)
return nil
}
@ -381,43 +312,19 @@ func (m *UserManager) Start(ctx context.Context) error {
func (m *UserManager) Stop() error {
m.logger.LogDebug("Stopping XMPP ghost user manager", "bridge_type", m.bridgeType)
// Cancel context to stop background goroutines
if m.cancel != nil {
m.cancel()
}
// Gracefully shutdown all cached connections first (much faster than ListUsers)
cachedUsers := m.getCachedUsers()
disconnectedCount := 0
for _, user := range cachedUsers {
if err := user.Stop(); err != nil {
m.logger.LogWarn("Error stopping cached ghost user during manager shutdown", "bridge_type", m.bridgeType, "user_id", user.GetID(), "error", err)
} else {
disconnectedCount++
}
}
// Clear the entire cache
m.activeUsersMu.Lock()
m.activeUsers = make(map[string]*User)
m.activeUsersMu.Unlock()
// Also check for any users not in cache and stop them (fallback)
// Get all users from KV store and stop them
users := m.ListUsers()
fallbackStoppedCount := 0
for _, user := range users {
if err := user.Stop(); err != nil {
m.logger.LogWarn("Error stopping ghost user during manager shutdown", "bridge_type", m.bridgeType, "user_id", user.GetID(), "error", err)
} else {
fallbackStoppedCount++
}
}
m.logger.LogInfo("XMPP ghost user manager stopped",
"bridge_type", m.bridgeType,
"cached_users_stopped", disconnectedCount,
"fallback_users_stopped", fallbackStoppedCount)
m.logger.LogInfo("XMPP ghost user manager stopped", "bridge_type", m.bridgeType)
return nil
}
@ -469,30 +376,6 @@ func (m *UserManager) removeGhostUserData(mattermostUserID string) error {
return m.kvstore.Delete(key)
}
// UpdateUserActivity updates both the in-memory and persisted activity timestamp for a user
func (m *UserManager) UpdateUserActivity(mattermostUserID string) error {
// Load existing ghost user data
ghostData, err := m.loadGhostUserData(mattermostUserID)
if err != nil {
return fmt.Errorf("failed to load ghost user data for activity update: %w", err)
}
// Update the activity timestamp
now := time.Now()
ghostData.LastActivity = now.Unix()
// Store the updated data
if err := m.storeGhostUserData(mattermostUserID, ghostData); err != nil {
return fmt.Errorf("failed to persist activity update: %w", err)
}
m.logger.LogDebug("Updated user activity in KV store",
"user_id", mattermostUserID,
"timestamp", now)
return nil
}
func (m *UserManager) cleanupGhostUser(mattermostUserID string) error {
ghostData, err := m.loadGhostUserData(mattermostUserID)
if err != nil {
@ -513,10 +396,7 @@ func (m *UserManager) cleanupGhostUser(mattermostUserID string) error {
}
// Unregister the ghost user account via XEP-0077
cancellationRequest := &xmppClient.CancellationRequest{
Username: ghostJIDParsed.Localpart(), // Extract username from ghost JID
}
response, err := regHandler.CancelRegistration(ghostJIDParsed.Domain(), cancellationRequest)
response, err := regHandler.CancelRegistration(ghostJIDParsed.Domain())
if err != nil {
return fmt.Errorf("failed to cancel registration for ghost user %s: %w", ghostData.GhostJID, err)
}
@ -566,168 +446,6 @@ func generateSecurePassword() string {
return "temp_secure_password_123"
}
// lifecycleManager runs periodically to check for inactive ghost users and disconnect them
func (m *UserManager) lifecycleManager() {
m.logger.LogDebug("Starting lifecycle manager for ghost user cleanup")
ticker := time.NewTicker(ghostUserActivityCheckInterval)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
m.logger.LogDebug("Lifecycle manager stopped due to context cancellation")
return
case <-ticker.C:
m.checkAndDisconnectInactiveUsers()
}
}
}
// checkAndDisconnectInactiveUsers checks all cached users for inactivity and disconnects inactive ghost users
func (m *UserManager) checkAndDisconnectInactiveUsers() {
m.logger.LogDebug("Checking cached users for inactivity")
// Get all currently cached users (this is much more efficient than ListUsers)
cachedUsers := m.getCachedUsers()
inactiveCount := 0
disconnectedCount := 0
for _, xmppUser := range cachedUsers {
// Only check users that have lifecycle management enabled
if !xmppUser.IsLifecycleManaged() {
continue
}
// Check if user is connected and inactive
if xmppUser.IsConnected() && xmppUser.IsInactive(ghostUserInactivityTimeout) {
inactiveCount++
lastActivity := xmppUser.GetLastActivity()
inactiveDuration := time.Since(lastActivity)
m.logger.LogInfo("Disconnecting inactive ghost user",
"user_id", xmppUser.GetID(),
"jid", xmppUser.GetJID(),
"last_activity", lastActivity,
"inactive_duration", inactiveDuration)
// Gracefully disconnect the inactive user
if err := xmppUser.Disconnect(); err != nil {
m.logger.LogWarn("Failed to disconnect inactive ghost user",
"user_id", xmppUser.GetID(),
"jid", xmppUser.GetJID(),
"error", err)
} else {
disconnectedCount++
// Remove disconnected user from cache to free memory
m.removeCachedUser(xmppUser.GetID())
m.logger.LogDebug("Successfully disconnected and removed inactive ghost user from cache",
"user_id", xmppUser.GetID(),
"jid", xmppUser.GetJID())
}
}
}
if inactiveCount > 0 {
m.logger.LogInfo("Completed inactive user cleanup cycle",
"cached_users_checked", len(cachedUsers),
"inactive_users_found", inactiveCount,
"users_disconnected", disconnectedCount)
} else {
m.logger.LogDebug("No inactive users found during cleanup cycle", "cached_users_checked", len(cachedUsers))
}
}
// createXMPPUserWithActivity creates an XMPP user with node-specific resource and activity data
func (m *UserManager) createXMPPUserWithActivity(id, displayName, userJID, password string, cfg *config.Configuration, log logger.Logger, lastActivity time.Time, enableLifecycle bool) *User {
// Generate node-specific resource to prevent conflicts in HA environments
baseResource := cfg.GetXMPPResource()
nodeSpecificResource := fmt.Sprintf("%s-node-%s", baseResource, m.nodeID[:8])
m.logger.LogDebug("Creating XMPP user with node-specific resource",
"user_id", id,
"base_resource", baseResource,
"node_resource", nodeSpecificResource,
"node_id", m.nodeID[:8])
// Create TLS config based on certificate verification setting
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify, //nolint:gosec // Allow insecure TLS for testing environments
}
// Create XMPP client for this user with provided credentials and node-specific resource
client := xmppClient.NewClientWithTLS(
cfg.XMPPServerURL,
userJID,
password, // Use the provided password (ghost password)
nodeSpecificResource, // Use node-specific resource instead of base resource
id, // Use user ID as remote ID
tlsConfig,
log,
)
ctx, cancel := context.WithCancel(context.Background())
return &User{
id: id,
displayName: displayName,
jid: userJID,
client: client,
state: model.UserStateOffline,
config: cfg,
ctx: ctx,
cancel: cancel,
logger: log,
lastActivity: lastActivity, // Use provided activity time
enableLifecycleCheck: enableLifecycle, // Use provided lifecycle setting
}
}
// Cache management methods for connection caching
// getCachedUser retrieves a user from the connection cache
func (m *UserManager) getCachedUser(mattermostUserID string) (*User, bool) {
m.activeUsersMu.RLock()
defer m.activeUsersMu.RUnlock()
user, exists := m.activeUsers[mattermostUserID]
return user, exists
}
// cacheUser stores a user in the connection cache
func (m *UserManager) cacheUser(mattermostUserID string, user *User) {
m.activeUsersMu.Lock()
defer m.activeUsersMu.Unlock()
m.activeUsers[mattermostUserID] = user
m.logger.LogDebug("Cached user connection",
"user_id", mattermostUserID,
"ghost_jid", user.GetJID(),
"cache_size", len(m.activeUsers))
}
// removeCachedUser removes a user from the connection cache
func (m *UserManager) removeCachedUser(mattermostUserID string) {
m.activeUsersMu.Lock()
defer m.activeUsersMu.Unlock()
if user, exists := m.activeUsers[mattermostUserID]; exists {
delete(m.activeUsers, mattermostUserID)
m.logger.LogDebug("Removed user from cache",
"user_id", mattermostUserID,
"ghost_jid", user.GetJID(),
"cache_size", len(m.activeUsers))
}
}
// getCachedUsers returns all cached users (for lifecycle management)
func (m *UserManager) getCachedUsers() []*User {
m.activeUsersMu.RLock()
defer m.activeUsersMu.RUnlock()
users := make([]*User, 0, len(m.activeUsers))
for _, user := range m.activeUsers {
users = append(users, user)
}
return users
}
func (m *UserManager) getCurrentTimestamp() int64 {
// TODO: Use proper time source (time.Now().Unix())
return 0

View file

@ -182,6 +182,11 @@ func (c *Client) GetInBandRegistration() (*InBandRegistration, error) {
return c.XEPFeatures.InBandRegistration, nil
}
// DetectServerCapabilities discovers which XEPs are supported by the server (public method)
func (c *Client) DetectServerCapabilities() error {
return c.detectServerCapabilities()
}
// detectServerCapabilities discovers which XEPs are supported by the server
func (c *Client) detectServerCapabilities() error {
if c.session == nil {
@ -355,13 +360,6 @@ func (c *Client) Connect() error {
return fmt.Errorf("failed to start session serving")
}
c.logger.LogInfo("XMPP client connected successfully", "jid", c.jidAddr.String())
// Automatically detect server capabilities after successful connection
if err := c.detectServerCapabilities(); err != nil {
c.logger.LogError("Failed to detect server capabilities automatically", "error", err)
// Don't fail the connection for capability detection issues
}
return nil
case <-time.After(10 * time.Second):
return fmt.Errorf("timeout waiting for session to be ready")

View file

@ -2,7 +2,6 @@
package xmpp
import (
"bytes"
"context"
"encoding/xml"
"fmt"
@ -26,8 +25,8 @@ type InBandRegistration struct {
enabled bool
}
// InBandRegistrationQuery represents the <query xmlns='jabber:iq:register'> element
type InBandRegistrationQuery struct {
// RegistrationQuery represents the <query xmlns='jabber:iq:register'> element
type RegistrationQuery struct {
XMLName xml.Name `xml:"jabber:iq:register query"`
Instructions string `xml:"instructions,omitempty"`
Username string `xml:"username,omitempty"`
@ -66,13 +65,8 @@ type RegistrationRequest struct {
AdditionalFields map[string]string `json:"additional_fields,omitempty"`
}
// CancellationRequest represents a request to cancel/remove a user registration
type CancellationRequest struct {
Username string `json:"username"`
}
// InBandRegistrationResponse represents the result of any XEP-0077 In-Band Registration operation
type InBandRegistrationResponse struct {
// RegistrationResponse represents the result of a registration operation
type RegistrationResponse struct {
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Message string `json:"message,omitempty"`
@ -120,7 +114,7 @@ func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*Registra
To: serverJID,
}
query := InBandRegistrationQuery{}
query := RegistrationQuery{}
ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second)
defer cancel()
@ -144,7 +138,7 @@ func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*Registra
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query InBandRegistrationQuery `xml:"jabber:iq:register query"`
Query RegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
@ -168,13 +162,13 @@ func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*Registra
}
// RegisterAccount registers a new account with the server
func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *RegistrationRequest) (*InBandRegistrationResponse, error) {
func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *RegistrationRequest) (*RegistrationResponse, error) {
if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established")
}
if request.Username == "" || request.Password == "" {
return &InBandRegistrationResponse{
return &RegistrationResponse{
Success: false,
Error: "username and password are required",
}, nil
@ -186,7 +180,7 @@ func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *Registr
To: serverJID,
}
query := InBandRegistrationQuery{
query := RegistrationQuery{
Username: request.Username,
Password: request.Password,
Email: request.Email,
@ -213,66 +207,57 @@ func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *Registr
r.logger.LogInfo("Registering new account", "server", serverJID.String(), "username", request.Username)
// Create a buffer to encode the query payload
var queryBuf bytes.Buffer
encoder := xml.NewEncoder(&queryBuf)
if err := encoder.Encode(query); err != nil {
return &InBandRegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to encode registration query: %v", err),
}, nil
// Create response channels
responseChannel := make(chan *RegistrationResponse, 1)
// Store response handler temporarily
go func() {
// This is a simplified approach - in practice you'd want proper IQ response handling
response := &RegistrationResponse{
Success: true,
Message: "Account registered successfully",
}
responseChannel <- response
}()
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
}
encoder.Flush()
// Create TokenReader from the encoded query by using xml.NewDecoder
payloadReader := xml.NewDecoder(bytes.NewReader(queryBuf.Bytes()))
// Send the registration IQ and wait for response
response, err := r.client.session.SendIQElement(ctx, payloadReader, iq)
if err != nil {
return &InBandRegistrationResponse{
// Encode and send the registration IQ
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil {
return &RegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to send registration request: %v", err),
}, nil
}
// Try to unmarshal the response as an error IQ first
responseIQ, err := stanza.UnmarshalIQError(response, xml.StartElement{})
registrationResponse := &InBandRegistrationResponse{}
response.Close()
if err != nil {
// If we can't parse the response, treat it as a failure and log the parse error
registrationResponse.Success = false
registrationResponse.Error = "Failed to parse server response for registration request"
r.logger.LogWarn("Registration response could not be parsed, treating as failure",
"server", serverJID.String(),
"username", request.Username,
"parse_error", err.Error())
} else {
// Successfully unmarshaled - check IQ type
if responseIQ.Type == stanza.ErrorIQ {
registrationResponse.Success = false
registrationResponse.Error = "Server returned error for registration request"
r.logger.LogWarn("Registration failed with server error", "server", serverJID.String(), "username", request.Username, "iq_type", responseIQ.Type)
} else {
registrationResponse.Success = true
registrationResponse.Message = "Account registration completed successfully"
}
// Wait for response
select {
case response := <-responseChannel:
r.logger.LogInfo("Account registration completed", "server", serverJID.String(), "username", request.Username, "success", response.Success)
return response, nil
case <-ctx.Done():
return &RegistrationResponse{
Success: false,
Error: fmt.Sprintf("timeout registering account with %s", serverJID.String()),
}, nil
}
r.logger.LogInfo("Account registration completed", "server", serverJID.String(), "username", request.Username, "success", registrationResponse.Success)
return registrationResponse, nil
}
// ChangePassword changes the password for an existing account
func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPassword, newPassword string) (*InBandRegistrationResponse, error) {
func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPassword, newPassword string) (*RegistrationResponse, error) {
if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established")
}
if username == "" || oldPassword == "" || newPassword == "" {
return &InBandRegistrationResponse{
return &RegistrationResponse{
Success: false,
Error: "username, old password, and new password are required",
}, nil
@ -284,7 +269,7 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass
To: serverJID,
}
query := InBandRegistrationQuery{
query := RegistrationQuery{
Username: username,
Password: newPassword,
}
@ -297,7 +282,7 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query InBandRegistrationQuery `xml:"jabber:iq:register query"`
Query RegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
@ -305,14 +290,14 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass
// Send the password change IQ
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil {
return &InBandRegistrationResponse{
return &RegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to send password change request: %v", err),
}, nil
}
// In practice, you'd wait for the IQ response here
response := &InBandRegistrationResponse{
response := &RegistrationResponse{
Success: true,
Message: "Password changed successfully",
}
@ -321,83 +306,50 @@ func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPass
return response, nil
}
// CancelRegistration cancels/removes an existing registration for the specified user
func (r *InBandRegistration) CancelRegistration(serverJID jid.JID, request *CancellationRequest) (*InBandRegistrationResponse, error) {
// CancelRegistration cancels/removes an existing registration
func (r *InBandRegistration) CancelRegistration(serverJID jid.JID) (*RegistrationResponse, error) {
if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established")
}
if request.Username == "" {
return &InBandRegistrationResponse{
Success: false,
Error: "username is required",
}, nil
}
// Create cancellation IQ
iq := stanza.IQ{
Type: stanza.SetIQ,
To: serverJID,
}
query := InBandRegistrationQuery{
Username: request.Username, // Specify which user to remove
Remove: &struct{}{}, // Removal flag
query := RegistrationQuery{
Remove: &struct{}{}, // Empty struct indicates removal
}
ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second)
defer cancel()
r.logger.LogInfo("Cancelling registration", "server", serverJID.String(), "username", request.Username)
r.logger.LogInfo("Cancelling registration", "server", serverJID.String())
// Create a buffer to encode the query payload
var queryBuf bytes.Buffer
encoder := xml.NewEncoder(&queryBuf)
if err := encoder.Encode(query); err != nil {
return &InBandRegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to encode cancellation query: %v", err),
}, nil
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
}
encoder.Flush()
// Create TokenReader from the encoded query
payloadReader := xml.NewDecoder(bytes.NewReader(queryBuf.Bytes()))
// Send the cancellation IQ and wait for response
response, err := r.client.session.SendIQElement(ctx, payloadReader, iq)
if err != nil {
return &InBandRegistrationResponse{
// Send the cancellation IQ
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil {
return &RegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to send registration cancellation request: %v", err),
}, nil
}
// Try to unmarshal the response as an error IQ first
responseIQ, err := stanza.UnmarshalIQError(response, xml.StartElement{})
cancellationResponse := &InBandRegistrationResponse{}
response.Close()
if err != nil {
// If we can't parse the response, treat it as a failure and log the parse error
cancellationResponse.Success = false
cancellationResponse.Error = "Failed to parse server response for cancellation request"
r.logger.LogWarn("Cancellation response could not be parsed, treating as failure",
"server", serverJID.String(),
"username", request.Username,
"parse_error", err.Error())
} else {
// Successfully unmarshaled - check IQ type
if responseIQ.Type == stanza.ErrorIQ {
cancellationResponse.Success = false
cancellationResponse.Error = "Server returned error for cancellation request"
r.logger.LogWarn("Registration cancellation failed with server error", "server", serverJID.String(), "username", request.Username, "iq_type", responseIQ.Type)
} else {
cancellationResponse.Success = true
cancellationResponse.Message = "Registration cancelled successfully"
}
// In practice, you'd wait for the IQ response here
response := &RegistrationResponse{
Success: true,
Message: "Registration cancelled successfully",
}
r.logger.LogInfo("Registration cancellation completed", "server", serverJID.String(), "username", request.Username, "success", cancellationResponse.Success)
return cancellationResponse, nil
r.logger.LogInfo("Registration cancellation completed", "server", serverJID.String())
return response, nil
}

View file

@ -14,7 +14,43 @@ services:
- "7777:7777" # File transfer proxy
volumes:
- openfire_data:/var/lib/openfire
depends_on:
- postgres
networks:
- openfire-network
postgres:
image: postgres:15
container_name: openfire-postgres
restart: unless-stopped
environment:
- POSTGRES_DB=openfire
- POSTGRES_USER=openfire
- POSTGRES_PASSWORD=openfire123
volumes:
- postgres_data:/var/lib/postgresql/data
# ports:
# - "5432:5432" # Exposed for development access
networks:
- openfire-network
adminer:
image: adminer:latest
container_name: openfire-adminer
restart: unless-stopped
# ports:
# - "8080:8080" # Database management interface
depends_on:
- postgres
networks:
- openfire-network
volumes:
openfire_data:
driver: local
postgres_data:
driver: local
networks:
openfire-network:
driver: bridge