6 changes: 6 additions & 0 deletions docs/source/user-guide/latest/iceberg.md
@@ -141,3 +141,9 @@ The following scenarios will fall back to Spark's native Iceberg reader:
- Scans with residual filters using `truncate`, `bucket`, `year`, `month`, `day`, or `hour`
transform functions (partition pruning still works, but row-level filtering of these
transforms falls back)

### Task input metrics

The native Iceberg reader populates Spark's task-level `inputMetrics.bytesRead` (visible in the Spark UI Stages tab) using the `bytes_read` counter from iceberg-rust's `ScanMetrics`. This counter includes bytes read from both data files and delete files.

Iceberg Java does not explicitly report `bytesRead` to Spark's task input metrics. On the Iceberg Java path, any `bytesRead` value comes from Hadoop's filesystem-level I/O counters rather than from Iceberg itself. Because Comet's native reader and the Hadoop filesystem use different counting mechanisms, the exact byte counts will differ between the two paths.
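
The task-level values can be observed programmatically with a `SparkListener`. A minimal sketch, assuming an active `SparkSession` named `spark` (the table name is illustrative):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Print each task's input metrics as it finishes. These are the same
// values that feed the "Input Size / Records" column in the Stages tab.
val listener = new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val in = taskEnd.taskMetrics.inputMetrics
    if (in.bytesRead > 0) {
      println(s"stage=${taskEnd.stageId} bytesRead=${in.bytesRead} recordsRead=${in.recordsRead}")
    }
  }
}

spark.sparkContext.addSparkListener(listener)
spark.sql("SELECT * FROM test_cat.db.some_table").collect()
spark.sparkContext.removeSparkListener(listener)
```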
4 changes: 2 additions & 2 deletions native/Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions native/Cargo.toml
@@ -58,8 +58,8 @@ object_store = { version = "0.13.1", features = ["gcp", "azure", "aws", "http"]
url = "2.2"
aws-config = "1.8.16"
aws-credential-types = "1.2.13"
iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "a2f067d" }
iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "a2f067d", features = ["opendal-all"] }
iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "1ad4bfd" }
iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "1ad4bfd", features = ["opendal-all"] }

[profile.release]
debug = true
27 changes: 25 additions & 2 deletions native/core/src/execution/operators/iceberg_scan.rs
@@ -38,6 +38,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
};
use futures::{Stream, StreamExt, TryStreamExt};
use iceberg::arrow::ScanMetrics;
use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
use iceberg_storage_opendal::OpenDalStorageFactory;

@@ -171,10 +172,13 @@ impl IcebergScanExec {

// Pass all tasks to iceberg-rust at once to utilize its flatten_unordered
// parallelization, avoiding overhead of single-task streams
let stream = reader.read(task_stream).map_err(|e| {
let scan_result = reader.read(task_stream).map_err(|e| {
DataFusionError::Execution(format!("Failed to read Iceberg tasks: {}", e))
})?;

let scan_metrics = scan_result.metrics().clone();
let stream = scan_result.stream();

let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
let adapter_factory = SparkPhysicalExprAdapterFactory::new(spark_options, None);

@@ -187,6 +191,9 @@
adapter_factory,
cached: None,
baseline_metrics: metrics.baseline,
scan_metrics,
bytes_scanned: metrics.bytes_scanned,
last_reported_bytes: 0,
};

Ok(Box::pin(wrapped_stream))
@@ -201,7 +208,6 @@ impl IcebergScanExec {
match scheme {
"file" => Ok(Arc::new(OpenDalStorageFactory::Fs)),
"s3" | "s3a" => Ok(Arc::new(OpenDalStorageFactory::S3 {
configured_scheme: scheme.to_string(),
customized_credential_load: None,
})),
"gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)),
@@ -233,13 +239,16 @@ struct IcebergScanMetrics {
baseline: BaselineMetrics,
/// Count of file splits (FileScanTasks) processed
num_splits: Count,
/// Total bytes read from storage
bytes_scanned: Count,
}

impl IcebergScanMetrics {
fn new(metrics: &ExecutionPlanMetricsSet) -> Self {
Self {
baseline: BaselineMetrics::new(metrics, 0),
num_splits: MetricBuilder::new(metrics).counter("num_splits", 0),
bytes_scanned: MetricBuilder::new(metrics).counter("bytes_scanned", 0),
}
}
}
@@ -257,6 +266,12 @@ struct IcebergStreamWrapper<S> {
cached: Option<CachedProjection>,
/// Metrics for output tracking
baseline_metrics: BaselineMetrics,
/// Iceberg scan metrics for bytes read tracking
scan_metrics: ScanMetrics,
/// DataFusion metric counter that bridges iceberg-rust's bytes_read into the metric tree
bytes_scanned: Count,
/// Last reported bytes_read value for delta computation
last_reported_bytes: u64,
}

/// Cached projection state: file schema, adapter, and pre-built projection expressions.
@@ -314,6 +329,14 @@ where
other => other,
};

// Bridge iceberg-rust's live AtomicU64 counter into the DataFusion metric tree
let current = self.scan_metrics.bytes_read();
let delta = current - self.last_reported_bytes;
if delta > 0 {
self.bytes_scanned.add(delta as usize);
self.last_reported_bytes = current;
}

self.baseline_metrics.record_poll(result)
}
}
@@ -21,6 +21,7 @@ package org.apache.spark.sql.comet

import scala.jdk.CollectionConverters._

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, DynamicPruningExpression, SortOrder}
@@ -272,7 +273,8 @@ case class CometIcebergNativeScanExec(

override lazy val metrics: Map[String, SQLMetric] = {
val baseMetrics = Map(
"output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
"output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"bytes_scanned" -> SQLMetrics.createSizeMetric(sparkContext, "number of bytes scanned"))

// Create IMMUTABLE metrics with captured values AND types
// these won't be affected by accumulator merges
@@ -296,16 +298,22 @@
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val nativeMetrics = CometMetricNode.fromCometPlan(this)
val serializedPlan = CometExec.serializeNativePlan(nativeOp)
CometExecRDD(
new CometExecRDD(
sparkContext,
inputRDDs = Seq.empty,
commonByKey = Map(metadataLocation -> commonData),
perPartitionByKey = Map(metadataLocation -> perPartitionData),
serializedPlan = serializedPlan,
numPartitions = perPartitionData.length,
defaultNumPartitions = perPartitionData.length,
numOutputCols = output.length,
nativeMetrics = nativeMetrics,
subqueries = Seq.empty)
subqueries = Seq.empty) {
override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
val res = super.compute(split, context)
Option(context).foreach(nativeMetrics.reportScanInputMetrics)
res
}
}
}

/**
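The `compute` override above passes the `TaskContext` to `nativeMetrics.reportScanInputMetrics`, whose implementation is not part of this diff. A hypothetical sketch of the kind of bridging such a helper performs — the object and parameter names here are assumptions, not Comet's actual code; it must live under the `org.apache.spark` package because `TaskContext.taskMetrics()` and the `InputMetrics` setters are `private[spark]`:

```scala
package org.apache.spark

import org.apache.spark.sql.execution.metric.SQLMetric

// Hypothetical sketch only: copy a scan's SQL metrics into the running
// task's input metrics so they surface as inputMetrics.bytesRead/recordsRead.
object ScanInputMetricsBridge {
  def report(context: TaskContext, bytesScanned: SQLMetric, outputRows: SQLMetric): Unit = {
    val in = context.taskMetrics().inputMetrics
    in.incBytesRead(bytesScanned.value)
    in.incRecordsRead(outputRows.value)
  }
}
```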
168 changes: 168 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
@@ -22,8 +22,11 @@ package org.apache.comet
import java.io.File
import java.nio.file.Files

import scala.collection.mutable
import scala.jdk.CollectionConverters._

import org.apache.spark.CometListenerBusUtils
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.comet.CometIcebergNativeScanExec
import org.apache.spark.sql.execution.SparkPlan
@@ -548,6 +551,69 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
}
}

test("bytes_scanned includes delete file I/O") {
assume(icebergAvailable, "Iceberg not available in classpath")

withTempIcebergDir { warehouseDir =>
withSQLConf(
"spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.test_cat.type" -> "hadoop",
"spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {

spark.sql("""
CREATE TABLE test_cat.db.delete_bytes_test (
id INT,
value DOUBLE
) USING iceberg
TBLPROPERTIES (
'write.delete.mode' = 'merge-on-read',
'write.merge.mode' = 'merge-on-read'
)
""")

spark
.range(1000)
.selectExpr("CAST(id AS INT)", "CAST(id * 1.5 AS DOUBLE) as value")
.coalesce(1)
.write
.format("iceberg")
.mode("append")
.saveAsTable("test_cat.db.delete_bytes_test")

// Scan before deletes: data files only
val dfBefore = spark.sql("SELECT * FROM test_cat.db.delete_bytes_test")
val scanBefore = dfBefore.queryExecution.executedPlan
.collectLeaves()
.collect { case s: CometIcebergNativeScanExec => s }
.head
dfBefore.collect()
val bytesBefore = scanBefore.metrics("bytes_scanned").value
assert(bytesBefore > 0, s"bytes_scanned before deletes should be > 0, got $bytesBefore")

// Create position delete files
spark.sql("DELETE FROM test_cat.db.delete_bytes_test WHERE id < 100")

// Scan after deletes: data files + delete files
val dfAfter = spark.sql("SELECT * FROM test_cat.db.delete_bytes_test")
val scanAfter = dfAfter.queryExecution.executedPlan
.collectLeaves()
.collect { case s: CometIcebergNativeScanExec => s }
.head
dfAfter.collect()
val bytesAfter = scanAfter.metrics("bytes_scanned").value

assert(
bytesAfter > bytesBefore,
s"bytes_scanned should increase after deletes: before=$bytesBefore, after=$bytesAfter")

spark.sql("DROP TABLE test_cat.db.delete_bytes_test")
}
}
}

test("MOR table with EQUALITY deletes - verify deletes are applied") {
assume(icebergAvailable, "Iceberg not available in classpath")

@@ -1445,6 +1511,9 @@

assert(metrics("output_rows").value == 10000)
assert(metrics("num_splits").value > 0)
assert(
metrics("bytes_scanned").value > 0,
"bytes_scanned should be > 0 after reading data files")
// ImmutableSQLMetric prevents these from being reset to 0 after execution
assert(
metrics("totalDataManifest").value > 0,
@@ -2764,4 +2833,103 @@
}
}
}

test("task-level inputMetrics.bytesRead is populated for Iceberg native scan") {
assume(icebergAvailable, "Iceberg not available in classpath")

withTempIcebergDir { warehouseDir =>
withSQLConf(
"spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.test_cat.type" -> "hadoop",
"spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {

spark.sql("""
CREATE TABLE test_cat.db.task_metrics_test (
id INT,
value DOUBLE
) USING iceberg
""")

spark
.range(10000)
.selectExpr("CAST(id AS INT)", "CAST(id * 1.5 AS DOUBLE) as value")
.repartition(5)
.write
.format("iceberg")
.mode("append")
.saveAsTable("test_cat.db.task_metrics_test")

val bytesReadValues = mutable.ArrayBuffer.empty[Long]
val recordsReadValues = mutable.ArrayBuffer.empty[Long]

val listener = new SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val im = taskEnd.taskMetrics.inputMetrics
if (im.bytesRead > 0) {
bytesReadValues.synchronized {
bytesReadValues += im.bytesRead
recordsReadValues += im.recordsRead
}
}
}
}
spark.sparkContext.addSparkListener(listener)

try {
val query = "SELECT * FROM test_cat.db.task_metrics_test"

// Same drain-run-drain pattern as CometTaskMetricsSuite's shuffle test
CometListenerBusUtils.waitUntilEmpty(spark.sparkContext)

// Baseline: Iceberg Java scan (Comet native disabled)
withSQLConf(CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "false") {
bytesReadValues.clear()
recordsReadValues.clear()
spark.sql(query).collect()
CometListenerBusUtils.waitUntilEmpty(spark.sparkContext)
}
val sparkBytes = bytesReadValues.sum
val sparkRecords = recordsReadValues.sum

// Comet native Iceberg scan
bytesReadValues.clear()
recordsReadValues.clear()
val df = spark.sql(query)

val scanNodes = df.queryExecution.executedPlan
.collectLeaves()
.collect { case s: CometIcebergNativeScanExec => s }
assert(scanNodes.nonEmpty, "Expected CometIcebergNativeScanExec in plan")

df.collect()
CometListenerBusUtils.waitUntilEmpty(spark.sparkContext)

val cometBytes = bytesReadValues.sum
val cometRecords = recordsReadValues.sum

// Both paths should report metrics
assert(sparkBytes > 0, s"Spark bytesRead should be > 0, got $sparkBytes")
assert(sparkRecords > 0, s"Spark recordsRead should be > 0, got $sparkRecords")
assert(cometBytes > 0, s"Comet bytesRead should be > 0, got $cometBytes")
assert(cometRecords > 0, s"Comet recordsRead should be > 0, got $cometRecords")

assert(
cometRecords == sparkRecords,
s"recordsRead mismatch: comet=$cometRecords, spark=$sparkRecords")

// SQL-level metric should match task-level metric
val sqlBytes = scanNodes.head.metrics("bytes_scanned").value
assert(
sqlBytes == cometBytes,
s"SQL bytes_scanned ($sqlBytes) should match task bytesRead ($cometBytes)")
} finally {
spark.sparkContext.removeSparkListener(listener)
spark.sql("DROP TABLE test_cat.db.task_metrics_test")
}
}
}
}
}
30 changes: 30 additions & 0 deletions spark/src/test/scala/org/apache/spark/CometListenerBusUtils.scala
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark

object CometListenerBusUtils {

/**
* Blocks until the SparkContext's listener bus has drained all pending events. Exposes the
* package-private `listenerBus.waitUntilEmpty()` to tests outside the `org.apache.spark`
* package.
*/
def waitUntilEmpty(sc: SparkContext): Unit = sc.listenerBus.waitUntilEmpty()
}
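
Typical usage is the drain-run-drain pattern from the task-metrics test above: drain before running so stale events from earlier queries are flushed, run the workload, then drain again so every task-end event has reached the listeners before asserting. A sketch, assuming an active `SparkSession` named `spark`:

```scala
CometListenerBusUtils.waitUntilEmpty(spark.sparkContext) // flush stale events
spark.sql("SELECT * FROM some_table").collect()          // run the workload
CometListenerBusUtils.waitUntilEmpty(spark.sparkContext) // all task-end events delivered
```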