From 129859889d55e10a29abc225d93841225f9983b9 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Sat, 22 Feb 2020 15:32:54 +0800 Subject: [PATCH 1/7] *: unify Range and RangeTree Signed-off-by: Neil Shen --- cmd/validate.go | 7 +- pkg/backup/client.go | 29 ++--- pkg/backup/push.go | 8 +- pkg/backup/range.go | 73 ++++++++++++ pkg/backup/range_test.go | 148 ++++++++++++++++++++++++ pkg/backup/range_tree.go | 207 ---------------------------------- pkg/backup/range_tree_test.go | 190 ------------------------------- pkg/restore/range.go | 86 ++------------ pkg/restore/range_test.go | 28 ++--- pkg/restore/split.go | 6 +- pkg/restore/split_test.go | 14 ++- pkg/restore/util.go | 9 +- pkg/task/restore.go | 9 +- pkg/utils/rtree/rtree.go | 177 +++++++++++++++++++++++++++++ pkg/utils/rtree/rtree_test.go | 86 ++++++++++++++ 15 files changed, 552 insertions(+), 525 deletions(-) create mode 100644 pkg/backup/range.go create mode 100644 pkg/backup/range_test.go delete mode 100644 pkg/backup/range_tree.go delete mode 100644 pkg/backup/range_tree_test.go create mode 100644 pkg/utils/rtree/rtree.go create mode 100644 pkg/utils/rtree/rtree_test.go diff --git a/cmd/validate.go b/cmd/validate.go index 8bca7e553..32a53d3ac 100644 --- a/cmd/validate.go +++ b/cmd/validate.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/br/pkg/restore" "github.com/pingcap/br/pkg/task" "github.com/pingcap/br/pkg/utils" + "github.com/pingcap/br/pkg/utils/rtree" ) // NewValidateCommand return a debug subcommand. @@ -166,15 +167,15 @@ func newBackupMetaCommand() *cobra.Command { tables = append(tables, db.Tables...) } // Check if the ranges of files overlapped - rangeTree := restore.NewRangeTree() + rangeTree := rtree.NewRangeTree() for _, file := range files { - if out := rangeTree.InsertRange(restore.Range{ + if out := rangeTree.InsertRange(rtree.Range{ StartKey: file.GetStartKey(), EndKey: file.GetEndKey(), }); out != nil { log.Error( "file ranges overlapped", - zap.Stringer("out", out.(*restore.Range)), + zap.Stringer("out", out), zap.Stringer("file", file), ) } diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 6d6eff033..05f136010 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" + "github.com/pingcap/br/pkg/utils/rtree" ) // ClientMgr manages connections needed by backup. @@ -179,13 +180,13 @@ func BuildBackupRangeAndSchema( storage kv.Storage, tableFilter *filter.Filter, backupTS uint64, -) ([]Range, *Schemas, error) { +) ([]rtree.Range, *Schemas, error) { info, err := dom.GetSnapshotInfoSchema(backupTS) if err != nil { return nil, nil, errors.Trace(err) } - ranges := make([]Range, 0) + ranges := make([]rtree.Range, 0) backupSchemas := newBackupSchemas() for _, dbInfo := range info.AllSchemas() { // skip system databases @@ -233,7 +234,7 @@ func BuildBackupRangeAndSchema( return nil, nil, err } for _, r := range tableRanges { - ranges = append(ranges, Range{ + ranges = append(ranges, rtree.Range{ StartKey: r.StartKey, EndKey: r.EndKey, }) @@ -295,7 +296,7 @@ func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*mod // BackupRanges make a backup of the given key ranges. func (bc *Client) BackupRanges( ctx context.Context, - ranges []Range, + ranges []rtree.Range, lastBackupTS uint64, backupTS uint64, rateLimit uint64, @@ -400,12 +401,12 @@ func (bc *Client) backupRange( } push := newPushDown(ctx, bc.mgr, len(allStores)) - var results RangeTree + var results rtree.RangeTree results, err = push.pushBackup(req, allStores, updateCh) if err != nil { return err } - log.Info("finish backup push down", zap.Int("Ok", results.len())) + log.Info("finish backup push down", zap.Int("Ok", results.Len())) // Find and backup remaining ranges. // TODO: test fine grained backup. @@ -422,14 +423,14 @@ func (bc *Client) backupRange( zap.Reflect("StartVersion", lastBackupTS), zap.Reflect("EndVersion", backupTS)) - results.tree.Ascend(func(i btree.Item) bool { - r := i.(*Range) + results.Ascend(func(i btree.Item) bool { + r := i.(*rtree.Range) bc.backupMeta.Files = append(bc.backupMeta.Files, r.Files...) return true }) // Check if there are duplicated files. - results.checkDupFiles() + checkDupFiles(&results) return nil } @@ -467,13 +468,13 @@ func (bc *Client) fineGrainedBackup( backupTS uint64, rateLimit uint64, concurrency uint32, - rangeTree RangeTree, + rangeTree rtree.RangeTree, updateCh chan<- struct{}, ) error { bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff) for { // Step1, check whether there is any incomplete range - incomplete := rangeTree.getIncompleteRange(startKey, endKey) + incomplete := getIncompleteRange(&rangeTree, startKey, endKey) if len(incomplete) == 0 { return nil } @@ -481,7 +482,7 @@ func (bc *Client) fineGrainedBackup( // Step2, retry backup on incomplete range respCh := make(chan *backup.BackupResponse, 4) errCh := make(chan error, 4) - retry := make(chan Range, 4) + retry := make(chan rtree.Range, 4) max := &struct { ms int @@ -540,7 +541,7 @@ func (bc *Client) fineGrainedBackup( zap.Binary("StartKey", resp.StartKey), zap.Binary("EndKey", resp.EndKey), ) - rangeTree.put(resp.StartKey, resp.EndKey, resp.Files) + rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files) // Update progress updateCh <- struct{}{} @@ -626,7 +627,7 @@ func onBackupResponse( func (bc *Client) handleFineGrained( ctx context.Context, bo *tikv.Backoffer, - rg Range, + rg rtree.Range, lastBackupTS uint64, backupTS uint64, rateLimit uint64, diff --git a/pkg/backup/push.go b/pkg/backup/push.go index 23c4f01d4..88a4b24e7 100644 --- a/pkg/backup/push.go +++ b/pkg/backup/push.go @@ -9,6 +9,8 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "go.uber.org/zap" + + "github.com/pingcap/br/pkg/utils/rtree" ) // pushDown warps a backup task. @@ -35,9 +37,9 @@ func (push *pushDown) pushBackup( req backup.BackupRequest, stores []*metapb.Store, updateCh chan<- struct{}, -) (RangeTree, error) { +) (rtree.RangeTree, error) { // Push down backup tasks to all tikv instances. - res := newRangeTree() + res := rtree.NewRangeTree() wg := new(sync.WaitGroup) for _, s := range stores { storeID := s.GetId() @@ -82,7 +84,7 @@ func (push *pushDown) pushBackup( } if resp.GetError() == nil { // None error means range has been backuped successfully. - res.put( + res.Put( resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles()) // Update progress diff --git a/pkg/backup/range.go b/pkg/backup/range.go new file mode 100644 index 000000000..fd568a47b --- /dev/null +++ b/pkg/backup/range.go @@ -0,0 +1,73 @@ +package backup + +import ( + "bytes" + "encoding/hex" + + "github.com/google/btree" + "github.com/pingcap/log" + "go.uber.org/zap" + + "github.com/pingcap/br/pkg/utils/rtree" +) + +func getIncompleteRange( + rangeTree *rtree.RangeTree, startKey, endKey []byte, +) []rtree.Range { + if len(startKey) != 0 && bytes.Equal(startKey, endKey) { + return []rtree.Range{} + } + incomplete := make([]rtree.Range, 0, 64) + requsetRange := rtree.Range{StartKey: startKey, EndKey: endKey} + lastEndKey := startKey + pviot := &rtree.Range{StartKey: startKey} + if first := rangeTree.Find(pviot); first != nil { + pviot.StartKey = first.StartKey + } + rangeTree.AscendGreaterOrEqual(pviot, func(i btree.Item) bool { + rg := i.(*rtree.Range) + if bytes.Compare(lastEndKey, rg.StartKey) < 0 { + start, end, isIntersect := + requsetRange.Intersect(lastEndKey, rg.StartKey) + if isIntersect { + // There is a gap between the last item and the current item. + incomplete = + append(incomplete, rtree.Range{StartKey: start, EndKey: end}) + } + } + lastEndKey = rg.EndKey + return len(endKey) == 0 || bytes.Compare(rg.EndKey, endKey) < 0 + }) + + // Check whether we need append the last range + if !bytes.Equal(lastEndKey, endKey) && len(lastEndKey) != 0 && + (len(endKey) == 0 || bytes.Compare(lastEndKey, endKey) < 0) { + start, end, isIntersect := requsetRange.Intersect(lastEndKey, endKey) + if isIntersect { + incomplete = + append(incomplete, rtree.Range{StartKey: start, EndKey: end}) + } + } + return incomplete +} + +func checkDupFiles(rangeTree *rtree.RangeTree) { + // Name -> SHA256 + files := make(map[string][]byte) + rangeTree.Ascend(func(i btree.Item) bool { + rg := i.(*rtree.Range) + for _, f := range rg.Files { + old, ok := files[f.Name] + if ok { + log.Error("dup file", + zap.String("Name", f.Name), + zap.String("SHA256_1", hex.EncodeToString(old)), + zap.String("SHA256_2", hex.EncodeToString(f.Sha256)), + ) + } else { + files[f.Name] = f.Sha256 + } + } + return true + }) +} diff --git a/pkg/backup/range_test.go b/pkg/backup/range_test.go new file mode 100644 index 000000000..b9b94a413 --- /dev/null +++ b/pkg/backup/range_test.go @@ -0,0 +1,148 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package backup + +import ( + "fmt" + "testing" + + . "github.com/pingcap/check" + + "github.com/pingcap/br/pkg/utils/rtree" +) + +var _ = Suite(&testRangeTreeSuite{}) + +type testRangeTreeSuite struct{} + +func newRange(start, end []byte) *rtree.Range { + return &rtree.Range{ + StartKey: start, + EndKey: end, + } +} + +func (s *testRangeTreeSuite) TestRangeTree(c *C) { + rangeTree := rtree.NewRangeTree() + c.Assert(rangeTree.Get(newRange([]byte(""), []byte(""))), IsNil) + + search := func(key []byte) *rtree.Range { + rg := rangeTree.Get(newRange(key, []byte(""))) + if rg == nil { + return nil + } + return rg.(*rtree.Range) + } + assertIncomplete := func(startKey, endKey []byte, ranges []rtree.Range) { + incomplete := getIncompleteRange(&rangeTree, startKey, endKey) + c.Logf("%#v %#v\n%#v\n%#v\n", startKey, endKey, incomplete, ranges) + c.Assert(len(incomplete), Equals, len(ranges)) + for idx, rg := range incomplete { + c.Assert(rg.StartKey, DeepEquals, ranges[idx].StartKey) + c.Assert(rg.EndKey, DeepEquals, ranges[idx].EndKey) + } + } + assertAllComplete := func() { + for s := 0; s < 0xfe; s++ { + for e := s + 1; e < 0xff; e++ { + start := []byte{byte(s)} + end := []byte{byte(e)} + assertIncomplete(start, end, []rtree.Range{}) + } + } + } + + range0 := newRange([]byte(""), []byte("a")) + rangeA := newRange([]byte("a"), []byte("b")) + rangeB := newRange([]byte("b"), []byte("c")) + rangeC := newRange([]byte("c"), []byte("d")) + rangeD := newRange([]byte("d"), []byte("")) + + rangeTree.Update(*rangeA) + c.Assert(rangeTree.Len(), Equals, 1) + assertIncomplete([]byte("a"), []byte("b"), []rtree.Range{}) + assertIncomplete([]byte(""), []byte(""), + []rtree.Range{ + {StartKey: []byte(""), EndKey: []byte("a")}, + {StartKey: []byte("b"), EndKey: []byte("")}, + }) + + rangeTree.Update(*rangeC) + c.Assert(rangeTree.Len(), Equals, 2) + assertIncomplete([]byte("a"), []byte("c"), []rtree.Range{ + {StartKey: []byte("b"), EndKey: []byte("c")}, + }) + assertIncomplete([]byte("b"), []byte("c"), []rtree.Range{ + {StartKey: []byte("b"), EndKey: []byte("c")}, + }) + assertIncomplete([]byte(""), []byte(""), + []rtree.Range{ + {StartKey: []byte(""), EndKey: []byte("a")}, + {StartKey: []byte("b"), EndKey: []byte("c")}, + {StartKey: []byte("d"), EndKey: []byte("")}, + }) + + c.Assert(search([]byte{}), IsNil) + c.Assert(search([]byte("a")), DeepEquals, rangeA) + c.Assert(search([]byte("b")), IsNil) + c.Assert(search([]byte("c")), DeepEquals, rangeC) + c.Assert(search([]byte("d")), IsNil) + + rangeTree.Update(*rangeB) + c.Assert(rangeTree.Len(), Equals, 3) + c.Assert(search([]byte("b")), DeepEquals, rangeB) + assertIncomplete([]byte(""), []byte(""), + []rtree.Range{ + {StartKey: []byte(""), EndKey: []byte("a")}, + {StartKey: []byte("d"), EndKey: []byte("")}, + }) + + rangeTree.Update(*rangeD) + c.Assert(rangeTree.Len(), Equals, 4) + c.Assert(search([]byte("d")), DeepEquals, rangeD) + assertIncomplete([]byte(""), []byte(""), []rtree.Range{ + {StartKey: []byte(""), EndKey: []byte("a")}, + }) + + // None incomplete for any range after insert range 0 + rangeTree.Update(*range0) + c.Assert(rangeTree.Len(), Equals, 5) + + // Overwrite range B and C. + rangeBD := newRange([]byte("b"), []byte("d")) + rangeTree.Update(*rangeBD) + c.Assert(rangeTree.Len(), Equals, 4) + assertAllComplete() + + // Overwrite range BD, c-d should be empty + rangeTree.Update(*rangeB) + c.Assert(rangeTree.Len(), Equals, 4) + assertIncomplete([]byte(""), []byte(""), []rtree.Range{ + {StartKey: []byte("c"), EndKey: []byte("d")}, + }) + + rangeTree.Update(*rangeC) + c.Assert(rangeTree.Len(), Equals, 5) + assertAllComplete() +} + +func BenchmarkRangeTreeUpdate(b *testing.B) { + rangeTree := rtree.NewRangeTree() + for i := 0; i < b.N; i++ { + item := &rtree.Range{ + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1))} + rangeTree.Update(*item) + } +} diff --git a/pkg/backup/range_tree.go b/pkg/backup/range_tree.go deleted file mode 100644 index 4d4b3c695..000000000 --- a/pkg/backup/range_tree.go +++ /dev/null @@ -1,207 +0,0 @@ -package backup - -import ( - "bytes" - "encoding/hex" - - "github.com/google/btree" - "github.com/pingcap/kvproto/pkg/backup" - "github.com/pingcap/log" - "go.uber.org/zap" -) - -// Range represents a backup response. -type Range struct { - StartKey []byte - EndKey []byte - Files []*backup.File - Error *backup.Error -} - -func (rg *Range) intersect( - start, end []byte, -) (subStart, subEnd []byte, isIntersect bool) { - // empty mean the max end key - if len(rg.EndKey) != 0 && bytes.Compare(start, rg.EndKey) >= 0 { - isIntersect = false - return - } - if len(end) != 0 && bytes.Compare(end, rg.StartKey) <= 0 { - isIntersect = false - return - } - isIntersect = true - if bytes.Compare(start, rg.StartKey) >= 0 { - subStart = start - } else { - subStart = rg.StartKey - } - switch { - case len(end) == 0: - subEnd = rg.EndKey - case len(rg.EndKey) == 0: - subEnd = end - case bytes.Compare(end, rg.EndKey) < 0: - subEnd = end - default: - subEnd = rg.EndKey - } - return -} - -// contains check if the range contains the given key, [start, end) -func (rg *Range) contains(key []byte) bool { - start, end := rg.StartKey, rg.EndKey - return bytes.Compare(key, start) >= 0 && - (len(end) == 0 || bytes.Compare(key, end) < 0) -} - -// Less impls btree.Item -func (rg *Range) Less(than btree.Item) bool { - // rg.StartKey < than.StartKey - ta := than.(*Range) - return bytes.Compare(rg.StartKey, ta.StartKey) < 0 -} - -var _ btree.Item = &Range{} - -// RangeTree is the result of a backup task -type RangeTree struct { - tree *btree.BTree -} - -func newRangeTree() RangeTree { - return RangeTree{ - tree: btree.New(32), - } -} - -func (rangeTree *RangeTree) len() int { - return rangeTree.tree.Len() -} - -// find is a helper function to find an item that contains the range start -// key. -func (rangeTree *RangeTree) find(rg *Range) *Range { - var ret *Range - rangeTree.tree.DescendLessOrEqual(rg, func(i btree.Item) bool { - ret = i.(*Range) - return false - }) - - if ret == nil || !ret.contains(rg.StartKey) { - return nil - } - - return ret -} - -// getOverlaps gets the ranges which are overlapped with the specified range range. -func (rangeTree *RangeTree) getOverlaps(rg *Range) []*Range { - // note that find() gets the last item that is less or equal than the range. - // in the case: |_______a_______|_____b_____|___c___| - // new range is |______d______| - // find() will return Range of range_a - // and both startKey of range_a and range_b are less than endKey of range_d, - // thus they are regarded as overlapped ranges. - found := rangeTree.find(rg) - if found == nil { - found = rg - } - - var overlaps []*Range - rangeTree.tree.AscendGreaterOrEqual(found, func(i btree.Item) bool { - over := i.(*Range) - if len(rg.EndKey) > 0 && bytes.Compare(rg.EndKey, over.StartKey) <= 0 { - return false - } - overlaps = append(overlaps, over) - return true - }) - return overlaps -} - -func (rangeTree *RangeTree) update(rg *Range) { - overlaps := rangeTree.getOverlaps(rg) - // Range has backuped, overwrite overlapping range. - for _, item := range overlaps { - log.Info("delete overlapping range", - zap.Binary("StartKey", item.StartKey), - zap.Binary("EndKey", item.EndKey), - ) - rangeTree.tree.Delete(item) - } - rangeTree.tree.ReplaceOrInsert(rg) -} - -func (rangeTree *RangeTree) put( - startKey, endKey []byte, files []*backup.File, -) { - rg := &Range{ - StartKey: startKey, - EndKey: endKey, - Files: files, - } - rangeTree.update(rg) -} - -func (rangeTree *RangeTree) getIncompleteRange( - startKey, endKey []byte, -) []Range { - if len(startKey) != 0 && bytes.Equal(startKey, endKey) { - return []Range{} - } - incomplete := make([]Range, 0, 64) - requsetRange := Range{StartKey: startKey, EndKey: endKey} - lastEndKey := startKey - pviot := &Range{StartKey: startKey} - if first := rangeTree.find(pviot); first != nil { - pviot.StartKey = first.StartKey - } - rangeTree.tree.AscendGreaterOrEqual(pviot, func(i btree.Item) bool { - rg := i.(*Range) - if bytes.Compare(lastEndKey, rg.StartKey) < 0 { - start, end, isIntersect := - requsetRange.intersect(lastEndKey, rg.StartKey) - if isIntersect { - // There is a gap between the last item and the current item. - incomplete = - append(incomplete, Range{StartKey: start, EndKey: end}) - } - } - lastEndKey = rg.EndKey - return len(endKey) == 0 || bytes.Compare(rg.EndKey, endKey) < 0 - }) - - // Check whether we need append the last range - if !bytes.Equal(lastEndKey, endKey) && len(lastEndKey) != 0 && - (len(endKey) == 0 || bytes.Compare(lastEndKey, endKey) < 0) { - start, end, isIntersect := requsetRange.intersect(lastEndKey, endKey) - if isIntersect { - incomplete = - append(incomplete, Range{StartKey: start, EndKey: end}) - } - } - return incomplete -} - -func (rangeTree *RangeTree) checkDupFiles() { - // Name -> SHA256 - files := make(map[string][]byte) - rangeTree.tree.Ascend(func(i btree.Item) bool { - rg := i.(*Range) - for _, f := range rg.Files { - old, ok := files[f.Name] - if ok { - log.Error("dup file", - zap.String("Name", f.Name), - zap.String("SHA256_1", hex.EncodeToString(old)), - zap.String("SHA256_2", hex.EncodeToString(f.Sha256)), - ) - } else { - files[f.Name] = f.Sha256 - } - } - return true - }) -} diff --git a/pkg/backup/range_tree_test.go b/pkg/backup/range_tree_test.go deleted file mode 100644 index a7c2d1cd1..000000000 --- a/pkg/backup/range_tree_test.go +++ /dev/null @@ -1,190 +0,0 @@ -// Copyright 2016 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package backup - -import ( - "fmt" - "testing" - - . "github.com/pingcap/check" -) - -var _ = Suite(&testRangeTreeSuite{}) - -type testRangeTreeSuite struct{} - -func newRange(start, end []byte) *Range { - return &Range{ - StartKey: start, - EndKey: end, - } -} - -func (s *testRangeTreeSuite) TestRangeIntersect(c *C) { - rg := newRange([]byte("a"), []byte("c")) - - start, end, isIntersect := rg.intersect([]byte(""), []byte("")) - c.Assert(isIntersect, Equals, true) - c.Assert(start, DeepEquals, []byte("a")) - c.Assert(end, DeepEquals, []byte("c")) - - start, end, isIntersect = rg.intersect([]byte(""), []byte("a")) - c.Assert(isIntersect, Equals, false) - c.Assert(start, DeepEquals, []byte(nil)) - c.Assert(end, DeepEquals, []byte(nil)) - - start, end, isIntersect = rg.intersect([]byte(""), []byte("b")) - c.Assert(isIntersect, Equals, true) - c.Assert(start, DeepEquals, []byte("a")) - c.Assert(end, DeepEquals, []byte("b")) - - start, end, isIntersect = rg.intersect([]byte("a"), []byte("b")) - c.Assert(isIntersect, Equals, true) - c.Assert(start, DeepEquals, []byte("a")) - c.Assert(end, DeepEquals, []byte("b")) - - start, end, isIntersect = rg.intersect([]byte("aa"), []byte("b")) - c.Assert(isIntersect, Equals, true) - c.Assert(start, DeepEquals, []byte("aa")) - c.Assert(end, DeepEquals, []byte("b")) - - start, end, isIntersect = rg.intersect([]byte("b"), []byte("c")) - c.Assert(isIntersect, Equals, true) - c.Assert(start, DeepEquals, []byte("b")) - c.Assert(end, DeepEquals, []byte("c")) - - start, end, isIntersect = rg.intersect([]byte(""), []byte{1}) - c.Assert(isIntersect, Equals, false) - c.Assert(start, DeepEquals, []byte(nil)) - c.Assert(end, DeepEquals, []byte(nil)) - - start, end, isIntersect = rg.intersect([]byte("c"), []byte("")) - c.Assert(isIntersect, Equals, false) - c.Assert(start, DeepEquals, []byte(nil)) - c.Assert(end, DeepEquals, []byte(nil)) -} - -func (s *testRangeTreeSuite) TestRangeTree(c *C) { - rangeTree := newRangeTree() - c.Assert(rangeTree.tree.Get(newRange([]byte(""), []byte(""))), IsNil) - - search := func(key []byte) *Range { - rg := rangeTree.tree.Get(newRange(key, []byte(""))) - if rg == nil { - return nil - } - return rg.(*Range) - } - assertIncomplete := func(startKey, endKey []byte, ranges []Range) { - incomplete := rangeTree.getIncompleteRange(startKey, endKey) - c.Logf("%#v %#v\n%#v\n%#v\n", startKey, endKey, incomplete, ranges) - c.Assert(len(incomplete), Equals, len(ranges)) - for idx, rg := range incomplete { - c.Assert(rg.StartKey, DeepEquals, ranges[idx].StartKey) - c.Assert(rg.EndKey, DeepEquals, ranges[idx].EndKey) - } - } - assertAllComplete := func() { - for s := 0; s < 0xfe; s++ { - for e := s + 1; e < 0xff; e++ { - start := []byte{byte(s)} - end := []byte{byte(e)} - assertIncomplete(start, end, []Range{}) - } - } - } - - range0 := newRange([]byte(""), []byte("a")) - rangeA := newRange([]byte("a"), []byte("b")) - rangeB := newRange([]byte("b"), []byte("c")) - rangeC := newRange([]byte("c"), []byte("d")) - rangeD := newRange([]byte("d"), []byte("")) - - rangeTree.update(rangeA) - c.Assert(rangeTree.len(), Equals, 1) - assertIncomplete([]byte("a"), []byte("b"), []Range{}) - assertIncomplete([]byte(""), []byte(""), - []Range{ - {StartKey: []byte(""), EndKey: []byte("a")}, - {StartKey: []byte("b"), EndKey: []byte("")}, - }) - - rangeTree.update(rangeC) - c.Assert(rangeTree.len(), Equals, 2) - assertIncomplete([]byte("a"), []byte("c"), []Range{ - {StartKey: []byte("b"), EndKey: []byte("c")}, - }) - assertIncomplete([]byte("b"), []byte("c"), []Range{ - {StartKey: []byte("b"), EndKey: []byte("c")}, - }) - assertIncomplete([]byte(""), []byte(""), - []Range{ - {StartKey: []byte(""), EndKey: []byte("a")}, - {StartKey: []byte("b"), EndKey: []byte("c")}, - {StartKey: []byte("d"), EndKey: []byte("")}, - }) - - c.Assert(search([]byte{}), IsNil) - c.Assert(search([]byte("a")), Equals, rangeA) - c.Assert(search([]byte("b")), IsNil) - c.Assert(search([]byte("c")), Equals, rangeC) - c.Assert(search([]byte("d")), IsNil) - - rangeTree.update(rangeB) - c.Assert(rangeTree.len(), Equals, 3) - c.Assert(search([]byte("b")), Equals, rangeB) - assertIncomplete([]byte(""), []byte(""), - []Range{ - {StartKey: []byte(""), EndKey: []byte("a")}, - {StartKey: []byte("d"), EndKey: []byte("")}, - }) - - rangeTree.update(rangeD) - c.Assert(rangeTree.len(), Equals, 4) - c.Assert(search([]byte("d")), Equals, rangeD) - assertIncomplete([]byte(""), []byte(""), []Range{ - {StartKey: []byte(""), EndKey: []byte("a")}, - }) - - // None incomplete for any range after insert range 0 - rangeTree.update(range0) - c.Assert(rangeTree.len(), Equals, 5) - - // Overwrite range B and C. - rangeBD := newRange([]byte("b"), []byte("d")) - rangeTree.update(rangeBD) - c.Assert(rangeTree.len(), Equals, 4) - assertAllComplete() - - // Overwrite range BD, c-d should be empty - rangeTree.update(rangeB) - c.Assert(rangeTree.len(), Equals, 4) - assertIncomplete([]byte(""), []byte(""), []Range{ - {StartKey: []byte("c"), EndKey: []byte("d")}, - }) - - rangeTree.update(rangeC) - c.Assert(rangeTree.len(), Equals, 5) - assertAllComplete() -} - -func BenchmarkRangeTreeUpdate(b *testing.B) { - rangeTree := newRangeTree() - for i := 0; i < b.N; i++ { - item := &Range{ - StartKey: []byte(fmt.Sprintf("%20d", i)), - EndKey: []byte(fmt.Sprintf("%20d", i+1))} - rangeTree.update(item) - } -} diff --git a/pkg/restore/range.go b/pkg/restore/range.go index f3914539e..debad213d 100644 --- a/pkg/restore/range.go +++ b/pkg/restore/range.go @@ -1,9 +1,6 @@ package restore import ( - "bytes" - "fmt" - "github.com/google/btree" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/import_sstpb" @@ -11,35 +8,13 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/tablecodec" "go.uber.org/zap" -) - -// Range represents a range of keys. -type Range struct { - StartKey []byte - EndKey []byte -} -// String formats a range to a string -func (r *Range) String() string { - return fmt.Sprintf("[%x %x]", r.StartKey, r.EndKey) -} - -// Less compares a range with a btree.Item -func (r *Range) Less(than btree.Item) bool { - t := than.(*Range) - return len(r.EndKey) != 0 && bytes.Compare(r.EndKey, t.StartKey) <= 0 -} - -// contains returns if a key is included in the range. -func (r *Range) contains(key []byte) bool { - start, end := r.StartKey, r.EndKey - return bytes.Compare(key, start) >= 0 && - (len(end) == 0 || bytes.Compare(key, end) < 0) -} + "github.com/pingcap/br/pkg/utils/rtree" +) // sortRanges checks if the range overlapped and sort them -func sortRanges(ranges []Range, rewriteRules *RewriteRules) ([]Range, error) { - rangeTree := NewRangeTree() +func sortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range, error) { + rangeTree := rtree.NewRangeTree() for _, rg := range ranges { if rewriteRules != nil { startID := tablecodec.DecodeTableID(rg.StartKey) @@ -77,64 +52,17 @@ func sortRanges(ranges []Range, rewriteRules *RewriteRules) ([]Range, error) { return nil, errors.Errorf("ranges overlapped: %s, %s", out, rg) } } - sortedRanges := make([]Range, 0, len(ranges)) - rangeTree.Ascend(func(rg *Range) bool { + sortedRanges := make([]rtree.Range, 0, len(ranges)) + rangeTree.Ascend(func(rg btree.Item) bool { if rg == nil { return false } - sortedRanges = append(sortedRanges, *rg) + sortedRanges = append(sortedRanges, *rg.(*rtree.Range)) return true }) return sortedRanges, nil } -// RangeTree stores the ranges in an orderly manner. -// All the ranges it stored do not overlap. -type RangeTree struct { - tree *btree.BTree -} - -// NewRangeTree returns a new RangeTree. -func NewRangeTree() *RangeTree { - return &RangeTree{tree: btree.New(32)} -} - -// Find returns nil or a range in the range tree -func (rt *RangeTree) Find(key []byte) *Range { - var ret *Range - r := &Range{ - StartKey: key, - } - rt.tree.DescendLessOrEqual(r, func(i btree.Item) bool { - ret = i.(*Range) - return false - }) - if ret == nil || !ret.contains(key) { - return nil - } - return ret -} - -// InsertRange inserts ranges into the range tree. -// it returns true if all ranges inserted successfully. -// it returns false if there are some overlapped ranges. -func (rt *RangeTree) InsertRange(rg Range) btree.Item { - return rt.tree.ReplaceOrInsert(&rg) -} - -// RangeIterator allows callers of Ascend to iterate in-order over portions of -// the tree. When this function returns false, iteration will stop and the -// associated Ascend function will immediately return. -type RangeIterator func(rg *Range) bool - -// Ascend calls the iterator for every value in the tree within [first, last], -// until the iterator returns false. -func (rt *RangeTree) Ascend(iterator RangeIterator) { - rt.tree.Ascend(func(i btree.Item) bool { - return iterator(i.(*Range)) - }) -} - // RegionInfo includes a region and the leader of the region. type RegionInfo struct { Region *metapb.Region diff --git a/pkg/restore/range_test.go b/pkg/restore/range_test.go index a9edc5b82..84e8e6ecd 100644 --- a/pkg/restore/range_test.go +++ b/pkg/restore/range_test.go @@ -6,6 +6,8 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/tidb/tablecodec" + + "github.com/pingcap/br/pkg/utils/rtree" ) type testRangeSuite struct{} @@ -21,8 +23,8 @@ var RangeEquals Checker = &rangeEquals{ } func (checker *rangeEquals) Check(params []interface{}, names []string) (result bool, error string) { - obtained := params[0].([]Range) - expected := params[1].([]Range) + obtained := params[0].([]rtree.Range) + expected := params[1].([]rtree.Range) if len(obtained) != len(expected) { return false, "" } @@ -44,20 +46,20 @@ func (s *testRangeSuite) TestSortRange(c *C) { Table: make([]*import_sstpb.RewriteRule, 0), Data: dataRules, } - ranges1 := []Range{ + ranges1 := []rtree.Range{ {append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...), - append(tablecodec.GenTableRecordPrefix(1), []byte("bbb")...)}, + append(tablecodec.GenTableRecordPrefix(1), []byte("bbb")...), nil}, } rs1, err := sortRanges(ranges1, rewriteRules) c.Assert(err, IsNil, Commentf("sort range1 failed: %v", err)) - c.Assert(rs1, RangeEquals, []Range{ + c.Assert(rs1, RangeEquals, []rtree.Range{ {append(tablecodec.GenTableRecordPrefix(4), []byte("aaa")...), - append(tablecodec.GenTableRecordPrefix(4), []byte("bbb")...)}, + append(tablecodec.GenTableRecordPrefix(4), []byte("bbb")...), nil}, }) - ranges2 := []Range{ + ranges2 := []rtree.Range{ {append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...), - append(tablecodec.GenTableRecordPrefix(2), []byte("bbb")...)}, + append(tablecodec.GenTableRecordPrefix(2), []byte("bbb")...), nil}, } _, err = sortRanges(ranges2, rewriteRules) c.Assert(err, ErrorMatches, ".*table id does not match.*") @@ -66,10 +68,10 @@ func (s *testRangeSuite) TestSortRange(c *C) { rewriteRules1 := initRewriteRules() rs3, err := sortRanges(ranges3, rewriteRules1) c.Assert(err, IsNil, Commentf("sort range1 failed: %v", err)) - c.Assert(rs3, RangeEquals, []Range{ - {[]byte("bbd"), []byte("bbf")}, - {[]byte("bbf"), []byte("bbj")}, - {[]byte("xxa"), []byte("xxe")}, - {[]byte("xxe"), []byte("xxz")}, + c.Assert(rs3, RangeEquals, []rtree.Range{ + {[]byte("bbd"), []byte("bbf"), nil}, + {[]byte("bbf"), []byte("bbj"), nil}, + {[]byte("xxa"), []byte("xxe"), nil}, + {[]byte("xxe"), []byte("xxz"), nil}, }) } diff --git a/pkg/restore/split.go b/pkg/restore/split.go index 378e256c6..e8c2ed523 100644 --- a/pkg/restore/split.go +++ b/pkg/restore/split.go @@ -13,6 +13,8 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" + + "github.com/pingcap/br/pkg/utils/rtree" ) // Constants for split retry machinery. @@ -54,7 +56,7 @@ type OnSplitFunc func(key [][]byte) // note: all ranges and rewrite rules must have raw key. func (rs *RegionSplitter) Split( ctx context.Context, - ranges []Range, + ranges []rtree.Range, rewriteRules *RewriteRules, onSplit OnSplitFunc, ) error { @@ -253,7 +255,7 @@ func (rs *RegionSplitter) splitAndScatterRegions( // getSplitKeys checks if the regions should be split by the new prefix of the rewrites rule and the end key of // the ranges, groups the split keys by region id -func getSplitKeys(rewriteRules *RewriteRules, ranges []Range, regions []*RegionInfo) map[uint64][][]byte { +func getSplitKeys(rewriteRules *RewriteRules, ranges []rtree.Range, regions []*RegionInfo) map[uint64][][]byte { splitKeyMap := make(map[uint64][][]byte) checkKeys := make([][]byte, 0) for _, rule := range rewriteRules.Table { diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go index 3ace5b8c8..06fcd957a 100644 --- a/pkg/restore/split_test.go +++ b/pkg/restore/split_test.go @@ -12,6 +12,8 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/server/schedule/placement" "github.com/pingcap/tidb/util/codec" + + "github.com/pingcap/br/pkg/utils/rtree" ) type testClient struct { @@ -234,21 +236,21 @@ func initTestClient() *testClient { } // range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj) -func initRanges() []Range { - var ranges [4]Range - ranges[0] = Range{ +func initRanges() []rtree.Range { + var ranges [4]rtree.Range + ranges[0] = rtree.Range{ StartKey: []byte("aaa"), EndKey: []byte("aae"), } - ranges[1] = Range{ + ranges[1] = rtree.Range{ StartKey: []byte("aae"), EndKey: []byte("aaz"), } - ranges[2] = Range{ + ranges[2] = rtree.Range{ StartKey: []byte("ccd"), EndKey: []byte("ccf"), } - ranges[3] = Range{ + ranges[3] = rtree.Range{ StartKey: []byte("ccf"), EndKey: []byte("ccj"), } diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 64ccfab19..56850703c 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -19,6 +19,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/summary" + "github.com/pingcap/br/pkg/utils/rtree" ) var recordPrefixSep = []byte("_r") @@ -153,8 +154,8 @@ func getSSTMetaFromFile( func ValidateFileRanges( files []*backup.File, rewriteRules *RewriteRules, -) ([]Range, error) { - ranges := make([]Range, 0, len(files)) +) ([]rtree.Range, error) { + ranges := make([]rtree.Range, 0, len(files)) fileAppended := make(map[string]bool) for _, file := range files { @@ -173,7 +174,7 @@ func ValidateFileRanges( zap.Stringer("file", file)) return nil, errors.New("table ids dont match") } - ranges = append(ranges, Range{ + ranges = append(ranges, rtree.Range{ StartKey: file.GetStartKey(), EndKey: file.GetEndKey(), }) @@ -275,7 +276,7 @@ func truncateTS(key []byte) []byte { func SplitRanges( ctx context.Context, client *Client, - ranges []Range, + ranges []rtree.Range, rewriteRules *RewriteRules, updateCh chan<- struct{}, ) error { diff --git a/pkg/task/restore.go b/pkg/task/restore.go index bb00d189d..bc468acc6 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -131,6 +131,11 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf int64(len(ranges)+len(files)), !cfg.LogProgress) + removedSchedulers, err := restorePreWork(ctx, client, mgr) + if err != nil { + return err + } + err = restore.SplitRanges(ctx, client, ranges, rewriteRules, updateCh) if err != nil { log.Error("split regions failed", zap.Error(err)) @@ -144,10 +149,6 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } } - removedSchedulers, err := restorePreWork(ctx, client, mgr) - if err != nil { - return err - } err = client.RestoreFiles(files, rewriteRules, updateCh) // always run the post-work even on error, so we don't stuck in the import mode or emptied schedulers postErr := restorePostWork(ctx, client, mgr, removedSchedulers) diff --git a/pkg/utils/rtree/rtree.go b/pkg/utils/rtree/rtree.go new file mode 100644 index 000000000..d481d9c1a --- /dev/null +++ b/pkg/utils/rtree/rtree.go @@ -0,0 +1,177 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package rtree + +import ( + "bytes" + "fmt" + + "github.com/google/btree" + "github.com/pingcap/kvproto/pkg/backup" + "github.com/pingcap/log" + "go.uber.org/zap" +) + +// Range represents a backup response. +type Range struct { + StartKey []byte + EndKey []byte + Files []*backup.File + // Addition data attached in Range + // Attachment interface{} +} + +// String formats a range to a string +func (rg *Range) String() string { + return fmt.Sprintf("[%x %x]", rg.StartKey, rg.EndKey) +} + +// Intersect returns +func (rg *Range) Intersect( + start, end []byte, +) (subStart, subEnd []byte, isIntersect bool) { + // empty mean the max end key + if len(rg.EndKey) != 0 && bytes.Compare(start, rg.EndKey) >= 0 { + isIntersect = false + return + } + if len(end) != 0 && bytes.Compare(end, rg.StartKey) <= 0 { + isIntersect = false + return + } + isIntersect = true + if bytes.Compare(start, rg.StartKey) >= 0 { + subStart = start + } else { + subStart = rg.StartKey + } + switch { + case len(end) == 0: + subEnd = rg.EndKey + case len(rg.EndKey) == 0: + subEnd = end + case bytes.Compare(end, rg.EndKey) < 0: + subEnd = end + default: + subEnd = rg.EndKey + } + return +} + +// Contains check if the range contains the given key, [start, end) +func (rg *Range) Contains(key []byte) bool { + start, end := rg.StartKey, rg.EndKey + return bytes.Compare(key, start) >= 0 && + (len(end) == 0 || bytes.Compare(key, end) < 0) +} + +// Less impls btree.Item +func (rg *Range) Less(than btree.Item) bool { + // rg.StartKey < than.StartKey + ta := than.(*Range) + return bytes.Compare(rg.StartKey, ta.StartKey) < 0 +} + +var _ btree.Item = &Range{} + +// RangeTree is sorted tree for Ranges. +// All the ranges it stored do not overlap. +type RangeTree struct { + *btree.BTree +} + +// NewRangeTree returns an empty range tree. +func NewRangeTree() RangeTree { + return RangeTree{ + BTree: btree.New(32), + } +} + +// Find is a helper function to find an item that contains the range start +// key. +func (rangeTree *RangeTree) Find(rg *Range) *Range { + var ret *Range + rangeTree.DescendLessOrEqual(rg, func(i btree.Item) bool { + ret = i.(*Range) + return false + }) + + if ret == nil || !ret.Contains(rg.StartKey) { + return nil + } + + return ret +} + +// getOverlaps gets the ranges which are overlapped with the specified range range. +func (rangeTree *RangeTree) getOverlaps(rg *Range) []*Range { + // note that find() gets the last item that is less or equal than the range. + // in the case: |_______a_______|_____b_____|___c___| + // new range is |______d______| + // find() will return Range of range_a + // and both startKey of range_a and range_b are less than endKey of range_d, + // thus they are regarded as overlapped ranges. + found := rangeTree.Find(rg) + if found == nil { + found = rg + } + + var overlaps []*Range + rangeTree.AscendGreaterOrEqual(found, func(i btree.Item) bool { + over := i.(*Range) + if len(rg.EndKey) > 0 && bytes.Compare(rg.EndKey, over.StartKey) <= 0 { + return false + } + overlaps = append(overlaps, over) + return true + }) + return overlaps +} + +// Update inserts range into tree and delete overlapping ranges. +func (rangeTree *RangeTree) Update(rg Range) { + overlaps := rangeTree.getOverlaps(&rg) + // Range has backuped, overwrite overlapping range. + for _, item := range overlaps { + log.Info("delete overlapping range", + zap.Binary("StartKey", item.StartKey), + zap.Binary("EndKey", item.EndKey), + ) + rangeTree.Delete(item) + } + rangeTree.ReplaceOrInsert(&rg) +} + +// Put forms a range and inserts it into tree. +func (rangeTree *RangeTree) Put( + startKey, endKey []byte, files []*backup.File, +) { + rg := Range{ + StartKey: startKey, + EndKey: endKey, + Files: files, + } + rangeTree.Update(rg) +} + +// InsertRange inserts ranges into the range tree. +// it returns true if all ranges inserted successfully. +// it returns false if there are some overlapped ranges. +func (rangeTree *RangeTree) InsertRange(rg Range) *Range { + out := rangeTree.ReplaceOrInsert(&rg) + if out == nil { + return nil + } + return out.(*Range) +} diff --git a/pkg/utils/rtree/rtree_test.go b/pkg/utils/rtree/rtree_test.go new file mode 100644 index 000000000..49932216e --- /dev/null +++ b/pkg/utils/rtree/rtree_test.go @@ -0,0 +1,86 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package rtree + +import ( + "fmt" + "testing" + + . "github.com/pingcap/check" +) + +var _ = Suite(&testRangeTreeSuite{}) + +type testRangeTreeSuite struct{} + +func newRange(start, end []byte) *Range { + return &Range{ + StartKey: start, + EndKey: end, + } +} + +func (s *testRangeTreeSuite) TestRangeIntersect(c *C) { + rg := newRange([]byte("a"), []byte("c")) + + start, end, isIntersect := rg.Intersect([]byte(""), []byte("")) + c.Assert(isIntersect, Equals, true) + c.Assert(start, DeepEquals, []byte("a")) + c.Assert(end, DeepEquals, []byte("c")) + + start, end, isIntersect = rg.Intersect([]byte(""), []byte("a")) + c.Assert(isIntersect, Equals, false) + c.Assert(start, DeepEquals, []byte(nil)) + c.Assert(end, DeepEquals, []byte(nil)) + + start, end, isIntersect = rg.Intersect([]byte(""), []byte("b")) + c.Assert(isIntersect, Equals, true) + c.Assert(start, DeepEquals, []byte("a")) + c.Assert(end, DeepEquals, []byte("b")) + + start, end, isIntersect = rg.Intersect([]byte("a"), []byte("b")) + c.Assert(isIntersect, Equals, true) + c.Assert(start, DeepEquals, []byte("a")) + c.Assert(end, DeepEquals, []byte("b")) + + start, end, isIntersect = rg.Intersect([]byte("aa"), []byte("b")) + c.Assert(isIntersect, Equals, true) + c.Assert(start, DeepEquals, []byte("aa")) + c.Assert(end, DeepEquals, []byte("b")) + + start, end, isIntersect = rg.Intersect([]byte("b"), []byte("c")) + c.Assert(isIntersect, Equals, true) + c.Assert(start, DeepEquals, []byte("b")) + c.Assert(end, DeepEquals, []byte("c")) + + start, end, isIntersect = rg.Intersect([]byte(""), []byte{1}) + c.Assert(isIntersect, Equals, false) + c.Assert(start, DeepEquals, []byte(nil)) + c.Assert(end, DeepEquals, []byte(nil)) + + start, end, isIntersect = rg.Intersect([]byte("c"), []byte("")) + c.Assert(isIntersect, Equals, false) + c.Assert(start, DeepEquals, []byte(nil)) + c.Assert(end, DeepEquals, []byte(nil)) +} + +func BenchmarkRangeTreeUpdate(b *testing.B) { + rangeTree := NewRangeTree() + for i := 0; i < b.N; i++ { + item := Range{ + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1))} + rangeTree.Update(item) + } +} From 8a9c80f8cc34e6e9839d7110de622664cf714f72 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Sat, 22 Feb 2020 20:00:46 +0800 Subject: [PATCH 2/7] restore: split restore files into small batch Signed-off-by: Neil Shen --- pkg/restore/range.go | 10 +------- pkg/restore/split_test.go | 24 +++++++++++-------- pkg/restore/util.go | 37 +++++++++++++++++++++++++++++ pkg/task/restore.go | 50 +++++++++++++++++++++++++++++---------- pkg/utils/rtree/rtree.go | 16 +++++++++++-- 5 files changed, 103 insertions(+), 34 deletions(-) diff --git a/pkg/restore/range.go b/pkg/restore/range.go index debad213d..fce3780be 100644 --- a/pkg/restore/range.go +++ b/pkg/restore/range.go @@ -1,7 +1,6 @@ package restore import ( - "github.com/google/btree" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" @@ -52,14 +51,7 @@ func sortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range return nil, errors.Errorf("ranges overlapped: %s, %s", out, rg) } } - sortedRanges := make([]rtree.Range, 0, len(ranges)) - rangeTree.Ascend(func(rg btree.Item) bool { - if rg == nil { - return false - } - sortedRanges = append(sortedRanges, *rg.(*rtree.Range)) - return true - }) + sortedRanges := rangeTree.GetSortedRanges() return sortedRanges, nil } diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go index 06fcd957a..313f971d7 100644 --- a/pkg/restore/split_test.go +++ b/pkg/restore/split_test.go @@ -10,6 +10,7 @@ import ( "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule/placement" "github.com/pingcap/tidb/util/codec" @@ -20,13 +21,19 @@ type testClient struct { mu sync.RWMutex stores map[uint64]*metapb.Store regions map[uint64]*RegionInfo + regionsInfo *core.RegionsInfo nextRegionID uint64 } func newTestClient(stores map[uint64]*metapb.Store, regions map[uint64]*RegionInfo, nextRegionID uint64) *testClient { + regionsInfo := core.NewRegionsInfo() + for _, regionInfo := range regions { + regionsInfo.AddRegion(core.NewRegionInfo(regionInfo.Region, regionInfo.Leader)) + } return &testClient{ stores: stores, regions: regions, + regionsInfo: regionsInfo, nextRegionID: nextRegionID, } } @@ -144,16 +151,13 @@ func (c *testClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.Ge } func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) { - regions := make([]*RegionInfo, 0) - for _, region := range c.regions { - if limit > 0 && len(regions) >= limit { - break - } - if (len(region.Region.GetEndKey()) != 0 && bytes.Compare(region.Region.GetEndKey(), key) <= 0) || - bytes.Compare(region.Region.GetStartKey(), endKey) > 0 { - continue - } - regions = append(regions, region) + infos := c.regionsInfo.ScanRange(key, endKey, limit) + regions := make([]*RegionInfo, 0, len(infos)) + for _, info := range infos { + regions = append(regions, &RegionInfo{ + Region: info.GetMeta(), + Leader: info.GetLeader(), + }) } return regions, nil } diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 56850703c..cffaee409 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -184,6 +184,39 @@ func ValidateFileRanges( return ranges, nil } +// AttachFilesToRanges attach files to ranges. +// Panic if range is overlapped or no range for files. +func AttachFilesToRanges( + files []*backup.File, + ranges []rtree.Range, +) []rtree.Range { + rangeTree := rtree.NewRangeTree() + for _, rg := range ranges { + rangeTree.Update(rg) + } + for _, f := range files { + + rg := rangeTree.Find(&rtree.Range{ + StartKey: f.GetStartKey(), + EndKey: f.GetEndKey(), + }) + if rg == nil { + log.Fatal("range not found", + zap.Binary("startKey", f.GetStartKey()), + zap.Binary("endKey", f.GetEndKey())) + } + file := *f + rg.Files = append(rg.Files, &file) + } + if rangeTree.Len() != len(ranges) { + log.Fatal("ranges overlapped", + zap.Int("ranges length", len(ranges)), + zap.Int("tree length", rangeTree.Len())) + } + sortedRanges := rangeTree.GetSortedRanges() + return sortedRanges +} + // ValidateFileRewriteRule uses rewrite rules to validate the ranges of a file func ValidateFileRewriteRule(file *backup.File, rewriteRules *RewriteRules) error { // Check if the start key has a matched rewrite key @@ -300,6 +333,10 @@ func rewriteFileKeys(file *backup.File, rewriteRules *RewriteRules) (startKey, e if startID == endID { startKey, rule = rewriteRawKey(file.GetStartKey(), rewriteRules) if rewriteRules != nil && rule == nil { + log.Error("cannot find rewrite rule", + zap.Binary("startKey", file.GetStartKey()), + zap.Reflect("rewrite table", rewriteRules.Table), + zap.Reflect("rewrite data", rewriteRules.Data)) err = errors.New("cannot find rewrite rule for start key") return } diff --git a/pkg/task/restore.go b/pkg/task/restore.go index bc468acc6..3ae5c96cc 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -15,6 +15,7 @@ import ( "github.com/pingcap/br/pkg/restore" "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" + "github.com/pingcap/br/pkg/utils/rtree" ) const ( @@ -123,6 +124,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } summary.CollectInt("restore ranges", len(ranges)) + ranges = restore.AttachFilesToRanges(files, ranges) + // Redirect to log if there is no log file to avoid unreadable output. updateCh := utils.StartProgress( ctx, @@ -131,17 +134,13 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf int64(len(ranges)+len(files)), !cfg.LogProgress) - removedSchedulers, err := restorePreWork(ctx, client, mgr) - if err != nil { - return err - } - - err = restore.SplitRanges(ctx, client, ranges, rewriteRules, updateCh) + clusterCfg, err := restorePreWork(ctx, client, mgr) if err != nil { - log.Error("split regions failed", zap.Error(err)) return err } + // Do not reset timestamp if we are doing incremental restore, because + // we are not allowed to decrease timestamp. if !client.IsIncremental() { if err = client.ResetTS(cfg.PD); err != nil { log.Error("reset pd TS failed", zap.Error(err)) @@ -149,16 +148,41 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } } - err = client.RestoreFiles(files, rewriteRules, updateCh) - // always run the post-work even on error, so we don't stuck in the import mode or emptied schedulers - postErr := restorePostWork(ctx, client, mgr, removedSchedulers) + // Restore sst files in batch, the max size of batches are 64. + batchSize := 64 + batches := make([][]rtree.Range, 0, (len(ranges)+batchSize-1)/batchSize) + for batchSize < len(ranges) { + ranges, batches = ranges[batchSize:], append(batches, ranges[0:batchSize:batchSize]) + } + batches = append(batches, ranges) + for _, rangeBatch := range batches { + // Split regions by the given rangeBatch. + err = restore.SplitRanges(ctx, client, rangeBatch, rewriteRules, updateCh) + if err != nil { + log.Error("split regions failed", zap.Error(err)) + return err + } + + // Collect related files in the given rangeBatch. + fileBatch := make([]*backup.File, 0, 2*len(rangeBatch)) + for _, rg := range rangeBatch { + fileBatch = append(fileBatch, rg.Files...) + } + + // After split, we can restore backup files. + err = client.RestoreFiles(fileBatch, rewriteRules, updateCh) + if err != nil { + break + } + } + + // Always run the post-work even on error, so we don't stuck in the import + // mode or emptied schedulers + err = restorePostWork(ctx, client, mgr, clusterCfg) if err != nil { return err } - if postErr != nil { - return postErr - } // Restore has finished. close(updateCh) diff --git a/pkg/utils/rtree/rtree.go b/pkg/utils/rtree/rtree.go index d481d9c1a..342560eec 100644 --- a/pkg/utils/rtree/rtree.go +++ b/pkg/utils/rtree/rtree.go @@ -166,8 +166,7 @@ func (rangeTree *RangeTree) Put( } // InsertRange inserts ranges into the range tree. -// it returns true if all ranges inserted successfully. -// it returns false if there are some overlapped ranges. +// It returns a non-nil range if there are soe overlapped ranges. func (rangeTree *RangeTree) InsertRange(rg Range) *Range { out := rangeTree.ReplaceOrInsert(&rg) if out == nil { @@ -175,3 +174,16 @@ func (rangeTree *RangeTree) InsertRange(rg Range) *Range { } return out.(*Range) } + +// GetSortedRanges collects and returns sorted ranges. +func (rangeTree *RangeTree) GetSortedRanges() []Range { + sortedRanges := make([]Range, 0, rangeTree.Len()) + rangeTree.Ascend(func(rg btree.Item) bool { + if rg == nil { + return false + } + sortedRanges = append(sortedRanges, *rg.(*Range)) + return true + }) + return sortedRanges +} From 916946f9067921eb3bea53814947de62746713f6 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Sun, 23 Feb 2020 13:32:26 +0800 Subject: [PATCH 3/7] task: set default restore concurrency to 128 Signed-off-by: Neil Shen --- pkg/task/backup.go | 5 +++++ pkg/task/common.go | 7 ++++++- pkg/task/restore.go | 19 ++++++++++++++++--- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index ead4c2351..4f6a86170 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -21,6 +21,8 @@ import ( const ( flagBackupTimeago = "timeago" flagLastBackupTS = "lastbackupts" + + defaultBackupConcurrency = 4 ) // BackupConfig is the configuration specific for backup tasks. @@ -58,6 +60,9 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err = cfg.Config.ParseFromFlags(flags); err != nil { return errors.Trace(err) } + if cfg.Config.Concurrency == 0 { + cfg.Config.Concurrency = defaultBackupConcurrency + } return nil } diff --git a/pkg/task/common.go b/pkg/task/common.go index c3f866492..08811b953 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -78,9 +78,14 @@ func DefineCommonFlags(flags *pflag.FlagSet) { flags.String(flagKey, "", "Private key path for TLS connection") flags.Uint64(flagRateLimit, 0, "The rate limit of the task, MB/s per node") - flags.Uint32(flagConcurrency, 4, "The size of thread pool on each node that executes the task") flags.Bool(flagChecksum, true, "Run checksum at end of task") + // Default concurrency is different for backup and restore. + // Leave it 0 and let them adjust the value. + flags.Uint32(flagConcurrency, 0, "The size of thread pool on each node that executes the task") + // It may confuse users , so just hide it. + _ = flags.MarkHidden(flagConcurrency) + flags.Uint64(flagRateLimitUnit, utils.MB, "The unit of rate limit") _ = flags.MarkHidden(flagRateLimitUnit) diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 3ae5c96cc..e606ec931 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -2,6 +2,7 @@ package task import ( "context" + "math" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" @@ -32,6 +33,11 @@ var schedulers = map[string]struct{}{ "shuffle-hot-region-scheduler": {}, } +const ( + defaultRestoreConcurrency = 128 + maxRestoreBatchSizeLimit = 256 +) + // RestoreConfig is the configuration specific for restore tasks. type RestoreConfig struct { Config @@ -53,7 +59,14 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } - return cfg.Config.ParseFromFlags(flags) + err = cfg.Config.ParseFromFlags(flags) + if err != nil { + return errors.Trace(err) + } + if cfg.Config.Concurrency == 0 { + cfg.Config.Concurrency = defaultRestoreConcurrency + } + return nil } // RunRestore starts a restore task inside the current goroutine. @@ -148,8 +161,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } } - // Restore sst files in batch, the max size of batches are 64. - batchSize := 64 + // Restore sst files in batch. + batchSize := int(math.Min(maxRestoreBatchSizeLimit /* 256 */, float64(cfg.Concurrency))) batches := make([][]rtree.Range, 0, (len(ranges)+batchSize-1)/batchSize) for batchSize < len(ranges) { ranges, batches = ranges[batchSize:], append(batches, ranges[0:batchSize:batchSize]) From e90a8f53985445d7b92422be58a0f934a5ab3915 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Sun, 23 Feb 2020 13:35:14 +0800 Subject: [PATCH 4/7] restore: unused table worker pool Signed-off-by: Neil Shen --- pkg/restore/client.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index a06617084..330de12d8 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -37,10 +37,9 @@ type Client struct { ctx context.Context cancel context.CancelFunc - pdClient pd.Client - fileImporter FileImporter - workerPool *utils.WorkerPool - tableWorkerPool *utils.WorkerPool + pdClient pd.Client + fileImporter FileImporter + workerPool *utils.WorkerPool databases map[string]*utils.Database ddlJobs []*model.Job @@ -66,11 +65,10 @@ func NewRestoreClient( } return &Client{ - ctx: ctx, - cancel: cancel, - pdClient: pdClient, - tableWorkerPool: utils.NewWorkerPool(128, "table"), - db: db, + ctx: ctx, + cancel: cancel, + pdClient: pdClient, + db: db, }, nil } From f29d9e6e488016940bd58083e670fda6f00d04c3 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Sun, 23 Feb 2020 13:52:42 +0800 Subject: [PATCH 5/7] summary: sum up repeated duration and int Signed-off-by: Neil Shen --- pkg/summary/collector.go | 44 +++++++++++++++++++++++++-------- pkg/summary/collector_test.go | 46 +++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 10 deletions(-) create mode 100644 pkg/summary/collector_test.go diff --git a/pkg/summary/collector.go b/pkg/summary/collector.go index cd5aac6c6..e49428df5 100644 --- a/pkg/summary/collector.go +++ b/pkg/summary/collector.go @@ -36,7 +36,9 @@ type LogCollector interface { Summary(name string) } -var collector = newLogCollector() +type logFunc func(msg string, fields ...zap.Field) + +var collector = newLogCollector(log.Info) type logCollector struct { mu sync.Mutex @@ -45,16 +47,21 @@ type logCollector struct { successCosts map[string]time.Duration successData map[string]uint64 failureReasons map[string]error - fields []zap.Field + durations map[string]time.Duration + ints map[string]int + + log logFunc } -func newLogCollector() LogCollector { +func newLogCollector(log logFunc) LogCollector { return &logCollector{ unitCount: 0, - fields: make([]zap.Field, 0), successCosts: make(map[string]time.Duration), successData: make(map[string]uint64), failureReasons: make(map[string]error), + durations: make(map[string]time.Duration), + ints: make(map[string]int), + log: log, } } @@ -97,19 +104,30 @@ func (tc *logCollector) CollectFailureUnit(name string, reason error) { func (tc *logCollector) CollectDuration(name string, t time.Duration) { tc.mu.Lock() defer tc.mu.Unlock() - tc.fields = append(tc.fields, zap.Duration(name, t)) + val, ok := tc.durations[name] + if ok { + tc.durations[name] = val + t + return + } + tc.durations[name] = t } func (tc *logCollector) CollectInt(name string, t int) { tc.mu.Lock() defer tc.mu.Unlock() - tc.fields = append(tc.fields, zap.Int(name, t)) + val, ok := tc.ints[name] + if ok { + tc.ints[name] = val + t + return + } + tc.ints[name] = t } func (tc *logCollector) Summary(name string) { tc.mu.Lock() defer func() { - tc.fields = tc.fields[:0] + tc.durations = make(map[string]time.Duration) + tc.ints = make(map[string]int) tc.successCosts = make(map[string]time.Duration) tc.failureReasons = make(map[string]error) tc.mu.Unlock() @@ -131,11 +149,17 @@ func (tc *logCollector) Summary(name string) { } } - logFields := tc.fields + logFields := make([]zap.Field, 0, len(tc.durations)+len(tc.ints)) + for key, val := range tc.durations { + logFields = append(logFields, zap.Duration(key, val)) + } + for key, val := range tc.ints { + logFields = append(logFields, zap.Int(key, val)) + } + if len(tc.failureReasons) != 0 { names := make([]string, 0, len(tc.failureReasons)) for name := range tc.failureReasons { - // logFields = append(logFields, zap.NamedError(name, reason)) names = append(names, name) } logFields = append(logFields, zap.Strings(msg, names)) @@ -162,7 +186,7 @@ func (tc *logCollector) Summary(name string) { msg += fmt.Sprintf(", %s: %d", name, data) } - log.Info(name+" summary: "+msg, logFields...) + tc.log(name+" summary: "+msg, logFields...) } // SetLogCollector allow pass LogCollector outside diff --git a/pkg/summary/collector_test.go b/pkg/summary/collector_test.go new file mode 100644 index 000000000..b00efac16 --- /dev/null +++ b/pkg/summary/collector_test.go @@ -0,0 +1,46 @@ +package summary + +import ( + "testing" + "time" + + . "github.com/pingcap/check" + "go.uber.org/zap" +) + +func TestT(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&testCollectorSuite{}) + +type testCollectorSuite struct { +} + +func (suit *testCollectorSuite) TestSumDurationInt(c *C) { + fields := []zap.Field{} + logger := func(msg string, fs ...zap.Field) { + fields = append(fields, fs...) + } + col := newLogCollector(logger) + col.CollectDuration("a", time.Second) + col.CollectDuration("b", time.Second) + col.CollectDuration("b", time.Second) + col.CollectInt("c", 2) + col.CollectInt("c", 2) + col.Summary("foo") + + c.Assert(len(fields), Equals, 3) + assertContains := func(field zap.Field) { + for _, f := range fields { + if f.Key == field.Key { + c.Assert(f, DeepEquals, field) + return + } + } + c.Assert(fields, DeepEquals, field) // Must fail and prints detail + } + assertContains(zap.Duration("a", time.Second)) + assertContains(zap.Duration("b", 2*time.Second)) + assertContains(zap.Int("c", 4)) +} From 9cc5029d5baee79aed64343b133aa4cc646e4d62 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 24 Feb 2020 11:12:37 +0800 Subject: [PATCH 6/7] address comments Signed-off-by: Neil Shen --- pkg/backup/client.go | 4 +- pkg/backup/range.go | 73 ----------------- pkg/backup/range_test.go | 148 ---------------------------------- pkg/summary/collector.go | 14 +--- pkg/summary/collector_test.go | 2 +- pkg/task/restore.go | 19 +++-- pkg/utils/rtree/check.go | 31 +++++++ pkg/utils/rtree/rtree.go | 43 +++++++++- pkg/utils/rtree/rtree_test.go | 104 ++++++++++++++++++++++++ 9 files changed, 193 insertions(+), 245 deletions(-) delete mode 100644 pkg/backup/range.go delete mode 100644 pkg/backup/range_test.go create mode 100644 pkg/utils/rtree/check.go diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 05f136010..464c00dff 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -430,7 +430,7 @@ func (bc *Client) backupRange( }) // Check if there are duplicated files. - checkDupFiles(&results) + rtree.CheckDupFiles(&results) return nil } @@ -474,7 +474,7 @@ func (bc *Client) fineGrainedBackup( bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff) for { // Step1, check whether there is any incomplete range - incomplete := getIncompleteRange(&rangeTree, startKey, endKey) + incomplete := rangeTree.GetIncompleteRange(startKey, endKey) if len(incomplete) == 0 { return nil } diff --git a/pkg/backup/range.go b/pkg/backup/range.go deleted file mode 100644 index fd568a47b..000000000 --- a/pkg/backup/range.go +++ /dev/null @@ -1,73 +0,0 @@ -package backup - -import ( - "bytes" - "encoding/hex" - - "github.com/google/btree" - "github.com/pingcap/log" - "go.uber.org/zap" - - "github.com/pingcap/br/pkg/utils/rtree" -) - -func getIncompleteRange( - rangeTree *rtree.RangeTree, startKey, endKey []byte, -) []rtree.Range { - if len(startKey) != 0 && bytes.Equal(startKey, endKey) { - return []rtree.Range{} - } - incomplete := make([]rtree.Range, 0, 64) - requsetRange := rtree.Range{StartKey: startKey, EndKey: endKey} - lastEndKey := startKey - pviot := &rtree.Range{StartKey: startKey} - if first := rangeTree.Find(pviot); first != nil { - pviot.StartKey = first.StartKey - } - rangeTree.AscendGreaterOrEqual(pviot, func(i btree.Item) bool { - rg := i.(*rtree.Range) - if bytes.Compare(lastEndKey, rg.StartKey) < 0 { - start, end, isIntersect := - requsetRange.Intersect(lastEndKey, rg.StartKey) - if isIntersect { - // There is a gap between the last item and the current item. - incomplete = - append(incomplete, rtree.Range{StartKey: start, EndKey: end}) - } - } - lastEndKey = rg.EndKey - return len(endKey) == 0 || bytes.Compare(rg.EndKey, endKey) < 0 - }) - - // Check whether we need append the last range - if !bytes.Equal(lastEndKey, endKey) && len(lastEndKey) != 0 && - (len(endKey) == 0 || bytes.Compare(lastEndKey, endKey) < 0) { - start, end, isIntersect := requsetRange.Intersect(lastEndKey, endKey) - if isIntersect { - incomplete = - append(incomplete, rtree.Range{StartKey: start, EndKey: end}) - } - } - return incomplete -} - -func checkDupFiles(rangeTree *rtree.RangeTree) { - // Name -> SHA256 - files := make(map[string][]byte) - rangeTree.Ascend(func(i btree.Item) bool { - rg := i.(*rtree.Range) - for _, f := range rg.Files { - old, ok := files[f.Name] - if ok { - log.Error("dup file", - zap.String("Name", f.Name), - zap.String("SHA256_1", hex.EncodeToString(old)), - zap.String("SHA256_2", hex.EncodeToString(f.Sha256)), - ) - } else { - files[f.Name] = f.Sha256 - } - } - return true - }) -} diff --git a/pkg/backup/range_test.go b/pkg/backup/range_test.go deleted file mode 100644 index b9b94a413..000000000 --- a/pkg/backup/range_test.go +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2016 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package backup - -import ( - "fmt" - "testing" - - . "github.com/pingcap/check" - - "github.com/pingcap/br/pkg/utils/rtree" -) - -var _ = Suite(&testRangeTreeSuite{}) - -type testRangeTreeSuite struct{} - -func newRange(start, end []byte) *rtree.Range { - return &rtree.Range{ - StartKey: start, - EndKey: end, - } -} - -func (s *testRangeTreeSuite) TestRangeTree(c *C) { - rangeTree := rtree.NewRangeTree() - c.Assert(rangeTree.Get(newRange([]byte(""), []byte(""))), IsNil) - - search := func(key []byte) *rtree.Range { - rg := rangeTree.Get(newRange(key, []byte(""))) - if rg == nil { - return nil - } - return rg.(*rtree.Range) - } - assertIncomplete := func(startKey, endKey []byte, ranges []rtree.Range) { - incomplete := getIncompleteRange(&rangeTree, startKey, endKey) - c.Logf("%#v %#v\n%#v\n%#v\n", startKey, endKey, incomplete, ranges) - c.Assert(len(incomplete), Equals, len(ranges)) - for idx, rg := range incomplete { - c.Assert(rg.StartKey, DeepEquals, ranges[idx].StartKey) - c.Assert(rg.EndKey, DeepEquals, ranges[idx].EndKey) - } - } - assertAllComplete := func() { - for s := 0; s < 0xfe; s++ { - for e := s + 1; e < 0xff; e++ { - start := []byte{byte(s)} - end := []byte{byte(e)} - assertIncomplete(start, end, []rtree.Range{}) - } - } - } - - range0 := newRange([]byte(""), []byte("a")) - rangeA := newRange([]byte("a"), []byte("b")) - rangeB := newRange([]byte("b"), []byte("c")) - rangeC := newRange([]byte("c"), []byte("d")) - rangeD := newRange([]byte("d"), []byte("")) - - rangeTree.Update(*rangeA) - c.Assert(rangeTree.Len(), Equals, 1) - assertIncomplete([]byte("a"), []byte("b"), []rtree.Range{}) - assertIncomplete([]byte(""), []byte(""), - []rtree.Range{ - {StartKey: []byte(""), EndKey: []byte("a")}, - {StartKey: []byte("b"), EndKey: []byte("")}, - }) - - rangeTree.Update(*rangeC) - c.Assert(rangeTree.Len(), Equals, 2) - assertIncomplete([]byte("a"), []byte("c"), []rtree.Range{ - {StartKey: []byte("b"), EndKey: []byte("c")}, - }) - assertIncomplete([]byte("b"), []byte("c"), []rtree.Range{ - {StartKey: []byte("b"), EndKey: []byte("c")}, - }) - assertIncomplete([]byte(""), []byte(""), - []rtree.Range{ - {StartKey: []byte(""), EndKey: []byte("a")}, - {StartKey: []byte("b"), EndKey: []byte("c")}, - {StartKey: []byte("d"), EndKey: []byte("")}, - }) - - c.Assert(search([]byte{}), IsNil) - c.Assert(search([]byte("a")), DeepEquals, rangeA) - c.Assert(search([]byte("b")), IsNil) - c.Assert(search([]byte("c")), DeepEquals, rangeC) - c.Assert(search([]byte("d")), IsNil) - - rangeTree.Update(*rangeB) - c.Assert(rangeTree.Len(), Equals, 3) - c.Assert(search([]byte("b")), DeepEquals, rangeB) - assertIncomplete([]byte(""), []byte(""), - []rtree.Range{ - {StartKey: []byte(""), EndKey: []byte("a")}, - {StartKey: []byte("d"), EndKey: []byte("")}, - }) - - rangeTree.Update(*rangeD) - c.Assert(rangeTree.Len(), Equals, 4) - c.Assert(search([]byte("d")), DeepEquals, rangeD) - assertIncomplete([]byte(""), []byte(""), []rtree.Range{ - {StartKey: []byte(""), EndKey: []byte("a")}, - }) - - // None incomplete for any range after insert range 0 - rangeTree.Update(*range0) - c.Assert(rangeTree.Len(), Equals, 5) - - // Overwrite range B and C. - rangeBD := newRange([]byte("b"), []byte("d")) - rangeTree.Update(*rangeBD) - c.Assert(rangeTree.Len(), Equals, 4) - assertAllComplete() - - // Overwrite range BD, c-d should be empty - rangeTree.Update(*rangeB) - c.Assert(rangeTree.Len(), Equals, 4) - assertIncomplete([]byte(""), []byte(""), []rtree.Range{ - {StartKey: []byte("c"), EndKey: []byte("d")}, - }) - - rangeTree.Update(*rangeC) - c.Assert(rangeTree.Len(), Equals, 5) - assertAllComplete() -} - -func BenchmarkRangeTreeUpdate(b *testing.B) { - rangeTree := rtree.NewRangeTree() - for i := 0; i < b.N; i++ { - item := &rtree.Range{ - StartKey: []byte(fmt.Sprintf("%20d", i)), - EndKey: []byte(fmt.Sprintf("%20d", i+1))} - rangeTree.Update(*item) - } -} diff --git a/pkg/summary/collector.go b/pkg/summary/collector.go index e49428df5..0fb1dfcf9 100644 --- a/pkg/summary/collector.go +++ b/pkg/summary/collector.go @@ -104,23 +104,13 @@ func (tc *logCollector) CollectFailureUnit(name string, reason error) { func (tc *logCollector) CollectDuration(name string, t time.Duration) { tc.mu.Lock() defer tc.mu.Unlock() - val, ok := tc.durations[name] - if ok { - tc.durations[name] = val + t - return - } - tc.durations[name] = t + tc.durations[name] += t } func (tc *logCollector) CollectInt(name string, t int) { tc.mu.Lock() defer tc.mu.Unlock() - val, ok := tc.ints[name] - if ok { - tc.ints[name] = val + t - return - } - tc.ints[name] = t + tc.ints[name] += t } func (tc *logCollector) Summary(name string) { diff --git a/pkg/summary/collector_test.go b/pkg/summary/collector_test.go index b00efac16..6a8704db2 100644 --- a/pkg/summary/collector_test.go +++ b/pkg/summary/collector_test.go @@ -38,7 +38,7 @@ func (suit *testCollectorSuite) TestSumDurationInt(c *C) { return } } - c.Assert(fields, DeepEquals, field) // Must fail and prints detail + c.Error(fields, "do not contain", field) } assertContains(zap.Duration("a", time.Second)) assertContains(zap.Duration("b", 2*time.Second)) diff --git a/pkg/task/restore.go b/pkg/task/restore.go index e606ec931..7baaa6928 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -2,7 +2,6 @@ package task import ( "context" - "math" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" @@ -162,14 +161,20 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } // Restore sst files in batch. - batchSize := int(math.Min(maxRestoreBatchSizeLimit /* 256 */, float64(cfg.Concurrency))) - batches := make([][]rtree.Range, 0, (len(ranges)+batchSize-1)/batchSize) - for batchSize < len(ranges) { - ranges, batches = ranges[batchSize:], append(batches, ranges[0:batchSize:batchSize]) + batchSize := int(cfg.Concurrency) + if batchSize > maxRestoreBatchSizeLimit { + batchSize = maxRestoreBatchSizeLimit // 256 } - batches = append(batches, ranges) + for { + if len(ranges) == 0 { + break + } + if batchSize > len(ranges) { + batchSize = len(ranges) + } + var rangeBatch []rtree.Range + ranges, rangeBatch = ranges[batchSize:], ranges[0:batchSize:batchSize] - for _, rangeBatch := range batches { // Split regions by the given rangeBatch. err = restore.SplitRanges(ctx, client, rangeBatch, rewriteRules, updateCh) if err != nil { diff --git a/pkg/utils/rtree/check.go b/pkg/utils/rtree/check.go new file mode 100644 index 000000000..08c98d2f4 --- /dev/null +++ b/pkg/utils/rtree/check.go @@ -0,0 +1,31 @@ +package rtree + +import ( + "encoding/hex" + + "github.com/google/btree" + "github.com/pingcap/log" + "go.uber.org/zap" +) + +// CheckDupFiles checks if there are any files are duplicated. +func CheckDupFiles(rangeTree *RangeTree) { + // Name -> SHA256 + files := make(map[string][]byte) + rangeTree.Ascend(func(i btree.Item) bool { + rg := i.(*Range) + for _, f := range rg.Files { + old, ok := files[f.Name] + if ok { + log.Error("dup file", + zap.String("Name", f.Name), + zap.String("SHA256_1", hex.EncodeToString(old)), + zap.String("SHA256_2", hex.EncodeToString(f.Sha256)), + ) + } else { + files[f.Name] = f.Sha256 + } + } + return true + }) +} diff --git a/pkg/utils/rtree/rtree.go b/pkg/utils/rtree/rtree.go index 342560eec..e3c136803 100644 --- a/pkg/utils/rtree/rtree.go +++ b/pkg/utils/rtree/rtree.go @@ -28,8 +28,6 @@ type Range struct { StartKey []byte EndKey []byte Files []*backup.File - // Addition data attached in Range - // Attachment interface{} } // String formats a range to a string @@ -187,3 +185,44 @@ func (rangeTree *RangeTree) GetSortedRanges() []Range { }) return sortedRanges } + +// GetIncompleteRange returns missing range covered by startKey and endKey. +func (rangeTree *RangeTree) GetIncompleteRange( + startKey, endKey []byte, +) []Range { + if len(startKey) != 0 && bytes.Equal(startKey, endKey) { + return []Range{} + } + incomplete := make([]Range, 0, 64) + requsetRange := Range{StartKey: startKey, EndKey: endKey} + lastEndKey := startKey + pviot := &Range{StartKey: startKey} + if first := rangeTree.Find(pviot); first != nil { + pviot.StartKey = first.StartKey + } + rangeTree.AscendGreaterOrEqual(pviot, func(i btree.Item) bool { + rg := i.(*Range) + if bytes.Compare(lastEndKey, rg.StartKey) < 0 { + start, end, isIntersect := + requsetRange.Intersect(lastEndKey, rg.StartKey) + if isIntersect { + // There is a gap between the last item and the current item. + incomplete = + append(incomplete, Range{StartKey: start, EndKey: end}) + } + } + lastEndKey = rg.EndKey + return len(endKey) == 0 || bytes.Compare(rg.EndKey, endKey) < 0 + }) + + // Check whether we need append the last range + if !bytes.Equal(lastEndKey, endKey) && len(lastEndKey) != 0 && + (len(endKey) == 0 || bytes.Compare(lastEndKey, endKey) < 0) { + start, end, isIntersect := requsetRange.Intersect(lastEndKey, endKey) + if isIntersect { + incomplete = + append(incomplete, Range{StartKey: start, EndKey: end}) + } + } + return incomplete +} diff --git a/pkg/utils/rtree/rtree_test.go b/pkg/utils/rtree/rtree_test.go index 49932216e..f4ec4f201 100644 --- a/pkg/utils/rtree/rtree_test.go +++ b/pkg/utils/rtree/rtree_test.go @@ -31,6 +31,110 @@ func newRange(start, end []byte) *Range { } } +func (s *testRangeTreeSuite) TestRangeTree(c *C) { + rangeTree := NewRangeTree() + c.Assert(rangeTree.Get(newRange([]byte(""), []byte(""))), IsNil) + + search := func(key []byte) *Range { + rg := rangeTree.Get(newRange(key, []byte(""))) + if rg == nil { + return nil + } + return rg.(*Range) + } + assertIncomplete := func(startKey, endKey []byte, ranges []Range) { + incomplete := rangeTree.GetIncompleteRange(startKey, endKey) + c.Logf("%#v %#v\n%#v\n%#v\n", startKey, endKey, incomplete, ranges) + c.Assert(len(incomplete), Equals, len(ranges)) + for idx, rg := range incomplete { + c.Assert(rg.StartKey, DeepEquals, ranges[idx].StartKey) + c.Assert(rg.EndKey, DeepEquals, ranges[idx].EndKey) + } + } + assertAllComplete := func() { + for s := 0; s < 0xfe; s++ { + for e := s + 1; e < 0xff; e++ { + start := []byte{byte(s)} + end := []byte{byte(e)} + assertIncomplete(start, end, []Range{}) + } + } + } + + range0 := newRange([]byte(""), []byte("a")) + rangeA := newRange([]byte("a"), []byte("b")) + rangeB := newRange([]byte("b"), []byte("c")) + rangeC := newRange([]byte("c"), []byte("d")) + rangeD := newRange([]byte("d"), []byte("")) + + rangeTree.Update(*rangeA) + c.Assert(rangeTree.Len(), Equals, 1) + assertIncomplete([]byte("a"), []byte("b"), []Range{}) + assertIncomplete([]byte(""), []byte(""), + []Range{ + {StartKey: []byte(""), EndKey: []byte("a")}, + {StartKey: []byte("b"), EndKey: []byte("")}, + }) + + rangeTree.Update(*rangeC) + c.Assert(rangeTree.Len(), Equals, 2) + assertIncomplete([]byte("a"), []byte("c"), []Range{ + {StartKey: []byte("b"), EndKey: []byte("c")}, + }) + assertIncomplete([]byte("b"), []byte("c"), []Range{ + {StartKey: []byte("b"), EndKey: []byte("c")}, + }) + assertIncomplete([]byte(""), []byte(""), + []Range{ + {StartKey: []byte(""), EndKey: []byte("a")}, + {StartKey: []byte("b"), EndKey: []byte("c")}, + {StartKey: []byte("d"), EndKey: []byte("")}, + }) + + c.Assert(search([]byte{}), IsNil) + c.Assert(search([]byte("a")), DeepEquals, rangeA) + c.Assert(search([]byte("b")), IsNil) + c.Assert(search([]byte("c")), DeepEquals, rangeC) + c.Assert(search([]byte("d")), IsNil) + + rangeTree.Update(*rangeB) + c.Assert(rangeTree.Len(), Equals, 3) + c.Assert(search([]byte("b")), DeepEquals, rangeB) + assertIncomplete([]byte(""), []byte(""), + []Range{ + {StartKey: []byte(""), EndKey: []byte("a")}, + {StartKey: []byte("d"), EndKey: []byte("")}, + }) + + rangeTree.Update(*rangeD) + c.Assert(rangeTree.Len(), Equals, 4) + c.Assert(search([]byte("d")), DeepEquals, rangeD) + assertIncomplete([]byte(""), []byte(""), []Range{ + {StartKey: []byte(""), EndKey: []byte("a")}, + }) + + // None incomplete for any range after insert range 0 + rangeTree.Update(*range0) + c.Assert(rangeTree.Len(), Equals, 5) + + // Overwrite range B and C. + rangeBD := newRange([]byte("b"), []byte("d")) + rangeTree.Update(*rangeBD) + c.Assert(rangeTree.Len(), Equals, 4) + assertAllComplete() + + // Overwrite range BD, c-d should be empty + rangeTree.Update(*rangeB) + c.Assert(rangeTree.Len(), Equals, 4) + assertIncomplete([]byte(""), []byte(""), []Range{ + {StartKey: []byte("c"), EndKey: []byte("d")}, + }) + + rangeTree.Update(*rangeC) + c.Assert(rangeTree.Len(), Equals, 5) + assertAllComplete() +} + func (s *testRangeTreeSuite) TestRangeIntersect(c *C) { rg := newRange([]byte("a"), []byte("c")) From 8810be1f85905fb13b6ddf7a0f32e076ab434afc Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 24 Feb 2020 11:14:09 +0800 Subject: [PATCH 7/7] rtree: move rtree from utils to pkg Signed-off-by: Neil Shen --- cmd/validate.go | 2 +- pkg/backup/client.go | 2 +- pkg/backup/push.go | 2 +- pkg/restore/range.go | 2 +- pkg/restore/range_test.go | 2 +- pkg/restore/split.go | 2 +- pkg/restore/split_test.go | 2 +- pkg/restore/util.go | 2 +- pkg/{utils => }/rtree/check.go | 0 pkg/{utils => }/rtree/rtree.go | 0 pkg/{utils => }/rtree/rtree_test.go | 0 pkg/task/restore.go | 2 +- 12 files changed, 9 insertions(+), 9 deletions(-) rename pkg/{utils => }/rtree/check.go (100%) rename pkg/{utils => }/rtree/rtree.go (100%) rename pkg/{utils => }/rtree/rtree_test.go (100%) diff --git a/cmd/validate.go b/cmd/validate.go index 32a53d3ac..d358995a3 100644 --- a/cmd/validate.go +++ b/cmd/validate.go @@ -19,9 +19,9 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/restore" + "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/task" "github.com/pingcap/br/pkg/utils" - "github.com/pingcap/br/pkg/utils/rtree" ) // NewValidateCommand return a debug subcommand. diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 464c00dff..13854192d 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -29,10 +29,10 @@ import ( "github.com/pingcap/tidb/util/ranger" "go.uber.org/zap" + "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" - "github.com/pingcap/br/pkg/utils/rtree" ) // ClientMgr manages connections needed by backup. diff --git a/pkg/backup/push.go b/pkg/backup/push.go index 88a4b24e7..803a8ec92 100644 --- a/pkg/backup/push.go +++ b/pkg/backup/push.go @@ -10,7 +10,7 @@ import ( "github.com/pingcap/log" "go.uber.org/zap" - "github.com/pingcap/br/pkg/utils/rtree" + "github.com/pingcap/br/pkg/rtree" ) // pushDown warps a backup task. diff --git a/pkg/restore/range.go b/pkg/restore/range.go index fce3780be..97e2469dc 100644 --- a/pkg/restore/range.go +++ b/pkg/restore/range.go @@ -8,7 +8,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "go.uber.org/zap" - "github.com/pingcap/br/pkg/utils/rtree" + "github.com/pingcap/br/pkg/rtree" ) // sortRanges checks if the range overlapped and sort them diff --git a/pkg/restore/range_test.go b/pkg/restore/range_test.go index 84e8e6ecd..18da4f897 100644 --- a/pkg/restore/range_test.go +++ b/pkg/restore/range_test.go @@ -7,7 +7,7 @@ import ( "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/br/pkg/utils/rtree" + "github.com/pingcap/br/pkg/rtree" ) type testRangeSuite struct{} diff --git a/pkg/restore/split.go b/pkg/restore/split.go index e8c2ed523..10373f277 100644 --- a/pkg/restore/split.go +++ b/pkg/restore/split.go @@ -14,7 +14,7 @@ import ( "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" - "github.com/pingcap/br/pkg/utils/rtree" + "github.com/pingcap/br/pkg/rtree" ) // Constants for split retry machinery. diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go index 313f971d7..bfaf4dbb6 100644 --- a/pkg/restore/split_test.go +++ b/pkg/restore/split_test.go @@ -14,7 +14,7 @@ import ( "github.com/pingcap/pd/server/schedule/placement" "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/br/pkg/utils/rtree" + "github.com/pingcap/br/pkg/rtree" ) type testClient struct { diff --git a/pkg/restore/util.go b/pkg/restore/util.go index cffaee409..a288d091c 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -18,8 +18,8 @@ import ( "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" + "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/summary" - "github.com/pingcap/br/pkg/utils/rtree" ) var recordPrefixSep = []byte("_r") diff --git a/pkg/utils/rtree/check.go b/pkg/rtree/check.go similarity index 100% rename from pkg/utils/rtree/check.go rename to pkg/rtree/check.go diff --git a/pkg/utils/rtree/rtree.go b/pkg/rtree/rtree.go similarity index 100% rename from pkg/utils/rtree/rtree.go rename to pkg/rtree/rtree.go diff --git a/pkg/utils/rtree/rtree_test.go b/pkg/rtree/rtree_test.go similarity index 100% rename from pkg/utils/rtree/rtree_test.go rename to pkg/rtree/rtree_test.go diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 7baaa6928..3e1f53cae 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -13,9 +13,9 @@ import ( "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/restore" + "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" - "github.com/pingcap/br/pkg/utils/rtree" ) const (