From 34e627df66b5e46d5e4a26008df389e36fb463e9 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Fri, 28 Nov 2025 08:59:28 -0800 Subject: [PATCH 1/8] add logging --- .github/workflows/run_tests/action.yml | 11 +++- python/python/tests/test_namespace_rest.py | 14 +++++ python/src/namespace.rs | 20 ++++++- .../lance-namespace-impls/src/dir/manifest.rs | 52 ++++++++++++++++--- .../lance-namespace-impls/src/rest_adapter.rs | 19 ++++++- 5 files changed, 104 insertions(+), 12 deletions(-) diff --git a/.github/workflows/run_tests/action.yml b/.github/workflows/run_tests/action.yml index 14c4b3d6f46..1d81cb021b0 100644 --- a/.github/workflows/run_tests/action.yml +++ b/.github/workflows/run_tests/action.yml @@ -24,7 +24,14 @@ runs: run: | # Install cpu only pytorch pip install torch --index-url https://download.pytorch.org/whl/cpu - - name: Run python tests + # - name: Run python tests + # shell: bash + # working-directory: python + # run: make test + - name: Run namespace tests with logging (for flaky test debugging) shell: bash working-directory: python - run: make test + run: pytest python/tests/test_namespace_rest.py python/tests/test_namespace_dir.py -v -s + env: + LANCE_LOG: debug + RUST_LOG: lance_namespace_impls=info diff --git a/python/python/tests/test_namespace_rest.py b/python/python/tests/test_namespace_rest.py index 6b988d7c476..327ca0ab35f 100644 --- a/python/python/tests/test_namespace_rest.py +++ b/python/python/tests/test_namespace_rest.py @@ -11,12 +11,22 @@ DirectoryNamespace and RestNamespace implementations. """ +import logging +import os import tempfile import uuid import lance.namespace import pyarrow as pa import pytest + +# Enable debug logging for lance if LANCE_LOG env var is set +if os.environ.get("LANCE_LOG"): + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + logging.getLogger("lance").setLevel(logging.DEBUG) from lance_namespace import ( CreateEmptyTableRequest, CreateNamespaceRequest, @@ -64,10 +74,14 @@ def rest_namespace(): backend_config = {"root": tmpdir} port = 4000 + hash(unique_id) % 10000 + print(f"\n[FIXTURE] Creating RestAdapter with tmpdir={tmpdir}, port={port}") with lance.namespace.RestAdapter("dir", backend_config, port=port): + print(f"[FIXTURE] RestAdapter context entered, creating client") # Use lance.namespace.connect() for consistency client = connect("rest", {"uri": f"http://127.0.0.1:{port}"}) + print(f"[FIXTURE] Client created, yielding") yield client + print(f"[FIXTURE] Test completed, cleaning up") class TestCreateTable: diff --git a/python/src/namespace.rs b/python/src/namespace.rs index 47e23f94aa8..09d49975b49 100644 --- a/python/src/namespace.rs +++ b/python/src/namespace.rs @@ -371,11 +371,18 @@ impl PyRestAdapter { host: String, port: u16, ) -> PyResult { + log::info!( + "PyRestAdapter::new() creating backend with impl={}, host={}, port={}", + namespace_impl, + host, + port + ); let mut props = HashMap::new(); if let Some(dict) = namespace_properties { props = dict_to_hashmap(dict)?; } + log::info!("PyRestAdapter::new() properties={:?}", props); // Use ConnectBuilder to build namespace from impl and properties let mut builder = ConnectBuilder::new(namespace_impl); @@ -388,9 +395,11 @@ impl PyRestAdapter { builder = builder.session(sess.borrow().inner.clone()); } + log::info!("PyRestAdapter::new() calling builder.connect()"); let backend = crate::rt() .block_on(None, builder.connect())? .infer_error()?; + log::info!("PyRestAdapter::new() backend created successfully"); let config = RestAdapterConfig { host, port }; @@ -399,16 +408,25 @@ impl PyRestAdapter { /// Start the REST server in the background fn serve(&mut self, py: Python) -> PyResult<()> { + log::info!( + "PyRestAdapter::serve() starting server on {}:{}", + self.config.host, + self.config.port + ); let adapter = RestAdapter::new(self.backend.clone(), self.config.clone()); crate::rt().spawn_background(Some(py), async move { - let _ = adapter.serve().await; + log::info!("PyRestAdapter::serve() background task started, calling adapter.serve()"); + let result = adapter.serve().await; + log::info!("PyRestAdapter::serve() adapter.serve() returned: {:?}", result.is_ok()); }); // Give server time to start + log::info!("PyRestAdapter::serve() sleeping 500ms to wait for server startup"); py.allow_threads(|| { std::thread::sleep(std::time::Duration::from_millis(500)); }); + log::info!("PyRestAdapter::serve() done sleeping, returning"); Ok(()) } diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index d95e8118f6f..d2d4a9ceec3 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -144,21 +144,45 @@ impl DatasetConsistencyWrapper { async fn reload(&self) -> Result<()> { // First check if we need to reload (with read lock) let read_guard = self.0.read().await; + let dataset_uri = read_guard.uri().to_string(); + let current_version = read_guard.version().version; + log::debug!( + "DatasetConsistencyWrapper::reload() starting for uri={}, current_version={}", + dataset_uri, + current_version + ); let latest_version = read_guard .latest_version_id() .await - .map_err(|e| Error::IO { - source: box_error(std::io::Error::other(format!( - "Failed to get latest version: {}", + .map_err(|e| { + log::error!( + "DatasetConsistencyWrapper::reload() failed to get latest version for uri={}, current_version={}, error={}", + dataset_uri, + current_version, e - ))), - location: location!(), + ); + Error::IO { + source: box_error(std::io::Error::other(format!( + "Failed to get latest version: {}", + e + ))), + location: location!(), + } })?; - let current_version = read_guard.version().version; + log::debug!( + "DatasetConsistencyWrapper::reload() got latest_version={} for uri={}, current_version={}", + latest_version, + dataset_uri, + current_version + ); drop(read_guard); // If already up-to-date, return early if latest_version == current_version { + log::debug!( + "DatasetConsistencyWrapper::reload() already up-to-date for uri={}", + dataset_uri + ); return Ok(()); } @@ -936,6 +960,10 @@ impl ManifestNamespace { session: Option>, ) -> Result { let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME); + log::info!( + "create_or_get_manifest: attempting to load manifest from {}", + manifest_path + ); let mut builder = DatasetBuilder::from_uri(&manifest_path); if let Some(sess) = session.clone() { @@ -947,6 +975,11 @@ impl ManifestNamespace { } let dataset_result = builder.load().await; + log::info!( + "create_or_get_manifest: load result for {} is {}", + manifest_path, + if dataset_result.is_ok() { "Ok" } else { "Err" } + ); if let Ok(dataset) = dataset_result { Ok(DatasetConsistencyWrapper::new(dataset)) @@ -975,7 +1008,12 @@ impl ManifestNamespace { location: location!(), })?; - log::info!("Successfully created manifest table at {}", manifest_path); + log::info!( + "Successfully created manifest table at {}, version={}, uri={}", + manifest_path, + dataset.version().version, + dataset.uri() + ); Ok(DatasetConsistencyWrapper::new(dataset)) } } diff --git a/rust/lance-namespace-impls/src/rest_adapter.rs b/rust/lance-namespace-impls/src/rest_adapter.rs index 2f454d8cd2a..9529275cca6 100644 --- a/rust/lance-namespace-impls/src/rest_adapter.rs +++ b/rust/lance-namespace-impls/src/rest_adapter.rs @@ -79,6 +79,7 @@ impl RestAdapter { /// Start the REST server (blocking) pub async fn serve(self) -> Result<()> { let addr = format!("{}:{}", self.config.host, self.config.port); + log::info!("RestAdapter::serve() binding to {}", addr); let listener = tokio::net::TcpListener::bind(&addr) .await .map_err(|e| Error::IO { @@ -86,6 +87,10 @@ impl RestAdapter { location: snafu::location!(), })?; + log::info!( + "RestAdapter::serve() successfully bound to {}, starting to accept connections", + addr + ); axum::serve(listener, self.router()) .await .map_err(|e| Error::IO { @@ -191,10 +196,20 @@ async fn create_namespace( Json(mut request): Json, ) -> Response { request.id = Some(parse_id(&id, params.delimiter.as_deref())); + log::info!( + "REST create_namespace: received request for id={:?}", + request.id + ); match backend.create_namespace(request).await { - Ok(response) => (StatusCode::CREATED, Json(response)).into_response(), - Err(e) => error_to_response(e), + Ok(response) => { + log::info!("REST create_namespace: success"); + (StatusCode::CREATED, Json(response)).into_response() + } + Err(e) => { + log::error!("REST create_namespace: error={:?}", e); + error_to_response(e) + } } } From 882924fd993f3cbc3c5e5c16ae6282e6124c1513 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Mon, 1 Dec 2025 12:43:47 -0800 Subject: [PATCH 2/8] improve server start --- java/lance-jni/Cargo.lock | 12 +-- java/lance-jni/src/namespace.rs | 18 ++--- python/Cargo.lock | 12 +-- python/src/namespace.rs | 37 +++++---- rust/lance-namespace-impls/src/lib.rs | 2 +- .../lance-namespace-impls/src/rest_adapter.rs | 81 +++++++++++++------ 6 files changed, 103 insertions(+), 59 deletions(-) diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 991ca0dcacd..e24ef2b1e9e 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -4414,12 +4414,13 @@ dependencies = [ [[package]] name = "object_store_opendal" -version = "0.54.1" +version = "0.55.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0b88fc0e0c4890c1d99e2b8c519c5db40f7d9b69a0f562ff1ad4967a4c8bbc6" +checksum = "113ab0769e972eee585e57407b98de08bda5354fa28e8ba4d89038d6cb6a8991" dependencies = [ "async-trait", "bytes", + "chrono", "futures", "object_store", "opendal", @@ -4447,20 +4448,20 @@ checksum = "b4ce411919553d3f9fa53a0880544cda985a112117a0444d5ff1e870a893d6ea" [[package]] name = "opendal" -version = "0.54.1" +version = "0.55.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42afda58fa2cf50914402d132cc1caacff116a85d10c72ab2082bb7c50021754" +checksum = "d075ab8a203a6ab4bc1bce0a4b9fe486a72bf8b939037f4b78d95386384bc80a" dependencies = [ "anyhow", "backon", "base64 0.22.1", "bytes", - "chrono", "crc32c", "futures", "getrandom 0.2.16", "http 1.4.0", "http-body 1.0.1", + "jiff", "log", "md-5", "percent-encoding", @@ -4471,6 +4472,7 @@ dependencies = [ "serde_json", "sha2", "tokio", + "url", "uuid", ] diff --git a/java/lance-jni/src/namespace.rs b/java/lance-jni/src/namespace.rs index 70f4d27f626..e70c4df9774 100644 --- a/java/lance-jni/src/namespace.rs +++ b/java/lance-jni/src/namespace.rs @@ -1225,7 +1225,7 @@ fn call_rest_namespace_query_method<'local>( pub struct BlockingRestAdapter { backend: Arc, config: RestAdapterConfig, - server_handle: Option>, + server_handle: Option, } #[no_mangle] @@ -1300,16 +1300,12 @@ fn serve_internal(handle: jlong) -> Result<()> { let rest_adapter = RestAdapter::new(adapter.backend.clone(), adapter.config.clone()); - // Spawn server in background - let server_handle = RT.spawn(async move { - let _ = rest_adapter.serve().await; - }); + // Start server - this binds the port and returns immediately + // If binding fails, an error is returned immediately + let server_handle = RT.block_on(rest_adapter.start())?; adapter.server_handle = Some(server_handle); - // Give server time to start - std::thread::sleep(std::time::Duration::from_millis(500)); - Ok(()) } @@ -1322,7 +1318,9 @@ pub extern "system" fn Java_org_lance_namespace_RestAdapter_stop( let adapter = unsafe { &mut *(handle as *mut BlockingRestAdapter) }; if let Some(server_handle) = adapter.server_handle.take() { - server_handle.abort(); + server_handle.shutdown(); + // Give server time to shutdown gracefully + std::thread::sleep(std::time::Duration::from_millis(100)); } } @@ -1336,7 +1334,7 @@ pub extern "system" fn Java_org_lance_namespace_RestAdapter_releaseNative( unsafe { let mut adapter = Box::from_raw(handle as *mut BlockingRestAdapter); if let Some(server_handle) = adapter.server_handle.take() { - server_handle.abort(); + server_handle.shutdown(); } } } diff --git a/python/Cargo.lock b/python/Cargo.lock index 0a73294833c..ae3aee8dc75 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -5083,12 +5083,13 @@ dependencies = [ [[package]] name = "object_store_opendal" -version = "0.54.1" +version = "0.55.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0b88fc0e0c4890c1d99e2b8c519c5db40f7d9b69a0f562ff1ad4967a4c8bbc6" +checksum = "113ab0769e972eee585e57407b98de08bda5354fa28e8ba4d89038d6cb6a8991" dependencies = [ "async-trait", "bytes", + "chrono", "futures", "object_store", "opendal", @@ -5116,20 +5117,20 @@ checksum = "b4ce411919553d3f9fa53a0880544cda985a112117a0444d5ff1e870a893d6ea" [[package]] name = "opendal" -version = "0.54.1" +version = "0.55.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42afda58fa2cf50914402d132cc1caacff116a85d10c72ab2082bb7c50021754" +checksum = "d075ab8a203a6ab4bc1bce0a4b9fe486a72bf8b939037f4b78d95386384bc80a" dependencies = [ "anyhow", "backon", "base64 0.22.1", "bytes", - "chrono", "crc32c", "futures", "getrandom 0.2.16", "http 1.4.0", "http-body 1.0.1", + "jiff", "log", "md-5", "percent-encoding", @@ -5140,6 +5141,7 @@ dependencies = [ "serde_json", "sha2", "tokio", + "url", "uuid", ] diff --git a/python/src/namespace.rs b/python/src/namespace.rs index 09d49975b49..b644f9c2ec2 100644 --- a/python/src/namespace.rs +++ b/python/src/namespace.rs @@ -11,7 +11,7 @@ use lance_namespace_impls::DirectoryNamespaceBuilder; #[cfg(feature = "rest")] use lance_namespace_impls::RestNamespaceBuilder; #[cfg(feature = "rest-adapter")] -use lance_namespace_impls::{ConnectBuilder, RestAdapter, RestAdapterConfig}; +use lance_namespace_impls::{ConnectBuilder, RestAdapter, RestAdapterConfig, RestAdapterHandle}; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyDict}; use pythonize::{depythonize, pythonize}; @@ -356,6 +356,7 @@ impl PyRestNamespace { pub struct PyRestAdapter { backend: Arc, config: RestAdapterConfig, + handle: Option, } #[cfg(feature = "rest-adapter")] @@ -403,7 +404,11 @@ impl PyRestAdapter { let config = RestAdapterConfig { host, port }; - Ok(Self { backend, config }) + Ok(Self { + backend, + config, + handle: None, + }) } /// Start the REST server in the background @@ -413,27 +418,29 @@ impl PyRestAdapter { self.config.host, self.config.port ); - let adapter = RestAdapter::new(self.backend.clone(), self.config.clone()); - crate::rt().spawn_background(Some(py), async move { - log::info!("PyRestAdapter::serve() background task started, calling adapter.serve()"); - let result = adapter.serve().await; - log::info!("PyRestAdapter::serve() adapter.serve() returned: {:?}", result.is_ok()); - }); + let adapter = RestAdapter::new(self.backend.clone(), self.config.clone()); - // Give server time to start - log::info!("PyRestAdapter::serve() sleeping 500ms to wait for server startup"); - py.allow_threads(|| { - std::thread::sleep(std::time::Duration::from_millis(500)); - }); - log::info!("PyRestAdapter::serve() done sleeping, returning"); + // Start the server - this binds the port and returns immediately + // If binding fails, an error is returned immediately + let handle = crate::rt() + .block_on(Some(py), adapter.start())? + .infer_error()?; + self.handle = Some(handle); + log::info!("PyRestAdapter::serve() server started successfully"); Ok(()) } /// Stop the REST server fn stop(&mut self) { - // Server will be stopped when dropped + if let Some(handle) = self.handle.take() { + log::info!("PyRestAdapter::stop() shutting down server"); + handle.shutdown(); + // Give server time to shutdown gracefully + std::thread::sleep(std::time::Duration::from_millis(100)); + log::info!("PyRestAdapter::stop() server shutdown complete"); + } } fn __enter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { diff --git a/rust/lance-namespace-impls/src/lib.rs b/rust/lance-namespace-impls/src/lib.rs index 5c72cd56dc5..634199ce98a 100644 --- a/rust/lance-namespace-impls/src/lib.rs +++ b/rust/lance-namespace-impls/src/lib.rs @@ -48,4 +48,4 @@ pub use dir::{manifest::ManifestNamespace, DirectoryNamespace, DirectoryNamespac pub use rest::{RestNamespace, RestNamespaceBuilder}; #[cfg(feature = "rest-adapter")] -pub use rest_adapter::{RestAdapter, RestAdapterConfig}; +pub use rest_adapter::{RestAdapter, RestAdapterConfig, RestAdapterHandle}; diff --git a/rust/lance-namespace-impls/src/rest_adapter.rs b/rust/lance-namespace-impls/src/rest_adapter.rs index 9529275cca6..8c3c59cdc40 100644 --- a/rust/lance-namespace-impls/src/rest_adapter.rs +++ b/rust/lance-namespace-impls/src/rest_adapter.rs @@ -18,6 +18,7 @@ use axum::{ Json, Router, }; use serde::Deserialize; +use tokio::sync::watch; use tower_http::trace::TraceLayer; use lance_core::{Error, Result}; @@ -76,29 +77,69 @@ impl RestAdapter { .with_state(self.backend.clone()) } - /// Start the REST server (blocking) - pub async fn serve(self) -> Result<()> { + /// Start the REST server in the background and return a handle for shutdown. + /// + /// This method binds to the configured address and spawns a background task + /// to handle requests. The returned handle can be used to gracefully shut down + /// the server. + /// + /// Returns an error immediately if the server fails to bind to the address. + pub async fn start(self) -> Result { let addr = format!("{}:{}", self.config.host, self.config.port); - log::info!("RestAdapter::serve() binding to {}", addr); + log::info!("RestAdapter::start() binding to {}", addr); + let listener = tokio::net::TcpListener::bind(&addr) .await - .map_err(|e| Error::IO { - source: Box::new(e), - location: snafu::location!(), + .map_err(|e| { + log::error!("RestAdapter::start() failed to bind to {}: {}", addr, e); + Error::IO { + source: Box::new(e), + location: snafu::location!(), + } })?; log::info!( - "RestAdapter::serve() successfully bound to {}, starting to accept connections", + "RestAdapter::start() successfully bound to {}, starting server", addr ); - axum::serve(listener, self.router()) - .await - .map_err(|e| Error::IO { - source: Box::new(e), - location: snafu::location!(), - })?; - Ok(()) + let (shutdown_tx, mut shutdown_rx) = watch::channel(false); + let router = self.router(); + + tokio::spawn(async move { + let result = axum::serve(listener, router) + .with_graceful_shutdown(async move { + let _ = shutdown_rx.changed().await; + log::info!("RestAdapter: received shutdown signal"); + }) + .await; + + if let Err(e) = result { + log::error!("RestAdapter: server error: {}", e); + } else { + log::info!("RestAdapter: shut down gracefully"); + } + }); + + Ok(RestAdapterHandle { shutdown_tx }) + } +} + +/// Handle for controlling a running REST adapter server. +/// +/// Use this handle to gracefully shut down the server when it's no longer needed. +pub struct RestAdapterHandle { + shutdown_tx: watch::Sender, +} + +impl RestAdapterHandle { + /// Gracefully shut down the server. + /// + /// This signals the server to stop accepting new connections and wait for + /// existing connections to complete. + pub fn shutdown(&self) { + log::info!("RestAdapterHandle::shutdown() sending shutdown signal"); + let _ = self.shutdown_tx.send(true); } } @@ -491,13 +532,12 @@ mod tests { use crate::{DirectoryNamespaceBuilder, RestNamespaceBuilder}; use std::sync::Arc; use tempfile::TempDir; - use tokio::task::JoinHandle; /// Test fixture that manages server lifecycle struct RestServerFixture { _temp_dir: TempDir, namespace: crate::RestNamespace, - server_handle: JoinHandle<()>, + server_handle: RestAdapterHandle, } impl RestServerFixture { @@ -520,12 +560,7 @@ mod tests { }; let server = RestAdapter::new(backend.clone(), config); - let server_handle = tokio::spawn(async move { - server.serve().await.unwrap(); - }); - - // Give server time to start - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + let server_handle = server.start().await.unwrap(); // Create RestNamespace client let server_url = format!("http://127.0.0.1:{}", port); @@ -543,7 +578,7 @@ mod tests { impl Drop for RestServerFixture { fn drop(&mut self) { - self.server_handle.abort(); + self.server_handle.shutdown(); } } From ec244a95b2ca9cbea5e0b0e5585b382b8b4ebb61 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Mon, 1 Dec 2025 13:05:03 -0800 Subject: [PATCH 3/8] cleanup --- .../lance/namespace/RestNamespaceTest.java | 3 +- python/python/tests/test_namespace_rest.py | 13 +++---- python/src/namespace.rs | 24 ------------- .../lance-namespace-impls/src/rest_adapter.rs | 35 +++++-------------- 4 files changed, 14 insertions(+), 61 deletions(-) diff --git a/java/src/test/java/org/lance/namespace/RestNamespaceTest.java b/java/src/test/java/org/lance/namespace/RestNamespaceTest.java index 3e861de44e4..1845d51eaaa 100644 --- a/java/src/test/java/org/lance/namespace/RestNamespaceTest.java +++ b/java/src/test/java/org/lance/namespace/RestNamespaceTest.java @@ -57,8 +57,7 @@ public class RestNamespaceTest { void setUp() { allocator = new RootAllocator(Long.MAX_VALUE); - // Use a random port to avoid conflicts - port = 4000 + new Random().nextInt(10000); + port = 10000 + new Random().nextInt(10000); // Create backend configuration for DirectoryNamespace Map backendConfig = new HashMap<>(); diff --git a/python/python/tests/test_namespace_rest.py b/python/python/tests/test_namespace_rest.py index 327ca0ab35f..fe020c0c389 100644 --- a/python/python/tests/test_namespace_rest.py +++ b/python/python/tests/test_namespace_rest.py @@ -72,16 +72,11 @@ def rest_namespace(): unique_id = uuid.uuid4().hex[:8] with tempfile.TemporaryDirectory() as tmpdir: backend_config = {"root": tmpdir} - port = 4000 + hash(unique_id) % 10000 + port = 10000 + hash(unique_id) % 10000 - print(f"\n[FIXTURE] Creating RestAdapter with tmpdir={tmpdir}, port={port}") with lance.namespace.RestAdapter("dir", backend_config, port=port): - print(f"[FIXTURE] RestAdapter context entered, creating client") - # Use lance.namespace.connect() for consistency client = connect("rest", {"uri": f"http://127.0.0.1:{port}"}) - print(f"[FIXTURE] Client created, yielding") yield client - print(f"[FIXTURE] Test completed, cleaning up") class TestCreateTable: @@ -662,7 +657,8 @@ def test_connect_with_rest(self): unique_id = uuid.uuid4().hex[:8] with tempfile.TemporaryDirectory() as tmpdir: backend_config = {"root": tmpdir} - port = 4000 + hash(unique_id) % 10000 + # Use port above 10000 to avoid conflicts with LocalStack (4566) + port = 10000 + hash(unique_id) % 10000 with lance.namespace.RestAdapter("dir", backend_config, port=port): # Connect via lance.namespace.connect @@ -690,7 +686,8 @@ def test_connect_with_custom_delimiter(self): unique_id = uuid.uuid4().hex[:8] with tempfile.TemporaryDirectory() as tmpdir: backend_config = {"root": tmpdir} - port = 4000 + hash(unique_id) % 10000 + # Use port above 10000 to avoid conflicts with LocalStack (4566) + port = 10000 + hash(unique_id) % 10000 with lance.namespace.RestAdapter("dir", backend_config, port=port): # Connect with custom delimiter diff --git a/python/src/namespace.rs b/python/src/namespace.rs index b644f9c2ec2..c1ec45015e3 100644 --- a/python/src/namespace.rs +++ b/python/src/namespace.rs @@ -372,35 +372,24 @@ impl PyRestAdapter { host: String, port: u16, ) -> PyResult { - log::info!( - "PyRestAdapter::new() creating backend with impl={}, host={}, port={}", - namespace_impl, - host, - port - ); let mut props = HashMap::new(); if let Some(dict) = namespace_properties { props = dict_to_hashmap(dict)?; } - log::info!("PyRestAdapter::new() properties={:?}", props); - // Use ConnectBuilder to build namespace from impl and properties let mut builder = ConnectBuilder::new(namespace_impl); for (k, v) in props { builder = builder.property(k, v); } - // Add session if provided if let Some(sess) = session { builder = builder.session(sess.borrow().inner.clone()); } - log::info!("PyRestAdapter::new() calling builder.connect()"); let backend = crate::rt() .block_on(None, builder.connect())? .infer_error()?; - log::info!("PyRestAdapter::new() backend created successfully"); let config = RestAdapterConfig { host, port }; @@ -413,33 +402,20 @@ impl PyRestAdapter { /// Start the REST server in the background fn serve(&mut self, py: Python) -> PyResult<()> { - log::info!( - "PyRestAdapter::serve() starting server on {}:{}", - self.config.host, - self.config.port - ); - let adapter = RestAdapter::new(self.backend.clone(), self.config.clone()); - - // Start the server - this binds the port and returns immediately - // If binding fails, an error is returned immediately let handle = crate::rt() .block_on(Some(py), adapter.start())? .infer_error()?; self.handle = Some(handle); - log::info!("PyRestAdapter::serve() server started successfully"); Ok(()) } /// Stop the REST server fn stop(&mut self) { if let Some(handle) = self.handle.take() { - log::info!("PyRestAdapter::stop() shutting down server"); handle.shutdown(); - // Give server time to shutdown gracefully std::thread::sleep(std::time::Duration::from_millis(100)); - log::info!("PyRestAdapter::stop() server shutdown complete"); } } diff --git a/rust/lance-namespace-impls/src/rest_adapter.rs b/rust/lance-namespace-impls/src/rest_adapter.rs index 8c3c59cdc40..b0e963a8f19 100644 --- a/rust/lance-namespace-impls/src/rest_adapter.rs +++ b/rust/lance-namespace-impls/src/rest_adapter.rs @@ -86,22 +86,14 @@ impl RestAdapter { /// Returns an error immediately if the server fails to bind to the address. pub async fn start(self) -> Result { let addr = format!("{}:{}", self.config.host, self.config.port); - log::info!("RestAdapter::start() binding to {}", addr); - let listener = tokio::net::TcpListener::bind(&addr) - .await - .map_err(|e| { - log::error!("RestAdapter::start() failed to bind to {}: {}", addr, e); - Error::IO { - source: Box::new(e), - location: snafu::location!(), - } - })?; - - log::info!( - "RestAdapter::start() successfully bound to {}, starting server", - addr - ); + let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| { + log::error!("RestAdapter::start() failed to bind to {}: {}", addr, e); + Error::IO { + source: Box::new(e), + location: snafu::location!(), + } + })?; let (shutdown_tx, mut shutdown_rx) = watch::channel(false); let router = self.router(); @@ -110,14 +102,11 @@ impl RestAdapter { let result = axum::serve(listener, router) .with_graceful_shutdown(async move { let _ = shutdown_rx.changed().await; - log::info!("RestAdapter: received shutdown signal"); }) .await; if let Err(e) = result { log::error!("RestAdapter: server error: {}", e); - } else { - log::info!("RestAdapter: shut down gracefully"); } }); @@ -138,7 +127,6 @@ impl RestAdapterHandle { /// This signals the server to stop accepting new connections and wait for /// existing connections to complete. pub fn shutdown(&self) { - log::info!("RestAdapterHandle::shutdown() sending shutdown signal"); let _ = self.shutdown_tx.send(true); } } @@ -237,16 +225,9 @@ async fn create_namespace( Json(mut request): Json, ) -> Response { request.id = Some(parse_id(&id, params.delimiter.as_deref())); - log::info!( - "REST create_namespace: received request for id={:?}", - request.id - ); match backend.create_namespace(request).await { - Ok(response) => { - log::info!("REST create_namespace: success"); - (StatusCode::CREATED, Json(response)).into_response() - } + Ok(response) => (StatusCode::CREATED, Json(response)).into_response(), Err(e) => { log::error!("REST create_namespace: error={:?}", e); error_to_response(e) From 256cc5327a961b1e22c23a1820ec370eafe3e284 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Mon, 1 Dec 2025 13:13:23 -0800 Subject: [PATCH 4/8] cleanup2 --- java/lance-jni/src/namespace.rs | 7 ---- python/python/tests/test_namespace_rest.py | 9 ----- python/src/namespace.rs | 2 + .../lance-namespace-impls/src/dir/manifest.rs | 38 +++++-------------- .../lance-namespace-impls/src/rest_adapter.rs | 5 +-- 5 files changed, 12 insertions(+), 49 deletions(-) diff --git a/java/lance-jni/src/namespace.rs b/java/lance-jni/src/namespace.rs index e70c4df9774..16843dbdafc 100644 --- a/java/lance-jni/src/namespace.rs +++ b/java/lance-jni/src/namespace.rs @@ -1297,15 +1297,9 @@ pub extern "system" fn Java_org_lance_namespace_RestAdapter_serve( fn serve_internal(handle: jlong) -> Result<()> { let adapter = unsafe { &mut *(handle as *mut BlockingRestAdapter) }; - let rest_adapter = RestAdapter::new(adapter.backend.clone(), adapter.config.clone()); - - // Start server - this binds the port and returns immediately - // If binding fails, an error is returned immediately let server_handle = RT.block_on(rest_adapter.start())?; - adapter.server_handle = Some(server_handle); - Ok(()) } @@ -1319,7 +1313,6 @@ pub extern "system" fn Java_org_lance_namespace_RestAdapter_stop( if let Some(server_handle) = adapter.server_handle.take() { server_handle.shutdown(); - // Give server time to shutdown gracefully std::thread::sleep(std::time::Duration::from_millis(100)); } } diff --git a/python/python/tests/test_namespace_rest.py b/python/python/tests/test_namespace_rest.py index fe020c0c389..4f9436086d4 100644 --- a/python/python/tests/test_namespace_rest.py +++ b/python/python/tests/test_namespace_rest.py @@ -11,8 +11,6 @@ DirectoryNamespace and RestNamespace implementations. """ -import logging -import os import tempfile import uuid @@ -20,13 +18,6 @@ import pyarrow as pa import pytest -# Enable debug logging for lance if LANCE_LOG env var is set -if os.environ.get("LANCE_LOG"): - logging.basicConfig( - level=logging.DEBUG, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - ) - logging.getLogger("lance").setLevel(logging.DEBUG) from lance_namespace import ( CreateEmptyTableRequest, CreateNamespaceRequest, diff --git a/python/src/namespace.rs b/python/src/namespace.rs index c1ec45015e3..70be75c46a8 100644 --- a/python/src/namespace.rs +++ b/python/src/namespace.rs @@ -378,11 +378,13 @@ impl PyRestAdapter { props = dict_to_hashmap(dict)?; } + // Use ConnectBuilder to build namespace from impl and properties let mut builder = ConnectBuilder::new(namespace_impl); for (k, v) in props { builder = builder.property(k, v); } + // Add session if provided if let Some(sess) = session { builder = builder.session(sess.borrow().inner.clone()); } diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index d2d4a9ceec3..ddc934d79ee 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -147,30 +147,22 @@ impl DatasetConsistencyWrapper { let dataset_uri = read_guard.uri().to_string(); let current_version = read_guard.version().version; log::debug!( - "DatasetConsistencyWrapper::reload() starting for uri={}, current_version={}", + "Reload starting for uri={}, current_version={}", dataset_uri, current_version ); let latest_version = read_guard .latest_version_id() .await - .map_err(|e| { - log::error!( - "DatasetConsistencyWrapper::reload() failed to get latest version for uri={}, current_version={}, error={}", - dataset_uri, - current_version, + .map_err(|e| Error::IO { + source: box_error(std::io::Error::other(format!( + "Failed to get latest version: {}", e - ); - Error::IO { - source: box_error(std::io::Error::other(format!( - "Failed to get latest version: {}", - e - ))), - location: location!(), - } + ))), + location: location!(), })?; log::debug!( - "DatasetConsistencyWrapper::reload() got latest_version={} for uri={}, current_version={}", + "Reload got latest_version={} for uri={}, current_version={}", latest_version, dataset_uri, current_version @@ -179,10 +171,7 @@ impl DatasetConsistencyWrapper { // If already up-to-date, return early if latest_version == current_version { - log::debug!( - "DatasetConsistencyWrapper::reload() already up-to-date for uri={}", - dataset_uri - ); + log::debug!("Already up-to-date for uri={}", dataset_uri); return Ok(()); } @@ -960,10 +949,7 @@ impl ManifestNamespace { session: Option>, ) -> Result { let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME); - log::info!( - "create_or_get_manifest: attempting to load manifest from {}", - manifest_path - ); + log::debug!("Attempting to load manifest from {}", manifest_path); let mut builder = DatasetBuilder::from_uri(&manifest_path); if let Some(sess) = session.clone() { @@ -975,12 +961,6 @@ impl ManifestNamespace { } let dataset_result = builder.load().await; - log::info!( - "create_or_get_manifest: load result for {} is {}", - manifest_path, - if dataset_result.is_ok() { "Ok" } else { "Err" } - ); - if let Ok(dataset) = dataset_result { Ok(DatasetConsistencyWrapper::new(dataset)) } else { diff --git a/rust/lance-namespace-impls/src/rest_adapter.rs b/rust/lance-namespace-impls/src/rest_adapter.rs index b0e963a8f19..deb9fa34b78 100644 --- a/rust/lance-namespace-impls/src/rest_adapter.rs +++ b/rust/lance-namespace-impls/src/rest_adapter.rs @@ -228,10 +228,7 @@ async fn create_namespace( match backend.create_namespace(request).await { Ok(response) => (StatusCode::CREATED, Json(response)).into_response(), - Err(e) => { - log::error!("REST create_namespace: error={:?}", e); - error_to_response(e) - } + Err(e) => error_to_response(e), } } From fdbd49c926cdc60a1543f2a5bca35bb01b696a61 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Mon, 1 Dec 2025 13:14:01 -0800 Subject: [PATCH 5/8] workflow --- .github/workflows/run_tests/action.yml | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/.github/workflows/run_tests/action.yml b/.github/workflows/run_tests/action.yml index 1d81cb021b0..14c4b3d6f46 100644 --- a/.github/workflows/run_tests/action.yml +++ b/.github/workflows/run_tests/action.yml @@ -24,14 +24,7 @@ runs: run: | # Install cpu only pytorch pip install torch --index-url https://download.pytorch.org/whl/cpu - # - name: Run python tests - # shell: bash - # working-directory: python - # run: make test - - name: Run namespace tests with logging (for flaky test debugging) + - name: Run python tests shell: bash working-directory: python - run: pytest python/tests/test_namespace_rest.py python/tests/test_namespace_dir.py -v -s - env: - LANCE_LOG: debug - RUST_LOG: lance_namespace_impls=info + run: make test From ce725c8ec440294d4f3651a0131585989ef77bce Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Mon, 1 Dec 2025 13:30:57 -0800 Subject: [PATCH 6/8] lint --- python/python/tests/test_namespace_rest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/python/tests/test_namespace_rest.py b/python/python/tests/test_namespace_rest.py index 4f9436086d4..f96de0f3242 100644 --- a/python/python/tests/test_namespace_rest.py +++ b/python/python/tests/test_namespace_rest.py @@ -17,7 +17,6 @@ import lance.namespace import pyarrow as pa import pytest - from lance_namespace import ( CreateEmptyTableRequest, CreateNamespaceRequest, From 038ff859882c5e6c99cbb497228f3533bc78b7e9 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Mon, 1 Dec 2025 17:54:10 -0800 Subject: [PATCH 7/8] address comments --- java/lance-jni/src/namespace.rs | 15 +- .../java/org/lance/namespace/RestAdapter.java | 34 +++- .../lance/namespace/RestNamespaceTest.java | 8 +- python/python/lance/namespace.py | 22 ++- python/python/tests/test_namespace_rest.py | 29 +--- python/src/namespace.rs | 16 +- .../lance-namespace-impls/src/rest_adapter.rs | 157 +++++++++++------- 7 files changed, 173 insertions(+), 108 deletions(-) diff --git a/java/lance-jni/src/namespace.rs b/java/lance-jni/src/namespace.rs index 16843dbdafc..9f6b879f513 100644 --- a/java/lance-jni/src/namespace.rs +++ b/java/lance-jni/src/namespace.rs @@ -1303,6 +1303,20 @@ fn serve_internal(handle: jlong) -> Result<()> { Ok(()) } +#[no_mangle] +pub extern "system" fn Java_org_lance_namespace_RestAdapter_getPort( + _env: JNIEnv, + _obj: JObject, + handle: jlong, +) -> jni::sys::jint { + let adapter = unsafe { &*(handle as *const BlockingRestAdapter) }; + adapter + .server_handle + .as_ref() + .map(|h| h.port() as jni::sys::jint) + .unwrap_or(0) +} + #[no_mangle] pub extern "system" fn Java_org_lance_namespace_RestAdapter_stop( _env: JNIEnv, @@ -1313,7 +1327,6 @@ pub extern "system" fn Java_org_lance_namespace_RestAdapter_stop( if let Some(server_handle) = adapter.server_handle.take() { server_handle.shutdown(); - std::thread::sleep(std::time::Duration::from_millis(100)); } } diff --git a/java/src/main/java/org/lance/namespace/RestAdapter.java b/java/src/main/java/org/lance/namespace/RestAdapter.java index 7004994c97f..a8da5ed9904 100644 --- a/java/src/main/java/org/lance/namespace/RestAdapter.java +++ b/java/src/main/java/org/lance/namespace/RestAdapter.java @@ -30,12 +30,16 @@ * Map backendConfig = new HashMap<>(); * backendConfig.put("root", "/tmp/test-data"); * - * try (RestAdapter adapter = new RestAdapter("dir", backendConfig, "127.0.0.1", 8080)) { + * // Use port 0 to let OS assign an available port + * try (RestAdapter adapter = new RestAdapter("dir", backendConfig)) { * adapter.serve(); * + * // Get the actual port assigned by the OS + * int port = adapter.getPort(); + * * // Now you can connect with RestNamespace * Map clientConfig = new HashMap<>(); - * clientConfig.put("uri", "http://127.0.0.1:8080"); + * clientConfig.put("uri", "http://127.0.0.1:" + port); * RestNamespace client = new RestNamespace(); * client.initialize(clientConfig, allocator); * @@ -57,7 +61,7 @@ public class RestAdapter implements Closeable, AutoCloseable { * @param namespaceImpl The namespace implementation type (e.g., "dir" for DirectoryNamespace) * @param backendConfig Configuration properties for the backend namespace * @param host Host to bind the server to - * @param port Port to bind the server to + * @param port Port to bind the server to. Use 0 to let the OS assign an available port. */ public RestAdapter( String namespaceImpl, Map backendConfig, String host, int port) { @@ -70,21 +74,21 @@ public RestAdapter( if (host == null || host.isEmpty()) { throw new IllegalArgumentException("host cannot be null or empty"); } - if (port <= 0 || port > 65535) { - throw new IllegalArgumentException("port must be between 1 and 65535"); + if (port < 0 || port > 65535) { + throw new IllegalArgumentException("port must be between 0 and 65535"); } this.nativeRestAdapterHandle = createNative(namespaceImpl, backendConfig, host, port); } /** - * Creates a new REST adapter with default host (127.0.0.1) and port (2333). + * Creates a new REST adapter with default host (127.0.0.1) and port 0 (OS-assigned). * * @param namespaceImpl The namespace implementation type * @param backendConfig Configuration properties for the backend namespace */ public RestAdapter(String namespaceImpl, Map backendConfig) { - this(namespaceImpl, backendConfig, "127.0.0.1", 2333); + this(namespaceImpl, backendConfig, "127.0.0.1", 0); } /** @@ -105,6 +109,20 @@ public void serve() { serverStarted = true; } + /** + * Get the actual port the server is listening on. + * + *

