test(eiger): cleanup and add tests
This commit is contained in:
@@ -53,18 +53,3 @@ eiger_1_5:
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: False
|
||||
|
||||
samx:
|
||||
readoutPriority: baseline
|
||||
deviceClass: ophyd_devices.SimPositioner
|
||||
deviceConfig:
|
||||
delay: 1
|
||||
limits:
|
||||
- -50
|
||||
- 50
|
||||
tolerance: 0.01
|
||||
update_frequency: 400
|
||||
deviceTags:
|
||||
- user motors
|
||||
enabled: true
|
||||
readOnly: false
|
||||
|
||||
@@ -281,7 +281,7 @@ class Eiger(PSIDeviceBase):
|
||||
logger.info(f"Acquisition done callback called for {self.name} for status {status.success}")
|
||||
self.file_event.put(
|
||||
file_path=self._full_path,
|
||||
done=True,
|
||||
done=status.done,
|
||||
successful=status.success,
|
||||
hinted_h5_entries={"data": "entry/data/data"},
|
||||
)
|
||||
|
||||
@@ -25,7 +25,7 @@ DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M""
|
||||
|
||||
|
||||
# pylint:disable=invalid-name
|
||||
class Eiger1_5M(Eiger):
|
||||
class Eiger9M(Eiger):
|
||||
"""
|
||||
Eiger 1.5M specific integration for the in-vaccum Eiger.
|
||||
|
||||
|
||||
@@ -74,6 +74,8 @@ class JungfrauJochClient:
|
||||
"""Set the connected status"""
|
||||
self._initialised = value
|
||||
|
||||
# TODO this is not correct, as it may be that the state in INACTIVE. Models are not in sync...
|
||||
# REMOVE all model enums as most of the validation takes place in the Pydantic models, i.e. BrokerStatus here..
|
||||
@property
|
||||
def detector_state(self) -> DetectorState:
|
||||
"""Get the status of JungfrauJoch"""
|
||||
|
||||
318
tests/tests_devices/test_eiger.py
Normal file
318
tests/tests_devices/test_eiger.py
Normal file
@@ -0,0 +1,318 @@
|
||||
# pylint: skip-file
|
||||
import os
|
||||
import threading
|
||||
from time import time
|
||||
from typing import TYPE_CHECKING, Generator
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from bec_lib.messages import FileMessage, ScanStatusMessage
|
||||
from jfjoch_client.models.broker_status import BrokerStatus
|
||||
from jfjoch_client.models.dataset_settings import DatasetSettings
|
||||
from jfjoch_client.models.detector_list import DetectorList
|
||||
from jfjoch_client.models.detector_list_element import DetectorListElement
|
||||
from jfjoch_client.models.detector_settings import DetectorSettings
|
||||
from jfjoch_client.models.detector_timing import DetectorTiming
|
||||
from jfjoch_client.models.file_writer_format import FileWriterFormat
|
||||
from jfjoch_client.models.file_writer_settings import FileWriterSettings
|
||||
from jfjoch_client.models.measurement_statistics import MeasurementStatistics
|
||||
from ophyd import Staged
|
||||
from ophyd_devices.utils.psi_device_base_utils import DeviceStatus
|
||||
|
||||
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
|
||||
from csaxs_bec.devices.jungfraujoch.eiger_1_5m import Eiger1_5M
|
||||
from csaxs_bec.devices.jungfraujoch.eiger_9m import Eiger9M
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from bec_lib.messages import FileMessage
|
||||
|
||||
# @pytest.fixture(scope="function")
|
||||
# def scan_worker_mock(scan_server_mock):
|
||||
# scan_server_mock.device_manager.connector = mock.MagicMock()
|
||||
# scan_worker = ScanWorker(parent=scan_server_mock)
|
||||
# yield scan_worker
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope="function",
|
||||
params=[(0.1, 1, 1, "line_scan"), (0.2, 2, 2, "time_scan"), (0.5, 5, 5, "acquire")],
|
||||
)
|
||||
def mock_scan_info(request, tmpdir):
|
||||
exp_time, frames_per_trigger, num_points, scan_name = request.param
|
||||
scan_info = ScanStatusMessage(
|
||||
scan_id="test_id",
|
||||
status="open",
|
||||
scan_number=1,
|
||||
scan_parameters={
|
||||
"exp_time": exp_time,
|
||||
"frames_per_trigger": frames_per_trigger,
|
||||
"system_config": {},
|
||||
},
|
||||
info={"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")},
|
||||
num_points=num_points,
|
||||
scan_name=scan_name,
|
||||
)
|
||||
yield scan_info
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", params=[(1,), (2,)])
|
||||
def detector_list(request) -> Generator[DetectorList, None, None]:
|
||||
"""Fixture for the detector list."""
|
||||
current_id = request.param[0]
|
||||
detector_list = DetectorList(
|
||||
detectors=[
|
||||
DetectorListElement(
|
||||
id=1,
|
||||
description="EIGER 1.5M",
|
||||
serial_number="123456",
|
||||
base_ipv4_addr="192.168.0.1",
|
||||
udp_interface_count=1,
|
||||
nmodules=1,
|
||||
width=512,
|
||||
height=512,
|
||||
pixel_size_mm=0.1,
|
||||
readout_time_us=100,
|
||||
min_frame_time_us=1000,
|
||||
min_count_time_us=100,
|
||||
type="EIGER",
|
||||
),
|
||||
DetectorListElement(
|
||||
id=2,
|
||||
description="EIGER 8.5M (tmp)",
|
||||
serial_number="123456",
|
||||
base_ipv4_addr="192.168.0.1",
|
||||
udp_interface_count=1,
|
||||
nmodules=1,
|
||||
width=512,
|
||||
height=512,
|
||||
pixel_size_mm=0.1,
|
||||
readout_time_us=100,
|
||||
min_frame_time_us=1000,
|
||||
min_count_time_us=100,
|
||||
type="EIGER",
|
||||
),
|
||||
],
|
||||
current_id=current_id,
|
||||
)
|
||||
yield detector_list
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]:
|
||||
"""Fixture for the Eiger 1.5M device."""
|
||||
name = "eiger_1_5m"
|
||||
dev = Eiger1_5M(name=name, beam_center=(256, 256), detector_distance=100.0)
|
||||
dev.scan_info.msg = mock_scan_info
|
||||
yield dev
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]:
|
||||
"""Fixture for the Eiger 9M device.
|
||||
Currently only on_connected is different for both devices, all other methods are the same."""
|
||||
name = "eiger_9m"
|
||||
dev = Eiger9M(name=name)
|
||||
dev.scan_info.msg = mock_scan_info
|
||||
yield dev
|
||||
|
||||
|
||||
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
|
||||
def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state):
|
||||
"""Test the on_connected logic of the Eiger detector."""
|
||||
eiger = eiger_1_5m
|
||||
detector_id = 1
|
||||
with (
|
||||
mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
|
||||
mock.patch.object(
|
||||
eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list
|
||||
),
|
||||
mock.patch.object(
|
||||
eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state)
|
||||
),
|
||||
mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det,
|
||||
mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer,
|
||||
mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client,
|
||||
):
|
||||
if detector_state != "Idle" or detector_list.current_id != detector_id:
|
||||
with pytest.raises(RuntimeError):
|
||||
eiger.on_connected()
|
||||
mock_jfj_client_stop.assert_called_once()
|
||||
assert mock_jfj_preview_client.call_count == 0
|
||||
else:
|
||||
eiger.on_connected()
|
||||
assert mock_set_det.call_args == mock.call(
|
||||
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
|
||||
)
|
||||
assert mock_file_writer.call_args == mock.call(
|
||||
file_writer_settings=FileWriterSettings(
|
||||
overwrite=True, format=FileWriterFormat.NXMXVDS
|
||||
),
|
||||
_request_timeout=10,
|
||||
)
|
||||
mock_jfj_client_stop.assert_called_once()
|
||||
assert mock_jfj_preview_client.connect.call_count == 1
|
||||
assert mock_jfj_preview_client.start.call_count == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
|
||||
def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state):
|
||||
"""Test the on_connected logic of the Eiger detector."""
|
||||
eiger = eiger_9m
|
||||
detector_id = 2
|
||||
with (
|
||||
mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
|
||||
mock.patch.object(
|
||||
eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list
|
||||
),
|
||||
mock.patch.object(
|
||||
eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state)
|
||||
),
|
||||
mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det,
|
||||
mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer,
|
||||
mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client,
|
||||
):
|
||||
if detector_state != "Idle" or detector_list.current_id != detector_id:
|
||||
with pytest.raises(RuntimeError):
|
||||
eiger.on_connected()
|
||||
mock_jfj_client_stop.assert_called_once()
|
||||
assert mock_jfj_preview_client.call_count == 0
|
||||
else:
|
||||
eiger.on_connected()
|
||||
assert mock_set_det.call_args == mock.call(
|
||||
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
|
||||
)
|
||||
assert mock_file_writer.call_args == mock.call(
|
||||
file_writer_settings=FileWriterSettings(
|
||||
overwrite=True, format=FileWriterFormat.NXMXVDS
|
||||
),
|
||||
_request_timeout=10,
|
||||
)
|
||||
mock_jfj_client_stop.assert_called_once()
|
||||
assert mock_jfj_preview_client.connect.call_count == 1
|
||||
assert mock_jfj_preview_client.start.call_count == 1
|
||||
|
||||
|
||||
@pytest.mark.timeout(20)
|
||||
def test_eiger_on_stop(eiger_1_5m):
|
||||
"""Test the on_stop logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
|
||||
eiger = eiger_1_5m
|
||||
start_event = threading.Event()
|
||||
stop_event = threading.Event()
|
||||
|
||||
def tmp_task():
|
||||
start_event.set()
|
||||
try:
|
||||
while True:
|
||||
time.sleep(0.1)
|
||||
finally:
|
||||
stop_event.set()
|
||||
|
||||
eiger.task_handler.submit_task(tmp_task, run=True)
|
||||
start_event.wait(timeout=5) # Wait for thread to start
|
||||
|
||||
with mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop:
|
||||
eiger.on_stop()
|
||||
mock_jfj_client_stop.assert_called_once()
|
||||
stop_event.wait(timeout=5) # Thread should be killed from task_handler
|
||||
|
||||
|
||||
@pytest.mark.timeout(25)
|
||||
@pytest.mark.parametrize("raise_timeout", [True, False])
|
||||
def test_eiger_on_complete(eiger_1_5m, raise_timeout):
|
||||
"""Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
|
||||
eiger = eiger_1_5m
|
||||
|
||||
callback_completed_event = threading.Event()
|
||||
|
||||
def _callback_complete(status: DeviceStatus):
|
||||
if status.done:
|
||||
callback_completed_event.set()
|
||||
|
||||
unblock_wait_for_idle = threading.Event()
|
||||
|
||||
def mock_wait_for_idle(timeout: int, request_timeout: float):
|
||||
if unblock_wait_for_idle.wait(timeout):
|
||||
if raise_timeout:
|
||||
return False
|
||||
return True
|
||||
return False
|
||||
|
||||
with (
|
||||
mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle),
|
||||
mock.patch.object(
|
||||
eiger.jfj_client.api,
|
||||
"statistics_data_collection_get",
|
||||
return_value=MeasurementStatistics(run_number=1),
|
||||
),
|
||||
):
|
||||
status = eiger.complete()
|
||||
status.add_callback(_callback_complete)
|
||||
assert status.done == False
|
||||
assert status.success == False
|
||||
assert eiger.file_event.get() is None
|
||||
unblock_wait_for_idle.set()
|
||||
if raise_timeout:
|
||||
with pytest.raises(TimeoutError):
|
||||
status.wait(timeout=10)
|
||||
else:
|
||||
status.wait(timeout=10)
|
||||
assert status.done == True
|
||||
assert status.success == False if raise_timeout else True
|
||||
|
||||
|
||||
def test_eiger_file_event_callback(eiger_1_5m, tmp_path):
|
||||
"""Test the file_event callback of the Eiger detector. This is equivalent for 9M and 1_5M."""
|
||||
eiger = eiger_1_5m
|
||||
test_file = tmp_path / "test_file.h5"
|
||||
eiger._full_path = str(test_file)
|
||||
assert eiger.file_event.get() is None
|
||||
status = DeviceStatus(device=eiger, done=True, success=True)
|
||||
eiger._file_event_callback(status)
|
||||
file_msg: FileMessage = eiger.file_event.get()
|
||||
assert file_msg.device_name == eiger.name
|
||||
assert file_msg.file_path == str(test_file)
|
||||
assert file_msg.done is True
|
||||
assert file_msg.successful is True
|
||||
assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
|
||||
status = DeviceStatus(device=eiger, done=False, success=False)
|
||||
eiger._file_event_callback(status)
|
||||
file_msg: FileMessage = eiger.file_event.get()
|
||||
assert file_msg.device_name == eiger.name
|
||||
assert file_msg.file_path == str(test_file)
|
||||
assert file_msg.done is False
|
||||
assert file_msg.successful is False
|
||||
assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
|
||||
|
||||
|
||||
def test_eiger_on_sage(eiger_1_5m):
|
||||
"""Test the on_stage and on_unstage logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
|
||||
eiger = eiger_1_5m
|
||||
scan_msg = eiger.scan_info.msg
|
||||
with (
|
||||
mock.patch.object(eiger.jfj_client, "wait_for_idle", return_value=True),
|
||||
mock.patch.object(eiger.jfj_client, "start") as mock_start,
|
||||
):
|
||||
eiger.stage()
|
||||
assert (
|
||||
eiger._full_path
|
||||
== f"{scan_msg.info['file_components'][0]}_{eiger.name}_master.{scan_msg.info['file_components'][1]}"
|
||||
)
|
||||
file_msg: FileMessage = eiger.file_event.get()
|
||||
assert file_msg.file_path == eiger._full_path
|
||||
assert file_msg.done is False
|
||||
assert file_msg.successful is False
|
||||
assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
|
||||
|
||||
data_settings = DatasetSettings(
|
||||
image_time_us=int(scan_msg.scan_parameters["exp_time"] * 1e6),
|
||||
ntrigger=int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]),
|
||||
file_prefix=os.path.relpath(eiger._full_path, start="/sls/x12sa/data").removesuffix(
|
||||
"_master.h5"
|
||||
),
|
||||
beam_x_pxl=eiger.beam_center[0],
|
||||
beam_y_pxl=eiger.beam_center[1],
|
||||
detector_distance_mm=eiger.detector_distance,
|
||||
incident_energy_ke_v=12.0, # hardcoded at this moment as it is hardcoded in the Eiger implementation
|
||||
)
|
||||
assert mock_start.call_args == mock.call(settings=data_settings)
|
||||
assert eiger.staged is Staged.yes
|
||||
@@ -1,444 +0,0 @@
|
||||
# pylint: skip-file
|
||||
import threading
|
||||
from unittest import mock
|
||||
|
||||
import ophyd
|
||||
import pytest
|
||||
from bec_lib import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
from ophyd_devices.tests.utils import MockPV
|
||||
|
||||
from csaxs_bec.devices.epics.eiger9m_csaxs import Eiger9McSAXS
|
||||
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_det():
|
||||
name = "eiger"
|
||||
prefix = "X12SA-ES-EIGER9M:"
|
||||
dm = DMMock()
|
||||
with mock.patch.object(dm, "connector"):
|
||||
with (
|
||||
mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"),
|
||||
mock.patch(
|
||||
"ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
|
||||
),
|
||||
):
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
with mock.patch.object(Eiger9McSAXS, "_init"):
|
||||
det = Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm)
|
||||
patch_dual_pvs(det)
|
||||
det.TIMEOUT_FOR_SIGNALS = 0.1
|
||||
yield det
|
||||
|
||||
|
||||
def test_init():
|
||||
"""Test the _init function:"""
|
||||
name = "eiger"
|
||||
prefix = "X12SA-ES-EIGER9M:"
|
||||
dm = DMMock()
|
||||
with mock.patch.object(dm, "connector"):
|
||||
with (
|
||||
mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"),
|
||||
mock.patch(
|
||||
"ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
|
||||
),
|
||||
):
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
with (
|
||||
mock.patch(
|
||||
"csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_default_parameter"
|
||||
) as mock_default,
|
||||
mock.patch(
|
||||
"csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_detector"
|
||||
) as mock_init_det,
|
||||
mock.patch(
|
||||
"csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_detector_backend"
|
||||
) as mock_init_backend,
|
||||
):
|
||||
Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm)
|
||||
mock_default.assert_called_once()
|
||||
mock_init_det.assert_called_once()
|
||||
mock_init_backend.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"trigger_source, detector_state, expected_exception", [(2, 1, True), (2, 0, False)]
|
||||
)
|
||||
def test_initialize_detector(mock_det, trigger_source, detector_state, expected_exception):
|
||||
"""Test the _init function:
|
||||
|
||||
This includes testing the functions:
|
||||
- _init_detector
|
||||
- _stop_det
|
||||
- _set_trigger
|
||||
--> Testing the filewriter is done in test_init_filewriter
|
||||
|
||||
Validation upon setting the correct PVs
|
||||
|
||||
"""
|
||||
mock_det.cam.detector_state._read_pv.mock_data = detector_state
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.initialize_detector()
|
||||
else:
|
||||
mock_det.custom_prepare.initialize_detector() # call the method you want to test
|
||||
assert mock_det.cam.acquire.get() == 0
|
||||
assert mock_det.cam.detector_state.get() == detector_state
|
||||
assert mock_det.cam.trigger_mode.get() == trigger_source
|
||||
|
||||
|
||||
def test_trigger(mock_det):
|
||||
"""Test the trigger function:
|
||||
Validate that trigger calls the custom_prepare.on_trigger() function
|
||||
"""
|
||||
with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger:
|
||||
mock_det.trigger()
|
||||
mock_on_trigger.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)]
|
||||
)
|
||||
def test_update_readout_time(mock_det, readout_time, expected_value):
|
||||
if readout_time is None:
|
||||
mock_det.custom_prepare.update_readout_time()
|
||||
assert mock_det.readout_time == expected_value
|
||||
else:
|
||||
mock_det.scaninfo.readout_time = readout_time
|
||||
mock_det.custom_prepare.update_readout_time()
|
||||
assert mock_det.readout_time == expected_value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"eacc, exp_url, daq_status, daq_cfg, expected_exception",
|
||||
[
|
||||
("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 12543}, False),
|
||||
("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 15421}, False),
|
||||
("e12345", "http://xbl-daq-29:5000", {"state": "BUSY"}, {"writer_user_id": 15421}, True),
|
||||
("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_ud": 12345}, True),
|
||||
],
|
||||
)
|
||||
def test_initialize_detector_backend(
|
||||
mock_det, eacc, exp_url, daq_status, daq_cfg, expected_exception
|
||||
):
|
||||
"""Test self.custom_prepare.initialize_detector_backend (std daq in this case)
|
||||
|
||||
This includes testing the functions:
|
||||
|
||||
- _update_service_config
|
||||
|
||||
Validation upon checking set values in mocked std_daq instance
|
||||
"""
|
||||
with mock.patch("csaxs_bec.devices.epics.eiger9m_csaxs.StdDaqClient") as mock_std_daq:
|
||||
instance = mock_std_daq.return_value
|
||||
instance.stop_writer.return_value = None
|
||||
instance.get_status.return_value = daq_status
|
||||
instance.get_config.return_value = daq_cfg
|
||||
mock_det.scaninfo.username = eacc
|
||||
# scaninfo.username.return_value = eacc
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.initialize_detector_backend()
|
||||
else:
|
||||
mock_det.custom_prepare.initialize_detector_backend()
|
||||
|
||||
instance.stop_writer.assert_called_once()
|
||||
instance.get_status.assert_called()
|
||||
instance.set_config.assert_called_once_with(daq_cfg)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception",
|
||||
[
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
"mokev": 12.4,
|
||||
},
|
||||
{"state": "READY"},
|
||||
{"writer_user_id": 12543},
|
||||
5,
|
||||
False,
|
||||
False,
|
||||
),
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
"mokev": 12.4,
|
||||
},
|
||||
{"state": "BUSY"},
|
||||
{"writer_user_id": 15421},
|
||||
5,
|
||||
False,
|
||||
False,
|
||||
),
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
"mokev": 18.4,
|
||||
},
|
||||
{"state": "READY"},
|
||||
{"writer_user_id": 12345},
|
||||
4,
|
||||
False,
|
||||
True,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_stage(
|
||||
mock_det, scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception
|
||||
):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "publish_file_location"
|
||||
) as mock_publish_file_location,
|
||||
):
|
||||
mock_std_daq.stop_writer.return_value = None
|
||||
mock_std_daq.get_status.return_value = daq_status
|
||||
mock_std_daq.get_config.return_value = daq_cfg
|
||||
mock_det.scaninfo.num_points = scaninfo["num_points"]
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
|
||||
# TODO consider putting energy as variable in scaninfo
|
||||
mock_det.device_manager.add_device("mokev", value=12.4)
|
||||
mock_det.cam.beam_energy.put(scaninfo["mokev"])
|
||||
mock_det.stopped = stopped
|
||||
mock_det.cam.detector_state._read_pv.mock_data = detector_state
|
||||
with mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_prep_fw:
|
||||
mock_det.filepath.set(scaninfo["filepath"]).wait()
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.stage()
|
||||
else:
|
||||
mock_det.stage()
|
||||
mock_prep_fw.assert_called_once()
|
||||
# Check _prep_det
|
||||
assert mock_det.cam.num_images.get() == int(
|
||||
scaninfo["num_points"] * scaninfo["frames_per_trigger"]
|
||||
)
|
||||
assert mock_det.cam.num_frames.get() == 1
|
||||
|
||||
mock_publish_file_location.assert_called_with(done=False, successful=False)
|
||||
assert mock_det.cam.acquire.get() == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo, daq_status, expected_exception",
|
||||
[
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
},
|
||||
{"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}},
|
||||
False,
|
||||
),
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
},
|
||||
{"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}},
|
||||
False,
|
||||
),
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
},
|
||||
{"state": "BUSY", "acquisition": {"state": "ERROR"}},
|
||||
True,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_prepare_detector_backend(mock_det, scaninfo, daq_status, expected_exception):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq,
|
||||
mock.patch.object(mock_det.custom_prepare, "filepath_exists") as mock_file_path_exists,
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_detector_backend") as mock_stop_backend,
|
||||
mock.patch.object(mock_det, "scaninfo"),
|
||||
):
|
||||
mock_std_daq.start_writer_async.return_value = None
|
||||
mock_std_daq.get_status.return_value = daq_status
|
||||
mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
|
||||
mock_det.scaninfo.num_points = scaninfo["num_points"]
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.prepare_data_backend()
|
||||
mock_file_path_exists.assert_called_once()
|
||||
assert mock_stop_backend.call_count == 2
|
||||
|
||||
else:
|
||||
mock_det.custom_prepare.prepare_data_backend()
|
||||
mock_file_path_exists.assert_called_once()
|
||||
mock_stop_backend.assert_called_once()
|
||||
|
||||
daq_writer_call = {
|
||||
"output_file": scaninfo["filepath"],
|
||||
"n_images": int(scaninfo["num_points"] * scaninfo["frames_per_trigger"]),
|
||||
}
|
||||
mock_std_daq.start_writer_async.assert_called_with(daq_writer_call)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
|
||||
def test_complete(mock_det, stopped, expected_exception):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "publish_file_location"
|
||||
) as mock_publish_file_location,
|
||||
):
|
||||
mock_det.stopped = stopped
|
||||
if expected_exception:
|
||||
mock_det.complete()
|
||||
assert mock_det.stopped is True
|
||||
else:
|
||||
mock_det.complete()
|
||||
mock_finished.assert_called_once()
|
||||
mock_publish_file_location.assert_called_with(done=True, successful=True)
|
||||
assert mock_det.stopped is False
|
||||
|
||||
|
||||
def test_stop_detector_backend(mock_det):
|
||||
with mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq:
|
||||
mock_std_daq.stop_writer.return_value = None
|
||||
mock_det.std_client = mock_std_daq
|
||||
mock_det.custom_prepare.stop_detector_backend()
|
||||
mock_std_daq.stop_writer.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo",
|
||||
[
|
||||
({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}),
|
||||
({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}),
|
||||
],
|
||||
)
|
||||
def test_publish_file_location(mock_det, scaninfo):
|
||||
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
|
||||
mock_det.filepath.set(scaninfo["filepath"]).wait()
|
||||
mock_det.custom_prepare.publish_file_location(
|
||||
done=scaninfo["done"], successful=scaninfo["successful"]
|
||||
)
|
||||
if scaninfo["successful"] is None:
|
||||
msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"])
|
||||
else:
|
||||
msg = messages.FileMessage(
|
||||
file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"]
|
||||
)
|
||||
expected_calls = [
|
||||
mock.call(
|
||||
MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det.connector.pipeline.return_value,
|
||||
),
|
||||
mock.call(
|
||||
MessageEndpoints.file_event(mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det.connector.pipeline.return_value,
|
||||
),
|
||||
]
|
||||
assert mock_det.connector.set_and_publish.call_args_list == expected_calls
|
||||
|
||||
|
||||
def test_stop(mock_det):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "stop_detector_backend"
|
||||
) as mock_stop_detector_backend,
|
||||
):
|
||||
mock_det.stop()
|
||||
mock_stop_det.assert_called_once()
|
||||
mock_stop_detector_backend.assert_called_once()
|
||||
assert mock_det.stopped is True
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"stopped, scaninfo, cam_state, daq_status, expected_exception",
|
||||
[
|
||||
(
|
||||
False,
|
||||
{"num_points": 500, "frames_per_trigger": 4},
|
||||
0,
|
||||
{"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 2000}}},
|
||||
False,
|
||||
),
|
||||
(
|
||||
False,
|
||||
{"num_points": 500, "frames_per_trigger": 4},
|
||||
0,
|
||||
{"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 1999}}},
|
||||
True,
|
||||
),
|
||||
(
|
||||
False,
|
||||
{"num_points": 500, "frames_per_trigger": 1},
|
||||
1,
|
||||
{"acquisition": {"state": "READY", "stats": {"n_write_completed": 500}}},
|
||||
True,
|
||||
),
|
||||
(
|
||||
False,
|
||||
{"num_points": 500, "frames_per_trigger": 1},
|
||||
0,
|
||||
{"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 500}}},
|
||||
False,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_finished(mock_det, stopped, cam_state, daq_status, scaninfo, expected_exception):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq,
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_detector_backend") as mock_stop_backend,
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
|
||||
):
|
||||
mock_std_daq.get_status.return_value = daq_status
|
||||
mock_det.cam.acquire._read_pv.mock_state = cam_state
|
||||
mock_det.scaninfo.num_points = scaninfo["num_points"]
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.finished()
|
||||
assert mock_det.stopped is stopped
|
||||
else:
|
||||
mock_det.custom_prepare.finished()
|
||||
if stopped:
|
||||
assert mock_det.stopped is stopped
|
||||
|
||||
mock_stop_backend.assert_called()
|
||||
mock_stop_det.assert_called_once()
|
||||
Reference in New Issue
Block a user