Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 107 additions & 67 deletions rust/arrow/src/ipc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,69 @@ pub fn read_record_batch(
RecordBatch::try_new(schema, arrays)
}

/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries_by_field` with the resulting dictionary.
///
/// `buf` holds the body bytes of the dictionary batch message; `batch` is its
/// flatbuffer header. The IPC schema is consulted to find which field(s) use
/// the dictionary id, and the Arrow `schema` supplies the value type.
///
/// # Errors
/// Returns an error for delta dictionary batches (unsupported), for ids not
/// present in the schema, and for malformed batches missing their data header.
fn read_dictionary(
    buf: &[u8],
    batch: ipc::DictionaryBatch,
    ipc_schema: &ipc::Schema,
    schema: &Schema,
    dictionaries_by_field: &mut [Option<ArrayRef>],
) -> Result<()> {
    if batch.isDelta() {
        return Err(ArrowError::IoError(
            "delta dictionary batches not supported".to_string(),
        ));
    }

    let id = batch.id();

    // As the dictionary batch does not contain the type of the
    // values array, we need to retrieve this from the schema.
    let first_field = find_dictionary_field(ipc_schema, id).ok_or_else(|| {
        ArrowError::InvalidArgumentError("dictionary id not found in schema".to_string())
    })?;

    // Get an array representing this dictionary's values.
    let dictionary_values: ArrayRef = match schema.field(first_field).data_type() {
        DataType::Dictionary(_, ref value_type) => {
            // Make a fake schema for the dictionary batch.
            let schema = Schema {
                fields: vec![Field::new("", value_type.as_ref().clone(), false)],
                metadata: HashMap::new(),
            };
            // A malformed message may omit the record batch header; surface
            // this as an error instead of panicking (was `unwrap()`).
            let batch_data = batch.data().ok_or_else(|| {
                ArrowError::IoError(
                    "dictionary batch missing its data header".to_string(),
                )
            })?;
            // Read a single column
            let record_batch = read_record_batch(
                &buf,
                batch_data,
                Arc::new(schema),
                &dictionaries_by_field,
            )?;
            record_batch.column(0).clone()
        }
        // `find_dictionary_field` located the id, so reaching here means the
        // Arrow schema disagrees with the IPC schema's dictionary encoding.
        _ => {
            return Err(ArrowError::InvalidArgumentError(
                "field with dictionary id is not a dictionary data type".to_string(),
            ))
        }
    };

    // for all fields with this dictionary id, update the dictionaries vector
    // in the reader. Note that a dictionary batch may be shared between many fields.
    // We don't currently record the isOrdered field. This could be general
    // attributes of arrays.
    let fields = ipc_schema.fields().unwrap();
    for (i, field) in fields.iter().enumerate() {
        if let Some(dictionary) = field.dictionary() {
            if dictionary.id() == id {
                // Add (possibly multiple) array refs to the dictionaries array.
                dictionaries_by_field[i] = Some(dictionary_values.clone());
            }
        }
    }

    Ok(())
}

// Linear search for the first dictionary field with a dictionary id.
fn find_dictionary_field(ipc_schema: &ipc::Schema, id: i64) -> Option<usize> {
let fields = ipc_schema.fields().unwrap();
Expand Down Expand Up @@ -556,67 +619,13 @@ impl<R: Read + Seek> FileReader<R> {
))?;
reader.read_exact(&mut buf)?;

if batch.isDelta() {
return Err(ArrowError::IoError(
"delta dictionary batches not supported".to_string(),
));
}

let id = batch.id();

// As the dictionary batch does not contain the type of the
// values array, we need to retrieve this from the schema.
let first_field =
find_dictionary_field(&ipc_schema, id).ok_or_else(|| {
ArrowError::InvalidArgumentError(
"dictionary id not found in schema".to_string(),
)
})?;

// Get an array representing this dictionary's values.
let dictionary_values: ArrayRef =
match schema.field(first_field).data_type() {
DataType::Dictionary(_, ref value_type) => {
// Make a fake schema for the dictionary batch.
let schema = Schema {
fields: vec![Field::new(
"",
value_type.as_ref().clone(),
false,
)],
metadata: HashMap::new(),
};
// Read a single column
let record_batch = read_record_batch(
&buf,
batch.data().unwrap(),
Arc::new(schema),
&dictionaries_by_field,
)?;
Some(record_batch.column(0).clone())
}
_ => None,
}
.ok_or_else(|| {
ArrowError::InvalidArgumentError(
"dictionary id not found in schema".to_string(),
)
})?;

// for all fields with this dictionary id, update the dictionaries vector
// in the reader. Note that a dictionary batch may be shared between many fields.
// We don't currently record the isOrdered field. This could be general
// attributes of arrays.
let fields = ipc_schema.fields().unwrap();
for (i, field) in fields.iter().enumerate() {
if let Some(dictionary) = field.dictionary() {
if dictionary.id() == id {
// Add (possibly multiple) array refs to the dictionaries array.
dictionaries_by_field[i] =
Some(dictionary_values.clone());
}
}
}
read_dictionary(
&buf,
batch,
&ipc_schema,
&schema,
&mut dictionaries_by_field,
)?;
}
_ => {
return Err(ArrowError::IoError(
Expand Down Expand Up @@ -747,17 +756,24 @@ impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
pub struct StreamReader<R: Read> {
/// Buffered stream reader
reader: BufReader<R>,

/// The schema that is read from the stream's first message
schema: SchemaRef,
/// An indicator of whether the stream is complete.

/// The bytes of the IPC schema that is read from the stream's first message
///
/// This value is set to `true` the first time the reader's `next()` returns `None`.
finished: bool,
/// This is kept in order to interpret dictionary data
ipc_schema: Vec<u8>,

/// Optional dictionaries for each schema field.
///
/// Dictionaries may be appended to in the streaming format.
dictionaries_by_field: Vec<Option<ArrayRef>>,

/// An indicator of whether the stream is complete.
///
/// This value is set to `true` the first time the reader's `next()` returns `None`.
finished: bool,
}

impl<R: Read> StreamReader<R> {
Expand All @@ -783,8 +799,7 @@ impl<R: Read> StreamReader<R> {
let mut meta_buffer = vec![0; meta_len as usize];
reader.read_exact(&mut meta_buffer)?;

let vecs = &meta_buffer.to_vec();
let message = ipc::get_root_as_message(vecs);
let message = ipc::get_root_as_message(meta_buffer.as_slice());
// message header is a Schema, so read it
let ipc_schema: ipc::Schema = message.header_as_schema().ok_or_else(|| {
ArrowError::IoError("Unable to read IPC message as schema".to_string())
Expand All @@ -797,6 +812,7 @@ impl<R: Read> StreamReader<R> {
Ok(Self {
reader,
schema: Arc::new(schema),
ipc_schema: meta_buffer,
finished: false,
dictionaries_by_field,
})
Expand Down Expand Up @@ -871,6 +887,30 @@ impl<R: Read> StreamReader<R> {

read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field).map(Some)
}
ipc::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().ok_or_else(|| {
ArrowError::IoError(
"Unable to read IPC message as dictionary batch".to_string(),
)
})?;
// read the block that makes up the dictionary batch into a buffer
let mut buf = vec![0; message.bodyLength() as usize];
self.reader.read_exact(&mut buf)?;

let ipc_schema = ipc::get_root_as_message(&self.ipc_schema).header_as_schema()
.ok_or_else(|| {
ArrowError::IoError(
"Unable to read schema from stored message header".to_string(),
)
})?;

read_dictionary(
&buf, batch, &ipc_schema, &self.schema, &mut self.dictionaries_by_field
)?;

// read the next message until we encounter a RecordBatch
self.maybe_next()
}
ipc::MessageHeader::NONE => {
Ok(None)
}
Expand Down Expand Up @@ -940,7 +980,7 @@ mod tests {
let paths = vec![
"generated_interval",
"generated_datetime",
// "generated_dictionary",
"generated_dictionary",
"generated_nested",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
Expand Down