From 68388322ff4f778070260c103613c89169628821 Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Wed, 11 Dec 2019 16:17:05 +0800 Subject: [PATCH 01/14] backup: store backupmeta to s3 --- pkg/backup/client.go | 8 +- pkg/storage/flags.go | 7 ++ pkg/storage/parse.go | 7 +- pkg/storage/remote.go | 51 +++++++++++ pkg/storage/s3.go | 203 ++++++++++++++++++++++++++++++++++++++--- pkg/storage/storage.go | 5 + 6 files changed, 262 insertions(+), 19 deletions(-) create mode 100644 pkg/storage/remote.go diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 6bbf06061..70e95a42d 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -107,9 +107,11 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend return err } // backupmeta already exists - if exist, err := bc.storage.FileExists(ctx, utils.MetaFile); err != nil { - return err - } else if exist { + exist, err := bc.storage.FileExists(ctx, utils.MetaFile) + if err != nil { + return errors.Errorf("error occurred when check %s file: %v", utils.MetaFile, err) + } + if exist { return errors.New("backup meta exists, may be some backup files in the path already") } bc.backend = backend diff --git a/pkg/storage/flags.go b/pkg/storage/flags.go index e992572f2..13dc452e8 100644 --- a/pkg/storage/flags.go +++ b/pkg/storage/flags.go @@ -6,8 +6,15 @@ import ( "github.com/spf13/pflag" ) +const ( + // flagSendCredentialOption specify whether to send credentials to tikv + flagSendCredentialOption = "send-credentials-to-tikv" +) + // DefineFlags adds flags to the flag set corresponding to all backend options. func DefineFlags(flags *pflag.FlagSet) { + flags.BoolP(flagSendCredentialOption, "c", true, + "If send credentials to tikv(default true)") defineS3Flags(flags) defineGCSFlags(flags) } diff --git a/pkg/storage/parse.go b/pkg/storage/parse.go index ac5bb202f..c470d5458 100644 --- a/pkg/storage/parse.go +++ b/pkg/storage/parse.go @@ -2,6 +2,7 @@ package storage import ( "net/url" + "strings" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" @@ -38,7 +39,11 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backup.StorageBacken return &backup.StorageBackend{Backend: &backup.StorageBackend_Noop{Noop: noop}}, nil case "s3": - s3 := &backup.S3{Bucket: u.Host, Prefix: u.Path} + if u.Host == "" { + return nil, errors.Errorf("please specify the bucket for s3 in %s", rawURL) + } + prefix := strings.Trim(u.Path, "/") + s3 := &backup.S3{Bucket: u.Host, Prefix: prefix} if options != nil { if err := options.S3.apply(s3); err != nil { return nil, err diff --git a/pkg/storage/remote.go b/pkg/storage/remote.go new file mode 100644 index 000000000..d5bed0af5 --- /dev/null +++ b/pkg/storage/remote.go @@ -0,0 +1,51 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + + "gocloud.dev/blob" +) + +// RemoteStorage info for remote storage +type RemoteStorage struct { + bucket *blob.Bucket +} + +// Write write to remote storage +func (rs *RemoteStorage) Write(file string, data []byte) error { + ctx := context.Background() + + // Open the key for writing with the default options. + err := rs.bucket.WriteAll(ctx, file, data, nil) + if err != nil { + return err + } + return nil +} + +// Read read file from remote storage +func (rs *RemoteStorage) Read(file string) ([]byte, error) { + ctx := context.Background() + // Read from the key. + return rs.bucket.ReadAll(ctx, file) +} + +// FileExists check if file exists on remote storage +func (rs *RemoteStorage) FileExists(file string) (bool, error) { + ctx := context.Background() + // Check the key. + return rs.bucket.Exists(ctx, file) +} diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 2c4da92b9..8c2275eae 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -1,56 +1,229 @@ package storage import ( + "context" + "net/http" + "os" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" + "github.com/aws/aws-sdk-go/aws/defaults" + "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/spf13/pflag" + "gocloud.dev/blob" + "gocloud.dev/blob/s3blob" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" - "github.com/spf13/pflag" ) const ( - s3EndpointOption = "s3.endpoint" - s3RegionOption = "s3.region" + s3EndpointOption = "s3.endpoint" + s3RegionOption = "s3.region" + s3StorageClassOption = "s3.storage-class" + s3SSEOption = "s3.sse" + s3ACLOption = "s3.acl" + s3ProviderOption = "s3.provider" + sendCredentialOption = "send-credentials-to-tikv" + accessKeyEnv = "AWS_ACCESS_KEY_ID" + secretAccessKeyEnv = "AWS_SECRET_ACCESS_KEY" + // number of retries to make of operations + maxRetries = 3 ) -// S3BackendOptions are options for configuration the S3 storage. +// S3BackendOptions contains options for s3 storage type S3BackendOptions struct { - Endpoint string `json:"endpoint" toml:"endpoint"` - Region string `json:"region" toml:"region"` + Endpoint string `json:"endpoint" toml:"endpoint"` + Region string `json:"region" toml:"region"` + StorageClass string `json:"storage_class" toml:"storage_class"` + SSE string `json:"sse" toml:"sse"` + ACL string `json:"acl" toml:"acl"` + ForcePathStyle bool `json:"force_path_style" toml:"force_path_style"` + AccessKey string `json:"access_key" toml:"access_key"` + SecretAccessKey string `json:"secret_access_key" toml:"secret_access_key"` } func (options *S3BackendOptions) apply(s3 *backup.S3) error { if options.Endpoint == "" && options.Region == "" { return errors.New("must provide either 's3.region' or 's3.endpoint'") } - // TODO: Verify Region. + if options.AccessKey == "" && options.SecretAccessKey != "" { + return errors.New("secret_access_key not found") + } + if options.AccessKey != "" && options.SecretAccessKey == "" { + return errors.New("access_key not found") + } + // StorageClass, SSE and ACL are acceptable to be empty s3.Endpoint = options.Endpoint s3.Region = options.Region + s3.StorageClass = options.StorageClass + s3.Sse = options.SSE + s3.Acl = options.ACL + s3.AccessKey = options.AccessKey + s3.SecretAccessKey = options.SecretAccessKey + s3.ForcePathStyle = options.ForcePathStyle return nil } func defineS3Flags(flags *pflag.FlagSet) { - flags.String(s3EndpointOption, "", "Set the AWS S3 endpoint URL") - flags.String(s3RegionOption, "", "Set the AWS region") - // TODO: Finalize the list of options. - _ = flags.MarkHidden(s3EndpointOption) + flags.String(s3EndpointOption, "", "Set the S3 endpoint URL") + flags.String(s3RegionOption, "", "Set the S3 region") + flags.String(s3StorageClassOption, "", "Set the S3 storage class") + flags.String(s3SSEOption, "", "Set the S3 server-side encryption algorithm") + flags.String(s3ACLOption, "", "Set the S3 canned ACLs") + flags.String(s3ProviderOption, "", "Set the S3 provider") } func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOptions, err error) { + send, err := flags.GetBool(flagSendCredentialOption) + if err != nil { + err = errors.Trace(err) + return + } + if send { + options.AccessKey = os.Getenv(accessKeyEnv) + options.SecretAccessKey = os.Getenv(secretAccessKeyEnv) + } options.Endpoint, err = flags.GetString(s3EndpointOption) if err != nil { err = errors.Trace(err) return } - + if options.Endpoint != "" { + if !strings.HasPrefix(options.Endpoint, "https://") && + !strings.HasPrefix(options.Endpoint, "http://") { + options.Endpoint = "http://" + options.Endpoint + } + } options.Region, err = flags.GetString(s3RegionOption) if err != nil { err = errors.Trace(err) return } - - // TODO: Add more options here. + options.SSE, err = flags.GetString(s3SSEOption) + if err != nil { + err = errors.Trace(err) + return + } + options.ACL, err = flags.GetString(s3ACLOption) + if err != nil { + err = errors.Trace(err) + return + } + options.StorageClass, err = flags.GetString(s3StorageClassOption) + if err != nil { + err = errors.Trace(err) + return + } + options.ForcePathStyle = true + provider, err := flags.GetString(s3ProviderOption) + if err != nil { + err = errors.Trace(err) + return + } + // TODO: ForcePathStyle may need to be false + // if UseAccelerateEndpoint enabled for aws s3 + if provider == "alibaba" || provider == "netease" { + options.ForcePathStyle = false + } return } -// TODO: Define S3 storage. +// newS3Storage initialize a new s3 storage for metadata +func newS3Storage(s3Back *backup.S3) (*RemoteStorage, error) { + qs := *s3Back + v := credentials.Value{ + AccessKeyID: qs.AccessKey, + SecretAccessKey: qs.SecretAccessKey, + } + + // low timeout to ec2 metadata service + lowTimeoutClient := &http.Client{Timeout: 1 * time.Second} + def := defaults.Get() + def.Config.HTTPClient = lowTimeoutClient + + // first provider to supply a credential set "wins" + providers := []credentials.Provider{ + // use static credentials if they're present (checked by provider) + &credentials.StaticProvider{Value: v}, + + // * Access Key ID: AWS_ACCESS_KEY_ID or AWS_ACCESS_KEY + // * Secret Access Key: AWS_SECRET_ACCESS_KEY or AWS_SECRET_KEY + &credentials.EnvProvider{}, + + // A SharedCredentialsProvider retrieves credentials + // from the current user's home directory. It checks + // AWS_SHARED_CREDENTIALS_FILE and AWS_PROFILE too. + &credentials.SharedCredentialsProvider{}, + + // Pick up IAM role if we're in an ECS task + defaults.RemoteCredProvider(*def.Config, def.Handlers), + + // Pick up IAM role in case we're on EC2 + &ec2rolecreds.EC2RoleProvider{ + Client: ec2metadata.New(session.New(), &aws.Config{ + HTTPClient: lowTimeoutClient, + }), + ExpiryWindow: 3 * time.Minute, + }, + } + cred := credentials.NewChainCredentials(providers) + + if qs.Region == "" && qs.Endpoint == "" { + qs.Endpoint = "https://s3.amazonaws.com/" + } + if qs.Region == "" { + qs.Region = "us-east-1" + } + + awsConfig := aws.NewConfig(). + WithMaxRetries(maxRetries). + WithCredentials(cred). + WithS3ForcePathStyle(qs.ForcePathStyle) + if qs.Region != "" { + awsConfig.WithRegion(qs.Region) + } + if qs.Endpoint != "" { + awsConfig.WithEndpoint(qs.Endpoint) + } + // awsConfig.WithLogLevel(aws.LogDebugWithSigning) + awsSessionOpts := session.Options{ + Config: *awsConfig, + } + ses, err := session.NewSessionWithOptions(awsSessionOpts) + if err != nil { + return nil, err + } + c := s3.New(ses) + err = checkS3Bucket(c, qs.Bucket) + if err != nil { + return nil, errors.Errorf("checkS3Bucket error: %v", err) + } + // Create a *blob.Bucket. + bkt, err := s3blob.OpenBucket(context.Background(), ses, qs.Bucket, nil) + if err != nil { + return nil, err + } + + qs.Prefix = strings.Trim(qs.Prefix, "/") + qs.Prefix += "/" + return &RemoteStorage{ + bucket: blob.PrefixedBucket(bkt, qs.Prefix), + }, nil +} + +// checkBucket checks if a bucket exists and creates it if not +func checkS3Bucket(svc *s3.S3, bucket string) error { + input := &s3.HeadBucketInput{ + Bucket: aws.String(bucket), + } + _, err := svc.HeadBucket(input) + return err +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 9f79a2061..a65b280b1 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -22,6 +22,11 @@ func Create(ctx context.Context, backend *backup.StorageBackend) (ExternalStorag switch backend := backend.Backend.(type) { case *backup.StorageBackend_Local: return newLocalStorage(backend.Local.Path) + case *backup.StorageBackend_S3: + if backend.S3 == nil { + return nil, errors.Errorf("no s3 config in %#v", backend) + } + return newS3Storage(backend.S3) case *backup.StorageBackend_Noop: return newNoopStorage(), nil case *backup.StorageBackend_Gcs: From 529452a2b40ea412d90d46d957d1d3a5518e0f8b Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Wed, 11 Dec 2019 17:20:19 +0800 Subject: [PATCH 02/14] add UT case --- Makefile | 1 + pkg/storage/parse_test.go | 4 +++- pkg/storage/s3.go | 9 ++++++--- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index a03cedc54..9e2fabab3 100644 --- a/Makefile +++ b/Makefile @@ -69,6 +69,7 @@ static: --disable interfacer \ --disable goimports \ --disable gofmt \ + --exclude G101 \ $$($(PACKAGE_DIRECTORIES)) lint: diff --git a/pkg/storage/parse_test.go b/pkg/storage/parse_test.go index 04c4e4bd0..c295b14e5 100644 --- a/pkg/storage/parse_test.go +++ b/pkg/storage/parse_test.go @@ -40,6 +40,8 @@ func (r *testStorageSuite) TestCreateStorage(c *C) { _, err = storage.ParseBackend("s3://bucket/more/prefix/", &storage.BackendOptions{}) c.Assert(err, ErrorMatches, `must provide either 's3\.region' or 's3\.endpoint'`) + _, err = storage.ParseBackend("s3:///bucket/more/prefix/", &storage.BackendOptions{}) + c.Assert(err, ErrorMatches, `please specify the bucket for s3 in s3:///bucket/more/prefix/`) s3opt := &storage.BackendOptions{ S3: storage.S3BackendOptions{ @@ -51,7 +53,7 @@ func (r *testStorageSuite) TestCreateStorage(c *C) { s3 := s.GetS3() c.Assert(s3, NotNil) c.Assert(s3.Bucket, Equals, "bucket2") - c.Assert(s3.Prefix, Equals, "/prefix/") + c.Assert(s3.Prefix, Equals, "prefix") c.Assert(s3.Endpoint, Equals, "https://s3.example.com/") fakeCredentialsFile, err := ioutil.TempFile("", "fakeCredentialsFile") diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 8c2275eae..50e9096dc 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -29,7 +29,6 @@ const ( s3SSEOption = "s3.sse" s3ACLOption = "s3.acl" s3ProviderOption = "s3.provider" - sendCredentialOption = "send-credentials-to-tikv" accessKeyEnv = "AWS_ACCESS_KEY_ID" secretAccessKeyEnv = "AWS_SECRET_ACCESS_KEY" // number of retries to make of operations @@ -133,7 +132,7 @@ func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOption options.ForcePathStyle = false } - return + return options, err } // newS3Storage initialize a new s3 storage for metadata @@ -148,6 +147,10 @@ func newS3Storage(s3Back *backup.S3) (*RemoteStorage, error) { lowTimeoutClient := &http.Client{Timeout: 1 * time.Second} def := defaults.Get() def.Config.HTTPClient = lowTimeoutClient + ec2Session, err := session.NewSession() + if err != nil { + return nil, err + } // first provider to supply a credential set "wins" providers := []credentials.Provider{ @@ -168,7 +171,7 @@ func newS3Storage(s3Back *backup.S3) (*RemoteStorage, error) { // Pick up IAM role in case we're on EC2 &ec2rolecreds.EC2RoleProvider{ - Client: ec2metadata.New(session.New(), &aws.Config{ + Client: ec2metadata.New(ec2Session, &aws.Config{ HTTPClient: lowTimeoutClient, }), ExpiryWindow: 3 * time.Minute, From fb12ffda3d6d8fec40abff129bc12a7daa27199c Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Wed, 11 Dec 2019 21:49:31 +0800 Subject: [PATCH 03/14] backup: replace go cdk with aws-sdk --- pkg/storage/s3.go | 100 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 89 insertions(+), 11 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 50e9096dc..0ae5fe649 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -1,13 +1,15 @@ package storage import ( - "context" + "bytes" + "io/ioutil" "net/http" "os" "strings" "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" "github.com/aws/aws-sdk-go/aws/defaults" @@ -15,8 +17,6 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/spf13/pflag" - "gocloud.dev/blob" - "gocloud.dev/blob/s3blob" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" @@ -35,6 +35,13 @@ const ( maxRetries = 3 ) +// S3Storage info for s3 storage +type S3Storage struct { + session *session.Session + svc *s3.S3 + options *backup.S3 +} + // S3BackendOptions contains options for s3 storage type S3BackendOptions struct { Endpoint string `json:"endpoint" toml:"endpoint"` @@ -136,7 +143,7 @@ func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOption } // newS3Storage initialize a new s3 storage for metadata -func newS3Storage(s3Back *backup.S3) (*RemoteStorage, error) { +func newS3Storage(s3Back *backup.S3) (*S3Storage, error) { qs := *s3Back v := credentials.Value{ AccessKeyID: qs.AccessKey, @@ -209,16 +216,13 @@ func newS3Storage(s3Back *backup.S3) (*RemoteStorage, error) { if err != nil { return nil, errors.Errorf("checkS3Bucket error: %v", err) } - // Create a *blob.Bucket. - bkt, err := s3blob.OpenBucket(context.Background(), ses, qs.Bucket, nil) - if err != nil { - return nil, err - } qs.Prefix = strings.Trim(qs.Prefix, "/") qs.Prefix += "/" - return &RemoteStorage{ - bucket: blob.PrefixedBucket(bkt, qs.Prefix), + return &S3Storage{ + session: ses, + svc: c, + options: &qs, }, nil } @@ -230,3 +234,77 @@ func checkS3Bucket(svc *s3.S3, bucket string) error { _, err := svc.HeadBucket(input) return err } + +// Write write to s3 storage +func (rs *S3Storage) Write(file string, data []byte) error { + input := &s3.PutObjectInput{ + Body: aws.ReadSeekCloser(bytes.NewReader(data)), + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + file), + } + if rs.options.Acl != "" { + input = input.SetACL(rs.options.Acl) + } + if rs.options.Sse != "" { + input = input.SetServerSideEncryption(rs.options.Sse) + } + if rs.options.StorageClass != "" { + input = input.SetStorageClass(rs.options.StorageClass) + } + + // TODO: PutObjectWithContext + _, err := rs.svc.PutObject(input) + if err != nil { + return err + } + hinput := &s3.HeadObjectInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + file), + } + // TODO: WaitUntilObjectExistsWithContext + err = rs.svc.WaitUntilObjectExists(hinput) + return err +} + +// Read read file from s3 +func (rs *S3Storage) Read(file string) ([]byte, error) { + input := &s3.GetObjectInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + file), + } + + // TODO: GetObjectWithContext + result, err := rs.svc.GetObject(input) + if err != nil { + return nil, err + } + defer result.Body.Close() + data, err := ioutil.ReadAll(result.Body) + if err != nil { + return nil, err + } + return data, nil +} + +// FileExists check if file exists on s3 storage +func (rs *S3Storage) FileExists(file string) (bool, error) { + input := &s3.HeadObjectInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + file), + } + + // TODO: HeadObjectWithContext + _, err := rs.svc.HeadObject(input) + if err != nil { + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case s3.ErrCodeNoSuchBucket, s3.ErrCodeNoSuchKey: + return false, nil + default: + return true, err + } + } + } + + return true, err +} From 5d7c1acb349851df7bc4ed7630f22fd238f824f1 Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Thu, 12 Dec 2019 10:22:24 +0800 Subject: [PATCH 04/14] address comments --- Makefile | 1 - pkg/storage/remote.go | 51 ------------------------------- pkg/storage/s3.go | 68 +++++++++++++++++++++++------------------- pkg/storage/storage.go | 2 +- 4 files changed, 39 insertions(+), 83 deletions(-) delete mode 100644 pkg/storage/remote.go diff --git a/Makefile b/Makefile index 9e2fabab3..a03cedc54 100644 --- a/Makefile +++ b/Makefile @@ -69,7 +69,6 @@ static: --disable interfacer \ --disable goimports \ --disable gofmt \ - --exclude G101 \ $$($(PACKAGE_DIRECTORIES)) lint: diff --git a/pkg/storage/remote.go b/pkg/storage/remote.go deleted file mode 100644 index d5bed0af5..000000000 --- a/pkg/storage/remote.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -import ( - "context" - - "gocloud.dev/blob" -) - -// RemoteStorage info for remote storage -type RemoteStorage struct { - bucket *blob.Bucket -} - -// Write write to remote storage -func (rs *RemoteStorage) Write(file string, data []byte) error { - ctx := context.Background() - - // Open the key for writing with the default options. - err := rs.bucket.WriteAll(ctx, file, data, nil) - if err != nil { - return err - } - return nil -} - -// Read read file from remote storage -func (rs *RemoteStorage) Read(file string) ([]byte, error) { - ctx := context.Background() - // Read from the key. - return rs.bucket.ReadAll(ctx, file) -} - -// FileExists check if file exists on remote storage -func (rs *RemoteStorage) FileExists(file string) (bool, error) { - ctx := context.Background() - // Check the key. - return rs.bucket.Exists(ctx, file) -} diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 0ae5fe649..9cfbcd422 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -4,7 +4,6 @@ import ( "bytes" "io/ioutil" "net/http" - "os" "strings" "time" @@ -29,8 +28,9 @@ const ( s3SSEOption = "s3.sse" s3ACLOption = "s3.acl" s3ProviderOption = "s3.provider" - accessKeyEnv = "AWS_ACCESS_KEY_ID" - secretAccessKeyEnv = "AWS_SECRET_ACCESS_KEY" + // accessKeyEnv = "AWS_ACCESS_KEY_ID" + // secretAccessKeyEnv = "AWS_SECRET_ACCESS_KEY" + notFound = "NotFound" // number of retries to make of operations maxRetries = 3 ) @@ -44,14 +44,17 @@ type S3Storage struct { // S3BackendOptions contains options for s3 storage type S3BackendOptions struct { - Endpoint string `json:"endpoint" toml:"endpoint"` - Region string `json:"region" toml:"region"` - StorageClass string `json:"storage_class" toml:"storage_class"` - SSE string `json:"sse" toml:"sse"` - ACL string `json:"acl" toml:"acl"` - ForcePathStyle bool `json:"force_path_style" toml:"force_path_style"` - AccessKey string `json:"access_key" toml:"access_key"` - SecretAccessKey string `json:"secret_access_key" toml:"secret_access_key"` + Endpoint string `json:"endpoint" toml:"endpoint"` + Region string `json:"region" toml:"region"` + StorageClass string `json:"storage_class" toml:"storage_class"` + SSE string `json:"sse" toml:"sse"` + ACL string `json:"acl" toml:"acl"` + AccessKey string `json:"access_key" toml:"access_key"` + SecretAccessKey string `json:"secret_access_key" toml:"secret_access_key"` + Provider string `json:"provider" toml:"provider"` + ForcePathStyle bool `json:"force_path_style" toml:"force_path_style"` + UseAccelerateEndpoint bool `json:"use_accelerate_endpoint" toml:"use_accelerate_endpoint"` + SendCredential bool `json:"send_credential" toml:"send_credential"` } func (options *S3BackendOptions) apply(s3 *backup.S3) error { @@ -65,9 +68,29 @@ func (options *S3BackendOptions) apply(s3 *backup.S3) error { return errors.New("access_key not found") } - // StorageClass, SSE and ACL are acceptable to be empty + if options.Endpoint != "" { + if !strings.HasPrefix(options.Endpoint, "https://") && + !strings.HasPrefix(options.Endpoint, "http://") { + options.Endpoint = "http://" + options.Endpoint + } + } + if options.Provider == "alibaba" || options.Provider == "netease" || + options.UseAccelerateEndpoint { + options.ForcePathStyle = false + } + if options.SendCredential { + c := credentials.NewEnvCredentials() + v, cerr := c.Get() + if cerr != nil { + cerr = errors.Trace(cerr) + return cerr + } + options.AccessKey = v.AccessKeyID + options.SecretAccessKey = v.SecretAccessKey + } s3.Endpoint = options.Endpoint s3.Region = options.Region + // StorageClass, SSE and ACL are acceptable to be empty s3.StorageClass = options.StorageClass s3.Sse = options.SSE s3.Acl = options.ACL @@ -87,26 +110,16 @@ func defineS3Flags(flags *pflag.FlagSet) { } func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOptions, err error) { - send, err := flags.GetBool(flagSendCredentialOption) + options.SendCredential, err = flags.GetBool(flagSendCredentialOption) if err != nil { err = errors.Trace(err) return } - if send { - options.AccessKey = os.Getenv(accessKeyEnv) - options.SecretAccessKey = os.Getenv(secretAccessKeyEnv) - } options.Endpoint, err = flags.GetString(s3EndpointOption) if err != nil { err = errors.Trace(err) return } - if options.Endpoint != "" { - if !strings.HasPrefix(options.Endpoint, "https://") && - !strings.HasPrefix(options.Endpoint, "http://") { - options.Endpoint = "http://" + options.Endpoint - } - } options.Region, err = flags.GetString(s3RegionOption) if err != nil { err = errors.Trace(err) @@ -128,16 +141,11 @@ func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOption return } options.ForcePathStyle = true - provider, err := flags.GetString(s3ProviderOption) + options.Provider, err = flags.GetString(s3ProviderOption) if err != nil { err = errors.Trace(err) return } - // TODO: ForcePathStyle may need to be false - // if UseAccelerateEndpoint enabled for aws s3 - if provider == "alibaba" || provider == "netease" { - options.ForcePathStyle = false - } return options, err } @@ -298,7 +306,7 @@ func (rs *S3Storage) FileExists(file string) (bool, error) { if err != nil { if aerr, ok := err.(awserr.Error); ok { switch aerr.Code() { - case s3.ErrCodeNoSuchBucket, s3.ErrCodeNoSuchKey: + case s3.ErrCodeNoSuchBucket, s3.ErrCodeNoSuchKey, notFound: return false, nil default: return true, err diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index a65b280b1..c93525257 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -24,7 +24,7 @@ func Create(ctx context.Context, backend *backup.StorageBackend) (ExternalStorag return newLocalStorage(backend.Local.Path) case *backup.StorageBackend_S3: if backend.S3 == nil { - return nil, errors.Errorf("no s3 config in %#v", backend) + return nil, errors.New("s3 config not found") } return newS3Storage(backend.S3) case *backup.StorageBackend_Noop: From 9304e3ed4c3d289bdd28cd55437fb7afa0997762 Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Thu, 12 Dec 2019 17:37:11 +0800 Subject: [PATCH 05/14] add more UT and address comments --- pkg/backup/client.go | 2 +- pkg/storage/parse_test.go | 31 ++- pkg/storage/s3.go | 37 ++-- pkg/storage/s3_test.go | 407 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 444 insertions(+), 33 deletions(-) create mode 100644 pkg/storage/s3_test.go diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 70e95a42d..e515357a5 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -109,7 +109,7 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend // backupmeta already exists exist, err := bc.storage.FileExists(ctx, utils.MetaFile) if err != nil { - return errors.Errorf("error occurred when check %s file: %v", utils.MetaFile, err) + return errors.Annotatef(err, "error occurred when checking %s file", utils.MetaFile) } if exist { return errors.New("backup meta exists, may be some backup files in the path already") diff --git a/pkg/storage/parse_test.go b/pkg/storage/parse_test.go index c295b14e5..78ebc2592 100644 --- a/pkg/storage/parse_test.go +++ b/pkg/storage/parse_test.go @@ -1,4 +1,4 @@ -package storage_test +package storage import ( "io/ioutil" @@ -7,8 +7,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/backup" - - "github.com/pingcap/br/pkg/storage" + // "github.com/pingcap/br/pkg/storage" ) func Test(t *testing.T) { @@ -20,35 +19,35 @@ type testStorageSuite struct{} var _ = Suite(&testStorageSuite{}) func (r *testStorageSuite) TestCreateStorage(c *C) { - _, err := storage.ParseBackend("1invalid:", nil) + _, err := ParseBackend("1invalid:", nil) c.Assert(err, ErrorMatches, "parse 1invalid:: first path segment in URL cannot contain colon") - _, err = storage.ParseBackend("net:storage", nil) + _, err = ParseBackend("net:storage", nil) c.Assert(err, ErrorMatches, "storage net not support yet") - s, err := storage.ParseBackend("local:///tmp/storage", nil) + s, err := ParseBackend("local:///tmp/storage", nil) c.Assert(err, IsNil) c.Assert(s.GetLocal().GetPath(), Equals, "/tmp/storage") - s, err = storage.ParseBackend("file:///tmp/storage", nil) + s, err = ParseBackend("file:///tmp/storage", nil) c.Assert(err, IsNil) c.Assert(s.GetLocal().GetPath(), Equals, "/tmp/storage") - s, err = storage.ParseBackend("noop://", nil) + s, err = ParseBackend("noop://", nil) c.Assert(err, IsNil) c.Assert(s.GetNoop(), NotNil) - _, err = storage.ParseBackend("s3://bucket/more/prefix/", &storage.BackendOptions{}) + _, err = ParseBackend("s3://bucket/more/prefix/", &BackendOptions{}) c.Assert(err, ErrorMatches, `must provide either 's3\.region' or 's3\.endpoint'`) - _, err = storage.ParseBackend("s3:///bucket/more/prefix/", &storage.BackendOptions{}) + _, err = ParseBackend("s3:///bucket/more/prefix/", &BackendOptions{}) c.Assert(err, ErrorMatches, `please specify the bucket for s3 in s3:///bucket/more/prefix/`) - s3opt := &storage.BackendOptions{ - S3: storage.S3BackendOptions{ + s3opt := &BackendOptions{ + S3: S3BackendOptions{ Endpoint: "https://s3.example.com/", }, } - s, err = storage.ParseBackend("s3://bucket2/prefix/", s3opt) + s, err = ParseBackend("s3://bucket2/prefix/", s3opt) c.Assert(err, IsNil) s3 := s.GetS3() c.Assert(s3, NotNil) @@ -85,21 +84,21 @@ func (r *testStorageSuite) TestCreateStorage(c *C) { } func (r *testStorageSuite) TestFormatBackendURL(c *C) { - url := storage.FormatBackendURL(&backup.StorageBackend{ + url := FormatBackendURL(&backup.StorageBackend{ Backend: &backup.StorageBackend_Local{ Local: &backup.Local{Path: "/tmp/file"}, }, }) c.Assert(url.String(), Equals, "local:///tmp/file") - url = storage.FormatBackendURL(&backup.StorageBackend{ + url = FormatBackendURL(&backup.StorageBackend{ Backend: &backup.StorageBackend_Noop{ Noop: &backup.Noop{}, }, }) c.Assert(url.String(), Equals, "noop:///") - url = storage.FormatBackendURL(&backup.StorageBackend{ + url = FormatBackendURL(&backup.StorageBackend{ Backend: &backup.StorageBackend_S3{ S3: &backup.S3{ Bucket: "bucket", diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 9cfbcd422..68658cfa9 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -35,10 +35,20 @@ const ( maxRetries = 3 ) +// s3Handlers make it easy to inject test functions +type s3Handlers interface { + HeadObject(*s3.HeadObjectInput) (*s3.HeadObjectOutput, error) + GetObject(*s3.GetObjectInput) (*s3.GetObjectOutput, error) + PutObject(*s3.PutObjectInput) (*s3.PutObjectOutput, error) + HeadBucket(*s3.HeadBucketInput) (*s3.HeadBucketOutput, error) + WaitUntilObjectExists(*s3.HeadObjectInput) error +} + // S3Storage info for s3 storage type S3Storage struct { session *session.Session - svc *s3.S3 + // svc *s3.S3 + svc s3Handlers options *backup.S3 } @@ -61,13 +71,6 @@ func (options *S3BackendOptions) apply(s3 *backup.S3) error { if options.Endpoint == "" && options.Region == "" { return errors.New("must provide either 's3.region' or 's3.endpoint'") } - if options.AccessKey == "" && options.SecretAccessKey != "" { - return errors.New("secret_access_key not found") - } - if options.AccessKey != "" && options.SecretAccessKey == "" { - return errors.New("access_key not found") - } - if options.Endpoint != "" { if !strings.HasPrefix(options.Endpoint, "https://") && !strings.HasPrefix(options.Endpoint, "http://") { @@ -82,12 +85,18 @@ func (options *S3BackendOptions) apply(s3 *backup.S3) error { c := credentials.NewEnvCredentials() v, cerr := c.Get() if cerr != nil { - cerr = errors.Trace(cerr) return cerr } options.AccessKey = v.AccessKeyID options.SecretAccessKey = v.SecretAccessKey } + if options.AccessKey == "" && options.SecretAccessKey != "" { + return errors.New("access_key not found") + } + if options.AccessKey != "" && options.SecretAccessKey == "" { + return errors.New("secret_access_key not found") + } + s3.Endpoint = options.Endpoint s3.Region = options.Region // StorageClass, SSE and ACL are acceptable to be empty @@ -193,21 +202,17 @@ func newS3Storage(s3Back *backup.S3) (*S3Storage, error) { }, } cred := credentials.NewChainCredentials(providers) - if qs.Region == "" && qs.Endpoint == "" { qs.Endpoint = "https://s3.amazonaws.com/" } if qs.Region == "" { qs.Region = "us-east-1" } - awsConfig := aws.NewConfig(). WithMaxRetries(maxRetries). WithCredentials(cred). - WithS3ForcePathStyle(qs.ForcePathStyle) - if qs.Region != "" { - awsConfig.WithRegion(qs.Region) - } + WithS3ForcePathStyle(qs.ForcePathStyle). + WithRegion(qs.Region) if qs.Endpoint != "" { awsConfig.WithEndpoint(qs.Endpoint) } @@ -235,7 +240,7 @@ func newS3Storage(s3Back *backup.S3) (*S3Storage, error) { } // checkBucket checks if a bucket exists and creates it if not -func checkS3Bucket(svc *s3.S3, bucket string) error { +var checkS3Bucket = func(svc *s3.S3, bucket string) error { input := &s3.HeadBucketInput{ Bucket: aws.String(bucket), } diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go new file mode 100644 index 000000000..f852855fd --- /dev/null +++ b/pkg/storage/s3_test.go @@ -0,0 +1,407 @@ +package storage + +import ( + "io/ioutil" + "os" + "strings" + + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/s3" + . "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/backup" + "github.com/spf13/pflag" +) + +func (r *testStorageSuite) TestApply(c *C) { + type testcase struct { + name string + options S3BackendOptions + errMsg string + errReturn bool + } + testFn := func(test *testcase, c *C) { + c.Log(test.name) + _, err := ParseBackend("s3://bucket2/prefix/", &BackendOptions{S3: test.options}) + if test.errReturn { + c.Assert(err, ErrorMatches, test.errMsg) + } else { + c.Assert(err, IsNil) + } + } + tests := []testcase{ + { + name: "missing region and endpoint", + options: S3BackendOptions{ + Region: "", + Endpoint: "", + }, + errMsg: "must provide either 's3.region' or 's3.endpoint'", + errReturn: true, + }, + { + name: "access_key not found", + options: S3BackendOptions{ + Region: "us-west-2", + SendCredential: false, + SecretAccessKey: "cd", + }, + errMsg: "access_key not found", + errReturn: true, + }, + { + name: "secret_access_key not found", + options: S3BackendOptions{ + Region: "us-west-2", + SendCredential: false, + AccessKey: "ab", + }, + errMsg: "secret_access_key not found", + errReturn: true, + }, + } + for i := range tests { + testFn(&tests[i], c) + } +} +func (r *testStorageSuite) TestApplyUpdate(c *C) { + type testcase struct { + name string + options S3BackendOptions + setEnv bool + s3 *backup.S3 + } + testFn := func(test *testcase, c *C) { + c.Log(test.name) + if test.setEnv { + os.Setenv("AWS_ACCESS_KEY_ID", "ab") + os.Setenv("AWS_SECRET_ACCESS_KEY", "cd") + } + u, err := ParseBackend("s3://bucket/prefix/", &BackendOptions{S3: test.options}) + s3 := u.GetS3() + c.Assert(err, IsNil) + c.Assert(true, Equals, equalS3(s3, test.s3)) + } + tests := []testcase{ + { + name: "no endpoint", + options: S3BackendOptions{ + Region: "us-west-2", + }, + s3: &backup.S3{ + Region: "us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "https endpoint", + options: S3BackendOptions{ + Endpoint: "https://s3.us-west-2", + }, + s3: &backup.S3{ + Endpoint: "https://s3.us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "http endpoint", + options: S3BackendOptions{ + Endpoint: "http://s3.us-west-2", + }, + s3: &backup.S3{ + Endpoint: "http://s3.us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "no scheme endpoint", + options: S3BackendOptions{ + Endpoint: "s3.us-west-2", + }, + s3: &backup.S3{ + Endpoint: "http://s3.us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "ceph provider", + options: S3BackendOptions{ + Region: "us-west-2", + ForcePathStyle: true, + Provider: "ceph", + }, + s3: &backup.S3{ + Region: "us-west-2", + ForcePathStyle: true, + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "ali provider", + options: S3BackendOptions{ + Region: "us-west-2", + ForcePathStyle: true, + Provider: "alibaba", + }, + s3: &backup.S3{ + Region: "us-west-2", + ForcePathStyle: false, + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "netease provider", + options: S3BackendOptions{ + Region: "us-west-2", + ForcePathStyle: true, + Provider: "netease", + }, + s3: &backup.S3{ + Region: "us-west-2", + ForcePathStyle: false, + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "useAccelerateEndpoint", + options: S3BackendOptions{ + Region: "us-west-2", + ForcePathStyle: true, + UseAccelerateEndpoint: true, + }, + s3: &backup.S3{ + Region: "us-west-2", + ForcePathStyle: false, + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "SendCredential", + options: S3BackendOptions{ + Region: "us-west-2", + AccessKey: "Access", + SecretAccessKey: "SecretAccess", + SendCredential: true, + }, + s3: &backup.S3{ + Region: "us-west-2", + AccessKey: "ab", + SecretAccessKey: "cd", + Bucket: "bucket", + Prefix: "prefix", + }, + setEnv: true, + }, + } + for i := range tests { + testFn(&tests[i], c) + } +} + +func equalS3(s1, s2 *backup.S3) bool { + return s1.Endpoint == s2.Endpoint && + s1.Region == s2.Region && + s1.Bucket == s2.Bucket && + s1.Prefix == s2.Prefix && + s1.StorageClass == s2.StorageClass && + s1.Sse == s2.Sse && + s1.Acl == s2.Acl && + s1.AccessKey == s2.AccessKey && + s1.SecretAccessKey == s2.SecretAccessKey && + s1.ForcePathStyle == s2.ForcePathStyle +} + +func (r *testStorageSuite) TestS3Storage(c *C) { + type testcase struct { + name string + s3 *backup.S3 + errReturn bool + hackCheck bool + } + testFn := func(test *testcase, c *C) { + c.Log(test.name) + if test.hackCheck { + checkS3Bucket = func(svc *s3.S3, bucket string) error { return nil } + } + s3 := &backup.StorageBackend{ + Backend: &backup.StorageBackend_S3{ + S3: test.s3, + }, + } + _, err := Create(s3) + if test.errReturn { + c.Assert(err, NotNil) + return + } + c.Assert(err, IsNil) + } + tests := []testcase{ + { + name: "no region and endpoint", + s3: &backup.S3{ + Region: "", + Endpoint: "", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: true, + }, + { + name: "no region", + s3: &backup.S3{ + Region: "", + Endpoint: "http://10.1.2.3", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: true, + }, + { + name: "no endpoint", + s3: &backup.S3{ + Region: "us-west-2", + Endpoint: "", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: true, + }, + { + name: "no region", + s3: &backup.S3{ + Region: "", + Endpoint: "http://10.1.2.3", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: false, + hackCheck: true, + }, + { + name: "normal region", + s3: &backup.S3{ + Region: "us-west-2", + Endpoint: "", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: false, + hackCheck: true, + }, + } + for i := range tests { + testFn(&tests[i], c) + } +} +func (r *testStorageSuite) TestS3Handlers(c *C) { + type testcase struct { + name string + mh *mockS3Handler + options *backup.S3 + } + + testFn := func(test *testcase, c *C) { + c.Log(test.name) + ms3 := S3Storage{ + svc: test.mh, + options: test.options, + } + err := ms3.Write("file", []byte("test")) + c.Assert(err, Equals, test.mh.err) + _, err = ms3.Read("file") + c.Assert(err, Equals, test.mh.err) + _, err = ms3.FileExists("file") + if err != nil { + c.Assert(err, Equals, test.mh.err) + } + } + tests := []testcase{ + { + name: "no error", + mh: &mockS3Handler{ + err: nil, + }, + options: &backup.S3{ + Region: "us-west-2", + Bucket: "bucket", + Prefix: "prefix", + Acl: "acl", + Sse: "sse", + StorageClass: "sc", + }, + }, + { + name: "error", + mh: &mockS3Handler{ + err: errors.New("write error"), + }, + options: &backup.S3{ + Region: "us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "aws not found error", + mh: &mockS3Handler{ + err: awserr.New(notFound, notFound, errors.New("not found")), + }, + options: &backup.S3{ + Region: "us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + { + name: "aws other error", + mh: &mockS3Handler{ + err: awserr.New("other", "other", errors.New("other")), + }, + options: &backup.S3{ + Region: "us-west-2", + Bucket: "bucket", + Prefix: "prefix", + }, + }, + } + for i := range tests { + testFn(&tests[i], c) + } +} + +func (r *testStorageSuite) TestS3Others(c *C) { + defineS3Flags(&pflag.FlagSet{}) +} + +type mockS3Handler struct { + err error +} + +func (c *mockS3Handler) HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { + return nil, c.err +} +func (c *mockS3Handler) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) { + if c.err != nil { + return nil, c.err + } + return &s3.GetObjectOutput{ + Body: ioutil.NopCloser(strings.NewReader("HappyFace.jpg")), + }, nil +} +func (c *mockS3Handler) PutObject(input *s3.PutObjectInput) (*s3.PutObjectOutput, error) { + return nil, c.err +} +func (c *mockS3Handler) HeadBucket(input *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) { + return nil, c.err +} +func (c *mockS3Handler) WaitUntilObjectExists(input *s3.HeadObjectInput) error { + return c.err +} From f5a6765be761ee16f0bea2acf428f6ed9725f930 Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Thu, 12 Dec 2019 17:48:45 +0800 Subject: [PATCH 06/14] address comments --- pkg/storage/s3.go | 21 ++++++++++----------- pkg/storage/s3_test.go | 13 +++++-------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 68658cfa9..853386a28 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -64,7 +64,6 @@ type S3BackendOptions struct { Provider string `json:"provider" toml:"provider"` ForcePathStyle bool `json:"force_path_style" toml:"force_path_style"` UseAccelerateEndpoint bool `json:"use_accelerate_endpoint" toml:"use_accelerate_endpoint"` - SendCredential bool `json:"send_credential" toml:"send_credential"` } func (options *S3BackendOptions) apply(s3 *backup.S3) error { @@ -81,15 +80,6 @@ func (options *S3BackendOptions) apply(s3 *backup.S3) error { options.UseAccelerateEndpoint { options.ForcePathStyle = false } - if options.SendCredential { - c := credentials.NewEnvCredentials() - v, cerr := c.Get() - if cerr != nil { - return cerr - } - options.AccessKey = v.AccessKeyID - options.SecretAccessKey = v.SecretAccessKey - } if options.AccessKey == "" && options.SecretAccessKey != "" { return errors.New("access_key not found") } @@ -119,11 +109,20 @@ func defineS3Flags(flags *pflag.FlagSet) { } func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOptions, err error) { - options.SendCredential, err = flags.GetBool(flagSendCredentialOption) + send, err := flags.GetBool(flagSendCredentialOption) if err != nil { err = errors.Trace(err) return } + if send { + c := credentials.NewEnvCredentials() + v, cerr := c.Get() + if cerr != nil { + return options, errors.Trace(cerr) + } + options.AccessKey = v.AccessKeyID + options.SecretAccessKey = v.SecretAccessKey + } options.Endpoint, err = flags.GetString(s3EndpointOption) if err != nil { err = errors.Trace(err) diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index f852855fd..6f78be894 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -43,7 +43,6 @@ func (r *testStorageSuite) TestApply(c *C) { name: "access_key not found", options: S3BackendOptions{ Region: "us-west-2", - SendCredential: false, SecretAccessKey: "cd", }, errMsg: "access_key not found", @@ -52,9 +51,8 @@ func (r *testStorageSuite) TestApply(c *C) { { name: "secret_access_key not found", options: S3BackendOptions{ - Region: "us-west-2", - SendCredential: false, - AccessKey: "ab", + Region: "us-west-2", + AccessKey: "ab", }, errMsg: "secret_access_key not found", errReturn: true, @@ -184,12 +182,11 @@ func (r *testStorageSuite) TestApplyUpdate(c *C) { }, }, { - name: "SendCredential", + name: "keys", options: S3BackendOptions{ Region: "us-west-2", - AccessKey: "Access", - SecretAccessKey: "SecretAccess", - SendCredential: true, + AccessKey: "ab", + SecretAccessKey: "cd", }, s3: &backup.S3{ Region: "us-west-2", From 5f1b9526cfb447f3ec8735157bde58a88014014c Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Thu, 12 Dec 2019 18:09:45 +0800 Subject: [PATCH 07/14] fix integration test --- pkg/storage/s3.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 853386a28..2418d3f35 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -4,6 +4,7 @@ import ( "bytes" "io/ioutil" "net/http" + "os" "strings" "time" @@ -115,13 +116,8 @@ func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOption return } if send { - c := credentials.NewEnvCredentials() - v, cerr := c.Get() - if cerr != nil { - return options, errors.Trace(cerr) - } - options.AccessKey = v.AccessKeyID - options.SecretAccessKey = v.SecretAccessKey + options.AccessKey = os.Getenv("AWS_ACCESS_KEY_ID") + options.SecretAccessKey = os.Getenv("AWS_SECRET_ACCESS_KEY") } options.Endpoint, err = flags.GetString(s3EndpointOption) if err != nil { From d795933b8614101b63d7bcd23e6ad9d0d3ee0d93 Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Thu, 12 Dec 2019 19:53:52 +0800 Subject: [PATCH 08/14] address comments --- pkg/storage/parse_test.go | 2 -- pkg/storage/s3.go | 15 ++++++--------- pkg/storage/s3_test.go | 25 ++++++++++++++++--------- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/pkg/storage/parse_test.go b/pkg/storage/parse_test.go index 78ebc2592..f1d23cb15 100644 --- a/pkg/storage/parse_test.go +++ b/pkg/storage/parse_test.go @@ -37,8 +37,6 @@ func (r *testStorageSuite) TestCreateStorage(c *C) { c.Assert(err, IsNil) c.Assert(s.GetNoop(), NotNil) - _, err = ParseBackend("s3://bucket/more/prefix/", &BackendOptions{}) - c.Assert(err, ErrorMatches, `must provide either 's3\.region' or 's3\.endpoint'`) _, err = ParseBackend("s3:///bucket/more/prefix/", &BackendOptions{}) c.Assert(err, ErrorMatches, `please specify the bucket for s3 in s3:///bucket/more/prefix/`) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 2418d3f35..ad0cd0dfd 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -68,8 +68,11 @@ type S3BackendOptions struct { } func (options *S3BackendOptions) apply(s3 *backup.S3) error { - if options.Endpoint == "" && options.Region == "" { - return errors.New("must provide either 's3.region' or 's3.endpoint'") + if options.Region == "" && options.Endpoint == "" { + options.Endpoint = "https://s3.amazonaws.com/" + } + if options.Region == "" { + options.Region = "us-east-1" } if options.Endpoint != "" { if !strings.HasPrefix(options.Endpoint, "https://") && @@ -197,12 +200,7 @@ func newS3Storage(s3Back *backup.S3) (*S3Storage, error) { }, } cred := credentials.NewChainCredentials(providers) - if qs.Region == "" && qs.Endpoint == "" { - qs.Endpoint = "https://s3.amazonaws.com/" - } - if qs.Region == "" { - qs.Region = "us-east-1" - } + awsConfig := aws.NewConfig(). WithMaxRetries(maxRetries). WithCredentials(cred). @@ -225,7 +223,6 @@ func newS3Storage(s3Back *backup.S3) (*S3Storage, error) { return nil, errors.Errorf("checkS3Bucket error: %v", err) } - qs.Prefix = strings.Trim(qs.Prefix, "/") qs.Prefix += "/" return &S3Storage{ session: ses, diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 6f78be894..220b5046c 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -30,15 +30,6 @@ func (r *testStorageSuite) TestApply(c *C) { } } tests := []testcase{ - { - name: "missing region and endpoint", - options: S3BackendOptions{ - Region: "", - Endpoint: "", - }, - errMsg: "must provide either 's3.region' or 's3.endpoint'", - errReturn: true, - }, { name: "access_key not found", options: S3BackendOptions{ @@ -81,6 +72,19 @@ func (r *testStorageSuite) TestApplyUpdate(c *C) { c.Assert(true, Equals, equalS3(s3, test.s3)) } tests := []testcase{ + { + name: "no region and no endpoint", + options: S3BackendOptions{ + Region: "", + Endpoint: "", + }, + s3: &backup.S3{ + Region: "us-east-1", + Bucket: "bucket", + Prefix: "prefix", + Endpoint: "https://s3.amazonaws.com/", + }, + }, { name: "no endpoint", options: S3BackendOptions{ @@ -98,6 +102,7 @@ func (r *testStorageSuite) TestApplyUpdate(c *C) { Endpoint: "https://s3.us-west-2", }, s3: &backup.S3{ + Region: "us-east-1", Endpoint: "https://s3.us-west-2", Bucket: "bucket", Prefix: "prefix", @@ -109,6 +114,7 @@ func (r *testStorageSuite) TestApplyUpdate(c *C) { Endpoint: "http://s3.us-west-2", }, s3: &backup.S3{ + Region: "us-east-1", Endpoint: "http://s3.us-west-2", Bucket: "bucket", Prefix: "prefix", @@ -120,6 +126,7 @@ func (r *testStorageSuite) TestApplyUpdate(c *C) { Endpoint: "s3.us-west-2", }, s3: &backup.S3{ + Region: "us-east-1", Endpoint: "http://s3.us-west-2", Bucket: "bucket", Prefix: "prefix", From ecfd78178697965905d68fd330ba4ce8198e3d6c Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Thu, 12 Dec 2019 21:17:45 +0800 Subject: [PATCH 09/14] address comments --- pkg/storage/flags.go | 2 +- pkg/storage/parse_test.go | 1 - pkg/storage/s3.go | 66 +++++++++++++++++++++++---------------- pkg/storage/s3_test.go | 43 +++++++++++++++---------- 4 files changed, 67 insertions(+), 45 deletions(-) diff --git a/pkg/storage/flags.go b/pkg/storage/flags.go index 13dc452e8..26c26dc34 100644 --- a/pkg/storage/flags.go +++ b/pkg/storage/flags.go @@ -14,7 +14,7 @@ const ( // DefineFlags adds flags to the flag set corresponding to all backend options. func DefineFlags(flags *pflag.FlagSet) { flags.BoolP(flagSendCredentialOption, "c", true, - "If send credentials to tikv(default true)") + "Whether send credentials to tikv") defineS3Flags(flags) defineGCSFlags(flags) } diff --git a/pkg/storage/parse_test.go b/pkg/storage/parse_test.go index f1d23cb15..04e925963 100644 --- a/pkg/storage/parse_test.go +++ b/pkg/storage/parse_test.go @@ -7,7 +7,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/backup" - // "github.com/pingcap/br/pkg/storage" ) func Test(t *testing.T) { diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index ad0cd0dfd..ee7e82314 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -4,8 +4,7 @@ import ( "bytes" "io/ioutil" "net/http" - "os" - "strings" + "net/url" "time" "github.com/aws/aws-sdk-go/aws" @@ -22,6 +21,10 @@ import ( "github.com/pingcap/kvproto/pkg/backup" ) +var ( + sendCredential bool +) + const ( s3EndpointOption = "s3.endpoint" s3RegionOption = "s3.region" @@ -29,11 +32,11 @@ const ( s3SSEOption = "s3.sse" s3ACLOption = "s3.acl" s3ProviderOption = "s3.provider" - // accessKeyEnv = "AWS_ACCESS_KEY_ID" - // secretAccessKeyEnv = "AWS_SECRET_ACCESS_KEY" - notFound = "NotFound" + notFound = "NotFound" // number of retries to make of operations maxRetries = 3 + // low timeout 1s to ec2 metadata service + timeout = 1 ) // s3Handlers make it easy to inject test functions @@ -48,7 +51,6 @@ type s3Handlers interface { // S3Storage info for s3 storage type S3Storage struct { session *session.Session - // svc *s3.S3 svc s3Handlers options *backup.S3 } @@ -68,18 +70,23 @@ type S3BackendOptions struct { } func (options *S3BackendOptions) apply(s3 *backup.S3) error { - if options.Region == "" && options.Endpoint == "" { - options.Endpoint = "https://s3.amazonaws.com/" - } if options.Region == "" { options.Region = "us-east-1" } if options.Endpoint != "" { - if !strings.HasPrefix(options.Endpoint, "https://") && - !strings.HasPrefix(options.Endpoint, "http://") { - options.Endpoint = "http://" + options.Endpoint + u, err := url.Parse(options.Endpoint) + if err != nil { + return err + } + if u.Scheme == "" { + return errors.New("scheme not found in endpoint") + } + if u.Host == "" { + return errors.New("host not found in endpoint") } } + // In some cases, we need to set ForcePathStyle to false. + // Refer to: https://rclone.org/s3/#s3-force-path-style if options.Provider == "alibaba" || options.Provider == "netease" || options.UseAccelerateEndpoint { options.ForcePathStyle = false @@ -104,24 +111,20 @@ func (options *S3BackendOptions) apply(s3 *backup.S3) error { } func defineS3Flags(flags *pflag.FlagSet) { - flags.String(s3EndpointOption, "", "Set the S3 endpoint URL") - flags.String(s3RegionOption, "", "Set the S3 region") - flags.String(s3StorageClassOption, "", "Set the S3 storage class") - flags.String(s3SSEOption, "", "Set the S3 server-side encryption algorithm") - flags.String(s3ACLOption, "", "Set the S3 canned ACLs") - flags.String(s3ProviderOption, "", "Set the S3 provider") + flags.String(s3EndpointOption, "", "Set the S3 endpoint URL, please specify the http or https scheme explicitly") + flags.String(s3RegionOption, "", "Set the S3 region, e.g. us-east-1") + flags.String(s3StorageClassOption, "", "Set the S3 storage class, e.g. STANDARD") + flags.String(s3SSEOption, "", "Set the S3 server-side encryption algorithm, e.g. AES256") + flags.String(s3ACLOption, "", "Set the S3 canned ACLs, e.g. authenticated-read") + flags.String(s3ProviderOption, "", "Set the S3 provider, e.g. aws, alibaba, ceph") } func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOptions, err error) { - send, err := flags.GetBool(flagSendCredentialOption) + sendCredential, err = flags.GetBool(flagSendCredentialOption) if err != nil { err = errors.Trace(err) return } - if send { - options.AccessKey = os.Getenv("AWS_ACCESS_KEY_ID") - options.SecretAccessKey = os.Getenv("AWS_SECRET_ACCESS_KEY") - } options.Endpoint, err = flags.GetString(s3EndpointOption) if err != nil { err = errors.Trace(err) @@ -158,15 +161,15 @@ func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOption } // newS3Storage initialize a new s3 storage for metadata -func newS3Storage(s3Back *backup.S3) (*S3Storage, error) { - qs := *s3Back +func newS3Storage(backend *backup.S3) (*S3Storage, error) { + qs := *backend v := credentials.Value{ AccessKeyID: qs.AccessKey, SecretAccessKey: qs.SecretAccessKey, } // low timeout to ec2 metadata service - lowTimeoutClient := &http.Client{Timeout: 1 * time.Second} + lowTimeoutClient := &http.Client{Timeout: time.Duration(timeout) * time.Second} def := defaults.Get() def.Config.HTTPClient = lowTimeoutClient ec2Session, err := session.NewSession() @@ -200,7 +203,16 @@ func newS3Storage(s3Back *backup.S3) (*S3Storage, error) { }, } cred := credentials.NewChainCredentials(providers) - + if sendCredential { + if qs.AccessKey == "" || qs.SecretAccessKey == "" { + v, cerr := cred.Get() + if cerr != nil { + return nil, cerr + } + backend.AccessKey = v.AccessKeyID + backend.SecretAccessKey = v.SecretAccessKey + } + } awsConfig := aws.NewConfig(). WithMaxRetries(maxRetries). WithCredentials(cred). diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 220b5046c..36e15c64c 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -48,6 +48,30 @@ func (r *testStorageSuite) TestApply(c *C) { errMsg: "secret_access_key not found", errReturn: true, }, + { + name: "scheme not found", + options: S3BackendOptions{ + Endpoint: "12345", + }, + errMsg: "scheme not found in endpoint", + errReturn: true, + }, + { + name: "host not found", + options: S3BackendOptions{ + Endpoint: "http:12345", + }, + errMsg: "host not found in endpoint", + errReturn: true, + }, + { + name: "invalid endpoint", + options: S3BackendOptions{ + Endpoint: "!http:12345", + }, + errMsg: "parse !http:12345: first path segment in URL cannot contain colon", + errReturn: true, + }, } for i := range tests { testFn(&tests[i], c) @@ -79,10 +103,9 @@ func (r *testStorageSuite) TestApplyUpdate(c *C) { Endpoint: "", }, s3: &backup.S3{ - Region: "us-east-1", - Bucket: "bucket", - Prefix: "prefix", - Endpoint: "https://s3.amazonaws.com/", + Region: "us-east-1", + Bucket: "bucket", + Prefix: "prefix", }, }, { @@ -120,18 +143,6 @@ func (r *testStorageSuite) TestApplyUpdate(c *C) { Prefix: "prefix", }, }, - { - name: "no scheme endpoint", - options: S3BackendOptions{ - Endpoint: "s3.us-west-2", - }, - s3: &backup.S3{ - Region: "us-east-1", - Endpoint: "http://s3.us-west-2", - Bucket: "bucket", - Prefix: "prefix", - }, - }, { name: "ceph provider", options: S3BackendOptions{ From a2f7e2c37aeb20cfcba29e3c481fbe4c7b48e37e Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Fri, 13 Dec 2019 12:05:33 +0800 Subject: [PATCH 10/14] address comments --- pkg/storage/s3.go | 75 ++++++++++-------------------------------- pkg/storage/s3_test.go | 35 ++++++++++++++++++++ 2 files changed, 53 insertions(+), 57 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index ee7e82314..202217caa 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -3,16 +3,11 @@ package storage import ( "bytes" "io/ioutil" - "net/http" "net/url" - "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" - "github.com/aws/aws-sdk-go/aws/defaults" - "github.com/aws/aws-sdk-go/aws/ec2metadata" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/spf13/pflag" @@ -35,8 +30,6 @@ const ( notFound = "NotFound" // number of retries to make of operations maxRetries = 3 - // low timeout 1s to ec2 metadata service - timeout = 1 ) // s3Handlers make it easy to inject test functions @@ -163,64 +156,20 @@ func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOption // newS3Storage initialize a new s3 storage for metadata func newS3Storage(backend *backup.S3) (*S3Storage, error) { qs := *backend - v := credentials.Value{ - AccessKeyID: qs.AccessKey, - SecretAccessKey: qs.SecretAccessKey, - } - - // low timeout to ec2 metadata service - lowTimeoutClient := &http.Client{Timeout: time.Duration(timeout) * time.Second} - def := defaults.Get() - def.Config.HTTPClient = lowTimeoutClient - ec2Session, err := session.NewSession() - if err != nil { - return nil, err - } - - // first provider to supply a credential set "wins" - providers := []credentials.Provider{ - // use static credentials if they're present (checked by provider) - &credentials.StaticProvider{Value: v}, - - // * Access Key ID: AWS_ACCESS_KEY_ID or AWS_ACCESS_KEY - // * Secret Access Key: AWS_SECRET_ACCESS_KEY or AWS_SECRET_KEY - &credentials.EnvProvider{}, - - // A SharedCredentialsProvider retrieves credentials - // from the current user's home directory. It checks - // AWS_SHARED_CREDENTIALS_FILE and AWS_PROFILE too. - &credentials.SharedCredentialsProvider{}, - - // Pick up IAM role if we're in an ECS task - defaults.RemoteCredProvider(*def.Config, def.Handlers), - - // Pick up IAM role in case we're on EC2 - &ec2rolecreds.EC2RoleProvider{ - Client: ec2metadata.New(ec2Session, &aws.Config{ - HTTPClient: lowTimeoutClient, - }), - ExpiryWindow: 3 * time.Minute, - }, - } - cred := credentials.NewChainCredentials(providers) - if sendCredential { - if qs.AccessKey == "" || qs.SecretAccessKey == "" { - v, cerr := cred.Get() - if cerr != nil { - return nil, cerr - } - backend.AccessKey = v.AccessKeyID - backend.SecretAccessKey = v.SecretAccessKey - } + var cred *credentials.Credentials + if qs.AccessKey != "" && qs.SecretAccessKey != "" { + cred = credentials.NewStaticCredentials(qs.AccessKey, qs.SecretAccessKey, "") } awsConfig := aws.NewConfig(). WithMaxRetries(maxRetries). - WithCredentials(cred). WithS3ForcePathStyle(qs.ForcePathStyle). WithRegion(qs.Region) if qs.Endpoint != "" { awsConfig.WithEndpoint(qs.Endpoint) } + if cred != nil { + awsConfig.WithCredentials(cred) + } // awsConfig.WithLogLevel(aws.LogDebugWithSigning) awsSessionOpts := session.Options{ Config: *awsConfig, @@ -229,6 +178,18 @@ func newS3Storage(backend *backup.S3) (*S3Storage, error) { if err != nil { return nil, err } + + if sendCredential && ses.Config.Credentials != nil { + if qs.AccessKey == "" || qs.SecretAccessKey == "" { + v, cerr := ses.Config.Credentials.Get() + if cerr != nil { + return nil, cerr + } + backend.AccessKey = v.AccessKeyID + backend.SecretAccessKey = v.SecretAccessKey + } + } + c := s3.New(ses) err = checkS3Bucket(c, qs.Bucket) if err != nil { diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 36e15c64c..15e8a52e0 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -243,6 +243,7 @@ func (r *testStorageSuite) TestS3Storage(c *C) { } testFn := func(test *testcase, c *C) { c.Log(test.name) + sendCredential = true if test.hackCheck { checkS3Bucket = func(svc *s3.S3, bucket string) error { return nil } } @@ -311,6 +312,40 @@ func (r *testStorageSuite) TestS3Storage(c *C) { errReturn: false, hackCheck: true, }, + { + name: "keys configured explicitly", + s3: &backup.S3{ + Region: "us-west-2", + AccessKey: "ab", + SecretAccessKey: "cd", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: false, + hackCheck: true, + }, + { + name: "no access key", + s3: &backup.S3{ + Region: "us-west-2", + SecretAccessKey: "cd", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: false, + hackCheck: true, + }, + { + name: "no secret access key", + s3: &backup.S3{ + Region: "us-west-2", + AccessKey: "ab", + Bucket: "bucket", + Prefix: "prefix", + }, + errReturn: false, + hackCheck: true, + }, } for i := range tests { testFn(&tests[i], c) From bf30a762cb49266678a9f408ce4c48e274edc8db Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Fri, 13 Dec 2019 13:52:18 +0800 Subject: [PATCH 11/14] address comments --- pkg/storage/s3.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 202217caa..85a7ed5a7 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -52,14 +52,14 @@ type S3Storage struct { type S3BackendOptions struct { Endpoint string `json:"endpoint" toml:"endpoint"` Region string `json:"region" toml:"region"` - StorageClass string `json:"storage_class" toml:"storage_class"` + StorageClass string `json:"storage-class" toml:"storage-class"` SSE string `json:"sse" toml:"sse"` ACL string `json:"acl" toml:"acl"` - AccessKey string `json:"access_key" toml:"access_key"` - SecretAccessKey string `json:"secret_access_key" toml:"secret_access_key"` + AccessKey string `json:"access-key" toml:"access-key"` + SecretAccessKey string `json:"secret-access-key" toml:"secret-access-key"` Provider string `json:"provider" toml:"provider"` - ForcePathStyle bool `json:"force_path_style" toml:"force_path_style"` - UseAccelerateEndpoint bool `json:"use_accelerate_endpoint" toml:"use_accelerate_endpoint"` + ForcePathStyle bool `json:"force-path-style" toml:"force-path-style"` + UseAccelerateEndpoint bool `json:"use-accelerate-endpoint" toml:"use-accelerate-endpoint"` } func (options *S3BackendOptions) apply(s3 *backup.S3) error { @@ -156,10 +156,6 @@ func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOption // newS3Storage initialize a new s3 storage for metadata func newS3Storage(backend *backup.S3) (*S3Storage, error) { qs := *backend - var cred *credentials.Credentials - if qs.AccessKey != "" && qs.SecretAccessKey != "" { - cred = credentials.NewStaticCredentials(qs.AccessKey, qs.SecretAccessKey, "") - } awsConfig := aws.NewConfig(). WithMaxRetries(maxRetries). WithS3ForcePathStyle(qs.ForcePathStyle). @@ -167,6 +163,10 @@ func newS3Storage(backend *backup.S3) (*S3Storage, error) { if qs.Endpoint != "" { awsConfig.WithEndpoint(qs.Endpoint) } + var cred *credentials.Credentials + if qs.AccessKey != "" && qs.SecretAccessKey != "" { + cred = credentials.NewStaticCredentials(qs.AccessKey, qs.SecretAccessKey, "") + } if cred != nil { awsConfig.WithCredentials(cred) } @@ -193,7 +193,7 @@ func newS3Storage(backend *backup.S3) (*S3Storage, error) { c := s3.New(ses) err = checkS3Bucket(c, qs.Bucket) if err != nil { - return nil, errors.Errorf("checkS3Bucket error: %v", err) + return nil, errors.Errorf("Bucket %s is not accessible: %v", qs.Bucket, err) } qs.Prefix += "/" From 296c6cfa705582fd33789124484f729ffea897dc Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Fri, 13 Dec 2019 13:55:51 +0800 Subject: [PATCH 12/14] address comments --- pkg/storage/s3_test.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 15e8a52e0..c84c62031 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -93,7 +93,8 @@ func (r *testStorageSuite) TestApplyUpdate(c *C) { u, err := ParseBackend("s3://bucket/prefix/", &BackendOptions{S3: test.options}) s3 := u.GetS3() c.Assert(err, IsNil) - c.Assert(true, Equals, equalS3(s3, test.s3)) + c.Assert(s3, DeepEquals, test.s3) + } tests := []testcase{ { @@ -221,19 +222,6 @@ func (r *testStorageSuite) TestApplyUpdate(c *C) { } } -func equalS3(s1, s2 *backup.S3) bool { - return s1.Endpoint == s2.Endpoint && - s1.Region == s2.Region && - s1.Bucket == s2.Bucket && - s1.Prefix == s2.Prefix && - s1.StorageClass == s2.StorageClass && - s1.Sse == s2.Sse && - s1.Acl == s2.Acl && - s1.AccessKey == s2.AccessKey && - s1.SecretAccessKey == s2.SecretAccessKey && - s1.ForcePathStyle == s2.ForcePathStyle -} - func (r *testStorageSuite) TestS3Storage(c *C) { type testcase struct { name string From a0898c769b4abdc3f9ba714fc9488f02ca7f8a3c Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Fri, 13 Dec 2019 14:34:06 +0800 Subject: [PATCH 13/14] resolve conflicts --- go.mod | 1 + go.sum | 4 ++++ pkg/storage/parse_test.go | 10 +++++----- pkg/storage/s3.go | 30 ++++++++++++++---------------- pkg/storage/s3_test.go | 28 +++++++++++++++++++--------- 5 files changed, 43 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index 8e3a8bc0b..72e95007a 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( cloud.google.com/go/storage v1.4.0 + github.com/aws/aws-sdk-go v1.26.1 github.com/cheggaaa/pb/v3 v3.0.1 github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect diff --git a/go.sum b/go.sum index 4e32aacf4..e143fb43e 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmx github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/aws/aws-sdk-go v1.26.1 h1:JGQggXhOiNJIqsmbYUl3cYtJZUffeOWlHtxfzGK7WPI= +github.com/aws/aws-sdk-go v1.26.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -171,6 +173,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= diff --git a/pkg/storage/parse_test.go b/pkg/storage/parse_test.go index 04e925963..c13216f46 100644 --- a/pkg/storage/parse_test.go +++ b/pkg/storage/parse_test.go @@ -58,20 +58,20 @@ func (r *testStorageSuite) TestCreateStorage(c *C) { fakeCredentialsFile.Close() os.Remove(fakeCredentialsFile.Name()) }() - gcsOpt := &storage.BackendOptions{ - GCS: storage.GCSBackendOptions{ + gcsOpt := &BackendOptions{ + GCS: GCSBackendOptions{ Endpoint: "https://gcs.example.com/", CredentialsFile: fakeCredentialsFile.Name(), }, } - s, err = storage.ParseBackend("gcs://bucket2/prefix/", gcsOpt) + s, err = ParseBackend("gcs://bucket2/prefix/", gcsOpt) c.Assert(err, IsNil) gcs := s.GetGcs() c.Assert(gcs, NotNil) c.Assert(gcs.Bucket, Equals, "bucket2") c.Assert(gcs.Prefix, Equals, "prefix/") c.Assert(gcs.Endpoint, Equals, "https://gcs.example.com/") - s, err = storage.ParseBackend("gcs://bucket/more/prefix/", gcsOpt) + s, err = ParseBackend("gcs://bucket/more/prefix/", gcsOpt) c.Assert(err, IsNil) gcs = s.GetGcs() c.Assert(gcs, NotNil) @@ -106,7 +106,7 @@ func (r *testStorageSuite) TestFormatBackendURL(c *C) { }) c.Assert(url.String(), Equals, "s3://bucket/some%20prefix/") - url = storage.FormatBackendURL(&backup.StorageBackend{ + url = FormatBackendURL(&backup.StorageBackend{ Backend: &backup.StorageBackend_Gcs{ Gcs: &backup.GCS{ Bucket: "bucket", diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 85a7ed5a7..f1a5421ae 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -2,12 +2,14 @@ package storage import ( "bytes" + "context" "io/ioutil" "net/url" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/spf13/pflag" @@ -34,11 +36,11 @@ const ( // s3Handlers make it easy to inject test functions type s3Handlers interface { - HeadObject(*s3.HeadObjectInput) (*s3.HeadObjectOutput, error) - GetObject(*s3.GetObjectInput) (*s3.GetObjectOutput, error) - PutObject(*s3.PutObjectInput) (*s3.PutObjectOutput, error) - HeadBucket(*s3.HeadBucketInput) (*s3.HeadBucketOutput, error) - WaitUntilObjectExists(*s3.HeadObjectInput) error + HeadObjectWithContext(context.Context, *s3.HeadObjectInput, ...request.Option) (*s3.HeadObjectOutput, error) + GetObjectWithContext(context.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) + PutObjectWithContext(context.Context, *s3.PutObjectInput, ...request.Option) (*s3.PutObjectOutput, error) + HeadBucketWithContext(context.Context, *s3.HeadBucketInput, ...request.Option) (*s3.HeadBucketOutput, error) + WaitUntilObjectExistsWithContext(context.Context, *s3.HeadObjectInput, ...request.WaiterOption) error } // S3Storage info for s3 storage @@ -214,7 +216,7 @@ var checkS3Bucket = func(svc *s3.S3, bucket string) error { } // Write write to s3 storage -func (rs *S3Storage) Write(file string, data []byte) error { +func (rs *S3Storage) Write(ctx context.Context, file string, data []byte) error { input := &s3.PutObjectInput{ Body: aws.ReadSeekCloser(bytes.NewReader(data)), Bucket: aws.String(rs.options.Bucket), @@ -230,8 +232,7 @@ func (rs *S3Storage) Write(file string, data []byte) error { input = input.SetStorageClass(rs.options.StorageClass) } - // TODO: PutObjectWithContext - _, err := rs.svc.PutObject(input) + _, err := rs.svc.PutObjectWithContext(ctx, input) if err != nil { return err } @@ -239,20 +240,18 @@ func (rs *S3Storage) Write(file string, data []byte) error { Bucket: aws.String(rs.options.Bucket), Key: aws.String(rs.options.Prefix + file), } - // TODO: WaitUntilObjectExistsWithContext - err = rs.svc.WaitUntilObjectExists(hinput) + err = rs.svc.WaitUntilObjectExistsWithContext(ctx, hinput) return err } // Read read file from s3 -func (rs *S3Storage) Read(file string) ([]byte, error) { +func (rs *S3Storage) Read(ctx context.Context, file string) ([]byte, error) { input := &s3.GetObjectInput{ Bucket: aws.String(rs.options.Bucket), Key: aws.String(rs.options.Prefix + file), } - // TODO: GetObjectWithContext - result, err := rs.svc.GetObject(input) + result, err := rs.svc.GetObjectWithContext(ctx, input) if err != nil { return nil, err } @@ -265,14 +264,13 @@ func (rs *S3Storage) Read(file string) ([]byte, error) { } // FileExists check if file exists on s3 storage -func (rs *S3Storage) FileExists(file string) (bool, error) { +func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) { input := &s3.HeadObjectInput{ Bucket: aws.String(rs.options.Bucket), Key: aws.String(rs.options.Prefix + file), } - // TODO: HeadObjectWithContext - _, err := rs.svc.HeadObject(input) + _, err := rs.svc.HeadObjectWithContext(ctx, input) if err != nil { if aerr, ok := err.(awserr.Error); ok { switch aerr.Code() { diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index c84c62031..a5c5af61e 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -1,11 +1,14 @@ package storage import ( + "context" "io/ioutil" "os" "strings" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/s3" . "github.com/pingcap/check" "github.com/pingcap/errors" @@ -231,6 +234,7 @@ func (r *testStorageSuite) TestS3Storage(c *C) { } testFn := func(test *testcase, c *C) { c.Log(test.name) + ctx := aws.BackgroundContext() sendCredential = true if test.hackCheck { checkS3Bucket = func(svc *s3.S3, bucket string) error { return nil } @@ -240,7 +244,7 @@ func (r *testStorageSuite) TestS3Storage(c *C) { S3: test.s3, }, } - _, err := Create(s3) + _, err := Create(ctx, s3) if test.errReturn { c.Assert(err, NotNil) return @@ -348,15 +352,16 @@ func (r *testStorageSuite) TestS3Handlers(c *C) { testFn := func(test *testcase, c *C) { c.Log(test.name) + ctx := aws.BackgroundContext() ms3 := S3Storage{ svc: test.mh, options: test.options, } - err := ms3.Write("file", []byte("test")) + err := ms3.Write(ctx, "file", []byte("test")) c.Assert(err, Equals, test.mh.err) - _, err = ms3.Read("file") + _, err = ms3.Read(ctx, "file") c.Assert(err, Equals, test.mh.err) - _, err = ms3.FileExists("file") + _, err = ms3.FileExists(ctx, "file") if err != nil { c.Assert(err, Equals, test.mh.err) } @@ -423,10 +428,12 @@ type mockS3Handler struct { err error } -func (c *mockS3Handler) HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { +func (c *mockS3Handler) HeadObjectWithContext(ctx context.Context, + input *s3.HeadObjectInput, opts ...request.Option) (*s3.HeadObjectOutput, error) { return nil, c.err } -func (c *mockS3Handler) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) { +func (c *mockS3Handler) GetObjectWithContext(ctx context.Context, + input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error) { if c.err != nil { return nil, c.err } @@ -434,12 +441,15 @@ func (c *mockS3Handler) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput Body: ioutil.NopCloser(strings.NewReader("HappyFace.jpg")), }, nil } -func (c *mockS3Handler) PutObject(input *s3.PutObjectInput) (*s3.PutObjectOutput, error) { +func (c *mockS3Handler) PutObjectWithContext(ctx context.Context, + input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error) { return nil, c.err } -func (c *mockS3Handler) HeadBucket(input *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) { +func (c *mockS3Handler) HeadBucketWithContext(ctx context.Context, + input *s3.HeadBucketInput, opts ...request.Option) (*s3.HeadBucketOutput, error) { return nil, c.err } -func (c *mockS3Handler) WaitUntilObjectExists(input *s3.HeadObjectInput) error { +func (c *mockS3Handler) WaitUntilObjectExistsWithContext(ctx context.Context, + input *s3.HeadObjectInput, opts ...request.WaiterOption) error { return c.err } From 115aa7ced0e2579b7a4a36db6acbbcbf484ac8e1 Mon Sep 17 00:00:00 2001 From: DanielZhangQD Date: Fri, 13 Dec 2019 15:08:10 +0800 Subject: [PATCH 14/14] fix comments --- pkg/storage/s3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index f1a5421ae..a64f8798c 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -206,7 +206,7 @@ func newS3Storage(backend *backup.S3) (*S3Storage, error) { }, nil } -// checkBucket checks if a bucket exists and creates it if not +// checkBucket checks if a bucket exists var checkS3Bucket = func(svc *s3.S3, bucket string) error { input := &s3.HeadBucketInput{ Bucket: aws.String(bucket),