Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rust/lance-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jsonb = {workspace = true}
lance-arrow.workspace = true
lance-core = {workspace = true, features = ["datafusion"]}
lance-datagen.workspace = true
lance-geo = {workspace = true}
lance-geo = {workspace = true, optional = true}
chrono.workspace = true
log.workspace = true
pin-project.workspace = true
Expand All @@ -44,6 +44,7 @@ protobuf-src = {version = "2.1", optional = true}
lance-datagen.workspace = true

[features]
geo = ["dep:lance-geo"]
substrait = ["dep:datafusion-substrait"]
protoc = ["dep:protobuf-src"]

Expand Down
45 changes: 45 additions & 0 deletions rust/lance-datafusion/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,52 @@ pub fn register_functions(ctx: &SessionContext) {
ctx.register_udf(json::json_array_contains_udf());
ctx.register_udf(json::json_array_length_udf());
// GEO functions
#[cfg(feature = "geo")]
lance_geo::register_functions(ctx);
#[cfg(not(feature = "geo"))]
register_geo_stub_functions(ctx);
}

/// When the `geo` feature is disabled, register stub UDFs for spatial SQL functions
/// so that users get a clear error mentioning the feature flag instead of
/// DataFusion's generic "Unknown function" error.
#[cfg(not(feature = "geo"))]
fn register_geo_stub_functions(ctx: &SessionContext) {
let geo_funcs = [
"st_intersects",
"st_contains",
"st_within",
"st_touches",
"st_crosses",
"st_overlaps",
"st_covers",
"st_coveredby",
"st_distance",
"st_area",
"st_length",
];

for name in geo_funcs {
let func_name = name.to_string();
let stub = Arc::new(make_scalar_function(
move |_args: &[ArrayRef]| {
Err(datafusion::error::DataFusionError::Plan(format!(
"Function '{}' requires the `geo` feature. \
Rebuild with `--features geo` to enable geospatial functions.",
func_name
)))
},
vec![],
));

ctx.register_udf(create_udf(
name,
vec![DataType::Binary, DataType::Binary],
DataType::Boolean,
Volatility::Immutable,
stub,
));
}
}

/// This method checks whether a string contains all specified tokens. The tokens are separated by
Expand Down
13 changes: 8 additions & 5 deletions rust/lance-geo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ description = "Lance's geospatial extension providing geospatial UDFs."

[dependencies]
datafusion.workspace = true
geoarrow-array.workspace = true
geoarrow-schema.workspace = true
geodatafusion.workspace = true
geo-traits.workspace = true
geo-types.workspace = true
geoarrow-array = { workspace = true, optional = true }
geoarrow-schema = { workspace = true, optional = true }
geodatafusion = { workspace = true, optional = true }
geo-traits = { workspace = true, optional = true }
geo-types = { workspace = true, optional = true }
lance-core.workspace = true
serde.workspace = true

[features]
geo = ["dep:geoarrow-array", "dep:geoarrow-schema", "dep:geodatafusion", "dep:geo-traits", "dep:geo-types"]

[lints]
workspace = true
4 changes: 4 additions & 0 deletions rust/lance-geo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@

use datafusion::prelude::SessionContext;

#[cfg(feature = "geo")]
pub mod bbox;

pub fn register_functions(ctx: &SessionContext) {
#[cfg(feature = "geo")]
geodatafusion::register(ctx);
#[cfg(not(feature = "geo"))]
let _ = ctx;
}
10 changes: 6 additions & 4 deletions rust/lance-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ deepsize.workspace = true
dirs.workspace = true
fst.workspace = true
futures.workspace = true
geoarrow-array = { workspace = true }
geoarrow-schema = { workspace = true }
geo-types = { workspace = true }
geoarrow-array = { workspace = true, optional = true }
geoarrow-schema = { workspace = true, optional = true }
geo-types = { workspace = true, optional = true }
half.workspace = true
itertools.workspace = true
jieba-rs = { workspace = true, optional = true }
Expand All @@ -42,7 +42,7 @@ lance-core.workspace = true
lance-datafusion.workspace = true
lance-encoding.workspace = true
lance-file.workspace = true
lance-geo.workspace = true
lance-geo = { workspace = true, optional = true }
lance-io.workspace = true
lance-linalg.workspace = true
lance-table.workspace = true
Expand Down Expand Up @@ -88,6 +88,7 @@ rstest.workspace = true
chrono.workspace = true

[features]
geo = ["dep:lance-geo", "lance-geo/geo", "dep:geoarrow-array", "dep:geoarrow-schema", "dep:geo-types"]
protoc = ["dep:protobuf-src"]
tokenizer-lindera = ["dep:lindera", "dep:lindera-tantivy"]
tokenizer-jieba = ["dep:jieba-rs"]
Expand Down Expand Up @@ -162,6 +163,7 @@ harness = false
[[bench]]
name = "geo"
harness = false
required-features = ["geo"]

[lints]
workspace = true
20 changes: 15 additions & 5 deletions rust/lance-index/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use std::{collections::HashMap, sync::Arc};
use lance_core::{Error, Result};
use snafu::location;

#[cfg(feature = "geo")]
use crate::scalar::rtree::RTreeIndexPlugin;
use crate::{
pb, pbold,
scalar::{
bitmap::BitmapIndexPlugin, bloomfilter::BloomFilterIndexPlugin, btree::BTreeIndexPlugin,
inverted::InvertedIndexPlugin, json::JsonIndexPlugin, label_list::LabelListIndexPlugin,
ngram::NGramIndexPlugin, registry::ScalarIndexPlugin, rtree::RTreeIndexPlugin,
zonemap::ZoneMapIndexPlugin,
ngram::NGramIndexPlugin, registry::ScalarIndexPlugin, zonemap::ZoneMapIndexPlugin,
},
};

