# pylint: skip-file
"""Unit tests for the cSAXS delay generator devices (DelayGeneratorCSAXS, DDG1, DDG2)."""
import threading
from typing import Generator
from unittest import mock

import numpy as np
import ophyd
import pytest
from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import patched_device

from csaxs_bec.devices.epics.delay_generator_csaxs import DDG1, DDG2
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import (
    DEFAULT_IO_CONFIG as DDG1_DEFAULT_IO_CONFIG,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import (
    DEFAULT_READOUT_TIMES as DDG1_DEFAULT_READOUT_TIMES,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import (
    DEFAULT_REFERENCES as DDG1_DEFAULT_REFERENCES,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import (
    DEFAULT_TRIGGER_SOURCE as DDG1_DEFAULT_TRIGGER_SOURCE,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import PROC_EVENT_MODE
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import (
    DEFAULT_IO_CONFIG as DDG2_DEFAULT_IO_CONFIG,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import (
    DEFAULT_READOUT_TIMES as DDG2_DEFAULT_READOUT_TIMES,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import (
    DEFAULT_REFERENCES as DDG2_DEFAULT_REFERENCES,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_2 import (
    DEFAULT_TRIGGER_SOURCE as DDG2_DEFAULT_TRIGGER_SOURCE,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
    BURSTCONFIG,
    CHANNELREFERENCE,
    STATUSBITS,
    TRIGGERSOURCE,
    DelayGeneratorCSAXS,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import MCSCardCSAXS

# Maps a single output channel letter to the (channel pair, sub-channel) attribute names
# through which it is reached on a delay generator device, e.g. "C" -> ddg.cd.ch1.
_SUB_CHANNEL_ATTRS = {
    "A": ("ab", "ch1"),
    "B": ("ab", "ch2"),
    "C": ("cd", "ch1"),
    "D": ("cd", "ch2"),
    "E": ("ef", "ch1"),
    "F": ("ef", "ch2"),
    "G": ("gh", "ch1"),
    "H": ("gh", "ch2"),
}


def _sub_channel(ddg: DelayGeneratorCSAXS, ch: str):
    """Return the sub-channel component of ``ddg`` for the channel letter ``ch``.

    Raises:
        KeyError: If ``ch`` is not one of "A".."H". Failing loudly here prevents a test
            from silently checking a stale sub-channel for an unexpected letter.
    """
    pair_name, sub_name = _SUB_CHANNEL_ATTRS[ch]
    return getattr(getattr(ddg, pair_name), sub_name)


############################
### Test Delay Generator ###
############################


@pytest.fixture(scope="function")
def mock_ddg() -> Generator[DelayGeneratorCSAXS, None, None]:
    """Fixture to mock the DelayGeneratorCSAXS device."""
    with patched_device(
        DelayGeneratorCSAXS, name="ddg", prefix="test:", _mock_pv_initial_value=0
    ) as dev:
        try:
            yield dev
        finally:
            dev.destroy()


def test_ddg_init(mock_ddg: DelayGeneratorCSAXS):
    """Test that name and prefix are set on the device."""
    assert mock_ddg.name == "ddg"
    assert mock_ddg.prefix == "test:"


def test_ddg_proc_event_status(mock_ddg: DelayGeneratorCSAXS):
    """Test the proc event status method."""
    mock_ddg.state.proc_status.put(0)
    mock_ddg.proc_event_status()
    assert mock_ddg.state.proc_status.get() == 1


def test_ddg_set_trigger(mock_ddg: DelayGeneratorCSAXS):
    """Test setting the trigger source for every member of TRIGGERSOURCE."""
    for trigger in TRIGGERSOURCE:
        mock_ddg.set_trigger(trigger)
        assert mock_ddg.trigger_source.get() == trigger.value


def test_ddg_burst_enable(mock_ddg: DelayGeneratorCSAXS):
    """Test enabling burst mode, including rejection of invalid arguments."""
    mock_ddg.burst_enable(count=100, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
    assert mock_ddg.burst_mode.get() == 1
    assert mock_ddg.burst_count.get() == 100
    assert mock_ddg.burst_delay.get() == 0.1
    assert mock_ddg.burst_period.get() == 0.02
    assert mock_ddg.burst_config.get() == BURSTCONFIG.ALL_CYCLES.value

    # Count is 0
    with pytest.raises(ValueError):
        mock_ddg.burst_enable(count=0, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
    # Delay is negative
    with pytest.raises(ValueError):
        mock_ddg.burst_enable(count=100, delay=-0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
    # Period is zero
    with pytest.raises(ValueError):
        mock_ddg.burst_enable(count=100, delay=0.1, period=0, config=BURSTCONFIG.ALL_CYCLES)

    # Works with default config
    mock_ddg.burst_enable(count=100, delay=0.1, period=0.02)
    assert mock_ddg.burst_mode.get() == 1
    # NOTE(review): this line used to be a bare (never asserted) comparison of burst_mode
    # against BURSTCONFIG.FIRST_CYCLE.value. Presumably the intent was to check
    # burst_config against the default `config` of burst_enable -- confirm that default
    # and assert it here.


def test_ddg_wait_for_event_status(mock_ddg: DelayGeneratorCSAXS):
    """Test that the event status does not resolve before the requested bit is set."""
    mock_ddg.state.event_status._read_pv.mock_data = 0
    status = mock_ddg.wait_for_event_status(value=STATUSBITS.END_OF_BURST)  # 8
    assert status.done is False
    mock_ddg.state.event_status._read_pv.mock_data = 1
    assert status.done is False
    mock_ddg.state.event_status._read_pv.mock_data = 4
    assert status.done is False
    # TODO enable once callback for MockPV is implemented
    # mock_ddg.state.event_status._read_pv.mock_data = 13  # 8 + 4 + 1
    # status.wait(timeout=1)  # Wait for the status to be done
    # assert status.done is True


def test_ddg_set_io_values(mock_ddg: DelayGeneratorCSAXS):
    """Test setting IO values for a single channel and for a list of channels."""
    mock_ddg.set_io_values(channel="ab", amplitude=3, offset=2, polarity=1, mode="ttl")
    assert mock_ddg.ab.io.amplitude.get() == 3
    assert mock_ddg.ab.io.offset.get() == 2
    assert mock_ddg.ab.io.polarity.get() == 1
    assert mock_ddg.ab.io.ttl_mode.get() == 1

    # List of channels
    channels = ["ab", "cd", "t0"]
    mock_ddg.set_io_values(channel=channels, amplitude=3, offset=2, polarity=1, mode="nim")
    for channel in channels:
        # "t0" carries the IO signals directly; the channel pairs carry them under `.io`.
        if channel == "t0":
            attr = getattr(mock_ddg, channel)
        else:
            attr = getattr(mock_ddg, channel).io
        assert attr.amplitude.get() == 3
        assert attr.offset.get() == 2
        assert attr.polarity.get() == 1
        assert attr.nim_mode.get() == 1


def test_ddg_set_delay_pairs(mock_ddg: DelayGeneratorCSAXS):
    """Test setting delay pairs for a single channel and for a list of channels."""
    mock_ddg.set_delay_pairs(channel="ab", delay=0.1, width=0.2)
    assert np.isclose(mock_ddg.ab.delay.get(), 0.1)
    assert np.isclose(mock_ddg.ab.width.get(), 0.2)
    assert np.isclose(mock_ddg.ab.ch1.setpoint.get(), 0.1)
    assert np.isclose(mock_ddg.ab.ch2.setpoint.get(), 0.3)  # delay + width

    # List of channels
    channels = ["ab", "cd", "ef", "gh"]
    delays = [0.1, 0.2, 0.4, 0.5]
    mock_ddg.set_delay_pairs(channel=channels, delay=delays, width=0.2)
    for delay, channel in zip(delays, channels):
        pair = getattr(mock_ddg, channel)
        assert np.isclose(pair.delay.get(), delay)
        assert np.isclose(pair.width.get(), 0.2)
        assert np.isclose(pair.ch1.setpoint.get(), delay)
        assert np.isclose(pair.ch2.setpoint.get(), delay + 0.2)


#########################
### Test DDG1 Device ####
#########################


@pytest.fixture(scope="function")
def mock_mcs_csaxs() -> Generator[MCSCardCSAXS, None, None]:
    """Fixture to mock the MCSCardCSAXS device, registered as "mcs" in a DMMock."""
    dm = DMMock()
    with patched_device(
        MCSCardCSAXS,
        name="mcs",
        prefix="X12SA-MCS-CSAXS:",
        device_manager=dm,
        _mock_pv_initial_value=0,
    ) as dev:
        dev.enabled = True
        dev.device_manager.devices["mcs"] = dev
        try:
            yield dev
        finally:
            dev.destroy()


@pytest.fixture(scope="function")
def mock_ddg1(mock_mcs_csaxs: MCSCardCSAXS) -> Generator[DDG1, None, None]:
    """Fixture to mock the DDG1 device."""
    # Reuse the device manager of the MCS fixture, so "mcs" and "ddg1" are registered
    # in the same device manager.
    dm_mock = mock_mcs_csaxs.device_manager
    with patched_device(
        DDG1, name="ddg1", prefix="test_ddg1:", device_manager=dm_mock, _mock_pv_initial_value=0
    ) as dev:
        dev.enabled = True
        dev.device_manager.devices["ddg1"] = dev
        try:
            yield dev
        finally:
            dev.destroy()


def test_ddg1_on_connected(mock_ddg1: DDG1):
    """Test the on_connected method of DDG1."""
    mock_ddg1.burst_mode.put(1)  # Set burst mode to 1, if connected should reset it to 0
    mock_ddg1.burst_delay.put(5)  # Set to non-zero, should reset to 0 on connected
    mock_ddg1.burst_count.put(10)  # Set to non-default, should reset to 1 on connected

    with mock.patch.object(mock_ddg1, "set_io_values") as mock_set_io_values:
        mock_ddg1.on_connected()

        # Burst mode defaults
        assert mock_ddg1.burst_mode.get() == 0
        assert mock_ddg1.burst_delay.get() == 0
        assert mock_ddg1.burst_count.get() == 1

        # One set_io_values call per entry of the default IO config
        assert mock_set_io_values.call_count == len(DDG1_DEFAULT_IO_CONFIG)
        for ch, config in DDG1_DEFAULT_IO_CONFIG.items():
            assert mock.call(ch, **config) in mock_set_io_values.call_args_list

        # Check reference values from DEFAULT_REFERENCES
        for ch, refs in DDG1_DEFAULT_REFERENCES:
            assert _sub_channel(mock_ddg1, ch).reference.get() == refs.value

        # Check default trigger source
        assert mock_ddg1.trigger_source.get() == DDG1_DEFAULT_TRIGGER_SOURCE.value

        # Check proc state mode
        assert mock_ddg1.state.proc_status_mode.get() == PROC_EVENT_MODE.EVENT.value

        # Check the poll thread is started
        assert mock_ddg1._poll_thread.is_alive()
        assert not mock_ddg1._poll_thread_kill_event.is_set()
        assert not mock_ddg1._poll_thread_poll_loop_done.is_set()
        assert not mock_ddg1._poll_thread_run_event.is_set()


def test_ddg1_prepare_mcs(mock_ddg1: DDG1, mock_mcs_csaxs: MCSCardCSAXS):
    """Test the _prepare_mcs_on_trigger method of DDG1."""
    mcs = mock_mcs_csaxs
    ddg = mock_ddg1

    # Simulate default state
    mcs.acquiring._read_pv.mock_data = 0  # not acquiring
    mcs.erase_start.put(0)  # reset erase start

    # Prepare MCS on trigger
    st = ddg._prepare_mcs_on_trigger(mcs)
    assert st.done is False
    assert st.success is False
    assert mcs.erase_start.get() == 1  # erase started

    # Simulate acquiring started
    mcs.acquiring._read_pv.mock_data = 1  # acquiring
    st.wait(2)
    assert st.done is True
    assert st.success is True


def test_ddg1_stage(mock_ddg1: DDG1):
    """Test the on_stage method of DDG1, for a regular scan and for a fly scan."""
    exp_time = 0.1
    frames_per_trigger = 10
    mock_ddg1.burst_mode.put(0)  # Non-default, should be reset on stage
    mock_ddg1.burst_delay.put(5)  # Non-default, should be reset on stage
    mock_ddg1.burst_count.put(10)  # Non-default, should be reset on stage
    mock_ddg1.scan_info.msg.scan_parameters["exp_time"] = exp_time
    mock_ddg1.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
    mock_ddg1.fast_shutter_control._read_pv.mock_data = 0  # Simulate shutter control

    mock_ddg1.stage()

    shutter_width = mock_ddg1._shutter_to_open_delay + exp_time * frames_per_trigger
    total_exposure = 2 * mock_ddg1._shutter_to_open_delay + exp_time * frames_per_trigger + 3e-6
    assert np.isclose(mock_ddg1.burst_mode.get(), 1)  # burst mode is enabled
    assert np.isclose(mock_ddg1.burst_delay.get(), 0)
    assert np.isclose(mock_ddg1.burst_period.get(), total_exposure)
    # Trigger DDG2 through EXT/EN
    assert np.isclose(mock_ddg1.ab.delay.get(), 2e-3)
    assert np.isclose(mock_ddg1.ab.width.get(), shutter_width)
    # Shutter channel cd
    assert np.isclose(mock_ddg1.cd.delay.get(), 0)
    assert np.isclose(mock_ddg1.cd.width.get(), shutter_width)
    # MCS channel ef or gate
    assert np.isclose(mock_ddg1.ef.delay.get(), 1e-6)
    assert np.isclose(mock_ddg1.ef.width.get(), 1e-6)
    assert mock_ddg1.staged == ophyd.Staged.yes

    mock_ddg1.unstage()

    # Test if shutter is kept open..
    mock_ddg1.fast_shutter_control._read_pv.mock_data = 1  # Simulate shutter is kept open
    # Test method
    mock_ddg1.keep_shutter_open_during_scan(True)
    shutter_width = mock_ddg1._shutter_to_open_delay + exp_time * frames_per_trigger
    assert np.isclose(
        shutter_width, exp_time * frames_per_trigger
    )  # Shutter to open delay is not added as shutter is kept open

    # Simulate fly scan, so no extra trigger for MCS card.
    mock_ddg1.scan_info.msg.scan_type = "fly"
    mock_ddg1.stage()
    # Shutter channel cd
    assert np.isclose(mock_ddg1.cd.delay.get(), 0)
    assert np.isclose(mock_ddg1.cd.width.get(), shutter_width)
    # MCS channel ef or gate
    assert np.isclose(mock_ddg1.ef.delay.get(), 0)
    assert np.isclose(mock_ddg1.ef.width.get(), 0)  # No triggering of MCS due to fly scan


def test_ddg1_on_trigger(mock_ddg1: DDG1):
    """
    Test the on_trigger method of the DDG1. We will test two scenarios:

    I.  Trigger is prepared, and resolves successfully after END_OF_BURST
        is reached in event status register.

    II. Trigger is called while _poll_thread_loop_done is not yet finished
        from a previous trigger. This may be the case if polling is yet to finish.
        The next on_trigger should terminate the previous polling, and work as expected.
        In addition, we will simulate that the mcs card is disabled, thus not prepared.
    """
    ddg = mock_ddg1
    # Make sure DDG is setup in default state through on_connected
    ddg.on_connected()

    # Check that poll thread is running and run event is not set
    assert ddg._poll_thread.is_alive()
    assert not ddg._poll_thread_run_event.is_set()
    assert not ddg._poll_thread_poll_loop_done.is_set()

    # Set the status register bit
    ddg.state.event_status._read_pv.mock_data = STATUSBITS.ABORT_DELAY.value

    #################################
    # Scenario I - normal operation #
    #################################
    with mock.patch.object(ddg, "_prepare_mcs_on_trigger") as mock_prepare_mcs:
        mock_prepare_mcs.return_value = ophyd.StatusBase(done=True, success=True)

        # MCS card is present and enabled, should call prepare_mcs_on_trigger
        # and the status should resolve once acquiring goes from 1 to 0.
        status = ddg.trigger()
        assert status.done is False
        mcs = ddg.device_manager.devices.get("mcs", None)
        assert mcs is not None
        mcs.acquiring._read_pv.mock_data = 1  # Simulate acquiring started
        assert status.done is False
        mcs.acquiring._read_pv.mock_data = 0  # Simulate acquiring stopped
        status.wait(timeout=1)  # Wait for the status to be done
        assert status.done is True
        assert status.success is True
        mock_prepare_mcs.assert_called_once()

        # Now we disable the mcs card, and trigger again. This should not call
        # prepare_mcs_on_trigger and should fallback to polling the DDG for
        # END_OF_BURST status bit.
        # Disable mcs card
        mcs.enabled = False
        status = ddg.trigger()
        # Still exactly the one call from the trigger with the MCS card enabled
        mock_prepare_mcs.assert_called_once()
        # Check that the poll thread run event is set
        # Careful in debugger, there is a timeout based on the exp_time + 5s default
        assert ddg._poll_thread_run_event.is_set()
        assert not ddg._poll_thread_poll_loop_done.is_set()
        assert status.done is False
        assert status.success is False
        assert ddg.trigger_shot.get() == 1

        # Simulate that the event status bit reaches END_OF_BURST
        ddg.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value
        status.wait(timeout=1)  # Wait for the status to be done
        assert status.done is True
        assert status.success is True
        # Should finish the poll loop
        ddg._poll_thread_poll_loop_done.wait(timeout=1)
        assert not ddg._poll_thread_run_event.is_set()

    ############################################
    # Scenario II - previous poll not finished #
    #             MCS card disabled            #
    ############################################

    # Set mcs card to enabled = False
    ddg.device_manager.devices["mcs"].enabled = False
    ddg.state.event_status._read_pv.mock_data = STATUSBITS.ABORT_DELAY.value
    # Start a poll loop by hand, so that the next trigger finds polling still running
    ddg._start_polling()
    assert ddg._poll_thread_run_event.is_set()

    with mock.patch.object(ddg, "_prepare_mcs_on_trigger") as mock_prepare_mcs:
        status = ddg.trigger()
        mock_prepare_mcs.assert_not_called()  # MCS is disabled, should not be called
        assert status.done is False
        assert status.success is False

        # Resolve the status by simulating END_OF_BURST
        ddg.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value
        status.wait(timeout=1)  # Wait for the status to be done
        assert status.done is True
        assert status.success is True
        # Wait for poll loop to finish
        ddg._poll_thread_poll_loop_done.wait(timeout=1)
        assert not ddg._poll_thread_run_event.is_set()


# NOTE(review): this test is disabled and no active test covers DDG1.stop(); the reason for
# disabling it is not recorded in this file -- re-enable or delete once clarified.
# def test_ddg1_stop(mock_ddg1):
#     """Test the on_stop method of DDG1."""
#     mock_ddg1.burst_mode.put(1)  # Enable burst mode
#     mock_ddg1.stop()
#     assert mock_ddg1.burst_mode.get() == 0  # Burst mode is disabled


#########################
### Test DDG2 Device ####
#########################


@pytest.fixture(scope="function")
def mock_ddg2(mock_mcs_csaxs: MCSCardCSAXS) -> Generator[DDG2, None, None]:
    """Fixture to mock the DDG2 device."""
    # Reuse the device manager of the MCS fixture, so "mcs" and "ddg2" are registered
    # in the same device manager.
    dm_mock = mock_mcs_csaxs.device_manager
    with patched_device(
        DDG2, name="ddg2", prefix="test_ddg2:", device_manager=dm_mock, _mock_pv_initial_value=0
    ) as dev:
        dev.enabled = True
        dev.device_manager.devices["ddg2"] = dev
        try:
            yield dev
        finally:
            dev.destroy()


def test_ddg2_on_connected(mock_ddg2: DDG2):
    """Test the on_connected method of DDG2."""
    mock_ddg2.burst_mode.put(1)  # Set burst mode to 1, if connected should reset it to 0
    mock_ddg2.burst_delay.put(5)  # Set to non-zero value before connecting
    mock_ddg2.burst_count.put(10)  # Set to non-default value before connecting

    with mock.patch.object(mock_ddg2, "set_io_values") as mock_set_io_values:
        mock_ddg2.on_connected()

        # Burst mode defaults
        assert mock_ddg2.burst_mode.get() == 0

        # One set_io_values call per entry of the default IO config
        assert mock_set_io_values.call_count == len(DDG2_DEFAULT_IO_CONFIG)
        for ch, config in DDG2_DEFAULT_IO_CONFIG.items():
            assert mock.call(ch, **config) in mock_set_io_values.call_args_list

        # Check reference values from DEFAULT_REFERENCES
        for ch, refs in DDG2_DEFAULT_REFERENCES:
            assert _sub_channel(mock_ddg2, ch).reference.get() == refs.value

        # Check default trigger source
        assert mock_ddg2.trigger_source.get() == DDG2_DEFAULT_TRIGGER_SOURCE.value


def test_ddg2_on_stage(mock_ddg2: DDG2):
    """
    Test the on_stage method of DDG2. We will test two scenarios:

    I.  Stage device with valid parameters.

    II. Stage device with invalid parameters (too short exp_time). Should raise ValueError.
    """
    ddg = mock_ddg2
    exp_time = 0.1
    frames_per_trigger = 10

    ddg.on_connected()
    ddg.scan_info.msg.scan_parameters["exp_time"] = exp_time
    ddg.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger

    # Set non-default burst mode settings
    ddg.burst_mode.put(0)
    ddg.burst_delay.put(5)

    # Stage device with valid parameters
    ddg.stage()

    assert ddg.staged == ophyd.Staged.yes
    assert ddg.burst_mode.get() == 1  # Burst mode is enabled
    assert ddg.burst_delay.get() == 0  # Burst delay is set to 0
    assert ddg.burst_count.get() == frames_per_trigger
    assert ddg.burst_period.get() == exp_time

    # Pulse width is exp_time - readout_time
    burst_pulse_width = exp_time - DDG2_DEFAULT_READOUT_TIMES["ab"]
    assert np.isclose(ddg.ab.delay.get(), 0)
    assert np.isclose(ddg.ab.width.get(), burst_pulse_width)

    # Unstage to reset the staged state for the second scenario
    ddg.unstage()

    exp_time_short = 2e-4  # too short exposure time
    # Keep the setup outside of pytest.raises, so only stage() can satisfy the expectation
    ddg.scan_info.msg.scan_parameters["exp_time"] = exp_time_short
    with pytest.raises(ValueError):
        ddg.stage()


def test_ddg2_on_trigger(mock_ddg2: DDG2):
    """Test the on_trigger method of DDG2."""
    ddg = mock_ddg2
    ddg.on_connected()

    ddg.trigger_shot.put(0)
    status = ddg.trigger()
    assert ddg.trigger_shot.get() == 0  # Should not trigger DDG2 via soft trigger
    # Bounded wait: fail the test instead of blocking forever if the status never resolves
    status.wait(timeout=5)
    assert status.done is True
    assert status.success is True


def test_ddg2_on_stop(mock_ddg2: DDG2):
    """Test the on_stop method of DDG2."""
    ddg = mock_ddg2
    ddg.on_connected()
    ddg.burst_mode.put(1)  # Enable burst mode
    ddg.stop()
    assert ddg.burst_mode.get() == 0  # Burst mode is disabled