diff --git a/tools/replay/main.go b/tools/replay/main.go index 7b26eee33ca1..a2d11b1c3eea 100644 --- a/tools/replay/main.go +++ b/tools/replay/main.go @@ -17,6 +17,7 @@ var fHost = flag.String("host", "127.0.0.1:6379", "Redis host") var fClientBuffer = flag.Int("buffer", 100, "How many records to buffer per client") var fPace = flag.Bool("pace", true, "whether to pace the traffic according to the original timings.false - to pace as fast as possible") var fSkip = flag.Uint("skip", 0, "skip N records") +var fSkipTimeSec = flag.Int("skip-time-sec", 0, "skip records in the first N seconds of the recording") var fIgnoreParseErrors = flag.Bool("ignore-parse-errors", false, "ignore parsing errors") func RenderTable(area *pterm.AreaPrinter, files []string, workers []FileWorker) { @@ -77,14 +78,23 @@ func RenderPipelineRangesTable(area *pterm.AreaPrinter, files []string, workers } func Run(files []string) { - timeOffset := time.Now().Add(500 * time.Millisecond).Sub(DetermineBaseTime(files)) + baseTime := DetermineBaseTime(files) + + var skipUntil uint64 + effectiveBaseTime := baseTime + if *fSkipTimeSec > 0 { + skipDuration := time.Duration(*fSkipTimeSec) * time.Second + skipUntil = uint64(baseTime.Add(skipDuration).UnixNano()) + effectiveBaseTime = baseTime.Add(skipDuration) + } + timeOffset := time.Now().Add(500 * time.Millisecond).Sub(effectiveBaseTime) fmt.Println("Offset -> ", timeOffset) // Start a worker for every file. They take care of spawning client workers. var wg sync.WaitGroup workers := make([]FileWorker, len(files)) for i := range workers { - workers[i] = FileWorker{timeOffset: timeOffset} + workers[i] = FileWorker{timeOffset: timeOffset, skipUntil: skipUntil} wg.Add(1) go workers[i].Run(files[i], &wg) } @@ -224,6 +234,7 @@ func main() { fmt.Fprintln(os.Stderr, "\nExamples:") fmt.Fprintf(os.Stderr, " %s -host 192.168.1.10:6379 -buffer 50 run *.bin\n", binaryName) + fmt.Fprintf(os.Stderr, " %s -skip-time-sec 30 run *.bin\n", binaryName) fmt.Fprintf(os.Stderr, " %s print *.bin\n", binaryName) } diff --git a/tools/replay/workers.go b/tools/replay/workers.go index bd219cda3629..6540dcfddf14 100644 --- a/tools/replay/workers.go +++ b/tools/replay/workers.go @@ -64,6 +64,7 @@ var pipelineRanges = []struct { type FileWorker struct { clientGroup sync.WaitGroup timeOffset time.Duration + skipUntil uint64 // stats for output, updated by clients, read by rendering goroutine processed uint64 delayed uint64 @@ -170,6 +171,11 @@ func (w *FileWorker) Run(file string, wg *sync.WaitGroup) { if cmdName != "eval" && recordId < uint64(*fSkip) { return true } + + if w.skipUntil > 0 && r.Time < w.skipUntil { + return true + } + atomic.AddUint64(&w.parsed, 1) client.incoming <- r return true