Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,17 @@ func (s *gcsStorage) Open(ctx context.Context, name string) (ReadSeekCloser, err
// The first argument is the file path that can be used in `Open`
// function; the second argument is the size in byte of the file determined
// by path.
func (s *gcsStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
func (s *gcsStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
// TODO, implement this if needed
panic("Unsupported Operation")
}

// CreateUploader implements the ExternalStorage interface.
// Multi-part upload is not implemented for GCS: this method always panics
// instead of returning an Uploader or an error.
func (s *gcsStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) {
// TODO, implement this if needed
panic("gcs storage not support multi-upload")
}

// newGCSStorage builds a gcsStorage by delegating to
// newGCSStorageWithHTTPClient, passing nil as the third argument
// (presumably meaning "no custom HTTP client" — confirm against that helper).
func newGCSStorage(ctx context.Context, gcs *backup.GCS, sendCredential bool) (*gcsStorage, error) {
return newGCSStorageWithHTTPClient(ctx, gcs, nil, sendCredential)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error
// The first argument is the file path that can be used in `Open`
// function; the second argument is the size in byte of the file determined
// by path.
func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
func (l *LocalStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error {
if err != nil {
return errors.Trace(err)
Expand All @@ -56,6 +56,11 @@ func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error
})
}

// CreateUploader implements the ExternalStorage interface.
// Multi-part upload is not supported by local storage: this method always
// panics instead of returning an Uploader or an error.
func (l *LocalStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) {
panic("local storage not support multi-upload")
}

// Open a Reader by file name.
func (l *LocalStorage) Open(ctx context.Context, name string) (ReadSeekCloser, error) {
return os.Open(path.Join(l.base, name))
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ func (*noopStorage) Open(ctx context.Context, name string) (ReadSeekCloser, erro
}

// WalkDir traverses all the files in a dir.
func (*noopStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
func (*noopStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
return nil
}

// CreateUploader implements the ExternalStorage interface.
// Multi-part upload is not supported by noop storage: this method always
// panics instead of returning an Uploader or an error.
func (*noopStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) {
panic("noop storage not support multi-upload")
}

// newNoopStorage returns a pointer to a freshly created, empty noopStorage.
func newNoopStorage() *noopStorage {
	storage := noopStorage{}
	return &storage
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backup.StorageBacken
options = &BackendOptions{}
}
ExtractQueryParameters(u, &options.S3)
if err := options.S3.apply(s3); err != nil {
if err := options.S3.Apply(s3); err != nil {
return nil, err
}
return &backup.StorageBackend{Backend: &backup.StorageBackend_S3{S3: s3}}, nil
Expand Down
91 changes: 84 additions & 7 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ type s3Handlers interface {
ListObjectsWithContext(context.Context, *s3.ListObjectsInput, ...request.Option) (*s3.ListObjectsOutput, error)
HeadBucketWithContext(context.Context, *s3.HeadBucketInput, ...request.Option) (*s3.HeadBucketOutput, error)
WaitUntilObjectExistsWithContext(context.Context, *s3.HeadObjectInput, ...request.WaiterOption) error

ListObjectsV2WithContext(context.Context, *s3.ListObjectsV2Input, ...request.Option) (*s3.ListObjectsV2Output, error)
CreateMultipartUploadWithContext(
context.Context,
*s3.CreateMultipartUploadInput,
...request.Option) (*s3.CreateMultipartUploadOutput, error)
CompleteMultipartUploadWithContext(
context.Context,
*s3.CompleteMultipartUploadInput,
...request.Option) (*s3.CompleteMultipartUploadOutput, error)
UploadPartWithContext(context.Context, *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error)
}

// S3Storage info for s3 storage.
Expand All @@ -55,6 +66,50 @@ type S3Storage struct {
options *backup.S3
}

// S3Uploader does multi-part upload to s3.
// It holds the response of the request that created the upload together with
// the list of parts that have been uploaded so far.
type S3Uploader struct {
// svc is the s3 client used to send the UploadPart and
// CompleteMultipartUpload requests.
svc s3Handlers
// createOutput is the CreateMultipartUpload response; its Bucket, Key and
// UploadId are reused by every later request of this upload.
createOutput *s3.CreateMultipartUploadOutput
// completeParts records the ETag and part number of each part uploaded so
// far, in upload order.
completeParts []*s3.CompletedPart
}

// UploadPart sends data to s3 as the next part of the multi-part upload
// described by u.createOutput. On success the part's ETag and number are
// recorded so that CompleteUpload can list it; on failure the error from the
// s3 client is returned unchanged and nothing is recorded.
// The upload must have been started with CreateMultipartUpload and must be
// finished with CompleteMultipartUpload.
func (u *S3Uploader) UploadPart(ctx context.Context, data []byte) error {
	// The next part number is one more than the count of parts recorded so far.
	partNumber := aws.Int64(int64(len(u.completeParts) + 1))

	resp, err := u.svc.UploadPartWithContext(ctx, &s3.UploadPartInput{
		Body:          bytes.NewReader(data),
		Bucket:        u.createOutput.Bucket,
		Key:           u.createOutput.Key,
		PartNumber:    partNumber,
		UploadId:      u.createOutput.UploadId,
		ContentLength: aws.Int64(int64(len(data))),
	})
	if err != nil {
		return err
	}

	finished := &s3.CompletedPart{
		ETag:       resp.ETag,
		PartNumber: partNumber,
	}
	u.completeParts = append(u.completeParts, finished)
	return nil
}

// CompleteUpload finishes the multi-part upload by sending s3 the list of
// parts recorded by UploadPart. The error from the s3 client, if any, is
// returned unchanged.
func (u *S3Uploader) CompleteUpload(ctx context.Context) error {
	uploaded := &s3.CompletedMultipartUpload{Parts: u.completeParts}
	req := &s3.CompleteMultipartUploadInput{
		Bucket:          u.createOutput.Bucket,
		Key:             u.createOutput.Key,
		UploadId:        u.createOutput.UploadId,
		MultipartUpload: uploaded,
	}
	if _, err := u.svc.CompleteMultipartUploadWithContext(ctx, req); err != nil {
		return err
	}
	return nil
}

// S3BackendOptions contains options for s3 storage.
type S3BackendOptions struct {
Endpoint string `json:"endpoint" toml:"endpoint"`
Expand All @@ -70,7 +125,8 @@ type S3BackendOptions struct {
UseAccelerateEndpoint bool `json:"use-accelerate-endpoint" toml:"use-accelerate-endpoint"`
}

func (options *S3BackendOptions) apply(s3 *backup.S3) error {
// Apply applies the s3 options to backup.S3.
func (options *S3BackendOptions) Apply(s3 *backup.S3) error {
if options.Region == "" {
options.Region = "us-east-1"
}
Expand Down Expand Up @@ -161,8 +217,8 @@ func (options *S3BackendOptions) parseFromFlags(flags *pflag.FlagSet) error {
return nil
}

// newS3Storage initialize a new s3 storage for metadata.
func newS3Storage( // revive:disable-line:flag-parameter
// NewS3Storage initializes a new s3 storage for metadata.
func NewS3Storage( // revive:disable-line:flag-parameter
backend *backup.S3,
sendCredential bool,
) (*S3Storage, error) {
Expand Down Expand Up @@ -307,13 +363,17 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error)
// The first argument is the file path that can be used in `Open`
// function; the second argument is the size in byte of the file determined
// by path.
func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
func (rs *S3Storage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
var marker *string
prefix := rs.options.Prefix + dir
maxKeys := int64(1000)
if listCount > 0 {
maxKeys = listCount
}
req := &s3.ListObjectsInput{
Bucket: &rs.options.Bucket,
Prefix: &rs.options.Prefix,
MaxKeys: &maxKeys,
Bucket: aws.String(rs.options.Bucket),
Prefix: aws.String(prefix),
MaxKeys: aws.Int64(maxKeys),
}
for {
req.Marker = marker
Expand Down Expand Up @@ -440,3 +500,20 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
r.pos = realOffset
return realOffset, nil
}

// CreateUploader starts a multi-part upload for the object whose key is the
// storage prefix followed by name, and returns an S3Uploader bound to it.
// If the create request fails, the s3 client's error is returned unchanged
// together with a nil Uploader.
func (rs *S3Storage) CreateUploader(ctx context.Context, name string) (Uploader, error) {
	resp, err := rs.svc.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
		Bucket: aws.String(rs.options.Bucket),
		Key:    aws.String(rs.options.Prefix + name),
	})
	if err != nil {
		return nil, err
	}

	uploader := &S3Uploader{
		svc:          rs.svc,
		createOutput: resp,
		// Reserve room for 128 parts up front; the slice grows if more are uploaded.
		completeParts: make([]*s3.CompletedPart, 0, 128),
	}
	return uploader, nil
}
16 changes: 16 additions & 0 deletions pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,3 +502,19 @@ func (c *mockS3Handler) WaitUntilObjectExistsWithContext(ctx context.Context,
input *s3.HeadObjectInput, opts ...request.WaiterOption) error {
return c.err
}
// ListObjectsV2WithContext is a stub: it ignores its arguments and returns a
// nil output together with the mock's err field.
func (c *mockS3Handler) ListObjectsV2WithContext(context.Context,
*s3.ListObjectsV2Input, ...request.Option) (*s3.ListObjectsV2Output, error) {
return nil, c.err
}
// CreateMultipartUploadWithContext is a stub: it ignores its arguments and
// returns a nil output together with the mock's err field.
func (c *mockS3Handler) CreateMultipartUploadWithContext(context.Context,
*s3.CreateMultipartUploadInput, ...request.Option) (*s3.CreateMultipartUploadOutput, error) {
return nil, c.err
}
// CompleteMultipartUploadWithContext is a stub: it ignores its arguments and
// returns a nil output together with the mock's err field.
func (c *mockS3Handler) CompleteMultipartUploadWithContext(context.Context,
*s3.CompleteMultipartUploadInput, ...request.Option) (*s3.CompleteMultipartUploadOutput, error) {
return nil, c.err
}
// UploadPartWithContext is a stub: it ignores its arguments and returns a
// nil output together with the mock's err field.
func (c *mockS3Handler) UploadPartWithContext(context.Context,
*s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) {
return nil, c.err
}
17 changes: 15 additions & 2 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ type ReadSeekCloser interface {
io.Closer
}

// Uploader uploads a file to storage in chunks.
type Uploader interface {
// UploadPart uploads one part of the file data to storage.
UploadPart(ctx context.Context, data []byte) error
// CompleteUpload finishes the upload, turning the uploaded parts into a
// complete file.
CompleteUpload(ctx context.Context) error
}

// ExternalStorage represents a kind of file system storage.
type ExternalStorage interface {
// Write file to storage
Expand All @@ -33,7 +41,12 @@ type ExternalStorage interface {
// The argument `path` is the file path that can be used in `Open`
// function; the argument `size` is the size in byte of the file determined
// by path.
WalkDir(ctx context.Context, fn func(path string, size int64) error) error
WalkDir(ctx context.Context, dir string, listCount int64, fn func(path string, size int64) error) error

// CreateUploader creates an uploader that uploads data to storage in chunks.
// It is currently designed for s3 multi-part upload, e.g. cdc log backup uses
// it to do multi-part upload and avoid generating small fragment files.
CreateUploader(ctx context.Context, name string) (Uploader, error)
}

// Create creates ExternalStorage.
Expand All @@ -45,7 +58,7 @@ func Create(ctx context.Context, backend *backup.StorageBackend, sendCreds bool)
if backend.S3 == nil {
return nil, errors.New("s3 config not found")
}
return newS3Storage(backend.S3, sendCreds)
return NewS3Storage(backend.S3, sendCreds)
case *backup.StorageBackend_Noop:
return newNoopStorage(), nil
case *backup.StorageBackend_Gcs:
Expand Down