diff --git a/tests/tests_devices/test_delay_generator_csaxs.py b/tests/tests_devices/test_delay_generator_csaxs.py index 18079c6..6ba4ecb 100644 --- a/tests/tests_devices/test_delay_generator_csaxs.py +++ b/tests/tests_devices/test_delay_generator_csaxs.py @@ -282,10 +282,11 @@ def test_ddg1_stage(mock_ddg1: DDG1): mock_ddg1.scan_info.msg.scan_parameters["exp_time"] = exp_time mock_ddg1.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger + mock_ddg1.fast_shutter_control._read_pv.mock_data = 0 # Simulate shutter control mock_ddg1.stage() - shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3 + shutter_width = mock_ddg1._shutter_to_open_delay + exp_time * frames_per_trigger assert np.isclose(mock_ddg1.burst_mode.get(), 1) # burst mode is enabled assert np.isclose(mock_ddg1.burst_delay.get(), 0) @@ -302,6 +303,25 @@ def test_ddg1_stage(mock_ddg1: DDG1): assert np.isclose(mock_ddg1.ef.width.get(), 1e-6) assert mock_ddg1.staged == ophyd.Staged.yes + mock_ddg1.unstage() + + # Test if shutter is kept open.. + mock_ddg1.fast_shutter_control._read_pv.mock_data = 1 # Simulate shutter control is kept open + # Test method + mock_ddg1.keep_shutter_open_during_scan(True) + shutter_width = mock_ddg1._shutter_to_open_delay + exp_time * frames_per_trigger + assert np.isclose( + shutter_width, exp_time * frames_per_trigger + ) # Shutter to open delay is not added as shutter is kept open + # Simulate fly scan, so no extra trigger for MCS card. + mock_ddg1.scan_info.msg.scan_type = "fly" + mock_ddg1.stage() + # Shutter channel cd + assert np.isclose(mock_ddg1.cd.delay.get(), 0) + assert np.isclose(mock_ddg1.cd.width.get(), shutter_width) + # MCS channel ef or gate + assert np.isclose(mock_ddg1.ef.delay.get(), 0) + assert np.isclose(mock_ddg1.ef.width.get(), 0) # No triggering of MCS due to shutter fly scan def test_ddg1_on_trigger(mock_ddg1: DDG1): @@ -331,9 +351,28 @@ def test_ddg1_on_trigger(mock_ddg1: DDG1): ################################# with mock.patch.object(ddg, "_prepare_mcs_on_trigger") as mock_prepare_mcs: mock_prepare_mcs.return_value = ophyd.StatusBase(done=True, success=True) + # MCS card is present and enabled, should call prepare_mcs_on_trigger + # and the status should resolve once acuiring goes from 1 to 0. status = ddg.trigger() + assert status.done is False + mcs = ddg.device_manager.devices.get("mcs", None) + assert mcs is not None + mcs.acquiring._read_pv.mock_data = 1 # Simulate acquiring started + assert status.done is False + mcs.acquiring._read_pv.mock_data = 0 # Simulate acquiring stopped + status.wait(timeout=1) # Wait for the status to be done + assert status.done is True + assert status.success is True + mock_prepare_mcs.assert_called_once() + # Now we disable the mcs card, and trigger again. This should not call prepare_mcs_on_trigger + # and should fallback to polling the DDG for END_OF_BURST status bit. + + # Disable mcs card + mcs.enabled = False + status = ddg.trigger() # Check that the poll thread run event is set + # Careful in debugger, there is a timeout based on the exp_time + 5s default assert ddg._poll_thread_run_event.is_set() assert not ddg._poll_thread_poll_loop_done.is_set()