Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust/arrow/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ pub struct Field {
name: String,
data_type: DataType,
nullable: bool,
dict_id: i64,
dict_is_ordered: bool,
pub(crate) dict_id: i64,
pub(crate) dict_is_ordered: bool,
}

pub trait ArrowNativeType:
Expand Down
105 changes: 81 additions & 24 deletions rust/arrow/src/ipc/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,8 @@ pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {

let mut fields = vec![];
for field in schema.fields() {
let fb_field_name = fbb.create_string(field.name().as_str());
let field_type = get_fb_field_type(field.data_type(), &mut fbb);
let mut field_builder = ipc::FieldBuilder::new(&mut fbb);
field_builder.add_name(fb_field_name);
field_builder.add_type_type(field_type.type_type);
field_builder.add_nullable(field.is_nullable());
match field_type.children {
None => {}
Some(children) => field_builder.add_children(children),
};
field_builder.add_type_(field_type.type_);
fields.push(field_builder.finish());
let fb_field = build_field(&mut fbb, field);
fields.push(fb_field);
}

let mut custom_metadata = vec![];
Expand Down Expand Up @@ -80,18 +70,8 @@ pub fn schema_to_fb_offset<'a: 'b, 'b>(
) -> WIPOffset<ipc::Schema<'b>> {
let mut fields = vec![];
for field in schema.fields() {
let fb_field_name = fbb.create_string(field.name().as_str());
let field_type = get_fb_field_type(field.data_type(), fbb);
let mut field_builder = ipc::FieldBuilder::new(fbb);
field_builder.add_name(fb_field_name);
field_builder.add_type_type(field_type.type_type);
field_builder.add_nullable(field.is_nullable());
match field_type.children {
None => {}
Some(children) => field_builder.add_children(children),
};
field_builder.add_type_(field_type.type_);
fields.push(field_builder.finish());
let fb_field = build_field(fbb, field);
fields.push(fb_field);
}

let mut custom_metadata = vec![];
Expand Down Expand Up @@ -333,6 +313,38 @@ pub(crate) struct FBFieldType<'b> {
pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<ipc::Field<'b>>>>>,
}

/// Create an IPC Field from an Arrow Field.
///
/// Writes the field's name, type information, nullability, optional children
/// and, for `Dictionary` data types, a `DictionaryEncoding` table into `fbb`,
/// and returns the offset of the finished IPC field.
///
/// * `fbb` - the FlatBuffer builder the field is serialized into
/// * `field` - the Arrow field to convert
pub(crate) fn build_field<'a: 'b, 'b>(
    fbb: &mut FlatBufferBuilder<'a>,
    field: &Field,
) -> WIPOffset<ipc::Field<'b>> {
    // The name, type and dictionary encoding are all written through `fbb`,
    // so they are created before `FieldBuilder::new` takes `fbb` below.
    let fb_field_name = fbb.create_string(field.name().as_str());
    let field_type = get_fb_field_type(field.data_type(), fbb);

    // Only dictionary-typed fields carry dictionary encoding metadata; the
    // index type lives there, while `field_type` describes the value type.
    let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
        Some(get_fb_dictionary(
            index_type,
            field.dict_id,
            field.dict_is_ordered,
            fbb,
        ))
    } else {
        None
    };

    let mut field_builder = ipc::FieldBuilder::new(fbb);
    field_builder.add_name(fb_field_name);
    if let Some(dictionary) = fb_dictionary {
        field_builder.add_dictionary(dictionary);
    }
    field_builder.add_type_type(field_type.type_type);
    field_builder.add_nullable(field.is_nullable());
    if let Some(children) = field_type.children {
        field_builder.add_children(children);
    }
    field_builder.add_type_(field_type.type_);
    field_builder.finish()
}

/// Get the IPC type of a data type
pub(crate) fn get_fb_field_type<'a: 'b, 'b>(
data_type: &DataType,
Expand Down Expand Up @@ -609,10 +621,45 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>(
children: Some(fbb.create_vector(&children[..])),
}
}
Dictionary(_, value_type) => {
// In this library, the dictionary "type" is a logical construct. Here we
// pass through to the value type, as we've already captured the index
// type in the DictionaryEncoding metadata in the parent field
get_fb_field_type(value_type, fbb)
}
t => unimplemented!("Type {:?} not supported", t),
}
}

/// Create an IPC dictionary encoding
pub(crate) fn get_fb_dictionary<'a: 'b, 'b>(
index_type: &DataType,
dict_id: i64,
dict_is_ordered: bool,
fbb: &mut FlatBufferBuilder<'a>,
) -> WIPOffset<ipc::DictionaryEncoding<'b>> {
// We assume that the dictionary index type (as an integer) has already been
// validated elsewhere, and can safely assume we are dealing with signed
// integers
let mut index_builder = ipc::IntBuilder::new(fbb);
index_builder.add_is_signed(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although signed indices are preferred, we do support unsigned dictionary indices in the data types, so we might need to populate this based on the index_type

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carols10cents we can address this in future, I'm merging this

match *index_type {
Int8 => index_builder.add_bitWidth(8),
Int16 => index_builder.add_bitWidth(16),
Int32 => index_builder.add_bitWidth(32),
Int64 => index_builder.add_bitWidth(64),
_ => {}
}
let index_builder = index_builder.finish();

let mut builder = ipc::DictionaryEncodingBuilder::new(fbb);
builder.add_id(dict_id);
builder.add_indexType(index_builder);
builder.add_isOrdered(dict_is_ordered);

builder.finish()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -714,6 +761,16 @@ mod tests {
false,
),
Field::new("struct<>", DataType::Struct(vec![]), true),
Field::new_dict(
"dictionary<int32, utf8>",
DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Utf8),
),
true,
123,
true,
),
],
md,
);
Expand Down
20 changes: 10 additions & 10 deletions rust/parquet/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1396,16 +1396,16 @@ mod tests {
// Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
// Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
// Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
// Field::new_dict(
// "c31",
// DataType::Dictionary(
// Box::new(DataType::Int32),
// Box::new(DataType::Utf8),
// ),
// true,
// 123,
// true,
// ),
Field::new_dict(
"c31",
DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Utf8),
),
true,
123,
true,
),
Field::new("c32", DataType::LargeBinary, true),
Field::new("c33", DataType::LargeUtf8, true),
Field::new(
Expand Down