Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions pyaml/bpm/bpm.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ def __init__(self, cfg: ConfigModel):
----------
name : str
Element name
hardware : DeviceAccess
Direct access to a hardware (bypass the BPM model)
model : BPMModel
BPM model in charge of computing beam position
"""
Expand Down
80 changes: 77 additions & 3 deletions pyaml/bpm/bpm_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def get_pos_devices(self) -> list[DeviceAccess | None]:
Returns
-------
list[DeviceAccess]
Array of DeviceAcess
h and v position devices
"""
pass

Expand All @@ -32,7 +32,7 @@ def get_tilt_device(self) -> DeviceAccess | None:
Returns
-------
list[DeviceAccess]
Array of DeviceAcess
tilt device
"""
pass

Expand All @@ -44,6 +44,80 @@ def get_offset_devices(self) -> list[DeviceAccess | None]:
Returns
-------
list[DeviceAccess]
Array of DeviceAcess
h and v offset devices
"""
pass

def x_pos_index(self) -> int | None:
"""
Returns the index of the horizontal position in
an array, otherwise a scalar value is expected from the
corresponding DeviceAccess

Returns
-------
int
Index in the array, None for a scalar value
"""
return None

def y_pos_index(self) -> int | None:
"""
Returns the index of the veritcal position in
an array, otherwise a scalar value is expected from the
corresponding DeviceAccess

Returns
-------
int
Index in the array, None for a scalar value
"""
return None

def is_pos_indexed(self) -> bool:
return self.x_pos_index() is not None and self.y_pos_index() is not None

def tilt_index(self) -> int | None:
"""
Returns the index of the tilt angle in
an array, otherwise a scalar value is expected from the
corresponding DeviceAccess

Returns
-------
int
Index in the array, None for a scalar value
"""
return None

def is_tilt_indexed(self) -> bool:
return self.tilt_index() is not None

def x_offset_index(self) -> int | None:
"""
Returns the index of the horizontal offset in
an array, otherwise a scalar value is expected from the
corresponding DeviceAccess

Returns
-------
int
Index in the array, None for a scalar value
"""
return None

def y_offset_index(self) -> int | None:
"""
Returns the index of the veritcal offset in
an array, otherwise a scalar value is expected from the
corresponding DeviceAccess

Returns
-------
int
Index in the array, None for a scalar value
"""
return None

def is_offset_indexed(self) -> bool:
return self.x_offset_index() is not None and self.y_offset_index() is not None
34 changes: 34 additions & 0 deletions pyaml/bpm/bpm_simple_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@ class ConfigModel(BaseModel):
Horizontal position device
y_pos : DeviceAccess, optional
Vertical position device
x_pos_index : int, optional
Index in the array when specified, otherwise scalar
value is expected
y_pos_index : int, optional
Index in the array when specified, otherwise scalar
value is expected
"""

model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")

x_pos: DeviceAccess | None
y_pos: DeviceAccess | None
x_pos_index: int | None = None
y_pos_index: int | None = None


class BPMSimpleModel(BPMModel):
Expand Down Expand Up @@ -73,5 +81,31 @@ def get_offset_devices(self) -> list[DeviceAccess | None]:
"""
return [None, None]

def x_pos_index(self) -> int | None:
"""
Returns the index of the horizontal position in
an array, otherwise a scalar value is expected from the
corresponding DeviceAccess

Returns
-------
int
Index in the array, None for a scalar value
"""
return self._cfg.x_pos_index

def y_pos_index(self) -> int | None:
"""
Returns the index of the veritcal position in
an array, otherwise a scalar value is expected from the
corresponding DeviceAccess

Returns
-------
int
Index in the array, None for a scalar value
"""
return self._cfg.y_pos_index

def __repr__(self):
return __pyaml_repr__(self)
4 changes: 4 additions & 0 deletions pyaml/bpm/bpm_tiltoffset_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
# Define the main class name for this module
PYAMLCLASS = "BPMTiltOffsetModel"

# TODO: Implepement indexed offset and tilt


class ConfigModel(BaseModel):
"""
Expand All @@ -34,6 +36,8 @@ class ConfigModel(BaseModel):

x_pos: DeviceAccess | None
y_pos: DeviceAccess | None
x_pos_index: int | None = None
y_pos_index: int | None = None
x_offset: DeviceAccess | None
y_offset: DeviceAccess | None
tilt: DeviceAccess | None
Expand Down
125 changes: 99 additions & 26 deletions pyaml/control/abstract_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,36 @@ def unit(self) -> str:
# ------------------------------------------------------------------------------


class CSBPMArrayMapper(CSScalarAggregator):
"""
Wrapper to a native CS aggregator for BPM
"""

def __init__(self, devs: list[DeviceAccess], indices: list[list[int]]):
self._indices = indices
self._devs = devs

def set(self, value: NDArray[np.float64]):
raise Exception("BPM are not writable")

def get(self) -> NDArray[np.float64]:
# TODO read using DeviceAccessList
allValues = []
for i, d in enumerate(self._devs):
v = d.get()
allValues.extend(v[self._indices[i]])
return np.array(allValues)

def readback(self) -> np.array:
return self.get()

def unit(self) -> str:
return self._dev.unit()


# ------------------------------------------------------------------------------


class RWHardwareScalar(abstract.ReadWriteFloatScalar):
"""
Class providing read write access to a magnet
Expand Down Expand Up @@ -267,20 +297,35 @@ def unit(self) -> list[str]:

class RBpmArray(abstract.ReadFloatArray):
"""
Class providing read access to a BPM array of a control system
Class providing read access to a BPM position [x,y] of a control system
"""

def __init__(self, model: BPMModel, devs: list[DeviceAccess]):
self.__model = model
self.__devs = devs
def __init__(self, model: BPMModel, hDev: DeviceAccess, vDev: DeviceAccess):
self._model = model
self._hDev = hDev
self._vDev = vDev
self._hIdx = self._model.x_pos_index()
self._vIdx = self._model.y_pos_index()

# Gets the values
def get(self) -> np.array:
return np.array([self.__devs[0].get(), self.__devs[1].get()])

# Gets the unit of the value Assume that x and y has the same unit
if self._hDev != self._vDev:
allhVal = self._hDev.get()
allvVal = self._vDev.get()
hVal = allhVal if self._hIdx is None else allhVal[self._hIdx]
vVal = allvVal if self._vIdx is None else allvVal[self._vIdx]
else:
# When h and v devices are identical, indexed
# values are expected
allVal = self._hDev.get()
hVal = allVal[self._hIdx]
vVal = allVal[self._vIdx]
return np.array([hVal, vVal])

# Gets the unit of the value Assume that x and y, offsets and positions
# have the same unit
def unit(self) -> str:
return self.__model.get_pos_devices()[0].unit()
return self._model.get_pos_devices()[0].unit()


# ------------------------------------------------------------------------------
Expand All @@ -292,51 +337,79 @@ class RWBpmTiltScalar(abstract.ReadFloatScalar):
"""

def __init__(self, model: BPMModel, dev: DeviceAccess):
self.__model = model
self.__dev = dev
self._model = model
self._dev = dev
self._idx = model.tilt_index()

# Gets the value
def get(self) -> float:
return self.__dev.get()
allTilt = self._dev.get()
if self._idx is not None:
return allTilt[self._idx]
else:
return allTilt

def set(self, value: float):
self.__dev.set(value)
self._dev.set(value)

def set_and_wait(self, value: NDArray[np.float64]):
raise NotImplementedError("Not implemented yet.")

# Gets the unit of the value
def unit(self) -> str:
return self.__model.get_tilt_device().unit()
return self._model.get_tilt_device().unit()


# ------------------------------------------------------------------------------


class RWBpmOffsetArray(abstract.ReadWriteFloatArray):
"""
Class providing read write access to a BPM offset of a control system
Class providing read write access to a BPM offset [x,y] of a control system
"""

def __init__(self, model: BPMModel, devs: list[DeviceAccess]):
self.__model = model
self.__devs = devs

# Gets the value
def get(self) -> NDArray[np.float64]:
return np.array([self.__devs[0].get(), self.__devs[1].get()])
def __init__(self, model: BPMModel, hDev: DeviceAccess, vDev: DeviceAccess):
self._model = model
self._hDev = hDev
self._vDev = vDev
self._hIdx = self._model.x_pos_index()
self._vIdx = self._model.y_pos_index()

# Sets the value
# Gets the values
def get(self) -> np.array:
if self._hDev != self._vDev:
allhVal = self._hDev.get()
allvVal = self._vDev.get()
hVal = allhVal if self._hIdx is None else allhVal[self._hIdx]
vVal = allvVal if self._vIdx is None else allvVal[self._vIdx]
else:
# When h and v devices are identical, indexed
# values are expected
allVal = self._hDev.get()
hVal = allVal[self._hIdx]
vVal = allVal[self._vIdx]
return np.array([hVal, vVal])

# Sets the values
def set(self, value: NDArray[np.float64]):
self.__devs[0].set(value[0])
self.__devs[1].set(value[1])
if self._hDev != self._vDev:
self._hDev.set(value[0])
self._vDev.set(value[1])
else:
# When h and v devices are identical, indexed
# values are expected
newValue = self._hDev.get()
newValue[self._hIdx] = value[0]
newValue[self._vIdx] = value[1]
self._hDev.set(newValue)

def set_and_wait(self, value: NDArray[np.float64]):
raise NotImplementedError("Not implemented yet.")

# Gets the unit of the value
# Gets the unit of the value Assume that x and y, offsets and positions
# have the same unit
def unit(self) -> str:
return self.__model.get_offset_devices()[0].unit()
return self._model.get_pos_devices()[0].unit()


# ------------------------------------------------------------------------------
Expand Down
Loading
Loading