From ed09459e186ca34dba7d6f008600e001c9e716a2 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 24 Feb 2026 16:23:57 -0500 Subject: [PATCH] Extract changes from reader_perf branch. --- .../src/main/scala/org/apache/comet/CometConf.scala | 11 +++++++++++ native/core/src/execution/operators/iceberg_scan.rs | 6 +++++- native/core/src/execution/planner.rs | 2 ++ native/proto/src/proto/operator.proto | 3 +++ .../comet/serde/operator/CometIcebergNativeScan.scala | 4 +++- 5 files changed, 24 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 480eafdcb7..5ee777f3d7 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -148,6 +148,17 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] = + conf("spark.comet.scan.icebergNative.dataFileConcurrencyLimit") + .category(CATEGORY_SCAN) + .doc( + "The number of Iceberg data files to read concurrently within a single task. " + + "Higher values improve throughput for tables with many small files by overlapping " + + "I/O latency, but increase memory usage. Values between 2 and 8 are suggested.") + .intConf + .checkValue(v => v > 0, "Data file concurrency limit must be positive") + .createWithDefault(1) + val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.csv.v2.enabled") .category(CATEGORY_TESTING) diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs index bc20592e90..39ce25002b 100644 --- a/native/core/src/execution/operators/iceberg_scan.rs +++ b/native/core/src/execution/operators/iceberg_scan.rs @@ -61,6 +61,8 @@ pub struct IcebergScanExec { catalog_properties: HashMap, /// Pre-planned file scan tasks tasks: Vec, + /// Number of data files to read concurrently + data_file_concurrency_limit: usize, /// Metrics metrics: ExecutionPlanMetricsSet, } @@ -71,6 +73,7 @@ impl IcebergScanExec { schema: SchemaRef, catalog_properties: HashMap, tasks: Vec, + data_file_concurrency_limit: usize, ) -> Result { let output_schema = schema; let plan_properties = Self::compute_properties(Arc::clone(&output_schema), 1); @@ -83,6 +86,7 @@ impl IcebergScanExec { plan_properties, catalog_properties, tasks, + data_file_concurrency_limit, metrics, }) } @@ -158,7 +162,7 @@ impl IcebergScanExec { let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io) .with_batch_size(batch_size) - .with_data_file_concurrency_limit(context.session_config().target_partitions()) + .with_data_file_concurrency_limit(self.data_file_concurrency_limit) .with_row_selection_enabled(true) .build(); diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index f84d6cc590..ef81cdfbfa 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1176,12 +1176,14 @@ impl PhysicalPlanner { .collect(); let metadata_location = common.metadata_location.clone(); let tasks = parse_file_scan_tasks_from_common(common, &scan.file_scan_tasks)?; + let data_file_concurrency_limit = common.data_file_concurrency_limit as usize; let iceberg_scan = IcebergScanExec::new( metadata_location, required_schema, catalog_properties, tasks, + data_file_concurrency_limit, )?; Ok(( diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 93872b462c..bf2752bdd0 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -175,6 +175,9 @@ message IcebergScanCommon { repeated PartitionData partition_data_pool = 9; repeated DeleteFileList delete_files_pool = 10; repeated spark.spark_expression.Expr residual_pool = 11; + + // Number of data files to read concurrently within a single task + uint32 data_file_concurrency_limit = 12; } message IcebergScan { diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index c86b2a51bb..9f1a015996 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeExec} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceRDD, DataSourceRDDPartition} import org.apache.spark.sql.types._ -import org.apache.comet.ConfigEntry +import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection} import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.ExprOuterClass.Expr @@ -757,6 +757,8 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit var totalTasks = 0 commonBuilder.setMetadataLocation(metadata.metadataLocation) + commonBuilder.setDataFileConcurrencyLimit( + CometConf.COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT.get()) metadata.catalogProperties.foreach { case (key, value) => commonBuilder.putCatalogProperties(key, value) }