Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
812e0d8
Merge pull request #1 from pingcap/master
JK1Zhang Sep 11, 2023
7c8808b
add DistTaskDispatcherGauge
JK1Zhang Sep 12, 2023
98a0c25
add duration
JK1Zhang Sep 16, 2023
9fc809c
duration
JK1Zhang Sep 16, 2023
80c0eff
add metrics/disttask.go
JK1Zhang Sep 16, 2023
bbe7f07
Update disttask/framework/dispatcher/dispatcher_manager.go
JK1Zhang Sep 18, 2023
75ab579
Update metrics/disttask.go
JK1Zhang Sep 18, 2023
2e9f3b0
resolve all comments
JK1Zhang Sep 19, 2023
a39c10c
add json
JK1Zhang Sep 20, 2023
fa240db
add json
JK1Zhang Sep 20, 2023
caeb50b
add json
JK1Zhang Sep 21, 2023
374420e
resolve comments
JK1Zhang Sep 22, 2023
2feb6e6
Merge branch 'master' into metrics-dev
JK1Zhang Sep 22, 2023
40ae4c8
Merge pull request #2 from pingcap/master
JK1Zhang Sep 22, 2023
444792f
Merge pull request #3 from JK1Zhang/master
JK1Zhang Sep 22, 2023
1d406f3
resolve comments
JK1Zhang Sep 22, 2023
d45089e
resolve comments
JK1Zhang Sep 22, 2023
74279a3
resolve comments
JK1Zhang Sep 22, 2023
14790ca
fix json
JK1Zhang Sep 22, 2023
fd45a4c
fix json
JK1Zhang Sep 22, 2023
b8346bd
fix json
JK1Zhang Sep 22, 2023
4c45da7
add deps
JK1Zhang Sep 22, 2023
922626c
add comments
JK1Zhang Sep 22, 2023
17ffb08
add comments
JK1Zhang Sep 22, 2023
8d47ca2
fix bazel
JK1Zhang Sep 22, 2023
ee932fc
fix bazel
JK1Zhang Sep 22, 2023
f895c82
add label for expr in grafana
JK1Zhang Sep 25, 2023
aa9e7ac
fix conflicts
JK1Zhang Sep 25, 2023
2552616
Merge branch 'master' into metrics-dev
JK1Zhang Sep 25, 2023
f491a41
fix conflicts
JK1Zhang Sep 25, 2023
1f1e002
Update metrics/grafana/tidb.json
JK1Zhang Sep 25, 2023
ce76c75
Update metrics/grafana/tidb.json
JK1Zhang Sep 25, 2023
3a8b8c8
Update metrics/disttask.go
JK1Zhang Sep 25, 2023
68ca7a0
Update metrics/grafana/tidb.json
JK1Zhang Sep 25, 2023
fee895d
Update disttask.go
JK1Zhang Sep 25, 2023
5560887
merge dispatcher row into Dist Execute Framework
JK1Zhang Sep 25, 2023
9411a4d
a typo
JK1Zhang Sep 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//disttask/framework/proto",
"//disttask/framework/storage",
"//domain/infosync",
"//metrics",
"//resourcemanager/pool/spool",
"//resourcemanager/util",
"//sessionctx",
Expand Down
2 changes: 2 additions & 0 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
disttaskutil "github.com/pingcap/tidb/util/disttask"
"github.com/pingcap/tidb/util/intest"
Expand Down Expand Up @@ -340,6 +341,7 @@ func (d *BaseDispatcher) onRunning() error {
}

func (d *BaseDispatcher) onFinished() error {
metrics.UpdateMetricsForFinishTask(d.Task)
logutil.Logger(d.logCtx).Debug("schedule task, task is finished", zap.String("state", d.Task.State))
return d.taskMgr.TransferSubTasks2History(d.Task.ID)
}
Expand Down
5 changes: 5 additions & 0 deletions disttask/framework/dispatcher/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/resourcemanager/pool/spool"
"github.com/pingcap/tidb/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -50,6 +51,7 @@ func (dm *Manager) setRunningTask(task *proto.Task, dispatcher Dispatcher) {
defer dm.runningTasks.Unlock()
dm.runningTasks.taskIDs[task.ID] = struct{}{}
dm.runningTasks.dispatchers[task.ID] = dispatcher
metrics.UpdateMetricsForRunTask(task)
}

