Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,11 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu
return e
}

// IsStaleness returns if the query is staleness
func (b *executorBuilder) IsStaleness() bool {
return b.ctx.GetSessionVars().TxnCtx.IsStaleness || b.explicitStaleness
}

// `getSnapshotTS` returns the timestamp of the snapshot that a reader should read.
func (b *executorBuilder) getSnapshotTS() (uint64, error) {
// `refreshForUpdateTSForRC` should always be invoked before returning the cached value to
Expand Down Expand Up @@ -2662,6 +2667,10 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return nil, err
}
ts := v.GetTableScan()
if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
return nil, errors.New("can not stale read temporary table")
}

tbl, _ := b.is.TableByID(ts.Table.ID)
isPartition, physicalTableID := ts.IsPartition()
if isPartition {
Expand Down Expand Up @@ -2774,6 +2783,11 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
}

ts := v.GetTableScan()
if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

ret.ranges = ts.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
Expand Down Expand Up @@ -2991,13 +3005,18 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
}

func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) Executor {
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
if is.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

ret, err := buildNoRangeIndexReader(b, v)
if err != nil {
b.err = err
return nil
}

is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
ret.ranges = is.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)
Expand Down Expand Up @@ -3145,13 +3164,18 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
}

func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor {
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
if is.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

ret, err := buildNoRangeIndexLookUpReader(b, v)
if err != nil {
b.err = err
return nil
}

is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)

ret.ranges = is.Ranges
Expand Down Expand Up @@ -3255,6 +3279,12 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
}

func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor {
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

ret, err := buildNoRangeIndexMergeReader(b, v)
if err != nil {
b.err = err
Expand All @@ -3274,7 +3304,6 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg
}
}
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
executorCounterIndexMergeReaderExecutor.Inc()

Expand Down Expand Up @@ -4048,6 +4077,11 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model
}

func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor {
if plan.TblInfo.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

startTS, err := b.getSnapshotTS()
if err != nil {
b.err = err
Expand Down Expand Up @@ -4182,6 +4216,11 @@ func fullRangePartition(idxArr []int) bool {
}

func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *TableSampleExecutor {
if v.TableInfo.Meta().TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

startTS, err := b.getSnapshotTS()
if err != nil {
b.err = err
Expand Down
5 changes: 5 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ import (
)

func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
if p.TblInfo.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

startTS, err := b.getSnapshotTS()
if err != nil {
b.err = err
Expand Down
90 changes: 90 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor_test

import (
"fmt"
"strings"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -937,3 +938,92 @@ func (s *testStaleTxnSuite) TestStaleSelect(c *C) {
time.Sleep(tolerance)
tk.MustQuery(fmt.Sprintf("select * from t as of timestamp '%s' where c=5", time6.Format("2006-1-2 15:04:05.000"))).Check(testkit.Rows("4 5 <nil>"))
}

func (s *testStaleTxnSuite) TestStaleReadTemporaryTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
safePointName := "tikv_gc_safe_point"
safePointValue := "20160102-15:04:05 -0700"
safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)

tk.MustExec("set @@tidb_enable_global_temporary_table=1")
tk.MustExec("use test")
tk.MustExec("drop table if exists tmp1")
tk.MustExec("create global temporary table tmp1 " +
"(id int not null primary key, code int not null, value int default null, unique key code(code))" +
"on commit delete rows")
time.Sleep(time.Second)
tk.MustGetErrMsg("select * from tmp1 as of timestamp NOW() where id=1", "can not stale read temporary table")

queries := []struct {
sql string
}{
{
sql: "select * from tmp1 where id=1",
},
{
sql: "select * from tmp1 where code=1",
},
{
sql: "select * from tmp1 where id in (1, 2, 3)",
},
{
sql: "select * from tmp1 where code in (1, 2, 3)",
},
{
sql: "select * from tmp1 where id > 1",
},
{
sql: "select /*+use_index(tmp1, code)*/ * from tmp1 where code > 1",
},
{
sql: "select /*+use_index(tmp1, code)*/ code from tmp1 where code > 1",
},
{
sql: "select * from tmp1 tablesample regions()",
},
{
sql: "select /*+ use_index_merge(tmp1, primary, code) */ * from tmp1 where id > 1 or code > 2",
},
}

addStaleReadToSQL := func(sql string) string {
idx := strings.Index(sql, " where ")
if idx < 0 {
return ""
}
return sql[0:idx] + " as of timestamp NOW()" + sql[idx:]
}

for _, query := range queries {
sql := addStaleReadToSQL(query.sql)
if sql != "" {
tk.MustGetErrMsg(sql, "can not stale read temporary table")
}
}

tk.MustExec("start transaction read only as of timestamp NOW()")
for _, query := range queries {
tk.MustGetErrMsg(query.sql, "can not stale read temporary table")
}
tk.MustExec("commit")

for _, query := range queries {
tk.MustExec(query.sql)
}

tk.MustExec("set transaction read only as of timestamp NOW()")
tk.MustExec("start transaction")
for _, query := range queries {
tk.MustGetErrMsg(query.sql, "can not stale read temporary table")
}
tk.MustExec("commit")

for _, query := range queries {
tk.MustExec(query.sql)
}
}