Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
f157912
create hard-x-ray undulator and add to i09
Sep 15, 2025
5e5e504
move comments to a parametere
Sep 15, 2025
d44acc3
add "comments" parameter to lookuptable in hard undulator
Sep 15, 2025
ab7491c
extract update lookup table cache method
Sep 15, 2025
803e4ca
few comments added
Sep 15, 2025
9c9b317
typo fix
Sep 15, 2025
448059a
add reading test
Sep 15, 2025
6d52c54
add more tests
Sep 16, 2025
99a05a6
more tests
Sep 16, 2025
127dd9c
tests
Sep 16, 2025
735debd
remove lut path
Sep 16, 2025
c954f09
more tests!!!
Sep 16, 2025
375dec2
Merge branch 'main' into add_undulator_i09
Villtord Sep 16, 2025
a7f11ce
update docs
Sep 16, 2025
5d5438a
remove order from set method - it is not how currently beamline use it
Sep 17, 2025
67f32eb
fix cached lookup value
Sep 17, 2025
0c6f60b
update tests
Sep 17, 2025
05124dd
set method docs update
Sep 17, 2025
595b8ad
make gap calculation function callable parameter
Sep 17, 2025
1d5a9e7
Merge branch 'main' into add_undulator_i09
Villtord Sep 17, 2025
4debd54
review: order extracted to a separate class
Sep 24, 2025
2efc846
replace column numbers with constants
Sep 24, 2025
698c071
amend docs
Sep 24, 2025
8aa55da
change names of parameters in calculation gap function
Sep 25, 2025
9a507d1
amend test string
Sep 25, 2025
2a687b2
add valueerror message with valid orders
Sep 25, 2025
3c1f13a
restructure tests, add docs
Sep 25, 2025
6ac71f3
amend comment
Sep 25, 2025
3988d25
Sorted out theory of gap calculation
Sep 26, 2025
651878d
Merge branch 'main' into add_undulator_i09
Villtord Sep 26, 2025
e52c1fe
Refactor undulator class, make hardUndulator only set gap in mm
Oct 2, 2025
4da6cad
Merge branch 'main' into add_undulator_i09
Villtord Oct 2, 2025
54cef04
fix tests, amend docstrings
Oct 2, 2025
6771325
update tests and docs
Oct 3, 2025
3f26e35
move _set_gap to a base undulator class
Oct 3, 2025
fe6823f
Remove order from hard undulator
Oct 3, 2025
ea93944
add test move to same gap
Oct 3, 2025
ebd3c2e
Merge branch 'main' into add_undulator_i09
Villtord Oct 3, 2025
4773595
Remove hard undulator class and use base undulator instead
Oct 3, 2025
d2903e6
fix test
Oct 3, 2025
7448198
Merge branch 'main' into add_undulator_i09
Villtord Oct 6, 2025
8c24690
amend test after rebase
Oct 6, 2025
41e1645
Merge branch 'add_undulator_i09' of ssh://github.com/DiamondLightSour…
Oct 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/dodal/devices/i09_1_shared/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from dodal.devices.i09_1_shared.hard_undulator_functions import (
calculate_gap_hu,
get_hu_lut_as_dict,
)

__all__ = ["calculate_gap_hu", "get_hu_lut_as_dict"]
109 changes: 109 additions & 0 deletions src/dodal/devices/i09_1_shared/hard_undulator_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import numpy as np

from dodal.devices.util.lookup_tables import energy_distance_table
from dodal.log import LOGGER

LUT_COMMENTS = ["#"]
HU_SKIP_ROWS = 3

# Physics constants
ELECTRON_REST_ENERGY_MEV = 0.510999

# Columns in the lookup table
RING_ENERGY_COLUMN = 1
MAGNET_FIELD_COLUMN = 2
MIN_ENERGY_COLUMN = 3
MAX_ENERGY_COLUMN = 4
GAP_OFFSET_COLUMN = 7


