From 57c6fb22cb8918539dbe18212394365317e24a9b Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 28 Apr 2026 11:03:11 -0400 Subject: [PATCH 1/5] Bump iceberg-rust version, add bytes metrics and test at node-level. --- native/Cargo.lock | 4 +-- native/Cargo.toml | 4 +-- .../src/execution/operators/iceberg_scan.rs | 27 +++++++++++++++++-- .../comet/CometIcebergNativeScanExec.scala | 16 ++++++++--- .../comet/CometIcebergNativeSuite.scala | 3 +++ 5 files changed, 44 insertions(+), 10 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 0eed66b2ba..b7416c3bbe 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -3564,7 +3564,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.9.0" -source = "git+https://github.com/apache/iceberg-rust?rev=a2f067d#a2f067d0225d66ab88b8a18ec25b8a0953e35082" +source = "git+https://github.com/apache/iceberg-rust?rev=1ad4bfd#1ad4bfd39319508e79960d16dad1b1cdf965c5f4" dependencies = [ "aes-gcm", "anyhow", @@ -3619,7 +3619,7 @@ dependencies = [ [[package]] name = "iceberg-storage-opendal" version = "0.9.0" -source = "git+https://github.com/apache/iceberg-rust?rev=a2f067d#a2f067d0225d66ab88b8a18ec25b8a0953e35082" +source = "git+https://github.com/apache/iceberg-rust?rev=1ad4bfd#1ad4bfd39319508e79960d16dad1b1cdf965c5f4" dependencies = [ "anyhow", "async-trait", diff --git a/native/Cargo.toml b/native/Cargo.toml index a2d716e1fb..b026a93cd0 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -58,8 +58,8 @@ object_store = { version = "0.13.1", features = ["gcp", "azure", "aws", "http"] url = "2.2" aws-config = "1.8.16" aws-credential-types = "1.2.13" -iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "a2f067d" } -iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "a2f067d", features = ["opendal-all"] } +iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "1ad4bfd" } +iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "1ad4bfd", features = ["opendal-all"] } [profile.release] debug = true diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs index d217ebc34b..55bcbef349 100644 --- a/native/core/src/execution/operators/iceberg_scan.rs +++ b/native/core/src/execution/operators/iceberg_scan.rs @@ -38,6 +38,7 @@ use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, }; use futures::{Stream, StreamExt, TryStreamExt}; +use iceberg::arrow::ScanMetrics; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; use iceberg_storage_opendal::OpenDalStorageFactory; @@ -171,10 +172,13 @@ impl IcebergScanExec { // Pass all tasks to iceberg-rust at once to utilize its flatten_unordered // parallelization, avoiding overhead of single-task streams - let stream = reader.read(task_stream).map_err(|e| { + let scan_result = reader.read(task_stream).map_err(|e| { DataFusionError::Execution(format!("Failed to read Iceberg tasks: {}", e)) })?; + let scan_metrics = scan_result.metrics().clone(); + let stream = scan_result.stream(); + let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); let adapter_factory = SparkPhysicalExprAdapterFactory::new(spark_options, None); @@ -187,6 +191,9 @@ impl IcebergScanExec { adapter_factory, cached: None, baseline_metrics: metrics.baseline, + scan_metrics, + bytes_scanned: metrics.bytes_scanned, + last_reported_bytes: 0, }; Ok(Box::pin(wrapped_stream)) @@ -201,7 +208,6 @@ impl IcebergScanExec 
{ match scheme { "file" => Ok(Arc::new(OpenDalStorageFactory::Fs)), "s3" | "s3a" => Ok(Arc::new(OpenDalStorageFactory::S3 { - configured_scheme: scheme.to_string(), customized_credential_load: None, })), "gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)), @@ -233,6 +239,8 @@ struct IcebergScanMetrics { baseline: BaselineMetrics, /// Count of file splits (FileScanTasks) processed num_splits: Count, + /// Total bytes read from storage + bytes_scanned: Count, } impl IcebergScanMetrics { @@ -240,6 +248,7 @@ impl IcebergScanMetrics { Self { baseline: BaselineMetrics::new(metrics, 0), num_splits: MetricBuilder::new(metrics).counter("num_splits", 0), + bytes_scanned: MetricBuilder::new(metrics).counter("bytes_scanned", 0), } } } @@ -257,6 +266,12 @@ struct IcebergStreamWrapper { cached: Option, /// Metrics for output tracking baseline_metrics: BaselineMetrics, + /// Iceberg scan metrics for bytes read tracking + scan_metrics: ScanMetrics, + /// DF metric counter bridging iceberg-rust's bytes_read to the metric tree + bytes_scanned: Count, + /// Last reported bytes_read value for delta computation + last_reported_bytes: u64, } /// Cached projection state: file schema, adapter, and pre-built projection expressions. @@ -314,6 +329,14 @@ where other => other, }; + // Bridge iceberg-rust's live AtomicU64 counter into the DF metric tree + let current = self.scan_metrics.bytes_read(); + let delta = current - self.last_reported_bytes; + if delta > 0 { + self.bytes_scanned.add(delta as usize); + self.last_reported_bytes = current; + } + self.baseline_metrics.record_poll(result) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index 36085b6329..9e0e12e178 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -21,6 +21,7 @@ package org.apache.spark.sql.comet import scala.jdk.CollectionConverters._ +import org.apache.spark.{Partition, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, DynamicPruningExpression, SortOrder} @@ -272,7 +273,8 @@ case class CometIcebergNativeScanExec( override lazy val metrics: Map[String, SQLMetric] = { val baseMetrics = Map( - "output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + "output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "bytes_scanned" -> SQLMetrics.createSizeMetric(sparkContext, "number of bytes scanned")) // Create IMMUTABLE metrics with captured values AND types // these won't be affected by accumulator merges @@ -296,16 +298,22 @@ case class CometIcebergNativeScanExec( override def doExecuteColumnar(): RDD[ColumnarBatch] = { val nativeMetrics = CometMetricNode.fromCometPlan(this) val serializedPlan = CometExec.serializeNativePlan(nativeOp) - CometExecRDD( + new CometExecRDD( sparkContext, inputRDDs = Seq.empty, commonByKey = Map(metadataLocation -> commonData), perPartitionByKey = Map(metadataLocation -> perPartitionData), serializedPlan = serializedPlan, - numPartitions = perPartitionData.length, + defaultNumPartitions = perPartitionData.length, numOutputCols = output.length, nativeMetrics = nativeMetrics, - subqueries = Seq.empty) + subqueries = Seq.empty) { + override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { 
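+        // Delegate to the parent RDD for the partition's native iterator, then
+        // surface the native scan's metrics (bytesRead / recordsRead) through
+        // this task's inputMetrics.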
+        val res = super.compute(split, context)
+        Option(context).foreach(nativeMetrics.reportScanInputMetrics)
+        res
+      }
+    }
   }
 
   /**
diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
index 62c8844f72..5a1468b9b3 100644
--- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
@@ -1445,6 +1445,9 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
 
     assert(metrics("output_rows").value == 10000)
     assert(metrics("num_splits").value > 0)
+    assert(
+      metrics("bytes_scanned").value > 0,
+      "bytes_scanned should be > 0 after reading data files")
 
     // ImmutableSQLMetric prevents these from being reset to 0 after execution
     assert(
       metrics("totalDataManifest").value > 0,

From ecb514f22b37bae02c5d7a398328bdb406398074 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Tue, 28 Apr 2026 11:15:04 -0400
Subject: [PATCH 2/5] Test task-level metrics.

---
 .../comet/CometIcebergNativeSuite.scala       | 80 +++++++++++++++++++
 1 file changed, 80 insertions(+)

diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
index 5a1468b9b3..993d4b291f 100644
--- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
@@ -22,8 +22,10 @@ package org.apache.comet
 import java.io.File
 import java.nio.file.Files
 
+import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.CometTestBase
 import org.apache.spark.sql.comet.CometIcebergNativeScanExec
 import org.apache.spark.sql.execution.SparkPlan
@@ -2767,4 +2769,82 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
       }
     }
   }
+
+  test("task-level inputMetrics.bytesRead is populated for Iceberg native scan") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withTempIcebergDir { warehouseDir =>
+      withSQLConf(
+        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.test_cat.type" -> "hadoop",
+        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        spark.sql("""
+          CREATE TABLE test_cat.db.task_metrics_test (
+            id INT,
+            value DOUBLE
+          ) USING iceberg
+        """)
+
+        spark
+          .range(10000)
+          .selectExpr("CAST(id AS INT)", "CAST(id * 1.5 AS DOUBLE) as value")
+          .coalesce(1)
+          .write
+          .format("iceberg")
+          .mode("append")
+          .saveAsTable("test_cat.db.task_metrics_test")
+
+        val bytesReadValues = mutable.ArrayBuffer.empty[Long]
+        val recordsReadValues = mutable.ArrayBuffer.empty[Long]
+
+        val listener = new SparkListener {
+          override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+            val im = taskEnd.taskMetrics.inputMetrics
+            if (im.bytesRead > 0) {
+              bytesReadValues.synchronized {
+                bytesReadValues += im.bytesRead
+                recordsReadValues += im.recordsRead
+              }
+            }
+          }
+        }
+        spark.sparkContext.addSparkListener(listener)
+
+        try {
+          val df = spark.sql("SELECT * FROM test_cat.db.task_metrics_test")
+
+          val scanNodes = df.queryExecution.executedPlan
+            .collectLeaves()
+            .collect { case s: CometIcebergNativeScanExec => s }
+          assert(scanNodes.nonEmpty, "Expected 
CometIcebergNativeScanExec in plan") + + df.collect() + + // listenerBus.waitUntilEmpty() is package-private to org.apache.spark + Thread.sleep(1000) + + val totalBytes = bytesReadValues.sum + val totalRecords = recordsReadValues.sum + + assert(totalBytes > 0, s"task inputMetrics.bytesRead should be > 0, got $totalBytes") + assert( + totalRecords == 10000, + s"task inputMetrics.recordsRead should be 10000, got $totalRecords") + + // SQL-level metric should match task-level metric + val sqlBytes = scanNodes.head.metrics("bytes_scanned").value + assert( + sqlBytes == totalBytes, + s"SQL bytes_scanned ($sqlBytes) should match task bytesRead ($totalBytes)") + } finally { + spark.sparkContext.removeSparkListener(listener) + spark.sql("DROP TABLE test_cat.db.task_metrics_test") + } + } + } + } } From 222ba1e80463916a522044159a8145f37278a425 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 28 Apr 2026 12:34:13 -0400 Subject: [PATCH 3/5] Address PR feedback. --- docs/source/user-guide/latest/iceberg.md | 4 +++ .../comet/CometIcebergNativeSuite.scala | 5 ++-- .../apache/spark/CometListenerBusUtils.scala | 30 +++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) create mode 100644 spark/src/test/scala/org/apache/spark/CometListenerBusUtils.scala diff --git a/docs/source/user-guide/latest/iceberg.md b/docs/source/user-guide/latest/iceberg.md index 4dbd085c22..4261e502fb 100644 --- a/docs/source/user-guide/latest/iceberg.md +++ b/docs/source/user-guide/latest/iceberg.md @@ -141,3 +141,7 @@ The following scenarios will fall back to Spark's native Iceberg reader: - Scans with residual filters using `truncate`, `bucket`, `year`, `month`, `day`, or `hour` transform functions (partition pruning still works, but row-level filtering of these transforms falls back) + +### Task input metrics + +The native Iceberg reader populates Spark's task-level `inputMetrics.bytesRead` (visible in the Spark UI Stages tab) using the `bytes_read` counter from iceberg-rust's `ScanMetrics`. This counter includes bytes read from both data files and delete files. Iceberg Java's equivalent metric reports data-file bytes only, so `bytesRead` will diverge from the Iceberg Java path for Merge-On-Read tables with position or equality deletes. 
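+
+For example, the task-level metric can be observed with an ordinary `SparkListener`,
+the same mechanism Comet's test suite uses to verify it. A minimal sketch (the
+catalog and table names are illustrative):
+
+```scala
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession.builder().getOrCreate()
+spark.sparkContext.addSparkListener(new SparkListener {
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+    val im = taskEnd.taskMetrics.inputMetrics
+    if (im.bytesRead > 0) {
+      // With the native scan, bytesRead reflects iceberg-rust's bytes_read
+      // counter (data files plus delete files).
+      println(s"task read ${im.bytesRead} bytes, ${im.recordsRead} records")
+    }
+  }
+})
+spark.sql("SELECT * FROM test_cat.db.my_table").collect()
+```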
diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index 993d4b291f..77c01c85db 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -25,6 +25,7 @@ import java.nio.file.Files import scala.collection.mutable import scala.jdk.CollectionConverters._ +import org.apache.spark.CometListenerBusUtils import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.comet.CometIcebergNativeScanExec @@ -2824,8 +2825,8 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { df.collect() - // listenerBus.waitUntilEmpty() is package-private to org.apache.spark - Thread.sleep(1000) + // Drain listener events so onTaskEnd has fired before we assert + CometListenerBusUtils.waitUntilEmpty(spark.sparkContext) val totalBytes = bytesReadValues.sum val totalRecords = recordsReadValues.sum diff --git a/spark/src/test/scala/org/apache/spark/CometListenerBusUtils.scala b/spark/src/test/scala/org/apache/spark/CometListenerBusUtils.scala new file mode 100644 index 0000000000..99016db44a --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/CometListenerBusUtils.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark + +object CometListenerBusUtils { + + /** + * Blocks until the SparkContext's listener bus has drained all pending events. Exposes the + * package-private `listenerBus.waitUntilEmpty()` to tests outside the `org.apache.spark` + * package. + */ + def waitUntilEmpty(sc: SparkContext): Unit = sc.listenerBus.waitUntilEmpty() +} From 9ce7fec378f2ca76a01e41145696235c82cc1f74 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 28 Apr 2026 13:13:39 -0400 Subject: [PATCH 4/5] Address PR feedback. --- docs/source/user-guide/latest/iceberg.md | 4 +- .../comet/CometIcebergNativeSuite.scala | 43 ++++++++++++++----- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/docs/source/user-guide/latest/iceberg.md b/docs/source/user-guide/latest/iceberg.md index 4261e502fb..24a4bda057 100644 --- a/docs/source/user-guide/latest/iceberg.md +++ b/docs/source/user-guide/latest/iceberg.md @@ -144,4 +144,6 @@ The following scenarios will fall back to Spark's native Iceberg reader: ### Task input metrics -The native Iceberg reader populates Spark's task-level `inputMetrics.bytesRead` (visible in the Spark UI Stages tab) using the `bytes_read` counter from iceberg-rust's `ScanMetrics`. This counter includes bytes read from both data files and delete files. 
Iceberg Java's equivalent metric reports data-file bytes only, so `bytesRead` will diverge from the Iceberg Java path for Merge-On-Read tables with position or equality deletes.
+The native Iceberg reader populates Spark's task-level `inputMetrics.bytesRead` (visible in the Spark UI Stages tab) using the `bytes_read` counter from iceberg-rust's `ScanMetrics`. This counter includes bytes read from both data files and delete files.
+
+Iceberg Java does not explicitly report `bytesRead` to Spark's task input metrics. On the Iceberg Java path, any `bytesRead` value comes from Hadoop's filesystem-level I/O counters, not from Iceberg itself. Because Comet's native reader and the Hadoop filesystem use different counting mechanisms, the exact byte counts will differ between the two paths.
diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
index 77c01c85db..4b495fb627 100644
--- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
@@ -2793,7 +2793,7 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
         spark
           .range(10000)
           .selectExpr("CAST(id AS INT)", "CAST(id * 1.5 AS DOUBLE) as value")
-          .coalesce(1)
+          .repartition(5)
           .write
           .format("iceberg")
           .mode("append")
@@ -2816,7 +2816,25 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
         spark.sparkContext.addSparkListener(listener)
 
         try {
-          val df = spark.sql("SELECT * FROM test_cat.db.task_metrics_test")
+          val query = "SELECT * FROM test_cat.db.task_metrics_test"
+
+          // Same drain-run-drain pattern as CometTaskMetricsSuite's shuffle test
+          CometListenerBusUtils.waitUntilEmpty(spark.sparkContext)
+
+          // Baseline: Iceberg Java scan (Comet native disabled)
+          withSQLConf(CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "false") {
+            bytesReadValues.clear()
+            recordsReadValues.clear()
+            spark.sql(query).collect()
+            CometListenerBusUtils.waitUntilEmpty(spark.sparkContext)
+          }
+          val sparkBytes = bytesReadValues.sum
+          val sparkRecords = recordsReadValues.sum
+
+          // Comet native Iceberg scan
+          bytesReadValues.clear()
+          recordsReadValues.clear()
+          val df = spark.sql(query)
 
           val scanNodes = df.queryExecution.executedPlan
             .collectLeaves()
@@ -2824,23 +2842,26 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
           assert(scanNodes.nonEmpty, "Expected CometIcebergNativeScanExec in plan")
 
           df.collect()
-
-          // Drain listener events so onTaskEnd has fired before we assert
           CometListenerBusUtils.waitUntilEmpty(spark.sparkContext)
 
-          val totalBytes = bytesReadValues.sum
-          val totalRecords = recordsReadValues.sum
+          val cometBytes = bytesReadValues.sum
+          val cometRecords = recordsReadValues.sum
+
+          // Both paths should report metrics
+          assert(sparkBytes > 0, s"Spark bytesRead should be > 0, got $sparkBytes")
+          assert(sparkRecords > 0, s"Spark recordsRead should be > 0, got $sparkRecords")
+          assert(cometBytes > 0, s"Comet bytesRead should be > 0, got $cometBytes")
+          assert(cometRecords > 0, s"Comet recordsRead should be > 0, got $cometRecords")
 
-          assert(totalBytes > 0, s"task inputMetrics.bytesRead should be > 0, got $totalBytes")
           assert(
-            totalRecords == 10000,
-            s"task inputMetrics.recordsRead should be 10000, got $totalRecords")
+            cometRecords == sparkRecords,
+            s"recordsRead mismatch: comet=$cometRecords, spark=$sparkRecords")
 
           // SQL-level metric should match task-level metric
           val sqlBytes = 
scanNodes.head.metrics("bytes_scanned").value assert( - sqlBytes == totalBytes, - s"SQL bytes_scanned ($sqlBytes) should match task bytesRead ($totalBytes)") + sqlBytes == cometBytes, + s"SQL bytes_scanned ($sqlBytes) should match task bytesRead ($cometBytes)") } finally { spark.sparkContext.removeSparkListener(listener) spark.sql("DROP TABLE test_cat.db.task_metrics_test") From 7889d3bc1c8c1557921bb8bef292a8969705937d Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 28 Apr 2026 13:18:25 -0400 Subject: [PATCH 5/5] Address PR feedback. --- .../comet/CometIcebergNativeSuite.scala | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index 4b495fb627..6d870583f6 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -551,6 +551,69 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } + test("bytes_scanned includes delete file I/O") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE test_cat.db.delete_bytes_test ( + id INT, + value DOUBLE + ) USING iceberg + TBLPROPERTIES ( + 'write.delete.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' + ) + """) + + spark + .range(1000) + .selectExpr("CAST(id AS INT)", "CAST(id * 1.5 AS DOUBLE) as value") + .coalesce(1) + .write + .format("iceberg") + .mode("append") + .saveAsTable("test_cat.db.delete_bytes_test") + + // Scan before deletes: data files only + val dfBefore = spark.sql("SELECT * FROM test_cat.db.delete_bytes_test") + val scanBefore = dfBefore.queryExecution.executedPlan + .collectLeaves() + .collect { case s: CometIcebergNativeScanExec => s } + .head + dfBefore.collect() + val bytesBefore = scanBefore.metrics("bytes_scanned").value + assert(bytesBefore > 0, s"bytes_scanned before deletes should be > 0, got $bytesBefore") + + // Create position delete files + spark.sql("DELETE FROM test_cat.db.delete_bytes_test WHERE id < 100") + + // Scan after deletes: data files + delete files + val dfAfter = spark.sql("SELECT * FROM test_cat.db.delete_bytes_test") + val scanAfter = dfAfter.queryExecution.executedPlan + .collectLeaves() + .collect { case s: CometIcebergNativeScanExec => s } + .head + dfAfter.collect() + val bytesAfter = scanAfter.metrics("bytes_scanned").value + + assert( + bytesAfter > bytesBefore, + s"bytes_scanned should increase after deletes: before=$bytesBefore, after=$bytesAfter") + + spark.sql("DROP TABLE test_cat.db.delete_bytes_test") + } + } + } + test("MOR table with EQUALITY deletes - verify deletes are applied") { assume(icebergAvailable, "Iceberg not available in classpath")