Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 77 additions & 118 deletions java/lance-jni/Cargo.lock

Large diffs are not rendered by default.

368 changes: 368 additions & 0 deletions java/src/test/java/org/lance/NamespaceIntegrationTest.java

Large diffs are not rendered by default.

282 changes: 282 additions & 0 deletions java/src/test/java/org/lance/namespace/DirectoryNamespaceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@

import java.io.ByteArrayOutputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.*;
Expand Down Expand Up @@ -688,4 +694,280 @@ public VectorSchemaRoot getVectorSchemaRoot() {
namespace.close();
}
}

@Test
void testConcurrentCreateAndDropWithSingleInstance() throws Exception {
// Initialize namespace first - create parent namespace to ensure __manifest table
// is created before concurrent operations
CreateNamespaceRequest createNsReq = new CreateNamespaceRequest().id(Arrays.asList("test_ns"));
namespace.createNamespace(createNsReq);

int numTables = 10;
ExecutorService executor = Executors.newFixedThreadPool(numTables);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch doneLatch = new CountDownLatch(numTables);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);

for (int i = 0; i < numTables; i++) {
final int tableIndex = i;
executor.submit(
() -> {
try {
startLatch.await();

String tableName = "concurrent_table_" + tableIndex;
byte[] tableData = createTestTableData();

CreateTableRequest createReq =
new CreateTableRequest().id(Arrays.asList("test_ns", tableName));
namespace.createTable(createReq, tableData);

DropTableRequest dropReq =
new DropTableRequest().id(Arrays.asList("test_ns", tableName));
namespace.dropTable(dropReq);

successCount.incrementAndGet();
} catch (Exception e) {
failCount.incrementAndGet();
} finally {
doneLatch.countDown();
}
});
}

startLatch.countDown();
assertTrue(doneLatch.await(60, TimeUnit.SECONDS), "Timed out waiting for tasks to complete");

executor.shutdown();
assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));

assertEquals(numTables, successCount.get(), "All tasks should succeed");
assertEquals(0, failCount.get(), "No tasks should fail");

ListTablesRequest listReq = new ListTablesRequest().id(Arrays.asList("test_ns"));
ListTablesResponse listResp = namespace.listTables(listReq);
assertEquals(0, listResp.getTables().size(), "All tables should be dropped");
}

@Test
void testConcurrentCreateAndDropWithMultipleInstances() throws Exception {
// Initialize namespace first with a single instance to ensure __manifest
// table is created and parent namespace exists before concurrent operations
DirectoryNamespace initNs = new DirectoryNamespace();
Map<String, String> initConfig = new HashMap<>();
initConfig.put("root", tempDir.toString());
initConfig.put("inline_optimization_enabled", "false");
initNs.initialize(initConfig, allocator);

CreateNamespaceRequest createNsReq = new CreateNamespaceRequest().id(Arrays.asList("test_ns"));
initNs.createNamespace(createNsReq);
initNs.close();

int numTables = 10;
ExecutorService executor = Executors.newFixedThreadPool(numTables);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch doneLatch = new CountDownLatch(numTables);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);
List<DirectoryNamespace> namespaces = new ArrayList<>();

for (int i = 0; i < numTables; i++) {
final int tableIndex = i;
executor.submit(
() -> {
DirectoryNamespace localNs = null;
try {
startLatch.await();

localNs = new DirectoryNamespace();
Map<String, String> config = new HashMap<>();
config.put("root", tempDir.toString());
config.put("inline_optimization_enabled", "false");
localNs.initialize(config, allocator);

synchronized (namespaces) {
namespaces.add(localNs);
}

String tableName = "multi_ns_table_" + tableIndex;
byte[] tableData = createTestTableData();

CreateTableRequest createReq =
new CreateTableRequest().id(Arrays.asList("test_ns", tableName));
localNs.createTable(createReq, tableData);

DropTableRequest dropReq =
new DropTableRequest().id(Arrays.asList("test_ns", tableName));
localNs.dropTable(dropReq);

successCount.incrementAndGet();
} catch (Exception e) {
failCount.incrementAndGet();
} finally {
doneLatch.countDown();
}
});
}

startLatch.countDown();
assertTrue(doneLatch.await(60, TimeUnit.SECONDS), "Timed out waiting for tasks to complete");

executor.shutdown();
assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));

// Close all namespace instances
for (DirectoryNamespace ns : namespaces) {
try {
ns.close();
} catch (Exception e) {
// Ignore
}
}

assertEquals(numTables, successCount.get(), "All tasks should succeed");
assertEquals(0, failCount.get(), "No tasks should fail");

