Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
17 changes: 17 additions & 0 deletions pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,23 @@ func (s *gcsStorage) FileExists(ctx context.Context, name string) (bool, error)
return true, nil
}

// Open a Reader by file name.
//
// Not implemented for GCS: the body unconditionally panics with
// "Unsupported Operation", so neither return value is ever produced.
func (s *gcsStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) {
// TODO, implement this if needed
panic("Unsupported Operation")
}

// WalkDir traverses all the files in a dir.
//
// fn is the function called for each regular file visited by WalkDir.
// The first argument is the file path that can be used in the `Open`
// function; the second argument is the size in bytes of the file determined
// by path.
//
// Not implemented for GCS: the body unconditionally panics with
// "Unsupported Operation" and fn is never invoked.
func (s *gcsStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
Comment thread
kennytm marked this conversation as resolved.
// TODO, implement this if needed
panic("Unsupported Operation")
}

// newGCSStorage builds a gcsStorage by delegating to
// newGCSStorageWithHTTPClient with a nil HTTP client argument.
func newGCSStorage(ctx context.Context, gcs *backup.GCS, sendCredential bool) (*gcsStorage, error) {
	store, err := newGCSStorageWithHTTPClient(ctx, gcs, nil, sendCredential)
	return store, err
}
Expand Down
47 changes: 40 additions & 7 deletions pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,60 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"

"github.com/pingcap/errors"
)

