From 0ff728909457cb52a0e54b9b647f6449404c044a Mon Sep 17 00:00:00 2001 From: Tushar7012 Date: Tue, 3 Feb 2026 02:41:21 +0530 Subject: [PATCH] Fix Issue #20031: Stop AsyncMapper from recursing into async ScalarUDF arguments This commit addresses an issue where a query containing nested async ScalarUDF calls, such as test_async_udf(test_async_udf(prompt)), failed with "Internal error: async functions should not be called directly". AsyncMapper now treats an async UDF as a traversal boundary: after handling an async UDF expression it returns TreeNodeRecursion::Jump, which skips that expression's arguments while still visiting sibling expressions. Changes: - Return TreeNodeRecursion::Jump in async_func.rs once an async UDF expression has been handled - Added a regression test for nested async UDF calls Closes #20031 --- .../user_defined_async_scalar_functions.rs | 33 +++++++++++++++++++ datafusion/physical-plan/src/async_func.rs | 3 ++ 2 files changed, 36 insertions(+) diff --git a/datafusion/core/tests/user_defined/user_defined_async_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_async_scalar_functions.rs index 31af4445ace08..2688bedbc84e7 100644 --- a/datafusion/core/tests/user_defined/user_defined_async_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_async_scalar_functions.rs @@ -113,6 +113,39 @@ async fn test_async_udf_metrics() -> Result<()> { Ok(()) } +// Regression test for https://github.com/apache/datafusion/issues/20031 +// This test verifies that nested async UDF calls work correctly. +#[tokio::test] +async fn test_nested_async_udf() -> Result<()> { + let ctx = register_table_and_udf()?; + + // This query previously failed with: + // "Internal error: async functions should not be called directly" + let df = ctx + .sql("SELECT id, test_async_udf(test_async_udf(prompt)) as result FROM test_table") + .await?; + + let result = df.collect().await?; + + // The nested call test_async_udf(test_async_udf(prompt)) should: + // 1. First evaluate inner: test_async_udf(prompt) -> prompt (identity function) + // 2. 
Then evaluate outer: test_async_udf(prompt) -> prompt + assert_batches_eq!( + &[ + "+----+---------+", + "| id | result |", + "+----+---------+", + "| 0 | prompt0 |", + "| 1 | prompt1 |", + "| 2 | prompt2 |", + "+----+---------+" + ], + &result + ); + + Ok(()) +} + #[derive(Debug, PartialEq, Eq, Hash, Clone)] struct TestAsyncUDFImpl { batch_size: usize, diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs index a61fd95949d1a..364673a8b8c7f 100644 --- a/datafusion/physical-plan/src/async_func.rs +++ b/datafusion/physical-plan/src/async_func.rs @@ -332,6 +332,9 @@ impl AsyncMapper { Arc::clone(expr), schema, )?)); + // Async UDFs are traversal boundaries; do not recurse into their arguments. + // Use Jump (not Stop) to continue visiting sibling expressions. + return Ok(TreeNodeRecursion::Jump); } Ok(TreeNodeRecursion::Continue) })?;