From d15deeb8697965983eb1b00fd67e82d17c91839e Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 12 Mar 2026 15:06:56 -0700 Subject: [PATCH 01/12] adding support for purge_table --- crates/catalog/glue/src/catalog.rs | 2 +- crates/catalog/hms/src/catalog.rs | 2 +- crates/catalog/rest/src/catalog.rs | 13 +- crates/catalog/s3tables/src/catalog.rs | 2 +- crates/catalog/sql/src/catalog.rs | 2 +- crates/iceberg/src/catalog/memory/catalog.rs | 2 +- crates/iceberg/src/catalog/mod.rs | 12 +- crates/iceberg/src/catalog/utils.rs | 151 +++++++++++++++++++ 8 files changed, 176 insertions(+), 10 deletions(-) create mode 100644 crates/iceberg/src/catalog/utils.rs diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 9e9d4580c3..5d7802912c 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -642,7 +642,7 @@ impl Catalog for GlueCatalog { /// attempting to drop the table. This includes scenarios where /// the table does not exist. /// - Any network or communication error occurs with the database backend. - async fn drop_table(&self, table: &TableIdent) -> Result<()> { + async fn drop_table_with_purge(&self, table: &TableIdent, _purge: bool) -> Result<()> { let db_name = validate_namespace(table.namespace())?; let table_name = table.name(); diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index bd78193732..a2161707ff 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -580,7 +580,7 @@ impl Catalog for HmsCatalog { /// attempting to drop the table. This includes scenarios where /// the table does not exist. /// - Any network or communication error occurs with the database backend. - async fn drop_table(&self, table: &TableIdent) -> Result<()> { + async fn drop_table_with_purge(&self, table: &TableIdent, _purge: bool) -> Result<()> { let db_name = validate_namespace(table.namespace())?; if !self.namespace_exists(table.namespace()).await? { return Err(Error::new( diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 3551b05160..da490d7d95 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -827,13 +827,18 @@ impl Catalog for RestCatalog { } /// Drop a table from the catalog. - async fn drop_table(&self, table: &TableIdent) -> Result<()> { + async fn drop_table_with_purge(&self, table: &TableIdent, purge: bool) -> Result<()> { let context = self.context().await?; - let request = context + let mut request_builder = context .client - .request(Method::DELETE, context.config.table_endpoint(table)) - .build()?; + .request(Method::DELETE, context.config.table_endpoint(table)); + + if purge { + request_builder = request_builder.query(&[("purgeRequested", "true")]); + } + + let request = request_builder.build()?; let http_response = context.client.query_catalog(request).await?; diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index a416c38f22..2bd57d9c74 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -570,7 +570,7 @@ impl Catalog for S3TablesCatalog { /// This function can return an error in the following situations: /// - Errors from the underlying database deletion process, converted using /// `from_aws_sdk_error`. - async fn drop_table(&self, table: &TableIdent) -> Result<()> { + async fn drop_table_with_purge(&self, table: &TableIdent, _purge: bool) -> Result<()> { let req = self .s3tables_client .delete_table() diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 195f6c9de4..55f738b9c6 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -729,7 +729,7 @@ impl Catalog for SqlCatalog { } } - async fn drop_table(&self, identifier: &TableIdent) -> Result<()> { + async fn drop_table_with_purge(&self, identifier: &TableIdent, _purge: bool) -> Result<()> { if !self.table_exists(identifier).await? { return no_such_table_err(identifier); } diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 25ae004417..ffbcb88b45 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -319,7 +319,7 @@ impl Catalog for MemoryCatalog { } /// Drop a table from the catalog. - async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> { + async fn drop_table_with_purge(&self, table_ident: &TableIdent, _purge: bool) -> Result<()> { let mut root_namespace_state = self.root_namespace_state.lock().await; root_namespace_state.remove_existing_table(table_ident)?; diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 06326917ec..59c4947cf5 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -19,6 +19,7 @@ pub mod memory; mod metadata_location; +pub mod utils; use std::collections::HashMap; use std::fmt::{Debug, Display}; @@ -95,8 +96,17 @@ pub trait Catalog: Debug + Sync + Send { /// Load table from the catalog. async fn load_table(&self, table: &TableIdent) -> Result; + /// Drop a table from the catalog and purge its data, or returns error if it doesn't exist. + /// + /// This is equivalent to calling `drop_table_with_purge(table, true)`. + async fn drop_table(&self, table: &TableIdent) -> Result<()> { + self.drop_table_with_purge(table, true).await + } + /// Drop a table from the catalog, or returns error if it doesn't exist. - async fn drop_table(&self, table: &TableIdent) -> Result<()>; + /// + /// If `purge` is true, the catalog should also delete the underlying table data. + async fn drop_table_with_purge(&self, table: &TableIdent, purge: bool) -> Result<()>; /// Check if a table exists in the catalog. async fn table_exists(&self, table: &TableIdent) -> Result; diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs new file mode 100644 index 0000000000..000de96b22 --- /dev/null +++ b/crates/iceberg/src/catalog/utils.rs @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utility functions for catalog operations. + +use std::collections::HashSet; + +use crate::io::FileIO; +use crate::spec::TableMetadata; +use crate::Result; + +/// Property key for enabling garbage collection on drop. +/// When set to `false`, data files will not be deleted when a table is dropped. +/// Defaults to `true`. +pub const GC_ENABLED: &str = "gc.enabled"; +const GC_ENABLED_DEFAULT: bool = true; + +/// Deletes all data and metadata files referenced by the given table metadata. +/// +/// This mirrors the Java implementation's `CatalogUtil.dropTableData`. +/// It collects all manifest files, manifest lists, previous metadata files, +/// statistics files, and partition statistics files, then deletes them. +/// +/// Data files within manifests are only deleted if the `gc.enabled` table +/// property is `true` (the default), to avoid corrupting other tables that +/// may share the same data files. +/// +/// Individual file deletion failures are suppressed to complete as much +/// cleanup as possible, matching the Java behavior. +pub async fn drop_table_data( + io: &FileIO, + metadata: &TableMetadata, + metadata_location: Option<&str>, +) -> Result<()> { + let mut manifest_lists_to_delete: HashSet = HashSet::new(); + let mut manifests_to_delete: HashSet = HashSet::new(); + + for snapshot in metadata.snapshots() { + // Collect the manifest list location + let manifest_list_location = snapshot.manifest_list(); + if !manifest_list_location.is_empty() { + manifest_lists_to_delete.insert(manifest_list_location.to_string()); + } + + // Load all manifests from this snapshot + match snapshot.load_manifest_list(io, metadata).await { + Ok(manifest_list) => { + for manifest_file in manifest_list.entries() { + manifests_to_delete.insert(manifest_file.manifest_path.clone()); + } + } + Err(_) => { + // Suppress failure to continue cleanup + } + } + } + + let gc_enabled = metadata + .properties() + .get(GC_ENABLED) + .and_then(|v| v.parse::().ok()) + .unwrap_or(GC_ENABLED_DEFAULT); + + // Delete data files only if gc.enabled is true, to avoid corrupting shared tables + if gc_enabled { + delete_data_files(io, &manifests_to_delete).await; + } + + // Delete manifest files + delete_files(io, manifests_to_delete.iter().map(String::as_str)).await; + + // Delete manifest lists + delete_files(io, manifest_lists_to_delete.iter().map(String::as_str)).await; + + // Delete previous metadata files + delete_files( + io, + metadata.metadata_log().iter().map(|m| m.metadata_file.as_str()), + ) + .await; + + // Delete statistics files + delete_files( + io, + metadata + .statistics_iter() + .map(|s| s.statistics_path.as_str()), + ) + .await; + + // Delete partition statistics files + delete_files( + io, + metadata + .partition_statistics_iter() + .map(|s| s.statistics_path.as_str()), + ) + .await; + + // Delete the current metadata file + if let Some(location) = metadata_location { + let _ = io.delete(location).await; + } + + Ok(()) +} + +/// Reads each manifest and deletes the data files referenced within. +async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet) { + for manifest_path in manifest_paths { + let input = match io.new_input(manifest_path) { + Ok(input) => input, + Err(_) => continue, + }; + + let manifest_content = match input.read().await { + Ok(content) => content, + Err(_) => continue, + }; + + let manifest = match crate::spec::Manifest::parse_avro(&manifest_content) { + Ok(manifest) => manifest, + Err(_) => continue, + }; + + for entry in manifest.entries() { + let _ = io.delete(entry.data_file.file_path()).await; + } + } +} + +/// Deletes a collection of files, suppressing individual failures. +async fn delete_files<'a>(io: &FileIO, paths: impl Iterator) { + for path in paths { + let _ = io.delete(path).await; + } +} From c79238dec9b09c2c8f6f45339652da54d78c9c2a Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 18 Mar 2026 14:26:09 -0700 Subject: [PATCH 02/12] improve --- crates/iceberg/src/catalog/utils.rs | 78 +++++++++------------ crates/iceberg/src/spec/table_properties.rs | 35 +++++++++ 2 files changed, 67 insertions(+), 46 deletions(-) diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs index 000de96b22..30213e9767 100644 --- a/crates/iceberg/src/catalog/utils.rs +++ b/crates/iceberg/src/catalog/utils.rs @@ -19,15 +19,11 @@ use std::collections::HashSet; -use crate::io::FileIO; -use crate::spec::TableMetadata; -use crate::Result; +use futures::stream; -/// Property key for enabling garbage collection on drop. -/// When set to `false`, data files will not be deleted when a table is dropped. -/// Defaults to `true`. -pub const GC_ENABLED: &str = "gc.enabled"; -const GC_ENABLED_DEFAULT: bool = true; +use crate::Result; +use crate::io::FileIO; +use crate::spec::{TableMetadata, TableProperties}; /// Deletes all data and metadata files referenced by the given table metadata. /// @@ -69,47 +65,40 @@ pub async fn drop_table_data( } } - let gc_enabled = metadata - .properties() - .get(GC_ENABLED) - .and_then(|v| v.parse::().ok()) - .unwrap_or(GC_ENABLED_DEFAULT); - - // Delete data files only if gc.enabled is true, to avoid corrupting shared tables - if gc_enabled { + // Delete manifest files only if gc.enabled is true, to avoid corrupting shared tables + if metadata.table_properties()?.gc_enabled { delete_data_files(io, &manifests_to_delete).await; } // Delete manifest files - delete_files(io, manifests_to_delete.iter().map(String::as_str)).await; + let manifest_paths: Vec = manifests_to_delete.into_iter().collect(); + let _ = io.delete_stream(stream::iter(manifest_paths)).await; // Delete manifest lists - delete_files(io, manifest_lists_to_delete.iter().map(String::as_str)).await; + let manifest_list_paths: Vec = manifest_lists_to_delete.into_iter().collect(); + let _ = io.delete_stream(stream::iter(manifest_list_paths)).await; // Delete previous metadata files - delete_files( - io, - metadata.metadata_log().iter().map(|m| m.metadata_file.as_str()), - ) - .await; + let prev_metadata_paths: Vec = metadata + .metadata_log() + .iter() + .map(|m| m.metadata_file.clone()) + .collect(); + let _ = io.delete_stream(stream::iter(prev_metadata_paths)).await; // Delete statistics files - delete_files( - io, - metadata - .statistics_iter() - .map(|s| s.statistics_path.as_str()), - ) - .await; + let stats_paths: Vec = metadata + .statistics_iter() + .map(|s| s.statistics_path.clone()) + .collect(); + let _ = io.delete_stream(stream::iter(stats_paths)).await; // Delete partition statistics files - delete_files( - io, - metadata - .partition_statistics_iter() - .map(|s| s.statistics_path.as_str()), - ) - .await; + let partition_stats_paths: Vec = metadata + .partition_statistics_iter() + .map(|s| s.statistics_path.clone()) + .collect(); + let _ = io.delete_stream(stream::iter(partition_stats_paths)).await; // Delete the current metadata file if let Some(location) = metadata_location { @@ -137,15 +126,12 @@ async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet) { Err(_) => continue, }; - for entry in manifest.entries() { - let _ = io.delete(entry.data_file.file_path()).await; - } - } -} + let data_file_paths: Vec = manifest + .entries() + .iter() + .map(|entry| entry.data_file.file_path().to_string()) + .collect(); -/// Deletes a collection of files, suppressing individual failures. -async fn delete_files<'a>(io: &FileIO, paths: impl Iterator) { - for path in paths { - let _ = io.delete(path).await; + let _ = io.delete_stream(stream::iter(data_file_paths)).await; } } diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 6e08318479..dd863b5aca 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -114,6 +114,9 @@ pub struct TableProperties { pub metadata_compression_codec: CompressionCodec, /// Whether to use `FanoutWriter` for partitioned tables. pub write_datafusion_fanout_enabled: bool, + /// Whether garbage collection is enabled on drop. + /// When `false`, manifest files will not be deleted when a table is dropped. + pub gc_enabled: bool, } impl TableProperties { @@ -212,6 +215,13 @@ impl TableProperties { pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled"; /// Default value for fanout writer enabled pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true; + + /// Property key for enabling garbage collection on drop. + /// When set to `false`, manifest files will not be deleted when a table is dropped. + /// Defaults to `true`. + pub const PROPERTY_GC_ENABLED: &str = "gc.enabled"; + /// Default value for gc.enabled + pub const PROPERTY_GC_ENABLED_DEFAULT: bool = true; } impl TryFrom<&HashMap> for TableProperties { @@ -256,6 +266,11 @@ impl TryFrom<&HashMap> for TableProperties { TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED, TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT, )?, + gc_enabled: parse_property( + props, + TableProperties::PROPERTY_GC_ENABLED, + TableProperties::PROPERTY_GC_ENABLED_DEFAULT, + )?, }) } } @@ -294,6 +309,10 @@ mod tests { table_properties.metadata_compression_codec, CompressionCodec::None ); + assert_eq!( + table_properties.gc_enabled, + TableProperties::PROPERTY_GC_ENABLED_DEFAULT + ); } #[test] @@ -377,12 +396,17 @@ mod tests { TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(), "512".to_string(), ), + ( + TableProperties::PROPERTY_GC_ENABLED.to_string(), + "false".to_string(), + ), ]); let table_properties = TableProperties::try_from(&props).unwrap(); assert_eq!(table_properties.commit_num_retries, 10); assert_eq!(table_properties.commit_max_retry_wait_ms, 20); assert_eq!(table_properties.write_format_default, "avro".to_string()); assert_eq!(table_properties.write_target_file_size_bytes, 512); + assert!(!table_properties.gc_enabled); } #[test] @@ -429,6 +453,17 @@ mod tests { assert!(table_properties.to_string().contains( "Invalid value for write.target-file-size-bytes: invalid digit found in string" )); + + let invalid_gc_enabled = HashMap::from([( + TableProperties::PROPERTY_GC_ENABLED.to_string(), + "notabool".to_string(), + )]); + let table_properties = TableProperties::try_from(&invalid_gc_enabled).unwrap_err(); + assert!( + table_properties + .to_string() + .contains("Invalid value for gc.enabled") + ); } #[test] From d85c92278079e361ff1ebdd1161aa212f00b9f23 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 18 Mar 2026 14:36:27 -0700 Subject: [PATCH 03/12] expose error correctly --- crates/iceberg/src/catalog/utils.rs | 48 ++++++++------------- crates/iceberg/src/spec/table_properties.rs | 4 +- 2 files changed, 20 insertions(+), 32 deletions(-) diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs index 30213e9767..25d9d0a801 100644 --- a/crates/iceberg/src/catalog/utils.rs +++ b/crates/iceberg/src/catalog/utils.rs @@ -23,7 +23,7 @@ use futures::stream; use crate::Result; use crate::io::FileIO; -use crate::spec::{TableMetadata, TableProperties}; +use crate::spec::TableMetadata; /// Deletes all data and metadata files referenced by the given table metadata. /// @@ -34,9 +34,6 @@ use crate::spec::{TableMetadata, TableProperties}; /// Data files within manifests are only deleted if the `gc.enabled` table /// property is `true` (the default), to avoid corrupting other tables that /// may share the same data files. -/// -/// Individual file deletion failures are suppressed to complete as much -/// cleanup as possible, matching the Java behavior. pub async fn drop_table_data( io: &FileIO, metadata: &TableMetadata, @@ -65,18 +62,17 @@ pub async fn drop_table_data( } } - // Delete manifest files only if gc.enabled is true, to avoid corrupting shared tables + // Delete data files only if gc.enabled is true, to avoid corrupting shared tables if metadata.table_properties()?.gc_enabled { - delete_data_files(io, &manifests_to_delete).await; + delete_data_files(io, &manifests_to_delete).await?; } // Delete manifest files - let manifest_paths: Vec = manifests_to_delete.into_iter().collect(); - let _ = io.delete_stream(stream::iter(manifest_paths)).await; + io.delete_stream(stream::iter(manifests_to_delete)).await?; // Delete manifest lists - let manifest_list_paths: Vec = manifest_lists_to_delete.into_iter().collect(); - let _ = io.delete_stream(stream::iter(manifest_list_paths)).await; + io.delete_stream(stream::iter(manifest_lists_to_delete)) + .await?; // Delete previous metadata files let prev_metadata_paths: Vec = metadata @@ -84,47 +80,37 @@ pub async fn drop_table_data( .iter() .map(|m| m.metadata_file.clone()) .collect(); - let _ = io.delete_stream(stream::iter(prev_metadata_paths)).await; + io.delete_stream(stream::iter(prev_metadata_paths)).await?; // Delete statistics files let stats_paths: Vec = metadata .statistics_iter() .map(|s| s.statistics_path.clone()) .collect(); - let _ = io.delete_stream(stream::iter(stats_paths)).await; + io.delete_stream(stream::iter(stats_paths)).await?; // Delete partition statistics files let partition_stats_paths: Vec = metadata .partition_statistics_iter() .map(|s| s.statistics_path.clone()) .collect(); - let _ = io.delete_stream(stream::iter(partition_stats_paths)).await; + io.delete_stream(stream::iter(partition_stats_paths)) + .await?; // Delete the current metadata file if let Some(location) = metadata_location { - let _ = io.delete(location).await; + io.delete(location).await?; } Ok(()) } /// Reads each manifest and deletes the data files referenced within. -async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet) { +async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet) -> Result<()> { for manifest_path in manifest_paths { - let input = match io.new_input(manifest_path) { - Ok(input) => input, - Err(_) => continue, - }; - - let manifest_content = match input.read().await { - Ok(content) => content, - Err(_) => continue, - }; - - let manifest = match crate::spec::Manifest::parse_avro(&manifest_content) { - Ok(manifest) => manifest, - Err(_) => continue, - }; + let input = io.new_input(manifest_path)?; + let manifest_content = input.read().await?; + let manifest = crate::spec::Manifest::parse_avro(&manifest_content)?; let data_file_paths: Vec = manifest .entries() @@ -132,6 +118,8 @@ async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet) { .map(|entry| entry.data_file.file_path().to_string()) .collect(); - let _ = io.delete_stream(stream::iter(data_file_paths)).await; + io.delete_stream(stream::iter(data_file_paths)).await?; } + + Ok(()) } diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index dd863b5aca..07c157304e 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -115,7 +115,7 @@ pub struct TableProperties { /// Whether to use `FanoutWriter` for partitioned tables. pub write_datafusion_fanout_enabled: bool, /// Whether garbage collection is enabled on drop. - /// When `false`, manifest files will not be deleted when a table is dropped. + /// When `false`, data files will not be deleted when a table is dropped. pub gc_enabled: bool, } @@ -217,7 +217,7 @@ impl TableProperties { pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true; /// Property key for enabling garbage collection on drop. - /// When set to `false`, manifest files will not be deleted when a table is dropped. + /// When set to `false`, data files will not be deleted when a table is dropped. /// Defaults to `true`. pub const PROPERTY_GC_ENABLED: &str = "gc.enabled"; /// Default value for gc.enabled From 31b43101a442332af38875b9d558da4b5ece6beb Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 18 Mar 2026 15:03:24 -0700 Subject: [PATCH 04/12] add default purge implementation --- crates/catalog/glue/src/catalog.rs | 2 +- crates/catalog/hms/src/catalog.rs | 2 +- crates/catalog/rest/src/catalog.rs | 8 ++----- crates/catalog/s3tables/src/catalog.rs | 10 +++++++- crates/catalog/sql/src/catalog.rs | 2 +- crates/iceberg/src/catalog/memory/catalog.rs | 2 +- crates/iceberg/src/catalog/mod.rs | 25 +++++++++++++------- crates/iceberg/src/catalog/utils.rs | 12 +++------- 8 files changed, 34 insertions(+), 29 deletions(-) diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 5d7802912c..9e9d4580c3 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -642,7 +642,7 @@ impl Catalog for GlueCatalog { /// attempting to drop the table. This includes scenarios where /// the table does not exist. /// - Any network or communication error occurs with the database backend. - async fn drop_table_with_purge(&self, table: &TableIdent, _purge: bool) -> Result<()> { + async fn drop_table(&self, table: &TableIdent) -> Result<()> { let db_name = validate_namespace(table.namespace())?; let table_name = table.name(); diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index a2161707ff..bd78193732 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -580,7 +580,7 @@ impl Catalog for HmsCatalog { /// attempting to drop the table. This includes scenarios where /// the table does not exist. /// - Any network or communication error occurs with the database backend. - async fn drop_table_with_purge(&self, table: &TableIdent, _purge: bool) -> Result<()> { + async fn drop_table(&self, table: &TableIdent) -> Result<()> { let db_name = validate_namespace(table.namespace())?; if !self.namespace_exists(table.namespace()).await? { return Err(Error::new( diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index da490d7d95..d6b173335f 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -827,17 +827,13 @@ impl Catalog for RestCatalog { } /// Drop a table from the catalog. - async fn drop_table_with_purge(&self, table: &TableIdent, purge: bool) -> Result<()> { + async fn drop_table(&self, table: &TableIdent) -> Result<()> { let context = self.context().await?; - let mut request_builder = context + let request_builder = context .client .request(Method::DELETE, context.config.table_endpoint(table)); - if purge { - request_builder = request_builder.query(&[("purgeRequested", "true")]); - } - let request = request_builder.build()?; let http_response = context.client.query_catalog(request).await?; diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 2bd57d9c74..9b1b2fb36f 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -570,7 +570,7 @@ impl Catalog for S3TablesCatalog { /// This function can return an error in the following situations: /// - Errors from the underlying database deletion process, converted using /// `from_aws_sdk_error`. - async fn drop_table_with_purge(&self, table: &TableIdent, _purge: bool) -> Result<()> { + async fn drop_table(&self, table: &TableIdent) -> Result<()> { let req = self .s3tables_client .delete_table() @@ -581,6 +581,14 @@ impl Catalog for S3TablesCatalog { Ok(()) } + /// Purge a table from the S3 Tables catalog. + /// + /// S3 Tables data is managed by the service, so this just delegates + /// to `drop_table` + async fn purge_table(&self, table: &TableIdent) -> Result<()> { + self.drop_table(table).await + } + /// Checks if a table exists within the s3tables catalog. /// /// Validates the table identifier by querying the s3tables catalog diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 55f738b9c6..195f6c9de4 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -729,7 +729,7 @@ impl Catalog for SqlCatalog { } } - async fn drop_table_with_purge(&self, identifier: &TableIdent, _purge: bool) -> Result<()> { + async fn drop_table(&self, identifier: &TableIdent) -> Result<()> { if !self.table_exists(identifier).await? { return no_such_table_err(identifier); } diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index ffbcb88b45..25ae004417 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -319,7 +319,7 @@ impl Catalog for MemoryCatalog { } /// Drop a table from the catalog. - async fn drop_table_with_purge(&self, table_ident: &TableIdent, _purge: bool) -> Result<()> { + async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> { let mut root_namespace_state = self.root_namespace_state.lock().await; root_namespace_state.remove_existing_table(table_ident)?; diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 59c4947cf5..d255168672 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -96,17 +96,24 @@ pub trait Catalog: Debug + Sync + Send { /// Load table from the catalog. async fn load_table(&self, table: &TableIdent) -> Result
; - /// Drop a table from the catalog and purge its data, or returns error if it doesn't exist. - /// - /// This is equivalent to calling `drop_table_with_purge(table, true)`. - async fn drop_table(&self, table: &TableIdent) -> Result<()> { - self.drop_table_with_purge(table, true).await - } - /// Drop a table from the catalog, or returns error if it doesn't exist. + async fn drop_table(&self, table: &TableIdent) -> Result<()>; + + /// Drop a table from the catalog and delete the underlying table data. /// - /// If `purge` is true, the catalog should also delete the underlying table data. - async fn drop_table_with_purge(&self, table: &TableIdent, purge: bool) -> Result<()>; + /// The default implementation loads the table metadata, drops the table + /// from the catalog, then deletes all associated data and metadata files + /// using [`drop_table_data`](utils::drop_table_data). + async fn purge_table(&self, table: &TableIdent) -> Result<()> { + let table_info = self.load_table(table).await?; + self.drop_table(table).await?; + utils::drop_table_data( + table_info.file_io(), + table_info.metadata(), + table_info.metadata_location(), + ) + .await + } /// Check if a table exists in the catalog. async fn table_exists(&self, table: &TableIdent) -> Result; diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs index 25d9d0a801..8e5c8c02ae 100644 --- a/crates/iceberg/src/catalog/utils.rs +++ b/crates/iceberg/src/catalog/utils.rs @@ -50,15 +50,9 @@ pub async fn drop_table_data( } // Load all manifests from this snapshot - match snapshot.load_manifest_list(io, metadata).await { - Ok(manifest_list) => { - for manifest_file in manifest_list.entries() { - manifests_to_delete.insert(manifest_file.manifest_path.clone()); - } - } - Err(_) => { - // Suppress failure to continue cleanup - } + let manifest_list = snapshot.load_manifest_list(io, metadata).await?; + for manifest_file in manifest_list.entries() { + manifests_to_delete.insert(manifest_file.manifest_path.clone()); } } From 06ee8f6d0e4634fe88e66be236b60120e7252203 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 18 Mar 2026 15:05:20 -0700 Subject: [PATCH 05/12] revert unneeded changes --- crates/catalog/rest/src/catalog.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index d6b173335f..3551b05160 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -830,11 +830,10 @@ impl Catalog for RestCatalog { async fn drop_table(&self, table: &TableIdent) -> Result<()> { let context = self.context().await?; - let request_builder = context + let request = context .client - .request(Method::DELETE, context.config.table_endpoint(table)); - - let request = request_builder.build()?; + .request(Method::DELETE, context.config.table_endpoint(table)) + .build()?; let http_response = context.client.query_catalog(request).await?; From 68e217a9e981680ab048c51bff61884d2b239017 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 18 Mar 2026 15:10:02 -0700 Subject: [PATCH 06/12] implement purge for rest --- crates/catalog/rest/src/catalog.rs | 55 +++++++++++++++++++----------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 3551b05160..7d5df24d52 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -363,6 +363,35 @@ impl RestCatalog { } } + /// Sends a DELETE request for the given table, optionally requesting purge. + async fn delete_table(&self, table: &TableIdent, purge: bool) -> Result<()> { + let context = self.context().await?; + + let mut request_builder = context + .client + .request(Method::DELETE, context.config.table_endpoint(table)); + + if purge { + request_builder = request_builder.query(&[("purgeRequested", "true")]); + } + + let request = request_builder.build()?; + let http_response = context.client.query_catalog(request).await?; + + match http_response.status() { + StatusCode::NO_CONTENT | StatusCode::OK => Ok(()), + StatusCode::NOT_FOUND => Err(Error::new( + ErrorKind::TableNotFound, + "Tried to drop a table that does not exist", + )), + _ => Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await), + } + } + /// Gets the [`RestContext`] from the catalog. async fn context(&self) -> Result<&RestContext> { self.ctx @@ -828,27 +857,13 @@ impl Catalog for RestCatalog { /// Drop a table from the catalog. async fn drop_table(&self, table: &TableIdent) -> Result<()> { - let context = self.context().await?; - - let request = context - .client - .request(Method::DELETE, context.config.table_endpoint(table)) - .build()?; - - let http_response = context.client.query_catalog(request).await?; + self.delete_table(table, false).await + } - match http_response.status() { - StatusCode::NO_CONTENT | StatusCode::OK => Ok(()), - StatusCode::NOT_FOUND => Err(Error::new( - ErrorKind::TableNotFound, - "Tried to drop a table that does not exist", - )), - _ => Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await), - } + /// Drop a table from the catalog and purge its data by sending + /// `purgeRequested=true` to the REST server. + async fn purge_table(&self, table: &TableIdent) -> Result<()> { + self.delete_table(table, true).await } /// Check if a table exists in the catalog. From a98a55b2b138b1ca09567093aafcac64ed6ee6f7 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 18 Mar 2026 15:24:09 -0700 Subject: [PATCH 07/12] add purge_table tests --- crates/catalog/loader/tests/table_suite.rs | 77 ++++++++++++++++++++++ crates/iceberg/src/catalog/mod.rs | 2 +- 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/crates/catalog/loader/tests/table_suite.rs b/crates/catalog/loader/tests/table_suite.rs index 6b7a3a822c..cdc9b11043 100644 --- a/crates/catalog/loader/tests/table_suite.rs +++ b/crates/catalog/loader/tests/table_suite.rs @@ -274,3 +274,80 @@ async fn test_catalog_drop_table_missing_errors(#[case] kind: CatalogKind) -> Re assert!(catalog.drop_table(&table_ident).await.is_err()); Ok(()) } + +// Common behavior: purge_table removes the table from the catalog. +#[rstest] +#[case::rest_catalog(CatalogKind::Rest)] +#[case::glue_catalog(CatalogKind::Glue)] +#[case::hms_catalog(CatalogKind::Hms)] +#[case::sql_catalog(CatalogKind::Sql)] +#[case::s3tables_catalog(CatalogKind::S3Tables)] +#[case::memory_catalog(CatalogKind::Memory)] +#[tokio::test] +async fn test_catalog_purge_table(#[case] kind: CatalogKind) -> Result<()> { + let Some(harness) = load_catalog(kind).await else { + return Ok(()); + }; + let catalog = harness.catalog; + let namespace = NamespaceIdent::new(normalize_test_name_with_parts!( + "catalog_purge_table", + harness.label + )); + + cleanup_namespace_dyn(catalog.as_ref(), &namespace).await; + catalog.create_namespace(&namespace, HashMap::new()).await?; + + let table_name = normalize_test_name_with_parts!("catalog_purge_table", harness.label, "table"); + let table = catalog + .create_table(&namespace, table_creation(table_name)) + .await?; + let ident = table.identifier().clone(); + + assert!(catalog.table_exists(&ident).await?); + + // Capture metadata location and file_io before purge so we can verify + // that the underlying files are actually deleted. + let metadata_location = table.metadata_location().map(|s| s.to_string()); + let file_io = table.file_io().clone(); + + catalog.purge_table(&ident).await?; + assert!(!catalog.table_exists(&ident).await?); + + if let Some(location) = &metadata_location { + assert!( + !file_io.exists(location).await?, + "Metadata file should have been deleted after purge" + ); + } + + catalog.drop_namespace(&namespace).await?; + + Ok(()) +} + +// Common behavior: purging a missing table should error. +#[rstest] +#[case::rest_catalog(CatalogKind::Rest)] +#[case::glue_catalog(CatalogKind::Glue)] +#[case::hms_catalog(CatalogKind::Hms)] +#[case::sql_catalog(CatalogKind::Sql)] +#[case::s3tables_catalog(CatalogKind::S3Tables)] +#[case::memory_catalog(CatalogKind::Memory)] +#[tokio::test] +async fn test_catalog_purge_table_missing_errors(#[case] kind: CatalogKind) -> Result<()> { + let Some(harness) = load_catalog(kind).await else { + return Ok(()); + }; + let catalog = harness.catalog; + let namespace = NamespaceIdent::new(normalize_test_name_with_parts!( + "catalog_purge_table_missing_errors", + harness.label + )); + + cleanup_namespace_dyn(catalog.as_ref(), &namespace).await; + catalog.create_namespace(&namespace, HashMap::new()).await?; + + let table_ident = TableIdent::new(namespace.clone(), "missing".to_string()); + assert!(catalog.purge_table(&table_ident).await.is_err()); + Ok(()) +} diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index d255168672..11e37eb26d 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -19,7 +19,7 @@ pub mod memory; mod metadata_location; -pub mod utils; +pub(crate) mod utils; use std::collections::HashMap; use std::fmt::{Debug, Display}; From ce27668603bdbed9c9418a4e72efad76bd0bc528 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 20 Mar 2026 10:43:00 -0700 Subject: [PATCH 08/12] optimize concurrency, remove default impl for purge_table --- crates/catalog/glue/src/catalog.rs | 11 ++++ crates/catalog/hms/src/catalog.rs | 11 ++++ crates/catalog/sql/src/catalog.rs | 11 ++++ crates/iceberg/src/catalog/memory/catalog.rs | 11 ++++ crates/iceberg/src/catalog/mod.rs | 18 ++----- crates/iceberg/src/catalog/utils.rs | 53 +++++++++++--------- crates/iceberg/src/lib.rs | 1 + 7 files changed, 79 insertions(+), 37 deletions(-) diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 9e9d4580c3..a7e0171337 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -659,6 +659,17 @@ impl Catalog for GlueCatalog { Ok(()) } + async fn purge_table(&self, table: &TableIdent) -> Result<()> { + let table_info = self.load_table(table).await?; + self.drop_table(table).await?; + iceberg::drop_table_data( + table_info.file_io(), + table_info.metadata(), + table_info.metadata_location(), + ) + .await + } + /// Asynchronously checks the existence of a specified table /// in the database. /// diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index bd78193732..4a030c1104 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -604,6 +604,17 @@ impl Catalog for HmsCatalog { Ok(()) } + async fn purge_table(&self, table: &TableIdent) -> Result<()> { + let table_info = self.load_table(table).await?; + self.drop_table(table).await?; + iceberg::drop_table_data( + table_info.file_io(), + table_info.metadata(), + table_info.metadata_location(), + ) + .await + } + /// Asynchronously checks the existence of a specified table /// in the database. /// diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 195f6c9de4..7e468e7e37 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -757,6 +757,17 @@ impl Catalog for SqlCatalog { Ok(()) } + async fn purge_table(&self, table: &TableIdent) -> Result<()> { + let table_info = self.load_table(table).await?; + self.drop_table(table).await?; + iceberg::drop_table_data( + table_info.file_io(), + table_info.metadata(), + table_info.metadata_location(), + ) + .await + } + async fn load_table(&self, identifier: &TableIdent) -> Result
{ if !self.table_exists(identifier).await? { return no_such_table_err(identifier); diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 25ae004417..8fa5c479c3 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -326,6 +326,17 @@ impl Catalog for MemoryCatalog { Ok(()) } + async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> { + let table_info = self.load_table(table_ident).await?; + self.drop_table(table_ident).await?; + crate::catalog::utils::drop_table_data( + table_info.file_io(), + table_info.metadata(), + table_info.metadata_location(), + ) + .await + } + /// Check if a table exists in the catalog. async fn table_exists(&self, table_ident: &TableIdent) -> Result { let root_namespace_state = self.root_namespace_state.lock().await; diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 11e37eb26d..f296cf2260 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -101,19 +101,11 @@ pub trait Catalog: Debug + Sync + Send { /// Drop a table from the catalog and delete the underlying table data. /// - /// The default implementation loads the table metadata, drops the table - /// from the catalog, then deletes all associated data and metadata files - /// using [`drop_table_data`](utils::drop_table_data). - async fn purge_table(&self, table: &TableIdent) -> Result<()> { - let table_info = self.load_table(table).await?; - self.drop_table(table).await?; - utils::drop_table_data( - table_info.file_io(), - table_info.metadata(), - table_info.metadata_location(), - ) - .await - } + /// Implementations should load the table metadata, drop the table + /// from the catalog, then delete all associated data and metadata files. + /// The [`drop_table_data`](utils::drop_table_data) utility function can + /// be used for the file cleanup step. + async fn purge_table(&self, table: &TableIdent) -> Result<()>; /// Check if a table exists in the catalog. async fn table_exists(&self, table: &TableIdent) -> Result; diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs index 8e5c8c02ae..653725cf4d 100644 --- a/crates/iceberg/src/catalog/utils.rs +++ b/crates/iceberg/src/catalog/utils.rs @@ -19,12 +19,14 @@ use std::collections::HashSet; -use futures::stream; +use futures::{TryStreamExt, stream}; use crate::Result; use crate::io::FileIO; use crate::spec::TableMetadata; +const DELETE_CONCURRENCY: usize = 10; + /// Deletes all data and metadata files referenced by the given table metadata. /// /// This mirrors the Java implementation's `CatalogUtil.dropTableData`. @@ -42,15 +44,18 @@ pub async fn drop_table_data( let mut manifest_lists_to_delete: HashSet = HashSet::new(); let mut manifests_to_delete: HashSet = HashSet::new(); - for snapshot in metadata.snapshots() { - // Collect the manifest list location - let manifest_list_location = snapshot.manifest_list(); + // Load all manifest lists concurrently + let results: Vec<_> = + futures::future::try_join_all(metadata.snapshots().map(|snapshot| async { + let manifest_list = snapshot.load_manifest_list(io, metadata).await?; + Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), manifest_list)) + })) + .await?; + + for (manifest_list_location, manifest_list) in results { if !manifest_list_location.is_empty() { - manifest_lists_to_delete.insert(manifest_list_location.to_string()); + manifest_lists_to_delete.insert(manifest_list_location); } - - // Load all manifests from this snapshot - let manifest_list = snapshot.load_manifest_list(io, metadata).await?; for manifest_file in manifest_list.entries() { manifests_to_delete.insert(manifest_file.manifest_path.clone()); } @@ -99,21 +104,21 @@ pub async fn drop_table_data( Ok(()) } -/// Reads each manifest and deletes the data files referenced within. +/// Reads manifests concurrently and deletes the data files referenced within. async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet) -> Result<()> { - for manifest_path in manifest_paths { - let input = io.new_input(manifest_path)?; - let manifest_content = input.read().await?; - let manifest = crate::spec::Manifest::parse_avro(&manifest_content)?; - - let data_file_paths: Vec = manifest - .entries() - .iter() - .map(|entry| entry.data_file.file_path().to_string()) - .collect(); - - io.delete_stream(stream::iter(data_file_paths)).await?; - } - - Ok(()) + stream::iter(manifest_paths.iter().map(Ok)) + .try_for_each_concurrent(DELETE_CONCURRENCY, |manifest_path| async move { + let input = io.new_input(manifest_path)?; + let manifest_content = input.read().await?; + let manifest = crate::spec::Manifest::parse_avro(&manifest_bytes)?; + + let data_file_paths = manifest + .entries() + .iter() + .map(|entry| entry.data_file.file_path().to_string()) + .collect::>(); + + io.delete_stream(stream::iter(data_file_paths)).await + }) + .await } diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 8b345deb6e..d06c04e520 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -71,6 +71,7 @@ pub use error::{Error, ErrorKind, Result}; mod catalog; +pub use catalog::utils::drop_table_data; pub use catalog::*; pub mod table; From 5d24a4162cdceb734dc0365885c12948cb2dd410 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 20 Mar 2026 10:51:22 -0700 Subject: [PATCH 09/12] minor --- crates/iceberg/src/catalog/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs index 653725cf4d..d450f9df80 100644 --- a/crates/iceberg/src/catalog/utils.rs +++ b/crates/iceberg/src/catalog/utils.rs @@ -110,7 +110,7 @@ async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet) -> Res .try_for_each_concurrent(DELETE_CONCURRENCY, |manifest_path| async move { let input = io.new_input(manifest_path)?; let manifest_content = input.read().await?; - let manifest = crate::spec::Manifest::parse_avro(&manifest_bytes)?; + let manifest = crate::spec::Manifest::parse_avro(&manifest_content)?; let data_file_paths = manifest .entries() From 8cc843ce83a2b60703ae458cd670d6407c623555 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 24 Mar 2026 13:44:29 -0700 Subject: [PATCH 10/12] fail s3table::drop_table --- crates/catalog/s3tables/src/catalog.rs | 29 +++++++++++--------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 9b1b2fb36f..2c1cd3063f 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -562,15 +562,18 @@ impl Catalog for S3TablesCatalog { Ok(self.load_table_with_version_token(table_ident).await?.0) } - /// Drops an existing table from the s3tables catalog. - /// - /// Validates the table identifier and then deletes the corresponding - /// table from the s3tables catalog. - /// - /// This function can return an error in the following situations: - /// - Errors from the underlying database deletion process, converted using - /// `from_aws_sdk_error`. - async fn drop_table(&self, table: &TableIdent) -> Result<()> { + /// Not supported for S3Tables. Use `purge_table` instead. + /// + /// S3 Tables doesn't support soft delete, so dropping a table will permanently remove it from the catalog. + async fn drop_table(&self, _table: &TableIdent) -> Result<()> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "drop_table is not supported for S3Tables; use purge_table instead", + )) + } + + /// Purge a table from the S3 Tables catalog. + async fn purge_table(&self, table: &TableIdent) -> Result<()> { let req = self .s3tables_client .delete_table() @@ -581,14 +584,6 @@ impl Catalog for S3TablesCatalog { Ok(()) } - /// Purge a table from the S3 Tables catalog. - /// - /// S3 Tables data is managed by the service, so this just delegates - /// to `drop_table` - async fn purge_table(&self, table: &TableIdent) -> Result<()> { - self.drop_table(table).await - } - /// Checks if a table exists within the s3tables catalog. /// /// Validates the table identifier by querying the s3tables catalog From ee20b3819d1a393102c6d7c2726e6e9082e677f4 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 24 Mar 2026 13:46:38 -0700 Subject: [PATCH 11/12] fmt --- crates/catalog/s3tables/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 2c1cd3063f..b88bd77d29 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -563,7 +563,7 @@ impl Catalog for S3TablesCatalog { } /// Not supported for S3Tables. Use `purge_table` instead. - /// + /// /// S3 Tables doesn't support soft delete, so dropping a table will permanently remove it from the catalog. async fn drop_table(&self, _table: &TableIdent) -> Result<()> { Err(Error::new( From 43ad3fa50be805e0795ea66f613ec5b37230d67d Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 24 Mar 2026 15:03:28 -0700 Subject: [PATCH 12/12] use purge_table instead to cleanup --- crates/catalog/loader/tests/common/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/loader/tests/common/mod.rs b/crates/catalog/loader/tests/common/mod.rs index 90b72df8ab..600cd9b6f4 100644 --- a/crates/catalog/loader/tests/common/mod.rs +++ b/crates/catalog/loader/tests/common/mod.rs @@ -335,7 +335,7 @@ pub fn assert_map_contains(expected: &HashMap, actual: &HashMap< pub async fn cleanup_namespace_dyn(catalog: &dyn Catalog, namespace: &NamespaceIdent) { if let Ok(tables) = catalog.list_tables(namespace).await { for table in tables { - let _ = catalog.drop_table(&table).await; + let _ = catalog.purge_table(&table).await; } } let _ = catalog.drop_namespace(namespace).await;