Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ func (do *Domain) EtcdClient() *clientv3.Client {
return do.etcdClient
}

var (
loadSchemaCounterSnapshot = metrics.LoadSchemaCounter.WithLabelValues("snapshot")
loadSchemaDurationTotal = metrics.LoadSchemaDuration.WithLabelValues("total")
loadSchemaDurationLoadDiff = metrics.LoadSchemaDuration.WithLabelValues("load-diff")
loadSchemaDurationLoadAll = metrics.LoadSchemaDuration.WithLabelValues("load-all")
)

// loadInfoSchema loads infoschema at startTS.
// It returns:
// 1. the needed infoschema
Expand All @@ -172,6 +179,10 @@ func (do *Domain) EtcdClient() *clientv3.Client {
// 4. the changed table IDs if it is not full load
// 5. an error if any
func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, int64, *transaction.RelatedSchemaChange, error) {
beginTime := time.Now()
defer func() {
loadSchemaDurationTotal.Observe(time.Since(beginTime).Seconds())
}()
snapshot := do.store.GetSnapshot(kv.NewVersion(startTS))
m := meta.NewSnapshotMeta(snapshot)
neededSchemaVersion, err := m.GetSchemaVersionWithNonEmptyDiff()
Expand Down Expand Up @@ -207,6 +218,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 {
is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
if err == nil {
loadSchemaDurationLoadDiff.Observe(time.Since(startTime).Seconds())
do.infoCache.Insert(is, uint64(schemaTs))
logutil.BgLogger().Info("diff load InfoSchema success",
zap.Int64("currentSchemaVersion", currentSchemaVersion),
Expand Down Expand Up @@ -234,6 +246,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
if err != nil {
return nil, false, currentSchemaVersion, nil, err
}
loadSchemaDurationLoadAll.Observe(time.Since(startTime).Seconds())
logutil.BgLogger().Info("full load InfoSchema success",
zap.Int64("currentSchemaVersion", currentSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion),
Expand Down Expand Up @@ -415,6 +428,7 @@ func (do *Domain) GetSnapshotInfoSchema(snapshotTS uint64) (infoschema.InfoSchem
return is, nil
}
is, _, _, _, err := do.loadInfoSchema(snapshotTS)
loadSchemaCounterSnapshot.Inc()
return is, err
}

Expand Down Expand Up @@ -496,7 +510,6 @@ func (do *Domain) Reload() error {
}

is, hitCache, oldSchemaVersion, changes, err := do.loadInfoSchema(ver.Ver)
metrics.LoadSchemaDuration.Observe(time.Since(startTime).Seconds())
if err != nil {
metrics.LoadSchemaCounter.WithLabelValues("failed").Inc()
return err
Expand Down Expand Up @@ -926,7 +939,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
exit: make(chan struct{}),
sysSessionPool: newSessionPool(capacity, factory),
statsLease: statsLease,
infoCache: infoschema.NewCache(16),
infoCache: infoschema.NewCache(int(variable.SchemaVersionCacheLimit.Load())),
slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500),
indexUsageSyncLease: idxUsageSyncLease,
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{GetPlanReplayerDirName(), GetOptimizerTraceDirName()}},
Expand Down
1 change: 1 addition & 0 deletions domain/sysvar_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,5 +158,6 @@ func (do *Domain) rebuildSysVarCache(ctx sessionctx.Context) error {
defer do.sysVarCache.Unlock()
do.sysVarCache.session = newSessionCache
do.sysVarCache.global = newGlobalCache
do.infoCache.ReSize(int(variable.SchemaVersionCacheLimit.Load()))
return nil
}
17 changes: 17 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,23 @@ func TestSetVar(t *testing.T) {
require.Equal(t, uint64(2), tk.Session().GetSessionVars().CDCWriteSource)
tk.MustExec("set @@session.tidb_cdc_write_source = 0")
require.Equal(t, uint64(0), tk.Session().GetSessionVars().CDCWriteSource)

// test tidb_schema_version_cache_limit
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("64"))
tk.MustExec("set @@global.tidb_schema_version_cache_limit=64;")
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("64"))
tk.MustExec("set @@global.tidb_schema_version_cache_limit=2;")
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("2"))
tk.MustExec("set @@global.tidb_schema_version_cache_limit=256;")
tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_schema_version_cache_limit value: '256'"))
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("255"))
tk.MustExec("set @@global.tidb_schema_version_cache_limit=0;")
tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_schema_version_cache_limit value: '0'"))
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("2"))
tk.MustGetErrMsg("set @@global.tidb_schema_version_cache_limit='x';", "[variable:1232]Incorrect argument type to variable 'tidb_schema_version_cache_limit'")
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("2"))
tk.MustExec("set @@global.tidb_schema_version_cache_limit=64;")
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("64"))
}

func TestGetSetNoopVars(t *testing.T) {
Expand Down
24 changes: 24 additions & 0 deletions infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,30 @@ func NewCache(capacity int) *InfoCache {
}
}

// ReSize re-size the cache.
func (h *InfoCache) ReSize(capacity int) {
h.mu.Lock()
defer h.mu.Unlock()
if cap(h.cache) == capacity {
return
}
oldCache := h.cache
h.cache = make([]schemaAndTimestamp, 0, capacity)
for i, v := range oldCache {
if i >= capacity {
break
}
h.cache = append(h.cache, v)
}
}

