From 05f41ae3e7175307df36f7846cc75d4a405a8f51 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 27 Aug 2020 15:38:47 +0800 Subject: [PATCH 01/18] support s3 seek end --- pkg/storage/s3.go | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index c9423a7c1..677e0e6e8 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -412,6 +412,7 @@ func (rs *S3Storage) Open(ctx context.Context, path string) (ReadSeekCloser, err storage: rs, name: path, reader: reader, + ctx: ctx, }, nil } @@ -452,6 +453,9 @@ type s3ObjectReader struct { name string reader io.ReadCloser pos int64 + size *int64 + // reader context used for seek + ctx context.Context } // Read implement the io.Reader interface. @@ -466,6 +470,24 @@ func (r *s3ObjectReader) Close() error { return r.reader.Close() } +func (r *s3ObjectReader) fileSize(ctx context.Context) (int64, error) { + if r.size == nil { + hio := &s3.HeadObjectInput{ + Bucket: aws.String(r.storage.options.Bucket), + Key: aws.String(r.storage.options.Prefix + r.name), + } + + hoo, err := r.storage.svc.HeadObjectWithContext(ctx, hio) + if err != nil { + return 0, errors.Trace(err) + } + + r.size = hoo.ContentLength + } + + return *r.size, nil +} + // Seek implement the io.Seeker interface. func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { var realOffset int64 @@ -474,9 +496,14 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { realOffset = offset case io.SeekCurrent: realOffset = r.pos + offset + case io.SeekEnd: + fileSize, err := r.fileSize(r.ctx) + if err != nil { + return 0, errors.Trace(err) + } + realOffset = fileSize + offset default: - // TODO: maybe we can fetch the object stat and calculate the absolute offset - return 0, errors.New("seek by SeekEnd is not supported yet") + return 0, errors.Errorf("Seek: invalid whence '%d'", whence) } if realOffset == r.pos { @@ -498,7 +525,7 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { return 0, err } - newReader, err := r.storage.open(context.TODO(), r.name, realOffset, 0) + newReader, err := r.storage.open(r.ctx, r.name, realOffset, 0) if err != nil { return 0, err } From 9c07281b5eb45111621389c8d8443cf92c47b5ea Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 27 Aug 2020 16:00:55 +0800 Subject: [PATCH 02/18] support nil for WalkOption --- pkg/storage/s3.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 677e0e6e8..bfbb05d0e 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -365,6 +365,9 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) // function; the second argument is the size in byte of the file determined // by path. func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error { + if opt == nil { + opt = &WalkOption{} + } var marker *string prefix := rs.options.Prefix + opt.SubDir maxKeys := int64(1000) From 6093b77cb9b7ccc7c7c1a612ef136a9d6730926e Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 27 Aug 2020 18:08:05 +0800 Subject: [PATCH 03/18] try parse range info from respose --- pkg/storage/s3.go | 90 +++++++++++++++++++++++++++++++----------- pkg/storage/s3_test.go | 15 +++++++ 2 files changed, 82 insertions(+), 23 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index bfbb05d0e..3b99049bf 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -9,6 +9,8 @@ import ( "io" "io/ioutil" "net/url" + "regexp" + "strconv" "strings" "github.com/aws/aws-sdk-go/aws" @@ -407,19 +409,26 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin // Open a Reader by file path. func (rs *S3Storage) Open(ctx context.Context, path string) (ReadSeekCloser, error) { - reader, err := rs.open(ctx, path, 0, 0) + reader, r, err := rs.open(ctx, path, 0, 0) if err != nil { return nil, err } return &s3ObjectReader{ - storage: rs, - name: path, - reader: reader, - ctx: ctx, + storage: rs, + name: path, + reader: reader, + ctx: ctx, + rangeInfo: r, }, nil } -func (rs *S3Storage) open(ctx context.Context, path string, startOffset int64, endOffset int64) (io.ReadCloser, error) { +type rangeInfo struct { + start int64 + end int64 + size int64 +} + +func (rs *S3Storage) open(ctx context.Context, path string, startOffset int64, endOffset int64) (io.ReadCloser, *rangeInfo, error) { input := &s3.GetObjectInput{ Bucket: aws.String(rs.options.Bucket), Key: aws.String(rs.options.Prefix + path), @@ -437,26 +446,60 @@ func (rs *S3Storage) open(ctx context.Context, path string, startOffset int64, e result, err := rs.svc.GetObjectWithContext(ctx, input) if err != nil { - return nil, err + return nil, nil, err } - // FIXME: we test in minio, when request with Range, the result.AcceptRanges is a bare string 'range', - // not sure whether this is a feature or bug - //if rangeOffset != nil && (result.AcceptRanges == nil || *result.AcceptRanges != *rangeOffset) { - // return nil, errors.Errorf("open file '%s' failed, expected range: %s, got: %v", - // name, *rangeOffset, result.AcceptRanges) - //} + var r *rangeInfo + if result.ContentRange != nil { + r, err = parseRangeInfo(result.ContentRange) + if err != nil { + return nil, nil, errors.Trace(err) + } + } - return result.Body, nil + if rangeOffset != nil && (r == nil || startOffset != r.start || (endOffset != 0 && endOffset != r.end)) { + return nil, nil, errors.Errorf("open file '%s' failed, expected range: %s, got: %v", + path, *rangeOffset, result.ContentRange) + } + + return result.Body, r, nil +} + +var ( + contentRangeRegex = regexp.MustCompile(`bytes (\d+)-(\d+)/(\d+)$`) +) + +func parseRangeInfo(info *string) (*rangeInfo, error) { + if info == nil || len(*info) == 0 { + return nil, nil + } + subMatches := contentRangeRegex.FindStringSubmatch(*info) + if len(subMatches) != 4 { + return nil, errors.Errorf("invalid content range: '%s'", *info) + } + + start, err := strconv.ParseInt(subMatches[1], 10, 64) + if err != nil { + return nil, errors.Annotatef(err, "invalid start offset value '%s' in ContentRange '%s'", subMatches[1], *info) + } + end, err := strconv.ParseInt(subMatches[2], 10, 64) + if err != nil { + return nil, errors.Annotatef(err, "invalid end offset value '%s' in ContentRange '%s'", subMatches[2], *info) + } + size, err := strconv.ParseInt(subMatches[3], 10, 64) + if err != nil { + return nil, errors.Annotatef(err, "invalid size size value '%s' in ContentRange '%s'", subMatches[3], *info) + } + return &rangeInfo{start: start, end: end, size: size}, nil } // s3ObjectReader wrap GetObjectOutput.Body and add the `Seek` method. type s3ObjectReader struct { - storage *S3Storage - name string - reader io.ReadCloser - pos int64 - size *int64 + storage *S3Storage + name string + reader io.ReadCloser + pos int64 + rangeInfo *rangeInfo // reader context used for seek ctx context.Context } @@ -474,7 +517,7 @@ func (r *s3ObjectReader) Close() error { } func (r *s3ObjectReader) fileSize(ctx context.Context) (int64, error) { - if r.size == nil { + if r.rangeInfo == nil { hio := &s3.HeadObjectInput{ Bucket: aws.String(r.storage.options.Bucket), Key: aws.String(r.storage.options.Prefix + r.name), @@ -485,10 +528,10 @@ func (r *s3ObjectReader) fileSize(ctx context.Context) (int64, error) { return 0, errors.Trace(err) } - r.size = hoo.ContentLength + r.rangeInfo = &rangeInfo{start: 0, end: *hoo.ContentLength, size: *hoo.ContentLength} } - return *r.size, nil + return r.rangeInfo.size, nil } // Seek implement the io.Seeker interface. @@ -528,11 +571,12 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { return 0, err } - newReader, err := r.storage.open(r.ctx, r.name, realOffset, 0) + newReader, rangeInfo, err := r.storage.open(r.ctx, r.name, realOffset, 0) if err != nil { return 0, err } r.reader = newReader + r.rangeInfo = rangeInfo r.pos = realOffset return realOffset, nil } diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 37c1687e3..f4070b601 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -452,6 +452,21 @@ func (r *testStorageSuite) TestS3Others(c *C) { defineS3Flags(&pflag.FlagSet{}) } +func (r *testStorageSuite) TestS3Range(c *C) { + contentRange := "bytes 0-9/443" + ri, err := parseRangeInfo(&contentRange) + c.Assert(err, IsNil) + c.Assert(*ri, Equals, rangeInfo{start: 0, end: 9, size: 443}) + + ri, err = parseRangeInfo(nil) + c.Assert(err, IsNil) + c.Assert(ri, IsNil) + + badRange := "bytes " + ri, err = parseRangeInfo(&badRange) + c.Assert(err, ErrorMatches, "invalid content range: 'bytes '") +} + type mockS3Handler struct { err error } From 30254e9fca4db336a3f8e196ec5e585f7ac8b57f Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 27 Aug 2020 18:25:04 +0800 Subject: [PATCH 04/18] fix test --- pkg/storage/s3_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index f4070b601..0e63a4e9c 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -463,7 +463,7 @@ func (r *testStorageSuite) TestS3Range(c *C) { c.Assert(ri, IsNil) badRange := "bytes " - ri, err = parseRangeInfo(&badRange) + _, err = parseRangeInfo(&badRange) c.Assert(err, ErrorMatches, "invalid content range: 'bytes '") } From 223cb1894f31bd88e447ef2bc230645a58654e28 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 27 Aug 2020 19:57:02 +0800 Subject: [PATCH 05/18] fix lint --- pkg/storage/s3.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 3b99049bf..317612a4b 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -428,7 +428,11 @@ type rangeInfo struct { size int64 } -func (rs *S3Storage) open(ctx context.Context, path string, startOffset int64, endOffset int64) (io.ReadCloser, *rangeInfo, error) { +func (rs *S3Storage) open( + ctx context.Context, + path string, + startOffset, endOffset int64, +) (io.ReadCloser, *rangeInfo, error) { input := &s3.GetObjectInput{ Bucket: aws.String(rs.options.Bucket), Key: aws.String(rs.options.Prefix + path), From b16b258c3e64ae2e2e91f644e43987dfa5bcb1eb Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 27 Aug 2020 20:57:48 +0800 Subject: [PATCH 06/18] fix a bug in s3 WalkDir --- pkg/storage/s3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 317612a4b..3dde849c7 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -398,7 +398,7 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin } } if res.IsTruncated != nil && *res.IsTruncated { - marker = res.Marker + marker = res.NextMarker } else { break } From daabdc060cab542758a308a0deddd71a8c96f8f9 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 28 Aug 2020 10:59:49 +0800 Subject: [PATCH 07/18] update --- pkg/storage/s3.go | 73 ++++++++++++++---------------------------- pkg/storage/s3_test.go | 5 ++- 2 files changed, 26 insertions(+), 52 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 3dde849c7..94816eaea 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -428,41 +428,38 @@ type rangeInfo struct { size int64 } +// if endOffset > startOffset, should return reader for bytes in [startOffset, endOffset) func (rs *S3Storage) open( ctx context.Context, path string, startOffset, endOffset int64, -) (io.ReadCloser, *rangeInfo, error) { +) (io.ReadCloser, rangeInfo, error) { input := &s3.GetObjectInput{ Bucket: aws.String(rs.options.Bucket), Key: aws.String(rs.options.Prefix + path), } + // always set rangeOffset to fetch file size info + // s3 endOffset is inclusive var rangeOffset *string - if startOffset > 0 { - if endOffset > startOffset { - rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset)) - } else { - rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset)) - } - input.Range = rangeOffset + if endOffset > startOffset { + rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset-1)) + } else { + rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset)) } - + input.Range = rangeOffset result, err := rs.svc.GetObjectWithContext(ctx, input) if err != nil { - return nil, nil, err + return nil, rangeInfo{}, err } - var r *rangeInfo - if result.ContentRange != nil { - r, err = parseRangeInfo(result.ContentRange) - if err != nil { - return nil, nil, errors.Trace(err) - } + r, err := parseRangeInfo(result.ContentRange) + if err != nil { + return nil, rangeInfo{}, errors.Trace(err) } - if rangeOffset != nil && (r == nil || startOffset != r.start || (endOffset != 0 && endOffset != r.end)) { - return nil, nil, errors.Errorf("open file '%s' failed, expected range: %s, got: %v", + if startOffset != r.start || (endOffset != 0 && endOffset != r.end) { + return nil, r, errors.Errorf("open file '%s' failed, expected range: %s, got: %v", path, *rangeOffset, result.ContentRange) } @@ -473,28 +470,28 @@ var ( contentRangeRegex = regexp.MustCompile(`bytes (\d+)-(\d+)/(\d+)$`) ) -func parseRangeInfo(info *string) (*rangeInfo, error) { +func parseRangeInfo(info *string) (rangeInfo, error) { if info == nil || len(*info) == 0 { - return nil, nil + return rangeInfo{}, errors.New("ContentRange is empty") } subMatches := contentRangeRegex.FindStringSubmatch(*info) if len(subMatches) != 4 { - return nil, errors.Errorf("invalid content range: '%s'", *info) + return rangeInfo{}, errors.Errorf("invalid content range: '%s'", *info) } start, err := strconv.ParseInt(subMatches[1], 10, 64) if err != nil { - return nil, errors.Annotatef(err, "invalid start offset value '%s' in ContentRange '%s'", subMatches[1], *info) + return rangeInfo{}, errors.Annotatef(err, "invalid start offset value '%s' in ContentRange '%s'", subMatches[1], *info) } end, err := strconv.ParseInt(subMatches[2], 10, 64) if err != nil { - return nil, errors.Annotatef(err, "invalid end offset value '%s' in ContentRange '%s'", subMatches[2], *info) + return rangeInfo{}, errors.Annotatef(err, "invalid end offset value '%s' in ContentRange '%s'", subMatches[2], *info) } size, err := strconv.ParseInt(subMatches[3], 10, 64) if err != nil { - return nil, errors.Annotatef(err, "invalid size size value '%s' in ContentRange '%s'", subMatches[3], *info) + return rangeInfo{}, errors.Annotatef(err, "invalid size size value '%s' in ContentRange '%s'", subMatches[3], *info) } - return &rangeInfo{start: start, end: end, size: size}, nil + return rangeInfo{start: start, end: end, size: size}, nil } // s3ObjectReader wrap GetObjectOutput.Body and add the `Seek` method. @@ -503,7 +500,7 @@ type s3ObjectReader struct { name string reader io.ReadCloser pos int64 - rangeInfo *rangeInfo + rangeInfo rangeInfo // reader context used for seek ctx context.Context } @@ -520,24 +517,6 @@ func (r *s3ObjectReader) Close() error { return r.reader.Close() } -func (r *s3ObjectReader) fileSize(ctx context.Context) (int64, error) { - if r.rangeInfo == nil { - hio := &s3.HeadObjectInput{ - Bucket: aws.String(r.storage.options.Bucket), - Key: aws.String(r.storage.options.Prefix + r.name), - } - - hoo, err := r.storage.svc.HeadObjectWithContext(ctx, hio) - if err != nil { - return 0, errors.Trace(err) - } - - r.rangeInfo = &rangeInfo{start: 0, end: *hoo.ContentLength, size: *hoo.ContentLength} - } - - return r.rangeInfo.size, nil -} - // Seek implement the io.Seeker interface. func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { var realOffset int64 @@ -547,11 +526,7 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { case io.SeekCurrent: realOffset = r.pos + offset case io.SeekEnd: - fileSize, err := r.fileSize(r.ctx) - if err != nil { - return 0, errors.Trace(err) - } - realOffset = fileSize + offset + realOffset = r.rangeInfo.size + offset default: return 0, errors.Errorf("Seek: invalid whence '%d'", whence) } diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 0e63a4e9c..6136a0978 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -456,11 +456,10 @@ func (r *testStorageSuite) TestS3Range(c *C) { contentRange := "bytes 0-9/443" ri, err := parseRangeInfo(&contentRange) c.Assert(err, IsNil) - c.Assert(*ri, Equals, rangeInfo{start: 0, end: 9, size: 443}) + c.Assert(ri, Equals, rangeInfo{start: 0, end: 9, size: 443}) ri, err = parseRangeInfo(nil) - c.Assert(err, IsNil) - c.Assert(ri, IsNil) + c.Assert(err, ErrorMatches, "ContentRange is empty") badRange := "bytes " _, err = parseRangeInfo(&badRange) From f34febe95c529d642af2437f070dc8956660e7c7 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 28 Aug 2020 11:05:23 +0800 Subject: [PATCH 08/18] update --- pkg/storage/s3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 94816eaea..0d059924f 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -458,7 +458,7 @@ func (rs *S3Storage) open( return nil, rangeInfo{}, errors.Trace(err) } - if startOffset != r.start || (endOffset != 0 && endOffset != r.end) { + if startOffset != r.start || (endOffset != 0 && endOffset != r.end+1) { return nil, r, errors.Errorf("open file '%s' failed, expected range: %s, got: %v", path, *rangeOffset, result.ContentRange) } From 7a333546919e59aa046b8f6b89b2579f87fec362 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 28 Aug 2020 11:57:12 +0800 Subject: [PATCH 09/18] fix --- pkg/storage/s3.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 0d059924f..e668c0277 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -536,8 +536,8 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { } // if seek ahead no more than 64k, we discard these data - if realOffset > r.pos && offset-r.pos <= maxSkipOffsetByRead { - _, err := io.CopyN(ioutil.Discard, r, offset-r.pos) + if realOffset > r.pos && realOffset-r.pos <= maxSkipOffsetByRead { + _, err := io.CopyN(ioutil.Discard, r, realOffset-r.pos) if err != nil { return r.pos, err } From 7581dacb6490008b35f9ea2a01bf79ffc8bad203 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 28 Aug 2020 14:42:01 +0800 Subject: [PATCH 10/18] loop read s3 reader --- pkg/storage/s3.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index e668c0277..9b010e435 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -507,9 +507,28 @@ type s3ObjectReader struct { // Read implement the io.Reader interface. func (r *s3ObjectReader) Read(p []byte) (n int, err error) { - n, err = r.reader.Read(p) - r.pos += int64(n) - return + maxCnt := r.rangeInfo.end + 1 - r.pos + if maxCnt > int64(len(p)) { + maxCnt = int64(len(p)) + } + var c int + // s3 api may not return enough data, so we need to loop fetch enough + for { + c, err = r.reader.Read(p[n:maxCnt]) + if err != nil { + // TODO: currently, if read to the end, s3 will return io.EOF + if err == io.EOF && r.pos+int64(c) == r.rangeInfo.end+1 { + err = nil + } else { + return + } + } + n += c + r.pos += int64(c) + if n >= int(maxCnt) { + return + } + } } // Close implement the io.Closer interface. From b44a9ad44b90dc02390b202bd720fe3eb7aaad9b Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 28 Aug 2020 14:52:13 +0800 Subject: [PATCH 11/18] fix lint --- pkg/storage/s3.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 9b010e435..8cbb7c391 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -569,12 +569,12 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { return 0, err } - newReader, rangeInfo, err := r.storage.open(r.ctx, r.name, realOffset, 0) + newReader, info, err := r.storage.open(r.ctx, r.name, realOffset, 0) if err != nil { return 0, err } r.reader = newReader - r.rangeInfo = rangeInfo + r.rangeInfo = info r.pos = realOffset return realOffset, nil } From c5bd76d9b59d38b6214ccb035fce7e88029c04bc Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 28 Aug 2020 15:29:05 +0800 Subject: [PATCH 12/18] fix more lint --- pkg/storage/s3.go | 2 +- pkg/storage/s3_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 8cbb7c391..60627a078 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -428,7 +428,7 @@ type rangeInfo struct { size int64 } -// if endOffset > startOffset, should return reader for bytes in [startOffset, endOffset) +// if endOffset > startOffset, should return reader for bytes in [startOffset, endOffset). func (rs *S3Storage) open( ctx context.Context, path string, diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 6136a0978..07715e311 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -458,7 +458,7 @@ func (r *testStorageSuite) TestS3Range(c *C) { c.Assert(err, IsNil) c.Assert(ri, Equals, rangeInfo{start: 0, end: 9, size: 443}) - ri, err = parseRangeInfo(nil) + _, err = parseRangeInfo(nil) c.Assert(err, ErrorMatches, "ContentRange is empty") badRange := "bytes " From 0fa3c96726be6d94580a5fcf7f3ddc8c3c4eafa6 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 31 Aug 2020 13:38:56 +0800 Subject: [PATCH 13/18] fmt --- pkg/storage/s3.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 60627a078..074eb739b 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -481,15 +481,18 @@ func parseRangeInfo(info *string) (rangeInfo, error) { start, err := strconv.ParseInt(subMatches[1], 10, 64) if err != nil { - return rangeInfo{}, errors.Annotatef(err, "invalid start offset value '%s' in ContentRange '%s'", subMatches[1], *info) + return rangeInfo{}, errors.Annotatef(err, + "invalid start offset value '%s' in ContentRange '%s'", subMatches[1], *info) } end, err := strconv.ParseInt(subMatches[2], 10, 64) if err != nil { - return rangeInfo{}, errors.Annotatef(err, "invalid end offset value '%s' in ContentRange '%s'", subMatches[2], *info) + return rangeInfo{}, errors.Annotatef(err, + "invalid end offset value '%s' in ContentRange '%s'", subMatches[2], *info) } size, err := strconv.ParseInt(subMatches[3], 10, 64) if err != nil { - return rangeInfo{}, errors.Annotatef(err, "invalid size size value '%s' in ContentRange '%s'", subMatches[3], *info) + return rangeInfo{}, errors.Annotatef(err, + "invalid size size value '%s' in ContentRange '%s'", subMatches[3], *info) } return rangeInfo{start: start, end: end, size: size}, nil } From 3554ada53d725a50fbd5bf4ddedbf354a331eab2 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 1 Sep 2020 10:12:33 +0800 Subject: [PATCH 14/18] use io.ReadFull instead of manual loop --- pkg/storage/s3.go | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 074eb739b..be7aa28cf 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -514,24 +514,7 @@ func (r *s3ObjectReader) Read(p []byte) (n int, err error) { if maxCnt > int64(len(p)) { maxCnt = int64(len(p)) } - var c int - // s3 api may not return enough data, so we need to loop fetch enough - for { - c, err = r.reader.Read(p[n:maxCnt]) - if err != nil { - // TODO: currently, if read to the end, s3 will return io.EOF - if err == io.EOF && r.pos+int64(c) == r.rangeInfo.end+1 { - err = nil - } else { - return - } - } - n += c - r.pos += int64(c) - if n >= int(maxCnt) { - return - } - } + return io.ReadFull(r, p[:maxCnt]) } // Close implement the io.Closer interface. From f98de75bc108e917bf8bc9e58682a27161a4298c Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 1 Sep 2020 10:19:22 +0800 Subject: [PATCH 15/18] fix --- pkg/storage/s3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index be7aa28cf..ec91da359 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -514,7 +514,7 @@ func (r *s3ObjectReader) Read(p []byte) (n int, err error) { if maxCnt > int64(len(p)) { maxCnt = int64(len(p)) } - return io.ReadFull(r, p[:maxCnt]) + return io.ReadFull(r.reader, p[:maxCnt]) } // Close implement the io.Closer interface. From f00b7970af04adfabce7e55e3716455212b8ee7e Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 1 Sep 2020 11:05:56 +0800 Subject: [PATCH 16/18] fix --- pkg/storage/s3.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index ec91da359..c0432fc69 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -514,7 +514,9 @@ func (r *s3ObjectReader) Read(p []byte) (n int, err error) { if maxCnt > int64(len(p)) { maxCnt = int64(len(p)) } - return io.ReadFull(r.reader, p[:maxCnt]) + n, err = io.ReadFull(r.reader, p[:maxCnt]) + r.pos += int64(n) + return } // Close implement the io.Closer interface. From 52277ae8a8a35c7ad4b6debe3fe814561c7d62f2 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 3 Sep 2020 16:09:06 +0800 Subject: [PATCH 17/18] add a comment for the reason to put in struct --- pkg/storage/s3.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index c0432fc69..23d127de7 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -504,7 +504,7 @@ type s3ObjectReader struct { reader io.ReadCloser pos int64 rangeInfo rangeInfo - // reader context used for seek + // reader context used for implement `io.Seek` ctx context.Context } @@ -525,6 +525,8 @@ func (r *s3ObjectReader) Close() error { } // Seek implement the io.Seeker interface. +// +// Currently, tidb-lightning depends on this method to read parquet file for s3 storage. func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { var realOffset int64 switch whence { From e0082861f6414ee3d0a0f9a211fa38c3642656f0 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 3 Sep 2020 18:09:20 +0800 Subject: [PATCH 18/18] add a comment --- pkg/storage/s3.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 23d127de7..1239bb4c4 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -505,6 +505,8 @@ type s3ObjectReader struct { pos int64 rangeInfo rangeInfo // reader context used for implement `io.Seek` + // currently, lightning depends on package `xitongsys/parquet-go` to read parquet file and it needs `io.Seeker` + // See: https://github.com/xitongsys/parquet-go/blob/207a3cee75900b2b95213627409b7bac0f190bb3/source/source.go#L9-L10 ctx context.Context }