tests: fix tests for ddg and mcs integrations
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m45s
CI for csaxs_bec / test (pull_request) Successful in 1m44s

This commit is contained in:
2026-01-06 09:29:21 +01:00
parent e7f18db24a
commit e046412eb8
5 changed files with 590 additions and 512 deletions

View File

@@ -289,10 +289,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
self.cancel_on_stop(status_acquiring)
mcs.erase_start.put(1)
# NOTE Timeout of 3s should be plenty, any longer wait should checked. If this happens to crash
# an acquisition regularly with a WaitTimeoutError, the timeout can be increased but it should
# be investigated why the EPICS interface is slow to respond.
status_acquiring.wait(timeout=3)
return status_acquiring
def _poll_event_status(self) -> None:
"""
@@ -464,7 +461,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
if mcs is None or mcs.enabled is False:
logger.info("Did not find mcs card with name 'mcs' in current session")
else:
self._prepare_mcs_on_trigger(mcs)
status_mcs = self._prepare_mcs_on_trigger(mcs)
# NOTE Timeout of 3s should be plenty, any longer wait should checked. If this happens to crash
# an acquisition regularly with a WaitTimeoutError, the timeout can be increased but it should
# be investigated why the EPICS interface is slow to respond.
status_mcs.wait(timeout=3)
# Prepare StatusBitsCompareStatus to resolve once the END_OF_BURST bit was set.
status = self._prepare_trigger_status_event()

View File

@@ -25,7 +25,7 @@ Burst mode is enabled:
import time
from bec_lib.logger import bec_logger
from ophyd import DeviceStatus, StatusBase
from ophyd_devices import DeviceStatus, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (

View File

@@ -475,7 +475,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
# Handle external stop/cancel, and stop monitoring
ret_status.add_callback(self._status_failed_callback)
self.cancel_on_stop(ret_status)
return status
return ret_status
def on_destroy(self):
"""

View File

