Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions tools/replay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var fHost = flag.String("host", "127.0.0.1:6379", "Redis host")
var fClientBuffer = flag.Int("buffer", 100, "How many records to buffer per client")
var fPace = flag.Bool("pace", true, "whether to pace the traffic according to the original timings.false - to pace as fast as possible")
var fSkip = flag.Uint("skip", 0, "skip N records")
var fSkipTimeSec = flag.Int("skip-time-sec", 0, "skip records in the first N seconds of the recording")
var fIgnoreParseErrors = flag.Bool("ignore-parse-errors", false, "ignore parsing errors")

func RenderTable(area *pterm.AreaPrinter, files []string, workers []FileWorker) {
Expand Down Expand Up @@ -77,14 +78,23 @@ func RenderPipelineRangesTable(area *pterm.AreaPrinter, files []string, workers
}

func Run(files []string) {
timeOffset := time.Now().Add(500 * time.Millisecond).Sub(DetermineBaseTime(files))
baseTime := DetermineBaseTime(files)

var skipUntil uint64
effectiveBaseTime := baseTime
if *fSkipTimeSec > 0 {
skipDuration := time.Duration(*fSkipTimeSec) * time.Second
skipUntil = uint64(baseTime.Add(skipDuration).UnixNano())
effectiveBaseTime = baseTime.Add(skipDuration)
}
timeOffset := time.Now().Add(500 * time.Millisecond).Sub(effectiveBaseTime)
fmt.Println("Offset -> ", timeOffset)

// Start a worker for every file. They take care of spawning client workers.
var wg sync.WaitGroup
workers := make([]FileWorker, len(files))
for i := range workers {
workers[i] = FileWorker{timeOffset: timeOffset}
workers[i] = FileWorker{timeOffset: timeOffset, skipUntil: skipUntil}
wg.Add(1)
go workers[i].Run(files[i], &wg)
}
Expand Down Expand Up @@ -224,6 +234,7 @@ func main() {

fmt.Fprintln(os.Stderr, "\nExamples:")
fmt.Fprintf(os.Stderr, " %s -host 192.168.1.10:6379 -buffer 50 run *.bin\n", binaryName)
fmt.Fprintf(os.Stderr, " %s -skip-time-sec 30 run *.bin\n", binaryName)
fmt.Fprintf(os.Stderr, " %s print *.bin\n", binaryName)
}

Expand Down
6 changes: 6 additions & 0 deletions tools/replay/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var pipelineRanges = []struct {
type FileWorker struct {
clientGroup sync.WaitGroup
timeOffset time.Duration
skipUntil uint64
// stats for output, updated by clients, read by rendering goroutine
processed uint64
delayed uint64
Expand Down Expand Up @@ -170,6 +171,11 @@ func (w *FileWorker) Run(file string, wg *sync.WaitGroup) {
if cmdName != "eval" && recordId < uint64(*fSkip) {
return true
}

if w.skipUntil > 0 && r.Time < w.skipUntil {
return true
}

atomic.AddUint64(&w.parsed, 1)
client.incoming <- r
return true
Expand Down
Loading