doc: add batch coprocessor rfc #39362
Merged
7 commits
530fa2e
add batch cop rfc
cfzjywxk bffc20c
Update 2022-11-23-batch-cop.md
cfzjywxk 90ea922
Update docs/design/2022-11-23-batch-cop.md
cfzjywxk 207a669
Update docs/design/2022-11-23-batch-cop.md
cfzjywxk 2447324
Update docs/design/2022-11-23-batch-cop.md
cfzjywxk 818f6ae
Update 2022-11-23-batch-cop.md
cfzjywxk d4cb1f3
Update docs/design/2022-11-23-batch-cop.md
cfzjywxk
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| # Proposal: Support Batch Coprocessor for TiKV | |
|
|
||
| * Authors: [cfzjywxk](https://github.com/cfzjywxk) | ||
| * Tracking issue: [39361](https://github.com/pingcap/tidb/issues/39361) | ||
|
|
||
| ## Motivation | ||
|
|
||
| The fanout issue in index lookup queries is one cause of increased query latency and cost. If there are | |
| 1,000 handles distributed across 1,000 regions, TiDB constructs 1,000 small tasks to retrieve | |
| the 1,000 related rows, even when all the region leaders are in the same store. This results in the following problems: | |
| 1. Each task requires its own RPC request, so there can be too many tasks and RPC requests even though each | |
| request fetches only a few rows. In such cases the RPC cost cannot be ignored. | |
| 2. A growing number of tasks may lead to more queueing. Tuning the related concurrency parameters or task scheduling | |
| policies becomes more complex, and it is difficult to get the best performance. | |
|
|
||
| In the current coprocessor implementation, key ranges in the same region are batched into a single | |
| task (with a hard-coded upper limit of 25,000 key ranges). How about batching all the cop tasks that would | |
| be sent to the same store? | |
|
|
||
| In one user scenario, the index range scan returned 4,000,000 rows and 400,000 coprocessor table-lookup | |
| tasks were generated, which means the key ranges were scattered across many different regions. | |
|
|
||
| ## Optimization | ||
|
|
||
| ### The IndexLookUp Execution Review | ||
|
|
||
| Usually, the IndexLookUp executor has an index worker that reads index keys and the related row handles | |
| according to the index filter conditions. Each time it has fetched enough row handles, it creates a | |
| coprocessor table lookup task and sends it to the table workers. The handle data size limit for one task can be configured | |
| by the [tidb_index_lookup_size](https://docs.pingcap.com/tidb/dev/system-variables#tidb_index_lookup_size) | |
| system variable. | |
|
|
||
| When a table worker gets a coprocessor task, it splits the handle ranges according to the region | |
| information from the region cache. These region-aware tasks are then processed by the coprocessor client, | |
| which has a default concurrency limit configured by the [tidb_distsql_scan_concurrency](https://docs.pingcap.com/tidb/dev/system-variables#tidb_distsql_scan_concurrency) system | |
| variable. | |
|
|
||
| ### Batching Strategy | ||
|
|
||
| As coprocessor streaming is already deprecated, bringing it back may not be a good idea. To keep the design | |
| simple, we could do the batching for each coprocessor table task separately. Different coprocessor table | |
| tasks may still require different RPC requests, while row handle ranges within one task can be batched if | |
| their region leaders are in the same store. The main idea is to send the region tasks split from one original | |
| `copTask` in a single RPC when the region leaders of their row handle ranges are located in the same TiKV store. | |
|
|
||
| With the batching optimization, the number of RPC requests for each table lookup task is at most the number of store nodes. | |
| Consider an extreme case: if the index scan returns 4,000,000 rows and each task range is one row, | |
| there could be as many as `4000000/25000=160` table lookup tasks, each containing 25,000 key ranges. The number of RPCs | |
| then becomes at most `160 * store_number`. For example, if `store_number` is 10, the total number of requests is at most | |
| 1,600, which is much less than the previous 400,000. | |
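
The bound above can be checked with a short calculation. The following Go sketch is only illustrative: the helper `maxBatchedRPCs` and its parameter names are hypothetical (they are not part of TiDB), and it assumes one key range per row and the 25,000-range limit per task described in the text.

```go
package main

import "fmt"

// maxRangesPerTask mirrors the hard-coded 25000 upper limit mentioned in the text.
const maxRangesPerTask = 25000

// maxBatchedRPCs is a hypothetical helper. Assuming one key range per row, it
// returns the number of table lookup tasks and the upper bound on RPC requests
// when each task sends at most one RPC per store.
func maxBatchedRPCs(rows, storeCount int) (tasks, rpcs int) {
	tasks = (rows + maxRangesPerTask - 1) / maxRangesPerTask // ceiling division
	return tasks, tasks * storeCount
}

func main() {
	tasks, rpcs := maxBatchedRPCs(4000000, 10)
	fmt.Printf("table lookup tasks: %d, RPC upper bound: %d\n", tasks, rpcs)
}
```

This is an upper bound only: a task whose ranges touch fewer stores sends fewer requests.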
|
|
||
| ### Proto Change | ||
|
|
||
| Create a new structure for the batched tasks, including the request `StoreBatchTask` and response `StoreBatchTaskResponse` types. | ||
|
|
||
| ```protobuf | ||
| message StoreBatchTask { | ||
| uint64 region_id = 1; | ||
| metapb.RegionEpoch region_epoch = 2; | ||
| metapb.Peer peer = 3; | ||
| repeated KeyRange ranges = 4; | ||
| uint64 task_id = 5; | ||
| } | ||
| ``` | ||
|
|
||
| ```protobuf | ||
| message StoreBatchTaskResponse { | ||
| bytes data = 1 [(gogoproto.customtype) = "github.com/pingcap/kvproto/pkg/sharedbytes.SharedBytes", (gogoproto.nullable) = false]; | ||
| errorpb.Error region_error = 2; | ||
| kvrpcpb.LockInfo locked = 3; | ||
| string other_error = 4; | ||
| uint64 task_id = 5; | ||
| kvrpcpb.ExecDetailsV2 exec_details_v2 = 6; | ||
| } | ||
| ``` | ||
|
|
||
| Attach the batched tasks to the `Coprocessor` request. Use the `StoreBatchTask` defined above to store tasks | |
| that are in different regions but on the same store. | |
| ```protobuf | ||
| message Request { | ||
| … | ||
|
|
||
| // Store the batched tasks belonging to other regions. | ||
| repeated StoreBatchTask tasks = 11; | ||
| } | ||
| ``` | ||
|
|
||
| Add the batched task results to `Response`. Different tasks may encounter different kinds of errors, so they are | |
| collected together. | |
| ```protobuf | ||
| message Response { | ||
| … | ||
| repeated StoreBatchTaskResponse batch_responses = 13; | ||
| } | ||
| ``` | ||
|
|
||
| ### The TiDB Side | ||
|
|
||
| Add a flag to `kv.Request` to indicate whether the batch strategy is enabled. | |
| ```golang | ||
| type Request struct { | ||
| … | ||
| // EnableStoreBatch indicates if the tasks are batched. | ||
| EnableStoreBatch bool | ||
| } | ||
| ``` | ||
|
|
||
| Add batch-task-related fields to `copr.copTask`. They are collected when the `copTask` is being | |
| prepared and store batch is enabled. | |
| ```golang | ||
| type copTask struct { | ||
| … | ||
| // batchTaskList holds the tasks batched into this task because they target the same store. | |
| batchTaskList []kvproto.Coprocessor.RegionInfo | ||
| } | ||
| ``` | ||
|
|
||
| When building coprocessor tasks in the `buildCopTasks` function, try to fill the `batchTaskList` if | |
| necessary. The steps are: | |
| 1. Create a map to record `store address => *copTask`. If store batch is enabled, tasks are appended | |
| to the existing `copTask` when the store address is the same. | |
| 2. Split the ranges according to the region information as usual. After this, each task corresponds | |
| to a single region. | |
| 3. When processing a new `KeyLocation`, try to append it as a batch task to the existing coprocessor task | |
| if possible. | |
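
The steps above can be sketched as follows. This is a minimal illustration, not the actual `buildCopTasks` code: the `keyLocation`, `batchedTask`, and simplified `copTask` types, as well as the `buildTasks` function, are hypothetical stand-ins, and the ranges are assumed to be already split per region (step 2).

```go
package main

import "fmt"

// keyLocation is a hypothetical stand-in for one region-aware range set
// produced by splitting the handle ranges with the region cache.
type keyLocation struct {
	regionID  uint64
	storeAddr string
	ranges    []string
}

// batchedTask is a simplified, hypothetical entry of batchTaskList.
type batchedTask struct {
	regionID uint64
	ranges   []string
}

// copTask is a simplified, hypothetical version of copr.copTask.
type copTask struct {
	regionID      uint64
	storeAddr     string
	ranges        []string
	batchTaskList []batchedTask
}

// buildTasks groups region tasks by store address when batching is enabled.
func buildTasks(locs []keyLocation, enableStoreBatch bool) []*copTask {
	var tasks []*copTask
	byStore := make(map[string]*copTask) // store address => *copTask
	for _, loc := range locs {
		if enableStoreBatch {
			if t, ok := byStore[loc.storeAddr]; ok {
				// Same store: attach to the existing task instead of creating a new RPC.
				t.batchTaskList = append(t.batchTaskList,
					batchedTask{regionID: loc.regionID, ranges: loc.ranges})
				continue
			}
		}
		t := &copTask{regionID: loc.regionID, storeAddr: loc.storeAddr, ranges: loc.ranges}
		tasks = append(tasks, t)
		if enableStoreBatch {
			byStore[loc.storeAddr] = t
		}
	}
	return tasks
}

func main() {
	locs := []keyLocation{
		{regionID: 1, storeAddr: "store-1", ranges: []string{"a"}},
		{regionID: 2, storeAddr: "store-2", ranges: []string{"b"}},
		{regionID: 3, storeAddr: "store-1", ranges: []string{"c"}},
	}
	for _, t := range buildTasks(locs, true) {
		fmt.Printf("store=%s region=%d batched=%d\n", t.storeAddr, t.regionID, len(t.batchTaskList))
	}
}
```

With batching disabled the same input yields one task per region, which matches the current behavior described earlier.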
|
|
||
| The coprocessor client sends the tasks as usual; the `Coprocessor` request is still a unary RPC | |
| request even though it may be batched. When handling `CopResponse`, if the batch path is enabled and | |
| there are region errors or other errors from processing batch tasks, reschedule the cop tasks or | |
| report the errors to the upper layer. | |
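
One possible shape for this response handling is sketched below. It is an assumption-laden illustration rather than the real client code: `batchResponse` is a simplified, hypothetical view of `StoreBatchTaskResponse` (errors are reduced to strings and lock handling is omitted), and `handleBatchResponses` is a made-up name.

```go
package main

import (
	"errors"
	"fmt"
)

// batchResponse is a simplified, hypothetical view of StoreBatchTaskResponse.
type batchResponse struct {
	taskID      uint64
	data        []byte
	regionError string // non-empty means a region error occurred
	otherError  string // non-empty means a non-retryable error occurred
}

// handleResult separates usable data from tasks that need rescheduling.
type handleResult struct {
	data  [][]byte // data from successful batched tasks
	retry []uint64 // IDs of tasks to reschedule after a region error
}

// handleBatchResponses reschedules tasks that hit region errors and reports
// any other error to the caller (the "upper layer" in the text).
func handleBatchResponses(resps []batchResponse) (handleResult, error) {
	var res handleResult
	for _, r := range resps {
		switch {
		case r.otherError != "":
			return handleResult{}, errors.New(r.otherError)
		case r.regionError != "":
			res.retry = append(res.retry, r.taskID)
		default:
			res.data = append(res.data, r.data)
		}
	}
	return res, nil
}

func main() {
	res, err := handleBatchResponses([]batchResponse{
		{taskID: 1, data: []byte("rows-1")},
		{taskID: 2, regionError: "epoch not match"},
	})
	fmt.Println(len(res.data), res.retry, err)
}
```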
|
|
||
| Note that if `keepOrder` is required, the partial query results cannot be sent back until all the reads | |
| have succeeded. | |
|
|
||
|
|
||
|
|
||
| ### The TiKV Side | ||
|
|
||
| A simple way is to change the logic in `Endpoint.parse_and_handle_unary_request`. After parsing the | |
| original request, the builders and handlers for the batched tasks can also be generated using the input | |
| information from the RPC context, region information, and key ranges, as long as they are properly passed in | |
| the `Coprocessor` request. | |
|
|
||
| All the request handling can be scheduled to the read pool at the same time, | |
| so something like `join_all` is needed to wait for the results of all the | |
| different tasks before finishing. If any error is returned, fill in the error fields in the `Response`. | |
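
TiKV is written in Rust, so the real change would use Rust futures. The Go sketch below only illustrates the wait-for-all pattern described above, using `sync.WaitGroup` in place of `join_all`; `runAll`, `taskResult`, and `fakeHandle` are hypothetical names, and errors are recorded per task instead of aborting the whole request.

```go
package main

import (
	"fmt"
	"sync"
)

// taskResult is a hypothetical per-task result carrying either data or an error.
type taskResult struct {
	taskID uint64
	data   string
	err    string
}

// runAll schedules every task concurrently and waits for all of them,
// keeping results in the same order as the input task IDs.
func runAll(taskIDs []uint64, handle func(uint64) (string, error)) []taskResult {
	results := make([]taskResult, len(taskIDs))
	var wg sync.WaitGroup
	for i, id := range taskIDs {
		wg.Add(1)
		go func(i int, id uint64) {
			defer wg.Done()
			data, err := handle(id)
			results[i] = taskResult{taskID: id, data: data} // each goroutine writes its own slot
			if err != nil {
				results[i].err = err.Error() // fill the error field for this task only
			}
		}(i, id)
	}
	wg.Wait() // plays the role of join_all
	return results
}

// fakeHandle is a hypothetical handler that fails for task 2.
func fakeHandle(id uint64) (string, error) {
	if id == 2 {
		return "", fmt.Errorf("region error for task %d", id)
	}
	return fmt.Sprintf("rows-%d", id), nil
}

func main() {
	for _, r := range runAll([]uint64{1, 2, 3}, fakeHandle) {
		fmt.Printf("task=%d data=%q err=%q\n", r.taskID, r.data, r.err)
	}
}
```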
|
|
||
| For execution tracking, create separate trackers for the requests; all the execution details are returned | |
| to the client. | |