From 5f8ef31bc873f62b7c9c22daf2dca2d7b8f99799 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Tue, 10 Feb 2026 19:15:21 +0800 Subject: [PATCH 1/5] Add metrics for parquet sink --- datafusion/core/src/dataframe/parquet.rs | 148 ++++++++++++++++++ .../datasource-parquet/src/file_format.rs | 30 ++++ datafusion/sqllogictest/test_files/copy.slt | 8 + 3 files changed, 186 insertions(+) diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 54dadfd78cbc2..5908d1a5a2625 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -430,6 +430,154 @@ mod tests { Ok(()) } + /// Test that ParquetSink exposes rows_written, bytes_written, and + /// elapsed_compute metrics via DataSinkExec. + #[tokio::test] + async fn test_parquet_sink_metrics() -> Result<()> { + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use datafusion_execution::TaskContext; + + use futures::TryStreamExt; + + let ctx = SessionContext::new(); + let tmp_dir = TempDir::new()?; + let output_path = tmp_dir.path().join("metrics_test.parquet"); + let output_path_str = output_path.to_str().unwrap(); + + // Register a table with 100 rows + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("val", DataType::Int32, false), + ])); + let ids: Vec = (0..100).collect(); + let vals: Vec = (100..200).collect(); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(ids)), + Arc::new(Int32Array::from(vals)), + ], + )?; + ctx.register_batch("source", batch)?; + + // Create the physical plan for COPY TO + let df = ctx + .sql(&format!( + "COPY source TO '{output_path_str}' STORED AS PARQUET" + )) + .await?; + let plan = df.create_physical_plan().await?; + + // Execute the plan + let task_ctx = Arc::new(TaskContext::from(&ctx.state())); + let stream = plan.execute(0, task_ctx)?; + let _batches: Vec<_> = stream.try_collect().await?; + + // Check metrics on the DataSinkExec (top-level plan) + let metrics = plan + .metrics() + .expect("DataSinkExec should return metrics from ParquetSink"); + let aggregated = metrics.aggregate_by_name(); + + // rows_written should be 100 + let rows_written = aggregated + .iter() + .find(|m| m.value().name() == "rows_written") + .expect("should have rows_written metric"); + assert_eq!( + rows_written.value().as_usize(), + 100, + "expected 100 rows written" + ); + + // bytes_written should be > 0 + let bytes_written = aggregated + .iter() + .find(|m| m.value().name() == "bytes_written") + .expect("should have bytes_written metric"); + assert!( + bytes_written.value().as_usize() > 0, + "expected bytes_written > 0, got {}", + bytes_written.value().as_usize() + ); + + // elapsed_compute should be > 0 + let elapsed = aggregated + .iter() + .find(|m| m.value().name() == "elapsed_compute") + .expect("should have elapsed_compute metric"); + assert!( + elapsed.value().as_usize() > 0, + "expected elapsed_compute > 0" + ); + + Ok(()) + } + + /// Test that ParquetSink metrics work with single_file_parallelism enabled. + #[tokio::test] + async fn test_parquet_sink_metrics_parallel() -> Result<()> { + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use datafusion_execution::TaskContext; + + use futures::TryStreamExt; + + let ctx = SessionContext::new(); + ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = true") + .await? + .collect() + .await?; + + let tmp_dir = TempDir::new()?; + let output_path = tmp_dir.path().join("metrics_parallel.parquet"); + let output_path_str = output_path.to_str().unwrap(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "id", + DataType::Int32, + false, + )])); + let ids: Vec = (0..50).collect(); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(ids))], + )?; + ctx.register_batch("source2", batch)?; + + let df = ctx + .sql(&format!( + "COPY source2 TO '{output_path_str}' STORED AS PARQUET" + )) + .await?; + let plan = df.create_physical_plan().await?; + let task_ctx = Arc::new(TaskContext::from(&ctx.state())); + let stream = plan.execute(0, task_ctx)?; + let _batches: Vec<_> = stream.try_collect().await?; + + let metrics = plan + .metrics() + .expect("DataSinkExec should return metrics"); + let aggregated = metrics.aggregate_by_name(); + + let rows_written = aggregated + .iter() + .find(|m| m.value().name() == "rows_written") + .expect("should have rows_written metric"); + assert_eq!(rows_written.value().as_usize(), 50); + + let bytes_written = aggregated + .iter() + .find(|m| m.value().name() == "bytes_written") + .expect("should have bytes_written metric"); + assert!(bytes_written.value().as_usize() > 0); + + Ok(()) + } + /// Test FileOutputMode::Directory - explicitly request directory output /// even for paths WITH file extensions. #[tokio::test] diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index edbdd618edb05..0ebbd8a33cb86 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -55,6 +55,9 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReserv use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; +use datafusion_physical_plan::metrics::{ + ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, +}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; @@ -1156,6 +1159,8 @@ pub struct ParquetSink { written: Arc>>, /// Optional sorting columns to write to Parquet metadata sorting_columns: Option>, + /// Metrics for tracking write operations + metrics: ExecutionPlanMetricsSet, } impl Debug for ParquetSink { @@ -1188,6 +1193,7 @@ impl ParquetSink { parquet_options, written: Default::default(), sorting_columns: None, + metrics: ExecutionPlanMetricsSet::new(), } } @@ -1333,6 +1339,15 @@ impl FileSink for ParquetSink { mut file_stream_rx: DemuxedStreamReceiver, object_store: Arc, ) -> Result { + let rows_written_counter = + MetricBuilder::new(&self.metrics).global_counter("rows_written"); + let bytes_written_counter = + MetricBuilder::new(&self.metrics).global_counter("bytes_written"); + let elapsed_compute = + MetricBuilder::new(&self.metrics).elapsed_compute(0); + + let write_start = std::time::Instant::now(); + let parquet_opts = &self.parquet_options; let mut file_write_tasks: JoinSet< @@ -1415,7 +1430,16 @@ impl FileSink for ParquetSink { match result { Ok(r) => { let (path, parquet_meta_data) = r?; + let file_rows = + parquet_meta_data.file_metadata().num_rows() as usize; + let file_bytes: usize = parquet_meta_data + .row_groups() + .iter() + .map(|rg| rg.compressed_size() as usize) + .sum(); row_count += parquet_meta_data.file_metadata().num_rows(); + rows_written_counter.add(file_rows); + bytes_written_counter.add(file_bytes); let mut written_files = self.written.lock(); written_files .try_insert(path.clone(), parquet_meta_data) @@ -1437,6 +1461,8 @@ impl FileSink for ParquetSink { .await .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; + elapsed_compute.add_elapsed(write_start); + Ok(row_count as u64) } } @@ -1447,6 +1473,10 @@ impl DataSink for ParquetSink { self } + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + fn schema(&self) -> &SchemaRef { self.config.output_schema() } diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 9af0dc63936ae..98ad6581fa5bf 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -200,6 +200,14 @@ physical_plan 01)DataSinkExec: sink=ParquetSink(file_groups=[]) 02)--DataSourceExec: partitions=1, partition_sizes=[1] +# Verify ParquetSink exposes rows_written, bytes_written, and elapsed_compute metrics +query TT +EXPLAIN ANALYZE COPY source_table TO 'test_files/scratch/copy/table_metrics/' STORED AS PARQUET; +---- +Plan with Metrics +01)DataSinkExec: sink=ParquetSink(file_groups=[]), metrics=[elapsed_compute=, bytes_written=, rows_written=2] +02)--DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] + # Copy to directory as partitioned files with keep_partition_by_columns enabled query I COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1) From 975715e4981fe74fddd07b26605638e77f1da338 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 12 Feb 2026 23:00:08 +0800 Subject: [PATCH 2/5] rich test --- datafusion/core/src/dataframe/parquet.rs | 11 +++-------- datafusion/datasource-parquet/src/file_format.rs | 6 ++---- datafusion/sqllogictest/test_files/copy.slt | 7 +++++-- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 5908d1a5a2625..9015ee7aea113 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -536,11 +536,8 @@ mod tests { let output_path = tmp_dir.path().join("metrics_parallel.parquet"); let output_path_str = output_path.to_str().unwrap(); - let schema = Arc::new(Schema::new(vec![Field::new( - "id", - DataType::Int32, - false, - )])); + let schema = + Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); let ids: Vec = (0..50).collect(); let batch = RecordBatch::try_new( Arc::clone(&schema), @@ -558,9 +555,7 @@ mod tests { let stream = plan.execute(0, task_ctx)?; let _batches: Vec<_> = stream.try_collect().await?; - let metrics = plan - .metrics() - .expect("DataSinkExec should return metrics"); + let metrics = plan.metrics().expect("DataSinkExec should return metrics"); let aggregated = metrics.aggregate_by_name(); let rows_written = aggregated diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 0ebbd8a33cb86..9af8aa4b06046 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1343,8 +1343,7 @@ impl FileSink for ParquetSink { MetricBuilder::new(&self.metrics).global_counter("rows_written"); let bytes_written_counter = MetricBuilder::new(&self.metrics).global_counter("bytes_written"); - let elapsed_compute = - MetricBuilder::new(&self.metrics).elapsed_compute(0); + let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0); let write_start = std::time::Instant::now(); @@ -1430,8 +1429,7 @@ impl FileSink for ParquetSink { match result { Ok(r) => { let (path, parquet_meta_data) = r?; - let file_rows = - parquet_meta_data.file_metadata().num_rows() as usize; + let file_rows = parquet_meta_data.file_metadata().num_rows() as usize; let file_bytes: usize = parquet_meta_data .row_groups() .iter() diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 98ad6581fa5bf..402ac8e8512bf 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -201,12 +201,15 @@ physical_plan 02)--DataSourceExec: partitions=1, partition_sizes=[1] # Verify ParquetSink exposes rows_written, bytes_written, and elapsed_compute metrics +# Use a query with Sort and Projection to verify metrics across all operators query TT -EXPLAIN ANALYZE COPY source_table TO 'test_files/scratch/copy/table_metrics/' STORED AS PARQUET; +EXPLAIN ANALYZE COPY (SELECT col1, upper(col2) AS col2_upper FROM source_table ORDER BY col1) TO 'test_files/scratch/copy/table_metrics/' STORED AS PARQUET; ---- Plan with Metrics 01)DataSinkExec: sink=ParquetSink(file_groups=[]), metrics=[elapsed_compute=, bytes_written=, rows_written=2] -02)--DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] +02)--SortExec: expr=[col1@0 ASC NULLS LAST], preserve_partitioning=[false], metrics=[output_rows=2, elapsed_compute=, output_bytes=, output_batches=, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0] +03)----ProjectionExec: expr=[col1@0 as col1, upper(col2@1) as col2_upper], metrics=[output_rows=2, elapsed_compute=, output_bytes=, output_batches=1, expr_0_eval_time=, expr_1_eval_time=] +04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] # Copy to directory as partitioned files with keep_partition_by_columns enabled query I From ac882e667dba753b35477e4168d2d26d965ed8c5 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 12 Feb 2026 23:06:09 +0800 Subject: [PATCH 3/5] Fix clippy: use datafusion Instant for WASM compatibility Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource-parquet/src/file_format.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 9af8aa4b06046..5936cf28893d2 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1345,7 +1345,7 @@ impl FileSink for ParquetSink { MetricBuilder::new(&self.metrics).global_counter("bytes_written"); let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0); - let write_start = std::time::Instant::now(); + let write_start = datafusion_common::instant::Instant::now(); let parquet_opts = &self.parquet_options; From ac9e4839cfc1b84aa7997b088ccbf72a34d2289c Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 12 Feb 2026 23:12:28 +0800 Subject: [PATCH 4/5] remove duplicate --- datafusion/datasource-parquet/src/file_format.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 5936cf28893d2..4438cff50cfe8 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1424,7 +1424,6 @@ impl FileSink for ParquetSink { } } - let mut row_count = 0; while let Some(result) = file_write_tasks.join_next().await { match result { Ok(r) => { @@ -1435,7 +1434,6 @@ impl FileSink for ParquetSink { .iter() .map(|rg| rg.compressed_size() as usize) .sum(); - row_count += parquet_meta_data.file_metadata().num_rows(); rows_written_counter.add(file_rows); bytes_written_counter.add(file_bytes); let mut written_files = self.written.lock(); @@ -1461,7 +1459,7 @@ impl FileSink for ParquetSink { elapsed_compute.add_elapsed(write_start); - Ok(row_count as u64) + Ok(rows_written_counter.value() as u64) } } From 50d3d0f15ece648c03f7e06ccce9c6dc386399cc Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Sun, 1 Mar 2026 22:08:22 +0100 Subject: [PATCH 5/5] resolve comments --- datafusion/core/src/dataframe/parquet.rs | 52 ++++++++----------- .../datasource-parquet/src/file_format.rs | 3 ++ 2 files changed, 24 insertions(+), 31 deletions(-) diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 9015ee7aea113..e9c49a92843d6 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -127,6 +127,19 @@ mod tests { use tempfile::TempDir; use url::Url; + /// Helper to extract a metric value by name from aggregated metrics. + fn metric_usize( + aggregated: &datafusion_physical_expr_common::metrics::MetricsSet, + name: &str, + ) -> usize { + aggregated + .iter() + .find(|m| m.value().name() == name) + .unwrap_or_else(|| panic!("should have {name} metric")) + .value() + .as_usize() + } + #[tokio::test] async fn filter_pushdown_dataframe() -> Result<()> { let ctx = SessionContext::new(); @@ -482,36 +495,22 @@ mod tests { let aggregated = metrics.aggregate_by_name(); // rows_written should be 100 - let rows_written = aggregated - .iter() - .find(|m| m.value().name() == "rows_written") - .expect("should have rows_written metric"); assert_eq!( - rows_written.value().as_usize(), + metric_usize(&aggregated, "rows_written"), 100, "expected 100 rows written" ); // bytes_written should be > 0 - let bytes_written = aggregated - .iter() - .find(|m| m.value().name() == "bytes_written") - .expect("should have bytes_written metric"); + let bytes_written = metric_usize(&aggregated, "bytes_written"); assert!( - bytes_written.value().as_usize() > 0, - "expected bytes_written > 0, got {}", - bytes_written.value().as_usize() + bytes_written > 0, + "expected bytes_written > 0, got {bytes_written}" ); // elapsed_compute should be > 0 - let elapsed = aggregated - .iter() - .find(|m| m.value().name() == "elapsed_compute") - .expect("should have elapsed_compute metric"); - assert!( - elapsed.value().as_usize() > 0, - "expected elapsed_compute > 0" - ); + let elapsed = metric_usize(&aggregated, "elapsed_compute"); + assert!(elapsed > 0, "expected elapsed_compute > 0"); Ok(()) } @@ -558,17 +557,8 @@ mod tests { let metrics = plan.metrics().expect("DataSinkExec should return metrics"); let aggregated = metrics.aggregate_by_name(); - let rows_written = aggregated - .iter() - .find(|m| m.value().name() == "rows_written") - .expect("should have rows_written metric"); - assert_eq!(rows_written.value().as_usize(), 50); - - let bytes_written = aggregated - .iter() - .find(|m| m.value().name() == "bytes_written") - .expect("should have bytes_written metric"); - assert!(bytes_written.value().as_usize() > 0); + assert_eq!(metric_usize(&aggregated, "rows_written"), 50); + assert!(metric_usize(&aggregated, "bytes_written") > 0); Ok(()) } diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 4438cff50cfe8..4e2affd98d551 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1341,6 +1341,9 @@ impl FileSink for ParquetSink { ) -> Result { let rows_written_counter = MetricBuilder::new(&self.metrics).global_counter("rows_written"); + // Note: bytes_written is the sum of compressed row group sizes, which + // may differ slightly from the actual on-disk file size (excludes footer, + // page indexes, and other Parquet metadata overhead). let bytes_written_counter = MetricBuilder::new(&self.metrics).global_counter("bytes_written"); let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);