diff --git a/executor/builder.go b/executor/builder.go index fb94a1d8d55b2..5a24c317aa38f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1391,6 +1391,11 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu return e } +// IsStaleness returns if the query is staleness +func (b *executorBuilder) IsStaleness() bool { + return b.ctx.GetSessionVars().TxnCtx.IsStaleness || b.explicitStaleness +} + // `getSnapshotTS` returns the timestamp of the snapshot that a reader should read. func (b *executorBuilder) getSnapshotTS() (uint64, error) { // `refreshForUpdateTSForRC` should always be invoked before returning the cached value to @@ -2662,6 +2667,10 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea return nil, err } ts := v.GetTableScan() + if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + return nil, errors.New("can not stale read temporary table") + } + tbl, _ := b.is.TableByID(ts.Table.ID) isPartition, physicalTableID := ts.IsPartition() if isPartition { @@ -2774,6 +2783,11 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E } ts := v.GetTableScan() + if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + ret.ranges = ts.Ranges sctx := b.ctx.GetSessionVars().StmtCtx sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) @@ -2991,13 +3005,18 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea } func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) Executor { + is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) + if is.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + ret, err := buildNoRangeIndexReader(b, v) if err != nil { b.err = err return nil } - is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) ret.ranges = is.Ranges sctx := b.ctx.GetSessionVars().StmtCtx sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O) @@ -3145,13 +3164,18 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn } func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor { + is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) + if is.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + ret, err := buildNoRangeIndexLookUpReader(b, v) if err != nil { b.err = err return nil } - is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) ret.ranges = is.Ranges @@ -3255,6 +3279,12 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd } func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor { + ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) + if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + ret, err := buildNoRangeIndexMergeReader(b, v) if err != nil { b.err = err @@ -3274,7 +3304,6 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg } } } - ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) executorCounterIndexMergeReaderExecutor.Inc() @@ -4048,6 +4077,11 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model } func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor { + if plan.TblInfo.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + startTS, err := b.getSnapshotTS() if err != nil { b.err = err @@ -4182,6 +4216,11 @@ func fullRangePartition(idxArr []int) bool { } func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *TableSampleExecutor { + if v.TableInfo.Meta().TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + startTS, err := b.getSnapshotTS() if err != nil { b.err = err diff --git a/executor/point_get.go b/executor/point_get.go index 65200df9e965a..a4ec6b5551d07 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -42,6 +42,11 @@ import ( ) func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { + if p.TblInfo.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + startTS, err := b.getSnapshotTS() if err != nil { b.err = err diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 6ca8aac96f0f5..73b821f1798b6 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -15,6 +15,7 @@ package executor_test import ( "fmt" + "strings" "time" . "github.com/pingcap/check" @@ -937,3 +938,92 @@ func (s *testStaleTxnSuite) TestStaleSelect(c *C) { time.Sleep(tolerance) tk.MustQuery(fmt.Sprintf("select * from t as of timestamp '%s' where c=5", time6.Format("2006-1-2 15:04:05.000"))).Check(testkit.Rows("4 5 ")) } + +func (s *testStaleTxnSuite) TestStaleReadTemporaryTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. + safePointName := "tikv_gc_safe_point" + safePointValue := "20160102-15:04:05 -0700" + safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)" + updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY + UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment) + tk.MustExec(updateSafePoint) + + tk.MustExec("set @@tidb_enable_global_temporary_table=1") + tk.MustExec("use test") + tk.MustExec("drop table if exists tmp1") + tk.MustExec("create global temporary table tmp1 " + + "(id int not null primary key, code int not null, value int default null, unique key code(code))" + + "on commit delete rows") + time.Sleep(time.Second) + tk.MustGetErrMsg("select * from tmp1 as of timestamp NOW() where id=1", "can not stale read temporary table") + + queries := []struct { + sql string + }{ + { + sql: "select * from tmp1 where id=1", + }, + { + sql: "select * from tmp1 where code=1", + }, + { + sql: "select * from tmp1 where id in (1, 2, 3)", + }, + { + sql: "select * from tmp1 where code in (1, 2, 3)", + }, + { + sql: "select * from tmp1 where id > 1", + }, + { + sql: "select /*+use_index(tmp1, code)*/ * from tmp1 where code > 1", + }, + { + sql: "select /*+use_index(tmp1, code)*/ code from tmp1 where code > 1", + }, + { + sql: "select * from tmp1 tablesample regions()", + }, + { + sql: "select /*+ use_index_merge(tmp1, primary, code) */ * from tmp1 where id > 1 or code > 2", + }, + } + + addStaleReadToSQL := func(sql string) string { + idx := strings.Index(sql, " where ") + if idx < 0 { + return "" + } + return sql[0:idx] + " as of timestamp NOW()" + sql[idx:] + } + + for _, query := range queries { + sql := addStaleReadToSQL(query.sql) + if sql != "" { + tk.MustGetErrMsg(sql, "can not stale read temporary table") + } + } + + tk.MustExec("start transaction read only as of timestamp NOW()") + for _, query := range queries { + tk.MustGetErrMsg(query.sql, "can not stale read temporary table") + } + tk.MustExec("commit") + + for _, query := range queries { + tk.MustExec(query.sql) + } + + tk.MustExec("set transaction read only as of timestamp NOW()") + tk.MustExec("start transaction") + for _, query := range queries { + tk.MustGetErrMsg(query.sql, "can not stale read temporary table") + } + tk.MustExec("commit") + + for _, query := range queries { + tk.MustExec(query.sql) + } +}