Compare commits
7 commits
65038fb7a2
...
b1c6f21ea3
Author | SHA1 | Date | |
---|---|---|---|
b1c6f21ea3 | |||
d21dcd2dd1 | |||
5d81ca2154 | |||
4c6aeb2392 | |||
eb852662f7 | |||
7b56cb34c6 | |||
69a67704f4 |
16 changed files with 1708 additions and 234 deletions
3
go.mod
3
go.mod
|
@ -102,6 +102,7 @@ require (
|
||||||
github.com/hashicorp/yamux v0.1.2 // indirect
|
github.com/hashicorp/yamux v0.1.2 // indirect
|
||||||
github.com/hexops/gotextdiff v1.0.3 // indirect
|
github.com/hexops/gotextdiff v1.0.3 // indirect
|
||||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.4.0 // indirect
|
||||||
github.com/jgautheron/goconst v1.7.1 // indirect
|
github.com/jgautheron/goconst v1.7.1 // indirect
|
||||||
github.com/jingyugao/rowserrcheck v1.1.1 // indirect
|
github.com/jingyugao/rowserrcheck v1.1.1 // indirect
|
||||||
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
|
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
|
||||||
|
@ -210,7 +211,7 @@ require (
|
||||||
golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect
|
golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect
|
||||||
golang.org/x/mod v0.22.0 // indirect
|
golang.org/x/mod v0.22.0 // indirect
|
||||||
golang.org/x/net v0.34.0 // indirect
|
golang.org/x/net v0.34.0 // indirect
|
||||||
golang.org/x/sync v0.10.0 // indirect
|
golang.org/x/sync v0.15.0 // indirect
|
||||||
golang.org/x/sys v0.29.0 // indirect
|
golang.org/x/sys v0.29.0 // indirect
|
||||||
golang.org/x/term v0.28.0 // indirect
|
golang.org/x/term v0.28.0 // indirect
|
||||||
golang.org/x/text v0.21.0 // indirect
|
golang.org/x/text v0.21.0 // indirect
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -366,6 +366,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
|
||||||
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
|
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
|
||||||
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
|
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
|
||||||
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.4.0 h1:YS4P125qQS0tNhtL6aeYkheEaB/m8HCqdMMP4mnWdTY=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.4.0/go.mod h1:Hw9EgjymziQD3yGsQdf1FqFdpp7YjFMd4Srg5EJlgD4=
|
||||||
github.com/jgautheron/goconst v1.7.1 h1:VpdAG7Ca7yvvJk5n8dMwQhfEZJh95kl/Hl9S1OI5Jkk=
|
github.com/jgautheron/goconst v1.7.1 h1:VpdAG7Ca7yvvJk5n8dMwQhfEZJh95kl/Hl9S1OI5Jkk=
|
||||||
github.com/jgautheron/goconst v1.7.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4=
|
github.com/jgautheron/goconst v1.7.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4=
|
||||||
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
|
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
|
||||||
|
@ -867,6 +869,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
|
||||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||||
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||||
|
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
|
||||||
|
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
package bridge
|
package bridge
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
|
||||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
||||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||||
mmModel "github.com/mattermost/mattermost/server/public/model"
|
mmModel "github.com/mattermost/mattermost/server/public/model"
|
||||||
|
@ -12,11 +14,15 @@ import (
|
||||||
|
|
||||||
// BridgeManager manages multiple bridge instances
|
// BridgeManager manages multiple bridge instances
|
||||||
type BridgeManager struct {
|
type BridgeManager struct {
|
||||||
bridges map[string]model.Bridge
|
bridges map[string]model.Bridge
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
logger logger.Logger
|
logger logger.Logger
|
||||||
api plugin.API
|
api plugin.API
|
||||||
remoteID string
|
remoteID string
|
||||||
|
messageBus model.MessageBus
|
||||||
|
routingCtx context.Context
|
||||||
|
routingCancel context.CancelFunc
|
||||||
|
routingWg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBridgeManager creates a new bridge manager
|
// NewBridgeManager creates a new bridge manager
|
||||||
|
@ -28,11 +34,16 @@ func NewBridgeManager(logger logger.Logger, api plugin.API, remoteID string) mod
|
||||||
panic("plugin API cannot be nil")
|
panic("plugin API cannot be nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
return &BridgeManager{
|
return &BridgeManager{
|
||||||
bridges: make(map[string]model.Bridge),
|
bridges: make(map[string]model.Bridge),
|
||||||
logger: logger,
|
logger: logger,
|
||||||
api: api,
|
api: api,
|
||||||
remoteID: remoteID,
|
remoteID: remoteID,
|
||||||
|
messageBus: NewMessageBus(logger),
|
||||||
|
routingCtx: ctx,
|
||||||
|
routingCancel: cancel,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,6 +66,9 @@ func (m *BridgeManager) RegisterBridge(name string, bridge model.Bridge) error {
|
||||||
m.bridges[name] = bridge
|
m.bridges[name] = bridge
|
||||||
m.logger.LogInfo("Bridge registered", "name", name)
|
m.logger.LogInfo("Bridge registered", "name", name)
|
||||||
|
|
||||||
|
// Subscribe bridge to message bus
|
||||||
|
go m.startBridgeMessageHandler(name, bridge)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,6 +163,20 @@ func (m *BridgeManager) ListBridges() []string {
|
||||||
return bridges
|
return bridges
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start starts the bridge manager and message routing system
|
||||||
|
func (m *BridgeManager) Start() error {
|
||||||
|
m.logger.LogInfo("Starting bridge manager")
|
||||||
|
|
||||||
|
// Start the message routing system
|
||||||
|
if err := m.StartMessageRouting(); err != nil {
|
||||||
|
m.logger.LogError("Failed to start message routing", "error", err)
|
||||||
|
return fmt.Errorf("failed to start message routing: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.logger.LogInfo("Bridge manager started successfully")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// HasBridge checks if a bridge with the given name is registered
|
// HasBridge checks if a bridge with the given name is registered
|
||||||
func (m *BridgeManager) HasBridge(name string) bool {
|
func (m *BridgeManager) HasBridge(name string) bool {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
|
@ -173,6 +201,11 @@ func (m *BridgeManager) Shutdown() error {
|
||||||
|
|
||||||
m.logger.LogInfo("Shutting down bridge manager", "bridge_count", len(m.bridges))
|
m.logger.LogInfo("Shutting down bridge manager", "bridge_count", len(m.bridges))
|
||||||
|
|
||||||
|
// Stop message routing first
|
||||||
|
if err := m.StopMessageRouting(); err != nil {
|
||||||
|
m.logger.LogError("Failed to stop message routing during shutdown", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
var errors []error
|
var errors []error
|
||||||
for name, bridge := range m.bridges {
|
for name, bridge := range m.bridges {
|
||||||
if bridge.IsConnected() {
|
if bridge.IsConnected() {
|
||||||
|
@ -196,7 +229,7 @@ func (m *BridgeManager) Shutdown() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnPluginConfigurationChange propagates configuration changes to all registered bridges
|
// OnPluginConfigurationChange propagates configuration changes to all registered bridges
|
||||||
func (m *BridgeManager) OnPluginConfigurationChange(config any) error {
|
func (m *BridgeManager) OnPluginConfigurationChange(config *config.Configuration) error {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
defer m.mu.RUnlock()
|
defer m.mu.RUnlock()
|
||||||
|
|
||||||
|
@ -231,7 +264,7 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque
|
||||||
return fmt.Errorf("invalid mapping request: %w", err)
|
return fmt.Errorf("invalid mapping request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.logger.LogDebug("Creating channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "user_id", req.UserID, "team_id", req.TeamID)
|
m.logger.LogDebug("Creating channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "user_id", req.UserID, "team_id", req.TeamID)
|
||||||
|
|
||||||
// Get the specific bridge
|
// Get the specific bridge
|
||||||
bridge, err := m.GetBridge(req.BridgeName)
|
bridge, err := m.GetBridge(req.BridgeName)
|
||||||
|
@ -246,41 +279,41 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque
|
||||||
}
|
}
|
||||||
|
|
||||||
// NEW: Check if room already mapped to another channel
|
// NEW: Check if room already mapped to another channel
|
||||||
existingChannelID, err := bridge.GetRoomMapping(req.BridgeRoomID)
|
existingChannelID, err := bridge.GetChannelMapping(req.BridgeChannelID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.logger.LogError("Failed to check room mapping", "bridge_room_id", req.BridgeRoomID, "error", err)
|
m.logger.LogError("Failed to check channel mapping", "bridge_channel_id", req.BridgeChannelID, "error", err)
|
||||||
return fmt.Errorf("failed to check room mapping: %w", err)
|
return fmt.Errorf("failed to check channel mapping: %w", err)
|
||||||
}
|
}
|
||||||
if existingChannelID != "" {
|
if existingChannelID != "" {
|
||||||
m.logger.LogWarn("Room already mapped to another channel",
|
m.logger.LogWarn("Channel already mapped to another channel",
|
||||||
"bridge_room_id", req.BridgeRoomID,
|
"bridge_channel_id", req.BridgeChannelID,
|
||||||
"existing_channel_id", existingChannelID,
|
"existing_channel_id", existingChannelID,
|
||||||
"requested_channel_id", req.ChannelID)
|
"requested_channel_id", req.ChannelID)
|
||||||
return fmt.Errorf("room '%s' is already mapped to channel '%s'", req.BridgeRoomID, existingChannelID)
|
return fmt.Errorf("channel '%s' is already mapped to channel '%s'", req.BridgeChannelID, existingChannelID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NEW: Check if room exists on target bridge
|
// NEW: Check if room exists on target bridge
|
||||||
roomExists, err := bridge.RoomExists(req.BridgeRoomID)
|
channelExists, err := bridge.ChannelMappingExists(req.BridgeChannelID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.logger.LogError("Failed to check room existence", "bridge_room_id", req.BridgeRoomID, "error", err)
|
m.logger.LogError("Failed to check channel existence", "bridge_channel_id", req.BridgeChannelID, "error", err)
|
||||||
return fmt.Errorf("failed to check room existence: %w", err)
|
return fmt.Errorf("failed to check channel existence: %w", err)
|
||||||
}
|
}
|
||||||
if !roomExists {
|
if !channelExists {
|
||||||
m.logger.LogWarn("Room does not exist on bridge",
|
m.logger.LogWarn("Channel does not exist on bridge",
|
||||||
"bridge_room_id", req.BridgeRoomID,
|
"bridge_channel_id", req.BridgeChannelID,
|
||||||
"bridge_name", req.BridgeName)
|
"bridge_name", req.BridgeName)
|
||||||
return fmt.Errorf("room '%s' does not exist on %s bridge", req.BridgeRoomID, req.BridgeName)
|
return fmt.Errorf("channel '%s' does not exist on %s bridge", req.BridgeChannelID, req.BridgeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.logger.LogDebug("Room validation passed",
|
m.logger.LogDebug("Channel validation passed",
|
||||||
"bridge_room_id", req.BridgeRoomID,
|
"bridge_channel_id", req.BridgeChannelID,
|
||||||
"bridge_name", req.BridgeName,
|
"bridge_name", req.BridgeName,
|
||||||
"room_exists", roomExists,
|
"channel_exists", channelExists,
|
||||||
"already_mapped", false)
|
"already_mapped", false)
|
||||||
|
|
||||||
// Create the channel mapping on the receiving bridge
|
// Create the channel mapping on the receiving bridge
|
||||||
if err = bridge.CreateChannelMapping(req.ChannelID, req.BridgeRoomID); err != nil {
|
if err = bridge.CreateChannelMapping(req.ChannelID, req.BridgeChannelID); err != nil {
|
||||||
m.logger.LogError("Failed to create channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "error", err)
|
m.logger.LogError("Failed to create channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "error", err)
|
||||||
return fmt.Errorf("failed to create channel mapping for bridge '%s': %w", req.BridgeName, err)
|
return fmt.Errorf("failed to create channel mapping for bridge '%s': %w", req.BridgeName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,19 +324,19 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the channel mapping in the Mattermost bridge
|
// Create the channel mapping in the Mattermost bridge
|
||||||
if err = mattermostBridge.CreateChannelMapping(req.ChannelID, req.BridgeRoomID); err != nil {
|
if err = mattermostBridge.CreateChannelMapping(req.ChannelID, req.BridgeChannelID); err != nil {
|
||||||
m.logger.LogError("Failed to create channel mapping in Mattermost bridge", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID, "error", err)
|
m.logger.LogError("Failed to create channel mapping in Mattermost bridge", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID, "error", err)
|
||||||
return fmt.Errorf("failed to create channel mapping in Mattermost bridge: %w", err)
|
return fmt.Errorf("failed to create channel mapping in Mattermost bridge: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Share the channel using Mattermost's shared channels API
|
// Share the channel using Mattermost's shared channels API
|
||||||
if err = m.shareChannel(req); err != nil {
|
if err = m.shareChannel(req); err != nil {
|
||||||
m.logger.LogError("Failed to share channel", "channel_id", req.ChannelID, "bridge_room_id", req.BridgeRoomID, "error", err)
|
m.logger.LogError("Failed to share channel", "channel_id", req.ChannelID, "bridge_channel_id", req.BridgeChannelID, "error", err)
|
||||||
// Don't fail the entire operation if sharing fails, but log the error
|
// Don't fail the entire operation if sharing fails, but log the error
|
||||||
m.logger.LogWarn("Channel mapping created but sharing failed - channel may not sync properly")
|
m.logger.LogWarn("Channel mapping created but sharing failed - channel may not sync properly")
|
||||||
}
|
}
|
||||||
|
|
||||||
m.logger.LogInfo("Successfully created channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_room_id", req.BridgeRoomID)
|
m.logger.LogInfo("Successfully created channel mapping", "channel_id", req.ChannelID, "bridge_name", req.BridgeName, "bridge_channel_id", req.BridgeChannelID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -370,9 +403,9 @@ func (m *BridgeManager) shareChannel(req model.CreateChannelMappingRequest) erro
|
||||||
TeamId: req.TeamID,
|
TeamId: req.TeamID,
|
||||||
Home: true,
|
Home: true,
|
||||||
ReadOnly: false,
|
ReadOnly: false,
|
||||||
ShareName: model.SanitizeShareName(fmt.Sprintf("bridge-%s", req.BridgeRoomID)),
|
ShareName: model.SanitizeShareName(fmt.Sprintf("bridge-%s", req.BridgeChannelID)),
|
||||||
ShareDisplayName: fmt.Sprintf("Bridge: %s", req.BridgeRoomID),
|
ShareDisplayName: fmt.Sprintf("Bridge: %s", req.BridgeChannelID),
|
||||||
SharePurpose: fmt.Sprintf("Shared channel bridged to %s", req.BridgeRoomID),
|
SharePurpose: fmt.Sprintf("Shared channel bridged to %s", req.BridgeChannelID),
|
||||||
ShareHeader: "test header",
|
ShareHeader: "test header",
|
||||||
CreatorId: req.UserID,
|
CreatorId: req.UserID,
|
||||||
RemoteId: m.remoteID,
|
RemoteId: m.remoteID,
|
||||||
|
@ -404,3 +437,127 @@ func (m *BridgeManager) unshareChannel(channelID string) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// startBridgeMessageHandler starts message handling for a specific bridge
|
||||||
|
func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge model.Bridge) {
|
||||||
|
m.logger.LogDebug("Starting message handler for bridge", "bridge", bridgeName)
|
||||||
|
|
||||||
|
// Subscribe to message bus
|
||||||
|
messageChannel := m.messageBus.Subscribe(bridgeName)
|
||||||
|
|
||||||
|
// Start message routing goroutine
|
||||||
|
m.routingWg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer m.routingWg.Done()
|
||||||
|
defer m.logger.LogDebug("Message handler stopped for bridge", "bridge", bridgeName)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg, ok := <-messageChannel:
|
||||||
|
if !ok {
|
||||||
|
m.logger.LogDebug("Message channel closed for bridge", "bridge", bridgeName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := m.handleBridgeMessage(bridgeName, bridge, msg); err != nil {
|
||||||
|
m.logger.LogError("Failed to handle message for bridge",
|
||||||
|
"bridge", bridgeName,
|
||||||
|
"source_bridge", msg.SourceBridge,
|
||||||
|
"error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-m.routingCtx.Done():
|
||||||
|
m.logger.LogDebug("Context cancelled, stopping message handler", "bridge", bridgeName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Listen to bridge's outgoing messages
|
||||||
|
m.routingWg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer m.routingWg.Done()
|
||||||
|
defer m.logger.LogDebug("Bridge message listener stopped", "bridge", bridgeName)
|
||||||
|
|
||||||
|
bridgeMessageChannel := bridge.GetMessageChannel()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg, ok := <-bridgeMessageChannel:
|
||||||
|
if !ok {
|
||||||
|
m.logger.LogDebug("Bridge message channel closed", "bridge", bridgeName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := m.messageBus.Publish(msg); err != nil {
|
||||||
|
m.logger.LogError("Failed to publish message from bridge",
|
||||||
|
"bridge", bridgeName,
|
||||||
|
"direction", msg.Direction,
|
||||||
|
"error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-m.routingCtx.Done():
|
||||||
|
m.logger.LogDebug("Context cancelled, stopping bridge listener", "bridge", bridgeName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleBridgeMessage processes an incoming message for a specific bridge
|
||||||
|
func (m *BridgeManager) handleBridgeMessage(bridgeName string, bridge model.Bridge, msg *model.DirectionalMessage) error {
|
||||||
|
m.logger.LogDebug("Handling message for bridge",
|
||||||
|
"target_bridge", bridgeName,
|
||||||
|
"source_bridge", msg.SourceBridge,
|
||||||
|
"direction", msg.Direction,
|
||||||
|
"channel_id", msg.SourceChannelID)
|
||||||
|
|
||||||
|
// Get the bridge's message handler
|
||||||
|
handler := bridge.GetMessageHandler()
|
||||||
|
if handler == nil {
|
||||||
|
return fmt.Errorf("bridge %s does not have a message handler", bridgeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the handler can process this message
|
||||||
|
if !handler.CanHandleMessage(msg.BridgeMessage) {
|
||||||
|
m.logger.LogDebug("Bridge cannot handle message",
|
||||||
|
"bridge", bridgeName,
|
||||||
|
"message_type", msg.MessageType)
|
||||||
|
return nil // Not an error, just skip
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process the message
|
||||||
|
return handler.ProcessMessage(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartMessageRouting starts the message bus and routing system
|
||||||
|
func (m *BridgeManager) StartMessageRouting() error {
|
||||||
|
m.logger.LogInfo("Starting message routing system")
|
||||||
|
return m.messageBus.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopMessageRouting stops the message bus and routing system
|
||||||
|
func (m *BridgeManager) StopMessageRouting() error {
|
||||||
|
m.logger.LogInfo("Stopping message routing system")
|
||||||
|
|
||||||
|
// Cancel routing context
|
||||||
|
if m.routingCancel != nil {
|
||||||
|
m.routingCancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all routing goroutines to finish
|
||||||
|
m.routingWg.Wait()
|
||||||
|
|
||||||
|
// Stop the message bus
|
||||||
|
return m.messageBus.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// PublishMessage publishes a message to the message bus for routing to target bridges
|
||||||
|
func (m *BridgeManager) PublishMessage(msg *model.DirectionalMessage) error {
|
||||||
|
m.logger.LogDebug("Publishing message to message bus",
|
||||||
|
"source_bridge", msg.SourceBridge,
|
||||||
|
"direction", msg.Direction,
|
||||||
|
"target_bridges", msg.TargetBridges,
|
||||||
|
"message_id", msg.MessageID)
|
||||||
|
|
||||||
|
return m.messageBus.Publish(msg)
|
||||||
|
}
|
||||||
|
|
|
@ -14,12 +14,24 @@ import (
|
||||||
"github.com/mattermost/mattermost/server/public/plugin"
|
"github.com/mattermost/mattermost/server/public/plugin"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// defaultMessageBufferSize is the buffer size for incoming message channels
|
||||||
|
defaultMessageBufferSize = 1000
|
||||||
|
)
|
||||||
|
|
||||||
// mattermostBridge handles syncing messages between Mattermost instances
|
// mattermostBridge handles syncing messages between Mattermost instances
|
||||||
type mattermostBridge struct {
|
type mattermostBridge struct {
|
||||||
logger logger.Logger
|
logger logger.Logger
|
||||||
api plugin.API
|
api plugin.API
|
||||||
kvstore kvstore.KVStore
|
kvstore kvstore.KVStore
|
||||||
userManager pluginModel.BridgeUserManager
|
userManager pluginModel.BridgeUserManager
|
||||||
|
botUserID string // Bot user ID for posting messages
|
||||||
|
remoteID string // Remote ID for shared channels
|
||||||
|
|
||||||
|
// Message handling
|
||||||
|
messageHandler *mattermostMessageHandler
|
||||||
|
userResolver *mattermostUserResolver
|
||||||
|
incomingMessages chan *pluginModel.DirectionalMessage
|
||||||
|
|
||||||
// Connection management
|
// Connection management
|
||||||
connected atomic.Bool
|
connected atomic.Bool
|
||||||
|
@ -36,27 +48,41 @@ type mattermostBridge struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBridge creates a new Mattermost bridge
|
// NewBridge creates a new Mattermost bridge
|
||||||
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge {
|
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, botUserID, remoteID string) pluginModel.Bridge {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
bridge := &mattermostBridge{
|
b := &mattermostBridge{
|
||||||
logger: log,
|
logger: log,
|
||||||
api: api,
|
api: api,
|
||||||
kvstore: kvstore,
|
kvstore: kvstore,
|
||||||
ctx: ctx,
|
botUserID: botUserID,
|
||||||
cancel: cancel,
|
remoteID: remoteID,
|
||||||
channelMappings: make(map[string]string),
|
ctx: ctx,
|
||||||
config: cfg,
|
cancel: cancel,
|
||||||
userManager: bridge.NewUserManager("mattermost", log),
|
channelMappings: make(map[string]string),
|
||||||
|
config: cfg,
|
||||||
|
userManager: bridge.NewUserManager("mattermost", log),
|
||||||
|
incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize),
|
||||||
}
|
}
|
||||||
|
|
||||||
return bridge
|
// Initialize handlers after bridge is created
|
||||||
|
b.messageHandler = newMessageHandler(b)
|
||||||
|
b.userResolver = newUserResolver(b)
|
||||||
|
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
// getConfiguration safely retrieves the current configuration
|
||||||
|
func (b *mattermostBridge) getConfiguration() *config.Configuration {
|
||||||
|
b.configMu.RLock()
|
||||||
|
defer b.configMu.RUnlock()
|
||||||
|
return b.config
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateConfiguration updates the bridge configuration
|
// UpdateConfiguration updates the bridge configuration
|
||||||
func (b *mattermostBridge) UpdateConfiguration(newConfig any) error {
|
func (b *mattermostBridge) UpdateConfiguration(cfg *config.Configuration) error {
|
||||||
cfg, ok := newConfig.(*config.Configuration)
|
// Validate configuration using built-in validation
|
||||||
if !ok {
|
if err := cfg.IsValid(); err != nil {
|
||||||
return fmt.Errorf("invalid configuration type")
|
return fmt.Errorf("invalid configuration: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
b.configMu.Lock()
|
b.configMu.Lock()
|
||||||
|
@ -282,8 +308,8 @@ func (b *mattermostBridge) DeleteChannelMapping(channelID string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RoomExists checks if a Mattermost channel exists on the server
|
// ChannelMappingExists checks if a Mattermost channel exists on the server
|
||||||
func (b *mattermostBridge) RoomExists(roomID string) (bool, error) {
|
func (b *mattermostBridge) ChannelMappingExists(roomID string) (bool, error) {
|
||||||
if b.api == nil {
|
if b.api == nil {
|
||||||
return false, fmt.Errorf("Mattermost API not initialized")
|
return false, fmt.Errorf("Mattermost API not initialized")
|
||||||
}
|
}
|
||||||
|
@ -332,7 +358,47 @@ func (b *mattermostBridge) GetRoomMapping(roomID string) (string, error) {
|
||||||
return channelID, nil
|
return channelID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge
|
||||||
|
func (b *mattermostBridge) GetChannelMappingForBridge(bridgeName, roomID string) (string, error) {
|
||||||
|
channelIDBytes, err := b.kvstore.Get(kvstore.BuildChannelMapKey(bridgeName, roomID))
|
||||||
|
if err != nil {
|
||||||
|
// No mapping found is not an error, just return empty string
|
||||||
|
b.logger.LogDebug("No channel mapping found for bridge room", "bridge_name", bridgeName, "room_id", roomID)
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
channelID := string(channelIDBytes)
|
||||||
|
b.logger.LogDebug("Found channel mapping for bridge room", "bridge_name", bridgeName, "room_id", roomID, "channel_id", channelID)
|
||||||
|
|
||||||
|
return channelID, nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetUserManager returns the user manager for this bridge
|
// GetUserManager returns the user manager for this bridge
|
||||||
func (b *mattermostBridge) GetUserManager() pluginModel.BridgeUserManager {
|
func (b *mattermostBridge) GetUserManager() pluginModel.BridgeUserManager {
|
||||||
return b.userManager
|
return b.userManager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetMessageChannel returns the channel for incoming messages from Mattermost
|
||||||
|
func (b *mattermostBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage {
|
||||||
|
return b.incomingMessages
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendMessage sends a message to a Mattermost channel
|
||||||
|
func (b *mattermostBridge) SendMessage(msg *pluginModel.BridgeMessage) error {
|
||||||
|
return b.messageHandler.postMessageToMattermost(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMessageHandler returns the message handler for this bridge
|
||||||
|
func (b *mattermostBridge) GetMessageHandler() pluginModel.MessageHandler {
|
||||||
|
return b.messageHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetUserResolver returns the user resolver for this bridge
|
||||||
|
func (b *mattermostBridge) GetUserResolver() pluginModel.UserResolver {
|
||||||
|
return b.userResolver
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRemoteID returns the remote ID used for shared channels registration
|
||||||
|
func (b *mattermostBridge) GetRemoteID() string {
|
||||||
|
return b.remoteID
|
||||||
|
}
|
||||||
|
|
208
server/bridge/mattermost/message_handler.go
Normal file
208
server/bridge/mattermost/message_handler.go
Normal file
|
@ -0,0 +1,208 @@
|
||||||
|
package mattermost
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
||||||
|
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||||
|
mmModel "github.com/mattermost/mattermost/server/public/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
// mattermostMessageHandler handles incoming messages for the Mattermost bridge
|
||||||
|
type mattermostMessageHandler struct {
|
||||||
|
bridge *mattermostBridge
|
||||||
|
logger logger.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// newMessageHandler creates a new Mattermost message handler
|
||||||
|
func newMessageHandler(bridge *mattermostBridge) *mattermostMessageHandler {
|
||||||
|
return &mattermostMessageHandler{
|
||||||
|
bridge: bridge,
|
||||||
|
logger: bridge.logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessMessage processes an incoming message for the Mattermost bridge
|
||||||
|
func (h *mattermostMessageHandler) ProcessMessage(msg *pluginModel.DirectionalMessage) error {
|
||||||
|
h.logger.LogDebug("Processing message for Mattermost bridge",
|
||||||
|
"source_bridge", msg.SourceBridge,
|
||||||
|
"direction", msg.Direction,
|
||||||
|
"channel_id", msg.SourceChannelID)
|
||||||
|
|
||||||
|
// Skip messages that originated from Mattermost to prevent loops
|
||||||
|
if msg.SourceBridge == "mattermost" {
|
||||||
|
h.logger.LogDebug("Skipping Mattermost-originated message to prevent loop")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// For incoming messages to Mattermost, we post them to Mattermost channels
|
||||||
|
if msg.Direction == pluginModel.DirectionIncoming {
|
||||||
|
return h.postMessageToMattermost(msg.BridgeMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.logger.LogDebug("Ignoring outgoing message for Mattermost bridge")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CanHandleMessage determines if this handler can process the message
|
||||||
|
func (h *mattermostMessageHandler) CanHandleMessage(msg *pluginModel.BridgeMessage) bool {
|
||||||
|
// Mattermost bridge can handle text messages that didn't originate from Mattermost
|
||||||
|
return msg.MessageType == "text" && msg.SourceBridge != "mattermost"
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSupportedMessageTypes returns the message types this handler supports
|
||||||
|
func (h *mattermostMessageHandler) GetSupportedMessageTypes() []string {
|
||||||
|
return []string{"text"}
|
||||||
|
}
|
||||||
|
|
||||||
|
// postMessageToMattermost posts a message to a Mattermost channel
|
||||||
|
func (h *mattermostMessageHandler) postMessageToMattermost(msg *pluginModel.BridgeMessage) error {
|
||||||
|
if h.bridge.api == nil {
|
||||||
|
return fmt.Errorf("Mattermost API not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the Mattermost channel ID from the channel mapping using the source bridge name
|
||||||
|
channelID, err := h.bridge.GetChannelMappingForBridge(msg.SourceBridge, msg.SourceChannelID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get channel mapping: %w", err)
|
||||||
|
}
|
||||||
|
if channelID == "" {
|
||||||
|
// Check if the source channel ID is already a Mattermost channel ID
|
||||||
|
channelID = msg.SourceChannelID
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the channel exists
|
||||||
|
channel, appErr := h.bridge.api.GetChannel(channelID)
|
||||||
|
if appErr != nil {
|
||||||
|
return fmt.Errorf("failed to get channel %s: %w", channelID, appErr)
|
||||||
|
}
|
||||||
|
if channel == nil {
|
||||||
|
return fmt.Errorf("channel %s not found", channelID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Format the message content
|
||||||
|
content := h.formatMessageContent(msg)
|
||||||
|
|
||||||
|
// Create the post
|
||||||
|
post := &mmModel.Post{
|
||||||
|
ChannelId: channelID,
|
||||||
|
UserId: h.bridge.botUserID,
|
||||||
|
Message: content,
|
||||||
|
Type: mmModel.PostTypeDefault,
|
||||||
|
Props: map[string]interface{}{
|
||||||
|
"from_bridge": msg.SourceBridge,
|
||||||
|
"bridge_user_id": msg.SourceUserID,
|
||||||
|
"bridge_user_name": msg.SourceUserName,
|
||||||
|
"bridge_message_id": msg.MessageID,
|
||||||
|
"bridge_timestamp": msg.Timestamp.Unix(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add thread ID if present
|
||||||
|
if msg.ThreadID != "" {
|
||||||
|
post.RootId = msg.ThreadID
|
||||||
|
}
|
||||||
|
|
||||||
|
// Post the message as the plugin bot
|
||||||
|
createdPost, appErr := h.bridge.api.CreatePost(post)
|
||||||
|
if appErr != nil {
|
||||||
|
return fmt.Errorf("failed to create post in channel %s: %w", channelID, appErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.logger.LogDebug("Message posted to Mattermost channel",
|
||||||
|
"channel_id", channelID,
|
||||||
|
"post_id", createdPost.Id,
|
||||||
|
"source_bridge", msg.SourceBridge,
|
||||||
|
"content_length", len(content))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// formatMessageContent formats the message content for Mattermost
|
||||||
|
func (h *mattermostMessageHandler) formatMessageContent(msg *pluginModel.BridgeMessage) string {
|
||||||
|
// For messages from other bridges, prefix with the bridge info and user name
|
||||||
|
if msg.SourceUserName != "" {
|
||||||
|
bridgeIcon := h.getBridgeIcon(msg.SourceBridge)
|
||||||
|
return fmt.Sprintf("%s **%s**: %s", bridgeIcon, msg.SourceUserName, msg.Content)
|
||||||
|
}
|
||||||
|
return msg.Content
|
||||||
|
}
|
||||||
|
|
||||||
|
// getBridgeIcon returns an icon/emoji for the source bridge
|
||||||
|
func (h *mattermostMessageHandler) getBridgeIcon(bridgeType string) string {
|
||||||
|
switch bridgeType {
|
||||||
|
case "xmpp":
|
||||||
|
return ":speech_balloon:" // Chat bubble emoji for XMPP
|
||||||
|
case "slack":
|
||||||
|
return ":slack:" // Slack emoji if available
|
||||||
|
case "discord":
|
||||||
|
return ":discord:" // Discord emoji if available
|
||||||
|
default:
|
||||||
|
return ":bridge_at_night:" // Generic bridge emoji
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// mattermostUserResolver handles user resolution for the Mattermost bridge
|
||||||
|
type mattermostUserResolver struct {
|
||||||
|
bridge *mattermostBridge
|
||||||
|
logger logger.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// newUserResolver creates a new Mattermost user resolver
|
||||||
|
func newUserResolver(bridge *mattermostBridge) *mattermostUserResolver {
|
||||||
|
return &mattermostUserResolver{
|
||||||
|
bridge: bridge,
|
||||||
|
logger: bridge.logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResolveUser converts an external user ID to an ExternalUser
|
||||||
|
func (r *mattermostUserResolver) ResolveUser(externalUserID string) (*pluginModel.ExternalUser, error) {
|
||||||
|
r.logger.LogDebug("Resolving Mattermost user", "user_id", externalUserID)
|
||||||
|
|
||||||
|
// For Mattermost, the external user ID is the Mattermost user ID
|
||||||
|
user, appErr := r.bridge.api.GetUser(externalUserID)
|
||||||
|
if appErr != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get Mattermost user: %w", appErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if user == nil {
|
||||||
|
return nil, fmt.Errorf("Mattermost user not found: %s", externalUserID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pluginModel.ExternalUser{
|
||||||
|
BridgeType: "mattermost",
|
||||||
|
ExternalUserID: externalUserID,
|
||||||
|
DisplayName: r.GetDisplayName(externalUserID),
|
||||||
|
MattermostUserID: externalUserID, // Same as external ID for Mattermost
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FormatUserMention formats a user mention for Markdown content
|
||||||
|
func (r *mattermostUserResolver) FormatUserMention(user *pluginModel.ExternalUser) string {
|
||||||
|
// For Mattermost, use the standard @username format
|
||||||
|
return fmt.Sprintf("@%s", user.DisplayName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDisplayName extracts display name from external user ID
|
||||||
|
func (r *mattermostUserResolver) GetDisplayName(externalUserID string) string {
|
||||||
|
// Try to get the actual username from Mattermost API
|
||||||
|
user, appErr := r.bridge.api.GetUser(externalUserID)
|
||||||
|
if appErr != nil || user == nil {
|
||||||
|
r.logger.LogWarn("Failed to get user for display name", "user_id", externalUserID)
|
||||||
|
return "Unknown User"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prefer username, fallback to first name + last name, then to ID
|
||||||
|
if user.Username != "" {
|
||||||
|
return user.Username
|
||||||
|
}
|
||||||
|
|
||||||
|
fullName := strings.TrimSpace(user.FirstName + " " + user.LastName)
|
||||||
|
if fullName != "" {
|
||||||
|
return fullName
|
||||||
|
}
|
||||||
|
|
||||||
|
return user.Id[:8] // Show first 8 chars of ID as fallback
|
||||||
|
}
|
244
server/bridge/messagebus.go
Normal file
244
server/bridge/messagebus.go
Normal file
|
@ -0,0 +1,244 @@
|
||||||
|
package bridge
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
||||||
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DefaultMessageBufferSize is the default buffer size for message channels
|
||||||
|
DefaultMessageBufferSize = 1000
|
||||||
|
|
||||||
|
// MessageDeliveryTimeout is the maximum time to wait for message delivery
|
||||||
|
MessageDeliveryTimeout = 5 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// messageBus implements the MessageBus interface
|
||||||
|
type messageBus struct {
|
||||||
|
// Core messaging
|
||||||
|
incomingMessages chan *model.DirectionalMessage
|
||||||
|
subscribers map[string]chan *model.DirectionalMessage
|
||||||
|
subscribersMu sync.RWMutex
|
||||||
|
|
||||||
|
// Lifecycle management
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
logger logger.Logger
|
||||||
|
wg sync.WaitGroup
|
||||||
|
started bool
|
||||||
|
startMu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMessageBus creates a new message bus instance
|
||||||
|
func NewMessageBus(logger logger.Logger) model.MessageBus {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
return &messageBus{
|
||||||
|
incomingMessages: make(chan *model.DirectionalMessage, DefaultMessageBufferSize),
|
||||||
|
subscribers: make(map[string]chan *model.DirectionalMessage),
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe returns a channel that receives messages for the specified bridge
|
||||||
|
func (mb *messageBus) Subscribe(bridgeName string) <-chan *model.DirectionalMessage {
|
||||||
|
mb.subscribersMu.Lock()
|
||||||
|
defer mb.subscribersMu.Unlock()
|
||||||
|
|
||||||
|
// Create a buffered channel for this subscriber
|
||||||
|
ch := make(chan *model.DirectionalMessage, DefaultMessageBufferSize)
|
||||||
|
mb.subscribers[bridgeName] = ch
|
||||||
|
|
||||||
|
mb.logger.LogDebug("Bridge subscribed to message bus", "bridge", bridgeName)
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish sends a message to the message bus for routing
|
||||||
|
func (mb *messageBus) Publish(msg *model.DirectionalMessage) error {
|
||||||
|
if msg == nil {
|
||||||
|
return fmt.Errorf("message cannot be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if msg.BridgeMessage == nil {
|
||||||
|
return fmt.Errorf("bridge message cannot be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case mb.incomingMessages <- msg:
|
||||||
|
mb.logger.LogDebug("Message published to bus",
|
||||||
|
"source_bridge", msg.SourceBridge,
|
||||||
|
"direction", msg.Direction,
|
||||||
|
"channel_id", msg.SourceChannelID)
|
||||||
|
return nil
|
||||||
|
case <-time.After(MessageDeliveryTimeout):
|
||||||
|
mb.logger.LogWarn("Message delivery timeout",
|
||||||
|
"source_bridge", msg.SourceBridge,
|
||||||
|
"channel_id", msg.SourceChannelID)
|
||||||
|
return fmt.Errorf("message delivery timeout")
|
||||||
|
case <-mb.ctx.Done():
|
||||||
|
return fmt.Errorf("message bus is shutting down")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start begins message routing
|
||||||
|
func (mb *messageBus) Start() error {
|
||||||
|
mb.startMu.Lock()
|
||||||
|
defer mb.startMu.Unlock()
|
||||||
|
|
||||||
|
if mb.started {
|
||||||
|
return fmt.Errorf("message bus is already started")
|
||||||
|
}
|
||||||
|
|
||||||
|
mb.logger.LogInfo("Starting message bus")
|
||||||
|
|
||||||
|
// Start the message routing goroutine
|
||||||
|
mb.wg.Add(1)
|
||||||
|
go mb.routeMessages()
|
||||||
|
|
||||||
|
mb.started = true
|
||||||
|
mb.logger.LogInfo("Message bus started successfully")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop ends message routing and cleans up resources
|
||||||
|
func (mb *messageBus) Stop() error {
|
||||||
|
mb.startMu.Lock()
|
||||||
|
defer mb.startMu.Unlock()
|
||||||
|
|
||||||
|
if !mb.started {
|
||||||
|
return nil // Already stopped
|
||||||
|
}
|
||||||
|
|
||||||
|
mb.logger.LogInfo("Stopping message bus")
|
||||||
|
|
||||||
|
// Cancel context to signal shutdown
|
||||||
|
mb.cancel()
|
||||||
|
|
||||||
|
// Wait for routing goroutine to finish
|
||||||
|
mb.wg.Wait()
|
||||||
|
|
||||||
|
// Close all subscriber channels
|
||||||
|
mb.subscribersMu.Lock()
|
||||||
|
for bridgeName, ch := range mb.subscribers {
|
||||||
|
close(ch)
|
||||||
|
mb.logger.LogDebug("Closed subscriber channel", "bridge", bridgeName)
|
||||||
|
}
|
||||||
|
mb.subscribers = make(map[string]chan *model.DirectionalMessage)
|
||||||
|
mb.subscribersMu.Unlock()
|
||||||
|
|
||||||
|
// Close incoming messages channel
|
||||||
|
close(mb.incomingMessages)
|
||||||
|
|
||||||
|
mb.started = false
|
||||||
|
mb.logger.LogInfo("Message bus stopped successfully")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// routeMessages handles the main message routing loop
|
||||||
|
func (mb *messageBus) routeMessages() {
|
||||||
|
defer mb.wg.Done()
|
||||||
|
|
||||||
|
mb.logger.LogDebug("Message routing started")
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg, ok := <-mb.incomingMessages:
|
||||||
|
if !ok {
|
||||||
|
mb.logger.LogDebug("Incoming messages channel closed, stopping routing")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := mb.routeMessage(msg); err != nil {
|
||||||
|
mb.logger.LogError("Failed to route message",
|
||||||
|
"source_bridge", msg.SourceBridge,
|
||||||
|
"direction", msg.Direction,
|
||||||
|
"error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-mb.ctx.Done():
|
||||||
|
mb.logger.LogDebug("Context cancelled, stopping message routing")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// routeMessage routes a single message to appropriate subscribers
|
||||||
|
func (mb *messageBus) routeMessage(msg *model.DirectionalMessage) error {
|
||||||
|
mb.subscribersMu.RLock()
|
||||||
|
defer mb.subscribersMu.RUnlock()
|
||||||
|
|
||||||
|
routedCount := 0
|
||||||
|
|
||||||
|
// Route to specific target bridges if specified
|
||||||
|
if len(msg.TargetBridges) > 0 {
|
||||||
|
for _, targetBridge := range msg.TargetBridges {
|
||||||
|
if ch, exists := mb.subscribers[targetBridge]; exists {
|
||||||
|
if mb.deliverMessage(ch, msg, targetBridge) {
|
||||||
|
routedCount++
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
mb.logger.LogWarn("Target bridge not subscribed",
|
||||||
|
"target_bridge", targetBridge,
|
||||||
|
"source_bridge", msg.SourceBridge)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Route to all subscribers except the source bridge
|
||||||
|
for bridgeName, ch := range mb.subscribers {
|
||||||
|
if bridgeName != msg.SourceBridge {
|
||||||
|
if mb.deliverMessage(ch, msg, bridgeName) {
|
||||||
|
routedCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mb.logger.LogDebug("Message routed",
|
||||||
|
"source_bridge", msg.SourceBridge,
|
||||||
|
"routed_to_count", routedCount)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// deliverMessage attempts to deliver a message to a specific subscriber
|
||||||
|
func (mb *messageBus) deliverMessage(ch chan *model.DirectionalMessage, msg *model.DirectionalMessage, targetBridge string) bool {
|
||||||
|
select {
|
||||||
|
case ch <- msg:
|
||||||
|
return true
|
||||||
|
case <-time.After(MessageDeliveryTimeout):
|
||||||
|
mb.logger.LogWarn("Message delivery timeout to bridge",
|
||||||
|
"target_bridge", targetBridge,
|
||||||
|
"source_bridge", msg.SourceBridge)
|
||||||
|
return false
|
||||||
|
case <-mb.ctx.Done():
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetStats returns statistics about the message bus
|
||||||
|
func (mb *messageBus) GetStats() map[string]interface{} {
|
||||||
|
mb.subscribersMu.RLock()
|
||||||
|
defer mb.subscribersMu.RUnlock()
|
||||||
|
|
||||||
|
stats := map[string]interface{}{
|
||||||
|
"started": mb.started,
|
||||||
|
"subscriber_count": len(mb.subscribers),
|
||||||
|
"buffer_size": DefaultMessageBufferSize,
|
||||||
|
"pending_messages": len(mb.incomingMessages),
|
||||||
|
}
|
||||||
|
|
||||||
|
subscribers := make([]string, 0, len(mb.subscribers))
|
||||||
|
for bridgeName := range mb.subscribers {
|
||||||
|
subscribers = append(subscribers, bridgeName)
|
||||||
|
}
|
||||||
|
stats["subscribers"] = subscribers
|
||||||
|
|
||||||
|
return stats
|
||||||
|
}
|
|
@ -18,6 +18,11 @@ import (
|
||||||
"github.com/mattermost/mattermost/server/public/plugin"
|
"github.com/mattermost/mattermost/server/public/plugin"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// defaultMessageBufferSize is the buffer size for incoming message channels
|
||||||
|
defaultMessageBufferSize = 1000
|
||||||
|
)
|
||||||
|
|
||||||
// xmppBridge handles syncing messages between Mattermost and XMPP
|
// xmppBridge handles syncing messages between Mattermost and XMPP
|
||||||
type xmppBridge struct {
|
type xmppBridge struct {
|
||||||
logger logger.Logger
|
logger logger.Logger
|
||||||
|
@ -25,6 +30,12 @@ type xmppBridge struct {
|
||||||
kvstore kvstore.KVStore
|
kvstore kvstore.KVStore
|
||||||
bridgeClient *xmppClient.Client // Main bridge XMPP client connection
|
bridgeClient *xmppClient.Client // Main bridge XMPP client connection
|
||||||
userManager pluginModel.BridgeUserManager
|
userManager pluginModel.BridgeUserManager
|
||||||
|
remoteID string // Remote ID for shared channels
|
||||||
|
|
||||||
|
// Message handling
|
||||||
|
messageHandler *xmppMessageHandler
|
||||||
|
userResolver *xmppUserResolver
|
||||||
|
incomingMessages chan *pluginModel.DirectionalMessage
|
||||||
|
|
||||||
// Connection management
|
// Connection management
|
||||||
connected atomic.Bool
|
connected atomic.Bool
|
||||||
|
@ -41,19 +52,25 @@ type xmppBridge struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBridge creates a new XMPP bridge
|
// NewBridge creates a new XMPP bridge
|
||||||
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration) pluginModel.Bridge {
|
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, remoteID string) pluginModel.Bridge {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
b := &xmppBridge{
|
b := &xmppBridge{
|
||||||
logger: log,
|
logger: log,
|
||||||
api: api,
|
api: api,
|
||||||
kvstore: kvstore,
|
kvstore: kvstore,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
channelMappings: make(map[string]string),
|
channelMappings: make(map[string]string),
|
||||||
config: cfg,
|
config: cfg,
|
||||||
userManager: bridge.NewUserManager("xmpp", log),
|
userManager: bridge.NewUserManager("xmpp", log),
|
||||||
|
incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize),
|
||||||
|
remoteID: remoteID,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialize handlers after bridge is created
|
||||||
|
b.messageHandler = newMessageHandler(b)
|
||||||
|
b.userResolver = newUserResolver(b)
|
||||||
|
|
||||||
// Initialize XMPP client with configuration
|
// Initialize XMPP client with configuration
|
||||||
if cfg.EnableSync && cfg.XMPPServerURL != "" && cfg.XMPPUsername != "" && cfg.XMPPPassword != "" {
|
if cfg.EnableSync && cfg.XMPPServerURL != "" && cfg.XMPPUsername != "" && cfg.XMPPPassword != "" {
|
||||||
b.bridgeClient = b.createXMPPClient(cfg)
|
b.bridgeClient = b.createXMPPClient(cfg)
|
||||||
|
@ -80,56 +97,50 @@ func (b *xmppBridge) createXMPPClient(cfg *config.Configuration) *xmppClient.Cli
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getConfiguration safely retrieves the current configuration
|
||||||
|
func (b *xmppBridge) getConfiguration() *config.Configuration {
|
||||||
|
b.configMu.RLock()
|
||||||
|
defer b.configMu.RUnlock()
|
||||||
|
return b.config
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateConfiguration updates the bridge configuration
|
// UpdateConfiguration updates the bridge configuration
|
||||||
func (b *xmppBridge) UpdateConfiguration(newConfig any) error {
|
// It handles validation and reconnection logic when the configuration changes
|
||||||
cfg, ok := newConfig.(*config.Configuration)
|
func (b *xmppBridge) UpdateConfiguration(cfg *config.Configuration) error {
|
||||||
if !ok {
|
// Validate configuration using built-in validation
|
||||||
return fmt.Errorf("invalid configuration type")
|
if err := cfg.IsValid(); err != nil {
|
||||||
|
return fmt.Errorf("invalid configuration: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
b.configMu.Lock()
|
// Get current config to check if restart is needed
|
||||||
oldConfig := b.config
|
oldConfig := b.getConfiguration()
|
||||||
b.config = cfg
|
|
||||||
defer b.configMu.Unlock()
|
|
||||||
|
|
||||||
b.logger.LogInfo("XMPP bridge configuration updated")
|
// Update configuration under lock, then release immediately
|
||||||
|
b.configMu.Lock()
|
||||||
|
b.config = cfg
|
||||||
|
|
||||||
// Initialize or update XMPP client with new configuration
|
// Initialize or update XMPP client with new configuration
|
||||||
if cfg.EnableSync {
|
if !cfg.Equals(oldConfig) {
|
||||||
if cfg.XMPPServerURL == "" || cfg.XMPPUsername == "" || cfg.XMPPPassword == "" {
|
if b.bridgeClient != nil && b.bridgeClient.Disconnect() != nil {
|
||||||
return fmt.Errorf("XMPP server URL, username, and password are required when sync is enabled")
|
b.logger.LogError("Failed to disconnect old XMPP bridge client")
|
||||||
}
|
}
|
||||||
|
|
||||||
b.bridgeClient = b.createXMPPClient(cfg)
|
b.bridgeClient = b.createXMPPClient(cfg)
|
||||||
} else {
|
}
|
||||||
b.bridgeClient = nil
|
b.configMu.Unlock()
|
||||||
|
|
||||||
|
// Stop the bridge
|
||||||
|
if err := b.Stop(); err != nil {
|
||||||
|
b.logger.LogWarn("Error stopping bridge during restart", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if we need to restart the bridge due to configuration changes
|
// Start the bridge with new configuration
|
||||||
wasConnected := b.connected.Load()
|
// Start() method already uses getConfiguration() safely
|
||||||
needsRestart := oldConfig != nil && !oldConfig.Equals(cfg) && wasConnected
|
if err := b.Start(); err != nil {
|
||||||
|
b.logger.LogError("Failed to restart bridge with new configuration", "error", err)
|
||||||
// Log the configuration change
|
return fmt.Errorf("failed to restart bridge: %w", err)
|
||||||
if needsRestart {
|
|
||||||
b.logger.LogInfo("Configuration changed, restarting bridge")
|
|
||||||
} else {
|
|
||||||
b.logger.LogInfo("Configuration updated", "config", cfg)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if needsRestart {
|
b.logger.LogDebug("XMPP bridge configuration updated successfully")
|
||||||
b.logger.LogInfo("Configuration changed, restarting bridge")
|
|
||||||
|
|
||||||
// Stop the bridge
|
|
||||||
if err := b.Stop(); err != nil {
|
|
||||||
b.logger.LogWarn("Error stopping bridge during restart", "error", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start the bridge with new configuration
|
|
||||||
if err := b.Start(); err != nil {
|
|
||||||
b.logger.LogError("Failed to restart bridge with new configuration", "error", err)
|
|
||||||
return fmt.Errorf("failed to restart bridge: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -166,6 +177,9 @@ func (b *xmppBridge) Start() error {
|
||||||
// Start connection monitor
|
// Start connection monitor
|
||||||
go b.connectionMonitor()
|
go b.connectionMonitor()
|
||||||
|
|
||||||
|
// Start message aggregation
|
||||||
|
go b.startMessageAggregation()
|
||||||
|
|
||||||
b.logger.LogInfo("Mattermost to XMPP bridge started successfully")
|
b.logger.LogInfo("Mattermost to XMPP bridge started successfully")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -490,8 +504,8 @@ func (b *xmppBridge) DeleteChannelMapping(channelID string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RoomExists checks if an XMPP room exists on the remote service
|
// ChannelMappingExists checks if an XMPP room exists on the remote service
|
||||||
func (b *xmppBridge) RoomExists(roomID string) (bool, error) {
|
func (b *xmppBridge) ChannelMappingExists(roomID string) (bool, error) {
|
||||||
if !b.connected.Load() {
|
if !b.connected.Load() {
|
||||||
return false, fmt.Errorf("not connected to XMPP server")
|
return false, fmt.Errorf("not connected to XMPP server")
|
||||||
}
|
}
|
||||||
|
@ -535,7 +549,78 @@ func (b *xmppBridge) GetRoomMapping(roomID string) (string, error) {
|
||||||
return channelID, nil
|
return channelID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge
|
||||||
|
func (b *xmppBridge) GetChannelMappingForBridge(bridgeName, roomID string) (string, error) {
|
||||||
|
// Look up the channel ID using the bridge name and room ID as the key
|
||||||
|
channelIDBytes, err := b.kvstore.Get(kvstore.BuildChannelMapKey(bridgeName, roomID))
|
||||||
|
if err != nil {
|
||||||
|
// No mapping found is not an error, just return empty string
|
||||||
|
b.logger.LogDebug("No channel mapping found for bridge room", "bridge_name", bridgeName, "room_id", roomID)
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
channelID := string(channelIDBytes)
|
||||||
|
b.logger.LogDebug("Found channel mapping for bridge room", "bridge_name", bridgeName, "room_id", roomID, "channel_id", channelID)
|
||||||
|
|
||||||
|
return channelID, nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetUserManager returns the user manager for this bridge
|
// GetUserManager returns the user manager for this bridge
|
||||||
func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager {
|
func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager {
|
||||||
return b.userManager
|
return b.userManager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// startMessageAggregation starts the message aggregation goroutine
|
||||||
|
func (b *xmppBridge) startMessageAggregation() {
|
||||||
|
clientChannel := b.bridgeClient.GetMessageChannel()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-b.ctx.Done():
|
||||||
|
b.logger.LogDebug("Stopping XMPP message aggregation")
|
||||||
|
return
|
||||||
|
case msg, ok := <-clientChannel:
|
||||||
|
if !ok {
|
||||||
|
b.logger.LogDebug("Bridge client message channel closed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Forward to our bridge's message channel
|
||||||
|
select {
|
||||||
|
case b.incomingMessages <- msg:
|
||||||
|
// Message forwarded successfully
|
||||||
|
case <-b.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
b.logger.LogWarn("Bridge message channel full, dropping message",
|
||||||
|
"source_channel", msg.SourceChannelID,
|
||||||
|
"user_id", msg.SourceUserID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMessageChannel returns the channel for incoming messages from XMPP
|
||||||
|
func (b *xmppBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage {
|
||||||
|
return b.incomingMessages
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendMessage sends a message to an XMPP room
|
||||||
|
func (b *xmppBridge) SendMessage(msg *pluginModel.BridgeMessage) error {
|
||||||
|
return b.messageHandler.sendMessageToXMPP(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMessageHandler returns the message handler for this bridge
|
||||||
|
func (b *xmppBridge) GetMessageHandler() pluginModel.MessageHandler {
|
||||||
|
return b.messageHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetUserResolver returns the user resolver for this bridge
|
||||||
|
func (b *xmppBridge) GetUserResolver() pluginModel.UserResolver {
|
||||||
|
return b.userResolver
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRemoteID returns the remote ID used for shared channels registration
|
||||||
|
func (b *xmppBridge) GetRemoteID() string {
|
||||||
|
return b.remoteID
|
||||||
|
}
|
||||||
|
|
166
server/bridge/xmpp/message_handler.go
Normal file
166
server/bridge/xmpp/message_handler.go
Normal file
|
@ -0,0 +1,166 @@
|
||||||
|
package xmpp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
||||||
|
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||||
|
xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
|
||||||
|
)
|
||||||
|
|
||||||
|
// xmppMessageHandler handles incoming messages for the XMPP bridge
|
||||||
|
type xmppMessageHandler struct {
|
||||||
|
bridge *xmppBridge
|
||||||
|
logger logger.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// newMessageHandler creates a new XMPP message handler
|
||||||
|
func newMessageHandler(bridge *xmppBridge) *xmppMessageHandler {
|
||||||
|
return &xmppMessageHandler{
|
||||||
|
bridge: bridge,
|
||||||
|
logger: bridge.logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessMessage processes an incoming message for the XMPP bridge
|
||||||
|
func (h *xmppMessageHandler) ProcessMessage(msg *pluginModel.DirectionalMessage) error {
|
||||||
|
h.logger.LogDebug("Processing message for XMPP bridge",
|
||||||
|
"source_bridge", msg.SourceBridge,
|
||||||
|
"direction", msg.Direction,
|
||||||
|
"channel_id", msg.SourceChannelID)
|
||||||
|
|
||||||
|
// Skip messages that originated from XMPP to prevent loops
|
||||||
|
if msg.SourceBridge == "xmpp" {
|
||||||
|
h.logger.LogDebug("Skipping XMPP-originated message to prevent loop")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// For incoming messages to XMPP, we send them to XMPP rooms
|
||||||
|
if msg.Direction == pluginModel.DirectionIncoming {
|
||||||
|
return h.sendMessageToXMPP(msg.BridgeMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.logger.LogDebug("Ignoring outgoing message for XMPP bridge")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CanHandleMessage determines if this handler can process the message
|
||||||
|
func (h *xmppMessageHandler) CanHandleMessage(msg *pluginModel.BridgeMessage) bool {
|
||||||
|
// XMPP bridge can handle text messages that didn't originate from XMPP
|
||||||
|
return msg.MessageType == "text" && msg.SourceBridge != "xmpp"
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSupportedMessageTypes returns the message types this handler supports
|
||||||
|
func (h *xmppMessageHandler) GetSupportedMessageTypes() []string {
|
||||||
|
return []string{"text"}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendMessageToXMPP sends a message to an XMPP room
|
||||||
|
func (h *xmppMessageHandler) sendMessageToXMPP(msg *pluginModel.BridgeMessage) error {
|
||||||
|
if h.bridge.bridgeClient == nil {
|
||||||
|
return fmt.Errorf("XMPP client not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !h.bridge.connected.Load() {
|
||||||
|
return fmt.Errorf("not connected to XMPP server")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the XMPP room JID from the channel mapping
|
||||||
|
roomJID, err := h.bridge.GetChannelMapping(msg.SourceChannelID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get room mapping: %w", err)
|
||||||
|
}
|
||||||
|
if roomJID == "" {
|
||||||
|
return fmt.Errorf("channel is not mapped to any XMPP room")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Format the message content with user information
|
||||||
|
content := h.formatMessageContent(msg)
|
||||||
|
|
||||||
|
// Create XMPP message request
|
||||||
|
req := xmppClient.MessageRequest{
|
||||||
|
RoomJID: roomJID,
|
||||||
|
Message: content,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the message
|
||||||
|
_, err = h.bridge.bridgeClient.SendMessage(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to send message to XMPP room: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.logger.LogDebug("Message sent to XMPP room",
|
||||||
|
"channel_id", msg.SourceChannelID,
|
||||||
|
"room_jid", roomJID,
|
||||||
|
"content_length", len(content))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// formatMessageContent formats the message content for XMPP
|
||||||
|
func (h *xmppMessageHandler) formatMessageContent(msg *pluginModel.BridgeMessage) string {
|
||||||
|
// For messages from other bridges, prefix with the user name
|
||||||
|
if msg.SourceUserName != "" {
|
||||||
|
return fmt.Sprintf("<%s> %s", msg.SourceUserName, msg.Content)
|
||||||
|
}
|
||||||
|
return msg.Content
|
||||||
|
}
|
||||||
|
|
||||||
|
// xmppUserResolver handles user resolution for the XMPP bridge
|
||||||
|
type xmppUserResolver struct {
|
||||||
|
bridge *xmppBridge
|
||||||
|
logger logger.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// newUserResolver creates a new XMPP user resolver
|
||||||
|
func newUserResolver(bridge *xmppBridge) *xmppUserResolver {
|
||||||
|
return &xmppUserResolver{
|
||||||
|
bridge: bridge,
|
||||||
|
logger: bridge.logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResolveUser converts an external user ID to an ExternalUser
|
||||||
|
func (r *xmppUserResolver) ResolveUser(externalUserID string) (*pluginModel.ExternalUser, error) {
|
||||||
|
r.logger.LogDebug("Resolving XMPP user", "user_id", externalUserID)
|
||||||
|
|
||||||
|
// For XMPP, the external user ID is typically the full JID
|
||||||
|
return &pluginModel.ExternalUser{
|
||||||
|
BridgeType: "xmpp",
|
||||||
|
ExternalUserID: externalUserID,
|
||||||
|
DisplayName: r.GetDisplayName(externalUserID),
|
||||||
|
MattermostUserID: "", // Will be resolved by user mapping system
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FormatUserMention formats a user mention for Markdown content
|
||||||
|
func (r *xmppUserResolver) FormatUserMention(user *pluginModel.ExternalUser) string {
|
||||||
|
// For XMPP, we can format mentions as simple text with the display name
|
||||||
|
return fmt.Sprintf("@%s", user.DisplayName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDisplayName extracts display name from external user ID
|
||||||
|
func (r *xmppUserResolver) GetDisplayName(externalUserID string) string {
|
||||||
|
// For XMPP JIDs, extract the local part or resource as display name
|
||||||
|
// Format: user@domain/resource -> use resource or user
|
||||||
|
if len(externalUserID) == 0 {
|
||||||
|
return "Unknown User"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to parse as JID and extract meaningful display name
|
||||||
|
parts := strings.Split(externalUserID, "/")
|
||||||
|
if len(parts) > 1 {
|
||||||
|
// Has resource part, use it as display name
|
||||||
|
return parts[1]
|
||||||
|
}
|
||||||
|
|
||||||
|
// No resource, try to extract local part from user@domain
|
||||||
|
atIndex := strings.Index(externalUserID, "@")
|
||||||
|
if atIndex > 0 {
|
||||||
|
return externalUserID[:atIndex]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fallback to the full ID
|
||||||
|
return externalUserID
|
||||||
|
}
|
|
@ -6,11 +6,13 @@ import (
|
||||||
|
|
||||||
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||||
"github.com/mattermost/mattermost/server/public/model"
|
"github.com/mattermost/mattermost/server/public/model"
|
||||||
|
"github.com/mattermost/mattermost/server/public/plugin"
|
||||||
"github.com/mattermost/mattermost/server/public/pluginapi"
|
"github.com/mattermost/mattermost/server/public/pluginapi"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
client *pluginapi.Client
|
client *pluginapi.Client
|
||||||
|
api plugin.API
|
||||||
bridgeManager pluginModel.BridgeManager
|
bridgeManager pluginModel.BridgeManager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,7 +24,7 @@ type Command interface {
|
||||||
const xmppBridgeCommandTrigger = "xmppbridge"
|
const xmppBridgeCommandTrigger = "xmppbridge"
|
||||||
|
|
||||||
// Register all your slash commands in the NewCommandHandler function.
|
// Register all your slash commands in the NewCommandHandler function.
|
||||||
func NewCommandHandler(client *pluginapi.Client, bridgeManager pluginModel.BridgeManager) Command {
|
func NewCommandHandler(client *pluginapi.Client, api plugin.API, bridgeManager pluginModel.BridgeManager) Command {
|
||||||
// Register XMPP bridge command
|
// Register XMPP bridge command
|
||||||
xmppBridgeData := model.NewAutocompleteData(xmppBridgeCommandTrigger, "", "Manage XMPP bridge")
|
xmppBridgeData := model.NewAutocompleteData(xmppBridgeCommandTrigger, "", "Manage XMPP bridge")
|
||||||
mapSubcommand := model.NewAutocompleteData("map", "[room_jid]", "Map current channel to XMPP room")
|
mapSubcommand := model.NewAutocompleteData("map", "[room_jid]", "Map current channel to XMPP room")
|
||||||
|
@ -35,11 +37,17 @@ func NewCommandHandler(client *pluginapi.Client, bridgeManager pluginModel.Bridg
|
||||||
statusSubcommand := model.NewAutocompleteData("status", "", "Show bridge connection status")
|
statusSubcommand := model.NewAutocompleteData("status", "", "Show bridge connection status")
|
||||||
xmppBridgeData.AddCommand(statusSubcommand)
|
xmppBridgeData.AddCommand(statusSubcommand)
|
||||||
|
|
||||||
|
syncSubcommand := model.NewAutocompleteData("sync", "", "Force sync with shared channels if channel is mapped")
|
||||||
|
xmppBridgeData.AddCommand(syncSubcommand)
|
||||||
|
|
||||||
|
syncResetSubcommand := model.NewAutocompleteData("sync-reset", "", "Reset sync cursor for channel if mapped")
|
||||||
|
xmppBridgeData.AddCommand(syncResetSubcommand)
|
||||||
|
|
||||||
err := client.SlashCommand.Register(&model.Command{
|
err := client.SlashCommand.Register(&model.Command{
|
||||||
Trigger: xmppBridgeCommandTrigger,
|
Trigger: xmppBridgeCommandTrigger,
|
||||||
AutoComplete: true,
|
AutoComplete: true,
|
||||||
AutoCompleteDesc: "Manage XMPP bridge mappings",
|
AutoCompleteDesc: "Manage XMPP bridge mappings",
|
||||||
AutoCompleteHint: "[map|unmap|status]",
|
AutoCompleteHint: "[map|unmap|status|sync|sync-reset]",
|
||||||
AutocompleteData: xmppBridgeData,
|
AutocompleteData: xmppBridgeData,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -48,6 +56,7 @@ func NewCommandHandler(client *pluginapi.Client, bridgeManager pluginModel.Bridg
|
||||||
|
|
||||||
return &Handler{
|
return &Handler{
|
||||||
client: client,
|
client: client,
|
||||||
|
api: api,
|
||||||
bridgeManager: bridgeManager,
|
bridgeManager: bridgeManager,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -85,6 +94,8 @@ func (c *Handler) executeXMPPBridgeCommand(args *model.CommandArgs) *model.Comma
|
||||||
- ` + "`/xmppbridge map <room_jid>`" + ` - Map current channel to XMPP room
|
- ` + "`/xmppbridge map <room_jid>`" + ` - Map current channel to XMPP room
|
||||||
- ` + "`/xmppbridge unmap`" + ` - Unmap current channel from XMPP room
|
- ` + "`/xmppbridge unmap`" + ` - Unmap current channel from XMPP room
|
||||||
- ` + "`/xmppbridge status`" + ` - Show bridge connection status
|
- ` + "`/xmppbridge status`" + ` - Show bridge connection status
|
||||||
|
- ` + "`/xmppbridge sync`" + ` - Force sync with shared channels if channel is mapped
|
||||||
|
- ` + "`/xmppbridge sync-reset`" + ` - Reset sync cursor for channel if mapped
|
||||||
|
|
||||||
**Example:**
|
**Example:**
|
||||||
` + "`/xmppbridge map general@conference.example.com`",
|
` + "`/xmppbridge map general@conference.example.com`",
|
||||||
|
@ -99,6 +110,10 @@ func (c *Handler) executeXMPPBridgeCommand(args *model.CommandArgs) *model.Comma
|
||||||
return c.executeUnmapCommand(args)
|
return c.executeUnmapCommand(args)
|
||||||
case "status":
|
case "status":
|
||||||
return c.executeStatusCommand(args)
|
return c.executeStatusCommand(args)
|
||||||
|
case "sync":
|
||||||
|
return c.executeSyncCommand(args)
|
||||||
|
case "sync-reset":
|
||||||
|
return c.executeSyncResetCommand(args)
|
||||||
default:
|
default:
|
||||||
return &model.CommandResponse{
|
return &model.CommandResponse{
|
||||||
ResponseType: model.CommandResponseTypeEphemeral,
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
|
@ -161,11 +176,11 @@ func (c *Handler) executeMapCommand(args *model.CommandArgs, fields []string) *m
|
||||||
|
|
||||||
// Create the mapping using BridgeManager
|
// Create the mapping using BridgeManager
|
||||||
mappingReq := pluginModel.CreateChannelMappingRequest{
|
mappingReq := pluginModel.CreateChannelMappingRequest{
|
||||||
ChannelID: channelID,
|
ChannelID: channelID,
|
||||||
BridgeName: "xmpp",
|
BridgeName: "xmpp",
|
||||||
BridgeRoomID: roomJID,
|
BridgeChannelID: roomJID,
|
||||||
UserID: args.UserId,
|
UserID: args.UserId,
|
||||||
TeamID: args.TeamId,
|
TeamID: args.TeamId,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.bridgeManager.CreateChannelMapping(mappingReq)
|
err = c.bridgeManager.CreateChannelMapping(mappingReq)
|
||||||
|
@ -284,6 +299,105 @@ func (c *Handler) isSystemAdmin(userID string) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// formatMappingError provides user-friendly error messages for mapping operations
|
// formatMappingError provides user-friendly error messages for mapping operations
|
||||||
|
func (c *Handler) executeSyncCommand(args *model.CommandArgs) *model.CommandResponse {
|
||||||
|
channelID := args.ChannelId
|
||||||
|
|
||||||
|
// Get the XMPP bridge to check if channel is mapped
|
||||||
|
bridge, err := c.bridgeManager.GetBridge("xmpp")
|
||||||
|
if err != nil {
|
||||||
|
return &model.CommandResponse{
|
||||||
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
|
Text: "❌ XMPP bridge is not available. Please check the plugin configuration.",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if channel is mapped to XMPP
|
||||||
|
roomJID, err := bridge.GetChannelMapping(channelID)
|
||||||
|
if err != nil {
|
||||||
|
return &model.CommandResponse{
|
||||||
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
|
Text: fmt.Sprintf("Error checking channel mapping: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if roomJID == "" {
|
||||||
|
return &model.CommandResponse{
|
||||||
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
|
Text: "❌ This channel is not mapped to any XMPP room. Use `/xmppbridge map <room_jid>` to create a mapping first.",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force sync with shared channels
|
||||||
|
if err := c.api.SyncSharedChannel(channelID); err != nil {
|
||||||
|
return &model.CommandResponse{
|
||||||
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
|
Text: fmt.Sprintf("❌ Failed to sync channel: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &model.CommandResponse{
|
||||||
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
|
Text: fmt.Sprintf("✅ Successfully triggered sync for channel mapped to XMPP room: `%s`", roomJID),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Handler) executeSyncResetCommand(args *model.CommandArgs) *model.CommandResponse {
|
||||||
|
channelID := args.ChannelId
|
||||||
|
|
||||||
|
// Get the XMPP bridge to check if channel is mapped
|
||||||
|
bridge, err := c.bridgeManager.GetBridge("xmpp")
|
||||||
|
if err != nil {
|
||||||
|
return &model.CommandResponse{
|
||||||
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
|
Text: "❌ XMPP bridge is not available. Please check the plugin configuration.",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if channel is mapped to XMPP
|
||||||
|
roomJID, err := bridge.GetChannelMapping(channelID)
|
||||||
|
if err != nil {
|
||||||
|
return &model.CommandResponse{
|
||||||
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
|
Text: fmt.Sprintf("Error checking channel mapping: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if roomJID == "" {
|
||||||
|
return &model.CommandResponse{
|
||||||
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
|
Text: "❌ This channel is not mapped to any XMPP room. Use `/xmppbridge map <room_jid>` to create a mapping first.",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get remoteID from bridge for cursor operations
|
||||||
|
remoteID := bridge.GetRemoteID()
|
||||||
|
if remoteID == "" {
|
||||||
|
return &model.CommandResponse{
|
||||||
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
|
Text: "❌ Bridge remote ID not available. Cannot reset sync cursor.",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create empty cursor to reset to beginning
|
||||||
|
emptyCursor := model.GetPostsSinceForSyncCursor{
|
||||||
|
LastPostUpdateAt: 1,
|
||||||
|
LastPostCreateAt: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset sync cursor using UpdateSharedChannelCursor
|
||||||
|
if err := c.api.UpdateSharedChannelCursor(channelID, remoteID, emptyCursor); err != nil {
|
||||||
|
return &model.CommandResponse{
|
||||||
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
|
Text: fmt.Sprintf("❌ Failed to reset sync cursor: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &model.CommandResponse{
|
||||||
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
|
Text: fmt.Sprintf("✅ Successfully reset sync cursor for channel mapped to XMPP room: `%s`", roomJID),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Handler) formatMappingError(operation, roomJID string, err error) *model.CommandResponse {
|
func (c *Handler) formatMappingError(operation, roomJID string, err error) *model.CommandResponse {
|
||||||
errorMsg := err.Error()
|
errorMsg := err.Error()
|
||||||
|
|
||||||
|
@ -292,7 +406,7 @@ func (c *Handler) formatMappingError(operation, roomJID string, err error) *mode
|
||||||
case strings.Contains(errorMsg, "already mapped to channel"):
|
case strings.Contains(errorMsg, "already mapped to channel"):
|
||||||
return &model.CommandResponse{
|
return &model.CommandResponse{
|
||||||
ResponseType: model.CommandResponseTypeEphemeral,
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
Text: fmt.Sprintf(`❌ **Room Already Mapped**
|
Text: fmt.Sprintf(`❌ **Channel Already Mapped**
|
||||||
|
|
||||||
The XMPP room **%s** is already connected to another channel.
|
The XMPP room **%s** is already connected to another channel.
|
||||||
|
|
||||||
|
@ -305,7 +419,7 @@ The XMPP room **%s** is already connected to another channel.
|
||||||
case strings.Contains(errorMsg, "does not exist"):
|
case strings.Contains(errorMsg, "does not exist"):
|
||||||
return &model.CommandResponse{
|
return &model.CommandResponse{
|
||||||
ResponseType: model.CommandResponseTypeEphemeral,
|
ResponseType: model.CommandResponseTypeEphemeral,
|
||||||
Text: fmt.Sprintf(`❌ **Room Not Found**
|
Text: fmt.Sprintf(`❌ **Channel Not Found**
|
||||||
|
|
||||||
The XMPP room **%s** doesn't exist or isn't accessible.
|
The XMPP room **%s** doesn't exist or isn't accessible.
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,7 @@ func (p *Plugin) OnConfigurationChange() error {
|
||||||
return errors.Wrap(err, "failed to load plugin configuration")
|
return errors.Wrap(err, "failed to load plugin configuration")
|
||||||
}
|
}
|
||||||
|
|
||||||
p.API.LogDebug("Loaded configuration in OnConfigurationChange", "configuration", configuration)
|
p.API.LogDebug("Plugin configuration changed")
|
||||||
|
|
||||||
// Validate the configuration
|
// Validate the configuration
|
||||||
if err := configuration.IsValid(); err != nil {
|
if err := configuration.IsValid(); err != nil {
|
||||||
|
|
|
@ -1,18 +1,24 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import "github.com/mattermost/mattermost/server/public/model"
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||||
|
"github.com/mattermost/mattermost/server/public/model"
|
||||||
|
)
|
||||||
|
|
||||||
// OnSharedChannelsPing is called to check if the bridge is healthy and ready to process messages
|
// OnSharedChannelsPing is called to check if the bridge is healthy and ready to process messages
|
||||||
func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool {
|
func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool {
|
||||||
config := p.getConfiguration()
|
config := p.getConfiguration()
|
||||||
|
|
||||||
p.logger.LogDebug("OnSharedChannelsPing called", "remote_cluster_id", remoteCluster.RemoteId)
|
|
||||||
|
|
||||||
var remoteClusterID string
|
var remoteClusterID string
|
||||||
if remoteCluster != nil {
|
if remoteCluster != nil {
|
||||||
remoteClusterID = remoteCluster.RemoteId
|
remoteClusterID = remoteCluster.RemoteId
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.logger.LogDebug("OnSharedChannelsPing called", "remote_cluster_id", remoteClusterID)
|
||||||
|
|
||||||
p.logger.LogDebug("Received shared channels ping", "remote_cluster_id", remoteClusterID)
|
p.logger.LogDebug("Received shared channels ping", "remote_cluster_id", remoteClusterID)
|
||||||
|
|
||||||
// If sync is disabled, we're still "healthy" but not actively processing
|
// If sync is disabled, we're still "healthy" but not actively processing
|
||||||
|
@ -44,3 +50,119 @@ func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool {
|
||||||
p.logger.LogDebug("Shared channels ping successful - XMPP bridge is healthy", "remote_cluster_id", remoteClusterID)
|
p.logger.LogDebug("Shared channels ping successful - XMPP bridge is healthy", "remote_cluster_id", remoteClusterID)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OnSharedChannelsSyncMsg processes sync messages from Mattermost shared channels and routes them to XMPP
|
||||||
|
func (p *Plugin) OnSharedChannelsSyncMsg(msg *model.SyncMsg, rc *model.RemoteCluster) (model.SyncResponse, error) {
|
||||||
|
config := p.getConfiguration()
|
||||||
|
|
||||||
|
// Initialize sync response
|
||||||
|
now := model.GetMillis()
|
||||||
|
response := model.SyncResponse{
|
||||||
|
PostsLastUpdateAt: now,
|
||||||
|
UsersLastUpdateAt: now,
|
||||||
|
ReactionsLastUpdateAt: now,
|
||||||
|
}
|
||||||
|
|
||||||
|
var remoteClusterID string
|
||||||
|
if rc != nil {
|
||||||
|
remoteClusterID = rc.RemoteId
|
||||||
|
}
|
||||||
|
|
||||||
|
p.logger.LogDebug("OnSharedChannelsSyncMsg called",
|
||||||
|
"remote_cluster_id", remoteClusterID,
|
||||||
|
"channel_id", msg.ChannelId,
|
||||||
|
"post_count", len(msg.Posts))
|
||||||
|
|
||||||
|
// If sync is disabled, return success but don't process
|
||||||
|
if !config.EnableSync {
|
||||||
|
p.logger.LogDebug("Sync message received but sync is disabled", "remote_cluster_id", remoteClusterID)
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process each post in the sync message
|
||||||
|
var processedCount int
|
||||||
|
var errors []string
|
||||||
|
|
||||||
|
for _, post := range msg.Posts {
|
||||||
|
if err := p.processSyncPost(post, msg.ChannelId, msg.Users); err != nil {
|
||||||
|
errorMsg := fmt.Sprintf("failed to process post %s: %v", post.Id, err)
|
||||||
|
errors = append(errors, errorMsg)
|
||||||
|
p.logger.LogError("Failed to process sync post", "post_id", post.Id, "error", err)
|
||||||
|
} else {
|
||||||
|
processedCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
p.logger.LogInfo("Processed sync message",
|
||||||
|
"remote_cluster_id", remoteClusterID,
|
||||||
|
"channel_id", msg.ChannelId,
|
||||||
|
"processed_posts", processedCount,
|
||||||
|
"failed_posts", len(errors))
|
||||||
|
|
||||||
|
// If we have errors, return them
|
||||||
|
if len(errors) > 0 {
|
||||||
|
return response, fmt.Errorf("failed to process %d posts: %v", len(errors), errors)
|
||||||
|
}
|
||||||
|
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// processSyncPost converts a Mattermost post to a bridge message and routes it to XMPP
|
||||||
|
func (p *Plugin) processSyncPost(post *model.Post, channelID string, users map[string]*model.User) error {
|
||||||
|
// Find the user who created this post
|
||||||
|
var postUser *model.User
|
||||||
|
p.logger.LogInfo("Processing sync post", "post_id", post.UserId, "users", users)
|
||||||
|
if users != nil {
|
||||||
|
postUser = users[post.UserId]
|
||||||
|
}
|
||||||
|
|
||||||
|
// If user not found in sync data, try to get from API
|
||||||
|
if postUser == nil {
|
||||||
|
var err error
|
||||||
|
postUser, err = p.API.GetUser(post.UserId)
|
||||||
|
if err != nil {
|
||||||
|
p.logger.LogWarn("Failed to get user for post", "user_id", post.UserId, "post_id", post.Id, "error", err)
|
||||||
|
// Create a placeholder user
|
||||||
|
postUser = &model.User{
|
||||||
|
Id: post.UserId,
|
||||||
|
Username: "unknown-user",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create bridge message from Mattermost post
|
||||||
|
bridgeMessage := &pluginModel.BridgeMessage{
|
||||||
|
SourceBridge: "mattermost",
|
||||||
|
SourceChannelID: channelID,
|
||||||
|
SourceUserID: postUser.Id,
|
||||||
|
SourceUserName: postUser.Username,
|
||||||
|
Content: post.Message,
|
||||||
|
MessageType: "text", // TODO: Handle other message types
|
||||||
|
Timestamp: time.Unix(post.CreateAt/1000, 0),
|
||||||
|
MessageID: post.Id,
|
||||||
|
ThreadID: post.RootId,
|
||||||
|
TargetBridges: []string{"xmpp"}, // Route to XMPP
|
||||||
|
Metadata: map[string]any{
|
||||||
|
"original_post": post,
|
||||||
|
"channel_id": channelID,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create directional message for outgoing (Mattermost -> XMPP)
|
||||||
|
directionalMessage := &pluginModel.DirectionalMessage{
|
||||||
|
BridgeMessage: bridgeMessage,
|
||||||
|
Direction: pluginModel.DirectionOutgoing,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish the message to the message bus for routing to XMPP bridge
|
||||||
|
if err := p.bridgeManager.PublishMessage(directionalMessage); err != nil {
|
||||||
|
return fmt.Errorf("failed to publish sync message to message bus: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.logger.LogDebug("Successfully published sync message to message bus",
|
||||||
|
"post_id", post.Id,
|
||||||
|
"channel_id", channelID,
|
||||||
|
"user", postUser.Username)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -20,11 +20,11 @@ const (
|
||||||
|
|
||||||
// CreateChannelMappingRequest contains information needed to create a channel mapping
|
// CreateChannelMappingRequest contains information needed to create a channel mapping
|
||||||
type CreateChannelMappingRequest struct {
|
type CreateChannelMappingRequest struct {
|
||||||
ChannelID string // Mattermost channel ID
|
ChannelID string // Mattermost channel ID
|
||||||
BridgeName string // Name of the bridge (e.g., "xmpp")
|
BridgeName string // Name of the bridge (e.g., "xmpp")
|
||||||
BridgeRoomID string // Remote room/channel ID (e.g., JID for XMPP)
|
BridgeChannelID string // Remote room/channel ID (e.g., JID for XMPP)
|
||||||
UserID string // ID of user who triggered the mapping creation
|
UserID string // ID of user who triggered the mapping creation
|
||||||
TeamID string // Team ID where the channel belongs
|
TeamID string // Team ID where the channel belongs
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate checks if all required fields are present and valid
|
// Validate checks if all required fields are present and valid
|
||||||
|
@ -35,8 +35,8 @@ func (r CreateChannelMappingRequest) Validate() error {
|
||||||
if r.BridgeName == "" {
|
if r.BridgeName == "" {
|
||||||
return fmt.Errorf("bridgeName cannot be empty")
|
return fmt.Errorf("bridgeName cannot be empty")
|
||||||
}
|
}
|
||||||
if r.BridgeRoomID == "" {
|
if r.BridgeChannelID == "" {
|
||||||
return fmt.Errorf("bridgeRoomID cannot be empty")
|
return fmt.Errorf("bridgeChannelID cannot be empty")
|
||||||
}
|
}
|
||||||
if r.UserID == "" {
|
if r.UserID == "" {
|
||||||
return fmt.Errorf("userID cannot be empty")
|
return fmt.Errorf("userID cannot be empty")
|
||||||
|
@ -73,6 +73,9 @@ func (r DeleteChannelMappingRequest) Validate() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
type BridgeManager interface {
|
type BridgeManager interface {
|
||||||
|
// Start starts the bridge manager and message routing system.
|
||||||
|
Start() error
|
||||||
|
|
||||||
// RegisterBridge registers a bridge with the given name. Returns an error if the name is empty,
|
// RegisterBridge registers a bridge with the given name. Returns an error if the name is empty,
|
||||||
// the bridge is nil, or a bridge with the same name is already registered.
|
// the bridge is nil, or a bridge with the same name is already registered.
|
||||||
RegisterBridge(name string, bridge Bridge) error
|
RegisterBridge(name string, bridge Bridge) error
|
||||||
|
@ -110,18 +113,21 @@ type BridgeManager interface {
|
||||||
// OnPluginConfigurationChange propagates configuration changes to all registered bridges.
|
// OnPluginConfigurationChange propagates configuration changes to all registered bridges.
|
||||||
// Returns an error if any bridge fails to update its configuration, but continues to
|
// Returns an error if any bridge fails to update its configuration, but continues to
|
||||||
// attempt updating all bridges.
|
// attempt updating all bridges.
|
||||||
OnPluginConfigurationChange(config any) error
|
OnPluginConfigurationChange(config *config.Configuration) error
|
||||||
|
|
||||||
// CreateChannelMapping is called when a channel mapping is created.
|
// CreateChannelMapping is called when a channel mapping is created.
|
||||||
CreateChannelMapping(req CreateChannelMappingRequest) error
|
CreateChannelMapping(req CreateChannelMappingRequest) error
|
||||||
|
|
||||||
// DeleteChannepMapping is called when a channel mapping is deleted.
|
// DeleteChannepMapping is called when a channel mapping is deleted.
|
||||||
DeleteChannepMapping(req DeleteChannelMappingRequest) error
|
DeleteChannepMapping(req DeleteChannelMappingRequest) error
|
||||||
|
|
||||||
|
// PublishMessage publishes a message to the message bus for routing to target bridges
|
||||||
|
PublishMessage(msg *DirectionalMessage) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Bridge interface {
|
type Bridge interface {
|
||||||
// UpdateConfiguration updates the bridge configuration
|
// UpdateConfiguration updates the bridge configuration
|
||||||
UpdateConfiguration(config any) error
|
UpdateConfiguration(config *config.Configuration) error
|
||||||
|
|
||||||
// Start starts the bridge
|
// Start starts the bridge
|
||||||
Start() error
|
Start() error
|
||||||
|
@ -129,20 +135,20 @@ type Bridge interface {
|
||||||
// Stop stops the bridge
|
// Stop stops the bridge
|
||||||
Stop() error
|
Stop() error
|
||||||
|
|
||||||
// CreateChannelMapping creates a mapping between a Mattermost channel ID and an bridge room ID.
|
// CreateChannelMapping creates a mapping between a Mattermost channel ID and a bridge channel ID.
|
||||||
CreateChannelMapping(channelID, roomJID string) error
|
CreateChannelMapping(channelID, roomJID string) error
|
||||||
|
|
||||||
// GetChannelMapping retrieves the bridge room ID for a given Mattermost channel ID.
|
// GetChannelMapping retrieves the bridge channel ID for a given Mattermost channel ID.
|
||||||
GetChannelMapping(channelID string) (string, error)
|
GetChannelMapping(channelID string) (string, error)
|
||||||
|
|
||||||
// DeleteChannelMapping removes a mapping between a Mattermost channel ID and a bridge room ID.
|
// DeleteChannelMapping removes a mapping between a Mattermost channel ID and a bridge channel ID.
|
||||||
DeleteChannelMapping(channelID string) error
|
DeleteChannelMapping(channelID string) error
|
||||||
|
|
||||||
// RoomExists checks if a room/channel exists on the remote service.
|
// ChannelMappingExists checks if a room/channel exists on the remote service.
|
||||||
RoomExists(roomID string) (bool, error)
|
ChannelMappingExists(roomID string) (bool, error)
|
||||||
|
|
||||||
// GetRoomMapping retrieves the Mattermost channel ID for a given room ID (reverse lookup).
|
// GetChannelMappingForBridge retrieves the Mattermost channel ID for a given room ID from a specific bridge.
|
||||||
GetRoomMapping(roomID string) (string, error)
|
GetChannelMappingForBridge(bridgeName, roomID string) (string, error)
|
||||||
|
|
||||||
// IsConnected checks if the bridge is connected to the remote service.
|
// IsConnected checks if the bridge is connected to the remote service.
|
||||||
IsConnected() bool
|
IsConnected() bool
|
||||||
|
@ -152,6 +158,15 @@ type Bridge interface {
|
||||||
|
|
||||||
// GetUserManager returns the user manager for this bridge.
|
// GetUserManager returns the user manager for this bridge.
|
||||||
GetUserManager() BridgeUserManager
|
GetUserManager() BridgeUserManager
|
||||||
|
|
||||||
|
// Message handling for bidirectional communication
|
||||||
|
GetMessageChannel() <-chan *DirectionalMessage
|
||||||
|
SendMessage(msg *BridgeMessage) error
|
||||||
|
GetMessageHandler() MessageHandler
|
||||||
|
GetUserResolver() UserResolver
|
||||||
|
|
||||||
|
// GetRemoteID returns the remote ID used for shared channels registration
|
||||||
|
GetRemoteID() string
|
||||||
}
|
}
|
||||||
|
|
||||||
// BridgeUser represents a user connected to any bridge service
|
// BridgeUser represents a user connected to any bridge service
|
||||||
|
|
88
server/model/message.go
Normal file
88
server/model/message.go
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
package model
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MessageDirection indicates the direction of message flow
|
||||||
|
type MessageDirection string
|
||||||
|
|
||||||
|
const (
|
||||||
|
DirectionIncoming MessageDirection = "incoming" // From external system to us
|
||||||
|
DirectionOutgoing MessageDirection = "outgoing" // From us to external system
|
||||||
|
)
|
||||||
|
|
||||||
|
// BridgeMessage represents a message that can be passed between any bridge types
|
||||||
|
type BridgeMessage struct {
|
||||||
|
// Source information
|
||||||
|
SourceBridge string // "xmpp", "mattermost", "slack", etc.
|
||||||
|
SourceChannelID string // Channel ID in source system
|
||||||
|
SourceUserID string // User ID in source system (JID, user ID, etc.)
|
||||||
|
SourceUserName string // Display name in source system
|
||||||
|
|
||||||
|
// Message content (standardized on Markdown)
|
||||||
|
Content string // Markdown formatted message content
|
||||||
|
MessageType string // "text", "image", "file", etc.
|
||||||
|
|
||||||
|
// Metadata
|
||||||
|
Timestamp time.Time // When message was received
|
||||||
|
MessageID string // Unique message ID from source
|
||||||
|
ThreadID string // Thread/reply ID (if applicable)
|
||||||
|
|
||||||
|
// Routing hints
|
||||||
|
TargetBridges []string // Which bridges should receive this
|
||||||
|
Metadata map[string]any // Bridge-specific metadata
|
||||||
|
}
|
||||||
|
|
||||||
|
// DirectionalMessage wraps a BridgeMessage with direction information
|
||||||
|
type DirectionalMessage struct {
|
||||||
|
*BridgeMessage
|
||||||
|
Direction MessageDirection
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExternalUser represents a user from any bridge system
|
||||||
|
type ExternalUser struct {
|
||||||
|
BridgeType string // "xmpp", "slack", etc.
|
||||||
|
ExternalUserID string // JID, Slack user ID, etc.
|
||||||
|
DisplayName string // How to display this user
|
||||||
|
MattermostUserID string // Mapped Mattermost user (if exists)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UserResolver handles user resolution for a specific bridge
|
||||||
|
type UserResolver interface {
|
||||||
|
// ResolveUser converts an external user ID to an ExternalUser
|
||||||
|
ResolveUser(externalUserID string) (*ExternalUser, error)
|
||||||
|
|
||||||
|
// FormatUserMention formats a user mention for Markdown content
|
||||||
|
FormatUserMention(user *ExternalUser) string
|
||||||
|
|
||||||
|
// GetDisplayName extracts display name from external user ID
|
||||||
|
GetDisplayName(externalUserID string) string
|
||||||
|
}
|
||||||
|
|
||||||
|
// MessageBus handles routing messages between bridges
|
||||||
|
type MessageBus interface {
|
||||||
|
// Subscribe returns a channel that receives messages for the specified bridge
|
||||||
|
Subscribe(bridgeName string) <-chan *DirectionalMessage
|
||||||
|
|
||||||
|
// Publish sends a message to the message bus for routing
|
||||||
|
Publish(msg *DirectionalMessage) error
|
||||||
|
|
||||||
|
// Start begins message routing
|
||||||
|
Start() error
|
||||||
|
|
||||||
|
// Stop ends message routing and cleans up resources
|
||||||
|
Stop() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// MessageHandler processes incoming messages for a bridge
|
||||||
|
type MessageHandler interface {
|
||||||
|
// ProcessMessage handles an incoming message
|
||||||
|
ProcessMessage(msg *DirectionalMessage) error
|
||||||
|
|
||||||
|
// CanHandleMessage determines if this handler can process the message
|
||||||
|
CanHandleMessage(msg *BridgeMessage) bool
|
||||||
|
|
||||||
|
// GetSupportedMessageTypes returns the message types this handler supports
|
||||||
|
GetSupportedMessageTypes() []string
|
||||||
|
}
|
|
@ -14,7 +14,6 @@ import (
|
||||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
||||||
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
|
||||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
|
|
||||||
"github.com/mattermost/mattermost/server/public/model"
|
"github.com/mattermost/mattermost/server/public/model"
|
||||||
"github.com/mattermost/mattermost/server/public/plugin"
|
"github.com/mattermost/mattermost/server/public/plugin"
|
||||||
"github.com/mattermost/mattermost/server/public/pluginapi"
|
"github.com/mattermost/mattermost/server/public/pluginapi"
|
||||||
|
@ -35,9 +34,6 @@ type Plugin struct {
|
||||||
// commandClient is the client used to register and execute slash commands.
|
// commandClient is the client used to register and execute slash commands.
|
||||||
commandClient command.Command
|
commandClient command.Command
|
||||||
|
|
||||||
// xmppClient is the client used to communicate with XMPP servers.
|
|
||||||
xmppClient *xmpp.Client
|
|
||||||
|
|
||||||
// logger is the main plugin logger
|
// logger is the main plugin logger
|
||||||
logger logger.Logger
|
logger logger.Logger
|
||||||
|
|
||||||
|
@ -70,15 +66,11 @@ func (p *Plugin) OnActivate() error {
|
||||||
|
|
||||||
p.kvstore = kvstore.NewKVStore(p.client)
|
p.kvstore = kvstore.NewKVStore(p.client)
|
||||||
|
|
||||||
p.initXMPPClient()
|
|
||||||
|
|
||||||
// Load configuration directly
|
// Load configuration directly
|
||||||
cfg := p.getConfiguration()
|
cfg := p.getConfiguration()
|
||||||
p.logger.LogDebug("Loaded configuration in OnActivate", "config", cfg)
|
|
||||||
|
|
||||||
// Register the plugin for shared channels
|
// Register the plugin for shared channels
|
||||||
if err := p.registerForSharedChannels(); err != nil {
|
if err := p.registerForSharedChannels(); err != nil {
|
||||||
p.logger.LogError("Failed to register for shared channels", "error", err)
|
|
||||||
return fmt.Errorf("failed to register for shared channels: %w", err)
|
return fmt.Errorf("failed to register for shared channels: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,11 +79,15 @@ func (p *Plugin) OnActivate() error {
|
||||||
|
|
||||||
// Initialize and register bridges with current configuration
|
// Initialize and register bridges with current configuration
|
||||||
if err := p.initBridges(*cfg); err != nil {
|
if err := p.initBridges(*cfg); err != nil {
|
||||||
p.logger.LogError("Failed to initialize bridges", "error", err)
|
|
||||||
return fmt.Errorf("failed to initialize bridges: %w", err)
|
return fmt.Errorf("failed to initialize bridges: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.commandClient = command.NewCommandHandler(p.client, p.bridgeManager)
|
p.commandClient = command.NewCommandHandler(p.client, p.API, p.bridgeManager)
|
||||||
|
|
||||||
|
// Start the bridge manager (this starts message routing)
|
||||||
|
if err := p.bridgeManager.Start(); err != nil {
|
||||||
|
return fmt.Errorf("failed to start bridge manager: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Start all bridges
|
// Start all bridges
|
||||||
for _, bridgeName := range p.bridgeManager.ListBridges() {
|
for _, bridgeName := range p.bridgeManager.ListBridges() {
|
||||||
|
@ -145,18 +141,6 @@ func (p *Plugin) ExecuteCommand(c *plugin.Context, args *model.CommandArgs) (*mo
|
||||||
return response, nil
|
return response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Plugin) initXMPPClient() {
|
|
||||||
cfg := p.getConfiguration()
|
|
||||||
p.xmppClient = xmpp.NewClient(
|
|
||||||
cfg.XMPPServerURL,
|
|
||||||
cfg.XMPPUsername,
|
|
||||||
cfg.XMPPPassword,
|
|
||||||
cfg.GetXMPPResource(),
|
|
||||||
p.remoteID,
|
|
||||||
p.logger,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Plugin) initBridges(cfg config.Configuration) error {
|
func (p *Plugin) initBridges(cfg config.Configuration) error {
|
||||||
// Create and register XMPP bridge
|
// Create and register XMPP bridge
|
||||||
xmppBridge := xmppbridge.NewBridge(
|
xmppBridge := xmppbridge.NewBridge(
|
||||||
|
@ -164,6 +148,7 @@ func (p *Plugin) initBridges(cfg config.Configuration) error {
|
||||||
p.API,
|
p.API,
|
||||||
p.kvstore,
|
p.kvstore,
|
||||||
&cfg,
|
&cfg,
|
||||||
|
p.remoteID,
|
||||||
)
|
)
|
||||||
|
|
||||||
if err := p.bridgeManager.RegisterBridge("xmpp", xmppBridge); err != nil {
|
if err := p.bridgeManager.RegisterBridge("xmpp", xmppBridge); err != nil {
|
||||||
|
@ -176,6 +161,8 @@ func (p *Plugin) initBridges(cfg config.Configuration) error {
|
||||||
p.API,
|
p.API,
|
||||||
p.kvstore,
|
p.kvstore,
|
||||||
&cfg,
|
&cfg,
|
||||||
|
p.botUserID,
|
||||||
|
"mattermost",
|
||||||
)
|
)
|
||||||
|
|
||||||
if err := p.bridgeManager.RegisterBridge("mattermost", mattermostBridge); err != nil {
|
if err := p.bridgeManager.RegisterBridge("mattermost", mattermostBridge); err != nil {
|
||||||
|
@ -203,7 +190,7 @@ func (p *Plugin) registerForSharedChannels() error {
|
||||||
PluginID: manifest.Id,
|
PluginID: manifest.Id,
|
||||||
CreatorID: botUserID,
|
CreatorID: botUserID,
|
||||||
AutoShareDMs: false,
|
AutoShareDMs: false,
|
||||||
AutoInvited: true,
|
AutoInvited: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
remoteID, appErr := p.API.RegisterPluginForSharedChannels(opts)
|
remoteID, appErr := p.API.RegisterPluginForSharedChannels(opts)
|
||||||
|
|
|
@ -7,72 +7,17 @@ import "strings"
|
||||||
// to ensure consistency and avoid key conflicts.
|
// to ensure consistency and avoid key conflicts.
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// CurrentKVStoreVersion is the current version requiring migrations
|
|
||||||
CurrentKVStoreVersion = 2
|
|
||||||
// KeyPrefixXMPPUser is the prefix for XMPP user ID -> Mattermost user ID mappings
|
|
||||||
KeyPrefixXMPPUser = "xmpp_user_"
|
|
||||||
// KeyPrefixMattermostUser is the prefix for Mattermost user ID -> XMPP user ID mappings
|
|
||||||
KeyPrefixMattermostUser = "mattermost_user_"
|
|
||||||
|
|
||||||
// KeyPrefixChannelMap is the prefix for bridge-agnostic channel mappings
|
// KeyPrefixChannelMap is the prefix for bridge-agnostic channel mappings
|
||||||
KeyPrefixChannelMap = "channel_map_"
|
KeyPrefixChannelMap = "channel_map_"
|
||||||
|
|
||||||
// KeyPrefixGhostUser is the prefix for Mattermost user ID -> XMPP ghost user ID cache
|
|
||||||
KeyPrefixGhostUser = "ghost_user_"
|
|
||||||
// KeyPrefixGhostRoom is the prefix for ghost user room membership tracking
|
|
||||||
KeyPrefixGhostRoom = "ghost_room_"
|
|
||||||
|
|
||||||
// KeyPrefixXMPPEventPost is the prefix for XMPP event ID -> Mattermost post ID mappings
|
|
||||||
KeyPrefixXMPPEventPost = "xmpp_event_post_"
|
|
||||||
// KeyPrefixXMPPReaction is the prefix for XMPP reaction event ID -> reaction info mappings
|
|
||||||
KeyPrefixXMPPReaction = "xmpp_reaction_"
|
|
||||||
|
|
||||||
// KeyStoreVersion is the key for tracking the current KV store schema version
|
|
||||||
KeyStoreVersion = "kv_store_version"
|
|
||||||
|
|
||||||
// KeyPrefixLegacyDMMapping was the old prefix for DM mappings
|
|
||||||
KeyPrefixLegacyDMMapping = "dm_mapping_"
|
|
||||||
// KeyPrefixLegacyXMPPDMMapping was the old prefix for XMPP DM mappings
|
|
||||||
KeyPrefixLegacyXMPPDMMapping = "xmpp_dm_mapping_"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Helper functions for building KV store keys
|
// Helper functions for building KV store keys
|
||||||
|
|
||||||
// BuildXMPPUserKey creates a key for XMPP user -> Mattermost user mapping
|
|
||||||
func BuildXMPPUserKey(xmppUserID string) string {
|
|
||||||
return KeyPrefixXMPPUser + xmppUserID
|
|
||||||
}
|
|
||||||
|
|
||||||
// BuildMattermostUserKey creates a key for Mattermost user -> XMPP user mapping
|
|
||||||
func BuildMattermostUserKey(mattermostUserID string) string {
|
|
||||||
return KeyPrefixMattermostUser + mattermostUserID
|
|
||||||
}
|
|
||||||
|
|
||||||
// BuildChannelMapKey creates a bridge-agnostic key for channel mappings
|
// BuildChannelMapKey creates a bridge-agnostic key for channel mappings
|
||||||
func BuildChannelMapKey(bridgeName, identifier string) string {
|
func BuildChannelMapKey(bridgeName, identifier string) string {
|
||||||
return KeyPrefixChannelMap + bridgeName + "_" + identifier
|
return KeyPrefixChannelMap + bridgeName + "_" + identifier
|
||||||
}
|
}
|
||||||
|
|
||||||
// BuildGhostUserKey creates a key for ghost user cache
|
|
||||||
func BuildGhostUserKey(mattermostUserID string) string {
|
|
||||||
return KeyPrefixGhostUser + mattermostUserID
|
|
||||||
}
|
|
||||||
|
|
||||||
// BuildGhostRoomKey creates a key for ghost user room membership
|
|
||||||
func BuildGhostRoomKey(mattermostUserID, roomID string) string {
|
|
||||||
return KeyPrefixGhostRoom + mattermostUserID + "_" + roomID
|
|
||||||
}
|
|
||||||
|
|
||||||
// BuildXMPPEventPostKey creates a key for XMPP event -> post mapping
|
|
||||||
func BuildXMPPEventPostKey(xmppEventID string) string {
|
|
||||||
return KeyPrefixXMPPEventPost + xmppEventID
|
|
||||||
}
|
|
||||||
|
|
||||||
// BuildXMPPReactionKey creates a key for XMPP reaction storage
|
|
||||||
func BuildXMPPReactionKey(reactionEventID string) string {
|
|
||||||
return KeyPrefixXMPPReaction + reactionEventID
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExtractIdentifierFromChannelMapKey extracts the identifier from a bridge-agnostic channel map key
|
// ExtractIdentifierFromChannelMapKey extracts the identifier from a bridge-agnostic channel map key
|
||||||
func ExtractIdentifierFromChannelMapKey(key, bridgeName string) string {
|
func ExtractIdentifierFromChannelMapKey(key, bridgeName string) string {
|
||||||
expectedPrefix := KeyPrefixChannelMap + bridgeName + "_"
|
expectedPrefix := KeyPrefixChannelMap + bridgeName + "_"
|
||||||
|
@ -83,4 +28,4 @@ func ExtractIdentifierFromChannelMapKey(key, bridgeName string) string {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
return key[len(expectedPrefix):]
|
return key[len(expectedPrefix):]
|
||||||
}
|
}
|
|
@ -6,10 +6,15 @@ import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/jellydator/ttlcache/v3"
|
||||||
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
|
||||||
|
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||||
"mellium.im/sasl"
|
"mellium.im/sasl"
|
||||||
|
"mellium.im/xmlstream"
|
||||||
"mellium.im/xmpp"
|
"mellium.im/xmpp"
|
||||||
"mellium.im/xmpp/disco"
|
"mellium.im/xmpp/disco"
|
||||||
"mellium.im/xmpp/jid"
|
"mellium.im/xmpp/jid"
|
||||||
|
@ -18,6 +23,17 @@ import (
|
||||||
"mellium.im/xmpp/stanza"
|
"mellium.im/xmpp/stanza"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// defaultOperationTimeout is the default timeout for XMPP operations
|
||||||
|
defaultOperationTimeout = 5 * time.Second
|
||||||
|
|
||||||
|
// msgBufferSize is the buffer size for incoming message channels
|
||||||
|
msgBufferSize = 1000
|
||||||
|
|
||||||
|
// messageDedupeTTL is the TTL for message deduplication cache
|
||||||
|
messageDedupeTTL = 30 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
// Client represents an XMPP client for communicating with XMPP servers.
|
// Client represents an XMPP client for communicating with XMPP servers.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
serverURL string
|
serverURL string
|
||||||
|
@ -38,6 +54,12 @@ type Client struct {
|
||||||
mux *mux.ServeMux
|
mux *mux.ServeMux
|
||||||
sessionReady chan struct{}
|
sessionReady chan struct{}
|
||||||
sessionServing bool
|
sessionServing bool
|
||||||
|
|
||||||
|
// Message handling for bridge integration
|
||||||
|
incomingMessages chan *model.DirectionalMessage
|
||||||
|
|
||||||
|
// Message deduplication cache to handle XMPP server duplicates
|
||||||
|
dedupeCache *ttlcache.Cache[string, time.Time]
|
||||||
}
|
}
|
||||||
|
|
||||||
// MessageRequest represents a request to send a message.
|
// MessageRequest represents a request to send a message.
|
||||||
|
@ -85,22 +107,40 @@ type UserProfile struct {
|
||||||
// NewClient creates a new XMPP client.
|
// NewClient creates a new XMPP client.
|
||||||
func NewClient(serverURL, username, password, resource, remoteID string, logger logger.Logger) *Client {
|
func NewClient(serverURL, username, password, resource, remoteID string, logger logger.Logger) *Client {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
mucClient := &muc.Client{}
|
|
||||||
mux := mux.New("jabber:client", muc.HandleClient(mucClient))
|
|
||||||
|
|
||||||
return &Client{
|
// Create TTL cache for message deduplication
|
||||||
serverURL: serverURL,
|
dedupeCache := ttlcache.New(
|
||||||
username: username,
|
ttlcache.WithTTL[string, time.Time](messageDedupeTTL),
|
||||||
password: password,
|
)
|
||||||
resource: resource,
|
|
||||||
remoteID: remoteID,
|
// Start automatic cleanup in background
|
||||||
logger: logger,
|
go dedupeCache.Start()
|
||||||
ctx: ctx,
|
|
||||||
cancel: cancel,
|
client := &Client{
|
||||||
mucClient: mucClient,
|
serverURL: serverURL,
|
||||||
mux: mux,
|
username: username,
|
||||||
sessionReady: make(chan struct{}),
|
password: password,
|
||||||
|
resource: resource,
|
||||||
|
remoteID: remoteID,
|
||||||
|
logger: logger,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
sessionReady: make(chan struct{}),
|
||||||
|
incomingMessages: make(chan *model.DirectionalMessage, msgBufferSize),
|
||||||
|
dedupeCache: dedupeCache,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create MUC client and set up message handling
|
||||||
|
mucClient := &muc.Client{}
|
||||||
|
client.mucClient = mucClient
|
||||||
|
|
||||||
|
// Create mux with MUC client and our message handler
|
||||||
|
mux := mux.New("jabber:client",
|
||||||
|
muc.HandleClient(mucClient),
|
||||||
|
mux.MessageFunc(stanza.GroupChatMessage, xml.Name{}, client.handleIncomingMessage))
|
||||||
|
client.mux = mux
|
||||||
|
|
||||||
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClientWithTLS creates a new XMPP client with custom TLS configuration.
|
// NewClientWithTLS creates a new XMPP client with custom TLS configuration.
|
||||||
|
@ -115,6 +155,44 @@ func (c *Client) SetServerDomain(domain string) {
|
||||||
c.serverDomain = domain
|
c.serverDomain = domain
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// parseServerAddress parses a server URL and returns a host:port address
|
||||||
|
func (c *Client) parseServerAddress(serverURL string) (string, error) {
|
||||||
|
// Handle simple host:port format (e.g., "localhost:5222")
|
||||||
|
if host, port, err := net.SplitHostPort(serverURL); err == nil {
|
||||||
|
// Already in host:port format, validate and return
|
||||||
|
if host == "" {
|
||||||
|
return "", fmt.Errorf("empty hostname in server URL: %s", serverURL)
|
||||||
|
}
|
||||||
|
if port == "" {
|
||||||
|
return "", fmt.Errorf("empty port in server URL: %s", serverURL)
|
||||||
|
}
|
||||||
|
return serverURL, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try parsing as URL (e.g., "xmpp://localhost:5222" or "xmpps://localhost:5223")
|
||||||
|
parsedURL, err := url.Parse(serverURL)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("invalid server URL format: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
host := parsedURL.Hostname()
|
||||||
|
if host == "" {
|
||||||
|
return "", fmt.Errorf("no hostname found in server URL: %s", serverURL)
|
||||||
|
}
|
||||||
|
|
||||||
|
port := parsedURL.Port()
|
||||||
|
// Use default XMPP port if not specified
|
||||||
|
if port == "" {
|
||||||
|
if parsedURL.Scheme == "xmpps" {
|
||||||
|
port = "5223"
|
||||||
|
} else {
|
||||||
|
port = "5222"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return host + ":" + port, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Connect establishes connection to the XMPP server
|
// Connect establishes connection to the XMPP server
|
||||||
func (c *Client) Connect() error {
|
func (c *Client) Connect() error {
|
||||||
if c.session != nil {
|
if c.session != nil {
|
||||||
|
@ -150,15 +228,38 @@ func (c *Client) Connect() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use DialClientSession for proper SASL authentication
|
// Create a timeout context for the connection attempt (30 seconds)
|
||||||
c.session, err = xmpp.DialClientSession(
|
connectCtx, connectCancel := context.WithTimeout(c.ctx, 30*time.Second)
|
||||||
c.ctx,
|
defer connectCancel()
|
||||||
|
|
||||||
|
c.logger.LogDebug("Connecting to XMPP server", "server_url", c.serverURL)
|
||||||
|
|
||||||
|
// Parse server address for direct connection
|
||||||
|
// We use this instead of using the JID for the user to avoid SRV lookups that can fail
|
||||||
|
// depending on network/dns configuration and to ensure we connect directly to the specified
|
||||||
|
// server.
|
||||||
|
serverAddr, err := c.parseServerAddress(c.serverURL)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to parse server URL %s: %w", c.serverURL, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var d net.Dialer
|
||||||
|
conn, err := d.DialContext(connectCtx, "tcp", serverAddr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to dial XMPP server at %s: %w", serverAddr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create client session with direct connection
|
||||||
|
c.session, err = xmpp.NewClientSession(
|
||||||
|
connectCtx,
|
||||||
c.jidAddr,
|
c.jidAddr,
|
||||||
|
conn,
|
||||||
xmpp.StartTLS(tlsConfig),
|
xmpp.StartTLS(tlsConfig),
|
||||||
xmpp.SASL("", c.password, sasl.Plain),
|
xmpp.SASL("", c.password, sasl.Plain),
|
||||||
xmpp.BindResource(),
|
xmpp.BindResource(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
conn.Close()
|
||||||
return fmt.Errorf("failed to establish XMPP session: %w", err)
|
return fmt.Errorf("failed to establish XMPP session: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,9 +272,12 @@ func (c *Client) Connect() error {
|
||||||
if !c.sessionServing {
|
if !c.sessionServing {
|
||||||
return fmt.Errorf("failed to start session serving")
|
return fmt.Errorf("failed to start session serving")
|
||||||
}
|
}
|
||||||
|
c.logger.LogInfo("XMPP client connected successfully", "jid", c.jidAddr.String())
|
||||||
return nil
|
return nil
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(10 * time.Second):
|
||||||
return fmt.Errorf("timeout waiting for session to be ready")
|
return fmt.Errorf("timeout waiting for session to be ready")
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
return fmt.Errorf("connection cancelled: %w", c.ctx.Err())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -206,18 +310,47 @@ func (c *Client) serveSession() {
|
||||||
|
|
||||||
// Disconnect closes the XMPP connection
|
// Disconnect closes the XMPP connection
|
||||||
func (c *Client) Disconnect() error {
|
func (c *Client) Disconnect() error {
|
||||||
if c.session != nil {
|
if c.session == nil {
|
||||||
err := c.session.Close()
|
return nil // Already disconnected
|
||||||
|
}
|
||||||
|
|
||||||
|
c.logger.LogInfo("Disconnecting XMPP client", "jid", c.jidAddr.String())
|
||||||
|
|
||||||
|
// Send offline presence before disconnecting to properly leave rooms
|
||||||
|
if err := c.SetOfflinePresence(); err != nil {
|
||||||
|
c.logger.LogWarn("Failed to set offline presence before disconnect", "error", err)
|
||||||
|
// Don't fail the disconnect for presence issues
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the session with a timeout to prevent hanging
|
||||||
|
sessionCloseCtx, cancel := context.WithTimeout(context.Background(), defaultOperationTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
sessionCloseDone := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
sessionCloseDone <- c.session.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-sessionCloseDone:
|
||||||
c.session = nil
|
c.session = nil
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
c.logger.LogWarn("Error closing XMPP session", "error", err)
|
||||||
return fmt.Errorf("failed to close XMPP session: %w", err)
|
return fmt.Errorf("failed to close XMPP session: %w", err)
|
||||||
}
|
}
|
||||||
|
case <-sessionCloseCtx.Done():
|
||||||
|
c.logger.LogWarn("Timeout closing XMPP session, forcing disconnect")
|
||||||
|
c.session = nil
|
||||||
|
// Continue with cleanup even on timeout
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.cancel != nil {
|
// Stop the TTL cache cleanup goroutine
|
||||||
c.cancel()
|
c.dedupeCache.Stop()
|
||||||
}
|
|
||||||
|
|
||||||
|
// Cancel the client context
|
||||||
|
c.cancel()
|
||||||
|
|
||||||
|
c.logger.LogInfo("XMPP client disconnected successfully")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -418,6 +551,30 @@ func (c *Client) SetOnlinePresence() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetOfflinePresence sends an offline presence stanza to indicate the client is going offline
|
||||||
|
func (c *Client) SetOfflinePresence() error {
|
||||||
|
if c.session == nil {
|
||||||
|
return fmt.Errorf("XMPP session not established")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create presence stanza indicating we're unavailable
|
||||||
|
presence := stanza.Presence{
|
||||||
|
Type: stanza.UnavailablePresence,
|
||||||
|
From: c.jidAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a context with timeout for the presence update
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), defaultOperationTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Send the presence stanza
|
||||||
|
if err := c.session.Encode(ctx, presence); err != nil {
|
||||||
|
return fmt.Errorf("failed to send offline presence: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// CheckRoomExists verifies if an XMPP room exists and is accessible using disco#info
|
// CheckRoomExists verifies if an XMPP room exists and is accessible using disco#info
|
||||||
func (c *Client) CheckRoomExists(roomJID string) (bool, error) {
|
func (c *Client) CheckRoomExists(roomJID string) (bool, error) {
|
||||||
if c.session == nil {
|
if c.session == nil {
|
||||||
|
@ -535,3 +692,118 @@ func (c *Client) Ping() error {
|
||||||
c.logger.LogDebug("XMPP ping successful", "duration", duration)
|
c.logger.LogDebug("XMPP ping successful", "duration", duration)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetMessageChannel returns the channel for incoming messages (Bridge interface)
|
||||||
|
func (c *Client) GetMessageChannel() <-chan *model.DirectionalMessage {
|
||||||
|
return c.incomingMessages
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleIncomingMessage processes incoming XMPP message stanzas
|
||||||
|
func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenReadEncoder) error {
|
||||||
|
c.logger.LogDebug("Received XMPP message",
|
||||||
|
"from", msg.From.String(),
|
||||||
|
"to", msg.To.String(),
|
||||||
|
"type", fmt.Sprintf("%v", msg.Type))
|
||||||
|
|
||||||
|
// Only process groupchat messages for now (MUC messages from channels)
|
||||||
|
if msg.Type != stanza.GroupChatMessage {
|
||||||
|
c.logger.LogDebug("Ignoring non-groupchat message", "type", fmt.Sprintf("%v", msg.Type))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse the message body from the token reader
|
||||||
|
var msgWithBody struct {
|
||||||
|
stanza.Message
|
||||||
|
Body string `xml:"body"`
|
||||||
|
}
|
||||||
|
msgWithBody.Message = msg
|
||||||
|
|
||||||
|
d := xml.NewTokenDecoder(t)
|
||||||
|
if err := d.DecodeElement(&msgWithBody, nil); err != nil {
|
||||||
|
c.logger.LogError("Failed to decode message body", "error", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if msgWithBody.Body == "" {
|
||||||
|
c.logger.LogDebug("Message has no body, ignoring")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deduplicate messages using message ID and TTL cache
|
||||||
|
if msg.ID != "" {
|
||||||
|
// Check if this message ID is already in the cache (indicates duplicate)
|
||||||
|
if c.dedupeCache.Has(msg.ID) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Record this message in the cache with TTL
|
||||||
|
c.dedupeCache.Set(msg.ID, time.Now(), ttlcache.DefaultTTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract channel and user information from JIDs
|
||||||
|
channelID, err := c.extractChannelID(msg.From)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.LogError("Failed to extract channel ID from JID", "from", msg.From.String(), "error", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
userID, userName := c.extractUserInfo(msg.From)
|
||||||
|
|
||||||
|
// Create BridgeMessage
|
||||||
|
bridgeMsg := &model.BridgeMessage{
|
||||||
|
SourceBridge: "xmpp",
|
||||||
|
SourceChannelID: channelID,
|
||||||
|
SourceUserID: userID,
|
||||||
|
SourceUserName: userName,
|
||||||
|
Content: msgWithBody.Body, // Already Markdown compatible
|
||||||
|
MessageType: "text",
|
||||||
|
Timestamp: time.Now(), // XMPP doesn't always provide timestamps
|
||||||
|
MessageID: msg.ID,
|
||||||
|
TargetBridges: []string{}, // Will be routed to all other bridges
|
||||||
|
Metadata: map[string]any{
|
||||||
|
"xmpp_from": msg.From.String(),
|
||||||
|
"xmpp_to": msg.To.String(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wrap in directional message
|
||||||
|
directionalMsg := &model.DirectionalMessage{
|
||||||
|
BridgeMessage: bridgeMsg,
|
||||||
|
Direction: model.DirectionIncoming,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send to message channel (non-blocking)
|
||||||
|
select {
|
||||||
|
case c.incomingMessages <- directionalMsg:
|
||||||
|
// Message queued successfully
|
||||||
|
default:
|
||||||
|
c.logger.LogWarn("Message channel full, dropping message",
|
||||||
|
"channel_id", channelID,
|
||||||
|
"user_id", userID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// extractChannelID extracts the channel ID (room bare JID) from a message JID
|
||||||
|
func (c *Client) extractChannelID(from jid.JID) (string, error) {
|
||||||
|
// For MUC messages, the channel ID is the bare JID (without resource/nickname)
|
||||||
|
return from.Bare().String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// extractUserInfo extracts user ID and display name from a message JID
|
||||||
|
func (c *Client) extractUserInfo(from jid.JID) (string, string) {
|
||||||
|
// For MUC messages, the resource part is the nickname
|
||||||
|
nickname := from.Resourcepart()
|
||||||
|
|
||||||
|
// Use the full JID as user ID for XMPP
|
||||||
|
userID := from.String()
|
||||||
|
|
||||||
|
// Use nickname as display name if available, otherwise use full JID
|
||||||
|
displayName := nickname
|
||||||
|
if displayName == "" {
|
||||||
|
displayName = from.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
return userID, displayName
|
||||||
|
}
|
||||||
|
|
Reference in a new issue