replace per-node wrapper classes with generic TimedExec#57
replace per-node wrapper classes with generic TimedExec#57
Conversation
Replace 19 per-type DataFlint*Exec subclasses (spark3 + spark4) with a
single TimedExec wrapper that adds a `duration` metric to any SparkPlan
while preserving all child metrics automatically.
- TimedExec.children = child.children (transparent wrapper — one node in
Spark plan graph, no double-node in native SQL UI or DataFlint UI)
- nodeName = "DataFlint" + child.nodeName
- Instrumentation enabled globally or per node type via existing config flags
- Exchange nodes never wrapped
- Version-specific classes (PythonMapInArrowExec, ArrowWindowPythonExec)
matched by simple class name string — no NoClassDefFoundError on older
Spark versions
- TimedExec and MetricsUtils moved to plugin/ (shared by spark3 + spark4)
…kage removed unused packages
There was a problem hiding this comment.
Pull request overview
This PR consolidates Spark plan instrumentation by replacing many per-node DataFlint*Exec subclasses with a single generic TimedExec wrapper that adds a duration metric while preserving existing child metrics, and updates the UI + tests to recognize the new wrapped node names.
Changes:
- Replace per-node instrumented exec implementations (Spark 3 + Spark 4) with a generic
TimedExecwrapper applied byDataFlintInstrumentationColumnarRule. - Update Spark UI reducers/metric processors to recognize the new
DataFlintPythonMapInArrownode name and treat it as instrumented. - Add a Spark 3.x compatibility helper for
AppStatusStore.stageListsignature drift and update pages to use it.
Reviewed changes
Copilot reviewed 33 out of 34 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| spark-ui/src/reducers/SqlReducerUtils.ts | Add node type + display name mapping for DataFlintPythonMapInArrow. |
| spark-ui/src/reducers/SqlReducer.ts | Recognize DataFlintPythonMapInArrow in plan parsing and python-eval node sets. |
| spark-ui/src/reducers/SQLNodeStageReducer.ts | Include DataFlintPythonMapInArrow in stage attribution and instrumented-duration handling. |
| spark-ui/src/components/SqlFlow/MetricProcessors.tsx | Treat DataFlintPythonMapInArrow as instrumented for duration display logic. |
| spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala | Remove Spark 4-specific DataFlintWindowExec implementation. |
| spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec_4_0.scala | Remove Spark 4.0-specific DataFlintWindowInPandasExec implementation. |
| spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintMapInPandasExec.scala | Remove Spark 4-specific DataFlintMapInPandasExec implementation. |
| spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintMapInArrowExec.scala | Remove Spark 4-specific DataFlintMapInArrowExec implementation. |
| spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintFlatMapGroupsInPandasExec.scala | Remove Spark 4-specific DataFlintFlatMapGroupsInPandasExec implementation. |
| spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintFlatMapCoGroupsInPandasExec.scala | Remove Spark 4-specific DataFlintFlatMapCoGroupsInPandasExec implementation. |
| spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintBatchEvalPythonExec.scala | Remove Spark 4-specific DataFlintBatchEvalPythonExec implementation. |
| spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowWindowPythonExec_4_1.scala | Remove Spark 4.1 reflection-based Arrow window wrapper. |
| spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowEvalPythonExec.scala | Remove Spark 4-specific DataFlintArrowEvalPythonExec implementation. |
| spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala | Switch to wrapping by simple class name + TimedExec; exclude Exchange. |
| spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala | Update tests to assert TimedExec(WindowExec) in executed plans and validate duration metric. |
| spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintPythonExecSpec.scala | Update tests to assert wrapping (TimedExec) instead of replacement subclasses. |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala | Remove Spark 3-specific DataFlintWindowExec implementation. |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec.scala | Remove Spark 3-specific DataFlintWindowInPandasExec implementation. |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintPythonMapInArrowExec_3_5.scala | Remove Spark 3.5-specific Python MapInArrow wrapper. |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintMapInPandasExec_3_5.scala | Remove Spark 3.5-specific MapInPandas wrapper. |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintMapInPandasExec_3_0.scala | Remove Spark 3.0–3.4 reflection-based MapInPandas wrapper. |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintMapInArrowExec_3_3.scala | Remove Spark 3.3–3.4 reflection-based MapInArrow wrapper. |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintFlatMapGroupsInPandasExec.scala | Remove Spark 3-specific FlatMapGroupsInPandas wrapper. |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintFlatMapCoGroupsInPandasExec.scala | Remove Spark 3-specific FlatMapCoGroupsInPandas wrapper. |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintBatchEvalPythonExec.scala | Remove Spark 3-specific BatchEvalPythonExec wrapper. |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowEvalPythonExec.scala | Remove Spark 3-specific ArrowEvalPythonExec wrapper. |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/api/api.scala | Add AppStatusStoreCompat to safely call stageList across Spark 3.x. |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLStagesRddPage.scala | Use AppStatusStoreCompat.stageList instead of direct ui.store.stageList(null). |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/api/DataflintCachedStoragePage.scala | Use AppStatusStoreCompat.stageList instead of direct ui.store.stageList(null). |
| spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala | Switch to wrapping by simple class name + TimedExec; exclude Exchange. |
| spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala | Introduce the generic timed wrapper used by Spark 3 + Spark 4 plugins. |
| spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/MetricsUtils.scala | Add shared timing-metric helper used by TimedExec. |
| spark-plugin/.gitignore | Ignore a local Python virtualenv directory. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala
Show resolved
Hide resolved
...uginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala
Outdated
Show resolved
Hide resolved
...uginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 36 out of 36 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| override protected def doExecute(): RDD[InternalRow] = | ||
| DataFlintRDDUtils.withDurationMetric(child.execute(), longMetric("duration")) |
There was a problem hiding this comment.
TimedExec always executes via the row path (child.execute() wrapped by withDurationMetric) and does not delegate supportsColumnar/doExecuteColumnar. This makes the wrapper non-transparent for columnar execution: it can force columnar-capable nodes back to rows (and may fail if a wrapped node is columnar-only). Consider delegating supportsColumnar to the child and adding a columnar execution wrapper (or explicitly constraining TimedExec to row-only nodes and enforcing that in the rule/docs).
| spark.sparkContext.setJobDescription("mapInPandas: compute discounted totals → DataFlintMapInPandasExec") | ||
| df_pandas.write \ |
There was a problem hiding this comment.
These job description strings still reference the old per-node *Exec wrapper class names (e.g. DataFlintMapInPandasExec). With the new TimedExec approach, the visible node name will be DataFlint + the child nodeName (e.g. DataFlintMapInPandas), and the wrapper class is TimedExec. Consider updating the descriptions to match the new plan node naming so users aren't misled when inspecting the UI.
Replace 19 per-type DataFlint*Exec subclasses (spark3 + spark4) with a
single TimedExec wrapper that adds a
durationmetric to any SparkPlanwhile preserving all child metrics automatically.