4 changes: 4 additions & 0 deletions native/core/src/execution/operators/scan.rs
@@ -80,6 +80,8 @@ pub struct ScanExec {
jvm_fetch_time: Time,
/// Time spent in FFI
arrow_ffi_time: Time,
/// Does the source of this scan re-use buffers?
pub has_buffer_reuse: bool,
}

impl ScanExec {
@@ -88,6 +90,7 @@ impl ScanExec {
input_source: Option<Arc<GlobalRef>>,
input_source_description: &str,
data_types: Vec<DataType>,
has_buffer_reuse: bool,
) -> Result<Self, CometError> {
let metrics_set = ExecutionPlanMetricsSet::default();
let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
@@ -138,6 +141,7 @@ impl ScanExec {
jvm_fetch_time,
arrow_ffi_time,
schema,
has_buffer_reuse,
})
}

21 changes: 18 additions & 3 deletions native/core/src/execution/planner.rs
@@ -1216,8 +1216,13 @@ impl PhysicalPlanner {
};

// The `ScanExec` operator will take actual arrays from Spark during execution
let scan =
ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?;
let scan = ScanExec::new(
self.exec_context_id,
input_source,
&scan.source,
data_types,
scan.has_buffer_reuse,
)?;
Ok((
vec![scan.clone()],
Arc::new(SparkPlan::new(spark_plan.plan_id, Arc::new(scan), vec![])),
@@ -2341,8 +2346,13 @@ impl From<ExpressionError> for DataFusionError {
fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
if op.as_any().is::<ProjectionExec>() || op.as_any().is::<LocalLimitExec>() {
can_reuse_input_batch(op.children()[0])
} else if op.as_any().is::<ScanExec>() {
op.as_any()
.downcast_ref::<ScanExec>()
.unwrap()
.has_buffer_reuse
} else {
op.as_any().is::<ScanExec>()
false
}
}
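The updated `can_reuse_input_batch` only reports buffer reuse when the leaf `ScanExec` says its input source reuses buffers; pass-through operators such as `ProjectionExec` and `LocalLimitExec` still defer to their child. Below is a minimal, self-contained sketch of the same downcast pattern, using hypothetical `Op`/`Scan`/`Projection` types rather than Comet's or DataFusion's real ones, and an `if let` downcast in place of the `is::<ScanExec>()` check plus `downcast_ref().unwrap()` in the patch:

```rust
// Minimal sketch of the downcast pattern (hypothetical Op/Scan/Projection
// types, not Comet's or DataFusion's real ones).
use std::any::Any;
use std::sync::Arc;

trait Op {
    fn as_any(&self) -> &dyn Any;
    fn children(&self) -> Vec<&Arc<dyn Op>>;
}

struct Scan {
    has_buffer_reuse: bool,
}

struct Projection {
    child: Arc<dyn Op>,
}

impl Op for Scan {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn children(&self) -> Vec<&Arc<dyn Op>> {
        vec![]
    }
}

impl Op for Projection {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn children(&self) -> Vec<&Arc<dyn Op>> {
        vec![&self.child]
    }
}

// Report buffer reuse only when the leaf scan's source reuses buffers;
// pass-through operators defer to their child.
fn can_reuse_input_batch(op: &Arc<dyn Op>) -> bool {
    if op.as_any().is::<Projection>() {
        can_reuse_input_batch(op.children()[0])
    } else if let Some(scan) = op.as_any().downcast_ref::<Scan>() {
        scan.has_buffer_reuse
    } else {
        false
    }
}

fn main() {
    let reusing: Arc<dyn Op> = Arc::new(Projection {
        child: Arc::new(Scan { has_buffer_reuse: true }),
    });
    let owning: Arc<dyn Op> = Arc::new(Scan { has_buffer_reuse: false });
    assert!(can_reuse_input_batch(&reusing));
    assert!(!can_reuse_input_batch(&owning));
}
```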

@@ -2594,6 +2604,7 @@ mod tests {
type_info: None,
}],
source: "".to_string(),
has_buffer_reuse: false,
})),
};

@@ -2667,6 +2678,7 @@ mod tests {
type_info: None,
}],
source: "".to_string(),
has_buffer_reuse: false,
})),
};

@@ -2877,6 +2889,7 @@ mod tests {
op_struct: Some(OpStruct::Scan(spark_operator::Scan {
fields: vec![create_proto_datatype()],
source: "".to_string(),
has_buffer_reuse: false,
})),
}
}
@@ -2919,6 +2932,7 @@ mod tests {
},
],
source: "".to_string(),
has_buffer_reuse: false,
})),
};

@@ -3039,6 +3053,7 @@ mod tests {
},
],
source: "".to_string(),
has_buffer_reuse: false,
})),
};

2 changes: 2 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -76,6 +76,8 @@ message Scan {
// is purely for informational purposes when viewing native query plans in
// debug mode.
string source = 2;
// Specifies whether the source of the scan reuses buffers
bool has_buffer_reuse = 3;
}

message NativeScan {
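Because field 3 is a proto3 scalar, a serialized plan that never sets it decodes with the default value, so older plans behave as `has_buffer_reuse = false` on the native side. A standalone sketch of that behavior using prost directly, with throwaway `ScanV1`/`ScanV2` types rather than the generated `spark_operator` code:

```rust
// Hypothetical before/after message shapes; not the generated spark_operator types.
use prost::Message;

#[derive(Clone, PartialEq, Message)]
struct ScanV1 {
    #[prost(string, tag = "2")]
    source: String,
}

#[derive(Clone, PartialEq, Message)]
struct ScanV2 {
    #[prost(string, tag = "2")]
    source: String,
    #[prost(bool, tag = "3")]
    has_buffer_reuse: bool,
}

fn main() {
    // A plan serialized by a writer that knows nothing about field 3...
    let old = ScanV1 { source: "scan".to_string() };
    let bytes = old.encode_to_vec();

    // ...decodes on the newer side with the proto3 default for the missing bool.
    let new = ScanV2::decode(bytes.as_slice()).unwrap();
    assert_eq!(new.source, "scan");
    assert!(!new.has_buffer_reuse);
}
```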
@@ -2830,6 +2830,12 @@ object QueryPlanSerde extends Logging with CometExprShim {
scanBuilder.setSource(source)
}

op match {
case scan: CometScanExec =>
scanBuilder.setHasBufferReuse(scan.scanImpl == CometConf.SCAN_NATIVE_COMET)
case _ =>
}

val scanTypes = op.output.flatten { attr =>
serializeDataType(attr.dataType)
}
@@ -145,6 +145,9 @@ class CometNativeShuffleWriter[K, V](
serializeDataType(attr.dataType)
}

// TODO only set this if input is native_comet scan
scanBuilder.setHasBufferReuse(true)

if (scanTypes.length == outputAttributes.length) {
scanBuilder.addAllFields(scanTypes.asJava)
