mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-06-23 03:17:58 +02:00
refactor: clean up code
This commit is contained in:
@ -278,7 +278,7 @@ def test_stage(
|
||||
# TODO consider putting energy as variable in scaninfo
|
||||
mock_det.device_manager.add_device("mokev", value=12.4)
|
||||
mock_det.cam.beam_energy.put(scaninfo["mokev"])
|
||||
mock_det._stopped = stopped
|
||||
mock_det.stopped = stopped
|
||||
mock_det.cam.detector_state._read_pv.mock_data = detector_state
|
||||
with mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_prep_fw:
|
||||
mock_det.filepath = scaninfo["filepath"]
|
||||
@ -393,15 +393,15 @@ def test_unstage(
|
||||
with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
|
||||
mock_det.custom_prepare, "publish_file_location"
|
||||
) as mock_publish_file_location:
|
||||
mock_det._stopped = stopped
|
||||
mock_det.stopped = stopped
|
||||
if expected_exception:
|
||||
mock_det.unstage()
|
||||
assert mock_det._stopped is True
|
||||
assert mock_det.stopped is True
|
||||
else:
|
||||
mock_det.unstage()
|
||||
mock_finished.assert_called_once()
|
||||
mock_publish_file_location.assert_called_with(done=True, successful=True)
|
||||
assert mock_det._stopped is False
|
||||
assert mock_det.stopped is False
|
||||
|
||||
|
||||
def test_stop_detector_backend(mock_det):
|
||||
@ -436,15 +436,15 @@ def test_publish_file_location(mock_det, scaninfo):
|
||||
mock.call(
|
||||
MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det._producer.pipeline.return_value,
|
||||
pipe=mock_det.producer.pipeline.return_value,
|
||||
),
|
||||
mock.call(
|
||||
MessageEndpoints.file_event(mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det._producer.pipeline.return_value,
|
||||
pipe=mock_det.producer.pipeline.return_value,
|
||||
),
|
||||
]
|
||||
assert mock_det._producer.set_and_publish.call_args_list == expected_calls
|
||||
assert mock_det.producer.set_and_publish.call_args_list == expected_calls
|
||||
|
||||
|
||||
def test_stop(mock_det):
|
||||
@ -456,7 +456,7 @@ def test_stop(mock_det):
|
||||
mock_det.stop()
|
||||
mock_stop_det.assert_called_once()
|
||||
mock_stop_detector_backend.assert_called_once()
|
||||
assert mock_det._stopped is True
|
||||
assert mock_det.stopped is True
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -520,13 +520,13 @@ def test_finished(mock_det, stopped, cam_state, daq_status, scaninfo, expected_e
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.finished()
|
||||
assert mock_det._stopped is stopped
|
||||
assert mock_det.stopped is stopped
|
||||
mock_stop_backend.assert_called()
|
||||
mock_stop_det.assert_called_once()
|
||||
else:
|
||||
mock_det.custom_prepare.finished()
|
||||
if stopped:
|
||||
assert mock_det._stopped is stopped
|
||||
assert mock_det.stopped is stopped
|
||||
|
||||
mock_stop_backend.assert_called()
|
||||
mock_stop_det.assert_called_once()
|
||||
|
@ -223,15 +223,15 @@ def test_publish_file_location(mock_det, scaninfo):
|
||||
mock.call(
|
||||
MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det._producer.pipeline.return_value,
|
||||
pipe=mock_det.producer.pipeline.return_value,
|
||||
),
|
||||
mock.call(
|
||||
MessageEndpoints.file_event(mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det._producer.pipeline.return_value,
|
||||
pipe=mock_det.producer.pipeline.return_value,
|
||||
),
|
||||
]
|
||||
assert mock_det._producer.set_and_publish.call_args_list == expected_calls
|
||||
assert mock_det.producer.set_and_publish.call_args_list == expected_calls
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -281,16 +281,16 @@ def test_unstage(
|
||||
with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
|
||||
mock_det.custom_prepare, "publish_file_location"
|
||||
) as mock_publish_file_location:
|
||||
mock_det._stopped = stopped
|
||||
mock_det.stopped = stopped
|
||||
if expected_abort:
|
||||
mock_det.unstage()
|
||||
assert mock_det._stopped == stopped
|
||||
assert mock_det.stopped is stopped
|
||||
assert mock_publish_file_location.call_count == 0
|
||||
else:
|
||||
mock_det.unstage()
|
||||
mock_finished.assert_called_once()
|
||||
mock_publish_file_location.assert_called_with(done=True, successful=True)
|
||||
assert mock_det._stopped == False
|
||||
assert mock_det.stopped is stopped
|
||||
|
||||
|
||||
def test_stop(mock_det):
|
||||
@ -302,7 +302,7 @@ def test_stop(mock_det):
|
||||
mock_det.stop()
|
||||
mock_stop_det.assert_called_once()
|
||||
mock_stop_detector_backend.assert_called_once()
|
||||
assert mock_det._stopped == True
|
||||
assert mock_det.stopped is True
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -318,7 +318,7 @@ def test_finished(mock_det, stopped, scaninfo):
|
||||
) as mock_stop_det, mock.patch.object(
|
||||
mock_det.custom_prepare, "stop_detector_backend"
|
||||
) as mock_stop_file_writer:
|
||||
mock_det._stopped = stopped
|
||||
mock_det.stopped = stopped
|
||||
mock_det.dxp.current_pixel._read_pv.mock_data = int(
|
||||
scaninfo["num_points"] * scaninfo["frames_per_trigger"]
|
||||
)
|
||||
@ -328,6 +328,6 @@ def test_finished(mock_det, stopped, scaninfo):
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
mock_det.scaninfo.num_points = scaninfo["num_points"]
|
||||
mock_det.custom_prepare.finished()
|
||||
assert mock_det._stopped == stopped
|
||||
assert mock_det.stopped is stopped
|
||||
mock_stop_det.assert_called_once()
|
||||
mock_stop_file_writer.assert_called_once()
|
||||
|
@ -114,7 +114,7 @@ def test_stage(
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
|
||||
mock_det.device_manager.add_device("mokev", value=12.4)
|
||||
mock_det._stopped = stopped
|
||||
mock_det.stopped = stopped
|
||||
with mock.patch.object(
|
||||
mock_det.custom_prepare, "prepare_data_backend"
|
||||
) as mock_data_backend, mock.patch.object(
|
||||
@ -218,15 +218,15 @@ def test_publish_file_location(mock_det, scaninfo):
|
||||
mock.call(
|
||||
MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det._producer.pipeline.return_value,
|
||||
pipe=mock_det.producer.pipeline.return_value,
|
||||
),
|
||||
mock.call(
|
||||
MessageEndpoints.file_event(mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det._producer.pipeline.return_value,
|
||||
pipe=mock_det.producer.pipeline.return_value,
|
||||
),
|
||||
]
|
||||
assert mock_det._producer.set_and_publish.call_args_list == expected_calls
|
||||
assert mock_det.producer.set_and_publish.call_args_list == expected_calls
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -451,15 +451,15 @@ def test_unstage(
|
||||
with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
|
||||
mock_det.custom_prepare, "publish_file_location"
|
||||
) as mock_publish_file_location:
|
||||
mock_det._stopped = stopped
|
||||
mock_det.stopped = stopped
|
||||
if expected_exception:
|
||||
mock_det.unstage()
|
||||
assert mock_det._stopped == True
|
||||
assert mock_det.stopped is True
|
||||
else:
|
||||
mock_det.unstage()
|
||||
mock_finished.assert_called_once()
|
||||
mock_publish_file_location.assert_called_with(done=True, successful=True)
|
||||
assert mock_det._stopped == False
|
||||
assert mock_det.stopped is False
|
||||
|
||||
|
||||
def test_stop(mock_det):
|
||||
@ -474,7 +474,7 @@ def test_stop(mock_det):
|
||||
mock_stop_det.assert_called_once()
|
||||
mock_stop_file_writer.assert_called_once()
|
||||
mock_close_file_writer.assert_called_once()
|
||||
assert mock_det._stopped == True
|
||||
assert mock_det.stopped is True
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -506,19 +506,19 @@ def test_finished(mock_det, stopped, mcs_stage_state, expected_exception):
|
||||
mock_det.custom_prepare, "close_file_writer"
|
||||
) as mock_close_file_writer:
|
||||
mock_dm.devices.mcs.obj._staged = mcs_stage_state
|
||||
mock_det._stopped = stopped
|
||||
mock_det.stopped = stopped
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.finished()
|
||||
assert mock_det._stopped == stopped
|
||||
assert mock_det.stopped is stopped
|
||||
mock_stop_file_friter.assert_called()
|
||||
mock_stop_det.assert_called_once()
|
||||
mock_close_file_writer.assert_called_once()
|
||||
else:
|
||||
mock_det.custom_prepare.finished()
|
||||
if stopped:
|
||||
assert mock_det._stopped == stopped
|
||||
assert mock_det.stopped is stopped
|
||||
|
||||
mock_stop_file_friter.assert_called()
|
||||
mock_stop_det.assert_called_once()
|
||||
|
Reference in New Issue
Block a user