Use Unix socket to pass IMAP updates from maddyctl to daemon

There is abstraction 'updates pipe' defined for future use with
configuration involving IMAP data replication (e.g. multiple nodes with
maddy instances + PostgreSQL replicas + S3 bucket for messages).

However, for the case of local SQLite3 DB, limited UDS-based
implementation is provided. It solves the problem of maddyctl not being
able to tell the server about modifications it makes. Alternative to
this approach would be to have server actually perform operations and
maddyctl being a dumb API client, but this requires a lot more complex
IPC interface and will not work when the server is down.
This commit is contained in:
fox.cpp 2019-12-08 03:27:31 +03:00
parent 9e5bb288b3
commit a574b9fbb2
No known key found for this signature in database
GPG key ID: E76D97CCEDE90B6C
10 changed files with 431 additions and 11 deletions

View file

@ -233,7 +233,6 @@ func msgsRemove(be Storage, ctx *cli.Context) error {
}
if !ctx.Bool("yes") {
fmt.Fprintf(os.Stderr, "Currently, it is unsafe to remove messages from mailboxes used by connected clients, continue?")
if !clitools.Confirmation("Are you sure you want to delete these messages?", false) {
return errors.New("Cancelled")
}

View file

@ -8,6 +8,7 @@ import (
"github.com/emersion/go-imap/backend"
"github.com/foxcpp/maddy"
"github.com/foxcpp/maddy/internal/updatepipe"
"github.com/urfave/cli"
"golang.org/x/crypto/bcrypt"
)
@ -580,12 +581,26 @@ func openStorage(ctx *cli.Context) (Storage, error) {
return nil, err
}
var store Storage
switch node.Name {
case "sql":
return sqlFromCfgBlock(root, node)
store, err = sqlFromCfgBlock(root, node)
if err != nil {
return nil, err
}
default:
return nil, errors.New("Error: Storage backend is not supported by maddyctl")
}
if updStore, ok := store.(updatepipe.Backend); ok {
if err := updStore.EnableUpdatePipe(updatepipe.ModePush); err != nil && !errors.Is(err, os.ErrNotExist) {
fmt.Fprintf(os.Stderr, "Failed to initialize update pipe, do not remove messages from mailboxes open by clients: %v\n", err)
}
} else {
fmt.Fprintf(os.Stderr, "No update pipe support, do not remove messages from mailboxes open by clients\n")
}
return store, nil
}
func openUserDB(ctx *cli.Context) (UserDB, error) {

2
go.mod
View file

@ -17,7 +17,7 @@ require (
github.com/emersion/go-msgauth v0.3.2-0.20191028231513-55b75676976c
github.com/emersion/go-sasl v0.0.0-20190817083125-240c8404624e
github.com/emersion/go-smtp v0.12.1-0.20191206174923-1f576e0ec85c
github.com/foxcpp/go-imap-sql v0.3.2-0.20191109135752-42bd4cfc6673
github.com/foxcpp/go-imap-sql v0.3.2-0.20191208094750-8b4ec6b19a78
github.com/foxcpp/go-mockdns v0.0.0-20191123143003-02edb10da1e3
github.com/go-sql-driver/mysql v1.4.1
github.com/google/uuid v1.1.1

4
go.sum
View file

@ -65,6 +65,10 @@ github.com/foxcpp/go-imap-backend-tests v0.0.0-20190615132041-281c43ad777b h1:tC
github.com/foxcpp/go-imap-backend-tests v0.0.0-20190615132041-281c43ad777b/go.mod h1:yUISYv/uXLQ6tQZcds/p/hdcZ5JzrEUifyED2VffWpc=
github.com/foxcpp/go-imap-sql v0.3.2-0.20191109135752-42bd4cfc6673 h1:YDvjzA0mFuRKqSm31Yp9sbyql17cMPpnBj8/8tOJH0E=
github.com/foxcpp/go-imap-sql v0.3.2-0.20191109135752-42bd4cfc6673/go.mod h1:ooCDdmcUlEhvNl57ROzeZTpqJuoMP3HNd03kemcF4AE=
github.com/foxcpp/go-imap-sql v0.3.2-0.20191207234110-dc9fdf3fe4c5 h1:2oLeS6iixM/TIuKi8jHWuxjDlYPuCKq8zdXmOeYyFOw=
github.com/foxcpp/go-imap-sql v0.3.2-0.20191207234110-dc9fdf3fe4c5/go.mod h1:ooCDdmcUlEhvNl57ROzeZTpqJuoMP3HNd03kemcF4AE=
github.com/foxcpp/go-imap-sql v0.3.2-0.20191208094750-8b4ec6b19a78 h1:vZrsheTVI5Rywi9q4gEuhF23GTRfUjVsWrN2co9xVPg=
github.com/foxcpp/go-imap-sql v0.3.2-0.20191208094750-8b4ec6b19a78/go.mod h1:ooCDdmcUlEhvNl57ROzeZTpqJuoMP3HNd03kemcF4AE=
github.com/foxcpp/go-mockdns v0.0.0-20191116212937-0c6663a11ba3 h1:WpRSDZUlGndFXNGagURufe4nsxzKf/KkcLWGopyO1So=
github.com/foxcpp/go-mockdns v0.0.0-20191116212937-0c6663a11ba3/go.mod h1:tPg4cp4nseejPd+UKxtCVQ2hUxNTZ7qQZJa7CLriIeo=
github.com/foxcpp/go-mockdns v0.0.0-20191123143003-02edb10da1e3 h1:Xb9iu/7Gy3ZIaMUCHQICa0jSCeY4Sow2X877Y0GZ1pM=

View file

@ -24,6 +24,7 @@ import (
modconfig "github.com/foxcpp/maddy/internal/config/module"
"github.com/foxcpp/maddy/internal/log"
"github.com/foxcpp/maddy/internal/module"
"github.com/foxcpp/maddy/internal/updatepipe"
)
type Endpoint struct {
@ -73,6 +74,12 @@ func (endp *Endpoint) Init(cfg *config.Map) error {
return fmt.Errorf("imap: storage module %T does not implement imapbackend.BackendUpdater", endp.Store)
}
if updBe, ok := endp.Store.(updatepipe.Backend); ok {
if err := updBe.EnableUpdatePipe(updatepipe.ModeReplicate); err != nil {
endp.Log.Error("failed to initialize updates pipe", err)
}
}
// Call Updates once at start, some storage backends initialize update
// channel lazily and may not generate updates at all unless it is called.
if endp.updater.Updates() == nil {
@ -105,6 +112,14 @@ func (endp *Endpoint) Init(cfg *config.Map) error {
return err
}
if err := endp.setupListeners(addresses); err != nil {
return err
}
return nil
}
func (endp *Endpoint) setupListeners(addresses []config.Endpoint) error {
for _, addr := range addresses {
var l net.Listener
var err error

View file

@ -8,10 +8,13 @@
package sql
import (
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"strconv"
"strings"
@ -27,6 +30,7 @@ import (
"github.com/foxcpp/maddy/internal/log"
"github.com/foxcpp/maddy/internal/module"
"github.com/foxcpp/maddy/internal/target"
"github.com/foxcpp/maddy/internal/updatepipe"
"golang.org/x/text/secure/precis"
_ "github.com/go-sql-driver/mysql"
@ -40,10 +44,14 @@ type Storage struct {
junkMbox string
inlineDriver string
inlineDsn []string
driver string
dsn []string
resolver dns.Resolver
updates <-chan backend.Update
updPipe updatepipe.P
updPushStop chan struct{}
}
type delivery struct {
@ -144,8 +152,8 @@ func New(_, instName string, _, inlineArgs []string) (module.Module, error) {
return nil, errors.New("sql: expected at least 2 arguments")
}
store.inlineDriver = inlineArgs[0]
store.inlineDsn = inlineArgs[1:]
store.driver = inlineArgs[0]
store.dsn = inlineArgs[1:]
}
return store, nil
}
@ -164,8 +172,8 @@ func (store *Storage) Init(cfg *config.Map) error {
// configured).
LazyUpdatesInit: true,
}
cfg.String("driver", false, false, store.inlineDriver, &driver)
cfg.StringList("dsn", false, false, store.inlineDsn, &dsn)
cfg.String("driver", false, false, store.driver, &driver)
cfg.StringList("dsn", false, false, store.dsn, &dsn)
cfg.Custom("fsstore", false, false, func() (interface{}, error) {
return "messages", nil
}, func(m *config.Map, node *config.Node) (interface{}, error) {
@ -244,6 +252,63 @@ func (store *Storage) Init(cfg *config.Map) error {
store.Log.Debugln("go-imap-sql version", imapsql.VersionStr)
store.driver = driver
store.dsn = dsn
return nil
}
func (store *Storage) EnableUpdatePipe(mode updatepipe.BackendMode) error {
if store.updPipe != nil {
return nil
}
if store.updates != nil {
panic("sql: EnableUpdatePipe called after Updates")
}
upds := store.Back.Updates()
switch store.driver {
case "sqlite3":
dbId := sha256.Sum256([]byte(strings.Join(store.dsn, " ")))
store.updPipe = &updatepipe.UnixSockPipe{
SockPath: filepath.Join(
config.RuntimeDirectory,
fmt.Sprintf("sql-%s.sock", hex.EncodeToString(dbId[:]))),
Log: log.Logger{Name: "sql/updpipe", Debug: store.Log.Debug},
}
default:
return errors.New("sql: driver does not have an update pipe implementation")
}
wrapped := make(chan backend.Update, cap(upds)*2)
if mode == updatepipe.ModeReplicate {
if err := store.updPipe.Listen(wrapped); err != nil {
return err
}
}
if err := store.updPipe.InitPush(); err != nil {
return err
}
store.updPushStop = make(chan struct{})
go func() {
for u := range upds {
if err := store.updPipe.Push(u); err != nil {
store.Log.Error("IMAP update pipe push failed", err)
}
if mode != updatepipe.ModePush {
wrapped <- u
}
}
close(wrapped)
store.updPushStop <- struct{}{}
}()
store.updates = wrapped
return nil
}
@ -256,7 +321,12 @@ func (store *Storage) CreateMessageLimit() *uint32 {
}
func (store *Storage) Updates() <-chan backend.Update {
return store.Back.Updates()
if store.updates != nil {
return store.updates
}
store.updates = store.Back.Updates()
return store.updates
}
func (store *Storage) EnableChildrenExt() bool {
@ -313,7 +383,23 @@ func (store *Storage) GetOrCreateUser(username string) (backend.User, error) {
}
func (store *Storage) Close() error {
return store.Back.Close()
// Closes update channel. 'updates replicate' goroutine stops (see
// EnableUpdatePipe).
store.Back.Close()
// Close UpdatePipe, stops generation of new updates.
if store.updPipe != nil {
store.updPipe.Close()
}
// Wait for 'updates replicate' goroutine to actually stop so we will send
// all updates before shuting down (this is especially important for
// maddyctl).
if store.updPushStop != nil {
<-store.updPushStop
}
return nil
}
func init() {

View file

@ -0,0 +1,27 @@
package updatepipe
type BackendMode int
const (
// ModeReplicate configures backend to both send and receive updates over
// the pipe.
ModeReplicate BackendMode = iota
// ModePush configures backend to send updates over the pipe only.
//
// If EnableUpdatePipe(ModePush) is called for backend, its Updates()
// channel will never receive any updates.
ModePush BackendMode = iota
)
// The Backend interface is implemented by storage backends that support both
// updates serialization using the internal updatepipe.P implementation.
// To activate this implementation, EnableUpdatePipe should be called.
type Backend interface {
// EnableUpdatePipe enables the internal update pipe implementation.
// The mode argument selects the pipe behavior. EnableUpdatePipe must be
// called before the first call to the Updates() method.
//
// This method is idempotent. All calls after a successful one do nothing.
EnableUpdatePipe(mode BackendMode) error
}

View file

@ -0,0 +1,112 @@
package updatepipe
import (
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/emersion/go-imap"
"github.com/emersion/go-imap/backend"
)
func unescapeName(s string) string {
return strings.ReplaceAll(s, "\x10", ";")
}
func escapeName(s string) string {
return strings.ReplaceAll(s, ";", "\x10")
}
type message struct {
SeqNum uint32
Flags []string
}
func parseUpdate(s string) (id string, upd backend.Update, err error) {
parts := strings.SplitN(s, ";", 5)
if len(parts) != 5 {
return "", nil, errors.New("updatepipe: mismatched parts count")
}
updBase := backend.NewUpdate(unescapeName(parts[2]), unescapeName(parts[3]))
switch parts[1] {
case "ExpungeUpdate":
exUpd := &backend.ExpungeUpdate{Update: updBase}
if err := json.Unmarshal([]byte(parts[4]), &exUpd.SeqNum); err != nil {
return "", nil, err
}
upd = exUpd
case "MailboxUpdate":
mboxUpd := &backend.MailboxUpdate{Update: updBase}
if err := json.Unmarshal([]byte(parts[4]), &mboxUpd.MailboxStatus); err != nil {
return "", nil, err
}
upd = mboxUpd
case "MessageUpdate":
// imap.Message is not JSON-serializable because it contains maps with
// complex keys.
// In practice, however, MessageUpdate is used only for FLAGS, so we
// serialize them only with a SeqNum.
msg := message{}
if err := json.Unmarshal([]byte(parts[4]), &msg); err != nil {
return "", nil, err
}
msgUpd := &backend.MessageUpdate{
Update: updBase,
Message: imap.NewMessage(msg.SeqNum, []imap.FetchItem{imap.FetchFlags}),
}
msgUpd.Message.Flags = msg.Flags
upd = msgUpd
}
return parts[0], upd, nil
}
func formatUpdate(myID string, upd backend.Update) (string, error) {
var (
objType string
objStr []byte
err error
)
switch v := upd.(type) {
case *backend.ExpungeUpdate:
objType = "ExpungeUpdate"
objStr, err = json.Marshal(v.SeqNum)
if err != nil {
return "", err
}
case *backend.MessageUpdate:
// imap.Message is not JSON-serializable because it contains maps with
// complex keys.
// In practice, however, MessageUpdate is used only for FLAGS, so we
// serialize them only with a seqnum.
objType = "MessageUpdate"
objStr, err = json.Marshal(message{
SeqNum: v.Message.SeqNum,
Flags: v.Message.Flags,
})
if err != nil {
return "", err
}
case *backend.MailboxUpdate:
objType = "MailboxUpdate"
objStr, err = json.Marshal(v.MailboxStatus)
if err != nil {
return "", err
}
default:
return "", fmt.Errorf("updatepipe: unknown update type: %T", upd)
}
return strings.Join([]string{
myID,
objType,
escapeName(upd.Username()),
escapeName(upd.Mailbox()),
string(objStr),
}, ";") + "\n", nil
}

View file

@ -0,0 +1,118 @@
package updatepipe
import (
"bufio"
"fmt"
"io"
"net"
"os"
"github.com/emersion/go-imap/backend"
"github.com/foxcpp/maddy/internal/log"
)
// UnixSockPipe implements the UpdatePipe interface by serializating updates
// to/from a Unix domain socket. Due to the way Unix sockets work, only one
// Listen goroutine can be running.
//
// The socket is stream-oriented and consists of the following messages:
// OBJ_ID;TYPE_NAME;USER;MAILBOX;JSON_SERIALIZED_INTERNAL_OBJECT\n
//
// Where TYPE_NAME is one of the folow: ExpungeUpdate, MailboxUpdate,
// MessageUpdate.
// And OBJ_ID is Process ID and UnixSockPipe address concated as a string.
// It is used to deduplicate updates sent to Push and recevied via Listen.
//
// The SockPath field specifies the socket path to use. The actual socket
// is initialized on the first call to Listen or (Init)Push.
type UnixSockPipe struct {
SockPath string
Log log.Logger
listener net.Listener
sender net.Conn
}
var _ P = &UnixSockPipe{}
func (usp *UnixSockPipe) myID() string {
return fmt.Sprintf("%d-%p", os.Getpid(), usp)
}
func (usp *UnixSockPipe) readUpdates(conn net.Conn, updCh chan<- backend.Update) {
scnr := bufio.NewScanner(conn)
for scnr.Scan() {
id, upd, err := parseUpdate(scnr.Text())
if err != nil {
usp.Log.Error("malformed update received", err, "str", scnr.Text())
}
// It is our own update, skip.
if id == usp.myID() {
continue
}
updCh <- upd
}
}
func (usp *UnixSockPipe) Wrap(upd <-chan backend.Update) chan backend.Update {
ourUpds := make(chan backend.Update, cap(upd))
return ourUpds
}
func (usp *UnixSockPipe) Listen(upd chan<- backend.Update) error {
l, err := net.Listen("unix", usp.SockPath)
if err != nil {
return err
}
usp.listener = l
go func() {
for {
conn, err := l.Accept()
if err != nil {
return
}
go usp.readUpdates(conn, upd)
}
}()
return nil
}
func (usp *UnixSockPipe) InitPush() error {
sock, err := net.Dial("unix", usp.SockPath)
if err != nil {
return err
}
usp.sender = sock
return nil
}
func (usp *UnixSockPipe) Push(upd backend.Update) error {
if usp.sender == nil {
if err := usp.InitPush(); err != nil {
return err
}
}
updStr, err := formatUpdate(usp.myID(), upd)
if err != nil {
return err
}
_, err = io.WriteString(usp.sender, updStr)
return err
}
func (usp *UnixSockPipe) Close() error {
if usp.sender != nil {
usp.sender.Close()
}
if usp.listener != nil {
usp.listener.Close()
os.Remove(usp.SockPath)
}
return nil
}

View file

@ -0,0 +1,44 @@
// Package updatepipe implements utilities for serialization and transport of
// IMAP update objects between processes and machines.
//
// Its main goal is provide maddyctl with ability to properly notify the server
// about changes without relying on it to coordinate access in the first place
// (so maddyctl can work without a running server or with a broken server
// instance).
//
// Additionally, it can be used to transfer IMAP updates between replicated
// nodes.
package updatepipe
import (
"github.com/emersion/go-imap/backend"
)
// The P interface represents the handle for a transport medium used for IMAP
// updates.
type P interface {
// Listen starts the "pull" goroutine that reads updates from the pipe and
// sends them to the channel.
//
// Usually it is not possible to call Listen multiple times for the same
// pipe.
//
// Updates sent using the same UpdatePipe object using Push are not
// duplicates to the channel passed to Listen.
Listen(upds chan<- backend.Update) error
// InitPush prepares the UpdatePipe to be used as updates source (Push
// method).
//
// It is called implicitly on the first Push call, but calling it
// explicitly allows to detect initialization errors early.
InitPush() error
// Push writes the update to the pipe.
//
// The update will not be duplicated if the UpdatePipe is also listening
// for updates.
Push(upd backend.Update) error
Close() error
}