Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions java/lance-jni/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 38 additions & 25 deletions java/lance-jni/src/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ fn call_rest_namespace_query_method<'local>(
pub struct BlockingRestAdapter {
backend: Arc<dyn LanceNamespaceTrait>,
config: RestAdapterConfig,
server_handle: Option<tokio::task::JoinHandle<()>>,
server_handle: Option<lance_namespace_impls::RestAdapterHandle>,
}

#[no_mangle]
Expand All @@ -1235,7 +1235,7 @@ pub extern "system" fn Java_org_lance_namespace_RestAdapter_createNative(
namespace_impl: JString,
properties_map: JObject,
host: JString,
port: jni::sys::jint,
port: JObject,
) -> jlong {
ok_or_throw_with_return!(
env,
Expand All @@ -1249,7 +1249,7 @@ fn create_rest_adapter_internal(
namespace_impl: JString,
properties_map: JObject,
host: JString,
port: jni::sys::jint,
port: JObject,
) -> Result<jlong> {
// Get namespace implementation type
let impl_str: String = env.get_string(&namespace_impl)?.into();
Expand All @@ -1268,13 +1268,22 @@ fn create_rest_adapter_internal(
.block_on(builder.connect())
.map_err(|e| Error::runtime_error(format!("Failed to build backend namespace: {}", e)))?;

// Get host string
let host_str: String = env.get_string(&host)?.into();
// Build config with defaults, overriding if values provided
let mut config = RestAdapterConfig::default();

let config = RestAdapterConfig {
host: host_str,
port: port as u16,
};
// Get host string if not null
if !host.is_null() {
config.host = env.get_string(&host)?.into();
}

// Get port if not null (Integer object)
if !port.is_null() {
let port_value = env
.call_method(&port, "intValue", "()I", &[])?
.i()
.map_err(|e| Error::runtime_error(format!("Failed to get port value: {}", e)))?;
config.port = port_value as u16;
}

let adapter = BlockingRestAdapter {
backend,
Expand All @@ -1287,32 +1296,36 @@ fn create_rest_adapter_internal(
}

#[no_mangle]
pub extern "system" fn Java_org_lance_namespace_RestAdapter_serve(
pub extern "system" fn Java_org_lance_namespace_RestAdapter_start(
mut env: JNIEnv,
_obj: JObject,
handle: jlong,
) {
ok_or_throw_without_return!(env, serve_internal(handle))
ok_or_throw_without_return!(env, start_internal(handle))
}

fn serve_internal(handle: jlong) -> Result<()> {
fn start_internal(handle: jlong) -> Result<()> {
let adapter = unsafe { &mut *(handle as *mut BlockingRestAdapter) };

let rest_adapter = RestAdapter::new(adapter.backend.clone(), adapter.config.clone());

// Spawn server in background
let server_handle = RT.spawn(async move {
let _ = rest_adapter.serve().await;
});

let server_handle = RT.block_on(rest_adapter.start())?;
adapter.server_handle = Some(server_handle);

// Give server time to start
std::thread::sleep(std::time::Duration::from_millis(500));

Ok(())
}

#[no_mangle]
pub extern "system" fn Java_org_lance_namespace_RestAdapter_getPort(
_env: JNIEnv,
_obj: JObject,
handle: jlong,
) -> jni::sys::jint {
let adapter = unsafe { &*(handle as *const BlockingRestAdapter) };
adapter
.server_handle
.as_ref()
.map(|h| h.port() as jni::sys::jint)
.unwrap_or(0)
}

#[no_mangle]
pub extern "system" fn Java_org_lance_namespace_RestAdapter_stop(
_env: JNIEnv,
Expand All @@ -1322,7 +1335,7 @@ pub extern "system" fn Java_org_lance_namespace_RestAdapter_stop(
let adapter = unsafe { &mut *(handle as *mut BlockingRestAdapter) };

if let Some(server_handle) = adapter.server_handle.take() {
server_handle.abort();
server_handle.shutdown();
}
}

Expand All @@ -1336,7 +1349,7 @@ pub extern "system" fn Java_org_lance_namespace_RestAdapter_releaseNative(
unsafe {
let mut adapter = Box::from_raw(handle as *mut BlockingRestAdapter);
if let Some(server_handle) = adapter.server_handle.take() {
server_handle.abort();
server_handle.shutdown();
}
}
}
Expand Down
52 changes: 35 additions & 17 deletions java/src/main/java/org/lance/namespace/RestAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@
* Map<String, String> backendConfig = new HashMap<>();
* backendConfig.put("root", "/tmp/test-data");
*
* try (RestAdapter adapter = new RestAdapter("dir", backendConfig, "127.0.0.1", 8080)) {
* adapter.serve();
* // Use port 0 to let OS assign an available port
* try (RestAdapter adapter = new RestAdapter("dir", backendConfig)) {
* adapter.start();
*
* // Get the actual port assigned by the OS
* int port = adapter.getPort();
*
* // Now you can connect with RestNamespace
* Map<String, String> clientConfig = new HashMap<>();
* clientConfig.put("uri", "http://127.0.0.1:8080");
* clientConfig.put("uri", "http://127.0.0.1:" + port);
* RestNamespace client = new RestNamespace();
* client.initialize(clientConfig, allocator);
*
Expand All @@ -56,35 +60,33 @@ public class RestAdapter implements Closeable, AutoCloseable {
*
* @param namespaceImpl The namespace implementation type (e.g., "dir" for DirectoryNamespace)
* @param backendConfig Configuration properties for the backend namespace
* @param host Host to bind the server to
* @param port Port to bind the server to
* @param host Host to bind the server to, or null for default (127.0.0.1)
* @param port Port to bind the server to. Use 0 to let the OS assign an available port, or null
* for default (2333).
*/
public RestAdapter(
String namespaceImpl, Map<String, String> backendConfig, String host, int port) {
String namespaceImpl, Map<String, String> backendConfig, String host, Integer port) {
if (namespaceImpl == null || namespaceImpl.isEmpty()) {
throw new IllegalArgumentException("namespace implementation cannot be null or empty");
}
if (backendConfig == null) {
throw new IllegalArgumentException("backend config cannot be null");
}
if (host == null || host.isEmpty()) {
throw new IllegalArgumentException("host cannot be null or empty");
}
if (port <= 0 || port > 65535) {
throw new IllegalArgumentException("port must be between 1 and 65535");
if (port != null && (port < 0 || port > 65535)) {
throw new IllegalArgumentException("port must be between 0 and 65535");
}

this.nativeRestAdapterHandle = createNative(namespaceImpl, backendConfig, host, port);
}

/**
* Creates a new REST adapter with default host (127.0.0.1) and port (2333).
* Creates a new REST adapter with default host and port.
*
* @param namespaceImpl The namespace implementation type
* @param backendConfig Configuration properties for the backend namespace
*/
public RestAdapter(String namespaceImpl, Map<String, String> backendConfig) {
this(namespaceImpl, backendConfig, "127.0.0.1", 2333);
this(namespaceImpl, backendConfig, null, null);
}

/**
Expand All @@ -93,18 +95,32 @@ public RestAdapter(String namespaceImpl, Map<String, String> backendConfig) {
* <p>This method returns immediately after starting the server. The server runs in a background
* thread until {@link #stop()} is called or the adapter is closed.
*/
public void serve() {
public void start() {
if (nativeRestAdapterHandle == 0) {
throw new IllegalStateException("RestAdapter not initialized");
}
if (serverStarted) {
throw new IllegalStateException("Server already started");
}

serve(nativeRestAdapterHandle);
start(nativeRestAdapterHandle);
serverStarted = true;
}

/**
* Get the actual port the server is listening on.
*
* <p>This is useful when port 0 was specified to get an OS-assigned port.
*
* @return The actual port, or 0 if the server hasn't been started
*/
public int getPort() {
if (nativeRestAdapterHandle == 0) {
return 0;
}
return getPort(nativeRestAdapterHandle);
}

/**
* Stop the REST server.
*
Expand All @@ -128,9 +144,11 @@ public void close() {

// Native methods
private native long createNative(
String namespaceImpl, Map<String, String> backendConfig, String host, int port);
String namespaceImpl, Map<String, String> backendConfig, String host, Integer port);

private native void start(long handle);

private native void serve(long handle);
private native int getPort(long handle);

private native void stop(long handle);

Expand Down
11 changes: 4 additions & 7 deletions java/src/test/java/org/lance/namespace/RestNamespaceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import static org.junit.jupiter.api.Assertions.*;

Expand All @@ -57,16 +56,14 @@ public class RestNamespaceTest {
void setUp() {
allocator = new RootAllocator(Long.MAX_VALUE);

// Use a random port to avoid conflicts
port = 4000 + new Random().nextInt(10000);

// Create backend configuration for DirectoryNamespace
Map<String, String> backendConfig = new HashMap<>();
backendConfig.put("root", tempDir.toString());

// Create and start REST adapter
adapter = new RestAdapter("dir", backendConfig, "127.0.0.1", port);
adapter.serve();
// Create and start REST adapter (port 0 lets OS assign available port)
adapter = new RestAdapter("dir", backendConfig, "127.0.0.1", 0);
adapter.start();
port = adapter.getPort();

// Create REST namespace client
namespace = new RestNamespace();
Expand Down
12 changes: 7 additions & 5 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading