Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,11 @@ def test_scan_blob(tmp_path, dataset_with_blobs):

def test_blob_extension_write_inline(tmp_path):
table = pa.table({"blob": lance.blob_array([b"foo", b"bar"])})
ds = lance.write_dataset(table, tmp_path / "test_ds_v2", data_storage_version="2.2")
ds = lance.write_dataset(
table,
tmp_path / "test_ds_v2",
data_storage_version="2.2",
)

desc = ds.to_table(columns=["blob"]).column("blob").chunk(0)
assert pa.types.is_struct(desc.type)
Expand All @@ -319,7 +323,9 @@ def test_blob_extension_write_external(tmp_path):

table = pa.table({"blob": lance.blob_array([uri])})
ds = lance.write_dataset(
table, tmp_path / "test_ds_v2_external", data_storage_version="2.2"
table,
tmp_path / "test_ds_v2_external",
data_storage_version="2.2",
)

blob = ds.take_blobs("blob", indices=[0])[0]
Expand Down
77 changes: 26 additions & 51 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,6 @@ pub enum BlobVersion {
/// Blob v2 struct format.
V2,
}

impl BlobVersion {
/// Convert a persisted string value (e.g. table config) into a blob version
pub fn from_config_value(value: &str) -> Option<Self> {
match value {
"1" => Some(Self::V1),
"2" => Some(Self::V2),
_ => None,
}
}

/// Persistable string representation for table config.
pub fn config_value(self) -> &'static str {
match self {
Self::V1 => "1",
Self::V2 => "2",
}
}
}
/// Encoding enum.
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub enum Encoding {
Expand Down Expand Up @@ -302,11 +283,7 @@ impl Field {
} else {
let mut new_field = self.clone();
new_field.children = children;
Some(
projection
.blob_handling
.unload_if_needed(new_field, projection.blob_version),
)
Some(projection.blob_handling.unload_if_needed(new_field))
}
}

Expand Down Expand Up @@ -562,28 +539,6 @@ impl Field {
}
}

/// If the field is a blob, return a new field with the same name and id
/// but with the data type set to a struct of the blob description fields.
///
/// If the field is not a blob, return the field itself.
pub fn into_unloaded_with_version(mut self, version: BlobVersion) -> Self {
if self.is_blob() {
match version {
BlobVersion::V2 => {
self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone();
self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone();
self.metadata = BLOB_V2_DESC_LANCE_FIELD.metadata.clone();
}
BlobVersion::V1 => {
self.logical_type = BLOB_DESC_LANCE_FIELD.logical_type.clone();
self.children = BLOB_DESC_LANCE_FIELD.children.clone();
self.metadata = BLOB_DESC_LANCE_FIELD.metadata.clone();
}
}
}
self
}

pub fn project(&self, path_components: &[&str]) -> Result<Self> {
let mut f = Self {
name: self.name.clone(),
Expand Down Expand Up @@ -1806,14 +1761,34 @@ mod tests {
}

#[test]
fn blob_into_unloaded_selects_v2_layout() {
fn blob_unloaded_mut_selects_layout_from_metadata() {
let metadata = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]);
let field: Field = ArrowField::new("blob", DataType::LargeBinary, true)
let mut field: Field = ArrowField::new("blob", DataType::LargeBinary, true)
.with_metadata(metadata)
.try_into()
.unwrap();
let unloaded = field.into_unloaded_with_version(BlobVersion::V2);
assert_eq!(unloaded.children.len(), 5);
assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type);
field.unloaded_mut();
assert_eq!(field.children.len(), 2);
assert_eq!(field.logical_type, BLOB_DESC_LANCE_FIELD.logical_type);

let metadata =
HashMap::from([(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string())]);
let mut field: Field = ArrowField::new(
"blob",
DataType::Struct(
vec![
ArrowField::new("data", DataType::LargeBinary, true),
ArrowField::new("uri", DataType::Utf8, true),
]
.into(),
),
true,
)
.with_metadata(metadata)
.try_into()
.unwrap();
field.unloaded_mut();
Comment thread
Xuanwo marked this conversation as resolved.
assert_eq!(field.children.len(), 5);
assert_eq!(field.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type);
}
}
30 changes: 4 additions & 26 deletions rust/lance-core/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use deepsize::DeepSizeOf;
use lance_arrow::*;
use snafu::location;

