test(falcon): fix test after falcon refactoring
This commit is contained in:
@@ -80,6 +80,7 @@ class FalconcSAXS(PSIDeviceBase, FalconControl):
|
||||
"""
|
||||
Setup Falcon Sitoro detector default parameters once signals are connected
|
||||
"""
|
||||
self.on_stop()
|
||||
self._initialize_detector()
|
||||
self._initialize_detector_backend()
|
||||
self.set_trigger(
|
||||
@@ -109,11 +110,6 @@ class FalconcSAXS(PSIDeviceBase, FalconControl):
|
||||
|
||||
def _initialize_detector(self) -> None:
|
||||
"""Initialize Falcon detector"""
|
||||
self.stop_detector()
|
||||
self.stop_detector_backend()
|
||||
self.set_trigger(
|
||||
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
|
||||
)
|
||||
|
||||
# 1 Realtime
|
||||
self.preset_mode.put(1)
|
||||
@@ -141,6 +137,8 @@ class FalconcSAXS(PSIDeviceBase, FalconControl):
|
||||
|
||||
# Size of the queue for the number of spectra allowed in the buffer. If too small, data is lost at high throughput
|
||||
self.hdf5.queue_size.put(self._queue_size)
|
||||
self.hdf5.file_template.put("%s%s")
|
||||
self.hdf5.file_write_mode.put(2)
|
||||
|
||||
# Set nd_array mode to 1: This means segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate
|
||||
self.nd_array_mode.put(1)
|
||||
@@ -154,7 +152,7 @@ class FalconcSAXS(PSIDeviceBase, FalconControl):
|
||||
num_points = self.scan_info.msg.num_points
|
||||
frames_per_trigger = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
|
||||
overall_frames = int(num_points * frames_per_trigger)
|
||||
exp_time = self.scan_info.exp_time
|
||||
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
|
||||
self._full_path = get_full_path(self.scan_info.msg, self.name)
|
||||
|
||||
# Check that exposure time is larger than readout time
|
||||
@@ -170,16 +168,15 @@ class FalconcSAXS(PSIDeviceBase, FalconControl):
|
||||
# TODO: Add h5_entries for linking the Falcon NEXUS entries with the master file
|
||||
self.file_event.put(file_path=self._full_path, done=False, successful=False)
|
||||
|
||||
self.preset_real.put(exp_time)
|
||||
self.preset_real_time.put(exp_time)
|
||||
self.pixels_per_run.put(overall_frames)
|
||||
|
||||
# Prepare detector backend PVs
|
||||
file_path, file_name = os.path.split(self._full_path)
|
||||
self.hdf5.file_path.put(file_path)
|
||||
self.hdf5.file_name.put(file_name)
|
||||
self.hdf5.file_template.put("%s%s")
|
||||
|
||||
self.hdf5.num_capture.put(overall_frames)
|
||||
self.hdf5.file_write_mode.put(2)
|
||||
# Reset spectrum counter in filewriter, used for indexing & identifying missing triggers
|
||||
self.hdf5.array_counter.put(0)
|
||||
|
||||
@@ -193,10 +190,10 @@ class FalconcSAXS(PSIDeviceBase, FalconControl):
|
||||
Method for actions just before the scan starts.
|
||||
"""
|
||||
status_camera = CompareStatus(
|
||||
self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout
|
||||
self.acquire_busy, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
|
||||
)
|
||||
status_writer = CompareStatus(
|
||||
self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
|
||||
self.hdf5.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
|
||||
)
|
||||
# Logical combine of statuses
|
||||
status = status_camera & status_writer
|
||||
|
||||
@@ -8,10 +8,17 @@ import ophyd
|
||||
import pytest
|
||||
from bec_lib import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.file_utils import get_full_path
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError
|
||||
from ophyd_devices.tests.utils import patched_device
|
||||
|
||||
from csaxs_bec.devices.epics.falcon_csaxs import FalconcSAXS
|
||||
from csaxs_bec.devices.epics.falcon_csaxs import (
|
||||
ACQUIRESTATUS,
|
||||
FalconcSAXS,
|
||||
MappingSource,
|
||||
TriggerSource,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@@ -28,264 +35,198 @@ def mock_det() -> Generator[FalconcSAXS, None, None]:
|
||||
_mock_pv_initial_value=0,
|
||||
) as dev:
|
||||
try:
|
||||
for dotted_name, device in dev.walk_subdevices(include_lazy=True):
|
||||
device.stage_sigs = {} # Remove stage signals
|
||||
device.trigger_sigs = {} # Remove trigger signals
|
||||
if hasattr(device, "plugin_type"):
|
||||
device.plugin_type._read_pv.mock_data = device._plugin_type
|
||||
yield dev
|
||||
finally:
|
||||
dev.destroy()
|
||||
|
||||
|
||||
# @pytest.mark.parametrize(
|
||||
# "trigger_source, mapping_source, ignore_gate, pixels_per_buffer, detector_state,"
|
||||
# " expected_exception",
|
||||
# [(1, 1, 0, 20, 0, False), (1, 1, 0, 20, 1, True)],
|
||||
# )
|
||||
# # TODO rewrite this one, write test for init_detector, init_filewriter is tested
|
||||
# def test_init_detector(
|
||||
# mock_det,
|
||||
# trigger_source,
|
||||
# mapping_source,
|
||||
# ignore_gate,
|
||||
# pixels_per_buffer,
|
||||
# detector_state,
|
||||
# expected_exception,
|
||||
# ):
|
||||
# """Test the _init function:
|
||||
|
||||
# This includes testing the functions:
|
||||
# - _init_detector
|
||||
# - _stop_det
|
||||
# - _set_trigger
|
||||
# --> Testing the filewriter is done in test_init_filewriter
|
||||
|
||||
# Validation upon setting the correct PVs
|
||||
|
||||
# """
|
||||
# mock_det.value_pixel_per_buffer = pixels_per_buffer
|
||||
# mock_det.state._read_pv.mock_data = detector_state
|
||||
# if expected_exception:
|
||||
# with pytest.raises(FalconTimeoutError):
|
||||
# mock_det.timeout = 0.1
|
||||
# mock_det.custom_prepare.initialize_detector()
|
||||
# else:
|
||||
# mock_det.custom_prepare.initialize_detector()
|
||||
# assert mock_det.state.get() == detector_state
|
||||
# assert mock_det.collect_mode.get() == mapping_source
|
||||
# assert mock_det.pixel_advance_mode.get() == trigger_source
|
||||
# assert mock_det.ignore_gate.get() == ignore_gate
|
||||
|
||||
# assert mock_det.preset_mode.get() == 1
|
||||
# assert mock_det.erase_all.get() == 1
|
||||
# assert mock_det.input_logic_polarity.get() == 0
|
||||
# assert mock_det.auto_pixels_per_buffer.get() == 0
|
||||
# assert mock_det.pixels_per_buffer.get() == pixels_per_buffer
|
||||
def test_falcon_init(mock_det: FalconcSAXS):
|
||||
"""Test the initialization of the FalconcSAXS device."""
|
||||
assert mock_det._readout_time == mock_det.MIN_READOUT
|
||||
assert mock_det._value_pixel_per_buffer == 20
|
||||
assert mock_det._queue_size == 2000
|
||||
assert mock_det._full_path == ""
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)]
|
||||
)
|
||||
def test_update_readout_time(mock_det, readout_time, expected_value):
|
||||
if readout_time is None:
|
||||
mock_det._update_readout_time()
|
||||
assert mock_det._readout_time == expected_value
|
||||
else:
|
||||
mock_det.scan_info.readout_time = readout_time
|
||||
mock_det._update_readout_time()
|
||||
assert mock_det._readout_time == expected_value
|
||||
def test_falcon_on_connected(mock_det: FalconcSAXS):
|
||||
"""Test the on_connected method of the FalconcSAXS device."""
|
||||
falcon = mock_det
|
||||
|
||||
# Set known default values
|
||||
falcon.preset_mode.put(-1)
|
||||
falcon.input_logic_polarity.put(-1)
|
||||
falcon.auto_pixels_per_buffer.put(-1)
|
||||
falcon.hdf5.enable.put(-1)
|
||||
|
||||
with (
|
||||
mock.patch.object(falcon, "on_stop") as mock_on_stop,
|
||||
mock.patch.object(falcon, "set_trigger") as mock_set_trigger,
|
||||
):
|
||||
|
||||
falcon.on_connected()
|
||||
mock_on_stop.assert_called_once()
|
||||
mock_set_trigger.assert_called_once_with(
|
||||
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
|
||||
)
|
||||
|
||||
# Detector default PV values
|
||||
assert falcon.preset_mode.get() == "1" # Real Time
|
||||
assert falcon.input_logic_polarity.get() == 0
|
||||
assert falcon.auto_pixels_per_buffer.get() == 0
|
||||
assert falcon.pixels_per_buffer.get() == falcon._value_pixel_per_buffer
|
||||
|
||||
# Backend default PV values
|
||||
assert falcon.hdf5.enable.get() == "1" # Enabled
|
||||
assert falcon.hdf5.xml_file_name.get() == "layout.xml"
|
||||
assert falcon.hdf5.lazy_open.get() == "1" # Enabled
|
||||
assert falcon.hdf5.temp_suffix.get() == ""
|
||||
assert falcon.hdf5.queue_size.get() == falcon._queue_size
|
||||
assert falcon.nd_array_mode.get() == 1
|
||||
assert falcon.hdf5.file_template.get() == "%s%s"
|
||||
assert falcon.hdf5.file_write_mode.get() == 2
|
||||
|
||||
|
||||
def test_initialize_default_parameter(mock_det):
|
||||
with mock.patch.object(
|
||||
mock_det.custom_prepare, "update_readout_time"
|
||||
) as mock_update_readout_time:
|
||||
mock_det.custom_prepare.initialize_default_parameter()
|
||||
assert mock_det.value_pixel_per_buffer == 20
|
||||
mock_update_readout_time.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo",
|
||||
[
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"exp_time": 0.1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
"mokev": 12.4,
|
||||
}
|
||||
],
|
||||
)
|
||||
def test_stage(mock_det, scaninfo):
|
||||
"""Test the stage function:
|
||||
|
||||
This includes testing _prep_det
|
||||
def test_falcon_on_stage(mock_det: FalconcSAXS):
|
||||
"""
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "set_trigger") as mock_set_trigger,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "prepare_data_backend"
|
||||
) as mock_prep_data_backend,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "publish_file_location"
|
||||
) as mock_publish_file_location,
|
||||
mock.patch.object(mock_det.custom_prepare, "arm_acquisition") as mock_arm_acquisition,
|
||||
):
|
||||
mock_det.scaninfo.exp_time = scaninfo["exp_time"]
|
||||
mock_det.scaninfo.num_points = scaninfo["num_points"]
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
mock_det.stage()
|
||||
mock_set_trigger.assert_called_once()
|
||||
assert mock_det.preset_real.get() == scaninfo["exp_time"]
|
||||
assert mock_det.pixels_per_run.get() == int(
|
||||
scaninfo["num_points"] * scaninfo["frames_per_trigger"]
|
||||
)
|
||||
mock_prep_data_backend.assert_called_once()
|
||||
mock_publish_file_location.assert_called_once_with(done=False, successful=False)
|
||||
mock_arm_acquisition.assert_called_once()
|
||||
|
||||
Test the on_stage method of the FalconcSAXS device.
|
||||
All relevant information is available in the scan_info attribute and used
|
||||
to bootstrap the detector for the upcoming acquisition. Two scenarios are tested:
|
||||
I. Normal case with exposure time larger than readout time
|
||||
II. Case where exposure time is smaller than readout time, which should raise an exception.
|
||||
"""
|
||||
falcon = mock_det
|
||||
num_points = 10
|
||||
exp_time = 0.2
|
||||
frames_per_trigger = 5
|
||||
falcon.scan_info.msg.num_points = num_points
|
||||
falcon.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
|
||||
falcon.scan_info.msg.scan_parameters["exp_time"] = exp_time
|
||||
falcon.hdf5.array_counter.put(5) # Set to non-zero to check reset
|
||||
|
||||
# I. Normal case
|
||||
old_file_event = falcon.file_event.get()
|
||||
with mock.patch.object(falcon.hdf5.array_size, "get", return_value=(1, 1, 1)):
|
||||
falcon.stage()
|
||||
|
||||
assert falcon.staged is ophyd.Staged.yes
|
||||
assert falcon._full_path == get_full_path(falcon.scan_info.msg, falcon.name)
|
||||
file_path = falcon.hdf5.file_path.get()
|
||||
file_name = falcon.hdf5.file_name.get()
|
||||
assert os.path.join(file_path, file_name) == falcon._full_path
|
||||
|
||||
assert falcon.preset_real_time.get() == exp_time
|
||||
assert falcon.pixels_per_run.get() == num_points * frames_per_trigger
|
||||
assert falcon.hdf5.num_capture.get() == num_points * frames_per_trigger
|
||||
assert falcon.hdf5.array_counter.get() == 0
|
||||
assert falcon.hdf5.capture.get() == 1
|
||||
assert falcon.start_all.get() == 1
|
||||
|
||||
# II. Unstage device first
|
||||
falcon.unstage()
|
||||
exp_time = 1e-3 # Smaller than readout time
|
||||
falcon.scan_info.msg.scan_parameters["exp_time"] = exp_time
|
||||
with pytest.raises(ValueError):
|
||||
falcon.stage()
|
||||
assert falcon.staged is not ophyd.Staged.no
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo",
|
||||
[
|
||||
(
|
||||
{
|
||||
"filepath": "/das/work/p18/p18533/data/S00000-S00999/S00001/data.h5",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
}
|
||||
),
|
||||
(
|
||||
{
|
||||
"filepath": "/das/work/p18/p18533/data/S00000-S00999/S00001/data1234.h5",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
}
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_prepare_data_backend(mock_det, scaninfo):
|
||||
mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
|
||||
mock_det.scaninfo.num_points = scaninfo["num_points"]
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
mock_det.scaninfo.scan_number = 1
|
||||
mock_det.custom_prepare.prepare_data_backend()
|
||||
file_path, file_name = os.path.split(scaninfo["filepath"])
|
||||
assert mock_det.hdf5.file_path.get() == file_path
|
||||
assert mock_det.hdf5.file_name.get() == file_name
|
||||
assert mock_det.hdf5.file_template.get() == "%s%s"
|
||||
assert mock_det.hdf5.num_capture.get() == int(
|
||||
scaninfo["num_points"] * scaninfo["frames_per_trigger"]
|
||||
def test_falcon_on_pre_scan(mock_det: FalconcSAXS):
|
||||
"""Test the on_pre_scan method of the FalconcSAXS device."""
|
||||
falcon = mock_det
|
||||
# I. Test normal case with success
|
||||
falcon.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE
|
||||
falcon.hdf5.capture._read_pv.mock_data = ACQUIRESTATUS.DONE
|
||||
falcon = mock_det
|
||||
st = falcon.on_pre_scan()
|
||||
assert st.done is False
|
||||
assert st.success is False
|
||||
falcon.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING
|
||||
assert st.done is False
|
||||
assert st.success is False
|
||||
falcon.hdf5.capture._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING
|
||||
st.wait(3)
|
||||
assert st.done is True
|
||||
assert st.success is True
|
||||
|
||||
# II. Test abort case with stop called
|
||||
falcon.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE
|
||||
falcon.hdf5.capture._read_pv.mock_data = ACQUIRESTATUS.DONE
|
||||
st = falcon.on_pre_scan()
|
||||
assert st.done is False
|
||||
assert st.success is False
|
||||
falcon.stop()
|
||||
with pytest.raises(DeviceStoppedError):
|
||||
st.wait(3)
|
||||
assert st.done is True
|
||||
assert st.success is False
|
||||
|
||||
|
||||
def test_falcon_stop(mock_det: FalconcSAXS):
|
||||
"""Test the stop method of the FalconcSAXS device."""
|
||||
falcon = mock_det
|
||||
|
||||
falcon.stop_all.put(0)
|
||||
falcon.hdf5.capture.put(1)
|
||||
falcon.erase_all.put(0)
|
||||
falcon.stop()
|
||||
assert falcon.stop_all.get() == 1
|
||||
assert falcon.hdf5.capture.get() == 0
|
||||
assert falcon.erase_all.get() == 1
|
||||
|
||||
|
||||
def test_falcon_complete(mock_det: FalconcSAXS):
|
||||
"""Test the complete method of the FalconcSAXS device."""
|
||||
falcon = mock_det
|
||||
|
||||
num_points = 10
|
||||
frames_per_trigger = 5
|
||||
falcon.scan_info.msg.num_points = num_points
|
||||
falcon.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
|
||||
|
||||
# I. Test normal case with success
|
||||
falcon.dxp.current_pixel._read_pv.mock_data = num_points * frames_per_trigger - 1
|
||||
falcon.hdf5.array_counter._read_pv.mock_data = num_points * frames_per_trigger - 1
|
||||
falcon._full_path = "/tmp/fake_path/test.h5"
|
||||
st = falcon.on_complete()
|
||||
assert st.done is False
|
||||
assert st.success is False
|
||||
falcon.dxp.current_pixel._read_pv.mock_data = num_points * frames_per_trigger
|
||||
assert st.done is False
|
||||
assert st.success is False
|
||||
falcon.hdf5.array_counter._read_pv.mock_data = num_points * frames_per_trigger
|
||||
st.wait(3)
|
||||
assert st.done is True
|
||||
assert st.success is True
|
||||
assert falcon.file_event.get() == messages.FileMessage(
|
||||
file_path="/tmp/fake_path/test.h5",
|
||||
done=True,
|
||||
successful=True,
|
||||
device_name=falcon.name,
|
||||
file_type="h5",
|
||||
hinted_h5_entries=None,
|
||||
metadata={},
|
||||
)
|
||||
assert mock_det.hdf5.file_write_mode.get() == 2
|
||||
assert mock_det.hdf5.array_counter.get() == 0
|
||||
assert mock_det.hdf5.capture.get() == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo",
|
||||
[
|
||||
({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}),
|
||||
({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}),
|
||||
],
|
||||
)
|
||||
def test_publish_file_location(mock_det, scaninfo):
|
||||
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
|
||||
mock_det.filepath.set(scaninfo["filepath"]).wait()
|
||||
mock_det.custom_prepare.publish_file_location(
|
||||
done=scaninfo["done"], successful=scaninfo["successful"]
|
||||
# II. Test case where acquisition fails due to interruption
|
||||
falcon.dxp.current_pixel._read_pv.mock_data = num_points * frames_per_trigger - 1
|
||||
st = falcon.on_complete()
|
||||
assert st.done is False
|
||||
assert st.success is False
|
||||
falcon.stop()
|
||||
with pytest.raises(DeviceStoppedError):
|
||||
st.wait(3)
|
||||
assert falcon.file_event.get() == messages.FileMessage(
|
||||
file_path="/tmp/fake_path/test.h5",
|
||||
done=True,
|
||||
successful=False,
|
||||
device_name=falcon.name,
|
||||
file_type="h5",
|
||||
hinted_h5_entries=None,
|
||||
metadata={},
|
||||
)
|
||||
if scaninfo["successful"] is None:
|
||||
msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"])
|
||||
else:
|
||||
msg = messages.FileMessage(
|
||||
file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"]
|
||||
)
|
||||
expected_calls = [
|
||||
mock.call(
|
||||
MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det.connector.pipeline.return_value,
|
||||
),
|
||||
mock.call(
|
||||
MessageEndpoints.file_event(mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det.connector.pipeline.return_value,
|
||||
),
|
||||
]
|
||||
assert mock_det.connector.set_and_publish.call_args_list == expected_calls
|
||||
|
||||
|
||||
@pytest.mark.parametrize("detector_state, expected_exception", [(1, False), (0, True)])
|
||||
def test_arm_acquisition(mock_det, detector_state, expected_exception):
|
||||
with mock.patch.object(mock_det, "stop") as mock_stop:
|
||||
mock_det.state._read_pv.mock_data = detector_state
|
||||
if expected_exception:
|
||||
with pytest.raises(FalconTimeoutError):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.arm_acquisition()
|
||||
mock_stop.assert_called_once()
|
||||
else:
|
||||
mock_det.custom_prepare.arm_acquisition()
|
||||
assert mock_det.start_all.get() == 1
|
||||
|
||||
|
||||
def test_trigger(mock_det):
|
||||
with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger:
|
||||
mock_det.trigger()
|
||||
mock_on_trigger.assert_called_once()
|
||||
|
||||
|
||||
def test_complete(mock_det):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "publish_file_location"
|
||||
) as mock_publish_file_location,
|
||||
):
|
||||
mock_det.stopped = False
|
||||
mock_det.complete()
|
||||
assert mock_finished.call_count == 1
|
||||
call = mock.call(done=True, successful=True)
|
||||
assert mock_publish_file_location.call_args == call
|
||||
|
||||
|
||||
def test_stop(mock_det):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "stop_detector_backend"
|
||||
) as mock_stop_detector_backend,
|
||||
):
|
||||
mock_det.stop()
|
||||
mock_stop_det.assert_called_once()
|
||||
mock_stop_detector_backend.assert_called_once()
|
||||
assert mock_det.stopped is True
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"stopped, scaninfo",
|
||||
[
|
||||
(False, {"num_points": 500, "frames_per_trigger": 1}),
|
||||
(True, {"num_points": 500, "frames_per_trigger": 1}),
|
||||
],
|
||||
)
|
||||
def test_finished(mock_det, stopped, scaninfo):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "stop_detector_backend"
|
||||
) as mock_stop_file_writer,
|
||||
):
|
||||
mock_det.stopped = stopped
|
||||
mock_det.dxp.current_pixel._read_pv.mock_data = int(
|
||||
scaninfo["num_points"] * scaninfo["frames_per_trigger"]
|
||||
)
|
||||
mock_det.hdf5.array_counter._read_pv.mock_data = int(
|
||||
scaninfo["num_points"] * scaninfo["frames_per_trigger"]
|
||||
)
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
mock_det.scaninfo.num_points = scaninfo["num_points"]
|
||||
mock_det.custom_prepare.finished()
|
||||
assert mock_det.stopped is stopped
|
||||
mock_stop_det.assert_called_once()
|
||||
mock_stop_file_writer.assert_called_once()
|
||||
|
||||
Reference in New Issue
Block a user