Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 96 additions & 32 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"io"
"io/ioutil"
"net/url"
"regexp"
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -365,6 +367,9 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error)
// function; the second argument is the size in byte of the file determined
// by path.
func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
if opt == nil {
opt = &WalkOption{}
}
var marker *string
prefix := rs.options.Prefix + opt.SubDir
maxKeys := int64(1000)
Expand Down Expand Up @@ -393,7 +398,7 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
}
}
if res.IsTruncated != nil && *res.IsTruncated {
marker = res.Marker
marker = res.NextMarker
} else {
break
}
Expand All @@ -404,59 +409,114 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin

// Open a Reader by file path.
func (rs *S3Storage) Open(ctx context.Context, path string) (ReadSeekCloser, error) {
reader, err := rs.open(ctx, path, 0, 0)
reader, r, err := rs.open(ctx, path, 0, 0)
if err != nil {
return nil, err
}
return &s3ObjectReader{
storage: rs,
name: path,
reader: reader,
storage: rs,
name: path,
reader: reader,
ctx: ctx,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Urgh, you're voiding the progress of #452. At this point it's better to change the method to be SeekContext(ctx context.Context, offset int64, whence int) error and abandon the Seeker interface.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this ctx is mainly using for read parquet files, because parquet reader need open multi reader for each column, if we change this interface, how can we met the parquet file read requirement here: https://github.com/xitongsys/parquet-go/blob/master/source/source.go#L9-L16

rangeInfo: r,
}, nil
}

func (rs *S3Storage) open(ctx context.Context, path string, startOffset int64, endOffset int64) (io.ReadCloser, error) {
type rangeInfo struct {
start int64
end int64
size int64
}

// if endOffset > startOffset, should return reader for bytes in [startOffset, endOffset).
func (rs *S3Storage) open(
ctx context.Context,
path string,
startOffset, endOffset int64,
) (io.ReadCloser, rangeInfo, error) {
input := &s3.GetObjectInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + path),
}

// always set rangeOffset to fetch file size info
// s3 endOffset is inclusive
var rangeOffset *string
if startOffset > 0 {
if endOffset > startOffset {
rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset))
} else {
rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset))
}
input.Range = rangeOffset
if endOffset > startOffset {
rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset-1))
} else {
rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset))
}

input.Range = rangeOffset
result, err := rs.svc.GetObjectWithContext(ctx, input)
if err != nil {
return nil, err
return nil, rangeInfo{}, err
}

// FIXME: we test in minio, when request with Range, the result.AcceptRanges is a bare string 'range',
// not sure whether this is a feature or bug
//if rangeOffset != nil && (result.AcceptRanges == nil || *result.AcceptRanges != *rangeOffset) {
// return nil, errors.Errorf("open file '%s' failed, expected range: %s, got: %v",
// name, *rangeOffset, result.AcceptRanges)
//}
r, err := parseRangeInfo(result.ContentRange)
if err != nil {
return nil, rangeInfo{}, errors.Trace(err)
}

if startOffset != r.start || (endOffset != 0 && endOffset != r.end+1) {
return nil, r, errors.Errorf("open file '%s' failed, expected range: %s, got: %v",
path, *rangeOffset, result.ContentRange)
}

return result.Body, r, nil
}

var (
contentRangeRegex = regexp.MustCompile(`bytes (\d+)-(\d+)/(\d+)$`)
)

return result.Body, nil
func parseRangeInfo(info *string) (rangeInfo, error) {
if info == nil || len(*info) == 0 {
return rangeInfo{}, errors.New("ContentRange is empty")
}
subMatches := contentRangeRegex.FindStringSubmatch(*info)
if len(subMatches) != 4 {
return rangeInfo{}, errors.Errorf("invalid content range: '%s'", *info)
}

start, err := strconv.ParseInt(subMatches[1], 10, 64)
if err != nil {
return rangeInfo{}, errors.Annotatef(err,
"invalid start offset value '%s' in ContentRange '%s'", subMatches[1], *info)
}
end, err := strconv.ParseInt(subMatches[2], 10, 64)
if err != nil {
return rangeInfo{}, errors.Annotatef(err,
"invalid end offset value '%s' in ContentRange '%s'", subMatches[2], *info)
}
size, err := strconv.ParseInt(subMatches[3], 10, 64)
if err != nil {
return rangeInfo{}, errors.Annotatef(err,
"invalid size size value '%s' in ContentRange '%s'", subMatches[3], *info)
}
return rangeInfo{start: start, end: end, size: size}, nil
}

// s3ObjectReader wrap GetObjectOutput.Body and add the `Seek` method.
type s3ObjectReader struct {
storage *S3Storage
name string
reader io.ReadCloser
pos int64
storage *S3Storage
name string
reader io.ReadCloser
pos int64
rangeInfo rangeInfo
// reader context used for implement `io.Seek`
// currently, lightning depends on package `xitongsys/parquet-go` to read parquet file and it needs `io.Seeker`
// See: https://github.com/xitongsys/parquet-go/blob/207a3cee75900b2b95213627409b7bac0f190bb3/source/source.go#L9-L10
ctx context.Context
}

// Read implement the io.Reader interface.
func (r *s3ObjectReader) Read(p []byte) (n int, err error) {
n, err = r.reader.Read(p)
maxCnt := r.rangeInfo.end + 1 - r.pos
if maxCnt > int64(len(p)) {
maxCnt = int64(len(p))
}
n, err = io.ReadFull(r.reader, p[:maxCnt])
r.pos += int64(n)
return
}
Expand All @@ -467,25 +527,28 @@ func (r *s3ObjectReader) Close() error {
}

// Seek implement the io.Seeker interface.
//
// Currently, tidb-lightning depends on this method to read parquet file for s3 storage.
func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
var realOffset int64
switch whence {
case io.SeekStart:
realOffset = offset
case io.SeekCurrent:
realOffset = r.pos + offset
case io.SeekEnd:
realOffset = r.rangeInfo.size + offset
Comment thread
3pointer marked this conversation as resolved.
default:
// TODO: maybe we can fetch the object stat and calculate the absolute offset
return 0, errors.New("seek by SeekEnd is not supported yet")
return 0, errors.Errorf("Seek: invalid whence '%d'", whence)
}

if realOffset == r.pos {
return realOffset, nil
}

// if seek ahead no more than 64k, we discard these data
if realOffset > r.pos && offset-r.pos <= maxSkipOffsetByRead {
_, err := io.CopyN(ioutil.Discard, r, offset-r.pos)
if realOffset > r.pos && realOffset-r.pos <= maxSkipOffsetByRead {
_, err := io.CopyN(ioutil.Discard, r, realOffset-r.pos)
if err != nil {
return r.pos, err
}
Expand All @@ -498,11 +561,12 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
return 0, err
}

newReader, err := r.storage.open(context.TODO(), r.name, realOffset, 0)
newReader, info, err := r.storage.open(r.ctx, r.name, realOffset, 0)
if err != nil {
return 0, err
}
r.reader = newReader
r.rangeInfo = info
r.pos = realOffset
return realOffset, nil
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,20 @@ func (r *testStorageSuite) TestS3Others(c *C) {
defineS3Flags(&pflag.FlagSet{})
}

func (r *testStorageSuite) TestS3Range(c *C) {
contentRange := "bytes 0-9/443"
ri, err := parseRangeInfo(&contentRange)
c.Assert(err, IsNil)
c.Assert(ri, Equals, rangeInfo{start: 0, end: 9, size: 443})

_, err = parseRangeInfo(nil)
c.Assert(err, ErrorMatches, "ContentRange is empty")

badRange := "bytes "
_, err = parseRangeInfo(&badRange)
c.Assert(err, ErrorMatches, "invalid content range: 'bytes '")
}

type mockS3Handler struct {
err error
}
Expand Down