diff --git a/go.mod b/go.mod index 8e3a8bc0b..72e95007a 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( cloud.google.com/go/storage v1.4.0 + github.com/aws/aws-sdk-go v1.26.1 github.com/cheggaaa/pb/v3 v3.0.1 github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect diff --git a/go.sum b/go.sum index 4e32aacf4..e143fb43e 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmx github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/aws/aws-sdk-go v1.26.1 h1:JGQggXhOiNJIqsmbYUl3cYtJZUffeOWlHtxfzGK7WPI= +github.com/aws/aws-sdk-go v1.26.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -171,6 +173,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 6bbf06061..e515357a5 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -107,9 +107,11 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend return err } // backupmeta already exists - if exist, err := bc.storage.FileExists(ctx, utils.MetaFile); err != nil { - return err - } else if exist { + exist, err := bc.storage.FileExists(ctx, utils.MetaFile) + if err != nil { + return errors.Annotatef(err, "error occurred when checking %s file", utils.MetaFile) + } + if exist { return errors.New("backup meta exists, may be some backup files in the path already") } bc.backend = backend diff --git a/pkg/storage/flags.go b/pkg/storage/flags.go index e992572f2..26c26dc34 100644 --- a/pkg/storage/flags.go +++ b/pkg/storage/flags.go @@ -6,8 +6,15 @@ import ( "github.com/spf13/pflag" ) +const ( + // flagSendCredentialOption specify whether to send credentials to tikv + flagSendCredentialOption = "send-credentials-to-tikv" +) + // DefineFlags adds flags to the flag set corresponding to all backend options. func DefineFlags(flags *pflag.FlagSet) { + flags.BoolP(flagSendCredentialOption, "c", true, + "Whether send credentials to tikv") defineS3Flags(flags) defineGCSFlags(flags) } diff --git a/pkg/storage/parse.go b/pkg/storage/parse.go index ac5bb202f..c470d5458 100644 --- a/pkg/storage/parse.go +++ b/pkg/storage/parse.go @@ -2,6 +2,7 @@ package storage import ( "net/url" + "strings" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" @@ -38,7 +39,11 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backup.StorageBacken return &backup.StorageBackend{Backend: &backup.StorageBackend_Noop{Noop: noop}}, nil case "s3": - s3 := &backup.S3{Bucket: u.Host, Prefix: u.Path} + if u.Host == "" { + return nil, errors.Errorf("please specify the bucket for s3 in %s", rawURL) + } + prefix := strings.Trim(u.Path, "/") + s3 := &backup.S3{Bucket: u.Host, Prefix: prefix} if options != nil { if err := options.S3.apply(s3); err != nil { return nil, err diff --git a/pkg/storage/parse_test.go b/pkg/storage/parse_test.go index 04c4e4bd0..c13216f46 100644 --- a/pkg/storage/parse_test.go +++ b/pkg/storage/parse_test.go @@ -1,4 +1,4 @@ -package storage_test +package storage import ( "io/ioutil" @@ -7,8 +7,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/backup" - - "github.com/pingcap/br/pkg/storage" ) func Test(t *testing.T) { @@ -20,38 +18,38 @@ type testStorageSuite struct{} var _ = Suite(&testStorageSuite{}) func (r *testStorageSuite) TestCreateStorage(c *C) { - _, err := storage.ParseBackend("1invalid:", nil) + _, err := ParseBackend("1invalid:", nil) c.Assert(err, ErrorMatches, "parse 1invalid:: first path segment in URL cannot contain colon") - _, err = storage.ParseBackend("net:storage", nil) + _, err = ParseBackend("net:storage", nil) c.Assert(err, ErrorMatches, "storage net not support yet") - s, err := storage.ParseBackend("local:///tmp/storage", nil) + s, err := ParseBackend("local:///tmp/storage", nil) c.Assert(err, IsNil) c.Assert(s.GetLocal().GetPath(), Equals, "/tmp/storage") - s, err = storage.ParseBackend("file:///tmp/storage", nil) + s, err = ParseBackend("file:///tmp/storage", nil) c.Assert(err, IsNil) c.Assert(s.GetLocal().GetPath(), Equals, "/tmp/storage") - s, err = storage.ParseBackend("noop://", nil) + s, err = ParseBackend("noop://", nil) c.Assert(err, IsNil) c.Assert(s.GetNoop(), NotNil) - _, err = storage.ParseBackend("s3://bucket/more/prefix/", &storage.BackendOptions{}) - c.Assert(err, ErrorMatches, `must provide either 's3\.region' or 's3\.endpoint'`) + _, err = ParseBackend("s3:///bucket/more/prefix/", &BackendOptions{}) + c.Assert(err, ErrorMatches, `please specify the bucket for s3 in s3:///bucket/more/prefix/`) - s3opt := &storage.BackendOptions{ - S3: storage.S3BackendOptions{ + s3opt := &BackendOptions{ + S3: S3BackendOptions{ Endpoint: "https://s3.example.com/", }, } - s, err = storage.ParseBackend("s3://bucket2/prefix/", s3opt) + s, err = ParseBackend("s3://bucket2/prefix/", s3opt) c.Assert(err, IsNil) s3 := s.GetS3() c.Assert(s3, NotNil) c.Assert(s3.Bucket, Equals, "bucket2") - c.Assert(s3.Prefix, Equals, "/prefix/") + c.Assert(s3.Prefix, Equals, "prefix") c.Assert(s3.Endpoint, Equals, "https://s3.example.com/") fakeCredentialsFile, err := ioutil.TempFile("", "fakeCredentialsFile") @@ -60,20 +58,20 @@ func (r *testStorageSuite) TestCreateStorage(c *C) { fakeCredentialsFile.Close() os.Remove(fakeCredentialsFile.Name()) }() - gcsOpt := &storage.BackendOptions{ - GCS: storage.GCSBackendOptions{ + gcsOpt := &BackendOptions{ + GCS: GCSBackendOptions{ Endpoint: "https://gcs.example.com/", CredentialsFile: fakeCredentialsFile.Name(), }, } - s, err = storage.ParseBackend("gcs://bucket2/prefix/", gcsOpt) + s, err = ParseBackend("gcs://bucket2/prefix/", gcsOpt) c.Assert(err, IsNil) gcs := s.GetGcs() c.Assert(gcs, NotNil) c.Assert(gcs.Bucket, Equals, "bucket2") c.Assert(gcs.Prefix, Equals, "prefix/") c.Assert(gcs.Endpoint, Equals, "https://gcs.example.com/") - s, err = storage.ParseBackend("gcs://bucket/more/prefix/", gcsOpt) + s, err = ParseBackend("gcs://bucket/more/prefix/", gcsOpt) c.Assert(err, IsNil) gcs = s.GetGcs() c.Assert(gcs, NotNil) @@ -83,21 +81,21 @@ func (r *testStorageSuite) TestCreateStorage(c *C) { } func (r *testStorageSuite) TestFormatBackendURL(c *C) { - url := storage.FormatBackendURL(&backup.StorageBackend{ + url := FormatBackendURL(&backup.StorageBackend{ Backend: &backup.StorageBackend_Local{ Local: &backup.Local{Path: "/tmp/file"}, }, }) c.Assert(url.String(), Equals, "local:///tmp/file") - url = storage.FormatBackendURL(&backup.StorageBackend{ + url = FormatBackendURL(&backup.StorageBackend{ Backend: &backup.StorageBackend_Noop{ Noop: &backup.Noop{}, }, }) c.Assert(url.String(), Equals, "noop:///") - url = storage.FormatBackendURL(&backup.StorageBackend{ + url = FormatBackendURL(&backup.StorageBackend{ Backend: &backup.StorageBackend_S3{ S3: &backup.S3{ Bucket: "bucket", @@ -108,7 +106,7 @@ func (r *testStorageSuite) TestFormatBackendURL(c *C) { }) c.Assert(url.String(), Equals, "s3://bucket/some%20prefix/") - url = storage.FormatBackendURL(&backup.StorageBackend{ + url = FormatBackendURL(&backup.StorageBackend{ Backend: &backup.StorageBackend_Gcs{ Gcs: &backup.GCS{ Bucket: "bucket", diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 2c4da92b9..a64f8798c 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -1,56 +1,286 @@ package storage import ( + "bytes" + "context" + "io/ioutil" + "net/url" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/spf13/pflag" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" - "github.com/spf13/pflag" +) + +var ( + sendCredential bool ) const ( - s3EndpointOption = "s3.endpoint" - s3RegionOption = "s3.region" + s3EndpointOption = "s3.endpoint" + s3RegionOption = "s3.region" + s3StorageClassOption = "s3.storage-class" + s3SSEOption = "s3.sse" + s3ACLOption = "s3.acl" + s3ProviderOption = "s3.provider" + notFound = "NotFound" + // number of retries to make of operations + maxRetries = 3 ) -// S3BackendOptions are options for configuration the S3 storage. +// s3Handlers make it easy to inject test functions +type s3Handlers interface { + HeadObjectWithContext(context.Context, *s3.HeadObjectInput, ...request.Option) (*s3.HeadObjectOutput, error) + GetObjectWithContext(context.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) + PutObjectWithContext(context.Context, *s3.PutObjectInput, ...request.Option) (*s3.PutObjectOutput, error) + HeadBucketWithContext(context.Context, *s3.HeadBucketInput, ...request.Option) (*s3.HeadBucketOutput, error) + WaitUntilObjectExistsWithContext(context.Context, *s3.HeadObjectInput, ...request.WaiterOption) error +} + +// S3Storage info for s3 storage +type S3Storage struct { + session *session.Session + svc s3Handlers + options *backup.S3 +} + +// S3BackendOptions contains options for s3 storage type S3BackendOptions struct { - Endpoint string `json:"endpoint" toml:"endpoint"` - Region string `json:"region" toml:"region"` + Endpoint string `json:"endpoint" toml:"endpoint"` + Region string `json:"region" toml:"region"` + StorageClass string `json:"storage-class" toml:"storage-class"` + SSE string `json:"sse" toml:"sse"` + ACL string `json:"acl" toml:"acl"` + AccessKey string `json:"access-key" toml:"access-key"` + SecretAccessKey string `json:"secret-access-key" toml:"secret-access-key"` + Provider string `json:"provider" toml:"provider"` + ForcePathStyle bool `json:"force-path-style" toml:"force-path-style"` + UseAccelerateEndpoint bool `json:"use-accelerate-endpoint" toml:"use-accelerate-endpoint"` } func (options *S3BackendOptions) apply(s3 *backup.S3) error { - if options.Endpoint == "" && options.Region == "" { - return errors.New("must provide either 's3.region' or 's3.endpoint'") + if options.Region == "" { + options.Region = "us-east-1" + } + if options.Endpoint != "" { + u, err := url.Parse(options.Endpoint) + if err != nil { + return err + } + if u.Scheme == "" { + return errors.New("scheme not found in endpoint") + } + if u.Host == "" { + return errors.New("host not found in endpoint") + } + } + // In some cases, we need to set ForcePathStyle to false. + // Refer to: https://rclone.org/s3/#s3-force-path-style + if options.Provider == "alibaba" || options.Provider == "netease" || + options.UseAccelerateEndpoint { + options.ForcePathStyle = false + } + if options.AccessKey == "" && options.SecretAccessKey != "" { + return errors.New("access_key not found") + } + if options.AccessKey != "" && options.SecretAccessKey == "" { + return errors.New("secret_access_key not found") } - // TODO: Verify Region. s3.Endpoint = options.Endpoint s3.Region = options.Region + // StorageClass, SSE and ACL are acceptable to be empty + s3.StorageClass = options.StorageClass + s3.Sse = options.SSE + s3.Acl = options.ACL + s3.AccessKey = options.AccessKey + s3.SecretAccessKey = options.SecretAccessKey + s3.ForcePathStyle = options.ForcePathStyle return nil } func defineS3Flags(flags *pflag.FlagSet) { - flags.String(s3EndpointOption, "", "Set the AWS S3 endpoint URL") - flags.String(s3RegionOption, "", "Set the AWS region") - // TODO: Finalize the list of options. - _ = flags.MarkHidden(s3EndpointOption) + flags.String(s3EndpointOption, "", "Set the S3 endpoint URL, please specify the http or https scheme explicitly") + flags.String(s3RegionOption, "", "Set the S3 region, e.g. us-east-1") + flags.String(s3StorageClassOption, "", "Set the S3 storage class, e.g. STANDARD") + flags.String(s3SSEOption, "", "Set the S3 server-side encryption algorithm, e.g. AES256") + flags.String(s3ACLOption, "", "Set the S3 canned ACLs, e.g. authenticated-read") + flags.String(s3ProviderOption, "", "Set the S3 provider, e.g. aws, alibaba, ceph") } func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOptions, err error) { + sendCredential, err = flags.GetBool(flagSendCredentialOption) + if err != nil { + err = errors.Trace(err) + return + } options.Endpoint, err = flags.GetString(s3EndpointOption) if err != nil { err = errors.Trace(err) return } - options.Region, err = flags.GetString(s3RegionOption) if err != nil { err = errors.Trace(err) return } + options.SSE, err = flags.GetString(s3SSEOption) + if err != nil { + err = errors.Trace(err) + return + } + options.ACL, err = flags.GetString(s3ACLOption) + if err != nil { + err = errors.Trace(err) + return + } + options.StorageClass, err = flags.GetString(s3StorageClassOption) + if err != nil { + err = errors.Trace(err) + return + } + options.ForcePathStyle = true + options.Provider, err = flags.GetString(s3ProviderOption) + if err != nil { + err = errors.Trace(err) + return + } - // TODO: Add more options here. + return options, err +} - return +// newS3Storage initialize a new s3 storage for metadata +func newS3Storage(backend *backup.S3) (*S3Storage, error) { + qs := *backend + awsConfig := aws.NewConfig(). + WithMaxRetries(maxRetries). + WithS3ForcePathStyle(qs.ForcePathStyle). + WithRegion(qs.Region) + if qs.Endpoint != "" { + awsConfig.WithEndpoint(qs.Endpoint) + } + var cred *credentials.Credentials + if qs.AccessKey != "" && qs.SecretAccessKey != "" { + cred = credentials.NewStaticCredentials(qs.AccessKey, qs.SecretAccessKey, "") + } + if cred != nil { + awsConfig.WithCredentials(cred) + } + // awsConfig.WithLogLevel(aws.LogDebugWithSigning) + awsSessionOpts := session.Options{ + Config: *awsConfig, + } + ses, err := session.NewSessionWithOptions(awsSessionOpts) + if err != nil { + return nil, err + } + + if sendCredential && ses.Config.Credentials != nil { + if qs.AccessKey == "" || qs.SecretAccessKey == "" { + v, cerr := ses.Config.Credentials.Get() + if cerr != nil { + return nil, cerr + } + backend.AccessKey = v.AccessKeyID + backend.SecretAccessKey = v.SecretAccessKey + } + } + + c := s3.New(ses) + err = checkS3Bucket(c, qs.Bucket) + if err != nil { + return nil, errors.Errorf("Bucket %s is not accessible: %v", qs.Bucket, err) + } + + qs.Prefix += "/" + return &S3Storage{ + session: ses, + svc: c, + options: &qs, + }, nil } -// TODO: Define S3 storage. +// checkBucket checks if a bucket exists +var checkS3Bucket = func(svc *s3.S3, bucket string) error { + input := &s3.HeadBucketInput{ + Bucket: aws.String(bucket), + } + _, err := svc.HeadBucket(input) + return err +} + +// Write write to s3 storage +func (rs *S3Storage) Write(ctx context.Context, file string, data []byte) error { + input := &s3.PutObjectInput{ + Body: aws.ReadSeekCloser(bytes.NewReader(data)), + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + file), + } + if rs.options.Acl != "" { + input = input.SetACL(rs.options.Acl) + } + if rs.options.Sse != "" { + input = input.SetServerSideEncryption(rs.options.Sse) + } + if rs.options.StorageClass != "" { + input = input.SetStorageClass(rs.options.StorageClass) + } + + _, err := rs.svc.PutObjectWithContext(ctx, input) + if err != nil { + return err + } + hinput := &s3.HeadObjectInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + file), + } + err = rs.svc.WaitUntilObjectExistsWithContext(ctx, hinput) + return err +} + +// Read read file from s3 +func (rs *S3Storage) Read(ctx context.Context, file string) ([]byte, error) { + input := &s3.GetObjectInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + file), + } + + result, err := rs.svc.GetObjectWithContext(ctx, input) + if err != nil { + return nil, err + } + defer result.Body.Close() + data, err := ioutil.ReadAll(result.Body) + if err != nil { + return nil, err + } + return data, nil +} + +// FileExists check if file exists on s3 storage +func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) { + input := &s3.HeadObjectInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + file), + } + + _, err := rs.svc.HeadObjectWithContext(ctx, input) + if err != nil { + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case s3.ErrCodeNoSuchBucket, s3.ErrCodeNoSuchKey, notFound: + return false, nil + default: + return true, err + } + } + } + + return true, err +} diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go new file mode 100644 index 000000000..a5c5af61e --- /dev/null +++ b/pkg/storage/s3_test.go @@ -0,0 +1,455 @@ +package storage + +import ( + "context" + "io/ioutil" + "os" + "strings" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/s3" + . "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/backup" + "github.com/spf13/pflag" +) + +func (r *testStorageSuite) TestApply(c *C) { + type testcase struct { + name string + options S3BackendOptions + errMsg string + errReturn bool + } + testFn := func(test *testcase, c *C) { + c.Log(test.name) + _, err := ParseBackend("s3://bucket2/prefix/", &BackendOptions{S3: test.options}) + if test.errReturn { + c.Assert(err, ErrorMatches, test.errMsg) + } else { + c.Assert(err, IsNil) + } + } + tests := []testcase{ + { + name: "access_key not found", + options: S3BackendOptions{ + Region: "us-west-2", + SecretAccessKey: "cd", + }, + errMsg: "access_key not found", + errReturn: true, + }, + { + name: "secret_access_key not found", + options: S3BackendOptions{ + Region: "us-west-2", + AccessKey: "ab", + }, + errMsg: "secret_access_key not found", + errReturn: true, + }, + { + name: "scheme not found", + options: S3BackendOptions{ + Endpoint: "12345", + }, + errMsg: "scheme not found in endpoint", + errReturn: true, + }, + { + name: "host not found", + options: S3BackendOptions{ + Endpoint: "http:12345", + }, + errMsg: "host not found in endpoint", + errReturn: true, + }, + { + name: "invalid endpoint", + options: S3BackendOptions{ + Endpoint: "!http:12345", + }, + errMsg: "parse !http:12345: first path segment in URL cannot contain colon", + errReturn: true, + }, + } + for i := range tests { + testFn(&tests[i], c) + } +} +func (r *testStorageSuite) TestApplyUpdate(c *C) { + type testcase struct { + name string + options S3BackendOptions + setEnv bool + s3 *backup.S3 + } + testFn := func(test *testcase, c *C) { + c.Log(test.name) + if test.setEnv { + os.Setenv("AWS_ACCESS_KEY_ID", "ab") + os.Setenv("AWS_SECRET_ACCESS_KEY", "cd") + } + u, err := ParseBackend("s3://bucket/prefix/", &BackendOptions{S3: test.options}) + s3 := u.GetS3() + c.Assert(err, IsNil) + c.Assert(s3, DeepEquals, test.s3) + + } + tests := []testcase{ + { + name: "no region and no endpoint", + options: S3BackendOptions{ + Region: "", + Endpoint: "", + }, + s3: &backup.S3{ + Region: "us-east-1", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "no endpoint", + options: S3BackendOptions{ + Region: "us-west-2", + }, + s3: &backup.S3{ + Region: "us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "https endpoint", + options: S3BackendOptions{ + Endpoint: "https://s3.us-west-2", + }, + s3: &backup.S3{ + Region: "us-east-1", + Endpoint: "https://s3.us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "http endpoint", + options: S3BackendOptions{ + Endpoint: "http://s3.us-west-2", + }, + s3: &backup.S3{ + Region: "us-east-1", + Endpoint: "http://s3.us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "ceph provider", + options: S3BackendOptions{ + Region: "us-west-2", + ForcePathStyle: true, + Provider: "ceph", + }, + s3: &backup.S3{ + Region: "us-west-2", + ForcePathStyle: true, + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "ali provider", + options: S3BackendOptions{ + Region: "us-west-2", + ForcePathStyle: true, + Provider: "alibaba", + }, + s3: &backup.S3{ + Region: "us-west-2", + ForcePathStyle: false, + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "netease provider", + options: S3BackendOptions{ + Region: "us-west-2", + ForcePathStyle: true, + Provider: "netease", + }, + s3: &backup.S3{ + Region: "us-west-2", + ForcePathStyle: false, + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "useAccelerateEndpoint", + options: S3BackendOptions{ + Region: "us-west-2", + ForcePathStyle: true, + UseAccelerateEndpoint: true, + }, + s3: &backup.S3{ + Region: "us-west-2", + ForcePathStyle: false, + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "keys", + options: S3BackendOptions{ + Region: "us-west-2", + AccessKey: "ab", + SecretAccessKey: "cd", + }, + s3: &backup.S3{ + Region: "us-west-2", + AccessKey: "ab", + SecretAccessKey: "cd", + Bucket: "bucket", + Prefix: "prefix", + }, + setEnv: true, + }, + } + for i := range tests { + testFn(&tests[i], c) + } +} + +func (r *testStorageSuite) TestS3Storage(c *C) { + type testcase struct { + name string + s3 *backup.S3 + errReturn bool + hackCheck bool + } + testFn := func(test *testcase, c *C) { + c.Log(test.name) + ctx := aws.BackgroundContext() + sendCredential = true + if test.hackCheck { + checkS3Bucket = func(svc *s3.S3, bucket string) error { return nil } + } + s3 := &backup.StorageBackend{ + Backend: &backup.StorageBackend_S3{ + S3: test.s3, + }, + } + _, err := Create(ctx, s3) + if test.errReturn { + c.Assert(err, NotNil) + return + } + c.Assert(err, IsNil) + } + tests := []testcase{ + { + name: "no region and endpoint", + s3: &backup.S3{ + Region: "", + Endpoint: "", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: true, + }, + { + name: "no region", + s3: &backup.S3{ + Region: "", + Endpoint: "http://10.1.2.3", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: true, + }, + { + name: "no endpoint", + s3: &backup.S3{ + Region: "us-west-2", + Endpoint: "", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: true, + }, + { + name: "no region", + s3: &backup.S3{ + Region: "", + Endpoint: "http://10.1.2.3", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: false, + hackCheck: true, + }, + { + name: "normal region", + s3: &backup.S3{ + Region: "us-west-2", + Endpoint: "", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: false, + hackCheck: true, + }, + { + name: "keys configured explicitly", + s3: &backup.S3{ + Region: "us-west-2", + AccessKey: "ab", + SecretAccessKey: "cd", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: false, + hackCheck: true, + }, + { + name: "no access key", + s3: &backup.S3{ + Region: "us-west-2", + SecretAccessKey: "cd", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: false, + hackCheck: true, + }, + { + name: "no secret access key", + s3: &backup.S3{ + Region: "us-west-2", + AccessKey: "ab", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: false, + hackCheck: true, + }, + } + for i := range tests { + testFn(&tests[i], c) + } +} +func (r *testStorageSuite) TestS3Handlers(c *C) { + type testcase struct { + name string + mh *mockS3Handler + options *backup.S3 + } + + testFn := func(test *testcase, c *C) { + c.Log(test.name) + ctx := aws.BackgroundContext() + ms3 := S3Storage{ + svc: test.mh, + options: test.options, + } + err := ms3.Write(ctx, "file", []byte("test")) + c.Assert(err, Equals, test.mh.err) + _, err = ms3.Read(ctx, "file") + c.Assert(err, Equals, test.mh.err) + _, err = ms3.FileExists(ctx, "file") + if err != nil { + c.Assert(err, Equals, test.mh.err) + } + } + tests := []testcase{ + { + name: "no error", + mh: &mockS3Handler{ + err: nil, + }, + options: &backup.S3{ + Region: "us-west-2", + Bucket: "bucket", + Prefix: "prefix", + Acl: "acl", + Sse: "sse", + StorageClass: "sc", + }, + }, + { + name: "error", + mh: &mockS3Handler{ + err: errors.New("write error"), + }, + options: &backup.S3{ + Region: "us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "aws not found error", + mh: &mockS3Handler{ + err: awserr.New(notFound, notFound, errors.New("not found")), + }, + options: &backup.S3{ + Region: "us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "aws other error", + mh: &mockS3Handler{ + err: awserr.New("other", "other", errors.New("other")), + }, + options: &backup.S3{ + Region: "us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + } + for i := range tests { + testFn(&tests[i], c) + } +} + +func (r *testStorageSuite) TestS3Others(c *C) { + defineS3Flags(&pflag.FlagSet{}) +} + +type mockS3Handler struct { + err error +} + +func (c *mockS3Handler) HeadObjectWithContext(ctx context.Context, + input *s3.HeadObjectInput, opts ...request.Option) (*s3.HeadObjectOutput, error) { + return nil, c.err +} +func (c *mockS3Handler) GetObjectWithContext(ctx context.Context, + input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error) { + if c.err != nil { + return nil, c.err + } + return &s3.GetObjectOutput{ + Body: ioutil.NopCloser(strings.NewReader("HappyFace.jpg")), + }, nil +} +func (c *mockS3Handler) PutObjectWithContext(ctx context.Context, + input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error) { + return nil, c.err +} +func (c *mockS3Handler) HeadBucketWithContext(ctx context.Context, + input *s3.HeadBucketInput, opts ...request.Option) (*s3.HeadBucketOutput, error) { + return nil, c.err +} +func (c *mockS3Handler) WaitUntilObjectExistsWithContext(ctx context.Context, + input *s3.HeadObjectInput, opts ...request.WaiterOption) error { + return c.err +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 9f79a2061..c93525257 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -22,6 +22,11 @@ func Create(ctx context.Context, backend *backup.StorageBackend) (ExternalStorag switch backend := backend.Backend.(type) { case *backup.StorageBackend_Local: return newLocalStorage(backend.Local.Path) + case *backup.StorageBackend_S3: + if backend.S3 == nil { + return nil, errors.New("s3 config not found") + } + return newS3Storage(backend.S3) case *backup.StorageBackend_Noop: return newNoopStorage(), nil case *backup.StorageBackend_Gcs: