Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,19 @@ mod tests {
use tempfile::TempDir;
use url::Url;

/// Helper to extract a metric value by name from aggregated metrics.
///
/// Panics if no metric with the given name is present in the set.
fn metric_usize(
    aggregated: &datafusion_physical_expr_common::metrics::MetricsSet,
    name: &str,
) -> usize {
    // Scan the aggregated set for the first metric whose name matches.
    let matching = aggregated.iter().find(|m| m.value().name() == name);
    match matching {
        Some(metric) => metric.value().as_usize(),
        None => panic!("should have {name} metric"),
    }
}

#[tokio::test]
async fn filter_pushdown_dataframe() -> Result<()> {
let ctx = SessionContext::new();
Expand Down Expand Up @@ -430,6 +443,126 @@ mod tests {
Ok(())
}

/// Test that ParquetSink exposes rows_written, bytes_written, and
/// elapsed_compute metrics via DataSinkExec.
#[tokio::test]
async fn test_parquet_sink_metrics() -> Result<()> {
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_execution::TaskContext;
    use futures::TryStreamExt;

    let ctx = SessionContext::new();

    // Write destination inside a fresh temp dir.
    let tmp_dir = TempDir::new()?;
    let output_path = tmp_dir.path().join("metrics_test.parquet");
    let output_path_str = output_path.to_str().unwrap();

    // Build and register a 100-row source table with two Int32 columns.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("val", DataType::Int32, false),
    ]));
    let source_batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(Int32Array::from((0..100).collect::<Vec<i32>>())),
            Arc::new(Int32Array::from((100..200).collect::<Vec<i32>>())),
        ],
    )?;
    ctx.register_batch("source", source_batch)?;

    // Lower the COPY TO statement to a physical plan.
    let plan = ctx
        .sql(&format!(
            "COPY source TO '{output_path_str}' STORED AS PARQUET"
        ))
        .await?
        .create_physical_plan()
        .await?;

    // Drive the plan to completion so the sink actually writes.
    let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
    let _collected: Vec<_> = plan.execute(0, task_ctx)?.try_collect().await?;

    // The top-level DataSinkExec must surface the ParquetSink metrics.
    let metrics = plan
        .metrics()
        .expect("DataSinkExec should return metrics from ParquetSink");
    let aggregated = metrics.aggregate_by_name();

    // All 100 input rows must be accounted for.
    assert_eq!(
        metric_usize(&aggregated, "rows_written"),
        100,
        "expected 100 rows written"
    );

    // Some compressed bytes must have been written.
    let bytes_written = metric_usize(&aggregated, "bytes_written");
    assert!(
        bytes_written > 0,
        "expected bytes_written > 0, got {bytes_written}"
    );

    // Writing must have taken some measurable compute time.
    let elapsed = metric_usize(&aggregated, "elapsed_compute");
    assert!(elapsed > 0, "expected elapsed_compute > 0");

    Ok(())
}

/// Test that ParquetSink metrics work with single_file_parallelism enabled.
#[tokio::test]
async fn test_parquet_sink_metrics_parallel() -> Result<()> {
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_execution::TaskContext;
    use futures::TryStreamExt;

    let ctx = SessionContext::new();

    // Opt in to the parallel single-file writer code path.
    ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = true")
        .await?
        .collect()
        .await?;

    // Write destination inside a fresh temp dir.
    let tmp_dir = TempDir::new()?;
    let output_path = tmp_dir.path().join("metrics_parallel.parquet");
    let output_path_str = output_path.to_str().unwrap();

    // Single-column table with 50 rows.
    let schema =
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let id_values = Int32Array::from((0..50).collect::<Vec<i32>>());
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_values)])?;
    ctx.register_batch("source2", batch)?;

    // Plan and run the COPY TO through the parallel writer.
    let plan = ctx
        .sql(&format!(
            "COPY source2 TO '{output_path_str}' STORED AS PARQUET"
        ))
        .await?
        .create_physical_plan()
        .await?;
    let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
    let _collected: Vec<_> = plan.execute(0, task_ctx)?.try_collect().await?;

    // Metrics must still be reported when the parallel path is taken.
    let metrics = plan.metrics().expect("DataSinkExec should return metrics");
    let aggregated = metrics.aggregate_by_name();

    assert_eq!(metric_usize(&aggregated, "rows_written"), 50);
    assert!(metric_usize(&aggregated, "bytes_written") > 0);

    Ok(())
}

