Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 1 addition & 30 deletions pkg/gluetikv/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ package gluetikv

import (
"context"
"sync/atomic"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/prometheus/common/log"
pd "github.com/tikv/pd/client"

"github.com/pingcap/br/pkg/glue"
Expand Down Expand Up @@ -50,37 +48,10 @@ func (Glue) OwnsStorage() bool {

// StartProgress implements glue.Glue.
func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog), closed: 0}
return utils.StartProgress(ctx, cmdName, total, redirectLog)
}

// Record implements glue.Glue.
func (Glue) Record(name string, val uint64) {
summary.CollectUint(name, val)
}

type progress struct {
ch chan<- struct{}
closed int32
}

// Inc implements glue.Progress.
func (p progress) Inc() {
if atomic.LoadInt32(&p.closed) != 0 {
log.Warn("proposing a closed progress")
return
}
// there might be buggy if the thread is yielded here.
// however, there should not be gosched, at most time.
// so send here probably is safe, even not totally safe.
// since adding an extra lock should be costly, we just be optimistic.
// (Maybe a spin lock here would be better?)
p.ch <- struct{}{}
}

// Close implements glue.Progress.
func (p progress) Close() {
// set closed to true firstly,
// so we won't see a state that the channel is closed and the p.closed is false.
atomic.StoreInt32(&p.closed, 1)
close(p.ch)
}
46 changes: 29 additions & 17 deletions pkg/utils/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"io"
"sync/atomic"
"time"

"github.com/cheggaaa/pb/v3"
Expand All @@ -18,8 +19,9 @@ type ProgressPrinter struct {
name string
total int64
redirectLog bool
progress int64

updateCh chan struct{}
cancel context.CancelFunc
}

// NewProgressPrinter returns a new progress printer.
Expand All @@ -32,20 +34,29 @@ func NewProgressPrinter(
name: name,
total: total,
redirectLog: redirectLog,
updateCh: make(chan struct{}, total/2),
cancel: func() {
log.Warn("canceling non-started progress printer")
},
}
}

// UpdateCh returns an update channel.
func (pp *ProgressPrinter) UpdateCh() chan<- struct{} {
return pp.updateCh
// Inc increases the current progress bar.
func (pp *ProgressPrinter) Inc() {
atomic.AddInt64(&pp.progress, 1)
}

// Close closes the current progress bar.
func (pp *ProgressPrinter) Close() {
pp.cancel()
}

// goPrintProgress starts a gorouinte and prints progress.
func (pp *ProgressPrinter) goPrintProgress(
ctx context.Context,
testWriter io.Writer, // Only for tests
) {
cctx, cancel := context.WithCancel(ctx)
pp.cancel = cancel
bar := pb.New64(pp.total)
if pp.redirectLog || testWriter != nil {
tmpl := `{"P":"{{percent .}}","C":"{{counters . }}","E":"{{etime .}}","R":"{{rtime .}}","S":"{{speed .}}"}`
Expand All @@ -64,7 +75,7 @@ func (pp *ProgressPrinter) goPrintProgress(
}
if testWriter != nil {
bar.SetWriter(testWriter)
bar.SetRefreshRate(10 * time.Millisecond)
bar.SetRefreshRate(2 * time.Second)
Comment thread
kennytm marked this conversation as resolved.
}
bar.Start()

Expand All @@ -73,22 +84,23 @@ func (pp *ProgressPrinter) goPrintProgress(
defer t.Stop()
defer bar.Finish()

var counter int64
for {
select {
case <-ctx.Done():
return
case _, ok := <-pp.updateCh:
if !ok {
bar.SetCurrent(pp.total)
case <-cctx.Done():
// a hacky way to adapt the old behavior:
// when canceled by the outer context, leave the progress unchanged.
// when canceled by Close method (the 'internal' way), push the progress to 100%.
if ctx.Err() != nil {
return
}
counter++
bar.SetCurrent(pp.total)
return
case <-t.C:
}

if counter <= pp.total {
bar.SetCurrent(counter)
currentProgress := atomic.LoadInt64(&pp.progress)
if currentProgress <= pp.total {
bar.SetCurrent(currentProgress)
} else {
bar.SetCurrent(pp.total)
}
Expand Down Expand Up @@ -127,8 +139,8 @@ func StartProgress(
name string,
total int64,
redirectLog bool,
) chan<- struct{} {
) *ProgressPrinter {
progress := NewProgressPrinter(name, total, redirectLog)
progress.goPrintProgress(ctx, nil)
return progress.UpdateCh()
return progress
}
27 changes: 15 additions & 12 deletions pkg/utils/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package utils

import (
"context"
"time"

. "github.com/pingcap/check"
)
Expand All @@ -30,14 +31,16 @@ func (r *testProgressSuite) TestProgress(c *C) {
progress2.goPrintProgress(ctx, &testWriter{
fn: func(p string) { pCh2 <- p },
})
updateCh2 := progress2.UpdateCh()
updateCh2 <- struct{}{}
progress2.Inc()
time.Sleep(2 * time.Second)
p = <-pCh2
c.Assert(p, Matches, `.*"P":"50\.00%".*`)
updateCh2 <- struct{}{}
progress2.Inc()
time.Sleep(2 * time.Second)
p = <-pCh2
c.Assert(p, Matches, `.*"P":"100\.00%".*`)
updateCh2 <- struct{}{}
progress2.Inc()
time.Sleep(2 * time.Second)
p = <-pCh2
c.Assert(p, Matches, `.*"P":"100\.00%".*`)

Expand All @@ -46,12 +49,13 @@ func (r *testProgressSuite) TestProgress(c *C) {
progress4.goPrintProgress(ctx, &testWriter{
fn: func(p string) { pCh4 <- p },
})
updateCh4 := progress4.UpdateCh()
updateCh4 <- struct{}{}
progress4.Inc()
time.Sleep(2 * time.Second)
p = <-pCh4
c.Assert(p, Matches, `.*"P":"25\.00%".*`)
updateCh4 <- struct{}{}
close(updateCh4)
progress4.Inc()
progress4.Close()
time.Sleep(2 * time.Second)
p = <-pCh4
c.Assert(p, Matches, `.*"P":"100\.00%".*`)

Expand All @@ -60,10 +64,9 @@ func (r *testProgressSuite) TestProgress(c *C) {
progress8.goPrintProgress(ctx, &testWriter{
fn: func(p string) { pCh8 <- p },
})
updateCh8 := progress8.UpdateCh()
updateCh8 <- struct{}{}
updateCh8 <- struct{}{}
<-pCh8
progress8.Inc()
progress8.Inc()
time.Sleep(2 * time.Second)
p = <-pCh8
c.Assert(p, Matches, `.*"P":"25\.00%".*`)

Expand Down