diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 5b3ccf3b39..c51f6a6a89 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -33,7 +33,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, - TableCommit, TableCreation, TableIdent, + Runtime, TableCommit, TableCreation, TableIdent, }; use iceberg_storage_opendal::OpenDalStorageFactory; @@ -58,6 +58,7 @@ pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; pub struct GlueCatalogBuilder { config: GlueCatalogConfig, storage_factory: Option>, + runtime: Option, } impl Default for GlueCatalogBuilder { @@ -71,6 +72,7 @@ impl Default for GlueCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + runtime: None, } } } @@ -83,6 +85,11 @@ impl CatalogBuilder for GlueCatalogBuilder { self } + fn with_runtime(mut self, runtime: Runtime) -> Self { + self.runtime = Some(runtime); + self + } + fn load( mut self, name: impl Into, @@ -129,7 +136,11 @@ impl CatalogBuilder for GlueCatalogBuilder { )); } - GlueCatalog::new(self.config, self.storage_factory).await + let runtime = match self.runtime { + Some(rt) => rt, + None => Runtime::try_current()?, + }; + GlueCatalog::new(self.config, self.storage_factory, runtime).await } } } @@ -151,6 +162,7 @@ pub struct GlueCatalog { config: GlueCatalogConfig, client: GlueClient, file_io: FileIO, + runtime: Runtime, } impl Debug for GlueCatalog { @@ -166,6 +178,7 @@ impl GlueCatalog { async fn new( config: GlueCatalogConfig, storage_factory: Option>, + runtime: Runtime, ) -> Result { let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await; let mut file_io_props = config.props.clone(); @@ -214,6 +227,7 @@ impl GlueCatalog { config, client: GlueClient(client), file_io, + runtime, }) } /// Get the catalogs `FileIO` @@ -272,6 +286,7 @@ impl GlueCatalog { NamespaceIdent::new(db_name), table_name.to_owned(), )) + .runtime(self.runtime.clone()) .build()?; Ok((table, version_id)) @@ -611,6 +626,7 @@ impl Catalog for GlueCatalog { .metadata_location(metadata_location_str) .metadata(metadata) .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name)) + .runtime(self.runtime.clone()) .build() } @@ -845,6 +861,7 @@ impl Catalog for GlueCatalog { .metadata_location(metadata_location) .metadata(metadata) .file_io(self.file_io()) + .runtime(self.runtime.clone()) .build()?) } diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 4a030c1104..d778a3d5fc 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -31,7 +31,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, - TableCommit, TableCreation, TableIdent, + Runtime, TableCommit, TableCreation, TableIdent, }; use volo_thrift::MaybeException; @@ -56,6 +56,7 @@ pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; pub struct HmsCatalogBuilder { config: HmsCatalogConfig, storage_factory: Option>, + runtime: Option, } impl Default for HmsCatalogBuilder { @@ -69,6 +70,7 @@ impl Default for HmsCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + runtime: None, } } } @@ -81,6 +83,11 @@ impl CatalogBuilder for HmsCatalogBuilder { self } + fn with_runtime(mut self, runtime: Runtime) -> Self { + self.runtime = Some(runtime); + self + } + fn load( mut self, name: impl Into, @@ -116,26 +123,31 @@ impl CatalogBuilder for HmsCatalogBuilder { }) .collect(); - let result = { + let result = (|| -> Result { if self.config.name.is_none() { - Err(Error::new( + return Err(Error::new( ErrorKind::DataInvalid, "Catalog name is required", - )) - } else if self.config.address.is_empty() { - Err(Error::new( + )); + } + if self.config.address.is_empty() { + return Err(Error::new( ErrorKind::DataInvalid, "Catalog address is required", - )) - } else if self.config.warehouse.is_empty() { - Err(Error::new( + )); + } + if self.config.warehouse.is_empty() { + return Err(Error::new( ErrorKind::DataInvalid, "Catalog warehouse is required", - )) - } else { - HmsCatalog::new(self.config, self.storage_factory) + )); } - }; + let runtime = match self.runtime { + Some(rt) => rt, + None => Runtime::try_current()?, + }; + HmsCatalog::new(self.config, self.storage_factory, runtime) + })(); std::future::ready(result) } @@ -169,6 +181,7 @@ pub struct HmsCatalog { config: HmsCatalogConfig, client: HmsClient, file_io: FileIO, + runtime: Runtime, } impl Debug for HmsCatalog { @@ -184,6 +197,7 @@ impl HmsCatalog { fn new( config: HmsCatalogConfig, storage_factory: Option>, + runtime: Runtime, ) -> Result { let address = config .address @@ -223,6 +237,7 @@ impl HmsCatalog { config, client: HmsClient(client), file_io, + runtime, }) } /// Get the catalogs `FileIO` @@ -529,6 +544,7 @@ impl Catalog for HmsCatalog { .metadata_location(metadata_location_str) .metadata(metadata) .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name)) + .runtime(self.runtime.clone()) .build() } @@ -567,6 +583,7 @@ impl Catalog for HmsCatalog { NamespaceIdent::new(db_name), table.name.clone(), )) + .runtime(self.runtime.clone()) .build() } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 7d5df24d52..72780343d3 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -26,8 +26,8 @@ use async_trait::async_trait; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; use iceberg::table::Table; use iceberg::{ - Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, - TableCreation, TableIdent, + Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, Runtime, + TableCommit, TableCreation, TableIdent, }; use itertools::Itertools; use reqwest::header::{ @@ -62,6 +62,7 @@ const PATH_V1: &str = "v1"; pub struct RestCatalogBuilder { config: RestCatalogConfig, storage_factory: Option>, + runtime: Option, } impl Default for RestCatalogBuilder { @@ -75,6 +76,7 @@ impl Default for RestCatalogBuilder { client: None, }, storage_factory: None, + runtime: None, } } } @@ -87,6 +89,11 @@ impl CatalogBuilder for RestCatalogBuilder { self } + fn with_runtime(mut self, runtime: Runtime) -> Self { + self.runtime = Some(runtime); + self + } + fn load( mut self, name: impl Into, @@ -123,7 +130,8 @@ impl CatalogBuilder for RestCatalogBuilder { "Catalog uri is required", )) } else { - Ok(RestCatalog::new(self.config, self.storage_factory)) + let runtime = self.runtime.unwrap_or_else(Runtime::current); + Ok(RestCatalog::new(self.config, self.storage_factory, runtime)) } }; @@ -351,15 +359,21 @@ pub struct RestCatalog { ctx: OnceCell, /// Storage factory for creating FileIO instances. storage_factory: Option>, + runtime: Runtime, } impl RestCatalog { /// Creates a `RestCatalog` from a [`RestCatalogConfig`]. - fn new(config: RestCatalogConfig, storage_factory: Option>) -> Self { + fn new( + config: RestCatalogConfig, + storage_factory: Option>, + runtime: Runtime, + ) -> Self { Self { user_config: config, ctx: OnceCell::new(), storage_factory, + runtime, } } @@ -790,7 +804,8 @@ impl Catalog for RestCatalog { let table_builder = Table::builder() .identifier(table_ident.clone()) .file_io(file_io) - .metadata(response.metadata); + .metadata(response.metadata) + .runtime(self.runtime.clone()); if let Some(metadata_location) = response.metadata_location { table_builder.metadata_location(metadata_location).build() @@ -846,7 +861,8 @@ impl Catalog for RestCatalog { let table_builder = Table::builder() .identifier(table_ident.clone()) .file_io(file_io) - .metadata(response.metadata); + .metadata(response.metadata) + .runtime(self.runtime.clone()); if let Some(metadata_location) = response.metadata_location { table_builder.metadata_location(metadata_location).build() @@ -982,6 +998,7 @@ impl Catalog for RestCatalog { .file_io(file_io) .metadata(response.metadata) .metadata_location(metadata_location.clone()) + .runtime(self.runtime.clone()) .build() } @@ -1054,6 +1071,7 @@ impl Catalog for RestCatalog { .file_io(file_io) .metadata(response.metadata) .metadata_location(response.metadata_location) + .runtime(self.runtime.clone()) .build() } } @@ -1099,6 +1117,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); assert_eq!( @@ -1173,6 +1192,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1220,6 +1240,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1244,6 +1265,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1275,6 +1297,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1306,6 +1329,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1337,6 +1361,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1450,6 +1475,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1497,6 +1523,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let _namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1527,6 +1554,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1578,6 +1606,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1677,6 +1706,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1730,6 +1760,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let namespaces = catalog @@ -1773,6 +1804,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let namespaces = catalog @@ -1806,6 +1838,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); assert!( @@ -1834,6 +1867,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); catalog @@ -1874,6 +1908,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let tables = catalog @@ -1942,6 +1977,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let tables = catalog @@ -2073,6 +2109,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let tables = catalog @@ -2117,6 +2154,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); catalog @@ -2146,6 +2184,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); assert!( @@ -2177,6 +2216,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); catalog @@ -2211,6 +2251,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let table = catalog @@ -2328,6 +2369,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let table = catalog @@ -2364,6 +2406,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let table_creation = TableCreation::builder() @@ -2513,6 +2556,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let table_creation = TableCreation::builder() @@ -2582,6 +2626,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let table1 = { @@ -2725,6 +2770,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let table1 = { @@ -2789,6 +2835,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let table_ident = TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string()); @@ -2840,6 +2887,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), ); let table_ident = diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index cc43446943..03f4a28de3 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -32,7 +32,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, - TableCommit, TableCreation, TableIdent, + Runtime, TableCommit, TableCreation, TableIdent, }; use iceberg_storage_opendal::OpenDalStorageFactory; @@ -71,6 +71,7 @@ struct S3TablesCatalogConfig { pub struct S3TablesCatalogBuilder { config: S3TablesCatalogConfig, storage_factory: Option>, + runtime: Option, } /// Default builder for [`S3TablesCatalog`]. @@ -85,6 +86,7 @@ impl Default for S3TablesCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + runtime: None, } } } @@ -132,6 +134,11 @@ impl CatalogBuilder for S3TablesCatalogBuilder { self } + fn with_runtime(mut self, runtime: Runtime) -> Self { + self.runtime = Some(runtime); + self + } + fn load( mut self, name: impl Into, @@ -172,7 +179,11 @@ impl CatalogBuilder for S3TablesCatalogBuilder { "Table bucket ARN is required", )) } else { - S3TablesCatalog::new(self.config, self.storage_factory).await + let runtime = match self.runtime { + Some(rt) => rt, + None => Runtime::try_current()?, + }; + S3TablesCatalog::new(self.config, self.storage_factory, runtime).await } } } @@ -184,6 +195,7 @@ pub struct S3TablesCatalog { config: S3TablesCatalogConfig, s3tables_client: aws_sdk_s3tables::Client, file_io: FileIO, + runtime: Runtime, } impl S3TablesCatalog { @@ -191,6 +203,7 @@ impl S3TablesCatalog { async fn new( config: S3TablesCatalogConfig, storage_factory: Option>, + runtime: Runtime, ) -> Result { let s3tables_client = if let Some(client) = config.client.clone() { client @@ -213,6 +226,7 @@ impl S3TablesCatalog { config, s3tables_client, file_io, + runtime, }) } @@ -245,6 +259,7 @@ impl S3TablesCatalog { .metadata(metadata) .metadata_location(metadata_location) .file_io(self.file_io.clone()) + .runtime(self.runtime.clone()) .build()?; Ok((table, resp.version_token)) } @@ -543,6 +558,7 @@ impl Catalog for S3TablesCatalog { .metadata_location(metadata_location_str) .metadata(metadata) .file_io(self.file_io.clone()) + .runtime(self.runtime.clone()) .build()?; Ok(table) } @@ -726,7 +742,9 @@ mod tests { props: HashMap::new(), }; - Ok(Some(S3TablesCatalog::new(config, None).await?)) + Ok(Some( + S3TablesCatalog::new(config, None, Runtime::current()).await?, + )) } #[tokio::test] diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 7e468e7e37..c7bf9d0cfd 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -26,7 +26,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, - TableCommit, TableCreation, TableIdent, + Runtime, TableCommit, TableCreation, TableIdent, }; use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers}; use sqlx::{Any, AnyPool, Row, Transaction}; @@ -67,6 +67,7 @@ static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each con pub struct SqlCatalogBuilder { config: SqlCatalogConfig, storage_factory: Option>, + runtime: Option, } impl Default for SqlCatalogBuilder { @@ -80,6 +81,7 @@ impl Default for SqlCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + runtime: None, } } } @@ -143,6 +145,11 @@ impl CatalogBuilder for SqlCatalogBuilder { self } + fn with_runtime(mut self, runtime: Runtime) -> Self { + self.runtime = Some(runtime); + self + } + fn load( mut self, name: impl Into, @@ -190,7 +197,11 @@ impl CatalogBuilder for SqlCatalogBuilder { )) } else { self.config.name = name; - SqlCatalog::new(self.config, self.storage_factory).await + let runtime = match self.runtime { + Some(rt) => rt, + None => Runtime::try_current()?, + }; + SqlCatalog::new(self.config, self.storage_factory, runtime).await } } } @@ -221,6 +232,7 @@ pub struct SqlCatalog { warehouse_location: String, fileio: FileIO, sql_bind_style: SqlBindStyle, + runtime: Runtime, } #[derive(Debug, PartialEq, strum::EnumString, strum::Display)] @@ -237,6 +249,7 @@ impl SqlCatalog { async fn new( config: SqlCatalogConfig, storage_factory: Option>, + runtime: Runtime, ) -> Result { let factory = storage_factory.ok_or_else(|| { Error::new( @@ -303,6 +316,7 @@ impl SqlCatalog { warehouse_location: config.warehouse_location, fileio, sql_bind_style: config.sql_bind_style, + runtime, }) } @@ -810,6 +824,7 @@ impl Catalog for SqlCatalog { .identifier(identifier.clone()) .metadata_location(tbl_metadata_location) .metadata(metadata) + .runtime(self.runtime.clone()) .build()?) } @@ -880,6 +895,7 @@ impl Catalog for SqlCatalog { .metadata_location(tbl_metadata_location_str) .identifier(tbl_ident) .metadata(tbl_metadata) + .runtime(self.runtime.clone()) .build()?) } @@ -951,6 +967,7 @@ impl Catalog for SqlCatalog { .metadata_location(metadata_location) .metadata(metadata) .file_io(self.fileio.clone()) + .runtime(self.runtime.clone()) .build()?) } diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 18729176dc..58b2eacc35 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -74,7 +74,7 @@ serde_json = { workspace = true } serde_repr = { workspace = true } serde_with = { workspace = true } strum = { workspace = true, features = ["derive"] } -tokio = { workspace = true, optional = false, features = ["sync"] } +tokio = { workspace = true, optional = false, features = ["sync", "rt-multi-thread"] } typed-builder = { workspace = true } typetag = { workspace = true } url = { workspace = true } diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 231971fd54..81e92fcb98 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -31,6 +31,7 @@ use crate::delete_vector::DeleteVector; use crate::expr::Predicate::AlwaysTrue; use crate::expr::{Predicate, Reference}; use crate::io::FileIO; +use crate::runtime::Runtime; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; use crate::spec::{ DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor, @@ -46,6 +47,7 @@ pub(crate) struct CachingDeleteFileLoader { /// Shared filter state to allow caching loaded deletes across multiple /// calls to `load_deletes` (e.g., across multiple file scan tasks). delete_filter: DeleteFilter, + runtime: Runtime, } // Intermediate context during processing of a delete file task. @@ -77,12 +79,17 @@ enum ParsedDeleteFileContext { #[allow(unused_variables)] impl CachingDeleteFileLoader { - pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self { + pub(crate) fn new( + file_io: FileIO, + concurrency_limit_data_files: usize, + runtime: Runtime, + ) -> Self { let scan_metrics = ScanMetrics::new(); CachingDeleteFileLoader { basic_delete_file_loader: BasicDeleteFileLoader::new(file_io, scan_metrics), concurrency_limit_data_files, - delete_filter: DeleteFilter::default(), + delete_filter: DeleteFilter::new(runtime.clone()), + runtime, } } @@ -181,7 +188,7 @@ impl CachingDeleteFileLoader { let del_filter = self.delete_filter.clone(); let concurrency_limit_data_files = self.concurrency_limit_data_files; let basic_delete_file_loader = self.basic_delete_file_loader.clone(); - crate::runtime::spawn(async move { + self.runtime.io().spawn(async move { let result = async move { let mut del_filter = del_filter; let basic_delete_file_loader = basic_delete_file_loader.clone(); @@ -737,7 +744,8 @@ mod tests { let table_location = tmp_dir.path(); let file_io = FileIO::new_with_fs(); - let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let delete_file_loader = + CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::current()); let file_scan_tasks = setup(table_location); @@ -958,7 +966,8 @@ mod tests { }; // Load the deletes - should handle both types without error - let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let delete_file_loader = + CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::current()); let delete_filter = delete_file_loader .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref()) .await @@ -1030,7 +1039,8 @@ mod tests { let table_location = tmp_dir.path(); let file_io = FileIO::new_with_fs(); - let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let delete_file_loader = + CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::current()); let file_scan_tasks = setup(table_location); diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 6369938ce2..2dabc11cae 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -24,6 +24,7 @@ use tokio::sync::oneshot::Receiver; use crate::delete_vector::DeleteVector; use crate::expr::Predicate::AlwaysTrue; use crate::expr::{Bind, BoundPredicate, Predicate}; +use crate::runtime::Runtime; use crate::scan::{FileScanTask, FileScanTaskDeleteFile}; use crate::spec::DataContentType; use crate::{Error, ErrorKind, Result}; @@ -53,9 +54,10 @@ struct DeleteFileFilterState { positional_deletes: HashMap, } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub(crate) struct DeleteFilter { state: Arc>, + runtime: Runtime, } /// Action to take when trying to start loading a positional delete file @@ -71,6 +73,14 @@ pub(crate) enum PosDelLoadAction { } impl DeleteFilter { + /// Create a new DeleteFilter with the given runtime. + pub(crate) fn new(runtime: Runtime) -> Self { + Self { + state: Arc::new(RwLock::new(DeleteFileFilterState::default())), + runtime, + } + } + /// Retrieve a delete vector for the data file associated with a given file scan task pub(crate) fn get_delete_vector( &self, @@ -245,7 +255,7 @@ impl DeleteFilter { let state = self.state.clone(); let delete_file_path = delete_file_path.to_string(); - crate::runtime::spawn(async move { + self.runtime.cpu().spawn(async move { let eq_del = eq_del.await.unwrap(); { let mut state = state.write().unwrap(); @@ -292,7 +302,8 @@ pub(crate) mod tests { let table_location = tmp_dir.path(); let file_io = FileIO::new_with_fs(); - let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let delete_file_loader = + CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::current()); let file_scan_tasks = setup(table_location); @@ -503,7 +514,7 @@ pub(crate) mod tests { case_sensitive: true, }; - let filter = DeleteFilter::default(); + let filter = DeleteFilter::new(Runtime::current()); // ---------- insert equality delete predicate ---------- let pred = Reference::new("id").equal_to(Datum::long(10)); diff --git a/crates/iceberg/src/arrow/reader/mod.rs b/crates/iceberg/src/arrow/reader/mod.rs index c6c41accb7..7e3c2f807f 100644 --- a/crates/iceberg/src/arrow/reader/mod.rs +++ b/crates/iceberg/src/arrow/reader/mod.rs @@ -19,6 +19,7 @@ use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; use crate::io::FileIO; +use crate::runtime::Runtime; use crate::util::available_parallelism; /// Default gap between byte ranges below which they are coalesced into a @@ -53,11 +54,12 @@ pub struct ArrowReaderBuilder { row_group_filtering_enabled: bool, row_selection_enabled: bool, parquet_read_options: ParquetReadOptions, + runtime: Runtime, } impl ArrowReaderBuilder { /// Create a new ArrowReaderBuilder - pub fn new(file_io: FileIO) -> Self { + pub fn new(file_io: FileIO, runtime: Runtime) -> Self { let num_cpus = available_parallelism().get(); ArrowReaderBuilder { @@ -67,6 +69,7 @@ impl ArrowReaderBuilder { row_group_filtering_enabled: true, row_selection_enabled: false, parquet_read_options: ParquetReadOptions::builder().build(), + runtime, } } @@ -129,6 +132,7 @@ impl ArrowReaderBuilder { delete_file_loader: CachingDeleteFileLoader::new( self.file_io.clone(), self.concurrency_limit_data_files, + self.runtime.clone(), ), concurrency_limit_data_files: self.concurrency_limit_data_files, row_group_filtering_enabled: self.row_group_filtering_enabled, diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs index 8ecee294c4..edc0a3fa96 100644 --- a/crates/iceberg/src/arrow/reader/pipeline.rs +++ b/crates/iceberg/src/arrow/reader/pipeline.rs @@ -456,6 +456,7 @@ mod tests { use parquet::file::properties::WriterProperties; use tempfile::TempDir; + use crate::Runtime; use crate::arrow::ArrowReaderBuilder; use crate::io::FileIO; use crate::scan::{FileScanTask, FileScanTaskStream}; @@ -487,7 +488,7 @@ mod tests { project_field_ids: Vec, ) -> Vec { let file_io = FileIO::new_with_fs(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); let file_size = std::fs::metadata(file_path).unwrap().len(); let task = FileScanTask { @@ -698,7 +699,7 @@ mod tests { } // Read with concurrency=1 (fast-path) - let reader = ArrowReaderBuilder::new(file_io) + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()) .with_data_file_concurrency_limit(1) .build(); diff --git a/crates/iceberg/src/arrow/reader/positional_deletes.rs b/crates/iceberg/src/arrow/reader/positional_deletes.rs index b2993572c5..6cba80534b 100644 --- a/crates/iceberg/src/arrow/reader/positional_deletes.rs +++ b/crates/iceberg/src/arrow/reader/positional_deletes.rs @@ -166,6 +166,7 @@ mod tests { use roaring::RoaringTreemap; use tempfile::TempDir; + use crate::Runtime; use crate::arrow::{ArrowReader, ArrowReaderBuilder}; use crate::delete_vector::DeleteVector; use crate::io::FileIO; @@ -432,7 +433,7 @@ mod tests { // Step 3: Read the data file with the delete applied let file_io = FileIO::new_with_fs(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); let task = FileScanTask { file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(), @@ -652,7 +653,7 @@ mod tests { ); let file_io = FileIO::new_with_fs(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); // Create FileScanTask that reads ONLY row group 1 via byte range filtering let task = FileScanTask { @@ -867,7 +868,7 @@ mod tests { let rg1_length = row_group_1.compressed_size() as u64; let file_io = FileIO::new_with_fs(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); // Create FileScanTask that reads ONLY row group 1 via byte range filtering let task = FileScanTask { diff --git a/crates/iceberg/src/arrow/reader/projection.rs b/crates/iceberg/src/arrow/reader/projection.rs index deae027e14..2589c78366 100644 --- a/crates/iceberg/src/arrow/reader/projection.rs +++ b/crates/iceberg/src/arrow/reader/projection.rs @@ -431,12 +431,12 @@ mod tests { use parquet::schema::types::SchemaDescriptor; use tempfile::TempDir; - use crate::ErrorKind; use crate::arrow::{ArrowReader, ArrowReaderBuilder}; use crate::expr::{Bind, Reference}; use crate::io::FileIO; use crate::scan::{FileScanTask, FileScanTaskStream}; use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type}; + use crate::{ErrorKind, Runtime}; #[test] fn test_arrow_projection_mask() { @@ -576,7 +576,7 @@ message schema { writer.close().unwrap(); // Read the old Parquet file using the NEW schema (with column 'b') - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { file_size_in_bytes: std::fs::metadata(format!("{table_location}/old_file.parquet")) @@ -678,7 +678,7 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { @@ -780,7 +780,7 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { @@ -871,7 +871,7 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { @@ -976,7 +976,7 @@ message schema { } writer.close().unwrap(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { @@ -1110,7 +1110,7 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { @@ -1211,7 +1211,7 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { @@ -1322,7 +1322,7 @@ message schema { let predicate = Reference::new("id").less_than(Datum::int(5)); // Enable both row_group_filtering and row_selection - triggered the panic - let reader = ArrowReaderBuilder::new(file_io) + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()) .with_row_group_filtering_enabled(true) .with_row_selection_enabled(true) .build(); @@ -1470,7 +1470,7 @@ message schema { writer.close().unwrap(); // Read the Parquet file with partition spec and data - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { file_size_in_bytes: std::fs::metadata(format!("{table_location}/data.parquet")) @@ -1680,7 +1680,7 @@ message schema { let predicate = Reference::new("id").greater_than(Datum::int(1)); - let reader = ArrowReaderBuilder::new(FileIO::new_with_fs()) + let reader = ArrowReaderBuilder::new(FileIO::new_with_fs(), Runtime::current()) .with_row_group_filtering_enabled(true) .with_row_selection_enabled(true) .build(); diff --git a/crates/iceberg/src/arrow/reader/row_filter.rs b/crates/iceberg/src/arrow/reader/row_filter.rs index 80432a0437..bc5a05e473 100644 --- a/crates/iceberg/src/arrow/reader/row_filter.rs +++ b/crates/iceberg/src/arrow/reader/row_filter.rs @@ -204,6 +204,7 @@ mod tests { use parquet::file::properties::WriterProperties; use tempfile::TempDir; + use crate::Runtime; use crate::arrow::{ArrowReader, ArrowReaderBuilder}; use crate::expr::{Bind, Predicate, Reference}; use crate::io::FileIO; @@ -320,7 +321,7 @@ mod tests { let (file_io, schema, table_location, _temp_dir) = setup_kleene_logic(data_for_col_a, DataType::Utf8); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); let result_data = test_perform_read(predicate, schema, table_location, reader).await; @@ -342,7 +343,7 @@ mod tests { let (file_io, schema, table_location, _temp_dir) = setup_kleene_logic(data_for_col_a, DataType::Utf8); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); let result_data = test_perform_read(predicate, schema, table_location, reader).await; @@ -406,7 +407,7 @@ mod tests { let (file_io, schema, table_location, _temp_dir) = setup_kleene_logic(data_for_col_a, DataType::LargeUtf8); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); for (predicate, expected) in predicates { println!("testing predicate {predicate}"); @@ -513,7 +514,7 @@ mod tests { ); let file_io = FileIO::new_with_fs(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); // Task 1: read only the first row group let task1 = FileScanTask { diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 8fa5c479c3..12895b45ff 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -27,6 +27,7 @@ use itertools::Itertools; use super::namespace_state::NamespaceState; use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory}; +use crate::runtime::Runtime; use crate::spec::{TableMetadata, TableMetadataBuilder}; use crate::table::Table; use crate::{ @@ -45,6 +46,7 @@ const LOCATION: &str = "location"; pub struct MemoryCatalogBuilder { config: MemoryCatalogConfig, storage_factory: Option>, + runtime: Option, } impl Default for MemoryCatalogBuilder { @@ -56,6 +58,7 @@ impl Default for MemoryCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + runtime: None, } } } @@ -68,6 +71,11 @@ impl CatalogBuilder for MemoryCatalogBuilder { self } + fn with_runtime(mut self, runtime: Runtime) -> Self { + self.runtime = Some(runtime); + self + } + fn load( mut self, name: impl Into, @@ -100,7 +108,8 @@ impl CatalogBuilder for MemoryCatalogBuilder { "Catalog warehouse is required", )) } else { - MemoryCatalog::new(self.config, self.storage_factory) + let runtime = self.runtime.unwrap_or_else(Runtime::current); + MemoryCatalog::new(self.config, self.storage_factory, runtime) } }; @@ -121,6 +130,7 @@ pub struct MemoryCatalog { root_namespace_state: Mutex, file_io: FileIO, warehouse_location: String, + runtime: Runtime, } impl MemoryCatalog { @@ -128,6 +138,7 @@ impl MemoryCatalog { fn new( config: MemoryCatalogConfig, storage_factory: Option>, + runtime: Runtime, ) -> Result { // Use provided factory or default to MemoryStorageFactory let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory)); @@ -136,6 +147,7 @@ impl MemoryCatalog { root_namespace_state: Mutex::new(NamespaceState::default()), file_io: FileIOBuilder::new(factory).with_props(config.props).build(), warehouse_location: config.warehouse, + runtime, }) } @@ -153,6 +165,7 @@ impl MemoryCatalog { .metadata(metadata) .metadata_location(metadata_location.to_string()) .file_io(self.file_io.clone()) + .runtime(self.runtime.clone()) .build() } } @@ -307,6 +320,7 @@ impl Catalog for MemoryCatalog { .metadata_location(metadata_location.to_string()) .metadata(metadata) .identifier(table_ident) + .runtime(self.runtime.clone()) .build() } @@ -378,6 +392,7 @@ impl Catalog for MemoryCatalog { .metadata_location(metadata_location) .metadata(metadata) .identifier(table_ident.clone()) + .runtime(self.runtime.clone()) .build() } @@ -1870,6 +1885,11 @@ pub(crate) mod tests { // Assert the table doesn't contain the update yet assert!(!table.metadata().properties().contains_key("key")); + // `last_updated_ms` is millisecond-resolution `chrono::Utc::now()`. + // Without this sleep, create and commit can land in the same + // millisecond and the `<` assertion below flakes. + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + // Update table metadata let tx = Transaction::new(&table); let updated_table = tx diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 43102adec9..b9dcdb2b40 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -40,6 +40,7 @@ use typed_builder::TypedBuilder; use uuid::Uuid; use crate::io::StorageFactory; +use crate::runtime::Runtime; use crate::spec::{ EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot, SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder, @@ -151,6 +152,13 @@ pub trait CatalogBuilder: Default + Debug + Send + Sync { /// ``` fn with_storage_factory(self, storage_factory: Arc) -> Self; + /// Set a custom tokio Runtime to use for spawning async tasks. + /// + /// When a Runtime is provided, the catalog will propagate it to all tables + /// it creates. Tasks such as scan planning and delete file processing + /// will be spawned on this runtime. + fn with_runtime(self, runtime: Runtime) -> Self; + /// Create a new catalog instance. fn load( self, diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 4f6fd28483..07394bb855 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -23,7 +23,7 @@ use futures::StreamExt; use futures::channel::mpsc::{Sender, channel}; use tokio::sync::Notify; -use crate::runtime::spawn; +use crate::runtime::Runtime; use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; use crate::spec::{DataContentType, DataFile, Struct}; @@ -53,7 +53,7 @@ struct PopulatedDeleteFileIndex { impl DeleteFileIndex { /// create a new `DeleteFileIndex` along with the sender that populates it with delete files - pub(crate) fn new() -> (DeleteFileIndex, Sender) { + pub(crate) fn new(runtime: Runtime) -> (DeleteFileIndex, Sender) { // TODO: what should the channel limit be? let (tx, rx) = channel(10); let notify = Arc::new(Notify::new()); @@ -62,7 +62,7 @@ impl DeleteFileIndex { ))); let delete_file_stream = rx.boxed(); - spawn({ + runtime.io().spawn({ let state = state.clone(); async move { let delete_files: Vec = diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index ae0708146b..4e346460f5 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -90,6 +90,7 @@ pub mod transaction; pub mod transform; mod runtime; +pub use runtime::{Runtime, RuntimeHandle}; pub mod arrow; pub(crate) mod delete_file_index; diff --git a/crates/iceberg/src/runtime/mod.rs b/crates/iceberg/src/runtime/mod.rs index 61aa623f58..2202995766 100644 --- a/crates/iceberg/src/runtime/mod.rs +++ b/crates/iceberg/src/runtime/mod.rs @@ -17,59 +17,304 @@ // This module contains the async runtime abstraction for iceberg. +use std::fmt; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use tokio::task; +use crate::{Error, ErrorKind, Result}; + +/// Wrapper around tokio's `JoinHandle` that converts task failures into +/// [`iceberg::Error`]. +/// +/// Tokio's `JoinHandle` resolves to `Result`, where a +/// `JoinError` means the task either panicked or was cancelled (typically from +/// runtime shutdown or `abort`). Both are surfaced here as +/// `ErrorKind::Unexpected` with the original `JoinError` preserved as the +/// source. pub struct JoinHandle(task::JoinHandle); impl Unpin for JoinHandle {} impl Future for JoinHandle { - type Output = T; + type Output = crate::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.get_mut() { - JoinHandle(handle) => Pin::new(handle) - .poll(cx) - .map(|r| r.expect("tokio spawned task failed")), - } + Pin::new(&mut self.get_mut().0).poll(cx).map(|r| { + r.map_err(|e| Error::new(ErrorKind::Unexpected, "spawned task failed").with_source(e)) + }) + } +} + +/// Handle to a single tokio runtime. +/// +/// Wraps a [`tokio::runtime::Handle`], which is cheap to clone. The caller is +/// responsible for keeping the underlying runtime alive while this handle is +/// in use; spawning on a shut-down runtime will surface as a `JoinError` via +/// [`JoinHandle`]. +#[derive(Clone)] +pub struct RuntimeHandle { + handle: tokio::runtime::Handle, +} + +impl fmt::Debug for RuntimeHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RuntimeHandle").finish() + } +} + +impl RuntimeHandle { + fn from_tokio_handle(handle: tokio::runtime::Handle) -> Self { + Self { handle } + } + + /// Spawn an async task. + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + JoinHandle(self.handle.spawn(future)) + } + + /// Spawn a blocking task. + pub fn spawn_blocking(&self, f: F) -> JoinHandle + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + JoinHandle(self.handle.spawn_blocking(f)) } } -#[allow(dead_code)] -pub fn spawn(f: F) -> JoinHandle -where - F: std::future::Future + Send + 'static, - F::Output: Send + 'static, -{ - JoinHandle(task::spawn(f)) +/// Iceberg's runtime abstraction. +/// +/// Contains separate handles for IO-bound and CPU-bound work. When constructed +/// with a single tokio runtime, both `io()` and `cpu()` route to the same one. +/// Use [`Runtime::new_with_split`] to provide dedicated runtimes for each +/// category. +/// +/// # Lifetime +/// +/// A `Runtime` stores only `tokio::runtime::Handle`s (weak references). The +/// caller owns the tokio runtime's lifetime. If the underlying runtime is +/// dropped while iceberg is still using it, subsequent spawns will surface as +/// task cancellation errors via [`JoinHandle`]. +/// +/// Cloning is cheap. +#[derive(Clone)] +pub struct Runtime { + io: RuntimeHandle, + cpu: RuntimeHandle, } -#[allow(dead_code)] -pub fn spawn_blocking(f: F) -> JoinHandle -where - F: FnOnce() -> T + Send + 'static, - T: Send + 'static, -{ - JoinHandle(task::spawn_blocking(f)) +impl fmt::Debug for Runtime { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Runtime").finish() + } +} + +impl Runtime { + /// Create a Runtime backed by a single tokio runtime for all work. + pub fn new(runtime: Arc) -> Self { + let handle = RuntimeHandle::from_tokio_handle(runtime.handle().clone()); + Self { + io: handle.clone(), + cpu: handle, + } + } + + /// Create a Runtime with separate tokio runtimes for IO and CPU work. + pub fn new_with_split( + io_runtime: Arc, + cpu_runtime: Arc, + ) -> Self { + Self { + io: RuntimeHandle::from_tokio_handle(io_runtime.handle().clone()), + cpu: RuntimeHandle::from_tokio_handle(cpu_runtime.handle().clone()), + } + } + + /// Borrows the tokio runtime the caller is currently running in. + /// + /// Panics if called outside a tokio runtime context. Use + /// [`Runtime::try_current`] for a fallible version. + /// + /// Iceberg never implicitly spawns its own runtime; callers outside a + /// tokio context must construct one explicitly via [`Runtime::new`] or + /// [`Runtime::new_with_split`]. + pub fn current() -> Self { + Self::try_current().expect( + "Runtime::current() called outside a tokio runtime context. \ + Call it from within #[tokio::main] / #[tokio::test], or construct \ + a Runtime explicitly via Runtime::new / Runtime::new_with_split.", + ) + } + + /// Fallible variant of [`Runtime::current`]. Returns an error if no tokio + /// runtime is available in the current context. + pub fn try_current() -> Result { + let handle = tokio::runtime::Handle::try_current().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "no tokio runtime in context; call Runtime::try_current() \ + from within a tokio runtime, or construct a Runtime explicitly \ + via Runtime::new / Runtime::new_with_split", + ) + .with_source(e) + })?; + let rh = RuntimeHandle::from_tokio_handle(handle); + Ok(Self { + io: rh.clone(), + cpu: rh, + }) + } + + /// Handle for IO-bound work (network fetches, file reads). + pub fn io(&self) -> &RuntimeHandle { + &self.io + } + + /// Handle for CPU-bound work (decoding, predicate eval, projection). + pub fn cpu(&self) -> &RuntimeHandle { + &self.cpu + } } #[cfg(test)] mod tests { use super::*; - #[tokio::test] - async fn test_tokio_spawn() { - let handle = spawn(async { 1 + 1 }); - assert_eq!(handle.await, 2); + /// A test harness that owns a tokio runtime and exposes a `Runtime` handle + /// plus a `block_on` helper for sync test bodies. + struct TestRuntime { + tokio: Arc, + rt: Runtime, + } + + impl TestRuntime { + fn new() -> Self { + let tokio = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to build tokio runtime"), + ); + let rt = Runtime::new(tokio.clone()); + Self { tokio, rt } + } + + fn block_on(&self, f: F) -> F::Output { + self.tokio.block_on(f) + } + } + + #[test] + fn test_runtime_spawn_io() { + let h = TestRuntime::new(); + let handle = h.rt.io().spawn(async { 1 + 1 }); + assert_eq!(h.block_on(handle).unwrap(), 2); + } + + #[test] + fn test_runtime_spawn_cpu() { + let h = TestRuntime::new(); + let handle = h.rt.cpu().spawn(async { 3 + 4 }); + assert_eq!(h.block_on(handle).unwrap(), 7); + } + + #[test] + fn test_runtime_spawn_blocking() { + let h = TestRuntime::new(); + let handle = h.rt.cpu().spawn_blocking(|| 1 + 1); + assert_eq!(h.block_on(handle).unwrap(), 2); } - #[tokio::test] - async fn test_tokio_spawn_blocking() { - let handle = spawn_blocking(|| 1 + 1); - assert_eq!(handle.await, 2); + #[test] + fn test_runtime_new_with_custom_runtime() { + let h = TestRuntime::new(); + let handle = h.rt.io().spawn(async { 42 }); + assert_eq!(h.block_on(handle).unwrap(), 42); + } + + #[test] + fn test_runtime_split_uses_separate_handles() { + let io_rt = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let cpu_rt = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let rt = Runtime::new_with_split(io_rt.clone(), cpu_rt.clone()); + // Spawn on each and confirm both are distinct live runtimes. We use + // `io_rt`/`cpu_rt` directly to `block_on` since our `Runtime` doesn't + // expose one. + let io_result = io_rt.block_on(async { rt.io().spawn(async { "io" }).await.unwrap() }); + let cpu_result = cpu_rt.block_on(async { rt.cpu().spawn(async { "cpu" }).await.unwrap() }); + assert_eq!(io_result, "io"); + assert_eq!(cpu_result, "cpu"); + } + + #[test] + fn test_runtime_clone() { + let h = TestRuntime::new(); + let rt2 = h.rt.clone(); + let handle = rt2.io().spawn(async { 5 }); + assert_eq!(h.block_on(handle).unwrap(), 5); + } + + #[test] + fn test_runtime_debug() { + let h = TestRuntime::new(); + let debug_str = format!("{:?}", h.rt); + assert!(debug_str.contains("Runtime")); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_try_current_in_runtime() { + let rt = Runtime::try_current().expect("should find current runtime"); + let result = rt.io().spawn(async { 7 }).await.unwrap(); + assert_eq!(result, 7); + } + + #[test] + fn test_try_current_outside_runtime() { + let err = Runtime::try_current().expect_err("must fail outside runtime"); + assert_eq!(err.kind(), ErrorKind::Unexpected); + } + + /// Verifies that when the caller drops the underlying tokio runtime, a + /// subsequent spawn surfaces as a `JoinError` via our `JoinHandle` rather + /// than hanging or misbehaving. + #[test] + fn test_spawn_after_runtime_drop_errors() { + let driver = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let owned = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let rt = Runtime::new(owned.clone()); + // Drop the caller's strong reference. `owned` was the only Arc. + drop(owned); + + // Spawning after shutdown returns a handle that resolves to a cancelled + // JoinError, which our wrapper maps to iceberg::Error. + let handle = rt.io().spawn(async { 1 }); + let result = driver.block_on(handle); + assert!(result.is_err(), "expected error after runtime shutdown"); } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 27f479183a..f6b9670fe7 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -38,7 +38,7 @@ use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluato use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name}; -use crate::runtime::spawn; +use crate::runtime::Runtime; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; use crate::util::available_parallelism; @@ -211,6 +211,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + runtime: self.table.runtime(), }); }; current_snapshot_id.clone() @@ -304,6 +305,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + runtime: self.table.runtime(), }) } } @@ -332,6 +334,8 @@ pub struct TableScan { row_group_filtering_enabled: bool, row_selection_enabled: bool, + + runtime: Runtime, } impl TableScan { @@ -353,7 +357,7 @@ impl TableScan { // used to stream the results back to the caller let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); - let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(); + let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(self.runtime.clone()); let manifest_list = plan_context.get_manifest_list().await?; @@ -368,9 +372,13 @@ impl TableScan { )?; let mut channel_for_manifest_error = file_scan_task_tx.clone(); + let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); + let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); + + let rt = self.runtime.clone(); // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s - spawn(async move { + rt.io().spawn(async move { let result = futures::stream::iter(manifest_file_contexts) .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move { ctx.fetch_manifest_and_stream_manifest_entries().await @@ -382,61 +390,83 @@ impl TableScan { } }); - let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); - let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); - // Process the delete file [`ManifestEntry`] stream in parallel - spawn(async move { - let result = manifest_entry_delete_ctx_rx - .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) - .try_for_each_concurrent( - concurrency_limit_manifest_entries, - |(manifest_entry_context, tx)| async move { - spawn(async move { - Self::process_delete_manifest_entry(manifest_entry_context, tx).await - }) - .await - }, - ) - .await; - - if let Err(error) = result { - let _ = channel_for_delete_manifest_entry_error - .send(Err(error)) + { + let rt = rt.clone(); + let rt_inner = rt.clone(); + rt.cpu().spawn(async move { + let result = manifest_entry_delete_ctx_rx + .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| { + let rt_inner = rt_inner.clone(); + async move { + rt_inner + .cpu() + .spawn(async move { + Self::process_delete_manifest_entry( + manifest_entry_context, + tx, + ) + .await + }) + .await? + } + }, + ) .await; - } - }) - .await; + + if let Err(error) = result { + let _ = channel_for_delete_manifest_entry_error + .send(Err(error)) + .await; + } + }); + } // Process the data file [`ManifestEntry`] stream in parallel - spawn(async move { - let result = manifest_entry_data_ctx_rx - .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) - .try_for_each_concurrent( - concurrency_limit_manifest_entries, - |(manifest_entry_context, tx)| async move { - spawn(async move { - Self::process_data_manifest_entry(manifest_entry_context, tx).await - }) - .await - }, - ) - .await; + { + let rt_inner = rt.clone(); + rt.cpu().spawn(async move { + let result = manifest_entry_data_ctx_rx + .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| { + let rt_inner = rt_inner.clone(); + async move { + rt_inner + .cpu() + .spawn(async move { + Self::process_data_manifest_entry( + manifest_entry_context, + tx, + ) + .await + }) + .await? + } + }, + ) + .await; - if let Err(error) = result { - let _ = channel_for_data_manifest_entry_error.send(Err(error)).await; - } - }); + if let Err(error) = result { + let _ = channel_for_data_manifest_entry_error.send(Err(error)).await; + } + }); + } Ok(file_scan_task_rx.boxed()) } /// Returns an [`ArrowRecordBatchStream`]. pub async fn to_arrow(&self) -> Result { - let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone()) - .with_data_file_concurrency_limit(self.concurrency_limit_data_files) - .with_row_group_filtering_enabled(self.row_group_filtering_enabled) - .with_row_selection_enabled(self.row_selection_enabled); + let mut arrow_reader_builder = + ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone()) + .with_data_file_concurrency_limit(self.concurrency_limit_data_files) + .with_row_group_filtering_enabled(self.row_group_filtering_enabled) + .with_row_selection_enabled(self.row_selection_enabled); if let Some(batch_size) = self.batch_size { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); @@ -1200,8 +1230,8 @@ pub mod tests { } } - #[test] - fn test_table_scan_columns() { + #[tokio::test] + async fn test_table_scan_columns() { let table = TableTestFixture::new().table; let table_scan = table.scan().select(["x", "y"]).build().unwrap(); @@ -1219,8 +1249,8 @@ pub mod tests { assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names); } - #[test] - fn test_select_all() { + #[tokio::test] + async fn test_select_all() { let table = TableTestFixture::new().table; let table_scan = table.scan().select_all().build().unwrap(); @@ -1235,8 +1265,8 @@ pub mod tests { assert!(table_scan.is_err()); } - #[test] - fn test_table_scan_default_snapshot_id() { + #[tokio::test] + async fn test_table_scan_default_snapshot_id() { let table = TableTestFixture::new().table; let table_scan = table.scan().build().unwrap(); @@ -1254,8 +1284,8 @@ pub mod tests { assert!(table_scan.is_err()); } - #[test] - fn test_table_scan_with_snapshot_id() { + #[tokio::test] + async fn test_table_scan_with_snapshot_id() { let table = TableTestFixture::new().table; let table_scan = table @@ -1364,7 +1394,9 @@ pub mod tests { .unwrap(); assert_eq!(plan_task.len(), 2); - let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); + let reader = + ArrowReaderBuilder::new(fixture.table.file_io().clone(), fixture.table.runtime()) + .build(); let batch_stream = reader .clone() .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) @@ -1372,7 +1404,9 @@ pub mod tests { .stream(); let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap(); - let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); + let reader = + ArrowReaderBuilder::new(fixture.table.file_io().clone(), fixture.table.runtime()) + .build(); let batch_stream = reader .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .unwrap() diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 56ddbd51ba..40755ec005 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -23,6 +23,7 @@ use crate::arrow::ArrowReaderBuilder; use crate::inspect::MetadataTable; use crate::io::FileIO; use crate::io::object_cache::ObjectCache; +use crate::runtime::Runtime; use crate::scan::TableScanBuilder; use crate::spec::{SchemaRef, TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; @@ -36,6 +37,7 @@ pub struct TableBuilder { readonly: bool, disable_cache: bool, cache_size_bytes: Option, + runtime: Option, } impl TableBuilder { @@ -48,6 +50,7 @@ impl TableBuilder { readonly: false, disable_cache: false, cache_size_bytes: None, + runtime: None, } } @@ -95,6 +98,12 @@ impl TableBuilder { self } + /// Set the Runtime for this table to use when spawning tasks. + pub fn runtime(mut self, runtime: Runtime) -> Self { + self.runtime = Some(runtime); + self + } + /// build the Table pub fn build(self) -> Result { let Self { @@ -105,6 +114,7 @@ impl TableBuilder { readonly, disable_cache, cache_size_bytes, + runtime, } = self; let Some(file_io) = file_io else { @@ -146,6 +156,7 @@ impl TableBuilder { identifier, readonly, object_cache, + runtime, }) } } @@ -159,6 +170,9 @@ pub struct Table { identifier: TableIdent, readonly: bool, object_cache: Arc, + /// Runtime explicitly attached at build time. `None` means "resolve from + /// the ambient tokio runtime on demand" + runtime: Option, } impl Table { @@ -230,6 +244,15 @@ impl Table { MetadataTable::new(self) } + /// Returns a resolved [`Runtime`] for this table. + /// + /// If a runtime was set via [`TableBuilder::runtime`], it is returned. + /// Otherwise, this borrows the ambient tokio runtime via + /// [`Runtime::current`], which panics if called outside a tokio context. + pub(crate) fn runtime(&self) -> Runtime { + self.runtime.clone().unwrap_or_else(Runtime::current) + } + /// Returns the flag indicating whether the `Table` is readonly or not pub fn readonly(&self) -> bool { self.readonly @@ -241,8 +264,11 @@ impl Table { } /// Create a reader for the table. + /// + /// Requires that a [`Runtime`] was set on the builder or that this is + /// called from within a tokio runtime context; panics otherwise. pub fn reader_builder(&self) -> ArrowReaderBuilder { - ArrowReaderBuilder::new(self.file_io.clone()) + ArrowReaderBuilder::new(self.file_io.clone(), self.runtime()) } } @@ -326,7 +352,7 @@ impl StaticTable { /// Create a reader for the table. pub fn reader_builder(&self) -> ArrowReaderBuilder { - ArrowReaderBuilder::new(self.0.file_io.clone()) + self.0.reader_builder() } }