From 0506856a2f18b9394eb37eac926e2c25094e4ebe Mon Sep 17 00:00:00 2001 From: StandingMan Date: Thu, 25 Dec 2025 15:53:04 +0800 Subject: [PATCH 1/2] feat: make FanoutWriter writer configurable Signed-off-by: StandingMan --- crates/iceberg/src/spec/table_properties.rs | 12 +++++++++ .../datafusion/src/physical_plan/write.rs | 25 +++++++++++++++++-- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 4975456010..413604f51c 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -49,6 +49,8 @@ pub struct TableProperties { pub write_format_default: String, /// The target file size for files. pub write_target_file_size_bytes: usize, + /// Whether to use `FanoutWriter` for partitioned tables. + pub write_datafusion_fanout_enabled: bool, } impl TableProperties { @@ -137,6 +139,11 @@ impl TableProperties { pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes"; /// Default target file size pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB + /// Whether to use `FanoutWriter` for partitioned tables (handles unsorted data). + /// If false, uses `ClusteredWriter` (requires sorted data, more memory efficient). 
+ pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled"; + /// Default value for fanout writer enabled + pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true; } impl TryFrom<&HashMap<String, String>> for TableProperties { @@ -175,6 +182,11 @@ impl TryFrom<&HashMap<String, String>> for TableProperties { TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, )?, + write_datafusion_fanout_enabled: parse_property( + props, + TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED, + TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT, + )?, }) } } diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 9eb53c235f..025e4597ea 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -44,6 +44,7 @@ use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; +use iceberg::writer::partitioning::fanout_writer; use iceberg::{Error, ErrorKind}; use parquet::file::properties::WriterProperties; use uuid::Uuid; @@ -266,8 +267,28 @@ impl ExecutionPlan for IcebergWriteExec { let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); // Create TaskWriter - // TODO: Make fanout_enabled configurable via table properties - let fanout_enabled = true; + let fanout_enabled = self + .table + .metadata() + .properties() + .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED) + .map(|value| { + value + .parse::<bool>() + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid value for {}, expected 'true' or 'false'", + TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED + ), + ) + .with_source(e) + }) + .map_err(to_datafusion_error) + }) + 
.transpose()? + .unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT); let schema = self.table.metadata().current_schema().clone(); let partition_spec = self.table.metadata().default_partition_spec().clone(); let task_writer = TaskWriter::try_new( From f636b42a65bea6717db01eec5d2827d4aef7caa8 Mon Sep 17 00:00:00 2001 From: StandingMan Date: Thu, 25 Dec 2025 16:01:48 +0800 Subject: [PATCH 2/2] chore: fix clippy error Signed-off-by: StandingMan --- crates/integrations/datafusion/src/physical_plan/write.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 025e4597ea..fdfddf877b 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -44,7 +44,6 @@ use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; -use iceberg::writer::partitioning::fanout_writer; use iceberg::{Error, ErrorKind}; use parquet::file::properties::WriterProperties; use uuid::Uuid;