mirror of
https://github.com/foxcpp/maddy.git
synced 2025-04-04 21:47:40 +03:00
101 lines
2 KiB
Go
101 lines
2 KiB
Go
package updatepipe
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"strconv"
|
|
|
|
mess "github.com/foxcpp/go-imap-mess"
|
|
"github.com/foxcpp/maddy/framework/log"
|
|
"github.com/foxcpp/maddy/internal/updatepipe/pubsub"
|
|
)
|
|
|
|
type PubSubPipe struct {
|
|
PubSub pubsub.PubSub
|
|
Log log.Logger
|
|
}
|
|
|
|
func (p *PubSubPipe) Listen(upds chan<- mess.Update) error {
|
|
go func() {
|
|
for m := range p.PubSub.Listener() {
|
|
id, upd, err := parseUpdate(m.Payload)
|
|
if err != nil {
|
|
p.Log.Error("failed to parse update", err)
|
|
continue
|
|
}
|
|
if id == p.myID() {
|
|
continue
|
|
}
|
|
upds <- *upd
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (p *PubSubPipe) InitPush() error {
|
|
return nil
|
|
}
|
|
|
|
func (p *PubSubPipe) myID() string {
|
|
return fmt.Sprintf("%d-%p", os.Getpid(), p)
|
|
}
|
|
|
|
func (p *PubSubPipe) channel(key interface{}) (string, error) {
|
|
var psKey string
|
|
switch k := key.(type) {
|
|
case string:
|
|
psKey = k
|
|
case uint64:
|
|
psKey = "__uint64_" + strconv.FormatUint(k, 10)
|
|
default:
|
|
return "", fmt.Errorf("updatepipe: key type must be either string or uint64")
|
|
}
|
|
return psKey, nil
|
|
}
|
|
|
|
func (p *PubSubPipe) Subscribe(key interface{}) {
|
|
psKey, err := p.channel(key)
|
|
if err != nil {
|
|
p.Log.Error("invalid key passed to Subscribe", err)
|
|
return
|
|
}
|
|
|
|
if err := p.PubSub.Subscribe(context.TODO(), psKey); err != nil {
|
|
p.Log.Error("pubsub subscribe failed", err)
|
|
} else {
|
|
p.Log.DebugMsg("subscribed to pubsub", "channel", psKey)
|
|
}
|
|
}
|
|
|
|
func (p *PubSubPipe) Unsubscribe(key interface{}) {
|
|
psKey, err := p.channel(key)
|
|
if err != nil {
|
|
p.Log.Error("invalid key passed to Unsubscribe", err)
|
|
return
|
|
}
|
|
|
|
if err := p.PubSub.Unsubscribe(context.TODO(), psKey); err != nil {
|
|
p.Log.Error("pubsub unsubscribe failed", err)
|
|
} else {
|
|
p.Log.DebugMsg("unsubscribed from pubsub", "channel", psKey)
|
|
}
|
|
}
|
|
|
|
func (p *PubSubPipe) Push(upd mess.Update) error {
|
|
psKey, err := p.channel(upd.Key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
updBlob, err := formatUpdate(p.myID(), upd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return p.PubSub.Publish(psKey, updBlob)
|
|
}
|
|
|
|
func (p *PubSubPipe) Close() error {
|
|
return p.PubSub.Close()
|
|
}
|