Terminate all MPV instances when stopping Navidrome (#3008)

* Terminate all mpv instances when stopping Navidrome

* Exit trackSwitcher goroutine when terminating

* Remove potential race condition when starting the Playback device

* Fix lint error

* Removed unused and unneeded vars/functions

* Use device short name in log

* Small refactor

* Small nitpick

* Make start functions more uniform
This commit is contained in:
Deluan Quintão 2024-05-09 06:57:24 -04:00 committed by GitHub
parent 677d9947f3
commit 6408dda948
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 59 additions and 79 deletions

View file

@ -83,11 +83,8 @@ func runNavidrome() {
g.Go(startServer(ctx)) g.Go(startServer(ctx))
g.Go(startSignaller(ctx)) g.Go(startSignaller(ctx))
g.Go(startScheduler(ctx)) g.Go(startScheduler(ctx))
g.Go(schedulePeriodicScan(ctx))
if conf.Server.Jukebox.Enabled {
g.Go(startPlaybackServer(ctx)) g.Go(startPlaybackServer(ctx))
} g.Go(schedulePeriodicScan(ctx))
if err := g.Wait(); err != nil && !errors.Is(err, interrupted) { if err := g.Wait(); err != nil && !errors.Is(err, interrupted) {
log.Error("Fatal error in Navidrome. Aborting", err) log.Error("Fatal error in Navidrome. Aborting", err)
@ -151,21 +148,21 @@ func schedulePeriodicScan(ctx context.Context) func() error {
} }
func startScheduler(ctx context.Context) func() error { func startScheduler(ctx context.Context) func() error {
return func() error {
log.Info(ctx, "Starting scheduler") log.Info(ctx, "Starting scheduler")
schedulerInstance := scheduler.GetInstance() schedulerInstance := scheduler.GetInstance()
return func() error {
schedulerInstance.Run(ctx) schedulerInstance.Run(ctx)
return nil return nil
} }
} }
func startPlaybackServer(ctx context.Context) func() error { func startPlaybackServer(ctx context.Context) func() error {
log.Info(ctx, "Starting playback server")
playbackInstance := GetPlaybackServer()
return func() error { return func() error {
if !conf.Server.Jukebox.Enabled {
return nil
}
log.Info(ctx, "Starting playback server")
playbackInstance := GetPlaybackServer()
return playbackInstance.Run(ctx) return playbackInstance.Run(ctx)
} }
} }

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"sync"
"github.com/navidrome/navidrome/core/playback/mpv" "github.com/navidrome/navidrome/core/playback/mpv"
"github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/log"
@ -22,6 +23,7 @@ type Track interface {
} }
type playbackDevice struct { type playbackDevice struct {
serviceCtx context.Context
ParentPlaybackServer PlaybackServer ParentPlaybackServer PlaybackServer
Default bool Default bool
User string User string
@ -31,7 +33,7 @@ type playbackDevice struct {
Gain float32 Gain float32
PlaybackDone chan bool PlaybackDone chan bool
ActiveTrack Track ActiveTrack Track
TrackSwitcherStarted bool startTrackSwitcher sync.Once
} }
type DeviceStatus struct { type DeviceStatus struct {
@ -43,8 +45,6 @@ type DeviceStatus struct {
const DefaultGain float32 = 1.0 const DefaultGain float32 = 1.0
var EmptyStatus = DeviceStatus{CurrentIndex: -1, Playing: false, Gain: DefaultGain, Position: 0}
func (pd *playbackDevice) getStatus() DeviceStatus { func (pd *playbackDevice) getStatus() DeviceStatus {
pos := 0 pos := 0
if pd.ActiveTrack != nil { if pd.ActiveTrack != nil {
@ -61,8 +61,9 @@ func (pd *playbackDevice) getStatus() DeviceStatus {
// NewPlaybackDevice creates a new playback device which implements all the basic Jukebox mode commands defined here: // NewPlaybackDevice creates a new playback device which implements all the basic Jukebox mode commands defined here:
// http://www.subsonic.org/pages/api.jsp#jukeboxControl // http://www.subsonic.org/pages/api.jsp#jukeboxControl
// Starts the trackSwitcher goroutine for the device. // Starts the trackSwitcher goroutine for the device.
func NewPlaybackDevice(playbackServer PlaybackServer, name string, deviceName string) *playbackDevice { func NewPlaybackDevice(ctx context.Context, playbackServer PlaybackServer, name string, deviceName string) *playbackDevice {
return &playbackDevice{ return &playbackDevice{
serviceCtx: ctx,
ParentPlaybackServer: playbackServer, ParentPlaybackServer: playbackServer,
User: "", User: "",
Name: name, Name: name,
@ -70,7 +71,6 @@ func NewPlaybackDevice(playbackServer PlaybackServer, name string, deviceName st
Gain: DefaultGain, Gain: DefaultGain,
PlaybackQueue: NewQueue(), PlaybackQueue: NewQueue(),
PlaybackDone: make(chan bool), PlaybackDone: make(chan bool),
TrackSwitcherStarted: false,
} }
} }
@ -103,14 +103,13 @@ func (pd *playbackDevice) Set(ctx context.Context, ids []string) (DeviceStatus,
func (pd *playbackDevice) Start(ctx context.Context) (DeviceStatus, error) { func (pd *playbackDevice) Start(ctx context.Context) (DeviceStatus, error) {
log.Debug(ctx, "Processing Start action", "device", pd) log.Debug(ctx, "Processing Start action", "device", pd)
if !pd.TrackSwitcherStarted { pd.startTrackSwitcher.Do(func() {
log.Info(ctx, "Starting trackSwitcher goroutine") log.Info(ctx, "Starting trackSwitcher goroutine")
// Start one trackSwitcher goroutine with each device // Start one trackSwitcher goroutine with each device
go func() { go func() {
pd.trackSwitcherGoroutine() pd.trackSwitcherGoroutine()
}() }()
pd.TrackSwitcherStarted = true })
}
if pd.ActiveTrack != nil { if pd.ActiveTrack != nil {
if pd.isPlaying() { if pd.isPlaying() {
@ -255,7 +254,8 @@ func (pd *playbackDevice) isPlaying() bool {
func (pd *playbackDevice) trackSwitcherGoroutine() { func (pd *playbackDevice) trackSwitcherGoroutine() {
log.Debug("Started trackSwitcher goroutine", "device", pd) log.Debug("Started trackSwitcher goroutine", "device", pd)
for { for {
<-pd.PlaybackDone select {
case <-pd.PlaybackDone:
log.Debug("Track switching detected") log.Debug("Track switching detected")
if pd.ActiveTrack != nil { if pd.ActiveTrack != nil {
pd.ActiveTrack.Close() pd.ActiveTrack.Close()
@ -275,6 +275,10 @@ func (pd *playbackDevice) trackSwitcherGoroutine() {
} else { } else {
log.Debug("There is no song left in the playlist. Finish.") log.Debug("There is no song left in the playlist. Finish.")
} }
case <-pd.serviceCtx.Done():
log.Debug("Stopping trackSwitcher goroutine", "device", pd.Name)
return
}
} }
} }
@ -285,7 +289,7 @@ func (pd *playbackDevice) switchActiveTrackByIndex(index int) error {
return errors.New("could not get current track") return errors.New("could not get current track")
} }
track, err := mpv.NewTrack(pd.PlaybackDone, pd.DeviceName, *currentTrack) track, err := mpv.NewTrack(pd.serviceCtx, pd.PlaybackDone, pd.DeviceName, *currentTrack)
if err != nil { if err != nil {
return err return err
} }

View file

@ -14,11 +14,11 @@ import (
"github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/log"
) )
func start(args []string) (Executor, error) { func start(ctx context.Context, args []string) (Executor, error) {
log.Debug("Executing mpv command", "cmd", args) log.Debug("Executing mpv command", "cmd", args)
j := Executor{args: args} j := Executor{args: args}
j.PipeReader, j.out = io.Pipe() j.PipeReader, j.out = io.Pipe()
err := j.start() err := j.start(ctx)
if err != nil { if err != nil {
return Executor{}, err return Executor{}, err
} }
@ -38,12 +38,9 @@ type Executor struct {
out *io.PipeWriter out *io.PipeWriter
args []string args []string
cmd *exec.Cmd cmd *exec.Cmd
ctx context.Context
} }
func (j *Executor) start() error { func (j *Executor) start(ctx context.Context) error {
ctx := context.Background()
j.ctx = ctx
cmd := exec.CommandContext(ctx, j.args[0], j.args[1:]...) // #nosec cmd := exec.CommandContext(ctx, j.args[0], j.args[1:]...) // #nosec
cmd.Stdout = j.out cmd.Stdout = j.out
if log.IsGreaterOrEqualTo(log.LevelTrace) { if log.IsGreaterOrEqualTo(log.LevelTrace) {

View file

@ -6,6 +6,7 @@ package mpv
// https://mpv.io/manual/master/#properties // https://mpv.io/manual/master/#properties
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"time" "time"
@ -24,7 +25,7 @@ type MpvTrack struct {
CloseCalled bool CloseCalled bool
} }
func NewTrack(playbackDoneChannel chan bool, deviceName string, mf model.MediaFile) (*MpvTrack, error) { func NewTrack(ctx context.Context, playbackDoneChannel chan bool, deviceName string, mf model.MediaFile) (*MpvTrack, error) {
log.Debug("Loading track", "trackPath", mf.Path, "mediaType", mf.ContentType()) log.Debug("Loading track", "trackPath", mf.Path, "mediaType", mf.ContentType())
if _, err := mpvCommand(); err != nil { if _, err := mpvCommand(); err != nil {
@ -34,7 +35,7 @@ func NewTrack(playbackDoneChannel chan bool, deviceName string, mf model.MediaFi
tmpSocketName := socketName("mpv-ctrl-", ".socket") tmpSocketName := socketName("mpv-ctrl-", ".socket")
args := createMPVCommand(deviceName, mf.Path, tmpSocketName) args := createMPVCommand(deviceName, mf.Path, tmpSocketName)
exe, err := start(args) exe, err := start(ctx, args)
if err != nil { if err != nil {
log.Error("Error starting mpv process", err) log.Error("Error starting mpv process", err)
return nil, err return nil, err
@ -110,13 +111,13 @@ func (t *MpvTrack) Close() {
log.Debug("sending shutdown command") log.Debug("sending shutdown command")
_, err := t.Conn.Call("quit") _, err := t.Conn.Call("quit")
if err != nil { if err != nil {
log.Error("Error sending quit command to mpv-ipc socket", err) log.Warn("Error sending quit command to mpv-ipc socket", err)
if t.Exe != nil { if t.Exe != nil {
log.Debug("cancelling executor") log.Debug("cancelling executor")
err = t.Exe.Cancel() err = t.Exe.Cancel()
if err != nil { if err != nil {
log.Error("Error canceling executor", err) log.Warn("Error canceling executor", err)
} }
} }
} }

View file

@ -19,7 +19,6 @@ type PlaybackServer interface {
Run(ctx context.Context) error Run(ctx context.Context) error
GetDeviceForUser(user string) (*playbackDevice, error) GetDeviceForUser(user string) (*playbackDevice, error)
GetMediaFile(id string) (*model.MediaFile, error) GetMediaFile(id string) (*model.MediaFile, error)
GetCtx() *context.Context
} }
type playbackServer struct { type playbackServer struct {
@ -37,37 +36,33 @@ func GetInstance(ds model.DataStore) PlaybackServer {
// Run starts the playback server which serves request until canceled using the given context // Run starts the playback server which serves request until canceled using the given context
func (ps *playbackServer) Run(ctx context.Context) error { func (ps *playbackServer) Run(ctx context.Context) error {
devices, err := ps.initDeviceStatus(conf.Server.Jukebox.Devices, conf.Server.Jukebox.Default) ps.ctx = &ctx
ps.playbackDevices = devices
devices, err := ps.initDeviceStatus(ctx, conf.Server.Jukebox.Devices, conf.Server.Jukebox.Default)
if err != nil { if err != nil {
return err return err
} }
ps.playbackDevices = devices
log.Info(ctx, fmt.Sprintf("%d audio devices found", len(devices))) log.Info(ctx, fmt.Sprintf("%d audio devices found", len(devices)))
defaultDevice, _ := ps.getDefaultDevice() defaultDevice, _ := ps.getDefaultDevice()
log.Info(ctx, "Using audio device: "+defaultDevice.DeviceName) log.Info(ctx, "Using audio device: "+defaultDevice.DeviceName)
ps.ctx = &ctx
<-ctx.Done() <-ctx.Done()
// Should confirm all subprocess are terminated before returning
return nil return nil
} }
// GetCtx produces the context this server was started with. Used for data-retrieval and cancellation func (ps *playbackServer) initDeviceStatus(ctx context.Context, devices []conf.AudioDeviceDefinition, defaultDevice string) ([]playbackDevice, error) {
func (ps *playbackServer) GetCtx() *context.Context {
return ps.ctx
}
func (ps *playbackServer) initDeviceStatus(devices []conf.AudioDeviceDefinition, defaultDevice string) ([]playbackDevice, error) {
pbDevices := make([]playbackDevice, max(1, len(devices))) pbDevices := make([]playbackDevice, max(1, len(devices)))
defaultDeviceFound := false defaultDeviceFound := false
if defaultDevice == "" { if defaultDevice == "" {
// if there are no devices given and no default device, we create a synthetic device named "auto" // if there are no devices given and no default device, we create a synthetic device named "auto"
if len(devices) == 0 { if len(devices) == 0 {
pbDevices[0] = *NewPlaybackDevice(ps, "auto", "auto") pbDevices[0] = *NewPlaybackDevice(ctx, ps, "auto", "auto")
} }
// if there is but only one entry and no default given, just use that. // if there is but only one entry and no default given, just use that.
@ -75,7 +70,7 @@ func (ps *playbackServer) initDeviceStatus(devices []conf.AudioDeviceDefinition,
if len(devices[0]) != 2 { if len(devices[0]) != 2 {
return []playbackDevice{}, fmt.Errorf("audio device definition ought to contain 2 fields, found: %d ", len(devices[0])) return []playbackDevice{}, fmt.Errorf("audio device definition ought to contain 2 fields, found: %d ", len(devices[0]))
} }
pbDevices[0] = *NewPlaybackDevice(ps, devices[0][0], devices[0][1]) pbDevices[0] = *NewPlaybackDevice(ctx, ps, devices[0][0], devices[0][1])
} }
if len(devices) > 1 { if len(devices) > 1 {
@ -91,7 +86,7 @@ func (ps *playbackServer) initDeviceStatus(devices []conf.AudioDeviceDefinition,
return []playbackDevice{}, fmt.Errorf("audio device definition ought to contain 2 fields, found: %d ", len(audioDevice)) return []playbackDevice{}, fmt.Errorf("audio device definition ought to contain 2 fields, found: %d ", len(audioDevice))
} }
pbDevices[idx] = *NewPlaybackDevice(ps, audioDevice[0], audioDevice[1]) pbDevices[idx] = *NewPlaybackDevice(ctx, ps, audioDevice[0], audioDevice[1])
if audioDevice[0] == defaultDevice { if audioDevice[0] == defaultDevice {
pbDevices[idx].Default = true pbDevices[idx].Default = true
@ -106,12 +101,12 @@ func (ps *playbackServer) initDeviceStatus(devices []conf.AudioDeviceDefinition,
} }
func (ps *playbackServer) getDefaultDevice() (*playbackDevice, error) { func (ps *playbackServer) getDefaultDevice() (*playbackDevice, error) {
for idx, audioDevice := range ps.playbackDevices { for idx := range ps.playbackDevices {
if audioDevice.Default { if ps.playbackDevices[idx].Default {
return &ps.playbackDevices[idx], nil return &ps.playbackDevices[idx], nil
} }
} }
return &playbackDevice{}, fmt.Errorf("no default device found") return nil, fmt.Errorf("no default device found")
} }
// GetMediaFile retrieves the MediaFile given by the id parameter // GetMediaFile retrieves the MediaFile given by the id parameter
@ -125,7 +120,7 @@ func (ps *playbackServer) GetDeviceForUser(user string) (*playbackDevice, error)
// README: here we might plug-in the user-device mapping one fine day // README: here we might plug-in the user-device mapping one fine day
device, err := ps.getDefaultDevice() device, err := ps.getDefaultDevice()
if err != nil { if err != nil {
return &playbackDevice{}, err return nil, err
} }
device.User = user device.User = user
return device, nil return device, nil

View file

@ -134,17 +134,3 @@ func (pd *Queue) IncreaseIndex() {
pd.SetIndex(pd.Index + 1) pd.SetIndex(pd.Index + 1)
} }
} }
func max(x, y int) int {
if x < y {
return y
}
return x
}
func min(x, y int) int {
if x > y {
return y
}
return x
}