diff --git a/cmd/maddyctl/imap.go b/cmd/maddyctl/imap.go index 3e45de5..d31e563 100644 --- a/cmd/maddyctl/imap.go +++ b/cmd/maddyctl/imap.go @@ -233,7 +233,6 @@ func msgsRemove(be Storage, ctx *cli.Context) error { } if !ctx.Bool("yes") { - fmt.Fprintf(os.Stderr, "Currently, it is unsafe to remove messages from mailboxes used by connected clients, continue?") if !clitools.Confirmation("Are you sure you want to delete these messages?", false) { return errors.New("Cancelled") } diff --git a/cmd/maddyctl/main.go b/cmd/maddyctl/main.go index 9681850..bf8e9cf 100644 --- a/cmd/maddyctl/main.go +++ b/cmd/maddyctl/main.go @@ -8,6 +8,7 @@ import ( "github.com/emersion/go-imap/backend" "github.com/foxcpp/maddy" + "github.com/foxcpp/maddy/internal/updatepipe" "github.com/urfave/cli" "golang.org/x/crypto/bcrypt" ) @@ -580,12 +581,26 @@ func openStorage(ctx *cli.Context) (Storage, error) { return nil, err } + var store Storage switch node.Name { case "sql": - return sqlFromCfgBlock(root, node) + store, err = sqlFromCfgBlock(root, node) + if err != nil { + return nil, err + } default: return nil, errors.New("Error: Storage backend is not supported by maddyctl") } + + if updStore, ok := store.(updatepipe.Backend); ok { + if err := updStore.EnableUpdatePipe(updatepipe.ModePush); err != nil && !errors.Is(err, os.ErrNotExist) { + fmt.Fprintf(os.Stderr, "Failed to initialize update pipe, do not remove messages from mailboxes open by clients: %v\n", err) + } + } else { + fmt.Fprintf(os.Stderr, "No update pipe support, do not remove messages from mailboxes open by clients\n") + } + + return store, nil } func openUserDB(ctx *cli.Context) (UserDB, error) { diff --git a/go.mod b/go.mod index 2fc770d..1a71718 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/emersion/go-msgauth v0.3.2-0.20191028231513-55b75676976c github.com/emersion/go-sasl v0.0.0-20190817083125-240c8404624e github.com/emersion/go-smtp v0.12.1-0.20191206174923-1f576e0ec85c - github.com/foxcpp/go-imap-sql v0.3.2-0.20191109135752-42bd4cfc6673 + github.com/foxcpp/go-imap-sql v0.3.2-0.20191208094750-8b4ec6b19a78 github.com/foxcpp/go-mockdns v0.0.0-20191123143003-02edb10da1e3 github.com/go-sql-driver/mysql v1.4.1 github.com/google/uuid v1.1.1 diff --git a/go.sum b/go.sum index 1c90ebf..d7e5f8b 100644 --- a/go.sum +++ b/go.sum @@ -65,6 +65,10 @@ github.com/foxcpp/go-imap-backend-tests v0.0.0-20190615132041-281c43ad777b h1:tC github.com/foxcpp/go-imap-backend-tests v0.0.0-20190615132041-281c43ad777b/go.mod h1:yUISYv/uXLQ6tQZcds/p/hdcZ5JzrEUifyED2VffWpc= github.com/foxcpp/go-imap-sql v0.3.2-0.20191109135752-42bd4cfc6673 h1:YDvjzA0mFuRKqSm31Yp9sbyql17cMPpnBj8/8tOJH0E= github.com/foxcpp/go-imap-sql v0.3.2-0.20191109135752-42bd4cfc6673/go.mod h1:ooCDdmcUlEhvNl57ROzeZTpqJuoMP3HNd03kemcF4AE= +github.com/foxcpp/go-imap-sql v0.3.2-0.20191207234110-dc9fdf3fe4c5 h1:2oLeS6iixM/TIuKi8jHWuxjDlYPuCKq8zdXmOeYyFOw= +github.com/foxcpp/go-imap-sql v0.3.2-0.20191207234110-dc9fdf3fe4c5/go.mod h1:ooCDdmcUlEhvNl57ROzeZTpqJuoMP3HNd03kemcF4AE= +github.com/foxcpp/go-imap-sql v0.3.2-0.20191208094750-8b4ec6b19a78 h1:vZrsheTVI5Rywi9q4gEuhF23GTRfUjVsWrN2co9xVPg= +github.com/foxcpp/go-imap-sql v0.3.2-0.20191208094750-8b4ec6b19a78/go.mod h1:ooCDdmcUlEhvNl57ROzeZTpqJuoMP3HNd03kemcF4AE= github.com/foxcpp/go-mockdns v0.0.0-20191116212937-0c6663a11ba3 h1:WpRSDZUlGndFXNGagURufe4nsxzKf/KkcLWGopyO1So= github.com/foxcpp/go-mockdns v0.0.0-20191116212937-0c6663a11ba3/go.mod h1:tPg4cp4nseejPd+UKxtCVQ2hUxNTZ7qQZJa7CLriIeo= github.com/foxcpp/go-mockdns v0.0.0-20191123143003-02edb10da1e3 h1:Xb9iu/7Gy3ZIaMUCHQICa0jSCeY4Sow2X877Y0GZ1pM= diff --git a/internal/endpoint/imap/imap.go b/internal/endpoint/imap/imap.go index fff2b74..953e136 100644 --- a/internal/endpoint/imap/imap.go +++ b/internal/endpoint/imap/imap.go @@ -24,6 +24,7 @@ import ( modconfig "github.com/foxcpp/maddy/internal/config/module" "github.com/foxcpp/maddy/internal/log" "github.com/foxcpp/maddy/internal/module" + "github.com/foxcpp/maddy/internal/updatepipe" ) type Endpoint struct { @@ -73,6 +74,12 @@ func (endp *Endpoint) Init(cfg *config.Map) error { return fmt.Errorf("imap: storage module %T does not implement imapbackend.BackendUpdater", endp.Store) } + if updBe, ok := endp.Store.(updatepipe.Backend); ok { + if err := updBe.EnableUpdatePipe(updatepipe.ModeReplicate); err != nil { + endp.Log.Error("failed to initialize updates pipe", err) + } + } + // Call Updates once at start, some storage backends initialize update // channel lazily and may not generate updates at all unless it is called. if endp.updater.Updates() == nil { @@ -105,6 +112,14 @@ func (endp *Endpoint) Init(cfg *config.Map) error { return err } + if err := endp.setupListeners(addresses); err != nil { + return err + } + + return nil +} + +func (endp *Endpoint) setupListeners(addresses []config.Endpoint) error { for _, addr := range addresses { var l net.Listener var err error diff --git a/internal/storage/sql/sql.go b/internal/storage/sql/sql.go index 2cb2d58..478f060 100644 --- a/internal/storage/sql/sql.go +++ b/internal/storage/sql/sql.go @@ -8,10 +8,13 @@ package sql import ( + "crypto/sha256" + "encoding/hex" "errors" "fmt" "net" "os" + "path/filepath" "strconv" "strings" @@ -27,6 +30,7 @@ import ( "github.com/foxcpp/maddy/internal/log" "github.com/foxcpp/maddy/internal/module" "github.com/foxcpp/maddy/internal/target" + "github.com/foxcpp/maddy/internal/updatepipe" "golang.org/x/text/secure/precis" _ "github.com/go-sql-driver/mysql" @@ -40,10 +44,14 @@ type Storage struct { junkMbox string - inlineDriver string - inlineDsn []string + driver string + dsn []string resolver dns.Resolver + + updates <-chan backend.Update + updPipe updatepipe.P + updPushStop chan struct{} } type delivery struct { @@ -144,8 +152,8 @@ func New(_, instName string, _, inlineArgs []string) (module.Module, error) { return nil, errors.New("sql: expected at least 2 arguments") } - store.inlineDriver = inlineArgs[0] - store.inlineDsn = inlineArgs[1:] + store.driver = inlineArgs[0] + store.dsn = inlineArgs[1:] } return store, nil } @@ -164,8 +172,8 @@ func (store *Storage) Init(cfg *config.Map) error { // configured). LazyUpdatesInit: true, } - cfg.String("driver", false, false, store.inlineDriver, &driver) - cfg.StringList("dsn", false, false, store.inlineDsn, &dsn) + cfg.String("driver", false, false, store.driver, &driver) + cfg.StringList("dsn", false, false, store.dsn, &dsn) cfg.Custom("fsstore", false, false, func() (interface{}, error) { return "messages", nil }, func(m *config.Map, node *config.Node) (interface{}, error) { @@ -244,6 +252,63 @@ func (store *Storage) Init(cfg *config.Map) error { store.Log.Debugln("go-imap-sql version", imapsql.VersionStr) + store.driver = driver + store.dsn = dsn + + return nil +} + +func (store *Storage) EnableUpdatePipe(mode updatepipe.BackendMode) error { + if store.updPipe != nil { + return nil + } + if store.updates != nil { + panic("sql: EnableUpdatePipe called after Updates") + } + + upds := store.Back.Updates() + + switch store.driver { + case "sqlite3": + dbId := sha256.Sum256([]byte(strings.Join(store.dsn, " "))) + store.updPipe = &updatepipe.UnixSockPipe{ + SockPath: filepath.Join( + config.RuntimeDirectory, + fmt.Sprintf("sql-%s.sock", hex.EncodeToString(dbId[:]))), + Log: log.Logger{Name: "sql/updpipe", Debug: store.Log.Debug}, + } + default: + return errors.New("sql: driver does not have an update pipe implementation") + } + + wrapped := make(chan backend.Update, cap(upds)*2) + + if mode == updatepipe.ModeReplicate { + if err := store.updPipe.Listen(wrapped); err != nil { + return err + } + } + + if err := store.updPipe.InitPush(); err != nil { + return err + } + + store.updPushStop = make(chan struct{}) + go func() { + for u := range upds { + if err := store.updPipe.Push(u); err != nil { + store.Log.Error("IMAP update pipe push failed", err) + } + + if mode != updatepipe.ModePush { + wrapped <- u + } + } + close(wrapped) + store.updPushStop <- struct{}{} + }() + + store.updates = wrapped return nil } @@ -256,7 +321,12 @@ func (store *Storage) CreateMessageLimit() *uint32 { } func (store *Storage) Updates() <-chan backend.Update { - return store.Back.Updates() + if store.updates != nil { + return store.updates + } + + store.updates = store.Back.Updates() + return store.updates } func (store *Storage) EnableChildrenExt() bool { @@ -313,7 +383,23 @@ func (store *Storage) GetOrCreateUser(username string) (backend.User, error) { } func (store *Storage) Close() error { - return store.Back.Close() + // Closes update channel. 'updates replicate' goroutine stops (see + // EnableUpdatePipe). + store.Back.Close() + + // Close UpdatePipe, stops generation of new updates. + if store.updPipe != nil { + store.updPipe.Close() + } + + // Wait for 'updates replicate' goroutine to actually stop so we will send + // all updates before shuting down (this is especially important for + // maddyctl). + if store.updPushStop != nil { + <-store.updPushStop + } + + return nil } func init() { diff --git a/internal/updatepipe/backend.go b/internal/updatepipe/backend.go new file mode 100644 index 0000000..8493d8c --- /dev/null +++ b/internal/updatepipe/backend.go @@ -0,0 +1,27 @@ +package updatepipe + +type BackendMode int + +const ( + // ModeReplicate configures backend to both send and receive updates over + // the pipe. + ModeReplicate BackendMode = iota + + // ModePush configures backend to send updates over the pipe only. + // + // If EnableUpdatePipe(ModePush) is called for backend, its Updates() + // channel will never receive any updates. + ModePush BackendMode = iota +) + +// The Backend interface is implemented by storage backends that support both +// updates serialization using the internal updatepipe.P implementation. +// To activate this implementation, EnableUpdatePipe should be called. +type Backend interface { + // EnableUpdatePipe enables the internal update pipe implementation. + // The mode argument selects the pipe behavior. EnableUpdatePipe must be + // called before the first call to the Updates() method. + // + // This method is idempotent. All calls after a successful one do nothing. + EnableUpdatePipe(mode BackendMode) error +} diff --git a/internal/updatepipe/serialize.go b/internal/updatepipe/serialize.go new file mode 100644 index 0000000..d8ad3c8 --- /dev/null +++ b/internal/updatepipe/serialize.go @@ -0,0 +1,112 @@ +package updatepipe + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/emersion/go-imap" + "github.com/emersion/go-imap/backend" +) + +func unescapeName(s string) string { + return strings.ReplaceAll(s, "\x10", ";") +} + +func escapeName(s string) string { + return strings.ReplaceAll(s, ";", "\x10") +} + +type message struct { + SeqNum uint32 + Flags []string +} + +func parseUpdate(s string) (id string, upd backend.Update, err error) { + parts := strings.SplitN(s, ";", 5) + if len(parts) != 5 { + return "", nil, errors.New("updatepipe: mismatched parts count") + } + + updBase := backend.NewUpdate(unescapeName(parts[2]), unescapeName(parts[3])) + switch parts[1] { + case "ExpungeUpdate": + exUpd := &backend.ExpungeUpdate{Update: updBase} + if err := json.Unmarshal([]byte(parts[4]), &exUpd.SeqNum); err != nil { + return "", nil, err + } + upd = exUpd + case "MailboxUpdate": + mboxUpd := &backend.MailboxUpdate{Update: updBase} + if err := json.Unmarshal([]byte(parts[4]), &mboxUpd.MailboxStatus); err != nil { + return "", nil, err + } + upd = mboxUpd + case "MessageUpdate": + // imap.Message is not JSON-serializable because it contains maps with + // complex keys. + // In practice, however, MessageUpdate is used only for FLAGS, so we + // serialize them only with a SeqNum. + + msg := message{} + if err := json.Unmarshal([]byte(parts[4]), &msg); err != nil { + return "", nil, err + } + + msgUpd := &backend.MessageUpdate{ + Update: updBase, + Message: imap.NewMessage(msg.SeqNum, []imap.FetchItem{imap.FetchFlags}), + } + msgUpd.Message.Flags = msg.Flags + upd = msgUpd + } + + return parts[0], upd, nil +} + +func formatUpdate(myID string, upd backend.Update) (string, error) { + var ( + objType string + objStr []byte + err error + ) + switch v := upd.(type) { + case *backend.ExpungeUpdate: + objType = "ExpungeUpdate" + objStr, err = json.Marshal(v.SeqNum) + if err != nil { + return "", err + } + case *backend.MessageUpdate: + // imap.Message is not JSON-serializable because it contains maps with + // complex keys. + // In practice, however, MessageUpdate is used only for FLAGS, so we + // serialize them only with a seqnum. + + objType = "MessageUpdate" + objStr, err = json.Marshal(message{ + SeqNum: v.Message.SeqNum, + Flags: v.Message.Flags, + }) + if err != nil { + return "", err + } + case *backend.MailboxUpdate: + objType = "MailboxUpdate" + objStr, err = json.Marshal(v.MailboxStatus) + if err != nil { + return "", err + } + default: + return "", fmt.Errorf("updatepipe: unknown update type: %T", upd) + } + + return strings.Join([]string{ + myID, + objType, + escapeName(upd.Username()), + escapeName(upd.Mailbox()), + string(objStr), + }, ";") + "\n", nil +} diff --git a/internal/updatepipe/unix_pipe.go b/internal/updatepipe/unix_pipe.go new file mode 100644 index 0000000..b30429f --- /dev/null +++ b/internal/updatepipe/unix_pipe.go @@ -0,0 +1,118 @@ +package updatepipe + +import ( + "bufio" + "fmt" + "io" + "net" + "os" + + "github.com/emersion/go-imap/backend" + "github.com/foxcpp/maddy/internal/log" +) + +// UnixSockPipe implements the UpdatePipe interface by serializating updates +// to/from a Unix domain socket. Due to the way Unix sockets work, only one +// Listen goroutine can be running. +// +// The socket is stream-oriented and consists of the following messages: +// OBJ_ID;TYPE_NAME;USER;MAILBOX;JSON_SERIALIZED_INTERNAL_OBJECT\n +// +// Where TYPE_NAME is one of the folow: ExpungeUpdate, MailboxUpdate, +// MessageUpdate. +// And OBJ_ID is Process ID and UnixSockPipe address concated as a string. +// It is used to deduplicate updates sent to Push and recevied via Listen. +// +// The SockPath field specifies the socket path to use. The actual socket +// is initialized on the first call to Listen or (Init)Push. +type UnixSockPipe struct { + SockPath string + Log log.Logger + + listener net.Listener + sender net.Conn +} + +var _ P = &UnixSockPipe{} + +func (usp *UnixSockPipe) myID() string { + return fmt.Sprintf("%d-%p", os.Getpid(), usp) +} + +func (usp *UnixSockPipe) readUpdates(conn net.Conn, updCh chan<- backend.Update) { + scnr := bufio.NewScanner(conn) + for scnr.Scan() { + id, upd, err := parseUpdate(scnr.Text()) + if err != nil { + usp.Log.Error("malformed update received", err, "str", scnr.Text()) + } + + // It is our own update, skip. + if id == usp.myID() { + continue + } + + updCh <- upd + } +} + +func (usp *UnixSockPipe) Wrap(upd <-chan backend.Update) chan backend.Update { + ourUpds := make(chan backend.Update, cap(upd)) + + return ourUpds +} + +func (usp *UnixSockPipe) Listen(upd chan<- backend.Update) error { + l, err := net.Listen("unix", usp.SockPath) + if err != nil { + return err + } + usp.listener = l + go func() { + for { + conn, err := l.Accept() + if err != nil { + return + } + go usp.readUpdates(conn, upd) + } + }() + return nil +} + +func (usp *UnixSockPipe) InitPush() error { + sock, err := net.Dial("unix", usp.SockPath) + if err != nil { + return err + } + + usp.sender = sock + return nil +} + +func (usp *UnixSockPipe) Push(upd backend.Update) error { + if usp.sender == nil { + if err := usp.InitPush(); err != nil { + return err + } + } + + updStr, err := formatUpdate(usp.myID(), upd) + if err != nil { + return err + } + + _, err = io.WriteString(usp.sender, updStr) + return err +} + +func (usp *UnixSockPipe) Close() error { + if usp.sender != nil { + usp.sender.Close() + } + if usp.listener != nil { + usp.listener.Close() + os.Remove(usp.SockPath) + } + return nil +} diff --git a/internal/updatepipe/update_pipe.go b/internal/updatepipe/update_pipe.go new file mode 100644 index 0000000..1f7be70 --- /dev/null +++ b/internal/updatepipe/update_pipe.go @@ -0,0 +1,44 @@ +// Package updatepipe implements utilities for serialization and transport of +// IMAP update objects between processes and machines. +// +// Its main goal is provide maddyctl with ability to properly notify the server +// about changes without relying on it to coordinate access in the first place +// (so maddyctl can work without a running server or with a broken server +// instance). +// +// Additionally, it can be used to transfer IMAP updates between replicated +// nodes. +package updatepipe + +import ( + "github.com/emersion/go-imap/backend" +) + +// The P interface represents the handle for a transport medium used for IMAP +// updates. +type P interface { + // Listen starts the "pull" goroutine that reads updates from the pipe and + // sends them to the channel. + // + // Usually it is not possible to call Listen multiple times for the same + // pipe. + // + // Updates sent using the same UpdatePipe object using Push are not + // duplicates to the channel passed to Listen. + Listen(upds chan<- backend.Update) error + + // InitPush prepares the UpdatePipe to be used as updates source (Push + // method). + // + // It is called implicitly on the first Push call, but calling it + // explicitly allows to detect initialization errors early. + InitPush() error + + // Push writes the update to the pipe. + // + // The update will not be duplicated if the UpdatePipe is also listening + // for updates. + Push(upd backend.Update) error + + Close() error +}