From c97aaeeb8817cd0b2a45c43cd37d72405f86ea48 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 19 May 2021 13:58:50 +0800 Subject: [PATCH 1/6] Support AS OF TIMESTAMP read-only begin statement Signed-off-by: JmPotato --- executor/builder.go | 9 +-- executor/simple.go | 32 +++++++-- executor/stale_txn_test.go | 120 +++++++++++++++++++++++--------- expression/builtin_time_test.go | 4 +- planner/core/common_plans.go | 3 + planner/core/planbuilder.go | 35 +++++++++- session/session.go | 86 ++++++++++++++++++++--- session/session_test.go | 4 +- sessionctx/context.go | 8 ++- util/mock/context.go | 10 +-- 10 files changed, 246 insertions(+), 65 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 54e2dfb93012a..47566f28cd928 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -706,10 +706,11 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor { base := newBaseExecutor(b.ctx, v.Schema(), v.ID()) base.initCap = chunk.ZeroCapacity e := &SimpleExec{ - baseExecutor: base, - Statement: v.Statement, - IsFromRemote: v.IsFromRemote, - is: b.is, + baseExecutor: base, + Statement: v.Statement, + IsFromRemote: v.IsFromRemote, + is: b.is, + StalenessTxnOption: v.StalenessTxnOption, } return e } diff --git a/executor/simple.go b/executor/simple.go index 7270f12aecdd0..18bb90dbac781 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -74,6 +74,9 @@ type SimpleExec struct { IsFromRemote bool done bool is infoschema.InfoSchema + + // StalenessTxnOption is used to execute the staleness txn during a read-only begin statement. + StalenessTxnOption *sessionctx.StalenessTxnOption } func (e *baseExecutor) getSysSession() (sessionctx.Context, error) { @@ -564,13 +567,16 @@ func (e *SimpleExec) executeUse(s *ast.UseStmt) error { } func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { - // If `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND` is the first statement in TxnCtx, we should + // If `START TRANSACTION READ ONLY` is the first statement in TxnCtx, we should // always create a new Txn instead of reusing it. if s.ReadOnly { enableNoopFuncs := e.ctx.GetSessionVars().EnableNoopFuncs - if !enableNoopFuncs && s.Bound == nil { + if !enableNoopFuncs && s.AsOf == nil && s.Bound == nil { return expression.ErrFunctionsNoopImpl.GenWithStackByArgs("READ ONLY") } + if s.AsOf != nil { + return e.executeStartTransactionReadOnlyWithBoundedStaleness(ctx, s) + } if s.Bound != nil { return e.executeStartTransactionReadOnlyWithTimestampBound(ctx, s) } @@ -612,6 +618,22 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { return nil } +func (e *SimpleExec) executeStartTransactionReadOnlyWithBoundedStaleness(ctx context.Context, s *ast.BeginStmt) error { + if e.StalenessTxnOption == nil { + return errors.New("Planner failed to create the txn option for AS OF TIMESTAMP statement") + } + if err := e.ctx.NewTxnWithStalenessOption(ctx, *e.StalenessTxnOption); err != nil { + return err + } + + // With START TRANSACTION, autocommit remains disabled until you end + // the transaction with COMMIT or ROLLBACK. The autocommit mode then + // reverts to its previous state. + e.ctx.GetSessionVars().SetInTxn(true) + return nil +} + +// TODO: deprecate this syntax and only keep `AS OF TIMESTAMP` statement. func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx context.Context, s *ast.BeginStmt) error { opt := sessionctx.StalenessTxnOption{} opt.Mode = s.Bound.Mode @@ -630,8 +652,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte if err != nil { return err } - startTS := oracle.ComposeTS(gt.Unix()*1000, 0) - opt.StartTS = startTS + opt.StartTS = oracle.GoTimeToTS(gt) case ast.TimestampBoundExactStaleness: // TODO: support funcCallExpr in future v, ok := s.Bound.Timestamp.(*driver.ValueExpr) @@ -666,8 +687,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte if err != nil { return err } - startTS := oracle.ComposeTS(gt.Unix()*1000, 0) - opt.StartTS = startTS + opt.StartTS = oracle.GoTimeToTS(gt) } err := e.ctx.NewTxnWithStalenessOption(ctx, opt) if err != nil { diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 64b334b15bf94..150660d9983a1 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -73,13 +73,30 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) { zone: "sz", }, { - name: "begin", + name: "begin after TimestampBoundReadTimestamp", preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`, sql: "begin", IsStaleness: false, txnScope: kv.GlobalTxnScope, zone: "", }, + { + name: "AsOfTimestamp", + preSQL: "begin", + sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`, + IsStaleness: true, + expectPhysicalTS: 1599321600000, + txnScope: "local", + zone: "sh", + }, + { + name: "begin after AsOfTimestamp", + preSQL: `START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`, + sql: "begin", + IsStaleness: false, + txnScope: oracle.GlobalTxnScope, + zone: "", + }, } tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") @@ -106,8 +123,8 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) { } c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.IsStaleness) tk.MustExec("commit") - failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope") } + failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope") } func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { @@ -147,13 +164,16 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { failpoint.Enable("github.com/pingcap/tidb/config/injectTxnScope", fmt.Sprintf(`return("%v")`, testcase.zone)) failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStoreLabels", fmt.Sprintf(`return("%v_%v")`, placement.DCLabelKey, testcase.txnScope)) failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag", `return(true)`) + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP CURRENT_TIMESTAMP(3);`) + tk.MustQuery(testcase.sql) + tk.MustExec(`commit`) tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) tk.MustQuery(testcase.sql) tk.MustExec(`commit`) - failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope") - failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStoreLabels") - failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag") } + failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope") + failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStoreLabels") + failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag") } func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) { @@ -169,6 +189,16 @@ func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) { tk.MustExec(updateSafePoint) // set @@tidb_snapshot before staleness txn tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`) + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`) + c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000)) + tk.MustExec("commit") + // set @@tidb_snapshot during staleness txn + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`) + tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`) + c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000)) + tk.MustExec("commit") + // set @@tidb_snapshot before staleness txn + tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`) tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`) c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000)) tk.MustExec("commit") @@ -190,25 +220,20 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { name string sql string injectSafeTS uint64 - useSafeTS bool + // compareWithSafeTS will be 0 if StartTS==SafeTS, -1 if StartTS < SafeTS, and +1 if StartTS > SafeTS. + compareWithSafeTS int }{ { - name: "max 20 seconds ago, safeTS 10 secs ago", - sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`, - injectSafeTS: func() uint64 { - phy := time.Now().Add(-10*time.Second).Unix() * 1000 - return oracle.ComposeTS(phy, 0) - }(), - useSafeTS: true, + name: "max 20 seconds ago, safeTS 10 secs ago", + sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)), + compareWithSafeTS: 0, }, { - name: "max 10 seconds ago, safeTS 20 secs ago", - sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`, - injectSafeTS: func() uint64 { - phy := time.Now().Add(-20*time.Second).Unix() * 1000 - return oracle.ComposeTS(phy, 0) - }(), - useSafeTS: false, + name: "max 10 seconds ago, safeTS 20 secs ago", + sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)), + compareWithSafeTS: 1, }, { name: "max 20 seconds ago, safeTS 10 secs ago", @@ -216,11 +241,8 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`, time.Now().Add(-20*time.Second).Format("2006-01-02 15:04:05")) }(), - injectSafeTS: func() uint64 { - phy := time.Now().Add(-10*time.Second).Unix() * 1000 - return oracle.ComposeTS(phy, 0) - }(), - useSafeTS: true, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)), + compareWithSafeTS: 0, }, { name: "max 10 seconds ago, safeTS 20 secs ago", @@ -228,26 +250,46 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`, time.Now().Add(-10*time.Second).Format("2006-01-02 15:04:05")) }(), - injectSafeTS: func() uint64 { - phy := time.Now().Add(-20*time.Second).Unix() * 1000 - return oracle.ComposeTS(phy, 0) - }(), - useSafeTS: false, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)), + compareWithSafeTS: 1, + }, + { + name: "20 seconds ago to now, safeTS 10 secs ago", + sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 20 SECOND, NOW())`, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)), + compareWithSafeTS: 0, + }, + { + name: "10 seconds ago to now, safeTS 20 secs ago", + sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 10 SECOND, NOW())`, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)), + compareWithSafeTS: 1, + }, + { + name: "20 seconds ago to 10 seconds ago, safeTS 5 secs ago", + sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 20 SECOND, NOW() - INTERVAL 10 SECOND)`, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-5 * time.Second)), + compareWithSafeTS: -1, }, } for _, testcase := range testcases { c.Log(testcase.name) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/injectSafeTS", fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil) tk.MustExec(testcase.sql) - if testcase.useSafeTS { + if testcase.compareWithSafeTS == 1 { + c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Greater, testcase.injectSafeTS) + } else if testcase.compareWithSafeTS == 0 { c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.injectSafeTS) } else { - c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Greater, testcase.injectSafeTS) + c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Less, testcase.injectSafeTS) } tk.MustExec("commit") - failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS") } + failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS") + failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS") } func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) { @@ -267,4 +309,16 @@ func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) { schemaVer3 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() // got an old infoSchema c.Assert(schemaVer3, Equals, schemaVer1) + + schemaVer4 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + time.Sleep(time.Second) + tk.MustExec("create table t (id int primary key);") + schemaVer5 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + // confirm schema changed + c.Assert(schemaVer4, Less, schemaVer5) + + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW() - INTERVAL 1 SECOND`) + schemaVer6 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + // got an old infoSchema + c.Assert(schemaVer6, Equals, schemaVer4) } diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index e247e8756ae9a..4015794377486 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -2928,7 +2928,7 @@ func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) { // Test whether it's deterministic. safeTime1 := t2.Add(-1 * time.Second) - safeTS1 := oracle.ComposeTS(safeTime1.Unix()*1000, 0) + safeTS1 := oracle.GoTimeToTS(safeTime1) c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", fmt.Sprintf("return(%v)", safeTS1)), IsNil) f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)})) @@ -2941,7 +2941,7 @@ func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) { c.Assert(resultTime, Equals, safeTime1.Format(types.TimeFormat)) // SafeTS updated. safeTime2 := t2.Add(1 * time.Second) - safeTS2 := oracle.ComposeTS(safeTime2.Unix()*1000, 0) + safeTS2 := oracle.GoTimeToTS(safeTime2) c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", fmt.Sprintf("return(%v)", safeTS2)), IsNil) f, err = fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)})) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 3818486955646..b42a84d926eeb 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -723,6 +723,9 @@ type Simple struct { // and executing in co-processor. // Used for `global kill`. See https://github.com/pingcap/tidb/blob/master/docs/design/2020-06-01-global-kill.md. IsFromRemote bool + + // StalenessTxnOption is the transaction option that will be built when planner builder calls buildSimple. + StalenessTxnOption *sessionctx.StalenessTxnOption } // PhysicalSimpleWrapper is a wrapper of `Simple` to implement physical plan interface. diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 7dc2459dace33..c4786e88cb008 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" @@ -642,7 +643,7 @@ func (b *PlanBuilder) Build(ctx context.Context, node ast.Node) (Plan, error) { *ast.BeginStmt, *ast.CommitStmt, *ast.RollbackStmt, *ast.CreateUserStmt, *ast.SetPwdStmt, *ast.AlterInstanceStmt, *ast.GrantStmt, *ast.DropUserStmt, *ast.AlterUserStmt, *ast.RevokeStmt, *ast.KillStmt, *ast.DropStatsStmt, *ast.GrantRoleStmt, *ast.RevokeRoleStmt, *ast.SetRoleStmt, *ast.SetDefaultRoleStmt, *ast.ShutdownStmt: - return b.buildSimple(node.(ast.StmtNode)) + return b.buildSimple(ctx, node.(ast.StmtNode)) case ast.DDLNode: return b.buildDDL(ctx, x) case *ast.CreateBindingStmt: @@ -2258,7 +2259,7 @@ func (b *PlanBuilder) buildShow(ctx context.Context, show *ast.ShowStmt) (Plan, return np, nil } -func (b *PlanBuilder) buildSimple(node ast.StmtNode) (Plan, error) { +func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, error) { p := &Simple{Statement: node} switch raw := node.(type) { @@ -2324,6 +2325,36 @@ func (b *PlanBuilder) buildSimple(node ast.StmtNode) (Plan, error) { } case *ast.ShutdownStmt: b.visitInfo = appendVisitInfo(b.visitInfo, mysql.ShutdownPriv, "", "", "", nil) + case *ast.BeginStmt: + if raw.AsOf != nil { + p.StalenessTxnOption = &sessionctx.StalenessTxnOption{ + UseAsOf: true, + } + var tsTime time.Time + // For the normal expression like `AS OF TIMESTAMP '2015-09-21 00:00:00.000'`. + if tsExpr, isValueExpr := raw.AsOf.TsExpr.(*driver.ValueExpr); isValueExpr { + tsVal, err := types.ParseTime(b.ctx.GetSessionVars().StmtCtx, tsExpr.GetString(), tsExpr.GetType().Tp, types.GetFsp(tsExpr.GetString())) + if err != nil { + return nil, err + } + tsTime, err = tsVal.GoTime(b.ctx.GetSessionVars().TimeZone) + if err != nil { + return nil, err + } + } + // For the function call expression like `AS OF TIMESTAMP NOW()`. + if _, isFuncCall := raw.AsOf.TsExpr.(*ast.FuncCallExpr); isFuncCall { + tsVal, err := evalAstExpr(b.ctx, raw.AsOf.TsExpr) + if err != nil { + return nil, err + } + tsTime, err = tsVal.GetMysqlTime().GoTime(b.ctx.GetSessionVars().TimeZone) + if err != nil { + return nil, err + } + } + p.StalenessTxnOption.StartTS = oracle.GoTimeToTS(tsTime) + } } return p, nil } diff --git a/session/session.go b/session/session.go index 902f35ca28e79..a9c1e62375e8e 100644 --- a/session/session.go +++ b/session/session.go @@ -1960,6 +1960,31 @@ func (s *session) isTxnRetryable() bool { } func (s *session) NewTxn(ctx context.Context) error { + if err := s.checkBeforeCreateNewTxn(ctx); err != nil { + return err + } + txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) + if err != nil { + return err + } + txn.SetVars(s.sessionVars.KVVars) + if s.GetSessionVars().GetReplicaRead().IsFollowerRead() { + txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } + s.txn.changeInvalidToValid(txn) + is := domain.GetDomain(s).InfoSchema() + s.sessionVars.TxnCtx = &variable.TransactionContext{ + InfoSchema: is, + CreateTime: time.Now(), + StartTS: txn.StartTS(), + ShardStep: int(s.sessionVars.ShardAllocateStep), + IsStaleness: false, + TxnScope: s.sessionVars.CheckAndGetTxnScope(), + } + return nil +} + +func (s *session) checkBeforeCreateNewTxn(ctx context.Context) error { if s.txn.Valid() { txnStartTS := s.txn.StartTS() txnScope := s.GetSessionVars().TxnCtx.TxnScope @@ -1968,29 +1993,71 @@ func (s *session) NewTxn(ctx context.Context) error { return err } vars := s.GetSessionVars() - logutil.Logger(ctx).Info("NewTxn() inside a transaction auto commit", + logutil.Logger(ctx).Info("Try to create a new txn inside a transaction auto commit", zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()), zap.Uint64("txnStartTS", txnStartTS), zap.String("txnScope", txnScope)) } + return nil +} - txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) +// NewTxnWithStalenessOption create a transaction with Staleness option +func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { + err := s.checkBeforeCreateNewTxn(ctx) if err != nil { return err } - txn.SetVars(s.sessionVars.KVVars) - if s.GetSessionVars().GetReplicaRead().IsFollowerRead() { - txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + var ( + txn kv.Transaction + txnScope = s.GetSessionVars().CheckAndGetTxnScope() + ) + if option.UseAsOf { + txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) + if err != nil { + return err + } + } else { + switch option.Mode { + case ast.TimestampBoundReadTimestamp: + txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) + if err != nil { + return err + } + case ast.TimestampBoundExactStaleness: + txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) + if err != nil { + return err + } + case ast.TimestampBoundMaxStaleness: + txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) + if err != nil { + return err + } + case ast.TimestampBoundMinReadTimestamp: + txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) + if err != nil { + return err + } + default: + // For unsupported staleness txn cases, fallback to NewTxn + return s.NewTxn(ctx) + } } + txn.SetVars(s.sessionVars.KVVars) + txn.SetOption(kv.IsStalenessReadOnly, true) + txn.SetOption(kv.TxnScope, txnScope) s.txn.changeInvalidToValid(txn) - is := domain.GetDomain(s).InfoSchema() + is, err := domain.GetDomain(s).GetSnapshotInfoSchema(txn.StartTS()) + if err != nil { + return errors.Trace(err) + } s.sessionVars.TxnCtx = &variable.TransactionContext{ InfoSchema: is, CreateTime: time.Now(), StartTS: txn.StartTS(), ShardStep: int(s.sessionVars.ShardAllocateStep), - IsStaleness: false, - TxnScope: s.sessionVars.CheckAndGetTxnScope(), + IsStaleness: true, + TxnScope: txnScope, } return nil } @@ -2780,6 +2847,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { return nil } +<<<<<<< HEAD // NewTxnWithStalenessOption create a transaction with Staleness option func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { if s.txn.Valid() { @@ -2842,6 +2910,8 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc return nil } +======= +>>>>>>> Support AS OF TIMESTAMP read-only begin statement // GetStore gets the store of session. func (s *session) GetStore() kv.Storage { return s.store diff --git a/session/session_test.go b/session/session_test.go index a6c7908237bca..ad6036f51dea1 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -4109,7 +4109,7 @@ func (s *testSessionSerialSuite) TestValidateReadOnlyInStalenessTransaction(c *C tk.MustExec(`set @@tidb_enable_noop_functions=1;`) for _, testcase := range testcases { c.Log(testcase.name) - tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP CURRENT_TIMESTAMP(3);`) if testcase.isValidate { _, err := tk.Exec(testcase.sql) c.Assert(err, IsNil) @@ -4169,7 +4169,7 @@ func (s *testSessionSerialSuite) TestSpecialSQLInStalenessTxn(c *C) { tk.MustExec("CREATE USER 'newuser' IDENTIFIED BY 'mypassword';") for _, testcase := range testcases { comment := Commentf(testcase.name) - tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP CURRENT_TIMESTAMP(3);`) c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, true, comment) tk.MustExec(testcase.sql) c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.sameSession, comment) diff --git a/sessionctx/context.go b/sessionctx/context.go index 2aeda663a038d..7752f9e7af5de 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -34,6 +34,8 @@ type Context interface { // If old transaction is valid, it is committed first. // It's used in BEGIN statement and DDL statements to commit old transaction. NewTxn(context.Context) error + // NewTxnWithStalenessOption initializes a transaction with StalenessTxnOption. + NewTxnWithStalenessOption(ctx context.Context, option StalenessTxnOption) error // Txn returns the current transaction which is created before executing a statement. // The returned kv.Transaction is not nil, but it maybe pending or invalid. @@ -73,9 +75,6 @@ type Context interface { // It should be called right before we builds an executor. InitTxnWithStartTS(startTS uint64) error - // NewTxnWithStalenessOption initializes a transaction with StalenessTxnOption - NewTxnWithStalenessOption(ctx context.Context, option StalenessTxnOption) error - // GetStore returns the store of session. GetStore() kv.Storage @@ -147,4 +146,7 @@ type StalenessTxnOption struct { Mode ast.TimestampBoundMode PrevSec uint64 StartTS uint64 + // UseAsOf is used to be compatible with the current implementation, + // it will be removed later. + UseAsOf bool } diff --git a/util/mock/context.go b/util/mock/context.go index d23124e555ea2..05ccc0fbf8d8a 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -188,6 +188,11 @@ func (c *Context) NewTxn(context.Context) error { return nil } +// NewTxnWithStalenessOption implements the sessionctx.Context interface. +func (c *Context) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { + return c.NewTxn(ctx) +} + // RefreshTxnCtx implements the sessionctx.Context interface. func (c *Context) RefreshTxnCtx(ctx context.Context) error { return errors.Trace(c.NewTxn(ctx)) @@ -213,11 +218,6 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error { return nil } -// NewTxnWithStalenessOption implements the sessionctx.Context interface. -func (c *Context) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { - return c.NewTxn(ctx) -} - // GetStore gets the store of session. func (c *Context) GetStore() kv.Storage { return c.Store From 85c0f999decef913ac7f1ed5f21a5328c6f6d15b Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 19 May 2021 18:09:37 +0800 Subject: [PATCH 2/6] Refine the TsExpr calculation Signed-off-by: JmPotato --- executor/stale_txn_test.go | 3 ++- planner/core/planbuilder.go | 41 +++++++++++++++---------------------- session/session_test.go | 4 ++-- 3 files changed, 21 insertions(+), 27 deletions(-) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 150660d9983a1..c10513d49ee28 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -164,7 +164,8 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { failpoint.Enable("github.com/pingcap/tidb/config/injectTxnScope", fmt.Sprintf(`return("%v")`, testcase.zone)) failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStoreLabels", fmt.Sprintf(`return("%v_%v")`, placement.DCLabelKey, testcase.txnScope)) failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag", `return(true)`) - tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP CURRENT_TIMESTAMP(3);`) + // Using NOW() will cause the loss of fsp precision, so we use NOW(3) to be accurate to the millisecond. + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`) tk.MustQuery(testcase.sql) tk.MustExec(`commit`) tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index c4786e88cb008..691c6b5e5597f 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -2327,33 +2327,26 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, b.visitInfo = appendVisitInfo(b.visitInfo, mysql.ShutdownPriv, "", "", "", nil) case *ast.BeginStmt: if raw.AsOf != nil { - p.StalenessTxnOption = &sessionctx.StalenessTxnOption{ - UseAsOf: true, + // Calculate the TsExpr to get a timestamp. + tsVal, err := evalAstExpr(b.ctx, raw.AsOf.TsExpr) + if err != nil { + return nil, err } - var tsTime time.Time - // For the normal expression like `AS OF TIMESTAMP '2015-09-21 00:00:00.000'`. - if tsExpr, isValueExpr := raw.AsOf.TsExpr.(*driver.ValueExpr); isValueExpr { - tsVal, err := types.ParseTime(b.ctx.GetSessionVars().StmtCtx, tsExpr.GetString(), tsExpr.GetType().Tp, types.GetFsp(tsExpr.GetString())) - if err != nil { - return nil, err - } - tsTime, err = tsVal.GoTime(b.ctx.GetSessionVars().TimeZone) - if err != nil { - return nil, err - } + toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp) + // We need at least the millionsecond here, so set fsp to 3. + toTypeTimestamp.Decimal = 3 + tsTimestamp, err := tsVal.ConvertTo(b.ctx.GetSessionVars().StmtCtx, toTypeTimestamp) + if err != nil { + return nil, err } - // For the function call expression like `AS OF TIMESTAMP NOW()`. - if _, isFuncCall := raw.AsOf.TsExpr.(*ast.FuncCallExpr); isFuncCall { - tsVal, err := evalAstExpr(b.ctx, raw.AsOf.TsExpr) - if err != nil { - return nil, err - } - tsTime, err = tsVal.GetMysqlTime().GoTime(b.ctx.GetSessionVars().TimeZone) - if err != nil { - return nil, err - } + tsTime, err := tsTimestamp.GetMysqlTime().GoTime(b.ctx.GetSessionVars().TimeZone) + if err != nil { + return nil, err + } + p.StalenessTxnOption = &sessionctx.StalenessTxnOption{ + UseAsOf: true, + StartTS: oracle.GoTimeToTS(tsTime), } - p.StalenessTxnOption.StartTS = oracle.GoTimeToTS(tsTime) } } return p, nil diff --git a/session/session_test.go b/session/session_test.go index ad6036f51dea1..9d2d63cb02804 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -4109,7 +4109,7 @@ func (s *testSessionSerialSuite) TestValidateReadOnlyInStalenessTransaction(c *C tk.MustExec(`set @@tidb_enable_noop_functions=1;`) for _, testcase := range testcases { c.Log(testcase.name) - tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP CURRENT_TIMESTAMP(3);`) + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`) if testcase.isValidate { _, err := tk.Exec(testcase.sql) c.Assert(err, IsNil) @@ -4169,7 +4169,7 @@ func (s *testSessionSerialSuite) TestSpecialSQLInStalenessTxn(c *C) { tk.MustExec("CREATE USER 'newuser' IDENTIFIED BY 'mypassword';") for _, testcase := range testcases { comment := Commentf(testcase.name) - tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP CURRENT_TIMESTAMP(3);`) + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`) c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, true, comment) tk.MustExec(testcase.sql) c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.sameSession, comment) From 122583ce2f6d98dea81e11fa9af500663f8212e5 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 19 May 2021 21:05:43 +0800 Subject: [PATCH 3/6] Resolve the conflict Signed-off-by: JmPotato --- session/session.go | 77 ++++------------------------------------------ 1 file changed, 6 insertions(+), 71 deletions(-) diff --git a/session/session.go b/session/session.go index a9c1e62375e8e..dd9a976cb2869 100644 --- a/session/session.go +++ b/session/session.go @@ -1963,7 +1963,7 @@ func (s *session) NewTxn(ctx context.Context) error { if err := s.checkBeforeCreateNewTxn(ctx); err != nil { return err } - txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) + txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) if err != nil { return err } @@ -2012,29 +2012,29 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc txnScope = s.GetSessionVars().CheckAndGetTxnScope() ) if option.UseAsOf { - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) if err != nil { return err } } else { switch option.Mode { case ast.TimestampBoundReadTimestamp: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) if err != nil { return err } case ast.TimestampBoundExactStaleness: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) if err != nil { return err } case ast.TimestampBoundMaxStaleness: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) if err != nil { return err } case ast.TimestampBoundMinReadTimestamp: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) if err != nil { return err } @@ -2847,71 +2847,6 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { return nil } -<<<<<<< HEAD -// NewTxnWithStalenessOption create a transaction with Staleness option -func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { - if s.txn.Valid() { - txnID := s.txn.StartTS() - txnScope := s.txn.GetOption(kv.TxnScope).(string) - err := s.CommitTxn(ctx) - if err != nil { - return err - } - vars := s.GetSessionVars() - logutil.Logger(ctx).Info("InitTxnWithExactStaleness() inside a transaction auto commit", - zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()), - zap.Uint64("txnStartTS", txnID), - zap.String("txnScope", txnScope)) - } - var txn kv.Transaction - var err error - txnScope := s.GetSessionVars().CheckAndGetTxnScope() - switch option.Mode { - case ast.TimestampBoundReadTimestamp: - txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) - if err != nil { - return err - } - case ast.TimestampBoundExactStaleness: - txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) - if err != nil { - return err - } - case ast.TimestampBoundMaxStaleness: - txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) - if err != nil { - return err - } - case ast.TimestampBoundMinReadTimestamp: - txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) - if err != nil { - return err - } - default: - // For unsupported staleness txn cases, fallback to NewTxn - return s.NewTxn(ctx) - } - txn.SetVars(s.sessionVars.KVVars) - txn.SetOption(kv.IsStalenessReadOnly, true) - txn.SetOption(kv.TxnScope, txnScope) - s.txn.changeInvalidToValid(txn) - is, err := domain.GetDomain(s).GetSnapshotInfoSchema(txn.StartTS()) - if err != nil { - return errors.Trace(err) - } - s.sessionVars.TxnCtx = &variable.TransactionContext{ - InfoSchema: is, - CreateTime: time.Now(), - StartTS: txn.StartTS(), - ShardStep: int(s.sessionVars.ShardAllocateStep), - IsStaleness: true, - TxnScope: txnScope, - } - return nil -} - -======= ->>>>>>> Support AS OF TIMESTAMP read-only begin statement // GetStore gets the store of session. func (s *session) GetStore() kv.Storage { return s.store From c49455591b38e5e1d42d99a396f1b9a762243621 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 19 May 2021 21:16:46 +0800 Subject: [PATCH 4/6] Use calculateTsExpr to calculate the startTS for AS OF Signed-off-by: JmPotato --- planner/core/planbuilder.go | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 691c6b5e5597f..603224c070aa6 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -2327,31 +2327,39 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, b.visitInfo = appendVisitInfo(b.visitInfo, mysql.ShutdownPriv, "", "", "", nil) case *ast.BeginStmt: if raw.AsOf != nil { - // Calculate the TsExpr to get a timestamp. - tsVal, err := evalAstExpr(b.ctx, raw.AsOf.TsExpr) - if err != nil { - return nil, err - } - toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp) - // We need at least the millionsecond here, so set fsp to 3. - toTypeTimestamp.Decimal = 3 - tsTimestamp, err := tsVal.ConvertTo(b.ctx.GetSessionVars().StmtCtx, toTypeTimestamp) - if err != nil { - return nil, err - } - tsTime, err := tsTimestamp.GetMysqlTime().GoTime(b.ctx.GetSessionVars().TimeZone) + startTS, err := b.calculateTsExpr(raw.AsOf) if err != nil { return nil, err } p.StalenessTxnOption = &sessionctx.StalenessTxnOption{ UseAsOf: true, - StartTS: oracle.GoTimeToTS(tsTime), + StartTS: startTS, } } } return p, nil } +// calculateTsExpr calculates the TsExpr of AsOfClause to get a StartTS. +func (b *PlanBuilder) calculateTsExpr(asOfClause *ast.AsOfClause) (uint64, error) { + tsVal, err := evalAstExpr(b.ctx, asOfClause.TsExpr) + if err != nil { + return 0, err + } + toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp) + // We need at least the millionsecond here, so set fsp to 3. + toTypeTimestamp.Decimal = 3 + tsTimestamp, err := tsVal.ConvertTo(b.ctx.GetSessionVars().StmtCtx, toTypeTimestamp) + if err != nil { + return 0, err + } + tsTime, err := tsTimestamp.GetMysqlTime().GoTime(b.ctx.GetSessionVars().TimeZone) + if err != nil { + return 0, err + } + return oracle.GoTimeToTS(tsTime), nil +} + func collectVisitInfoFromRevokeStmt(sctx sessionctx.Context, vi []visitInfo, stmt *ast.RevokeStmt) []visitInfo { // To use REVOKE, you must have the GRANT OPTION privilege, // and you must have the privileges that you are granting. From af8b9d38ebc82b8c6c921fe63bef75b4c99105b1 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 20 May 2021 16:29:40 +0800 Subject: [PATCH 5/6] Address the comments Signed-off-by: JmPotato --- executor/simple.go | 2 +- planner/core/planbuilder.go | 2 +- session/session.go | 119 +++++++++++++++++------------------- sessionctx/context.go | 8 +-- util/mock/context.go | 10 +-- 5 files changed, 66 insertions(+), 75 deletions(-) diff --git a/executor/simple.go b/executor/simple.go index 9f71e4008eac7..193949eda3fdd 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -622,7 +622,7 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { func (e *SimpleExec) executeStartTransactionReadOnlyWithBoundedStaleness(ctx context.Context, s *ast.BeginStmt) error { if e.StalenessTxnOption == nil { - return errors.New("Planner failed to create the txn option for AS OF TIMESTAMP statement") + return errors.New("Failed to get timestamp during start transaction read only as of timestamp") } if err := e.ctx.NewTxnWithStalenessOption(ctx, *e.StalenessTxnOption); err != nil { return err diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 12ce33967ba72..2ae66a61602a5 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -2333,7 +2333,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, return nil, err } p.StalenessTxnOption = &sessionctx.StalenessTxnOption{ - UseAsOf: true, + Mode: ast.TimestampBoundReadTimestamp, StartTS: startTS, } } diff --git a/session/session.go b/session/session.go index 6389f967770eb..2874330a02fd3 100644 --- a/session/session.go +++ b/session/session.go @@ -1960,7 +1960,7 @@ func (s *session) isTxnRetryable() bool { } func (s *session) NewTxn(ctx context.Context) error { - if err := s.checkBeforeCreateNewTxn(ctx); err != nil { + if err := s.checkBeforeNewTxn(ctx); err != nil { return err } txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) @@ -1984,7 +1984,7 @@ func (s *session) NewTxn(ctx context.Context) error { return nil } -func (s *session) checkBeforeCreateNewTxn(ctx context.Context) error { +func (s *session) checkBeforeNewTxn(ctx context.Context) error { if s.txn.Valid() { txnStartTS := s.txn.StartTS() txnScope := s.GetSessionVars().TxnCtx.TxnScope @@ -2001,67 +2001,6 @@ func (s *session) checkBeforeCreateNewTxn(ctx context.Context) error { return nil } -// NewTxnWithStalenessOption create a transaction with Staleness option -func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { - err := s.checkBeforeCreateNewTxn(ctx) - if err != nil { - return err - } - var ( - txn kv.Transaction - txnScope = s.GetSessionVars().CheckAndGetTxnScope() - ) - if option.UseAsOf { - txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) - if err != nil { - return err - } - } else { - switch option.Mode { - case ast.TimestampBoundReadTimestamp: - txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) - if err != nil { - return err - } - case ast.TimestampBoundExactStaleness: - txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) - if err != nil { - return err - } - case ast.TimestampBoundMaxStaleness: - txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) - if err != nil { - return err - } - case ast.TimestampBoundMinReadTimestamp: - txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) - if err != nil { - return err - } - default: - // For unsupported staleness txn cases, fallback to NewTxn - return s.NewTxn(ctx) - } - } - txn.SetVars(s.sessionVars.KVVars) - txn.SetOption(kv.IsStalenessReadOnly, true) - txn.SetOption(kv.TxnScope, txnScope) - s.txn.changeInvalidToValid(txn) - is, err := domain.GetDomain(s).GetSnapshotInfoSchema(txn.StartTS()) - if err != nil { - return errors.Trace(err) - } - s.sessionVars.TxnCtx = &variable.TransactionContext{ - InfoSchema: is, - CreateTime: time.Now(), - StartTS: txn.StartTS(), - ShardStep: int(s.sessionVars.ShardAllocateStep), - IsStaleness: true, - TxnScope: txnScope, - } - return nil -} - func (s *session) SetValue(key fmt.Stringer, value interface{}) { s.mu.Lock() s.mu.values[key] = value @@ -2847,6 +2786,60 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { return nil } +// NewTxnWithStalenessOption create a transaction with Staleness option +func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { + err := s.checkBeforeNewTxn(ctx) + if err != nil { + return err + } + var ( + txn kv.Transaction + txnScope = s.GetSessionVars().CheckAndGetTxnScope() + ) + switch option.Mode { + case ast.TimestampBoundReadTimestamp: + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) + if err != nil { + return err + } + case ast.TimestampBoundExactStaleness: + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) + if err != nil { + return err + } + case ast.TimestampBoundMaxStaleness: + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) + if err != nil { + return err + } + case ast.TimestampBoundMinReadTimestamp: + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) + if err != nil { + return err + } + default: + // For unsupported staleness txn cases, fallback to NewTxn + return s.NewTxn(ctx) + } + txn.SetVars(s.sessionVars.KVVars) + txn.SetOption(kv.IsStalenessReadOnly, true) + txn.SetOption(kv.TxnScope, txnScope) + s.txn.changeInvalidToValid(txn) + is, err := domain.GetDomain(s).GetSnapshotInfoSchema(txn.StartTS()) + if err != nil { + return errors.Trace(err) + } + s.sessionVars.TxnCtx = &variable.TransactionContext{ + InfoSchema: is, + CreateTime: time.Now(), + StartTS: txn.StartTS(), + ShardStep: int(s.sessionVars.ShardAllocateStep), + IsStaleness: true, + TxnScope: txnScope, + } + return nil +} + // GetStore gets the store of session. func (s *session) GetStore() kv.Storage { return s.store diff --git a/sessionctx/context.go b/sessionctx/context.go index 7752f9e7af5de..59a917f86a9bc 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -34,8 +34,6 @@ type Context interface { // If old transaction is valid, it is committed first. // It's used in BEGIN statement and DDL statements to commit old transaction. NewTxn(context.Context) error - // NewTxnWithStalenessOption initializes a transaction with StalenessTxnOption. - NewTxnWithStalenessOption(ctx context.Context, option StalenessTxnOption) error // Txn returns the current transaction which is created before executing a statement. // The returned kv.Transaction is not nil, but it maybe pending or invalid. @@ -75,6 +73,9 @@ type Context interface { // It should be called right before we builds an executor. InitTxnWithStartTS(startTS uint64) error + // NewTxnWithStalenessOption initializes a transaction with StalenessTxnOption. + NewTxnWithStalenessOption(ctx context.Context, option StalenessTxnOption) error + // GetStore returns the store of session. GetStore() kv.Storage @@ -146,7 +147,4 @@ type StalenessTxnOption struct { Mode ast.TimestampBoundMode PrevSec uint64 StartTS uint64 - // UseAsOf is used to be compatible with the current implementation, - // it will be removed later. - UseAsOf bool } diff --git a/util/mock/context.go b/util/mock/context.go index 05ccc0fbf8d8a..d23124e555ea2 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -188,11 +188,6 @@ func (c *Context) NewTxn(context.Context) error { return nil } -// NewTxnWithStalenessOption implements the sessionctx.Context interface. -func (c *Context) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { - return c.NewTxn(ctx) -} - // RefreshTxnCtx implements the sessionctx.Context interface. func (c *Context) RefreshTxnCtx(ctx context.Context) error { return errors.Trace(c.NewTxn(ctx)) @@ -218,6 +213,11 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error { return nil } +// NewTxnWithStalenessOption implements the sessionctx.Context interface. +func (c *Context) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { + return c.NewTxn(ctx) +} + // GetStore gets the store of session. func (c *Context) GetStore() kv.Storage { return c.Store From 18e8d8b106cc4ccd71dddb79b56fffe3b650d966 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 20 May 2021 17:44:33 +0800 Subject: [PATCH 6/6] Add a comment for the test timestamp Signed-off-by: JmPotato --- executor/stale_txn_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index c10513d49ee28..dc5ddd0785208 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -191,6 +191,7 @@ func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) { // set @@tidb_snapshot before staleness txn tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`) tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`) + // 1599321600000 == 2020-09-06 00:00:00 c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000)) tk.MustExec("commit") // set @@tidb_snapshot during staleness txn