From 0a90d40912cadf9eab39eafa4b99dc3c7e09bf1e Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 24 May 2021 12:09:33 +0800 Subject: [PATCH] This is an automated cherry-pick of #1086 Signed-off-by: ti-chi-bot --- pkg/lightning/backend/local/local.go | 384 ++++++++++++++++++ .../backend/local/localhelper_test.go | 83 +++- 2 files changed, 460 insertions(+), 7 deletions(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index afef7b043..2fb839c3c 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -110,6 +110,8 @@ var ( localMaxTiKVVersion = version.NextMajorVersion() localMaxPDVersion = version.NextMajorVersion() tiFlashMinVersion = *semver.New("4.0.5") + + errorEngineClosed = errors.New("engine is closed") ) var ( @@ -145,6 +147,7 @@ const ( type File struct { localFileMeta + closed atomic.Bool db *pebble.DB UUID uuid.UUID localWriters sync.Map @@ -257,6 +260,318 @@ func (e *File) unlock() { e.mutex.Unlock() } +<<<<<<< HEAD +======= +type intHeap struct { + arr []int32 +} + +func (h *intHeap) Len() int { + return len(h.arr) +} + +func (h *intHeap) Less(i, j int) bool { + return h.arr[i] < h.arr[j] +} + +func (h *intHeap) Swap(i, j int) { + h.arr[i], h.arr[j] = h.arr[j], h.arr[i] +} + +func (h *intHeap) Push(x interface{}) { + h.arr = append(h.arr, x.(int32)) +} + +func (h *intHeap) Pop() interface{} { + item := h.arr[len(h.arr)-1] + h.arr = h.arr[:len(h.arr)-1] + return item +} + +func (e *File) ingestSSTLoop() { + defer e.wg.Done() + + type flushSeq struct { + seq int32 + ch chan struct{} + } + + seq := atomic.NewInt32(0) + finishedSeq := atomic.NewInt32(0) + var seqLock sync.Mutex + // a flush is finished iff all the compaction&ingest tasks with a lower seq number are finished. + flushQueue := make([]flushSeq, 0) + // inSyncSeqs is a heap that stores all the finished compaction tasks whose seq is bigger than `finishedSeq + 1` + // this mean there are still at lease one compaction task with a lower seq unfinished. + inSyncSeqs := &intHeap{arr: make([]int32, 0)} + + type metaAndSeq struct { + metas []*sstMeta + seq int32 + } + + concurrency := e.config.CompactConcurrency + // when compaction is disabled, ingest is an serial action, so 1 routine is enough + if !e.config.Compact { + concurrency = 1 + } + metaChan := make(chan metaAndSeq, concurrency) + for i := 0; i < concurrency; i++ { + e.wg.Add(1) + go func() { + defer e.wg.Done() + defer func() { + if e.ingestErr.Get() != nil { + seqLock.Lock() + for _, f := range flushQueue { + f.ch <- struct{}{} + } + flushQueue = flushQueue[:0] + seqLock.Unlock() + } + }() + for { + select { + case <-e.ctx.Done(): + return + case metas, ok := <-metaChan: + if !ok { + return + } + ingestMetas := metas.metas + if e.config.Compact { + newMeta, err := e.sstIngester.mergeSSTs(metas.metas, e.sstDir) + if err != nil { + e.setError(err) + return + } + ingestMetas = []*sstMeta{newMeta} + } + + if err := e.batchIngestSSTs(ingestMetas); err != nil { + e.setError(err) + return + } + seqLock.Lock() + finSeq := finishedSeq.Load() + if metas.seq == finSeq+1 { + finSeq = metas.seq + for len(inSyncSeqs.arr) > 0 { + if inSyncSeqs.arr[0] == finSeq+1 { + finSeq++ + heap.Remove(inSyncSeqs, 0) + } else { + break + } + } + + var flushChans []chan struct{} + for _, seq := range flushQueue { + if seq.seq <= finSeq { + flushChans = append(flushChans, seq.ch) + } else { + break + } + } + flushQueue = flushQueue[len(flushChans):] + finishedSeq.Store(finSeq) + seqLock.Unlock() + for _, c := range flushChans { + c <- struct{}{} + } + } else { + heap.Push(inSyncSeqs, metas.seq) + seqLock.Unlock() + } + } + } + }() + } + + compactAndIngestSSTs := func(metas []*sstMeta) { + if len(metas) > 0 { + seqLock.Lock() + metaSeq := seq.Add(1) + seqLock.Unlock() + select { + case <-e.ctx.Done(): + case metaChan <- metaAndSeq{metas: metas, seq: metaSeq}: + } + } + } + + pendingMetas := make([]*sstMeta, 0, 16) + totalSize := int64(0) + metasTmp := make([]*sstMeta, 0) + addMetas := func() { + if len(metasTmp) == 0 { + return + } + metas := metasTmp + metasTmp = make([]*sstMeta, 0, len(metas)) + if !e.config.Compact { + compactAndIngestSSTs(metas) + return + } + for _, m := range metas { + if m.totalCount > 0 { + pendingMetas = append(pendingMetas, m) + totalSize += m.totalSize + if totalSize >= e.config.CompactThreshold { + compactMetas := pendingMetas + pendingMetas = make([]*sstMeta, 0, len(pendingMetas)) + totalSize = 0 + compactAndIngestSSTs(compactMetas) + } + } + } + } +readMetaLoop: + for { + closed := false + select { + case <-e.ctx.Done(): + close(metaChan) + return + case m, ok := <-e.sstMetasChan: + if !ok { + closed = true + break + } + if m.flushCh != nil { + // meet a flush event, we should trigger a ingest task if there are pending metas, + // and then waiting for all the running flush tasks to be done. + if len(metasTmp) > 0 { + addMetas() + } + if len(pendingMetas) > 0 { + seqLock.Lock() + metaSeq := seq.Add(1) + flushQueue = append(flushQueue, flushSeq{ch: m.flushCh, seq: metaSeq}) + seqLock.Unlock() + select { + case metaChan <- metaAndSeq{metas: pendingMetas, seq: metaSeq}: + case <-e.ctx.Done(): + close(metaChan) + return + } + + pendingMetas = make([]*sstMeta, 0, len(pendingMetas)) + totalSize = 0 + } else { + // none remaining metas needed to be ingested + seqLock.Lock() + curSeq := seq.Load() + finSeq := finishedSeq.Load() + // if all pending SST files are written, directly do a db.Flush + if curSeq == finSeq { + seqLock.Unlock() + m.flushCh <- struct{}{} + } else { + // waiting for pending compaction tasks + flushQueue = append(flushQueue, flushSeq{ch: m.flushCh, seq: curSeq}) + seqLock.Unlock() + } + } + continue readMetaLoop + } + metasTmp = append(metasTmp, m.meta) + // try to drain all the sst meta from the chan to make sure all the SSTs are processed before handle a flush msg. + if len(e.sstMetasChan) > 0 { + continue readMetaLoop + } + + addMetas() + } + if closed { + compactAndIngestSSTs(pendingMetas) + close(metaChan) + return + } + } +} + +func (e *File) addSST(ctx context.Context, m *sstMeta) error { + // set pending size after SST file is generated + e.pendingFileSize.Add(m.fileSize) + select { + case e.sstMetasChan <- metaOrFlush{meta: m}: + case <-ctx.Done(): + return ctx.Err() + case <-e.ctx.Done(): + } + return e.ingestErr.Get() +} + +func (e *File) batchIngestSSTs(metas []*sstMeta) error { + if len(metas) == 0 { + return nil + } + sort.Slice(metas, func(i, j int) bool { + return bytes.Compare(metas[i].minKey, metas[j].minKey) < 0 + }) + + metaLevels := make([][]*sstMeta, 0) + for _, meta := range metas { + inserted := false + for i, l := range metaLevels { + if bytes.Compare(l[len(l)-1].maxKey, meta.minKey) >= 0 { + continue + } + metaLevels[i] = append(l, meta) + inserted = true + break + } + if !inserted { + metaLevels = append(metaLevels, []*sstMeta{meta}) + } + } + + for _, l := range metaLevels { + if err := e.ingestSSTs(l); err != nil { + return err + } + } + return nil +} + +func (e *File) ingestSSTs(metas []*sstMeta) error { + // use raw RLock to avoid change the lock state during flushing. + e.mutex.RLock() + defer e.mutex.RUnlock() + if e.closed.Load() { + return errorEngineClosed + } + totalSize := int64(0) + totalCount := int64(0) + fileSize := int64(0) + for _, m := range metas { + totalSize += m.totalSize + totalCount += m.totalCount + fileSize += m.fileSize + } + log.L().Info("write data to local DB", + zap.Int64("size", totalSize), + zap.Int64("kvs", totalCount), + zap.Int64("sstFileSize", fileSize), + zap.String("file", metas[0].path), + logutil.Key("firstKey", metas[0].minKey), + logutil.Key("lastKey", metas[len(metas)-1].maxKey)) + if err := e.sstIngester.ingest(metas); err != nil { + return errors.Trace(err) + } + count := int64(0) + size := int64(0) + for _, m := range metas { + count += m.totalCount + size += m.totalSize + } + e.Length.Add(count) + e.TotalSize.Add(size) + return nil +} + +>>>>>>> d0673106 (test: fix lighting unit test TestBatchSplitRegionByRanges and a panic issue (#1086)) func (e *File) flushLocalWriters(parentCtx context.Context) error { eg, ctx := errgroup.WithContext(parentCtx) e.localWriters.Range(func(k, v interface{}) bool { @@ -493,6 +808,27 @@ func (local *local) lockEngine(engineID uuid.UUID, state importMutexState) *File return nil } +<<<<<<< HEAD +======= +// tryRLockAllEngines tries to read lock all engines, return all `File`s that are successfully locked. +func (local *local) tryRLockAllEngines() []*File { + var allEngines []*File + local.engines.Range(func(k, v interface{}) bool { + engine := v.(*File) + // skip closed engine + if engine.tryRLock() { + if !engine.closed.Load() { + allEngines = append(allEngines, engine) + } else { + engine.rUnlock() + } + } + return true + }) + return allEngines +} + +>>>>>>> d0673106 (test: fix lighting unit test TestBatchSplitRegionByRanges and a panic issue (#1086)) // lockAllEnginesUnless tries to lock all engines, unless those which are already locked in the // state given by ignoreStateMask. Returns the list of locked engines. func (local *local) lockAllEnginesUnless(newState, ignoreStateMask importMutexState) []*File { @@ -582,7 +918,14 @@ func (local *local) FlushEngine(ctx context.Context, engineID uuid.UUID) error { if engineFile == nil { return errors.Errorf("engine '%s' not found", engineID) } +<<<<<<< HEAD defer engineFile.unlock() +======= + defer engineFile.rUnlock() + if engineFile.closed.Load() { + return nil + } +>>>>>>> d0673106 (test: fix lighting unit test TestBatchSplitRegionByRanges and a panic issue (#1086)) return engineFile.flushEngineWithoutLock(ctx) } @@ -676,9 +1019,32 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error return nil } engineFile := engine.(*File) +<<<<<<< HEAD engineFile.lock(importMutexStateFlush) defer engineFile.unlock() return engineFile.flushEngineWithoutLock(ctx) +======= + engineFile.rLock() + if engineFile.closed.Load() { + engineFile.rUnlock() + return nil + } + + err := engineFile.flushEngineWithoutLock(ctx) + engineFile.rUnlock() + + // use mutex to make sure we won't close sstMetasChan while other routines + // trying to do flush. + engineFile.lock(importMutexStateClose) + engineFile.closed.Store(true) + close(engineFile.sstMetasChan) + engineFile.unlock() + if err != nil { + return errors.Trace(err) + } + engineFile.wg.Wait() + return engineFile.ingestErr.Get() +>>>>>>> d0673106 (test: fix lighting unit test TestBatchSplitRegionByRanges and a panic issue (#1086)) } func (local *local) getImportClient(ctx context.Context, peer *metapb.Peer) (sst.ImportSSTClient, error) { @@ -1875,8 +2241,26 @@ func (w *Writer) AppendRowsAsync(ctx context.Context, tableName string, columnNa if len(kvs) == 0 { return nil } +<<<<<<< HEAD if err := w.writeErr.Get(); err != nil { return err +======= + + if w.local.closed.Load() { + return errorEngineClosed + } + + w.Lock() + defer w.Unlock() + + // if chunk has _tidb_rowid field, we can't ensure that the rows are sorted. + if w.isWriteBatchSorted && w.writer == nil { + for _, c := range columnNames { + if c == model.ExtraHandleName.L { + w.isWriteBatchSorted = false + } + } +>>>>>>> d0673106 (test: fix lighting unit test TestBatchSplitRegionByRanges and a panic issue (#1086)) } w.kvsChan <- kvs w.local.TS = ts diff --git a/pkg/lightning/backend/local/localhelper_test.go b/pkg/lightning/backend/local/localhelper_test.go index d4bdc0364..f73a3bbab 100644 --- a/pkg/lightning/backend/local/localhelper_test.go +++ b/pkg/lightning/backend/local/localhelper_test.go @@ -20,7 +20,6 @@ import ( "math/rand" "sort" "sync" - "sync/atomic" "time" . "github.com/pingcap/check" @@ -34,6 +33,7 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/placement" + "go.uber.org/atomic" "github.com/pingcap/br/pkg/restore" ) @@ -44,7 +44,7 @@ type testClient struct { regions map[uint64]*restore.RegionInfo regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions nextRegionID uint64 - splitCount int + splitCount atomic.Int32 hook clientHook } @@ -148,6 +148,13 @@ func (c *testClient) SplitRegion( func (c *testClient) BatchSplitRegionsWithOrigin( ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte, ) (*restore.RegionInfo, []*restore.RegionInfo, error) { +<<<<<<< HEAD +======= + c.mu.Lock() + defer c.mu.Unlock() + c.splitCount.Inc() + +>>>>>>> d0673106 (test: fix lighting unit test TestBatchSplitRegionByRanges and a panic issue (#1086)) if c.hook != nil { regionInfo, keys = c.hook.BeforeSplitRegion(ctx, regionInfo, keys) } @@ -366,7 +373,33 @@ func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, h defer func() { maxBatchSplitKeys = oldLimit splitRegionBaseBackOffTime = oldSplitBackoffTime +<<<<<<< HEAD }() +======= + } +} + +func (d defaultHook) check(c *C, cli *testClient) { + // so with a batch split size of 4, there will be 7 time batch split + // 1. region: [aay, bba), keys: [b, ba, bb] + // 2. region: [bbh, cca), keys: [bc, bd, be, bf] + // 3. region: [bf, cca), keys: [bg, bh, bi, bj] + // 4. region: [bj, cca), keys: [bk, bl, bm, bn] + // 5. region: [bn, cca), keys: [bo, bp, bq, br] + // 6. region: [br, cca), keys: [bs, bt, bu, bv] + // 7. region: [bv, cca), keys: [bw, bx, by, bz] + + // since it may encounter error retries, here only check the lower threshold. + c.Assert(cli.splitCount.Load() >= 7, IsTrue) +} + +func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, hook clientHook, errPat string, splitHook batchSplitHook) { + if splitHook == nil { + splitHook = defaultHook{} + } + deferFunc := splitHook.setup(c) + defer deferFunc() +>>>>>>> d0673106 (test: fix lighting unit test TestBatchSplitRegionByRanges and a panic issue (#1086)) keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} client := initTestClient(keys, hook) @@ -425,7 +458,44 @@ func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, h } func (s *localSuite) TestBatchSplitRegionByRanges(c *C) { +<<<<<<< HEAD s.doTestBatchSplitRegionByRanges(context.Background(), c, nil, "") +======= + s.doTestBatchSplitRegionByRanges(context.Background(), c, nil, "", nil) +} + +type batchSizeHook struct{} + +func (h batchSizeHook) setup(c *C) func() { + oldSizeLimit := maxBatchSplitSize + oldSplitBackoffTime := splitRegionBaseBackOffTime + maxBatchSplitSize = 6 + splitRegionBaseBackOffTime = time.Millisecond + return func() { + maxBatchSplitSize = oldSizeLimit + splitRegionBaseBackOffTime = oldSplitBackoffTime + } +} + +func (h batchSizeHook) check(c *C, cli *testClient) { + // so with a batch split key size of 6, there will be 9 time batch split + // 1. region: [aay, bba), keys: [b, ba, bb] + // 2. region: [bbh, cca), keys: [bc, bd, be] + // 3. region: [bf, cca), keys: [bf, bg, bh] + // 4. region: [bj, cca), keys: [bi, bj, bk] + // 5. region: [bj, cca), keys: [bl, bm, bn] + // 6. region: [bn, cca), keys: [bo, bp, bq] + // 7. region: [bn, cca), keys: [br, bs, bt] + // 9. region: [br, cca), keys: [bu, bv, bw] + // 10. region: [bv, cca), keys: [bx, by, bz] + + // since it may encounter error retries, here only check the lower threshold. + c.Assert(cli.splitCount.Load(), Equals, int32(9)) +} + +func (s *localSuite) TestBatchSplitRegionByRangesKeySizeLimit(c *C) { + s.doTestBatchSplitRegionByRanges(context.Background(), c, nil, "", batchSizeHook{}) +>>>>>>> d0673106 (test: fix lighting unit test TestBatchSplitRegionByRanges and a panic issue (#1086)) } type scanRegionEmptyHook struct { @@ -465,13 +535,12 @@ func (s *localSuite) TestBatchSplitByRangesEpochNotMatch(c *C) { // return epoch not match error in every other call type splitRegionEpochNotMatchHookRandom struct { noopHook - cnt int32 + cnt atomic.Int32 } func (h *splitRegionEpochNotMatchHookRandom) BeforeSplitRegion(ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte) (*restore.RegionInfo, [][]byte) { regionInfo, keys = h.noopHook.BeforeSplitRegion(ctx, regionInfo, keys) - cnt := atomic.AddInt32(&h.cnt, 1) - if cnt%2 != 0 { + if h.cnt.Inc() != 0 { return regionInfo, keys } regionInfo = cloneRegion(regionInfo) @@ -487,12 +556,12 @@ func (s *localSuite) TestBatchSplitByRangesEpochNotMatchOnce(c *C) { type splitRegionNoValidKeyHook struct { noopHook returnErrTimes int32 - errorCnt int32 + errorCnt atomic.Int32 } func (h splitRegionNoValidKeyHook) BeforeSplitRegion(ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte) (*restore.RegionInfo, [][]byte) { regionInfo, keys = h.noopHook.BeforeSplitRegion(ctx, regionInfo, keys) - if atomic.AddInt32(&h.errorCnt, 1) <= h.returnErrTimes { + if h.errorCnt.Inc() <= h.returnErrTimes { // clean keys to trigger "no valid keys" error keys = keys[:0] }