package discord

import (
	"bufio"
	"context"
	"encoding/binary"
	"fmt"
	"io"
	"log/slog"
	"os/exec"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"discord-jukebox-bot/pkg/subsonic"
	"github.com/bwmarrin/discordgo"
	"layeh.com/gopus"
)

// JukeboxPlayer handles the music playback functionality.
//
// It keeps a queue of upcoming songs, the song that is currently playing and a
// bounded history of songs that were started.
type JukeboxPlayer struct {
	bot *Bot

	// currentSong is the song being played right now (nil when idle).
	// Guarded by playingMutex.
	currentSong *subsonic.Song

	// playlist is the queue of upcoming songs. Guarded by playlistMutex.
	playlist      []subsonic.Song
	playlistMutex sync.Mutex
	playingMutex  sync.Mutex

	// currentStreamCancel cancels the context of the stream that is currently
	// running. It is read and written while holding bot.mu.
	currentStreamCancel context.CancelFunc

	// songHistory lists started songs, most recent first, capped at
	// maxHistorySize entries. Guarded by historyMutex.
	songHistory    []*subsonic.Song
	historyMutex   sync.RWMutex
	maxHistorySize int
}

// NewJukeboxPlayer creates a new jukebox player bound to bot and registers the
// "play", "stop", "skip" and "info" command handlers on it.
func NewJukeboxPlayer(bot *Bot) *JukeboxPlayer {
	jukebox := &JukeboxPlayer{
		bot:            bot,
		playlist:       make([]subsonic.Song, 0),
		songHistory:    make([]*subsonic.Song, 0),
		maxHistorySize: 20, // Store the last 20 songs
	}

	// Register command handlers
	bot.RegisterCommand("play", jukebox.handlePlay)
	bot.RegisterCommand("stop", jukebox.handleStop)
	bot.RegisterCommand("skip", jukebox.handleSkip)
	bot.RegisterCommand("info", jukebox.handleInfo)

	return jukebox
}

// editResponseContent replaces the text of the original interaction response
// with content and returns the error of the underlying API call, if any.
func editResponseContent(s *discordgo.Session, i *discordgo.InteractionCreate, content string) error {
	_, err := s.InteractionResponseEdit(i.Interaction, &discordgo.WebhookEdit{
		Content: &content,
	})
	return err
}

// truncateStatusText shortens text so that it is at most 128 bytes long,
// replacing the tail with "..." when it has to cut. The cut position is moved
// back to the start of a UTF-8 sequence so a multi-byte character is never
// split in half.
func truncateStatusText(text string) string {
	const (
		maxLen   = 128
		ellipsis = "..."
	)
	if len(text) <= maxLen {
		return text
	}

	cut := maxLen - len(ellipsis)
	for cut > 0 && !utf8.RuneStart(text[cut]) {
		cut--
	}
	return text[:cut] + ellipsis
}

// embedFieldValue returns value, or "Unknown" when value is empty, so that an
// embed field is never sent with an empty value.
func embedFieldValue(value string) string {
	if value == "" {
		return "Unknown"
	}
	return value
}

// handlePlay handles the play command: it joins the voice channel the invoking
// user is in and starts the playback loop unless it is already running.
func (j *JukeboxPlayer) handlePlay(s *discordgo.Session, i *discordgo.InteractionCreate) {
	// Acknowledge the interaction immediately
	err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
		Type: discordgo.InteractionResponseDeferredChannelMessageWithSource,
	})
	if err != nil {
		slog.Error("Error responding to interaction", "error", err)
		return
	}

	// Find the voice channel the user is in. Member/User are checked so that
	// an interaction without member data cannot cause a nil dereference.
	var channelID string
	if i.GuildID != "" && i.Member != nil && i.Member.User != nil {
		vs, err := s.State.VoiceState(i.GuildID, i.Member.User.ID)
		if err == nil && vs != nil && vs.ChannelID != "" {
			channelID = vs.ChannelID
		}
	}

	// If we couldn't find the user's voice channel
	if channelID == "" {
		if err := editResponseContent(s, i, "You need to be in a voice channel to use this command."); err != nil {
			slog.Error("Error editing interaction response", "error", err)
		}
		return
	}

	// Join the voice channel
	if err := j.bot.JoinVoiceChannel(i.GuildID, channelID); err != nil {
		if err := editResponseContent(s, i, "Failed to join voice channel: "+err.Error()); err != nil {
			slog.Error("Error editing interaction response", "error", err)
		}
		return
	}

	// Start playing music if not already playing
	content := "🎵 Jukebox is already playing!"
	if !j.bot.IsPlaying() {
		go j.startPlaying()
		content = "🎵 Jukebox started! Random songs will be played from your Subsonic library."
	}

	if err := editResponseContent(s, i, content); err != nil {
		slog.Error("Error editing interaction response", "error", err)
	}
}

