mirror of
https://github.com/SagerNet/sing.git
synced 2025-04-03 20:07:38 +03:00
89 lines
1.8 KiB
Go
89 lines
1.8 KiB
Go
package observable
|
|
|
|
import (
|
|
"os"
|
|
"sync"
|
|
)
|
|
|
|
type Observable[T any] interface {
|
|
Subscribe() (subscription Subscription[T], done <-chan struct{}, err error)
|
|
UnSubscribe(subscription Subscription[T])
|
|
}
|
|
|
|
type Observer[T any] struct {
|
|
subscriber *Subscriber[T]
|
|
listenerSize int
|
|
listener map[Subscription[T]]*Subscriber[T]
|
|
mux sync.Mutex
|
|
done bool
|
|
}
|
|
|
|
func NewObserver[T any](subscriber *Subscriber[T], listenerBufferSize int) *Observer[T] {
|
|
observable := &Observer[T]{
|
|
subscriber: subscriber,
|
|
listener: map[Subscription[T]]*Subscriber[T]{},
|
|
listenerSize: listenerBufferSize,
|
|
}
|
|
go observable.process()
|
|
return observable
|
|
}
|
|
|
|
func (o *Observer[T]) process() {
|
|
subscription, done := o.subscriber.Subscription()
|
|
process:
|
|
for {
|
|
select {
|
|
case <-done:
|
|
break process
|
|
case entry := <-subscription:
|
|
o.mux.Lock()
|
|
for _, sub := range o.listener {
|
|
sub.Emit(entry)
|
|
}
|
|
o.mux.Unlock()
|
|
}
|
|
}
|
|
o.mux.Lock()
|
|
defer o.mux.Unlock()
|
|
for _, listener := range o.listener {
|
|
listener.Close()
|
|
}
|
|
}
|
|
|
|
func (o *Observer[T]) Subscribe() (subscription Subscription[T], done <-chan struct{}, err error) {
|
|
o.mux.Lock()
|
|
defer o.mux.Unlock()
|
|
if o.done {
|
|
return nil, nil, os.ErrClosed
|
|
}
|
|
subscriber := NewSubscriber[T](o.listenerSize)
|
|
subscription, done = subscriber.Subscription()
|
|
o.listener[subscription] = subscriber
|
|
return
|
|
}
|
|
|
|
func (o *Observer[T]) UnSubscribe(subscription Subscription[T]) {
|
|
o.mux.Lock()
|
|
defer o.mux.Unlock()
|
|
subscriber, exist := o.listener[subscription]
|
|
if !exist {
|
|
return
|
|
}
|
|
delete(o.listener, subscription)
|
|
subscriber.Close()
|
|
}
|
|
|
|
func (o *Observer[T]) Emit(item T) {
|
|
o.subscriber.Emit(item)
|
|
}
|
|
|
|
func (o *Observer[T]) Close() error {
|
|
o.mux.Lock()
|
|
defer o.mux.Unlock()
|
|
if o.done {
|
|
return os.ErrClosed
|
|
}
|
|
o.subscriber.Close()
|
|
o.done = true
|
|
return nil
|
|
}
|