From 19faece0a5e119e1f1403372c09825748de5e032 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 23 Nov 2023 08:37:12 +0100 Subject: [PATCH] feat: add test for class --- tests/test_delay_generator_csaxs.py | 302 ++++++++++++++++++++++++++++ 1 file changed, 302 insertions(+) create mode 100644 tests/test_delay_generator_csaxs.py diff --git a/tests/test_delay_generator_csaxs.py b/tests/test_delay_generator_csaxs.py new file mode 100644 index 0000000..83ebdf9 --- /dev/null +++ b/tests/test_delay_generator_csaxs.py @@ -0,0 +1,302 @@ +# pylint: skip-file +import pytest +from unittest import mock + +from ophyd_devices.epics.devices.delay_generator_csaxs import DDGSetup +from ophyd_devices.epics.devices.psi_delay_generator_base import TriggerSource + + +def patch_dual_pvs(device): + for walk in device.walk_signals(): + if not hasattr(walk.item, "_read_pv"): + continue + if not hasattr(walk.item, "_write_pv"): + continue + if walk.item._read_pv.pvname.endswith("_RBV"): + walk.item._read_pv = walk.item._write_pv + + +@pytest.fixture(scope="function") +def mock_DDGSetup(): + mock_ddg = mock.MagicMock() + yield DDGSetup(parent=mock_ddg) + + +# Fixture for scaninfo +@pytest.fixture( + params=[ + { + "scanID": "1234", + "scan_type": "step", + "num_points": 500, + "frames_per_trigger": 1, + "exposure_time": 0.1, + "readout_time": 0.1, + }, + { + "scanID": "1234", + "scan_type": "step", + "num_points": 500, + "frames_per_trigger": 5, + "exposure_time": 0.01, + "readout_time": 0, + }, + { + "scanID": "1234", + "scan_type": "fly", + "num_points": 500, + "frames_per_trigger": 1, + "exposure_time": 1, + "readout_time": 0.2, + }, + { + "scanID": "1234", + "scan_type": "fly", + "num_points": 500, + "frames_per_trigger": 5, + "exposure_time": 0.1, + "readout_time": 0.4, + }, + ] +) +def scaninfo(request): + return request.param + + +# Fixture for DDG config default values +@pytest.fixture( + params=[ + { + "delay_burst": 0.0, + "delta_width": 0.0, + "additional_triggers": 0, + "polarity": [0, 0, 0, 0, 0], + "amplitude": 0.0, + "offset": 0.0, + "thres_trig_level": 0.0, + }, + { + "delay_burst": 0.1, + "delta_width": 0.1, + "additional_triggers": 1, + "polarity": [0, 0, 1, 0, 0], + "amplitude": 5, + "offset": 0.0, + "thres_trig_level": 2.5, + }, + ] +) +def ddg_config_defaults(request): + return request.param + + +# Fixture for DDG config scan values +@pytest.fixture( + params=[ + { + "fixed_ttl_width": [0, 0, 0, 0, 0], + "trigger_width": None, + "set_high_on_exposure": False, + "set_high_on_stage": False, + "set_trigger_source": "SINGLE_SHOT", + "premove_trigger": False, + }, + { + "fixed_ttl_width": [0, 0, 0, 0, 0], + "trigger_width": 0.1, + "set_high_on_exposure": False, + "set_high_on_stage": False, + "set_trigger_source": "SINGLE_SHOT", + "premove_trigger": True, + }, + { + "fixed_ttl_width": [0, 0, 0, 0, 0], + "trigger_width": 0.1, + "set_high_on_exposure": False, + "set_high_on_stage": False, + "set_trigger_source": "EXT_RISING_EDGE", + "premove_trigger": False, + }, + ] +) +def ddg_config_scan(request): + return request.param + + +# Fixture for delay pairs +@pytest.fixture( + params=[ + {"all_channels": ["channelAB", "channelCD"], "all_delay_pairs": ["AB", "CD"]}, + {"all_channels": [], "all_delay_pairs": []}, + {"all_channels": ["channelT0", "channelAB", "channelCD"], "all_delay_pairs": ["AB", "CD"]}, + ] +) +def channel_pairs(request): + return request.param + + +def test_check_scanID(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan): + """Test the check_scanID method.""" + # Set first attributes of parent class + for k, v in scaninfo.items(): + setattr(mock_DDGSetup.parent.scaninfo, k, v) + for k, v in ddg_config_defaults.items(): + getattr(mock_DDGSetup.parent, k).get.return_value = v + for k, v in ddg_config_scan.items(): + getattr(mock_DDGSetup.parent, k).get.return_value = v + # Call the function you want to test + mock_DDGSetup.check_scanID() + mock_DDGSetup.parent.scaninfo.load_scan_metadata.assert_called_once() + + +def test_on_pre_scan(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan): + """Test the check_scanID method.""" + # Set first attributes of parent class + for k, v in scaninfo.items(): + setattr(mock_DDGSetup.parent.scaninfo, k, v) + for k, v in ddg_config_defaults.items(): + getattr(mock_DDGSetup.parent, k).get.return_value = v + for k, v in ddg_config_scan.items(): + getattr(mock_DDGSetup.parent, k).get.return_value = v + # Call the function you want to test + mock_DDGSetup.on_pre_scan() + if ddg_config_scan["premove_trigger"]: + assert mock_DDGSetup.parent.trigger_shot.put.called_once_with(1) + + +@pytest.mark.parametrize("source", ["SINGLE_SHOT", "EXT_RISING_EDGE"]) +def test_on_trigger(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, source): + """Test the on_trigger method.""" + # Set first attributes of parent class + for k, v in scaninfo.items(): + setattr(mock_DDGSetup.parent.scaninfo, k, v) + for k, v in ddg_config_defaults.items(): + getattr(mock_DDGSetup.parent, k).get.return_value = v + for k, v in ddg_config_scan.items(): + getattr(mock_DDGSetup.parent, k).get.return_value = v + # Call the function you want to test + mock_DDGSetup.parent.source.name = "source" + mock_DDGSetup.parent.source.read.return_value = { + mock_DDGSetup.parent.source.name: {"value": source} + } + mock_DDGSetup.on_trigger() + if source == "SINGLE_SHOT": + assert mock_DDGSetup.parent.trigger_shot.put.called_once_with(1) + + +def test_initialize_default_parameter( + mock_DDGSetup, + scaninfo, + ddg_config_defaults, + ddg_config_scan, + channel_pairs, +): + """Test the initialize_default_parameter method.""" + # Set first attributes of parent class + for k, v in scaninfo.items(): + setattr(mock_DDGSetup.parent.scaninfo, k, v) + for k, v in ddg_config_defaults.items(): + getattr(mock_DDGSetup.parent, k).get.return_value = v + for k, v in ddg_config_scan.items(): + getattr(mock_DDGSetup.parent, k).get.return_value = v + # Call the function you want to test + mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"] + mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"] + calls = [] + calls.extend( + [ + mock.call("polarity", ddg_config_defaults["polarity"][ii], [channel]) + for ii, channel in enumerate(channel_pairs["all_channels"]) + ] + ) + calls.extend([mock.call("amplitude", ddg_config_defaults["amplitude"])]) + calls.extend([mock.call("offset", ddg_config_defaults["offset"])]) + calls.extend( + [ + mock.call( + "reference", 0, [f"channel{pair}.ch1" for pair in channel_pairs["all_delay_pairs"]] + ) + ] + ) + calls.extend( + [ + mock.call( + "reference", 0, [f"channel{pair}.ch2" for pair in channel_pairs["all_delay_pairs"]] + ) + ] + ) + mock_DDGSetup.initialize_default_parameter() + mock_DDGSetup.parent.set_channels.assert_has_calls(calls) + + +def test_prepare_ddg( + mock_DDGSetup, + scaninfo, + ddg_config_defaults, + ddg_config_scan, + channel_pairs, +): + """Test the prepare_ddg method.""" + # Set first attributes of parent class + for k, v in scaninfo.items(): + setattr(mock_DDGSetup.parent.scaninfo, k, v) + for k, v in ddg_config_defaults.items(): + getattr(mock_DDGSetup.parent, k).get.return_value = v + for k, v in ddg_config_scan.items(): + getattr(mock_DDGSetup.parent, k).get.return_value = v + # Call the function you want to test + mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"] + mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"] + + mock_DDGSetup.prepare_ddg() + assert mock_DDGSetup.parent.set_trigger.called_once_with( + getattr(TriggerSource, ddg_config_scan["set_trigger_source"]) + ) + if scaninfo["scan_type"] == "step": + if ddg_config_scan["set_high_on_exposure"]: + num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"] + exp_time = ddg_config_defaults["delta_width"] + scaninfo["frames_per_trigger"] * ( + scaninfo["exposure_time"] + scaninfo["readout_time"] + ) + total_exposure = exp_time + delay_burst = ddg_config_defaults["delay_burst"] + else: + exp_time = ddg_config_defaults["delta_width"] + scaninfo["exposure_time"] + total_exposure = exp_time + scaninfo["readout_time"] + delay_burst = ddg_config_defaults["delay_burst"] + num_burst_cycle = ( + scaninfo["frames_per_trigger"] + ddg_config_defaults["additional_triggers"] + ) + elif scaninfo["scan_type"] == "fly": + if ddg_config_scan["set_high_on_exposure"]: + num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"] + exp_time = ( + ddg_config_defaults["delta_width"] + + scaninfo["num_points"] * scaninfo["exposure_time"] + + (scaninfo["num_points"] - 1) * scaninfo["readout_time"] + ) + total_exposure = exp_time + delay_burst = ddg_config_defaults["delay_burst"] + else: + exp_time = ddg_config_defaults["delta_width"] + scaninfo["exposure_time"] + total_exposure = exp_time + scaninfo["readout_time"] + delay_burst = ddg_config_defaults["delay_burst"] + num_burst_cycle = scaninfo["num_points"] + ddg_config_defaults["additional_triggers"] + + assert mock_DDGSetup.parent.burst_enable.called_once_with( + num_burst_cycle, delay_burst, total_exposure, config="first" + ) + if not ddg_config_scan["trigger_width"]: + calls = mock.call("width", exp_time) + assert mock_DDGSetup.parent.set_channels.has_calls(calls) + else: + calls = mock.call("width", ddg_config_scan["trigger_width"]) + assert mock_DDGSetup.parent.set_channels.has_calls(calls) + if ddg_config_scan["set_high_on_exposure"]: + calls = [ + mock.call("width", value, channels=[channel]) + for channel, value in zip( + ddg_config_defaults["fixed_ttl_width"], channel_pairs["all_channels"] + ) + ] + assert mock_DDGSetup.parent.set_channels.has_calls(calls)