Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
774 changes: 738 additions & 36 deletions DEPS.bzl

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,10 @@ replace (
// Downgrade grpc to v1.63.2, as well as other related modules.
github.com/apache/arrow-go/v18 => github.com/joechenrh/arrow-go/v18 v18.0.0-20250911101656-62c34c9a3b82
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
// Temporary: pin to JmPotato/kvproto demo/ru-paging-size which adds
// `paging_size_bytes` to coprocessor.Request. Revert once
// pingcap/kvproto#1448 is merged and tagged.
github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20260428063603-042bfc75f2ac
github.com/pingcap/tidb/pkg/parser => ./pkg/parser
// TODO: `sourcegraph.com/sourcegraph/appdash` has been archived, and the original host has been removed.
// Please remove these dependencies.
Expand Down
1,477 changes: 1,469 additions & 8 deletions go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/distsql/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type DistSQLContext struct {
EnablePaging bool
MinPagingSize int
MaxPagingSize int
PagingSizeBytes int
RCNonBurstable bool
RequestSourceType string
ExplicitRequestSourceType string
StoreBatchSize int
Expand Down
2 changes: 2 additions & 0 deletions pkg/distsql/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func TestContextDetach(t *testing.T) {
EnablePaging: true,
MinPagingSize: 1,
MaxPagingSize: 1,
PagingSizeBytes: 1,
RCNonBurstable: true,
RequestSourceType: "a",
ExplicitRequestSourceType: "b",
StoreBatchSize: 1,
Expand Down
2 changes: 2 additions & 0 deletions pkg/distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ func (builder *RequestBuilder) SetFromSessionVars(dctx *distsqlctx.DistSQLContex
builder.SetPaging(dctx.EnablePaging)
builder.Request.Paging.MinPagingSize = uint64(dctx.MinPagingSize)
builder.Request.Paging.MaxPagingSize = uint64(dctx.MaxPagingSize)
builder.Request.Paging.PagingSizeBytes = uint64(dctx.PagingSizeBytes)
builder.Request.Paging.RCNonBurstable = dctx.RCNonBurstable
}
builder.RequestSource.RequestSourceInternal = dctx.InRestrictedSQL
builder.RequestSource.RequestSourceType = dctx.RequestSourceType
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,15 @@ type Request struct {
// when enabled, this field is adjusted to be max(MaxPagingSize, paging.MinAllowedMaxPagingSize),
// see paging.GrowPagingSize
MaxPagingSize uint64
// PagingSizeBytes is the byte budget per page.
// 0 means disabled (no byte-budget paging).
PagingSizeBytes uint64
// RCNonBurstable is true when the request is bound to a Resource
// Control resource group whose adjusted BurstLimit is non-negative
// (hard-capped at RU_PER_SEC). Used as an additional gate for
// PagingSizeBytes: byte-budget paging only delivers RU value for
// RC-capped groups.
RCNonBurstable bool
}
// RequestSource indicates whether the request is an internal request.
RequestSource util.RequestSource
Expand Down
13 changes: 13 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3466,6 +3466,17 @@ func (s *session) GetDistSQLCtx() *distsqlctx.DistSQLContext {
ruConsumptionReporter = rgCtl
}
}
// rcNonBurstable gates byte-budget paging: only forward
// PagingSizeBytes when Resource Control is enabled and the current
// resource group is hard-capped at RU_PER_SEC. The default group
// is treated as a normal group; if its BurstLimit has been altered
// to a non-negative value, byte-budget paging will trigger.
rcNonBurstable := false
if vardef.EnableResourceControl.Load() && dom != nil && sc.ResourceGroupName != "" {
if rg, ok := dom.InfoSchema().ResourceGroupByName(ast.NewCIStr(sc.ResourceGroupName)); ok {
rcNonBurstable = rg.GetBurstLimitAdjusted() >= 0
}
}
return &distsqlctx.DistSQLContext{
WarnHandler: sc.WarnHandler,
InRestrictedSQL: sc.InRestrictedSQL,
Expand Down Expand Up @@ -3505,6 +3516,8 @@ func (s *session) GetDistSQLCtx() *distsqlctx.DistSQLContext {
EnablePaging: vars.EnablePaging,
MinPagingSize: vars.MinPagingSize,
MaxPagingSize: vars.MaxPagingSize,
PagingSizeBytes: vars.PagingSizeBytes,
RCNonBurstable: rcNonBurstable,
RequestSourceType: vars.RequestSourceType,
ExplicitRequestSourceType: vars.ExplicitRequestSourceType,
StoreBatchSize: vars.StoreBatchSize,
Expand Down
9 changes: 9 additions & 0 deletions pkg/sessionctx/vardef/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,14 @@ const (
// TiDBMaxPagingSize is used to control the max paging size in the coprocessor paging protocol.
TiDBMaxPagingSize = "tidb_max_paging_size"

// TiDBPagingSizeBytes is the byte budget per coprocessor page.
// When non-zero, TiKV stops scanning a page once accumulated response bytes
// reach this limit. Setting a non-zero value implicitly enables row-count
// paging if it is not already enabled, because the TiKV paging protocol
// requires it.
// 0 means disabled (no byte-budget paging).
TiDBPagingSizeBytes = "tidb_paging_size_bytes"

// TiDBEnableCascadesPlanner is used to control whether to enable the cascades planner.
TiDBEnableCascadesPlanner = "tidb_enable_cascades_planner"

Expand Down Expand Up @@ -1491,6 +1499,7 @@ const (
DefInitChunkSize = 32
DefMinPagingSize = int(paging.MinPagingSize)
DefMaxPagingSize = int(paging.MinAllowedMaxPagingSize)
DefPagingSizeBytes = 0
DefMaxChunkSize = 1024
DefDMLBatchSize = 0
DefMaxPreparedStmtCount = -1
Expand Down
5 changes: 5 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1749,6 +1749,10 @@ type SessionVars struct {
// NOTE: please don't change it directly. Use `SetResourceGroupName`, because it'll need to inc/dec the metrics
ResourceGroupName string

// PagingSizeBytes is the byte budget per page.
// 0 means disabled.
PagingSizeBytes int

// PessimisticTransactionFairLocking controls whether fair locking for pessimistic transaction
// is enabled.
PessimisticTransactionFairLocking bool
Expand Down Expand Up @@ -2468,6 +2472,7 @@ func NewSessionVars(hctx HookContext) *SessionVars {
EnableLateMaterialization: vardef.DefTiDBOptEnableLateMaterialization,
TiFlashComputeDispatchPolicy: tiflashcompute.DispatchPolicyConsistentHash,
ResourceGroupName: resourcegroup.DefaultResourceGroupName,
PagingSizeBytes: vardef.DefPagingSizeBytes,
DefaultCollationForUTF8MB4: mysql.DefaultCollationName,
GroupConcatMaxLen: vardef.DefGroupConcatMaxLen,
EnableRedactLog: vardef.DefTiDBRedactLog,
Expand Down
1 change: 1 addition & 0 deletions pkg/sessionctx/variable/setvar_affect.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ var isHintUpdatableVerified = map[string]struct{}{
"tidb_max_chunk_size": {},
"tidb_min_paging_size": {},
"tidb_max_paging_size": {},
"tidb_paging_size_bytes": {},
"tidb_enable_cascades_planner": {},
"tidb_merge_join_concurrency": {},
"tidb_index_merge_intersection_concurrency": {},
Expand Down
4 changes: 4 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2982,6 +2982,10 @@ var defaultSysVars = []*SysVar{
s.MaxPagingSize = tidbOptPositiveInt32(val, vardef.DefMaxPagingSize)
return nil
}},
{Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: vardef.TiDBPagingSizeBytes, Value: strconv.Itoa(vardef.DefPagingSizeBytes), Type: vardef.TypeUnsigned, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.PagingSizeBytes = int(TidbOptInt64(val, int64(vardef.DefPagingSizeBytes)))
return nil
}},
{Scope: vardef.ScopeSession, Name: vardef.TiDBMemoryDebugModeMinHeapInUse, Value: strconv.Itoa(0), Type: vardef.TypeInt, MinValue: math.MinInt64, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.MemoryDebugModeMinHeapInUse = TidbOptInt64(val, 0)
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ go_test(
embed = [":copr"],
flaky = True,
race = "on",
shard_count = 40,
shard_count = 42,
deps = [
"//pkg/kv",
"//pkg/store/driver/backoff",
Expand Down
8 changes: 5 additions & 3 deletions pkg/store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,11 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
Concurrency: 15,
StoreBatchSize: 3,
Paging: struct {
Enable bool
MinPagingSize uint64
MaxPagingSize uint64
Enable bool
MinPagingSize uint64
MaxPagingSize uint64
PagingSizeBytes uint64
RCNonBurstable bool
}{
Enable: true,
MinPagingSize: 1,
Expand Down
62 changes: 53 additions & 9 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,23 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
// coprocessor request but type is not DAG
req.Paging.Enable = false
}
// Byte-budget paging: if the request is eligible and has a byte budget,
// ensure paging is enabled so each page's scanned bytes are bounded.
// The TiKV coprocessor protocol requires paging to be enabled for the
// PagingSizeBytes field to take effect. When force-enabling paging, we
// use minimal row-count parameters so the byte budget becomes the
// dominant page-break signal.
// This must happen before checkStoreBatchCopr, which disables batch copr
// when paging is enabled.
pagingSizeBytes := uint64(0)
if pagingBytesEligible(req) && req.Paging.PagingSizeBytes > 0 {
if !req.Paging.Enable {
req.Paging.Enable = true
req.Paging.MinPagingSize = paging.MinPagingSize
req.Paging.MaxPagingSize = paging.MinAllowedMaxPagingSize
}
pagingSizeBytes = req.Paging.PagingSizeBytes
}
failpoint.Inject("checkKeyRangeSortedForPaging", func(_ failpoint.Value) {
if req.Paging.Enable {
if !req.KeyRanges.IsFullySorted() {
Expand Down Expand Up @@ -160,11 +177,12 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
tryRowHint := optRowHint(req)
elapsed := time.Duration(0)
buildOpt := &buildCopTaskOpt{
req: req,
cache: c.store.GetRegionCache(),
eventCb: eventCb,
respChan: req.KeepOrder,
elapsed: &elapsed,
req: req,
cache: c.store.GetRegionCache(),
eventCb: eventCb,
respChan: req.KeepOrder,
elapsed: &elapsed,
pagingSizeBytes: pagingSizeBytes,
}
buildTaskFunc := func(ranges []kv.KeyRange, hints []int) error {
keyRanges := NewKeyRanges(ranges)
Expand Down Expand Up @@ -287,10 +305,11 @@ type copTask struct {
cmdType tikvrpc.CmdType
storeType kv.StoreType

eventCb trxevents.EventCallback
paging bool
pagingSize uint64
pagingTaskIdx uint32
eventCb trxevents.EventCallback
paging bool
pagingSize uint64
pagingSizeBytes uint64
pagingTaskIdx uint32

partitionIndex int64 // used by balanceBatchCopTask in PartitionTableScan
requestSource util.RequestSource
Expand Down Expand Up @@ -367,6 +386,9 @@ type buildCopTaskOpt struct {
skipBuckets bool
// exceedsBoundRetry propagates bounded retry attempts to generated tasks.
exceedsBoundRetry int
// pagingSizeBytes is the byte budget per page.
// 0 means no byte-based limit.
pagingSizeBytes uint64
}

const (
Expand Down Expand Up @@ -690,6 +712,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
eventCb: eventCb,
paging: req.Paging.Enable,
pagingSize: pagingSize,
pagingSizeBytes: opt.pagingSizeBytes,
requestSource: req.RequestSource,
RowCountHint: hint,
busyThreshold: req.StoreBusyThreshold,
Expand Down Expand Up @@ -717,6 +740,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
// disable paging for small limit.
task.paging = false
task.pagingSize = 0
task.pagingSizeBytes = 0
} else {
pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize)
}
Expand Down Expand Up @@ -1662,6 +1686,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask) (*
Ranges: task.ranges.ToPBRanges(),
SchemaVer: worker.req.SchemaVar,
PagingSize: task.pagingSize,
PagingSizeBytes: task.pagingSizeBytes,
Tasks: task.ToPBBatchTasks(),
ConnectionId: worker.req.ConnID,
ConnectionAlias: worker.req.ConnAlias,
Expand Down Expand Up @@ -1850,6 +1875,7 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti
// If there is region error or lock error, keep the paging size and retry.
for _, remainedTask := range result.remains {
remainedTask.pagingSize = task.pagingSize
remainedTask.pagingSizeBytes = task.pagingSizeBytes
}
return result, nil
}
Expand Down Expand Up @@ -2889,6 +2915,24 @@ func optRowHint(req *kv.Request) bool {
return opt
}

// pagingBytesEligible checks whether byte-budget paging should be applied.
// Only DAG requests on TiKV bound to a Resource Control group whose
// BurstLimit is non-negative (hard-capped at RU_PER_SEC) are eligible:
// for burstable/unlimited groups (or when RC is disabled) the per-page
// byte break adds RPC overhead without bounding any token budget.
func pagingBytesEligible(req *kv.Request) bool {
if req.StoreType != kv.TiKV {
return false
}
if req.Tp != kv.ReqTypeDAG {
return false
}
if !req.Paging.RCNonBurstable {
return false
}
return true
}

func checkStoreBatchCopr(req *kv.Request) bool {
if req.Tp != kv.ReqTypeDAG || req.StoreType != kv.TiKV {
return false
Expand Down
72 changes: 72 additions & 0 deletions pkg/store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,78 @@ func TestBuildPagingTasksDisablePagingForSmallLimit(t *testing.T) {
require.Equal(t, tasks[0].pagingSize, uint64(0))
}

func TestPagingBytesEligible(t *testing.T) {
// Eligible: TiKV + DAG + RC-capped group.
req := &kv.Request{
Tp: kv.ReqTypeDAG,
StoreType: kv.TiKV,
}
req.Paging.RCNonBurstable = true
require.True(t, pagingBytesEligible(req))

// Not eligible: TiFlash.
req2 := &kv.Request{Tp: kv.ReqTypeDAG, StoreType: kv.TiFlash}
req2.Paging.RCNonBurstable = true
require.False(t, pagingBytesEligible(req2))

// Not eligible: non-DAG.
req3 := &kv.Request{Tp: kv.ReqTypeAnalyze, StoreType: kv.TiKV}
req3.Paging.RCNonBurstable = true
require.False(t, pagingBytesEligible(req3))

// Not eligible: RC disabled or current group is burstable/unlimited.
req4 := &kv.Request{Tp: kv.ReqTypeDAG, StoreType: kv.TiKV}
require.False(t, pagingBytesEligible(req4))
}

func TestBuildCopTasksWithPagingSizeBytes(t *testing.T) {
mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
defer func() {
pdClient.Close()
err = mockClient.Close()
require.NoError(t, err)
}()
_, regionIDs, _ := testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))

pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()
cache := NewRegionCache(tikv.NewRegionCache(pdCli))
defer cache.Close()
bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)

req := &kv.Request{}
req.Paging.Enable = true
req.Paging.MinPagingSize = paging.MinPagingSize
req.Paging.RCNonBurstable = true

// With pagingSizeBytes set, tasks should carry the byte budget.
tasks, err := buildCopTasks(bo, buildCopRanges("a", "c"), &buildCopTaskOpt{
req: req,
cache: cache,
respChan: true,
pagingSizeBytes: uint64(4 * 1024 * 1024),
})
require.NoError(t, err)
require.Len(t, tasks, 1)
taskEqual(t, tasks[0], regionIDs[0], 0, "a", "c")
require.True(t, tasks[0].paging)
require.Equal(t, uint64(4*1024*1024), tasks[0].pagingSizeBytes)

// When small limit disables paging, pagingSizeBytes should also be cleared.
req.LimitSize = 1
tasks, err = buildCopTasks(bo, buildCopRanges("a", "c"), &buildCopTaskOpt{
req: req,
cache: cache,
respChan: true,
pagingSizeBytes: uint64(4 * 1024 * 1024),
})
require.NoError(t, err)
require.Len(t, tasks, 1)
require.False(t, tasks[0].paging)
require.Equal(t, uint64(0), tasks[0].pagingSizeBytes)
}

func toCopRange(r kv.KeyRange) *coprocessor.KeyRange {
coprRange := coprocessor.KeyRange{}
coprRange.Start = r.StartKey
Expand Down
22 changes: 22 additions & 0 deletions tests/integrationtest/r/sessionctx/setvar.result
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,28 @@ set @@tidb_max_paging_size=default;
select @@tidb_max_paging_size;
@@tidb_max_paging_size
50000
select /*+ set_var(tidb_paging_size_bytes=1048576) */ @@tidb_paging_size_bytes;
@@tidb_paging_size_bytes
1048576
select @@tidb_paging_size_bytes;
@@tidb_paging_size_bytes
0
set @@tidb_paging_size_bytes=default;
select @@tidb_paging_size_bytes;
@@tidb_paging_size_bytes
0
set @@tidb_paging_size_bytes=0;
select @@tidb_paging_size_bytes;
@@tidb_paging_size_bytes
0
set @@tidb_paging_size_bytes=4194304;
select @@tidb_paging_size_bytes;
@@tidb_paging_size_bytes
4194304
set @@tidb_paging_size_bytes=default;
select @@tidb_paging_size_bytes;
@@tidb_paging_size_bytes
0
select /*+ set_var(tidb_enable_cascades_planner=0) */ @@tidb_enable_cascades_planner;
@@tidb_enable_cascades_planner
0
Expand Down
Loading
Loading