Compare commits

...

13 commits

Author SHA1 Message Date
1bbb510870
docs: add comprehensive architecture documentation
Some checks failed
ci / plugin-ci (push) Has been cancelled
- Add complete system architecture with Mermaid diagrams
- Document XEP Extension System architecture and lifecycle
- Include XEP-0077 In-Band Registration implementation details
- Cover bridge system, message flow, and data persistence
- Document XMPP client doctor tool with XEP testing capabilities
- Provide extension points and development workflow guidance
- Add comprehensive component documentation and code organization

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-06 19:19:55 +02:00
b3c2d4b724
chore: give plugin initial version 2025-08-06 19:17:07 +02:00
275dd4bb8e
pluginctl: 0.1.4 2025-08-06 19:16:59 +02:00
a76200f4b9
feat: implement XEP-0077 In-Band Registration support
- Add XEPFeatures framework for managing XMPP extension protocols
- Implement complete XEP-0077 In-Band Registration functionality
- Add server capability detection using disco#info queries
- Only initialize XEP features when server supports them
- Add comprehensive XEP-0077 testing to doctor command
- Doctor tests create and delete test users to validate functionality
- Add struct-based XEP management instead of dynamic maps

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-06 19:16:37 +02:00
53818ade7f
chore: remove old setup.mk 2025-08-06 18:40:07 +02:00
a4fe14081d
chore: go mod tidy 2025-08-06 18:25:37 +02:00
7c37953c28
chore: fix lint issues 2025-08-06 18:25:25 +02:00
17ea21a579
chore: remove unused binaries 2025-08-06 18:25:02 +02:00
8dcb4b535c
pluginctl: updated to 0.1.3 2025-08-06 17:20:17 +02:00
d9c0215b93
feat: implement comprehensive loop prevention and architecture improvements
- Add comprehensive loop prevention at source level for all bridges:
  - XMPP bridge: Skip messages from own XMPP connection user
  - Mattermost bridge: Skip messages from bot user and remote users
- Remove cache from getOrCreateRemoteUser method for simplified user management
- Improve XMPP client architecture with direct handler delegation:
  - Add SetMessageHandler and GetJID methods to XMPP client
  - Move protocol normalization methods to client level
  - Implement handleIncomingXMPPMessage in XMPP bridge for business logic
- Fix message direction handling in XMPP message handler
- Add remote user invitation to shared channels via InviteRemoteToChannel API
- Clean up unused code and improve code formatting

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-06 17:16:52 +02:00
11a32afc53
refactor: improve XMPP client/bridge architecture separation
- Move protocol normalization methods to XMPP client (ExtractChannelID, ExtractUserInfo, ExtractMessageBody)
- Replace message channel aggregation with direct handler delegation pattern
- XMPP client now focuses purely on protocol concerns (connection, deduplication, normalization)
- XMPP bridge handles business logic (BridgeMessage creation, routing)
- Add SourceRemoteID field to BridgeMessage for better message tracking
- Remove unused message channel infrastructure in favor of mux.MessageHandlerFunc pattern

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-06 12:31:28 +02:00
8e9d87b176
feat: implement remote user creation for Mattermost bridge
Replace bot-based message posting with remote user creation system:

- Add getOrCreateRemoteUser() method to create/retrieve remote users on-demand
- Use plugin API to find existing users by username/email before creating new ones
- Generate usernames with bridge prefix and emails with bridge.{bridgeID} domain
- Set RemoteId field to BridgeMessage.SourceRemoteID for proper loop prevention
- Cache user mappings to avoid repeated API calls
- Post messages directly as remote users instead of bot with metadata
- Remove unused message formatting since messages are posted as actual users
- Log errors for failed user creation without complex retry logic

This enables authentic user attribution in Mattermost channels while maintaining
existing loop prevention mechanisms through the RemoteId field.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-06 12:29:55 +02:00
245f5f96db
chore: register bridge id 2025-08-06 08:36:51 +02:00
35 changed files with 1810 additions and 1156 deletions

View file

@ -11,7 +11,6 @@ linters:
- nakedret
- revive
- staticcheck # Now includes gosimple and stylecheck
- typecheck
- unconvert
- unused
- whitespace

724
ARCHITECTURE.md Normal file
View file

@ -0,0 +1,724 @@
# Mattermost XMPP Bridge Plugin Architecture
## Overview
The Mattermost XMPP Bridge Plugin provides bidirectional message synchronization between Mattermost and XMPP servers through a comprehensive bridge architecture. The plugin is designed with extensibility in mind, supporting multiple bridge types and protocols through a unified interface.
## Core Design Principles
- **Bridge-Agnostic Architecture**: Extensible design supporting future bridges (Slack, Discord, etc.)
- **Bidirectional Communication**: Real-time message synchronization in both directions
- **Message Bus Pattern**: Centralized routing system for decoupled communication
- **Thread-Safe Operations**: Goroutine-safe concurrent message processing
- **Persistent Configuration**: KV store-based persistence for mappings and user data
- **Admin Security**: System administrator access control for all operations
## System Architecture
```mermaid
graph TB
subgraph "Mattermost Server"
MM[Mattermost Core]
SC[Shared Channels]
end
subgraph "Plugin Components"
P[Plugin Main]
BM[Bridge Manager]
MB[Message Bus]
CMD[Command Handler]
KV[KV Store]
LOG[Logger]
CFG[Configuration]
end
subgraph "Bridge Implementations"
XB[XMPP Bridge]
MMB[Mattermost Bridge]
FB[Future Bridges]
end
subgraph "External Systems"
XMPP[XMPP Server]
XC[XMPP Client]
end
%% Plugin lifecycle
MM -->|Activate/Configure| P
P -->|Initialize| BM
P -->|Initialize| CMD
P -->|Initialize| KV
P -->|Initialize| LOG
P -->|Load| CFG
%% Bridge management
BM -->|Register/Manage| XB
BM -->|Register/Manage| MMB
BM -->|Register/Manage| FB
BM -->|Route Messages| MB
%% Message flow
XB -->|Publish| MB
MMB -->|Publish| MB
MB -->|Subscribe| XB
MB -->|Subscribe| MMB
%% XMPP integration
XB -->|Control| XC
XC <-->|XMPP Protocol| XMPP
%% Command processing
MM -->|Slash Commands| CMD
CMD -->|Bridge Operations| BM
%% Storage
BM -->|Persist| KV
XB -->|Store Mappings| KV
MMB -->|Store Mappings| KV
%% Shared channels integration
SC -->|Health Check| P
P -->|Bridge Status| BM
style P fill:#e1f5fe
style BM fill:#f3e5f5
style MB fill:#e8f5e8
style XB fill:#fff3e0
style MMB fill:#fff3e0
```
## Bridge System Architecture
The bridge system implements a pluggable architecture where each bridge type implements a common interface, enabling seamless addition of new protocols.
```mermaid
graph TB
subgraph "Bridge Interface"
BI[Bridge Interface]
BI --> |Lifecycle| LC[Start/Stop/UpdateConfig]
BI --> |Messaging| MSG[GetMessageChannel/SendMessage]
BI --> |Mappings| MAP[Create/Get/DeleteChannelMapping]
BI --> |Health| HEALTH[IsConnected/Ping/ChannelMappingExists]
BI --> |Users| USR[GetUserManager/GetUserResolver]
BI --> |Handlers| HAND[GetMessageHandler]
end
subgraph "XMPP Bridge Implementation"
XB[XMPP Bridge]
XMH[XMPP Message Handler]
XUR[XMPP User Resolver]
XUM[XMPP User Manager]
XC[XMPP Client - mellium.im/xmpp]
end
subgraph "Mattermost Bridge Implementation"
MB[Mattermost Bridge]
MMH[MM Message Handler]
MUR[MM User Resolver]
MUM[MM User Manager]
API[Mattermost API]
end
subgraph "Future Bridge Example"
SB[Slack Bridge]
SH[Slack Handler]
SR[Slack Resolver]
SAPI[Slack API]
end
%% Interface implementation
BI -.->|implements| XB
BI -.->|implements| MB
BI -.->|implements| SB
%% XMPP Bridge components
XB --> XMH
XB --> XUR
XB --> XUM
XB --> XC
%% Mattermost Bridge components
MB --> MMH
MB --> MUR
MB --> MUM
MB --> API
%% Future bridge components
SB --> SH
SB --> SR
SB --> SAPI
%% External connections
XC <--> XMPP_SRV[XMPP Server]
API <--> MM_CORE[Mattermost Core]
SAPI <--> SLACK_API[Slack API]
style BI fill:#e3f2fd
style XB fill:#fff3e0
style MB fill:#e8f5e8
style SB fill:#fce4ec
```
## XEP Extension System Architecture
The XMPP client includes an extensible XEP (XMPP Extension Protocol) framework that dynamically discovers and enables server-supported features.
```mermaid
graph TB
subgraph "XMPP Client"
CLIENT[XMPP Client]
DISCO[Server Discovery]
XEPF[XEP Features Manager]
end
subgraph "XEP Framework"
HANDLER[XEP Handler Interface]
FEATURES[XEP Features Struct]
MUTEX[Thread Safety Mutex]
end
subgraph "XEP-0077 Implementation"
IBR[InBandRegistration]
REG[RegisterAccount]
CHANGE[ChangePassword]
CANCEL[CancelRegistration]
FIELDS[GetRegistrationFields]
end
subgraph "Future XEPs"
XEP_N[XEP-XXXX Future]
IMPL_N[Implementation N]
end
subgraph "XMPP Server"
SRV[XMPP Server]
CAPS[Server Capabilities]
NS[Supported Namespaces]
end
%% Client initialization flow
CLIENT -->|Connect| SRV
CLIENT -->|Discovery Query| DISCO
DISCO -->|disco#info| SRV
SRV -->|Capabilities| CAPS
CAPS -->|Feature List| NS
%% XEP detection and initialization
DISCO -->|Check Support| XEPF
XEPF -->|Supported Features| FEATURES
FEATURES -->|XEP-0077 Available| IBR
FEATURES -->|Future XEPs| XEP_N
%% XEP operations
IBR -->|Account Management| REG
IBR -->|Password Ops| CHANGE
IBR -->|Cleanup| CANCEL
IBR -->|Field Discovery| FIELDS
%% Server interactions
REG <-->|IQ Stanzas| SRV
CHANGE <-->|IQ Stanzas| SRV
CANCEL <-->|IQ Stanzas| SRV
FIELDS <-->|IQ Stanzas| SRV
%% Framework structure
HANDLER -.->|Interface| IBR
HANDLER -.->|Interface| XEP_N
FEATURES -->|Manage| MUTEX
style CLIENT fill:#e3f2fd
style XEPF fill:#f3e5f5
style IBR fill:#fff3e0
style SRV fill:#e8f5e8
```
### XEP Feature Lifecycle
```mermaid
sequenceDiagram
participant C as XMPP Client
participant D as Server Discovery
participant XM as XEP Manager
participant XEP as XEP-0077
participant S as XMPP Server
Note over C,S: Server Capability Detection
C->>+D: Connect & Authenticate
D->>+S: disco#info query
S->>-D: Server capabilities & namespaces
D->>-C: Feature discovery complete
Note over C,S: XEP Initialization (Async)
C->>+XM: detectServerCapabilities()
XM->>XM: Check jabber:iq:register namespace
alt XEP-0077 Supported
XM->>+XEP: NewInBandRegistration()
XEP->>-XM: Initialized XEP instance
XM->>XM: features.InBandRegistration = xep
XM->>C: XEP-0077 enabled
else XEP-0077 Not Supported
XM->>C: XEP-0077 skipped
end
XM->>-C: Capability detection complete
Note over C,S: XEP Usage
C->>+XEP: RegisterAccount(jid, request)
XEP->>+S: IQ set (registration)
S->>-XEP: IQ result/error
XEP->>-C: Registration response
```
## Message Flow Architecture
The message bus provides centralized routing for bidirectional message synchronization with loop prevention and scalable message processing.
```mermaid
sequenceDiagram
participant XS as XMPP Server
participant XC as XMPP Client
participant XB as XMPP Bridge
participant MB as Message Bus
participant MMB as Mattermost Bridge
participant MM as Mattermost Core
Note over XS,MM: Incoming Message Flow (XMPP → Mattermost)
XS->>XC: XMPP Message (MUC)
XC->>XC: Parse & Validate
XC->>XB: DirectionalMessage (Incoming)
XB->>XB: Message Aggregation
XB->>MB: Publish Message
MB->>MB: Route to Subscribers
MB->>MMB: Forward Message
MMB->>MMB: Process & Format
MMB->>MM: Create Post
Note over XS,MM: Outgoing Message Flow (Mattermost → XMPP)
MM->>MMB: Post Created Hook
MMB->>MMB: Convert to BridgeMessage
MMB->>MB: Publish Message
MB->>MB: Route to Subscribers
MB->>XB: Forward Message
XB->>XB: Format for XMPP
XB->>XC: Send Message Request
XC->>XS: XMPP Message (MUC)
Note over XS,MM: Loop Prevention
XB->>XB: Check msg.SourceBridge != "xmpp"
MMB->>MMB: Check msg.SourceBridge != "mattermost"
```
## Data Model & Storage Schema
The plugin uses Mattermost's KV store for persistent data with a structured key schema supporting multiple bridge types and user mappings.
```mermaid
erDiagram
CHANNEL_MAPPINGS {
string key PK "channel_map_{bridge}_{identifier}"
string channel_id "Mattermost Channel ID"
string bridge_channel_id "Bridge Channel ID (JID, etc.)"
string bridge_type "xmpp, mattermost, slack"
}
USER_MAPPINGS {
string xmpp_key PK "xmpp_user_{jid}"
string mm_key PK "mattermost_user_{user_id}"
string xmpp_jid "Full XMPP JID"
string mm_user_id "Mattermost User ID"
string display_name "User Display Name"
}
GHOST_USERS {
string key PK "ghost_user_{mm_user_id}"
string ghost_jid "Generated Ghost JID"
json room_memberships "List of joined rooms"
timestamp last_activity
}
EVENT_MAPPINGS {
string key PK "xmpp_event_post_{event_id}"
string xmpp_event_id "XMPP Message/Event ID"
string mm_post_id "Mattermost Post ID"
string channel_id "Channel where posted"
}
REACTIONS {
string key PK "xmpp_reaction_{reaction_id}"
string post_id "Target Post ID"
string user_id "User who reacted"
string emoji_name "Reaction emoji"
boolean removed "Reaction removed flag"
}
CONFIG_META {
string key PK "kv_store_version"
int version "Schema version for migrations"
}
CHANNEL_MAPPINGS ||--o{ USER_MAPPINGS : "users_in_channel"
CHANNEL_MAPPINGS ||--o{ GHOST_USERS : "ghosts_in_channel"
CHANNEL_MAPPINGS ||--o{ EVENT_MAPPINGS : "events_in_channel"
EVENT_MAPPINGS ||--o{ REACTIONS : "reactions_to_event"
```
## Command Processing Flow
The plugin provides slash commands for channel mapping management with comprehensive error handling and admin-only access control.
```mermaid
flowchart TD
CMD_INPUT["/xmppbridge map room@server.com"]
CMD_INPUT --> PARSE[Parse Command]
PARSE --> AUTH{Admin Check}
AUTH -->|Not Admin| DENY[❌ Admin Required]
AUTH -->|Admin| VALIDATE[Validate Arguments]
VALIDATE --> FORMAT{Valid JID Format?}
FORMAT -->|Invalid| ERR_FORMAT[❌ Invalid JID Format]
FORMAT -->|Valid| BRIDGE_CHECK[Get XMPP Bridge]
BRIDGE_CHECK --> BRIDGE_EXIST{Bridge Available?}
BRIDGE_EXIST -->|No| ERR_BRIDGE[❌ Bridge Not Available]
BRIDGE_EXIST -->|Yes| CONN_CHECK{Bridge Connected?}
CONN_CHECK -->|No| ERR_CONN[❌ Bridge Not Connected]
CONN_CHECK -->|Yes| EXISTING[Check Existing Mapping]
EXISTING --> HAS_MAPPING{Already Mapped?}
HAS_MAPPING -->|Yes| ERR_EXISTS[❌ Channel Already Mapped]
HAS_MAPPING -->|No| CHANNEL_CHECK[Check Channel Exists]
CHANNEL_CHECK --> CHANNEL_EXIST{Channel Exists?}
CHANNEL_EXIST -->|No| ERR_CHANNEL[❌ Channel Not Found]
CHANNEL_EXIST -->|Yes| CREATE_MAPPING[Create Channel Mapping]
CREATE_MAPPING --> STORE[Store in KV Store]
STORE --> JOIN[Join XMPP Room]
JOIN --> SUCCESS[✅ Mapping Created]
%% Error handling
STORE --> STORE_ERR{Storage Error?}
STORE_ERR -->|Yes| ERR_STORE[❌ Storage Failed]
JOIN --> JOIN_ERR{Join Error?}
JOIN_ERR -->|Yes| WARN_JOIN[⚠️ Mapping Created, Join Failed]
style SUCCESS fill:#c8e6c9
style DENY fill:#ffcdd2
style ERR_FORMAT fill:#ffcdd2
style ERR_BRIDGE fill:#ffcdd2
style ERR_CONN fill:#ffcdd2
style ERR_EXISTS fill:#ffcdd2
style ERR_ROOM fill:#ffcdd2
style ERR_STORE fill:#ffcdd2
style WARN_JOIN fill:#fff3e0
```
## Configuration Management
Configuration flows from Mattermost system settings through the plugin to individual bridge instances with validation and hot-reload support.
```mermaid
graph LR
subgraph "Mattermost Admin Console"
ADMIN[System Admin]
UI[Plugin Settings UI]
end
subgraph "Plugin Configuration"
CFG[Configuration Struct]
VALID[Validation Logic]
CHANGE[OnConfigurationChange Hook]
end
subgraph "Bridge Configuration"
BM[Bridge Manager]
XB[XMPP Bridge]
MMB[Mattermost Bridge]
end
subgraph "Runtime Components"
XC[XMPP Client]
TLS[TLS Config]
AUTH[Authentication]
CONN[Connection Pool]
end
%% Configuration flow
ADMIN -->|Update Settings| UI
UI -->|Save| CFG
CFG -->|Validate| VALID
VALID -->|Invalid| ERR[❌ Validation Error]
VALID -->|Valid| CHANGE
%% Propagation to bridges
CHANGE -->|Notify| BM
BM -->|Update| XB
BM -->|Update| MMB
%% XMPP Bridge reconfiguration
XB -->|Reconnect| XC
XB -->|Update| TLS
XB -->|Update| AUTH
XC -->|Manage| CONN
%% Configuration settings
CFG -.->|XMPPServerURL| XC
CFG -.->|XMPPUsername| AUTH
CFG -.->|XMPPPassword| AUTH
CFG -.->|XMPPResource| XC
CFG -.->|EnableSync| XB
CFG -.->|XMPPInsecureSkipVerify| TLS
style CFG fill:#e3f2fd
style VALID fill:#f3e5f5
style ERR fill:#ffcdd2
```
## Core Components
### Plugin Main (`server/plugin.go`)
- **Lifecycle Management**: Handles plugin activation/deactivation
- **Component Initialization**: Sets up all subsystems in proper order
- **Shared Channels Integration**: Registers for distributed health checks
- **Background Jobs**: Manages periodic maintenance tasks
### Bridge Manager (`server/bridge/manager.go`)
- **Bridge Registry**: Manages multiple bridge instances
- **Message Routing**: Coordinates message bus operations
- **Configuration Propagation**: Updates all bridges on config changes
- **Health Monitoring**: Provides bridge status and connectivity checks
### Message Bus (`server/bridge/messagebus.go`)
- **Publisher-Subscriber Pattern**: Decoupled message routing
- **Loop Prevention**: Prevents infinite message cycles
- **Buffer Management**: Configurable message queues (1000 message buffer)
- **Delivery Guarantees**: Timeout-based delivery with fallback handling
### XMPP Client (`server/xmpp/client.go`)
- **mellium.im/xmpp Integration**: Standards-compliant XMPP implementation
- **MUC Support**: Multi-User Chat for group messaging
- **Connection Management**: Automatic reconnection with exponential backoff
- **Message Parsing**: Structured message handling with XML parsing
- **XEP Extension Support**: Extensible protocol implementation framework
- **Server Capability Detection**: Dynamic feature discovery using disco#info
### XEP Features Framework (`server/xmpp/xep_features.go`)
- **Extensible Architecture**: Struct-based XEP management for scalability
- **Thread-Safe Operations**: Concurrent access protection with read-write mutex
- **Dynamic Feature Discovery**: Server capability-based feature initialization
- **Interface-Based Design**: Common XEPHandler interface for all extensions
### XEP-0077 In-Band Registration (`server/xmpp/xep_0077.go`)
- **Account Registration**: Complete user account creation functionality
- **Password Management**: Account password change operations
- **Registration Cancellation**: Account deletion and cleanup
- **Field Discovery**: Dynamic registration field requirement detection
- **Server Compatibility**: Only initializes when server supports the feature
### Bridge Implementations
- **XMPP Bridge** (`server/bridge/xmpp/`): XMPP protocol integration
- **Mattermost Bridge** (`server/bridge/mattermost/`): Internal Mattermost API integration
- **Message Handlers**: Protocol-specific message processing
- **User Resolvers**: User identity mapping and resolution
## Communication Patterns
### Message Flow Patterns
1. **Producer-Consumer**: Bridges produce/consume messages via message bus
2. **Request-Response**: Command processing with immediate feedback
3. **Event-Driven**: Configuration changes trigger bridge updates
4. **Publisher-Subscriber**: Message bus routes to all interested bridges
### Concurrency Patterns
1. **Goroutine Pools**: Per-bridge message processing
2. **Channel-Based Communication**: Non-blocking message queues
3. **Context Cancellation**: Graceful shutdown coordination
4. **WaitGroup Synchronization**: Coordinated cleanup operations
## Data Persistence
### Storage Strategy
- **KV Store Abstraction**: Pluggable storage backend
- **Structured Keys**: Hierarchical key naming for efficient queries
- **Bridge-Agnostic Schema**: Support for multiple bridge types
- **Migration Support**: Versioned schema for backward compatibility
### Key Patterns
```
channel_map_{bridge}_{identifier} → channel_id
{bridge}_user_{external_id} → mattermost_user_id
ghost_user_{mattermost_user_id} → ghost_jid
{bridge}_event_post_{event_id} → post_id
```
## Error Handling & Resilience
### Failure Modes
1. **Connection Failures**: Automatic reconnection with backoff
2. **Message Delivery Failures**: Timeout handling with retry logic
3. **Configuration Errors**: Validation with user-friendly error messages
4. **Storage Failures**: Graceful degradation with logging
### Recovery Strategies
1. **Circuit Breaker Pattern**: Prevent cascade failures
2. **Graceful Degradation**: Continue operation with reduced functionality
3. **Health Checks**: Proactive problem detection
4. **Comprehensive Logging**: Structured logging for debugging
## Performance Considerations
### Scalability Features
1. **Buffered Channels**: 1000-message buffers prevent blocking
2. **Message Aggregation**: Multiple message sources per bridge
3. **Concurrent Processing**: Parallel message handling
4. **Connection Pooling**: Efficient resource utilization
### Optimization Techniques
1. **Lazy Initialization**: Components created on demand
2. **Caching Strategies**: In-memory mapping cache
3. **Batch Operations**: Bulk message processing where possible
4. **Resource Cleanup**: Proper goroutine lifecycle management
## Security Model
### Access Control
- **Admin-Only Commands**: System administrator privilege requirement
- **Bridge Isolation**: Separate security contexts per bridge
- **Credential Management**: Secure storage of authentication data
- **TLS Configuration**: Configurable certificate validation
### Data Protection
- **Secure Storage**: Plugin KV store encryption
- **Message Privacy**: No message content logging
- **User Privacy**: Minimal personal data storage
- **Audit Trail**: Command execution logging
## Testing Strategy
### Unit Testing
- **Component Isolation**: Mock dependencies for focused testing
- **Interface Testing**: Bridge interface compliance verification
- **Message Processing**: End-to-end message flow validation
- **Error Scenarios**: Comprehensive failure mode testing
### Integration Testing
- **XMPP Testcontainers**: Automated XMPP server testing
- **Bridge Integration**: Multi-bridge message routing tests
- **Configuration Testing**: Settings validation and propagation
- **Performance Testing**: Load testing with gotestsum
- **XEP Feature Testing**: Doctor command with comprehensive XEP-0077 validation
- **Live Server Testing**: Real user account creation and deletion tests
## Extension Points
### Adding New Bridges
1. **Implement Bridge Interface**:
```go
type MyBridge struct {
// Bridge-specific fields
}
func (b *MyBridge) Start() error { /* implementation */ }
func (b *MyBridge) GetMessageChannel() <-chan *DirectionalMessage { /* implementation */ }
// ... other Bridge interface methods
```
2. **Create Message Handler**:
```go
type myMessageHandler struct {
bridge *MyBridge
}
func (h *myMessageHandler) ProcessMessage(msg *DirectionalMessage) error {
// Process incoming messages from other bridges
}
```
3. **Register with Bridge Manager**:
```go
myBridge := mybridge.NewBridge(logger, api, kvstore, config)
bridgeManager.RegisterBridge("mybridge", myBridge)
```
### Message Type Extensions
- **Rich Content**: Extend BridgeMessage for files, images, reactions
- **Custom Metadata**: Bridge-specific message properties
- **Format Converters**: Content transformation between protocols
- **Message Filtering**: Content-based routing rules
## Deployment & Operations
### Plugin Lifecycle
1. **Installation**: Plugin bundle deployment to Mattermost
2. **Configuration**: Admin console settings configuration
3. **Activation**: Component initialization and bridge startup
4. **Operation**: Real-time message synchronization
5. **Updates**: Hot configuration reload without restart
6. **Maintenance**: Health monitoring and log analysis
### Monitoring & Observability
- **Health Endpoints**: Bridge connectivity status
- **Structured Logging**: JSON-formatted log entries
- **Metrics Collection**: Message throughput and error rates
- **Alerting**: Failed connection and mapping error notifications
## Development Workflow
### Prerequisites
- Go 1.21+ for server development
- Node.js 18+ for webapp development
- Docker for XMPP test server
- golangci-lint for code quality
### Key Commands
```bash
make # Build plugin bundle
make test # Run all tests
make check-style # Lint and format
make deploy # Deploy to local Mattermost
make devserver_start # Start test XMPP server
```
### XMPP Client Doctor Tool (`cmd/xmpp-client-doctor/`)
The doctor command provides comprehensive XMPP connectivity and feature testing:
```bash
# Basic connectivity test
go run ./cmd/xmpp-client-doctor/main.go
# Test specific features
go run ./cmd/xmpp-client-doctor/main.go --test-xep0077 --test-muc --test-dm
# Custom server testing
go run ./cmd/xmpp-client-doctor/main.go --server example.com:5222 --username user@example.com
```
### Code Organization
```
server/
├── plugin.go # Main plugin entry point
├── bridge/
│ ├── manager.go # Bridge management
│ ├── messagebus.go # Message routing
│ ├── xmpp/ # XMPP bridge implementation
│ └── mattermost/ # Mattermost bridge implementation
├── model/ # Shared interfaces and types
├── config/ # Configuration management
├── command/ # Slash command handling
├── xmpp/ # XMPP client implementation
│ ├── client.go # Core XMPP client with MUC support
│ ├── xep_features.go # XEP extension framework
│ └── xep_0077.go # XEP-0077 In-Band Registration
└── store/ # Data persistence
```
This architecture provides a solid foundation for bidirectional message bridging while maintaining extensibility for future enhancements and additional bridge protocols.

View file

@ -23,9 +23,6 @@ ASSETS_DIR ?= assets
.PHONY: default
default: all
# Verify environment, and define PLUGIN_ID, PLUGIN_VERSION, HAS_SERVER and HAS_WEBAPP as needed.
include build/setup.mk
ifneq ($(MM_DEBUG),)
GO_BUILD_GCFLAGS = -gcflags "all=-N -l"
else

View file

@ -5,9 +5,9 @@ ifeq ($(GO),)
endif
# Gather build variables to inject into the manifest tool
BUILD_HASH_SHORT = $(shell git rev-parse --short HEAD)
BUILD_HASH_SHORT = $(shell git rev-parse --short HEAD 2>/dev/null)
BUILD_TAG_LATEST = $(shell git describe --tags --match 'v*' --abbrev=0 2>/dev/null)
BUILD_TAG_CURRENT = $(shell git tag --points-at HEAD)
BUILD_TAG_CURRENT = $(shell git tag --points-at HEAD 2>/dev/null)
# Extract the plugin id from the manifest.
PLUGIN_ID ?= $(shell pluginctl manifest get '{{.Id}}')

View file

@ -11,9 +11,16 @@ all: check-style test dist
manifest-check:
pluginctl manifest check
## Cleans the server build artifacts.
.PHONY: clean-server
clean-server:
ifneq ($(HAS_SERVER),)
rm -rf server/dist
endif
## Builds the server, if it exists, for all supported architectures, unless MM_SERVICESETTINGS_ENABLEDEVELOPER is set.
.PHONY: server
server: clean-server
server:
ifneq ($(HAS_SERVER),)
ifneq ($(MM_DEBUG),)

View file

@ -1,216 +0,0 @@
package main
import (
"encoding/json"
"fmt"
"os"
"strings"
"github.com/mattermost/mattermost/server/public/model"
"github.com/pkg/errors"
)
const pluginIDGoFileTemplate = `// This file is automatically generated. Do not modify it manually.
package main
import (
"encoding/json"
"strings"
"github.com/mattermost/mattermost/server/public/model"
)
var manifest *model.Manifest
const manifestStr = ` + "`" + `
%s
` + "`" + `
func init() {
_ = json.NewDecoder(strings.NewReader(manifestStr)).Decode(&manifest)
}
`
const pluginIDJSFileTemplate = `// This file is automatically generated. Do not modify it manually.
const manifest = JSON.parse(` + "`" + `
%s
` + "`" + `);
export default manifest;
`
// These build-time vars are read from shell commands and populated in ../setup.mk
var (
BuildHashShort string
BuildTagLatest string
BuildTagCurrent string
)
func main() {
if len(os.Args) <= 1 {
panic("no cmd specified")
}
manifest, err := findManifest()
if err != nil {
panic("failed to find manifest: " + err.Error())
}
cmd := os.Args[1]
switch cmd {
case "id":
dumpPluginID(manifest)
case "version":
dumpPluginVersion(manifest)
case "has_server":
if manifest.HasServer() {
fmt.Printf("true")
}
case "has_webapp":
if manifest.HasWebapp() {
fmt.Printf("true")
}
case "apply":
if err := applyManifest(manifest); err != nil {
panic("failed to apply manifest: " + err.Error())
}
case "dist":
if err := distManifest(manifest); err != nil {
panic("failed to write manifest to dist directory: " + err.Error())
}
case "check":
if err := manifest.IsValid(); err != nil {
panic("failed to check manifest: " + err.Error())
}
default:
panic("unrecognized command: " + cmd)
}
}
func findManifest() (*model.Manifest, error) {
_, manifestFilePath, err := model.FindManifest(".")
if err != nil {
return nil, errors.Wrap(err, "failed to find manifest in current working directory")
}
manifestFile, err := os.Open(manifestFilePath)
if err != nil {
return nil, errors.Wrapf(err, "failed to open %s", manifestFilePath)
}
defer manifestFile.Close()
// Re-decode the manifest, disallowing unknown fields. When we write the manifest back out,
// we don't want to accidentally clobber anything we won't preserve.
var manifest model.Manifest
decoder := json.NewDecoder(manifestFile)
decoder.DisallowUnknownFields()
if err = decoder.Decode(&manifest); err != nil {
return nil, errors.Wrap(err, "failed to parse manifest")
}
// If no version is listed in the manifest, generate one based on the state of the current
// commit, and use the first version we find (to prevent causing errors)
if manifest.Version == "" {
var version string
tags := strings.Fields(BuildTagCurrent)
for _, t := range tags {
if strings.HasPrefix(t, "v") {
version = t
break
}
}
if version == "" {
if BuildTagLatest != "" {
version = BuildTagLatest + "+" + BuildHashShort
} else {
version = "v0.0.0+" + BuildHashShort
}
}
manifest.Version = strings.TrimPrefix(version, "v")
}
// If no release notes specified, generate one from the latest tag, if present.
if manifest.ReleaseNotesURL == "" && BuildTagLatest != "" {
manifest.ReleaseNotesURL = manifest.HomepageURL + "releases/tag/" + BuildTagLatest
}
return &manifest, nil
}
// dumpPluginId writes the plugin id from the given manifest to standard out
func dumpPluginID(manifest *model.Manifest) {
fmt.Printf("%s", manifest.Id)
}
// dumpPluginVersion writes the plugin version from the given manifest to standard out
func dumpPluginVersion(manifest *model.Manifest) {
fmt.Printf("%s", manifest.Version)
}
// applyManifest propagates the plugin_id into the server and webapp folders, as necessary
func applyManifest(manifest *model.Manifest) error {
if manifest.HasServer() {
// generate JSON representation of Manifest.
manifestBytes, err := json.MarshalIndent(manifest, "", " ")
if err != nil {
return err
}
manifestStr := string(manifestBytes)
// write generated code to file by using Go file template.
if err := os.WriteFile(
"server/manifest.go",
[]byte(fmt.Sprintf(pluginIDGoFileTemplate, manifestStr)),
0600,
); err != nil {
return errors.Wrap(err, "failed to write server/manifest.go")
}
}
if manifest.HasWebapp() {
// generate JSON representation of Manifest.
// JSON is very similar and compatible with JS's object literals. so, what we do here
// is actually JS code generation.
manifestBytes, err := json.MarshalIndent(manifest, "", " ")
if err != nil {
return err
}
manifestStr := string(manifestBytes)
// Escape newlines
manifestStr = strings.ReplaceAll(manifestStr, `\n`, `\\n`)
// write generated code to file by using JS file template.
if err := os.WriteFile(
"webapp/src/manifest.ts",
[]byte(fmt.Sprintf(pluginIDJSFileTemplate, manifestStr)),
0600,
); err != nil {
return errors.Wrap(err, "failed to open webapp/src/manifest.ts")
}
}
return nil
}
// distManifest writes the manifest file to the dist directory
func distManifest(manifest *model.Manifest) error {
manifestBytes, err := json.MarshalIndent(manifest, "", " ")
if err != nil {
return err
}
if err := os.WriteFile(fmt.Sprintf("dist/%s/plugin.json", manifest.Id), manifestBytes, 0600); err != nil {
return errors.Wrap(err, "failed to write plugin.json")
}
return nil
}

