diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 20ee0e5e707..acb207f3f15 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -602,14 +602,45 @@ impl Dataset { // Set up namespace commit handler if namespace and table_id are provided if let (Some(ns), Some(tid)) = (namespace, table_id) { // Extract the inner namespace Arc from PyDirectoryNamespace, PyRestNamespace, - // or create a PyLanceNamespace wrapper for custom Python implementations + // or create a PyLanceNamespace wrapper for custom Python implementations. + // + // Python wrapper classes (DirectoryNamespace, RestNamespace in namespace.py) + // store the PyO3 class in `_inner`. For the EXACT wrapper classes, we can + // use _inner directly. For subclasses (which may override methods), we must + // use PyLanceNamespace to call through Python. let ns_arc: Arc = if let Ok(dir_ns) = ns.downcast::() { - // Native DirectoryNamespace - use inner directly (bypass Python layer) + // Direct PyO3 class dir_ns.borrow().inner.clone() } else if let Ok(rest_ns) = ns.downcast::() { - // Native RestNamespace - use inner directly (bypass Python layer) + // Direct PyO3 class rest_ns.borrow().inner.clone() + } else if let Ok(inner) = ns.getattr("_inner") { + // Python wrapper class - check if it's the exact wrapper class + // (not a subclass) by comparing type names + let type_name = ns + .get_type() + .name() + .map(|n| n.to_string()) + .unwrap_or_default(); + + if type_name == "DirectoryNamespace" { + if let Ok(dir_ns) = inner.downcast::() { + dir_ns.borrow().inner.clone() + } else { + PyLanceNamespace::create_arc(py, ns)? + } + } else if type_name == "RestNamespace" { + if let Ok(rest_ns) = inner.downcast::() { + rest_ns.borrow().inner.clone() + } else { + PyLanceNamespace::create_arc(py, ns)? + } + } else { + // Subclass or custom implementation - use PyLanceNamespace + // to call through Python (requires *_json methods) + PyLanceNamespace::create_arc(py, ns)? + } } else { // Custom Python implementation - wrap with PyLanceNamespace // This calls back into Python for namespace methods @@ -3171,14 +3202,45 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult = if let Ok(dir_ns) = ns.downcast::() { - // Native DirectoryNamespace - use inner directly (bypass Python layer) + // Direct PyO3 class dir_ns.borrow().inner.clone() } else if let Ok(rest_ns) = ns.downcast::() { - // Native RestNamespace - use inner directly (bypass Python layer) + // Direct PyO3 class rest_ns.borrow().inner.clone() + } else if let Ok(inner) = ns.getattr("_inner") { + // Python wrapper class - check if it's the exact wrapper class + // (not a subclass) by comparing type names + let type_name = ns + .get_type() + .name() + .map(|n| n.to_string()) + .unwrap_or_default(); + + if type_name == "DirectoryNamespace" { + if let Ok(dir_ns) = inner.downcast::() { + dir_ns.borrow().inner.clone() + } else { + PyLanceNamespace::create_arc(py, &ns)? + } + } else if type_name == "RestNamespace" { + if let Ok(rest_ns) = inner.downcast::() { + rest_ns.borrow().inner.clone() + } else { + PyLanceNamespace::create_arc(py, &ns)? + } + } else { + // Subclass or custom implementation - use PyLanceNamespace + // to call through Python (requires *_json methods) + PyLanceNamespace::create_arc(py, &ns)? + } } else { // Custom Python implementation - wrap with PyLanceNamespace PyLanceNamespace::create_arc(py, &ns)? diff --git a/python/src/file.rs b/python/src/file.rs index 85a5818037e..bf5e68e8876 100644 --- a/python/src/file.rs +++ b/python/src/file.rs @@ -31,6 +31,7 @@ use lance_file::{version::LanceFileVersion, LanceEncodingsIo}; use lance_io::object_store::ObjectStoreParams; use lance_io::{ scheduler::{ScanScheduler, SchedulerConfig}, + traits::Writer, utils::CachedFileSize, ReadBatchParams, }; @@ -577,8 +578,7 @@ impl LanceFileSession { tokio::io::copy(&mut reader, &mut writer) .await .map_err(|e| PyIOError::new_err(format!("Failed to upload file: {}", e)))?; - writer - .shutdown() + Writer::shutdown(writer.as_mut()) .await .map_err(|e| PyIOError::new_err(format!("Failed to finalize upload: {}", e)))?; diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 88f92d1395d..3b8a398664d 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -13,6 +13,7 @@ use arrow_ipc::reader::StreamReader; use async_trait::async_trait; use bytes::Bytes; use futures::TryStreamExt; +use lance::dataset::builder::DatasetBuilder; use lance::dataset::{Dataset, WriteParams}; use lance::session::Session; use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; @@ -1148,7 +1149,15 @@ impl LanceNamespace for DirectoryNamespace { } // Try to load the dataset to get real information - match Dataset::open(&table_uri).await { + // Use DatasetBuilder with storage options to support S3 with custom endpoints + let mut builder = DatasetBuilder::from_uri(&table_uri); + if let Some(opts) = &self.storage_options { + builder = builder.with_storage_options(opts.clone()); + } + if let Some(sess) = &self.session { + builder = builder.with_session(sess.clone()); + } + match builder.load().await { Ok(mut dataset) => { // If a specific version is requested, checkout that version if let Some(requested_version) = request.version { @@ -1788,12 +1797,18 @@ impl LanceNamespace for DirectoryNamespace { ) -> Result { let table_uri = self.resolve_table_location(&request.id).await?; - let mut dataset = Dataset::open(&table_uri) - .await - .map_err(|e| Error::Namespace { - source: format!("Failed to open table at '{}': {}", table_uri, e).into(), - location: snafu::location!(), - })?; + // Use DatasetBuilder with storage options to support S3 with custom endpoints + let mut builder = DatasetBuilder::from_uri(&table_uri); + if let Some(opts) = &self.storage_options { + builder = builder.with_storage_options(opts.clone()); + } + if let Some(sess) = &self.session { + builder = builder.with_session(sess.clone()); + } + let mut dataset = builder.load().await.map_err(|e| Error::Namespace { + source: format!("Failed to open table at '{}': {}", table_uri, e).into(), + location: snafu::location!(), + })?; if let Some(version) = request.version { dataset = dataset