231 lines
8.0 KiB
Python
231 lines
8.0 KiB
Python
# pylint: skip-file
|
|
import os
|
|
import threading
|
|
from typing import Generator
|
|
from unittest import mock
|
|
|
|
import ophyd
|
|
import pytest
|
|
from bec_lib import messages
|
|
from bec_lib.endpoints import MessageEndpoints
|
|
from bec_lib.file_utils import get_full_path
|
|
from bec_server.device_server.tests.utils import DMMock
|
|
from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError
|
|
from ophyd_devices.tests.utils import patched_device
|
|
|
|
from csaxs_bec.devices.epics.falcon_csaxs import (
|
|
ACQUIRESTATUS,
|
|
FalconcSAXS,
|
|
MappingSource,
|
|
TriggerSource,
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="function")
def mock_det() -> Generator[FalconcSAXS, None, None]:
    """Yield a FalconcSAXS device that is built on mocked PVs.

    The device is created through ``patched_device`` with the name
    ``"falcon"``, the prefix ``"X12SA-SITORO:"``, a ``DMMock`` device manager
    and an initial mock PV value of 1. Before the device is handed to the
    test, the stage and trigger signals of every sub-device are cleared, and
    each sub-device that exposes ``plugin_type`` gets the mocked read PV of
    that signal seeded with its own ``_plugin_type``. The device is destroyed
    on teardown, also when the test fails.

    Yields:
        FalconcSAXS: The patched device instance.
    """
    dm = DMMock()
    with patched_device(
        FalconcSAXS,
        name="falcon",
        prefix="X12SA-SITORO:",
        device_manager=dm,
        _mock_pv_initial_value=1,
    ) as dev:
        try:
            # Only the sub-device object is needed, not its dotted name.
            for _, device in dev.walk_subdevices(include_lazy=True):
                device.stage_sigs = {}  # Remove stage signals
                device.trigger_sigs = {}  # Remove trigger signals
                if hasattr(device, "plugin_type"):
                    # NOTE(review): presumably needed so that the plugin type
                    # reported by the mocked PV matches the plugin class - confirm.
                    device.plugin_type._read_pv.mock_data = device._plugin_type
            yield dev
        finally:
            dev.destroy()
|
|
|
|
|
|
def test_falcon_init(mock_det: FalconcSAXS):
    """Check the attribute values of a freshly created FalconcSAXS device."""
    det = mock_det

    # The readout time starts at the class-level minimum.
    assert det._readout_time == det.MIN_READOUT
    # Buffering defaults.
    assert det._value_pixel_per_buffer == 20
    assert det._queue_size == 2000
    # The output file path starts out empty.
    assert det._full_path == ""
|
|
|
|
|
|
def test_falcon_on_connected(mock_det: FalconcSAXS):
    """Verify that on_connected stops the detector, sets the trigger and writes the defaults."""
    det = mock_det
    hdf5 = det.hdf5

    # Seed a few signals with -1, a value that differs from every default asserted below.
    for signal in (
        det.preset_mode,
        det.input_logic_polarity,
        det.auto_pixels_per_buffer,
        hdf5.enable,
    ):
        signal.put(-1)

    stop_patch = mock.patch.object(det, "on_stop")
    trigger_patch = mock.patch.object(det, "set_trigger")
    with stop_patch as mock_on_stop, trigger_patch as mock_set_trigger:
        det.on_connected()

        # on_connected must stop the detector once and configure the trigger once.
        mock_on_stop.assert_called_once()
        mock_set_trigger.assert_called_once_with(
            mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
        )

        # Defaults written to the detector PVs
        assert det.preset_mode.get() == "1"  # Real Time
        assert det.input_logic_polarity.get() == 0
        assert det.auto_pixels_per_buffer.get() == 0
        assert det.pixels_per_buffer.get() == det._value_pixel_per_buffer

        # Defaults written to the file-writer backend PVs
        assert hdf5.enable.get() == "1"  # Enabled
        assert hdf5.xml_file_name.get() == "layout.xml"
        assert hdf5.lazy_open.get() == "1"  # Enabled
        assert hdf5.temp_suffix.get() == ""
        assert hdf5.queue_size.get() == det._queue_size
        assert det.nd_array_mode.get() == 1
        assert hdf5.file_template.get() == "%s%s"
        assert hdf5.file_write_mode.get() == 2
|
|
|
|
|
|
def test_falcon_on_stage(mock_det: FalconcSAXS):
    """
    Exercise staging of the FalconcSAXS device.

    The values stored in the scan_info attribute are used to prepare the
    detector for the upcoming acquisition. Two scenarios are covered:
        I. Exposure time larger than the readout time: staging succeeds.
        II. Exposure time smaller than the readout time: staging raises a ValueError.
    """
    det = mock_det
    num_points, frames_per_trigger, exp_time = 10, 5, 0.2
    expected_frames = num_points * frames_per_trigger

    det.scan_info.msg.num_points = num_points
    scan_parameters = det.scan_info.msg.scan_parameters
    scan_parameters["frames_per_trigger"] = frames_per_trigger
    scan_parameters["exp_time"] = exp_time
    det.hdf5.array_counter.put(5)  # Non-zero, so that the reset to 0 is observable

    # I. Staging with a valid exposure time
    det.stage()

    assert det.staged is ophyd.Staged.yes
    assert det._full_path == get_full_path(det.scan_info.msg, det.name)
    # The directory and file name written to the backend must combine to the full path.
    directory = det.hdf5.file_path.get()
    basename = det.hdf5.file_name.get()
    assert os.path.join(directory, basename) == det._full_path

    assert det.preset_real_time.get() == exp_time
    assert det.pixels_per_run.get() == expected_frames
    assert det.hdf5.num_capture.get() == expected_frames
    assert det.hdf5.array_counter.get() == 0
    assert det.hdf5.capture.get() == 1
    assert det.start_all.get() == 1

    # II. Unstage first, then stage again with a too short exposure time
    det.unstage()
    det.scan_info.msg.scan_parameters["exp_time"] = 1e-3  # Smaller than readout time
    with pytest.raises(ValueError):
        det.stage()
    # The failed staging attempt must not leave the device in the "no" state.
    assert det.staged is not ophyd.Staged.no
|
|
|
|
|
|
def test_falcon_on_pre_scan(mock_det: FalconcSAXS):
    """Test the on_pre_scan method of the FalconcSAXS device.

    Two scenarios are covered:
        I. The returned status stays pending until both the acquire-busy PV and
           the HDF5 capture PV read ACQUIRING, then resolves successfully.
        II. Calling stop() while the status is pending makes wait() raise a
            DeviceStoppedError and leaves the status done but unsuccessful.
    """
    falcon = mock_det

    # I. Test normal case with success
    falcon.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE
    falcon.hdf5.capture._read_pv.mock_data = ACQUIRESTATUS.DONE
    st = falcon.on_pre_scan()
    assert st.done is False
    assert st.success is False
    # Only one of the two PVs reports ACQUIRING: the status must still be pending.
    falcon.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING
    assert st.done is False
    assert st.success is False
    # Both PVs report ACQUIRING: the status is expected to resolve within the timeout.
    falcon.hdf5.capture._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING
    st.wait(3)
    assert st.done is True
    assert st.success is True

    # II. Test abort case with stop called
    falcon.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE
    falcon.hdf5.capture._read_pv.mock_data = ACQUIRESTATUS.DONE
    st = falcon.on_pre_scan()
    assert st.done is False
    assert st.success is False
    falcon.stop()
    with pytest.raises(DeviceStoppedError):
        st.wait(3)
    # Checked after the raises-block so that these assertions are actually executed.
    assert st.done is True
    assert st.success is False
|
|
|
|
|
|
def test_falcon_stop(mock_det: FalconcSAXS):
    """Check the PV values that stop() leaves behind on the FalconcSAXS device."""
    det = mock_det

    # Start from the opposite of each value that is asserted after stop().
    det.stop_all.put(0)
    det.hdf5.capture.put(1)
    det.erase_all.put(0)

    det.stop()

    assert det.stop_all.get() == 1
    assert det.hdf5.capture.get() == 0
    assert det.erase_all.get() == 1
|
|
|
|
|
|
def test_falcon_complete(mock_det: FalconcSAXS):
    """Exercise on_complete for a finished and for an interrupted acquisition."""
    det = mock_det
    fake_file = "/tmp/fake_path/test.h5"

    num_points, frames_per_trigger = 10, 5
    total_frames = num_points * frames_per_trigger
    det.scan_info.msg.num_points = num_points
    det.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger

    def expected_file_event(successful: bool) -> messages.FileMessage:
        """Build the file event expected after on_complete has resolved."""
        return messages.FileMessage(
            file_path=fake_file,
            done=True,
            successful=successful,
            device_name=det.name,
            file_type="h5",
            hinted_h5_entries=None,
            metadata={},
        )

    # I. Acquisition finishes: both counters reach the expected number of frames
    det.dxp.current_pixel._read_pv.mock_data = total_frames - 1
    det.hdf5.array_counter._read_pv.mock_data = total_frames - 1
    det._full_path = fake_file
    status = det.on_complete()
    assert status.done is False
    assert status.success is False
    # Only the pixel counter is complete: the status must still be pending.
    det.dxp.current_pixel._read_pv.mock_data = total_frames
    assert status.done is False
    assert status.success is False
    det.hdf5.array_counter._read_pv.mock_data = total_frames
    status.wait(3)
    assert status.done is True
    assert status.success is True
    assert det.file_event.get() == expected_file_event(True)

    # II. Acquisition is interrupted by stop() before the pixel counter is complete
    det.dxp.current_pixel._read_pv.mock_data = total_frames - 1
    status = det.on_complete()
    assert status.done is False
    assert status.success is False
    det.stop()
    with pytest.raises(DeviceStoppedError):
        status.wait(3)
    assert det.file_event.get() == expected_file_event(False)
|