Expand Down Expand Up @@ -62,6 +63,7 @@ impl IndexPluginRegistry {
registry.add_plugin::<pb::BloomFilterIndexDetails, BloomFilterIndexPlugin>();
registry.add_plugin::<pbold::InvertedIndexDetails, InvertedIndexPlugin>();
registry.add_plugin::<pb::JsonIndexDetails, JsonIndexPlugin>();
#[cfg(feature = "geo")]
registry.add_plugin::<pb::RTreeIndexDetails, RTreeIndexPlugin>();

let registry = Arc::new(registry);
Expand All @@ -77,9 +79,17 @@ impl IndexPluginRegistry {
self.plugins
.get(name)
.map(|plugin| plugin.as_ref())
.ok_or_else(|| Error::InvalidInput {
source: format!("No scalar index plugin found for name {}", name).into(),
location: location!(),
.ok_or_else(|| {
let hint = if name == "rtree" {
". The 'rtree' index requires the `geo` feature. \
Rebuild with `--features geo` to enable geospatial support"
} else {
""
};
Error::InvalidInput {
source: format!("No scalar index plugin found for name '{name}'{hint}").into(),
location: location!(),
}
})
}

Expand Down
4 changes: 4 additions & 0 deletions rust/lance-index/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub mod label_list;
pub mod lance_format;
pub mod ngram;
pub mod registry;
#[cfg(feature = "geo")]
pub mod rtree;
pub mod zoned;
pub mod zonemap;
Expand Down Expand Up @@ -736,19 +737,22 @@ impl AnyQuery for TokenQuery {
}
}

#[cfg(feature = "geo")]
#[derive(Debug, Clone, PartialEq)]
pub struct RelationQuery {
pub value: ScalarValue,
pub field: Field,
}

/// A query that a Geo index can satisfy
#[cfg(feature = "geo")]
#[derive(Debug, Clone, PartialEq)]
pub enum GeoQuery {
IntersectQuery(RelationQuery),
IsNull,
}

#[cfg(feature = "geo")]
impl AnyQuery for GeoQuery {
fn as_any(&self) -> &dyn Any {
self
Expand Down
9 changes: 7 additions & 2 deletions rust/lance-index/src/scalar/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ use datafusion_expr::{
use tokio::try_join;

use super::{
AnyQuery, BloomFilterQuery, GeoQuery, LabelListQuery, MetricsCollector, RelationQuery,
SargableQuery, ScalarIndex, SearchResult, TextQuery, TokenQuery,
AnyQuery, BloomFilterQuery, LabelListQuery, MetricsCollector, SargableQuery, ScalarIndex,
SearchResult, TextQuery, TokenQuery,
};
#[cfg(feature = "geo")]
use super::{GeoQuery, RelationQuery};
use lance_core::{
utils::mask::{NullableRowAddrMask, RowAddrMask},
Error, Result,
Expand Down Expand Up @@ -692,17 +694,20 @@ impl ScalarQueryParser for FtsQueryParser {
}

/// A parser for geo indices that handles spatial queries
#[cfg(feature = "geo")]
#[derive(Debug, Clone)]
pub struct GeoQueryParser {
index_name: String,
}

#[cfg(feature = "geo")]
impl GeoQueryParser {
pub fn new(index_name: String) -> Self {
Self { index_name }
}
}

#[cfg(feature = "geo")]
impl ScalarQueryParser for GeoQueryParser {
fn visit_between(
&self,
Expand Down
5 changes: 3 additions & 2 deletions rust/lance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ lance-linalg = { workspace = true }
lance-index = { workspace = true }
lance-namespace = { workspace = true }
lance-table = { workspace = true }
lance-geo = { workspace = true }
lance-geo = { workspace = true, optional = true }
arrow-arith = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
Expand Down Expand Up @@ -119,7 +119,7 @@ geo-types = { workspace = true }
datafusion-substrait = { workspace = true }

[features]
default = ["aws", "azure", "gcp", "oss", "huggingface", "tencent"]
default = ["aws", "azure", "gcp", "oss", "huggingface", "tencent", "geo"]
fp16kernels = ["lance-linalg/fp16kernels"]
# Prevent dynamic linking of lzma, which comes from datafusion
cli = ["dep:clap", "lzma-sys/static"]
Expand All @@ -138,6 +138,7 @@ azure = ["lance-io/azure"]
oss = ["lance-io/oss"]
tencent = ["lance-io/tencent"]
huggingface = ["lance-io/huggingface"]
geo = ["dep:lance-geo", "lance-geo/geo", "lance-datafusion/geo", "lance-index/geo"]
# Enable slow integration tests (disabled by default in CI)
slow_tests = []

Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
mod dataset_aggregate;
mod dataset_common;
mod dataset_concurrency_store;
#[cfg(feature = "geo")]
mod dataset_geo;
mod dataset_index;
mod dataset_io;
Expand Down
31 changes: 28 additions & 3 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,22 @@ pub(crate) async fn remap_index(

let new_id = Uuid::new_v4();

let generic = dataset
let generic = match dataset
.open_generic_index(&field_path, &index_id.to_string(), &NoOpMetricsCollector)
.await?;
.await
{
Ok(g) => g,
Err(e) => {
log::warn!(
"Cannot open index '{}' on '{}': {}. \
Index will be dropped during compaction.",
index_id,
field_path,
e
);
return Ok(RemapResult::Drop);
}
};

let created_index = match generic.index_type() {
it if it.is_scalar() => {
Expand Down Expand Up @@ -1718,7 +1731,19 @@ impl DatasetIndexInternalExt for Dataset {
continue;
}

let plugin = index_details.get_plugin()?;
let plugin = match index_details.get_plugin() {
Ok(plugin) => plugin,
Err(e) => {
log::warn!(
"Skipping index '{}' on column '{}': {}. \
Queries on this column will fall back to a full scan.",
index.name,
field_path,
e
);
continue;
}
};
let query_parser = plugin.new_query_parser(index.name.clone(), &index_details.0);

if let Some(query_parser) = query_parser {
Expand Down
17 changes: 14 additions & 3 deletions rust/lance/src/index/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,21 @@ pub async fn merge_indices_with_unindexed_frags<'a>(
let field_path = dataset.schema().field_path(old_indices[0].fields[0])?;
let mut indices = Vec::with_capacity(old_indices.len());
for idx in old_indices {
let index = dataset
match dataset
.open_generic_index(&field_path, &idx.uuid.to_string(), &NoOpMetricsCollector)
.await?;
indices.push(index);
.await
{
Ok(index) => indices.push(index),
Err(e) => {
log::warn!(
"Cannot open index on column '{}': {}. \
Skipping index merge for this column.",
field_path,
e
);
return Ok(None);
}
}
}

if indices
Expand Down