diff --git a/go.mod b/go.mod index d7c8440..bf3dddd 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/foxcpp/go-dovecot-sasl v0.0.0-20200522223722-c4699d7a24bf github.com/foxcpp/go-imap-backend-tests v0.0.0-20200802090154-7e6248c85a0e github.com/foxcpp/go-imap-i18nlevel v0.0.0-20200208001533-d6ec88553005 - github.com/foxcpp/go-imap-mess v0.0.0-20210718073110-d5eb968a0995 + github.com/foxcpp/go-imap-mess v0.0.0-20210718180745-f14f34d14a3b github.com/foxcpp/go-imap-namespace v0.0.0-20200802091432-08496dd8e0ed github.com/foxcpp/go-imap-sql v0.4.1-0.20210718082546-d38d40f5442c github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15 diff --git a/go.sum b/go.sum index 6bbc084..34db9ba 100644 --- a/go.sum +++ b/go.sum @@ -164,12 +164,11 @@ github.com/foxcpp/go-imap-i18nlevel v0.0.0-20200208001533-d6ec88553005 h1:pfoFtk github.com/foxcpp/go-imap-i18nlevel v0.0.0-20200208001533-d6ec88553005/go.mod h1:34FwxnjC2N+EFs2wMtsHevrZLWRKRuVU8wEcHWKq/nE= github.com/foxcpp/go-imap-idle v0.0.0-20200829140055-32dc40172769 h1:qMdULYMxKuAgboOllBNLmA7wQ4YEwnhvq8onksrMC3A= github.com/foxcpp/go-imap-idle v0.0.0-20200829140055-32dc40172769/go.mod h1:PLnHIusEiOdmy63Y7IL2RjShIk4cyFi3a8MTC/WcLkk= -github.com/foxcpp/go-imap-mess v0.0.0-20210718073110-d5eb968a0995 h1:UXksj5CP+1Zg5GCD74JEijuQ2C22vXFgquhdG/YlXuI= github.com/foxcpp/go-imap-mess v0.0.0-20210718073110-d5eb968a0995/go.mod h1:cps13jIcqI/3FGVJQ8azz3apLjikGoKKcB6xb65VSag= +github.com/foxcpp/go-imap-mess v0.0.0-20210718180745-f14f34d14a3b h1:O85JcWCduTZz5FkqCrQX79wc88lgSezbTqrWNgXyEow= +github.com/foxcpp/go-imap-mess v0.0.0-20210718180745-f14f34d14a3b/go.mod h1:cps13jIcqI/3FGVJQ8azz3apLjikGoKKcB6xb65VSag= github.com/foxcpp/go-imap-namespace v0.0.0-20200802091432-08496dd8e0ed h1:1Jo7geyvunrPSjL6F6D9EcXoNApS5v3LQaro7aUNPnE= github.com/foxcpp/go-imap-namespace v0.0.0-20200802091432-08496dd8e0ed/go.mod h1:Shows1vmkBWO40ChOClaUe6DUnZrsP1UPAuoWzIUdgQ= -github.com/foxcpp/go-imap-sql v0.4.1-0.20210718081250-7f103db60f22 h1:ZXgI+FkKj+iKSrI58CZEkbSSojV84jm4LjEwglyH4Ik= -github.com/foxcpp/go-imap-sql v0.4.1-0.20210718081250-7f103db60f22/go.mod h1:kl+x+noffdBsp1pAR+PTHupLoxHnJZZw6BFcmmCZeUI= github.com/foxcpp/go-imap-sql v0.4.1-0.20210718082546-d38d40f5442c h1:bNaw79UJEonuRo1box0wUGGBT2V/XGkhT2NakjZnRIs= github.com/foxcpp/go-imap-sql v0.4.1-0.20210718082546-d38d40f5442c/go.mod h1:kl+x+noffdBsp1pAR+PTHupLoxHnJZZw6BFcmmCZeUI= github.com/foxcpp/go-mockdns v0.0.0-20191216195825-5eabd8dbfe1f/go.mod h1:tPg4cp4nseejPd+UKxtCVQ2hUxNTZ7qQZJa7CLriIeo= diff --git a/internal/storage/imapsql/imapsql.go b/internal/storage/imapsql/imapsql.go index 4ebc731..a800716 100644 --- a/internal/storage/imapsql/imapsql.go +++ b/internal/storage/imapsql/imapsql.go @@ -48,6 +48,7 @@ import ( "github.com/foxcpp/maddy/framework/module" "github.com/foxcpp/maddy/internal/authz" "github.com/foxcpp/maddy/internal/updatepipe" + "github.com/foxcpp/maddy/internal/updatepipe/pubsub" _ "github.com/go-sql-driver/mysql" _ "github.com/lib/pq" @@ -252,8 +253,22 @@ func (store *Storage) EnableUpdatePipe(mode updatepipe.BackendMode) error { store.Log.DebugMsg("using unix socket for external updates", "path", sockPath) store.updPipe = &updatepipe.UnixSockPipe{ SockPath: sockPath, - Log: log.Logger{Name: "sql/updpipe", Debug: store.Log.Debug}, + Log: log.Logger{Name: "storage.imapsql/updpipe", Debug: store.Log.Debug}, } + case "postgres": + store.Log.DebugMsg("using PostgreSQL broker for external updates") + ps, err := pubsub.NewPQ(strings.Join(store.dsn, " ")) + if err != nil { + return fmt.Errorf("enable_update_pipe: %w", err) + } + ps.Log = log.Logger{Name: "storage.imapsql/updpipe/pubsub", Debug: store.Log.Debug} + pipe := &updatepipe.PubSubPipe{ + PubSub: ps, + Log: log.Logger{Name: "storage.imapsql/updpipe", Debug: store.Log.Debug}, + } + store.Back.UpdateManager().ExternalUnsubscribe = pipe.Unsubscribe + store.Back.UpdateManager().ExternalSubscribe = pipe.Subscribe + store.updPipe = pipe default: return errors.New("imapsql: driver does not have an update pipe implementation") } diff --git a/internal/updatepipe/pubsub/pq.go b/internal/updatepipe/pubsub/pq.go new file mode 100644 index 0000000..29f9c52 --- /dev/null +++ b/internal/updatepipe/pubsub/pq.go @@ -0,0 +1,86 @@ +package pubsub + +import ( + "context" + "database/sql" + "time" + + "github.com/foxcpp/maddy/framework/log" + "github.com/lib/pq" +) + +type Msg struct { + Key string + Payload string +} + +type PqPubSub struct { + Notify chan Msg + + L *pq.Listener + sender *sql.DB + + Log log.Logger +} + +func NewPQ(dsn string) (*PqPubSub, error) { + l := &PqPubSub{ + Log: log.Logger{Name: "pgpubsub"}, + Notify: make(chan Msg), + } + l.L = pq.NewListener(dsn, 10*time.Second, time.Minute, l.eventHandler) + var err error + l.sender, err = sql.Open("postgres", dsn) + if err != nil { + return nil, err + } + + go func() { + defer close(l.Notify) + for n := range l.L.Notify { + if n == nil { + continue + } + + l.Notify <- Msg{Key: n.Channel, Payload: n.Extra} + } + }() + + return l, nil +} + +func (l *PqPubSub) Close() error { + l.sender.Close() + l.L.Close() + return nil +} + +func (l *PqPubSub) eventHandler(ev pq.ListenerEventType, err error) { + switch ev { + case pq.ListenerEventConnected: + l.Log.DebugMsg("connected") + case pq.ListenerEventReconnected: + l.Log.Msg("connection reestablished") + case pq.ListenerEventConnectionAttemptFailed: + l.Log.Error("connection attempt failed", err) + case pq.ListenerEventDisconnected: + l.Log.Msg("connection closed", "err", err) + } +} + +func (l *PqPubSub) Subscribe(_ context.Context, key string) error { + return l.L.Listen(key) +} + +func (l *PqPubSub) Unsubscribe(_ context.Context, key string) error { + return l.L.Unlisten(key) +} + +func (l *PqPubSub) Publish(key, payload string) error { + _, err := l.sender.Exec(`SELECT pg_notify($1, $2)`, key, payload) + return err +} + +func (l *PqPubSub) Listener() chan Msg { + return l.Notify +} diff --git a/internal/updatepipe/pubsub/pubsub.go b/internal/updatepipe/pubsub/pubsub.go new file mode 100644 index 0000000..64480ab --- /dev/null +++ b/internal/updatepipe/pubsub/pubsub.go @@ -0,0 +1,11 @@ +package pubsub + +import "context" + +type PubSub interface { + Subscribe(ctx context.Context, key string) error + Unsubscribe(ctx context.Context, key string) error + Publish(key, payload string) error + Listener() chan Msg + Close() error +} diff --git a/internal/updatepipe/pubsub_pipe.go b/internal/updatepipe/pubsub_pipe.go new file mode 100644 index 0000000..b1cef67 --- /dev/null +++ b/internal/updatepipe/pubsub_pipe.go @@ -0,0 +1,101 @@ +package updatepipe + +import ( + "context" + "fmt" + "os" + "strconv" + + mess "github.com/foxcpp/go-imap-mess" + "github.com/foxcpp/maddy/framework/log" + "github.com/foxcpp/maddy/internal/updatepipe/pubsub" +) + +type PubSubPipe struct { + PubSub pubsub.PubSub + Log log.Logger +} + +func (p *PubSubPipe) Listen(upds chan<- mess.Update) error { + go func() { + for m := range p.PubSub.Listener() { + id, upd, err := parseUpdate(m.Payload) + if err != nil { + p.Log.Error("failed to parse update", err) + continue + } + if id == p.myID() { + continue + } + upds <- *upd + } + }() + return nil +} + +func (p *PubSubPipe) InitPush() error { + return nil +} + +func (p *PubSubPipe) myID() string { + return fmt.Sprintf("%d-%p", os.Getpid(), p) +} + +func (p *PubSubPipe) channel(key interface{}) (string, error) { + var psKey string + switch k := key.(type) { + case string: + psKey = k + case uint64: + psKey = "__uint64_" + strconv.FormatUint(k, 10) + default: + return "", fmt.Errorf("updatepipe: key type must be either string or uint64") + } + return psKey, nil +} + +func (p *PubSubPipe) Subscribe(key interface{}) { + psKey, err := p.channel(key) + if err != nil { + p.Log.Error("invalid key passed to Subscribe", err) + return + } + + if err := p.PubSub.Subscribe(context.TODO(), psKey); err != nil { + p.Log.Error("pubsub subscribe failed", err) + } else { + p.Log.DebugMsg("subscribed to pubsub", "channel", psKey) + } +} + +func (p *PubSubPipe) Unsubscribe(key interface{}) { + psKey, err := p.channel(key) + if err != nil { + p.Log.Error("invalid key passed to Unsubscribe", err) + return + } + + if err := p.PubSub.Unsubscribe(context.TODO(), psKey); err != nil { + p.Log.Error("pubsub unsubscribe failed", err) + } else { + p.Log.DebugMsg("unsubscribed from pubsub", "channel", psKey) + } +} + +func (p *PubSubPipe) Push(upd mess.Update) error { + psKey, err := p.channel(upd.Key) + if err != nil { + return err + } + + updBlob, err := formatUpdate(p.myID(), upd) + if err != nil { + return err + } + + return p.PubSub.Publish(psKey, updBlob) +} + +func (p *PubSubPipe) Close() error { + return p.PubSub.Close() +}