From 283d6dc176342426d74e8c749228845995edfa6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 31 Oct 2023 10:37:04 +0000 Subject: [PATCH] GroupedHashAggregateStream breaks spill batch ... into smaller chunks to decrease memory required for merging. --- datafusion/physical-plan/src/aggregates/mod.rs | 2 +- datafusion/physical-plan/src/aggregates/row_hash.rs | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4c612223178c9..da152a6264afa 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -2155,7 +2155,7 @@ mod tests { spill: bool, ) -> Result<()> { let task_ctx = if spill { - new_spill_ctx(2, 2812) + new_spill_ctx(2, 2886) } else { Arc::new(TaskContext::default()) }; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index d773533ad6a32..7b660885845b2 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -673,7 +673,16 @@ impl GroupedHashAggregateStream { let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?; let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?; // TODO: slice large `sorted` and write to multiple files in parallel - writer.write(&sorted)?; + let mut offset = 0; + let total_rows = sorted.num_rows(); + + while offset < total_rows { + let length = std::cmp::min(total_rows - offset, self.batch_size); + let batch = sorted.slice(offset, length); + offset += batch.num_rows(); + writer.write(&batch)?; + } + writer.finish()?; self.spill_state.spills.push(spillfile); Ok(())