319 lines
12 KiB
Python
319 lines
12 KiB
Python
# pylint: skip-file
|
|
import os
|
|
import threading
|
|
from time import time
|
|
from typing import TYPE_CHECKING, Generator
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
from bec_lib.messages import FileMessage, ScanStatusMessage
|
|
from jfjoch_client.models.broker_status import BrokerStatus
|
|
from jfjoch_client.models.dataset_settings import DatasetSettings
|
|
from jfjoch_client.models.detector_list import DetectorList
|
|
from jfjoch_client.models.detector_list_element import DetectorListElement
|
|
from jfjoch_client.models.detector_settings import DetectorSettings
|
|
from jfjoch_client.models.detector_timing import DetectorTiming
|
|
from jfjoch_client.models.file_writer_format import FileWriterFormat
|
|
from jfjoch_client.models.file_writer_settings import FileWriterSettings
|
|
from jfjoch_client.models.measurement_statistics import MeasurementStatistics
|
|
from ophyd import Staged
|
|
from ophyd_devices.utils.psi_device_base_utils import DeviceStatus
|
|
|
|
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
|
|
from csaxs_bec.devices.jungfraujoch.eiger_1_5m import Eiger1_5M
|
|
from csaxs_bec.devices.jungfraujoch.eiger_9m import Eiger9M
|
|
|
|
if TYPE_CHECKING: # pragma no cover
|
|
from bec_lib.messages import FileMessage
|
|
|
|
# @pytest.fixture(scope="function")
|
|
# def scan_worker_mock(scan_server_mock):
|
|
# scan_server_mock.device_manager.connector = mock.MagicMock()
|
|
# scan_worker = ScanWorker(parent=scan_server_mock)
|
|
# yield scan_worker
|
|
|
|
|
|
@pytest.fixture(
    scope="function",
    params=[(0.1, 1, 1, "line_scan"), (0.2, 2, 2, "time_scan"), (0.5, 5, 5, "acquire")],
)
def mock_scan_info(request, tmpdir):
    """Yield a ``ScanStatusMessage`` for each parametrized scan configuration.

    Every parameter tuple is ``(exp_time, frames_per_trigger, num_points, scan_name)``.
    The ``file_components`` entry of the message info points below the per-test
    ``tmpdir`` and uses the ``h5`` extension.
    """
    exposure, frames, points, scan_type = request.param
    parameters = {
        "exp_time": exposure,
        "frames_per_trigger": frames,
        "system_config": {},
    }
    file_components = (f"{tmpdir}/data/S00000/S000001", "h5")
    yield ScanStatusMessage(
        scan_id="test_id",
        status="open",
        scan_number=1,
        scan_parameters=parameters,
        info={"file_components": file_components},
        num_points=points,
        scan_name=scan_type,
    )
|
|
|
|
|
|
@pytest.fixture(scope="function", params=[(1,), (2,)])
def detector_list(request) -> Generator[DetectorList, None, None]:
    """Yield a ``DetectorList`` with two detectors, parametrized over ``current_id``.

    The list always contains the entries with id 1 ("EIGER 1.5M") and id 2
    ("EIGER 8.5M (tmp)"); only the currently selected id changes between runs.
    """
    selected_id = request.param[0]
    # Both entries are identical except for their id and description.
    shared_fields = {
        "serial_number": "123456",
        "base_ipv4_addr": "192.168.0.1",
        "udp_interface_count": 1,
        "nmodules": 1,
        "width": 512,
        "height": 512,
        "pixel_size_mm": 0.1,
        "readout_time_us": 100,
        "min_frame_time_us": 1000,
        "min_count_time_us": 100,
        "type": "EIGER",
    }
    entries = [
        DetectorListElement(id=det_id, description=label, **shared_fields)
        for det_id, label in ((1, "EIGER 1.5M"), (2, "EIGER 8.5M (tmp)"))
    ]
    yield DetectorList(detectors=entries, current_id=selected_id)
|
|
|
|
|
|
@pytest.fixture(scope="function")
def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]:
    """Yield an ``Eiger1_5M`` device whose scan info carries the mocked scan message."""
    detector = Eiger1_5M(name="eiger_1_5m", beam_center=(256, 256), detector_distance=100.0)
    # Inject the parametrized scan message so staging logic sees a scan.
    detector.scan_info.msg = mock_scan_info
    yield detector
|
|
|
|
|
|
@pytest.fixture(scope="function")
def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]:
    """Yield an ``Eiger9M`` device whose scan info carries the mocked scan message.

    Per the original note, only ``on_connected`` currently differs between the
    9M and the 1.5M device; all other methods are the same.
    """
    detector = Eiger9M(name="eiger_9m")
    # Inject the parametrized scan message so staging logic sees a scan.
    detector.scan_info.msg = mock_scan_info
    yield detector
