From f97b42255268931af4cef5ca132d1682a37827f3 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 6 May 2021 19:29:44 +0800 Subject: [PATCH 1/5] make count atomic --- .../backend/local/localhelper_test.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/pkg/lightning/backend/local/localhelper_test.go b/pkg/lightning/backend/local/localhelper_test.go index 7c2c69778..71e8b9840 100644 --- a/pkg/lightning/backend/local/localhelper_test.go +++ b/pkg/lightning/backend/local/localhelper_test.go @@ -20,7 +20,6 @@ import ( "math/rand" "sort" "sync" - "sync/atomic" "time" . "github.com/pingcap/check" @@ -34,6 +33,7 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/placement" + "go.uber.org/atomic" "github.com/pingcap/br/pkg/restore" ) @@ -44,7 +44,7 @@ type testClient struct { regions map[uint64]*restore.RegionInfo regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions nextRegionID uint64 - splitCount int + splitCount atomic.Int32 hook clientHook } @@ -161,7 +161,7 @@ func (c *testClient) BatchSplitRegionsWithOrigin( default: } - c.splitCount++ + c.splitCount.Inc() c.mu.Lock() defer c.mu.Unlock() newRegions := make([]*restore.RegionInfo, 0) @@ -387,7 +387,7 @@ func (d defaultHook) check(c *C, cli *testClient) { // 7. region: [bv, cca), keys: [bw, bx, by, bz] // since it may encounter error retries, here only check the lower threshold. - c.Assert(cli.splitCount >= 7, IsTrue) + c.Assert(cli.splitCount.Load() >= 7, IsTrue) } func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, hook clientHook, errPat string, splitHook batchSplitHook) { @@ -473,7 +473,7 @@ func (h batchSizeHook) check(c *C, cli *testClient) { // 10. region: [bv, cca), keys: [bx, by, bz] // since it may encounter error retries, here only check the lower threshold. - c.Assert(cli.splitCount, Equals, 9) + c.Assert(cli.splitCount.Load(), Equals, int32(9)) } func (s *localSuite) TestBatchSplitRegionByRangesKeySizeLimit(c *C) { @@ -517,13 +517,12 @@ func (s *localSuite) TestBatchSplitByRangesEpochNotMatch(c *C) { // return epoch not match error in every other call type splitRegionEpochNotMatchHookRandom struct { noopHook - cnt int32 + cnt atomic.Int32 } func (h *splitRegionEpochNotMatchHookRandom) BeforeSplitRegion(ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte) (*restore.RegionInfo, [][]byte) { regionInfo, keys = h.noopHook.BeforeSplitRegion(ctx, regionInfo, keys) - cnt := atomic.AddInt32(&h.cnt, 1) - if cnt%2 != 0 { + if h.cnt.Inc() != 0 { return regionInfo, keys } regionInfo = cloneRegion(regionInfo) @@ -539,12 +538,12 @@ func (s *localSuite) TestBatchSplitByRangesEpochNotMatchOnce(c *C) { type splitRegionNoValidKeyHook struct { noopHook returnErrTimes int32 - errorCnt int32 + errorCnt atomic.Int32 } func (h splitRegionNoValidKeyHook) BeforeSplitRegion(ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte) (*restore.RegionInfo, [][]byte) { regionInfo, keys = h.noopHook.BeforeSplitRegion(ctx, regionInfo, keys) - if atomic.AddInt32(&h.errorCnt, 1) <= h.returnErrTimes { + if h.errorCnt.Inc() <= h.returnErrTimes { // clean keys to trigger "no valid keys" error keys = keys[:0] } From 73073c8ad39b0471fc41466fa9c2dcd05a7d26b9 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 6 May 2021 19:54:42 +0800 Subject: [PATCH 2/5] avoid flush closed engine --- pkg/lightning/backend/local/local.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 791eda83b..e97f0526c 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -103,6 +103,8 @@ var ( localMaxTiKVVersion = version.NextMajorVersion() localMaxPDVersion = version.NextMajorVersion() tiFlashMinVersion = *semver.New("4.0.5") + + errorEngineClosed = errors.New("engine is closed") ) var ( @@ -146,6 +148,7 @@ type metaOrFlush struct { type File struct { localFileMeta + closed atomic.Bool db *pebble.DB UUID uuid.UUID localWriters sync.Map @@ -603,6 +606,9 @@ func (e *File) ingestSSTs(metas []*sstMeta) error { // use raw RLock to avoid change the lock state during flushing. e.mutex.RLock() defer e.mutex.RUnlock() + if e.closed.Load() { + return errorEngineClosed + } totalSize := int64(0) totalCount := int64(0) fileSize := int64(0) @@ -896,7 +902,8 @@ func (local *local) tryRLockAllEngines() []*File { var allEngines []*File local.engines.Range(func(k, v interface{}) bool { engine := v.(*File) - if engine.tryRLock() { + // skip closed engine + if engine.tryRLock() && !engine.closed.Load() { allEngines = append(allEngines, engine) } return true @@ -1123,9 +1130,15 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error } engineFile := engine.(*File) - engineFile.rLock() + engineFile.lock(importMutexStateClose) + defer engineFile.unlock() + if engineFile.closed.Load() { + return nil + } + engineFile.closed.Store(true) + err := engineFile.flushEngineWithoutLock(ctx) - engineFile.rUnlock() + engineFile.unlock() close(engineFile.sstMetasChan) if err != nil { return errors.Trace(err) @@ -2480,6 +2493,10 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [ return nil } + if w.local.closed.Load() { + return errorEngineClosed + } + w.Lock() defer w.Unlock() From e8f5f9c51f9118ff307781d647f0547b2c95d4ce Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 7 May 2021 13:30:52 +0800 Subject: [PATCH 3/5] fix a deadlock and simplify the code --- pkg/lightning/backend/local/local.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index e97f0526c..00d8ecd95 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -903,8 +903,12 @@ func (local *local) tryRLockAllEngines() []*File { local.engines.Range(func(k, v interface{}) bool { engine := v.(*File) // skip closed engine - if engine.tryRLock() && !engine.closed.Load() { - allEngines = append(allEngines, engine) + if engine.tryRLock() { + if !engine.closed.Load() { + allEngines = append(allEngines, engine) + } else { + engine.rUnlock() + } } return true }) @@ -1132,10 +1136,9 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error engineFile := engine.(*File) engineFile.lock(importMutexStateClose) defer engineFile.unlock() - if engineFile.closed.Load() { + if engineFile.closed.Swap(true) { return nil } - engineFile.closed.Store(true) err := engineFile.flushEngineWithoutLock(ctx) engineFile.unlock() From abfc9e981931d47e7e28f9a23c825770a05d6a55 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 10 May 2021 10:48:49 +0800 Subject: [PATCH 4/5] fix unlock --- pkg/lightning/backend/local/local.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 00d8ecd95..36a74e6f8 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -1135,8 +1135,8 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error engineFile := engine.(*File) engineFile.lock(importMutexStateClose) - defer engineFile.unlock() if engineFile.closed.Swap(true) { + engineFile.unlock() return nil } From b31b1417319edb8d9cbb3620b3b3c1730564ad83 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 10 May 2021 19:45:34 +0800 Subject: [PATCH 5/5] fix dead lock --- pkg/lightning/backend/local/local.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 36a74e6f8..d814917fe 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -1004,6 +1004,9 @@ func (local *local) FlushEngine(ctx context.Context, engineID uuid.UUID) error { return errors.Errorf("engine '%s' not found", engineID) } defer engineFile.rUnlock() + if engineFile.closed.Load() { + return nil + } return engineFile.flushEngineWithoutLock(ctx) } @@ -1134,15 +1137,21 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error } engineFile := engine.(*File) - engineFile.lock(importMutexStateClose) - if engineFile.closed.Swap(true) { - engineFile.unlock() + engineFile.rLock() + if engineFile.closed.Load() { + engineFile.rUnlock() return nil } err := engineFile.flushEngineWithoutLock(ctx) - engineFile.unlock() + engineFile.rUnlock() + + // use mutex to make sure we won't close sstMetasChan while other routines + // trying to do flush. + engineFile.lock(importMutexStateClose) + engineFile.closed.Store(true) close(engineFile.sstMetasChan) + engineFile.unlock() if err != nil { return errors.Trace(err) }