Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 68 additions & 6 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,14 +602,45 @@ impl Dataset {
// Set up namespace commit handler if namespace and table_id are provided
if let (Some(ns), Some(tid)) = (namespace, table_id) {
// Extract the inner namespace Arc from PyDirectoryNamespace, PyRestNamespace,
// or create a PyLanceNamespace wrapper for custom Python implementations
// or create a PyLanceNamespace wrapper for custom Python implementations.
//
// Python wrapper classes (DirectoryNamespace, RestNamespace in namespace.py)
// store the PyO3 class in `_inner`. For the EXACT wrapper classes, we can
// use _inner directly. For subclasses (which may override methods), we must
// use PyLanceNamespace to call through Python.
let ns_arc: Arc<dyn LanceNamespace> =
if let Ok(dir_ns) = ns.downcast::<PyDirectoryNamespace>() {
// Native DirectoryNamespace - use inner directly (bypass Python layer)
// Direct PyO3 class
dir_ns.borrow().inner.clone()
} else if let Ok(rest_ns) = ns.downcast::<PyRestNamespace>() {
// Native RestNamespace - use inner directly (bypass Python layer)
// Direct PyO3 class
rest_ns.borrow().inner.clone()
} else if let Ok(inner) = ns.getattr("_inner") {
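// Python wrapper class - decide whether it is the exact wrapper class
// (not a subclass) by comparing the Python type's name against the known
// wrapper names. If the name cannot be read it falls back to an empty
// string, which matches neither wrapper and takes the PyLanceNamespace path.
// NOTE(review): a name comparison cannot tell apart a subclass or unrelated
// class that reuses the same class name - confirm this is acceptable.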
let type_name = ns
.get_type()
.name()
.map(|n| n.to_string())
.unwrap_or_default();

if type_name == "DirectoryNamespace" {
if let Ok(dir_ns) = inner.downcast::<PyDirectoryNamespace>() {
dir_ns.borrow().inner.clone()
} else {
PyLanceNamespace::create_arc(py, ns)?
}
} else if type_name == "RestNamespace" {
if let Ok(rest_ns) = inner.downcast::<PyRestNamespace>() {
rest_ns.borrow().inner.clone()
} else {
PyLanceNamespace::create_arc(py, ns)?
}
} else {
// Subclass or custom implementation - use PyLanceNamespace
// to call through Python (requires *_json methods)
PyLanceNamespace::create_arc(py, ns)?
}
} else {
// Custom Python implementation - wrap with PyLanceNamespace
// This calls back into Python for namespace methods
Expand Down Expand Up @@ -3171,14 +3202,45 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult<Option<WritePar
if let (Some(ns), Some(table_id)) = (namespace_opt, table_id_opt) {
let py = options.py();
// Extract the inner namespace Arc from PyDirectoryNamespace, PyRestNamespace,
// or create a PyLanceNamespace wrapper for custom Python implementations
// or create a PyLanceNamespace wrapper for custom Python implementations.
//
// Python wrapper classes (DirectoryNamespace, RestNamespace in namespace.py)
// store the PyO3 class in `_inner`. For the EXACT wrapper classes, we can
// use _inner directly. For subclasses (which may override methods), we must
// use PyLanceNamespace to call through Python.
let ns_arc: Arc<dyn LanceNamespace> =
if let Ok(dir_ns) = ns.downcast::<PyDirectoryNamespace>() {
// Native DirectoryNamespace - use inner directly (bypass Python layer)
// Direct PyO3 class
dir_ns.borrow().inner.clone()
} else if let Ok(rest_ns) = ns.downcast::<PyRestNamespace>() {
// Native RestNamespace - use inner directly (bypass Python layer)
// Direct PyO3 class
rest_ns.borrow().inner.clone()
} else if let Ok(inner) = ns.getattr("_inner") {
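// Python wrapper class - decide whether it is the exact wrapper class
// (not a subclass) by comparing the Python type's name against the known
// wrapper names. If the name cannot be read it falls back to an empty
// string, which matches neither wrapper and takes the PyLanceNamespace path.
// NOTE(review): a name comparison cannot tell apart a subclass or unrelated
// class that reuses the same class name - confirm this is acceptable.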
let type_name = ns
.get_type()
.name()
.map(|n| n.to_string())
.unwrap_or_default();

if type_name == "DirectoryNamespace" {
if let Ok(dir_ns) = inner.downcast::<PyDirectoryNamespace>() {
dir_ns.borrow().inner.clone()
} else {
PyLanceNamespace::create_arc(py, &ns)?
}
} else if type_name == "RestNamespace" {
if let Ok(rest_ns) = inner.downcast::<PyRestNamespace>() {
rest_ns.borrow().inner.clone()
} else {
PyLanceNamespace::create_arc(py, &ns)?
}
} else {
// Subclass or custom implementation - use PyLanceNamespace
// to call through Python (requires *_json methods)
PyLanceNamespace::create_arc(py, &ns)?
}
} else {
// Custom Python implementation - wrap with PyLanceNamespace
PyLanceNamespace::create_arc(py, &ns)?
Expand Down
4 changes: 2 additions & 2 deletions python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use lance_file::{version::LanceFileVersion, LanceEncodingsIo};
use lance_io::object_store::ObjectStoreParams;
use lance_io::{
scheduler::{ScanScheduler, SchedulerConfig},
traits::Writer,
utils::CachedFileSize,
ReadBatchParams,
};
Expand Down Expand Up @@ -577,8 +578,7 @@ impl LanceFileSession {
tokio::io::copy(&mut reader, &mut writer)
.await
.map_err(|e| PyIOError::new_err(format!("Failed to upload file: {}", e)))?;
writer
.shutdown()
Writer::shutdown(writer.as_mut())
.await
.map_err(|e| PyIOError::new_err(format!("Failed to finalize upload: {}", e)))?;

Expand Down
29 changes: 22 additions & 7 deletions rust/lance-namespace-impls/src/dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use arrow_ipc::reader::StreamReader;
use async_trait::async_trait;
use bytes::Bytes;
use futures::TryStreamExt;
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::{Dataset, WriteParams};
use lance::session::Session;
use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
Expand Down Expand Up @@ -1148,7 +1149,15 @@ impl LanceNamespace for DirectoryNamespace {
}

// Try to load the dataset to get real information
match Dataset::open(&table_uri).await {
// Use DatasetBuilder so that this namespace's storage options and session
// (when set) are applied on open, e.g. to support S3 with custom endpoints
let mut builder = DatasetBuilder::from_uri(&table_uri);
if let Some(opts) = &self.storage_options {
builder = builder.with_storage_options(opts.clone());
}
if let Some(sess) = &self.session {
builder = builder.with_session(sess.clone());
}
match builder.load().await {
Ok(mut dataset) => {
// If a specific version is requested, checkout that version
if let Some(requested_version) = request.version {
Expand Down Expand Up @@ -1788,12 +1797,18 @@ impl LanceNamespace for DirectoryNamespace {
) -> Result<DescribeTableVersionResponse> {
let table_uri = self.resolve_table_location(&request.id).await?;

let mut dataset = Dataset::open(&table_uri)
.await
.map_err(|e| Error::Namespace {
source: format!("Failed to open table at '{}': {}", table_uri, e).into(),
location: snafu::location!(),
})?;
// Use DatasetBuilder so that this namespace's storage options and session
// (when set) are applied on open, e.g. to support S3 with custom endpoints
let mut builder = DatasetBuilder::from_uri(&table_uri);
if let Some(opts) = &self.storage_options {
builder = builder.with_storage_options(opts.clone());
}
if let Some(sess) = &self.session {
builder = builder.with_session(sess.clone());
}
let mut dataset = builder.load().await.map_err(|e| Error::Namespace {
source: format!("Failed to open table at '{}': {}", table_uri, e).into(),
location: snafu::location!(),
})?;

if let Some(version) = request.version {
dataset = dataset
Expand Down
Loading