
Conversation

@erratic-pattern
Contributor

@erratic-pattern erratic-pattern commented Dec 16, 2025

Which issue does this PR close?

Rationale for this change

Join operators don't yield to the tokio runtime, so cancellation requests are never observed and long-running joins cannot be cancelled.

What changes are included in this PR?

Wrap join streams with cooperative() in the execute() method of each affected operator (HashJoinExec, SortMergeJoinExec, NestedLoopJoinExec, and CrossJoinExec), as sketched below.

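For context, a rough sketch of what a cooperative() stream wrapper does (not the exact implementation in DataFusion's coop module; this just illustrates the idea using tokio's public task::coop API, which this PR also relies on):

use std::pin::Pin;
use std::task::{ready, Context, Poll};
use futures::Stream;

/// Sketch of a cooperative stream wrapper: consume one unit of tokio task
/// budget per successful poll, so that a long run of Ready polls eventually
/// returns Pending and yields back to the runtime, where cancellation is seen.
struct Cooperative<S> {
    inner: S,
}

impl<S: Stream + Unpin> Stream for Cooperative<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        // Returns Pending (and schedules a wake-up) once the task budget is depleted.
        let coop = ready!(tokio::task::coop::poll_proceed(cx));
        let result = Pin::new(&mut self.inner).poll_next(cx);
        if result.is_ready() {
            // Only charge the budget when the inner stream actually made progress.
            coop.made_progress();
        }
        result
    }
}
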
Are these changes tested?

I made a standalone reproducer with tests, but they are too flaky to include in the DataFusion test suite.

I'm unsure how to test this reliably in CI without introducing flakiness / timing issues.

Are there any user-facing changes?

No API changes.

Contributor

@alamb alamb left a comment


Thanks @erratic-pattern -- this makes sense to me

cc @pepijnve (our cancellation expert) and @comphead (as our join expert)

I think we should add tests for this code to avoid potential regressions.

Perhaps you could adapt the tests from https://github.com/erratic-pattern/datafusion-join-cancellation-repro/blob/main/src/main.rs into https://github.com/alamb/datafusion/blob/775277ae3cef7a97f0ca4868b4ccdfc03e9507ee/datafusion/core/tests/execution/coop.rs#L67

I did verify locally that the tests in https://github.com/erratic-pattern/datafusion-join-cancellation-repro/blob/main/src/main.rs fail on main and pass with this PR:

Details
---- tests::test_nested_loop_join stdout ----
Testing NestedLoopJoinExec cancellation...

thread 'tests::test_nested_loop_join' (28125262) panicked at src/main.rs:72:17:
FAILED: NestedLoopJoinExec did not respond to cancellation within 5.014958s
The join operator is not yielding to the tokio runtime.

---- tests::test_sort_merge_join stdout ----
Testing SortMergeJoinExec cancellation...

thread 'tests::test_sort_merge_join' (28125263) panicked at src/main.rs:72:17:
FAILED: SortMergeJoinExec did not respond to cancellation within 5.019358375s
The join operator is not yielding to the tokio runtime.

---- tests::test_cross_join stdout ----
Testing CrossJoinExec cancellation...

thread 'tests::test_cross_join' (28125260) panicked at src/main.rs:72:17:
FAILED: CrossJoinExec did not respond to cancellation within 5.005322833s
The join operator is not yielding to the tokio runtime.
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

---- tests::test_hash_join stdout ----
Testing HashJoinExec cancellation...

thread 'tests::test_hash_join' (28125261) panicked at src/main.rs:72:17:
FAILED: HashJoinExec did not respond to cancellation within 5.009637375s
The join operator is not yielding to the tokio runtime.

@pepijnve
Contributor

I had a quick look at the test case. It seems like it should be triggering yielding already. I’ll take a closer look as soon as I can.

@pepijnve
Contributor

@alamb for me, two of the four test cases pass on main. Hash and sort merge consistently work; cross and nested loop fail.

@pepijnve
Contributor

pepijnve commented Dec 16, 2025

For cross join specifically, as far as I can tell we're getting stuck looping in the BuildBatches state. I can't really explain why the proposed fix would make that loop exit, since the coop wrapper is added around the inner build batches loop. The task budget should only get decremented once, and then the task would still be stuck in the same loop. A more precise fix for this problem would be to decrement the tokio task budget explicitly on each loop iteration.

--- a/datafusion/physical-plan/src/joins/cross_join.rs	(revision 2bea7968977fd9e2f78c766ca553a45068c15048)
+++ b/datafusion/physical-plan/src/joins/cross_join.rs	(date 1765923164223)
@@ -581,6 +581,7 @@
                     handle_state!(ready!(self.fetch_probe_batch(cx)))
                 }
                 CrossJoinStreamState::BuildBatches(_) => {
+                    ready!(tokio::task::coop::poll_proceed(cx)).made_progress();
                     let poll = handle_state!(self.build_batches());
                     self.join_metrics.baseline.record_poll(poll)
                 }

Edit: I was mistaken, the cross join is actually returning batches sufficiently frequently. It's actually the aggregation loop that's not yielding. I'm reminded of the original discussion we had on this topic in #16196. Whose responsibility is it to yield sufficiently frequently? Back then I argued that it's the looping code (i.e. the aggregation loop) that's responsible for this because that's where the root cause of the problem lies. We couldn't reach consensus on that and in the end only put budget consumption (and yielding) in the leaf nodes.
What you're seeing here is that decision coming back to bite us. In this particular test setup the cross join stream isn't pulling data sufficiently fast from the budget consuming streams. As a consequence it can take a long time to deplete the task budget and yield.
As before, there are two ways to go about fixing this. Either we inject more budget consumption at the batch production side (as this PR is doing), or we add it at the looping batch consumption side (i.e. in the aggregation logic).

@comphead
Contributor

I checked the PR in datafusion-cli with TPC-DS q99, which takes some time to execute and has 3-4 joins.

After starting the query I can see CPU usage in top -pid $(pgrep datafusion-cli), but after Ctrl+C usage drops to 0, so the query was likely cancelled and released its resources. On main I see the same behavior, but resources are released perhaps a couple of seconds later.

@pepijnve
Contributor

I've experimented a bit with a variant of what's suggested in this PR in https://github.com/pepijnve/datafusion/tree/coop_joins. The idea there, in the cross join implementation, is that you want to consume a unit of task budget in each loop iteration. For the fetch states the assumption is that the polled stream will take care of that. In the build batches state there are no Tokio resources being accessed, directly or indirectly, so we need to do so ourselves. I added a non-async variant of Tokio's async consume_budget() that's more convenient to use in manual stream implementations for this.


 // create join stream
-Ok(Box::pin(SortMergeJoinStream::try_new(
+Ok(make_cooperative(Box::pin(SortMergeJoinStream::try_new(

In this kind of situation, you can use

Box::pin(cooperative(SortMergeJoinStream::try_new...

instead. This avoids one level of pointer indirection.
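
Roughly, the difference between the two forms (a sketch; the exact types in the coop module may differ):

// make_cooperative takes an already boxed stream and boxes the wrapper again,
// roughly Pin<Box<Cooperative<SendableRecordBatchStream>>>:
Ok(make_cooperative(Box::pin(SortMergeJoinStream::try_new(/* ... */)?)))

// cooperative wraps the concrete stream before boxing, roughly
// Pin<Box<Cooperative<SortMergeJoinStream>>>, one box fewer to go through per poll:
Ok(Box::pin(cooperative(SortMergeJoinStream::try_new(/* ... */)?)))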

@erratic-pattern
Contributor Author

If you are seeing the repro tests pass, you may need to adjust the reproducer for your local environment due to timing issues.

Some things I found that need to be adjusted if the query completes too quickly:

  • If the query finishes before the initial 250ms sleep, the hang is never exercised. This initial sleep is needed to ensure the task enters the JoinExec stream before handle.abort() is called. A shorter initial sleep fixes this.
  • If the query finishes before the 5 second timeout, the repro tests pass despite not cancelling properly: the query completes normally within the 5 second window while ignoring the cancellation. The repro probably needs a better way to indicate this, e.g. by checking whether the query succeeded. To fix it, bump the number of rows OR reduce the timeout so that query execution takes longer than the timeout (the timing the repro relies on is sketched below).
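
For reference, the overall shape of the timing the repro relies on, as a simplified sketch (names and durations are illustrative, not the actual repro code):

use std::time::Duration;

async fn assert_join_cancellable() {
    let handle = tokio::spawn(async {
        // run the long join query here; it must take longer than the 5s
        // timeout below, otherwise the test passes without ever exercising
        // cancellation
    });

    // Give the task time to enter the join stream; if the query finishes
    // before this sleep, abort() has nothing left to cancel.
    tokio::time::sleep(Duration::from_millis(250)).await;
    handle.abort();

    // If the operator yields cooperatively the abort is observed quickly;
    // if it never yields, this times out.
    let result = tokio::time::timeout(Duration::from_secs(5), handle).await;
    assert!(result.is_ok(), "join did not respond to cancellation within 5s");
}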

@erratic-pattern erratic-pattern force-pushed the fix-join-cooperative-scheduling branch from 35f765d to 0a99e70 on December 17, 2025 16:18
@alamb
Contributor

alamb commented Dec 17, 2025

Edit: I was mistaken, the cross join is actually returning batches sufficiently frequently. It's actually the aggregation loop that's not yielding. I'm reminded of the original discussion we had on this topic in #16196. Whose responsibility is it to yield sufficiently frequently? Back then I argued that it's the looping code (i.e. the aggregation loop) that's responsible for this because that's where the root cause of the problem lies.

My memory was that the general consensus was that the operators themselves (i.e. in this case the aggregation loop) should be yielding periodically to the scheduler, and that the cooperative() method was a workaround that gave us some more time to figure out how to do this (and tokio's budget wasn't yet public).

I've experimented a bit with a variant of what's suggested in this PR in https://github.com/pepijnve/datafusion/tree/coop_joins.

I really like this method from pepijnve@aeecad3

/// Consumes a unit of budget. If the task budget has been depleted `Poll::Pending` is returned.
/// Otherwise, `Poll::Ready(())` is returned.
///
/// The task will only yield if its entire coop budget has been exhausted.
/// This function can be used in order to insert optional yield points into long
/// computations that do not use Tokio resources,
/// without redundantly yielding to the runtime each time.
pub fn consume_budget(cx: &mut Context<'_>) -> Poll<()> {
    tokio::task::coop::poll_proceed(cx).map(|r| r.made_progress())
}

@pepijnve would your proposal be to add the appropriate consume_budget calls in group by hash instead?

@pepijnve
Contributor

@pepijnve would your proposal be to add the appropriate consume_budget calls in group by hash instead?

That's a tricky question because I don't think there's a clear, 'best' choice.

A very quick recap first for the people who weren't involved in this work. Here's a quick and dirty adaptation of an example from the Tokio docs that resembles the type of code you typically see in DataFusion:

fn drop_all(mut input: Pin<&mut (dyn RecordBatchStream + Send)>, cx: &mut Context<'_>) -> Poll<()> {
    while let Some(_) = ready!(input.poll_next_unpin(cx)) {}
    Poll::Ready(())
}

In Tokio, a simple loop like that can be problematic. If every input.poll_next_unpin() invocation returns Poll::Ready, then the while loop will keep on going until the stream is completely consumed. The effect you see then is that the task that's calling drop_all is not cancelable.

The solution to this is to ensure you periodically "yield to the runtime". The way to do that is to ensure drop_all breaks out of the loop every now and then by returning Poll::Pending.

So in this example, what's the appropriate fix to achieve that? There are really only two options:

  1. ensure every possible input value returns Poll::Pending periodically
  2. adapt the drop_all function so that it returns Poll::Pending periodically itself

If we map this onto one of the problematic queries from the reproduction case

SELECT sum(t1.v + t2.v) FROM t1, t2

the loop here is introduced by the sum aggregation. If you remove it from the query, then the query cancels just fine. The fix we're discussing so far in this PR is option 1 described above. The alternative of changing the aggregation operator (and possibly other stream draining operators) would be option 2.
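
As a sketch (same imports as the example above, and the same poll_proceed helper as the diff earlier in this thread), option 2 applied to drop_all could look roughly like this:

fn drop_all(mut input: Pin<&mut (dyn RecordBatchStream + Send)>, cx: &mut Context<'_>) -> Poll<()> {
    loop {
        // Consume one unit of task budget per iteration; once the budget is
        // depleted this returns Pending and the loop yields to the runtime.
        let coop = ready!(tokio::task::coop::poll_proceed(cx));
        match ready!(input.poll_next_unpin(cx)) {
            Some(_) => coop.made_progress(),
            None => return Poll::Ready(()),
        }
    }
}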

The downside of both options is that if we sprinkle task budget consumption around the codebase, tasks will yield more frequently, which may introduce performance regressions. The question then is which approach leads to the least amount of redundant yielding.

@alamb
Contributor

alamb commented Jan 12, 2026

I am just coming back to this issue, as it is affecting some of our customers (who issue some big query, often by accident, and then the fact that the queries take several seconds to cancel ties up a non-trivial amount of their resources). We have deployed this patch into production and will report back on how it works.

The full issue is explained quite well in this blog (if I immodestly say so myself): https://datafusion.apache.org/blog/2025/06/30/cancellation/

So in this example, what's the appropriate fix to achieve that? There are really only two options:

in my mind, the two options map to

  1. HAVE EACH OPERATOR CHECK AT OUTPUT ensure every possible input value returns Poll::Pending periodically
  2. HAVE EACH OPERATOR CHECK AT INPUT adapt the drop_all function so that it returns Poll::Pending periodically itself

I think this PR effectively starts us down the path of 1 (checking at the output of each operator)

The more I think about this, the more I think I understand that @pepijnve is suggesting that we should add the yield checking to the aggregation (and other stream-draining operators)

The alternative of changing the aggregation operator (and possibly other stream draining operators) would be option 2.

I will look into this idea (checking the budget at input for operators) and see what it might look like, as well as trying to make the cancellation suggestions in

/// To enable timely cancellation, the [`Stream`] that is returned must not
/// block the CPU indefinitely and must yield back to the tokio runtime regularly.
/// In a typical [`ExecutionPlan`], this automatically happens unless there are
/// special circumstances; e.g. when the computational complexity of processing a
/// batch is superlinear. See this [general guideline][async-guideline] for more context
/// on this point, which explains why one should avoid spending a long time without
/// reaching an `await`/yield point in asynchronous runtimes.
/// This can be achieved by using the utilities from the [`coop`](crate::coop) module, by
/// manually returning [`Poll::Pending`] and setting up wakers appropriately, or by calling
/// [`tokio::task::yield_now()`] when appropriate.
/// In special cases that warrant manual yielding, determination for "regularly" may be
/// made using the [Tokio task budget](https://docs.rs/tokio/latest/tokio/task/coop/index.html),
/// a timer (being careful with the overhead-heavy system call needed to take the time), or by
/// counting rows or batches.
more specific

As an aside, I am going to talk about our use of Tokio in DataFusion at the Tokio conference in April, and this is definitely one of the items I will discuss

@pepijnve
Contributor

pepijnve commented Jan 12, 2026

I will look into this idea (checking the budget at input for operators) and see what it might look like, as well as trying to make the cancellation suggestions in

It's stale at this point, but #16301 might still be of some use. One idea I remember playing with was to have a ConsumptionType counterpart to EmissionType that could indicate that an operator eagerly drains a particular input stream or only streams from it. That might allow more automatic coop injection. I don't remember if I actually tried that or not.

edit: one thing that came back to me was that it wasn't clear what the consumption type of Filter should be. It's not really 'draining', but it's not entirely streaming either. But a highly selective predicate can result in long running loops as well.
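
For illustration, the marker I had in mind might look something like this (entirely hypothetical, not an actual DataFusion API); Filter is exactly the kind of operator that doesn't fit cleanly in either variant:

/// Hypothetical counterpart to EmissionType describing how an operator
/// consumes an input; not part of DataFusion today.
enum ConsumptionType {
    /// Pulls batches roughly in step with what it emits (e.g. a projection);
    /// budget consumption in the leaf streams is enough.
    Streaming,
    /// Drains an input for long stretches before emitting (e.g. a full
    /// aggregation or a join build side); the draining loop should consume
    /// task budget itself.
    Eager,
}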

@alamb
Contributor

alamb commented Jan 12, 2026

Another thing we can do to reduce the overhead of calling consume_budget might be to have a function / object that only calls it once every N (maybe 10?) batches 🤔
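
For example, something like this hypothetical helper (name and threshold made up):

use std::task::{Context, Poll};

/// Hypothetical helper: only touches the tokio task budget once every N calls,
/// amortizing the cost of poll_proceed over a run of otherwise-free polls.
struct BudgetEveryN {
    n: u32,
    count: u32,
}

impl BudgetEveryN {
    fn new(n: u32) -> Self {
        Self { n, count: 0 }
    }

    /// Returns Pending (and yields) only when the counter wraps AND the task
    /// budget is exhausted; otherwise it is just a cheap counter increment.
    fn consume(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        self.count += 1;
        if self.count < self.n {
            return Poll::Ready(());
        }
        self.count = 0;
        tokio::task::coop::poll_proceed(cx).map(|r| r.made_progress())
    }
}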


Labels

physical-plan Changes to the physical-plan crate


Development

Successfully merging this pull request may close these issues.

Join operators don't yield to tokio runtime, preventing query cancellation
