Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,14 @@ impl Field {
.unwrap_or(false)
}

// A blob column is represented by two interchangeable schemas: the loaded
// value view (legacy LargeBinary, or the blob v2 struct) and the unloaded
// descriptor view that projection/planning works with. When two fields are
// both blobs of the same blob version, schema set operations must treat
// them as the same logical column rather than as a type mismatch.
fn is_compatible_blob_projection(&self, other: &Self) -> bool {
    if !self.is_blob() || !other.is_blob() {
        return false;
    }
    // Both are blobs; they only match when they agree on the blob version.
    self.is_blob_v2() == other.is_blob_v2()
}

/// If the field is a blob, update this field with the same name and id
/// but with the data type set to a struct of the blob description fields.
///
Expand Down Expand Up @@ -640,6 +648,10 @@ impl Field {
)));
};

if self.is_compatible_blob_projection(other) {
return Ok(self.clone());
}

match (self.data_type(), other.data_type()) {
(DataType::Boolean, DataType::Boolean) => Ok(self.clone()),
(dt, other_dt)
Expand Down Expand Up @@ -768,6 +780,14 @@ impl Field {
)));
}

if self.is_compatible_blob_projection(other) {
return Ok(if self.id >= 0 {
self.clone()
} else {
other.clone()
});
}

let self_type = self.data_type();
let other_type = other.data_type();

Expand Down Expand Up @@ -1790,4 +1810,25 @@ mod tests {
assert_eq!(field.children.len(), 5);
assert_eq!(field.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type);
}

#[test]
fn project_by_field_accepts_blob_descriptor_projection() {
    // A legacy blob column: LargeBinary tagged with the blob metadata key.
    let blob_meta = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]);
    let loaded: Field = ArrowField::new("blob", DataType::LargeBinary, true)
        .with_metadata(blob_meta)
        .try_into()
        .unwrap();

    // Derive the unloaded (descriptor) view of the same logical column.
    let mut descriptor = loaded.clone();
    descriptor.unloaded_mut();

    // Projecting the loaded view by the descriptor keeps the loaded shape.
    let projected = loaded
        .project_by_field(&descriptor, OnTypeMismatch::Error)
        .unwrap();
    assert_eq!(projected, loaded);

    // And projecting the descriptor by the loaded view keeps the descriptor.
    let descriptor_projected = descriptor
        .project_by_field(&loaded, OnTypeMismatch::Error)
        .unwrap();
    assert_eq!(descriptor_projected, descriptor);
}
}
40 changes: 40 additions & 0 deletions rust/lance/src/dataset/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,46 @@ mod test {
assert_eq!(struct_arr.fields()[4].name(), "blob_uri");
}

#[tokio::test]
async fn test_projection_plan_accepts_unloaded_legacy_blob_schema() {
    // One-column, one-row batch whose column is a legacy blob:
    // LargeBinary carrying the blob metadata key.
    let blob_meta = HashMap::from([(
        lance_arrow::BLOB_META_KEY.to_string(),
        "true".to_string(),
    )]);
    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "blob",
        DataType::LargeBinary,
        true,
    )
    .with_metadata(blob_meta)]));
    let values = LargeBinaryArray::from(vec![Some(b"hello".as_slice())]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();

    // Persist with the legacy storage version so the descriptor path is
    // exercised against the legacy file layout.
    let params = WriteParams {
        data_storage_version: Some(LanceFileVersion::Legacy),
        ..Default::default()
    };
    let reader = RecordBatchIterator::new([Ok(batch)], schema);
    let dataset = Dataset::write(reader, "memory://", Some(params))
        .await
        .unwrap();

    // Request the blob column through its unloaded (descriptor) schema view.
    let mut requested = dataset.schema().project(&["blob"]).unwrap();
    requested.fields[0].unloaded_mut();

    let plan = ProjectionRequest::from_schema(requested)
        .into_projection_plan(Arc::new(dataset))
        .unwrap();

    // The plan output must expose the descriptor struct {position, size}.
    let output_schema = plan.output_schema().unwrap();
    let blob_field = output_schema.field_with_name("blob").unwrap();
    let DataType::Struct(fields) = blob_field.data_type() else {
        panic!("expected blob output schema to be a struct, got {blob_field:?}");
    };
    assert_eq!(fields.len(), 2);
    assert_eq!(fields[0].name(), "position");
    assert_eq!(fields[1].name(), "size");
}

#[rstest]
#[tokio::test]
async fn test_take_rowid_rowaddr_with_projection_enable_stable_row_ids_projection_from_sql(
Expand Down
Loading