Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,25 @@ require (
github.com/google/uuid v1.1.1
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20200729012136-4e113ddee29e
github.com/pingcap/failpoint v0.0.0-20200603062251-b230c36c413c
github.com/pingcap/kvproto v0.0.0-20200803054707-ebd5de15093f
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20200818080353-7aaed8998596
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/parser v0.0.0-20200731033026-84f62115187c
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200730093003-dc8c75cf7ca0
github.com/pingcap/tidb v1.1.0-beta.0.20200810064414-d81150394f9d
github.com/pingcap/parser v0.0.0-20200803072748-fdf66528323d
github.com/pingcap/pd/v4 v4.0.5-0.20200817114353-e465cafe8a91
github.com/pingcap/tidb v1.1.0-beta.0.20200820092836-c5b7658b0896
github.com/pingcap/tidb-tools v4.0.1+incompatible
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/common v0.9.1
github.com/sirupsen/logrus v1.6.0
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
go.etcd.io/bbolt v1.3.5 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.15.0
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
google.golang.org/api v0.15.1
google.golang.org/grpc v1.26.0
)
280 changes: 72 additions & 208 deletions go.sum

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ func TestT(t *testing.T) {
}

func (r *testBackup) SetUpSuite(c *C) {
store := mocktikv.MustNewMVCCStore()
r.mockPDClient = mocktikv.NewPDClient(mocktikv.NewCluster(store))
r.mockPDClient = mocktikv.NewPDClient(mocktikv.NewCluster())
r.ctx, r.cancel = context.WithCancel(context.Background())
mockMgr := &conn.Mgr{}
mockMgr.SetPDClient(r.mockPDClient)
Expand Down
3 changes: 2 additions & 1 deletion pkg/mock/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func NewCluster() (*Cluster, error) {
}()
})

client, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
cluster := mocktikv.NewCluster()
client, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, nil, "")
mocktikv.BootstrapWithSingleStore(cluster)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,15 +355,15 @@ func (c *pdClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetO
}

