Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
f9b49db
planner, exector: supports select statement with AS OF
nolouch May 13, 2021
2ff63cf
update proto
nolouch May 13, 2021
6afe3b9
fix
nolouch May 13, 2021
25f55ec
format
nolouch May 13, 2021
5fad63c
add more tests
nolouch May 16, 2021
c0a561a
Merge remote-tracking branch 'origin/master' into stale-select
nolouch May 16, 2021
b658219
clean up
nolouch May 16, 2021
a06b1a3
Merge remote-tracking branch 'origin/master' into stale-select
nolouch May 17, 2021
0f99648
update parser
nolouch May 17, 2021
c522744
address comments
nolouch May 18, 2021
09e0a2e
Merge remote-tracking branch 'origin/master' into stale-select
nolouch May 19, 2021
8446742
address comments
nolouch May 19, 2021
03e8455
*: use preprocessor to get timestamp
xhebox May 19, 2021
2c7113e
*: remove asofResolver, rename as WithPreprocessorReturn
xhebox May 20, 2021
758fe74
use millisecond precision
nolouch May 20, 2021
d3c9647
*: fix tests
xhebox May 21, 2021
11f2c16
*: fix remaining tests
xhebox May 21, 2021
aee7755
Merge branch 'master' into stale-select
nolouch May 21, 2021
5d745ef
Merge remote-tracking branch 'origin/master' into stale-select
nolouch May 24, 2021
3fa95e5
extract calculateTsExpr
nolouch May 24, 2021
8ce9e5f
fix test
nolouch May 24, 2021
f0aefdc
update mod
nolouch May 24, 2021
7e9a859
Merge branch 'master' into stale-select
xhebox May 24, 2021
e929ee6
Merge remote-tracking branch 'origin' into stale-select
nolouch May 24, 2021
47c20b4
fix conflict
nolouch May 24, 2021
6dbf8ef
try to fix test
nolouch May 25, 2021
ab2825b
address comment
nolouch May 25, 2021
0fc6fb5
Merge remote-tracking branch 'origin/master' into stale-select
nolouch May 25, 2021
4eeb62c
fix test
nolouch May 25, 2021
58c3c0d
*: try to fix
xhebox May 25, 2021
865db75
Merge remote-tracking branch 'pingcap/master' into stale_1
xhebox May 25, 2021
880541b
try fix
nolouch May 25, 2021
c0b5145
add comment
nolouch May 26, 2021
0713d8f
Merge branch 'master' into stale-select
nolouch May 26, 2021
c88bd2f
clean
nolouch May 26, 2021
82674a7
Merge remote-tracking branch 'nolouch/stale-select' into stale-select
nolouch May 26, 2021
36477da
try stable
nolouch May 26, 2021
8fb38a1
address comment
nolouch May 26, 2021
e42f736
Merge branch 'master' into stale-select
nolouch May 26, 2021
55966f9
Merge branch 'master' into stale-select
nolouch May 26, 2021
0f56721
address comment
nolouch May 27, 2021
c7a8a96
Merge remote-tracking branch 'nolouch/stale-select' into stale-select
nolouch May 27, 2021
df4850f
Merge branch 'master' into stale-select
ti-chi-bot May 27, 2021
6837ab5
Merge branch 'master' into stale-select
ti-chi-bot May 27, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cznic/mathutil"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand All @@ -43,6 +44,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -177,6 +179,9 @@ func (a *recordSet) OnFetchReturned() {
type ExecStmt struct {
// GoCtx stores parent go context.Context for a stmt.
GoCtx context.Context
// SnapshotTS stores the timestamp for stale read.
// It is not equivalent to session variables's snapshot ts, it only use to build the executor.
SnapshotTS uint64
// InfoSchema stores a reference to the schema information.
InfoSchema infoschema.InfoSchema
// Plan stores a reference to the final physical plan.
Expand Down Expand Up @@ -268,18 +273,19 @@ func (a *ExecStmt) IsReadOnly(vars *variable.SessionVars) bool {
// RebuildPlan rebuilds current execute statement plan.
// It returns the current information schema version that 'a' is using.
func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
is := a.Ctx.GetInfoSchema().(infoschema.InfoSchema)
a.InfoSchema = is
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, is, plannercore.InTxnRetry); err != nil {
ret := &plannercore.PreprocessorReturn{}
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.WithPreprocessorReturn(ret)); err != nil {
return 0, err
}
p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, is)
a.InfoSchema = ret.InfoSchema
a.SnapshotTS = ret.SnapshotTS
p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, a.InfoSchema)
if err != nil {
return 0, err
}
a.OutputNames = names
a.Plan = p
return is.SchemaMetaVersion(), nil
return a.InfoSchema.SchemaMetaVersion(), nil
}

