From a8af9dea324204a2137de6bd7a1bd7e80c6cc58b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Mon, 7 Jun 2021 21:06:28 +0800 Subject: [PATCH 1/2] cherry pick #25206 to release-5.1 Signed-off-by: ti-srebot --- executor/adapter.go | 14 ++++++ executor/builder.go | 48 ++++++++++++++++++-- executor/compiler.go | 5 +++ executor/point_get.go | 5 +++ executor/stale_txn_test.go | 91 ++++++++++++++++++++++++++++++++++++++ planner/core/preprocess.go | 15 +++++++ 6 files changed, 175 insertions(+), 3 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index faadfe267bfdf..16d03f655cc59 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -191,8 +191,11 @@ type ExecStmt struct { SnapshotTS uint64 // ExplicitStaleness means whether the 'SELECT' clause are using 'AS OF TIMESTAMP' to perform stale read explicitly. ExplicitStaleness bool +<<<<<<< HEAD // TxnScope indicates the scope the store selector scope the request visited TxnScope string +======= +>>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) // InfoSchema stores a reference to the schema information. InfoSchema infoschema.InfoSchema // Plan stores a reference to the final physical plan. @@ -291,9 +294,14 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) { return 0, err } a.InfoSchema = ret.InfoSchema +<<<<<<< HEAD a.SnapshotTS = ret.LastSnapshotTS a.ExplicitStaleness = ret.ExplicitStaleness a.TxnScope = ret.TxnScope +======= + a.SnapshotTS = ret.SnapshotTS + a.ExplicitStaleness = ret.ExplicitStaleness +>>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, a.InfoSchema) if err != nil { return 0, err @@ -795,7 +803,13 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow } +<<<<<<< HEAD b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.ExplicitStaleness, a.TxnScope) +======= + b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti) + b.snapshotTS = a.SnapshotTS + b.explicitStaleness = a.ExplicitStaleness +>>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) e := b.build(a.Plan) if b.err != nil { return nil, errors.Trace(b.err) diff --git a/executor/builder.go b/executor/builder.go index fb94a1d8d55b2..4d3d9777a932a 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -85,7 +85,10 @@ type executorBuilder struct { Ti *TelemetryInfo // ExplicitStaleness means whether the 'SELECT' clause are using 'AS OF TIMESTAMP' to perform stale read explicitly. explicitStaleness bool +<<<<<<< HEAD txnScope string +======= +>>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) } // CTEStorages stores resTbl and iterInTbl for CTEExec. @@ -1391,6 +1394,11 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu return e } +// IsStaleness returns if the query is staleness +func (b *executorBuilder) IsStaleness() bool { + return b.ctx.GetSessionVars().TxnCtx.IsStaleness || b.explicitStaleness +} + // `getSnapshotTS` returns the timestamp of the snapshot that a reader should read. func (b *executorBuilder) getSnapshotTS() (uint64, error) { // `refreshForUpdateTSForRC` should always be invoked before returning the cached value to @@ -2662,6 +2670,10 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea return nil, err } ts := v.GetTableScan() + if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + return nil, errors.New("can not stale read temporary table") + } + tbl, _ := b.is.TableByID(ts.Table.ID) isPartition, physicalTableID := ts.IsPartition() if isPartition { @@ -2774,6 +2786,11 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E } ts := v.GetTableScan() + if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + ret.ranges = ts.Ranges sctx := b.ctx.GetSessionVars().StmtCtx sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) @@ -2991,13 +3008,18 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea } func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) Executor { + is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) + if is.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + ret, err := buildNoRangeIndexReader(b, v) if err != nil { b.err = err return nil } - is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) ret.ranges = is.Ranges sctx := b.ctx.GetSessionVars().StmtCtx sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O) @@ -3145,13 +3167,18 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn } func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor { + is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) + if is.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + ret, err := buildNoRangeIndexLookUpReader(b, v) if err != nil { b.err = err return nil } - is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) ret.ranges = is.Ranges @@ -3255,6 +3282,12 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd } func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor { + ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) + if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + ret, err := buildNoRangeIndexMergeReader(b, v) if err != nil { b.err = err @@ -3274,7 +3307,6 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg } } } - ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) executorCounterIndexMergeReaderExecutor.Inc() @@ -4048,6 +4080,11 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model } func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor { + if plan.TblInfo.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + startTS, err := b.getSnapshotTS() if err != nil { b.err = err @@ -4182,6 +4219,11 @@ func fullRangePartition(idxArr []int) bool { } func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *TableSampleExecutor { + if v.TableInfo.Meta().TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + startTS, err := b.getSnapshotTS() if err != nil { b.err = err diff --git a/executor/compiler.go b/executor/compiler.go index e5a6f9b0c6d69..15f674496799b 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -70,9 +70,14 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm } return &ExecStmt{ GoCtx: ctx, +<<<<<<< HEAD SnapshotTS: ret.LastSnapshotTS, ExplicitStaleness: ret.ExplicitStaleness, TxnScope: ret.TxnScope, +======= + SnapshotTS: ret.SnapshotTS, + ExplicitStaleness: ret.ExplicitStaleness, +>>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) InfoSchema: ret.InfoSchema, Plan: finalPlan, LowerPriority: lowerPriority, diff --git a/executor/point_get.go b/executor/point_get.go index 65200df9e965a..a4ec6b5551d07 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -42,6 +42,11 @@ import ( ) func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { + if p.TblInfo.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + startTS, err := b.getSnapshotTS() if err != nil { b.err = err diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 6ca8aac96f0f5..8c00c3aaace80 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -15,6 +15,7 @@ package executor_test import ( "fmt" + "strings" "time" . "github.com/pingcap/check" @@ -861,6 +862,7 @@ func (s *testStaleTxnSuite) TestSetTransactionInfoSchema(c *C) { c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer3) } +<<<<<<< HEAD func (s *testStaleTxnSuite) TestStaleSelect(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") @@ -936,4 +938,93 @@ func (s *testStaleTxnSuite) TestStaleSelect(c *C) { tk.MustExec("insert into t values (5, 5, 5)") time.Sleep(tolerance) tk.MustQuery(fmt.Sprintf("select * from t as of timestamp '%s' where c=5", time6.Format("2006-1-2 15:04:05.000"))).Check(testkit.Rows("4 5 ")) +======= +func (s *testStaleTxnSuite) TestStaleReadTemporaryTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. + safePointName := "tikv_gc_safe_point" + safePointValue := "20160102-15:04:05 -0700" + safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)" + updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY + UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment) + tk.MustExec(updateSafePoint) + + tk.MustExec("set @@tidb_enable_global_temporary_table=1") + tk.MustExec("use test") + tk.MustExec("drop table if exists tmp1") + tk.MustExec("create global temporary table tmp1 " + + "(id int not null primary key, code int not null, value int default null, unique key code(code))" + + "on commit delete rows") + time.Sleep(time.Second) + tk.MustGetErrMsg("select * from tmp1 as of timestamp NOW() where id=1", "can not stale read temporary table") + + queries := []struct { + sql string + }{ + { + sql: "select * from tmp1 where id=1", + }, + { + sql: "select * from tmp1 where code=1", + }, + { + sql: "select * from tmp1 where id in (1, 2, 3)", + }, + { + sql: "select * from tmp1 where code in (1, 2, 3)", + }, + { + sql: "select * from tmp1 where id > 1", + }, + { + sql: "select /*+use_index(tmp1, code)*/ * from tmp1 where code > 1", + }, + { + sql: "select /*+use_index(tmp1, code)*/ code from tmp1 where code > 1", + }, + { + sql: "select * from tmp1 tablesample regions()", + }, + { + sql: "select /*+ use_index_merge(tmp1, primary, code) */ * from tmp1 where id > 1 or code > 2", + }, + } + + addStaleReadToSQL := func(sql string) string { + idx := strings.Index(sql, " where ") + if idx < 0 { + return "" + } + return sql[0:idx] + " as of timestamp NOW()" + sql[idx:] + } + + for _, query := range queries { + sql := addStaleReadToSQL(query.sql) + if sql != "" { + tk.MustGetErrMsg(sql, "can not stale read temporary table") + } + } + + tk.MustExec("start transaction read only as of timestamp NOW()") + for _, query := range queries { + tk.MustGetErrMsg(query.sql, "can not stale read temporary table") + } + tk.MustExec("commit") + + for _, query := range queries { + tk.MustExec(query.sql) + } + + tk.MustExec("set transaction read only as of timestamp NOW()") + tk.MustExec("start transaction") + for _, query := range queries { + tk.MustGetErrMsg(query.sql, "can not stale read temporary table") + } + tk.MustExec("commit") + + for _, query := range queries { + tk.MustExec(query.sql) + } +>>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) } diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index 4dca6ddb50170..cebf7b8384cd5 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -127,6 +127,7 @@ const ( // PreprocessorReturn is used to retain information obtained in the preprocessor. type PreprocessorReturn struct { +<<<<<<< HEAD initedLastSnapshotTS bool ExplicitStaleness bool SnapshotTSEvaluator func(sessionctx.Context) (uint64, error) @@ -135,6 +136,11 @@ type PreprocessorReturn struct { LastSnapshotTS uint64 InfoSchema infoschema.InfoSchema TxnScope string +======= + SnapshotTS uint64 + ExplicitStaleness bool + InfoSchema infoschema.InfoSchema +>>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) } // preprocessor is an ast.Visitor that preprocess @@ -1450,6 +1456,15 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) { if p.err != nil { return } +<<<<<<< HEAD +======= + p.SnapshotTS = ts + p.ExplicitStaleness = true + p.InfoSchema = is + } + if p.SnapshotTS != ts { + p.err = ErrDifferentAsOf.GenWithStack("can not set different time in the as of") +>>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) } p.initedLastSnapshotTS = true } From 721a05c961263c7c96bbe3c664e645d1bd2dc869 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 28 Jun 2021 11:33:44 +0800 Subject: [PATCH 2/2] fix conflict --- executor/adapter.go | 14 -------------- executor/builder.go | 3 --- executor/compiler.go | 5 ----- executor/stale_txn_test.go | 5 ++--- planner/core/preprocess.go | 15 --------------- 5 files changed, 2 insertions(+), 40 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 16d03f655cc59..faadfe267bfdf 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -191,11 +191,8 @@ type ExecStmt struct { SnapshotTS uint64 // ExplicitStaleness means whether the 'SELECT' clause are using 'AS OF TIMESTAMP' to perform stale read explicitly. ExplicitStaleness bool -<<<<<<< HEAD // TxnScope indicates the scope the store selector scope the request visited TxnScope string -======= ->>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) // InfoSchema stores a reference to the schema information. InfoSchema infoschema.InfoSchema // Plan stores a reference to the final physical plan. @@ -294,14 +291,9 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) { return 0, err } a.InfoSchema = ret.InfoSchema -<<<<<<< HEAD a.SnapshotTS = ret.LastSnapshotTS a.ExplicitStaleness = ret.ExplicitStaleness a.TxnScope = ret.TxnScope -======= - a.SnapshotTS = ret.SnapshotTS - a.ExplicitStaleness = ret.ExplicitStaleness ->>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, a.InfoSchema) if err != nil { return 0, err @@ -803,13 +795,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow } -<<<<<<< HEAD b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.ExplicitStaleness, a.TxnScope) -======= - b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti) - b.snapshotTS = a.SnapshotTS - b.explicitStaleness = a.ExplicitStaleness ->>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) e := b.build(a.Plan) if b.err != nil { return nil, errors.Trace(b.err) diff --git a/executor/builder.go b/executor/builder.go index 4d3d9777a932a..5a24c317aa38f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -85,10 +85,7 @@ type executorBuilder struct { Ti *TelemetryInfo // ExplicitStaleness means whether the 'SELECT' clause are using 'AS OF TIMESTAMP' to perform stale read explicitly. explicitStaleness bool -<<<<<<< HEAD txnScope string -======= ->>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) } // CTEStorages stores resTbl and iterInTbl for CTEExec. diff --git a/executor/compiler.go b/executor/compiler.go index 15f674496799b..e5a6f9b0c6d69 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -70,14 +70,9 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm } return &ExecStmt{ GoCtx: ctx, -<<<<<<< HEAD SnapshotTS: ret.LastSnapshotTS, ExplicitStaleness: ret.ExplicitStaleness, TxnScope: ret.TxnScope, -======= - SnapshotTS: ret.SnapshotTS, - ExplicitStaleness: ret.ExplicitStaleness, ->>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) InfoSchema: ret.InfoSchema, Plan: finalPlan, LowerPriority: lowerPriority, diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 8c00c3aaace80..73b821f1798b6 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -862,7 +862,6 @@ func (s *testStaleTxnSuite) TestSetTransactionInfoSchema(c *C) { c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer3) } -<<<<<<< HEAD func (s *testStaleTxnSuite) TestStaleSelect(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") @@ -938,7 +937,8 @@ func (s *testStaleTxnSuite) TestStaleSelect(c *C) { tk.MustExec("insert into t values (5, 5, 5)") time.Sleep(tolerance) tk.MustQuery(fmt.Sprintf("select * from t as of timestamp '%s' where c=5", time6.Format("2006-1-2 15:04:05.000"))).Check(testkit.Rows("4 5 ")) -======= +} + func (s *testStaleTxnSuite) TestStaleReadTemporaryTable(c *C) { tk := testkit.NewTestKit(c, s.store) // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. @@ -1026,5 +1026,4 @@ func (s *testStaleTxnSuite) TestStaleReadTemporaryTable(c *C) { for _, query := range queries { tk.MustExec(query.sql) } ->>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) } diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index cebf7b8384cd5..4dca6ddb50170 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -127,7 +127,6 @@ const ( // PreprocessorReturn is used to retain information obtained in the preprocessor. type PreprocessorReturn struct { -<<<<<<< HEAD initedLastSnapshotTS bool ExplicitStaleness bool SnapshotTSEvaluator func(sessionctx.Context) (uint64, error) @@ -136,11 +135,6 @@ type PreprocessorReturn struct { LastSnapshotTS uint64 InfoSchema infoschema.InfoSchema TxnScope string -======= - SnapshotTS uint64 - ExplicitStaleness bool - InfoSchema infoschema.InfoSchema ->>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) } // preprocessor is an ast.Visitor that preprocess @@ -1456,15 +1450,6 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) { if p.err != nil { return } -<<<<<<< HEAD -======= - p.SnapshotTS = ts - p.ExplicitStaleness = true - p.InfoSchema = is - } - if p.SnapshotTS != ts { - p.err = ErrDifferentAsOf.GenWithStack("can not set different time in the as of") ->>>>>>> 811253785... planner, executor: add stale read compatibility for temporary table (#25206) } p.initedLastSnapshotTS = true }