diff --git a/server/events/sse.go b/server/events/sse.go index 068cf8183..f111ee487 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/http" + "sync/atomic" "time" "github.com/deluan/navidrome/log" @@ -16,29 +17,44 @@ type Broker interface { SendMessage(event Event) } -var serverStart time.Time +const keepAliveFrequency = 15 * time.Second + +type ( + messageChan chan []byte + clientsChan chan client + client struct { + address string + username string + userAgent string + channel messageChan + } +) + +func (c client) String() string { + return fmt.Sprintf("%s (%s - %s)", c.username, c.address, c.userAgent) +} type broker struct { // Events are pushed to this channel by the main events-gathering routine - notifier chan []byte + notifier messageChan // New client connections - newClients chan chan []byte + newClients clientsChan // Closed client connections - closingClients chan chan []byte + closingClients clientsChan // Client connections registry - clients map[chan []byte]bool + clients map[client]bool } func NewBroker() Broker { // Instantiate a broker broker := &broker{ - notifier: make(chan []byte, 100), - newClients: make(chan chan []byte), - closingClients: make(chan chan []byte), - clients: make(map[chan []byte]bool), + notifier: make(messageChan, 100), + newClients: make(clientsChan), + closingClients: make(clientsChan), + clients: make(map[client]bool), } // Set it running - listening and broadcasting events @@ -48,17 +64,21 @@ func NewBroker() Broker { } func (broker *broker) SendMessage(event Event) { - data := broker.formatEvent(event) + pkg := broker.preparePackage(event) - log.Trace("Broker received new event", "name", event.EventName(), "payload", string(data)) - broker.notifier <- data + log.Trace("Broker received new event", "name", event.EventName(), "event", string(pkg)) + broker.notifier <- pkg } -func (broker *broker) formatEvent(event Event) []byte { +var eventId uint32 + +func (broker *broker) preparePackage(event Event) []byte { pkg := struct { Event `json:"data"` + Id uint32 `json:"id"` Name string `json:"name"` }{} + pkg.Id = atomic.AddUint32(&eventId, 1) pkg.Name = event.EventName() pkg.Event = event data, _ := json.Marshal(pkg) @@ -66,10 +86,11 @@ func (broker *broker) formatEvent(event Event) []byte { } func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + ctx := req.Context() + // Make sure that the writer supports flushing. flusher, ok := rw.(http.Flusher) - - user, _ := request.UserFrom(req.Context()) + user, _ := request.UserFrom(ctx) if !ok { log.Error(rw, "Streaming unsupported! Events cannot be sent to this client", "address", req.RemoteAddr, "userAgent", req.UserAgent(), "user", user.UserName) @@ -83,31 +104,37 @@ func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("Access-Control-Allow-Origin", "*") // Each connection registers its own message channel with the Broker's connections registry - messageChan := make(chan []byte) + client := client{ + username: user.UserName, + address: req.RemoteAddr, + userAgent: req.UserAgent(), + channel: make(messageChan), + } - // Signal the broker that we have a new connection - broker.newClients <- messageChan + // Signal the broker that we have a new client + broker.newClients <- client - log.Debug(req.Context(), "New broker client", "address", req.RemoteAddr, "userAgent", req.UserAgent(), - "user", user.UserName) + log.Debug(ctx, "New broker client", "client", client.String()) // Remove this client from the map of connected clients // when this handler exits. defer func() { - broker.closingClients <- messageChan + broker.closingClients <- client }() - // Listen to connection close and un-register messageChan - notify := req.Context().Done() + // Listen to client close and un-register messageChan + notify := ctx.Done() go func() { <-notify - broker.closingClients <- messageChan + broker.closingClients <- client }() for { // Write to the ResponseWriter // Server Sent Events compatible - _, _ = fmt.Fprintf(rw, "data: %s\n\n", <-messageChan) + event := <-client.channel + log.Trace(ctx, "Sending event to client", "event", string(event), "client", client.String()) + _, _ = fmt.Fprintf(rw, "data: %s\n\n", event) // Flush the data immediately instead of buffering it for later. flusher.Flush() @@ -115,7 +142,7 @@ func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } func (broker *broker) listen() { - keepAlive := time.NewTicker(15 * time.Second) + keepAlive := time.NewTicker(keepAliveFrequency) defer keepAlive.Stop() for { @@ -124,31 +151,34 @@ func (broker *broker) listen() { // A new client has connected. // Register their message channel broker.clients[s] = true - log.Debug("Client added to event broker", "numClients", len(broker.clients)) + log.Debug("Client added to event broker", "numClients", len(broker.clients), "newClient", s.String()) // Send a serverStart event to new client - s <- broker.formatEvent(&ServerStart{serverStart}) + s.channel <- broker.preparePackage(&ServerStart{serverStart}) case s := <-broker.closingClients: // A client has dettached and we want to // stop sending them messages. delete(broker.clients, s) - log.Debug("Removed client from event broker", "numClients", len(broker.clients)) + log.Debug("Removed client from event broker", "numClients", len(broker.clients), "client", s.String()) case event := <-broker.notifier: // We got a new event from the outside! // Send event to all connected clients - for clientMessageChan := range broker.clients { - clientMessageChan <- event + for client := range broker.clients { + log.Trace("Putting event on client's queue", "client", client.String(), "event", string(event)) + client.channel <- event } case ts := <-keepAlive.C: - // Send a keep alive packet every 15 seconds + // Send a keep alive message every 15 seconds broker.SendMessage(&KeepAlive{TS: ts.Unix()}) } } } +var serverStart time.Time + func init() { serverStart = time.Now() } diff --git a/ui/src/App.js b/ui/src/App.js index 2b9df8de9..c30daa92f 100644 --- a/ui/src/App.js +++ b/ui/src/App.js @@ -26,7 +26,7 @@ import { import createAdminStore from './store/createAdminStore' import { i18nProvider } from './i18n' import config from './config' -import { startEventStream } from './eventStream' +import { setDispatch, startEventStream } from './eventStream' const history = createHashHistory() @@ -60,7 +60,11 @@ const App = () => ( const Admin = (props) => { const dispatch = useDispatch() if (config.devActivityMenu) { - startEventStream(dispatch) + setDispatch(dispatch) + authProvider + .checkAuth() + .then(() => startEventStream()) + .catch(() => {}) // ignore if not logged in } return ( diff --git a/ui/src/authProvider.js b/ui/src/authProvider.js index a1f449490..18562305c 100644 --- a/ui/src/authProvider.js +++ b/ui/src/authProvider.js @@ -1,8 +1,9 @@ import jwtDecode from 'jwt-decode' import md5 from 'md5-hex' +import { v4 as uuidv4 } from 'uuid' import { baseUrl } from './utils' import config from './config' -import { v4 as uuidv4 } from 'uuid' +import { startEventStream, stopEventStream } from './eventStream' const authProvider = { login: ({ username, password }) => { @@ -38,6 +39,9 @@ const authProvider = { ) // Avoid going to create admin dialog after logout/login without a refresh config.firstTime = false + startEventStream().catch((e) => + console.log('error setting up event stream:', e) + ) return response }) .catch((error) => { @@ -53,6 +57,7 @@ const authProvider = { }, logout: () => { + stopEventStream() removeItems() return Promise.resolve() }, diff --git a/ui/src/eventStream.js b/ui/src/eventStream.js index 938413d01..1a5835302 100644 --- a/ui/src/eventStream.js +++ b/ui/src/eventStream.js @@ -1,19 +1,24 @@ import { baseUrl } from './utils' import throttle from 'lodash.throttle' import { processEvent, serverDown } from './actions' +import { httpClient } from './dataProvider' +import { REST_URL } from './consts' -let es = null -let dispatch = null -let timeout = null const defaultIntervalCheck = 20000 const reconnectIntervalCheck = 2000 let currentIntervalCheck = reconnectIntervalCheck +let es = null +let dispatch = null +let timeout = null -const getEventStream = () => { +const getEventStream = async () => { if (es === null) { - es = new EventSource( - baseUrl(`/app/api/events?jwt=${localStorage.getItem('token')}`) - ) + return httpClient(`${REST_URL}/keepalive/`).then(() => { + es = new EventSource( + baseUrl(`/app/api/events?jwt=${localStorage.getItem('token')}`) + ) + return es + }) } return es } @@ -33,29 +38,45 @@ const setTimeout = (value) => { }, currentIntervalCheck) } -export const startEventStream = (dispatchFunc) => { +const stopEventStream = () => { + if (es) { + es.close() + } + es = null + if (timeout != null) { + window.clearTimeout(timeout) + } + timeout = null +} + +const setDispatch = (dispatchFunc) => { dispatch = dispatchFunc +} + +const startEventStream = async () => { setTimeout(currentIntervalCheck) if (!localStorage.getItem('token')) { console.log('Cannot create a unauthenticated EventSource connection') return } - const es = getEventStream() - es.onmessage = throttle( - (msg) => { - const data = JSON.parse(msg.data) - if (data.name !== 'keepAlive') { - dispatch(processEvent(data)) - } - setTimeout(defaultIntervalCheck) // Reset timeout on every received message - }, - 100, - { trailing: true } - ) - es.onerror = (e) => { - setTimeout(reconnectIntervalCheck) - dispatch(serverDown()) - } - - return es + getEventStream().then((newStream) => { + newStream.onmessage = throttle( + (msg) => { + const data = JSON.parse(msg.data) + if (data.name !== 'keepAlive') { + dispatch(processEvent(data)) + } + setTimeout(defaultIntervalCheck) // Reset timeout on every received message + }, + 100, + { trailing: true } + ) + newStream.onerror = (e) => { + setTimeout(reconnectIntervalCheck) + dispatch(serverDown()) + } + es = newStream + }) } + +export { setDispatch, startEventStream, stopEventStream }