use super::field::{BlobVersion, Field, OnTypeMismatch, SchemaCompareOptions};
use super::field::{Field, OnTypeMismatch, SchemaCompareOptions};
use crate::{
Error, Result, ROW_ADDR, ROW_ADDR_FIELD, ROW_CREATED_AT_VERSION, ROW_CREATED_AT_VERSION_FIELD,
ROW_ID, ROW_ID_FIELD, ROW_LAST_UPDATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION_FIELD,
Expand Down Expand Up @@ -1061,12 +1061,11 @@ impl BlobHandling {
}
}

pub fn unload_if_needed(&self, field: Field, version: BlobVersion) -> Field {
pub fn unload_if_needed(&self, mut field: Field) -> Field {
if self.should_unload(&field) {
field.into_unloaded_with_version(version)
} else {
field
field.unloaded_mut();
}
field
}
}

Expand All @@ -1083,7 +1082,6 @@ pub struct Projection {
pub with_row_last_updated_at_version: bool,
pub with_row_created_at_version: bool,
pub blob_handling: BlobHandling,
pub blob_version: BlobVersion,
}

impl Debug for Projection {
Expand All @@ -1101,7 +1099,6 @@ impl Debug for Projection {
&self.with_row_created_at_version,
)
.field("blob_handling", &self.blob_handling)
.field("blob_version", &self.blob_version)
.finish()
}
}
Expand All @@ -1117,7 +1114,6 @@ impl Projection {
with_row_last_updated_at_version: false,
with_row_created_at_version: false,
blob_handling: BlobHandling::default(),
blob_version: BlobVersion::V1,
}
}

Expand Down Expand Up @@ -1151,11 +1147,6 @@ impl Projection {
self
}

pub fn with_blob_version(mut self, blob_version: BlobVersion) -> Self {
self.blob_version = blob_version;
self
}

