From b3c16209b821913ff85d5374023e497664cbc1ad Mon Sep 17 00:00:00 2001 From: luancheng Date: Tue, 11 Aug 2020 17:37:28 +0800 Subject: [PATCH 1/9] add s3 api to adapt cdc log --- pkg/storage/parse.go | 2 +- pkg/storage/s3.go | 78 ++++++++++++++++++++++++++++++++++++++++-- pkg/storage/s3_test.go | 16 +++++++++ pkg/storage/storage.go | 2 +- 4 files changed, 93 insertions(+), 5 deletions(-) diff --git a/pkg/storage/parse.go b/pkg/storage/parse.go index fff518bfb..8e4f868e1 100644 --- a/pkg/storage/parse.go +++ b/pkg/storage/parse.go @@ -52,7 +52,7 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backup.StorageBacken options = &BackendOptions{} } ExtractQueryParameters(u, &options.S3) - if err := options.S3.apply(s3); err != nil { + if err := options.S3.Apply(s3); err != nil { return nil, err } return &backup.StorageBackend{Backend: &backup.StorageBackend_S3{S3: s3}}, nil diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 429e8b8d8..35de56c6a 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -46,6 +46,17 @@ type s3Handlers interface { ListObjectsWithContext(context.Context, *s3.ListObjectsInput, ...request.Option) (*s3.ListObjectsOutput, error) HeadBucketWithContext(context.Context, *s3.HeadBucketInput, ...request.Option) (*s3.HeadBucketOutput, error) WaitUntilObjectExistsWithContext(context.Context, *s3.HeadObjectInput, ...request.WaiterOption) error + + ListObjectsV2WithContext(context.Context, *s3.ListObjectsV2Input, ...request.Option) (*s3.ListObjectsV2Output, error) + CreateMultipartUploadWithContext( + context.Context, + *s3.CreateMultipartUploadInput, + ...request.Option) (*s3.CreateMultipartUploadOutput, error) + CompleteMultipartUploadWithContext( + context.Context, + *s3.CompleteMultipartUploadInput, + ...request.Option) (*s3.CompleteMultipartUploadOutput, error) + UploadPartWithContext(context.Context, *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) } // S3Storage info for s3 storage. @@ -70,7 +81,7 @@ type S3BackendOptions struct { UseAccelerateEndpoint bool `json:"use-accelerate-endpoint" toml:"use-accelerate-endpoint"` } -func (options *S3BackendOptions) apply(s3 *backup.S3) error { +func (options *S3BackendOptions) Apply(s3 *backup.S3) error { if options.Region == "" { options.Region = "us-east-1" } @@ -161,8 +172,8 @@ func (options *S3BackendOptions) parseFromFlags(flags *pflag.FlagSet) error { return nil } -// newS3Storage initialize a new s3 storage for metadata. -func newS3Storage( // revive:disable-line:flag-parameter +// NewS3Storage initialize a new s3 storage for metadata. +func NewS3Storage( // revive:disable-line:flag-parameter backend *backup.S3, sendCredential bool, ) (*S3Storage, error) { @@ -440,3 +451,64 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { r.pos = realOffset return realOffset, nil } + +// ListObject list `maxkeys` objects with specify path. +func (rs *S3Storage) ListObject(ctx context.Context, name string, maxKeys int64) (*s3.ListObjectsV2Output, error) { + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(rs.options.Bucket), + Prefix: aws.String(rs.options.Prefix + name), + MaxKeys: aws.Int64(maxKeys), + } + return rs.svc.ListObjectsV2WithContext(ctx, input) +} + +// CreateMultipartUpload create multi upload request. +func (rs *S3Storage) CreateMultipartUpload(ctx context.Context, name string) (*s3.CreateMultipartUploadOutput, error) { + input := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + name), + } + return rs.svc.CreateMultipartUploadWithContext(ctx, input) +} + +// CompleteMultipartUpload complete multi upload request. +func (rs *S3Storage) CompleteMultipartUpload( + ctx context.Context, + resp *s3.CreateMultipartUploadOutput, + completedParts []*s3.CompletedPart) (*s3.CompleteMultipartUploadOutput, error) { + completeInput := &s3.CompleteMultipartUploadInput{ + Bucket: resp.Bucket, + Key: resp.Key, + UploadId: resp.UploadId, + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: completedParts, + }, + } + return rs.svc.CompleteMultipartUploadWithContext(ctx, completeInput) +} + +// UploadPart update partial data to s3, we should call CreateMultipartUpload to start it, +// and call CompleteMultipartUpload to finish it. +func (rs *S3Storage) UploadPart( + ctx context.Context, + resp *s3.CreateMultipartUploadOutput, + fileBytes []byte, + partNumber int) (*s3.CompletedPart, error) { + partInput := &s3.UploadPartInput{ + Body: bytes.NewReader(fileBytes), + Bucket: resp.Bucket, + Key: resp.Key, + PartNumber: aws.Int64(int64(partNumber)), + UploadId: resp.UploadId, + ContentLength: aws.Int64(int64(len(fileBytes))), + } + + uploadResult, err := rs.svc.UploadPartWithContext(ctx, partInput) + if err != nil { + return nil, err + } + return &s3.CompletedPart{ + ETag: uploadResult.ETag, + PartNumber: aws.Int64(int64(partNumber)), + }, nil +} diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 16f4350a8..37c1687e3 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -502,3 +502,19 @@ func (c *mockS3Handler) WaitUntilObjectExistsWithContext(ctx context.Context, input *s3.HeadObjectInput, opts ...request.WaiterOption) error { return c.err } +func (c *mockS3Handler) ListObjectsV2WithContext(context.Context, + *s3.ListObjectsV2Input, ...request.Option) (*s3.ListObjectsV2Output, error) { + return nil, c.err +} +func (c *mockS3Handler) CreateMultipartUploadWithContext(context.Context, + *s3.CreateMultipartUploadInput, ...request.Option) (*s3.CreateMultipartUploadOutput, error) { + return nil, c.err +} +func (c *mockS3Handler) CompleteMultipartUploadWithContext(context.Context, + *s3.CompleteMultipartUploadInput, ...request.Option) (*s3.CompleteMultipartUploadOutput, error) { + return nil, c.err +} +func (c *mockS3Handler) UploadPartWithContext(context.Context, + *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) { + return nil, c.err +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 4e6ad6ca1..d74b480cc 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -45,7 +45,7 @@ func Create(ctx context.Context, backend *backup.StorageBackend, sendCreds bool) if backend.S3 == nil { return nil, errors.New("s3 config not found") } - return newS3Storage(backend.S3, sendCreds) + return NewS3Storage(backend.S3, sendCreds) case *backup.StorageBackend_Noop: return newNoopStorage(), nil case *backup.StorageBackend_Gcs: From f23355df99fdda40e57cf7c7966a735d8bff71c9 Mon Sep 17 00:00:00 2001 From: luancheng Date: Wed, 12 Aug 2020 18:09:57 +0800 Subject: [PATCH 2/9] fix check --- pkg/storage/s3.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 35de56c6a..cc13359ed 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -81,6 +81,7 @@ type S3BackendOptions struct { UseAccelerateEndpoint bool `json:"use-accelerate-endpoint" toml:"use-accelerate-endpoint"` } +// Apply apply s3 options on backup.S3. func (options *S3BackendOptions) Apply(s3 *backup.S3) error { if options.Region == "" { options.Region = "us-east-1" From dc487ae18c0c6731e4960b73605d423416d28641 Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 14 Aug 2020 14:28:59 +0800 Subject: [PATCH 3/9] refine s3 code --- pkg/storage/s3.go | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index cc13359ed..2209f92b3 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -319,13 +319,20 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +func (rs *S3Storage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { var marker *string + prefix := rs.options.Prefix + if len(dir) > 0 { + prefix += dir + } maxKeys := int64(1000) + if listCount > 0 { + maxKeys = listCount + } req := &s3.ListObjectsInput{ - Bucket: &rs.options.Bucket, - Prefix: &rs.options.Prefix, - MaxKeys: &maxKeys, + Bucket: aws.String(rs.options.Bucket), + Prefix: aws.String(prefix), + MaxKeys: aws.Int64(maxKeys), } for { req.Marker = marker @@ -453,16 +460,6 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { return realOffset, nil } -// ListObject list `maxkeys` objects with specify path. -func (rs *S3Storage) ListObject(ctx context.Context, name string, maxKeys int64) (*s3.ListObjectsV2Output, error) { - input := &s3.ListObjectsV2Input{ - Bucket: aws.String(rs.options.Bucket), - Prefix: aws.String(rs.options.Prefix + name), - MaxKeys: aws.Int64(maxKeys), - } - return rs.svc.ListObjectsV2WithContext(ctx, input) -} - // CreateMultipartUpload create multi upload request. func (rs *S3Storage) CreateMultipartUpload(ctx context.Context, name string) (*s3.CreateMultipartUploadOutput, error) { input := &s3.CreateMultipartUploadInput{ From 6296a8b71e53c6e709dee208623a506d43cee4f8 Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 14 Aug 2020 15:23:30 +0800 Subject: [PATCH 4/9] refine uploader --- pkg/storage/gcs.go | 7 +++- pkg/storage/local.go | 7 +++- pkg/storage/noop.go | 7 +++- pkg/storage/s3.go | 92 +++++++++++++++++++++++------------------- pkg/storage/storage.go | 15 ++++++- 5 files changed, 83 insertions(+), 45 deletions(-) diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index c41fc2e09..b8a7c3031 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -135,11 +135,16 @@ func (s *gcsStorage) Open(ctx context.Context, name string) (ReadSeekCloser, err // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (s *gcsStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +func (s *gcsStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { // TODO, implement this if needed panic("Unsupported Operation") } +// CreateUploader implenments ExternalStorage interface. +func (s *gcsStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) { + return nil, nil +} + func newGCSStorage(ctx context.Context, gcs *backup.GCS, sendCredential bool) (*gcsStorage, error) { return newGCSStorageWithHTTPClient(ctx, gcs, nil, sendCredential) } diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 06eae91f9..ba10a85a0 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -42,7 +42,7 @@ func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +func (l *LocalStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error { if err != nil { return errors.Trace(err) @@ -56,6 +56,11 @@ func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error }) } +// CreateUploader implenments ExternalStorage interface. +func (l *LocalStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) { + return nil, nil +} + // Open a Reader by file name. func (l *LocalStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { return os.Open(path.Join(l.base, name)) diff --git a/pkg/storage/noop.go b/pkg/storage/noop.go index 83c7384ed..51c7892ae 100644 --- a/pkg/storage/noop.go +++ b/pkg/storage/noop.go @@ -29,10 +29,15 @@ func (*noopStorage) Open(ctx context.Context, name string) (ReadSeekCloser, erro } // WalkDir traverse all the files in a dir. -func (*noopStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +func (*noopStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { return nil } +// CreateUploader implenments ExternalStorage interface. +func (*noopStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) { + return nil, nil +} + func newNoopStorage() *noopStorage { return &noopStorage{} } diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 2209f92b3..801492ed6 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -66,6 +66,50 @@ type S3Storage struct { options *backup.S3 } +// S3Uploader does multi-part upload to s3 +type S3Uploader struct { + svc s3Handlers + createOutput *s3.CreateMultipartUploadOutput + completeParts []*s3.CompletedPart +} + +// UploadPart update partial data to s3, we should call CreateMultipartUpload to start it, +// and call CompleteMultipartUpload to finish it. +func (u *S3Uploader) UploadPart(ctx context.Context, data []byte, partNum int) error { + partInput := &s3.UploadPartInput{ + Body: bytes.NewReader(data), + Bucket: u.createOutput.Bucket, + Key: u.createOutput.Key, + PartNumber: aws.Int64(int64(partNum)), + UploadId: u.createOutput.UploadId, + ContentLength: aws.Int64(int64(len(data))), + } + + uploadResult, err := u.svc.UploadPartWithContext(ctx, partInput) + if err != nil { + return err + } + u.completeParts = append(u.completeParts, &s3.CompletedPart{ + ETag: uploadResult.ETag, + PartNumber: aws.Int64(int64(partNum)), + }) + return nil +} + +// CompleteUpload complete multi upload request. +func (u *S3Uploader) CompleteUpload(ctx context.Context) error { + completeInput := &s3.CompleteMultipartUploadInput{ + Bucket: u.createOutput.Bucket, + Key: u.createOutput.Key, + UploadId: u.createOutput.UploadId, + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: u.completeParts, + }, + } + _, err := u.svc.CompleteMultipartUploadWithContext(ctx, completeInput) + return err +} + // S3BackendOptions contains options for s3 storage. type S3BackendOptions struct { Endpoint string `json:"endpoint" toml:"endpoint"` @@ -460,53 +504,19 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { return realOffset, nil } -// CreateMultipartUpload create multi upload request. -func (rs *S3Storage) CreateMultipartUpload(ctx context.Context, name string) (*s3.CreateMultipartUploadOutput, error) { +// CreateUploader create multi upload request. +func (rs *S3Storage) CreateUploader(ctx context.Context, name string) (Uploader, error) { input := &s3.CreateMultipartUploadInput{ Bucket: aws.String(rs.options.Bucket), Key: aws.String(rs.options.Prefix + name), } - return rs.svc.CreateMultipartUploadWithContext(ctx, input) -} - -// CompleteMultipartUpload complete multi upload request. -func (rs *S3Storage) CompleteMultipartUpload( - ctx context.Context, - resp *s3.CreateMultipartUploadOutput, - completedParts []*s3.CompletedPart) (*s3.CompleteMultipartUploadOutput, error) { - completeInput := &s3.CompleteMultipartUploadInput{ - Bucket: resp.Bucket, - Key: resp.Key, - UploadId: resp.UploadId, - MultipartUpload: &s3.CompletedMultipartUpload{ - Parts: completedParts, - }, - } - return rs.svc.CompleteMultipartUploadWithContext(ctx, completeInput) -} - -// UploadPart update partial data to s3, we should call CreateMultipartUpload to start it, -// and call CompleteMultipartUpload to finish it. -func (rs *S3Storage) UploadPart( - ctx context.Context, - resp *s3.CreateMultipartUploadOutput, - fileBytes []byte, - partNumber int) (*s3.CompletedPart, error) { - partInput := &s3.UploadPartInput{ - Body: bytes.NewReader(fileBytes), - Bucket: resp.Bucket, - Key: resp.Key, - PartNumber: aws.Int64(int64(partNumber)), - UploadId: resp.UploadId, - ContentLength: aws.Int64(int64(len(fileBytes))), - } - - uploadResult, err := rs.svc.UploadPartWithContext(ctx, partInput) + resp, err := rs.svc.CreateMultipartUploadWithContext(ctx, input) if err != nil { return nil, err } - return &s3.CompletedPart{ - ETag: uploadResult.ETag, - PartNumber: aws.Int64(int64(partNumber)), + return &S3Uploader{ + svc: rs.svc, + createOutput: resp, + completeParts: make([]*s3.CompletedPart, 0, 128), }, nil } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d74b480cc..ff85efbf6 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -17,6 +17,14 @@ type ReadSeekCloser interface { io.Closer } +// Uploader upload file with chunks. +type Uploader interface { + // UploadPart upload part of file data to storage + UploadPart(ctx context.Context, data []byte, partNum int) error + // CompleteUpload make the upload data to a complete file + CompleteUpload(ctx context.Context) error +} + // ExternalStorage represents a kind of file system storage. type ExternalStorage interface { // Write file to storage @@ -33,7 +41,12 @@ type ExternalStorage interface { // The argument `path` is the file path that can be used in `Open` // function; the argument `size` is the size in byte of the file determined // by path. - WalkDir(ctx context.Context, fn func(path string, size int64) error) error + WalkDir(ctx context.Context, dir string, listCount int64, fn func(path string, size int64) error) error + + // CreateUploader create a uploader that will upload chunks data to storage. + // It's design for s3 multi-part upload currently. e.g. cdc log backup use this to do multi part upload + // to avoid generate small fragment files. + CreateUploader(ctx context.Context, name string) (Uploader, error) } // Create creates ExternalStorage. From d2e93b5f05f4733f1d0775382c394d9873805b86 Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 14 Aug 2020 15:25:51 +0800 Subject: [PATCH 5/9] fix build --- pkg/storage/s3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 801492ed6..285bded8e 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -66,7 +66,7 @@ type S3Storage struct { options *backup.S3 } -// S3Uploader does multi-part upload to s3 +// S3Uploader does multi-part upload to s3. type S3Uploader struct { svc s3Handlers createOutput *s3.CreateMultipartUploadOutput From ea0fe80f53f3f052bdc367073fad9c24b26a14c8 Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 14 Aug 2020 19:23:21 +0800 Subject: [PATCH 6/9] address comment --- pkg/storage/gcs.go | 3 ++- pkg/storage/local.go | 2 +- pkg/storage/noop.go | 2 +- pkg/storage/s3.go | 6 +++--- pkg/storage/storage.go | 2 +- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index b8a7c3031..862661ad3 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -142,7 +142,8 @@ func (s *gcsStorage) WalkDir(ctx context.Context, dir string, listCount int64, f // CreateUploader implenments ExternalStorage interface. func (s *gcsStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) { - return nil, nil + // TODO, implement this if needed + panic("gcs storage not support multi-upload") } func newGCSStorage(ctx context.Context, gcs *backup.GCS, sendCredential bool) (*gcsStorage, error) { diff --git a/pkg/storage/local.go b/pkg/storage/local.go index ba10a85a0..a7e298339 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -58,7 +58,7 @@ func (l *LocalStorage) WalkDir(ctx context.Context, dir string, listCount int64, // CreateUploader implenments ExternalStorage interface. func (l *LocalStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) { - return nil, nil + panic("local storage not support multi-upload") } // Open a Reader by file name. diff --git a/pkg/storage/noop.go b/pkg/storage/noop.go index 51c7892ae..42e60e3a6 100644 --- a/pkg/storage/noop.go +++ b/pkg/storage/noop.go @@ -35,7 +35,7 @@ func (*noopStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn // CreateUploader implenments ExternalStorage interface. func (*noopStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) { - return nil, nil + panic("noop storage not support multi-upload") } func newNoopStorage() *noopStorage { diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 285bded8e..aeceee5e8 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -75,12 +75,12 @@ type S3Uploader struct { // UploadPart update partial data to s3, we should call CreateMultipartUpload to start it, // and call CompleteMultipartUpload to finish it. -func (u *S3Uploader) UploadPart(ctx context.Context, data []byte, partNum int) error { +func (u *S3Uploader) UploadPart(ctx context.Context, data []byte) error { partInput := &s3.UploadPartInput{ Body: bytes.NewReader(data), Bucket: u.createOutput.Bucket, Key: u.createOutput.Key, - PartNumber: aws.Int64(int64(partNum)), + PartNumber: aws.Int64(int64(len(u.completeParts))), UploadId: u.createOutput.UploadId, ContentLength: aws.Int64(int64(len(data))), } @@ -91,7 +91,7 @@ func (u *S3Uploader) UploadPart(ctx context.Context, data []byte, partNum int) e } u.completeParts = append(u.completeParts, &s3.CompletedPart{ ETag: uploadResult.ETag, - PartNumber: aws.Int64(int64(partNum)), + PartNumber: aws.Int64(int64(len(u.completeParts))), }) return nil } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index ff85efbf6..9c70b9917 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -20,7 +20,7 @@ type ReadSeekCloser interface { // Uploader upload file with chunks. type Uploader interface { // UploadPart upload part of file data to storage - UploadPart(ctx context.Context, data []byte, partNum int) error + UploadPart(ctx context.Context, data []byte) error // CompleteUpload make the upload data to a complete file CompleteUpload(ctx context.Context) error } From fac1ea4a9cd31482f42c9ce6335aa3d9ade79470 Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 17 Aug 2020 11:08:23 +0800 Subject: [PATCH 7/9] address comment --- pkg/storage/s3.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index aeceee5e8..bc36e60c7 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -80,7 +80,7 @@ func (u *S3Uploader) UploadPart(ctx context.Context, data []byte) error { Body: bytes.NewReader(data), Bucket: u.createOutput.Bucket, Key: u.createOutput.Key, - PartNumber: aws.Int64(int64(len(u.completeParts))), + PartNumber: aws.Int64(int64(len(u.completeParts)+1)), UploadId: u.createOutput.UploadId, ContentLength: aws.Int64(int64(len(data))), } @@ -91,7 +91,7 @@ func (u *S3Uploader) UploadPart(ctx context.Context, data []byte) error { } u.completeParts = append(u.completeParts, &s3.CompletedPart{ ETag: uploadResult.ETag, - PartNumber: aws.Int64(int64(len(u.completeParts))), + PartNumber: partInput.PartNumber, }) return nil } From 5c04f7798953f93bb37793942e4244b7507a01a3 Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 17 Aug 2020 11:09:36 +0800 Subject: [PATCH 8/9] format code --- pkg/storage/s3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index bc36e60c7..bf6ce2ed1 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -80,7 +80,7 @@ func (u *S3Uploader) UploadPart(ctx context.Context, data []byte) error { Body: bytes.NewReader(data), Bucket: u.createOutput.Bucket, Key: u.createOutput.Key, - PartNumber: aws.Int64(int64(len(u.completeParts)+1)), + PartNumber: aws.Int64(int64(len(u.completeParts) + 1)), UploadId: u.createOutput.UploadId, ContentLength: aws.Int64(int64(len(data))), } From 834beb31fc2afc6b7df96fd706bf2ee2397ec17b Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 17 Aug 2020 17:57:01 +0800 Subject: [PATCH 9/9] Update pkg/storage/s3.go Co-authored-by: kennytm --- pkg/storage/s3.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index bf6ce2ed1..312d78190 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -365,10 +365,7 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) // by path. func (rs *S3Storage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { var marker *string - prefix := rs.options.Prefix - if len(dir) > 0 { - prefix += dir - } + prefix := rs.options.Prefix + dir maxKeys := int64(1000) if listCount > 0 { maxKeys = listCount