diff --git a/executor/ddl.go b/executor/ddl.go index 64a597e2eb024..81f7221d1e60e 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -140,7 +140,6 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { is := dom.InfoSchema() txnCtx := e.ctx.GetSessionVars().TxnCtx txnCtx.InfoSchema = is - txnCtx.SchemaVersion = is.SchemaMetaVersion() // DDL will force commit old transaction, after DDL, in transaction status should be false. e.ctx.GetSessionVars().SetInTxn(false) return nil diff --git a/executor/simple.go b/executor/simple.go index 65df5ca43117f..24cb857aec3d5 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -686,13 +686,13 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte } failpoint.Inject("mockStalenessTxnSchemaVer", func(val failpoint.Value) { if val.(bool) { - staleVer = e.ctx.GetSessionVars().TxnCtx.SchemaVersion - 1 + staleVer = e.ctx.GetSessionVars().GetInfoSchema().SchemaMetaVersion() - 1 } else { - staleVer = e.ctx.GetSessionVars().TxnCtx.SchemaVersion + staleVer = e.ctx.GetSessionVars().GetInfoSchema().SchemaMetaVersion() } }) // TODO: currently we directly check the schema version. In future, we can cache the stale infoschema instead. - if e.ctx.GetSessionVars().TxnCtx.SchemaVersion > staleVer { + if e.ctx.GetSessionVars().GetInfoSchema().SchemaMetaVersion() > staleVer { return errors.New("schema version changed after the staleness startTS") } diff --git a/session/session.go b/session/session.go index 94582fbcb6886..7c499d83afda1 100644 --- a/session/session.go +++ b/session/session.go @@ -491,7 +491,7 @@ func (s *session) doCommit(ctx context.Context) error { physicalTableIDs = append(physicalTableIDs, id) } // Set this option for 2 phase commit to validate schema lease. - s.txn.SetOption(tikvstore.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.TxnCtx.SchemaVersion, physicalTableIDs)) + s.txn.SetOption(tikvstore.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.GetInfoSchema().SchemaMetaVersion(), physicalTableIDs)) s.txn.SetOption(tikvstore.InfoSchema, s.sessionVars.TxnCtx.InfoSchema) s.txn.SetOption(tikvstore.CommitHook, func(info string, _ error) { s.sessionVars.LastTxnInfo = info }) if s.GetSessionVars().EnableAmendPessimisticTxn { @@ -1485,7 +1485,7 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex if err != nil { if !kv.ErrKeyExists.Equal(err) { logutil.Logger(ctx).Warn("run statement failed", - zap.Int64("schemaVersion", s.sessionVars.TxnCtx.SchemaVersion), + zap.Int64("schemaVersion", s.sessionVars.GetInfoSchema().SchemaMetaVersion()), zap.Error(err), zap.String("session", s.String())) } @@ -1926,7 +1926,7 @@ func (s *session) NewTxn(ctx context.Context) error { } vars := s.GetSessionVars() logutil.Logger(ctx).Info("NewTxn() inside a transaction auto commit", - zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion), + zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()), zap.Uint64("txnStartTS", txnID), zap.String("txnScope", txnScope)) } @@ -1942,13 +1942,12 @@ func (s *session) NewTxn(ctx context.Context) error { s.txn.changeInvalidToValid(txn) is := domain.GetDomain(s).InfoSchema() s.sessionVars.TxnCtx = &variable.TransactionContext{ - InfoSchema: is, - SchemaVersion: is.SchemaMetaVersion(), - CreateTime: time.Now(), - StartTS: txn.StartTS(), - ShardStep: int(s.sessionVars.ShardAllocateStep), - IsStaleness: false, - TxnScope: s.sessionVars.CheckAndGetTxnScope(), + InfoSchema: is, + CreateTime: time.Now(), + StartTS: txn.StartTS(), + ShardStep: int(s.sessionVars.ShardAllocateStep), + IsStaleness: false, + TxnScope: s.sessionVars.CheckAndGetTxnScope(), } return nil } @@ -2678,11 +2677,10 @@ func (s *session) PrepareTxnCtx(ctx context.Context) { is := domain.GetDomain(s).InfoSchema() s.sessionVars.TxnCtx = &variable.TransactionContext{ - InfoSchema: is, - SchemaVersion: is.SchemaMetaVersion(), - CreateTime: time.Now(), - ShardStep: int(s.sessionVars.ShardAllocateStep), - TxnScope: s.GetSessionVars().CheckAndGetTxnScope(), + InfoSchema: is, + CreateTime: time.Now(), + ShardStep: int(s.sessionVars.ShardAllocateStep), + TxnScope: s.GetSessionVars().CheckAndGetTxnScope(), } if !s.sessionVars.IsAutocommit() || s.sessionVars.RetryInfo.Retrying { if s.sessionVars.TxnMode == ast.Pessimistic { @@ -2754,7 +2752,7 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc } vars := s.GetSessionVars() logutil.Logger(ctx).Info("InitTxnWithExactStaleness() inside a transaction auto commit", - zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion), + zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()), zap.Uint64("txnStartTS", txnID), zap.String("txnScope", txnScope)) } @@ -2792,13 +2790,12 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc s.txn.changeInvalidToValid(txn) is := domain.GetDomain(s).InfoSchema() s.sessionVars.TxnCtx = &variable.TransactionContext{ - InfoSchema: is, - SchemaVersion: is.SchemaMetaVersion(), - CreateTime: time.Now(), - StartTS: txn.StartTS(), - ShardStep: int(s.sessionVars.ShardAllocateStep), - IsStaleness: true, - TxnScope: txnScope, + InfoSchema: is, + CreateTime: time.Now(), + StartTS: txn.StartTS(), + ShardStep: int(s.sessionVars.ShardAllocateStep), + IsStaleness: true, + TxnScope: txnScope, } return nil } @@ -2825,7 +2822,7 @@ func logStmt(execStmt *executor.ExecStmt, vars *variable.SessionVars) { *ast.RevokeStmt, *ast.AlterTableStmt, *ast.CreateDatabaseStmt, *ast.CreateIndexStmt, *ast.CreateTableStmt, *ast.DropDatabaseStmt, *ast.DropIndexStmt, *ast.DropTableStmt, *ast.RenameTableStmt, *ast.TruncateTableStmt: user := vars.User - schemaVersion := vars.TxnCtx.SchemaVersion + schemaVersion := vars.GetInfoSchema().SchemaMetaVersion() if ss, ok := execStmt.StmtNode.(ast.SensitiveStmtNode); ok { logutil.BgLogger().Info("CRUCIAL OPERATION", zap.Uint64("conn", vars.ConnectionID), @@ -2854,7 +2851,7 @@ func logQuery(query string, vars *variable.SessionVars) { logutil.BgLogger().Info("GENERAL_LOG", zap.Uint64("conn", vars.ConnectionID), zap.Stringer("user", vars.User), - zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion), + zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()), zap.Uint64("txnStartTS", vars.TxnCtx.StartTS), zap.Uint64("forUpdateTS", vars.TxnCtx.GetForUpdateTS()), zap.Bool("isReadConsistency", vars.IsIsolation(ast.ReadCommitted)), diff --git a/sessionctx/binloginfo/binloginfo.go b/sessionctx/binloginfo/binloginfo.go index 044e6cdc11df9..58313505e1c8e 100644 --- a/sessionctx/binloginfo/binloginfo.go +++ b/sessionctx/binloginfo/binloginfo.go @@ -101,7 +101,7 @@ func GetPrewriteValue(ctx sessionctx.Context, createIfNotExists bool) *binlog.Pr vars := ctx.GetSessionVars() v, ok := vars.TxnCtx.Binlog.(*binlog.PrewriteValue) if !ok && createIfNotExists { - schemaVer := ctx.GetSessionVars().TxnCtx.SchemaVersion + schemaVer := ctx.GetSessionVars().GetInfoSchema().SchemaMetaVersion() v = &binlog.PrewriteValue{SchemaVersion: schemaVer} vars.TxnCtx.Binlog = v } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 89b31da2e8bbd..7db9de383ba55 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -129,13 +129,12 @@ func (r *retryInfoAutoIDs) getCurrent() (int64, bool) { // TransactionContext is used to store variables that has transaction scope. type TransactionContext struct { - forUpdateTS uint64 - stmtFuture oracle.Future - Binlog interface{} - InfoSchema interface{} - History interface{} - SchemaVersion int64 - StartTS uint64 + forUpdateTS uint64 + stmtFuture oracle.Future + Binlog interface{} + InfoSchema interface{} + History interface{} + StartTS uint64 // ShardStep indicates the max size of continuous rowid shard in one transaction. ShardStep int @@ -870,19 +869,26 @@ func (s *SessionVars) BuildParserConfig() parser.ParserConfig { } } +// FIXME: remove this interface +// infoschemaMetaVersion is a workaround. Due to circular dependency, +// can not return the complete interface. But SchemaMetaVersion is widely used for logging. +// So we give a convenience for that +type infoschemaMetaVersion interface { + SchemaMetaVersion() int64 +} + // GetInfoSchema returns snapshotInfoSchema if snapshot schema is set. // Otherwise, transaction infoschema is returned. // Nil if there is no available infoschema. -func (s *SessionVars) GetInfoSchema() interface{} { - type IS interface { - SchemaMetaVersion() int64 - } - if snap, ok := s.SnapshotInfoschema.(IS); ok { +func (s *SessionVars) GetInfoSchema() infoschemaMetaVersion { + if snap, ok := s.SnapshotInfoschema.(infoschemaMetaVersion); ok { logutil.BgLogger().Info("use snapshot schema", zap.Uint64("conn", s.ConnectionID), zap.Int64("schemaVersion", snap.SchemaMetaVersion())) return snap } - if s.TxnCtx != nil && s.TxnCtx.InfoSchema != nil { - return s.TxnCtx.InfoSchema + if s.TxnCtx != nil { + if is, ok := s.TxnCtx.InfoSchema.(infoschemaMetaVersion); ok { + return is + } } return nil }