// handleStop handles the stop command: it stops playback, leaves the voice
// channel and clears the bot's Discord status.
func (j *JukeboxPlayer) handleStop(s *discordgo.Session, i *discordgo.InteractionCreate) {
	// Acknowledge the interaction immediately
	err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
		Type: discordgo.InteractionResponseChannelMessageWithSource,
		Data: &discordgo.InteractionResponseData{
			Content: "Stopping jukebox...",
		},
	})
	if err != nil {
		slog.Error("Error responding to interaction", "error", err)
		return
	}

	// First stop the music
	j.bot.stopPlaying()

	// Then leave the voice channel
	j.bot.leaveVoiceChannel()

	// Clear the Discord status when stopping. A failure here is not fatal for
	// the stop itself, so it is only logged.
	if err := j.bot.session.UpdateGameStatus(0, ""); err != nil {
		slog.Warn("Failed to update Discord status", "error", err)
	}

	// Update the response
	if err := editResponseContent(s, i, "🛑 Jukebox stopped."); err != nil {
		slog.Error("Error editing interaction response", "error", err)
	}
}

// handleSkip handles the skip command: it interrupts the current song while
// leaving the playback loop running so that the next song starts.
func (j *JukeboxPlayer) handleSkip(s *discordgo.Session, i *discordgo.InteractionCreate) {
	// Acknowledge the interaction immediately
	err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
		Type: discordgo.InteractionResponseChannelMessageWithSource,
		Data: &discordgo.InteractionResponseData{
			Content: "Skipping to next song...",
		},
	})
	if err != nil {
		slog.Error("Error responding to interaction", "error", err)
		return
	}

	if !j.bot.IsPlaying() {
		if err := editResponseContent(s, i, "Jukebox is not currently playing."); err != nil {
			slog.Error("Error editing interaction response", "error", err)
		}
		return
	}

	// Get the current song info for the response message
	songInfo := ""
	if currentSong := j.GetCurrentSong(); currentSong != nil {
		songInfo = fmt.Sprintf(" (\"%s\" by %s)", currentSong.Title, currentSong.Artist)
	}

	// Skip the current song without stopping the playback
	j.skipCurrentSong()

	// Update the response
	content := fmt.Sprintf("⏭️ Skipped current song%s. Loading next track...", songInfo)
	if err := editResponseContent(s, i, content); err != nil {
		slog.Error("Error editing interaction response", "error", err)
	}
}

// clearCurrentSong forgets the song that was playing.
func (j *JukeboxPlayer) clearCurrentSong() {
	j.playingMutex.Lock()
	j.currentSong = nil
	j.playingMutex.Unlock()
}

// startPlaying starts the jukebox playback loop.
//
// It returns immediately when playback is already marked as running.
// Otherwise it marks the bot as playing and keeps fetching and playing songs
// until the bot's playing flag is cleared. It blocks for the whole playback
// session, so callers run it in its own goroutine.
func (j *JukeboxPlayer) startPlaying() {
	j.bot.mu.Lock()
	if j.bot.playing {
		slog.Debug("Jukebox is already playing, ignoring play request")
		j.bot.mu.Unlock()
		return
	}
	j.bot.playing = true
	j.bot.mu.Unlock()

	// Make sure the voice connection is speaking
	if j.bot.voiceConn != nil && j.bot.voiceConn.Ready {
		err := j.bot.voiceConn.Speaking(true)
		if err != nil {
			slog.Warn("Failed to set speaking state", "error", err)

			// Try to reconnect voice
			if j.bot.voiceConn != nil && j.bot.voiceConn.GuildID != "" && j.bot.voiceConn.ChannelID != "" {
				err = j.bot.JoinVoiceChannel(j.bot.voiceConn.GuildID, j.bot.voiceConn.ChannelID)
				if err != nil {
					slog.Error("Failed to restore voice connection", "error", err)
					// Continue anyway, we'll try again when playing song
				}
			}
		}
	}

	slog.Info("Starting jukebox playback")

	for {
		// For each iteration, get the latest stop channel: a skip replaces
		// the channel, so a value cached across iterations would be stale.
		j.bot.mu.Lock()
		stopChan := j.bot.stopChan
		isPlaying := j.bot.playing
		j.bot.mu.Unlock()

		// If we're completely stopped, exit the playback loop
		if !isPlaying {
			slog.Info("Jukebox playback stopped")
			// Clear current song when stopping
			j.clearCurrentSong()
			return
		}

		// Check if we should stop
		select {
		case <-stopChan:
			// Check if we're still supposed to be playing (complete stop vs. skip)
			j.bot.mu.Lock()
			stillPlaying := j.bot.playing
			j.bot.mu.Unlock()

			if !stillPlaying {
				// Completely stop playback; forget the song here as well so
				// both stop paths leave the player in the same state.
				j.clearCurrentSong()
				return
			}
			slog.Info("Skipping to next song")
			// Otherwise, continue to the next song (was a skip)
		default:
			// Continue playing
		}

		// Check if we need to load more songs
		j.ensurePlaylist()

		// Get the next song to play
		song := j.getNextSong()
		if song == nil {
			// No songs available; wait a moment before trying again
			time.Sleep(1 * time.Second)
			continue
		}

		j.playingMutex.Lock()
		j.currentSong = song
		j.playingMutex.Unlock()

		// Add song to history when it starts playing
		j.addToHistory(song)

		// Update Discord status with the current song information in format
		// "Artist - Title (Album)"
		var statusText string
		if song.Album != "" {
			statusText = fmt.Sprintf("%s - %s (%s)", song.Artist, song.Title, song.Album)
		} else {
			statusText = fmt.Sprintf("%s - %s", song.Artist, song.Title)
		}

		// Truncate if too long for Discord status (128 char limit)
		statusText = truncateStatusText(statusText)

		if statusErr := j.bot.session.UpdateGameStatus(0, statusText); statusErr != nil {
			slog.Warn("Failed to update Discord status", "error", statusErr)
		} else {
			slog.Debug("Updated Discord status", "status", statusText)
		}

		// Announce the song in the log when the voice connection is ready
		if j.bot.voiceConn != nil && j.bot.voiceConn.Ready {
			slog.Info("Now playing",
				"artist", song.Artist,
				"title", song.Title,
				"album", song.Album,
				"id", song.ID,
				"duration", song.Duration,
				"path", song.Path)
		}

		// Play the song
		if err := j.playSong(song); err != nil {
			slog.Error("Error playing song", "error", err)
			// Back off briefly so a persistent failure does not spin the loop
			time.Sleep(1 * time.Second)
		}
	}
}

// ensurePlaylist ensures that the playlist has songs. When the playlist is
// empty it requests 10 random songs from Subsonic; a failed request is logged
// and leaves the playlist empty.
func (j *JukeboxPlayer) ensurePlaylist() {
	j.playlistMutex.Lock()
	defer j.playlistMutex.Unlock()

	// If we have songs in the playlist, we're good
	if len(j.playlist) > 0 {
		return
	}

	// Fetch random songs from Subsonic
	songs, err := j.bot.subsonic.GetRandomSongs(10)
	if err != nil {
		slog.Error("Error getting random songs", "error", err)
		return
	}

	j.playlist = songs
}

// getNextSong removes the first song from the playlist and returns a pointer
// to a copy of it. It returns nil when the playlist is empty.
func (j *JukeboxPlayer) getNextSong() *subsonic.Song {
	j.playlistMutex.Lock()
	defer j.playlistMutex.Unlock()

	if len(j.playlist) == 0 {
		return nil
	}

	// Get the first song
	song := j.playlist[0]

	// Remove it from the playlist
	j.playlist = j.playlist[1:]

	return &song
}

