refactor: cleanup for device and tests

This commit is contained in:
2024-07-26 17:09:12 +02:00
parent 59675e0038
commit ce3718dcf6
2 changed files with 187 additions and 124 deletions

View File

@@ -35,7 +35,7 @@ logger = bec_logger.logger
class ScanControlScanStatus(int, enum.Enum):
"""Enum class for the scan status of Bragg positioner"""
"""Enum class for the scan status of the Bragg positioner"""
PARAMETER_WRONG = 0
VALIDATION_PENDING = 1
@@ -44,7 +44,7 @@ class ScanControlScanStatus(int, enum.Enum):
class ScanControlLoadMessage(int, enum.Enum):
"""Enum for validating messages for load message of Bragg positioner"""
"""Enum for validating messages for load message of the Bragg positioner"""
PENDING = 0
STARTED = 1
@@ -194,7 +194,10 @@ class Mo1BraggScanControl(Device):
@dataclass
class ScanParameter:
"""Dataclass to store the scan parameters for the Mo1 Bragg positioner"""
"""Dataclass to store the scan parameters for the Mo1 Bragg positioner.
This needs to be in sync with the kwargs of the MO1 Bragg scans from Debye, to
ensure that the scan parameters are correctly set. Any changes in the scan kwargs,
i.e. renaming or adding new parameters, need to be represented here as well."""
scan_time: float = None
scan_duration: float = None
@@ -227,7 +230,6 @@ class Mo1Bragg(Device, PositionerBase):
# signal to indicate the move type 'energy' or 'angle'
move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind="config")
################ Motor PVs ################
# Energy PVs
readback = Cpt(
EpicsSignalRO, suffix="feedback_pos_energy_RBV", kind="hinted", auto_monitor=True
@@ -312,7 +314,6 @@ class Mo1Bragg(Device, PositionerBase):
value (int) : current progress value
"""
max_value = 100
# logger.info(f"Progress at {value}")
self._run_subs(
sub_type=self.SUB_PROGRESS,
value=value,
@@ -336,9 +337,7 @@ class Mo1Bragg(Device, PositionerBase):
Args:
success (bool) : Flag to indicate if the motion was successful
"""
# Stop any motion on the device
self.move_stop.put(1)
# Stop the move thread
self._stopped = True
if self._move_thread is not None:
self._move_thread.join()
@@ -412,33 +411,24 @@ class Mo1Bragg(Device, PositionerBase):
update_frequency (float): Optional, frequency to update the current position of the motion, defaults to 0.1s
"""
success = True
exception = None
try:
# Set the target position on IOC
move_cpt.put(target_pos)
# Start motion
self.move_abs.put(1)
# Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced
time.sleep(0.5)
while self.motor_is_moving.get() == 0:
val = read_cpt.get()
self._run_subs(sub_type=self.SUB_READBACK, value=val)
if self.stopped:
success = False
break
time.sleep(update_frequency)
# pylint: disable=protected-access
status._finished(success=success)
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Error in move thread of device {self.name}: {content}")
exception = exc
finally:
if exception:
status.set_exception(exc=exception)
else:
# pylint: disable=protected-access
status._finished(success=success)
status.set_exception(exc=exc)
def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus:
"""Move the Bragg positioner to the specified value, allows to switch between move types angle and energy.
@@ -579,6 +569,7 @@ class Mo1Bragg(Device, PositionerBase):
def kickoff(self):
"""Kickoff the device, called from BEC."""
scan_duration = self.scan_control.scan_duration.get()
# TODO implement better logic for infinite scans, at least bring it up with Debye
start_func = (
self.scan_control.scan_start_infinite.put
if scan_duration < 0.1
@@ -617,21 +608,22 @@ class Mo1Bragg(Device, PositionerBase):
"""
state = self.scan_control.scan_msg.get()
if state != target_state:
logger.warning(f"Resetting scan validation in stage for state: {ScanControlLoadMessage(state)}, " \
f"retry .get() on scan_control: {ScanControlLoadMessage(self.scan_control.scan_msg.get())} and sleeping 1s")
logger.warning(
f"Resetting scan validation in stage for state: {ScanControlLoadMessage(state)}, "
f"retry .get() on scan_control: {ScanControlLoadMessage(self.scan_control.scan_msg.get())} and sleeping 1s"
)
self.scan_control.scan_val_reset.put(1)
# Sleep to ensure the reset is done
time.sleep(1)
if not self.wait_for_signals(
signal_conditions=[(self.scan_control.scan_msg.get, target_state)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
):
raise TimeoutError(
f"Timeout after {self.timeout_for_pvwait} while waiting for scan status," \
f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}"
f"Timeout after {self.timeout_for_pvwait} while waiting for scan status,"
f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}"
)
def on_stage(self) -> None:
@@ -681,7 +673,9 @@ class Mo1Bragg(Device, PositionerBase):
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
)
else:
raise Mo1BraggError(f"Scan mode {scan_name} not implemented for device {self.name}")
raise Mo1BraggError(
f"Scan mode {scan_name} not implemented for scan_type={self.scaninfo.scan_type} on device {self.name}"
)
# Load the scan parameters to the controller
self.scan_control.scan_load.put(1)
# Wait for params to be checked from controller
@@ -728,14 +722,15 @@ class Mo1Bragg(Device, PositionerBase):
return super().unstage()
def on_unstage(self) -> None:
"""Actions to be executed when the device is unstaged."""
"""Actions to be executed when the device is unstaged.
The checks here ensure that the controller resets the Scan_msg to PENDING state."""
if self.wait_for_signals(
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
):
return
self.scan_control.scan_val_reset.put(1)
if not self.wait_for_signals(
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)],
@@ -746,8 +741,6 @@ class Mo1Bragg(Device, PositionerBase):
f"Timeout after {self.timeout_for_pvwait} while waiting for scan validation"
)
# -------------- End Flyer Interface methods -----------------#
# -------------- Utility methods -----------------#
@@ -850,7 +843,8 @@ class Mo1Bragg(Device, PositionerBase):
result = self.wait_for_signals(
signal_conditions, timeout, check_stopped, interval, all_signals
)
if result:
if result is True:
# pylint: disable=protected-access
status.set_finished()
else:
status.set_exception(exception_on_timeout)

View File

@@ -7,7 +7,11 @@ from unittest import mock
import ophyd
import pytest
from bec_lib.messages import ScanStatusMessage
from bec_lib.messages import ScanQueueMessage, ScanStatusMessage
from bec_server.scan_server.scan_assembler import ScanAssembler
from bec_server.scan_server.scan_queue import RequestBlock
from bec_server.scan_server.scan_worker import ScanWorker
from bec_server.scan_server.tests.fixtures import scan_server_mock
from ophyd.utils import LimitError
from ophyd_devices.tests.utils import MockPV
@@ -23,8 +27,7 @@ from debye_bec.devices.mo1_bragg import (
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
from debye_bec.devices.test_utils.utils import patch_dual_pvs
from bec_server.scan_server.tests.fixtures import scan_server_mock
from bec_server.scan_server.scan_worker import ScanWorker
@pytest.fixture(scope="function")
def scan_worker_mock(scan_server_mock):
@@ -271,97 +274,163 @@ def test_unstage(mock_bragg):
assert mock_put.call_count == 1
def test_stage_(mock_bragg):
# Test unknown scan type first
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
msg = ScanStatusMessage(
scan_id="my_scan_id",
status="closed",
info={
"kwargs": {
"start": 0,
"stop": 5,
"scan_time": 1,
"scan_duration": 10,
"xrd_enable_low": True,
"xrd_enable_high": False,
"num_trigger_low": 1,
"num_trigger_high": 7,
"exp_time_low": 1,
"exp_time_high": 3,
"cycle_low": 1,
"cycle_high": 5,
}
},
metadata={},
)
mock_bragg.scaninfo.scan_msg = msg
mock_bragg.scaninfo.scan_msg.content["info"].update({"scan_name": "unknown_fly_scan"})
with (
mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata,
mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg,
):
with pytest.raises(Mo1BraggError):
mock_bragg.stage()
assert mock_check_scan_msg.call_count == 1
assert mock_load_scan_metadata.call_count == 1
# Test simple XAS scan
mock_bragg.scaninfo.scan_msg.content["info"].update({"scan_name": "xas_simple_scan"})
@pytest.mark.parametrize(
"msg",
[
ScanQueueMessage(
scan_type="monitor_scan",
parameter={
"args": {},
"kwargs": {"device": "mo1_bragg", "start": 0, "stop": 10, "relative": True},
"num_points": 100,
},
queue="primary",
metadata={"RID": "test1234"},
),
ScanQueueMessage(
scan_type="xas_simple_scan",
parameter={
"args": {},
"kwargs": {
"motor": "mo1_bragg",
"start": 0,
"stop": 10,
"scan_time": 1,
"scan_duration": 10,
},
"num_points": 100,
},
queue="primary",
metadata={"RID": "test1234"},
),
ScanQueueMessage(
scan_type="xas_simple_scan_with_xrd",
parameter={
"args": {},
"kwargs": {
"motor": "mo1_bragg",
"start": 0,
"stop": 10,
"scan_time": 1,
"scan_duration": 10,
"xrd_enable_low": True,
"xrd_enable_high": False,
"num_trigger_low": 1,
"num_trigger_high": 7,
"exp_time_low": 1,
"exp_time_high": 3,
"cycle_low": 1,
"cycle_high": 5,
},
"num_points": 10,
},
queue="primary",
metadata={"RID": "test1234"},
),
],
)
def test_stage(mock_bragg, scan_worker_mock, msg):
"""This test is important to check that the stage method of the device is working correctly.
Changing the kwargs names in the scans is tightly linked to the logic on the device, thus
it is important to check that the stage method is working correctly for the current implementation.
Therefor, this test creates a scaninfo message using the scan.open_scan() method to always check
agains the currently implemented scans vs. the logic on the device"""
# Create a scaninfo message using scans the ScanQueueMessages above, 3 cases of fly scan; for the general case the procedure is not defined yet
worker = scan_worker_mock
scan_server = worker.parent
rb = RequestBlock(msg, assembler=ScanAssembler(parent=scan_server))
with mock.patch.object(worker, "current_instruction_queue_item"):
worker.scan_motors = []
worker.readout_priority = {
"monitored": [],
"baseline": [],
"async": [],
"continuous": [],
"on_request": [],
}
open_scan_msg = list(rb.scan.open_scan())[0]
worker._initialize_scan_info(rb, open_scan_msg, msg.content["parameter"].get("num_points"))
scan_status_msg = ScanStatusMessage(
scan_id="test1234", status="closed", info=worker.current_scan_info, metadata={}
)
mock_bragg.scaninfo.scan_msg = scan_status_msg
# Ensure that ScanControlLoadMessage is set to SUCCESS
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
with (
mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings,
mock.patch.object(mock_bragg, "set_xrd_settings") as mock_xrd_settings,
mock.patch.object(
mock_bragg, "set_scan_control_settings"
) as mock_set_scan_control_settings,
mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata,
mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg,
mock.patch.object(mock_bragg, "on_unstage"),
):
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=msg.content["info"]["kwargs"]["start"],
high=msg.content["info"]["kwargs"]["stop"],
scan_time=msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_xrd_settings.call_args == mock.call(
enable_low=False,
enable_high=False,
num_trigger_low=0,
num_trigger_high=0,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=msg.content["info"]["kwargs"]["scan_duration"],
)
mock_bragg.scaninfo.scan_msg.content["info"].update(
{"scan_name": "xas_simple_scan_with_xrd"}
)
# Unstage mock_bragg to reset _staged
mock_bragg._staged= ophyd.Staged.no
time.sleep(1)
# Test simple XAS scan with XRD
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=msg.content["info"]["kwargs"]["start"],
high=msg.content["info"]["kwargs"]["stop"],
scan_time=msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_xrd_settings.call_args == mock.call(
enable_low=msg.content["info"]["kwargs"]["xrd_enable_low"],
enable_high=msg.content["info"]["kwargs"]["xrd_enable_high"],
num_trigger_low=msg.content["info"]["kwargs"]["num_trigger_low"],
num_trigger_high=msg.content["info"]["kwargs"]["num_trigger_high"],
exp_time_low=msg.content["info"]["kwargs"]["exp_time_low"],
exp_time_high=msg.content["info"]["kwargs"]["exp_time_high"],
cycle_low=msg.content["info"]["kwargs"]["cycle_low"],
cycle_high=msg.content["info"]["kwargs"]["cycle_high"],
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=msg.content["info"]["kwargs"]["scan_duration"],
)
# Test redundant staging
with pytest.raises(ophyd.utils.errors.RedundantStaging):
mock_bragg.stage()
scan_name = scan_status_msg.content["info"].get("scan_name", "")
# Chek the not implemented fly scan first, should raise Mo1BraggError
if scan_name not in ["xas_simple_scan", "xas_simple_scan_with_xrd"]:
with pytest.raises(Mo1BraggError):
mock_bragg.stage()
assert mock_check_scan_msg.call_count == 1
assert mock_load_scan_metadata.call_count == 1
else:
with (
mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings,
mock.patch.object(mock_bragg, "set_xrd_settings") as mock_xrd_settings,
mock.patch.object(
mock_bragg, "set_scan_control_settings"
) as mock_set_scan_control_settings,
):
# Check xas_simple_scan
if scan_name == "xas_simple_scan":
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_xrd_settings.call_args == mock.call(
enable_low=False,
enable_high=False,
num_trigger_low=0,
num_trigger_high=0,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)
# Check xas_simple_scan_with_xrd
elif scan_name == "xas_simple_scan_with_xrd":
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_xrd_settings.call_args == mock.call(
enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
enable_high=scan_status_msg.content["info"]["kwargs"][
"xrd_enable_high"
],
num_trigger_low=scan_status_msg.content["info"]["kwargs"][
"num_trigger_low"
],
num_trigger_high=scan_status_msg.content["info"]["kwargs"][
"num_trigger_high"
],
exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"],
exp_time_high=scan_status_msg.content["info"]["kwargs"][
"exp_time_high"
],
cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"],
cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"],
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)