-
Notifications
You must be signed in to change notification settings - Fork 6.2k
executor,metrics: add a metric for observing execution phases #35906
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
81a5274
631c38d
732be48
cc0ca97
6fc9782
1e2bff1
519d053
a61399f
4ccde74
df8b914
0e90568
4e2d00c
9e97b78
bfb259d
eca9088
f14fed6
9884452
eece875
58419d7
1b25036
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,6 +61,7 @@ import ( | |
| "github.com/pingcap/tidb/util/stringutil" | ||
| "github.com/pingcap/tidb/util/topsql" | ||
| topsqlstate "github.com/pingcap/tidb/util/topsql/state" | ||
| "github.com/prometheus/client_golang/prometheus" | ||
| tikverr "github.com/tikv/client-go/v2/error" | ||
| "github.com/tikv/client-go/v2/oracle" | ||
| "github.com/tikv/client-go/v2/util" | ||
|
|
@@ -148,7 +149,7 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) { | |
| logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", a.stmt.GetTextToLog()), zap.Stack("stack")) | ||
| }() | ||
|
|
||
| err = Next(ctx, a.executor, req) | ||
| err = a.stmt.next(ctx, a.executor, req) | ||
| if err != nil { | ||
| a.lastErr = err | ||
| return err | ||
|
|
@@ -216,6 +217,17 @@ type ExecStmt struct { | |
| retryCount uint | ||
| retryStartTime time.Time | ||
|
|
||
| // Phase durations are split into two parts: 1. trying to lock keys (but | ||
| // failed); 2. the final iteration of the retry loop. Here we use | ||
| // [2]time.Duration to record such info for each phase. The first duration | ||
| // is increased only within the current iteration. When we meet a | ||
| // pessimistic lock error and decide to retry, we add the first duration to | ||
| // the second and reset the first to 0 by calling `resetPhaseDurations`. | ||
| phaseBuildDurations [2]time.Duration | ||
| phaseOpenDurations [2]time.Duration | ||
| phaseNextDurations [2]time.Duration | ||
| phaseLockDurations [2]time.Duration | ||
|
|
||
| // OutputNames will be set if using cached plan | ||
| OutputNames []*types.FieldName | ||
| PsStmt *plannercore.CachedPrepareStmt | ||
|
|
@@ -425,7 +437,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { | |
| ctx = a.observeStmtBeginForTopSQL(ctx) | ||
|
|
||
| breakpoint.Inject(a.Ctx, sessiontxn.BreakPointBeforeExecutorFirstRun) | ||
| if err = e.Open(ctx); err != nil { | ||
| if err = a.openExecutor(ctx, e); err != nil { | ||
| terror.Call(e.Close) | ||
| return nil, err | ||
| } | ||
|
|
@@ -625,7 +637,7 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor | |
| var err error | ||
| req := newFirstChunk(e) | ||
| for { | ||
| err = Next(ctx, e, req) | ||
| err = a.next(ctx, e, req) | ||
| if err != nil { | ||
| // Handle 'write conflict' error. | ||
| break | ||
|
|
@@ -671,7 +683,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex | |
| } | ||
| } | ||
|
|
||
| err = Next(ctx, e, newFirstChunk(e)) | ||
| err = a.next(ctx, e, newFirstChunk(e)) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
@@ -724,6 +736,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { | |
| ctx = context.WithValue(ctx, util.LockKeysDetailCtxKey, &lockKeyStats) | ||
| startLocking := time.Now() | ||
| err = txn.LockKeys(ctx, lockCtx, keys...) | ||
| a.phaseLockDurations[0] += time.Since(startLocking) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Point get and batch point get lock keys internally, so that duration will not be observed here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, the lock phase here covers only pessimistic DMLs that have extra keys to lock. The internal lock-key duration (of point-get, point-update, etc.) is counted in the next phase. |
||
| if lockKeyStats != nil { | ||
| seVars.StmtCtx.MergeLockKeysExecDetails(lockKeyStats) | ||
| } | ||
|
|
@@ -789,6 +802,8 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error | |
|
|
||
| breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError) | ||
|
|
||
| a.resetPhaseDurations() | ||
|
|
||
| e, err := a.buildExecutor() | ||
| if err != nil { | ||
| return nil, err | ||
|
|
@@ -802,7 +817,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error | |
| sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterPessimisticLockErrorRetry", true) | ||
| }) | ||
|
|
||
| if err = e.Open(ctx); err != nil { | ||
| if err = a.openExecutor(ctx, e); err != nil { | ||
| return nil, err | ||
| } | ||
| return e, nil | ||
|
|
@@ -816,6 +831,7 @@ type pessimisticTxn interface { | |
|
|
||
| // buildExecutor build an executor from plan, prepared statement may need additional procedure. | ||
| func (a *ExecStmt) buildExecutor() (Executor, error) { | ||
| defer func(start time.Time) { a.phaseBuildDurations[0] += time.Since(start) }(time.Now()) | ||
| ctx := a.Ctx | ||
| stmtCtx := ctx.GetSessionVars().StmtCtx | ||
| if _, ok := a.Plan.(*plannercore.Execute); !ok { | ||
|
|
@@ -858,6 +874,31 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { | |
| return e, nil | ||
| } | ||
|
|
||
| func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) error { | ||
| start := time.Now() | ||
| err := e.Open(ctx) | ||
| a.phaseOpenDurations[0] += time.Since(start) | ||
| return err | ||
| } | ||
|
|
||
| func (a *ExecStmt) next(ctx context.Context, e Executor, req *chunk.Chunk) error { | ||
| start := time.Now() | ||
| err := Next(ctx, e, req) | ||
| a.phaseNextDurations[0] += time.Since(start) | ||
| return err | ||
| } | ||
|
|
||
| func (a *ExecStmt) resetPhaseDurations() { | ||
| a.phaseBuildDurations[1] += a.phaseBuildDurations[0] | ||
| a.phaseBuildDurations[0] = 0 | ||
| a.phaseOpenDurations[1] += a.phaseOpenDurations[0] | ||
| a.phaseOpenDurations[0] = 0 | ||
| a.phaseNextDurations[1] += a.phaseNextDurations[0] | ||
| a.phaseNextDurations[0] = 0 | ||
| a.phaseLockDurations[1] += a.phaseLockDurations[0] | ||
| a.phaseLockDurations[0] = 0 | ||
| } | ||
|
|
||
| // QueryReplacer replaces new line and tab for grep result including query string. | ||
| var QueryReplacer = strings.NewReplacer("\r", " ", "\n", " ", "\t", " ") | ||
|
|
||
|
|
@@ -896,12 +937,130 @@ func FormatSQL(sql string) stringutil.StringerFunc { | |
| } | ||
| } | ||
|
|
||
| const ( | ||
| phaseBuildLocking = "build:locking" | ||
| phaseOpenLocking = "open:locking" | ||
| phaseNextLocking = "next:locking" | ||
| phaseLockLocking = "lock:locking" | ||
| phaseBuildFinal = "build:final" | ||
| phaseOpenFinal = "open:final" | ||
| phaseNextFinal = "next:final" | ||
| phaseLockFinal = "lock:final" | ||
| phaseCommitPrewrite = "commit:prewrite" | ||
| phaseCommitCommit = "commit:commit" | ||
| phaseCommitWaitCommitTS = "commit:wait:commit-ts" | ||
| phaseCommitWaitLatestTS = "commit:wait:latest-ts" | ||
| phaseCommitWaitLatch = "commit:wait:local-latch" | ||
| phaseCommitWaitBinlog = "commit:wait:prewrite-binlog" | ||
| phaseWriteResponse = "write-response" | ||
|
Comment on lines
+941
to
+955
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @cfzjywxk @you06 @sticnarf @longfangsong Any suggestions about naming? e.g.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| ) | ||
|
|
||
| var ( | ||
| sessionExecuteRunDurationInternal = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblInternal) | ||
| sessionExecuteRunDurationGeneral = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblGeneral) | ||
| totalTiFlashQuerySuccCounter = metrics.TiFlashQueryTotalCounter.WithLabelValues("", metrics.LblOK) | ||
|
|
||
| // pre-define observers for non-internal queries | ||
| execBuildLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseBuildLocking, "0") | ||
| execOpenLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseOpenLocking, "0") | ||
| execNextLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseNextLocking, "0") | ||
| execLockLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseLockLocking, "0") | ||
| execBuildFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseBuildFinal, "0") | ||
| execOpenFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseOpenFinal, "0") | ||
| execNextFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseNextFinal, "0") | ||
| execLockFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseLockFinal, "0") | ||
| execCommitPrewrite = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitPrewrite, "0") | ||
| execCommitCommit = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitCommit, "0") | ||
| execCommitWaitCommitTS = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitCommitTS, "0") | ||
| execCommitWaitLatestTS = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitLatestTS, "0") | ||
| execCommitWaitLatch = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitLatch, "0") | ||
| execCommitWaitBinlog = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitBinlog, "0") | ||
| execWriteResponse = metrics.ExecPhaseDuration.WithLabelValues(phaseWriteResponse, "0") | ||
| ) | ||
|
|
||
| func getPhaseDurationObserver(phase string, internal bool) prometheus.Observer { | ||
| if internal { | ||
| return metrics.ExecPhaseDuration.WithLabelValues(phase, "1") | ||
| } | ||
| switch phase { | ||
| case phaseBuildLocking: | ||
| return execBuildLocking | ||
| case phaseOpenLocking: | ||
| return execOpenLocking | ||
| case phaseNextLocking: | ||
| return execNextLocking | ||
| case phaseLockLocking: | ||
| return execLockLocking | ||
| case phaseBuildFinal: | ||
| return execBuildFinal | ||
| case phaseOpenFinal: | ||
| return execOpenFinal | ||
| case phaseNextFinal: | ||
| return execNextFinal | ||
| case phaseLockFinal: | ||
| return execLockFinal | ||
| case phaseCommitPrewrite: | ||
| return execCommitPrewrite | ||
| case phaseCommitCommit: | ||
| return execCommitCommit | ||
| case phaseCommitWaitCommitTS: | ||
| return execCommitWaitCommitTS | ||
| case phaseCommitWaitLatestTS: | ||
| return execCommitWaitLatestTS | ||
| case phaseCommitWaitLatch: | ||
| return execCommitWaitLatch | ||
| case phaseCommitWaitBinlog: | ||
| return execCommitWaitBinlog | ||
| case phaseWriteResponse: | ||
| return execWriteResponse | ||
| default: | ||
| return metrics.ExecPhaseDuration.WithLabelValues(phase, "0") | ||
| } | ||
| } | ||
|
|
||
| func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.CommitDetails) { | ||
| for _, it := range []struct { | ||
| duration time.Duration | ||
| phase string | ||
| }{ | ||
| {a.phaseBuildDurations[0], phaseBuildFinal}, | ||
| {a.phaseBuildDurations[1], phaseBuildLocking}, | ||
| {a.phaseOpenDurations[0], phaseOpenFinal}, | ||
| {a.phaseOpenDurations[1], phaseOpenLocking}, | ||
| {a.phaseNextDurations[0], phaseNextFinal}, | ||
| {a.phaseNextDurations[1], phaseNextLocking}, | ||
| {a.phaseLockDurations[0], phaseLockFinal}, | ||
| {a.phaseLockDurations[1], phaseLockLocking}, | ||
| } { | ||
| if it.duration > 0 { | ||
| getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds()) | ||
| } | ||
| } | ||
| if commitDetails != nil { | ||
| for _, it := range []struct { | ||
| duration time.Duration | ||
| phase string | ||
| }{ | ||
| {commitDetails.PrewriteTime, phaseCommitPrewrite}, | ||
| {commitDetails.CommitTime, phaseCommitCommit}, | ||
| {commitDetails.GetCommitTsTime, phaseCommitWaitCommitTS}, | ||
| {commitDetails.GetLatestTsTime, phaseCommitWaitLatestTS}, | ||
| {commitDetails.LocalLatchTime, phaseCommitWaitLatch}, | ||
| {commitDetails.WaitPrewriteBinlogTime, phaseCommitWaitBinlog}, | ||
| } { | ||
| if it.duration > 0 { | ||
| getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds()) | ||
| } | ||
| } | ||
| } | ||
| if stmtDetailsRaw := a.GoCtx.Value(execdetails.StmtExecDetailKey); stmtDetailsRaw != nil { | ||
| d := stmtDetailsRaw.(*execdetails.StmtExecDetails).WriteSQLRespDuration | ||
| if d > 0 { | ||
| getPhaseDurationObserver(phaseWriteResponse, internal).Observe(d.Seconds()) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // FinishExecuteStmt is used to record some information after `ExecStmt` execution finished: | ||
| // 1. record slow log if needed. | ||
| // 2. record summary statement. | ||
|
|
@@ -946,6 +1105,7 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo | |
| } | ||
| sessVars.PrevStmt = FormatSQL(a.GetTextToLog()) | ||
|
|
||
| a.observePhaseDurations(sessVars.InRestrictedSQL, execDetail.CommitDetail) | ||
| executeDuration := time.Since(sessVars.StartTime) - sessVars.DurationCompile | ||
| if sessVars.InRestrictedSQL { | ||
| sessionExecuteRunDurationInternal.Observe(executeDuration.Seconds()) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,4 +46,13 @@ var ( | |
| Name: "statement_db_total", | ||
| Help: "Counter of StmtNode by Database.", | ||
| }, []string{LblDb, LblType}) | ||
|
|
||
| // ExecPhaseDuration records the duration of each execution phase. | ||
| ExecPhaseDuration = prometheus.NewSummaryVec( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you explain why a summary is used here? I have never seen summary-type metrics used in TiDB before; a histogram is always used.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. To reduce the size of the metrics data (histogram = summary + buckets). IMO, we won't care too much about something like "what's the p99 latency of the next phase". There are too many kinds of executors (as well as their combinations); some may be fast and others may be extremely slow, so a higher or lower p99 latency may not provide more information (we do not know the distribution of each kind of executor). Besides, it's hard to decide on buckets here: some phases (like open) take very little time, but phases like lock may cost a few seconds. |
||
| prometheus.SummaryOpts{ | ||
| Namespace: "tidb", | ||
| Subsystem: "executor", | ||
| Name: "phase_duration_seconds", | ||
| Help: "Summary of each execution phase duration.", | ||
| }, []string{LblPhase, LblInternal}) | ||
| ) | ||
Uh oh!
There was an error while loading. Please reload this page.