// Verify with a fresh namespace
DirectoryNamespace verifyNs = new DirectoryNamespace();
Map<String, String> config = new HashMap<>();
config.put("root", tempDir.toString());
verifyNs.initialize(config, allocator);

ListTablesRequest listReq = new ListTablesRequest().id(Arrays.asList("test_ns"));
ListTablesResponse listResp = verifyNs.listTables(listReq);
assertEquals(0, listResp.getTables().size(), "All tables should be dropped");

verifyNs.close();
}

@Test
void testConcurrentCreateThenDropFromDifferentInstance() throws Exception {
// Initialize namespace first with a single instance to ensure __manifest
// table is created and parent namespace exists before concurrent operations
DirectoryNamespace initNs = new DirectoryNamespace();
Map<String, String> initConfig = new HashMap<>();
initConfig.put("root", tempDir.toString());
initConfig.put("inline_optimization_enabled", "false");
initNs.initialize(initConfig, allocator);

CreateNamespaceRequest createNsReq = new CreateNamespaceRequest().id(Arrays.asList("test_ns"));
initNs.createNamespace(createNsReq);
initNs.close();

int numTables = 10;

// First, create all tables using separate namespace instances
ExecutorService createExecutor = Executors.newFixedThreadPool(numTables);
CountDownLatch createStartLatch = new CountDownLatch(1);
CountDownLatch createDoneLatch = new CountDownLatch(numTables);
AtomicInteger createSuccessCount = new AtomicInteger(0);
List<DirectoryNamespace> createNamespaces = new ArrayList<>();

for (int i = 0; i < numTables; i++) {
final int tableIndex = i;
createExecutor.submit(
() -> {
DirectoryNamespace localNs = null;
try {
createStartLatch.await();

localNs = new DirectoryNamespace();
Map<String, String> config = new HashMap<>();
config.put("root", tempDir.toString());
config.put("inline_optimization_enabled", "false");
localNs.initialize(config, allocator);

synchronized (createNamespaces) {
createNamespaces.add(localNs);
}

String tableName = "cross_instance_table_" + tableIndex;
byte[] tableData = createTestTableData();

CreateTableRequest createReq =
new CreateTableRequest().id(Arrays.asList("test_ns", tableName));
localNs.createTable(createReq, tableData);

createSuccessCount.incrementAndGet();
} catch (Exception e) {
// Ignore - test will fail on assertion
} finally {
createDoneLatch.countDown();
}
});
}

createStartLatch.countDown();
assertTrue(createDoneLatch.await(60, TimeUnit.SECONDS), "Timed out waiting for creates");
createExecutor.shutdown();

assertEquals(numTables, createSuccessCount.get(), "All creates should succeed");

// Close create namespaces
for (DirectoryNamespace ns : createNamespaces) {
try {
ns.close();
} catch (Exception e) {
// Ignore
}
}

// Now drop all tables using NEW namespace instances
ExecutorService dropExecutor = Executors.newFixedThreadPool(numTables);
CountDownLatch dropStartLatch = new CountDownLatch(1);
CountDownLatch dropDoneLatch = new CountDownLatch(numTables);
AtomicInteger dropSuccessCount = new AtomicInteger(0);
AtomicInteger dropFailCount = new AtomicInteger(0);
List<DirectoryNamespace> dropNamespaces = new ArrayList<>();

for (int i = 0; i < numTables; i++) {
final int tableIndex = i;
dropExecutor.submit(
() -> {
DirectoryNamespace localNs = null;
try {
dropStartLatch.await();

localNs = new DirectoryNamespace();
Map<String, String> config = new HashMap<>();
config.put("root", tempDir.toString());
config.put("inline_optimization_enabled", "false");
localNs.initialize(config, allocator);

synchronized (dropNamespaces) {
dropNamespaces.add(localNs);
}

String tableName = "cross_instance_table_" + tableIndex;

DropTableRequest dropReq =
new DropTableRequest().id(Arrays.asList("test_ns", tableName));
localNs.dropTable(dropReq);

dropSuccessCount.incrementAndGet();
} catch (Exception e) {
dropFailCount.incrementAndGet();
} finally {
dropDoneLatch.countDown();
}
});
}

dropStartLatch.countDown();
assertTrue(dropDoneLatch.await(60, TimeUnit.SECONDS), "Timed out waiting for drops");
dropExecutor.shutdown();

// Close drop namespaces
for (DirectoryNamespace ns : dropNamespaces) {
try {
ns.close();
} catch (Exception e) {
// Ignore
}
}

assertEquals(numTables, dropSuccessCount.get(), "All drops should succeed");
assertEquals(0, dropFailCount.get(), "No drops should fail");
}
}
Loading
Loading