From 83f949e762aa8ad60deeb4fd43dd20220a09787b Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 16 Apr 2026 12:16:20 -0400 Subject: [PATCH 1/2] Try again to fix Miri in ParquetOpener. --- datafusion/datasource-parquet/src/opener.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 72549d5b8705f..f78006d222042 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1249,22 +1249,21 @@ impl PushDecoderStreamState { /// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned. /// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`). async fn transition(&mut self) -> Option> { + // Destructure so miri's Stacked Borrows can track disjoint field + // borrows across the `.await` yield point in get_byte_ranges. + let Self { + decoder, reader, .. + } = self; loop { - match self.decoder.try_decode() { + match decoder.try_decode() { Ok(DecodeResult::NeedsData(ranges)) => { - // IO (get_byte_ranges) and CPU (push_ranges) are still - // decoupled — they just can't live in a nested async block - // because that captures `&mut self` as one opaque borrow, - // which violates Stacked Borrows across the yield point. - // Inlining lets the compiler split the disjoint field borrows. - let data = self - .reader + let data = reader .get_byte_ranges(ranges.clone()) .await .map_err(DataFusionError::from); match data { Ok(data) => { - if let Err(e) = self.decoder.push_ranges(ranges, data) { + if let Err(e) = decoder.push_ranges(ranges, data) { return Some(Err(DataFusionError::from(e))); } } From aa64ae65ce9642544d7af09d04049de078f8854a Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 16 Apr 2026 12:49:07 -0400 Subject: [PATCH 2/2] take PushDecoderStreamState by value to resolve miri Stacked Borrows violation --- datafusion/datasource-parquet/src/opener.rs | 34 ++++++++++----------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f78006d222042..bad1c684b47f5 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1199,10 +1199,7 @@ impl RowGroupsPrunedParquetOpen { predicate_cache_records, baseline_metrics: prepared.baseline_metrics, }, - |mut state| async move { - let result = state.transition().await; - result.map(|r| (r, state)) - }, + |state| async move { state.transition().await }, ) .fuse(); @@ -1248,26 +1245,27 @@ impl PushDecoderStreamState { /// fetched from the [`AsyncFileReader`] and fed back into the decoder. /// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned. /// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`). - async fn transition(&mut self) -> Option> { - // Destructure so miri's Stacked Borrows can track disjoint field - // borrows across the `.await` yield point in get_byte_ranges. - let Self { - decoder, reader, .. - } = self; + /// + /// Takes `self` by value (rather than `&mut self`) so the generated future + /// owns the state directly. This avoids a Stacked Borrows violation under + /// miri where `&mut self` creates a single opaque borrow that conflicts + /// with `unfold`'s ownership across yield points. + async fn transition(mut self) -> Option<(Result, Self)> { loop { - match decoder.try_decode() { + match self.decoder.try_decode() { Ok(DecodeResult::NeedsData(ranges)) => { - let data = reader + let data = self + .reader .get_byte_ranges(ranges.clone()) .await .map_err(DataFusionError::from); match data { Ok(data) => { - if let Err(e) = decoder.push_ranges(ranges, data) { - return Some(Err(DataFusionError::from(e))); + if let Err(e) = self.decoder.push_ranges(ranges, data) { + return Some((Err(DataFusionError::from(e)), self)); } } - Err(e) => return Some(Err(e)), + Err(e) => return Some((Err(e), self)), } } Ok(DecodeResult::Data(batch)) => { @@ -1275,13 +1273,15 @@ impl PushDecoderStreamState { self.copy_arrow_reader_metrics(); let result = self.project_batch(&batch); timer.stop(); - return Some(result); + // Release the borrow on baseline_metrics before moving self + drop(timer); + return Some((result, self)); } Ok(DecodeResult::Finished) => { return None; } Err(e) => { - return Some(Err(DataFusionError::from(e))); + return Some((Err(DataFusionError::from(e)), self)); } } }