Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies = [
"scanspec>=0.7.3",
"pyzmq==26.3.0", # Until we can move to RHEL 8 https://github.com/DiamondLightSource/mx-bluesky/issues/1139
"deepdiff",
"daq-config-server>=v1.0.0", # For getting Configuration settings.
"daq-config-server>=v1.1.2", # For getting Configuration settings.
]

dynamic = ["version"]
Expand Down
8 changes: 7 additions & 1 deletion src/dodal/beamlines/i09_1_shared.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from daq_config_server.client import ConfigServer

from dodal.device_manager import DeviceManager
from dodal.devices.common_dcm import (
DoubleCrystalMonochromatorWithDSpacing,
Expand All @@ -6,6 +8,7 @@
)
from dodal.devices.i09_1_shared.hard_energy import HardEnergy, HardInsertionDeviceEnergy
from dodal.devices.i09_1_shared.hard_undulator_functions import (
I09HardLutProvider,
calculate_energy_i09_hu,
calculate_gap_i09_hu,
)
Expand All @@ -17,6 +20,9 @@

devices = DeviceManager()

I09_1_CONF_CLIENT = ConfigServer()
LOOK_UPTABLE_FILE = "/dls_sw/i09-1/software/gda/workspace_git/gda-diamond.git/configurations/i09-1-shared/lookupTables/IIDCalibrationTable.txt"


