diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 163a608b37af0..1d13efd14eca8 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -426,8 +426,7 @@ impl Iterator for SortedIterator { // Combine adjacent indexes from the same batch to make a slice, // for more efficient `extend` later. let mut last_batch_idx = 0; - let mut start_row_idx = 0; - let mut len = 0; + let mut indices_in_batch = vec![]; let mut slices = vec![]; for i in 0..current_size { @@ -435,43 +434,67 @@ impl Iterator for SortedIterator { let c_index = self.indices.value(p) as usize; let ci = self.composite[c_index]; - if len == 0 { + if indices_in_batch.is_empty() { last_batch_idx = ci.batch_idx; - start_row_idx = ci.row_idx; - len = 1; + indices_in_batch.push(ci.row_idx); } else if ci.batch_idx == last_batch_idx { - len += 1; - // since we have pre-sort each of the incoming batches, - // so if we witnessed a wrong order of indexes from the same batch, - // it must be of the same key with the row pointed by start_row_index. - start_row_idx = min(start_row_idx, ci.row_idx); + indices_in_batch.push(ci.row_idx); } else { - slices.push(CompositeSlice { - batch_idx: last_batch_idx, - start_row_idx, - len, - }); + group_indices(last_batch_idx, &mut indices_in_batch, &mut slices); last_batch_idx = ci.batch_idx; - start_row_idx = ci.row_idx; - len = 1; + indices_in_batch.push(ci.row_idx); } } assert!( - len > 0, + !indices_in_batch.is_empty(), "There should have at least one record in a sort output slice." ); - slices.push(CompositeSlice { - batch_idx: last_batch_idx, - start_row_idx, - len, - }); + group_indices(last_batch_idx, &mut indices_in_batch, &mut slices); self.pos += current_size; Some(slices) } } +/// Group continuous indices into a slice for better `extend` performance +fn group_indices( + batch_idx: u32, + positions: &mut Vec, + output: &mut Vec, +) { + positions.sort_unstable(); + let mut last_pos = 0; + let mut run_length = 0; + for pos in positions.iter() { + if run_length == 0 { + last_pos = *pos; + run_length = 1; + } else if *pos == last_pos + 1 { + run_length += 1; + last_pos = *pos; + } else { + output.push(CompositeSlice { + batch_idx, + start_row_idx: last_pos + 1 - run_length, + len: run_length as usize, + }); + last_pos = *pos; + run_length = 1; + } + } + assert!( + run_length > 0, + "There should have at least one record in a sort output slice." + ); + output.push(CompositeSlice { + batch_idx, + start_row_idx: last_pos + 1 - run_length, + len: run_length as usize, + }); + positions.clear() +} + /// Stream of sorted record batches struct SortedSizedRecordBatchStream { schema: SchemaRef, diff --git a/datafusion/core/tests/parquet/repeat_much.snappy.parquet b/datafusion/core/tests/parquet/repeat_much.snappy.parquet new file mode 100644 index 0000000000000..f1066acbdfda7 Binary files /dev/null and b/datafusion/core/tests/parquet/repeat_much.snappy.parquet differ diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs index 2683f20e7fdd3..d500fbb8956ff 100644 --- a/datafusion/core/tests/sql/order.rs +++ b/datafusion/core/tests/sql/order.rs @@ -16,6 +16,7 @@ // under the License. use super::*; +use fuzz_utils::{batches_to_vec, partitions_to_sorted_vec}; #[tokio::test] async fn test_sort_unprojected_col() -> Result<()> { @@ -198,3 +199,25 @@ async fn sort_empty() -> Result<()> { assert_eq!(results.len(), 0); Ok(()) } + +#[tokio::test] +async fn sort_with_lots_of_repetition_values() -> Result<()> { + let ctx = SessionContext::new(); + let filename = "tests/parquet/repeat_much.snappy.parquet"; + + ctx.register_parquet("rep", filename, ParquetReadOptions::default()) + .await?; + let sql = "select a from rep order by a"; + let actual = execute_to_batches(&ctx, sql).await; + let actual = batches_to_vec(&actual); + + let sql1 = "select a from rep"; + let expected = execute_to_batches(&ctx, sql1).await; + let expected = partitions_to_sorted_vec(&[expected]); + + assert_eq!(actual.len(), expected.len()); + for i in 0..actual.len() { + assert_eq!(actual[i], expected[i]); + } + Ok(()) +}