refactor: review scan draft, and refactor ophyd positioner, add tests for initial scan draft

This commit is contained in:
2024-07-17 13:04:36 +02:00
parent a693ac7778
commit 422b789892
4 changed files with 186 additions and 14 deletions
+34 -2
View File
@@ -184,6 +184,7 @@ class Mo1BraggScanControl(Device):
EpicsSignalRO, suffix="scan_time_left_RBV", kind="normal", auto_monitor=True
)
scan_done = Cpt(EpicsSignalRO, suffix="scan_done_RBV", kind="normal", auto_monitor=True)
scan_val_reset = Cpt(EpicsSignal, suffix="scan_val_reset", kind="config")
class Mo1Bragg(Device, PositionerBase):
@@ -228,6 +229,7 @@ class Mo1Bragg(Device, PositionerBase):
# Execute motion
move_abs = Cpt(EpicsSignal, suffix="move_abs", kind="config")
move_stop = Cpt(EpicsSignal, suffix="move_stop", kind="config")
motor_is_moving = Cpt(
EpicsSignalRO, suffix="move_abs_done_RBV", kind="normal", auto_monitor=True
)
@@ -308,8 +310,9 @@ class Mo1Bragg(Device, PositionerBase):
def stop(self, *, success=False) -> None:
"""Stop any motion on the positioner"""
# TODO Add PV call to interrupt the motion
# Stop the motion
self.move_stop.put(1)
# Move thread is stopped too
self._stopped = True
if self._move_thread is not None:
self._move_thread.join()
@@ -529,6 +532,33 @@ class Mo1Bragg(Device, PositionerBase):
def on_stage(self) -> None:
"""Actions to be executed when the device is staged."""
if not self.scaninfo.scan_type == "xas":
return
# Could become redudant if we use this in unstage
self.scan_val_reset.put(1)
# Brief sleep to allow for PV to update (FYI, reliable set makes this redundant)
time.sleep(0.2)
if not self.wait_for_signals(
signal_conditions=[
(self.scan_control.scan_status.get, ScanControlScanStatus.WAITING_FOR_PARAMETER)
],
timeout=1.5,
check_stopped=True,
):
raise TimeoutError(
f"Timeout while waiting for scan status to get ready. State: {self.scan_control.scan_status.get()}"
)
# Add here the logic to setup different type of scans
low, high = self.scaninfo.positions
scan_time = self.scaninfo.scan_time
scan_duration = self.scaninfo.scan_duration
self.setup_simple_xas_scan(
low=low,
high=high,
scan_time=scan_time,
mode=ScanControlMode.SIMPLE,
scan_duration=scan_duration,
)
def complete(self) -> None:
"""Complete the acquisition, called from BEC.
@@ -579,6 +609,8 @@ class Mo1Bragg(Device, PositionerBase):
def on_unstage(self) -> None:
"""Actions to be executed when the device is unstaged."""
# Reset setting of paramters after scan to be ready and clean for the next scan
self.scan_val_reset.put(1)
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and set stopped flagged to True if it has."""
+1
View File
@@ -0,0 +1 @@
from .mono_bragg_scans import MonoBraggOscillationScan
+14 -12
View File
@@ -17,15 +17,17 @@ class MonoBraggOscillationScan(AsyncFlyScanBase):
"""
scan_name = "energy_oscillation_scan"
scan_type = "xas"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
def __init__(
self,
lower_energy: float,
upper_energy: float,
frequency: float,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
motor: DeviceBase = None,
**kwargs,
):
@@ -33,10 +35,11 @@ class MonoBraggOscillationScan(AsyncFlyScanBase):
# TODO If the scan only works with this device, then it could be set as a default value here as it is now.
# FYI This at the moment assumes it to be loaded for this specific name. Should be solved differently in future.
# Needs to be still its name
self.motor = motor if not None else self.device_manager.devices["mo1_bragg"].name
self.lower_energy = lower_energy
self.upper_energy = upper_energy
self.frequency = frequency
self.motor = motor if motor is not None else self.device_manager.devices["mo1_bragg"].name
self.start = start
self.stop = stop
self.scan_time = scan_time
self.scan_duration = scan_duration
self.device_move_request_id = str(uuid.uuid4())
def prepare_positions(self):
@@ -44,18 +47,17 @@ class MonoBraggOscillationScan(AsyncFlyScanBase):
Use here only start and end energy defining the range for the scan.
"""
self.positions = np.array([self.lower_energy, self.upper_energy], dtype=float)
self.positions = np.array([self.start, self.stop], dtype=float)
self.num_pos = None
yield None
def pre_scan(self):
"""Pre Scan action. Happens just before the scan.
Ensure the motor movetype is set to energy, then check limits for start/end energy.
"""
status = yield from self.stubs.send_rpc_and_wait(
self.motor.name, "move_type.set", value=MoveType.ENERGY
)
status.wait()
yield from self.stubs.send_rpc_and_wait(self.motor, "move_type.set", "energy")
self._check_limits()
# Ensure parent class pre_scan actions to be called.
super().pre_scan()
+137
View File
@@ -0,0 +1,137 @@
from unittest import mock
from bec_lib.messages import DeviceInstructionMessage
from bec_server.device_server.tests.utils import DMMock
from debye_bec.scans import MonoBraggOscillationScan
def test_mono_bragg_oscillation_scan():
# create a fake device manager that we can use to add devices
device_manager = DMMock()
device_manager.add_device("mo1_bragg")
request = MonoBraggOscillationScan(
start=0, stop=5, scan_time=1, scan_duration=10, device_manager=device_manager
)
request.metadata["RID"] = "my_test_request_id"
with (
mock.patch.object(request.stubs, "get_req_status", side_effect=[False, True]),
mock.patch.object(request.stubs, "_get_from_rpc", return_value=True),
):
reference_commands = list(request.run())
for cmd in reference_commands:
if not cmd:
continue
if "RID" in cmd.metadata:
cmd.metadata["RID"] = "my_test_request_id"
if "rpc_id" in cmd.parameter:
cmd.parameter["rpc_id"] = "my_test_rpc_id"
assert reference_commands == [
None,
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 0, "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"scan_motors": [],
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
},
"num_points": None,
"positions": [0.0, 5.0],
"scan_name": "energy_oscillation_scan",
"scan_type": "xas",
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 1, "RID": "my_test_request_id"},
device=None,
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "DIID": 2, "RID": "my_test_request_id"},
device=None,
action="baseline_reading",
parameter={},
),
DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"DIID": 3,
"RID": "my_test_request_id",
"response": True,
},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 4, "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}, "wait_group": "kickoff"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 5, "RID": "my_test_request_id"},
device="mo1_bragg",
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 6, "RID": "my_test_request_id"},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 8, "RID": "my_test_request_id"},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 10, "RID": "my_test_request_id"},
device=[],
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 11, "RID": "my_test_request_id"},
device=None,
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 12, "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]