From dcb1122d1cfe7d3fe67cc6e2701c1916683abe41 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 21 Oct 2020 14:25:48 +0800 Subject: [PATCH 1/3] progress: removed the progress struct --- pkg/gluetikv/glue.go | 31 +---------------------- pkg/utils/progress.go | 52 +++++++++++++++++++++++++------------- pkg/utils/progress_test.go | 27 +++++++++++--------- 3 files changed, 51 insertions(+), 59 deletions(-) diff --git a/pkg/gluetikv/glue.go b/pkg/gluetikv/glue.go index 79f2e6393..cde4b406e 100644 --- a/pkg/gluetikv/glue.go +++ b/pkg/gluetikv/glue.go @@ -4,13 +4,11 @@ package gluetikv import ( "context" - "sync/atomic" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv" - "github.com/prometheus/common/log" pd "github.com/tikv/pd/client" "github.com/pingcap/br/pkg/glue" @@ -50,37 +48,10 @@ func (Glue) OwnsStorage() bool { // StartProgress implements glue.Glue. func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress { - return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog), closed: 0} + return utils.StartProgress(ctx, cmdName, total, redirectLog) } // Record implements glue.Glue. func (Glue) Record(name string, val uint64) { summary.CollectUint(name, val) } - -type progress struct { - ch chan<- struct{} - closed int32 -} - -// Inc implements glue.Progress. -func (p progress) Inc() { - if atomic.LoadInt32(&p.closed) != 0 { - log.Warn("proposing a closed progress") - return - } - // there might be buggy if the thread is yielded here. - // however, there should not be gosched, at most time. - // so send here probably is safe, even not totally safe. - // since adding an extra lock should be costly, we just be optimistic. - // (Maybe a spin lock here would be better?) - p.ch <- struct{}{} -} - -// Close implements glue.Progress. -func (p progress) Close() { - // set closed to true firstly, - // so we won't see a state that the channel is closed and the p.closed is false. - atomic.StoreInt32(&p.closed, 1) - close(p.ch) -} diff --git a/pkg/utils/progress.go b/pkg/utils/progress.go index 75121bbc4..801dbd383 100644 --- a/pkg/utils/progress.go +++ b/pkg/utils/progress.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "io" + "sync/atomic" "time" "github.com/cheggaaa/pb/v3" @@ -13,13 +14,24 @@ import ( "go.uber.org/zap" ) +type atomicAdder int64 + +func (a *atomicAdder) add(n int64) int64 { + return atomic.AddInt64((*int64)(a), n) +} + +func (a *atomicAdder) get() int64 { + return atomic.LoadInt64((*int64)(a)) +} + // ProgressPrinter prints a progress bar. type ProgressPrinter struct { name string total int64 redirectLog bool + progress atomicAdder - updateCh chan struct{} + cancel context.CancelFunc } // NewProgressPrinter returns a new progress printer. @@ -32,13 +44,17 @@ func NewProgressPrinter( name: name, total: total, redirectLog: redirectLog, - updateCh: make(chan struct{}, total/2), } } -// UpdateCh returns an update channel. -func (pp *ProgressPrinter) UpdateCh() chan<- struct{} { - return pp.updateCh +// Inc increases the current progress bar. +func (pp *ProgressPrinter) Inc() { + pp.progress.add(1) +} + +// Close closes the current progress bar. +func (pp *ProgressPrinter) Close() { + pp.cancel() } // goPrintProgress starts a gorouinte and prints progress. @@ -46,6 +62,8 @@ func (pp *ProgressPrinter) goPrintProgress( ctx context.Context, testWriter io.Writer, // Only for tests ) { + cctx, cancel := context.WithCancel(ctx) + pp.cancel = cancel bar := pb.New64(pp.total) if pp.redirectLog || testWriter != nil { tmpl := `{"P":"{{percent .}}","C":"{{counters . }}","E":"{{etime .}}","R":"{{rtime .}}","S":"{{speed .}}"}` @@ -64,7 +82,7 @@ func (pp *ProgressPrinter) goPrintProgress( } if testWriter != nil { bar.SetWriter(testWriter) - bar.SetRefreshRate(10 * time.Millisecond) + bar.SetRefreshRate(2 * time.Second) } bar.Start() @@ -73,22 +91,22 @@ func (pp *ProgressPrinter) goPrintProgress( defer t.Stop() defer bar.Finish() - var counter int64 for { select { - case <-ctx.Done(): - return - case _, ok := <-pp.updateCh: - if !ok { - bar.SetCurrent(pp.total) + case <-cctx.Done(): + // a hacky way to adapt the old behavior: + // when canceled by the outer context, leave the progress unchanged. + // when canceled by Close method (the 'internal' way), push the progress to 100%. + if ctx.Err() != nil { return } - counter++ + bar.SetCurrent(pp.total) + return case <-t.C: } - if counter <= pp.total { - bar.SetCurrent(counter) + if pp.progress.get() <= pp.total { + bar.SetCurrent(pp.progress.get()) } else { bar.SetCurrent(pp.total) } @@ -127,8 +145,8 @@ func StartProgress( name string, total int64, redirectLog bool, -) chan<- struct{} { +) *ProgressPrinter { progress := NewProgressPrinter(name, total, redirectLog) progress.goPrintProgress(ctx, nil) - return progress.UpdateCh() + return progress } diff --git a/pkg/utils/progress_test.go b/pkg/utils/progress_test.go index 1662ee0b9..9de874cbf 100644 --- a/pkg/utils/progress_test.go +++ b/pkg/utils/progress_test.go @@ -4,6 +4,7 @@ package utils import ( "context" + "time" . "github.com/pingcap/check" ) @@ -30,14 +31,16 @@ func (r *testProgressSuite) TestProgress(c *C) { progress2.goPrintProgress(ctx, &testWriter{ fn: func(p string) { pCh2 <- p }, }) - updateCh2 := progress2.UpdateCh() - updateCh2 <- struct{}{} + progress2.Inc() + time.Sleep(2 * time.Second) p = <-pCh2 c.Assert(p, Matches, `.*"P":"50\.00%".*`) - updateCh2 <- struct{}{} + progress2.Inc() + time.Sleep(2 * time.Second) p = <-pCh2 c.Assert(p, Matches, `.*"P":"100\.00%".*`) - updateCh2 <- struct{}{} + progress2.Inc() + time.Sleep(2 * time.Second) p = <-pCh2 c.Assert(p, Matches, `.*"P":"100\.00%".*`) @@ -46,12 +49,13 @@ func (r *testProgressSuite) TestProgress(c *C) { progress4.goPrintProgress(ctx, &testWriter{ fn: func(p string) { pCh4 <- p }, }) - updateCh4 := progress4.UpdateCh() - updateCh4 <- struct{}{} + progress4.Inc() + time.Sleep(2 * time.Second) p = <-pCh4 c.Assert(p, Matches, `.*"P":"25\.00%".*`) - updateCh4 <- struct{}{} - close(updateCh4) + progress4.Inc() + progress4.Close() + time.Sleep(2 * time.Second) p = <-pCh4 c.Assert(p, Matches, `.*"P":"100\.00%".*`) @@ -60,10 +64,9 @@ func (r *testProgressSuite) TestProgress(c *C) { progress8.goPrintProgress(ctx, &testWriter{ fn: func(p string) { pCh8 <- p }, }) - updateCh8 := progress8.UpdateCh() - updateCh8 <- struct{}{} - updateCh8 <- struct{}{} - <-pCh8 + progress8.Inc() + progress8.Inc() + time.Sleep(2 * time.Second) p = <-pCh8 c.Assert(p, Matches, `.*"P":"25\.00%".*`) From 3e17b400bfc31d43679e93a92602126b78fa363d Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 21 Oct 2020 15:11:00 +0800 Subject: [PATCH 2/3] progress: added a guard for cancel --- pkg/utils/progress.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/utils/progress.go b/pkg/utils/progress.go index 801dbd383..623107262 100644 --- a/pkg/utils/progress.go +++ b/pkg/utils/progress.go @@ -44,6 +44,9 @@ func NewProgressPrinter( name: name, total: total, redirectLog: redirectLog, + cancel: func() { + log.Warn("canceling non-started progress printer") + }, } } From 0c2f571c7ff3a1df74c47c618a815efba6f33799 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 21 Oct 2020 17:06:03 +0800 Subject: [PATCH 3/3] progress: remove atomic adder --- pkg/utils/progress.go | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/pkg/utils/progress.go b/pkg/utils/progress.go index 623107262..d3903040e 100644 --- a/pkg/utils/progress.go +++ b/pkg/utils/progress.go @@ -14,22 +14,12 @@ import ( "go.uber.org/zap" ) -type atomicAdder int64 - -func (a *atomicAdder) add(n int64) int64 { - return atomic.AddInt64((*int64)(a), n) -} - -func (a *atomicAdder) get() int64 { - return atomic.LoadInt64((*int64)(a)) -} - // ProgressPrinter prints a progress bar. type ProgressPrinter struct { name string total int64 redirectLog bool - progress atomicAdder + progress int64 cancel context.CancelFunc } @@ -52,7 +42,7 @@ func NewProgressPrinter( // Inc increases the current progress bar. func (pp *ProgressPrinter) Inc() { - pp.progress.add(1) + atomic.AddInt64(&pp.progress, 1) } // Close closes the current progress bar. @@ -108,8 +98,9 @@ func (pp *ProgressPrinter) goPrintProgress( case <-t.C: } - if pp.progress.get() <= pp.total { - bar.SetCurrent(pp.progress.get()) + currentProgress := atomic.LoadInt64(&pp.progress) + if currentProgress <= pp.total { + bar.SetCurrent(currentProgress) } else { bar.SetCurrent(pp.total) }