diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 991ca0dcacd..e24ef2b1e9e 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -4414,12 +4414,13 @@ dependencies = [ [[package]] name = "object_store_opendal" -version = "0.54.1" +version = "0.55.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0b88fc0e0c4890c1d99e2b8c519c5db40f7d9b69a0f562ff1ad4967a4c8bbc6" +checksum = "113ab0769e972eee585e57407b98de08bda5354fa28e8ba4d89038d6cb6a8991" dependencies = [ "async-trait", "bytes", + "chrono", "futures", "object_store", "opendal", @@ -4447,20 +4448,20 @@ checksum = "b4ce411919553d3f9fa53a0880544cda985a112117a0444d5ff1e870a893d6ea" [[package]] name = "opendal" -version = "0.54.1" +version = "0.55.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42afda58fa2cf50914402d132cc1caacff116a85d10c72ab2082bb7c50021754" +checksum = "d075ab8a203a6ab4bc1bce0a4b9fe486a72bf8b939037f4b78d95386384bc80a" dependencies = [ "anyhow", "backon", "base64 0.22.1", "bytes", - "chrono", "crc32c", "futures", "getrandom 0.2.16", "http 1.4.0", "http-body 1.0.1", + "jiff", "log", "md-5", "percent-encoding", @@ -4471,6 +4472,7 @@ dependencies = [ "serde_json", "sha2", "tokio", + "url", "uuid", ] diff --git a/java/lance-jni/src/namespace.rs b/java/lance-jni/src/namespace.rs index 70f4d27f626..d197c2b594b 100644 --- a/java/lance-jni/src/namespace.rs +++ b/java/lance-jni/src/namespace.rs @@ -1225,7 +1225,7 @@ fn call_rest_namespace_query_method<'local>( pub struct BlockingRestAdapter { backend: Arc, config: RestAdapterConfig, - server_handle: Option>, + server_handle: Option, } #[no_mangle] @@ -1235,7 +1235,7 @@ pub extern "system" fn Java_org_lance_namespace_RestAdapter_createNative( namespace_impl: JString, properties_map: JObject, host: JString, - port: jni::sys::jint, + port: JObject, ) -> jlong { ok_or_throw_with_return!( env, @@ -1249,7 +1249,7 @@ fn create_rest_adapter_internal( namespace_impl: JString, properties_map: JObject, host: JString, - port: jni::sys::jint, + port: JObject, ) -> Result { // Get namespace implementation type let impl_str: String = env.get_string(&namespace_impl)?.into(); @@ -1268,13 +1268,22 @@ fn create_rest_adapter_internal( .block_on(builder.connect()) .map_err(|e| Error::runtime_error(format!("Failed to build backend namespace: {}", e)))?; - // Get host string - let host_str: String = env.get_string(&host)?.into(); + // Build config with defaults, overriding if values provided + let mut config = RestAdapterConfig::default(); - let config = RestAdapterConfig { - host: host_str, - port: port as u16, - }; + // Get host string if not null + if !host.is_null() { + config.host = env.get_string(&host)?.into(); + } + + // Get port if not null (Integer object) + if !port.is_null() { + let port_value = env + .call_method(&port, "intValue", "()I", &[])? + .i() + .map_err(|e| Error::runtime_error(format!("Failed to get port value: {}", e)))?; + config.port = port_value as u16; + } let adapter = BlockingRestAdapter { backend, @@ -1287,32 +1296,36 @@ fn create_rest_adapter_internal( } #[no_mangle] -pub extern "system" fn Java_org_lance_namespace_RestAdapter_serve( +pub extern "system" fn Java_org_lance_namespace_RestAdapter_start( mut env: JNIEnv, _obj: JObject, handle: jlong, ) { - ok_or_throw_without_return!(env, serve_internal(handle)) + ok_or_throw_without_return!(env, start_internal(handle)) } -fn serve_internal(handle: jlong) -> Result<()> { +fn start_internal(handle: jlong) -> Result<()> { let adapter = unsafe { &mut *(handle as *mut BlockingRestAdapter) }; - let rest_adapter = RestAdapter::new(adapter.backend.clone(), adapter.config.clone()); - - // Spawn server in background - let server_handle = RT.spawn(async move { - let _ = rest_adapter.serve().await; - }); - + let server_handle = RT.block_on(rest_adapter.start())?; adapter.server_handle = Some(server_handle); - - // Give server time to start - std::thread::sleep(std::time::Duration::from_millis(500)); - Ok(()) } +#[no_mangle] +pub extern "system" fn Java_org_lance_namespace_RestAdapter_getPort( + _env: JNIEnv, + _obj: JObject, + handle: jlong, +) -> jni::sys::jint { + let adapter = unsafe { &*(handle as *const BlockingRestAdapter) }; + adapter + .server_handle + .as_ref() + .map(|h| h.port() as jni::sys::jint) + .unwrap_or(0) +} + #[no_mangle] pub extern "system" fn Java_org_lance_namespace_RestAdapter_stop( _env: JNIEnv, @@ -1322,7 +1335,7 @@ pub extern "system" fn Java_org_lance_namespace_RestAdapter_stop( let adapter = unsafe { &mut *(handle as *mut BlockingRestAdapter) }; if let Some(server_handle) = adapter.server_handle.take() { - server_handle.abort(); + server_handle.shutdown(); } } @@ -1336,7 +1349,7 @@ pub extern "system" fn Java_org_lance_namespace_RestAdapter_releaseNative( unsafe { let mut adapter = Box::from_raw(handle as *mut BlockingRestAdapter); if let Some(server_handle) = adapter.server_handle.take() { - server_handle.abort(); + server_handle.shutdown(); } } } diff --git a/java/src/main/java/org/lance/namespace/RestAdapter.java b/java/src/main/java/org/lance/namespace/RestAdapter.java index 7004994c97f..534a7eabb9e 100644 --- a/java/src/main/java/org/lance/namespace/RestAdapter.java +++ b/java/src/main/java/org/lance/namespace/RestAdapter.java @@ -30,12 +30,16 @@ * Map backendConfig = new HashMap<>(); * backendConfig.put("root", "/tmp/test-data"); * - * try (RestAdapter adapter = new RestAdapter("dir", backendConfig, "127.0.0.1", 8080)) { - * adapter.serve(); + * // Use port 0 to let OS assign an available port + * try (RestAdapter adapter = new RestAdapter("dir", backendConfig)) { + * adapter.start(); + * + * // Get the actual port assigned by the OS + * int port = adapter.getPort(); * * // Now you can connect with RestNamespace * Map clientConfig = new HashMap<>(); - * clientConfig.put("uri", "http://127.0.0.1:8080"); + * clientConfig.put("uri", "http://127.0.0.1:" + port); * RestNamespace client = new RestNamespace(); * client.initialize(clientConfig, allocator); * @@ -56,35 +60,33 @@ public class RestAdapter implements Closeable, AutoCloseable { * * @param namespaceImpl The namespace implementation type (e.g., "dir" for DirectoryNamespace) * @param backendConfig Configuration properties for the backend namespace - * @param host Host to bind the server to - * @param port Port to bind the server to + * @param host Host to bind the server to, or null for default (127.0.0.1) + * @param port Port to bind the server to. Use 0 to let the OS assign an available port, or null + * for default (2333). */ public RestAdapter( - String namespaceImpl, Map backendConfig, String host, int port) { + String namespaceImpl, Map backendConfig, String host, Integer port) { if (namespaceImpl == null || namespaceImpl.isEmpty()) { throw new IllegalArgumentException("namespace implementation cannot be null or empty"); } if (backendConfig == null) { throw new IllegalArgumentException("backend config cannot be null"); } - if (host == null || host.isEmpty()) { - throw new IllegalArgumentException("host cannot be null or empty"); - } - if (port <= 0 || port > 65535) { - throw new IllegalArgumentException("port must be between 1 and 65535"); + if (port != null && (port < 0 || port > 65535)) { + throw new IllegalArgumentException("port must be between 0 and 65535"); } this.nativeRestAdapterHandle = createNative(namespaceImpl, backendConfig, host, port); } /** - * Creates a new REST adapter with default host (127.0.0.1) and port (2333). + * Creates a new REST adapter with default host and port. * * @param namespaceImpl The namespace implementation type * @param backendConfig Configuration properties for the backend namespace */ public RestAdapter(String namespaceImpl, Map backendConfig) { - this(namespaceImpl, backendConfig, "127.0.0.1", 2333); + this(namespaceImpl, backendConfig, null, null); } /** @@ -93,7 +95,7 @@ public RestAdapter(String namespaceImpl, Map backendConfig) { *

This method returns immediately after starting the server. The server runs in a background * thread until {@link #stop()} is called or the adapter is closed. */ - public void serve() { + public void start() { if (nativeRestAdapterHandle == 0) { throw new IllegalStateException("RestAdapter not initialized"); } @@ -101,10 +103,24 @@ public void serve() { throw new IllegalStateException("Server already started"); } - serve(nativeRestAdapterHandle); + start(nativeRestAdapterHandle); serverStarted = true; } + /** + * Get the actual port the server is listening on. + * + *

This is useful when port 0 was specified to get an OS-assigned port. + * + * @return The actual port, or 0 if the server hasn't been started + */ + public int getPort() { + if (nativeRestAdapterHandle == 0) { + return 0; + } + return getPort(nativeRestAdapterHandle); + } + /** * Stop the REST server. * @@ -128,9 +144,11 @@ public void close() { // Native methods private native long createNative( - String namespaceImpl, Map backendConfig, String host, int port); + String namespaceImpl, Map backendConfig, String host, Integer port); + + private native void start(long handle); - private native void serve(long handle); + private native int getPort(long handle); private native void stop(long handle); diff --git a/java/src/test/java/org/lance/namespace/RestNamespaceTest.java b/java/src/test/java/org/lance/namespace/RestNamespaceTest.java index 3e861de44e4..797ef5d6785 100644 --- a/java/src/test/java/org/lance/namespace/RestNamespaceTest.java +++ b/java/src/test/java/org/lance/namespace/RestNamespaceTest.java @@ -35,7 +35,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import java.util.Random; import static org.junit.jupiter.api.Assertions.*; @@ -57,16 +56,14 @@ public class RestNamespaceTest { void setUp() { allocator = new RootAllocator(Long.MAX_VALUE); - // Use a random port to avoid conflicts - port = 4000 + new Random().nextInt(10000); - // Create backend configuration for DirectoryNamespace Map backendConfig = new HashMap<>(); backendConfig.put("root", tempDir.toString()); - // Create and start REST adapter - adapter = new RestAdapter("dir", backendConfig, "127.0.0.1", port); - adapter.serve(); + // Create and start REST adapter (port 0 lets OS assign available port) + adapter = new RestAdapter("dir", backendConfig, "127.0.0.1", 0); + adapter.start(); + port = adapter.getPort(); // Create REST namespace client namespace = new RestNamespace(); diff --git a/python/Cargo.lock b/python/Cargo.lock index 0a73294833c..ae3aee8dc75 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -5083,12 +5083,13 @@ dependencies = [ [[package]] name = "object_store_opendal" -version = "0.54.1" +version = "0.55.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0b88fc0e0c4890c1d99e2b8c519c5db40f7d9b69a0f562ff1ad4967a4c8bbc6" +checksum = "113ab0769e972eee585e57407b98de08bda5354fa28e8ba4d89038d6cb6a8991" dependencies = [ "async-trait", "bytes", + "chrono", "futures", "object_store", "opendal", @@ -5116,20 +5117,20 @@ checksum = "b4ce411919553d3f9fa53a0880544cda985a112117a0444d5ff1e870a893d6ea" [[package]] name = "opendal" -version = "0.54.1" +version = "0.55.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42afda58fa2cf50914402d132cc1caacff116a85d10c72ab2082bb7c50021754" +checksum = "d075ab8a203a6ab4bc1bce0a4b9fe486a72bf8b939037f4b78d95386384bc80a" dependencies = [ "anyhow", "backon", "base64 0.22.1", "bytes", - "chrono", "crc32c", "futures", "getrandom 0.2.16", "http 1.4.0", "http-body 1.0.1", + "jiff", "log", "md-5", "percent-encoding", @@ -5140,6 +5141,7 @@ dependencies = [ "serde_json", "sha2", "tokio", + "url", "uuid", ] diff --git a/python/python/lance/namespace.py b/python/python/lance/namespace.py index 4619f2ce244..426c7176d74 100644 --- a/python/python/lance/namespace.py +++ b/python/python/lance/namespace.py @@ -314,19 +314,21 @@ class RestAdapter: session : Session, optional Lance session for sharing object store connections with the backend namespace. host : str, optional - Host address to bind to, default "127.0.0.1" + Host address to bind to. Default "127.0.0.1". port : int, optional - Port to listen on, default 2333 + Port to listen on. Default 2333 per REST spec. + Use 0 to let the OS assign an available ephemeral port. + Use the `port` property after `start()` to get the actual port. Examples -------- >>> import lance.namespace >>> - >>> # Start REST adapter with DirectoryNamespace backend + >>> # Start REST adapter with DirectoryNamespace backend (auto port) >>> namespace_config = {"root": "memory://test"} - >>> with lance.namespace.RestAdapter("dir", namespace_config, port=4001) as adapter: - ... # Create REST client - ... client = lance.namespace.RestNamespace(uri="http://127.0.0.1:4001") + >>> with lance.namespace.RestAdapter("dir", namespace_config) as adapter: + ... # Create REST client using the assigned port + ... client = lance.namespace.RestNamespace(uri=f"http://127.0.0.1:{adapter.port}") ... # Use the client... """ @@ -335,8 +337,8 @@ def __init__( namespace_impl: str, namespace_properties: Dict[str, str] = None, session=None, - host: str = "127.0.0.1", - port: int = 2333, + host: str = None, + port: int = None, ): if PyRestAdapter is None: raise RuntimeError( @@ -353,12 +355,19 @@ def __init__( # Create the underlying Rust adapter self._inner = PyRestAdapter(namespace_impl, str_properties, session, host, port) self.host = host - self.port = port self.namespace_impl = namespace_impl - def serve(self): + @property + def port(self) -> int: + """Get the actual port the server is listening on. + + Returns 0 if the server hasn't been started yet. + """ + return self._inner.port + + def start(self): """Start the REST server in the background.""" - self._inner.serve() + self._inner.start() def stop(self): """Stop the REST server.""" @@ -366,7 +375,7 @@ def stop(self): def __enter__(self): """Start server when entering context.""" - self.serve() + self.start() return self def __exit__(self, exc_type, exc_value, traceback): diff --git a/python/python/tests/test_namespace_rest.py b/python/python/tests/test_namespace_rest.py index 6b988d7c476..7fa3a65c5f1 100644 --- a/python/python/tests/test_namespace_rest.py +++ b/python/python/tests/test_namespace_rest.py @@ -12,7 +12,6 @@ """ import tempfile -import uuid import lance.namespace import pyarrow as pa @@ -59,14 +58,11 @@ def table_to_ipc_bytes(table): @pytest.fixture def rest_namespace(): """Create a REST namespace with adapter for testing.""" - unique_id = uuid.uuid4().hex[:8] with tempfile.TemporaryDirectory() as tmpdir: backend_config = {"root": tmpdir} - port = 4000 + hash(unique_id) % 10000 - with lance.namespace.RestAdapter("dir", backend_config, port=port): - # Use lance.namespace.connect() for consistency - client = connect("rest", {"uri": f"http://127.0.0.1:{port}"}) + with lance.namespace.RestAdapter("dir", backend_config, port=0) as adapter: + client = connect("rest", {"uri": f"http://127.0.0.1:{adapter.port}"}) yield client @@ -645,27 +641,21 @@ class TestLanceNamespaceConnect: def test_connect_with_rest(self): """Test creating RestNamespace via lance.namespace.connect().""" - unique_id = uuid.uuid4().hex[:8] with tempfile.TemporaryDirectory() as tmpdir: backend_config = {"root": tmpdir} - port = 4000 + hash(unique_id) % 10000 - with lance.namespace.RestAdapter("dir", backend_config, port=port): - # Connect via lance.namespace.connect - properties = {"uri": f"http://127.0.0.1:{port}"} + with lance.namespace.RestAdapter("dir", backend_config, port=0) as adapter: + properties = {"uri": f"http://127.0.0.1:{adapter.port}"} ns = connect("rest", properties) - # Verify it's a RestNamespace instance assert isinstance(ns, lance.namespace.RestNamespace) - # Verify it works create_req = CreateTableRequest(id=["test_table"]) table_data = create_test_data() ipc_data = table_to_ipc_bytes(table_data) response = ns.create_table(create_req, ipc_data) assert response is not None - # Verify we can list the table list_req = ListTablesRequest(id=[]) list_response = ns.list_tables(list_req) assert len(list_response.tables) == 1 @@ -673,24 +663,18 @@ def test_connect_with_rest(self): def test_connect_with_custom_delimiter(self): """Test creating RestNamespace with custom delimiter via connect().""" - unique_id = uuid.uuid4().hex[:8] with tempfile.TemporaryDirectory() as tmpdir: backend_config = {"root": tmpdir} - port = 4000 + hash(unique_id) % 10000 - with lance.namespace.RestAdapter("dir", backend_config, port=port): - # Connect with custom delimiter - # Use URL-friendly delimiter instead of default '$' + with lance.namespace.RestAdapter("dir", backend_config, port=0) as adapter: properties = { - "uri": f"http://127.0.0.1:{port}", + "uri": f"http://127.0.0.1:{adapter.port}", "delimiter": "@", } ns = connect("rest", properties) - # Verify it's a RestNamespace instance assert isinstance(ns, lance.namespace.RestNamespace) - # This should work without errors create_req = CreateTableRequest(id=["test_table"]) table_data = create_test_data() ipc_data = table_to_ipc_bytes(table_data) diff --git a/python/src/namespace.rs b/python/src/namespace.rs index 47e23f94aa8..4ddf0fc76a4 100644 --- a/python/src/namespace.rs +++ b/python/src/namespace.rs @@ -11,7 +11,7 @@ use lance_namespace_impls::DirectoryNamespaceBuilder; #[cfg(feature = "rest")] use lance_namespace_impls::RestNamespaceBuilder; #[cfg(feature = "rest-adapter")] -use lance_namespace_impls::{ConnectBuilder, RestAdapter, RestAdapterConfig}; +use lance_namespace_impls::{ConnectBuilder, RestAdapter, RestAdapterConfig, RestAdapterHandle}; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyDict}; use pythonize::{depythonize, pythonize}; @@ -356,20 +356,23 @@ impl PyRestNamespace { pub struct PyRestAdapter { backend: Arc, config: RestAdapterConfig, + handle: Option, } #[cfg(feature = "rest-adapter")] #[pymethods] impl PyRestAdapter { - /// Create a new REST adapter server with namespace configuration + /// Create a new REST adapter server with namespace configuration. + /// Default port is 2333 per REST spec. Use port 0 to let OS assign an ephemeral port. + /// Use `port` property after `start()` to get the actual port. #[new] - #[pyo3(signature = (namespace_impl, namespace_properties, session = None, host = "127.0.0.1".to_string(), port = 2333))] + #[pyo3(signature = (namespace_impl, namespace_properties, session = None, host = None, port = None))] fn new( namespace_impl: String, namespace_properties: Option<&Bound<'_, PyDict>>, session: Option<&Bound<'_, Session>>, - host: String, - port: u16, + host: Option, + port: Option, ) -> PyResult { let mut props = HashMap::new(); @@ -377,13 +380,11 @@ impl PyRestAdapter { props = dict_to_hashmap(dict)?; } - // Use ConnectBuilder to build namespace from impl and properties let mut builder = ConnectBuilder::new(namespace_impl); for (k, v) in props { builder = builder.property(k, v); } - // Add session if provided if let Some(sess) = session { builder = builder.session(sess.borrow().inner.clone()); } @@ -392,30 +393,44 @@ impl PyRestAdapter { .block_on(None, builder.connect())? .infer_error()?; - let config = RestAdapterConfig { host, port }; + let mut config = RestAdapterConfig::default(); + if let Some(h) = host { + config.host = h; + } + if let Some(p) = port { + config.port = p; + } + + Ok(Self { + backend, + config, + handle: None, + }) + } - Ok(Self { backend, config }) + /// Get the actual port the server is listening on. + /// Returns 0 if server is not started yet. + #[getter] + fn port(&self) -> u16 { + self.handle.as_ref().map(|h| h.port()).unwrap_or(0) } /// Start the REST server in the background - fn serve(&mut self, py: Python) -> PyResult<()> { + fn start(&mut self, py: Python) -> PyResult<()> { let adapter = RestAdapter::new(self.backend.clone(), self.config.clone()); + let handle = crate::rt() + .block_on(Some(py), adapter.start())? + .infer_error()?; - crate::rt().spawn_background(Some(py), async move { - let _ = adapter.serve().await; - }); - - // Give server time to start - py.allow_threads(|| { - std::thread::sleep(std::time::Duration::from_millis(500)); - }); - + self.handle = Some(handle); Ok(()) } /// Stop the REST server fn stop(&mut self) { - // Server will be stopped when dropped + if let Some(handle) = self.handle.take() { + handle.shutdown(); + } } fn __enter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index d95e8118f6f..ddc934d79ee 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -144,6 +144,13 @@ impl DatasetConsistencyWrapper { async fn reload(&self) -> Result<()> { // First check if we need to reload (with read lock) let read_guard = self.0.read().await; + let dataset_uri = read_guard.uri().to_string(); + let current_version = read_guard.version().version; + log::debug!( + "Reload starting for uri={}, current_version={}", + dataset_uri, + current_version + ); let latest_version = read_guard .latest_version_id() .await @@ -154,11 +161,17 @@ impl DatasetConsistencyWrapper { ))), location: location!(), })?; - let current_version = read_guard.version().version; + log::debug!( + "Reload got latest_version={} for uri={}, current_version={}", + latest_version, + dataset_uri, + current_version + ); drop(read_guard); // If already up-to-date, return early if latest_version == current_version { + log::debug!("Already up-to-date for uri={}", dataset_uri); return Ok(()); } @@ -936,6 +949,7 @@ impl ManifestNamespace { session: Option>, ) -> Result { let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME); + log::debug!("Attempting to load manifest from {}", manifest_path); let mut builder = DatasetBuilder::from_uri(&manifest_path); if let Some(sess) = session.clone() { @@ -947,7 +961,6 @@ impl ManifestNamespace { } let dataset_result = builder.load().await; - if let Ok(dataset) = dataset_result { Ok(DatasetConsistencyWrapper::new(dataset)) } else { @@ -975,7 +988,12 @@ impl ManifestNamespace { location: location!(), })?; - log::info!("Successfully created manifest table at {}", manifest_path); + log::info!( + "Successfully created manifest table at {}, version={}, uri={}", + manifest_path, + dataset.version().version, + dataset.uri() + ); Ok(DatasetConsistencyWrapper::new(dataset)) } } diff --git a/rust/lance-namespace-impls/src/lib.rs b/rust/lance-namespace-impls/src/lib.rs index 5c72cd56dc5..634199ce98a 100644 --- a/rust/lance-namespace-impls/src/lib.rs +++ b/rust/lance-namespace-impls/src/lib.rs @@ -48,4 +48,4 @@ pub use dir::{manifest::ManifestNamespace, DirectoryNamespace, DirectoryNamespac pub use rest::{RestNamespace, RestNamespaceBuilder}; #[cfg(feature = "rest-adapter")] -pub use rest_adapter::{RestAdapter, RestAdapterConfig}; +pub use rest_adapter::{RestAdapter, RestAdapterConfig, RestAdapterHandle}; diff --git a/rust/lance-namespace-impls/src/rest_adapter.rs b/rust/lance-namespace-impls/src/rest_adapter.rs index 2f454d8cd2a..eae81d73ba6 100644 --- a/rust/lance-namespace-impls/src/rest_adapter.rs +++ b/rust/lance-namespace-impls/src/rest_adapter.rs @@ -18,6 +18,7 @@ use axum::{ Json, Router, }; use serde::Deserialize; +use tokio::sync::watch; use tower_http::trace::TraceLayer; use lance_core::{Error, Result}; @@ -76,24 +77,94 @@ impl RestAdapter { .with_state(self.backend.clone()) } - /// Start the REST server (blocking) - pub async fn serve(self) -> Result<()> { + /// Start the REST server in the background and return a handle for shutdown. + /// + /// This method binds to the configured address and spawns a background task + /// to handle requests. The returned handle can be used to gracefully shut down + /// the server. + /// + /// Returns an error immediately if the server fails to bind to the address. + /// If port 0 is specified, the OS will assign an available ephemeral port. + /// The actual port can be retrieved from the returned handle via `port()`. + pub async fn start(self) -> Result { let addr = format!("{}:{}", self.config.host, self.config.port); - let listener = tokio::net::TcpListener::bind(&addr) - .await - .map_err(|e| Error::IO { - source: Box::new(e), - location: snafu::location!(), - })?; - axum::serve(listener, self.router()) - .await - .map_err(|e| Error::IO { + let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| { + log::error!("RestAdapter::start() failed to bind to {}: {}", addr, e); + Error::IO { source: Box::new(e), location: snafu::location!(), - })?; + } + })?; + + // Get the actual port (important when port 0 was specified) + let actual_port = listener.local_addr().map(|a| a.port()).unwrap_or(0); + + let (shutdown_tx, mut shutdown_rx) = watch::channel(false); + let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>(); + let router = self.router(); + + tokio::spawn(async move { + let result = axum::serve(listener, router) + .with_graceful_shutdown(async move { + let _ = shutdown_rx.changed().await; + }) + .await; + + if let Err(e) = result { + log::error!("RestAdapter: server error: {}", e); + } - Ok(()) + // Signal that server has shut down + let _ = done_tx.send(()); + }); + + Ok(RestAdapterHandle { + shutdown_tx, + done_rx: std::sync::Mutex::new(Some(done_rx)), + port: actual_port, + }) + } +} + +/// Handle for controlling a running REST adapter server. +/// +/// Use this handle to gracefully shut down the server when it's no longer needed. +pub struct RestAdapterHandle { + shutdown_tx: watch::Sender, + done_rx: std::sync::Mutex>>, + port: u16, +} + +impl RestAdapterHandle { + /// Get the actual port the server is listening on. + /// This is useful when port 0 was specified to get an OS-assigned port. + pub fn port(&self) -> u16 { + self.port + } + + /// Gracefully shut down the server and wait for it to complete. + /// + /// This signals the server to stop accepting new connections, waits for + /// existing connections to complete, and blocks until the server has + /// fully shut down. + pub fn shutdown(&self) { + // Send shutdown signal + let _ = self.shutdown_tx.send(true); + + // Wait for server to complete + if let Some(done_rx) = self.done_rx.lock().unwrap().take() { + // Use a new runtime to block on the oneshot receiver + // This is needed because shutdown() is called from sync context + let _ = std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let _ = rt.block_on(done_rx); + }) + .join(); + } } } @@ -476,17 +547,16 @@ mod tests { use crate::{DirectoryNamespaceBuilder, RestNamespaceBuilder}; use std::sync::Arc; use tempfile::TempDir; - use tokio::task::JoinHandle; /// Test fixture that manages server lifecycle struct RestServerFixture { _temp_dir: TempDir, namespace: crate::RestNamespace, - server_handle: JoinHandle<()>, + server_handle: RestAdapterHandle, } impl RestServerFixture { - async fn new(port: u16) -> Self { + async fn new() -> Self { let temp_dir = TempDir::new().unwrap(); let temp_path = temp_dir.path().to_str().unwrap().to_string(); @@ -498,22 +568,20 @@ mod tests { .unwrap(); let backend = Arc::new(backend); - // Start REST server + // Start REST server with port 0 (OS assigns available port) let config = RestAdapterConfig { - host: "127.0.0.1".to_string(), - port, + port: 0, + ..Default::default() }; let server = RestAdapter::new(backend.clone(), config); - let server_handle = tokio::spawn(async move { - server.serve().await.unwrap(); - }); + let server_handle = server.start().await.unwrap(); - // Give server time to start - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + // Get the actual port assigned by OS + let actual_port = server_handle.port(); // Create RestNamespace client - let server_url = format!("http://127.0.0.1:{}", port); + let server_url = format!("http://127.0.0.1:{}", actual_port); let namespace = RestNamespaceBuilder::new(&server_url) .delimiter("$") .build(); @@ -528,7 +596,7 @@ mod tests { impl Drop for RestServerFixture { fn drop(&mut self) { - self.server_handle.abort(); + self.server_handle.shutdown(); } } @@ -563,9 +631,9 @@ mod tests { Bytes::from(buffer) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_create_and_list_child_namespaces() { - let fixture = RestServerFixture::new(4001).await; + let fixture = RestServerFixture::new().await; // Create child namespaces for i in 1..=3 { @@ -593,9 +661,9 @@ mod tests { assert!(namespaces.namespaces.contains(&"namespace3".to_string())); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_nested_namespace_hierarchy() { - let fixture = RestServerFixture::new(4002).await; + let fixture = RestServerFixture::new().await; // Create parent namespace let create_req = CreateNamespaceRequest { @@ -646,9 +714,9 @@ mod tests { assert!(children.contains(&"child2".to_string())); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_create_table_in_child_namespace() { - let fixture = RestServerFixture::new(4003).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace first @@ -699,9 +767,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_list_tables_in_child_namespace() { - let fixture = RestServerFixture::new(4004).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -746,9 +814,9 @@ mod tests { assert!(tables.tables.contains(&"table3".to_string())); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_table_exists_in_child_namespace() { - let fixture = RestServerFixture::new(4005).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -783,9 +851,9 @@ mod tests { assert!(result.is_ok(), "Table should exist in child namespace"); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_empty_table_exists_in_child_namespace() { - let fixture = RestServerFixture::new(4015).await; + let fixture = RestServerFixture::new().await; // Create child namespace let create_ns_req = CreateNamespaceRequest { @@ -818,9 +886,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_describe_table_in_child_namespace() { - let fixture = RestServerFixture::new(4006).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -908,9 +976,9 @@ mod tests { } } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_drop_table_in_child_namespace() { - let fixture = RestServerFixture::new(4007).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -958,9 +1026,9 @@ mod tests { // (error message varies depending on implementation details) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_create_empty_table_in_child_namespace() { - let fixture = RestServerFixture::new(4008).await; + let fixture = RestServerFixture::new().await; // Create child namespace let create_ns_req = CreateNamespaceRequest { @@ -1012,9 +1080,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_describe_empty_table_in_child_namespace() { - let fixture = RestServerFixture::new(4016).await; + let fixture = RestServerFixture::new().await; // Create child namespace let create_ns_req = CreateNamespaceRequest { @@ -1067,9 +1135,9 @@ mod tests { // (schema is None until data is added) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_drop_empty_table_in_child_namespace() { - let fixture = RestServerFixture::new(4017).await; + let fixture = RestServerFixture::new().await; // Create child namespace let create_ns_req = CreateNamespaceRequest { @@ -1112,9 +1180,9 @@ mod tests { // (error message varies depending on implementation details) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_deeply_nested_namespace_with_empty_table() { - let fixture = RestServerFixture::new(4018).await; + let fixture = RestServerFixture::new().await; // Create deeply nested namespace hierarchy let create_req = CreateNamespaceRequest { @@ -1185,9 +1253,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_deeply_nested_namespace_with_table() { - let fixture = RestServerFixture::new(4009).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create deeply nested namespace hierarchy @@ -1266,9 +1334,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_namespace_isolation() { - let fixture = RestServerFixture::new(4010).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create two sibling namespaces @@ -1341,9 +1409,9 @@ mod tests { assert!(fixture.namespace.table_exists(exists_req).await.is_ok()); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_drop_namespace_with_tables_fails() { - let fixture = RestServerFixture::new(4011).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create namespace @@ -1387,9 +1455,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_drop_empty_child_namespace() { - let fixture = RestServerFixture::new(4012).await; + let fixture = RestServerFixture::new().await; // Create namespace let create_ns_req = CreateNamespaceRequest { @@ -1422,9 +1490,9 @@ mod tests { // (error message varies depending on implementation details) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_namespace_with_properties() { - let fixture = RestServerFixture::new(4013).await; + let fixture = RestServerFixture::new().await; // Create namespace with properties let mut properties = std::collections::HashMap::new(); @@ -1455,9 +1523,9 @@ mod tests { assert_eq!(props.get("environment"), Some(&"production".to_string())); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_root_namespace_operations() { - let fixture = RestServerFixture::new(4014).await; + let fixture = RestServerFixture::new().await; // Root namespace should always exist let exists_req = NamespaceExistsRequest { id: Some(vec![]) }; @@ -1492,9 +1560,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_register_table() { - let fixture = RestServerFixture::new(4019).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -1556,9 +1624,9 @@ mod tests { assert!(result.is_ok(), "Registered table should exist"); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_register_table_rejects_absolute_uri() { - let fixture = RestServerFixture::new(4020).await; + let fixture = RestServerFixture::new().await; // Create child namespace let create_ns_req = CreateNamespaceRequest { @@ -1590,9 +1658,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_register_table_rejects_path_traversal() { - let fixture = RestServerFixture::new(4021).await; + let fixture = RestServerFixture::new().await; // Create child namespace let create_ns_req = CreateNamespaceRequest { @@ -1624,9 +1692,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_deregister_table() { - let fixture = RestServerFixture::new(4022).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -1701,9 +1769,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_register_deregister_round_trip() { - let fixture = RestServerFixture::new(4023).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -1814,7 +1882,7 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_namespace_write() { use arrow::array::Int32Array; use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema}; @@ -1822,7 +1890,7 @@ mod tests { use lance::dataset::{Dataset, WriteMode, WriteParams}; use lance_namespace::LanceNamespace; - let fixture = RestServerFixture::new(4024).await; + let fixture = RestServerFixture::new().await; let namespace = Arc::new(fixture.namespace.clone()) as Arc; // Use child namespace instead of root