From fdd2ca1658b06e312d57d7aabe705d31f8ed51d0 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 14 Dec 2020 18:36:22 +0100 Subject: [PATCH 1/8] Support date32 / date64 in csv --- rust/arrow/src/csv/reader.rs | 80 +++++++++++++++++++++++++- rust/arrow/src/datatypes.rs | 40 +++++++++---- rust/arrow/test/data/various_types.csv | 12 ++-- 3 files changed, 113 insertions(+), 19 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 8e5b3ad6c99..93bcc2e1c77 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -65,6 +65,9 @@ lazy_static! { .case_insensitive(true) .build() .unwrap(); + static ref DATE_RE: Regex = Regex::new(r"^\d{4}-\d\d-\d\d$").unwrap(); + static ref DATETIME_RE: Regex = + Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap(); } /// Infer the data type of a record @@ -81,6 +84,10 @@ fn infer_field_schema(string: &str) -> DataType { DataType::Float64 } else if INTEGER_RE.is_match(string) { DataType::Int64 + } else if DATETIME_RE.is_match(string) { + DataType::Date64(DateUnit::Millisecond) + } else if DATE_RE.is_match(string) { + DataType::Date32(DateUnit::Day) } else { DataType::Utf8 } @@ -436,6 +443,12 @@ fn parse( &DataType::Float64 => { build_primitive_array::(line_number, rows, i) } + &DataType::Date32(_) => { + build_primitive_array::(line_number, rows, i) + } + &DataType::Date64(_) => { + build_primitive_array::(line_number, rows, i) + } &DataType::Utf8 => { let mut builder = StringBuilder::new(rows.len()); for row in rows.iter() { @@ -496,6 +509,36 @@ impl Parser for Int16Type {} impl Parser for Int8Type {} +impl Parser for Date32Type { + fn parse(string: &str) -> Option { + let from_ymd = chrono::NaiveDate::from_ymd; + let since = chrono::NaiveDate::signed_duration_since; + + match Self::DATA_TYPE { + DataType::Date32(DateUnit::Day) => { + let days = chrono::NaiveDate::parse_from_str(string, "%Y-%m-%d").ok()?; + Self::Native::from_i32(since(days, from_ymd(1970, 1, 1)).num_days() as i32) + } + _ => unreachable!("No other "), + } + } +} + +impl Parser for Date64Type { + fn parse(string: &str) -> Option { + match Self::DATA_TYPE { + DataType::Date64(DateUnit::Millisecond) => { + let millis = + chrono::NaiveDateTime::parse_from_str(string, "%Y-%m-%dT%H:%M:%S") + .map(|t| t.timestamp_millis()) + .ok()?; + Self::Native::from_i64(millis) + } + _ => unreachable!(""), + } + } +} + fn parse_item(string: &str) -> Option { T::parse(string) } @@ -929,13 +972,13 @@ mod tests { .has_header(true) .with_delimiter(b'|') .with_batch_size(512) - .with_projection(vec![0, 1, 2, 3]); + .with_projection(vec![0, 1, 2, 3, 4]); let mut csv = builder.build(file).unwrap(); let batch = csv.next().unwrap().unwrap(); assert_eq!(5, batch.num_rows()); - assert_eq!(4, batch.num_columns()); + assert_eq!(5, batch.num_columns()); let schema = batch.schema(); @@ -943,11 +986,16 @@ mod tests { assert_eq!(&DataType::Float64, schema.field(1).data_type()); assert_eq!(&DataType::Float64, schema.field(2).data_type()); assert_eq!(&DataType::Boolean, schema.field(3).data_type()); + assert_eq!( + &DataType::Date32(DateUnit::Day), + schema.field(4).data_type() + ); assert_eq!(false, schema.field(0).is_nullable()); assert_eq!(true, schema.field(1).is_nullable()); assert_eq!(true, schema.field(2).is_nullable()); assert_eq!(false, schema.field(3).is_nullable()); + assert_eq!(true, schema.field(4).is_nullable()); assert_eq!(false, batch.column(1).is_null(0)); assert_eq!(false, batch.column(1).is_null(1)); @@ -995,6 +1043,34 @@ mod tests { assert_eq!(infer_field_schema("10.2"), DataType::Float64); assert_eq!(infer_field_schema("true"), DataType::Boolean); assert_eq!(infer_field_schema("false"), DataType::Boolean); + assert_eq!( + infer_field_schema("2020-11-08"), + DataType::Date32(DateUnit::Day) + ); + assert_eq!( + infer_field_schema("2020-11-08T14:20:01"), + DataType::Date64(DateUnit::Millisecond) + ); + } + + #[test] + fn parse_date32() { + assert_eq!(parse_item::("1970-01-01").unwrap(), 0); + assert_eq!(parse_item::("2020-03-15").unwrap(), 18336); + assert_eq!(parse_item::("1945-05-08").unwrap(), -9004); + } + + #[test] + fn parse_date64() { + assert_eq!(parse_item::("1970-01-01T00:00:00").unwrap(), 0); + assert_eq!( + parse_item::("2018-11-13T17:11:10").unwrap(), + 1542129070000 + ); + assert_eq!( + parse_item::("1900-02-28T12:34:56").unwrap(), + -2203932304000 + ); } #[test] diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index d3ae00de9f8..75fd902a1c5 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -209,6 +209,16 @@ pub trait ArrowNativeType: fn to_usize(&self) -> Option { None } + + /// Convert native type from i32. + fn from_i32(_: i32) -> Option { + None + } + + /// Convert native type from i64. + fn from_i64(_: i64) -> Option { + None + } } /// Trait indicating a primitive fixed-width type (bool, ints and floats). @@ -278,6 +288,11 @@ impl ArrowNativeType for i32 { fn to_usize(&self) -> Option { num::ToPrimitive::to_usize(self) } + + /// Convert native type from i32. + fn from_i32(val: i32) -> Option { + Some(val) + } } impl ArrowNativeType for i64 { @@ -292,6 +307,11 @@ impl ArrowNativeType for i64 { fn to_usize(&self) -> Option { num::ToPrimitive::to_usize(self) } + + /// Convert native type from i64. + fn from_i64(val: i64) -> Option { + Some(val) + } } impl ArrowNativeType for u8 { @@ -1333,18 +1353,16 @@ impl Field { )); } match data_type { - DataType::List(_) => DataType::List(Box::new( - Self::from(&values[0])?, - )), - DataType::LargeList(_) => DataType::LargeList(Box::new( - Self::from(&values[0])?, - )), - DataType::FixedSizeList(_, int) => { - DataType::FixedSizeList( - Box::new(Self::from(&values[0])?), - int, - ) + DataType::List(_) => { + DataType::List(Box::new(Self::from(&values[0])?)) + } + DataType::LargeList(_) => { + DataType::LargeList(Box::new(Self::from(&values[0])?)) } + DataType::FixedSizeList(_, int) => DataType::FixedSizeList( + Box::new(Self::from(&values[0])?), + int, + ), _ => unreachable!( "Data type should be a list, largelist or fixedsizelist" ), diff --git a/rust/arrow/test/data/various_types.csv b/rust/arrow/test/data/various_types.csv index 322d9c347aa..fc36c542eb8 100644 --- a/rust/arrow/test/data/various_types.csv +++ b/rust/arrow/test/data/various_types.csv @@ -1,6 +1,6 @@ -c_int|c_float|c_string|c_bool -1|1.1|"1.11"|true -2|2.2|"2.22"|true -3||"3.33"|true -4|4.4||false -5|6.6|""|false \ No newline at end of file +c_int|c_float|c_string|c_bool|c_date +1|1.1|"1.11"|true|1970-01-01 +2|2.2|"2.22"|true|2020-11-08 +3||"3.33"|true|1969-12-31 +4|4.4||false| +5|6.6|""|false|1990-01-01 \ No newline at end of file From fd629ff5e10ca9e45dbc656056ac29d1714d20ff Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 14 Dec 2020 19:04:08 +0100 Subject: [PATCH 2/8] Remove panics --- rust/arrow/src/csv/reader.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 93bcc2e1c77..ae618beef37 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -519,7 +519,7 @@ impl Parser for Date32Type { let days = chrono::NaiveDate::parse_from_str(string, "%Y-%m-%d").ok()?; Self::Native::from_i32(since(days, from_ymd(1970, 1, 1)).num_days() as i32) } - _ => unreachable!("No other "), + _ => None, } } } @@ -534,7 +534,7 @@ impl Parser for Date64Type { .ok()?; Self::Native::from_i64(millis) } - _ => unreachable!(""), + _ => None, } } } From 75e8b7742ae0fa52e21bfc1acbcff1a267e006ec Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 15 Dec 2020 15:39:38 +0100 Subject: [PATCH 3/8] Small simplification --- rust/arrow/src/csv/reader.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index ae618beef37..ec71cb1e924 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -516,7 +516,7 @@ impl Parser for Date32Type { match Self::DATA_TYPE { DataType::Date32(DateUnit::Day) => { - let days = chrono::NaiveDate::parse_from_str(string, "%Y-%m-%d").ok()?; + let days = string.parse::().ok()?; Self::Native::from_i32(since(days, from_ymd(1970, 1, 1)).num_days() as i32) } _ => None, @@ -529,7 +529,7 @@ impl Parser for Date64Type { match Self::DATA_TYPE { DataType::Date64(DateUnit::Millisecond) => { let millis = - chrono::NaiveDateTime::parse_from_str(string, "%Y-%m-%dT%H:%M:%S") + string.parse::() .map(|t| t.timestamp_millis()) .ok()?; Self::Native::from_i64(millis) From 2ab4d37f654d87396b7d029e4895aab1c1a3fb44 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 15 Dec 2020 16:27:43 +0100 Subject: [PATCH 4/8] fmt --- rust/arrow/src/csv/reader.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index ec71cb1e924..830ae837aa0 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -528,10 +528,10 @@ impl Parser for Date64Type { fn parse(string: &str) -> Option { match Self::DATA_TYPE { DataType::Date64(DateUnit::Millisecond) => { - let millis = - string.parse::() - .map(|t| t.timestamp_millis()) - .ok()?; + let millis = string + .parse::() + .map(|t| t.timestamp_millis()) + .ok()?; Self::Native::from_i64(millis) } _ => None, From bba952a7018dd8f207e01edb97ed788ee1f70c7b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 15 Dec 2020 17:46:04 +0100 Subject: [PATCH 5/8] Add test case for floating point part --- rust/arrow/src/csv/reader.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 830ae837aa0..ef9104fc760 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -1067,6 +1067,10 @@ mod tests { parse_item::("2018-11-13T17:11:10").unwrap(), 1542129070000 ); + assert_eq!( + parse_item::("2018-11-13T17:11:10.011").unwrap(), + 1542129070011 + ); assert_eq!( parse_item::("1900-02-28T12:34:56").unwrap(), -2203932304000 From 842b27fdf2a6749979b5824a4fb1c6a744866f12 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 15 Dec 2020 17:51:27 +0100 Subject: [PATCH 6/8] Small simplification --- rust/arrow/src/csv/reader.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index ef9104fc760..9bcce0885ba 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -528,11 +528,8 @@ impl Parser for Date64Type { fn parse(string: &str) -> Option { match Self::DATA_TYPE { DataType::Date64(DateUnit::Millisecond) => { - let millis = string - .parse::() - .map(|t| t.timestamp_millis()) - .ok()?; - Self::Native::from_i64(millis) + let date_time = string.parse::().ok()?; + Self::Native::from_i64(date_time.timestamp_millis()) } _ => None, } From 7fbd42aceef202e61c06ada0925fd9fda7aeb747 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 15 Dec 2020 18:07:17 +0100 Subject: [PATCH 7/8] Extend inference test --- rust/arrow/src/csv/reader.rs | 9 +++++++-- rust/arrow/test/data/various_types.csv | 12 ++++++------ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 9bcce0885ba..ee4611973dd 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -969,13 +969,13 @@ mod tests { .has_header(true) .with_delimiter(b'|') .with_batch_size(512) - .with_projection(vec![0, 1, 2, 3, 4]); + .with_projection(vec![0, 1, 2, 3, 4, 5]); let mut csv = builder.build(file).unwrap(); let batch = csv.next().unwrap().unwrap(); assert_eq!(5, batch.num_rows()); - assert_eq!(5, batch.num_columns()); + assert_eq!(6, batch.num_columns()); let schema = batch.schema(); @@ -987,12 +987,17 @@ mod tests { &DataType::Date32(DateUnit::Day), schema.field(4).data_type() ); + assert_eq!( + &DataType::Date64(DateUnit::Millisecond), + schema.field(5).data_type() + ); assert_eq!(false, schema.field(0).is_nullable()); assert_eq!(true, schema.field(1).is_nullable()); assert_eq!(true, schema.field(2).is_nullable()); assert_eq!(false, schema.field(3).is_nullable()); assert_eq!(true, schema.field(4).is_nullable()); + assert_eq!(true, schema.field(5).is_nullable()); assert_eq!(false, batch.column(1).is_null(0)); assert_eq!(false, batch.column(1).is_null(1)); diff --git a/rust/arrow/test/data/various_types.csv b/rust/arrow/test/data/various_types.csv index fc36c542eb8..4e7816975e8 100644 --- a/rust/arrow/test/data/various_types.csv +++ b/rust/arrow/test/data/various_types.csv @@ -1,6 +1,6 @@ -c_int|c_float|c_string|c_bool|c_date -1|1.1|"1.11"|true|1970-01-01 -2|2.2|"2.22"|true|2020-11-08 -3||"3.33"|true|1969-12-31 -4|4.4||false| -5|6.6|""|false|1990-01-01 \ No newline at end of file +c_int|c_float|c_string|c_bool|c_date|c_date_time +1|1.1|"1.11"|true|1970-01-01|1970-01-01T00:00:00 +2|2.2|"2.22"|true|2020-11-08|2020-11-08T01:00:00 +3||"3.33"|true|1969-12-31|1969-11-08T02:00:00 +4|4.4||false|| +5|6.6|""|false|1990-01-01|1990-01-01T03:00:00 \ No newline at end of file From e5d2fd13f502e9fd0b07f5e165f5fbe153f56281 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 15 Dec 2020 18:17:09 +0100 Subject: [PATCH 8/8] Add check for columns names --- rust/arrow/src/csv/reader.rs | 14 ++++++++++++++ rust/arrow/test/data/various_types.csv | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index ee4611973dd..d91fef42229 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -992,6 +992,20 @@ mod tests { schema.field(5).data_type() ); + let names: Vec<&str> = + schema.fields().iter().map(|x| x.name().as_str()).collect(); + assert_eq!( + names, + vec![ + "c_int", + "c_float", + "c_string", + "c_bool", + "c_date", + "c_datetime" + ] + ); + assert_eq!(false, schema.field(0).is_nullable()); assert_eq!(true, schema.field(1).is_nullable()); assert_eq!(true, schema.field(2).is_nullable()); diff --git a/rust/arrow/test/data/various_types.csv b/rust/arrow/test/data/various_types.csv index 4e7816975e8..8f4466fbe6a 100644 --- a/rust/arrow/test/data/various_types.csv +++ b/rust/arrow/test/data/various_types.csv @@ -1,4 +1,4 @@ -c_int|c_float|c_string|c_bool|c_date|c_date_time +c_int|c_float|c_string|c_bool|c_date|c_datetime 1|1.1|"1.11"|true|1970-01-01|1970-01-01T00:00:00 2|2.2|"2.22"|true|2020-11-08|2020-11-08T01:00:00 3||"3.33"|true|1969-12-31|1969-11-08T02:00:00