diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index 6869c1199..2ab0a0232 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/keepalive" "github.com/pingcap/br/pkg/glue" + "github.com/pingcap/br/pkg/utils" ) const ( @@ -37,6 +38,7 @@ const ( clusterVersionPrefix = "pd/api/v1/config/cluster-version" regionCountPrefix = "pd/api/v1/stats/region" schdulerPrefix = "pd/api/v1/schedulers" + maxMsgSize = int(128 * utils.MB) // pd.ScanRegion may return a large response ) // Mgr manages connections to a TiDB cluster. @@ -125,7 +127,12 @@ func NewMgr( return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs) } - pdClient, err := pd.NewClient(addrs, securityOption) + maxCallMsgSize := []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)), + } + pdClient, err := pd.NewClient( + addrs, securityOption, pd.WithGRPCDialOptions(maxCallMsgSize...)) if err != nil { log.Error("fail to create pd client", zap.Error(err)) return nil, err diff --git a/pkg/restore/import.go b/pkg/restore/import.go index c1e1b5dd8..b8928418d 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -23,6 +23,7 @@ import ( ) const importScanRegionTime = 10 * time.Second +const scanRegionPaginationLimit = int(128) // ImporterClient is used to import a file to TiKV type ImporterClient interface { @@ -171,7 +172,8 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime) defer cancel() // Scan regions covered by the file range - regionInfos, errScanRegion := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0) + regionInfos, errScanRegion := paginateScanRegion( + ctx, importer.metaClient, startKey, endKey, scanRegionPaginationLimit) if errScanRegion != nil { return errors.Trace(errScanRegion) } @@ -199,6 
+201,7 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul zap.Error(errDownload)) return errDownload } + ingestResp, errIngest := importer.ingestSST(downloadMeta, info) ingestRetry: for errIngest == nil { diff --git a/pkg/restore/split.go b/pkg/restore/split.go index 378e256c6..64bf83e8c 100644 --- a/pkg/restore/split.go +++ b/pkg/restore/split.go @@ -89,10 +89,9 @@ func (rs *RegionSplitter) Split( scatterRegions := make([]*RegionInfo, 0) SplitRegions: for i := 0; i < SplitRetryTimes; i++ { - var regions []*RegionInfo - regions, err = rs.client.ScanRegions(ctx, minKey, maxKey, 0) - if err != nil { - return errors.Trace(err) + regions, err1 := paginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit) + if err1 != nil { + return errors.Trace(err1) } if len(regions) == 0 { log.Warn("cannot scan any region") diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go index 3ace5b8c8..a0dbc3678 100644 --- a/pkg/restore/split_test.go +++ b/pkg/restore/split_test.go @@ -10,6 +10,7 @@ import ( "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule/placement" "github.com/pingcap/tidb/util/codec" ) @@ -18,13 +19,19 @@ type testClient struct { mu sync.RWMutex stores map[uint64]*metapb.Store regions map[uint64]*RegionInfo + regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions nextRegionID uint64 } func newTestClient(stores map[uint64]*metapb.Store, regions map[uint64]*RegionInfo, nextRegionID uint64) *testClient { + regionsInfo := core.NewRegionsInfo() + for _, regionInfo := range regions { + regionsInfo.AddRegion(core.NewRegionInfo(regionInfo.Region, regionInfo.Leader)) + } return &testClient{ stores: stores, regions: regions, + regionsInfo: regionsInfo, nextRegionID: nextRegionID, } } @@ -142,16 +149,13 @@ func (c *testClient) GetOperator(ctx 
context.Context, regionID uint64) (*pdpb.Ge } func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) { - regions := make([]*RegionInfo, 0) - for _, region := range c.regions { - if limit > 0 && len(regions) >= limit { - break - } - if (len(region.Region.GetEndKey()) != 0 && bytes.Compare(region.Region.GetEndKey(), key) <= 0) || - bytes.Compare(region.Region.GetStartKey(), endKey) > 0 { - continue - } - regions = append(regions, region) + infos := c.regionsInfo.ScanRange(key, endKey, limit) + regions := make([]*RegionInfo, 0, len(infos)) + for _, info := range infos { + regions = append(regions, &RegionInfo{ + Region: info.GetMeta(), + Leader: info.GetLeader(), + }) } return regions, nil } diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 7b70c9806..0936c1085 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -3,6 +3,7 @@ package restore import ( "bytes" "context" + "encoding/hex" "strings" "time" @@ -324,3 +325,35 @@ func encodeKeyPrefix(key []byte) []byte { encodedPrefix = append(encodedPrefix, codec.EncodeBytes([]byte{}, key[:len(key)-ungroupedLen])...) return append(encodedPrefix[:len(encodedPrefix)-9], key[len(key)-ungroupedLen:]...) } + +// paginateScanRegion scans the regions in [startKey, endKey) page by page, requesting at most +// limit regions per call, and returns all of them at once; an empty endKey leaves the range unbounded. +// It returns an error when startKey >= endKey. A non-positive limit disables pagination: one request is issued. +func paginateScanRegion( + ctx context.Context, client SplitClient, startKey, endKey []byte, limit int, +) ([]*RegionInfo, error) { + if len(endKey) != 0 && bytes.Compare(startKey, endKey) >= 0 { + return nil, errors.Errorf("startKey >= endKey, startKey %s, endkey %s", + hex.EncodeToString(startKey), hex.EncodeToString(endKey)) + } + + regions := []*RegionInfo{} + for { + batch, err := client.ScanRegions(ctx, startKey, endKey, limit) + if err != nil { + return nil, errors.Trace(err) + } + regions = append(regions, batch...) 
+ if limit <= 0 || len(batch) < limit { + // Pagination is disabled, or this was the last (short) page; also keeps an empty batch from being indexed below + break + } + startKey = batch[len(batch)-1].Region.GetEndKey() + if len(startKey) == 0 || + (len(endKey) > 0 && bytes.Compare(startKey, endKey) >= 0) { + // The whole requested key range has been scanned + break + } + } + return regions, nil +} diff --git a/pkg/restore/util_test.go b/pkg/restore/util_test.go index bc4da9168..1b5e86b96 100644 --- a/pkg/restore/util_test.go +++ b/pkg/restore/util_test.go @@ -1,11 +1,15 @@ package restore import ( + "context" + "encoding/binary" + . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/util/codec" ) var _ = Suite(&testRestoreUtilSuite{}) @@ -103,3 +107,105 @@ func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) { ) c.Assert(err, ErrorMatches, "unexpected rewrite rules") } + +func (s *testRestoreUtilSuite) TestPaginateScanRegion(c *C) { + peers := make([]*metapb.Peer, 1) + peers[0] = &metapb.Peer{ + Id: 1, + StoreId: 1, + } + stores := make(map[uint64]*metapb.Store) + stores[1] = &metapb.Store{ + Id: 1, + } + + makeRegions := func(num uint64) (map[uint64]*RegionInfo, []*RegionInfo) { + regionsMap := make(map[uint64]*RegionInfo, num) + regions := make([]*RegionInfo, 0, num) + endKey := make([]byte, 8) + for i := uint64(0); i < num-1; i++ { + ri := &RegionInfo{ + Region: &metapb.Region{ + Id: i + 1, + Peers: peers, + }, + } + + if i != 0 { + startKey := make([]byte, 8) + binary.BigEndian.PutUint64(startKey, i) + ri.Region.StartKey = codec.EncodeBytes([]byte{}, startKey) + } + endKey = make([]byte, 8) + binary.BigEndian.PutUint64(endKey, i+1) + ri.Region.EndKey = codec.EncodeBytes([]byte{}, endKey) + + regionsMap[i] = ri + regions = append(regions, ri) + } + + if num == 1 { + endKey = []byte{} + } else { + endKey = codec.EncodeBytes([]byte{}, endKey) + } + ri := &RegionInfo{ + Region: &metapb.Region{ + Id: num, + Peers: peers, + 
StartKey: endKey, + EndKey: []byte{}, + }, + } + regionsMap[num] = ri + regions = append(regions, ri) + + return regionsMap, regions + } + + ctx := context.Background() + regionMap := make(map[uint64]*RegionInfo) + regions := []*RegionInfo{} + batch, err := paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), []byte{}, []byte{}, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions) + + regionMap, regions = makeRegions(1) + batch, err = paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), []byte{}, []byte{}, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions) + + regionMap, regions = makeRegions(2) + batch, err = paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), []byte{}, []byte{}, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions) + + regionMap, regions = makeRegions(3) + batch, err = paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), []byte{}, []byte{}, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions) + + regionMap, regions = makeRegions(8) + batch, err = paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), []byte{}, []byte{}, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions) + + regionMap, regions = makeRegions(8) + batch, err = paginateScanRegion( + ctx, newTestClient(stores, regionMap, 0), regions[1].Region.StartKey, []byte{}, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions[1:]) + + batch, err = paginateScanRegion( + ctx, newTestClient(stores, regionMap, 0), []byte{}, regions[6].Region.EndKey, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions[:7]) + + batch, err = paginateScanRegion( + ctx, newTestClient(stores, regionMap, 0), regions[1].Region.StartKey, regions[1].Region.EndKey, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions[1:2]) + + _, err = paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), []byte{2}, []byte{1}, 3) + c.Assert(err, ErrorMatches, "startKey >= endKey.*") +} diff --git 
a/tests/_utils/run_services b/tests/_utils/run_services index 1118d7ccc..769b9b22a 100644 --- a/tests/_utils/run_services +++ b/tests/_utils/run_services @@ -48,7 +48,7 @@ start_services() { i=0 while ! curl -o /dev/null -sf "http://$PD_ADDR/pd/api/v1/version"; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to start PD' exit 1 fi @@ -70,7 +70,7 @@ start_services() { echo "Waiting initializing TiKV..." while ! curl -sf "http://$PD_ADDR/pd/api/v1/cluster/status" | grep '"is_initialized": true'; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to initialize TiKV cluster' exit 1 fi @@ -90,7 +90,7 @@ start_services() { i=0 while ! curl -o /dev/null -sf "http://$TIDB_IP:10080/status"; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to start TiDB' exit 1 fi @@ -100,7 +100,7 @@ start_services() { i=0 while ! curl "http://$PD_ADDR/pd/api/v1/cluster/status" -sf | grep -q "\"is_initialized\": true"; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to bootstrap cluster' exit 1 fi @@ -132,7 +132,7 @@ start_services_withTLS() { --key $1/certificates/client-key.pem \ -o /dev/null -sf "https://$PD_ADDR/pd/api/v1/version"; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to start PD' exit 1 fi @@ -155,7 +155,7 @@ start_services_withTLS() { --key $1/certificates/client-key.pem \ -sf "https://$PD_ADDR/pd/api/v1/cluster/status" | grep '"is_initialized": true'; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to initialize TiKV cluster' exit 1 fi @@ -178,7 +178,7 @@ start_services_withTLS() { --key $1/certificates/client-key.pem \ -o /dev/null -sf "https://$TIDB_IP:10080/status"; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to start TiDB' exit 1 fi @@ -191,7 +191,7 @@ start_services_withTLS() { --key $1/certificates/client-key.pem \ 
"https://$PD_ADDR/pd/api/v1/cluster/status" -sf | grep -q "\"is_initialized\": true"; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to bootstrap cluster' exit 1 fi