// Exec builds an Executor from a plan. If the Executor doesn't return result,
Expand All @@ -305,6 +311,25 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", a.GetTextToLog()), zap.Stack("stack"))
}()

failpoint.Inject("assertStaleTSO", func(val failpoint.Value) {
if n, ok := val.(int); ok {
startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000
if n != int(startTS) {
panic("different tso")
}
failpoint.Return()
}
})
failpoint.Inject("assertStaleTSOWithTolerance", func(val failpoint.Value) {
if n, ok := val.(int); ok {
// Convert to seconds
startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000
if int(startTS) <= n-1 || n+1 <= int(startTS) {
panic("tso violate tolerance")
}
failpoint.Return()
}
})
sctx := a.Ctx
ctx = util.SetSessionID(ctx, sctx.GetSessionVars().ConnectionID)
if _, ok := a.Plan.(*plannercore.Analyze); ok && sctx.GetSessionVars().InRestrictedSQL {
Expand Down Expand Up @@ -747,6 +772,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
}

b := newExecutorBuilder(ctx, a.InfoSchema)
b.snapshotTS = a.SnapshotTS
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
Expand Down
1 change: 0 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,6 @@ func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
base.initCap = chunk.ZeroCapacity
return &PrepareExec{
baseExecutor: base,
is: b.is,
name: v.Name,
sqlText: v.SQLText,
}
Expand Down
10 changes: 5 additions & 5 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -53,13 +52,13 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
ctx = opentracing.ContextWithSpan(ctx, span1)
}

infoSchema := c.Ctx.GetInfoSchema().(infoschema.InfoSchema)
if err := plannercore.Preprocess(c.Ctx, stmtNode, infoSchema); err != nil {
ret := &plannercore.PreprocessorReturn{}
if err := plannercore.Preprocess(c.Ctx, stmtNode, plannercore.WithPreprocessorReturn(ret)); err != nil {
return nil, err
}
stmtNode = plannercore.TryAddExtraLimit(c.Ctx, stmtNode)

finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, infoSchema)
finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, ret.InfoSchema)
if err != nil {
return nil, err
}
Expand All @@ -71,7 +70,8 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
}
return &ExecStmt{
GoCtx: ctx,
InfoSchema: infoSchema,
SnapshotTS: ret.SnapshotTS,
InfoSchema: ret.InfoSchema,
Plan: finalPlan,
LowerPriority: lowerPriority,
Text: stmtNode.Text(),
Expand Down
12 changes: 6 additions & 6 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2349,14 +2349,14 @@ func (s *testSuiteP2) TestIsPointGet(c *C) {
"select * from help_topic where help_topic_id=1": true,
"select * from help_topic where help_category_id=1": false,
}
infoSchema := ctx.GetInfoSchema().(infoschema.InfoSchema)

for sqlStr, result := range tests {
stmtNode, err := s.ParseOneStmt(sqlStr, "", "")
c.Check(err, IsNil)
err = plannercore.Preprocess(ctx, stmtNode, infoSchema)
preprocessorReturn := &plannercore.PreprocessorReturn{}
err = plannercore.Preprocess(ctx, stmtNode, plannercore.WithPreprocessorReturn(preprocessorReturn))
c.Check(err, IsNil)
p, _, err := planner.Optimize(context.TODO(), ctx, stmtNode, infoSchema)
p, _, err := planner.Optimize(context.TODO(), ctx, stmtNode, preprocessorReturn.InfoSchema)
c.Check(err, IsNil)
ret, err := plannercore.IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx, p)
c.Assert(err, IsNil)
Expand All @@ -2381,13 +2381,13 @@ func (s *testSuiteP2) TestClusteredIndexIsPointGet(c *C) {
"select * from t where a='x' and c='x'": true,
"select * from t where a='x' and c='x' and b=1": false,
}
infoSchema := ctx.GetInfoSchema().(infoschema.InfoSchema)
for sqlStr, result := range tests {
stmtNode, err := s.ParseOneStmt(sqlStr, "", "")
c.Check(err, IsNil)
err = plannercore.Preprocess(ctx, stmtNode, infoSchema)
preprocessorReturn := &plannercore.PreprocessorReturn{}
err = plannercore.Preprocess(ctx, stmtNode, plannercore.WithPreprocessorReturn(preprocessorReturn))
c.Check(err, IsNil)
p, _, err := planner.Optimize(context.TODO(), ctx, stmtNode, infoSchema)
p, _, err := planner.Optimize(context.TODO(), ctx, stmtNode, preprocessorReturn.InfoSchema)
c.Check(err, IsNil)
ret, err := plannercore.IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx, p)
c.Assert(err, IsNil)
Expand Down
9 changes: 4 additions & 5 deletions executor/metrics_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/parser"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/testkit"
)

