From dd3b0144b9a0ffe13d3c5bc0e0f4aa9713aceaca Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 6 Jan 2026 10:45:07 +0100 Subject: [PATCH 1/4] feat(pilatus): deprecate pilatus integration --- csaxs_bec/devices/epics/pilatus_csaxs.py | 400 ------------------- tests/tests_devices/test_pilatus_csaxs.py | 449 ---------------------- 2 files changed, 849 deletions(-) delete mode 100644 csaxs_bec/devices/epics/pilatus_csaxs.py delete mode 100644 tests/tests_devices/test_pilatus_csaxs.py diff --git a/csaxs_bec/devices/epics/pilatus_csaxs.py b/csaxs_bec/devices/epics/pilatus_csaxs.py deleted file mode 100644 index 6a87369..0000000 --- a/csaxs_bec/devices/epics/pilatus_csaxs.py +++ /dev/null @@ -1,400 +0,0 @@ -import enum -import json -import os -import threading -import time - -import numpy as np -import requests -from bec_lib import bec_logger -from ophyd import ADComponent as ADCpt -from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Staged -from ophyd_devices.interfaces.base_classes.psi_detector_base import ( - CustomDetectorMixin, - PSIDetectorBase, -) - -logger = bec_logger.logger - - -class PilatusError(Exception): - """Base class for exceptions in this module.""" - - -class PilatusTimeoutError(PilatusError): - """Raised when the Pilatus does not respond in time during unstage.""" - - -class TriggerSource(enum.IntEnum): - """Trigger source options for the detector""" - - INTERNAL = 0 - EXT_ENABLE = 1 - EXT_TRIGGER = 2 - MULTI_TRIGGER = 3 - ALGINMENT = 4 - - -class SLSDetectorCam(Device): - """SLS Detector Camera - Pilatus - - Base class to map EPICS PVs to ophyd signals. - """ - - num_images = ADCpt(EpicsSignalWithRBV, "NumImages") - num_frames = ADCpt(EpicsSignalWithRBV, "NumExposures") - delay_time = ADCpt(EpicsSignalWithRBV, "NumExposures") - trigger_mode = ADCpt(EpicsSignalWithRBV, "TriggerMode") - acquire = ADCpt(EpicsSignal, "Acquire") - armed = ADCpt(EpicsSignalRO, "Armed") - - read_file_timeout = ADCpt(EpicsSignal, "ImageFileTmot") - detector_state = ADCpt(EpicsSignalRO, "StatusMessage_RBV") - status_message_camserver = ADCpt(EpicsSignalRO, "StringFromServer_RBV", string=True) - acquire_time = ADCpt(EpicsSignal, "AcquireTime") - acquire_period = ADCpt(EpicsSignal, "AcquirePeriod") - threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy") - file_path = ADCpt(EpicsSignalWithRBV, "FilePath") - file_name = ADCpt(EpicsSignalWithRBV, "FileName") - file_number = ADCpt(EpicsSignalWithRBV, "FileNumber") - auto_increment = ADCpt(EpicsSignalWithRBV, "AutoIncrement") - file_template = ADCpt(EpicsSignalWithRBV, "FileTemplate") - file_format = ADCpt(EpicsSignalWithRBV, "FileNumber") - gap_fill = ADCpt(EpicsSignalWithRBV, "GapFill") - - -class PilatusSetup(CustomDetectorMixin): - """Pilatus setup class for cSAXS - - Parent class: CustomDetectorMixin - - """ - - def __init__(self, *args, parent: Device = None, **kwargs) -> None: - super().__init__(*args, parent=parent, **kwargs) - self._lock = threading.RLock() - - def on_init(self) -> None: - """Initialize the detector""" - self.initialize_default_parameter() - self.initialize_detector() - - def initialize_default_parameter(self) -> None: - """Set default parameters for Eiger9M detector""" - self.update_readout_time() - - def update_readout_time(self) -> None: - """Set readout time for Eiger9M detector""" - readout_time = ( - self.parent.scaninfo.readout_time - if hasattr(self.parent.scaninfo, "readout_time") - else self.parent.MIN_READOUT - ) - self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT) - - def initialize_detector(self) -> None: - """Initialize detector""" - # Stops the detector - self.stop_detector() - # Sets the trigger source to GATING - self.parent.cam.trigger_mode.put(TriggerSource.EXT_ENABLE) - - def on_stage(self) -> None: - """Stage the detector for scan""" - self.prepare_detector() - self.prepare_data_backend() - self.publish_file_location( - done=False, successful=False, metadata={"input_path": self.parent.filepath_raw} - ) - - def prepare_detector(self) -> None: - """ - Prepare detector for scan. - - Includes checking the detector threshold, - setting the acquisition parameters and setting the trigger source - """ - self.set_detector_threshold() - self.set_acquisition_params() - self.parent.cam.trigger_mode.put(TriggerSource.EXT_ENABLE) - - def prepare_data_backend(self) -> None: - """ - Prepare the detector backend of pilatus for a scan - - A zmq service is running on xbl-daq-34 that is waiting - for a zmq message to start the writer for the pilatus_2 x12sa-pd-2 - - """ - - self.stop_detector_backend() - - self.parent.filepath.set( - self.parent.filewriter.compile_full_filename("pilatus_2.h5") - ).wait() - self.parent.cam.file_path.put("/dev/shm/zmq/") - self.parent.cam.file_name.put( - f"{self.parent.scaninfo.username}_2_{self.parent.scaninfo.scan_number:05d}" - ) - self.parent.cam.auto_increment.put(1) # auto increment - self.parent.cam.file_number.put(0) # first iter - self.parent.cam.file_format.put(0) # 0: TIFF - self.parent.cam.file_template.put("%s%s_%5.5d.cbf") - - # TODO better to remove hard coded path with link to home directory/pilatus_2 - basepath = f"/sls/X12SA/data/{self.parent.scaninfo.username}/Data10/pilatus_2/" - self.parent.filepath_raw = os.path.join( - basepath, - self.parent.filewriter.get_scan_directory(self.parent.scaninfo.scan_number, 1000, 5), - ) - # Make directory if needed - self.create_directory(self.parent.filepath_raw) - - headers = {"Content-Type": "application/json", "Accept": "application/json"} - # start the stream on x12sa-pd-2 - url = "http://x12sa-pd-2:8080/stream/pilatus_2" - data_msg = { - "source": [ - { - "searchPath": "/", - "searchPattern": "glob:*.cbf", - "destinationPath": self.parent.filepath_raw, - } - ] - } - res = self.send_requests_put(url=url, data=data_msg, headers=headers) - logger.info(f"{res.status_code} - {res.text} - {res.content}") - - if not res.ok: - res.raise_for_status() - - # start the data receiver on xbl-daq-34 - url = "http://xbl-daq-34:8091/pilatus_2/run" - data_msg = [ - "zmqWriter", - self.parent.scaninfo.username, - { - "addr": "tcp://x12sa-pd-2:8888", - "dst": ["file"], - "numFrm": int( - self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger - ), - "timeout": 2000, - "ifType": "PULL", - "user": self.parent.scaninfo.username, - }, - ] - res = self.send_requests_put(url=url, data=data_msg, headers=headers) - logger.info(f"{res.status_code} - {res.text} - {res.content}") - - if not res.ok: - res.raise_for_status() - - # Wait for server to become available again - time.sleep(0.1) - logger.info(f"{res.status_code} -{res.text} - {res.content}") - - # Send requests.put to xbl-daq-34 to wait for data - url = "http://xbl-daq-34:8091/pilatus_2/wait" - data_msg = [ - "zmqWriter", - self.parent.scaninfo.username, - { - "frmCnt": int( - self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger - ), - "timeout": 2000, - }, - ] - try: - res = self.send_requests_put(url=url, data=data_msg, headers=headers) - logger.info(f"{res}") - - if not res.ok: - res.raise_for_status() - except Exception as exc: - logger.info(f"Pilatus2 wait threw Exception: {exc}") - - def set_detector_threshold(self) -> None: - """ - Set correct detector threshold to 1/2 of current X-ray energy, allow 5% tolerance - - Threshold might be in ev or keV - """ - - # get current beam energy from device manageer - mokev = self.parent.device_manager.devices.mokev.obj.read()[ - self.parent.device_manager.devices.mokev.name - ]["value"] - factor = 1 - - # Check if energies are eV or keV, assume keV as the default - unit = getattr(self.parent.cam.threshold_energy, "units", None) - if unit is not None and unit == "eV": - factor = 1000 - - # set energy on detector - setpoint = int(mokev * factor) - - # set threshold on detector - threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][ - "value" - ] - if not np.isclose(setpoint / 2, threshold, rtol=0.05): - self.parent.cam.threshold_energy.set(setpoint / 2) - - def set_acquisition_params(self) -> None: - """Set acquisition parameters for the detector""" - - # Set number of images and frames (frames is for internal burst of detector) - self.parent.cam.num_images.put( - int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger) - ) - self.parent.cam.num_frames.put(1) - - # Update the readout time of the detector - self.update_readout_time() - - def create_directory(self, filepath: str) -> None: - """Create directory if it does not exist""" - os.makedirs(filepath, exist_ok=True) - - def close_file_writer(self) -> None: - """ - Close the file writer for pilatus_2 - - Delete the data from x12sa-pd-2 - - """ - url = "http://x12sa-pd-2:8080/stream/pilatus_2" - try: - res = self.send_requests_delete(url=url) - if not res.ok: - res.raise_for_status() - except Exception as exc: - logger.info(f"Pilatus2 close threw Exception: {exc}") - - def stop_file_writer(self) -> None: - """ - Stop the file writer for pilatus_2 - - Runs on xbl-daq-34 - """ - url = "http://xbl-daq-34:8091/pilatus_2/stop" - res = self.send_requests_put(url=url) - if not res.ok: - res.raise_for_status() - - def send_requests_put(self, url: str, data: list = None, headers: dict = None) -> object: - """ - Send a put request to the given url - - Args: - url (str): url to send the request to - data (dict): data to be sent with the request (optional) - headers (dict): headers to be sent with the request (optional) - - Returns: - status code of the request - """ - return requests.put(url=url, data=json.dumps(data), headers=headers, timeout=5) - - def send_requests_delete(self, url: str, headers: dict = None) -> object: - """ - Send a delete request to the given url - - Args: - url (str): url to send the request to - headers (dict): headers to be sent with the request (optional) - - Returns: - status code of the request - """ - return requests.delete(url=url, headers=headers, timeout=5) - - def on_pre_scan(self) -> None: - """Prepare detector for scan""" - self.arm_acquisition() - - def arm_acquisition(self) -> None: - """Arms the detector for the acquisition""" - self.parent.cam.acquire.put(1) - # TODO is this sleep needed? to be tested with detector and for how long - time.sleep(0.5) - - def on_unstage(self) -> None: - """Unstage the detector""" - pass - - def on_complete(self) -> None: - """Complete the scan""" - self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS) - self.publish_file_location( - done=True, successful=True, metadata={"input_path": self.parent.filepath_raw} - ) - - def finished(self, timeout: int = 5) -> None: - """Check if acquisition is finished.""" - # pylint: disable=protected-access - # TODO: at the moment this relies on device.mcs.obj._staged attribute - signal_conditions = [ - (lambda: self.parent.device_manager.devices.mcs.obj._staged, Staged.no) - ] - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=timeout, - check_stopped=True, - all_signals=True, - ): - raise PilatusTimeoutError( - f"Reached timeout with detector state {signal_conditions[0][0]}, std_daq state" - f" {signal_conditions[1][0]} and received frames of {signal_conditions[2][0]} for" - " the file writer" - ) - self.stop_detector() - self.stop_detector_backend() - - def on_stop(self) -> None: - """Stop detector""" - self.stop_detector() - self.stop_detector_backend() - - def stop_detector(self) -> None: - """Stop detector""" - self.parent.cam.acquire.put(0) - - def stop_detector_backend(self) -> None: - """Stop the file writer zmq service for pilatus_2""" - self.close_file_writer() - time.sleep(0.1) - self.stop_file_writer() - time.sleep(0.1) - - -class PilatuscSAXS(PSIDetectorBase): - """Pilatus_2 300k detector for CSAXS - - Parent class: PSIDetectorBase - - class attributes: - custom_prepare_cls (Eiger9MSetup) : Custom detector setup class for cSAXS, - inherits from CustomDetectorMixin - cam (SLSDetectorCam) : Detector camera - MIN_READOUT (float) : Minimum readout time for the detector - - """ - - # Specify which functions are revealed to the user in BEC client - USER_ACCESS = [] - - # specify Setup class - custom_prepare_cls = PilatusSetup - # specify minimum readout time for detector - MIN_READOUT = 3e-3 - TIMEOUT_FOR_SIGNALS = 5 - # specify class attributes - cam = ADCpt(SLSDetectorCam, "cam1:") - - -if __name__ == "__main__": - pilatus_2 = PilatuscSAXS(name="pilatus_2", prefix="X12SA-ES-PILATUS300K:", sim_mode=True) diff --git a/tests/tests_devices/test_pilatus_csaxs.py b/tests/tests_devices/test_pilatus_csaxs.py deleted file mode 100644 index 8b04372..0000000 --- a/tests/tests_devices/test_pilatus_csaxs.py +++ /dev/null @@ -1,449 +0,0 @@ -# pylint: skip-file -import os -import threading -from unittest import mock - -import ophyd -import pytest -from bec_lib import messages -from bec_lib.endpoints import MessageEndpoints -from bec_server.device_server.tests.utils import DMMock -from ophyd_devices.tests.utils import MockPV - -from csaxs_bec.devices.epics.pilatus_csaxs import PilatuscSAXS -from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs - - -@pytest.fixture(scope="function") -def mock_det(): - name = "pilatus" - prefix = "X12SA-ES-PILATUS300K:" - dm = DMMock() - with mock.patch.object(dm, "connector"): - with ( - mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"), - mock.patch( - "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config" - ), - ): - with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV - mock_cl.thread_class = threading.Thread - with mock.patch.object(PilatuscSAXS, "_init"): - det = PilatuscSAXS(name=name, prefix=prefix, device_manager=dm) - patch_dual_pvs(det) - yield det - - -@pytest.mark.parametrize("trigger_source, detector_state", [(1, 0)]) -# TODO rewrite this one, write test for init_detector, init_filewriter is tested -def test_init_detector(mock_det, trigger_source, detector_state): - """Test the _init function: - - This includes testing the functions: - - _init_detector - - _stop_det - - _set_trigger - --> Testing the filewriter is done in test_init_filewriter - - Validation upon setting the correct PVs - - """ - mock_det.custom_prepare.on_init() # call the method you want to test - assert mock_det.cam.acquire.get() == detector_state - assert mock_det.cam.trigger_mode.get() == trigger_source - - -@pytest.mark.parametrize( - "scaninfo, stopped, expected_exception", - [ - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 12.4, - }, - False, - False, - ), - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 12.4, - }, - True, - False, - ), - ], -) -def test_stage(mock_det, scaninfo, stopped, expected_exception): - path = "tmp" - mock_det.filepath_raw = path - with mock.patch.object( - mock_det.custom_prepare, "publish_file_location" - ) as mock_publish_file_location: - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"] - mock_det.device_manager.add_device("mokev", value=12.4) - mock_det.stopped = stopped - with ( - mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_data_backend, - mock.patch.object( - mock_det.custom_prepare, "update_readout_time" - ) as mock_update_readout_time, - ): - mock_det.filepath.set(scaninfo["filepath"]).wait() - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.stage() - else: - mock_det.stage() - mock_data_backend.assert_called_once() - mock_update_readout_time.assert_called() - # Check _prep_det - assert mock_det.cam.num_images.get() == int( - scaninfo["num_points"] * scaninfo["frames_per_trigger"] - ) - assert mock_det.cam.num_frames.get() == 1 - - mock_publish_file_location.assert_called_once_with( - done=False, successful=False, metadata={"input_path": path} - ) - - -def test_pre_scan(mock_det): - mock_det.custom_prepare.on_pre_scan() - assert mock_det.cam.acquire.get() == 1 - - -@pytest.mark.parametrize( - "readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)] -) -def test_update_readout_time(mock_det, readout_time, expected_value): - if readout_time is None: - mock_det.custom_prepare.update_readout_time() - assert mock_det.readout_time == expected_value - else: - mock_det.scaninfo.readout_time = readout_time - mock_det.custom_prepare.update_readout_time() - assert mock_det.readout_time == expected_value - - -@pytest.mark.parametrize( - "scaninfo", - [ - ( - { - "filepath": "test.h5", - "filepath_raw": "test5_raw.h5", - "successful": True, - "done": False, - "scan_id": "123", - } - ), - ( - { - "filepath": "test.h5", - "filepath_raw": "test5_raw.h5", - "successful": False, - "done": True, - "scan_id": "123", - } - ), - ], -) -def test_publish_file_location(mock_det, scaninfo): - mock_det.scaninfo.scan_id = scaninfo["scan_id"] - mock_det.filepath.set(scaninfo["filepath"]).wait() - mock_det.filepath_raw = scaninfo["filepath_raw"] - mock_det.custom_prepare.publish_file_location( - done=scaninfo["done"], - successful=scaninfo["successful"], - metadata={"input_path": scaninfo["filepath_raw"]}, - ) - if scaninfo["successful"] is None: - msg = messages.FileMessage( - file_path=scaninfo["filepath"], - done=scaninfo["done"], - metadata={"input_path": scaninfo["filepath_raw"]}, - ) - else: - msg = messages.FileMessage( - file_path=scaninfo["filepath"], - done=scaninfo["done"], - metadata={"input_path": scaninfo["filepath_raw"]}, - successful=scaninfo["successful"], - ) - expected_calls = [ - mock.call( - MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name), - msg, - pipe=mock_det.connector.pipeline.return_value, - ), - mock.call( - MessageEndpoints.file_event(mock_det.name), - msg, - pipe=mock_det.connector.pipeline.return_value, - ), - ] - assert mock_det.connector.set_and_publish.call_args_list == expected_calls - - -@pytest.mark.parametrize( - "requests_state, expected_exception, url_delete, url_put", - [ - ( - True, - False, - "http://x12sa-pd-2:8080/stream/pilatus_2", - "http://xbl-daq-34:8091/pilatus_2/stop", - ), - ( - False, - False, - "http://x12sa-pd-2:8080/stream/pilatus_2", - "http://xbl-daq-34:8091/pilatus_2/stop", - ), - ], -) -def test_stop_detector_backend(mock_det, requests_state, expected_exception, url_delete, url_put): - with ( - mock.patch.object( - mock_det.custom_prepare, "send_requests_delete" - ) as mock_send_requests_delete, - mock.patch.object(mock_det.custom_prepare, "send_requests_put") as mock_send_requests_put, - ): - instance_delete = mock_send_requests_delete.return_value - instance_delete.ok = requests_state - instance_put = mock_send_requests_put.return_value - instance_put.ok = requests_state - if expected_exception: - mock_det.custom_prepare.stop_detector_backend() - mock_send_requests_delete.assert_called_once_with(url=url_delete) - mock_send_requests_put.assert_called_once_with(url=url_put) - instance_delete.raise_for_status.called_once() - instance_put.raise_for_status.called_once() - else: - mock_det.custom_prepare.stop_detector_backend() - mock_send_requests_delete.assert_called_once_with(url=url_delete) - mock_send_requests_put.assert_called_once_with(url=url_put) - - -@pytest.mark.parametrize( - "scaninfo, data_msgs, urls, requests_state, expected_exception", - [ - ( - { - "filepath_raw": "pilatus_2.h5", - "eacc": "e12345", - "scan_number": 1000, - "scan_directory": "S00000_00999", - "num_points": 500, - "frames_per_trigger": 1, - "headers": {"Content-Type": "application/json", "Accept": "application/json"}, - }, - [ - { - "source": [ - { - "searchPath": "/", - "searchPattern": "glob:*.cbf", - "destinationPath": ( - "/sls/X12SA/data/e12345/Data10/pilatus_2/S00000_00999" - ), - } - ] - }, - [ - "zmqWriter", - "e12345", - { - "addr": "tcp://x12sa-pd-2:8888", - "dst": ["file"], - "numFrm": 500, - "timeout": 2000, - "ifType": "PULL", - "user": "e12345", - }, - ], - ["zmqWriter", "e12345", {"frmCnt": 500, "timeout": 2000}], - ], - [ - "http://x12sa-pd-2:8080/stream/pilatus_2", - "http://xbl-daq-34:8091/pilatus_2/run", - "http://xbl-daq-34:8091/pilatus_2/wait", - ], - True, - False, - ), - ( - { - "filepath_raw": "pilatus_2.h5", - "eacc": "e12345", - "scan_number": 1000, - "scan_directory": "S00000_00999", - "num_points": 500, - "frames_per_trigger": 1, - "headers": {"Content-Type": "application/json", "Accept": "application/json"}, - }, - [ - { - "source": [ - { - "searchPath": "/", - "searchPattern": "glob:*.cbf", - "destinationPath": ( - "/sls/X12SA/data/e12345/Data10/pilatus_2/S00000_00999" - ), - } - ] - }, - [ - "zmqWriter", - "e12345", - { - "addr": "tcp://x12sa-pd-2:8888", - "dst": ["file"], - "numFrm": 500, - "timeout": 2000, - "ifType": "PULL", - "user": "e12345", - }, - ], - ["zmqWriter", "e12345", {"frmCnt": 500, "timeout": 2000}], - ], - [ - "http://x12sa-pd-2:8080/stream/pilatus_2", - "http://xbl-daq-34:8091/pilatus_2/run", - "http://xbl-daq-34:8091/pilatus_2/wait", - ], - False, # return of res.ok is False! - True, - ), - ], -) -def test_prep_file_writer(mock_det, scaninfo, data_msgs, urls, requests_state, expected_exception): - with ( - mock.patch.object(mock_det.custom_prepare, "close_file_writer") as mock_close_file_writer, - mock.patch.object(mock_det.custom_prepare, "stop_file_writer") as mock_stop_file_writer, - mock.patch.object(mock_det, "filewriter") as mock_filewriter, - mock.patch.object(mock_det.custom_prepare, "create_directory") as mock_create_directory, - mock.patch.object(mock_det.custom_prepare, "send_requests_put") as mock_send_requests_put, - ): - mock_det.scaninfo.scan_number = scaninfo["scan_number"] - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - mock_det.scaninfo.username = scaninfo["eacc"] - mock_filewriter.compile_full_filename.return_value = scaninfo["filepath_raw"] - mock_filewriter.get_scan_directory.return_value = scaninfo["scan_directory"] - instance = mock_send_requests_put.return_value - instance.ok = requests_state - instance.raise_for_status.side_effect = Exception - - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.custom_prepare.prepare_data_backend() - mock_close_file_writer.assert_called_once() - mock_stop_file_writer.assert_called_once() - instance.raise_for_status.assert_called_once() - else: - mock_det.custom_prepare.prepare_data_backend() - - mock_close_file_writer.assert_called_once() - mock_stop_file_writer.assert_called_once() - - # Assert values set on detector - assert mock_det.cam.file_path.get() == "/dev/shm/zmq/" - assert ( - mock_det.cam.file_name.get() - == f"{scaninfo['eacc']}_2_{scaninfo['scan_number']:05d}" - ) - assert mock_det.cam.auto_increment.get() == 1 - assert mock_det.cam.file_number.get() == 0 - assert mock_det.cam.file_format.get() == 0 - assert mock_det.cam.file_template.get() == "%s%s_%5.5d.cbf" - # Remove last / from destinationPath - mock_create_directory.assert_called_once_with( - os.path.join(data_msgs[0]["source"][0]["destinationPath"]) - ) - assert mock_send_requests_put.call_count == 3 - - calls = [ - mock.call(url=url, data=data_msg, headers=scaninfo["headers"]) - for url, data_msg in zip(urls, data_msgs) - ] - for call, mock_call in zip(calls, mock_send_requests_put.call_args_list): - assert call == mock_call - - -def test_complete(mock_det): - path = "tmp" - mock_det.filepath_raw = path - with ( - mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, - mock.patch.object( - mock_det.custom_prepare, "publish_file_location" - ) as mock_publish_file_location, - ): - mock_det.complete() - assert mock_finished.call_count == 1 - call = mock.call(done=True, successful=True, metadata={"input_path": path}) - assert mock_publish_file_location.call_args == call - - -def test_stop(mock_det): - with ( - mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, - mock.patch.object(mock_det.custom_prepare, "stop_file_writer") as mock_stop_file_writer, - mock.patch.object(mock_det.custom_prepare, "close_file_writer") as mock_close_file_writer, - ): - mock_det.stop() - mock_stop_det.assert_called_once() - mock_stop_file_writer.assert_called_once() - mock_close_file_writer.assert_called_once() - assert mock_det.stopped is True - - -@pytest.mark.parametrize( - "stopped, mcs_stage_state, expected_exception", - [ - (False, ophyd.Staged.no, False), - (True, ophyd.Staged.no, True), - (False, ophyd.Staged.yes, True), - ], -) -def test_finished(mock_det, stopped, mcs_stage_state, expected_exception): - with ( - mock.patch.object(mock_det, "device_manager") as mock_dm, - mock.patch.object(mock_det.custom_prepare, "stop_file_writer") as mock_stop_file_friter, - mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, - mock.patch.object(mock_det.custom_prepare, "close_file_writer") as mock_close_file_writer, - ): - mock_dm.devices.mcs.obj._staged = mcs_stage_state - mock_det.stopped = stopped - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.custom_prepare.finished() - assert mock_det.stopped is stopped - mock_stop_file_friter.assert_called() - mock_stop_det.assert_called_once() - mock_close_file_writer.assert_called_once() - else: - mock_det.custom_prepare.finished() - if stopped: - assert mock_det.stopped is stopped - - mock_stop_file_friter.assert_called() - mock_stop_det.assert_called_once() - mock_close_file_writer.assert_called_once() -- 2.49.1 From c1dee287b870823e782833f5c1a1d354aaac0afd Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 6 Jan 2026 15:13:56 +0100 Subject: [PATCH 2/4] refactor(falcon): Migrate Falcon integration to PsiDeviceBase --- csaxs_bec/devices/epics/falcon_csaxs.py | 454 +++++++++-------------- tests/tests_devices/test_falcon_csaxs.py | 151 ++++---- 2 files changed, 247 insertions(+), 358 deletions(-) diff --git a/csaxs_bec/devices/epics/falcon_csaxs.py b/csaxs_bec/devices/epics/falcon_csaxs.py index 962eb9f..3c7bec0 100644 --- a/csaxs_bec/devices/epics/falcon_csaxs.py +++ b/csaxs_bec/devices/epics/falcon_csaxs.py @@ -1,15 +1,17 @@ +"""Falcon Sitoro detector class for cSAXS beamline.""" + import enum import os import threading +from typing import Literal +from bec_lib.file_utils import get_full_path from bec_lib.logger import bec_logger from ophyd import Component as Cpt -from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV -from ophyd.mca import EpicsMCARecord -from ophyd_devices.interfaces.base_classes.psi_detector_base import ( - CustomDetectorMixin, - PSIDetectorBase, -) +from ophyd_devices import CompareStatus, FileEventSignal +from ophyd_devices.devices.areadetector.plugins import HDF5Plugin_V35 as HDF5Plugin +from ophyd_devices.devices.dxp import EpicsDXPFalcon, EpicsMCARecord, Falcon +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase logger = bec_logger.logger @@ -18,15 +20,11 @@ class FalconError(Exception): """Base class for exceptions in this module.""" -class FalconTimeoutError(FalconError): - """Raised when the Falcon does not respond in time.""" - - -class DetectorState(enum.IntEnum): +class ACQUIRESTATUS(enum.IntEnum): """Detector states for Falcon detector""" DONE = 0 - ACQUIRING = 1 + ACQUIRING = 1 # or Capturing class TriggerSource(enum.IntEnum): @@ -44,238 +42,55 @@ class MappingSource(enum.IntEnum): MAPPING = 1 -class EpicsDXPFalcon(Device): - """ - DXP parameters for Falcon detector +class FalconControl(Falcon): + """Falcon Control class at cSAXS. prefix: 'X12SA-SITORO:'""" - Base class to map EPICS PVs from DXP parameters to ophyd signals. + dxp = Cpt(EpicsDXPFalcon, "dxp1:") + mca = Cpt(EpicsMCARecord, "mca1") + hdf5 = Cpt(HDF5Plugin, "HDF1:") + + +class FalconcSAXS(PSIDeviceBase, FalconControl): + """ + Falcon Sitoro detector for CSAXS + + + class attributes: + dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector + mca (EpicsMCARecord) : MCA parameters for Falcon detector + hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector + MIN_READOUT (float) : Minimum readout time for the detector """ - elapsed_live_time = Cpt(EpicsSignal, "ElapsedLiveTime") - elapsed_real_time = Cpt(EpicsSignal, "ElapsedRealTime") - elapsed_trigger_live_time = Cpt(EpicsSignal, "ElapsedTriggerLiveTime") + # specify minimum readout time for detector + MIN_READOUT = 3e-3 + _pv_timeout = 3 # Timeout for PV operations in seconds - # Energy Filter PVs - energy_threshold = Cpt(EpicsSignalWithRBV, "DetectionThreshold") - min_pulse_separation = Cpt(EpicsSignalWithRBV, "MinPulsePairSeparation") - detection_filter = Cpt(EpicsSignalWithRBV, "DetectionFilter", string=True) - scale_factor = Cpt(EpicsSignalWithRBV, "ScaleFactor") - risetime_optimisation = Cpt(EpicsSignalWithRBV, "RisetimeOptimization") - - # Misc PVs - detector_polarity = Cpt(EpicsSignalWithRBV, "DetectorPolarity") - decay_time = Cpt(EpicsSignalWithRBV, "DecayTime") - - current_pixel = Cpt(EpicsSignalRO, "CurrentPixel") - - -class FalconHDF5Plugins(Device): - """ - HDF5 parameters for Falcon detector - - Base class to map EPICS PVs from HDF5 Plugin to ophyd signals. - """ - - capture = Cpt(EpicsSignalWithRBV, "Capture") - enable = Cpt(EpicsSignalWithRBV, "EnableCallbacks", string=True, kind="config") - xml_file_name = Cpt(EpicsSignalWithRBV, "XMLFileName", string=True, kind="config") - lazy_open = Cpt(EpicsSignalWithRBV, "LazyOpen", string=True, doc="0='No' 1='Yes'") - temp_suffix = Cpt(EpicsSignalWithRBV, "TempSuffix", string=True) - file_path = Cpt(EpicsSignalWithRBV, "FilePath", string=True, kind="config") - file_name = Cpt(EpicsSignalWithRBV, "FileName", string=True, kind="config") - file_template = Cpt(EpicsSignalWithRBV, "FileTemplate", string=True, kind="config") - num_capture = Cpt(EpicsSignalWithRBV, "NumCapture", kind="config") - file_write_mode = Cpt(EpicsSignalWithRBV, "FileWriteMode", kind="config") - queue_size = Cpt(EpicsSignalWithRBV, "QueueSize", kind="config") - array_counter = Cpt(EpicsSignalWithRBV, "ArrayCounter", kind="config") - - -class FalconSetup(CustomDetectorMixin): - """ - Falcon setup class for cSAXS - - Parent class: CustomDetectorMixin - - """ - - def __init__(self, *args, parent: Device = None, **kwargs) -> None: - super().__init__(*args, parent=parent, **kwargs) - self._lock = threading.RLock() + file_event = Cpt(FileEventSignal, name="file_event") def on_init(self) -> None: - """Initialize Falcon detector""" - self.initialize_default_parameter() - self.initialize_detector() - self.initialize_detector_backend() + """Initialize Falcon Sitoro detector""" + self._lock = threading.RLock() + self._readout_time = self.MIN_READOUT + self._value_pixel_per_buffer = 20 + self._queue_size = 2000 + self._full_path = "" - def initialize_default_parameter(self) -> None: + def on_connected(self): """ - Set default parameters for Falcon - - This will set: - - readout (float): readout time in seconds - - value_pixel_per_buffer (int): number of spectra in buffer of Falcon Sitoro - + Setup Falcon Sitoro detector default parameters once signals are connected """ - self.parent.value_pixel_per_buffer = 20 - self.update_readout_time() - - def update_readout_time(self) -> None: - """Set readout time for Eiger9M detector""" - readout_time = ( - self.parent.scaninfo.readout_time - if hasattr(self.parent.scaninfo, "readout_time") - else self.parent.MIN_READOUT - ) - self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT) - - def initialize_detector(self) -> None: - """Initialize Falcon detector""" - self.stop_detector() - self.stop_detector_backend() + self._initialize_detector() + self._initialize_detector_backend() self.set_trigger( mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 ) - # 1 Realtime - self.parent.preset_mode.put(1) - # 0 Normal, 1 Inverted - self.parent.input_logic_polarity.put(0) - # 0 Manual 1 Auto - self.parent.auto_pixels_per_buffer.put(0) - # Sets the number of pixels/spectra in the buffer - self.parent.pixels_per_buffer.put(self.parent.value_pixel_per_buffer) - - def initialize_detector_backend(self) -> None: - """Initialize the detector backend for Falcon.""" - self.parent.hdf5.enable.put(1) - # file location of h5 layout for cSAXS - self.parent.hdf5.xml_file_name.put("layout.xml") - # TODO Check if lazy open is needed and wanted! - self.parent.hdf5.lazy_open.put(1) - self.parent.hdf5.temp_suffix.put("") - # size of queue for number of spectra allowed in the buffer, if too small at high throughput, data is lost - self.parent.hdf5.queue_size.put(2000) - # Segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate - self.parent.nd_array_mode.put(1) - - def on_stage(self) -> None: - """Prepare detector and backend for acquisition""" - self.prepare_detector() - self.prepare_data_backend() - self.publish_file_location(done=False, successful=False) - self.arm_acquisition() - - def prepare_detector(self) -> None: - """Prepare detector for acquisition""" - self.set_trigger( - mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 - ) - self.parent.preset_real.put(self.parent.scaninfo.exp_time) - self.parent.pixels_per_run.put( - int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger) - ) - - def prepare_data_backend(self) -> None: - """Prepare data backend for acquisition""" - self.parent.filepath.set( - self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5") - ).wait() - file_path, file_name = os.path.split(self.parent.filepath.get()) - self.parent.hdf5.file_path.put(file_path) - self.parent.hdf5.file_name.put(file_name) - self.parent.hdf5.file_template.put("%s%s") - self.parent.hdf5.num_capture.put( - int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger) - ) - self.parent.hdf5.file_write_mode.put(2) - # Reset spectrum counter in filewriter, used for indexing & identifying missing triggers - self.parent.hdf5.array_counter.put(0) - # Start file writing - self.parent.hdf5.capture.put(1) - - def arm_acquisition(self) -> None: - """Arm detector for acquisition""" - self.parent.start_all.put(1) - signal_conditions = [ - ( - lambda: self.parent.state.read()[self.parent.state.name]["value"], - DetectorState.ACQUIRING, - ) - ] - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=self.parent.TIMEOUT_FOR_SIGNALS, - check_stopped=True, - all_signals=False, - ): - raise FalconTimeoutError( - f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}" - ) - - def on_unstage(self) -> None: - """Unstage detector and backend""" - pass - - def on_complete(self) -> None: - """Complete detector and backend""" - self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS) - self.publish_file_location(done=True, successful=True) - - def on_stop(self) -> None: - """Stop detector and backend""" - self.stop_detector() - self.stop_detector_backend() - - def stop_detector(self) -> None: - """Stops detector""" - - self.parent.stop_all.put(1) - self.parent.erase_all.put(1) - - signal_conditions = [ - (lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE) - ] - - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2, - all_signals=False, - ): - # Retry stop detector and wait for remaining time - raise FalconTimeoutError( - f"Failed to stop detector, timeout with state {signal_conditions[0][0]}" - ) - - def stop_detector_backend(self) -> None: - """Stop the detector backend""" - self.parent.hdf5.capture.put(0) - - def finished(self, timeout: int = 5) -> None: - """Check if scan finished succesfully""" - with self._lock: - total_frames = int( - self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger - ) - signal_conditions = [ - (self.parent.dxp.current_pixel.get, total_frames), - (self.parent.hdf5.array_counter.get, total_frames), - ] - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=timeout, - check_stopped=True, - all_signals=True, - ): - logger.debug( - f"Falcon missed a trigger: received trigger {self.parent.dxp.current_pixel.get()}," - f" send data {self.parent.hdf5.array_counter.get()} from total_frames" - f" {total_frames}" - ) - self.stop_detector() - self.stop_detector_backend() def set_trigger( - self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: int = 0 + self, + mapping_mode: MappingSource, + trigger_source: TriggerSource, + ignore_gate: Literal[0, 1] = 0, ) -> None: """ Set triggering mode for detector @@ -287,63 +102,144 @@ class FalconSetup(CustomDetectorMixin): """ mapping = int(mapping_mode) - trigger = trigger_source - self.parent.collect_mode.put(mapping) - self.parent.pixel_advance_mode.put(trigger) - self.parent.ignore_gate.put(ignore_gate) + trigger = int(trigger_source) + self.collect_mode.put(mapping) + self.pixel_advance_mode.put(trigger) + self.ignore_gate.put(ignore_gate) + def _initialize_detector(self) -> None: + """Initialize Falcon detector""" + self.stop_detector() + self.stop_detector_backend() + self.set_trigger( + mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 + ) -class FalconcSAXS(PSIDetectorBase): - """ - Falcon Sitoro detector for CSAXS + # 1 Realtime + self.preset_mode.put(1) - Parent class: PSIDetectorBase + # 0 Normal, 1 Inverted + self.input_logic_polarity.put(0) - class attributes: - custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS, - inherits from CustomDetectorMixin - PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector - dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector - mca (EpicsMCARecord) : MCA parameters for Falcon detector - hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector - MIN_READOUT (float) : Minimum readout time for the detector - """ + # 0 Manual 1 Auto + self.auto_pixels_per_buffer.put(0) - # Specify which functions are revealed to the user in BEC client - USER_ACCESS = ["describe"] + # Sets the number of pixels/spectra in the buffer + self.pixels_per_buffer.put(self._value_pixel_per_buffer) - # specify Setup class - custom_prepare_cls = FalconSetup - # specify minimum readout time for detector - MIN_READOUT = 3e-3 - TIMEOUT_FOR_SIGNALS = 5 + def _initialize_detector_backend(self) -> None: + """Initialize the detector backend for Falcon.""" + # Enable HDF5 plugin + self.hdf5.enable.put(1) - # specify class attributes - dxp = Cpt(EpicsDXPFalcon, "dxp1:") - mca = Cpt(EpicsMCARecord, "mca1") - hdf5 = Cpt(FalconHDF5Plugins, "HDF1:") + # Use layout.xml file for cSAXS Falcon. FIXME:Should be checked if IOC runs on different host. + self.hdf5.xml_file_name.put("layout.xml") - stop_all = Cpt(EpicsSignal, "StopAll") - erase_all = Cpt(EpicsSignal, "EraseAll") - start_all = Cpt(EpicsSignal, "StartAll") - state = Cpt(EpicsSignal, "Acquiring") - preset_mode = Cpt(EpicsSignal, "PresetMode") # 0 No preset 1 Real time 2 Events 3 Triggers - preset_real = Cpt(EpicsSignal, "PresetReal") - preset_events = Cpt(EpicsSignal, "PresetEvents") - preset_triggers = Cpt(EpicsSignal, "PresetTriggers") - triggers = Cpt(EpicsSignalRO, "MaxTriggers", lazy=True) - events = Cpt(EpicsSignalRO, "MaxEvents", lazy=True) - input_count_rate = Cpt(EpicsSignalRO, "MaxInputCountRate", lazy=True) - output_count_rate = Cpt(EpicsSignalRO, "MaxOutputCountRate", lazy=True) - collect_mode = Cpt(EpicsSignal, "CollectMode") # 0 MCA spectra, 1 MCA mapping - pixel_advance_mode = Cpt(EpicsSignal, "PixelAdvanceMode") - ignore_gate = Cpt(EpicsSignal, "IgnoreGate") - input_logic_polarity = Cpt(EpicsSignal, "InputLogicPolarity") - auto_pixels_per_buffer = Cpt(EpicsSignal, "AutoPixelsPerBuffer") - pixels_per_buffer = Cpt(EpicsSignal, "PixelsPerBuffer") - pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun") - nd_array_mode = Cpt(EpicsSignal, "NDArrayMode") + # TODO Check if lazy open is needed and wanted! + self.hdf5.lazy_open.put(1) + self.hdf5.temp_suffix.put("") + + # Size of the queue for the number of spectra allowed in the buffer. If too small, data is lost at high throughput + self.hdf5.queue_size.put(self._queue_size) + + # Set nd_array mode to 1: This means segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate + self.nd_array_mode.put(1) + + def on_stage(self): + """ + This method is called when the detector is staged for acquisition. + We use the information in scan_info.msg about the upcoming scan to set all relevant parameters on the detector. + """ + # Calculate relevant parameters + num_points = self.scan_info.msg.num_points + frames_per_trigger = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) + overall_frames = int(num_points * frames_per_trigger) + exp_time = self.scan_info.exp_time + self._full_path = get_full_path(self.scan_info.msg, self.name) + + # Check that exposure time is larger than readout time + readout_time = max( + self.scan_info.msg.scan_parameters.get("readout_time", self.MIN_READOUT), + self.MIN_READOUT, + ) + if exp_time < readout_time: + raise ValueError( + f"Exposure time {exp_time} is less than minimum readout time {readout_time}" + ) + + # TODO: Add h5_entries for linking the Falcon NEXUS entries with the master file + self.file_event.put(file_path=self._full_path, done=False, successful=False) + + self.preset_real.put(exp_time) + self.pixels_per_run.put(overall_frames) + + # Prepare detector backend PVs + file_path, file_name = os.path.split(self._full_path) + self.hdf5.file_path.put(file_path) + self.hdf5.file_name.put(file_name) + self.hdf5.file_template.put("%s%s") + self.hdf5.num_capture.put(overall_frames) + self.hdf5.file_write_mode.put(2) + # Reset spectrum counter in filewriter, used for indexing & identifying missing triggers + self.hdf5.array_counter.put(0) + + # Start file writing + self.hdf5.capture.put(1) + # Start the acquisition + self.start_all.put(1) + + def on_pre_scan(self): + """ + Method for actions just before the scan starts. + """ + status_camera = CompareStatus( + self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout + ) + status_writer = CompareStatus( + self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout + ) + # Logical combine of statuses + status = status_camera & status_writer + self.cancel_on_stop(status) + return status + + def _complete_callback(self, status: CompareStatus) -> None: + """Callback for when the device completes a scan.""" + # FIXME Add proper h5 entries once checked + if status.success: + self.file_event.put( + file_path=self._full_path, # pylint: disable:protected-access + done=True, + successful=True, + ) + else: + self.file_event.put( + file_path=self._full_path, # pylint: disable:protected-access + done=True, + successful=False, + ) + + def on_complete(self) -> None: + """Complete detector and backend""" + # Calculate relevant parameters + num_points = self.scan_info.msg.num_points + frames_per_trigger = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) + overall_frames = int(num_points * frames_per_trigger) + + status_detector = CompareStatus(self.dxp.current_pixel, overall_frames, run=True) + status_backend = CompareStatus(self.hdf5.array_counter, overall_frames, run=True) + + status = status_detector & status_backend + self.cancel_on_stop(status) + status.add_callback(self._complete_callback) + return status + + def on_stop(self) -> None: + """Stop detector and backend""" + self.stop_all.put(1) + self.hdf5.capture.put(0) + self.erase_all.put(1) if __name__ == "__main__": - falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:", sim_mode=True) + falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:") diff --git a/tests/tests_devices/test_falcon_csaxs.py b/tests/tests_devices/test_falcon_csaxs.py index cc1f537..18a7b8a 100644 --- a/tests/tests_devices/test_falcon_csaxs.py +++ b/tests/tests_devices/test_falcon_csaxs.py @@ -1,6 +1,7 @@ # pylint: skip-file import os import threading +from typing import Generator from unittest import mock import ophyd @@ -8,80 +9,74 @@ import pytest from bec_lib import messages from bec_lib.endpoints import MessageEndpoints from bec_server.device_server.tests.utils import DMMock -from ophyd_devices.tests.utils import MockPV +from ophyd_devices.tests.utils import patched_device -from csaxs_bec.devices.epics.falcon_csaxs import FalconcSAXS, FalconTimeoutError -from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs +from csaxs_bec.devices.epics.falcon_csaxs import FalconcSAXS @pytest.fixture(scope="function") -def mock_det(): - name = "falcon" - prefix = "X12SA-SITORO:" +def mock_det() -> Generator[FalconcSAXS, None, None]: + """Fixture to mock the FalconcSAXS device.""" + name = "mcs_csaxs" + prefix = "X12SA-MCS-CSAXS:" dm = DMMock() - with mock.patch.object(dm, "connector"): - with ( - mock.patch( - "ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter" - ) as filemixin, - mock.patch( - "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config" - ) as mock_service_config, - ): - with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV - mock_cl.thread_class = threading.Thread - with mock.patch.object(FalconcSAXS, "_init"): - det = FalconcSAXS(name=name, prefix=prefix, device_manager=dm) - patch_dual_pvs(det) - det.TIMEOUT_FOR_SIGNALS = 0.1 - yield det + with patched_device( + FalconcSAXS, + name="falcon", + prefix="X12SA-SITORO:", + device_manager=dm, + _mock_pv_initial_value=0, + ) as dev: + try: + yield dev + finally: + dev.destroy() -@pytest.mark.parametrize( - "trigger_source, mapping_source, ignore_gate, pixels_per_buffer, detector_state," - " expected_exception", - [(1, 1, 0, 20, 0, False), (1, 1, 0, 20, 1, True)], -) -# TODO rewrite this one, write test for init_detector, init_filewriter is tested -def test_init_detector( - mock_det, - trigger_source, - mapping_source, - ignore_gate, - pixels_per_buffer, - detector_state, - expected_exception, -): - """Test the _init function: +# @pytest.mark.parametrize( +# "trigger_source, mapping_source, ignore_gate, pixels_per_buffer, detector_state," +# " expected_exception", +# [(1, 1, 0, 20, 0, False), (1, 1, 0, 20, 1, True)], +# ) +# # TODO rewrite this one, write test for init_detector, init_filewriter is tested +# def test_init_detector( +# mock_det, +# trigger_source, +# mapping_source, +# ignore_gate, +# pixels_per_buffer, +# detector_state, +# expected_exception, +# ): +# """Test the _init function: - This includes testing the functions: - - _init_detector - - _stop_det - - _set_trigger - --> Testing the filewriter is done in test_init_filewriter +# This includes testing the functions: +# - _init_detector +# - _stop_det +# - _set_trigger +# --> Testing the filewriter is done in test_init_filewriter - Validation upon setting the correct PVs +# Validation upon setting the correct PVs - """ - mock_det.value_pixel_per_buffer = pixels_per_buffer - mock_det.state._read_pv.mock_data = detector_state - if expected_exception: - with pytest.raises(FalconTimeoutError): - mock_det.timeout = 0.1 - mock_det.custom_prepare.initialize_detector() - else: - mock_det.custom_prepare.initialize_detector() - assert mock_det.state.get() == detector_state - assert mock_det.collect_mode.get() == mapping_source - assert mock_det.pixel_advance_mode.get() == trigger_source - assert mock_det.ignore_gate.get() == ignore_gate +# """ +# mock_det.value_pixel_per_buffer = pixels_per_buffer +# mock_det.state._read_pv.mock_data = detector_state +# if expected_exception: +# with pytest.raises(FalconTimeoutError): +# mock_det.timeout = 0.1 +# mock_det.custom_prepare.initialize_detector() +# else: +# mock_det.custom_prepare.initialize_detector() +# assert mock_det.state.get() == detector_state +# assert mock_det.collect_mode.get() == mapping_source +# assert mock_det.pixel_advance_mode.get() == trigger_source +# assert mock_det.ignore_gate.get() == ignore_gate - assert mock_det.preset_mode.get() == 1 - assert mock_det.erase_all.get() == 1 - assert mock_det.input_logic_polarity.get() == 0 - assert mock_det.auto_pixels_per_buffer.get() == 0 - assert mock_det.pixels_per_buffer.get() == pixels_per_buffer +# assert mock_det.preset_mode.get() == 1 +# assert mock_det.erase_all.get() == 1 +# assert mock_det.input_logic_polarity.get() == 0 +# assert mock_det.auto_pixels_per_buffer.get() == 0 +# assert mock_det.pixels_per_buffer.get() == pixels_per_buffer @pytest.mark.parametrize( @@ -89,12 +84,12 @@ def test_init_detector( ) def test_update_readout_time(mock_det, readout_time, expected_value): if readout_time is None: - mock_det.custom_prepare.update_readout_time() - assert mock_det.readout_time == expected_value + mock_det._update_readout_time() + assert mock_det._readout_time == expected_value else: - mock_det.scaninfo.readout_time = readout_time - mock_det.custom_prepare.update_readout_time() - assert mock_det.readout_time == expected_value + mock_det.scan_info.readout_time = readout_time + mock_det._update_readout_time() + assert mock_det._readout_time == expected_value def test_initialize_default_parameter(mock_det): @@ -109,17 +104,15 @@ def test_initialize_default_parameter(mock_det): @pytest.mark.parametrize( "scaninfo", [ - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "exp_time": 0.1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 12.4, - } - ) + { + "eacc": "e12345", + "num_points": 500, + "frames_per_trigger": 1, + "exp_time": 0.1, + "filepath": "test.h5", + "scan_id": "123", + "mokev": 12.4, + } ], ) def test_stage(mock_det, scaninfo): -- 2.49.1 From 4b95ebace31ed24f23990473121761d1ddc89c9b Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 6 Jan 2026 17:57:31 +0100 Subject: [PATCH 3/4] test(falcon): fix test after falcon refactoring --- csaxs_bec/devices/epics/falcon_csaxs.py | 19 +- tests/tests_devices/test_falcon_csaxs.py | 435 ++++++++++------------- 2 files changed, 196 insertions(+), 258 deletions(-) diff --git a/csaxs_bec/devices/epics/falcon_csaxs.py b/csaxs_bec/devices/epics/falcon_csaxs.py index 3c7bec0..da12e6b 100644 --- a/csaxs_bec/devices/epics/falcon_csaxs.py +++ b/csaxs_bec/devices/epics/falcon_csaxs.py @@ -80,6 +80,7 @@ class FalconcSAXS(PSIDeviceBase, FalconControl): """ Setup Falcon Sitoro detector default parameters once signals are connected """ + self.on_stop() self._initialize_detector() self._initialize_detector_backend() self.set_trigger( @@ -109,11 +110,6 @@ class FalconcSAXS(PSIDeviceBase, FalconControl): def _initialize_detector(self) -> None: """Initialize Falcon detector""" - self.stop_detector() - self.stop_detector_backend() - self.set_trigger( - mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 - ) # 1 Realtime self.preset_mode.put(1) @@ -141,6 +137,8 @@ class FalconcSAXS(PSIDeviceBase, FalconControl): # Size of the queue for the number of spectra allowed in the buffer. If too small, data is lost at high throughput self.hdf5.queue_size.put(self._queue_size) + self.hdf5.file_template.put("%s%s") + self.hdf5.file_write_mode.put(2) # Set nd_array mode to 1: This means segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate self.nd_array_mode.put(1) @@ -154,7 +152,7 @@ class FalconcSAXS(PSIDeviceBase, FalconControl): num_points = self.scan_info.msg.num_points frames_per_trigger = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) overall_frames = int(num_points * frames_per_trigger) - exp_time = self.scan_info.exp_time + exp_time = self.scan_info.msg.scan_parameters["exp_time"] self._full_path = get_full_path(self.scan_info.msg, self.name) # Check that exposure time is larger than readout time @@ -170,16 +168,15 @@ class FalconcSAXS(PSIDeviceBase, FalconControl): # TODO: Add h5_entries for linking the Falcon NEXUS entries with the master file self.file_event.put(file_path=self._full_path, done=False, successful=False) - self.preset_real.put(exp_time) + self.preset_real_time.put(exp_time) self.pixels_per_run.put(overall_frames) # Prepare detector backend PVs file_path, file_name = os.path.split(self._full_path) self.hdf5.file_path.put(file_path) self.hdf5.file_name.put(file_name) - self.hdf5.file_template.put("%s%s") + self.hdf5.num_capture.put(overall_frames) - self.hdf5.file_write_mode.put(2) # Reset spectrum counter in filewriter, used for indexing & identifying missing triggers self.hdf5.array_counter.put(0) @@ -193,10 +190,10 @@ class FalconcSAXS(PSIDeviceBase, FalconControl): Method for actions just before the scan starts. """ status_camera = CompareStatus( - self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout + self.acquire_busy, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout ) status_writer = CompareStatus( - self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout + self.hdf5.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout ) # Logical combine of statuses status = status_camera & status_writer diff --git a/tests/tests_devices/test_falcon_csaxs.py b/tests/tests_devices/test_falcon_csaxs.py index 18a7b8a..058e7ae 100644 --- a/tests/tests_devices/test_falcon_csaxs.py +++ b/tests/tests_devices/test_falcon_csaxs.py @@ -8,10 +8,17 @@ import ophyd import pytest from bec_lib import messages from bec_lib.endpoints import MessageEndpoints +from bec_lib.file_utils import get_full_path from bec_server.device_server.tests.utils import DMMock +from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError from ophyd_devices.tests.utils import patched_device -from csaxs_bec.devices.epics.falcon_csaxs import FalconcSAXS +from csaxs_bec.devices.epics.falcon_csaxs import ( + ACQUIRESTATUS, + FalconcSAXS, + MappingSource, + TriggerSource, +) @pytest.fixture(scope="function") @@ -28,264 +35,198 @@ def mock_det() -> Generator[FalconcSAXS, None, None]: _mock_pv_initial_value=0, ) as dev: try: + for dotted_name, device in dev.walk_subdevices(include_lazy=True): + device.stage_sigs = {} # Remove stage signals + device.trigger_sigs = {} # Remove trigger signals + if hasattr(device, "plugin_type"): + device.plugin_type._read_pv.mock_data = device._plugin_type yield dev finally: dev.destroy() -# @pytest.mark.parametrize( -# "trigger_source, mapping_source, ignore_gate, pixels_per_buffer, detector_state," -# " expected_exception", -# [(1, 1, 0, 20, 0, False), (1, 1, 0, 20, 1, True)], -# ) -# # TODO rewrite this one, write test for init_detector, init_filewriter is tested -# def test_init_detector( -# mock_det, -# trigger_source, -# mapping_source, -# ignore_gate, -# pixels_per_buffer, -# detector_state, -# expected_exception, -# ): -# """Test the _init function: - -# This includes testing the functions: -# - _init_detector -# - _stop_det -# - _set_trigger -# --> Testing the filewriter is done in test_init_filewriter - -# Validation upon setting the correct PVs - -# """ -# mock_det.value_pixel_per_buffer = pixels_per_buffer -# mock_det.state._read_pv.mock_data = detector_state -# if expected_exception: -# with pytest.raises(FalconTimeoutError): -# mock_det.timeout = 0.1 -# mock_det.custom_prepare.initialize_detector() -# else: -# mock_det.custom_prepare.initialize_detector() -# assert mock_det.state.get() == detector_state -# assert mock_det.collect_mode.get() == mapping_source -# assert mock_det.pixel_advance_mode.get() == trigger_source -# assert mock_det.ignore_gate.get() == ignore_gate - -# assert mock_det.preset_mode.get() == 1 -# assert mock_det.erase_all.get() == 1 -# assert mock_det.input_logic_polarity.get() == 0 -# assert mock_det.auto_pixels_per_buffer.get() == 0 -# assert mock_det.pixels_per_buffer.get() == pixels_per_buffer +def test_falcon_init(mock_det: FalconcSAXS): + """Test the initialization of the FalconcSAXS device.""" + assert mock_det._readout_time == mock_det.MIN_READOUT + assert mock_det._value_pixel_per_buffer == 20 + assert mock_det._queue_size == 2000 + assert mock_det._full_path == "" -@pytest.mark.parametrize( - "readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)] -) -def test_update_readout_time(mock_det, readout_time, expected_value): - if readout_time is None: - mock_det._update_readout_time() - assert mock_det._readout_time == expected_value - else: - mock_det.scan_info.readout_time = readout_time - mock_det._update_readout_time() - assert mock_det._readout_time == expected_value +def test_falcon_on_connected(mock_det: FalconcSAXS): + """Test the on_connected method of the FalconcSAXS device.""" + falcon = mock_det + + # Set known default values + falcon.preset_mode.put(-1) + falcon.input_logic_polarity.put(-1) + falcon.auto_pixels_per_buffer.put(-1) + falcon.hdf5.enable.put(-1) + + with ( + mock.patch.object(falcon, "on_stop") as mock_on_stop, + mock.patch.object(falcon, "set_trigger") as mock_set_trigger, + ): + + falcon.on_connected() + mock_on_stop.assert_called_once() + mock_set_trigger.assert_called_once_with( + mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 + ) + + # Detector default PV values + assert falcon.preset_mode.get() == "1" # Real Time + assert falcon.input_logic_polarity.get() == 0 + assert falcon.auto_pixels_per_buffer.get() == 0 + assert falcon.pixels_per_buffer.get() == falcon._value_pixel_per_buffer + + # Backend default PV values + assert falcon.hdf5.enable.get() == "1" # Enabled + assert falcon.hdf5.xml_file_name.get() == "layout.xml" + assert falcon.hdf5.lazy_open.get() == "1" # Enabled + assert falcon.hdf5.temp_suffix.get() == "" + assert falcon.hdf5.queue_size.get() == falcon._queue_size + assert falcon.nd_array_mode.get() == 1 + assert falcon.hdf5.file_template.get() == "%s%s" + assert falcon.hdf5.file_write_mode.get() == 2 -def test_initialize_default_parameter(mock_det): - with mock.patch.object( - mock_det.custom_prepare, "update_readout_time" - ) as mock_update_readout_time: - mock_det.custom_prepare.initialize_default_parameter() - assert mock_det.value_pixel_per_buffer == 20 - mock_update_readout_time.assert_called_once() - - -@pytest.mark.parametrize( - "scaninfo", - [ - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "exp_time": 0.1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 12.4, - } - ], -) -def test_stage(mock_det, scaninfo): - """Test the stage function: - - This includes testing _prep_det +def test_falcon_on_stage(mock_det: FalconcSAXS): """ - with ( - mock.patch.object(mock_det.custom_prepare, "set_trigger") as mock_set_trigger, - mock.patch.object( - mock_det.custom_prepare, "prepare_data_backend" - ) as mock_prep_data_backend, - mock.patch.object( - mock_det.custom_prepare, "publish_file_location" - ) as mock_publish_file_location, - mock.patch.object(mock_det.custom_prepare, "arm_acquisition") as mock_arm_acquisition, - ): - mock_det.scaninfo.exp_time = scaninfo["exp_time"] - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - mock_det.stage() - mock_set_trigger.assert_called_once() - assert mock_det.preset_real.get() == scaninfo["exp_time"] - assert mock_det.pixels_per_run.get() == int( - scaninfo["num_points"] * scaninfo["frames_per_trigger"] - ) - mock_prep_data_backend.assert_called_once() - mock_publish_file_location.assert_called_once_with(done=False, successful=False) - mock_arm_acquisition.assert_called_once() + + Test the on_stage method of the FalconcSAXS device. + All relevant information is available in the scan_info attribute and used + to bootstrap the detector for the upcoming acquisition. Two scenarios are tested: + I. Normal case with exposure time larger than readout time + II. Case where exposure time is smaller than readout time, which should raise an exception. + """ + falcon = mock_det + num_points = 10 + exp_time = 0.2 + frames_per_trigger = 5 + falcon.scan_info.msg.num_points = num_points + falcon.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger + falcon.scan_info.msg.scan_parameters["exp_time"] = exp_time + falcon.hdf5.array_counter.put(5) # Set to non-zero to check reset + + # I. Normal case + old_file_event = falcon.file_event.get() + with mock.patch.object(falcon.hdf5.array_size, "get", return_value=(1, 1, 1)): + falcon.stage() + + assert falcon.staged is ophyd.Staged.yes + assert falcon._full_path == get_full_path(falcon.scan_info.msg, falcon.name) + file_path = falcon.hdf5.file_path.get() + file_name = falcon.hdf5.file_name.get() + assert os.path.join(file_path, file_name) == falcon._full_path + + assert falcon.preset_real_time.get() == exp_time + assert falcon.pixels_per_run.get() == num_points * frames_per_trigger + assert falcon.hdf5.num_capture.get() == num_points * frames_per_trigger + assert falcon.hdf5.array_counter.get() == 0 + assert falcon.hdf5.capture.get() == 1 + assert falcon.start_all.get() == 1 + + # II. Unstage device first + falcon.unstage() + exp_time = 1e-3 # Smaller than readout time + falcon.scan_info.msg.scan_parameters["exp_time"] = exp_time + with pytest.raises(ValueError): + falcon.stage() + assert falcon.staged is not ophyd.Staged.no -@pytest.mark.parametrize( - "scaninfo", - [ - ( - { - "filepath": "/das/work/p18/p18533/data/S00000-S00999/S00001/data.h5", - "num_points": 500, - "frames_per_trigger": 1, - } - ), - ( - { - "filepath": "/das/work/p18/p18533/data/S00000-S00999/S00001/data1234.h5", - "num_points": 500, - "frames_per_trigger": 1, - } - ), - ], -) -def test_prepare_data_backend(mock_det, scaninfo): - mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"] - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - mock_det.scaninfo.scan_number = 1 - mock_det.custom_prepare.prepare_data_backend() - file_path, file_name = os.path.split(scaninfo["filepath"]) - assert mock_det.hdf5.file_path.get() == file_path - assert mock_det.hdf5.file_name.get() == file_name - assert mock_det.hdf5.file_template.get() == "%s%s" - assert mock_det.hdf5.num_capture.get() == int( - scaninfo["num_points"] * scaninfo["frames_per_trigger"] +def test_falcon_on_pre_scan(mock_det: FalconcSAXS): + """Test the on_pre_scan method of the FalconcSAXS device.""" + falcon = mock_det + # I. Test normal case with success + falcon.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE + falcon.hdf5.capture._read_pv.mock_data = ACQUIRESTATUS.DONE + falcon = mock_det + st = falcon.on_pre_scan() + assert st.done is False + assert st.success is False + falcon.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING + assert st.done is False + assert st.success is False + falcon.hdf5.capture._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING + st.wait(3) + assert st.done is True + assert st.success is True + + # II. Test abort case with stop called + falcon.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE + falcon.hdf5.capture._read_pv.mock_data = ACQUIRESTATUS.DONE + st = falcon.on_pre_scan() + assert st.done is False + assert st.success is False + falcon.stop() + with pytest.raises(DeviceStoppedError): + st.wait(3) + assert st.done is True + assert st.success is False + + +def test_falcon_stop(mock_det: FalconcSAXS): + """Test the stop method of the FalconcSAXS device.""" + falcon = mock_det + + falcon.stop_all.put(0) + falcon.hdf5.capture.put(1) + falcon.erase_all.put(0) + falcon.stop() + assert falcon.stop_all.get() == 1 + assert falcon.hdf5.capture.get() == 0 + assert falcon.erase_all.get() == 1 + + +def test_falcon_complete(mock_det: FalconcSAXS): + """Test the complete method of the FalconcSAXS device.""" + falcon = mock_det + + num_points = 10 + frames_per_trigger = 5 + falcon.scan_info.msg.num_points = num_points + falcon.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger + + # I. Test normal case with success + falcon.dxp.current_pixel._read_pv.mock_data = num_points * frames_per_trigger - 1 + falcon.hdf5.array_counter._read_pv.mock_data = num_points * frames_per_trigger - 1 + falcon._full_path = "/tmp/fake_path/test.h5" + st = falcon.on_complete() + assert st.done is False + assert st.success is False + falcon.dxp.current_pixel._read_pv.mock_data = num_points * frames_per_trigger + assert st.done is False + assert st.success is False + falcon.hdf5.array_counter._read_pv.mock_data = num_points * frames_per_trigger + st.wait(3) + assert st.done is True + assert st.success is True + assert falcon.file_event.get() == messages.FileMessage( + file_path="/tmp/fake_path/test.h5", + done=True, + successful=True, + device_name=falcon.name, + file_type="h5", + hinted_h5_entries=None, + metadata={}, ) - assert mock_det.hdf5.file_write_mode.get() == 2 - assert mock_det.hdf5.array_counter.get() == 0 - assert mock_det.hdf5.capture.get() == 1 - -@pytest.mark.parametrize( - "scaninfo", - [ - ({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}), - ({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}), - ], -) -def test_publish_file_location(mock_det, scaninfo): - mock_det.scaninfo.scan_id = scaninfo["scan_id"] - mock_det.filepath.set(scaninfo["filepath"]).wait() - mock_det.custom_prepare.publish_file_location( - done=scaninfo["done"], successful=scaninfo["successful"] + # II. Test case where acquisition fails due to interruption + falcon.dxp.current_pixel._read_pv.mock_data = num_points * frames_per_trigger - 1 + st = falcon.on_complete() + assert st.done is False + assert st.success is False + falcon.stop() + with pytest.raises(DeviceStoppedError): + st.wait(3) + assert falcon.file_event.get() == messages.FileMessage( + file_path="/tmp/fake_path/test.h5", + done=True, + successful=False, + device_name=falcon.name, + file_type="h5", + hinted_h5_entries=None, + metadata={}, ) - if scaninfo["successful"] is None: - msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]) - else: - msg = messages.FileMessage( - file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"] - ) - expected_calls = [ - mock.call( - MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name), - msg, - pipe=mock_det.connector.pipeline.return_value, - ), - mock.call( - MessageEndpoints.file_event(mock_det.name), - msg, - pipe=mock_det.connector.pipeline.return_value, - ), - ] - assert mock_det.connector.set_and_publish.call_args_list == expected_calls - - -@pytest.mark.parametrize("detector_state, expected_exception", [(1, False), (0, True)]) -def test_arm_acquisition(mock_det, detector_state, expected_exception): - with mock.patch.object(mock_det, "stop") as mock_stop: - mock_det.state._read_pv.mock_data = detector_state - if expected_exception: - with pytest.raises(FalconTimeoutError): - mock_det.timeout = 0.1 - mock_det.custom_prepare.arm_acquisition() - mock_stop.assert_called_once() - else: - mock_det.custom_prepare.arm_acquisition() - assert mock_det.start_all.get() == 1 - - -def test_trigger(mock_det): - with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger: - mock_det.trigger() - mock_on_trigger.assert_called_once() - - -def test_complete(mock_det): - with ( - mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, - mock.patch.object( - mock_det.custom_prepare, "publish_file_location" - ) as mock_publish_file_location, - ): - mock_det.stopped = False - mock_det.complete() - assert mock_finished.call_count == 1 - call = mock.call(done=True, successful=True) - assert mock_publish_file_location.call_args == call - - -def test_stop(mock_det): - with ( - mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, - mock.patch.object( - mock_det.custom_prepare, "stop_detector_backend" - ) as mock_stop_detector_backend, - ): - mock_det.stop() - mock_stop_det.assert_called_once() - mock_stop_detector_backend.assert_called_once() - assert mock_det.stopped is True - - -@pytest.mark.parametrize( - "stopped, scaninfo", - [ - (False, {"num_points": 500, "frames_per_trigger": 1}), - (True, {"num_points": 500, "frames_per_trigger": 1}), - ], -) -def test_finished(mock_det, stopped, scaninfo): - with ( - mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, - mock.patch.object( - mock_det.custom_prepare, "stop_detector_backend" - ) as mock_stop_file_writer, - ): - mock_det.stopped = stopped - mock_det.dxp.current_pixel._read_pv.mock_data = int( - scaninfo["num_points"] * scaninfo["frames_per_trigger"] - ) - mock_det.hdf5.array_counter._read_pv.mock_data = int( - scaninfo["num_points"] * scaninfo["frames_per_trigger"] - ) - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.custom_prepare.finished() - assert mock_det.stopped is stopped - mock_stop_det.assert_called_once() - mock_stop_file_writer.assert_called_once() -- 2.49.1 From 7326c471f83c23b330f50ab2941ead465c18b0c5 Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 7 Jan 2026 11:12:35 +0100 Subject: [PATCH 4/4] test(falcon): fix test for improved patched_device method in ophyd_devices --- tests/tests_devices/test_falcon_csaxs.py | 46 ++++++++++++------------ 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/tests/tests_devices/test_falcon_csaxs.py b/tests/tests_devices/test_falcon_csaxs.py index 058e7ae..fb1e100 100644 --- a/tests/tests_devices/test_falcon_csaxs.py +++ b/tests/tests_devices/test_falcon_csaxs.py @@ -32,7 +32,7 @@ def mock_det() -> Generator[FalconcSAXS, None, None]: name="falcon", prefix="X12SA-SITORO:", device_manager=dm, - _mock_pv_initial_value=0, + _mock_pv_initial_value=1, ) as dev: try: for dotted_name, device in dev.walk_subdevices(include_lazy=True): @@ -110,30 +110,28 @@ def test_falcon_on_stage(mock_det: FalconcSAXS): falcon.hdf5.array_counter.put(5) # Set to non-zero to check reset # I. Normal case - old_file_event = falcon.file_event.get() - with mock.patch.object(falcon.hdf5.array_size, "get", return_value=(1, 1, 1)): + falcon.stage() + + assert falcon.staged is ophyd.Staged.yes + assert falcon._full_path == get_full_path(falcon.scan_info.msg, falcon.name) + file_path = falcon.hdf5.file_path.get() + file_name = falcon.hdf5.file_name.get() + assert os.path.join(file_path, file_name) == falcon._full_path + + assert falcon.preset_real_time.get() == exp_time + assert falcon.pixels_per_run.get() == num_points * frames_per_trigger + assert falcon.hdf5.num_capture.get() == num_points * frames_per_trigger + assert falcon.hdf5.array_counter.get() == 0 + assert falcon.hdf5.capture.get() == 1 + assert falcon.start_all.get() == 1 + + # II. Unstage device first + falcon.unstage() + exp_time = 1e-3 # Smaller than readout time + falcon.scan_info.msg.scan_parameters["exp_time"] = exp_time + with pytest.raises(ValueError): falcon.stage() - - assert falcon.staged is ophyd.Staged.yes - assert falcon._full_path == get_full_path(falcon.scan_info.msg, falcon.name) - file_path = falcon.hdf5.file_path.get() - file_name = falcon.hdf5.file_name.get() - assert os.path.join(file_path, file_name) == falcon._full_path - - assert falcon.preset_real_time.get() == exp_time - assert falcon.pixels_per_run.get() == num_points * frames_per_trigger - assert falcon.hdf5.num_capture.get() == num_points * frames_per_trigger - assert falcon.hdf5.array_counter.get() == 0 - assert falcon.hdf5.capture.get() == 1 - assert falcon.start_all.get() == 1 - - # II. Unstage device first - falcon.unstage() - exp_time = 1e-3 # Smaller than readout time - falcon.scan_info.msg.scan_parameters["exp_time"] = exp_time - with pytest.raises(ValueError): - falcon.stage() - assert falcon.staged is not ophyd.Staged.no + assert falcon.staged is not ophyd.Staged.no def test_falcon_on_pre_scan(mock_det: FalconcSAXS): -- 2.49.1