From 9d2dd5619b2ff3e6398e7a9e9c3253d43e11d64d Mon Sep 17 00:00:00 2001 From: Felipe Martin Date: Mon, 11 Aug 2025 13:28:47 +0200 Subject: [PATCH] feat: implement graceful message bus shutdown with drainage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add atomic.Bool draining flag to prevent new messages during shutdown - Modify Publish() to silently ignore messages when draining - Update Stop() method to set draining flag and naturally drain channel - Channel closes and routing goroutine processes all remaining messages - Zero message loss during normal plugin shutdown/restart - No complex drainage logic - leverages Go channel semantics 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- server/bridge/messagebus.go | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/server/bridge/messagebus.go b/server/bridge/messagebus.go index 70f5488..69d2f4e 100644 --- a/server/bridge/messagebus.go +++ b/server/bridge/messagebus.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" @@ -32,6 +33,9 @@ type messageBus struct { wg sync.WaitGroup started bool startMu sync.Mutex + + // Graceful shutdown management + draining atomic.Bool } // NewMessageBus creates a new message bus instance @@ -70,6 +74,14 @@ func (mb *messageBus) Publish(msg *model.DirectionalMessage) error { return fmt.Errorf("bridge message cannot be nil") } + // Check if we're draining - if so, silently ignore new messages + if mb.draining.Load() { + mb.logger.LogDebug("Ignoring message during shutdown drainage", + "source_bridge", msg.SourceBridge, + "channel_id", msg.SourceChannelID) + return nil + } + select { case mb.incomingMessages <- msg: mb.logger.LogDebug("Message published to bus", @@ -116,12 +128,20 @@ func (mb *messageBus) Stop() error { return nil // Already stopped } - mb.logger.LogInfo("Stopping message bus") + pendingCount := len(mb.incomingMessages) + mb.logger.LogInfo("Stopping message bus", "pending_messages", pendingCount) + + // Set draining flag to prevent new messages + mb.draining.Store(true) // Cancel context to signal shutdown mb.cancel() - // Wait for routing goroutine to finish + // Close incoming messages channel to signal routing goroutine to finish + // The routing goroutine will process all remaining messages until channel is empty + close(mb.incomingMessages) + + // Wait for routing goroutine to finish processing all remaining messages mb.wg.Wait() // Close all subscriber channels @@ -133,11 +153,8 @@ func (mb *messageBus) Stop() error { mb.subscribers = make(map[string]chan *model.DirectionalMessage) mb.subscribersMu.Unlock() - // Close incoming messages channel - close(mb.incomingMessages) - mb.started = false - mb.logger.LogInfo("Message bus stopped successfully") + mb.logger.LogInfo("Message bus stopped successfully", "drained_messages", pendingCount) return nil }