// Size returns the size of the cache, export for test.
func (h *InfoCache) Size() int {
h.mu.Lock()
defer h.mu.Unlock()
return len(h.cache)
}

// Reset resets the cache.
func (h *InfoCache) Reset(capacity int) {
h.mu.Lock()
Expand Down
34 changes: 34 additions & 0 deletions infoschema/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,37 @@ func TestGetByTimestamp(t *testing.T) {
require.Equal(t, is3, ic.GetBySnapshotTS(3))
require.Equal(t, 3, ic.Len())
}

func TestReSize(t *testing.T) {
ic := infoschema.NewCache(2)
require.NotNil(t, ic)
is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1)
ic.Insert(is1, 1)
is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2)
ic.Insert(is2, 2)

ic.ReSize(3)
require.Equal(t, 2, ic.Size())
require.Equal(t, is1, ic.GetByVersion(1))
require.Equal(t, is2, ic.GetByVersion(2))
is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3)
require.True(t, ic.Insert(is3, 3))
require.Equal(t, is1, ic.GetByVersion(1))
require.Equal(t, is2, ic.GetByVersion(2))
require.Equal(t, is3, ic.GetByVersion(3))

ic.ReSize(1)
require.Equal(t, 1, ic.Size())
require.Nil(t, ic.GetByVersion(1))
require.Nil(t, ic.GetByVersion(2))
require.Equal(t, is3, ic.GetByVersion(3))
require.False(t, ic.Insert(is2, 2))
require.Equal(t, 1, ic.Size())
is4 := infoschema.MockInfoSchemaWithSchemaVer(nil, 4)
require.True(t, ic.Insert(is4, 4))
require.Equal(t, 1, ic.Size())
require.Nil(t, ic.GetByVersion(1))
require.Nil(t, ic.GetByVersion(2))
require.Nil(t, ic.GetByVersion(3))
require.Equal(t, is4, ic.GetByVersion(4))
}
4 changes: 2 additions & 2 deletions metrics/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ var (
}, []string{LblType})

// LoadSchemaDuration records the duration of load schema.
LoadSchemaDuration = prometheus.NewHistogram(
LoadSchemaDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "domain",
Name: "load_schema_duration_seconds",
Help: "Bucketed histogram of processing time (s) in load schema.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 524s
})
}, []string{LblAction})

// InfoCacheCounters are the counters of get/hit.
InfoCacheCounters = prometheus.NewCounterVec(
Expand Down
132 changes: 122 additions & 10 deletions metrics/grafana/tidb.json
Original file line number Diff line number Diff line change
Expand Up @@ -11670,13 +11670,15 @@
"legend": {
"alignAsTable": true,
"avg": false,
"current": false,
"max": false,
"current": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"sort": "current",
"sortDesc": true,
"total": false,
"values": false
"values": true
},
"lines": true,
"linewidth": 1,
Expand All @@ -11696,10 +11698,10 @@
"steppedLine": false,
"targets": [
{
"expr": "histogram_quantile(0.99, sum(rate(tidb_domain_load_schema_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le, instance))",
"expr": "histogram_quantile(0.99, sum(rate(tidb_domain_load_schema_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le, action))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{instance}}",
"legendFormat": "{{action}}",
"metric": "",
"refId": "A",
"step": 10
Expand All @@ -11709,7 +11711,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Load Schema Duration",
"title": "Load Schema Action Duration",
"tooltip": {
"msResolution": false,
"shared": true,
Expand Down Expand Up @@ -11881,13 +11883,15 @@
"legend": {
"alignAsTable": true,
"avg": false,
"current": false,
"max": false,
"current": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"sort": "current",
"sortDesc": true,
"total": false,
"values": false
"values": true
},
"lines": true,
"linewidth": 1,
Expand Down Expand Up @@ -11944,7 +11948,7 @@
{
"format": "short",
"label": null,
"logBase": 10,
"logBase": 1,
"max": null,
"min": null,
"show": true
Expand Down Expand Up @@ -12075,6 +12079,114 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "TiDB schema cache operations per second.",
"editable": true,
"error": false,
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 0,
"fillGradient": 0,
"grid": {},
"gridPos": {
"h": 7,
"w": 12,
"x": 12,
"y": 24
},
"hiddenSeries": false,
"id": 314,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"sort": "current",
"sortDesc": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null as zero",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.11",
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(rate(tidb_domain_infocache_counters{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (action,type)",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "{{action}}-{{type}}",
"metric": "",
"refId": "A",
"step": 10
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Schema Cache OPS",
"tooltip": {
"msResolution": false,
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"repeat": null,
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2297,6 +2297,11 @@ var defaultSysVars = []*SysVar{
return nil
},
},
{Scope: ScopeGlobal, Name: TiDBSchemaVersionCacheLimit, Value: strconv.Itoa(DefTiDBSchemaVersionCacheLimit), Type: TypeInt, MinValue: 2, MaxValue: math.MaxUint8, AllowEmpty: true,
SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
SchemaVersionCacheLimit.Store(TidbOptInt64(val, DefTiDBSchemaVersionCacheLimit))
return nil
}},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
Loading