discord-jukebox-bot/pkg/discord/jukebox.go

835 lines
23 KiB
Go

package discord
import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"
"log/slog"
"os/exec"
"strings"
"sync"
"time"
"discord-jukebox-bot/pkg/subsonic"
"github.com/bwmarrin/discordgo"
"layeh.com/gopus"
)
// JukeboxPlayer handles the music playback functionality
type JukeboxPlayer struct {
bot *Bot
currentSong *subsonic.Song
currentSongStarted time.Time
playlist []subsonic.Song
playlistMutex sync.Mutex
playingMutex sync.Mutex
currentStreamCancel context.CancelFunc
songHistory []*subsonic.Song
historyMutex sync.RWMutex
maxHistorySize int
initialized bool
}
// NewJukeboxPlayer creates a new jukebox player
func NewJukeboxPlayer(bot *Bot) *JukeboxPlayer {
jukebox := &JukeboxPlayer{
bot: bot,
playlist: make([]subsonic.Song, 0),
songHistory: make([]*subsonic.Song, 0),
maxHistorySize: 20, // Store the last 20 songs
initialized: true,
}
// Register command handlers
bot.RegisterCommand("play", jukebox.handlePlay)
bot.RegisterCommand("stop", jukebox.handleStop)
bot.RegisterCommand("skip", jukebox.handleSkip)
bot.RegisterCommand("info", jukebox.handleInfo)
return jukebox
}
// handlePlay handles the play command
func (j *JukeboxPlayer) handlePlay(s *discordgo.Session, i *discordgo.InteractionCreate) {
// Acknowledge the interaction immediately
err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
Type: discordgo.InteractionResponseDeferredChannelMessageWithSource,
})
if err != nil {
slog.Error("Error responding to interaction", "error", err)
return
}
// Find the voice channel the user is in
var channelID string
if i.GuildID != "" {
// Find the voice state of the user
vs, err := s.State.VoiceState(i.GuildID, i.Member.User.ID)
if err == nil && vs != nil && vs.ChannelID != "" {
channelID = vs.ChannelID
}
}
// If we couldn't find the user's voice channel
if channelID == "" {
content := "You need to be in a voice channel to use this command."
_, err = s.InteractionResponseEdit(i.Interaction, &discordgo.WebhookEdit{
Content: &content,
})
if err != nil {
slog.Error("Error editing interaction response", "error", err)
}
return
}
// Join the voice channel
err = j.bot.JoinVoiceChannel(i.GuildID, channelID)
if err != nil {
content := "Failed to join voice channel: " + err.Error()
_, err = s.InteractionResponseEdit(i.Interaction, &discordgo.WebhookEdit{
Content: &content,
})
if err != nil {
slog.Error("Error editing interaction response", "error", err)
}
return
}
// Start playing music if not already playing
if !j.bot.IsPlaying() {
go j.startPlaying()
content := "🎵 Jukebox started! Random songs will be played from your Subsonic library."
_, err = s.InteractionResponseEdit(i.Interaction, &discordgo.WebhookEdit{
Content: &content,
})
} else {
content := "🎵 Jukebox is already playing!"
_, err = s.InteractionResponseEdit(i.Interaction, &discordgo.WebhookEdit{
Content: &content,
})
}
if err != nil {
slog.Error("Error editing interaction response", "error", err)
}
}
// handleStop handles the stop command
func (j *JukeboxPlayer) handleStop(s *discordgo.Session, i *discordgo.InteractionCreate) {
// Acknowledge the interaction immediately
err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
Type: discordgo.InteractionResponseChannelMessageWithSource,
Data: &discordgo.InteractionResponseData{
Content: "Stopping jukebox...",
},
})
if err != nil {
slog.Error("Error responding to interaction", "error", err)
return
}
// First stop the music
j.bot.stopPlaying()
// Then leave the voice channel
j.bot.leaveVoiceChannel()
// Clear the Discord status when stopping
j.bot.session.UpdateGameStatus(0, "")
// Update the response
content := "🛑 Jukebox stopped."
_, err = s.InteractionResponseEdit(i.Interaction, &discordgo.WebhookEdit{
Content: &content,
})
if err != nil {
slog.Error("Error editing interaction response", "error", err)
}
}
// handleSkip handles the skip command
func (j *JukeboxPlayer) handleSkip(s *discordgo.Session, i *discordgo.InteractionCreate) {
// Acknowledge the interaction immediately
err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
Type: discordgo.InteractionResponseChannelMessageWithSource,
Data: &discordgo.InteractionResponseData{
Content: "Skipping to next song...",
},
})
if err != nil {
slog.Error("Error responding to interaction", "error", err)
return
}
if !j.bot.IsPlaying() {
content := "Jukebox is not currently playing."
_, err = s.InteractionResponseEdit(i.Interaction, &discordgo.WebhookEdit{
Content: &content,
})
if err != nil {
slog.Error("Error editing interaction response", "error", err)
}
return
}
// Get the current song info for the response message
currentSong := j.GetCurrentSong()
songInfo := ""
if currentSong != nil {
songInfo = fmt.Sprintf(" (\"%s\" by %s)", currentSong.Title, currentSong.Artist)
}
// Skip the current song without stopping the playback
j.skipCurrentSong()
// Update the response
content := fmt.Sprintf("⏭️ Skipped current song%s. Loading next track...", songInfo)
_, err = s.InteractionResponseEdit(i.Interaction, &discordgo.WebhookEdit{
Content: &content,
})
if err != nil {
slog.Error("Error editing interaction response", "error", err)
}
}
// startPlaying starts the jukebox playback
func (j *JukeboxPlayer) startPlaying() {
j.bot.mu.Lock()
if j.bot.playing {
slog.Debug("Jukebox is already playing, ignoring play request")
j.bot.mu.Unlock()
return
}
j.bot.playing = true
stopChan := j.bot.stopChan
j.bot.mu.Unlock()
// Make sure the voice connection is speaking
if j.bot.voiceConn != nil && j.bot.voiceConn.Ready {
err := j.bot.voiceConn.Speaking(true)
if err != nil {
slog.Warn("Failed to set speaking state", "error", err)
// Try to reconnect voice
if j.bot.voiceConn != nil && j.bot.voiceConn.GuildID != "" && j.bot.voiceConn.ChannelID != "" {
err = j.bot.JoinVoiceChannel(j.bot.voiceConn.GuildID, j.bot.voiceConn.ChannelID)
if err != nil {
slog.Error("Failed to restore voice connection", "error", err)
// Continue anyway, we'll try again when playing song
}
}
}
}
slog.Info("Starting jukebox playback")
for {
// For each iteration, get the latest stop channel
j.bot.mu.Lock()
localStopChan := j.bot.stopChan
isPlaying := j.bot.playing
j.bot.mu.Unlock()
// If we're completely stopped, exit the playback loop
if !isPlaying {
slog.Info("Jukebox playback stopped")
// Clear current song when stopping
j.playingMutex.Lock()
j.currentSong = nil
j.currentSongStarted = time.Time{} // Reset time to zero value
j.playingMutex.Unlock()
return
}
stopChan = localStopChan
// Check if we should stop
select {
case <-stopChan:
// Check if we're still supposed to be playing (complete stop vs. skip)
j.bot.mu.Lock()
stillPlaying := j.bot.playing
j.bot.mu.Unlock()
if !stillPlaying {
return // Completely stop playback
}
slog.Info("Skipping to next song")
// Otherwise, continue to the next song (was a skip)
default:
// Continue playing
}
// Check if we need to load more songs
j.ensurePlaylist()
// Get the next song to play
song := j.getNextSong()
if song == nil {
// No songs available
time.Sleep(1 * time.Second)
continue
}
j.playingMutex.Lock()
j.currentSong = song
j.currentSongStarted = time.Now()
j.playingMutex.Unlock()
// Add song to history when it starts playing
j.addToHistory(song)
// Update Discord status with the current song information in format "Artist - Title (Album)"
var statusText string
if song.Album != "" {
statusText = fmt.Sprintf("%s - %s (%s)", song.Artist, song.Title, song.Album)
} else {
statusText = fmt.Sprintf("%s - %s", song.Artist, song.Title)
}
// Truncate if too long for Discord status (128 char limit)
if len(statusText) > 128 {
statusText = statusText[:125] + "..."
}
statusErr := j.bot.session.UpdateGameStatus(0, statusText)
if statusErr != nil {
slog.Warn("Failed to update Discord status", "error", statusErr)
} else {
slog.Debug("Updated Discord status", "status", statusText)
}
// Announce the song in the voice channel
if j.bot.voiceConn != nil && j.bot.voiceConn.Ready {
slog.Info("Now playing",
"artist", song.Artist,
"title", song.Title,
"album", song.Album,
"id", song.ID,
"duration", song.Duration,
"path", song.Path)
}
// Play the song
err := j.playSong(song)
if err != nil {
slog.Error("Error playing song", "error", err)
time.Sleep(1 * time.Second)
}
}
}
// ensurePlaylist ensures that the playlist has songs
func (j *JukeboxPlayer) ensurePlaylist() {
j.playlistMutex.Lock()
defer j.playlistMutex.Unlock()
// If we have songs in the playlist, we're good
if len(j.playlist) > 0 {
return
}
// Fetch random songs from Subsonic
songs, err := j.bot.subsonic.GetRandomSongs(10)
if err != nil {
slog.Error("Error getting random songs", "error", err)
return
}
j.playlist = songs
}
// getNextSong gets the next song from the playlist
func (j *JukeboxPlayer) getNextSong() *subsonic.Song {
j.playlistMutex.Lock()
defer j.playlistMutex.Unlock()
if len(j.playlist) == 0 {
return nil
}
// Get the first song
song := j.playlist[0]
// Remove it from the playlist
j.playlist = j.playlist[1:]
return &song
}
// playSong plays a song over the voice connection
func (j *JukeboxPlayer) playSong(song *subsonic.Song) error {
// Check if voice connection is ready, and attempt to reconnect if needed
if j.bot.voiceConn == nil || !j.bot.voiceConn.Ready {
slog.Warn("Voice connection not ready, attempting to restore it")
// If we have guild ID and channel ID available, try to reconnect
if j.bot.voiceConn != nil && j.bot.voiceConn.GuildID != "" && j.bot.voiceConn.ChannelID != "" {
err := j.bot.JoinVoiceChannel(j.bot.voiceConn.GuildID, j.bot.voiceConn.ChannelID)
if err != nil {
return fmt.Errorf("failed to restore voice connection: %w", err)
}
slog.Info("Successfully restored voice connection")
} else {
return fmt.Errorf("voice connection not ready and cannot be restored")
}
}
// Get the stream URL (raw format for better compatibility)
streamURL := j.bot.subsonic.GetRawStreamURL(song.ID)
slog.Debug("Attempting to play song with direct FFmpeg method", "url", streamURL)
// Check if ffmpeg is available
ffmpegPath, err := exec.LookPath("ffmpeg")
if err != nil || ffmpegPath == "" {
return fmt.Errorf("ffmpeg not found, required for audio streaming: %w", err)
}
// Create a context that can be cancelled when skipping songs
streamCtx, cancelStream := context.WithCancel(context.Background())
// We'll cancel this context when the song ends or is skipped
j.bot.mu.Lock()
j.currentStreamCancel = cancelStream
j.bot.mu.Unlock()
// Make sure we clean up our context if we exit
defer func() {
j.bot.mu.Lock()
// No need to check equality, just clean up if we have a cancellation function
if j.currentStreamCancel != nil {
j.currentStreamCancel = nil
}
j.bot.mu.Unlock()
cancelStream()
}()
// Create an opusEncoder and begin speaking
slog.Debug("Setting Discord voice status to Speaking")
err = j.bot.voiceConn.Speaking(true)
if err != nil {
slog.Warn("Failed to set speaking state", "error", err)
// Try to recover the voice connection
if j.bot.voiceConn != nil && j.bot.voiceConn.GuildID != "" && j.bot.voiceConn.ChannelID != "" {
err = j.bot.JoinVoiceChannel(j.bot.voiceConn.GuildID, j.bot.voiceConn.ChannelID)
if err != nil {
slog.Error("Failed to reconnect to voice channel", "error", err)
} else {
err = j.bot.voiceConn.Speaking(true)
if err != nil {
slog.Error("Still failed to set speaking state after reconnection", "error", err)
}
}
}
}
defer j.bot.voiceConn.Speaking(false)
// Create FFmpeg command with optimized parameters for Discord streaming
cmd := exec.CommandContext(streamCtx, ffmpegPath,
"-hide_banner", // Reduce console output
"-loglevel", "warning", // Only show warnings and errors
"-reconnect", "1", // Allow reconnection
"-reconnect_streamed", "1", // Reconnect to streamed resources
"-reconnect_delay_max", "5", // Maximum delay between reconnection attempts (seconds)
"-i", streamURL, // Input from Subsonic stream URL
"-acodec", "pcm_s16le",
"-f", "s16le",
"-ar", "48000",
"-ac", "2",
"-")
// Get the stdout and stderr pipes
stdout, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("error creating stdout pipe: %w", err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("error creating stderr pipe: %w", err)
}
// Start the FFmpeg process
if err := cmd.Start(); err != nil {
return fmt.Errorf("error starting FFmpeg: %w", err)
}
// Ensure we clean up the FFmpeg process
defer func() {
if cmd.Process != nil {
cmd.Process.Kill()
cmd.Wait()
}
}()
// Monitor stderr for debugging
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
// Check if we've been cancelled
select {
case <-streamCtx.Done():
return
default:
line := scanner.Text()
if len(line) > 0 {
slog.Debug("FFmpeg output", "message", line)
}
}
}
}()
// Use this to signal that streaming has started
frameSent := make(chan struct{}, 1)
// For signaling errors or completion
done := make(chan error, 1)
// Create Opus encoder
opusEncoder, err := gopus.NewEncoder(48000, 2, gopus.Audio)
if err != nil {
return fmt.Errorf("failed to create opus encoder: %w", err)
}
// Set the bitrate
opusEncoder.SetBitrate(64000) // 64 kbps is sufficient for music over Discord
// Buffer for reading PCM data
pcmBuffer := make([]int16, 960*2) // 960 PCM samples * 2 channels
opusBuffer := make([]byte, 1000) // Buffer for opus data
go func() {
defer close(done)
slog.Debug("Starting direct PCM to Opus conversion")
framesSent := 0
firstFrameSent := false
// Read from stdout, encode to opus, send to Discord
for {
// Check if we should stop
select {
case <-j.bot.stopChan:
slog.Debug("Audio streaming interrupted by stop channel")
return
case <-streamCtx.Done():
slog.Debug("Audio streaming interrupted by context cancellation")
return
default:
// Continue streaming
}
// Read raw PCM data with a timeout
readDone := make(chan error, 1)
go func() {
err := binary.Read(stdout, binary.LittleEndian, pcmBuffer)
readDone <- err
}()
// Wait for read or cancellation
select {
case err := <-readDone:
if err == io.EOF {
if framesSent == 0 {
done <- fmt.Errorf("stream ended without producing any audio frames")
} else {
slog.Debug("End of audio stream reached", "total_frames", framesSent)
}
return
}
if err != nil {
// Check if context was cancelled or if this is a closed file error (which is normal during skips)
select {
case <-streamCtx.Done():
slog.Debug("PCM read interrupted by context cancellation")
return
case <-j.bot.stopChan:
slog.Debug("PCM read interrupted by stop channel")
return
default:
// Only log warnings for errors that aren't related to normal stream endings
errMsg := err.Error()
if !strings.Contains(errMsg, "file already closed") &&
!strings.Contains(errMsg, "unexpected EOF") {
slog.Warn("Error reading PCM data", "error", err)
}
time.Sleep(20 * time.Millisecond)
continue
}
}
case <-streamCtx.Done():
slog.Debug("PCM read interrupted by context cancellation while reading")
return
case <-j.bot.stopChan:
slog.Debug("PCM read interrupted by stop channel while reading")
return
}
// Encode the PCM data to Opus
opus, err := opusEncoder.Encode(pcmBuffer, 960, len(opusBuffer))
if err != nil {
slog.Warn("Failed to encode PCM to Opus", "error", err)
time.Sleep(20 * time.Millisecond)
continue
}
// Send the Opus data to Discord
select {
case j.bot.voiceConn.OpusSend <- opus:
framesSent++
// Signal that the first frame was sent
if !firstFrameSent {
firstFrameSent = true
select {
case frameSent <- struct{}{}:
default:
// Channel buffer full, no need to block
}
}
// Log progress
if framesSent%250 == 0 {
slog.Debug("Audio streaming progress", "frames_sent", framesSent)
}
case <-j.bot.stopChan:
return
case <-streamCtx.Done():
return
case <-time.After(50 * time.Millisecond):
// Timeout, try again with the next frame
continue
}
// Control the frame timing (20ms per frame)
time.Sleep(15 * time.Millisecond)
}
}()
// Wait for the first frame to be sent or a timeout
select {
case <-frameSent:
slog.Info("Audio streaming started successfully")
case err := <-done:
if err != nil {
return fmt.Errorf("failed to start audio streaming: %w", err)
}
case <-time.After(10 * time.Second):
return fmt.Errorf("timeout waiting for first audio frame to be sent")
}
// Wait for the song to finish or be interrupted
slog.Debug("Waiting for song to complete or be interrupted")
// Create a timeout for the entire song (based on song duration plus a margin)
songDurationSecs := song.Duration
if songDurationSecs <= 0 {
// If we don't have a proper duration, use a default max duration
songDurationSecs = 600 // 10 minutes maximum
}
// Add a 30-second margin for buffering and network delays
songTimeout := time.Duration(songDurationSecs+30) * time.Second
select {
case err := <-done:
if err != nil {
slog.Error("Song ended with error", "error", err)
} else {
slog.Debug("Song completed successfully")
}
return err
case <-j.bot.stopChan:
slog.Debug("Song playback interrupted")
return nil
case <-time.After(songTimeout):
slog.Warn("Song playback exceeded maximum duration",
"expected_duration_seconds", songDurationSecs,
"timeout_seconds", songTimeout/time.Second)
return fmt.Errorf("song playback timed out after %d seconds", songTimeout/time.Second)
}
}
// skipCurrentSong skips the current song without stopping the playback
func (j *JukeboxPlayer) skipCurrentSong() {
// Signal to stop the current song only but keep overall playback going
j.bot.mu.Lock()
if j.bot.playing {
// Cancel the current stream context if it exists
if j.currentStreamCancel != nil {
slog.Debug("Cancelling current stream context")
j.currentStreamCancel()
j.currentStreamCancel = nil
}
// Also close the stop channel for compatibility with other parts of the code
close(j.bot.stopChan)
j.bot.stopChan = make(chan struct{})
slog.Debug("Skip signal sent - current song will stop")
// Show "Skipping..." status while transitioning between songs
j.bot.session.UpdateGameStatus(0, "Skipping to next track...")
} else {
slog.Debug("Skip requested but jukebox is not playing")
}
j.bot.mu.Unlock()
}
// handleInfo handles the info command
func (j *JukeboxPlayer) handleInfo(s *discordgo.Session, i *discordgo.InteractionCreate) {
// Acknowledge the interaction immediately
err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
Type: discordgo.InteractionResponseDeferredChannelMessageWithSource,
})
if err != nil {
slog.Error("Failed to acknowledge interaction", "error", err)
return
}
// Get the current song information
j.playingMutex.Lock()
currentSong := j.currentSong
j.playingMutex.Unlock()
// Check if the bot is playing and has a voice connection
isPlaying := j.bot.IsPlaying() && j.bot.voiceConn != nil && j.bot.voiceConn.Ready
if currentSong == nil || !isPlaying {
// No song is playing, inform the user
content := "No song is currently playing. Start the jukebox with `/jukebox play`!"
_, err = s.InteractionResponseEdit(i.Interaction, &discordgo.WebhookEdit{
Content: &content,
})
if err != nil {
slog.Error("Failed to send 'no song playing' message", "error", err)
}
return
}
// Create an embed with song information
embed := &discordgo.MessageEmbed{
Title: "Currently Playing",
Description: fmt.Sprintf("**%s**", currentSong.Title),
Color: 0x1DB954, // Spotify green color
Fields: []*discordgo.MessageEmbedField{
{
Name: "Artist",
Value: currentSong.Artist,
Inline: true,
},
{
Name: "Album",
Value: currentSong.Album,
Inline: true,
},
},
Timestamp: time.Now().Format(time.RFC3339),
Footer: &discordgo.MessageEmbedFooter{
Text: "Discord Jukebox Bot",
},
}
// Add duration field if available
if currentSong.Duration > 0 {
minutes := currentSong.Duration / 60
seconds := currentSong.Duration % 60
embed.Fields = append(embed.Fields, &discordgo.MessageEmbedField{
Name: "Duration",
Value: fmt.Sprintf("%d:%02d", minutes, seconds),
Inline: true,
})
}
// Add genre field if available
if currentSong.Genre != "" {
embed.Fields = append(embed.Fields, &discordgo.MessageEmbedField{
Name: "Genre",
Value: currentSong.Genre,
Inline: true,
})
}
// Add year field if available
if currentSong.Year > 0 {
embed.Fields = append(embed.Fields, &discordgo.MessageEmbedField{
Name: "Year",
Value: fmt.Sprintf("%d", currentSong.Year),
Inline: true,
})
}
// Add cover art if available
if currentSong.CoverArt != "" {
coverArtURL := j.bot.subsonic.GetCoverArtURL(currentSong.CoverArt)
embed.Thumbnail = &discordgo.MessageEmbedThumbnail{
URL: strings.Replace(coverArtURL, ".fmartingr.dev", "", -1),
}
}
// Send the embed response
embeds := []*discordgo.MessageEmbed{embed}
_, err = s.InteractionResponseEdit(i.Interaction, &discordgo.WebhookEdit{
Embeds: &embeds,
})
if err != nil {
slog.Error("Failed to send song info message", "error", err)
}
}
// GetCurrentSong returns the current song
func (j *JukeboxPlayer) GetCurrentSong() *subsonic.Song {
j.playingMutex.Lock()
defer j.playingMutex.Unlock()
return j.currentSong
}
// GetCurrentSongInfo returns the current song and its start time
func (j *JukeboxPlayer) GetCurrentSongInfo() (*subsonic.Song, time.Time) {
j.playingMutex.Lock()
defer j.playingMutex.Unlock()
return j.currentSong, j.currentSongStarted
}
// addToHistory adds a song to the playback history
func (j *JukeboxPlayer) addToHistory(song *subsonic.Song) {
if song == nil {
return
}
j.historyMutex.Lock()
defer j.historyMutex.Unlock()
// Avoid duplicate entries at the beginning of history
if len(j.songHistory) > 0 && j.songHistory[0] != nil &&
j.songHistory[0].ID == song.ID {
return
}
// Add the song to the beginning of the history list
j.songHistory = append([]*subsonic.Song{song}, j.songHistory...)
// Trim the history if it exceeds the maximum size
if len(j.songHistory) > j.maxHistorySize {
j.songHistory = j.songHistory[:j.maxHistorySize]
}
}
// GetSongHistory returns a copy of the playback history
func (j *JukeboxPlayer) GetSongHistory() []*subsonic.Song {
j.historyMutex.RLock()
defer j.historyMutex.RUnlock()
// Ensure we don't return a nil slice even if no history yet
if j.songHistory == nil {
return make([]*subsonic.Song, 0)
}
// Create a copy of the history to return
history := make([]*subsonic.Song, len(j.songHistory))
copy(history, j.songHistory)
return history
}