Files
csaxs_bec/tests/tests_devices/test_delay_generator_csaxs.py

570 lines
21 KiB
Python

# pylint: skip-file
import threading
from typing import Generator
from unittest import mock
import numpy as np
import ophyd
import pytest
from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import patched_device
from csaxs_bec.devices.epics.delay_generator_csaxs import DDG1, DDG2
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import (
DEFAULT_IO_CONFIG as DDG1_DEFAULT_IO_CONFIG,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import (
DEFAULT_READOUT_TIMES as DDG1_DEFAULT_READOUT_TIMES,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import (
DEFAULT_REFERENCES as DDG1_DEFAULT_REFERENCES,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import (
DEFAULT_TRIGGER_SOURCE as DDG1_DEFAULT_TRIGGER_SOURCE,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import PROC_EVENT_MODE
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import (
DEFAULT_IO_CONFIG as DDG2_DEFAULT_IO_CONFIG,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import (
DEFAULT_READOUT_TIMES as DDG2_DEFAULT_READOUT_TIMES,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import (
DEFAULT_REFERENCES as DDG2_DEFAULT_REFERENCES,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import (
DEFAULT_TRIGGER_SOURCE as DDG2_DEFAULT_TRIGGER_SOURCE,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
BURSTCONFIG,
CHANNELREFERENCE,
STATUSBITS,
TRIGGERSOURCE,
DelayGeneratorCSAXS,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import MCSCardCSAXS
############################
### Test Delay Generator ###
############################
@pytest.fixture(scope="function")
def mock_ddg() -> Generator[DelayGeneratorCSAXS, None, None]:
    """Fixture providing a patched DelayGeneratorCSAXS device.

    The device is created through ``patched_device`` with name ``"ddg"``,
    prefix ``"test:"`` and a mock PV initial value of 0.

    Yields:
        DelayGeneratorCSAXS: the patched device; ``destroy()`` is called on it
        once the test has finished, even if the test failed.
    """
    with patched_device(
        DelayGeneratorCSAXS, name="ddg", prefix="test:", _mock_pv_initial_value=0
    ) as dev:
        try:
            yield dev
        finally:
            dev.destroy()
def test_ddg_init(mock_ddg: DelayGeneratorCSAXS):
    """Check that the fixture device exposes the name and prefix it was created with."""
    assert (mock_ddg.name, mock_ddg.prefix) == ("ddg", "test:")
def test_ddg_proc_event_status(mock_ddg: DelayGeneratorCSAXS):
    """Check that ``proc_event_status`` sets ``state.proc_status`` to 1."""
    proc_status = mock_ddg.state.proc_status
    # Start from 0 so the change made by proc_event_status is observable
    proc_status.put(0)
    mock_ddg.proc_event_status()
    assert proc_status.get() == 1
def test_ddg_set_trigger(mock_ddg: DelayGeneratorCSAXS):
    """Check that ``set_trigger`` writes the enum value of every TRIGGERSOURCE member."""
    for source in TRIGGERSOURCE:
        mock_ddg.set_trigger(source)
        assert mock_ddg.trigger_source.get() == source.value
def test_ddg_burst_enable(mock_ddg: DelayGeneratorCSAXS):
    """Test enabling burst mode.

    Three aspects are covered:
      1. An explicit configuration is written to the burst signals.
      2. Invalid arguments (count of 0, negative delay, period of 0) raise ValueError.
      3. ``burst_enable`` can be called without ``config`` and enables burst mode.
    """
    mock_ddg.burst_enable(count=100, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
    assert mock_ddg.burst_mode.get() == 1
    assert mock_ddg.burst_count.get() == 100
    assert mock_ddg.burst_delay.get() == 0.1
    assert mock_ddg.burst_period.get() == 0.02
    assert mock_ddg.burst_config.get() == BURSTCONFIG.ALL_CYCLES.value

    # Count is 0
    with pytest.raises(ValueError):
        mock_ddg.burst_enable(count=0, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
    # delay is negative
    with pytest.raises(ValueError):
        mock_ddg.burst_enable(count=100, delay=-0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
    # period is zero
    with pytest.raises(ValueError):
        mock_ddg.burst_enable(count=100, delay=0.1, period=0, config=BURSTCONFIG.ALL_CYCLES)

    # Works with default config. Burst mode is switched off first so that the
    # assertion below proves this particular call enabled it again.
    mock_ddg.burst_mode.put(0)
    mock_ddg.burst_enable(count=100, delay=0.1, period=0.02)
    assert mock_ddg.burst_mode.get() == 1
    # NOTE(review): the original line compared burst_mode with
    # BURSTCONFIG.FIRST_CYCLE.value without asserting anything. If FIRST_CYCLE is
    # the default of burst_enable's ``config`` argument, consider also asserting
    # burst_config.get() == BURSTCONFIG.FIRST_CYCLE.value here - TODO confirm.
def test_ddg_wait_for_event_status(mock_ddg: DelayGeneratorCSAXS):
    """Check that the event status object stays pending while END_OF_BURST is not reported."""
    event_pv = mock_ddg.state.event_status._read_pv
    event_pv.mock_data = 0
    status = mock_ddg.wait_for_event_status(value=STATUSBITS.END_OF_BURST)  # 8
    assert status.done is False
    # Register values other than the awaited one must not resolve the status
    for register_value in (1, 4):
        event_pv.mock_data = register_value
        assert status.done is False
    # TODO enable once callback for MockPV is implemented
    # mock_ddg.state.event_status._read_pv.mock_data = 13 # 8 + 4 + 1
    # status.wait(timeout=1) # Wait for the status to be done
    # assert status.done is True
def test_ddg_set_io_values(mock_ddg: DelayGeneratorCSAXS):
    """Check ``set_io_values`` for a single channel name and for a list of channel names."""
    # Single channel, TTL mode
    mock_ddg.set_io_values(channel="ab", amplitude=3, offset=2, polarity=1, mode="ttl")
    ab_io = mock_ddg.ab.io
    assert ab_io.amplitude.get() == 3
    assert ab_io.offset.get() == 2
    assert ab_io.polarity.get() == 1
    assert ab_io.ttl_mode.get() == 1

    # Several channels at once, NIM mode
    names = ["ab", "cd", "t0"]
    mock_ddg.set_io_values(channel=names, amplitude=3, offset=2, polarity=1, mode="nim")
    for name in names:
        component = getattr(mock_ddg, name)
        # "t0" carries the IO signals itself, the other channels expose them under ``.io``
        io_signals = component if name == "t0" else component.io
        assert io_signals.amplitude.get() == 3
        assert io_signals.offset.get() == 2
        assert io_signals.polarity.get() == 1
        assert io_signals.nim_mode.get() == 1
def test_ddg_set_delay_pairs(mock_ddg: DelayGeneratorCSAXS):
    """Check ``set_delay_pairs`` for one channel and for a list of channels with per-channel delays.

    The assertions expect ``ch1.setpoint`` to equal the delay and ``ch2.setpoint``
    to equal delay + width.
    """
    pulse_width = 0.2

    # Single channel
    mock_ddg.set_delay_pairs(channel="ab", delay=0.1, width=pulse_width)
    pair_ab = mock_ddg.ab
    assert np.isclose(pair_ab.delay.get(), 0.1)
    assert np.isclose(pair_ab.width.get(), 0.2)
    assert np.isclose(pair_ab.ch1.setpoint.get(), 0.1)
    assert np.isclose(pair_ab.ch2.setpoint.get(), 0.3)

    # List of channels, one delay per channel and a shared width
    names = ["ab", "cd", "ef", "gh"]
    delays = [0.1, 0.2, 0.4, 0.5]
    mock_ddg.set_delay_pairs(channel=names, delay=delays, width=pulse_width)
    for name, expected_delay in zip(names, delays):
        pair = getattr(mock_ddg, name)
        assert np.isclose(pair.delay.get(), expected_delay)
        assert np.isclose(pair.width.get(), 0.2)
        assert np.isclose(pair.ch1.setpoint.get(), expected_delay)
        assert np.isclose(pair.ch2.setpoint.get(), expected_delay + 0.2)
#########################
### Test DDG1 Device ####
#########################
@pytest.fixture(scope="function")
def mock_mcs_csaxs() -> Generator[MCSCardCSAXS, None, None]:
    """Fixture providing a patched MCSCardCSAXS registered in a DMMock device manager.

    The device is marked as enabled and stored under the key ``"mcs"`` in the
    device manager's ``devices``. ``destroy()`` is called on teardown.
    """
    device_manager = DMMock()
    patch_kwargs = {
        "name": "mcs",
        "prefix": "X12SA-MCS-CSAXS:",
        "device_manager": device_manager,
        "_mock_pv_initial_value": 0,
    }
    with patched_device(MCSCardCSAXS, **patch_kwargs) as mcs:
        mcs.enabled = True
        mcs.device_manager.devices["mcs"] = mcs
        try:
            yield mcs
        finally:
            mcs.destroy()
@pytest.fixture(scope="function")
def mock_ddg1(mock_mcs_csaxs: MCSCardCSAXS) -> Generator[DDG1, None, None]:
    """Fixture providing a patched DDG1 that shares its device manager with the MCS card.

    The device is marked as enabled and stored under the key ``"ddg1"`` in the
    device manager's ``devices``. ``destroy()`` is called on teardown.
    """
    # Reuse the device manager of the MCS fixture so both devices live in the same registry
    shared_dm = mock_mcs_csaxs.device_manager
    with patched_device(
        DDG1,
        name="ddg1",
        prefix="test_ddg1:",
        device_manager=shared_dm,
        _mock_pv_initial_value=0,
    ) as ddg1:
        ddg1.enabled = True
        ddg1.device_manager.devices["ddg1"] = ddg1
        try:
            yield ddg1
        finally:
            ddg1.destroy()
def test_ddg1_on_connected(mock_ddg1: DDG1):
    """Test the on_connected method of DDG1.

    After ``on_connected`` the test expects:
      - burst mode 0, burst delay 0 and burst count 1,
      - one ``set_io_values`` call per entry of DDG1's DEFAULT_IO_CONFIG,
      - the channel references listed in DDG1's DEFAULT_REFERENCES,
      - the default trigger source and the EVENT proc status mode,
      - a running poll thread with its kill, loop-done and run events cleared.
    """
    # Attribute path (delay pair, sub-channel) for each single-letter output.
    # An explicit table makes an unexpected channel name fail the test instead of
    # silently re-checking the sub-channel of the previous loop iteration.
    sub_channel_attrs = {
        "A": ("ab", "ch1"),
        "B": ("ab", "ch2"),
        "C": ("cd", "ch1"),
        "D": ("cd", "ch2"),
        "E": ("ef", "ch1"),
        "F": ("ef", "ch2"),
        "G": ("gh", "ch1"),
        "H": ("gh", "ch2"),
    }

    mock_ddg1.burst_mode.put(1)  # Set burst mode to 1, if connected should reset it to 0
    mock_ddg1.burst_delay.put(5)  # Set to non-zero, should reset to 0 on connected
    mock_ddg1.burst_count.put(10)  # Set to non-default, should reset to 1 on connected

    with mock.patch.object(mock_ddg1, "set_io_values") as mock_set_io_values:
        mock_ddg1.on_connected()

        # Burst mode Defaults
        assert mock_ddg1.burst_mode.get() == 0
        assert mock_ddg1.burst_delay.get() == 0
        assert mock_ddg1.burst_count.get() == 1

        # IO configuration: exactly one call per configured channel
        assert mock_set_io_values.call_count == len(DDG1_DEFAULT_IO_CONFIG)
        for ch, config in DDG1_DEFAULT_IO_CONFIG.items():
            assert mock.call(ch, **config) in mock_set_io_values.call_args_list

        # Check reference values from DEFAULT_REFERENCES
        for ch, refs in DDG1_DEFAULT_REFERENCES:
            assert ch in sub_channel_attrs, f"Unexpected channel {ch!r} in DEFAULT_REFERENCES"
            pair_name, sub_name = sub_channel_attrs[ch]
            sub_ch = getattr(getattr(mock_ddg1, pair_name), sub_name)
            assert sub_ch.reference.get() == refs.value

        # Check Default trigger source
        assert mock_ddg1.trigger_source.get() == DDG1_DEFAULT_TRIGGER_SOURCE.value

        # Check proc state mode
        assert mock_ddg1.state.proc_status_mode.get() == PROC_EVENT_MODE.EVENT.value

        # Check the poll thread is started
        assert mock_ddg1._poll_thread.is_alive()
        assert not mock_ddg1._poll_thread_kill_event.is_set()
        assert not mock_ddg1._poll_thread_poll_loop_done.is_set()
        assert not mock_ddg1._poll_thread_run_event.is_set()
def test_ddg1_prepare_mcs(mock_ddg1: DDG1, mock_mcs_csaxs: MCSCardCSAXS):
    """Check that ``_prepare_mcs_on_trigger`` starts an erase and resolves once the MCS acquires."""
    acquiring_pv = mock_mcs_csaxs.acquiring._read_pv

    # Default state: card is idle and erase_start has been reset
    acquiring_pv.mock_data = 0
    mock_mcs_csaxs.erase_start.put(0)

    # Preparing the card must start the erase but leave the status pending
    prepare_status = mock_ddg1._prepare_mcs_on_trigger(mock_mcs_csaxs)
    assert prepare_status.done is False
    assert prepare_status.success is False
    assert mock_mcs_csaxs.erase_start.get() == 1

    # Once the card reports that it is acquiring, the status resolves successfully
    acquiring_pv.mock_data = 1
    prepare_status.wait(2)
    assert prepare_status.done is True
    assert prepare_status.success is True
def test_ddg1_stage(mock_ddg1: DDG1):
    """Check the burst settings and channel delays/widths that DDG1 has after staging."""
    ddg = mock_ddg1
    exp_time, frames_per_trigger = 0.1, 10

    # Pre-set the burst signals; staging is expected to configure them
    ddg.burst_mode.put(0)
    ddg.burst_delay.put(5)
    ddg.burst_count.put(10)

    scan_parameters = ddg.scan_info.msg.scan_parameters
    scan_parameters["exp_time"] = exp_time
    scan_parameters["frames_per_trigger"] = frames_per_trigger

    ddg.stage()

    # Expected width of the shutter pulse, which is also the expected burst period
    shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3

    assert np.isclose(ddg.burst_mode.get(), 1)  # burst mode is enabled
    assert np.isclose(ddg.burst_delay.get(), 0)
    assert np.isclose(ddg.burst_period.get(), shutter_width)

    # Trigger DDG2 through EXT/EN
    assert np.isclose(ddg.ab.delay.get(), 2e-3)
    assert np.isclose(ddg.ab.width.get(), 1e-6)

    # Shutter channel cd
    assert np.isclose(ddg.cd.delay.get(), 0)
    assert np.isclose(ddg.cd.width.get(), shutter_width)

    # MCS channel ef or gate
    assert np.isclose(ddg.ef.delay.get(), 0)
    assert np.isclose(ddg.ef.width.get(), 1e-6)

    assert ddg.staged == ophyd.Staged.yes
def test_ddg1_on_trigger(mock_ddg1: DDG1):
    """
    Test the on_trigger method of the DDG1.

    Two scenarios are covered:

    I. The trigger is prepared and resolves successfully once END_OF_BURST is
       written to the event status register.
    II. The trigger is called while polling from a previous trigger is still
        active (simulated with ``_start_polling``). The new trigger is expected
        to resolve as usual. In addition, the mcs card is disabled in this
        scenario and must therefore not be prepared.
    """
    ddg = mock_ddg1
    event_pv = ddg.state.event_status._read_pv

    # Bring the DDG into its default state through on_connected
    ddg.on_connected()

    # The poll thread must be running, but idle
    assert ddg._poll_thread.is_alive()
    assert not ddg._poll_thread_run_event.is_set()
    assert not ddg._poll_thread_poll_loop_done.is_set()

    # Start with a status register that does not report END_OF_BURST
    event_pv.mock_data = STATUSBITS.ABORT_DELAY.value

    #################################
    # Scenario I - normal operation #
    #################################
    with mock.patch.object(ddg, "_prepare_mcs_on_trigger") as prepare_mcs_mock:
        prepare_mcs_mock.return_value = ophyd.StatusBase(done=True, success=True)
        trigger_status = ddg.trigger()

        # Triggering must have activated polling and fired the shot
        assert ddg._poll_thread_run_event.is_set()
        assert not ddg._poll_thread_poll_loop_done.is_set()
        assert trigger_status.done is False
        assert trigger_status.success is False
        assert ddg.trigger_shot.get() == 1

        # Simulate that the event status register reaches END_OF_BURST
        event_pv.mock_data = STATUSBITS.END_OF_BURST.value
        trigger_status.wait(timeout=1)  # Wait for the status to be done
        assert trigger_status.done is True
        assert trigger_status.success is True

        # The poll loop should finish and the run event should be cleared
        ddg._poll_thread_poll_loop_done.wait(timeout=1)
        assert not ddg._poll_thread_run_event.is_set()

    ############################################
    # Scenario II - previous poll not finished #
    #             MCS card disabled            #
    ############################################
    # Disable the mcs card
    ddg.device_manager.devices["mcs"].enabled = False
    event_pv.mock_data = STATUSBITS.ABORT_DELAY.value
    # Polling is already active when the next trigger arrives
    ddg._start_polling()
    assert ddg._poll_thread_run_event.is_set()

    with mock.patch.object(ddg, "_prepare_mcs_on_trigger") as prepare_mcs_mock:
        trigger_status = ddg.trigger()
        prepare_mcs_mock.assert_not_called()  # MCS is disabled, should not be called
        assert trigger_status.done is False
        assert trigger_status.success is False

        # Resolve the status by simulating END_OF_BURST
        event_pv.mock_data = STATUSBITS.END_OF_BURST.value
        trigger_status.wait(timeout=1)  # Wait for the status to be done
        assert trigger_status.done is True
        assert trigger_status.success is True

        # Wait for poll loop to finish
        ddg._poll_thread_poll_loop_done.wait(timeout=1)
        assert not ddg._poll_thread_run_event.is_set()
# def test_ddg1_trigger(mock_ddg1):
# """Test the on_trigger method of DDG1."""
# mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.NONE.value
# with mock.patch.object(mock_ddg1, "device_manager") as mock_device_manager:
# # TODO add device manager DMMock, and properly test logic for mcs triggering.
# mock_get = mock_device_manager.devices.get = mock.Mock(return_value=None)
# status = mock_ddg1.trigger()
# assert mock_get.call_args == mock.call("mcs", None)
# assert status.done is False
# assert status.success is False
# assert mock_ddg1.trigger_shot.get() == 1
# mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value
# status.wait(timeout=1) # Wait for the status to be done
# assert status.done is True
# assert status.success is True
# def test_ddg1_stop(mock_ddg1):
# """Test the on_stop method of DDG1."""
# mock_ddg1.burst_mode.put(1) # Enable burst mode
# mock_ddg1.stop()
# assert mock_ddg1.burst_mode.get() == 0 # Burst mode is disabled
#########################
### Test DDG2 Device ####
#########################
@pytest.fixture(scope="function")
def mock_ddg2(mock_mcs_csaxs: MCSCardCSAXS) -> Generator[DDG2, None, None]:
    """Fixture providing a patched DDG2 that shares its device manager with the MCS card.

    The device is marked as enabled and stored under the key ``"ddg2"`` in the
    device manager's ``devices``. ``destroy()`` is called on teardown.
    """
    # Reuse the device manager of the MCS fixture so both devices live in the same registry
    shared_dm = mock_mcs_csaxs.device_manager
    with patched_device(
        DDG2,
        name="ddg2",
        prefix="test_ddg2:",
        device_manager=shared_dm,
        _mock_pv_initial_value=0,
    ) as ddg2:
        ddg2.enabled = True
        ddg2.device_manager.devices["ddg2"] = ddg2
        try:
            yield ddg2
        finally:
            ddg2.destroy()
def test_ddg2_on_connected(mock_ddg2: DDG2):
    """Test the on_connected method of DDG2.

    After ``on_connected`` the test expects:
      - burst mode 0,
      - one ``set_io_values`` call per entry of DDG2's DEFAULT_IO_CONFIG,
      - the channel references listed in DDG2's DEFAULT_REFERENCES,
      - the default trigger source.
    """
    # Attribute path (delay pair, sub-channel) for each single-letter output.
    # An explicit table makes an unexpected channel name fail the test instead of
    # silently re-checking the sub-channel of the previous loop iteration.
    sub_channel_attrs = {
        "A": ("ab", "ch1"),
        "B": ("ab", "ch2"),
        "C": ("cd", "ch1"),
        "D": ("cd", "ch2"),
        "E": ("ef", "ch1"),
        "F": ("ef", "ch2"),
        "G": ("gh", "ch1"),
        "H": ("gh", "ch2"),
    }

    mock_ddg2.burst_mode.put(1)  # Set burst mode to 1, if connected should reset it to 0
    mock_ddg2.burst_delay.put(5)  # Set to non-zero, should reset to 0 on connected
    mock_ddg2.burst_count.put(10)  # Set to non-default, should reset to 1 on connected

    with mock.patch.object(mock_ddg2, "set_io_values") as mock_set_io_values:
        mock_ddg2.on_connected()

        # Burst mode Defaults
        assert mock_ddg2.burst_mode.get() == 0

        # IO configuration: exactly one call per configured channel
        assert mock_set_io_values.call_count == len(DDG2_DEFAULT_IO_CONFIG)
        for ch, config in DDG2_DEFAULT_IO_CONFIG.items():
            assert mock.call(ch, **config) in mock_set_io_values.call_args_list

        # Check reference values from DEFAULT_REFERENCES
        for ch, refs in DDG2_DEFAULT_REFERENCES:
            assert ch in sub_channel_attrs, f"Unexpected channel {ch!r} in DEFAULT_REFERENCES"
            pair_name, sub_name = sub_channel_attrs[ch]
            sub_ch = getattr(getattr(mock_ddg2, pair_name), sub_name)
            assert sub_ch.reference.get() == refs.value

        # Check Default trigger source
        assert mock_ddg2.trigger_source.get() == DDG2_DEFAULT_TRIGGER_SOURCE.value
def test_ddg2_on_stage(mock_ddg2: DDG2):
    """
    Test the on_stage method of DDG2.

    We will test two scenarios:

    I. Stage device with valid parameters.
    II. Stage device with invalid parameters (too short exp_time). Should raise ValueError.
    """
    ddg = mock_ddg2
    exp_time = 0.1
    frames_per_trigger = 10
    ddg.on_connected()

    ddg.scan_info.msg.scan_parameters["exp_time"] = exp_time
    ddg.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger

    # Set non-default burst mode settings
    ddg.burst_mode.put(0)
    ddg.burst_delay.put(5)

    # Scenario I: stage device with valid parameters
    ddg.stage()
    assert ddg.staged == ophyd.Staged.yes
    assert ddg.burst_mode.get() == 1  # Burst mode is enabled
    assert ddg.burst_delay.get() == 0  # Burst delay is set to 0
    assert ddg.burst_count.get() == frames_per_trigger
    assert ddg.burst_period.get() == exp_time

    # Pulse width is exp_time - readout_time
    burst_pulse_width = exp_time - DDG2_DEFAULT_READOUT_TIMES["ab"]
    assert np.isclose(ddg.ab.delay.get(), 0)
    assert np.isclose(ddg.ab.width.get(), burst_pulse_width)

    # Unstage to reset the staged state before staging again
    ddg.unstage()

    # Scenario II: too short exposure time. The parameter is written outside the
    # ``pytest.raises`` block so that only ``stage()`` can satisfy the expectation.
    exp_time_short = 2e-4
    ddg.scan_info.msg.scan_parameters["exp_time"] = exp_time_short
    with pytest.raises(ValueError):
        ddg.stage()
def test_ddg2_on_trigger(mock_ddg2: DDG2):
    """Test the on_trigger method of DDG2.

    Triggering must leave ``trigger_shot`` at 0 and return a status that
    resolves successfully.
    """
    ddg = mock_ddg2
    ddg.on_connected()
    ddg.trigger_shot.put(0)
    status = ddg.trigger()
    assert ddg.trigger_shot.get() == 0  # Should not trigger DDG2 via soft trigger
    # Bounded wait: a status that never resolves fails the test instead of hanging the suite
    status.wait(timeout=2)
    assert status.done is True
    assert status.success is True
def test_ddg2_on_stop(mock_ddg2: DDG2):
    """Check that stopping DDG2 sets burst mode back to 0."""
    mock_ddg2.on_connected()
    burst_mode = mock_ddg2.burst_mode
    burst_mode.put(1)  # Enable burst mode
    mock_ddg2.stop()
    assert burst_mode.get() == 0  # Burst mode is disabled
# def test_ddg2_stage(mock_ddg2):
# """Test the on_stage method of DDG2."""
# exp_time = 0.1
# frames_per_trigger = 10
# mock_ddg2.on_connected()
# mock_ddg2.burst_mode.put(0)
# mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = exp_time
# mock_ddg2.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
# mock_ddg2.stage()
# assert np.isclose(mock_ddg2.burst_mode.get(), 1) # Burst mode is enabled
# assert np.isclose(mock_ddg2.ab.delay.get(), 0)
# assert np.isclose(mock_ddg2.ab.width.get(), exp_time - 2e-4) # DEFAULT_READOUT_TIMES["ab"])
# assert mock_ddg2.burst_count.get() == frames_per_trigger
# assert np.isclose(mock_ddg2.burst_delay.get(), 0)
# assert np.isclose(mock_ddg2.burst_period.get(), exp_time)
# assert mock_ddg2.trigger_source.get() == TRIGGERSOURCE.EXT_RISING_EDGE.value
# assert mock_ddg2.staged == ophyd.Staged.yes
# mock_ddg2.unstage() # Reset staged state for next test
# with pytest.raises(ValueError):
# mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = 2e-4 # too short exposure time
# mock_ddg2.stage()
# def test_ddg2_trigger(mock_ddg2):
# """Test the on_trigger method of DDG2."""
# mock_ddg2.trigger_shot.put(0)
# status = mock_ddg2.trigger()
# assert mock_ddg2.trigger_shot.get() == 0 # Should not trigger DDG2 via soft trigger
# status.wait()
# assert status.done is True
# assert status.success is True
# def test_ddg2_stop(mock_ddg2):
# """Test the on_stop method of DDG2."""
# mock_ddg2.burst_mode.put(1) # Enable burst mode
# mock_ddg2.stop()
# assert mock_ddg2.burst_mode.get() == 0 # Burst mode is disabled