30 changes: 28 additions & 2 deletions datafusion/physical-plan/src/common.rs
@@ -25,8 +25,9 @@ use super::SendableRecordBatchStream;
use crate::stream::RecordBatchReceiverStream;
use crate::{ColumnStatistics, Statistics};

use arrow::array::Array;
use arrow::array::{Array, StringViewArray};
use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::stats::Precision;
use datafusion_common::{Result, plan_err};
@@ -190,7 +191,7 @@ pub fn can_project(
.max()
.is_some_and(|&i| i >= schema.fields().len())
{
Err(arrow::error::ArrowError::SchemaError(format!(
Err(ArrowError::SchemaError(format!(
"project index {} out of bounds, max field {}",
columns.iter().max().unwrap(),
schema.fields().len()
@@ -204,6 +205,31 @@
}
}

/// Return a new `RecordBatch` with [`StringViewArray::gc`] called on any `StringViewArray` columns.
pub(crate) fn gc_stringview_arrays(
batch: RecordBatch,
) -> Result<RecordBatch, ArrowError> {
let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());

let mut arr_mutated = false;
for array in batch.columns() {
if let Some(string_view_array) = array.as_any().downcast_ref::<StringViewArray>()
{
let new_array = string_view_array.gc();
Contributor:

Calling gc here basically forces all the data to be copied, which is a lot of work. I am not sure it is a good idea to do so unconditionally.

We had to develop some pretty sophisticated heuristics for when to do a GC as part of the arrow BatchCoalescer.
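The trade-off behind such a heuristic can be sketched in std-only Rust. The `should_gc` name and the 50% threshold below are assumptions for illustration, not the actual BatchCoalescer logic:

```rust
/// Hypothetical compaction heuristic: only pay the copy cost of gc when
/// the views reference a small fraction of the backing buffer.
/// The 50% threshold is an assumed value, not arrow's real heuristic.
fn should_gc(used_bytes: usize, buffer_bytes: usize) -> bool {
    // Compact only when more than half of the buffer is dead weight.
    used_bytes * 2 < buffer_bytes
}

fn main() {
    // Views use 100 bytes of a 1024-byte buffer: mostly garbage, worth a gc.
    assert!(should_gc(100, 1024));
    // Views use 900 of 1024 bytes: copying would gain little, so skip it.
    assert!(!should_gc(900, 1024));
}
```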

Contributor Author:

Interesting, I see that here. Repartition does use the batch coalescer, but on the receiver side, not the sender side. Memory tracking happens on the sender, before the coalesce.

@Dandandan mentioned above that coalescing before sending had mixed results. I can try re-doing that on latest main to see if it still has the same effect. If so, I'll branch out from the morsel PR and try it there.

Contributor:

Yes, in theory it should be slightly better to do the coalescing directly on the sender, when batches are potentially still in cache, and it saves `partitions - 1` wake-ups (which are not needed).

But it seems (my theory) that pushing it upstream might create some parallelism / mitigate some skew, since even for a slow partition the coalescing can then happen in parallel.

Contributor Author:

@Dandandan I see that you had tried moving the coalesce upstream in repartition in #21550, but it looks like you closed it. The benchmarks there show an improvement on ClickBench. Why was it closed?

I noticed that the morsel PRs are now merged, so I was planning to try out moving the coalesce to the producer.

Contributor (@Dandandan, Apr 23, 2026):

Yeah, I tried it again, but I still saw a small regression with coalesce upstream compared to the morsel PR.
Perhaps downstream coalescing can still extract some more parallelism in certain cases (e.g. it can do the coalescing in another task/thread while the current task starts triggering the next IO request). Or when only a few files are left, we can still use extra threads to do the coalesce.

I couldn't solve it yet, but feel free to try again!

Contributor Author:

Alright, let me try this.

But if your guess is correct, then we're blocking the next IO call due to CPU compute above the data source, and keeping the coalesce downstream just removes some compute from that path. The core issue seems to be that we're not pre-fetching IO while the compute is running?

I'm not sure, but I hope to get more insights when I try this out.

new_columns.push(Arc::new(new_array));
arr_mutated = true;
} else {
new_columns.push(Arc::clone(array));
}
}

if arr_mutated {
RecordBatch::try_new(batch.schema(), new_columns)
} else {
Ok(batch)
}
}
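As context for why `gc` matters for memory accounting, here is a std-only sketch of how buffer sharing makes per-consumer size sums over-count. The `View` type and the sizes are illustrative, not arrow's actual layout:

```rust
use std::sync::Arc;

/// Several "views" sharing one backing buffer, mimicking how multiple
/// repartitioned `StringViewArray`s can reference the same data buffer.
struct View {
    buffer: Arc<Vec<u8>>, // shared payload buffer
    len: usize,           // bytes this view actually uses
}

impl View {
    /// Naive accounting: every holder reports the whole shared buffer,
    /// so a memory pool summing per-consumer sizes over-counts.
    fn reported_size(&self) -> usize {
        self.buffer.len()
    }

    /// "gc": copy only the bytes this view needs into a fresh buffer.
    fn compacted(&self) -> View {
        View {
            buffer: Arc::new(self.buffer[..self.len].to_vec()),
            len: self.len,
        }
    }
}

fn main() {
    let shared = Arc::new(vec![0u8; 1024]);
    let views: Vec<View> = (0..4)
        .map(|_| View { buffer: Arc::clone(&shared), len: 256 })
        .collect();

    // Before compaction: four consumers each report the full 1024 bytes.
    let before: usize = views.iter().map(View::reported_size).sum();
    assert_eq!(before, 4096);

    // After compaction: each holds only its own 256 bytes.
    let after: usize = views.iter().map(|v| v.compacted().reported_size()).sum();
    assert_eq!(after, 1024);
}
```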

#[cfg(test)]
mod tests {
use super::*;
84 changes: 81 additions & 3 deletions datafusion/physical-plan/src/repartition/mod.rs
@@ -31,6 +31,7 @@ use super::{
DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
};
use crate::coalesce::LimitedBatchCoalescer;
use crate::common::gc_stringview_arrays;
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
use crate::hash_utils::create_hashes;
use crate::metrics::{BaselineMetrics, SpillMetrics};
@@ -617,8 +618,14 @@ impl BatchPartitioner {
batch.schema(),
columns,
&options,
)
.unwrap();
)?;

// When `StringViewArray`s are present, the `take_arrays` call above
// re-uses data buffers from the original array. This causes the memory
// pool to count the same data buffers multiple times, once for each
// consumer of the repartition.
// So we gc the output arrays, which creates new data buffers.
let batch = gc_stringview_arrays(batch)?;
Contributor:

I think it would be best to use the coalesce kernels (which I think do GC-ing already) before sending batches to the upstream partitions.

I got some mixed performance results from that before, but I think the upcoming morsel / work-stealing changes might be able to improve this (as it won't benefit from pushing the copying work over to a new, possibly idling, thread).


partitioned_batches.push(Ok((partition, batch)));

@@ -1770,7 +1777,7 @@ mod tests {
{collect, expressions::col},
};

use arrow::array::{ArrayRef, StringArray, UInt32Array};
use arrow::array::{ArrayRef, StringArray, StringViewArray, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::cast::as_string_array;
use datafusion_common::exec_err;
@@ -2544,6 +2551,33 @@
.collect()
}

fn test_schema_string_view() -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("key", DataType::UInt32, false),
Field::new("val", DataType::Utf8View, false),
]))
}

/// Create a batch with StringViewArray data for compaction tests.
/// Strings are >12 bytes to force out-of-line storage in the buffer
/// (strings <=12 bytes are inlined in the view and don't reference the buffer).
fn create_string_view_batch(num_rows: usize, num_partitions: usize) -> RecordBatch {
let schema = test_schema_string_view();
let keys: Vec<u32> = (0..num_rows).map(|i| (i % num_partitions) as u32).collect();
let vals: Vec<String> = (0..num_rows)
.map(|i| format!("string_value_{i:0>20}"))
.collect();
let val_refs: Vec<&str> = vals.iter().map(|s| s.as_str()).collect();
RecordBatch::try_new(
schema,
vec![
Arc::new(UInt32Array::from(keys)) as ArrayRef,
Arc::new(StringViewArray::from(val_refs)) as ArrayRef,
],
)
.unwrap()
}

#[tokio::test]
async fn test_repartition_ordering_with_spilling() -> Result<()> {
// Test that repartition preserves ordering when spilling occurs
@@ -2617,6 +2651,50 @@

Ok(())
}

/// Ensure that memory tracker usage does not blow up after hash repartition of string view
/// arrays.
    /// See: https://github.com/apache/datafusion/issues/20491
#[tokio::test]
async fn hash_repartition_string_view_compaction() -> Result<()> {
Contributor:

This test does not actually exercise the regression it is meant to cover. It only checks that repartition returns all rows, which would also pass before the gc() change.

As a result, we still do not have a test that would catch the over-counting bug if this logic regresses.

Please add an assertion that observes the compaction or accounting behavior directly. For example:

- Check that the total get_array_memory_size() across the repartitioned outputs stays close to the original batch, instead of scaling with the number of output partitions.
- Test spill behavior under a tight memory limit (e.g., spilled bytes).
- Verify StringViewArray buffer ownership after repartition, so outputs no longer all retain the original shared payload buffer.

Contributor Author:

Yeah, I did try to add a test that checks memory size specifically, but it seemed a bit fragile to assert on those numbers. Let me try the other approaches, thanks!

Contributor Author:

I've updated the test, please take a look!

Without the fix, the memory usage blows up to 4x the original size; with the fix, it's actually less than the original size. To leave some margin for error, I used a threshold of 2x in the memory usage assertion.

let schema = test_schema_string_view();
let num_partitions = 8;

        // use batch_size rows per partition to prevent the coalescer from GCing during coalesce.
let batch_size = SessionConfig::new().batch_size();

let expected_rows = batch_size * num_partitions;
let batch = create_string_view_batch(expected_rows, num_partitions);
let original_size = batch.get_array_memory_size();
let partitions = vec![vec![batch]];

let output_partitions = repartition(
&schema,
partitions,
Partitioning::Hash(vec![col("key", &schema)?], num_partitions),
)
.await?;

assert_eq!(num_partitions, output_partitions.len());

let total_rows: usize = output_partitions
.iter()
.flatten()
.map(|batch| batch.num_rows())
.sum();
assert_eq!(total_rows, expected_rows);

let repartitioned_size: usize = output_partitions
.iter()
.flatten()
.map(|batch| batch.get_array_memory_size())
.sum();
// without GC, the repartitioned_size blows up to 4x (in this case) of the original_size.
// So using a safe threshold of 2x.
assert!(repartitioned_size <= original_size * 2);

Ok(())
}
}

#[cfg(test)]
35 changes: 6 additions & 29 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;

use parking_lot::RwLock;

use crate::common::spawn_buffered;
use crate::common::{gc_stringview_arrays, spawn_buffered};
use crate::execution_plan::{
Boundedness, CardinalityEffect, EmissionType, has_same_children_properties,
};
@@ -54,7 +54,7 @@ use crate::{
Statistics,
};

use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
use arrow::datatypes::SchemaRef;
use datafusion_common::config::SpillCompression;
@@ -496,33 +496,10 @@ impl ExternalSorter {
fn organize_stringview_arrays(
globally_sorted_batches: &mut Vec<RecordBatch>,
) -> Result<()> {
let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());

for batch in globally_sorted_batches.drain(..) {
let mut new_columns: Vec<Arc<dyn Array>> =
Vec::with_capacity(batch.num_columns());

let mut arr_mutated = false;
for array in batch.columns() {
if let Some(string_view_array) =
array.as_any().downcast_ref::<StringViewArray>()
{
let new_array = string_view_array.gc();
new_columns.push(Arc::new(new_array));
arr_mutated = true;
} else {
new_columns.push(Arc::clone(array));
}
}

let organized_batch = if arr_mutated {
RecordBatch::try_new(batch.schema(), new_columns)?
} else {
batch
};

organized_batches.push(organized_batch);
}
let organized_batches = globally_sorted_batches
.drain(..)
.map(gc_stringview_arrays)
.collect::<Result<_, _>>()?;

*globally_sorted_batches = organized_batches;
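The refactor above collapses the manual loop into `drain(..).map(gc_stringview_arrays).collect::<Result<_, _>>()`. A std-only sketch of that collect-into-Result pattern, where `process` is a hypothetical stand-in for `gc_stringview_arrays`:

```rust
/// Fallible per-item transform; a stand-in for `gc_stringview_arrays`.
fn process(x: i32) -> Result<i32, String> {
    if x >= 0 { Ok(x * 2) } else { Err(format!("negative input: {x}")) }
}

fn main() {
    let mut batches = vec![1, 2, 3];
    // Collecting an iterator of `Result`s into `Result<Vec<_>, _>` stops at
    // the first `Err` and propagates it; otherwise it yields the mapped Vec.
    let organized: Result<Vec<i32>, String> = batches.drain(..).map(process).collect();
    assert_eq!(organized, Ok(vec![2, 4, 6]));
    // `drain(..)` leaves the source empty, like `globally_sorted_batches`.
    assert!(batches.is_empty());

    // An error anywhere aborts the whole collection.
    let failed: Result<Vec<i32>, String> = [1, -2, 3].into_iter().map(process).collect();
    assert!(failed.is_err());
}
```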