// playSong plays a song over the voice connection.
//
// It starts an ffmpeg process that decodes the song's raw stream URL to
// 48 kHz stereo 16-bit PCM, encodes that PCM to Opus and pushes the frames to
// the voice connection. It blocks until the stream ends, the stream context is
// cancelled (skip), the stop channel fires, or a timeout derived from the
// song's duration expires.
//
// It returns an error when the voice connection cannot be restored, ffmpeg is
// missing or cannot be started, no audio frame could be sent within 10
// seconds, the stream produced no frames, or the overall timeout is hit. An
// interruption through the stop channel returns nil.
func (j *JukeboxPlayer) playSong(song *subsonic.Song) error {
	// Check if voice connection is ready, and attempt to reconnect if needed
	if j.bot.voiceConn == nil || !j.bot.voiceConn.Ready {
		slog.Warn("Voice connection not ready, attempting to restore it")

		// Without a known guild and channel there is nothing to reconnect to
		if j.bot.voiceConn == nil || j.bot.voiceConn.GuildID == "" || j.bot.voiceConn.ChannelID == "" {
			return fmt.Errorf("voice connection not ready and cannot be restored")
		}

		err := j.bot.JoinVoiceChannel(j.bot.voiceConn.GuildID, j.bot.voiceConn.ChannelID)
		if err != nil {
			return fmt.Errorf("failed to restore voice connection: %w", err)
		}
		slog.Info("Successfully restored voice connection")
	}

	// Get the stream URL (raw format for better compatibility)
	streamURL := j.bot.subsonic.GetRawStreamURL(song.ID)
	slog.Debug("Attempting to play song with direct FFmpeg method", "url", streamURL)

	// Check if ffmpeg is available
	ffmpegPath, err := exec.LookPath("ffmpeg")
	if err != nil || ffmpegPath == "" {
		return fmt.Errorf("ffmpeg not found, required for audio streaming: %w", err)
	}

	// Create a context that can be cancelled when skipping songs
	streamCtx, cancelStream := context.WithCancel(context.Background())

	// Publish the cancel function so skipCurrentSong can interrupt this stream
	j.bot.mu.Lock()
	j.currentStreamCancel = cancelStream
	j.bot.mu.Unlock()

	// Make sure we clean up our context if we exit
	defer func() {
		j.bot.mu.Lock()
		j.currentStreamCancel = nil
		j.bot.mu.Unlock()
		cancelStream()
	}()

	// Begin speaking
	slog.Debug("Setting Discord voice status to Speaking")
	err = j.bot.voiceConn.Speaking(true)
	if err != nil {
		slog.Warn("Failed to set speaking state", "error", err)

		// Try to recover the voice connection
		if j.bot.voiceConn != nil && j.bot.voiceConn.GuildID != "" && j.bot.voiceConn.ChannelID != "" {
			err = j.bot.JoinVoiceChannel(j.bot.voiceConn.GuildID, j.bot.voiceConn.ChannelID)
			if err != nil {
				slog.Error("Failed to reconnect to voice channel", "error", err)
			} else {
				err = j.bot.voiceConn.Speaking(true)
				if err != nil {
					slog.Error("Still failed to set speaking state after reconnection", "error", err)
				}
			}
		}
	}
	defer j.bot.voiceConn.Speaking(false)

	// Create FFmpeg command with optimized parameters for Discord streaming.
	// The process is tied to streamCtx, so cancelling the context kills it.
	cmd := exec.CommandContext(streamCtx, ffmpegPath,
		"-hide_banner", // Reduce console output
		"-loglevel", "warning", // Only show warnings and errors
		"-reconnect", "1", // Allow reconnection
		"-reconnect_streamed", "1", // Reconnect to streamed resources
		"-reconnect_delay_max", "5", // Maximum delay between reconnection attempts (seconds)
		"-i", streamURL, // Input from Subsonic stream URL
		"-acodec", "pcm_s16le",
		"-f", "s16le",
		"-ar", "48000",
		"-ac", "2",
		"-")

	// Get the stdout and stderr pipes
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("error creating stdout pipe: %w", err)
	}

	stderr, err := cmd.StderrPipe()
	if err != nil {
		return fmt.Errorf("error creating stderr pipe: %w", err)
	}

	// Start the FFmpeg process
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("error starting FFmpeg: %w", err)
	}

	// Ensure we clean up the FFmpeg process. Errors are ignored on purpose:
	// the process may already have exited on its own.
	defer func() {
		if cmd.Process != nil {
			cmd.Process.Kill()
			cmd.Wait()
		}
	}()

	// Monitor stderr for debugging
	go func() {
		scanner := bufio.NewScanner(stderr)
		for scanner.Scan() {
			// Check if we've been cancelled
			select {
			case <-streamCtx.Done():
				return
			default:
				line := scanner.Text()
				if len(line) > 0 {
					slog.Debug("FFmpeg output", "message", line)
				}
			}
		}
	}()

	// Use this to signal that streaming has started
	frameSent := make(chan struct{}, 1)

	// For signaling errors or completion. Buffered so the streaming goroutine
	// can report an error even when nobody is receiving yet.
	done := make(chan error, 1)

	// Create Opus encoder
	opusEncoder, err := gopus.NewEncoder(48000, 2, gopus.Audio)
	if err != nil {
		return fmt.Errorf("failed to create opus encoder: %w", err)
	}

	// Set the bitrate
	opusEncoder.SetBitrate(64000) // 64 kbps is sufficient for music over Discord

	// Buffer for reading PCM data: 960 samples per channel at 48 kHz is one
	// 20 ms frame.
	pcmBuffer := make([]int16, 960*2) // 960 PCM samples * 2 channels
	opusBuffer := make([]byte, 1000)  // Buffer for opus data

	go func() {
		defer close(done)

		slog.Debug("Starting direct PCM to Opus conversion")
		framesSent := 0
		firstFrameSent := false

		// Read from stdout, encode to opus, send to Discord
		for {
			// Check if we should stop
			select {
			case <-j.bot.stopChan:
				slog.Debug("Audio streaming interrupted by stop channel")
				return
			case <-streamCtx.Done():
				slog.Debug("Audio streaming interrupted by context cancellation")
				return
			default:
				// Continue streaming
			}

			// Read raw PCM data in a helper goroutine so the read can be
			// abandoned when the stream is cancelled.
			readDone := make(chan error, 1)
			go func() {
				err := binary.Read(stdout, binary.LittleEndian, pcmBuffer)
				readDone <- err
			}()

			// Wait for read or cancellation
			select {
			case err := <-readDone:
				if err == io.EOF {
					if framesSent == 0 {
						done <- fmt.Errorf("stream ended without producing any audio frames")
					} else {
						slog.Debug("End of audio stream reached", "total_frames", framesSent)
					}
					return
				}
				if err != nil {
					// Check if context was cancelled or if this is a closed
					// file error (which is normal during skips)
					select {
					case <-streamCtx.Done():
						slog.Debug("PCM read interrupted by context cancellation")
						return
					case <-j.bot.stopChan:
						slog.Debug("PCM read interrupted by stop channel")
						return
					default:
						// Only log warnings for errors that aren't related to
						// normal stream endings
						errMsg := err.Error()
						if !strings.Contains(errMsg, "file already closed") &&
							!strings.Contains(errMsg, "unexpected EOF") {
							slog.Warn("Error reading PCM data", "error", err)
						}
						time.Sleep(20 * time.Millisecond)
						continue
					}
				}
			case <-streamCtx.Done():
				slog.Debug("PCM read interrupted by context cancellation while reading")
				return
			case <-j.bot.stopChan:
				slog.Debug("PCM read interrupted by stop channel while reading")
				return
			}

			// Encode the PCM data to Opus
			opus, err := opusEncoder.Encode(pcmBuffer, 960, len(opusBuffer))
			if err != nil {
				slog.Warn("Failed to encode PCM to Opus", "error", err)
				time.Sleep(20 * time.Millisecond)
				continue
			}

			// Send the Opus data to Discord
			select {
			case j.bot.voiceConn.OpusSend <- opus:
				framesSent++

				// Signal that the first frame was sent
				if !firstFrameSent {
					firstFrameSent = true
					select {
					case frameSent <- struct{}{}:
					default:
						// Channel buffer full, no need to block
					}
				}

				// Log progress
				if framesSent%250 == 0 {
					slog.Debug("Audio streaming progress", "frames_sent", framesSent)
				}
			case <-j.bot.stopChan:
				return
			case <-streamCtx.Done():
				return
			case <-time.After(50 * time.Millisecond):
				// Timeout: this frame is dropped, try again with the next one
				continue
			}

			// Control the frame timing (20ms per frame)
			time.Sleep(15 * time.Millisecond)
		}
	}()

	// Wait for the first frame to be sent or a timeout
	select {
	case <-frameSent:
		slog.Info("Audio streaming started successfully")
	case err := <-done:
		if err != nil {
			return fmt.Errorf("failed to start audio streaming: %w", err)
		}
	case <-time.After(10 * time.Second):
		return fmt.Errorf("timeout waiting for first audio frame to be sent")
	}

	// Wait for the song to finish or be interrupted
	slog.Debug("Waiting for song to complete or be interrupted")

	// Create a timeout for the entire song (based on song duration plus a margin)
	songDurationSecs := song.Duration
	if songDurationSecs <= 0 {
		// If we don't have a proper duration, use a default max duration
		songDurationSecs = 600 // 10 minutes maximum
	}

	// Add a 30-second margin for buffering and network delays
	songTimeout := time.Duration(songDurationSecs+30) * time.Second

	// Plain integer for logging: dividing two Durations yields a Duration,
	// which a structured logger would print as nanoseconds.
	timeoutSecs := int64(songTimeout / time.Second)

	select {
	case err := <-done:
		if err != nil {
			slog.Error("Song ended with error", "error", err)
		} else {
			slog.Debug("Song completed successfully")
		}
		return err
	case <-j.bot.stopChan:
		slog.Debug("Song playback interrupted")
		return nil
	case <-time.After(songTimeout):
		slog.Warn("Song playback exceeded maximum duration",
			"expected_duration_seconds", songDurationSecs,
			"timeout_seconds", timeoutSecs)
		return fmt.Errorf("song playback timed out after %d seconds", timeoutSecs)
	}
}

