sing/common/observable/observer.go
2022-07-12 14:05:58 +08:00

89 lines
1.8 KiB
Go

package observable
import (
"os"
"sync"
)
// Observable is a source of values of type T that consumers can attach to
// and detach from.
type Observable[T any] interface {
// Subscribe registers a new consumer. It returns the subscription to read
// from, a done channel, and an error when no subscription could be created.
// NOTE(review): presumably done fires when the subscription ends — confirm
// against the Subscriber implementation.
Subscribe() (subscription Subscription[T], done <-chan struct{}, err error)
// UnSubscribe detaches a subscription previously returned by Subscribe.
UnSubscribe(subscription Subscription[T])
}
// Observer fans entries out to a set of listeners: it reads from one source
// Subscriber and emits every entry to each listener registered via Subscribe.
type Observer[T any] struct {
// subscriber is the source whose entries are forwarded to the listeners.
subscriber *Subscriber[T]
// listenerSize is the size passed to NewSubscriber for every new listener.
listenerSize int
// listener maps each subscription handed out by Subscribe to the
// Subscriber that backs it.
listener map[Subscription[T]]*Subscriber[T]
// mux guards listener and done.
mux sync.Mutex
// done marks the observer as shut down; Subscribe refuses new listeners
// once it is set.
done bool
}
// NewObserver builds an Observer that forwards every entry received from
// subscriber to its registered listeners. listenerBufferSize is the size
// handed to NewSubscriber for each listener created by Subscribe.
//
// The forwarding loop (process) is started in its own goroutine before
// NewObserver returns.
func NewObserver[T any](subscriber *Subscriber[T], listenerBufferSize int) *Observer[T] {
	o := new(Observer[T])
	o.subscriber = subscriber
	o.listenerSize = listenerBufferSize
	o.listener = make(map[Subscription[T]]*Subscriber[T])
	go o.process()
	return o
}
// process is the forwarding loop started by NewObserver. It receives entries
// from the source subscriber and emits each one to every registered listener
// until the source's done channel fires, then shuts the observer down.
//
// Listeners are iterated while holding mux, so Subscribe and UnSubscribe wait
// while an entry is being fanned out. Because select picks among ready cases
// at random, entries still pending on the subscription when done fires are
// not guaranteed to be delivered.
//
// On shutdown the observer is marked done and every listener is closed and
// removed from the map. Marking done here (and not only in Close) matters
// when the source subscriber is closed directly rather than through
// Observer.Close: without it Subscribe would keep registering listeners that
// are never fed and never closed. Emptying the map keeps a later UnSubscribe
// from closing a listener a second time.
func (o *Observer[T]) process() {
	subscription, done := o.subscriber.Subscription()
loop:
	for {
		select {
		case <-done:
			break loop
		case entry := <-subscription:
			o.mux.Lock()
			for _, listener := range o.listener {
				listener.Emit(entry)
			}
			o.mux.Unlock()
		}
	}

	o.mux.Lock()
	defer o.mux.Unlock()
	o.done = true
	// Deleting the current key while ranging over a map is safe in Go.
	for key, listener := range o.listener {
		listener.Close()
		delete(o.listener, key)
	}
}
// Subscribe registers a new listener and returns its subscription together
// with the listener's done channel. The listener is created with
// NewSubscriber using the observer's configured listener size.
//
// It returns os.ErrClosed (and nil for the other results) once the observer
// has been marked done.
func (o *Observer[T]) Subscribe() (subscription Subscription[T], done <-chan struct{}, err error) {
	o.mux.Lock()
	defer o.mux.Unlock()

	if o.done {
		return nil, nil, os.ErrClosed
	}

	listener := NewSubscriber[T](o.listenerSize)
	subscription, done = listener.Subscription()
	// The subscription itself is the key UnSubscribe later looks up.
	o.listener[subscription] = listener
	return subscription, done, nil
}
// UnSubscribe removes the listener registered under subscription and closes
// it. A subscription that is not registered is ignored.
func (o *Observer[T]) UnSubscribe(subscription Subscription[T]) {
	o.mux.Lock()
	defer o.mux.Unlock()

	if listener, registered := o.listener[subscription]; registered {
		delete(o.listener, subscription)
		listener.Close()
	}
}
// Emit hands item to the source subscriber; the forwarding loop (process)
// picks it up from the source's subscription and passes it to the listeners.
func (o *Observer[T]) Emit(item T) {
	source := o.subscriber
	source.Emit(item)
}
// Close closes the source subscriber and marks the observer done, after which
// Subscribe rejects new listeners. The listeners themselves are closed by the
// forwarding loop (process) once it observes the source's done channel.
//
// It returns os.ErrClosed if the observer was already marked done, and nil
// otherwise.
func (o *Observer[T]) Close() error {
	o.mux.Lock()
	defer o.mux.Unlock()

	if !o.done {
		o.subscriber.Close()
		o.done = true
		return nil
	}
	return os.ErrClosed
}