From 1b3a18b1a748ff680e3fcd2b68ced6390e82ce29 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 24 Feb 2026 15:54:09 -0800 Subject: [PATCH 01/11] commit --- .../org/lance/NamespaceIntegrationTest.java | 344 ++++++++++++ .../namespace/DirectoryNamespaceTest.java | 246 ++++++++ python/python/tests/test_namespace_dir.py | 233 +++++++- .../tests/test_namespace_integration.py | 288 ++++++++++ rust/lance-core/src/datatypes.rs | 1 + rust/lance-namespace-impls/src/dir.rs | 15 + .../lance-namespace-impls/src/dir/manifest.rs | 523 ++++++++++++++++-- rust/lance-namespace/src/error.rs | 12 +- rust/lance/src/dataset/write/merge_insert.rs | 14 + 9 files changed, 1616 insertions(+), 60 deletions(-) diff --git a/java/src/test/java/org/lance/NamespaceIntegrationTest.java b/java/src/test/java/org/lance/NamespaceIntegrationTest.java index 2d6f8ab1443..975bf4d34ff 100644 --- a/java/src/test/java/org/lance/NamespaceIntegrationTest.java +++ b/java/src/test/java/org/lance/NamespaceIntegrationTest.java @@ -18,23 +18,32 @@ import org.lance.namespace.LanceNamespaceStorageOptionsProvider; import org.lance.namespace.model.CreateEmptyTableRequest; import org.lance.namespace.model.CreateEmptyTableResponse; +import org.lance.namespace.model.CreateTableRequest; +import org.lance.namespace.model.CreateTableResponse; import org.lance.namespace.model.DeclareTableRequest; import org.lance.namespace.model.DeclareTableResponse; import org.lance.namespace.model.DescribeTableRequest; import org.lance.namespace.model.DescribeTableResponse; +import org.lance.namespace.model.DropTableRequest; +import org.lance.namespace.model.DropTableResponse; +import org.lance.namespace.model.TableExistsRequest; import org.lance.operation.Append; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; @@ -47,6 +56,7 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.S3Object; +import java.io.ByteArrayOutputStream; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -55,9 +65,16 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Integration tests for Lance with S3 and credential refresh using StorageOptionsProvider. @@ -79,6 +96,21 @@ public class NamespaceIntegrationTest { private static final String BUCKET_NAME = "lance-namespace-integtest-java"; private static S3Client s3Client; + private BufferAllocator testAllocator; + private String testPrefix; + + @BeforeEach + void setUpTest() { + testAllocator = new RootAllocator(Long.MAX_VALUE); + testPrefix = "test-" + UUID.randomUUID().toString().substring(0, 8); + } + + @AfterEach + void tearDownTest() { + if (testAllocator != null) { + testAllocator.close(); + } + } @BeforeAll static void setup() { @@ -1438,4 +1470,316 @@ void testTransactionCommitWithNamespace() throws Exception { } } } + + private Map createDirectoryNamespaceS3Config() { + Map config = new HashMap<>(); + config.put("root", "s3://" + BUCKET_NAME + "/" + testPrefix); + config.put("storage.access_key_id", ACCESS_KEY); + config.put("storage.secret_access_key", SECRET_KEY); + config.put("storage.endpoint", ENDPOINT_URL); + config.put("storage.region", REGION); + config.put("storage.allow_http", "true"); + config.put("storage.virtual_hosted_style_request", "false"); + config.put("inline_optimization_enabled", "false"); + // Very high retry count to guarantee all concurrent operations succeed + config.put("commit_retries", "2147483647"); + return config; + } + + private byte[] createTestTableData() throws Exception { + Schema schema = + new Schema( + Arrays.asList( + new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("name", FieldType.nullable(new ArrowType.Utf8()), null), + new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null))); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, testAllocator)) { + IntVector idVector = (IntVector) root.getVector("id"); + VarCharVector nameVector = (VarCharVector) root.getVector("name"); + IntVector ageVector = (IntVector) root.getVector("age"); + + idVector.allocateNew(3); + nameVector.allocateNew(3); + ageVector.allocateNew(3); + + idVector.set(0, 1); + nameVector.set(0, "Alice".getBytes()); + ageVector.set(0, 30); + + idVector.set(1, 2); + nameVector.set(1, "Bob".getBytes()); + ageVector.set(1, 25); + + idVector.set(2, 3); + nameVector.set(2, "Charlie".getBytes()); + ageVector.set(2, 35); + + idVector.setValueCount(3); + nameVector.setValueCount(3); + ageVector.setValueCount(3); + root.setRowCount(3); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) { + writer.writeBatch(); + } + return out.toByteArray(); + } + } + + @Test + void testBasicCreateAndDropOnS3() throws Exception { + DirectoryNamespace namespace = new DirectoryNamespace(); + namespace.initialize(createDirectoryNamespaceS3Config(), testAllocator); + + try { + String tableName = "basic_test_table"; + List tableId = Arrays.asList("test_ns", tableName); + byte[] tableData = createTestTableData(); + + CreateTableRequest createReq = new CreateTableRequest().id(tableId); + CreateTableResponse createResp = namespace.createTable(createReq, tableData); + assertNotNull(createResp); + assertNotNull(createResp.getLocation()); + + DropTableRequest dropReq = new DropTableRequest().id(tableId); + DropTableResponse dropResp = namespace.dropTable(dropReq); + assertNotNull(dropResp); + + TableExistsRequest existsReq = new TableExistsRequest().id(tableId); + assertThrows(RuntimeException.class, () -> namespace.tableExists(existsReq)); + } finally { + namespace.close(); + } + } + + @Test + void testConcurrentCreateAndDropWithSingleInstanceOnS3() throws Exception { + DirectoryNamespace namespace = new DirectoryNamespace(); + namespace.initialize(createDirectoryNamespaceS3Config(), testAllocator); + + try { + int numTables = 10; + ExecutorService executor = Executors.newFixedThreadPool(numTables); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numTables); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failCount = new AtomicInteger(0); + + for (int i = 0; i < numTables; i++) { + final int tableIndex = i; + executor.submit( + () -> { + try { + startLatch.await(); + + String tableName = "s3_concurrent_table_" + tableIndex; + List tableId = Arrays.asList("test_ns", tableName); + byte[] tableData = createTestTableData(); + + CreateTableRequest createReq = new CreateTableRequest().id(tableId); + namespace.createTable(createReq, tableData); + + DropTableRequest dropReq = new DropTableRequest().id(tableId); + namespace.dropTable(dropReq); + + successCount.incrementAndGet(); + } catch (Exception e) { + failCount.incrementAndGet(); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + assertTrue(doneLatch.await(120, TimeUnit.SECONDS), "Timed out waiting for tasks to complete"); + + executor.shutdown(); + assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS)); + + assertEquals(numTables, successCount.get(), "All tasks should succeed"); + assertEquals(0, failCount.get(), "No tasks should fail"); + } finally { + namespace.close(); + } + } + + @Test + void testConcurrentCreateAndDropWithMultipleInstancesOnS3() throws Exception { + int numTables = 10; + ExecutorService executor = Executors.newFixedThreadPool(numTables); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numTables); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failCount = new AtomicInteger(0); + List namespaces = new ArrayList<>(); + + Map baseConfig = createDirectoryNamespaceS3Config(); + + for (int i = 0; i < numTables; i++) { + final int tableIndex = i; + executor.submit( + () -> { + DirectoryNamespace localNs = null; + try { + startLatch.await(); + + localNs = new DirectoryNamespace(); + localNs.initialize(new HashMap<>(baseConfig), testAllocator); + + synchronized (namespaces) { + namespaces.add(localNs); + } + + String tableName = "s3_multi_ns_table_" + tableIndex; + List tableId = Arrays.asList("test_ns", tableName); + byte[] tableData = createTestTableData(); + + CreateTableRequest createReq = new CreateTableRequest().id(tableId); + localNs.createTable(createReq, tableData); + + DropTableRequest dropReq = new DropTableRequest().id(tableId); + localNs.dropTable(dropReq); + + successCount.incrementAndGet(); + } catch (Exception e) { + failCount.incrementAndGet(); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + assertTrue(doneLatch.await(120, TimeUnit.SECONDS), "Timed out waiting for tasks to complete"); + + executor.shutdown(); + assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS)); + + for (DirectoryNamespace ns : namespaces) { + try { + ns.close(); + } catch (Exception e) { + // Ignore + } + } + + assertEquals(numTables, successCount.get(), "All tasks should succeed"); + assertEquals(0, failCount.get(), "No tasks should fail"); + } + + @Test + void testConcurrentCreateThenDropFromDifferentInstanceOnS3() throws Exception { + int numTables = 10; + Map baseConfig = createDirectoryNamespaceS3Config(); + + // First, create all tables using separate namespace instances + ExecutorService createExecutor = Executors.newFixedThreadPool(numTables); + CountDownLatch createStartLatch = new CountDownLatch(1); + CountDownLatch createDoneLatch = new CountDownLatch(numTables); + AtomicInteger createSuccessCount = new AtomicInteger(0); + List createNamespaces = new ArrayList<>(); + + for (int i = 0; i < numTables; i++) { + final int tableIndex = i; + createExecutor.submit( + () -> { + DirectoryNamespace localNs = null; + try { + createStartLatch.await(); + + localNs = new DirectoryNamespace(); + localNs.initialize(new HashMap<>(baseConfig), testAllocator); + + synchronized (createNamespaces) { + createNamespaces.add(localNs); + } + + String tableName = "s3_cross_instance_table_" + tableIndex; + List tableId = Arrays.asList("test_ns", tableName); + byte[] tableData = createTestTableData(); + + CreateTableRequest createReq = new CreateTableRequest().id(tableId); + localNs.createTable(createReq, tableData); + + createSuccessCount.incrementAndGet(); + } catch (Exception e) { + // Ignore + } finally { + createDoneLatch.countDown(); + } + }); + } + + createStartLatch.countDown(); + assertTrue(createDoneLatch.await(120, TimeUnit.SECONDS), "Timed out waiting for creates"); + createExecutor.shutdown(); + + assertEquals(numTables, createSuccessCount.get(), "All creates should succeed"); + + // Close create namespaces + for (DirectoryNamespace ns : createNamespaces) { + try { + ns.close(); + } catch (Exception e) { + // Ignore + } + } + + // Now drop all tables using NEW namespace instances + ExecutorService dropExecutor = Executors.newFixedThreadPool(numTables); + CountDownLatch dropStartLatch = new CountDownLatch(1); + CountDownLatch dropDoneLatch = new CountDownLatch(numTables); + AtomicInteger dropSuccessCount = new AtomicInteger(0); + AtomicInteger dropFailCount = new AtomicInteger(0); + List dropNamespaces = new ArrayList<>(); + + for (int i = 0; i < numTables; i++) { + final int tableIndex = i; + dropExecutor.submit( + () -> { + DirectoryNamespace localNs = null; + try { + dropStartLatch.await(); + + localNs = new DirectoryNamespace(); + localNs.initialize(new HashMap<>(baseConfig), testAllocator); + + synchronized (dropNamespaces) { + dropNamespaces.add(localNs); + } + + String tableName = "s3_cross_instance_table_" + tableIndex; + List tableId = Arrays.asList("test_ns", tableName); + + DropTableRequest dropReq = new DropTableRequest().id(tableId); + localNs.dropTable(dropReq); + + dropSuccessCount.incrementAndGet(); + } catch (Exception e) { + dropFailCount.incrementAndGet(); + } finally { + dropDoneLatch.countDown(); + } + }); + } + + dropStartLatch.countDown(); + assertTrue(dropDoneLatch.await(120, TimeUnit.SECONDS), "Timed out waiting for drops"); + dropExecutor.shutdown(); + + // Close drop namespaces + for (DirectoryNamespace ns : dropNamespaces) { + try { + ns.close(); + } catch (Exception e) { + // Ignore + } + } + + assertEquals(numTables, dropSuccessCount.get(), "All drops should succeed"); + assertEquals(0, dropFailCount.get(), "No drops should fail"); + } } diff --git a/java/src/test/java/org/lance/namespace/DirectoryNamespaceTest.java b/java/src/test/java/org/lance/namespace/DirectoryNamespaceTest.java index 5850f57453f..87885ff594f 100644 --- a/java/src/test/java/org/lance/namespace/DirectoryNamespaceTest.java +++ b/java/src/test/java/org/lance/namespace/DirectoryNamespaceTest.java @@ -38,9 +38,15 @@ import java.io.ByteArrayOutputStream; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.*; @@ -688,4 +694,244 @@ public VectorSchemaRoot getVectorSchemaRoot() { namespace.close(); } } + + @Test + void testConcurrentCreateAndDropWithSingleInstance() throws Exception { + int numTables = 10; + ExecutorService executor = Executors.newFixedThreadPool(numTables); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numTables); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failCount = new AtomicInteger(0); + + for (int i = 0; i < numTables; i++) { + final int tableIndex = i; + executor.submit( + () -> { + try { + startLatch.await(); + + String tableName = "concurrent_table_" + tableIndex; + byte[] tableData = createTestTableData(); + + CreateTableRequest createReq = new CreateTableRequest().id(Arrays.asList(tableName)); + namespace.createTable(createReq, tableData); + + DropTableRequest dropReq = new DropTableRequest().id(Arrays.asList(tableName)); + namespace.dropTable(dropReq); + + successCount.incrementAndGet(); + } catch (Exception e) { + failCount.incrementAndGet(); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + assertTrue(doneLatch.await(60, TimeUnit.SECONDS), "Timed out waiting for tasks to complete"); + + executor.shutdown(); + assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + + assertEquals(numTables, successCount.get(), "All tasks should succeed"); + assertEquals(0, failCount.get(), "No tasks should fail"); + + ListTablesRequest listReq = new ListTablesRequest().id(Arrays.asList()); + ListTablesResponse listResp = namespace.listTables(listReq); + assertEquals(0, listResp.getTables().size(), "All tables should be dropped"); + } + + @Test + void testConcurrentCreateAndDropWithMultipleInstances() throws Exception { + int numTables = 10; + ExecutorService executor = Executors.newFixedThreadPool(numTables); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numTables); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failCount = new AtomicInteger(0); + List namespaces = new ArrayList<>(); + + for (int i = 0; i < numTables; i++) { + executor.submit( + () -> { + DirectoryNamespace localNs = null; + try { + startLatch.await(); + + localNs = new DirectoryNamespace(); + Map config = new HashMap<>(); + config.put("root", tempDir.toString()); + config.put("inline_optimization_enabled", "false"); + localNs.initialize(config, allocator); + + synchronized (namespaces) { + namespaces.add(localNs); + } + + String tableName = "multi_ns_table_" + Thread.currentThread().getId(); + byte[] tableData = createTestTableData(); + + CreateTableRequest createReq = new CreateTableRequest().id(Arrays.asList(tableName)); + localNs.createTable(createReq, tableData); + + DropTableRequest dropReq = new DropTableRequest().id(Arrays.asList(tableName)); + localNs.dropTable(dropReq); + + successCount.incrementAndGet(); + } catch (Exception e) { + failCount.incrementAndGet(); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + assertTrue(doneLatch.await(60, TimeUnit.SECONDS), "Timed out waiting for tasks to complete"); + + executor.shutdown(); + assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + + // Close all namespace instances + for (DirectoryNamespace ns : namespaces) { + try { + ns.close(); + } catch (Exception e) { + // Ignore + } + } + + assertEquals(numTables, successCount.get(), "All tasks should succeed"); + assertEquals(0, failCount.get(), "No tasks should fail"); + + // Verify with a fresh namespace + DirectoryNamespace verifyNs = new DirectoryNamespace(); + Map config = new HashMap<>(); + config.put("root", tempDir.toString()); + verifyNs.initialize(config, allocator); + + ListTablesRequest listReq = new ListTablesRequest().id(Arrays.asList()); + ListTablesResponse listResp = verifyNs.listTables(listReq); + assertEquals(0, listResp.getTables().size(), "All tables should be dropped"); + + verifyNs.close(); + } + + @Test + void testConcurrentCreateThenDropFromDifferentInstance() throws Exception { + int numTables = 10; + + // First, create all tables using separate namespace instances + ExecutorService createExecutor = Executors.newFixedThreadPool(numTables); + CountDownLatch createStartLatch = new CountDownLatch(1); + CountDownLatch createDoneLatch = new CountDownLatch(numTables); + AtomicInteger createSuccessCount = new AtomicInteger(0); + List createNamespaces = new ArrayList<>(); + + for (int i = 0; i < numTables; i++) { + final int tableIndex = i; + createExecutor.submit( + () -> { + DirectoryNamespace localNs = null; + try { + createStartLatch.await(); + + localNs = new DirectoryNamespace(); + Map config = new HashMap<>(); + config.put("root", tempDir.toString()); + config.put("inline_optimization_enabled", "false"); + localNs.initialize(config, allocator); + + synchronized (createNamespaces) { + createNamespaces.add(localNs); + } + + String tableName = "cross_instance_table_" + tableIndex; + byte[] tableData = createTestTableData(); + + CreateTableRequest createReq = new CreateTableRequest().id(Arrays.asList(tableName)); + localNs.createTable(createReq, tableData); + + createSuccessCount.incrementAndGet(); + } catch (Exception e) { + // Ignore - test will fail on assertion + } finally { + createDoneLatch.countDown(); + } + }); + } + + createStartLatch.countDown(); + assertTrue(createDoneLatch.await(60, TimeUnit.SECONDS), "Timed out waiting for creates"); + createExecutor.shutdown(); + + assertEquals(numTables, createSuccessCount.get(), "All creates should succeed"); + + // Close create namespaces + for (DirectoryNamespace ns : createNamespaces) { + try { + ns.close(); + } catch (Exception e) { + // Ignore + } + } + + // Now drop all tables using NEW namespace instances + ExecutorService dropExecutor = Executors.newFixedThreadPool(numTables); + CountDownLatch dropStartLatch = new CountDownLatch(1); + CountDownLatch dropDoneLatch = new CountDownLatch(numTables); + AtomicInteger dropSuccessCount = new AtomicInteger(0); + AtomicInteger dropFailCount = new AtomicInteger(0); + List dropNamespaces = new ArrayList<>(); + + for (int i = 0; i < numTables; i++) { + final int tableIndex = i; + dropExecutor.submit( + () -> { + DirectoryNamespace localNs = null; + try { + dropStartLatch.await(); + + localNs = new DirectoryNamespace(); + Map config = new HashMap<>(); + config.put("root", tempDir.toString()); + config.put("inline_optimization_enabled", "false"); + localNs.initialize(config, allocator); + + synchronized (dropNamespaces) { + dropNamespaces.add(localNs); + } + + String tableName = "cross_instance_table_" + tableIndex; + + DropTableRequest dropReq = new DropTableRequest().id(Arrays.asList(tableName)); + localNs.dropTable(dropReq); + + dropSuccessCount.incrementAndGet(); + } catch (Exception e) { + dropFailCount.incrementAndGet(); + } finally { + dropDoneLatch.countDown(); + } + }); + } + + dropStartLatch.countDown(); + assertTrue(dropDoneLatch.await(60, TimeUnit.SECONDS), "Timed out waiting for drops"); + dropExecutor.shutdown(); + + // Close drop namespaces + for (DirectoryNamespace ns : dropNamespaces) { + try { + ns.close(); + } catch (Exception e) { + // Ignore + } + } + + assertEquals(numTables, dropSuccessCount.get(), "All drops should succeed"); + assertEquals(0, dropFailCount.get(), "No drops should fail"); + } } diff --git a/python/python/tests/test_namespace_dir.py b/python/python/tests/test_namespace_dir.py index abbb37b9865..d91721f3b1d 100644 --- a/python/python/tests/test_namespace_dir.py +++ b/python/python/tests/test_namespace_dir.py @@ -69,7 +69,10 @@ def temp_namespace(): """Create a temporary DirectoryNamespace for testing.""" with tempfile.TemporaryDirectory() as tmpdir: # Use lance.namespace.connect() for consistency - ns = connect("dir", {"root": f"file://{tmpdir}"}) + # Use high commit_retries for concurrent operation tests + ns = connect( + "dir", {"root": f"file://{tmpdir}", "commit_retries": "2147483647"} + ) yield ns @@ -894,3 +897,231 @@ def test_external_manifest_store_invokes_namespace_apis(): assert namespace.describe_table_version_count == describe_count_before_v1 + 1, ( "describe_table_version should be called once when opening version 1" ) + + +@pytest.mark.skipif( + sys.platform == "win32", + reason="Windows file locking prevents reliable concurrent filesystem operations", +) +class TestConcurrentOperations: + """Tests for concurrent table operations. + + These tests mirror the Rust and Java concurrent tests to ensure + the DirectoryNamespace handles concurrent create/drop operations correctly. + """ + + def test_concurrent_create_and_drop_single_instance(self, temp_namespace): + """Test concurrent create/drop with single namespace instance. + + Mirrors: + - Rust: test_concurrent_create_and_drop_single_instance + - Java: testConcurrentCreateAndDropWithSingleInstance + """ + import concurrent.futures + + num_tables = 10 + success_count = 0 + fail_count = 0 + lock = Lock() + + def create_and_drop_table(table_index): + nonlocal success_count, fail_count + try: + table_name = f"concurrent_table_{table_index}" + table_id = ["test_ns", table_name] + table_data = create_test_data() + ipc_data = table_to_ipc_bytes(table_data) + + # Create table + create_req = CreateTableRequest(id=table_id) + temp_namespace.create_table(create_req, ipc_data) + + # Drop table + drop_req = DropTableRequest(id=table_id) + temp_namespace.drop_table(drop_req) + + with lock: + success_count += 1 + except Exception as e: + with lock: + fail_count += 1 + raise e + + with concurrent.futures.ThreadPoolExecutor(max_workers=num_tables) as executor: + futures = [ + executor.submit(create_and_drop_table, i) for i in range(num_tables) + ] + concurrent.futures.wait(futures) + + assert success_count == num_tables, ( + f"Expected {num_tables} successes, got {success_count}" + ) + assert fail_count == 0, f"Expected 0 failures, got {fail_count}" + + # Verify all tables are dropped + list_req = ListTablesRequest(id=[]) + response = temp_namespace.list_tables(list_req) + assert len(response.tables) == 0, "All tables should be dropped" + + def test_concurrent_create_and_drop_multiple_instances(self): + """Test concurrent create/drop with multiple namespace instances. + + Mirrors: + - Rust: test_concurrent_create_and_drop_multiple_instances + - Java: testConcurrentCreateAndDropWithMultipleInstances + """ + import concurrent.futures + + with tempfile.TemporaryDirectory() as tmpdir: + num_tables = 10 + success_count = 0 + fail_count = 0 + lock = Lock() + + def create_and_drop_table(table_index): + nonlocal success_count, fail_count + try: + # Each thread creates its own namespace instance + # Use high commit_retries to handle version collisions + ns = connect( + "dir", + {"root": f"file://{tmpdir}", "commit_retries": "2147483647"}, + ) + + table_name = f"multi_ns_table_{table_index}" + table_id = ["test_ns", table_name] + table_data = create_test_data() + ipc_data = table_to_ipc_bytes(table_data) + + # Create table + create_req = CreateTableRequest(id=table_id) + ns.create_table(create_req, ipc_data) + + # Drop table + drop_req = DropTableRequest(id=table_id) + ns.drop_table(drop_req) + + with lock: + success_count += 1 + except Exception as e: + with lock: + fail_count += 1 + raise e + + with concurrent.futures.ThreadPoolExecutor( + max_workers=num_tables + ) as executor: + futures = [ + executor.submit(create_and_drop_table, i) for i in range(num_tables) + ] + concurrent.futures.wait(futures) + + assert success_count == num_tables, ( + f"Expected {num_tables} successes, got {success_count}" + ) + assert fail_count == 0, f"Expected 0 failures, got {fail_count}" + + # Verify with a fresh namespace instance + verify_ns = connect( + "dir", {"root": f"file://{tmpdir}", "commit_retries": "2147483647"} + ) + list_req = ListTablesRequest(id=[]) + response = verify_ns.list_tables(list_req) + assert len(response.tables) == 0, "All tables should be dropped" + + def test_concurrent_create_then_drop_from_different_instance(self): + """Test creating from one set of instances, dropping from different ones. + + Mirrors: + - Rust: test_concurrent_create_then_drop_from_different_instance + - Java: testConcurrentCreateThenDropFromDifferentInstance + """ + import concurrent.futures + + with tempfile.TemporaryDirectory() as tmpdir: + num_tables = 10 + + # Phase 1: Create all tables concurrently using separate namespace instances + create_success_count = 0 + create_fail_count = 0 + create_lock = Lock() + + def create_table(table_index): + nonlocal create_success_count, create_fail_count + try: + # Use high commit_retries to handle version collisions + ns = connect( + "dir", + {"root": f"file://{tmpdir}", "commit_retries": "2147483647"}, + ) + + table_name = f"cross_instance_table_{table_index}" + table_id = ["test_ns", table_name] + table_data = create_test_data() + ipc_data = table_to_ipc_bytes(table_data) + + create_req = CreateTableRequest(id=table_id) + ns.create_table(create_req, ipc_data) + + with create_lock: + create_success_count += 1 + except Exception as e: + with create_lock: + create_fail_count += 1 + raise e + + with concurrent.futures.ThreadPoolExecutor( + max_workers=num_tables + ) as executor: + futures = [executor.submit(create_table, i) for i in range(num_tables)] + concurrent.futures.wait(futures) + + assert create_success_count == num_tables, ( + f"All creates should succeed, got {create_success_count}" + ) + + # Phase 2: Drop all tables concurrently using NEW namespace instances + drop_success_count = 0 + drop_fail_count = 0 + drop_lock = Lock() + + def drop_table(table_index): + nonlocal drop_success_count, drop_fail_count + try: + # Use high commit_retries to handle version collisions + ns = connect( + "dir", + {"root": f"file://{tmpdir}", "commit_retries": "2147483647"}, + ) + + table_name = f"cross_instance_table_{table_index}" + table_id = ["test_ns", table_name] + + drop_req = DropTableRequest(id=table_id) + ns.drop_table(drop_req) + + with drop_lock: + drop_success_count += 1 + except Exception as e: + with drop_lock: + drop_fail_count += 1 + raise e + + with concurrent.futures.ThreadPoolExecutor( + max_workers=num_tables + ) as executor: + futures = [executor.submit(drop_table, i) for i in range(num_tables)] + concurrent.futures.wait(futures) + + assert drop_success_count == num_tables, ( + f"All drops should succeed, got {drop_success_count}" + ) + assert drop_fail_count == 0, f"No drops should fail, got {drop_fail_count}" + + # Verify all tables are dropped + verify_ns = connect( + "dir", {"root": f"file://{tmpdir}", "commit_retries": "2147483647"} + ) + list_req = ListTablesRequest(id=[]) + response = verify_ns.list_tables(list_req) + assert len(response.tables) == 0, "All tables should be dropped" diff --git a/python/python/tests/test_namespace_integration.py b/python/python/tests/test_namespace_integration.py index 30489496e38..f58b358ba4f 100644 --- a/python/python/tests/test_namespace_integration.py +++ b/python/python/tests/test_namespace_integration.py @@ -838,3 +838,291 @@ def test_file_session_with_storage_options_provider(s3_bucket: str): final_describe_count = namespace.get_describe_call_count() assert final_describe_count == describe_count_after_second_write + + +def create_test_table_data(): + """Create test PyArrow table data for concurrent tests.""" + return pa.Table.from_pylist( + [ + {"id": 1, "name": "Alice", "age": 30}, + {"id": 2, "name": "Bob", "age": 25}, + {"id": 3, "name": "Charlie", "age": 35}, + ] + ) + + +def table_to_ipc_bytes(table): + """Convert PyArrow table to IPC bytes.""" + import io + + sink = io.BytesIO() + with pa.ipc.RecordBatchStreamWriter(sink, table.schema) as writer: + writer.write_table(table) + return sink.getvalue() + + +@pytest.mark.integration +def test_basic_create_and_drop_on_s3(s3_bucket: str): + """Test basic create and drop table operations on S3. + + Mirrors Java: testBasicCreateAndDropOnS3 + """ + from lance.namespace import DirectoryNamespace + from lance_namespace import ( + DropTableRequest, + TableExistsRequest, + ) + + test_prefix = f"test-{uuid.uuid4().hex[:8]}" + storage_options = copy.deepcopy(CONFIG) + dir_props = {f"storage.{k}": v for k, v in storage_options.items()} + dir_props["root"] = f"s3://{s3_bucket}/{test_prefix}" + namespace = DirectoryNamespace(**dir_props) + + table_name = "basic_test_table" + table_data = create_test_table_data() + table_id = ["test_ns", table_name] + + # Create table using lance.write_dataset (same as other passing tests) + ds = lance.write_dataset( + table_data, namespace=namespace, table_id=table_id, mode="create" + ) + assert ds is not None + assert ds.count_rows() == 3 + + # Drop table + drop_req = DropTableRequest(id=table_id) + drop_resp = namespace.drop_table(drop_req) + assert drop_resp is not None + + # Verify table no longer exists + exists_req = TableExistsRequest(id=table_id) + with pytest.raises(Exception): + namespace.table_exists(exists_req) + + +@pytest.mark.integration +def test_concurrent_create_and_drop_single_instance_on_s3(s3_bucket: str): + """Test concurrent create/drop with single namespace instance on S3. + + Mirrors: + - Rust: test_concurrent_create_and_drop_single_instance + - Java: testConcurrentCreateAndDropWithSingleInstanceOnS3 + """ + import concurrent.futures + + from lance.namespace import DirectoryNamespace + from lance_namespace import CreateTableRequest, DropTableRequest + + test_prefix = f"test-{uuid.uuid4().hex[:8]}" + storage_options = copy.deepcopy(CONFIG) + dir_props = {f"storage.{k}": v for k, v in storage_options.items()} + dir_props["root"] = f"s3://{s3_bucket}/{test_prefix}" + # Very high retry count to guarantee all operations succeed + dir_props["commit_retries"] = "2147483647" + namespace = DirectoryNamespace(**dir_props) + + num_tables = 10 + success_count = 0 + fail_count = 0 + lock = Lock() + + def create_and_drop_table(table_index): + nonlocal success_count, fail_count + try: + table_name = f"s3_concurrent_table_{table_index}" + table_data = create_test_table_data() + table_id = ["test_ns", table_name] + ipc_data = table_to_ipc_bytes(table_data) + + # Create table using atomic create_table API + create_req = CreateTableRequest(id=table_id) + namespace.create_table(create_req, ipc_data) + + # Drop table + drop_req = DropTableRequest(id=table_id) + namespace.drop_table(drop_req) + + with lock: + success_count += 1 + except Exception: + with lock: + fail_count += 1 + raise + + with concurrent.futures.ThreadPoolExecutor(max_workers=num_tables) as executor: + futures = [executor.submit(create_and_drop_table, i) for i in range(num_tables)] + concurrent.futures.wait(futures) + + # All operations must succeed with very high retry count + assert success_count == num_tables, ( + f"Expected {num_tables} successes, got {success_count}" + ) + assert fail_count == 0, f"Expected 0 failures, got {fail_count}" + + +@pytest.mark.integration +def test_concurrent_create_and_drop_multiple_instances_on_s3(s3_bucket: str): + """Test concurrent create/drop with multiple namespace instances on S3. + + Mirrors: + - Rust: test_concurrent_create_and_drop_multiple_instances + - Java: testConcurrentCreateAndDropWithMultipleInstancesOnS3 + """ + import concurrent.futures + + from lance.namespace import DirectoryNamespace + from lance_namespace import CreateTableRequest, DropTableRequest, ListTablesRequest + + test_prefix = f"test-{uuid.uuid4().hex[:8]}" + storage_options = copy.deepcopy(CONFIG) + base_dir_props = {f"storage.{k}": v for k, v in storage_options.items()} + base_dir_props["root"] = f"s3://{s3_bucket}/{test_prefix}" + # Very high retry count to guarantee all operations succeed + base_dir_props["commit_retries"] = "2147483647" + + num_tables = 10 + success_count = 0 + fail_count = 0 + lock = Lock() + + def create_and_drop_table(table_index): + nonlocal success_count, fail_count + try: + # Each thread creates its own namespace instance + ns = DirectoryNamespace(**base_dir_props.copy()) + + table_name = f"s3_multi_ns_table_{table_index}" + table_data = create_test_table_data() + table_id = ["test_ns", table_name] + ipc_data = table_to_ipc_bytes(table_data) + + # Create table using atomic create_table API + create_req = CreateTableRequest(id=table_id) + ns.create_table(create_req, ipc_data) + + # Drop table + drop_req = DropTableRequest(id=table_id) + ns.drop_table(drop_req) + + with lock: + success_count += 1 + except Exception: + with lock: + fail_count += 1 + raise + + with concurrent.futures.ThreadPoolExecutor(max_workers=num_tables) as executor: + futures = [executor.submit(create_and_drop_table, i) for i in range(num_tables)] + concurrent.futures.wait(futures) + + # All operations must succeed with very high retry count + assert success_count == num_tables, ( + f"Expected {num_tables} successes, got {success_count}" + ) + assert fail_count == 0, f"Expected 0 failures, got {fail_count}" + + # Verify remaining state is consistent (no corruption) + verify_ns = DirectoryNamespace(**base_dir_props) + list_req = ListTablesRequest(id=["test_ns"]) + _ = verify_ns.list_tables(list_req) # Should not error + + +@pytest.mark.integration +def test_concurrent_create_then_drop_from_different_instance_on_s3(s3_bucket: str): + """Test creating from one set of instances, dropping from different ones on S3. + + Mirrors: + - Rust: test_concurrent_create_then_drop_from_different_instance + - Java: testConcurrentCreateThenDropFromDifferentInstanceOnS3 + """ + import concurrent.futures + + from lance.namespace import DirectoryNamespace + from lance_namespace import CreateTableRequest, DropTableRequest, ListTablesRequest + + test_prefix = f"test-{uuid.uuid4().hex[:8]}" + storage_options = copy.deepcopy(CONFIG) + base_dir_props = {f"storage.{k}": v for k, v in storage_options.items()} + base_dir_props["root"] = f"s3://{s3_bucket}/{test_prefix}" + # Very high retry count to guarantee all operations succeed + base_dir_props["commit_retries"] = "2147483647" + + num_tables = 10 + + # Phase 1: Create all tables concurrently using separate namespace instances + create_success_count = 0 + create_fail_count = 0 + create_lock = Lock() + + def create_table(table_index): + nonlocal create_success_count, create_fail_count + table_name = f"s3_cross_instance_table_{table_index}" + try: + ns = DirectoryNamespace(**base_dir_props.copy()) + + table_data = create_test_table_data() + table_id = ["test_ns", table_name] + ipc_data = table_to_ipc_bytes(table_data) + + # Create table using atomic create_table API + create_req = CreateTableRequest(id=table_id) + ns.create_table(create_req, ipc_data) + + with create_lock: + create_success_count += 1 + except Exception: + with create_lock: + create_fail_count += 1 + raise + + with concurrent.futures.ThreadPoolExecutor(max_workers=num_tables) as executor: + futures = [executor.submit(create_table, i) for i in range(num_tables)] + concurrent.futures.wait(futures) + + # All creates must succeed with very high retry count + assert create_success_count == num_tables, ( + f"Expected {num_tables} create successes, got {create_success_count}" + ) + assert create_fail_count == 0, ( + f"Expected 0 create failures, got {create_fail_count}" + ) + + # Phase 2: Drop all tables using NEW namespace instances + drop_success_count = 0 + drop_fail_count = 0 + drop_lock = Lock() + + def drop_table(table_index): + nonlocal drop_success_count, drop_fail_count + try: + ns = DirectoryNamespace(**base_dir_props.copy()) + + table_name = f"s3_cross_instance_table_{table_index}" + table_id = ["test_ns", table_name] + + drop_req = DropTableRequest(id=table_id) + ns.drop_table(drop_req) + + with drop_lock: + drop_success_count += 1 + except Exception: + with drop_lock: + drop_fail_count += 1 + raise + + # Drop all tables + with concurrent.futures.ThreadPoolExecutor(max_workers=num_tables) as executor: + futures = [executor.submit(drop_table, i) for i in range(num_tables)] + concurrent.futures.wait(futures) + + # All drops must succeed with very high retry count + assert drop_success_count == num_tables, ( + f"Expected {num_tables} drop successes, got {drop_success_count}" + ) + assert drop_fail_count == 0, f"Expected 0 drop failures, got {drop_fail_count}" + + # Verify remaining state is consistent (no corruption) + verify_ns = DirectoryNamespace(**base_dir_props) + list_req = ListTablesRequest(id=["test_ns"]) + _ = verify_ns.list_tables(list_req) # Should not error diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index 13cc5e33801..e9f8ede4096 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -20,6 +20,7 @@ mod schema; use crate::{Error, Result}; pub use field::{ BlobVersion, Encoding, Field, NullabilityComparison, OnTypeMismatch, SchemaCompareOptions, + LANCE_UNENFORCED_PRIMARY_KEY_POSITION, }; pub use schema::{ escape_field_path_for_project, format_field_path, parse_field_path, BlobHandling, FieldRef, diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 3b8a398664d..3f983d12fbf 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -103,6 +103,7 @@ pub struct DirectoryNamespaceBuilder { table_version_tracking_enabled: bool, credential_vendor_properties: HashMap, context_provider: Option>, + commit_retries: Option, } impl std::fmt::Debug for DirectoryNamespaceBuilder { @@ -145,6 +146,7 @@ impl DirectoryNamespaceBuilder { table_version_tracking_enabled: false, // Default to disabled credential_vendor_properties: HashMap::new(), context_provider: None, + commit_retries: None, } } @@ -315,6 +317,10 @@ impl DirectoryNamespaceBuilder { }) .collect(); + let commit_retries = properties + .get("commit_retries") + .and_then(|v| v.parse::().ok()); + Ok(Self { root: root.trim_end_matches('/').to_string(), storage_options, @@ -325,6 +331,7 @@ impl DirectoryNamespaceBuilder { table_version_tracking_enabled, credential_vendor_properties, context_provider: None, + commit_retries, }) } @@ -367,6 +374,13 @@ impl DirectoryNamespaceBuilder { self } + /// Set the number of retries for commit operations on the manifest table. + /// If not set, uses the lance default. + pub fn commit_retries(mut self, retries: u32) -> Self { + self.commit_retries = Some(retries); + self + } + /// Add a credential vendor property. /// /// Use short property names without the `credential_vendor.` prefix. @@ -455,6 +469,7 @@ impl DirectoryNamespaceBuilder { base_path.clone(), self.dir_listing_enabled, self.inline_optimization_enabled, + self.commit_retries, ) .await { diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index af0e4d9bb4b..b4d940756b3 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -13,15 +13,17 @@ use async_trait::async_trait; use bytes::Bytes; use futures::{stream::StreamExt, FutureExt}; use lance::dataset::optimize::{compact_files, CompactionOptions}; -use lance::dataset::{builder::DatasetBuilder, WriteParams}; +use lance::dataset::{builder::DatasetBuilder, ReadParams, WriteParams}; use lance::session::Session; use lance::{dataset::scanner::Scanner, Dataset}; +use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION; use lance_core::{box_error, Error, Result}; use lance_index::optimize::OptimizeOptions; use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; use lance_index::traits::DatasetIndexExt; use lance_index::IndexType; use lance_io::object_store::{ObjectStore, ObjectStoreParams}; +use lance_namespace::error::NamespaceError; use lance_namespace::models::{ CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest, CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeclareTableRequest, @@ -238,7 +240,6 @@ impl DerefMut for DatasetWriteGuard<'_> { /// Manifest-based namespace implementation /// /// Uses a special `__manifest` Lance table to track tables and nested namespaces. -#[derive(Debug)] pub struct ManifestNamespace { root: String, storage_options: Option>, @@ -256,10 +257,28 @@ pub struct ManifestNamespace { /// Whether to perform inline optimization (compaction and indexing) on the __manifest table /// after every write. Defaults to true. inline_optimization_enabled: bool, + /// Number of retries for commit operations on the manifest table. + /// If None, uses the lance default. + commit_retries: Option, +} + +impl std::fmt::Debug for ManifestNamespace { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ManifestNamespace") + .field("root", &self.root) + .field("storage_options", &self.storage_options) + .field("dir_listing_enabled", &self.dir_listing_enabled) + .field( + "inline_optimization_enabled", + &self.inline_optimization_enabled, + ) + .finish() + } } impl ManifestNamespace { /// Create a new ManifestNamespace from an existing DirectoryNamespace + #[allow(clippy::too_many_arguments)] pub async fn from_directory( root: String, storage_options: Option>, @@ -268,9 +287,11 @@ impl ManifestNamespace { base_path: Path, dir_listing_enabled: bool, inline_optimization_enabled: bool, + commit_retries: Option, ) -> Result { let manifest_dataset = - Self::create_or_get_manifest(&root, &storage_options, session.clone()).await?; + Self::ensure_manifest_table_up_to_date(&root, &storage_options, session.clone()) + .await?; Ok(Self { root, @@ -281,6 +302,7 @@ impl ManifestNamespace { manifest_dataset, dir_listing_enabled, inline_optimization_enabled, + commit_retries, }) } @@ -519,7 +541,15 @@ impl ManifestNamespace { /// Get the manifest schema fn manifest_schema() -> Arc { Arc::new(ArrowSchema::new(vec![ - Field::new("object_id", DataType::Utf8, false), + // Set unenforced primary key on object_id for bloom filter conflict detection + Field::new("object_id", DataType::Utf8, false).with_metadata( + [( + LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(), + "0".to_string(), + )] + .into_iter() + .collect(), + ), Field::new("object_type", DataType::Utf8, false), Field::new("location", DataType::Utf8, true), Field::new("metadata", DataType::Utf8, true), @@ -780,6 +810,12 @@ impl ManifestNamespace { merge_builder.when_matched(lance::dataset::WhenMatched::Fail); merge_builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll); + // conflict_retries=0: no outer loop retry on semantic conflicts (handled by caller) + // commit_retries: inner retry for manifest version conflicts (uses lance default if not set) + merge_builder.conflict_retries(0); + if let Some(retries) = self.commit_retries { + merge_builder.commit_retries(retries); + } let (new_dataset_arc, _merge_stats) = merge_builder .try_build() @@ -793,16 +829,41 @@ impl ManifestNamespace { .execute_reader(Box::new(reader)) .await .map_err(|e| { - // Check if this is a "matched row" error from WhenMatched::Fail + // CommitConflict: version collision retries exhausted -> Throttled (safe to retry) + // TooMuchWriteContention: semantic conflict -> ConcurrentModification (don't retry) + // matched/duplicate: WhenMatched::Fail triggered -> ConcurrentModification let error_msg = e.to_string(); - if error_msg.contains("matched") + if error_msg.contains("CommitConflict") + || error_msg.contains("Failed to commit the transaction after") + { + NamespaceError::Throttled { + message: format!( + "Too many concurrent writes, please retry later: {}", + error_msg + ), + } + .into() + } else if error_msg.contains("TooMuchWriteContention") + || error_msg.contains("Too many concurrent writers") + { + NamespaceError::ConcurrentModification { + message: format!( + "Object '{}' was concurrently modified by another operation", + object_id + ), + } + .into() + } else if error_msg.contains("matched") || error_msg.contains("duplicate") || error_msg.contains("already exists") { - Error::io( - format!("Object with id '{}' already exists in manifest", object_id), - location!(), - ) + NamespaceError::ConcurrentModification { + message: format!( + "Object '{}' was concurrently created by another operation", + object_id + ), + } + .into() } else { Error::IO { source: box_error(std::io::Error::other(format!( @@ -830,19 +891,43 @@ impl ManifestNamespace { /// Delete an entry from the manifest table pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> { - { - let predicate = format!("object_id = '{}'", object_id); - let mut dataset_guard = self.manifest_dataset.get_mut().await?; - dataset_guard - .delete(&predicate) - .await - .map_err(|e| Error::IO { - source: box_error(std::io::Error::other(format!("Failed to delete: {}", e))), - location: location!(), - })?; - } // Drop the guard here + let predicate = format!("object_id = '{}'", object_id); - self.manifest_dataset.reload().await?; + // Get dataset and use DeleteBuilder with configured retries + let dataset_guard = self.manifest_dataset.get().await?; + let dataset = Arc::new(dataset_guard.clone()); + drop(dataset_guard); // Drop read guard before delete + + let new_dataset = lance::dataset::DeleteBuilder::new(dataset, &predicate) + .execute() + .await + .map_err(|e| { + let error_msg = e.to_string(); + if error_msg.contains("CommitConflict") + || error_msg.contains("TooMuchWriteContention") + { + NamespaceError::Throttled { + message: format!( + "Too many concurrent writes during delete, please retry later: {}", + error_msg + ), + } + .into() + } else { + Error::IO { + source: box_error(std::io::Error::other(format!( + "Failed to delete: {}", + e + ))), + location: location!(), + } + } + })?; + + // Update the wrapper with the new dataset + self.manifest_dataset + .set_latest(Arc::try_unwrap(new_dataset).unwrap_or_else(|arc| (*arc).clone())) + .await; // Run inline optimization after delete if let Err(e) = self.run_inline_optimization().await { @@ -952,26 +1037,71 @@ impl ManifestNamespace { Ok(found_result) } - /// Create or get the manifest dataset - async fn create_or_get_manifest( + /// Create or load the manifest dataset, ensuring it has the latest schema setup. + /// + /// This function will: + /// 1. Try to load an existing manifest table + /// 2. If it exists, check and migrate the schema if needed (e.g., add primary key metadata) + /// 3. If it doesn't exist, create a new manifest table with the current schema + async fn ensure_manifest_table_up_to_date( root: &str, storage_options: &Option>, session: Option>, ) -> Result { let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME); log::debug!("Attempting to load manifest from {}", manifest_path); - let mut builder = DatasetBuilder::from_uri(&manifest_path); - - if let Some(sess) = session.clone() { - builder = builder.with_session(sess); - } - - if let Some(opts) = storage_options { - builder = builder.with_storage_options(opts.clone()); - } + let store_options = ObjectStoreParams { + storage_options_accessor: storage_options.as_ref().map(|opts| { + Arc::new( + lance_io::object_store::StorageOptionsAccessor::with_static_options( + opts.clone(), + ), + ) + }), + ..Default::default() + }; + let read_params = ReadParams { + session: session.clone(), + store_options: Some(store_options.clone()), + ..Default::default() + }; + let dataset_result = DatasetBuilder::from_uri(&manifest_path) + .with_read_params(read_params) + .load() + .await; + if let Ok(mut dataset) = dataset_result { + // Check if the object_id field has primary key metadata, migrate if not + let needs_pk_migration = dataset + .schema() + .field("object_id") + .map(|f| { + !f.metadata + .contains_key(LANCE_UNENFORCED_PRIMARY_KEY_POSITION) + }) + .unwrap_or(false); + + if needs_pk_migration { + log::info!("Migrating __manifest table to add primary key metadata on object_id"); + dataset + .update_field_metadata() + .update("object_id", [(LANCE_UNENFORCED_PRIMARY_KEY_POSITION, "0")]) + .map_err(|e| Error::IO { + source: box_error(std::io::Error::other(format!( + "Failed to find object_id field for migration: {}", + e + ))), + location: location!(), + })? + .await + .map_err(|e| Error::IO { + source: box_error(std::io::Error::other(format!( + "Failed to migrate primary key metadata: {}", + e + ))), + location: location!(), + })?; + } - let dataset_result = builder.load().await; - if let Ok(dataset) = dataset_result { Ok(DatasetConsistencyWrapper::new(dataset)) } else { log::info!("Creating new manifest table at {}", manifest_path); @@ -979,36 +1109,87 @@ impl ManifestNamespace { let empty_batch = RecordBatch::new_empty(schema.clone()); let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone()); - let write_params = WriteParams { - session, - store_params: storage_options.as_ref().map(|opts| ObjectStoreParams { - storage_options_accessor: Some(Arc::new( + let store_params = ObjectStoreParams { + storage_options_accessor: storage_options.as_ref().map(|opts| { + Arc::new( lance_io::object_store::StorageOptionsAccessor::with_static_options( opts.clone(), ), - )), - ..Default::default() + ) }), ..Default::default() }; + let write_params = WriteParams { + session: session.clone(), + store_params: Some(store_params), + ..Default::default() + }; - let dataset = Dataset::write(Box::new(reader), &manifest_path, Some(write_params)) - .await - .map_err(|e| Error::IO { - source: box_error(std::io::Error::other(format!( - "Failed to create manifest dataset: {}", - e - ))), - location: location!(), - })?; + let dataset = + Dataset::write(Box::new(reader), &manifest_path, Some(write_params)).await; - log::info!( - "Successfully created manifest table at {}, version={}, uri={}", - manifest_path, - dataset.version().version, - dataset.uri() - ); - Ok(DatasetConsistencyWrapper::new(dataset)) + // Handle race condition where another process created the manifest concurrently + match dataset { + Ok(dataset) => { + log::info!( + "Successfully created manifest table at {}, version={}, uri={}", + manifest_path, + dataset.version().version, + dataset.uri() + ); + Ok(DatasetConsistencyWrapper::new(dataset)) + } + Err(e) => { + let error_msg = e.to_string(); + // Check if the error indicates the manifest already exists + if error_msg.contains("already exists") + || error_msg.contains("TableAlreadyExists") + || error_msg.contains("CommitConflict") + || error_msg.contains("Commit conflict") + { + log::info!( + "Manifest table was created by another process, loading it: {}", + manifest_path + ); + // Try to load the manifest that was created by another process + let recovery_store_options = ObjectStoreParams { + storage_options_accessor: storage_options.as_ref().map(|opts| { + Arc::new( + lance_io::object_store::StorageOptionsAccessor::with_static_options( + opts.clone(), + ), + ) + }), + ..Default::default() + }; + let recovery_read_params = ReadParams { + session, + store_options: Some(recovery_store_options), + ..Default::default() + }; + let dataset = DatasetBuilder::from_uri(&manifest_path) + .with_read_params(recovery_read_params) + .load() + .await + .map_err(|e| Error::IO { + source: box_error(std::io::Error::other(format!( + "Failed to load manifest dataset after creation conflict: {}", + e + ))), + location: location!(), + })?; + Ok(DatasetConsistencyWrapper::new(dataset)) + } else { + Err(Error::IO { + source: box_error(std::io::Error::other(format!( + "Failed to create manifest dataset: {}", + e + ))), + location: location!(), + }) + } + } + } } } } @@ -1258,7 +1439,21 @@ impl LanceNamespace for ManifestNamespace { batches.into_iter().map(Ok).collect(); let reader = RecordBatchIterator::new(batch_results, schema); - let write_params = WriteParams::default(); + let store_params = ObjectStoreParams { + storage_options_accessor: self.storage_options.as_ref().map(|opts| { + Arc::new( + lance_io::object_store::StorageOptionsAccessor::with_static_options( + opts.clone(), + ), + ) + }), + ..Default::default() + }; + let write_params = WriteParams { + session: self.session.clone(), + store_params: Some(store_params), + ..Default::default() + }; let _dataset = Dataset::write(Box::new(reader), &table_uri, Some(write_params)) .await .map_err(|e| Error::IO { @@ -2571,6 +2766,218 @@ mod tests { ); } + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_concurrent_create_and_drop_single_instance(#[case] inline_optimization: bool) { + use futures::future::join_all; + use std::sync::Arc; + + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + + let dir_namespace = Arc::new( + DirectoryNamespaceBuilder::new(temp_path) + .inline_optimization_enabled(inline_optimization) + .build() + .await + .unwrap(), + ); + + let num_tables = 10; + let mut handles = Vec::new(); + + for i in 0..num_tables { + let ns = dir_namespace.clone(); + let handle = async move { + let table_name = format!("concurrent_table_{}", i); + let table_id = vec!["test_ns".to_string(), table_name.clone()]; + let buffer = create_test_ipc_data(); + + // Create table + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(table_id.clone()); + ns.create_table(create_request, Bytes::from(buffer)) + .await + .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e)); + + // Drop table + let mut drop_request = DropTableRequest::new(); + drop_request.id = Some(table_id); + ns.drop_table(drop_request) + .await + .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e)); + + Ok::<_, lance_core::Error>(()) + }; + handles.push(handle); + } + + let results = join_all(handles).await; + for result in results { + assert!(result.is_ok(), "All concurrent operations should succeed"); + } + + // Verify all tables are dropped + let mut request = ListTablesRequest::new(); + request.id = Some(vec![]); + let response = dir_namespace.list_tables(request).await.unwrap(); + assert_eq!(response.tables.len(), 0, "All tables should be dropped"); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_concurrent_create_and_drop_multiple_instances(#[case] inline_optimization: bool) { + use futures::future::join_all; + + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap().to_string(); + + let num_tables = 10; + let mut handles = Vec::new(); + + for i in 0..num_tables { + let path = temp_path.clone(); + let handle = async move { + // Each task creates its own namespace instance + let ns = DirectoryNamespaceBuilder::new(&path) + .inline_optimization_enabled(inline_optimization) + .build() + .await + .unwrap(); + + let table_name = format!("multi_ns_table_{}", i); + let table_id = vec!["test_ns".to_string(), table_name.clone()]; + let buffer = create_test_ipc_data(); + + // Create table + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(table_id.clone()); + ns.create_table(create_request, Bytes::from(buffer)) + .await + .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e)); + + // Drop table + let mut drop_request = DropTableRequest::new(); + drop_request.id = Some(table_id); + ns.drop_table(drop_request) + .await + .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e)); + + Ok::<_, lance_core::Error>(()) + }; + handles.push(handle); + } + + let results = join_all(handles).await; + for result in results { + assert!(result.is_ok(), "All concurrent operations should succeed"); + } + + // Verify with a fresh namespace instance + let verify_ns = DirectoryNamespaceBuilder::new(&temp_path) + .inline_optimization_enabled(inline_optimization) + .build() + .await + .unwrap(); + + let mut request = ListTablesRequest::new(); + request.id = Some(vec![]); + let response = verify_ns.list_tables(request).await.unwrap(); + assert_eq!(response.tables.len(), 0, "All tables should be dropped"); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_concurrent_create_then_drop_from_different_instance( + #[case] inline_optimization: bool, + ) { + use futures::future::join_all; + + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap().to_string(); + + let num_tables = 10; + + // Phase 1: Create all tables concurrently using separate namespace instances + let mut create_handles = Vec::new(); + for i in 0..num_tables { + let path = temp_path.clone(); + let handle = async move { + let ns = DirectoryNamespaceBuilder::new(&path) + .inline_optimization_enabled(inline_optimization) + .build() + .await + .unwrap(); + + let table_name = format!("cross_instance_table_{}", i); + let table_id = vec!["test_ns".to_string(), table_name.clone()]; + let buffer = create_test_ipc_data(); + + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(table_id); + ns.create_table(create_request, Bytes::from(buffer)) + .await + .unwrap_or_else(|e| panic!("Failed to create table {}: {}", table_name, e)); + + Ok::<_, lance_core::Error>(()) + }; + create_handles.push(handle); + } + + let create_results = join_all(create_handles).await; + for result in create_results { + assert!(result.is_ok(), "All create operations should succeed"); + } + + // Phase 2: Drop all tables concurrently using NEW namespace instances + let mut drop_handles = Vec::new(); + for i in 0..num_tables { + let path = temp_path.clone(); + let handle = async move { + let ns = DirectoryNamespaceBuilder::new(&path) + .inline_optimization_enabled(inline_optimization) + .build() + .await + .unwrap(); + + let table_name = format!("cross_instance_table_{}", i); + let table_id = vec!["test_ns".to_string(), table_name.clone()]; + + let mut drop_request = DropTableRequest::new(); + drop_request.id = Some(table_id); + ns.drop_table(drop_request) + .await + .unwrap_or_else(|e| panic!("Failed to drop table {}: {}", table_name, e)); + + Ok::<_, lance_core::Error>(()) + }; + drop_handles.push(handle); + } + + let drop_results = join_all(drop_handles).await; + for result in drop_results { + assert!(result.is_ok(), "All drop operations should succeed"); + } + + // Verify all tables are dropped + let verify_ns = DirectoryNamespaceBuilder::new(&temp_path) + .inline_optimization_enabled(inline_optimization) + .build() + .await + .unwrap(); + + let mut request = ListTablesRequest::new(); + request.id = Some(vec![]); + let response = verify_ns.list_tables(request).await.unwrap(); + assert_eq!(response.tables.len(), 0, "All tables should be dropped"); + } + #[test] fn test_construct_full_uri_with_cloud_urls() { // Test S3-style URL with nested path (no trailing slash) diff --git a/rust/lance-namespace/src/error.rs b/rust/lance-namespace/src/error.rs index 71fb7c12c31..7ead678c850 100644 --- a/rust/lance-namespace/src/error.rs +++ b/rust/lance-namespace/src/error.rs @@ -77,6 +77,8 @@ pub enum ErrorCode { InvalidTableState = 19, /// Table schema validation failed TableSchemaValidationError = 20, + /// Request was throttled due to rate limiting or too many concurrent operations + Throttled = 21, } impl ErrorCode { @@ -111,6 +113,7 @@ impl ErrorCode { 18 => Some(Self::Internal), 19 => Some(Self::InvalidTableState), 20 => Some(Self::TableSchemaValidationError), + 21 => Some(Self::Throttled), _ => None, } } @@ -140,6 +143,7 @@ impl std::fmt::Display for ErrorCode { Self::Internal => "Internal", Self::InvalidTableState => "InvalidTableState", Self::TableSchemaValidationError => "TableSchemaValidationError", + Self::Throttled => "Throttled", }; write!(f, "{}", name) } @@ -253,6 +257,10 @@ pub enum NamespaceError { /// Table schema validation failed. #[snafu(display("Table schema validation error: {message}"))] TableSchemaValidationError { message: String }, + + /// Request was throttled due to rate limiting or too many concurrent operations. + #[snafu(display("Throttled: {message}"))] + Throttled { message: String }, } impl NamespaceError { @@ -282,6 +290,7 @@ impl NamespaceError { Self::Internal { .. } => ErrorCode::Internal, Self::InvalidTableState { .. } => ErrorCode::InvalidTableState, Self::TableSchemaValidationError { .. } => ErrorCode::TableSchemaValidationError, + Self::Throttled { .. } => ErrorCode::Throttled, } } @@ -314,6 +323,7 @@ impl NamespaceError { Some(ErrorCode::TableSchemaValidationError) => { Self::TableSchemaValidationError { message } } + Some(ErrorCode::Throttled) => Self::Throttled { message }, None => Self::Internal { message }, } } @@ -342,7 +352,7 @@ mod tests { #[test] fn test_error_code_roundtrip() { - for code in 0..=20 { + for code in 0..=21 { let error_code = ErrorCode::from_u32(code).unwrap(); assert_eq!(error_code.as_u32(), code); } diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 974067edb60..11a69f69da8 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -324,6 +324,8 @@ struct MergeInsertParams { use_index: bool, // Controls how to handle duplicate source rows that match the same target row. source_dedupe_behavior: SourceDedupeBehavior, + // Number of inner commit retries for manifest version conflicts. Default is 20. + commit_retries: Option, } /// A MergeInsertJob inserts new rows, deletes old rows, and updates existing rows all as @@ -447,6 +449,7 @@ impl MergeInsertBuilder { skip_auto_cleanup: false, use_index: true, source_dedupe_behavior: SourceDedupeBehavior::Fail, + commit_retries: None, }, }) } @@ -537,6 +540,14 @@ impl MergeInsertBuilder { self } + /// Set the number of inner commit retries for manifest version conflicts. + /// Different from `conflict_retries` which handles semantic conflicts. + /// Default: 20 + pub fn commit_retries(&mut self, retries: u32) -> &mut Self { + self.params.commit_retries = Some(retries); + self + } + /// Crate a merge insert job pub fn try_build(&mut self) -> Result { if !self.params.insert_not_matched @@ -1873,6 +1884,9 @@ impl RetryExecutor for MergeInsertJobWithIterator { let mut commit_builder = CommitBuilder::new(dataset).with_skip_auto_cleanup(self.job.params.skip_auto_cleanup); + if let Some(commit_retries) = self.job.params.commit_retries { + commit_builder = commit_builder.with_max_retries(commit_retries); + } if let Some(affected_rows) = data.affected_rows { commit_builder = commit_builder.with_affected_rows(affected_rows); } From dee4d1e4908f9faf6aee2f667bd2a09c3437a1a1 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 24 Feb 2026 16:15:39 -0800 Subject: [PATCH 02/11] rebase --- .../lance-namespace-impls/src/dir/manifest.rs | 85 ++++++++++--------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index b4d940756b3..20aa703f7e8 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -829,48 +829,49 @@ impl ManifestNamespace { .execute_reader(Box::new(reader)) .await .map_err(|e| { - // CommitConflict: version collision retries exhausted -> Throttled (safe to retry) - // TooMuchWriteContention: semantic conflict -> ConcurrentModification (don't retry) - // matched/duplicate: WhenMatched::Fail triggered -> ConcurrentModification - let error_msg = e.to_string(); - if error_msg.contains("CommitConflict") - || error_msg.contains("Failed to commit the transaction after") - { - NamespaceError::Throttled { - message: format!( - "Too many concurrent writes, please retry later: {}", - error_msg - ), - } - .into() - } else if error_msg.contains("TooMuchWriteContention") - || error_msg.contains("Too many concurrent writers") - { - NamespaceError::ConcurrentModification { - message: format!( - "Object '{}' was concurrently modified by another operation", - object_id - ), + use lance_core::Error as LanceError; + match &e { + // CommitConflict/RetryableCommitConflict: version collision retries exhausted -> Throttled (safe to retry) + LanceError::CommitConflict { .. } + | LanceError::RetryableCommitConflict { .. } => NamespaceError::Throttled { + message: format!("Too many concurrent writes, please retry later: {}", e), } - .into() - } else if error_msg.contains("matched") - || error_msg.contains("duplicate") - || error_msg.contains("already exists") - { - NamespaceError::ConcurrentModification { - message: format!( - "Object '{}' was concurrently created by another operation", - object_id - ), + .into(), + // TooMuchWriteContention: semantic conflict -> ConcurrentModification (don't retry) + // IncompatibleTransaction: incompatible concurrent change -> ConcurrentModification (don't retry) + LanceError::TooMuchWriteContention { .. } + | LanceError::IncompatibleTransaction { .. } => { + NamespaceError::ConcurrentModification { + message: format!( + "Object '{}' was concurrently modified by another operation", + object_id + ), + } + .into() } - .into() - } else { - Error::IO { - source: box_error(std::io::Error::other(format!( - "Failed to execute merge: {}", - e - ))), - location: location!(), + // Other errors: check message for semantic conflicts (matched/duplicate from WhenMatched::Fail) + _ => { + let error_msg = e.to_string(); + if error_msg.contains("matched") + || error_msg.contains("duplicate") + || error_msg.contains("already exists") + { + NamespaceError::ConcurrentModification { + message: format!( + "Object '{}' was concurrently created by another operation", + object_id + ), + } + .into() + } else { + Error::IO { + source: box_error(std::io::Error::other(format!( + "Failed to execute merge: {}", + e + ))), + location: location!(), + } + } } } })?; @@ -926,7 +927,9 @@ impl ManifestNamespace { // Update the wrapper with the new dataset self.manifest_dataset - .set_latest(Arc::try_unwrap(new_dataset).unwrap_or_else(|arc| (*arc).clone())) + .set_latest( + Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()), + ) .await; // Run inline optimization after delete From da2b8620b53f9f2b26ac8c283142e77b1b44aa90 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 24 Feb 2026 16:35:15 -0800 Subject: [PATCH 03/11] cleanup --- rust/lance-namespace-impls/src/dir.rs | 2 +- rust/lance-namespace-impls/src/dir/manifest.rs | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 3f983d12fbf..d85f5cd4e46 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -375,7 +375,7 @@ impl DirectoryNamespaceBuilder { } /// Set the number of retries for commit operations on the manifest table. - /// If not set, uses the lance default. + /// If not set, defaults to [`lance_table::io::commit::CommitConfig::default().num_retries`]. pub fn commit_retries(mut self, retries: u32) -> Self { self.commit_retries = Some(retries); self diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 20aa703f7e8..4325f540c98 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -17,6 +17,7 @@ use lance::dataset::{builder::DatasetBuilder, ReadParams, WriteParams}; use lance::session::Session; use lance::{dataset::scanner::Scanner, Dataset}; use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION; +use lance_core::Error as LanceError; use lance_core::{box_error, Error, Result}; use lance_index::optimize::OptimizeOptions; use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; @@ -258,7 +259,7 @@ pub struct ManifestNamespace { /// after every write. Defaults to true. inline_optimization_enabled: bool, /// Number of retries for commit operations on the manifest table. - /// If None, uses the lance default. + /// If None, defaults to [`lance_table::io::commit::CommitConfig::default().num_retries`]. commit_retries: Option, } @@ -829,16 +830,14 @@ impl ManifestNamespace { .execute_reader(Box::new(reader)) .await .map_err(|e| { - use lance_core::Error as LanceError; match &e { - // CommitConflict/RetryableCommitConflict: version collision retries exhausted -> Throttled (safe to retry) - LanceError::CommitConflict { .. } - | LanceError::RetryableCommitConflict { .. } => NamespaceError::Throttled { + // CommitConflict: version collision retries exhausted -> Throttled (safe to retry) + LanceError::CommitConflict { .. } => NamespaceError::Throttled { message: format!("Too many concurrent writes, please retry later: {}", e), } .into(), - // TooMuchWriteContention: semantic conflict -> ConcurrentModification (don't retry) - // IncompatibleTransaction: incompatible concurrent change -> ConcurrentModification (don't retry) + // TooMuchWriteContention: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification + // IncompatibleTransaction: incompatible concurrent change -> ConcurrentModification LanceError::TooMuchWriteContention { .. } | LanceError::IncompatibleTransaction { .. } => { NamespaceError::ConcurrentModification { From 3e6778815b3afbfd165f758c5c138af04b6b0e98 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 24 Feb 2026 16:45:35 -0800 Subject: [PATCH 04/11] more cleanup --- .../lance-namespace-impls/src/dir/manifest.rs | 126 +++++++++--------- 1 file changed, 61 insertions(+), 65 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 4325f540c98..cb9f7bc56e7 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -277,6 +277,65 @@ impl std::fmt::Debug for ManifestNamespace { } } +/// Convert a Lance commit error to an appropriate namespace error. +/// +/// Maps lance commit errors to namespace errors: +/// - `CommitConflict`: version collision retries exhausted -> Throttled (safe to retry) +/// - `TooMuchWriteContention`: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification +/// - `IncompatibleTransaction`: incompatible concurrent change -> ConcurrentModification +/// - Errors containing "matched/duplicate/already exists": ConcurrentModification (from WhenMatched::Fail) +/// - Other errors: IO error with the operation description +fn convert_lance_commit_error(e: &LanceError, operation: &str, object_id: Option<&str>) -> Error { + match e { + // CommitConflict: version collision retries exhausted -> Throttled (safe to retry) + LanceError::CommitConflict { .. } => NamespaceError::Throttled { + message: format!("Too many concurrent writes, please retry later: {:?}", e), + } + .into(), + // TooMuchWriteContention: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification + // IncompatibleTransaction: incompatible concurrent change -> ConcurrentModification + LanceError::TooMuchWriteContention { .. } | LanceError::IncompatibleTransaction { .. } => { + let message = if let Some(id) = object_id { + format!( + "Object '{}' was concurrently modified by another operation: {:?}", + id, e + ) + } else { + format!( + "Object was concurrently modified by another operation: {:?}", + e + ) + }; + NamespaceError::ConcurrentModification { message }.into() + } + // Other errors: check message for semantic conflicts (matched/duplicate from WhenMatched::Fail) + _ => { + let error_msg = e.to_string(); + if error_msg.contains("matched") + || error_msg.contains("duplicate") + || error_msg.contains("already exists") + { + let message = if let Some(id) = object_id { + format!( + "Object '{}' was concurrently created by another operation: {:?}", + id, e + ) + } else { + format!( + "Object was concurrently created by another operation: {:?}", + e + ) + }; + return NamespaceError::ConcurrentModification { message }.into(); + } + Error::IO { + source: box_error(std::io::Error::other(format!("{}: {:?}", operation, e))), + location: location!(), + } + } + } +} + impl ManifestNamespace { /// Create a new ManifestNamespace from an existing DirectoryNamespace #[allow(clippy::too_many_arguments)] @@ -830,49 +889,7 @@ impl ManifestNamespace { .execute_reader(Box::new(reader)) .await .map_err(|e| { - match &e { - // CommitConflict: version collision retries exhausted -> Throttled (safe to retry) - LanceError::CommitConflict { .. } => NamespaceError::Throttled { - message: format!("Too many concurrent writes, please retry later: {}", e), - } - .into(), - // TooMuchWriteContention: RetryableCommitConflict (semantic conflict) retries exhausted -> ConcurrentModification - // IncompatibleTransaction: incompatible concurrent change -> ConcurrentModification - LanceError::TooMuchWriteContention { .. } - | LanceError::IncompatibleTransaction { .. } => { - NamespaceError::ConcurrentModification { - message: format!( - "Object '{}' was concurrently modified by another operation", - object_id - ), - } - .into() - } - // Other errors: check message for semantic conflicts (matched/duplicate from WhenMatched::Fail) - _ => { - let error_msg = e.to_string(); - if error_msg.contains("matched") - || error_msg.contains("duplicate") - || error_msg.contains("already exists") - { - NamespaceError::ConcurrentModification { - message: format!( - "Object '{}' was concurrently created by another operation", - object_id - ), - } - .into() - } else { - Error::IO { - source: box_error(std::io::Error::other(format!( - "Failed to execute merge: {}", - e - ))), - location: location!(), - } - } - } - } + convert_lance_commit_error(&e, "Failed to execute merge", Some(&object_id)) })?; let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone()); @@ -901,28 +918,7 @@ impl ManifestNamespace { let new_dataset = lance::dataset::DeleteBuilder::new(dataset, &predicate) .execute() .await - .map_err(|e| { - let error_msg = e.to_string(); - if error_msg.contains("CommitConflict") - || error_msg.contains("TooMuchWriteContention") - { - NamespaceError::Throttled { - message: format!( - "Too many concurrent writes during delete, please retry later: {}", - error_msg - ), - } - .into() - } else { - Error::IO { - source: box_error(std::io::Error::other(format!( - "Failed to delete: {}", - e - ))), - location: location!(), - } - } - })?; + .map_err(|e| convert_lance_commit_error(&e, "Failed to delete", None))?; // Update the wrapper with the new dataset self.manifest_dataset From 0a5d20ca8706be010609a89a3caaddc815faac16 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 24 Feb 2026 17:15:41 -0800 Subject: [PATCH 05/11] fix docs --- rust/lance-namespace-impls/src/dir.rs | 2 +- rust/lance-namespace-impls/src/dir/manifest.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index d85f5cd4e46..320144b9c7e 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -375,7 +375,7 @@ impl DirectoryNamespaceBuilder { } /// Set the number of retries for commit operations on the manifest table. - /// If not set, defaults to [`lance_table::io::commit::CommitConfig::default().num_retries`]. + /// If not set, defaults to [`lance_table::io::commit::CommitConfig`] default (20). pub fn commit_retries(mut self, retries: u32) -> Self { self.commit_retries = Some(retries); self diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index cb9f7bc56e7..a441d545b8c 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -259,7 +259,7 @@ pub struct ManifestNamespace { /// after every write. Defaults to true. inline_optimization_enabled: bool, /// Number of retries for commit operations on the manifest table. - /// If None, defaults to [`lance_table::io::commit::CommitConfig::default().num_retries`]. + /// If None, defaults to [`lance_table::io::commit::CommitConfig`] default (20). commit_retries: Option, } From 7bea775e464d812623a062245df98c2fb4f5f3b5 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 24 Feb 2026 20:54:24 -0800 Subject: [PATCH 06/11] fix concurrent manifest table creation error --- .../lance-namespace-impls/src/dir/manifest.rs | 88 +++++++++---------- 1 file changed, 43 insertions(+), 45 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index a441d545b8c..4bcee92241c 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -1137,56 +1137,54 @@ impl ManifestNamespace { ); Ok(DatasetConsistencyWrapper::new(dataset)) } - Err(e) => { - let error_msg = e.to_string(); - // Check if the error indicates the manifest already exists - if error_msg.contains("already exists") - || error_msg.contains("TableAlreadyExists") - || error_msg.contains("CommitConflict") - || error_msg.contains("Commit conflict") - { - log::info!( - "Manifest table was created by another process, loading it: {}", - manifest_path - ); - // Try to load the manifest that was created by another process - let recovery_store_options = ObjectStoreParams { - storage_options_accessor: storage_options.as_ref().map(|opts| { - Arc::new( - lance_io::object_store::StorageOptionsAccessor::with_static_options( - opts.clone(), - ), - ) - }), - ..Default::default() - }; - let recovery_read_params = ReadParams { - session, - store_options: Some(recovery_store_options), - ..Default::default() - }; - let dataset = DatasetBuilder::from_uri(&manifest_path) - .with_read_params(recovery_read_params) - .load() - .await - .map_err(|e| Error::IO { - source: box_error(std::io::Error::other(format!( - "Failed to load manifest dataset after creation conflict: {}", - e - ))), - location: location!(), - })?; - Ok(DatasetConsistencyWrapper::new(dataset)) - } else { - Err(Error::IO { + Err(ref e) + if matches!( + e, + LanceError::DatasetAlreadyExists { .. } + | LanceError::CommitConflict { .. } + | LanceError::IncompatibleTransaction { .. } + ) => + { + // Another process created the manifest concurrently, try to load it + log::info!( + "Manifest table was created by another process, loading it: {}", + manifest_path + ); + let recovery_store_options = ObjectStoreParams { + storage_options_accessor: storage_options.as_ref().map(|opts| { + Arc::new( + lance_io::object_store::StorageOptionsAccessor::with_static_options( + opts.clone(), + ), + ) + }), + ..Default::default() + }; + let recovery_read_params = ReadParams { + session, + store_options: Some(recovery_store_options), + ..Default::default() + }; + let dataset = DatasetBuilder::from_uri(&manifest_path) + .with_read_params(recovery_read_params) + .load() + .await + .map_err(|e| Error::IO { source: box_error(std::io::Error::other(format!( - "Failed to create manifest dataset: {}", + "Failed to load manifest dataset after creation conflict: {}", e ))), location: location!(), - }) - } + })?; + Ok(DatasetConsistencyWrapper::new(dataset)) } + Err(e) => Err(Error::IO { + source: box_error(std::io::Error::other(format!( + "Failed to create manifest dataset: {}", + e + ))), + location: location!(), + }), } } } From dcf3985c2e1f644f9c0a5946c47238887f7ea869 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 24 Feb 2026 23:19:46 -0800 Subject: [PATCH 07/11] temp fix with todo --- rust/lance-namespace-impls/src/dir/manifest.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 4bcee92241c..c2a9f291dd5 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -873,6 +873,11 @@ impl ManifestNamespace { // conflict_retries=0: no outer loop retry on semantic conflicts (handled by caller) // commit_retries: inner retry for manifest version conflicts (uses lance default if not set) merge_builder.conflict_retries(0); + // TODO: after BTREE index creation on object_id, has_scalar_index=true causes + // MergeInsert to use V1 path which lacks bloom filters for conflict detection. This + // results in (Some, None) filter mismatch when rebasing against V2 operations. + // Setting use_index=false ensures all operations consistently use V2 path. + merge_builder.use_index(false); if let Some(retries) = self.commit_retries { merge_builder.commit_retries(retries); } From 4ba126d30f0b1f66941c0c0d7e1ba67faf3f81d7 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 25 Feb 2026 15:37:57 -0800 Subject: [PATCH 08/11] rebase and fix --- rust/lance-namespace-impls/src/dir/manifest.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index c2a9f291dd5..d1bb2a37f15 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -1148,6 +1148,7 @@ impl ManifestNamespace { LanceError::DatasetAlreadyExists { .. } | LanceError::CommitConflict { .. } | LanceError::IncompatibleTransaction { .. } + | LanceError::RetryableCommitConflict { .. } ) => { // Another process created the manifest concurrently, try to load it From cce2384289dca6919d7c9c0f04cf09f556307d6c Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 25 Feb 2026 15:58:12 -0800 Subject: [PATCH 09/11] fix docs --- python/python/tests/test_namespace_dir.py | 21 +++---------------- .../tests/test_namespace_integration.py | 21 +++---------------- 2 files changed, 6 insertions(+), 36 deletions(-) diff --git a/python/python/tests/test_namespace_dir.py b/python/python/tests/test_namespace_dir.py index d91721f3b1d..a315c58bb39 100644 --- a/python/python/tests/test_namespace_dir.py +++ b/python/python/tests/test_namespace_dir.py @@ -911,12 +911,7 @@ class TestConcurrentOperations: """ def test_concurrent_create_and_drop_single_instance(self, temp_namespace): - """Test concurrent create/drop with single namespace instance. - - Mirrors: - - Rust: test_concurrent_create_and_drop_single_instance - - Java: testConcurrentCreateAndDropWithSingleInstance - """ + """Test concurrent create/drop with single namespace instance.""" import concurrent.futures num_tables = 10 @@ -964,12 +959,7 @@ def create_and_drop_table(table_index): assert len(response.tables) == 0, "All tables should be dropped" def test_concurrent_create_and_drop_multiple_instances(self): - """Test concurrent create/drop with multiple namespace instances. - - Mirrors: - - Rust: test_concurrent_create_and_drop_multiple_instances - - Java: testConcurrentCreateAndDropWithMultipleInstances - """ + """Test concurrent create/drop with multiple namespace instances.""" import concurrent.futures with tempfile.TemporaryDirectory() as tmpdir: @@ -1030,12 +1020,7 @@ def create_and_drop_table(table_index): assert len(response.tables) == 0, "All tables should be dropped" def test_concurrent_create_then_drop_from_different_instance(self): - """Test creating from one set of instances, dropping from different ones. - - Mirrors: - - Rust: test_concurrent_create_then_drop_from_different_instance - - Java: testConcurrentCreateThenDropFromDifferentInstance - """ + """Test creating from one set of instances, dropping from different ones.""" import concurrent.futures with tempfile.TemporaryDirectory() as tmpdir: diff --git a/python/python/tests/test_namespace_integration.py b/python/python/tests/test_namespace_integration.py index f58b358ba4f..8582651086a 100644 --- a/python/python/tests/test_namespace_integration.py +++ b/python/python/tests/test_namespace_integration.py @@ -903,12 +903,7 @@ def test_basic_create_and_drop_on_s3(s3_bucket: str): @pytest.mark.integration def test_concurrent_create_and_drop_single_instance_on_s3(s3_bucket: str): - """Test concurrent create/drop with single namespace instance on S3. - - Mirrors: - - Rust: test_concurrent_create_and_drop_single_instance - - Java: testConcurrentCreateAndDropWithSingleInstanceOnS3 - """ + """Test concurrent create/drop with single namespace instance on S3.""" import concurrent.futures from lance.namespace import DirectoryNamespace @@ -963,12 +958,7 @@ def create_and_drop_table(table_index): @pytest.mark.integration def test_concurrent_create_and_drop_multiple_instances_on_s3(s3_bucket: str): - """Test concurrent create/drop with multiple namespace instances on S3. - - Mirrors: - - Rust: test_concurrent_create_and_drop_multiple_instances - - Java: testConcurrentCreateAndDropWithMultipleInstancesOnS3 - """ + """Test concurrent create/drop with multiple namespace instances on S3.""" import concurrent.futures from lance.namespace import DirectoryNamespace @@ -1030,12 +1020,7 @@ def create_and_drop_table(table_index): @pytest.mark.integration def test_concurrent_create_then_drop_from_different_instance_on_s3(s3_bucket: str): - """Test creating from one set of instances, dropping from different ones on S3. - - Mirrors: - - Rust: test_concurrent_create_then_drop_from_different_instance - - Java: testConcurrentCreateThenDropFromDifferentInstanceOnS3 - """ + """Test creating from one set of instances, dropping from different ones on S3.""" import concurrent.futures from lance.namespace import DirectoryNamespace From be7cd3f6467c386967900738e559665b854688de Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 25 Feb 2026 17:22:42 -0800 Subject: [PATCH 10/11] fix --- java/lance-jni/Cargo.lock | 195 +++++++----------- .../org/lance/NamespaceIntegrationTest.java | 30 ++- .../namespace/DirectoryNamespaceTest.java | 54 ++++- python/python/tests/test_namespace_dir.py | 29 ++- .../tests/test_namespace_integration.py | 37 +++- .../lance-namespace-impls/src/dir/manifest.rs | 41 +++- 6 files changed, 245 insertions(+), 141 deletions(-) diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 19ed5aaca10..4b844c87bc4 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -1429,9 +1429,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ba7cb113e9c0bedf9e9765926031e132fa05a1b09ba6e93a6d1a4d7044457b8" +checksum = "d12ee9fdc6cdb5898c7691bb994f0ba606c4acc93a2258d78bb9f26ff8158bb3" dependencies = [ "arrow", "arrow-schema", @@ -1471,7 +1471,6 @@ dependencies = [ "parquet", "rand 0.9.2", "regex", - "rstest", "sqlparser", "tempfile", "tokio", @@ -1481,9 +1480,9 @@ dependencies = [ [[package]] name = "datafusion-catalog" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a3a799f914a59b1ea343906a0486f17061f39509af74e874a866428951130d" +checksum = "462dc9ef45e5d688aeaae49a7e310587e81b6016b9d03bace5626ad0043e5a9e" dependencies = [ "arrow", "async-trait", @@ -1506,9 +1505,9 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db1b113c80d7a0febcd901476a57aef378e717c54517a163ed51417d87621b0" +checksum = "1b96dbf1d728fc321817b744eb5080cdd75312faa6980b338817f68f3caa4208" dependencies = [ "arrow", "async-trait", @@ -1525,21 +1524,20 @@ dependencies = [ "itertools 0.14.0", "log", "object_store", - "tokio", ] [[package]] name = "datafusion-common" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c10f7659e96127d25e8366be7c8be4109595d6a2c3eac70421f380a7006a1b0" +checksum = "3237a6ff0d2149af4631290074289cae548c9863c885d821315d54c6673a074a" dependencies = [ "ahash", "arrow", "arrow-ipc", "chrono", "half", - "hashbrown 0.14.5", + "hashbrown 0.16.1", "indexmap", "libc", "log", @@ -1553,9 +1551,9 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b92065bbc6532c6651e2f7dd30b55cba0c7a14f860c7e1d15f165c41a1868d95" +checksum = "70b5e34026af55a1bfccb1ef0a763cf1f64e77c696ffcf5a128a278c31236528" dependencies = [ "futures", "log", @@ -1564,9 +1562,9 @@ dependencies = [ [[package]] name = "datafusion-datasource" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde13794244bc7581cd82f6fff217068ed79cdc344cafe4ab2c3a1c3510b38d6" +checksum = "1b2a6be734cc3785e18bbf2a7f2b22537f6b9fb960d79617775a51568c281842" dependencies = [ "arrow", "async-trait", @@ -1593,9 +1591,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-arrow" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804fa9b4ecf3157982021770617200ef7c1b2979d57bec9044748314775a9aea" +checksum = "1739b9b07c9236389e09c74f770e88aff7055250774e9def7d3f4f56b3dcc7be" dependencies = [ "arrow", "arrow-ipc", @@ -1617,9 +1615,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-csv" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a1641a40b259bab38131c5e6f48fac0717bedb7dc93690e604142a849e0568" +checksum = "61c73bc54b518bbba7c7650299d07d58730293cfba4356f6f428cc94c20b7600" dependencies = [ "arrow", "async-trait", @@ -1640,9 +1638,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-json" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adeacdb00c1d37271176f8fb6a1d8ce096baba16ea7a4b2671840c5c9c64fe85" +checksum = "37812c8494c698c4d889374ecfabbff780f1f26d9ec095dd1bddfc2a8ca12559" dependencies = [ "arrow", "async-trait", @@ -1662,9 +1660,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-parquet" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43d0b60ffd66f28bfb026565d62b0a6cbc416da09814766a3797bba7d85a3cd9" +checksum = "2210937ecd9f0e824c397e73f4b5385c97cd1aff43ab2b5836fcfd2d321523fb" dependencies = [ "arrow", "async-trait", @@ -1692,18 +1690,19 @@ dependencies = [ [[package]] name = "datafusion-doc" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b99e13947667b36ad713549237362afb054b2d8f8cc447751e23ec61202db07" +checksum = "2c825f969126bc2ef6a6a02d94b3c07abff871acf4d6dd759ce1255edb7923ce" [[package]] name = "datafusion-execution" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63695643190679037bc946ad46a263b62016931547bf119859c511f7ff2f5178" +checksum = "fa03ef05a2c2f90dd6c743e3e111078e322f4b395d20d4b4d431a245d79521ae" dependencies = [ "arrow", "async-trait", + "chrono", "dashmap", "datafusion-common", "datafusion-expr", @@ -1718,9 +1717,9 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a4787cbf5feb1ab351f789063398f67654a6df75c4d37d7f637dc96f951a91" +checksum = "ef33934c1f98ee695cc51192cc5f9ed3a8febee84fdbcd9131bf9d3a9a78276f" dependencies = [ "arrow", "async-trait", @@ -1740,9 +1739,9 @@ dependencies = [ [[package]] name = "datafusion-expr-common" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce2fb1b8c15c9ac45b0863c30b268c69dc9ee7a1ee13ecf5d067738338173dc" +checksum = "000c98206e3dd47d2939a94b6c67af4bfa6732dd668ac4fafdbde408fd9134ea" dependencies = [ "arrow", "datafusion-common", @@ -1753,9 +1752,9 @@ dependencies = [ [[package]] name = "datafusion-functions" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "794a9db7f7b96b3346fc007ff25e994f09b8f0511b4cf7dff651fadfe3ebb28f" +checksum = "379b01418ab95ca947014066248c22139fe9af9289354de10b445bd000d5d276" dependencies = [ "arrow", "arrow-buffer", @@ -1763,6 +1762,7 @@ dependencies = [ "blake2", "blake3", "chrono", + "chrono-tz", "datafusion-common", "datafusion-doc", "datafusion-execution", @@ -1783,9 +1783,9 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c25210520a9dcf9c2b2cbbce31ebd4131ef5af7fc60ee92b266dc7d159cb305" +checksum = "fd00d5454ba4c3f8ebbd04bd6a6a9dc7ced7c56d883f70f2076c188be8459e4c" dependencies = [ "ahash", "arrow", @@ -1804,9 +1804,9 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62f4a66f3b87300bb70f4124b55434d2ae3fe80455f3574701d0348da040b55d" +checksum = "aec06b380729a87210a4e11f555ec2d729a328142253f8d557b87593622ecc9f" dependencies = [ "ahash", "arrow", @@ -1817,9 +1817,9 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae5c06eed03918dc7fe7a9f082a284050f0e9ecf95d72f57712d1496da03b8c4" +checksum = "904f48d45e0f1eb7d0eb5c0f80f2b5c6046a85454364a6b16a2e0b46f62e7dff" dependencies = [ "arrow", "arrow-ord", @@ -1840,9 +1840,9 @@ dependencies = [ [[package]] name = "datafusion-functions-table" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db4fed1d71738fbe22e2712d71396db04c25de4111f1ec252b8f4c6d3b25d7f5" +checksum = "e9a0d20e2b887e11bee24f7734d780a2588b925796ac741c3118dd06d5aa77f0" dependencies = [ "arrow", "async-trait", @@ -1856,9 +1856,9 @@ dependencies = [ [[package]] name = "datafusion-functions-window" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d92206aa5ae21892f1552b4d61758a862a70956e6fd7a95cb85db1de74bc6d1" +checksum = "d3414b0a07e39b6979fe3a69c7aa79a9f1369f1d5c8e52146e66058be1b285ee" dependencies = [ "arrow", "datafusion-common", @@ -1874,9 +1874,9 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53ae9bcc39800820d53a22d758b3b8726ff84a5a3e24cecef04ef4e5fdf1c7cc" +checksum = "5bf2feae63cd4754e31add64ce75cae07d015bce4bb41cd09872f93add32523a" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -1884,9 +1884,9 @@ dependencies = [ [[package]] name = "datafusion-macros" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1063ad4c9e094b3f798acee16d9a47bd7372d9699be2de21b05c3bd3f34ab848" +checksum = "c4fe888aeb6a095c4bcbe8ac1874c4b9a4c7ffa2ba849db7922683ba20875aaf" dependencies = [ "datafusion-doc", "quote", @@ -1895,9 +1895,9 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f35f9ec5d08b87fd1893a30c2929f2559c2f9806ca072d8fefca5009dc0f06a" +checksum = "8a6527c063ae305c11be397a86d8193936f4b84d137fe40bd706dfc178cf733c" dependencies = [ "arrow", "chrono", @@ -1914,9 +1914,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c30cc8012e9eedcb48bbe112c6eff4ae5ed19cf3003cb0f505662e88b7014c5d" +checksum = "0bb028323dd4efd049dd8a78d78fe81b2b969447b39c51424167f973ac5811d9" dependencies = [ "ahash", "arrow", @@ -1926,19 +1926,20 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-physical-expr-common", "half", - "hashbrown 0.14.5", + "hashbrown 0.16.1", "indexmap", "itertools 0.14.0", "parking_lot", "paste", "petgraph", + "tokio", ] [[package]] name = "datafusion-physical-expr-adapter" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f9ff2dbd476221b1f67337699eff432781c4e6e1713d2aefdaa517dfbf79768" +checksum = "78fe0826aef7eab6b4b61533d811234a7a9e5e458331ebbf94152a51fc8ab433" dependencies = [ "arrow", "datafusion-common", @@ -1951,23 +1952,26 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90da43e1ec550b172f34c87ec68161986ced70fd05c8d2a2add66eef9c276f03" +checksum = "cfccd388620734c661bd8b7ca93c44cdd59fecc9b550eea416a78ffcbb29475f" dependencies = [ "ahash", "arrow", + "chrono", "datafusion-common", "datafusion-expr-common", - "hashbrown 0.14.5", + "hashbrown 0.16.1", + "indexmap", "itertools 0.14.0", + "parking_lot", ] [[package]] name = "datafusion-physical-optimizer" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce9804f799acd7daef3be7aaffe77c0033768ed8fdbf5fb82fc4c5f2e6bc14e6" +checksum = "bde5fa10e73259a03b705d5fddc136516814ab5f441b939525618a4070f5a059" dependencies = [ "arrow", "datafusion-common", @@ -1983,27 +1987,27 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0acf0ad6b6924c6b1aa7d213b181e012e2d3ec0a64ff5b10ee6282ab0f8532ac" +checksum = "0e1098760fb29127c24cc9ade3277051dc73c9ed0ac0131bd7bcd742e0ad7470" dependencies = [ "ahash", "arrow", "arrow-ord", "arrow-schema", "async-trait", - "chrono", "datafusion-common", "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", + "datafusion-functions", "datafusion-functions-aggregate-common", "datafusion-functions-window-common", "datafusion-physical-expr", "datafusion-physical-expr-common", "futures", "half", - "hashbrown 0.14.5", + "hashbrown 0.16.1", "indexmap", "itertools 0.14.0", "log", @@ -2014,9 +2018,9 @@ dependencies = [ [[package]] name = "datafusion-pruning" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac2c2498a1f134a9e11a9f5ed202a2a7d7e9774bd9249295593053ea3be999db" +checksum = "64d0fef4201777b52951edec086c21a5b246f3c82621569ddb4a26f488bc38a9" dependencies = [ "arrow", "datafusion-common", @@ -2031,9 +2035,9 @@ dependencies = [ [[package]] name = "datafusion-session" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f96eebd17555386f459037c65ab73aae8df09f464524c709d6a3134ad4f4776" +checksum = "f71f1e39e8f2acbf1c63b0e93756c2e970a64729dab70ac789587d6237c4fde0" dependencies = [ "async-trait", "datafusion-common", @@ -2045,9 +2049,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fc195fe60634b2c6ccfd131b487de46dc30eccae8a3c35a13f136e7f440414f" +checksum = "f44693cfcaeb7a9f12d71d1c576c3a6dc025a12cef209375fa2d16fb3b5670ee" dependencies = [ "arrow", "bigdecimal", @@ -2062,9 +2066,9 @@ dependencies = [ [[package]] name = "datafusion-substrait" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2505af06d103a55b4e8ded0c6aeb6c72a771948da939c0bd3f8eee67af475a9c" +checksum = "6042adacd0bd64e56c22f6a7f9ce0ce1793dd367c899d868179d029f110d9215" dependencies = [ "async-recursion", "async-trait", @@ -2478,12 +2482,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" -[[package]] -name = "futures-timer" -version = "3.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" - [[package]] name = "futures-util" version = "0.3.32" @@ -2611,9 +2609,9 @@ dependencies = [ [[package]] name = "geodatafusion" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773cfa1fb0d7f7661b76b3fde00f3ffd8e0ff7b3635096f0ff6294fe5ca62a2b" +checksum = "4cb8faa9b3bf4ae9f49b1f023b82d20626826f6448a7055498376146c10c4ead" dependencies = [ "arrow-arith", "arrow-array", @@ -2751,10 +2749,6 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" -dependencies = [ - "ahash", - "allocator-api2", -] [[package]] name = "hashbrown" @@ -5193,12 +5187,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "relative-path" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" - [[package]] name = "reqsign" version = "0.16.5" @@ -5339,35 +5327,6 @@ dependencies = [ "smallvec", ] -[[package]] -name = "rstest" -version = "0.26.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5a3193c063baaa2a95a33f03035c8a72b83d97a54916055ba22d35ed3839d49" -dependencies = [ - "futures-timer", - "futures-util", - "rstest_macros", -] - -[[package]] -name = "rstest_macros" -version = "0.26.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c845311f0ff7951c5506121a9ad75aec44d083c31583b2ea5a30bcb0b0abba0" -dependencies = [ - "cfg-if", - "glob", - "proc-macro-crate", - "proc-macro2", - "quote", - "regex", - "relative-path", - "rustc_version", - "syn 2.0.117", - "unicode-ident", -] - [[package]] name = "rust-ini" version = "0.21.3" diff --git a/java/src/test/java/org/lance/NamespaceIntegrationTest.java b/java/src/test/java/org/lance/NamespaceIntegrationTest.java index 975bf4d34ff..a14afac7757 100644 --- a/java/src/test/java/org/lance/NamespaceIntegrationTest.java +++ b/java/src/test/java/org/lance/NamespaceIntegrationTest.java @@ -18,6 +18,7 @@ import org.lance.namespace.LanceNamespaceStorageOptionsProvider; import org.lance.namespace.model.CreateEmptyTableRequest; import org.lance.namespace.model.CreateEmptyTableResponse; +import org.lance.namespace.model.CreateNamespaceRequest; import org.lance.namespace.model.CreateTableRequest; import org.lance.namespace.model.CreateTableResponse; import org.lance.namespace.model.DeclareTableRequest; @@ -1560,6 +1561,12 @@ void testConcurrentCreateAndDropWithSingleInstanceOnS3() throws Exception { namespace.initialize(createDirectoryNamespaceS3Config(), testAllocator); try { + // Initialize namespace first - create parent namespace to ensure __manifest table + // is created before concurrent operations + CreateNamespaceRequest createNsReq = + new CreateNamespaceRequest().id(Arrays.asList("test_ns")); + namespace.createNamespace(createNsReq); + int numTables = 10; ExecutorService executor = Executors.newFixedThreadPool(numTables); CountDownLatch startLatch = new CountDownLatch(1); @@ -1608,6 +1615,16 @@ void testConcurrentCreateAndDropWithSingleInstanceOnS3() throws Exception { @Test void testConcurrentCreateAndDropWithMultipleInstancesOnS3() throws Exception { + Map baseConfig = createDirectoryNamespaceS3Config(); + + // Initialize namespace first with a single instance to ensure __manifest + // table is created and parent namespace exists before concurrent operations + DirectoryNamespace initNs = new DirectoryNamespace(); + initNs.initialize(new HashMap<>(baseConfig), testAllocator); + CreateNamespaceRequest createNsReq = new CreateNamespaceRequest().id(Arrays.asList("test_ns")); + initNs.createNamespace(createNsReq); + initNs.close(); + int numTables = 10; ExecutorService executor = Executors.newFixedThreadPool(numTables); CountDownLatch startLatch = new CountDownLatch(1); @@ -1616,8 +1633,6 @@ void testConcurrentCreateAndDropWithMultipleInstancesOnS3() throws Exception { AtomicInteger failCount = new AtomicInteger(0); List namespaces = new ArrayList<>(); - Map baseConfig = createDirectoryNamespaceS3Config(); - for (int i = 0; i < numTables; i++) { final int tableIndex = i; executor.submit( @@ -1672,9 +1687,18 @@ void testConcurrentCreateAndDropWithMultipleInstancesOnS3() throws Exception { @Test void testConcurrentCreateThenDropFromDifferentInstanceOnS3() throws Exception { - int numTables = 10; Map baseConfig = createDirectoryNamespaceS3Config(); + // Initialize namespace first with a single instance to ensure __manifest + // table is created and parent namespace exists before concurrent operations + DirectoryNamespace initNs = new DirectoryNamespace(); + initNs.initialize(new HashMap<>(baseConfig), testAllocator); + CreateNamespaceRequest createNsReq = new CreateNamespaceRequest().id(Arrays.asList("test_ns")); + initNs.createNamespace(createNsReq); + initNs.close(); + + int numTables = 10; + // First, create all tables using separate namespace instances ExecutorService createExecutor = Executors.newFixedThreadPool(numTables); CountDownLatch createStartLatch = new CountDownLatch(1); diff --git a/java/src/test/java/org/lance/namespace/DirectoryNamespaceTest.java b/java/src/test/java/org/lance/namespace/DirectoryNamespaceTest.java index 87885ff594f..b5144f433d4 100644 --- a/java/src/test/java/org/lance/namespace/DirectoryNamespaceTest.java +++ b/java/src/test/java/org/lance/namespace/DirectoryNamespaceTest.java @@ -697,6 +697,11 @@ public VectorSchemaRoot getVectorSchemaRoot() { @Test void testConcurrentCreateAndDropWithSingleInstance() throws Exception { + // Initialize namespace first - create parent namespace to ensure __manifest table + // is created before concurrent operations + CreateNamespaceRequest createNsReq = new CreateNamespaceRequest().id(Arrays.asList("test_ns")); + namespace.createNamespace(createNsReq); + int numTables = 10; ExecutorService executor = Executors.newFixedThreadPool(numTables); CountDownLatch startLatch = new CountDownLatch(1); @@ -714,10 +719,12 @@ void testConcurrentCreateAndDropWithSingleInstance() throws Exception { String tableName = "concurrent_table_" + tableIndex; byte[] tableData = createTestTableData(); - CreateTableRequest createReq = new CreateTableRequest().id(Arrays.asList(tableName)); + CreateTableRequest createReq = + new CreateTableRequest().id(Arrays.asList("test_ns", tableName)); namespace.createTable(createReq, tableData); - DropTableRequest dropReq = new DropTableRequest().id(Arrays.asList(tableName)); + DropTableRequest dropReq = + new DropTableRequest().id(Arrays.asList("test_ns", tableName)); namespace.dropTable(dropReq); successCount.incrementAndGet(); @@ -738,13 +745,25 @@ void testConcurrentCreateAndDropWithSingleInstance() throws Exception { assertEquals(numTables, successCount.get(), "All tasks should succeed"); assertEquals(0, failCount.get(), "No tasks should fail"); - ListTablesRequest listReq = new ListTablesRequest().id(Arrays.asList()); + ListTablesRequest listReq = new ListTablesRequest().id(Arrays.asList("test_ns")); ListTablesResponse listResp = namespace.listTables(listReq); assertEquals(0, listResp.getTables().size(), "All tables should be dropped"); } @Test void testConcurrentCreateAndDropWithMultipleInstances() throws Exception { + // Initialize namespace first with a single instance to ensure __manifest + // table is created and parent namespace exists before concurrent operations + DirectoryNamespace initNs = new DirectoryNamespace(); + Map initConfig = new HashMap<>(); + initConfig.put("root", tempDir.toString()); + initConfig.put("inline_optimization_enabled", "false"); + initNs.initialize(initConfig, allocator); + + CreateNamespaceRequest createNsReq = new CreateNamespaceRequest().id(Arrays.asList("test_ns")); + initNs.createNamespace(createNsReq); + initNs.close(); + int numTables = 10; ExecutorService executor = Executors.newFixedThreadPool(numTables); CountDownLatch startLatch = new CountDownLatch(1); @@ -754,6 +773,7 @@ void testConcurrentCreateAndDropWithMultipleInstances() throws Exception { List namespaces = new ArrayList<>(); for (int i = 0; i < numTables; i++) { + final int tableIndex = i; executor.submit( () -> { DirectoryNamespace localNs = null; @@ -770,13 +790,15 @@ void testConcurrentCreateAndDropWithMultipleInstances() throws Exception { namespaces.add(localNs); } - String tableName = "multi_ns_table_" + Thread.currentThread().getId(); + String tableName = "multi_ns_table_" + tableIndex; byte[] tableData = createTestTableData(); - CreateTableRequest createReq = new CreateTableRequest().id(Arrays.asList(tableName)); + CreateTableRequest createReq = + new CreateTableRequest().id(Arrays.asList("test_ns", tableName)); localNs.createTable(createReq, tableData); - DropTableRequest dropReq = new DropTableRequest().id(Arrays.asList(tableName)); + DropTableRequest dropReq = + new DropTableRequest().id(Arrays.asList("test_ns", tableName)); localNs.dropTable(dropReq); successCount.incrementAndGet(); @@ -812,7 +834,7 @@ void testConcurrentCreateAndDropWithMultipleInstances() throws Exception { config.put("root", tempDir.toString()); verifyNs.initialize(config, allocator); - ListTablesRequest listReq = new ListTablesRequest().id(Arrays.asList()); + ListTablesRequest listReq = new ListTablesRequest().id(Arrays.asList("test_ns")); ListTablesResponse listResp = verifyNs.listTables(listReq); assertEquals(0, listResp.getTables().size(), "All tables should be dropped"); @@ -821,6 +843,18 @@ void testConcurrentCreateAndDropWithMultipleInstances() throws Exception { @Test void testConcurrentCreateThenDropFromDifferentInstance() throws Exception { + // Initialize namespace first with a single instance to ensure __manifest + // table is created and parent namespace exists before concurrent operations + DirectoryNamespace initNs = new DirectoryNamespace(); + Map initConfig = new HashMap<>(); + initConfig.put("root", tempDir.toString()); + initConfig.put("inline_optimization_enabled", "false"); + initNs.initialize(initConfig, allocator); + + CreateNamespaceRequest createNsReq = new CreateNamespaceRequest().id(Arrays.asList("test_ns")); + initNs.createNamespace(createNsReq); + initNs.close(); + int numTables = 10; // First, create all tables using separate namespace instances @@ -851,7 +885,8 @@ void testConcurrentCreateThenDropFromDifferentInstance() throws Exception { String tableName = "cross_instance_table_" + tableIndex; byte[] tableData = createTestTableData(); - CreateTableRequest createReq = new CreateTableRequest().id(Arrays.asList(tableName)); + CreateTableRequest createReq = + new CreateTableRequest().id(Arrays.asList("test_ns", tableName)); localNs.createTable(createReq, tableData); createSuccessCount.incrementAndGet(); @@ -906,7 +941,8 @@ void testConcurrentCreateThenDropFromDifferentInstance() throws Exception { String tableName = "cross_instance_table_" + tableIndex; - DropTableRequest dropReq = new DropTableRequest().id(Arrays.asList(tableName)); + DropTableRequest dropReq = + new DropTableRequest().id(Arrays.asList("test_ns", tableName)); localNs.dropTable(dropReq); dropSuccessCount.incrementAndGet(); diff --git a/python/python/tests/test_namespace_dir.py b/python/python/tests/test_namespace_dir.py index a315c58bb39..6cba5cf5135 100644 --- a/python/python/tests/test_namespace_dir.py +++ b/python/python/tests/test_namespace_dir.py @@ -914,6 +914,11 @@ def test_concurrent_create_and_drop_single_instance(self, temp_namespace): """Test concurrent create/drop with single namespace instance.""" import concurrent.futures + # Initialize namespace first - create parent namespace to ensure __manifest table + # is created before concurrent operations + create_ns_req = CreateNamespaceRequest(id=["test_ns"]) + temp_namespace.create_namespace(create_ns_req) + num_tables = 10 success_count = 0 fail_count = 0 @@ -954,7 +959,7 @@ def create_and_drop_table(table_index): assert fail_count == 0, f"Expected 0 failures, got {fail_count}" # Verify all tables are dropped - list_req = ListTablesRequest(id=[]) + list_req = ListTablesRequest(id=["test_ns"]) response = temp_namespace.list_tables(list_req) assert len(response.tables) == 0, "All tables should be dropped" @@ -963,6 +968,15 @@ def test_concurrent_create_and_drop_multiple_instances(self): import concurrent.futures with tempfile.TemporaryDirectory() as tmpdir: + # Initialize namespace first with a single instance to ensure __manifest + # table is created and parent namespace exists before concurrent operations + init_ns = connect( + "dir", + {"root": f"file://{tmpdir}", "commit_retries": "2147483647"}, + ) + create_ns_req = CreateNamespaceRequest(id=["test_ns"]) + init_ns.create_namespace(create_ns_req) + num_tables = 10 success_count = 0 fail_count = 0 @@ -1015,7 +1029,7 @@ def create_and_drop_table(table_index): verify_ns = connect( "dir", {"root": f"file://{tmpdir}", "commit_retries": "2147483647"} ) - list_req = ListTablesRequest(id=[]) + list_req = ListTablesRequest(id=["test_ns"]) response = verify_ns.list_tables(list_req) assert len(response.tables) == 0, "All tables should be dropped" @@ -1024,6 +1038,15 @@ def test_concurrent_create_then_drop_from_different_instance(self): import concurrent.futures with tempfile.TemporaryDirectory() as tmpdir: + # Initialize namespace first with a single instance to ensure __manifest + # table is created and parent namespace exists before concurrent operations + init_ns = connect( + "dir", + {"root": f"file://{tmpdir}", "commit_retries": "2147483647"}, + ) + create_ns_req = CreateNamespaceRequest(id=["test_ns"]) + init_ns.create_namespace(create_ns_req) + num_tables = 10 # Phase 1: Create all tables concurrently using separate namespace instances @@ -1107,6 +1130,6 @@ def drop_table(table_index): verify_ns = connect( "dir", {"root": f"file://{tmpdir}", "commit_retries": "2147483647"} ) - list_req = ListTablesRequest(id=[]) + list_req = ListTablesRequest(id=["test_ns"]) response = verify_ns.list_tables(list_req) assert len(response.tables) == 0, "All tables should be dropped" diff --git a/python/python/tests/test_namespace_integration.py b/python/python/tests/test_namespace_integration.py index 8582651086a..7bc1a362d97 100644 --- a/python/python/tests/test_namespace_integration.py +++ b/python/python/tests/test_namespace_integration.py @@ -907,7 +907,11 @@ def test_concurrent_create_and_drop_single_instance_on_s3(s3_bucket: str): import concurrent.futures from lance.namespace import DirectoryNamespace - from lance_namespace import CreateTableRequest, DropTableRequest + from lance_namespace import ( + CreateNamespaceRequest, + CreateTableRequest, + DropTableRequest, + ) test_prefix = f"test-{uuid.uuid4().hex[:8]}" storage_options = copy.deepcopy(CONFIG) @@ -917,6 +921,11 @@ def test_concurrent_create_and_drop_single_instance_on_s3(s3_bucket: str): dir_props["commit_retries"] = "2147483647" namespace = DirectoryNamespace(**dir_props) + # Initialize namespace first - create parent namespace to ensure __manifest table + # is created before concurrent operations + create_ns_req = CreateNamespaceRequest(id=["test_ns"]) + namespace.create_namespace(create_ns_req) + num_tables = 10 success_count = 0 fail_count = 0 @@ -962,7 +971,12 @@ def test_concurrent_create_and_drop_multiple_instances_on_s3(s3_bucket: str): import concurrent.futures from lance.namespace import DirectoryNamespace - from lance_namespace import CreateTableRequest, DropTableRequest, ListTablesRequest + from lance_namespace import ( + CreateNamespaceRequest, + CreateTableRequest, + DropTableRequest, + ListTablesRequest, + ) test_prefix = f"test-{uuid.uuid4().hex[:8]}" storage_options = copy.deepcopy(CONFIG) @@ -971,6 +985,12 @@ def test_concurrent_create_and_drop_multiple_instances_on_s3(s3_bucket: str): # Very high retry count to guarantee all operations succeed base_dir_props["commit_retries"] = "2147483647" + # Initialize namespace first with a single instance to ensure __manifest + # table is created and parent namespace exists before concurrent operations + init_ns = DirectoryNamespace(**base_dir_props.copy()) + create_ns_req = CreateNamespaceRequest(id=["test_ns"]) + init_ns.create_namespace(create_ns_req) + num_tables = 10 success_count = 0 fail_count = 0 @@ -1024,7 +1044,12 @@ def test_concurrent_create_then_drop_from_different_instance_on_s3(s3_bucket: st import concurrent.futures from lance.namespace import DirectoryNamespace - from lance_namespace import CreateTableRequest, DropTableRequest, ListTablesRequest + from lance_namespace import ( + CreateNamespaceRequest, + CreateTableRequest, + DropTableRequest, + ListTablesRequest, + ) test_prefix = f"test-{uuid.uuid4().hex[:8]}" storage_options = copy.deepcopy(CONFIG) @@ -1033,6 +1058,12 @@ def test_concurrent_create_then_drop_from_different_instance_on_s3(s3_bucket: st # Very high retry count to guarantee all operations succeed base_dir_props["commit_retries"] = "2147483647" + # Initialize namespace first with a single instance to ensure __manifest + # table is created and parent namespace exists before concurrent operations + init_ns = DirectoryNamespace(**base_dir_props.copy()) + create_ns_req = CreateNamespaceRequest(id=["test_ns"]) + init_ns.create_namespace(create_ns_req) + num_tables = 10 # Phase 1: Create all tables concurrently using separate namespace instances diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index d1bb2a37f15..a4c7b7e621d 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -2086,8 +2086,8 @@ mod tests { use bytes::Bytes; use lance_core::utils::tempfile::TempStdDir; use lance_namespace::models::{ - CreateTableRequest, DescribeTableRequest, DropTableRequest, ListTablesRequest, - TableExistsRequest, + CreateNamespaceRequest, CreateTableRequest, DescribeTableRequest, DropTableRequest, + ListTablesRequest, TableExistsRequest, }; use lance_namespace::LanceNamespace; use rstest::rstest; @@ -2787,6 +2787,15 @@ mod tests { .unwrap(), ); + // Initialize namespace first - create parent namespace to ensure __manifest table + // is created before concurrent operations + let mut create_ns_request = CreateNamespaceRequest::new(); + create_ns_request.id = Some(vec!["test_ns".to_string()]); + dir_namespace + .create_namespace(create_ns_request) + .await + .unwrap(); + let num_tables = 10; let mut handles = Vec::new(); @@ -2823,7 +2832,7 @@ mod tests { // Verify all tables are dropped let mut request = ListTablesRequest::new(); - request.id = Some(vec![]); + request.id = Some(vec!["test_ns".to_string()]); let response = dir_namespace.list_tables(request).await.unwrap(); assert_eq!(response.tables.len(), 0, "All tables should be dropped"); } @@ -2838,6 +2847,17 @@ mod tests { let temp_dir = TempStdDir::default(); let temp_path = temp_dir.to_str().unwrap().to_string(); + // Initialize namespace first with a single instance to ensure __manifest + // table is created and parent namespace exists before concurrent operations + let init_ns = DirectoryNamespaceBuilder::new(&temp_path) + .inline_optimization_enabled(inline_optimization) + .build() + .await + .unwrap(); + let mut create_ns_request = CreateNamespaceRequest::new(); + create_ns_request.id = Some(vec!["test_ns".to_string()]); + init_ns.create_namespace(create_ns_request).await.unwrap(); + let num_tables = 10; let mut handles = Vec::new(); @@ -2887,7 +2907,7 @@ mod tests { .unwrap(); let mut request = ListTablesRequest::new(); - request.id = Some(vec![]); + request.id = Some(vec!["test_ns".to_string()]); let response = verify_ns.list_tables(request).await.unwrap(); assert_eq!(response.tables.len(), 0, "All tables should be dropped"); } @@ -2904,6 +2924,17 @@ mod tests { let temp_dir = TempStdDir::default(); let temp_path = temp_dir.to_str().unwrap().to_string(); + // Initialize namespace first with a single instance to ensure __manifest + // table is created and parent namespace exists before concurrent operations + let init_ns = DirectoryNamespaceBuilder::new(&temp_path) + .inline_optimization_enabled(inline_optimization) + .build() + .await + .unwrap(); + let mut create_ns_request = CreateNamespaceRequest::new(); + create_ns_request.id = Some(vec!["test_ns".to_string()]); + init_ns.create_namespace(create_ns_request).await.unwrap(); + let num_tables = 10; // Phase 1: Create all tables concurrently using separate namespace instances @@ -2975,7 +3006,7 @@ mod tests { .unwrap(); let mut request = ListTablesRequest::new(); - request.id = Some(vec![]); + request.id = Some(vec!["test_ns".to_string()]); let response = verify_ns.list_tables(request).await.unwrap(); assert_eq!(response.tables.len(), 0, "All tables should be dropped"); } From 96fd05bf40a8b9886d9e09d3b40e6ac2625fdba6 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 25 Feb 2026 17:52:04 -0800 Subject: [PATCH 11/11] lint --- python/python/tests/test_namespace_dir.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/python/tests/test_namespace_dir.py b/python/python/tests/test_namespace_dir.py index 6cba5cf5135..6b876e52306 100644 --- a/python/python/tests/test_namespace_dir.py +++ b/python/python/tests/test_namespace_dir.py @@ -914,8 +914,8 @@ def test_concurrent_create_and_drop_single_instance(self, temp_namespace): """Test concurrent create/drop with single namespace instance.""" import concurrent.futures - # Initialize namespace first - create parent namespace to ensure __manifest table - # is created before concurrent operations + # Initialize namespace first - create parent namespace to ensure __manifest + # table is created before concurrent operations create_ns_req = CreateNamespaceRequest(id=["test_ns"]) temp_namespace.create_namespace(create_ns_req)