Skip to content
21 changes: 19 additions & 2 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
TableCommit, TableCreation, TableIdent,
Runtime, TableCommit, TableCreation, TableIdent,
};
use iceberg_storage_opendal::OpenDalStorageFactory;

Expand All @@ -58,6 +58,7 @@ pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
pub struct GlueCatalogBuilder {
config: GlueCatalogConfig,
storage_factory: Option<Arc<dyn StorageFactory>>,
runtime: Option<Runtime>,
}

impl Default for GlueCatalogBuilder {
Expand All @@ -71,6 +72,7 @@ impl Default for GlueCatalogBuilder {
props: HashMap::new(),
},
storage_factory: None,
runtime: None,
}
}
}
Expand All @@ -83,6 +85,11 @@ impl CatalogBuilder for GlueCatalogBuilder {
self
}

fn with_runtime(mut self, runtime: Runtime) -> Self {
self.runtime = Some(runtime);
self
}

fn load(
mut self,
name: impl Into<String>,
Expand Down Expand Up @@ -129,7 +136,11 @@ impl CatalogBuilder for GlueCatalogBuilder {
));
}

GlueCatalog::new(self.config, self.storage_factory).await
let runtime = match self.runtime {
Some(rt) => rt,
None => Runtime::try_current()?,
};
GlueCatalog::new(self.config, self.storage_factory, runtime).await
}
}
}
Expand All @@ -151,6 +162,7 @@ pub struct GlueCatalog {
config: GlueCatalogConfig,
client: GlueClient,
file_io: FileIO,
runtime: Runtime,
}

impl Debug for GlueCatalog {
Expand All @@ -166,6 +178,7 @@ impl GlueCatalog {
async fn new(
config: GlueCatalogConfig,
storage_factory: Option<Arc<dyn StorageFactory>>,
runtime: Runtime,
) -> Result<Self> {
let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
let mut file_io_props = config.props.clone();
Expand Down Expand Up @@ -214,6 +227,7 @@ impl GlueCatalog {
config,
client: GlueClient(client),
file_io,
runtime,
})
}
/// Get the catalogs `FileIO`
Expand Down Expand Up @@ -272,6 +286,7 @@ impl GlueCatalog {
NamespaceIdent::new(db_name),
table_name.to_owned(),
))
.runtime(self.runtime.clone())
.build()?;

Ok((table, version_id))
Expand Down Expand Up @@ -611,6 +626,7 @@ impl Catalog for GlueCatalog {
.metadata_location(metadata_location_str)
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
.runtime(self.runtime.clone())
.build()
}

Expand Down Expand Up @@ -845,6 +861,7 @@ impl Catalog for GlueCatalog {
.metadata_location(metadata_location)
.metadata(metadata)
.file_io(self.file_io())
.runtime(self.runtime.clone())
.build()?)
}

Expand Down
43 changes: 30 additions & 13 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
TableCommit, TableCreation, TableIdent,
Runtime, TableCommit, TableCreation, TableIdent,
};
use volo_thrift::MaybeException;

Expand All @@ -56,6 +56,7 @@ pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
pub struct HmsCatalogBuilder {
config: HmsCatalogConfig,
storage_factory: Option<Arc<dyn StorageFactory>>,
runtime: Option<Runtime>,
}

impl Default for HmsCatalogBuilder {
Expand All @@ -69,6 +70,7 @@ impl Default for HmsCatalogBuilder {
props: HashMap::new(),
},
storage_factory: None,
runtime: None,
}
}
}
Expand All @@ -81,6 +83,11 @@ impl CatalogBuilder for HmsCatalogBuilder {
self
}

fn with_runtime(mut self, runtime: Runtime) -> Self {
self.runtime = Some(runtime);
self
}

fn load(
mut self,
name: impl Into<String>,
Expand Down Expand Up @@ -116,26 +123,31 @@ impl CatalogBuilder for HmsCatalogBuilder {
})
.collect();

let result = {
let result = (|| -> Result<HmsCatalog> {
if self.config.name.is_none() {
Err(Error::new(
return Err(Error::new(
ErrorKind::DataInvalid,
"Catalog name is required",
))
} else if self.config.address.is_empty() {
Err(Error::new(
));
}
if self.config.address.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Catalog address is required",
))
} else if self.config.warehouse.is_empty() {
Err(Error::new(
));
}
if self.config.warehouse.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Catalog warehouse is required",
))
} else {
HmsCatalog::new(self.config, self.storage_factory)
));
}
};
let runtime = match self.runtime {
Some(rt) => rt,
None => Runtime::try_current()?,
};
HmsCatalog::new(self.config, self.storage_factory, runtime)
})();

std::future::ready(result)
}
Expand Down Expand Up @@ -169,6 +181,7 @@ pub struct HmsCatalog {
config: HmsCatalogConfig,
client: HmsClient,
file_io: FileIO,
runtime: Runtime,
}

impl Debug for HmsCatalog {
Expand All @@ -184,6 +197,7 @@ impl HmsCatalog {
fn new(
config: HmsCatalogConfig,
storage_factory: Option<Arc<dyn StorageFactory>>,
runtime: Runtime,
) -> Result<Self> {
let address = config
.address
Expand Down Expand Up @@ -223,6 +237,7 @@ impl HmsCatalog {
config,
client: HmsClient(client),
file_io,
runtime,
})
}
/// Get the catalogs `FileIO`
Expand Down Expand Up @@ -529,6 +544,7 @@ impl Catalog for HmsCatalog {
.metadata_location(metadata_location_str)
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
.runtime(self.runtime.clone())
.build()
}

Expand Down Expand Up @@ -567,6 +583,7 @@ impl Catalog for HmsCatalog {
NamespaceIdent::new(db_name),
table.name.clone(),
))
.runtime(self.runtime.clone())
.build()
}

Expand Down
Loading
Loading