diff --git a/cmd/wire_gen.go b/cmd/wire_gen.go
index f6f552617..b6e6fba57 100644
--- a/cmd/wire_gen.go
+++ b/cmd/wire_gen.go
@@ -12,6 +12,7 @@ import (
"github.com/deluan/navidrome/scanner"
"github.com/deluan/navidrome/server"
"github.com/deluan/navidrome/server/app"
+ "github.com/deluan/navidrome/server/events"
"github.com/deluan/navidrome/server/subsonic"
"github.com/google/wire"
"sync"
@@ -27,7 +28,8 @@ func CreateServer(musicFolder string) *server.Server {
func CreateAppRouter() *app.Router {
dataStore := persistence.New()
- router := app.New(dataStore)
+ broker := GetBroker()
+ router := app.New(dataStore, broker)
return router
}
@@ -53,10 +55,16 @@ func createScanner() scanner.Scanner {
artworkCache := core.GetImageCache()
artwork := core.NewArtwork(dataStore, artworkCache)
cacheWarmer := core.NewCacheWarmer(artwork, artworkCache)
- scannerScanner := scanner.New(dataStore, cacheWarmer)
+ broker := GetBroker()
+ scannerScanner := scanner.New(dataStore, cacheWarmer, broker)
return scannerScanner
}
+func createBroker() events.Broker {
+ broker := events.NewBroker()
+ return broker
+}
+
// wire_injectors.go:
var allProviders = wire.NewSet(core.Set, subsonic.New, app.New, persistence.New)
@@ -73,3 +81,16 @@ func GetScanner() scanner.Scanner {
})
return scannerInstance
}
+
+// Broker must be a Singleton
+var (
+ onceBroker sync.Once
+ brokerInstance events.Broker
+)
+
+func GetBroker() events.Broker {
+ onceBroker.Do(func() {
+ brokerInstance = createBroker()
+ })
+ return brokerInstance
+}
diff --git a/cmd/wire_injectors.go b/cmd/wire_injectors.go
index afd938b5b..6c45181d0 100644
--- a/cmd/wire_injectors.go
+++ b/cmd/wire_injectors.go
@@ -3,14 +3,16 @@
package cmd
import (
+ "sync"
+
"github.com/deluan/navidrome/core"
"github.com/deluan/navidrome/persistence"
"github.com/deluan/navidrome/scanner"
"github.com/deluan/navidrome/server"
"github.com/deluan/navidrome/server/app"
+ "github.com/deluan/navidrome/server/events"
"github.com/deluan/navidrome/server/subsonic"
"github.com/google/wire"
- "sync"
)
var allProviders = wire.NewSet(
@@ -28,7 +30,10 @@ func CreateServer(musicFolder string) *server.Server {
}
func CreateAppRouter() *app.Router {
- panic(wire.Build(allProviders))
+ panic(wire.Build(
+ allProviders,
+ GetBroker,
+ ))
}
func CreateSubsonicAPIRouter() *subsonic.Router {
@@ -54,6 +59,26 @@ func GetScanner() scanner.Scanner {
func createScanner() scanner.Scanner {
panic(wire.Build(
allProviders,
+ GetBroker,
scanner.New,
))
}
+
+// Broker must be a Singleton
+var (
+ onceBroker sync.Once
+ brokerInstance events.Broker
+)
+
+func GetBroker() events.Broker {
+ onceBroker.Do(func() {
+ brokerInstance = createBroker()
+ })
+ return brokerInstance
+}
+
+func createBroker() events.Broker {
+ panic(wire.Build(
+ events.NewBroker,
+ ))
+}
diff --git a/scanner/scanner.go b/scanner/scanner.go
index 2fae98aac..93be68526 100644
--- a/scanner/scanner.go
+++ b/scanner/scanner.go
@@ -12,6 +12,7 @@ import (
"github.com/deluan/navidrome/core"
"github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model"
+ "github.com/deluan/navidrome/server/events"
"github.com/deluan/navidrome/utils"
)
@@ -47,6 +48,7 @@ type scanner struct {
lock *sync.RWMutex
ds model.DataStore
cacheWarmer core.CacheWarmer
+ broker events.Broker
done chan bool
scan chan bool
}
@@ -57,10 +59,11 @@ type scanStatus struct {
lastUpdate time.Time
}
-func New(ds model.DataStore, cacheWarmer core.CacheWarmer) Scanner {
+func New(ds model.DataStore, cacheWarmer core.CacheWarmer, broker events.Broker) Scanner {
s := &scanner{
ds: ds,
cacheWarmer: cacheWarmer,
+ broker: broker,
folders: map[string]FolderScanner{},
status: map[string]*scanStatus{},
lock: &sync.RWMutex{},
@@ -107,14 +110,21 @@ func (s *scanner) rescan(mediaFolder string, fullRescan bool) error {
log.Debug("Scanning folder (full scan)", "folder", mediaFolder)
}
- progress := make(chan uint32)
+ progress := make(chan uint32, 100)
go func() {
+ defer func() {
+ s.broker.SendMessage(&events.ScanStatus{Scanning: false, Count: int64(s.status[mediaFolder].count)})
+ }()
for {
count, more := <-progress
if !more {
break
}
- atomic.AddUint32(&s.status[mediaFolder].count, count)
+ if count == 0 {
+ continue
+ }
+ total := atomic.AddUint32(&s.status[mediaFolder].count, count)
+ s.broker.SendMessage(&events.ScanStatus{Scanning: true, Count: int64(total)})
}
}()
diff --git a/server/app/app.go b/server/app/app.go
index eecdc4f18..96419fc3a 100644
--- a/server/app/app.go
+++ b/server/app/app.go
@@ -11,6 +11,7 @@ import (
"github.com/deluan/navidrome/core/auth"
"github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model"
+ "github.com/deluan/navidrome/server/events"
"github.com/deluan/rest"
"github.com/go-chi/chi"
"github.com/go-chi/httprate"
@@ -18,12 +19,13 @@ import (
)
type Router struct {
- ds model.DataStore
- mux http.Handler
+ ds model.DataStore
+ mux http.Handler
+ broker events.Broker
}
-func New(ds model.DataStore) *Router {
- return &Router{ds: ds}
+func New(ds model.DataStore, broker events.Broker) *Router {
+ return &Router{ds: ds, broker: broker}
}
func (app *Router) Setup(path string) {
@@ -68,6 +70,8 @@ func (app *Router) routes(path string) http.Handler {
// Keepalive endpoint to be used to keep the session valid (ex: while playing songs)
r.Get("/keepalive/*", func(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(`{"response":"ok"}`)) })
+
+ r.Handle("/events", app.broker)
})
// Serve UI app assets
diff --git a/server/events/events.go b/server/events/events.go
new file mode 100644
index 000000000..8cf15a4d4
--- /dev/null
+++ b/server/events/events.go
@@ -0,0 +1,18 @@
+package events
+
+type Event interface {
+ EventName() string
+}
+
+type ScanStatus struct {
+ Scanning bool `json:"scanning"`
+ Count int64 `json:"count"`
+}
+
+func (s ScanStatus) EventName() string { return "scanStatus" }
+
+type KeepAlive struct {
+ TS int64 `json:"ts"`
+}
+
+func (s KeepAlive) EventName() string { return "keepAlive" }
diff --git a/server/events/sse.go b/server/events/sse.go
new file mode 100644
index 000000000..7a31e9321
--- /dev/null
+++ b/server/events/sse.go
@@ -0,0 +1,134 @@
+// Based on https://thoughtbot.com/blog/writing-a-server-sent-events-server-in-go
+package events
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "time"
+
+ "github.com/deluan/navidrome/log"
+)
+
+type Broker interface {
+ http.Handler
+ SendMessage(event Event)
+}
+
+type broker struct {
+ // Events are pushed to this channel by the main events-gathering routine
+ Notifier chan []byte
+
+ // New client connections
+ newClients chan chan []byte
+
+ // Closed client connections
+ closingClients chan chan []byte
+
+ // Client connections registry
+ clients map[chan []byte]bool
+}
+
+func NewBroker() Broker {
+ // Instantiate a broker
+ broker := &broker{
+ Notifier: make(chan []byte, 1),
+ newClients: make(chan chan []byte),
+ closingClients: make(chan chan []byte),
+ clients: make(map[chan []byte]bool),
+ }
+
+ // Set it running - listening and broadcasting events
+ go broker.listen()
+
+ return broker
+}
+
+func (broker *broker) SendMessage(event Event) {
+ pkg := struct {
+ Event `json:"data"`
+ Name string `json:"name"`
+ }{}
+ pkg.Name = event.EventName()
+ pkg.Event = event
+ data, _ := json.Marshal(pkg)
+ broker.Notifier <- data
+}
+
+func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
+ // Make sure that the writer supports flushing.
+ //
+ flusher, ok := rw.(http.Flusher)
+
+ if !ok {
+ http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
+ return
+ }
+
+ rw.Header().Set("Content-Type", "text/event-stream")
+ rw.Header().Set("Cache-Control", "no-cache, no-transform")
+ rw.Header().Set("Connection", "keep-alive")
+ rw.Header().Set("Access-Control-Allow-Origin", "*")
+
+ // Each connection registers its own message channel with the Broker's connections registry
+ messageChan := make(chan []byte)
+
+ // Signal the broker that we have a new connection
+ broker.newClients <- messageChan
+
+ // Remove this client from the map of connected clients
+ // when this handler exits.
+ defer func() {
+ broker.closingClients <- messageChan
+ }()
+
+ // Listen to connection close and un-register messageChan
+ // notify := rw.(http.CloseNotifier).CloseNotify()
+ notify := req.Context().Done()
+
+ go func() {
+ <-notify
+ broker.closingClients <- messageChan
+ }()
+
+ for {
+ // Write to the ResponseWriter
+ // Server Sent Events compatible
+ _, _ = fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)
+
+ // Flush the data immediately instead of buffering it for later.
+ flusher.Flush()
+ }
+}
+
+func (broker *broker) listen() {
+ keepAlive := time.NewTicker(15 * time.Second)
+ defer keepAlive.Stop()
+
+ for {
+ select {
+ case s := <-broker.newClients:
+
+ // A new client has connected.
+ // Register their message channel
+ broker.clients[s] = true
+ log.Debug("Client added", "numClients", len(broker.clients))
+ case s := <-broker.closingClients:
+
+ // A client has dettached and we want to
+ // stop sending them messages.
+ delete(broker.clients, s)
+ log.Debug("Removed client", "numClients", len(broker.clients))
+ case event := <-broker.Notifier:
+
+ // We got a new event from the outside!
+ // Send event to all connected clients
+ for clientMessageChan := range broker.clients {
+ clientMessageChan <- event
+ }
+ case ts := <-keepAlive.C:
+ // Send a keep alive packet every 15 seconds
+ broker.SendMessage(&KeepAlive{TS: ts.Unix()})
+ }
+ }
+}
diff --git a/ui/src/App.js b/ui/src/App.js
index ae219623d..eb98ff480 100644
--- a/ui/src/App.js
+++ b/ui/src/App.js
@@ -1,9 +1,9 @@
import React from 'react'
import ReactGA from 'react-ga'
import 'react-jinke-music-player/assets/index.css'
-import { Provider } from 'react-redux'
+import { Provider, useDispatch } from 'react-redux'
import { createHashHistory } from 'history'
-import { Admin, Resource } from 'react-admin'
+import { Admin as RAAdmin, Resource } from 'react-admin'
import dataProvider from './dataProvider'
import authProvider from './authProvider'
import { Layout, Login, Logout } from './layout'
@@ -21,10 +21,13 @@ import {
addToPlaylistDialogReducer,
playQueueReducer,
albumViewReducer,
+ activityReducer,
} from './reducers'
import createAdminStore from './store/createAdminStore'
import { i18nProvider } from './i18n'
import config from './config'
+import { startEventStream } from './eventStream'
+import { updateScanStatus } from './actions'
const history = createHashHistory()
if (config.gaTrackingId) {
@@ -46,10 +49,20 @@ const App = () => (
albumView: albumViewReducer,
theme: themeReducer,
addToPlaylistDialog: addToPlaylistDialogReducer,
+ activity: activityReducer,
},
})}
>
-