From 93526b1dca83f937166c9fd5c0cbd96d74fc6a63 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 26 Nov 2020 21:23:01 +0100 Subject: [PATCH 01/21] WIP CSV reader optimization --- rust/arrow/Cargo.toml | 1 + rust/arrow/src/csv/reader.rs | 48 +++++++++++++----------------- rust/benchmarks/src/bin/nyctaxi.rs | 2 +- 3 files changed, 23 insertions(+), 28 deletions(-) diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index 0e35fcc746a..9dd29daf345 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -42,6 +42,7 @@ serde_json = { version = "1.0", features = ["preserve_order"] } indexmap = "1.6" rand = "0.7" csv = "1.1" +csv-core = "^0.1" num = "0.3" regex = "1.3" lazy_static = "1.4" diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index e38926d37fe..ad04d55cd44 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -42,11 +42,11 @@ use lazy_static::lazy_static; use regex::{Regex, RegexBuilder}; +use std::collections::HashSet; +use std::fmt; use std::fs::File; use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; -use std::{collections::HashSet, iter::Skip}; -use std::{fmt, iter::Take}; use csv as csv_crate; @@ -55,10 +55,9 @@ use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; use crate::{ array::{ArrayRef, PrimitiveArray, StringBuilder}, - util::buffered_iterator::Buffered, }; -use self::csv_crate::{Error, StringRecord, StringRecordsIntoIter}; +use self::csv_crate::{Error, StringRecord}; lazy_static! { static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap(); @@ -229,10 +228,13 @@ pub struct Reader { /// Optional projection for which columns to load (zero-based column indices) projection: Option>, /// File reader - record_iter: - Buffered>>>, StringRecord, Error>, + reader: csv_crate::Reader>, /// Current line number line_number: usize, + // max rows + end: usize, + // number of records per batch + batch_size: usize, } impl fmt::Debug for Reader @@ -310,26 +312,19 @@ impl Reader { } let csv_reader = reader_builder.from_reader(buf_reader); - let record_iter = csv_reader.into_records(); let (start, end) = match bounds { None => (0, usize::MAX), Some((start, end)) => (start, end), }; - // Create an iterator that: - // * skips the first `start` items - // * runs up to `end` items - // * buffers `batch_size` items - // note that this skips by iteration. This is because in general it is not possible - // to seek in CSV. However, skiping still saves the burden of creating arrow arrays, - // which is a slow operation that scales with the number of columns - let record_iter = Buffered::new(record_iter.take(end).skip(start), batch_size); Self { schema, projection, - record_iter, + reader: csv_reader, line_number: if has_header { start + 1 } else { start }, + batch_size, + end, } } } @@ -338,19 +333,18 @@ impl Iterator for Reader { type Item = Result; fn next(&mut self) -> Option { - let rows = match self.record_iter.next() { - Some(Ok(r)) => r, - Some(Err(e)) => { - return Some(Err(ArrowError::ParseError(format!( - "Error parsing line {}: {:?}", - self.line_number + self.record_iter.n(), - e - )))); + let mut record = StringRecord::new(); + + let mut rows = Vec::with_capacity(self.batch_size); + + for _ in 0..self.batch_size { + match self.reader.read_record(&mut record) { + Ok(true) => rows.push(record.clone()), + Ok(false) => break, + Err(_) => return None, // TODO ERROR } - None => return None, - }; + } - // return early if no data was loaded if rows.is_empty() { return None; } diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs index 02a790bdcaf..4f51d84ce48 100644 --- a/rust/benchmarks/src/bin/nyctaxi.rs +++ b/rust/benchmarks/src/bin/nyctaxi.rs @@ -92,7 +92,7 @@ async fn datafusion_sql_benchmarks( debug: bool, ) -> Result<()> { let mut queries = HashMap::new(); - queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), MIN(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count"); + queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count"); for (name, sql) in &queries { println!("Executing '{}'", name); for i in 0..iterations { From 02d93b9f95eebf0ba63d1656de9395cb2b3b656a Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 26 Nov 2020 21:29:22 +0100 Subject: [PATCH 02/21] Small cleanup --- rust/arrow/Cargo.toml | 1 - rust/arrow/src/csv/reader.rs | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index 9dd29daf345..0e35fcc746a 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -42,7 +42,6 @@ serde_json = { version = "1.0", features = ["preserve_order"] } indexmap = "1.6" rand = "0.7" csv = "1.1" -csv-core = "^0.1" num = "0.3" regex = "1.3" lazy_static = "1.4" diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index ad04d55cd44..8868a51c14b 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -50,12 +50,10 @@ use std::sync::Arc; use csv as csv_crate; +use crate::array::{ArrayRef, PrimitiveArray, StringBuilder}; use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; -use crate::{ - array::{ArrayRef, PrimitiveArray, StringBuilder}, -}; use self::csv_crate::{Error, StringRecord}; From 5df4339ee3afec84f8b174f32cd9901eb07f12b6 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 26 Nov 2020 21:40:03 +0100 Subject: [PATCH 03/21] Add error --- rust/arrow/src/csv/reader.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 8868a51c14b..3da4ba4cc75 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -335,14 +335,21 @@ impl Iterator for Reader { let mut rows = Vec::with_capacity(self.batch_size); - for _ in 0..self.batch_size { + for i in 0..self.batch_size { match self.reader.read_record(&mut record) { Ok(true) => rows.push(record.clone()), Ok(false) => break, - Err(_) => return None, // TODO ERROR + Err(e) => { + return Some(Err(ArrowError::ParseError(format!( + "Error parsing line {}: {:?}", + self.line_number + i, + e + )))) + } } } + // return early if no data was loaded if rows.is_empty() { return None; } From 65e778932c08dcf18348031bdcc5aeeb56e44291 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 26 Nov 2020 21:49:25 +0100 Subject: [PATCH 04/21] Reuse row batch allocation --- rust/arrow/src/csv/reader.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 3da4ba4cc75..ca2cb7bff22 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -233,6 +233,9 @@ pub struct Reader { end: usize, // number of records per batch batch_size: usize, + + // String records + rows: Vec } impl fmt::Debug for Reader @@ -316,6 +319,8 @@ impl Reader { Some((start, end)) => (start, end), }; + let rows = Vec::with_capacity(batch_size); + Self { schema, projection, @@ -323,6 +328,7 @@ impl Reader { line_number: if has_header { start + 1 } else { start }, batch_size, end, + rows } } } @@ -332,12 +338,10 @@ impl Iterator for Reader { fn next(&mut self) -> Option { let mut record = StringRecord::new(); - - let mut rows = Vec::with_capacity(self.batch_size); - + self.rows.clear(); for i in 0..self.batch_size { match self.reader.read_record(&mut record) { - Ok(true) => rows.push(record.clone()), + Ok(true) => self.rows.push(record.clone()), Ok(false) => break, Err(e) => { return Some(Err(ArrowError::ParseError(format!( @@ -350,19 +354,19 @@ impl Iterator for Reader { } // return early if no data was loaded - if rows.is_empty() { + if self.rows.is_empty() { return None; } // parse the batches into a RecordBatch let result = parse( - &rows, + &self.rows, &self.schema.fields(), &self.projection, self.line_number, ); - self.line_number += rows.len(); + self.line_number += self.rows.len(); Some(result) } From 687868030975a96e6279bed98c5fbe2104c2bd6b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 26 Nov 2020 22:22:08 +0100 Subject: [PATCH 05/21] Implement end bound --- rust/arrow/src/csv/reader.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index ca2cb7bff22..ca31078d715 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -235,7 +235,7 @@ pub struct Reader { batch_size: usize, // String records - rows: Vec + rows: Vec, } impl fmt::Debug for Reader @@ -328,7 +328,7 @@ impl Reader { line_number: if has_header { start + 1 } else { start }, batch_size, end, - rows + rows, } } } @@ -339,7 +339,8 @@ impl Iterator for Reader { fn next(&mut self) -> Option { let mut record = StringRecord::new(); self.rows.clear(); - for i in 0..self.batch_size { + let remaining = self.end - self.line_number; + for i in 0..min(self.batch_size, remaining) { match self.reader.read_record(&mut record) { Ok(true) => self.rows.push(record.clone()), Ok(false) => break, From c559759a76d036e2d6d33382f9bad9fd8d7f0c01 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 26 Nov 2020 22:31:42 +0100 Subject: [PATCH 06/21] Implement skipping --- rust/arrow/src/csv/reader.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index ca31078d715..b9436fa00e1 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -40,6 +40,7 @@ //! let batch = csv.next().unwrap().unwrap(); //! ``` +use core::cmp::min; use lazy_static::lazy_static; use regex::{Regex, RegexBuilder}; use std::collections::HashSet; @@ -312,7 +313,7 @@ impl Reader { reader_builder.delimiter(c); } - let csv_reader = reader_builder.from_reader(buf_reader); + let mut csv_reader = reader_builder.from_reader(buf_reader); let (start, end) = match bounds { None => (0, usize::MAX), @@ -321,11 +322,21 @@ impl Reader { let rows = Vec::with_capacity(batch_size); + let mut record = StringRecord::new(); + let start_line = if has_header { start + 1 } else { start }; + // skip first start_line items + for _ in 0..start_line { + let res = csv_reader.read_record(&mut record); + if !res.unwrap_or(false) { + break; + } + } + Self { schema, projection, reader: csv_reader, - line_number: if has_header { start + 1 } else { start }, + line_number: start_line, batch_size, end, rows, From 9ab1e5e1832e774c0be365cccce281553e85f17a Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 26 Nov 2020 22:46:16 +0100 Subject: [PATCH 07/21] Further optimize by reusing individual StringRecords --- rust/arrow/src/csv/reader.rs | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index b9436fa00e1..7a4a6d44272 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -56,7 +56,7 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; -use self::csv_crate::{Error, StringRecord}; +use self::csv_crate::StringRecord; lazy_static! { static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap(); @@ -234,8 +234,7 @@ pub struct Reader { end: usize, // number of records per batch batch_size: usize, - - // String records + // Vector that can hold the string records of the batches rows: Vec, } @@ -320,12 +319,14 @@ impl Reader { Some((start, end)) => (start, end), }; - let rows = Vec::with_capacity(batch_size); + // First we will skip `start` rows + // note that this skips by iteration. This is because in general it is not possible + // to seek in CSV. However, skiping still saves the burden of creating arrow arrays, + // which is a slow operation that scales with the number of columns let mut record = StringRecord::new(); - let start_line = if has_header { start + 1 } else { start }; // skip first start_line items - for _ in 0..start_line { + for _ in 0..start { let res = csv_reader.read_record(&mut record); if !res.unwrap_or(false) { break; @@ -336,10 +337,10 @@ impl Reader { schema, projection, reader: csv_reader, - line_number: start_line, + line_number: if has_header { start + 1 } else { start }, batch_size, end, - rows, + rows: vec![], } } } @@ -348,12 +349,19 @@ impl Iterator for Reader { type Item = Result; fn next(&mut self) -> Option { - let mut record = StringRecord::new(); - self.rows.clear(); let remaining = self.end - self.line_number; + if self.rows.capacity() == 0 { + let record = StringRecord::new(); + for _ in 0..self.batch_size { + self.rows.push(record.clone()); + } + } + let mut read_records = 0; for i in 0..min(self.batch_size, remaining) { - match self.reader.read_record(&mut record) { - Ok(true) => self.rows.push(record.clone()), + match self.reader.read_record(&mut self.rows[i]) { + Ok(true) => { + read_records += 1; + } Ok(false) => break, Err(e) => { return Some(Err(ArrowError::ParseError(format!( @@ -366,13 +374,13 @@ impl Iterator for Reader { } // return early if no data was loaded - if self.rows.is_empty() { + if read_records == 0 { return None; } // parse the batches into a RecordBatch let result = parse( - &self.rows, + &self.rows[..read_records], &self.schema.fields(), &self.projection, self.line_number, From 03db14191618cc5753705c26ec14defd41776bd8 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 26 Nov 2020 22:50:19 +0100 Subject: [PATCH 08/21] Comment fix --- rust/arrow/src/csv/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 7a4a6d44272..4e91020f8b0 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -325,7 +325,7 @@ impl Reader { // which is a slow operation that scales with the number of columns let mut record = StringRecord::new(); - // skip first start_line items + // skip first start items for _ in 0..start { let res = csv_reader.read_record(&mut record); if !res.unwrap_or(false) { From c9e277d05bddadd348e26873113c00b54b53b078 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 26 Nov 2020 22:51:40 +0100 Subject: [PATCH 09/21] Some comments and line nr. fix --- rust/arrow/src/csv/reader.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 4e91020f8b0..eed35f01c21 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -235,7 +235,7 @@ pub struct Reader { // number of records per batch batch_size: usize, // Vector that can hold the string records of the batches - rows: Vec, + batch_records: Vec, } impl fmt::Debug for Reader @@ -340,7 +340,7 @@ impl Reader { line_number: if has_header { start + 1 } else { start }, batch_size, end, - rows: vec![], + batch_records: vec![], } } } @@ -350,15 +350,15 @@ impl Iterator for Reader { fn next(&mut self) -> Option { let remaining = self.end - self.line_number; - if self.rows.capacity() == 0 { + if self.batch_records.capacity() == 0 { let record = StringRecord::new(); for _ in 0..self.batch_size { - self.rows.push(record.clone()); + self.batch_records.push(record.clone()); } } let mut read_records = 0; for i in 0..min(self.batch_size, remaining) { - match self.reader.read_record(&mut self.rows[i]) { + match self.reader.read_record(&mut self.batch_records[i]) { Ok(true) => { read_records += 1; } @@ -380,13 +380,13 @@ impl Iterator for Reader { // parse the batches into a RecordBatch let result = parse( - &self.rows[..read_records], + &self.batch_records[..read_records], &self.schema.fields(), &self.projection, self.line_number, ); - self.line_number += self.rows.len(); + self.line_number += read_records; Some(result) } From a218870cb9a48f6d9f938f6be9c83d4abc8d6134 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 26 Nov 2020 22:55:48 +0100 Subject: [PATCH 10/21] Initialize it outside of iterator --- rust/arrow/src/csv/reader.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index eed35f01c21..586daada4e6 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -333,6 +333,14 @@ impl Reader { } } + // Initialize batch_records with StringRecords so they + // can be reused accross batches + let mut batch_records = Vec::with_capacity(batch_size); + let record = StringRecord::new(); + for _ in 0..batch_size { + batch_records.push(record.clone()); + } + Self { schema, projection, @@ -340,7 +348,7 @@ impl Reader { line_number: if has_header { start + 1 } else { start }, batch_size, end, - batch_records: vec![], + batch_records, } } } @@ -350,12 +358,7 @@ impl Iterator for Reader { fn next(&mut self) -> Option { let remaining = self.end - self.line_number; - if self.batch_records.capacity() == 0 { - let record = StringRecord::new(); - for _ in 0..self.batch_size { - self.batch_records.push(record.clone()); - } - } + let mut read_records = 0; for i in 0..min(self.batch_size, remaining) { match self.reader.read_record(&mut self.batch_records[i]) { From cd5ba77dbf993454117ae88e51fd0315591fcd84 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 27 Nov 2020 07:39:45 +0100 Subject: [PATCH 11/21] Use read_record api as well in infer_file_schema --- rust/arrow/src/csv/reader.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 586daada4e6..17742775658 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -129,11 +129,12 @@ fn infer_file_schema( let mut records_count = 0; let mut fields = vec![]; - for result in csv_reader - .records() - .take(max_read_records.unwrap_or(std::usize::MAX)) - { - let record = result?; + let mut record = StringRecord::new(); + let max_records = max_read_records.unwrap_or(usize::MAX); + while records_count < max_records { + if !csv_reader.read_record(&mut record)? { + break; + } records_count += 1; for i in 0..header_length { From a741ceaf519afb995bd884bd8cc8dbc72c2616d0 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 27 Nov 2020 09:06:18 +0100 Subject: [PATCH 12/21] Simplify / speed up by removing BufReader --- rust/arrow/src/csv/reader.rs | 37 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 17742775658..35059a9adc6 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -46,7 +46,7 @@ use regex::{Regex, RegexBuilder}; use std::collections::HashSet; use std::fmt; use std::fs::File; -use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; use csv as csv_crate; @@ -93,7 +93,7 @@ fn infer_field_schema(string: &str) -> DataType { /// /// Return infered schema and number of records used for inference. fn infer_file_schema( - reader: &mut BufReader, + reader: &mut R, delimiter: u8, max_read_records: Option, has_header: bool, @@ -200,7 +200,7 @@ pub fn infer_schema_from_files( for fname in files.iter() { let (schema, records_read) = infer_file_schema( - &mut BufReader::new(File::open(fname)?), + &mut File::open(fname)?, delimiter, Some(records_to_read), has_header, @@ -228,7 +228,7 @@ pub struct Reader { /// Optional projection for which columns to load (zero-based column indices) projection: Option>, /// File reader - reader: csv_crate::Reader>, + reader: csv_crate::Reader, /// Current line number line_number: usize, // max rows @@ -267,14 +267,8 @@ impl Reader { bounds: Bounds, projection: Option>, ) -> Self { - Self::from_buf_reader( - BufReader::new(reader), - schema, - has_header, - delimiter, - batch_size, - bounds, - projection, + Self::from_reader( + reader, schema, has_header, delimiter, batch_size, bounds, projection, ) } @@ -297,8 +291,8 @@ impl Reader { /// /// This constructor allows you more flexibility in what records are processed by the /// csv reader. - pub fn from_buf_reader( - buf_reader: BufReader, + pub fn from_reader( + reader: R, schema: SchemaRef, has_header: bool, delimiter: Option, @@ -313,7 +307,7 @@ impl Reader { reader_builder.delimiter(c); } - let mut csv_reader = reader_builder.from_reader(buf_reader); + let mut csv_reader = reader_builder.from_reader(reader); let (start, end) = match bounds { None => (0, usize::MAX), @@ -667,15 +661,14 @@ impl ReaderBuilder { } /// Create a new `Reader` from the `ReaderBuilder` - pub fn build(self, reader: R) -> Result> { + pub fn build(self, mut reader: R) -> Result> { // check if schema should be inferred - let mut buf_reader = BufReader::new(reader); let delimiter = self.delimiter.unwrap_or(b','); let schema = match self.schema { Some(schema) => schema, None => { let (inferred_schema, _) = infer_file_schema( - &mut buf_reader, + &mut reader, delimiter, self.max_records, self.has_header, @@ -684,8 +677,8 @@ impl ReaderBuilder { Arc::new(inferred_schema) } }; - Ok(Reader::from_buf_reader( - buf_reader, + Ok(Reader::from_reader( + reader, schema, self.has_header, self.delimiter, @@ -763,8 +756,8 @@ mod tests { let both_files = file_with_headers .chain(Cursor::new("\n".to_string())) .chain(file_without_headers); - let mut csv = Reader::from_buf_reader( - BufReader::new(both_files), + let mut csv = Reader::from_reader( + both_files, Arc::new(schema), true, None, From e43a5c11b3c198b953a8919edf66aa2e3922866a Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 27 Nov 2020 12:12:18 +0100 Subject: [PATCH 13/21] Slightly optimize skipping by using ByteRecord --- rust/arrow/src/csv/reader.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 35059a9adc6..4b6125d0b70 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -56,7 +56,7 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; -use self::csv_crate::StringRecord; +use self::csv_crate::{ByteRecord, StringRecord}; lazy_static! { static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap(); @@ -231,11 +231,11 @@ pub struct Reader { reader: csv_crate::Reader, /// Current line number line_number: usize, - // max rows + /// Maximum number of rows to read end: usize, - // number of records per batch + /// Number of records per batch batch_size: usize, - // Vector that can hold the string records of the batches + /// Vector that can hold the `StringRecord`s of the batches batch_records: Vec, } @@ -287,7 +287,7 @@ impl Reader { } } - /// Create a new CsvReader from a `BufReader + /// Create a new CsvReader from a Reader /// /// This constructor allows you more flexibility in what records are processed by the /// csv reader. @@ -319,10 +319,10 @@ impl Reader { // to seek in CSV. However, skiping still saves the burden of creating arrow arrays, // which is a slow operation that scales with the number of columns - let mut record = StringRecord::new(); - // skip first start items + let mut record = ByteRecord::new(); + // Skip first start items for _ in 0..start { - let res = csv_reader.read_record(&mut record); + let res = csv_reader.read_byte_record(&mut record); if !res.unwrap_or(false) { break; } From 7241c20f90ff32ddaeb05e71961a192e8cb83b58 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 27 Nov 2020 12:41:45 +0100 Subject: [PATCH 14/21] Slightly simplify initialization --- rust/arrow/src/csv/reader.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 4b6125d0b70..d1e7ff93698 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -331,10 +331,7 @@ impl Reader { // Initialize batch_records with StringRecords so they // can be reused accross batches let mut batch_records = Vec::with_capacity(batch_size); - let record = StringRecord::new(); - for _ in 0..batch_size { - batch_records.push(record.clone()); - } + batch_records.resize_with(batch_size, Default::default); Self { schema, From ecf48697536ea13802e1378d4b2cb2df487455ef Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 27 Nov 2020 17:29:56 +0100 Subject: [PATCH 15/21] Use MAX(fare_amount) instead of removing it --- rust/benchmarks/src/bin/nyctaxi.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs index 4f51d84ce48..1ffa6842061 100644 --- a/rust/benchmarks/src/bin/nyctaxi.rs +++ b/rust/benchmarks/src/bin/nyctaxi.rs @@ -92,7 +92,7 @@ async fn datafusion_sql_benchmarks( debug: bool, ) -> Result<()> { let mut queries = HashMap::new(); - queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count"); + queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count"); for (name, sql) in &queries { println!("Executing '{}'", name); for i in 0..iterations { From 16e8c1f535e149beceda3daee7f4a40c7c85a6e0 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 27 Nov 2020 20:15:20 +0100 Subject: [PATCH 16/21] Trigger CI From cce93c5fc78669caf5700e24cb2b57c571679300 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 28 Nov 2020 10:08:34 +0100 Subject: [PATCH 17/21] Trigger CI From 48ad727804b737946e1a8767ad70c936632a4232 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 28 Nov 2020 10:47:46 +0100 Subject: [PATCH 18/21] Test different allocator --- rust/benchmarks/Cargo.toml | 1 + rust/benchmarks/src/bin/nyctaxi.rs | 2 +- rust/benchmarks/src/bin/tpch.rs | 4 ++++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/rust/benchmarks/Cargo.toml b/rust/benchmarks/Cargo.toml index 7d7b6c97342..655fa716567 100644 --- a/rust/benchmarks/Cargo.toml +++ b/rust/benchmarks/Cargo.toml @@ -32,3 +32,4 @@ datafusion = { path = "../datafusion" } structopt = { version = "0.3", default-features = false } tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] } futures = "0.3" +mimalloc = { version = "*", default-features = false } diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs index 02a790bdcaf..1ffa6842061 100644 --- a/rust/benchmarks/src/bin/nyctaxi.rs +++ b/rust/benchmarks/src/bin/nyctaxi.rs @@ -92,7 +92,7 @@ async fn datafusion_sql_benchmarks( debug: bool, ) -> Result<()> { let mut queries = HashMap::new(); - queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), MIN(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count"); + queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count"); for (name, sql) in &queries { println!("Executing '{}'", name); for i in 0..iterations { diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index 82edd9874c8..96ff9817a95 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -16,6 +16,10 @@ // under the License. //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark. +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; use std::path::PathBuf; use std::sync::Arc; From 3955badc7755da729c213c4e4e7112d91b6fd824 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 28 Nov 2020 12:48:08 +0100 Subject: [PATCH 19/21] Remove unused buffered iterator --- rust/arrow/src/util/buffered_iterator.rs | 138 ----------------------- 1 file changed, 138 deletions(-) delete mode 100644 rust/arrow/src/util/buffered_iterator.rs diff --git a/rust/arrow/src/util/buffered_iterator.rs b/rust/arrow/src/util/buffered_iterator.rs deleted file mode 100644 index 5d42ee43e66..00000000000 --- a/rust/arrow/src/util/buffered_iterator.rs +++ /dev/null @@ -1,138 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [Buffered] is an iterator useful to build an [arrow::array::Array] and other -//! containers that benefit from batching or chunking. - -use std::marker::PhantomData; - -/// An iterator that buffers results in a vector so that the iterator returns a vector of `size` items. -/// The items must be a [std::result::Result] and if an error is returned, tha error is returned -/// and the iterator continues. -/// An invariant of this iterator is that every returned vector's size is at most the specified size. -#[derive(Debug)] -pub struct Buffered -where - T: Clone, - I: Iterator>, -{ - iter: I, - size: usize, - buffer: Vec, - phantom: PhantomData, -} - -impl Buffered -where - T: Clone, - I: Iterator>, -{ - pub fn new(iter: I, size: usize) -> Self { - Buffered { - iter, - size, - buffer: Vec::with_capacity(size), - phantom: PhantomData, - } - } - - /// returns the number of items buffered so far. - /// Useful to extract the exact item where an error occurred - #[inline] - pub fn n(&self) -> usize { - self.buffer.len() - } -} - -impl Iterator for Buffered -where - T: Clone, - I: Iterator>, -{ - type Item = Result, R>; - - fn next(&mut self) -> Option { - for _ in 0..(self.size - self.n()) { - match self.iter.next() { - Some(Ok(item)) => self.buffer.push(item), - Some(Err(error)) => return Some(Err(error)), - None => break, - } - } - if self.buffer.is_empty() { - None - } else { - let result = self.buffer.clone(); - self.buffer.clear(); - Some(Ok(result)) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[derive(Debug, PartialEq)] - struct AError {} - - impl std::fmt::Display for AError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Bla") - } - } - impl std::error::Error for AError {} - - #[test] - fn test_basic() { - let a: Vec> = vec![Ok(1), Ok(2), Ok(3)]; - let iter = a.into_iter(); - let mut iter = Buffered::new(iter, 2); - - assert_eq!(iter.next(), Some(Ok(vec![1, 2]))); - assert_eq!(iter.next(), Some(Ok(vec![3]))); - assert_eq!(iter.next(), None); - } - - #[test] - fn test_error_first() { - let a: Vec> = - vec![Ok(1), Ok(2), Err(AError {}), Ok(4), Ok(5)]; - let iter = a.into_iter(); - let mut iter = Buffered::new(iter, 2); - - assert_eq!(iter.next(), Some(Ok(vec![1, 2]))); - assert_eq!(iter.next(), Some(Err(AError {}))); - // 4 is here: it was not skipped on the previous - assert_eq!(iter.n(), 0); - assert_eq!(iter.next(), Some(Ok(vec![4, 5]))); - assert_eq!(iter.next(), None); - } - - #[test] - fn test_error_last() { - let a: Vec> = vec![Ok(1), Err(AError {}), Ok(3), Ok(4)]; - let iter = a.into_iter(); - let mut iter = Buffered::new(iter, 2); - - assert_eq!(iter.next(), Some(Err(AError {}))); - assert_eq!(iter.n(), 1); - assert_eq!(iter.next(), Some(Ok(vec![1, 3]))); - assert_eq!(iter.next(), Some(Ok(vec![4]))); - assert_eq!(iter.next(), None); - } -} From 31b059cc2e181d316eaff8ff200b3570ea3b0ae5 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 28 Nov 2020 12:52:19 +0100 Subject: [PATCH 20/21] Fix mod export --- rust/arrow/src/util/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/arrow/src/util/mod.rs b/rust/arrow/src/util/mod.rs index 053d1329631..0f95043ea9d 100644 --- a/rust/arrow/src/util/mod.rs +++ b/rust/arrow/src/util/mod.rs @@ -17,7 +17,6 @@ pub mod bit_chunk_iterator; pub mod bit_util; -pub mod buffered_iterator; pub mod display; pub mod integration_util; #[cfg(feature = "prettyprint")] From da3bad00b5351efc473a8a67708822c93b357c7a Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 28 Nov 2020 12:57:55 +0100 Subject: [PATCH 21/21] Undo unrelated change --- rust/benchmarks/Cargo.toml | 3 +-- rust/benchmarks/src/bin/tpch.rs | 4 ---- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/rust/benchmarks/Cargo.toml b/rust/benchmarks/Cargo.toml index 655fa716567..6017d88e1e0 100644 --- a/rust/benchmarks/Cargo.toml +++ b/rust/benchmarks/Cargo.toml @@ -31,5 +31,4 @@ parquet = { path = "../parquet" } datafusion = { path = "../datafusion" } structopt = { version = "0.3", default-features = false } tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] } -futures = "0.3" -mimalloc = { version = "*", default-features = false } +futures = "0.3" \ No newline at end of file diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index 96ff9817a95..82edd9874c8 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -16,10 +16,6 @@ // under the License. //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark. -use mimalloc::MiMalloc; - -#[global_allocator] -static GLOBAL: MiMalloc = MiMalloc; use std::path::PathBuf; use std::sync::Arc;