storage/blob: Implement S3-compatible storage support

Closes #304.
This commit is contained in:
fox.cpp 2021-07-15 20:31:50 +03:00
parent 02924d8d4b
commit ef63383248
No known key found for this signature in database
GPG key ID: 5B991F6215D2FCC0
8 changed files with 312 additions and 3 deletions

View file

@ -0,0 +1,144 @@
package s3
import (
"context"
"fmt"
"io"
"net/http"
"github.com/foxcpp/maddy/framework/config"
"github.com/foxcpp/maddy/framework/log"
"github.com/foxcpp/maddy/framework/module"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
const modName = "storage.blob.s3"
type Store struct {
instName string
log log.Logger
endpoint string
cl *minio.Client
bucketName string
objectPrefix string
}
func New(_, instName string, _, inlineArgs []string) (module.Module, error) {
if len(inlineArgs) != 0 {
return nil, fmt.Errorf("%s: expected 0 arguments", modName)
}
return &Store{
instName: instName,
log: log.Logger{Name: modName},
}, nil
}
func (s *Store) Init(cfg *config.Map) error {
var (
secure bool
accessKeyID string
secretAccessKey string
location string
)
cfg.String("endpoint", false, true, "", &s.endpoint)
cfg.Bool("secure", false, true, &secure)
cfg.String("access_key", false, true, "", &accessKeyID)
cfg.String("secret_key", false, true, "", &secretAccessKey)
cfg.String("bucket", false, true, "", &s.bucketName)
cfg.String("region", false, false, "", &location)
cfg.String("object_prefix", false, false, "", &s.objectPrefix)
if _, err := cfg.Process(); err != nil {
return err
}
if s.endpoint == "" {
return fmt.Errorf("%s: endpoint not set", modName)
}
cl, err := minio.New(s.endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: secure,
Region: location,
})
if err != nil {
return fmt.Errorf("%s: %w", modName, err)
}
s.cl = cl
return nil
}
func (s *Store) Name() string {
return modName
}
func (s *Store) InstanceName() string {
return s.instName
}
type s3blob struct {
pw *io.PipeWriter
didSync bool
errCh chan error
}
func (b *s3blob) Sync() error {
// We do this in Sync instead of Close because
// backend may not actually check the error of Close.
// The problematic restriction is that Sync can now be called
// only once.
b.pw.Close()
return <-b.errCh
}
func (b *s3blob) Write(p []byte) (n int, err error) {
return b.pw.Write(p)
}
func (b *s3blob) Close() error {
return nil
}
func (s *Store) Create(key string) (module.Blob, error) {
pr, pw := io.Pipe()
errCh := make(chan error, 1)
go func() {
_, err := s.cl.PutObject(context.TODO(), s.bucketName, s.objectPrefix+key, pr, -1, minio.PutObjectOptions{})
errCh <- err
}()
return &s3blob{pw: pw, errCh: errCh}, nil
}
func (s *Store) Open(key string) (io.ReadCloser, error) {
obj, err := s.cl.GetObject(context.TODO(), s.bucketName, s.objectPrefix+key, minio.GetObjectOptions{})
if err != nil {
resp := minio.ToErrorResponse(err)
if resp.StatusCode == http.StatusNotFound {
return nil, module.ErrNoSuchBlob
}
return nil, err
}
return obj, nil
}
func (s *Store) Delete(keys []string) error {
var lastErr error
for _, k := range keys {
lastErr = s.cl.RemoveObject(context.TODO(), s.bucketName, s.objectPrefix+k, minio.RemoveObjectOptions{})
if lastErr != nil {
s.log.Error("failed to delete object", lastErr, s.objectPrefix+k)
}
}
return lastErr
}
func init() {
module.Register(modName, New)
}