test: add tests for ddg1 and ddg2 implementation at csaxs
This commit is contained in:
@@ -1,2 +1,14 @@
|
||||
from .ddg_1 import DDG1
|
||||
from .ddg_2 import DDG2
|
||||
from .delay_generator_csaxs import (
|
||||
BURSTCONFIG,
|
||||
CHANNELREFERENCE,
|
||||
OUTPUTPOLARITY,
|
||||
STATUSBITS,
|
||||
TRIGGERINHIBIT,
|
||||
TRIGGERSOURCE,
|
||||
AllChannelNames,
|
||||
ChannelConfig,
|
||||
DelayChannelNames,
|
||||
)
|
||||
from .error_registry import ERROR_CODES
|
||||
|
||||
@@ -8,14 +8,42 @@ import ophyd
|
||||
import pytest
|
||||
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
|
||||
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs import DDG1, DDG2
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
|
||||
BURSTCONFIG,
|
||||
CHANNELREFERENCE,
|
||||
STATUSBITS,
|
||||
TRIGGERSOURCE,
|
||||
DelayGeneratorCSAXS,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_ddg1() -> Generator[DDG1, DDG1, DDG1]:
|
||||
"""Fixture to mock the DDG1 device."""
|
||||
name = "ddg1"
|
||||
prefix = "test_ddg1:"
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = DDG1(name=name, prefix=prefix)
|
||||
patch_dual_pvs(dev)
|
||||
yield dev
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_ddg2() -> Generator[DDG2, DDG2, DDG2]:
|
||||
"""Fixture to mock the DDG1 device."""
|
||||
name = "ddg2"
|
||||
prefix = "test_ddg2:"
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = DDG2(name=name, prefix=prefix)
|
||||
patch_dual_pvs(dev)
|
||||
yield dev
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_ddg() -> Generator[DelayGeneratorCSAXS, DelayGeneratorCSAXS, DelayGeneratorCSAXS]:
|
||||
"""Fixture to mock the camera device."""
|
||||
@@ -126,3 +154,136 @@ def test_ddg_set_delay_pairs(mock_ddg):
|
||||
assert np.isclose(getattr(mock_ddg, channel).width.get(), 0.2)
|
||||
assert np.isclose(getattr(mock_ddg, channel).ch1.setpoint.get(), delay)
|
||||
assert np.isclose(getattr(mock_ddg, channel).ch2.setpoint.get(), delay + 0.2)
|
||||
|
||||
|
||||
def test_ddg1_on_connected(mock_ddg1):
|
||||
"""Test the on_connected method of DDG1."""
|
||||
mock_ddg1.on_connected()
|
||||
# IO defaults
|
||||
assert mock_ddg1.burst_mode.get() == 0
|
||||
assert mock_ddg1.ab.io.amplitude.get() == 5.0
|
||||
assert mock_ddg1.cd.io.offset.get() == 0.0
|
||||
assert mock_ddg1.ef.io.polarity.get() == 1
|
||||
assert mock_ddg1.gh.io.ttl_mode.get() == 1
|
||||
|
||||
# reference defaults
|
||||
assert mock_ddg1.ab.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg1.ab.ch2.reference.get() == 1 # CHANNELREFERENCE.A.value
|
||||
assert mock_ddg1.cd.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg1.cd.ch2.reference.get() == 3 # CHANNELREFERENCE.C.value
|
||||
assert mock_ddg1.ef.ch1.reference.get() == 4 # CHANNELREFERENCE.D.value
|
||||
assert mock_ddg1.ef.ch2.reference.get() == 5 # CHANNELREFERENCE.E.value
|
||||
assert mock_ddg1.gh.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg1.gh.ch2.reference.get() == 7 # CHANNELREFERENCE.G.value
|
||||
|
||||
# Default trigger source
|
||||
assert mock_ddg1.trigger_source.get() == 5 # TRIGGERSOURCE.SINGLE_SHOT.value
|
||||
|
||||
|
||||
def test_ddg1_stage(mock_ddg1):
|
||||
"""Test the on_stage method of DDG1."""
|
||||
exp_time = 0.1
|
||||
frames_per_trigger = 10
|
||||
|
||||
mock_ddg1.burst_mode.put(1)
|
||||
mock_ddg1.scan_info.msg.scan_parameters["exp_time"] = exp_time
|
||||
mock_ddg1.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
|
||||
|
||||
mock_ddg1.stage()
|
||||
|
||||
assert np.isclose(mock_ddg1.burst_mode.get(), 0) # Burst mode is disabled
|
||||
# Trigger DDG2 through EXT/EN
|
||||
|
||||
assert np.isclose(mock_ddg1.ab.delay.get(), 2e-3)
|
||||
assert np.isclose(mock_ddg1.ab.width.get(), 1e-6)
|
||||
# Shutter channel cd
|
||||
assert np.isclose(mock_ddg1.cd.delay.get(), 0)
|
||||
assert np.isclose(mock_ddg1.cd.width.get(), 2e-3 + exp_time * frames_per_trigger + 1e-3)
|
||||
# MCS channel ef or gate
|
||||
assert np.isclose(mock_ddg1.ef.delay.get(), 0)
|
||||
assert np.isclose(mock_ddg1.ef.width.get(), 1e-6)
|
||||
|
||||
assert mock_ddg1.staged == ophyd.Staged.yes
|
||||
|
||||
|
||||
def test_ddg1_trigger(mock_ddg1):
|
||||
"""Test the on_trigger method of DDG1."""
|
||||
mock_ddg1.state.event_status._read_pv.mock_data = (
|
||||
5 # STATUSBITS.END_OF_DELAY.value + STATUSBITS.TRIG.value
|
||||
)
|
||||
status = mock_ddg1.trigger()
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
assert mock_ddg1.trigger_shot.get() == 1
|
||||
|
||||
|
||||
def test_ddg1_stop(mock_ddg1):
|
||||
"""Test the on_stop method of DDG1."""
|
||||
mock_ddg1.burst_mode.put(1) # Enable burst mode
|
||||
mock_ddg1.stop()
|
||||
assert mock_ddg1.burst_mode.get() == 0 # Burst mode is disabled
|
||||
|
||||
|
||||
def test_ddg2_on_connected(mock_ddg2):
|
||||
"""Test on connected method of DDG2."""
|
||||
mock_ddg2.on_connected()
|
||||
# IO defaults
|
||||
assert mock_ddg2.burst_mode.get() == 0
|
||||
assert mock_ddg2.ab.io.amplitude.get() == 5.0
|
||||
assert mock_ddg2.cd.io.offset.get() == 0.0
|
||||
assert mock_ddg2.ef.io.polarity.get() == 1
|
||||
assert mock_ddg2.gh.io.ttl_mode.get() == 1
|
||||
|
||||
# reference defaults
|
||||
assert mock_ddg2.ab.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg2.ab.ch2.reference.get() == 1 # CHANNELREFERENCE.A.value
|
||||
assert mock_ddg2.cd.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg2.cd.ch2.reference.get() == 3 # CHANNELREFERENCE.C.value
|
||||
assert mock_ddg2.ef.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg2.ef.ch2.reference.get() == 5 # CHANNELREFERENCE.E.value
|
||||
assert mock_ddg2.gh.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
|
||||
assert mock_ddg2.gh.ch2.reference.get() == 7 # CHANNELREFERENCE.G.value
|
||||
|
||||
# Default trigger source
|
||||
assert mock_ddg2.trigger_source.get() == 1 # TRIGGERSOURCE.EXT_RISING_EDGE.value
|
||||
|
||||
|
||||
def test_ddg2_stage(mock_ddg2):
|
||||
"""Test the on_stage method of DDG2."""
|
||||
exp_time = 0.1
|
||||
frames_per_trigger = 10
|
||||
mock_ddg2.on_connected()
|
||||
|
||||
mock_ddg2.burst_mode.put(0)
|
||||
mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = exp_time
|
||||
mock_ddg2.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
|
||||
|
||||
mock_ddg2.stage()
|
||||
|
||||
assert np.isclose(mock_ddg2.burst_mode.get(), 1) # Burst mode is enabled
|
||||
assert np.isclose(mock_ddg2.ab.delay.get(), 0)
|
||||
assert np.isclose(mock_ddg2.ab.width.get(), exp_time - 2e-4) # DEFAULT_READOUT_TIMES["ab"])
|
||||
assert mock_ddg2.burst_count.get() == frames_per_trigger
|
||||
assert np.isclose(mock_ddg2.burst_delay.get(), 0)
|
||||
assert np.isclose(mock_ddg2.burst_period.get(), exp_time)
|
||||
|
||||
assert mock_ddg2.trigger_source.get() == TRIGGERSOURCE.EXT_RISING_EDGE.value
|
||||
|
||||
assert mock_ddg2.staged == ophyd.Staged.yes
|
||||
|
||||
|
||||
def test_ddg2_trigger(mock_ddg2):
|
||||
"""Test the on_trigger method of DDG2."""
|
||||
mock_ddg2.trigger_shot.put(0)
|
||||
status = mock_ddg2.trigger()
|
||||
assert mock_ddg2.trigger_shot.get() == 0 # Should not trigger DDG2 via soft trigger
|
||||
status.wait()
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
|
||||
|
||||
def test_ddg2_stop(mock_ddg2):
|
||||
"""Test the on_stop method of DDG2."""
|
||||
mock_ddg2.burst_mode.put(1) # Enable burst mode
|
||||
mock_ddg2.stop()
|
||||
assert mock_ddg2.burst_mode.get() == 0 # Burst mode is disabled
|
||||
|
||||
Reference in New Issue
Block a user