Merged
8 changes: 8 additions & 0 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -499,6 +499,12 @@ impl ExternalSorter {
metrics: BaselineMetrics,
) -> Result<SendableRecordBatchStream> {
assert_ne!(self.in_mem_batches.len(), 0);

// The elapsed compute timer is updated when the value is dropped.
// There is no need for an explicit call to drop.
let elapsed_compute = metrics.elapsed_compute().clone();
let _timer = elapsed_compute.timer();

if self.in_mem_batches.len() == 1 {
let batch = self.in_mem_batches.remove(0);
let reservation = self.reservation.take();
@@ -552,7 +558,9 @@ impl ExternalSorter {
let fetch = self.fetch;
let expressions = Arc::clone(&self.expr);
let stream = futures::stream::once(futures::future::lazy(move |_| {
let timer = metrics.elapsed_compute().timer();
Contributor


I looked at the rest of this code, and I think there are other non-trivial consumers of compute (e.g. the call to concat_batches above).

What I would recommend is moving this timer up into SortExec::in_mem_sort and adding one in SortExec::in_mem_sort_stream.

Contributor


I did double check and the streaming_merge code does seem to track elapsed compute reasonably:

// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let elapsed_compute = self.metrics.elapsed_compute().clone();
let _timer = elapsed_compute.timer();
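
For readers unfamiliar with the record-on-drop pattern quoted above, here is a minimal std-only sketch of how such a guard can work. `ElapsedNanos` and `ScopedTimer` are hypothetical stand-ins written for illustration, not DataFusion's actual metric types, and the real implementation may differ in detail.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a shared elapsed-compute metric
/// (an assumption for illustration, not DataFusion's type).
#[derive(Clone, Default)]
struct ElapsedNanos(Arc<AtomicU64>);

impl ElapsedNanos {
    /// Start a guard that adds its lifetime to the counter when dropped.
    fn timer(&self) -> ScopedTimer<'_> {
        ScopedTimer { target: self, start: Instant::now() }
    }

    /// Total nanoseconds recorded so far.
    fn value(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Guard that borrows the metric and records elapsed time on drop.
struct ScopedTimer<'a> {
    target: &'a ElapsedNanos,
    start: Instant,
}

impl Drop for ScopedTimer<'_> {
    fn drop(&mut self) {
        let nanos = self.start.elapsed().as_nanos() as u64;
        self.target.0.fetch_add(nanos, Ordering::Relaxed);
    }
}

fn timed_work(metric: &ElapsedNanos) -> u64 {
    // Bound to `_timer` rather than `_`, so the guard lives until the
    // end of the function and covers everything below it.
    let _timer = metric.timer();
    std::thread::sleep(Duration::from_millis(5));
    (1..=10u64).sum()
}

fn main() {
    let metric = ElapsedNanos::default();
    let total = timed_work(&metric);
    println!("sum = {total}, elapsed = {} ns", metric.value());
}
```

Because the guard borrows the metric, cloning the metric into a local first (as the quoted snippet does) keeps that borrow off `self`. Binding to `_` instead of `_timer` would drop the guard immediately and record almost nothing.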

Contributor Author


The timer I added is inside the stream, so it does not run until the stream is polled. That means the sort would not be tracked if the timer were moved to in_mem_sort_stream. I agree there should probably be another one there too, though. in_mem_sort looks reasonably trivial to me, but I could add timers in the non-async blocks too if you think it's worthwhile.
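
The laziness point can be illustrated with a small std-only sketch. It uses a plain `async` block rather than `futures::future::lazy` to avoid an external dependency. Both defer their body until the first poll, so a timer started outside the body would not cover the work inside it. `NoopWake` and `body_runs_only_when_polled` are hypothetical names introduced for this example.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll a future that never suspends.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (flag after constructing the future, flag after polling it once).
fn body_runs_only_when_polled() -> (bool, bool) {
    let ran = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&ran);

    // Constructing the future does not execute its body.
    let fut = async move {
        flag.store(true, Ordering::SeqCst);
    };
    let before_poll = ran.load(Ordering::SeqCst);

    // Poll once by hand with a no-op waker; the body runs here.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let poll = fut.as_mut().poll(&mut cx);
    assert!(matches!(poll, Poll::Ready(())));

    (before_poll, ran.load(Ordering::SeqCst))
}

fn main() {
    let (before, after) = body_runs_only_when_polled();
    println!("ran before poll: {before}, ran after poll: {after}");
}
```

A timer guard created in the enclosing function would be dropped when that function returns the stream, before the body ever executes. That is why the timer for the sort itself has to live inside the closure.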

Contributor


Ah, I think I missed the fact that the timer is inside the future 👍

let sorted = sort_batch(&batch, &expressions, fetch)?;
timer.done();
metrics.record_output(sorted.num_rows());
drop(batch);
drop(reservation);