From 7d802e138ce8601a3ffac1e76e3c964d6fccb77e Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 19 Nov 2020 12:17:40 +0100 Subject: [PATCH 1/4] Specialize parsers --- rust/arrow/Cargo.toml | 1 + rust/arrow/src/csv/reader.rs | 117 +++++++++++++++++++++-------------- 2 files changed, 70 insertions(+), 48 deletions(-) diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index ff53dc9c445..0e35fcc746a 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -50,6 +50,7 @@ chrono = "0.4" flatbuffers = "0.6" hex = "0.4" prettytable-rs = { version = "0.8.0", optional = true } +lexical-core = "^0.7" [features] default = [] diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 9ed2d1fd4bc..50c4d966d3d 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -229,8 +229,7 @@ pub struct Reader { /// Optional projection for which columns to load (zero-based column indices) projection: Option>, /// File reader - record_iter: - Buffered>>>, StringRecord, Error>, + record_iter: Buffered>>>, StringRecord, Error>, /// Current line number line_number: usize, } @@ -369,6 +368,24 @@ impl Iterator for Reader { } } +trait Parser: ArrowPrimitiveType { + fn parse(string: &str) -> Option { + string.parse::().ok() + } +} + +impl Parser for BooleanType { + fn parse(string: &str) -> Option { + if string.eq_ignore_ascii_case("false") { + return Some(false); + } + if string.eq_ignore_ascii_case("true") { + return Some(true); + } + None + } +} + /// parses a slice of [csv_crate::StringRecord] into a [array::record_batch::RecordBatch]. 
fn parse( rows: &[StringRecord], @@ -387,39 +404,17 @@ fn parse( let i = *i; let field = &fields[i]; match field.data_type() { - &DataType::Boolean => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Int8 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Int16 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Int32 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Int64 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::UInt8 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::UInt16 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::UInt32 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::UInt64 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Float32 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Float64 => { - build_primitive_array::(line_number, rows, i) - } + &DataType::Boolean => build_primitive_array::(line_number, rows, i), + &DataType::Int8 => build_primitive_array::(line_number, rows, i), + &DataType::Int16 => build_primitive_array::(line_number, rows, i), + &DataType::Int32 => build_primitive_array::(line_number, rows, i), + &DataType::Int64 => build_primitive_array::(line_number, rows, i), + &DataType::UInt8 => build_primitive_array::(line_number, rows, i), + &DataType::UInt16 => build_primitive_array::(line_number, rows, i), + &DataType::UInt32 => build_primitive_array::(line_number, rows, i), + &DataType::UInt64 => build_primitive_array::(line_number, rows, i), + &DataType::Float32 => build_primitive_array::(line_number, rows, i), + &DataType::Float64 => build_primitive_array::(line_number, rows, i), &DataType::Utf8 => { let mut builder = StringBuilder::new(rows.len()); for row in rows.iter() { @@ -438,16 +433,46 @@ fn parse( }) .collect(); - let projected_fields: Vec = - projection.iter().map(|i| fields[*i].clone()).collect(); + let projected_fields: Vec = 
projection.iter().map(|i| fields[*i].clone()).collect(); let projected_schema = Arc::new(Schema::new(projected_fields)); arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr)) } +impl Parser for Float32Type { + fn parse(string: &str) -> Option { + lexical_core::parse(string.as_bytes()).ok() + } +} +impl Parser for Float64Type { + fn parse(string: &str) -> Option { + lexical_core::parse(string.as_bytes()).ok() + } +} + +impl Parser for UInt64Type {} + +impl Parser for UInt32Type {} + +impl Parser for UInt16Type {} + +impl Parser for UInt8Type {} + +impl Parser for Int64Type {} + +impl Parser for Int32Type {} + +impl Parser for Int16Type {} + +impl Parser for Int8Type {} + +fn parse_item(string: &str) -> Option { + T::parse(string) +} + // parses a specific column (col_idx) into an Arrow Array. -fn build_primitive_array( +fn build_primitive_array( line_number: usize, rows: &[StringRecord], col_idx: usize, @@ -460,14 +485,11 @@ fn build_primitive_array( if s.is_empty() { return Ok(None); } - let parsed = if T::DATA_TYPE == DataType::Boolean { - s.to_lowercase().parse::() - } else { - s.parse::() - }; + + let parsed = parse_item::(s); match parsed { - Ok(e) => Ok(Some(e)), - Err(_) => Err(ArrowError::ParseError(format!( + Some(e) => Ok(Some(e)), + None => Err(ArrowError::ParseError(format!( // TODO: we should surface the underlying error here. 
"Error while parsing value {} for column {} at line {}", s, @@ -683,8 +705,7 @@ mod tests { Field::new("lng", DataType::Float64, false), ]); - let file_with_headers = - File::open("test/data/uk_cities_with_headers.csv").unwrap(); + let file_with_headers = File::open("test/data/uk_cities_with_headers.csv").unwrap(); let file_without_headers = File::open("test/data/uk_cities.csv").unwrap(); let both_files = file_with_headers .chain(Cursor::new("\n".to_string())) @@ -888,7 +909,7 @@ mod tests { format!("{:?}", e) ), Ok(_) => panic!("should have failed"), - } + }, None => panic!("should have failed"), } } From 9fe98bc8f14c3a8084920e0980691a536f4c256a Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 19 Nov 2020 12:20:00 +0100 Subject: [PATCH 2/4] Format --- rust/arrow/src/csv/reader.rs | 53 ++++++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 50c4d966d3d..61f0e9a240f 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -229,7 +229,8 @@ pub struct Reader { /// Optional projection for which columns to load (zero-based column indices) projection: Option>, /// File reader - record_iter: Buffered>>>, StringRecord, Error>, + record_iter: + Buffered>>>, StringRecord, Error>, /// Current line number line_number: usize, } @@ -404,17 +405,39 @@ fn parse( let i = *i; let field = &fields[i]; match field.data_type() { - &DataType::Boolean => build_primitive_array::(line_number, rows, i), - &DataType::Int8 => build_primitive_array::(line_number, rows, i), - &DataType::Int16 => build_primitive_array::(line_number, rows, i), - &DataType::Int32 => build_primitive_array::(line_number, rows, i), - &DataType::Int64 => build_primitive_array::(line_number, rows, i), - &DataType::UInt8 => build_primitive_array::(line_number, rows, i), - &DataType::UInt16 => build_primitive_array::(line_number, rows, i), - &DataType::UInt32 => 
build_primitive_array::(line_number, rows, i), - &DataType::UInt64 => build_primitive_array::(line_number, rows, i), - &DataType::Float32 => build_primitive_array::(line_number, rows, i), - &DataType::Float64 => build_primitive_array::(line_number, rows, i), + &DataType::Boolean => { + build_primitive_array::(line_number, rows, i) + } + &DataType::Int8 => { + build_primitive_array::(line_number, rows, i) + } + &DataType::Int16 => { + build_primitive_array::(line_number, rows, i) + } + &DataType::Int32 => { + build_primitive_array::(line_number, rows, i) + } + &DataType::Int64 => { + build_primitive_array::(line_number, rows, i) + } + &DataType::UInt8 => { + build_primitive_array::(line_number, rows, i) + } + &DataType::UInt16 => { + build_primitive_array::(line_number, rows, i) + } + &DataType::UInt32 => { + build_primitive_array::(line_number, rows, i) + } + &DataType::UInt64 => { + build_primitive_array::(line_number, rows, i) + } + &DataType::Float32 => { + build_primitive_array::(line_number, rows, i) + } + &DataType::Float64 => { + build_primitive_array::(line_number, rows, i) + } &DataType::Utf8 => { let mut builder = StringBuilder::new(rows.len()); for row in rows.iter() { @@ -433,7 +456,8 @@ fn parse( }) .collect(); - let projected_fields: Vec = projection.iter().map(|i| fields[*i].clone()).collect(); + let projected_fields: Vec = + projection.iter().map(|i| fields[*i].clone()).collect(); let projected_schema = Arc::new(Schema::new(projected_fields)); @@ -705,7 +729,8 @@ mod tests { Field::new("lng", DataType::Float64, false), ]); - let file_with_headers = File::open("test/data/uk_cities_with_headers.csv").unwrap(); + let file_with_headers = + File::open("test/data/uk_cities_with_headers.csv").unwrap(); let file_without_headers = File::open("test/data/uk_cities.csv").unwrap(); let both_files = file_with_headers .chain(Cursor::new("\n".to_string())) From 9b3e5d84a49c89004aa60b1675cbd32fde4cb2ee Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 19 Nov 
2020 12:54:31 +0100 Subject: [PATCH 3/4] Format --- rust/arrow/src/csv/reader.rs | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 61f0e9a240f..6311d32ba68 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -369,24 +369,6 @@ impl Iterator for Reader { } } -trait Parser: ArrowPrimitiveType { - fn parse(string: &str) -> Option { - string.parse::().ok() - } -} - -impl Parser for BooleanType { - fn parse(string: &str) -> Option { - if string.eq_ignore_ascii_case("false") { - return Some(false); - } - if string.eq_ignore_ascii_case("true") { - return Some(true); - } - None - } -} - /// parses a slice of [csv_crate::StringRecord] into a [array::record_batch::RecordBatch]. fn parse( rows: &[StringRecord], @@ -464,6 +446,24 @@ fn parse( arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr)) } +trait Parser: ArrowPrimitiveType { + fn parse(string: &str) -> Option { + string.parse::().ok() + } +} + +impl Parser for BooleanType { + fn parse(string: &str) -> Option { + if string.eq_ignore_ascii_case("false") { + return Some(false); + } + if string.eq_ignore_ascii_case("true") { + return Some(true); + } + None + } +} + impl Parser for Float32Type { fn parse(string: &str) -> Option { lexical_core::parse(string.as_bytes()).ok() From 52bf0e839453749d1f042572a5e264b701e16904 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 21 Nov 2020 17:23:20 +0100 Subject: [PATCH 4/4] Write out cases in boolean parser --- rust/arrow/src/csv/reader.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 6311d32ba68..fc0692268ae 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -454,12 +454,12 @@ trait Parser: ArrowPrimitiveType { impl Parser for BooleanType { fn parse(string: &str) -> Option { - if 
string.eq_ignore_ascii_case("false") { - return Some(false); - } - if string.eq_ignore_ascii_case("true") { + if string == "false" || string == "FALSE" || string == "False" { + return Some(false); + } + if string == "true" || string == "TRUE" || string == "True" { return Some(true); } None } }