Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 115 additions & 11 deletions rust/lance/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1935,13 +1935,17 @@ impl Transaction {
removed_indices,
} => {
final_fragments.extend(maybe_existing_fragments?.clone());
let removed_uuids = removed_indices
.iter()
.map(|old_index| old_index.uuid)
.collect::<HashSet<_>>();
let new_uuids = new_indices
.iter()
.map(|new_index| new_index.uuid)
.collect::<HashSet<_>>();
final_indices.retain(|existing_index| {
!new_indices
.iter()
.any(|new_index| new_index.name == existing_index.name)
&& !removed_indices
.iter()
.any(|old_index| old_index.uuid == existing_index.uuid)
!removed_uuids.contains(&existing_index.uuid)
&& !new_uuids.contains(&existing_index.uuid)
});
final_indices.extend(new_indices.clone());
}
Expand Down Expand Up @@ -3441,7 +3445,36 @@ fn merge_fragments_valid(manifest: &Manifest, new_fragments: &[Fragment]) -> Res
#[cfg(test)]
mod tests {
use super::*;
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use chrono::Utc;
use lance_core::datatypes::Schema as LanceSchema;
use lance_io::utils::CachedFileSize;
use std::sync::Arc;
use uuid::Uuid;

/// Builds a minimal single-fragment manifest used as the baseline for the
/// `CreateIndex` manifest-merging tests below.
fn sample_manifest() -> Manifest {
    let arrow_schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]);
    let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
    let fragments = Arc::new(vec![Fragment::new(0)]);
    Manifest::new(
        lance_schema,
        fragments,
        DataStorageFormat::new(LanceFileVersion::V2_0),
        HashMap::new(),
    )
}

/// Builds index metadata with a freshly generated UUID, covering field 0 and
/// fragment 0, under the given `name`. Each call yields a distinct UUID so
/// tests can distinguish deltas that share a name.
fn sample_index_metadata(name: &str) -> IndexMetadata {
    IndexMetadata {
        name: name.to_string(),
        uuid: Uuid::new_v4(),
        fields: vec![0],
        dataset_version: 0,
        fragment_bitmap: Some(std::iter::once(0).collect()),
        index_details: None,
        index_version: 1,
        created_at: Some(Utc::now()),
        base_id: None,
    }
}

#[test]
fn test_rewrite_fragments() {
Expand Down Expand Up @@ -3496,11 +3529,6 @@ mod tests {

#[test]
fn test_merge_fragments_valid() {
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use lance_core::datatypes::Schema as LanceSchema;
use lance_table::format::Manifest;
use std::sync::Arc;

// Create a simple schema for testing
let schema = ArrowSchema::new(vec![
ArrowField::new("id", DataType::Int32, false),
Expand Down Expand Up @@ -3577,6 +3605,82 @@ mod tests {
assert!(result.is_ok());
}

#[test]
fn test_create_index_build_manifest_keeps_unremoved_same_name_indices() {
    let manifest = sample_manifest();
    // Three delta indices sharing one name; only `second_index` is tombstoned.
    let first_index = sample_index_metadata("vector_idx");
    let second_index = sample_index_metadata("vector_idx");
    let third_index = sample_index_metadata("vector_idx");

    let operation = Operation::CreateIndex {
        new_indices: vec![third_index.clone()],
        removed_indices: vec![second_index.clone()],
    };
    let transaction = Transaction::new(manifest.version, operation, None);

    let (_, final_indices) = transaction
        .build_manifest(
            Some(&manifest),
            vec![first_index.clone(), second_index.clone()],
            "txn",
            &ManifestWriteConfig::default(),
        )
        .unwrap();

    // `first_index` must survive despite sharing a name with the new index,
    // `third_index` is added, and `second_index` is removed.
    let final_uuids: Vec<_> = final_indices.iter().map(|idx| idx.uuid).collect();
    assert_eq!(final_indices.len(), 2);
    assert!(final_uuids.contains(&first_index.uuid));
    assert!(final_uuids.contains(&third_index.uuid));
    assert!(!final_uuids.contains(&second_index.uuid));
}

#[test]
fn test_create_index_build_manifest_deduplicates_relisted_indices_by_uuid() {
    let manifest = sample_manifest();
    let first_index = sample_index_metadata("vector_idx");
    let second_index = sample_index_metadata("vector_idx");
    let third_index = sample_index_metadata("vector_idx");

    // `first_index` is re-listed in `new_indices` while already present in
    // the existing set; it must appear exactly once in the result.
    let operation = Operation::CreateIndex {
        new_indices: vec![first_index.clone(), third_index.clone()],
        removed_indices: vec![second_index.clone()],
    };
    let transaction = Transaction::new(manifest.version, operation, None);

    let (_, final_indices) = transaction
        .build_manifest(
            Some(&manifest),
            vec![first_index.clone(), second_index.clone()],
            "txn",
            &ManifestWriteConfig::default(),
        )
        .unwrap();

    let occurrences =
        |uuid| final_indices.iter().filter(|idx| idx.uuid == uuid).count();
    assert_eq!(final_indices.len(), 2);
    assert_eq!(occurrences(first_index.uuid), 1);
    assert_eq!(occurrences(third_index.uuid), 1);
    assert_eq!(occurrences(second_index.uuid), 0);
}

#[test]
fn test_remove_tombstoned_data_files() {
// Create a fragment with mixed data files: some normal, some fully tombstoned
Expand Down
7 changes: 0 additions & 7 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -900,13 +900,6 @@ impl DatasetIndexExt for Dataset {
base_id: None, // New merged index file is located in the cloned dataset.
};
removed_indices.extend(res.removed_indices.iter().map(|&idx| idx.clone()));
if deltas.len() > res.removed_indices.len() {
new_indices.extend(
deltas[0..(deltas.len() - res.removed_indices.len())]
.iter()
.map(|&idx| idx.clone()),
);
}
new_indices.push(new_idx);
}

Expand Down
130 changes: 116 additions & 14 deletions rust/lance/src/index/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,19 +171,24 @@ impl<'a> CreateIndexBuilder<'a> {
}
candidate
};
if let Some(idx) = indices.iter().find(|i| i.name == index_name) {
if idx.fields == [field.id] && !self.replace {
return Err(Error::index(format!(
"Index name '{index_name} already exists, \
please specify a different name or use replace=True"
)));
};
if idx.fields != [field.id] {
return Err(Error::index(format!(
"Index name '{index_name} already exists with different fields, \
please specify a different name"
)));
}
let existing_named_indices = indices
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly. These two checks happen at different stages and serve different purposes.

It's possible to have a helper function to make it more clear, do you want it?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, the two implementations are already different; maybe we don't need to check fields, as we don't allow indices with the same name but different fields. It would be good to have a helper function to get consistent behavior

.iter()
.filter(|idx| idx.name == index_name)
.collect::<Vec<_>>();
if existing_named_indices
.iter()
.any(|idx| idx.fields != [field.id])
{
return Err(Error::index(format!(
"Index name '{index_name}' already exists with different fields, \
please specify a different name"
)));
}
if !existing_named_indices.is_empty() && !self.replace {
return Err(Error::index(format!(
"Index name '{index_name}' already exists, \
please specify a different name or use replace=True"
)));
Comment on lines +187 to +191
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean I will need replace=true to add a uuid to an existing index?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, our current API didn't support adding a UUID to an existing index, it's our next step. We'll need a new public API for that.

}

let index_id = match &self.index_uuid {
Expand Down Expand Up @@ -435,11 +440,22 @@ impl<'a> CreateIndexBuilder<'a> {
async fn execute(mut self) -> Result<IndexMetadata> {
let new_idx = self.execute_uncommitted().await?;
let index_uuid = new_idx.uuid;
let removed_indices = if self.replace {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean if we need to manually commit a CreateIndex transaction, we need to check replace everywhere?

this seems like a breaking change; are we relying on this somewhere (distributed indexing?)?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. With this change, Operation::CreateIndex becomes an explicit add/remove UUID set operation instead of relying on implicit name-based replacement during manifest merge.

That means callers which want replace semantics need to populate removed_indices themselves. The additive/manual commit paths (including distributed indexing’s commit_existing_index(...)) are not relying on the old behavior. They are registering already-built indices and should continue to use removed_indices: vec![].

self.dataset
.load_indices()
.await?
.iter()
.filter(|idx| idx.name == new_idx.name)
.cloned()
.collect()
} else {
vec![]
};
let transaction = Transaction::new(
new_idx.dataset_version,
Operation::CreateIndex {
new_indices: vec![new_idx],
removed_indices: vec![],
removed_indices,
},
None,
);
Expand Down Expand Up @@ -628,6 +644,92 @@ mod tests {
assert!(err.to_string().contains("already exists"));
}

#[tokio::test]
async fn test_concurrent_create_index_same_name_returns_retryable_conflict() {
    // Two writers checked out at the same version race to create "a_idx".
    let tmpdir = TempStrDir::default();
    let dataset_uri = format!("file://{}", tmpdir.as_str());
    let reader = gen_batch()
        .col("a", lance_datagen::array::step::<Int32Type>())
        .into_reader_rows(
            lance_datagen::RowCount::from(100),
            lance_datagen::BatchCount::from(1),
        );
    let dataset = Dataset::write(reader, &dataset_uri, None).await.unwrap();

    let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::BTree);
    let read_version = dataset.manifest.version;
    let mut writer_a = dataset.checkout_version(read_version).await.unwrap();
    let mut writer_b = dataset.checkout_version(read_version).await.unwrap();

    // The first writer commits cleanly.
    let first = CreateIndexBuilder::new(&mut writer_a, &["a"], IndexType::BTree, &params)
        .name("a_idx".to_string())
        .execute()
        .await;
    assert!(
        first.is_ok(),
        "first create_index should succeed: {first:?}"
    );

    // The second writer, still on the stale version, must observe a
    // retryable conflict instead of committing a same-name duplicate.
    let second = CreateIndexBuilder::new(&mut writer_b, &["a"], IndexType::BTree, &params)
        .name("a_idx".to_string())
        .execute()
        .await;
    assert!(
        matches!(second, Err(Error::RetryableCommitConflict { .. })),
        "second concurrent create_index should be retryable, got {second:?}"
    );

    // Exactly one index with that name survives.
    let latest_indices = writer_a.load_indices_by_name("a_idx").await.unwrap();
    assert_eq!(latest_indices.len(), 1);
}

#[tokio::test]
async fn test_concurrent_replace_index_same_name_returns_retryable_conflict() {
    let tmpdir = TempStrDir::default();
    let dataset_uri = format!("file://{}", tmpdir.as_str());
    let reader = gen_batch()
        .col("a", lance_datagen::array::step::<Int32Type>())
        .into_reader_rows(
            lance_datagen::RowCount::from(100),
            lance_datagen::BatchCount::from(1),
        );
    let mut dataset = Dataset::write(reader, &dataset_uri, None).await.unwrap();

    // Build the index that both writers will later attempt to replace.
    let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::BTree);
    let original = CreateIndexBuilder::new(&mut dataset, &["a"], IndexType::BTree, &params)
        .name("a_idx".to_string())
        .execute()
        .await
        .unwrap();

    let read_version = dataset.manifest.version;
    let mut writer_a = dataset.checkout_version(read_version).await.unwrap();
    let mut writer_b = dataset.checkout_version(read_version).await.unwrap();

    // The first replace wins and yields a brand-new index UUID.
    let replacement = CreateIndexBuilder::new(&mut writer_a, &["a"], IndexType::BTree, &params)
        .name("a_idx".to_string())
        .replace(true)
        .execute()
        .await
        .unwrap();
    assert_ne!(replacement.uuid, original.uuid);

    // The second replace from the stale checkout must surface a retryable
    // conflict rather than clobbering the winner.
    let second = CreateIndexBuilder::new(&mut writer_b, &["a"], IndexType::BTree, &params)
        .name("a_idx".to_string())
        .replace(true)
        .execute()
        .await;
    assert!(
        matches!(second, Err(Error::RetryableCommitConflict { .. })),
        "second concurrent replace should be retryable, got {second:?}"
    );

    // Only the winning replacement remains visible under the name.
    let latest_indices = writer_a.load_indices_by_name("a_idx").await.unwrap();
    assert_eq!(latest_indices.len(), 1);
    assert_eq!(latest_indices[0].uuid, replacement.uuid);
    assert_ne!(latest_indices[0].uuid, original.uuid);
}

// Helper function to create test data with text field suitable for inverted index
fn create_text_batch(start: i32, end: i32) -> RecordBatch {
let schema = Arc::new(ArrowSchema::new(vec![
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/index/vector/ivf/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2367,6 +2367,7 @@ mod tests {
Some((dataset.clone(), vectors.clone())),
)
.await;
dataset.checkout_latest().await.unwrap();
// retest with v3 params on the same dataset
test_index(
v3_params,
Expand Down
20 changes: 11 additions & 9 deletions rust/lance/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1216,9 +1216,16 @@ mod tests {
.collect();

let results = join_all(futures).await;
for result in results {
assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
}
let success_count = results
.iter()
.filter(|result| matches!(result, Ok(Ok(_))))
.count();
let retryable_count = results
.iter()
.filter(|result| matches!(result, Ok(Err(Error::RetryableCommitConflict { .. }))))
.count();
assert_eq!(success_count, 2, "{results:?}");
assert_eq!(retryable_count, 1, "{results:?}");

// Validate that each version has the anticipated number of indexes
let dataset = dataset.checkout_version(1).await.unwrap();
Expand All @@ -1241,12 +1248,7 @@ mod tests {
assert_eq!(indices[0].fields, vec![0]);
}

let dataset = dataset.checkout_version(4).await.unwrap();
let indices = dataset.load_indices().await.unwrap();
assert_eq!(indices.len(), 2);
let mut fields: Vec<i32> = indices.iter().flat_map(|i| i.fields.clone()).collect();
fields.sort();
assert_eq!(fields, vec![0, 1]);
assert!(dataset.checkout_version(4).await.is_err());
}

#[tokio::test]
Expand Down
Loading
Loading