From 2b1a5f579ae7ca8210d43fe223c1f33479d43d77 Mon Sep 17 00:00:00 2001 From: Deluan Date: Sun, 8 Nov 2020 00:06:48 -0500 Subject: [PATCH] Adding a communication channel between server and clients using SSE --- cmd/wire_gen.go | 25 +++++- cmd/wire_injectors.go | 29 ++++++- scanner/scanner.go | 16 +++- server/app/app.go | 12 ++- server/events/events.go | 18 ++++ server/events/sse.go | 134 +++++++++++++++++++++++++++++ ui/src/App.js | 26 ++++-- ui/src/actions/activity.js | 12 +++ ui/src/actions/index.js | 1 + ui/src/eventStream.js | 30 +++++++ ui/src/layout/ActivityMenu.js | 80 +++++++++++++++++ ui/src/layout/AppBar.js | 18 ++-- ui/src/reducers/activityReducer.js | 16 ++++ ui/src/reducers/index.js | 1 + ui/src/subsonic/index.js | 2 +- 15 files changed, 395 insertions(+), 25 deletions(-) create mode 100644 server/events/events.go create mode 100644 server/events/sse.go create mode 100644 ui/src/actions/activity.js create mode 100644 ui/src/eventStream.js create mode 100644 ui/src/layout/ActivityMenu.js create mode 100644 ui/src/reducers/activityReducer.js diff --git a/cmd/wire_gen.go b/cmd/wire_gen.go index f6f552617..b6e6fba57 100644 --- a/cmd/wire_gen.go +++ b/cmd/wire_gen.go @@ -12,6 +12,7 @@ import ( "github.com/deluan/navidrome/scanner" "github.com/deluan/navidrome/server" "github.com/deluan/navidrome/server/app" + "github.com/deluan/navidrome/server/events" "github.com/deluan/navidrome/server/subsonic" "github.com/google/wire" "sync" @@ -27,7 +28,8 @@ func CreateServer(musicFolder string) *server.Server { func CreateAppRouter() *app.Router { dataStore := persistence.New() - router := app.New(dataStore) + broker := GetBroker() + router := app.New(dataStore, broker) return router } @@ -53,10 +55,16 @@ func createScanner() scanner.Scanner { artworkCache := core.GetImageCache() artwork := core.NewArtwork(dataStore, artworkCache) cacheWarmer := core.NewCacheWarmer(artwork, artworkCache) - scannerScanner := scanner.New(dataStore, cacheWarmer) + broker := GetBroker() + scannerScanner := scanner.New(dataStore, cacheWarmer, broker) return scannerScanner } +func createBroker() events.Broker { + broker := events.NewBroker() + return broker +} + // wire_injectors.go: var allProviders = wire.NewSet(core.Set, subsonic.New, app.New, persistence.New) @@ -73,3 +81,16 @@ func GetScanner() scanner.Scanner { }) return scannerInstance } + +// Broker must be a Singleton +var ( + onceBroker sync.Once + brokerInstance events.Broker +) + +func GetBroker() events.Broker { + onceBroker.Do(func() { + brokerInstance = createBroker() + }) + return brokerInstance +} diff --git a/cmd/wire_injectors.go b/cmd/wire_injectors.go index afd938b5b..6c45181d0 100644 --- a/cmd/wire_injectors.go +++ b/cmd/wire_injectors.go @@ -3,14 +3,16 @@ package cmd import ( + "sync" + "github.com/deluan/navidrome/core" "github.com/deluan/navidrome/persistence" "github.com/deluan/navidrome/scanner" "github.com/deluan/navidrome/server" "github.com/deluan/navidrome/server/app" + "github.com/deluan/navidrome/server/events" "github.com/deluan/navidrome/server/subsonic" "github.com/google/wire" - "sync" ) var allProviders = wire.NewSet( @@ -28,7 +30,10 @@ func CreateServer(musicFolder string) *server.Server { } func CreateAppRouter() *app.Router { - panic(wire.Build(allProviders)) + panic(wire.Build( + allProviders, + GetBroker, + )) } func CreateSubsonicAPIRouter() *subsonic.Router { @@ -54,6 +59,26 @@ func GetScanner() scanner.Scanner { func createScanner() scanner.Scanner { panic(wire.Build( allProviders, + GetBroker, scanner.New, )) } + +// Broker must be a Singleton +var ( + onceBroker sync.Once + brokerInstance events.Broker +) + +func GetBroker() events.Broker { + onceBroker.Do(func() { + brokerInstance = createBroker() + }) + return brokerInstance +} + +func createBroker() events.Broker { + panic(wire.Build( + events.NewBroker, + )) +} diff --git a/scanner/scanner.go b/scanner/scanner.go index 2fae98aac..93be68526 100644 --- a/scanner/scanner.go +++ b/scanner/scanner.go @@ -12,6 +12,7 @@ import ( "github.com/deluan/navidrome/core" "github.com/deluan/navidrome/log" "github.com/deluan/navidrome/model" + "github.com/deluan/navidrome/server/events" "github.com/deluan/navidrome/utils" ) @@ -47,6 +48,7 @@ type scanner struct { lock *sync.RWMutex ds model.DataStore cacheWarmer core.CacheWarmer + broker events.Broker done chan bool scan chan bool } @@ -57,10 +59,11 @@ type scanStatus struct { lastUpdate time.Time } -func New(ds model.DataStore, cacheWarmer core.CacheWarmer) Scanner { +func New(ds model.DataStore, cacheWarmer core.CacheWarmer, broker events.Broker) Scanner { s := &scanner{ ds: ds, cacheWarmer: cacheWarmer, + broker: broker, folders: map[string]FolderScanner{}, status: map[string]*scanStatus{}, lock: &sync.RWMutex{}, @@ -107,14 +110,21 @@ func (s *scanner) rescan(mediaFolder string, fullRescan bool) error { log.Debug("Scanning folder (full scan)", "folder", mediaFolder) } - progress := make(chan uint32) + progress := make(chan uint32, 100) go func() { + defer func() { + s.broker.SendMessage(&events.ScanStatus{Scanning: false, Count: int64(s.status[mediaFolder].count)}) + }() for { count, more := <-progress if !more { break } - atomic.AddUint32(&s.status[mediaFolder].count, count) + if count == 0 { + continue + } + total := atomic.AddUint32(&s.status[mediaFolder].count, count) + s.broker.SendMessage(&events.ScanStatus{Scanning: true, Count: int64(total)}) } }() diff --git a/server/app/app.go b/server/app/app.go index eecdc4f18..96419fc3a 100644 --- a/server/app/app.go +++ b/server/app/app.go @@ -11,6 +11,7 @@ import ( "github.com/deluan/navidrome/core/auth" "github.com/deluan/navidrome/log" "github.com/deluan/navidrome/model" + "github.com/deluan/navidrome/server/events" "github.com/deluan/rest" "github.com/go-chi/chi" "github.com/go-chi/httprate" @@ -18,12 +19,13 @@ import ( ) type Router struct { - ds model.DataStore - mux http.Handler + ds model.DataStore + mux http.Handler + broker events.Broker } -func New(ds model.DataStore) *Router { - return &Router{ds: ds} +func New(ds model.DataStore, broker events.Broker) *Router { + return &Router{ds: ds, broker: broker} } func (app *Router) Setup(path string) { @@ -68,6 +70,8 @@ func (app *Router) routes(path string) http.Handler { // Keepalive endpoint to be used to keep the session valid (ex: while playing songs) r.Get("/keepalive/*", func(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(`{"response":"ok"}`)) }) + + r.Handle("/events", app.broker) }) // Serve UI app assets diff --git a/server/events/events.go b/server/events/events.go new file mode 100644 index 000000000..8cf15a4d4 --- /dev/null +++ b/server/events/events.go @@ -0,0 +1,18 @@ +package events + +type Event interface { + EventName() string +} + +type ScanStatus struct { + Scanning bool `json:"scanning"` + Count int64 `json:"count"` +} + +func (s ScanStatus) EventName() string { return "scanStatus" } + +type KeepAlive struct { + TS int64 `json:"ts"` +} + +func (s KeepAlive) EventName() string { return "keepAlive" } diff --git a/server/events/sse.go b/server/events/sse.go new file mode 100644 index 000000000..7a31e9321 --- /dev/null +++ b/server/events/sse.go @@ -0,0 +1,134 @@ +// Based on https://thoughtbot.com/blog/writing-a-server-sent-events-server-in-go +package events + +import ( + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/deluan/navidrome/log" +) + +type Broker interface { + http.Handler + SendMessage(event Event) +} + +type broker struct { + // Events are pushed to this channel by the main events-gathering routine + Notifier chan []byte + + // New client connections + newClients chan chan []byte + + // Closed client connections + closingClients chan chan []byte + + // Client connections registry + clients map[chan []byte]bool +} + +func NewBroker() Broker { + // Instantiate a broker + broker := &broker{ + Notifier: make(chan []byte, 1), + newClients: make(chan chan []byte), + closingClients: make(chan chan []byte), + clients: make(map[chan []byte]bool), + } + + // Set it running - listening and broadcasting events + go broker.listen() + + return broker +} + +func (broker *broker) SendMessage(event Event) { + pkg := struct { + Event `json:"data"` + Name string `json:"name"` + }{} + pkg.Name = event.EventName() + pkg.Event = event + data, _ := json.Marshal(pkg) + broker.Notifier <- data +} + +func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + // Make sure that the writer supports flushing. + // + flusher, ok := rw.(http.Flusher) + + if !ok { + http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError) + return + } + + rw.Header().Set("Content-Type", "text/event-stream") + rw.Header().Set("Cache-Control", "no-cache, no-transform") + rw.Header().Set("Connection", "keep-alive") + rw.Header().Set("Access-Control-Allow-Origin", "*") + + // Each connection registers its own message channel with the Broker's connections registry + messageChan := make(chan []byte) + + // Signal the broker that we have a new connection + broker.newClients <- messageChan + + // Remove this client from the map of connected clients + // when this handler exits. + defer func() { + broker.closingClients <- messageChan + }() + + // Listen to connection close and un-register messageChan + // notify := rw.(http.CloseNotifier).CloseNotify() + notify := req.Context().Done() + + go func() { + <-notify + broker.closingClients <- messageChan + }() + + for { + // Write to the ResponseWriter + // Server Sent Events compatible + _, _ = fmt.Fprintf(rw, "data: %s\n\n", <-messageChan) + + // Flush the data immediately instead of buffering it for later. + flusher.Flush() + } +} + +func (broker *broker) listen() { + keepAlive := time.NewTicker(15 * time.Second) + defer keepAlive.Stop() + + for { + select { + case s := <-broker.newClients: + + // A new client has connected. + // Register their message channel + broker.clients[s] = true + log.Debug("Client added", "numClients", len(broker.clients)) + case s := <-broker.closingClients: + + // A client has dettached and we want to + // stop sending them messages. + delete(broker.clients, s) + log.Debug("Removed client", "numClients", len(broker.clients)) + case event := <-broker.Notifier: + + // We got a new event from the outside! + // Send event to all connected clients + for clientMessageChan := range broker.clients { + clientMessageChan <- event + } + case ts := <-keepAlive.C: + // Send a keep alive packet every 15 seconds + broker.SendMessage(&KeepAlive{TS: ts.Unix()}) + } + } +} diff --git a/ui/src/App.js b/ui/src/App.js index ae219623d..eb98ff480 100644 --- a/ui/src/App.js +++ b/ui/src/App.js @@ -1,9 +1,9 @@ import React from 'react' import ReactGA from 'react-ga' import 'react-jinke-music-player/assets/index.css' -import { Provider } from 'react-redux' +import { Provider, useDispatch } from 'react-redux' import { createHashHistory } from 'history' -import { Admin, Resource } from 'react-admin' +import { Admin as RAAdmin, Resource } from 'react-admin' import dataProvider from './dataProvider' import authProvider from './authProvider' import { Layout, Login, Logout } from './layout' @@ -21,10 +21,13 @@ import { addToPlaylistDialogReducer, playQueueReducer, albumViewReducer, + activityReducer, } from './reducers' import createAdminStore from './store/createAdminStore' import { i18nProvider } from './i18n' import config from './config' +import { startEventStream } from './eventStream' +import { updateScanStatus } from './actions' const history = createHashHistory() if (config.gaTrackingId) { @@ -46,10 +49,20 @@ const App = () => ( albumView: albumViewReducer, theme: themeReducer, addToPlaylistDialog: addToPlaylistDialogReducer, + activity: activityReducer, }, })} > - + +) + +const Admin = (props) => { + const dispatch = useDispatch() + startEventStream((data) => dispatch(updateScanStatus(data))) + + return ( + ( layout={Layout} loginPage={Login} logoutButton={Logout} + {...props} > {(permissions) => [ , @@ -91,8 +105,8 @@ const App = () => ( , ]} - - -) + + ) +} export default App diff --git a/ui/src/actions/activity.js b/ui/src/actions/activity.js new file mode 100644 index 000000000..b2c32fd45 --- /dev/null +++ b/ui/src/actions/activity.js @@ -0,0 +1,12 @@ +export const ACTIVITY_SCAN_STATUS_UPD = 'ACTIVITY_SCAN_STATUS_UPD' + +const actionsMap = { scanStatus: ACTIVITY_SCAN_STATUS_UPD } + +export const updateScanStatus = (data) => { + let type = actionsMap[data.name] + if (!type) type = 'UNKNOWN' + return { + type, + data: data.data, + } +} diff --git a/ui/src/actions/index.js b/ui/src/actions/index.js index 23e5c5574..8a46b9ceb 100644 --- a/ui/src/actions/index.js +++ b/ui/src/actions/index.js @@ -2,3 +2,4 @@ export * from './audioplayer' export * from './themes' export * from './albumView' export * from './dialogs' +export * from './activity' diff --git a/ui/src/eventStream.js b/ui/src/eventStream.js new file mode 100644 index 000000000..add7d156c --- /dev/null +++ b/ui/src/eventStream.js @@ -0,0 +1,30 @@ +import baseUrl from './utils/baseUrl' +import throttle from 'lodash.throttle' + +// TODO https://stackoverflow.com/a/20060461 +let es = null +let dispatchFunc = null + +const getEventStream = () => { + if (es === null) { + es = new EventSource( + baseUrl(`/app/api/events?jwt=${localStorage.getItem('token')}`) + ) + } + return es +} + +export const startEventStream = (func) => { + const es = getEventStream() + dispatchFunc = func + es.onmessage = throttle( + (msg) => { + const data = JSON.parse(msg.data) + if (data.name !== 'keepAlive') { + dispatchFunc(data) + } + }, + 100, + { trailing: true } + ) +} diff --git a/ui/src/layout/ActivityMenu.js b/ui/src/layout/ActivityMenu.js new file mode 100644 index 000000000..70c6c68a0 --- /dev/null +++ b/ui/src/layout/ActivityMenu.js @@ -0,0 +1,80 @@ +import React, { useState } from 'react' +import { useSelector } from 'react-redux' +import { + Menu, + Badge, + CircularProgress, + IconButton, + makeStyles, + Tooltip, + MenuItem, +} from '@material-ui/core' +import { FiActivity } from 'react-icons/fi' +import subsonic from '../subsonic' + +const useStyles = makeStyles((theme) => ({ + wrapper: { + position: 'relative', + }, + progress: { + position: 'absolute', + top: -1, + left: 0, + zIndex: 1, + }, + button: { + zIndex: 2, + }, +})) + +const ActivityMenu = () => { + const classes = useStyles() + const [anchorEl, setAnchorEl] = useState(null) + const scanStatus = useSelector((state) => state.activity.scanStatus) + + const open = Boolean(anchorEl) + + const handleMenu = (event) => setAnchorEl(event.currentTarget) + const handleClose = () => setAnchorEl(null) + const startScan = () => fetch(subsonic.url('startScan', null)) + + return ( +
+ + + + + + + + {scanStatus.scanning && ( + + )} + + + {`Scanned: ${scanStatus.count}`} + + +
+ ) +} + +export default ActivityMenu diff --git a/ui/src/layout/AppBar.js b/ui/src/layout/AppBar.js index f7ac7ebdb..a5fef58c0 100644 --- a/ui/src/layout/AppBar.js +++ b/ui/src/layout/AppBar.js @@ -12,6 +12,7 @@ import ViewListIcon from '@material-ui/icons/ViewList' import InfoIcon from '@material-ui/icons/Info' import AboutDialog from './AboutDialog' import PersonalMenu from './PersonalMenu' +import ActivityMenu from './ActivityMenu' const useStyles = makeStyles((theme) => ({ root: { @@ -85,13 +86,16 @@ const CustomUserMenu = ({ onClick, ...rest }) => { } return ( - - -
- {resources.filter(settingsResources).map(renderSettingsMenuItemLink)} -
- -
+ <> + + + +
+ {resources.filter(settingsResources).map(renderSettingsMenuItemLink)} +
+ +
+ ) } diff --git a/ui/src/reducers/activityReducer.js b/ui/src/reducers/activityReducer.js new file mode 100644 index 000000000..95b1aaff7 --- /dev/null +++ b/ui/src/reducers/activityReducer.js @@ -0,0 +1,16 @@ +import { ACTIVITY_SCAN_STATUS_UPD } from '../actions' + +export const activityReducer = ( + previousState = { + scanStatus: { scanning: false, count: 0 }, + }, + payload +) => { + const { type, data } = payload + switch (type) { + case ACTIVITY_SCAN_STATUS_UPD: + return { ...previousState, scanStatus: data } + default: + return previousState + } +} diff --git a/ui/src/reducers/index.js b/ui/src/reducers/index.js index 28fbb1942..fd121d25f 100644 --- a/ui/src/reducers/index.js +++ b/ui/src/reducers/index.js @@ -2,3 +2,4 @@ export * from './themeReducer' export * from './dialogReducer' export * from './playQueue' export * from './albumView' +export * from './activityReducer' diff --git a/ui/src/subsonic/index.js b/ui/src/subsonic/index.js index 588bb330d..c59bb9f1d 100644 --- a/ui/src/subsonic/index.js +++ b/ui/src/subsonic/index.js @@ -9,7 +9,7 @@ const url = (command, id, options) => { params.append('f', 'json') params.append('v', '1.8.0') params.append('c', 'NavidromeUI') - params.append('id', id) + id && params.append('id', id) if (options) { if (options.ts) { options['_'] = new Date().getTime()