diff --git a/reader_test.go b/reader_test.go
index c2ab5a1..08830c7 100644
--- a/reader_test.go
+++ b/reader_test.go
@@ -12,6 +12,7 @@ import (
 	"io"
 	"os"
 	"reflect"
+	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -479,6 +480,69 @@ func benchmarkGoCsvReaderReadOneByOneWithReuseRecord(rowsCount int64) func(b *testing.B) {
 	}
 }
 
+func benchmarkGoCsvReaderReadOneByOneProcessParallel(rowsCount int64) func(b *testing.B) {
+	return func(b *testing.B) {
+		fName, err := setUpTmpCsvFile(rowsCount)
+		if err != nil {
+			b.Fatalf("prerequisite failed: could not generate CSV file: %v", err)
+		}
+		defer tearDownTmpCsvFile(fName)
+
+		numWorkers := runtime.GOMAXPROCS(0)
+
+		b.ReportAllocs()
+		b.ResetTimer()
+
+		for n := 0; n < b.N; n++ {
+			// set up workers that process rows in parallel
+			rowsChan := make(chan []string, numWorkers)
+			var (
+				count int64
+				wg    sync.WaitGroup
+			)
+			for i := 0; i < numWorkers; i++ {
+				wg.Add(1)
+				go func() {
+					var localCount int64 // local tally avoids contention on the shared counter
+					for record := range rowsChan {
+						localCount++
+						fakeProcessRow(record)
+					}
+					atomic.AddInt64(&count, localCount) // publish once; main reads count after wg.Wait
+					wg.Done()
+				}()
+			}
+
+			// sequential reading: csv.Reader is not meant for concurrent use
+			f, err := os.Open(fName)
+			if err != nil {
+				b.Fatal(err)
+			}
+			subject := csv.NewReader(f)
+			subject.FieldsPerRecord = 5
+			subject.Comma = ','
+
+			for {
+				record, err := subject.Read()
+				if err != nil {
+					if err == io.EOF {
+						break
+					}
+					b.Error(err)
+				} else { // hand the row off to a worker
+					rowsChan <- record
+				}
+			}
+			close(rowsChan)
+			wg.Wait()
+			_ = f.Close()
+			if count != rowsCount {
+				b.Errorf("expected %d, but got %d", rowsCount, count)
+			}
+		}
+	}
+}
+
 func Benchmark50000Rows_50Mb_withBigCsvReader(b *testing.B) {
 	benchmarkBigCsvReader(5e4)(b)
 }
@@ -490,3 +554,7 @@ func Benchmark50000Rows_50Mb_withGoCsvReaderReadAll(b *testing.B) {
 func Benchmark50000Rows_50Mb_withGoCsvReaderReadOneByOneAndReuseRecord(b *testing.B) {
 	benchmarkGoCsvReaderReadOneByOneWithReuseRecord(5e4)(b)
 }
+
+func Benchmark50000Rows_50Mb_withGoCsvReaderReadOneByOneProcessParallel(b *testing.B) {
+	benchmarkGoCsvReaderReadOneByOneProcessParallel(5e4)(b)
+}
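
Note on the approach (not part of the patch): reading stays on a single goroutine because csv.Reader is not meant to be used concurrently; only the per-row processing is fanned out to GOMAXPROCS workers over rowsChan, and each worker folds its local count into the shared total with a single atomic add. Handing records to other goroutines is safe here only because ReuseRecord is left at its default (false), so each Read returns a freshly allocated slice; combining this pattern with ReuseRecord = true would let workers race on the shared backing array. To try the new benchmark in isolation, something like this should work:

    go test -run '^$' -bench 'ProcessParallel$'

(-run '^$' matches no unit tests, so only the benchmark executes; -benchmem is optional here since the benchmark already calls b.ReportAllocs.)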