test: add tests and review scan and device
This commit is contained in:
@@ -9,10 +9,10 @@ put_complete=True is used to ensure that the action is executed completely. This
|
||||
to allow for a more stable execution of the action."""
|
||||
|
||||
import enum
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
import re
|
||||
from typing import Literal
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
@@ -126,7 +126,7 @@ class Mo1BraggScanSettings(Device):
|
||||
"""Mo1 Bragg PVs to set the scan setttings"""
|
||||
|
||||
# XRD settings
|
||||
xrd_select_ref_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_select_ref_ENUM", kind="confit")
|
||||
xrd_select_ref_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_select_ref_ENUM", kind="config")
|
||||
xrd_enable_hi_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_enable_hi_ENUM", kind="config")
|
||||
|
||||
xrd_time_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_time_hi", kind="config")
|
||||
@@ -406,7 +406,7 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
# Start motion
|
||||
self.move_abs.put(1)
|
||||
# Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced
|
||||
time.sleep(0.5)
|
||||
time.sleep(0.25)
|
||||
while self.motor_is_moving.get() == 0:
|
||||
# TODO check if the _run_subs is needed since we have an auto_monitor on the readback PV
|
||||
# However, since the move_type can change, it might be necessary to have it here
|
||||
@@ -607,7 +607,6 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
# Wait for reset to be done
|
||||
# TODO check if this sleep can really be removed
|
||||
# time.sleep(1)
|
||||
self._get_scaninfo_parameters()
|
||||
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)],
|
||||
@@ -618,13 +617,18 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
f"Timeout after {self.timeout_for_pvwait} while waiting for scan status, current state: {self.scan_control.scan_status.get()}"
|
||||
)
|
||||
# Add here the logic to setup different type of scans
|
||||
self.setup_simple_xas_scan(
|
||||
low=self.start,
|
||||
high=self.end,
|
||||
scan_time=self.scan_time,
|
||||
mode=ScanControlMode.SIMPLE,
|
||||
scan_duration=self.scan_duration,
|
||||
)
|
||||
scan_name = self.scaninfo.scan_msg.content["info"].get("scan_name", "")
|
||||
if scan_name == "energy_oscillation_scan":
|
||||
self._get_scaninfo_parameters()
|
||||
self.setup_simple_xas_scan(
|
||||
low=self.start,
|
||||
high=self.end,
|
||||
scan_time=self.scan_time,
|
||||
mode=ScanControlMode.SIMPLE,
|
||||
scan_duration=self.scan_duration,
|
||||
)
|
||||
else:
|
||||
raise Mo1BraggError(f"Scan mode {scan_name} not implemented for device {self.name}")
|
||||
# Wait for params to be checked from controller
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.SUCCESS)],
|
||||
@@ -664,9 +668,8 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
list(object): list of objects that were unstaged
|
||||
"""
|
||||
self.check_scan_id()
|
||||
if self._stopped is True:
|
||||
return super().unstage()
|
||||
self.on_unstage()
|
||||
# TODO should this reset on unstage?
|
||||
self._stopped = False
|
||||
return super().unstage()
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ class MonoBraggOscillationScan(AsyncFlyScanBase):
|
||||
"""
|
||||
|
||||
scan_name = "energy_oscillation_scan"
|
||||
scan_type = "xas"
|
||||
scan_type = "fly"
|
||||
scan_report_hint = "device_progress"
|
||||
required_kwargs = []
|
||||
use_scan_progress_report = False
|
||||
@@ -33,15 +33,14 @@ class MonoBraggOscillationScan(AsyncFlyScanBase):
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
# TODO If the scan only works with this device, then it could be set as a default value here as it is now.
|
||||
# FYI This at the moment assumes it to be loaded for this specific name. Should be solved differently in future.
|
||||
# Needs to be still its name
|
||||
# TODO How to we deal with scans written for a specific device/motor?
|
||||
self.motor = motor if motor is not None else self.device_manager.devices["mo1_bragg"].name
|
||||
self.start = start
|
||||
self.stop = stop
|
||||
self.scan_time = scan_time
|
||||
self.scan_duration = scan_duration
|
||||
self.device_move_request_id = str(uuid.uuid4())
|
||||
self.primary_readout_cycle = 1 # Sleep time in seconds between readout cycles
|
||||
|
||||
def prepare_positions(self):
|
||||
"""Prepare the positions for the scan.
|
||||
@@ -67,9 +66,7 @@ class MonoBraggOscillationScan(AsyncFlyScanBase):
|
||||
"""
|
||||
Return the instructions for the scan report.
|
||||
"""
|
||||
yield from self.stubs.scan_report_instruction({"device_progress" : [self.motor]})
|
||||
# TODO implement which instructions the scan should listen to for the table printout if wanted.
|
||||
# yield None
|
||||
yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]})
|
||||
|
||||
def scan_core(self):
|
||||
"""Run the scan core.
|
||||
@@ -80,29 +77,24 @@ class MonoBraggOscillationScan(AsyncFlyScanBase):
|
||||
yield from self.stubs.kickoff(device=self.motor)
|
||||
yield from self.stubs.complete(device=self.motor)
|
||||
|
||||
# This is the number of the instruction for the complete call.
|
||||
# Therefore, the complete method should implement the creation of a DeviceStatus object
|
||||
# that resolves once the scan is finished.
|
||||
# Get the target DIID (instruction number) for the stubs.complete call
|
||||
target_diid = self.DIID - 1
|
||||
|
||||
while True:
|
||||
# Readout monitored devices
|
||||
yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary")
|
||||
# Check if complete call on Mo1 Bragg has been finished
|
||||
status = self.stubs.get_req_status(
|
||||
device=self.motor, RID=self.metadata["RID"], DIID=target_diid
|
||||
)
|
||||
if status:
|
||||
break
|
||||
# Readout with 1 Hz frequency
|
||||
time.sleep(1)
|
||||
time.sleep(self.primary_readout_cycle)
|
||||
self.point_id += 1
|
||||
|
||||
def finalize(self):
|
||||
"""Set the number of points for the scan, base on the point_id which incrementally increases in scan_core."""
|
||||
# TODO This will call complete again, but it is already called on the device in scan_core, but not on all other devices.
|
||||
# It would also return to the start position per default
|
||||
# yield from super().finalize()
|
||||
# Alternatively, the complete method could be called here for all devices except the flyer.
|
||||
# Call complete on all devices except the motor/Mo1Bragg
|
||||
yield from self.stubs.complete(
|
||||
device=[dev for dev in self.device_manager.devices.keys() if dev != self.motor]
|
||||
)
|
||||
|
||||
@@ -10,7 +10,14 @@ from ophyd.utils import LimitError
|
||||
from ophyd_devices.tests.utils import MockPV
|
||||
|
||||
# from bec_server.device_server.tests.utils import DMMock
|
||||
from debye_bec.devices.mo1_bragg import Mo1Bragg, MoveType, ScanControlMode, ScanControlScanStatus
|
||||
from debye_bec.devices.mo1_bragg import (
|
||||
Mo1Bragg,
|
||||
Mo1BraggError,
|
||||
MoveType,
|
||||
ScanControlLoadMessage,
|
||||
ScanControlMode,
|
||||
ScanControlScanStatus,
|
||||
)
|
||||
|
||||
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
|
||||
from debye_bec.devices.test_utils.utils import patch_dual_pvs
|
||||
@@ -20,7 +27,10 @@ from debye_bec.devices.test_utils.utils import patch_dual_pvs
|
||||
def mock_bragg():
|
||||
name = "bragg"
|
||||
prefix = "X01DA-OP-MO1:BRAGG:"
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
with (
|
||||
mock.patch.object(ophyd, "cl") as mock_cl,
|
||||
mock.patch("debye_bec.devices.mo1_bragg.Mo1Bragg", "_on_init"),
|
||||
):
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = Mo1Bragg(name=name, prefix=prefix)
|
||||
@@ -34,7 +44,7 @@ def test_init(mock_bragg):
|
||||
assert dev.prefix == "X01DA-OP-MO1:BRAGG:"
|
||||
assert dev.move_type.get() == MoveType.ENERGY
|
||||
assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV"
|
||||
assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs"
|
||||
assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs.PROC"
|
||||
|
||||
|
||||
def test_check_value(mock_bragg):
|
||||
@@ -72,7 +82,7 @@ def test_move_succeeds(mock_bragg):
|
||||
# Move succeeds
|
||||
with mock.patch.object(dev.motor_is_moving._read_pv, "mock_data", side_effect=[0, 1]):
|
||||
status = dev.move(0.5)
|
||||
# Sleep needed for while loop in _move_and_finish
|
||||
# Sleep needed to allow thread to resolive in _move_and_finish, i.e. and the 0.25s sleep inside the function
|
||||
time.sleep(0.5)
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
@@ -128,12 +138,6 @@ def test_setup_simple_xas_scan(mock_bragg):
|
||||
assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.SIMPLE
|
||||
assert dev.scan_control.scan_duration.get() == 10
|
||||
|
||||
dev.scan_control.scan_status._read_pv.mock_data = 0
|
||||
with pytest.raises(TimeoutError):
|
||||
dev.setup_simple_xas_scan(
|
||||
low=0.5, high=1, scan_time=0.1, mode=0, scan_duration=10, timeout=0.5
|
||||
)
|
||||
|
||||
|
||||
def test_kickoff_scan(mock_bragg):
|
||||
dev = mock_bragg
|
||||
@@ -152,21 +156,87 @@ def test_kickoff_scan(mock_bragg):
|
||||
|
||||
def test_complete(mock_bragg):
|
||||
dev = mock_bragg
|
||||
dev.scan_control.scan_done._read_pv.mock_data = ScanControlScanStatus.RUNNING
|
||||
dev.scan_control.scan_done._read_pv.mock_data = 0
|
||||
# Normal case
|
||||
status = dev.complete()
|
||||
assert status.done is False
|
||||
assert status.success is False
|
||||
dev.scan_control.scan_done._read_pv.mock_data = ScanControlScanStatus.FINISHED
|
||||
dev.scan_control.scan_done._read_pv.mock_data = 1
|
||||
time.sleep(0.2)
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
|
||||
# Stop called case
|
||||
dev.scan_control.scan_done._read_pv.mock_data = 0
|
||||
status = dev.complete()
|
||||
assert status.done is False
|
||||
assert status.success is False
|
||||
dev.stop()
|
||||
time.sleep(0.2)
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
assert status.success is False
|
||||
|
||||
|
||||
def test_unstage(mock_bragg):
|
||||
mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0
|
||||
mock_bragg.unstage()
|
||||
assert mock_bragg.scan_control.scan_val_reset.get() == 1
|
||||
|
||||
# Ensure PV is also called when stopped
|
||||
mock_bragg._stopped = True
|
||||
mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0
|
||||
mock_bragg.unstage()
|
||||
assert mock_bragg.scan_control.scan_val_reset.get() == 1
|
||||
assert mock_bragg._stopped == False
|
||||
|
||||
|
||||
def test_stage(mock_bragg):
|
||||
# Test unknown scan type first
|
||||
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
|
||||
mock_bragg.scaninfo.scan_msg.content["info"].update({"scan_name": "unknown_fly_scan"})
|
||||
with pytest.raises(Mo1BraggError):
|
||||
mock_bragg.stage()
|
||||
|
||||
# Test simple XAS scan
|
||||
mock_bragg.scaninfo.scan_msg.content["info"].update({"scan_name": "energy_oscillation_scan"})
|
||||
start = 0
|
||||
end = 5
|
||||
scan_time = 1
|
||||
scan_duration = 10
|
||||
|
||||
# Ensure side effect sets parameters
|
||||
def _get_scaninfo():
|
||||
mock_bragg.start = 0
|
||||
mock_bragg.end = 5
|
||||
mock_bragg.scan_time = 1
|
||||
mock_bragg.scan_duration = 10
|
||||
|
||||
with (
|
||||
mock.patch.object(mock_bragg, "_get_scaninfo_parameters", side_effect=_get_scaninfo),
|
||||
mock.patch.object(mock_bragg, "setup_simple_xas_scan") as mock_setup_simple_xas,
|
||||
mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata,
|
||||
mock.patch.object(
|
||||
mock_bragg.scan_control.scan_msg,
|
||||
"get",
|
||||
side_effect=[
|
||||
ScanControlLoadMessage.PENDING,
|
||||
ScanControlLoadMessage.PENDING,
|
||||
ScanControlLoadMessage.SUCCESS,
|
||||
],
|
||||
),
|
||||
):
|
||||
mock_bragg.stage()
|
||||
assert mock_load_scan_metadata.call_count == 1
|
||||
assert mock_setup_simple_xas.call_args == mock.call(
|
||||
low=start,
|
||||
high=end,
|
||||
scan_time=scan_time,
|
||||
mode=ScanControlMode.SIMPLE,
|
||||
scan_duration=scan_duration,
|
||||
)
|
||||
|
||||
with mock.patch.object(mock_bragg, "on_stage") as mock_unstage:
|
||||
mock_bragg._staged = ophyd.Staged.yes
|
||||
with pytest.raises(ophyd.utils.errors.RedundantStaging):
|
||||
mock_bragg.stage()
|
||||
assert mock_unstage.call_count == 0
|
||||
|
||||
@@ -30,12 +30,17 @@ def test_mono_bragg_oscillation_scan():
|
||||
cmd.parameter["rpc_id"] = "my_test_rpc_id"
|
||||
|
||||
assert reference_commands == [
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 0, "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="scan_report_instruction",
|
||||
parameter={"device_progress": ["mo1_bragg"]},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 1, "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="open_scan",
|
||||
parameter={
|
||||
"scan_motors": [],
|
||||
@@ -48,17 +53,17 @@ def test_mono_bragg_oscillation_scan():
|
||||
"num_points": None,
|
||||
"positions": [0.0, 5.0],
|
||||
"scan_name": "energy_oscillation_scan",
|
||||
"scan_type": "xas",
|
||||
"scan_type": "fly",
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 1, "RID": "my_test_request_id"},
|
||||
metadata={"readout_priority": "monitored", "DIID": 2, "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="stage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "baseline", "DIID": 2, "RID": "my_test_request_id"},
|
||||
metadata={"readout_priority": "baseline", "DIID": 3, "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="baseline_reading",
|
||||
parameter={},
|
||||
@@ -66,7 +71,7 @@ def test_mono_bragg_oscillation_scan():
|
||||
DeviceInstructionMessage(
|
||||
metadata={
|
||||
"readout_priority": "monitored",
|
||||
"DIID": 3,
|
||||
"DIID": 4,
|
||||
"RID": "my_test_request_id",
|
||||
"response": True,
|
||||
},
|
||||
@@ -81,55 +86,55 @@ def test_mono_bragg_oscillation_scan():
|
||||
},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 4, "RID": "my_test_request_id"},
|
||||
metadata={"readout_priority": "monitored", "DIID": 5, "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
action="kickoff",
|
||||
parameter={"configure": {}, "wait_group": "kickoff"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 5, "RID": "my_test_request_id"},
|
||||
metadata={"readout_priority": "monitored", "DIID": 6, "RID": "my_test_request_id"},
|
||||
device="mo1_bragg",
|
||||
action="complete",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 6, "RID": "my_test_request_id"},
|
||||
metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="read",
|
||||
parameter={"group": "primary", "wait_group": "readout_primary"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="wait",
|
||||
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 8, "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="wait",
|
||||
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="read",
|
||||
parameter={"group": "primary", "wait_group": "readout_primary"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id"},
|
||||
metadata={"readout_priority": "monitored", "DIID": 10, "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="wait",
|
||||
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 10, "RID": "my_test_request_id"},
|
||||
metadata={"readout_priority": "monitored", "DIID": 11, "RID": "my_test_request_id"},
|
||||
device=[],
|
||||
action="complete",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 11, "RID": "my_test_request_id"},
|
||||
metadata={"readout_priority": "monitored", "DIID": 12, "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="unstage",
|
||||
parameter={},
|
||||
),
|
||||
DeviceInstructionMessage(
|
||||
metadata={"readout_priority": "monitored", "DIID": 12, "RID": "my_test_request_id"},
|
||||
metadata={"readout_priority": "monitored", "DIID": 13, "RID": "my_test_request_id"},
|
||||
device=None,
|
||||
action="close_scan",
|
||||
parameter={},
|
||||
|
||||
Reference in New Issue
Block a user