Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,23 @@ func (s *gcsStorage) FileExists(ctx context.Context, name string) (bool, error)
return true, nil
}

// Open a Reader by file name.
func (s *gcsStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) {
// TODO, implement this if needed
panic("Unsupported Operation")
}

// WalkDir traverse all the files in a dir.
//
// fn is the function called for each regular file visited by WalkDir.
// The first argument is the file path that can be used in `Open`
// function; the second argument is the size in byte of the file determined
// by path.
func (s *gcsStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
// TODO, implement this if needed
panic("Unsupported Operation")
}

func newGCSStorage(ctx context.Context, gcs *backup.GCS, sendCredential bool) (*gcsStorage, error) {
return newGCSStorageWithHTTPClient(ctx, gcs, nil, sendCredential)
}
Expand Down
47 changes: 40 additions & 7 deletions pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,60 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"

"github.com/pingcap/errors"
)

// localStorage represents local file system storage.
type localStorage struct {
// LocalStorage represents local file system storage.
//
// export for using in tests.
type LocalStorage struct {
base string
}

func (l *localStorage) Write(ctx context.Context, name string, data []byte) error {
func (l *LocalStorage) Write(ctx context.Context, name string, data []byte) error {
filepath := path.Join(l.base, name)
return ioutil.WriteFile(filepath, data, 0644) // nolint:gosec
// the backupmeta file _is_ intended to be world-readable.
}

func (l *localStorage) Read(ctx context.Context, name string) ([]byte, error) {
func (l *LocalStorage) Read(ctx context.Context, name string) ([]byte, error) {
filepath := path.Join(l.base, name)
return ioutil.ReadFile(filepath)
}

// FileExists implement ExternalStorage.FileExists.
func (l *localStorage) FileExists(ctx context.Context, name string) (bool, error) {
func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error) {
filepath := path.Join(l.base, name)
return pathExists(filepath)
}

// WalkDir traverse all the files in a dir.
//
// fn is the function called for each regular file visited by WalkDir.
// The first argument is the file path that can be used in `Open`
// function; the second argument is the size in byte of the file determined
// by path.
func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error {
if err != nil {
return errors.Trace(err)
}

if f == nil || f.IsDir() {
return nil
}

return fn(f.Name(), f.Size())
})
}

// Open a Reader by file name.
func (l *LocalStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) {
return os.Open(path.Join(l.base, name))
}

func pathExists(_path string) (bool, error) {
_, err := os.Stat(_path)
if err != nil {
Expand All @@ -42,7 +72,10 @@ func pathExists(_path string) (bool, error) {
return true, nil
}

func newLocalStorage(base string) (*localStorage, error) {
// NewLocalStorage return a LocalStorage at directory `base`.
//
// export for test.
func NewLocalStorage(base string) (*LocalStorage, error) {
ok, err := pathExists(base)
if err != nil {
return nil, err
Expand All @@ -53,5 +86,5 @@ func newLocalStorage(base string) (*localStorage, error) {
return nil, err
}
}
return &localStorage{base: base}, nil
return &LocalStorage{base: base}, nil
}
28 changes: 27 additions & 1 deletion pkg/storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

package storage

import "context"
import (
"context"
)

type noopStorage struct{}

Expand All @@ -21,6 +23,30 @@ func (*noopStorage) FileExists(ctx context.Context, name string) (bool, error) {
return false, nil
}

// Open a Reader by file name.
func (*noopStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) {
return noopReader{}, nil
}

// WalkDir traverse all the files in a dir.
func (*noopStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
return nil
}

func newNoopStorage() *noopStorage {
return &noopStorage{}
}

type noopReader struct{}

func (noopReader) Read(p []byte) (n int, err error) {
return 0, nil
}

func (noopReader) Close() error {
return nil
}

func (noopReader) Seek(offset int64, whence int) (int64, error) {
return 0, nil
}
148 changes: 148 additions & 0 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package storage
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/url"

Expand All @@ -31,13 +33,17 @@ const (
notFound = "NotFound"
// number of retries to make of operations
maxRetries = 3

// the maximum number of byte to read for seek
maxSkipOffsetByRead = 1 << 16 //64KB
)

// s3Handlers make it easy to inject test functions.
type s3Handlers interface {
HeadObjectWithContext(context.Context, *s3.HeadObjectInput, ...request.Option) (*s3.HeadObjectOutput, error)
GetObjectWithContext(context.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error)
PutObjectWithContext(context.Context, *s3.PutObjectInput, ...request.Option) (*s3.PutObjectOutput, error)
ListObjectsWithContext(context.Context, *s3.ListObjectsInput, ...request.Option) (*s3.ListObjectsOutput, error)
HeadBucketWithContext(context.Context, *s3.HeadBucketInput, ...request.Option) (*s3.HeadBucketOutput, error)
WaitUntilObjectExistsWithContext(context.Context, *s3.HeadObjectInput, ...request.WaiterOption) error
}
Expand Down Expand Up @@ -106,6 +112,7 @@ func (options *S3BackendOptions) apply(s3 *backup.S3) error {
return nil
}

// defineS3Flags defines the command line flags for S3BackendOptions.
func defineS3Flags(flags *pflag.FlagSet) {
// TODO: remove experimental tag if it's stable
flags.String(s3EndpointOption, "",
Expand All @@ -119,6 +126,7 @@ func defineS3Flags(flags *pflag.FlagSet) {
flags.String(s3ProviderOption, "", "(experimental) Set the S3 provider, e.g. aws, alibaba, ceph")
}

// parseFromFlags parse S3BackendOptions from command line flags.
func (options *S3BackendOptions) parseFromFlags(flags *pflag.FlagSet) error {
var err error
options.Endpoint, err = flags.GetString(s3EndpointOption)
Expand Down Expand Up @@ -292,3 +300,143 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error)

return true, err
}

// WalkDir traverse all the files in a dir.
//
// fn is the function called for each regular file visited by WalkDir.
// The first argument is the file path that can be used in `Open`
// function; the second argument is the size in byte of the file determined
// by path.
func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
var marker *string
maxKeys := int64(1000)
req := &s3.ListObjectsInput{
Bucket: &rs.options.Bucket,
Prefix: &rs.options.Prefix,
MaxKeys: &maxKeys,
}
for {
req.Marker = marker
res, err := rs.svc.ListObjectsWithContext(ctx, req)
if err != nil {
return err
}
for _, r := range res.Contents {
if err = fn(*r.Key, *r.Size); err != nil {
return err
}
}
if res.IsTruncated != nil && *res.IsTruncated {
marker = res.Marker
} else {
break
}
}

return nil
}

// Open a Reader by file name.
func (rs *S3Storage) Open(ctx context.Context, name string) (ReadSeekCloser, error) {
reader, err := rs.open(ctx, name, 0, 0)
if err != nil {
return nil, err
}
return &s3ObjectReader{
storage: rs,
name: name,
reader: reader,
}, nil
}

func (rs *S3Storage) open(ctx context.Context, name string, startOffset int64, endOffset int64) (io.ReadCloser, error) {
input := &s3.GetObjectInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + name),
}

var rangeOffset *string
if startOffset > 0 {
if endOffset > startOffset {
rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset))
} else {
rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset))
}
input.Range = rangeOffset
}

result, err := rs.svc.GetObjectWithContext(ctx, input)
if err != nil {
return nil, err
}

// FIXME: we test in minio, when request with Range, the result.AcceptRanges is a bare string 'range',
// not sure whether this is a feature or bug
//if rangeOffset != nil && (result.AcceptRanges == nil || *result.AcceptRanges != *rangeOffset) {
// return nil, errors.Errorf("open file '%s' failed, expected range: %s, got: %v",
// name, *rangeOffset, result.AcceptRanges)
//}

return result.Body, nil
}

// s3ObjectReader wrap GetObjectOutput.Body and add the `Seek` method.
type s3ObjectReader struct {
storage *S3Storage
name string
reader io.ReadCloser
pos int64
}

// Read implement the io.Reader interface.
func (r *s3ObjectReader) Read(p []byte) (n int, err error) {
n, err = r.reader.Read(p)
r.pos += int64(n)
return
}

// Close implement the io.Closer interface.
func (r *s3ObjectReader) Close() error {
return r.reader.Close()
}

// Seek implement the io.Seeker interface.
func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
var realOffset int64
switch whence {
case io.SeekStart:
realOffset = offset
case io.SeekCurrent:
realOffset = r.pos + offset
default:
// TODO: maybe we can fetch the object stat and calculate the absolute offset
return 0, errors.New("seek by SeekEnd is not supported yet")
}

if realOffset == r.pos {
return realOffset, nil
}

// if seek ahead no more than 64k, we discard these data
if realOffset > r.pos && offset-r.pos <= maxSkipOffsetByRead {
_, err := io.CopyN(ioutil.Discard, r, offset-r.pos)
if err != nil {
return r.pos, err
}
return realOffset, nil
}

// close current read and open a new one which target offset
err := r.reader.Close()
if err != nil {
return 0, err
}

newReader, err := r.storage.open(context.TODO(), r.name, realOffset, 0)
if err != nil {
return 0, err
}
r.reader = newReader
r.pos = realOffset
return realOffset, nil
}
21 changes: 21 additions & 0 deletions pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,27 @@ func (c *mockS3Handler) PutObjectWithContext(ctx context.Context,
input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error) {
return nil, c.err
}
func (c *mockS3Handler) ListObjectsWithContext(
context.Context,
*s3.ListObjectsInput,
...request.Option,
) (*s3.ListObjectsOutput, error) {
if c.err != nil {
return nil, c.err
}
truncated := false
key := "/HappyFace.jpg"
size := int64(13)
return &s3.ListObjectsOutput{
Contents: []*s3.Object{
{
Key: &key,
Size: &size,
},
},
IsTruncated: &truncated,
}, nil
}
func (c *mockS3Handler) HeadBucketWithContext(ctx context.Context,
input *s3.HeadBucketInput, opts ...request.Option) (*s3.HeadBucketOutput, error) {
return nil, c.err
Expand Down
Loading