diff --git a/server/bridge/messagebus.go b/server/bridge/messagebus.go index 70f5488..69d2f4e 100644 --- a/server/bridge/messagebus.go +++ b/server/bridge/messagebus.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" @@ -32,6 +33,9 @@ type messageBus struct { wg sync.WaitGroup started bool startMu sync.Mutex + + // Graceful shutdown management + draining atomic.Bool } // NewMessageBus creates a new message bus instance @@ -70,6 +74,14 @@ func (mb *messageBus) Publish(msg *model.DirectionalMessage) error { return fmt.Errorf("bridge message cannot be nil") } + // Check if we're draining - if so, silently ignore new messages + if mb.draining.Load() { + mb.logger.LogDebug("Ignoring message during shutdown drainage", + "source_bridge", msg.SourceBridge, + "channel_id", msg.SourceChannelID) + return nil + } + select { case mb.incomingMessages <- msg: mb.logger.LogDebug("Message published to bus", @@ -116,12 +128,20 @@ func (mb *messageBus) Stop() error { return nil // Already stopped } - mb.logger.LogInfo("Stopping message bus") + pendingCount := len(mb.incomingMessages) + mb.logger.LogInfo("Stopping message bus", "pending_messages", pendingCount) + + // Set draining flag to prevent new messages + mb.draining.Store(true) // Cancel context to signal shutdown mb.cancel() - // Wait for routing goroutine to finish + // Close incoming messages channel to signal routing goroutine to finish + // The routing goroutine will process all remaining messages until channel is empty + close(mb.incomingMessages) + + // Wait for routing goroutine to finish processing all remaining messages mb.wg.Wait() // Close all subscriber channels @@ -133,11 +153,8 @@ func (mb *messageBus) Stop() error { mb.subscribers = make(map[string]chan *model.DirectionalMessage) mb.subscribersMu.Unlock() - // Close incoming messages channel - close(mb.incomingMessages) - mb.started = false - mb.logger.LogInfo("Message bus stopped successfully") + mb.logger.LogInfo("Message bus stopped successfully", "drained_messages", pendingCount) return nil }