diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 3c7e51f0cc0..d768f27f30d 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -239,7 +239,6 @@ use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy}; use crate::data::DataBlock; use crate::encoder::EncodedBatch; use crate::encodings::logical::list::StructuralListScheduler; -use crate::encodings::logical::map::StructuralMapScheduler; use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler; use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler}; use crate::format::pb::{self, column_encoding}; @@ -774,7 +773,7 @@ impl CoreFieldDecoderStrategy { Ok(Box::new(StructuralListScheduler::new(child_scheduler)) as Box) } - DataType::Map(_, keys_sorted) => { + DataType::Map(entries_field, keys_sorted) => { // TODO: We only support keys_sorted=false for now, // because converting a rust arrow map field to the python arrow field will // lose the keys_sorted property. @@ -784,14 +783,14 @@ impl CoreFieldDecoderStrategy { location: location!(), }); } - let entries_child = field - .children - .first() - .expect("Map field must have an entries child"); - let child_scheduler = - self.create_structural_field_scheduler(entries_child, column_infos)?; - Ok(Box::new(StructuralMapScheduler::new(child_scheduler)) - as Box) + + let list_field = Field::try_from(ArrowField::new( + field.name.clone(), + DataType::List(entries_field.clone()), + field.nullable, + ))?; + + self.create_structural_field_scheduler(&list_field, column_infos) } _ => todo!("create_structural_field_scheduler for {}", data_type), } diff --git a/rust/lance-encoding/src/encodings/logical/map.rs b/rust/lance-encoding/src/encodings/logical/map.rs index 4205a01a892..c73efe4c009 100644 --- a/rust/lance-encoding/src/encodings/logical/map.rs +++ b/rust/lance-encoding/src/encodings/logical/map.rs @@ -1,40 +1,32 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{ops::Range, sync::Arc}; +use std::sync::Arc; -use arrow_array::{Array, ArrayRef, MapArray}; +use arrow_array::{Array, ArrayRef, ListArray, MapArray}; use arrow_schema::DataType; use futures::future::BoxFuture; -use lance_arrow::deepcopy::deep_copy_nulls; use lance_core::{Error, Result}; use snafu::location; use crate::{ - decoder::{ - DecodedArray, FilterExpression, ScheduledScanLine, SchedulerContext, - StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler, - StructuralSchedulingJob, - }, + decoder::{DecodedArray, StructuralDecodeArrayTask, StructuralFieldDecoder}, encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers}, + encodings::logical::list::ListStructuralEncoder, repdef::RepDefBuilder, }; - /// A structural encoder for map fields /// /// Map in Arrow is represented as List> -/// The map's offsets are added to the rep/def builder -/// and the map's entries (struct array) are passed to the child encoder +/// This encoder uses the [`ListStructuralEncoder`] to encode the data pub struct MapStructuralEncoder { - keep_original_array: bool, - child: Box, + list_encoder: ListStructuralEncoder, } impl MapStructuralEncoder { pub fn new(keep_original_array: bool, child: Box) -> Self { Self { - keep_original_array, - child, + list_encoder: ListStructuralEncoder::new(keep_original_array, child), } } } @@ -44,7 +36,7 @@ impl FieldEncoder for MapStructuralEncoder { &mut self, array: ArrayRef, external_buffers: &mut OutOfLineBuffers, - mut repdef: RepDefBuilder, + repdef: RepDefBuilder, row_number: u64, num_rows: u64, ) -> Result> { @@ -53,115 +45,63 @@ impl FieldEncoder for MapStructuralEncoder { .downcast_ref::() .expect("MapEncoder used for non-map data"); - // Map internally has offsets and entries (struct array) - let entries = map_array.entries(); - let offsets = map_array.offsets(); + let (entries_field, offsets, entries, validity, _keys_sorted) = + map_array.clone().into_parts(); - // Add offsets to RepDefBuilder to handle nullability and list structure - if self.keep_original_array { - repdef.add_offsets(offsets.clone(), array.nulls().cloned()) - } else { - repdef.add_offsets(offsets.clone(), deep_copy_nulls(array.nulls())) - }; + let list_array = ListArray::new(entries_field, offsets, Arc::new(entries), validity); - // Pass the entries (struct array) to the child encoder - // Convert to Arc - let entries_arc: ArrayRef = Arc::new(entries.clone()); - self.child - .maybe_encode(entries_arc, external_buffers, repdef, row_number, num_rows) + self.list_encoder.maybe_encode( + Arc::new(list_array), + external_buffers, + repdef, + row_number, + num_rows, + ) } fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result> { - self.child.flush(external_buffers) + self.list_encoder.flush(external_buffers) } fn num_columns(&self) -> u32 { - self.child.num_columns() + self.list_encoder.num_columns() } fn finish( &mut self, external_buffers: &mut OutOfLineBuffers, ) -> BoxFuture<'_, Result>> { - self.child.finish(external_buffers) - } -} - -#[derive(Debug)] -pub struct StructuralMapScheduler { - child: Box, -} - -impl StructuralMapScheduler { - pub fn new(child: Box) -> Self { - Self { child } + self.list_encoder.finish(external_buffers) } } -impl StructuralFieldScheduler for StructuralMapScheduler { - fn schedule_ranges<'a>( - &'a self, - ranges: &[Range], - filter: &FilterExpression, - ) -> Result> { - let child = self.child.schedule_ranges(ranges, filter)?; - - Ok(Box::new(StructuralMapSchedulingJob::new(child))) - } - - fn initialize<'a>( - &'a mut self, - filter: &'a FilterExpression, - context: &'a SchedulerContext, - ) -> BoxFuture<'a, Result<()>> { - self.child.initialize(filter, context) - } -} - -/// Scheduling job for map data +/// A structural decoder for map fields /// -/// Scheduling is handled by the child encoder (struct) and nothing special -/// happens here, similar to list. -#[derive(Debug)] -struct StructuralMapSchedulingJob<'a> { - child: Box, -} - -impl<'a> StructuralMapSchedulingJob<'a> { - fn new(child: Box) -> Self { - Self { child } - } -} - -impl StructuralSchedulingJob for StructuralMapSchedulingJob<'_> { - fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result> { - self.child.schedule_next(context) - } -} - +/// This decoder uses the [`StructuralListDecoder`] to decode the data as a list. +/// It then simply casts the list array to a map array. #[derive(Debug)] pub struct StructuralMapDecoder { - child: Box, + list_decoder: Box, data_type: DataType, } impl StructuralMapDecoder { - pub fn new(child: Box, data_type: DataType) -> Self { - Self { child, data_type } + pub fn new(list_decoder: Box, data_type: DataType) -> Self { + Self { + list_decoder, + data_type, + } } } impl StructuralFieldDecoder for StructuralMapDecoder { - fn accept_page(&mut self, child: crate::decoder::LoadedPageShard) -> Result<()> { - self.child.accept_page(child) + fn accept_page(&mut self, list_decoder: crate::decoder::LoadedPageShard) -> Result<()> { + self.list_decoder.accept_page(list_decoder) } fn drain(&mut self, num_rows: u64) -> Result> { - let child_task = self.child.drain(num_rows)?; - Ok(Box::new(StructuralMapDecodeTask::new( - child_task, - self.data_type.clone(), - ))) + let child_task = self.list_decoder.drain(num_rows)?; + Ok(Box::new(StructuralMapDecodeTask::new(child_task))) } fn data_type(&self) -> &DataType { @@ -171,49 +111,45 @@ impl StructuralFieldDecoder for StructuralMapDecoder { #[derive(Debug)] struct StructuralMapDecodeTask { - child_task: Box, - data_type: DataType, + list_decode_task: Box, } impl StructuralMapDecodeTask { - fn new(child_task: Box, data_type: DataType) -> Self { - Self { - child_task, - data_type, - } + fn new(list_decode_task: Box) -> Self { + Self { list_decode_task } } } impl StructuralDecodeArrayTask for StructuralMapDecodeTask { fn decode(self: Box) -> Result { - let DecodedArray { array, mut repdef } = self.child_task.decode()?; - - // Decode the offsets from RepDef - let (offsets, validity) = repdef.unravel_offsets::()?; + let DecodedArray { array, repdef } = self.list_decode_task.decode()?; + + let list_array = + array + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::Schema { + message: format!( + "Expected list array from map's inner decoder, got: {:?}", + array.data_type() + ), + location: location!(), + })?; // Extract the entries field and keys_sorted from the map data type - let (entries_field, keys_sorted) = match &self.data_type { - DataType::Map(field, keys_sorted) => { - if *keys_sorted { - return Err(Error::NotSupported { - source: "Map type decoder does not support keys_sorted=true now" - .to_string() - .into(), - location: location!(), - }); - } - (field.clone(), *keys_sorted) - } + let entries_field = match list_array.data_type() { + DataType::List(field) => field.clone(), _ => { return Err(Error::Schema { - message: "Map decoder did not have a map field".to_string(), + message: "List array did not have list data type".to_string(), location: location!(), }); } }; // Convert the decoded array to StructArray - let entries = array + let entries = list_array + .values() .as_any() .downcast_ref::() .ok_or_else(|| Error::Schema { @@ -223,7 +159,13 @@ impl StructuralDecodeArrayTask for StructuralMapDecodeTask { .clone(); // Build the MapArray from offsets, entries, validity, and keys_sorted - let map_array = MapArray::new(entries_field, offsets, entries, validity, keys_sorted); + let map_array = MapArray::new( + entries_field, + list_array.offsets().clone(), + entries, + list_array.nulls().cloned(), + false, // keys_sorted is always false at the moment + ); Ok(DecodedArray { array: Arc::new(map_array), diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index 0a53ec9a21c..73f2ae8992d 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -284,9 +284,14 @@ impl StructuralStructDecoder { location: location!(), }); } - let child_decoder = Self::field_to_decoder(entries_field, should_validate)?; + let list_field = Arc::new(arrow_schema::Field::new( + field.name().clone(), + DataType::List(entries_field.clone()), + field.is_nullable(), + )); + let list_decoder = Self::field_to_decoder(&list_field, should_validate)?; Ok(Box::new(StructuralMapDecoder::new( - child_decoder, + list_decoder, field.data_type().clone(), ))) }