diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index c94e4e39364..689f63c659b 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -1935,13 +1935,17 @@ impl Transaction { removed_indices, } => { final_fragments.extend(maybe_existing_fragments?.clone()); + let removed_uuids = removed_indices + .iter() + .map(|old_index| old_index.uuid) + .collect::>(); + let new_uuids = new_indices + .iter() + .map(|new_index| new_index.uuid) + .collect::>(); final_indices.retain(|existing_index| { - !new_indices - .iter() - .any(|new_index| new_index.name == existing_index.name) - && !removed_indices - .iter() - .any(|old_index| old_index.uuid == existing_index.uuid) + !removed_uuids.contains(&existing_index.uuid) + && !new_uuids.contains(&existing_index.uuid) }); final_indices.extend(new_indices.clone()); } @@ -3441,7 +3445,36 @@ fn merge_fragments_valid(manifest: &Manifest, new_fragments: &[Fragment]) -> Res #[cfg(test)] mod tests { use super::*; + use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use chrono::Utc; + use lance_core::datatypes::Schema as LanceSchema; use lance_io::utils::CachedFileSize; + use std::sync::Arc; + use uuid::Uuid; + + fn sample_manifest() -> Manifest { + let schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]); + Manifest::new( + LanceSchema::try_from(&schema).unwrap(), + Arc::new(vec![Fragment::new(0)]), + DataStorageFormat::new(LanceFileVersion::V2_0), + HashMap::new(), + ) + } + + fn sample_index_metadata(name: &str) -> IndexMetadata { + IndexMetadata { + uuid: Uuid::new_v4(), + fields: vec![0], + name: name.to_string(), + dataset_version: 0, + fragment_bitmap: Some([0].into_iter().collect()), + index_details: None, + index_version: 1, + created_at: Some(Utc::now()), + base_id: None, + } + } #[test] fn test_rewrite_fragments() { @@ -3496,11 +3529,6 @@ mod tests { #[test] fn test_merge_fragments_valid() { - use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; - use lance_core::datatypes::Schema as LanceSchema; - use lance_table::format::Manifest; - use std::sync::Arc; - // Create a simple schema for testing let schema = ArrowSchema::new(vec![ ArrowField::new("id", DataType::Int32, false), @@ -3577,6 +3605,82 @@ mod tests { assert!(result.is_ok()); } + #[test] + fn test_create_index_build_manifest_keeps_unremoved_same_name_indices() { + let manifest = sample_manifest(); + let first_index = sample_index_metadata("vector_idx"); + let second_index = sample_index_metadata("vector_idx"); + let third_index = sample_index_metadata("vector_idx"); + + let transaction = Transaction::new( + manifest.version, + Operation::CreateIndex { + new_indices: vec![third_index.clone()], + removed_indices: vec![second_index.clone()], + }, + None, + ); + + let (_, final_indices) = transaction + .build_manifest( + Some(&manifest), + vec![first_index.clone(), second_index.clone()], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + assert_eq!(final_indices.len(), 2); + assert!(final_indices.iter().any(|idx| idx.uuid == first_index.uuid)); + assert!(final_indices.iter().any(|idx| idx.uuid == third_index.uuid)); + assert!( + !final_indices + .iter() + .any(|idx| idx.uuid == second_index.uuid) + ); + } + + #[test] + fn test_create_index_build_manifest_deduplicates_relisted_indices_by_uuid() { + let manifest = sample_manifest(); + let first_index = sample_index_metadata("vector_idx"); + let second_index = sample_index_metadata("vector_idx"); + let third_index = sample_index_metadata("vector_idx"); + + let transaction = Transaction::new( + manifest.version, + Operation::CreateIndex { + new_indices: vec![first_index.clone(), third_index.clone()], + removed_indices: vec![second_index.clone()], + }, + None, + ); + + let (_, final_indices) = transaction + .build_manifest( + Some(&manifest), + vec![first_index.clone(), second_index.clone()], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + assert_eq!(final_indices.len(), 2); + assert_eq!( + final_indices + .iter() + .filter(|idx| idx.uuid == first_index.uuid) + .count(), + 1 + ); + assert!(final_indices.iter().any(|idx| idx.uuid == third_index.uuid)); + assert!( + !final_indices + .iter() + .any(|idx| idx.uuid == second_index.uuid) + ); + } + #[test] fn test_remove_tombstoned_data_files() { // Create a fragment with mixed data files: some normal, some fully tombstoned diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index f4cd31e44b8..ab3876b7977 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -900,13 +900,6 @@ impl DatasetIndexExt for Dataset { base_id: None, // Mew merged index file locates in the cloned dataset. }; removed_indices.extend(res.removed_indices.iter().map(|&idx| idx.clone())); - if deltas.len() > res.removed_indices.len() { - new_indices.extend( - deltas[0..(deltas.len() - res.removed_indices.len())] - .iter() - .map(|&idx| idx.clone()), - ); - } new_indices.push(new_idx); } diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index be76b9a1878..d5c11c84694 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -171,19 +171,24 @@ impl<'a> CreateIndexBuilder<'a> { } candidate }; - if let Some(idx) = indices.iter().find(|i| i.name == index_name) { - if idx.fields == [field.id] && !self.replace { - return Err(Error::index(format!( - "Index name '{index_name} already exists, \ - please specify a different name or use replace=True" - ))); - }; - if idx.fields != [field.id] { - return Err(Error::index(format!( - "Index name '{index_name} already exists with different fields, \ - please specify a different name" - ))); - } + let existing_named_indices = indices + .iter() + .filter(|idx| idx.name == index_name) + .collect::>(); + if existing_named_indices + .iter() + .any(|idx| idx.fields != [field.id]) + { + return Err(Error::index(format!( + "Index name '{index_name}' already exists with different fields, \ + please specify a different name" + ))); + } + if !existing_named_indices.is_empty() && !self.replace { + return Err(Error::index(format!( + "Index name '{index_name}' already exists, \ + please specify a different name or use replace=True" + ))); } let index_id = match &self.index_uuid { @@ -435,11 +440,22 @@ impl<'a> CreateIndexBuilder<'a> { async fn execute(mut self) -> Result { let new_idx = self.execute_uncommitted().await?; let index_uuid = new_idx.uuid; + let removed_indices = if self.replace { + self.dataset + .load_indices() + .await? + .iter() + .filter(|idx| idx.name == new_idx.name) + .cloned() + .collect() + } else { + vec![] + }; let transaction = Transaction::new( new_idx.dataset_version, Operation::CreateIndex { new_indices: vec![new_idx], - removed_indices: vec![], + removed_indices, }, None, ); @@ -628,6 +644,92 @@ mod tests { assert!(err.to_string().contains("already exists")); } + #[tokio::test] + async fn test_concurrent_create_index_same_name_returns_retryable_conflict() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + let reader = gen_batch() + .col("a", lance_datagen::array::step::()) + .into_reader_rows( + lance_datagen::RowCount::from(100), + lance_datagen::BatchCount::from(1), + ); + let dataset = Dataset::write(reader, &dataset_uri, None).await.unwrap(); + + let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::BTree); + let read_version = dataset.manifest.version; + let mut reader1 = dataset.checkout_version(read_version).await.unwrap(); + let mut reader2 = dataset.checkout_version(read_version).await.unwrap(); + + let first = CreateIndexBuilder::new(&mut reader1, &["a"], IndexType::BTree, ¶ms) + .name("a_idx".to_string()) + .execute() + .await; + assert!( + first.is_ok(), + "first create_index should succeed: {first:?}" + ); + + let second = CreateIndexBuilder::new(&mut reader2, &["a"], IndexType::BTree, ¶ms) + .name("a_idx".to_string()) + .execute() + .await; + assert!( + matches!(second, Err(Error::RetryableCommitConflict { .. })), + "second concurrent create_index should be retryable, got {second:?}" + ); + + let latest_indices = reader1.load_indices_by_name("a_idx").await.unwrap(); + assert_eq!(latest_indices.len(), 1); + } + + #[tokio::test] + async fn test_concurrent_replace_index_same_name_returns_retryable_conflict() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + let reader = gen_batch() + .col("a", lance_datagen::array::step::()) + .into_reader_rows( + lance_datagen::RowCount::from(100), + lance_datagen::BatchCount::from(1), + ); + let mut dataset = Dataset::write(reader, &dataset_uri, None).await.unwrap(); + + let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::BTree); + let original = CreateIndexBuilder::new(&mut dataset, &["a"], IndexType::BTree, ¶ms) + .name("a_idx".to_string()) + .execute() + .await + .unwrap(); + + let read_version = dataset.manifest.version; + let mut reader1 = dataset.checkout_version(read_version).await.unwrap(); + let mut reader2 = dataset.checkout_version(read_version).await.unwrap(); + + let replacement = CreateIndexBuilder::new(&mut reader1, &["a"], IndexType::BTree, ¶ms) + .name("a_idx".to_string()) + .replace(true) + .execute() + .await + .unwrap(); + assert_ne!(replacement.uuid, original.uuid); + + let second = CreateIndexBuilder::new(&mut reader2, &["a"], IndexType::BTree, ¶ms) + .name("a_idx".to_string()) + .replace(true) + .execute() + .await; + assert!( + matches!(second, Err(Error::RetryableCommitConflict { .. })), + "second concurrent replace should be retryable, got {second:?}" + ); + + let latest_indices = reader1.load_indices_by_name("a_idx").await.unwrap(); + assert_eq!(latest_indices.len(), 1); + assert_eq!(latest_indices[0].uuid, replacement.uuid); + assert_ne!(latest_indices[0].uuid, original.uuid); + } + // Helper function to create test data with text field suitable for inverted index fn create_text_batch(start: i32, end: i32) -> RecordBatch { let schema = Arc::new(ArrowSchema::new(vec![ diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 96be37e18ed..34e33e42e52 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -2367,6 +2367,7 @@ mod tests { Some((dataset.clone(), vectors.clone())), ) .await; + dataset.checkout_latest().await.unwrap(); // retest with v3 params on the same dataset test_index( v3_params, diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index ebab7eb6b4d..7dbc472e6b4 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -1216,9 +1216,16 @@ mod tests { .collect(); let results = join_all(futures).await; - for result in results { - assert!(matches!(result, Ok(Ok(_))), "{:?}", result); - } + let success_count = results + .iter() + .filter(|result| matches!(result, Ok(Ok(_)))) + .count(); + let retryable_count = results + .iter() + .filter(|result| matches!(result, Ok(Err(Error::RetryableCommitConflict { .. })))) + .count(); + assert_eq!(success_count, 2, "{results:?}"); + assert_eq!(retryable_count, 1, "{results:?}"); // Validate that each version has the anticipated number of indexes let dataset = dataset.checkout_version(1).await.unwrap(); @@ -1241,12 +1248,7 @@ mod tests { assert_eq!(indices[0].fields, vec![0]); } - let dataset = dataset.checkout_version(4).await.unwrap(); - let indices = dataset.load_indices().await.unwrap(); - assert_eq!(indices.len(), 2); - let mut fields: Vec = indices.iter().flat_map(|i| i.fields.clone()).collect(); - fields.sort(); - assert_eq!(fields, vec![0, 1]); + assert!(dataset.checkout_version(4).await.is_err()); } #[tokio::test] diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 450bd03a964..da044bac43d 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -14,6 +14,7 @@ use lance_core::{ Error, Result, utils::{deletion::DeletionVector, mask::RowAddrTreeMap}, }; +use lance_index::DatasetIndexExt; use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME; use lance_index::mem_wal::{MEM_WAL_INDEX_NAME, MergedGeneration}; use lance_table::format::IndexMetadata; @@ -501,8 +502,6 @@ impl<'a> TransactionRebase<'a> { Operation::Append { .. } | Operation::Clone { .. } | Operation::UpdateBases { .. } => Ok(()), - // Indices are identified by UUIDs, so they shouldn't conflict. - // unless it is the same frag reuse index or MemWAL index Operation::CreateIndex { new_indices: created_indices, .. @@ -518,9 +517,20 @@ impl<'a> TransactionRebase<'a> { let other_has_mem_wal = created_indices .iter() .any(|idx| idx.name == MEM_WAL_INDEX_NAME); + let has_regular_name_conflict = new_indices + .iter() + .filter(|idx| { + idx.name != FRAG_REUSE_INDEX_NAME && idx.name != MEM_WAL_INDEX_NAME + }) + .any(|new_index| { + created_indices + .iter() + .any(|created_index| created_index.name == new_index.name) + }); if (self_has_frag_reuse && other_has_frag_reuse) || (self_has_mem_wal && other_has_mem_wal) + || has_regular_name_conflict { Err(self.retryable_conflict_err(other_transaction, other_version)) } else { @@ -1440,7 +1450,11 @@ impl<'a> TransactionRebase<'a> { } async fn finish_create_index(mut self, dataset: &Dataset) -> Result { - if let Operation::CreateIndex { new_indices, .. } = &mut self.transaction.operation { + if let Operation::CreateIndex { + new_indices, + removed_indices, + } = &mut self.transaction.operation + { // Handle FRAG_REUSE_INDEX rebasing let has_frag_reuse = new_indices .iter() @@ -1522,6 +1536,25 @@ impl<'a> TransactionRebase<'a> { new_indices.push(new_meta); } + for singleton_name in [FRAG_REUSE_INDEX_NAME, MEM_WAL_INDEX_NAME] { + if new_indices.iter().any(|idx| idx.name == singleton_name) { + for existing_idx in dataset + .load_indices() + .await? + .iter() + .filter(|idx| idx.name == singleton_name) + .cloned() + { + if !removed_indices + .iter() + .any(|removed_idx| removed_idx.uuid == existing_idx.uuid) + { + removed_indices.push(existing_idx); + } + } + } + } + Ok(self.transaction) } else { Err(wrong_operation_err(&self.transaction.operation)) @@ -2296,10 +2329,10 @@ mod tests { new_indices: vec![index0.clone()], removed_indices: vec![index0], }, - // Will only conflict with operations that modify row ids. + // Conflicts with row-id-changing operations and same-name CreateIndex. [ Compatible, // append - Compatible, // create index + Retryable, // create index Compatible, // delete Compatible, // merge NotCompatible, // overwrite @@ -2629,6 +2662,102 @@ mod tests { } } + #[test] + fn test_create_index_conflicts_only_on_same_name() { + let index0 = IndexMetadata { + uuid: uuid::Uuid::new_v4(), + name: "test".to_string(), + fields: vec![0], + dataset_version: 1, + fragment_bitmap: None, + index_details: None, + index_version: 0, + created_at: None, + base_id: None, + }; + let index1 = IndexMetadata { + uuid: uuid::Uuid::new_v4(), + name: "other".to_string(), + ..index0.clone() + }; + + let txn = Transaction::new( + 0, + Operation::CreateIndex { + new_indices: vec![index0.clone()], + removed_indices: vec![], + }, + None, + ); + let mut rebase = TransactionRebase { + transaction: txn, + initial_fragments: HashMap::new(), + modified_fragment_ids: HashSet::new(), + affected_rows: None, + conflicting_frag_reuse_indices: Vec::new(), + conflicting_mem_wal_merged_gens: Vec::new(), + }; + + let same_name = Transaction::new( + 0, + Operation::CreateIndex { + new_indices: vec![IndexMetadata { + uuid: uuid::Uuid::new_v4(), + ..index0 + }], + removed_indices: vec![], + }, + None, + ); + let different_name = Transaction::new( + 0, + Operation::CreateIndex { + new_indices: vec![index1], + removed_indices: vec![], + }, + None, + ); + + let same_name_result = rebase.check_txn(&same_name, 1); + assert!( + matches!(same_name_result, Err(Error::RetryableCommitConflict { .. })), + "Expected retryable conflict for same-name CreateIndex, got {:?}", + same_name_result + ); + + let mut rebase = TransactionRebase { + transaction: Transaction::new( + 0, + Operation::CreateIndex { + new_indices: vec![IndexMetadata { + uuid: uuid::Uuid::new_v4(), + name: "test".to_string(), + fields: vec![0], + dataset_version: 1, + fragment_bitmap: None, + index_details: None, + index_version: 0, + created_at: None, + base_id: None, + }], + removed_indices: vec![], + }, + None, + ), + initial_fragments: HashMap::new(), + modified_fragment_ids: HashSet::new(), + affected_rows: None, + conflicting_frag_reuse_indices: Vec::new(), + conflicting_mem_wal_merged_gens: Vec::new(), + }; + let different_name_result = rebase.check_txn(&different_name, 1); + assert!( + different_name_result.is_ok(), + "Expected compatibility for different-name CreateIndex, got {:?}", + different_name_result + ); + } + #[tokio::test] async fn test_add_bases_non_conflicting() { let dataset = test_dataset(10, 2).await;