Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyaml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
__title__ = "pyAML"
__description__ = "Python Accelerator Middle Layer"
__url__ = "https://github.com/python-accelerator-middle-layer/pyaml"
__version__ = "0.2.2"
__version__ = "0.2.3"
__author__ = "pyAML collaboration"
__author_email__ = ""

Expand Down
186 changes: 186 additions & 0 deletions pyaml/control/abstract_impl.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from typing import Any

import numpy as np
from numpy import double
from numpy.typing import NDArray

from .. import PyAMLException
from ..bpm.bpm_model import BPMModel
from ..common import abstract
from ..common.abstract_aggregator import ScalarAggregator
Expand All @@ -15,6 +18,168 @@
# ------------------------------------------------------------------------------


def check_range(values: Any, dev_range: Any) -> bool:
"""
Check whether values are within given ranges.

Inverted semantics:
- True -> all checks pass (everything is within bounds)
- False -> at least one check fails (out of range)

dev_range format (flat):
[min1, max1, min2, max2, ...]

Broadcasting rules:
Let N = number of values, K = number of ranges (pairs).
- N == K : one range per value
- N == 1 and K > 1: the single value must satisfy ALL ranges
- N > 1 and K == 1: the single range applies to ALL values
"""
# ---- Normalize values to a 1D float array ----
v = np.asarray(values, dtype=float)
if v.ndim == 0:
v = v.reshape(1)
else:
v = v.ravel()
n = v.size

# ---- Normalize dev_range (object to preserve None) ----
r = np.asarray(dev_range, dtype=object).ravel()
if (r.size % 2) != 0:
raise ValueError(f"dev_range must have an even length, got {r.size}")

mins_obj = r[0::2]
maxs_obj = r[1::2]
k = mins_obj.size

# ---- Broadcasting rules ----
if n == k:
vv = v
mins = mins_obj
maxs = maxs_obj
elif n == 1 and k > 1:
vv = np.full(k, v[0], dtype=float)
mins = mins_obj
maxs = maxs_obj
elif n > 1 and k == 1:
vv = v
mins = np.full(n, mins_obj[0], dtype=object)
maxs = np.full(n, maxs_obj[0], dtype=object)
else:
raise ValueError(
f"Inconsistent sizes: {n} value(s) for {k} range(s). "
f"Supported: N==K, N==1, or K==1."
)

# ---- Replace None bounds with -inf / +inf (NumPy-safe) ----
mins_is_none = np.equal(mins, None)
maxs_is_none = np.equal(maxs, None)

mins_f = np.where(mins_is_none, -np.inf, mins).astype(float)
maxs_f = np.where(maxs_is_none, +np.inf, maxs).astype(float)

# ---- Vectorized range check ----
return bool(np.all((vv >= mins_f) & (vv <= maxs_f)))


def _as_1d_float_array(values: Any) -> np.ndarray:
"""Normalize input values to a 1D float NumPy array."""
v = np.asarray(values, dtype=float)
if v.ndim == 0:
return v.reshape(1)
return v.ravel()


def _iter_devices_and_ranges(devs: DeviceAccess | DeviceAccessList):
"""
Yield tuples (device, [min, max]) for each underlying device.

Works for:
- DeviceAccess: yields 1 item
- DeviceAccessList: yields N items based on get_devices() and get_range() flattening
"""
# Single device
if (
hasattr(devs, "get")
and hasattr(devs, "get_range")
and not hasattr(devs, "get_devices")
):
r = devs.get_range()
if r is None:
r = [None, None]
return [(devs, [r[0], r[1]])]

# Device list (expects get_devices() + get_range() flat list)
devices = devs.get_devices()
flat = np.asarray(devs.get_range(), dtype=object).ravel()
if (flat.size % 2) != 0:
raise ValueError(f"dev_range must have an even length, got {flat.size}")

pairs = []
for i, d in enumerate(devices):
pairs.append((d, [flat[2 * i], flat[2 * i + 1]]))
return pairs


def format_out_of_range_message(
values: Any,
devs: DeviceAccess | DeviceAccessList,
*,
header: str = "Values out of range:",
) -> str:
"""
Build a user-friendly error message for out-of-range values.

Output example:
Values out of range:
110 A, '//host/dev/attr' [10.0, 109.0]
110 A, '//host/dev/attr' [10.0, 109.0]

Notes:
- Only failing channels are listed.
- Supports scalar/array values and DeviceAccess/DeviceAccessList.
- Uses check_range() semantics (inclusive bounds, None => unbounded).
"""
v = _as_1d_float_array(values)
dev_pairs = _iter_devices_and_ranges(devs)

# Apply the same broadcasting rules as check_range():
# - N == K : value per device
# - N == 1 and K > 1 : single value checked against all devices
# - N > 1 and K == 1 : single device range applied to all values (rare here but supported)
n = v.size
k = len(dev_pairs)

if n == k:
vv = v
pairs = dev_pairs
elif n == 1 and k > 1:
vv = np.full(k, v[0], dtype=float)
pairs = dev_pairs
elif n > 1 and k == 1:
vv = v
pairs = [dev_pairs[0]] * n
else:
raise ValueError(
f"Inconsistent sizes: {n} value(s) for {k} device(s). "
f"Supported: N==K, N==1, or K==1."
)

lines = [header]
for val, (dev, r) in zip(vv, pairs):
if not check_range(val, r):
unit = dev.unit() if hasattr(dev, "unit") else ""
name = str(dev)
rmin, rmax = r[0], r[1]
lines.append(f"{val:g} {unit}, '{name}' [{rmin}, {rmax}]")

# Fallback if nothing selected (should not happen if caller checked range before)
if len(lines) == 1:
lines.append("(no channel details available)")

return "\n".join(lines)


class CSScalarAggregator(ScalarAggregator):
"""
Basic control system aggregator for a list of scalar values
Expand Down Expand Up @@ -98,6 +263,11 @@ def set(self, value: NDArray[np.float64]):
model.compute_hardware_values(mStrengths)
)
hardwareIndex += nbDev
dev_range = self._devs.get_range()
if not check_range(newHardwareValues, dev_range):
raise PyAMLException(
format_out_of_range_message(newHardwareValues, self._devs)
)
self._devs.set(newHardwareValues)

def set_and_wait(self, value: NDArray[np.float64]):
Expand Down Expand Up @@ -188,6 +358,9 @@ def get(self) -> float:
return self.__dev.get()

def set(self, value: float):
dev_range = self.__dev.get_range()
if not check_range(value, dev_range):
raise PyAMLException(format_out_of_range_message(value, self.__dev))
self.__dev.set(value)

def set_and_wait(self, value: double):
Expand Down Expand Up @@ -220,6 +393,9 @@ def get(self) -> float:
# Sets the value
def set(self, value: float):
current = self.__model.compute_hardware_values([value])[0]
dev_range = self.__dev.get_range()
if not check_range(current, dev_range):
raise PyAMLException(format_out_of_range_message(current, self.__dev))
self.__dev.set(current)

# Sets the value and wait that the read value reach the setpoint
Expand Down Expand Up @@ -254,6 +430,9 @@ def get(self) -> np.array:
# Sets the value
def set(self, value: np.array):
for idx, p in enumerate(self.__devs):
dev_range = p.get_range()
if not check_range(value[idx], dev_range):
raise PyAMLException(format_out_of_range_message(value[idx], p))
p.set(value[idx])

# Sets the value and waits that the read value reach the setpoint
Expand Down Expand Up @@ -286,6 +465,11 @@ def get(self) -> np.array:
# Sets the value
def set(self, value: np.array):
cur = self.__model.compute_hardware_values(value)
for idx, p in enumerate(self.__devs):
dev_range = p.get_range()
if not check_range(cur[idx], dev_range):
raise PyAMLException(format_out_of_range_message(cur[idx], p))

for idx, p in enumerate(self.__devs):
p.set(cur[idx])

Expand Down Expand Up @@ -519,6 +703,8 @@ def get(self) -> NDArray:

def unit(self) -> str:
return self.__tune_monitor._cfg.tune_v.unit()


# ------------------------------------------------------------------------------


Expand Down
9 changes: 9 additions & 0 deletions pyaml/control/deviceaccess.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,12 @@ def readback(self):
def unit(self) -> str:
"""Return the variable unit"""
pass

@abstractmethod
def get_range(self) -> list[float]:
pass

@abstractmethod
def check_device_availability(self) -> bool:
pass

8 changes: 8 additions & 0 deletions pyaml/control/deviceaccesslist.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,11 @@ def readback(self) -> np.array:
def unit(self) -> str:
"""Return the variable unit"""
pass

@abstractmethod
def get_range(self) -> list[float]:
pass

@abstractmethod
def check_device_availability(self) -> bool:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ simulators:
lattice: sr/lattices/ebs.mat
name: design
controls:
- type: pyaml-cs-oa.pyaml.controlsystem
- type: tango.pyaml.controlsystem
tango_host: ebs-simu-3:10000
name: live
data_folder: /data/store
Expand Down Expand Up @@ -168,6 +168,7 @@ devices:
type: tango.pyaml.attribute
attribute: srmag/vps-qf1/c05-a/current
unit: A
range: [10,109]
- type: pyaml.magnet.quadrupole
name: QF1E-C05
model:
Expand Down
Loading
Loading