fn add_field_children(field_ids: &mut HashSet<i32>, field: &Field) {
for child in &field.children {
field_ids.insert(child.id);
Expand Down Expand Up @@ -1620,19 +1611,6 @@ mod tests {

use super::*;

#[test]
fn projection_from_schema_defaults_to_v1() {
let field = Field::try_from(&ArrowField::new("a", ArrowDataType::Int32, true)).unwrap();
let schema = Schema {
fields: vec![field],
metadata: HashMap::new(),
};

let projection = Projection::empty(Arc::new(schema));

assert_eq!(projection.blob_version, BlobVersion::V1);
}

#[test]
fn test_resolve_with_quoted_fields() {
// Create a schema with fields containing dots
Expand Down
36 changes: 10 additions & 26 deletions rust/lance-datafusion/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
use tracing::instrument;

use lance_core::{
datatypes::{BlobVersion, OnMissing, Projectable, Projection, Schema},
datatypes::{OnMissing, Projectable, Projection, Schema},
Error, Result, ROW_ADDR, ROW_CREATED_AT_VERSION, ROW_ID, ROW_LAST_UPDATED_AT_VERSION,
ROW_OFFSET, WILDCARD,
};
Expand All @@ -38,16 +38,11 @@ struct ProjectionBuilder {
needs_row_created_at: bool,
must_add_row_offset: bool,
has_wildcard: bool,
blob_version: BlobVersion,
}

impl ProjectionBuilder {
fn new(base: Arc<dyn Projectable>, blob_version: BlobVersion) -> Self {
let full_schema = Arc::new(
Projection::full(base.clone())
.with_blob_version(blob_version)
.to_arrow_schema(),
);
fn new(base: Arc<dyn Projectable>) -> Self {
let full_schema = Arc::new(Projection::full(base.clone()).to_arrow_schema());
let full_schema = Arc::new(ProjectionPlan::add_system_columns(&full_schema));
let planner = Planner::new(full_schema);

Expand All @@ -64,7 +59,6 @@ impl ProjectionBuilder {
needs_row_last_updated_at: false,
must_add_row_offset: false,
has_wildcard: false,
blob_version,
}
}

Expand Down Expand Up @@ -153,8 +147,6 @@ impl ProjectionBuilder {
.union_columns(&self.physical_cols, OnMissing::Ignore)?
};

physical_projection = physical_projection.with_blob_version(self.blob_version);

physical_projection.with_row_id = self.needs_row_id;
physical_projection.with_row_addr = self.needs_row_addr || self.must_add_row_offset;
physical_projection.with_row_last_updated_at_version = self.needs_row_last_updated_at;
Expand Down Expand Up @@ -211,9 +203,8 @@ impl ProjectionPlan {
pub fn from_expressions(
base: Arc<dyn Projectable>,
columns: &[(impl AsRef<str>, impl AsRef<str>)],
blob_version: BlobVersion,
) -> Result<Self> {
let mut builder = ProjectionBuilder::new(base, blob_version);
let mut builder = ProjectionBuilder::new(base);
builder.add_columns(columns)?;
builder.build()
}
Expand Down Expand Up @@ -252,11 +243,7 @@ impl ProjectionPlan {
/// ```
///
/// This is something that cannot be done easily using expressions.
pub fn from_schema(
base: Arc<dyn Projectable>,
projection: &Schema,
blob_version: BlobVersion,
) -> Result<Self> {
pub fn from_schema(base: Arc<dyn Projectable>, projection: &Schema) -> Result<Self> {
// Separate data columns from system columns
// System columns (_rowid, _rowaddr, etc.) are handled via flags in Projection,
// not as fields in the Schema
Expand Down Expand Up @@ -302,9 +289,7 @@ impl ProjectionPlan {
};

// Calculate the physical projection from data columns only
let mut physical_projection = Projection::empty(base)
.union_schema(&data_schema)
.with_blob_version(blob_version);
let mut physical_projection = Projection::empty(base).union_schema(&data_schema);
physical_projection.with_row_id = with_row_id;
physical_projection.with_row_addr = with_row_addr;
physical_projection.with_row_last_updated_at_version = with_row_last_updated_at_version;
Expand All @@ -327,17 +312,16 @@ impl ProjectionPlan {
})
}

pub fn full(base: Arc<dyn Projectable>, blob_version: BlobVersion) -> Result<Self> {
pub fn full(base: Arc<dyn Projectable>) -> Result<Self> {
let physical_cols: Vec<&str> = base
.schema()
.fields
.iter()
.map(|f| f.name.as_ref())
.collect::<Vec<_>>();

let physical_projection = Projection::empty(base.clone())
.union_columns(&physical_cols, OnMissing::Ignore)?
.with_blob_version(blob_version);
let physical_projection =
Projection::empty(base.clone()).union_columns(&physical_cols, OnMissing::Ignore)?;

let requested_output_expr = physical_cols
.into_iter()
Expand Down Expand Up @@ -486,7 +470,7 @@ mod tests {
let base_schema = Schema::try_from(&arrow_schema).unwrap();
let base = Arc::new(base_schema.clone());

let plan = ProjectionPlan::from_schema(base, &base_schema, BlobVersion::default()).unwrap();
let plan = ProjectionPlan::from_schema(base, &base_schema).unwrap();

let physical = plan.physical_projection.to_arrow_schema();
assert!(is_json_field(physical.field_with_name("meta").unwrap()));
Expand Down
Loading
Loading