Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ go_library(
"//util/logutil",
"//util/memory",
"//util/ranger",
"//util/tracing",
"//util/trxevents",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down
9 changes: 3 additions & 6 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"strconv"
"unsafe"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/tracing"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
Expand Down Expand Up @@ -64,11 +64,8 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.
// Select sends a DAG request, returns SelectResult.
// In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional.
func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (SelectResult, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("distsql.Select", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "distsql.Select")
defer r.End()

// For testing purpose.
if hook := ctx.Value("CheckSelectRequestHook"); hook != nil {
Expand Down
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ go_library(
"//util/tls",
"//util/topsql",
"//util/topsql/state",
"//util/tracing",
"@com_github_burntsushi_toml//:toml",
"@com_github_gogo_protobuf//proto",
"@com_github_ngaut_pools//:pools",
Expand Down
19 changes: 8 additions & 11 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
Expand Down Expand Up @@ -65,6 +64,7 @@ import (
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/tracing"
"github.com/prometheus/client_golang/prometheus"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -283,12 +283,12 @@ func (a *ExecStmt) GetStmtNode() ast.StmtNode {

// PointGet short path for point exec directly from plan, keep only necessary steps
func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("ExecStmt.PointGet", opentracing.ChildOf(span.Context()))
span1.LogKV("sql", a.OriginText())
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
r, ctx := tracing.StartRegionEx(ctx, "ExecStmt.PointGet")
defer r.End()
if r.Span != nil {
Comment thread
tiancaiamao marked this conversation as resolved.
r.Span.LogKV("sql", a.OriginText())
}

failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
// stale read should not reach here
Expand Down Expand Up @@ -921,11 +921,8 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlexec.RecordSet, error) {
sctx := a.Ctx
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "executor.handleNoDelayExecutor")
defer r.End()

var err error
defer func() {
Expand Down
10 changes: 4 additions & 6 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"strings"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/tracing"
"go.uber.org/zap"
)

Expand All @@ -56,11 +56,9 @@ type Compiler struct {

// Compile compiles an ast.StmtNode to a physical plan.
func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecStmt, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.Compile", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "executor.Compile")
defer r.End()

defer func() {
r := recover()
if r == nil {
Expand Down
21 changes: 7 additions & 14 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"math"
"runtime/pprof"
"runtime/trace"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -68,6 +67,7 @@ import (
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/tracing"
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
tikvutil "github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -314,14 +314,10 @@ func Next(ctx context.Context, e Executor, req *chunk.Chunk) error {
if atomic.LoadUint32(&sessVars.Killed) == 1 {
return ErrQueryInterrupted
}
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan(fmt.Sprintf("%T.Next", e), opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
if trace.IsEnabled() {
defer trace.StartRegion(ctx, fmt.Sprintf("%T.Next", e)).End()
}

r, ctx := tracing.StartRegionEx(ctx, fmt.Sprintf("%T.Next", e))
defer r.End()

if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) {
registerSQLAndPlanInExecForTopSQL(sessVars)
}
Expand Down Expand Up @@ -1527,11 +1523,8 @@ func init() {
s.RewritePhaseInfo.DurationPreprocessSubQuery += time.Since(begin)
}(time.Now())

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.EvalSubQuery", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "executor.EvalSubQuery")
defer r.End()

e := newExecutorBuilder(sctx, is, nil)
exec := e.build(p)
Expand Down
22 changes: 6 additions & 16 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"runtime/trace"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/expression"
Expand All @@ -37,6 +36,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/tracing"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -116,11 +116,8 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
}

func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchUniqueIndices", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "prefetchUniqueIndices")
defer r.End()

nKeys := 0
for _, r := range rows {
Expand Down Expand Up @@ -148,11 +145,8 @@ func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeC
}

func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow, values map[string][]byte) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchConflictedOldRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "prefetchConflictedOldRows")
defer r.End()

batchKeys := make([]kv.Key, 0, len(rows))
for _, r := range rows {
Expand Down Expand Up @@ -182,11 +176,7 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction
return nil
}

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchDataCache", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "prefetchDataCache").End()
values, err := prefetchUniqueIndices(ctx, txn, rows)
if err != nil {
return err
Expand Down
8 changes: 2 additions & 6 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/expression"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/tracing"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -1142,11 +1142,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
replace bool) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("InsertValues.batchCheckAndInsert", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "InsertValues.batchCheckAndInsert").End()
start := time.Now()
// Get keys need to be checked.
toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, rows)
Expand Down
54 changes: 13 additions & 41 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package executor
import (
"context"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
Expand All @@ -34,6 +33,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/tracing"
)

type memReader interface {
Expand Down Expand Up @@ -65,11 +65,7 @@ type memIndexReader struct {
}

func buildMemIndexReader(ctx context.Context, us *UnionScanExec, idxReader *IndexReaderExecutor) *memIndexReader {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("buildMemIndexReader", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "buildMemIndexReader").End()
kvRanges := idxReader.kvRanges
outputOffset := make([]int, 0, len(us.columns))
for _, col := range idxReader.outputColumns {
Expand All @@ -90,11 +86,7 @@ func buildMemIndexReader(ctx context.Context, us *UnionScanExec, idxReader *Inde
}

func (m *memIndexReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("memIndexReader.getMemRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "memIndexReader.getMemRows").End()
tps := make([]*types.FieldType, 0, len(m.index.Columns)+1)
cols := m.table.Columns
for _, col := range m.index.Columns {
Expand Down Expand Up @@ -190,11 +182,7 @@ type allocBuf struct {
}

func buildMemTableReader(ctx context.Context, us *UnionScanExec, tblReader *TableReaderExecutor) *memTableReader {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("buildMemTableReader", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "buildMemTableReader").End()
colIDs := make(map[int64]int, len(us.columns))
for i, col := range us.columns {
colIDs[col.ID] = i
Expand Down Expand Up @@ -235,11 +223,7 @@ func buildMemTableReader(ctx context.Context, us *UnionScanExec, tblReader *Tabl

// TODO: Try to make memXXXReader lazy, There is no need to decode many rows when parent operator only need 1 row.
func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("memTableReader.getMemRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "memTableReader.getMemRows").End()
mutableRow := chunk.MutRowFromTypes(m.retFieldTypes)
resultRows := make([]types.Datum, len(m.columns))
m.offsets = make([]int, len(m.columns))
Expand Down Expand Up @@ -490,11 +474,8 @@ type memIndexLookUpReader struct {
}

func buildMemIndexLookUpReader(ctx context.Context, us *UnionScanExec, idxLookUpReader *IndexLookUpExecutor) *memIndexLookUpReader {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("buildMemIndexLookUpReader", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "buildMemIndexLookUpReader").End()

kvRanges := idxLookUpReader.kvRanges
outputOffset := []int{len(idxLookUpReader.index.Columns)}
memIdxReader := &memIndexReader{
Expand Down Expand Up @@ -527,11 +508,9 @@ func buildMemIndexLookUpReader(ctx context.Context, us *UnionScanExec, idxLookUp
}

func (m *memIndexLookUpReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("memIndexLookUpReader.getMemRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "memIndexLookUpReader.getMemRows")
defer r.End()

kvRanges := [][]kv.KeyRange{m.idxReader.kvRanges}
tbls := []table.Table{m.table}
if m.partitionMode {
Expand Down Expand Up @@ -604,11 +583,7 @@ type memIndexMergeReader struct {
}

func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMergeReader *IndexMergeReaderExecutor) *memIndexMergeReader {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("buildMemIndexMergeReader", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "buildMemIndexMergeReader").End()
indexCount := len(indexMergeReader.indexes)
memReaders := make([]memReader, 0, indexCount)
for i := 0; i < indexCount; i++ {
Expand Down Expand Up @@ -661,11 +636,8 @@ func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMerge
}

func (m *memIndexMergeReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("memIndexMergeReader.getMemRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "memIndexMergeReader.getMemRows")
defer r.End()
tbls := []table.Table{m.table}
// [partNum][indexNum][rangeNum]
var kvRanges [][][]kv.KeyRange
Expand Down
Loading