test: fixed all eiger test with updated mock PV; closes #11

This commit is contained in:
appel_c 2023-11-07 13:48:43 +01:00
parent 7e9abdb323
commit cb49a2a205

View File

@ -15,7 +15,9 @@ def patch_dual_pvs(device):
continue continue
if not hasattr(walk.item, "_write_pv"): if not hasattr(walk.item, "_write_pv"):
continue continue
if walk.item._read_pv.pvname.endswith("_RBV"): if walk.item._read_pv.pvname.endswith("_RBV") and isinstance(
walk.item, ophyd.areadetector.base.EpicsSignalWithRBV
):
walk.item._read_pv = walk.item._write_pv walk.item._read_pv = walk.item._write_pv
@ -130,376 +132,376 @@ def test_update_readout_time(mock_det, readout_time, expected_value):
assert mock_det.readout_time == expected_value assert mock_det.readout_time == expected_value
# @pytest.mark.parametrize( @pytest.mark.parametrize(
# "eacc, exp_url, daq_status, daq_cfg, expected_exception", "eacc, exp_url, daq_status, daq_cfg, expected_exception",
# [ [
# ( (
# "e12345", "e12345",
# "http://xbl-daq-29:5000", "http://xbl-daq-29:5000",
# {"state": "READY"}, {"state": "READY"},
# {"writer_user_id": 12543}, {"writer_user_id": 12543},
# False, False,
# ), ),
# ( (
# "e12345", "e12345",
# "http://xbl-daq-29:5000", "http://xbl-daq-29:5000",
# {"state": "READY"}, {"state": "READY"},
# {"writer_user_id": 15421}, {"writer_user_id": 15421},
# False, False,
# ), ),
# ( (
# "e12345", "e12345",
# "http://xbl-daq-29:5000", "http://xbl-daq-29:5000",
# {"state": "BUSY"}, {"state": "BUSY"},
# {"writer_user_id": 15421}, {"writer_user_id": 15421},
# True, True,
# ), ),
# ( (
# "e12345", "e12345",
# "http://xbl-daq-29:5000", "http://xbl-daq-29:5000",
# {"state": "READY"}, {"state": "READY"},
# {"writer_ud": 12345}, {"writer_ud": 12345},
# True, True,
# ), ),
# ], ],
# ) )
# def test_init_filewriter(mock_det, eacc, exp_url, daq_status, daq_cfg, expected_exception): def test_init_filewriter(mock_det, eacc, exp_url, daq_status, daq_cfg, expected_exception):
# """Test _init_filewriter (std daq in this case) """Test _init_filewriter (std daq in this case)
# This includes testing the functions: This includes testing the functions:
# - _update_service_config - _update_service_config
# Validation upon checking set values in mocked std_daq instance Validation upon checking set values in mocked std_daq instance
# """ """
# with mock.patch("ophyd_devices.epics.devices.eiger9m_csaxs.StdDaqClient") as mock_std_daq: with mock.patch("ophyd_devices.epics.devices.eiger9m_csaxs.StdDaqClient") as mock_std_daq:
# instance = mock_std_daq.return_value instance = mock_std_daq.return_value
# instance.stop_writer.return_value = None instance.stop_writer.return_value = None
# instance.get_status.return_value = daq_status instance.get_status.return_value = daq_status
# instance.get_config.return_value = daq_cfg instance.get_config.return_value = daq_cfg
# mock_det.scaninfo.username = eacc mock_det.scaninfo.username = eacc
# # scaninfo.username.return_value = eacc # scaninfo.username.return_value = eacc
# if expected_exception: if expected_exception:
# with pytest.raises(Exception): with pytest.raises(Exception):
# mock_det._init_filewriter() mock_det._init_filewriter()
# else: else:
# mock_det._init_filewriter() mock_det._init_filewriter()
# assert mock_det.std_rest_server_url == exp_url assert mock_det.std_rest_server_url == exp_url
# instance.stop_writer.assert_called_once() instance.stop_writer.assert_called_once()
# instance.get_status.assert_called() instance.get_status.assert_called()
# instance.set_config.assert_called_once_with(daq_cfg) instance.set_config.assert_called_once_with(daq_cfg)
# @pytest.mark.parametrize( @pytest.mark.parametrize(
# "scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception", "scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception",
# [ [
# ( (
# { {
# "eacc": "e12345", "eacc": "e12345",
# "num_points": 500, "num_points": 500,
# "frames_per_trigger": 1, "frames_per_trigger": 1,
# "filepath": "test.h5", "filepath": "test.h5",
# "scanID": "123", "scanID": "123",
# "mokev": 12.4, "mokev": 12.4,
# }, },
# {"state": "READY"}, {"state": "READY"},
# {"writer_user_id": 12543}, {"writer_user_id": 12543},
# 5, 5,
# False, False,
# False, False,
# ), ),
# ( (
# { {
# "eacc": "e12345", "eacc": "e12345",
# "num_points": 500, "num_points": 500,
# "frames_per_trigger": 1, "frames_per_trigger": 1,
# "filepath": "test.h5", "filepath": "test.h5",
# "scanID": "123", "scanID": "123",
# "mokev": 12.4, "mokev": 12.4,
# }, },
# {"state": "BUSY"}, {"state": "BUSY"},
# {"writer_user_id": 15421}, {"writer_user_id": 15421},
# 5, 5,
# False, False,
# False, False,
# ), ),
# ( (
# { {
# "eacc": "e12345", "eacc": "e12345",
# "num_points": 500, "num_points": 500,
# "frames_per_trigger": 1, "frames_per_trigger": 1,
# "filepath": "test.h5", "filepath": "test.h5",
# "scanID": "123", "scanID": "123",
# "mokev": 18.4, "mokev": 18.4,
# }, },
# {"state": "READY"}, {"state": "READY"},
# {"writer_user_id": 12345}, {"writer_user_id": 12345},
# 4, 4,
# False, False,
# True, True,
# ), ),
# ], ],
# ) )
# def test_stage( def test_stage(
# mock_det, mock_det,
# scaninfo, scaninfo,
# daq_status, daq_status,
# daq_cfg, daq_cfg,
# detector_state, detector_state,
# stopped, stopped,
# expected_exception, expected_exception,
# ): ):
# with mock.patch.object(mock_det, "std_client") as mock_std_daq, mock.patch.object( with mock.patch.object(mock_det, "std_client") as mock_std_daq, mock.patch.object(
# Eiger9McSAXS, "_publish_file_location" Eiger9McSAXS, "_publish_file_location"
# ) as mock_publish_file_location: ) as mock_publish_file_location:
# mock_std_daq.stop_writer.return_value = None mock_std_daq.stop_writer.return_value = None
# mock_std_daq.get_status.return_value = daq_status mock_std_daq.get_status.return_value = daq_status
# mock_std_daq.get_config.return_value = daq_cfg mock_std_daq.get_config.return_value = daq_cfg
# mock_det.scaninfo.num_points = scaninfo["num_points"] mock_det.scaninfo.num_points = scaninfo["num_points"]
# mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
# mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"] mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
# # TODO consider putting energy as variable in scaninfo # TODO consider putting energy as variable in scaninfo
# mock_det.device_manager.add_device("mokev", value=12.4) mock_det.device_manager.add_device("mokev", value=12.4)
# mock_det.cam.beam_energy.put(scaninfo["mokev"]) mock_det.cam.beam_energy.put(scaninfo["mokev"])
# mock_det._stopped = stopped mock_det._stopped = stopped
# mock_det.cam.detector_state.put(detector_state) mock_det.cam.detector_state._read_pv.mock_data = detector_state
# with mock.patch.object(mock_det, "_prep_file_writer") as mock_prep_fw: with mock.patch.object(mock_det, "_prep_file_writer") as mock_prep_fw:
# mock_det.filepath = scaninfo["filepath"] mock_det.filepath = scaninfo["filepath"]
# if expected_exception: if expected_exception:
# with pytest.raises(Exception): with pytest.raises(Exception):
# mock_det.stage() mock_det.stage()
# else: else:
# mock_det.stage() mock_det.stage()
# mock_prep_fw.assert_called_once() mock_prep_fw.assert_called_once()
# # Check _prep_det # Check _prep_det
# assert mock_det.cam.num_images.get() == int( assert mock_det.cam.num_images.get() == int(
# scaninfo["num_points"] * scaninfo["frames_per_trigger"] scaninfo["num_points"] * scaninfo["frames_per_trigger"]
# ) )
# assert mock_det.cam.num_frames.get() == 1 assert mock_det.cam.num_frames.get() == 1
# mock_publish_file_location.assert_called_with(done=False) mock_publish_file_location.assert_called_with(done=False)
# assert mock_det.cam.acquire.get() == 1 assert mock_det.cam.acquire.get() == 1
# @pytest.mark.parametrize( @pytest.mark.parametrize(
# "scaninfo, daq_status, expected_exception", "scaninfo, daq_status, expected_exception",
# [ [
# ( (
# { {
# "eacc": "e12345", "eacc": "e12345",
# "num_points": 500, "num_points": 500,
# "frames_per_trigger": 1, "frames_per_trigger": 1,
# "filepath": "test.h5", "filepath": "test.h5",
# "scanID": "123", "scanID": "123",
# }, },
# {"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}}, {"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}},
# False, False,
# ), ),
# ( (
# { {
# "eacc": "e12345", "eacc": "e12345",
# "num_points": 500, "num_points": 500,
# "frames_per_trigger": 1, "frames_per_trigger": 1,
# "filepath": "test.h5", "filepath": "test.h5",
# "scanID": "123", "scanID": "123",
# }, },
# {"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}}, {"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}},
# False, False,
# ), ),
# ( (
# { {
# "eacc": "e12345", "eacc": "e12345",
# "num_points": 500, "num_points": 500,
# "frames_per_trigger": 1, "frames_per_trigger": 1,
# "filepath": "test.h5", "filepath": "test.h5",
# "scanID": "123", "scanID": "123",
# }, },
# {"state": "BUSY", "acquisition": {"state": "ERROR"}}, {"state": "BUSY", "acquisition": {"state": "ERROR"}},
# True, True,
# ), ),
# ], ],
# ) )
# def test_prep_file_writer(mock_det, scaninfo, daq_status, expected_exception): def test_prep_file_writer(mock_det, scaninfo, daq_status, expected_exception):
# with mock.patch.object(mock_det, "std_client") as mock_std_daq, mock.patch.object( with mock.patch.object(mock_det, "std_client") as mock_std_daq, mock.patch.object(
# mock_det, "_filepath_exists" mock_det, "_filepath_exists"
# ) as mock_file_path_exists, mock.patch.object( ) as mock_file_path_exists, mock.patch.object(
# mock_det, "_stop_file_writer" mock_det, "_stop_file_writer"
# ) as mock_stop_file_writer, mock.patch.object( ) as mock_stop_file_writer, mock.patch.object(
# mock_det, "scaninfo" mock_det, "scaninfo"
# ) as mock_scaninfo: ) as mock_scaninfo:
# # mock_det = eiger_factory(name, prefix, sim_mode) # mock_det = eiger_factory(name, prefix, sim_mode)
# mock_det.std_client = mock_std_daq mock_det.std_client = mock_std_daq
# mock_std_daq.start_writer_async.return_value = None mock_std_daq.start_writer_async.return_value = None
# mock_std_daq.get_status.return_value = daq_status mock_std_daq.get_status.return_value = daq_status
# mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"] mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
# mock_det.scaninfo.num_points = scaninfo["num_points"] mock_det.scaninfo.num_points = scaninfo["num_points"]
# mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
# if expected_exception: if expected_exception:
# with pytest.raises(Exception): with pytest.raises(Exception):
# mock_det._prep_file_writer() mock_det._prep_file_writer()
# mock_file_path_exists.assert_called_once() mock_file_path_exists.assert_called_once()
# assert mock_stop_file_writer.call_count == 2 assert mock_stop_file_writer.call_count == 2
# else: else:
# mock_det._prep_file_writer() mock_det._prep_file_writer()
# mock_file_path_exists.assert_called_once() mock_file_path_exists.assert_called_once()
# mock_stop_file_writer.assert_called_once() mock_stop_file_writer.assert_called_once()
# daq_writer_call = { daq_writer_call = {
# "output_file": scaninfo["filepath"], "output_file": scaninfo["filepath"],
# "n_images": int(scaninfo["num_points"] * scaninfo["frames_per_trigger"]), "n_images": int(scaninfo["num_points"] * scaninfo["frames_per_trigger"]),
# } }
# mock_std_daq.start_writer_async.assert_called_with(daq_writer_call) mock_std_daq.start_writer_async.assert_called_with(daq_writer_call)
# @pytest.mark.parametrize( @pytest.mark.parametrize(
# "stopped, expected_exception", "stopped, expected_exception",
# [ [
# ( (
# False, False,
# False, False,
# ), ),
# ( (
# True, True,
# True, True,
# ), ),
# ], ],
# ) )
# def test_unstage( def test_unstage(
# mock_det, mock_det,
# stopped, stopped,
# expected_exception, expected_exception,
# ): ):
# with mock.patch.object(mock_det, "_finished") as mock_finished, mock.patch.object( with mock.patch.object(mock_det, "_finished") as mock_finished, mock.patch.object(
# mock_det, "_publish_file_location" mock_det, "_publish_file_location"
# ) as mock_publish_file_location: ) as mock_publish_file_location:
# mock_det._stopped = stopped mock_det._stopped = stopped
# if expected_exception: if expected_exception:
# mock_det.unstage() mock_det.unstage()
# assert mock_det._stopped == True assert mock_det._stopped == True
# else: else:
# mock_det.unstage() mock_det.unstage()
# mock_finished.assert_called_once() mock_finished.assert_called_once()
# mock_publish_file_location.assert_called_with(done=True, successful=True) mock_publish_file_location.assert_called_with(done=True, successful=True)
# assert mock_det._stopped == False assert mock_det._stopped == False
# def test_stop_fw(mock_det): def test_stop_fw(mock_det):
# with mock.patch.object(mock_det, "std_client") as mock_std_daq: with mock.patch.object(mock_det, "std_client") as mock_std_daq:
# mock_std_daq.stop_writer.return_value = None mock_std_daq.stop_writer.return_value = None
# mock_det.std_client = mock_std_daq mock_det.std_client = mock_std_daq
# mock_det._stop_file_writer() mock_det._stop_file_writer()
# mock_std_daq.stop_writer.assert_called_once() mock_std_daq.stop_writer.assert_called_once()
# @pytest.mark.parametrize( @pytest.mark.parametrize(
# "scaninfo", "scaninfo",
# [ [
# ({"filepath": "test.h5", "successful": True, "done": False, "scanID": "123"}), ({"filepath": "test.h5", "successful": True, "done": False, "scanID": "123"}),
# ({"filepath": "test.h5", "successful": False, "done": True, "scanID": "123"}), ({"filepath": "test.h5", "successful": False, "done": True, "scanID": "123"}),
# ({"filepath": "test.h5", "successful": None, "done": True, "scanID": "123"}), ({"filepath": "test.h5", "successful": None, "done": True, "scanID": "123"}),
# ], ],
# ) )
# def test_publish_file_location(mock_det, scaninfo): def test_publish_file_location(mock_det, scaninfo):
# mock_det.scaninfo.scanID = scaninfo["scanID"] mock_det.scaninfo.scanID = scaninfo["scanID"]
# mock_det.filepath = scaninfo["filepath"] mock_det.filepath = scaninfo["filepath"]
# mock_det._publish_file_location(done=scaninfo["done"], successful=scaninfo["successful"]) mock_det._publish_file_location(done=scaninfo["done"], successful=scaninfo["successful"])
# if scaninfo["successful"] is None: if scaninfo["successful"] is None:
# msg = BECMessage.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]).dumps() msg = BECMessage.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]).dumps()
# else: else:
# msg = BECMessage.FileMessage( msg = BECMessage.FileMessage(
# file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"] file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"]
# ).dumps() ).dumps()
# expected_calls = [ expected_calls = [
# mock.call( mock.call(
# MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name), MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name),
# msg, msg,
# pipe=mock_det._producer.pipeline.return_value, pipe=mock_det._producer.pipeline.return_value,
# ), ),
# mock.call( mock.call(
# MessageEndpoints.file_event(mock_det.name), MessageEndpoints.file_event(mock_det.name),
# msg, msg,
# pipe=mock_det._producer.pipeline.return_value, pipe=mock_det._producer.pipeline.return_value,
# ), ),
# ] ]
# assert mock_det._producer.set_and_publish.call_args_list == expected_calls assert mock_det._producer.set_and_publish.call_args_list == expected_calls
# def test_stop(mock_det): def test_stop(mock_det):
# with mock.patch.object(mock_det, "_stop_det") as mock_stop_det, mock.patch.object( with mock.patch.object(mock_det, "_stop_det") as mock_stop_det, mock.patch.object(
# mock_det, "_stop_file_writer" mock_det, "_stop_file_writer"
# ) as mock_stop_file_writer: ) as mock_stop_file_writer:
# mock_det.stop() mock_det.stop()
# mock_stop_det.assert_called_once() mock_stop_det.assert_called_once()
# mock_stop_file_writer.assert_called_once() mock_stop_file_writer.assert_called_once()
# assert mock_det._stopped == True assert mock_det._stopped == True
# @pytest.mark.parametrize( @pytest.mark.parametrize(
# "stopped, scaninfo, cam_state, daq_status, expected_exception", "stopped, scaninfo, cam_state, daq_status, expected_exception",
# [ [
# ( (
# False, False,
# { {
# "num_points": 500, "num_points": 500,
# "frames_per_trigger": 4, "frames_per_trigger": 4,
# }, },
# 0, 0,
# {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 2000}}}, {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 2000}}},
# False, False,
# ), ),
# ( (
# False, False,
# { {
# "num_points": 500, "num_points": 500,
# "frames_per_trigger": 4, "frames_per_trigger": 4,
# }, },
# 0, 0,
# {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 1999}}}, {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 1999}}},
# True, True,
# ), ),
# ( (
# False, False,
# { {
# "num_points": 500, "num_points": 500,
# "frames_per_trigger": 1, "frames_per_trigger": 1,
# }, },
# 1, 1,
# {"acquisition": {"state": "READY", "stats": {"n_write_completed": 500}}}, {"acquisition": {"state": "READY", "stats": {"n_write_completed": 500}}},
# True, True,
# ), ),
# ( (
# False, False,
# { {
# "num_points": 500, "num_points": 500,
# "frames_per_trigger": 1, "frames_per_trigger": 1,
# }, },
# 0, 0,
# {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 500}}}, {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 500}}},
# False, False,
# ), ),
# ], ],
# ) )
# def test_finished(mock_det, stopped, cam_state, daq_status, scaninfo, expected_exception): def test_finished(mock_det, stopped, cam_state, daq_status, scaninfo, expected_exception):
# with mock.patch.object(mock_det, "std_client") as mock_std_daq, mock.patch.object( with mock.patch.object(mock_det, "std_client") as mock_std_daq, mock.patch.object(
# mock_det, "_stop_file_writer" mock_det, "_stop_file_writer"
# ) as mock_stop_file_friter, mock.patch.object(mock_det, "_stop_det") as mock_stop_det: ) as mock_stop_file_friter, mock.patch.object(mock_det, "_stop_det") as mock_stop_det:
# mock_std_daq.get_status.return_value = daq_status mock_std_daq.get_status.return_value = daq_status
# mock_det.cam.acquire.put(cam_state) mock_det.cam.acquire._read_pv.mock_state = cam_state
# mock_det.scaninfo.num_points = scaninfo["num_points"] mock_det.scaninfo.num_points = scaninfo["num_points"]
# mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
# if expected_exception: if expected_exception:
# with pytest.raises(Exception): with pytest.raises(Exception):
# mock_det._finished() mock_det._finished()
# assert mock_det._stopped == stopped assert mock_det._stopped == stopped
# mock_stop_file_friter.assert_called() mock_stop_file_friter.assert_called()
# mock_stop_det.assert_called_once() mock_stop_det.assert_called_once()
# else: else:
# mock_det._finished() mock_det._finished()
# if stopped: if stopped:
# assert mock_det._stopped == stopped assert mock_det._stopped == stopped
# mock_stop_file_friter.assert_called() mock_stop_file_friter.assert_called()
# mock_stop_det.assert_called_once() mock_stop_det.assert_called_once()