From 7c245b069eaf7a1f735055cf885e8597b0a72932 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 17 Jun 2021 21:55:04 +0800 Subject: [PATCH 1/2] *: fix bug that write on temporary table send request to TiKV --- executor/executor_test.go | 7 ++--- kv/option.go | 2 ++ session/session.go | 48 ++++++++++------------------------ store/driver/txn/txn_driver.go | 2 ++ 4 files changed, 22 insertions(+), 37 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 151d32a2ffd9f..05ef96515735d 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8403,15 +8403,16 @@ func (s testSerialSuite) TestTemporaryTableNoNetwork(c *C) { tk.MustExec("set tidb_enable_global_temporary_table=true") tk.MustExec("create global temporary table tmp_t (id int, a int, index(a)) on commit delete rows") - tk.MustExec("begin") - tk.MustExec("insert into tmp_t values (1, 1)") - tk.MustExec("insert into tmp_t values (2, 2)") c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy", "return(true)"), IsNil) defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy"), IsNil) }() + tk.MustExec("begin") + tk.MustExec("insert into tmp_t values (1, 1)") + tk.MustExec("insert into tmp_t values (2, 2)") + // Make sure the fail point works. // With that failpoint, all requests to the TiKV is discard. rs, err := tk1.Exec("select * from normal") diff --git a/kv/option.go b/kv/option.go index de5a1d8834c40..6b1e830e12984 100644 --- a/kv/option.go +++ b/kv/option.go @@ -61,6 +61,8 @@ const ( MatchStoreLabels // ResourceGroupTag indicates the resource group of the kv request. ResourceGroupTag + // KVFilter indicates the filter to ignore key-values in the transaction's memory buffer. + KVFilter ) // ReplicaReadType is the type of replica to read data from diff --git a/session/session.go b/session/session.go index b504392f02174..d4109ffae83e9 100644 --- a/session/session.go +++ b/session/session.go @@ -81,7 +81,9 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/timeutil" + tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikv" tikvutil "github.com/tikv/client-go/v2/util" ) @@ -474,10 +476,6 @@ func (s *session) doCommit(ctx context.Context) error { if err != nil { return err } - if err = s.removeTempTableFromBuffer(); err != nil { - return err - } - // mockCommitError and mockGetTSErrorInRetry use to test PR #8743. failpoint.Inject("mockCommitError", func(val failpoint.Value) { if val.(bool) && tikv.IsMockCommitErrorEnable() { @@ -535,41 +533,23 @@ func (s *session) doCommit(ctx context.Context) error { s.txn.SetOption(kv.GuaranteeLinearizability, s.GetSessionVars().TxnCtx.IsExplicit && s.GetSessionVars().GuaranteeLinearizability) } + if tables := s.GetSessionVars().TxnCtx.GlobalTemporaryTables; len(tables) > 0 { + s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables)) + } return s.txn.Commit(tikvutil.SetSessionID(ctx, s.GetSessionVars().ConnectionID)) } -// removeTempTableFromBuffer filters out the temporary table key-values. -func (s *session) removeTempTableFromBuffer() error { - tables := s.GetSessionVars().TxnCtx.GlobalTemporaryTables - if len(tables) == 0 { - return nil - } - memBuffer := s.txn.GetMemBuffer() - // Reset and new an empty stage buffer. - defer func() { - s.txn.cleanup() - }() - for tid := range tables { - seekKey := tablecodec.EncodeTablePrefix(tid) - endKey := tablecodec.EncodeTablePrefix(tid + 1) - iter, err := memBuffer.Iter(seekKey, endKey) - if err != nil { - return err - } - for iter.Valid() && iter.Key().HasPrefix(seekKey) { - if err = memBuffer.Delete(iter.Key()); err != nil { - return err - } - s.txn.UpdateEntriesCountAndSize() - if err = iter.Next(); err != nil { - return err - } - } +type temporaryTableKVFilter map[int64]tableutil.TempTable + +func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) bool { + tid := tablecodec.DecodeTableID(key) + if _, ok := m[tid]; ok { + return true } - // Flush to the root membuffer. - s.txn.flushStmtBuf() - return nil + + // This is the default filter for all tables. + return tablecodec.IsUntouchedIndexKValue(key, value) } // errIsNoisy is used to filter DUPLCATE KEY errors. diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 579c22fe1b6a7..3c0500e9934ff 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -176,6 +176,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.KVTxn.GetSnapshot().SetMatchStoreLabels(val.([]*metapb.StoreLabel)) case kv.ResourceGroupTag: txn.KVTxn.SetResourceGroupTag(val.([]byte)) + case kv.KVFilter: + txn.KVTxn.SetKVFilter(val.(tikv.KVFilter)) } } From e768188f9a11f109f0e13f0d34b05655b0db7db1 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 18 Jun 2021 11:00:25 +0800 Subject: [PATCH 2/2] make fmt --- executor/executor_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 05ef96515735d..985144641c78e 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8403,7 +8403,6 @@ func (s testSerialSuite) TestTemporaryTableNoNetwork(c *C) { tk.MustExec("set tidb_enable_global_temporary_table=true") tk.MustExec("create global temporary table tmp_t (id int, a int, index(a)) on commit delete rows") - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy", "return(true)"), IsNil) defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy"), IsNil)