@devices.factory()
def dcm() -> DoubleCrystalMonochromatorWithDSpacing[
Expand Down Expand Up @@ -44,7 +50,7 @@ def hu_id_energy(
return HardInsertionDeviceEnergy(
undulator_order=harmonics,
undulator=undulator,
lut={}, # ToDo https://github.com/DiamondLightSource/sm-bluesky/issues/239
lut_provider=I09HardLutProvider(I09_1_CONF_CLIENT, LOOK_UPTABLE_FILE),
gap_to_energy_func=calculate_energy_i09_hu,
energy_to_gap_func=calculate_gap_i09_hu,
)
Expand Down
2 changes: 0 additions & 2 deletions src/dodal/devices/i09_1_shared/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
from .hard_undulator_functions import (
calculate_energy_i09_hu,
calculate_gap_i09_hu,
get_hu_lut_as_dict,
)

__all__ = [
"calculate_gap_i09_hu",
"get_hu_lut_as_dict",
"calculate_energy_i09_hu",
"HardInsertionDeviceEnergy",
"HardEnergy",
Expand Down
50 changes: 21 additions & 29 deletions src/dodal/devices/i09_1_shared/hard_energy.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from asyncio import gather
from collections.abc import Callable
from typing import Any, Protocol

from bluesky.protocols import Locatable, Location, Movable
from numpy import ndarray
from ophyd_async.core import (
AsyncStatus,
Reference,
Expand All @@ -13,13 +12,21 @@
)

from dodal.devices.common_dcm import DoubleCrystalMonochromatorBase
from dodal.devices.i09_1_shared.hard_undulator_functions import (
MAX_ENERGY_COLUMN,
MIN_ENERGY_COLUMN,
)
from dodal.devices.undulator import UndulatorInMm, UndulatorOrder


class LookUpTableProvider(Protocol):
def get_look_up_table(self, *args, **kwargs) -> Any:
"""Protocol to provide lookup table data."""
...


class EnergyGapConvertor(Protocol):
def __call__(self, lut: LookUpTableProvider, value: float, order: int) -> float:
"""Protocol to provide value conversion using lookup table provider."""
...


class HardInsertionDeviceEnergy(StandardReadable, Movable[float]):
"""
Compound device to link hard x-ray undulator gap and order to photon energy.
Expand All @@ -30,16 +37,16 @@ def __init__(
self,
undulator_order: UndulatorOrder,
undulator: UndulatorInMm,
lut: dict[int, ndarray],
gap_to_energy_func: Callable[..., float],
energy_to_gap_func: Callable[..., float],
lut_provider: LookUpTableProvider,
gap_to_energy_func: EnergyGapConvertor,
energy_to_gap_func: EnergyGapConvertor,
name: str = "",
) -> None:
self._lut = lut
self.gap_to_energy_func = gap_to_energy_func
self.energy_to_gap_func = energy_to_gap_func
self._undulator_order_ref = Reference(undulator_order)
self._undulator_ref = Reference(undulator)
self._lut_provider = lut_provider
self.gap_to_energy_func = gap_to_energy_func
self.energy_to_gap_func = energy_to_gap_func

self.add_readables([undulator_order, undulator.current_gap])
with self.add_children_as_readables(StandardReadableFormat.HINTED_SIGNAL):
Expand All @@ -54,26 +61,11 @@ def __init__(
super().__init__(name=name)

def _read_energy(self, current_gap: float, current_order: int) -> float:
return self.gap_to_energy_func(
gap=current_gap,
look_up_table=self._lut,
order=current_order,
)
return self.gap_to_energy_func(self._lut_provider, current_gap, current_order)

async def _set_energy(self, energy: float) -> None:
current_order = await self._undulator_order_ref().value.get_value()
min_energy, max_energy = self._lut[current_order][
MIN_ENERGY_COLUMN : MAX_ENERGY_COLUMN + 1
]
if not (min_energy <= energy <= max_energy):
raise ValueError(
f"Requested energy {energy} keV is out of range for harmonic {current_order}: "
f"[{min_energy}, {max_energy}] keV"
)

target_gap = self.energy_to_gap_func(
photon_energy_kev=energy, look_up_table=self._lut, order=current_order
)
target_gap = self.energy_to_gap_func(self._lut_provider, energy, current_order)
await self._undulator_ref().set(target_gap)

@AsyncStatus.wrap
Expand Down
147 changes: 93 additions & 54 deletions src/dodal/devices/i09_1_shared/hard_undulator_functions.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,82 @@
import numpy as np
from daq_config_server.client import ConfigServer
from daq_config_server.models.converters.lookup_tables import GenericLookupTable

from dodal.devices.util.lookup_tables import energy_distance_table
from dodal.devices.i09_1_shared.hard_energy import LookUpTableProvider
from dodal.log import LOGGER

LUT_COMMENTS = ["#"]
HU_SKIP_ROWS = 3

# Physics constants
ELECTRON_REST_ENERGY_MEV = 0.510999

# Columns in the lookup table
RING_ENERGY_COLUMN = 1
MAGNET_FIELD_COLUMN = 2
MIN_ENERGY_COLUMN = 3
MAX_ENERGY_COLUMN = 4
MIN_GAP_COLUMN = 5
MAX_GAP_COLUMN = 6
GAP_OFFSET_COLUMN = 7

# Column names in the lookup table
HARMONICS_COLUMN_NAME = "order"
RING_ENERGY_COLUMN_NAME = "ring_energy_gev"
MAGNET_FIELD_COLUMN_NAME = "magnetic_field_t"
MIN_ENERGY_COLUMN_NAME = "energy_min_ev"
MAX_ENERGY_COLUMN_NAME = "energy_max_ev"
MIN_GAP_COLUMN_NAME = "gap_min_mm"
MAX_GAP_COLUMN_NAME = "gap_max_mm"
GAP_OFFSET_COLUMN_NAME = "gap_offset_mm"
I09_HU_UNDULATOR_LUT_COLUMN_NAMES = [
HARMONICS_COLUMN_NAME,
RING_ENERGY_COLUMN_NAME,
MAGNET_FIELD_COLUMN_NAME,
MIN_ENERGY_COLUMN_NAME,
MAX_ENERGY_COLUMN_NAME,
MIN_GAP_COLUMN_NAME,
MAX_GAP_COLUMN_NAME,
GAP_OFFSET_COLUMN_NAME,
]
MAGNET_BLOCKS_PER_PERIOD = 4
MAGNTE_BLOCK_HEIGHT_MM = 16
MAGNET_BLOCK_HEIGHT_MM = 16


async def get_hu_lut_as_dict(lut_path: str) -> dict[int, np.ndarray]:
lut_dict: dict[int, np.ndarray] = {}
_lookup_table: np.ndarray = await energy_distance_table(
lut_path,
comments=LUT_COMMENTS,
skiprows=HU_SKIP_ROWS,
)
for i in range(_lookup_table.shape[0]):
lut_dict[_lookup_table[i][0]] = _lookup_table[i]
LOGGER.debug(f"Loaded lookup table: {lut_dict}")
return lut_dict
class I09HardLutProvider(LookUpTableProvider):
def __init__(self, config_server: ConfigServer, filepath: str) -> None:
self.config_server = config_server
self.filepath = filepath

def get_look_up_table(self) -> GenericLookupTable:
self._lut = self.config_server.get_file_contents(
self.filepath,
desired_return_type=GenericLookupTable,
reset_cached_result=True,
)
return self._lut


def _validate_order(order: int, look_up_table: dict[int, "np.ndarray"]) -> None:
def _validate_order(look_up_table: GenericLookupTable, order: int) -> None:
"""Validate that the harmonic order exists in the lookup table."""
if order not in look_up_table.keys():
order_column_index = look_up_table.get_column_names().index(HARMONICS_COLUMN_NAME)
if order not in look_up_table.columns[order_column_index]:
raise ValueError(f"Order parameter {order} not found in lookup table")


def _calculate_gamma(look_up_table: dict[int, "np.ndarray"], order: int) -> float:
def _validate_energy_in_range(
look_up_table: GenericLookupTable,
energy: float,
order: int,
) -> None:
"""Check if the requested energy is within the allowed range for the current harmonic order."""
min_energy = look_up_table.get_value(
HARMONICS_COLUMN_NAME, order, MIN_ENERGY_COLUMN_NAME
)
max_energy = look_up_table.get_value(
HARMONICS_COLUMN_NAME, order, MAX_ENERGY_COLUMN_NAME
)
if not (min_energy <= energy <= max_energy):
raise ValueError(
f"Requested energy {energy} keV is out of range for harmonic {order}: "
f"[{min_energy}, {max_energy}] keV"
)


def _calculate_gamma(look_up_table: GenericLookupTable, order: int) -> float:
"""Calculate the Lorentz factor gamma from the lookup table."""
return 1000 * look_up_table[order][RING_ENERGY_COLUMN] / ELECTRON_REST_ENERGY_MEV
ring_energy_gev = look_up_table.get_value(
HARMONICS_COLUMN_NAME, order, "ring_energy_gev"
)
return 1000 * ring_energy_gev / ELECTRON_REST_ENERGY_MEV


def _calculate_undulator_parameter_max(
Expand All @@ -62,16 +95,14 @@ def _calculate_undulator_parameter_max(
/ np.pi
)
* np.sin(np.pi / MAGNET_BLOCKS_PER_PERIOD)
* (1 - np.exp(-2 * np.pi * MAGNTE_BLOCK_HEIGHT_MM / undulator_period_mm))
* (1 - np.exp(-2 * np.pi * MAGNET_BLOCK_HEIGHT_MM / undulator_period_mm))
)


def calculate_gap_i09_hu(
photon_energy_kev: float,
look_up_table: dict[int, np.ndarray],
lut: LookUpTableProvider,
value: float,
order: int = 1,
gap_offset: float = 0.0,
undulator_period_mm: int = 27,
) -> float:
"""
Calculate the undulator gap required to produce a given energy at a given harmonic order.
Expand All @@ -82,14 +113,18 @@ def calculate_gap_i09_hu(
photon_energy_kev (float): Requested photon energy in keV.
look_up_table (dict[int, np.ndarray]): Lookup table containing undulator and beamline parameters for each harmonic order.
order (int, optional): Harmonic order for which to calculate the gap. Defaults to 1.
gap_offset (float, optional): Additional gap offset to apply (in mm). Defaults to 0.0.
undulator_period_mm (int, optional): Undulator period in mm. Defaults to 27.

Returns:
float: Calculated undulator gap in millimeters.
"""
gap_offset: float = 0.0
undulator_period_mm: int = 27
look_up_table: GenericLookupTable = lut.get_look_up_table()

# Validate inputs
_validate_order(look_up_table, order)
_validate_energy_in_range(look_up_table, value, order)

_validate_order(order, look_up_table)
gamma = _calculate_gamma(look_up_table, order)

# Constructive interference of radiation emitted at different poles
Expand All @@ -99,9 +134,7 @@ def calculate_gap_i09_hu(
# gives K^2 = 2*((2*n*gamma^2*lamda/lambda_u)-1)

undulator_parameter_sqr = (
4.959368e-6
* (order * gamma * gamma / (undulator_period_mm * photon_energy_kev))
- 2
4.959368e-6 * (order * gamma * gamma / (undulator_period_mm * value)) - 2
)
if undulator_parameter_sqr < 0:
raise ValueError(
Expand All @@ -116,30 +149,27 @@ def calculate_gap_i09_hu(
# K = undulator_parameter_max*exp(-pi*gap/lambda_u)
# Calculating undulator_parameter_max gives:
undulator_parameter_max = _calculate_undulator_parameter_max(
look_up_table[order][MAGNET_FIELD_COLUMN], undulator_period_mm
look_up_table.get_value(HARMONICS_COLUMN_NAME, order, MAGNET_FIELD_COLUMN_NAME),
undulator_period_mm,
)

# Finnaly, rearranging the equation:
# undulator_parameter = undulator_parameter_max*exp(-pi*gap/lambda_u) for gap gives
gap = (
(undulator_period_mm / np.pi)
* np.log(undulator_parameter_max / undulator_parameter)
+ look_up_table[order][GAP_OFFSET_COLUMN]
+ look_up_table.get_value(HARMONICS_COLUMN_NAME, order, GAP_OFFSET_COLUMN_NAME)
+ gap_offset
)
LOGGER.debug(
f"Calculated gap is {gap}mm for energy {photon_energy_kev}keV at order {order}"
)
LOGGER.debug(f"Calculated gap is {gap}mm for energy {value}keV at order {order}")

return gap


def calculate_energy_i09_hu(
gap: float,
look_up_table: dict[int, "np.ndarray"],
lut: LookUpTableProvider,
value: float,
order: int = 1,
gap_offset: float = 0.0,
undulator_period_mm: int = 27,
) -> float:
"""
Calculate the photon energy produced by the undulator at a given gap and harmonic order.
Expand All @@ -149,21 +179,30 @@ def calculate_energy_i09_hu(
gap (float): Undulator gap in millimeters.
look_up_table (dict[int, np.ndarray]): Lookup table containing undulator and beamline parameters for each harmonic order.
order (int, optional): Harmonic order for which to calculate the energy. Defaults to 1.
gap_offset (float, optional): Additional gap offset to apply (in mm). Defaults to 0.0.
undulator_period_mm (int, optional): Undulator period in mm. Defaults to 27.

Returns:
float: Calculated photon energy in keV.
"""
_validate_order(order, look_up_table)
gap_offset: float = 0.0
undulator_period_mm: int = 27

look_up_table: GenericLookupTable = lut.get_look_up_table()
_validate_order(look_up_table, order)

gamma = _calculate_gamma(look_up_table, order)
undulator_parameter_max = _calculate_undulator_parameter_max(
look_up_table[order][MAGNET_FIELD_COLUMN], undulator_period_mm
look_up_table.get_value(HARMONICS_COLUMN_NAME, order, MAGNET_FIELD_COLUMN_NAME),
undulator_period_mm,
)

undulator_parameter = undulator_parameter_max / np.exp(
(gap - look_up_table[order][GAP_OFFSET_COLUMN] - gap_offset)
(
value
- look_up_table.get_value(
HARMONICS_COLUMN_NAME, order, GAP_OFFSET_COLUMN_NAME
)
- gap_offset
)
/ (undulator_period_mm / np.pi)
)
energy_kev = (
Expand Down
Loading