Expand Down Expand Up @@ -62,10 +60,11 @@ func (s *testSuite7) TestStmtLabel(c *C) {
for _, tt := range tests {
stmtNode, err := parser.New().ParseOneStmt(tt.sql, "", "")
c.Check(err, IsNil)
is := tk.Se.GetInfoSchema().(infoschema.InfoSchema)
err = plannercore.Preprocess(tk.Se.(sessionctx.Context), stmtNode, is)
preprocessorReturn := &plannercore.PreprocessorReturn{}
err = plannercore.Preprocess(tk.Se, stmtNode, plannercore.WithPreprocessorReturn(preprocessorReturn))
c.Check(err, IsNil)
c.Assert(err, IsNil)
_, _, err = planner.Optimize(context.TODO(), tk.Se, stmtNode, is)
_, _, err = planner.Optimize(context.TODO(), tk.Se, stmtNode, preprocessorReturn.InfoSchema)
c.Assert(err, IsNil)
c.Assert(executor.GetStmtLabel(stmtNode), Equals, tt.label)
}
Expand Down
13 changes: 6 additions & 7 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func (e *paramMarkerExtractor) Leave(in ast.Node) (ast.Node, bool) {
type PrepareExec struct {
baseExecutor

is infoschema.InfoSchema
name string
sqlText string

Expand All @@ -89,12 +88,11 @@ type PrepareExec struct {
}

// NewPrepareExec creates a new PrepareExec.
func NewPrepareExec(ctx sessionctx.Context, is infoschema.InfoSchema, sqlTxt string) *PrepareExec {
func NewPrepareExec(ctx sessionctx.Context, sqlTxt string) *PrepareExec {
base := newBaseExecutor(ctx, nil, 0)
base.initCap = chunk.ZeroCapacity
return &PrepareExec{
baseExecutor: base,
is: is,
sqlText: sqlTxt,
}
}
Expand Down Expand Up @@ -159,7 +157,8 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
return ErrPsManyParam
}

err = plannercore.Preprocess(e.ctx, stmt, e.is, plannercore.InPrepare)
ret := &plannercore.PreprocessorReturn{}
err = plannercore.Preprocess(e.ctx, stmt, plannercore.InPrepare, plannercore.WithPreprocessorReturn(ret))
if err != nil {
return err
}
Expand All @@ -177,14 +176,14 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
Stmt: stmt,
StmtType: GetStmtLabel(stmt),
Params: sorter.markers,
SchemaVersion: e.is.SchemaMetaVersion(),
SchemaVersion: ret.InfoSchema.SchemaMetaVersion(),
Comment thread
winoros marked this conversation as resolved.
}

if !plannercore.PreparedPlanCacheEnabled() {
prepared.UseCache = false
} else {
if !e.ctx.GetSessionVars().UseDynamicPartitionPrune() {
prepared.UseCache = plannercore.Cacheable(stmt, e.is)
prepared.UseCache = plannercore.Cacheable(stmt, ret.InfoSchema)
} else {
prepared.UseCache = plannercore.Cacheable(stmt, nil)
}
Expand All @@ -199,7 +198,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
var p plannercore.Plan
e.ctx.GetSessionVars().PlanID = 0
e.ctx.GetSessionVars().PlanColumnID = 0
destBuilder, _ := plannercore.NewPlanBuilder(e.ctx, e.is, &hint.BlockHintProcessor{})
destBuilder, _ := plannercore.NewPlanBuilder(e.ctx, ret.InfoSchema, &hint.BlockHintProcessor{})
p, err = destBuilder.Build(ctx, stmt)
if err != nil {
return err
Expand Down
113 changes: 113 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -93,6 +94,118 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) {
failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
}

func (s *testStaleTxnSerialSuite) TestSelectAsOf(c *C) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should test this case:

begin
select .... as of timestamp (expecting error)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let us handle it in another PR.

tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec(`drop table if exists b`)
tk.MustExec("create table t (id int primary key);")
tk.MustExec("create table b (pid int primary key);")
defer func() {
tk.MustExec(`drop table if exists b`)
tk.MustExec(`drop table if exists t`)
}()
time.Sleep(2 * time.Second)
now := time.Now()
time.Sleep(2 * time.Second)

testcases := []struct {
name string
sql string
expectPhysicalTS int64
preSec int64
// IsStaleness is auto cleanup in select stmt.
errorStr string
}{
{
name: "TimestampExactRead1",
sql: fmt.Sprintf("select * from t as of timestamp '%s';", now.Format("2006-1-2 15:04:05")),
expectPhysicalTS: now.Unix(),
},
{
name: "NomalRead",
sql: `select * from b;`,
preSec: 0,
},
{
name: "TimestampExactRead2",
sql: fmt.Sprintf("select * from t as of timestamp TIMESTAMP('%s');", now.Format("2006-1-2 15:04:05")),
expectPhysicalTS: now.Unix(),
},
{
name: "TimestampExactRead3",
sql: `select * from t as of timestamp NOW() - INTERVAL 2 SECOND;`,
preSec: 2,
},
{
name: "TimestampExactRead4",
sql: `select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 2 SECOND);`,
preSec: 2,
},
{
name: "TimestampExactRead5",
sql: `select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND), b as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND);`,
preSec: 1,
},
{
name: "TimestampExactRead6",
sql: `select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND), b as of timestamp TIMESTAMP('2020-09-06 00:00:00');`,
errorStr: ".*can not set different time in the as of.*",
},
{
name: "TimestampExactRead7",
sql: `select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND), b;`,
errorStr: ".*can not set different time in the as of.*",
},
{
name: "TimestampExactRead8",
sql: `select * from t, b as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND);`,
errorStr: ".*can not set different time in the as of.*",
},
{
name: "NomalRead",
sql: `select * from t, b;`,
preSec: 0,
},
{
name: "TimestampExactRead9",
sql: `select * from (select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND), b as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND)) as c, b;`,
errorStr: ".*can not set different time in the as of.*",
},
{
name: "TimestampExactRead10",
sql: `select * from (select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 2 SECOND), b as of timestamp TIMESTAMP(NOW() - INTERVAL 2 SECOND)) as c;`,
preSec: 2,
},
// Cannot be supported the SubSelect
{
name: "TimestampExactRead11",
sql: `select * from (select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 20 SECOND), b as of timestamp TIMESTAMP(NOW() - INTERVAL 20 SECOND)) as c as of timestamp Now();`,
errorStr: ".*You have an error in your SQL syntax.*",
},
}

for _, testcase := range testcases {
c.Log(testcase.name)
if testcase.expectPhysicalTS > 0 {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleTSO", fmt.Sprintf(`return(%d)`, testcase.expectPhysicalTS)), IsNil)
} else if testcase.preSec > 0 {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleTSOWithTolerance", fmt.Sprintf(`return(%d)`, time.Now().Unix()-testcase.preSec)), IsNil)
}
_, err := tk.Exec(testcase.sql)
if len(testcase.errorStr) != 0 {
c.Assert(err, ErrorMatches, testcase.errorStr)
continue
}
c.Assert(err, IsNil, Commentf("sql:%s, error stack %v", testcase.sql, errors.ErrorStack(err)))
if testcase.expectPhysicalTS > 0 {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleTSO"), IsNil)
} else if testcase.preSec > 0 {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleTSOWithTolerance"), IsNil)
}
}
}

func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
6 changes: 3 additions & 3 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4850,10 +4850,10 @@ func (s *testIntegrationSuite) TestFilterExtractFromDNF(c *C) {
stmts, err := session.Parse(sctx, sql)
c.Assert(err, IsNil, Commentf("error %v, for expr %s", err, tt.exprStr))
c.Assert(stmts, HasLen, 1)
is := domain.GetDomain(sctx).InfoSchema()
err = plannercore.Preprocess(sctx, stmts[0], is)
ret := &plannercore.PreprocessorReturn{}
err = plannercore.Preprocess(sctx, stmts[0], plannercore.WithPreprocessorReturn(ret))
c.Assert(err, IsNil, Commentf("error %v, for resolve name, expr %s", err, tt.exprStr))
p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], is)
p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], ret.InfoSchema)
c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, tt.exprStr))
selection := p.(plannercore.LogicalPlan).Children()[0].(*plannercore.LogicalSelection)
conds := make([]expression.Expression, len(selection.Conditions))
Expand Down
Loading