diff --git a/CHANGES.md b/CHANGES.md index 0394882d8a7a..af1ab8c6e3ce 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -65,6 +65,7 @@ * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). +* (Python) Add YAML Editor and Visualization Panel ([#35772](https://github.com/apache/beam/issues/35772)). ## I/Os @@ -96,7 +97,7 @@ * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). -* (Python) Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues. +* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues. ## I/Os diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/apache_beam_jupyterlab_sidepanel/yaml_parse_utils.py b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/apache_beam_jupyterlab_sidepanel/yaml_parse_utils.py new file mode 100644 index 000000000000..aebca7b85d65 --- /dev/null +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/apache_beam_jupyterlab_sidepanel/yaml_parse_utils.py @@ -0,0 +1,176 @@ +# Licensed under the Apache License, Version 2.0 (the 'License'); you may not +# use this file except in compliance with the License. 
You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import dataclasses +import json +from dataclasses import dataclass +from typing import Any +from typing import Dict +from typing import List +from typing import TypedDict + +import yaml + +import apache_beam as beam +from apache_beam.yaml.main import build_pipeline_components_from_yaml + +# ======================== Type Definitions ======================== + + +@dataclass +class NodeData: + id: str + label: str + type: str = "" + + def __post_init__(self): + # Reject falsy IDs (empty string or None) at construction time + if not self.id: + raise ValueError("Node ID cannot be empty") + + +@dataclass +class EdgeData: + source: str + target: str + label: str = "" + + def __post_init__(self): + if not self.source or not self.target: + raise ValueError("Edge source and target cannot be empty") + + +class FlowGraph(TypedDict): + nodes: List[Dict[str, Any]] + edges: List[Dict[str, Any]] + + +# ======================== Main Function ======================== + + +def parse_beam_yaml(yaml_str: str, isDryRunMode: bool = False) -> str: + """ + Parse Beam YAML and convert it to a JSON-encoded flow graph + + Args: + yaml_str: Input YAML string; must contain a top-level 'pipeline' key + + Returns: + JSON string in a standardized response format: + - Success: {'data': {'nodes': [...], 'edges': [...]}, 'error': None} + - Failure: {'data': None, 'error': 'message'} + """ + # Phase 1: YAML Parsing + try: + parsed_yaml = yaml.safe_load(yaml_str) + if not parsed_yaml or 'pipeline' not in parsed_yaml: + return build_error_response( + "Invalid YAML structure: missing 'pipeline' section") + except yaml.YAMLError as e: + return build_error_response(f"YAML parsing 
error: {str(e)}") + + # Phase 2: Pipeline Validation (constructor is applied to a Pipeline only when isDryRunMode is set) + try: + options, constructor = build_pipeline_components_from_yaml( + yaml_str, + [], + validate_schema='per_transform' + ) + if isDryRunMode: + with beam.Pipeline(options=options) as p: + constructor(p) + except Exception as e: + return build_error_response(f"Pipeline validation failed: {str(e)}") + + # Phase 3: Graph Construction + try: + pipeline = parsed_yaml['pipeline'] + transforms = pipeline.get('transforms', []) + + nodes: List[NodeData] = [] + edges: List[EdgeData] = [] + + nodes.append(NodeData(id='0', label='Input', type='input')) + nodes.append(NodeData(id='1', label='Output', type='output')) + + # Process transform nodes. NOTE(review): NodeData declares only id/label/type, so forwarding other transform keys via **payload presumably raises TypeError (caught below) - confirm + for idx, transform in enumerate(transforms): + if not isinstance(transform, dict): + continue + + payload = {k: v for k, v in transform.items() if k not in {"type"}} + + node_id = f"t{idx}" + node_data = NodeData( + id=node_id, + label=transform.get('type', 'unnamed'), + type='default', + **payload) + nodes.append(node_data) + + # Chain to the previous index; assumes that entry was a dict and not skipped above - TODO confirm + if idx > 0: + edges.append( + EdgeData(source=f"t{idx-1}", target=node_id, label='chain')) + + if transforms: + edges.append(EdgeData(source='0', target='t0', label='start')) + edges.append(EdgeData(source=node_id, target='1', label='stop')) + + def to_dict(node): + if hasattr(node, '__dataclass_fields__'): + return dataclasses.asdict(node) + return node + + nodes_serializable = [to_dict(n) for n in nodes] + + return build_success_response( + nodes=nodes_serializable, edges=[dataclasses.asdict(e) for e in edges]) + + except Exception as e: + return build_error_response(f"Graph construction failed: {str(e)}") + + +# ======================== Utility Functions ======================== + + +def build_success_response( + nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> str: + """Build the JSON success response: nodes and edges under 'data', 'error' set to None""" + return json.dumps({'data': {'nodes': nodes, 'edges': edges}, 'error': None}) + + +def 
build_error_response(error_msg: str) -> str: + """Build error response""" + return json.dumps({'data': None, 'error': error_msg}) + + +if __name__ == "__main__": + # Example usage + example_yaml = """ +pipeline: + transforms: + - type: ReadFromCsv + name: A + config: + path: /path/to/input*.csv + - type: WriteToJson + name: B + config: + path: /path/to/output.json + input: ReadFromCsv + - type: Join + input: [A, B] + """ + + response = parse_beam_yaml(example_yaml, isDryRunMode=False) + print(response) diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/package.json b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/package.json index 8b51461f6cd4..eef3fcaa80f4 100644 --- a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/package.json +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/package.json @@ -47,27 +47,37 @@ "@jupyterlab/launcher": "^4.3.6", "@jupyterlab/mainmenu": "^4.3.6", "@lumino/widgets": "^2.2.1", + "@monaco-editor/react": "^4.7.0", "@rmwc/base": "^14.0.0", "@rmwc/button": "^8.0.6", + "@rmwc/card": "^14.3.5", "@rmwc/data-table": "^8.0.6", "@rmwc/dialog": "^8.0.6", "@rmwc/drawer": "^8.0.6", "@rmwc/fab": "^8.0.6", + "@rmwc/grid": "^14.3.5", "@rmwc/list": "^8.0.6", "@rmwc/ripple": "^14.0.0", "@rmwc/textfield": "^8.0.6", "@rmwc/tooltip": "^8.0.6", "@rmwc/top-app-bar": "^8.0.6", + "@rmwc/touch-target": "^14.3.5", + "@xyflow/react": "^12.8.2", + "dagre": "^0.8.5", + "lodash": "^4.17.21", "material-design-icons": "^3.0.1", "react": "^18.2.0", - "react-dom": "^18.2.0" + "react-dom": "^18.2.0", + "react-split": "^2.0.14" }, "devDependencies": { "@jupyterlab/builder": "^4.3.6", "@testing-library/dom": "^9.3.0", "@testing-library/jest-dom": "^6.1.4", "@testing-library/react": "^14.0.0", + "@types/dagre": "^0.7.53", "@types/jest": "^29.5.14", + "@types/lodash": "^4.17.20", "@types/react": 
"^18.2.0", "@types/react-dom": "^18.2.0", "@typescript-eslint/eslint-plugin": "^7.3.1", @@ -97,5 +107,6 @@ "test": "jest", "resolutions": { "@types/react": "^18.2.0" - } -} + }, + "packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e" +} \ No newline at end of file diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/SidePanel.ts b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/SidePanel.ts index fb86b0a53fdf..d8f19c278843 100644 --- a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/SidePanel.ts +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/SidePanel.ts @@ -58,14 +58,17 @@ export class SidePanel extends BoxPanel { const sessionModelItr = manager.sessions.running(); const firstModel = sessionModelItr.next(); let onlyOneUniqueKernelExists = true; - if (firstModel === undefined) { - // There is zero unique running kernel. + + if (firstModel.done) { + // No Running kernel onlyOneUniqueKernelExists = false; } else { + // firstModel.value is the first session let sessionModel = sessionModelItr.next(); - while (sessionModel !== undefined) { + + while (!sessionModel.done) { + // Check if there is more than one unique kernel if (sessionModel.value.kernel.id !== firstModel.value.kernel.id) { - // There is more than one unique running kernel. 
onlyOneUniqueKernelExists = false; break; } diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/index.ts b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/index.ts index 3f2b02d11b53..92a1ea3cdbbe 100644 --- a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/index.ts +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/index.ts @@ -28,12 +28,15 @@ import { SidePanel } from './SidePanel'; import { InteractiveInspectorWidget } from './inspector/InteractiveInspectorWidget'; +import { YamlWidget } from './yaml/YamlWidget'; namespace CommandIDs { export const open_inspector = 'apache-beam-jupyterlab-sidepanel:open_inspector'; export const open_clusters_panel = 'apache-beam-jupyterlab-sidepanel:open_clusters_panel'; + export const open_yaml_editor = + 'apache-beam-jupyterlab-sidepanel:open_yaml_editor'; } /** @@ -67,6 +70,7 @@ function activate( const category = 'Interactive Beam'; const inspectorCommandLabel = 'Open Inspector'; const clustersCommandLabel = 'Manage Clusters'; + const yamlCommandLabel = 'Edit YAML Pipeline'; const { commands, shell, serviceManager } = app; async function createInspectorPanel(): Promise { @@ -105,6 +109,24 @@ function activate( return panel; } + async function createYamlPanel(): Promise { + const sessionContext = new SessionContext({ + sessionManager: serviceManager.sessions, + specsManager: serviceManager.kernelspecs, + name: 'Interactive Beam YAML Session' + }); + const yamlEditor = new YamlWidget(sessionContext); + const panel = new SidePanel( + serviceManager, + rendermime, + sessionContext, + 'Interactive Beam YAML Editor', + yamlEditor + ); + activatePanel(panel); + return panel; + } + function activatePanel(panel: SidePanel): void { shell.add(panel, 'main'); shell.activateById(panel.id); @@ -122,6 +144,12 @@ function activate( execute: 
createClustersPanel }); + // The open_yaml_editor command is also used by the below entry points. + commands.addCommand(CommandIDs.open_yaml_editor, { + label: yamlCommandLabel, + execute: createYamlPanel + }); + // Entry point in launcher. if (launcher) { launcher.add({ @@ -132,6 +160,10 @@ function activate( command: CommandIDs.open_clusters_panel, category: category }); + launcher.add({ + command: CommandIDs.open_yaml_editor, + category: category + }); } // Entry point in top menu. @@ -140,10 +172,11 @@ function activate( mainMenu.addMenu(menu); menu.addItem({ command: CommandIDs.open_inspector }); menu.addItem({ command: CommandIDs.open_clusters_panel }); + menu.addItem({ command: CommandIDs.open_yaml_editor }); // Entry point in commands palette. palette.addItem({ command: CommandIDs.open_inspector, category }); palette.addItem({ command: CommandIDs.open_clusters_panel, category }); + palette.addItem({ command: CommandIDs.open_yaml_editor, category }); } - export default extension; diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/yaml/CustomStyle.tsx b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/yaml/CustomStyle.tsx new file mode 100644 index 000000000000..87d93de0b60a --- /dev/null +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/yaml/CustomStyle.tsx @@ -0,0 +1,179 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +// License for the specific language governing permissions and limitations under +// the License. + +import React, { memo } from 'react'; +import { Handle, Position } from '@xyflow/react'; +import { EdgeProps, BaseEdge, getSmoothStepPath } from '@xyflow/react'; +import { INodeData } from './DataType'; +import { transformEmojiMap } from './EmojiMap'; + +export function DefaultNode({ data }: { data: INodeData }) { + const emoji = data.label + ? transformEmojiMap[data.label] || 'πŸ“¦' + : data.emoji || 'πŸ“¦'; + const typeClass = data.type ? `custom-node-${data.type}` : ''; + + return ( +
+
+
{emoji}
+
{data.label}
+
+ + + +
+ ); +} + +// ===== Input Node ===== +export function InputNode({ data }: { data: INodeData }) { + return ( +
+
+
{data.emoji || '🟒'}
+
{data.label}
+
+ + +
+ ); +} + +// ===== Output Node ===== +export function OutputNode({ data }: { data: INodeData }) { + return ( +
+
+
{data.emoji || 'πŸ”΄'}
+
{data.label}
+
+ + +
+ ); +} + +export default memo(DefaultNode); + +export function AnimatedSVGEdge({ + id, + sourceX, + sourceY, + targetX, + targetY, + sourcePosition, + targetPosition +}: EdgeProps) { + const [initialEdgePath] = getSmoothStepPath({ + sourceX, + sourceY, + targetX, + targetY, + sourcePosition, + targetPosition + }); + + let edgePath = initialEdgePath; + + // If the edge is almost vertical or horizontal, use a straight line + const dx = Math.abs(targetX - sourceX); + const dy = Math.abs(targetY - sourceY); + if (dx < 1) { + edgePath = `M${sourceX},${sourceY} L${sourceX + 1},${targetY}`; + } else if (dy < 1) { + edgePath = `M${sourceX},${sourceY} L${targetX},${sourceY + 1}`; + } + + const dotCount = 4; + const dotDur = 3.5; + + const dots = Array.from({ length: dotCount }, (_, i) => ( + + + + + )); + + return ( + <> + {/* Gradient Base Edge */} + + + {/* Dots */} + {dots} + + {/* Flow shader line */} + + + + + {/* Gradient Color */} + + + + + + + + + + + + + + ); +} diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/yaml/DataType.ts b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/yaml/DataType.ts new file mode 100644 index 000000000000..0ea535d5fc6a --- /dev/null +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/yaml/DataType.ts @@ -0,0 +1,37 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. 
+ +export const nodeWidth = 320; +export const nodeHeight = 100; + +export interface INodeData { + id: string; + label: string; + type?: string; + [key: string]: any; +} + +export interface IEdgeData { + source: string; + target: string; + label?: string; +} + +export interface IFlowGraph { + nodes: INodeData[]; + edges: IEdgeData[]; +} + +export interface IApiResponse { + data: IFlowGraph | null; + error: string | null; +} diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/yaml/EditablePanel.tsx b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/yaml/EditablePanel.tsx new file mode 100644 index 000000000000..d2b19d4371f4 --- /dev/null +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/yaml/EditablePanel.tsx @@ -0,0 +1,408 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +import React from 'react'; +import { Node } from '@xyflow/react'; +import '../../style/yaml/YamlEditor.css'; +import { transformEmojiMap } from './EmojiMap'; + +type EditableKeyValuePanelProps = { + node: Node; + onChange: (newData: Record) => void; + depth?: number; +}; + +type EditableKeyValuePanelState = { + localData: Record; + collapsedKeys: Set; +}; + +/** + * An editable key-value panel component for displaying + * and modifying node properties. 
+ * + * Features: + * - Nested object support with collapsible sections + * - Real-time key-value editing with validation + * - Dynamic field addition and deletion + * - Support for multi-line text values + * - Object conversion for nested structures + * - Reference documentation integration + * - Visual hierarchy with depth-based indentation + * - Interactive UI with hover effects and transitions + * + * State Management: + * - localData: Local copy of the node data being edited + * - collapsedKeys: Set of keys that are currently collapsed + * + * Props: + * @param {Node} node - The node whose data is being edited + * @param {(data: Record) => void} onChange - + * Callback for data changes + * @param {number} [depth=0] - Current nesting depth for recursive rendering + * + * Methods: + * - toggleCollapse: Toggles collapse state of nested objects + * - handleKeyChange: Updates keys with validation + * - handleValueChange: Updates values in the local data + * - handleDelete: Removes key-value pairs + * - handleAddPair: Adds new key-value pairs + * - convertToObject: Converts primitive values to objects + * - renderValueEditor: Renders appropriate input based on value type + * + * UI Features: + * - Collapsible nested object sections + * - Multi-line text support for complex values + * - Add/Delete buttons for field management + * - Reference documentation links + * - Visual feedback for user interactions + * - Responsive design with proper spacing + */ +export class EditableKeyValuePanel extends React.Component< + EditableKeyValuePanelProps, + EditableKeyValuePanelState +> { + static defaultProps = { + depth: 0 + }; + + constructor(props: EditableKeyValuePanelProps) { + super(props); + this.state = { + localData: { ...(props.node ? 
props.node.data : {}) }, + collapsedKeys: new Set() + }; + } + + componentDidUpdate(prevProps: EditableKeyValuePanelProps) { + if (prevProps.node !== this.props.node && this.props.node) { + this.setState({ localData: { ...(this.props.node.data ?? {}) } }); + } + } + + toggleCollapse = (key: string) => { + this.setState(({ collapsedKeys }) => { + const newSet = new Set(collapsedKeys); + newSet.has(key) ? newSet.delete(key) : newSet.add(key); + return { collapsedKeys: newSet }; + }); + }; + + handleKeyChange = (oldKey: string, newKey: string) => { + newKey = newKey.trim(); + if (newKey === oldKey || newKey === '') { + return alert('Invalid Key!'); + } + if (newKey in this.state.localData) { + return alert('Duplicated Key!'); + } + + const newData: Record = {}; + for (const [k, v] of Object.entries(this.state.localData)) { + newData[k === oldKey ? newKey : k] = v; + } + + this.setState({ localData: newData }, () => this.props.onChange(newData)); + }; + + handleValueChange = (key: string, newValue: any) => { + const newData = { ...this.state.localData, [key]: newValue }; + this.setState({ localData: newData }, () => this.props.onChange(newData)); + }; + + handleDelete = (key: string) => { + const { [key]: _, ...rest } = this.state.localData; + void _; + this.setState({ localData: rest }, () => this.props.onChange(rest)); + }; + + handleAddPair = () => { + let i = 1; + const baseKey = 'newKey'; + while (`${baseKey}${i}` in this.state.localData) { + i++; + } + const newKey = `${baseKey}${i}`; + const newData = { ...this.state.localData, [newKey]: '' }; + this.setState({ localData: newData }, () => this.props.onChange(newData)); + }; + + convertToObject = (key: string) => { + if ( + typeof this.state.localData[key] === 'object' && + this.state.localData[key] !== null + ) { + return; + } + const newData = { ...this.state.localData, [key]: {} }; + this.setState({ localData: newData }, () => this.props.onChange(newData)); + this.setState(({ collapsedKeys }) => { + const 
newSet = new Set(collapsedKeys); + newSet.delete(key); + return { collapsedKeys: newSet }; + }); + }; + + renderValueEditor = (key: string, value: any) => { + const isMultiline = + key === 'callable' || (typeof value === 'string' && value.includes('\n')); + + return isMultiline ? ( +