// localStorage represents local file system storage.
type localStorage struct {
// LocalStorage represents local file system storage.
//
// It is exported so that it can be used in tests.
type LocalStorage struct {
// base is the directory onto which every file name is joined.
base string
}

func (l *localStorage) Write(ctx context.Context, name string, data []byte) error {
// Write stores data in the file `name` under the storage base directory.
func (l *LocalStorage) Write(ctx context.Context, name string, data []byte) error {
	// the backupmeta file _is_ intended to be world-readable, hence 0644.
	target := path.Join(l.base, name)
	err := ioutil.WriteFile(target, data, 0644) // nolint:gosec
	return err
}

func (l *localStorage) Read(ctx context.Context, name string) ([]byte, error) {
// Read returns the whole content of the file `name` under the storage base
// directory.
func (l *LocalStorage) Read(ctx context.Context, name string) ([]byte, error) {
	content, err := ioutil.ReadFile(path.Join(l.base, name))
	return content, err
}

// FileExists implement ExternalStorage.FileExists.
func (l *localStorage) FileExists(ctx context.Context, name string) (bool, error) {
func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error) {
	// Resolve name against the base directory and delegate the check.
	target := path.Join(l.base, name)
	found, err := pathExists(target)
	return found, err
}

// WalkDir traverses all the files under the storage base directory.
//
// fn is the function called for each non-directory entry visited by WalkDir.
// The first argument is the file path relative to the base directory, so it
// can be passed to `Open`; the second argument is the size in bytes of that
// file. Walking stops at the first error, either from the traversal itself
// or returned by fn.
func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
	return filepath.Walk(l.base, func(fullPath string, f os.FileInfo, err error) error {
		if err != nil {
			return errors.Trace(err)
		}

		if f == nil || f.IsDir() {
			return nil
		}

		// f.Name() is only the last path element, which cannot be resolved
		// by Open for files in subdirectories; report the path relative to
		// the base directory instead.
		rel, err := filepath.Rel(l.base, fullPath)
		if err != nil {
			return errors.Trace(err)
		}

		return fn(rel, f.Size())
	})
}

// Open a Reader by file name.
//
// name is joined onto the storage base directory and opened read-only.
// On failure the returned reader is a literal nil.
func (l *LocalStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) {
	f, err := os.Open(path.Join(l.base, name))
	if err != nil {
		// Forwarding os.Open's results directly would wrap its nil *os.File
		// in a non-nil interface value; return an untyped nil instead.
		return nil, err
	}
	return f, nil
}

func pathExists(_path string) (bool, error) {
_, err := os.Stat(_path)
if err != nil {
Expand All @@ -42,7 +72,10 @@ func pathExists(_path string) (bool, error) {
return true, nil
}

func newLocalStorage(base string) (*localStorage, error) {
// NewLocalStorage return a LocalStorage at directory `base`.
//
// export for test.
func NewLocalStorage(base string) (*LocalStorage, error) {
Comment on lines +77 to +78
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the test?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used in lightning tests

ok, err := pathExists(base)
if err != nil {
return nil, err
Expand All @@ -53,5 +86,5 @@ func newLocalStorage(base string) (*localStorage, error) {
return nil, err
}
}
return &localStorage{base: base}, nil
return &LocalStorage{base: base}, nil
}
28 changes: 27 additions & 1 deletion pkg/storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

package storage

import "context"
import (
	"context"
	"io"
)

type noopStorage struct{}

Expand All @@ -21,6 +23,30 @@ func (*noopStorage) FileExists(ctx context.Context, name string) (bool, error) {
return false, nil
}

// Open a Reader by file name.
//
// The name is ignored; a noopReader is always returned with a nil error.
func (*noopStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) {
	var r noopReader
	return r, nil
}

// WalkDir traverses all the files in a dir.
//
// The no-op storage holds no files: fn is never called and nil is returned.
func (*noopStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
return nil
}

// newNoopStorage returns a pointer to a fresh, empty noopStorage.
func newNoopStorage() *noopStorage {
	return new(noopStorage)
}

// noopReader is an always-empty reader: it yields no data, and closing or
// seeking it does nothing.
type noopReader struct{}

// Read implements io.Reader. It never fills p and always reports io.EOF.
//
// Returning (0, nil) instead would make consumers that read until EOF
// (io.Copy, ReadAll, ...) spin forever.
func (noopReader) Read(p []byte) (n int, err error) {
	return 0, io.EOF
}

// Close implements io.Closer. It always succeeds.
func (noopReader) Close() error {
	return nil
}

// Seek implements io.Seeker. The stream is empty, so every seek ends at
// position 0 and never fails.
func (noopReader) Seek(offset int64, whence int) (int64, error) {
	return 0, nil
}
148 changes: 148 additions & 0 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package storage
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/url"

Expand All @@ -31,13 +33,17 @@ const (
notFound = "NotFound"
// number of retries to make of operations
maxRetries = 3

// the maximum number of byte to read for seek
maxSkipOffsetByRead = 1 << 16 //64KB
)

// s3Handlers is the subset of S3 client calls used by this package.
// Declaring it as an interface makes it easy to inject test functions
// in place of the real client.
type s3Handlers interface {
HeadObjectWithContext(context.Context, *s3.HeadObjectInput, ...request.Option) (*s3.HeadObjectOutput, error)
GetObjectWithContext(context.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error)
PutObjectWithContext(context.Context, *s3.PutObjectInput, ...request.Option) (*s3.PutObjectOutput, error)
// ListObjectsWithContext is the listing call used by WalkDir.
ListObjectsWithContext(context.Context, *s3.ListObjectsInput, ...request.Option) (*s3.ListObjectsOutput, error)
HeadBucketWithContext(context.Context, *s3.HeadBucketInput, ...request.Option) (*s3.HeadBucketOutput, error)
WaitUntilObjectExistsWithContext(context.Context, *s3.HeadObjectInput, ...request.WaiterOption) error
}
Expand Down Expand Up @@ -106,6 +112,7 @@ func (options *S3BackendOptions) apply(s3 *backup.S3) error {
return nil
}

// defineS3Flags defines the command line flags for S3BackendOptions.
func defineS3Flags(flags *pflag.FlagSet) {
// TODO: remove experimental tag if it's stable
flags.String(s3EndpointOption, "",
Expand All @@ -119,6 +126,7 @@ func defineS3Flags(flags *pflag.FlagSet) {
flags.String(s3ProviderOption, "", "(experimental) Set the S3 provider, e.g. aws, alibaba, ceph")
}

// parseFromFlags parse S3BackendOptions from command line flags.
func (options *S3BackendOptions) parseFromFlags(flags *pflag.FlagSet) error {
var err error
options.Endpoint, err = flags.GetString(s3EndpointOption)
Expand Down Expand Up @@ -292,3 +300,143 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error)

return true, err
}

// WalkDir traverses all the objects under the configured bucket and prefix.
//
// fn is the function called for each object returned by the listing.
// The first argument is the object key; the second argument is the size in
// bytes of that object. Listing is paginated (up to 1000 keys per request)
// and stops at the first error, either from S3 or returned by fn.
//
// NOTE(review): keys are handed to fn exactly as listed, while open()
// prepends rs.options.Prefix to the name it is given. With a non-empty
// prefix the key presumably needs the prefix stripped before being passed
// to `Open` — confirm against callers.
func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
	maxKeys := int64(1000)
	req := &s3.ListObjectsInput{
		Bucket:  &rs.options.Bucket,
		Prefix:  &rs.options.Prefix,
		MaxKeys: &maxKeys,
	}
	for {
		res, err := rs.svc.ListObjectsWithContext(ctx, req)
		if err != nil {
			return err
		}
		for _, r := range res.Contents {
			if err = fn(*r.Key, *r.Size); err != nil {
				return err
			}
		}
		if res.IsTruncated == nil || !*res.IsTruncated {
			return nil
		}

		// res.Marker only echoes the marker of this request, so reusing it
		// would fetch the same page forever. Continue from NextMarker, which
		// S3 returns only when a delimiter was requested, or else from the
		// last key of this page.
		switch {
		case res.NextMarker != nil:
			req.Marker = res.NextMarker
		case len(res.Contents) > 0:
			req.Marker = res.Contents[len(res.Contents)-1].Key
		default:
			return errors.New("truncated S3 listing returned no key to continue from")
		}
	}
}

// Open a Reader by file name.
//
// The object is requested from its beginning and its body is wrapped in an
// s3ObjectReader, which adds `Seek` on top of the plain stream.
func (rs *S3Storage) Open(ctx context.Context, name string) (ReadSeekCloser, error) {
	body, err := rs.open(ctx, name, 0, 0)
	if err != nil {
		return nil, err
	}
	objReader := &s3ObjectReader{storage: rs, name: name, reader: body}
	return objReader, nil
}

// open issues a GetObject request for the key rs.options.Prefix + name and
// returns the response body.
//
// A Range is attached only when startOffset > 0: "bytes=start-end" if
// endOffset > startOffset, otherwise the open-ended "bytes=start-". With
// startOffset <= 0 no Range is sent and endOffset is ignored.
func (rs *S3Storage) open(ctx context.Context, name string, startOffset int64, endOffset int64) (io.ReadCloser, error) {
input := &s3.GetObjectInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + name),
}

var rangeOffset *string
if startOffset > 0 {
if endOffset > startOffset {
rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset))
} else {
rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset))
}
input.Range = rangeOffset
}

result, err := rs.svc.GetObjectWithContext(ctx, input)
if err != nil {
return nil, err
}

// FIXME: we test in minio, when request with Range, the result.AcceptRanges is a bare string 'range',
// not sure whether this is a feature or bug
//if rangeOffset != nil && (result.AcceptRanges == nil || *result.AcceptRanges != *rangeOffset) {
// return nil, errors.Errorf("open file '%s' failed, expected range: %s, got: %v",
// name, *rangeOffset, result.AcceptRanges)
//}
Comment on lines +373 to +378
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we tested in AWS S3?


// The caller owns the body and is responsible for closing it.
return result.Body, nil
}

// s3ObjectReader wraps GetObjectOutput.Body and adds the `Seek` method.
type s3ObjectReader struct {
// storage is used to re-open the object when a seek cannot be served
// from the current stream.
storage *S3Storage
// name is the object name handed to storage.open on re-open.
name string
// reader is the body currently being read; Seek may replace it.
reader io.ReadCloser
// pos is the current offset, advanced by Read and set by Seek.
pos int64
}

// Read implements the io.Reader interface.
//
// It forwards to the wrapped body and advances the tracked position by the
// number of bytes actually read, including when an error is returned.
func (r *s3ObjectReader) Read(p []byte) (n int, err error) {
	read, readErr := r.reader.Read(p)
	r.pos += int64(read)
	return read, readErr
}

// Close implements the io.Closer interface by closing the body that is
// currently wrapped and returning its error.
func (r *s3ObjectReader) Close() error {
return r.reader.Close()
}

// Seek implements the io.Seeker interface.
//
// Only io.SeekStart and io.SeekCurrent are supported; any other whence
// yields an error. A target before the start of the object is rejected.
// A forward move of at most maxSkipOffsetByRead bytes is served by reading
// and discarding data from the current stream; every other move closes the
// current stream and opens a new one starting at the target offset.
//
// It returns the new absolute offset on success.
func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
	var realOffset int64
	switch whence {
	case io.SeekStart:
		realOffset = offset
	case io.SeekCurrent:
		realOffset = r.pos + offset
	default:
		// TODO: maybe we can fetch the object stat and calculate the absolute offset
		return 0, errors.New("seek by SeekEnd is not supported yet")
	}

	if realOffset < 0 {
		return 0, errors.New("seek to a negative position")
	}

	if realOffset == r.pos {
		return realOffset, nil
	}

	// If seeking ahead by no more than 64KB, discard the data in between.
	// The distance must be measured from the absolute target: `offset` is
	// relative to the current position when whence is io.SeekCurrent.
	if skip := realOffset - r.pos; skip > 0 && skip <= maxSkipOffsetByRead {
		// r.Read keeps r.pos up to date while the bytes are discarded.
		if _, err := io.CopyN(ioutil.Discard, r, skip); err != nil {
			return r.pos, err
		}
		return realOffset, nil
	}

	// Close the current reader and open a new one at the target offset.
	if err := r.reader.Close(); err != nil {
		return 0, err
	}

	newReader, err := r.storage.open(context.TODO(), r.name, realOffset, 0)
	if err != nil {
		return 0, err
	}
	r.reader = newReader
	r.pos = realOffset
	return realOffset, nil
}
21 changes: 21 additions & 0 deletions pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,27 @@ func (c *mockS3Handler) PutObjectWithContext(ctx context.Context,
input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error) {
return nil, c.err
}
// ListObjectsWithContext is the mock listing call: it returns the configured
// error if one is set, otherwise a single, non-truncated page containing one
// fixed object.
func (c *mockS3Handler) ListObjectsWithContext(
	context.Context,
	*s3.ListObjectsInput,
	...request.Option,
) (*s3.ListObjectsOutput, error) {
	if c.err != nil {
		return nil, c.err
	}

	var (
		objKey    = "/HappyFace.jpg"
		objSize   = int64(13)
		truncated = false
	)
	obj := &s3.Object{Key: &objKey, Size: &objSize}
	out := &s3.ListObjectsOutput{
		Contents:    []*s3.Object{obj},
		IsTruncated: &truncated,
	}
	return out, nil
}
func (c *mockS3Handler) HeadBucketWithContext(ctx context.Context,
input *s3.HeadBucketInput, opts ...request.Option) (*s3.HeadBucketOutput, error) {
return nil, c.err
Expand Down
Loading