diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go
index 8e82b228f..956a5f488 100644
--- a/pkg/storage/gcs.go
+++ b/pkg/storage/gcs.go
@@ -135,7 +135,7 @@ func (s *gcsStorage) Open(ctx context.Context, path string) (ReadSeekCloser, err
 // The first argument is the file path that can be used in `Open`
 // function; the second argument is the size in byte of the file determined
 // by path.
-func (s *gcsStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
+func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
 	// TODO, implement this if needed
 	panic("Unsupported Operation")
 }
diff --git a/pkg/storage/local.go b/pkg/storage/local.go
index 2bca89c09..25678d7cd 100644
--- a/pkg/storage/local.go
+++ b/pkg/storage/local.go
@@ -41,7 +41,7 @@ func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error
 // The first argument is the file path that can be used in `Open`
 // function; the second argument is the size in byte of the file determined
 // by path.
-func (l *LocalStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
+func (l *LocalStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
 	return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error {
 		if err != nil {
 			return errors.Trace(err)
diff --git a/pkg/storage/noop.go b/pkg/storage/noop.go
index ecee13115..a29d62e1d 100644
--- a/pkg/storage/noop.go
+++ b/pkg/storage/noop.go
@@ -29,7 +29,7 @@ func (*noopStorage) Open(ctx context.Context, path string) (ReadSeekCloser, erro
 }
 
 // WalkDir traverse all the files in a dir.
-func (*noopStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
+func (*noopStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
 	return nil
 }
 
diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go
index 94b63d3c4..c9423a7c1 100644
--- a/pkg/storage/s3.go
+++ b/pkg/storage/s3.go
@@ -9,6 +9,7 @@ import (
 	"io"
 	"io/ioutil"
 	"net/url"
+	"strings"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
@@ -363,13 +364,14 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error)
 // The first argument is the file path that can be used in `Open`
 // function; the second argument is the size in byte of the file determined
 // by path.
-func (rs *S3Storage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
+func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
 	var marker *string
-	prefix := rs.options.Prefix + dir
+	prefix := rs.options.Prefix + opt.SubDir
 	maxKeys := int64(1000)
-	if listCount > 0 {
-		maxKeys = listCount
+	if opt.ListCount > 0 {
+		maxKeys = opt.ListCount
 	}
+
 	req := &s3.ListObjectsInput{
 		Bucket: aws.String(rs.options.Bucket),
 		Prefix: aws.String(prefix),
@@ -382,7 +384,11 @@ func (rs *S3Storage) WalkDir(ctx context.Context, dir string, listCount int64, f
 			return err
 		}
 		for _, r := range res.Contents {
-			if err = fn(*r.Key, *r.Size); err != nil {
+			// When walking a specific directory, the listed keys include the storage Prefix,
+			// so they cannot be passed directly to the other APIs (Open/Read).
+			// Trim the Prefix here so the callback gets a path usable by a later Open/Read.
+			path := strings.TrimPrefix(*r.Key, rs.options.Prefix)
+			if err = fn(path, *r.Size); err != nil {
 				return err
 			}
 		}
diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go
index a8fd0f3fe..ff1a1eeee 100644
--- a/pkg/storage/storage.go
+++ b/pkg/storage/storage.go
@@ -10,6 +10,14 @@ import (
 	"github.com/pingcap/kvproto/pkg/backup"
 )
 
+// WalkOption holds the options accepted by ExternalStorage.WalkDir.
+type WalkOption struct {
+	// SubDir is the sub-directory to walk; the S3 backend appends it to its configured Prefix.
+	SubDir string
+	// ListCount is the list count to use; the S3 backend falls back to 1000 when it is not positive.
+	ListCount int64
+}
+
 // ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods.
 type ReadSeekCloser interface {
 	io.Reader
@@ -41,7 +49,7 @@ type ExternalStorage interface {
 	// The argument `path` is the file path that can be used in `Open`
 	// function; the argument `size` is the size in byte of the file determined
 	// by path.
-	WalkDir(ctx context.Context, dir string, listCount int64, fn func(path string, size int64) error) error
+	WalkDir(ctx context.Context, opt *WalkOption, fn func(path string, size int64) error) error
 
 	// CreateUploader create a uploader that will upload chunks data to storage.
 	// It's design for s3 multi-part upload currently. e.g. cdc log backup use this to do multi part upload