diff --git a/executor/executor_test.go b/executor/executor_test.go index 151d32a2ffd9f..985144641c78e 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8403,15 +8403,15 @@ func (s testSerialSuite) TestTemporaryTableNoNetwork(c *C) { tk.MustExec("set tidb_enable_global_temporary_table=true") tk.MustExec("create global temporary table tmp_t (id int, a int, index(a)) on commit delete rows") - tk.MustExec("begin") - tk.MustExec("insert into tmp_t values (1, 1)") - tk.MustExec("insert into tmp_t values (2, 2)") - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy", "return(true)"), IsNil) defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy"), IsNil) }() + tk.MustExec("begin") + tk.MustExec("insert into tmp_t values (1, 1)") + tk.MustExec("insert into tmp_t values (2, 2)") + // Make sure the fail point works. // With that failpoint, all requests to the TiKV is discard. rs, err := tk1.Exec("select * from normal") diff --git a/kv/option.go b/kv/option.go index de5a1d8834c40..6b1e830e12984 100644 --- a/kv/option.go +++ b/kv/option.go @@ -61,6 +61,8 @@ const ( MatchStoreLabels // ResourceGroupTag indicates the resource group of the kv request. ResourceGroupTag + // KVFilter indicates the filter to ignore key-values in the transaction's memory buffer. + KVFilter ) // ReplicaReadType is the type of replica to read data from diff --git a/session/session.go b/session/session.go index 0ff311d53c052..42b331f1b06f6 100644 --- a/session/session.go +++ b/session/session.go @@ -81,7 +81,9 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/timeutil" + tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikv" tikvutil "github.com/tikv/client-go/v2/util" ) @@ -474,10 +476,6 @@ func (s *session) doCommit(ctx context.Context) error { if err != nil { return err } - if err = s.removeTempTableFromBuffer(); err != nil { - return err - } - // mockCommitError and mockGetTSErrorInRetry use to test PR #8743. failpoint.Inject("mockCommitError", func(val failpoint.Value) { if val.(bool) && tikv.IsMockCommitErrorEnable() { @@ -535,41 +533,23 @@ func (s *session) doCommit(ctx context.Context) error { s.txn.SetOption(kv.GuaranteeLinearizability, s.GetSessionVars().TxnCtx.IsExplicit && s.GetSessionVars().GuaranteeLinearizability) } + if tables := s.GetSessionVars().TxnCtx.GlobalTemporaryTables; len(tables) > 0 { + s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables)) + } return s.txn.Commit(tikvutil.SetSessionID(ctx, s.GetSessionVars().ConnectionID)) } -// removeTempTableFromBuffer filters out the temporary table key-values. -func (s *session) removeTempTableFromBuffer() error { - tables := s.GetSessionVars().TxnCtx.GlobalTemporaryTables - if len(tables) == 0 { - return nil - } - memBuffer := s.txn.GetMemBuffer() - // Reset and new an empty stage buffer. - defer func() { - s.txn.cleanup() - }() - for tid := range tables { - seekKey := tablecodec.EncodeTablePrefix(tid) - endKey := tablecodec.EncodeTablePrefix(tid + 1) - iter, err := memBuffer.Iter(seekKey, endKey) - if err != nil { - return err - } - for iter.Valid() && iter.Key().HasPrefix(seekKey) { - if err = memBuffer.Delete(iter.Key()); err != nil { - return err - } - s.txn.UpdateEntriesCountAndSize() - if err = iter.Next(); err != nil { - return err - } - } +type temporaryTableKVFilter map[int64]tableutil.TempTable + +func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) bool { + tid := tablecodec.DecodeTableID(key) + if _, ok := m[tid]; ok { + return true } - // Flush to the root membuffer. - s.txn.flushStmtBuf() - return nil + + // This is the default filter for all tables. + return tablecodec.IsUntouchedIndexKValue(key, value) } // errIsNoisy is used to filter DUPLCATE KEY errors. diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 579c22fe1b6a7..3c0500e9934ff 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -176,6 +176,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.KVTxn.GetSnapshot().SetMatchStoreLabels(val.([]*metapb.StoreLabel)) case kv.ResourceGroupTag: txn.KVTxn.SetResourceGroupTag(val.([]byte)) + case kv.KVFilter: + txn.KVTxn.SetKVFilter(val.(tikv.KVFilter)) } }