// skipCurrentSong skips the current song without stopping the playback. It
// does nothing besides logging when the jukebox is not playing.
func (j *JukeboxPlayer) skipCurrentSong() {
	// Signal to stop the current song only but keep overall playback going
	j.bot.mu.Lock()
	defer j.bot.mu.Unlock()

	if !j.bot.playing {
		slog.Debug("Skip requested but jukebox is not playing")
		return
	}

	// Cancel the current stream context if it exists
	if j.currentStreamCancel != nil {
		slog.Debug("Cancelling current stream context")
		j.currentStreamCancel()
		j.currentStreamCancel = nil
	}

	// Also close the stop channel for compatibility with other parts of the
	// code, then install a fresh one for the next song.
	close(j.bot.stopChan)
	j.bot.stopChan = make(chan struct{})

	slog.Debug("Skip signal sent - current song will stop")

	// Show "Skipping..." status while transitioning between songs. This is
	// sent while the lock is still held: the playback loop has to take the
	// same lock before it moves on, which keeps it from publishing the next
	// song's status ahead of this one.
	if err := j.bot.session.UpdateGameStatus(0, "Skipping to next track..."); err != nil {
		slog.Warn("Failed to update Discord status", "error", err)
	}
}

// handleInfo handles the info command: it replies with an embed describing the
// song that is currently playing, or with a hint when nothing is playing.
func (j *JukeboxPlayer) handleInfo(s *discordgo.Session, i *discordgo.InteractionCreate) {
	// Acknowledge the interaction immediately
	err := s.InteractionRespond(i.Interaction, &discordgo.InteractionResponse{
		Type: discordgo.InteractionResponseDeferredChannelMessageWithSource,
	})
	if err != nil {
		slog.Error("Failed to acknowledge interaction", "error", err)
		return
	}

	// Get the current song information
	currentSong := j.GetCurrentSong()

	// Check if the bot is playing and has a voice connection
	isPlaying := j.bot.IsPlaying() && j.bot.voiceConn != nil && j.bot.voiceConn.Ready

	if currentSong == nil || !isPlaying {
		// No song is playing, inform the user
		content := "No song is currently playing. Start the jukebox with `/jukebox play`!"
		if err := editResponseContent(s, i, content); err != nil {
			slog.Error("Failed to send 'no song playing' message", "error", err)
		}
		return
	}

	// Create an embed with song information. Artist and album fall back to a
	// placeholder because an embed field must not have an empty value.
	embed := &discordgo.MessageEmbed{
		Title:       "Currently Playing",
		Description: fmt.Sprintf("**%s**", currentSong.Title),
		Color:       0x1DB954, // Spotify green color
		Fields: []*discordgo.MessageEmbedField{
			{
				Name:   "Artist",
				Value:  embedFieldValue(currentSong.Artist),
				Inline: true,
			},
			{
				Name:   "Album",
				Value:  embedFieldValue(currentSong.Album),
				Inline: true,
			},
		},
		Timestamp: time.Now().Format(time.RFC3339),
		Footer: &discordgo.MessageEmbedFooter{
			Text: "Discord Jukebox Bot",
		},
	}

	// Add duration field if available, formatted as m:ss
	if currentSong.Duration > 0 {
		minutes := currentSong.Duration / 60
		seconds := currentSong.Duration % 60
		embed.Fields = append(embed.Fields, &discordgo.MessageEmbedField{
			Name:   "Duration",
			Value:  fmt.Sprintf("%d:%02d", minutes, seconds),
			Inline: true,
		})
	}

	// Add genre field if available
	if currentSong.Genre != "" {
		embed.Fields = append(embed.Fields, &discordgo.MessageEmbedField{
			Name:   "Genre",
			Value:  currentSong.Genre,
			Inline: true,
		})
	}

	// Add year field if available
	if currentSong.Year > 0 {
		embed.Fields = append(embed.Fields, &discordgo.MessageEmbedField{
			Name:   "Year",
			Value:  fmt.Sprintf("%d", currentSong.Year),
			Inline: true,
		})
	}

	// Add cover art if available
	if currentSong.CoverArt != "" {
		coverArtURL := j.bot.subsonic.GetCoverArtURL(currentSong.CoverArt)
		// NOTE(review): the URL is rewritten by stripping a hard-coded
		// ".fmartingr.dev" substring; this looks deployment-specific and
		// probably belongs in configuration - confirm with the owner.
		embed.Thumbnail = &discordgo.MessageEmbedThumbnail{
			URL: strings.ReplaceAll(coverArtURL, ".fmartingr.dev", ""),
		}
	}

	// Send the embed response
	embeds := []*discordgo.MessageEmbed{embed}
	_, err = s.InteractionResponseEdit(i.Interaction, &discordgo.WebhookEdit{
		Embeds: &embeds,
	})
	if err != nil {
		slog.Error("Failed to send song info message", "error", err)
	}
}

