Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ var (
localMaxTiKVVersion = version.NextMajorVersion()
localMaxPDVersion = version.NextMajorVersion()
tiFlashMinVersion = *semver.New("4.0.5")

errorEngineClosed = errors.New("engine is closed")
)

var (
Expand Down Expand Up @@ -146,6 +148,7 @@ type metaOrFlush struct {

type File struct {
localFileMeta
closed atomic.Bool
db *pebble.DB
UUID uuid.UUID
localWriters sync.Map
Expand Down Expand Up @@ -603,6 +606,9 @@ func (e *File) ingestSSTs(metas []*sstMeta) error {
// use raw RLock to avoid change the lock state during flushing.
e.mutex.RLock()
defer e.mutex.RUnlock()
if e.closed.Load() {
return errorEngineClosed
}
totalSize := int64(0)
totalCount := int64(0)
fileSize := int64(0)
Expand Down Expand Up @@ -896,8 +902,13 @@ func (local *local) tryRLockAllEngines() []*File {
var allEngines []*File
local.engines.Range(func(k, v interface{}) bool {
engine := v.(*File)
// skip closed engine
if engine.tryRLock() {
allEngines = append(allEngines, engine)
if !engine.closed.Load() {
allEngines = append(allEngines, engine)
} else {
engine.rUnlock()
}
}
return true
})
Expand Down Expand Up @@ -993,6 +1004,9 @@ func (local *local) FlushEngine(ctx context.Context, engineID uuid.UUID) error {
return errors.Errorf("engine '%s' not found", engineID)
}
defer engineFile.rUnlock()
if engineFile.closed.Load() {
return nil
}
return engineFile.flushEngineWithoutLock(ctx)
}

Expand Down Expand Up @@ -1124,9 +1138,20 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error

engineFile := engine.(*File)
engineFile.rLock()
if engineFile.closed.Load() {
engineFile.rUnlock()
return nil
}

err := engineFile.flushEngineWithoutLock(ctx)
engineFile.rUnlock()

// use mutex to make sure we won't close sstMetasChan while other routines
// trying to do flush.
engineFile.lock(importMutexStateClose)
engineFile.closed.Store(true)
close(engineFile.sstMetasChan)
engineFile.unlock()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -2485,6 +2510,10 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [
return nil
}

if w.local.closed.Load() {
return errorEngineClosed
}

w.Lock()
defer w.Unlock()

Expand Down
19 changes: 9 additions & 10 deletions pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"

. "github.com/pingcap/check"
Expand All @@ -34,6 +33,7 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/placement"
"go.uber.org/atomic"

"github.com/pingcap/br/pkg/restore"
)
Expand All @@ -44,7 +44,7 @@ type testClient struct {
regions map[uint64]*restore.RegionInfo
regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions
nextRegionID uint64
splitCount int
splitCount atomic.Int32
hook clientHook
}

Expand Down Expand Up @@ -150,7 +150,7 @@ func (c *testClient) BatchSplitRegionsWithOrigin(
) (*restore.RegionInfo, []*restore.RegionInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.splitCount++
c.splitCount.Inc()

if c.hook != nil {
regionInfo, keys = c.hook.BeforeSplitRegion(ctx, regionInfo, keys)
Expand Down Expand Up @@ -388,7 +388,7 @@ func (d defaultHook) check(c *C, cli *testClient) {
// 7. region: [bv, cca), keys: [bw, bx, by, bz]

// since it may encounter error retries, here only check the lower threshold.
c.Assert(cli.splitCount >= 7, IsTrue)
c.Assert(cli.splitCount.Load() >= 7, IsTrue)
}

func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, hook clientHook, errPat string, splitHook batchSplitHook) {
Expand Down Expand Up @@ -474,7 +474,7 @@ func (h batchSizeHook) check(c *C, cli *testClient) {
// 10. region: [bv, cca), keys: [bx, by, bz]

// since it may encounter error retries, here only check the lower threshold.
c.Assert(cli.splitCount, Equals, 9)
c.Assert(cli.splitCount.Load(), Equals, int32(9))
}

func (s *localSuite) TestBatchSplitRegionByRangesKeySizeLimit(c *C) {
Expand Down Expand Up @@ -518,13 +518,12 @@ func (s *localSuite) TestBatchSplitByRangesEpochNotMatch(c *C) {
// return epoch not match error in every other call
type splitRegionEpochNotMatchHookRandom struct {
noopHook
cnt int32
cnt atomic.Int32
}

func (h *splitRegionEpochNotMatchHookRandom) BeforeSplitRegion(ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte) (*restore.RegionInfo, [][]byte) {
regionInfo, keys = h.noopHook.BeforeSplitRegion(ctx, regionInfo, keys)
cnt := atomic.AddInt32(&h.cnt, 1)
if cnt%2 != 0 {
if h.cnt.Inc() != 0 {
return regionInfo, keys
}
regionInfo = cloneRegion(regionInfo)
Expand All @@ -540,12 +539,12 @@ func (s *localSuite) TestBatchSplitByRangesEpochNotMatchOnce(c *C) {
type splitRegionNoValidKeyHook struct {
noopHook
returnErrTimes int32
errorCnt int32
errorCnt atomic.Int32
}

func (h *splitRegionNoValidKeyHook) BeforeSplitRegion(ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte) (*restore.RegionInfo, [][]byte) {
regionInfo, keys = h.noopHook.BeforeSplitRegion(ctx, regionInfo, keys)
if atomic.AddInt32(&h.errorCnt, 1) <= h.returnErrTimes {
if h.errorCnt.Inc() <= h.returnErrTimes {
// clean keys to trigger "no valid keys" error
keys = keys[:0]
}
Expand Down