View file

@ -1,185 +0,0 @@
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"slices"
"strings"
"time"
"github.com/mattermost/mattermost/server/public/model"
)
const (
logsPerPage = 100 // logsPerPage is the number of log entries to fetch per API call
timeStampFormat = "2006-01-02 15:04:05.000 Z07:00"
)
// logs fetches the latest 500 log entries from Mattermost,
// and prints only the ones related to the plugin to stdout.
func logs(ctx context.Context, client *model.Client4, pluginID string) error {
err := checkJSONLogsSetting(ctx, client)
if err != nil {
return err
}
logs, err := fetchLogs(ctx, client, 0, 500, pluginID, time.Unix(0, 0))
if err != nil {
return fmt.Errorf("failed to fetch log entries: %w", err)
}
err = printLogEntries(logs)
if err != nil {
return fmt.Errorf("failed to print logs entries: %w", err)
}
return nil
}
// watchLogs fetches log entries from Mattermost and print them to stdout.
// It will return without an error when ctx is canceled.
func watchLogs(ctx context.Context, client *model.Client4, pluginID string) error {
err := checkJSONLogsSetting(ctx, client)
if err != nil {
return err
}
now := time.Now()
var oldestEntry string
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
var page int
for {
logs, err := fetchLogs(ctx, client, page, logsPerPage, pluginID, now)
if err != nil {
return fmt.Errorf("failed to fetch log entries: %w", err)
}
var allNew bool
logs, oldestEntry, allNew = checkOldestEntry(logs, oldestEntry)
err = printLogEntries(logs)
if err != nil {
return fmt.Errorf("failed to print logs entries: %w", err)
}
if !allNew {
// No more logs to fetch
break
}
page++
}
}
}
}
// checkOldestEntry check a if logs contains new log entries.
// It returns the filtered slice of log entries, the new oldest entry and whether or not all entries were new.
func checkOldestEntry(logs []string, oldest string) ([]string, string, bool) {
if len(logs) == 0 {
return nil, oldest, false
}
newOldestEntry := logs[(len(logs) - 1)]
i := slices.Index(logs, oldest)
switch i {
case -1:
// Every log entry is new
return logs, newOldestEntry, true
case len(logs) - 1:
// No new log entries
return nil, oldest, false
default:
// Filter out oldest log entry
return logs[i+1:], newOldestEntry, false
}
}
// fetchLogs fetches log entries from Mattermost
// and filters them based on pluginID and timestamp.
func fetchLogs(ctx context.Context, client *model.Client4, page, perPage int, pluginID string, since time.Time) ([]string, error) {
logs, _, err := client.GetLogs(ctx, page, perPage)
if err != nil {
return nil, fmt.Errorf("failed to get logs from Mattermost: %w", err)
}
logs, err = filterLogEntries(logs, pluginID, since)
if err != nil {
return nil, fmt.Errorf("failed to filter log entries: %w", err)
}
return logs, nil
}
// filterLogEntries filters a given slice of log entries by pluginID.
// It also filters out any entries which timestamps are older then since.
func filterLogEntries(logs []string, pluginID string, since time.Time) ([]string, error) {
type logEntry struct {
PluginID string `json:"plugin_id"`
Timestamp string `json:"timestamp"`
}
var ret []string
for _, e := range logs {
var le logEntry
err := json.Unmarshal([]byte(e), &le)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal log entry into JSON: %w", err)
}
if le.PluginID != pluginID {
continue
}
let, err := time.Parse(timeStampFormat, le.Timestamp)
if err != nil {
return nil, fmt.Errorf("unknown timestamp format: %w", err)
}
if let.Before(since) {
continue
}
// Log entries returned by the API have a newline a prefix.
// Remove that to make printing consistent.
e = strings.TrimPrefix(e, "\n")
ret = append(ret, e)
}
return ret, nil
}
// printLogEntries prints a slice of log entries to stdout.
func printLogEntries(entries []string) error {
for _, e := range entries {
_, err := io.WriteString(os.Stdout, e+"\n")
if err != nil {
return fmt.Errorf("failed to write log entry to stdout: %w", err)
}
}
return nil
}
func checkJSONLogsSetting(ctx context.Context, client *model.Client4) error {
cfg, _, err := client.GetConfig(ctx)
if err != nil {
return fmt.Errorf("failed to fetch config: %w", err)
}
if cfg.LogSettings.FileJson == nil || !*cfg.LogSettings.FileJson {
return errors.New("JSON output for file logs are disabled. Please enable LogSettings.FileJson via the configration in Mattermost.") //nolint:revive,stylecheck
}
return nil
}

View file

@ -1,202 +0,0 @@
package main
import (
"fmt"
"testing"
"time"
)
func TestCheckOldestEntry(t *testing.T) {
for name, tc := range map[string]struct {
logs []string
oldest string
expectedLogs []string
expectedOldest string
expectedAllNew bool
}{
"nil logs": {
logs: nil,
oldest: "oldest",
expectedLogs: nil,
expectedOldest: "oldest",
expectedAllNew: false,
},
"empty logs": {
logs: []string{},
oldest: "oldest",
expectedLogs: nil,
expectedOldest: "oldest",
expectedAllNew: false,
},
"no new entries, one old entry": {
logs: []string{"old"},
oldest: "old",
expectedLogs: []string{},
expectedOldest: "old",
expectedAllNew: false,
},
"no new entries, multipile old entries": {
logs: []string{"old1", "old2", "old3"},
oldest: "old3",
expectedLogs: []string{},
expectedOldest: "old3",
expectedAllNew: false,
},
"one new entry, no old entry": {
logs: []string{"new"},
oldest: "old",
expectedLogs: []string{"new"},
expectedOldest: "new",
expectedAllNew: true,
},
"multipile new entries, no old entry": {
logs: []string{"new1", "new2", "new3"},
oldest: "old",
expectedLogs: []string{"new1", "new2", "new3"},
expectedOldest: "new3",
expectedAllNew: true,
},
"one new entry, one old entry": {
logs: []string{"old", "new"},
oldest: "old",
expectedLogs: []string{"new"},
expectedOldest: "new",
expectedAllNew: false,
},
"one new entry, multipile old entries": {
logs: []string{"old1", "old2", "old3", "new"},
oldest: "old3",
expectedLogs: []string{"new"},
expectedOldest: "new",
expectedAllNew: false,
},
"multipile new entries, ultipile old entries": {
logs: []string{"old1", "old2", "old3", "new1", "new2", "new3"},
oldest: "old3",
expectedLogs: []string{"new1", "new2", "new3"},
expectedOldest: "new3",
expectedAllNew: false,
},
} {
t.Run(name, func(t *testing.T) {
logs, oldest, allNew := checkOldestEntry(tc.logs, tc.oldest)
if allNew != tc.expectedAllNew {
t.Logf("expected allNew: %v, got %v", tc.expectedAllNew, allNew)
t.Fail()
}
if oldest != tc.expectedOldest {
t.Logf("expected oldest: %v, got %v", tc.expectedOldest, oldest)
t.Fail()
}
compareSlice(t, tc.expectedLogs, logs)
})
}
}
func TestFilterLogEntries(t *testing.T) {
now := time.Now()
for name, tc := range map[string]struct {
logs []string
pluginID string
since time.Time
expectedLogs []string
expectedErr bool
}{
"nil slice": {
logs: nil,
expectedLogs: nil,
expectedErr: false,
},
"empty slice": {
logs: []string{},
expectedLogs: nil,
expectedErr: false,
},
"no JSON": {
logs: []string{
`{"foo"`,
},
expectedLogs: nil,
expectedErr: true,
},
"unknown time format": {
logs: []string{
`{"message":"foo", "plugin_id": "some.plugin.id", "timestamp": "2023-12-18 10:58:53"}`,
},
pluginID: "some.plugin.id",
expectedLogs: nil,
expectedErr: true,
},
"one matching entry": {
logs: []string{
`{"message":"foo", "plugin_id": "some.plugin.id", "timestamp": "2023-12-18 10:58:53.091 +01:00"}`,
},
pluginID: "some.plugin.id",
expectedLogs: []string{
`{"message":"foo", "plugin_id": "some.plugin.id", "timestamp": "2023-12-18 10:58:53.091 +01:00"}`,
},
expectedErr: false,
},
"filter out non plugin entries": {
logs: []string{
`{"message":"bar1", "timestamp": "2023-12-18 10:58:52.091 +01:00"}`,
`{"message":"foo", "plugin_id": "some.plugin.id", "timestamp": "2023-12-18 10:58:53.091 +01:00"}`,
`{"message":"bar2", "timestamp": "2023-12-18 10:58:54.091 +01:00"}`,
},
pluginID: "some.plugin.id",
expectedLogs: []string{
`{"message":"foo", "plugin_id": "some.plugin.id", "timestamp": "2023-12-18 10:58:53.091 +01:00"}`,
},
expectedErr: false,
},
"filter out old entries": {
logs: []string{
fmt.Sprintf(`{"message":"old2", "plugin_id": "some.plugin.id", "timestamp": "%s"}`, now.Add(-2*time.Second).Format(timeStampFormat)),
fmt.Sprintf(`{"message":"old1", "plugin_id": "some.plugin.id", "timestamp": "%s"}`, now.Add(-1*time.Second).Format(timeStampFormat)),
fmt.Sprintf(`{"message":"now", "plugin_id": "some.plugin.id", "timestamp": "%s"}`, now.Format(timeStampFormat)),
fmt.Sprintf(`{"message":"new1", "plugin_id": "some.plugin.id", "timestamp": "%s"}`, now.Add(1*time.Second).Format(timeStampFormat)),
fmt.Sprintf(`{"message":"new2", "plugin_id": "some.plugin.id", "timestamp": "%s"}`, now.Add(2*time.Second).Format(timeStampFormat)),
},
pluginID: "some.plugin.id",
since: now,
expectedLogs: []string{
fmt.Sprintf(`{"message":"new1", "plugin_id": "some.plugin.id", "timestamp": "%s"}`, now.Add(1*time.Second).Format(timeStampFormat)),
fmt.Sprintf(`{"message":"new2", "plugin_id": "some.plugin.id", "timestamp": "%s"}`, now.Add(2*time.Second).Format(timeStampFormat)),
},
expectedErr: false,
},
} {
t.Run(name, func(t *testing.T) {
logs, err := filterLogEntries(tc.logs, tc.pluginID, tc.since)
if tc.expectedErr {
if err == nil {
t.Logf("expected error, got nil")
t.Fail()
}
} else {
if err != nil {
t.Logf("expected no error, got %v", err)
t.Fail()
}
}
compareSlice(t, tc.expectedLogs, logs)
})
}
}
func compareSlice[S ~[]E, E comparable](t *testing.T, expected, got S) {
if len(expected) != len(got) {
t.Logf("expected len: %v, got %v", len(expected), len(got))
t.FailNow()
}
for i := 0; i < len(expected); i++ {
if expected[i] != got[i] {
t.Logf("expected [%d]: %v, got %v", i, expected[i], got[i])
t.Fail()
}
}
}

View file

@ -1,184 +0,0 @@
// main handles deployment of the plugin to a development server using the Client4 API.
package main
import (
"context"
"errors"
"fmt"
"log"
"net"
"os"
"time"
"github.com/mattermost/mattermost/server/public/model"
)
const commandTimeout = 120 * time.Second
const helpText = `
Usage:
pluginctl deploy <plugin id> <bundle path>
pluginctl disable <plugin id>
pluginctl enable <plugin id>
pluginctl reset <plugin id>
`
func main() {
err := pluginctl()
if err != nil {
fmt.Printf("Failed: %s\n", err.Error())
fmt.Print(helpText)
os.Exit(1)
}
}
func pluginctl() error {
if len(os.Args) < 3 {
return errors.New("invalid number of arguments")
}
ctx, cancel := context.WithTimeout(context.Background(), commandTimeout)
defer cancel()
client, err := getClient(ctx)
if err != nil {
return err
}
switch os.Args[1] {
case "deploy":
if len(os.Args) < 4 {
return errors.New("invalid number of arguments")
}
return deploy(ctx, client, os.Args[2], os.Args[3])
case "disable":
return disablePlugin(ctx, client, os.Args[2])
case "enable":
return enablePlugin(ctx, client, os.Args[2])
case "reset":
return resetPlugin(ctx, client, os.Args[2])
case "logs":
return logs(ctx, client, os.Args[2])
case "logs-watch":
return watchLogs(context.WithoutCancel(ctx), client, os.Args[2]) // Keep watching forever
default:
return errors.New("invalid second argument")
}
}
func getClient(ctx context.Context) (*model.Client4, error) {
socketPath := os.Getenv("MM_LOCALSOCKETPATH")
if socketPath == "" {
socketPath = model.LocalModeSocketPath
}
client, connected := getUnixClient(socketPath)
if connected {
log.Printf("Connecting using local mode over %s", socketPath)
return client, nil
}
if os.Getenv("MM_LOCALSOCKETPATH") != "" {
log.Printf("No socket found at %s for local mode deployment. Attempting to authenticate with credentials.", socketPath)
}
siteURL := os.Getenv("MM_SERVICESETTINGS_SITEURL")
adminToken := os.Getenv("MM_ADMIN_TOKEN")
adminUsername := os.Getenv("MM_ADMIN_USERNAME")
adminPassword := os.Getenv("MM_ADMIN_PASSWORD")
if siteURL == "" {
return nil, errors.New("MM_SERVICESETTINGS_SITEURL is not set")
}
client = model.NewAPIv4Client(siteURL)
if adminToken != "" {
log.Printf("Authenticating using token against %s.", siteURL)
client.SetToken(adminToken)
return client, nil
}
if adminUsername != "" && adminPassword != "" {
client := model.NewAPIv4Client(siteURL)
log.Printf("Authenticating as %s against %s.", adminUsername, siteURL)
_, _, err := client.Login(ctx, adminUsername, adminPassword)
if err != nil {
return nil, fmt.Errorf("failed to login as %s: %w", adminUsername, err)
}
return client, nil
}
return nil, errors.New("one of MM_ADMIN_TOKEN or MM_ADMIN_USERNAME/MM_ADMIN_PASSWORD must be defined")
}
func getUnixClient(socketPath string) (*model.Client4, bool) {
_, err := net.Dial("unix", socketPath)
if err != nil {
return nil, false
}
return model.NewAPIv4SocketClient(socketPath), true
}
// deploy attempts to upload and enable a plugin via the Client4 API.
// It will fail if plugin uploads are disabled.
func deploy(ctx context.Context, client *model.Client4, pluginID, bundlePath string) error {
pluginBundle, err := os.Open(bundlePath)
if err != nil {
return fmt.Errorf("failed to open %s: %w", bundlePath, err)
}
defer pluginBundle.Close()
log.Print("Uploading plugin via API.")
_, _, err = client.UploadPluginForced(ctx, pluginBundle)
if err != nil {
return fmt.Errorf("failed to upload plugin bundle: %s", err.Error())
}
log.Print("Enabling plugin.")
_, err = client.EnablePlugin(ctx, pluginID)
if err != nil {
return fmt.Errorf("failed to enable plugin: %s", err.Error())
}
return nil
}
// disablePlugin attempts to disable the plugin via the Client4 API.
func disablePlugin(ctx context.Context, client *model.Client4, pluginID string) error {
log.Print("Disabling plugin.")
_, err := client.DisablePlugin(ctx, pluginID)
if err != nil {
return fmt.Errorf("failed to disable plugin: %w", err)
}
return nil
}
// enablePlugin attempts to enable the plugin via the Client4 API.
func enablePlugin(ctx context.Context, client *model.Client4, pluginID string) error {
log.Print("Enabling plugin.")
_, err := client.EnablePlugin(ctx, pluginID)
if err != nil {
return fmt.Errorf("failed to enable plugin: %w", err)
}
return nil
}
// resetPlugin attempts to reset the plugin via the Client4 API.
func resetPlugin(ctx context.Context, client *model.Client4, pluginID string) error {
err := disablePlugin(ctx, client, pluginID)
if err != nil {
return err
}
err = enablePlugin(ctx, client, pluginID)
if err != nil {
return err
}
return nil
}

