storage/imapsql: Add support for using PostgreSQL broker for updates

This commit is contained in:
fox.cpp 2021-07-18 21:12:12 +03:00
parent 8b494ff5d7
commit 461cf0b90f
No known key found for this signature in database
GPG key ID: 5B991F6215D2FCC0
6 changed files with 217 additions and 5 deletions

2
go.mod
View file

@ -23,7 +23,7 @@ require (
github.com/foxcpp/go-dovecot-sasl v0.0.0-20200522223722-c4699d7a24bf
github.com/foxcpp/go-imap-backend-tests v0.0.0-20200802090154-7e6248c85a0e
github.com/foxcpp/go-imap-i18nlevel v0.0.0-20200208001533-d6ec88553005
github.com/foxcpp/go-imap-mess v0.0.0-20210718073110-d5eb968a0995
github.com/foxcpp/go-imap-mess v0.0.0-20210718180745-f14f34d14a3b
github.com/foxcpp/go-imap-namespace v0.0.0-20200802091432-08496dd8e0ed
github.com/foxcpp/go-imap-sql v0.4.1-0.20210718082546-d38d40f5442c
github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15

5
go.sum
View file

@ -164,12 +164,11 @@ github.com/foxcpp/go-imap-i18nlevel v0.0.0-20200208001533-d6ec88553005 h1:pfoFtk
github.com/foxcpp/go-imap-i18nlevel v0.0.0-20200208001533-d6ec88553005/go.mod h1:34FwxnjC2N+EFs2wMtsHevrZLWRKRuVU8wEcHWKq/nE=
github.com/foxcpp/go-imap-idle v0.0.0-20200829140055-32dc40172769 h1:qMdULYMxKuAgboOllBNLmA7wQ4YEwnhvq8onksrMC3A=
github.com/foxcpp/go-imap-idle v0.0.0-20200829140055-32dc40172769/go.mod h1:PLnHIusEiOdmy63Y7IL2RjShIk4cyFi3a8MTC/WcLkk=
github.com/foxcpp/go-imap-mess v0.0.0-20210718073110-d5eb968a0995 h1:UXksj5CP+1Zg5GCD74JEijuQ2C22vXFgquhdG/YlXuI=
github.com/foxcpp/go-imap-mess v0.0.0-20210718073110-d5eb968a0995/go.mod h1:cps13jIcqI/3FGVJQ8azz3apLjikGoKKcB6xb65VSag=
github.com/foxcpp/go-imap-mess v0.0.0-20210718180745-f14f34d14a3b h1:O85JcWCduTZz5FkqCrQX79wc88lgSezbTqrWNgXyEow=
github.com/foxcpp/go-imap-mess v0.0.0-20210718180745-f14f34d14a3b/go.mod h1:cps13jIcqI/3FGVJQ8azz3apLjikGoKKcB6xb65VSag=
github.com/foxcpp/go-imap-namespace v0.0.0-20200802091432-08496dd8e0ed h1:1Jo7geyvunrPSjL6F6D9EcXoNApS5v3LQaro7aUNPnE=
github.com/foxcpp/go-imap-namespace v0.0.0-20200802091432-08496dd8e0ed/go.mod h1:Shows1vmkBWO40ChOClaUe6DUnZrsP1UPAuoWzIUdgQ=
github.com/foxcpp/go-imap-sql v0.4.1-0.20210718081250-7f103db60f22 h1:ZXgI+FkKj+iKSrI58CZEkbSSojV84jm4LjEwglyH4Ik=
github.com/foxcpp/go-imap-sql v0.4.1-0.20210718081250-7f103db60f22/go.mod h1:kl+x+noffdBsp1pAR+PTHupLoxHnJZZw6BFcmmCZeUI=
github.com/foxcpp/go-imap-sql v0.4.1-0.20210718082546-d38d40f5442c h1:bNaw79UJEonuRo1box0wUGGBT2V/XGkhT2NakjZnRIs=
github.com/foxcpp/go-imap-sql v0.4.1-0.20210718082546-d38d40f5442c/go.mod h1:kl+x+noffdBsp1pAR+PTHupLoxHnJZZw6BFcmmCZeUI=
github.com/foxcpp/go-mockdns v0.0.0-20191216195825-5eabd8dbfe1f/go.mod h1:tPg4cp4nseejPd+UKxtCVQ2hUxNTZ7qQZJa7CLriIeo=

View file

@ -48,6 +48,7 @@ import (
"github.com/foxcpp/maddy/framework/module"
"github.com/foxcpp/maddy/internal/authz"
"github.com/foxcpp/maddy/internal/updatepipe"
"github.com/foxcpp/maddy/internal/updatepipe/pubsub"
_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
@ -252,8 +253,22 @@ func (store *Storage) EnableUpdatePipe(mode updatepipe.BackendMode) error {
store.Log.DebugMsg("using unix socket for external updates", "path", sockPath)
store.updPipe = &updatepipe.UnixSockPipe{
SockPath: sockPath,
Log: log.Logger{Name: "sql/updpipe", Debug: store.Log.Debug},
Log: log.Logger{Name: "storage.imapsql/updpipe", Debug: store.Log.Debug},
}
case "postgres":
store.Log.DebugMsg("using PostgreSQL broker for external updates")
ps, err := pubsub.NewPQ(strings.Join(store.dsn, " "))
if err != nil {
return fmt.Errorf("enable_update_pipe: %w", err)
}
ps.Log = log.Logger{Name: "storage.imapsql/updpipe/pubsub", Debug: store.Log.Debug}
pipe := &updatepipe.PubSubPipe{
PubSub: ps,
Log: log.Logger{Name: "storage.imapsql/updpipe", Debug: store.Log.Debug},
}
store.Back.UpdateManager().ExternalUnsubscribe = pipe.Unsubscribe
store.Back.UpdateManager().ExternalSubscribe = pipe.Subscribe
store.updPipe = pipe
default:
return errors.New("imapsql: driver does not have an update pipe implementation")
}

View file

@ -0,0 +1,86 @@
package pubsub
import (
"context"
"database/sql"
"time"
"github.com/foxcpp/maddy/framework/log"
"github.com/lib/pq"
)
type Msg struct {
Key string
Payload string
}
type PqPubSub struct {
Notify chan Msg
L *pq.Listener
sender *sql.DB
Log log.Logger
}
func NewPQ(dsn string) (*PqPubSub, error) {
l := &PqPubSub{
Log: log.Logger{Name: "pgpubsub"},
Notify: make(chan Msg),
}
l.L = pq.NewListener(dsn, 10*time.Second, time.Minute, l.eventHandler)
var err error
l.sender, err = sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
go func() {
defer close(l.Notify)
for n := range l.L.Notify {
if n == nil {
continue
}
l.Notify <- Msg{Key: n.Channel, Payload: n.Extra}
}
}()
return l, nil
}
func (l *PqPubSub) Close() error {
l.sender.Close()
l.L.Close()
return nil
}
func (l *PqPubSub) eventHandler(ev pq.ListenerEventType, err error) {
switch ev {
case pq.ListenerEventConnected:
l.Log.DebugMsg("connected")
case pq.ListenerEventReconnected:
l.Log.Msg("connection reestablished")
case pq.ListenerEventConnectionAttemptFailed:
l.Log.Error("connection attempt failed", err)
case pq.ListenerEventDisconnected:
l.Log.Msg("connection closed", "err", err)
}
}
func (l *PqPubSub) Subscribe(_ context.Context, key string) error {
return l.L.Listen(key)
}
func (l *PqPubSub) Unsubscribe(_ context.Context, key string) error {
return l.L.Unlisten(key)
}
func (l *PqPubSub) Publish(key, payload string) error {
_, err := l.sender.Exec(`SELECT pg_notify($1, $2)`, key, payload)
return err
}
func (l *PqPubSub) Listener() chan Msg {
return l.Notify
}

View file

@ -0,0 +1,11 @@
package pubsub
import "context"
type PubSub interface {
Subscribe(ctx context.Context, key string) error
Unsubscribe(ctx context.Context, key string) error
Publish(key, payload string) error
Listener() chan Msg
Close() error
}

View file

@ -0,0 +1,101 @@
package updatepipe
import (
"context"
"fmt"
"os"
"strconv"
mess "github.com/foxcpp/go-imap-mess"
"github.com/foxcpp/maddy/framework/log"
"github.com/foxcpp/maddy/internal/updatepipe/pubsub"
)
type PubSubPipe struct {
PubSub pubsub.PubSub
Log log.Logger
}
func (p *PubSubPipe) Listen(upds chan<- mess.Update) error {
go func() {
for m := range p.PubSub.Listener() {
id, upd, err := parseUpdate(m.Payload)
if err != nil {
p.Log.Error("failed to parse update", err)
continue
}
if id == p.myID() {
continue
}
upds <- *upd
}
}()
return nil
}
func (p *PubSubPipe) InitPush() error {
return nil
}
func (p *PubSubPipe) myID() string {
return fmt.Sprintf("%d-%p", os.Getpid(), p)
}
func (p *PubSubPipe) channel(key interface{}) (string, error) {
var psKey string
switch k := key.(type) {
case string:
psKey = k
case uint64:
psKey = "__uint64_" + strconv.FormatUint(k, 10)
default:
return "", fmt.Errorf("updatepipe: key type must be either string or uint64")
}
return psKey, nil
}
func (p *PubSubPipe) Subscribe(key interface{}) {
psKey, err := p.channel(key)
if err != nil {
p.Log.Error("invalid key passed to Subscribe", err)
return
}
if err := p.PubSub.Subscribe(context.TODO(), psKey); err != nil {
p.Log.Error("pubsub subscribe failed", err)
} else {
p.Log.DebugMsg("subscribed to pubsub", "channel", psKey)
}
}
func (p *PubSubPipe) Unsubscribe(key interface{}) {
psKey, err := p.channel(key)
if err != nil {
p.Log.Error("invalid key passed to Unsubscribe", err)
return
}
if err := p.PubSub.Unsubscribe(context.TODO(), psKey); err != nil {
p.Log.Error("pubsub unsubscribe failed", err)
} else {
p.Log.DebugMsg("unsubscribed from pubsub", "channel", psKey)
}
}
func (p *PubSubPipe) Push(upd mess.Update) error {
psKey, err := p.channel(upd.Key)
if err != nil {
return err
}
updBlob, err := formatUpdate(p.myID(), upd)
if err != nil {
return err
}
return p.PubSub.Publish(psKey, updBlob)
}
func (p *PubSubPipe) Close() error {
return p.PubSub.Close()
}