Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rust/arrow/src/array/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,11 @@ impl StructArray {
self.boxed_fields.iter().collect()
}

/// Returns an owned vector holding a clone of every child array reference
/// stored in this struct array's `boxed_fields`.
pub fn columns_ref(&self) -> Vec<ArrayRef> {
    self.boxed_fields.to_vec()
}

/// Return field names in this struct array
pub fn column_names(&self) -> Vec<&str> {
match self.data.data_type() {
Expand Down
3 changes: 3 additions & 0 deletions rust/arrow/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use serde_derive::{Deserialize, Serialize};
use serde_json::{json, Number, Value, Value::Number as VNumber};

use crate::error::{ArrowError, Result};
use std::sync::Arc;

/// The possible relative types that are supported.
///
Expand Down Expand Up @@ -869,6 +870,8 @@ impl fmt::Display for Schema {
}
}

/// A reference-counted (`Arc`) handle to a [`Schema`].
pub type SchemaRef = Arc<Schema>;

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions rust/arrow/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub enum ArrowError {
JsonError(String),
IoError(String),
InvalidArgumentError(String),
ParquetError(String),
}

impl From<::std::io::Error> for ArrowError {
Expand Down
23 changes: 23 additions & 0 deletions rust/arrow/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,32 @@ impl From<&StructArray> for RecordBatch {
}
}

/// Converts a [`RecordBatch`] into a [`StructArray`].
///
/// Each field of the batch's schema is paired, in order, with the column at the
/// same position, and the resulting `(Field, ArrayRef)` pairs are handed to the
/// existing `Vec<(Field, ArrayRef)>` conversion of `StructArray`.
///
/// This is implemented as `From` rather than `Into` so that it mirrors the
/// `From<&StructArray> for RecordBatch` conversion in this module; the standard
/// library's blanket impl still provides `Into<StructArray> for RecordBatch`, so
/// existing `batch.into()` call sites keep working.
impl From<RecordBatch> for StructArray {
    fn from(batch: RecordBatch) -> Self {
        batch
            .schema
            .fields
            .iter()
            .zip(batch.columns.iter())
            // Both sides are cloned because the iterators only borrow from `batch`.
            .map(|(field, column)| (field.clone(), column.clone()))
            .collect::<Vec<(Field, ArrayRef)>>()
            .into()
    }
}

// NOTE(review): these manual impls promise the compiler that `RecordBatch` can be
// sent to and shared between threads. The invariant that makes this sound is not
// stated here — TODO add a `SAFETY:` justification, or confirm the impls are needed.
unsafe impl Send for RecordBatch {}
unsafe impl Sync for RecordBatch {}

/// Definition of record batch reader.
pub trait RecordBatchReader {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a very similar trait in DataFusion, only it inherits Sync + Send too so that it can be passed between threads. Maybe we could do that here too? See https://github.com/apache/arrow/blob/master/rust/datafusion/src/execution/physical_plan/mod.rs#L44-L50

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's safe to make this trait Send + Sync, because the reader uses some unsafe data (e.g. MutableBuffer).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, makes sense.

/// Returns the schema of this record batch reader.
///
/// Implementations of this trait should guarantee that every record batch
/// returned by this reader has the same schema as the one returned from this method.
fn schema(&mut self) -> SchemaRef;

/// Returns the next record batch.
///
/// NOTE(review): presumably `Ok(None)` signals that the reader is exhausted and
/// `Err` reports a read failure — confirm against the implementations.
fn next_batch(&mut self) -> Result<Option<RecordBatch>>;
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions rust/parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ zstd = "0.4"
chrono = "0.4"
num-bigint = "0.2"
arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT" }
serde_json = { version = "1.0.13", features = ["preserve_order"] }

[dev-dependencies]
lazy_static = "1"
Expand Down
18 changes: 17 additions & 1 deletion rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr,
};
use crate::schema::visitor::TypeVisitor;
use std::any::Any;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
/// Returns this reader as a `&dyn Any`, which lets callers downcast it to a
/// concrete reader type.
fn as_any(&self) -> &dyn Any;

/// Returns the Arrow data type of this array reader.
fn get_data_type(&self) -> &ArrowType;

Expand Down Expand Up @@ -115,6 +118,10 @@ impl<T: DataType> PrimitiveArrayReader<T> {

/// Implementation of primitive array reader.
impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
/// Returns this primitive array reader as a `&dyn Any`.
fn as_any(&self) -> &dyn Any {
self
}

/// Returns data type of primitive array.
fn get_data_type(&self) -> &ArrowType {
&self.data_type
Expand Down Expand Up @@ -232,7 +239,7 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
}

/// Implementation of struct array reader.
struct StructArrayReader {
pub struct StructArrayReader {
children: Vec<Box<dyn ArrayReader>>,
data_type: ArrowType,
struct_def_level: i16,
Expand Down Expand Up @@ -261,6 +268,10 @@ impl StructArrayReader {
}

impl ArrayReader for StructArrayReader {
/// Returns this struct array reader as a `&dyn Any`.
fn as_any(&self) -> &dyn Any {
self
}

/// Returns data type.
/// This must be a struct.
fn get_data_type(&self) -> &ArrowType {
Expand Down Expand Up @@ -705,6 +716,7 @@ mod tests {
use arrow::array::{Array, ArrayRef, PrimitiveArray, StructArray};
use arrow::datatypes::{DataType as ArrowType, Field, Int32Type as ArrowInt32};
use rand::distributions::range::SampleRange;
use std::any::Any;
use std::collections::VecDeque;
use std::rc::Rc;
use std::sync::Arc;
Expand Down Expand Up @@ -953,6 +965,10 @@ mod tests {
}

impl ArrayReader for InMemoryArrayReader {
fn as_any(&self) -> &Any {
self
}

/// Returns a reference to the `data_type` stored in this in-memory reader.
fn get_data_type(&self) -> &ArrowType {
&self.data_type
}
Expand Down
Loading