|
|
|
|
|
|
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state):
    """Check ``on_connected`` of the Eiger 1.5M for each broker state and selected detector.

    The call is expected to succeed only when the broker reports ``"Idle"`` and the
    currently selected detector id is 1; every other combination must raise
    ``RuntimeError``.
    """
    device = eiger_1_5m
    expected_detector_id = 1
    client = device.jfj_client
    broker_status = BrokerStatus(state=detector_state)
    should_connect = (
        detector_state == "Idle" and detector_list.current_id == expected_detector_id
    )
    with (
        mock.patch.object(client, "stop") as stop_mock,
        mock.patch.object(client.api, "config_select_detector_get", return_value=detector_list),
        mock.patch.object(client.api, "status_get", return_value=broker_status),
        mock.patch.object(client, "set_detector_settings") as settings_mock,
        mock.patch.object(client.api, "config_file_writer_put") as writer_mock,
        mock.patch.object(device, "jfj_preview_client") as preview_mock,
    ):
        if should_connect:
            device.on_connected()
            expected_settings = DetectorSettings(
                frame_time_us=500, timing=DetectorTiming.TRIGGER
            )
            assert settings_mock.call_args == mock.call(expected_settings, timeout=10)
            expected_writer = FileWriterSettings(
                overwrite=True, format=FileWriterFormat.NXMXVDS
            )
            assert writer_mock.call_args == mock.call(
                file_writer_settings=expected_writer, _request_timeout=10
            )
            stop_mock.assert_called_once()
            assert preview_mock.connect.call_count == 1
            assert preview_mock.start.call_count == 1
        else:
            with pytest.raises(RuntimeError):
                device.on_connected()
            stop_mock.assert_called_once()
            # NOTE(review): this checks calls of the mock object itself, not of its
            # connect/start attributes - confirm this is the intended assertion.
            assert preview_mock.call_count == 0
|
|
|
|
|
|
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state):
    """Check ``on_connected`` of the Eiger 9M for each broker state and selected detector.

    The call is expected to succeed only when the broker reports ``"Idle"`` and the
    currently selected detector id is 2; every other combination must raise
    ``RuntimeError``.
    """
    device = eiger_9m
    expected_detector_id = 2
    client = device.jfj_client
    broker_status = BrokerStatus(state=detector_state)
    should_connect = (
        detector_state == "Idle" and detector_list.current_id == expected_detector_id
    )
    with (
        mock.patch.object(client, "stop") as stop_mock,
        mock.patch.object(client.api, "config_select_detector_get", return_value=detector_list),
        mock.patch.object(client.api, "status_get", return_value=broker_status),
        mock.patch.object(client, "set_detector_settings") as settings_mock,
        mock.patch.object(client.api, "config_file_writer_put") as writer_mock,
        mock.patch.object(device, "jfj_preview_client") as preview_mock,
    ):
        if should_connect:
            device.on_connected()
            expected_settings = DetectorSettings(
                frame_time_us=500, timing=DetectorTiming.TRIGGER
            )
            assert settings_mock.call_args == mock.call(expected_settings, timeout=10)
            expected_writer = FileWriterSettings(
                overwrite=True, format=FileWriterFormat.NXMXVDS
            )
            assert writer_mock.call_args == mock.call(
                file_writer_settings=expected_writer, _request_timeout=10
            )
            stop_mock.assert_called_once()
            assert preview_mock.connect.call_count == 1
            assert preview_mock.start.call_count == 1
        else:
            with pytest.raises(RuntimeError):
                device.on_connected()
            stop_mock.assert_called_once()
            # NOTE(review): this checks calls of the mock object itself, not of its
            # connect/start attributes - confirm this is the intended assertion.
            assert preview_mock.call_count == 0
|
|
|
|
|
|
@pytest.mark.timeout(20)
def test_eiger_on_stop(eiger_1_5m):
    """Test the on_stop logic of the Eiger detector. This is equivalent for 9M and 1_5M.

    A never-ending task is submitted to the device's task handler; ``on_stop`` must
    call ``jfj_client.stop`` once and the running task must terminate afterwards.
    """
    # Local import on purpose: the module-level ``from time import time`` binds the
    # name ``time`` to the function, so ``time.sleep`` raised AttributeError inside
    # the worker thread. The thread then died immediately and the test passed
    # without on_stop ever having to kill it.
    from time import sleep

    eiger = eiger_1_5m
    start_event = threading.Event()
    stop_event = threading.Event()

    def tmp_task():
        """Loop until the thread is killed; signal start and termination."""
        start_event.set()
        try:
            while True:
                sleep(0.1)
        finally:
            stop_event.set()

    eiger.task_handler.submit_task(tmp_task, run=True)
    # Event.wait returns False on timeout, so the result must be asserted.
    assert start_event.wait(timeout=5), "background task did not start"

    with mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop:
        eiger.on_stop()
        mock_jfj_client_stop.assert_called_once()
    # Thread should be killed from task_handler
    assert stop_event.wait(timeout=5), "background task was not stopped by on_stop"
