Feat/add mo1bragg angle positioner #52

Merged
appel_c merged 6 commits from feat/add_mo1bragg_angle_positioner into main 2025-05-16 15:05:53 +02:00
8 changed files with 149 additions and 146 deletions

View File

@@ -219,6 +219,15 @@ dummy_pv:
onFailure: retry
enabled: true
softwareTrigger: false
mo1_bragg_angle:
readoutPriority: baseline
description: Positioner for the Monochromator
deviceClass: debye_bec.devices.mo1_bragg.mo1_bragg_angle.Mo1BraggAngle
deviceConfig:
prefix: "X01DA-OP-MO1:BRAGG:"
onFailure: retry
enabled: true
softwareTrigger: false
# NIDAQ
nidaq:
@@ -337,15 +346,15 @@ reffoilchanger:
# enabled: true
# softwareTrigger: false
# xray_eye:
# readoutPriority: async
# description: X-ray eye
# deviceClass: debye_bec.devices.cameras.basler_cam.BaslerCam
# deviceConfig:
# prefix: "X01DA-ES-XRAYEYE:"
# onFailure: retry
# enabled: true
# softwareTrigger: false
xray_eye:
readoutPriority: async
description: X-ray eye
deviceClass: debye_bec.devices.cameras.basler_cam.BaslerCam
deviceConfig:
prefix: "X01DA-ES-XRAYEYE:"
onFailure: retry
enabled: true
softwareTrigger: false
# Pilatus Curtain
# pilatus_curtain:

View File

@@ -81,7 +81,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG:
"""
USER_ACCESS = ["set_advanced_xas_settings"]
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"]
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore
"""
@@ -315,13 +315,8 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
high (float): High energy/angle value of the scan
scan_time (float): Time for a half oscillation
"""
move_type = self.move_type.get()
if move_type == MoveType.ENERGY:
self.scan_settings.s_scan_energy_lo.put(low)
self.scan_settings.s_scan_energy_hi.put(high)
else:
self.scan_settings.s_scan_angle_lo.put(low)
self.scan_settings.s_scan_angle_hi.put(high)
self.scan_settings.s_scan_energy_lo.put(low)
self.scan_settings.s_scan_energy_hi.put(high)
self.scan_settings.s_scan_scantime.put(scan_time)
def wait_for_signal(self, signal: Cpt, value: Any, timeout: float | None = None) -> None:
@@ -381,18 +376,10 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
p_kink (float): Position of kink in %
e_kink (float): Energy of kink in eV
"""
# TODO Add fallback solution for automatic testing, otherwise test will fail
# because no monochromator will calculate the angle
# Unsure how to implement this
move_type = self.move_type.get()
if move_type == MoveType.ENERGY:
e_kink_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=e_kink)
# Angle and Energy are inverse proportional!
high_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=low)
low_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=high)
else:
raise Mo1BraggError("MoveType Angle not implemented for advanced scans, use Energy")
e_kink_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=e_kink)
# Angle and Energy are inverse proportional!
high_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=low)
low_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=high)
pos, vel, dt = compute_spline(
low_deg=low_deg,

View File

@@ -0,0 +1,20 @@
"""Positioner implementation with readback angle of the MO1 Bragg positioner."""
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner
class Mo1BraggAngle(Mo1BraggPositioner):
"""Positioner implementation with readback angle of the MO1 Bragg positioner."""
readback = Cpt(EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True)
setpoint = Cpt(EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True)
low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True)
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True)
@property
def egu(self) -> str:
"""Return the engineering unit of the positioner."""
return "deg"

View File

