Merged
3 changes: 0 additions & 3 deletions datafusion/core/src/execution/context.rs
@@ -40,7 +40,6 @@ use crate::{
physical_optimizer::{
aggregate_statistics::AggregateStatistics,
hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
- parallel_sort::ParallelSort,
},
};
pub use datafusion_physical_expr::execution_props::ExecutionProps;
@@ -1470,8 +1469,6 @@ impl SessionState {
.unwrap(),
)));
}
- physical_optimizers.push(Arc::new(ParallelSort::new()));
-
physical_optimizers.push(Arc::new(Repartition::new()));
physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));

93 changes: 0 additions & 93 deletions datafusion/core/src/physical_optimizer/parallel_sort.rs

This file was deleted.

19 changes: 17 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
@@ -18,6 +18,7 @@
//! Physical query planner

use super::analyze::AnalyzeExec;
+ use super::sorts::sort_preserving_merge::SortPreservingMergeExec;
use super::{
aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec,
values::ValuesExec, windows,
@@ -841,8 +842,22 @@ impl DefaultPhysicalPlanner {
)),
})
.collect::<Result<Vec<_>>>()?;
- Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?))
- }
+ // If we have a `LIMIT`, we can run the sort/limit in parallel (similar to TopK)
+ Ok(if fetch.is_some() && session_state.config.target_partitions > 1 {
+     let sort = SortExec::new_with_partitioning(
+         sort_expr,
+         physical_input,
+         true,
+         *fetch,
+     );
+     let merge = SortPreservingMergeExec::new(
+         sort.expr().to_vec(),
+         Arc::new(sort),
+     );
+     Arc::new(merge)
+ } else {
+     Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?)
+ })
}
LogicalPlan::Join(Join {
left,
right,
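The branch added above is the classic TopK shape: when a LIMIT is present and more than one target partition is configured, each partition sorts its own rows and keeps at most `fetch` of them, and a single SortPreservingMergeExec combines the per-partition results. A minimal sketch of that shape in plain Rust, not the DataFusion operators (`top_k_parallel`, the toy partitions, and `k` are illustrative):

use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn top_k_parallel(partitions: Vec<Vec<i64>>, k: usize) -> Vec<i64> {
    // Phase 1: per-partition sort + limit (SortExec with `fetch`),
    // independently parallelizable.
    let runs: Vec<Vec<i64>> = partitions
        .into_iter()
        .map(|mut p| {
            p.sort_unstable();
            p.truncate(k);
            p
        })
        .collect();

    // Phase 2: k-way merge of the sorted runs (SortPreservingMergeExec),
    // stopping once `k` rows have been emitted.
    let mut heap: BinaryHeap<Reverse<(i64, usize, usize)>> = runs
        .iter()
        .enumerate()
        .filter(|(_, run)| !run.is_empty())
        .map(|(run_idx, run)| Reverse((run[0], run_idx, 0)))
        .collect();

    let mut out = Vec::with_capacity(k);
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        out.push(value);
        if out.len() == k {
            break;
        }
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

fn main() {
    let partitions = vec![vec![5, 1, 9], vec![3, 7], vec![2, 8, 4]];
    assert_eq!(top_k_parallel(partitions, 3), vec![1, 2, 3]);
}

Because each run was already truncated to `k`, the merge touches at most `k * num_partitions` rows regardless of input size.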
65 changes: 28 additions & 37 deletions datafusion/core/src/physical_plan/sorts/sort.rs
@@ -38,7 +38,7 @@ use crate::physical_plan::{
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::prelude::SessionConfig;
- use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array};
+ use arrow::array::{make_array, Array, ArrayRef, MutableArrayData};
pub use arrow::compute::SortOptions;
use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions};
use arrow::datatypes::SchemaRef;
@@ -152,6 +152,7 @@ impl ExternalSorter {
&self.expr,
batch_size,
tracking_metrics,
+ self.fetch,
)?;
let prev_used = self.free_all_memory();
streams.push(SortedStream::new(in_mem_stream, prev_used));
@@ -183,6 +184,7 @@ impl ExternalSorter {
&self.expr,
batch_size,
tracking_metrics,
+ self.fetch,
);
// Report to the memory manager we are no longer using memory
self.free_all_memory();
@@ -273,6 +275,7 @@ impl MemoryConsumer for ExternalSorter {
&self.expr,
self.session_config.batch_size(),
tracking_metrics,
+ self.fetch,
Contributor Author: Nice thing is that it also reduces disk spilling, as sort + limit is done before writing.

Contributor: Although to be honest, I would hope that if there is a LIMIT on the query we could probably avoid the spilling entirely.

Contributor Author: Yeah maybe the spilling could see the remaining batch is so small it could add the sorted data to memory again - avoiding the spill 🤔

Contributor: As a future PR / optimization perhaps
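A sketch of the follow-up idea in this thread, explicitly not part of this PR: once the limit has been applied during the in-memory sort, the surviving run may be small enough to re-admit into memory rather than spill. Everything here (`SortedRun`, `SpillDecision`, `decide`, the numbers) is hypothetical:

/// Hypothetical summary of a sorted run after `fetch` has been applied.
struct SortedRun {
    rows: usize,
    bytes: usize,
}

enum SpillDecision {
    KeepInMemory,
    Spill,
}

/// After sort + limit, the run holds at most `fetch` rows; if it now fits
/// in the memory the sort just freed, keep it instead of writing it out.
fn decide(run: &SortedRun, mem_available_bytes: usize) -> SpillDecision {
    if run.bytes <= mem_available_bytes {
        SpillDecision::KeepInMemory
    } else {
        SpillDecision::Spill
    }
}

fn main() {
    // e.g. LIMIT 100 leaves a tiny run even if millions of rows were buffered
    let run = SortedRun { rows: 100, bytes: 4_000 };
    match decide(&run, 1 << 20) {
        SpillDecision::KeepInMemory => {
            println!("re-admit {} sorted rows, skip the spill", run.rows)
        }
        SpillDecision::Spill => println!("write the spill file"),
    }
}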

);

spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone())
@@ -289,13 +292,14 @@
}
}

- /// consume the non-empty `sorted_bathes` and do in_mem_sort
+ /// consume the non-empty `sorted_batches` and do in_mem_sort
fn in_mem_partial_sort(
buffered_batches: &mut Vec<BatchWithSortArray>,
schema: SchemaRef,
expressions: &[PhysicalSortExpr],
batch_size: usize,
tracking_metrics: MemTrackingMetrics,
+ fetch: Option<usize>,
) -> Result<SendableRecordBatchStream> {
assert_ne!(buffered_batches.len(), 0);
if buffered_batches.len() == 1 {
@@ -323,7 +327,7 @@ fn in_mem_partial_sort(
// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let _timer = tracking_metrics.elapsed_compute().timer();
- get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+ get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)?
};
Ok(Box::pin(SortedSizedRecordBatchStream::new(
schema,
@@ -345,6 +349,7 @@ fn get_sorted_iter(
sort_arrays: &[Vec<ArrayRef>],
expr: &[PhysicalSortExpr],
batch_size: usize,
+ fetch: Option<usize>,
) -> Result<SortedIterator> {
let row_indices = sort_arrays
.iter()
@@ -374,44 +379,38 @@
})
})
.collect::<Result<Vec<_>>>()?;
- let indices = lexsort_to_indices(&sort_columns, None)?;
+ let indices = lexsort_to_indices(&sort_columns, fetch)?;
Contributor: ✨ 👍


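For context, the second argument to Arrow's `lexsort_to_indices` is an optional limit: with `Some(n)` the kernel only produces the first `n` positions of the sorted order instead of sorting every row. A small self-contained illustration with toy data, written against the same arrow APIs this file imports (exact re-exports across arrow versions are an assumption):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::compute::{lexsort_to_indices, SortColumn, SortOptions};

fn main() -> arrow::error::Result<()> {
    let a: ArrayRef = Arc::new(Int32Array::from(vec![3, 1, 2, 1]));
    let b: ArrayRef = Arc::new(StringArray::from(vec!["c", "b", "a", "a"]));

    let sort_columns = vec![
        SortColumn { values: a, options: None },
        SortColumn {
            values: b,
            options: Some(SortOptions { descending: false, nulls_first: true }),
        },
    ];

    // `Some(2)` asks for only the first two positions of the sorted order,
    // instead of fully sorting all four rows.
    let indices = lexsort_to_indices(&sort_columns, Some(2))?;

    // Rows (1, "a") and (1, "b") sort first: original positions 3 and 1.
    let top: Vec<u32> = indices.values().to_vec();
    assert_eq!(top, vec![3, 1]);
    Ok(())
}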
- Ok(SortedIterator::new(indices, row_indices, batch_size))
+ // Calculate composite index based on sorted indices
+ let row_indices = indices
+     .values()
+     .iter()
+     .map(|i| row_indices[*i as usize])
+     .collect();
+
+ Ok(SortedIterator::new(row_indices, batch_size))
Contributor Author: Some cleanup - we can do this immediately instead of keeping it in SortedIterator

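In isolation, the cleanup looks like this sketch (`CompositeIndex` mirrors the struct defined in sort.rs; the toy data is made up): resolve the lexsort output through the (batch, row) table once, up front, so the iterator only has to carry the composite array.

#[derive(Clone, Copy, Debug, PartialEq)]
struct CompositeIndex {
    batch_idx: u32,
    row_idx: u32,
}

fn main() {
    // row_indices[i] records where logical row i lives in the buffered batches
    let row_indices = vec![
        CompositeIndex { batch_idx: 0, row_idx: 0 },
        CompositeIndex { batch_idx: 0, row_idx: 1 },
        CompositeIndex { batch_idx: 1, row_idx: 0 },
    ];
    // output of lexsort_to_indices: logical row numbers in sorted order
    let sort_indices: Vec<u32> = vec![2, 0, 1];

    // Resolve the indirection immediately instead of keeping both arrays
    // (and the double lookup) inside SortedIterator.
    let sorted: Vec<CompositeIndex> = sort_indices
        .iter()
        .map(|i| row_indices[*i as usize])
        .collect();

    assert_eq!(sorted[0], CompositeIndex { batch_idx: 1, row_idx: 0 });
}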
}

struct SortedIterator {
/// Current logical position in the iterator
pos: usize,
- /// Indexes into the input representing the correctly sorted total output
- indices: UInt32Array,
- /// Map each each logical input index to where it can be found in the sorted input batches
+ /// Sorted composite index of where to find the rows in buffered batches
Contributor: 👍
composite: Vec<CompositeIndex>,
/// Maximum batch size to produce
batch_size: usize,
- /// total length of the iterator
- length: usize,
}

impl SortedIterator {
- fn new(
-     indices: UInt32Array,
-     composite: Vec<CompositeIndex>,
-     batch_size: usize,
- ) -> Self {
-     let length = composite.len();
+ fn new(composite: Vec<CompositeIndex>, batch_size: usize) -> Self {
Self {
pos: 0,
- indices,
composite,
batch_size,
- length,
}
}

fn memory_size(&self) -> usize {
- std::mem::size_of_val(self)
-     + self.indices.get_array_memory_size()
-     + std::mem::size_of_val(&self.composite[..])
+ std::mem::size_of_val(self) + std::mem::size_of_val(&self.composite[..])
}
}

@@ -420,33 +419,25 @@ impl Iterator for SortedIterator {

/// Emit a max of `batch_size` positions each time
fn next(&mut self) -> Option<Self::Item> {
- if self.pos >= self.length {
+ let length = self.composite.len();
+ if self.pos >= length {
return None;
}

- let current_size = min(self.batch_size, self.length - self.pos);
+ let current_size = min(self.batch_size, length - self.pos);

// Combine adjacent indexes from the same batch to make a slice,
// for more efficient `extend` later.
- let mut last_batch_idx = 0;
- let mut indices_in_batch = vec![];
+ let mut last_batch_idx = self.composite[self.pos].batch_idx;
+ let mut indices_in_batch = Vec::with_capacity(current_size);

let mut slices = vec![];
- for i in 0..current_size {
-     let p = self.pos + i;
-     let c_index = self.indices.value(p) as usize;
-     let ci = self.composite[c_index];
-
-     if indices_in_batch.is_empty() {
-         last_batch_idx = ci.batch_idx;
-         indices_in_batch.push(ci.row_idx);
-     } else if ci.batch_idx == last_batch_idx {
-         indices_in_batch.push(ci.row_idx);
-     } else {
+ for ci in &self.composite[self.pos..self.pos + current_size] {
+     if ci.batch_idx != last_batch_idx {
group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
last_batch_idx = ci.batch_idx;
-         indices_in_batch.push(ci.row_idx);
}
+     indices_in_batch.push(ci.row_idx);
}

assert!(
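The rewritten loop is run-length grouping: consecutive sorted positions that point into the same buffered batch are collapsed into one group of row indices, so each group can be copied out with a single extend. A standalone sketch of the same idea over plain (batch_idx, row_idx) pairs (`group_adjacent` and the data are illustrative):

fn group_adjacent(composite: &[(usize, usize)]) -> Vec<(usize, Vec<usize>)> {
    let mut groups: Vec<(usize, Vec<usize>)> = vec![];
    // Seed from the first entry, as the rewritten `next()` does with
    // `self.composite[self.pos].batch_idx`.
    let mut last_batch = match composite.first() {
        Some((batch, _)) => *batch,
        None => return groups,
    };
    let mut rows = vec![];
    for (batch, row) in composite {
        // A batch change closes the current group; same-batch rows accumulate.
        if *batch != last_batch {
            groups.push((last_batch, std::mem::take(&mut rows)));
            last_batch = *batch;
        }
        rows.push(*row);
    }
    groups.push((last_batch, rows));
    groups
}

fn main() {
    // (batch_idx, row_idx) pairs in sorted order
    let composite = [(0, 5), (0, 6), (1, 0), (1, 2), (0, 1)];
    assert_eq!(
        group_adjacent(&composite),
        vec![(0, vec![5, 6]), (1, vec![0, 2]), (0, vec![1])]
    );
}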