diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 1c941b1e4..558ed4612 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -103,6 +103,8 @@ var ( localMaxTiKVVersion = version.NextMajorVersion() localMaxPDVersion = version.NextMajorVersion() tiFlashMinVersion = *semver.New("4.0.5") + + errorEngineClosed = errors.New("engine is closed") ) var ( @@ -146,6 +148,7 @@ type metaOrFlush struct { type File struct { localFileMeta + closed atomic.Bool db *pebble.DB UUID uuid.UUID localWriters sync.Map @@ -603,6 +606,9 @@ func (e *File) ingestSSTs(metas []*sstMeta) error { // use raw RLock to avoid change the lock state during flushing. e.mutex.RLock() defer e.mutex.RUnlock() + if e.closed.Load() { + return errorEngineClosed + } totalSize := int64(0) totalCount := int64(0) fileSize := int64(0) @@ -896,8 +902,13 @@ func (local *local) tryRLockAllEngines() []*File { var allEngines []*File local.engines.Range(func(k, v interface{}) bool { engine := v.(*File) + // skip closed engine if engine.tryRLock() { - allEngines = append(allEngines, engine) + if !engine.closed.Load() { + allEngines = append(allEngines, engine) + } else { + engine.rUnlock() + } } return true }) @@ -993,6 +1004,9 @@ func (local *local) FlushEngine(ctx context.Context, engineID uuid.UUID) error { return errors.Errorf("engine '%s' not found", engineID) } defer engineFile.rUnlock() + if engineFile.closed.Load() { + return nil + } return engineFile.flushEngineWithoutLock(ctx) } @@ -1124,9 +1138,20 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error engineFile := engine.(*File) engineFile.rLock() + if engineFile.closed.Load() { + engineFile.rUnlock() + return nil + } + err := engineFile.flushEngineWithoutLock(ctx) engineFile.rUnlock() + + // use mutex to make sure we won't close sstMetasChan while other routines + // trying to do flush. + engineFile.lock(importMutexStateClose) + engineFile.closed.Store(true) close(engineFile.sstMetasChan) + engineFile.unlock() if err != nil { return errors.Trace(err) } @@ -2485,6 +2510,10 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [ return nil } + if w.local.closed.Load() { + return errorEngineClosed + } + w.Lock() defer w.Unlock() diff --git a/pkg/lightning/backend/local/localhelper_test.go b/pkg/lightning/backend/local/localhelper_test.go index d3323bc0b..073bfe99c 100644 --- a/pkg/lightning/backend/local/localhelper_test.go +++ b/pkg/lightning/backend/local/localhelper_test.go @@ -20,7 +20,6 @@ import ( "math/rand" "sort" "sync" - "sync/atomic" "time" . "github.com/pingcap/check" @@ -34,6 +33,7 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/placement" + "go.uber.org/atomic" "github.com/pingcap/br/pkg/restore" ) @@ -44,7 +44,7 @@ type testClient struct { regions map[uint64]*restore.RegionInfo regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions nextRegionID uint64 - splitCount int + splitCount atomic.Int32 hook clientHook } @@ -150,7 +150,7 @@ func (c *testClient) BatchSplitRegionsWithOrigin( ) (*restore.RegionInfo, []*restore.RegionInfo, error) { c.mu.Lock() defer c.mu.Unlock() - c.splitCount++ + c.splitCount.Inc() if c.hook != nil { regionInfo, keys = c.hook.BeforeSplitRegion(ctx, regionInfo, keys) @@ -388,7 +388,7 @@ func (d defaultHook) check(c *C, cli *testClient) { // 7. region: [bv, cca), keys: [bw, bx, by, bz] // since it may encounter error retries, here only check the lower threshold. - c.Assert(cli.splitCount >= 7, IsTrue) + c.Assert(cli.splitCount.Load() >= 7, IsTrue) } func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, hook clientHook, errPat string, splitHook batchSplitHook) { @@ -474,7 +474,7 @@ func (h batchSizeHook) check(c *C, cli *testClient) { // 10. region: [bv, cca), keys: [bx, by, bz] // since it may encounter error retries, here only check the lower threshold. - c.Assert(cli.splitCount, Equals, 9) + c.Assert(cli.splitCount.Load(), Equals, int32(9)) } func (s *localSuite) TestBatchSplitRegionByRangesKeySizeLimit(c *C) { @@ -518,13 +518,12 @@ func (s *localSuite) TestBatchSplitByRangesEpochNotMatch(c *C) { // return epoch not match error in every other call type splitRegionEpochNotMatchHookRandom struct { noopHook - cnt int32 + cnt atomic.Int32 } func (h *splitRegionEpochNotMatchHookRandom) BeforeSplitRegion(ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte) (*restore.RegionInfo, [][]byte) { regionInfo, keys = h.noopHook.BeforeSplitRegion(ctx, regionInfo, keys) - cnt := atomic.AddInt32(&h.cnt, 1) - if cnt%2 != 0 { + if h.cnt.Inc() != 0 { return regionInfo, keys } regionInfo = cloneRegion(regionInfo) @@ -540,12 +539,12 @@ func (s *localSuite) TestBatchSplitByRangesEpochNotMatchOnce(c *C) { type splitRegionNoValidKeyHook struct { noopHook returnErrTimes int32 - errorCnt int32 + errorCnt atomic.Int32 } func (h *splitRegionNoValidKeyHook) BeforeSplitRegion(ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte) (*restore.RegionInfo, [][]byte) { regionInfo, keys = h.noopHook.BeforeSplitRegion(ctx, regionInfo, keys) - if atomic.AddInt32(&h.errorCnt, 1) <= h.returnErrTimes { + if h.errorCnt.Inc() <= h.returnErrTimes { // clean keys to trigger "no valid keys" error keys = keys[:0] }