diff --git a/executor/insert.go b/executor/insert.go index 399041b83c053..c79f4bc1071a0 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -51,15 +51,12 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { defer sessVars.CleanBuffers() ignoreErr := sessVars.StmtCtx.DupKeyAsWarning - if !sessVars.LightningMode { - txn, err := e.ctx.Txn(true) - if err != nil { - return err - } - sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(txn, kv.TempTxnMemBufCap) + txn, err := e.ctx.Txn(true) + if err != nil { + return err } - - e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(rows))) + sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(txn, kv.TempTxnMemBufCap) + sessVars.StmtCtx.AddRecordRows(uint64(len(rows))) // If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored. // For example, without IGNORE, a row that duplicates an existing UNIQUE index or PRIMARY KEY value in // the table causes a duplicate-key error and the statement is aborted. With IGNORE, the row is discarded and no error occurs. diff --git a/executor/insert_common.go b/executor/insert_common.go index fa9e9718cd22c..39da4fe3458c5 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -396,13 +396,11 @@ func (e *InsertValues) doBatchInsert(ctx context.Context) error { // We should return a special error for batch insert. return ErrBatchInsertFail.GenWithStack("BatchInsert failed with error: %v", err) } - if !sessVars.LightningMode { - txn, err := e.ctx.Txn(true) - if err != nil { - return err - } - sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(txn, kv.TempTxnMemBufCap) + txn, err := e.ctx.Txn(true) + if err != nil { + return err } + sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(txn, kv.TempTxnMemBufCap) return nil } diff --git a/executor/prepared.go b/executor/prepared.go index 76762c5ccd992..e44d14082aaab 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -173,7 +173,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error { Params: sorter.markers, SchemaVersion: e.is.SchemaMetaVersion(), } - prepared.UseCache = plannercore.PreparedPlanCacheEnabled() && (vars.LightningMode || plannercore.Cacheable(stmt)) + prepared.UseCache = plannercore.PreparedPlanCacheEnabled() && plannercore.Cacheable(stmt) // We try to build the real statement of preparedStmt. for i := range prepared.Params { diff --git a/kv/buffer_store.go b/kv/buffer_store.go index 3a7054193a9d7..04ddf6b7a2e65 100644 --- a/kv/buffer_store.go +++ b/kv/buffer_store.go @@ -20,8 +20,6 @@ import ( var ( // DefaultTxnMembufCap is the default transaction membuf capability. DefaultTxnMembufCap = 4 * 1024 - // ImportingTxnMembufCap is the capability of tidb importing data situation. - ImportingTxnMembufCap = 32 * 1024 // TempTxnMemBufCap is the capability of temporary membuf. TempTxnMemBufCap = 64 ) diff --git a/session/session.go b/session/session.go index e4e513025c458..1cd93c76eb4d5 100644 --- a/session/session.go +++ b/session/session.go @@ -246,10 +246,6 @@ func (s *session) DDLOwnerChecker() owner.DDLOwnerChecker { } func (s *session) getMembufCap() int { - if s.sessionVars.LightningMode { - return kv.ImportingTxnMembufCap - } - return kv.DefaultTxnMembufCap } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 143bd25289625..ecea3f5af6bde 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -306,9 +306,6 @@ type SessionVars struct { /* TiDB system variables */ - // LightningMode is true when the lightning use the kvencoder to transfer sql to raw kv. - LightningMode bool - // SkipUTF8Check check on input value. SkipUTF8Check bool @@ -593,9 +590,7 @@ func (s *SessionVars) GetSplitRegionTimeout() time.Duration { // CleanBuffers cleans the temporary bufs func (s *SessionVars) CleanBuffers() { - if !s.LightningMode { - s.GetWriteStmtBufs().clean() - } + s.GetWriteStmtBufs().clean() } // AllocPlanColumnID allocates column id for plan. diff --git a/table/tables/index.go b/table/tables/index.go index f0bc5644fddde..7b305bf602b24 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -198,9 +198,10 @@ func (c *index) Create(sctx sessionctx.Context, rm kv.RetrieverMutator, indexedV fn(&opt) } ss := opt.AssertionProto - writeBufs := sctx.GetSessionVars().GetWriteStmtBufs() - skipCheck := sctx.GetSessionVars().LightningMode || sctx.GetSessionVars().StmtCtx.BatchCheck - key, distinct, err := c.GenIndexKey(sctx.GetSessionVars().StmtCtx, indexedValues, h, writeBufs.IndexKeyBuf) + vars := sctx.GetSessionVars() + writeBufs := vars.GetWriteStmtBufs() + skipCheck := vars.StmtCtx.BatchCheck + key, distinct, err := c.GenIndexKey(vars.StmtCtx, indexedValues, h, writeBufs.IndexKeyBuf) if err != nil { return 0, err } diff --git a/table/tables/tables.go b/table/tables/tables.go index 4d2e705b319f6..e0e9315cd4a85 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -413,10 +413,6 @@ func adjustRowValuesBuf(writeBufs *variable.WriteStmtBufs, rowLen int) { // getRollbackableMemStore get a rollbackable BufferStore, when we are importing data, // Just add the kv to transaction's membuf directly. func (t *tableCommon) getRollbackableMemStore(ctx sessionctx.Context) (kv.RetrieverMutator, error) { - if ctx.GetSessionVars().LightningMode { - return ctx.Txn(true) - } - bs := ctx.GetSessionVars().GetWriteStmtBufs().BufStore if bs == nil { txn, err := ctx.Txn(true) @@ -528,12 +524,10 @@ func (t *tableCommon) AddRecord(ctx sessionctx.Context, r []types.Datum, opts .. } txn.SetAssertion(key, kv.None) - if !sessVars.LightningMode { - if err = rm.(*kv.BufferStore).SaveTo(txn); err != nil { - return 0, err - } - ctx.StmtAddDirtyTableOP(table.DirtyTableAddRow, t.physicalTableID, recordID) + if err = rm.(*kv.BufferStore).SaveTo(txn); err != nil { + return 0, err } + ctx.StmtAddDirtyTableOP(table.DirtyTableAddRow, t.physicalTableID, recordID) if shouldWriteBinlog(ctx) { // For insert, TiDB and Binlog can use same row and schema. @@ -594,7 +588,7 @@ func (t *tableCommon) addIndices(sctx sessionctx.Context, recordID int64, r []ty } else { ctx = context.Background() } - skipCheck := sctx.GetSessionVars().LightningMode || sctx.GetSessionVars().StmtCtx.BatchCheck + skipCheck := sctx.GetSessionVars().StmtCtx.BatchCheck if t.meta.PKIsHandle && !skipCheck && !opt.SkipHandleCheck { if err := CheckHandleExists(ctx, sctx, t, recordID, nil); err != nil { return recordID, err diff --git a/util/kvencoder/allocator.go b/util/kvencoder/allocator.go deleted file mode 100644 index 36a80382819f2..0000000000000 --- a/util/kvencoder/allocator.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package kvenc - -import ( - "sync/atomic" - - "github.com/pingcap/tidb/meta/autoid" -) - -var _ autoid.Allocator = &Allocator{} - -var ( - step = int64(5000) -) - -// NewAllocator creates an Allocator. -func NewAllocator() *Allocator { - return &Allocator{} -} - -// Allocator is an id allocator, it is only used in lightning. -type Allocator struct { - base int64 -} - -// Alloc allocs a next autoID for table with tableID. -func (alloc *Allocator) Alloc(tableID int64) (int64, error) { - return atomic.AddInt64(&alloc.base, 1), nil -} - -// Reset allow newBase smaller than alloc.base, and will set the alloc.base to newBase. -func (alloc *Allocator) Reset(newBase int64) { - atomic.StoreInt64(&alloc.base, newBase) -} - -// Rebase not allow newBase smaller than alloc.base, and will skip the smaller newBase. -func (alloc *Allocator) Rebase(tableID, newBase int64, allocIDs bool) error { - // CAS - for { - oldBase := atomic.LoadInt64(&alloc.base) - if newBase <= oldBase { - break - } - if atomic.CompareAndSwapInt64(&alloc.base, oldBase, newBase) { - break - } - } - - return nil -} - -// Base returns the current base of Allocator. -func (alloc *Allocator) Base() int64 { - return atomic.LoadInt64(&alloc.base) -} - -// End is only used for test. -func (alloc *Allocator) End() int64 { - return alloc.Base() + step -} - -// NextGlobalAutoID returns the next global autoID. -func (alloc *Allocator) NextGlobalAutoID(tableID int64) (int64, error) { - return alloc.End() + 1, nil -} diff --git a/util/kvencoder/kv_encoder.go b/util/kvencoder/kv_encoder.go deleted file mode 100644 index e46c6059e37dc..0000000000000 --- a/util/kvencoder/kv_encoder.go +++ /dev/null @@ -1,271 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package kvenc - -import ( - "bytes" - "context" - "fmt" - "strings" - "sync" - "sync/atomic" - - "github.com/pingcap/errors" - "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" - "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/session" - "github.com/pingcap/tidb/store/mockstore" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" -) - -var _ KvEncoder = &kvEncoder{} -var mockConnID uint64 - -// KvPair is a key-value pair. -type KvPair struct { - // Key is the key of the pair. - Key []byte - // Val is the value of the pair. if the op is delete, the len(Val) == 0 - Val []byte -} - -// KvEncoder is an encoder that transfer sql to key-value pairs. -type KvEncoder interface { - // Encode transfers sql to kv pairs. - // Before use Encode() method, please make sure you already created schame by calling ExecDDLSQL() method. - // NOTE: now we just support transfers insert statement to kv pairs. - // (if we wanna support other statement, we need to add a kv.Storage parameter, - // and pass tikv store in.) - // return encoded kvs array that generate by sql, and affectRows count. - Encode(sql string, tableID int64) (kvPairs []KvPair, affectedRows uint64, err error) - - // PrepareStmt prepare query statement, and return statement id. - // Pass stmtID into EncodePrepareStmt to execute a prepare statement. - PrepareStmt(query string) (stmtID uint32, err error) - - // EncodePrepareStmt transfer prepare query to kv pairs. - // stmtID is generated by PrepareStmt. - EncodePrepareStmt(tableID int64, stmtID uint32, param ...interface{}) (kvPairs []KvPair, affectedRows uint64, err error) - - // ExecDDLSQL executes ddl sql, you must use it to create schema infos. - ExecDDLSQL(sql string) error - - // EncodeMetaAutoID encode the table meta info, autoID to coresponding key-value pair. - EncodeMetaAutoID(dbID, tableID, autoID int64) (KvPair, error) - - // SetSystemVariable set system variable name = value. - SetSystemVariable(name string, value string) error - - // GetSystemVariable get the system variable value of name. - GetSystemVariable(name string) (string, bool) - - // Close cleanup the kvEncoder. - Close() error -} - -var ( - // refCount is used to ensure that there is only one domain.Domain instance. - refCount int64 - mu sync.Mutex - storeGlobal kv.Storage - domGlobal *domain.Domain -) - -type kvEncoder struct { - se session.Session - store kv.Storage - dom *domain.Domain -} - -// New new a KvEncoder -func New(dbName string, idAlloc autoid.Allocator) (KvEncoder, error) { - kvEnc := &kvEncoder{} - mu.Lock() - defer mu.Unlock() - if refCount == 0 { - if err := initGlobal(); err != nil { - return nil, err - } - } - err := kvEnc.initial(dbName, idAlloc) - if err != nil { - return nil, err - } - refCount++ - return kvEnc, nil -} - -func (e *kvEncoder) Close() error { - e.se.Close() - mu.Lock() - defer mu.Unlock() - refCount-- - if refCount == 0 { - e.dom.Close() - if err := e.store.Close(); err != nil { - return err - } - } - return nil -} - -func (e *kvEncoder) Encode(sql string, tableID int64) (kvPairs []KvPair, affectedRows uint64, err error) { - e.se.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true) - defer e.se.RollbackTxn(context.Background()) - - _, err = e.se.Execute(context.Background(), sql) - if err != nil { - return nil, 0, err - } - - return e.getKvPairsInMemBuffer(tableID) -} - -func (e *kvEncoder) getKvPairsInMemBuffer(tableID int64) (kvPairs []KvPair, affectedRows uint64, err error) { - txn, err := e.se.Txn(true) - if err != nil { - return nil, 0, err - } - txnMemBuffer := txn.GetMemBuffer() - kvPairs = make([]KvPair, 0, txnMemBuffer.Len()) - err = kv.WalkMemBuffer(txnMemBuffer, func(k kv.Key, v []byte) error { - if bytes.HasPrefix(k, tablecodec.TablePrefix()) { - k = tablecodec.ReplaceRecordKeyTableID(k, tableID) - } - kvPairs = append(kvPairs, KvPair{Key: k, Val: v}) - return nil - }) - - if err != nil { - return nil, 0, err - } - return kvPairs, e.se.GetSessionVars().StmtCtx.AffectedRows(), nil -} - -func (e *kvEncoder) PrepareStmt(query string) (stmtID uint32, err error) { - stmtID, _, _, err = e.se.PrepareStmt(query) - return -} - -func (e *kvEncoder) EncodePrepareStmt(tableID int64, stmtID uint32, args ...interface{}) (kvPairs []KvPair, affectedRows uint64, err error) { - e.se.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true) - defer e.se.RollbackTxn(context.Background()) - params := make([]types.Datum, len(args)) - for i := 0; i < len(params); i++ { - params[i] = types.NewDatum(args[i]) - } - _, err = e.se.ExecutePreparedStmt(context.Background(), stmtID, params) - if err != nil { - return nil, 0, err - } - - return e.getKvPairsInMemBuffer(tableID) -} - -func (e *kvEncoder) EncodeMetaAutoID(dbID, tableID, autoID int64) (KvPair, error) { - mockTxn := kv.NewMockTxn() - m := meta.NewMeta(mockTxn) - k, v := m.GenAutoTableIDKeyValue(dbID, tableID, autoID) - return KvPair{Key: k, Val: v}, nil -} - -func (e *kvEncoder) ExecDDLSQL(sql string) error { - _, err := e.se.Execute(context.Background(), sql) - if err != nil { - return err - } - - return nil -} - -func (e *kvEncoder) SetSystemVariable(name string, value string) error { - name = strings.ToLower(name) - if e.se != nil { - return e.se.GetSessionVars().SetSystemVar(name, value) - } - return errors.Errorf("e.se is nil, please new KvEncoder by kvencoder.New().") -} - -func (e *kvEncoder) GetSystemVariable(name string) (string, bool) { - name = strings.ToLower(name) - if e.se == nil { - return "", false - } - - return e.se.GetSessionVars().GetSystemVar(name) -} - -func newMockTikvWithBootstrap() (kv.Storage, *domain.Domain, error) { - store, err := mockstore.NewMockTikvStore() - if err != nil { - return nil, nil, err - } - session.SetSchemaLease(0) - dom, err := session.BootstrapSession(store) - return store, dom, err -} - -func (e *kvEncoder) initial(dbName string, idAlloc autoid.Allocator) (err error) { - se, err := session.CreateSession(storeGlobal) - if err != nil { - return - } - - dbName = strings.Replace(dbName, "`", "``", -1) - - se.SetConnectionID(atomic.AddUint64(&mockConnID, 1)) - _, err = se.Execute(context.Background(), fmt.Sprintf("create database if not exists `%s`", dbName)) - if err != nil { - return - } - _, err = se.Execute(context.Background(), fmt.Sprintf("use `%s`", dbName)) - if err != nil { - return - } - - se.GetSessionVars().IDAllocator = idAlloc - se.GetSessionVars().LightningMode = true - se.GetSessionVars().SkipUTF8Check = true - e.se = se - e.store = storeGlobal - e.dom = domGlobal - return nil -} - -// initGlobal modify the global domain and store -func initGlobal() error { - // disable stats update. - session.SetStatsLease(0) - var err error - storeGlobal, domGlobal, err = newMockTikvWithBootstrap() - if err == nil { - return nil - } - - if storeGlobal != nil { - if err1 := storeGlobal.Close(); err1 != nil { - logutil.BgLogger().Error("storeGlobal close error", zap.Error(err1)) - } - } - if domGlobal != nil { - domGlobal.Close() - } - return err -} diff --git a/util/kvencoder/kv_encoder_test.go b/util/kvencoder/kv_encoder_test.go deleted file mode 100644 index 1aa044dc28eea..0000000000000 --- a/util/kvencoder/kv_encoder_test.go +++ /dev/null @@ -1,733 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package kvenc - -import ( - "bytes" - "fmt" - "strconv" - "testing" - "time" - - "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/store/mockstore" - - . "github.com/pingcap/check" - "github.com/pingcap/errors" - "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/session" - "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/structure" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/testkit" - "github.com/pingcap/tidb/util/testleak" -) - -var _ = Suite(&testKvEncoderSuite{}) - -func TestT(t *testing.T) { - TestingT(t) -} - -func newStoreWithBootstrap() (kv.Storage, *domain.Domain, error) { - store, err := mockstore.NewMockTikvStore() - if err != nil { - return nil, nil, errors.Trace(err) - } - session.SetSchemaLease(0) - session.DisableStats4Test() - dom, err := session.BootstrapSession(store) - return store, dom, errors.Trace(err) -} - -type testKvEncoderSuite struct { - store kv.Storage - dom *domain.Domain -} - -func (s *testKvEncoderSuite) cleanEnv(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - r := tk.MustQuery("show tables") - for _, tb := range r.Rows() { - tableName := tb[0] - tk.MustExec(fmt.Sprintf("drop table %v", tableName)) - } -} - -func (s *testKvEncoderSuite) SetUpSuite(c *C) { - testleak.BeforeTest() - store, dom, err := newStoreWithBootstrap() - c.Assert(err, IsNil) - s.store = store - s.dom = dom -} - -func (s *testKvEncoderSuite) TearDownSuite(c *C) { - s.dom.Close() - s.store.Close() - testleak.AfterTest(c)() -} - -func getExpectKvPairs(tkExpect *testkit.TestKit, sql string) []KvPair { - tkExpect.MustExec("begin") - tkExpect.MustExec(sql) - kvPairsExpect := make([]KvPair, 0) - txn, err := tkExpect.Se.Txn(true) - if err != nil { - return nil - } - kv.WalkMemBuffer(txn.GetMemBuffer(), func(k kv.Key, v []byte) error { - kvPairsExpect = append(kvPairsExpect, KvPair{Key: k, Val: v}) - return nil - }) - tkExpect.MustExec("rollback") - return kvPairsExpect -} - -type testCase struct { - sql string - // expectEmptyValCnt only check if expectEmptyValCnt > 0 - expectEmptyValCnt int - // expectKvCnt only check if expectKvCnt > 0 - expectKvCnt int - expectAffectedRows int -} - -func (s *testKvEncoderSuite) runTestSQL(c *C, tkExpect *testkit.TestKit, encoder KvEncoder, cases []testCase, tableID int64) { - for _, ca := range cases { - comment := fmt.Sprintf("sql:%v", ca.sql) - kvPairs, affectedRows, err := encoder.Encode(ca.sql, tableID) - c.Assert(err, IsNil, Commentf(comment)) - - if ca.expectAffectedRows > 0 { - c.Assert(affectedRows, Equals, uint64(ca.expectAffectedRows)) - } - - kvPairsExpect := getExpectKvPairs(tkExpect, ca.sql) - c.Assert(len(kvPairs), Equals, len(kvPairsExpect), Commentf(comment)) - if ca.expectKvCnt > 0 { - c.Assert(len(kvPairs), Equals, ca.expectKvCnt, Commentf(comment)) - } - - emptyValCount := 0 - for i, row := range kvPairs { - expectKey := kvPairsExpect[i].Key - if bytes.HasPrefix(row.Key, tablecodec.TablePrefix()) { - expectKey = tablecodec.ReplaceRecordKeyTableID(expectKey, tableID) - } - c.Assert(bytes.Compare(row.Key, expectKey), Equals, 0, Commentf(comment)) - c.Assert(bytes.Compare(row.Val, kvPairsExpect[i].Val), Equals, 0, Commentf(comment)) - - if len(row.Val) == 0 { - emptyValCount++ - } - } - if ca.expectEmptyValCnt > 0 { - c.Assert(emptyValCount, Equals, ca.expectEmptyValCnt, Commentf(comment)) - } - } -} - -func (s *testKvEncoderSuite) TestCustomDatabaseHandle(c *C) { - dbname := "tidb" - - tkExpect := testkit.NewTestKit(c, s.store) - tkExpect.MustExec("create database if not exists " + dbname) - tkExpect.MustExec("use " + dbname) - - encoder, err := New(dbname, nil) - c.Assert(err, IsNil) - defer encoder.Close() - - var tableID int64 = 123 - - schemaSQL := "create table tis (id int auto_increment, a char(10), primary key(id))" - tkExpect.MustExec(schemaSQL) - err = encoder.ExecDDLSQL(schemaSQL) - c.Assert(err, IsNil) - - sqls := []testCase{ - {"insert into tis (a) values('test')", 0, 1, 1}, - {"insert into tis (a) values('test'), ('test1')", 0, 2, 2}, - } - - s.runTestSQL(c, tkExpect, encoder, sqls, tableID) -} - -func (s *testKvEncoderSuite) TestInsertPkIsHandle(c *C) { - tkExpect := testkit.NewTestKit(c, s.store) - tkExpect.MustExec("use test") - var tableID int64 = 1 - encoder, err := New("test", nil) - c.Assert(err, IsNil) - defer encoder.Close() - - tkExpect.MustExec("drop table if exists t") - schemaSQL := "create table t(id int auto_increment, a char(10), primary key(id))" - tkExpect.MustExec(schemaSQL) - err = encoder.ExecDDLSQL(schemaSQL) - c.Assert(err, IsNil) - - sqls := []testCase{ - {"insert into t values(1, 'test');", 0, 1, 1}, - {"insert into t(a) values('test')", 0, 1, 1}, - {"insert into t(a) values('test'), ('test1')", 0, 2, 2}, - {"insert into t(id, a) values(3, 'test')", 0, 1, 1}, - {"insert into t values(1000000, 'test')", 0, 1, 1}, - {"insert into t(a) values('test')", 0, 1, 1}, - {"insert into t(id, a) values(4, 'test')", 0, 1, 1}, - } - - s.runTestSQL(c, tkExpect, encoder, sqls, tableID) - - tkExpect.MustExec("drop table if exists t1") - schemaSQL = "create table t1(id int auto_increment, a char(10), primary key(id), key a_idx(a))" - tkExpect.MustExec(schemaSQL) - err = encoder.ExecDDLSQL(schemaSQL) - c.Assert(err, IsNil) - - tableID = 2 - sqls = []testCase{ - {"insert into t1 values(1, 'test');", 0, 2, 1}, - {"insert into t1(a) values('test')", 0, 2, 1}, - {"insert into t1(id, a) values(3, 'test')", 0, 2, 1}, - {"insert into t1 values(1000000, 'test')", 0, 2, 1}, - {"insert into t1(a) values('test')", 0, 2, 1}, - {"insert into t1(id, a) values(4, 'test')", 0, 2, 1}, - } - - s.runTestSQL(c, tkExpect, encoder, sqls, tableID) - - schemaSQL = `create table t2( - id int auto_increment, - a char(10), - b datetime default NULL, - primary key(id), - key a_idx(a), - unique b_idx(b)) - ` - - tableID = 3 - tkExpect.MustExec("drop table if exists t2") - tkExpect.MustExec(schemaSQL) - err = encoder.ExecDDLSQL(schemaSQL) - c.Assert(err, IsNil) - - sqls = []testCase{ - {"insert into t2(id, a) values(1, 'test')", 0, 3, 1}, - {"insert into t2 values(2, 'test', '2017-11-27 23:59:59')", 0, 3, 1}, - {"insert into t2 values(3, 'test', '2017-11-28 23:59:59')", 0, 3, 1}, - } - s.runTestSQL(c, tkExpect, encoder, sqls, tableID) -} - -type prepareTestCase struct { - sql string - formatSQL string - param []interface{} -} - -func makePrepareTestCase(sql, formatSQL string, param ...interface{}) prepareTestCase { - return prepareTestCase{sql, formatSQL, param} -} - -func (s *testKvEncoderSuite) TestPrepareEncode(c *C) { - alloc := NewAllocator() - encoder, err := New("test", alloc) - c.Assert(err, IsNil) - defer encoder.Close() - - schemaSQL := "create table t(id int auto_increment, a char(10), primary key(id))" - err = encoder.ExecDDLSQL(schemaSQL) - c.Assert(err, IsNil) - - cases := []prepareTestCase{ - makePrepareTestCase("insert into t values(1, 'test')", "insert into t values(?, ?)", 1, "test"), - makePrepareTestCase("insert into t(a) values('test')", "insert into t(a) values(?)", "test"), - makePrepareTestCase("insert into t(a) values('test'), ('test1')", "insert into t(a) values(?), (?)", "test", "test1"), - makePrepareTestCase("insert into t(id, a) values(3, 'test')", "insert into t(id, a) values(?, ?)", 3, "test"), - } - tableID := int64(1) - - for _, ca := range cases { - s.comparePrepareAndNormalEncode(c, alloc, tableID, encoder, ca.sql, ca.formatSQL, ca.param...) - } -} - -func (s *testKvEncoderSuite) comparePrepareAndNormalEncode(c *C, alloc autoid.Allocator, tableID int64, encoder KvEncoder, sql, prepareFormat string, param ...interface{}) { - comment := fmt.Sprintf("sql:%v", sql) - baseID := alloc.Base() - kvPairsExpect, affectedRowsExpect, err := encoder.Encode(sql, tableID) - c.Assert(err, IsNil, Commentf(comment)) - - stmtID, err := encoder.PrepareStmt(prepareFormat) - c.Assert(err, IsNil, Commentf(comment)) - alloc.(*Allocator).Reset(baseID) - kvPairs, affectedRows, err := encoder.EncodePrepareStmt(tableID, stmtID, param...) - c.Assert(err, IsNil, Commentf(comment)) - c.Assert(affectedRows, Equals, affectedRowsExpect, Commentf(comment)) - c.Assert(len(kvPairs), Equals, len(kvPairsExpect), Commentf(comment)) - - for i, kvPair := range kvPairs { - kvPairExpect := kvPairsExpect[i] - c.Assert(bytes.Compare(kvPair.Key, kvPairExpect.Key), Equals, 0, Commentf(comment)) - c.Assert(bytes.Compare(kvPair.Val, kvPairExpect.Val), Equals, 0, Commentf(comment)) - } -} - -func (s *testKvEncoderSuite) TestInsertPkIsNotHandle(c *C) { - tkExpect := testkit.NewTestKit(c, s.store) - tkExpect.MustExec("use test") - - var tableID int64 = 1 - encoder, err := New("test", nil) - c.Assert(err, IsNil) - defer encoder.Close() - - tkExpect.MustExec("drop table if exists t") - schemaSQL := `create table t( - id varchar(20), - a char(10), - primary key(id))` - tkExpect.MustExec(schemaSQL) - c.Assert(encoder.ExecDDLSQL(schemaSQL), IsNil) - - sqls := []testCase{ - {"insert into t values(1, 'test');", 0, 2, 1}, - {"insert into t(id, a) values(2, 'test')", 0, 2, 1}, - {"insert into t(id, a) values(3, 'test')", 0, 2, 1}, - {"insert into t values(1000000, 'test')", 0, 2, 1}, - {"insert into t(id, a) values(5, 'test')", 0, 2, 1}, - {"insert into t(id, a) values(4, 'test')", 0, 2, 1}, - {"insert into t(id, a) values(6, 'test'), (7, 'test'), (8, 'test')", 0, 6, 3}, - } - - s.runTestSQL(c, tkExpect, encoder, sqls, tableID) -} - -func (s *testKvEncoderSuite) TestRetryWithAllocator(c *C) { - tk := testkit.NewTestKit(c, s.store) - - tk.MustExec("use test") - alloc := NewAllocator() - var tableID int64 = 1 - encoder, err := New("test", alloc) - c.Assert(err, IsNil) - defer encoder.Close() - - tk.MustExec("drop table if exists t") - schemaSQL := `create table t( - id int auto_increment, - a char(10), - primary key(id))` - tk.MustExec(schemaSQL) - c.Assert(encoder.ExecDDLSQL(schemaSQL), IsNil) - - sqls := []string{ - "insert into t(a) values('test');", - "insert into t(a) values('test'), ('1'), ('2'), ('3');", - "insert into t(id, a) values(1000000, 'test')", - "insert into t(id, a) values(5, 'test')", - "insert into t(id, a) values(4, 'test')", - } - - for _, sql := range sqls { - baseID := alloc.Base() - kvPairs, _, err1 := encoder.Encode(sql, tableID) - c.Assert(err1, IsNil, Commentf("sql:%s", sql)) - alloc.Reset(baseID) - retryKvPairs, _, err1 := encoder.Encode(sql, tableID) - c.Assert(err1, IsNil, Commentf("sql:%s", sql)) - c.Assert(len(kvPairs), Equals, len(retryKvPairs)) - for i, row := range kvPairs { - c.Assert(bytes.Compare(row.Key, retryKvPairs[i].Key), Equals, 0, Commentf(sql)) - c.Assert(bytes.Compare(row.Val, retryKvPairs[i].Val), Equals, 0, Commentf(sql)) - } - } - - // specify id, it must be the same row - sql := "insert into t(id, a) values(5, 'test')" - kvPairs, _, err := encoder.Encode(sql, tableID) - c.Assert(err, IsNil, Commentf("sql:%s", sql)) - retryKvPairs, _, err := encoder.Encode(sql, tableID) - c.Assert(err, IsNil, Commentf("sql:%s", sql)) - c.Assert(len(kvPairs), Equals, len(retryKvPairs)) - for i, row := range kvPairs { - c.Assert(bytes.Compare(row.Key, retryKvPairs[i].Key), Equals, 0, Commentf(sql)) - c.Assert(bytes.Compare(row.Val, retryKvPairs[i].Val), Equals, 0, Commentf(sql)) - } - - tk.MustExec("drop table if exists t1") - schemaSQL = `create table t1( - id int auto_increment, - a char(10), - b char(10), - primary key(id), - KEY idx_a(a), - unique b_idx(b))` - tk.MustExec(schemaSQL) - c.Assert(encoder.ExecDDLSQL(schemaSQL), IsNil) - tableID = 2 - - sqls = []string{ - "insert into t1(a, b) values('test', 'b1');", - "insert into t1(a, b) values('test', 'b2'), ('1', 'b3'), ('2', 'b4'), ('3', 'b5');", - "insert into t1(id, a, b) values(1000000, 'test', 'b6')", - "insert into t1(id, a, b) values(5, 'test', 'b7')", - "insert into t1(id, a, b) values(4, 'test', 'b8')", - "insert into t1(a, b) values('test', 'b9');", - } - - for _, sql := range sqls { - baseID := alloc.Base() - kvPairs, _, err1 := encoder.Encode(sql, tableID) - c.Assert(err1, IsNil, Commentf("sql:%s", sql)) - alloc.Reset(baseID) - retryKvPairs, _, err1 := encoder.Encode(sql, tableID) - c.Assert(err1, IsNil, Commentf("sql:%s", sql)) - c.Assert(len(kvPairs), Equals, len(retryKvPairs)) - for i, row := range kvPairs { - c.Assert(bytes.Compare(row.Key, retryKvPairs[i].Key), Equals, 0, Commentf(sql)) - c.Assert(bytes.Compare(row.Val, retryKvPairs[i].Val), Equals, 0, Commentf(sql)) - } - } -} - -func (s *testKvEncoderSuite) TestAllocatorRebase(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - alloc := NewAllocator() - var tableID int64 = 1 - encoder, err := New("test", alloc) - c.Assert(err, IsNil) - err = alloc.Rebase(tableID, 100, false) - c.Assert(err, IsNil) - defer encoder.Close() - c.Assert(alloc.Base(), Equals, int64(100)) - - schemaSQL := `create table t( - id int auto_increment, - a char(10), - primary key(id))` - tk.MustExec(schemaSQL) - c.Assert(encoder.ExecDDLSQL(schemaSQL), IsNil) - - sql := "insert into t(id, a) values(1000, 'test')" - encoder.Encode(sql, tableID) - c.Assert(alloc.Base(), Equals, int64(1000)) - - sql = "insert into t(a) values('test')" - encoder.Encode(sql, tableID) - c.Assert(alloc.Base(), Equals, int64(1001)) - - sql = "insert into t(id, a) values(2000, 'test')" - encoder.Encode(sql, tableID) - c.Assert(alloc.Base(), Equals, int64(2000)) - c.Assert(alloc.End(), Equals, int64(2000)+step) - nextID, err := alloc.NextGlobalAutoID(tableID) - c.Assert(err, IsNil) - c.Assert(nextID, Equals, int64(2000)+step+1) -} - -func (s *testKvEncoderSuite) TestError(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - alloc := NewAllocator() - encoder, err := New("test", alloc) - c.Assert(err, IsNil) - defer encoder.Close() - _, err = New("", alloc) - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, "*Incorrect database name ''") - - err = encoder.ExecDDLSQL("x") - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, "*You have an error in your SQL syntax.*") - - _, err = encoder.PrepareStmt("") - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, "*Can not prepare multiple statements") - _, _, err = encoder.EncodePrepareStmt(0, 0, 0) - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, "*Prepared statement not found") - - encoder = &kvEncoder{} - err = encoder.SetSystemVariable("", "") - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, ".*please new KvEncoder by kvencoder.New.*") - _, ok := encoder.GetSystemVariable("") - c.Assert(ok, IsFalse) - c.Assert(err, ErrorMatches, ".*please new KvEncoder by kvencoder.New.*") -} - -func (s *testKvEncoderSuite) TestAllocatorRebaseSmaller(c *C) { - alloc := NewAllocator() - alloc.Rebase(1, 10, false) - c.Assert(alloc.Base(), Equals, int64(10)) - alloc.Rebase(1, 100, false) - c.Assert(alloc.Base(), Equals, int64(100)) - alloc.Rebase(1, 1, false) - c.Assert(alloc.Base(), Equals, int64(100)) - alloc.Reset(1) - c.Assert(alloc.Base(), Equals, int64(1)) -} - -func (s *testKvEncoderSuite) TestSimpleKeyEncode(c *C) { - encoder, err := New("test", nil) - c.Assert(err, IsNil) - defer encoder.Close() - - schemaSQL := `create table t( - id int auto_increment, - a char(10), - b char(10) default NULL, - primary key(id), - key a_idx(a)) - ` - encoder.ExecDDLSQL(schemaSQL) - tableID := int64(1) - indexID := int64(1) - - sql := "insert into t values(1, 'a', 'b')" - kvPairs, affectedRows, err := encoder.Encode(sql, tableID) - c.Assert(err, IsNil) - c.Assert(len(kvPairs), Equals, 2) - c.Assert(affectedRows, Equals, uint64(1)) - tablePrefix := tablecodec.GenTableRecordPrefix(tableID) - handle := int64(1) - expectRecordKey := tablecodec.EncodeRecordKey(tablePrefix, handle) - - sc := &stmtctx.StatementContext{TimeZone: time.Local} - indexPrefix := tablecodec.EncodeTableIndexPrefix(tableID, indexID) - expectIdxKey := make([]byte, 0) - expectIdxKey = append(expectIdxKey, []byte(indexPrefix)...) - expectIdxKey, err = codec.EncodeKey(sc, expectIdxKey, types.NewDatum([]byte("a"))) - c.Assert(err, IsNil) - expectIdxKey, err = codec.EncodeKey(sc, expectIdxKey, types.NewDatum(handle)) - c.Assert(err, IsNil) - - for _, row := range kvPairs { - tID, iID, isRecordKey, err1 := tablecodec.DecodeKeyHead(row.Key) - c.Assert(err1, IsNil) - c.Assert(tID, Equals, tableID) - if isRecordKey { - c.Assert(bytes.Compare(row.Key, expectRecordKey), Equals, 0) - } else { - c.Assert(iID, Equals, indexID) - c.Assert(bytes.Compare(row.Key, expectIdxKey), Equals, 0) - } - } - - // unique index key - schemaSQL = `create table t1( - id int auto_increment, - a char(10), - primary key(id), - unique a_idx(a)) - ` - encoder.ExecDDLSQL(schemaSQL) - tableID = int64(2) - sql = "insert into t1 values(1, 'a')" - kvPairs, affectedRows, err = encoder.Encode(sql, tableID) - c.Assert(err, IsNil) - c.Assert(len(kvPairs), Equals, 2) - c.Assert(affectedRows, Equals, uint64(1)) - - tablePrefix = tablecodec.GenTableRecordPrefix(tableID) - handle = int64(1) - expectRecordKey = tablecodec.EncodeRecordKey(tablePrefix, handle) - - indexPrefix = tablecodec.EncodeTableIndexPrefix(tableID, indexID) - expectIdxKey = []byte{} - expectIdxKey = append(expectIdxKey, []byte(indexPrefix)...) - expectIdxKey, err = codec.EncodeKey(sc, expectIdxKey, types.NewDatum([]byte("a"))) - c.Assert(err, IsNil) - - for _, row := range kvPairs { - tID, iID, isRecordKey, err1 := tablecodec.DecodeKeyHead(row.Key) - c.Assert(err1, IsNil) - c.Assert(tID, Equals, tableID) - if isRecordKey { - c.Assert(bytes.Compare(row.Key, expectRecordKey), Equals, 0) - } else { - c.Assert(iID, Equals, indexID) - c.Assert(bytes.Compare(row.Key, expectIdxKey), Equals, 0) - } - } -} - -var ( - mMetaPrefix = []byte("m") - mDBPrefix = "DB" - mTableIDPrefix = "TID" -) - -func dbKey(dbID int64) []byte { - return []byte(fmt.Sprintf("%s:%d", mDBPrefix, dbID)) -} - -func autoTableIDKey(tableID int64) []byte { - return []byte(fmt.Sprintf("%s:%d", mTableIDPrefix, tableID)) -} - -func encodeHashDataKey(key []byte, field []byte) kv.Key { - ek := make([]byte, 0, len(mMetaPrefix)+len(key)+len(field)+30) - ek = append(ek, mMetaPrefix...) - ek = codec.EncodeBytes(ek, key) - ek = codec.EncodeUint(ek, uint64(structure.HashData)) - return codec.EncodeBytes(ek, field) -} - -func hashFieldIntegerVal(val int64) []byte { - return []byte(strconv.FormatInt(val, 10)) -} - -func (s *testKvEncoderSuite) TestEncodeMetaAutoID(c *C) { - encoder, err := New("test", nil) - c.Assert(err, IsNil) - defer encoder.Close() - - dbID := int64(1) - tableID := int64(10) - autoID := int64(10000000111) - kvPair, err := encoder.EncodeMetaAutoID(dbID, tableID, autoID) - c.Assert(err, IsNil) - - expectKey := encodeHashDataKey(dbKey(dbID), autoTableIDKey(tableID)) - expectVal := hashFieldIntegerVal(autoID) - - c.Assert(bytes.Compare(kvPair.Key, expectKey), Equals, 0) - c.Assert(bytes.Compare(kvPair.Val, expectVal), Equals, 0) - - dbID = 10 - tableID = 1 - autoID = -1 - kvPair, err = encoder.EncodeMetaAutoID(dbID, tableID, autoID) - c.Assert(err, IsNil) - - expectKey = encodeHashDataKey(dbKey(dbID), autoTableIDKey(tableID)) - expectVal = hashFieldIntegerVal(autoID) - - c.Assert(bytes.Compare(kvPair.Key, expectKey), Equals, 0) - c.Assert(bytes.Compare(kvPair.Val, expectVal), Equals, 0) -} - -func (s *testKvEncoderSuite) TestGetSetSystemVariable(c *C) { - encoder, err := New("test", nil) - c.Assert(err, IsNil) - defer encoder.Close() - - err = encoder.SetSystemVariable("sql_mode", "") - c.Assert(err, IsNil) - - val, ok := encoder.GetSystemVariable("sql_mode") - c.Assert(ok, IsTrue) - c.Assert(val, Equals, "") - - sqlMode := "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" - err = encoder.SetSystemVariable("sql_mode", sqlMode) - c.Assert(err, IsNil) - - val, ok = encoder.GetSystemVariable("sql_mode") - c.Assert(ok, IsTrue) - c.Assert(val, Equals, sqlMode) - - val, ok = encoder.GetSystemVariable("SQL_MODE") - c.Assert(ok, IsTrue) - c.Assert(val, Equals, sqlMode) -} - -func (s *testKvEncoderSuite) TestDisableStrictSQLMode(c *C) { - sql := "create table `ORDER-LINE` (" + - " ol_w_id integer not null," + - " ol_d_id integer not null," + - " ol_o_id integer not null," + - " ol_number integer not null," + - " ol_i_id integer not null," + - " ol_delivery_d timestamp DEFAULT CURRENT_TIMESTAMP," + - " ol_amount decimal(6,2)," + - " ol_supply_w_id integer," + - " ol_quantity decimal(2,0)," + - " ol_dist_info char(24)," + - " primary key (ol_w_id, ol_d_id, ol_o_id, ol_number)" + - ");" - - encoder, err := New("test", nil) - c.Assert(err, IsNil) - defer encoder.Close() - - err = encoder.ExecDDLSQL(sql) - c.Assert(err, IsNil) - tableID := int64(1) - sql = "insert into `ORDER-LINE` values(2, 1, 1, 1, 1, 'NULL', 1, 1, 1, '1');" - _, _, err = encoder.Encode(sql, tableID) - c.Assert(err, NotNil) - - err = encoder.SetSystemVariable("sql_mode", "") - c.Assert(err, IsNil) - - sql = "insert into `ORDER-LINE` values(2, 1, 1, 1, 1, 'NULL', 1, 1, 1, '1');" - _, _, err = encoder.Encode(sql, tableID) - c.Assert(err, IsNil) -} - -func (s *testKvEncoderSuite) TestRefCount(c *C) { - var err error - var a [10]KvEncoder - for i := 0; i < 10; i++ { - a[i], err = New("test", nil) - c.Assert(err, IsNil) - } - dom1 := domGlobal - c.Assert(refCount, Equals, int64(10)) - a[0].Close() - a[1].Close() - dom2 := domGlobal - c.Assert(refCount, Equals, int64(8)) - c.Assert(dom1, Equals, dom2) - - for i := 2; i < 9; i++ { - a[i].Close() - } - dom3 := domGlobal - c.Assert(refCount, Equals, int64(1)) - c.Assert(dom3, Equals, dom2) - - a[9].Close() - c.Assert(refCount, Equals, int64(0)) - - tmp, err := New("test", nil) - c.Assert(err, IsNil) - dom4 := domGlobal - c.Assert(dom4 == dom3, IsFalse) - c.Assert(refCount, Equals, int64(1)) - tmp.Close() - c.Assert(refCount, Equals, int64(0)) -} - -// TestExoticDatabaseName checks if https://github.com/pingcap/tidb/issues/9532 -// is fixed. -func (s *testKvEncoderSuite) TestExoticDatabaseName(c *C) { - encoder1, err := New("pay-service_micro_db", nil) - c.Assert(err, IsNil) - encoder1.Close() - - encoder2, err := New("㎩¥`𝕊ℯ®Ⅵ₠—🎤肉 ㏈", nil) - c.Assert(err, IsNil) - encoder2.Close() -} diff --git a/util/mock/context.go b/util/mock/context.go index c2a8cae86e811..99a086ef9bf05 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -157,15 +157,11 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error { return nil } if c.Store != nil { - membufCap := kv.DefaultTxnMembufCap - if c.sessionVars.LightningMode { - membufCap = kv.ImportingTxnMembufCap - } txn, err := c.Store.BeginWithStartTS(startTS) if err != nil { return errors.Trace(err) } - txn.SetCap(membufCap) + txn.SetCap(kv.DefaultTxnMembufCap) c.txn.Transaction = txn } return nil