From 72edc01c7e96f98ab645febcdb4a6d0d0dc9d72f Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 21 Aug 2020 15:14:18 +0800 Subject: [PATCH 1/9] storage: update WalkDir --- pkg/storage/gcs.go | 2 +- pkg/storage/local.go | 2 +- pkg/storage/noop.go | 2 +- pkg/storage/s3.go | 13 ++++++++++--- pkg/storage/storage.go | 2 +- 5 files changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index 8e82b228f..7805cf942 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -135,7 +135,7 @@ func (s *gcsStorage) Open(ctx context.Context, path string) (ReadSeekCloser, err // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (s *gcsStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { +func (s *gcsStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { // TODO, implement this if needed panic("Unsupported Operation") } diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 2bca89c09..848e14773 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -41,7 +41,7 @@ func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (l *LocalStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { +func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error { if err != nil { return errors.Trace(err) diff --git a/pkg/storage/noop.go b/pkg/storage/noop.go index ecee13115..5e38b895b 100644 --- a/pkg/storage/noop.go +++ b/pkg/storage/noop.go @@ -29,7 +29,7 @@ func (*noopStorage) Open(ctx context.Context, path string) (ReadSeekCloser, erro } // WalkDir traverse all the files in a dir. -func (*noopStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { +func (*noopStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { return nil } diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 94b63d3c4..a4584d97d 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -363,13 +363,20 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (rs *S3Storage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { +func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) error { var marker *string - prefix := rs.options.Prefix + dir + prefix := rs.options.Prefix maxKeys := int64(1000) - if listCount > 0 { + + dir, ok := ctx.Value("subDir").(string) + if ok { + prefix += dir + } + listCount, ok := ctx.Value("listCount").(int64) + if ok { maxKeys = listCount } + req := &s3.ListObjectsInput{ Bucket: aws.String(rs.options.Bucket), Prefix: aws.String(prefix), diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index a8fd0f3fe..43e3d5c82 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -41,7 +41,7 @@ type ExternalStorage interface { // The argument `path` is the file path that can be used in `Open` // function; the argument `size` is the size in byte of the file determined // by path. - WalkDir(ctx context.Context, dir string, listCount int64, fn func(path string, size int64) error) error + WalkDir(ctx context.Context, fn func(path string, size int64) error) error // CreateUploader create a uploader that will upload chunks data to storage. // It's design for s3 multi-part upload currently. e.g. cdc log backup use this to do multi part upload From f6c33576896579de29ab3380f88c7f9f1a9b41b8 Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 21 Aug 2020 17:51:16 +0800 Subject: [PATCH 2/9] address comment --- pkg/storage/gcs.go | 2 +- pkg/storage/local.go | 2 +- pkg/storage/noop.go | 2 +- pkg/storage/s3.go | 14 ++++---------- pkg/storage/storage.go | 8 +++++++- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index 7805cf942..b613aa982 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -135,7 +135,7 @@ func (s *gcsStorage) Open(ctx context.Context, path string) (ReadSeekCloser, err // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (s *gcsStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +func (s *gcsStorage) WalkDir(ctx context.Context, opt WalkOption, fn func(string, int64) error) error { // TODO, implement this if needed panic("Unsupported Operation") } diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 848e14773..badf07dd6 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -41,7 +41,7 @@ func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +func (l *LocalStorage) WalkDir(ctx context.Context, opt WalkOption, fn func(string, int64) error) error { return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error { if err != nil { return errors.Trace(err) diff --git a/pkg/storage/noop.go b/pkg/storage/noop.go index 5e38b895b..cc40e5897 100644 --- a/pkg/storage/noop.go +++ b/pkg/storage/noop.go @@ -29,7 +29,7 @@ func (*noopStorage) Open(ctx context.Context, path string) (ReadSeekCloser, erro } // WalkDir traverse all the files in a dir. -func (*noopStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +func (*noopStorage) WalkDir(ctx context.Context, opt WalkOption, fn func(string, int64) error) error { return nil } diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index a4584d97d..ba05e98ea 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -363,18 +363,12 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +func (rs *S3Storage) WalkDir(ctx context.Context, opt WalkOption, fn func(string, int64) error) error { var marker *string - prefix := rs.options.Prefix + prefix := rs.options.Prefix + opt.subDir maxKeys := int64(1000) - - dir, ok := ctx.Value("subDir").(string) - if ok { - prefix += dir - } - listCount, ok := ctx.Value("listCount").(int64) - if ok { - maxKeys = listCount + if opt.listCount > 0 { + maxKeys = opt.listCount } req := &s3.ListObjectsInput{ diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 43e3d5c82..d59ffeeca 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -10,6 +10,12 @@ import ( "github.com/pingcap/kvproto/pkg/backup" ) +// WalkOption is the option of storage.WalkDir +type WalkOption struct { + subDir string + listCount int64 +} + // ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods. type ReadSeekCloser interface { io.Reader @@ -41,7 +47,7 @@ type ExternalStorage interface { // The argument `path` is the file path that can be used in `Open` // function; the argument `size` is the size in byte of the file determined // by path. - WalkDir(ctx context.Context, fn func(path string, size int64) error) error + WalkDir(ctx context.Context, opt WalkOption, fn func(path string, size int64) error) error // CreateUploader create a uploader that will upload chunks data to storage. // It's design for s3 multi-part upload currently. e.g. cdc log backup use this to do multi part upload From 4677b5dafa81c870d2927a09b90e3b52d2e1b0ee Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 21 Aug 2020 17:54:10 +0800 Subject: [PATCH 3/9] fix build --- pkg/storage/storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d59ffeeca..90834deae 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -12,7 +12,7 @@ import ( // WalkOption is the option of storage.WalkDir type WalkOption struct { - subDir string + subDir string listCount int64 } From eeadc4de3e1c268108ddc6ab6fca4e81e304c2a2 Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 21 Aug 2020 19:43:49 +0800 Subject: [PATCH 4/9] fix check --- pkg/storage/storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 90834deae..4d5932b7f 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -10,7 +10,7 @@ import ( "github.com/pingcap/kvproto/pkg/backup" ) -// WalkOption is the option of storage.WalkDir +// WalkOption is the option of storage.WalkDir. type WalkOption struct { subDir string listCount int64 From 4e9142d27163a3d662eb4f5f1b16e2f72a0cf8d6 Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 24 Aug 2020 14:48:23 +0800 Subject: [PATCH 5/9] update walk option --- pkg/storage/s3.go | 7 ++++++- pkg/storage/storage.go | 6 ++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index ba05e98ea..88cdeb760 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -9,6 +9,7 @@ import ( "io" "io/ioutil" "net/url" + "strings" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -383,7 +384,11 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt WalkOption, fn func(string return err } for _, r := range res.Contents { - if err = fn(*r.Key, *r.Size); err != nil { + path := *r.Key + if opt.removePrefix { + path = strings.TrimLeft(path, rs.options.Prefix) + } + if err = fn(path, *r.Size); err != nil { return err } } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 4d5932b7f..9c0961c87 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -12,8 +12,14 @@ import ( // WalkOption is the option of storage.WalkDir. type WalkOption struct { + // walk on subDir of specify directory subDir string + // number of list count, default 1000 listCount int64 + // this is used for s3 storage, when walk on specify directory, + // the result include storage.Prefix, which can not be reuse in other API.(Open/Read), + // we can set removePrefix to true, to avoid this problem. + removePrefix bool } // ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods. From 7c1c605fbf226c237dd2e2587da0c54eabedaf5b Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 24 Aug 2020 15:11:08 +0800 Subject: [PATCH 6/9] public field --- pkg/storage/s3.go | 8 ++++---- pkg/storage/storage.go | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 88cdeb760..4766f6ec4 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -366,10 +366,10 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) // by path. func (rs *S3Storage) WalkDir(ctx context.Context, opt WalkOption, fn func(string, int64) error) error { var marker *string - prefix := rs.options.Prefix + opt.subDir + prefix := rs.options.Prefix + opt.SubDir maxKeys := int64(1000) - if opt.listCount > 0 { - maxKeys = opt.listCount + if opt.ListCount > 0 { + maxKeys = opt.ListCount } req := &s3.ListObjectsInput{ @@ -385,7 +385,7 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt WalkOption, fn func(string } for _, r := range res.Contents { path := *r.Key - if opt.removePrefix { + if opt.RemovePrefix { path = strings.TrimLeft(path, rs.options.Prefix) } if err = fn(path, *r.Size); err != nil { diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 9c0961c87..53548f125 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -12,14 +12,14 @@ import ( // WalkOption is the option of storage.WalkDir. type WalkOption struct { - // walk on subDir of specify directory - subDir string + // walk on SubDir of specify directory + SubDir string // number of list count, default 1000 - listCount int64 + ListCount int64 // this is used for s3 storage, when walk on specify directory, // the result include storage.Prefix, which can not be reuse in other API.(Open/Read), - // we can set removePrefix to true, to avoid this problem. - removePrefix bool + // we can set RemovePrefix to true, to avoid this problem. + RemovePrefix bool } // ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods. From 824b803433fa22425541fb3d2fe3dbd3c4ecfe84 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 24 Aug 2020 16:07:17 +0800 Subject: [PATCH 7/9] Update pkg/storage/s3.go Co-authored-by: kennytm --- pkg/storage/s3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 4766f6ec4..1a63ae68b 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -386,7 +386,7 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt WalkOption, fn func(string for _, r := range res.Contents { path := *r.Key if opt.RemovePrefix { - path = strings.TrimLeft(path, rs.options.Prefix) + path = strings.TrimPrefix(path, rs.options.Prefix) } if err = fn(path, *r.Size); err != nil { return err From a02c264268e314c34ff7f9b7e292269053b0c84a Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 24 Aug 2020 16:51:48 +0800 Subject: [PATCH 8/9] address comment --- pkg/storage/s3.go | 8 ++++---- pkg/storage/storage.go | 4 ---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 4766f6ec4..bf90e7191 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -384,10 +384,10 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt WalkOption, fn func(string return err } for _, r := range res.Contents { - path := *r.Key - if opt.RemovePrefix { - path = strings.TrimLeft(path, rs.options.Prefix) - } + // when walk on specify directory, the result include storage.Prefix, + // which can not be reuse in other API(Open/Read) directly. + // so we use TrimPrefix to filter Prefix for next Open/Read. + path := strings.TrimPrefix(*r.Key, rs.options.Prefix) if err = fn(path, *r.Size); err != nil { return err } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 53548f125..a4486c116 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -16,10 +16,6 @@ type WalkOption struct { SubDir string // number of list count, default 1000 ListCount int64 - // this is used for s3 storage, when walk on specify directory, - // the result include storage.Prefix, which can not be reuse in other API.(Open/Read), - // we can set RemovePrefix to true, to avoid this problem. - RemovePrefix bool } // ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods. From 6d69f2161466a1b926cf1bfd4effbe68ef435e40 Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 24 Aug 2020 19:03:15 +0800 Subject: [PATCH 9/9] make opt to pointer --- pkg/storage/gcs.go | 2 +- pkg/storage/local.go | 2 +- pkg/storage/noop.go | 2 +- pkg/storage/s3.go | 2 +- pkg/storage/storage.go | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index b613aa982..956a5f488 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -135,7 +135,7 @@ func (s *gcsStorage) Open(ctx context.Context, path string) (ReadSeekCloser, err // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (s *gcsStorage) WalkDir(ctx context.Context, opt WalkOption, fn func(string, int64) error) error { +func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error { // TODO, implement this if needed panic("Unsupported Operation") } diff --git a/pkg/storage/local.go b/pkg/storage/local.go index badf07dd6..25678d7cd 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -41,7 +41,7 @@ func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (l *LocalStorage) WalkDir(ctx context.Context, opt WalkOption, fn func(string, int64) error) error { +func (l *LocalStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error { return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error { if err != nil { return errors.Trace(err) diff --git a/pkg/storage/noop.go b/pkg/storage/noop.go index cc40e5897..a29d62e1d 100644 --- a/pkg/storage/noop.go +++ b/pkg/storage/noop.go @@ -29,7 +29,7 @@ func (*noopStorage) Open(ctx context.Context, path string) (ReadSeekCloser, erro } // WalkDir traverse all the files in a dir. -func (*noopStorage) WalkDir(ctx context.Context, opt WalkOption, fn func(string, int64) error) error { +func (*noopStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error { return nil } diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index bf90e7191..c9423a7c1 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -364,7 +364,7 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (rs *S3Storage) WalkDir(ctx context.Context, opt WalkOption, fn func(string, int64) error) error { +func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error { var marker *string prefix := rs.options.Prefix + opt.SubDir maxKeys := int64(1000) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index a4486c116..ff1a1eeee 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -49,7 +49,7 @@ type ExternalStorage interface { // The argument `path` is the file path that can be used in `Open` // function; the argument `size` is the size in byte of the file determined // by path. - WalkDir(ctx context.Context, opt WalkOption, fn func(path string, size int64) error) error + WalkDir(ctx context.Context, opt *WalkOption, fn func(path string, size int64) error) error // CreateUploader create a uploader that will upload chunks data to storage. // It's design for s3 multi-part upload currently. e.g. cdc log backup use this to do multi part upload