From 530fa2ed742e73017d6ce1e22074e9e93715d5d3 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Thu, 24 Nov 2022 14:59:34 +0800 Subject: [PATCH 1/7] add batch cop rfc --- docs/design/2022-11-23-batch-cop.md | 139 ++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 docs/design/2022-11-23-batch-cop.md diff --git a/docs/design/2022-11-23-batch-cop.md b/docs/design/2022-11-23-batch-cop.md new file mode 100644 index 0000000000000..2d2b855ad2c4b --- /dev/null +++ b/docs/design/2022-11-23-batch-cop.md @@ -0,0 +1,139 @@ +# Proposal: support batch coprocessor for tikv + +* Authors: [cfzjywxk](https://github.com/cfzjywxk) +* Tracking issue: TODO + +## Motivation + +The fanout issue in index lookup queries may increase the query latency and cost a lot. If there are +1,000 handles and they are distributed in 1,000 regions. TiDB would construct 1,000 small tasks to retrieve +the 1000 related row contents, even when all the region leaders are in the same store. The problem is: +1. Each task requires a single RPC request, there could be too many tasks or RPC requests though each +request just fetches a few rows. +2. Increasing task numbers may lead to more queueing, and tuning the concurrency parameters or task scheduling +policies is difficult, so it’s difficult to get the best performance. + +In the current coprocessor implementation, key ranges in the same region would be batched in a single +task(there is an 25000 upper limit which is hard coded), so how about batching all the cop tasks which would +be sent to the same store? + +In a user situation, the index range scan returns 4000000 rows, and finally 400000 coprocessor table-lookup +tasks are generated, which means the key ranges are scattered in different regions. + +## Optimization + +### The IndexLookUp Execution Review + +Usually, the IndexLookUp executor may have an index worker which tries to read index keys and handles +according to the index filter conditions. 
Each time it fetches enough handle data, it would create a +coprocessor table task and send it to the table workers. The handle data size limit could be configured +by the [tidb_index_lookup_size](https://docs.pingcap.com/zh/tidb/dev/system-variables#tidb_index_lookup_size) +system variable. + +When the table worker gets a coprocessor task, it would split the handle ranges according to the region +information from the region cache. Then these region-aware tasks are processed by the coprocessor client +which has a default concurrency limit configured by the [tidb_distsql_scan_concurrency](https://docs.pingcap.com/zh/tidb/dev/system-variables#tidb_distsql_scan_concurrency) system +variable(default value is 15). + +### Batching Strategy + +As coprocessor streaming is already deprecated, bringing it back may not be a good idea. To make the design +simple, we could just do the batching for each coprocessor table task separately. Different coprocessor table +tasks may still require different RPC requests, while row handle ranges within one task could be batched if +their region leaders are in the same store. So the main idea is trying to batch sending the tasks using one +RPC for each original `copTask` if the row handle range-related region leaders are located in the same store. + +With the batching optimization, each table lookup task processing may need at most as many as RPC number of +the store nodes. Consider a limit case, if the index scan returns 4000000 rows and each task range is one row +, there could be `4000000/25000=160` table lookup tasks each containg 25000 key ranges. But now the RPC number +would become at most `160 * store_numbers`, for example if store_number is 10, the total request number is +1600 which is much less than the previous 400000. + +### Proto Change + +The RegionInfo is already added by the tiflash batch coprocessor implementation. It contains information about +the task region and related key ranges. 
+ +```protobuf +message RegionInfo { + uint64 region_id = 1; + metapb.RegionEpoch region_epoch = 2; + repeated KeyRange ranges = 3; +} +``` + +Attach the batched tasks into the `Corprocessor` request. Reuse the `RegionInfo` mentioned above to store tasks +in different regions but the same store. +```protobuf +message Request { + … + + // Store the batched tasks belonging to other regions. + repeated region_tasks = 11; +} +``` + +Add batched task results in `Response`, different tasks may encounter different kinds of errors, collect them +together. +```protobuf +message Response { + … + // The returned region error handling batch tasks. + repeated errorpb.Error region_errors = 13; + // The other errors handling batch tasks. + repeated string other_errors = 14; +} +``` + +### The TiDB Side + +Adding a flag in `kv.Request` to indicate if the batch strategy is enabled or not. +```golang +type Request struct { + … + // EnableStoreBatch indicates if the tasks are batched. + EnableStoreBatch bool +} +``` + +Adding batch task related fields in `corp.copTask`. They would be collected when the `copTask` is being +prepared and the store batch is enabled. +```golang +type copTask struct { + … + // + batchTaskList []kvproto.Coprocessor.RegionInfo +} +``` + +When building coprocessor tasks in the `buildCopTasks` function, try to fill the `batchTaskList` if +necessary.The steps are: +1. Creating a map to record `store address => *copTask`.If store batch is enabled, tasks would be appended +to existing `copTask` when the store address is the same. +2. Split the ranges according to the region information as usual. After this, all the tasks are correspond +to a single region. +3. When processing a new `KeyLocation`, try to append it as the batch task to the existing coprocessor task +if possible. + +The coprocessor client just sends the tasks as usual, the `Coprocessor` request is still a unary RPC +request though it may be batched. 
When handling `CopResponse`, if the batch path is enabled and +there are region errors or other errors processing batch tasks, rescheduling the cop tasks or +reporting errors to the upper layer. + +Note if the `keepOrder` is quired, the partial query result could not be sent back until all the reads +have succeeded. + + + +### The TiKV Side + +A simple way is to change the logic in `Endpoint.parse_and_handle_unary_request`, after parsing the +original request, the batched task-related builder and handler could be generated using the input +information from the RPC context, region information, and key ranges as long as they are properly passed in +the `Coprocessor` request. + +All the request handling could be scheduled to the read pool at the same time, +so before finishing something like `join_all` would be needed to wait for all the results of +different tasks. If any error is returned, do fill in the error fields in the `Response`. + + From bffc20ceb6a139f59ad26e73c42f086cb38d1b12 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Fri, 25 Nov 2022 09:58:49 +0800 Subject: [PATCH 2/7] Update 2022-11-23-batch-cop.md --- docs/design/2022-11-23-batch-cop.md | 40 ++++++++++++++--------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/docs/design/2022-11-23-batch-cop.md b/docs/design/2022-11-23-batch-cop.md index 2d2b855ad2c4b..70fb32ea1128f 100644 --- a/docs/design/2022-11-23-batch-cop.md +++ b/docs/design/2022-11-23-batch-cop.md @@ -1,20 +1,20 @@ # Proposal: support batch coprocessor for tikv * Authors: [cfzjywxk](https://github.com/cfzjywxk) -* Tracking issue: TODO +* Tracking issue: [39361](https://github.com/pingcap/tidb/issues/39361) ## Motivation -The fanout issue in index lookup queries may increase the query latency and cost a lot. If there are -1,000 handles and they are distributed in 1,000 regions. TiDB would construct 1,000 small tasks to retrieve -the 1000 related row contents, even when all the region leaders are in the same store. 
The problem is: +The fanout issue in index lookup queries is one cause of increased query latency and cost. If there are +1,000 handles and they are distributed in 1,000 regions, TiDB would construct 1,000 small tasks to retrieve +the 1000 related row contents, even when all the region leaders are in the same store. This results in the following problems: 1. Each task requires a single RPC request, there could be too many tasks or RPC requests though each -request just fetches a few rows. -2. Increasing task numbers may lead to more queueing, and tuning the concurrency parameters or task scheduling -policies is difficult, so it’s difficult to get the best performance. +request just fetches a few rows. Sometimes the cost of RPC could not be ignored. +2. Increasing task numbers may lead to more queueing. Tuning the related concurrency parameters or task scheduling +policies become more complex and it’s difficult to get best performance. In the current coprocessor implementation, key ranges in the same region would be batched in a single -task(there is an 25000 upper limit which is hard coded), so how about batching all the cop tasks which would +task(there is a hard coded 25000 upper limit), how about batching all the cop tasks which would be sent to the same store? In a user situation, the index range scan returns 4000000 rows, and finally 400000 coprocessor table-lookup @@ -24,28 +24,28 @@ tasks are generated, which means the key ranges are scattered in different regio ### The IndexLookUp Execution Review -Usually, the IndexLookUp executor may have an index worker which tries to read index keys and handles -according to the index filter conditions. Each time it fetches enough handle data, it would create a -coprocessor table task and send it to the table workers. 
The handle data size limit could be configured -by the [tidb_index_lookup_size](https://docs.pingcap.com/zh/tidb/dev/system-variables#tidb_index_lookup_size) +Usually, the IndexLookUp executor may have an index worker which tries to read index keys and related row handles +according to the index filter conditions. Each time it fetches enough row handle data, it would create a +coprocessor table lookup task and send it to the table workers. The handle data size limit for one task could be configured +by the [tidb_index_lookup_size](https://docs.pingcap.com/tidb/dev/system-variables#tidb_index_lookup_size) system variable. -When the table worker gets a coprocessor task, it would split the handle ranges according to the region +When the table worker gets a coprocessor task, it would split the handle ranges according to the region information from the region cache. Then these region-aware tasks are processed by the coprocessor client -which has a default concurrency limit configured by the [tidb_distsql_scan_concurrency](https://docs.pingcap.com/zh/tidb/dev/system-variables#tidb_distsql_scan_concurrency) system -variable(default value is 15). +which has a default concurrency limit configured by the [tidb_distsql_scan_concurrency](https://docs.pingcap.com/tidb/dev/system-variables#tidb_distsql_scan_concurrency) system +variable. ### Batching Strategy As coprocessor streaming is already deprecated, bringing it back may not be a good idea. To make the design simple, we could just do the batching for each coprocessor table task separately. Different coprocessor table tasks may still require different RPC requests, while row handle ranges within one task could be batched if -their region leaders are in the same store. So the main idea is trying to batch sending the tasks using one -RPC for each original `copTask` if the row handle range-related region leaders are located in the same store. +their region leaders are in the same store. 
The main idea is trying to batch sending the tasks using one +RPC for each original `copTask` if the row handle range-related region leaders are located in the same tikv store. With the batching optimization, each table lookup task processing may need at most as many as RPC number of the store nodes. Consider a limit case, if the index scan returns 4000000 rows and each task range is one row -, there could be `4000000/25000=160` table lookup tasks each containg 25000 key ranges. But now the RPC number +, there could be as many as `4000000/25000=160` table lookup tasks each containg 25000 key ranges. But now the RPC number would become at most `160 * store_numbers`, for example if store_number is 10, the total request number is 1600 which is much less than the previous 400000. @@ -120,7 +120,7 @@ request though it may be batched. When handling `CopResponse`, if the batch path there are region errors or other errors processing batch tasks, rescheduling the cop tasks or reporting errors to the upper layer. -Note if the `keepOrder` is quired, the partial query result could not be sent back until all the reads +Note if the `keepOrder` is required, the partial query result could not be sent back until all the reads have succeeded. @@ -128,7 +128,7 @@ have succeeded. ### The TiKV Side A simple way is to change the logic in `Endpoint.parse_and_handle_unary_request`, after parsing the -original request, the batched task-related builder and handler could be generated using the input +original request, the batched task-related builder and handler could be also generated using the input information from the RPC context, region information, and key ranges as long as they are properly passed in the `Coprocessor` request. 
From 90ea9224fed76af7344a4931f2881a2dd24f2f3b Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Fri, 25 Nov 2022 09:59:10 +0800 Subject: [PATCH 3/7] Update docs/design/2022-11-23-batch-cop.md Co-authored-by: Yilin Chen --- docs/design/2022-11-23-batch-cop.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/design/2022-11-23-batch-cop.md b/docs/design/2022-11-23-batch-cop.md index 70fb32ea1128f..0e98bd55b0104 100644 --- a/docs/design/2022-11-23-batch-cop.md +++ b/docs/design/2022-11-23-batch-cop.md @@ -96,7 +96,7 @@ type Request struct { } ``` -Adding batch task related fields in `corp.copTask`. They would be collected when the `copTask` is being +Adding batch task related fields in `copr.copTask`. They would be collected when the `copTask` is being prepared and the store batch is enabled. ```golang type copTask struct { From 207a6699febcc04aa8b7d602cf1646aaaf52db77 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 28 Nov 2022 09:40:57 +0800 Subject: [PATCH 4/7] Update docs/design/2022-11-23-batch-cop.md Co-authored-by: ekexium --- docs/design/2022-11-23-batch-cop.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/design/2022-11-23-batch-cop.md b/docs/design/2022-11-23-batch-cop.md index 0e98bd55b0104..6746198deb96f 100644 --- a/docs/design/2022-11-23-batch-cop.md +++ b/docs/design/2022-11-23-batch-cop.md @@ -43,7 +43,7 @@ tasks may still require different RPC requests, while row handle ranges within o their region leaders are in the same store. The main idea is trying to batch sending the tasks using one RPC for each original `copTask` if the row handle range-related region leaders are located in the same tikv store. -With the batching optimization, each table lookup task processing may need at most as many as RPC number of +With the batching optimization, the number of RPC requests may be at most the number of store nodes for each table lookup task the store nodes. 
Consider a limit case, if the index scan returns 4000000 rows and each task range is one row , there could be as many as `4000000/25000=160` table lookup tasks each containg 25000 key ranges. But now the RPC number would become at most `160 * store_numbers`, for example if store_number is 10, the total request number is From 2447324aa6e42b3444f12d62e1be9cb3a5ed31a0 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 28 Nov 2022 09:41:05 +0800 Subject: [PATCH 5/7] Update docs/design/2022-11-23-batch-cop.md Co-authored-by: ekexium --- docs/design/2022-11-23-batch-cop.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/design/2022-11-23-batch-cop.md b/docs/design/2022-11-23-batch-cop.md index 6746198deb96f..d376137d75b1a 100644 --- a/docs/design/2022-11-23-batch-cop.md +++ b/docs/design/2022-11-23-batch-cop.md @@ -44,7 +44,7 @@ their region leaders are in the same store. The main idea is trying to batch sen RPC for each original `copTask` if the row handle range-related region leaders are located in the same tikv store. With the batching optimization, the number of RPC requests may be at most the number of store nodes for each table lookup task -the store nodes. Consider a limit case, if the index scan returns 4000000 rows and each task range is one row +. Consider an extreme case, if the index scan returns 4000000 rows and each task range is one row , there could be as many as `4000000/25000=160` table lookup tasks each containg 25000 key ranges. But now the RPC number would become at most `160 * store_numbers`, for example if store_number is 10, the total request number is 1600 which is much less than the previous 400000. 
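The request-count bound quoted in the hunk above can be checked with a few lines of Go. This is only a worked version of the RFC's own arithmetic: the function names `ceilDiv` and `maxBatchedRPCs` are made up for illustration, and the sketch assumes the extreme case described in the text, where every key range covers one row and each table lookup task holds up to 25000 ranges.

```golang
package main

import "fmt"

// ceilDiv returns a/b rounded up, for positive a and b.
func ceilDiv(a, b int) int {
	return (a + b - 1) / b
}

// maxBatchedRPCs is a hypothetical helper: with store batching, each table
// lookup task needs at most one RPC per store, so the bound is
// (number of tasks) * (number of stores).
func maxBatchedRPCs(rows, rangesPerTask, stores int) int {
	tasks := ceilDiv(rows, rangesPerTask)
	return tasks * stores
}

func main() {
	// 4000000 rows / 25000 ranges per task = 160 tasks; 160 * 10 stores = 1600.
	fmt.Println(maxBatchedRPCs(4000000, 25000, 10))
}
```

The bound is a worst case: a task whose ranges all live on one store still needs only a single request.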
From 818f6ae0cdd0540c1bc24a29c616ac4939891fa5 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 28 Nov 2022 09:45:07 +0800 Subject: [PATCH 6/7] Update 2022-11-23-batch-cop.md --- docs/design/2022-11-23-batch-cop.md | 30 +++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/docs/design/2022-11-23-batch-cop.md b/docs/design/2022-11-23-batch-cop.md index d376137d75b1a..449855c14f313 100644 --- a/docs/design/2022-11-23-batch-cop.md +++ b/docs/design/2022-11-23-batch-cop.md @@ -51,14 +51,26 @@ would become at most `160 * store_numbers`, for example if store_number is 10, t ### Proto Change -The RegionInfo is already added by the tiflash batch coprocessor implementation. It contains information about -the task region and related key ranges. +Create a new structure for the batched tasks, including the request `StoreBatchTask` and response `StoreBatchTaskResponse` types. ```protobuf -message RegionInfo { +message StoreBatchTask { uint64 region_id = 1; metapb.RegionEpoch region_epoch = 2; - repeated KeyRange ranges = 3; + metapb.Peer peer = 3; + repeated KeyRange ranges = 4; + uint64 task_id = 5; +} +``` + +```protobuf +message StoreBatchTaskResponse { + bytes data = 1 [(gogoproto.customtype) = "github.com/pingcap/kvproto/pkg/sharedbytes.SharedBytes", (gogoproto.nullable) = false]; + errorpb.Error region_error = 2; + kvrpcpb.LockInfo locked = 3; + string other_error = 4; + uint64 task_id = 5; + kvrpcpb.ExecDetailsV2 exec_details_v2 = 6; } ``` @@ -69,7 +81,7 @@ message Request { … // Store the batched tasks belonging to other regions. - repeated region_tasks = 11; + repeated StoreBatchTask tasks = 11; } ``` @@ -78,10 +90,7 @@ together. ```protobuf message Response { … - // The returned region error handling batch tasks. - repeated errorpb.Error region_errors = 13; - // The other errors handling batch tasks. 
- repeated string other_errors = 14; + repeated StoreBatchTaskResponse batch_responses = 13; } ``` @@ -136,4 +145,5 @@ All the request handling could be scheduled to the read pool at the same time, so before finishing something like `join_all` would be needed to wait for all the results of different tasks. If any error is returned, do fill in the error fields in the `Response`. - +For execution tracking, separate trackers are created for the requests, and all the execution details would be returned +to the client. From d4cb1f331a50e59d9ef0fc255e39278fb8dcf534 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 28 Nov 2022 09:45:20 +0800 Subject: [PATCH 7/7] Update docs/design/2022-11-23-batch-cop.md Co-authored-by: ekexium --- docs/design/2022-11-23-batch-cop.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/design/2022-11-23-batch-cop.md b/docs/design/2022-11-23-batch-cop.md index 449855c14f313..34f81fea82f46 100644 --- a/docs/design/2022-11-23-batch-cop.md +++ b/docs/design/2022-11-23-batch-cop.md @@ -119,7 +119,7 @@ When building coprocessor tasks in the `buildCopTasks` function, try to fill the necessary.The steps are: 1. Creating a map to record `store address => *copTask`.If store batch is enabled, tasks would be appended to existing `copTask` when the store address is the same. -2. Split the ranges according to the region information as usual. After this, all the tasks are correspond +2. Split the ranges according to the region information as usual. After this, all the tasks correspond to a single region. 3. When processing a new `KeyLocation`, try to append it as the batch task to the existing coprocessor task if possible.
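The three `buildCopTasks` steps in the final hunk could be sketched roughly as follows. This is a minimal illustration and not the TiDB implementation: `regionTask`, `batchedTask`, and `groupByStore` are hypothetical names standing in for `KeyLocation`, `copTask`, and the batching logic, and key ranges are reduced to plain strings. It assumes the ranges have already been split per region (step 2).

```golang
package main

import "fmt"

// regionTask is a hypothetical stand-in for one region-aware piece of work.
type regionTask struct {
	regionID  uint64
	storeAddr string
	ranges    []string // placeholder for the real key ranges
}

// batchedTask is a hypothetical stand-in for copTask: one primary region
// task plus the other region tasks that target the same store.
type batchedTask struct {
	primary regionTask
	batched []regionTask
}

// groupByStore keeps the first task seen for each store as the primary task
// and appends later tasks for that store to its batch list. When
// enableStoreBatch is false, every region task stays a separate task.
func groupByStore(tasks []regionTask, enableStoreBatch bool) []*batchedTask {
	var out []*batchedTask
	byStore := make(map[string]*batchedTask) // store address => task
	for _, t := range tasks {
		if enableStoreBatch {
			if existing, ok := byStore[t.storeAddr]; ok {
				existing.batched = append(existing.batched, t)
				continue
			}
		}
		bt := &batchedTask{primary: t}
		byStore[t.storeAddr] = bt
		out = append(out, bt)
	}
	return out
}

func main() {
	tasks := []regionTask{
		{regionID: 1, storeAddr: "store-a", ranges: []string{"k1"}},
		{regionID: 2, storeAddr: "store-b", ranges: []string{"k2"}},
		{regionID: 3, storeAddr: "store-a", ranges: []string{"k3"}},
	}
	for _, bt := range groupByStore(tasks, true) {
		fmt.Printf("store=%s primary=%d batched=%d\n",
			bt.primary.storeAddr, bt.primary.regionID, len(bt.batched))
	}
}
```

With batching enabled, the three region tasks collapse into two requests, one per store. The sketch ignores the `keepOrder` constraint and error-driven rescheduling that the RFC mentions.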