async def get_hu_lut_as_dict(lut_path: str) -> dict:
lut_dict: dict = {}
_lookup_table: np.ndarray = await energy_distance_table(
lut_path,
comments=LUT_COMMENTS,
skiprows=HU_SKIP_ROWS,
)
for i in range(_lookup_table.shape[0]):
lut_dict[_lookup_table[i][0]] = _lookup_table[i]
LOGGER.debug(f"Loaded lookup table:\n {lut_dict}")
return lut_dict


def calculate_gap_hu(
photon_energy_kev: float,
look_up_table: dict[int, "np.ndarray"],
order: int = 1,
gap_offset: float = 0.0,
undulator_period_mm: int = 27,
) -> float:
"""
Calculate the undulator gap required to produce a given energy at a given harmonic order.
This algorithm was provided by the I09 beamline scientists, and is based on the physics of undulator radiation.
https://cxro.lbl.gov//PDF/X-Ray-Data-Booklet.pdf

Args:
photon_energy_kev (float): Requested photon energy in keV.
look_up_table (dict[int, np.ndarray]): Lookup table containing undulator and beamline parameters for each harmonic order.
order (int, optional): Harmonic order for which to calculate the gap. Defaults to 1.
gap_offset (float, optional): Additional gap offset to apply (in mm). Defaults to 0.0.
undulator_period_mm (int, optional): Undulator period in mm. Defaults to 27.

Returns:
float: Calculated undulator gap in millimeters.
"""
magnet_blocks_per_period = 4
magnet_block_height_mm = 16

if order not in look_up_table.keys():
raise ValueError(f"Order parameter {order} not found in lookup table")

gamma = 1000 * look_up_table[order][RING_ENERGY_COLUMN] / ELECTRON_REST_ENERGY_MEV

# Constructive interference of radiation emitted at different poles
# lamda = (lambda_u/2*gamma^2)*(1+K^2/2 + gamma^2*theta^2)/n for n=1,2,3...
# theta is the observation angle, assumed to be 0 here.
# Rearranging for K (the undulator parameter, related to magnetic field and gap)
# gives K^2 = 2*((2*n*gamma^2*lamda/lambda_u)-1)

undulator_parameter_sqr = (
4.959368e-6
* (order * gamma * gamma / (undulator_period_mm * photon_energy_kev))
- 2
)
if undulator_parameter_sqr < 0:
raise ValueError("diffraction parameter squared must be positive!")
undulator_parameter = np.sqrt(undulator_parameter_sqr)

# Undulator_parameter K is also defined as K = 0.934*B0[T]*lambda_u[cm],
# where B0[T] is a peak magnetic field that must depend on gap,
# but in our LUT it is does not depend on gap, so it's a factor,
# leading to K = 0.934*B0[T]*lambda_u[cm]*exp(-pi*gap/lambda_u) or
# K = undulator_parameter_max*exp(-pi*gap/lambda_u)
# Calculating undulator_parameter_max gives:
undulator_parameter_max = (
(
2
* 0.0934
* undulator_period_mm
* look_up_table[order][MAGNET_FIELD_COLUMN]
* magnet_blocks_per_period
/ np.pi
)
* np.sin(np.pi / magnet_blocks_per_period)
* (1 - np.exp(-2 * np.pi * magnet_block_height_mm / undulator_period_mm))
)

# Finnaly, rearranging the equation:
# undulator_parameter = undulator_parameter_max*exp(-pi*gap/lambda_u) for gap gives:
gap = (
(undulator_period_mm / np.pi)
* np.log(undulator_parameter_max / undulator_parameter)
+ look_up_table[order][GAP_OFFSET_COLUMN]
+ gap_offset
)
LOGGER.debug(
f"Calculated gap is {gap}mm for energy {photon_energy_kev}keV at order {order}"
)

return gap
201 changes: 151 additions & 50 deletions src/dodal/devices/undulator.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import os

import numpy as np
from bluesky.protocols import Movable
from bluesky.protocols import Locatable, Location, Movable
from numpy import ndarray
from ophyd_async.core import (
AsyncStatus,
Reference,
StandardReadable,
StandardReadableFormat,
soft_signal_r_and_setter,
soft_signal_rw,
)
from ophyd_async.epics.core import epics_signal_r
from ophyd_async.epics.motor import Motor
Expand Down Expand Up @@ -38,42 +39,75 @@ def _get_gap_for_energy(
)


