diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 54dadfd78cbc2..e9c49a92843d6 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -127,6 +127,19 @@ mod tests {
     use tempfile::TempDir;
     use url::Url;
 
+    /// Helper to extract a metric value by name from aggregated metrics.
+    fn metric_usize(
+        aggregated: &datafusion_physical_expr_common::metrics::MetricsSet,
+        name: &str,
+    ) -> usize {
+        aggregated
+            .iter()
+            .find(|m| m.value().name() == name)
+            .unwrap_or_else(|| panic!("should have {name} metric"))
+            .value()
+            .as_usize()
+    }
+
     #[tokio::test]
     async fn filter_pushdown_dataframe() -> Result<()> {
         let ctx = SessionContext::new();
@@ -430,6 +443,126 @@ mod tests {
         Ok(())
     }
 
+    /// Test that ParquetSink exposes rows_written, bytes_written, and
+    /// elapsed_compute metrics via DataSinkExec.
+    #[tokio::test]
+    async fn test_parquet_sink_metrics() -> Result<()> {
+        use arrow::array::Int32Array;
+        use arrow::datatypes::{DataType, Field, Schema};
+        use arrow::record_batch::RecordBatch;
+        use datafusion_execution::TaskContext;
+
+        use futures::TryStreamExt;
+
+        let ctx = SessionContext::new();
+        let tmp_dir = TempDir::new()?;
+        let output_path = tmp_dir.path().join("metrics_test.parquet");
+        let output_path_str = output_path.to_str().unwrap();
+
+        // Register a table with 100 rows
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("val", DataType::Int32, false),
+        ]));
+        let ids: Vec<i32> = (0..100).collect();
+        let vals: Vec<i32> = (100..200).collect();
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(ids)),
+                Arc::new(Int32Array::from(vals)),
+            ],
+        )?;
+        ctx.register_batch("source", batch)?;
+
+        // Create the physical plan for COPY TO
+        let df = ctx
+            .sql(&format!(
+                "COPY source TO '{output_path_str}' STORED AS PARQUET"
+            ))
+            .await?;
+        let plan = df.create_physical_plan().await?;
+
+        // Execute the plan
+        let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+        let stream = plan.execute(0, task_ctx)?;
+        let _batches: Vec<_> = stream.try_collect().await?;
+
+        // Check metrics on the DataSinkExec (top-level plan)
+        let metrics = plan
+            .metrics()
+            .expect("DataSinkExec should return metrics from ParquetSink");
+        let aggregated = metrics.aggregate_by_name();
+
+        // rows_written should be 100
+        assert_eq!(
+            metric_usize(&aggregated, "rows_written"),
+            100,
+            "expected 100 rows written"
+        );
+
+        // bytes_written should be > 0
+        let bytes_written = metric_usize(&aggregated, "bytes_written");
+        assert!(
+            bytes_written > 0,
+            "expected bytes_written > 0, got {bytes_written}"
+        );
+
+        // elapsed_compute should be > 0
+        let elapsed = metric_usize(&aggregated, "elapsed_compute");
+        assert!(elapsed > 0, "expected elapsed_compute > 0");
+
+        Ok(())
+    }
+
+    /// Test that ParquetSink metrics work with single_file_parallelism enabled.
+    #[tokio::test]
+    async fn test_parquet_sink_metrics_parallel() -> Result<()> {
+        use arrow::array::Int32Array;
+        use arrow::datatypes::{DataType, Field, Schema};
+        use arrow::record_batch::RecordBatch;
+        use datafusion_execution::TaskContext;
+
+        use futures::TryStreamExt;
+
+        let ctx = SessionContext::new();
+        ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = true")
+            .await?
+            .collect()
+            .await?;
+
+        let tmp_dir = TempDir::new()?;
+        let output_path = tmp_dir.path().join("metrics_parallel.parquet");
+        let output_path_str = output_path.to_str().unwrap();
+
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+        let ids: Vec<i32> = (0..50).collect();
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(ids))],
+        )?;
+        ctx.register_batch("source2", batch)?;
+
+        let df = ctx
+            .sql(&format!(
+                "COPY source2 TO '{output_path_str}' STORED AS PARQUET"
+            ))
+            .await?;
+        let plan = df.create_physical_plan().await?;
+        let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+        let stream = plan.execute(0, task_ctx)?;
+        let _batches: Vec<_> = stream.try_collect().await?;
+
+        let metrics = plan.metrics().expect("DataSinkExec should return metrics");
+        let aggregated = metrics.aggregate_by_name();
+
+        assert_eq!(metric_usize(&aggregated, "rows_written"), 50);
+        assert!(metric_usize(&aggregated, "bytes_written") > 0);
+
+        Ok(())
+    }
+
     /// Test FileOutputMode::Directory - explicitly request directory output
     /// even for paths WITH file extensions.
     #[tokio::test]
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index edbdd618edb05..4e2affd98d551 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -55,6 +55,9 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReserv
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
+use datafusion_physical_plan::metrics::{
+    ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+};
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
 
@@ -1156,6 +1159,8 @@ pub struct ParquetSink {
     written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
     /// Optional sorting columns to write to Parquet metadata
     sorting_columns: Option<Vec<SortingColumn>>,
+    /// Metrics for tracking write operations
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl Debug for ParquetSink {
@@ -1188,6 +1193,7 @@ impl ParquetSink {
             parquet_options,
             written: Default::default(),
             sorting_columns: None,
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 
@@ -1333,6 +1339,17 @@ impl FileSink for ParquetSink {
         mut file_stream_rx: DemuxedStreamReceiver,
         object_store: Arc<dyn ObjectStore>,
     ) -> Result<u64> {
+        let rows_written_counter =
+            MetricBuilder::new(&self.metrics).global_counter("rows_written");
+        // Note: bytes_written is the sum of compressed row group sizes, which
+        // may differ slightly from the actual on-disk file size (excludes footer,
+        // page indexes, and other Parquet metadata overhead).
+        let bytes_written_counter =
+            MetricBuilder::new(&self.metrics).global_counter("bytes_written");
+        let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);
+
+        let write_start = datafusion_common::instant::Instant::now();
+
         let parquet_opts = &self.parquet_options;
 
         let mut file_write_tasks: JoinSet<
@@ -1410,12 +1427,18 @@
             }
         }
 
-        let mut row_count = 0;
         while let Some(result) = file_write_tasks.join_next().await {
             match result {
                 Ok(r) => {
                     let (path, parquet_meta_data) = r?;
-                    row_count += parquet_meta_data.file_metadata().num_rows();
+                    let file_rows = parquet_meta_data.file_metadata().num_rows() as usize;
+                    let file_bytes: usize = parquet_meta_data
+                        .row_groups()
+                        .iter()
+                        .map(|rg| rg.compressed_size() as usize)
+                        .sum();
+                    rows_written_counter.add(file_rows);
+                    bytes_written_counter.add(file_bytes);
                     let mut written_files = self.written.lock();
                     written_files
                         .try_insert(path.clone(), parquet_meta_data)
@@ -1437,7 +1460,9 @@
             .await
             .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
 
-        Ok(row_count as u64)
+        elapsed_compute.add_elapsed(write_start);
+
+        Ok(rows_written_counter.value() as u64)
     }
 }
 
@@ -1447,6 +1472,10 @@ impl DataSink for ParquetSink {
         self
     }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn schema(&self) -> &SchemaRef {
         self.config.output_schema()
     }
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index 9af0dc63936ae..402ac8e8512bf 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -200,6 +200,17 @@ physical_plan
 01)DataSinkExec: sink=ParquetSink(file_groups=[])
 02)--DataSourceExec: partitions=1, partition_sizes=[1]
 
+# Verify ParquetSink exposes rows_written, bytes_written, and elapsed_compute metrics
+# Use a query with Sort and Projection to verify metrics across all operators
+query TT
+EXPLAIN ANALYZE COPY (SELECT col1, upper(col2) AS col2_upper FROM source_table ORDER BY col1) TO 'test_files/scratch/copy/table_metrics/' STORED AS PARQUET;
+----
+Plan with Metrics
+01)DataSinkExec: sink=ParquetSink(file_groups=[]), metrics=[elapsed_compute=, bytes_written=, rows_written=2]
+02)--SortExec: expr=[col1@0 ASC NULLS LAST], preserve_partitioning=[false], metrics=[output_rows=2, elapsed_compute=, output_bytes=, output_batches=, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0]
+03)----ProjectionExec: expr=[col1@0 as col1, upper(col2@1) as col2_upper], metrics=[output_rows=2, elapsed_compute=, output_bytes=, output_batches=1, expr_0_eval_time=, expr_1_eval_time=]
+04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]
+
 # Copy to directory as partitioned files with keep_partition_by_columns enabled
 query I
 COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1)