From 488adce7ab26374ef7de24d37ad1eb8c728cf52c Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 9 Apr 2022 00:42:26 +0800 Subject: [PATCH 1/3] fix: Sort with a lot of repetition values --- .../core/src/physical_plan/sorts/sort.rs | 72 ++++++++++++------ .../tests/parquet/repeat_much.snappy.parquet | Bin 0 -> 1261 bytes datafusion/core/tests/sql/order.rs | 23 ++++++ 3 files changed, 72 insertions(+), 23 deletions(-) create mode 100644 datafusion/core/tests/parquet/repeat_much.snappy.parquet diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 163a608b37af0..6e9dec470c272 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -426,8 +426,7 @@ impl Iterator for SortedIterator { // Combine adjacent indexes from the same batch to make a slice, // for more efficient `extend` later. let mut last_batch_idx = 0; - let mut start_row_idx = 0; - let mut len = 0; + let mut indices_in_batch = vec![]; let mut slices = vec![]; for i in 0..current_size { @@ -435,43 +434,70 @@ impl Iterator for SortedIterator { let c_index = self.indices.value(p) as usize; let ci = self.composite[c_index]; - if len == 0 { + if indices_in_batch.is_empty() { last_batch_idx = ci.batch_idx; - start_row_idx = ci.row_idx; - len = 1; + indices_in_batch.push(ci.row_idx); } else if ci.batch_idx == last_batch_idx { - len += 1; - // since we have pre-sort each of the incoming batches, - // so if we witnessed a wrong order of indexes from the same batch, - // it must be of the same key with the row pointed by start_row_index. - start_row_idx = min(start_row_idx, ci.row_idx); + indices_in_batch.push(ci.row_idx); } else { - slices.push(CompositeSlice { - batch_idx: last_batch_idx, - start_row_idx, - len, - }); + let indices = indices_in_batch.drain(..).collect::>(); + group_indices(last_batch_idx, indices, &mut slices); last_batch_idx = ci.batch_idx; - start_row_idx = ci.row_idx; - len = 1; + indices_in_batch.push(ci.row_idx); } } assert!( - len > 0, + !indices_in_batch.is_empty(), "There should have at least one record in a sort output slice." ); - slices.push(CompositeSlice { - batch_idx: last_batch_idx, - start_row_idx, - len, - }); + let indices = indices_in_batch.drain(..).collect::>(); + group_indices(last_batch_idx, indices, &mut slices); self.pos += current_size; Some(slices) } } +/// Group continuous indices into a slice for better `extend` performance +#[allow(clippy::stable_sort_primitive)] +fn group_indices( + batch_idx: u32, + mut positions: Vec, + output: &mut Vec, +) { + // use sort instead of sort_unstable since it's likely nearly sorted. + positions.sort(); + let mut last_pos = 0; + let mut run_length = 0; + for pos in positions.into_iter() { + if run_length == 0 { + last_pos = pos; + run_length = 1; + } else if pos == last_pos + 1 { + run_length += 1; + last_pos = pos; + } else { + output.push(CompositeSlice { + batch_idx, + start_row_idx: last_pos + 1 - run_length, + len: run_length as usize, + }); + last_pos = pos; + run_length = 1; + } + } + assert!( + run_length > 0, + "There should have at least one record in a sort output slice." + ); + output.push(CompositeSlice { + batch_idx, + start_row_idx: last_pos + 1 - run_length, + len: run_length as usize, + }) +} + /// Stream of sorted record batches struct SortedSizedRecordBatchStream { schema: SchemaRef, diff --git a/datafusion/core/tests/parquet/repeat_much.snappy.parquet b/datafusion/core/tests/parquet/repeat_much.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f1066acbdfda7376681b6b5b2359330c4f9fed92 GIT binary patch literal 1261 zcmZwHPfXKL0LSsyuA5UC0`lgoR2h{4!_sUl3`U5F8Vy;9LA{`2uwxxe_XoBPW0oa^ za4{OQcrh9^#y{~w@Tdn9J$m$j9=v$L#27<38Myg-6WA6xe6qdv?c3MaW}_!B1gJ#U zMYq>Y@GqivTKJY_7oPaYh0vF*j7%&YPn1doL!X3B|58)ZShNife z$l!ra(4h~8;50u(~j?1TOHuGb-Yik_{F@-1WK7>`y)$hFOHN9_w=q&h@gP!9*7;p4Ks z5yde&UR&fF&LqC&yjU5bs4eXjKQ}ZY(gX*g8R+U8!3~G-ojqC+{vGnw4WbS{c>sfn()|)QX4)+CYV4YftR$yDgKy zB&=NG#mcB=nU3de?TC1x13IA#e5DssH=?~Xq|u+7^*e7Bf7@6&#-o)HqVhGqv6w74 z<{t~)s2m3kZ&(i%Wo@`FH)CXPOynk~&6Gh!LPf901cSIYHrE7Mk(Caof{A@jha}@T z70V~w%P#jjVH`x zET;}$DX3{9#ZMVHx-~12w#+Friv#I=GHFDUIFBEin!24-2MfjG)gob$tqb;~vYxFK zdIEaDuLnF`(R?Bq^B4o+SbQKJ4hMUqAu}Ed1%okv$R7%ueg4UCv_IVMkDAd~z$YsT NHepU6WE6jNe*uMfFAo3! literal 0 HcmV?d00001 diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs index 2683f20e7fdd3..d500fbb8956ff 100644 --- a/datafusion/core/tests/sql/order.rs +++ b/datafusion/core/tests/sql/order.rs @@ -16,6 +16,7 @@ // under the License. use super::*; +use fuzz_utils::{batches_to_vec, partitions_to_sorted_vec}; #[tokio::test] async fn test_sort_unprojected_col() -> Result<()> { @@ -198,3 +199,25 @@ async fn sort_empty() -> Result<()> { assert_eq!(results.len(), 0); Ok(()) } + +#[tokio::test] +async fn sort_with_lots_of_repetition_values() -> Result<()> { + let ctx = SessionContext::new(); + let filename = "tests/parquet/repeat_much.snappy.parquet"; + + ctx.register_parquet("rep", filename, ParquetReadOptions::default()) + .await?; + let sql = "select a from rep order by a"; + let actual = execute_to_batches(&ctx, sql).await; + let actual = batches_to_vec(&actual); + + let sql1 = "select a from rep"; + let expected = execute_to_batches(&ctx, sql1).await; + let expected = partitions_to_sorted_vec(&[expected]); + + assert_eq!(actual.len(), expected.len()); + for i in 0..actual.len() { + assert_eq!(actual[i], expected[i]); + } + Ok(()) +} From 8b340f20573e36b0b799944fd40a912f8103792e Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 9 Apr 2022 09:40:52 +0800 Subject: [PATCH 2/3] clear insteal of drain --- .../core/src/physical_plan/sorts/sort.rs | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 6e9dec470c272..4e862562557f7 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -440,8 +440,7 @@ impl Iterator for SortedIterator { } else if ci.batch_idx == last_batch_idx { indices_in_batch.push(ci.row_idx); } else { - let indices = indices_in_batch.drain(..).collect::>(); - group_indices(last_batch_idx, indices, &mut slices); + group_indices(last_batch_idx, &mut indices_in_batch, &mut slices); last_batch_idx = ci.batch_idx; indices_in_batch.push(ci.row_idx); } @@ -451,8 +450,7 @@ impl Iterator for SortedIterator { !indices_in_batch.is_empty(), "There should have at least one record in a sort output slice." ); - let indices = indices_in_batch.drain(..).collect::>(); - group_indices(last_batch_idx, indices, &mut slices); + group_indices(last_batch_idx, &mut indices_in_batch, &mut slices); self.pos += current_size; Some(slices) @@ -463,27 +461,27 @@ impl Iterator for SortedIterator { #[allow(clippy::stable_sort_primitive)] fn group_indices( batch_idx: u32, - mut positions: Vec, + positions: &mut Vec, output: &mut Vec, ) { - // use sort instead of sort_unstable since it's likely nearly sorted. + // use sort instead of sort_unstable since the input indices is nearly sorted. positions.sort(); let mut last_pos = 0; let mut run_length = 0; - for pos in positions.into_iter() { + for pos in positions.iter() { if run_length == 0 { - last_pos = pos; + last_pos = *pos; run_length = 1; - } else if pos == last_pos + 1 { + } else if *pos == last_pos + 1 { run_length += 1; - last_pos = pos; + last_pos = *pos; } else { output.push(CompositeSlice { batch_idx, start_row_idx: last_pos + 1 - run_length, len: run_length as usize, }); - last_pos = pos; + last_pos = *pos; run_length = 1; } } @@ -495,7 +493,8 @@ fn group_indices( batch_idx, start_row_idx: last_pos + 1 - run_length, len: run_length as usize, - }) + }); + positions.clear() } /// Stream of sorted record batches From 0db97d239ff8e96e1a9be1e694e3a0e0a678d5c8 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 9 Apr 2022 10:41:44 +0800 Subject: [PATCH 3/3] Use unstable --- datafusion/core/src/physical_plan/sorts/sort.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 4e862562557f7..1d13efd14eca8 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -458,14 +458,12 @@ impl Iterator for SortedIterator { } /// Group continuous indices into a slice for better `extend` performance -#[allow(clippy::stable_sort_primitive)] fn group_indices( batch_idx: u32, positions: &mut Vec, output: &mut Vec, ) { - // use sort instead of sort_unstable since the input indices is nearly sorted. - positions.sort(); + positions.sort_unstable(); let mut last_pos = 0; let mut run_length = 0; for pos in positions.iter() {