dev/archery/archery/integration/datagen.py (14 changes: 4 additions & 10 deletions)
@@ -1493,32 +1493,26 @@ def _temp_path():

file_objs = [
generate_primitive_case([], name='primitive_no_batches'),
generate_primitive_case([17, 20], name='primitive')
.skip_category('Rust'),
generate_primitive_case([0, 0, 0], name='primitive_zerolength')
.skip_category('Rust'),
generate_primitive_case([17, 20], name='primitive'),
generate_primitive_case([0, 0, 0], name='primitive_zerolength'),

generate_primitive_large_offsets_case([17, 20])
.skip_category('Go')
.skip_category('JS')
.skip_category('Rust'),
.skip_category('JS'),

generate_null_case([10, 0])
.skip_category('Rust')
.skip_category('JS') # TODO(ARROW-7900)
.skip_category('Go'), # TODO(ARROW-7901)

generate_null_trivial_case([0, 0])
.skip_category('Rust')
.skip_category('JS') # TODO(ARROW-7900)
.skip_category('Go'), # TODO(ARROW-7901)

generate_decimal_case()
.skip_category('Go') # TODO(ARROW-7948): Decimal + Go
.skip_category('Rust'),

generate_datetime_case()
.skip_category('Rust'),
generate_datetime_case(),

generate_interval_case()
.skip_category('JS') # TODO(ARROW-5239): Intervals + JS
rust/arrow-flight/src/utils.rs (15 changes: 9 additions & 6 deletions)
@@ -29,33 +29,36 @@ use arrow::record_batch::RecordBatch;
/// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes
impl From<&RecordBatch> for FlightData {
fn from(batch: &RecordBatch) -> Self {
let (header, body) = writer::record_batch_to_bytes(batch);
let options = writer::IpcWriteOptions::default();
let data = writer::record_batch_to_bytes(batch, &options);
Self {
flight_descriptor: None,
app_metadata: vec![],
data_header: header,
data_body: body,
data_header: data.ipc_message,
data_body: data.arrow_data,
}
}
}

/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
impl From<&Schema> for SchemaResult {
fn from(schema: &Schema) -> Self {
let options = writer::IpcWriteOptions::default();
Self {
schema: writer::schema_to_bytes(schema),
schema: writer::schema_to_bytes(schema, &options).ipc_message,
}
}
}

/// Convert a `Schema` to `FlightData` by converting to an IPC message
impl From<&Schema> for FlightData {
fn from(schema: &Schema) -> Self {
let schema = writer::schema_to_bytes(schema);
let options = writer::IpcWriteOptions::default();
let schema = writer::schema_to_bytes(schema, &options);
Self {
flight_descriptor: None,
app_metadata: vec![],
data_header: schema,
data_header: schema.ipc_message,
data_body: vec![],
}
}
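For reference, a minimal usage sketch of the `From<&Schema> for FlightData` conversion updated above. With this change the writer takes an `IpcWriteOptions` and returns the encoded flatbuffer metadata (`ipc_message`) and Arrow buffers (`arrow_data`) separately; from the caller's side the `From` impls hide all of that. The schema contents and import paths below are assumptions for illustration, not taken from this PR:

```rust
// Hypothetical usage of the conversion shown in this diff: the schema is
// serialized into `data_header` as an IPC message and the body stays empty.
use arrow::datatypes::{DataType, Field, Schema};
use arrow_flight::FlightData;

fn main() {
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    let flight_data = FlightData::from(&schema);
    assert!(flight_data.flight_descriptor.is_none());
    assert!(flight_data.data_body.is_empty());
    assert!(!flight_data.data_header.is_empty());
}
```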
rust/arrow/src/ipc/convert.rs (2 changes: 1 addition & 1 deletion)
@@ -345,7 +345,7 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>(
Null => FBFieldType {
type_type: ipc::Type::Null,
type_: ipc::NullBuilder::new(fbb).finish().as_union_value(),
children: None,
children: Some(fbb.create_vector(&empty_fields[..])),
},
Boolean => FBFieldType {
type_type: ipc::Type::Bool,
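A note on the `Null` change above: the field's children are now serialized as an empty vector instead of being omitted, presumably so that readers in other implementations see a present (if empty) children list on `Null` fields. The `empty_fields` slice is referenced from surrounding code not shown in this hunk; a standalone flatbuffers sketch of the same pattern (illustrative only, not Arrow's generated code):

```rust
// Serializing an empty vector still writes a present, zero-length vector,
// mirroring `children: Some(fbb.create_vector(&empty_fields[..]))` above.
use flatbuffers::{FlatBufferBuilder, WIPOffset};

fn main() {
    let mut fbb = FlatBufferBuilder::new();
    let empty: Vec<WIPOffset<&str>> = vec![];
    // In convert.rs this offset is stored in the Field table's children slot.
    let children = Some(fbb.create_vector(&empty[..]));
    assert!(children.is_some());
}
```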
rust/arrow/src/ipc/mod.rs (1 change: 1 addition & 0 deletions)
@@ -36,3 +36,4 @@ pub use self::gen::SparseTensor::*;
pub use self::gen::Tensor::*;

static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
Contributor Author:
is const better than static here? IIRC I used static for ARROW_MAGIC when const either wasn't a thing yet, or I was unfamiliar with it.

Contributor:
Yea, I think this can be const.

static CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
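Following up on the exchange above: both forms compile for these byte arrays, and `const` is arguably the better fit since the values are compile-time constants that get inlined at each use site, whereas `static` pins a single item in the binary. A minimal sketch of the `const` alternative (illustrative; the PR itself keeps the `static` items shown above):

```rust
// The `const` form discussed in the review thread; behavior at use sites
// (comparisons, slicing) is the same as with `static`.
const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

fn main() {
    assert_eq!(&ARROW_MAGIC[..], &b"ARROW1"[..]);
    assert_eq!(CONTINUATION_MARKER, [0xff; 4]);
}
```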
rust/arrow/src/ipc/reader.rs (77 changes: 51 additions & 26 deletions)
@@ -31,9 +31,9 @@ use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef};
use crate::error::{ArrowError, Result};
use crate::ipc;
use crate::record_batch::{RecordBatch, RecordBatchReader};
use DataType::*;

const CONTINUATION_MARKER: u32 = 0xffff_ffff;
use ipc::CONTINUATION_MARKER;
use DataType::*;

/// Read a buffer based on offset and length
fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
@@ -482,6 +482,9 @@ pub struct FileReader<R: Read + Seek> {
///
/// Dictionaries may be appended to in the streaming format.
dictionaries_by_field: Vec<Option<ArrayRef>>,

/// Metadata version
metadata_version: ipc::MetadataVersion,
}

impl<R: Read + Seek> FileReader<R> {
@@ -506,12 +509,11 @@ impl<R: Read + Seek> FileReader<R> {
"Arrow file does not contain correct footer".to_string(),
));
}

// what does the footer contain?
// read footer length
let mut footer_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::End(-10))?;
reader.read_exact(&mut footer_size)?;
let footer_len = u32::from_le_bytes(footer_size);
let footer_len = i32::from_le_bytes(footer_size);

// read footer
let mut footer_data = vec![0; footer_len as usize];
@@ -534,6 +536,7 @@ impl<R: Read + Seek> FileReader<R> {
let mut dictionaries_by_field = vec![None; schema.fields().len()];
for block in footer.dictionaries().unwrap() {
// read length from end of offset
// TODO: ARROW-9848: dictionary metadata has not been tested
let meta_len = block.metaDataLength() - 4;

let mut block_data = vec![0; meta_len as usize];
@@ -554,15 +557,21 @@ impl<R: Read + Seek> FileReader<R> {
reader.read_exact(&mut buf)?;

if batch.isDelta() {
panic!("delta dictionary batches not supported");
return Err(ArrowError::IoError(
"delta dictionary batches not supported".to_string(),
));
}

let id = batch.id();

// As the dictionary batch does not contain the type of the
// values array, we need to retieve this from the schema.
let first_field = find_dictionary_field(&ipc_schema, id)
.expect("dictionary id not found in shchema");
let first_field =
find_dictionary_field(&ipc_schema, id).ok_or_else(|| {
ArrowError::InvalidArgumentError(
"dictionary id not found in schema".to_string(),
)
})?;

// Get an array representing this dictionary's values.
let dictionary_values: ArrayRef =
@@ -589,7 +598,11 @@ impl<R: Read + Seek> FileReader<R> {
}
_ => None,
}
.expect("dictionary id not found in schema");
.ok_or_else(|| {
ArrowError::InvalidArgumentError(
"dictionary id not found in schema".to_string(),
)
})?;

// for all fields with this dictionary id, update the dictionaries vector
// in the reader. Note that a dictionary batch may be shared between many fields.
@@ -606,7 +619,11 @@ impl<R: Read + Seek> FileReader<R> {
}
}
}
_ => panic!("Expecting DictionaryBatch in dictionary blocks."),
_ => {
return Err(ArrowError::IoError(
"Expecting DictionaryBatch in dictionary blocks.".to_string(),
))
}
};
}

@@ -617,6 +634,7 @@ impl<R: Read + Seek> FileReader<R> {
current_block: 0,
total_blocks,
dictionaries_by_field,
metadata_version: footer.version(),
})
}

@@ -657,16 +675,31 @@ impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
let block = self.blocks[self.current_block];
self.current_block += 1;

// read length from end of offset
let meta_len = block.metaDataLength() - 4;
// read length
self.reader.seek(SeekFrom::Start(block.offset() as u64))?;
let mut meta_buf = [0; 4];
self.reader.read_exact(&mut meta_buf)?;
if meta_buf == CONTINUATION_MARKER {
// continuation marker encountered, read message next
self.reader.read_exact(&mut meta_buf)?;
}
let meta_len = i32::from_le_bytes(meta_buf);

let mut block_data = vec![0; meta_len as usize];
self.reader
.seek(SeekFrom::Start(block.offset() as u64 + 4))?;
self.reader.read_exact(&mut block_data)?;

let message = ipc::get_root_as_message(&block_data[..]);

// some old test data's footer metadata is not set, so we account for that
if self.metadata_version != ipc::MetadataVersion::V1
&& message.version() != self.metadata_version
{
return Err(ArrowError::IoError(
"Could not read IPC message as metadata versions mismatch"
.to_string(),
));
}

match message.header_type() {
ipc::MessageHeader::Schema => Err(ArrowError::IoError(
"Not expecting a schema when messages are read".to_string(),
@@ -733,16 +766,12 @@ impl<R: Read> StreamReader<R> {
let mut meta_size: [u8; 4] = [0; 4];
reader.read_exact(&mut meta_size)?;
let meta_len = {
let meta_len = u32::from_le_bytes(meta_size);

// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if meta_len == CONTINUATION_MARKER {
if meta_size == CONTINUATION_MARKER {
reader.read_exact(&mut meta_size)?;
u32::from_le_bytes(meta_size)
} else {
meta_len
}
i32::from_le_bytes(meta_size)
};

let mut meta_buffer = vec![0; meta_len as usize];
@@ -806,16 +835,12 @@ impl<R: Read> RecordBatchReader for StreamReader<R> {
}

let meta_len = {
let meta_len = u32::from_le_bytes(meta_size);

// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if meta_len == CONTINUATION_MARKER {
if meta_size == CONTINUATION_MARKER {
self.reader.read_exact(&mut meta_size)?;
u32::from_le_bytes(meta_size)
} else {
meta_len
}
i32::from_le_bytes(meta_size)
};

if meta_len == 0 {
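To summarize the framing these reader changes align with: an encapsulated IPC message starts with an optional 4-byte continuation marker (0xFFFFFFFF), followed by a little-endian signed 32-bit metadata length, the flatbuffer metadata, and then the message body. That is why the readers above now compare the raw 4-byte prefix against `CONTINUATION_MARKER` and parse lengths as `i32` rather than `u32`. A self-contained sketch of the length-parsing step (illustrative, not the PR's exact code):

```rust
use std::io::{Cursor, Read, Result};

const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Read the metadata length of an encapsulated IPC message, skipping an
/// optional continuation marker first.
fn read_metadata_len<R: Read>(reader: &mut R) -> Result<i32> {
    let mut meta_size = [0u8; 4];
    reader.read_exact(&mut meta_size)?;
    if meta_size == CONTINUATION_MARKER {
        // Continuation marker encountered; the real length follows.
        reader.read_exact(&mut meta_size)?;
    }
    Ok(i32::from_le_bytes(meta_size))
}

fn main() -> Result<()> {
    // A 0xFFFFFFFF continuation marker followed by a length of 24 bytes.
    let mut stream = Cursor::new(vec![0xff, 0xff, 0xff, 0xff, 24, 0, 0, 0]);
    assert_eq!(read_metadata_len(&mut stream)?, 24);
    Ok(())
}
```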