diff --git a/csaxs_bec/device_configs/endstation.yaml b/csaxs_bec/device_configs/endstation.yaml index c9a2c20..962bd71 100644 --- a/csaxs_bec/device_configs/endstation.yaml +++ b/csaxs_bec/device_configs/endstation.yaml @@ -42,3 +42,14 @@ ids_cam: enabled: true readoutPriority: async softwareTrigger: True + +eiger_1_5: + description: Eiger 1.5M in-vacuum detector + deviceClass: csaxs_bec.devices.jungfraujoch.eiger_1_5m.Eiger1_5M + deviceConfig: + detector_distance: 100 + beam_center: [0, 0] + onFailure: raise + enabled: true + readoutPriority: async + softwareTrigger: False diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py index 314730d..4990799 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py @@ -33,6 +33,7 @@ from __future__ import annotations import threading import time +import traceback from typing import TYPE_CHECKING from bec_lib.logger import bec_logger @@ -160,6 +161,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): # f = e + 1us # e has refernce to d, f has reference to e self.set_delay_pairs(channel="ef", delay=0, width=1e-6) + time.sleep( + 0.2 + ) # After staging, make sure that the DDG HW has some time to process changes properly. def _prepare_mcs_on_trigger(self, mcs: MCSCardCSAXS) -> None: """Prepare the MCS card for the next trigger. @@ -188,7 +192,13 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): while ( self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set() ): - self._poll_loop() + try: + self._poll_loop() + except Exception: # pylint: disable=broad-except + content = traceback.format_exc() + logger.error( + f"Exception in polling loop thread, polling continues...\n Error content:\n{content}" + ) self._poll_thread_poll_loop_done.set() @@ -199,13 +209,17 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): The 20ms sleep was added to ensure that the event status is not polled too frequently, and to give the device time to process the previous command. This was found empirically to be necessary to avoid missing events. + IMPORTANT: Do not remove sleeps or try to optimize this logic. This seems to be a + fragile balance between polling frequency and device processing time. Also in between + start/stop of polling. Please also consider that there is a sleep in on_trigger and + that this might also be necessary to avoid that HW becomes unavailable/unstable. """ self.state.proc_status.put(1, use_complete=True) time.sleep(0.02) # 20ms delay for processing, important for not missing events - if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set(): + if self._poll_thread_kill_event.is_set() or not self._poll_thread_run_event.is_set(): return self.state.event_status.get(use_monitor=False) - if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set(): + if self._poll_thread_kill_event.is_set() or not self._poll_thread_run_event.is_set(): return time.sleep(0.02) # 20ms delay for processing, important for not missing events @@ -256,6 +270,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): # Stop polling, poll once manually to ensure that the register is clean self._stop_polling() self._poll_thread_poll_loop_done.wait(timeout=1) + # IMPORTANT: Keep this sleep setting, as it is necessary to avoid that the HW + # becomes unresponsive. This was found empirically and seems to be necessary + time.sleep(0.02) # Prepare the MCS card for the next software trigger mcs = self.device_manager.devices.get("mcs", None) diff --git a/csaxs_bec/devices/epics/eiger9m_csaxs.py b/csaxs_bec/devices/epics/eiger9m_csaxs.py deleted file mode 100644 index cf3e31d..0000000 --- a/csaxs_bec/devices/epics/eiger9m_csaxs.py +++ /dev/null @@ -1,381 +0,0 @@ -import enum -import os -import threading -import time -from typing import Any - -import numpy as np -from bec_lib.logger import bec_logger -from ophyd import ADComponent as ADCpt -from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV -from ophyd_devices.interfaces.base_classes.psi_detector_base import ( - CustomDetectorMixin, - PSIDetectorBase, -) -from std_daq_client import StdDaqClient - -logger = bec_logger.logger - - -class EigerError(Exception): - """Base class for exceptions in this module.""" - - -class EigerTimeoutError(EigerError): - """Raised when the Eiger does not respond in time.""" - - -class Eiger9MSetup(CustomDetectorMixin): - """Eiger setup class - - Parent class: CustomDetectorMixin - - """ - - def __init__(self, *args, parent: Device = None, **kwargs) -> None: - super().__init__(*args, parent=parent, **kwargs) - self.std_rest_server_url = ( - kwargs["file_writer_url"] if "file_writer_url" in kwargs else "http://xbl-daq-29:5000" - ) - self.std_client = None - self._lock = threading.RLock() - - def on_init(self) -> None: - """Initialize the detector""" - self.initialize_default_parameter() - self.initialize_detector() - self.initialize_detector_backend() - - def initialize_detector(self) -> None: - """Initialize detector""" - self.stop_detector() - self.parent.cam.trigger_mode.put(TriggerSource.GATING) - - def initialize_default_parameter(self) -> None: - """Set default parameters for Eiger9M detector""" - self.update_readout_time() - - def update_readout_time(self) -> None: - """Set readout time for Eiger9M detector""" - readout_time = ( - self.parent.scaninfo.readout_time - if hasattr(self.parent.scaninfo, "readout_time") - else self.parent.MIN_READOUT - ) - self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT) - - def initialize_detector_backend(self) -> None: - """Initialize detector backend""" - - self.std_client = StdDaqClient(url_base=self.std_rest_server_url) - self.std_client.stop_writer() - eacc = self.parent.scaninfo.username - self.update_std_cfg("writer_user_id", int(eacc.strip(" e"))) - - signal_conditions = [(lambda: self.std_client.get_status()["state"], "READY")] - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=self.parent.TIMEOUT_FOR_SIGNALS, - all_signals=True, - ): - raise EigerTimeoutError( - f"Std client not in READY state, returns: {self.std_client.get_status()}" - ) - - def update_std_cfg(self, cfg_key: str, value: Any) -> None: - """ - Update std_daq config - - Checks that the new value matches the type of the former entry. - - Args: - cfg_key (str) : config key of value to be updated - value (Any) : value to be updated for the specified key - - Raises: - Raises EigerError if the key was not in the config before and if the new value does not match the type of the old value - - """ - - cfg = self.std_client.get_config() - old_value = cfg.get(cfg_key) - if old_value is None: - raise EigerError( - f"Tried to change entry for key {cfg_key} in std_config that does not exist" - ) - if not isinstance(value, type(old_value)): - raise EigerError( - f"Type of new value {type(value)}:{value} does not match old value" - f" {type(old_value)}:{old_value}" - ) - - cfg.update({cfg_key: value}) - logger.debug(cfg) - self.std_client.set_config(cfg) - logger.debug(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}") - - def on_stage(self) -> None: - """Prepare the detector for scan""" - self.prepare_detector() - self.prepare_data_backend() - self.publish_file_location(done=False, successful=False) - self.arm_acquisition() - - def prepare_detector(self) -> None: - """Prepare detector for scan""" - self.set_detector_threshold() - self.set_acquisition_params() - self.parent.cam.trigger_mode.put(TriggerSource.GATING) - - def set_detector_threshold(self) -> None: - """ - Set the detector threshold - - The function sets the detector threshold automatically to 1/2 of the beam energy. - """ - mokev = self.parent.device_manager.devices.mokev.obj.read()[ - self.parent.device_manager.devices.mokev.name - ]["value"] - factor = 1 - unit = getattr(self.parent.cam.threshold_energy, "units", None) - - if unit is not None and unit == "eV": - factor = 1000 - setpoint = int(mokev * factor) - energy = self.parent.cam.beam_energy.read()[self.parent.cam.beam_energy.name]["value"] - - if setpoint != energy: - self.parent.cam.beam_energy.set(setpoint) - - threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][ - "value" - ] - if not np.isclose(setpoint / 2, threshold, rtol=0.05): - self.parent.cam.threshold_energy.set(setpoint / 2) - - def set_acquisition_params(self) -> None: - """Set acquisition parameters for the detector""" - self.parent.cam.num_images.put( - int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger) - ) - self.parent.cam.num_frames.put(1) - self.update_readout_time() - - def prepare_data_backend(self) -> None: - """Prepare the data backend for the scan""" - self.parent.filepath.set( - self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5") - ).wait() - self.filepath_exists(self.parent.filepath.get()) - self.stop_detector_backend() - try: - self.std_client.start_writer_async( - { - "output_file": self.parent.filepath.get(), - "n_images": int( - self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger - ), - } - ) - except Exception as exc: - time.sleep(5) - if self.std_client.get_status()["state"] == "READY": - raise EigerTimeoutError(f"Timeout of start_writer_async with {exc}") from exc - - signal_conditions = [ - (lambda: self.std_client.get_status()["acquisition"]["state"], "WAITING_IMAGES") - ] - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=self.parent.TIMEOUT_FOR_SIGNALS, - check_stopped=False, - all_signals=True, - ): - raise EigerTimeoutError( - "Timeout of 5s reached for std_daq start_writer_async with std_daq client status" - f" {self.std_client.get_status()}" - ) - - def on_unstage(self) -> None: - """Unstage the detector""" - pass - - def on_complete(self) -> None: - """Complete the detector""" - self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS) - self.publish_file_location(done=True, successful=True) - - def on_stop(self) -> None: - """Stop the detector""" - self.stop_detector() - self.stop_detector_backend() - - def stop_detector(self) -> None: - """Stop the detector""" - - # Stop detector - self.parent.cam.acquire.put(0) - signal_conditions = [ - ( - lambda: self.parent.cam.detector_state.read()[self.parent.cam.detector_state.name][ - "value" - ], - DetectorState.IDLE, - ) - ] - - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2, - check_stopped=True, - all_signals=False, - ): - # Retry stop detector and wait for remaining time - self.parent.cam.acquire.put(0) - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2, - check_stopped=True, - all_signals=False, - ): - raise EigerTimeoutError( - f"Failed to stop detector, detector state {signal_conditions[0][0]}" - ) - - def stop_detector_backend(self) -> None: - """Close file writer""" - self.std_client.stop_writer() - - def filepath_exists(self, filepath: str) -> None: - """Check if filepath exists""" - signal_conditions = [(lambda: os.path.exists(os.path.dirname(filepath)), True)] - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=self.parent.TIMEOUT_FOR_SIGNALS, - check_stopped=False, - all_signals=True, - ): - raise EigerError(f"Timeout of 3s reached for filepath {filepath}") - - def arm_acquisition(self) -> None: - """Arm Eiger detector for acquisition""" - self.parent.cam.acquire.put(1) - signal_conditions = [ - ( - lambda: self.parent.cam.detector_state.read()[self.parent.cam.detector_state.name][ - "value" - ], - DetectorState.RUNNING, - ) - ] - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=self.parent.TIMEOUT_FOR_SIGNALS, - check_stopped=True, - all_signals=False, - ): - raise EigerTimeoutError( - f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}" - ) - - def finished(self, timeout: int = 5) -> None: - """Check if acquisition is finished.""" - with self._lock: - signal_conditions = [ - ( - lambda: self.parent.cam.acquire.read()[self.parent.cam.acquire.name]["value"], - DetectorState.IDLE, - ), - (lambda: self.std_client.get_status()["acquisition"]["state"], "FINISHED"), - ( - lambda: self.std_client.get_status()["acquisition"]["stats"][ - "n_write_completed" - ], - int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger), - ), - ] - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=timeout, - check_stopped=True, - all_signals=True, - ): - raise EigerTimeoutError( - f"Reached timeout with detector state {signal_conditions[0][0]}, std_daq state" - f" {signal_conditions[1][0]} and received frames of {signal_conditions[2][0]} for" - " the file writer" - ) - self.stop_detector() - self.stop_detector_backend() - - -class SLSDetectorCam(Device): - """ - SLS Detector Camera - Eiger9M - - Base class to map EPICS PVs to ophyd signals. - """ - - threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy") - beam_energy = ADCpt(EpicsSignalWithRBV, "BeamEnergy") - bit_depth = ADCpt(EpicsSignalWithRBV, "BitDepth") - num_images = ADCpt(EpicsSignalWithRBV, "NumCycles") - num_frames = ADCpt(EpicsSignalWithRBV, "NumFrames") - trigger_mode = ADCpt(EpicsSignalWithRBV, "TimingMode") - trigger_software = ADCpt(EpicsSignal, "TriggerSoftware") - acquire = ADCpt(EpicsSignal, "Acquire") - detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV") - - -class TriggerSource(int, enum.Enum): - """Trigger signals for Eiger9M detector""" - - AUTO = 0 - TRIGGER = 1 - GATING = 2 - BURST_TRIGGER = 3 - - -class DetectorState(int, enum.Enum): - """Detector states for Eiger9M detector""" - - IDLE = 0 - ERROR = 1 - WAITING = 2 - FINISHED = 3 - TRANSMITTING = 4 - RUNNING = 5 - STOPPED = 6 - STILL_WAITING = 7 - INITIALIZING = 8 - DISCONNECTED = 9 - ABORTED = 10 - - -class Eiger9McSAXS(PSIDetectorBase): - """ - Eiger9M detector for CSAXS - - Parent class: PSIDetectorBase - - class attributes: - custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS, - inherits from CustomDetectorMixin - PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector - Various EpicsPVs for controlling the detector - """ - - # Specify which functions are revealed to the user in BEC client - USER_ACCESS = [] - - # specify Setup class - custom_prepare_cls = Eiger9MSetup - # specify minimum readout time for detector and timeout for checks after unstage - MIN_READOUT = 3e-3 - TIMEOUT_FOR_SIGNALS = 5 - # specify class attributes - cam = ADCpt(SLSDetectorCam, "cam1:") - - -if __name__ == "__main__": - eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:", sim_mode=True) diff --git a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py index 25958c9..91149eb 100644 --- a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py +++ b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py @@ -158,7 +158,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): self.acquire_mode.set(ACQUIREMODE.MCS).wait(timeout=self._pv_timeout) # Subscribe the progress signal - self.current_channel.subscribe(self._progress_update, run=False) + # self.current_channel.subscribe(self._progress_update, run=False) # Subscribe to the mca updates for name in self.counter_mapping.keys(): diff --git a/csaxs_bec/devices/jungfraujoch/eiger.py b/csaxs_bec/devices/jungfraujoch/eiger.py new file mode 100644 index 0000000..70ee01f --- /dev/null +++ b/csaxs_bec/devices/jungfraujoch/eiger.py @@ -0,0 +1,318 @@ +""" +Generic integration of JungfrauJoch backend with Eiger detectors +for the cSAXS beamline at the Swiss Light Source. + +The WEB UI is available on http://sls-jfjoch-001:8080 + +NOTE: this may not be the best place to store this information. It should be migrated to +beamline documentation for debugging of Eiger & JungfrauJoch. + +The JungfrauJoch server for cSAXS runs on sls-jfjoch-001.psi.ch +User with sufficient rights may use: +- sudo systemctl restart jfjoch_broker +- sudo systemctl status jfjoch_broker +to check and/or restart the broker for the JungfrauJoch server. + +Some extra notes for setting up the detector: +- If the energy on JFJ is set via DetectorSettings, the variable in DatasetSettings will be ignored +- Changes in energy may take time, good to implement logic that only resets energy if needed. +- For the Eiger, the frame_time_us in DetectorSettings is ignored, only the frame_time_us in + the DatasetSettings is relevant +- The bit_depth will be adjusted automatically based on the exp_time. Here, we need to ensure + that subsequent triggers properly + consider the readout_time of the boards. For Jungfrau detectors, the difference between + count_time_us and frame_time_us is the readout_time of the boards. For the Eiger, this needs + to be taken into account during the integration. +- beam_center and detector settings are required input arguments, thus, they may be set to wrong + values for acquisitions to start. Please keep this in mind. + +Hardware related notes: +- If there is an HW issue with the detector, power cycling may help. +- The sls_detector package is available on console on /sls/X12SA/data/gac-x12sa/erik/micromamba + - Run: source setup_9m.sh # Be careful, this connects to the detector, so it should not be + used during operation + - Useful commands: + - p highvoltage 0 or 150 (operational) + - g highvoltage + - # Put high voltage to 0 before power cylcing it. + - telnet bchip500 + - cd power_control_user/ + - ./on + - ./off + +Further information that may be relevant for debugging: +JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001) +""" + +from __future__ import annotations + +import os +import time +from typing import TYPE_CHECKING, Literal + +import yaml +from bec_lib.file_utils import get_full_path +from bec_lib.logger import bec_logger +from jfjoch_client.models.dataset_settings import DatasetSettings +from jfjoch_client.models.detector_settings import DetectorSettings +from jfjoch_client.models.detector_state import DetectorState +from jfjoch_client.models.detector_timing import DetectorTiming +from jfjoch_client.models.file_writer_format import FileWriterFormat +from jfjoch_client.models.file_writer_settings import FileWriterSettings +from ophyd import Component as Cpt +from ophyd import DeviceStatus +from ophyd_devices import FileEventSignal, PreviewSignal +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + +from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient +from csaxs_bec.devices.jungfraujoch.jungfraujoch_preview import JungfrauJochPreview + +if TYPE_CHECKING: # pragma no cover + from bec_lib.devicemanager import ScanInfo + from bec_server.device_server.device_server import DeviceManagerDS + from jfjoch_client.models.measurement_statistics import MeasurementStatistics + + +logger = bec_logger.logger + +EIGER_READOUT_TIME_US = 500e-6 # 500 microseconds in s + + +class EigerError(Exception): + """Custom exception for Eiger detector errors.""" + + +class Eiger(PSIDeviceBase): + """ + Base integration of the Eiger1.5M and Eiger9M at cSAXS. All relevant + """ + + USER_ACCESS = ["detector_distance", "beam_center"] + + file_event = Cpt(FileEventSignal, name="file_event") + preview_image = Cpt(PreviewSignal, name="preview_image", ndim=2) + + def __init__( + self, + name: str, + detector_name: Literal["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"], + host: str = "http://sls-jfjoch-001", + port: int = 8080, + detector_distance: float = 100.0, + beam_center: tuple[int, int] = (0, 0), + scan_info: ScanInfo = None, + readout_time: float = EIGER_READOUT_TIME_US, + device_manager=None, + **kwargs, + ): + """ + Initialize the PSI Device Base class. + + Args: + name (str) : Name of the device + detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"] + host (str): Hostname of the Jungfrau Joch server. + port (int): Port of the Jungfrau Joch server. + scan_info (ScanInfo): The scan info to use. + device_manager (DeviceManagerDS): The device manager to use. + **kwargs: Additional keyword arguments. + """ + super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs) + self._host = f"{host}:{port}" + self.jfj_client = JungfrauJochClient(host=self._host, parent=self) + self.jfj_preview_client = JungfrauJochPreview( + url="tcp://129.129.95.114:5400", cb=self.preview_image.put + ) # IP of sls-jfjoch-001.psi.ch on port 5400 for ZMQ stream + self.device_manager = device_manager + self.detector_name = detector_name + self._detector_distance = detector_distance + self._beam_center = beam_center + self._readout_time = readout_time + self._full_path = "" + if self.device_manager is not None: + self.device_manager: DeviceManagerDS + + @property + def detector_distance(self) -> float: + """The detector distance in mm.""" + return self._detector_distance + + @detector_distance.setter + def detector_distance(self, value: float) -> None: + """Set the detector distance in mm.""" + if value <= 0: + raise ValueError("Detector distance must be a positive value.") + self._detector_distance = value + + @property + def beam_center(self) -> tuple[float, float]: + """The beam center in pixels. (x,y)""" + return self._beam_center + + @beam_center.setter + def beam_center(self, value: tuple[float, float]) -> None: + """Set the beam center in pixels. (x,y)""" + self._beam_center = value + + def on_init(self) -> None: + """ + Called when the device is initialized. + + No siganls are connected at this point, + thus should not be set here but in on_connected instead. + """ + + def on_connected(self) -> None: + """ + Called after the device is connected and its signals are connected. + Default values for signals should be set here. + """ + logger.debug(f"On connected called for {self.name}") + self.jfj_client.stop(request_timeout=3) + # Check which detector is selected + + # Get available detectors + available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5) + # Get current detector + current_detector_name = "" + if available_detectors.current_id: + detector_selection = [ + det.description + for det in available_detectors.detectors + if det.id == available_detectors.current_id + ] + current_detector_name = detector_selection[0] if detector_selection else "" + if current_detector_name != self.detector_name: + raise RuntimeError( + f"Please select and initialise the detector {self.detector_name} in the WEB UI: {self._host}." + ) + if self.jfj_client.detector_state != DetectorState.IDLE: + raise RuntimeError( + f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}." + ) + # TODO - check again once Eiger should be initialized automatically, currently human initialization is expected + # # Once the automation should be enabled, we may use here + # detector_selection = [ + # det for det in available_detectors.detectors if det.id == self.detector_name + # ] + # if not detector_selection: + # raise ValueError( + # f"Detector {self.detector_name} not found in available detectors: {[det.description for det in available_detectors.detectors]}" + # ) + # det_id = detector_selection[0].id + # self.jfj_client.api.config_select_detector_put( + # detector_selection=DetectorSelection(id=det_id), _request_timeout=5 + # ) + # self.jfj_client.connect_and_initialise(timeout=10) + + # Setup Detector settings, here we may also set the energy already as this might be time consuming + settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER) + self.jfj_client.set_detector_settings(settings, timeout=10) + # Set the file writer to the appropriate output for the HDF5 file + file_writer_settings = FileWriterSettings(overwrite=True, format=FileWriterFormat.NXMXVDS) + logger.debug( + f"Setting writer_settings: {yaml.dump(file_writer_settings.to_dict(), indent=4)}" + ) + self.jfj_client.api.config_file_writer_put( + file_writer_settings=file_writer_settings, _request_timeout=10 + ) + # Start the preview client + self.jfj_preview_client.connect() + self.jfj_preview_client.start() + logger.info(f"Connected to JungfrauJoch preview stream at {self.jfj_preview_client.url}") + + def on_stage(self) -> DeviceStatus | None: + """ + Called while staging the device. + + Information about the upcoming scan can be accessed from the scan_info object. + """ + start_time = time.time() + scan_msg = self.scan_info.msg + # Set acquisition parameter + # TODO add check of mono energy, this can then also be passed to DatasetSettings + incident_energy = 12.0 + exp_time = scan_msg.scan_parameters.get("exp_time", 0) + if exp_time <= self._readout_time: + raise ValueError( + f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}." + ) + frame_time_us = exp_time # + ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]) + # Fetch file path + self._full_path = get_full_path(scan_msg, name=f"{self.name}_master") + self._full_path = os.path.abspath(os.path.expanduser(self._full_path)) + # Inform BEC about upcoming file event + self.file_event.put( + file_path=self._full_path, + done=False, + successful=False, + hinted_h5_entries={"data": "entry/data/data"}, + ) + # JFJ adds _master.h5 automatically + path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5") + data_settings = DatasetSettings( + image_time_us=int(frame_time_us * 1e6), # This is currently ignored + ntrigger=ntrigger, + file_prefix=path, + beam_x_pxl=int(self._beam_center[0]), + beam_y_pxl=int(self._beam_center[1]), + detector_distance_mm=self.detector_distance, + incident_energy_ke_v=incident_energy, + ) + logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}") + prep_time = start_time - time.time() + logger.debug(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s") + self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state + self.jfj_client.start(settings=data_settings) # Takes around ~0.6s + logger.debug(f"Wait for IDLE and start call took {time.time()-start_time-prep_time:.2f}s") + + def on_unstage(self) -> DeviceStatus: + """Called while unstaging the device.""" + + def on_pre_scan(self) -> DeviceStatus: + """Called right before the scan starts on all devices automatically.""" + + def on_trigger(self) -> DeviceStatus: + """Called when the device is triggered.""" + + def _file_event_callback(self, status: DeviceStatus) -> None: + """Callback to update the file_event signal when the acquisition is done.""" + logger.info(f"Acquisition done callback called for {self.name} for status {status.success}") + self.file_event.put( + file_path=self._full_path, + done=status.done, + successful=status.success, + hinted_h5_entries={"data": "entry/data/data"}, + ) + + def on_complete(self) -> DeviceStatus: + """Called to inquire if a device has completed a scans.""" + + def wait_for_complete(): + start_time = time.time() + timeout = 10 + for _ in range(timeout): + if self.jfj_client.wait_for_idle(timeout=1, request_timeout=10): + return + statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get( + _request_timeout=5 + ) + raise TimeoutError( + f"Timeout after waiting for detector {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}" + ) + + status = self.task_handler.submit_task(wait_for_complete, run=True) + status.add_callback(self._file_event_callback) + self.cancel_on_stop(status) + return status + + def on_kickoff(self) -> DeviceStatus | None: + """Called to kickoff a device for a fly scan. Has to be called explicitly.""" + + def on_stop(self) -> None: + """Called when the device is stopped.""" + self.jfj_client.stop( + request_timeout=0.5 + ) # Call should not block more than 0.5 seconds to stop all devices... + self.task_handler.shutdown() diff --git a/csaxs_bec/devices/jungfraujoch/eiger_1_5m.py b/csaxs_bec/devices/jungfraujoch/eiger_1_5m.py new file mode 100644 index 0000000..22432c7 --- /dev/null +++ b/csaxs_bec/devices/jungfraujoch/eiger_1_5m.py @@ -0,0 +1,54 @@ +""" +Eiger 1.5M specific integration. It is based on the Eiger base integration for the JungfrauJoch backend +which is placed in eiger_csaxs, and where code that is equivalent for the Eiger9M and Eiger1.5M is shared. + +Please check the eiger_csaxs.py class for more details about the relevant services. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from csaxs_bec.devices.jungfraujoch.eiger import Eiger + +EIGER1_5M_READOUT_TIME_US = 500e-6 # 500 microseconds in s +DETECTOR_NAME = "EIGER 1.5M" + + +if TYPE_CHECKING: # pragma no cover + from bec_lib.devicemanager import ScanInfo + from bec_server.device_server.device_server import DeviceManagerDS + + +# pylint:disable=invalid-name +class Eiger1_5M(Eiger): + """ + Eiger 1.5M specific integration for the in-vaccum Eiger. + + The logic implemented here is coupled to the DelayGenerator integration, + repsonsible for the global triggering of all devices through a single Trigger logic. + Please check the eiger.py class for more details about the integration of relevant backend + services. The detector_name must be set to "EIGER 1.5M: + """ + + USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here. + + def __init__( + self, + name: str, + detector_distance: float = 100.0, + beam_center: tuple[float, float] = (0.0, 0.0), + scan_info: ScanInfo = None, + device_manager: DeviceManagerDS = None, + **kwargs, + ) -> None: + super().__init__( + name=name, + detector_name=DETECTOR_NAME, + readout_time=EIGER1_5M_READOUT_TIME_US, + detector_distance=detector_distance, + beam_center=beam_center, + scan_info=scan_info, + device_manager=device_manager, + **kwargs, + ) diff --git a/csaxs_bec/devices/jungfraujoch/eiger_9m.py b/csaxs_bec/devices/jungfraujoch/eiger_9m.py new file mode 100644 index 0000000..f44ca1d --- /dev/null +++ b/csaxs_bec/devices/jungfraujoch/eiger_9m.py @@ -0,0 +1,58 @@ +""" +Eiger 9M specific integration. It is based on the Eiger base integration for the JungfrauJoch backend +which is placed in eiger_csaxs, and where code that is equivalent for the Eiger9M and Eiger1.5M is shared. + +Please check the eiger_csaxs.py class for more details about the relevant services. + +In 16bit mode, 8e7 counts/s per pixel are supported in summed up frames, +although subframes will never have more than 12bit counts (~4000 counts per pixel in subframe). +In 32bit mode, 2e7 counts/s per pixel are supported, for which subframes will have no +more than 24bit counts, which means 16.7 million counts per pixel in subframes. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from csaxs_bec.devices.jungfraujoch.eiger import Eiger + +if TYPE_CHECKING: # pragma no cover + from bec_lib.devicemanager import ScanInfo + from bec_server.device_server.device_server import DeviceManagerDS + +EIGER9M_READOUT_TIME_US = 500e-6 # 500 microseconds in s +DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M"" + + +# pylint:disable=invalid-name +class Eiger9M(Eiger): + """ + Eiger 1.5M specific integration for the in-vaccum Eiger. + + The logic implemented here is coupled to the DelayGenerator integration, + repsonsible for the global triggering of all devices through a single Trigger logic. + Please check the eiger.py class for more details about the integration of relevant backend + services. The detector_name must be set to "EIGER 1.5M: + """ + + USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here. + + def __init__( + self, + name: str, + detector_distance: float = 100.0, + beam_center: tuple[float, float] = (0.0, 0.0), + scan_info: ScanInfo = None, + device_manager: DeviceManagerDS = None, + **kwargs, + ) -> None: + super().__init__( + name=name, + detector_name=DETECTOR_NAME, + readout_time=EIGER9M_READOUT_TIME_US, + detector_distance=detector_distance, + beam_center=beam_center, + scan_info=scan_info, + device_manager=device_manager, + **kwargs, + ) diff --git a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py index 2a02dc6..1db6b15 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py +++ b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py @@ -1,20 +1,35 @@ -import enum -import math +"""Module with client interface for the Jungfrau Joch detector API""" -import jfjoch_client +from __future__ import annotations + +import enum +import time +import traceback +from typing import TYPE_CHECKING + +import requests from bec_lib.logger import bec_logger +from jfjoch_client.api.default_api import DefaultApi +from jfjoch_client.api_client import ApiClient +from jfjoch_client.configuration import Configuration +from jfjoch_client.models.broker_status import BrokerStatus +from jfjoch_client.models.dataset_settings import DatasetSettings +from jfjoch_client.models.detector_settings import DetectorSettings logger = bec_logger.logger +if TYPE_CHECKING: + from ophyd import Device + +# pylint: disable=raise-missing-from +# pylint: disable=broad-except class JungfrauJochClientError(Exception): """Base class for exceptions in this module.""" -class DetectorState(enum.StrEnum): - """Detector states for Jungfrau Joch detector - ['Inactive', 'Idle', 'Busy', 'Measuring', 'Pedestal', 'Error'] - """ +class DetectorState(str, enum.Enum): + """Possible Detector states for Jungfrau Joch detector""" INACTIVE = "Inactive" IDLE = "Idle" @@ -24,24 +39,30 @@ class DetectorState(enum.StrEnum): ERROR = "Error" -class ResponseWaitDone(enum.IntEnum): - """Response state for Jungfrau Joch detector wait till done""" - - DETECTOR_IDLE = 200 - TIMEOUT_PARAM_OUT_OF_RANGE = 400 - JUNGFRAU_ERROR = 500 - DETECTOR_INACTIVE = 502 - TIMEOUT_REACHED = 504 - - class JungfrauJochClient: - """Thin wrapper around the Jungfrau Joch API client""" + """Thin wrapper around the Jungfrau Joch API client. - def __init__(self, host: str = "http://sls-jfjoch-001:8080") -> None: + sudo systemctl restart jfjoch_broker + sudo systemctl status jfjoch_broker + + It looks as if the detector is not being stopped properly. + One module remains running, how can we restart the detector? + """ + + def __init__( + self, host: str = "http://sls-jfjoch-001:8080", parent: Device | None = None + ) -> None: self._initialised = False - configuration = jfjoch_client.Configuration(host=host) - api_client = jfjoch_client.ApiClient(configuration) - self.api = jfjoch_client.DefaultApi(api_client) + configuration = Configuration(host=host) + api_client = ApiClient(configuration) + self.api = DefaultApi(api_client) + self._parent_name = parent.name if parent else self.__class__.__name__ + + @property + def jjf_state(self) -> BrokerStatus: + """Get the status of JungfrauJoch""" + response = self.api.status_get() + return BrokerStatus(**response.to_dict()) @property def initialised(self) -> bool: @@ -53,101 +74,113 @@ class JungfrauJochClient: """Set the connected status""" self._initialised = value - def get_jungfrau_joch_status(self) -> DetectorState: + # TODO this is not correct, as it may be that the state in INACTIVE. Models are not in sync... + # REMOVE all model enums as most of the validation takes place in the Pydantic models, i.e. BrokerStatus here.. + @property + def detector_state(self) -> DetectorState: """Get the status of JungfrauJoch""" - return self.api.status_get().state + return DetectorState(self.jjf_state.state) - def connect_and_initialise(self, timeout: int = 5) -> None: + def connect_and_initialise(self, timeout: int = 10, **kwargs) -> None: """Check if JungfrauJoch is connected and ready to receive commands""" - status = self.api.status_get().state + status = self.detector_state if status != DetectorState.IDLE: - self.api.initialize_post() - self.wait_till_done(timeout) - self.initialised = True + self.api.initialize_post() # This is a blocking call.... + self.wait_for_idle(timeout, request_timeout=timeout) # Blocking call + self.initialised = True - def set_detector_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None: + def set_detector_settings(self, settings: dict | DetectorSettings, timeout: int = 10) -> None: """Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state. Note, the full settings have to be provided, otherwise the settings will be overwritten with default values. Args: settings (dict): dictionary of settings """ - state = self.api.status_get().state + state = self.detector_state if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]: - raise JungfrauJochClientError( - f"Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}" - ) + time.sleep(1) # Give the detector 1s to become IDLE, retry + state = self.detector_state + if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]: + raise JungfrauJochClientError( + f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}" + ) if isinstance(settings, dict): - settings = jfjoch_client.DatasetSettings(**settings) - self.api.config_detector_put(settings) + settings = DetectorSettings(**settings) + try: + self.api.config_detector_put(detector_settings=settings, _request_timeout=timeout) + except requests.exceptions.Timeout: + raise TimeoutError(f"Timeout while setting detector settings for {self._parent_name}") + except Exception: + content = traceback.format_exc() + raise JungfrauJochClientError( + f"Error while setting detector settings for {self._parent_name}: {content}" + ) - def set_mesaurement_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None: - """Set the measurement settings. JungfrauJoch must be in IDLE state. + def start(self, settings: dict | DatasetSettings, request_timeout: float = 10) -> None: + """Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state. The method call is blocking and JungfrauJoch will be ready to measure after the call resolves. - Please check the DataSettings class for the available settings. - The minimum required settings are: - beam_x_pxl: StrictFloat | StrictInt, - beam_y_pxl: StrictFloat | StrictInt, - detector_distance_mm: float | int, - incident_energy_keV: float | int, - Args: settings (dict): dictionary of settings + + Please check the DataSettings class for the available settings. Minimum required settings are + beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV. + """ - state = self.api.status_get().state + state = self.detector_state if state != DetectorState.IDLE: raise JungfrauJochClientError( - f"Detector must be in IDLE state to set settings. Current state: {state}" + f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}" ) if isinstance(settings, dict): - settings = jfjoch_client.DatasetSettings(**settings) + settings = DatasetSettings(**settings) try: - res = self.api.start_post_with_http_info(dataset_settings=settings) - if res.status_code != 200: - logger.error( - f"Error while setting measurement settings {settings}, response: {res}" - ) - raise JungfrauJochClientError( - f"Error while setting measurement settings {settings}, response: {res}" - ) - except Exception as e: - logger.error( - f"Error while setting measurement settings {settings}. Exception raised {e}" + self.api.start_post_with_http_info( + dataset_settings=settings, _request_timeout=request_timeout ) + except requests.exceptions.Timeout: + raise TimeoutError( + f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} for 'start' call" + ) + except Exception: + content = traceback.format_exc() raise JungfrauJochClientError( - f"Error while setting measurement settings {settings}. Exception raised {e}" - ) from e + f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}" + ) - def wait_till_done(self, timeout: int = 5) -> None: - """Wait until JungfrauJoch is done. + def stop(self, request_timeout: float = 0.5) -> None: + """Stop the acquisition, this only logs errors and is not raising.""" + try: + self.api.cancel_post_with_http_info(_request_timeout=request_timeout) + except requests.exceptions.Timeout: + content = traceback.format_exc() + logger.error( + f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}" + ) + except Exception: + content = traceback.format_exc() + logger.error( + f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}" + ) + + def wait_for_idle(self, timeout: int = 10, request_timeout: float | None = None) -> bool: + """Wait for JungfrauJoch to be in Idle state. Blocking call with timeout. Args: timeout (int): timeout in seconds + Returns: + bool: True if the detector is in IDLE state, False if timeout occurred """ - success = False + if request_timeout is None: + request_timeout = timeout try: - response = self.api.wait_till_done_post_with_http_info(math.ceil(timeout / 2)) - if response.status_code != ResponseWaitDone.DETECTOR_IDLE: - logger.info( - f"Waitin for JungfrauJoch to be done, status: {ResponseWaitDone(response.status_code)}; response msg {response}" - ) - response = self.api.wait_till_done_post_with_http_info(math.floor(timeout / 2)) - if response.status_code == ResponseWaitDone.DETECTOR_IDLE: - success = True - return - except Exception as e: - logger.error(f"Error while waiting for JungfrauJoch to initialise: {e}") - raise JungfrauJochClientError( - f"Error while waiting for JungfrauJoch to initialise: {e}" - ) from e - else: - if success is False: - logger.error( - f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}" - ) - raise JungfrauJochClientError( - f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}" - ) + self.api.wait_till_done_post(timeout=timeout, _request_timeout=request_timeout) + except requests.exceptions.Timeout: + raise TimeoutError(f"HTTP request timeout in wait_for_idle for {self._parent_name}") + except Exception: + content = traceback.format_exc() + logger.debug(f"Waiting for device {self._parent_name} to become IDLE: {content}") + return False + return True diff --git a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py new file mode 100644 index 0000000..bfda46d --- /dev/null +++ b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py @@ -0,0 +1,96 @@ +"""Module for the Eiger preview ZMQ stream.""" + +from __future__ import annotations + +import json +import threading +import time +from typing import Callable + +import numpy as np +import zmq +from bec_lib.logger import bec_logger + +logger = bec_logger.logger + +ZMQ_TOPIC_FILTER = b"" + + +class JungfrauJochPreview: + USER_ACCESS = ["start", "stop"] + + def __init__(self, url: str, cb: Callable): + self.url = url + self._socket = None + self._shutdown_event = threading.Event() + self._zmq_thread = None + self._on_update_callback = cb + + def connect(self): + """Connect to the JungfrauJoch PUB-SUB streaming interface + + JungfrauJoch may reject connection for a few seconds when it restarts, + so if it fails, wait a bit and try to connect again. + """ + # pylint: disable=no-member + + context = zmq.Context() + self._socket = context.socket(zmq.SUB) + self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER) + try: + self._socket.connect(self.url) + except ConnectionRefusedError: + time.sleep(1) + self._socket.connect(self.url) + + def start(self): + self._zmq_thread = threading.Thread( + target=self._zmq_update_loop, daemon=True, name="JungfrauJoch_live_preview" + ) + self._zmq_thread.start() + + def stop(self): + self._shutdown_event.set() + if self._zmq_thread: + self._zmq_thread.join() + + def _zmq_update_loop(self): + while not self._shutdown_event.is_set(): + if self._socket is None: + self.connect() + try: + self._poll() + except ValueError: + # Happens when ZMQ partially delivers the multipart message + pass + except zmq.error.Again: + # Happens when receive queue is empty + time.sleep(0.1) + + def _poll(self): + """ + Poll the ZMQ socket for new data. It will throttle the data update and + only subscribe to the topic for a single update. This is not very nice + but it seems like there is currently no option to set the update rate on + the backend. + """ + + if self._shutdown_event.wait(0.2): + return + + try: + # subscribe to the topic + self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER) + + # pylint: disable=no-member + r = self._socket.recv_multipart(flags=zmq.NOBLOCK) + self._parse_data(r) + + finally: + # Unsubscribe from the topic + self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER) + + def _parse_data(self, data): + # TODO decode and parse the data + # self._on_update_callback(data) + pass diff --git a/frame_dump.cbor b/frame_dump.cbor new file mode 100644 index 0000000..8916581 Binary files /dev/null and b/frame_dump.cbor differ diff --git a/pyproject.toml b/pyproject.toml index 9adffe4..269d369 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "pyepics", "pyueye", # for the IDS uEye camera "bec_widgets", + "zmq", ] [project.optional-dependencies] diff --git a/tests/tests_devices/test_eiger.py b/tests/tests_devices/test_eiger.py new file mode 100644 index 0000000..e43c981 --- /dev/null +++ b/tests/tests_devices/test_eiger.py @@ -0,0 +1,318 @@ +# pylint: skip-file +import os +import threading +from time import time +from typing import TYPE_CHECKING, Generator +from unittest import mock + +import pytest +from bec_lib.messages import FileMessage, ScanStatusMessage +from jfjoch_client.models.broker_status import BrokerStatus +from jfjoch_client.models.dataset_settings import DatasetSettings +from jfjoch_client.models.detector_list import DetectorList +from jfjoch_client.models.detector_list_element import DetectorListElement +from jfjoch_client.models.detector_settings import DetectorSettings +from jfjoch_client.models.detector_timing import DetectorTiming +from jfjoch_client.models.file_writer_format import FileWriterFormat +from jfjoch_client.models.file_writer_settings import FileWriterSettings +from jfjoch_client.models.measurement_statistics import MeasurementStatistics +from ophyd import Staged +from ophyd_devices.utils.psi_device_base_utils import DeviceStatus + +from csaxs_bec.devices.jungfraujoch.eiger import Eiger +from csaxs_bec.devices.jungfraujoch.eiger_1_5m import Eiger1_5M +from csaxs_bec.devices.jungfraujoch.eiger_9m import Eiger9M + +if TYPE_CHECKING: # pragma no cover + from bec_lib.messages import FileMessage + +# @pytest.fixture(scope="function") +# def scan_worker_mock(scan_server_mock): +# scan_server_mock.device_manager.connector = mock.MagicMock() +# scan_worker = ScanWorker(parent=scan_server_mock) +# yield scan_worker + + +@pytest.fixture( + scope="function", + params=[(0.1, 1, 1, "line_scan"), (0.2, 2, 2, "time_scan"), (0.5, 5, 5, "acquire")], +) +def mock_scan_info(request, tmpdir): + exp_time, frames_per_trigger, num_points, scan_name = request.param + scan_info = ScanStatusMessage( + scan_id="test_id", + status="open", + scan_number=1, + scan_parameters={ + "exp_time": exp_time, + "frames_per_trigger": frames_per_trigger, + "system_config": {}, + }, + info={"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")}, + num_points=num_points, + scan_name=scan_name, + ) + yield scan_info + + +@pytest.fixture(scope="function", params=[(1,), (2,)]) +def detector_list(request) -> Generator[DetectorList, None, None]: + """Fixture for the detector list.""" + current_id = request.param[0] + detector_list = DetectorList( + detectors=[ + DetectorListElement( + id=1, + description="EIGER 1.5M", + serial_number="123456", + base_ipv4_addr="192.168.0.1", + udp_interface_count=1, + nmodules=1, + width=512, + height=512, + pixel_size_mm=0.1, + readout_time_us=100, + min_frame_time_us=1000, + min_count_time_us=100, + type="EIGER", + ), + DetectorListElement( + id=2, + description="EIGER 8.5M (tmp)", + serial_number="123456", + base_ipv4_addr="192.168.0.1", + udp_interface_count=1, + nmodules=1, + width=512, + height=512, + pixel_size_mm=0.1, + readout_time_us=100, + min_frame_time_us=1000, + min_count_time_us=100, + type="EIGER", + ), + ], + current_id=current_id, + ) + yield detector_list + + +@pytest.fixture(scope="function") +def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]: + """Fixture for the Eiger 1.5M device.""" + name = "eiger_1_5m" + dev = Eiger1_5M(name=name, beam_center=(256, 256), detector_distance=100.0) + dev.scan_info.msg = mock_scan_info + yield dev + + +@pytest.fixture(scope="function") +def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]: + """Fixture for the Eiger 9M device. + Currently only on_connected is different for both devices, all other methods are the same.""" + name = "eiger_9m" + dev = Eiger9M(name=name) + dev.scan_info.msg = mock_scan_info + yield dev + + +@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"]) +def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state): + """Test the on_connected logic of the Eiger detector.""" + eiger = eiger_1_5m + detector_id = 1 + with ( + mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop, + mock.patch.object( + eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list + ), + mock.patch.object( + eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state) + ), + mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det, + mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer, + mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client, + ): + if detector_state != "Idle" or detector_list.current_id != detector_id: + with pytest.raises(RuntimeError): + eiger.on_connected() + mock_jfj_client_stop.assert_called_once() + assert mock_jfj_preview_client.call_count == 0 + else: + eiger.on_connected() + assert mock_set_det.call_args == mock.call( + DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10 + ) + assert mock_file_writer.call_args == mock.call( + file_writer_settings=FileWriterSettings( + overwrite=True, format=FileWriterFormat.NXMXVDS + ), + _request_timeout=10, + ) + mock_jfj_client_stop.assert_called_once() + assert mock_jfj_preview_client.connect.call_count == 1 + assert mock_jfj_preview_client.start.call_count == 1 + + +@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"]) +def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state): + """Test the on_connected logic of the Eiger detector.""" + eiger = eiger_9m + detector_id = 2 + with ( + mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop, + mock.patch.object( + eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list + ), + mock.patch.object( + eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state) + ), + mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det, + mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer, + mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client, + ): + if detector_state != "Idle" or detector_list.current_id != detector_id: + with pytest.raises(RuntimeError): + eiger.on_connected() + mock_jfj_client_stop.assert_called_once() + assert mock_jfj_preview_client.call_count == 0 + else: + eiger.on_connected() + assert mock_set_det.call_args == mock.call( + DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10 + ) + assert mock_file_writer.call_args == mock.call( + file_writer_settings=FileWriterSettings( + overwrite=True, format=FileWriterFormat.NXMXVDS + ), + _request_timeout=10, + ) + mock_jfj_client_stop.assert_called_once() + assert mock_jfj_preview_client.connect.call_count == 1 + assert mock_jfj_preview_client.start.call_count == 1 + + +@pytest.mark.timeout(20) +def test_eiger_on_stop(eiger_1_5m): + """Test the on_stop logic of the Eiger detector. This is equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + start_event = threading.Event() + stop_event = threading.Event() + + def tmp_task(): + start_event.set() + try: + while True: + time.sleep(0.1) + finally: + stop_event.set() + + eiger.task_handler.submit_task(tmp_task, run=True) + start_event.wait(timeout=5) # Wait for thread to start + + with mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop: + eiger.on_stop() + mock_jfj_client_stop.assert_called_once() + stop_event.wait(timeout=5) # Thread should be killed from task_handler + + +@pytest.mark.timeout(25) +@pytest.mark.parametrize("raise_timeout", [True, False]) +def test_eiger_on_complete(eiger_1_5m, raise_timeout): + """Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + + callback_completed_event = threading.Event() + + def _callback_complete(status: DeviceStatus): + if status.done: + callback_completed_event.set() + + unblock_wait_for_idle = threading.Event() + + def mock_wait_for_idle(timeout: int, request_timeout: float): + if unblock_wait_for_idle.wait(timeout): + if raise_timeout: + return False + return True + return False + + with ( + mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle), + mock.patch.object( + eiger.jfj_client.api, + "statistics_data_collection_get", + return_value=MeasurementStatistics(run_number=1), + ), + ): + status = eiger.complete() + status.add_callback(_callback_complete) + assert status.done == False + assert status.success == False + assert eiger.file_event.get() is None + unblock_wait_for_idle.set() + if raise_timeout: + with pytest.raises(TimeoutError): + status.wait(timeout=10) + else: + status.wait(timeout=10) + assert status.done == True + assert status.success == False if raise_timeout else True + + +def test_eiger_file_event_callback(eiger_1_5m, tmp_path): + """Test the file_event callback of the Eiger detector. This is equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + test_file = tmp_path / "test_file.h5" + eiger._full_path = str(test_file) + assert eiger.file_event.get() is None + status = DeviceStatus(device=eiger, done=True, success=True) + eiger._file_event_callback(status) + file_msg: FileMessage = eiger.file_event.get() + assert file_msg.device_name == eiger.name + assert file_msg.file_path == str(test_file) + assert file_msg.done is True + assert file_msg.successful is True + assert file_msg.hinted_h5_entries == {"data": "entry/data/data"} + status = DeviceStatus(device=eiger, done=False, success=False) + eiger._file_event_callback(status) + file_msg: FileMessage = eiger.file_event.get() + assert file_msg.device_name == eiger.name + assert file_msg.file_path == str(test_file) + assert file_msg.done is False + assert file_msg.successful is False + assert file_msg.hinted_h5_entries == {"data": "entry/data/data"} + + +def test_eiger_on_sage(eiger_1_5m): + """Test the on_stage and on_unstage logic of the Eiger detector. This is equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + scan_msg = eiger.scan_info.msg + with ( + mock.patch.object(eiger.jfj_client, "wait_for_idle", return_value=True), + mock.patch.object(eiger.jfj_client, "start") as mock_start, + ): + eiger.stage() + assert ( + eiger._full_path + == f"{scan_msg.info['file_components'][0]}_{eiger.name}_master.{scan_msg.info['file_components'][1]}" + ) + file_msg: FileMessage = eiger.file_event.get() + assert file_msg.file_path == eiger._full_path + assert file_msg.done is False + assert file_msg.successful is False + assert file_msg.hinted_h5_entries == {"data": "entry/data/data"} + + data_settings = DatasetSettings( + image_time_us=int(scan_msg.scan_parameters["exp_time"] * 1e6), + ntrigger=int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]), + file_prefix=os.path.relpath(eiger._full_path, start="/sls/x12sa/data").removesuffix( + "_master.h5" + ), + beam_x_pxl=eiger.beam_center[0], + beam_y_pxl=eiger.beam_center[1], + detector_distance_mm=eiger.detector_distance, + incident_energy_ke_v=12.0, # hardcoded at this moment as it is hardcoded in the Eiger implementation + ) + assert mock_start.call_args == mock.call(settings=data_settings) + assert eiger.staged is Staged.yes diff --git a/tests/tests_devices/test_eiger9m_csaxs.py b/tests/tests_devices/test_eiger9m_csaxs.py deleted file mode 100644 index 43e3b86..0000000 --- a/tests/tests_devices/test_eiger9m_csaxs.py +++ /dev/null @@ -1,444 +0,0 @@ -# pylint: skip-file -import threading -from unittest import mock - -import ophyd -import pytest -from bec_lib import messages -from bec_lib.endpoints import MessageEndpoints -from bec_server.device_server.tests.utils import DMMock -from ophyd_devices.tests.utils import MockPV - -from csaxs_bec.devices.epics.eiger9m_csaxs import Eiger9McSAXS -from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs - - -@pytest.fixture(scope="function") -def mock_det(): - name = "eiger" - prefix = "X12SA-ES-EIGER9M:" - dm = DMMock() - with mock.patch.object(dm, "connector"): - with ( - mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"), - mock.patch( - "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config" - ), - ): - with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV - mock_cl.thread_class = threading.Thread - with mock.patch.object(Eiger9McSAXS, "_init"): - det = Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm) - patch_dual_pvs(det) - det.TIMEOUT_FOR_SIGNALS = 0.1 - yield det - - -def test_init(): - """Test the _init function:""" - name = "eiger" - prefix = "X12SA-ES-EIGER9M:" - dm = DMMock() - with mock.patch.object(dm, "connector"): - with ( - mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"), - mock.patch( - "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config" - ), - ): - with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV - with ( - mock.patch( - "csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_default_parameter" - ) as mock_default, - mock.patch( - "csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_detector" - ) as mock_init_det, - mock.patch( - "csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_detector_backend" - ) as mock_init_backend, - ): - Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm) - mock_default.assert_called_once() - mock_init_det.assert_called_once() - mock_init_backend.assert_called_once() - - -@pytest.mark.parametrize( - "trigger_source, detector_state, expected_exception", [(2, 1, True), (2, 0, False)] -) -def test_initialize_detector(mock_det, trigger_source, detector_state, expected_exception): - """Test the _init function: - - This includes testing the functions: - - _init_detector - - _stop_det - - _set_trigger - --> Testing the filewriter is done in test_init_filewriter - - Validation upon setting the correct PVs - - """ - mock_det.cam.detector_state._read_pv.mock_data = detector_state - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.custom_prepare.initialize_detector() - else: - mock_det.custom_prepare.initialize_detector() # call the method you want to test - assert mock_det.cam.acquire.get() == 0 - assert mock_det.cam.detector_state.get() == detector_state - assert mock_det.cam.trigger_mode.get() == trigger_source - - -def test_trigger(mock_det): - """Test the trigger function: - Validate that trigger calls the custom_prepare.on_trigger() function - """ - with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger: - mock_det.trigger() - mock_on_trigger.assert_called_once() - - -@pytest.mark.parametrize( - "readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)] -) -def test_update_readout_time(mock_det, readout_time, expected_value): - if readout_time is None: - mock_det.custom_prepare.update_readout_time() - assert mock_det.readout_time == expected_value - else: - mock_det.scaninfo.readout_time = readout_time - mock_det.custom_prepare.update_readout_time() - assert mock_det.readout_time == expected_value - - -@pytest.mark.parametrize( - "eacc, exp_url, daq_status, daq_cfg, expected_exception", - [ - ("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 12543}, False), - ("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 15421}, False), - ("e12345", "http://xbl-daq-29:5000", {"state": "BUSY"}, {"writer_user_id": 15421}, True), - ("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_ud": 12345}, True), - ], -) -def test_initialize_detector_backend( - mock_det, eacc, exp_url, daq_status, daq_cfg, expected_exception -): - """Test self.custom_prepare.initialize_detector_backend (std daq in this case) - - This includes testing the functions: - - - _update_service_config - - Validation upon checking set values in mocked std_daq instance - """ - with mock.patch("csaxs_bec.devices.epics.eiger9m_csaxs.StdDaqClient") as mock_std_daq: - instance = mock_std_daq.return_value - instance.stop_writer.return_value = None - instance.get_status.return_value = daq_status - instance.get_config.return_value = daq_cfg - mock_det.scaninfo.username = eacc - # scaninfo.username.return_value = eacc - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.custom_prepare.initialize_detector_backend() - else: - mock_det.custom_prepare.initialize_detector_backend() - - instance.stop_writer.assert_called_once() - instance.get_status.assert_called() - instance.set_config.assert_called_once_with(daq_cfg) - - -@pytest.mark.parametrize( - "scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception", - [ - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 12.4, - }, - {"state": "READY"}, - {"writer_user_id": 12543}, - 5, - False, - False, - ), - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 12.4, - }, - {"state": "BUSY"}, - {"writer_user_id": 15421}, - 5, - False, - False, - ), - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 18.4, - }, - {"state": "READY"}, - {"writer_user_id": 12345}, - 4, - False, - True, - ), - ], -) -def test_stage( - mock_det, scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception -): - with ( - mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq, - mock.patch.object( - mock_det.custom_prepare, "publish_file_location" - ) as mock_publish_file_location, - ): - mock_std_daq.stop_writer.return_value = None - mock_std_daq.get_status.return_value = daq_status - mock_std_daq.get_config.return_value = daq_cfg - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"] - # TODO consider putting energy as variable in scaninfo - mock_det.device_manager.add_device("mokev", value=12.4) - mock_det.cam.beam_energy.put(scaninfo["mokev"]) - mock_det.stopped = stopped - mock_det.cam.detector_state._read_pv.mock_data = detector_state - with mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_prep_fw: - mock_det.filepath.set(scaninfo["filepath"]).wait() - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.stage() - else: - mock_det.stage() - mock_prep_fw.assert_called_once() - # Check _prep_det - assert mock_det.cam.num_images.get() == int( - scaninfo["num_points"] * scaninfo["frames_per_trigger"] - ) - assert mock_det.cam.num_frames.get() == 1 - - mock_publish_file_location.assert_called_with(done=False, successful=False) - assert mock_det.cam.acquire.get() == 1 - - -@pytest.mark.parametrize( - "scaninfo, daq_status, expected_exception", - [ - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - }, - {"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}}, - False, - ), - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - }, - {"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}}, - False, - ), - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - }, - {"state": "BUSY", "acquisition": {"state": "ERROR"}}, - True, - ), - ], -) -def test_prepare_detector_backend(mock_det, scaninfo, daq_status, expected_exception): - with ( - mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq, - mock.patch.object(mock_det.custom_prepare, "filepath_exists") as mock_file_path_exists, - mock.patch.object(mock_det.custom_prepare, "stop_detector_backend") as mock_stop_backend, - mock.patch.object(mock_det, "scaninfo"), - ): - mock_std_daq.start_writer_async.return_value = None - mock_std_daq.get_status.return_value = daq_status - mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"] - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.custom_prepare.prepare_data_backend() - mock_file_path_exists.assert_called_once() - assert mock_stop_backend.call_count == 2 - - else: - mock_det.custom_prepare.prepare_data_backend() - mock_file_path_exists.assert_called_once() - mock_stop_backend.assert_called_once() - - daq_writer_call = { - "output_file": scaninfo["filepath"], - "n_images": int(scaninfo["num_points"] * scaninfo["frames_per_trigger"]), - } - mock_std_daq.start_writer_async.assert_called_with(daq_writer_call) - - -@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)]) -def test_complete(mock_det, stopped, expected_exception): - with ( - mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, - mock.patch.object( - mock_det.custom_prepare, "publish_file_location" - ) as mock_publish_file_location, - ): - mock_det.stopped = stopped - if expected_exception: - mock_det.complete() - assert mock_det.stopped is True - else: - mock_det.complete() - mock_finished.assert_called_once() - mock_publish_file_location.assert_called_with(done=True, successful=True) - assert mock_det.stopped is False - - -def test_stop_detector_backend(mock_det): - with mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq: - mock_std_daq.stop_writer.return_value = None - mock_det.std_client = mock_std_daq - mock_det.custom_prepare.stop_detector_backend() - mock_std_daq.stop_writer.assert_called_once() - - -@pytest.mark.parametrize( - "scaninfo", - [ - ({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}), - ({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}), - ], -) -def test_publish_file_location(mock_det, scaninfo): - mock_det.scaninfo.scan_id = scaninfo["scan_id"] - mock_det.filepath.set(scaninfo["filepath"]).wait() - mock_det.custom_prepare.publish_file_location( - done=scaninfo["done"], successful=scaninfo["successful"] - ) - if scaninfo["successful"] is None: - msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]) - else: - msg = messages.FileMessage( - file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"] - ) - expected_calls = [ - mock.call( - MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name), - msg, - pipe=mock_det.connector.pipeline.return_value, - ), - mock.call( - MessageEndpoints.file_event(mock_det.name), - msg, - pipe=mock_det.connector.pipeline.return_value, - ), - ] - assert mock_det.connector.set_and_publish.call_args_list == expected_calls - - -def test_stop(mock_det): - with ( - mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, - mock.patch.object( - mock_det.custom_prepare, "stop_detector_backend" - ) as mock_stop_detector_backend, - ): - mock_det.stop() - mock_stop_det.assert_called_once() - mock_stop_detector_backend.assert_called_once() - assert mock_det.stopped is True - - -@pytest.mark.parametrize( - "stopped, scaninfo, cam_state, daq_status, expected_exception", - [ - ( - False, - {"num_points": 500, "frames_per_trigger": 4}, - 0, - {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 2000}}}, - False, - ), - ( - False, - {"num_points": 500, "frames_per_trigger": 4}, - 0, - {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 1999}}}, - True, - ), - ( - False, - {"num_points": 500, "frames_per_trigger": 1}, - 1, - {"acquisition": {"state": "READY", "stats": {"n_write_completed": 500}}}, - True, - ), - ( - False, - {"num_points": 500, "frames_per_trigger": 1}, - 0, - {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 500}}}, - False, - ), - ], -) -def test_finished(mock_det, stopped, cam_state, daq_status, scaninfo, expected_exception): - with ( - mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq, - mock.patch.object(mock_det.custom_prepare, "stop_detector_backend") as mock_stop_backend, - mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, - ): - mock_std_daq.get_status.return_value = daq_status - mock_det.cam.acquire._read_pv.mock_state = cam_state - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.custom_prepare.finished() - assert mock_det.stopped is stopped - else: - mock_det.custom_prepare.finished() - if stopped: - assert mock_det.stopped is stopped - - mock_stop_backend.assert_called() - mock_stop_det.assert_called_once() diff --git a/tests/tests_devices/test_mcs_card.py b/tests/tests_devices/test_mcs_card.py index 19447c3..034a80a 100644 --- a/tests/tests_devices/test_mcs_card.py +++ b/tests/tests_devices/test_mcs_card.py @@ -96,11 +96,9 @@ def test_mcs_card_csaxs_on_connected(mock_mcs_csaxs): assert mcs.read_mode.get() == READMODE.PASSIVE assert mcs.acquire_mode.get() == ACQUIREMODE.MCS - with mock.patch.object(mcs.current_channel, "subscribe") as mock_cur_ch_subscribe: - with mock.patch.object(mcs.counters.mca1, "subscribe") as mock_mca_subscribe: - mcs.on_connected() - assert mock_cur_ch_subscribe.call_args == mock.call(mcs._progress_update, run=False) - assert mock_mca_subscribe.call_args == mock.call(mcs._on_counter_update, run=False) + with mock.patch.object(mcs.counters.mca1, "subscribe") as mock_mca_subscribe: + mcs.on_connected() + assert mock_mca_subscribe.call_args == mock.call(mcs._on_counter_update, run=False) def test_mcs_card_csaxs_stage(mock_mcs_csaxs):