Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions rust/parquet/src/arrow/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ fn write_leaves(
| ArrowDataType::Binary
| ArrowDataType::Utf8
| ArrowDataType::LargeUtf8
| ArrowDataType::Decimal(_, _) => {
| ArrowDataType::Decimal(_, _)
| ArrowDataType::FixedSizeBinary(_) => {
let mut col_writer = get_col_writer(&mut row_group_writer)?;
write_leaf(
&mut col_writer,
Expand Down Expand Up @@ -189,11 +190,14 @@ fn write_leaves(
ArrowDataType::Float16 => Err(ParquetError::ArrowError(
"Float16 arrays not supported".to_string(),
)),
ArrowDataType::FixedSizeList(_, _)
| ArrowDataType::FixedSizeBinary(_)
| ArrowDataType::Union(_) => Err(ParquetError::NYI(
"Attempting to write an Arrow type that is not yet implemented".to_string(),
)),
ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_) => {
Err(ParquetError::NYI(
format!(
"Attempting to write an Arrow type {:?} to parquet that is not yet implemented",
array.data_type()
)
))
}
}
}

Expand Down Expand Up @@ -1224,6 +1228,18 @@ mod tests {
);
}

/// Round-trips a single nullable `FixedSizeBinary` column through the writer.
///
/// The column holds three 4-byte values and one null, so both the value path
/// and the null path are exercised.
#[test]
fn fixed_size_binary_single_column() {
    // Every appended value is exactly 4 bytes long, matching the `4` passed to
    // the builder. NOTE(review): `16` is presumably the initial capacity.
    let mut builder = FixedSizeBinaryBuilder::new(16, 4);
    builder.append_value(b"0123").unwrap();
    builder.append_null().unwrap();
    builder.append_value(b"8910").unwrap();
    builder.append_value(b"1112").unwrap();
    let array = Arc::new(builder.finish());

    // The label must be unique to this test. It was previously copy-pasted as
    // "timestamp_millisecond_single_column", which mislabels this case.
    // NOTE(review): if the helper derives its output file from this label, the
    // old value could also collide with the timestamp test when tests run in
    // parallel -- confirm against `one_column_roundtrip`.
    one_column_roundtrip("fixed_size_binary_single_column", array, true);
}

#[test]
fn string_single_column() {
let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
Expand Down
30 changes: 20 additions & 10 deletions rust/parquet/src/arrow/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ impl LevelInfo {
| DataType::Interval(_)
| DataType::Binary
| DataType::LargeBinary
| DataType::Decimal(_, _) => {
| DataType::Decimal(_, _)
| DataType::FixedSizeBinary(_) => {
// we return a vector of 1 value to represent the primitive
vec![self.calculate_child_levels(
array_offsets,
Expand All @@ -145,7 +146,6 @@ impl LevelInfo {
field.is_nullable(),
)]
}
DataType::FixedSizeBinary(_) => unimplemented!(),
DataType::List(list_field) | DataType::LargeList(list_field) => {
// Calculate the list level
let list_level = self.calculate_child_levels(
Expand Down Expand Up @@ -189,15 +189,15 @@ impl LevelInfo {
| DataType::Utf8
| DataType::LargeUtf8
| DataType::Dictionary(_, _)
| DataType::Decimal(_, _) => {
| DataType::Decimal(_, _)
| DataType::FixedSizeBinary(_) => {
vec![list_level.calculate_child_levels(
child_offsets,
child_mask,
false,
list_field.is_nullable(),
)]
}
DataType::FixedSizeBinary(_) => unimplemented!(),
DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => {
list_level.calculate_array_levels(&child_array, list_field)
}
Expand Down Expand Up @@ -297,9 +297,10 @@ impl LevelInfo {
is_list: bool,
is_nullable: bool,
) -> Self {
let mut definition = vec![];
let mut repetition = vec![];
let mut merged_array_mask = vec![];
let min_len = *(array_offsets.last().unwrap()) as usize;
let mut definition = Vec::with_capacity(min_len);
let mut repetition = Vec::with_capacity(min_len);
let mut merged_array_mask = Vec::with_capacity(min_len);

// determine the total level increment based on data types
let max_definition = match is_list {
Expand Down Expand Up @@ -624,9 +625,18 @@ impl LevelInfo {
let masks = offsets.windows(2).map(|w| w[1] > w[0]).collect();
(offsets, masks)
}
DataType::FixedSizeBinary(_)
| DataType::FixedSizeList(_, _)
| DataType::Union(_) => {
DataType::FixedSizeBinary(value_len) => {
let array_mask = match array.data().null_buffer() {
Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
None => vec![true; array.len()],
};
let value_len = *value_len as i64;
(
(0..=(array.len() as i64)).map(|v| v * value_len).collect(),
array_mask,
)
}
DataType::FixedSizeList(_, _) | DataType::Union(_) => {
unimplemented!("Getting offsets not yet implemented")
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/parquet/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub(in crate::arrow) mod array_reader;
pub mod arrow_reader;
pub mod arrow_writer;
pub(in crate::arrow) mod converter;
pub mod levels;
pub(in crate::arrow) mod levels;
pub(in crate::arrow) mod record_reader;
pub mod schema;

Expand Down