|
|
|
|
|
|
@pytest.mark.timeout(25)
@pytest.mark.parametrize("raise_timeout", [True, False])
def test_eiger_on_complete(eiger_1_5m, raise_timeout):
    """Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M.

    ``wait_for_idle`` is replaced by a mock that blocks until the test releases it
    and then reports a timeout (``False``) when ``raise_timeout`` is set, otherwise
    success (``True``). The returned status must be pending while the mock blocks,
    and afterwards be done with ``success`` matching the simulated outcome.
    """
    eiger = eiger_1_5m

    callback_completed_event = threading.Event()

    def _callback_complete(status: DeviceStatus):
        """Record that the status callback ran on a finished status."""
        if status.done:
            callback_completed_event.set()

    unblock_wait_for_idle = threading.Event()

    def mock_wait_for_idle(timeout: int, request_timeout: float):
        """Block until released; return False on timeout or when simulating one."""
        if not unblock_wait_for_idle.wait(timeout):
            return False
        return not raise_timeout

    with (
        mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle),
        mock.patch.object(
            eiger.jfj_client.api,
            "statistics_data_collection_get",
            return_value=MeasurementStatistics(run_number=1),
        ),
    ):
        status = eiger.complete()
        status.add_callback(_callback_complete)
        # While wait_for_idle is blocked the status must still be pending.
        assert not status.done
        assert not status.success
        assert eiger.file_event.get() is None
        unblock_wait_for_idle.set()
        if raise_timeout:
            with pytest.raises(TimeoutError):
                status.wait(timeout=10)
        else:
            status.wait(timeout=10)
        assert status.done
        # The original ``assert status.success == False if raise_timeout else True``
        # parsed as ``assert (... == False) if raise_timeout else True`` and therefore
        # asserted a constant ``True`` in the non-timeout case.
        assert status.success == (not raise_timeout)
|
|
|
|
|
|
def test_eiger_file_event_callback(eiger_1_5m, tmp_path):
    """Verify the message published by ``_file_event_callback``.

    The callback is invoked first with a finished, successful status and then with
    an unfinished, unsuccessful one; the file event must mirror those flags and
    carry the device name, the configured file path and the hinted h5 entries.
    """
    device = eiger_1_5m
    target = tmp_path / "test_file.h5"
    device._full_path = str(target)
    # Nothing has been published before the first callback.
    assert device.file_event.get() is None

    for finished in (True, False):
        outcome = DeviceStatus(device=device, done=finished, success=finished)
        device._file_event_callback(outcome)
        message: FileMessage = device.file_event.get()
        assert message.device_name == device.name
        assert message.file_path == str(target)
        assert message.done is finished
        assert message.successful is finished
        assert message.hinted_h5_entries == {"data": "entry/data/data"}
|
|
|
|
|
|
def test_eiger_on_sage(eiger_1_5m):
    """Verify staging of the Eiger detector.

    After ``stage()`` the master file path must be derived from the scan's file
    components, a not-yet-done file event must be published, ``jfj_client.start``
    must receive the matching ``DatasetSettings`` and the device must be staged.
    """
    device = eiger_1_5m
    msg = device.scan_info.msg
    with (
        mock.patch.object(device.jfj_client, "wait_for_idle", return_value=True),
        mock.patch.object(device.jfj_client, "start") as start_mock,
    ):
        device.stage()

        base_path = msg.info["file_components"][0]
        extension = msg.info["file_components"][1]
        expected_path = f"{base_path}_{device.name}_master.{extension}"
        assert device._full_path == expected_path

        event_msg: FileMessage = device.file_event.get()
        assert event_msg.file_path == device._full_path
        assert event_msg.done is False
        assert event_msg.successful is False
        assert event_msg.hinted_h5_entries == {"data": "entry/data/data"}

        exposure = msg.scan_parameters["exp_time"]
        frames = msg.scan_parameters["frames_per_trigger"]
        relative_master = os.path.relpath(device._full_path, start="/sls/x12sa/data")
        expected_settings = DatasetSettings(
            image_time_us=int(exposure * 1e6),
            ntrigger=int(msg.num_points * frames),
            file_prefix=relative_master.removesuffix("_master.h5"),
            beam_x_pxl=device.beam_center[0],
            beam_y_pxl=device.beam_center[1],
            detector_distance_mm=device.detector_distance,
            # Hardcoded for now because it is hardcoded in the Eiger implementation.
            incident_energy_ke_v=12.0,
        )
        assert start_mock.call_args == mock.call(settings=expected_settings)
        assert device.staged is Staged.yes
|