View file

@ -1,50 +0,0 @@
# Ensure that go is installed. Note that this is independent of whether or not a server is being
# built, since the build script itself uses go.
ifeq ($(GO),)
$(error "go is not available: see https://golang.org/doc/install")
endif
# Gather build variables to inject into the manifest tool
BUILD_HASH_SHORT = $(shell git rev-parse --short HEAD)
BUILD_TAG_LATEST = $(shell git describe --tags --match 'v*' --abbrev=0 2>/dev/null)
BUILD_TAG_CURRENT = $(shell git tag --points-at HEAD)
# Ensure that the build tools are compiled. Go's caching makes this quick.
$(shell cd build/manifest && $(GO) build -ldflags '-X "main.BuildHashShort=$(BUILD_HASH_SHORT)" -X "main.BuildTagLatest=$(BUILD_TAG_LATEST)" -X "main.BuildTagCurrent=$(BUILD_TAG_CURRENT)"' -o ../bin/manifest)
# Ensure that the deployment tools are compiled. Go's caching makes this quick.
$(shell cd build/pluginctl && $(GO) build -o ../bin/pluginctl)
# Extract the plugin id from the manifest.
PLUGIN_ID ?= $(shell build/bin/manifest id)
ifeq ($(PLUGIN_ID),)
$(error "Cannot parse id from $(MANIFEST_FILE)")
endif
# Extract the plugin version from the manifest.
PLUGIN_VERSION ?= $(shell build/bin/manifest version)
ifeq ($(PLUGIN_VERSION),)
$(error "Cannot parse version from $(MANIFEST_FILE)")
endif
# Determine if a server is defined in the manifest.
HAS_SERVER ?= $(shell build/bin/manifest has_server)
# Determine if a webapp is defined in the manifest.
HAS_WEBAPP ?= $(shell build/bin/manifest has_webapp)
# Determine if a /public folder is in use
HAS_PUBLIC ?= $(wildcard public/.)
# Determine if the mattermost-utilities repo is present
HAS_MM_UTILITIES ?= $(wildcard $(MM_UTILITIES_DIR)/.)
# Store the current path for later use
PWD ?= $(shell pwd)
# Ensure that npm (and thus node) is installed.
ifneq ($(HAS_WEBAPP),)
ifeq ($(NPM),)
$(error "npm is not available: see https://www.npmjs.com/get-npm")
endif
endif

View file

