diff --git a/README.md b/README.md index 4a47a245..7baa7c70 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,26 @@ PyQASM requires Python 3.10 or greater, and can be installed with pip as follows pip install pyqasm ``` +### Optional Dependencies + +PyQASM provides an optional extra called pyqasm[pulse] that adds pulse/calibration features. + +```bash +pip install pyqasm[pulse] +``` + +PyQASM also offers optional extras for command-line interface (CLI) functionality and for program visualization. + +To install the CLI tools: +```bash +pip install pyqasm[cli] +``` + +To install the visualization tools: +```bash +pip install pyqasm[visualization] +``` + ### Install from source You can also install from source by cloning this repository and running a pip install command diff --git a/examples/pulse_unroll_example.py b/examples/pulse_unroll_example.py new file mode 100644 index 00000000..845767a9 --- /dev/null +++ b/examples/pulse_unroll_example.py @@ -0,0 +1,141 @@ +# Copyright 2025 qBraid +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=invalid-name + +""" +Script demonstrating how to unroll a QASM 3 program using pyqasm. + +""" + +from pyqasm import dumps, loads + +complex_example = """ +OPENQASM 3; +defcalgrammar "openpulse"; + +cal { + extern port q0_drive; + extern port q0_readout; + extern frame q0_frame = newframe(q0_drive, 5.2e9, 0.0); + extern frame q0_readout_frame = newframe(q0_readout, 6.1e9, 0.0); + extern frame q0_acquire = newframe(q0_readout, 6.1e9, 0.0); +} + +const duration pulse_length = 60ns; +const duration meas_length = 800ns; +const duration buffer_time = 20ns; + +waveform gaussian_pulse = gaussian(pulse_length, 0.3, 10ns); +waveform drag_pulse = gaussian(pulse_length, 0.25, 10ns, alpha=0.5); +waveform meas_pulse = constant(meas_length, 0.2); +waveform zero_pad = constant(buffer_time, 0.0); + +defcal rb_sequence $q0 { + for int i in [0:20] { + bit[2] selector = random[2]; + + switch(selector) { + case 0: play(q0_frame, gaussian_pulse); + case 1: play(q0_frame, drag_pulse); + case 2: { + play(q0_frame, gaussian_pulse); + delay[pulse_length] q0_frame; + play(q0_frame, drag_pulse); + } + default: { + play(q0_frame, drag_pulse); + delay[pulse_length] q0_frame; + play(q0_frame, gaussian_pulse); + } + } + delay[buffer_time] q0_frame; + } + + play(q0_readout_frame, meas_pulse); + capture(q0_acquire, meas_pulse); + + box { + bit result = get_measure(q0_acquire); + stream result; + if (result) { + delay[1ms] q0_frame; + } + } +} + +qubit[1] q; +bit[1] c; + +rb_sequence q[0]; +c[0] = measure q[0]; +""" + +simple_example = """ +OPENQASM 3; +defcalgrammar "openpulse"; + +cal { + port tx_port; + frame tx_frame = newframe(tx_port2, 7883050000.0, 0); + waveform readout_waveform_wf = constant(5e-06, 0.03); + for int shot in [0:499] { + play(readout_waveform_wf, tx_frame2); + barrier tx_frame2; + } +} +""" + +# cal { +# extern drag(complex[size] amp, duration l, duration sigma, float[size] beta) -> waveform; +# extern gaussian_square(complex[size] amp, duration l, duration square_width, duration sigma) -> waveform; + +# extern port q0; +# extern port q1; + +# frame q0_frame = newframe(q0, q0_freq, 0); +# frame q1_frame = newframe(q1, q1_freq, 0); +# } + +example = """ +OPENQASM 3; +defcalgrammar "openpulse"; + +const float q0_freq = 5.0e9; +const float q1_freq = 5.1e9; + +defcal rz(angle theta) $0 { + shift_phase(q0_frame, theta); +} + +defcal rz(angle theta) $1 { + shift_phase(q1_frame, theta); +} + +defcal sx $0 { + waveform sx_wf = drag(0.2+0.1im, 160dt, 40dt, 0.05); + play(q0_frame, sx_wf); +} + +defcal sx $1 { + waveform sx_wf = drag(0.1+0.05im, 160dt, 40dt, 0.1); + play(q1_frame, sx_wf); +} +""" + +program = loads(simple_example) + +program.unroll() + +print(dumps(program)) diff --git a/pyproject.toml b/pyproject.toml index f78becce..10a6f0b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ test = ["pytest", "pytest-cov", "pytest-mpl", "matplotlib"] lint = ["black", "isort>=6.0.0", "pylint", "mypy", "qbraid-cli>=0.10.2"] docs = ["sphinx>=7.3.7,<8.3.0", "sphinx-autodoc-typehints>=1.24,<3.2", "sphinx-rtd-theme>=2.0.0,<4.0.0", "docutils<0.22", "sphinx-copybutton"] visualization = ["matplotlib"] +pulse = ["openpulse[parser]>=1.0.1"] [tool.setuptools.package-data] pyqasm = ["py.typed", "*.pyx"] diff --git a/src/pyqasm/entrypoint.py b/src/pyqasm/entrypoint.py index af083021..e44badf5 100644 --- a/src/pyqasm/entrypoint.py +++ b/src/pyqasm/entrypoint.py @@ -21,6 +21,7 @@ from typing import TYPE_CHECKING import openqasm3 +import openpulse from pyqasm.exceptions import ValidationError from pyqasm.maps import SUPPORTED_QASM_VERSIONS @@ -63,11 +64,14 @@ def loads(program: openqasm3.ast.Program | str) -> QasmModule: """ if isinstance(program, str): try: - program = openqasm3.parse(program) - except openqasm3.parser.QASM3ParsingError as err: - raise ValidationError(f"Failed to parse OpenQASM string: {err}") from err - elif not isinstance(program, openqasm3.ast.Program): - raise TypeError("Input quantum program must be of type 'str' or 'openqasm3.ast.Program'.") + if 'defcalgrammar "openpulse"' in program: + program = openpulse.parse(program) + else: + program = openqasm3.parse(program) + except (openqasm3.parser.QASM3ParsingError, openpulse.parser.OpenPulseParsingError) as err: + raise ValidationError(f"Failed to parse the OpenQASM/Openpulse string: {err}") from err + elif not isinstance(program, openqasm3.ast.Program): + raise TypeError("Input quantum program must be of type 'str' or 'openqasm3.ast.Program' or 'openpulse.ast.Program'.") if program.version not in SUPPORTED_QASM_VERSIONS: raise ValidationError( f"Unsupported OpenQASM version: {program.version}. " diff --git a/src/pyqasm/modules/qasm3.py b/src/pyqasm/modules/qasm3.py index 8ed08d51..3524ea00 100644 --- a/src/pyqasm/modules/qasm3.py +++ b/src/pyqasm/modules/qasm3.py @@ -49,6 +49,9 @@ def accept(self, visitor): visitor (QasmVisitor): The visitor to accept """ unrolled_stmt_list = visitor.visit_basic_block(self._statements) + # print("--------------------------------") + # print("Unrolled stmt list: ", unrolled_stmt_list) + # print("--------------------------------") final_stmt_list = visitor.finalize(unrolled_stmt_list) self._unrolled_ast.statements = final_stmt_list diff --git a/src/pyqasm/visitor.py b/src/pyqasm/visitor.py index bb24ef47..0fa297e1 100644 --- a/src/pyqasm/visitor.py +++ b/src/pyqasm/visitor.py @@ -26,6 +26,7 @@ import numpy as np import openqasm3.ast as qasm3_ast +import openpulse.ast as pulse_ast from openqasm3.printer import dumps from pyqasm.analyzer import Qasm3Analyzer @@ -84,6 +85,7 @@ def __init__( self._custom_gates: dict[str, qasm3_ast.QuantumGateDefinition] = {} self._external_gates: list[str] = [] if external_gates is None else external_gates self._subroutine_defns: dict[str, qasm3_ast.SubroutineDefinition] = {} + self._calibration_defns: dict[tuple[str, tuple], pulse_ast.CalibrationDefinition] = {} self._check_only: bool = check_only self._unroll_barriers: bool = unroll_barriers self._curr_scope: int = 0 @@ -2144,6 +2146,59 @@ def _visit_include(self, include: qasm3_ast.Include) -> list[qasm3_ast.Statement return [] return [include] + + + + # You'll also need these other visitor methods for your visit map: + + + def _visit_calibration_statement(self, statement: qasm3_ast.CalibrationStatement) -> list[qasm3_ast.Statement]: + """Visit a calibration statement element. + + Args: + statement (qasm3_ast.CalibrationStatement): The calibration statement to visit. + + Returns: + list[qasm3_ast.Statement]: The calibration statement if not in check_only mode. + """ + print("Calibration Statement Structure:") + print("Statement: ", statement) + + # Add specific validation for CalibrationStatement if needed + # This might be different from CalibrationDefinition + + if self._check_only: + return [] + + return [statement] + + def _visit_calibration_grammar_declaration(self, declaration: qasm3_ast.CalibrationGrammarDeclaration) -> list[qasm3_ast.Statement]: + """Visit a calibration grammar declaration element. + + Args: + declaration (qasm3_ast.CalibrationGrammarDeclaration): The calibration grammar declaration to visit. + + Returns: + list[qasm3_ast.Statement]: The declaration if not in check_only mode. + """ + print("Calibration Grammar Declaration Structure:") + print("Declaration: ", declaration) + + grammar_name = declaration.name + + # Validate supported calibration grammars + supported_grammars = {"openpulse"} # Add more as needed + if grammar_name not in supported_grammars: + raise_qasm3_error( + f"Unsupported calibration grammar '{grammar_name}'", + error_node=declaration, + span=declaration.span, + ) + + if self._check_only: + return [] + + return [declaration] def visit_statement(self, statement: qasm3_ast.Statement) -> list[qasm3_ast.Statement]: """Visit a statement element. @@ -2175,6 +2230,9 @@ def visit_statement(self, statement: qasm3_ast.Statement) -> list[qasm3_ast.Stat qasm3_ast.SubroutineDefinition: self._visit_subroutine_definition, qasm3_ast.ExpressionStatement: lambda x: self._visit_function_call(x.expression), qasm3_ast.IODeclaration: lambda x: [], + pulse_ast.CalibrationStatement: self._visit_calibration_statement, + pulse_ast.CalibrationDefinition: self._visit_calibration_definition, + pulse_ast.CalibrationGrammarDeclaration: self._visit_calibration_grammar_declaration, } visitor_function = visit_map.get(type(statement)) @@ -2228,3 +2286,286 @@ def finalize(self, unrolled_stmts): if len(stmt.qubits) == len(self._qubit_labels): stmt.qubits = [] return unrolled_stmts + + + # ---------------------------------------------------------------------------------------------------------------- + # DEFCAL BLOCK IMPLEMENTATION + + def _visit_calibration_definition(self, definition: pulse_ast.CalibrationDefinition) -> list[pulse_ast.CalibrationDefinition]: + """Visit a calibration definition element (defcal block).""" + gate_name = definition.name.name + print("--------------------------------") + print("Gate name: ", gate_name) + print("--------------------------------") + + # Parse physical qubit identifiers + physical_qubits = self._parse_physical_qubits(definition.qubits, definition) + + # Validate calibration arguments + self._validate_calibration_arguments(definition.arguments, gate_name, definition) + + # have to check for duplicate calibration definitions + calibration_key = (gate_name, tuple(physical_qubits)) + if calibration_key in self._calibration_defns: + raise_qasm3_error( + f"Duplicate calibration definition for gate '{gate_name}' " + f"on physical qubits {physical_qubits}", + error_node=definition, + span=definition.span, + ) + + # Validate the calibration body + self._validate_calibration_body(definition.body, gate_name, definition) + + # store calibration definition + self._calibration_defns[calibration_key] = definition + + logger.debug("Added calibration definition for gate '%s' on qubits %s", gate_name, physical_qubits) + + if self._check_only: + return [] + return [definition] + + def _parse_physical_qubits(self, qubits: list, definition: pulse_ast.CalibrationDefinition) -> list[int]: + """Parse physical qubit identifiers from calibration definition.""" + print("--------------------------------") + print("Parsing physical qubits: ", qubits) + + + physical_qubits = [] + seen_qubits = set() + + for qubit in qubits: + print("--------------------------------") + print("Qubit: ", qubit) + print("--------------------------------") + qubit_name = qubit.name + + if qubit_name in seen_qubits: + raise_qasm3_error( + f"Duplicate physical qubit '{qubit_name}' in calibration definition", + error_node=definition, + span=definition.span, + ) + seen_qubits.add(qubit_name) + + if not qubit_name.startswith('$'): + raise_qasm3_error( + f"Physical qubit identifier must start with '$', got '{qubit_name}'", + error_node=definition, + span=definition.span, + ) + + try: + physical_qubit_id = int(qubit_name[1:]) + physical_qubits.append(physical_qubit_id) + except ValueError: + raise_qasm3_error( + f"Invalid physical qubit identifier '{qubit_name}'. Expected format: $", + error_node=definition, + span=definition.span, + ) + + print("--------------------------------") + return physical_qubits + + def _validate_calibration_arguments(self, arguments: list, gate_name: str, definition: pulse_ast.CalibrationDefinition) -> None: + """Validate the classical arguments of a calibration definition.""" + print("--------------------------------") + print("Validating calibration arguments: ", arguments) + print("--------------------------------") + seen_arg_names = set() + + for arg in arguments: + print("--------------------------------") + print("Argument: ", arg) + print("--------------------------------") + arg_name = arg.name.name if hasattr(arg.name, 'name') else str(arg.name) + + if arg_name in seen_arg_names: + raise_qasm3_error( + f"Duplicate argument name '{arg_name}' in calibration definition for '{gate_name}'", + error_node=definition, + span=definition.span, + ) + seen_arg_names.add(arg_name) + + # Validate argument type + type_name = type(arg.type).__name__ + valid_types = {'AngleType', 'DurationType', 'FloatType', 'IntType', 'ComplexType'} + + if type_name not in valid_types: + raise_qasm3_error( + f"Unsupported argument type '{type_name}' for argument '{arg_name}' " + f"in calibration definition for '{gate_name}'", + error_node=definition, + span=definition.span, + ) + + def _validate_calibration_body(self, body: list, gate_name: str, definition: pulse_ast.CalibrationDefinition) -> None: + """Validate calibration body using EXISTING visitor methods.""" + print("--------------------------------") + print("Validating calibration body: ", body) + print("--------------------------------") + # Set up scope for calibration arguments + self._push_scope({}) + self._add_calibration_args_to_scope(definition.arguments) + + try: + print("--------------------------------") + print("Validating calibration body: ", body) + print("--------------------------------") + for stmt in body: + self._validate_calibration_statement_dispatch(stmt, gate_name, definition) + finally: + self._pop_scope() + + def _add_calibration_args_to_scope(self, arguments: list) -> None: + """Add calibration arguments to current scope.""" + for arg in arguments: + arg_name = arg.name.name if hasattr(arg.name, 'name') else str(arg.name) + variable = Variable( + name=arg_name, + base_type=arg.type, + base_size=self._get_calibration_arg_size(arg.type), + dims=None, + value=None, + is_constant=False, + readonly=True + ) + self._add_var_in_scope(variable) + + def _validate_calibration_statement_dispatch(self, stmt, gate_name: str, definition: pulse_ast.CalibrationDefinition) -> None: + """Route calibration statements to EXISTING validators.""" + stmt_type_name = type(stmt).__name__ + + if stmt_type_name == 'ClassicalDeclaration': + # Only validate openpulse-specific types, then use existing validator + self._validate_openpulse_declaration_type(stmt, gate_name, definition) + # Try to use existing validator if compatible + try: + self._visit_classical_declaration(stmt) + except (AttributeError, TypeError): + # If pulse_ast isn't compatible, do basic validation + self._basic_pulse_declaration_validation(stmt, gate_name, definition) + + elif stmt_type_name == 'ClassicalAssignment': + # Use existing assignment validator + try: + self._visit_classical_assignment(stmt) + except Exception as err: + raise_qasm3_error( + f"Invalid assignment in calibration definition for '{gate_name}': {err}", + error_node=definition, + span=definition.span, + raised_from=err, + ) + + elif stmt_type_name in ['BranchingStatement', 'IfStatement']: + # Use existing branching validator + try: + self._visit_branching_statement(stmt) + except Exception as err: + raise_qasm3_error( + f"Invalid branching statement in calibration definition for '{gate_name}': {err}", + error_node=definition, + span=definition.span, + raised_from=err, + ) + + elif stmt_type_name in ['ForInLoop', 'ForLoop']: + # Use existing loop validator + try: + self._visit_forin_loop(stmt) + except Exception as err: + raise_qasm3_error( + f"Invalid loop in calibration definition for '{gate_name}': {err}", + error_node=definition, + span=definition.span, + raised_from=err, + ) + + elif stmt_type_name == 'ExpressionStatement': + # Validate openpulse function calls + self._validate_openpulse_function_call(stmt, gate_name, definition) + + else: + raise_qasm3_error( + f"Unsupported statement type '{stmt_type_name}' in calibration definition for '{gate_name}'", + error_node=definition, + span=definition.span, + ) + + def _get_calibration_arg_size(self, arg_type) -> int: + """Get size for calibration argument type.""" + type_name = type(arg_type).__name__ + return { + 'AngleType': 64, 'FloatType': 64, 'IntType': 32, + 'DurationType': 64, 'ComplexType': 128 + }.get(type_name, 32) + + def _validate_openpulse_declaration_type(self, stmt, gate_name: str, definition) -> None: + """Validate that declaration uses valid openpulse types.""" + type_name = type(stmt.type).__name__ + valid_types = {'WaveformType', 'FrameType', 'PortType', 'IntType', 'FloatType', 'AngleType', 'DurationType'} + + if type_name not in valid_types: + raise_qasm3_error( + f"Invalid type '{type_name}' in calibration definition for '{gate_name}'", + error_node=definition, + span=definition.span, + ) + + def _basic_pulse_declaration_validation(self, stmt, gate_name: str, definition) -> None: + """Basic validation for pulse declarations when existing validator fails.""" + var_name = stmt.identifier.name if hasattr(stmt.identifier, 'name') else str(stmt.identifier) + + if self._check_in_scope(var_name): + raise_qasm3_error( + f"Variable '{var_name}' already exists in scope", + error_node=definition, + span=definition.span, + ) + + # Add to scope manually + variable = Variable( + name=var_name, + base_type=stmt.type, + base_size=self._get_calibration_arg_size(stmt.type), + dims=None, + value=None, + is_constant=False, + readonly=False + ) + self._add_var_in_scope(variable) + + def _validate_openpulse_function_call(self, stmt, gate_name: str, definition) -> None: + """Validate openpulse function calls.""" + if hasattr(stmt, 'expression') and hasattr(stmt.expression, 'name'): + func_name = stmt.expression.name.name + + openpulse_functions = { + 'shift_phase', 'play', 'delay', 'set_frequency', 'set_phase', + 'drag', 'gaussian', 'constant', 'arbitrary', + 'newframe', 'sum', 'mix', 'capture_v0', 'capture_v1', 'capture', + 'barrier', 'reset_phase' + } + + if func_name not in openpulse_functions: + raise_qasm3_error( + f"Invalid function '{func_name}' in calibration definition for '{gate_name}'", + error_node=definition, + span=definition.span, + ) + + # Basic argument validation - check variables exist in scope + if hasattr(stmt.expression, 'arguments'): + for arg in stmt.expression.arguments: + if hasattr(arg, 'name'): + var_name = arg.name if isinstance(arg.name, str) else getattr(arg.name, 'name', str(arg.name)) + if not self._check_in_scope(var_name): + raise_qasm3_error( + f"Undefined variable '{var_name}' in function '{func_name}'", + error_node=definition, + span=definition.span, + ) \ No newline at end of file diff --git a/tests/pulse/test_import.py b/tests/pulse/test_import.py new file mode 100644 index 00000000..f7e6abb1 --- /dev/null +++ b/tests/pulse/test_import.py @@ -0,0 +1,22 @@ +# Copyright 2025 qBraid +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for pulse functionality.""" + +import pytest + + +def test_openpulse_import(): + """Tests that openpulse can be imported if pyqasm[pulse] is installed.""" + pytest.importorskip("openpulse")