Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion rust/lance-core/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1489,10 +1489,23 @@ pub fn escape_field_path_for_project(name: &str) -> String {
#[cfg(test)]
mod tests {
use arrow_schema::{DataType as ArrowDataType, Fields as ArrowFields};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use super::*;

#[test]
// Verifies that a Projection built over a schema with no metadata reports
// BlobVersion::V1 — i.e. V1 is the default blob version when nothing
// overrides it. (The schema's `metadata` map is deliberately empty.)
fn projection_from_schema_defaults_to_v1() {
// Single nullable Int32 field; the field content is irrelevant to the
// blob-version default being tested here.
let field = Field::try_from(&ArrowField::new("a", ArrowDataType::Int32, true)).unwrap();
let schema = Schema {
fields: vec![field],
metadata: HashMap::new(),
};

let projection = Projection::empty(Arc::new(schema));

assert_eq!(projection.blob_version, BlobVersion::V1);
}

#[test]
fn test_resolve_with_quoted_fields() {
// Create a schema with fields containing dots
Expand Down
34 changes: 25 additions & 9 deletions rust/lance-datafusion/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
};

use lance_core::{
datatypes::{OnMissing, Projectable, Projection, Schema},
datatypes::{BlobVersion, OnMissing, Projectable, Projection, Schema},
Error, Result, ROW_ADDR, ROW_CREATED_AT_VERSION, ROW_ID, ROW_LAST_UPDATED_AT_VERSION,
ROW_OFFSET, WILDCARD,
};
Expand All @@ -37,11 +37,16 @@ struct ProjectionBuilder {
needs_row_created_at: bool,
must_add_row_offset: bool,
has_wildcard: bool,
blob_version: BlobVersion,
}

impl ProjectionBuilder {
fn new(base: Arc<dyn Projectable>) -> Self {
let full_schema = Arc::new(Projection::full(base.clone()).to_arrow_schema());
fn new(base: Arc<dyn Projectable>, blob_version: BlobVersion) -> Self {
let full_schema = Arc::new(
Projection::full(base.clone())
.with_blob_version(blob_version)
.to_arrow_schema(),
);
let full_schema = Arc::new(ProjectionPlan::add_system_columns(&full_schema));
let planner = Planner::new(full_schema);

Expand All @@ -58,6 +63,7 @@ impl ProjectionBuilder {
needs_row_last_updated_at: false,
must_add_row_offset: false,
has_wildcard: false,
blob_version,
}
}

Expand Down Expand Up @@ -146,6 +152,8 @@ impl ProjectionBuilder {
.union_columns(&self.physical_cols, OnMissing::Ignore)?
};

physical_projection = physical_projection.with_blob_version(self.blob_version);

physical_projection.with_row_id = self.needs_row_id;
physical_projection.with_row_addr = self.needs_row_addr || self.must_add_row_offset;
physical_projection.with_row_last_updated_at_version = self.needs_row_last_updated_at;
Expand Down Expand Up @@ -202,8 +210,9 @@ impl ProjectionPlan {
pub fn from_expressions(
base: Arc<dyn Projectable>,
columns: &[(impl AsRef<str>, impl AsRef<str>)],
blob_version: BlobVersion,
) -> Result<Self> {
let mut builder = ProjectionBuilder::new(base);
let mut builder = ProjectionBuilder::new(base, blob_version);
builder.add_columns(columns)?;
builder.build()
}
Expand Down Expand Up @@ -242,7 +251,11 @@ impl ProjectionPlan {
/// ```
///
/// This is something that cannot be done easily using expressions.
pub fn from_schema(base: Arc<dyn Projectable>, projection: &Schema) -> Result<Self> {
pub fn from_schema(
base: Arc<dyn Projectable>,
projection: &Schema,
blob_version: BlobVersion,
) -> Result<Self> {
// Separate data columns from system columns
// System columns (_rowid, _rowaddr, etc.) are handled via flags in Projection,
// not as fields in the Schema
Expand Down Expand Up @@ -282,7 +295,9 @@ impl ProjectionPlan {
};

// Calculate the physical projection from data columns only
let mut physical_projection = Projection::empty(base).union_schema(&data_schema);
let mut physical_projection = Projection::empty(base)
.union_schema(&data_schema)
.with_blob_version(blob_version);
physical_projection.with_row_id = with_row_id;
physical_projection.with_row_addr = with_row_addr;

Expand All @@ -303,16 +318,17 @@ impl ProjectionPlan {
})
}

pub fn full(base: Arc<dyn Projectable>) -> Result<Self> {
pub fn full(base: Arc<dyn Projectable>, blob_version: BlobVersion) -> Result<Self> {
let physical_cols: Vec<&str> = base
.schema()
.fields
.iter()
.map(|f| f.name.as_ref())
.collect::<Vec<_>>();

let physical_projection =
Projection::empty(base.clone()).union_columns(&physical_cols, OnMissing::Ignore)?;
let physical_projection = Projection::empty(base.clone())
.union_columns(&physical_cols, OnMissing::Ignore)?
.with_blob_version(blob_version);

let requested_output_expr = physical_cols
.into_iter()
Expand Down
7 changes: 4 additions & 3 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ impl ProjectionRequest {
}

pub fn into_projection_plan(self, dataset: Arc<Dataset>) -> Result<ProjectionPlan> {
let blob_version = dataset.blob_version();
match self {
Self::Schema(schema) => {
// The schema might contain system columns (_rowid, _rowaddr) which are not
Expand All @@ -407,18 +408,18 @@ impl ProjectionRequest {
if system_columns_present {
// If system columns are present, we can't use project_by_schema directly
// Just pass the schema to ProjectionPlan::from_schema which handles it
ProjectionPlan::from_schema(dataset, schema.as_ref())
ProjectionPlan::from_schema(dataset, schema.as_ref(), blob_version)
} else {
// No system columns, use normal path with validation
let projection = dataset.schema().project_by_schema(
schema.as_ref(),
OnMissing::Error,
OnTypeMismatch::Error,
)?;
ProjectionPlan::from_schema(dataset, &projection)
ProjectionPlan::from_schema(dataset, &projection, blob_version)
}
}
Self::Sql(columns) => ProjectionPlan::from_expressions(dataset, &columns),
Self::Sql(columns) => ProjectionPlan::from_expressions(dataset, &columns, blob_version),
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,8 @@ impl TakeOperation {

impl Scanner {
pub fn new(dataset: Arc<Dataset>) -> Self {
let projection_plan = ProjectionPlan::full(dataset.clone()).unwrap();
let projection_plan =
ProjectionPlan::full(dataset.clone(), dataset.blob_version()).unwrap();
let file_reader_options = dataset.file_reader_options.clone();
let mut scanner = Self {
dataset,
Expand Down Expand Up @@ -723,7 +724,11 @@ impl Scanner {
columns: &[(impl AsRef<str>, impl AsRef<str>)],
) -> Result<&mut Self> {
self.explicit_projection = true;
self.projection_plan = ProjectionPlan::from_expressions(self.dataset.clone(), columns)?;
self.projection_plan = ProjectionPlan::from_expressions(
self.dataset.clone(),
columns,
self.dataset.blob_version(),
)?;
if self.legacy_with_row_id {
self.projection_plan.include_row_id();
}
Expand Down