diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index c41fc2e09..862661ad3 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -135,11 +135,17 @@ func (s *gcsStorage) Open(ctx context.Context, name string) (ReadSeekCloser, err // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (s *gcsStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +func (s *gcsStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { // TODO, implement this if needed panic("Unsupported Operation") } +// CreateUploader implenments ExternalStorage interface. +func (s *gcsStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) { + // TODO, implement this if needed + panic("gcs storage not support multi-upload") +} + func newGCSStorage(ctx context.Context, gcs *backup.GCS, sendCredential bool) (*gcsStorage, error) { return newGCSStorageWithHTTPClient(ctx, gcs, nil, sendCredential) } diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 06eae91f9..a7e298339 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -42,7 +42,7 @@ func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +func (l *LocalStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error { if err != nil { return errors.Trace(err) @@ -56,6 +56,11 @@ func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error }) } +// CreateUploader implenments ExternalStorage interface. +func (l *LocalStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) { + panic("local storage not support multi-upload") +} + // Open a Reader by file name. func (l *LocalStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) { return os.Open(path.Join(l.base, name)) diff --git a/pkg/storage/noop.go b/pkg/storage/noop.go index 83c7384ed..42e60e3a6 100644 --- a/pkg/storage/noop.go +++ b/pkg/storage/noop.go @@ -29,10 +29,15 @@ func (*noopStorage) Open(ctx context.Context, name string) (ReadSeekCloser, erro } // WalkDir traverse all the files in a dir. -func (*noopStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +func (*noopStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { return nil } +// CreateUploader implenments ExternalStorage interface. +func (*noopStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) { + panic("noop storage not support multi-upload") +} + func newNoopStorage() *noopStorage { return &noopStorage{} } diff --git a/pkg/storage/parse.go b/pkg/storage/parse.go index fff518bfb..8e4f868e1 100644 --- a/pkg/storage/parse.go +++ b/pkg/storage/parse.go @@ -52,7 +52,7 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backup.StorageBacken options = &BackendOptions{} } ExtractQueryParameters(u, &options.S3) - if err := options.S3.apply(s3); err != nil { + if err := options.S3.Apply(s3); err != nil { return nil, err } return &backup.StorageBackend{Backend: &backup.StorageBackend_S3{S3: s3}}, nil diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 429e8b8d8..312d78190 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -46,6 +46,17 @@ type s3Handlers interface { ListObjectsWithContext(context.Context, *s3.ListObjectsInput, ...request.Option) (*s3.ListObjectsOutput, error) HeadBucketWithContext(context.Context, *s3.HeadBucketInput, ...request.Option) (*s3.HeadBucketOutput, error) WaitUntilObjectExistsWithContext(context.Context, *s3.HeadObjectInput, ...request.WaiterOption) error + + ListObjectsV2WithContext(context.Context, *s3.ListObjectsV2Input, ...request.Option) (*s3.ListObjectsV2Output, error) + CreateMultipartUploadWithContext( + context.Context, + *s3.CreateMultipartUploadInput, + ...request.Option) (*s3.CreateMultipartUploadOutput, error) + CompleteMultipartUploadWithContext( + context.Context, + *s3.CompleteMultipartUploadInput, + ...request.Option) (*s3.CompleteMultipartUploadOutput, error) + UploadPartWithContext(context.Context, *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) } // S3Storage info for s3 storage. @@ -55,6 +66,50 @@ type S3Storage struct { options *backup.S3 } +// S3Uploader does multi-part upload to s3. +type S3Uploader struct { + svc s3Handlers + createOutput *s3.CreateMultipartUploadOutput + completeParts []*s3.CompletedPart +} + +// UploadPart update partial data to s3, we should call CreateMultipartUpload to start it, +// and call CompleteMultipartUpload to finish it. +func (u *S3Uploader) UploadPart(ctx context.Context, data []byte) error { + partInput := &s3.UploadPartInput{ + Body: bytes.NewReader(data), + Bucket: u.createOutput.Bucket, + Key: u.createOutput.Key, + PartNumber: aws.Int64(int64(len(u.completeParts) + 1)), + UploadId: u.createOutput.UploadId, + ContentLength: aws.Int64(int64(len(data))), + } + + uploadResult, err := u.svc.UploadPartWithContext(ctx, partInput) + if err != nil { + return err + } + u.completeParts = append(u.completeParts, &s3.CompletedPart{ + ETag: uploadResult.ETag, + PartNumber: partInput.PartNumber, + }) + return nil +} + +// CompleteUpload complete multi upload request. +func (u *S3Uploader) CompleteUpload(ctx context.Context) error { + completeInput := &s3.CompleteMultipartUploadInput{ + Bucket: u.createOutput.Bucket, + Key: u.createOutput.Key, + UploadId: u.createOutput.UploadId, + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: u.completeParts, + }, + } + _, err := u.svc.CompleteMultipartUploadWithContext(ctx, completeInput) + return err +} + // S3BackendOptions contains options for s3 storage. type S3BackendOptions struct { Endpoint string `json:"endpoint" toml:"endpoint"` @@ -70,7 +125,8 @@ type S3BackendOptions struct { UseAccelerateEndpoint bool `json:"use-accelerate-endpoint" toml:"use-accelerate-endpoint"` } -func (options *S3BackendOptions) apply(s3 *backup.S3) error { +// Apply apply s3 options on backup.S3. +func (options *S3BackendOptions) Apply(s3 *backup.S3) error { if options.Region == "" { options.Region = "us-east-1" } @@ -161,8 +217,8 @@ func (options *S3BackendOptions) parseFromFlags(flags *pflag.FlagSet) error { return nil } -// newS3Storage initialize a new s3 storage for metadata. -func newS3Storage( // revive:disable-line:flag-parameter +// NewS3Storage initialize a new s3 storage for metadata. +func NewS3Storage( // revive:disable-line:flag-parameter backend *backup.S3, sendCredential bool, ) (*S3Storage, error) { @@ -307,13 +363,17 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) // The first argument is the file path that can be used in `Open` // function; the second argument is the size in byte of the file determined // by path. -func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) error { +func (rs *S3Storage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error { var marker *string + prefix := rs.options.Prefix + dir maxKeys := int64(1000) + if listCount > 0 { + maxKeys = listCount + } req := &s3.ListObjectsInput{ - Bucket: &rs.options.Bucket, - Prefix: &rs.options.Prefix, - MaxKeys: &maxKeys, + Bucket: aws.String(rs.options.Bucket), + Prefix: aws.String(prefix), + MaxKeys: aws.Int64(maxKeys), } for { req.Marker = marker @@ -440,3 +500,20 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) { r.pos = realOffset return realOffset, nil } + +// CreateUploader create multi upload request. +func (rs *S3Storage) CreateUploader(ctx context.Context, name string) (Uploader, error) { + input := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + name), + } + resp, err := rs.svc.CreateMultipartUploadWithContext(ctx, input) + if err != nil { + return nil, err + } + return &S3Uploader{ + svc: rs.svc, + createOutput: resp, + completeParts: make([]*s3.CompletedPart, 0, 128), + }, nil +} diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index 16f4350a8..37c1687e3 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -502,3 +502,19 @@ func (c *mockS3Handler) WaitUntilObjectExistsWithContext(ctx context.Context, input *s3.HeadObjectInput, opts ...request.WaiterOption) error { return c.err } +func (c *mockS3Handler) ListObjectsV2WithContext(context.Context, + *s3.ListObjectsV2Input, ...request.Option) (*s3.ListObjectsV2Output, error) { + return nil, c.err +} +func (c *mockS3Handler) CreateMultipartUploadWithContext(context.Context, + *s3.CreateMultipartUploadInput, ...request.Option) (*s3.CreateMultipartUploadOutput, error) { + return nil, c.err +} +func (c *mockS3Handler) CompleteMultipartUploadWithContext(context.Context, + *s3.CompleteMultipartUploadInput, ...request.Option) (*s3.CompleteMultipartUploadOutput, error) { + return nil, c.err +} +func (c *mockS3Handler) UploadPartWithContext(context.Context, + *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) { + return nil, c.err +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 4e6ad6ca1..9c70b9917 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -17,6 +17,14 @@ type ReadSeekCloser interface { io.Closer } +// Uploader upload file with chunks. +type Uploader interface { + // UploadPart upload part of file data to storage + UploadPart(ctx context.Context, data []byte) error + // CompleteUpload make the upload data to a complete file + CompleteUpload(ctx context.Context) error +} + // ExternalStorage represents a kind of file system storage. type ExternalStorage interface { // Write file to storage @@ -33,7 +41,12 @@ type ExternalStorage interface { // The argument `path` is the file path that can be used in `Open` // function; the argument `size` is the size in byte of the file determined // by path. - WalkDir(ctx context.Context, fn func(path string, size int64) error) error + WalkDir(ctx context.Context, dir string, listCount int64, fn func(path string, size int64) error) error + + // CreateUploader create a uploader that will upload chunks data to storage. + // It's design for s3 multi-part upload currently. e.g. cdc log backup use this to do multi part upload + // to avoid generate small fragment files. + CreateUploader(ctx context.Context, name string) (Uploader, error) } // Create creates ExternalStorage. @@ -45,7 +58,7 @@ func Create(ctx context.Context, backend *backup.StorageBackend, sendCreds bool) if backend.S3 == nil { return nil, errors.New("s3 config not found") } - return newS3Storage(backend.S3, sendCreds) + return NewS3Storage(backend.S3, sendCreds) case *backup.StorageBackend_Noop: return newNoopStorage(), nil case *backup.StorageBackend_Gcs: