Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
is := dom.InfoSchema()
txnCtx := e.ctx.GetSessionVars().TxnCtx
txnCtx.InfoSchema = is
txnCtx.SchemaVersion = is.SchemaMetaVersion()
// DDL will force commit old transaction, after DDL, in transaction status should be false.
e.ctx.GetSessionVars().SetInTxn(false)
return nil
Expand Down
6 changes: 3 additions & 3 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,13 +686,13 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
}
failpoint.Inject("mockStalenessTxnSchemaVer", func(val failpoint.Value) {
if val.(bool) {
staleVer = e.ctx.GetSessionVars().TxnCtx.SchemaVersion - 1
staleVer = e.ctx.GetSessionVars().GetInfoSchema().SchemaMetaVersion() - 1
} else {
staleVer = e.ctx.GetSessionVars().TxnCtx.SchemaVersion
staleVer = e.ctx.GetSessionVars().GetInfoSchema().SchemaMetaVersion()
}
})
// TODO: currently we directly check the schema version. In future, we can cache the stale infoschema instead.
if e.ctx.GetSessionVars().TxnCtx.SchemaVersion > staleVer {
if e.ctx.GetSessionVars().GetInfoSchema().SchemaMetaVersion() > staleVer {
return errors.New("schema version changed after the staleness startTS")
}

Expand Down
47 changes: 22 additions & 25 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (s *session) doCommit(ctx context.Context) error {
physicalTableIDs = append(physicalTableIDs, id)
}
// Set this option for 2 phase commit to validate schema lease.
s.txn.SetOption(tikvstore.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.TxnCtx.SchemaVersion, physicalTableIDs))
s.txn.SetOption(tikvstore.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.GetInfoSchema().SchemaMetaVersion(), physicalTableIDs))
s.txn.SetOption(tikvstore.InfoSchema, s.sessionVars.TxnCtx.InfoSchema)
s.txn.SetOption(tikvstore.CommitHook, func(info string, _ error) { s.sessionVars.LastTxnInfo = info })
if s.GetSessionVars().EnableAmendPessimisticTxn {
Expand Down Expand Up @@ -1485,7 +1485,7 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
if err != nil {
if !kv.ErrKeyExists.Equal(err) {
logutil.Logger(ctx).Warn("run statement failed",
zap.Int64("schemaVersion", s.sessionVars.TxnCtx.SchemaVersion),
zap.Int64("schemaVersion", s.sessionVars.GetInfoSchema().SchemaMetaVersion()),
zap.Error(err),
zap.String("session", s.String()))
}
Expand Down Expand Up @@ -1926,7 +1926,7 @@ func (s *session) NewTxn(ctx context.Context) error {
}
vars := s.GetSessionVars()
logutil.Logger(ctx).Info("NewTxn() inside a transaction auto commit",
zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion),
zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()),
zap.Uint64("txnStartTS", txnID),
zap.String("txnScope", txnScope))
}
Expand All @@ -1942,13 +1942,12 @@ func (s *session) NewTxn(ctx context.Context) error {
s.txn.changeInvalidToValid(txn)
is := domain.GetDomain(s).InfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
InfoSchema: is,
SchemaVersion: is.SchemaMetaVersion(),
CreateTime: time.Now(),
StartTS: txn.StartTS(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
IsStaleness: false,
TxnScope: s.sessionVars.CheckAndGetTxnScope(),
InfoSchema: is,
CreateTime: time.Now(),
StartTS: txn.StartTS(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
IsStaleness: false,
TxnScope: s.sessionVars.CheckAndGetTxnScope(),
}
return nil
}
Expand Down Expand Up @@ -2678,11 +2677,10 @@ func (s *session) PrepareTxnCtx(ctx context.Context) {

is := domain.GetDomain(s).InfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
InfoSchema: is,
SchemaVersion: is.SchemaMetaVersion(),
CreateTime: time.Now(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
TxnScope: s.GetSessionVars().CheckAndGetTxnScope(),
InfoSchema: is,
CreateTime: time.Now(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
TxnScope: s.GetSessionVars().CheckAndGetTxnScope(),
}
if !s.sessionVars.IsAutocommit() || s.sessionVars.RetryInfo.Retrying {
if s.sessionVars.TxnMode == ast.Pessimistic {
Expand Down Expand Up @@ -2754,7 +2752,7 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc
}
vars := s.GetSessionVars()
logutil.Logger(ctx).Info("InitTxnWithExactStaleness() inside a transaction auto commit",
zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion),
zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()),
zap.Uint64("txnStartTS", txnID),
zap.String("txnScope", txnScope))
}
Expand Down Expand Up @@ -2792,13 +2790,12 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc
s.txn.changeInvalidToValid(txn)
is := domain.GetDomain(s).InfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
InfoSchema: is,
SchemaVersion: is.SchemaMetaVersion(),
CreateTime: time.Now(),
StartTS: txn.StartTS(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
IsStaleness: true,
TxnScope: txnScope,
InfoSchema: is,
CreateTime: time.Now(),
StartTS: txn.StartTS(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
IsStaleness: true,
TxnScope: txnScope,
}
return nil
}
Expand All @@ -2825,7 +2822,7 @@ func logStmt(execStmt *executor.ExecStmt, vars *variable.SessionVars) {
*ast.RevokeStmt, *ast.AlterTableStmt, *ast.CreateDatabaseStmt, *ast.CreateIndexStmt, *ast.CreateTableStmt,
*ast.DropDatabaseStmt, *ast.DropIndexStmt, *ast.DropTableStmt, *ast.RenameTableStmt, *ast.TruncateTableStmt:
user := vars.User
schemaVersion := vars.TxnCtx.SchemaVersion
schemaVersion := vars.GetInfoSchema().SchemaMetaVersion()
if ss, ok := execStmt.StmtNode.(ast.SensitiveStmtNode); ok {
logutil.BgLogger().Info("CRUCIAL OPERATION",
zap.Uint64("conn", vars.ConnectionID),
Expand Down Expand Up @@ -2854,7 +2851,7 @@ func logQuery(query string, vars *variable.SessionVars) {
logutil.BgLogger().Info("GENERAL_LOG",
zap.Uint64("conn", vars.ConnectionID),
zap.Stringer("user", vars.User),
zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion),
zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()),
zap.Uint64("txnStartTS", vars.TxnCtx.StartTS),
zap.Uint64("forUpdateTS", vars.TxnCtx.GetForUpdateTS()),
zap.Bool("isReadConsistency", vars.IsIsolation(ast.ReadCommitted)),
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func GetPrewriteValue(ctx sessionctx.Context, createIfNotExists bool) *binlog.Pr
vars := ctx.GetSessionVars()
v, ok := vars.TxnCtx.Binlog.(*binlog.PrewriteValue)
if !ok && createIfNotExists {
schemaVer := ctx.GetSessionVars().TxnCtx.SchemaVersion
schemaVer := ctx.GetSessionVars().GetInfoSchema().SchemaMetaVersion()
v = &binlog.PrewriteValue{SchemaVersion: schemaVer}
vars.TxnCtx.Binlog = v
}
Expand Down
34 changes: 20 additions & 14 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,12 @@ func (r *retryInfoAutoIDs) getCurrent() (int64, bool) {

// TransactionContext is used to store variables that has transaction scope.
type TransactionContext struct {
forUpdateTS uint64
stmtFuture oracle.Future
Binlog interface{}
InfoSchema interface{}
History interface{}
SchemaVersion int64
StartTS uint64
forUpdateTS uint64
stmtFuture oracle.Future
Binlog interface{}
InfoSchema interface{}
History interface{}
StartTS uint64

// ShardStep indicates the max size of continuous rowid shard in one transaction.
ShardStep int
Expand Down Expand Up @@ -870,19 +869,26 @@ func (s *SessionVars) BuildParserConfig() parser.ParserConfig {
}
}

// FIXME: remove this interface
// infoschemaMetaVersion is a workaround. Due to circular dependency,
// can not return the complete interface. But SchemaMetaVersion is widely used for logging.
// So we give a convenience for that
type infoschemaMetaVersion interface {
SchemaMetaVersion() int64
}

// GetInfoSchema returns snapshotInfoSchema if snapshot schema is set.
// Otherwise, transaction infoschema is returned.
// Nil if there is no available infoschema.
func (s *SessionVars) GetInfoSchema() interface{} {
type IS interface {
SchemaMetaVersion() int64
}
if snap, ok := s.SnapshotInfoschema.(IS); ok {
func (s *SessionVars) GetInfoSchema() infoschemaMetaVersion {
if snap, ok := s.SnapshotInfoschema.(infoschemaMetaVersion); ok {
logutil.BgLogger().Info("use snapshot schema", zap.Uint64("conn", s.ConnectionID), zap.Int64("schemaVersion", snap.SchemaMetaVersion()))
return snap
}
if s.TxnCtx != nil && s.TxnCtx.InfoSchema != nil {
return s.TxnCtx.InfoSchema
if s.TxnCtx != nil {
if is, ok := s.TxnCtx.InfoSchema.(infoschemaMetaVersion); ok {
return is
}
}
return nil
}
Expand Down