refactor: standardize bridge-agnostic terminology and remove unused kvstore functions
- Replace "Room" with "Channel" in bridge-agnostic contexts throughout codebase - Update BridgeRoomID → BridgeChannelID in model structs and all references - Change error messages to use consistent "Channel" terminology for user-facing text - Update log keys: bridge_room_id → bridge_channel_id for consistency - Clean up kvstore constants file by removing unused functions and constants: - Removed BuildXMPPUserKey, BuildMattermostUserKey, BuildGhostUserKey - Removed BuildXMPPEventPostKey, BuildXMPPReactionKey functions - Removed unused constants: KeyPrefixXMPPUser, KeyPrefixMattermostUser, etc. - Keep only actively used BuildChannelMapKey and ExtractIdentifierFromChannelMapKey - Preserve XMPP-specific "Room" terminology in appropriate contexts (client methods, JIDs) 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
eb852662f7
commit
4c6aeb2392
8 changed files with 94 additions and 150 deletions
|
@ -264,7 +264,7 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque
|
|||
return fmt.Errorf("invalid mapping request: %w", err)
|
||||
}
|
||||
|
||||
m.logger.LogDebug("Creating channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "user_id", req.UserID, "team_id", req.TeamID)
|
||||
m.logger.LogDebug("Creating channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "user_id", req.UserID, "team_id", req.TeamID)
|
||||
|
||||
// Get the specific bridge
|
||||
bridge, err := m.GetBridge(req.BridgeName)
|
||||
|
@ -279,41 +279,41 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque
|
|||
}
|
||||
|
||||
// NEW: Check if room already mapped to another channel
|
||||
existingChannelID, err := bridge.GetChannelMapping(req.BridgeRoomID)
|
||||
existingChannelID, err := bridge.GetChannelMapping(req.BridgeChannelID)
|
||||
if err != nil {
|
||||
m.logger.LogError("Failed to check room mapping", "bridge_room_id", req.BridgeRoomID, "error", err)
|
||||
return fmt.Errorf("failed to check room mapping: %w", err)
|
||||
m.logger.LogError("Failed to check channel mapping", "bridge_channel_id", req.BridgeChannelID, "error", err)
|
||||
return fmt.Errorf("failed to check channel mapping: %w", err)
|
||||
}
|
||||
if existingChannelID != "" {
|
||||
m.logger.LogWarn("Room already mapped to another channel",
|
||||
"bridge_room_id", req.BridgeRoomID,
|
||||
m.logger.LogWarn("Channel already mapped to another channel",
|
||||
"bridge_channel_id", req.BridgeChannelID,
|
||||
"existing_channel_id", existingChannelID,
|
||||
"requested_channel_id", req.ChannelID)
|
||||
return fmt.Errorf("room '%s' is already mapped to channel '%s'", req.BridgeRoomID, existingChannelID)
|
||||
return fmt.Errorf("channel '%s' is already mapped to channel '%s'", req.BridgeChannelID, existingChannelID)
|
||||
}
|
||||
|
||||
// NEW: Check if room exists on target bridge
|
||||
roomExists, err := bridge.ChannelMappingExists(req.BridgeRoomID)
|
||||
channelExists, err := bridge.ChannelMappingExists(req.BridgeChannelID)
|
||||
if err != nil {
|
||||
m.logger.LogError("Failed to check room existence", "bridge_room_id", req.BridgeRoomID, "error", err)
|
||||
return fmt.Errorf("failed to check room existence: %w", err)
|
||||
m.logger.LogError("Failed to check channel existence", "bridge_channel_id", req.BridgeChannelID, "error", err)
|
||||
return fmt.Errorf("failed to check channel existence: %w", err)
|
||||
}
|
||||
if !roomExists {
|
||||
m.logger.LogWarn("Room does not exist on bridge",
|
||||
"bridge_room_id", req.BridgeRoomID,
|
||||
if !channelExists {
|
||||
m.logger.LogWarn("Channel does not exist on bridge",
|
||||
"bridge_channel_id", req.BridgeChannelID,
|
||||
"bridge_name", req.BridgeName)
|
||||
return fmt.Errorf("room '%s' does not exist on %s bridge", req.BridgeRoomID, req.BridgeName)
|
||||
return fmt.Errorf("channel '%s' does not exist on %s bridge", req.BridgeChannelID, req.BridgeName)
|
||||
}
|
||||
|
||||
m.logger.LogDebug("Room validation passed",
|
||||
"bridge_room_id", req.BridgeRoomID,
|
||||
m.logger.LogDebug("Channel validation passed",
|
||||
"bridge_channel_id", req.BridgeChannelID,
|
||||
"bridge_name", req.BridgeName,
|
||||
"room_exists", roomExists,
|
||||
"channel_exists", channelExists,
|
||||
"already_mapped", false)
|
||||
|
||||
// Create the channel mapping on the receiving bridge
|
||||
if err = bridge.CreateChannelMapping(req.ChannelID, req.BridgeRoomID); err != nil {
|
||||
m.logger.LogError("Failed to create channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "error", err)
|
||||
if err = bridge.CreateChannelMapping(req.ChannelID, req.BridgeChannelID); err != nil {
|
||||
m.logger.LogError("Failed to create channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "error", err)
|
||||
return fmt.Errorf("failed to create channel mapping for bridge '%s': %w", req.BridgeName, err)
|
||||
}
|
||||
|
||||
|
@ -324,19 +324,19 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque
|
|||
}
|
||||
|
||||
// Create the channel mapping in the Mattermost bridge
|
||||
if err = mattermostBridge.CreateChannelMapping(req.ChannelID, req.BridgeRoomID); err != nil {
|
||||
m.logger.LogError("Failed to create channel mapping in Mattermost bridge", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "error", err)
|
||||
if err = mattermostBridge.CreateChannelMapping(req.ChannelID, req.BridgeChannelID); err != nil {
|
||||
m.logger.LogError("Failed to create channel mapping in Mattermost bridge", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "error", err)
|
||||
return fmt.Errorf("failed to create channel mapping in Mattermost bridge: %w", err)
|
||||
}
|
||||
|
||||
// Share the channel using Mattermost's shared channels API
|
||||
if err = m.shareChannel(req); err != nil {
|
||||
m.logger.LogError("Failed to share channel", "channel_id", req.ChannelID, "bridge_room_id", req.BridgeRoomID, "error", err)
|
||||
m.logger.LogError("Failed to share channel", "channel_id", req.ChannelID, "bridge_channel_id", req.BridgeChannelID, "error", err)
|
||||
// Don't fail the entire operation if sharing fails, but log the error
|
||||
m.logger.LogWarn("Channel mapping created but sharing failed - channel may not sync properly")
|
||||
}
|
||||
|
||||
m.logger.LogInfo("Successfully created channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID)
|
||||
m.logger.LogInfo("Successfully created channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -403,9 +403,9 @@ func (m *BridgeManager) shareChannel(req model.CreateChannelMappingRequest) erro
|
|||
TeamId: req.TeamID,
|
||||
Home: true,
|
||||
ReadOnly: false,
|
||||
ShareName: model.SanitizeShareName(fmt.Sprintf("bridge-%s", req.BridgeRoomID)),
|
||||
ShareDisplayName: fmt.Sprintf("Bridge: %s", req.BridgeRoomID),
|
||||
SharePurpose: fmt.Sprintf("Shared channel bridged to %s", req.BridgeRoomID),
|
||||
ShareName: model.SanitizeShareName(fmt.Sprintf("bridge-%s", req.BridgeChannelID)),
|
||||
ShareDisplayName: fmt.Sprintf("Bridge: %s", req.BridgeChannelID),
|
||||
SharePurpose: fmt.Sprintf("Shared channel bridged to %s", req.BridgeChannelID),
|
||||
ShareHeader: "test header",
|
||||
CreatorId: req.UserID,
|
||||
RemoteId: m.remoteID,
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
const (
|
||||
// DefaultMessageBufferSize is the default buffer size for message channels
|
||||
DefaultMessageBufferSize = 1000
|
||||
|
||||
|
||||
// MessageDeliveryTimeout is the maximum time to wait for message delivery
|
||||
MessageDeliveryTimeout = 5 * time.Second
|
||||
)
|
||||
|
@ -24,20 +24,20 @@ type messageBus struct {
|
|||
incomingMessages chan *model.DirectionalMessage
|
||||
subscribers map[string]chan *model.DirectionalMessage
|
||||
subscribersMu sync.RWMutex
|
||||
|
||||
|
||||
// Lifecycle management
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
logger logger.Logger
|
||||
wg sync.WaitGroup
|
||||
started bool
|
||||
startMu sync.Mutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
logger logger.Logger
|
||||
wg sync.WaitGroup
|
||||
started bool
|
||||
startMu sync.Mutex
|
||||
}
|
||||
|
||||
// NewMessageBus creates a new message bus instance
|
||||
func NewMessageBus(logger logger.Logger) model.MessageBus {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
|
||||
return &messageBus{
|
||||
incomingMessages: make(chan *model.DirectionalMessage, DefaultMessageBufferSize),
|
||||
subscribers: make(map[string]chan *model.DirectionalMessage),
|
||||
|
@ -51,11 +51,11 @@ func NewMessageBus(logger logger.Logger) model.MessageBus {
|
|||
func (mb *messageBus) Subscribe(bridgeName string) <-chan *model.DirectionalMessage {
|
||||
mb.subscribersMu.Lock()
|
||||
defer mb.subscribersMu.Unlock()
|
||||
|
||||
|
||||
// Create a buffered channel for this subscriber
|
||||
ch := make(chan *model.DirectionalMessage, DefaultMessageBufferSize)
|
||||
mb.subscribers[bridgeName] = ch
|
||||
|
||||
|
||||
mb.logger.LogDebug("Bridge subscribed to message bus", "bridge", bridgeName)
|
||||
return ch
|
||||
}
|
||||
|
@ -65,20 +65,20 @@ func (mb *messageBus) Publish(msg *model.DirectionalMessage) error {
|
|||
if msg == nil {
|
||||
return fmt.Errorf("message cannot be nil")
|
||||
}
|
||||
|
||||
|
||||
if msg.BridgeMessage == nil {
|
||||
return fmt.Errorf("bridge message cannot be nil")
|
||||
}
|
||||
|
||||
|
||||
select {
|
||||
case mb.incomingMessages <- msg:
|
||||
mb.logger.LogDebug("Message published to bus",
|
||||
mb.logger.LogDebug("Message published to bus",
|
||||
"source_bridge", msg.SourceBridge,
|
||||
"direction", msg.Direction,
|
||||
"channel_id", msg.SourceChannelID)
|
||||
return nil
|
||||
case <-time.After(MessageDeliveryTimeout):
|
||||
mb.logger.LogWarn("Message delivery timeout",
|
||||
mb.logger.LogWarn("Message delivery timeout",
|
||||
"source_bridge", msg.SourceBridge,
|
||||
"channel_id", msg.SourceChannelID)
|
||||
return fmt.Errorf("message delivery timeout")
|
||||
|
@ -91,17 +91,17 @@ func (mb *messageBus) Publish(msg *model.DirectionalMessage) error {
|
|||
func (mb *messageBus) Start() error {
|
||||
mb.startMu.Lock()
|
||||
defer mb.startMu.Unlock()
|
||||
|
||||
|
||||
if mb.started {
|
||||
return fmt.Errorf("message bus is already started")
|
||||
}
|
||||
|
||||
|
||||
mb.logger.LogInfo("Starting message bus")
|
||||
|
||||
|
||||
// Start the message routing goroutine
|
||||
mb.wg.Add(1)
|
||||
go mb.routeMessages()
|
||||
|
||||
|
||||
mb.started = true
|
||||
mb.logger.LogInfo("Message bus started successfully")
|
||||
return nil
|
||||
|
@ -111,19 +111,19 @@ func (mb *messageBus) Start() error {
|
|||
func (mb *messageBus) Stop() error {
|
||||
mb.startMu.Lock()
|
||||
defer mb.startMu.Unlock()
|
||||
|
||||
|
||||
if !mb.started {
|
||||
return nil // Already stopped
|
||||
}
|
||||
|
||||
|
||||
mb.logger.LogInfo("Stopping message bus")
|
||||
|
||||
|
||||
// Cancel context to signal shutdown
|
||||
mb.cancel()
|
||||
|
||||
|
||||
// Wait for routing goroutine to finish
|
||||
mb.wg.Wait()
|
||||
|
||||
|
||||
// Close all subscriber channels
|
||||
mb.subscribersMu.Lock()
|
||||
for bridgeName, ch := range mb.subscribers {
|
||||
|
@ -132,10 +132,10 @@ func (mb *messageBus) Stop() error {
|
|||
}
|
||||
mb.subscribers = make(map[string]chan *model.DirectionalMessage)
|
||||
mb.subscribersMu.Unlock()
|
||||
|
||||
|
||||
// Close incoming messages channel
|
||||
close(mb.incomingMessages)
|
||||
|
||||
|
||||
mb.started = false
|
||||
mb.logger.LogInfo("Message bus stopped successfully")
|
||||
return nil
|
||||
|
@ -144,9 +144,9 @@ func (mb *messageBus) Stop() error {
|
|||
// routeMessages handles the main message routing loop
|
||||
func (mb *messageBus) routeMessages() {
|
||||
defer mb.wg.Done()
|
||||
|
||||
|
||||
mb.logger.LogDebug("Message routing started")
|
||||
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-mb.incomingMessages:
|
||||
|
@ -154,14 +154,14 @@ func (mb *messageBus) routeMessages() {
|
|||
mb.logger.LogDebug("Incoming messages channel closed, stopping routing")
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
if err := mb.routeMessage(msg); err != nil {
|
||||
mb.logger.LogError("Failed to route message",
|
||||
mb.logger.LogError("Failed to route message",
|
||||
"source_bridge", msg.SourceBridge,
|
||||
"direction", msg.Direction,
|
||||
"error", err)
|
||||
}
|
||||
|
||||
|
||||
case <-mb.ctx.Done():
|
||||
mb.logger.LogDebug("Context cancelled, stopping message routing")
|
||||
return
|
||||
|
@ -173,9 +173,9 @@ func (mb *messageBus) routeMessages() {
|
|||
func (mb *messageBus) routeMessage(msg *model.DirectionalMessage) error {
|
||||
mb.subscribersMu.RLock()
|
||||
defer mb.subscribersMu.RUnlock()
|
||||
|
||||
|
||||
routedCount := 0
|
||||
|
||||
|
||||
// Route to specific target bridges if specified
|
||||
if len(msg.TargetBridges) > 0 {
|
||||
for _, targetBridge := range msg.TargetBridges {
|
||||
|
@ -184,7 +184,7 @@ func (mb *messageBus) routeMessage(msg *model.DirectionalMessage) error {
|
|||
routedCount++
|
||||
}
|
||||
} else {
|
||||
mb.logger.LogWarn("Target bridge not subscribed",
|
||||
mb.logger.LogWarn("Target bridge not subscribed",
|
||||
"target_bridge", targetBridge,
|
||||
"source_bridge", msg.SourceBridge)
|
||||
}
|
||||
|
@ -199,11 +199,11 @@ func (mb *messageBus) routeMessage(msg *model.DirectionalMessage) error {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
mb.logger.LogDebug("Message routed",
|
||||
|
||||
mb.logger.LogDebug("Message routed",
|
||||
"source_bridge", msg.SourceBridge,
|
||||
"routed_to_count", routedCount)
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -213,7 +213,7 @@ func (mb *messageBus) deliverMessage(ch chan *model.DirectionalMessage, msg *mod
|
|||
case ch <- msg:
|
||||
return true
|
||||
case <-time.After(MessageDeliveryTimeout):
|
||||
mb.logger.LogWarn("Message delivery timeout to bridge",
|
||||
mb.logger.LogWarn("Message delivery timeout to bridge",
|
||||
"target_bridge", targetBridge,
|
||||
"source_bridge", msg.SourceBridge)
|
||||
return false
|
||||
|
@ -226,19 +226,19 @@ func (mb *messageBus) deliverMessage(ch chan *model.DirectionalMessage, msg *mod
|
|||
func (mb *messageBus) GetStats() map[string]interface{} {
|
||||
mb.subscribersMu.RLock()
|
||||
defer mb.subscribersMu.RUnlock()
|
||||
|
||||
|
||||
stats := map[string]interface{}{
|
||||
"started": mb.started,
|
||||
"subscriber_count": len(mb.subscribers),
|
||||
"buffer_size": DefaultMessageBufferSize,
|
||||
"pending_messages": len(mb.incomingMessages),
|
||||
"started": mb.started,
|
||||
"subscriber_count": len(mb.subscribers),
|
||||
"buffer_size": DefaultMessageBufferSize,
|
||||
"pending_messages": len(mb.incomingMessages),
|
||||
}
|
||||
|
||||
|
||||
subscribers := make([]string, 0, len(mb.subscribers))
|
||||
for bridgeName := range mb.subscribers {
|
||||
subscribers = append(subscribers, bridgeName)
|
||||
}
|
||||
stats["subscribers"] = subscribers
|
||||
|
||||
|
||||
return stats
|
||||
}
|
||||
}
|
||||
|
|
|
@ -163,4 +163,4 @@ func (r *xmppUserResolver) GetDisplayName(externalUserID string) string {
|
|||
|
||||
// Fallback to the full ID
|
||||
return externalUserID
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue