Skip to content

[DATAFLINT-4425] - duration instrumentation on all nodes#58

Draft
minskya wants to merge 14 commits intomainfrom
duration-all-nodes-more
Draft

[DATAFLINT-4425] - duration instrumentation on all nodes#58
minskya wants to merge 14 commits intomainfrom
duration-all-nodes-more

Conversation

@minskya
Copy link
Copy Markdown
Contributor

@minskya minskya commented Apr 5, 2026

Summary

Adds server-side duration attribution and topology-based stage grouping to the DataFlint Spark plugin. Duration computation and stage assignment move from the frontend (TypeScript) to the backend (Scala), operating on SparkPlanGraph + metric values from the status store — works for both live execution and history server.

  • GraphDurationAttribution computes stage groups from graph topology (Exchange boundaries) and per-node exclusive durations via pipelined subtraction
  • /dataflint/sqlplan and /dataflint/sqlmetrics endpoints enriched with stageGroups, nodeDurations, and exchangeDurations
  • Frontend simplified from ~650 lines to ~130 lines — reads pre-computed data from backend, zero guessing
  • Shared API case classes consolidated in DurationApi.scala (no more duplication between Spark 3/4 modules)
  • Tests: GraphStageGroupSpec (topology) + GraphDurationAttributionSpec (durations, exchanges, normalization)

Changed files

File Change
GraphDurationAttribution.scala NEW — stage grouping, duration attribution, normalization, exchange durations
DurationApi.scala NEW — shared case classes (ResolvedStageGroup, NodeDurationData, ExchangeDurationData, etc.)
PlanDurationAttribution.scala DELETED — replaced by GraphDurationAttribution
DataflintSQLPlanPage.scala (v3/v4) Enriched with stageGroups + nodeDurations for all SQLs
DataflintSQLMetricsPage.scala (v3/v4) Enriched with stageGroups + nodeDurations for running SQLs
api.scala (v3/v4) SqlEnrichedData includes stage groups + durations; duplicates moved to shared module
TimedExec.scala Codegen sleep config for test timing
DataFlintRDDUtils.scala Duration metric utilities
SQLNodeStageReducer.ts Rewritten — uses backend ResolvedStageGroup for stages, backend durations for display
PlanDurationAttribution.ts DELETED — replaced by backend
SqlReducer.ts Stores backend data, null-safe metric handling
AppStore.ts New types: ResolvedStageGroup, NodeDurationData, ExchangeBoundary
SqlReducerUtils.ts Added "duration" to input/output/join/sort metric allowlists
SparkApi.tsx Fetches SQLMetricsWithDuration, guards against empty responses
GraphStageGroupSpec.scala NEW — 4 tests: no-shuffle, single exchange, chained exchanges, unique node IDs
GraphDurationAttributionSpec.scala NEW — 4 tests: instrumented durations, exchange write/read, resolved groups, normalization

Attribution Model

Node with native exclusive metric (sortTime, aggTime):
  exclusive = metric value

Pipelined node (parent inclusive > max child inclusive):
  exclusive = inclusive - max_child_inclusive

Blocking node (Sort, HashAggregate, Python UDFs):
  exclusive = inclusive (independent timer)

Non-instrumented node inside codegen:
  exclusive = codegen pipelineTime (fallback)

Normalization:
  per-stage durations scaled to match executorRunTime

Stage Grouping

Topology-based (deterministic, no metrics needed):

  1. Walk graph from root, collect nodes between Exchange boundaries
  2. Each Exchange creates a new stage group
  3. Chained exchanges (Exchange → Exchange) handled via recursive drill-through
  4. Codegen cluster inner nodes included in their parent group
  5. Resolve plan-stage IDs to Spark stage IDs via metric extraction + elimination

Exchange Duration Split

Each exchange node provides:

  • writeDurationMs = shuffle write time (belongs to write stage)
  • readDurationMs = fetch wait time + remote durations (belongs to read stage)

minskya added 11 commits March 24, 2026 15:35
  Replace 19 per-type DataFlint*Exec subclasses (spark3 + spark4) with a
  single TimedExec wrapper that adds a `duration` metric to any SparkPlan
  while preserving all child metrics automatically.

  - TimedExec.children = child.children (transparent wrapper — one node in
    Spark plan graph, no double-node in native SQL UI or DataFlint UI)
  - nodeName = "DataFlint" + child.nodeName
  - Instrumentation enabled globally or per node type via existing config flags
  - Exchange nodes never wrapped
  - Version-specific classes (PythonMapInArrowExec, ArrowWindowPythonExec)
    matched by simple class name string — no NoClassDefFoundError on older
    Spark versions
  - TimedExec and MetricsUtils moved to plugin/ (shared by spark3 + spark4)
@notion-workspace
Copy link
Copy Markdown

minskya and others added 3 commits April 6, 2026 14:20
joins codeGen instrumentation doesnt work - cancelling codegen for these.
Move duration computation and stage assignment from the frontend (TypeScript)
to the backend (Scala), using SparkPlanGraph + metric values from the status
store. This works for both live execution and history server.

Backend (GraphDurationAttribution):
- Compute stage groups from graph topology (Exchange boundaries) — deterministic,
  no Spark stage data needed
- Resolve plan-stage IDs to actual Spark stage IDs using metric-based extraction
  with process-of-elimination fallback
- Compute per-node exclusive durations via pipelined subtraction model:
  - Native exclusive metrics (sortTime, aggTime, buildTime) used directly
  - Python UDF nodes (WindowInPandas, BatchEvalPython, etc.) treated as blocking
    (independent timers)
  - findTimedChild returns max timing among descendants (not first found)
  - Codegen pipelineTime fallback for non-instrumented nodes inside codegen
  - Auto-detection: attribution mode when instrumented, classic mode otherwise
- Normalize per-stage durations to match executorRunTime
- Exchange write/read durations from shuffle metrics
- Stage metadata (status, numTasks, executorRunTime) included in response
- Shared API case classes moved to plugin module (DurationApi.scala)

Frontend (simplified SQLNodeStageReducer):
- Reduced from ~650 lines to ~130 lines
- Uses backend-provided ResolvedStageGroup for stage assignment (no guessing)
- Uses backend-provided NodeDurationData for durations (no TS-side computation)
- Exchange write/read durations from backend
- Removed PlanDurationAttribution.ts entirely
- Null-safe metric value handling

API changes:
- /dataflint/sqlplan enriched with stageGroups + nodeDurations for all SQLs
- /dataflint/sqlmetrics enriched with stageGroups + nodeDurations for running SQLs
- Metric allowlists updated: added "duration" to input/output/join/sort types

Tests:
- GraphStageGroupSpec: topology-based stage grouping (4 tests)
- GraphDurationAttributionSpec: durations, exchange metrics, normalization (4 tests)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@minskya minskya force-pushed the duration-all-nodes-more branch from dd5f26c to e3dabdd Compare April 9, 2026 14:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant