From 8d1a6abe680fc2f84c5f1dab598bbe49c725196d Mon Sep 17 00:00:00 2001 From: Zhigao TONG Date: Fri, 6 Mar 2026 14:23:04 +0800 Subject: [PATCH] mvservice grafana Signed-off-by: Zhigao TONG --- pkg/metrics/grafana/tidb.json | 304 ++++++++++++++++++ pkg/metrics/materialized_view.go | 27 +- pkg/mvservice/executor.go | 20 +- pkg/mvservice/metrics_reporter.go | 10 +- pkg/mvservice/mvservice_test.go | 39 ++- pkg/mvservice/service.go | 29 +- pkg/mvservice/service_helper.go | 15 +- pkg/mvservice/service_settings.go | 32 +- pkg/mvservice/task_handler_test.go | 45 +-- .../handler/mvhandler/mv_service_handler.go | 30 +- 10 files changed, 452 insertions(+), 99 deletions(-) diff --git a/pkg/metrics/grafana/tidb.json b/pkg/metrics/grafana/tidb.json index ad374b3ecf883..5bd20cd292d3f 100644 --- a/pkg/metrics/grafana/tidb.json +++ b/pkg/metrics/grafana/tidb.json @@ -24869,6 +24869,310 @@ ], "title": "Global Sort", "type": "row" + }, + { + "collapse": true, + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 23 + }, + "id": 23763573020, + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Current MV service and task executor counts by status type.", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 6, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 23763573021, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": null, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "repeat": null, + "seriesOverrides": [], + "spaceLength": 10, + "stack": true, + "steppedLine": false, + "targets": [ + { + "expr": "sum(tidb_mv_service_task_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (type)", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "MV Service Task Status", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "MV service scheduler and task executor events per second by type.", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 23763573022, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": null, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "repeat": null, + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(tidb_mv_service_run_event_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (type)", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "MV Service Run Events OPS", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "dashLength": 10, + "datasource": "${DS_TEST-CLUSTER}", + "description": "MV service operation duration for successful operations by type.", + "fill": 1, + "gridPos": { + "h": 6, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 23763573023, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": null, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "pointradius": 5, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "targets": [ + { + "expr": "histogram_quantile(1, sum(rate(tidb_mv_service_operation_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", result=\"success\"}[1m])) by (le, type))", + "legendFormat": "{{type}}-success", + "interval": "", + "exemplar": true, + "format": "time_series", + "intervalFactor": 2, + "refId": "A" + }, + { + "expr": "histogram_quantile(1, sum(rate(tidb_mv_service_operation_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", result=\"failed\"}[1m])) by (le, type))", + "legendFormat": "{{type}}-failed", + "interval": "", + "exemplar": true, + "format": "time_series", + "intervalFactor": 2, + "refId": "B", + "hide": false + } + ], + "thresholds": [], + "title": "MV Operation Duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "options": { + "alertThreshold": true + }, + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "pluginVersion": "7.5.17", + "yaxis": { + "align": false, + "alignLevel": null + }, + "timeRegions": [], + "bars": false, + "dashes": false, + "fillGradient": 0, + "percentage": false, + "points": false, + "repeat": null, + "stack": false, + "steppedLine": false, + "timeFrom": null, + "timeShift": null, + "hiddenSeries": false + } + ], + "repeat": null, + "repeatIteration": null, + "repeatRowId": null, + "showTitle": true, + "title": "Materialized View", + "titleSize": "h6", + "type": "row" } ], "refresh": "30s", diff --git a/pkg/metrics/materialized_view.go b/pkg/metrics/materialized_view.go index 0fcd5d8c115bd..1fee0ea484427 100644 --- a/pkg/metrics/materialized_view.go +++ b/pkg/metrics/materialized_view.go @@ -19,15 +19,17 @@ import ( ) const ( - mvMetricRunEventTaskExecSubmitted = "exec_submitted" - mvMetricRunEventTaskExecFinished = "exec_finished" - mvMetricRunEventTaskExecFailed = "exec_failed" - mvMetricRunEventTaskExecTimeout = "exec_timeout" - mvMetricRunEventTaskExecRejected = "exec_rejected" + mvMetricRunEventTaskExecSubmitted = "exec_submitted" + mvMetricRunEventTaskExecFinished = "exec_finished" + mvMetricRunEventTaskExecFailed = "exec_failed" + mvMetricRunEventTaskExecTimeout = "exec_timeout" + mvMetricRunEventTaskExecRejected = "exec_rejected" + mvMetricRunEventTaskExecBackpressure = "exec_backpressure" mvMetricTaskStatusTaskExecRunning = "exec_running" mvMetricTaskStatusTaskExecWaiting = "exec_waiting" mvMetricTaskStatusTaskExecTimedOutRunning = "exec_timed_out_running" + mvMetricTaskStatusTaskExecBackpressureBlk = "exec_backpressure_blocked" mvMetricTaskStatusMVTotal = "mv_total" mvMetricTaskStatusMVLogTotal = "mvlog_total" @@ -43,15 +45,17 @@ var ( MVServiceRunEventCounterVec *prometheus.CounterVec - MVTaskExecutorSubmittedCounter prometheus.Counter - MVTaskExecutorFinishedCounter prometheus.Counter - MVTaskExecutorFailedCounter prometheus.Counter - MVTaskExecutorTimeoutCounter prometheus.Counter - MVTaskExecutorRejectedCounter prometheus.Counter + MVTaskExecutorSubmittedCounter prometheus.Counter + MVTaskExecutorFinishedCounter prometheus.Counter + MVTaskExecutorFailedCounter prometheus.Counter + MVTaskExecutorTimeoutCounter prometheus.Counter + MVTaskExecutorRejectedCounter prometheus.Counter + MVTaskExecutorBackpressureCounter prometheus.Counter MVTaskExecutorRunningTaskGauge prometheus.Gauge MVTaskExecutorWaitingTaskGauge prometheus.Gauge MVTaskExecutorTimedOutRunningTaskGauge prometheus.Gauge + MVTaskExecutorBackpressureBlockedGauge prometheus.Gauge MVServiceMVRefreshTotalGauge prometheus.Gauge MVServiceMVLogPurgeTotalGauge prometheus.Gauge @@ -91,10 +95,11 @@ func InitMVMetrics() { MVTaskExecutorFailedCounter = MVServiceRunEventCounterVec.WithLabelValues(mvMetricRunEventTaskExecFailed) MVTaskExecutorTimeoutCounter = MVServiceRunEventCounterVec.WithLabelValues(mvMetricRunEventTaskExecTimeout) MVTaskExecutorRejectedCounter = MVServiceRunEventCounterVec.WithLabelValues(mvMetricRunEventTaskExecRejected) + MVTaskExecutorBackpressureCounter = MVServiceRunEventCounterVec.WithLabelValues(mvMetricRunEventTaskExecBackpressure) MVTaskExecutorRunningTaskGauge = MVServiceTaskStatusGaugeVec.WithLabelValues(mvMetricTaskStatusTaskExecRunning) MVTaskExecutorWaitingTaskGauge = MVServiceTaskStatusGaugeVec.WithLabelValues(mvMetricTaskStatusTaskExecWaiting) MVTaskExecutorTimedOutRunningTaskGauge = MVServiceTaskStatusGaugeVec.WithLabelValues(mvMetricTaskStatusTaskExecTimedOutRunning) - + MVTaskExecutorBackpressureBlockedGauge = MVServiceTaskStatusGaugeVec.WithLabelValues(mvMetricTaskStatusTaskExecBackpressureBlk) MVServiceMVRefreshTotalGauge = MVServiceTaskStatusGaugeVec.WithLabelValues(mvMetricTaskStatusMVTotal) MVServiceMVLogPurgeTotalGauge = MVServiceTaskStatusGaugeVec.WithLabelValues(mvMetricTaskStatusMVLogTotal) MVServiceMVRefreshRunningGauge = MVServiceTaskStatusGaugeVec.WithLabelValues(mvMetricTaskStatusMVRefreshRun) diff --git a/pkg/mvservice/executor.go b/pkg/mvservice/executor.go index 5de8d84c9eee4..cff3392437940 100644 --- a/pkg/mvservice/executor.go +++ b/pkg/mvservice/executor.go @@ -35,16 +35,18 @@ type TaskExecutor struct { metrics struct { counters struct { - submittedCount atomic.Int64 - finishedCount atomic.Int64 - failedCount atomic.Int64 - timeoutCount atomic.Int64 - rejectedCount atomic.Int64 + submittedCount atomic.Int64 + finishedCount atomic.Int64 + failedCount atomic.Int64 + timeoutCount atomic.Int64 + rejectedCount atomic.Int64 + backpressureCount atomic.Int64 } gauges struct { runningCount atomic.Int64 waitingCount atomic.Int64 timedOutRunningCount atomic.Int64 + backpressureBlocked atomic.Int64 } } @@ -297,6 +299,7 @@ func (e *TaskExecutor) workerLoop() { // nextTask picks the next executable task. // It waits when the queue is empty, respects worker down-scaling, and applies backpressure. func (e *TaskExecutor) nextTask() (taskRequest, bool) { + inBackpressureWait := false for { e.queue.mu.Lock() for e.lifecycleState.Load() != taskExecutorStateClosed && e.queue.tasks.length() == 0 { @@ -314,12 +317,19 @@ func (e *TaskExecutor) nextTask() (taskRequest, bool) { e.queue.mu.Unlock() if blocked, delay := e.shouldBackpressure(); blocked { + if !inBackpressureWait { + e.metrics.counters.backpressureCount.Add(1) + inBackpressureWait = true + } + e.metrics.gauges.backpressureBlocked.Add(1) select { case <-e.closeCh: case <-mvsAfter(delay): } + e.metrics.gauges.backpressureBlocked.Add(-1) continue } + inBackpressureWait = false e.queue.mu.Lock() if e.shouldExitWorkerWithLock() { diff --git a/pkg/mvservice/metrics_reporter.go b/pkg/mvservice/metrics_reporter.go index b79e6513a6074..9aca977322e29 100644 --- a/pkg/mvservice/metrics_reporter.go +++ b/pkg/mvservice/metrics_reporter.go @@ -37,9 +37,11 @@ func (h *serviceHelper) reportMetrics(s *MVService) { reportCounterDelta(tidbmetrics.MVTaskExecutorFailedCounter, &h.reportCache.failedCount, s.executor.metrics.counters.failedCount.Load()) reportCounterDelta(tidbmetrics.MVTaskExecutorTimeoutCounter, &h.reportCache.timeoutCount, s.executor.metrics.counters.timeoutCount.Load()) reportCounterDelta(tidbmetrics.MVTaskExecutorRejectedCounter, &h.reportCache.rejectedCount, s.executor.metrics.counters.rejectedCount.Load()) + reportCounterDelta(tidbmetrics.MVTaskExecutorBackpressureCounter, &h.reportCache.backpressureCount, s.executor.metrics.counters.backpressureCount.Load()) tidbmetrics.MVTaskExecutorRunningTaskGauge.Set(float64(s.executor.metrics.gauges.runningCount.Load())) tidbmetrics.MVTaskExecutorWaitingTaskGauge.Set(float64(s.executor.metrics.gauges.waitingCount.Load())) tidbmetrics.MVTaskExecutorTimedOutRunningTaskGauge.Set(float64(s.executor.metrics.gauges.timedOutRunningCount.Load())) + tidbmetrics.MVTaskExecutorBackpressureBlockedGauge.Set(float64(s.executor.metrics.gauges.backpressureBlocked.Load())) // MVService metrics tidbmetrics.MVServiceMVRefreshTotalGauge.Set(float64(s.metrics.mvCount.Load())) @@ -56,14 +58,6 @@ func (h *serviceHelper) observeTaskDuration(taskType, result string, duration ti h.getDurationObserver(taskType, result).Observe(duration.Seconds()) } -// observeFetchDuration reports one metadata fetch duration sample. -func (h *serviceHelper) observeFetchDuration(fetchType, result string, duration time.Duration) { - if duration < 0 { - return - } - h.getDurationObserver(fetchType, result).Observe(duration.Seconds()) -} - // observeRunEvent increments one run-loop event counter. func (h *serviceHelper) observeRunEvent(eventType string) { if eventType == "" { diff --git a/pkg/mvservice/mvservice_test.go b/pkg/mvservice/mvservice_test.go index 8dcb9d822fe2c..3e0b61e4a7c41 100644 --- a/pkg/mvservice/mvservice_test.go +++ b/pkg/mvservice/mvservice_test.go @@ -269,10 +269,17 @@ func TestTaskExecutorBackpressure(t *testing.T) { t.Fatalf("task should still be blocked") default: } + require.Eventually(t, func() bool { + return exec.metrics.counters.backpressureCount.Load() > 0 + }, time.Second, time.Millisecond) + require.Equal(t, int64(1), exec.metrics.counters.backpressureCount.Load()) controller.blocked.Store(false) module.Advance(time.Second) waitForSignal(t, done, time.Hour) + require.Eventually(t, func() bool { + return exec.metrics.gauges.backpressureBlocked.Load() == 0 + }, time.Second, time.Millisecond) } func TestTaskExecutorBackpressureCheckDoesNotBlockSubmit(t *testing.T) { @@ -333,7 +340,7 @@ func TestTaskExecutorBackpressureCheckDoesNotBlockSubmit(t *testing.T) { waitStarted("t2") require.Eventually(t, func() bool { return exec.metrics.counters.finishedCount.Load() == 2 - }, time.Second, 10*time.Millisecond) + }, time.Second, time.Millisecond) } func TestTaskExecutorCloseInterruptsBackpressureWait(t *testing.T) { @@ -368,6 +375,10 @@ func TestTaskExecutorCloseInterruptsBackpressureWait(t *testing.T) { case <-time.After(time.Second): t.Fatal("close should interrupt backpressure wait") } + require.Eventually(t, func() bool { + return exec.metrics.gauges.backpressureBlocked.Load() == 0 + }, time.Second, time.Millisecond) + require.Greater(t, exec.metrics.counters.backpressureCount.Load(), int64(0)) } func TestCPUMemBackpressureController(t *testing.T) { @@ -433,7 +444,7 @@ func TestNewMVServiceConfig(t *testing.T) { require.Equal(t, cfg.TaskBackpressure, backpressureCfg) require.NotNil(t, svc.executor.backpressure.Load()) - require.Equal(t, cfg.FetchInterval, svc.fetchInterval) + require.Equal(t, cfg.FetchInterval, svc.GetFetchInterval()) require.Equal(t, cfg.BasicInterval, svc.basicInterval) require.Equal(t, cfg.ServerRefreshInterval, svc.serverRefreshInterval) require.Equal(t, cfg.ServerConsistentHashReplicas, svc.sch.chash.replicas) @@ -481,6 +492,20 @@ func TestNewMVServiceConfig(t *testing.T) { } func TestMVServiceUpdateConfigs(t *testing.T) { + t.Run("fetch_interval", func(t *testing.T) { + svc := NewMVService(context.Background(), mockSessionPool{}, &mockMVServiceHelper{}, DefaultMVServiceConfig()) + + err := svc.SetFetchInterval(2 * time.Second) + require.NoError(t, err) + require.Equal(t, 2*time.Second, svc.GetFetchInterval()) + + err = svc.SetFetchInterval(0) + require.Error(t, err) + + err = svc.SetFetchInterval(500 * time.Microsecond) + require.Error(t, err) + }) + t.Run("task_backpressure", func(t *testing.T) { svc := NewMVService(context.Background(), mockSessionPool{}, &mockMVServiceHelper{}, DefaultMVServiceConfig()) @@ -781,7 +806,7 @@ func (h *mvServiceTestHarness) waitFetchCycleSince(logCalls, viewCalls int32) { h.t.Helper() require.Eventually(h.t, func() bool { return h.helper.fetchLogsCalls.Load() > logCalls && h.helper.fetchViewCalls.Load() > viewCalls - }, time.Second, 10*time.Millisecond) + }, time.Second, time.Millisecond) } func (h *mvServiceTestHarness) waitRefreshTask(mvID int64) { @@ -799,7 +824,7 @@ func (h *mvServiceTestHarness) assertNoPending() { require.Eventually(h.t, func() bool { mvLogCount, mvCount := pendingTaskCounts(h.svc) return mvLogCount == 0 && mvCount == 0 - }, time.Second, 10*time.Millisecond) + }, time.Second, time.Millisecond) } func TestMVServiceFullChainSimulation(t *testing.T) { @@ -830,7 +855,7 @@ func TestMVServiceFullChainSimulation(t *testing.T) { require.Eventually(t, func() bool { return h.svc.executor.metrics.counters.finishedCount.Load() == 2 - }, time.Second, 10*time.Millisecond) + }, time.Second, time.Millisecond) assertTaskExecutorMetrics(t, h.svc.executor, 2, 2, 0, 0, 0, 0, 0, 0) require.Equal(t, 1, h.helper.taskDurationCount(mvTaskDurationTypeRefresh, mvDurationResultSuccess)) require.Equal(t, 1, h.helper.taskDurationCount(mvTaskDurationTypePurge, mvDurationResultSuccess)) @@ -860,7 +885,7 @@ func TestMVServiceFullChainSimulation(t *testing.T) { return h.svc.executor.metrics.counters.finishedCount.Load() >= finishedBase+2 && h.svc.executor.metrics.counters.failedCount.Load() >= failedBase+2 && h.svc.executor.metrics.counters.submittedCount.Load() >= submittedBase+2 - }, time.Second, 10*time.Millisecond) + }, time.Second, time.Millisecond) h.helper.refreshErr = nil h.helper.purgeErr = nil @@ -873,7 +898,7 @@ func TestMVServiceFullChainSimulation(t *testing.T) { return h.svc.executor.metrics.counters.finishedCount.Load() >= finishedBase+4 && h.svc.executor.metrics.counters.failedCount.Load() >= failedBase+2 && h.svc.executor.metrics.counters.submittedCount.Load() >= submittedBase+4 - }, time.Second, 10*time.Millisecond) + }, time.Second, time.Millisecond) require.Equal(t, 2, h.helper.taskDurationCount(mvTaskDurationTypeRefresh, mvDurationResultSuccess)) require.Equal(t, 2, h.helper.taskDurationCount(mvTaskDurationTypePurge, mvDurationResultSuccess)) diff --git a/pkg/mvservice/service.go b/pkg/mvservice/service.go index 004300673f705..46c0eb4b0e90d 100644 --- a/pkg/mvservice/service.go +++ b/pkg/mvservice/service.go @@ -35,7 +35,6 @@ type mvItem = Item[*mv] type MVMetricsReporter interface { reportMetrics(*MVService) observeTaskDuration(taskType, result string, duration time.Duration) - observeFetchDuration(fetchType, result string, duration time.Duration) observeRunEvent(eventType string) } @@ -59,7 +58,7 @@ type MVService struct { mh Helper - fetchInterval time.Duration + fetchIntervalMillis atomic.Int64 basicInterval time.Duration serverRefreshInterval time.Duration historyGCIntervalMillis atomic.Int64 @@ -120,13 +119,9 @@ const ( mvRunEventServerRefreshError = "server_refresh_error" mvRunEventFetchByDDL = "fetch_meta_trigger_ddl" mvRunEventFetchByInterval = "fetch_meta_trigger_interval" - mvRunEventFetchMLogOK = "fetch_mlog_ok" - mvRunEventFetchMLogErr = "fetch_mlog_error" - mvRunEventFetchMViewOK = "fetch_mviews_ok" - mvRunEventFetchMViewErr = "fetch_mviews_error" mvRunEventHistoryGCGetTSOErr = "history_gc_get_tso_error" - mvHistoryGCOwnerKey = "mv-history-gc" + mvHistoryGCOwnerKey = "gc-mv-op-hist" ) type mv struct { @@ -511,18 +506,16 @@ func (t *MVService) fetchAllTiDBMVLogPurge() (map[int64]*mvLog, error) { start := mvsNow() result := mvDurationResultSuccess defer func() { - t.mh.observeFetchDuration(mvFetchTypeMLogPurge, result, mvsSince(start)) + t.mh.observeTaskDuration(mvFetchTypeMLogPurge, result, mvsSince(start)) }() newPending, err := t.mh.fetchAllTiDBMVLogPurge(t.ctx, t.sysSessionPool) if err != nil { result = mvDurationResultFailed - t.mh.observeRunEvent(mvRunEventFetchMLogErr) fields := append(t.runtimeLogFields(), zap.Error(err)) logutil.BgLogger().Warn("fetch all mvlog purge tasks failed", fields...) return nil, err } - t.mh.observeRunEvent(mvRunEventFetchMLogOK) filterUnownedTasks(t.sch, newPending) return newPending, nil } @@ -532,18 +525,16 @@ func (t *MVService) fetchAllTiDBMVRefresh() (map[int64]*mv, error) { start := mvsNow() result := mvDurationResultSuccess defer func() { - t.mh.observeFetchDuration(mvFetchTypeMViewRefresh, result, mvsSince(start)) + t.mh.observeTaskDuration(mvFetchTypeMViewRefresh, result, mvsSince(start)) }() newPending, err := t.mh.fetchAllTiDBMVRefresh(t.ctx, t.sysSessionPool) if err != nil { result = mvDurationResultFailed - t.mh.observeRunEvent(mvRunEventFetchMViewErr) fields := append(t.runtimeLogFields(), zap.Error(err)) logutil.BgLogger().Warn("fetch all materialized view refresh tasks failed", fields...) return nil, err } - t.mh.observeRunEvent(mvRunEventFetchMViewOK) filterUnownedTasks(t.sch, newPending) return newPending, nil } @@ -715,6 +706,7 @@ func (t *MVService) Run() { // markFetchFailure records a synthetic lastMetaFetchMillis to control next fetch time. func (t *MVService) markFetchFailure(now time.Time, ddlTriggered bool) { + fetchInterval := t.GetFetchInterval() if !ddlTriggered { t.lastMetaFetchMillis.Store(now.UnixMilli()) return @@ -724,11 +716,11 @@ func (t *MVService) markFetchFailure(now time.Time, ddlTriggered bool) { if retryDelay <= 0 { retryDelay = defaultMVBasicInterval } - if retryDelay > t.fetchInterval { - retryDelay = t.fetchInterval + if retryDelay > fetchInterval { + retryDelay = fetchInterval } // next fetch time = lastMetaFetchMillis + fetchInterval = now + retryDelay - t.lastMetaFetchMillis.Store(now.Add(retryDelay - t.fetchInterval).UnixMilli()) + t.lastMetaFetchMillis.Store(now.Add(retryDelay - fetchInterval).UnixMilli()) } // shouldFetchMVMeta reports whether a periodic metadata refresh is due. @@ -737,16 +729,17 @@ func (t *MVService) shouldFetchMVMeta(now time.Time) bool { if last == 0 { return true } - return now.Sub(mvsUnixMilli(last)) >= t.fetchInterval + return now.Sub(mvsUnixMilli(last)) >= t.GetFetchInterval() } // nextFetchTime returns the next periodic metadata refresh time. func (t *MVService) nextFetchTime(now time.Time) time.Time { + fetchInterval := t.GetFetchInterval() last := t.lastMetaFetchMillis.Load() if last == 0 { return now } - next := mvsUnixMilli(last).Add(t.fetchInterval) + next := mvsUnixMilli(last).Add(fetchInterval) if next.Before(now) { return now } diff --git a/pkg/mvservice/service_helper.go b/pkg/mvservice/service_helper.go index f5600213fbd63..52c314a07935a 100644 --- a/pkg/mvservice/service_helper.go +++ b/pkg/mvservice/service_helper.go @@ -40,11 +40,12 @@ type serviceHelper struct { runEventCounterCache runEventCounterCache reportCache struct { - submittedCount int64 - finishedCount int64 - failedCount int64 - timeoutCount int64 - rejectedCount int64 + submittedCount int64 + finishedCount int64 + failedCount int64 + timeoutCount int64 + rejectedCount int64 + backpressureCount int64 } } @@ -330,7 +331,6 @@ func (*serviceHelper) fetchAllTiDBMVLogPurge(ctx context.Context, sysSessionPool const sql = `SELECT TIMESTAMPDIFF(SECOND, '1970-01-01 00:00:00', NEXT_TIME) as NEXT_TIME_SEC, MLOG_ID FROM mysql.tidb_mlog_purge_info WHERE NEXT_TIME IS NOT NULL` rows, err := execRCRestrictedSQLWithSessionPool(ctx, sysSessionPool, sql, nil) if err != nil { - logutil.BgLogger().Warn("fetch mysql.tidb_mlog_purge_info failed", zap.Error(err)) return nil, err } newPending := make(map[int64]*mvLog, len(rows)) @@ -358,7 +358,6 @@ func (*serviceHelper) fetchAllTiDBMVRefresh(ctx context.Context, sysSessionPool const sql = `SELECT TIMESTAMPDIFF(SECOND, '1970-01-01 00:00:00', NEXT_TIME) as NEXT_TIME_SEC, MVIEW_ID FROM mysql.tidb_mview_refresh_info WHERE NEXT_TIME IS NOT NULL` rows, err := execRCRestrictedSQLWithSessionPool(ctx, sysSessionPool, sql, nil) if err != nil { - logutil.BgLogger().Warn("fetch mysql.tidb_mview_refresh_info failed", zap.Error(err)) return nil, err } newPending := make(map[int64]*mv, len(rows)) @@ -389,7 +388,6 @@ func execRCRestrictedSQLWithSessionPool(ctx context.Context, sysSessionPool basi "get system session for restricted SQL failed", zap.String("sql", sql), zap.Int("param_count", len(params)), - zap.Any("params", params), zap.Error(err), ) return nil, err @@ -414,7 +412,6 @@ func execRCRestrictedSQLWithSession(ctx context.Context, sctx sessionctx.Context "execute restricted SQL failed", zap.String("sql", sql), zap.Int("param_count", len(params)), - zap.Any("params", params), zap.NamedError("context_error", ctx.Err()), zap.Error(err), ) diff --git a/pkg/mvservice/service_settings.go b/pkg/mvservice/service_settings.go index d4e7d785c7ad6..7f1cac1fa2d4c 100644 --- a/pkg/mvservice/service_settings.go +++ b/pkg/mvservice/service_settings.go @@ -117,10 +117,13 @@ func NewMVService(ctx context.Context, se basic.SessionPool, helper Helper, cfg ctx: ctx, mh: helper, - fetchInterval: cfg.FetchInterval, basicInterval: cfg.BasicInterval, serverRefreshInterval: cfg.ServerRefreshInterval, } + if err := mgr.SetFetchInterval(cfg.FetchInterval); err != nil { + panic(fmt.Sprintf("invalid MV service fetch interval config: fetch_interval=%s err=%v", + cfg.FetchInterval, err)) + } if err := mgr.SetHistoryGCConfig(cfg.HistoryGCInterval, cfg.HistoryGCRetention); err != nil { panic(fmt.Sprintf("invalid MV service history GC config: interval=%s retention=%s err=%v", cfg.HistoryGCInterval, cfg.HistoryGCRetention, err)) @@ -141,6 +144,33 @@ func (t *MVService) SetTaskExecConfig(maxConcurrency int, timeout time.Duration) t.executor.UpdateConfig(maxConcurrency, timeout) } +// SetFetchInterval sets metadata fetch interval. +func (t *MVService) SetFetchInterval(interval time.Duration) error { + if t == nil { + return fmt.Errorf("mv service is nil") + } + if interval <= 0 { + return fmt.Errorf("fetch interval must be positive") + } + if interval < time.Millisecond { + return fmt.Errorf("fetch interval must be at least 1ms") + } + t.fetchIntervalMillis.Store(interval.Milliseconds()) + return nil +} + +// GetFetchInterval returns metadata fetch interval. +func (t *MVService) GetFetchInterval() time.Duration { + if t == nil { + return defaultMVFetchInterval + } + interval := time.Duration(t.fetchIntervalMillis.Load()) * time.Millisecond + if interval <= 0 { + return defaultMVFetchInterval + } + return interval +} + // GetTaskExecConfig returns the current execution config for MV tasks. func (t *MVService) GetTaskExecConfig() (maxConcurrency int, timeout time.Duration) { return t.executor.GetConfig() diff --git a/pkg/mvservice/task_handler_test.go b/pkg/mvservice/task_handler_test.go index ee7e3e4833bf8..079cf2122770b 100644 --- a/pkg/mvservice/task_handler_test.go +++ b/pkg/mvservice/task_handler_test.go @@ -175,7 +175,7 @@ func waitExecutorFinishedCount(t *testing.T, svc *MVService, expected int64) { t.Helper() require.Eventually(t, func() bool { return svc.executor.metrics.counters.finishedCount.Load() == expected - }, time.Second, 10*time.Millisecond) + }, time.Second, time.Millisecond) } func setupPurgeMVLogMetaForTest(t *testing.T, se *recordingSessionContext, nextTimeRows []chunk.Row) { @@ -218,10 +218,9 @@ type mockMVServiceHelper struct { lastRefreshID int64 lastPurgeID int64 - metricsMu sync.Mutex - taskDurationCounts map[string]int - metaFetchDurationCount map[string]int - runEventCounts map[string]int + metricsMu sync.Mutex + taskDurationCounts map[string]int + runEventCounts map[string]int } func (m *mockMVServiceHelper) RefreshMV(_ context.Context, _ basic.SessionPool, mvID int64) (nextRefresh time.Time, err error) { @@ -281,18 +280,6 @@ func (m *mockMVServiceHelper) observeTaskDuration(taskType, result string, durat m.metricsMu.Unlock() } -func (m *mockMVServiceHelper) observeFetchDuration(fetchType, result string, duration time.Duration) { - if duration < 0 { - return - } - m.metricsMu.Lock() - if m.metaFetchDurationCount == nil { - m.metaFetchDurationCount = make(map[string]int) - } - m.metaFetchDurationCount[fetchType+"/"+result]++ - m.metricsMu.Unlock() -} - func (m *mockMVServiceHelper) taskDurationCount(taskType, result string) int { m.metricsMu.Lock() defer m.metricsMu.Unlock() @@ -302,7 +289,7 @@ func (m *mockMVServiceHelper) taskDurationCount(taskType, result string) int { func (m *mockMVServiceHelper) fetchDurationCount(fetchType, result string) int { m.metricsMu.Lock() defer m.metricsMu.Unlock() - return m.metaFetchDurationCount[fetchType+"/"+result] + return m.taskDurationCounts[fetchType+"/"+result] } func (m *mockMVServiceHelper) observeRunEvent(eventType string) { @@ -387,12 +374,10 @@ func TestMVServiceNotifyDDLChangeTriggersFetch(t *testing.T) { svc.NotifyDDLChange() require.Eventually(t, func() bool { return helper.fetchLogsCalls.Load() > 0 && helper.fetchViewCalls.Load() > 0 - }, time.Second, 20*time.Millisecond) + }, time.Second, time.Millisecond) require.Eventually(t, func() bool { - return helper.runEventCount(mvRunEventFetchByDDL) > 0 && - helper.runEventCount(mvRunEventFetchMLogOK) > 0 && - helper.runEventCount(mvRunEventFetchMViewOK) > 0 - }, time.Second, 20*time.Millisecond) + return helper.runEventCount(mvRunEventFetchByDDL) > 0 + }, time.Second, time.Millisecond) } func TestMVServiceMaintenanceTimerTriggersHistoryGC(t *testing.T) { @@ -425,17 +410,17 @@ func TestMVServiceMaintenanceTimerTriggersHistoryGC(t *testing.T) { require.Equal(t, int32(0), helper.historyGCCalls.Load()) require.Eventually(t, func() bool { return helper.runEventCount(mvRunEventServerRefreshOK) > 0 - }, time.Second, 10*time.Millisecond) + }, time.Second, time.Millisecond) module.Advance(defaultMVHistoryGCInterval + cfg.BasicInterval) require.Eventually(t, func() bool { return helper.historyGCCalls.Load() > 0 - }, time.Second, 10*time.Millisecond) + }, time.Second, time.Millisecond) require.Equal(t, helper.currentTSO, helper.lastHistoryGCCurrentTSO.Load()) require.Equal(t, int64(defaultMVHistoryGCRetention), helper.lastHistoryGCRetention.Load()) require.Eventually(t, func() bool { return helper.taskDurationCount(mvTaskDurationTypeHistoryGC, mvDurationResultSuccess) > 0 - }, time.Second, 10*time.Millisecond) + }, time.Second, time.Millisecond) } func TestMVServiceMaybeGCMVHistorySkipsWhenNotOwner(t *testing.T) { @@ -612,10 +597,6 @@ func TestMVServiceFetchAllMVMetaAvoidsPartialApplyOnFetchError(t *testing.T) { require.Equal(t, 1, helper.fetchDurationCount(mvFetchTypeMViewRefresh, mvDurationResultFailed)) require.Equal(t, 0, helper.fetchDurationCount(mvFetchTypeMLogPurge, mvDurationResultFailed)) require.Equal(t, 0, helper.fetchDurationCount(mvFetchTypeMViewRefresh, mvDurationResultSuccess)) - require.Equal(t, 1, helper.runEventCount(mvRunEventFetchMLogOK)) - require.Equal(t, 1, helper.runEventCount(mvRunEventFetchMViewErr)) - require.Equal(t, 0, helper.runEventCount(mvRunEventFetchMLogErr)) - require.Equal(t, 0, helper.runEventCount(mvRunEventFetchMViewOK)) svc.mvLogPurgeMu.Lock() _, hasOldMLog := svc.mvLogPurgeMu.pending[101] @@ -653,7 +634,7 @@ func TestMVServicePurgeMVLogTaskResult(t *testing.T) { _, ok := svc.mvLogPurgeMu.pending[l.ID] svc.mvLogPurgeMu.Unlock() return !ok - }, time.Second, 10*time.Millisecond) + }, time.Second, time.Millisecond) require.Equal(t, int64(0), l.retryCount.Load()) require.Equal(t, int64(0), svc.metrics.mvLogCount.Load()) }) @@ -709,7 +690,7 @@ func TestMVServiceRefreshMVTaskResult(t *testing.T) { _, ok := svc.mvRefreshMu.pending[m.ID] svc.mvRefreshMu.Unlock() return !ok - }, time.Second, 10*time.Millisecond) + }, time.Second, time.Millisecond) require.Equal(t, int64(0), m.retryCount.Load()) require.Equal(t, int64(0), svc.metrics.mvCount.Load()) }) diff --git a/pkg/server/handler/mvhandler/mv_service_handler.go b/pkg/server/handler/mvhandler/mv_service_handler.go index ab2d81aa94698..9f0eb58301b0c 100644 --- a/pkg/server/handler/mvhandler/mv_service_handler.go +++ b/pkg/server/handler/mvhandler/mv_service_handler.go @@ -28,14 +28,15 @@ import ( const ( mvServiceTaskMaxConcurrencyFormField = "task_max_concurrency" - mvServiceTaskTimeoutFormField = "task_timeout" + mvServiceTaskTimeoutFormField = "task_exec_timeout" + mvServiceFetchIntervalFormField = "meta_fetch_interval" mvServiceBackpressureCPUThresholdFormField = "backpressure_cpu_threshold" mvServiceBackpressureMemThresholdFormField = "backpressure_mem_threshold" - mvServiceBackpressureDelayFormField = "backpressure_delay" + mvServiceBackpressureDelayFormField = "backpressure_wait_delay" - mvServiceTaskFailRetryBaseDelayFormField = "task_fail_retry_base_delay" - mvServiceTaskFailRetryMaxDelayFormField = "task_fail_retry_max_delay" + mvServiceTaskFailRetryBaseDelayFormField = "task_retry_base_delay" + mvServiceTaskFailRetryMaxDelayFormField = "task_retry_max_delay" mvServiceHistoryGCIntervalFormField = "history_gc_interval" mvServiceHistoryGCRetentionFormField = "history_gc_retention" ) @@ -53,6 +54,8 @@ func NewMVServiceSettingsHandler(tool *handler.TikvHandlerTool) *MVServiceSettin type mvServiceRuntimeSettingsAccessor interface { GetTaskExecConfig() (maxConcurrency int, timeout time.Duration) SetTaskExecConfig(maxConcurrency int, timeout time.Duration) + GetFetchInterval() time.Duration + SetFetchInterval(interval time.Duration) error GetTaskBackpressureConfig() mvservice.TaskBackpressureConfig SetTaskBackpressureConfig(cfg mvservice.TaskBackpressureConfig) error @@ -67,6 +70,7 @@ type mvServiceRuntimeSettingsAccessor interface { type mvServiceRuntimeSettings struct { maxConcurrency int timeout time.Duration + fetchInterval time.Duration backpressureCfg mvservice.TaskBackpressureConfig @@ -87,6 +91,9 @@ var mvServiceSettingsFieldUpdaters = []settingsFieldUpdater{ newDurationSettingsFieldUpdater(mvServiceTaskTimeoutFormField, func(v time.Duration) bool { return v >= 0 }, func(settings *mvServiceRuntimeSettings, v time.Duration) { settings.timeout = v }), + newDurationSettingsFieldUpdater(mvServiceFetchIntervalFormField, nil, func(settings *mvServiceRuntimeSettings, v time.Duration) { + settings.fetchInterval = v + }), newFloat64SettingsFieldUpdater(mvServiceBackpressureCPUThresholdFormField, nil, func(settings *mvServiceRuntimeSettings, v float64) { settings.backpressureCfg.CPUThreshold = v }), @@ -113,12 +120,13 @@ var mvServiceSettingsFieldUpdaters = []settingsFieldUpdater{ // MVServiceSettingsResponse is MV service runtime settings response. type MVServiceSettingsResponse struct { TaskMaxConcurrency int `json:"task_max_concurrency"` - TaskTimeout string `json:"task_timeout"` + TaskTimeout string `json:"task_exec_timeout"` + FetchInterval string `json:"meta_fetch_interval"` BackpressureCPUThreshold float64 `json:"backpressure_cpu_threshold"` BackpressureMemThreshold float64 `json:"backpressure_mem_threshold"` - BackpressureDelay string `json:"backpressure_delay"` - TaskFailRetryBaseDelay string `json:"task_fail_retry_base_delay"` - TaskFailRetryMaxDelay string `json:"task_fail_retry_max_delay"` + BackpressureDelay string `json:"backpressure_wait_delay"` + TaskFailRetryBaseDelay string `json:"task_retry_base_delay"` + TaskFailRetryMaxDelay string `json:"task_retry_max_delay"` HistoryGCInterval string `json:"history_gc_interval"` HistoryGCRetention string `json:"history_gc_retention"` } @@ -190,12 +198,14 @@ func (MVServiceSettingsHandler) servePost(w http.ResponseWriter, req *http.Reque // loadMVServiceRuntimeSettings reads all mutable runtime settings from the service. func loadMVServiceRuntimeSettings(mvService mvServiceRuntimeSettingsAccessor) mvServiceRuntimeSettings { maxConcurrency, timeout := mvService.GetTaskExecConfig() + fetchInterval := mvService.GetFetchInterval() backpressureCfg := mvService.GetTaskBackpressureConfig() retryBase, retryMax := mvService.GetRetryDelayConfig() historyGCInterval, historyGCRetention := mvService.GetHistoryGCConfig() return mvServiceRuntimeSettings{ maxConcurrency: maxConcurrency, timeout: timeout, + fetchInterval: fetchInterval, backpressureCfg: backpressureCfg, retryBase: retryBase, retryMax: retryMax, @@ -209,6 +219,7 @@ func writeMVServiceSettingsResponse(w http.ResponseWriter, settings mvServiceRun handler.WriteData(w, MVServiceSettingsResponse{ TaskMaxConcurrency: settings.maxConcurrency, TaskTimeout: settings.timeout.String(), + FetchInterval: settings.fetchInterval.String(), BackpressureCPUThreshold: settings.backpressureCfg.CPUThreshold, BackpressureMemThreshold: settings.backpressureCfg.MemThreshold, BackpressureDelay: settings.backpressureCfg.Delay.String(), @@ -319,6 +330,9 @@ func newIllegalMVServiceSettingsFieldError(field string) error { // applyMVServiceSettings writes merged runtime settings back to the service. func applyMVServiceSettings(mvService mvServiceRuntimeSettingsAccessor, settings mvServiceRuntimeSettings) error { mvService.SetTaskExecConfig(settings.maxConcurrency, settings.timeout) + if err := mvService.SetFetchInterval(settings.fetchInterval); err != nil { + return err + } if err := mvService.SetTaskBackpressureConfig(settings.backpressureCfg); err != nil { return err }