@@ -6,9 +6,35 @@ from unittest import mock
import numpy as np
import ophyd
import pytest
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import patched_device
from csaxs_bec.devices.epics.delay_generator_csaxs import DDG1, DDG2
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import (
DEFAULT_IO_CONFIG as DDG1_DEFAULT_IO_CONFIG,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import (
DEFAULT_READOUT_TIMES as DDG1_DEFAULT_READOUT_TIMES,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import (
DEFAULT_REFERENCES as DDG1_DEFAULT_REFERENCES,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import (
DEFAULT_TRIGGER_SOURCE as DDG1_DEFAULT_TRIGGER_SOURCE,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import PROC_EVENT_MODE
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import (
DEFAULT_IO_CONFIG as DDG2_DEFAULT_IO_CONFIG,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import (
DEFAULT_READOUT_TIMES as DDG2_DEFAULT_READOUT_TIMES,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import (
DEFAULT_REFERENCES as DDG2_DEFAULT_REFERENCES,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import (
DEFAULT_TRIGGER_SOURCE as DDG2_DEFAULT_TRIGGER_SOURCE,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
BURSTCONFIG,
CHANNELREFERENCE,
@@ -16,68 +42,46 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
TRIGGERSOURCE,
DelayGeneratorCSAXS,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import MCSCardCSAXS
@pytest.fixture(scope="function")
def mock_ddg1() -> Generator[DDG1, DDG1, DDG1]:
"""Fixture to mock the DDG1 device."""
name = "ddg1"
prefix = "test_ddg1:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = DDG1(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
@pytest.fixture(scope="function")
def mock_ddg2() -> Generator[DDG2, DDG2, DDG2]:
"""Fixture to mock the DDG1 device."""
name = "ddg2"
prefix = "test_ddg2:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = DDG2(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
############################
### Test Delay Generator ###
############################
@pytest.fixture(scope="function")
def mock_ddg() -> Generator[DelayGeneratorCSAXS, DelayGeneratorCSAXS, DelayGeneratorCSAXS]:
"""Fixture to mock the camera device."""
name = "ddg"
prefix = "test:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = DelayGeneratorCSAXS(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
with patched_device(
DelayGeneratorCSAXS, name="ddg", prefix="test:", _mock_pv_initial_value=0
) as dev:
try:
yield dev
finally:
dev.destroy()
def test_ddg_init(mock_ddg):
def test_ddg_init(mock_ddg: DelayGeneratorCSAXS):
"""Test the proc event status method."""
assert mock_ddg.name == "ddg"
assert mock_ddg.prefix == "test:"
def test_ddg_proc_event_status(mock_ddg):
def test_ddg_proc_event_status(mock_ddg: DelayGeneratorCSAXS):
"""Test the proc event status method."""
mock_ddg.state.proc_status.put(0)
mock_ddg.proc_event_status()
assert mock_ddg.state.proc_status.get() == 1
def test_ddg_set_trigger(mock_ddg):
def test_ddg_set_trigger(mock_ddg: DelayGeneratorCSAXS):
"""Test setting the trigger."""
for trigger in TRIGGERSOURCE:
mock_ddg.set_trigger(trigger)
assert mock_ddg.trigger_source.get() == trigger.value
def test_ddg_burst_enable(mock_ddg):
def test_ddg_burst_enable(mock_ddg: DelayGeneratorCSAXS):
"""Test enabling burst mode."""
mock_ddg.burst_enable(count=100, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
mock_ddg.burst_mode.get() == 1
@@ -101,7 +105,7 @@ def test_ddg_burst_enable(mock_ddg):
mock_ddg.burst_mode.get() == BURSTCONFIG.FIRST_CYCLE.value
def test_ddg_wait_for_event_status(mock_ddg):
def test_ddg_wait_for_event_status(mock_ddg: DelayGeneratorCSAXS):
"""Test setting wait for event status."""
mock_ddg: DelayGeneratorCSAXS
mock_ddg.state.event_status._read_pv.mock_data = 0
@@ -117,7 +121,7 @@ def test_ddg_wait_for_event_status(mock_ddg):
# assert status.done is True
def test_ddg_set_io_values(mock_ddg):
def test_ddg_set_io_values(mock_ddg: DelayGeneratorCSAXS):
"""Test setting IO values."""
mock_ddg.set_io_values(channel="ab", amplitude=3, offset=2, polarity=1, mode="ttl")
assert mock_ddg.ab.io.amplitude.get() == 3
@@ -138,7 +142,7 @@ def test_ddg_set_io_values(mock_ddg):
assert attr.nim_mode.get() == 1
def test_ddg_set_delay_pairs(mock_ddg):
def test_ddg_set_delay_pairs(mock_ddg: DelayGeneratorCSAXS):
"""Test setting delay pairs."""
mock_ddg.set_delay_pairs(channel="ab", delay=0.1, width=0.2)
assert np.isclose(mock_ddg.ab.delay.get(), 0.1)
@@ -156,52 +160,143 @@ def test_ddg_set_delay_pairs(mock_ddg):
assert np.isclose(getattr(mock_ddg, channel).ch2.setpoint.get(), delay + 0.2)
def test_ddg1_on_connected(mock_ddg1):
#########################
### Test DDG1 Device ####
#########################
@pytest.fixture(scope="function")
def mock_mcs_csaxs() -> Generator[MCSCardCSAXS, None, None]:
"""Fixture to mock the MCSCardCSAXS device."""
dm = DMMock()
with patched_device(
MCSCardCSAXS,
name="mcs",
prefix="X12SA-MCS-CSAXS:",
device_manager=dm,
_mock_pv_initial_value=0,
) as dev:
dev.enabled = True
dev.device_manager.devices["mcs"] = dev
try:
yield dev
finally:
dev.destroy()
@pytest.fixture(scope="function")
def mock_ddg1(mock_mcs_csaxs: MCSCardCSAXS) -> Generator[DDG1, None, None]:
"""Fixture to mock the DDG1 device."""
# Add enabled to mock_mcs_csaxs
dm_mock = mock_mcs_csaxs.device_manager
with patched_device(
DDG1, name="ddg1", prefix="test_ddg1:", device_manager=dm_mock, _mock_pv_initial_value=0
) as dev:
dev.enabled = True
dev.device_manager.devices["ddg1"] = dev
try:
yield dev
finally:
dev.destroy()
def test_ddg1_on_connected(mock_ddg1: DDG1):
"""Test the on_connected method of DDG1."""
mock_ddg1.on_connected()
# IO defaults
assert mock_ddg1.burst_mode.get() == 0
assert mock_ddg1.ab.io.amplitude.get() == 5.0
assert mock_ddg1.cd.io.offset.get() == 0.0
assert mock_ddg1.ef.io.polarity.get() == 1
assert mock_ddg1.gh.io.ttl_mode.get() == 1
mock_ddg1.burst_mode.put(1) # Set burst mode to 1, if connected should reset it to 0
mock_ddg1.burst_delay.put(5) # Set to non-zero, should reset to 0 on connected
mock_ddg1.burst_count.put(10) # Set to non-default, should reset to 1 on connected
with mock.patch.object(mock_ddg1, "set_io_values") as mock_set_io_values:
mock_ddg1.on_connected()
# reference defaults
assert mock_ddg1.ab.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
assert mock_ddg1.ab.ch2.reference.get() == 1 # CHANNELREFERENCE.A.value
assert mock_ddg1.cd.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
assert mock_ddg1.cd.ch2.reference.get() == 3 # CHANNELREFERENCE.C.value
assert mock_ddg1.ef.ch1.reference.get() == 4 # CHANNELREFERENCE.D.value
assert mock_ddg1.ef.ch2.reference.get() == 5 # CHANNELREFERENCE.E.value
assert mock_ddg1.gh.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
assert mock_ddg1.gh.ch2.reference.get() == 7 # CHANNELREFERENCE.G.value
# Burst mode Defaults
assert mock_ddg1.burst_mode.get() == 0
assert mock_ddg1.burst_delay.get() == 0
assert mock_ddg1.burst_count.get() == 1
# Default trigger source
assert mock_ddg1.trigger_source.get() == 5 # TRIGGERSOURCE.SINGLE_SHOT.value
assert mock_set_io_values.call_count == len(DDG1_DEFAULT_IO_CONFIG)
for ch, config in DDG1_DEFAULT_IO_CONFIG.items():
assert mock.call(ch, **config) in mock_set_io_values.call_args_list
# Check reference values from DEFAULT_REFERENCES
for ch, refs in DDG1_DEFAULT_REFERENCES:
if ch == "A":
sub_ch = mock_ddg1.ab.ch1
elif ch == "B":
sub_ch = mock_ddg1.ab.ch2
elif ch == "C":
sub_ch = mock_ddg1.cd.ch1
elif ch == "D":
sub_ch = mock_ddg1.cd.ch2
elif ch == "E":
sub_ch = mock_ddg1.ef.ch1
elif ch == "F":
sub_ch = mock_ddg1.ef.ch2
elif ch == "G":
sub_ch = mock_ddg1.gh.ch1
elif ch == "H":
sub_ch = mock_ddg1.gh.ch2
assert sub_ch.reference.get() == refs.value
# Check Default trigger source
assert mock_ddg1.trigger_source.get() == DDG1_DEFAULT_TRIGGER_SOURCE.value
# Check proc state mode
assert mock_ddg1.state.proc_status_mode.get() == PROC_EVENT_MODE.EVENT.value
# Check the poll thread is started
assert mock_ddg1._poll_thread.is_alive()
assert not mock_ddg1._poll_thread_kill_event.is_set()
assert not mock_ddg1._poll_thread_poll_loop_done.is_set()
assert not mock_ddg1._poll_thread_run_event.is_set()
def test_ddg1_stage(mock_ddg1):
def test_ddg1_prepare_mcs(mock_ddg1: DDG1, mock_mcs_csaxs: MCSCardCSAXS):
"""Test the prepare_mcs method of DDG1."""
mcs = mock_mcs_csaxs
ddg = mock_ddg1
# Simulate default state
mcs.acquiring._read_pv.mock_data = 0 # not acquiring
mcs.erase_start.put(0) # reset erase start
# Prepare MCS on trigger
st = ddg._prepare_mcs_on_trigger(mcs)
assert st.done is False
assert st.success is False
assert mcs.erase_start.get() == 1 # erase started
# Simulate acquiring started
mcs.acquiring._read_pv.mock_data = 1 # acquiring
st.wait(2)
assert st.done is True
assert st.success is True
def test_ddg1_stage(mock_ddg1: DDG1):
"""Test the on_stage method of DDG1."""
exp_time = 0.1
frames_per_trigger = 10
mock_ddg1.burst_mode.put(1)
mock_ddg1.burst_mode.put(0) # Non-default, should be reset on stage
mock_ddg1.burst_delay.put(5) # Non-default, should be reset on stage
mock_ddg1.burst_count.put(10) # Non-default, should be reset on stage
mock_ddg1.scan_info.msg.scan_parameters["exp_time"] = exp_time
mock_ddg1.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
mock_ddg1.stage()
shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3
assert np.isclose(mock_ddg1.burst_mode.get(), 1) # burst mode is enabled
assert np.isclose(mock_ddg1.burst_delay.get(), 0)
assert np.isclose(mock_ddg1.burst_period.get(), exp_time)
assert np.isclose(mock_ddg1.burst_period.get(), shutter_width)
# Trigger DDG2 through EXT/EN
assert np.isclose(mock_ddg1.ab.delay.get(), 2e-3)
assert np.isclose(mock_ddg1.ab.width.get(), 1e-6)
# Shutter channel cd
assert np.isclose(mock_ddg1.cd.delay.get(), 0)
assert np.isclose(mock_ddg1.cd.width.get(), 2e-3 + exp_time * frames_per_trigger + 1e-3)
assert np.isclose(mock_ddg1.cd.width.get(), shutter_width)
# MCS channel ef or gate
assert np.isclose(mock_ddg1.ef.delay.get(), 0)
assert np.isclose(mock_ddg1.ef.width.get(), 1e-6)
@@ -209,96 +304,266 @@ def test_ddg1_stage(mock_ddg1):
assert mock_ddg1.staged == ophyd.Staged.yes
def test_ddg1_trigger(mock_ddg1):
"""Test the on_trigger method of DDG1."""
mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.NONE.value
def test_ddg1_on_trigger(mock_ddg1: DDG1):
"""
Test the on_trigger method of the DDG1.
We will test two scenarios:
I. Trigger is prepared, and resolves successfully after END_OF_BURST is reached in event status register.
II. Trigger is called while _poll_thread_loop_done is not yet finished from a previous trigger.
This may be the case if polling is yet to finsish. The next on_trigger should terminate the previous
polling, and work as expected. In addition, we will simulate that the mcs card is disabled, thus not prepared.
"""
ddg = mock_ddg1
# Make sure DDG is setup in default state through on_connected
ddg.on_connected()
# Check that poll thread is running and run event is not set
assert ddg._poll_thread.is_alive()
assert not ddg._poll_thread_run_event.is_set()
assert not ddg._poll_thread_poll_loop_done.is_set()
# Set the status register bit
ddg.state.event_status._read_pv.mock_data = STATUSBITS.ABORT_DELAY.value
#################################
# Scenario I - normal operation #
#################################
with mock.patch.object(ddg, "_prepare_mcs_on_trigger") as mock_prepare_mcs:
mock_prepare_mcs.return_value = ophyd.StatusBase(done=True, success=True)
status = ddg.trigger()
# Check that the poll thread run event is set
assert ddg._poll_thread_run_event.is_set()
assert not ddg._poll_thread_poll_loop_done.is_set()
with mock.patch.object(mock_ddg1, "device_manager") as mock_device_manager:
# TODO add device manager DMMock, and properly test logic for mcs triggering.
mock_get = mock_device_manager.devices.get = mock.Mock(return_value=None)
status = mock_ddg1.trigger()
assert mock_get.call_args == mock.call("mcs", None)
assert status.done is False
assert status.success is False
assert mock_ddg1.trigger_shot.get() == 1
mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value
assert ddg.trigger_shot.get() == 1
# Simulate that the event status bit reaches END_OF_BURST
ddg.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value
status.wait(timeout=1) # Wait for the status to be done
assert status.done is True
assert status.success is True
# Should finish the poll loop
ddg._poll_thread_poll_loop_done.wait(timeout=1)
assert not ddg._poll_thread_run_event.is_set()
def test_ddg1_stop(mock_ddg1):
"""Test the on_stop method of DDG1."""
mock_ddg1.burst_mode.put(1) # Enable burst mode
mock_ddg1.stop()
assert mock_ddg1.burst_mode.get() == 0 # Burst mode is disabled
############################################
# Scenario II - previous poll not finished #
# MCS card disabled #
############################################
# Set mcs card to enabled = False
ddg.device_manager.devices["mcs"].enabled = False
ddg.state.event_status._read_pv.mock_data = STATUSBITS.ABORT_DELAY.value
ddg._start_polling()
assert ddg._poll_thread_run_event.is_set()
with mock.patch.object(ddg, "_prepare_mcs_on_trigger") as mock_prepare_mcs:
status = ddg.trigger()
mock_prepare_mcs.assert_not_called() # MCS is disabled, should not be called
assert status.done is False
assert status.success is False
# Resolve the status by simulating END_OF_BURST
ddg.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value
status.wait(timeout=1) # Wait for the status to be done
assert status.done is True
assert status.success is True
# Wait for poll loop to finish
ddg._poll_thread_poll_loop_done.wait(timeout=1)
assert not ddg._poll_thread_run_event.is_set()
def test_ddg2_on_connected(mock_ddg2):
"""Test on connected method of DDG2."""
mock_ddg2.on_connected()
# IO defaults
assert mock_ddg2.burst_mode.get() == 0
assert mock_ddg2.ab.io.amplitude.get() == 5.0
assert mock_ddg2.cd.io.offset.get() == 0.0
assert mock_ddg2.ef.io.polarity.get() == 1
assert mock_ddg2.gh.io.ttl_mode.get() == 1
# def test_ddg1_trigger(mock_ddg1):
# """Test the on_trigger method of DDG1."""
# mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.NONE.value
# reference defaults
assert mock_ddg2.ab.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
assert mock_ddg2.ab.ch2.reference.get() == 1 # CHANNELREFERENCE.A.value
assert mock_ddg2.cd.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
assert mock_ddg2.cd.ch2.reference.get() == 3 # CHANNELREFERENCE.C.value
assert mock_ddg2.ef.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
assert mock_ddg2.ef.ch2.reference.get() == 5 # CHANNELREFERENCE.E.value
assert mock_ddg2.gh.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value
assert mock_ddg2.gh.ch2.reference.get() == 7 # CHANNELREFERENCE.G.value
# Default trigger source
assert mock_ddg2.trigger_source.get() == 1 # TRIGGERSOURCE.EXT_RISING_EDGE.value
# with mock.patch.object(mock_ddg1, "device_manager") as mock_device_manager:
# # TODO add device manager DMMock, and properly test logic for mcs triggering.
# mock_get = mock_device_manager.devices.get = mock.Mock(return_value=None)
# status = mock_ddg1.trigger()
# assert mock_get.call_args == mock.call("mcs", None)
# assert status.done is False
# assert status.success is False
# assert mock_ddg1.trigger_shot.get() == 1
# mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value
# status.wait(timeout=1) # Wait for the status to be done
# assert status.done is True
# assert status.success is True
def test_ddg2_stage(mock_ddg2):
"""Test the on_stage method of DDG2."""
# def test_ddg1_stop(mock_ddg1):
# """Test the on_stop method of DDG1."""
# mock_ddg1.burst_mode.put(1) # Enable burst mode
# mock_ddg1.stop()
# assert mock_ddg1.burst_mode.get() == 0 # Burst mode is disabled
#########################
### Test DDG2 Device ####
#########################
@pytest.fixture(scope="function")
def mock_ddg2(mock_mcs_csaxs: MCSCardCSAXS) -> Generator[DDG2, None, None]:
"""Fixture to mock the DDG1 device."""
# Add enabled to mock_mcs_csaxs
dm_mock = mock_mcs_csaxs.device_manager
with patched_device(
DDG2, name="ddg2", prefix="test_ddg2:", device_manager=dm_mock, _mock_pv_initial_value=0
) as dev:
dev.enabled = True
dev.device_manager.devices["ddg2"] = dev
try:
yield dev
finally:
dev.destroy()
def test_ddg2_on_connected(mock_ddg2: DDG2):
"""Test the on_connected method of DDG1."""
mock_ddg2.burst_mode.put(1) # Set burst mode to 1, if connected should reset it to 0
mock_ddg2.burst_delay.put(5) # Set to non-zero, should reset to 0 on connected
mock_ddg2.burst_count.put(10) # Set to non-default, should reset to 1 on connected
with mock.patch.object(mock_ddg2, "set_io_values") as mock_set_io_values:
mock_ddg2.on_connected()
# Burst mode Defaults
assert mock_ddg2.burst_mode.get() == 0
assert mock_set_io_values.call_count == len(DDG2_DEFAULT_IO_CONFIG)
for ch, config in DDG2_DEFAULT_IO_CONFIG.items():
assert mock.call(ch, **config) in mock_set_io_values.call_args_list
# Check reference values from DEFAULT_REFERENCES
for ch, refs in DDG2_DEFAULT_REFERENCES:
if ch == "A":
sub_ch = mock_ddg2.ab.ch1
elif ch == "B":
sub_ch = mock_ddg2.ab.ch2
elif ch == "C":
sub_ch = mock_ddg2.cd.ch1
elif ch == "D":
sub_ch = mock_ddg2.cd.ch2
elif ch == "E":
sub_ch = mock_ddg2.ef.ch1
elif ch == "F":
sub_ch = mock_ddg2.ef.ch2
elif ch == "G":
sub_ch = mock_ddg2.gh.ch1
elif ch == "H":
sub_ch = mock_ddg2.gh.ch2
assert sub_ch.reference.get() == refs.value
# Check Default trigger source
assert mock_ddg2.trigger_source.get() == DDG2_DEFAULT_TRIGGER_SOURCE.value
def test_ddg2_on_stage(mock_ddg2: DDG2):
"""
Test the on_stage method of DDG2.
We will test two scenarios:
I. Stage device with valid parameters.
II. Stage device with invalid parameters (too short exp_time). Should raise ValueError.
"""
ddg = mock_ddg2
exp_time = 0.1
frames_per_trigger = 10
mock_ddg2.on_connected()
ddg.on_connected()
ddg.scan_info.msg.scan_parameters["exp_time"] = exp_time
ddg.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
mock_ddg2.burst_mode.put(0)
mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = exp_time
mock_ddg2.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
# Set non-default burst mode settings
ddg.burst_mode.put(0)
ddg.burst_delay.put(5)
mock_ddg2.stage()
# Stage device with valid parameters
ddg.stage()
assert ddg.staged == ophyd.Staged.yes
assert ddg.burst_mode.get() == 1 # Burst mode is enabled
assert ddg.burst_delay.get() == 0 # Burst delay is set to 0
assert ddg.burst_count.get() == frames_per_trigger
assert ddg.burst_period.get() == exp_time
assert np.isclose(mock_ddg2.burst_mode.get(), 1) # Burst mode is enabled
assert np.isclose(mock_ddg2.ab.delay.get(), 0)
assert np.isclose(mock_ddg2.ab.width.get(), exp_time - 2e-4) # DEFAULT_READOUT_TIMES["ab"])
assert mock_ddg2.burst_count.get() == frames_per_trigger
assert np.isclose(mock_ddg2.burst_delay.get(), 0)
assert np.isclose(mock_ddg2.burst_period.get(), exp_time)
assert mock_ddg2.trigger_source.get() == TRIGGERSOURCE.EXT_RISING_EDGE.value
assert mock_ddg2.staged == ophyd.Staged.yes
mock_ddg2.unstage() # Reset staged state for next test
# Pulse width is exp_time - readout_time
burst_pulse_width = exp_time - DDG2_DEFAULT_READOUT_TIMES["ab"]
assert np.isclose(ddg.ab.delay.get(), 0)
assert np.isclose(ddg.ab.width.get(), burst_pulse_width)
# Unstage to reset
ddg.unstage() # Reset staged state for next test
exp_time_short = 2e-4 # too short exposure time
with pytest.raises(ValueError):
mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = 2e-4 # too short exposure time
mock_ddg2.stage()
ddg.scan_info.msg.scan_parameters["exp_time"] = exp_time_short
ddg.stage()
def test_ddg2_trigger(mock_ddg2):
def test_ddg2_on_trigger(mock_ddg2: DDG2):
"""Test the on_trigger method of DDG2."""
mock_ddg2.trigger_shot.put(0)
status = mock_ddg2.trigger()
assert mock_ddg2.trigger_shot.get() == 0 # Should not trigger DDG2 via soft trigger
ddg = mock_ddg2
ddg.on_connected()
ddg.trigger_shot.put(0)
status = ddg.trigger()
assert ddg.trigger_shot.get() == 0 # Should not trigger DDG2 via soft trigger
status.wait()
assert status.done is True
assert status.success is True
def test_ddg2_stop(mock_ddg2):
def test_ddg2_on_stop(mock_ddg2: DDG2):
"""Test the on_stop method of DDG2."""
mock_ddg2.burst_mode.put(1) # Enable burst mode
mock_ddg2.stop()
assert mock_ddg2.burst_mode.get() == 0 # Burst mode is disabled
ddg = mock_ddg2
ddg.on_connected()
ddg.burst_mode.put(1) # Enable burst mode
ddg.stop()
assert ddg.burst_mode.get() == 0 # Burst mode is disabled
# def test_ddg2_stage(mock_ddg2):
# """Test the on_stage method of DDG2."""
# exp_time = 0.1
# frames_per_trigger = 10
# mock_ddg2.on_connected()
# mock_ddg2.burst_mode.put(0)
# mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = exp_time
# mock_ddg2.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
# mock_ddg2.stage()
# assert np.isclose(mock_ddg2.burst_mode.get(), 1) # Burst mode is enabled
# assert np.isclose(mock_ddg2.ab.delay.get(), 0)
# assert np.isclose(mock_ddg2.ab.width.get(), exp_time - 2e-4) # DEFAULT_READOUT_TIMES["ab"])
# assert mock_ddg2.burst_count.get() == frames_per_trigger
# assert np.isclose(mock_ddg2.burst_delay.get(), 0)
# assert np.isclose(mock_ddg2.burst_period.get(), exp_time)
# assert mock_ddg2.trigger_source.get() == TRIGGERSOURCE.EXT_RISING_EDGE.value
# assert mock_ddg2.staged == ophyd.Staged.yes
# mock_ddg2.unstage() # Reset staged state for next test
# with pytest.raises(ValueError):
# mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = 2e-4 # too short exposure time
# mock_ddg2.stage()
# def test_ddg2_trigger(mock_ddg2):
# """Test the on_trigger method of DDG2."""
# mock_ddg2.trigger_shot.put(0)
# status = mock_ddg2.trigger()
# assert mock_ddg2.trigger_shot.get() == 0 # Should not trigger DDG2 via soft trigger
# status.wait()
# assert status.done is True
# assert status.success is True
# def test_ddg2_stop(mock_ddg2):
# """Test the on_stop method of DDG2."""
# mock_ddg2.burst_mode.put(1) # Enable burst mode
# mock_ddg2.stop()
# assert mock_ddg2.burst_mode.get() == 0 # Burst mode is disabled

View File

@@ -1,5 +1,7 @@
# pylint: skip-file
import threading
from copy import deepcopy
from typing import Generator
from unittest import mock
import numpy as np
@@ -8,6 +10,7 @@ import pytest
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from csaxs_bec.devices.epics.mcs_card.mcs_card import (
@@ -46,429 +49,238 @@ def test_mcs_card(mock_mcs_card):
@pytest.fixture(scope="function")
def mock_mcs_csaxs():
def mock_mcs_csaxs() -> Generator[MCSCardCSAXS, None, None]:
"""Fixture to mock the MCSCardCSAXS device."""
name = "mcs_csaxs"
prefix = "X12SA-MCS-CSAXS:"
dm = DMMock()
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
mcs_card_csaxs = MCSCardCSAXS(name=name, prefix=prefix, device_manager=dm)
patch_dual_pvs(mcs_card_csaxs)
yield mcs_card_csaxs
try:
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
mcs_card_csaxs = MCSCardCSAXS(name=name, prefix=prefix, device_manager=dm)
patch_dual_pvs(mcs_card_csaxs)
yield mcs_card_csaxs
finally:
mcs_card_csaxs.on_destroy()
def test_mcs_card_csaxs(mock_mcs_csaxs):
def test_mcs_card_csaxs(mock_mcs_csaxs: MCSCardCSAXS):
"""Test the MCSCardCSAXS initialization."""
assert mock_mcs_csaxs.name == "mcs_csaxs"
assert mock_mcs_csaxs.prefix == "X12SA-MCS-CSAXS:"
assert mock_mcs_csaxs.counter_mapping == {
"mcs_csaxs_counters_mca1": "current1",
"mcs_csaxs_counters_mca2": "current2",
"mcs_csaxs_counters_mca3": "current3",
"mcs_csaxs_counters_mca4": "current4",
"mcs_csaxs_counters_mca5": "count_time",
}
assert mock_mcs_csaxs._mcs_clock == 1e7 # 10 MHz
assert mock_mcs_csaxs._acquisition_group == "monitored"
assert mock_mcs_csaxs._num_total_triggers == 0
assert mock_mcs_csaxs._mcs_clock == 1e7
assert mock_mcs_csaxs._pv_timeout == 2.0
assert mock_mcs_csaxs._mca_counter_index == 0
assert mock_mcs_csaxs._current_data_index == 0
assert mock_mcs_csaxs._current_data == {}
assert mock_mcs_csaxs.NUM_MCA_CHANNELS == 32
def test_mcs_card_csaxs_on_connected(mock_mcs_csaxs):
def test_mcs_card_csaxs_on_connected(mock_mcs_csaxs: MCSCardCSAXS):
"""Test the on_connected method of MCSCardCSAXS."""
mcs = mock_mcs_csaxs
mcs.on_connected()
# Stop called
assert mcs.stop_all.get() == 1
# Channel advance settings
assert mcs.channel_advance.get() == CHANNELADVANCE.EXTERNAL
assert mcs.channel1_source.get() == CHANNEL1SOURCE.EXTERNAL
assert mcs.prescale.get() == 1
#
assert mcs.user_led.get() == 0
# input output settings
assert mcs.input_mode.get() == INPUTMODE.MODE_3
assert mcs.input_polarity.get() == POLARITY.NORMAL
assert mcs.output_mode.get() == OUTPUTMODE.MODE_2
assert mcs.output_polarity.get() == POLARITY.NORMAL
assert mcs.count_on_start.get() == 0
assert mcs.read_mode.get() == READMODE.PASSIVE
assert mcs.acquire_mode.get() == ACQUIREMODE.MCS
with mock.patch.object(mcs.counters.mca1, "subscribe") as mock_mca_subscribe:
with (
mock.patch.object(mcs.counters.mca1, "subscribe") as mock_mca_subscribe,
mock.patch.object(mcs, "mcs_recovery") as mock_mcs_recovery,
mock.patch.object(mcs._scan_done_thread, "start") as mock_scan_done_thread_start,
):
mcs.on_connected()
# Stop called
assert mcs.stop_all.get() == 1
# Channel advance settings
assert mcs.channel_advance.get() == CHANNELADVANCE.EXTERNAL
assert mcs.channel1_source.get() == CHANNEL1SOURCE.EXTERNAL
assert mcs.prescale.get() == 1
assert mcs.user_led.get() == 0
# Mux output
assert mcs.mux_output.get() == mcs.NUM_MCA_CHANNELS
# input output settings
assert mcs.input_mode.get() == INPUTMODE.MODE_3
assert mcs.input_polarity.get() == POLARITY.NORMAL
assert mcs.output_mode.get() == OUTPUTMODE.MODE_2
assert mcs.output_polarity.get() == POLARITY.NORMAL
assert mcs.count_on_start.get() == 0
assert mcs.read_mode.get() == READMODE.PASSIVE
assert mcs.acquire_mode.get() == ACQUIREMODE.MCS
# Check if subscriptions are setup correctly
assert mock_mca_subscribe.call_args == mock.call(mcs._on_counter_update, run=False)
# Check if recovery is called
mock_mcs_recovery.assert_called_once_with(timeout=1)
# Check if scan done thread is started
mock_scan_done_thread_start.assert_called_once()
def test_mcs_card_csaxs_stage(mock_mcs_csaxs):
def test_mcs_card_csaxs_stage(mock_mcs_csaxs: MCSCardCSAXS):
"""Test on stage method of MCSCardCSAXS"""
mcs = mock_mcs_csaxs
triggers = 5
num_points = 10
mcs.scan_info.msg.scan_parameters["frames_per_trigger"] = triggers
mcs.erase_all.put(0)
mcs.scan_info.msg.num_points = num_points
# Simulate that the MCS card is still acquiring, and that current channel is !=0
mcs.current_channel._read_pv.mock_data = 2 # Simulate that current channel is not zero
mcs.erase_all.put(0) # Set erase_all to 0
mcs._current_data = {"mca1": [1, 2, 3]} # Simulate existing data
mcs._scan_done_callbacks = [lambda: None] # Simulate existing callbacks
mcs._start_monitor_async_data_emission.set() # Simulate that monitoring is started
mcs._omit_mca_callbacks.set() # Simulate that mca callbacks are omitted
mcs.stage()
# Check that card is staged
assert mcs._staged == ophyd.Staged.yes
assert mcs.erase_all.get() == 1
# Check that erase_all, stop_all, preset_real, num_use_all are set correctly
assert mcs.erase_all.get() == 1 # Should be set to 1 as current_channel !=0
assert mcs.preset_real.get() == 0
assert mcs.num_use_all.get() == triggers
# Check that internal variables are reset
assert mcs._num_total_triggers == triggers * num_points
assert mcs._current_data == {}
assert mcs._scan_done_callbacks == []
assert mcs._current_data_index == 0
# Check that thread events are cleared properly
assert not mcs._start_monitor_async_data_emission.is_set()
assert not mcs._omit_mca_callbacks.is_set()
def test_mcs_card_csaxs_unstage(mock_mcs_csaxs):
"""Test unstage method of MCSCardCSAXS"""
mcs = mock_mcs_csaxs
mcs.stop_all.put(0)
mcs.ready_to_read.put(0)
mcs.erase_all.put(1)
mcs.erase_all.put(0)
mcs.unstage()
assert mcs.stop_all.get() == 1
assert mcs.erase_all.get() == 0
assert mcs.erase_all.get() == 1
def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs):
"""Test complete method of MCSCarcCSAXS"""
def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs: MCSCardCSAXS):
"""
Test complete method of MCSCarcCSAXS.
Two use cases:
I. Acquisition is stopped externally
II. Acquisition completes normally
"""
mcs = mock_mcs_csaxs
mcs.acquiring._read_pv.mock_data = ACQUIRING.ACQUIRING
# Make sure that device on_connected has been called which starts the monitoring thread
mcs.on_connected()
#######################
# I. Use case where acquisition is stopped
#######################
st = mcs.complete()
assert st.done is False
mcs.stop_all.put(0)
assert mcs._start_monitor_async_data_emission.is_set()
# Status should be cancelled by stop
mcs.stop()
with pytest.raises(Exception):
with pytest.raises(DeviceStoppedError):
st.wait(timeout=3)
# Callback on status failure should stop monitoring
mcs._start_monitor_async_data_emission.wait(2)
assert not mcs._start_monitor_async_data_emission.is_set()
#######################
# II. Use case where acquisition completes normally
#######################
mcs._current_data_index = 0
mcs.scan_info.msg.num_points = 10
mcs.acquiring._read_pv.mock_data = ACQUIRING.ACQUIRING
st = mcs.complete()
assert st.done is False
assert mcs._start_monitor_async_data_emission.is_set()
mcs.acquiring._read_pv.mock_data = ACQUIRING.DONE
# This should now automatically complete the status
mcs._current_data_index = 10
st.wait(timeout=3)
assert st.done is True
assert st.success is False
assert mcs.stop_all.get() == 1
assert st.success is True
# Clean up procedure should stop the async_data monitoring
mcs._start_monitor_async_data_emission.wait(2)
assert not mcs._start_monitor_async_data_emission.is_set()
def test_mcs_card_csaxs_on_counter_updated(mock_mcs_csaxs):
def test_mcs_recovery(mock_mcs_csaxs: MCSCardCSAXS):
mcs = mock_mcs_csaxs
# Called for mca1
# Simulate ongoing acquisition
mcs.erase_all._read_pv.mock_data = 0
mcs.stop_all._read_pv.mock_data = 0
mcs.erase_start.put(0)
mcs.mcs_recovery(timeout=0.1)
assert mcs.erase_all.get() == 1
assert mcs.stop_all.get() == 1
assert mcs.erase_start.get() == 1
assert not mcs._omit_mca_callbacks.is_set()
def test_mcs_card_csaxs_on_counter_updated(mock_mcs_csaxs: MCSCardCSAXS):
"""
Test the on_counter_update method of MCSCardCSAXS.
We will test 2 use cases:
I. Suppressed callbacks
II. Callback from 32 mca counters, should result in data being sent to BEC
"""
mcs = mock_mcs_csaxs
# I. Suppressed callbacks
mcs._omit_mca_callbacks.set()
kwargs = {"obj": mcs.counters.mca1}
mcs._on_counter_update(1, **kwargs)
assert mcs.mcs.mca1.get() == 1
assert mcs.bpm.current1.get() == 1
assert mcs.counter_updated == [mcs.counters.mca1.name]
# Called for mca2
kwargs = {"obj": mcs.counters.mca2}
mcs._on_counter_update(np.array([2, 4]), **kwargs)
assert mcs.mcs.mca2.get() == [2, 4]
assert np.isclose(mcs.bpm.current2.get(), 3)
assert mcs.counter_updated == [mcs.counters.mca1.name, mcs.counters.mca2.name]
# Called for mca3
kwargs = {"obj": mcs.counters.mca3}
mcs._on_counter_update(1000, **kwargs)
assert mcs.mcs.mca3.get() == 1000
assert mcs.bpm.current3.get() == 1000
assert mcs.counter_updated == [
mcs.counters.mca1.name,
mcs.counters.mca2.name,
mcs.counters.mca3.name,
]
# Called for mca4
kwargs = {"obj": mcs.counters.mca4}
mcs._on_counter_update(np.array([20, 40]), **kwargs)
assert mcs.mcs.mca4.get() == [20, 40]
assert np.isclose(mcs.bpm.current4.get(), 30)
assert mcs.counter_updated == [
mcs.counters.mca1.name,
mcs.counters.mca2.name,
mcs.counters.mca3.name,
mcs.counters.mca4.name,
]
# Called for mca5
assert mcs.ready_to_read.get() == 0
kwargs = {"obj": mcs.counters.mca5}
mcs._on_counter_update(np.array([10000, 10000]), **kwargs)
assert np.isclose(mcs.bpm.count_time.get(), 10000 / 1e7)
assert mcs.mcs.mca5.get() == [10000, 10000]
assert mcs._mca_counter_index == 1 # Counter index should still increment
assert mcs._current_data == {}
# II. Callback from 32 mca counters
mcs._omit_mca_callbacks.clear()
mcs._mca_counter_index = 0
mcs._current_data_index = 0
val = mcs.mca.get()
# @pytest.fixture(scope="function")
# def mock_det():
# name = "mcs"
# prefix = "X12SA-MCS:"
# dm = DMMock()
# with mock.patch.object(dm, "connector"):
# with (
# mock.patch(
# "ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"
# ) as filemixin,
# mock.patch(
# "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
# ) as mock_service_config,
# ):
# with mock.patch.object(ophyd, "cl") as mock_cl:
# mock_cl.get_pv = MockPV
# mock_cl.thread_class = threading.Thread
# with mock.patch.object(MCScSAXS, "_init"):
# det = MCScSAXS(name=name, prefix=prefix, device_manager=dm)
# patch_dual_pvs(det)
# det.TIMEOUT_FOR_SIGNALS = 0.1
# yield det
for ii in range(mcs.NUM_MCA_CHANNELS):
counter = getattr(mcs.counters, f"mca{ii+1}")
kwargs = {"obj": counter, "timestamp": 1.0}
if ii % 2 == 1:
value = np.array([ii, (ii + 1) * 2])
else:
value = ii
mcs._on_counter_update(value, **kwargs)
if ii < (mcs.NUM_MCA_CHANNELS - 1):
assert mcs._current_data_index == 0
assert mcs._mca_counter_index == ii + 1
assert counter.attr_name in mcs._current_data
assert (
mcs._current_data[counter.attr_name]["value"] == value.tolist()
if isinstance(value, np.ndarray)
else [value]
)
buffer = deepcopy(mcs._current_data)
assert mcs.mca.get() == val # Async mca signal should not change
else:
# On last counter, data should be sent to BEC, and internal variables reset
buffer[counter.attr_name] = {
"value": value.tolist() if isinstance(value, np.ndarray) else [value],
"timestamp": 1.0,
}
assert mcs._mca_counter_index == 0
assert mcs._current_data_index == 1
assert mcs._current_data == {}
# def test_init():
# """Test the _init function:"""
# name = "eiger"
# prefix = "X12SA-ES-EIGER9M:"
# dm = DMMock()
# with mock.patch.object(dm, "connector"):
# with (
# mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"),
# mock.patch(
# "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
# ),
# ):
# with mock.patch.object(ophyd, "cl") as mock_cl:
# mock_cl.get_pv = MockPV
# with (
# mock.patch(
# "csaxs_bec.devices.epics.mcs_csaxs.MCSSetup.initialize_detector"
# ) as mock_init_det,
# mock.patch(
# "csaxs_bec.devices.epics.mcs_csaxs.MCSSetup.initialize_detector_backend"
# ) as mock_init_backend,
# ):
# MCScSAXS(name=name, prefix=prefix, device_manager=dm)
# mock_init_det.assert_called_once()
# mock_init_backend.assert_called_once()
# @pytest.mark.parametrize(
# "trigger_source, channel_advance, channel_source1, pv_channels",
# [
# (
# 3,
# 1,
# 0,
# {
# "user_led": 0,
# "mux_output": 5,
# "input_pol": 0,
# "output_pol": 1,
# "count_on_start": 0,
# "stop_all": 1,
# },
# )
# ],
# )
# def test_initialize_detector(
# mock_det, trigger_source, channel_advance, channel_source1, pv_channels
# ):
# """Test the _init function:
# This includes testing the functions:
# - initialize_detector
# - stop_det
# - parent.set_trigger
# --> Testing the filewriter is done in test_init_filewriter
# Validation upon setting the correct PVs
# """
# mock_det.custom_prepare.initialize_detector() # call the method you want to test
# assert mock_det.channel_advance.get() == channel_advance
# assert mock_det.channel1_source.get() == channel_source1
# assert mock_det.user_led.get() == pv_channels["user_led"]
# assert mock_det.mux_output.get() == pv_channels["mux_output"]
# assert mock_det.input_polarity.get() == pv_channels["input_pol"]
# assert mock_det.output_polarity.get() == pv_channels["output_pol"]
# assert mock_det.count_on_start.get() == pv_channels["count_on_start"]
# assert mock_det.input_mode.get() == trigger_source
# def test_trigger(mock_det):
# """Test the trigger function:
# Validate that trigger calls the custom_prepare.on_trigger() function
# """
# with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger:
# mock_det.trigger()
# mock_on_trigger.assert_called_once()
# @pytest.mark.parametrize(
# "value, num_lines, num_points, done", [(100, 5, 500, False), (500, 5, 500, True)]
# )
# def test_progress_update(mock_det, value, num_lines, num_points, done):
# mock_det.num_lines.set(num_lines)
# mock_det.scaninfo.num_points = num_points
# calls = mock.call(sub_type="progress", value=value, max_value=num_points, done=done)
# with mock.patch.object(mock_det, "_run_subs") as mock_run_subs:
# mock_det.custom_prepare._progress_update(value=value)
# mock_run_subs.assert_called_once()
# assert mock_run_subs.call_args == calls
# @pytest.mark.parametrize(
# "values, expected_nothing",
# [([[100, 120, 140], [200, 220, 240], [300, 320, 340]], False), ([100, 200, 300], True)],
# )
# def test_on_mca_data(mock_det, values, expected_nothing):
# """Test the on_mca_data function:
# Validate that on_mca_data calls the custom_prepare.on_mca_data() function
# """
# with mock.patch.object(mock_det.custom_prepare, "_send_data_to_bec") as mock_send_data:
# mock_object = mock.MagicMock()
# for ii, name in enumerate(mock_det.custom_prepare.mca_names):
# mock_object.attr_name = name
# mock_det.custom_prepare._on_mca_data(obj=mock_object, value=values[ii])
# if not expected_nothing and ii < (len(values) - 1):
# assert mock_det.custom_prepare.mca_data[name] == values[ii]
# if not expected_nothing:
# mock_send_data.assert_called_once()
# assert mock_det.custom_prepare.acquisition_done is True
# @pytest.mark.parametrize(
# "metadata, mca_data",
# [
# (
# {"scan_id": 123},
# {
# "mca1": {"value": [100, 120, 140]},
# "mca3": {"value": [200, 220, 240]},
# "mca4": {"value": [300, 320, 340]},
# },
# )
# ],
# )
# def test_send_data_to_bec(mock_det, metadata, mca_data):
# mock_det.scaninfo.scan_msg = mock.MagicMock()
# mock_det.scaninfo.scan_msg.metadata = metadata
# mock_det.scaninfo.scan_id = metadata["scan_id"]
# mock_det.custom_prepare.mca_data = mca_data
# mock_det.custom_prepare._send_data_to_bec()
# device_metadata = mock_det.scaninfo.scan_msg.metadata
# metadata.update({"async_update": "append", "num_lines": mock_det.num_lines.get()})
# data = messages.DeviceMessage(signals=dict(mca_data), metadata=device_metadata)
# calls = mock.call(
# topic=MessageEndpoints.device_async_readback(
# scan_id=metadata["scan_id"], device=mock_det.name
# ),
# msg={"data": data},
# expire=1800,
# )
# assert mock_det.connector.xadd.call_args == calls
# @pytest.mark.parametrize(
# "scaninfo, triggersource, stopped, expected_exception",
# [
# (
# {"num_points": 500, "frames_per_trigger": 1, "scan_type": "step"},
# TriggerSource.MODE3,
# False,
# False,
# ),
# (
# {"num_points": 500, "frames_per_trigger": 1, "scan_type": "fly"},
# TriggerSource.MODE3,
# False,
# False,
# ),
# (
# {"num_points": 5001, "frames_per_trigger": 2, "scan_type": "step"},
# TriggerSource.MODE3,
# False,
# True,
# ),
# (
# {"num_points": 500, "frames_per_trigger": 2, "scan_type": "random"},
# TriggerSource.MODE3,
# False,
# True,
# ),
# ],
# )
# def test_stage(mock_det, scaninfo, triggersource, stopped, expected_exception):
# mock_det.scaninfo.num_points = scaninfo["num_points"]
# mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
# mock_det.scaninfo.scan_type = scaninfo["scan_type"]
# mock_det.stopped = stopped
# with mock.patch.object(mock_det.custom_prepare, "prepare_detector_backend") as mock_prep_fw:
# if expected_exception:
# with pytest.raises(MCSError):
# mock_det.stage()
# mock_prep_fw.assert_called_once()
# else:
# mock_det.stage()
# mock_prep_fw.assert_called_once()
# # Check set_trigger
# mock_det.input_mode.get() == triggersource
# if scaninfo["scan_type"] == "step":
# assert mock_det.num_use_all.get() == int(scaninfo["frames_per_trigger"]) * int(
# scaninfo["num_points"]
# )
# elif scaninfo["scan_type"] == "fly":
# assert mock_det.num_use_all.get() == int(scaninfo["num_points"])
# mock_det.preset_real.get() == 0
# # # CHeck custom_prepare.arm_acquisition
# # assert mock_det.custom_prepare.counter == 0
# # assert mock_det.erase_start.get() == 1
# # mock_prep_fw.assert_called_once()
# # # Check _prep_det
# # assert mock_det.cam.num_images.get() == int(
# # scaninfo["num_points"] * scaninfo["frames_per_trigger"]
# # )
# # assert mock_det.cam.num_frames.get() == 1
# # mock_publish_file_location.assert_called_with(done=False)
# # assert mock_det.cam.acquire.get() == 1
# def test_prepare_detector_backend(mock_det):
# mock_det.custom_prepare.prepare_detector_backend()
# assert mock_det.erase_all.get() == 1
# assert mock_det.read_mode.get() == ReadoutMode.EVENT
# def test_complete(mock_det):
# with (mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,):
# mock_det.complete()
# assert mock_finished.call_count == 1
# def test_stop_detector_backend(mock_det):
# mock_det.custom_prepare.stop_detector_backend()
# assert mock_det.custom_prepare.acquisition_done is True
# def test_stop(mock_det):
# with (
# mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
# mock.patch.object(
# mock_det.custom_prepare, "stop_detector_backend"
# ) as mock_stop_detector_backend,
# ):
# mock_det.stop()
# mock_stop_det.assert_called_once()
# mock_stop_detector_backend.assert_called_once()
# assert mock_det.stopped is True
# @pytest.mark.parametrize(
# "stopped, acquisition_done, acquiring_state, expected_exception",
# [
# (False, True, 0, False),
# (False, False, 0, True),
# (False, True, 1, True),
# (True, True, 0, True),
# ],
# )
# def test_finished(mock_det, stopped, acquisition_done, acquiring_state, expected_exception):
# mock_det.custom_prepare.acquisition_done = acquisition_done
# mock_det.acquiring._read_pv.mock_data = acquiring_state
# mock_det.scaninfo.num_points = 500
# mock_det.num_lines.put(500)
# mock_det.current_channel._read_pv.mock_data = 1
# mock_det.stopped = stopped
# if expected_exception:
# with pytest.raises(MCSTimeoutError):
# mock_det.timeout = 0.1
# mock_det.custom_prepare.finished()
# else:
# mock_det.custom_prepare.finished()
# if stopped:
# assert mock_det.stopped is stopped
# Check that the async mca signal is properly set
assert isinstance(mcs.mca.get(), messages.DeviceMessage)
assert len(mcs.mca.get().signals) == mcs.NUM_MCA_CHANNELS