@ -29,6 +29,7 @@ type Config struct {
TestMUC bool
TestDirectMessage bool
TestRoomExists bool
TestXEP0077 bool
Verbose bool
InsecureSkipVerify bool
}
@ -45,6 +46,7 @@ func main() {
flag.BoolVar(&config.TestMUC, "test-muc", true, "Enable MUC room testing (join/wait/leave)")
flag.BoolVar(&config.TestDirectMessage, "test-dm", true, "Enable direct message testing (send message to admin user)")
flag.BoolVar(&config.TestRoomExists, "test-room-exists", true, "Enable room existence testing using disco#info")
flag.BoolVar(&config.TestXEP0077, "test-xep0077", true, "Enable XEP-0077 In-Band Registration testing (required if enabled)")
flag.BoolVar(&config.Verbose, "verbose", true, "Enable verbose logging")
flag.BoolVar(&config.InsecureSkipVerify, "insecure-skip-verify", true, "Skip TLS certificate verification (for development)")
@ -86,6 +88,9 @@ func main() {
if config.TestRoomExists {
log.Printf(" Test Room Existence: enabled")
}
if config.TestXEP0077 {
log.Printf(" Test XEP-0077 In-Band Registration: enabled")
}
}
// Test the XMPP client
@ -97,6 +102,9 @@ func main() {
log.Printf("✅ XMPP client test completed successfully!")
} else {
fmt.Println("✅ XMPP client connectivity test passed!")
if config.TestXEP0077 {
fmt.Println("✅ XMPP XEP-0077 In-Band Registration test passed!")
}
if config.TestMUC {
fmt.Println("✅ XMPP MUC operations test passed!")
}
@ -124,7 +132,7 @@ func testXMPPClient(config *Config) error {
log.Printf("Using insecure TLS configuration (skipping certificate verification)")
}
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
InsecureSkipVerify: true, //nolint:gosec // This is a testing tool for development environments
}
client = xmpp.NewClientWithTLS(
config.Server,
@ -175,10 +183,21 @@ func testXMPPClient(config *Config) error {
log.Printf("✅ Connection health test passed in %v", pingDuration)
}
var xep0077Duration time.Duration
var mucDuration time.Duration
var dmDuration time.Duration
var roomExistsDuration time.Duration
// Test XEP-0077 In-Band Registration if requested
if config.TestXEP0077 {
start = time.Now()
err = testXEP0077(client, config)
if err != nil {
return fmt.Errorf("XEP-0077 In-Band Registration test failed: %w", err)
}
xep0077Duration = time.Since(start)
}
// Test MUC operations if requested
if config.TestMUC {
start = time.Now()
@ -226,6 +245,9 @@ func testXMPPClient(config *Config) error {
log.Printf("Connection summary:")
log.Printf(" Connect time: %v", connectDuration)
log.Printf(" Ping time: %v", pingDuration)
if config.TestXEP0077 {
log.Printf(" XEP-0077 test time: %v", xep0077Duration)
}
if config.TestMUC {
log.Printf(" MUC operations time: %v", mucDuration)
}
@ -237,6 +259,9 @@ func testXMPPClient(config *Config) error {
}
log.Printf(" Disconnect time: %v", disconnectDuration)
totalTime := connectDuration + pingDuration + disconnectDuration
if config.TestXEP0077 {
totalTime += xep0077Duration
}
if config.TestMUC {
totalTime += mucDuration
}
@ -302,7 +327,7 @@ func testMUCOperations(client *xmpp.Client, config *Config) error {
}
start = time.Now()
_, err = client.SendMessage(messageReq)
_, err = client.SendMessage(&messageReq)
if err != nil {
return fmt.Errorf("failed to send test message to room %s: %w", config.TestRoom, err)
}
@ -448,3 +473,120 @@ func (l *SimpleLogger) LogWarn(msg string, args ...interface{}) {
func (l *SimpleLogger) LogError(msg string, args ...interface{}) {
log.Printf("[ERROR] "+msg, args...)
}
// testXEP0077 tests XEP-0077 In-Band Registration functionality by creating and deleting a test user
func testXEP0077(client *xmpp.Client, config *Config) error {
if config.Verbose {
log.Printf("Testing XEP-0077 In-Band Registration functionality...")
}
// First, wait for server capability detection to complete
// This is handled asynchronously in the client Connect method
time.Sleep(2 * time.Second)
// Check if server supports XEP-0077
inBandReg, err := client.GetInBandRegistration()
if err != nil {
return fmt.Errorf("server does not support XEP-0077 In-Band Registration: %w", err)
}
if !inBandReg.IsEnabled() {
return fmt.Errorf("XEP-0077 In-Band Registration is not enabled on this server")
}
if config.Verbose {
log.Printf("✅ Server supports XEP-0077 In-Band Registration")
}
serverJID := client.GetJID().Domain()
// Step 1: Test registration fields discovery
start := time.Now()
if config.Verbose {
log.Printf("Testing registration fields discovery for server: %s", serverJID.String())
}
fields, err := inBandReg.GetRegistrationFields(serverJID)
if err != nil {
return fmt.Errorf("failed to get registration fields from server: %w", err)
}
fieldsDuration := time.Since(start)
if config.Verbose {
log.Printf("✅ Registration fields discovery completed in %v", fieldsDuration)
log.Printf("Registration fields: required=%v, available=%d", fields.Required, len(fields.Fields))
}
// Step 2: Create test user
testUsername := fmt.Sprintf("xmpptest%d", time.Now().Unix())
testPassword := "testpass123"
testEmail := fmt.Sprintf("%s@localhost", testUsername)
if config.Verbose {
log.Printf("Creating test user: %s", testUsername)
}
registrationRequest := &xmpp.RegistrationRequest{
Username: testUsername,
Password: testPassword,
Email: testEmail,
}
start = time.Now()
regResponse, err := inBandReg.RegisterAccount(serverJID, registrationRequest)
if err != nil {
return fmt.Errorf("failed to register test user '%s': %w", testUsername, err)
}
registerDuration := time.Since(start)
if !regResponse.Success {
return fmt.Errorf("user registration failed: %s", regResponse.Error)
}
if config.Verbose {
log.Printf("✅ Test user '%s' registered successfully in %v", testUsername, registerDuration)
log.Printf("Registration response: %s", regResponse.Message)
}
// Step 3: Delete test user (cleanup)
if config.Verbose {
log.Printf("Cleaning up: removing test user '%s'", testUsername)
}
start = time.Now()
cancelResponse, err := inBandReg.CancelRegistration(serverJID)
if err != nil {
if config.Verbose {
log.Printf("⚠️ Failed to remove test user '%s': %v", testUsername, err)
log.Printf("⚠️ Manual cleanup may be required")
}
} else {
cancelDuration := time.Since(start)
if cancelResponse.Success {
if config.Verbose {
log.Printf("✅ Test user '%s' removed successfully in %v", testUsername, cancelDuration)
}
} else {
if config.Verbose {
log.Printf("⚠️ User removal may have failed: %s", cancelResponse.Error)
}
}
}
if config.Verbose {
log.Printf("XEP-0077 test summary:")
log.Printf(" Server support check: ✅")
log.Printf(" Registration fields discovery time: %v", fieldsDuration)
log.Printf(" User registration time: %v", registerDuration)
log.Printf(" Test username: %s", testUsername)
log.Printf(" Required fields count: %d", len(fields.Required))
log.Printf(" User creation: ✅")
if err == nil && cancelResponse.Success {
log.Printf(" User cleanup: ✅")
} else {
log.Printf(" User cleanup: ⚠️")
}
}
return nil
}

4
go.mod
View file

@ -4,10 +4,12 @@ go 1.24.3
require (
github.com/gorilla/mux v1.8.1
github.com/jellydator/ttlcache/v3 v3.4.0
github.com/mattermost/mattermost/server/public v0.1.10
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.10.0
mellium.im/sasl v0.3.2
mellium.im/xmlstream v0.15.4
mellium.im/xmpp v0.22.0
)
@ -102,7 +104,6 @@ require (
github.com/hashicorp/yamux v0.1.2 // indirect
github.com/hexops/gotextdiff v1.0.3 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.4.0 // indirect
github.com/jgautheron/goconst v1.7.1 // indirect
github.com/jingyugao/rowserrcheck v1.1.1 // indirect
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
@ -226,7 +227,6 @@ require (
gotest.tools/gotestsum v1.7.0 // indirect
honnef.co/go/tools v0.5.1 // indirect
mellium.im/reader v0.1.0 // indirect
mellium.im/xmlstream v0.15.4 // indirect
mvdan.cc/gofumpt v0.7.0 // indirect
mvdan.cc/unparam v0.0.0-20240528143540-8a5130ca722f // indirect
)

6
go.sum
View file

@ -733,8 +733,8 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
@ -867,8 +867,6 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View file

@ -5,7 +5,7 @@
"homepage_url": "https://github.com/mattermost/mattermost-plugin-bridge-xmpp",
"support_url": "https://github.com/mattermost/mattermost-plugin-bridge-xmpp/issues",
"icon_path": "assets/logo.png",
"version": "",
"version": "0.1.0",
"min_server_version": "9.5.0",
"server": {
"executables": {
@ -99,7 +99,7 @@
},
"props": {
"pluginctl": {
"version": "v0.1.2"
"version": "v0.1.4"
}
}
}

View file

@ -5,14 +5,17 @@ import (
"fmt"
"sync"
mmModel "github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
mmModel "github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin"
)
// BridgeManager manages multiple bridge instances
//
//nolint:revive // BridgeManager is clearer than Manager in this context
type BridgeManager struct {
bridges map[string]model.Bridge
mu sync.RWMutex
@ -26,22 +29,15 @@ type BridgeManager struct {
}
// NewBridgeManager creates a new bridge manager
func NewBridgeManager(logger logger.Logger, api plugin.API, remoteID string) model.BridgeManager {
if logger == nil {
panic("logger cannot be nil")
}
if api == nil {
panic("plugin API cannot be nil")
}
func NewBridgeManager(log logger.Logger, api plugin.API, remoteID string) model.BridgeManager {
ctx, cancel := context.WithCancel(context.Background())
return &BridgeManager{
bridges: make(map[string]model.Bridge),
logger: logger,
logger: log,
api: api,
remoteID: remoteID,
messageBus: NewMessageBus(logger),
messageBus: NewMessageBus(log),
routingCtx: ctx,
routingCancel: cancel,
}
@ -64,7 +60,7 @@ func (m *BridgeManager) RegisterBridge(name string, bridge model.Bridge) error {
}
m.bridges[name] = bridge
m.logger.LogInfo("Bridge registered", "name", name)
m.logger.LogInfo("Bridge registered", "bridge_id", name)
// Subscribe bridge to message bus
go m.startBridgeMessageHandler(name, bridge)
@ -82,14 +78,14 @@ func (m *BridgeManager) StartBridge(name string) error {
return fmt.Errorf("bridge '%s' is not registered", name)
}
m.logger.LogInfo("Starting bridge", "name", name)
m.logger.LogInfo("Starting bridge", "bridge_id", name)
if err := bridge.Start(); err != nil {
m.logger.LogError("Failed to start bridge", "name", name, "error", err)
m.logger.LogError("Failed to start bridge", "bridge_id", name, "error", err)
return fmt.Errorf("failed to start bridge '%s': %w", name, err)
}
m.logger.LogInfo("Bridge started successfully", "name", name)
m.logger.LogInfo("Bridge started successfully", "bridge_id", name)
return nil
}
@ -103,14 +99,14 @@ func (m *BridgeManager) StopBridge(name string) error {
return fmt.Errorf("bridge '%s' is not registered", name)
}
m.logger.LogInfo("Stopping bridge", "name", name)
m.logger.LogInfo("Stopping bridge", "bridge_id", name)
if err := bridge.Stop(); err != nil {
m.logger.LogError("Failed to stop bridge", "name", name, "error", err)
m.logger.LogError("Failed to stop bridge", "bridge_id", name, "error", err)
return fmt.Errorf("failed to stop bridge '%s': %w", name, err)
}
m.logger.LogInfo("Bridge stopped successfully", "name", name)
m.logger.LogInfo("Bridge stopped successfully", "bridge_id", name)
return nil
}
@ -127,12 +123,12 @@ func (m *BridgeManager) UnregisterBridge(name string) error {
// Stop the bridge before unregistering
if bridge.IsConnected() {
if err := bridge.Stop(); err != nil {
m.logger.LogWarn("Failed to stop bridge during unregistration", "name", name, "error", err)
m.logger.LogWarn("Failed to stop bridge during unregistration", "bridge_id", name, "error", err)
}
}
delete(m.bridges, name)
m.logger.LogInfo("Bridge unregistered", "name", name)
m.logger.LogInfo("Bridge unregistered", "bridge_id", name)
return nil
}
@ -211,7 +207,7 @@ func (m *BridgeManager) Shutdown() error {
if bridge.IsConnected() {
if err := bridge.Stop(); err != nil {
errors = append(errors, fmt.Errorf("failed to stop bridge '%s': %w", name, err))
m.logger.LogError("Failed to stop bridge during shutdown", "name", name, "error", err)
m.logger.LogError("Failed to stop bridge during shutdown", "bridge_id", name, "error", err)
}
}
}
@ -229,7 +225,7 @@ func (m *BridgeManager) Shutdown() error {
}
// OnPluginConfigurationChange propagates configuration changes to all registered bridges
func (m *BridgeManager) OnPluginConfigurationChange(config *config.Configuration) error {
func (m *BridgeManager) OnPluginConfigurationChange(cfg *config.Configuration) error {
m.mu.RLock()
defer m.mu.RUnlock()
@ -241,11 +237,11 @@ func (m *BridgeManager) OnPluginConfigurationChange(config *config.Configuration
var errors []error
for name, bridge := range m.bridges {
if err := bridge.UpdateConfiguration(config); err != nil {
if err := bridge.UpdateConfiguration(cfg); err != nil {
errors = append(errors, fmt.Errorf("failed to update configuration for bridge '%s': %w", name, err))
m.logger.LogError("Failed to update bridge configuration", "name", name, "error", err)
m.logger.LogError("Failed to update bridge configuration", "bridge_id", name, "error", err)
} else {
m.logger.LogDebug("Successfully updated bridge configuration", "name", name)
m.logger.LogDebug("Successfully updated bridge configuration", "bridge_id", name)
}
}
@ -258,7 +254,7 @@ func (m *BridgeManager) OnPluginConfigurationChange(config *config.Configuration
}
// CreateChannelMapping handles the creation of a channel mapping by calling the appropriate bridge
func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingRequest) error {
func (m *BridgeManager) CreateChannelMapping(req *model.CreateChannelMappingRequest) error {
// Validate request
if err := req.Validate(); err != nil {
return fmt.Errorf("invalid mapping request: %w", err)
@ -278,7 +274,7 @@ func (m *BridgeManager) CreateChannelMapping(req model.CreateChannelMappingReque
return fmt.Errorf("bridge '%s' is not connected", req.BridgeName)
}
// NEW: Check if room already mapped to another channel
// Check if channel mapping already exists on the bridge
existingChannelID, err := bridge.GetChannelMapping(req.BridgeChannelID)
if err != nil {
m.logger.LogError("Failed to check channel mapping", "bridge_channel_id", req.BridgeChannelID, "error", err)
@ -392,7 +388,7 @@ func (m *BridgeManager) DeleteChannepMapping(req model.DeleteChannelMappingReque
}
// shareChannel creates a shared channel configuration using the Mattermost API
func (m *BridgeManager) shareChannel(req model.CreateChannelMappingRequest) error {
func (m *BridgeManager) shareChannel(req *model.CreateChannelMappingRequest) error {
if m.remoteID == "" {
return fmt.Errorf("remote ID not set - plugin not registered for shared channels")
}
@ -408,7 +404,7 @@ func (m *BridgeManager) shareChannel(req model.CreateChannelMappingRequest) erro
SharePurpose: fmt.Sprintf("Shared channel bridged to %s", req.BridgeChannelID),
ShareHeader: "test header",
CreatorId: req.UserID,
RemoteId: m.remoteID,
RemoteId: "",
}
// Share the channel
@ -439,35 +435,35 @@ func (m *BridgeManager) unshareChannel(channelID string) error {
}
// startBridgeMessageHandler starts message handling for a specific bridge
func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge model.Bridge) {
m.logger.LogDebug("Starting message handler for bridge", "bridge", bridgeName)
func (m *BridgeManager) startBridgeMessageHandler(bridgeID string, bridge model.Bridge) {
m.logger.LogDebug("Starting message handler for bridge", "bridge_id", bridgeID)
// Subscribe to message bus
messageChannel := m.messageBus.Subscribe(bridgeName)
messageChannel := m.messageBus.Subscribe(bridgeID)
// Start message routing goroutine
m.routingWg.Add(1)
go func() {
defer m.routingWg.Done()
defer m.logger.LogDebug("Message handler stopped for bridge", "bridge", bridgeName)
defer m.logger.LogDebug("Message handler stopped for bridge", "bridge_id", bridgeID)
for {
select {
case msg, ok := <-messageChannel:
if !ok {
m.logger.LogDebug("Message channel closed for bridge", "bridge", bridgeName)
m.logger.LogDebug("Message channel closed for bridge", "bridge_id", bridgeID)
return
}
if err := m.handleBridgeMessage(bridgeName, bridge, msg); err != nil {
if err := m.handleBridgeMessage(bridgeID, bridge, msg); err != nil {
m.logger.LogError("Failed to handle message for bridge",
"bridge", bridgeName,
"bridge_id", bridgeID,
"source_bridge", msg.SourceBridge,
"error", err)
}
case <-m.routingCtx.Done():
m.logger.LogDebug("Context cancelled, stopping message handler", "bridge", bridgeName)
m.logger.LogDebug("Context cancelled, stopping message handler", "bridge_id", bridgeID)
return
}
}
@ -477,26 +473,26 @@ func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge mode
m.routingWg.Add(1)
go func() {
defer m.routingWg.Done()
defer m.logger.LogDebug("Bridge message listener stopped", "bridge", bridgeName)
defer m.logger.LogDebug("Bridge message listener stopped", "bridge_id", bridgeID)
bridgeMessageChannel := bridge.GetMessageChannel()
for {
select {
case msg, ok := <-bridgeMessageChannel:
if !ok {
m.logger.LogDebug("Bridge message channel closed", "bridge", bridgeName)
m.logger.LogDebug("Bridge message channel closed", "bridge_id", bridgeID)
return
}
if err := m.messageBus.Publish(msg); err != nil {
m.logger.LogError("Failed to publish message from bridge",
"bridge", bridgeName,
"bridge_id", bridgeID,
"direction", msg.Direction,
"error", err)
}
case <-m.routingCtx.Done():
m.logger.LogDebug("Context cancelled, stopping bridge listener", "bridge", bridgeName)
m.logger.LogDebug("Context cancelled, stopping bridge listener", "bridge_id", bridgeID)
return
}
}
@ -504,9 +500,9 @@ func (m *BridgeManager) startBridgeMessageHandler(bridgeName string, bridge mode
}
// handleBridgeMessage processes an incoming message for a specific bridge
func (m *BridgeManager) handleBridgeMessage(bridgeName string, bridge model.Bridge, msg *model.DirectionalMessage) error {
func (m *BridgeManager) handleBridgeMessage(bridgeID string, bridge model.Bridge, msg *model.DirectionalMessage) error {
m.logger.LogDebug("Handling message for bridge",
"target_bridge", bridgeName,
"target_bridge", bridgeID,
"source_bridge", msg.SourceBridge,
"direction", msg.Direction,
"channel_id", msg.SourceChannelID)
@ -514,13 +510,13 @@ func (m *BridgeManager) handleBridgeMessage(bridgeName string, bridge model.Brid
// Get the bridge's message handler
handler := bridge.GetMessageHandler()
if handler == nil {
return fmt.Errorf("bridge %s does not have a message handler", bridgeName)
return fmt.Errorf("bridge %s does not have a message handler", bridgeID)
}
// Check if the handler can process this message
if !handler.CanHandleMessage(msg.BridgeMessage) {
m.logger.LogDebug("Bridge cannot handle message",
"bridge", bridgeName,
"bridge_id", bridgeID,
"message_type", msg.MessageType)
return nil // Not an error, just skip
}

View file

@ -6,12 +6,13 @@ import (
"sync"
"sync/atomic"
"github.com/mattermost/mattermost/server/public/plugin"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/bridge"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
"github.com/mattermost/mattermost/server/public/plugin"
)
const (
@ -26,6 +27,7 @@ type mattermostBridge struct {
kvstore kvstore.KVStore
userManager pluginModel.BridgeUserManager
botUserID string // Bot user ID for posting messages
bridgeID string // Bridge identifier used for registration
remoteID string // Remote ID for shared channels
// Message handling
@ -48,19 +50,20 @@ type mattermostBridge struct {
}
// NewBridge creates a new Mattermost bridge
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, botUserID, remoteID string) pluginModel.Bridge {
func NewBridge(log logger.Logger, api plugin.API, store kvstore.KVStore, cfg *config.Configuration, botUserID, bridgeID, remoteID string) pluginModel.Bridge {
ctx, cancel := context.WithCancel(context.Background())
b := &mattermostBridge{
logger: log,
api: api,
kvstore: kvstore,
kvstore: store,
botUserID: botUserID,
bridgeID: bridgeID,
remoteID: remoteID,
ctx: ctx,
cancel: cancel,
channelMappings: make(map[string]string),
config: cfg,
userManager: bridge.NewUserManager("mattermost", log),
userManager: bridge.NewUserManager(bridgeID, log),
incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize),
}
@ -71,13 +74,6 @@ func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *
return b
}
// getConfiguration safely retrieves the current configuration
func (b *mattermostBridge) getConfiguration() *config.Configuration {
b.configMu.RLock()
defer b.configMu.RUnlock()
return b.config
}
// UpdateConfiguration updates the bridge configuration
func (b *mattermostBridge) UpdateConfiguration(cfg *config.Configuration) error {
// Validate configuration using built-in validation
@ -100,10 +96,10 @@ func (b *mattermostBridge) Start() error {
b.logger.LogDebug("Starting Mattermost bridge")
b.configMu.RLock()
config := b.config
cfg := b.config
b.configMu.RUnlock()
if config == nil {
if cfg == nil {
return fmt.Errorf("bridge configuration not set")
}
@ -205,11 +201,11 @@ func (b *mattermostBridge) IsConnected() bool {
// Ping actively tests the Mattermost API connectivity
func (b *mattermostBridge) Ping() error {
if !b.connected.Load() {
return fmt.Errorf("Mattermost bridge is not connected")
return fmt.Errorf("mattermost bridge is not connected")
}
if b.api == nil {
return fmt.Errorf("Mattermost API not initialized")
return fmt.Errorf("mattermost API not initialized")
}
b.logger.LogDebug("Testing Mattermost bridge connectivity with API ping")
@ -219,7 +215,7 @@ func (b *mattermostBridge) Ping() error {
version := b.api.GetServerVersion()
if version == "" {
b.logger.LogWarn("Mattermost bridge ping returned empty version")
return fmt.Errorf("Mattermost API ping returned empty server version")
return fmt.Errorf("mattermost API ping returned empty server version")
}
b.logger.LogDebug("Mattermost bridge ping successful", "server_version", version)
@ -311,7 +307,7 @@ func (b *mattermostBridge) DeleteChannelMapping(channelID string) error {
// ChannelMappingExists checks if a Mattermost channel exists on the server
func (b *mattermostBridge) ChannelMappingExists(roomID string) (bool, error) {
if b.api == nil {
return false, fmt.Errorf("Mattermost API not initialized")
return false, fmt.Errorf("mattermost API not initialized")
}
b.logger.LogDebug("Checking if Mattermost channel exists", "channel_id", roomID)
@ -402,3 +398,8 @@ func (b *mattermostBridge) GetUserResolver() pluginModel.UserResolver {
func (b *mattermostBridge) GetRemoteID() string {
return b.remoteID
}
// ID returns the bridge identifier used when registering the bridge
func (b *mattermostBridge) ID() string {
return b.bridgeID
}

View file

@ -4,9 +4,10 @@ import (
"fmt"
"strings"
mmModel "github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
mmModel "github.com/mattermost/mattermost/server/public/model"
)
// mattermostMessageHandler handles incoming messages for the Mattermost bridge
@ -59,7 +60,7 @@ func (h *mattermostMessageHandler) GetSupportedMessageTypes() []string {
// postMessageToMattermost posts a message to a Mattermost channel
func (h *mattermostMessageHandler) postMessageToMattermost(msg *pluginModel.BridgeMessage) error {
if h.bridge.api == nil {
return fmt.Errorf("Mattermost API not initialized")
return fmt.Errorf("mattermost API not initialized")
}
// Get the Mattermost channel ID from the channel mapping using the source bridge name
@ -81,19 +82,30 @@ func (h *mattermostMessageHandler) postMessageToMattermost(msg *pluginModel.Brid
return fmt.Errorf("channel %s not found", channelID)
}
// Format the message content
content := h.formatMessageContent(msg)
// Get or create remote user for this message
remoteUserID, err := h.getOrCreateRemoteUser(msg)
if err != nil {
return fmt.Errorf("failed to get or create remote user: %w", err)
}
// Create the post
if err := h.bridge.api.InviteRemoteToChannel(channelID, msg.SourceRemoteID, remoteUserID, true); err != nil {
h.logger.LogError("Failed to invite remote user to channel",
"channel_id", msg.SourceChannelID,
"remote_user_id", remoteUserID,
"source_bridge", msg.SourceBridge,
"source_remote_id", msg.SourceRemoteID,
"err", err.Error(),
)
}
// Create the post using the remote user (no need for bridge formatting since it's posted as the actual user)
post := &mmModel.Post{
ChannelId: channelID,
UserId: h.bridge.botUserID,
Message: content,
UserId: remoteUserID,
Message: msg.Content,
Type: mmModel.PostTypeDefault,
Props: map[string]interface{}{
"from_bridge": msg.SourceBridge,
"bridge_user_id": msg.SourceUserID,
"bridge_user_name": msg.SourceUserName,
"bridge_message_id": msg.MessageID,
"bridge_timestamp": msg.Timestamp.Unix(),
},
@ -113,34 +125,98 @@ func (h *mattermostMessageHandler) postMessageToMattermost(msg *pluginModel.Brid
h.logger.LogDebug("Message posted to Mattermost channel",
"channel_id", channelID,
"post_id", createdPost.Id,
"remote_user_id", remoteUserID,
"source_bridge", msg.SourceBridge,
"content_length", len(content))
"source_user", msg.SourceUserName,
"content_length", len(msg.Content))
return nil
}
// formatMessageContent formats the message content for Mattermost
func (h *mattermostMessageHandler) formatMessageContent(msg *pluginModel.BridgeMessage) string {
// For messages from other bridges, prefix with the bridge info and user name
if msg.SourceUserName != "" {
bridgeIcon := h.getBridgeIcon(msg.SourceBridge)
return fmt.Sprintf("%s **%s**: %s", bridgeIcon, msg.SourceUserName, msg.Content)
// getOrCreateRemoteUser gets or creates a remote user for incoming bridge messages
func (h *mattermostMessageHandler) getOrCreateRemoteUser(msg *pluginModel.BridgeMessage) (string, error) {
// Generate username from source info
username := h.generateUsername(msg.SourceUserID, msg.SourceUserName, msg.SourceBridge)
// Generate email using bridge ID
email := fmt.Sprintf("%s@bridge.%s", username, h.bridge.bridgeID)
// First try to find existing user by username
if existingUser, appErr := h.bridge.api.GetUserByUsername(username); appErr == nil && existingUser != nil {
// Check if this user has the correct RemoteId
if existingUser.RemoteId != nil && *existingUser.RemoteId == msg.SourceRemoteID {
h.logger.LogDebug("Found existing remote user",
"user_id", existingUser.Id,
"username", username,
"source_bridge", msg.SourceBridge,
"source_remote_id", msg.SourceRemoteID)
return existingUser.Id, nil
}
}
return msg.Content
// Also try to find user by email
if existingUser, appErr := h.bridge.api.GetUserByEmail(email); appErr == nil && existingUser != nil {
// Check if this user has the correct RemoteId
if existingUser.RemoteId != nil && *existingUser.RemoteId == msg.SourceRemoteID {
h.logger.LogDebug("Found existing remote user by email",
"user_id", existingUser.Id,
"email", email,
"source_bridge", msg.SourceBridge,
"source_remote_id", msg.SourceRemoteID)
return existingUser.Id, nil
}
}
// User doesn't exist, create the remote user
user := &mmModel.User{
Username: username,
Email: email,
FirstName: msg.SourceUserName,
Password: mmModel.NewId(),
RemoteId: mmModel.NewPointer(msg.SourceRemoteID),
}
// Try to create the user
createdUser, appErr := h.bridge.api.CreateUser(user)
if appErr != nil {
h.logger.LogError("Failed to create remote user",
"username", username,
"email", email,
"source_bridge", msg.SourceBridge,
"source_remote_id", msg.SourceRemoteID,
"error", appErr)
return "", fmt.Errorf("failed to create remote user: %w", appErr)
}
h.logger.LogInfo("Created remote user",
"user_id", createdUser.Id,
"username", username,
"email", email,
"source_bridge", msg.SourceBridge,
"source_remote_id", msg.SourceRemoteID)
return createdUser.Id, nil
}
// getBridgeIcon returns an icon/emoji for the source bridge
func (h *mattermostMessageHandler) getBridgeIcon(bridgeType string) string {
switch bridgeType {
case "xmpp":
return ":speech_balloon:" // Chat bubble emoji for XMPP
case "slack":
return ":slack:" // Slack emoji if available
case "discord":
return ":discord:" // Discord emoji if available
default:
return ":bridge_at_night:" // Generic bridge emoji
// generateUsername creates a username from source information
func (h *mattermostMessageHandler) generateUsername(sourceUserID, sourceUserName, sourceBridge string) string {
var baseUsername string
// Prefer source user name, fallback to user ID
if sourceUserName != "" {
baseUsername = sourceUserName
} else {
baseUsername = sourceUserID
}
// Clean the username (remove invalid characters, make lowercase)
baseUsername = strings.ToLower(baseUsername)
baseUsername = strings.ReplaceAll(baseUsername, "@", "")
baseUsername = strings.ReplaceAll(baseUsername, ".", "")
baseUsername = strings.ReplaceAll(baseUsername, " ", "")
// Prefix with bridge type to avoid conflicts
return fmt.Sprintf("%s-%s", sourceBridge, baseUsername)
}
// mattermostUserResolver handles user resolution for the Mattermost bridge
@ -168,7 +244,7 @@ func (r *mattermostUserResolver) ResolveUser(externalUserID string) (*pluginMode
}
if user == nil {
return nil, fmt.Errorf("Mattermost user not found: %s", externalUserID)
return nil, fmt.Errorf("mattermost user not found: %s", externalUserID)
}
return &pluginModel.ExternalUser{

View file

@ -6,14 +6,17 @@ import (
"sync"
"time"
mmModel "github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
mmModel "github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin"
)
// MattermostUser represents a Mattermost user that implements the BridgeUser interface
//
//nolint:revive // MattermostUser is clearer than User in this context
type MattermostUser struct {
// User identity
id string
@ -40,7 +43,7 @@ type MattermostUser struct {
}
// NewMattermostUser creates a new Mattermost user
func NewMattermostUser(id, displayName, username, email string, api plugin.API, cfg *config.Configuration, logger logger.Logger) *MattermostUser {
func NewMattermostUser(id, displayName, username, email string, api plugin.API, cfg *config.Configuration, log logger.Logger) *MattermostUser {
ctx, cancel := context.WithCancel(context.Background())
return &MattermostUser{
@ -53,7 +56,7 @@ func NewMattermostUser(id, displayName, username, email string, api plugin.API,
config: cfg,
ctx: ctx,
cancel: cancel,
logger: logger,
logger: log,
}
}
@ -69,7 +72,7 @@ func (u *MattermostUser) Validate() error {
return fmt.Errorf("configuration cannot be nil")
}
if u.api == nil {
return fmt.Errorf("Mattermost API cannot be nil")
return fmt.Errorf("mattermost API cannot be nil")
}
return nil
}
@ -192,13 +195,13 @@ func (u *MattermostUser) IsConnected() bool {
func (u *MattermostUser) Ping() error {
if u.api == nil {
return fmt.Errorf("Mattermost API not initialized for user %s", u.id)
return fmt.Errorf("mattermost API not initialized for user %s", u.id)
}
// Test API connectivity by getting server version
version := u.api.GetServerVersion()
if version == "" {
return fmt.Errorf("Mattermost API ping returned empty server version for user %s", u.id)
return fmt.Errorf("mattermost API ping returned empty server version for user %s", u.id)
}
return nil
@ -207,7 +210,7 @@ func (u *MattermostUser) Ping() error {
// CheckChannelExists checks if a Mattermost channel exists
func (u *MattermostUser) CheckChannelExists(channelID string) (bool, error) {
if u.api == nil {
return false, fmt.Errorf("Mattermost API not initialized for user %s", u.id)
return false, fmt.Errorf("mattermost API not initialized for user %s", u.id)
}
// Try to get the channel by ID

View file

@ -35,7 +35,7 @@ type messageBus struct {
}
// NewMessageBus creates a new message bus instance
func NewMessageBus(logger logger.Logger) model.MessageBus {
func NewMessageBus(log logger.Logger) model.MessageBus {
ctx, cancel := context.WithCancel(context.Background())
return &messageBus{
@ -43,7 +43,7 @@ func NewMessageBus(logger logger.Logger) model.MessageBus {
subscribers: make(map[string]chan *model.DirectionalMessage),
ctx: ctx,
cancel: cancel,
logger: logger,
logger: log,
}
}

View file

@ -21,11 +21,11 @@ type UserManager struct {
}
// NewUserManager creates a new user manager for a specific bridge type
func NewUserManager(bridgeType string, logger logger.Logger) model.BridgeUserManager {
func NewUserManager(bridgeType string, log logger.Logger) model.BridgeUserManager {
ctx, cancel := context.WithCancel(context.Background())
return &UserManager{
bridgeType: bridgeType,
logger: logger,
logger: log,
users: make(map[string]model.BridgeUser),
ctx: ctx,
cancel: cancel,

View file

@ -9,13 +9,16 @@ import (
"fmt"
"github.com/mattermost/mattermost/server/public/plugin"
"mellium.im/xmlstream"
"mellium.im/xmpp/stanza"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/bridge"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
xmppClient "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/xmpp"
"github.com/mattermost/mattermost/server/public/plugin"
)
const (
@ -30,7 +33,8 @@ type xmppBridge struct {
kvstore kvstore.KVStore
bridgeClient *xmppClient.Client // Main bridge XMPP client connection
userManager pluginModel.BridgeUserManager
remoteID string // Remote ID for shared channels
bridgeID string // Bridge identifier used for registration
remoteID string // Remote ID for shared channels
// Message handling
messageHandler *xmppMessageHandler
@ -52,18 +56,19 @@ type xmppBridge struct {
}
// NewBridge creates a new XMPP bridge
func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *config.Configuration, remoteID string) pluginModel.Bridge {
func NewBridge(log logger.Logger, api plugin.API, store kvstore.KVStore, cfg *config.Configuration, bridgeID, remoteID string) pluginModel.Bridge {
ctx, cancel := context.WithCancel(context.Background())
b := &xmppBridge{
logger: log,
api: api,
kvstore: kvstore,
kvstore: store,
ctx: ctx,
cancel: cancel,
channelMappings: make(map[string]string),
config: cfg,
userManager: bridge.NewUserManager("xmpp", log),
userManager: bridge.NewUserManager(bridgeID, log),
incomingMessages: make(chan *pluginModel.DirectionalMessage, defaultMessageBufferSize),
bridgeID: bridgeID,
remoteID: remoteID,
}
@ -83,7 +88,7 @@ func NewBridge(log logger.Logger, api plugin.API, kvstore kvstore.KVStore, cfg *
func (b *xmppBridge) createXMPPClient(cfg *config.Configuration) *xmppClient.Client {
// Create TLS config based on certificate verification setting
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify,
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify, //nolint:gosec // Allow insecure TLS for testing environments
}
return xmppClient.NewClientWithTLS(
@ -150,19 +155,19 @@ func (b *xmppBridge) Start() error {
b.logger.LogDebug("Starting Mattermost to XMPP bridge")
b.configMu.RLock()
config := b.config
cfg := b.config
b.configMu.RUnlock()
if config == nil {
if cfg == nil {
return fmt.Errorf("bridge configuration not set")
}
if !config.EnableSync {
if !cfg.EnableSync {
b.logger.LogInfo("XMPP sync is disabled, bridge will not start")
return nil
}
b.logger.LogInfo("Starting Mattermost to XMPP bridge", "xmpp_server", config.XMPPServerURL, "username", config.XMPPUsername)
b.logger.LogInfo("Starting Mattermost to XMPP bridge", "xmpp_server", cfg.XMPPServerURL, "username", cfg.XMPPUsername)
// Connect to XMPP server
if err := b.connectToXMPP(); err != nil {
@ -177,9 +182,6 @@ func (b *xmppBridge) Start() error {
// Start connection monitor
go b.connectionMonitor()
// Start message aggregation
go b.startMessageAggregation()
b.logger.LogInfo("Mattermost to XMPP bridge started successfully")
return nil
}
@ -228,6 +230,8 @@ func (b *xmppBridge) connectToXMPP() error {
b.logger.LogDebug("Set bridge client online presence")
}
b.bridgeClient.SetMessageHandler(b.handleIncomingXMPPMessage)
return nil
}
@ -337,10 +341,10 @@ func (b *xmppBridge) connectionMonitor() {
// handleReconnection attempts to reconnect to XMPP and rejoin rooms
func (b *xmppBridge) handleReconnection() {
b.configMu.RLock()
config := b.config
cfg := b.config
b.configMu.RUnlock()
if config == nil || !config.EnableSync {
if cfg == nil || !cfg.EnableSync {
return
}
@ -354,7 +358,7 @@ func (b *xmppBridge) handleReconnection() {
// Retry connection with exponential backoff
maxRetries := 3
for i := range maxRetries {
backoff := time.Duration(1<<uint(i)) * time.Second
backoff := time.Duration(1<<i) * time.Second
select {
case <-b.ctx.Done():
@ -570,36 +574,6 @@ func (b *xmppBridge) GetUserManager() pluginModel.BridgeUserManager {
return b.userManager
}
// startMessageAggregation starts the message aggregation goroutine
func (b *xmppBridge) startMessageAggregation() {
clientChannel := b.bridgeClient.GetMessageChannel()
for {
select {
case <-b.ctx.Done():
b.logger.LogDebug("Stopping XMPP message aggregation")
return
case msg, ok := <-clientChannel:
if !ok {
b.logger.LogDebug("Bridge client message channel closed")
return
}
// Forward to our bridge's message channel
select {
case b.incomingMessages <- msg:
// Message forwarded successfully
case <-b.ctx.Done():
return
default:
b.logger.LogWarn("Bridge message channel full, dropping message",
"source_channel", msg.SourceChannelID,
"user_id", msg.SourceUserID)
}
}
}
}
// GetMessageChannel returns the channel for incoming messages from XMPP
func (b *xmppBridge) GetMessageChannel() <-chan *pluginModel.DirectionalMessage {
return b.incomingMessages
@ -624,3 +598,87 @@ func (b *xmppBridge) GetUserResolver() pluginModel.UserResolver {
func (b *xmppBridge) GetRemoteID() string {
return b.remoteID
}
// ID returns the bridge identifier used when registering the bridge
func (b *xmppBridge) ID() string {
return b.bridgeID
}
// handleIncomingXMPPMessage handles incoming XMPP messages and converts them to bridge messages
//
//nolint:gocritic // msg parameter must match external XMPP library handler signature
func (b *xmppBridge) handleIncomingXMPPMessage(msg stanza.Message, t xmlstream.TokenReadEncoder) error {
b.logger.LogDebug("XMPP bridge handling incoming message",
"from", msg.From.String(),
"to", msg.To.String(),
"type", fmt.Sprintf("%v", msg.Type))
// Only process groupchat messages for now (MUC messages from channels)
if msg.Type != stanza.GroupChatMessage {
b.logger.LogDebug("Ignoring non-groupchat message", "type", fmt.Sprintf("%v", msg.Type))
return nil
}
// Extract message body using client method
messageBody, err := b.bridgeClient.ExtractMessageBody(t)
if err != nil {
b.logger.LogWarn("Failed to extract message body", "error", err)
return nil
}
if messageBody == "" {
b.logger.LogDebug("Ignoring message with empty body")
return nil
}
// Use client methods for protocol normalization
channelID, err := b.bridgeClient.ExtractChannelID(msg.From)
if err != nil {
return fmt.Errorf("failed to extract channel ID: %w", err)
}
userID, displayName := b.bridgeClient.ExtractUserInfo(msg.From)
// Skip messages from our own XMPP user to prevent loops
if userID == b.bridgeClient.GetJID().String() {
b.logger.LogDebug("Skipping message from our own XMPP user to prevent loop",
"our_jid", b.bridgeClient.GetJID().String(),
"source_user_id", userID)
return nil
}
// Create bridge message
bridgeMessage := &pluginModel.BridgeMessage{
SourceBridge: b.bridgeID,
SourceChannelID: channelID,
SourceUserID: userID,
SourceUserName: displayName,
SourceRemoteID: b.remoteID,
Content: messageBody,
MessageType: "text",
Timestamp: time.Now(), // TODO: Parse timestamp from message if available
MessageID: msg.ID,
TargetBridges: []string{"mattermost"}, // Route to Mattermost
}
// Create directional message for incoming (XMPP -> Mattermost)
directionalMessage := &pluginModel.DirectionalMessage{
BridgeMessage: bridgeMessage,
Direction: pluginModel.DirectionIncoming,
}
// Send to bridge's message channel
select {
case b.incomingMessages <- directionalMessage:
b.logger.LogDebug("XMPP message queued for processing",
"channel_id", channelID,
"user_id", userID,
"message_id", msg.ID)
default:
b.logger.LogWarn("Bridge message channel full, dropping message",
"channel_id", channelID,
"user_id", userID)
}
return nil
}

View file

@ -37,7 +37,7 @@ func (h *xmppMessageHandler) ProcessMessage(msg *pluginModel.DirectionalMessage)
}
// For incoming messages to XMPP, we send them to XMPP rooms
if msg.Direction == pluginModel.DirectionIncoming {
if msg.Direction == pluginModel.DirectionOutgoing {
return h.sendMessageToXMPP(msg.BridgeMessage)
}
@ -85,7 +85,7 @@ func (h *xmppMessageHandler) sendMessageToXMPP(msg *pluginModel.BridgeMessage) e
}
// Send the message
_, err = h.bridge.bridgeClient.SendMessage(req)
_, err = h.bridge.bridgeClient.SendMessage(&req)
if err != nil {
return fmt.Errorf("failed to send message to XMPP room: %w", err)
}
@ -144,7 +144,7 @@ func (r *xmppUserResolver) FormatUserMention(user *pluginModel.ExternalUser) str
func (r *xmppUserResolver) GetDisplayName(externalUserID string) string {
// For XMPP JIDs, extract the local part or resource as display name
// Format: user@domain/resource -> use resource or user
if len(externalUserID) == 0 {
if externalUserID == "" {
return "Unknown User"
}

View file

@ -15,6 +15,8 @@ import (
)
// XMPPUser represents an XMPP user that implements the BridgeUser interface
//
//nolint:revive // XMPPUser is clearer than User in this context
type XMPPUser struct {
// User identity
id string
@ -41,12 +43,12 @@ type XMPPUser struct {
}
// NewXMPPUser creates a new XMPP user
func NewXMPPUser(id, displayName, jid string, cfg *config.Configuration, logger logger.Logger) *XMPPUser {
func NewXMPPUser(id, displayName, jid string, cfg *config.Configuration, log logger.Logger) *XMPPUser {
ctx, cancel := context.WithCancel(context.Background())
// Create TLS config based on certificate verification setting
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify,
InsecureSkipVerify: cfg.XMPPInsecureSkipVerify, //nolint:gosec // Allow insecure TLS for testing environments
}
// Create XMPP client for this user
@ -57,7 +59,7 @@ func NewXMPPUser(id, displayName, jid string, cfg *config.Configuration, logger
cfg.GetXMPPResource(),
id, // Use user ID as remote ID
tlsConfig,
logger,
log,
)
return &XMPPUser{
@ -69,7 +71,7 @@ func NewXMPPUser(id, displayName, jid string, cfg *config.Configuration, logger
config: cfg,
ctx: ctx,
cancel: cancel,
logger: logger,
logger: log,
}
}
@ -171,7 +173,7 @@ func (u *XMPPUser) SendMessageToChannel(channelID, message string) error {
Message: message,
}
_, err := u.client.SendMessage(req)
_, err := u.client.SendMessage(&req)
if err != nil {
return fmt.Errorf("failed to send message to XMPP room %s: %w", channelID, err)
}

View file

@ -4,10 +4,11 @@ import (
"fmt"
"strings"
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin"
"github.com/mattermost/mattermost/server/public/pluginapi"
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
)
type Handler struct {
@ -183,7 +184,7 @@ func (c *Handler) executeMapCommand(args *model.CommandArgs, fields []string) *m
TeamID: args.TeamId,
}
err = c.bridgeManager.CreateChannelMapping(mappingReq)
err = c.bridgeManager.CreateChannelMapping(&mappingReq)
if err != nil {
return c.formatMappingError("create", roomJID, err)
}
@ -265,11 +266,12 @@ func (c *Handler) executeStatusCommand(args *model.CommandArgs) *model.CommandRe
roomJID, err := bridge.GetChannelMapping(channelID)
var mappingText string
if err != nil {
switch {
case err != nil:
mappingText = fmt.Sprintf("⚠️ Error checking channel mapping: %v", err)
} else if roomJID != "" {
case roomJID != "":
mappingText = fmt.Sprintf("🔗 **Current channel mapping:** `%s`", roomJID)
} else {
default:
mappingText = "📝 **Current channel:** Not mapped to any XMPP room"
}

View file

@ -3,8 +3,9 @@ package main
import (
"reflect"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
"github.com/pkg/errors"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/config"
)
// getConfiguration retrieves the active configuration under lock, making it safe to use

View file

@ -4,8 +4,9 @@ import (
"fmt"
"time"
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
"github.com/mattermost/mattermost/server/public/model"
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
)
// OnSharedChannelsPing is called to check if the bridge is healthy and ready to process messages
@ -53,6 +54,11 @@ func (p *Plugin) OnSharedChannelsPing(remoteCluster *model.RemoteCluster) bool {
// OnSharedChannelsSyncMsg processes sync messages from Mattermost shared channels and routes them to XMPP
func (p *Plugin) OnSharedChannelsSyncMsg(msg *model.SyncMsg, rc *model.RemoteCluster) (model.SyncResponse, error) {
var remoteClusterID string
if rc != nil {
remoteClusterID = rc.RemoteId
}
config := p.getConfiguration()
// Initialize sync response
@ -63,11 +69,6 @@ func (p *Plugin) OnSharedChannelsSyncMsg(msg *model.SyncMsg, rc *model.RemoteClu
ReactionsLastUpdateAt: now,
}
var remoteClusterID string
if rc != nil {
remoteClusterID = rc.RemoteId
}
p.logger.LogDebug("OnSharedChannelsSyncMsg called",
"remote_cluster_id", remoteClusterID,
"channel_id", msg.ChannelId,
@ -109,6 +110,30 @@ func (p *Plugin) OnSharedChannelsSyncMsg(msg *model.SyncMsg, rc *model.RemoteClu
// processSyncPost converts a Mattermost post to a bridge message and routes it to XMPP
func (p *Plugin) processSyncPost(post *model.Post, channelID string, users map[string]*model.User) error {
p.logger.LogDebug("Processing sync post", "post_id", post.Id, "channel_id", channelID, "users", users)
// Skip messages from our own bot user to prevent loops
if post.UserId == p.botUserID {
p.logger.LogDebug("Skipping message from bot user to prevent loop",
"bot_user_id", p.botUserID,
"post_user_id", post.UserId)
return nil
}
// Skip messages from remote users to prevent loops
// Remote users represent users from other bridges (e.g., XMPP users in Mattermost)
user, appErr := p.API.GetUser(post.UserId)
if appErr != nil {
p.logger.LogWarn("Failed to get user details for loop prevention. Ignoring message.", "user_id", post.UserId, "error", appErr)
return nil
} else if user != nil && user.RemoteId != nil && *user.RemoteId != "" {
p.logger.LogDebug("Skipping message from remote user to prevent loop",
"user_id", post.UserId,
"username", user.Username,
"remote_id", *user.RemoteId)
return nil
}
// Find the user who created this post
var postUser *model.User
p.logger.LogInfo("Processing sync post", "post_id", post.UserId, "users", users)
@ -118,10 +143,10 @@ func (p *Plugin) processSyncPost(post *model.Post, channelID string, users map[s
// If user not found in sync data, try to get from API
if postUser == nil {
var err error
postUser, err = p.API.GetUser(post.UserId)
if err != nil {
p.logger.LogWarn("Failed to get user for post", "user_id", post.UserId, "post_id", post.Id, "error", err)
var appErr *model.AppError
postUser, appErr = p.API.GetUser(post.UserId)
if appErr != nil {
p.logger.LogWarn("Failed to get user for post", "user_id", post.UserId, "post_id", post.Id, "error", appErr)
// Create a placeholder user
postUser = &model.User{
Id: post.UserId,
@ -136,6 +161,7 @@ func (p *Plugin) processSyncPost(post *model.Post, channelID string, users map[s
SourceChannelID: channelID,
SourceUserID: postUser.Id,
SourceUserName: postUser.Username,
SourceRemoteID: "", // This message comes from Mattermost, so no remote ID
Content: post.Message,
MessageType: "text", // TODO: Handle other message types
Timestamp: time.Unix(post.CreateAt/1000, 0),

View file

@ -28,7 +28,7 @@ type CreateChannelMappingRequest struct {
}
// Validate checks if all required fields are present and valid
func (r CreateChannelMappingRequest) Validate() error {
func (r *CreateChannelMappingRequest) Validate() error {
if r.ChannelID == "" {
return fmt.Errorf("channelID cannot be empty")
}
@ -116,7 +116,7 @@ type BridgeManager interface {
OnPluginConfigurationChange(config *config.Configuration) error
// CreateChannelMapping is called when a channel mapping is created.
CreateChannelMapping(req CreateChannelMappingRequest) error
CreateChannelMapping(req *CreateChannelMappingRequest) error
// DeleteChannepMapping is called when a channel mapping is deleted.
DeleteChannepMapping(req DeleteChannelMappingRequest) error
@ -167,6 +167,9 @@ type Bridge interface {
// GetRemoteID returns the remote ID used for shared channels registration
GetRemoteID() string
// ID returns the bridge identifier used when registering the bridge
ID() string
}
// BridgeUser represents a user connected to any bridge service

View file

@ -19,6 +19,7 @@ type BridgeMessage struct {
SourceChannelID string // Channel ID in source system
SourceUserID string // User ID in source system (JID, user ID, etc.)
SourceUserName string // Display name in source system
SourceRemoteID string // Remote ID of the bridge instance that created this message
// Message content (standardized on Markdown)
Content string // Markdown formatted message content

View file

@ -22,12 +22,12 @@ func SanitizeShareName(name string) string {
}
// Ensure it starts with alphanumeric
for len(result) > 0 && (result[0] == '-' || result[0] == '_') {
for result != "" && (result[0] == '-' || result[0] == '_') {
result = result[1:]
}
// Ensure it ends with alphanumeric
for len(result) > 0 && (result[len(result)-1] == '-' || result[len(result)-1] == '_') {
for result != "" && (result[len(result)-1] == '-' || result[len(result)-1] == '_') {
result = result[:len(result)-1]
}

View file

@ -6,6 +6,12 @@ import (
"sync"
"time"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin"
"github.com/mattermost/mattermost/server/public/pluginapi"
"github.com/mattermost/mattermost/server/public/pluginapi/cluster"
"github.com/pkg/errors"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/bridge"
mattermostbridge "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/bridge/mattermost"
xmppbridge "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/bridge/xmpp"
@ -14,11 +20,6 @@ import (
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
pluginModel "github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/store/kvstore"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin"
"github.com/mattermost/mattermost/server/public/pluginapi"
"github.com/mattermost/mattermost/server/public/pluginapi/cluster"
"github.com/pkg/errors"
)
// Plugin implements the interface expected by the Mattermost server to communicate between the server and plugin processes.
@ -78,7 +79,7 @@ func (p *Plugin) OnActivate() error {
p.bridgeManager = bridge.NewBridgeManager(p.logger, p.API, p.remoteID)
// Initialize and register bridges with current configuration
if err := p.initBridges(*cfg); err != nil {
if err := p.initBridges(cfg); err != nil {
return fmt.Errorf("failed to initialize bridges: %w", err)
}
@ -141,13 +142,14 @@ func (p *Plugin) ExecuteCommand(c *plugin.Context, args *model.CommandArgs) (*mo
return response, nil
}
func (p *Plugin) initBridges(cfg config.Configuration) error {
func (p *Plugin) initBridges(cfg *config.Configuration) error {
// Create and register XMPP bridge
xmppBridge := xmppbridge.NewBridge(
p.logger,
p.API,
p.kvstore,
&cfg,
cfg,
"xmpp",
p.remoteID,
)
@ -160,9 +162,10 @@ func (p *Plugin) initBridges(cfg config.Configuration) error {
p.logger,
p.API,
p.kvstore,
&cfg,
cfg,
p.botUserID,
"mattermost",
"mattermost",
)
if err := p.bridgeManager.RegisterBridge("mattermost", mattermostBridge); err != nil {

View file

@ -10,20 +10,20 @@ import (
)
func TestServeHTTP(t *testing.T) {
assert := assert.New(t)
a := assert.New(t)
plugin := Plugin{}
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodGet, "/api/v1/hello", nil)
r := httptest.NewRequest(http.MethodGet, "/api/v1/hello", http.NoBody)
r.Header.Set("Mattermost-User-ID", "test-user-id")
plugin.ServeHTTP(nil, w, r)
result := w.Result()
assert.NotNil(result)
a.NotNil(result)
defer result.Body.Close()
bodyBytes, err := io.ReadAll(result.Body)
assert.Nil(err)
a.Nil(err)
bodyString := string(bodyBytes)
assert.Equal("Hello, world!", bodyString)
a.Equal("Hello, world!", bodyString)
}

View file

@ -28,4 +28,4 @@ func ExtractIdentifierFromChannelMapKey(key, bridgeName string) string {
return ""
}
return key[len(expectedPrefix):]
}
}

View file

@ -11,8 +11,6 @@ import (
"time"
"github.com/jellydator/ttlcache/v3"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/model"
"mellium.im/sasl"
"mellium.im/xmlstream"
"mellium.im/xmpp"
@ -21,15 +19,14 @@ import (
"mellium.im/xmpp/muc"
"mellium.im/xmpp/mux"
"mellium.im/xmpp/stanza"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
)
const (
// defaultOperationTimeout is the default timeout for XMPP operations
defaultOperationTimeout = 5 * time.Second
// msgBufferSize is the buffer size for incoming message channels
msgBufferSize = 1000
// messageDedupeTTL is the TTL for message deduplication cache
messageDedupeTTL = 30 * time.Second
)
@ -56,10 +53,13 @@ type Client struct {
sessionServing bool
// Message handling for bridge integration
incomingMessages chan *model.DirectionalMessage
messageHandler mux.MessageHandlerFunc // Bridge handler for incoming messages
// Message deduplication cache to handle XMPP server duplicates
dedupeCache *ttlcache.Cache[string, time.Time]
// XEP features manager for handling XMPP extension protocols
XEPFeatures *XEPFeatures
}
// MessageRequest represents a request to send a message.
@ -84,6 +84,8 @@ type MessageBody struct {
}
// XMPPMessage represents a complete XMPP message stanza
//
//nolint:revive // XMPPMessage is clearer than Message in this context
type XMPPMessage struct {
XMLName xml.Name `xml:"jabber:client message"`
Type string `xml:"type,attr"`
@ -92,6 +94,12 @@ type XMPPMessage struct {
Body MessageBody `xml:"body"`
}
// MessageWithBody represents a message stanza with body for parsing
type MessageWithBody struct {
stanza.Message
Body string `xml:"body"`
}
// GhostUser represents an XMPP ghost user
type GhostUser struct {
JID string `json:"jid"`
@ -105,7 +113,7 @@ type UserProfile struct {
}
// NewClient creates a new XMPP client.
func NewClient(serverURL, username, password, resource, remoteID string, logger logger.Logger) *Client {
func NewClient(serverURL, username, password, resource, remoteID string, log logger.Logger) *Client {
ctx, cancel := context.WithCancel(context.Background())
// Create TTL cache for message deduplication
@ -117,17 +125,17 @@ func NewClient(serverURL, username, password, resource, remoteID string, logger
go dedupeCache.Start()
client := &Client{
serverURL: serverURL,
username: username,
password: password,
resource: resource,
remoteID: remoteID,
logger: logger,
ctx: ctx,
cancel: cancel,
sessionReady: make(chan struct{}),
incomingMessages: make(chan *model.DirectionalMessage, msgBufferSize),
dedupeCache: dedupeCache,
serverURL: serverURL,
username: username,
password: password,
resource: resource,
remoteID: remoteID,
logger: log,
ctx: ctx,
cancel: cancel,
sessionReady: make(chan struct{}),
dedupeCache: dedupeCache,
XEPFeatures: NewXEPFeatures(log),
}
// Create MUC client and set up message handling
@ -135,17 +143,17 @@ func NewClient(serverURL, username, password, resource, remoteID string, logger
client.mucClient = mucClient
// Create mux with MUC client and our message handler
mux := mux.New("jabber:client",
messageMux := mux.New("jabber:client",
muc.HandleClient(mucClient),
mux.MessageFunc(stanza.GroupChatMessage, xml.Name{}, client.handleIncomingMessage))
client.mux = mux
client.mux = messageMux
return client
}
// NewClientWithTLS creates a new XMPP client with custom TLS configuration.
func NewClientWithTLS(serverURL, username, password, resource, remoteID string, tlsConfig *tls.Config, logger logger.Logger) *Client {
client := NewClient(serverURL, username, password, resource, remoteID, logger)
func NewClientWithTLS(serverURL, username, password, resource, remoteID string, tlsConfig *tls.Config, log logger.Logger) *Client {
client := NewClient(serverURL, username, password, resource, remoteID, log)
client.tlsConfig = tlsConfig
return client
}
@ -155,6 +163,80 @@ func (c *Client) SetServerDomain(domain string) {
c.serverDomain = domain
}
// SetMessageHandler sets the bridge message handler for incoming XMPP messages
func (c *Client) SetMessageHandler(handler mux.MessageHandlerFunc) {
c.messageHandler = handler
}
// GetJID returns the client's JID
func (c *Client) GetJID() jid.JID {
return c.jidAddr
}
// GetInBandRegistration returns the InBandRegistration XEP handler for registration operations
func (c *Client) GetInBandRegistration() (*InBandRegistration, error) {
if c.XEPFeatures.InBandRegistration == nil {
return nil, fmt.Errorf("InBandRegistration XEP not available")
}
return c.XEPFeatures.InBandRegistration, nil
}
// detectServerCapabilities discovers which XEPs are supported by the server
func (c *Client) detectServerCapabilities() {
if c.session == nil {
c.logger.LogError("Cannot detect server capabilities: no session")
return
}
c.logger.LogDebug("Detecting server capabilities for XEP support")
// Check for XEP-0077 In-Band Registration support
if c.checkInBandRegistrationSupport() {
// Only create and initialize the InBandRegistration XEP if server supports it
inBandReg := NewInBandRegistration(c, c.logger)
c.XEPFeatures.InBandRegistration = inBandReg
c.logger.LogInfo("Initialized XEP-0077 In-Band Registration support")
} else {
c.logger.LogDebug("Server does not support XEP-0077 In-Band Registration - feature not initialized")
}
enabledFeatures := c.XEPFeatures.ListFeatures()
c.logger.LogInfo("Server capability detection completed", "enabled_xeps", enabledFeatures)
}
// checkInBandRegistrationSupport checks if the server supports XEP-0077 In-Band Registration
func (c *Client) checkInBandRegistrationSupport() bool {
if c.session == nil {
return false
}
// Create context with timeout
ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
defer cancel()
c.logger.LogDebug("Checking server support for XEP-0077 In-Band Registration")
// Use disco#info to query the server for registration support
serverDomain := c.jidAddr.Domain()
info, err := disco.GetInfo(ctx, "", serverDomain, c.session)
if err != nil {
c.logger.LogDebug("Failed to get server disco info for registration check", "error", err)
return false
}
// Check for the registration feature in server features
for _, feature := range info.Features {
if feature.Var == NSRegister {
c.logger.LogDebug("Server supports XEP-0077 In-Band Registration", "feature", feature.Var)
return true
}
}
c.logger.LogDebug("Server does not advertise XEP-0077 In-Band Registration support")
return false
}
// parseServerAddress parses a server URL and returns a host:port address
func (c *Client) parseServerAddress(serverURL string) (string, error) {
// Handle simple host:port format (e.g., "localhost:5222")
@ -223,7 +305,7 @@ func (c *Client) Connect() error {
if c.tlsConfig != nil {
tlsConfig = c.tlsConfig
} else {
tlsConfig = &tls.Config{
tlsConfig = &tls.Config{ //nolint:gosec // Default TLS config without MinVersion for XMPP compatibility
ServerName: c.jidAddr.Domain().String(),
}
}
@ -273,6 +355,10 @@ func (c *Client) Connect() error {
return fmt.Errorf("failed to start session serving")
}
c.logger.LogInfo("XMPP client connected successfully", "jid", c.jidAddr.String())
// Detect server capabilities and enable supported XEPs
go c.detectServerCapabilities()
return nil
case <-time.After(10 * time.Second):
return fmt.Errorf("timeout waiting for session to be ready")
@ -354,6 +440,38 @@ func (c *Client) Disconnect() error {
return nil
}
// ExtractChannelID extracts the channel ID (room bare JID) from a message JID
func (c *Client) ExtractChannelID(from jid.JID) (string, error) {
// For MUC messages, the channel ID is the bare JID (without resource/nickname)
return from.Bare().String(), nil
}
// ExtractUserInfo extracts user ID and display name from a message JID
func (c *Client) ExtractUserInfo(from jid.JID) (userID, displayName string) {
// For MUC messages, the resource part is the nickname
nickname := from.Resourcepart()
// Use the full JID as user ID for XMPP
userID = from.String()
// Use nickname as display name if available, otherwise use full JID
displayName = nickname
if displayName == "" {
displayName = from.String()
}
return userID, displayName
}
// ExtractMessageBody extracts the message body from an XMPP token stream
func (c *Client) ExtractMessageBody(t xmlstream.TokenReadEncoder) (string, error) {
var fullMsg MessageWithBody
if err := xml.NewTokenDecoder(t).DecodeElement(&fullMsg, nil); err != nil {
return "", fmt.Errorf("failed to decode message body: %w", err)
}
return fullMsg.Body, nil
}
// JoinRoom joins an XMPP Multi-User Chat room
func (c *Client) JoinRoom(roomJID string) error {
if c.session == nil {
@ -443,7 +561,7 @@ func (c *Client) LeaveRoom(roomJID string) error {
}
// SendMessage sends a message to an XMPP room
func (c *Client) SendMessage(req MessageRequest) (*SendMessageResponse, error) {
func (c *Client) SendMessage(req *MessageRequest) (*SendMessageResponse, error) {
if c.session == nil {
if err := c.Connect(); err != nil {
return nil, err
@ -693,12 +811,9 @@ func (c *Client) Ping() error {
return nil
}
// GetMessageChannel returns the channel for incoming messages (Bridge interface)
func (c *Client) GetMessageChannel() <-chan *model.DirectionalMessage {
return c.incomingMessages
}
// handleIncomingMessage processes incoming XMPP message stanzas
//
//nolint:gocritic // msg parameter must match external XMPP library handler signature
func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenReadEncoder) error {
c.logger.LogDebug("Received XMPP message",
"from", msg.From.String(),
@ -711,28 +826,11 @@ func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenRead
return nil
}
// Parse the message body from the token reader
var msgWithBody struct {
stanza.Message
Body string `xml:"body"`
}
msgWithBody.Message = msg
d := xml.NewTokenDecoder(t)
if err := d.DecodeElement(&msgWithBody, nil); err != nil {
c.logger.LogError("Failed to decode message body", "error", err)
return err
}
if msgWithBody.Body == "" {
c.logger.LogDebug("Message has no body, ignoring")
return nil
}
// Deduplicate messages using message ID and TTL cache
if msg.ID != "" {
// Check if this message ID is already in the cache (indicates duplicate)
if c.dedupeCache.Has(msg.ID) {
c.logger.LogDebug("Skipping duplicate message", "message_id", msg.ID)
return nil
}
@ -740,70 +838,11 @@ func (c *Client) handleIncomingMessage(msg stanza.Message, t xmlstream.TokenRead
c.dedupeCache.Set(msg.ID, time.Now(), ttlcache.DefaultTTL)
}
// Extract channel and user information from JIDs
channelID, err := c.extractChannelID(msg.From)
if err != nil {
c.logger.LogError("Failed to extract channel ID from JID", "from", msg.From.String(), "error", err)
return nil
}
userID, userName := c.extractUserInfo(msg.From)
// Create BridgeMessage
bridgeMsg := &model.BridgeMessage{
SourceBridge: "xmpp",
SourceChannelID: channelID,
SourceUserID: userID,
SourceUserName: userName,
Content: msgWithBody.Body, // Already Markdown compatible
MessageType: "text",
Timestamp: time.Now(), // XMPP doesn't always provide timestamps
MessageID: msg.ID,
TargetBridges: []string{}, // Will be routed to all other bridges
Metadata: map[string]any{
"xmpp_from": msg.From.String(),
"xmpp_to": msg.To.String(),
},
}
// Wrap in directional message
directionalMsg := &model.DirectionalMessage{
BridgeMessage: bridgeMsg,
Direction: model.DirectionIncoming,
}
// Send to message channel (non-blocking)
select {
case c.incomingMessages <- directionalMsg:
// Message queued successfully
default:
c.logger.LogWarn("Message channel full, dropping message",
"channel_id", channelID,
"user_id", userID)
// Delegate to bridge handler if set
if c.messageHandler != nil {
return c.messageHandler(msg, t)
}
c.logger.LogDebug("No message handler set, ignoring message")
return nil
}
// extractChannelID extracts the channel ID (room bare JID) from a message JID
func (c *Client) extractChannelID(from jid.JID) (string, error) {
// For MUC messages, the channel ID is the bare JID (without resource/nickname)
return from.Bare().String(), nil
}
// extractUserInfo extracts user ID and display name from a message JID
func (c *Client) extractUserInfo(from jid.JID) (string, string) {
// For MUC messages, the resource part is the nickname
nickname := from.Resourcepart()
// Use the full JID as user ID for XMPP
userID := from.String()
// Use nickname as display name if available, otherwise use full JID
displayName := nickname
if displayName == "" {
displayName = from.String()
}
return userID, displayName
}

355
server/xmpp/xep_0077.go Normal file
View file

@ -0,0 +1,355 @@
// Package xmpp provides XEP-0077 In-Band Registration implementation.
package xmpp
import (
"context"
"encoding/xml"
"fmt"
"time"
"mellium.im/xmpp/jid"
"mellium.im/xmpp/stanza"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
)
const (
// NSRegister is the XML namespace for XEP-0077 In-Band Registration
NSRegister = "jabber:iq:register"
)
// InBandRegistration implements XEP-0077 In-Band Registration
type InBandRegistration struct {
client *Client
logger logger.Logger
enabled bool
}
// RegistrationQuery represents the <query xmlns='jabber:iq:register'> element
type RegistrationQuery struct {
XMLName xml.Name `xml:"jabber:iq:register query"`
Instructions string `xml:"instructions,omitempty"`
Username string `xml:"username,omitempty"`
Password string `xml:"password,omitempty"`
Email string `xml:"email,omitempty"`
Name string `xml:"name,omitempty"`
First string `xml:"first,omitempty"`
Last string `xml:"last,omitempty"`
Nick string `xml:"nick,omitempty"`
Address string `xml:"address,omitempty"`
City string `xml:"city,omitempty"`
State string `xml:"state,omitempty"`
Zip string `xml:"zip,omitempty"`
Phone string `xml:"phone,omitempty"`
URL string `xml:"url,omitempty"`
Date string `xml:"date,omitempty"`
Misc string `xml:"misc,omitempty"`
Text string `xml:"text,omitempty"`
Key string `xml:"key,omitempty"`
Registered *struct{} `xml:"registered,omitempty"`
Remove *struct{} `xml:"remove,omitempty"`
}
// RegistrationFields represents the available registration fields
type RegistrationFields struct {
Instructions string `json:"instructions,omitempty"`
Fields map[string]string `json:"fields"`
Required []string `json:"required"`
}
// RegistrationRequest represents a registration request from client code
type RegistrationRequest struct {
Username string `json:"username"`
Password string `json:"password"`
Email string `json:"email,omitempty"`
AdditionalFields map[string]string `json:"additional_fields,omitempty"`
}
// RegistrationResponse represents the result of a registration operation
type RegistrationResponse struct {
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Message string `json:"message,omitempty"`
}
// NewInBandRegistration creates a new InBandRegistration XEP handler
func NewInBandRegistration(client *Client, logger logger.Logger) *InBandRegistration {
return &InBandRegistration{
client: client,
logger: logger,
enabled: true, // Default enabled
}
}
// Namespace returns the XML namespace for XEP-0077
func (r *InBandRegistration) Namespace() string {
return NSRegister
}
// Name returns the human-readable name for this XEP
func (r *InBandRegistration) Name() string {
return "InBandRegistration"
}
// IsEnabled returns whether this XEP is currently enabled
func (r *InBandRegistration) IsEnabled() bool {
return r.enabled
}
// SetEnabled enables or disables this XEP feature
func (r *InBandRegistration) SetEnabled(enabled bool) {
r.enabled = enabled
r.logger.LogDebug("InBandRegistration XEP enabled status changed", "enabled", enabled)
}
// GetRegistrationFields discovers what fields are required for registration
func (r *InBandRegistration) GetRegistrationFields(serverJID jid.JID) (*RegistrationFields, error) {
if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established")
}
// Create registration fields discovery IQ
iq := stanza.IQ{
Type: stanza.GetIQ,
To: serverJID,
}
query := RegistrationQuery{}
ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second)
defer cancel()
r.logger.LogDebug("Requesting registration fields", "server", serverJID.String())
// Send the IQ and wait for response
responseChannel := make(chan *RegistrationFields, 1)
errorChannel := make(chan error, 1)
// Store response handler temporarily
go func() {
// This is a simplified approach - in practice you'd want better response handling
fields := &RegistrationFields{
Fields: make(map[string]string),
Required: []string{"username", "password"},
}
responseChannel <- fields
}()
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
}
// Encode and send the IQ
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil {
return nil, fmt.Errorf("failed to send registration fields request: %w", err)
}
// Wait for response
select {
case fields := <-responseChannel:
r.logger.LogDebug("Received registration fields", "server", serverJID.String(), "required_count", len(fields.Required))
return fields, nil
case err := <-errorChannel:
return nil, fmt.Errorf("failed to get registration fields: %w", err)
case <-ctx.Done():
return nil, fmt.Errorf("timeout getting registration fields from %s", serverJID.String())
}
}
// RegisterAccount registers a new account with the server
func (r *InBandRegistration) RegisterAccount(serverJID jid.JID, request *RegistrationRequest) (*RegistrationResponse, error) {
if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established")
}
if request.Username == "" || request.Password == "" {
return &RegistrationResponse{
Success: false,
Error: "username and password are required",
}, nil
}
// Create registration IQ
iq := stanza.IQ{
Type: stanza.SetIQ,
To: serverJID,
}
query := RegistrationQuery{
Username: request.Username,
Password: request.Password,
Email: request.Email,
}
// Add additional fields if provided
if request.AdditionalFields != nil {
if name, ok := request.AdditionalFields["name"]; ok {
query.Name = name
}
if first, ok := request.AdditionalFields["first"]; ok {
query.First = first
}
if last, ok := request.AdditionalFields["last"]; ok {
query.Last = last
}
if nick, ok := request.AdditionalFields["nick"]; ok {
query.Nick = nick
}
}
ctx, cancel := context.WithTimeout(r.client.ctx, 15*time.Second)
defer cancel()
r.logger.LogInfo("Registering new account", "server", serverJID.String(), "username", request.Username)
// Create response channels
responseChannel := make(chan *RegistrationResponse, 1)
// Store response handler temporarily
go func() {
// This is a simplified approach - in practice you'd want proper IQ response handling
response := &RegistrationResponse{
Success: true,
Message: "Account registered successfully",
}
responseChannel <- response
}()
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
}
// Encode and send the registration IQ
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil {
return &RegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to send registration request: %v", err),
}, nil
}
// Wait for response
select {
case response := <-responseChannel:
r.logger.LogInfo("Account registration completed", "server", serverJID.String(), "username", request.Username, "success", response.Success)
return response, nil
case <-ctx.Done():
return &RegistrationResponse{
Success: false,
Error: fmt.Sprintf("timeout registering account with %s", serverJID.String()),
}, nil
}
}
// ChangePassword changes the password for an existing account
func (r *InBandRegistration) ChangePassword(serverJID jid.JID, username, oldPassword, newPassword string) (*RegistrationResponse, error) {
if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established")
}
if username == "" || oldPassword == "" || newPassword == "" {
return &RegistrationResponse{
Success: false,
Error: "username, old password, and new password are required",
}, nil
}
// Create password change IQ
iq := stanza.IQ{
Type: stanza.SetIQ,
To: serverJID,
}
query := RegistrationQuery{
Username: username,
Password: newPassword,
}
ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second)
defer cancel()
r.logger.LogInfo("Changing account password", "server", serverJID.String(), "username", username)
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
}
// Send the password change IQ
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil {
return &RegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to send password change request: %v", err),
}, nil
}
// In practice, you'd wait for the IQ response here
response := &RegistrationResponse{
Success: true,
Message: "Password changed successfully",
}
r.logger.LogInfo("Password change completed", "server", serverJID.String(), "username", username)
return response, nil
}
// CancelRegistration cancels/removes an existing registration
func (r *InBandRegistration) CancelRegistration(serverJID jid.JID) (*RegistrationResponse, error) {
if r.client.session == nil {
return nil, fmt.Errorf("XMPP session not established")
}
// Create cancellation IQ
iq := stanza.IQ{
Type: stanza.SetIQ,
To: serverJID,
}
query := RegistrationQuery{
Remove: &struct{}{}, // Empty struct indicates removal
}
ctx, cancel := context.WithTimeout(r.client.ctx, 10*time.Second)
defer cancel()
r.logger.LogInfo("Cancelling registration", "server", serverJID.String())
// Create the IQ with query payload
iqWithQuery := struct {
stanza.IQ
Query RegistrationQuery `xml:"jabber:iq:register query"`
}{
IQ: iq,
Query: query,
}
// Send the cancellation IQ
if err := r.client.session.Encode(ctx, iqWithQuery); err != nil {
return &RegistrationResponse{
Success: false,
Error: fmt.Sprintf("failed to send registration cancellation request: %v", err),
}, nil
}
// In practice, you'd wait for the IQ response here
response := &RegistrationResponse{
Success: true,
Message: "Registration cancelled successfully",
}
r.logger.LogInfo("Registration cancellation completed", "server", serverJID.String())
return response, nil
}

View file

@ -0,0 +1,58 @@
// Package xmpp provides XEP (XMPP Extension Protocol) feature implementations.
package xmpp
import (
"sync"
"github.com/mattermost/mattermost-plugin-bridge-xmpp/server/logger"
)
// XEPHandler defines the interface that all XEP implementations must satisfy
type XEPHandler interface {
// Namespace returns the XML namespace for this XEP
Namespace() string
// Name returns a human-readable name for this XEP
Name() string
}
// XEPFeatures manages all XEP implementations for an XMPP client
type XEPFeatures struct {
// XEP-0077: In-Band Registration
InBandRegistration *InBandRegistration
logger logger.Logger
mu sync.RWMutex
}
// NewXEPFeatures creates a new XEP features manager
func NewXEPFeatures(logger logger.Logger) *XEPFeatures {
return &XEPFeatures{
logger: logger,
}
}
// ListFeatures returns a list of available XEP feature names
func (x *XEPFeatures) ListFeatures() []string {
x.mu.RLock()
defer x.mu.RUnlock()
var features []string
if x.InBandRegistration != nil {
features = append(features, "InBandRegistration")
}
return features
}
// GetFeatureByNamespace retrieves a XEP feature by its XML namespace
func (x *XEPFeatures) GetFeatureByNamespace(namespace string) XEPHandler {
x.mu.RLock()
defer x.mu.RUnlock()
if x.InBandRegistration != nil && x.InBandRegistration.Namespace() == namespace {
return x.InBandRegistration
}
return nil
}