diff --git a/rust/arrow/src/compute/kernels/cast.rs b/rust/arrow/src/compute/kernels/cast.rs
index 2a00be59cd64..d9a2cbee101c 100644
--- a/rust/arrow/src/compute/kernels/cast.rs
+++ b/rust/arrow/src/compute/kernels/cast.rs
@@ -35,6 +35,7 @@
 //! assert_eq!(7.0, c.value(2));
 //! ```
 
+use std::str;
 use std::sync::Arc;
 
 use crate::array::*;
@@ -203,6 +204,22 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result<ArrayRef> {
             Int64 => cast_numeric_to_string::<Int64Type>(array),
             Float32 => cast_numeric_to_string::<Float32Type>(array),
             Float64 => cast_numeric_to_string::<Float64Type>(array),
+            Binary => {
+                let from = array.as_any().downcast_ref::<BinaryArray>().unwrap();
+                let mut b = StringBuilder::new(array.len());
+                for i in 0..array.len() {
+                    if array.is_null(i) {
+                        b.append_null()?;
+                    } else {
+                        match str::from_utf8(from.value(i)) {
+                            Ok(s) => b.append_value(s)?,
+                            Err(_) => b.append_null()?, // not valid UTF8
+                        }
+                    }
+                }
+
+                Ok(Arc::new(b.finish()) as ArrayRef)
+            }
             _ => Err(ArrowError::ComputeError(format!(
                 "Casting from {:?} to {:?} not supported",
                 from_type, to_type,
diff --git a/rust/datafusion/examples/parquet_sql.rs b/rust/datafusion/examples/parquet_sql.rs
index d8bb9f86473d..142ef090f85a 100644
--- a/rust/datafusion/examples/parquet_sql.rs
+++ b/rust/datafusion/examples/parquet_sql.rs
@@ -39,7 +39,7 @@ fn main() -> Result<()> {
     )?;
 
     // simple selection
-    let sql = "SELECT int_col, double_col, date_string_col FROM alltypes_plain WHERE id > 1 AND tinyint_col < double_col";
+    let sql = "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) FROM alltypes_plain WHERE id > 1 AND tinyint_col < double_col";
 
     // create the query plan
     let plan = ctx.create_logical_plan(&sql)?;
diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs
index a879abe1b592..fb3014e73736 100644
--- a/rust/datafusion/src/datasource/csv.rs
+++ b/rust/datafusion/src/datasource/csv.rs
@@ -50,8 +50,8 @@ impl CsvFile {
 }
 
 impl TableProvider for CsvFile {
-    fn schema(&self) -> &Arc<Schema> {
-        &self.schema
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
     }
 
     fn scan(
diff --git a/rust/datafusion/src/datasource/datasource.rs b/rust/datafusion/src/datasource/datasource.rs
index 7249731d06b0..48f521966725 100644
--- a/rust/datafusion/src/datasource/datasource.rs
+++ b/rust/datafusion/src/datasource/datasource.rs
@@ -31,7 +31,7 @@ pub type ScanResult = Arc<Mutex<dyn BatchIterator>>;
 /// Source table
 pub trait TableProvider {
     /// Get a reference to the schema for this table
-    fn schema(&self) -> &Arc<Schema>;
+    fn schema(&self) -> Arc<Schema>;
 
     /// Perform a scan of a table and return a sequence of iterators over the data (one
     /// iterator per partition)
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 0fec2d61890d..787aa3b50e44 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -66,8 +66,8 @@ impl MemTable {
 }
 
 impl TableProvider for MemTable {
-    fn schema(&self) -> &Arc<Schema> {
-        &self.schema
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
     }
 
     fn scan(
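As a usage note for the new Binary -> Utf8 arm above, the kernel can be driven end to end like this. A minimal sketch: the builder calls follow the Arrow APIs used elsewhere in this diff, and the invalid-UTF-8 input is an invented example.

    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, BinaryBuilder, StringArray};
    use arrow::compute::cast;
    use arrow::datatypes::DataType;
    use arrow::error::Result;

    fn binary_to_utf8_example() -> Result<()> {
        // Binary column with a valid string, a null, and bytes that are not UTF-8
        let mut b = BinaryBuilder::new(3);
        b.append_value(b"hello")?;
        b.append_null()?;
        b.append_value(&[0xff, 0xfe])?;
        let binary: ArrayRef = Arc::new(b.finish());

        let utf8 = cast(&binary, &DataType::Utf8)?;
        let utf8 = utf8.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!("hello", utf8.value(0));
        assert!(utf8.is_null(1)); // null stays null
        assert!(utf8.is_null(2)); // invalid UTF-8 becomes null rather than an error
        Ok(())
    }

Mapping invalid UTF-8 to null rather than failing the whole cast is the design choice the `// not valid UTF8` comment records; a stricter kernel would surface a ComputeError instead.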
diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index 25e72694c828..a725e3109092 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -17,54 +17,38 @@
 
 //! Parquet data source
 
-use std::fs::File;
 use std::string::String;
-use std::sync::{Arc, Mutex};
-use std::thread;
+use std::sync::Arc;
 
-use crossbeam::channel::{unbounded, Receiver, Sender};
-
-use arrow::array::{
-    Array, PrimitiveArray, PrimitiveBuilder, StringBuilder, TimestampNanosecondBuilder,
-};
 use arrow::datatypes::*;
-use arrow::record_batch::RecordBatch;
-
-use parquet::arrow::schema::parquet_to_arrow_schema;
-use parquet::column::reader::*;
-use parquet::data_type::{ByteArray, Int96};
-use parquet::file::reader::*;
 
 use crate::datasource::{ScanResult, TableProvider};
-use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::common;
-use crate::execution::physical_plan::BatchIterator;
+use crate::error::Result;
+use crate::execution::physical_plan::parquet::ParquetExec;
+use crate::execution::physical_plan::{common, ExecutionPlan};
 
 /// Table-based representation of a `ParquetFile`
 pub struct ParquetTable {
-    filenames: Vec<String>,
+    path: String,
     schema: Arc<Schema>,
 }
 
 impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path
     pub fn try_new(path: &str) -> Result<Self> {
-        let mut filenames: Vec<String> = vec![];
-        common::build_file_list(path, &mut filenames, ".parquet")?;
-        if filenames.is_empty() {
-            Err(ExecutionError::General("No files found".to_string()))
-        } else {
-            let parquet_file = ParquetFile::open(&filenames[0], None, 0)?;
-            let schema = parquet_file.projection_schema.clone();
-            Ok(Self { filenames, schema })
-        }
+        let parquet_exec = ParquetExec::try_new(path, None, 0)?;
+        let schema = parquet_exec.schema();
+        Ok(Self {
+            path: path.to_string(),
+            schema,
+        })
     }
 }
 
 impl TableProvider for ParquetTable {
     /// Get the schema for this parquet file
-    fn schema(&self) -> &Arc<Schema> {
-        &self.schema
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
     }
 
     /// Scan the file(s), using the provided projection, and return one BatchIterator per
@@ -74,475 +58,19 @@ impl TableProvider for ParquetTable {
         projection: &Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Vec<ScanResult>> {
-        Ok(self
-            .filenames
-            .iter()
-            .map(|filename| {
-                ParquetScanPartition::try_new(filename, projection.clone(), batch_size)
-                    .and_then(|part| {
-                        Ok(Arc::new(Mutex::new(part)) as Arc<Mutex<dyn BatchIterator>>)
-                    })
-            })
-            .collect::<Result<Vec<_>>>()?)
-    }
-}
-
-/// Loader and reader for parquet data
-pub struct ParquetFile {
-    reader: SerializedFileReader<File>,
-    /// Projection expressed as column indices into underlying parquet reader
-    projection: Vec<usize>,
-    /// The schema of the projection
-    projection_schema: Arc<Schema>,
-    batch_size: usize,
-    row_group_index: usize,
-    current_row_group: Option<Box<dyn RowGroupReader>>,
-    column_readers: Vec<ColumnReader>,
-}
-
-/// Thread-safe wrapper around a ParquetFile
-struct ParquetScanPartition {
-    schema: Arc<Schema>,
-    request_tx: Sender<()>,
-    response_rx: Receiver<Result<Option<RecordBatch>>>,
-}
-
-impl ParquetScanPartition {
-    pub fn try_new(
-        filename: &str,
-        projection: Option<Vec<usize>>,
-        batch_size: usize,
-    ) -> Result<Self> {
-        // determine the schema after the projection is applied
-        let schema = match &projection {
-            Some(p) => {
-                let table = ParquetFile::open(&filename, Some(p.clone()), batch_size)?;
-                table.schema().clone()
-            }
-            None => {
-                let table = ParquetFile::open(&filename, None, batch_size)?;
-                table.schema().clone()
-            }
-        };
-
-        // because the parquet implementation is not thread-safe, it is necessary to execute
-        // on a thread and communicate with channels
-        let (request_tx, request_rx): (Sender<()>, Receiver<()>) = unbounded();
-        let (response_tx, response_rx): (
-            Sender<Result<Option<RecordBatch>>>,
-            Receiver<Result<Option<RecordBatch>>>,
-        ) = unbounded();
-        let filename = filename.to_string();
-        thread::spawn(move || {
-            match ParquetFile::open(&filename, projection, batch_size) {
-                Ok(mut table) => {
-                    while let Ok(_) = request_rx.recv() {
-                        response_tx.send(table.next()).unwrap();
-                    }
-                }
-                Err(e) => {
-                    response_tx.send(Err(e)).unwrap();
-                }
-            }
-        });
-
-        Ok(Self {
-            schema,
-            request_tx,
-            response_rx,
-        })
-    }
-}
-
-impl BatchIterator for ParquetScanPartition {
-    fn schema(&self) -> Arc<Schema> {
-        self.schema.clone()
-    }
-
-    fn next(&mut self) -> Result<Option<RecordBatch>> {
-        match self.request_tx.send(()) {
-            Ok(_) => match self.response_rx.recv() {
-                Ok(batch) => batch,
-                Err(e) => Err(ExecutionError::General(format!(
-                    "Error receiving batch: {:?}",
-                    e
-                ))),
-            },
-            _ => Err(ExecutionError::General(
-                "Error sending request for next batch".to_string(),
-            )),
-        }
-    }
-}
-
-macro_rules! read_binary_column {
-    ($SELF:ident, $R:ident, $INDEX:expr) => {{
-        let mut read_buffer: Vec<ByteArray> =
-            vec![ByteArray::default(); $SELF.batch_size];
-        let mut def_levels: Vec<i16> = vec![0; $SELF.batch_size];
-        let (_, levels_read) = $R.read_batch(
-            $SELF.batch_size,
-            Some(&mut def_levels),
-            None,
-            &mut read_buffer,
-        )?;
-        let mut builder = StringBuilder::new(levels_read);
-        let mut value_index = 0;
-        for i in 0..levels_read {
-            if def_levels[i] > 0 {
-                builder.append_value(
-                    &String::from_utf8(read_buffer[value_index].data().to_vec()).unwrap(),
-                )?;
-                value_index += 1;
-            } else {
-                builder.append_null()?;
-            }
-        }
-        Arc::new(builder.finish())
-    }};
-}
-
-trait ArrowReader<T>
-where
-    T: ArrowPrimitiveType,
-{
-    fn read(
-        &mut self,
-        batch_size: usize,
-        is_nullable: bool,
-    ) -> Result<Arc<PrimitiveArray<T>>>;
-}
-
-impl<A, P> ArrowReader<A> for ColumnReaderImpl<P>
-where
-    A: ArrowPrimitiveType,
-    P: parquet::data_type::DataType,
-    P::T: std::convert::From<A::Native>,
-    A::Native: std::convert::From<P::T>,
-{
-    fn read(
-        &mut self,
-        batch_size: usize,
-        is_nullable: bool,
-    ) -> Result<Arc<PrimitiveArray<A>>> {
-        // create read buffer
-        let mut read_buffer: Vec<P::T> = vec![A::default_value().into(); batch_size];
-
-        if is_nullable {
-            let mut def_levels: Vec<i16> = vec![0; batch_size];
-
-            let (values_read, levels_read) = self.read_batch(
-                batch_size,
-                Some(&mut def_levels),
-                None,
-                &mut read_buffer,
-            )?;
-            let mut builder = PrimitiveBuilder::<A>::new(levels_read);
-            let converted_buffer: Vec<A::Native> =
-                read_buffer.into_iter().map(|v| v.into()).collect();
-            if values_read == levels_read {
-                builder.append_slice(&converted_buffer[0..values_read])?;
-            } else {
-                let mut value_index = 0;
-                for i in 0..levels_read {
-                    if def_levels[i] != 0 {
-                        builder.append_value(converted_buffer[value_index].into())?;
-                        value_index += 1;
-                    } else {
-                        builder.append_null()?;
-                    }
-                }
-            }
-            Ok(Arc::new(builder.finish()))
-        } else {
-            let (values_read, _) =
-                self.read_batch(batch_size, None, None, &mut read_buffer)?;
-
-            let mut builder = PrimitiveBuilder::<A>::new(values_read);
-            let converted_buffer: Vec<A::Native> =
-                read_buffer.into_iter().map(|v| v.into()).collect();
-            builder.append_slice(&converted_buffer[0..values_read])?;
-            Ok(Arc::new(builder.finish()))
-        }
-    }
-}
-
-impl ParquetFile {
-    /// Read parquet data from a `File`
-    pub fn open(
-        filename: &str,
-        projection: Option<Vec<usize>>,
-        batch_size: usize,
-    ) -> Result<Self> {
-        let file = File::open(filename)?;
-        let reader = SerializedFileReader::new(file)?;
-
-        let metadata = reader.metadata();
-        let schema =
-            parquet_to_arrow_schema(metadata.file_metadata().schema_descr_ptr())?;
-
-        // even if we aren't referencing structs or lists in our projection, column reader
-        // indexes will be off until we have support for nested schemas
-        for i in 0..schema.fields().len() {
-            match schema.field(i).data_type() {
-                DataType::List(_) => {
-                    return Err(ExecutionError::NotImplemented(
-                        "Parquet datasource does not support LIST".to_string(),
-                    ));
-                }
-                DataType::Struct(_) => {
-                    return Err(ExecutionError::NotImplemented(
-                        "Parquet datasource does not support STRUCT".to_string(),
-                    ));
-                }
-                _ => {}
-            }
-        }
-
-        let projection = match projection {
-            Some(p) => p,
-            None => {
-                let mut p = Vec::with_capacity(schema.fields().len());
-                for i in 0..schema.fields().len() {
-                    p.push(i);
-                }
-                p
-            }
-        };
-
-        let projected_schema = schema_projection(&schema, &projection)?;
-
-        Ok(ParquetFile {
-            reader: reader,
-            row_group_index: 0,
-            projection_schema: projected_schema,
-            projection,
-            batch_size,
-            current_row_group: None,
-            column_readers: vec![],
-        })
-    }
-
-    fn load_next_row_group(&mut self) -> Result<()> {
-        if self.row_group_index < self.reader.num_row_groups() {
-            let reader = self.reader.get_row_group(self.row_group_index)?;
-
-            self.column_readers.clear();
-            self.column_readers = Vec::with_capacity(self.projection.len());
-
-            for i in 0..self.projection.len() {
-                self.column_readers
-                    .push(reader.get_column_reader(self.projection[i])?);
-            }
-
-            self.current_row_group = Some(reader);
-            self.row_group_index += 1;
-
-            Ok(())
-        } else {
-            Err(ExecutionError::General(
-                "Attempt to read past final row group".to_string(),
-            ))
-        }
-    }
-
-    fn load_batch(&mut self) -> Result<Option<RecordBatch>> {
-        match &self.current_row_group {
-            Some(reader) => {
-                let mut batch: Vec<Arc<dyn Array>> =
-                    Vec::with_capacity(reader.num_columns());
-                for i in 0..self.column_readers.len() {
-                    let dt = self.schema().field(i).data_type().clone();
-                    let is_nullable = self.schema().field(i).is_nullable();
-                    let array: Arc<dyn Array> = match self.column_readers[i] {
-                        ColumnReader::BoolColumnReader(ref mut r) => {
-                            ArrowReader::<BooleanType>::read(
-                                r,
-                                self.batch_size,
-                                is_nullable,
-                            )?
-                        }
-                        ColumnReader::Int32ColumnReader(ref mut r) => match dt {
-                            DataType::Date32(DateUnit::Day) => {
-                                ArrowReader::<Date32Type>::read(
-                                    r,
-                                    self.batch_size,
-                                    is_nullable,
-                                )?
-                            }
-                            DataType::Time32(TimeUnit::Millisecond) => {
-                                ArrowReader::<Time32MillisecondType>::read(
-                                    r,
-                                    self.batch_size,
-                                    is_nullable,
-                                )?
-                            }
-                            _ => ArrowReader::<Int32Type>::read(
-                                r,
-                                self.batch_size,
-                                is_nullable,
-                            )?,
-                        },
-                        ColumnReader::Int64ColumnReader(ref mut r) => match dt {
-                            DataType::Time64(TimeUnit::Microsecond) => {
-                                ArrowReader::<Time64MicrosecondType>::read(
-                                    r,
-                                    self.batch_size,
-                                    is_nullable,
-                                )?
-                            }
-                            DataType::Time64(TimeUnit::Nanosecond) => {
-                                ArrowReader::<Time64NanosecondType>::read(
-                                    r,
-                                    self.batch_size,
-                                    is_nullable,
-                                )?
-                            }
-                            DataType::Timestamp(TimeUnit::Millisecond, None) => {
-                                ArrowReader::<TimestampMillisecondType>::read(
-                                    r,
-                                    self.batch_size,
-                                    is_nullable,
-                                )?
-                            }
-                            DataType::Timestamp(TimeUnit::Microsecond, None) => {
-                                ArrowReader::<TimestampMicrosecondType>::read(
-                                    r,
-                                    self.batch_size,
-                                    is_nullable,
-                                )?
-                            }
-                            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
-                                ArrowReader::<TimestampNanosecondType>::read(
-                                    r,
-                                    self.batch_size,
-                                    is_nullable,
-                                )?
-                            }
-                            _ => ArrowReader::<Int64Type>::read(
-                                r,
-                                self.batch_size,
-                                is_nullable,
-                            )?,
-                        },
-                        ColumnReader::Int96ColumnReader(ref mut r) => {
-                            let mut read_buffer: Vec<Int96> =
-                                vec![Int96::new(); self.batch_size];
-
-                            let mut def_levels: Vec<i16> = vec![0; self.batch_size];
-                            let (_, levels_read) = r.read_batch(
-                                self.batch_size,
-                                Some(&mut def_levels),
-                                None,
-                                &mut read_buffer,
-                            )?;
-
-                            let mut builder =
-                                TimestampNanosecondBuilder::new(levels_read);
-                            let mut value_index = 0;
-                            for i in 0..levels_read {
-                                if def_levels[i] > 0 {
-                                    builder.append_value(convert_int96_timestamp(
-                                        read_buffer[value_index].data(),
-                                    ))?;
-                                    value_index += 1;
-                                } else {
-                                    builder.append_null()?;
-                                }
-                            }
-                            Arc::new(builder.finish())
-                        }
-                        ColumnReader::FloatColumnReader(ref mut r) => {
-                            ArrowReader::<Float32Type>::read(
-                                r,
-                                self.batch_size,
-                                is_nullable,
-                            )?
-                        }
-                        ColumnReader::DoubleColumnReader(ref mut r) => {
-                            ArrowReader::<Float64Type>::read(
-                                r,
-                                self.batch_size,
-                                is_nullable,
-                            )?
-                        }
-                        ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => {
-                            read_binary_column!(self, r, i)
-                        }
-                        ColumnReader::ByteArrayColumnReader(ref mut r) => {
-                            read_binary_column!(self, r, i)
-                        }
-                    };
-
-                    batch.push(array);
-                }
-
-                if batch.len() == 0 || batch[0].data().len() == 0 {
-                    Ok(None)
-                } else {
-                    Ok(Some(RecordBatch::try_new(
-                        self.projection_schema.clone(),
-                        batch,
-                    )?))
-                }
-            }
-            _ => Ok(None),
-        }
-    }
-}
-
-/// Create a new schema by applying a projection to this schema's fields
-fn schema_projection(schema: &Schema, projection: &[usize]) -> Result<Arc<Schema>> {
-    let mut fields: Vec<Field> = Vec::with_capacity(projection.len());
-    for i in projection {
-        if *i < schema.fields().len() {
-            fields.push(schema.field(*i).clone());
-        } else {
-            return Err(ExecutionError::InvalidColumn(format!(
-                "Invalid column index {} in projection",
-                i
-            )));
-        }
-    }
-    Ok(Arc::new(Schema::new(fields)))
-}
-
-/// convert a Parquet INT96 to an Arrow timestamp in nanoseconds
-fn convert_int96_timestamp(v: &[u32]) -> i64 {
-    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
-    const SECONDS_PER_DAY: i64 = 86_400;
-    const MILLIS_PER_SECOND: i64 = 1_000;
+        let mut filenames: Vec<String> = vec![];
+        common::build_file_list(&self.path, &mut filenames, ".parquet")?;
 
-    let day = v[2] as i64;
-    let nanoseconds = ((v[1] as i64) << 32) + v[0] as i64;
-    let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
-    seconds * MILLIS_PER_SECOND * 1_000_000 + nanoseconds
-}
+        let parquet_exec =
+            ParquetExec::try_new(&self.path, projection.clone(), batch_size)?;
 
-impl ParquetFile {
-    fn schema(&self) -> &Arc<Schema> {
-        &self.projection_schema
-    }
+        let partitions = parquet_exec.partitions()?;
 
-    fn next(&mut self) -> Result<Option<RecordBatch>> {
-        // advance the row group reader if necessary
-        if self.current_row_group.is_none() {
-            self.load_next_row_group()?;
-            self.load_batch()
-        } else {
-            match self.load_batch() {
-                Ok(Some(b)) => Ok(Some(b)),
-                Ok(None) => {
-                    if self.row_group_index < self.reader.num_row_groups() {
-                        self.load_next_row_group()?;
-                        self.load_batch()
-                    } else {
-                        Ok(None)
-                    }
-                }
-                Err(e) => Err(e),
-            }
-        }
+        let iterators = partitions
+            .iter()
+            .map(|p| p.execute())
+            .collect::<Result<Vec<_>>>()?;
+        Ok(iterators)
     }
 }
 
@@ -550,7 +78,7 @@ impl ParquetFile {
 mod tests {
     use super::*;
     use arrow::array::{
-        BooleanArray, Float32Array, Float64Array, Int32Array, StringArray,
+        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
         TimestampNanosecondArray,
     };
     use std::env;
@@ -594,8 +122,8 @@ mod tests {
             bigint_col: Int64\n\
             float_col: Float32\n\
             double_col: Float64\n\
-            date_string_col: Utf8\n\
-            string_col: Utf8\n\
+            date_string_col: Binary\n\
+            string_col: Binary\n\
             timestamp_col: Timestamp(Nanosecond, None)",
             y
         );
@@ -744,7 +272,7 @@ mod tests {
     }
 
     #[test]
-    fn read_utf8_alltypes_plain_parquet() {
+    fn read_binary_alltypes_plain_parquet() {
        let table = load_table("alltypes_plain.parquet");
 
        let projection = Some(vec![9]);
@@ -758,11 +286,11 @@ mod tests {
         let array = batch
             .column(0)
             .as_any()
-            .downcast_ref::<StringArray>()
+            .downcast_ref::<BinaryArray>()
             .unwrap();
-        let mut values: Vec<String> = vec![];
+        let mut values: Vec<&str> = vec![];
         for i in 0..batch.num_rows() {
-            values.push(array.value(i).to_string());
+            values.push(std::str::from_utf8(array.value(i)).unwrap());
         }
 
         assert_eq!(
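One structural note on the TableProvider changes threaded through csv.rs, datasource.rs, memory.rs, and parquet.rs above: schema() now returns an owned Arc<Schema> instead of &Arc<Schema>. The clone is only an atomic refcount bump, and it frees implementations from having to keep the schema in a field just so a borrow can point at it, which is what lets the new ParquetTable obtain its schema from ParquetExec. A minimal illustration with a hypothetical table type:

    use std::sync::Arc;
    use arrow::datatypes::Schema;

    struct ExampleTable {
        schema: Arc<Schema>,
    }

    impl ExampleTable {
        // Old shape: the returned reference is tied to &self, so every
        // implementation must store the schema as a field.
        fn schema_ref(&self) -> &Arc<Schema> {
            &self.schema
        }

        // New shape: a cheap pointer clone; implementations may also
        // derive the schema on the fly instead of storing it.
        fn schema(&self) -> Arc<Schema> {
            self.schema.clone()
        }
    }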
diff --git a/rust/datafusion/src/execution/physical_plan/expressions.rs b/rust/datafusion/src/execution/physical_plan/expressions.rs
index 8cc1fa4656aa..2c9c5a957f8f 100644
--- a/rust/datafusion/src/execution/physical_plan/expressions.rs
+++ b/rust/datafusion/src/execution/physical_plan/expressions.rs
@@ -1018,6 +1018,8 @@ impl CastExpr {
             && (is_numeric(&cast_type) || cast_type == DataType::Utf8)
         {
             Ok(Self { expr, cast_type })
+        } else if expr_type == DataType::Binary && cast_type == DataType::Utf8 {
+            Ok(Self { expr, cast_type })
         } else {
             Err(ExecutionError::General(format!(
                 "Invalid CAST from {:?} to {:?}",
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index 966075f6c5ad..44095f05f8aa 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -92,5 +92,6 @@ pub mod expressions;
 pub mod hash_aggregate;
 pub mod limit;
 pub mod merge;
+pub mod parquet;
 pub mod projection;
 pub mod selection;
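The CastExpr arm added above keeps the physical planner in sync with the kernel: Binary -> Utf8 is now accepted alongside the existing numeric rules. Conceptually the check reduces to the following (a restatement for illustration, not the actual code; is_numeric exists as a helper in expressions.rs and is only sketched here):

    use arrow::datatypes::DataType;

    // Plausible sketch of the numeric-type predicate used by CastExpr::try_new.
    fn is_numeric(t: &DataType) -> bool {
        use DataType::*;
        matches!(
            t,
            Int8 | Int16 | Int32 | Int64
                | UInt8 | UInt16 | UInt32 | UInt64
                | Float32 | Float64
        )
    }

    // Conceptual restatement of the validation: numeric -> numeric/Utf8,
    // plus the new Binary -> Utf8 case.
    fn cast_supported(expr_type: &DataType, cast_type: &DataType) -> bool {
        (is_numeric(expr_type)
            && (is_numeric(cast_type) || *cast_type == DataType::Utf8))
            || (*expr_type == DataType::Binary && *cast_type == DataType::Utf8)
    }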
diff --git a/rust/datafusion/src/execution/physical_plan/parquet.rs b/rust/datafusion/src/execution/physical_plan/parquet.rs
new file mode 100644
index 000000000000..4afd1eb3a2a3
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/parquet.rs
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading Parquet files
+
+use std::fs::File;
+use std::rc::Rc;
+use std::sync::{Arc, Mutex};
+use std::thread;
+
+use crate::error::{ExecutionError, Result};
+use crate::execution::physical_plan::common;
+use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
+use arrow::datatypes::Schema;
+use arrow::record_batch::RecordBatch;
+use parquet::file::reader::SerializedFileReader;
+
+use crossbeam::channel::{unbounded, Receiver, Sender};
+use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
+
+/// Execution plan for scanning a Parquet file
+pub struct ParquetExec {
+    /// Path to directory containing partitioned Parquet files with the same schema
+    filenames: Vec<String>,
+    /// Schema after projection is applied
+    schema: Arc<Schema>,
+    /// Projection for which columns to load
+    projection: Vec<usize>,
+    /// Batch size
+    batch_size: usize,
+}
+
+impl ParquetExec {
+    /// Create a new Parquet reader execution plan
+    pub fn try_new(
+        path: &str,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Result<Self> {
+        let mut filenames: Vec<String> = vec![];
+        common::build_file_list(path, &mut filenames, ".parquet")?;
+        if filenames.is_empty() {
+            Err(ExecutionError::General("No files found".to_string()))
+        } else {
+            let file = File::open(&filenames[0])?;
+            let file_reader = Rc::new(SerializedFileReader::new(file)?);
+            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+            let schema = arrow_reader.get_schema()?;
+
+            let projection = match projection {
+                Some(p) => p,
+                None => (0..schema.fields().len()).collect(),
+            };
+
+            let projected_schema = Schema::new(
+                projection
+                    .iter()
+                    .map(|i| schema.field(*i).clone())
+                    .collect(),
+            );
+
+            Ok(Self {
+                filenames,
+                schema: Arc::new(projected_schema),
+                projection,
+                batch_size,
+            })
+        }
+    }
+}
+
+impl ExecutionPlan for ParquetExec {
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
+        let partitions = self
+            .filenames
+            .iter()
+            .map(|filename| {
+                Arc::new(ParquetPartition::new(
+                    &filename,
+                    self.projection.clone(),
+                    self.schema.clone(),
+                    self.batch_size,
+                )) as Arc<dyn Partition>
+            })
+            .collect();
+        Ok(partitions)
+    }
+}
+
+struct ParquetPartition {
+    iterator: Arc<Mutex<dyn BatchIterator>>,
+}
+
+impl ParquetPartition {
+    /// Create a new Parquet partition
+    pub fn new(
+        filename: &str,
+        projection: Vec<usize>,
+        schema: Arc<Schema>,
+        batch_size: usize,
+    ) -> Self {
+        // because the parquet implementation is not thread-safe, it is necessary to execute
+        // on a thread and communicate with channels
+        let (request_tx, request_rx): (Sender<()>, Receiver<()>) = unbounded();
+        let (response_tx, response_rx): (
+            Sender<Result<Option<RecordBatch>>>,
+            Receiver<Result<Option<RecordBatch>>>,
+        ) = unbounded();
+
+        let filename = filename.to_string();
+
+        thread::spawn(move || {
+            //TODO error handling, remove unwraps
+
+            // open file
+            let file = File::open(&filename).unwrap();
+            match SerializedFileReader::new(file) {
+                Ok(file_reader) => {
+                    let file_reader = Rc::new(file_reader);
+
+                    let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+
+                    match arrow_reader
+                        .get_record_reader_by_columns(projection, batch_size)
+                    {
+                        Ok(mut batch_reader) => {
+                            while let Ok(_) = request_rx.recv() {
+                                match batch_reader.next_batch() {
+                                    Ok(Some(batch)) => {
+                                        response_tx.send(Ok(Some(batch))).unwrap();
+                                    }
+                                    Ok(None) => {
+                                        response_tx.send(Ok(None)).unwrap();
+                                        break;
+                                    }
+                                    Err(e) => {
+                                        response_tx
+                                            .send(Err(ExecutionError::General(format!(
+                                                "{:?}",
+                                                e
+                                            ))))
+                                            .unwrap();
+                                        break;
+                                    }
+                                }
+                            }
+                        }
+
+                        Err(e) => {
+                            response_tx
+                                .send(Err(ExecutionError::General(format!("{:?}", e))))
+                                .unwrap();
+                        }
+                    }
+                }
+
+                Err(e) => {
+                    response_tx
+                        .send(Err(ExecutionError::General(format!("{:?}", e))))
+                        .unwrap();
+                }
+            }
+        });
+
+        let iterator = Arc::new(Mutex::new(ParquetIterator {
+            schema,
+            request_tx,
+            response_rx,
+        }));
+
+        Self { iterator }
+    }
+}
+
+impl Partition for ParquetPartition {
+    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
+        Ok(self.iterator.clone())
+    }
+}
+
+struct ParquetIterator {
+    schema: Arc<Schema>,
+    request_tx: Sender<()>,
+    response_rx: Receiver<Result<Option<RecordBatch>>>,
+}
+
+impl BatchIterator for ParquetIterator {
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    fn next(&mut self) -> Result<Option<RecordBatch>> {
+        match self.request_tx.send(()) {
+            Ok(_) => match self.response_rx.recv() {
+                Ok(batch) => batch,
+                Err(e) => Err(ExecutionError::General(format!(
+                    "Error receiving batch: {:?}",
+                    e
+                ))),
+            },
+            _ => Err(ExecutionError::General(
+                "Error sending request for next batch".to_string(),
+            )),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::env;
+
+    #[test]
+    fn test() -> Result<()> {
+        let testdata =
+            env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined");
+        let filename = format!("{}/alltypes_plain.parquet", testdata);
+        let parquet_exec = ParquetExec::try_new(&filename, Some(vec![0, 1, 2]), 1024)?;
+        let partitions = parquet_exec.partitions()?;
+        assert_eq!(partitions.len(), 1);
+
+        let results = partitions[0].execute()?;
+        let mut results = results.lock().unwrap();
+        let batch = results.next()?.unwrap();
+
+        assert_eq!(8, batch.num_rows());
+        assert_eq!(3, batch.num_columns());
+
+        let field_names: Vec<&str> = batch
+            .schema()
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+        assert_eq!(vec!["id", "bool_col", "tinyint_col"], field_names);
+
+        let batch = results.next()?;
+        assert!(batch.is_none());
+
+        Ok(())
+    }
+}
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 103d436ec3d1..f626659d9aa5 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -86,7 +86,9 @@ fn nyc() -> Result<()> {
 fn parquet_query() {
     let mut ctx = ExecutionContext::new();
     register_alltypes_parquet(&mut ctx);
-    let sql = "SELECT id, string_col FROM alltypes_plain";
+    // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
+    // so we need an explicit cast
+    let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
     let actual = execute(&mut ctx, sql).join("\n");
 
     let expected =
         "4\t\"0\"\n5\t\"1\"\n6\t\"0\"\n7\t\"1\"\n2\t\"0\"\n3\t\"1\"\n0\t\"0\"\n1\t\"1\""
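The thread-plus-channels construction in ParquetPartition::new is the load-bearing trick in this new file: the reader stack is built on Rc and is not Send, so each partition parks its reader on a dedicated thread and pulls batches through a request/response channel pair, one batch per request. The same pattern in isolation, generic over any producer that cannot leave its thread (illustrative names, not the PR's code; crossbeam's channel API matches the imports above):

    use std::thread;
    use crossbeam::channel::{unbounded, Receiver, Sender};

    // Wrap a producer that must stay on one thread behind a request/response
    // channel pair, exposing a pull-style iterator to the caller.
    struct ThreadedIterator<T: Send + 'static> {
        request_tx: Sender<()>,
        response_rx: Receiver<Option<T>>,
    }

    impl<T: Send + 'static> ThreadedIterator<T> {
        fn new<F, I>(make_producer: F) -> Self
        where
            F: FnOnce() -> I + Send + 'static,
            I: Iterator<Item = T>,
        {
            let (request_tx, request_rx) = unbounded::<()>();
            let (response_tx, response_rx) = unbounded::<Option<T>>();
            thread::spawn(move || {
                // Built on this thread and never moved, so the producer
                // itself does not need to be Send.
                let mut producer = make_producer();
                while request_rx.recv().is_ok() {
                    let item = producer.next();
                    let done = item.is_none();
                    if response_tx.send(item).is_err() || done {
                        break;
                    }
                }
            });
            Self { request_tx, response_rx }
        }

        fn next(&mut self) -> Option<T> {
            // One request per batch keeps the worker from reading ahead.
            self.request_tx.send(()).ok()?;
            self.response_rx.recv().ok().flatten()
        }
    }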
diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs
index 9edd2d2bf587..c1ea6a27d11e 100644
--- a/rust/parquet/src/arrow/array_reader.rs
+++ b/rust/parquet/src/arrow/array_reader.rs
@@ -34,9 +34,9 @@ use arrow::buffer::{Buffer, MutableBuffer};
 use arrow::datatypes::{DataType as ArrowType, Field};
 
 use crate::arrow::converter::{
-    BoolConverter, Converter, Float32Converter, Float64Converter, Int16Converter,
-    Int32Converter, Int64Converter, Int8Converter, UInt16Converter, UInt32Converter,
-    UInt64Converter, UInt8Converter, Utf8Converter,
+    BinaryConverter, BoolConverter, Converter, Float32Converter, Float64Converter,
+    Int16Converter, Int32Converter, Int64Converter, Int8Converter, Int96Converter,
+    UInt16Converter, UInt32Converter, UInt64Converter, UInt8Converter, Utf8Converter,
 };
 use crate::arrow::record_reader::RecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
@@ -217,9 +217,10 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
                     &mut RecordReader<DoubleType>,
                 >(&mut self.record_reader))
             },
-            (arrow_type, _) => Err(general_err!(
-                "Reading {:?} type from parquet is not supported yet.",
-                arrow_type
+            (arrow_type, physical_type) => Err(general_err!(
+                "Reading {:?} type from parquet {:?} is not supported yet.",
+                arrow_type,
+                physical_type
             )),
         }?;
@@ -819,10 +820,12 @@ impl<'a> ArrayReaderBuilder {
                 page_iterator,
                 column_desc,
             )?)),
-            PhysicalType::INT96 => Ok(Box::new(PrimitiveArrayReader::<Int96Type>::new(
-                page_iterator,
-                column_desc,
-            )?)),
+            PhysicalType::INT96 => {
+                Ok(Box::new(ComplexObjectArrayReader::<
+                    Int96Type,
+                    Int96Converter,
+                >::new(page_iterator, column_desc)?))
+            }
             PhysicalType::FLOAT => Ok(Box::new(PrimitiveArrayReader::<FloatType>::new(
                 page_iterator,
                 column_desc,
@@ -830,16 +833,25 @@ impl<'a> ArrayReaderBuilder {
             PhysicalType::DOUBLE => Ok(Box::new(
                 PrimitiveArrayReader::<DoubleType>::new(page_iterator, column_desc)?,
             )),
-            PhysicalType::BYTE_ARRAY
-                if cur_type.get_basic_info().logical_type() == LogicalType::UTF8 =>
-            {
-                Ok(Box::new(ComplexObjectArrayReader::<
-                    ByteArrayType,
-                    Utf8Converter,
-                >::new(page_iterator, column_desc)?))
+            PhysicalType::BYTE_ARRAY => {
+                if cur_type.get_basic_info().logical_type() == LogicalType::UTF8 {
+                    Ok(Box::new(ComplexObjectArrayReader::<
+                        ByteArrayType,
+                        Utf8Converter,
+                    >::new(
+                        page_iterator, column_desc
+                    )?))
+                } else {
+                    Ok(Box::new(ComplexObjectArrayReader::<
+                        ByteArrayType,
+                        BinaryConverter,
+                    >::new(
+                        page_iterator, column_desc
+                    )?))
+                }
             }
             other => Err(ArrowError(format!(
-                "Unable to create primite array reader for parquet physical type {}",
+                "Unable to create primitive array reader for parquet physical type {}",
                 other
             ))),
         }
diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs
index 1c8939bac9b8..6a66317c5f74 100644
--- a/rust/parquet/src/arrow/converter.rs
+++ b/rust/parquet/src/arrow/converter.rs
@@ -16,10 +16,10 @@
 // under the License.
 
 use crate::arrow::record_reader::RecordReader;
-use crate::data_type::{ByteArray, DataType};
+use crate::data_type::{ByteArray, DataType, Int96};
 use arrow::array::{
-    Array, ArrayRef, BooleanArray, BooleanBufferBuilder, BufferBuilderTrait,
-    StringBuilder,
+    Array, ArrayRef, BinaryBuilder, BooleanArray, BooleanBufferBuilder,
+    BufferBuilderTrait, StringBuilder, TimestampNanosecondBuilder,
 };
 use arrow::compute::cast;
 use std::convert::From;
@@ -29,7 +29,7 @@ use crate::errors::Result;
 use arrow::datatypes::{ArrowPrimitiveType, DataType as ArrowDataType};
 
 use arrow::array::ArrayDataBuilder;
-use arrow::array::{PrimitiveArray, StringArray};
+use arrow::array::{BinaryArray, PrimitiveArray, StringArray, TimestampNanosecondArray};
 use std::marker::PhantomData;
 
 use crate::data_type::{
@@ -108,6 +108,22 @@ impl Converter<&mut RecordReader<BoolType>, BooleanArray> for BooleanArrayConver
     }
 }
 
+pub struct Int96ArrayConverter {}
+
+impl Converter<Vec<Option<Int96>>, TimestampNanosecondArray> for Int96ArrayConverter {
+    fn convert(source: Vec<Option<Int96>>) -> Result<TimestampNanosecondArray> {
+        let mut builder = TimestampNanosecondBuilder::new(source.len());
+        for v in source {
+            match v {
+                Some(array) => builder.append_value(array.to_i64() * 1000000),
+                None => builder.append_null(),
+            }?
+        }
+
+        Ok(builder.finish())
+    }
+}
+
 pub struct Utf8ArrayConverter {}
 
 impl Converter<Vec<Option<ByteArray>>, StringArray> for Utf8ArrayConverter {
@@ -124,6 +140,22 @@ impl Converter<Vec<Option<ByteArray>>, StringArray> for Utf8ArrayConverter {
     }
 }
 
+pub struct BinaryArrayConverter {}
+
+impl Converter<Vec<Option<ByteArray>>, BinaryArray> for BinaryArrayConverter {
+    fn convert(source: Vec<Option<ByteArray>>) -> Result<BinaryArray> {
+        let mut builder = BinaryBuilder::new(source.len());
+        for v in source {
+            match v {
+                Some(array) => builder.append_value(array.data()),
+                None => builder.append_null(),
+            }?
+        }
+
+        Ok(builder.finish())
+    }
+}
+
 pub type BoolConverter<'a> = ArrayRefConverter<
     &'a mut RecordReader<BoolType>,
     BooleanArray,
     BooleanArrayConverter,
 >;
@@ -141,6 +173,10 @@ pub type Float32Converter = CastConverter<FloatType, Float32Type, Float32Type>;
 pub type Utf8Converter =
     ArrayRefConverter<Vec<Option<ByteArray>>, StringArray, Utf8ArrayConverter>;
+pub type BinaryConverter =
+    ArrayRefConverter<Vec<Option<ByteArray>>, BinaryArray, BinaryArrayConverter>;
+pub type Int96Converter =
+    ArrayRefConverter<Vec<Option<Int96>>, TimestampNanosecondArray, Int96ArrayConverter>;
 
 pub struct FromConverter<S, T> {
     _source: PhantomData<S>,
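Because the new converters are plain functions from Vec<Option<...>> to a finished array, they can be exercised without any Parquet file at all. A hedged sketch (assumes BinaryArrayConverter and the Converter trait above are in scope; ByteArray::from(Vec<u8>) is existing parquet API):

    use arrow::array::Array;
    use parquet::data_type::ByteArray;
    use parquet::errors::Result;

    fn binary_converter_roundtrip() -> Result<()> {
        let source: Vec<Option<ByteArray>> = vec![
            Some(ByteArray::from(vec![0x41u8, 0x42])), // b"AB"
            None,
        ];
        let array = BinaryArrayConverter::convert(source)?;
        assert_eq!(2, array.len());
        assert_eq!(b"AB" as &[u8], array.value(0)); // raw bytes, no UTF-8 check
        assert!(array.is_null(1));
        Ok(())
    }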
diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs
index f56deaa7b7e8..9aa40561fd3b 100644
--- a/rust/parquet/src/arrow/schema.rs
+++ b/rust/parquet/src/arrow/schema.rs
@@ -241,7 +241,7 @@ impl ParquetTypeConverter {
 
     fn from_byte_array(&self) -> Result<DataType> {
         match self.schema.get_basic_info().logical_type() {
-            LogicalType::NONE => Ok(DataType::Utf8),
+            LogicalType::NONE => Ok(DataType::Binary),
             LogicalType::UTF8 => Ok(DataType::Utf8),
             other => Err(ArrowError(format!(
                 "Unable to convert parquet BYTE_ARRAY logical type {}",
diff --git a/rust/parquet/src/data_type.rs b/rust/parquet/src/data_type.rs
index f5b4038a7211..c5647765225d 100644
--- a/rust/parquet/src/data_type.rs
+++ b/rust/parquet/src/data_type.rs
@@ -50,6 +50,30 @@ impl Int96 {
     pub fn set_data(&mut self, elem0: u32, elem1: u32, elem2: u32) {
         self.value = Some([elem0, elem1, elem2]);
     }
+
+    /// Converts this INT96 into an i64 representing the number of MILLISECONDS since Epoch
+    pub fn to_i64(&self) -> i64 {
+        const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
+        const SECONDS_PER_DAY: i64 = 86_400;
+        const MILLIS_PER_SECOND: i64 = 1_000;
+
+        let day = self.data()[2] as i64;
+        let nanoseconds = ((self.data()[1] as i64) << 32) + self.data()[0] as i64;
+        let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
+        let millis = seconds * MILLIS_PER_SECOND + nanoseconds / 1_000_000;
+
+        // TODO: Add support for negative milliseconds.
+        // Chrono library does not handle negative timestamps, but we could probably write
+        // something similar to java.util.Date and java.util.Calendar.
+        if millis < 0 {
+            panic!(
+                "Expected non-negative milliseconds when converting Int96, found {}",
+                millis
+            );
+        }
+
+        millis
+    }
 }
 
 impl Default for Int96 {
diff --git a/rust/parquet/src/record/api.rs b/rust/parquet/src/record/api.rs
index 9c76a0c93009..1975e376e6f0 100644
--- a/rust/parquet/src/record/api.rs
+++ b/rust/parquet/src/record/api.rs
@@ -517,26 +517,7 @@ impl Field {
     /// `Timestamp` value.
     #[inline]
     pub fn convert_int96(_descr: &ColumnDescPtr, value: Int96) -> Self {
-        const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
-        const SECONDS_PER_DAY: i64 = 86_400;
-        const MILLIS_PER_SECOND: i64 = 1_000;
-
-        let day = value.data()[2] as i64;
-        let nanoseconds = ((value.data()[1] as i64) << 32) + value.data()[0] as i64;
-        let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
-        let millis = seconds * MILLIS_PER_SECOND + nanoseconds / 1_000_000;
-
-        // TODO: Add support for negative milliseconds.
-        // Chrono library does not handle negative timestamps, but we could probably write
-        // something similar to java.util.Date and java.util.Calendar.
-        if millis < 0 {
-            panic!(
-                "Expected non-negative milliseconds when converting Int96, found {}",
-                millis
-            );
-        }
-
-        Field::Timestamp(millis as u64)
+        Field::Timestamp(value.to_i64() as u64)
     }
 
     /// Converts Parquet FLOAT type with logical type into `f32` value.
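Finally, a worked instance of the Int96::to_i64 arithmetic that both call sites now share (sample value invented for illustration):

    fn main() {
        // INT96 stores [nanos_lo, nanos_hi, julian_day]; pick one Julian day
        // (86_400 s) plus 0.5 s after the Unix epoch (Julian day 2_440_588).
        let v: [u32; 3] = [500_000_000, 0, 2_440_589];

        let day = v[2] as i64; // 2_440_589
        let nanoseconds = ((v[1] as i64) << 32) + v[0] as i64; // 500_000_000
        let seconds = (day - 2_440_588) * 86_400; // 86_400
        let millis = seconds * 1_000 + nanoseconds / 1_000_000; // 86_400_000 + 500
        assert_eq!(86_400_500, millis); // i.e. 1970-01-02T00:00:00.500
        // Int96Converter then scales by 1_000_000 to produce Arrow nanoseconds.
    }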