68 changes: 68 additions & 0 deletions reader_test.go
@@ -12,6 +12,7 @@ import (
	"io"
	"os"
	"reflect"
	"runtime"
	"strconv"
	"strings"
	"sync"
@@ -479,6 +480,69 @@ func benchmarkGoCsvReaderReadOneByOneWithReuseRecord(rowsCount int64) func(b *testing.B) {
	}
}

func benchmarkGoCsvReaderReadOneByOneProcessParallel(rowsCount int64) func(b *testing.B) {
	return func(b *testing.B) {
		fName, err := setUpTmpCsvFile(rowsCount)
		if err != nil {
			b.Fatalf("prerequisite failed: could not generate CSV file: %v", err)
		}
		defer tearDownTmpCsvFile(fName)

		// one worker per available CPU; GOMAXPROCS(0) reports the current setting without changing it
		numWorkers := runtime.GOMAXPROCS(0)

		b.ReportAllocs()
		b.ResetTimer()

		for n := 0; n < b.N; n++ {
			// set up workers for parallel processing
			rowsChan := make(chan []string, numWorkers)
			var (
				count int64
				wg    sync.WaitGroup
			)
			for i := 0; i < numWorkers; i++ {
				wg.Add(1)
				go func() {
					var localCount int64
					for record := range rowsChan {
						localCount++
						fakeProcessRow(record)
					}
					// fold the per-worker count into the shared total once,
					// instead of contending on an atomic increment for every row
					atomic.AddInt64(&count, localCount)
					wg.Done()
				}()
			}

			// sequential reading
			f, err := os.Open(fName)
			if err != nil {
				b.Fatal(err)
			}
			subject := csv.NewReader(f)
			subject.FieldsPerRecord = 5
			subject.Comma = ','
			count = 0

			for {
				record, err := subject.Read()
				if err != nil {
					if err == io.EOF {
						break
					}
					b.Error(err)
				} else { // hand the row off to a worker
					rowsChan <- record
				}
			}
			// no more rows: closing the channel lets the workers finish their range loops
			close(rowsChan)
			wg.Wait()
			_ = f.Close()
			if count != rowsCount {
				b.Errorf("expected %d, but got %d", rowsCount, count)
			}
		}
	}
}
func Benchmark50000Rows_50Mb_withBigCsvReader(b *testing.B) {
	benchmarkBigCsvReader(5e4)(b)
}
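
Note (not part of the diff): fakeProcessRow, setUpTmpCsvFile, and tearDownTmpCsvFile are existing test helpers defined elsewhere in reader_test.go. For readers looking only at this hunk, a minimal, hypothetical stand-in for the row-processing helper could look like the sketch below; the real helper in the repository may differ.

// hypothetical stand-in for the repository's fakeProcessRow helper
func fakeProcessRow(record []string) {
	// touch every field so the benchmark measures some per-row work,
	// not just channel send/receive overhead
	var total int
	for _, field := range record {
		total += len(field)
	}
	_ = total
}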
@@ -490,3 +554,7 @@ func Benchmark50000Rows_50Mb_withGoCsvReaderReadAll(b *testing.B) {
func Benchmark50000Rows_50Mb_withGoCsvReaderReadOneByOneAndReuseRecord(b *testing.B) {
	benchmarkGoCsvReaderReadOneByOneWithReuseRecord(5e4)(b)
}

func Benchmark50000Rows_50Mb_withGoCsvReaderReadOneByOneProcessParallel(b *testing.B) {
	benchmarkGoCsvReaderReadOneByOneProcessParallel(5e4)(b)
}