fix: improve kickoff/complete logic, add test for complete
This commit is contained in:
@@ -491,18 +491,26 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
|
||||
def kickoff(self):
|
||||
"""Kickoff the device, called from BEC."""
|
||||
status = DeviceStatus(self)
|
||||
# TODO - tobe discussed and decided with Debye team
|
||||
# There are 2 possible ways to start the scan, either infinit or with a timer.
|
||||
# If the scan_duration is less than 0.1s, the scan will be started infinite
|
||||
# This logic should move in future to the controller, but for now it is implemented here
|
||||
# It is necessary since the current NIDAQ implementation relies on it
|
||||
timeout = 3
|
||||
scan_duration = self.scan_control.scan_duration.get()
|
||||
if scan_duration < 0.1:
|
||||
self.scan_control.scan_start_infinite.put(1)
|
||||
return
|
||||
self.scan_control.scan_start_timer.put(1)
|
||||
status.set_finished()
|
||||
start_func = (
|
||||
self.scan_control.scan_start_infinite.put
|
||||
if scan_duration < 0.1
|
||||
else self.scan_control.scan_start_timer.put
|
||||
)
|
||||
start_func(1)
|
||||
# TODO check if sleep is needed
|
||||
time.sleep(0.2)
|
||||
status = self.wait_with_status(
|
||||
signal_conditions=[(self.scan_control.scan_done.get, ScanControlScanStatus.RUNNING)],
|
||||
timeout=timeout,
|
||||
check_stopped=True,
|
||||
)
|
||||
return status
|
||||
|
||||
def stage(self) -> list[object]:
|
||||
@@ -542,17 +550,6 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
Actions to be executed when the device is completed.
|
||||
Implement for instance checks if the acquisition was successful.
|
||||
"""
|
||||
timeout = 3
|
||||
# Check first that scan started with maximum of 3 seconds timeout
|
||||
# TODO this could be problematic with very fast scans, a fully reliable signal from IOC would be better
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=[(self.scan_control.scan_done.get, ScanControlScanStatus.RUNNING)],
|
||||
timeout=timeout,
|
||||
check_stopped=True,
|
||||
):
|
||||
raise TimeoutError(
|
||||
f"Scan did not start after timeout of {timeout} with state {ScanControlScanStatus(self.scan_control.scan_done.get())} "
|
||||
)
|
||||
# Create DeviceStatus object with hook to finishing the scan.
|
||||
status = self.wait_with_status(
|
||||
signal_conditions=[(self.scan_control.scan_done.get, ScanControlScanStatus.FINISHED)],
|
||||
|
||||
@@ -10,7 +10,7 @@ from ophyd.utils import LimitError
|
||||
from ophyd_devices.tests.utils import MockPV
|
||||
|
||||
# from bec_server.device_server.tests.utils import DMMock
|
||||
from debye_bec.devices.mo1_bragg import Mo1Bragg, MoveType, ScanControlMode
|
||||
from debye_bec.devices.mo1_bragg import Mo1Bragg, MoveType, ScanControlMode, ScanControlScanStatus
|
||||
|
||||
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
|
||||
from debye_bec.devices.test_utils.utils import patch_dual_pvs
|
||||
@@ -148,3 +148,25 @@ def test_kickoff_scan(mock_bragg):
|
||||
dev.setup_simple_xas_scan(low=0.5, high=1, scan_time=0.1, mode=0, scan_duration=0)
|
||||
dev.kickoff()
|
||||
assert dev.scan_control.scan_start_infinite.get() == 1
|
||||
|
||||
|
||||
def test_complete(mock_bragg):
|
||||
dev = mock_bragg
|
||||
dev.scan_control.scan_done._read_pv.mock_data = ScanControlScanStatus.RUNNING
|
||||
# Normal case
|
||||
status = dev.complete()
|
||||
assert status.done is False
|
||||
assert status.success is False
|
||||
dev.scan_control.scan_done._read_pv.mock_data = ScanControlScanStatus.FINISHED
|
||||
time.sleep(0.2)
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
|
||||
# Stop called case
|
||||
status = dev.complete()
|
||||
assert status.done is False
|
||||
assert status.success is False
|
||||
dev.stop()
|
||||
time.sleep(0.2)
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
|
||||
Reference in New Issue
Block a user