From e046412eb87e8b8453ec1ea96dbef686ad795dcf Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 6 Jan 2026 09:29:21 +0100 Subject: [PATCH] tests: fix tests for ddg and mcs integrations --- .../epics/delay_generator_csaxs/ddg_1.py | 11 +- .../epics/delay_generator_csaxs/ddg_2.py | 2 +- .../devices/epics/mcs_card/mcs_card_csaxs.py | 2 +- .../test_delay_generator_csaxs.py | 523 ++++++++++++---- tests/tests_devices/test_mcs_card.py | 564 ++++++------------ 5 files changed, 590 insertions(+), 512 deletions(-) diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py index 628617f..c3a1160 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py @@ -289,10 +289,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): self.cancel_on_stop(status_acquiring) mcs.erase_start.put(1) - # NOTE Timeout of 3s should be plenty, any longer wait should checked. If this happens to crash - # an acquisition regularly with a WaitTimeoutError, the timeout can be increased but it should - # be investigated why the EPICS interface is slow to respond. - status_acquiring.wait(timeout=3) + return status_acquiring def _poll_event_status(self) -> None: """ @@ -464,7 +461,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): if mcs is None or mcs.enabled is False: logger.info("Did not find mcs card with name 'mcs' in current session") else: - self._prepare_mcs_on_trigger(mcs) + status_mcs = self._prepare_mcs_on_trigger(mcs) + # NOTE Timeout of 3s should be plenty, any longer wait should checked. If this happens to crash + # an acquisition regularly with a WaitTimeoutError, the timeout can be increased but it should + # be investigated why the EPICS interface is slow to respond. + status_mcs.wait(timeout=3) # Prepare StatusBitsCompareStatus to resolve once the END_OF_BURST bit was set. status = self._prepare_trigger_status_event() diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py index 16172b5..55dad7c 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py @@ -25,7 +25,7 @@ Burst mode is enabled: import time from bec_lib.logger import bec_logger -from ophyd import DeviceStatus, StatusBase +from ophyd_devices import DeviceStatus, StatusBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import ( diff --git a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py index 3f21f71..b42e3b6 100644 --- a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py +++ b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py @@ -475,7 +475,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): # Handle external stop/cancel, and stop monitoring ret_status.add_callback(self._status_failed_callback) self.cancel_on_stop(ret_status) - return status + return ret_status def on_destroy(self): """ diff --git a/tests/tests_devices/test_delay_generator_csaxs.py b/tests/tests_devices/test_delay_generator_csaxs.py index 252babc..18079c6 100644 --- a/tests/tests_devices/test_delay_generator_csaxs.py +++ b/tests/tests_devices/test_delay_generator_csaxs.py @@ -6,9 +6,35 @@ from unittest import mock import numpy as np import ophyd import pytest -from ophyd_devices.tests.utils import MockPV, patch_dual_pvs +from bec_server.device_server.tests.utils import DMMock +from ophyd_devices.tests.utils import patched_device from csaxs_bec.devices.epics.delay_generator_csaxs import DDG1, DDG2 +from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import ( + DEFAULT_IO_CONFIG as DDG1_DEFAULT_IO_CONFIG, +) +from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import ( + DEFAULT_READOUT_TIMES as DDG1_DEFAULT_READOUT_TIMES, +) +from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import ( + DEFAULT_REFERENCES as DDG1_DEFAULT_REFERENCES, +) +from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import ( + DEFAULT_TRIGGER_SOURCE as DDG1_DEFAULT_TRIGGER_SOURCE, +) +from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import PROC_EVENT_MODE +from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import ( + DEFAULT_IO_CONFIG as DDG2_DEFAULT_IO_CONFIG, +) +from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import ( + DEFAULT_READOUT_TIMES as DDG2_DEFAULT_READOUT_TIMES, +) +from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import ( + DEFAULT_REFERENCES as DDG2_DEFAULT_REFERENCES, +) +from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import ( + DEFAULT_TRIGGER_SOURCE as DDG2_DEFAULT_TRIGGER_SOURCE, +) from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import ( BURSTCONFIG, CHANNELREFERENCE, @@ -16,68 +42,46 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import TRIGGERSOURCE, DelayGeneratorCSAXS, ) +from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import MCSCardCSAXS - -@pytest.fixture(scope="function") -def mock_ddg1() -> Generator[DDG1, DDG1, DDG1]: - """Fixture to mock the DDG1 device.""" - name = "ddg1" - prefix = "test_ddg1:" - with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV - mock_cl.thread_class = threading.Thread - dev = DDG1(name=name, prefix=prefix) - patch_dual_pvs(dev) - yield dev - - -@pytest.fixture(scope="function") -def mock_ddg2() -> Generator[DDG2, DDG2, DDG2]: - """Fixture to mock the DDG1 device.""" - name = "ddg2" - prefix = "test_ddg2:" - with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV - mock_cl.thread_class = threading.Thread - dev = DDG2(name=name, prefix=prefix) - patch_dual_pvs(dev) - yield dev +############################ +### Test Delay Generator ### +############################ @pytest.fixture(scope="function") def mock_ddg() -> Generator[DelayGeneratorCSAXS, DelayGeneratorCSAXS, DelayGeneratorCSAXS]: """Fixture to mock the camera device.""" - name = "ddg" - prefix = "test:" - with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV - mock_cl.thread_class = threading.Thread - dev = DelayGeneratorCSAXS(name=name, prefix=prefix) - patch_dual_pvs(dev) - yield dev + with patched_device( + DelayGeneratorCSAXS, name="ddg", prefix="test:", _mock_pv_initial_value=0 + ) as dev: + try: + yield dev + finally: + dev.destroy() -def test_ddg_init(mock_ddg): +def test_ddg_init(mock_ddg: DelayGeneratorCSAXS): """Test the proc event status method.""" assert mock_ddg.name == "ddg" assert mock_ddg.prefix == "test:" -def test_ddg_proc_event_status(mock_ddg): +def test_ddg_proc_event_status(mock_ddg: DelayGeneratorCSAXS): """Test the proc event status method.""" mock_ddg.state.proc_status.put(0) mock_ddg.proc_event_status() assert mock_ddg.state.proc_status.get() == 1 -def test_ddg_set_trigger(mock_ddg): +def test_ddg_set_trigger(mock_ddg: DelayGeneratorCSAXS): """Test setting the trigger.""" for trigger in TRIGGERSOURCE: mock_ddg.set_trigger(trigger) assert mock_ddg.trigger_source.get() == trigger.value -def test_ddg_burst_enable(mock_ddg): +def test_ddg_burst_enable(mock_ddg: DelayGeneratorCSAXS): """Test enabling burst mode.""" mock_ddg.burst_enable(count=100, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES) mock_ddg.burst_mode.get() == 1 @@ -101,7 +105,7 @@ def test_ddg_burst_enable(mock_ddg): mock_ddg.burst_mode.get() == BURSTCONFIG.FIRST_CYCLE.value -def test_ddg_wait_for_event_status(mock_ddg): +def test_ddg_wait_for_event_status(mock_ddg: DelayGeneratorCSAXS): """Test setting wait for event status.""" mock_ddg: DelayGeneratorCSAXS mock_ddg.state.event_status._read_pv.mock_data = 0 @@ -117,7 +121,7 @@ def test_ddg_wait_for_event_status(mock_ddg): # assert status.done is True -def test_ddg_set_io_values(mock_ddg): +def test_ddg_set_io_values(mock_ddg: DelayGeneratorCSAXS): """Test setting IO values.""" mock_ddg.set_io_values(channel="ab", amplitude=3, offset=2, polarity=1, mode="ttl") assert mock_ddg.ab.io.amplitude.get() == 3 @@ -138,7 +142,7 @@ def test_ddg_set_io_values(mock_ddg): assert attr.nim_mode.get() == 1 -def test_ddg_set_delay_pairs(mock_ddg): +def test_ddg_set_delay_pairs(mock_ddg: DelayGeneratorCSAXS): """Test setting delay pairs.""" mock_ddg.set_delay_pairs(channel="ab", delay=0.1, width=0.2) assert np.isclose(mock_ddg.ab.delay.get(), 0.1) @@ -156,52 +160,143 @@ def test_ddg_set_delay_pairs(mock_ddg): assert np.isclose(getattr(mock_ddg, channel).ch2.setpoint.get(), delay + 0.2) -def test_ddg1_on_connected(mock_ddg1): +######################### +### Test DDG1 Device #### +######################### + + +@pytest.fixture(scope="function") +def mock_mcs_csaxs() -> Generator[MCSCardCSAXS, None, None]: + """Fixture to mock the MCSCardCSAXS device.""" + dm = DMMock() + with patched_device( + MCSCardCSAXS, + name="mcs", + prefix="X12SA-MCS-CSAXS:", + device_manager=dm, + _mock_pv_initial_value=0, + ) as dev: + dev.enabled = True + dev.device_manager.devices["mcs"] = dev + try: + yield dev + finally: + dev.destroy() + + +@pytest.fixture(scope="function") +def mock_ddg1(mock_mcs_csaxs: MCSCardCSAXS) -> Generator[DDG1, None, None]: + """Fixture to mock the DDG1 device.""" + # Add enabled to mock_mcs_csaxs + dm_mock = mock_mcs_csaxs.device_manager + with patched_device( + DDG1, name="ddg1", prefix="test_ddg1:", device_manager=dm_mock, _mock_pv_initial_value=0 + ) as dev: + dev.enabled = True + dev.device_manager.devices["ddg1"] = dev + try: + yield dev + finally: + dev.destroy() + + +def test_ddg1_on_connected(mock_ddg1: DDG1): """Test the on_connected method of DDG1.""" - mock_ddg1.on_connected() - # IO defaults - assert mock_ddg1.burst_mode.get() == 0 - assert mock_ddg1.ab.io.amplitude.get() == 5.0 - assert mock_ddg1.cd.io.offset.get() == 0.0 - assert mock_ddg1.ef.io.polarity.get() == 1 - assert mock_ddg1.gh.io.ttl_mode.get() == 1 + mock_ddg1.burst_mode.put(1) # Set burst mode to 1, if connected should reset it to 0 + mock_ddg1.burst_delay.put(5) # Set to non-zero, should reset to 0 on connected + mock_ddg1.burst_count.put(10) # Set to non-default, should reset to 1 on connected + with mock.patch.object(mock_ddg1, "set_io_values") as mock_set_io_values: + mock_ddg1.on_connected() - # reference defaults - assert mock_ddg1.ab.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value - assert mock_ddg1.ab.ch2.reference.get() == 1 # CHANNELREFERENCE.A.value - assert mock_ddg1.cd.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value - assert mock_ddg1.cd.ch2.reference.get() == 3 # CHANNELREFERENCE.C.value - assert mock_ddg1.ef.ch1.reference.get() == 4 # CHANNELREFERENCE.D.value - assert mock_ddg1.ef.ch2.reference.get() == 5 # CHANNELREFERENCE.E.value - assert mock_ddg1.gh.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value - assert mock_ddg1.gh.ch2.reference.get() == 7 # CHANNELREFERENCE.G.value + # Burst mode Defaults + assert mock_ddg1.burst_mode.get() == 0 + assert mock_ddg1.burst_delay.get() == 0 + assert mock_ddg1.burst_count.get() == 1 - # Default trigger source - assert mock_ddg1.trigger_source.get() == 5 # TRIGGERSOURCE.SINGLE_SHOT.value + assert mock_set_io_values.call_count == len(DDG1_DEFAULT_IO_CONFIG) + for ch, config in DDG1_DEFAULT_IO_CONFIG.items(): + assert mock.call(ch, **config) in mock_set_io_values.call_args_list + + # Check reference values from DEFAULT_REFERENCES + for ch, refs in DDG1_DEFAULT_REFERENCES: + if ch == "A": + sub_ch = mock_ddg1.ab.ch1 + elif ch == "B": + sub_ch = mock_ddg1.ab.ch2 + elif ch == "C": + sub_ch = mock_ddg1.cd.ch1 + elif ch == "D": + sub_ch = mock_ddg1.cd.ch2 + elif ch == "E": + sub_ch = mock_ddg1.ef.ch1 + elif ch == "F": + sub_ch = mock_ddg1.ef.ch2 + elif ch == "G": + sub_ch = mock_ddg1.gh.ch1 + elif ch == "H": + sub_ch = mock_ddg1.gh.ch2 + assert sub_ch.reference.get() == refs.value + + # Check Default trigger source + assert mock_ddg1.trigger_source.get() == DDG1_DEFAULT_TRIGGER_SOURCE.value + + # Check proc state mode + assert mock_ddg1.state.proc_status_mode.get() == PROC_EVENT_MODE.EVENT.value + + # Check the poll thread is started + assert mock_ddg1._poll_thread.is_alive() + assert not mock_ddg1._poll_thread_kill_event.is_set() + assert not mock_ddg1._poll_thread_poll_loop_done.is_set() + assert not mock_ddg1._poll_thread_run_event.is_set() -def test_ddg1_stage(mock_ddg1): +def test_ddg1_prepare_mcs(mock_ddg1: DDG1, mock_mcs_csaxs: MCSCardCSAXS): + """Test the prepare_mcs method of DDG1.""" + mcs = mock_mcs_csaxs + ddg = mock_ddg1 + # Simulate default state + mcs.acquiring._read_pv.mock_data = 0 # not acquiring + mcs.erase_start.put(0) # reset erase start + + # Prepare MCS on trigger + st = ddg._prepare_mcs_on_trigger(mcs) + assert st.done is False + assert st.success is False + assert mcs.erase_start.get() == 1 # erase started + + # Simulate acquiring started + mcs.acquiring._read_pv.mock_data = 1 # acquiring + st.wait(2) + assert st.done is True + assert st.success is True + + +def test_ddg1_stage(mock_ddg1: DDG1): """Test the on_stage method of DDG1.""" exp_time = 0.1 frames_per_trigger = 10 - mock_ddg1.burst_mode.put(1) + mock_ddg1.burst_mode.put(0) # Non-default, should be reset on stage + mock_ddg1.burst_delay.put(5) # Non-default, should be reset on stage + mock_ddg1.burst_count.put(10) # Non-default, should be reset on stage + mock_ddg1.scan_info.msg.scan_parameters["exp_time"] = exp_time mock_ddg1.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger mock_ddg1.stage() + shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3 + assert np.isclose(mock_ddg1.burst_mode.get(), 1) # burst mode is enabled assert np.isclose(mock_ddg1.burst_delay.get(), 0) - assert np.isclose(mock_ddg1.burst_period.get(), exp_time) + assert np.isclose(mock_ddg1.burst_period.get(), shutter_width) # Trigger DDG2 through EXT/EN - assert np.isclose(mock_ddg1.ab.delay.get(), 2e-3) assert np.isclose(mock_ddg1.ab.width.get(), 1e-6) # Shutter channel cd assert np.isclose(mock_ddg1.cd.delay.get(), 0) - assert np.isclose(mock_ddg1.cd.width.get(), 2e-3 + exp_time * frames_per_trigger + 1e-3) + assert np.isclose(mock_ddg1.cd.width.get(), shutter_width) # MCS channel ef or gate assert np.isclose(mock_ddg1.ef.delay.get(), 0) assert np.isclose(mock_ddg1.ef.width.get(), 1e-6) @@ -209,96 +304,266 @@ def test_ddg1_stage(mock_ddg1): assert mock_ddg1.staged == ophyd.Staged.yes -def test_ddg1_trigger(mock_ddg1): - """Test the on_trigger method of DDG1.""" - mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.NONE.value +def test_ddg1_on_trigger(mock_ddg1: DDG1): + """ + Test the on_trigger method of the DDG1. + + We will test two scenarios: + I. Trigger is prepared, and resolves successfully after END_OF_BURST is reached in event status register. + II. Trigger is called while _poll_thread_loop_done is not yet finished from a previous trigger. + This may be the case if polling is yet to finsish. The next on_trigger should terminate the previous + polling, and work as expected. In addition, we will simulate that the mcs card is disabled, thus not prepared. + """ + ddg = mock_ddg1 + # Make sure DDG is setup in default state through on_connected + ddg.on_connected() + + # Check that poll thread is running and run event is not set + assert ddg._poll_thread.is_alive() + assert not ddg._poll_thread_run_event.is_set() + assert not ddg._poll_thread_poll_loop_done.is_set() + + # Set the status register bit + ddg.state.event_status._read_pv.mock_data = STATUSBITS.ABORT_DELAY.value + + ################################# + # Scenario I - normal operation # + ################################# + with mock.patch.object(ddg, "_prepare_mcs_on_trigger") as mock_prepare_mcs: + mock_prepare_mcs.return_value = ophyd.StatusBase(done=True, success=True) + status = ddg.trigger() + + # Check that the poll thread run event is set + assert ddg._poll_thread_run_event.is_set() + assert not ddg._poll_thread_poll_loop_done.is_set() - with mock.patch.object(mock_ddg1, "device_manager") as mock_device_manager: - # TODO add device manager DMMock, and properly test logic for mcs triggering. - mock_get = mock_device_manager.devices.get = mock.Mock(return_value=None) - status = mock_ddg1.trigger() - assert mock_get.call_args == mock.call("mcs", None) assert status.done is False assert status.success is False - assert mock_ddg1.trigger_shot.get() == 1 - mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value + assert ddg.trigger_shot.get() == 1 + + # Simulate that the event status bit reaches END_OF_BURST + ddg.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value status.wait(timeout=1) # Wait for the status to be done assert status.done is True assert status.success is True + # Should finish the poll loop + ddg._poll_thread_poll_loop_done.wait(timeout=1) + assert not ddg._poll_thread_run_event.is_set() -def test_ddg1_stop(mock_ddg1): - """Test the on_stop method of DDG1.""" - mock_ddg1.burst_mode.put(1) # Enable burst mode - mock_ddg1.stop() - assert mock_ddg1.burst_mode.get() == 0 # Burst mode is disabled + ############################################ + # Scenario II - previous poll not finished # + # MCS card disabled # + ############################################ + + # Set mcs card to enabled = False + ddg.device_manager.devices["mcs"].enabled = False + ddg.state.event_status._read_pv.mock_data = STATUSBITS.ABORT_DELAY.value + ddg._start_polling() + assert ddg._poll_thread_run_event.is_set() + with mock.patch.object(ddg, "_prepare_mcs_on_trigger") as mock_prepare_mcs: + status = ddg.trigger() + mock_prepare_mcs.assert_not_called() # MCS is disabled, should not be called + assert status.done is False + assert status.success is False + + # Resolve the status by simulating END_OF_BURST + ddg.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value + status.wait(timeout=1) # Wait for the status to be done + assert status.done is True + assert status.success is True + + # Wait for poll loop to finish + ddg._poll_thread_poll_loop_done.wait(timeout=1) + assert not ddg._poll_thread_run_event.is_set() -def test_ddg2_on_connected(mock_ddg2): - """Test on connected method of DDG2.""" - mock_ddg2.on_connected() - # IO defaults - assert mock_ddg2.burst_mode.get() == 0 - assert mock_ddg2.ab.io.amplitude.get() == 5.0 - assert mock_ddg2.cd.io.offset.get() == 0.0 - assert mock_ddg2.ef.io.polarity.get() == 1 - assert mock_ddg2.gh.io.ttl_mode.get() == 1 +# def test_ddg1_trigger(mock_ddg1): +# """Test the on_trigger method of DDG1.""" +# mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.NONE.value - # reference defaults - assert mock_ddg2.ab.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value - assert mock_ddg2.ab.ch2.reference.get() == 1 # CHANNELREFERENCE.A.value - assert mock_ddg2.cd.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value - assert mock_ddg2.cd.ch2.reference.get() == 3 # CHANNELREFERENCE.C.value - assert mock_ddg2.ef.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value - assert mock_ddg2.ef.ch2.reference.get() == 5 # CHANNELREFERENCE.E.value - assert mock_ddg2.gh.ch1.reference.get() == 0 # CHANNELREFERENCE.T0.value - assert mock_ddg2.gh.ch2.reference.get() == 7 # CHANNELREFERENCE.G.value - - # Default trigger source - assert mock_ddg2.trigger_source.get() == 1 # TRIGGERSOURCE.EXT_RISING_EDGE.value +# with mock.patch.object(mock_ddg1, "device_manager") as mock_device_manager: +# # TODO add device manager DMMock, and properly test logic for mcs triggering. +# mock_get = mock_device_manager.devices.get = mock.Mock(return_value=None) +# status = mock_ddg1.trigger() +# assert mock_get.call_args == mock.call("mcs", None) +# assert status.done is False +# assert status.success is False +# assert mock_ddg1.trigger_shot.get() == 1 +# mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value +# status.wait(timeout=1) # Wait for the status to be done +# assert status.done is True +# assert status.success is True -def test_ddg2_stage(mock_ddg2): - """Test the on_stage method of DDG2.""" +# def test_ddg1_stop(mock_ddg1): +# """Test the on_stop method of DDG1.""" +# mock_ddg1.burst_mode.put(1) # Enable burst mode +# mock_ddg1.stop() +# assert mock_ddg1.burst_mode.get() == 0 # Burst mode is disabled + + +######################### +### Test DDG2 Device #### +######################### + + +@pytest.fixture(scope="function") +def mock_ddg2(mock_mcs_csaxs: MCSCardCSAXS) -> Generator[DDG2, None, None]: + """Fixture to mock the DDG1 device.""" + # Add enabled to mock_mcs_csaxs + dm_mock = mock_mcs_csaxs.device_manager + with patched_device( + DDG2, name="ddg2", prefix="test_ddg2:", device_manager=dm_mock, _mock_pv_initial_value=0 + ) as dev: + dev.enabled = True + dev.device_manager.devices["ddg2"] = dev + try: + yield dev + finally: + dev.destroy() + + +def test_ddg2_on_connected(mock_ddg2: DDG2): + """Test the on_connected method of DDG1.""" + mock_ddg2.burst_mode.put(1) # Set burst mode to 1, if connected should reset it to 0 + mock_ddg2.burst_delay.put(5) # Set to non-zero, should reset to 0 on connected + mock_ddg2.burst_count.put(10) # Set to non-default, should reset to 1 on connected + with mock.patch.object(mock_ddg2, "set_io_values") as mock_set_io_values: + mock_ddg2.on_connected() + # Burst mode Defaults + assert mock_ddg2.burst_mode.get() == 0 + + assert mock_set_io_values.call_count == len(DDG2_DEFAULT_IO_CONFIG) + for ch, config in DDG2_DEFAULT_IO_CONFIG.items(): + assert mock.call(ch, **config) in mock_set_io_values.call_args_list + + # Check reference values from DEFAULT_REFERENCES + for ch, refs in DDG2_DEFAULT_REFERENCES: + if ch == "A": + sub_ch = mock_ddg2.ab.ch1 + elif ch == "B": + sub_ch = mock_ddg2.ab.ch2 + elif ch == "C": + sub_ch = mock_ddg2.cd.ch1 + elif ch == "D": + sub_ch = mock_ddg2.cd.ch2 + elif ch == "E": + sub_ch = mock_ddg2.ef.ch1 + elif ch == "F": + sub_ch = mock_ddg2.ef.ch2 + elif ch == "G": + sub_ch = mock_ddg2.gh.ch1 + elif ch == "H": + sub_ch = mock_ddg2.gh.ch2 + assert sub_ch.reference.get() == refs.value + + # Check Default trigger source + assert mock_ddg2.trigger_source.get() == DDG2_DEFAULT_TRIGGER_SOURCE.value + + +def test_ddg2_on_stage(mock_ddg2: DDG2): + """ + Test the on_stage method of DDG2. + + We will test two scenarios: + I. Stage device with valid parameters. + II. Stage device with invalid parameters (too short exp_time). Should raise ValueError. + """ + ddg = mock_ddg2 exp_time = 0.1 frames_per_trigger = 10 - mock_ddg2.on_connected() + ddg.on_connected() + ddg.scan_info.msg.scan_parameters["exp_time"] = exp_time + ddg.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger - mock_ddg2.burst_mode.put(0) - mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = exp_time - mock_ddg2.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger + # Set non-default burst mode settings + ddg.burst_mode.put(0) + ddg.burst_delay.put(5) - mock_ddg2.stage() + # Stage device with valid parameters + ddg.stage() + assert ddg.staged == ophyd.Staged.yes + assert ddg.burst_mode.get() == 1 # Burst mode is enabled + assert ddg.burst_delay.get() == 0 # Burst delay is set to 0 + assert ddg.burst_count.get() == frames_per_trigger + assert ddg.burst_period.get() == exp_time - assert np.isclose(mock_ddg2.burst_mode.get(), 1) # Burst mode is enabled - assert np.isclose(mock_ddg2.ab.delay.get(), 0) - assert np.isclose(mock_ddg2.ab.width.get(), exp_time - 2e-4) # DEFAULT_READOUT_TIMES["ab"]) - assert mock_ddg2.burst_count.get() == frames_per_trigger - assert np.isclose(mock_ddg2.burst_delay.get(), 0) - assert np.isclose(mock_ddg2.burst_period.get(), exp_time) - - assert mock_ddg2.trigger_source.get() == TRIGGERSOURCE.EXT_RISING_EDGE.value - - assert mock_ddg2.staged == ophyd.Staged.yes - mock_ddg2.unstage() # Reset staged state for next test + # Pulse width is exp_time - readout_time + burst_pulse_width = exp_time - DDG2_DEFAULT_READOUT_TIMES["ab"] + assert np.isclose(ddg.ab.delay.get(), 0) + assert np.isclose(ddg.ab.width.get(), burst_pulse_width) + # Unstage to reset + ddg.unstage() # Reset staged state for next test + exp_time_short = 2e-4 # too short exposure time with pytest.raises(ValueError): - mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = 2e-4 # too short exposure time - mock_ddg2.stage() + ddg.scan_info.msg.scan_parameters["exp_time"] = exp_time_short + ddg.stage() -def test_ddg2_trigger(mock_ddg2): +def test_ddg2_on_trigger(mock_ddg2: DDG2): """Test the on_trigger method of DDG2.""" - mock_ddg2.trigger_shot.put(0) - status = mock_ddg2.trigger() - assert mock_ddg2.trigger_shot.get() == 0 # Should not trigger DDG2 via soft trigger + ddg = mock_ddg2 + ddg.on_connected() + ddg.trigger_shot.put(0) + status = ddg.trigger() + assert ddg.trigger_shot.get() == 0 # Should not trigger DDG2 via soft trigger status.wait() assert status.done is True assert status.success is True -def test_ddg2_stop(mock_ddg2): +def test_ddg2_on_stop(mock_ddg2: DDG2): """Test the on_stop method of DDG2.""" - mock_ddg2.burst_mode.put(1) # Enable burst mode - mock_ddg2.stop() - assert mock_ddg2.burst_mode.get() == 0 # Burst mode is disabled + ddg = mock_ddg2 + ddg.on_connected() + ddg.burst_mode.put(1) # Enable burst mode + ddg.stop() + assert ddg.burst_mode.get() == 0 # Burst mode is disabled + + +# def test_ddg2_stage(mock_ddg2): +# """Test the on_stage method of DDG2.""" +# exp_time = 0.1 +# frames_per_trigger = 10 +# mock_ddg2.on_connected() + +# mock_ddg2.burst_mode.put(0) +# mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = exp_time +# mock_ddg2.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger + +# mock_ddg2.stage() + +# assert np.isclose(mock_ddg2.burst_mode.get(), 1) # Burst mode is enabled +# assert np.isclose(mock_ddg2.ab.delay.get(), 0) +# assert np.isclose(mock_ddg2.ab.width.get(), exp_time - 2e-4) # DEFAULT_READOUT_TIMES["ab"]) +# assert mock_ddg2.burst_count.get() == frames_per_trigger +# assert np.isclose(mock_ddg2.burst_delay.get(), 0) +# assert np.isclose(mock_ddg2.burst_period.get(), exp_time) + +# assert mock_ddg2.trigger_source.get() == TRIGGERSOURCE.EXT_RISING_EDGE.value + +# assert mock_ddg2.staged == ophyd.Staged.yes +# mock_ddg2.unstage() # Reset staged state for next test + +# with pytest.raises(ValueError): +# mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = 2e-4 # too short exposure time +# mock_ddg2.stage() + + +# def test_ddg2_trigger(mock_ddg2): +# """Test the on_trigger method of DDG2.""" +# mock_ddg2.trigger_shot.put(0) +# status = mock_ddg2.trigger() +# assert mock_ddg2.trigger_shot.get() == 0 # Should not trigger DDG2 via soft trigger +# status.wait() +# assert status.done is True +# assert status.success is True + + +# def test_ddg2_stop(mock_ddg2): +# """Test the on_stop method of DDG2.""" +# mock_ddg2.burst_mode.put(1) # Enable burst mode +# mock_ddg2.stop() +# assert mock_ddg2.burst_mode.get() == 0 # Burst mode is disabled diff --git a/tests/tests_devices/test_mcs_card.py b/tests/tests_devices/test_mcs_card.py index c829988..4bfe5ee 100644 --- a/tests/tests_devices/test_mcs_card.py +++ b/tests/tests_devices/test_mcs_card.py @@ -1,5 +1,7 @@ # pylint: skip-file import threading +from copy import deepcopy +from typing import Generator from unittest import mock import numpy as np @@ -8,6 +10,7 @@ import pytest from bec_lib import messages from bec_lib.endpoints import MessageEndpoints from bec_server.device_server.tests.utils import DMMock +from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError from ophyd_devices.tests.utils import MockPV, patch_dual_pvs from csaxs_bec.devices.epics.mcs_card.mcs_card import ( @@ -46,429 +49,238 @@ def test_mcs_card(mock_mcs_card): @pytest.fixture(scope="function") -def mock_mcs_csaxs(): +def mock_mcs_csaxs() -> Generator[MCSCardCSAXS, None, None]: """Fixture to mock the MCSCardCSAXS device.""" name = "mcs_csaxs" prefix = "X12SA-MCS-CSAXS:" dm = DMMock() - with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV - mock_cl.thread_class = threading.Thread - mcs_card_csaxs = MCSCardCSAXS(name=name, prefix=prefix, device_manager=dm) - patch_dual_pvs(mcs_card_csaxs) - yield mcs_card_csaxs + try: + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + mcs_card_csaxs = MCSCardCSAXS(name=name, prefix=prefix, device_manager=dm) + patch_dual_pvs(mcs_card_csaxs) + yield mcs_card_csaxs + finally: + mcs_card_csaxs.on_destroy() -def test_mcs_card_csaxs(mock_mcs_csaxs): +def test_mcs_card_csaxs(mock_mcs_csaxs: MCSCardCSAXS): """Test the MCSCardCSAXS initialization.""" assert mock_mcs_csaxs.name == "mcs_csaxs" assert mock_mcs_csaxs.prefix == "X12SA-MCS-CSAXS:" - assert mock_mcs_csaxs.counter_mapping == { - "mcs_csaxs_counters_mca1": "current1", - "mcs_csaxs_counters_mca2": "current2", - "mcs_csaxs_counters_mca3": "current3", - "mcs_csaxs_counters_mca4": "current4", - "mcs_csaxs_counters_mca5": "count_time", - } - assert mock_mcs_csaxs._mcs_clock == 1e7 # 10 MHz + assert mock_mcs_csaxs._acquisition_group == "monitored" + assert mock_mcs_csaxs._num_total_triggers == 0 + assert mock_mcs_csaxs._mcs_clock == 1e7 + assert mock_mcs_csaxs._pv_timeout == 2.0 + assert mock_mcs_csaxs._mca_counter_index == 0 + assert mock_mcs_csaxs._current_data_index == 0 + assert mock_mcs_csaxs._current_data == {} + assert mock_mcs_csaxs.NUM_MCA_CHANNELS == 32 -def test_mcs_card_csaxs_on_connected(mock_mcs_csaxs): +def test_mcs_card_csaxs_on_connected(mock_mcs_csaxs: MCSCardCSAXS): """Test the on_connected method of MCSCardCSAXS.""" mcs = mock_mcs_csaxs - mcs.on_connected() - # Stop called - assert mcs.stop_all.get() == 1 - # Channel advance settings - assert mcs.channel_advance.get() == CHANNELADVANCE.EXTERNAL - assert mcs.channel1_source.get() == CHANNEL1SOURCE.EXTERNAL - assert mcs.prescale.get() == 1 - # - assert mcs.user_led.get() == 0 - # input output settings - assert mcs.input_mode.get() == INPUTMODE.MODE_3 - assert mcs.input_polarity.get() == POLARITY.NORMAL - assert mcs.output_mode.get() == OUTPUTMODE.MODE_2 - assert mcs.output_polarity.get() == POLARITY.NORMAL - assert mcs.count_on_start.get() == 0 - assert mcs.read_mode.get() == READMODE.PASSIVE - assert mcs.acquire_mode.get() == ACQUIREMODE.MCS - - with mock.patch.object(mcs.counters.mca1, "subscribe") as mock_mca_subscribe: + with ( + mock.patch.object(mcs.counters.mca1, "subscribe") as mock_mca_subscribe, + mock.patch.object(mcs, "mcs_recovery") as mock_mcs_recovery, + mock.patch.object(mcs._scan_done_thread, "start") as mock_scan_done_thread_start, + ): mcs.on_connected() + # Stop called + assert mcs.stop_all.get() == 1 + # Channel advance settings + assert mcs.channel_advance.get() == CHANNELADVANCE.EXTERNAL + assert mcs.channel1_source.get() == CHANNEL1SOURCE.EXTERNAL + assert mcs.prescale.get() == 1 + assert mcs.user_led.get() == 0 + + # Mux output + assert mcs.mux_output.get() == mcs.NUM_MCA_CHANNELS + + # input output settings + assert mcs.input_mode.get() == INPUTMODE.MODE_3 + assert mcs.input_polarity.get() == POLARITY.NORMAL + assert mcs.output_mode.get() == OUTPUTMODE.MODE_2 + assert mcs.output_polarity.get() == POLARITY.NORMAL + assert mcs.count_on_start.get() == 0 + assert mcs.read_mode.get() == READMODE.PASSIVE + assert mcs.acquire_mode.get() == ACQUIREMODE.MCS + + # Check if subscriptions are setup correctly assert mock_mca_subscribe.call_args == mock.call(mcs._on_counter_update, run=False) + # Check if recovery is called + mock_mcs_recovery.assert_called_once_with(timeout=1) + # Check if scan done thread is started + mock_scan_done_thread_start.assert_called_once() -def test_mcs_card_csaxs_stage(mock_mcs_csaxs): +def test_mcs_card_csaxs_stage(mock_mcs_csaxs: MCSCardCSAXS): """Test on stage method of MCSCardCSAXS""" mcs = mock_mcs_csaxs triggers = 5 + num_points = 10 mcs.scan_info.msg.scan_parameters["frames_per_trigger"] = triggers - mcs.erase_all.put(0) + mcs.scan_info.msg.num_points = num_points + + # Simulate that the MCS card is still acquiring, and that current channel is !=0 + mcs.current_channel._read_pv.mock_data = 2 # Simulate that current channel is not zero + mcs.erase_all.put(0) # Set erase_all to 0 + mcs._current_data = {"mca1": [1, 2, 3]} # Simulate existing data + mcs._scan_done_callbacks = [lambda: None] # Simulate existing callbacks + mcs._start_monitor_async_data_emission.set() # Simulate that monitoring is started + mcs._omit_mca_callbacks.set() # Simulate that mca callbacks are omitted + mcs.stage() + # Check that card is staged assert mcs._staged == ophyd.Staged.yes - assert mcs.erase_all.get() == 1 + + # Check that erase_all, stop_all, preset_real, num_use_all are set correctly + assert mcs.erase_all.get() == 1 # Should be set to 1 as current_channel !=0 assert mcs.preset_real.get() == 0 assert mcs.num_use_all.get() == triggers + # Check that internal variables are reset + assert mcs._num_total_triggers == triggers * num_points + assert mcs._current_data == {} + assert mcs._scan_done_callbacks == [] + assert mcs._current_data_index == 0 + + # Check that thread events are cleared properly + assert not mcs._start_monitor_async_data_emission.is_set() + assert not mcs._omit_mca_callbacks.is_set() + def test_mcs_card_csaxs_unstage(mock_mcs_csaxs): """Test unstage method of MCSCardCSAXS""" mcs = mock_mcs_csaxs mcs.stop_all.put(0) - mcs.ready_to_read.put(0) - mcs.erase_all.put(1) + mcs.erase_all.put(0) mcs.unstage() assert mcs.stop_all.get() == 1 - assert mcs.erase_all.get() == 0 + assert mcs.erase_all.get() == 1 -def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs): - """Test complete method of MCSCarcCSAXS""" +def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs: MCSCardCSAXS): + """ + Test complete method of MCSCarcCSAXS. + + Two use cases: + I. Acquisition is stopped externally + II. Acquisition completes normally + """ mcs = mock_mcs_csaxs mcs.acquiring._read_pv.mock_data = ACQUIRING.ACQUIRING + # Make sure that device on_connected has been called which starts the monitoring thread + mcs.on_connected() + + ####################### + # I. Use case where acquisition is stopped + ####################### + st = mcs.complete() assert st.done is False - mcs.stop_all.put(0) + assert mcs._start_monitor_async_data_emission.is_set() + + # Status should be cancelled by stop mcs.stop() - with pytest.raises(Exception): + with pytest.raises(DeviceStoppedError): st.wait(timeout=3) + + # Callback on status failure should stop monitoring + mcs._start_monitor_async_data_emission.wait(2) + assert not mcs._start_monitor_async_data_emission.is_set() + + ####################### + # II. Use case where acquisition completes normally + ####################### + + mcs._current_data_index = 0 + mcs.scan_info.msg.num_points = 10 + mcs.acquiring._read_pv.mock_data = ACQUIRING.ACQUIRING + + st = mcs.complete() + assert st.done is False + assert mcs._start_monitor_async_data_emission.is_set() + + mcs.acquiring._read_pv.mock_data = ACQUIRING.DONE + + # This should now automatically complete the status + mcs._current_data_index = 10 + st.wait(timeout=3) assert st.done is True - assert st.success is False - assert mcs.stop_all.get() == 1 + assert st.success is True + + # Clean up procedure should stop the async_data monitoring + mcs._start_monitor_async_data_emission.wait(2) + assert not mcs._start_monitor_async_data_emission.is_set() -def test_mcs_card_csaxs_on_counter_updated(mock_mcs_csaxs): +def test_mcs_recovery(mock_mcs_csaxs: MCSCardCSAXS): mcs = mock_mcs_csaxs - # Called for mca1 + # Simulate ongoing acquisition + mcs.erase_all._read_pv.mock_data = 0 + mcs.stop_all._read_pv.mock_data = 0 + mcs.erase_start.put(0) + mcs.mcs_recovery(timeout=0.1) + assert mcs.erase_all.get() == 1 + assert mcs.stop_all.get() == 1 + assert mcs.erase_start.get() == 1 + assert not mcs._omit_mca_callbacks.is_set() + + +def test_mcs_card_csaxs_on_counter_updated(mock_mcs_csaxs: MCSCardCSAXS): + """ + Test the on_counter_update method of MCSCardCSAXS. + We will test 2 use cases: + I. Suppressed callbacks + II. Callback from 32 mca counters, should result in data being sent to BEC + """ + mcs = mock_mcs_csaxs + + # I. Suppressed callbacks + mcs._omit_mca_callbacks.set() kwargs = {"obj": mcs.counters.mca1} mcs._on_counter_update(1, **kwargs) - assert mcs.mcs.mca1.get() == 1 - assert mcs.bpm.current1.get() == 1 - assert mcs.counter_updated == [mcs.counters.mca1.name] - # Called for mca2 - kwargs = {"obj": mcs.counters.mca2} - mcs._on_counter_update(np.array([2, 4]), **kwargs) - assert mcs.mcs.mca2.get() == [2, 4] - assert np.isclose(mcs.bpm.current2.get(), 3) - assert mcs.counter_updated == [mcs.counters.mca1.name, mcs.counters.mca2.name] - # Called for mca3 - kwargs = {"obj": mcs.counters.mca3} - mcs._on_counter_update(1000, **kwargs) - assert mcs.mcs.mca3.get() == 1000 - assert mcs.bpm.current3.get() == 1000 - assert mcs.counter_updated == [ - mcs.counters.mca1.name, - mcs.counters.mca2.name, - mcs.counters.mca3.name, - ] - # Called for mca4 - kwargs = {"obj": mcs.counters.mca4} - mcs._on_counter_update(np.array([20, 40]), **kwargs) - assert mcs.mcs.mca4.get() == [20, 40] - assert np.isclose(mcs.bpm.current4.get(), 30) - assert mcs.counter_updated == [ - mcs.counters.mca1.name, - mcs.counters.mca2.name, - mcs.counters.mca3.name, - mcs.counters.mca4.name, - ] - # Called for mca5 - assert mcs.ready_to_read.get() == 0 - kwargs = {"obj": mcs.counters.mca5} - mcs._on_counter_update(np.array([10000, 10000]), **kwargs) - assert np.isclose(mcs.bpm.count_time.get(), 10000 / 1e7) - assert mcs.mcs.mca5.get() == [10000, 10000] + assert mcs._mca_counter_index == 1 # Counter index should still increment + assert mcs._current_data == {} + # II. Callback from 32 mca counters + mcs._omit_mca_callbacks.clear() + mcs._mca_counter_index = 0 + mcs._current_data_index = 0 + val = mcs.mca.get() -# @pytest.fixture(scope="function") -# def mock_det(): -# name = "mcs" -# prefix = "X12SA-MCS:" -# dm = DMMock() -# with mock.patch.object(dm, "connector"): -# with ( -# mock.patch( -# "ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter" -# ) as filemixin, -# mock.patch( -# "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config" -# ) as mock_service_config, -# ): -# with mock.patch.object(ophyd, "cl") as mock_cl: -# mock_cl.get_pv = MockPV -# mock_cl.thread_class = threading.Thread -# with mock.patch.object(MCScSAXS, "_init"): -# det = MCScSAXS(name=name, prefix=prefix, device_manager=dm) -# patch_dual_pvs(det) -# det.TIMEOUT_FOR_SIGNALS = 0.1 -# yield det + for ii in range(mcs.NUM_MCA_CHANNELS): + counter = getattr(mcs.counters, f"mca{ii+1}") + kwargs = {"obj": counter, "timestamp": 1.0} + if ii % 2 == 1: + value = np.array([ii, (ii + 1) * 2]) + else: + value = ii + mcs._on_counter_update(value, **kwargs) + if ii < (mcs.NUM_MCA_CHANNELS - 1): + assert mcs._current_data_index == 0 + assert mcs._mca_counter_index == ii + 1 + assert counter.attr_name in mcs._current_data + assert ( + mcs._current_data[counter.attr_name]["value"] == value.tolist() + if isinstance(value, np.ndarray) + else [value] + ) + buffer = deepcopy(mcs._current_data) + assert mcs.mca.get() == val # Async mca signal should not change + else: + # On last counter, data should be sent to BEC, and internal variables reset + buffer[counter.attr_name] = { + "value": value.tolist() if isinstance(value, np.ndarray) else [value], + "timestamp": 1.0, + } + assert mcs._mca_counter_index == 0 + assert mcs._current_data_index == 1 + assert mcs._current_data == {} - -# def test_init(): -# """Test the _init function:""" -# name = "eiger" -# prefix = "X12SA-ES-EIGER9M:" -# dm = DMMock() -# with mock.patch.object(dm, "connector"): -# with ( -# mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"), -# mock.patch( -# "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config" -# ), -# ): -# with mock.patch.object(ophyd, "cl") as mock_cl: -# mock_cl.get_pv = MockPV -# with ( -# mock.patch( -# "csaxs_bec.devices.epics.mcs_csaxs.MCSSetup.initialize_detector" -# ) as mock_init_det, -# mock.patch( -# "csaxs_bec.devices.epics.mcs_csaxs.MCSSetup.initialize_detector_backend" -# ) as mock_init_backend, -# ): -# MCScSAXS(name=name, prefix=prefix, device_manager=dm) -# mock_init_det.assert_called_once() -# mock_init_backend.assert_called_once() - - -# @pytest.mark.parametrize( -# "trigger_source, channel_advance, channel_source1, pv_channels", -# [ -# ( -# 3, -# 1, -# 0, -# { -# "user_led": 0, -# "mux_output": 5, -# "input_pol": 0, -# "output_pol": 1, -# "count_on_start": 0, -# "stop_all": 1, -# }, -# ) -# ], -# ) -# def test_initialize_detector( -# mock_det, trigger_source, channel_advance, channel_source1, pv_channels -# ): -# """Test the _init function: - -# This includes testing the functions: -# - initialize_detector -# - stop_det -# - parent.set_trigger -# --> Testing the filewriter is done in test_init_filewriter - -# Validation upon setting the correct PVs - -# """ -# mock_det.custom_prepare.initialize_detector() # call the method you want to test -# assert mock_det.channel_advance.get() == channel_advance -# assert mock_det.channel1_source.get() == channel_source1 -# assert mock_det.user_led.get() == pv_channels["user_led"] -# assert mock_det.mux_output.get() == pv_channels["mux_output"] -# assert mock_det.input_polarity.get() == pv_channels["input_pol"] -# assert mock_det.output_polarity.get() == pv_channels["output_pol"] -# assert mock_det.count_on_start.get() == pv_channels["count_on_start"] -# assert mock_det.input_mode.get() == trigger_source - - -# def test_trigger(mock_det): -# """Test the trigger function: -# Validate that trigger calls the custom_prepare.on_trigger() function -# """ -# with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger: -# mock_det.trigger() -# mock_on_trigger.assert_called_once() - - -# @pytest.mark.parametrize( -# "value, num_lines, num_points, done", [(100, 5, 500, False), (500, 5, 500, True)] -# ) -# def test_progress_update(mock_det, value, num_lines, num_points, done): -# mock_det.num_lines.set(num_lines) -# mock_det.scaninfo.num_points = num_points -# calls = mock.call(sub_type="progress", value=value, max_value=num_points, done=done) -# with mock.patch.object(mock_det, "_run_subs") as mock_run_subs: -# mock_det.custom_prepare._progress_update(value=value) -# mock_run_subs.assert_called_once() -# assert mock_run_subs.call_args == calls - - -# @pytest.mark.parametrize( -# "values, expected_nothing", -# [([[100, 120, 140], [200, 220, 240], [300, 320, 340]], False), ([100, 200, 300], True)], -# ) -# def test_on_mca_data(mock_det, values, expected_nothing): -# """Test the on_mca_data function: -# Validate that on_mca_data calls the custom_prepare.on_mca_data() function -# """ -# with mock.patch.object(mock_det.custom_prepare, "_send_data_to_bec") as mock_send_data: -# mock_object = mock.MagicMock() -# for ii, name in enumerate(mock_det.custom_prepare.mca_names): -# mock_object.attr_name = name -# mock_det.custom_prepare._on_mca_data(obj=mock_object, value=values[ii]) -# if not expected_nothing and ii < (len(values) - 1): -# assert mock_det.custom_prepare.mca_data[name] == values[ii] - -# if not expected_nothing: -# mock_send_data.assert_called_once() -# assert mock_det.custom_prepare.acquisition_done is True - - -# @pytest.mark.parametrize( -# "metadata, mca_data", -# [ -# ( -# {"scan_id": 123}, -# { -# "mca1": {"value": [100, 120, 140]}, -# "mca3": {"value": [200, 220, 240]}, -# "mca4": {"value": [300, 320, 340]}, -# }, -# ) -# ], -# ) -# def test_send_data_to_bec(mock_det, metadata, mca_data): -# mock_det.scaninfo.scan_msg = mock.MagicMock() -# mock_det.scaninfo.scan_msg.metadata = metadata -# mock_det.scaninfo.scan_id = metadata["scan_id"] -# mock_det.custom_prepare.mca_data = mca_data -# mock_det.custom_prepare._send_data_to_bec() -# device_metadata = mock_det.scaninfo.scan_msg.metadata -# metadata.update({"async_update": "append", "num_lines": mock_det.num_lines.get()}) -# data = messages.DeviceMessage(signals=dict(mca_data), metadata=device_metadata) -# calls = mock.call( -# topic=MessageEndpoints.device_async_readback( -# scan_id=metadata["scan_id"], device=mock_det.name -# ), -# msg={"data": data}, -# expire=1800, -# ) - -# assert mock_det.connector.xadd.call_args == calls - - -# @pytest.mark.parametrize( -# "scaninfo, triggersource, stopped, expected_exception", -# [ -# ( -# {"num_points": 500, "frames_per_trigger": 1, "scan_type": "step"}, -# TriggerSource.MODE3, -# False, -# False, -# ), -# ( -# {"num_points": 500, "frames_per_trigger": 1, "scan_type": "fly"}, -# TriggerSource.MODE3, -# False, -# False, -# ), -# ( -# {"num_points": 5001, "frames_per_trigger": 2, "scan_type": "step"}, -# TriggerSource.MODE3, -# False, -# True, -# ), -# ( -# {"num_points": 500, "frames_per_trigger": 2, "scan_type": "random"}, -# TriggerSource.MODE3, -# False, -# True, -# ), -# ], -# ) -# def test_stage(mock_det, scaninfo, triggersource, stopped, expected_exception): -# mock_det.scaninfo.num_points = scaninfo["num_points"] -# mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] -# mock_det.scaninfo.scan_type = scaninfo["scan_type"] -# mock_det.stopped = stopped -# with mock.patch.object(mock_det.custom_prepare, "prepare_detector_backend") as mock_prep_fw: -# if expected_exception: -# with pytest.raises(MCSError): -# mock_det.stage() -# mock_prep_fw.assert_called_once() -# else: -# mock_det.stage() -# mock_prep_fw.assert_called_once() -# # Check set_trigger -# mock_det.input_mode.get() == triggersource -# if scaninfo["scan_type"] == "step": -# assert mock_det.num_use_all.get() == int(scaninfo["frames_per_trigger"]) * int( -# scaninfo["num_points"] -# ) -# elif scaninfo["scan_type"] == "fly": -# assert mock_det.num_use_all.get() == int(scaninfo["num_points"]) -# mock_det.preset_real.get() == 0 - -# # # CHeck custom_prepare.arm_acquisition -# # assert mock_det.custom_prepare.counter == 0 -# # assert mock_det.erase_start.get() == 1 -# # mock_prep_fw.assert_called_once() -# # # Check _prep_det -# # assert mock_det.cam.num_images.get() == int( -# # scaninfo["num_points"] * scaninfo["frames_per_trigger"] -# # ) -# # assert mock_det.cam.num_frames.get() == 1 - -# # mock_publish_file_location.assert_called_with(done=False) -# # assert mock_det.cam.acquire.get() == 1 - - -# def test_prepare_detector_backend(mock_det): -# mock_det.custom_prepare.prepare_detector_backend() -# assert mock_det.erase_all.get() == 1 -# assert mock_det.read_mode.get() == ReadoutMode.EVENT - - -# def test_complete(mock_det): -# with (mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,): -# mock_det.complete() -# assert mock_finished.call_count == 1 - - -# def test_stop_detector_backend(mock_det): -# mock_det.custom_prepare.stop_detector_backend() -# assert mock_det.custom_prepare.acquisition_done is True - - -# def test_stop(mock_det): -# with ( -# mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, -# mock.patch.object( -# mock_det.custom_prepare, "stop_detector_backend" -# ) as mock_stop_detector_backend, -# ): -# mock_det.stop() -# mock_stop_det.assert_called_once() -# mock_stop_detector_backend.assert_called_once() -# assert mock_det.stopped is True - - -# @pytest.mark.parametrize( -# "stopped, acquisition_done, acquiring_state, expected_exception", -# [ -# (False, True, 0, False), -# (False, False, 0, True), -# (False, True, 1, True), -# (True, True, 0, True), -# ], -# ) -# def test_finished(mock_det, stopped, acquisition_done, acquiring_state, expected_exception): -# mock_det.custom_prepare.acquisition_done = acquisition_done -# mock_det.acquiring._read_pv.mock_data = acquiring_state -# mock_det.scaninfo.num_points = 500 -# mock_det.num_lines.put(500) -# mock_det.current_channel._read_pv.mock_data = 1 -# mock_det.stopped = stopped - -# if expected_exception: -# with pytest.raises(MCSTimeoutError): -# mock_det.timeout = 0.1 -# mock_det.custom_prepare.finished() -# else: -# mock_det.custom_prepare.finished() -# if stopped: -# assert mock_det.stopped is stopped + # Check that the async mca signal is properly set + assert isinstance(mcs.mca.get(), messages.DeviceMessage) + assert len(mcs.mca.get().signals) == mcs.NUM_MCA_CHANNELS