Skip to content

Add metrics for parquet sink#20307

Merged
xudong963 merged 5 commits intoapache:mainfrom
xudong963:metric_parquet_sink
Mar 2, 2026
Merged

Add metrics for parquet sink#20307
xudong963 merged 5 commits intoapache:mainfrom
xudong963:metric_parquet_sink

Conversation

@xudong963
Copy link
Member

@xudong963 xudong963 commented Feb 12, 2026

Which issue does this PR close?

  • Closes #.

Rationale for this change

Before the PR, explain analyze for copyinto doesn't have metric

What changes are included in this PR?

Support metrics for ParquetSink

Are these changes tested?

Yes, ut + sqllogictest

Are there any user-facing changes?

@xudong963 xudong963 marked this pull request as draft February 12, 2026 10:26
@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) datasource Changes to the datasource crate labels Feb 12, 2026
@xudong963 xudong963 marked this pull request as ready for review February 12, 2026 15:15
xudong963 added a commit to massive-com/arrow-datafusion that referenced this pull request Feb 13, 2026
Cherry-pick of apache#20307

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
xudong963 added a commit to massive-com/arrow-datafusion that referenced this pull request Feb 13, 2026
Cherry-pick of apache#20307

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Contributor

@kosiew kosiew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xudong963 Thanks for working on this.

.map(|rg| rg.compressed_size() as usize)
.sum();
rows_written_counter.add(file_rows);
bytes_written_counter.add(file_bytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that bytes_written is derived from row_groups().compressed_size() and may not reflect the exact on-disk file size, should we clarify this in the documentation or rename it to something like compressed_row_group_bytes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I think bytes_written is the intuitive name that users expect for a sink metric. I'd like to keep the name bytes_written but adding a brief doc comment explaining what it measures.

(Renaming to compressed_row_group_bytes is overly verbose and would surprise users familiar with standard sink metrics. )

Comment on lines +485 to +488
let rows_written = aggregated
.iter()
.find(|m| m.value().name() == "rows_written")
.expect("should have rows_written metric");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since both test_parquet_sink_metrics_parallel and test_parquet_sink_metrics repeat the aggregate_by_name().iter().find(...) pattern, should we add a small helper (e.g., metric_usize(&aggregated, "rows_written")) to simplify the tests and improve readability?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

Comment on lines +1342 to +1343
let rows_written_counter =
MetricBuilder::new(&self.metrics).global_counter("rows_written");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since Parquet now implements sink metrics but CSV/JSON still use the default DataSink::metrics() -> None, is this divergence intentional? If so, would it make sense to follow up by centralizing common sink metrics wiring (e.g., in FileSink or write orchestration) to reduce duplication and enable consistent opt-in across file sinks?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I'll open an issue and make a follow-up PR for this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opened an issue #20644

@xudong963 xudong963 force-pushed the metric_parquet_sink branch from b235df2 to 50d3d0f Compare March 1, 2026 21:08
@xudong963
Copy link
Member Author

@kosiew thanks for the review, applied your suggestions in the commit

Copy link
Contributor

@kosiew kosiew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@xudong963 xudong963 added this pull request to the merge queue Mar 2, 2026
Merged via the queue into apache:main with commit 95de1bf Mar 2, 2026
28 checks passed
de-bgunter pushed a commit to de-bgunter/datafusion that referenced this pull request Mar 24, 2026
## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes apache#123` indicates that this PR will close issue apache#123.
-->

- Closes #.

## Rationale for this change

<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

Before the PR, explain analyze for copyinto doesn't have metric

## What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

Support metrics for ParquetSink

## Are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Yes, ut + sqllogictest

## Are there any user-facing changes?

<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->

<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
natalievolk added a commit to natalievolk/datafusion that referenced this pull request Mar 24, 2026
…Sink and JsonSink

Follows the pattern established in apache#20307 (ParquetSink metrics). Threads
bytes_written tracking through the shared `spawn_writer_tasks_and_join`
orchestration function so both CSV and JSON sinks get consistent metrics
without duplicating logic.

Changes:
- `orchestration.rs`: track bytes per serialized chunk in
  `serialize_rb_stream_to_object_store`; extend the internal oneshot
  channel to carry (rows, bytes); add optional `Count` params to
  `spawn_writer_tasks_and_join`
- `CsvSink` / `JsonSink`: add `ExecutionPlanMetricsSet`, wire
  `rows_written`, `bytes_written`, `elapsed_compute` counters, implement
  `DataSink::metrics()`
- Tests: EXPLAIN ANALYZE sqllogictests in copy.slt; unit tests in
  datasource/file_format/csv.rs and json.rs

Closes apache#20644

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate datasource Changes to the datasource crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants