Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (s *gcsStorage) Open(ctx context.Context, path string) (ReadSeekCloser, err
// The first argument is the file path that can be used in `Open`
// function; the second argument is the size in byte of the file determined
// by path.
func (s *gcsStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
// TODO, implement this if needed
panic("Unsupported Operation")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error
// The first argument is the file path that can be used in `Open`
// function; the second argument is the size in byte of the file determined
// by path.
func (l *LocalStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
func (l *LocalStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error {
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (*noopStorage) Open(ctx context.Context, path string) (ReadSeekCloser, erro
}

// WalkDir traverse all the files in a dir.
func (*noopStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
func (*noopStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
return nil
}

Expand Down
16 changes: 11 additions & 5 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"net/url"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand Down Expand Up @@ -363,13 +364,14 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error)
// The first argument is the file path that can be used in `Open`
// function; the second argument is the size in byte of the file determined
// by path.
func (rs *S3Storage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
var marker *string
prefix := rs.options.Prefix + dir
prefix := rs.options.Prefix + opt.SubDir
maxKeys := int64(1000)
if listCount > 0 {
maxKeys = listCount
if opt.ListCount > 0 {
maxKeys = opt.ListCount
}

req := &s3.ListObjectsInput{
Bucket: aws.String(rs.options.Bucket),
Prefix: aws.String(prefix),
Expand All @@ -382,7 +384,11 @@ func (rs *S3Storage) WalkDir(ctx context.Context, dir string, listCount int64, f
return err
}
for _, r := range res.Contents {
if err = fn(*r.Key, *r.Size); err != nil {
// when walk on specify directory, the result include storage.Prefix,
// which can not be reuse in other API(Open/Read) directly.
// so we use TrimPrefix to filter Prefix for next Open/Read.
path := strings.TrimPrefix(*r.Key, rs.options.Prefix)
if err = fn(path, *r.Size); err != nil {
return err
}
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"github.com/pingcap/kvproto/pkg/backup"
)

// WalkOption is the option of storage.WalkDir.
type WalkOption struct {
// walk on SubDir of specify directory
SubDir string
// number of list count, default 1000
ListCount int64
}

// ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods.
type ReadSeekCloser interface {
io.Reader
Expand Down Expand Up @@ -41,7 +49,7 @@ type ExternalStorage interface {
// The argument `path` is the file path that can be used in `Open`
// function; the argument `size` is the size in byte of the file determined
// by path.
WalkDir(ctx context.Context, dir string, listCount int64, fn func(path string, size int64) error) error
WalkDir(ctx context.Context, opt *WalkOption, fn func(path string, size int64) error) error

// CreateUploader create a uploader that will upload chunks data to storage.
// It's design for s3 multi-part upload currently. e.g. cdc log backup use this to do multi part upload
Expand Down