Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* (Python) Add YAML Editor and Visualization Panel ([#35772](https://github.com/apache/beam/issues/35772)).

## I/Os

Expand Down Expand Up @@ -96,7 +97,7 @@

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* (Python) Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.
* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.

## I/Os

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# Licensed under the Apache License, Version 2.0 (the 'License'); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import dataclasses
import json
from dataclasses import dataclass
from typing import Any
from typing import Dict
from typing import List
from typing import TypedDict

import yaml

import apache_beam as beam
from apache_beam.yaml.main import build_pipeline_components_from_yaml

# ======================== Type Definitions ========================


@dataclass
class NodeData:
id: str
label: str
type: str = ""

def __post_init__(self):
# Ensure ID is not empty
if not self.id:
raise ValueError("Node ID cannot be empty")


@dataclass
class EdgeData:
source: str
target: str
label: str = ""

def __post_init__(self):
if not self.source or not self.target:
raise ValueError("Edge source and target cannot be empty")


class FlowGraph(TypedDict):
nodes: List[Dict[str, Any]]
edges: List[Dict[str, Any]]


# ======================== Main Function ========================


def parse_beam_yaml(yaml_str: str, isDryRunMode: bool = False) -> str:
"""
Parse Beam YAML and convert to flow graph data structure

Args:
yaml_str: Input YAML string

Returns:
Standardized response format:
- Success: {'status': 'success', 'data': {...}, 'error': None}
- Failure: {'status': 'error', 'data': None, 'error': 'message'}
"""
# Phase 1: YAML Parsing
try:
parsed_yaml = yaml.safe_load(yaml_str)
if not parsed_yaml or 'pipeline' not in parsed_yaml:
return build_error_response(
"Invalid YAML structure: missing 'pipeline' section")
except yaml.YAMLError as e:
return build_error_response(f"YAML parsing error: {str(e)}")

# Phase 2: Pipeline Validation
try:
options, constructor = build_pipeline_components_from_yaml(
yaml_str,
[],
validate_schema='per_transform'
)
if isDryRunMode:
with beam.Pipeline(options=options) as p:
constructor(p)
except Exception as e:
return build_error_response(f"Pipeline validation failed: {str(e)}")

# Phase 3: Graph Construction
try:
pipeline = parsed_yaml['pipeline']
transforms = pipeline.get('transforms', [])

nodes: List[NodeData] = []
edges: List[EdgeData] = []

nodes.append(NodeData(id='0', label='Input', type='input'))
nodes.append(NodeData(id='1', label='Output', type='output'))

# Process transform nodes
for idx, transform in enumerate(transforms):
if not isinstance(transform, dict):
continue

payload = {k: v for k, v in transform.items() if k not in {"type"}}

node_id = f"t{idx}"
node_data = NodeData(
id=node_id,
label=transform.get('type', 'unnamed'),
type='default',
**payload)
nodes.append(node_data)

# Create connections between nodes
if idx > 0:
edges.append(
EdgeData(source=f"t{idx-1}", target=node_id, label='chain'))

if transforms:
edges.append(EdgeData(source='0', target='t0', label='start'))
edges.append(EdgeData(source=node_id, target='1', label='stop'))

def to_dict(node):
if hasattr(node, '__dataclass_fields__'):
return dataclasses.asdict(node)
return node

nodes_serializable = [to_dict(n) for n in nodes]

return build_success_response(
nodes=nodes_serializable, edges=[dataclasses.asdict(e) for e in edges])

except Exception as e:
return build_error_response(f"Graph construction failed: {str(e)}")


# ======================== Utility Functions ========================


def build_success_response(
nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> str:
"""Build success response"""
return json.dumps({'data': {'nodes': nodes, 'edges': edges}, 'error': None})


def build_error_response(error_msg: str) -> str:
"""Build error response"""
return json.dumps({'data': None, 'error': error_msg})


if __name__ == "__main__":
# Example usage
example_yaml = """
pipeline:
transforms:
- type: ReadFromCsv
name: A
config:
path: /path/to/input*.csv
- type: WriteToJson
name: B
config:
path: /path/to/output.json
input: ReadFromCsv
- type: Join
input: [A, B]
"""

response = parse_beam_yaml(example_yaml, isDryRunMode=False)
print(response)
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,37 @@
"@jupyterlab/launcher": "^4.3.6",
"@jupyterlab/mainmenu": "^4.3.6",
"@lumino/widgets": "^2.2.1",
"@monaco-editor/react": "^4.7.0",
"@rmwc/base": "^14.0.0",
"@rmwc/button": "^8.0.6",
"@rmwc/card": "^14.3.5",
"@rmwc/data-table": "^8.0.6",
"@rmwc/dialog": "^8.0.6",
"@rmwc/drawer": "^8.0.6",
"@rmwc/fab": "^8.0.6",
"@rmwc/grid": "^14.3.5",
"@rmwc/list": "^8.0.6",
"@rmwc/ripple": "^14.0.0",
"@rmwc/textfield": "^8.0.6",
"@rmwc/tooltip": "^8.0.6",
"@rmwc/top-app-bar": "^8.0.6",
"@rmwc/touch-target": "^14.3.5",
"@xyflow/react": "^12.8.2",
"dagre": "^0.8.5",
"lodash": "^4.17.21",
"material-design-icons": "^3.0.1",
"react": "^18.2.0",
"react-dom": "^18.2.0"
"react-dom": "^18.2.0",
"react-split": "^2.0.14"
},
"devDependencies": {
"@jupyterlab/builder": "^4.3.6",
"@testing-library/dom": "^9.3.0",
"@testing-library/jest-dom": "^6.1.4",
"@testing-library/react": "^14.0.0",
"@types/dagre": "^0.7.53",
"@types/jest": "^29.5.14",
"@types/lodash": "^4.17.20",
"@types/react": "^18.2.0",
"@types/react-dom": "^18.2.0",
"@typescript-eslint/eslint-plugin": "^7.3.1",
Expand Down Expand Up @@ -97,5 +107,6 @@
"test": "jest",
"resolutions": {
"@types/react": "^18.2.0"
}
}
},
"packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e"
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,17 @@ export class SidePanel extends BoxPanel {
const sessionModelItr = manager.sessions.running();
const firstModel = sessionModelItr.next();
let onlyOneUniqueKernelExists = true;
if (firstModel === undefined) {
// There is zero unique running kernel.

if (firstModel.done) {
// No Running kernel
onlyOneUniqueKernelExists = false;
} else {
// firstModel.value is the first session
let sessionModel = sessionModelItr.next();
while (sessionModel !== undefined) {

while (!sessionModel.done) {
// Check if there is more than one unique kernel
if (sessionModel.value.kernel.id !== firstModel.value.kernel.id) {
// There is more than one unique running kernel.
onlyOneUniqueKernelExists = false;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ import { SidePanel } from './SidePanel';
import {
InteractiveInspectorWidget
} from './inspector/InteractiveInspectorWidget';
import { YamlWidget } from './yaml/YamlWidget';

namespace CommandIDs {
export const open_inspector =
'apache-beam-jupyterlab-sidepanel:open_inspector';
export const open_clusters_panel =
'apache-beam-jupyterlab-sidepanel:open_clusters_panel';
export const open_yaml_editor =
'apache-beam-jupyterlab-sidepanel:open_yaml_editor';
}

/**
Expand Down Expand Up @@ -67,6 +70,7 @@ function activate(
const category = 'Interactive Beam';
const inspectorCommandLabel = 'Open Inspector';
const clustersCommandLabel = 'Manage Clusters';
const yamlCommandLabel = 'Edit YAML Pipeline';
const { commands, shell, serviceManager } = app;

async function createInspectorPanel(): Promise<SidePanel> {
Expand Down Expand Up @@ -105,6 +109,24 @@ function activate(
return panel;
}

async function createYamlPanel(): Promise<SidePanel> {
const sessionContext = new SessionContext({
sessionManager: serviceManager.sessions,
specsManager: serviceManager.kernelspecs,
name: 'Interactive Beam YAML Session'
});
const yamlEditor = new YamlWidget(sessionContext);
const panel = new SidePanel(
serviceManager,
rendermime,
sessionContext,
'Interactive Beam YAML Editor',
yamlEditor
);
activatePanel(panel);
return panel;
}

function activatePanel(panel: SidePanel): void {
shell.add(panel, 'main');
shell.activateById(panel.id);
Expand All @@ -122,6 +144,12 @@ function activate(
execute: createClustersPanel
});

// The open_yaml_editor command is also used by the below entry points.
commands.addCommand(CommandIDs.open_yaml_editor, {
label: yamlCommandLabel,
execute: createYamlPanel
});

// Entry point in launcher.
if (launcher) {
launcher.add({
Expand All @@ -132,6 +160,10 @@ function activate(
command: CommandIDs.open_clusters_panel,
category: category
});
launcher.add({
command: CommandIDs.open_yaml_editor,
category: category
});
}

// Entry point in top menu.
Expand All @@ -140,10 +172,11 @@ function activate(
mainMenu.addMenu(menu);
menu.addItem({ command: CommandIDs.open_inspector });
menu.addItem({ command: CommandIDs.open_clusters_panel });
menu.addItem({ command: CommandIDs.open_yaml_editor });

// Entry point in commands palette.
palette.addItem({ command: CommandIDs.open_inspector, category });
palette.addItem({ command: CommandIDs.open_clusters_panel, category });
palette.addItem({ command: CommandIDs.open_yaml_editor, category });
}

export default extension;
Loading
Loading