
Streaming panic when spilling batches with mixed nullability #21292

@gruuya

Description


Describe the bug

We've been seeing a panic during streaming for a large query (on sizable tables with high cardinality):

 Column 'val' is declared as non-nullable but contains null values

This is unexpected, as the plan seems fine: the column is nullable from the leaf nodes all the way up to the root node, so this doesn't appear to be a planning error.

The query itself contains a union operator where one child has a nullable schema (column projection) while the other has a non-nullable schema (literal projection), wrapped by some aggregation operations (among other things).

The problem seems to occur when there is spilling and the input batches with mixed nullability arrive in a specific order:

/// Appends a `RecordBatch` to the spill file, initializing the writer if necessary.
///
/// # Errors
/// - Returns an error if the file is not active (has been finalized)
/// - Returns an error if appending would exceed the disk usage limit configured
///   by `max_temp_directory_size` in `DiskManager`
pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> {
    if self.in_progress_file.is_none() {
        return Err(exec_datafusion_err!(
            "Append operation failed: No active in-progress file. The file may have already been finalized."
        ));
    }
    if self.writer.is_none() {
        let schema = batch.schema();
        // ...

Note that the writer's schema is inherited from the first batch, rather than from the `spill_writer`.

To Reproduce

It's quite hard to reproduce this at a higher level (i.e. with plain SQL or the DataFrame API), but the best I could Claude up is this:

// NOTE: import paths are approximate and may differ across DataFusion versions
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_datasource::memory::MemorySourceConfig;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::memory_pool::FairSpillPool;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::sort_batch;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::{ExecutionPlan, Partitioning};
use futures::StreamExt;

const NUM_BATCHES: usize = 200;
const ROWS_PER_BATCH: usize = 10;

fn non_nullable_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        Field::new("key", DataType::Int64, false),
        Field::new("val", DataType::Int64, false),
    ]))
}

fn nullable_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        Field::new("key", DataType::Int64, false),
        Field::new("val", DataType::Int64, true),
    ]))
}

fn non_nullable_batches() -> Vec<RecordBatch> {
    (0..NUM_BATCHES)
        .map(|i| {
            let start = (i * ROWS_PER_BATCH) as i64;
            let keys: Vec<i64> = (start..start + ROWS_PER_BATCH as i64).collect();
            RecordBatch::try_new(
                non_nullable_schema(),
                vec![
                    Arc::new(Int64Array::from(keys)),
                    Arc::new(Int64Array::from(vec![0i64; ROWS_PER_BATCH])),
                ],
            )
            .unwrap()
        })
        .collect()
}

fn nullable_batches() -> Vec<RecordBatch> {
    (0..NUM_BATCHES)
        .map(|i| {
            let start = (i * ROWS_PER_BATCH) as i64;
            let keys: Vec<i64> = (start..start + ROWS_PER_BATCH as i64).collect();
            let vals: Vec<Option<i64>> = (0..ROWS_PER_BATCH)
                .map(|j| if j % 3 == 1 { None } else { Some(j as i64) })
                .collect();
            RecordBatch::try_new(
                nullable_schema(),
                vec![
                    Arc::new(Int64Array::from(keys)),
                    Arc::new(Int64Array::from(vals)),
                ],
            )
            .unwrap()
        })
        .collect()
}

fn build_task_ctx(pool_size: usize) -> Arc<datafusion_execution::TaskContext> {
    let session_config = SessionConfig::new().with_batch_size(2);
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(pool_size)))
        .build_arc()
        .unwrap();
    Arc::new(
        datafusion_execution::TaskContext::default()
            .with_session_config(session_config)
            .with_runtime(runtime),
    )
}

/// Exercises spilling through UnionExec -> RepartitionExec where union children
/// have mismatched nullability (one child's `val` is non-nullable, the other's
/// is nullable with NULLs). A tiny FairSpillPool forces all batches to spill.
///
/// UnionExec returns child streams without schema coercion, so batches from
/// different children carry different per-field nullability into the shared
/// SpillPool. The IPC writer must use the SpillManager's canonical (nullable)
/// schema — not the first batch's schema — so readback batches are valid.
///
/// Otherwise, sort_batch will panic with
/// `Column 'val' is declared as non-nullable but contains null values`
#[tokio::test]
async fn test_sort_union_repartition_spill_mixed_nullability() {
    let non_nullable_exec = MemorySourceConfig::try_new_exec(
        &[non_nullable_batches()],
        non_nullable_schema(),
        None,
    )
    .unwrap();

    let nullable_exec =
        MemorySourceConfig::try_new_exec(&[nullable_batches()], nullable_schema(), None)
            .unwrap();

    let union_exec =
        UnionExec::try_new(vec![non_nullable_exec, nullable_exec]).unwrap();
    assert!(union_exec.schema().field(1).is_nullable());

    let repartition = Arc::new(
        RepartitionExec::try_new(union_exec, Partitioning::RoundRobinBatch(1)).unwrap(),
    );

    let task_ctx = build_task_ctx(200);
    let mut stream = repartition.execute(0, task_ctx).unwrap();

    let sort_expr = LexOrdering::new(vec![PhysicalSortExpr {
        expr: col("key", &nullable_schema()).unwrap(),
        options: SortOptions::default(),
    }])
    .unwrap();

    let mut total_rows = 0usize;
    let mut total_nulls = 0usize;
    while let Some(result) = stream.next().await {
        let batch = result.unwrap();

        let batch = sort_batch(&batch, &sort_expr, None).unwrap();

        total_rows += batch.num_rows();
        total_nulls += batch.column(1).null_count();
    }

    assert_eq!(
        total_rows,
        NUM_BATCHES * ROWS_PER_BATCH * 2,
        "All rows from both UNION branches should be present"
    );
    assert!(
        total_nulls > 0,
        "Expected some null values in output (i.e. nullable batches were processed)"
    );
}

This test fails with "Column 'val' is declared as non-nullable but contains null values".

On the other hand, the test does pass with the following change:

diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
index 2666ab882..362768ee5 100644
--- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
+++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
@@ -62,7 +62,11 @@ impl InProgressSpillFile {
             ));
         }
         if self.writer.is_none() {
-            let schema = batch.schema();
+            // Use the SpillManager's declared schema rather than the batch's schema.
+            // Individual batches may have different schemas (e.g., different nullability)
+            // when they come from different branches of a UnionExec. The SpillManager's
+            // schema represents the canonical schema that all batches should conform to.
+            let schema = self.spill_writer.schema();
             if let Some(in_progress_file) = &mut self.in_progress_file {
                 self.writer = Some(IPCStreamWriter::new(
                     in_progress_file.path(),

Expected behavior

There should be no streaming errors.

Additional context

No response


Labels: bug (Something isn't working)
