mirror of
https://github.com/navidrome/navidrome.git
synced 2025-04-04 13:07:36 +03:00
Use channels for EventStream instead of diodes
This commit is contained in:
parent
fea2de8f90
commit
b22d0366d5
1 changed files with 28 additions and 21 deletions
|
@ -12,7 +12,7 @@ import (
|
|||
"github.com/navidrome/navidrome/consts"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/model/request"
|
||||
"github.com/navidrome/navidrome/utils/diodes"
|
||||
"github.com/navidrome/navidrome/utils/pl"
|
||||
"github.com/navidrome/navidrome/utils/singleton"
|
||||
)
|
||||
|
||||
|
@ -24,6 +24,7 @@ type Broker interface {
|
|||
const (
|
||||
keepAliveFrequency = 15 * time.Second
|
||||
writeTimeOut = 5 * time.Second
|
||||
bufferSize = 1
|
||||
)
|
||||
|
||||
type (
|
||||
|
@ -41,7 +42,7 @@ type (
|
|||
username string
|
||||
userAgent string
|
||||
clientUniqueId string
|
||||
diode *diodes.Diode[message]
|
||||
msgC chan message
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -80,7 +81,7 @@ func GetBroker() Broker {
|
|||
|
||||
func (b *broker) SendMessage(ctx context.Context, evt Event) {
|
||||
msg := b.prepareMessage(ctx, evt)
|
||||
log.Trace("Broker received new event", "event", msg)
|
||||
log.Trace("Broker received new event", "type", msg.event, "data", msg.data)
|
||||
b.publish <- msg
|
||||
}
|
||||
|
||||
|
@ -147,21 +148,18 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
// Each connection registers its own message channel with the Broker's connections registry
|
||||
c := b.subscribe(r)
|
||||
defer b.unsubscribe(c)
|
||||
log.Debug(ctx, "New broker client", "client", c.String())
|
||||
log.Debug(ctx, "Started new EventStream connection", "client", c.String())
|
||||
|
||||
for {
|
||||
event := c.diode.Next()
|
||||
if event == nil {
|
||||
log.Trace(ctx, "Client closed the EventStream connection", "client", c.String())
|
||||
return
|
||||
}
|
||||
log.Trace(ctx, "Sending event to client", "event", *event, "client", c.String())
|
||||
err := writeEvent(ctx, w, *event, writeTimeOut)
|
||||
for event := range pl.ReadOrDone(ctx, c.msgC) {
|
||||
log.Trace(ctx, "Sending event to client", "event", event, "client", c.String())
|
||||
err := writeEvent(ctx, w, event, writeTimeOut)
|
||||
if err != nil {
|
||||
log.Debug(ctx, "Error sending event to client. Closing connection", "event", *event, "client", c.String(), err)
|
||||
log.Debug(ctx, "Error sending event to client. Closing connection", "event", event, "client", c.String(), err)
|
||||
return
|
||||
}
|
||||
}
|
||||
log.Trace(ctx, "Client EventStream connection closed", "client", c.String())
|
||||
return
|
||||
}
|
||||
|
||||
func (b *broker) subscribe(r *http.Request) client {
|
||||
|
@ -175,9 +173,7 @@ func (b *broker) subscribe(r *http.Request) client {
|
|||
userAgent: r.UserAgent(),
|
||||
clientUniqueId: clientUniqueId,
|
||||
}
|
||||
c.diode = diodes.New[message](ctx, 1024, diodes.AlertFunc(func(missed int) {
|
||||
log.Debug("Dropped SSE events", "client", c.String(), "missed", missed)
|
||||
}))
|
||||
c.msgC = make(chan message, bufferSize)
|
||||
|
||||
// Signal the broker that we have a new client
|
||||
b.subscribing <- c
|
||||
|
@ -220,18 +216,19 @@ func (b *broker) listen() {
|
|||
// A new client has connected.
|
||||
// Register their message channel
|
||||
clients[c] = struct{}{}
|
||||
log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String())
|
||||
log.Debug("Client added to EventStream broker", "numActiveClients", len(clients), "newClient", c.String())
|
||||
|
||||
// Send a serverStart event to new client
|
||||
msg := b.prepareMessage(context.Background(),
|
||||
&ServerStart{StartTime: consts.ServerStart, Version: consts.Version})
|
||||
c.diode.Put(msg)
|
||||
sendOrDrop(c, msg)
|
||||
|
||||
case c := <-b.unsubscribing:
|
||||
// A client has detached, and we want to
|
||||
// stop sending them messages.
|
||||
close(c.msgC)
|
||||
delete(clients, c)
|
||||
log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String())
|
||||
log.Debug("Removed client from EventStream broker", "numActiveClients", len(clients), "client", c.String())
|
||||
|
||||
case msg := <-b.publish:
|
||||
msg.id = getNextEventId()
|
||||
|
@ -241,7 +238,7 @@ func (b *broker) listen() {
|
|||
for c := range clients {
|
||||
if b.shouldSend(msg, c) {
|
||||
log.Trace("Putting event on client's queue", "client", c.String(), "event", msg)
|
||||
c.diode.Put(msg)
|
||||
sendOrDrop(c, msg)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -254,8 +251,18 @@ func (b *broker) listen() {
|
|||
msg.id = getNextEventId()
|
||||
for c := range clients {
|
||||
log.Trace("Putting a keepalive event on client's queue", "client", c.String(), "event", msg)
|
||||
c.diode.Put(msg)
|
||||
sendOrDrop(c, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func sendOrDrop(client client, msg message) {
|
||||
select {
|
||||
case client.msgC <- msg:
|
||||
default:
|
||||
if log.CurrentLevel() >= log.LevelTrace {
|
||||
log.Trace("Event dropped because client's channel is full", "event", msg, "client", client.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue