tests: fix tests for ddg1
This commit is contained in:
@@ -282,10 +282,11 @@ def test_ddg1_stage(mock_ddg1: DDG1):
|
||||
|
||||
mock_ddg1.scan_info.msg.scan_parameters["exp_time"] = exp_time
|
||||
mock_ddg1.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
|
||||
mock_ddg1.fast_shutter_control._read_pv.mock_data = 0 # Simulate shutter control
|
||||
|
||||
mock_ddg1.stage()
|
||||
|
||||
shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3
|
||||
shutter_width = mock_ddg1._shutter_to_open_delay + exp_time * frames_per_trigger
|
||||
|
||||
assert np.isclose(mock_ddg1.burst_mode.get(), 1) # burst mode is enabled
|
||||
assert np.isclose(mock_ddg1.burst_delay.get(), 0)
|
||||
@@ -302,6 +303,25 @@ def test_ddg1_stage(mock_ddg1: DDG1):
|
||||
assert np.isclose(mock_ddg1.ef.width.get(), 1e-6)
|
||||
|
||||
assert mock_ddg1.staged == ophyd.Staged.yes
|
||||
mock_ddg1.unstage()
|
||||
|
||||
# Test if shutter is kept open..
|
||||
mock_ddg1.fast_shutter_control._read_pv.mock_data = 1 # Simulate shutter control is kept open
|
||||
# Test method
|
||||
mock_ddg1.keep_shutter_open_during_scan(True)
|
||||
shutter_width = mock_ddg1._shutter_to_open_delay + exp_time * frames_per_trigger
|
||||
assert np.isclose(
|
||||
shutter_width, exp_time * frames_per_trigger
|
||||
) # Shutter to open delay is not added as shutter is kept open
|
||||
# Simulate fly scan, so no extra trigger for MCS card.
|
||||
mock_ddg1.scan_info.msg.scan_type = "fly"
|
||||
mock_ddg1.stage()
|
||||
# Shutter channel cd
|
||||
assert np.isclose(mock_ddg1.cd.delay.get(), 0)
|
||||
assert np.isclose(mock_ddg1.cd.width.get(), shutter_width)
|
||||
# MCS channel ef or gate
|
||||
assert np.isclose(mock_ddg1.ef.delay.get(), 0)
|
||||
assert np.isclose(mock_ddg1.ef.width.get(), 0) # No triggering of MCS due to shutter fly scan
|
||||
|
||||
|
||||
def test_ddg1_on_trigger(mock_ddg1: DDG1):
|
||||
@@ -331,9 +351,28 @@ def test_ddg1_on_trigger(mock_ddg1: DDG1):
|
||||
#################################
|
||||
with mock.patch.object(ddg, "_prepare_mcs_on_trigger") as mock_prepare_mcs:
|
||||
mock_prepare_mcs.return_value = ophyd.StatusBase(done=True, success=True)
|
||||
# MCS card is present and enabled, should call prepare_mcs_on_trigger
|
||||
# and the status should resolve once acuiring goes from 1 to 0.
|
||||
status = ddg.trigger()
|
||||
assert status.done is False
|
||||
mcs = ddg.device_manager.devices.get("mcs", None)
|
||||
assert mcs is not None
|
||||
mcs.acquiring._read_pv.mock_data = 1 # Simulate acquiring started
|
||||
assert status.done is False
|
||||
mcs.acquiring._read_pv.mock_data = 0 # Simulate acquiring stopped
|
||||
status.wait(timeout=1) # Wait for the status to be done
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
mock_prepare_mcs.assert_called_once()
|
||||
|
||||
# Now we disable the mcs card, and trigger again. This should not call prepare_mcs_on_trigger
|
||||
# and should fallback to polling the DDG for END_OF_BURST status bit.
|
||||
|
||||
# Disable mcs card
|
||||
mcs.enabled = False
|
||||
status = ddg.trigger()
|
||||
# Check that the poll thread run event is set
|
||||
# Careful in debugger, there is a timeout based on the exp_time + 5s default
|
||||
assert ddg._poll_thread_run_event.is_set()
|
||||
assert not ddg._poll_thread_poll_loop_done.is_set()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user