From f035c406d10cd60fe57383ad30627329f41c6e77 Mon Sep 17 00:00:00 2001 From: Andreas Zimmerer Date: Sun, 8 Nov 2020 17:56:24 +0100 Subject: [PATCH] ARROW-4804: [Rust] Parse Date32 and Date64 in CSV reader --- rust/arrow/src/csv/reader.rs | 88 +++++++++++++++++++++++--- rust/arrow/test/data/various_types.csv | 12 ++-- 2 files changed, 86 insertions(+), 14 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 4f926f9a98f..ed724cac050 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -67,6 +67,9 @@ lazy_static! { .case_insensitive(true) .build() .unwrap(); + static ref DATE_RE: Regex = Regex::new(r"^\d\d\d\d-\d\d-\d\d$").unwrap(); + static ref DATETIME_RE: Regex = + Regex::new(r"^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap(); } /// Infer the data type of a record @@ -79,6 +82,10 @@ fn infer_field_schema(string: &str) -> DataType { // match regex in a particular order if BOOLEAN_RE.is_match(string) { DataType::Boolean + } else if DATETIME_RE.is_match(string) { + DataType::Date64(DateUnit::Millisecond) + } else if DATE_RE.is_match(string) { + DataType::Date32(DateUnit::Day) } else if DECIMAL_RE.is_match(string) { DataType::Float64 } else if INTEGER_RE.is_match(string) { @@ -219,6 +226,35 @@ pub fn infer_schema_from_files( Schema::try_merge(&schemas) } +/// Parses a string into the specified `ArrowPrimitiveType`. +fn parse_field(s: &str) -> Result { + let from_ymd = chrono::NaiveDate::from_ymd; + let since = chrono::NaiveDate::signed_duration_since; + + match T::DATA_TYPE { + DataType::Boolean => s + .to_lowercase() + .parse::() + .map_err(|_| ArrowError::ParseError("Error parsing boolean".to_string())), + DataType::Date32(DateUnit::Day) => { + let days = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") + .map(|t| since(t, from_ymd(1970, 1, 1)).num_days() as i32); + days.map(|t| unsafe { std::mem::transmute_copy::(&t) }) + .map_err(|e| ArrowError::ParseError(e.to_string())) + } + DataType::Date64(DateUnit::Millisecond) => { + let millis = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") + .map(|t| t.timestamp_millis()); + millis + .map(|t| unsafe { std::mem::transmute_copy::(&t) }) + .map_err(|e| ArrowError::ParseError(e.to_string())) + } + _ => s + .parse::() + .map_err(|_| ArrowError::ParseError("Error parsing field".to_string())), + } +} + // optional bounds of the reader, of the form (min line, max line). type Bounds = Option<(usize, usize)>; @@ -370,6 +406,7 @@ impl Iterator for Reader { } /// parses a slice of [csv_crate::StringRecord] into a [array::record_batch::RecordBatch]. + fn parse( rows: &[StringRecord], fields: &Vec, @@ -430,6 +467,12 @@ fn parse( } Ok(Arc::new(builder.finish()) as ArrayRef) } + &DataType::Date32(_) => { + build_primitive_array::(line_number, rows, i) + } + &DataType::Date64(_) => { + build_primitive_array::(line_number, rows, i) + } other => Err(ArrowError::ParseError(format!( "Unsupported data type {:?}", other @@ -446,7 +489,6 @@ fn parse( arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr)) } -// parses a specific column (col_idx) into an Arrow Array. fn build_primitive_array( line_number: usize, rows: &[StringRecord], @@ -460,11 +502,8 @@ fn build_primitive_array( if s.is_empty() { return Ok(None); } - let parsed = if T::DATA_TYPE == DataType::Boolean { - s.to_lowercase().parse::() - } else { - s.parse::() - }; + + let parsed = parse_field::(s); match parsed { Ok(e) => Ok(Some(e)), Err(_) => Err(ArrowError::ParseError(format!( @@ -835,13 +874,13 @@ mod tests { .has_header(true) .with_delimiter(b'|') .with_batch_size(512) - .with_projection(vec![0, 1, 2, 3]); + .with_projection(vec![0, 1, 2, 3, 4]); let mut csv = builder.build(file).unwrap(); let batch = csv.next().unwrap().unwrap(); assert_eq!(5, batch.num_rows()); - assert_eq!(4, batch.num_columns()); + assert_eq!(5, batch.num_columns()); let schema = batch.schema(); @@ -849,11 +888,16 @@ mod tests { assert_eq!(&DataType::Float64, schema.field(1).data_type()); assert_eq!(&DataType::Float64, schema.field(2).data_type()); assert_eq!(&DataType::Boolean, schema.field(3).data_type()); + assert_eq!( + &DataType::Date32(DateUnit::Day), + schema.field(4).data_type() + ); assert_eq!(false, schema.field(0).is_nullable()); assert_eq!(true, schema.field(1).is_nullable()); assert_eq!(true, schema.field(2).is_nullable()); assert_eq!(false, schema.field(3).is_nullable()); + assert_eq!(true, schema.field(4).is_nullable()); assert_eq!(false, batch.column(1).is_null(0)); assert_eq!(false, batch.column(1).is_null(1)); @@ -901,6 +945,34 @@ mod tests { assert_eq!(infer_field_schema("10.2"), DataType::Float64); assert_eq!(infer_field_schema("true"), DataType::Boolean); assert_eq!(infer_field_schema("false"), DataType::Boolean); + assert_eq!( + infer_field_schema("2020-11-08"), + DataType::Date32(DateUnit::Day) + ); + assert_eq!( + infer_field_schema("2020-11-08T14:20:01"), + DataType::Date64(DateUnit::Millisecond) + ); + } + + #[test] + fn parse_date32() { + assert_eq!(parse_field::("1970-01-01").unwrap(), 0); + assert_eq!(parse_field::("2020-03-15").unwrap(), 18336); + assert_eq!(parse_field::("1945-05-08").unwrap(), -9004); + } + + #[test] + fn parse_date64() { + assert_eq!(parse_field::("1970-01-01T00:00:00").unwrap(), 0); + assert_eq!( + parse_field::("2018-11-13T17:11:10").unwrap(), + 1542129070000 + ); + assert_eq!( + parse_field::("1900-02-28T12:34:56").unwrap(), + -2203932304000 + ); } #[test] diff --git a/rust/arrow/test/data/various_types.csv b/rust/arrow/test/data/various_types.csv index 322d9c347aa..fc36c542eb8 100644 --- a/rust/arrow/test/data/various_types.csv +++ b/rust/arrow/test/data/various_types.csv @@ -1,6 +1,6 @@ -c_int|c_float|c_string|c_bool -1|1.1|"1.11"|true -2|2.2|"2.22"|true -3||"3.33"|true -4|4.4||false -5|6.6|""|false \ No newline at end of file +c_int|c_float|c_string|c_bool|c_date +1|1.1|"1.11"|true|1970-01-01 +2|2.2|"2.22"|true|2020-11-08 +3||"3.33"|true|1969-12-31 +4|4.4||false| +5|6.6|""|false|1990-01-01 \ No newline at end of file