mirror of
https://github.com/navidrome/navidrome.git
synced 2025-04-05 21:47:36 +03:00
Fix deadlock situation when events are sent too fast to the broker
This commit is contained in:
parent
4ea0f235e1
commit
5bd33455a1
2 changed files with 8 additions and 9 deletions
|
@ -48,7 +48,6 @@ type scanner struct {
|
||||||
ds model.DataStore
|
ds model.DataStore
|
||||||
cacheWarmer core.CacheWarmer
|
cacheWarmer core.CacheWarmer
|
||||||
broker events.Broker
|
broker events.Broker
|
||||||
scan chan bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type scanStatus struct {
|
type scanStatus struct {
|
||||||
|
@ -66,7 +65,6 @@ func New(ds model.DataStore, cacheWarmer core.CacheWarmer, broker events.Broker)
|
||||||
folders: map[string]FolderScanner{},
|
folders: map[string]FolderScanner{},
|
||||||
status: map[string]*scanStatus{},
|
status: map[string]*scanStatus{},
|
||||||
lock: &sync.RWMutex{},
|
lock: &sync.RWMutex{},
|
||||||
scan: make(chan bool),
|
|
||||||
}
|
}
|
||||||
s.loadFolders()
|
s.loadFolders()
|
||||||
return s
|
return s
|
||||||
|
|
|
@ -10,13 +10,12 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/navidrome/navidrome/utils/singleton"
|
|
||||||
|
|
||||||
"code.cloudfoundry.org/go-diodes"
|
"code.cloudfoundry.org/go-diodes"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/navidrome/navidrome/consts"
|
"github.com/navidrome/navidrome/consts"
|
||||||
"github.com/navidrome/navidrome/log"
|
"github.com/navidrome/navidrome/log"
|
||||||
"github.com/navidrome/navidrome/model/request"
|
"github.com/navidrome/navidrome/model/request"
|
||||||
|
"github.com/navidrome/navidrome/utils/singleton"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Broker interface {
|
type Broker interface {
|
||||||
|
@ -89,17 +88,17 @@ func GetBroker() Broker {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *broker) SendMessage(ctx context.Context, evt Event) {
|
func (b *broker) SendMessage(ctx context.Context, evt Event) {
|
||||||
msg := b.prepareMessage(evt)
|
msg := b.prepareMessage(ctx, evt)
|
||||||
msg.senderCtx = ctx
|
|
||||||
log.Trace("Broker received new event", "event", msg)
|
log.Trace("Broker received new event", "event", msg)
|
||||||
b.publish <- msg
|
b.publish <- msg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *broker) prepareMessage(event Event) message {
|
func (b *broker) prepareMessage(ctx context.Context, event Event) message {
|
||||||
msg := message{}
|
msg := message{}
|
||||||
msg.id = atomic.AddUint32(&eventId, 1)
|
msg.id = atomic.AddUint32(&eventId, 1)
|
||||||
msg.data = event.Data(event)
|
msg.data = event.Data(event)
|
||||||
msg.event = event.Name(event)
|
msg.event = event.Name(event)
|
||||||
|
msg.senderCtx = ctx
|
||||||
return msg
|
return msg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,7 +212,8 @@ func (b *broker) listen() {
|
||||||
log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String())
|
log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String())
|
||||||
|
|
||||||
// Send a serverStart event to new client
|
// Send a serverStart event to new client
|
||||||
c.diode.put(b.prepareMessage(&ServerStart{StartTime: consts.ServerStart, Version: consts.Version()}))
|
c.diode.put(b.prepareMessage(context.Background(),
|
||||||
|
&ServerStart{StartTime: consts.ServerStart, Version: consts.Version()}))
|
||||||
|
|
||||||
case c := <-b.unsubscribing:
|
case c := <-b.unsubscribing:
|
||||||
// A client has detached and we want to
|
// A client has detached and we want to
|
||||||
|
@ -222,6 +222,7 @@ func (b *broker) listen() {
|
||||||
log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String())
|
log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String())
|
||||||
|
|
||||||
case event := <-b.publish:
|
case event := <-b.publish:
|
||||||
|
log.Trace("Got new published event", "event", event)
|
||||||
// We got a new event from the outside!
|
// We got a new event from the outside!
|
||||||
// Send event to all connected clients
|
// Send event to all connected clients
|
||||||
for c := range clients {
|
for c := range clients {
|
||||||
|
@ -233,7 +234,7 @@ func (b *broker) listen() {
|
||||||
|
|
||||||
case ts := <-keepAlive.C:
|
case ts := <-keepAlive.C:
|
||||||
// Send a keep alive message every 15 seconds
|
// Send a keep alive message every 15 seconds
|
||||||
b.SendMessage(context.Background(), &KeepAlive{TS: ts.Unix()})
|
go b.SendMessage(context.Background(), &KeepAlive{TS: ts.Unix()})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue