feat: refactor falcon for psi_detector_base class; adapted eiger; added and debugged tests

This commit is contained in:
2023-11-16 22:52:45 +01:00
parent 90cd05e68e
commit bcc3210761
5 changed files with 319 additions and 419 deletions

View File

@ -114,6 +114,15 @@ def test_initialize_detector(
assert mock_det.cam.trigger_mode.get() == trigger_source
def test_trigger(mock_det):
"""Test the trigger function:
Validate that trigger calls the custom_prepare.on_trigger() function
"""
with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger:
mock_det.trigger()
mock_on_trigger.assert_called_once()
@pytest.mark.parametrize(
"readout_time, expected_value",
[

View File

@ -28,9 +28,9 @@ def mock_det():
dm = DMMock()
with mock.patch.object(dm, "producer"):
with mock.patch(
"ophyd_devices.epics.devices.falcon_csaxs.FileWriterMixin"
"ophyd_devices.epics.devices.psi_detector_base.FileWriterMixin"
) as filemixin, mock.patch(
"ophyd_devices.epics.devices.falcon_csaxs.FalconcSAXS._update_service_config"
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
) as mock_service_config:
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
@ -76,9 +76,9 @@ def test_init_detector(
if expected_exception:
with pytest.raises(FalconTimeoutError):
mock_det.timeout = 0.1
mock_det._init_detector()
mock_det.custom_prepare.initialize_detector()
else:
mock_det._init_detector() # call the method you want to test
mock_det.custom_prepare.initialize_detector()
assert mock_det.state.get() == detector_state
assert mock_det.collect_mode.get() == mapping_source
assert mock_det.pixel_advance_mode.get() == trigger_source
@ -103,17 +103,19 @@ def test_init_detector(
def test_update_readout_time(mock_det, readout_time, expected_value):
# mock_det.scaninfo.readout_time = readout_time
if readout_time is None:
mock_det._update_readout_time()
mock_det.custom_prepare.update_readout_time()
assert mock_det.readout_time == expected_value
else:
mock_det.scaninfo.readout_time = readout_time
mock_det._update_readout_time()
mock_det.custom_prepare.update_readout_time()
assert mock_det.readout_time == expected_value
def test_default_parameter(mock_det):
with mock.patch.object(mock_det, "_update_readout_time") as mock_update_readout_time:
mock_det._default_parameter()
def test_initialize_default_parameter(mock_det):
with mock.patch.object(
mock_det.custom_prepare, "update_readout_time"
) as mock_update_readout_time:
mock_det.custom_prepare.initialize_default_parameter()
assert mock_det._value_pixel_per_buffer == 20
mock_update_readout_time.assert_called_once()
@ -139,12 +141,12 @@ def test_stage(mock_det, scaninfo):
This includes testing _prep_det
"""
with mock.patch.object(mock_det, "_set_trigger") as mock_set_trigger, mock.patch.object(
mock_det, "_prep_file_writer"
) as mock_prep_file_writer, mock.patch.object(
mock_det, "_publish_file_location"
with mock.patch.object(mock_det, "set_trigger") as mock_set_trigger, mock.patch.object(
mock_det.custom_prepare, "prepare_data_backend"
) as mock_prep_data_backend, mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location, mock.patch.object(
mock_det, "_arm_acquisition"
mock_det.custom_prepare, "arm_acquisition"
) as mock_arm_acquisition:
mock_det.scaninfo.exp_time = scaninfo["exp_time"]
mock_det.scaninfo.num_points = scaninfo["num_points"]
@ -155,7 +157,7 @@ def test_stage(mock_det, scaninfo):
assert mock_det.pixels_per_run.get() == int(
scaninfo["num_points"] * scaninfo["frames_per_trigger"]
)
mock_prep_file_writer.assert_called_once()
mock_prep_data_backend.assert_called_once()
mock_publish_file_location.assert_called_once_with(done=False)
mock_arm_acquisition.assert_called_once()
@ -179,12 +181,12 @@ def test_stage(mock_det, scaninfo):
),
],
)
def test_prep_file_writer(mock_det, scaninfo):
def test_prepare_data_backend(mock_det, scaninfo):
mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
mock_det.scaninfo.num_points = scaninfo["num_points"]
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
mock_det.scaninfo.scan_number = 1
mock_det._prep_file_writer()
mock_det.custom_prepare.prepare_data_backend()
file_path, file_name = os.path.split(scaninfo["filepath"])
assert mock_det.hdf5.file_path.get() == file_path
assert mock_det.hdf5.file_name.get() == file_name
@ -208,7 +210,9 @@ def test_prep_file_writer(mock_det, scaninfo):
def test_publish_file_location(mock_det, scaninfo):
mock_det.scaninfo.scanID = scaninfo["scanID"]
mock_det.filepath = scaninfo["filepath"]
mock_det._publish_file_location(done=scaninfo["done"], successful=scaninfo["successful"])
mock_det.custom_prepare.publish_file_location(
done=scaninfo["done"], successful=scaninfo["successful"]
)
if scaninfo["successful"] is None:
msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]).dumps()
else:
@ -243,15 +247,15 @@ def test_arm_acquisition(mock_det, detector_state, expected_exception):
if expected_exception:
with pytest.raises(FalconTimeoutError):
mock_det.timeout = 0.1
mock_det._arm_acquisition()
mock_det.custom_prepare.arm_acquisition()
mock_stop.assert_called_once()
else:
mock_det._arm_acquisition()
mock_det.custom_prepare.arm_acquisition()
assert mock_det.start_all.get() == 1
def test_trigger(mock_det):
with mock.patch.object(mock_det, "_on_trigger") as mock_on_trigger:
with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger:
mock_det.trigger()
mock_on_trigger.assert_called_once()
@ -274,8 +278,8 @@ def test_unstage(
stopped,
expected_abort,
):
with mock.patch.object(mock_det, "_finished") as mock_finished, mock.patch.object(
mock_det, "_publish_file_location"
with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
mock_det._stopped = stopped
if expected_abort:
@ -290,12 +294,14 @@ def test_unstage(
def test_stop(mock_det):
with mock.patch.object(mock_det, "_stop_det") as mock_stop_det, mock.patch.object(
mock_det, "_stop_file_writer"
) as mock_stop_file_writer:
with mock.patch.object(
mock_det.custom_prepare, "stop_detector"
) as mock_stop_det, mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_detector_backend:
mock_det.stop()
mock_stop_det.assert_called_once()
mock_stop_file_writer.assert_called_once()
mock_stop_detector_backend.assert_called_once()
assert mock_det._stopped == True
@ -307,8 +313,10 @@ def test_stop(mock_det):
],
)
def test_finished(mock_det, stopped, scaninfo):
with mock.patch.object(mock_det, "_stop_det") as mock_stop_det, mock.patch.object(
mock_det, "_stop_file_writer"
with mock.patch.object(
mock_det.custom_prepare, "stop_detector"
) as mock_stop_det, mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_file_writer:
mock_det._stopped = stopped
mock_det.dxp.current_pixel._read_pv.mock_data = int(
@ -319,7 +327,7 @@ def test_finished(mock_det, stopped, scaninfo):
)
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
mock_det.scaninfo.num_points = scaninfo["num_points"]
mock_det._finished()
mock_det.custom_prepare.finished()
assert mock_det._stopped == stopped
mock_stop_det.assert_called_once()
mock_stop_file_writer.assert_called_once()