diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index ccfaaf70b..c41fc2e09 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -123,6 +123,23 @@ func (s *gcsStorage) FileExists(ctx context.Context, name string) (bool, error) return true, nil } +// Open a Reader by file name. +func (s *gcsStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { + // TODO, implement this if needed + panic("Unsupported Operation") +} + +// WalkDir traverse all the files in a dir. +// +// fn is the function called for each regular file visited by WalkDir. +// The first argument is the file path that can be used in `Open` +// function; the second argument is the size in byte of the file determined +// by path. +func (s *gcsStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { + // TODO, implement this if needed + panic("Unsupported Operation") +} + func newGCSStorage(ctx context.Context, gcs *backup.GCS, sendCredential bool) (*gcsStorage, error) { return newGCSStorageWithHTTPClient(ctx, gcs, nil, sendCredential) } diff --git a/pkg/storage/local.go b/pkg/storage/local.go index e49fe16ee..06eae91f9 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -7,30 +7,60 @@ import ( "io/ioutil" "os" "path" + "path/filepath" + + "github.com/pingcap/errors" ) -// localStorage represents local file system storage. -type localStorage struct { +// LocalStorage represents local file system storage. +// +// export for using in tests. +type LocalStorage struct { base string } -func (l *localStorage) Write(ctx context.Context, name string, data []byte) error { +func (l *LocalStorage) Write(ctx context.Context, name string, data []byte) error { filepath := path.Join(l.base, name) return ioutil.WriteFile(filepath, data, 0644) // nolint:gosec // the backupmeta file _is_ intended to be world-readable. } -func (l *localStorage) Read(ctx context.Context, name string) ([]byte, error) { +func (l *LocalStorage) Read(ctx context.Context, name string) ([]byte, error) { filepath := path.Join(l.base, name) return ioutil.ReadFile(filepath) } // FileExists implement ExternalStorage.FileExists. -func (l *localStorage) FileExists(ctx context.Context, name string) (bool, error) { +func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error) { filepath := path.Join(l.base, name) return pathExists(filepath) } +// WalkDir traverse all the files in a dir. +// +// fn is the function called for each regular file visited by WalkDir. +// The first argument is the file path that can be used in `Open` +// function; the second argument is the size in byte of the file determined +// by path. +func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { + return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error { + if err != nil { + return errors.Trace(err) + } + + if f == nil || f.IsDir() { + return nil + } + + return fn(f.Name(), f.Size()) + }) +} + +// Open a Reader by file name. +func (l *LocalStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { + return os.Open(path.Join(l.base, name)) +} + func pathExists(_path string) (bool, error) { _, err := os.Stat(_path) if err != nil { @@ -42,7 +72,10 @@ func pathExists(_path string) (bool, error) { return true, nil } -func newLocalStorage(base string) (*localStorage, error) { +// NewLocalStorage return a LocalStorage at directory `base`. +// +// export for test. +func NewLocalStorage(base string) (*LocalStorage, error) { ok, err := pathExists(base) if err != nil { return nil, err @@ -53,5 +86,5 @@ func newLocalStorage(base string) (*localStorage, error) { return nil, err } } - return &localStorage{base: base}, nil + return &LocalStorage{base: base}, nil } diff --git a/pkg/storage/noop.go b/pkg/storage/noop.go index bfd79a7cf..83c7384ed 100644 --- a/pkg/storage/noop.go +++ b/pkg/storage/noop.go @@ -2,7 +2,9 @@ package storage -import "context" +import ( + "context" +) type noopStorage struct{} @@ -21,6 +23,30 @@ func (*noopStorage) FileExists(ctx context.Context, name string) (bool, error) { return false, nil } +// Open a Reader by file name. +func (*noopStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { + return noopReader{}, nil +} + +// WalkDir traverse all the files in a dir. +func (*noopStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { + return nil +} + func newNoopStorage() *noopStorage { return &noopStorage{} } + +type noopReader struct{} + +func (noopReader) Read(p []byte) (n int, err error) { + return 0, nil +} + +func (noopReader) Close() error { + return nil +} + +func (noopReader) Seek(offset int64, whence int) (int64, error) { + return 0, nil +} diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index d8d6e780f..429e8b8d8 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -5,6 +5,8 @@ package storage import ( "bytes" "context" + "fmt" + "io" "io/ioutil" "net/url" @@ -31,6 +33,9 @@ const ( notFound = "NotFound" // number of retries to make of operations maxRetries = 3 + + // the maximum number of byte to read for seek + maxSkipOffsetByRead = 1 << 16 //64KB ) // s3Handlers make it easy to inject test functions. @@ -38,6 +43,7 @@ type s3Handlers interface { HeadObjectWithContext(context.Context, *s3.HeadObjectInput, ...request.Option) (*s3.HeadObjectOutput, error) GetObjectWithContext(context.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) PutObjectWithContext(context.Context, *s3.PutObjectInput, ...request.Option) (*s3.PutObjectOutput, error) + ListObjectsWithContext(context.Context, *s3.ListObjectsInput, ...request.Option) (*s3.ListObjectsOutput, error) HeadBucketWithContext(context.Context, *s3.HeadBucketInput, ...request.Option) (*s3.HeadBucketOutput, error) WaitUntilObjectExistsWithContext(context.Context, *s3.HeadObjectInput, ...request.WaiterOption) error } @@ -106,6 +112,7 @@ func (options *S3BackendOptions) apply(s3 *backup.S3) error { return nil } +// defineS3Flags defines the command line flags for S3BackendOptions. func defineS3Flags(flags *pflag.FlagSet) { // TODO: remove experimental tag if it's stable flags.String(s3EndpointOption, "", @@ -119,6 +126,7 @@ func defineS3Flags(flags *pflag.FlagSet) { flags.String(s3ProviderOption, "", "(experimental) Set the S3 provider, e.g. aws, alibaba, ceph") } +// parseFromFlags parse S3BackendOptions from command line flags. func (options *S3BackendOptions) parseFromFlags(flags *pflag.FlagSet) error { var err error options.Endpoint, err = flags.GetString(s3EndpointOption) @@ -292,3 +300,143 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) return true, err } + +// WalkDir traverse all the files in a dir. +// +// fn is the function called for each regular file visited by WalkDir. +// The first argument is the file path that can be used in `Open` +// function; the second argument is the size in byte of the file determined +// by path. +func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) error { + var marker *string + maxKeys := int64(1000) + req := &s3.ListObjectsInput{ + Bucket: &rs.options.Bucket, + Prefix: &rs.options.Prefix, + MaxKeys: &maxKeys, + } + for { + req.Marker = marker + res, err := rs.svc.ListObjectsWithContext(ctx, req) + if err != nil { + return err + } + for _, r := range res.Contents { + if err = fn(*r.Key, *r.Size); err != nil { + return err + } + } + if res.IsTruncated != nil && *res.IsTruncated { + marker = res.Marker + } else { + break + } + } + + return nil +} + +// Open a Reader by file name. +func (rs *S3Storage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { + reader, err := rs.open(ctx, name, 0, 0) + if err != nil { + return nil, err + } + return &s3ObjectReader{ + storage: rs, + name: name, + reader: reader, + }, nil +} + +func (rs *S3Storage) open(ctx context.Context, name string, startOffset int64, endOffset int64) (io.ReadCloser, error) { + input := &s3.GetObjectInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + name), + } + + var rangeOffset *string + if startOffset > 0 { + if endOffset > startOffset { + rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset)) + } else { + rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset)) + } + input.Range = rangeOffset + } + + result, err := rs.svc.GetObjectWithContext(ctx, input) + if err != nil { + return nil, err + } + + // FIXME: we test in minio, when request with Range, the result.AcceptRanges is a bare string 'range', + // not sure whether this is a feature or bug + //if rangeOffset != nil && (result.AcceptRanges == nil || *result.AcceptRanges != *rangeOffset) { + // return nil, errors.Errorf("open file '%s' failed, expected range: %s, got: %v", + // name, *rangeOffset, result.AcceptRanges) + //} + + return result.Body, nil +} + +// s3ObjectReader wrap GetObjectOutput.Body and add the `Seek` method. +type s3ObjectReader struct { + storage *S3Storage + name string + reader io.ReadCloser + pos int64 +} + +// Read implement the io.Reader interface. +func (r *s3ObjectReader) Read(p []byte) (n int, err error) { + n, err = r.reader.Read(p) + r.pos += int64(n) + return +} + +// Close implement the io.Closer interface. +func (r *s3ObjectReader) Close() error { + return r.reader.Close() +} + +// Seek implement the io.Seeker interface. +func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { + var realOffset int64 + switch whence { + case io.SeekStart: + realOffset = offset + case io.SeekCurrent: + realOffset = r.pos + offset + default: + // TODO: maybe we can fetch the object stat and calculate the absolute offset + return 0, errors.New("seek by SeekEnd is not supported yet") + } + + if realOffset == r.pos { + return realOffset, nil + } + + // if seek ahead no more than 64k, we discard these data + if realOffset > r.pos && offset-r.pos <= maxSkipOffsetByRead { + _, err := io.CopyN(ioutil.Discard, r, offset-r.pos) + if err != nil { + return r.pos, err + } + return realOffset, nil + } + + // close current read and open a new one which target offset + err := r.reader.Close() + if err != nil { + return 0, err + } + + newReader, err := r.storage.open(context.TODO(), r.name, realOffset, 0) + if err != nil { + return 0, err + } + r.reader = newReader + r.pos = realOffset + return realOffset, nil +} diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 3e3d874fb..16f4350a8 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -473,6 +473,27 @@ func (c *mockS3Handler) PutObjectWithContext(ctx context.Context, input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error) { return nil, c.err } +func (c *mockS3Handler) ListObjectsWithContext( + context.Context, + *s3.ListObjectsInput, + ...request.Option, +) (*s3.ListObjectsOutput, error) { + if c.err != nil { + return nil, c.err + } + truncated := false + key := "/HappyFace.jpg" + size := int64(13) + return &s3.ListObjectsOutput{ + Contents: []*s3.Object{ + { + Key: &key, + Size: &size, + }, + }, + IsTruncated: &truncated, + }, nil +} func (c *mockS3Handler) HeadBucketWithContext(ctx context.Context, input *s3.HeadBucketInput, opts ...request.Option) (*s3.HeadBucketOutput, error) { return nil, c.err diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index f93acac55..4e6ad6ca1 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -4,11 +4,19 @@ package storage import ( "context" + "io" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" ) +// ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods. +type ReadSeekCloser interface { + io.Reader + io.Seeker + io.Closer +} + // ExternalStorage represents a kind of file system storage. type ExternalStorage interface { // Write file to storage @@ -17,13 +25,22 @@ type ExternalStorage interface { Read(ctx context.Context, name string) ([]byte, error) // FileExists return true if file exists FileExists(ctx context.Context, name string) (bool, error) + // Open a Reader by file name. + Open(ctx context.Context, name string) (ReadSeekCloser, error) + // WalkDir traverse all the files in a dir. + // + // fn is the function called for each regular file visited by WalkDir. + // The argument `path` is the file path that can be used in `Open` + // function; the argument `size` is the size in byte of the file determined + // by path. + WalkDir(ctx context.Context, fn func(path string, size int64) error) error } // Create creates ExternalStorage. func Create(ctx context.Context, backend *backup.StorageBackend, sendCreds bool) (ExternalStorage, error) { switch backend := backend.Backend.(type) { case *backup.StorageBackend_Local: - return newLocalStorage(backend.Local.Path) + return NewLocalStorage(backend.Local.Path) case *backup.StorageBackend_S3: if backend.S3 == nil { return nil, errors.New("s3 config not found") diff --git a/pkg/utils/math.go b/pkg/utils/math.go index 42615ff48..c0cdb8254 100644 --- a/pkg/utils/math.go +++ b/pkg/utils/math.go @@ -37,3 +37,14 @@ func ClampInt(n, min, max int) int { return MinInt(max, MaxInt(min, n)) } + +// MinInt64 choice smallest integer from its arguments. +func MinInt64(x int64, xs ...int64) int64 { + min := x + for _, n := range xs { + if n < min { + min = n + } + } + return min +} diff --git a/pkg/utils/math_test.go b/pkg/utils/math_test.go index 91c3300ad..27c9821bd 100644 --- a/pkg/utils/math_test.go +++ b/pkg/utils/math_test.go @@ -31,3 +31,10 @@ func (*testMathSuite) TestClampInt(c *C) { c.Assert(ClampInt(0, 1, 1), Equals, 1) c.Assert(ClampInt(100, 1, 1), Equals, 1) } + +func (*testMathSuite) TestMinInt64(c *C) { + c.Assert(MinInt(1, 2), Equals, 1) + c.Assert(MinInt(2, 1), Equals, 1) + c.Assert(MinInt(4, 2, 1, 3), Equals, 1) + c.Assert(MinInt(1, 1), Equals, 1) +}