@@ -189,13 +189,13 @@ class Mo1BraggScanControl(Device):
class Mo1BraggPositioner(Device, PositionerBase):
"""
Positioner implementation of the MO1 Bragg positioner.
Positioner implementation with readback energy of the MO1 Bragg positioner.
The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG:
This soft IOC connects to the NI motor and its control loop.
"""
USER_ACCESS = ["set_advanced_xas_settings"]
USER_ACCESS = ["set_xtal"]
####### Sub-components ########
# Namespace is cleaner and easier to maintain
@@ -207,10 +207,6 @@ class Mo1BraggPositioner(Device, PositionerBase):
scan_control = Cpt(Mo1BraggScanControl, "")
status = Cpt(Mo1BraggStatus, "")
############# switch between energy and angle #############
# TODO should be removed/replaced once decision about pseudo motor is made
move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind="config")
############# Energy PVs #############
readback = Cpt(
@@ -226,22 +222,6 @@ class Mo1BraggPositioner(Device, PositionerBase):
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_energy_RBV", kind="config", auto_monitor=True)
velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True)
########### Angle PVs #############
# TODO Pseudo motor for angle?
feedback_pos_angle = Cpt(
EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True
)
setpoint_abs_angle = Cpt(
EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True
)
low_limit_angle = Cpt(
EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True
)
high_limit_angle = Cpt(
EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True
)
########## Move Command PVs ##########
move_abs = Cpt(EpicsSignal, suffix="move_abs", kind="config", put_complete=True)
@@ -271,9 +251,7 @@ class Mo1BraggPositioner(Device, PositionerBase):
success (bool) : Flag to indicate if the motion was successful
"""
self.move_stop.put(1)
if self._move_thread is not None:
self._move_thread.join()
self._move_thread = None
self._stopped = True
super().stop(success=success)
def stop_scan(self) -> None:
@@ -290,9 +268,7 @@ class Mo1BraggPositioner(Device, PositionerBase):
@property
def limits(self) -> tuple:
"""Return limits of the Bragg positioner"""
if self.move_type.get() == MoveType.ENERGY:
return (self.low_lim.get(), self.high_lim.get())
return (self.low_limit_angle.get(), self.high_limit_angle.get())
return (self.low_lim.get(), self.high_lim.get())
@property
def low_limit(self) -> float:
@@ -307,16 +283,12 @@ class Mo1BraggPositioner(Device, PositionerBase):
@property
def egu(self) -> str:
"""Return the engineering units of the positioner"""
if self.move_type.get() == MoveType.ENERGY:
return "eV"
return "deg"
return "eV"
@property
def position(self) -> float:
"""Return the current position of Mo1Bragg, considering the move type"""
move_type = self.move_type.get()
move_cpt = self.readback if move_type == MoveType.ENERGY else self.feedback_pos_angle
return move_cpt.get()
return self.readback.get()
# pylint: disable=arguments-differ
def check_value(self, value: float) -> None:
@@ -332,7 +304,7 @@ class Mo1BraggPositioner(Device, PositionerBase):
raise LimitError(f"position={value} not within limits {self.limits}")
def _move_and_finish(
self, target_pos: float, move_cpt: Cpt, status: DeviceStatus, update_frequency: float = 0.1
self, target_pos: float, status: DeviceStatus, update_frequency: float = 0.1
) -> None:
"""
Method to be called in the move thread to move the Bragg positioner
@@ -349,12 +321,14 @@ class Mo1BraggPositioner(Device, PositionerBase):
update_frequency (float): Optional, frequency to update the current position of
the motion, defaults to 0.1s
"""
motor_name = None
try:
# Set the target position on IOC
move_cpt.put(target_pos)
self.setpoint.put(target_pos)
self.move_abs.put(1)
# Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced
time.sleep(0.5)
motor_name = self.name
while self.motor_is_moving.get() == 0:
if self.stopped:
raise Mo1BraggStoppedError(f"Device {self.name} was stopped")
@@ -364,10 +338,12 @@ class Mo1BraggPositioner(Device, PositionerBase):
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Error in move thread of device {self.name}: {content}")
logger.error(
f"Error in move thread of device {motor_name if motor_name else ''}: {content}"
)
status.set_exception(exc=exc)
def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus:
def move(self, value: float, **kwargs) -> DeviceStatus:
"""
Move the Bragg positioner to the specified value, allows to
switch between move types angle and energy.
@@ -381,16 +357,12 @@ class Mo1BraggPositioner(Device, PositionerBase):
DeviceStatus : status object to track the motion
"""
self._stopped = False
if move_type is not None:
self.move_type.put(move_type)
move_type = self.move_type.get()
move_cpt = self.setpoint if move_type == MoveType.ENERGY else self.setpoint_abs_angle
self.check_value(value)
status = DeviceStatus(device=self)
self._move_thread = threading.Thread(
target=self._move_and_finish, args=(value, move_cpt, status, 0.1)
target=self._move_and_finish, args=(value, status, 0.1)
)
self._move_thread.start()
return status

View File

@@ -72,11 +72,7 @@ class XASSimpleScan(AsyncFlyScanBase):
yield None
def pre_scan(self):
"""Pre Scan action. Ensure the motor movetype is set to energy, then check
limits for start/end energy.
#TODO Remove once the motor movetype is removed and ANGLE motion is a pseudo motor.
"""
yield from self.stubs.send_rpc_and_wait(self.motor, "move_type.set", "energy")
"""Pre Scan action."""
self._check_limits()
# Ensure parent class pre_scan actions to be called.

View File

@@ -52,7 +52,6 @@ def test_init(mock_bragg):
dev = mock_bragg
assert dev.name == "bragg"
assert dev.prefix == "X01DA-OP-MO1:BRAGG:"
assert dev.move_type.get() == MoveType.ENERGY
assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV"
assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs"
@@ -61,29 +60,15 @@ def test_check_value(mock_bragg):
dev = mock_bragg
dev.low_lim._read_pv.mock_data = 0
dev.high_lim._read_pv.mock_data = 1
dev.low_limit_angle._read_pv.mock_data = 10
dev.high_limit_angle._read_pv.mock_data = 20
# Check that limits are taken correctly from angle or energy
# Energy first
move_type = MoveType.ENERGY
dev.move_type.set(move_type)
# nothing happens
dev.check_value(0.5)
with pytest.raises(LimitError):
dev.check_value(15)
# Angle next
move_type = MoveType.ANGLE
dev.move_type.set(move_type)
dev.check_value(15)
with pytest.raises(LimitError):
dev.check_value(0.5)
def test_egu(mock_bragg):
dev = mock_bragg
assert dev.egu == "eV"
dev.move_type.set(MoveType.ANGLE)
assert dev.egu == "deg"
def test_move_succeeds(mock_bragg):
@@ -135,16 +120,10 @@ def test_set_xtal(mock_bragg):
def test_set_xas_settings(mock_bragg):
dev = mock_bragg
dev.move_type.set(MoveType.ENERGY)
dev.set_xas_settings(low=0.5, high=1, scan_time=0.1)
assert dev.scan_settings.s_scan_energy_lo.get() == 0.5
assert dev.scan_settings.s_scan_energy_hi.get() == 1
assert dev.scan_settings.s_scan_scantime.get() == 0.1
dev.move_type.set(MoveType.ANGLE)
dev.set_xas_settings(low=10, high=20, scan_time=1)
assert dev.scan_settings.s_scan_angle_lo.get() == 10
assert dev.scan_settings.s_scan_angle_hi.get() == 20
assert dev.scan_settings.s_scan_scantime.get() == 1
def test_set_trig_settings(mock_bragg):

View File

@@ -0,0 +1,88 @@
"""Tests for the Mo1BraggAngle class."""
import threading
from unittest import mock
import ophyd
import pytest
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from debye_bec.devices.mo1_bragg.mo1_bragg_angle import Mo1BraggAngle
from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggStoppedError
# pylint: disable=protected-access
@pytest.fixture(scope="function")
def mock_bragg() -> Mo1BraggAngle:
"""Fixture for the Mo1BraggAngle device."""
name = "bragg"
prefix = "X01DA-OP-MO1:BRAGG:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = Mo1BraggAngle(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_mo1_bragg_angle_init(mock_bragg):
"""Test the initialization of the Mo1BraggAngle device."""
assert mock_bragg.name == "bragg"
assert mock_bragg.prefix == "X01DA-OP-MO1:BRAGG:"
assert isinstance(mock_bragg.readback, ophyd.EpicsSignalRO)
assert isinstance(mock_bragg.setpoint, ophyd.EpicsSignalWithRBV)
assert isinstance(mock_bragg.low_lim, ophyd.EpicsSignalRO)
assert isinstance(mock_bragg.high_lim, ophyd.EpicsSignalRO)
def test_mo1_bragg_angle_egu(mock_bragg):
"""Test the engineering unit of the Mo1BraggAngle device."""
assert mock_bragg.egu == "deg"
def test_mo1_bragg_angle_limits(mock_bragg):
"""Test the limits of the Mo1BraggAngle device."""
mock_bragg.low_lim._read_pv.mock_data = -10
mock_bragg.high_lim._read_pv.mock_data = 10
assert mock_bragg.limits == (-10, 10)
def test_mo1_bragg_angle_move(mock_bragg):
"""Test the move method of the Mo1BraggAngle device."""
mock_bragg.setpoint.put(0)
mock_bragg.readback._read_pv.mock_data = 0 # EpicsSignalRO
# Change PV for motor is moving before starting the move
mock_bragg.motor_is_moving._read_pv.mock_data = 0 # EpicsSignalRO
status = mock_bragg.move(5)
assert status.done is False
# Check setpoint is set correctly
assert mock_bragg.setpoint.get() == 5
# Update the motor is moving PV to simulate that the move is done
mock_bragg.motor_is_moving._read_pv.mock_data = 1
assert mock_bragg.motor_is_moving.get() == 1
status.wait(timeout=5) # If the status does not resolve after 5 seconds, something is wrong
assert status.done is True
def test_mo1_bragg_angle_stop(mock_bragg):
"""Test the stop method of the Mo1BraggAngle device."""
assert mock_bragg.stopped is False
mock_bragg.stop()
assert mock_bragg.stopped is True
status = mock_bragg.move(5)
assert status.done is False
# stopped should be resetted
assert mock_bragg.stopped is False
with pytest.raises(Mo1BraggStoppedError):
mock_bragg.stop()
status.wait(timeout=5) # This should raise before due to stop() call

View File

@@ -92,18 +92,6 @@ def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
@@ -205,18 +193,6 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
@@ -312,18 +288,6 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],
@@ -427,18 +391,6 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"],