From 05fe095b2145fbdd4d6ccc6f74b16cba0a21749f Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 11 Feb 2021 17:01:42 -0500 Subject: [PATCH 01/12] ARROW-11570: [Rust] ScalarValue - support Date64 Introduce support for ScalarValue::Date64. Closes #9452 from ovr/issue-11570 Authored-by: Dmitry Patsura Signed-off-by: Andrew Lamb --- rust/datafusion/src/scalar.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/src/scalar.rs b/rust/datafusion/src/scalar.rs index 31ca1da8edf..d5a6c141179 100644 --- a/rust/datafusion/src/scalar.rs +++ b/rust/datafusion/src/scalar.rs @@ -20,7 +20,7 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc}; use arrow::array::{ - Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder, + Date64Array, Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder, TimestampMicrosecondArray, TimestampNanosecondArray, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder, }; @@ -75,6 +75,8 @@ pub enum ScalarValue { List(Option>, DataType), /// Date stored as a signed 32bit int Date32(Option), + /// Date stored as a signed 64bit int + Date64(Option), /// Timestamp Microseconds TimeMicrosecond(Option), /// Timestamp Nanoseconds @@ -156,6 +158,7 @@ impl ScalarValue { DataType::List(Box::new(Field::new("item", data_type.clone(), true))) } ScalarValue::Date32(_) => DataType::Date32, + ScalarValue::Date64(_) => DataType::Date64, ScalarValue::IntervalYearMonth(_) => { DataType::Interval(IntervalUnit::YearMonth) } @@ -329,6 +332,12 @@ impl ScalarValue { } None => Arc::new(repeat(None).take(size).collect::()), }, + ScalarValue::Date64(e) => match e { + Some(value) => { + Arc::new(Date64Array::from_iter_values(repeat(*value).take(size))) + } + None => Arc::new(repeat(None).take(size).collect::()), + }, ScalarValue::IntervalDayTime(e) => match e { Some(value) => Arc::new(IntervalDayTimeArray::from_iter_values( repeat(*value).take(size), @@ -386,6 +395,9 @@ impl ScalarValue { DataType::Date32 => { typed_cast!(array, index, Date32Array, Date32) } + DataType::Date64 => { + typed_cast!(array, index, Date64Array, Date64) + } other => { return Err(DataFusionError::NotImplemented(format!( "Can't create a scalar of array of type \"{:?}\"", @@ -580,6 +592,7 @@ impl fmt::Display for ScalarValue { None => write!(f, "NULL")?, }, ScalarValue::Date32(e) => format_option!(f, e)?, + ScalarValue::Date64(e) => format_option!(f, e)?, ScalarValue::IntervalDayTime(e) => format_option!(f, e)?, ScalarValue::IntervalYearMonth(e) => format_option!(f, e)?, }; @@ -609,6 +622,7 @@ impl fmt::Debug for ScalarValue { ScalarValue::LargeUtf8(Some(_)) => write!(f, "LargeUtf8(\"{}\")", self), ScalarValue::List(_, _) => write!(f, "List([{}])", self), ScalarValue::Date32(_) => write!(f, "Date32(\"{}\")", self), + ScalarValue::Date64(_) => write!(f, "Date64(\"{}\")", self), ScalarValue::IntervalDayTime(_) => { write!(f, "IntervalDayTime(\"{}\")", self) } From 2be54a5cfaa557f8b02e0f142be93c61b6d366e9 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 12 Feb 2021 06:10:06 -0500 Subject: [PATCH 02/12] ARROW-11563: [Rust] Support Cast(Utf8, TimeStamp(Nanoseconds, None)) DataFushion already has this cast with its own implementation inside DF based on top of the chrono library. In this PR, I moved functions called string_to_timestamp_nanos and naive_datetime_to_timestamp (helpers) to Arrow's compute::kernels crate to make it possible to do Cast(Utf8, Timestamp(Nanoseconds, None)) and usage these helpers inside DF for to_timestamp function. 
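To illustrate the end result, here is a minimal sketch of the new behaviour — essentially the `test_cast_string_to_timestamp` test added in this change — casting a Utf8 array to `Timestamp(Nanosecond, None)` through the Arrow cast kernel (the `arrow::compute::kernels::cast::cast` path follows the module layout in this patch; treat the exact import path as an assumption if you are on a different Arrow version):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray};
    use arrow::compute::kernels::cast::cast;
    use arrow::datatypes::{DataType, TimeUnit};

    fn main() {
        // Strings in RFC3339 (or the quasi-RFC3339 variants accepted by
        // string_to_timestamp_nanos); values that fail to parse become nulls.
        let strings = StringArray::from(vec![
            Some("2020-09-08T12:00:00+00:00"),
            Some("Not a valid date"),
            None,
        ]);
        let array = Arc::new(strings) as ArrayRef;

        // Utf8 -> Timestamp(Nanosecond, None), now supported by the cast kernel
        let casted =
            cast(&array, &DataType::Timestamp(TimeUnit::Nanosecond, None)).unwrap();
        let ts = casted
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();

        assert_eq!(1599566400000000000, ts.value(0)); // 2020-09-08T12:00:00Z in nanoseconds since epoch
        assert!(ts.is_null(1)); // unparseable input becomes null
        assert!(ts.is_null(2)); // null stays null
    }

Inside DataFusion, `to_timestamp` keeps its existing behaviour but now delegates per-value parsing to the shared `string_to_timestamp_nanos` helper, mapping any parse failure into a `DataFusionError::ArrowError`.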
Closes #9449 from ovr/issue-11563 Authored-by: Dmitry Patsura Signed-off-by: Andrew Lamb --- rust/arrow/src/compute/kernels/cast.rs | 36 +++ rust/arrow/src/compute/kernels/cast_utils.rs | 299 ++++++++++++++++++ rust/arrow/src/compute/kernels/mod.rs | 1 + rust/arrow/src/error.rs | 2 + .../src/physical_plan/datetime_expressions.rs | 282 +---------------- 5 files changed, 341 insertions(+), 279 deletions(-) create mode 100644 rust/arrow/src/compute/kernels/cast_utils.rs diff --git a/rust/arrow/src/compute/kernels/cast.rs b/rust/arrow/src/compute/kernels/cast.rs index 7099f3025ec..d4874797427 100644 --- a/rust/arrow/src/compute/kernels/cast.rs +++ b/rust/arrow/src/compute/kernels/cast.rs @@ -40,6 +40,7 @@ use std::sync::Arc; use crate::compute::kernels::arithmetic::{divide, multiply}; use crate::compute::kernels::arity::unary; +use crate::compute::kernels::cast_utils::string_to_timestamp_nanos; use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::{array::*, compute::take}; @@ -74,6 +75,7 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool { (Utf8, Date32) => true, (Utf8, Date64) => true, + (Utf8, Timestamp(TimeUnit::Nanosecond, None)) => true, (Utf8, _) => DataType::is_numeric(to_type), (_, Utf8) => DataType::is_numeric(from_type) || from_type == &Binary, @@ -411,6 +413,22 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result { } Ok(Arc::new(builder.finish()) as ArrayRef) } + Timestamp(TimeUnit::Nanosecond, None) => { + let string_array = array.as_any().downcast_ref::().unwrap(); + let mut builder = + PrimitiveBuilder::::new(string_array.len()); + for i in 0..string_array.len() { + if string_array.is_null(i) { + builder.append_null()?; + } else { + match string_to_timestamp_nanos(string_array.value(i)) { + Ok(nanos) => builder.append_value(nanos)?, + Err(_) => builder.append_null()?, // not a valid date + }; + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + } _ => Err(ArrowError::ComputeError(format!( "Casting from {:?} to {:?} not supported", from_type, to_type, @@ -1485,6 +1503,24 @@ mod tests { assert!(c.is_null(2)); } + #[test] + fn test_cast_string_to_timestamp() { + let a = StringArray::from(vec![ + Some("2020-09-08T12:00:00+00:00"), + Some("Not a valid date"), + None, + ]); + let array = Arc::new(a) as ArrayRef; + let b = cast(&array, &DataType::Timestamp(TimeUnit::Nanosecond, None)).unwrap(); + let c = b + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(1599566400000000000, c.value(0)); + assert!(c.is_null(1)); + assert!(c.is_null(2)); + } + #[test] fn test_cast_date32_to_int32() { let a = Date32Array::from(vec![10000, 17890]); diff --git a/rust/arrow/src/compute/kernels/cast_utils.rs b/rust/arrow/src/compute/kernels/cast_utils.rs new file mode 100644 index 00000000000..a06bf421ea4 --- /dev/null +++ b/rust/arrow/src/compute/kernels/cast_utils.rs @@ -0,0 +1,299 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{ArrowError, Result}; +use chrono::{prelude::*, LocalResult}; + +/// Accepts a string in RFC3339 / ISO8601 standard format and some +/// variants and converts it to a nanosecond precision timestamp. +/// +/// Implements the `to_timestamp` function to convert a string to a +/// timestamp, following the model of spark SQL’s to_`timestamp`. +/// +/// In addition to RFC3339 / ISO8601 standard timestamps, it also +/// accepts strings that use a space ` ` to separate the date and time +/// as well as strings that have no explicit timezone offset. +/// +/// Examples of accepted inputs: +/// * `1997-01-31T09:26:56.123Z` # RCF3339 +/// * `1997-01-31T09:26:56.123-05:00` # RCF3339 +/// * `1997-01-31 09:26:56.123-05:00` # close to RCF3339 but with a space rather than T +/// * `1997-01-31T09:26:56.123` # close to RCF3339 but no timezone offset specified +/// * `1997-01-31 09:26:56.123` # close to RCF3339 but uses a space and no timezone offset +/// * `1997-01-31 09:26:56` # close to RCF3339, no fractional seconds +// +/// Internally, this function uses the `chrono` library for the +/// datetime parsing +/// +/// We hope to extend this function in the future with a second +/// parameter to specifying the format string. +/// +/// ## Timestamp Precision +/// +/// Function uses the maximum precision timestamps supported by +/// Arrow (nanoseconds stored as a 64-bit integer) timestamps. This +/// means the range of dates that timestamps can represent is ~1677 AD +/// to 2262 AM +/// +/// +/// ## Timezone / Offset Handling +/// +/// Numerical values of timestamps are stored compared to offset UTC. +/// +/// This function intertprets strings without an explicit time zone as +/// timestamps with offsets of the local time on the machine +/// +/// For example, `1997-01-31 09:26:56.123Z` is interpreted as UTC, as +/// it has an explicit timezone specifier (“Z” for Zulu/UTC) +/// +/// `1997-01-31T09:26:56.123` is interpreted as a local timestamp in +/// the timezone of the machine. 
For example, if +/// the system timezone is set to Americas/New_York (UTC-5) the +/// timestamp will be interpreted as though it were +/// `1997-01-31T09:26:56.123-05:00` +#[inline] +pub fn string_to_timestamp_nanos(s: &str) -> Result { + // Fast path: RFC3339 timestamp (with a T) + // Example: 2020-09-08T13:42:29.190855Z + if let Ok(ts) = DateTime::parse_from_rfc3339(s) { + return Ok(ts.timestamp_nanos()); + } + + // Implement quasi-RFC3339 support by trying to parse the + // timestamp with various other format specifiers to to support + // separating the date and time with a space ' ' rather than 'T' to be + // (more) compatible with Apache Spark SQL + + // timezone offset, using ' ' as a separator + // Example: 2020-09-08 13:42:29.190855-05:00 + if let Ok(ts) = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z") { + return Ok(ts.timestamp_nanos()); + } + + // with an explicit Z, using ' ' as a separator + // Example: 2020-09-08 13:42:29Z + if let Ok(ts) = Utc.datetime_from_str(s, "%Y-%m-%d %H:%M:%S%.fZ") { + return Ok(ts.timestamp_nanos()); + } + + // Support timestamps without an explicit timezone offset, again + // to be compatible with what Apache Spark SQL does. + + // without a timezone specifier as a local time, using T as a separator + // Example: 2020-09-08T13:42:29.190855 + if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f") { + return naive_datetime_to_timestamp(s, ts); + } + + // without a timezone specifier as a local time, using T as a + // separator, no fractional seconds + // Example: 2020-09-08T13:42:29 + if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") { + return naive_datetime_to_timestamp(s, ts); + } + + // without a timezone specifier as a local time, using ' ' as a separator + // Example: 2020-09-08 13:42:29.190855 + if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S.%f") { + return naive_datetime_to_timestamp(s, ts); + } + + // without a timezone specifier as a local time, using ' ' as a + // separator, no fractional seconds + // Example: 2020-09-08 13:42:29 + if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") { + return naive_datetime_to_timestamp(s, ts); + } + + // Note we don't pass along the error message from the underlying + // chrono parsing because we tried several different format + // strings and we don't know which the user was trying to + // match. Ths any of the specific error messages is likely to be + // be more confusing than helpful + Err(ArrowError::CastError(format!( + "Error parsing '{}' as timestamp", + s + ))) +} + +/// Converts the naive datetime (which has no specific timezone) to a +/// nanosecond epoch timestamp relative to UTC. +fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result { + let l = Local {}; + + match l.from_local_datetime(&datetime) { + LocalResult::None => Err(ArrowError::CastError(format!( + "Error parsing '{}' as timestamp: local time representation is invalid", + s + ))), + LocalResult::Single(local_datetime) => { + Ok(local_datetime.with_timezone(&Utc).timestamp_nanos()) + } + // Ambiguous times can happen if the timestamp is exactly when + // a daylight savings time transition occurs, for example, and + // so the datetime could validly be said to be in two + // potential offsets. 
However, since we are about to convert + // to UTC anyways, we can pick one arbitrarily + LocalResult::Ambiguous(local_datetime, _) => { + Ok(local_datetime.with_timezone(&Utc).timestamp_nanos()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn string_to_timestamp_timezone() -> Result<()> { + // Explicit timezone + assert_eq!( + 1599572549190855000, + parse_timestamp("2020-09-08T13:42:29.190855+00:00")? + ); + assert_eq!( + 1599572549190855000, + parse_timestamp("2020-09-08T13:42:29.190855Z")? + ); + assert_eq!( + 1599572549000000000, + parse_timestamp("2020-09-08T13:42:29Z")? + ); // no fractional part + assert_eq!( + 1599590549190855000, + parse_timestamp("2020-09-08T13:42:29.190855-05:00")? + ); + Ok(()) + } + + #[test] + fn string_to_timestamp_timezone_space() -> Result<()> { + // Ensure space rather than T between time and date is accepted + assert_eq!( + 1599572549190855000, + parse_timestamp("2020-09-08 13:42:29.190855+00:00")? + ); + assert_eq!( + 1599572549190855000, + parse_timestamp("2020-09-08 13:42:29.190855Z")? + ); + assert_eq!( + 1599572549000000000, + parse_timestamp("2020-09-08 13:42:29Z")? + ); // no fractional part + assert_eq!( + 1599590549190855000, + parse_timestamp("2020-09-08 13:42:29.190855-05:00")? + ); + Ok(()) + } + + /// Interprets a naive_datetime (with no explicit timzone offset) + /// using the local timezone and returns the timestamp in UTC (0 + /// offset) + fn naive_datetime_to_timestamp(naive_datetime: &NaiveDateTime) -> i64 { + // Note: Use chrono APIs that are different than + // naive_datetime_to_timestamp to compute the utc offset to + // try and double check the logic + let utc_offset_secs = match Local.offset_from_local_datetime(&naive_datetime) { + LocalResult::Single(local_offset) => { + local_offset.fix().local_minus_utc() as i64 + } + _ => panic!("Unexpected failure converting to local datetime"), + }; + let utc_offset_nanos = utc_offset_secs * 1_000_000_000; + naive_datetime.timestamp_nanos() - utc_offset_nanos + } + + #[test] + fn string_to_timestamp_no_timezone() -> Result<()> { + // This test is designed to succeed in regardless of the local + // timezone the test machine is running. Thus it is still + // somewhat suceptable to bugs in the use of chrono + let naive_datetime = NaiveDateTime::new( + NaiveDate::from_ymd(2020, 9, 8), + NaiveTime::from_hms_nano(13, 42, 29, 190855), + ); + + // Ensure both T and ' ' variants work + assert_eq!( + naive_datetime_to_timestamp(&naive_datetime), + parse_timestamp("2020-09-08T13:42:29.190855")? + ); + + assert_eq!( + naive_datetime_to_timestamp(&naive_datetime), + parse_timestamp("2020-09-08 13:42:29.190855")? + ); + + // Also ensure that parsing timestamps with no fractional + // second part works as well + let naive_datetime_whole_secs = NaiveDateTime::new( + NaiveDate::from_ymd(2020, 9, 8), + NaiveTime::from_hms(13, 42, 29), + ); + + // Ensure both T and ' ' variants work + assert_eq!( + naive_datetime_to_timestamp(&naive_datetime_whole_secs), + parse_timestamp("2020-09-08T13:42:29")? + ); + + assert_eq!( + naive_datetime_to_timestamp(&naive_datetime_whole_secs), + parse_timestamp("2020-09-08 13:42:29")? 
+ ); + + Ok(()) + } + + #[test] + fn string_to_timestamp_invalid() { + // Test parsing invalid formats + + // It would be nice to make these messages better + expect_timestamp_parse_error("", "Error parsing '' as timestamp"); + expect_timestamp_parse_error("SS", "Error parsing 'SS' as timestamp"); + expect_timestamp_parse_error( + "Wed, 18 Feb 2015 23:16:09 GMT", + "Error parsing 'Wed, 18 Feb 2015 23:16:09 GMT' as timestamp", + ); + } + + // Parse a timestamp to timestamp int with a useful human readable error message + fn parse_timestamp(s: &str) -> Result { + let result = string_to_timestamp_nanos(s); + if let Err(e) = &result { + eprintln!("Error parsing timestamp '{}': {:?}", s, e); + } + result + } + + fn expect_timestamp_parse_error(s: &str, expected_err: &str) { + match string_to_timestamp_nanos(s) { + Ok(v) => panic!( + "Expected error '{}' while parsing '{}', but parsed {} instead", + expected_err, s, v + ), + Err(e) => { + assert!(e.to_string().contains(expected_err), + "Can not find expected error '{}' while parsing '{}'. Actual error '{}'", + expected_err, s, e); + } + } + } +} diff --git a/rust/arrow/src/compute/kernels/mod.rs b/rust/arrow/src/compute/kernels/mod.rs index 62d3642f7e8..a8d24979e04 100644 --- a/rust/arrow/src/compute/kernels/mod.rs +++ b/rust/arrow/src/compute/kernels/mod.rs @@ -22,6 +22,7 @@ pub mod arithmetic; pub mod arity; pub mod boolean; pub mod cast; +pub mod cast_utils; pub mod comparison; pub mod concat; pub mod filter; diff --git a/rust/arrow/src/error.rs b/rust/arrow/src/error.rs index 2b9b2b577fe..861c7080094 100644 --- a/rust/arrow/src/error.rs +++ b/rust/arrow/src/error.rs @@ -27,6 +27,7 @@ pub enum ArrowError { /// Returned when functionality is not yet available. NotYetImplemented(String), ExternalError(Box), + CastError(String), MemoryError(String), ParseError(String), SchemaError(String), @@ -96,6 +97,7 @@ impl Display for ArrowError { write!(f, "Not yet implemented: {}", &source) } ArrowError::ExternalError(source) => write!(f, "External error: {}", &source), + ArrowError::CastError(desc) => write!(f, "Cast error: {}", desc), ArrowError::MemoryError(desc) => write!(f, "Memory error: {}", desc), ArrowError::ParseError(desc) => write!(f, "Parser error: {}", desc), ArrowError::SchemaError(desc) => write!(f, "Schema error: {}", desc), diff --git a/rust/datafusion/src/physical_plan/datetime_expressions.rs b/rust/datafusion/src/physical_plan/datetime_expressions.rs index a70ebcc4fef..34414586983 100644 --- a/rust/datafusion/src/physical_plan/datetime_expressions.rs +++ b/rust/datafusion/src/physical_plan/datetime_expressions.rs @@ -23,149 +23,11 @@ use crate::error::{DataFusionError, Result}; use arrow::{ array::{Array, ArrayData, ArrayRef, StringArray, TimestampNanosecondArray}, buffer::Buffer, + compute::kernels::cast_utils::string_to_timestamp_nanos, datatypes::{DataType, TimeUnit, ToByteSlice}, }; +use chrono::prelude::*; use chrono::Duration; -use chrono::{prelude::*, LocalResult}; - -#[inline] -/// Accepts a string in RFC3339 / ISO8601 standard format and some -/// variants and converts it to a nanosecond precision timestamp. -/// -/// Implements the `to_timestamp` function to convert a string to a -/// timestamp, following the model of spark SQL’s to_`timestamp`. -/// -/// In addition to RFC3339 / ISO8601 standard timestamps, it also -/// accepts strings that use a space ` ` to separate the date and time -/// as well as strings that have no explicit timezone offset. 
-/// -/// Examples of accepted inputs: -/// * `1997-01-31T09:26:56.123Z` # RCF3339 -/// * `1997-01-31T09:26:56.123-05:00` # RCF3339 -/// * `1997-01-31 09:26:56.123-05:00` # close to RCF3339 but with a space rather than T -/// * `1997-01-31T09:26:56.123` # close to RCF3339 but no timezone offset specified -/// * `1997-01-31 09:26:56.123` # close to RCF3339 but uses a space and no timezone offset -/// * `1997-01-31 09:26:56` # close to RCF3339, no fractional seconds -// -/// Internally, this function uses the `chrono` library for the -/// datetime parsing -/// -/// We hope to extend this function in the future with a second -/// parameter to specifying the format string. -/// -/// ## Timestamp Precision -/// -/// DataFusion uses the maximum precision timestamps supported by -/// Arrow (nanoseconds stored as a 64-bit integer) timestamps. This -/// means the range of dates that timestamps can represent is ~1677 AD -/// to 2262 AM -/// -/// -/// ## Timezone / Offset Handling -/// -/// By using the Arrow format, DataFusion inherits Arrow’s handling of -/// timestamp values. Specifically, the stored numerical values of -/// timestamps are stored compared to offset UTC. -/// -/// This function intertprets strings without an explicit time zone as -/// timestamps with offsets of the local time on the machine that ran -/// the datafusion query -/// -/// For example, `1997-01-31 09:26:56.123Z` is interpreted as UTC, as -/// it has an explicit timezone specifier (“Z” for Zulu/UTC) -/// -/// `1997-01-31T09:26:56.123` is interpreted as a local timestamp in -/// the timezone of the machine that ran DataFusion. For example, if -/// the system timezone is set to Americas/New_York (UTC-5) the -/// timestamp will be interpreted as though it were -/// `1997-01-31T09:26:56.123-05:00` -fn string_to_timestamp_nanos(s: &str) -> Result { - // Fast path: RFC3339 timestamp (with a T) - // Example: 2020-09-08T13:42:29.190855Z - if let Ok(ts) = DateTime::parse_from_rfc3339(s) { - return Ok(ts.timestamp_nanos()); - } - - // Implement quasi-RFC3339 support by trying to parse the - // timestamp with various other format specifiers to to support - // separating the date and time with a space ' ' rather than 'T' to be - // (more) compatible with Apache Spark SQL - - // timezone offset, using ' ' as a separator - // Example: 2020-09-08 13:42:29.190855-05:00 - if let Ok(ts) = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z") { - return Ok(ts.timestamp_nanos()); - } - - // with an explicit Z, using ' ' as a separator - // Example: 2020-09-08 13:42:29Z - if let Ok(ts) = Utc.datetime_from_str(s, "%Y-%m-%d %H:%M:%S%.fZ") { - return Ok(ts.timestamp_nanos()); - } - - // Support timestamps without an explicit timezone offset, again - // to be compatible with what Apache Spark SQL does. 
- - // without a timezone specifier as a local time, using T as a separator - // Example: 2020-09-08T13:42:29.190855 - if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f") { - return naive_datetime_to_timestamp(s, ts); - } - - // without a timezone specifier as a local time, using T as a - // separator, no fractional seconds - // Example: 2020-09-08T13:42:29 - if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") { - return naive_datetime_to_timestamp(s, ts); - } - - // without a timezone specifier as a local time, using ' ' as a separator - // Example: 2020-09-08 13:42:29.190855 - if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S.%f") { - return naive_datetime_to_timestamp(s, ts); - } - - // without a timezone specifier as a local time, using ' ' as a - // separator, no fractional seconds - // Example: 2020-09-08 13:42:29 - if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") { - return naive_datetime_to_timestamp(s, ts); - } - - // Note we don't pass along the error message from the underlying - // chrono parsing because we tried several different format - // strings and we don't know which the user was trying to - // match. Ths any of the specific error messages is likely to be - // be more confusing than helpful - Err(DataFusionError::Execution(format!( - "Error parsing '{}' as timestamp", - s - ))) -} - -/// Converts the naive datetime (which has no specific timezone) to a -/// nanosecond epoch timestamp relative to UTC. -fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result { - let l = Local {}; - - match l.from_local_datetime(&datetime) { - LocalResult::None => Err(DataFusionError::Execution(format!( - "Error parsing '{}' as timestamp: local time representation is invalid", - s - ))), - LocalResult::Single(local_datetime) => { - Ok(local_datetime.with_timezone(&Utc).timestamp_nanos()) - } - // Ambiguous times can happen if the timestamp is exactly when - // a daylight savings time transition occurs, for example, and - // so the datetime could validly be said to be in two - // potential offsets. However, since we are about to convert - // to UTC anyways, we can pick one arbitrarily - LocalResult::Ambiguous(local_datetime, _) => { - Ok(local_datetime.with_timezone(&Utc).timestamp_nanos()) - } - } -} /// convert an array of strings into `Timestamp(Nanosecond, None)` pub fn to_timestamp(args: &[ArrayRef]) -> Result { @@ -189,6 +51,7 @@ pub fn to_timestamp(args: &[ArrayRef]) -> Result { Ok(0) } else { string_to_timestamp_nanos(string_args.value(i)) + .map_err(DataFusionError::ArrowError) } }) .collect::>>()?; @@ -316,145 +179,6 @@ mod tests { use super::*; - #[test] - fn string_to_timestamp_timezone() -> Result<()> { - // Explicit timezone - assert_eq!( - 1599572549190855000, - parse_timestamp("2020-09-08T13:42:29.190855+00:00")? - ); - assert_eq!( - 1599572549190855000, - parse_timestamp("2020-09-08T13:42:29.190855Z")? - ); - assert_eq!( - 1599572549000000000, - parse_timestamp("2020-09-08T13:42:29Z")? - ); // no fractional part - assert_eq!( - 1599590549190855000, - parse_timestamp("2020-09-08T13:42:29.190855-05:00")? - ); - Ok(()) - } - - #[test] - fn string_to_timestamp_timezone_space() -> Result<()> { - // Ensure space rather than T between time and date is accepted - assert_eq!( - 1599572549190855000, - parse_timestamp("2020-09-08 13:42:29.190855+00:00")? - ); - assert_eq!( - 1599572549190855000, - parse_timestamp("2020-09-08 13:42:29.190855Z")? 
- ); - assert_eq!( - 1599572549000000000, - parse_timestamp("2020-09-08 13:42:29Z")? - ); // no fractional part - assert_eq!( - 1599590549190855000, - parse_timestamp("2020-09-08 13:42:29.190855-05:00")? - ); - Ok(()) - } - - /// Interprets a naive_datetime (with no explicit timzone offset) - /// using the local timezone and returns the timestamp in UTC (0 - /// offset) - fn naive_datetime_to_timestamp(naive_datetime: &NaiveDateTime) -> i64 { - // Note: Use chrono APIs that are different than - // naive_datetime_to_timestamp to compute the utc offset to - // try and double check the logic - let utc_offset_secs = match Local.offset_from_local_datetime(&naive_datetime) { - LocalResult::Single(local_offset) => { - local_offset.fix().local_minus_utc() as i64 - } - _ => panic!("Unexpected failure converting to local datetime"), - }; - let utc_offset_nanos = utc_offset_secs * 1_000_000_000; - naive_datetime.timestamp_nanos() - utc_offset_nanos - } - - #[test] - fn string_to_timestamp_no_timezone() -> Result<()> { - // This test is designed to succeed in regardless of the local - // timezone the test machine is running. Thus it is still - // somewhat suceptable to bugs in the use of chrono - let naive_datetime = NaiveDateTime::new( - NaiveDate::from_ymd(2020, 9, 8), - NaiveTime::from_hms_nano(13, 42, 29, 190855), - ); - - // Ensure both T and ' ' variants work - assert_eq!( - naive_datetime_to_timestamp(&naive_datetime), - parse_timestamp("2020-09-08T13:42:29.190855")? - ); - - assert_eq!( - naive_datetime_to_timestamp(&naive_datetime), - parse_timestamp("2020-09-08 13:42:29.190855")? - ); - - // Also ensure that parsing timestamps with no fractional - // second part works as well - let naive_datetime_whole_secs = NaiveDateTime::new( - NaiveDate::from_ymd(2020, 9, 8), - NaiveTime::from_hms(13, 42, 29), - ); - - // Ensure both T and ' ' variants work - assert_eq!( - naive_datetime_to_timestamp(&naive_datetime_whole_secs), - parse_timestamp("2020-09-08T13:42:29")? - ); - - assert_eq!( - naive_datetime_to_timestamp(&naive_datetime_whole_secs), - parse_timestamp("2020-09-08 13:42:29")? - ); - - Ok(()) - } - - #[test] - fn string_to_timestamp_invalid() { - // Test parsing invalid formats - - // It would be nice to make these messages better - expect_timestamp_parse_error("", "Error parsing '' as timestamp"); - expect_timestamp_parse_error("SS", "Error parsing 'SS' as timestamp"); - expect_timestamp_parse_error( - "Wed, 18 Feb 2015 23:16:09 GMT", - "Error parsing 'Wed, 18 Feb 2015 23:16:09 GMT' as timestamp", - ); - } - - // Parse a timestamp to timestamp int with a useful human readable error message - fn parse_timestamp(s: &str) -> Result { - let result = string_to_timestamp_nanos(s); - if let Err(e) = &result { - eprintln!("Error parsing timestamp '{}': {:?}", s, e); - } - result - } - - fn expect_timestamp_parse_error(s: &str, expected_err: &str) { - match string_to_timestamp_nanos(s) { - Ok(v) => panic!( - "Expected error '{}' while parsing '{}', but parsed {} instead", - expected_err, s, v - ), - Err(e) => { - assert!(e.to_string().contains(expected_err), - "Can not find expected error '{}' while parsing '{}'. 
Actual error '{}'", - expected_err, s, e); - } - } - } - #[test] fn to_timestamp_arrays_and_nulls() -> Result<()> { // ensure that arrow array implementation is wired up and handles nulls correctly From d6fee755da3c27f159f01293ed035d841f5f4c03 Mon Sep 17 00:00:00 2001 From: Marc Prud'hommeaux Date: Fri, 12 Feb 2021 06:14:53 -0500 Subject: [PATCH 03/12] ARROW-11557: [Rust][Datafusion] Add deregister_table https://issues.apache.org/jira/browse/ARROW-11557 Table de-registration, as discussed at https://lists.apache.org/thread.html/r0b3bc62a720c204c5bbe26d8157963276f7d61c05fcbad7eaf2ae9ff%40%3Cdev.arrow.apache.org%3E Closes #9445 from marcprux/patch-4 Authored-by: Marc Prud'hommeaux Signed-off-by: Andrew Lamb --- rust/datafusion/src/execution/context.rs | 29 ++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index a6fd020fa0b..2977d9816ca 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -287,6 +287,18 @@ impl ExecutionContext { .insert(name.to_string(), provider.into()); } + /// Deregisters the named table. + /// + /// Returns true if the table was successfully de-reregistered. + pub fn deregister_table(&mut self, name: &str) -> bool { + self.state + .lock() + .unwrap() + .datasources + .remove(&name.to_string()) + .is_some() + } + /// Retrieves a DataFrame representing a table previously registered by calling the /// register_table function. /// @@ -723,6 +735,21 @@ mod tests { Ok(()) } + #[tokio::test] + async fn register_deregister() -> Result<()> { + let tmp_dir = TempDir::new()?; + let partition_count = 4; + let mut ctx = create_ctx(&tmp_dir, partition_count)?; + + let provider = test::create_table_dual(); + ctx.register_table("dual", provider); + + assert_eq!(ctx.deregister_table("dual"), true); + assert_eq!(ctx.deregister_table("dual"), false); + + Ok(()) + } + #[tokio::test] async fn parallel_query_with_filter() -> Result<()> { let tmp_dir = TempDir::new()?; @@ -1668,6 +1695,8 @@ mod tests { assert_eq!(a.value(i) + b.value(i), sum.value(i)); } + ctx.deregister_table("t"); + Ok(()) } From 34e76712a1185f6d80d15e3edc9c193b78c58091 Mon Sep 17 00:00:00 2001 From: Diana Clarke Date: Fri, 12 Feb 2021 07:51:02 -0500 Subject: [PATCH 04/12] ARROW-11539: [Developer][Archery] Change items_per_seconds units Closes #9433 from dianaclarke/ARROW-11539 and squashes the following commits: 4041d49cb ARROW-11539: Change items_per_seconds units Authored-by: Diana Clarke Signed-off-by: David Li --- dev/archery/archery/benchmark/compare.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dev/archery/archery/benchmark/compare.py b/dev/archery/archery/benchmark/compare.py index 474f6d40064..622b8017917 100644 --- a/dev/archery/archery/benchmark/compare.py +++ b/dev/archery/archery/benchmark/compare.py @@ -25,11 +25,11 @@ def items_per_seconds_fmt(value): if value < 1000: return "{} items/sec".format(value) if value < 1000**2: - return "{:.3f}k items/sec".format(value / 1000) + return "{:.3f}K items/sec".format(value / 1000) if value < 1000**3: - return "{:.3f}m items/sec".format(value / 1000**2) + return "{:.3f}M items/sec".format(value / 1000**2) else: - return "{:.3f}b items/sec".format(value / 1000**3) + return "{:.3f}G items/sec".format(value / 1000**3) def bytes_per_seconds_fmt(value): From a350ebc5795166acf7caa6a532aa6494f5d95fa4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 12 Feb 2021 17:28:00 -0500 Subject: [PATCH 
05/12] ARROW-11606: [Rust] [DataFusion] Add input schema to HashAggregateExec To make it easier to implement serde for `HashAggregateExec` we need access to the schema that the aggregate expressions are compiled against. For `Partial` aggregates this is the same as the schema of the input. However, for `Final` aggregates it is not, which is why we need to add this additional schema information. Closes #9481 from andygrove/hash-agg-input-schema Authored-by: Andy Grove Signed-off-by: Andrew Lamb --- .../src/physical_plan/hash_aggregate.rs | 17 +++++++++++- rust/datafusion/src/physical_plan/planner.rs | 26 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index 54d25f17dba..342ca320f86 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -77,10 +77,14 @@ pub struct HashAggregateExec { group_expr: Vec<(Arc, String)>, /// Aggregate expressions aggr_expr: Vec>, - /// Input plan + /// Input plan, could be a partial aggregate or the input to the aggregate input: Arc, /// Schema after the aggregate is applied schema: SchemaRef, + /// Input schema before any aggregation is applied. For partial aggregate this will be the + /// same as input.schema() but for the final aggregate it will be the same as the input + /// to the partial aggregate + input_schema: SchemaRef, } fn create_schema( @@ -123,6 +127,7 @@ impl HashAggregateExec { group_expr: Vec<(Arc, String)>, aggr_expr: Vec>, input: Arc, + input_schema: SchemaRef, ) -> Result { let schema = create_schema(&input.schema(), &group_expr, &aggr_expr, mode)?; @@ -134,6 +139,7 @@ impl HashAggregateExec { aggr_expr, input, schema, + input_schema, }) } @@ -156,6 +162,11 @@ impl HashAggregateExec { pub fn input(&self) -> &Arc { &self.input } + + /// Get the input schema before any aggregates are applied + pub fn input_schema(&self) -> SchemaRef { + self.input_schema.clone() + } } #[async_trait] @@ -217,6 +228,7 @@ impl ExecutionPlan for HashAggregateExec { self.group_expr.clone(), self.aggr_expr.clone(), children[0].clone(), + self.input_schema.clone(), )?)), _ => Err(DataFusionError::Internal( "HashAggregateExec wrong number of children".to_string(), @@ -1048,11 +1060,13 @@ mod tests { DataType::Float64, ))]; + let input_schema = input.schema(); let partial_aggregate = Arc::new(HashAggregateExec::try_new( AggregateMode::Partial, groups.clone(), aggregates.clone(), input, + input_schema.clone(), )?); let result = common::collect(partial_aggregate.execute(0).await?).await?; @@ -1082,6 +1096,7 @@ mod tests { .collect(), aggregates, merge, + input_schema, )?); let result = common::collect(merged_aggregate.execute(0).await?).await?; diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 93bb94327a4..3f2df33d8f8 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -186,6 +186,7 @@ impl DefaultPhysicalPlanner { } => { // Initially need to perform the aggregate and then merge the partitions let input_exec = self.create_physical_plan(input, ctx_state)?; + let input_schema = input_exec.schema(); let physical_input_schema = input_exec.as_ref().schema(); let logical_input_schema = input.as_ref().schema(); @@ -219,6 +220,7 @@ impl DefaultPhysicalPlanner { groups.clone(), aggregates.clone(), input_exec, + input_schema.clone(), )?); let final_group: 
Vec> = @@ -235,6 +237,7 @@ impl DefaultPhysicalPlanner { .collect(), aggregates, initial_aggr, + input_schema, )?)) } LogicalPlan::Projection { input, expr, .. } => { @@ -978,6 +981,29 @@ mod tests { Ok(()) } + #[test] + fn hash_agg_input_schema() -> Result<()> { + let testdata = arrow::util::test_util::arrow_test_data(); + let path = format!("{}/csv/aggregate_test_100.csv", testdata); + + let options = CsvReadOptions::new().schema_infer_max_records(100); + let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)? + .aggregate(&[col("c1")], &[sum(col("c2"))])? + .build()?; + + let execution_plan = plan(&logical_plan)?; + let final_hash_agg = execution_plan + .as_any() + .downcast_ref::() + .expect("hash aggregate"); + assert_eq!("SUM(c2)", final_hash_agg.schema().field(1).name()); + // we need access to the input to the partial aggregate so that other projects can + // implement serde + assert_eq!("c2", final_hash_agg.input_schema().field(1).name()); + + Ok(()) + } + /// An example extension node that doesn't do anything struct NoOpExtensionNode { schema: DFSchemaRef, From 7660a22090fcf1d0230ca1e700a4b98f647b0c48 Mon Sep 17 00:00:00 2001 From: Marc Prud'hommeaux Date: Fri, 12 Feb 2021 17:29:42 -0500 Subject: [PATCH 06/12] ARROW-11586: [Rust][Datafusion] Remove force unwrap Fix for https://issues.apache.org/jira/browse/ARROW-11586 Closes #9479 from marcprux/patch-3 Authored-by: Marc Prud'hommeaux Signed-off-by: Andrew Lamb --- rust/datafusion/src/sql/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 5d505fd2ec6..58310f50856 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -649,7 +649,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .iter() .map(|e| { Ok(Expr::Sort { - expr: Box::new(self.sql_to_rex(&e.expr, &input_schema).unwrap()), + expr: Box::new(self.sql_to_rex(&e.expr, &input_schema)?), // by default asc asc: e.asc.unwrap_or(true), // by default nulls first to be consistent with spark From 3f18f420d6d739ae61ae47d4554156f85676067a Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 13 Feb 2021 17:36:01 +0100 Subject: [PATCH 07/12] CSV reader performance & redesign --- rust/arrow/Cargo.toml | 1 + rust/arrow/examples/read_csv.rs | 2 +- rust/arrow/examples/read_csv_infer_schema.rs | 20 +- rust/arrow/src/csv/reader.rs | 794 +++++++++++-------- rust/arrow/src/csv/writer.rs | 2 +- rust/datafusion/src/physical_plan/csv.rs | 13 +- 6 files changed, 468 insertions(+), 364 deletions(-) diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index 0b14b5bfae8..7e8e678b569 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -42,6 +42,7 @@ serde_json = { version = "1.0", features = ["preserve_order"] } indexmap = "1.6" rand = "0.7" csv = "1.1" +csv-core = "0.1" num = "0.3" regex = "1.3" lazy_static = "1.4" diff --git a/rust/arrow/examples/read_csv.rs b/rust/arrow/examples/read_csv.rs index 9e2b9c34c86..aa5d6135d73 100644 --- a/rust/arrow/examples/read_csv.rs +++ b/rust/arrow/examples/read_csv.rs @@ -36,7 +36,7 @@ fn main() { let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, None, None); let _batch = csv.next().unwrap().unwrap(); - #[cfg(feature = "prettyprint")] + #[cfg(feature = "prett yprint")] { print_batches(&[_batch]).unwrap(); } diff --git a/rust/arrow/examples/read_csv_infer_schema.rs b/rust/arrow/examples/read_csv_infer_schema.rs index 93253e72cff..dcdd02d5953 100644 --- 
a/rust/arrow/examples/read_csv_infer_schema.rs +++ b/rust/arrow/examples/read_csv_infer_schema.rs @@ -23,14 +23,14 @@ use arrow::util::pretty::print_batches; use std::fs::File; fn main() { - let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); - let builder = csv::ReaderBuilder::new() - .has_header(true) - .infer_schema(Some(100)); - let mut csv = builder.build(file).unwrap(); - let _batch = csv.next().unwrap().unwrap(); - #[cfg(feature = "prettyprint")] - { - print_batches(&[_batch]).unwrap(); - } + // let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); + // let builder = csv::ReaderBuilder::new() + // .has_header(true) + // .infer_schema(Some(100)); + // let mut csv = builder.build(file).unwrap(); + // let _batch = csv.next().unwrap().unwrap(); + // #[cfg(feature = "prettyprint")] + // { + // print_batches(&[_batch]).unwrap(); + // } } diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 9ad3691d4fc..a788e56cacc 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -40,7 +40,7 @@ //! let batch = csv.next().unwrap().unwrap(); //! ``` -use core::cmp::min; +use csv_core::ReadFieldResult; use lazy_static::lazy_static; use regex::{Regex, RegexBuilder}; use std::collections::HashSet; @@ -56,7 +56,7 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; -use self::csv_crate::{ByteRecord, StringRecord}; +use self::csv_crate::StringRecord; lazy_static! { static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap(); @@ -66,8 +66,7 @@ lazy_static! { .build() .unwrap(); static ref DATE_RE: Regex = Regex::new(r"^\d{4}-\d\d-\d\d$").unwrap(); - static ref DATETIME_RE: Regex = - Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap(); + static ref DATETIME_RE: Regex = Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap(); } /// Infer the data type of a record @@ -100,7 +99,7 @@ fn infer_field_schema(string: &str) -> DataType { /// /// Return infered schema and number of records used for inference. 
fn infer_file_schema( - reader: &mut R, + reader: R, delimiter: u8, max_read_records: Option, has_header: bool, @@ -207,7 +206,7 @@ pub fn infer_schema_from_files( for fname in files.iter() { let (schema, records_read) = infer_file_schema( - &mut File::open(fname)?, + File::open(fname)?, delimiter, Some(records_to_read), has_header, @@ -235,15 +234,19 @@ pub struct Reader { /// Optional projection for which columns to load (zero-based column indices) projection: Option>, /// File reader - reader: csv_crate::Reader, + reader: R, /// Current line number line_number: usize, /// Maximum number of rows to read end: usize, /// Number of records per batch batch_size: usize, - /// Vector that can hold the `StringRecord`s of the batches - batch_records: Vec, + /// + csv_reader: csv_core::Reader, + /// Vector keeping the offsets of the values + offsets: Vec, + /// Vector keeping the data + data: Vec, } impl fmt::Debug for Reader @@ -307,47 +310,63 @@ impl Reader { bounds: Bounds, projection: Option>, ) -> Self { - let mut reader_builder = csv_crate::ReaderBuilder::new(); - reader_builder.has_headers(has_header); + let mut reader_builder = csv_core::ReaderBuilder::new(); + //reader_builder.has_headers(has_header); if let Some(c) = delimiter { reader_builder.delimiter(c); } - let mut csv_reader = reader_builder.from_reader(reader); - let (start, end) = match bounds { None => (0, usize::MAX), Some((start, end)) => (start, end), }; + //TODO: header, skipping + // First we will skip `start` rows // note that this skips by iteration. This is because in general it is not possible // to seek in CSV. However, skiping still saves the burden of creating arrow arrays, // which is a slow operation that scales with the number of columns - - let mut record = ByteRecord::new(); - // Skip first start items - for _ in 0..start { - let res = csv_reader.read_byte_record(&mut record); - if !res.unwrap_or(false) { - break; - } - } - - // Initialize batch_records with StringRecords so they - // can be reused accross batches - let mut batch_records = Vec::with_capacity(batch_size); - batch_records.resize_with(batch_size, Default::default); + let csv_reader = reader_builder.build(); + // Skip first start items TODO + // let buf_in = &mut [0; 0]; + // let mut csv_reader = csv_core::Reader::new(); + // let mut read = reader.read(buf_in).unwrap(); + // let mut count_records = 0; + // let mut buf_in_start = 0; + // while count_records < start { + // let (result, n_in, _) = + // csv_reader.read_field(&buf_in[buf_in_start..read], &mut [0; 1024]); + // buf_in_start += n_in; + // match result { + // ReadFieldResult::InputEmpty => { + // read = reader.read(buf_in).unwrap(); + // if read == 0 { + // break; + // } + // buf_in_start = 0; + // } + // ReadFieldResult::OutputFull => panic!("field too large"), + // ReadFieldResult::Field { record_end } => { + // if record_end { + // count_records += 1; + // } + // } + // ReadFieldResult::End => break, + // } + // } Self { schema, projection, - reader: csv_reader, + reader, line_number: if has_header { start + 1 } else { start }, batch_size, end, - batch_records, + csv_reader, + offsets: vec![], + data: Vec::new(), } } } @@ -359,22 +378,56 @@ impl Iterator for Reader { let remaining = self.end - self.line_number; let mut read_records = 0; - for i in 0..min(self.batch_size, remaining) { - match self.reader.read_record(&mut self.batch_records[i]) { - Ok(true) => { - read_records += 1; + // TODO, reuse buffers + let buf_in = &mut [0; 100000]; + let csv_buf = &mut [0; 100000]; + let mut 
current_offset = 0; + self.offsets.clear(); + self.data.clear(); + self.offsets.push(0); + let mut read = self.reader.read(buf_in).ok()?; + let mut buf_in_start = 0; + while read_records <= std::cmp::min(self.batch_size, remaining) { + let (result, n_in, n_out) = self + .csv_reader + .read_field(&buf_in[buf_in_start..read], csv_buf); + buf_in_start += n_in; + match result { + ReadFieldResult::InputEmpty | ReadFieldResult::End => { + read = self.reader.read(buf_in).ok()?; + if read == 0 { + if n_out > 0 { + current_offset += n_out; + self.offsets.push(current_offset); + self.data.extend_from_slice(&csv_buf[..n_out]); + read_records += 1; + } + println!("empty!"); + break; + } + buf_in_start = 0; } - Ok(false) => break, - Err(e) => { - return Some(Err(ArrowError::ParseError(format!( - "Error parsing line {}: {:?}", - self.line_number + i, - e - )))) + ReadFieldResult::OutputFull => panic!("field too large"), // TODO: grow buffer + ReadFieldResult::Field { record_end } => { + current_offset += n_out; + self.offsets.push(current_offset); + self.data.extend_from_slice(&csv_buf[..n_out]); + if record_end { + read_records += 1; + } } } } + //println!("{:?}", self.offsets); + + println!( + "read {} {} {}", + read_records, + self.offsets.len(), + self.data.len() + ); + // return early if no data was loaded if read_records == 0 { return None; @@ -382,7 +435,8 @@ impl Iterator for Reader { // parse the batches into a RecordBatch let result = parse( - &self.batch_records[..read_records], + &self.data, + &self.offsets, &self.schema.fields(), &self.projection, self.line_number, @@ -396,7 +450,8 @@ impl Iterator for Reader { /// parses a slice of [csv_crate::StringRecord] into a [array::record_batch::RecordBatch]. fn parse( - rows: &[StringRecord], + row_data: &Vec, + row_offsets: &Vec, fields: &[Field], projection: &Option>, line_number: usize, @@ -412,56 +467,123 @@ fn parse( let i = *i; let field = &fields[i]; match field.data_type() { - &DataType::Boolean => build_boolean_array(line_number, rows, i), - &DataType::Int8 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Int16 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Int32 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Int64 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::UInt8 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::UInt16 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::UInt32 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::UInt64 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Float32 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Float64 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Date32 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Date64 => { - build_primitive_array::(line_number, rows, i) + &DataType::Boolean => { + build_boolean_array(line_number, row_data, row_offsets, i, fields.len()) } + &DataType::Int8 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int16 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt8 => build_primitive_array::( + 
line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt16 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Float32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Float64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Date32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Date64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), &DataType::Timestamp(TimeUnit::Microsecond, _) => { build_primitive_array::( line_number, - rows, + row_data, + row_offsets, i, + fields.len(), ) } &DataType::Timestamp(TimeUnit::Nanosecond, _) => { - build_primitive_array::(line_number, rows, i) + build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ) } - &DataType::Utf8 => Ok(Arc::new( - rows.iter().map(|row| row.get(i)).collect::(), - ) as ArrayRef), + &DataType::Utf8 => Ok(Arc::new(StringArray::from_iter_values( + row_offsets + .windows(2) + .skip(i) + .step_by(fields.len()) + .map(|window| { + let str = std::str::from_utf8(&row_data[window[0]..window[1]]) + .unwrap() + .trim_matches('"'); + str + }), + )) as ArrayRef), other => Err(ArrowError::ParseError(format!( "Unsupported data type {:?}", other @@ -470,8 +592,7 @@ fn parse( }) .collect(); - let projected_fields: Vec = - projection.iter().map(|i| fields[*i].clone()).collect(); + let projected_fields: Vec = projection.iter().map(|i| fields[*i].clone()).collect(); let projected_schema = Arc::new(Schema::new(projected_fields)); @@ -569,10 +690,10 @@ fn parse_item(string: &str) -> Option { T::parse(string) } -fn parse_bool(string: &str) -> Option { - if string.eq_ignore_ascii_case("false") { +fn parse_bool(string: &[u8]) -> Option { + if string.eq_ignore_ascii_case(b"false") { Some(false) - } else if string.eq_ignore_ascii_case("true") { + } else if string.eq_ignore_ascii_case(b"true") { Some(true) } else { None @@ -582,31 +703,34 @@ fn parse_bool(string: &str) -> Option { // parses a specific column (col_idx) into an Arrow Array. fn build_primitive_array( line_number: usize, - rows: &[StringRecord], + data: &Vec, + offsets: &Vec, col_idx: usize, + num_cols: usize, ) -> Result { - rows.iter() + offsets + .windows(2) + .skip(col_idx) + .step_by(num_cols) .enumerate() - .map(|(row_index, row)| { - match row.get(col_idx) { - Some(s) => { - if s.is_empty() { - return Ok(None); - } + .map(|(row_index, window)| { + let str = &data[window[0]..window[1]]; + if str.is_empty() { + return Ok(None); + } - let parsed = parse_item::(s); - match parsed { - Some(e) => Ok(Some(e)), - None => Err(ArrowError::ParseError(format!( - // TODO: we should surface the underlying error here. 
- "Error while parsing value {} for column {} at line {}", - s, - col_idx, - line_number + row_index - ))), - } - } - None => Ok(None), + let str = std::str::from_utf8(str).map_err(|_| ArrowError::ParseError(format!("x")))?; + println!("{}, {} {} {}", str, T::DATA_TYPE, window[0], window[1]); + let parsed = parse_item::(str); + match parsed { + Some(e) => Ok(Some(e)), + None => Err(ArrowError::ParseError(format!( + // TODO: we should surface the underlying error here. + "Error while parsing primitive value {} for column {} at line {}", + str, + col_idx, + line_number + row_index + ))), } }) .collect::>>() @@ -616,31 +740,31 @@ fn build_primitive_array( // parses a specific column (col_idx) into an Arrow Array. fn build_boolean_array( line_number: usize, - rows: &[StringRecord], + data: &Vec, + offsets: &Vec, col_idx: usize, + num_cols: usize, ) -> Result { - rows.iter() + offsets + .windows(2) + .skip(col_idx) + .step_by(num_cols) .enumerate() - .map(|(row_index, row)| { - match row.get(col_idx) { - Some(s) => { - if s.is_empty() { - return Ok(None); - } - - let parsed = parse_bool(s); - match parsed { - Some(e) => Ok(Some(e)), - None => Err(ArrowError::ParseError(format!( - // TODO: we should surface the underlying error here. - "Error while parsing value {} for column {} at line {}", - s, - col_idx, - line_number + row_index - ))), - } - } - None => Ok(None), + .map(|(row_index, window)| { + let str = &data[window[0]..window[1]]; + if str.is_empty() { + return Ok(None); + } + let parsed = parse_bool(str); + match parsed { + Some(e) => Ok(Some(e)), + None => Err(ArrowError::ParseError(format!( + // TODO: we should surface the underlying error here. + "Error while parsing boolean value {:?} for column {} at line {}", + str, + col_idx, + line_number + row_index + ))), } }) .collect::>() @@ -756,33 +880,28 @@ impl ReaderBuilder { self } - /// Create a new `Reader` from the `ReaderBuilder` - pub fn build(self, mut reader: R) -> Result> { - // check if schema should be inferred - let delimiter = self.delimiter.unwrap_or(b','); - let schema = match self.schema { - Some(schema) => schema, - None => { - let (inferred_schema, _) = infer_file_schema( - &mut reader, - delimiter, - self.max_records, - self.has_header, - )?; - - Arc::new(inferred_schema) - } - }; - Ok(Reader::from_reader( - reader, - schema, - self.has_header, - self.delimiter, - self.batch_size, - None, - self.projection.clone(), - )) - } + // pub fn build(self, mut reader: R) -> Result> { + // // check if schema should be inferred + // let delimiter = self.delimiter.unwrap_or(b','); + // let schema = match self.schema { + // Some(schema) => schema, + // None => { + // let (inferred_schema, _) = + // infer_file_schema(&mut reader, delimiter, self.max_records, self.has_header)?; + + // Arc::new(inferred_schema) + // } + // }; + // Ok(Reader::from_reader( + // &reader, + // schema, + // self.has_header, + // self.delimiter, + // self.batch_size, + // None, + // self.projection.clone(), + // )) + // } } #[cfg(test)] @@ -846,98 +965,90 @@ mod tests { Field::new("lng", DataType::Float64, false), ]); - let file_with_headers = - File::open("test/data/uk_cities_with_headers.csv").unwrap(); + let file_with_headers = File::open("test/data/uk_cities_with_headers.csv").unwrap(); let file_without_headers = File::open("test/data/uk_cities.csv").unwrap(); let both_files = file_with_headers .chain(Cursor::new("\n".to_string())) .chain(file_without_headers); - let mut csv = Reader::from_reader( - both_files, - Arc::new(schema), - true, - None, - 
1024, - None, - None, - ); + let mut csv = + Reader::from_reader(both_files, Arc::new(schema), true, None, 1024, None, None); let batch = csv.next().unwrap().unwrap(); assert_eq!(74, batch.num_rows()); assert_eq!(3, batch.num_columns()); } - #[test] - fn test_csv_with_schema_inference() { - let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); - - let builder = ReaderBuilder::new().has_header(true).infer_schema(None); - - let mut csv = builder.build(file).unwrap(); - let expected_schema = Schema::new(vec![ - Field::new("city", DataType::Utf8, false), - Field::new("lat", DataType::Float64, false), - Field::new("lng", DataType::Float64, false), - ]); - assert_eq!(Arc::new(expected_schema), csv.schema()); - let batch = csv.next().unwrap().unwrap(); - assert_eq!(37, batch.num_rows()); - assert_eq!(3, batch.num_columns()); - - // access data from a primitive array - let lat = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert!(57.653484 - lat.value(0) < f64::EPSILON); - - // access data from a string array (ListArray) - let city = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); - } - - #[test] - fn test_csv_with_schema_inference_no_headers() { - let file = File::open("test/data/uk_cities.csv").unwrap(); - - let builder = ReaderBuilder::new().infer_schema(None); - - let mut csv = builder.build(file).unwrap(); - - // csv field names should be 'column_{number}' - let schema = csv.schema(); - assert_eq!("column_1", schema.field(0).name()); - assert_eq!("column_2", schema.field(1).name()); - assert_eq!("column_3", schema.field(2).name()); - let batch = csv.next().unwrap().unwrap(); - let batch_schema = batch.schema(); - - assert_eq!(schema, batch_schema); - assert_eq!(37, batch.num_rows()); - assert_eq!(3, batch.num_columns()); - - // access data from a primitive array - let lat = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert!(57.653484 - lat.value(0) < f64::EPSILON); - - // access data from a string array (ListArray) - let city = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); - } + // #[test] + // fn test_csv_with_schema_inference() { + // let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); + + // let builder = ReaderBuilder::new().has_header(true).infer_schema(None); + + // let mut csv = builder.build(file).unwrap(); + // let expected_schema = Schema::new(vec![ + // Field::new("city", DataType::Utf8, false), + // Field::new("lat", DataType::Float64, false), + // Field::new("lng", DataType::Float64, false), + // ]); + // assert_eq!(Arc::new(expected_schema), csv.schema()); + // let batch = csv.next().unwrap().unwrap(); + // assert_eq!(37, batch.num_rows()); + // assert_eq!(3, batch.num_columns()); + + // // access data from a primitive array + // let lat = batch + // .column(1) + // .as_any() + // .downcast_ref::() + // .unwrap(); + // assert!(57.653484 - lat.value(0) < f64::EPSILON); + + // // access data from a string array (ListArray) + // let city = batch + // .column(0) + // .as_any() + // .downcast_ref::() + // .unwrap(); + + // assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); + // } + + // #[test] + // fn test_csv_with_schema_inference_no_headers() { + // let file = File::open("test/data/uk_cities.csv").unwrap(); + + // let builder = ReaderBuilder::new().infer_schema(None); + + // let mut csv = builder.build(file).unwrap(); + + // // csv 
field names should be 'column_{number}' + // let schema = csv.schema(); + // assert_eq!("column_1", schema.field(0).name()); + // assert_eq!("column_2", schema.field(1).name()); + // assert_eq!("column_3", schema.field(2).name()); + // let batch = csv.next().unwrap().unwrap(); + // let batch_schema = batch.schema(); + + // assert_eq!(schema, batch_schema); + // assert_eq!(37, batch.num_rows()); + // assert_eq!(3, batch.num_columns()); + + // // access data from a primitive array + // let lat = batch + // .column(1) + // .as_any() + // .downcast_ref::() + // .unwrap(); + // assert!(57.653484 - lat.value(0) < f64::EPSILON); + + // // access data from a string array (ListArray) + // let city = batch + // .column(0) + // .as_any() + // .downcast_ref::() + // .unwrap(); + + // assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); + // } #[test] fn test_csv_with_projection() { @@ -989,90 +1100,89 @@ mod tests { assert_eq!(false, batch.column(1).is_null(4)); } - #[test] - fn test_nulls_with_inference() { - let file = File::open("test/data/various_types.csv").unwrap(); - - let builder = ReaderBuilder::new() - .infer_schema(None) - .has_header(true) - .with_delimiter(b'|') - .with_batch_size(512) - .with_projection(vec![0, 1, 2, 3, 4, 5]); - - let mut csv = builder.build(file).unwrap(); - let batch = csv.next().unwrap().unwrap(); - - assert_eq!(5, batch.num_rows()); - assert_eq!(6, batch.num_columns()); - - let schema = batch.schema(); - - assert_eq!(&DataType::Int64, schema.field(0).data_type()); - assert_eq!(&DataType::Float64, schema.field(1).data_type()); - assert_eq!(&DataType::Float64, schema.field(2).data_type()); - assert_eq!(&DataType::Boolean, schema.field(3).data_type()); - assert_eq!(&DataType::Date32, schema.field(4).data_type()); - assert_eq!(&DataType::Date64, schema.field(5).data_type()); - - let names: Vec<&str> = - schema.fields().iter().map(|x| x.name().as_str()).collect(); - assert_eq!( - names, - vec![ - "c_int", - "c_float", - "c_string", - "c_bool", - "c_date", - "c_datetime" - ] - ); - - assert_eq!(false, schema.field(0).is_nullable()); - assert_eq!(true, schema.field(1).is_nullable()); - assert_eq!(true, schema.field(2).is_nullable()); - assert_eq!(false, schema.field(3).is_nullable()); - assert_eq!(true, schema.field(4).is_nullable()); - assert_eq!(true, schema.field(5).is_nullable()); - - assert_eq!(false, batch.column(1).is_null(0)); - assert_eq!(false, batch.column(1).is_null(1)); - assert_eq!(true, batch.column(1).is_null(2)); - assert_eq!(false, batch.column(1).is_null(3)); - assert_eq!(false, batch.column(1).is_null(4)); - } - - #[test] - fn test_parse_invalid_csv() { - let file = File::open("test/data/various_types_invalid.csv").unwrap(); - - let schema = Schema::new(vec![ - Field::new("c_int", DataType::UInt64, false), - Field::new("c_float", DataType::Float32, false), - Field::new("c_string", DataType::Utf8, false), - Field::new("c_bool", DataType::Boolean, false), - ]); - - let builder = ReaderBuilder::new() - .with_schema(Arc::new(schema)) - .has_header(true) - .with_delimiter(b'|') - .with_batch_size(512) - .with_projection(vec![0, 1, 2, 3]); - - let mut csv = builder.build(file).unwrap(); - match csv.next() { - Some(e) => match e { - Err(e) => assert_eq!( - "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")", - format!("{:?}", e) - ), - Ok(_) => panic!("should have failed"), - }, - None => panic!("should have failed"), - } - } + // #[test] + // fn test_nulls_with_inference() { + // let file = 
File::open("test/data/various_types.csv").unwrap(); + + // let builder = ReaderBuilder::new() + // .infer_schema(None) + // .has_header(true) + // .with_delimiter(b'|') + // .with_batch_size(512) + // .with_projection(vec![0, 1, 2, 3, 4, 5]); + + // let mut csv = builder.build(file).unwrap(); + // let batch = csv.next().unwrap().unwrap(); + + // assert_eq!(5, batch.num_rows()); + // assert_eq!(6, batch.num_columns()); + + // let schema = batch.schema(); + + // assert_eq!(&DataType::Int64, schema.field(0).data_type()); + // assert_eq!(&DataType::Float64, schema.field(1).data_type()); + // assert_eq!(&DataType::Float64, schema.field(2).data_type()); + // assert_eq!(&DataType::Boolean, schema.field(3).data_type()); + // assert_eq!(&DataType::Date32, schema.field(4).data_type()); + // assert_eq!(&DataType::Date64, schema.field(5).data_type()); + + // let names: Vec<&str> = schema.fields().iter().map(|x| x.name().as_str()).collect(); + // assert_eq!( + // names, + // vec![ + // "c_int", + // "c_float", + // "c_string", + // "c_bool", + // "c_date", + // "c_datetime" + // ] + // ); + + // assert_eq!(false, schema.field(0).is_nullable()); + // assert_eq!(true, schema.field(1).is_nullable()); + // assert_eq!(true, schema.field(2).is_nullable()); + // assert_eq!(false, schema.field(3).is_nullable()); + // assert_eq!(true, schema.field(4).is_nullable()); + // assert_eq!(true, schema.field(5).is_nullable()); + + // assert_eq!(false, batch.column(1).is_null(0)); + // assert_eq!(false, batch.column(1).is_null(1)); + // assert_eq!(true, batch.column(1).is_null(2)); + // assert_eq!(false, batch.column(1).is_null(3)); + // assert_eq!(false, batch.column(1).is_null(4)); + // } + + // #[test] + // fn test_parse_invalid_csv() { + // let file = File::open("test/data/various_types_invalid.csv").unwrap(); + + // let schema = Schema::new(vec![ + // Field::new("c_int", DataType::UInt64, false), + // Field::new("c_float", DataType::Float32, false), + // Field::new("c_string", DataType::Utf8, false), + // Field::new("c_bool", DataType::Boolean, false), + // ]); + + // let builder = ReaderBuilder::new() + // .with_schema(Arc::new(schema)) + // .has_header(true) + // .with_delimiter(b'|') + // .with_batch_size(512) + // .with_projection(vec![0, 1, 2, 3]); + + // let mut csv = builder.build(file).unwrap(); + // match csv.next() { + // Some(e) => match e { + // Err(e) => assert_eq!( + // "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")", + // format!("{:?}", e) + // ), + // Ok(_) => panic!("should have failed"), + // }, + // None => panic!("should have failed"), + // } + // } #[test] fn test_infer_field_schema() { @@ -1172,10 +1282,10 @@ mod tests { .join("\n"); let data = data.as_bytes(); - let reader = std::io::Cursor::new(data); + let mut reader = std::io::Cursor::new(data); let mut csv = Reader::new( - reader, + &mut reader, Arc::new(schema), false, None, @@ -1201,21 +1311,21 @@ mod tests { #[test] fn test_parsing_bool() { // Encode the expected behavior of boolean parsing - assert_eq!(Some(true), parse_bool("true")); - assert_eq!(Some(true), parse_bool("tRUe")); - assert_eq!(Some(true), parse_bool("True")); - assert_eq!(Some(true), parse_bool("TRUE")); - assert_eq!(None, parse_bool("t")); - assert_eq!(None, parse_bool("T")); - assert_eq!(None, parse_bool("")); - - assert_eq!(Some(false), parse_bool("false")); - assert_eq!(Some(false), parse_bool("fALse")); - assert_eq!(Some(false), parse_bool("False")); - assert_eq!(Some(false), parse_bool("FALSE")); - assert_eq!(None, parse_bool("f")); - 
assert_eq!(None, parse_bool("F")); - assert_eq!(None, parse_bool("")); + assert_eq!(Some(true), parse_bool(b"true")); + assert_eq!(Some(true), parse_bool(b"tRUe")); + assert_eq!(Some(true), parse_bool(b"True")); + assert_eq!(Some(true), parse_bool(b"TRUE")); + assert_eq!(None, parse_bool(b"t")); + assert_eq!(None, parse_bool(b"T")); + assert_eq!(None, parse_bool(b"")); + + assert_eq!(Some(false), parse_bool(b"false")); + assert_eq!(Some(false), parse_bool(b"fALse")); + assert_eq!(Some(false), parse_bool(b"False")); + assert_eq!(Some(false), parse_bool(b"FALSE")); + assert_eq!(None, parse_bool(b"f")); + assert_eq!(None, parse_bool(b"F")); + assert_eq!(None, parse_bool(b"")); } #[test] diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index e9d8565b2a5..f38f71e5ec6 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -628,7 +628,7 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03\n"; buf.set_position(0); let mut reader = Reader::new( - buf, + &mut buf, Arc::new(schema), false, None, diff --git a/rust/datafusion/src/physical_plan/csv.rs b/rust/datafusion/src/physical_plan/csv.rs index d62d18756f8..6740b04627e 100644 --- a/rust/datafusion/src/physical_plan/csv.rs +++ b/rust/datafusion/src/physical_plan/csv.rs @@ -213,10 +213,7 @@ impl CsvExec { } /// Infer schema for given CSV dataset - pub fn try_infer_schema( - filenames: &[String], - options: &CsvReadOptions, - ) -> Result { + pub fn try_infer_schema(filenames: &[String], options: &CsvReadOptions) -> Result { Ok(csv::infer_schema_from_files( filenames, options.delimiter, @@ -308,10 +305,7 @@ impl CsvStream { impl Stream for CsvStream { type Item = ArrowResult; - fn poll_next( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(self.reader.next()) } } @@ -362,8 +356,7 @@ mod tests { let testdata = arrow::util::test_util::arrow_test_data(); let filename = "aggregate_test_100.csv"; let path = format!("{}/csv/{}", testdata, filename); - let csv = - CsvExec::try_new(&path, CsvReadOptions::new().schema(&schema), None, 1024)?; + let csv = CsvExec::try_new(&path, CsvReadOptions::new().schema(&schema), None, 1024)?; assert_eq!(13, csv.schema.fields().len()); assert_eq!(13, csv.projected_schema.fields().len()); assert_eq!(13, csv.file_schema().fields().len()); From 6035f6940d8fffc5987b945998fd828b0566b799 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 13 Feb 2021 17:45:04 +0100 Subject: [PATCH 08/12] Fmt --- rust/arrow/src/csv/reader.rs | 265 ++++++++++++++++++----------------- 1 file changed, 140 insertions(+), 125 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index a788e56cacc..37bd0298716 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -66,7 +66,8 @@ lazy_static! 
{ .build() .unwrap(); static ref DATE_RE: Regex = Regex::new(r"^\d{4}-\d\d-\d\d$").unwrap(); - static ref DATETIME_RE: Regex = Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap(); + static ref DATETIME_RE: Regex = + Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap(); } /// Infer the data type of a record @@ -461,138 +462,143 @@ fn parse( None => fields.iter().enumerate().map(|(i, _)| i).collect(), }; - let arrays: Result> = projection - .iter() - .map(|i| { - let i = *i; - let field = &fields[i]; - match field.data_type() { - &DataType::Boolean => { - build_boolean_array(line_number, row_data, row_offsets, i, fields.len()) - } - &DataType::Int8 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Int16 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Int32 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Int64 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::UInt8 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::UInt16 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::UInt32 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::UInt64 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Float32 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Float64 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Date32 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Date64 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Timestamp(TimeUnit::Microsecond, _) => { - build_primitive_array::( + let arrays: Result> = + projection + .iter() + .map(|i| { + let i = *i; + let field = &fields[i]; + match field.data_type() { + &DataType::Boolean => build_boolean_array( line_number, row_data, row_offsets, i, fields.len(), - ) - } - &DataType::Timestamp(TimeUnit::Nanosecond, _) => { - build_primitive_array::( + ), + &DataType::Int8 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int16 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt8 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt16 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Float32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Float64 => build_primitive_array::( + line_number, + row_data, + 
row_offsets, + i, + fields.len(), + ), + &DataType::Date32 => build_primitive_array::( line_number, row_data, row_offsets, i, fields.len(), - ) + ), + &DataType::Date64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Timestamp(TimeUnit::Microsecond, _) => { + build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ) + } + &DataType::Timestamp(TimeUnit::Nanosecond, _) => { + build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ) + } + &DataType::Utf8 => Ok(Arc::new(StringArray::from_iter_values( + row_offsets.windows(2).skip(i).step_by(fields.len()).map( + |window| { + let str = + std::str::from_utf8(&row_data[window[0]..window[1]]) + .unwrap() + .trim_matches('"'); + str + }, + ), + )) as ArrayRef), + other => Err(ArrowError::ParseError(format!( + "Unsupported data type {:?}", + other + ))), } - &DataType::Utf8 => Ok(Arc::new(StringArray::from_iter_values( - row_offsets - .windows(2) - .skip(i) - .step_by(fields.len()) - .map(|window| { - let str = std::str::from_utf8(&row_data[window[0]..window[1]]) - .unwrap() - .trim_matches('"'); - str - }), - )) as ArrayRef), - other => Err(ArrowError::ParseError(format!( - "Unsupported data type {:?}", - other - ))), - } - }) - .collect(); + }) + .collect(); - let projected_fields: Vec = projection.iter().map(|i| fields[*i].clone()).collect(); + let projected_fields: Vec = + projection.iter().map(|i| fields[*i].clone()).collect(); let projected_schema = Arc::new(Schema::new(projected_fields)); @@ -719,7 +725,8 @@ fn build_primitive_array( return Ok(None); } - let str = std::str::from_utf8(str).map_err(|_| ArrowError::ParseError(format!("x")))?; + let str = std::str::from_utf8(str) + .map_err(|_| ArrowError::ParseError(format!("x")))?; println!("{}, {} {} {}", str, T::DATA_TYPE, window[0], window[1]); let parsed = parse_item::(str); match parsed { @@ -965,13 +972,21 @@ mod tests { Field::new("lng", DataType::Float64, false), ]); - let file_with_headers = File::open("test/data/uk_cities_with_headers.csv").unwrap(); + let file_with_headers = + File::open("test/data/uk_cities_with_headers.csv").unwrap(); let file_without_headers = File::open("test/data/uk_cities.csv").unwrap(); let both_files = file_with_headers .chain(Cursor::new("\n".to_string())) .chain(file_without_headers); - let mut csv = - Reader::from_reader(both_files, Arc::new(schema), true, None, 1024, None, None); + let mut csv = Reader::from_reader( + both_files, + Arc::new(schema), + true, + None, + 1024, + None, + None, + ); let batch = csv.next().unwrap().unwrap(); assert_eq!(74, batch.num_rows()); assert_eq!(3, batch.num_columns()); From 6e750e1eb83be92837d61fee7251ab26a0aaeec4 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 13 Feb 2021 17:45:45 +0100 Subject: [PATCH 09/12] Spacing --- rust/arrow/examples/read_csv.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/arrow/examples/read_csv.rs b/rust/arrow/examples/read_csv.rs index aa5d6135d73..9e2b9c34c86 100644 --- a/rust/arrow/examples/read_csv.rs +++ b/rust/arrow/examples/read_csv.rs @@ -36,7 +36,7 @@ fn main() { let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, None, None); let _batch = csv.next().unwrap().unwrap(); - #[cfg(feature = "prett yprint")] + #[cfg(feature = "prettyprint")] { print_batches(&[_batch]).unwrap(); } From cf2bd060c83c5f8db35c620ca6899047400603b1 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 13 Feb 2021 17:50:45 
+0100 Subject: [PATCH 10/12] Include header in start rows --- rust/arrow/src/csv/reader.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 37bd0298716..09ba0fa33f4 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -312,18 +312,18 @@ impl Reader { projection: Option>, ) -> Self { let mut reader_builder = csv_core::ReaderBuilder::new(); - //reader_builder.has_headers(has_header); + let header_row = if has_header { 1 } else { 0 }; if let Some(c) = delimiter { reader_builder.delimiter(c); } let (start, end) = match bounds { - None => (0, usize::MAX), - Some((start, end)) => (start, end), + None => (0 + header_row, usize::MAX), + Some((start, end)) => (start + header_row, end), }; - //TODO: header, skipping + //TODO: header + skipping // First we will skip `start` rows // note that this skips by iteration. This is because in general it is not possible From 6c9f261f4ce0427755de0350f57a49947bf1033d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 13 Feb 2021 18:25:07 +0100 Subject: [PATCH 11/12] Use `read_record` API --- rust/arrow/src/csv/reader.rs | 301 +++++++++++++++++------------------ 1 file changed, 149 insertions(+), 152 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 09ba0fa33f4..49f7dd5f09d 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -40,7 +40,7 @@ //! let batch = csv.next().unwrap().unwrap(); //! ``` -use csv_core::ReadFieldResult; +use csv_core::ReadRecordResult; use lazy_static::lazy_static; use regex::{Regex, RegexBuilder}; use std::collections::HashSet; @@ -66,8 +66,7 @@ lazy_static! { .build() .unwrap(); static ref DATE_RE: Regex = Regex::new(r"^\d{4}-\d\d-\d\d$").unwrap(); - static ref DATETIME_RE: Regex = - Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap(); + static ref DATETIME_RE: Regex = Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap(); } /// Infer the data type of a record @@ -388,19 +387,24 @@ impl Iterator for Reader { self.offsets.push(0); let mut read = self.reader.read(buf_in).ok()?; let mut buf_in_start = 0; + let mut ends = vec![0; self.schema().fields().len()]; while read_records <= std::cmp::min(self.batch_size, remaining) { - let (result, n_in, n_out) = self - .csv_reader - .read_field(&buf_in[buf_in_start..read], csv_buf); + let (result, n_in, n_out, n_ends) = + self.csv_reader + .read_record(&buf_in[buf_in_start..read], csv_buf, &mut ends); buf_in_start += n_in; match result { - ReadFieldResult::InputEmpty | ReadFieldResult::End => { + ReadRecordResult::InputEmpty | ReadRecordResult::End => { read = self.reader.read(buf_in).ok()?; if read == 0 { if n_out > 0 { + self.data.extend_from_slice(&csv_buf[..n_out]); + + for end in ends.iter().take(n_ends) { + self.offsets.push(current_offset + end); + } current_offset += n_out; self.offsets.push(current_offset); - self.data.extend_from_slice(&csv_buf[..n_out]); read_records += 1; } println!("empty!"); @@ -408,14 +412,20 @@ impl Iterator for Reader { } buf_in_start = 0; } - ReadFieldResult::OutputFull => panic!("field too large"), // TODO: grow buffer - ReadFieldResult::Field { record_end } => { - current_offset += n_out; - self.offsets.push(current_offset); + ReadRecordResult::OutputFull => panic!("field too large"), // TODO: grow buffer + ReadRecordResult::Record => { + assert!(n_ends == ends.len()); + self.data.extend_from_slice(&csv_buf[..n_out]); - if record_end { - read_records += 1; + 
assert!(n_ends == ends.len()); + for end in ends.iter().take(n_ends) { + self.offsets.push(current_offset + end); } + current_offset += n_out; + read_records += 1; + } + ReadRecordResult::OutputEndsFull => { + panic!("more ends!") } } } @@ -462,143 +472,138 @@ fn parse( None => fields.iter().enumerate().map(|(i, _)| i).collect(), }; - let arrays: Result> = - projection - .iter() - .map(|i| { - let i = *i; - let field = &fields[i]; - match field.data_type() { - &DataType::Boolean => build_boolean_array( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Int8 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Int16 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Int32 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Int64 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::UInt8 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::UInt16 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::UInt32 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::UInt64 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Float32 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Float64 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Date32 => build_primitive_array::( + let arrays: Result> = projection + .iter() + .map(|i| { + let i = *i; + let field = &fields[i]; + match field.data_type() { + &DataType::Boolean => { + build_boolean_array(line_number, row_data, row_offsets, i, fields.len()) + } + &DataType::Int8 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int16 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt8 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt16 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Float32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Float64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Date32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Date64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Timestamp(TimeUnit::Microsecond, _) => { + build_primitive_array::( line_number, row_data, row_offsets, i, fields.len(), - ), - 
&DataType::Date64 => build_primitive_array::( + ) + } + &DataType::Timestamp(TimeUnit::Nanosecond, _) => { + build_primitive_array::( line_number, row_data, row_offsets, i, fields.len(), - ), - &DataType::Timestamp(TimeUnit::Microsecond, _) => { - build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ) - } - &DataType::Timestamp(TimeUnit::Nanosecond, _) => { - build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ) - } - &DataType::Utf8 => Ok(Arc::new(StringArray::from_iter_values( - row_offsets.windows(2).skip(i).step_by(fields.len()).map( - |window| { - let str = - std::str::from_utf8(&row_data[window[0]..window[1]]) - .unwrap() - .trim_matches('"'); - str - }, - ), - )) as ArrayRef), - other => Err(ArrowError::ParseError(format!( - "Unsupported data type {:?}", - other - ))), + ) } - }) - .collect(); + &DataType::Utf8 => Ok(Arc::new(StringArray::from_iter_values( + row_offsets + .windows(2) + .skip(i) + .step_by(fields.len()) + .map(|window| { + let str = std::str::from_utf8(&row_data[window[0]..window[1]]) + .unwrap() + .trim_matches('"'); + str + }), + )) as ArrayRef), + other => Err(ArrowError::ParseError(format!( + "Unsupported data type {:?}", + other + ))), + } + }) + .collect(); - let projected_fields: Vec = - projection.iter().map(|i| fields[*i].clone()).collect(); + let projected_fields: Vec = projection.iter().map(|i| fields[*i].clone()).collect(); let projected_schema = Arc::new(Schema::new(projected_fields)); @@ -725,8 +730,7 @@ fn build_primitive_array( return Ok(None); } - let str = std::str::from_utf8(str) - .map_err(|_| ArrowError::ParseError(format!("x")))?; + let str = std::str::from_utf8(str).map_err(|_| ArrowError::ParseError(format!("x")))?; println!("{}, {} {} {}", str, T::DATA_TYPE, window[0], window[1]); let parsed = parse_item::(str); match parsed { @@ -972,21 +976,13 @@ mod tests { Field::new("lng", DataType::Float64, false), ]); - let file_with_headers = - File::open("test/data/uk_cities_with_headers.csv").unwrap(); + let file_with_headers = File::open("test/data/uk_cities_with_headers.csv").unwrap(); let file_without_headers = File::open("test/data/uk_cities.csv").unwrap(); let both_files = file_with_headers .chain(Cursor::new("\n".to_string())) .chain(file_without_headers); - let mut csv = Reader::from_reader( - both_files, - Arc::new(schema), - true, - None, - 1024, - None, - None, - ); + let mut csv = + Reader::from_reader(both_files, Arc::new(schema), true, None, 1024, None, None); let batch = csv.next().unwrap().unwrap(); assert_eq!(74, batch.num_rows()); assert_eq!(3, batch.num_columns()); @@ -1101,6 +1097,7 @@ mod tests { Field::new("c_int", DataType::UInt64, false), Field::new("c_float", DataType::Float32, false), Field::new("c_string", DataType::Utf8, false), + Field::new("c_bool", DataType::Boolean, false), ]); let file = File::open("test/data/null_test.csv").unwrap(); From 2770c6914824ee7919ba1e308d17996d6472b730 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 13 Feb 2021 19:04:32 +0100 Subject: [PATCH 12/12] Allow for extra terminator --- rust/arrow/src/csv/reader.rs | 288 ++++++++++++++++++----------------- 1 file changed, 152 insertions(+), 136 deletions(-) diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 49f7dd5f09d..bdcffae0794 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -66,7 +66,8 @@ lazy_static! 
{ .build() .unwrap(); static ref DATE_RE: Regex = Regex::new(r"^\d{4}-\d\d-\d\d$").unwrap(); - static ref DATETIME_RE: Regex = Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap(); + static ref DATETIME_RE: Regex = + Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap(); } /// Infer the data type of a record @@ -387,20 +388,25 @@ impl Iterator for Reader { self.offsets.push(0); let mut read = self.reader.read(buf_in).ok()?; let mut buf_in_start = 0; - let mut ends = vec![0; self.schema().fields().len()]; + // +1 to allow trailing delimiter + let num_ends = self.schema().fields().len(); + let mut ends = vec![0; self.schema().fields().len() + 1]; while read_records <= std::cmp::min(self.batch_size, remaining) { - let (result, n_in, n_out, n_ends) = - self.csv_reader - .read_record(&buf_in[buf_in_start..read], csv_buf, &mut ends); + let (result, n_in, n_out, n_ends) = self.csv_reader.read_record( + &buf_in[buf_in_start..read], + csv_buf, + &mut ends, + ); buf_in_start += n_in; match result { ReadRecordResult::InputEmpty | ReadRecordResult::End => { read = self.reader.read(buf_in).ok()?; if read == 0 { if n_out > 0 { + // One record (without terminator) self.data.extend_from_slice(&csv_buf[..n_out]); - for end in ends.iter().take(n_ends) { + for end in ends.iter().take(num_ends - 1) { self.offsets.push(current_offset + end); } current_offset += n_out; @@ -414,11 +420,9 @@ impl Iterator for Reader { } ReadRecordResult::OutputFull => panic!("field too large"), // TODO: grow buffer ReadRecordResult::Record => { - assert!(n_ends == ends.len()); - + assert!(n_ends >= num_ends); self.data.extend_from_slice(&csv_buf[..n_out]); - assert!(n_ends == ends.len()); - for end in ends.iter().take(n_ends) { + for end in ends.iter().take(num_ends) { self.offsets.push(current_offset + end); } current_offset += n_out; @@ -430,8 +434,6 @@ impl Iterator for Reader { } } - //println!("{:?}", self.offsets); - println!( "read {} {} {}", read_records, @@ -472,138 +474,143 @@ fn parse( None => fields.iter().enumerate().map(|(i, _)| i).collect(), }; - let arrays: Result> = projection - .iter() - .map(|i| { - let i = *i; - let field = &fields[i]; - match field.data_type() { - &DataType::Boolean => { - build_boolean_array(line_number, row_data, row_offsets, i, fields.len()) - } - &DataType::Int8 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Int16 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Int32 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Int64 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::UInt8 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::UInt16 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::UInt32 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::UInt64 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Float32 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Float64 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Date32 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - 
fields.len(), - ), - &DataType::Date64 => build_primitive_array::( - line_number, - row_data, - row_offsets, - i, - fields.len(), - ), - &DataType::Timestamp(TimeUnit::Microsecond, _) => { - build_primitive_array::( + let arrays: Result> = + projection + .iter() + .map(|i| { + let i = *i; + let field = &fields[i]; + match field.data_type() { + &DataType::Boolean => build_boolean_array( line_number, row_data, row_offsets, i, fields.len(), - ) - } - &DataType::Timestamp(TimeUnit::Nanosecond, _) => { - build_primitive_array::( + ), + &DataType::Int8 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int16 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt8 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt16 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Float32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Float64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Date32 => build_primitive_array::( line_number, row_data, row_offsets, i, fields.len(), - ) + ), + &DataType::Date64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Timestamp(TimeUnit::Microsecond, _) => { + build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ) + } + &DataType::Timestamp(TimeUnit::Nanosecond, _) => { + build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ) + } + &DataType::Utf8 => Ok(Arc::new(StringArray::from_iter_values( + row_offsets.windows(2).skip(i).step_by(fields.len()).map( + |window| { + let str = + std::str::from_utf8(&row_data[window[0]..window[1]]) + .unwrap() + .trim_matches('"'); + str + }, + ), + )) as ArrayRef), + other => Err(ArrowError::ParseError(format!( + "Unsupported data type {:?}", + other + ))), } - &DataType::Utf8 => Ok(Arc::new(StringArray::from_iter_values( - row_offsets - .windows(2) - .skip(i) - .step_by(fields.len()) - .map(|window| { - let str = std::str::from_utf8(&row_data[window[0]..window[1]]) - .unwrap() - .trim_matches('"'); - str - }), - )) as ArrayRef), - other => Err(ArrowError::ParseError(format!( - "Unsupported data type {:?}", - other - ))), - } - }) - .collect(); + }) + .collect(); - let projected_fields: Vec = projection.iter().map(|i| fields[*i].clone()).collect(); + let projected_fields: Vec = + projection.iter().map(|i| fields[*i].clone()).collect(); let projected_schema = Arc::new(Schema::new(projected_fields)); @@ -730,7 +737,8 @@ fn build_primitive_array( return Ok(None); } - let str = std::str::from_utf8(str).map_err(|_| ArrowError::ParseError(format!("x")))?; + let str = std::str::from_utf8(str) + .map_err(|_| ArrowError::ParseError(format!("x")))?; println!("{}, {} {} {}", str, T::DATA_TYPE, window[0], window[1]); 
let parsed = parse_item::(str); match parsed { @@ -976,13 +984,21 @@ mod tests { Field::new("lng", DataType::Float64, false), ]); - let file_with_headers = File::open("test/data/uk_cities_with_headers.csv").unwrap(); + let file_with_headers = + File::open("test/data/uk_cities_with_headers.csv").unwrap(); let file_without_headers = File::open("test/data/uk_cities.csv").unwrap(); let both_files = file_with_headers .chain(Cursor::new("\n".to_string())) .chain(file_without_headers); - let mut csv = - Reader::from_reader(both_files, Arc::new(schema), true, None, 1024, None, None); + let mut csv = Reader::from_reader( + both_files, + Arc::new(schema), + true, + None, + 1024, + None, + None, + ); let batch = csv.next().unwrap().unwrap(); assert_eq!(74, batch.num_rows()); assert_eq!(3, batch.num_columns());
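
A note on the data layout these patches converge on: PATCH 11/12 switches the reader to csv_core's `read_record`, appending every parsed field's bytes to one flat `data` buffer and recording one running end offset per field in `offsets`; `build_primitive_array`, `build_boolean_array`, and the Utf8 branch then walk a single column with `offsets.windows(2).skip(col_idx).step_by(num_cols)`. What follows is a minimal standalone sketch of that offset arithmetic only; the helper name `column_values` and the sample data are illustrative and do not appear in the patches themselves.

// Row-major field layout: `data` holds all field bytes back to back and
// `offsets` holds a leading 0 followed by one end offset per field.
// Column `col_idx` of a `num_cols`-wide table is every `num_cols`-th
// window of adjacent offsets, starting at `col_idx`.
fn column_values<'a>(
    data: &'a [u8],
    offsets: &'a [usize],
    col_idx: usize,
    num_cols: usize,
) -> impl Iterator<Item = &'a [u8]> + 'a {
    offsets
        .windows(2)
        .skip(col_idx)
        .step_by(num_cols)
        .map(move |w| &data[w[0]..w[1]])
}

fn main() {
    // Two rows of a three-column CSV, already flattened:
    // ("1", "foo", "true") and ("2", "bar", "false").
    let data = b"1footrue2barfalse".to_vec();
    let offsets = vec![0, 1, 4, 8, 9, 12, 17];
    let col1: Vec<&str> = column_values(&data, &offsets, 1, 3)
        .map(|bytes| std::str::from_utf8(bytes).unwrap())
        .collect();
    assert_eq!(col1, vec!["foo", "bar"]);
}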