feat: implement OnSharedChannelsSyncMsg hook for bidirectional sync
Some checks are pending
ci / plugin-ci (push) Waiting to run
Some checks are pending
ci / plugin-ci (push) Waiting to run
- Add PublishMessage method to BridgeManager interface and implementation - Implement OnSharedChannelsSyncMsg hook to process Mattermost shared channel sync messages - Add processSyncPost helper to convert Mattermost posts to bridge messages - Route sync messages from Mattermost shared channels to XMPP bridge via message bus - Handle user resolution with fallback to API calls for missing users - Add comprehensive logging and error handling for sync operations - Support routing of text messages from shared channels to XMPP rooms 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
d21dcd2dd1
commit
b1c6f21ea3
3 changed files with 139 additions and 3 deletions
|
@ -550,3 +550,14 @@ func (m *BridgeManager) StopMessageRouting() error {
|
|||
// Stop the message bus
|
||||
return m.messageBus.Stop()
|
||||
}
|
||||
|
||||
// PublishMessage publishes a message to the message bus for routing to target bridges
|
||||
func (m *BridgeManager) PublishMessage(msg *model.DirectionalMessage) error {
|
||||
m.logger.LogDebug("Publishing message to message bus",
|
||||
"source_bridge", msg.SourceBridge,
|
||||
"direction", msg.Direction,
|
||||
"target_bridges", msg.TargetBridges,
|
||||
"message_id", msg.MessageID)
|
||||
|
||||
return m.messageBus.Publish(msg)
|
||||
}
|
||||
|
|
|
@ -1,18 +1,24 @@
|
|||
package main
|
||||
|
||||
import "github.com/mattermost/mattermost/server/public/model"
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
|
||||
"github.com/mattermost/mattermost/server/public/model"
|
||||
)
|
||||
|
||||
// OnSharedChannelsPing is called to check if the bridge is healthy and ready to process messages
|
||||
func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool {
|
||||
config := p.getConfiguration()
|
||||
|
||||
p.logger.LogDebug("OnSharedChannelsPing called", "remote_cluster_id", remoteCluster.RemoteId)
|
||||
|
||||
var remoteClusterID string
|
||||
if remoteCluster != nil {
|
||||
remoteClusterID = remoteCluster.RemoteId
|
||||
}
|
||||
|
||||
p.logger.LogDebug("OnSharedChannelsPing called", "remote_cluster_id", remoteClusterID)
|
||||
|
||||
p.logger.LogDebug("Received shared channels ping", "remote_cluster_id", remoteClusterID)
|
||||
|
||||
// If sync is disabled, we're still "healthy" but not actively processing
|
||||
|
@ -44,3 +50,119 @@ func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool {
|
|||
p.logger.LogDebug("Shared channels ping successful - XMPP bridge is healthy", "remote_cluster_id", remoteClusterID)
|
||||
return true
|
||||
}
|
||||
|
||||
// OnSharedChannelsSyncMsg processes sync messages from Mattermost shared channels and routes them to XMPP
|
||||
func (p *Plugin) OnSharedChannelsSyncMsg(msg *model.SyncMsg, rc *model.RemoteCluster) (model.SyncResponse, error) {
|
||||
config := p.getConfiguration()
|
||||
|
||||
// Initialize sync response
|
||||
now := model.GetMillis()
|
||||
response := model.SyncResponse{
|
||||
PostsLastUpdateAt: now,
|
||||
UsersLastUpdateAt: now,
|
||||
ReactionsLastUpdateAt: now,
|
||||
}
|
||||
|
||||
var remoteClusterID string
|
||||
if rc != nil {
|
||||
remoteClusterID = rc.RemoteId
|
||||
}
|
||||
|
||||
p.logger.LogDebug("OnSharedChannelsSyncMsg called",
|
||||
"remote_cluster_id", remoteClusterID,
|
||||
"channel_id", msg.ChannelId,
|
||||
"post_count", len(msg.Posts))
|
||||
|
||||
// If sync is disabled, return success but don't process
|
||||
if !config.EnableSync {
|
||||
p.logger.LogDebug("Sync message received but sync is disabled", "remote_cluster_id", remoteClusterID)
|
||||
return response, nil
|
||||
}
|
||||
|
||||
// Process each post in the sync message
|
||||
var processedCount int
|
||||
var errors []string
|
||||
|
||||
for _, post := range msg.Posts {
|
||||
if err := p.processSyncPost(post, msg.ChannelId, msg.Users); err != nil {
|
||||
errorMsg := fmt.Sprintf("failed to process post %s: %v", post.Id, err)
|
||||
errors = append(errors, errorMsg)
|
||||
p.logger.LogError("Failed to process sync post", "post_id", post.Id, "error", err)
|
||||
} else {
|
||||
processedCount++
|
||||
}
|
||||
}
|
||||
|
||||
p.logger.LogInfo("Processed sync message",
|
||||
"remote_cluster_id", remoteClusterID,
|
||||
"channel_id", msg.ChannelId,
|
||||
"processed_posts", processedCount,
|
||||
"failed_posts", len(errors))
|
||||
|
||||
// If we have errors, return them
|
||||
if len(errors) > 0 {
|
||||
return response, fmt.Errorf("failed to process %d posts: %v", len(errors), errors)
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
// processSyncPost converts a Mattermost post to a bridge message and routes it to XMPP
|
||||
func (p *Plugin) processSyncPost(post *model.Post, channelID string, users map[string]*model.User) error {
|
||||
// Find the user who created this post
|
||||
var postUser *model.User
|
||||
p.logger.LogInfo("Processing sync post", "post_id", post.UserId, "users", users)
|
||||
if users != nil {
|
||||
postUser = users[post.UserId]
|
||||
}
|
||||
|
||||
// If user not found in sync data, try to get from API
|
||||
if postUser == nil {
|
||||
var err error
|
||||
postUser, err = p.API.GetUser(post.UserId)
|
||||
if err != nil {
|
||||
p.logger.LogWarn("Failed to get user for post", "user_id", post.UserId, "post_id", post.Id, "error", err)
|
||||
// Create a placeholder user
|
||||
postUser = &model.User{
|
||||
Id: post.UserId,
|
||||
Username: "unknown-user",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create bridge message from Mattermost post
|
||||
bridgeMessage := &pluginModel.BridgeMessage{
|
||||
SourceBridge: "mattermost",
|
||||
SourceChannelID: channelID,
|
||||
SourceUserID: postUser.Id,
|
||||
SourceUserName: postUser.Username,
|
||||
Content: post.Message,
|
||||
MessageType: "text", // TODO: Handle other message types
|
||||
Timestamp: time.Unix(post.CreateAt/1000, 0),
|
||||
MessageID: post.Id,
|
||||
ThreadID: post.RootId,
|
||||
TargetBridges: []string{"xmpp"}, // Route to XMPP
|
||||
Metadata: map[string]any{
|
||||
"original_post": post,
|
||||
"channel_id": channelID,
|
||||
},
|
||||
}
|
||||
|
||||
// Create directional message for outgoing (Mattermost -> XMPP)
|
||||
directionalMessage := &pluginModel.DirectionalMessage{
|
||||
BridgeMessage: bridgeMessage,
|
||||
Direction: pluginModel.DirectionOutgoing,
|
||||
}
|
||||
|
||||
// Publish the message to the message bus for routing to XMPP bridge
|
||||
if err := p.bridgeManager.PublishMessage(directionalMessage); err != nil {
|
||||
return fmt.Errorf("failed to publish sync message to message bus: %w", err)
|
||||
}
|
||||
|
||||
p.logger.LogDebug("Successfully published sync message to message bus",
|
||||
"post_id", post.Id,
|
||||
"channel_id", channelID,
|
||||
"user", postUser.Username)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -120,6 +120,9 @@ type BridgeManager interface {
|
|||
|
||||
// DeleteChannepMapping is called when a channel mapping is deleted.
|
||||
DeleteChannepMapping(req DeleteChannelMappingRequest) error
|
||||
|
||||
// PublishMessage publishes a message to the message bus for routing to target bridges
|
||||
PublishMessage(msg *DirectionalMessage) error
|
||||
}
|
||||
|
||||
type Bridge interface {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue