diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go
index c9423a7c1..1239bb4c4 100644
--- a/pkg/storage/s3.go
+++ b/pkg/storage/s3.go
@@ -9,6 +9,8 @@ import (
 	"io"
 	"io/ioutil"
 	"net/url"
+	"regexp"
+	"strconv"
 	"strings"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -365,6 +367,9 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error)
 // function; the second argument is the size in byte of the file determined
 // by path.
 func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
+	if opt == nil {
+		opt = &WalkOption{}
+	}
 	var marker *string
 	prefix := rs.options.Prefix + opt.SubDir
 	maxKeys := int64(1000)
@@ -393,7 +398,7 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
 			}
 		}
 		if res.IsTruncated != nil && *res.IsTruncated {
-			marker = res.Marker
+			marker = res.NextMarker
 		} else {
 			break
 		}
@@ -404,59 +409,138 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
 
 // Open a Reader by file path.
 func (rs *S3Storage) Open(ctx context.Context, path string) (ReadSeekCloser, error) {
-	reader, err := rs.open(ctx, path, 0, 0)
+	reader, r, err := rs.open(ctx, path, 0, 0)
 	if err != nil {
 		return nil, err
 	}
 	return &s3ObjectReader{
-		storage: rs,
-		name:    path,
-		reader:  reader,
+		storage:   rs,
+		name:      path,
+		reader:    reader,
+		ctx:       ctx,
+		rangeInfo: r,
 	}, nil
 }
 
-func (rs *S3Storage) open(ctx context.Context, path string, startOffset int64, endOffset int64) (io.ReadCloser, error) {
+// rangeInfo is the parsed form of an S3 Content-Range response header,
+// i.e. "bytes <start>-<end>/<size>".
+type rangeInfo struct {
+	// start is the offset of the first byte in the returned range.
+	start int64
+	// end is the offset of the last byte in the returned range (inclusive).
+	end int64
+	// size is the total size of the object, in bytes.
+	size int64
+}
+
+// open sends a ranged GetObject request for path and returns the response
+// body together with the range reported in the Content-Range header.
+//
+// If endOffset > startOffset, the reader covers bytes in [startOffset, endOffset);
+// otherwise it covers everything from startOffset to the end of the object.
+// On failure the response body, if any, is closed before returning.
+func (rs *S3Storage) open(
+	ctx context.Context,
+	path string,
+	startOffset, endOffset int64,
+) (io.ReadCloser, rangeInfo, error) {
 	input := &s3.GetObjectInput{
 		Bucket: aws.String(rs.options.Bucket),
 		Key:    aws.String(rs.options.Prefix + path),
 	}
 
+	// always set rangeOffset to fetch file size info
+	// s3 endOffset is inclusive
 	var rangeOffset *string
-	if startOffset > 0 {
-		if endOffset > startOffset {
-			rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset))
-		} else {
-			rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset))
-		}
-		input.Range = rangeOffset
+	if endOffset > startOffset {
+		rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset-1))
+	} else {
+		rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset))
 	}
-
+	input.Range = rangeOffset
 	result, err := rs.svc.GetObjectWithContext(ctx, input)
 	if err != nil {
-		return nil, err
+		return nil, rangeInfo{}, err
 	}
 
-	// FIXME: we test in minio, when request with Range, the result.AcceptRanges is a bare string 'range',
-	// not sure whether this is a feature or bug
-	//if rangeOffset != nil && (result.AcceptRanges == nil || *result.AcceptRanges != *rangeOffset) {
-	//	return nil, errors.Errorf("open file '%s' failed, expected range: %s, got: %v",
-	//		name, *rangeOffset, result.AcceptRanges)
-	//}
+	r, err := parseRangeInfo(result.ContentRange)
+	if err != nil {
+		// The body is never handed to the caller on this path, so release it here.
+		_ = result.Body.Close()
+		return nil, rangeInfo{}, errors.Trace(err)
+	}
+
+	if startOffset != r.start || (endOffset != 0 && endOffset != r.end+1) {
+		_ = result.Body.Close()
+		// ContentRange is a *string: dereference it so the message shows the
+		// header value instead of a pointer address.
+		return nil, r, errors.Errorf("open file '%s' failed, expected range: %s, got: %v",
+			path, *rangeOffset, aws.StringValue(result.ContentRange))
+	}
+
+	return result.Body, r, nil
+}
+
+var (
+	// contentRangeRegex matches a whole Content-Range value such as "bytes 0-9/443".
+	contentRangeRegex = regexp.MustCompile(`^bytes (\d+)-(\d+)/(\d+)$`)
+)
 
-	return result.Body, nil
+// parseRangeInfo parses a Content-Range header value of the form
+// "bytes <start>-<end>/<size>" into a rangeInfo. It returns an error when
+// info is nil, empty, or does not match that form.
+func parseRangeInfo(info *string) (rangeInfo, error) {
+	if info == nil || len(*info) == 0 {
+		return rangeInfo{}, errors.New("ContentRange is empty")
+	}
+	subMatches := contentRangeRegex.FindStringSubmatch(*info)
+	if len(subMatches) != 4 {
+		return rangeInfo{}, errors.Errorf("invalid content range: '%s'", *info)
+	}
+
+	start, err := strconv.ParseInt(subMatches[1], 10, 64)
+	if err != nil {
+		return rangeInfo{}, errors.Annotatef(err,
+			"invalid start offset value '%s' in ContentRange '%s'", subMatches[1], *info)
+	}
+	end, err := strconv.ParseInt(subMatches[2], 10, 64)
+	if err != nil {
+		return rangeInfo{}, errors.Annotatef(err,
+			"invalid end offset value '%s' in ContentRange '%s'", subMatches[2], *info)
+	}
+	size, err := strconv.ParseInt(subMatches[3], 10, 64)
+	if err != nil {
+		return rangeInfo{}, errors.Annotatef(err,
+			"invalid size value '%s' in ContentRange '%s'", subMatches[3], *info)
+	}
+	return rangeInfo{start: start, end: end, size: size}, nil
 }
 
 // s3ObjectReader wrap GetObjectOutput.Body and add the `Seek` method.
 type s3ObjectReader struct {
-	storage *S3Storage
-	name    string
-	reader  io.ReadCloser
-	pos     int64
+	storage   *S3Storage
+	name      string
+	reader    io.ReadCloser
+	pos       int64
+	rangeInfo rangeInfo
+	// ctx is kept on the reader because Seek has no context parameter but may reopen the object.
+	// currently, lightning depends on package `xitongsys/parquet-go` to read parquet file and it needs `io.Seeker`
+	// See: https://github.com/xitongsys/parquet-go/blob/207a3cee75900b2b95213627409b7bac0f190bb3/source/source.go#L9-L10
+	ctx context.Context
 }
 
 // Read implement the io.Reader interface.
 func (r *s3ObjectReader) Read(p []byte) (n int, err error) {
-	n, err = r.reader.Read(p)
+	// Bytes left in the opened range; end is inclusive, hence the +1.
+	maxCnt := r.rangeInfo.end + 1 - r.pos
+	if maxCnt <= 0 {
+		// io.ReadFull on an empty slice yields (0, nil), so EOF must be reported explicitly.
+		return 0, io.EOF
+	}
+	if maxCnt > int64(len(p)) {
+		maxCnt = int64(len(p))
+	}
+	n, err = io.ReadFull(r.reader, p[:maxCnt])
 	r.pos += int64(n)
 	return
 }
@@ -467,6 +551,8 @@ func (r *s3ObjectReader) Close() error {
 }
 
 // Seek implement the io.Seeker interface.
+//
+// Currently, tidb-lightning depends on this method to read parquet file for s3 storage.
 func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
 	var realOffset int64
 	switch whence {
@@ -474,9 +560,23 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
 		realOffset = offset
 	case io.SeekCurrent:
 		realOffset = r.pos + offset
+	case io.SeekEnd:
+		realOffset = r.rangeInfo.size + offset
 	default:
-		// TODO: maybe we can fetch the object stat and calculate the absolute offset
-		return 0, errors.New("seek by SeekEnd is not supported yet")
+		return 0, errors.Errorf("Seek: invalid whence '%d'", whence)
 	}
+	if realOffset < 0 {
+		return 0, errors.Errorf("Seek in '%s': invalid offset to seek '%d'", r.name, realOffset)
+	}
+	if realOffset >= r.rangeInfo.size {
+		// S3 rejects a range that starts at or past the end of the object, so
+		// instead of reopening, park the reader at EOF (e.g. for Seek(0, io.SeekEnd)).
+		if err := r.reader.Close(); err != nil {
+			return 0, err
+		}
+		r.reader = ioutil.NopCloser(strings.NewReader(""))
+		r.pos = r.rangeInfo.size
+		return r.pos, nil
+	}
 
 	if realOffset == r.pos {
@@ -484,8 +584,8 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
 	}
 
 	// if seek ahead no more than 64k, we discard these data
-	if realOffset > r.pos && offset-r.pos <= maxSkipOffsetByRead {
-		_, err := io.CopyN(ioutil.Discard, r, offset-r.pos)
+	if realOffset > r.pos && realOffset-r.pos <= maxSkipOffsetByRead {
+		_, err := io.CopyN(ioutil.Discard, r, realOffset-r.pos)
 		if err != nil {
 			return r.pos, err
 		}
@@ -498,11 +598,12 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
 		return 0, err
 	}
 
-	newReader, err := r.storage.open(context.TODO(), r.name, realOffset, 0)
+	newReader, info, err := r.storage.open(r.ctx, r.name, realOffset, 0)
 	if err != nil {
 		return 0, err
 	}
 	r.reader = newReader
+	r.rangeInfo = info
 	r.pos = realOffset
 	return realOffset, nil
 }
diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go
index 37c1687e3..07715e311 100644
--- a/pkg/storage/s3_test.go
+++ b/pkg/storage/s3_test.go
@@ -452,6 +452,24 @@ func (r *testStorageSuite) TestS3Others(c *C) {
 	defineS3Flags(&pflag.FlagSet{})
 }
 
+func (r *testStorageSuite) TestS3Range(c *C) {
+	contentRange := "bytes 0-9/443"
+	ri, err := parseRangeInfo(&contentRange)
+	c.Assert(err, IsNil)
+	c.Assert(ri, Equals, rangeInfo{start: 0, end: 9, size: 443})
+
+	_, err = parseRangeInfo(nil)
+	c.Assert(err, ErrorMatches, "ContentRange is empty")
+
+	badRange := "bytes "
+	_, err = parseRangeInfo(&badRange)
+	c.Assert(err, ErrorMatches, "invalid content range: 'bytes '")
+
+	prefixedRange := "xbytes 0-9/443"
+	_, err = parseRangeInfo(&prefixedRange)
+	c.Assert(err, ErrorMatches, "invalid content range: 'xbytes 0-9/443'")
+}
+
 type mockS3Handler struct {
 	err error
 }