This is useful when port 0 was specified to get an OS-assigned port. + * + * @return The actual port, or 0 if the server hasn't been started + */ + public int getPort() { + if (nativeRestAdapterHandle == 0) { + return 0; + } + return getPort(nativeRestAdapterHandle); + } + /** * Stop the REST server. * @@ -132,6 +150,8 @@ private native long createNative( private native void serve(long handle); + private native int getPort(long handle); + private native void stop(long handle); private native void releaseNative(long handle); diff --git a/java/src/test/java/org/lance/namespace/RestNamespaceTest.java b/java/src/test/java/org/lance/namespace/RestNamespaceTest.java index 1845d51eaaa..6f6ca6079e1 100644 --- a/java/src/test/java/org/lance/namespace/RestNamespaceTest.java +++ b/java/src/test/java/org/lance/namespace/RestNamespaceTest.java @@ -35,7 +35,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import java.util.Random; import static org.junit.jupiter.api.Assertions.*; @@ -57,15 +56,14 @@ public class RestNamespaceTest { void setUp() { allocator = new RootAllocator(Long.MAX_VALUE); - port = 10000 + new Random().nextInt(10000); - // Create backend configuration for DirectoryNamespace Map backendConfig = new HashMap<>(); backendConfig.put("root", tempDir.toString()); - // Create and start REST adapter - adapter = new RestAdapter("dir", backendConfig, "127.0.0.1", port); + // Create and start REST adapter (port 0 lets OS assign available port) + adapter = new RestAdapter("dir", backendConfig); adapter.serve(); + port = adapter.getPort(); // Create REST namespace client namespace = new RestNamespace(); diff --git a/python/python/lance/namespace.py b/python/python/lance/namespace.py index 4619f2ce244..f985f73fa2b 100644 --- a/python/python/lance/namespace.py +++ b/python/python/lance/namespace.py @@ -316,17 +316,18 @@ class RestAdapter: host : str, optional Host address to bind to, default "127.0.0.1" port : int, optional - Port to listen on, default 2333 + Port to listen on. Default 0 lets the OS assign an available port. + Use the `port` property after `serve()` to get the actual port. Examples -------- >>> import lance.namespace >>> - >>> # Start REST adapter with DirectoryNamespace backend + >>> # Start REST adapter with DirectoryNamespace backend (auto port) >>> namespace_config = {"root": "memory://test"} - >>> with lance.namespace.RestAdapter("dir", namespace_config, port=4001) as adapter: - ... # Create REST client - ... client = lance.namespace.RestNamespace(uri="http://127.0.0.1:4001") + >>> with lance.namespace.RestAdapter("dir", namespace_config) as adapter: + ... # Create REST client using the assigned port + ... client = lance.namespace.RestNamespace(uri=f"http://127.0.0.1:{adapter.port}") ... # Use the client... """ @@ -336,7 +337,7 @@ def __init__( namespace_properties: Dict[str, str] = None, session=None, host: str = "127.0.0.1", - port: int = 2333, + port: int = 0, ): if PyRestAdapter is None: raise RuntimeError( @@ -353,9 +354,16 @@ def __init__( # Create the underlying Rust adapter self._inner = PyRestAdapter(namespace_impl, str_properties, session, host, port) self.host = host - self.port = port self.namespace_impl = namespace_impl + @property + def port(self) -> int: + """Get the actual port the server is listening on. + + Returns 0 if the server hasn't been started yet. + """ + return self._inner.port + def serve(self): """Start the REST server in the background.""" self._inner.serve() diff --git a/python/python/tests/test_namespace_rest.py b/python/python/tests/test_namespace_rest.py index f96de0f3242..e83f090efbc 100644 --- a/python/python/tests/test_namespace_rest.py +++ b/python/python/tests/test_namespace_rest.py @@ -12,7 +12,6 @@ """ import tempfile -import uuid import lance.namespace import pyarrow as pa @@ -59,13 +58,11 @@ def table_to_ipc_bytes(table): @pytest.fixture def rest_namespace(): """Create a REST namespace with adapter for testing.""" - unique_id = uuid.uuid4().hex[:8] with tempfile.TemporaryDirectory() as tmpdir: backend_config = {"root": tmpdir} - port = 10000 + hash(unique_id) % 10000 - with lance.namespace.RestAdapter("dir", backend_config, port=port): - client = connect("rest", {"uri": f"http://127.0.0.1:{port}"}) + with lance.namespace.RestAdapter("dir", backend_config) as adapter: + client = connect("rest", {"uri": f"http://127.0.0.1:{adapter.port}"}) yield client @@ -644,28 +641,21 @@ class TestLanceNamespaceConnect: def test_connect_with_rest(self): """Test creating RestNamespace via lance.namespace.connect().""" - unique_id = uuid.uuid4().hex[:8] with tempfile.TemporaryDirectory() as tmpdir: backend_config = {"root": tmpdir} - # Use port above 10000 to avoid conflicts with LocalStack (4566) - port = 10000 + hash(unique_id) % 10000 - with lance.namespace.RestAdapter("dir", backend_config, port=port): - # Connect via lance.namespace.connect - properties = {"uri": f"http://127.0.0.1:{port}"} + with lance.namespace.RestAdapter("dir", backend_config) as adapter: + properties = {"uri": f"http://127.0.0.1:{adapter.port}"} ns = connect("rest", properties) - # Verify it's a RestNamespace instance assert isinstance(ns, lance.namespace.RestNamespace) - # Verify it works create_req = CreateTableRequest(id=["test_table"]) table_data = create_test_data() ipc_data = table_to_ipc_bytes(table_data) response = ns.create_table(create_req, ipc_data) assert response is not None - # Verify we can list the table list_req = ListTablesRequest(id=[]) list_response = ns.list_tables(list_req) assert len(list_response.tables) == 1 @@ -673,25 +663,18 @@ def test_connect_with_rest(self): def test_connect_with_custom_delimiter(self): """Test creating RestNamespace with custom delimiter via connect().""" - unique_id = uuid.uuid4().hex[:8] with tempfile.TemporaryDirectory() as tmpdir: backend_config = {"root": tmpdir} - # Use port above 10000 to avoid conflicts with LocalStack (4566) - port = 10000 + hash(unique_id) % 10000 - with lance.namespace.RestAdapter("dir", backend_config, port=port): - # Connect with custom delimiter - # Use URL-friendly delimiter instead of default '$' + with lance.namespace.RestAdapter("dir", backend_config) as adapter: properties = { - "uri": f"http://127.0.0.1:{port}", + "uri": f"http://127.0.0.1:{adapter.port}", "delimiter": "@", } ns = connect("rest", properties) - # Verify it's a RestNamespace instance assert isinstance(ns, lance.namespace.RestNamespace) - # This should work without errors create_req = CreateTableRequest(id=["test_table"]) table_data = create_test_data() ipc_data = table_to_ipc_bytes(table_data) diff --git a/python/src/namespace.rs b/python/src/namespace.rs index 70be75c46a8..9b695da185a 100644 --- a/python/src/namespace.rs +++ b/python/src/namespace.rs @@ -362,9 +362,11 @@ pub struct PyRestAdapter { #[cfg(feature = "rest-adapter")] #[pymethods] impl PyRestAdapter { - /// Create a new REST adapter server with namespace configuration + /// Create a new REST adapter server with namespace configuration. + /// If port is 0 (default), the OS will assign an available ephemeral port. + /// Use `port` property after `serve()` to get the actual port. #[new] - #[pyo3(signature = (namespace_impl, namespace_properties, session = None, host = "127.0.0.1".to_string(), port = 2333))] + #[pyo3(signature = (namespace_impl, namespace_properties, session = None, host = "127.0.0.1".to_string(), port = 0))] fn new( namespace_impl: String, namespace_properties: Option<&Bound<'_, PyDict>>, @@ -378,13 +380,11 @@ impl PyRestAdapter { props = dict_to_hashmap(dict)?; } - // Use ConnectBuilder to build namespace from impl and properties let mut builder = ConnectBuilder::new(namespace_impl); for (k, v) in props { builder = builder.property(k, v); } - // Add session if provided if let Some(sess) = session { builder = builder.session(sess.borrow().inner.clone()); } @@ -402,6 +402,13 @@ impl PyRestAdapter { }) } + /// Get the actual port the server is listening on. + /// Returns 0 if server is not started yet. + #[getter] + fn port(&self) -> u16 { + self.handle.as_ref().map(|h| h.port()).unwrap_or(0) + } + /// Start the REST server in the background fn serve(&mut self, py: Python) -> PyResult<()> { let adapter = RestAdapter::new(self.backend.clone(), self.config.clone()); @@ -417,7 +424,6 @@ impl PyRestAdapter { fn stop(&mut self) { if let Some(handle) = self.handle.take() { handle.shutdown(); - std::thread::sleep(std::time::Duration::from_millis(100)); } } diff --git a/rust/lance-namespace-impls/src/rest_adapter.rs b/rust/lance-namespace-impls/src/rest_adapter.rs index deb9fa34b78..e93a391c0a3 100644 --- a/rust/lance-namespace-impls/src/rest_adapter.rs +++ b/rust/lance-namespace-impls/src/rest_adapter.rs @@ -38,7 +38,7 @@ impl Default for RestAdapterConfig { fn default() -> Self { Self { host: "127.0.0.1".to_string(), - port: 2333, + port: 0, } } } @@ -84,6 +84,8 @@ impl RestAdapter { /// the server. /// /// Returns an error immediately if the server fails to bind to the address. + /// If port 0 is specified, the OS will assign an available ephemeral port. + /// The actual port can be retrieved from the returned handle via `port()`. pub async fn start(self) -> Result { let addr = format!("{}:{}", self.config.host, self.config.port); @@ -95,7 +97,11 @@ impl RestAdapter { } })?; + // Get the actual port (important when port 0 was specified) + let actual_port = listener.local_addr().map(|a| a.port()).unwrap_or(0); + let (shutdown_tx, mut shutdown_rx) = watch::channel(false); + let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>(); let router = self.router(); tokio::spawn(async move { @@ -108,9 +114,16 @@ impl RestAdapter { if let Err(e) = result { log::error!("RestAdapter: server error: {}", e); } + + // Signal that server has shut down + let _ = done_tx.send(()); }); - Ok(RestAdapterHandle { shutdown_tx }) + Ok(RestAdapterHandle { + shutdown_tx, + done_rx: std::sync::Mutex::new(Some(done_rx)), + port: actual_port, + }) } } @@ -119,15 +132,39 @@ impl RestAdapter { /// Use this handle to gracefully shut down the server when it's no longer needed. pub struct RestAdapterHandle { shutdown_tx: watch::Sender, + done_rx: std::sync::Mutex>>, + port: u16, } impl RestAdapterHandle { - /// Gracefully shut down the server. + /// Get the actual port the server is listening on. + /// This is useful when port 0 was specified to get an OS-assigned port. + pub fn port(&self) -> u16 { + self.port + } + + /// Gracefully shut down the server and wait for it to complete. /// - /// This signals the server to stop accepting new connections and wait for - /// existing connections to complete. + /// This signals the server to stop accepting new connections, waits for + /// existing connections to complete, and blocks until the server has + /// fully shut down. pub fn shutdown(&self) { + // Send shutdown signal let _ = self.shutdown_tx.send(true); + + // Wait for server to complete + if let Some(done_rx) = self.done_rx.lock().unwrap().take() { + // Use a new runtime to block on the oneshot receiver + // This is needed because shutdown() is called from sync context + let _ = std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let _ = rt.block_on(done_rx); + }) + .join(); + } } } @@ -519,7 +556,7 @@ mod tests { } impl RestServerFixture { - async fn new(port: u16) -> Self { + async fn new() -> Self { let temp_dir = TempDir::new().unwrap(); let temp_path = temp_dir.path().to_str().unwrap().to_string(); @@ -531,17 +568,17 @@ mod tests { .unwrap(); let backend = Arc::new(backend); - // Start REST server - let config = RestAdapterConfig { - host: "127.0.0.1".to_string(), - port, - }; + // Start REST server with port 0 (OS assigns available port) + let config = RestAdapterConfig::default(); let server = RestAdapter::new(backend.clone(), config); let server_handle = server.start().await.unwrap(); + // Get the actual port assigned by OS + let actual_port = server_handle.port(); + // Create RestNamespace client - let server_url = format!("http://127.0.0.1:{}", port); + let server_url = format!("http://127.0.0.1:{}", actual_port); let namespace = RestNamespaceBuilder::new(&server_url) .delimiter("$") .build(); @@ -591,9 +628,9 @@ mod tests { Bytes::from(buffer) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_create_and_list_child_namespaces() { - let fixture = RestServerFixture::new(4001).await; + let fixture = RestServerFixture::new().await; // Create child namespaces for i in 1..=3 { @@ -621,9 +658,9 @@ mod tests { assert!(namespaces.namespaces.contains(&"namespace3".to_string())); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_nested_namespace_hierarchy() { - let fixture = RestServerFixture::new(4002).await; + let fixture = RestServerFixture::new().await; // Create parent namespace let create_req = CreateNamespaceRequest { @@ -674,9 +711,9 @@ mod tests { assert!(children.contains(&"child2".to_string())); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_create_table_in_child_namespace() { - let fixture = RestServerFixture::new(4003).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace first @@ -727,9 +764,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_list_tables_in_child_namespace() { - let fixture = RestServerFixture::new(4004).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -774,9 +811,9 @@ mod tests { assert!(tables.tables.contains(&"table3".to_string())); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_table_exists_in_child_namespace() { - let fixture = RestServerFixture::new(4005).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -811,9 +848,9 @@ mod tests { assert!(result.is_ok(), "Table should exist in child namespace"); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_empty_table_exists_in_child_namespace() { - let fixture = RestServerFixture::new(4015).await; + let fixture = RestServerFixture::new().await; // Create child namespace let create_ns_req = CreateNamespaceRequest { @@ -846,9 +883,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_describe_table_in_child_namespace() { - let fixture = RestServerFixture::new(4006).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -936,9 +973,9 @@ mod tests { } } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_drop_table_in_child_namespace() { - let fixture = RestServerFixture::new(4007).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -986,9 +1023,9 @@ mod tests { // (error message varies depending on implementation details) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_create_empty_table_in_child_namespace() { - let fixture = RestServerFixture::new(4008).await; + let fixture = RestServerFixture::new().await; // Create child namespace let create_ns_req = CreateNamespaceRequest { @@ -1040,9 +1077,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_describe_empty_table_in_child_namespace() { - let fixture = RestServerFixture::new(4016).await; + let fixture = RestServerFixture::new().await; // Create child namespace let create_ns_req = CreateNamespaceRequest { @@ -1095,9 +1132,9 @@ mod tests { // (schema is None until data is added) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_drop_empty_table_in_child_namespace() { - let fixture = RestServerFixture::new(4017).await; + let fixture = RestServerFixture::new().await; // Create child namespace let create_ns_req = CreateNamespaceRequest { @@ -1140,9 +1177,9 @@ mod tests { // (error message varies depending on implementation details) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_deeply_nested_namespace_with_empty_table() { - let fixture = RestServerFixture::new(4018).await; + let fixture = RestServerFixture::new().await; // Create deeply nested namespace hierarchy let create_req = CreateNamespaceRequest { @@ -1213,9 +1250,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_deeply_nested_namespace_with_table() { - let fixture = RestServerFixture::new(4009).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create deeply nested namespace hierarchy @@ -1294,9 +1331,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_namespace_isolation() { - let fixture = RestServerFixture::new(4010).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create two sibling namespaces @@ -1369,9 +1406,9 @@ mod tests { assert!(fixture.namespace.table_exists(exists_req).await.is_ok()); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_drop_namespace_with_tables_fails() { - let fixture = RestServerFixture::new(4011).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create namespace @@ -1415,9 +1452,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_drop_empty_child_namespace() { - let fixture = RestServerFixture::new(4012).await; + let fixture = RestServerFixture::new().await; // Create namespace let create_ns_req = CreateNamespaceRequest { @@ -1450,9 +1487,9 @@ mod tests { // (error message varies depending on implementation details) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_namespace_with_properties() { - let fixture = RestServerFixture::new(4013).await; + let fixture = RestServerFixture::new().await; // Create namespace with properties let mut properties = std::collections::HashMap::new(); @@ -1483,9 +1520,9 @@ mod tests { assert_eq!(props.get("environment"), Some(&"production".to_string())); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_root_namespace_operations() { - let fixture = RestServerFixture::new(4014).await; + let fixture = RestServerFixture::new().await; // Root namespace should always exist let exists_req = NamespaceExistsRequest { id: Some(vec![]) }; @@ -1520,9 +1557,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_register_table() { - let fixture = RestServerFixture::new(4019).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -1584,9 +1621,9 @@ mod tests { assert!(result.is_ok(), "Registered table should exist"); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_register_table_rejects_absolute_uri() { - let fixture = RestServerFixture::new(4020).await; + let fixture = RestServerFixture::new().await; // Create child namespace let create_ns_req = CreateNamespaceRequest { @@ -1618,9 +1655,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_register_table_rejects_path_traversal() { - let fixture = RestServerFixture::new(4021).await; + let fixture = RestServerFixture::new().await; // Create child namespace let create_ns_req = CreateNamespaceRequest { @@ -1652,9 +1689,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_deregister_table() { - let fixture = RestServerFixture::new(4022).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -1729,9 +1766,9 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_register_deregister_round_trip() { - let fixture = RestServerFixture::new(4023).await; + let fixture = RestServerFixture::new().await; let table_data = create_test_arrow_data(); // Create child namespace @@ -1842,7 +1879,7 @@ mod tests { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_namespace_write() { use arrow::array::Int32Array; use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema}; @@ -1850,7 +1887,7 @@ mod tests { use lance::dataset::{Dataset, WriteMode, WriteParams}; use lance_namespace::LanceNamespace; - let fixture = RestServerFixture::new(4024).await; + let fixture = RestServerFixture::new().await; let namespace = Arc::new(fixture.namespace.clone()) as Arc; // Use child namespace instead of root From 80de0da7a5b34be6b5ec2c31b3bdb5d1cbc902a6 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 2 Dec 2025 11:06:31 -0800 Subject: [PATCH 8/8] cleanup --- java/lance-jni/src/namespace.rs | 31 ++++++++++++------- .../java/org/lance/namespace/RestAdapter.java | 26 +++++++--------- .../lance/namespace/RestNamespaceTest.java | 4 +-- python/python/lance/namespace.py | 17 +++++----- python/python/tests/test_namespace_rest.py | 6 ++-- python/src/namespace.rs | 20 +++++++----- .../lance-namespace-impls/src/rest_adapter.rs | 7 +++-- 7 files changed, 64 insertions(+), 47 deletions(-) diff --git a/java/lance-jni/src/namespace.rs b/java/lance-jni/src/namespace.rs index 9f6b879f513..d197c2b594b 100644 --- a/java/lance-jni/src/namespace.rs +++ b/java/lance-jni/src/namespace.rs @@ -1235,7 +1235,7 @@ pub extern "system" fn Java_org_lance_namespace_RestAdapter_createNative( namespace_impl: JString, properties_map: JObject, host: JString, - port: jni::sys::jint, + port: JObject, ) -> jlong { ok_or_throw_with_return!( env, @@ -1249,7 +1249,7 @@ fn create_rest_adapter_internal( namespace_impl: JString, properties_map: JObject, host: JString, - port: jni::sys::jint, + port: JObject, ) -> Result { // Get namespace implementation type let impl_str: String = env.get_string(&namespace_impl)?.into(); @@ -1268,13 +1268,22 @@ fn create_rest_adapter_internal( .block_on(builder.connect()) .map_err(|e| Error::runtime_error(format!("Failed to build backend namespace: {}", e)))?; - // Get host string - let host_str: String = env.get_string(&host)?.into(); + // Build config with defaults, overriding if values provided + let mut config = RestAdapterConfig::default(); - let config = RestAdapterConfig { - host: host_str, - port: port as u16, - }; + // Get host string if not null + if !host.is_null() { + config.host = env.get_string(&host)?.into(); + } + + // Get port if not null (Integer object) + if !port.is_null() { + let port_value = env + .call_method(&port, "intValue", "()I", &[])? + .i() + .map_err(|e| Error::runtime_error(format!("Failed to get port value: {}", e)))?; + config.port = port_value as u16; + } let adapter = BlockingRestAdapter { backend, @@ -1287,15 +1296,15 @@ fn create_rest_adapter_internal( } #[no_mangle] -pub extern "system" fn Java_org_lance_namespace_RestAdapter_serve( +pub extern "system" fn Java_org_lance_namespace_RestAdapter_start( mut env: JNIEnv, _obj: JObject, handle: jlong, ) { - ok_or_throw_without_return!(env, serve_internal(handle)) + ok_or_throw_without_return!(env, start_internal(handle)) } -fn serve_internal(handle: jlong) -> Result<()> { +fn start_internal(handle: jlong) -> Result<()> { let adapter = unsafe { &mut *(handle as *mut BlockingRestAdapter) }; let rest_adapter = RestAdapter::new(adapter.backend.clone(), adapter.config.clone()); let server_handle = RT.block_on(rest_adapter.start())?; diff --git a/java/src/main/java/org/lance/namespace/RestAdapter.java b/java/src/main/java/org/lance/namespace/RestAdapter.java index a8da5ed9904..534a7eabb9e 100644 --- a/java/src/main/java/org/lance/namespace/RestAdapter.java +++ b/java/src/main/java/org/lance/namespace/RestAdapter.java @@ -32,7 +32,7 @@ * * // Use port 0 to let OS assign an available port * try (RestAdapter adapter = new RestAdapter("dir", backendConfig)) { - * adapter.serve(); + * adapter.start(); * * // Get the actual port assigned by the OS * int port = adapter.getPort(); @@ -60,21 +60,19 @@ public class RestAdapter implements Closeable, AutoCloseable { * * @param namespaceImpl The namespace implementation type (e.g., "dir" for DirectoryNamespace) * @param backendConfig Configuration properties for the backend namespace - * @param host Host to bind the server to - * @param port Port to bind the server to. Use 0 to let the OS assign an available port. + * @param host Host to bind the server to, or null for default (127.0.0.1) + * @param port Port to bind the server to. Use 0 to let the OS assign an available port, or null + * for default (2333). */ public RestAdapter( - String namespaceImpl, Map backendConfig, String host, int port) { + String namespaceImpl, Map backendConfig, String host, Integer port) { if (namespaceImpl == null || namespaceImpl.isEmpty()) { throw new IllegalArgumentException("namespace implementation cannot be null or empty"); } if (backendConfig == null) { throw new IllegalArgumentException("backend config cannot be null"); } - if (host == null || host.isEmpty()) { - throw new IllegalArgumentException("host cannot be null or empty"); - } - if (port < 0 || port > 65535) { + if (port != null && (port < 0 || port > 65535)) { throw new IllegalArgumentException("port must be between 0 and 65535"); } @@ -82,13 +80,13 @@ public RestAdapter( } /** - * Creates a new REST adapter with default host (127.0.0.1) and port 0 (OS-assigned). + * Creates a new REST adapter with default host and port. * * @param namespaceImpl The namespace implementation type * @param backendConfig Configuration properties for the backend namespace */ public RestAdapter(String namespaceImpl, Map backendConfig) { - this(namespaceImpl, backendConfig, "127.0.0.1", 0); + this(namespaceImpl, backendConfig, null, null); } /** @@ -97,7 +95,7 @@ public RestAdapter(String namespaceImpl, Map backendConfig) { *