func (dm *Manager) isRunningTask(taskID int64) bool {
Expand Down Expand Up @@ -175,6 +177,7 @@ func (dm *Manager) dispatchTaskLoop() {
if dm.isRunningTask(task.ID) {
continue
}
metrics.DistTaskGauge.WithLabelValues(task.Type, metrics.DispatchingStatus).Inc()
// we check it before start dispatcher, so no need to check it again.
// see startDispatcher.
// this should not happen normally, unless user modify system table
Expand All @@ -188,13 +191,15 @@ func (dm *Manager) dispatchTaskLoop() {
// the task is not in runningTasks set when:
// owner changed or task is cancelled when status is pending.
if task.State == proto.TaskStateRunning || task.State == proto.TaskStateReverting || task.State == proto.TaskStateCancelling {
metrics.UpdateMetricsForDispatchTask(task)
dm.startDispatcher(task)
cnt++
continue
}
if dm.checkConcurrencyOverflow(cnt) {
break
}
metrics.UpdateMetricsForDispatchTask(task)
dm.startDispatcher(task)
cnt++
}
Expand Down
1 change: 1 addition & 0 deletions disttask/framework/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//disttask/framework/proto",
"//disttask/framework/storage",
"//metrics",
"//util/backoff",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
Expand Down
2 changes: 2 additions & 0 deletions disttask/framework/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util/backoff"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -55,6 +56,7 @@ func SubmitGlobalTask(taskKey, taskType string, concurrency int, taskMeta []byte
if globalTask == nil {
return nil, errors.Errorf("cannot find global task with ID %d", taskID)
}
metrics.UpdateMetricsForAddTask(globalTask)
}
return globalTask, nil
}
Expand Down
56 changes: 56 additions & 0 deletions metrics/disttask.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package metrics

import (
"fmt"
"strconv"
"time"

"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -28,7 +30,19 @@ const (
lblSubTaskID = "subtask_id"
)

// status for task
const (
DispatchingStatus = "dispatching"
WaitingStatus = "waiting"
RunningStatus = "running"
CompletedStatus = "completed"
)

var (
//DistTaskGauge is the gauge of dist task count.
DistTaskGauge *prometheus.GaugeVec
//DistTaskStarttimeGauge is the gauge of dist task count.
DistTaskStarttimeGauge *prometheus.GaugeVec
// DistTaskSubTaskCntGauge is the gauge of dist task subtask count.
DistTaskSubTaskCntGauge *prometheus.GaugeVec
// DistTaskSubTaskStartTimeGauge is the gauge of dist task subtask start time.
Expand All @@ -37,6 +51,22 @@ var (

// InitDistTaskMetrics initializes disttask metrics.
func InitDistTaskMetrics() {
Comment thread
okJiang marked this conversation as resolved.
DistTaskGauge = NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "disttask",
Name: "task_status",
Help: "Gauge of disttask.",
}, []string{lblTaskType, lblTaskStatus})

DistTaskStarttimeGauge = NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "disttask",
Name: "start_time",
Help: "Gauge of start_time of disttask.",
}, []string{lblTaskType, lblTaskStatus, lblTaskID})

DistTaskSubTaskCntGauge = NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Expand Down Expand Up @@ -91,3 +121,29 @@ func EndDistTaskSubTask(subtask *proto.Subtask) {
strconv.Itoa(int(subtask.ID)),
)
}

// UpdateMetricsForAddTask update metrics when a task is added
func UpdateMetricsForAddTask(task *proto.Task) {
DistTaskGauge.WithLabelValues(task.Type, WaitingStatus).Inc()
DistTaskStarttimeGauge.WithLabelValues(task.Type, WaitingStatus, fmt.Sprint(task.ID)).Set(float64(time.Now().UnixMicro()))
}

// UpdateMetricsForDispatchTask update metrics when a task is added
func UpdateMetricsForDispatchTask(task *proto.Task) {
DistTaskGauge.WithLabelValues(task.Type, WaitingStatus).Dec()
DistTaskStarttimeGauge.DeleteLabelValues(task.Type, WaitingStatus, fmt.Sprint(task.ID))
DistTaskStarttimeGauge.WithLabelValues(task.Type, DispatchingStatus, fmt.Sprint(task.ID)).SetToCurrentTime()
}

// UpdateMetricsForRunTask update metrics when a task starts running
func UpdateMetricsForRunTask(task *proto.Task) {
DistTaskStarttimeGauge.DeleteLabelValues(task.Type, DispatchingStatus, fmt.Sprint(task.ID))
DistTaskGauge.WithLabelValues(task.Type, DispatchingStatus).Dec()
DistTaskGauge.WithLabelValues(task.Type, RunningStatus).Inc()
}

// UpdateMetricsForFinishTask update metrics when a task is finished
func UpdateMetricsForFinishTask(task *proto.Task) {
DistTaskGauge.WithLabelValues(task.Type, RunningStatus).Dec()
DistTaskGauge.WithLabelValues(task.Type, CompletedStatus).Inc()
}
Loading