From cf7f8f4064edfb295f672fea4f07126165600c16 Mon Sep 17 00:00:00 2001
From: Renjie Liu
Date: Fri, 27 Sep 2019 15:42:24 +0800
Subject: [PATCH 1/3] Finish arrow reader

---
 cpp/submodules/parquet-testing         |   2 +-
 rust/arrow/src/array/array.rs          |   5 +
 rust/arrow/src/datatypes.rs            |   3 +
 rust/arrow/src/error.rs                |   1 +
 rust/arrow/src/record_batch.rs         |  21 ++
 rust/parquet/Cargo.toml                |   3 +-
 rust/parquet/src/arrow/array_reader.rs |  18 +-
 rust/parquet/src/arrow/arrow_reader.rs | 270 +++++++++++++++++++++++++
 rust/parquet/src/arrow/mod.rs          |  32 +++
 rust/parquet/src/errors.rs             |  10 +
 10 files changed, 362 insertions(+), 3 deletions(-)
 create mode 100644 rust/parquet/src/arrow/arrow_reader.rs

diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index 46c9e977f58f..2fc3ade4ccbf 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit 46c9e977f58f6c5ef1b81f782f3746b3656e5a8c
+Subproject commit 2fc3ade4ccbf17271194df0b1549bc6733204314
diff --git a/rust/arrow/src/array/array.rs b/rust/arrow/src/array/array.rs
index 19ab72f86809..d0ae1e3ca577 100644
--- a/rust/arrow/src/array/array.rs
+++ b/rust/arrow/src/array/array.rs
@@ -1056,6 +1056,11 @@ impl StructArray {
         self.boxed_fields.iter().collect()
     }
 
+    /// Returns child array refs of the struct array
+    pub fn columns_ref(&self) -> Vec<ArrayRef> {
+        self.boxed_fields.clone()
+    }
+
     /// Return field names in this struct array
     pub fn column_names(&self) -> Vec<&str> {
         match self.data.data_type() {
diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs
index 40e0153e81d8..abd526f839eb 100644
--- a/rust/arrow/src/datatypes.rs
+++ b/rust/arrow/src/datatypes.rs
@@ -34,6 +34,7 @@ use serde_derive::{Deserialize, Serialize};
 use serde_json::{json, Number, Value, Value::Number as VNumber};
 
 use crate::error::{ArrowError, Result};
+use std::sync::Arc;
 
 /// The possible relative types that are supported.
 ///
@@ -869,6 +870,8 @@ impl fmt::Display for Schema {
     }
 }
 
+pub type SchemaRef = Arc<Schema>;
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/rust/arrow/src/error.rs b/rust/arrow/src/error.rs
index 2f758d4b385f..e9a92358f2ff 100644
--- a/rust/arrow/src/error.rs
+++ b/rust/arrow/src/error.rs
@@ -31,6 +31,7 @@ pub enum ArrowError {
     JsonError(String),
     IoError(String),
     InvalidArgumentError(String),
+    ParquetError(String),
 }
 
 impl From<::std::io::Error> for ArrowError {
diff --git a/rust/arrow/src/record_batch.rs b/rust/arrow/src/record_batch.rs
index e718118b4a87..0450c0f58028 100644
--- a/rust/arrow/src/record_batch.rs
+++ b/rust/arrow/src/record_batch.rs
@@ -113,9 +113,30 @@ impl From<&StructArray> for RecordBatch {
     }
 }
 
+impl Into<StructArray> for RecordBatch {
+    fn into(self) -> StructArray {
+        self.schema.fields.iter().zip(self.columns.iter())
+            .map(|t| (t.0.clone(), t.1.clone()))
+            .collect::<StructArray>()
+            .into()
+    }
+}
+
 unsafe impl Send for RecordBatch {}
 unsafe impl Sync for RecordBatch {}
 
+
+/// Definition of record batch reader.
+pub trait RecordBatchReader {
+    /// Returns the schema of this record batch reader.
+    /// Implementations of this trait should guarantee that all record batches returned
+    /// by this reader have the same schema as returned from this method.
+    fn schema(&mut self) -> SchemaRef;
+
+    /// Returns the next record batch.
+    fn next_batch(&mut self) -> Result<RecordBatch>;
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml
index 66292c0f9dff..4236b7c12cd9 100644
--- a/rust/parquet/Cargo.toml
+++ b/rust/parquet/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "parquet"
-version = "1.0.0-SNAPSHOT"
+version = "0.15.0-SNAPSHOT"
 license = "Apache-2.0"
 description = "Apache Parquet implementation in Rust"
 homepage = "https://github.com/apache/arrow"
@@ -41,6 +41,7 @@ zstd = "0.4"
 chrono = "0.4"
 num-bigint = "0.2"
 arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT" }
+serde_json = { version = "1.0.13", features = ["preserve_order"] }
 
 [dev-dependencies]
 lazy_static = "1"
diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs
index 3a4a7864cbf4..2e58c0ce2aac 100644
--- a/rust/parquet/src/arrow/array_reader.rs
+++ b/rust/parquet/src/arrow/array_reader.rs
@@ -52,9 +52,12 @@ use crate::schema::types::{
     ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr,
 };
 use crate::schema::visitor::TypeVisitor;
+use std::any::Any;
 
 /// Array reader reads parquet data into arrow array.
 pub trait ArrayReader {
+    fn as_any(&self) -> &dyn Any;
+
     /// Returns the arrow type of this array reader.
     fn get_data_type(&self) -> &ArrowType;
 
@@ -115,6 +118,10 @@ impl<T: DataType> PrimitiveArrayReader<T> {
 
 /// Implementation of primitive array reader.
 impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     /// Returns data type of primitive array.
     fn get_data_type(&self) -> &ArrowType {
         &self.data_type
@@ -232,7 +239,7 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
 }
 
 /// Implementation of struct array reader.
-struct StructArrayReader {
+pub struct StructArrayReader {
     children: Vec<Box<ArrayReader>>,
     data_type: ArrowType,
     struct_def_level: i16,
@@ -261,6 +268,10 @@ impl StructArrayReader {
 }
 
 impl ArrayReader for StructArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     /// Returns data type.
     /// This must be a struct.
     fn get_data_type(&self) -> &ArrowType {
@@ -705,6 +716,7 @@ mod tests {
     use arrow::array::{Array, ArrayRef, PrimitiveArray, StructArray};
     use arrow::datatypes::{DataType as ArrowType, Field, Int32Type as ArrowInt32};
     use rand::distributions::range::SampleRange;
+    use std::any::Any;
     use std::collections::VecDeque;
     use std::rc::Rc;
     use std::sync::Arc;
@@ -953,6 +965,10 @@ mod tests {
     }
 
     impl ArrayReader for InMemoryArrayReader {
+        fn as_any(&self) -> &Any {
+            self
+        }
+
         fn get_data_type(&self) -> &ArrowType {
             &self.data_type
         }
diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs
new file mode 100644
index 000000000000..0781614de9f8
--- /dev/null
+++ b/rust/parquet/src/arrow/arrow_reader.rs
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains reader which reads parquet data into arrow array.
+
+use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader};
+use crate::arrow::schema::parquet_to_arrow_schema;
+use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
+use crate::errors::{ParquetError, Result};
+use crate::file::reader::FileReader;
+use arrow::array::StructArray;
+use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::{RecordBatch, RecordBatchReader};
+use std::rc::Rc;
+use std::sync::Arc;
+
+/// Arrow reader API.
+/// With this API, the user can get the arrow schema from a parquet file, and read
+/// parquet data into arrow arrays.
+pub trait ArrowReader {
+    /// Read parquet schema and convert it into arrow schema.
+    fn get_schema(&mut self) -> Result<Schema>;
+
+    /// Read parquet schema and convert it into arrow schema.
+    /// This schema only includes columns identified by `column_indices`.
+    fn get_schema_by_columns<T>(&mut self, column_indices: T) -> Result<Schema>
+    where
+        T: IntoIterator<Item = usize>;
+
+    /// Returns record batch reader from whole parquet file.
+    ///
+    /// # Arguments
+    ///
+    /// `batch_size`: The size of each record batch returned from this reader. Only the
+    ///  last batch may contain fewer records than this size; otherwise record batches
+    ///  returned from this reader should contain exactly `batch_size` elements.
+    fn get_record_reader(
+        &mut self,
+        batch_size: usize,
+    ) -> Result<Box<RecordBatchReader>>;
+
+    /// Returns record batch reader whose record batches contain columns identified by
+    /// `column_indices`.
+    ///
+    /// # Arguments
+    ///
+    /// `column_indices`: The columns that should be included in record batches.
+    /// `batch_size`: Please refer to `get_record_reader`.
+    fn get_record_reader_by_columns<T>(
+        &mut self,
+        column_indices: T,
+        batch_size: usize,
+    ) -> Result<Box<RecordBatchReader>>
+    where
+        T: IntoIterator<Item = usize>;
+}
+
+pub struct ParquetFileArrowReader {
+    file_reader: Rc<FileReader>,
+}
+
+impl ArrowReader for ParquetFileArrowReader {
+    fn get_schema(&mut self) -> Result<Schema> {
+        parquet_to_arrow_schema(
+            self.file_reader
+                .metadata()
+                .file_metadata()
+                .schema_descr_ptr(),
+        )
+    }
+
+    fn get_schema_by_columns<T>(&mut self, column_indices: T) -> Result<Schema>
+    where
+        T: IntoIterator<Item = usize>,
+    {
+        parquet_to_arrow_schema_by_columns(
+            self.file_reader
+                .metadata()
+                .file_metadata()
+                .schema_descr_ptr(),
+            column_indices,
+        )
+    }
+
+    fn get_record_reader(
+        &mut self,
+        batch_size: usize,
+    ) -> Result<Box<RecordBatchReader>> {
+        let column_indices = 0..self
+            .file_reader
+            .metadata()
+            .file_metadata()
+            .schema_descr_ptr()
+            .num_columns();
+
+        self.get_record_reader_by_columns(column_indices, batch_size)
+    }
+
+    fn get_record_reader_by_columns<T>(
+        &mut self,
+        column_indices: T,
+        batch_size: usize,
+    ) -> Result<Box<RecordBatchReader>>
+    where
+        T: IntoIterator<Item = usize>,
+    {
+        let array_reader = build_array_reader(
+            self.file_reader
+                .metadata()
+                .file_metadata()
+                .schema_descr_ptr(),
+            column_indices,
+            self.file_reader.clone(),
+        )?;
+
+        Ok(Box::new(ParquetRecordBatchReader::try_new(
+            batch_size,
+            array_reader,
+        )?))
+    }
+}
+
+impl ParquetFileArrowReader {
+    pub fn new(file_reader: Rc<FileReader>) -> Self {
+        Self { file_reader }
+    }
+}
+
+struct ParquetRecordBatchReader {
+    batch_size: usize,
+    array_reader: Box<ArrayReader>,
+    schema: SchemaRef,
+}
+
+impl RecordBatchReader for ParquetRecordBatchReader {
+    fn schema(&mut self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn next_batch(&mut self) -> ArrowResult<RecordBatch> {
+        self.array_reader
+            .next_batch(self.batch_size)
+            .map_err(|err| err.into())
+            .and_then(|array| {
+                array
+                    .as_any()
+                    .downcast_ref::<StructArray>()
+                    .ok_or_else(|| {
+                        general_err!("Struct array reader should return struct array")
+                            .into()
+                    })
+                    .and_then(|struct_array| {
+                        RecordBatch::try_new(
+                            self.schema.clone(),
+                            struct_array.columns_ref(),
+                        )
+                    })
+            })
+    }
+}
+
+impl ParquetRecordBatchReader {
+    pub fn try_new(
+        batch_size: usize,
+        array_reader: Box<ArrayReader>,
+    ) -> Result<Self> {
+        // Check that array reader is struct array reader
+        array_reader
+            .as_any()
+            .downcast_ref::<StructArrayReader>()
+            .ok_or_else(|| general_err!("The input must be struct array reader!"))?;
+
+        let schema = match array_reader.get_data_type() {
+            &ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
+            _ => unreachable!("Struct array reader's data type is not struct!"),
+        };
+
+        Ok(Self {
+            batch_size,
+            array_reader,
+            schema: Arc::new(schema),
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
+    use crate::file::reader::{FileReader, SerializedFileReader};
+    use arrow::array::Array;
+    use arrow::array::StructArray;
+    use serde_json::Value::Array as JArray;
+    use std::cmp::min;
+    use std::env;
+    use std::fs::File;
+    use std::path::PathBuf;
+    use std::rc::Rc;
+
+    #[test]
+    fn test_arrow_reader() {
+        let json_values = match serde_json::from_reader(get_test_file(
+            "parquet/generated_simple_numerics/blogs.json",
+        )).expect("Failed to read json value from file!")
+        {
+            JArray(values) => values,
+            _ => panic!("Input should be json array!"),
+        };
+
+        let parquet_file_reader =
+            get_test_reader("parquet/generated_simple_numerics/blogs.parquet");
+
+        let max_len = parquet_file_reader.metadata().file_metadata().num_rows() as usize;
+
+        let mut arrow_reader = ParquetFileArrowReader::new(parquet_file_reader);
+
+        let mut record_batch_reader = arrow_reader
+            .get_record_reader(60)
+            .expect("Failed to read into array!");
+
+        for i in 0..20 {
+            let array: StructArray = record_batch_reader
+                .next_batch()
+                .expect("Failed to read record batch!")
+                .into();
+
+            let (start, end) = (i * 60 as usize, (i + 1) * 60 as usize);
+
+            if start < max_len {
+                assert_ne!(0, array.len());
+                let end = min(end, max_len);
+                let json = JArray(Vec::from(&json_values[start..end]));
+                assert_eq!(array, json)
+            } else {
+                assert_eq!(0, array.len());
+            }
+        }
+    }
+
+    fn get_test_reader(file_name: &str) -> Rc<FileReader> {
+        let file = get_test_file(file_name);
+
+        let reader =
+            SerializedFileReader::new(file).expect("Failed to create serialized reader");
+
+        Rc::new(reader)
+    }
+
+    fn get_test_file(file_name: &str) -> File {
+        let mut path = PathBuf::new();
+        path.push(env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined!"));
+        path.push(file_name);
+
+        File::open(path.as_path()).expect("File not found!")
+    }
+}
diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs
index a2c6031cfb8d..a09c286390a3 100644
--- a/rust/parquet/src/arrow/mod.rs
+++ b/rust/parquet/src/arrow/mod.rs
@@ -19,10 +19,42 @@
 //! in-memory data.
 //!
 //! This mod provides API for converting between arrow and parquet.
+//!
+//! # Example of reading parquet file into arrow record batch
+//!
+//! ```rust, no_run
+//! use arrow::record_batch::RecordBatchReader;
+//! use parquet::file::reader::SerializedFileReader;
+//! use parquet::arrow::{ParquetFileArrowReader, ArrowReader};
+//! use std::rc::Rc;
+//! use std::fs::File;
+//!
+//! let file = File::open("parquet.file").unwrap();
+//! let file_reader = SerializedFileReader::new(file).unwrap();
+//! let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(file_reader));
+//!
+//! println!("Converted arrow schema is: {}", arrow_reader.get_schema().unwrap());
+//! println!("Arrow schema after projection is: {}",
+//!    arrow_reader.get_schema_by_columns(vec![2, 4, 6]).unwrap());
+//!
+//! let mut record_batch_reader = arrow_reader.get_record_reader(2048).unwrap();
+//!
+//! loop {
+//!     let record_batch = record_batch_reader.next_batch().unwrap();
+//!     if record_batch.num_rows() > 0 {
+//!         println!("Read {} records.", record_batch.num_rows());
+//!     } else {
+//!         println!("End of file!");
+//!     }
+//!}
+//! ```
 
 pub(in crate::arrow) mod array_reader;
+pub mod arrow_reader;
 pub(in crate::arrow) mod converter;
 pub(in crate::arrow) mod record_reader;
 pub mod schema;
 
+pub use self::arrow_reader::ArrowReader;
+pub use self::arrow_reader::ParquetFileArrowReader;
 pub use self::schema::{parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns};
diff --git a/rust/parquet/src/errors.rs b/rust/parquet/src/errors.rs
index fe19c1f5ea95..4e0e2fe6e719 100644
--- a/rust/parquet/src/errors.rs
+++ b/rust/parquet/src/errors.rs
@@ -22,6 +22,7 @@ use std::{cell, convert, io, result};
 use arrow::error::ArrowError;
 use quick_error::quick_error;
 use snap;
+use std::error::Error;
 use thrift;
 
 quick_error! {
@@ -97,3 +98,12 @@ macro_rules! eof_err {
     ($fmt:expr) => (ParquetError::EOF($fmt.to_owned()));
     ($fmt:expr, $($args:expr),*) => (ParquetError::EOF(format!($fmt, $($args),*)));
 }
+
+// ----------------------------------------------------------------------
+// Convert parquet error into other errors
+
+impl Into<ArrowError> for ParquetError {
+    fn into(self) -> ArrowError {
+        ArrowError::ParquetError(self.description().to_string())
+    }
+}

From cecdd97e39c2feab9ad5366878358b479b11fe3a Mon Sep 17 00:00:00 2001
From: Renjie Liu
Date: Sat, 12 Oct 2019 20:57:30 +0800
Subject: [PATCH 2/3] Fix test

---
 cpp/submodules/parquet-testing         |  2 +-
 rust/arrow/src/record_batch.rs         |  2 +-
 rust/parquet/Cargo.toml                |  2 +-
 rust/parquet/src/arrow/arrow_reader.rs | 20 ++++++++++++++------
 rust/parquet/src/arrow/mod.rs          |  2 +-
 testing                                |  2 +-
 6 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index 2fc3ade4ccbf..46c9e977f58f 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit 2fc3ade4ccbf17271194df0b1549bc6733204314
+Subproject commit 46c9e977f58f6c5ef1b81f782f3746b3656e5a8c
diff --git a/rust/arrow/src/record_batch.rs b/rust/arrow/src/record_batch.rs
index 0450c0f58028..7d4acf67fb1c 100644
--- a/rust/arrow/src/record_batch.rs
+++ b/rust/arrow/src/record_batch.rs
@@ -134,7 +134,7 @@ pub trait RecordBatchReader {
     fn schema(&mut self) -> SchemaRef;
 
     /// Returns the next record batch.
-    fn next_batch(&mut self) -> Result<RecordBatch>;
+    fn next_batch(&mut self) -> Result<Option<RecordBatch>>;
 }
 
 #[cfg(test)]
diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml
index 4236b7c12cd9..549cc4469ca1 100644
--- a/rust/parquet/Cargo.toml
+++ b/rust/parquet/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "parquet"
-version = "0.15.0-SNAPSHOT"
+version = "1.0.0-SNAPSHOT"
 license = "Apache-2.0"
 description = "Apache Parquet implementation in Rust"
 homepage = "https://github.com/apache/arrow"
diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs
index 0781614de9f8..13061aa53831 100644
--- a/rust/parquet/src/arrow/arrow_reader.rs
+++ b/rust/parquet/src/arrow/arrow_reader.rs
@@ -152,7 +152,7 @@ impl RecordBatchReader for ParquetRecordBatchReader {
         self.schema.clone()
     }
 
-    fn next_batch(&mut self) -> ArrowResult<RecordBatch> {
+    fn next_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
         self.array_reader
             .next_batch(self.batch_size)
             .map_err(|err| err.into())
@@ -171,6 +171,13 @@ impl RecordBatchReader for ParquetRecordBatchReader {
                     )
                 })
             })
+            .map(|record_batch| {
+                if record_batch.num_rows() > 0 {
+                    Some(record_batch)
+                } else {
+                    None
+                }
+            })
     }
 }
 
@@ -233,20 +240,21 @@ mod tests {
             .expect("Failed to read into array!");
 
         for i in 0..20 {
-            let array: StructArray = record_batch_reader
+            let array: Option<StructArray> = record_batch_reader
                 .next_batch()
                 .expect("Failed to read record batch!")
-                .into();
+                .map(|r| r.into());
 
             let (start, end) = (i * 60 as usize, (i + 1) * 60 as usize);
 
             if start < max_len {
-                assert_ne!(0, array.len());
+                assert!(array.is_some());
+                assert_ne!(0, array.as_ref().unwrap().len());
                 let end = min(end, max_len);
                 let json = JArray(Vec::from(&json_values[start..end]));
-                assert_eq!(array, json)
+                assert_eq!(array.unwrap(), json)
             } else {
-                assert_eq!(0, array.len());
+                assert!(array.is_none());
             }
         }
     }
diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs
index a09c286390a3..02f50fd3a904 100644
--- a/rust/parquet/src/arrow/mod.rs
+++ b/rust/parquet/src/arrow/mod.rs
@@ -40,7 +40,7 @@
 //! let mut record_batch_reader = arrow_reader.get_record_reader(2048).unwrap();
 //!
 //! loop {
-//!     let record_batch = record_batch_reader.next_batch().unwrap();
+//!     let record_batch = record_batch_reader.next_batch().unwrap().unwrap();
 //!     if record_batch.num_rows() > 0 {
 //!         println!("Read {} records.", record_batch.num_rows());
 //!     } else {
diff --git a/testing b/testing
index 93f33ae92222..90ae758c55ae 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 93f33ae922226686e623d6b1d0307f48030a8a67
+Subproject commit 90ae758c55aebf40e926ce049a662726b26f485f

From 8b915b399800ade7b9eb710069f462b954439179 Mon Sep 17 00:00:00 2001
From: Renjie Liu
Date: Sun, 13 Oct 2019 16:30:44 +0800
Subject: [PATCH 3/3] Fix format

---
 rust/arrow/src/record_batch.rs         | 10 ++++++----
 rust/parquet/src/arrow/arrow_reader.rs |  3 ++-
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/rust/arrow/src/record_batch.rs b/rust/arrow/src/record_batch.rs
index 7d4acf67fb1c..7ac1d41eea92 100644
--- a/rust/arrow/src/record_batch.rs
+++ b/rust/arrow/src/record_batch.rs
@@ -115,17 +115,19 @@ impl From<&StructArray> for RecordBatch {
 
 impl Into<StructArray> for RecordBatch {
     fn into(self) -> StructArray {
-        self.schema.fields.iter().zip(self.columns.iter())
-            .map(|t| (t.0.clone(), t.1.clone()))
+        self.schema
+            .fields
+            .iter()
+            .zip(self.columns.iter())
+            .map(|t| (t.0.clone(), t.1.clone()))
             .collect::<StructArray>()
-            .into()
+            .into()
     }
 }
 
 unsafe impl Send for RecordBatch {}
 unsafe impl Sync for RecordBatch {}
 
-
 /// Definition of record batch reader.
 pub trait RecordBatchReader {
     /// Returns the schema of this record batch reader.
diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs
index 13061aa53831..4d121957c08b 100644
--- a/rust/parquet/src/arrow/arrow_reader.rs
+++ b/rust/parquet/src/arrow/arrow_reader.rs
@@ -222,7 +222,8 @@ mod tests {
     fn test_arrow_reader() {
         let json_values = match serde_json::from_reader(get_test_file(
             "parquet/generated_simple_numerics/blogs.json",
-        )).expect("Failed to read json value from file!")
+        ))
+        .expect("Failed to read json value from file!")
         {
             JArray(values) => values,
             _ => panic!("Input should be json array!"),
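
A note on consuming the new trait (an illustrative sketch, not part of the patches above): after PATCH 2/3, RecordBatchReader::next_batch returns Ok(None) once the data is exhausted, so a caller can drain a reader with a while-let loop instead of the unwrap().unwrap() chain shown in the module docs. The helper name count_rows below is hypothetical.

use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatchReader;

// Illustrative sketch only: drain any RecordBatchReader and count the rows read,
// relying on the Ok(None) end-of-stream convention introduced in PATCH 2/3.
fn count_rows<R: RecordBatchReader + ?Sized>(reader: &mut R) -> ArrowResult<usize> {
    let mut total = 0;
    // next_batch yields Ok(Some(batch)) until the data is exhausted, then Ok(None);
    // any error is propagated to the caller via `?`.
    while let Some(batch) = reader.next_batch()? {
        total += batch.num_rows();
    }
    Ok(total)
}

// With the boxed reader from the module-level example:
// let total = count_rows(&mut *record_batch_reader)?;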