Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,39 @@ async fn test_async_udf_metrics() -> Result<()> {
Ok(())
}

// Regression test for https://github.com/apache/datafusion/issues/20031
// This test verifies that nested async UDF calls work correctly.
#[tokio::test]
async fn test_nested_async_udf() -> Result<()> {
let ctx = register_table_and_udf()?;

// This query previously failed with:
// "Internal error: async functions should not be called directly"
let df = ctx
.sql("SELECT id, test_async_udf(test_async_udf(prompt)) as result FROM test_table")
.await?;

let result = df.collect().await?;

// The nested call test_async_udf(test_async_udf(prompt)) should:
// 1. First evaluate inner: test_async_udf(prompt) -> prompt (identity function)
// 2. Then evaluate outer: test_async_udf(prompt) -> prompt
assert_batches_eq!(
&[
"+----+---------+",
"| id | result |",
"+----+---------+",
"| 0 | prompt0 |",
"| 1 | prompt1 |",
"| 2 | prompt2 |",
"+----+---------+"
],
&result
);

Ok(())
}

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct TestAsyncUDFImpl {
batch_size: usize,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/async_func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ impl AsyncMapper {
Arc::clone(expr),
schema,
)?));
// Async UDFs are traversal boundaries; do not recurse into their arguments.
// Use Jump (not Stop) to continue visiting sibling expressions.
return Ok(TreeNodeRecursion::Jump);
}
Ok(TreeNodeRecursion::Continue)
})?;
Expand Down
Loading