diff --git a/server/bridge/manager.go b/server/bridge/manager.go index be26322..9984410 100644 --- a/server/bridge/manager.go +++ b/server/bridge/manager.go @@ -550,3 +550,14 @@ func (m *BridgeManager) StopMessageRouting() error { // Stop the message bus return m.messageBus.Stop() } + +// PublishMessage publishes a message to the message bus for routing to target bridges +func (m *BridgeManager) PublishMessage(msg *model.DirectionalMessage) error { + m.logger.LogDebug("Publishing message to message bus", + "source_bridge", msg.SourceBridge, + "direction", msg.Direction, + "target_bridges", msg.TargetBridges, + "message_id", msg.MessageID) + + return m.messageBus.Publish(msg) +} diff --git a/server/hooks_sharedchannels.go b/server/hooks_sharedchannels.go index 798ba0f..e49df2b 100644 --- a/server/hooks_sharedchannels.go +++ b/server/hooks_sharedchannels.go @@ -1,18 +1,24 @@ package main -import "github.com/mattermost/mattermost/server/public/model" +import ( + "fmt" + "time" + + pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model" + "github.com/mattermost/mattermost/server/public/model" +) // OnSharedChannelsPing is called to check if the bridge is healthy and ready to process messages func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool { config := p.getConfiguration() - p.logger.LogDebug("OnSharedChannelsPing called", "remote_cluster_id", remoteCluster.RemoteId) - var remoteClusterID string if remoteCluster != nil { remoteClusterID = remoteCluster.RemoteId } + p.logger.LogDebug("OnSharedChannelsPing called", "remote_cluster_id", remoteClusterID) + p.logger.LogDebug("Received shared channels ping", "remote_cluster_id", remoteClusterID) // If sync is disabled, we're still "healthy" but not actively processing @@ -44,3 +50,119 @@ func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool { p.logger.LogDebug("Shared channels ping successful - XMPP bridge is healthy", "remote_cluster_id", remoteClusterID) return true } + +// OnSharedChannelsSyncMsg processes sync messages from Mattermost shared channels and routes them to XMPP +func (p *Plugin) OnSharedChannelsSyncMsg(msg *model.SyncMsg, rc *model.RemoteCluster) (model.SyncResponse, error) { + config := p.getConfiguration() + + // Initialize sync response + now := model.GetMillis() + response := model.SyncResponse{ + PostsLastUpdateAt: now, + UsersLastUpdateAt: now, + ReactionsLastUpdateAt: now, + } + + var remoteClusterID string + if rc != nil { + remoteClusterID = rc.RemoteId + } + + p.logger.LogDebug("OnSharedChannelsSyncMsg called", + "remote_cluster_id", remoteClusterID, + "channel_id", msg.ChannelId, + "post_count", len(msg.Posts)) + + // If sync is disabled, return success but don't process + if !config.EnableSync { + p.logger.LogDebug("Sync message received but sync is disabled", "remote_cluster_id", remoteClusterID) + return response, nil + } + + // Process each post in the sync message + var processedCount int + var errors []string + + for _, post := range msg.Posts { + if err := p.processSyncPost(post, msg.ChannelId, msg.Users); err != nil { + errorMsg := fmt.Sprintf("failed to process post %s: %v", post.Id, err) + errors = append(errors, errorMsg) + p.logger.LogError("Failed to process sync post", "post_id", post.Id, "error", err) + } else { + processedCount++ + } + } + + p.logger.LogInfo("Processed sync message", + "remote_cluster_id", remoteClusterID, + "channel_id", msg.ChannelId, + "processed_posts", processedCount, + "failed_posts", len(errors)) + + // If we have errors, return them + if len(errors) > 0 { + return response, fmt.Errorf("failed to process %d posts: %v", len(errors), errors) + } + + return response, nil +} + +// processSyncPost converts a Mattermost post to a bridge message and routes it to XMPP +func (p *Plugin) processSyncPost(post *model.Post, channelID string, users map[string]*model.User) error { + // Find the user who created this post + var postUser *model.User + p.logger.LogInfo("Processing sync post", "post_id", post.UserId, "users", users) + if users != nil { + postUser = users[post.UserId] + } + + // If user not found in sync data, try to get from API + if postUser == nil { + var err error + postUser, err = p.API.GetUser(post.UserId) + if err != nil { + p.logger.LogWarn("Failed to get user for post", "user_id", post.UserId, "post_id", post.Id, "error", err) + // Create a placeholder user + postUser = &model.User{ + Id: post.UserId, + Username: "unknown-user", + } + } + } + + // Create bridge message from Mattermost post + bridgeMessage := &pluginModel.BridgeMessage{ + SourceBridge: "mattermost", + SourceChannelID: channelID, + SourceUserID: postUser.Id, + SourceUserName: postUser.Username, + Content: post.Message, + MessageType: "text", // TODO: Handle other message types + Timestamp: time.Unix(post.CreateAt/1000, 0), + MessageID: post.Id, + ThreadID: post.RootId, + TargetBridges: []string{"xmpp"}, // Route to XMPP + Metadata: map[string]any{ + "original_post": post, + "channel_id": channelID, + }, + } + + // Create directional message for outgoing (Mattermost -> XMPP) + directionalMessage := &pluginModel.DirectionalMessage{ + BridgeMessage: bridgeMessage, + Direction: pluginModel.DirectionOutgoing, + } + + // Publish the message to the message bus for routing to XMPP bridge + if err := p.bridgeManager.PublishMessage(directionalMessage); err != nil { + return fmt.Errorf("failed to publish sync message to message bus: %w", err) + } + + p.logger.LogDebug("Successfully published sync message to message bus", + "post_id", post.Id, + "channel_id", channelID, + "user", postUser.Username) + + return nil +} diff --git a/server/model/bridge.go b/server/model/bridge.go index 73da3ec..dcf67df 100644 --- a/server/model/bridge.go +++ b/server/model/bridge.go @@ -120,6 +120,9 @@ type BridgeManager interface { // DeleteChannepMapping is called when a channel mapping is deleted. DeleteChannepMapping(req DeleteChannelMappingRequest) error + + // PublishMessage publishes a message to the message bus for routing to target bridges + PublishMessage(msg *DirectionalMessage) error } type Bridge interface {