From ef5a26a7e5263c1541beaece401094b05ac808f7 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Mon, 12 Oct 2020 19:57:16 +0200 Subject: [PATCH] ARROW-10289: [Rust] Read dictionaries in IPC streams --- rust/arrow/src/ipc/reader.rs | 174 +++++++++++++++++++++-------------- 1 file changed, 107 insertions(+), 67 deletions(-) diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index 53c422d481c..e4bb003d0bc 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -445,6 +445,69 @@ pub fn read_record_batch( RecordBatch::try_new(schema, arrays) } +/// Read the dictionary from the buffer and provided metadata, +/// updating the `dictionaries_by_field` with the resulting dictionary +fn read_dictionary( + buf: &[u8], + batch: ipc::DictionaryBatch, + ipc_schema: &ipc::Schema, + schema: &Schema, + dictionaries_by_field: &mut [Option], +) -> Result<()> { + if batch.isDelta() { + return Err(ArrowError::IoError( + "delta dictionary batches not supported".to_string(), + )); + } + + let id = batch.id(); + + // As the dictionary batch does not contain the type of the + // values array, we need to retrieve this from the schema. + let first_field = find_dictionary_field(ipc_schema, id).ok_or_else(|| { + ArrowError::InvalidArgumentError("dictionary id not found in schema".to_string()) + })?; + + // Get an array representing this dictionary's values. + let dictionary_values: ArrayRef = match schema.field(first_field).data_type() { + DataType::Dictionary(_, ref value_type) => { + // Make a fake schema for the dictionary batch. + let schema = Schema { + fields: vec![Field::new("", value_type.as_ref().clone(), false)], + metadata: HashMap::new(), + }; + // Read a single column + let record_batch = read_record_batch( + &buf, + batch.data().unwrap(), + Arc::new(schema), + &dictionaries_by_field, + )?; + Some(record_batch.column(0).clone()) + } + _ => None, + } + .ok_or_else(|| { + ArrowError::InvalidArgumentError("dictionary id not found in schema".to_string()) + })?; + + // for all fields with this dictionary id, update the dictionaries vector + // in the reader. Note that a dictionary batch may be shared between many fields. + // We don't currently record the isOrdered field. This could be general + // attributes of arrays. + let fields = ipc_schema.fields().unwrap(); + for (i, field) in fields.iter().enumerate() { + if let Some(dictionary) = field.dictionary() { + if dictionary.id() == id { + // Add (possibly multiple) array refs to the dictionaries array. + dictionaries_by_field[i] = Some(dictionary_values.clone()); + } + } + } + + Ok(()) +} + // Linear search for the first dictionary field with a dictionary id. fn find_dictionary_field(ipc_schema: &ipc::Schema, id: i64) -> Option { let fields = ipc_schema.fields().unwrap(); @@ -556,67 +619,13 @@ impl FileReader { ))?; reader.read_exact(&mut buf)?; - if batch.isDelta() { - return Err(ArrowError::IoError( - "delta dictionary batches not supported".to_string(), - )); - } - - let id = batch.id(); - - // As the dictionary batch does not contain the type of the - // values array, we need to retieve this from the schema. - let first_field = - find_dictionary_field(&ipc_schema, id).ok_or_else(|| { - ArrowError::InvalidArgumentError( - "dictionary id not found in schema".to_string(), - ) - })?; - - // Get an array representing this dictionary's values. - let dictionary_values: ArrayRef = - match schema.field(first_field).data_type() { - DataType::Dictionary(_, ref value_type) => { - // Make a fake schema for the dictionary batch. - let schema = Schema { - fields: vec![Field::new( - "", - value_type.as_ref().clone(), - false, - )], - metadata: HashMap::new(), - }; - // Read a single column - let record_batch = read_record_batch( - &buf, - batch.data().unwrap(), - Arc::new(schema), - &dictionaries_by_field, - )?; - Some(record_batch.column(0).clone()) - } - _ => None, - } - .ok_or_else(|| { - ArrowError::InvalidArgumentError( - "dictionary id not found in schema".to_string(), - ) - })?; - - // for all fields with this dictionary id, update the dictionaries vector - // in the reader. Note that a dictionary batch may be shared between many fields. - // We don't currently record the isOrdered field. This could be general - // attributes of arrays. - let fields = ipc_schema.fields().unwrap(); - for (i, field) in fields.iter().enumerate() { - if let Some(dictionary) = field.dictionary() { - if dictionary.id() == id { - // Add (possibly multiple) array refs to the dictionaries array. - dictionaries_by_field[i] = - Some(dictionary_values.clone()); - } - } - } + read_dictionary( + &buf, + batch, + &ipc_schema, + &schema, + &mut dictionaries_by_field, + )?; } _ => { return Err(ArrowError::IoError( @@ -747,17 +756,24 @@ impl RecordBatchReader for FileReader { pub struct StreamReader { /// Buffered stream reader reader: BufReader, + /// The schema that is read from the stream's first message schema: SchemaRef, - /// An indicator of whether the strewam is complete. + + /// The bytes of the IPC schema that is read from the stream's first message /// - /// This value is set to `true` the first time the reader's `next()` returns `None`. - finished: bool, + /// This is kept in order to interpret dictionary data + ipc_schema: Vec, /// Optional dictionaries for each schema field. /// /// Dictionaries may be appended to in the streaming format. dictionaries_by_field: Vec>, + + /// An indicator of whether the stream is complete. + /// + /// This value is set to `true` the first time the reader's `next()` returns `None`. + finished: bool, } impl StreamReader { @@ -783,8 +799,7 @@ impl StreamReader { let mut meta_buffer = vec![0; meta_len as usize]; reader.read_exact(&mut meta_buffer)?; - let vecs = &meta_buffer.to_vec(); - let message = ipc::get_root_as_message(vecs); + let message = ipc::get_root_as_message(meta_buffer.as_slice()); // message header is a Schema, so read it let ipc_schema: ipc::Schema = message.header_as_schema().ok_or_else(|| { ArrowError::IoError("Unable to read IPC message as schema".to_string()) @@ -797,6 +812,7 @@ impl StreamReader { Ok(Self { reader, schema: Arc::new(schema), + ipc_schema: meta_buffer, finished: false, dictionaries_by_field, }) @@ -871,6 +887,30 @@ impl StreamReader { read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field).map(Some) } + ipc::MessageHeader::DictionaryBatch => { + let batch = message.header_as_dictionary_batch().ok_or_else(|| { + ArrowError::IoError( + "Unable to read IPC message as dictionary batch".to_string(), + ) + })?; + // read the block that makes up the dictionary batch into a buffer + let mut buf = vec![0; message.bodyLength() as usize]; + self.reader.read_exact(&mut buf)?; + + let ipc_schema = ipc::get_root_as_message(&self.ipc_schema).header_as_schema() + .ok_or_else(|| { + ArrowError::IoError( + "Unable to read schema from stored message header".to_string(), + ) + })?; + + read_dictionary( + &buf, batch, &ipc_schema, &self.schema, &mut self.dictionaries_by_field + )?; + + // read the next message until we encounter a RecordBatch + self.maybe_next() + } ipc::MessageHeader::NONE => { Ok(None) } @@ -940,7 +980,7 @@ mod tests { let paths = vec![ "generated_interval", "generated_datetime", - // "generated_dictionary", + "generated_dictionary", "generated_nested", "generated_primitive_no_batches", "generated_primitive_zerolength",