Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions datafusion/physical-plan/src/async_func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ impl AsyncFuncExec {
input.boundedness(),
))
}

pub fn async_exprs(&self) -> &[Arc<AsyncFuncExpr>] {
&self.async_exprs
}

pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}
}

impl DisplayAs for AsyncFuncExec {
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ serde = { version = "1.0", optional = true }
serde_json = { workspace = true, optional = true }

[dev-dependencies]
async-trait = { workspace = true }
datafusion = { workspace = true, default-features = false, features = [
"sql",
"datetime_expressions",
Expand Down
7 changes: 7 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,7 @@ message PhysicalPlanNode {
GenerateSeriesNode generate_series = 33;
SortMergeJoinExecNode sort_merge_join = 34;
MemoryScanExecNode memory_scan = 35;
AsyncFuncExecNode async_func = 36;
}
}

Expand Down Expand Up @@ -1393,3 +1394,9 @@ message SortMergeJoinExecNode {
repeated SortExprNode sort_options = 6;
datafusion_common.NullEquality null_equality = 7;
}

message AsyncFuncExecNode {
PhysicalPlanNode input = 1;
repeated PhysicalExprNode async_exprs = 2;
repeated string async_expr_names = 3;
}
141 changes: 141 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 78 additions & 1 deletion datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec};
use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr};

use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr;
use datafusion_physical_plan::async_func::AsyncFuncExec;
use prost::bytes::BufMut;
use prost::Message;

Expand Down Expand Up @@ -251,6 +253,9 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
PhysicalPlanType::SortMergeJoin(sort_join) => {
self.try_into_sort_join(sort_join, ctx, extension_codec)
}
PhysicalPlanType::AsyncFunc(async_func) => {
self.try_into_async_func_physical_plan(async_func, ctx, extension_codec)
}
}
}

Expand Down Expand Up @@ -462,6 +467,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
}
}

if let Some(exec) = plan.downcast_ref::<AsyncFuncExec>() {
return protobuf::PhysicalPlanNode::try_from_async_func_exec(
exec,
extension_codec,
);
}

let mut buf: Vec<u8> = vec![];
match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
Ok(_) => {
Expand Down Expand Up @@ -1972,6 +1984,44 @@ impl protobuf::PhysicalPlanNode {
Ok(Arc::new(CooperativeExec::new(input)))
}

fn try_into_async_func_physical_plan(
&self,
async_func: &protobuf::AsyncFuncExecNode,
ctx: &TaskContext,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
let input: Arc<dyn ExecutionPlan> =
into_physical_plan(&async_func.input, ctx, extension_codec)?;

if async_func.async_exprs.len() != async_func.async_expr_names.len() {
return internal_err!(
"AsyncFuncExecNode async_exprs length does not match async_expr_names"
);
}

let async_exprs = async_func
.async_exprs
.iter()
.zip(async_func.async_expr_names.iter())
.map(|(expr, name)| {
let physical_expr = parse_physical_expr(
expr,
ctx,
input.schema().as_ref(),
extension_codec,
)?;

Ok(Arc::new(AsyncFuncExpr::try_new(
name.clone(),
physical_expr,
input.schema().as_ref(),
)?))
})
.collect::<Result<Vec<_>>>()?;

Ok(Arc::new(AsyncFuncExec::try_new(async_exprs, input)?))
}

fn try_from_explain_exec(
exec: &ExplainExec,
_extension_codec: &dyn PhysicalExtensionCodec,
Expand Down Expand Up @@ -3222,6 +3272,34 @@ impl protobuf::PhysicalPlanNode {

Ok(None)
}

fn try_from_async_func_exec(
exec: &AsyncFuncExec,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Self> {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
Arc::clone(exec.input()),
extension_codec,
)?;

let mut async_exprs = vec![];
let mut async_expr_names = vec![];

for async_expr in exec.async_exprs() {
async_exprs.push(serialize_physical_expr(&async_expr.func, extension_codec)?);
async_expr_names.push(async_expr.name.clone())
}

Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::AsyncFunc(Box::new(
protobuf::AsyncFuncExecNode {
input: Some(Box::new(input)),
async_exprs,
async_expr_names,
},
))),
})
}
}

pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
Expand Down Expand Up @@ -3439,7 +3517,6 @@ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
fn into_physical_plan(
node: &Option<Box<protobuf::PhysicalPlanNode>>,
ctx: &TaskContext,

extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(field) = node {
Expand Down
Loading