diff --git a/docs/source/user-guide/latest/iceberg.md b/docs/source/user-guide/latest/iceberg.md index 4dbd085c22..24a4bda057 100644 --- a/docs/source/user-guide/latest/iceberg.md +++ b/docs/source/user-guide/latest/iceberg.md @@ -141,3 +141,9 @@ The following scenarios will fall back to Spark's native Iceberg reader: - Scans with residual filters using `truncate`, `bucket`, `year`, `month`, `day`, or `hour` transform functions (partition pruning still works, but row-level filtering of these transforms falls back) + +### Task input metrics + +The native Iceberg reader populates Spark's task-level `inputMetrics.bytesRead` (visible in the Spark UI Stages tab) using the `bytes_read` counter from iceberg-rust's `ScanMetrics`. This counter includes bytes read from both data files and delete files. + +Iceberg Java does not explicitly report `bytesRead` to Spark's task input metrics. On the Iceberg Java path, any `bytesRead` value comes from Hadoop's filesystem-level I/O counters, not from Iceberg itself. Because Comet's native reader and the Hadoop filesystem use different counting mechanisms, the exact byte counts will differ between the two paths. 
diff --git a/native/Cargo.lock b/native/Cargo.lock index 0eed66b2ba..b7416c3bbe 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -3564,7 +3564,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.9.0" -source = "git+https://github.com/apache/iceberg-rust?rev=a2f067d#a2f067d0225d66ab88b8a18ec25b8a0953e35082" +source = "git+https://github.com/apache/iceberg-rust?rev=1ad4bfd#1ad4bfd39319508e79960d16dad1b1cdf965c5f4" dependencies = [ "aes-gcm", "anyhow", @@ -3619,7 +3619,7 @@ dependencies = [ [[package]] name = "iceberg-storage-opendal" version = "0.9.0" -source = "git+https://github.com/apache/iceberg-rust?rev=a2f067d#a2f067d0225d66ab88b8a18ec25b8a0953e35082" +source = "git+https://github.com/apache/iceberg-rust?rev=1ad4bfd#1ad4bfd39319508e79960d16dad1b1cdf965c5f4" dependencies = [ "anyhow", "async-trait", diff --git a/native/Cargo.toml b/native/Cargo.toml index a2d716e1fb..b026a93cd0 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -58,8 +58,8 @@ object_store = { version = "0.13.1", features = ["gcp", "azure", "aws", "http"] url = "2.2" aws-config = "1.8.16" aws-credential-types = "1.2.13" -iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "a2f067d" } -iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "a2f067d", features = ["opendal-all"] } +iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "1ad4bfd" } +iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "1ad4bfd", features = ["opendal-all"] } [profile.release] debug = true diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs index d217ebc34b..55bcbef349 100644 --- a/native/core/src/execution/operators/iceberg_scan.rs +++ b/native/core/src/execution/operators/iceberg_scan.rs @@ -38,6 +38,7 @@ use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, }; use futures::{Stream, 
StreamExt, TryStreamExt}; +use iceberg::arrow::ScanMetrics; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; use iceberg_storage_opendal::OpenDalStorageFactory; @@ -171,10 +172,13 @@ impl IcebergScanExec { // Pass all tasks to iceberg-rust at once to utilize its flatten_unordered // parallelization, avoiding overhead of single-task streams - let stream = reader.read(task_stream).map_err(|e| { + let scan_result = reader.read(task_stream).map_err(|e| { DataFusionError::Execution(format!("Failed to read Iceberg tasks: {}", e)) })?; + let scan_metrics = scan_result.metrics().clone(); + let stream = scan_result.stream(); + let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); let adapter_factory = SparkPhysicalExprAdapterFactory::new(spark_options, None); @@ -187,6 +191,9 @@ impl IcebergScanExec { adapter_factory, cached: None, baseline_metrics: metrics.baseline, + scan_metrics, + bytes_scanned: metrics.bytes_scanned, + last_reported_bytes: 0, }; Ok(Box::pin(wrapped_stream)) @@ -201,7 +208,6 @@ impl IcebergScanExec { match scheme { "file" => Ok(Arc::new(OpenDalStorageFactory::Fs)), "s3" | "s3a" => Ok(Arc::new(OpenDalStorageFactory::S3 { - configured_scheme: scheme.to_string(), customized_credential_load: None, })), "gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)), @@ -233,6 +239,8 @@ struct IcebergScanMetrics { baseline: BaselineMetrics, /// Count of file splits (FileScanTasks) processed num_splits: Count, + /// Total bytes read from storage + bytes_scanned: Count, } impl IcebergScanMetrics { @@ -240,6 +248,7 @@ impl IcebergScanMetrics { Self { baseline: BaselineMetrics::new(metrics, 0), num_splits: MetricBuilder::new(metrics).counter("num_splits", 0), + bytes_scanned: MetricBuilder::new(metrics).counter("bytes_scanned", 0), } } } @@ -257,6 +266,12 @@ struct IcebergStreamWrapper { cached: Option, /// Metrics for output tracking baseline_metrics: BaselineMetrics, + /// Iceberg scan metrics for bytes read tracking + scan_metrics: 
ScanMetrics, + /// DF metric counter bridging iceberg-rust's bytes_read to the metric tree + bytes_scanned: Count, + /// Last reported bytes_read value for delta computation + last_reported_bytes: u64, } /// Cached projection state: file schema, adapter, and pre-built projection expressions. @@ -314,6 +329,14 @@ where other => other, }; + // Bridge iceberg-rust's live AtomicU64 counter into the DF metric tree + let current = self.scan_metrics.bytes_read(); + let delta = current - self.last_reported_bytes; + if delta > 0 { + self.bytes_scanned.add(delta as usize); + self.last_reported_bytes = current; + } + self.baseline_metrics.record_poll(result) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index 36085b6329..9e0e12e178 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -21,6 +21,7 @@ package org.apache.spark.sql.comet import scala.jdk.CollectionConverters._ +import org.apache.spark.{Partition, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, DynamicPruningExpression, SortOrder} @@ -272,7 +273,8 @@ case class CometIcebergNativeScanExec( override lazy val metrics: Map[String, SQLMetric] = { val baseMetrics = Map( - "output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + "output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "bytes_scanned" -> SQLMetrics.createSizeMetric(sparkContext, "number of bytes scanned")) // Create IMMUTABLE metrics with captured values AND types // these won't be affected by accumulator merges @@ -296,16 +298,22 @@ case class CometIcebergNativeScanExec( override def doExecuteColumnar(): RDD[ColumnarBatch] = { val 
nativeMetrics = CometMetricNode.fromCometPlan(this) val serializedPlan = CometExec.serializeNativePlan(nativeOp) - CometExecRDD( + new CometExecRDD( sparkContext, inputRDDs = Seq.empty, commonByKey = Map(metadataLocation -> commonData), perPartitionByKey = Map(metadataLocation -> perPartitionData), serializedPlan = serializedPlan, - numPartitions = perPartitionData.length, + defaultNumPartitions = perPartitionData.length, numOutputCols = output.length, nativeMetrics = nativeMetrics, - subqueries = Seq.empty) + subqueries = Seq.empty) { + override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { + val res = super.compute(split, context) + Option(context).foreach(nativeMetrics.reportScanInputMetrics) + res + } + } } /** diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index 62c8844f72..6d870583f6 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -22,8 +22,11 @@ package org.apache.comet import java.io.File import java.nio.file.Files +import scala.collection.mutable import scala.jdk.CollectionConverters._ +import org.apache.spark.CometListenerBusUtils +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.comet.CometIcebergNativeScanExec import org.apache.spark.sql.execution.SparkPlan @@ -548,6 +551,69 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } + test("bytes_scanned includes delete file I/O") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> 
warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE test_cat.db.delete_bytes_test ( + id INT, + value DOUBLE + ) USING iceberg + TBLPROPERTIES ( + 'write.delete.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' + ) + """) + + spark + .range(1000) + .selectExpr("CAST(id AS INT)", "CAST(id * 1.5 AS DOUBLE) as value") + .coalesce(1) + .write + .format("iceberg") + .mode("append") + .saveAsTable("test_cat.db.delete_bytes_test") + + // Scan before deletes: data files only + val dfBefore = spark.sql("SELECT * FROM test_cat.db.delete_bytes_test") + val scanBefore = dfBefore.queryExecution.executedPlan + .collectLeaves() + .collect { case s: CometIcebergNativeScanExec => s } + .head + dfBefore.collect() + val bytesBefore = scanBefore.metrics("bytes_scanned").value + assert(bytesBefore > 0, s"bytes_scanned before deletes should be > 0, got $bytesBefore") + + // Create position delete files + spark.sql("DELETE FROM test_cat.db.delete_bytes_test WHERE id < 100") + + // Scan after deletes: data files + delete files + val dfAfter = spark.sql("SELECT * FROM test_cat.db.delete_bytes_test") + val scanAfter = dfAfter.queryExecution.executedPlan + .collectLeaves() + .collect { case s: CometIcebergNativeScanExec => s } + .head + dfAfter.collect() + val bytesAfter = scanAfter.metrics("bytes_scanned").value + + assert( + bytesAfter > bytesBefore, + s"bytes_scanned should increase after deletes: before=$bytesBefore, after=$bytesAfter") + + spark.sql("DROP TABLE test_cat.db.delete_bytes_test") + } + } + } + test("MOR table with EQUALITY deletes - verify deletes are applied") { assume(icebergAvailable, "Iceberg not available in classpath") @@ -1445,6 +1511,9 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { assert(metrics("output_rows").value == 10000) assert(metrics("num_splits").value > 0) 
+ assert( + metrics("bytes_scanned").value > 0, + "bytes_scanned should be > 0 after reading data files") // ImmutableSQLMetric prevents these from being reset to 0 after execution assert( metrics("totalDataManifest").value > 0, @@ -2764,4 +2833,103 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } } + + test("task-level inputMetrics.bytesRead is populated for Iceberg native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE test_cat.db.task_metrics_test ( + id INT, + value DOUBLE + ) USING iceberg + """) + + spark + .range(10000) + .selectExpr("CAST(id AS INT)", "CAST(id * 1.5 AS DOUBLE) as value") + .repartition(5) + .write + .format("iceberg") + .mode("append") + .saveAsTable("test_cat.db.task_metrics_test") + + val bytesReadValues = mutable.ArrayBuffer.empty[Long] + val recordsReadValues = mutable.ArrayBuffer.empty[Long] + + val listener = new SparkListener { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + val im = taskEnd.taskMetrics.inputMetrics + if (im.bytesRead > 0) { + bytesReadValues.synchronized { + bytesReadValues += im.bytesRead + recordsReadValues += im.recordsRead + } + } + } + } + spark.sparkContext.addSparkListener(listener) + + try { + val query = "SELECT * FROM test_cat.db.task_metrics_test" + + // Same drain-run-drain pattern as CometTaskMetricsSuite's shuffle test + CometListenerBusUtils.waitUntilEmpty(spark.sparkContext) + + // Baseline: iceberg-Java scan (Comet native disabled) + withSQLConf(CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> 
"false") { + bytesReadValues.clear() + recordsReadValues.clear() + spark.sql(query).collect() + CometListenerBusUtils.waitUntilEmpty(spark.sparkContext) + } + val sparkBytes = bytesReadValues.sum + val sparkRecords = recordsReadValues.sum + + // Comet native Iceberg scan + bytesReadValues.clear() + recordsReadValues.clear() + val df = spark.sql(query) + + val scanNodes = df.queryExecution.executedPlan + .collectLeaves() + .collect { case s: CometIcebergNativeScanExec => s } + assert(scanNodes.nonEmpty, "Expected CometIcebergNativeScanExec in plan") + + df.collect() + CometListenerBusUtils.waitUntilEmpty(spark.sparkContext) + + val cometBytes = bytesReadValues.sum + val cometRecords = recordsReadValues.sum + + // Both paths should report metrics + assert(sparkBytes > 0, s"Spark bytesRead should be > 0, got $sparkBytes") + assert(sparkRecords > 0, s"Spark recordsRead should be > 0, got $sparkRecords") + assert(cometBytes > 0, s"Comet bytesRead should be > 0, got $cometBytes") + assert(cometRecords > 0, s"Comet recordsRead should be > 0, got $cometRecords") + + assert( + cometRecords == sparkRecords, + s"recordsRead mismatch: comet=$cometRecords, spark=$sparkRecords") + + // SQL-level metric should match task-level metric + val sqlBytes = scanNodes.head.metrics("bytes_scanned").value + assert( + sqlBytes == cometBytes, + s"SQL bytes_scanned ($sqlBytes) should match task bytesRead ($cometBytes)") + } finally { + spark.sparkContext.removeSparkListener(listener) + spark.sql("DROP TABLE test_cat.db.task_metrics_test") + } + } + } + } } diff --git a/spark/src/test/scala/org/apache/spark/CometListenerBusUtils.scala b/spark/src/test/scala/org/apache/spark/CometListenerBusUtils.scala new file mode 100644 index 0000000000..99016db44a --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/CometListenerBusUtils.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark + +object CometListenerBusUtils { + + /** + * Blocks until the SparkContext's listener bus has drained all pending events. Exposes the + * package-private `listenerBus.waitUntilEmpty()` to tests outside the `org.apache.spark` + * package. + */ + def waitUntilEmpty(sc: SparkContext): Unit = sc.listenerBus.waitUntilEmpty() +}