func (c *pdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) {
regions, err := c.client.ScanRegions(ctx, key, endKey, limit)
regions, peers, err := c.client.ScanRegions(ctx, key, endKey, limit)
if err != nil {
return nil, err
}
regionInfos := make([]*RegionInfo, 0, len(regions))
for _, r := range regions {
for i, r := range regions {
regionInfos = append(regionInfos, &RegionInfo{
Region: r.Meta,
Leader: r.Leader,
Region: r,
Leader: peers[i],
})
}
return regionInfos, nil
Expand Down
8 changes: 7 additions & 1 deletion pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,17 @@ func (s *gcsStorage) Open(ctx context.Context, path string) (ReadSeekCloser, err
// The first argument is the file path that can be used in `Open`
// function; the second argument is the size in byte of the file determined
// by path.
func (s *gcsStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
func (s *gcsStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
// TODO, implement this if needed
panic("Unsupported Operation")
}

// CreateUploader implenments ExternalStorage interface.
func (s *gcsStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) {
// TODO, implement this if needed
panic("gcs storage not support multi-upload")
}

func newGCSStorage(ctx context.Context, gcs *backup.GCS, sendCredential bool) (*gcsStorage, error) {
return newGCSStorageWithHTTPClient(ctx, gcs, nil, sendCredential)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error
// The first argument is the file path that can be used in `Open`
// function; the second argument is the size in byte of the file determined
// by path.
func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
func (l *LocalStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the dir parameter means a specific sub dir to walk, the local implements should only walk the sub dir instead of ther whole base dir?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix it in next PR. because this is a cherry-pick PR

return filepath.Walk(l.base, func(path string, f os.FileInfo, err error) error {
if err != nil {
return errors.Trace(err)
Expand All @@ -57,6 +57,11 @@ func (l *LocalStorage) WalkDir(ctx context.Context, fn func(string, int64) error
})
}

// CreateUploader implenments ExternalStorage interface.
func (l *LocalStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) {
panic("local storage not support multi-upload")
}

// Open a Reader by file path, path is a relative path to base path.
func (l *LocalStorage) Open(ctx context.Context, path string) (ReadSeekCloser, error) {
return os.Open(filepath.Join(l.base, path))
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ func (*noopStorage) Open(ctx context.Context, path string) (ReadSeekCloser, erro
}

// WalkDir traverse all the files in a dir.
func (*noopStorage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
func (*noopStorage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
return nil
}

// CreateUploader implenments ExternalStorage interface.
func (*noopStorage) CreateUploader(ctx context.Context, name string) (Uploader, error) {
panic("noop storage not support multi-upload")
}

func newNoopStorage() *noopStorage {
return &noopStorage{}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backup.StorageBacken
options = &BackendOptions{}
}
ExtractQueryParameters(u, &options.S3)
if err := options.S3.apply(s3); err != nil {
if err := options.S3.Apply(s3); err != nil {
return nil, err
}
return &backup.StorageBackend{Backend: &backup.StorageBackend_S3{S3: s3}}, nil
Expand Down
91 changes: 84 additions & 7 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ type s3Handlers interface {
ListObjectsWithContext(context.Context, *s3.ListObjectsInput, ...request.Option) (*s3.ListObjectsOutput, error)
HeadBucketWithContext(context.Context, *s3.HeadBucketInput, ...request.Option) (*s3.HeadBucketOutput, error)
WaitUntilObjectExistsWithContext(context.Context, *s3.HeadObjectInput, ...request.WaiterOption) error

ListObjectsV2WithContext(context.Context, *s3.ListObjectsV2Input, ...request.Option) (*s3.ListObjectsV2Output, error)
CreateMultipartUploadWithContext(
context.Context,
*s3.CreateMultipartUploadInput,
...request.Option) (*s3.CreateMultipartUploadOutput, error)
CompleteMultipartUploadWithContext(
context.Context,
*s3.CompleteMultipartUploadInput,
...request.Option) (*s3.CompleteMultipartUploadOutput, error)
UploadPartWithContext(context.Context, *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error)
}

// S3Storage info for s3 storage.
Expand All @@ -55,6 +66,50 @@ type S3Storage struct {
options *backup.S3
}

// S3Uploader does multi-part upload to s3.
type S3Uploader struct {
svc s3Handlers
createOutput *s3.CreateMultipartUploadOutput
completeParts []*s3.CompletedPart
}

// UploadPart update partial data to s3, we should call CreateMultipartUpload to start it,
// and call CompleteMultipartUpload to finish it.
func (u *S3Uploader) UploadPart(ctx context.Context, data []byte) error {
partInput := &s3.UploadPartInput{
Body: bytes.NewReader(data),
Bucket: u.createOutput.Bucket,
Key: u.createOutput.Key,
PartNumber: aws.Int64(int64(len(u.completeParts) + 1)),
UploadId: u.createOutput.UploadId,
ContentLength: aws.Int64(int64(len(data))),
}

uploadResult, err := u.svc.UploadPartWithContext(ctx, partInput)
if err != nil {
return err
}
u.completeParts = append(u.completeParts, &s3.CompletedPart{
ETag: uploadResult.ETag,
PartNumber: partInput.PartNumber,
})
return nil
}

// CompleteUpload complete multi upload request.
func (u *S3Uploader) CompleteUpload(ctx context.Context) error {
completeInput := &s3.CompleteMultipartUploadInput{
Bucket: u.createOutput.Bucket,
Key: u.createOutput.Key,
UploadId: u.createOutput.UploadId,
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: u.completeParts,
},
}
_, err := u.svc.CompleteMultipartUploadWithContext(ctx, completeInput)
return err
}

// S3BackendOptions contains options for s3 storage.
type S3BackendOptions struct {
Endpoint string `json:"endpoint" toml:"endpoint"`
Expand All @@ -70,7 +125,8 @@ type S3BackendOptions struct {
UseAccelerateEndpoint bool `json:"use-accelerate-endpoint" toml:"use-accelerate-endpoint"`
}

func (options *S3BackendOptions) apply(s3 *backup.S3) error {
// Apply apply s3 options on backup.S3.
func (options *S3BackendOptions) Apply(s3 *backup.S3) error {
if options.Region == "" {
options.Region = "us-east-1"
}
Expand Down Expand Up @@ -161,8 +217,8 @@ func (options *S3BackendOptions) parseFromFlags(flags *pflag.FlagSet) error {
return nil
}

// newS3Storage initialize a new s3 storage for metadata.
func newS3Storage( // revive:disable-line:flag-parameter
// NewS3Storage initialize a new s3 storage for metadata.
func NewS3Storage( // revive:disable-line:flag-parameter
backend *backup.S3,
sendCredential bool,
) (*S3Storage, error) {
Expand Down Expand Up @@ -307,13 +363,17 @@ func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error)
// The first argument is the file path that can be used in `Open`
// function; the second argument is the size in byte of the file determined
// by path.
func (rs *S3Storage) WalkDir(ctx context.Context, fn func(string, int64) error) error {
func (rs *S3Storage) WalkDir(ctx context.Context, dir string, listCount int64, fn func(string, int64) error) error {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the listCount parameter (or maybe also the dir param) is only used for s3 implement as a contextal value, I think It's better to put this kind of control param into a abstract ctx interface{} field. Maybe there are other similar contextal paramers need for gcs for example?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll move these parameters in ctx after this PR merged, because this PR is a cherry-pick PR.

var marker *string
prefix := rs.options.Prefix + dir
maxKeys := int64(1000)
if listCount > 0 {
maxKeys = listCount
}
req := &s3.ListObjectsInput{
Bucket: &rs.options.Bucket,
Prefix: &rs.options.Prefix,
MaxKeys: &maxKeys,
Bucket: aws.String(rs.options.Bucket),
Prefix: aws.String(prefix),
MaxKeys: aws.Int64(maxKeys),
}
for {
req.Marker = marker
Expand Down Expand Up @@ -440,3 +500,20 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
r.pos = realOffset
return realOffset, nil
}

// CreateUploader create multi upload request.
func (rs *S3Storage) CreateUploader(ctx context.Context, name string) (Uploader, error) {
input := &s3.CreateMultipartUploadInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + name),
}
resp, err := rs.svc.CreateMultipartUploadWithContext(ctx, input)
if err != nil {
return nil, err
}
return &S3Uploader{
svc: rs.svc,
createOutput: resp,
completeParts: make([]*s3.CompletedPart, 0, 128),
}, nil
}
16 changes: 16 additions & 0 deletions pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,3 +502,19 @@ func (c *mockS3Handler) WaitUntilObjectExistsWithContext(ctx context.Context,
input *s3.HeadObjectInput, opts ...request.WaiterOption) error {
return c.err
}
func (c *mockS3Handler) ListObjectsV2WithContext(context.Context,
*s3.ListObjectsV2Input, ...request.Option) (*s3.ListObjectsV2Output, error) {
return nil, c.err
}
func (c *mockS3Handler) CreateMultipartUploadWithContext(context.Context,
*s3.CreateMultipartUploadInput, ...request.Option) (*s3.CreateMultipartUploadOutput, error) {
return nil, c.err
}
func (c *mockS3Handler) CompleteMultipartUploadWithContext(context.Context,
*s3.CompleteMultipartUploadInput, ...request.Option) (*s3.CompleteMultipartUploadOutput, error) {
return nil, c.err
}
func (c *mockS3Handler) UploadPartWithContext(context.Context,
*s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) {
return nil, c.err
}
17 changes: 15 additions & 2 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ type ReadSeekCloser interface {
io.Closer
}

// Uploader upload file with chunks.
type Uploader interface {
// UploadPart upload part of file data to storage
UploadPart(ctx context.Context, data []byte) error
// CompleteUpload make the upload data to a complete file
CompleteUpload(ctx context.Context) error
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, to name this as Close to be consistent with other io like interface is
a little better, anyway this is ok to me.

}

// ExternalStorage represents a kind of file system storage.
type ExternalStorage interface {
// Write file to storage
Expand All @@ -33,7 +41,12 @@ type ExternalStorage interface {
// The argument `path` is the file path that can be used in `Open`
// function; the argument `size` is the size in byte of the file determined
// by path.
WalkDir(ctx context.Context, fn func(path string, size int64) error) error
WalkDir(ctx context.Context, dir string, listCount int64, fn func(path string, size int64) error) error

// CreateUploader create a uploader that will upload chunks data to storage.
// It's design for s3 multi-part upload currently. e.g. cdc log backup use this to do multi part upload
// to avoid generate small fragment files.
CreateUploader(ctx context.Context, name string) (Uploader, error)
}

// Create creates ExternalStorage.
Expand All @@ -45,7 +58,7 @@ func Create(ctx context.Context, backend *backup.StorageBackend, sendCreds bool)
if backend.S3 == nil {
return nil, errors.New("s3 config not found")
}
return newS3Storage(backend.S3, sendCreds)
return NewS3Storage(backend.S3, sendCreds)
case *backup.StorageBackend_Noop:
return newNoopStorage(), nil
case *backup.StorageBackend_Gcs:
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (tbl *Table) NoChecksum() bool {

// NeedAutoID checks whether the table needs backing up with an autoid.
func NeedAutoID(tblInfo *model.TableInfo) bool {
hasRowID := !tblInfo.PKIsHandle && !tblInfo.IsCommonHandle
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

hasRowID := !tblInfo.PKIsHandle
hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil
return hasRowID || hasAutoIncID
}
Expand Down
12 changes: 6 additions & 6 deletions tests/br_key_locked/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,25 @@ func (c *codecPDClient) ScanRegions(
startKey []byte,
endKey []byte,
limit int,
) ([]*pd.Region, error) {
) ([]*metapb.Region, []*metapb.Peer, error) {
startKey = codec.EncodeBytes(nil, startKey)
if len(endKey) > 0 {
endKey = codec.EncodeBytes(nil, endKey)
}

regions, err := c.Client.ScanRegions(ctx, startKey, endKey, limit)
regions, peers, err := c.Client.ScanRegions(ctx, startKey, endKey, limit)
if err != nil {
return nil, errors.Trace(err)
return nil, nil, errors.Trace(err)
}
for _, region := range regions {
if region != nil {
err = decodeRegionMetaKey(region.Meta)
err = decodeRegionMetaKey(region)
if err != nil {
return nil, errors.Trace(err)
return nil, nil, errors.Trace(err)
}
}
}
return regions, nil
return regions, peers, nil
}

func processRegionResult(region *pd.Region, err error) (*pd.Region, error) {
Expand Down
4 changes: 1 addition & 3 deletions tests/br_key_locked/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"net/http"
"time"

"github.com/pingcap/tidb/kv"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -196,7 +194,7 @@ func (c *Locker) lockKeys(ctx context.Context, rowIDs []int64) error {

keyPrefix := tablecodec.GenTableRecordPrefix(c.tableID)
for _, rowID := range rowIDs {
key := tablecodec.EncodeRecordKey(keyPrefix, kv.IntHandle(rowID))
key := tablecodec.EncodeRecordKey(keyPrefix, rowID)
keys = append(keys, key)
}

Expand Down