Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions rust/lance-namespace-impls/src/dir/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use arrow_ipc::reader::StreamReader;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::StreamExt;
use futures::{stream::StreamExt, FutureExt};
use lance::dataset::optimize::{compact_files, CompactionOptions};
use lance::dataset::{builder::DatasetBuilder, WriteParams};
use lance::session::Session;
Expand Down Expand Up @@ -1081,7 +1081,7 @@ impl LanceNamespace for ManifestNamespace {
}

let object_id = Self::str_object_id(table_id);
let table_info = self.query_manifest_for_table(&object_id).await?;
let table_info = self.query_manifest_for_table(&object_id).boxed().await?;

// Extract table name and namespace from table_id
let table_name = table_id.last().cloned().unwrap_or_default();
Expand Down Expand Up @@ -1298,12 +1298,12 @@ impl LanceNamespace for ManifestNamespace {
let object_id = Self::build_object_id(&namespace, &table_name);

// Query manifest for table location
let table_info = self.query_manifest_for_table(&object_id).await?;
let table_info = self.query_manifest_for_table(&object_id).boxed().await?;

match table_info {
Some(info) => {
// Delete from manifest first
self.delete_from_manifest(&object_id).await?;
self.delete_from_manifest(&object_id).boxed().await?;

// Delete physical data directory using the dir_name from manifest
let table_path = self.base_path.child(info.location.as_str());
Expand All @@ -1312,6 +1312,7 @@ impl LanceNamespace for ManifestNamespace {
// Remove the table directory
self.object_store
.remove_dir_all(table_path)
.boxed()
.await
.map_err(|e| Error::Namespace {
source: format!("Failed to delete table directory: {}", e).into(),
Expand Down Expand Up @@ -1489,7 +1490,7 @@ impl LanceNamespace for ManifestNamespace {
let object_id = namespace_id.join(DELIMITER);

// Check if namespace exists
if !self.manifest_contains_object(&object_id).await? {
if !self.manifest_contains_object(&object_id).boxed().await? {
return Err(Error::Namespace {
source: format!("Namespace '{}' not found", object_id).into(),
location: location!(),
Expand All @@ -1499,7 +1500,7 @@ impl LanceNamespace for ManifestNamespace {
// Check for child namespaces
let prefix = format!("{}{}", object_id, DELIMITER);
let filter = format!("starts_with(object_id, '{}')", prefix);
let mut scanner = self.manifest_scanner().await?;
let mut scanner = self.manifest_scanner().boxed().await?;
scanner.filter(&filter).map_err(|e| Error::IO {
source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
location: location!(),
Expand All @@ -1509,7 +1510,7 @@ impl LanceNamespace for ManifestNamespace {
location: location!(),
})?;
scanner.with_row_id();
let count = scanner.count_rows().await.map_err(|e| Error::IO {
let count = scanner.count_rows().boxed().await.map_err(|e| Error::IO {
source: box_error(std::io::Error::other(format!(
"Failed to count rows: {}",
e
Expand All @@ -1528,7 +1529,7 @@ impl LanceNamespace for ManifestNamespace {
});
}

self.delete_from_manifest(&object_id).await?;
self.delete_from_manifest(&object_id).boxed().await?;

Ok(DropNamespaceResponse::default())
}
Expand Down Expand Up @@ -1863,7 +1864,7 @@ impl LanceNamespace for ManifestNamespace {
let table_uri = match table_info {
Some(info) => {
// Delete from manifest only (leave physical data intact)
self.delete_from_manifest(&object_id).await?;
self.delete_from_manifest(&object_id).boxed().await?;
Self::construct_full_uri(&self.root, &info.location)?
}
None => {
Expand Down
Loading