From 446d052c7a5e4e6d9c9e1d286fbff581372bca38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 2 Jul 2025 00:01:05 +0200 Subject: [PATCH 01/14] Reuse Rows in RowCursorStream --- datafusion/physical-plan/src/sorts/cursor.rs | 5 +-- datafusion/physical-plan/src/sorts/merge.rs | 1 + datafusion/physical-plan/src/sorts/stream.rs | 35 +++++++++++++++++--- 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 17033e6a31425..89fbb0fbcf89e 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -16,6 +16,7 @@ // under the License. use std::cmp::Ordering; +use std::sync::Arc; use arrow::array::{ types::ByteArrayType, Array, ArrowPrimitiveType, GenericByteArray, @@ -151,7 +152,7 @@ impl Ord for Cursor { /// Used for sorting when there are multiple columns in the sort key #[derive(Debug)] pub struct RowValues { - rows: Rows, + rows: Arc, /// Tracks for the memory used by in the `Rows` of this /// cursor. Freed on drop @@ -164,7 +165,7 @@ impl RowValues { /// /// Panics if the reservation is not for exactly `rows.size()` /// bytes or if `rows` is empty. - pub fn new(rows: Rows, reservation: MemoryReservation) -> Self { + pub fn new(rows: Arc, reservation: MemoryReservation) -> Self { assert_eq!( rows.size(), reservation.size(), diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 0c18a3b6c7032..58ba8ec657e47 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -328,6 +328,7 @@ impl SortPreservingMergeStream { fn advance_cursors(&mut self, stream_idx: usize) -> bool { if let Some(cursor) = &mut self.cursors[stream_idx] { let _ = cursor.advance(); + self.prev_cursors[stream_idx] = None; if cursor.is_finished() { // Take the current cursor, leaving `None` in its place self.prev_cursors[stream_idx] = self.cursors[stream_idx].take(); diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index e029c60b285b6..d1ca09b778646 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -21,7 +21,7 @@ use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::array::Array; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; -use arrow::row::{RowConverter, SortField}; +use arrow::row::{RowConverter, Rows, SortField}; use datafusion_common::Result; use datafusion_execution::memory_pool::MemoryReservation; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -88,6 +88,8 @@ pub struct RowCursorStream { streams: FusedStreams, /// Tracks the memory used by `converter` reservation: MemoryReservation, + /// rows for each partition + rows: Vec<[Option>; 2]>, } impl RowCursorStream { @@ -105,26 +107,49 @@ impl RowCursorStream { }) .collect::>>()?; - let streams = streams.into_iter().map(|s| s.fuse()).collect(); + let streams: Vec<_> = streams.into_iter().map(|s| s.fuse()).collect(); let converter = RowConverter::new(sort_fields)?; + let mut rows = Vec::with_capacity(streams.len()); + for _ in &streams { + // Initialize each stream with an empty Rows + rows.push([Some(Arc::new(converter.empty_rows(0, 0))), Some(Arc::new(converter.empty_rows(0, 0)))]); + } Ok(Self { converter, reservation, column_expressions: expressions.iter().map(|x| Arc::clone(&x.expr)).collect(), streams: FusedStreams(streams), + rows: rows, }) } - fn convert_batch(&mut self, batch: &RecordBatch) -> Result { + fn convert_batch( + &mut self, + batch: &RecordBatch, + stream_idx: usize, + ) -> Result { let cols = self .column_expressions .iter() .map(|expr| expr.evaluate(batch)?.into_array(batch.num_rows())) .collect::>>()?; - let rows = self.converter.convert_columns(&cols)?; + // At this point, ownership should be unique + let mut rows = Arc::try_unwrap(self.rows[stream_idx][1].take().unwrap()) + .expect("unique ownership of rows"); + + rows.clear(); + + self.converter.append(&mut rows, &cols)?; self.reservation.try_resize(self.converter.size())?; + let rows = Arc::new(rows); + + self.rows[stream_idx][1] = Some(rows.clone()); + + let [a, b] = &mut self.rows[stream_idx]; + std::mem::swap(a, b); + // track the memory in the newly created Rows. let mut rows_reservation = self.reservation.new_empty(); rows_reservation.try_grow(rows.size())?; @@ -146,7 +171,7 @@ impl PartitionedStream for RowCursorStream { ) -> Poll> { Poll::Ready(ready!(self.streams.poll_next(cx, stream_idx)).map(|r| { r.and_then(|batch| { - let cursor = self.convert_batch(&batch)?; + let cursor = self.convert_batch(&batch, stream_idx)?; Ok((cursor, batch)) }) })) From 99c4f5be0fc0da477dbaf1c372caba624f33011f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 2 Jul 2025 00:05:06 +0200 Subject: [PATCH 02/14] WIP --- datafusion/physical-plan/src/sorts/merge.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 58ba8ec657e47..0c18a3b6c7032 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -328,7 +328,6 @@ impl SortPreservingMergeStream { fn advance_cursors(&mut self, stream_idx: usize) -> bool { if let Some(cursor) = &mut self.cursors[stream_idx] { let _ = cursor.advance(); - self.prev_cursors[stream_idx] = None; if cursor.is_finished() { // Take the current cursor, leaving `None` in its place self.prev_cursors[stream_idx] = self.cursors[stream_idx].take(); From 729115f3598049dd8725133be4b8a8e05a022a33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 2 Jul 2025 00:22:10 +0200 Subject: [PATCH 03/14] Fmt --- datafusion/physical-plan/src/sorts/stream.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index d1ca09b778646..5ce1f1b97af00 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -112,7 +112,10 @@ impl RowCursorStream { let mut rows = Vec::with_capacity(streams.len()); for _ in &streams { // Initialize each stream with an empty Rows - rows.push([Some(Arc::new(converter.empty_rows(0, 0))), Some(Arc::new(converter.empty_rows(0, 0)))]); + rows.push([ + Some(Arc::new(converter.empty_rows(0, 0))), + Some(Arc::new(converter.empty_rows(0, 0))), + ]); } Ok(Self { converter, @@ -148,7 +151,7 @@ impl RowCursorStream { self.rows[stream_idx][1] = Some(rows.clone()); let [a, b] = &mut self.rows[stream_idx]; - std::mem::swap(a, b); + std::mem::swap(a, b); // track the memory in the newly created Rows. let mut rows_reservation = self.reservation.new_empty(); From 7f315cda45e1141ac6cc958081ef210141dbc342 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 2 Jul 2025 00:27:20 +0200 Subject: [PATCH 04/14] Add comment, make it backwards compatible --- datafusion/physical-plan/src/sorts/stream.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 5ce1f1b97af00..622c6d4f31fc2 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -78,6 +78,8 @@ impl FusedStreams { /// A [`PartitionedStream`] that wraps a set of [`SendableRecordBatchStream`] /// and computes [`RowValues`] based on the provided [`PhysicalSortExpr`] +/// Note: for optimal performance, keep only the final RowValues in memory +/// before pulling the next batch. This will allow reuse of allocations. #[derive(Debug)] pub struct RowCursorStream { /// Converter to convert output of physical expressions @@ -139,7 +141,7 @@ impl RowCursorStream { // At this point, ownership should be unique let mut rows = Arc::try_unwrap(self.rows[stream_idx][1].take().unwrap()) - .expect("unique ownership of rows"); + .unwrap_or_else(|_| self.converter.empty_rows(0, 0)); rows.clear(); From 5db1bc15d0df5a539129d1f07b10e84a1d92420b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 2 Jul 2025 00:28:33 +0200 Subject: [PATCH 05/14] Add comment, make it backwards compatible --- datafusion/physical-plan/src/sorts/stream.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 622c6d4f31fc2..e5fe03232f523 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -90,7 +90,8 @@ pub struct RowCursorStream { streams: FusedStreams, /// Tracks the memory used by `converter` reservation: MemoryReservation, - /// rows for each partition + /// Allocated rows for each partition, we keep two to allow for buffering one + /// in the consumer of the stream rows: Vec<[Option>; 2]>, } From 5b25ca462e3e3ecb7f023d98e61546407e61ea7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 2 Jul 2025 00:30:16 +0200 Subject: [PATCH 06/14] Add comment, make it backwards compatible --- datafusion/physical-plan/src/sorts/stream.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index e5fe03232f523..79907204bd669 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -140,7 +140,7 @@ impl RowCursorStream { .map(|expr| expr.evaluate(batch)?.into_array(batch.num_rows())) .collect::>>()?; - // At this point, ownership should be unique + // At this point, ownership should of this Rows should be unique let mut rows = Arc::try_unwrap(self.rows[stream_idx][1].take().unwrap()) .unwrap_or_else(|_| self.converter.empty_rows(0, 0)); @@ -153,6 +153,7 @@ impl RowCursorStream { self.rows[stream_idx][1] = Some(rows.clone()); + // swap the curent with the previous one, so that the next poll can reuse the Rows from the previous poll let [a, b] = &mut self.rows[stream_idx]; std::mem::swap(a, b); From 163567e80027ec4af2ae30d93cc4dcc4cc89c403 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 2 Jul 2025 00:41:45 +0200 Subject: [PATCH 07/14] Clippy --- datafusion/physical-plan/src/sorts/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 79907204bd669..15ea20c3d0d7d 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -151,7 +151,7 @@ impl RowCursorStream { let rows = Arc::new(rows); - self.rows[stream_idx][1] = Some(rows.clone()); + self.rows[stream_idx][1] = Some(Arc::clone(&rows)); // swap the curent with the previous one, so that the next poll can reuse the Rows from the previous poll let [a, b] = &mut self.rows[stream_idx]; From f92137f181e1000364c15c50a46a0097c22b0bb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 2 Jul 2025 07:01:29 +0200 Subject: [PATCH 08/14] Clippy --- datafusion/physical-plan/src/sorts/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 15ea20c3d0d7d..e8c77492e7b99 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -125,7 +125,7 @@ impl RowCursorStream { reservation, column_expressions: expressions.iter().map(|x| Arc::clone(&x.expr)).collect(), streams: FusedStreams(streams), - rows: rows, + rows, }) } From add5c9e494c17a78001797a7bd2bd65be47d7a35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 2 Jul 2025 22:10:42 +0200 Subject: [PATCH 09/14] Return error on non-unique reference --- datafusion/physical-plan/src/sorts/stream.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index e8c77492e7b99..1d7d63128de5f 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -22,7 +22,7 @@ use arrow::array::Array; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, Rows, SortField}; -use datafusion_common::Result; +use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::MemoryReservation; use datafusion_physical_expr_common::sort_expr::LexOrdering; use futures::stream::{Fuse, StreamExt}; @@ -78,8 +78,7 @@ impl FusedStreams { /// A [`PartitionedStream`] that wraps a set of [`SendableRecordBatchStream`] /// and computes [`RowValues`] based on the provided [`PhysicalSortExpr`] -/// Note: for optimal performance, keep only the final RowValues in memory -/// before pulling the next batch. This will allow reuse of allocations. +/// Note: this errors #[derive(Debug)] pub struct RowCursorStream { /// Converter to convert output of physical expressions @@ -142,7 +141,11 @@ impl RowCursorStream { // At this point, ownership should of this Rows should be unique let mut rows = Arc::try_unwrap(self.rows[stream_idx][1].take().unwrap()) - .unwrap_or_else(|_| self.converter.empty_rows(0, 0)); + .map_err(|_| { + DataFusionError::Internal( + "Rows from RowCursorStream is still in use by consumer".to_string(), + ) + })?; rows.clear(); From c59c5f9b54771fbc6047cc3a5d359dbf3e0c56a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 2 Jul 2025 22:14:46 +0200 Subject: [PATCH 10/14] Comment --- datafusion/physical-plan/src/sorts/stream.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 1d7d63128de5f..fcbf24c560f9d 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -78,7 +78,8 @@ impl FusedStreams { /// A [`PartitionedStream`] that wraps a set of [`SendableRecordBatchStream`] /// and computes [`RowValues`] based on the provided [`PhysicalSortExpr`] -/// Note: this errors +/// Note: the stream returns an error if the consumer buffers more than one RowValues (i.e. holds on to two RowValues +/// from the same partition at the same time). #[derive(Debug)] pub struct RowCursorStream { /// Converter to convert output of physical expressions From 2fa920ed5b261ee12d8a254cfb03509b10d982be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 3 Jul 2025 18:11:44 +0200 Subject: [PATCH 11/14] Update datafusion/physical-plan/src/sorts/stream.rs Co-authored-by: Oleks V --- datafusion/physical-plan/src/sorts/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index fcbf24c560f9d..01f7f12423a48 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -143,7 +143,7 @@ impl RowCursorStream { // At this point, ownership should of this Rows should be unique let mut rows = Arc::try_unwrap(self.rows[stream_idx][1].take().unwrap()) .map_err(|_| { - DataFusionError::Internal( + internal_datafusion_err!( "Rows from RowCursorStream is still in use by consumer".to_string(), ) })?; From 86bb9768f7a4e03d9463dccb128b60dc5ccdf2bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 3 Jul 2025 22:28:01 +0200 Subject: [PATCH 12/14] Fix --- datafusion/physical-plan/src/sorts/stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 01f7f12423a48..7fa3709a86df4 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -22,7 +22,7 @@ use arrow::array::Array; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, Rows, SortField}; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{internal_datafusion_err, Result}; use datafusion_execution::memory_pool::MemoryReservation; use datafusion_physical_expr_common::sort_expr::LexOrdering; use futures::stream::{Fuse, StreamExt}; @@ -144,7 +144,7 @@ impl RowCursorStream { let mut rows = Arc::try_unwrap(self.rows[stream_idx][1].take().unwrap()) .map_err(|_| { internal_datafusion_err!( - "Rows from RowCursorStream is still in use by consumer".to_string(), + "Rows from RowCursorStream is still in use by consumer" ) })?; From 66612280c15fe8b65925913fa8702869bf9b8ce9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 3 Jul 2025 22:44:38 +0200 Subject: [PATCH 13/14] Extract logic --- datafusion/physical-plan/src/sorts/stream.rs | 47 ++++++++++++++------ 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 7fa3709a86df4..da5796af1611f 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -76,6 +76,36 @@ impl FusedStreams { } } +/// A pair pf Arc that can be reused +#[derive(Debug)] +struct ReusableRows { + // inner[stream_idx] holds a two Arcs: + // at start of a new poll + // .0 is the rows from the previous poll (at start), + // .1 is the one that is being written to + // at end of a poll, .0 will be swapped with .1, + inner: Vec<[Option>; 2]>, +} + +impl ReusableRows { + // return a Rows for writing, + // does not clone if the existing rows can be reused + fn take_next(&mut self, stream_idx: usize) -> Result { + Arc::try_unwrap(self.inner[stream_idx][1].take().unwrap()).map_err(|_| { + internal_datafusion_err!( + "Rows from RowCursorStream is still in use by consumer" + ) + }) + } + // save the Rows + fn save(&mut self, stream_idx: usize, rows: Arc) { + self.inner[stream_idx][1] = Some(Arc::clone(&rows)); + // swap the curent with the previous one, so that the next poll can reuse the Rows from the previous poll + let [a, b] = &mut self.inner[stream_idx]; + std::mem::swap(a, b); + } +} + /// A [`PartitionedStream`] that wraps a set of [`SendableRecordBatchStream`] /// and computes [`RowValues`] based on the provided [`PhysicalSortExpr`] /// Note: the stream returns an error if the consumer buffers more than one RowValues (i.e. holds on to two RowValues @@ -92,7 +122,7 @@ pub struct RowCursorStream { reservation: MemoryReservation, /// Allocated rows for each partition, we keep two to allow for buffering one /// in the consumer of the stream - rows: Vec<[Option>; 2]>, + rows: ReusableRows, } impl RowCursorStream { @@ -125,7 +155,7 @@ impl RowCursorStream { reservation, column_expressions: expressions.iter().map(|x| Arc::clone(&x.expr)).collect(), streams: FusedStreams(streams), - rows, + rows: ReusableRows { inner: rows }, }) } @@ -141,12 +171,7 @@ impl RowCursorStream { .collect::>>()?; // At this point, ownership should of this Rows should be unique - let mut rows = Arc::try_unwrap(self.rows[stream_idx][1].take().unwrap()) - .map_err(|_| { - internal_datafusion_err!( - "Rows from RowCursorStream is still in use by consumer" - ) - })?; + let mut rows = self.rows.take_next(stream_idx)?; rows.clear(); @@ -155,11 +180,7 @@ impl RowCursorStream { let rows = Arc::new(rows); - self.rows[stream_idx][1] = Some(Arc::clone(&rows)); - - // swap the curent with the previous one, so that the next poll can reuse the Rows from the previous poll - let [a, b] = &mut self.rows[stream_idx]; - std::mem::swap(a, b); + self.rows.save(stream_idx, Arc::clone(&rows)); // track the memory in the newly created Rows. let mut rows_reservation = self.reservation.new_empty(); From 559ece54f7bb3729843941d84d2e39804fd0ed41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 4 Jul 2025 08:25:06 +0200 Subject: [PATCH 14/14] Doc fix --- datafusion/physical-plan/src/sorts/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index da5796af1611f..49e7413122fca 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -76,7 +76,7 @@ impl FusedStreams { } } -/// A pair pf Arc that can be reused +/// A pair of `Arc` that can be reused #[derive(Debug)] struct ReusableRows { // inner[stream_idx] holds a two Arcs: