diff --git a/pyaml/common/element_holder.py b/pyaml/common/element_holder.py index 20b57616..ac1d65b7 100644 --- a/pyaml/common/element_holder.py +++ b/pyaml/common/element_holder.py @@ -11,6 +11,7 @@ from ..arrays.serialized_magnet_array import SerializedMagnetsArray from ..bpm.bpm import BPM from ..common.exception import PyAMLException +from ..diagnostics.chromaticity_monitor import ChomaticityMonitor from ..diagnostics.tune_monitor import BetatronTuneMonitor from ..magnet.cfm_magnet import CombinedFunctionMagnet from ..magnet.magnet import Magnet @@ -221,6 +222,15 @@ def get_betatron_tune_monitor(self, name: str) -> BetatronTuneMonitor: def add_betatron_tune_monitor(self, tune_monitor: Element): self.__add(self.__DIAG, tune_monitor) + # Chromaticity monitor + + def get_chromaticity_monitor(self, name: str) -> ChomaticityMonitor: + obj = self.__get("Diagnostic", name, self.__DIAG) + return obj + + def add_chromaticity_monitor(self, chromaticity_monitor: Element): + self.__add(self.__DIAG, chromaticity_monitor) + # Tuning tools def get_tune_tuning(self, name: str) -> "Tune": diff --git a/pyaml/control/abstract_impl.py b/pyaml/control/abstract_impl.py index 87831ebc..21ac527e 100644 --- a/pyaml/control/abstract_impl.py +++ b/pyaml/control/abstract_impl.py @@ -513,3 +513,24 @@ def get(self) -> NDArray: def unit(self) -> str: return self.__tune_monitor._cfg.tune_v.unit() +# ------------------------------------------------------------------------------ + + +class RChromaticityArray(abstract.ReadFloatArray): + """ + Class providing read write access to chromaticity of a control system. + """ + + def __init__(self, chromaticity_monitor): + self.__chromaticity_monitor = chromaticity_monitor + + def _update_chromaticity_monitor(self, chromaticity_monitor): + """Use to attach the proper chromaticity_monitor and not the one used to create this instance""" + self.__chromaticity_monitor = chromaticity_monitor + + def get(self) -> NDArray: + # Return horizontal and vertical chromaticity as a NumPy array + return self.__chromaticity_monitor._last_measured + + def unit(self) -> str: + return "1" diff --git a/pyaml/control/controlsystem.py b/pyaml/control/controlsystem.py index 2c3f296c..b6636c5b 100644 --- a/pyaml/control/controlsystem.py +++ b/pyaml/control/controlsystem.py @@ -15,6 +15,7 @@ CSStrengthScalarAggregator, RBetatronTuneArray, RBpmArray, + RChromaticityArray, RWBpmOffsetArray, RWBpmTiltScalar, RWHardwareArray, @@ -25,6 +26,7 @@ RWStrengthArray, RWStrengthScalar, ) +from ..diagnostics.chromaticity_monitor import ChomaticityMonitor from ..diagnostics.tune_monitor import BetatronTuneMonitor from ..magnet.cfm_magnet import CombinedFunctionMagnet from ..magnet.magnet import Magnet @@ -284,6 +286,11 @@ def fill_device(self, elements: list[Element]): e = e.attach(self, betatron_tune) self.add_betatron_tune_monitor(e) + elif isinstance(e, ChomaticityMonitor): + chromaticity = RChromaticityArray(e) + e = e.attach(self, chromaticity) + self.add_chromaticity_monitor(e) + elif isinstance(e, Tune): self.add_tune_tuning(e.attach(self)) diff --git a/pyaml/diagnostics/chromaticity_monitor.py b/pyaml/diagnostics/chromaticity_monitor.py new file mode 100644 index 00000000..195148e9 --- /dev/null +++ b/pyaml/diagnostics/chromaticity_monitor.py @@ -0,0 +1,305 @@ +from ..common.abstract import ReadFloatArray +from ..common.element import Element, ElementConfigModel +from ..common.exception import PyAMLException + +try: + from typing import Self # Python 3.11+ +except ImportError: + from typing_extensions import Self # Python 3.10 and earlier +from time import sleep + +import matplotlib.pyplot as plt +import numpy as np +from pydantic import ConfigDict + +PYAMLCLASS = "ChomaticityMonitor" + + +class ConfigModel(ElementConfigModel): + model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") + + """ + Chomaticity measurement + + Parameters + ---------- + betatron_tune: str + Name of the diagnostic pyaml device for measuring the tune + RFfreq: str + Name of main RF frequency plant + N_step: int = 5 + Default number of RF step during chromaticity + measurment [default: 5] + alphac: float | None = None + Moment compaction factor + E_delta: float + Default variation of relative energy during chromaticity measurment: + f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac + [default: 0.001] + Max_E_delta: float + Maximum autorized variation of relative energy during chromaticity + measurment [default: 0.004] + N_tune_meas: int + Default number of tune measurment per RF frequency [default: 1] + Sleep_between_meas: float + Default time sleep between two tune measurment [default: 2.0] + Sleep_between_RFvar: float + Default time sleep after RF frequency variation [default: 5.0] + fit_order: int + Fitting order [default: 1] + """ + + betatron_tune: str + RFfreq: str + N_step: int = 5 + alphac: float | None = None + E_delta: float = 0.001 + Max_E_delta: float = 0.004 + N_tune_meas: int = 1 + Sleep_between_meas: float = 2.0 + Sleep_between_RFvar: float = 5.0 + fit_order: int = 1 + + +class ChomaticityMonitor(Element): + """ + Class providing access to a chromaticity monitor + of a physical or simulated lattice. The monitor provides + horizontal and vertical chromaticity measurements. + """ + + def __init__(self, cfg: ConfigModel): + """ + Construct a ChomaticityMonitor. + Parameters + ---------- + cfg : ConfigModel + Configuration for the ChromaticityMonitor, including betatron + tune monitor, RF plant, and defaults parameters. + """ + + super().__init__(cfg.name) + self._cfg = cfg + self.__chromaticity = None + self._last_measured = np.array([np.nan, np.nan]) + self._peer = None + + @property + def chromaticity(self) -> ReadFloatArray: + self.check_peer() + return self.__chromaticity + + def attach(self, peer, chromaticity: ReadFloatArray) -> Self: + obj = self.__class__(self._cfg) + chromaticity._update_chromaticity_monitor(obj) + obj.__chromaticity = chromaticity + obj._peer = peer + return obj + + def chromaticity_measurement( + self, + N_step: int = None, + alphac: float = None, + E_delta: float = None, + Max_E_delta: float = None, + N_tune_meas: int = None, + Sleep_between_meas: float = None, + Sleep_between_RFvar: float = None, + fit_order: int = None, + do_plot: bool = None, + ): + """ + Main function for chromaticity measurment + + Parameters + ---------- + N_step: int + Default number of RF step during chromaticity + measurment [default: from config] + alphac: float | None + Moment compaction factor [default: from config] + E_delta: float + Default variation of relative energy during chromaticity measurment: + f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac + [default: from config] + Max_E_delta: float + Maximum autorized variation of relative energy during chromaticity + measurment [default: from config] + N_tune_meas: int + Default number of tune measurment per RF frequency [default: from config] + Sleep_between_meas: float + Default time sleep between two tune measurment [default: from config] + Sleep_between_RFvar: float + Default time sleep after RF frequency variation [default: from config] + fit_order: int + Fitting order [default: 1] + do_plot : bool + Do you want to plot the fittinf results ? + """ + if N_step is None: + N_step = self._cfg.N_step + if alphac is None: + alphac = self._cfg.alphac + if E_delta is None: + E_delta = self._cfg.E_delta + if Max_E_delta is None: + Max_E_delta = self._cfg.Max_E_delta + if N_tune_meas is None: + N_tune_meas = self._cfg.N_tune_meas + if Sleep_between_meas is None: + Sleep_between_meas = self._cfg.Sleep_between_meas + if Sleep_between_RFvar is None: + Sleep_between_RFvar = self._cfg.Sleep_between_RFvar + if fit_order is None: + fit_order = self._cfg.fit_order + if abs(E_delta) > abs(Max_E_delta): + # TODO : Add logger to warm that E_delta is to large + return np.array([None, None]) + + if alphac is None: + raise PyAMLException("Moment compaction factor is not defined") + + delta, NuX, NuY = self.measure_tune_response( + N_step=N_step, + alphac=alphac, + E_delta=E_delta, + N_tune_meas=N_tune_meas, + Sleep_between_meas=Sleep_between_meas, + Sleep_between_RFvar=Sleep_between_RFvar, + ) + chrom = self.fit_chromaticity( + delta=delta, NuX=NuX, NuY=NuY, order=fit_order, do_plot=do_plot + ) + return chrom + + def measure_tune_response( + self, + N_step: int, + alphac: float, + E_delta: float, + N_tune_meas: int, + Sleep_between_meas: float, + Sleep_between_RFvar: float, + ): + """ + Main function for chromaticity measurment + + N_step: int + Default number of RF step during chromaticity + measurment [default: from config] + alphac: float | None + Moment compaction factor [default: from config] + E_delta: float + Default variation of relative energy during chromaticity measurment: + f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac + [default: from config] + N_tune_meas: int + Default number of tune measurment per RF frequency [default: from config] + Sleep_between_meas: float + Default time sleep between two tune measurment [default: from config] + Sleep_between_RFvar: float + Default time sleep after RF frequency variation [default: from config] + + """ + tune = self._peer.get_betatron_tune_monitor(self._cfg.betatron_tune) + rf = self._peer.get_rf_plant(self._cfg.RFfreq) + + f0 = rf.frequency.get() + + delta = np.linspace(-E_delta, E_delta, N_step) + delta_frec = delta * alphac * f0 + + NuY = np.zeros((N_step, N_tune_meas)) + NuX = np.zeros((N_step, N_tune_meas)) + + # ensure that, even if there is an issus, the script will finish by + # reseting the RF frequency to its original value + err = None + try: + for i, f in enumerate(delta_frec): + # TODO : Use set_and_wait once it is implemented ! + # (and remove Sleep_between_RFvar ?) + rf.frequency.set(f0 + f) + sleep(Sleep_between_RFvar) + + for j in range(N_tune_meas): + NuX[i, j], NuY[i, j] = tune.tune.get() + sleep(Sleep_between_meas) + except Exception as ex: + err = ex + finally: + # TODO : Use set_and_wait once it is implemented ! + rf.frequency.set(f0) + + if err: + raise (err) + + return (delta, NuX, NuY) + + def fit_chromaticity(self, delta, NuX, NuY, order, do_plot): + """ + Compute chromaticity from measurement data. + + Parameters + ---------- + delta : array of float + Relative energy (delta) variation steps done. + NuX : array of float + Horizontal tune measured. + NuZ : array of float + Vertical tune measured. + order : int + order of polynomial used for fit + plot : bool, optional + If True, plot the fit. + Plots are made but not shown. Use plt.show() to show it. + + Returns + ------- + chro : array + Array with horizontal and veritical chromaticity. + + """ + delta = -delta + chro = [] + N_step_delta = len(delta) + for i, Nu in enumerate([NuX, NuY]): + # if N_step_delta%2 == 0: + # tune0 = np.mean((Nu[N_step_delta//2-1,:] + Nu[N_step_delta//2,:])/2.) + # else: + # tune0 = np.mean(Nu[N_step_delta//2,:]) + + dtune = np.mean(Nu[:, :], 1) + coefs = np.polynomial.polynomial.polyfit(delta, dtune, order) + chro.append(coefs[1]) + + if do_plot: + fig = plt.figure("Chromaticity_measurement") + ax = fig.add_subplot(2, 1, 1 + i) + ax.scatter(delta, dtune) + title = "" + for o in range(order, -1, -1): + dp = "" + if o == 1: + dp = "dp/p" + elif o >= 1: + dp = "(dp/p)$^2$" + + title += f"{coefs[o]:.4f} {dp}" + if o != 0: + title += " + " + + print(title) + ax.plot(delta, np.polyval(coefs[::-1], delta)) + ax.set_title(title) + ax.set_xlabel("Momentum Shift, dp/p [%]") + ax.set_ylabel("%s Tune" % ["Horizontal", "Vertical"][i]) + ax.legend() + + if do_plot: + fig.tight_layout() + plt.show() + + self._last_measured = np.array(chro) + + return self._last_measured diff --git a/pyaml/lattice/abstract_impl.py b/pyaml/lattice/abstract_impl.py index 5e2841b7..cbd01e87 100644 --- a/pyaml/lattice/abstract_impl.py +++ b/pyaml/lattice/abstract_impl.py @@ -579,3 +579,24 @@ def get(self) -> float: def unit(self) -> str: return "1" + +# ------------------------------------------------------------------------------ + + +class RChromaticityArray(abstract.ReadFloatArray): + """ + Class providing read-only access to the chromaticity of a ring. + """ + + def __init__(self, ring: at.Lattice): + self.__ring = ring + + def _update_chromaticity_monitor(self, chromaticity_monitor): + """Use to attach the rigth object in control.abstract_impl.RChromaticityArray. Nothing needed here""" + pass + + def get(self) -> float: + return self.__ring.get_chrom()[:2] + + def unit(self) -> str: + return "1" diff --git a/pyaml/lattice/simulator.py b/pyaml/lattice/simulator.py index d042a67c..5dd7051d 100644 --- a/pyaml/lattice/simulator.py +++ b/pyaml/lattice/simulator.py @@ -9,6 +9,7 @@ from ..common.element_holder import ElementHolder from ..common.exception import PyAMLException from ..configuration import get_root_folder +from ..diagnostics.chromaticity_monitor import ChomaticityMonitor from ..diagnostics.tune_monitor import BetatronTuneMonitor from ..lattice.abstract_impl import ( BPMHScalarAggregator, @@ -16,6 +17,7 @@ BPMVScalarAggregator, RBetatronTuneArray, RBpmArray, + RChromaticityArray, RWBpmOffsetArray, RWBpmTiltScalar, RWHardwareArray, @@ -267,6 +269,11 @@ def fill_device(self, elements: list[Element]): e = e.attach(self, betatron_tune) self.add_betatron_tune_monitor(e) + elif isinstance(e, ChomaticityMonitor): + betatron_tune = RChromaticityArray(self.ring) + e = e.attach(self, betatron_tune) + self.add_chromaticity_monitor(e) + elif isinstance(e, Tune): self.add_tune_tuning(e.attach(self)) diff --git a/tests/config/EBS_chromaticity.yaml b/tests/config/EBS_chromaticity.yaml new file mode 100644 index 00000000..97e71fe8 --- /dev/null +++ b/tests/config/EBS_chromaticity.yaml @@ -0,0 +1,44 @@ +type: pyaml.accelerator +facility: ESRF +machine: sr +energy: 6e9 +simulators: + - type: pyaml.lattice.simulator + lattice: sr/lattices/ebs.mat + name: design +controls: + - type: tango.pyaml.controlsystem + tango_host: ebs-simu-2:10000 + name: live +data_folder: /data/store +devices: +- type: pyaml.diagnostics.tune_monitor + name: BETATRON_TUNE + tune_h: + type: tango.pyaml.attribute_read_only + attribute: srdiag/beam-tune/main/Qh + unit: "1" + tune_v: + type: tango.pyaml.attribute_read_only + attribute: srdiag/beam-tune/main/Qv + unit: "1" +- type: pyaml.diagnostics.chromaticity_monitor + name: KSI + betatron_tune: BETATRON_TUNE + RFfreq: RF +- type: pyaml.rf.rf_plant + name: RF + masterclock: + type: tango.pyaml.attribute + attribute: sy/ms/1/Frequency + unit: Hz + transmitters: + - type: pyaml.rf.rf_transmitter + name: RFTRA + cavities: [CAV_C05_01,CAV_C05_02,CAV_C05_03,CAV_C05_04,CAV_C05_05,CAV_C07_01,CAV_C07_02,CAV_C07_03,CAV_C07_04,CAV_C07_05,CAV_C25_01,CAV_C25_02,CAV_C25_03] + harmonic: 1 + distribution: 1 + voltage: + type: tango.pyaml.attribute + attribute: sys/ringsimulator/ebs/RfVoltage + unit: V diff --git a/tests/test_chromaticity_monitor.py b/tests/test_chromaticity_monitor.py new file mode 100644 index 00000000..1e964a2a --- /dev/null +++ b/tests/test_chromaticity_monitor.py @@ -0,0 +1,48 @@ +import numpy as np +import pytest + +from pyaml.accelerator import Accelerator +from pyaml.configuration.factory import Factory + + +def test_simulator_chromaticity_monitor(): + sr: Accelerator = Accelerator.load( + "tests/config/EBS_chromaticity.yaml", ignore_external=True + ) + sr.design.get_lattice().disable_6d() + chromaticity_monitor = sr.design.get_chromaticity_monitor("KSI") + assert ( + chromaticity_monitor.chromaticity.get()[0] + == sr.design.get_lattice().get_chrom()[0] + ) + assert ( + chromaticity_monitor.chromaticity.get()[1] + == sr.design.get_lattice().get_chrom()[1] + ) + + Factory.clear() + + +@pytest.mark.parametrize( + "install_test_package", + [{"name": "tango-pyaml", "path": "tests/dummy_cs/tango-pyaml"}], + indirect=True, +) +def test_controlsystem_chromaticity_monitor(install_test_package): + sr: Accelerator = Accelerator.load("tests/config/EBS_chromaticity.yaml") + chromaticity_monitor = sr.live.get_chromaticity_monitor("KSI") + assert np.isnan(chromaticity_monitor.chromaticity.get()[0]) + assert np.isnan(chromaticity_monitor.chromaticity.get()[1]) + chromaticity_monitor.chromaticity_measurement( + do_plot=False, + alphac=1e-4, + Sleep_between_meas=0, + Sleep_between_RFvar=0, + E_delta=1, + Max_E_delta=1, + ) + ksi = np.abs(chromaticity_monitor.chromaticity.get()) + assert abs(ksi[0]) < 1e-17 + assert abs(ksi[1]) < 1e-17 + + Factory.clear()