Make eventStream connection/reconnection more reliable

Also more logs on the server
This commit is contained in:
Deluan 2020-11-20 20:22:22 -05:00
parent c8c95bfb47
commit 3e8bee4f65
4 changed files with 121 additions and 61 deletions

View file

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"sync/atomic"
"time" "time"
"github.com/deluan/navidrome/log" "github.com/deluan/navidrome/log"
@ -16,29 +17,44 @@ type Broker interface {
SendMessage(event Event) SendMessage(event Event)
} }
var serverStart time.Time const keepAliveFrequency = 15 * time.Second
type (
messageChan chan []byte
clientsChan chan client
client struct {
address string
username string
userAgent string
channel messageChan
}
)
func (c client) String() string {
return fmt.Sprintf("%s (%s - %s)", c.username, c.address, c.userAgent)
}
type broker struct { type broker struct {
// Events are pushed to this channel by the main events-gathering routine // Events are pushed to this channel by the main events-gathering routine
notifier chan []byte notifier messageChan
// New client connections // New client connections
newClients chan chan []byte newClients clientsChan
// Closed client connections // Closed client connections
closingClients chan chan []byte closingClients clientsChan
// Client connections registry // Client connections registry
clients map[chan []byte]bool clients map[client]bool
} }
func NewBroker() Broker { func NewBroker() Broker {
// Instantiate a broker // Instantiate a broker
broker := &broker{ broker := &broker{
notifier: make(chan []byte, 100), notifier: make(messageChan, 100),
newClients: make(chan chan []byte), newClients: make(clientsChan),
closingClients: make(chan chan []byte), closingClients: make(clientsChan),
clients: make(map[chan []byte]bool), clients: make(map[client]bool),
} }
// Set it running - listening and broadcasting events // Set it running - listening and broadcasting events
@ -48,17 +64,21 @@ func NewBroker() Broker {
} }
func (broker *broker) SendMessage(event Event) { func (broker *broker) SendMessage(event Event) {
data := broker.formatEvent(event) pkg := broker.preparePackage(event)
log.Trace("Broker received new event", "name", event.EventName(), "payload", string(data)) log.Trace("Broker received new event", "name", event.EventName(), "event", string(pkg))
broker.notifier <- data broker.notifier <- pkg
} }
func (broker *broker) formatEvent(event Event) []byte { var eventId uint32
func (broker *broker) preparePackage(event Event) []byte {
pkg := struct { pkg := struct {
Event `json:"data"` Event `json:"data"`
Id uint32 `json:"id"`
Name string `json:"name"` Name string `json:"name"`
}{} }{}
pkg.Id = atomic.AddUint32(&eventId, 1)
pkg.Name = event.EventName() pkg.Name = event.EventName()
pkg.Event = event pkg.Event = event
data, _ := json.Marshal(pkg) data, _ := json.Marshal(pkg)
@ -66,10 +86,11 @@ func (broker *broker) formatEvent(event Event) []byte {
} }
func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
ctx := req.Context()
// Make sure that the writer supports flushing. // Make sure that the writer supports flushing.
flusher, ok := rw.(http.Flusher) flusher, ok := rw.(http.Flusher)
user, _ := request.UserFrom(ctx)
user, _ := request.UserFrom(req.Context())
if !ok { if !ok {
log.Error(rw, "Streaming unsupported! Events cannot be sent to this client", "address", req.RemoteAddr, log.Error(rw, "Streaming unsupported! Events cannot be sent to this client", "address", req.RemoteAddr,
"userAgent", req.UserAgent(), "user", user.UserName) "userAgent", req.UserAgent(), "user", user.UserName)
@ -83,31 +104,37 @@ func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("Access-Control-Allow-Origin", "*") rw.Header().Set("Access-Control-Allow-Origin", "*")
// Each connection registers its own message channel with the Broker's connections registry // Each connection registers its own message channel with the Broker's connections registry
messageChan := make(chan []byte) client := client{
username: user.UserName,
address: req.RemoteAddr,
userAgent: req.UserAgent(),
channel: make(messageChan),
}
// Signal the broker that we have a new connection // Signal the broker that we have a new client
broker.newClients <- messageChan broker.newClients <- client
log.Debug(req.Context(), "New broker client", "address", req.RemoteAddr, "userAgent", req.UserAgent(), log.Debug(ctx, "New broker client", "client", client.String())
"user", user.UserName)
// Remove this client from the map of connected clients // Remove this client from the map of connected clients
// when this handler exits. // when this handler exits.
defer func() { defer func() {
broker.closingClients <- messageChan broker.closingClients <- client
}() }()
// Listen to connection close and un-register messageChan // Listen to client close and un-register messageChan
notify := req.Context().Done() notify := ctx.Done()
go func() { go func() {
<-notify <-notify
broker.closingClients <- messageChan broker.closingClients <- client
}() }()
for { for {
// Write to the ResponseWriter // Write to the ResponseWriter
// Server Sent Events compatible // Server Sent Events compatible
_, _ = fmt.Fprintf(rw, "data: %s\n\n", <-messageChan) event := <-client.channel
log.Trace(ctx, "Sending event to client", "event", string(event), "client", client.String())
_, _ = fmt.Fprintf(rw, "data: %s\n\n", event)
// Flush the data immediately instead of buffering it for later. // Flush the data immediately instead of buffering it for later.
flusher.Flush() flusher.Flush()
@ -115,7 +142,7 @@ func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
} }
func (broker *broker) listen() { func (broker *broker) listen() {
keepAlive := time.NewTicker(15 * time.Second) keepAlive := time.NewTicker(keepAliveFrequency)
defer keepAlive.Stop() defer keepAlive.Stop()
for { for {
@ -124,31 +151,34 @@ func (broker *broker) listen() {
// A new client has connected. // A new client has connected.
// Register their message channel // Register their message channel
broker.clients[s] = true broker.clients[s] = true
log.Debug("Client added to event broker", "numClients", len(broker.clients)) log.Debug("Client added to event broker", "numClients", len(broker.clients), "newClient", s.String())
// Send a serverStart event to new client // Send a serverStart event to new client
s <- broker.formatEvent(&ServerStart{serverStart}) s.channel <- broker.preparePackage(&ServerStart{serverStart})
case s := <-broker.closingClients: case s := <-broker.closingClients:
// A client has dettached and we want to // A client has dettached and we want to
// stop sending them messages. // stop sending them messages.
delete(broker.clients, s) delete(broker.clients, s)
log.Debug("Removed client from event broker", "numClients", len(broker.clients)) log.Debug("Removed client from event broker", "numClients", len(broker.clients), "client", s.String())
case event := <-broker.notifier: case event := <-broker.notifier:
// We got a new event from the outside! // We got a new event from the outside!
// Send event to all connected clients // Send event to all connected clients
for clientMessageChan := range broker.clients { for client := range broker.clients {
clientMessageChan <- event log.Trace("Putting event on client's queue", "client", client.String(), "event", string(event))
client.channel <- event
} }
case ts := <-keepAlive.C: case ts := <-keepAlive.C:
// Send a keep alive packet every 15 seconds // Send a keep alive message every 15 seconds
broker.SendMessage(&KeepAlive{TS: ts.Unix()}) broker.SendMessage(&KeepAlive{TS: ts.Unix()})
} }
} }
} }
var serverStart time.Time
func init() { func init() {
serverStart = time.Now() serverStart = time.Now()
} }

View file

@ -26,7 +26,7 @@ import {
import createAdminStore from './store/createAdminStore' import createAdminStore from './store/createAdminStore'
import { i18nProvider } from './i18n' import { i18nProvider } from './i18n'
import config from './config' import config from './config'
import { startEventStream } from './eventStream' import { setDispatch, startEventStream } from './eventStream'
const history = createHashHistory() const history = createHashHistory()
@ -60,7 +60,11 @@ const App = () => (
const Admin = (props) => { const Admin = (props) => {
const dispatch = useDispatch() const dispatch = useDispatch()
if (config.devActivityMenu) { if (config.devActivityMenu) {
startEventStream(dispatch) setDispatch(dispatch)
authProvider
.checkAuth()
.then(() => startEventStream())
.catch(() => {}) // ignore if not logged in
} }
return ( return (

View file

@ -1,8 +1,9 @@
import jwtDecode from 'jwt-decode' import jwtDecode from 'jwt-decode'
import md5 from 'md5-hex' import md5 from 'md5-hex'
import { v4 as uuidv4 } from 'uuid'
import { baseUrl } from './utils' import { baseUrl } from './utils'
import config from './config' import config from './config'
import { v4 as uuidv4 } from 'uuid' import { startEventStream, stopEventStream } from './eventStream'
const authProvider = { const authProvider = {
login: ({ username, password }) => { login: ({ username, password }) => {
@ -38,6 +39,9 @@ const authProvider = {
) )
// Avoid going to create admin dialog after logout/login without a refresh // Avoid going to create admin dialog after logout/login without a refresh
config.firstTime = false config.firstTime = false
startEventStream().catch((e) =>
console.log('error setting up event stream:', e)
)
return response return response
}) })
.catch((error) => { .catch((error) => {
@ -53,6 +57,7 @@ const authProvider = {
}, },
logout: () => { logout: () => {
stopEventStream()
removeItems() removeItems()
return Promise.resolve() return Promise.resolve()
}, },

View file

@ -1,19 +1,24 @@
import { baseUrl } from './utils' import { baseUrl } from './utils'
import throttle from 'lodash.throttle' import throttle from 'lodash.throttle'
import { processEvent, serverDown } from './actions' import { processEvent, serverDown } from './actions'
import { httpClient } from './dataProvider'
import { REST_URL } from './consts'
let es = null
let dispatch = null
let timeout = null
const defaultIntervalCheck = 20000 const defaultIntervalCheck = 20000
const reconnectIntervalCheck = 2000 const reconnectIntervalCheck = 2000
let currentIntervalCheck = reconnectIntervalCheck let currentIntervalCheck = reconnectIntervalCheck
let es = null
let dispatch = null
let timeout = null
const getEventStream = () => { const getEventStream = async () => {
if (es === null) { if (es === null) {
es = new EventSource( return httpClient(`${REST_URL}/keepalive/`).then(() => {
baseUrl(`/app/api/events?jwt=${localStorage.getItem('token')}`) es = new EventSource(
) baseUrl(`/app/api/events?jwt=${localStorage.getItem('token')}`)
)
return es
})
} }
return es return es
} }
@ -33,29 +38,45 @@ const setTimeout = (value) => {
}, currentIntervalCheck) }, currentIntervalCheck)
} }
export const startEventStream = (dispatchFunc) => { const stopEventStream = () => {
if (es) {
es.close()
}
es = null
if (timeout != null) {
window.clearTimeout(timeout)
}
timeout = null
}
const setDispatch = (dispatchFunc) => {
dispatch = dispatchFunc dispatch = dispatchFunc
}
const startEventStream = async () => {
setTimeout(currentIntervalCheck) setTimeout(currentIntervalCheck)
if (!localStorage.getItem('token')) { if (!localStorage.getItem('token')) {
console.log('Cannot create a unauthenticated EventSource connection') console.log('Cannot create a unauthenticated EventSource connection')
return return
} }
const es = getEventStream() getEventStream().then((newStream) => {
es.onmessage = throttle( newStream.onmessage = throttle(
(msg) => { (msg) => {
const data = JSON.parse(msg.data) const data = JSON.parse(msg.data)
if (data.name !== 'keepAlive') { if (data.name !== 'keepAlive') {
dispatch(processEvent(data)) dispatch(processEvent(data))
} }
setTimeout(defaultIntervalCheck) // Reset timeout on every received message setTimeout(defaultIntervalCheck) // Reset timeout on every received message
}, },
100, 100,
{ trailing: true } { trailing: true }
) )
es.onerror = (e) => { newStream.onerror = (e) => {
setTimeout(reconnectIntervalCheck) setTimeout(reconnectIntervalCheck)
dispatch(serverDown()) dispatch(serverDown())
} }
es = newStream
return es })
} }
export { setDispatch, startEventStream, stopEventStream }