class Undulator(StandardReadable, Movable[float]):
class UndulatorOrder(StandardReadable, Locatable[int]):
"""
An Undulator-type insertion device, used to control photon emission at a given
beam energy.
Represents the order of an undulator, providing mechanisms to read, set, and validate the order value against a lookup table passed as a dictionary.

"""

def __init__(self, lut: dict, name: str = "") -> None:
"""
Args:
lut (dict): Dictionary read from lookup table file.
name: Name for device. Defaults to ""
"""
self.lut = lut
with self.add_children_as_readables():
self._order = soft_signal_rw(int, initial_value=3)
super().__init__(name=name)

@AsyncStatus.wrap
async def set(self, value: int) -> None:
await self._check_order_valid(value)
await self._order.set(value)

async def _check_order_valid(self, value: int) -> None:
if value not in self.lut.keys():
raise ValueError(
f"Order {value} not found in lookup table, must be in {[int(key) for key in self.lut.keys()]}"
)

async def locate(self) -> Location[int]:
return await self._order.locate()


class BaseUndulator(StandardReadable, Movable[float]):
"""
Base class for undulator devices providing gap control and access management.
This class expects target gap value [mm] passed in set method.
"""

def __init__(
self,
prefix: str,
id_gap_lookup_table_path: str = os.devnull,
name: str = "",
poles: int | None = None,
length: float | None = None,
undulator_period: int | None = None,
baton: Baton | None = None,
name: str = "",
) -> None:
"""Constructor

"""
Args:
prefix: PV prefix
poles (int): Number of magnetic poles built into the undulator
length (float): Length of the undulator in meters
poles (int, optional): Number of magnetic poles built into the undulator
length (float, optional): Length of the undulator in meters
undulator_period(int, optional): Undulator period
baton (optional): Baton object if provided.
name (str, optional): Name for device. Defaults to "".
"""

self.baton_ref = Reference(baton) if baton else None
self.id_gap_lookup_table_path = id_gap_lookup_table_path

with self.add_children_as_readables():
self.gap_access = epics_signal_r(EnabledDisabledUpper, prefix + "IDBLENA")
self.gap_motor = Motor(prefix + "BLGAPMTR")
self.current_gap = epics_signal_r(float, prefix + "CURRGAPD")
self.gap_access = epics_signal_r(EnabledDisabledUpper, prefix + "IDBLENA")

with self.add_children_as_readables(StandardReadableFormat.CONFIG_SIGNAL):
self.gap_discrepancy_tolerance_mm, _ = soft_signal_r_and_setter(
float,
initial_value=UNDULATOR_DISCREPANCY_THRESHOLD_MM,
)

if poles is not None:
self.poles, _ = soft_signal_r_and_setter(
int,
Expand All @@ -90,58 +124,125 @@ def __init__(
else:
self.length = None

super().__init__(name)
if undulator_period is not None:
self.undulator_period, _ = soft_signal_r_and_setter(
int, initial_value=undulator_period
)
else:
self.undulator_period = None
super().__init__(name=name)

@AsyncStatus.wrap
async def set(self, value: float):
async def set(self, value: float) -> None:
"""
Set the undulator gap to a given energy in keV
Set the undulator gap to a given value in mm

Args:
value: energy in keV
value: gap in mm
"""
await self._set_undulator_gap(value)
await self.raise_if_not_enabled() # Check access
if await self._check_gap_within_threshold(value):
LOGGER.debug(
"Gap is already in the correct place, no need to ask it to move"
)
return

LOGGER.info(
f"Undulator gap mismatch. Moving gap to nominal value, {value:.3f}mm"
)
commissioning_mode = await self._is_commissioning_mode_enabled()
if not commissioning_mode:
# Only move if the gap is sufficiently different to the value from the
# DCM lookup table AND we're not in commissioning mode
await self.gap_motor.set(
value,
timeout=STATUS_TIMEOUT_S,
)
else:
LOGGER.warning("In test mode, not moving ID gap")

async def _check_gap_within_threshold(self, target_gap: float) -> bool:
"""
Check if the undulator gap is within the acceptable threshold of the target gap

Args:
target_gap: target gap in mm
Returns:
True if the gap is within the threshold, False otherwise
"""
current_gap = await self.current_gap.get_value()
tolerance = await self.gap_discrepancy_tolerance_mm.get_value()
return abs(target_gap - current_gap) <= tolerance

async def raise_if_not_enabled(self):
async def _is_commissioning_mode_enabled(self) -> bool | None:
"""
Asynchronously checks if commissioning mode is enabled via the baton reference.
"""
return self.baton_ref and await self.baton_ref().commissioning.get_value()

async def raise_if_not_enabled(self) -> AccessError | None:
"""
Asynchronously raises AccessError if gap access is disabled and not in commissioning mode.
"""
access_level = await self.gap_access.get_value()
commissioning_mode = await self._is_commissioning_mode_enabled()
if access_level is EnabledDisabledUpper.DISABLED and not commissioning_mode:
raise AccessError("Undulator gap access is disabled. Contact Control Room")

async def _set_undulator_gap(self, energy_kev: float) -> None:
await self.raise_if_not_enabled()
target_gap = await self._get_gap_to_match_energy(energy_kev)
LOGGER.info(
f"Setting undulator gap to {target_gap:.3f}mm based on {energy_kev:.2f}kev"

class Undulator(BaseUndulator):
"""
An Undulator-type insertion device, used to control photon emission at a given beam energy.
This class expects energy [keV] passed in set method and does convertion to gap
internally, for which it requires path to lookup table file in constructor.
"""

def __init__(
self,
prefix: str,
id_gap_lookup_table_path: str = os.devnull,
poles: int | None = None,
length: float | None = None,
undulator_period: int | None = None,
baton: Baton | None = None,
name: str = "",
) -> None:
"""Constructor

Args:
prefix: PV prefix
id_gap_lookup_table_path (str): Path to a lookup table file
poles (int, optional): Number of magnetic poles built into the undulator
length (float, optional): Length of the undulator in meters
undulator_period(int, optional): Undulator period
baton (optional): Baton object if provided.
name (str, optional): Name for device. Defaults to "".
"""

self.id_gap_lookup_table_path = id_gap_lookup_table_path
super().__init__(
prefix=prefix,
poles=poles,
length=length,
undulator_period=undulator_period,
baton=baton,
name=name,
)

# Check if undulator gap is close enough to the value from the DCM
current_gap = await self.current_gap.get_value()
tolerance = await self.gap_discrepancy_tolerance_mm.get_value()
difference = abs(target_gap - current_gap)
if difference > tolerance:
LOGGER.info(
f"Undulator gap mismatch. {difference:.3f}mm is outside tolerance.\
Moving gap to nominal value, {target_gap:.3f}mm"
)
commissioning_mode = await self._is_commissioning_mode_enabled()
if not commissioning_mode:
# Only move if the gap is sufficiently different to the value from the
# DCM lookup table AND we're not in commissioning mode
await self.gap_motor.set(
target_gap,
timeout=STATUS_TIMEOUT_S,
)
else:
LOGGER.warning("In test mode, not moving ID gap")
else:
LOGGER.debug(
"Gap is already in the correct place for the new energy value "
f"{energy_kev}, no need to ask it to move"
)
@AsyncStatus.wrap
async def set(self, value: float):
"""
Check conditions and Set undulator gap to a given energy in keV

async def _is_commissioning_mode_enabled(self):
return self.baton_ref and await self.baton_ref().commissioning.get_value()
Args:
value: energy in keV
"""
# Convert energy in keV to gap in mm first
target_gap = await self._get_gap_to_match_energy(value)
LOGGER.info(
f"Setting undulator gap to {target_gap:.3f}mm based on {value:.2f}kev"
)
await super().set(target_gap)

async def _get_gap_to_match_energy(self, energy_kev: float) -> float:
"""
Expand Down
Loading