Skip to content

Conversation

@ding-young
Copy link
Contributor

Which issue does this PR close?

Rationale for this change

As described in above issue, since we introduced cooperative stream when reading spill file, we need to test that the reader stream properly yields. Since it is not obvious to write a test using SpillManager api, this PR adds a test that manually mocks SpillManager::read_spill_as_stream. (Note that writing to a spill file does not use the stream API.)

What changes are included in this PR?

In the test, the producer/sender stream always returns Poll::Ready, consumer/receiver stream polls it repeatedly and buffer capacity for spawn_buffered is set large enough so that yielding behavior comes from CooperativeStream.

Are these changes tested?

Are there any user-facing changes?

The visibility of spawn_buffered has changed

@github-actions github-actions bot added core Core DataFusion crate physical-plan Changes to the physical-plan crate labels Jun 30, 2025
/// If running in a tokio context spawns the execution of `stream` to a separate task
/// allowing it to execute in parallel with an intermediate buffer of size `buffer`
pub(crate) fn spawn_buffered(
pub fn spawn_buffered(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the visibility of spawn_buffered to pub to use it in the test.
If this isn't ideal, I'll follow any suggestions. (Perhaps duplicating the code might be better?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems fine to me

@ding-young
Copy link
Contributor Author

Thank you @pepijnve and fyi @zhuqi-lucas

let schema = mock_stream.schema();

let consumer_stream = futures::stream::poll_fn(move |cx| {
let mut collected = vec![];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think of having some cap of iterations or timelimit instead of infinite loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed :)

@alamb
Copy link
Contributor

alamb commented Jun 30, 2025

@pepijnve I wonder if you have some time to review this PR as well?

Comment on lines +277 to +278
// To make sure that inner stream is polled multiple times, loop until the buffer is full
// Ideally, the stream will yield before the loop ends
for _ in 0..buffer_capacity {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to loop until the buffer_capacity

@pepijnve
Copy link
Contributor

@pepijnve I wonder if you have some time to review this PR as well?

@alamb IIRC, I provided feedback out of band which was integrated by @ding-young

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ding-young
this is lgtm
If the test hits a cap lets debug_assert! it or output a warning to indicate something is wrong with the test

@ding-young ding-young force-pushed the test-cancel-spill branch 2 times, most recently from 90af070 to 6c93a0f Compare August 1, 2025 03:57
Comment on lines +297 to +299
// This should be unreachable since the stream is canceled
unreachable!("Expected the stream to be canceled, but it continued polling");
});
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated (Clippy told me to use unreachable!), thanks @comphead

@alamb alamb merged commit 9d6f923 into apache:main Aug 1, 2025
27 checks passed
@alamb
Copy link
Contributor

alamb commented Aug 1, 2025

Thanks again @ding-young and @comphead

Standing-Man pushed a commit to Standing-Man/datafusion that referenced this pull request Aug 4, 2025
…e#16616)

* add a test for mocking spawn_bufferd behavior

* Clean up

* add comment
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add tests for yielding / cancelling in SpillManager

4 participants