305 lines
12 KiB
Python
305 lines
12 KiB
Python
# pylint: skip-file
|
|
import threading
|
|
from typing import Generator
|
|
from unittest import mock
|
|
|
|
import numpy as np
|
|
import ophyd
|
|
import pytest
|
|
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
|
|
|
|
from csaxs_bec.devices.epics.delay_generator_csaxs import DDG1, DDG2
|
|
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
|
|
BURSTCONFIG,
|
|
CHANNELREFERENCE,
|
|
STATUSBITS,
|
|
TRIGGERSOURCE,
|
|
DelayGeneratorCSAXS,
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="function")
def mock_ddg1() -> Generator[DDG1, None, None]:
    """Yield a DDG1 device whose PVs are served by ``MockPV``.

    ``ophyd.cl`` is patched while the fixture is active, with ``get_pv``
    replaced by ``MockPV`` and ``thread_class`` set to ``threading.Thread``.

    Yields:
        DDG1: device created with name ``"ddg1"`` and prefix ``"test_ddg1:"``.
    """
    name = "ddg1"
    prefix = "test_ddg1:"
    with mock.patch.object(ophyd, "cl") as mock_cl:
        # Every PV requested through the patched control layer is a MockPV.
        mock_cl.get_pv = MockPV
        mock_cl.thread_class = threading.Thread
        dev = DDG1(name=name, prefix=prefix)
        # Helper from ophyd_devices.tests.utils; presumably prepares the
        # device's dual-PV signals for use with MockPV -- confirm there.
        patch_dual_pvs(dev)
        # Yield inside the ``with`` so the patch stays active during the test.
        yield dev
|
|
|
|
|
|
@pytest.fixture(scope="function")
def mock_ddg2() -> Generator[DDG2, None, None]:
    """Yield a DDG2 device whose PVs are served by ``MockPV``.

    ``ophyd.cl`` is patched while the fixture is active, with ``get_pv``
    replaced by ``MockPV`` and ``thread_class`` set to ``threading.Thread``.

    Yields:
        DDG2: device created with name ``"ddg2"`` and prefix ``"test_ddg2:"``.
    """
    name = "ddg2"
    prefix = "test_ddg2:"
    with mock.patch.object(ophyd, "cl") as mock_cl:
        # Every PV requested through the patched control layer is a MockPV.
        mock_cl.get_pv = MockPV
        mock_cl.thread_class = threading.Thread
        dev = DDG2(name=name, prefix=prefix)
        # Helper from ophyd_devices.tests.utils; presumably prepares the
        # device's dual-PV signals for use with MockPV -- confirm there.
        patch_dual_pvs(dev)
        # Yield inside the ``with`` so the patch stays active during the test.
        yield dev
|
|
|
|
|
|
@pytest.fixture(scope="function")
def mock_ddg() -> Generator[DelayGeneratorCSAXS, None, None]:
    """Yield a DelayGeneratorCSAXS device whose PVs are served by ``MockPV``.

    ``ophyd.cl`` is patched while the fixture is active, with ``get_pv``
    replaced by ``MockPV`` and ``thread_class`` set to ``threading.Thread``.

    Yields:
        DelayGeneratorCSAXS: device created with name ``"ddg"`` and prefix
        ``"test:"``.
    """
    name = "ddg"
    prefix = "test:"
    with mock.patch.object(ophyd, "cl") as mock_cl:
        # Every PV requested through the patched control layer is a MockPV.
        mock_cl.get_pv = MockPV
        mock_cl.thread_class = threading.Thread
        dev = DelayGeneratorCSAXS(name=name, prefix=prefix)
        # Helper from ophyd_devices.tests.utils; presumably prepares the
        # device's dual-PV signals for use with MockPV -- confirm there.
        patch_dual_pvs(dev)
        # Yield inside the ``with`` so the patch stays active during the test.
        yield dev
|
|
|
|
|
|
def test_ddg_init(mock_ddg):
    """Check that the device exposes the name and prefix it was created with."""
    identity = (mock_ddg.name, mock_ddg.prefix)
    assert identity == ("ddg", "test:")
|
|
|
|
|
|
def test_ddg_proc_event_status(mock_ddg):
    """Check that proc_event_status() takes state.proc_status from 0 to 1."""
    proc_status = mock_ddg.state.proc_status
    proc_status.put(0)  # start from a known value
    mock_ddg.proc_event_status()
    assert proc_status.get() == 1
|
|
|
|
|
|
def test_ddg_set_trigger(mock_ddg):
    """Check that set_trigger() writes the value of every TRIGGERSOURCE member."""
    for source in TRIGGERSOURCE:
        mock_ddg.set_trigger(source)
        readback = mock_ddg.trigger_source.get()
        assert readback == source.value
|
|
|
|
|
|
def test_ddg_burst_enable(mock_ddg):
    """Test enabling burst mode.

    Covers a fully specified call, the rejection of invalid arguments with
    ``ValueError``, and a call that relies on the default ``config``.
    """
    mock_ddg.burst_enable(count=100, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
    assert mock_ddg.burst_mode.get() == 1
    assert mock_ddg.burst_count.get() == 100
    assert mock_ddg.burst_delay.get() == 0.1
    assert mock_ddg.burst_period.get() == 0.02
    assert mock_ddg.burst_config.get() == BURSTCONFIG.ALL_CYCLES.value

    # Count is 0
    with pytest.raises(ValueError):
        mock_ddg.burst_enable(count=0, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)

    # delay is negative
    with pytest.raises(ValueError):
        mock_ddg.burst_enable(count=100, delay=-0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)

    # period is zero
    with pytest.raises(ValueError):
        mock_ddg.burst_enable(count=100, delay=0.1, period=0, config=BURSTCONFIG.ALL_CYCLES)

    # Works with default config
    mock_ddg.burst_enable(count=100, delay=0.1, period=0.02)
    assert mock_ddg.burst_mode.get() == 1
    # NOTE(review): assumes BURSTCONFIG.FIRST_CYCLE is the default ``config`` of
    # burst_enable() -- confirm against DelayGeneratorCSAXS.
    assert mock_ddg.burst_config.get() == BURSTCONFIG.FIRST_CYCLE.value
|
|
|
|
|
|
def test_ddg_wait_for_event_status(mock_ddg):
    """Check that the END_OF_BURST wait status stays pending for other bit values."""
    mock_ddg: DelayGeneratorCSAXS
    event_pv = mock_ddg.state.event_status._read_pv

    event_pv.mock_data = 0
    status = mock_ddg.wait_for_event_status(value=STATUSBITS.END_OF_BURST)  # 8
    assert status.done is False

    # Neither of these values contains the END_OF_BURST bit (8).
    for bits in (1, 4):
        event_pv.mock_data = bits
        assert status.done is False

    # TODO enable once callback for MockPV is implemented
    # mock_ddg.state.event_status._read_pv.mock_data = 13 # 8 + 4 + 1
    # status.wait(timeout=1) # Wait for the status to be done
    # assert status.done is True
|
|
|
|
|
|
def test_ddg_set_io_values(mock_ddg):
    """Check set_io_values() for a single channel and for a list of channels."""
    # Single channel, TTL mode
    mock_ddg.set_io_values(channel="ab", amplitude=3, offset=2, polarity=1, mode="ttl")
    ab_io = mock_ddg.ab.io
    for signal_name, expected in (
        ("amplitude", 3),
        ("offset", 2),
        ("polarity", 1),
        ("ttl_mode", 1),
    ):
        assert getattr(ab_io, signal_name).get() == expected

    # List of channels, NIM mode
    channels = ["ab", "cd", "t0"]
    mock_ddg.set_io_values(channel=channels, amplitude=3, offset=2, polarity=1, mode="nim")
    for channel in channels:
        component = getattr(mock_ddg, channel)
        # "t0" is read directly; the other channels are read through ``.io``.
        io_signals = component if channel == "t0" else component.io
        for signal_name, expected in (
            ("amplitude", 3),
            ("offset", 2),
            ("polarity", 1),
            ("nim_mode", 1),
        ):
            assert getattr(io_signals, signal_name).get() == expected
|
|
|
|
|
|
def test_ddg_set_delay_pairs(mock_ddg):
    """Check set_delay_pairs() for one channel pair and for a list of pairs."""
    # Single pair: ch1 is expected at the delay, ch2 at delay + width.
    mock_ddg.set_delay_pairs(channel="ab", delay=0.1, width=0.2)
    ab = mock_ddg.ab
    assert np.isclose(ab.delay.get(), 0.1)
    assert np.isclose(ab.width.get(), 0.2)
    assert np.isclose(ab.ch1.setpoint.get(), 0.1)
    assert np.isclose(ab.ch2.setpoint.get(), 0.3)

    # List of channels with one delay each and a shared width
    channels = ["ab", "cd", "ef", "gh"]
    delays = [0.1, 0.2, 0.4, 0.5]
    mock_ddg.set_delay_pairs(channel=channels, delay=delays, width=0.2)
    for delay, channel in zip(delays, channels):
        pair = getattr(mock_ddg, channel)
        assert np.isclose(pair.delay.get(), delay)
        assert np.isclose(pair.width.get(), 0.2)
        assert np.isclose(pair.ch1.setpoint.get(), delay)
        assert np.isclose(pair.ch2.setpoint.get(), delay + 0.2)
|
|
|
|
|
|
def test_ddg1_on_connected(mock_ddg1):
    """Check the values DDG1 holds after on_connected() has run."""
    mock_ddg1.on_connected()

    # IO defaults
    assert mock_ddg1.burst_mode.get() == 0
    assert mock_ddg1.ab.io.amplitude.get() == 5.0
    assert mock_ddg1.cd.io.offset.get() == 0.0
    assert mock_ddg1.ef.io.polarity.get() == 1
    assert mock_ddg1.gh.io.ttl_mode.get() == 1

    # reference defaults: (channel pair, channel, expected reference)
    expected_references = (
        ("ab", "ch1", 0),  # CHANNELREFERENCE.T0.value
        ("ab", "ch2", 1),  # CHANNELREFERENCE.A.value
        ("cd", "ch1", 0),  # CHANNELREFERENCE.T0.value
        ("cd", "ch2", 3),  # CHANNELREFERENCE.C.value
        ("ef", "ch1", 4),  # CHANNELREFERENCE.D.value
        ("ef", "ch2", 5),  # CHANNELREFERENCE.E.value
        ("gh", "ch1", 0),  # CHANNELREFERENCE.T0.value
        ("gh", "ch2", 7),  # CHANNELREFERENCE.G.value
    )
    for pair_name, channel_name, reference in expected_references:
        channel = getattr(getattr(mock_ddg1, pair_name), channel_name)
        assert channel.reference.get() == reference

    # Default trigger source
    assert mock_ddg1.trigger_source.get() == 5  # TRIGGERSOURCE.SINGLE_SHOT.value
|
|
|
|
|
|
def test_ddg1_stage(mock_ddg1):
    """Check the burst and channel settings DDG1 holds after stage()."""
    exp_time = 0.1
    frames_per_trigger = 10

    mock_ddg1.burst_mode.put(1)
    scan_parameters = mock_ddg1.scan_info.msg.scan_parameters
    scan_parameters["exp_time"] = exp_time
    scan_parameters["frames_per_trigger"] = frames_per_trigger

    mock_ddg1.stage()

    assert np.isclose(mock_ddg1.burst_mode.get(), 1)  # burst mode is enabled
    assert np.isclose(mock_ddg1.burst_delay.get(), 0)
    assert np.isclose(mock_ddg1.burst_period.get(), exp_time)

    # Trigger DDG2 through EXT/EN
    trigger_pair = mock_ddg1.ab
    assert np.isclose(trigger_pair.delay.get(), 2e-3)
    assert np.isclose(trigger_pair.width.get(), 1e-6)

    # Shutter channel cd
    shutter_pair = mock_ddg1.cd
    assert np.isclose(shutter_pair.delay.get(), 0)
    assert np.isclose(shutter_pair.width.get(), 2e-3 + exp_time * frames_per_trigger + 1e-3)

    # MCS channel ef or gate
    mcs_pair = mock_ddg1.ef
    assert np.isclose(mcs_pair.delay.get(), 0)
    assert np.isclose(mcs_pair.width.get(), 1e-6)

    assert mock_ddg1.staged == ophyd.Staged.yes
|
|
|
|
|
|
def test_ddg1_trigger(mock_ddg1):
    """Check that DDG1.trigger() fires a shot and completes on END_OF_BURST."""
    event_pv = mock_ddg1.state.event_status._read_pv
    event_pv.mock_data = STATUSBITS.NONE.value

    with mock.patch.object(mock_ddg1, "device_manager") as mock_device_manager:
        # TODO add device manager DMMock, and properly test logic for mcs triggering.
        mock_get = mock.Mock(return_value=None)
        mock_device_manager.devices.get = mock_get

        status = mock_ddg1.trigger()

        # The device asked the device manager for "mcs" with a None default.
        assert mock_get.call_args == mock.call("mcs", None)
        assert status.done is False
        assert status.success is False
        assert mock_ddg1.trigger_shot.get() == 1

        event_pv.mock_data = STATUSBITS.END_OF_BURST.value
        status.wait(timeout=1)  # Wait for the status to be done
        assert status.done is True
        assert status.success is True
|
|
|
|
|
|
def test_ddg1_stop(mock_ddg1):
    """Check that DDG1.stop() switches burst mode off."""
    burst_mode = mock_ddg1.burst_mode
    burst_mode.put(1)  # Enable burst mode
    mock_ddg1.stop()
    assert burst_mode.get() == 0  # Burst mode is disabled
|
|
|
|
|
|
def test_ddg2_on_connected(mock_ddg2):
    """Check the values DDG2 holds after on_connected() has run."""
    mock_ddg2.on_connected()

    # IO defaults
    assert mock_ddg2.burst_mode.get() == 0
    assert mock_ddg2.ab.io.amplitude.get() == 5.0
    assert mock_ddg2.cd.io.offset.get() == 0.0
    assert mock_ddg2.ef.io.polarity.get() == 1
    assert mock_ddg2.gh.io.ttl_mode.get() == 1

    # reference defaults: (channel pair, channel, expected reference)
    expected_references = (
        ("ab", "ch1", 0),  # CHANNELREFERENCE.T0.value
        ("ab", "ch2", 1),  # CHANNELREFERENCE.A.value
        ("cd", "ch1", 0),  # CHANNELREFERENCE.T0.value
        ("cd", "ch2", 3),  # CHANNELREFERENCE.C.value
        ("ef", "ch1", 0),  # CHANNELREFERENCE.T0.value
        ("ef", "ch2", 5),  # CHANNELREFERENCE.E.value
        ("gh", "ch1", 0),  # CHANNELREFERENCE.T0.value
        ("gh", "ch2", 7),  # CHANNELREFERENCE.G.value
    )
    for pair_name, channel_name, reference in expected_references:
        channel = getattr(getattr(mock_ddg2, pair_name), channel_name)
        assert channel.reference.get() == reference

    # Default trigger source
    assert mock_ddg2.trigger_source.get() == 1  # TRIGGERSOURCE.EXT_RISING_EDGE.value
|
|
|
|
|
|
def test_ddg2_stage(mock_ddg2):
    """Test the on_stage method of DDG2.

    Checks the burst and channel settings after a valid stage() call, then
    checks that an exposure time of 2e-4 makes stage() raise ``ValueError``.
    """
    exp_time = 0.1
    frames_per_trigger = 10
    mock_ddg2.on_connected()

    mock_ddg2.burst_mode.put(0)
    mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = exp_time
    mock_ddg2.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger

    mock_ddg2.stage()

    assert np.isclose(mock_ddg2.burst_mode.get(), 1)  # Burst mode is enabled
    assert np.isclose(mock_ddg2.ab.delay.get(), 0)
    assert np.isclose(mock_ddg2.ab.width.get(), exp_time - 2e-4)  # DEFAULT_READOUT_TIMES["ab"])
    assert mock_ddg2.burst_count.get() == frames_per_trigger
    assert np.isclose(mock_ddg2.burst_delay.get(), 0)
    assert np.isclose(mock_ddg2.burst_period.get(), exp_time)

    assert mock_ddg2.trigger_source.get() == TRIGGERSOURCE.EXT_RISING_EDGE.value

    assert mock_ddg2.staged == ophyd.Staged.yes
    mock_ddg2.unstage()  # Reset staged state before staging again below

    # Too short exposure time. The parameter is set outside ``pytest.raises``
    # so that only the stage() call itself can satisfy the expectation.
    mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = 2e-4
    with pytest.raises(ValueError):
        mock_ddg2.stage()
|
|
|
|
|
|
def test_ddg2_trigger(mock_ddg2):
    """Test the on_trigger method of DDG2.

    trigger() must leave ``trigger_shot`` untouched and return a status that
    completes successfully.
    """
    mock_ddg2.trigger_shot.put(0)
    status = mock_ddg2.trigger()
    assert mock_ddg2.trigger_shot.get() == 0  # Should not trigger DDG2 via soft trigger
    # Bounded wait: a status that never completes fails the test instead of
    # blocking the whole test run.
    status.wait(timeout=1)
    assert status.done is True
    assert status.success is True
|
|
|
|
|
|
def test_ddg2_stop(mock_ddg2):
    """Check that DDG2.stop() switches burst mode off."""
    burst_mode = mock_ddg2.burst_mode
    burst_mode.put(1)  # Enable burst mode
    mock_ddg2.stop()
    assert burst_mode.get() == 0  # Burst mode is disabled
|