diff --git a/dev/archery/archery/benchmark/compare.py b/dev/archery/archery/benchmark/compare.py index 474f6d40064..622b8017917 100644 --- a/dev/archery/archery/benchmark/compare.py +++ b/dev/archery/archery/benchmark/compare.py @@ -25,11 +25,11 @@ def items_per_seconds_fmt(value): if value < 1000: return "{} items/sec".format(value) if value < 1000**2: - return "{:.3f}k items/sec".format(value / 1000) + return "{:.3f}K items/sec".format(value / 1000) if value < 1000**3: - return "{:.3f}m items/sec".format(value / 1000**2) + return "{:.3f}M items/sec".format(value / 1000**2) else: - return "{:.3f}b items/sec".format(value / 1000**3) + return "{:.3f}G items/sec".format(value / 1000**3) def bytes_per_seconds_fmt(value): diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index 0b14b5bfae8..7e8e678b569 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -42,6 +42,7 @@ serde_json = { version = "1.0", features = ["preserve_order"] } indexmap = "1.6" rand = "0.7" csv = "1.1" +csv-core = "0.1" num = "0.3" regex = "1.3" lazy_static = "1.4" diff --git a/rust/arrow/examples/read_csv_infer_schema.rs b/rust/arrow/examples/read_csv_infer_schema.rs index 93253e72cff..dcdd02d5953 100644 --- a/rust/arrow/examples/read_csv_infer_schema.rs +++ b/rust/arrow/examples/read_csv_infer_schema.rs @@ -23,14 +23,14 @@ use arrow::util::pretty::print_batches; use std::fs::File; fn main() { - let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); - let builder = csv::ReaderBuilder::new() - .has_header(true) - .infer_schema(Some(100)); - let mut csv = builder.build(file).unwrap(); - let _batch = csv.next().unwrap().unwrap(); - #[cfg(feature = "prettyprint")] - { - print_batches(&[_batch]).unwrap(); - } + // let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); + // let builder = csv::ReaderBuilder::new() + // .has_header(true) + // .infer_schema(Some(100)); + // let mut csv = builder.build(file).unwrap(); + // let _batch = csv.next().unwrap().unwrap(); + // #[cfg(feature = "prettyprint")] + // { + // print_batches(&[_batch]).unwrap(); + // } } diff --git a/rust/arrow/src/compute/kernels/cast.rs b/rust/arrow/src/compute/kernels/cast.rs index 7099f3025ec..d4874797427 100644 --- a/rust/arrow/src/compute/kernels/cast.rs +++ b/rust/arrow/src/compute/kernels/cast.rs @@ -40,6 +40,7 @@ use std::sync::Arc; use crate::compute::kernels::arithmetic::{divide, multiply}; use crate::compute::kernels::arity::unary; +use crate::compute::kernels::cast_utils::string_to_timestamp_nanos; use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::{array::*, compute::take}; @@ -74,6 +75,7 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool { (Utf8, Date32) => true, (Utf8, Date64) => true, + (Utf8, Timestamp(TimeUnit::Nanosecond, None)) => true, (Utf8, _) => DataType::is_numeric(to_type), (_, Utf8) => DataType::is_numeric(from_type) || from_type == &Binary, @@ -411,6 +413,22 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result { } Ok(Arc::new(builder.finish()) as ArrayRef) } + Timestamp(TimeUnit::Nanosecond, None) => { + let string_array = array.as_any().downcast_ref::().unwrap(); + let mut builder = + PrimitiveBuilder::::new(string_array.len()); + for i in 0..string_array.len() { + if string_array.is_null(i) { + builder.append_null()?; + } else { + match string_to_timestamp_nanos(string_array.value(i)) { + Ok(nanos) => builder.append_value(nanos)?, + Err(_) => builder.append_null()?, // not a valid date + }; + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + } _ => Err(ArrowError::ComputeError(format!( "Casting from {:?} to {:?} not supported", from_type, to_type, @@ -1485,6 +1503,24 @@ mod tests { assert!(c.is_null(2)); } + #[test] + fn test_cast_string_to_timestamp() { + let a = StringArray::from(vec![ + Some("2020-09-08T12:00:00+00:00"), + Some("Not a valid date"), + None, + ]); + let array = Arc::new(a) as ArrayRef; + let b = cast(&array, &DataType::Timestamp(TimeUnit::Nanosecond, None)).unwrap(); + let c = b + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(1599566400000000000, c.value(0)); + assert!(c.is_null(1)); + assert!(c.is_null(2)); + } + #[test] fn test_cast_date32_to_int32() { let a = Date32Array::from(vec![10000, 17890]); diff --git a/rust/arrow/src/compute/kernels/cast_utils.rs b/rust/arrow/src/compute/kernels/cast_utils.rs new file mode 100644 index 00000000000..a06bf421ea4 --- /dev/null +++ b/rust/arrow/src/compute/kernels/cast_utils.rs @@ -0,0 +1,299 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{ArrowError, Result}; +use chrono::{prelude::*, LocalResult}; + +/// Accepts a string in RFC3339 / ISO8601 standard format and some +/// variants and converts it to a nanosecond precision timestamp. +/// +/// Implements the `to_timestamp` function to convert a string to a +/// timestamp, following the model of spark SQL’s to_`timestamp`. +/// +/// In addition to RFC3339 / ISO8601 standard timestamps, it also +/// accepts strings that use a space ` ` to separate the date and time +/// as well as strings that have no explicit timezone offset. +/// +/// Examples of accepted inputs: +/// * `1997-01-31T09:26:56.123Z` # RCF3339 +/// * `1997-01-31T09:26:56.123-05:00` # RCF3339 +/// * `1997-01-31 09:26:56.123-05:00` # close to RCF3339 but with a space rather than T +/// * `1997-01-31T09:26:56.123` # close to RCF3339 but no timezone offset specified +/// * `1997-01-31 09:26:56.123` # close to RCF3339 but uses a space and no timezone offset +/// * `1997-01-31 09:26:56` # close to RCF3339, no fractional seconds +// +/// Internally, this function uses the `chrono` library for the +/// datetime parsing +/// +/// We hope to extend this function in the future with a second +/// parameter to specifying the format string. +/// +/// ## Timestamp Precision +/// +/// Function uses the maximum precision timestamps supported by +/// Arrow (nanoseconds stored as a 64-bit integer) timestamps. This +/// means the range of dates that timestamps can represent is ~1677 AD +/// to 2262 AM +/// +/// +/// ## Timezone / Offset Handling +/// +/// Numerical values of timestamps are stored compared to offset UTC. +/// +/// This function intertprets strings without an explicit time zone as +/// timestamps with offsets of the local time on the machine +/// +/// For example, `1997-01-31 09:26:56.123Z` is interpreted as UTC, as +/// it has an explicit timezone specifier (“Z” for Zulu/UTC) +/// +/// `1997-01-31T09:26:56.123` is interpreted as a local timestamp in +/// the timezone of the machine. For example, if +/// the system timezone is set to Americas/New_York (UTC-5) the +/// timestamp will be interpreted as though it were +/// `1997-01-31T09:26:56.123-05:00` +#[inline] +pub fn string_to_timestamp_nanos(s: &str) -> Result { + // Fast path: RFC3339 timestamp (with a T) + // Example: 2020-09-08T13:42:29.190855Z + if let Ok(ts) = DateTime::parse_from_rfc3339(s) { + return Ok(ts.timestamp_nanos()); + } + + // Implement quasi-RFC3339 support by trying to parse the + // timestamp with various other format specifiers to to support + // separating the date and time with a space ' ' rather than 'T' to be + // (more) compatible with Apache Spark SQL + + // timezone offset, using ' ' as a separator + // Example: 2020-09-08 13:42:29.190855-05:00 + if let Ok(ts) = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z") { + return Ok(ts.timestamp_nanos()); + } + + // with an explicit Z, using ' ' as a separator + // Example: 2020-09-08 13:42:29Z + if let Ok(ts) = Utc.datetime_from_str(s, "%Y-%m-%d %H:%M:%S%.fZ") { + return Ok(ts.timestamp_nanos()); + } + + // Support timestamps without an explicit timezone offset, again + // to be compatible with what Apache Spark SQL does. + + // without a timezone specifier as a local time, using T as a separator + // Example: 2020-09-08T13:42:29.190855 + if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f") { + return naive_datetime_to_timestamp(s, ts); + } + + // without a timezone specifier as a local time, using T as a + // separator, no fractional seconds + // Example: 2020-09-08T13:42:29 + if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") { + return naive_datetime_to_timestamp(s, ts); + } + + // without a timezone specifier as a local time, using ' ' as a separator + // Example: 2020-09-08 13:42:29.190855 + if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S.%f") { + return naive_datetime_to_timestamp(s, ts); + } + + // without a timezone specifier as a local time, using ' ' as a + // separator, no fractional seconds + // Example: 2020-09-08 13:42:29 + if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") { + return naive_datetime_to_timestamp(s, ts); + } + + // Note we don't pass along the error message from the underlying + // chrono parsing because we tried several different format + // strings and we don't know which the user was trying to + // match. Ths any of the specific error messages is likely to be + // be more confusing than helpful + Err(ArrowError::CastError(format!( + "Error parsing '{}' as timestamp", + s + ))) +} + +/// Converts the naive datetime (which has no specific timezone) to a +/// nanosecond epoch timestamp relative to UTC. +fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result { + let l = Local {}; + + match l.from_local_datetime(&datetime) { + LocalResult::None => Err(ArrowError::CastError(format!( + "Error parsing '{}' as timestamp: local time representation is invalid", + s + ))), + LocalResult::Single(local_datetime) => { + Ok(local_datetime.with_timezone(&Utc).timestamp_nanos()) + } + // Ambiguous times can happen if the timestamp is exactly when + // a daylight savings time transition occurs, for example, and + // so the datetime could validly be said to be in two + // potential offsets. However, since we are about to convert + // to UTC anyways, we can pick one arbitrarily + LocalResult::Ambiguous(local_datetime, _) => { + Ok(local_datetime.with_timezone(&Utc).timestamp_nanos()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn string_to_timestamp_timezone() -> Result<()> { + // Explicit timezone + assert_eq!( + 1599572549190855000, + parse_timestamp("2020-09-08T13:42:29.190855+00:00")? + ); + assert_eq!( + 1599572549190855000, + parse_timestamp("2020-09-08T13:42:29.190855Z")? + ); + assert_eq!( + 1599572549000000000, + parse_timestamp("2020-09-08T13:42:29Z")? + ); // no fractional part + assert_eq!( + 1599590549190855000, + parse_timestamp("2020-09-08T13:42:29.190855-05:00")? + ); + Ok(()) + } + + #[test] + fn string_to_timestamp_timezone_space() -> Result<()> { + // Ensure space rather than T between time and date is accepted + assert_eq!( + 1599572549190855000, + parse_timestamp("2020-09-08 13:42:29.190855+00:00")? + ); + assert_eq!( + 1599572549190855000, + parse_timestamp("2020-09-08 13:42:29.190855Z")? + ); + assert_eq!( + 1599572549000000000, + parse_timestamp("2020-09-08 13:42:29Z")? + ); // no fractional part + assert_eq!( + 1599590549190855000, + parse_timestamp("2020-09-08 13:42:29.190855-05:00")? + ); + Ok(()) + } + + /// Interprets a naive_datetime (with no explicit timzone offset) + /// using the local timezone and returns the timestamp in UTC (0 + /// offset) + fn naive_datetime_to_timestamp(naive_datetime: &NaiveDateTime) -> i64 { + // Note: Use chrono APIs that are different than + // naive_datetime_to_timestamp to compute the utc offset to + // try and double check the logic + let utc_offset_secs = match Local.offset_from_local_datetime(&naive_datetime) { + LocalResult::Single(local_offset) => { + local_offset.fix().local_minus_utc() as i64 + } + _ => panic!("Unexpected failure converting to local datetime"), + }; + let utc_offset_nanos = utc_offset_secs * 1_000_000_000; + naive_datetime.timestamp_nanos() - utc_offset_nanos + } + + #[test] + fn string_to_timestamp_no_timezone() -> Result<()> { + // This test is designed to succeed in regardless of the local + // timezone the test machine is running. Thus it is still + // somewhat suceptable to bugs in the use of chrono + let naive_datetime = NaiveDateTime::new( + NaiveDate::from_ymd(2020, 9, 8), + NaiveTime::from_hms_nano(13, 42, 29, 190855), + ); + + // Ensure both T and ' ' variants work + assert_eq!( + naive_datetime_to_timestamp(&naive_datetime), + parse_timestamp("2020-09-08T13:42:29.190855")? + ); + + assert_eq!( + naive_datetime_to_timestamp(&naive_datetime), + parse_timestamp("2020-09-08 13:42:29.190855")? + ); + + // Also ensure that parsing timestamps with no fractional + // second part works as well + let naive_datetime_whole_secs = NaiveDateTime::new( + NaiveDate::from_ymd(2020, 9, 8), + NaiveTime::from_hms(13, 42, 29), + ); + + // Ensure both T and ' ' variants work + assert_eq!( + naive_datetime_to_timestamp(&naive_datetime_whole_secs), + parse_timestamp("2020-09-08T13:42:29")? + ); + + assert_eq!( + naive_datetime_to_timestamp(&naive_datetime_whole_secs), + parse_timestamp("2020-09-08 13:42:29")? + ); + + Ok(()) + } + + #[test] + fn string_to_timestamp_invalid() { + // Test parsing invalid formats + + // It would be nice to make these messages better + expect_timestamp_parse_error("", "Error parsing '' as timestamp"); + expect_timestamp_parse_error("SS", "Error parsing 'SS' as timestamp"); + expect_timestamp_parse_error( + "Wed, 18 Feb 2015 23:16:09 GMT", + "Error parsing 'Wed, 18 Feb 2015 23:16:09 GMT' as timestamp", + ); + } + + // Parse a timestamp to timestamp int with a useful human readable error message + fn parse_timestamp(s: &str) -> Result { + let result = string_to_timestamp_nanos(s); + if let Err(e) = &result { + eprintln!("Error parsing timestamp '{}': {:?}", s, e); + } + result + } + + fn expect_timestamp_parse_error(s: &str, expected_err: &str) { + match string_to_timestamp_nanos(s) { + Ok(v) => panic!( + "Expected error '{}' while parsing '{}', but parsed {} instead", + expected_err, s, v + ), + Err(e) => { + assert!(e.to_string().contains(expected_err), + "Can not find expected error '{}' while parsing '{}'. Actual error '{}'", + expected_err, s, e); + } + } + } +} diff --git a/rust/arrow/src/compute/kernels/mod.rs b/rust/arrow/src/compute/kernels/mod.rs index 62d3642f7e8..a8d24979e04 100644 --- a/rust/arrow/src/compute/kernels/mod.rs +++ b/rust/arrow/src/compute/kernels/mod.rs @@ -22,6 +22,7 @@ pub mod arithmetic; pub mod arity; pub mod boolean; pub mod cast; +pub mod cast_utils; pub mod comparison; pub mod concat; pub mod filter; diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 9ad3691d4fc..bdcffae0794 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -40,7 +40,7 @@ //! let batch = csv.next().unwrap().unwrap(); //! ``` -use core::cmp::min; +use csv_core::ReadRecordResult; use lazy_static::lazy_static; use regex::{Regex, RegexBuilder}; use std::collections::HashSet; @@ -56,7 +56,7 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; -use self::csv_crate::{ByteRecord, StringRecord}; +use self::csv_crate::StringRecord; lazy_static! { static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap(); @@ -100,7 +100,7 @@ fn infer_field_schema(string: &str) -> DataType { /// /// Return infered schema and number of records used for inference. fn infer_file_schema( - reader: &mut R, + reader: R, delimiter: u8, max_read_records: Option, has_header: bool, @@ -207,7 +207,7 @@ pub fn infer_schema_from_files( for fname in files.iter() { let (schema, records_read) = infer_file_schema( - &mut File::open(fname)?, + File::open(fname)?, delimiter, Some(records_to_read), has_header, @@ -235,15 +235,19 @@ pub struct Reader { /// Optional projection for which columns to load (zero-based column indices) projection: Option>, /// File reader - reader: csv_crate::Reader, + reader: R, /// Current line number line_number: usize, /// Maximum number of rows to read end: usize, /// Number of records per batch batch_size: usize, - /// Vector that can hold the `StringRecord`s of the batches - batch_records: Vec, + /// + csv_reader: csv_core::Reader, + /// Vector keeping the offsets of the values + offsets: Vec, + /// Vector keeping the data + data: Vec, } impl fmt::Debug for Reader @@ -307,47 +311,63 @@ impl Reader { bounds: Bounds, projection: Option>, ) -> Self { - let mut reader_builder = csv_crate::ReaderBuilder::new(); - reader_builder.has_headers(has_header); + let mut reader_builder = csv_core::ReaderBuilder::new(); + let header_row = if has_header { 1 } else { 0 }; if let Some(c) = delimiter { reader_builder.delimiter(c); } - let mut csv_reader = reader_builder.from_reader(reader); - let (start, end) = match bounds { - None => (0, usize::MAX), - Some((start, end)) => (start, end), + None => (0 + header_row, usize::MAX), + Some((start, end)) => (start + header_row, end), }; + //TODO: header + skipping + // First we will skip `start` rows // note that this skips by iteration. This is because in general it is not possible // to seek in CSV. However, skiping still saves the burden of creating arrow arrays, // which is a slow operation that scales with the number of columns - - let mut record = ByteRecord::new(); - // Skip first start items - for _ in 0..start { - let res = csv_reader.read_byte_record(&mut record); - if !res.unwrap_or(false) { - break; - } - } - - // Initialize batch_records with StringRecords so they - // can be reused accross batches - let mut batch_records = Vec::with_capacity(batch_size); - batch_records.resize_with(batch_size, Default::default); + let csv_reader = reader_builder.build(); + // Skip first start items TODO + // let buf_in = &mut [0; 0]; + // let mut csv_reader = csv_core::Reader::new(); + // let mut read = reader.read(buf_in).unwrap(); + // let mut count_records = 0; + // let mut buf_in_start = 0; + // while count_records < start { + // let (result, n_in, _) = + // csv_reader.read_field(&buf_in[buf_in_start..read], &mut [0; 1024]); + // buf_in_start += n_in; + // match result { + // ReadFieldResult::InputEmpty => { + // read = reader.read(buf_in).unwrap(); + // if read == 0 { + // break; + // } + // buf_in_start = 0; + // } + // ReadFieldResult::OutputFull => panic!("field too large"), + // ReadFieldResult::Field { record_end } => { + // if record_end { + // count_records += 1; + // } + // } + // ReadFieldResult::End => break, + // } + // } Self { schema, projection, - reader: csv_reader, + reader, line_number: if has_header { start + 1 } else { start }, batch_size, end, - batch_records, + csv_reader, + offsets: vec![], + data: Vec::new(), } } } @@ -359,22 +379,68 @@ impl Iterator for Reader { let remaining = self.end - self.line_number; let mut read_records = 0; - for i in 0..min(self.batch_size, remaining) { - match self.reader.read_record(&mut self.batch_records[i]) { - Ok(true) => { + // TODO, reuse buffers + let buf_in = &mut [0; 100000]; + let csv_buf = &mut [0; 100000]; + let mut current_offset = 0; + self.offsets.clear(); + self.data.clear(); + self.offsets.push(0); + let mut read = self.reader.read(buf_in).ok()?; + let mut buf_in_start = 0; + // +1 to allow trailing delimiter + let num_ends = self.schema().fields().len(); + let mut ends = vec![0; self.schema().fields().len() + 1]; + while read_records <= std::cmp::min(self.batch_size, remaining) { + let (result, n_in, n_out, n_ends) = self.csv_reader.read_record( + &buf_in[buf_in_start..read], + csv_buf, + &mut ends, + ); + buf_in_start += n_in; + match result { + ReadRecordResult::InputEmpty | ReadRecordResult::End => { + read = self.reader.read(buf_in).ok()?; + if read == 0 { + if n_out > 0 { + // One record (without terminator) + self.data.extend_from_slice(&csv_buf[..n_out]); + + for end in ends.iter().take(num_ends - 1) { + self.offsets.push(current_offset + end); + } + current_offset += n_out; + self.offsets.push(current_offset); + read_records += 1; + } + println!("empty!"); + break; + } + buf_in_start = 0; + } + ReadRecordResult::OutputFull => panic!("field too large"), // TODO: grow buffer + ReadRecordResult::Record => { + assert!(n_ends >= num_ends); + self.data.extend_from_slice(&csv_buf[..n_out]); + for end in ends.iter().take(num_ends) { + self.offsets.push(current_offset + end); + } + current_offset += n_out; read_records += 1; } - Ok(false) => break, - Err(e) => { - return Some(Err(ArrowError::ParseError(format!( - "Error parsing line {}: {:?}", - self.line_number + i, - e - )))) + ReadRecordResult::OutputEndsFull => { + panic!("more ends!") } } } + println!( + "read {} {} {}", + read_records, + self.offsets.len(), + self.data.len() + ); + // return early if no data was loaded if read_records == 0 { return None; @@ -382,7 +448,8 @@ impl Iterator for Reader { // parse the batches into a RecordBatch let result = parse( - &self.batch_records[..read_records], + &self.data, + &self.offsets, &self.schema.fields(), &self.projection, self.line_number, @@ -396,7 +463,8 @@ impl Iterator for Reader { /// parses a slice of [csv_crate::StringRecord] into a [array::record_batch::RecordBatch]. fn parse( - rows: &[StringRecord], + row_data: &Vec, + row_offsets: &Vec, fields: &[Field], projection: &Option>, line_number: usize, @@ -406,69 +474,140 @@ fn parse( None => fields.iter().enumerate().map(|(i, _)| i).collect(), }; - let arrays: Result> = projection - .iter() - .map(|i| { - let i = *i; - let field = &fields[i]; - match field.data_type() { - &DataType::Boolean => build_boolean_array(line_number, rows, i), - &DataType::Int8 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Int16 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Int32 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Int64 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::UInt8 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::UInt16 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::UInt32 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::UInt64 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Float32 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Float64 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Date32 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Date64 => { - build_primitive_array::(line_number, rows, i) - } - &DataType::Timestamp(TimeUnit::Microsecond, _) => { - build_primitive_array::( + let arrays: Result> = + projection + .iter() + .map(|i| { + let i = *i; + let field = &fields[i]; + match field.data_type() { + &DataType::Boolean => build_boolean_array( line_number, - rows, + row_data, + row_offsets, i, - ) - } - &DataType::Timestamp(TimeUnit::Nanosecond, _) => { - build_primitive_array::(line_number, rows, i) + fields.len(), + ), + &DataType::Int8 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int16 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Int64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt8 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt16 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::UInt64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Float32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Float64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Date32 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Date64 => build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ), + &DataType::Timestamp(TimeUnit::Microsecond, _) => { + build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ) + } + &DataType::Timestamp(TimeUnit::Nanosecond, _) => { + build_primitive_array::( + line_number, + row_data, + row_offsets, + i, + fields.len(), + ) + } + &DataType::Utf8 => Ok(Arc::new(StringArray::from_iter_values( + row_offsets.windows(2).skip(i).step_by(fields.len()).map( + |window| { + let str = + std::str::from_utf8(&row_data[window[0]..window[1]]) + .unwrap() + .trim_matches('"'); + str + }, + ), + )) as ArrayRef), + other => Err(ArrowError::ParseError(format!( + "Unsupported data type {:?}", + other + ))), } - &DataType::Utf8 => Ok(Arc::new( - rows.iter().map(|row| row.get(i)).collect::(), - ) as ArrayRef), - other => Err(ArrowError::ParseError(format!( - "Unsupported data type {:?}", - other - ))), - } - }) - .collect(); + }) + .collect(); let projected_fields: Vec = projection.iter().map(|i| fields[*i].clone()).collect(); @@ -569,10 +708,10 @@ fn parse_item(string: &str) -> Option { T::parse(string) } -fn parse_bool(string: &str) -> Option { - if string.eq_ignore_ascii_case("false") { +fn parse_bool(string: &[u8]) -> Option { + if string.eq_ignore_ascii_case(b"false") { Some(false) - } else if string.eq_ignore_ascii_case("true") { + } else if string.eq_ignore_ascii_case(b"true") { Some(true) } else { None @@ -582,31 +721,35 @@ fn parse_bool(string: &str) -> Option { // parses a specific column (col_idx) into an Arrow Array. fn build_primitive_array( line_number: usize, - rows: &[StringRecord], + data: &Vec, + offsets: &Vec, col_idx: usize, + num_cols: usize, ) -> Result { - rows.iter() + offsets + .windows(2) + .skip(col_idx) + .step_by(num_cols) .enumerate() - .map(|(row_index, row)| { - match row.get(col_idx) { - Some(s) => { - if s.is_empty() { - return Ok(None); - } + .map(|(row_index, window)| { + let str = &data[window[0]..window[1]]; + if str.is_empty() { + return Ok(None); + } - let parsed = parse_item::(s); - match parsed { - Some(e) => Ok(Some(e)), - None => Err(ArrowError::ParseError(format!( - // TODO: we should surface the underlying error here. - "Error while parsing value {} for column {} at line {}", - s, - col_idx, - line_number + row_index - ))), - } - } - None => Ok(None), + let str = std::str::from_utf8(str) + .map_err(|_| ArrowError::ParseError(format!("x")))?; + println!("{}, {} {} {}", str, T::DATA_TYPE, window[0], window[1]); + let parsed = parse_item::(str); + match parsed { + Some(e) => Ok(Some(e)), + None => Err(ArrowError::ParseError(format!( + // TODO: we should surface the underlying error here. + "Error while parsing primitive value {} for column {} at line {}", + str, + col_idx, + line_number + row_index + ))), } }) .collect::>>() @@ -616,31 +759,31 @@ fn build_primitive_array( // parses a specific column (col_idx) into an Arrow Array. fn build_boolean_array( line_number: usize, - rows: &[StringRecord], + data: &Vec, + offsets: &Vec, col_idx: usize, + num_cols: usize, ) -> Result { - rows.iter() + offsets + .windows(2) + .skip(col_idx) + .step_by(num_cols) .enumerate() - .map(|(row_index, row)| { - match row.get(col_idx) { - Some(s) => { - if s.is_empty() { - return Ok(None); - } - - let parsed = parse_bool(s); - match parsed { - Some(e) => Ok(Some(e)), - None => Err(ArrowError::ParseError(format!( - // TODO: we should surface the underlying error here. - "Error while parsing value {} for column {} at line {}", - s, - col_idx, - line_number + row_index - ))), - } - } - None => Ok(None), + .map(|(row_index, window)| { + let str = &data[window[0]..window[1]]; + if str.is_empty() { + return Ok(None); + } + let parsed = parse_bool(str); + match parsed { + Some(e) => Ok(Some(e)), + None => Err(ArrowError::ParseError(format!( + // TODO: we should surface the underlying error here. + "Error while parsing boolean value {:?} for column {} at line {}", + str, + col_idx, + line_number + row_index + ))), } }) .collect::>() @@ -756,33 +899,28 @@ impl ReaderBuilder { self } - /// Create a new `Reader` from the `ReaderBuilder` - pub fn build(self, mut reader: R) -> Result> { - // check if schema should be inferred - let delimiter = self.delimiter.unwrap_or(b','); - let schema = match self.schema { - Some(schema) => schema, - None => { - let (inferred_schema, _) = infer_file_schema( - &mut reader, - delimiter, - self.max_records, - self.has_header, - )?; - - Arc::new(inferred_schema) - } - }; - Ok(Reader::from_reader( - reader, - schema, - self.has_header, - self.delimiter, - self.batch_size, - None, - self.projection.clone(), - )) - } + // pub fn build(self, mut reader: R) -> Result> { + // // check if schema should be inferred + // let delimiter = self.delimiter.unwrap_or(b','); + // let schema = match self.schema { + // Some(schema) => schema, + // None => { + // let (inferred_schema, _) = + // infer_file_schema(&mut reader, delimiter, self.max_records, self.has_header)?; + + // Arc::new(inferred_schema) + // } + // }; + // Ok(Reader::from_reader( + // &reader, + // schema, + // self.has_header, + // self.delimiter, + // self.batch_size, + // None, + // self.projection.clone(), + // )) + // } } #[cfg(test)] @@ -866,78 +1004,78 @@ mod tests { assert_eq!(3, batch.num_columns()); } - #[test] - fn test_csv_with_schema_inference() { - let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); - - let builder = ReaderBuilder::new().has_header(true).infer_schema(None); - - let mut csv = builder.build(file).unwrap(); - let expected_schema = Schema::new(vec![ - Field::new("city", DataType::Utf8, false), - Field::new("lat", DataType::Float64, false), - Field::new("lng", DataType::Float64, false), - ]); - assert_eq!(Arc::new(expected_schema), csv.schema()); - let batch = csv.next().unwrap().unwrap(); - assert_eq!(37, batch.num_rows()); - assert_eq!(3, batch.num_columns()); - - // access data from a primitive array - let lat = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert!(57.653484 - lat.value(0) < f64::EPSILON); - - // access data from a string array (ListArray) - let city = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); - } - - #[test] - fn test_csv_with_schema_inference_no_headers() { - let file = File::open("test/data/uk_cities.csv").unwrap(); - - let builder = ReaderBuilder::new().infer_schema(None); - - let mut csv = builder.build(file).unwrap(); - - // csv field names should be 'column_{number}' - let schema = csv.schema(); - assert_eq!("column_1", schema.field(0).name()); - assert_eq!("column_2", schema.field(1).name()); - assert_eq!("column_3", schema.field(2).name()); - let batch = csv.next().unwrap().unwrap(); - let batch_schema = batch.schema(); - - assert_eq!(schema, batch_schema); - assert_eq!(37, batch.num_rows()); - assert_eq!(3, batch.num_columns()); - - // access data from a primitive array - let lat = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert!(57.653484 - lat.value(0) < f64::EPSILON); - - // access data from a string array (ListArray) - let city = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); - } + // #[test] + // fn test_csv_with_schema_inference() { + // let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); + + // let builder = ReaderBuilder::new().has_header(true).infer_schema(None); + + // let mut csv = builder.build(file).unwrap(); + // let expected_schema = Schema::new(vec![ + // Field::new("city", DataType::Utf8, false), + // Field::new("lat", DataType::Float64, false), + // Field::new("lng", DataType::Float64, false), + // ]); + // assert_eq!(Arc::new(expected_schema), csv.schema()); + // let batch = csv.next().unwrap().unwrap(); + // assert_eq!(37, batch.num_rows()); + // assert_eq!(3, batch.num_columns()); + + // // access data from a primitive array + // let lat = batch + // .column(1) + // .as_any() + // .downcast_ref::() + // .unwrap(); + // assert!(57.653484 - lat.value(0) < f64::EPSILON); + + // // access data from a string array (ListArray) + // let city = batch + // .column(0) + // .as_any() + // .downcast_ref::() + // .unwrap(); + + // assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); + // } + + // #[test] + // fn test_csv_with_schema_inference_no_headers() { + // let file = File::open("test/data/uk_cities.csv").unwrap(); + + // let builder = ReaderBuilder::new().infer_schema(None); + + // let mut csv = builder.build(file).unwrap(); + + // // csv field names should be 'column_{number}' + // let schema = csv.schema(); + // assert_eq!("column_1", schema.field(0).name()); + // assert_eq!("column_2", schema.field(1).name()); + // assert_eq!("column_3", schema.field(2).name()); + // let batch = csv.next().unwrap().unwrap(); + // let batch_schema = batch.schema(); + + // assert_eq!(schema, batch_schema); + // assert_eq!(37, batch.num_rows()); + // assert_eq!(3, batch.num_columns()); + + // // access data from a primitive array + // let lat = batch + // .column(1) + // .as_any() + // .downcast_ref::() + // .unwrap(); + // assert!(57.653484 - lat.value(0) < f64::EPSILON); + + // // access data from a string array (ListArray) + // let city = batch + // .column(0) + // .as_any() + // .downcast_ref::() + // .unwrap(); + + // assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); + // } #[test] fn test_csv_with_projection() { @@ -975,6 +1113,7 @@ mod tests { Field::new("c_int", DataType::UInt64, false), Field::new("c_float", DataType::Float32, false), Field::new("c_string", DataType::Utf8, false), + Field::new("c_bool", DataType::Boolean, false), ]); let file = File::open("test/data/null_test.csv").unwrap(); @@ -989,90 +1128,89 @@ mod tests { assert_eq!(false, batch.column(1).is_null(4)); } - #[test] - fn test_nulls_with_inference() { - let file = File::open("test/data/various_types.csv").unwrap(); - - let builder = ReaderBuilder::new() - .infer_schema(None) - .has_header(true) - .with_delimiter(b'|') - .with_batch_size(512) - .with_projection(vec![0, 1, 2, 3, 4, 5]); - - let mut csv = builder.build(file).unwrap(); - let batch = csv.next().unwrap().unwrap(); - - assert_eq!(5, batch.num_rows()); - assert_eq!(6, batch.num_columns()); - - let schema = batch.schema(); - - assert_eq!(&DataType::Int64, schema.field(0).data_type()); - assert_eq!(&DataType::Float64, schema.field(1).data_type()); - assert_eq!(&DataType::Float64, schema.field(2).data_type()); - assert_eq!(&DataType::Boolean, schema.field(3).data_type()); - assert_eq!(&DataType::Date32, schema.field(4).data_type()); - assert_eq!(&DataType::Date64, schema.field(5).data_type()); - - let names: Vec<&str> = - schema.fields().iter().map(|x| x.name().as_str()).collect(); - assert_eq!( - names, - vec![ - "c_int", - "c_float", - "c_string", - "c_bool", - "c_date", - "c_datetime" - ] - ); - - assert_eq!(false, schema.field(0).is_nullable()); - assert_eq!(true, schema.field(1).is_nullable()); - assert_eq!(true, schema.field(2).is_nullable()); - assert_eq!(false, schema.field(3).is_nullable()); - assert_eq!(true, schema.field(4).is_nullable()); - assert_eq!(true, schema.field(5).is_nullable()); - - assert_eq!(false, batch.column(1).is_null(0)); - assert_eq!(false, batch.column(1).is_null(1)); - assert_eq!(true, batch.column(1).is_null(2)); - assert_eq!(false, batch.column(1).is_null(3)); - assert_eq!(false, batch.column(1).is_null(4)); - } - - #[test] - fn test_parse_invalid_csv() { - let file = File::open("test/data/various_types_invalid.csv").unwrap(); - - let schema = Schema::new(vec![ - Field::new("c_int", DataType::UInt64, false), - Field::new("c_float", DataType::Float32, false), - Field::new("c_string", DataType::Utf8, false), - Field::new("c_bool", DataType::Boolean, false), - ]); - - let builder = ReaderBuilder::new() - .with_schema(Arc::new(schema)) - .has_header(true) - .with_delimiter(b'|') - .with_batch_size(512) - .with_projection(vec![0, 1, 2, 3]); - - let mut csv = builder.build(file).unwrap(); - match csv.next() { - Some(e) => match e { - Err(e) => assert_eq!( - "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")", - format!("{:?}", e) - ), - Ok(_) => panic!("should have failed"), - }, - None => panic!("should have failed"), - } - } + // #[test] + // fn test_nulls_with_inference() { + // let file = File::open("test/data/various_types.csv").unwrap(); + + // let builder = ReaderBuilder::new() + // .infer_schema(None) + // .has_header(true) + // .with_delimiter(b'|') + // .with_batch_size(512) + // .with_projection(vec![0, 1, 2, 3, 4, 5]); + + // let mut csv = builder.build(file).unwrap(); + // let batch = csv.next().unwrap().unwrap(); + + // assert_eq!(5, batch.num_rows()); + // assert_eq!(6, batch.num_columns()); + + // let schema = batch.schema(); + + // assert_eq!(&DataType::Int64, schema.field(0).data_type()); + // assert_eq!(&DataType::Float64, schema.field(1).data_type()); + // assert_eq!(&DataType::Float64, schema.field(2).data_type()); + // assert_eq!(&DataType::Boolean, schema.field(3).data_type()); + // assert_eq!(&DataType::Date32, schema.field(4).data_type()); + // assert_eq!(&DataType::Date64, schema.field(5).data_type()); + + // let names: Vec<&str> = schema.fields().iter().map(|x| x.name().as_str()).collect(); + // assert_eq!( + // names, + // vec![ + // "c_int", + // "c_float", + // "c_string", + // "c_bool", + // "c_date", + // "c_datetime" + // ] + // ); + + // assert_eq!(false, schema.field(0).is_nullable()); + // assert_eq!(true, schema.field(1).is_nullable()); + // assert_eq!(true, schema.field(2).is_nullable()); + // assert_eq!(false, schema.field(3).is_nullable()); + // assert_eq!(true, schema.field(4).is_nullable()); + // assert_eq!(true, schema.field(5).is_nullable()); + + // assert_eq!(false, batch.column(1).is_null(0)); + // assert_eq!(false, batch.column(1).is_null(1)); + // assert_eq!(true, batch.column(1).is_null(2)); + // assert_eq!(false, batch.column(1).is_null(3)); + // assert_eq!(false, batch.column(1).is_null(4)); + // } + + // #[test] + // fn test_parse_invalid_csv() { + // let file = File::open("test/data/various_types_invalid.csv").unwrap(); + + // let schema = Schema::new(vec![ + // Field::new("c_int", DataType::UInt64, false), + // Field::new("c_float", DataType::Float32, false), + // Field::new("c_string", DataType::Utf8, false), + // Field::new("c_bool", DataType::Boolean, false), + // ]); + + // let builder = ReaderBuilder::new() + // .with_schema(Arc::new(schema)) + // .has_header(true) + // .with_delimiter(b'|') + // .with_batch_size(512) + // .with_projection(vec![0, 1, 2, 3]); + + // let mut csv = builder.build(file).unwrap(); + // match csv.next() { + // Some(e) => match e { + // Err(e) => assert_eq!( + // "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")", + // format!("{:?}", e) + // ), + // Ok(_) => panic!("should have failed"), + // }, + // None => panic!("should have failed"), + // } + // } #[test] fn test_infer_field_schema() { @@ -1172,10 +1310,10 @@ mod tests { .join("\n"); let data = data.as_bytes(); - let reader = std::io::Cursor::new(data); + let mut reader = std::io::Cursor::new(data); let mut csv = Reader::new( - reader, + &mut reader, Arc::new(schema), false, None, @@ -1201,21 +1339,21 @@ mod tests { #[test] fn test_parsing_bool() { // Encode the expected behavior of boolean parsing - assert_eq!(Some(true), parse_bool("true")); - assert_eq!(Some(true), parse_bool("tRUe")); - assert_eq!(Some(true), parse_bool("True")); - assert_eq!(Some(true), parse_bool("TRUE")); - assert_eq!(None, parse_bool("t")); - assert_eq!(None, parse_bool("T")); - assert_eq!(None, parse_bool("")); - - assert_eq!(Some(false), parse_bool("false")); - assert_eq!(Some(false), parse_bool("fALse")); - assert_eq!(Some(false), parse_bool("False")); - assert_eq!(Some(false), parse_bool("FALSE")); - assert_eq!(None, parse_bool("f")); - assert_eq!(None, parse_bool("F")); - assert_eq!(None, parse_bool("")); + assert_eq!(Some(true), parse_bool(b"true")); + assert_eq!(Some(true), parse_bool(b"tRUe")); + assert_eq!(Some(true), parse_bool(b"True")); + assert_eq!(Some(true), parse_bool(b"TRUE")); + assert_eq!(None, parse_bool(b"t")); + assert_eq!(None, parse_bool(b"T")); + assert_eq!(None, parse_bool(b"")); + + assert_eq!(Some(false), parse_bool(b"false")); + assert_eq!(Some(false), parse_bool(b"fALse")); + assert_eq!(Some(false), parse_bool(b"False")); + assert_eq!(Some(false), parse_bool(b"FALSE")); + assert_eq!(None, parse_bool(b"f")); + assert_eq!(None, parse_bool(b"F")); + assert_eq!(None, parse_bool(b"")); } #[test] diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index e9d8565b2a5..f38f71e5ec6 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -628,7 +628,7 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03\n"; buf.set_position(0); let mut reader = Reader::new( - buf, + &mut buf, Arc::new(schema), false, None, diff --git a/rust/arrow/src/error.rs b/rust/arrow/src/error.rs index 2b9b2b577fe..861c7080094 100644 --- a/rust/arrow/src/error.rs +++ b/rust/arrow/src/error.rs @@ -27,6 +27,7 @@ pub enum ArrowError { /// Returned when functionality is not yet available. NotYetImplemented(String), ExternalError(Box), + CastError(String), MemoryError(String), ParseError(String), SchemaError(String), @@ -96,6 +97,7 @@ impl Display for ArrowError { write!(f, "Not yet implemented: {}", &source) } ArrowError::ExternalError(source) => write!(f, "External error: {}", &source), + ArrowError::CastError(desc) => write!(f, "Cast error: {}", desc), ArrowError::MemoryError(desc) => write!(f, "Memory error: {}", desc), ArrowError::ParseError(desc) => write!(f, "Parser error: {}", desc), ArrowError::SchemaError(desc) => write!(f, "Schema error: {}", desc), diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index a6fd020fa0b..2977d9816ca 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -287,6 +287,18 @@ impl ExecutionContext { .insert(name.to_string(), provider.into()); } + /// Deregisters the named table. + /// + /// Returns true if the table was successfully de-reregistered. + pub fn deregister_table(&mut self, name: &str) -> bool { + self.state + .lock() + .unwrap() + .datasources + .remove(&name.to_string()) + .is_some() + } + /// Retrieves a DataFrame representing a table previously registered by calling the /// register_table function. /// @@ -723,6 +735,21 @@ mod tests { Ok(()) } + #[tokio::test] + async fn register_deregister() -> Result<()> { + let tmp_dir = TempDir::new()?; + let partition_count = 4; + let mut ctx = create_ctx(&tmp_dir, partition_count)?; + + let provider = test::create_table_dual(); + ctx.register_table("dual", provider); + + assert_eq!(ctx.deregister_table("dual"), true); + assert_eq!(ctx.deregister_table("dual"), false); + + Ok(()) + } + #[tokio::test] async fn parallel_query_with_filter() -> Result<()> { let tmp_dir = TempDir::new()?; @@ -1668,6 +1695,8 @@ mod tests { assert_eq!(a.value(i) + b.value(i), sum.value(i)); } + ctx.deregister_table("t"); + Ok(()) } diff --git a/rust/datafusion/src/physical_plan/csv.rs b/rust/datafusion/src/physical_plan/csv.rs index d62d18756f8..6740b04627e 100644 --- a/rust/datafusion/src/physical_plan/csv.rs +++ b/rust/datafusion/src/physical_plan/csv.rs @@ -213,10 +213,7 @@ impl CsvExec { } /// Infer schema for given CSV dataset - pub fn try_infer_schema( - filenames: &[String], - options: &CsvReadOptions, - ) -> Result { + pub fn try_infer_schema(filenames: &[String], options: &CsvReadOptions) -> Result { Ok(csv::infer_schema_from_files( filenames, options.delimiter, @@ -308,10 +305,7 @@ impl CsvStream { impl Stream for CsvStream { type Item = ArrowResult; - fn poll_next( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(self.reader.next()) } } @@ -362,8 +356,7 @@ mod tests { let testdata = arrow::util::test_util::arrow_test_data(); let filename = "aggregate_test_100.csv"; let path = format!("{}/csv/{}", testdata, filename); - let csv = - CsvExec::try_new(&path, CsvReadOptions::new().schema(&schema), None, 1024)?; + let csv = CsvExec::try_new(&path, CsvReadOptions::new().schema(&schema), None, 1024)?; assert_eq!(13, csv.schema.fields().len()); assert_eq!(13, csv.projected_schema.fields().len()); assert_eq!(13, csv.file_schema().fields().len()); diff --git a/rust/datafusion/src/physical_plan/datetime_expressions.rs b/rust/datafusion/src/physical_plan/datetime_expressions.rs index a70ebcc4fef..34414586983 100644 --- a/rust/datafusion/src/physical_plan/datetime_expressions.rs +++ b/rust/datafusion/src/physical_plan/datetime_expressions.rs @@ -23,149 +23,11 @@ use crate::error::{DataFusionError, Result}; use arrow::{ array::{Array, ArrayData, ArrayRef, StringArray, TimestampNanosecondArray}, buffer::Buffer, + compute::kernels::cast_utils::string_to_timestamp_nanos, datatypes::{DataType, TimeUnit, ToByteSlice}, }; +use chrono::prelude::*; use chrono::Duration; -use chrono::{prelude::*, LocalResult}; - -#[inline] -/// Accepts a string in RFC3339 / ISO8601 standard format and some -/// variants and converts it to a nanosecond precision timestamp. -/// -/// Implements the `to_timestamp` function to convert a string to a -/// timestamp, following the model of spark SQL’s to_`timestamp`. -/// -/// In addition to RFC3339 / ISO8601 standard timestamps, it also -/// accepts strings that use a space ` ` to separate the date and time -/// as well as strings that have no explicit timezone offset. -/// -/// Examples of accepted inputs: -/// * `1997-01-31T09:26:56.123Z` # RCF3339 -/// * `1997-01-31T09:26:56.123-05:00` # RCF3339 -/// * `1997-01-31 09:26:56.123-05:00` # close to RCF3339 but with a space rather than T -/// * `1997-01-31T09:26:56.123` # close to RCF3339 but no timezone offset specified -/// * `1997-01-31 09:26:56.123` # close to RCF3339 but uses a space and no timezone offset -/// * `1997-01-31 09:26:56` # close to RCF3339, no fractional seconds -// -/// Internally, this function uses the `chrono` library for the -/// datetime parsing -/// -/// We hope to extend this function in the future with a second -/// parameter to specifying the format string. -/// -/// ## Timestamp Precision -/// -/// DataFusion uses the maximum precision timestamps supported by -/// Arrow (nanoseconds stored as a 64-bit integer) timestamps. This -/// means the range of dates that timestamps can represent is ~1677 AD -/// to 2262 AM -/// -/// -/// ## Timezone / Offset Handling -/// -/// By using the Arrow format, DataFusion inherits Arrow’s handling of -/// timestamp values. Specifically, the stored numerical values of -/// timestamps are stored compared to offset UTC. -/// -/// This function intertprets strings without an explicit time zone as -/// timestamps with offsets of the local time on the machine that ran -/// the datafusion query -/// -/// For example, `1997-01-31 09:26:56.123Z` is interpreted as UTC, as -/// it has an explicit timezone specifier (“Z” for Zulu/UTC) -/// -/// `1997-01-31T09:26:56.123` is interpreted as a local timestamp in -/// the timezone of the machine that ran DataFusion. For example, if -/// the system timezone is set to Americas/New_York (UTC-5) the -/// timestamp will be interpreted as though it were -/// `1997-01-31T09:26:56.123-05:00` -fn string_to_timestamp_nanos(s: &str) -> Result { - // Fast path: RFC3339 timestamp (with a T) - // Example: 2020-09-08T13:42:29.190855Z - if let Ok(ts) = DateTime::parse_from_rfc3339(s) { - return Ok(ts.timestamp_nanos()); - } - - // Implement quasi-RFC3339 support by trying to parse the - // timestamp with various other format specifiers to to support - // separating the date and time with a space ' ' rather than 'T' to be - // (more) compatible with Apache Spark SQL - - // timezone offset, using ' ' as a separator - // Example: 2020-09-08 13:42:29.190855-05:00 - if let Ok(ts) = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z") { - return Ok(ts.timestamp_nanos()); - } - - // with an explicit Z, using ' ' as a separator - // Example: 2020-09-08 13:42:29Z - if let Ok(ts) = Utc.datetime_from_str(s, "%Y-%m-%d %H:%M:%S%.fZ") { - return Ok(ts.timestamp_nanos()); - } - - // Support timestamps without an explicit timezone offset, again - // to be compatible with what Apache Spark SQL does. - - // without a timezone specifier as a local time, using T as a separator - // Example: 2020-09-08T13:42:29.190855 - if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f") { - return naive_datetime_to_timestamp(s, ts); - } - - // without a timezone specifier as a local time, using T as a - // separator, no fractional seconds - // Example: 2020-09-08T13:42:29 - if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") { - return naive_datetime_to_timestamp(s, ts); - } - - // without a timezone specifier as a local time, using ' ' as a separator - // Example: 2020-09-08 13:42:29.190855 - if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S.%f") { - return naive_datetime_to_timestamp(s, ts); - } - - // without a timezone specifier as a local time, using ' ' as a - // separator, no fractional seconds - // Example: 2020-09-08 13:42:29 - if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") { - return naive_datetime_to_timestamp(s, ts); - } - - // Note we don't pass along the error message from the underlying - // chrono parsing because we tried several different format - // strings and we don't know which the user was trying to - // match. Ths any of the specific error messages is likely to be - // be more confusing than helpful - Err(DataFusionError::Execution(format!( - "Error parsing '{}' as timestamp", - s - ))) -} - -/// Converts the naive datetime (which has no specific timezone) to a -/// nanosecond epoch timestamp relative to UTC. -fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result { - let l = Local {}; - - match l.from_local_datetime(&datetime) { - LocalResult::None => Err(DataFusionError::Execution(format!( - "Error parsing '{}' as timestamp: local time representation is invalid", - s - ))), - LocalResult::Single(local_datetime) => { - Ok(local_datetime.with_timezone(&Utc).timestamp_nanos()) - } - // Ambiguous times can happen if the timestamp is exactly when - // a daylight savings time transition occurs, for example, and - // so the datetime could validly be said to be in two - // potential offsets. However, since we are about to convert - // to UTC anyways, we can pick one arbitrarily - LocalResult::Ambiguous(local_datetime, _) => { - Ok(local_datetime.with_timezone(&Utc).timestamp_nanos()) - } - } -} /// convert an array of strings into `Timestamp(Nanosecond, None)` pub fn to_timestamp(args: &[ArrayRef]) -> Result { @@ -189,6 +51,7 @@ pub fn to_timestamp(args: &[ArrayRef]) -> Result { Ok(0) } else { string_to_timestamp_nanos(string_args.value(i)) + .map_err(DataFusionError::ArrowError) } }) .collect::>>()?; @@ -316,145 +179,6 @@ mod tests { use super::*; - #[test] - fn string_to_timestamp_timezone() -> Result<()> { - // Explicit timezone - assert_eq!( - 1599572549190855000, - parse_timestamp("2020-09-08T13:42:29.190855+00:00")? - ); - assert_eq!( - 1599572549190855000, - parse_timestamp("2020-09-08T13:42:29.190855Z")? - ); - assert_eq!( - 1599572549000000000, - parse_timestamp("2020-09-08T13:42:29Z")? - ); // no fractional part - assert_eq!( - 1599590549190855000, - parse_timestamp("2020-09-08T13:42:29.190855-05:00")? - ); - Ok(()) - } - - #[test] - fn string_to_timestamp_timezone_space() -> Result<()> { - // Ensure space rather than T between time and date is accepted - assert_eq!( - 1599572549190855000, - parse_timestamp("2020-09-08 13:42:29.190855+00:00")? - ); - assert_eq!( - 1599572549190855000, - parse_timestamp("2020-09-08 13:42:29.190855Z")? - ); - assert_eq!( - 1599572549000000000, - parse_timestamp("2020-09-08 13:42:29Z")? - ); // no fractional part - assert_eq!( - 1599590549190855000, - parse_timestamp("2020-09-08 13:42:29.190855-05:00")? - ); - Ok(()) - } - - /// Interprets a naive_datetime (with no explicit timzone offset) - /// using the local timezone and returns the timestamp in UTC (0 - /// offset) - fn naive_datetime_to_timestamp(naive_datetime: &NaiveDateTime) -> i64 { - // Note: Use chrono APIs that are different than - // naive_datetime_to_timestamp to compute the utc offset to - // try and double check the logic - let utc_offset_secs = match Local.offset_from_local_datetime(&naive_datetime) { - LocalResult::Single(local_offset) => { - local_offset.fix().local_minus_utc() as i64 - } - _ => panic!("Unexpected failure converting to local datetime"), - }; - let utc_offset_nanos = utc_offset_secs * 1_000_000_000; - naive_datetime.timestamp_nanos() - utc_offset_nanos - } - - #[test] - fn string_to_timestamp_no_timezone() -> Result<()> { - // This test is designed to succeed in regardless of the local - // timezone the test machine is running. Thus it is still - // somewhat suceptable to bugs in the use of chrono - let naive_datetime = NaiveDateTime::new( - NaiveDate::from_ymd(2020, 9, 8), - NaiveTime::from_hms_nano(13, 42, 29, 190855), - ); - - // Ensure both T and ' ' variants work - assert_eq!( - naive_datetime_to_timestamp(&naive_datetime), - parse_timestamp("2020-09-08T13:42:29.190855")? - ); - - assert_eq!( - naive_datetime_to_timestamp(&naive_datetime), - parse_timestamp("2020-09-08 13:42:29.190855")? - ); - - // Also ensure that parsing timestamps with no fractional - // second part works as well - let naive_datetime_whole_secs = NaiveDateTime::new( - NaiveDate::from_ymd(2020, 9, 8), - NaiveTime::from_hms(13, 42, 29), - ); - - // Ensure both T and ' ' variants work - assert_eq!( - naive_datetime_to_timestamp(&naive_datetime_whole_secs), - parse_timestamp("2020-09-08T13:42:29")? - ); - - assert_eq!( - naive_datetime_to_timestamp(&naive_datetime_whole_secs), - parse_timestamp("2020-09-08 13:42:29")? - ); - - Ok(()) - } - - #[test] - fn string_to_timestamp_invalid() { - // Test parsing invalid formats - - // It would be nice to make these messages better - expect_timestamp_parse_error("", "Error parsing '' as timestamp"); - expect_timestamp_parse_error("SS", "Error parsing 'SS' as timestamp"); - expect_timestamp_parse_error( - "Wed, 18 Feb 2015 23:16:09 GMT", - "Error parsing 'Wed, 18 Feb 2015 23:16:09 GMT' as timestamp", - ); - } - - // Parse a timestamp to timestamp int with a useful human readable error message - fn parse_timestamp(s: &str) -> Result { - let result = string_to_timestamp_nanos(s); - if let Err(e) = &result { - eprintln!("Error parsing timestamp '{}': {:?}", s, e); - } - result - } - - fn expect_timestamp_parse_error(s: &str, expected_err: &str) { - match string_to_timestamp_nanos(s) { - Ok(v) => panic!( - "Expected error '{}' while parsing '{}', but parsed {} instead", - expected_err, s, v - ), - Err(e) => { - assert!(e.to_string().contains(expected_err), - "Can not find expected error '{}' while parsing '{}'. Actual error '{}'", - expected_err, s, e); - } - } - } - #[test] fn to_timestamp_arrays_and_nulls() -> Result<()> { // ensure that arrow array implementation is wired up and handles nulls correctly diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index 54d25f17dba..342ca320f86 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -77,10 +77,14 @@ pub struct HashAggregateExec { group_expr: Vec<(Arc, String)>, /// Aggregate expressions aggr_expr: Vec>, - /// Input plan + /// Input plan, could be a partial aggregate or the input to the aggregate input: Arc, /// Schema after the aggregate is applied schema: SchemaRef, + /// Input schema before any aggregation is applied. For partial aggregate this will be the + /// same as input.schema() but for the final aggregate it will be the same as the input + /// to the partial aggregate + input_schema: SchemaRef, } fn create_schema( @@ -123,6 +127,7 @@ impl HashAggregateExec { group_expr: Vec<(Arc, String)>, aggr_expr: Vec>, input: Arc, + input_schema: SchemaRef, ) -> Result { let schema = create_schema(&input.schema(), &group_expr, &aggr_expr, mode)?; @@ -134,6 +139,7 @@ impl HashAggregateExec { aggr_expr, input, schema, + input_schema, }) } @@ -156,6 +162,11 @@ impl HashAggregateExec { pub fn input(&self) -> &Arc { &self.input } + + /// Get the input schema before any aggregates are applied + pub fn input_schema(&self) -> SchemaRef { + self.input_schema.clone() + } } #[async_trait] @@ -217,6 +228,7 @@ impl ExecutionPlan for HashAggregateExec { self.group_expr.clone(), self.aggr_expr.clone(), children[0].clone(), + self.input_schema.clone(), )?)), _ => Err(DataFusionError::Internal( "HashAggregateExec wrong number of children".to_string(), @@ -1048,11 +1060,13 @@ mod tests { DataType::Float64, ))]; + let input_schema = input.schema(); let partial_aggregate = Arc::new(HashAggregateExec::try_new( AggregateMode::Partial, groups.clone(), aggregates.clone(), input, + input_schema.clone(), )?); let result = common::collect(partial_aggregate.execute(0).await?).await?; @@ -1082,6 +1096,7 @@ mod tests { .collect(), aggregates, merge, + input_schema, )?); let result = common::collect(merged_aggregate.execute(0).await?).await?; diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index 93bb94327a4..3f2df33d8f8 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -186,6 +186,7 @@ impl DefaultPhysicalPlanner { } => { // Initially need to perform the aggregate and then merge the partitions let input_exec = self.create_physical_plan(input, ctx_state)?; + let input_schema = input_exec.schema(); let physical_input_schema = input_exec.as_ref().schema(); let logical_input_schema = input.as_ref().schema(); @@ -219,6 +220,7 @@ impl DefaultPhysicalPlanner { groups.clone(), aggregates.clone(), input_exec, + input_schema.clone(), )?); let final_group: Vec> = @@ -235,6 +237,7 @@ impl DefaultPhysicalPlanner { .collect(), aggregates, initial_aggr, + input_schema, )?)) } LogicalPlan::Projection { input, expr, .. } => { @@ -978,6 +981,29 @@ mod tests { Ok(()) } + #[test] + fn hash_agg_input_schema() -> Result<()> { + let testdata = arrow::util::test_util::arrow_test_data(); + let path = format!("{}/csv/aggregate_test_100.csv", testdata); + + let options = CsvReadOptions::new().schema_infer_max_records(100); + let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)? + .aggregate(&[col("c1")], &[sum(col("c2"))])? + .build()?; + + let execution_plan = plan(&logical_plan)?; + let final_hash_agg = execution_plan + .as_any() + .downcast_ref::() + .expect("hash aggregate"); + assert_eq!("SUM(c2)", final_hash_agg.schema().field(1).name()); + // we need access to the input to the partial aggregate so that other projects can + // implement serde + assert_eq!("c2", final_hash_agg.input_schema().field(1).name()); + + Ok(()) + } + /// An example extension node that doesn't do anything struct NoOpExtensionNode { schema: DFSchemaRef, diff --git a/rust/datafusion/src/scalar.rs b/rust/datafusion/src/scalar.rs index 31ca1da8edf..d5a6c141179 100644 --- a/rust/datafusion/src/scalar.rs +++ b/rust/datafusion/src/scalar.rs @@ -20,7 +20,7 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc}; use arrow::array::{ - Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder, + Date64Array, Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder, TimestampMicrosecondArray, TimestampNanosecondArray, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder, }; @@ -75,6 +75,8 @@ pub enum ScalarValue { List(Option>, DataType), /// Date stored as a signed 32bit int Date32(Option), + /// Date stored as a signed 64bit int + Date64(Option), /// Timestamp Microseconds TimeMicrosecond(Option), /// Timestamp Nanoseconds @@ -156,6 +158,7 @@ impl ScalarValue { DataType::List(Box::new(Field::new("item", data_type.clone(), true))) } ScalarValue::Date32(_) => DataType::Date32, + ScalarValue::Date64(_) => DataType::Date64, ScalarValue::IntervalYearMonth(_) => { DataType::Interval(IntervalUnit::YearMonth) } @@ -329,6 +332,12 @@ impl ScalarValue { } None => Arc::new(repeat(None).take(size).collect::()), }, + ScalarValue::Date64(e) => match e { + Some(value) => { + Arc::new(Date64Array::from_iter_values(repeat(*value).take(size))) + } + None => Arc::new(repeat(None).take(size).collect::()), + }, ScalarValue::IntervalDayTime(e) => match e { Some(value) => Arc::new(IntervalDayTimeArray::from_iter_values( repeat(*value).take(size), @@ -386,6 +395,9 @@ impl ScalarValue { DataType::Date32 => { typed_cast!(array, index, Date32Array, Date32) } + DataType::Date64 => { + typed_cast!(array, index, Date64Array, Date64) + } other => { return Err(DataFusionError::NotImplemented(format!( "Can't create a scalar of array of type \"{:?}\"", @@ -580,6 +592,7 @@ impl fmt::Display for ScalarValue { None => write!(f, "NULL")?, }, ScalarValue::Date32(e) => format_option!(f, e)?, + ScalarValue::Date64(e) => format_option!(f, e)?, ScalarValue::IntervalDayTime(e) => format_option!(f, e)?, ScalarValue::IntervalYearMonth(e) => format_option!(f, e)?, }; @@ -609,6 +622,7 @@ impl fmt::Debug for ScalarValue { ScalarValue::LargeUtf8(Some(_)) => write!(f, "LargeUtf8(\"{}\")", self), ScalarValue::List(_, _) => write!(f, "List([{}])", self), ScalarValue::Date32(_) => write!(f, "Date32(\"{}\")", self), + ScalarValue::Date64(_) => write!(f, "Date64(\"{}\")", self), ScalarValue::IntervalDayTime(_) => { write!(f, "IntervalDayTime(\"{}\")", self) } diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 5d505fd2ec6..58310f50856 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -649,7 +649,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .iter() .map(|e| { Ok(Expr::Sort { - expr: Box::new(self.sql_to_rex(&e.expr, &input_schema).unwrap()), + expr: Box::new(self.sql_to_rex(&e.expr, &input_schema)?), // by default asc asc: e.asc.unwrap_or(true), // by default nulls first to be consistent with spark