feat: implement TTL cache for message deduplication and remove debug logging

- Replace manual map-based deduplication with jellydator/ttlcache/v3
- Add automatic cache eviction with 30-second TTL to prevent memory bloat
- Implement proper cache lifecycle management (start/stop)
- Remove emoji debug logs from bridge system and XMPP client
- Clean up verbose logging while maintaining essential error handling
- Update bridge interface method names for consistency

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Felipe M 2025-08-05 12:19:44 +02:00
parent 7b56cb34c6
commit eb852662f7
No known key found for this signature in database
GPG key ID: 52E5D65FCF99808A
9 changed files with 163 additions and 105 deletions

3
go.mod
View file

@ -102,6 +102,7 @@ require (
github.com/hashicorp/yamux v0.1.2 // indirect github.com/hashicorp/yamux v0.1.2 // indirect
github.com/hexops/gotextdiff v1.0.3 // indirect github.com/hexops/gotextdiff v1.0.3 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.4.0 // indirect
github.com/jgautheron/goconst v1.7.1 // indirect github.com/jgautheron/goconst v1.7.1 // indirect
github.com/jingyugao/rowserrcheck v1.1.1 // indirect github.com/jingyugao/rowserrcheck v1.1.1 // indirect
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
@ -210,7 +211,7 @@ require (
golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect
golang.org/x/mod v0.22.0 // indirect golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.34.0 // indirect golang.org/x/net v0.34.0 // indirect
golang.org/x/sync v0.10.0 // indirect golang.org/x/sync v0.15.0 // indirect
golang.org/x/sys v0.29.0 // indirect golang.org/x/sys v0.29.0 // indirect
golang.org/x/term v0.28.0 // indirect golang.org/x/term v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect golang.org/x/text v0.21.0 // indirect

4
go.sum
View file

@ -366,6 +366,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.4.0 h1:YS4P125qQS0tNhtL6aeYkheEaB/m8HCqdMMP4mnWdTY=
github.com/jellydator/ttlcache/v3 v3.4.0/go.mod h1:Hw9EgjymziQD3yGsQdf1FqFdpp7YjFMd4Srg5EJlgD4=
github.com/jgautheron/goconst v1.7.1 h1:VpdAG7Ca7yvvJk5n8dMwQhfEZJh95kl/Hl9S1OI5Jkk= github.com/jgautheron/goconst v1.7.1 h1:VpdAG7Ca7yvvJk5n8dMwQhfEZJh95kl/Hl9S1OI5Jkk=
github.com/jgautheron/goconst v1.7.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= github.com/jgautheron/goconst v1.7.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4=
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
@ -867,6 +869,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View file

@ -14,15 +14,15 @@ import (
// BridgeManager manages multiple bridge instances // BridgeManager manages multiple bridge instances
type BridgeManager struct { type BridgeManager struct {
bridges map[string]model.Bridge bridges map[string]model.Bridge
mu sync.RWMutex mu sync.RWMutex
logger logger.Logger logger logger.Logger
api plugin.API api plugin.API
remoteID string remoteID string
messageBus model.MessageBus messageBus model.MessageBus
routingCtx context.Context routingCtx context.Context
routingCancel context.CancelFunc routingCancel context.CancelFunc
routingWg sync.WaitGroup routingWg sync.WaitGroup
} }
// NewBridgeManager creates a new bridge manager // NewBridgeManager creates a new bridge manager
@ -163,6 +163,20 @@ func (m *BridgeManager) ListBridges() []string {
return bridges return bridges
} }
// Start starts the bridge manager and message routing system
func (m *BridgeManager) Start() error {
m.logger.LogInfo("Starting bridge manager")
// Start the message routing system
if err := m.StartMessageRouting(); err != nil {
m.logger.LogError("Failed to start message routing", "error", err)
return fmt.Errorf("failed to start message routing: %w", err)
}
m.logger.LogInfo("Bridge manager started successfully")
return nil
}
// HasBridge checks if a bridge with the given name is registered // HasBridge checks if a bridge with the given name is registered
func (m *BridgeManager) HasBridge(name string) bool { func (m *BridgeManager) HasBridge(name string) bool {
m.mu.RLock() m.mu.RLock()
@ -187,6 +201,11 @@ func (m *BridgeManager) Shutdown() error {
m.logger.LogInfo("Shutting down bridge manager", "bridge_count", len(m.bridges)) m.logger.LogInfo("Shutting down bridge manager", "bridge_count", len(m.bridges))
// Stop message routing first
if err := m.StopMessageRouting(); err != nil {
m.logger.LogError("Failed to stop message routing during shutdown", "error", err)
}
var errors []error var errors []error
for name, bridge := range m.bridges { for name, bridge := range m.bridges {
if bridge.IsConnected() { if bridge.IsConnected() {
@ -260,7 +279,7 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque
} }
// NEW: Check if room already mapped to another channel // NEW: Check if room already mapped to another channel
existingChannelID, err := bridge.GetRoomMapping(req.BridgeRoomID) existingChannelID, err := bridge.GetChannelMapping(req.BridgeRoomID)
if err != nil { if err != nil {
m.logger.LogError("Failed to check room mapping", "bridge_room_id", req.BridgeRoomID, "error", err) m.logger.LogError("Failed to check room mapping", "bridge_room_id", req.BridgeRoomID, "error", err)
return fmt.Errorf("failed to check room mapping: %w", err) return fmt.Errorf("failed to check room mapping: %w", err)
@ -274,7 +293,7 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque
} }
// NEW: Check if room exists on target bridge // NEW: Check if room exists on target bridge
roomExists, err := bridge.RoomExists(req.BridgeRoomID) roomExists, err := bridge.ChannelMappingExists(req.BridgeRoomID)
if err != nil { if err != nil {
m.logger.LogError("Failed to check room existence", "bridge_room_id", req.BridgeRoomID, "error", err) m.logger.LogError("Failed to check room existence", "bridge_room_id", req.BridgeRoomID, "error", err)
return fmt.Errorf("failed to check room existence: %w", err) return fmt.Errorf("failed to check room existence: %w", err)
@ -422,16 +441,16 @@ func (m *BridgeManager) unshareChannel(channelID string) error {
// startBridgeMessageHandler starts message handling for a specific bridge // startBridgeMessageHandler starts message handling for a specific bridge
func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge model.Bridge) { func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge model.Bridge) {
m.logger.LogDebug("Starting message handler for bridge", "bridge", bridgeName) m.logger.LogDebug("Starting message handler for bridge", "bridge", bridgeName)
// Subscribe to message bus // Subscribe to message bus
messageChannel := m.messageBus.Subscribe(bridgeName) messageChannel := m.messageBus.Subscribe(bridgeName)
// Start message routing goroutine // Start message routing goroutine
m.routingWg.Add(1) m.routingWg.Add(1)
go func() { go func() {
defer m.routingWg.Done() defer m.routingWg.Done()
defer m.logger.LogDebug("Message handler stopped for bridge", "bridge", bridgeName) defer m.logger.LogDebug("Message handler stopped for bridge", "bridge", bridgeName)
for { for {
select { select {
case msg, ok := <-messageChannel: case msg, ok := <-messageChannel:
@ -439,27 +458,27 @@ func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge mode
m.logger.LogDebug("Message channel closed for bridge", "bridge", bridgeName) m.logger.LogDebug("Message channel closed for bridge", "bridge", bridgeName)
return return
} }
if err := m.handleBridgeMessage(bridgeName, bridge, msg); err != nil { if err := m.handleBridgeMessage(bridgeName, bridge, msg); err != nil {
m.logger.LogError("Failed to handle message for bridge", m.logger.LogError("Failed to handle message for bridge",
"bridge", bridgeName, "bridge", bridgeName,
"source_bridge", msg.SourceBridge, "source_bridge", msg.SourceBridge,
"error", err) "error", err)
} }
case <-m.routingCtx.Done(): case <-m.routingCtx.Done():
m.logger.LogDebug("Context cancelled, stopping message handler", "bridge", bridgeName) m.logger.LogDebug("Context cancelled, stopping message handler", "bridge", bridgeName)
return return
} }
} }
}() }()
// Listen to bridge's outgoing messages // Listen to bridge's outgoing messages
m.routingWg.Add(1) m.routingWg.Add(1)
go func() { go func() {
defer m.routingWg.Done() defer m.routingWg.Done()
defer m.logger.LogDebug("Bridge message listener stopped", "bridge", bridgeName) defer m.logger.LogDebug("Bridge message listener stopped", "bridge", bridgeName)
bridgeMessageChannel := bridge.GetMessageChannel() bridgeMessageChannel := bridge.GetMessageChannel()
for { for {
select { select {
@ -468,14 +487,14 @@ func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge mode
m.logger.LogDebug("Bridge message channel closed", "bridge", bridgeName) m.logger.LogDebug("Bridge message channel closed", "bridge", bridgeName)
return return
} }
if err := m.messageBus.Publish(msg); err != nil { if err := m.messageBus.Publish(msg); err != nil {
m.logger.LogError("Failed to publish message from bridge", m.logger.LogError("Failed to publish message from bridge",
"bridge", bridgeName, "bridge", bridgeName,
"direction", msg.Direction, "direction", msg.Direction,
"error", err) "error", err)
} }
case <-m.routingCtx.Done(): case <-m.routingCtx.Done():
m.logger.LogDebug("Context cancelled, stopping bridge listener", "bridge", bridgeName) m.logger.LogDebug("Context cancelled, stopping bridge listener", "bridge", bridgeName)
return return
@ -486,26 +505,26 @@ func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge mode
// handleBridgeMessage processes an incoming message for a specific bridge // handleBridgeMessage processes an incoming message for a specific bridge
func (m *BridgeManager) handleBridgeMessage(bridgeName string, bridge model.Bridge, msg *model.DirectionalMessage) error { func (m *BridgeManager) handleBridgeMessage(bridgeName string, bridge model.Bridge, msg *model.DirectionalMessage) error {
m.logger.LogDebug("Handling message for bridge", m.logger.LogDebug("Handling message for bridge",
"target_bridge", bridgeName, "target_bridge", bridgeName,
"source_bridge", msg.SourceBridge, "source_bridge", msg.SourceBridge,
"direction", msg.Direction, "direction", msg.Direction,
"channel_id", msg.SourceChannelID) "channel_id", msg.SourceChannelID)
// Get the bridge's message handler // Get the bridge's message handler
handler := bridge.GetMessageHandler() handler := bridge.GetMessageHandler()
if handler == nil { if handler == nil {
return fmt.Errorf("bridge %s does not have a message handler", bridgeName) return fmt.Errorf("bridge %s does not have a message handler", bridgeName)
} }
// Check if the handler can process this message // Check if the handler can process this message
if !handler.CanHandleMessage(msg.BridgeMessage) { if !handler.CanHandleMessage(msg.BridgeMessage) {
m.logger.LogDebug("Bridge cannot handle message", m.logger.LogDebug("Bridge cannot handle message",
"bridge", bridgeName, "bridge", bridgeName,
"message_type", msg.MessageType) "message_type", msg.MessageType)
return nil // Not an error, just skip return nil // Not an error, just skip
} }
// Process the message // Process the message
return handler.ProcessMessage(msg) return handler.ProcessMessage(msg)
} }
@ -519,15 +538,15 @@ func (m *BridgeManager) StartMessageRouting() error {
// StopMessageRouting stops the message bus and routing system // StopMessageRouting stops the message bus and routing system
func (m *BridgeManager) StopMessageRouting() error { func (m *BridgeManager) StopMessageRouting() error {
m.logger.LogInfo("Stopping message routing system") m.logger.LogInfo("Stopping message routing system")
// Cancel routing context // Cancel routing context
if m.routingCancel != nil { if m.routingCancel != nil {
m.routingCancel() m.routingCancel()
} }
// Wait for all routing goroutines to finish // Wait for all routing goroutines to finish
m.routingWg.Wait() m.routingWg.Wait()
// Stop the message bus // Stop the message bus
return m.messageBus.Stop() return m.messageBus.Stop()
} }

View file

@ -25,6 +25,7 @@ type mattermostBridge struct {
api plugin.API api plugin.API
kvstore kvstore.KVStore kvstore kvstore.KVStore
userManager pluginModel.BridgeUserManager userManager pluginModel.BridgeUserManager
botUserID string // Bot user ID for posting messages
// Message handling // Message handling
messageHandler *mattermostMessageHandler messageHandler *mattermostMessageHandler
@ -46,12 +47,13 @@ type mattermostBridge struct {
} }
// NewBridge creates a new Mattermost bridge // NewBridge creates a new Mattermost bridge
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge { func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, botUserID string) pluginModel.Bridge {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
b := &mattermostBridge{ b := &mattermostBridge{
logger: log, logger: log,
api: api, api: api,
kvstore: kvstore, kvstore: kvstore,
botUserID: botUserID,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
channelMappings: make(map[string]string), channelMappings: make(map[string]string),
@ -304,8 +306,8 @@ func (b *mattermostBridge) DeleteChannelMapping(channelID string) error {
return nil return nil
} }
// RoomExists checks if a Mattermost channel exists on the server // ChannelMappingExists checks if a Mattermost channel exists on the server
func (b *mattermostBridge) RoomExists(roomID string) (bool, error) { func (b *mattermostBridge) ChannelMappingExists(roomID string) (bool, error) {
if b.api == nil { if b.api == nil {
return false, fmt.Errorf("Mattermost API not initialized") return false, fmt.Errorf("Mattermost API not initialized")
} }
@ -354,6 +356,21 @@ func (b *mattermostBridge) GetRoomMapping(roomID string) (string, error) {
return channelID, nil return channelID, nil
} }
// GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge
func (b *mattermostBridge) GetChannelMappingForBridge(bridgeName, roomID string) (string, error) {
channelIDBytes, err := b.kvstore.Get(kvstore.BuildChannelMapKey(bridgeName, roomID))
if err != nil {
// No mapping found is not an error, just return empty string
b.logger.LogDebug("No channel mapping found for bridge room", "bridge_name", bridgeName, "room_id", roomID)
return "", nil
}
channelID := string(channelIDBytes)
b.logger.LogDebug("Found channel mapping for bridge room", "bridge_name", bridgeName, "room_id", roomID, "channel_id", channelID)
return channelID, nil
}
// GetUserManager returns the user manager for this bridge // GetUserManager returns the user manager for this bridge
func (b *mattermostBridge) GetUserManager() pluginModel.BridgeUserManager { func (b *mattermostBridge) GetUserManager() pluginModel.BridgeUserManager {
return b.userManager return b.userManager

View file

@ -62,8 +62,8 @@ func (h *mattermostMessageHandler) postMessageToMattermost(msg *pluginModel.Brid
return fmt.Errorf("Mattermost API not initialized") return fmt.Errorf("Mattermost API not initialized")
} }
// Get the Mattermost channel ID from the channel mapping // Get the Mattermost channel ID from the channel mapping using the source bridge name
channelID, err := h.bridge.GetRoomMapping(msg.SourceChannelID) channelID, err := h.bridge.GetChannelMappingForBridge(msg.SourceBridge, msg.SourceChannelID)
if err != nil { if err != nil {
return fmt.Errorf("failed to get channel mapping: %w", err) return fmt.Errorf("failed to get channel mapping: %w", err)
} }
@ -87,14 +87,15 @@ func (h *mattermostMessageHandler) postMessageToMattermost(msg *pluginModel.Brid
// Create the post // Create the post
post := &mmModel.Post{ post := &mmModel.Post{
ChannelId: channelID, ChannelId: channelID,
UserId: h.bridge.botUserID,
Message: content, Message: content,
Type: mmModel.PostTypeDefault, Type: mmModel.PostTypeDefault,
Props: map[string]interface{}{ Props: map[string]interface{}{
"from_bridge": msg.SourceBridge, "from_bridge": msg.SourceBridge,
"bridge_user_id": msg.SourceUserID, "bridge_user_id": msg.SourceUserID,
"bridge_user_name": msg.SourceUserName, "bridge_user_name": msg.SourceUserName,
"bridge_message_id": msg.MessageID, "bridge_message_id": msg.MessageID,
"bridge_timestamp": msg.Timestamp.Unix(), "bridge_timestamp": msg.Timestamp.Unix(),
}, },
} }
@ -204,4 +205,4 @@ func (r *mattermostUserResolver) GetDisplayName(externalUserID string) string {
} }
return user.Id[:8] // Show first 8 chars of ID as fallback return user.Id[:8] // Show first 8 chars of ID as fallback
} }

View file

@ -502,8 +502,8 @@ func (b *xmppBridge) DeleteChannelMapping(channelID string) error {
return nil return nil
} }
// RoomExists checks if an XMPP room exists on the remote service // ChannelMappingExists checks if an XMPP room exists on the remote service
func (b *xmppBridge) RoomExists(roomID string) (bool, error) { func (b *xmppBridge) ChannelMappingExists(roomID string) (bool, error) {
if !b.connected.Load() { if !b.connected.Load() {
return false, fmt.Errorf("not connected to XMPP server") return false, fmt.Errorf("not connected to XMPP server")
} }
@ -547,6 +547,22 @@ func (b *xmppBridge) GetRoomMapping(roomID string) (string, error) {
return channelID, nil return channelID, nil
} }
// GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge
func (b *xmppBridge) GetChannelMappingForBridge(bridgeName, roomID string) (string, error) {
// Look up the channel ID using the bridge name and room ID as the key
channelIDBytes, err := b.kvstore.Get(kvstore.BuildChannelMapKey(bridgeName, roomID))
if err != nil {
// No mapping found is not an error, just return empty string
b.logger.LogDebug("No channel mapping found for bridge room", "bridge_name", bridgeName, "room_id", roomID)
return "", nil
}
channelID := string(channelIDBytes)
b.logger.LogDebug("Found channel mapping for bridge room", "bridge_name", bridgeName, "room_id", roomID, "channel_id", channelID)
return channelID, nil
}
// GetUserManager returns the user manager for this bridge // GetUserManager returns the user manager for this bridge
func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager { func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager {
return b.userManager return b.userManager
@ -554,44 +570,30 @@ func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager {
// startMessageAggregation starts the message aggregation goroutine // startMessageAggregation starts the message aggregation goroutine
func (b *xmppBridge) startMessageAggregation() { func (b *xmppBridge) startMessageAggregation() {
b.logger.LogDebug("Starting XMPP message aggregation") clientChannel := b.bridgeClient.GetMessageChannel()
for { for {
select { select {
case <-b.ctx.Done(): case <-b.ctx.Done():
b.logger.LogDebug("Stopping XMPP message aggregation") b.logger.LogDebug("Stopping XMPP message aggregation")
return return
default: case msg, ok := <-clientChannel:
// Aggregate messages from bridge client if available if !ok {
if b.bridgeClient != nil { b.logger.LogDebug("Bridge client message channel closed")
clientChannel := b.bridgeClient.GetMessageChannel() return
select {
case msg, ok := <-clientChannel:
if !ok {
b.logger.LogDebug("Bridge client message channel closed")
continue
}
// Forward to our bridge's message channel
select {
case b.incomingMessages <- msg:
b.logger.LogDebug("Message forwarded from bridge client",
"source_channel", msg.SourceChannelID,
"user_id", msg.SourceUserID)
default:
b.logger.LogWarn("Bridge message channel full, dropping message",
"source_channel", msg.SourceChannelID,
"user_id", msg.SourceUserID)
}
case <-b.ctx.Done():
return
default:
// No messages available, continue with other potential sources
}
} }
// TODO: Add aggregation from user client channels when implemented // Forward to our bridge's message channel
// This is where we would aggregate from multiple XMPP user connections select {
case b.incomingMessages <- msg:
// Message forwarded successfully
case <-b.ctx.Done():
return
default:
b.logger.LogWarn("Bridge message channel full, dropping message",
"source_channel", msg.SourceChannelID,
"user_id", msg.SourceUserID)
}
} }
} }
} }

View file

@ -73,6 +73,9 @@ func (r DeleteChannelMappingRequest) Validate() error {
} }
type BridgeManager interface { type BridgeManager interface {
// Start starts the bridge manager and message routing system.
Start() error
// RegisterBridge registers a bridge with the given name. Returns an error if the name is empty, // RegisterBridge registers a bridge with the given name. Returns an error if the name is empty,
// the bridge is nil, or a bridge with the same name is already registered. // the bridge is nil, or a bridge with the same name is already registered.
RegisterBridge(name string, bridge Bridge) error RegisterBridge(name string, bridge Bridge) error
@ -138,11 +141,11 @@ type Bridge interface {
// DeleteChannelMapping removes a mapping between a Mattermost channel ID and a bridge room ID. // DeleteChannelMapping removes a mapping between a Mattermost channel ID and a bridge room ID.
DeleteChannelMapping(channelID string) error DeleteChannelMapping(channelID string) error
// RoomExists checks if a room/channel exists on the remote service. // ChannelMappingExists checks if a room/channel exists on the remote service.
RoomExists(roomID string) (bool, error) ChannelMappingExists(roomID string) (bool, error)
// GetRoomMapping retrieves the Mattermost channel ID for a given room ID (reverse lookup). // GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge.
GetRoomMapping(roomID string) (string, error) GetChannelMappingForBridge(bridgeName, roomID string) (string, error)
// IsConnected checks if the bridge is connected to the remote service. // IsConnected checks if the bridge is connected to the remote service.
IsConnected() bool IsConnected() bool

View file

@ -14,7 +14,6 @@ import (
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
"github.com/mattermost/mattermost/server/public/model" "github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin" "github.com/mattermost/mattermost/server/public/plugin"
"github.com/mattermost/mattermost/server/public/pluginapi" "github.com/mattermost/mattermost/server/public/pluginapi"
@ -35,9 +34,6 @@ type Plugin struct {
// commandClient is the client used to register and execute slash commands. // commandClient is the client used to register and execute slash commands.
commandClient command.Command commandClient command.Command
// xmppClient is the client used to communicate with XMPP servers.
xmppClient *xmpp.Client
// logger is the main plugin logger // logger is the main plugin logger
logger logger.Logger logger logger.Logger
@ -70,15 +66,11 @@ func (p *Plugin) OnActivate() error {
p.kvstore = kvstore.NewKVStore(p.client) p.kvstore = kvstore.NewKVStore(p.client)
p.initXMPPClient()
// Load configuration directly // Load configuration directly
cfg := p.getConfiguration() cfg := p.getConfiguration()
p.logger.LogDebug("Loaded configuration in OnActivate", "config", cfg)
// Register the plugin for shared channels // Register the plugin for shared channels
if err := p.registerForSharedChannels(); err != nil { if err := p.registerForSharedChannels(); err != nil {
p.logger.LogError("Failed to register for shared channels", "error", err)
return fmt.Errorf("failed to register for shared channels: %w", err) return fmt.Errorf("failed to register for shared channels: %w", err)
} }
@ -87,12 +79,16 @@ func (p *Plugin) OnActivate() error {
// Initialize and register bridges with current configuration // Initialize and register bridges with current configuration
if err := p.initBridges(*cfg); err != nil { if err := p.initBridges(*cfg); err != nil {
p.logger.LogError("Failed to initialize bridges", "error", err)
return fmt.Errorf("failed to initialize bridges: %w", err) return fmt.Errorf("failed to initialize bridges: %w", err)
} }
p.commandClient = command.NewCommandHandler(p.client, p.bridgeManager) p.commandClient = command.NewCommandHandler(p.client, p.bridgeManager)
// Start the bridge manager (this starts message routing)
if err := p.bridgeManager.Start(); err != nil {
return fmt.Errorf("failed to start bridge manager: %w", err)
}
// Start all bridges // Start all bridges
for _, bridgeName := range p.bridgeManager.ListBridges() { for _, bridgeName := range p.bridgeManager.ListBridges() {
if err := p.bridgeManager.StartBridge(bridgeName); err != nil { if err := p.bridgeManager.StartBridge(bridgeName); err != nil {
@ -145,18 +141,6 @@ func (p *Plugin) ExecuteCommand(c *plugin.Context, args *model.CommandArgs) (*mo
return response, nil return response, nil
} }
func (p *Plugin) initXMPPClient() {
cfg := p.getConfiguration()
p.xmppClient = xmpp.NewClient(
cfg.XMPPServerURL,
cfg.XMPPUsername,
cfg.XMPPPassword,
cfg.GetXMPPResource(),
p.remoteID,
p.logger,
)
}
func (p *Plugin) initBridges(cfg config.Configuration) error { func (p *Plugin) initBridges(cfg config.Configuration) error {
// Create and register XMPP bridge // Create and register XMPP bridge
xmppBridge := xmppbridge.NewBridge( xmppBridge := xmppbridge.NewBridge(
@ -176,6 +160,7 @@ func (p *Plugin) initBridges(cfg config.Configuration) error {
p.API, p.API,
p.kvstore, p.kvstore,
&cfg, &cfg,
p.botUserID,
) )
if err := p.bridgeManager.RegisterBridge("mattermost", mattermostBridge); err != nil { if err := p.bridgeManager.RegisterBridge("mattermost", mattermostBridge); err != nil {

View file

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/jellydator/ttlcache/v3"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
"mellium.im/sasl" "mellium.im/sasl"
@ -26,6 +27,9 @@ const (
// msgBufferSize is the buffer size for incoming message channels // msgBufferSize is the buffer size for incoming message channels
msgBufferSize = 1000 msgBufferSize = 1000
// messageDedupeTTL is the TTL for message deduplication cache
messageDedupeTTL = 30 * time.Second
) )
// Client represents an XMPP client for communicating with XMPP servers. // Client represents an XMPP client for communicating with XMPP servers.
@ -51,6 +55,9 @@ type Client struct {
// Message handling for bridge integration // Message handling for bridge integration
incomingMessages chan *model.DirectionalMessage incomingMessages chan *model.DirectionalMessage
// Message deduplication cache to handle XMPP server duplicates
dedupeCache *ttlcache.Cache[string, time.Time]
} }
// MessageRequest represents a request to send a message. // MessageRequest represents a request to send a message.
@ -99,6 +106,14 @@ type UserProfile struct {
func NewClient(serverURL, username, password, resource, remoteID string, logger logger.Logger) *Client { func NewClient(serverURL, username, password, resource, remoteID string, logger logger.Logger) *Client {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// Create TTL cache for message deduplication
dedupeCache := ttlcache.New(
ttlcache.WithTTL[string, time.Time](messageDedupeTTL),
)
// Start automatic cleanup in background
go dedupeCache.Start()
client := &Client{ client := &Client{
serverURL: serverURL, serverURL: serverURL,
username: username, username: username,
@ -110,6 +125,7 @@ func NewClient(serverURL, username, password, resource, remoteID string, logger
cancel: cancel, cancel: cancel,
sessionReady: make(chan struct{}), sessionReady: make(chan struct{}),
incomingMessages: make(chan *model.DirectionalMessage, msgBufferSize), incomingMessages: make(chan *model.DirectionalMessage, msgBufferSize),
dedupeCache: dedupeCache,
} }
// Create MUC client and set up message handling // Create MUC client and set up message handling
@ -139,6 +155,7 @@ func (c *Client) SetServerDomain(domain string) {
// Connect establishes connection to the XMPP server // Connect establishes connection to the XMPP server
func (c *Client) Connect() error { func (c *Client) Connect() error {
if c.session != nil { if c.session != nil {
return nil // Already connected return nil // Already connected
} }
@ -269,10 +286,11 @@ func (c *Client) Disconnect() error {
// Continue with cleanup even on timeout // Continue with cleanup even on timeout
} }
// Stop the TTL cache cleanup goroutine
c.dedupeCache.Stop()
// Cancel the client context // Cancel the client context
if c.cancel != nil { c.cancel()
c.cancel()
}
c.logger.LogInfo("XMPP client disconnected successfully") c.logger.LogInfo("XMPP client disconnected successfully")
return nil return nil
@ -653,6 +671,17 @@ func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenRead
return nil return nil
} }
// Deduplicate messages using message ID and TTL cache
if msg.ID != "" {
// Check if this message ID is already in the cache (indicates duplicate)
if c.dedupeCache.Has(msg.ID) {
return nil
}
// Record this message in the cache with TTL
c.dedupeCache.Set(msg.ID, time.Now(), ttlcache.DefaultTTL)
}
// Extract channel and user information from JIDs // Extract channel and user information from JIDs
channelID, err := c.extractChannelID(msg.From) channelID, err := c.extractChannelID(msg.From)
if err != nil { if err != nil {
@ -688,10 +717,7 @@ func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenRead
// Send to message channel (non-blocking) // Send to message channel (non-blocking)
select { select {
case c.incomingMessages <- directionalMsg: case c.incomingMessages <- directionalMsg:
c.logger.LogDebug("Message queued for processing", // Message queued successfully
"channel_id", channelID,
"user_id", userID,
"content_length", len(msgWithBody.Body))
default: default:
c.logger.LogWarn("Message channel full, dropping message", c.logger.LogWarn("Message channel full, dropping message",
"channel_id", channelID, "channel_id", channelID,