From 72e69fee5bc0ffb88f1e1a13f3984d4e90acd3d0 Mon Sep 17 00:00:00 2001 From: "alexandre.moutardier" Date: Mon, 8 Dec 2025 17:42:45 +0100 Subject: [PATCH 01/11] DO NOT USE : first draft that still must be debug when used with control system ... --- pyaml/common/element_holder.py | 10 + pyaml/control/abstract_impl.py | 21 ++ pyaml/control/controlsystem.py | 7 + pyaml/diagnostics/chromaticity_monitor.py | 337 ++++++++++++++++++++++ pyaml/lattice/abstract_impl.py | 21 ++ pyaml/lattice/simulator.py | 7 + tests/config/EBS_chromaticity.yaml | 43 +++ 7 files changed, 446 insertions(+) create mode 100644 pyaml/diagnostics/chromaticity_monitor.py create mode 100644 tests/config/EBS_chromaticity.yaml diff --git a/pyaml/common/element_holder.py b/pyaml/common/element_holder.py index b2571174..5aa444d7 100644 --- a/pyaml/common/element_holder.py +++ b/pyaml/common/element_holder.py @@ -9,6 +9,7 @@ from ..bpm.bpm import BPM from ..common.exception import PyAMLException from ..diagnostics.tune_monitor import BetatronTuneMonitor +from ..diagnostics.chromaticity_monitor import ChomaticityMonitor from ..magnet.cfm_magnet import CombinedFunctionMagnet from ..magnet.magnet import Magnet from ..rf.rf_plant import RFPlant @@ -180,6 +181,15 @@ def get_betatron_tune_monitor(self, name: str) -> BetatronTuneMonitor: def add_betatron_tune_monitor(self, tune_monitor: Element): self.__add(self.__DIAG, tune_monitor) + # Chromaticity monitor + + def get_chromaticity_monitor(self, name: str) -> ChomaticityMonitor: + obj = self.__get("Diagnostic", name, self.__DIAG) + return obj + + def add_chromaticity_monitor(self, chromaticity_monitor: Element): + self.__add(self.__DIAG, chromaticity_monitor) + # Tuning tools def get_tune_tuning(self, name: str) -> Tune: diff --git a/pyaml/control/abstract_impl.py b/pyaml/control/abstract_impl.py index 2ebfaf3f..e3dc03b9 100644 --- a/pyaml/control/abstract_impl.py +++ b/pyaml/control/abstract_impl.py @@ -420,3 +420,24 @@ def get(self) -> NDArray: def unit(self) -> str: return self.__tune_monitor._cfg.tune_v.unit() +# ------------------------------------------------------------------------------ + + +class RChromaticityArray(abstract.ReadFloatArray): + """ + Class providing read write access to chromaticity of a control system. + """ + + def __init__(self, chromaticity_monitor): + self.__chromaticity_monitor = chromaticity_monitor + + def _update_chromaticity_monitor(self, chromaticity_monitor): + """Use to attach the proper chromaticity_monitor and not the one used to create this instance""" + self.__chromaticity_monitor = chromaticity_monitor + + def get(self) -> NDArray: + # Return horizontal and vertical chromaticity as a NumPy array + return self.__chromaticity_monitor._last_measured + + def unit(self) -> str: + return "1" diff --git a/pyaml/control/controlsystem.py b/pyaml/control/controlsystem.py index 2da03f1d..a4226118 100644 --- a/pyaml/control/controlsystem.py +++ b/pyaml/control/controlsystem.py @@ -10,6 +10,7 @@ CSScalarAggregator, CSStrengthScalarAggregator, RBetatronTuneArray, + RChromaticityArray, RBpmArray, RWBpmOffsetArray, RWBpmTiltScalar, @@ -22,6 +23,7 @@ RWStrengthScalar, ) from ..diagnostics.tune_monitor import BetatronTuneMonitor +from ..diagnostics.chromaticity_monitor import ChomaticityMonitor from ..magnet.cfm_magnet import CombinedFunctionMagnet from ..magnet.magnet import Magnet from ..rf.rf_plant import RFPlant, RWTotalVoltage @@ -164,5 +166,10 @@ def fill_device(self, elements: list[Element]): e = e.attach(self, betatron_tune) self.add_betatron_tune_monitor(e) + elif isinstance(e, ChomaticityMonitor): + chromaticity = RChromaticityArray(e) + e = e.attach(self, chromaticity) + self.add_chromaticity_monitor(e) + elif isinstance(e, Tune): self.add_tune_tuning(e.attach(self)) diff --git a/pyaml/diagnostics/chromaticity_monitor.py b/pyaml/diagnostics/chromaticity_monitor.py new file mode 100644 index 00000000..90b732d1 --- /dev/null +++ b/pyaml/diagnostics/chromaticity_monitor.py @@ -0,0 +1,337 @@ +from ..common.abstract import ReadFloatArray +from ..common.element import Element, ElementConfigModel +from ..control.deviceaccess import DeviceAccess + +try: + from typing import Self # Python 3.11+ +except ImportError: + from typing_extensions import Self # Python 3.10 and earlier +from pydantic import ConfigDict + +from time import sleep +import numpy as np +import matplotlib.pyplot as plt +from scipy.optimize import curve_fit + +PYAMLCLASS = "ChomaticityMonitor" + + +class ConfigModel(ElementConfigModel): + model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") + + betatron_tune: str + """Name of the diagnostic pyaml device for measuring the tune""" + RFfreq: str + """Name of main RF frequency plant""" + N_step: int = 5 + """Default number of RF step during chromaticity measurment [default: 5]""" + alphac: float = 4.1819e-04 + """Default Twiss parameter alpha ???""" + E_delta: float = 0.001 + """Default variation of relative energy during chromaticity measurment : f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac [default: 0.001]""" + Max_E_delta: float = 0.004 + """Maximum autorized variation of relative energy during chromaticity measurment [default: 0.004]""" + N_tune_meas: int = 1 + """Default number of tune measurment per RF frequency [default: 1]""" + Sleep_between_meas: float = 2.0 + """Default time sleep between two tune measurment [default: 2.0]""" + Sleep_between_RFvar: float = 5.0 + """Default time sleep after RF frequency variation [default: 5.0]""" + fit_method: str = "lin" + """Default fitting method used for chromaticity between "lin" for linear or "quad" for quadratique [default: "lin"]""" + + +class ChomaticityMonitor(Element): + """ + Class providing access to a betatron tune monitor + of a physical or simulated lattice. + The monitor provides horizontal and vertical betatron tune measurements. + """ + + def __init__(self, cfg: ConfigModel): + """ + Construct a BetatronTuneMonitor. + Parameters + ---------- + cfg : ConfigModel + Configuration for the ChromaticityMonitor, including betatron tune monitor, RF plant, and defaults parameters. + """ + + super().__init__(cfg.name) + self._cfg = cfg + self.__chromaticity = None + self._last_measured = np.array([np.nan, np.nan]) + self._peer = None + + @property + def chromaticity(self) -> ReadFloatArray: + self.check_peer() + return self.__chromaticity + + def attach(self, peer, chromaticity: ReadFloatArray) -> Self: + obj = self.__class__(self._cfg) + chromaticity._update_chromaticity_monitor(obj) + obj.__chromaticity = chromaticity + obj._peer = peer + return obj + + def chromaticity_measurement(self, + N_step: int=None, + alphac: float=None, + E_delta: float=None, + Max_E_delta: float=None, + N_tune_meas: int=None, + Sleep_between_meas: float=None, + Sleep_between_RFvar: float=None, + fit_method: str=None, + do_plot: bool=False): + """ + Main function for chromaticity measurment + + N_step : int + Number of RF step during chromaticity measurment. + If defined, eraised defalt and configation files values. + alphac : float + Default Twiss parameter alpha ??? + If defined, eraised defalt and configation files values. + E_delta : float + Default variation of relative energy during chromaticity measurment : f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac + If defined, eraised defalt and configation files values. + Max_E_delta : float + Maximum autorized variation of relative energy during chromaticity measurment + If defined, eraised defalt and configation files values. + N_tune_meas : int + Default number of tune measurment per RF frequency. + If defined, eraised defalt and configation files values. + Sleep_between_meas : float + Default time sleep between two tune measurment. + If defined, eraised defalt and configation files values. + Sleep_between_RFvar: float + Default time sleep after RF frequency variation. + If defined, eraised defalt and configation files values. + fit_method: str + Default fitting method used for chromaticity between "lin" for linear or "quad" for quadratique. + If defined, eraised defalt and configation files values. + do_plot : bool + Do you want to plot the fittinf results ? + """ + if N_step is None : + N_step = self._cfg.N_step + if alphac is None : + alphac = self._cfg.alphac + if E_delta is None : + E_delta = self._cfg.E_delta + if Max_E_delta is None : + Max_E_delta = self._cfg.Max_E_delta + if N_tune_meas is None : + N_tune_meas = self._cfg.N_tune_meas + if Sleep_between_meas is None : + Sleep_between_meas = self._cfg.Sleep_between_meas + if Sleep_between_RFvar is None : + Sleep_between_RFvar = self._cfg.Sleep_between_RFvar + if fit_method is None : + fit_method = self._cfg.fit_method + + if abs(E_delta) > abs(Max_E_delta): + # TODO : Add logger to warm that E_delta is to large + return np.array([None, None]) + + Delta, NuX, NuY = self.measure_tune_response(N_step=N_step, alphac=alphac, E_delta=E_delta, N_tune_meas=N_tune_meas, Sleep_between_meas=Sleep_between_meas, Sleep_between_RFvar=Sleep_between_RFvar) + chrom = self.fit_chromaticity(Delta, NuX, NuY, fit_method, do_plot) + return(chrom) + + def measure_tune_response(self, + N_step: int, + alphac: float, + E_delta: float, + N_tune_meas: int, + Sleep_between_meas: float, + Sleep_between_RFvar: float): + """ + Main function for chromaticity measurment + + N_step : int + Number of RF step during chromaticity measurment. + alphac : float + Default Twiss parameter alpha ??? + E_delta : float + Default variation of relative energy during chromaticity measurment : f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac + N_tune_meas : int + Default number of tune measurment per RF frequency. + Sleep_between_meas : float + Default time sleep between two tune measurment. + Sleep_between_RFvar: float + Default time sleep after RF frequency variation. + + """ + tune = self._peer.get_betatron_tune_monitor(self._cfg.betatron_tune) + rf = self._peer.get_rf_plant(self._cfg.RFfreq) + + f0 = rf.frequency.get() + delta_f = f0 * E_delta * alphac + Delta = np.linspace(f0 - delta_f, f0 + delta_f, N_step) + + NuY = np.zeros((N_step, N_tune_meas)) + NuX = np.zeros((N_step, N_tune_meas)) + + try: # ensure that, even if there is an issus, the script will finish by reseting the RF frequency + for i, f in enumerate(Delta): + rf.frequency.set_and_wait(f) + sleep(Sleep_between_RFvar) + + for j in range(N_tune_meas): + NuX[i,j], NuY[i,j] = tune.tune.get() + sleep(Sleep_between_meas) + except: + # TODO : add proper exception + print("NOK") + finally: + rf.frequency.set_and_wait(f0) + return(Delta, NuX, NuY) + + + def fit_chromaticity(self, Delta, NuX, NuY, method, do_plot): + """ + Compute chromaticity from measurement data. + + Parameters + ---------- + delta : array of float + Relative energy (delta) variation steps done. + NuX : array of float + Horizontal tune measured. + NuZ : array of float + Vertical tune measured. + method : {"lin" or "quad"}, optional + "lin" uses a linear fit and "quad" a 2nd order polynomial. + The default is "lin". + plot : bool, optional + If True, plot the fit. + The default is False. + + Returns + ------- + chro : array + Array with horizontal and veritical chromaticity. + + """ + Delta = -Delta + chro = [] + N_step_delta = len(Delta) + for i, Nu in enumerate([NuX, NuY]): + if N_step_delta%2 == 0: + tune0 = np.mean(Nu[N_step_delta//2:N_step_delta//2+1,:]) + else: + tune0 = np.mean(Nu[N_step_delta//2,:]) + + dtune = np.mean(Nu[:,:],1) - tune0 + + if method=="lin": + def linear_fit(x, a, b): + return a*x + b + popt_lin, _ = curve_fit(linear_fit, Delta, dtune) + chro.append(popt_lin[0]) + elif method=="quad": + def quad_fit(x, a, b, c): + return a*x**2 + b*x + c + popt_quad, _ = curve_fit(quad_fit, Delta, dtune) + chro.append(popt_quad[1]) + + if do_plot: + plt.figure() + plt.scatter(Delta, dtune) + if method=="lin": + plt.plot(Delta, + linear_fit(Delta, popt_lin[0], popt_lin[1]), + '--', + label="fit: {:.4f}x+{:.8f}".format(*popt_lin)) + elif method=="quad": + plt.plot(Delta, + quad_fit(Delta, popt_quad[0], popt_quad[1], popt_quad[2]), + '--', + label="fit: {:.4f}x2+{:.4f}x+{:.4f}".format(*popt_quad)) + plt.xlabel('$\\delta$') + if i == 0: + plt.title("Horizontal Chromaticity") + plt.ylabel('$\\delta Q_x$') + else: + plt.title("Vertical Chromaticity") + plt.ylabel('$\\delta Q_y$') + plt.legend() + + self._last_measured = np.array(chro) + + return self._last_measured + + + + + +# exit() +# from ..common.element import Element, ElementConfigModel +# from ..common.abstract import ReadFloatArray +# from ..control.deviceaccess import DeviceAccess +# from .tune_monitor import BetatronTuneMonitor +# try: +# from typing import Self # Python 3.11+ +# except ImportError: +# from typing_extensions import Self # Python 3.10 and earlier +# from pydantic import ConfigDict +# from scipy.optimize import curve_fit +# import matplotlib.pyplot as plt + +# PYAMLCLASS = "ChromaticityMonitor" + +# class ConfigModel(ElementConfigModel): + +# model_config = ConfigDict(arbitrary_types_allowed=True,extra="forbid") +# # tune_monitor: str +# # """Betatron tune""" + +# tune_h: DeviceAccess +# """Horizontal betatron tune""" +# tune_v: DeviceAccess +# """Vertical betatron tune""" + + +# class ChromaticityMonitor(Element): +# """ +# Class providing measurement of chromaticity using betatron tune monitor of a physical or simulated lattice. +# The monitor provides horizontal and vertical chromaticity (ksi) measurements. +# """ + +# def __init__(self, cfg: ConfigModel): +# """ +# Construct a ChromaticityMonitor. +# Parameters +# ---------- +# cfg : ConfigModel +# Configuration for the ChromaticityMonitor and BetatronTuneMonitor. +# """ + +# super().__init__(cfg.name) +# self._cfg = cfg +# self.__RFfreq = None +# # self.__ksi = None + +# @property +# def RFfreq(self) -> ReadFloatArray: +# self.check_peer() +# return self.__RFfreq + +# # @property +# # def Ksi(self) -> ReadFloatArray: +# # self.check_peer() +# # return self.__ksi + +# # @property +# # def chromaticity(self) -> ReadFloatArray: +# # return self.Ksi + +# def attach(self, peer, RFfreq: ReadFloatArray) -> Self: +# obj = self.__class__(self._cfg) +# obj.__RFfreq = RFfreq +# obj._peer = peer +# return obj + +# # def \ No newline at end of file diff --git a/pyaml/lattice/abstract_impl.py b/pyaml/lattice/abstract_impl.py index 6651e18d..2facd47f 100644 --- a/pyaml/lattice/abstract_impl.py +++ b/pyaml/lattice/abstract_impl.py @@ -497,3 +497,24 @@ def get(self) -> float: def unit(self) -> str: return "1" + +# ------------------------------------------------------------------------------ + + +class RChromaticityArray(abstract.ReadFloatArray): + """ + Class providing read-only access to the chromaticity of a ring. + """ + + def __init__(self, ring: at.Lattice): + self.__ring = ring + + def _update_chromaticity_monitor(self, chromaticity_monitor): + """Use to attach the rigth object in control.abstract_impl.RChromaticityArray. Nothing needed here""" + pass + + def get(self) -> float: + return self.__ring.get_chrom()[:2] + + def unit(self) -> str: + return "1" diff --git a/pyaml/lattice/simulator.py b/pyaml/lattice/simulator.py index 1438dfd2..c9eb23df 100644 --- a/pyaml/lattice/simulator.py +++ b/pyaml/lattice/simulator.py @@ -10,11 +10,13 @@ from ..common.exception import PyAMLException from ..configuration import get_root_folder from ..diagnostics.tune_monitor import BetatronTuneMonitor +from ..diagnostics.chromaticity_monitor import ChomaticityMonitor from ..lattice.abstract_impl import ( BPMHScalarAggregator, BPMScalarAggregator, BPMVScalarAggregator, RBetatronTuneArray, + RChromaticityArray, RBpmArray, RWBpmOffsetArray, RWBpmTiltScalar, @@ -204,6 +206,11 @@ def fill_device(self, elements: list[Element]): e = e.attach(self, betatron_tune) self.add_betatron_tune_monitor(e) + elif isinstance(e, ChomaticityMonitor): + betatron_tune = RChromaticityArray(self.ring) + e = e.attach(self, betatron_tune) + self.add_chromaticity_monitor(e) + elif isinstance(e, Tune): self.add_tune_tuning(e.attach(self)) diff --git a/tests/config/EBS_chromaticity.yaml b/tests/config/EBS_chromaticity.yaml new file mode 100644 index 00000000..7caef7cb --- /dev/null +++ b/tests/config/EBS_chromaticity.yaml @@ -0,0 +1,43 @@ +type: pyaml.accelerator +name: sr +energy: 6e9 +simulators: + - type: pyaml.lattice.simulator + lattice: sr/lattices/ebs.mat + name: design +controls: + - type: tango.pyaml.controlsystem + tango_host: ebs-simu-3:10000 + name: live +data_folder: /data/store +devices: +- type: pyaml.diagnostics.tune_monitor + name: BETATRON_TUNE + tune_h: + type: tango.pyaml.attribute_read_only + attribute: srdiag/tune/tune_h + unit: mm + tune_v: + type: tango.pyaml.attribute_read_only + attribute: srdiag/tune/tune_v + unit: mm +- type: pyaml.diagnostics.chromaticity_monitor + name: KSI + betatron_tune: BETATRON_TUNE + RFfreq: RF +- type: pyaml.rf.rf_plant + name: RF + masterclock: + type: tango.pyaml.attribute + attribute: sy/ms/1/Frequency + unit: Hz + transmitters: + - type: pyaml.rf.rf_transmitter + name: RFTRA + cavities: [CAV_C05_01,CAV_C05_02,CAV_C05_03,CAV_C05_04,CAV_C05_05,CAV_C07_01,CAV_C07_02,CAV_C07_03,CAV_C07_04,CAV_C07_05,CAV_C25_01,CAV_C25_02,CAV_C25_03] + harmonic: 1 + distribution: 1 + voltage: + type: tango.pyaml.attribute + attribute: sys/ringsimulator/ebs/RfVoltage + unit: V From 551492e4dde92cda16b5554faf95be0101d62938 Mon Sep 17 00:00:00 2001 From: Operateur Date: Tue, 9 Dec 2025 03:37:17 +0100 Subject: [PATCH 02/11] Update of chromaticity measurement after test on SOLEIL synchrotron ring --- pyaml/diagnostics/chromaticity_monitor.py | 155 ++++++---------------- 1 file changed, 42 insertions(+), 113 deletions(-) diff --git a/pyaml/diagnostics/chromaticity_monitor.py b/pyaml/diagnostics/chromaticity_monitor.py index 90b732d1..824a38b4 100644 --- a/pyaml/diagnostics/chromaticity_monitor.py +++ b/pyaml/diagnostics/chromaticity_monitor.py @@ -84,7 +84,7 @@ def chromaticity_measurement(self, Sleep_between_meas: float=None, Sleep_between_RFvar: float=None, fit_method: str=None, - do_plot: bool=False): + do_plot: bool=None): """ Main function for chromaticity measurment @@ -131,13 +131,14 @@ def chromaticity_measurement(self, Sleep_between_RFvar = self._cfg.Sleep_between_RFvar if fit_method is None : fit_method = self._cfg.fit_method - + if do_plot is None : + do_plot = self._cfg.do_plot if abs(E_delta) > abs(Max_E_delta): # TODO : Add logger to warm that E_delta is to large return np.array([None, None]) - Delta, NuX, NuY = self.measure_tune_response(N_step=N_step, alphac=alphac, E_delta=E_delta, N_tune_meas=N_tune_meas, Sleep_between_meas=Sleep_between_meas, Sleep_between_RFvar=Sleep_between_RFvar) - chrom = self.fit_chromaticity(Delta, NuX, NuY, fit_method, do_plot) + delta, NuX, NuY = self.measure_tune_response(N_step=N_step, alphac=alphac, E_delta=E_delta, N_tune_meas=N_tune_meas, Sleep_between_meas=Sleep_between_meas, Sleep_between_RFvar=Sleep_between_RFvar) + chrom = self.fit_chromaticity(delta=delta, NuX=NuX, NuY=NuY, method=fit_method, do_plot=do_plot) return(chrom) def measure_tune_response(self, @@ -168,15 +169,17 @@ def measure_tune_response(self, rf = self._peer.get_rf_plant(self._cfg.RFfreq) f0 = rf.frequency.get() - delta_f = f0 * E_delta * alphac - Delta = np.linspace(f0 - delta_f, f0 + delta_f, N_step) + + delta = np.linspace(-E_delta, E_delta, N_step) + delta_frec = delta * alphac * f0 NuY = np.zeros((N_step, N_tune_meas)) NuX = np.zeros((N_step, N_tune_meas)) try: # ensure that, even if there is an issus, the script will finish by reseting the RF frequency - for i, f in enumerate(Delta): - rf.frequency.set_and_wait(f) + for i, f in enumerate(delta_frec): + # TODO : Use set_and_wait once it is implemented ! (and remove Sleep_between_RFvar ?) + rf.frequency.set(f0 + f) sleep(Sleep_between_RFvar) for j in range(N_tune_meas): @@ -186,11 +189,13 @@ def measure_tune_response(self, # TODO : add proper exception print("NOK") finally: - rf.frequency.set_and_wait(f0) - return(Delta, NuX, NuY) + # TODO : Use set_and_wait once it is implemented ! + rf.frequency.set(f0) + return(delta, NuX, NuY) - def fit_chromaticity(self, Delta, NuX, NuY, method, do_plot): + + def fit_chromaticity(self, delta, NuX, NuY, method, do_plot): """ Compute chromaticity from measurement data. @@ -202,12 +207,11 @@ def fit_chromaticity(self, Delta, NuX, NuY, method, do_plot): Horizontal tune measured. NuZ : array of float Vertical tune measured. - method : {"lin" or "quad"}, optional + method : {"lin" or "quad"} "lin" uses a linear fit and "quad" a 2nd order polynomial. - The default is "lin". plot : bool, optional If True, plot the fit. - The default is False. + Plots are made but not shown. Use plt.show() to show it. Returns ------- @@ -215,123 +219,48 @@ def fit_chromaticity(self, Delta, NuX, NuY, method, do_plot): Array with horizontal and veritical chromaticity. """ - Delta = -Delta + delta = -delta chro = [] - N_step_delta = len(Delta) + N_step_delta = len(delta) for i, Nu in enumerate([NuX, NuY]): - if N_step_delta%2 == 0: - tune0 = np.mean(Nu[N_step_delta//2:N_step_delta//2+1,:]) - else: - tune0 = np.mean(Nu[N_step_delta//2,:]) + # if N_step_delta%2 == 0: + # tune0 = np.mean((Nu[N_step_delta//2-1,:] + Nu[N_step_delta//2,:])/2.) + # else: + # tune0 = np.mean(Nu[N_step_delta//2,:]) - dtune = np.mean(Nu[:,:],1) - tune0 + dtune = np.mean(Nu[:,:],1) if method=="lin": def linear_fit(x, a, b): return a*x + b - popt_lin, _ = curve_fit(linear_fit, Delta, dtune) + popt_lin, _ = curve_fit(linear_fit, delta, dtune) chro.append(popt_lin[0]) elif method=="quad": def quad_fit(x, a, b, c): return a*x**2 + b*x + c - popt_quad, _ = curve_fit(quad_fit, Delta, dtune) + popt_quad, _ = curve_fit(quad_fit, delta, dtune) chro.append(popt_quad[1]) if do_plot: - plt.figure() - plt.scatter(Delta, dtune) + fig = plt.figure("Chromaticity_measurement") + ax = fig.add_subplot(2,1,1+i) + ax.scatter(delta, dtune) if method=="lin": - plt.plot(Delta, - linear_fit(Delta, popt_lin[0], popt_lin[1]), - '--', - label="fit: {:.4f}x+{:.8f}".format(*popt_lin)) + ax.plot(delta, + linear_fit(delta, popt_lin[0], popt_lin[1]), + '--') + title = "{:.4f}dp/p+{:.8f}".format(*popt_lin) elif method=="quad": - plt.plot(Delta, - quad_fit(Delta, popt_quad[0], popt_quad[1], popt_quad[2]), - '--', - label="fit: {:.4f}x2+{:.4f}x+{:.4f}".format(*popt_quad)) - plt.xlabel('$\\delta$') - if i == 0: - plt.title("Horizontal Chromaticity") - plt.ylabel('$\\delta Q_x$') - else: - plt.title("Vertical Chromaticity") - plt.ylabel('$\\delta Q_y$') - plt.legend() - + ax.plot(delta, + quad_fit(delta, popt_quad[0], popt_quad[1], popt_quad[2]), + '--',) + title="{:.4f}(dp/p)$^2$+{:.4f}dp/p+{:.4f}".format(*popt_quad) + ax.set_title(title) + ax.set_xlabel('Momentum Shift, dp/p [%]') + ax.set_ylabel("%s Tune"%["Horizontal", "Vertical"][i]) + # ax.legend() self._last_measured = np.array(chro) return self._last_measured - - - -# exit() -# from ..common.element import Element, ElementConfigModel -# from ..common.abstract import ReadFloatArray -# from ..control.deviceaccess import DeviceAccess -# from .tune_monitor import BetatronTuneMonitor -# try: -# from typing import Self # Python 3.11+ -# except ImportError: -# from typing_extensions import Self # Python 3.10 and earlier -# from pydantic import ConfigDict -# from scipy.optimize import curve_fit -# import matplotlib.pyplot as plt - -# PYAMLCLASS = "ChromaticityMonitor" - -# class ConfigModel(ElementConfigModel): - -# model_config = ConfigDict(arbitrary_types_allowed=True,extra="forbid") -# # tune_monitor: str -# # """Betatron tune""" - -# tune_h: DeviceAccess -# """Horizontal betatron tune""" -# tune_v: DeviceAccess -# """Vertical betatron tune""" - - -# class ChromaticityMonitor(Element): -# """ -# Class providing measurement of chromaticity using betatron tune monitor of a physical or simulated lattice. -# The monitor provides horizontal and vertical chromaticity (ksi) measurements. -# """ - -# def __init__(self, cfg: ConfigModel): -# """ -# Construct a ChromaticityMonitor. -# Parameters -# ---------- -# cfg : ConfigModel -# Configuration for the ChromaticityMonitor and BetatronTuneMonitor. -# """ - -# super().__init__(cfg.name) -# self._cfg = cfg -# self.__RFfreq = None -# # self.__ksi = None - -# @property -# def RFfreq(self) -> ReadFloatArray: -# self.check_peer() -# return self.__RFfreq - -# # @property -# # def Ksi(self) -> ReadFloatArray: -# # self.check_peer() -# # return self.__ksi - -# # @property -# # def chromaticity(self) -> ReadFloatArray: -# # return self.Ksi - -# def attach(self, peer, RFfreq: ReadFloatArray) -> Self: -# obj = self.__class__(self._cfg) -# obj.__RFfreq = RFfreq -# obj._peer = peer -# return obj - -# # def \ No newline at end of file From 6e5e3bc5b121ca5dcbd45d4508c2f9e9b6871c01 Mon Sep 17 00:00:00 2001 From: "alexandre.moutardier" Date: Thu, 11 Dec 2025 17:34:34 +0100 Subject: [PATCH 03/11] Add unitary test for chromaticity measurement and correct error ('do_plot' as no default value un cfg) --- pyaml/diagnostics/chromaticity_monitor.py | 2 -- tests/test_chromaticity_monitor.py | 37 +++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) create mode 100644 tests/test_chromaticity_monitor.py diff --git a/pyaml/diagnostics/chromaticity_monitor.py b/pyaml/diagnostics/chromaticity_monitor.py index 824a38b4..3cd7f0b4 100644 --- a/pyaml/diagnostics/chromaticity_monitor.py +++ b/pyaml/diagnostics/chromaticity_monitor.py @@ -131,8 +131,6 @@ def chromaticity_measurement(self, Sleep_between_RFvar = self._cfg.Sleep_between_RFvar if fit_method is None : fit_method = self._cfg.fit_method - if do_plot is None : - do_plot = self._cfg.do_plot if abs(E_delta) > abs(Max_E_delta): # TODO : Add logger to warm that E_delta is to large return np.array([None, None]) diff --git a/tests/test_chromaticity_monitor.py b/tests/test_chromaticity_monitor.py new file mode 100644 index 00000000..9d60ba70 --- /dev/null +++ b/tests/test_chromaticity_monitor.py @@ -0,0 +1,37 @@ +import pytest + +from pyaml.accelerator import Accelerator +from pyaml.configuration.factory import Factory +import numpy as np + +@pytest.mark.parametrize( + "install_test_package", + [{"name": "tango-pyaml", "path": "tests/dummy_cs/tango-pyaml"}], + indirect=True, +) +def test_simulator_chromaticity_monitor(install_test_package): + sr: Accelerator = Accelerator.load("tests/config/EBS_chromaticity.yaml") + sr.design.get_lattice().disable_6d() + chromaticity_monitor = sr.design.get_chromaticity_monitor("KSI") + assert chromaticity_monitor.chromaticity.get()[0] == sr.design.get_lattice().get_chrom()[0] + assert chromaticity_monitor.chromaticity.get()[1] == sr.design.get_lattice().get_chrom()[1] + + Factory.clear() + + +@pytest.mark.parametrize( + "install_test_package", + [{"name": "tango-pyaml", "path": "tests/dummy_cs/tango-pyaml"}], + indirect=True, +) +def test_controlsystem_chromaticity_monitor(install_test_package): + sr: Accelerator = Accelerator.load("tests/config/EBS_chromaticity.yaml") + chromaticity_monitor = sr.live.get_chromaticity_monitor("KSI") + assert np.isnan(chromaticity_monitor.chromaticity.get()[0]) + assert np.isnan(chromaticity_monitor.chromaticity.get()[1]) + chromaticity_monitor.chromaticity_measurement(do_plot=False, Sleep_between_meas=0, Sleep_between_RFvar=0) + ksi = np.abs(chromaticity_monitor.chromaticity.get()) + assert ksi[0] < 1e-17 + assert ksi[1] < 1e-17 + + Factory.clear() From f5f78320c2e2c21b74a4a86d99b97395cad302c1 Mon Sep 17 00:00:00 2001 From: "alexandre.moutardier" Date: Fri, 16 Jan 2026 16:06:07 +0100 Subject: [PATCH 04/11] reformating ruff (ruff check . --fix) --- pyaml/common/element_holder.py | 2 +- pyaml/control/controlsystem.py | 4 +-- pyaml/diagnostics/chromaticity_monitor.py | 40 +++++++++++------------ pyaml/lattice/simulator.py | 4 +-- tests/test_chromaticity_monitor.py | 3 +- 5 files changed, 27 insertions(+), 26 deletions(-) diff --git a/pyaml/common/element_holder.py b/pyaml/common/element_holder.py index 5db0c34d..f462a325 100644 --- a/pyaml/common/element_holder.py +++ b/pyaml/common/element_holder.py @@ -8,8 +8,8 @@ from ..arrays.magnet_array import MagnetArray from ..bpm.bpm import BPM from ..common.exception import PyAMLException -from ..diagnostics.tune_monitor import BetatronTuneMonitor from ..diagnostics.chromaticity_monitor import ChomaticityMonitor +from ..diagnostics.tune_monitor import BetatronTuneMonitor from ..magnet.cfm_magnet import CombinedFunctionMagnet from ..magnet.magnet import Magnet from ..rf.rf_plant import RFPlant diff --git a/pyaml/control/controlsystem.py b/pyaml/control/controlsystem.py index a4226118..d4209163 100644 --- a/pyaml/control/controlsystem.py +++ b/pyaml/control/controlsystem.py @@ -10,8 +10,8 @@ CSScalarAggregator, CSStrengthScalarAggregator, RBetatronTuneArray, - RChromaticityArray, RBpmArray, + RChromaticityArray, RWBpmOffsetArray, RWBpmTiltScalar, RWHardwareArray, @@ -22,8 +22,8 @@ RWStrengthArray, RWStrengthScalar, ) -from ..diagnostics.tune_monitor import BetatronTuneMonitor from ..diagnostics.chromaticity_monitor import ChomaticityMonitor +from ..diagnostics.tune_monitor import BetatronTuneMonitor from ..magnet.cfm_magnet import CombinedFunctionMagnet from ..magnet.magnet import Magnet from ..rf.rf_plant import RFPlant, RWTotalVoltage diff --git a/pyaml/diagnostics/chromaticity_monitor.py b/pyaml/diagnostics/chromaticity_monitor.py index 3cd7f0b4..01f979d0 100644 --- a/pyaml/diagnostics/chromaticity_monitor.py +++ b/pyaml/diagnostics/chromaticity_monitor.py @@ -6,11 +6,11 @@ from typing import Self # Python 3.11+ except ImportError: from typing_extensions import Self # Python 3.10 and earlier -from pydantic import ConfigDict - from time import sleep -import numpy as np + import matplotlib.pyplot as plt +import numpy as np +from pydantic import ConfigDict from scipy.optimize import curve_fit PYAMLCLASS = "ChomaticityMonitor" @@ -75,13 +75,13 @@ def attach(self, peer, chromaticity: ReadFloatArray) -> Self: obj._peer = peer return obj - def chromaticity_measurement(self, - N_step: int=None, - alphac: float=None, - E_delta: float=None, - Max_E_delta: float=None, - N_tune_meas: int=None, - Sleep_between_meas: float=None, + def chromaticity_measurement(self, + N_step: int=None, + alphac: float=None, + E_delta: float=None, + Max_E_delta: float=None, + N_tune_meas: int=None, + Sleep_between_meas: float=None, Sleep_between_RFvar: float=None, fit_method: str=None, do_plot: bool=None): @@ -139,12 +139,12 @@ def chromaticity_measurement(self, chrom = self.fit_chromaticity(delta=delta, NuX=NuX, NuY=NuY, method=fit_method, do_plot=do_plot) return(chrom) - def measure_tune_response(self, - N_step: int, - alphac: float, - E_delta: float, - N_tune_meas: int, - Sleep_between_meas: float, + def measure_tune_response(self, + N_step: int, + alphac: float, + E_delta: float, + N_tune_meas: int, + Sleep_between_meas: float, Sleep_between_RFvar: float): """ Main function for chromaticity measurment @@ -165,7 +165,7 @@ def measure_tune_response(self, """ tune = self._peer.get_betatron_tune_monitor(self._cfg.betatron_tune) rf = self._peer.get_rf_plant(self._cfg.RFfreq) - + f0 = rf.frequency.get() delta = np.linspace(-E_delta, E_delta, N_step) @@ -190,7 +190,7 @@ def measure_tune_response(self, # TODO : Use set_and_wait once it is implemented ! rf.frequency.set(f0) - return(delta, NuX, NuY) + return(delta, NuX, NuY) def fit_chromaticity(self, delta, NuX, NuY, method, do_plot): @@ -247,9 +247,9 @@ def quad_fit(x, a, b, c): ax.plot(delta, linear_fit(delta, popt_lin[0], popt_lin[1]), '--') - title = "{:.4f}dp/p+{:.8f}".format(*popt_lin) + title = "{:.4f}dp/p+{:.8f}".format(*popt_lin) elif method=="quad": - ax.plot(delta, + ax.plot(delta, quad_fit(delta, popt_quad[0], popt_quad[1], popt_quad[2]), '--',) title="{:.4f}(dp/p)$^2$+{:.4f}dp/p+{:.4f}".format(*popt_quad) diff --git a/pyaml/lattice/simulator.py b/pyaml/lattice/simulator.py index c9eb23df..ba6f3d51 100644 --- a/pyaml/lattice/simulator.py +++ b/pyaml/lattice/simulator.py @@ -9,15 +9,15 @@ from ..common.element_holder import ElementHolder from ..common.exception import PyAMLException from ..configuration import get_root_folder -from ..diagnostics.tune_monitor import BetatronTuneMonitor from ..diagnostics.chromaticity_monitor import ChomaticityMonitor +from ..diagnostics.tune_monitor import BetatronTuneMonitor from ..lattice.abstract_impl import ( BPMHScalarAggregator, BPMScalarAggregator, BPMVScalarAggregator, RBetatronTuneArray, - RChromaticityArray, RBpmArray, + RChromaticityArray, RWBpmOffsetArray, RWBpmTiltScalar, RWHardwareArray, diff --git a/tests/test_chromaticity_monitor.py b/tests/test_chromaticity_monitor.py index 9d60ba70..e4d082a0 100644 --- a/tests/test_chromaticity_monitor.py +++ b/tests/test_chromaticity_monitor.py @@ -1,8 +1,9 @@ +import numpy as np import pytest from pyaml.accelerator import Accelerator from pyaml.configuration.factory import Factory -import numpy as np + @pytest.mark.parametrize( "install_test_package", From 872752c91491464344e47607c28a923a3b947ac6 Mon Sep 17 00:00:00 2001 From: "alexandre.moutardier" Date: Fri, 16 Jan 2026 16:17:17 +0100 Subject: [PATCH 05/11] update change name -> machine and facility --- tests/config/EBS_chromaticity.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/config/EBS_chromaticity.yaml b/tests/config/EBS_chromaticity.yaml index 7caef7cb..eb65395f 100644 --- a/tests/config/EBS_chromaticity.yaml +++ b/tests/config/EBS_chromaticity.yaml @@ -1,5 +1,6 @@ type: pyaml.accelerator -name: sr +facility: ESRF +machine: sr energy: 6e9 simulators: - type: pyaml.lattice.simulator From c68564d3f22722e14837dbbf5bb8f103c56a4a6c Mon Sep 17 00:00:00 2001 From: "alexandre.moutardier" Date: Tue, 20 Jan 2026 17:24:13 +0100 Subject: [PATCH 06/11] correct unitary test --- tests/test_chromaticity_monitor.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/tests/test_chromaticity_monitor.py b/tests/test_chromaticity_monitor.py index e4d082a0..c18f97b6 100644 --- a/tests/test_chromaticity_monitor.py +++ b/tests/test_chromaticity_monitor.py @@ -5,13 +5,8 @@ from pyaml.configuration.factory import Factory -@pytest.mark.parametrize( - "install_test_package", - [{"name": "tango-pyaml", "path": "tests/dummy_cs/tango-pyaml"}], - indirect=True, -) -def test_simulator_chromaticity_monitor(install_test_package): - sr: Accelerator = Accelerator.load("tests/config/EBS_chromaticity.yaml") +def test_simulator_chromaticity_monitor(): + sr: Accelerator = Accelerator.load("tests/config/EBS_chromaticity.yaml", ignore_external=True) sr.design.get_lattice().disable_6d() chromaticity_monitor = sr.design.get_chromaticity_monitor("KSI") assert chromaticity_monitor.chromaticity.get()[0] == sr.design.get_lattice().get_chrom()[0] @@ -30,9 +25,9 @@ def test_controlsystem_chromaticity_monitor(install_test_package): chromaticity_monitor = sr.live.get_chromaticity_monitor("KSI") assert np.isnan(chromaticity_monitor.chromaticity.get()[0]) assert np.isnan(chromaticity_monitor.chromaticity.get()[1]) - chromaticity_monitor.chromaticity_measurement(do_plot=False, Sleep_between_meas=0, Sleep_between_RFvar=0) + chromaticity_monitor.chromaticity_measurement(do_plot=False, Sleep_between_meas=0, Sleep_between_RFvar=0, E_delta=1, Max_E_delta=1) ksi = np.abs(chromaticity_monitor.chromaticity.get()) - assert ksi[0] < 1e-17 - assert ksi[1] < 1e-17 + assert abs(ksi[0]) < 1e-17 + assert abs(ksi[1]) < 1e-17 Factory.clear() From 561024d1e9cfd1f6787d254edc435ea6117e7799 Mon Sep 17 00:00:00 2001 From: PONS Date: Thu, 22 Jan 2026 09:07:15 +0100 Subject: [PATCH 07/11] Update yaml to make it run on EBS VA --- tests/config/EBS_chromaticity.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/config/EBS_chromaticity.yaml b/tests/config/EBS_chromaticity.yaml index eb65395f..97e71fe8 100644 --- a/tests/config/EBS_chromaticity.yaml +++ b/tests/config/EBS_chromaticity.yaml @@ -8,7 +8,7 @@ simulators: name: design controls: - type: tango.pyaml.controlsystem - tango_host: ebs-simu-3:10000 + tango_host: ebs-simu-2:10000 name: live data_folder: /data/store devices: @@ -16,12 +16,12 @@ devices: name: BETATRON_TUNE tune_h: type: tango.pyaml.attribute_read_only - attribute: srdiag/tune/tune_h - unit: mm + attribute: srdiag/beam-tune/main/Qh + unit: "1" tune_v: type: tango.pyaml.attribute_read_only - attribute: srdiag/tune/tune_v - unit: mm + attribute: srdiag/beam-tune/main/Qv + unit: "1" - type: pyaml.diagnostics.chromaticity_monitor name: KSI betatron_tune: BETATRON_TUNE From 9c09f046755c4df6540f6e8edcc0c9c95f0c2fae Mon Sep 17 00:00:00 2001 From: PONS Date: Thu, 22 Jan 2026 09:30:00 +0100 Subject: [PATCH 08/11] Fix formating, undefined mcf and plot --- pyaml/diagnostics/chromaticity_monitor.py | 265 +++++++++++++--------- tests/config/EBSTune.yaml | 3 +- 2 files changed, 161 insertions(+), 107 deletions(-) diff --git a/pyaml/diagnostics/chromaticity_monitor.py b/pyaml/diagnostics/chromaticity_monitor.py index 01f979d0..de34e6b5 100644 --- a/pyaml/diagnostics/chromaticity_monitor.py +++ b/pyaml/diagnostics/chromaticity_monitor.py @@ -1,6 +1,6 @@ from ..common.abstract import ReadFloatArray from ..common.element import Element, ElementConfigModel -from ..control.deviceaccess import DeviceAccess +from ..common.exception import PyAMLException try: from typing import Self # Python 3.11+ @@ -19,42 +19,65 @@ class ConfigModel(ElementConfigModel): model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") + """ + Chomaticity measurement + + Parameters + ---------- betatron_tune: str - """Name of the diagnostic pyaml device for measuring the tune""" + Name of the diagnostic pyaml device for measuring the tune RFfreq: str - """Name of main RF frequency plant""" + Name of main RF frequency plant N_step: int = 5 - """Default number of RF step during chromaticity measurment [default: 5]""" - alphac: float = 4.1819e-04 - """Default Twiss parameter alpha ???""" + Default number of RF step during chromaticity + measurment [default: 5] + alphac: float | None = None + Moment compaction factor + E_delta: float + Default variation of relative energy during chromaticity measurment: + f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac + [default: 0.001] + Max_E_delta: float + Maximum autorized variation of relative energy during chromaticity + measurment [default: 0.004] + N_tune_meas: int + Default number of tune measurment per RF frequency [default: 1] + Sleep_between_meas: float + Default time sleep between two tune measurment [default: 2.0] + Sleep_between_RFvar: float + Default time sleep after RF frequency variation [default: 5.0] + fit_method: str + Default fitting method used for chromaticity between "lin" for + linear or "quad" for quadratique [default: "lin"] + """ + + betatron_tune: str + RFfreq: str + N_step: int = 5 + alphac: float | None = None E_delta: float = 0.001 - """Default variation of relative energy during chromaticity measurment : f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac [default: 0.001]""" Max_E_delta: float = 0.004 - """Maximum autorized variation of relative energy during chromaticity measurment [default: 0.004]""" N_tune_meas: int = 1 - """Default number of tune measurment per RF frequency [default: 1]""" Sleep_between_meas: float = 2.0 - """Default time sleep between two tune measurment [default: 2.0]""" Sleep_between_RFvar: float = 5.0 - """Default time sleep after RF frequency variation [default: 5.0]""" fit_method: str = "lin" - """Default fitting method used for chromaticity between "lin" for linear or "quad" for quadratique [default: "lin"]""" class ChomaticityMonitor(Element): """ - Class providing access to a betatron tune monitor - of a physical or simulated lattice. - The monitor provides horizontal and vertical betatron tune measurements. + Class providing access to a chromaticity monitor + of a physical or simulated lattice. The monitor provides + horizontal and vertical chromaticity measurements. """ def __init__(self, cfg: ConfigModel): """ - Construct a BetatronTuneMonitor. + Construct a ChomaticityMonitor. Parameters ---------- cfg : ConfigModel - Configuration for the ChromaticityMonitor, including betatron tune monitor, RF plant, and defaults parameters. + Configuration for the ChromaticityMonitor, including betatron + tune monitor, RF plant, and defaults parameters. """ super().__init__(cfg.name) @@ -75,92 +98,110 @@ def attach(self, peer, chromaticity: ReadFloatArray) -> Self: obj._peer = peer return obj - def chromaticity_measurement(self, - N_step: int=None, - alphac: float=None, - E_delta: float=None, - Max_E_delta: float=None, - N_tune_meas: int=None, - Sleep_between_meas: float=None, - Sleep_between_RFvar: float=None, - fit_method: str=None, - do_plot: bool=None): + def chromaticity_measurement( + self, + N_step: int = None, + alphac: float = None, + E_delta: float = None, + Max_E_delta: float = None, + N_tune_meas: int = None, + Sleep_between_meas: float = None, + Sleep_between_RFvar: float = None, + fit_method: str = None, + do_plot: bool = None, + ): """ Main function for chromaticity measurment - N_step : int - Number of RF step during chromaticity measurment. - If defined, eraised defalt and configation files values. - alphac : float - Default Twiss parameter alpha ??? - If defined, eraised defalt and configation files values. - E_delta : float - Default variation of relative energy during chromaticity measurment : f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac - If defined, eraised defalt and configation files values. - Max_E_delta : float - Maximum autorized variation of relative energy during chromaticity measurment - If defined, eraised defalt and configation files values. - N_tune_meas : int - Default number of tune measurment per RF frequency. - If defined, eraised defalt and configation files values. - Sleep_between_meas : float - Default time sleep between two tune measurment. - If defined, eraised defalt and configation files values. + Parameters + ---------- + N_step: int + Default number of RF step during chromaticity + measurment [default: from config] + alphac: float | None + Moment compaction factor [default: from config] + E_delta: float + Default variation of relative energy during chromaticity measurment: + f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac + [default: from config] + Max_E_delta: float + Maximum autorized variation of relative energy during chromaticity + measurment [default: from config] + N_tune_meas: int + Default number of tune measurment per RF frequency [default: from config] + Sleep_between_meas: float + Default time sleep between two tune measurment [default: from config] Sleep_between_RFvar: float - Default time sleep after RF frequency variation. - If defined, eraised defalt and configation files values. + Default time sleep after RF frequency variation [default: from config] fit_method: str - Default fitting method used for chromaticity between "lin" for linear or "quad" for quadratique. - If defined, eraised defalt and configation files values. + Default fitting method used for chromaticity between "lin" for + linear or "quad" for quadratique [default: from config] do_plot : bool Do you want to plot the fittinf results ? """ - if N_step is None : + if N_step is None: N_step = self._cfg.N_step - if alphac is None : + if alphac is None: alphac = self._cfg.alphac - if E_delta is None : + if E_delta is None: E_delta = self._cfg.E_delta - if Max_E_delta is None : + if Max_E_delta is None: Max_E_delta = self._cfg.Max_E_delta - if N_tune_meas is None : + if N_tune_meas is None: N_tune_meas = self._cfg.N_tune_meas - if Sleep_between_meas is None : + if Sleep_between_meas is None: Sleep_between_meas = self._cfg.Sleep_between_meas - if Sleep_between_RFvar is None : + if Sleep_between_RFvar is None: Sleep_between_RFvar = self._cfg.Sleep_between_RFvar - if fit_method is None : + if fit_method is None: fit_method = self._cfg.fit_method if abs(E_delta) > abs(Max_E_delta): # TODO : Add logger to warm that E_delta is to large return np.array([None, None]) - delta, NuX, NuY = self.measure_tune_response(N_step=N_step, alphac=alphac, E_delta=E_delta, N_tune_meas=N_tune_meas, Sleep_between_meas=Sleep_between_meas, Sleep_between_RFvar=Sleep_between_RFvar) - chrom = self.fit_chromaticity(delta=delta, NuX=NuX, NuY=NuY, method=fit_method, do_plot=do_plot) - return(chrom) - - def measure_tune_response(self, - N_step: int, - alphac: float, - E_delta: float, - N_tune_meas: int, - Sleep_between_meas: float, - Sleep_between_RFvar: float): + if alphac is None: + raise PyAMLException("Moment compaction factor is not defined") + + delta, NuX, NuY = self.measure_tune_response( + N_step=N_step, + alphac=alphac, + E_delta=E_delta, + N_tune_meas=N_tune_meas, + Sleep_between_meas=Sleep_between_meas, + Sleep_between_RFvar=Sleep_between_RFvar, + ) + chrom = self.fit_chromaticity( + delta=delta, NuX=NuX, NuY=NuY, method=fit_method, do_plot=do_plot + ) + return chrom + + def measure_tune_response( + self, + N_step: int, + alphac: float, + E_delta: float, + N_tune_meas: int, + Sleep_between_meas: float, + Sleep_between_RFvar: float, + ): """ Main function for chromaticity measurment - N_step : int - Number of RF step during chromaticity measurment. - alphac : float - Default Twiss parameter alpha ??? - E_delta : float - Default variation of relative energy during chromaticity measurment : f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac - N_tune_meas : int - Default number of tune measurment per RF frequency. - Sleep_between_meas : float - Default time sleep between two tune measurment. + N_step: int + Default number of RF step during chromaticity + measurment [default: from config] + alphac: float | None + Moment compaction factor [default: from config] + E_delta: float + Default variation of relative energy during chromaticity measurment: + f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac + [default: from config] + N_tune_meas: int + Default number of tune measurment per RF frequency [default: from config] + Sleep_between_meas: float + Default time sleep between two tune measurment [default: from config] Sleep_between_RFvar: float - Default time sleep after RF frequency variation. + Default time sleep after RF frequency variation [default: from config] """ tune = self._peer.get_betatron_tune_monitor(self._cfg.betatron_tune) @@ -174,24 +215,29 @@ def measure_tune_response(self, NuY = np.zeros((N_step, N_tune_meas)) NuX = np.zeros((N_step, N_tune_meas)) - try: # ensure that, even if there is an issus, the script will finish by reseting the RF frequency + # ensure that, even if there is an issus, the script will finish by + # reseting the RF frequency to its original value + err = None + try: for i, f in enumerate(delta_frec): - # TODO : Use set_and_wait once it is implemented ! (and remove Sleep_between_RFvar ?) + # TODO : Use set_and_wait once it is implemented ! + # (and remove Sleep_between_RFvar ?) rf.frequency.set(f0 + f) sleep(Sleep_between_RFvar) for j in range(N_tune_meas): - NuX[i,j], NuY[i,j] = tune.tune.get() + NuX[i, j], NuY[i, j] = tune.tune.get() sleep(Sleep_between_meas) - except: - # TODO : add proper exception - print("NOK") + except Exception as ex: + err = ex finally: # TODO : Use set_and_wait once it is implemented ! rf.frequency.set(f0) - return(delta, NuX, NuY) + if err: + raise (err) + return (delta, NuX, NuY) def fit_chromaticity(self, delta, NuX, NuY, method, do_plot): """ @@ -226,39 +272,46 @@ def fit_chromaticity(self, delta, NuX, NuY, method, do_plot): # else: # tune0 = np.mean(Nu[N_step_delta//2,:]) - dtune = np.mean(Nu[:,:],1) + dtune = np.mean(Nu[:, :], 1) + + if method == "lin": - if method=="lin": def linear_fit(x, a, b): - return a*x + b + return a * x + b + popt_lin, _ = curve_fit(linear_fit, delta, dtune) chro.append(popt_lin[0]) - elif method=="quad": + elif method == "quad": + def quad_fit(x, a, b, c): - return a*x**2 + b*x + c + return a * x**2 + b * x + c + popt_quad, _ = curve_fit(quad_fit, delta, dtune) chro.append(popt_quad[1]) if do_plot: fig = plt.figure("Chromaticity_measurement") - ax = fig.add_subplot(2,1,1+i) + ax = fig.add_subplot(2, 1, 1 + i) ax.scatter(delta, dtune) - if method=="lin": - ax.plot(delta, - linear_fit(delta, popt_lin[0], popt_lin[1]), - '--') + if method == "lin": + ax.plot(delta, linear_fit(delta, popt_lin[0], popt_lin[1]), "--") title = "{:.4f}dp/p+{:.8f}".format(*popt_lin) - elif method=="quad": - ax.plot(delta, - quad_fit(delta, popt_quad[0], popt_quad[1], popt_quad[2]), - '--',) - title="{:.4f}(dp/p)$^2$+{:.4f}dp/p+{:.4f}".format(*popt_quad) + elif method == "quad": + ax.plot( + delta, + quad_fit(delta, popt_quad[0], popt_quad[1], popt_quad[2]), + "--", + ) + title = "{:.4f}(dp/p)$^2$+{:.4f}dp/p+{:.4f}".format(*popt_quad) ax.set_title(title) - ax.set_xlabel('Momentum Shift, dp/p [%]') - ax.set_ylabel("%s Tune"%["Horizontal", "Vertical"][i]) - # ax.legend() - self._last_measured = np.array(chro) + ax.set_xlabel("Momentum Shift, dp/p [%]") + ax.set_ylabel("%s Tune" % ["Horizontal", "Vertical"][i]) + ax.legend() - return self._last_measured + if do_plot: + fig.tight_layout() + plt.show() + self._last_measured = np.array(chro) + return self._last_measured diff --git a/tests/config/EBSTune.yaml b/tests/config/EBSTune.yaml index f4ce69b4..63b0e1f7 100644 --- a/tests/config/EBSTune.yaml +++ b/tests/config/EBSTune.yaml @@ -8,7 +8,7 @@ simulators: name: design controls: - type: tango.pyaml.controlsystem - tango_host: ebs-simu-3:10000 + tango_host: ebs-simu-2:10000 name: live data_folder: /data/store arrays: @@ -1021,6 +1021,7 @@ devices: powerconverter: type: tango.pyaml.attribute attribute: srmag/vps-qd2/c04-e/current + range: [10,109] unit: A - type: pyaml.magnet.quadrupole name: QD2A-C05 From 264dcab9b8917b5640b315d25a52c6cbcf7365b0 Mon Sep 17 00:00:00 2001 From: PONS Date: Thu, 22 Jan 2026 10:10:11 +0100 Subject: [PATCH 09/11] Swith to polyfit --- pyaml/diagnostics/chromaticity_monitor.py | 68 ++++++++++------------- 1 file changed, 28 insertions(+), 40 deletions(-) diff --git a/pyaml/diagnostics/chromaticity_monitor.py b/pyaml/diagnostics/chromaticity_monitor.py index de34e6b5..195148e9 100644 --- a/pyaml/diagnostics/chromaticity_monitor.py +++ b/pyaml/diagnostics/chromaticity_monitor.py @@ -11,7 +11,6 @@ import matplotlib.pyplot as plt import numpy as np from pydantic import ConfigDict -from scipy.optimize import curve_fit PYAMLCLASS = "ChomaticityMonitor" @@ -46,9 +45,8 @@ class ConfigModel(ElementConfigModel): Default time sleep between two tune measurment [default: 2.0] Sleep_between_RFvar: float Default time sleep after RF frequency variation [default: 5.0] - fit_method: str - Default fitting method used for chromaticity between "lin" for - linear or "quad" for quadratique [default: "lin"] + fit_order: int + Fitting order [default: 1] """ betatron_tune: str @@ -60,7 +58,7 @@ class ConfigModel(ElementConfigModel): N_tune_meas: int = 1 Sleep_between_meas: float = 2.0 Sleep_between_RFvar: float = 5.0 - fit_method: str = "lin" + fit_order: int = 1 class ChomaticityMonitor(Element): @@ -107,7 +105,7 @@ def chromaticity_measurement( N_tune_meas: int = None, Sleep_between_meas: float = None, Sleep_between_RFvar: float = None, - fit_method: str = None, + fit_order: int = None, do_plot: bool = None, ): """ @@ -133,9 +131,8 @@ def chromaticity_measurement( Default time sleep between two tune measurment [default: from config] Sleep_between_RFvar: float Default time sleep after RF frequency variation [default: from config] - fit_method: str - Default fitting method used for chromaticity between "lin" for - linear or "quad" for quadratique [default: from config] + fit_order: int + Fitting order [default: 1] do_plot : bool Do you want to plot the fittinf results ? """ @@ -153,8 +150,8 @@ def chromaticity_measurement( Sleep_between_meas = self._cfg.Sleep_between_meas if Sleep_between_RFvar is None: Sleep_between_RFvar = self._cfg.Sleep_between_RFvar - if fit_method is None: - fit_method = self._cfg.fit_method + if fit_order is None: + fit_order = self._cfg.fit_order if abs(E_delta) > abs(Max_E_delta): # TODO : Add logger to warm that E_delta is to large return np.array([None, None]) @@ -171,7 +168,7 @@ def chromaticity_measurement( Sleep_between_RFvar=Sleep_between_RFvar, ) chrom = self.fit_chromaticity( - delta=delta, NuX=NuX, NuY=NuY, method=fit_method, do_plot=do_plot + delta=delta, NuX=NuX, NuY=NuY, order=fit_order, do_plot=do_plot ) return chrom @@ -239,7 +236,7 @@ def measure_tune_response( return (delta, NuX, NuY) - def fit_chromaticity(self, delta, NuX, NuY, method, do_plot): + def fit_chromaticity(self, delta, NuX, NuY, order, do_plot): """ Compute chromaticity from measurement data. @@ -251,8 +248,8 @@ def fit_chromaticity(self, delta, NuX, NuY, method, do_plot): Horizontal tune measured. NuZ : array of float Vertical tune measured. - method : {"lin" or "quad"} - "lin" uses a linear fit and "quad" a 2nd order polynomial. + order : int + order of polynomial used for fit plot : bool, optional If True, plot the fit. Plots are made but not shown. Use plt.show() to show it. @@ -273,36 +270,27 @@ def fit_chromaticity(self, delta, NuX, NuY, method, do_plot): # tune0 = np.mean(Nu[N_step_delta//2,:]) dtune = np.mean(Nu[:, :], 1) - - if method == "lin": - - def linear_fit(x, a, b): - return a * x + b - - popt_lin, _ = curve_fit(linear_fit, delta, dtune) - chro.append(popt_lin[0]) - elif method == "quad": - - def quad_fit(x, a, b, c): - return a * x**2 + b * x + c - - popt_quad, _ = curve_fit(quad_fit, delta, dtune) - chro.append(popt_quad[1]) + coefs = np.polynomial.polynomial.polyfit(delta, dtune, order) + chro.append(coefs[1]) if do_plot: fig = plt.figure("Chromaticity_measurement") ax = fig.add_subplot(2, 1, 1 + i) ax.scatter(delta, dtune) - if method == "lin": - ax.plot(delta, linear_fit(delta, popt_lin[0], popt_lin[1]), "--") - title = "{:.4f}dp/p+{:.8f}".format(*popt_lin) - elif method == "quad": - ax.plot( - delta, - quad_fit(delta, popt_quad[0], popt_quad[1], popt_quad[2]), - "--", - ) - title = "{:.4f}(dp/p)$^2$+{:.4f}dp/p+{:.4f}".format(*popt_quad) + title = "" + for o in range(order, -1, -1): + dp = "" + if o == 1: + dp = "dp/p" + elif o >= 1: + dp = "(dp/p)$^2$" + + title += f"{coefs[o]:.4f} {dp}" + if o != 0: + title += " + " + + print(title) + ax.plot(delta, np.polyval(coefs[::-1], delta)) ax.set_title(title) ax.set_xlabel("Momentum Shift, dp/p [%]") ax.set_ylabel("%s Tune" % ["Horizontal", "Vertical"][i]) From 55d9245b5b27e87bf6f1db0ded3df40a362ee54e Mon Sep 17 00:00:00 2001 From: PONS Date: Thu, 22 Jan 2026 10:22:47 +0100 Subject: [PATCH 10/11] Fix unit test --- tests/test_chromaticity_monitor.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/tests/test_chromaticity_monitor.py b/tests/test_chromaticity_monitor.py index c18f97b6..1e964a2a 100644 --- a/tests/test_chromaticity_monitor.py +++ b/tests/test_chromaticity_monitor.py @@ -6,11 +6,19 @@ def test_simulator_chromaticity_monitor(): - sr: Accelerator = Accelerator.load("tests/config/EBS_chromaticity.yaml", ignore_external=True) + sr: Accelerator = Accelerator.load( + "tests/config/EBS_chromaticity.yaml", ignore_external=True + ) sr.design.get_lattice().disable_6d() chromaticity_monitor = sr.design.get_chromaticity_monitor("KSI") - assert chromaticity_monitor.chromaticity.get()[0] == sr.design.get_lattice().get_chrom()[0] - assert chromaticity_monitor.chromaticity.get()[1] == sr.design.get_lattice().get_chrom()[1] + assert ( + chromaticity_monitor.chromaticity.get()[0] + == sr.design.get_lattice().get_chrom()[0] + ) + assert ( + chromaticity_monitor.chromaticity.get()[1] + == sr.design.get_lattice().get_chrom()[1] + ) Factory.clear() @@ -25,7 +33,14 @@ def test_controlsystem_chromaticity_monitor(install_test_package): chromaticity_monitor = sr.live.get_chromaticity_monitor("KSI") assert np.isnan(chromaticity_monitor.chromaticity.get()[0]) assert np.isnan(chromaticity_monitor.chromaticity.get()[1]) - chromaticity_monitor.chromaticity_measurement(do_plot=False, Sleep_between_meas=0, Sleep_between_RFvar=0, E_delta=1, Max_E_delta=1) + chromaticity_monitor.chromaticity_measurement( + do_plot=False, + alphac=1e-4, + Sleep_between_meas=0, + Sleep_between_RFvar=0, + E_delta=1, + Max_E_delta=1, + ) ksi = np.abs(chromaticity_monitor.chromaticity.get()) assert abs(ksi[0]) < 1e-17 assert abs(ksi[1]) < 1e-17 From 3148d668bf57715acab8f66d742d429146a63d2b Mon Sep 17 00:00:00 2001 From: PONS Date: Thu, 22 Jan 2026 10:28:19 +0100 Subject: [PATCH 11/11] Restored EBSTune.yaml commited by mistake --- tests/config/EBSTune.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/config/EBSTune.yaml b/tests/config/EBSTune.yaml index 63b0e1f7..f4ce69b4 100644 --- a/tests/config/EBSTune.yaml +++ b/tests/config/EBSTune.yaml @@ -8,7 +8,7 @@ simulators: name: design controls: - type: tango.pyaml.controlsystem - tango_host: ebs-simu-2:10000 + tango_host: ebs-simu-3:10000 name: live data_folder: /data/store arrays: @@ -1021,7 +1021,6 @@ devices: powerconverter: type: tango.pyaml.attribute attribute: srmag/vps-qd2/c04-e/current - range: [10,109] unit: A - type: pyaml.magnet.quadrupole name: QD2A-C05