chore: register bridge id
This commit is contained in:
parent
b1c6f21ea3
commit
245f5f96db
5 changed files with 52 additions and 33 deletions
|
@ -64,7 +64,7 @@ func (m *BridgeManager) RegisterBridge(name string, bridge model.Bridge) error {
|
|||
}
|
||||
|
||||
m.bridges[name] = bridge
|
||||
m.logger.LogInfo("Bridge registered", "name", name)
|
||||
m.logger.LogInfo("Bridge registered", "bridge_id", name)
|
||||
|
||||
// Subscribe bridge to message bus
|
||||
go m.startBridgeMessageHandler(name, bridge)
|
||||
|
@ -82,14 +82,14 @@ func (m *BridgeManager) StartBridge(name string) error {
|
|||
return fmt.Errorf("bridge '%s' is not registered", name)
|
||||
}
|
||||
|
||||
m.logger.LogInfo("Starting bridge", "name", name)
|
||||
m.logger.LogInfo("Starting bridge", "bridge_id", name)
|
||||
|
||||
if err := bridge.Start(); err != nil {
|
||||
m.logger.LogError("Failed to start bridge", "name", name, "error", err)
|
||||
m.logger.LogError("Failed to start bridge", "bridge_id", name, "error", err)
|
||||
return fmt.Errorf("failed to start bridge '%s': %w", name, err)
|
||||
}
|
||||
|
||||
m.logger.LogInfo("Bridge started successfully", "name", name)
|
||||
m.logger.LogInfo("Bridge started successfully", "bridge_id", name)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -103,14 +103,14 @@ func (m *BridgeManager) StopBridge(name string) error {
|
|||
return fmt.Errorf("bridge '%s' is not registered", name)
|
||||
}
|
||||
|
||||
m.logger.LogInfo("Stopping bridge", "name", name)
|
||||
m.logger.LogInfo("Stopping bridge", "bridge_id", name)
|
||||
|
||||
if err := bridge.Stop(); err != nil {
|
||||
m.logger.LogError("Failed to stop bridge", "name", name, "error", err)
|
||||
m.logger.LogError("Failed to stop bridge", "bridge_id", name, "error", err)
|
||||
return fmt.Errorf("failed to stop bridge '%s': %w", name, err)
|
||||
}
|
||||
|
||||
m.logger.LogInfo("Bridge stopped successfully", "name", name)
|
||||
m.logger.LogInfo("Bridge stopped successfully", "bridge_id", name)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -127,12 +127,12 @@ func (m *BridgeManager) UnregisterBridge(name string) error {
|
|||
// Stop the bridge before unregistering
|
||||
if bridge.IsConnected() {
|
||||
if err := bridge.Stop(); err != nil {
|
||||
m.logger.LogWarn("Failed to stop bridge during unregistration", "name", name, "error", err)
|
||||
m.logger.LogWarn("Failed to stop bridge during unregistration", "bridge_id", name, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
delete(m.bridges, name)
|
||||
m.logger.LogInfo("Bridge unregistered", "name", name)
|
||||
m.logger.LogInfo("Bridge unregistered", "bridge_id", name)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -211,7 +211,7 @@ func (m *BridgeManager) Shutdown() error {
|
|||
if bridge.IsConnected() {
|
||||
if err := bridge.Stop(); err != nil {
|
||||
errors = append(errors, fmt.Errorf("failed to stop bridge '%s': %w", name, err))
|
||||
m.logger.LogError("Failed to stop bridge during shutdown", "name", name, "error", err)
|
||||
m.logger.LogError("Failed to stop bridge during shutdown", "bridge_id", name, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -243,9 +243,9 @@ func (m *BridgeManager) OnPluginConfigurationChange(config *config.Configuration
|
|||
for name, bridge := range m.bridges {
|
||||
if err := bridge.UpdateConfiguration(config); err != nil {
|
||||
errors = append(errors, fmt.Errorf("failed to update configuration for bridge '%s': %w", name, err))
|
||||
m.logger.LogError("Failed to update bridge configuration", "name", name, "error", err)
|
||||
m.logger.LogError("Failed to update bridge configuration", "bridge_id", name, "error", err)
|
||||
} else {
|
||||
m.logger.LogDebug("Successfully updated bridge configuration", "name", name)
|
||||
m.logger.LogDebug("Successfully updated bridge configuration", "bridge_id", name)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -408,7 +408,7 @@ func (m *BridgeManager) shareChannel(req model.CreateChannelMappingRequest) erro
|
|||
SharePurpose: fmt.Sprintf("Shared channel bridged to %s", req.BridgeChannelID),
|
||||
ShareHeader: "test header",
|
||||
CreatorId: req.UserID,
|
||||
RemoteId: m.remoteID,
|
||||
RemoteId: "",
|
||||
}
|
||||
|
||||
// Share the channel
|
||||
|
@ -439,35 +439,35 @@ func (m *BridgeManager) unshareChannel(channelID string) error {
|
|||
}
|
||||
|
||||
// startBridgeMessageHandler starts message handling for a specific bridge
|
||||
func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge model.Bridge) {
|
||||
m.logger.LogDebug("Starting message handler for bridge", "bridge", bridgeName)
|
||||
func (m *BridgeManager) startBridgeMessageHandler(bridgeID string, bridge model.Bridge) {
|
||||
m.logger.LogDebug("Starting message handler for bridge", "bridge_id", bridgeID)
|
||||
|
||||
// Subscribe to message bus
|
||||
messageChannel := m.messageBus.Subscribe(bridgeName)
|
||||
messageChannel := m.messageBus.Subscribe(bridgeID)
|
||||
|
||||
// Start message routing goroutine
|
||||
m.routingWg.Add(1)
|
||||
go func() {
|
||||
defer m.routingWg.Done()
|
||||
defer m.logger.LogDebug("Message handler stopped for bridge", "bridge", bridgeName)
|
||||
defer m.logger.LogDebug("Message handler stopped for bridge", "bridge_id", bridgeID)
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-messageChannel:
|
||||
if !ok {
|
||||
m.logger.LogDebug("Message channel closed for bridge", "bridge", bridgeName)
|
||||
m.logger.LogDebug("Message channel closed for bridge", "bridge_id", bridgeID)
|
||||
return
|
||||
}
|
||||
|
||||
if err := m.handleBridgeMessage(bridgeName, bridge, msg); err != nil {
|
||||
if err := m.handleBridgeMessage(bridgeID, bridge, msg); err != nil {
|
||||
m.logger.LogError("Failed to handle message for bridge",
|
||||
"bridge", bridgeName,
|
||||
"bridge_id", bridgeID,
|
||||
"source_bridge", msg.SourceBridge,
|
||||
"error", err)
|
||||
}
|
||||
|
||||
case <-m.routingCtx.Done():
|
||||
m.logger.LogDebug("Context cancelled, stopping message handler", "bridge", bridgeName)
|
||||
m.logger.LogDebug("Context cancelled, stopping message handler", "bridge_id", bridgeID)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -477,26 +477,26 @@ func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge mode
|
|||
m.routingWg.Add(1)
|
||||
go func() {
|
||||
defer m.routingWg.Done()
|
||||
defer m.logger.LogDebug("Bridge message listener stopped", "bridge", bridgeName)
|
||||
defer m.logger.LogDebug("Bridge message listener stopped", "bridge_id", bridgeID)
|
||||
|
||||
bridgeMessageChannel := bridge.GetMessageChannel()
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-bridgeMessageChannel:
|
||||
if !ok {
|
||||
m.logger.LogDebug("Bridge message channel closed", "bridge", bridgeName)
|
||||
m.logger.LogDebug("Bridge message channel closed", "bridge_id", bridgeID)
|
||||
return
|
||||
}
|
||||
|
||||
if err := m.messageBus.Publish(msg); err != nil {
|
||||
m.logger.LogError("Failed to publish message from bridge",
|
||||
"bridge", bridgeName,
|
||||
"bridge_id", bridgeID,
|
||||
"direction", msg.Direction,
|
||||
"error", err)
|
||||
}
|
||||
|
||||
case <-m.routingCtx.Done():
|
||||
m.logger.LogDebug("Context cancelled, stopping bridge listener", "bridge", bridgeName)
|
||||
m.logger.LogDebug("Context cancelled, stopping bridge listener", "bridge_id", bridgeID)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -504,9 +504,9 @@ func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge mode
|
|||
}
|
||||
|
||||
// handleBridgeMessage processes an incoming message for a specific bridge
|
||||
func (m *BridgeManager) handleBridgeMessage(bridgeName string, bridge model.Bridge, msg *model.DirectionalMessage) error {
|
||||
func (m *BridgeManager) handleBridgeMessage(bridgeID string, bridge model.Bridge, msg *model.DirectionalMessage) error {
|
||||
m.logger.LogDebug("Handling message for bridge",
|
||||
"target_bridge", bridgeName,
|
||||
"target_bridge", bridgeID,
|
||||
"source_bridge", msg.SourceBridge,
|
||||
"direction", msg.Direction,
|
||||
"channel_id", msg.SourceChannelID)
|
||||
|
@ -514,13 +514,13 @@ func (m *BridgeManager) handleBridgeMessage(bridgeName string, bridge model.Brid
|
|||
// Get the bridge's message handler
|
||||
handler := bridge.GetMessageHandler()
|
||||
if handler == nil {
|
||||
return fmt.Errorf("bridge %s does not have a message handler", bridgeName)
|
||||
return fmt.Errorf("bridge %s does not have a message handler", bridgeID)
|
||||
}
|
||||
|
||||
// Check if the handler can process this message
|
||||
if !handler.CanHandleMessage(msg.BridgeMessage) {
|
||||
m.logger.LogDebug("Bridge cannot handle message",
|
||||
"bridge", bridgeName,
|
||||
"bridge_id", bridgeID,
|
||||
"message_type", msg.MessageType)
|
||||
return nil // Not an error, just skip
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ type mattermostBridge struct {
|
|||
kvstore kvstore.KVStore
|
||||
userManager pluginModel.BridgeUserManager
|
||||
botUserID string // Bot user ID for posting messages
|
||||
bridgeID string // Bridge identifier used for registration
|
||||
remoteID string // Remote ID for shared channels
|
||||
|
||||
// Message handling
|
||||
|
@ -48,19 +49,20 @@ type mattermostBridge struct {
|
|||
}
|
||||
|
||||
// NewBridge creates a new Mattermost bridge
|
||||
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, botUserID, remoteID string) pluginModel.Bridge {
|
||||
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, botUserID, bridgeID, remoteID string) pluginModel.Bridge {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
b := &mattermostBridge{
|
||||
logger: log,
|
||||
api: api,
|
||||
kvstore: kvstore,
|
||||
botUserID: botUserID,
|
||||
bridgeID: bridgeID,
|
||||
remoteID: remoteID,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
channelMappings: make(map[string]string),
|
||||
config: cfg,
|
||||
userManager: bridge.NewUserManager("mattermost", log),
|
||||
userManager: bridge.NewUserManager(bridgeID, log),
|
||||
incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize),
|
||||
}
|
||||
|
||||
|
@ -402,3 +404,8 @@ func (b *mattermostBridge) GetUserResolver() pluginModel.UserResolver {
|
|||
func (b *mattermostBridge) GetRemoteID() string {
|
||||
return b.remoteID
|
||||
}
|
||||
|
||||
// ID returns the bridge identifier used when registering the bridge
|
||||
func (b *mattermostBridge) ID() string {
|
||||
return b.bridgeID
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ type xmppBridge struct {
|
|||
kvstore kvstore.KVStore
|
||||
bridgeClient *xmppClient.Client // Main bridge XMPP client connection
|
||||
userManager pluginModel.BridgeUserManager
|
||||
bridgeID string // Bridge identifier used for registration
|
||||
remoteID string // Remote ID for shared channels
|
||||
|
||||
// Message handling
|
||||
|
@ -52,7 +53,7 @@ type xmppBridge struct {
|
|||
}
|
||||
|
||||
// NewBridge creates a new XMPP bridge
|
||||
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, remoteID string) pluginModel.Bridge {
|
||||
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, bridgeID, remoteID string) pluginModel.Bridge {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
b := &xmppBridge{
|
||||
logger: log,
|
||||
|
@ -62,8 +63,9 @@ func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *
|
|||
cancel: cancel,
|
||||
channelMappings: make(map[string]string),
|
||||
config: cfg,
|
||||
userManager: bridge.NewUserManager("xmpp", log),
|
||||
userManager: bridge.NewUserManager(bridgeID, log),
|
||||
incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize),
|
||||
bridgeID: bridgeID,
|
||||
remoteID: remoteID,
|
||||
}
|
||||
|
||||
|
@ -624,3 +626,8 @@ func (b *xmppBridge) GetUserResolver() pluginModel.UserResolver {
|
|||
func (b *xmppBridge) GetRemoteID() string {
|
||||
return b.remoteID
|
||||
}
|
||||
|
||||
// ID returns the bridge identifier used when registering the bridge
|
||||
func (b *xmppBridge) ID() string {
|
||||
return b.bridgeID
|
||||
}
|
||||
|
|
|
@ -167,6 +167,9 @@ type Bridge interface {
|
|||
|
||||
// GetRemoteID returns the remote ID used for shared channels registration
|
||||
GetRemoteID() string
|
||||
|
||||
// ID returns the bridge identifier used when registering the bridge
|
||||
ID() string
|
||||
}
|
||||
|
||||
// BridgeUser represents a user connected to any bridge service
|
||||
|
|
|
@ -148,6 +148,7 @@ func (p *Plugin) initBridges(cfg config.Configuration) error {
|
|||
p.API,
|
||||
p.kvstore,
|
||||
&cfg,
|
||||
"xmpp",
|
||||
p.remoteID,
|
||||
)
|
||||
|
||||
|
@ -163,6 +164,7 @@ func (p *Plugin) initBridges(cfg config.Configuration) error {
|
|||
&cfg,
|
||||
p.botUserID,
|
||||
"mattermost",
|
||||
"mattermost",
|
||||
)
|
||||
|
||||
if err := p.bridgeManager.RegisterBridge("mattermost", mattermostBridge); err != nil {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue