From 9ee6dda7222c754192096f386cdc0408a9ac59e3 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 28 Jul 2020 16:21:16 +0800 Subject: [PATCH 01/14] update storage --- pkg/storage/gcs.go | 10 +++ pkg/storage/local.go | 23 +++++++ pkg/storage/noop.go | 28 ++++++++- pkg/storage/s3.go | 135 ++++++++++++++++++++++++++++++++++++++++- pkg/storage/storage.go | 11 ++++ pkg/utils/math.go | 11 ++++ pkg/utils/math_test.go | 7 +++ 7 files changed, 221 insertions(+), 4 deletions(-) diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index ccfaaf70b..0d7967fe3 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -123,6 +123,16 @@ func (s *gcsStorage) FileExists(ctx context.Context, name string) (bool, error) return true, nil } +// Open a Reader by file name +func (s *gcsStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { + return nil, errors.New("Unsupported Operation") +} + +// WalkDir traverse all the files in a dir +func (s *gcsStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { + return errors.New("Unsupported Operation") +} + func newGCSStorage(ctx context.Context, gcs *backup.GCS, sendCredential bool) (*gcsStorage, error) { return newGCSStorageWithHTTPClient(ctx, gcs, nil, sendCredential) } diff --git a/pkg/storage/local.go b/pkg/storage/local.go index e49fe16ee..6807b0478 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -4,9 +4,11 @@ package storage import ( "context" + "github.com/pingcap/errors" "io/ioutil" "os" "path" + "path/filepath" ) // localStorage represents local file system storage. @@ -31,6 +33,24 @@ func (l *localStorage) FileExists(ctx context.Context, name string) (bool, error return pathExists(filepath) } +func (l *localStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { + return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error { + if err != nil { + return errors.Trace(err) + } + + if f == nil || f.IsDir() { + return nil + } + + return fn(f.Name(), f.Size()) + }) +} + +func (l *localStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { + return os.Open(name) +} + func pathExists(_path string) (bool, error) { _, err := os.Stat(_path) if err != nil { @@ -55,3 +75,6 @@ func newLocalStorage(base string) (*localStorage, error) { } return &localStorage{base: base}, nil } + +type LocalReader struct { +} diff --git a/pkg/storage/noop.go b/pkg/storage/noop.go index bfd79a7cf..3cd6ee61f 100644 --- a/pkg/storage/noop.go +++ b/pkg/storage/noop.go @@ -2,7 +2,9 @@ package storage -import "context" +import ( + "context" +) type noopStorage struct{} @@ -21,6 +23,30 @@ func (*noopStorage) FileExists(ctx context.Context, name string) (bool, error) { return false, nil } +// Open a Reader by file name +func (*noopStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { + return noopReader{}, nil +} + +// WalkDir traverse all the files in a dir +func (*noopStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { + return nil +} + func newNoopStorage() *noopStorage { return &noopStorage{} } + +type noopReader struct{} + +func (noopReader) Read(p []byte) (n int, err error) { + return 0, nil +} + +func (noopReader) Close() error { + return nil +} + +func (noopReader) Seek(offset int64, whence int) (int64, error) { + return 0, nil +} diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index d8d6e780f..9afab2b5f 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -5,16 +5,18 @@ package storage import ( "bytes" "context" - "io/ioutil" - "net/url" - + "fmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" + "github.com/pingcap/br/pkg/utils" "github.com/spf13/pflag" + "io" + "io/ioutil" + "net/url" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" @@ -38,6 +40,7 @@ type s3Handlers interface { HeadObjectWithContext(context.Context, *s3.HeadObjectInput, ...request.Option) (*s3.HeadObjectOutput, error) GetObjectWithContext(context.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) PutObjectWithContext(context.Context, *s3.PutObjectInput, ...request.Option) (*s3.PutObjectOutput, error) + ListObjectsWithContext(context.Context, *s3.ListObjectsInput, ...request.Option) (*s3.ListObjectsOutput, error) HeadBucketWithContext(context.Context, *s3.HeadBucketInput, ...request.Option) (*s3.HeadBucketOutput, error) WaitUntilObjectExistsWithContext(context.Context, *s3.HeadObjectInput, ...request.WaiterOption) error } @@ -292,3 +295,129 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) return true, err } + +func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) error { + var marker *string + maxKeys := int64(1000) + req := &s3.ListObjectsInput{ + Bucket: &rs.options.Bucket, + Prefix: &rs.options.Prefix, + MaxKeys: &maxKeys, + } + for { + req.Marker = marker + res, err := rs.svc.ListObjectsWithContext(ctx, req) + if err != nil { + return err + } + for _, r := range res.Contents { + if err = fn(*r.Key, *r.Size); err != nil { + return err + } + } + if res.IsTruncated != nil && *res.IsTruncated { + marker = res.Marker + } else { + break + } + } + + return nil +} + +func (rs *S3Storage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { + reader, err := rs.open(ctx, name, 0, 0) + if err != nil { + return nil, err + } + return &S3ObjectReader{ + storage: rs, + name: name, + reader: reader, + }, nil +} + +func (rs *S3Storage) open(ctx context.Context, name string, startOffset int64, endOffset int64) (io.ReadCloser, error) { + input := &s3.GetObjectInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + name), + } + + var rangeOffset string + if startOffset > 0 { + if endOffset > startOffset { + rangeOffset = fmt.Sprintf("%d-%d", startOffset, endOffset) + } else { + rangeOffset = fmt.Sprintf("%d-", startOffset) + } + input.Range = &rangeOffset + } + + result, err := rs.svc.GetObjectWithContext(ctx, input) + if err != nil { + return nil, err + } + + if rangeOffset != "" && (result.AcceptRanges == nil || *result.AcceptRanges != rangeOffset) { + return nil, errors.Errorf("open file '%' failed, expected range: %s, got: %s", rangeOffset, *result.AcceptRanges) + } + + return result.Body, nil +} + +type S3ObjectReader struct { + storage *S3Storage + name string + reader io.ReadCloser + pos int64 +} + +func (r *S3ObjectReader) Read(p []byte) (n int, err error) { + n, err = r.reader.Read(p) + r.pos += int64(n) + return +} + +func (r *S3ObjectReader) Close() error { + return r.reader.Close() +} + +func (r *S3ObjectReader) Seek(offset int64, whence int) (int64, error) { + // if seek ahead no more than 64k, we read add drop these data + var realOffset int64 + if whence == io.SeekStart { + realOffset = offset + } else if whence == io.SeekCurrent { + realOffset = r.pos + offset + } else { + // TODO + return 0, errors.New("seek by SeekEnd is not supported yet") + } + if realOffset > r.pos && offset-r.pos < 1<<16 { + batch := int64(4096) + buf := make([]byte, batch, batch) + total := offset - r.pos + remain := total + for remain > 0 { + n, err := r.Read(buf[:utils.MinInt64(remain, batch)]) + if err != nil { + return total - remain + int64(n), err + } + remain -= int64(n) + } + return realOffset, nil + } + + // close current read and open a new one + err := r.reader.Close() + if err == nil { + return 0, err + } + + newReader, err := r.storage.open(context.Background(), r.name, realOffset, 0) + if err != nil { + return 0, err + } + r.reader = newReader + return realOffset, nil +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index f93acac55..da4f82691 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -4,11 +4,18 @@ package storage import ( "context" + "io" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" ) +type ReadSeekCloser interface { + io.Reader + io.Seeker + io.Closer +} + // ExternalStorage represents a kind of file system storage. type ExternalStorage interface { // Write file to storage @@ -17,6 +24,10 @@ type ExternalStorage interface { Read(ctx context.Context, name string) ([]byte, error) // FileExists return true if file exists FileExists(ctx context.Context, name string) (bool, error) + // Open a Reader by file name + Open(ctx context.Context, name string) (ReadSeekCloser, error) + // WalkDir traverse all the files in a dir + WalkDir(ctx context.Context, fn func(string, int64) error) error } // Create creates ExternalStorage. diff --git a/pkg/utils/math.go b/pkg/utils/math.go index 42615ff48..0b8b6dd52 100644 --- a/pkg/utils/math.go +++ b/pkg/utils/math.go @@ -37,3 +37,14 @@ func ClampInt(n, min, max int) int { return MinInt(max, MaxInt(min, n)) } + +// MinInt choice smallest integer from its arguments. +func MinInt64(x int64, xs ...int64) int64 { + min := x + for _, n := range xs { + if n < min { + min = n + } + } + return min +} diff --git a/pkg/utils/math_test.go b/pkg/utils/math_test.go index 91c3300ad..27c9821bd 100644 --- a/pkg/utils/math_test.go +++ b/pkg/utils/math_test.go @@ -31,3 +31,10 @@ func (*testMathSuite) TestClampInt(c *C) { c.Assert(ClampInt(0, 1, 1), Equals, 1) c.Assert(ClampInt(100, 1, 1), Equals, 1) } + +func (*testMathSuite) TestMinInt64(c *C) { + c.Assert(MinInt(1, 2), Equals, 1) + c.Assert(MinInt(2, 1), Equals, 1) + c.Assert(MinInt(4, 2, 1, 3), Equals, 1) + c.Assert(MinInt(1, 1), Equals, 1) +} From 4b408129dc00320ba5d43af9d2162b7e500033d8 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 28 Jul 2020 18:18:17 +0800 Subject: [PATCH 02/14] update --- pkg/storage/local.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 6807b0478..45c6f908d 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -62,7 +62,9 @@ func pathExists(_path string) (bool, error) { return true, nil } -func newLocalStorage(base string) (*localStorage, error) { +// NewLocalStorage return a localStorage at directory `base` +// export for test use +func NewLocalStorage(base string) (*localStorage, error) { ok, err := pathExists(base) if err != nil { return nil, err From 043e0b35c197244b6eebebc6df1c0b6e9e240e4a Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 28 Jul 2020 19:30:20 +0800 Subject: [PATCH 03/14] fix --- pkg/storage/storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index da4f82691..caa118923 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -34,7 +34,7 @@ type ExternalStorage interface { func Create(ctx context.Context, backend *backup.StorageBackend, sendCreds bool) (ExternalStorage, error) { switch backend := backend.Backend.(type) { case *backup.StorageBackend_Local: - return newLocalStorage(backend.Local.Path) + return NewLocalStorage(backend.Local.Path) case *backup.StorageBackend_S3: if backend.S3 == nil { return nil, errors.New("s3 config not found") From 568b53aea3d08e3b9dabad1f7a996e37b463c049 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 29 Jul 2020 11:01:21 +0800 Subject: [PATCH 04/14] export s3 flags --- pkg/storage/flags.go | 4 ++-- pkg/storage/s3.go | 4 ++-- pkg/storage/s3_test.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/storage/flags.go b/pkg/storage/flags.go index c828f57a1..adbaa986c 100644 --- a/pkg/storage/flags.go +++ b/pkg/storage/flags.go @@ -8,13 +8,13 @@ import ( // DefineFlags adds flags to the flag set corresponding to all backend options. func DefineFlags(flags *pflag.FlagSet) { - defineS3Flags(flags) + DefineS3Flags(flags) defineGCSFlags(flags) } // ParseFromFlags obtains the backend options from the flag set. func (options *BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error { - if err := options.S3.parseFromFlags(flags); err != nil { + if err := options.S3.ParseFromFlags(flags); err != nil { return err } return options.GCS.parseFromFlags(flags) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 9afab2b5f..201f18ffe 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -109,7 +109,7 @@ func (options *S3BackendOptions) apply(s3 *backup.S3) error { return nil } -func defineS3Flags(flags *pflag.FlagSet) { +func DefineS3Flags(flags *pflag.FlagSet) { // TODO: remove experimental tag if it's stable flags.String(s3EndpointOption, "", "(experimental) Set the S3 endpoint URL, please specify the http or https scheme explicitly") @@ -122,7 +122,7 @@ func defineS3Flags(flags *pflag.FlagSet) { flags.String(s3ProviderOption, "", "(experimental) Set the S3 provider, e.g. aws, alibaba, ceph") } -func (options *S3BackendOptions) parseFromFlags(flags *pflag.FlagSet) error { +func (options *S3BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error { var err error options.Endpoint, err = flags.GetString(s3EndpointOption) if err != nil { diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 3e3d874fb..18cae9d33 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -449,7 +449,7 @@ func (r *testStorageSuite) TestS3Handlers(c *C) { } func (r *testStorageSuite) TestS3Others(c *C) { - defineS3Flags(&pflag.FlagSet{}) + DefineS3Flags(&pflag.FlagSet{}) } type mockS3Handler struct { From 3cc77d5550f7023fca114e6d17a93f8c8001be03 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 29 Jul 2020 13:51:30 +0800 Subject: [PATCH 05/14] fix local path --- pkg/storage/local.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 45c6f908d..a043914ed 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -48,7 +48,7 @@ func (l *localStorage) WalkDir(ctx context.Context, fn func(string, int64) error } func (l *localStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { - return os.Open(name) + return os.Open(path.Join(l.base, name)) } func pathExists(_path string) (bool, error) { From f7e38f327e0d8987b9a0b4ce40e53edb01c09619 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 29 Jul 2020 19:25:44 +0800 Subject: [PATCH 06/14] fix s3 reader --- pkg/storage/s3.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 201f18ffe..428cb242f 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -393,6 +393,11 @@ func (r *S3ObjectReader) Seek(offset int64, whence int) (int64, error) { // TODO return 0, errors.New("seek by SeekEnd is not supported yet") } + + if realOffset == r.pos { + return realOffset, nil + } + if realOffset > r.pos && offset-r.pos < 1<<16 { batch := int64(4096) buf := make([]byte, batch, batch) @@ -410,7 +415,7 @@ func (r *S3ObjectReader) Seek(offset int64, whence int) (int64, error) { // close current read and open a new one err := r.reader.Close() - if err == nil { + if err != nil { return 0, err } From fb34eff52143bb9047ff387212d2555a44480262 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 30 Jul 2020 14:58:35 +0800 Subject: [PATCH 07/14] fix range offset --- pkg/storage/s3.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 428cb242f..0722dc640 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -343,14 +343,14 @@ func (rs *S3Storage) open(ctx context.Context, name string, startOffset int64, e Key: aws.String(rs.options.Prefix + name), } - var rangeOffset string + var rangeOffset *string if startOffset > 0 { if endOffset > startOffset { - rangeOffset = fmt.Sprintf("%d-%d", startOffset, endOffset) + rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset)) } else { - rangeOffset = fmt.Sprintf("%d-", startOffset) + rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset)) } - input.Range = &rangeOffset + input.Range = rangeOffset } result, err := rs.svc.GetObjectWithContext(ctx, input) @@ -358,7 +358,7 @@ func (rs *S3Storage) open(ctx context.Context, name string, startOffset int64, e return nil, err } - if rangeOffset != "" && (result.AcceptRanges == nil || *result.AcceptRanges != rangeOffset) { + if rangeOffset != nil && (result.AcceptRanges == nil || *result.AcceptRanges != *rangeOffset) { return nil, errors.Errorf("open file '%' failed, expected range: %s, got: %s", rangeOffset, *result.AcceptRanges) } @@ -424,5 +424,6 @@ func (r *S3ObjectReader) Seek(offset int64, whence int) (int64, error) { return 0, err } r.reader = newReader + r.pos = realOffset return realOffset, nil } From 6241ed3b9b6a26f16886202e8028701e0f4817e1 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 30 Jul 2020 15:22:41 +0800 Subject: [PATCH 08/14] fix the req range errors --- pkg/storage/gcs.go | 6 ++++-- pkg/storage/s3.go | 8 +++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index 0d7967fe3..a5576278c 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -125,12 +125,14 @@ func (s *gcsStorage) FileExists(ctx context.Context, name string) (bool, error) // Open a Reader by file name func (s *gcsStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { - return nil, errors.New("Unsupported Operation") + // TODO, implement this if needed + panic("Unsupported Operation") } // WalkDir traverse all the files in a dir func (s *gcsStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { - return errors.New("Unsupported Operation") + // TODO, implement this if needed + panic("Unsupported Operation") } func newGCSStorage(ctx context.Context, gcs *backup.GCS, sendCredential bool) (*gcsStorage, error) { diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 0722dc640..0084ea595 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -358,9 +358,11 @@ func (rs *S3Storage) open(ctx context.Context, name string, startOffset int64, e return nil, err } - if rangeOffset != nil && (result.AcceptRanges == nil || *result.AcceptRanges != *rangeOffset) { - return nil, errors.Errorf("open file '%' failed, expected range: %s, got: %s", rangeOffset, *result.AcceptRanges) - } + // FIXME: we test in minio, when request with Range, the result.AcceptRanges is a bare string 'range', not sure + // whether this is a feature or bug + //if rangeOffset != nil && (result.AcceptRanges == nil || *result.AcceptRanges != *rangeOffset) { + // return nil, errors.Errorf("open file '%s' failed, expected range: %s, got: %v", name, *rangeOffset, result.AcceptRanges) + //} return result.Body, nil } From 0acd776cf176d6bd8fad25f127dbbe7581c2b509 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 30 Jul 2020 15:49:46 +0800 Subject: [PATCH 09/14] fix comments --- pkg/storage/local.go | 29 +++++++++++++++-------------- pkg/storage/s3.go | 28 +++++++++++++++++++--------- pkg/storage/storage.go | 1 + pkg/utils/math.go | 2 +- 4 files changed, 36 insertions(+), 24 deletions(-) diff --git a/pkg/storage/local.go b/pkg/storage/local.go index a043914ed..631c6a1bf 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -4,36 +4,39 @@ package storage import ( "context" - "github.com/pingcap/errors" "io/ioutil" "os" "path" "path/filepath" + + "github.com/pingcap/errors" ) -// localStorage represents local file system storage. -type localStorage struct { +// LocalStorage represents local file system storage. +// export for using in tests +type LocalStorage struct { base string } -func (l *localStorage) Write(ctx context.Context, name string, data []byte) error { +func (l *LocalStorage) Write(ctx context.Context, name string, data []byte) error { filepath := path.Join(l.base, name) return ioutil.WriteFile(filepath, data, 0644) // nolint:gosec // the backupmeta file _is_ intended to be world-readable. } -func (l *localStorage) Read(ctx context.Context, name string) ([]byte, error) { +func (l *LocalStorage) Read(ctx context.Context, name string) ([]byte, error) { filepath := path.Join(l.base, name) return ioutil.ReadFile(filepath) } // FileExists implement ExternalStorage.FileExists. -func (l *localStorage) FileExists(ctx context.Context, name string) (bool, error) { +func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error) { filepath := path.Join(l.base, name) return pathExists(filepath) } -func (l *localStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +// WalkDir traverse all the files in a dir +func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error { if err != nil { return errors.Trace(err) @@ -47,7 +50,8 @@ func (l *localStorage) WalkDir(ctx context.Context, fn func(string, int64) error }) } -func (l *localStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { +// Open a Reader by file name +func (l *LocalStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { return os.Open(path.Join(l.base, name)) } @@ -62,9 +66,9 @@ func pathExists(_path string) (bool, error) { return true, nil } -// NewLocalStorage return a localStorage at directory `base` +// NewLocalStorage return a LocalStorage at directory `base` // export for test use -func NewLocalStorage(base string) (*localStorage, error) { +func NewLocalStorage(base string) (*LocalStorage, error) { ok, err := pathExists(base) if err != nil { return nil, err @@ -75,8 +79,5 @@ func NewLocalStorage(base string) (*localStorage, error) { return nil, err } } - return &localStorage{base: base}, nil -} - -type LocalReader struct { + return &LocalStorage{base: base}, nil } diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 0084ea595..9a6c61501 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -6,17 +6,19 @@ import ( "bytes" "context" "fmt" + "io" + "io/ioutil" + "net/url" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" - "github.com/pingcap/br/pkg/utils" "github.com/spf13/pflag" - "io" - "io/ioutil" - "net/url" + + "github.com/pingcap/br/pkg/utils" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" @@ -109,6 +111,7 @@ func (options *S3BackendOptions) apply(s3 *backup.S3) error { return nil } +// DefineS3Flags defines the command line flags for S3BackendOptions func DefineS3Flags(flags *pflag.FlagSet) { // TODO: remove experimental tag if it's stable flags.String(s3EndpointOption, "", @@ -122,6 +125,7 @@ func DefineS3Flags(flags *pflag.FlagSet) { flags.String(s3ProviderOption, "", "(experimental) Set the S3 provider, e.g. aws, alibaba, ceph") } +// ParseFromFlags parse S3BackendOptions from command line flags func (options *S3BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error { var err error options.Endpoint, err = flags.GetString(s3EndpointOption) @@ -296,6 +300,7 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) return true, err } +// WalkDir traverse all the files in a dir func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) error { var marker *string maxKeys := int64(1000) @@ -325,12 +330,13 @@ func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) return nil } +// Open a Reader by file name func (rs *S3Storage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { reader, err := rs.open(ctx, name, 0, 0) if err != nil { return nil, err } - return &S3ObjectReader{ + return &s3ObjectReader{ storage: rs, name: name, reader: reader, @@ -367,24 +373,28 @@ func (rs *S3Storage) open(ctx context.Context, name string, startOffset int64, e return result.Body, nil } -type S3ObjectReader struct { +// s3ObjectReader wrap GetObjectOutput.Body and add the `Seek` method +type s3ObjectReader struct { storage *S3Storage name string reader io.ReadCloser pos int64 } -func (r *S3ObjectReader) Read(p []byte) (n int, err error) { +// Read implement the io.Reader interface +func (r *s3ObjectReader) Read(p []byte) (n int, err error) { n, err = r.reader.Read(p) r.pos += int64(n) return } -func (r *S3ObjectReader) Close() error { +// Close implement the io.Closer interface +func (r *s3ObjectReader) Close() error { return r.reader.Close() } -func (r *S3ObjectReader) Seek(offset int64, whence int) (int64, error) { +// Seek implement the io.Seeker interface +func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { // if seek ahead no more than 64k, we read add drop these data var realOffset int64 if whence == io.SeekStart { diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index caa118923..d57210058 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -10,6 +10,7 @@ import ( "github.com/pingcap/kvproto/pkg/backup" ) +// ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods. type ReadSeekCloser interface { io.Reader io.Seeker diff --git a/pkg/utils/math.go b/pkg/utils/math.go index 0b8b6dd52..c0cdb8254 100644 --- a/pkg/utils/math.go +++ b/pkg/utils/math.go @@ -38,7 +38,7 @@ func ClampInt(n, min, max int) int { return MinInt(max, MaxInt(min, n)) } -// MinInt choice smallest integer from its arguments. +// MinInt64 choice smallest integer from its arguments. func MinInt64(x int64, xs ...int64) int64 { min := x for _, n := range xs { From c259509c4fbb8014f37bca42f6bc3a40807b8d76 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 30 Jul 2020 17:05:46 +0800 Subject: [PATCH 10/14] fix review comments --- pkg/storage/gcs.go | 9 +++++-- pkg/storage/local.go | 17 ++++++++---- pkg/storage/noop.go | 4 +-- pkg/storage/s3.go | 61 +++++++++++++++++++++--------------------- pkg/storage/s3_test.go | 21 +++++++++++++++ pkg/storage/storage.go | 11 +++++--- 6 files changed, 81 insertions(+), 42 deletions(-) diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index a5576278c..c41fc2e09 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -123,13 +123,18 @@ func (s *gcsStorage) FileExists(ctx context.Context, name string) (bool, error) return true, nil } -// Open a Reader by file name +// Open a Reader by file name. func (s *gcsStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { // TODO, implement this if needed panic("Unsupported Operation") } -// WalkDir traverse all the files in a dir +// WalkDir traverse all the files in a dir. +// +// fn is the function called for each regular file visited by WalkDir. +// The first argument is the file path that can be used in `Open` +// function; the second argument is the size in byte of the file determined +// by path. func (s *gcsStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { // TODO, implement this if needed panic("Unsupported Operation") diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 631c6a1bf..06eae91f9 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -13,7 +13,8 @@ import ( ) // LocalStorage represents local file system storage. -// export for using in tests +// +// export for using in tests. type LocalStorage struct { base string } @@ -35,7 +36,12 @@ func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error return pathExists(filepath) } -// WalkDir traverse all the files in a dir +// WalkDir traverse all the files in a dir. +// +// fn is the function called for each regular file visited by WalkDir. +// The first argument is the file path that can be used in `Open` +// function; the second argument is the size in byte of the file determined +// by path. func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error { if err != nil { @@ -50,7 +56,7 @@ func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error }) } -// Open a Reader by file name +// Open a Reader by file name. func (l *LocalStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { return os.Open(path.Join(l.base, name)) } @@ -66,8 +72,9 @@ func pathExists(_path string) (bool, error) { return true, nil } -// NewLocalStorage return a LocalStorage at directory `base` -// export for test use +// NewLocalStorage return a LocalStorage at directory `base`. +// +// export for test. func NewLocalStorage(base string) (*LocalStorage, error) { ok, err := pathExists(base) if err != nil { diff --git a/pkg/storage/noop.go b/pkg/storage/noop.go index 3cd6ee61f..83c7384ed 100644 --- a/pkg/storage/noop.go +++ b/pkg/storage/noop.go @@ -23,12 +23,12 @@ func (*noopStorage) FileExists(ctx context.Context, name string) (bool, error) { return false, nil } -// Open a Reader by file name +// Open a Reader by file name. func (*noopStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { return noopReader{}, nil } -// WalkDir traverse all the files in a dir +// WalkDir traverse all the files in a dir. func (*noopStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { return nil } diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 9a6c61501..fef2305bd 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -18,8 +18,6 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/spf13/pflag" - "github.com/pingcap/br/pkg/utils" - "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" ) @@ -35,6 +33,9 @@ const ( notFound = "NotFound" // number of retries to make of operations maxRetries = 3 + + // the maximum number of byte to read for seek + maxSkipOffsetByRead = 1 << 16 //64KB ) // s3Handlers make it easy to inject test functions. @@ -111,7 +112,7 @@ func (options *S3BackendOptions) apply(s3 *backup.S3) error { return nil } -// DefineS3Flags defines the command line flags for S3BackendOptions +// DefineS3Flags defines the command line flags for S3BackendOptions. func DefineS3Flags(flags *pflag.FlagSet) { // TODO: remove experimental tag if it's stable flags.String(s3EndpointOption, "", @@ -125,7 +126,7 @@ func DefineS3Flags(flags *pflag.FlagSet) { flags.String(s3ProviderOption, "", "(experimental) Set the S3 provider, e.g. aws, alibaba, ceph") } -// ParseFromFlags parse S3BackendOptions from command line flags +// ParseFromFlags parse S3BackendOptions from command line flags. func (options *S3BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error { var err error options.Endpoint, err = flags.GetString(s3EndpointOption) @@ -300,7 +301,12 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) return true, err } -// WalkDir traverse all the files in a dir +// WalkDir traverse all the files in a dir. +// +// fn is the function called for each regular file visited by WalkDir. +// The first argument is the file path that can be used in `Open` +// function; the second argument is the size in byte of the file determined +// by path. func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) error { var marker *string maxKeys := int64(1000) @@ -330,7 +336,7 @@ func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) return nil } -// Open a Reader by file name +// Open a Reader by file name. func (rs *S3Storage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { reader, err := rs.open(ctx, name, 0, 0) if err != nil { @@ -364,16 +370,17 @@ func (rs *S3Storage) open(ctx context.Context, name string, startOffset int64, e return nil, err } - // FIXME: we test in minio, when request with Range, the result.AcceptRanges is a bare string 'range', not sure - // whether this is a feature or bug + // FIXME: we test in minio, when request with Range, the result.AcceptRanges is a bare string 'range', + // not sure whether this is a feature or bug //if rangeOffset != nil && (result.AcceptRanges == nil || *result.AcceptRanges != *rangeOffset) { - // return nil, errors.Errorf("open file '%s' failed, expected range: %s, got: %v", name, *rangeOffset, result.AcceptRanges) + // return nil, errors.Errorf("open file '%s' failed, expected range: %s, got: %v", + // name, *rangeOffset, result.AcceptRanges) //} return result.Body, nil } -// s3ObjectReader wrap GetObjectOutput.Body and add the `Seek` method +// s3ObjectReader wrap GetObjectOutput.Body and add the `Seek` method. type s3ObjectReader struct { storage *S3Storage name string @@ -381,28 +388,28 @@ type s3ObjectReader struct { pos int64 } -// Read implement the io.Reader interface +// Read implement the io.Reader interface. func (r *s3ObjectReader) Read(p []byte) (n int, err error) { n, err = r.reader.Read(p) r.pos += int64(n) return } -// Close implement the io.Closer interface +// Close implement the io.Closer interface. func (r *s3ObjectReader) Close() error { return r.reader.Close() } -// Seek implement the io.Seeker interface +// Seek implement the io.Seeker interface. func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { - // if seek ahead no more than 64k, we read add drop these data var realOffset int64 - if whence == io.SeekStart { + switch whence { + case io.SeekStart: realOffset = offset - } else if whence == io.SeekCurrent { + case io.SeekCurrent: realOffset = r.pos + offset - } else { - // TODO + default: + // TODO: maybe we can fetch the object stat and calculate the absolute offset return 0, errors.New("seek by SeekEnd is not supported yet") } @@ -410,22 +417,16 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { return realOffset, nil } - if realOffset > r.pos && offset-r.pos < 1<<16 { - batch := int64(4096) - buf := make([]byte, batch, batch) - total := offset - r.pos - remain := total - for remain > 0 { - n, err := r.Read(buf[:utils.MinInt64(remain, batch)]) - if err != nil { - return total - remain + int64(n), err - } - remain -= int64(n) + // if seek ahead no more than 64k, we read add drop these data + if realOffset > r.pos && offset-r.pos <= maxSkipOffsetByRead { + _, err := io.CopyN(ioutil.Discard, r, offset-r.pos) + if err != nil { + return r.pos, err } return realOffset, nil } - // close current read and open a new one + // close current read and open a new one which target offset err := r.reader.Close() if err != nil { return 0, err diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 18cae9d33..644e62103 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -473,6 +473,27 @@ func (c *mockS3Handler) PutObjectWithContext(ctx context.Context, input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error) { return nil, c.err } +func (c *mockS3Handler) ListObjectsWithContext( + context.Context, + *s3.ListObjectsInput, + ...request.Option, +) (*s3.ListObjectsOutput, error) { + if c.err != nil { + return nil, c.err + } + truncated := false + key := "/HappyFace.jpg" + size := int64(13) + return &s3.ListObjectsOutput{ + Contents: []*s3.Object{ + { + Key: &key, + Size: &size, + }, + }, + IsTruncated: &truncated, + }, nil +} func (c *mockS3Handler) HeadBucketWithContext(ctx context.Context, input *s3.HeadBucketInput, opts ...request.Option) (*s3.HeadBucketOutput, error) { return nil, c.err diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d57210058..4e6ad6ca1 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -25,10 +25,15 @@ type ExternalStorage interface { Read(ctx context.Context, name string) ([]byte, error) // FileExists return true if file exists FileExists(ctx context.Context, name string) (bool, error) - // Open a Reader by file name + // Open a Reader by file name. Open(ctx context.Context, name string) (ReadSeekCloser, error) - // WalkDir traverse all the files in a dir - WalkDir(ctx context.Context, fn func(string, int64) error) error + // WalkDir traverse all the files in a dir. + // + // fn is the function called for each regular file visited by WalkDir. + // The argument `path` is the file path that can be used in `Open` + // function; the argument `size` is the size in byte of the file determined + // by path. + WalkDir(ctx context.Context, fn func(path string, size int64) error) error } // Create creates ExternalStorage. From 1f048977ec4e2cee0e4abdcfda066d9248cd4c81 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 10 Aug 2020 15:28:13 +0800 Subject: [PATCH 11/14] fix some comments --- pkg/storage/s3.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index fef2305bd..bc7c6b6b3 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -417,7 +417,7 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { return realOffset, nil } - // if seek ahead no more than 64k, we read add drop these data + // if seek ahead no more than 64k, we discard these data if realOffset > r.pos && offset-r.pos <= maxSkipOffsetByRead { _, err := io.CopyN(ioutil.Discard, r, offset-r.pos) if err != nil { @@ -432,7 +432,7 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { return 0, err } - newReader, err := r.storage.open(context.Background(), r.name, realOffset, 0) + newReader, err := r.storage.open(context.TODO(), r.name, realOffset, 0) if err != nil { return 0, err } From 1d52244ca8f175e255ec495a2ce1860ecd1187c2 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 10 Aug 2020 15:30:22 +0800 Subject: [PATCH 12/14] update --- pkg/storage/flags.go | 2 +- pkg/storage/s3.go | 4 ++-- pkg/storage/s3_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/storage/flags.go b/pkg/storage/flags.go index adbaa986c..767ab472c 100644 --- a/pkg/storage/flags.go +++ b/pkg/storage/flags.go @@ -8,7 +8,7 @@ import ( // DefineFlags adds flags to the flag set corresponding to all backend options. func DefineFlags(flags *pflag.FlagSet) { - DefineS3Flags(flags) + defineS3Flags(flags) defineGCSFlags(flags) } diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index bc7c6b6b3..3455e763b 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -112,8 +112,8 @@ func (options *S3BackendOptions) apply(s3 *backup.S3) error { return nil } -// DefineS3Flags defines the command line flags for S3BackendOptions. -func DefineS3Flags(flags *pflag.FlagSet) { +// defineS3Flags defines the command line flags for S3BackendOptions. +func defineS3Flags(flags *pflag.FlagSet) { // TODO: remove experimental tag if it's stable flags.String(s3EndpointOption, "", "(experimental) Set the S3 endpoint URL, please specify the http or https scheme explicitly") diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 644e62103..16f4350a8 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -449,7 +449,7 @@ func (r *testStorageSuite) TestS3Handlers(c *C) { } func (r *testStorageSuite) TestS3Others(c *C) { - DefineS3Flags(&pflag.FlagSet{}) + defineS3Flags(&pflag.FlagSet{}) } type mockS3Handler struct { From c911dc87c2e77e6b35f54ce82f4c6dcd3055caf6 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 10 Aug 2020 15:53:40 +0800 Subject: [PATCH 13/14] fix --- pkg/storage/flags.go | 4 ++-- pkg/storage/s3.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/storage/flags.go b/pkg/storage/flags.go index 767ab472c..9143d4d74 100644 --- a/pkg/storage/flags.go +++ b/pkg/storage/flags.go @@ -12,9 +12,9 @@ func DefineFlags(flags *pflag.FlagSet) { defineGCSFlags(flags) } -// ParseFromFlags obtains the backend options from the flag set. +// parseFromFlags obtains the backend options from the flag set. func (options *BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error { - if err := options.S3.ParseFromFlags(flags); err != nil { + if err := options.S3.parseFromFlags(flags); err != nil { return err } return options.GCS.parseFromFlags(flags) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 3455e763b..c31997d76 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -127,7 +127,7 @@ func defineS3Flags(flags *pflag.FlagSet) { } // ParseFromFlags parse S3BackendOptions from command line flags. -func (options *S3BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error { +func (options *S3BackendOptions) parseFromFlags(flags *pflag.FlagSet) error { var err error options.Endpoint, err = flags.GetString(s3EndpointOption) if err != nil { From 1b67827b8a959e3c821b3fea878b33b060f62efd Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 10 Aug 2020 15:56:34 +0800 Subject: [PATCH 14/14] update --- pkg/storage/flags.go | 2 +- pkg/storage/s3.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/flags.go b/pkg/storage/flags.go index 9143d4d74..c828f57a1 100644 --- a/pkg/storage/flags.go +++ b/pkg/storage/flags.go @@ -12,7 +12,7 @@ func DefineFlags(flags *pflag.FlagSet) { defineGCSFlags(flags) } -// parseFromFlags obtains the backend options from the flag set. +// ParseFromFlags obtains the backend options from the flag set. func (options *BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error { if err := options.S3.parseFromFlags(flags); err != nil { return err diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index c31997d76..429e8b8d8 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -126,7 +126,7 @@ func defineS3Flags(flags *pflag.FlagSet) { flags.String(s3ProviderOption, "", "(experimental) Set the S3 provider, e.g. aws, alibaba, ceph") } -// ParseFromFlags parse S3BackendOptions from command line flags. +// parseFromFlags parse S3BackendOptions from command line flags. func (options *S3BackendOptions) parseFromFlags(flags *pflag.FlagSet) error { var err error options.Endpoint, err = flags.GetString(s3EndpointOption)