ophyd_devices/tests/test_delay_generator_csaxs.py

308 lines
10 KiB
Python

# pylint: skip-file
import pytest
from unittest import mock
from ophyd_devices.epics.devices.delay_generator_csaxs import DDGSetup
from ophyd_devices.epics.devices.psi_delay_generator_base import TriggerSource
def patch_dual_pvs(device):
    """Point "_RBV" read PVs of ``device``'s signals at their write PVs.

    Every entry yielded by ``device.walk_signals()`` is inspected. Signals
    lacking either a ``_read_pv`` or a ``_write_pv`` attribute are skipped.
    For the rest, if the read PV's ``pvname`` ends with ``"_RBV"``, the
    signal's ``_read_pv`` is replaced by its ``_write_pv``.

    Args:
        device: Object exposing ``walk_signals()``; each yielded entry must
            provide the signal as ``.item``.
    """
    # NOTE(review): presumably used so that values written in tests are read
    # back from the same (mocked) PV -- confirm against callers.
    for entry in device.walk_signals():
        signal = entry.item
        if not (hasattr(signal, "_read_pv") and hasattr(signal, "_write_pv")):
            continue
        if signal._read_pv.pvname.endswith("_RBV"):
            signal._read_pv = signal._write_pv
@pytest.fixture(scope="function")
def mock_DDGSetup():
    """Yield a ``DDGSetup`` instance whose parent is a ``MagicMock``.

    A fresh instance (and a fresh mock parent) is created for every test
    function, so call records never leak between tests.
    """
    parent_device = mock.MagicMock()
    ddg_setup = DDGSetup(parent=parent_device)
    yield ddg_setup
# Fixture for scaninfo
def _scaninfo_case(*, scan_type, frames_per_trigger, exp_time, readout_time):
    """Build one scan-metadata dict; scanID and num_points are the same in every case."""
    return {
        "scanID": "1234",
        "scan_type": scan_type,
        "num_points": 500,
        "frames_per_trigger": frames_per_trigger,
        "exp_time": exp_time,
        "readout_time": readout_time,
    }


@pytest.fixture(
    params=[
        _scaninfo_case(scan_type="step", frames_per_trigger=1, exp_time=0.1, readout_time=0.1),
        _scaninfo_case(scan_type="step", frames_per_trigger=5, exp_time=0.01, readout_time=0),
        _scaninfo_case(scan_type="fly", frames_per_trigger=1, exp_time=1, readout_time=0.2),
        _scaninfo_case(scan_type="fly", frames_per_trigger=5, exp_time=0.1, readout_time=0.4),
    ]
)
def scaninfo(request):
    """Return one parametrized scan-metadata dict.

    Covers step and fly scans, each with a single frame and with five frames
    per trigger.
    """
    return request.param
# Fixture for DDG config default values
# Case 1: every default is zero.
_DDG_DEFAULTS_ALL_ZERO = {
    "delay_burst": 0.0,
    "delta_width": 0.0,
    "additional_triggers": 0,
    "polarity": [0, 0, 0, 0, 0],
    "amplitude": 0.0,
    "offset": 0.0,
    "thres_trig_level": 0.0,
}
# Case 2: non-zero delay/width, one additional trigger, one polarity entry set to 1.
_DDG_DEFAULTS_NON_ZERO = {
    "delay_burst": 0.1,
    "delta_width": 0.1,
    "additional_triggers": 1,
    "polarity": [0, 0, 1, 0, 0],
    "amplitude": 5,
    "offset": 0.0,
    "thres_trig_level": 2.5,
}


@pytest.fixture(params=[_DDG_DEFAULTS_ALL_ZERO, _DDG_DEFAULTS_NON_ZERO])
def ddg_config_defaults(request):
    """Return one parametrized dict of DDG default-configuration values.

    The tests apply each key as the ``.get()`` return value of the
    same-named attribute on the mocked parent.
    """
    return request.param
# Fixture for DDG config scan values
def _ddg_scan_case(*, trigger_width, set_high_on_exposure, set_trigger_source, premove_trigger):
    """Build one scan-configuration dict.

    ``fixed_ttl_width`` (all zeros) and ``set_high_on_stage`` (False) are the
    same in every case; a new list is created per call so cases never share it.
    """
    return {
        "fixed_ttl_width": [0, 0, 0, 0, 0],
        "trigger_width": trigger_width,
        "set_high_on_exposure": set_high_on_exposure,
        "set_high_on_stage": False,
        "set_trigger_source": set_trigger_source,
        "premove_trigger": premove_trigger,
    }


@pytest.fixture(
    params=[
        _ddg_scan_case(
            trigger_width=None,
            set_high_on_exposure=False,
            set_trigger_source="SINGLE_SHOT",
            premove_trigger=False,
        ),
        _ddg_scan_case(
            trigger_width=0.1,
            set_high_on_exposure=True,
            set_trigger_source="SINGLE_SHOT",
            premove_trigger=True,
        ),
        _ddg_scan_case(
            trigger_width=0.1,
            set_high_on_exposure=False,
            set_trigger_source="EXT_RISING_EDGE",
            premove_trigger=False,
        ),
    ]
)
def ddg_config_scan(request):
    """Return one parametrized dict of DDG scan-configuration values.

    The tests apply each key as the ``.get()`` return value of the
    same-named attribute on the mocked parent.
    """
    return request.param
# Fixture for delay pairs
@pytest.fixture(
    params=[
        {"all_channels": channels, "all_delay_pairs": delay_pairs}
        for channels, delay_pairs in (
            (["channelAB", "channelCD"], ["AB", "CD"]),
            ([], []),
            (["channelT0", "channelAB", "channelCD"], ["AB", "CD"]),
        )
    ]
)
def channel_pairs(request):
    """Return one parametrized pairing of channel names and delay-pair names.

    Cases: two channels, no channels at all, and three channels (including
    ``channelT0``) with the same two delay pairs.
    """
    return request.param
def test_check_scanID(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan):
    """Test the check_scanID method.

    ``check_scanID`` must call ``parent.scaninfo.load_scan_metadata`` exactly
    once.
    """
    parent = mock_DDGSetup.parent

    # Mirror the parametrized scan metadata onto the mocked parent.
    for name, value in scaninfo.items():
        setattr(parent.scaninfo, name, value)
    # Each config attribute on the parent reports its parametrized value via
    # .get(); the two config dicts have disjoint keys, so merging them is safe.
    for name, value in {**ddg_config_defaults, **ddg_config_scan}.items():
        getattr(parent, name).get.return_value = value

    mock_DDGSetup.check_scanID()

    parent.scaninfo.load_scan_metadata.assert_called_once()
def test_on_pre_scan(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan):
    """Test the on_pre_scan method.

    When the parametrized ``premove_trigger`` value is truthy, ``on_pre_scan``
    must call ``parent.trigger_shot.put(1)`` exactly once. Nothing is
    asserted when ``premove_trigger`` is falsy.
    """
    parent = mock_DDGSetup.parent

    # Mirror the parametrized scan metadata onto the mocked parent.
    for k, v in scaninfo.items():
        setattr(parent.scaninfo, k, v)
    # Each config attribute on the parent reports its parametrized value via .get().
    for k, v in ddg_config_defaults.items():
        getattr(parent, k).get.return_value = v
    for k, v in ddg_config_scan.items():
        getattr(parent, k).get.return_value = v

    # Call the function under test.
    mock_DDGSetup.on_pre_scan()

    if ddg_config_scan["premove_trigger"]:
        parent.trigger_shot.put.assert_called_once_with(1)
@pytest.mark.parametrize("source", ["SINGLE_SHOT", "EXT_RISING_EDGE"])
def test_on_trigger(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, source):
    """Test the on_trigger method.

    The parent's ``source`` signal is mocked so that ``read()`` reports the
    parametrized ``TriggerSource`` member. For ``SINGLE_SHOT``, ``on_trigger``
    must call ``parent.trigger_shot.put(1)`` exactly once; nothing is asserted
    for ``EXT_RISING_EDGE``.
    """
    parent = mock_DDGSetup.parent

    # Mirror the parametrized scan metadata onto the mocked parent.
    for name, value in scaninfo.items():
        setattr(parent.scaninfo, name, value)
    # Each config attribute on the parent reports its parametrized value via .get().
    for name, value in {**ddg_config_defaults, **ddg_config_scan}.items():
        getattr(parent, name).get.return_value = value

    # Make the mocked source signal report the requested trigger source,
    # keyed by the signal's own name.
    source_signal = parent.source
    source_signal.name = "source"
    reading = {"value": getattr(TriggerSource, source)}
    source_signal.read.return_value = {source_signal.name: reading}

    mock_DDGSetup.on_trigger()

    if source == "SINGLE_SHOT":
        parent.trigger_shot.put.assert_called_once_with(1)
def test_initialize_default_parameter(
    mock_DDGSetup,
    scaninfo,
    ddg_config_defaults,
    ddg_config_scan,
    channel_pairs,
):
    """Test the initialize_default_parameter method.

    Expects ``parent.set_channels`` to receive, in order: one "polarity" call
    per channel, one "amplitude" call, one "offset" call, and two "reference"
    calls (for the ``.ch1`` and ``.ch2`` sub-channels of every delay pair).
    """
    parent = mock_DDGSetup.parent

    # Mirror the parametrized scan metadata onto the mocked parent.
    for name, value in scaninfo.items():
        setattr(parent.scaninfo, name, value)
    # Each config attribute on the parent reports its parametrized value via .get().
    for name, value in {**ddg_config_defaults, **ddg_config_scan}.items():
        getattr(parent, name).get.return_value = value

    channels = channel_pairs["all_channels"]
    delay_pairs = channel_pairs["all_delay_pairs"]
    parent.all_channels = channels
    parent.all_delay_pairs = delay_pairs

    polarity = ddg_config_defaults["polarity"]
    expected_calls = [
        # Polarity is set channel by channel, indexed by channel position.
        *(mock.call("polarity", polarity[idx], [channel]) for idx, channel in enumerate(channels)),
        mock.call("amplitude", ddg_config_defaults["amplitude"]),
        mock.call("offset", ddg_config_defaults["offset"]),
        mock.call("reference", 0, [f"channel{pair}.ch1" for pair in delay_pairs]),
        mock.call("reference", 0, [f"channel{pair}.ch2" for pair in delay_pairs]),
    ]

    mock_DDGSetup.initialize_default_parameter()

    parent.set_channels.assert_has_calls(expected_calls)
def test_prepare_ddg(
    mock_DDGSetup,
    scaninfo,
    ddg_config_defaults,
    ddg_config_scan,
    channel_pairs,
):
    """Test the prepare_ddg method.

    Recomputes the expected burst parameters and pulse width from the
    parametrized scan metadata and DDG configuration, then checks the calls
    ``prepare_ddg`` made on the mocked parent:

    * ``set_trigger`` once, with the configured ``TriggerSource`` member;
    * ``burst_enable`` once, with the expected cycle count, delay and period;
    * ``set_channels("width", ...)`` with ``trigger_width`` if it is set,
      otherwise with the computed exposure time;
    * when ``set_high_on_exposure`` is set, one ``set_channels`` width call
      per channel that has a non-zero ``fixed_ttl_width``.

    Raises:
        ValueError: If the ``scaninfo`` fixture supplies a ``scan_type`` other
            than "step" or "fly".
    """
    parent = mock_DDGSetup.parent

    # Mirror the parametrized scan metadata onto the mocked parent.
    for k, v in scaninfo.items():
        setattr(parent.scaninfo, k, v)
    # Each config attribute on the parent reports its parametrized value via .get().
    for k, v in ddg_config_defaults.items():
        getattr(parent, k).get.return_value = v
    for k, v in ddg_config_scan.items():
        getattr(parent, k).get.return_value = v
    parent.all_channels = channel_pairs["all_channels"]
    parent.all_delay_pairs = channel_pairs["all_delay_pairs"]

    # Call the function under test.
    mock_DDGSetup.prepare_ddg()

    parent.set_trigger.assert_called_once_with(
        getattr(TriggerSource, ddg_config_scan["set_trigger_source"])
    )

    # The burst delay is the same in every branch below.
    delay_burst = ddg_config_defaults["delay_burst"]
    if scaninfo["scan_type"] == "step":
        if ddg_config_scan["set_high_on_exposure"]:
            # One pulse spanning all frames of the trigger.
            num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"]
            exp_time = ddg_config_defaults["delta_width"] + scaninfo["frames_per_trigger"] * (
                scaninfo["exp_time"] + scaninfo["readout_time"]
            )
            total_exposure = exp_time
        else:
            # One pulse per frame.
            exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"]
            total_exposure = exp_time + scaninfo["readout_time"]
            num_burst_cycle = (
                scaninfo["frames_per_trigger"] + ddg_config_defaults["additional_triggers"]
            )
    elif scaninfo["scan_type"] == "fly":
        if ddg_config_scan["set_high_on_exposure"]:
            # One pulse spanning all points; no readout after the last point.
            num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"]
            exp_time = (
                ddg_config_defaults["delta_width"]
                + scaninfo["num_points"] * scaninfo["exp_time"]
                + (scaninfo["num_points"] - 1) * scaninfo["readout_time"]
            )
            total_exposure = exp_time
        else:
            # One pulse per point.
            exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"]
            total_exposure = exp_time + scaninfo["readout_time"]
            num_burst_cycle = scaninfo["num_points"] + ddg_config_defaults["additional_triggers"]
    else:
        # Fail with a clear message instead of an UnboundLocalError further down.
        raise ValueError(f"Unexpected scan_type in scaninfo fixture: {scaninfo['scan_type']!r}")

    parent.burst_enable.assert_called_once_with(
        num_burst_cycle, delay_burst, total_exposure, config="first"
    )

    # An explicitly configured trigger width overrides the computed exposure time.
    if not ddg_config_scan["trigger_width"]:
        expected_width = exp_time
    else:
        expected_width = ddg_config_scan["trigger_width"]
    assert mock.call("width", expected_width) in parent.set_channels.mock_calls

    if ddg_config_scan["set_high_on_exposure"]:
        ttl_calls = [
            mock.call("width", value, channels=[channel])
            for value, channel in zip(
                ddg_config_scan["fixed_ttl_width"], channel_pairs["all_channels"]
            )
            if value != 0
        ]
        # Check each expected call individually. The previous
        # `all(calls in mock_calls)` handed a single bool to all() and would
        # raise TypeError as soon as any fixed_ttl_width was non-zero.
        for ttl_call in ttl_calls:
            assert ttl_call in parent.set_channels.mock_calls