/// Test FileOutputMode::Directory - explicitly request directory output
/// even for paths WITH file extensions.
#[tokio::test]
Expand Down
35 changes: 32 additions & 3 deletions datafusion/datasource-parquet/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReserv
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_plan::metrics::{
ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

Expand Down Expand Up @@ -1156,6 +1159,8 @@ pub struct ParquetSink {
written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
/// Optional sorting columns to write to Parquet metadata
sorting_columns: Option<Vec<SortingColumn>>,
/// Metrics for tracking write operations
metrics: ExecutionPlanMetricsSet,
}

impl Debug for ParquetSink {
Expand Down Expand Up @@ -1188,6 +1193,7 @@ impl ParquetSink {
parquet_options,
written: Default::default(),
sorting_columns: None,
metrics: ExecutionPlanMetricsSet::new(),
}
}

Expand Down Expand Up @@ -1333,6 +1339,17 @@ impl FileSink for ParquetSink {
mut file_stream_rx: DemuxedStreamReceiver,
object_store: Arc<dyn ObjectStore>,
) -> Result<u64> {
let rows_written_counter =
MetricBuilder::new(&self.metrics).global_counter("rows_written");
Comment on lines +1342 to +1343
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since Parquet now implements sink metrics but CSV/JSON still use the default DataSink::metrics() -> None, is this divergence intentional? If so, would it make sense to follow up by centralizing common sink metrics wiring (e.g., in FileSink or write orchestration) to reduce duplication and enable consistent opt-in across file sinks?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I'll open an issue and make a follow-up PR for this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opened an issue #20644

// Note: bytes_written is the sum of compressed row group sizes, which
// may differ slightly from the actual on-disk file size (excludes footer,
// page indexes, and other Parquet metadata overhead).
let bytes_written_counter =
MetricBuilder::new(&self.metrics).global_counter("bytes_written");
let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);

let write_start = datafusion_common::instant::Instant::now();

let parquet_opts = &self.parquet_options;

let mut file_write_tasks: JoinSet<
Expand Down Expand Up @@ -1410,12 +1427,18 @@ impl FileSink for ParquetSink {
}
}

let mut row_count = 0;
while let Some(result) = file_write_tasks.join_next().await {
match result {
Ok(r) => {
let (path, parquet_meta_data) = r?;
row_count += parquet_meta_data.file_metadata().num_rows();
let file_rows = parquet_meta_data.file_metadata().num_rows() as usize;
let file_bytes: usize = parquet_meta_data
.row_groups()
.iter()
.map(|rg| rg.compressed_size() as usize)
.sum();
rows_written_counter.add(file_rows);
bytes_written_counter.add(file_bytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that bytes_written is derived from row_groups().compressed_size() and may not reflect the exact on-disk file size, should we clarify this in the documentation or rename it to something like compressed_row_group_bytes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I think bytes_written is the intuitive name that users expect for a sink metric. I'd like to keep the name bytes_written but adding a brief doc comment explaining what it measures.

(Renaming to compressed_row_group_bytes is overly verbose and would surprise users familiar with standard sink metrics. )

let mut written_files = self.written.lock();
written_files
.try_insert(path.clone(), parquet_meta_data)
Expand All @@ -1437,7 +1460,9 @@ impl FileSink for ParquetSink {
.await
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

Ok(row_count as u64)
elapsed_compute.add_elapsed(write_start);

Ok(rows_written_counter.value() as u64)
}
}

Expand All @@ -1447,6 +1472,10 @@ impl DataSink for ParquetSink {
self
}

/// Return a snapshot (`clone_inner`) of this sink's write metrics
/// (rows_written, bytes_written, elapsed_compute) so `DataSinkExec`
/// can surface them, e.g. in `EXPLAIN ANALYZE` output.
fn metrics(&self) -> Option<MetricsSet> {
    Some(self.metrics.clone_inner())
}

/// Schema of the data this sink writes, taken from the sink's
/// configured output schema.
fn schema(&self) -> &SchemaRef {
    self.config.output_schema()
}
Expand Down
11 changes: 11 additions & 0 deletions datafusion/sqllogictest/test_files/copy.slt
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,17 @@ physical_plan
01)DataSinkExec: sink=ParquetSink(file_groups=[])
02)--DataSourceExec: partitions=1, partition_sizes=[1]

# Verify ParquetSink exposes rows_written, bytes_written, and elapsed_compute metrics
# Use a query with Sort and Projection to verify metrics across all operators
query TT
EXPLAIN ANALYZE COPY (SELECT col1, upper(col2) AS col2_upper FROM source_table ORDER BY col1) TO 'test_files/scratch/copy/table_metrics/' STORED AS PARQUET;
----
Plan with Metrics
01)DataSinkExec: sink=ParquetSink(file_groups=[]), metrics=[elapsed_compute=<slt:ignore>, bytes_written=<slt:ignore>, rows_written=2]
02)--SortExec: expr=[col1@0 ASC NULLS LAST], preserve_partitioning=[false], metrics=[output_rows=2, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, output_batches=<slt:ignore>, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0]
03)----ProjectionExec: expr=[col1@0 as col1, upper(col2@1) as col2_upper], metrics=[output_rows=2, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, output_batches=1, expr_0_eval_time=<slt:ignore>, expr_1_eval_time=<slt:ignore>]
04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]

# Copy to directory as partitioned files with keep_partition_by_columns enabled
query I
COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1)
Expand Down