// GetCurrentSong returns the current song, or nil when no song is set.
func (j *JukeboxPlayer) GetCurrentSong() *subsonic.Song {
	j.playingMutex.Lock()
	defer j.playingMutex.Unlock()
	return j.currentSong
}

// addToHistory adds a song to the front of the playback history and drops the
// oldest entries once the history exceeds maxHistorySize. A nil song is
// ignored.
func (j *JukeboxPlayer) addToHistory(song *subsonic.Song) {
	if song == nil {
		return
	}

	j.historyMutex.Lock()
	defer j.historyMutex.Unlock()

	// Add the song to the beginning of the history list
	j.songHistory = append([]*subsonic.Song{song}, j.songHistory...)

	// Trim the history if it exceeds the maximum size
	if len(j.songHistory) > j.maxHistorySize {
		j.songHistory = j.songHistory[:j.maxHistorySize]
	}
}

// GetSongHistory returns a copy of the playback history, most recent song
// first. The slice is a copy; the songs it points to are shared with the
// player.
func (j *JukeboxPlayer) GetSongHistory() []*subsonic.Song {
	j.historyMutex.RLock()
	defer j.historyMutex.RUnlock()

	// Create a copy of the history to return
	history := make([]*subsonic.Song, len(j.songHistory))
	copy(history, j.songHistory)

	return history
}