This method returns immediately after starting the server. The server runs in a background * thread until {@link #stop()} is called or the adapter is closed. */ - public void serve() { + public void start() { if (nativeRestAdapterHandle == 0) { throw new IllegalStateException("RestAdapter not initialized"); } @@ -105,7 +103,7 @@ public void serve() { throw new IllegalStateException("Server already started"); } - serve(nativeRestAdapterHandle); + start(nativeRestAdapterHandle); serverStarted = true; } @@ -146,9 +144,9 @@ public void close() { // Native methods private native long createNative( - String namespaceImpl, Map backendConfig, String host, int port); + String namespaceImpl, Map backendConfig, String host, Integer port); - private native void serve(long handle); + private native void start(long handle); private native int getPort(long handle); diff --git a/java/src/test/java/org/lance/namespace/RestNamespaceTest.java b/java/src/test/java/org/lance/namespace/RestNamespaceTest.java index 6f6ca6079e1..797ef5d6785 100644 --- a/java/src/test/java/org/lance/namespace/RestNamespaceTest.java +++ b/java/src/test/java/org/lance/namespace/RestNamespaceTest.java @@ -61,8 +61,8 @@ void setUp() { backendConfig.put("root", tempDir.toString()); // Create and start REST adapter (port 0 lets OS assign available port) - adapter = new RestAdapter("dir", backendConfig); - adapter.serve(); + adapter = new RestAdapter("dir", backendConfig, "127.0.0.1", 0); + adapter.start(); port = adapter.getPort(); // Create REST namespace client diff --git a/python/python/lance/namespace.py b/python/python/lance/namespace.py index f985f73fa2b..426c7176d74 100644 --- a/python/python/lance/namespace.py +++ b/python/python/lance/namespace.py @@ -314,10 +314,11 @@ class RestAdapter: session : Session, optional Lance session for sharing object store connections with the backend namespace. host : str, optional - Host address to bind to, default "127.0.0.1" + Host address to bind to. Default "127.0.0.1". port : int, optional - Port to listen on. Default 0 lets the OS assign an available port. - Use the `port` property after `serve()` to get the actual port. + Port to listen on. Default 2333 per REST spec. + Use 0 to let the OS assign an available ephemeral port. + Use the `port` property after `start()` to get the actual port. Examples -------- @@ -336,8 +337,8 @@ def __init__( namespace_impl: str, namespace_properties: Dict[str, str] = None, session=None, - host: str = "127.0.0.1", - port: int = 0, + host: str = None, + port: int = None, ): if PyRestAdapter is None: raise RuntimeError( @@ -364,9 +365,9 @@ def port(self) -> int: """ return self._inner.port - def serve(self): + def start(self): """Start the REST server in the background.""" - self._inner.serve() + self._inner.start() def stop(self): """Stop the REST server.""" @@ -374,7 +375,7 @@ def stop(self): def __enter__(self): """Start server when entering context.""" - self.serve() + self.start() return self def __exit__(self, exc_type, exc_value, traceback): diff --git a/python/python/tests/test_namespace_rest.py b/python/python/tests/test_namespace_rest.py index e83f090efbc..7fa3a65c5f1 100644 --- a/python/python/tests/test_namespace_rest.py +++ b/python/python/tests/test_namespace_rest.py @@ -61,7 +61,7 @@ def rest_namespace(): with tempfile.TemporaryDirectory() as tmpdir: backend_config = {"root": tmpdir} - with lance.namespace.RestAdapter("dir", backend_config) as adapter: + with lance.namespace.RestAdapter("dir", backend_config, port=0) as adapter: client = connect("rest", {"uri": f"http://127.0.0.1:{adapter.port}"}) yield client @@ -644,7 +644,7 @@ def test_connect_with_rest(self): with tempfile.TemporaryDirectory() as tmpdir: backend_config = {"root": tmpdir} - with lance.namespace.RestAdapter("dir", backend_config) as adapter: + with lance.namespace.RestAdapter("dir", backend_config, port=0) as adapter: properties = {"uri": f"http://127.0.0.1:{adapter.port}"} ns = connect("rest", properties) @@ -666,7 +666,7 @@ def test_connect_with_custom_delimiter(self): with tempfile.TemporaryDirectory() as tmpdir: backend_config = {"root": tmpdir} - with lance.namespace.RestAdapter("dir", backend_config) as adapter: + with lance.namespace.RestAdapter("dir", backend_config, port=0) as adapter: properties = { "uri": f"http://127.0.0.1:{adapter.port}", "delimiter": "@", diff --git a/python/src/namespace.rs b/python/src/namespace.rs index 9b695da185a..4ddf0fc76a4 100644 --- a/python/src/namespace.rs +++ b/python/src/namespace.rs @@ -363,16 +363,16 @@ pub struct PyRestAdapter { #[pymethods] impl PyRestAdapter { /// Create a new REST adapter server with namespace configuration. - /// If port is 0 (default), the OS will assign an available ephemeral port. - /// Use `port` property after `serve()` to get the actual port. + /// Default port is 2333 per REST spec. Use port 0 to let OS assign an ephemeral port. + /// Use `port` property after `start()` to get the actual port. #[new] - #[pyo3(signature = (namespace_impl, namespace_properties, session = None, host = "127.0.0.1".to_string(), port = 0))] + #[pyo3(signature = (namespace_impl, namespace_properties, session = None, host = None, port = None))] fn new( namespace_impl: String, namespace_properties: Option<&Bound<'_, PyDict>>, session: Option<&Bound<'_, Session>>, - host: String, - port: u16, + host: Option, + port: Option, ) -> PyResult { let mut props = HashMap::new(); @@ -393,7 +393,13 @@ impl PyRestAdapter { .block_on(None, builder.connect())? .infer_error()?; - let config = RestAdapterConfig { host, port }; + let mut config = RestAdapterConfig::default(); + if let Some(h) = host { + config.host = h; + } + if let Some(p) = port { + config.port = p; + } Ok(Self { backend, @@ -410,7 +416,7 @@ impl PyRestAdapter { } /// Start the REST server in the background - fn serve(&mut self, py: Python) -> PyResult<()> { + fn start(&mut self, py: Python) -> PyResult<()> { let adapter = RestAdapter::new(self.backend.clone(), self.config.clone()); let handle = crate::rt() .block_on(Some(py), adapter.start())? diff --git a/rust/lance-namespace-impls/src/rest_adapter.rs b/rust/lance-namespace-impls/src/rest_adapter.rs index e93a391c0a3..eae81d73ba6 100644 --- a/rust/lance-namespace-impls/src/rest_adapter.rs +++ b/rust/lance-namespace-impls/src/rest_adapter.rs @@ -38,7 +38,7 @@ impl Default for RestAdapterConfig { fn default() -> Self { Self { host: "127.0.0.1".to_string(), - port: 0, + port: 2333, } } } @@ -569,7 +569,10 @@ mod tests { let backend = Arc::new(backend); // Start REST server with port 0 (OS assigns available port) - let config = RestAdapterConfig::default(); + let config = RestAdapterConfig { + port: 0, + ..Default::default() + }; let server = RestAdapter::new(backend.clone(), config); let server_handle = server.start().await.unwrap();