From 90cd05e68ea7640a6bc1a8b98d47f9edc7a7f3a0 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 16 Nov 2023 18:29:47 +0100 Subject: [PATCH] fix: removed sls_detector_baseclass, add psi_detector_base, fixed tests and eiger9m_csaxs --- ophyd_devices/epics/devices/__init__.py | 3 + ophyd_devices/epics/devices/eiger9m_csaxs.py | 424 +----------------- .../epics/devices/psi_detector_base.py | 326 ++++++++++++++ .../epics/devices/sls_detector_baseclass.py | 362 --------------- tests/test_eiger9m_csaxs.py | 8 +- 5 files changed, 352 insertions(+), 771 deletions(-) create mode 100644 ophyd_devices/epics/devices/psi_detector_base.py delete mode 100644 ophyd_devices/epics/devices/sls_detector_baseclass.py diff --git a/ophyd_devices/epics/devices/__init__.py b/ophyd_devices/epics/devices/__init__.py index 75eb22e..d645549 100644 --- a/ophyd_devices/epics/devices/__init__.py +++ b/ophyd_devices/epics/devices/__init__.py @@ -25,7 +25,10 @@ from ophyd.quadem import QuadEM # cSAXS from .epics_motor_ex import EpicsMotorEx from .mcs_csaxs import McsCsaxs +from .psi_detector_base import PSIDetectorBase, CustomDetectorMixin from .eiger9m_csaxs import Eiger9McSAXS from .pilatus_csaxs import PilatuscSAXS from .falcon_csaxs import FalconcSAXS from .DelayGeneratorDG645 import DelayGeneratorDG645 + +# from .psi_detector_base import PSIDetectorBase, CustomDetectorMixin diff --git a/ophyd_devices/epics/devices/eiger9m_csaxs.py b/ophyd_devices/epics/devices/eiger9m_csaxs.py index 6fcdc08..d362c7a 100644 --- a/ophyd_devices/epics/devices/eiger9m_csaxs.py +++ b/ophyd_devices/epics/devices/eiger9m_csaxs.py @@ -1,31 +1,23 @@ import enum import time -import threading import numpy as np import os -from typing import Any, List, Type +from typing import Any from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd import Device from ophyd import ADComponent as ADCpt -from ophyd.device import Staged from std_daq_client import StdDaqClient -from bec_lib import messages, MessageEndpoints, threadlocked, bec_logger -from bec_lib.bec_service import SERVICE_CONFIG -from bec_lib.devicemanager import DeviceStatus -from bec_lib.file_utils import FileWriterMixin +from bec_lib.core import BECMessage, MessageEndpoints, threadlocked +from bec_lib.core import bec_logger -from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin -from ophyd_devices.utils import bec_utils +from ophyd_devices.epics.devices.psi_detector_base import PSIDetectorBase, CustomDetectorMixin logger = bec_logger.logger -# Specify here the minimum readout time for the detector -MIN_READOUT = 3e-3 - class EigerError(Exception): """Base class for exceptions in this module.""" @@ -39,128 +31,6 @@ class EigerTimeoutError(EigerError): pass -class EigerInitError(EigerError): - """Raised when initiation of the device class fails, - due to missing device manager or not started in sim_mode.""" - - pass - - -class CustomDetectorMixin: - def __init__(self, parent: Device = None, *args, **kwargs) -> None: - self.parent = parent - - def initialize_default_parameter(self) -> None: - """ - Init parameters for the detector - - Raises (optional): - DetectorTimeoutError: if detector cannot be initialized - """ - pass - - def initialize_detector(self) -> None: - """ - Init parameters for the detector - - Raises (optional): - DetectorTimeoutError: if detector cannot be initialized - """ - pass - - def initialize_detector_backend(self) -> None: - """ - Init parameters for teh detector backend (filewriter) - - Raises (optional): - DetectorTimeoutError: if filewriter cannot be initialized - """ - pass - - def prepare_detector(self) -> None: - """ - Prepare detector for the scan - """ - pass - - def prepare_data_backend(self) -> None: - """ - Prepare the data backend for the scan - """ - pass - - def stop_detector(self) -> None: - """ - Stop the detector - """ - pass - - def stop_detector_backend(self) -> None: - """ - Stop the detector backend - """ - pass - - def on_trigger(self) -> None: - """ - Specify actions to be executed upon receiving trigger signal - """ - pass - - def pre_scan(self) -> None: - """ - Specify actions to be executed right before a scan - - BEC calls pre_scan just before execution of the scan core. - It is convenient to execute time critical features of the detector, - e.g. arming it, but it is recommended to keep this function as short/fast as possible. - """ - pass - - def finished(self) -> None: - """ - Specify actions to be executed during unstage - - This may include checks if acquisition was succesful - - Raises (optional): - DetectorTimeoutError: if detector cannot be stopped - """ - - def wait_for_signals( - self, - signal_conditions: list, - timeout: float, - check_stopped: bool = False, - interval: float = 0.05, - all_signals: bool = False, - ) -> bool: - """Wait for signals to reach a certain condition - - Args: - signal_conditions (tuple): tuple of (get_current_state, condition) functions - timeout (float): timeout in seconds - interval (float): interval in seconds - all_signals (bool): True if all signals should be True, False if any signal should be True - Returns: - bool: True if all signals are in the desired state, False if timeout is reached - """ - timer = 0 - while True: - checks = [ - get_current_state() == condition - for get_current_state, condition in signal_conditions - ] - if (all_signals and all(checks)) or (not all_signals and any(checks)): - return True - if check_stopped == True and self.parent._stopped == True: - return False - if timer > timeout: - return False - time.sleep(interval) - timer += interval - - class Eiger9MSetup(CustomDetectorMixin): """Eiger setup class @@ -226,6 +96,8 @@ class Eiger9MSetup(CustomDetectorMixin): def update_std_cfg(self, cfg_key: str, value: Any) -> None: """Update std_daq config with new e-account for the current beamtime""" + + # Load config from client and check old value cfg = self.std_client.get_config() old_value = cfg.get(cfg_key) if old_value is None: @@ -236,6 +108,8 @@ class Eiger9MSetup(CustomDetectorMixin): raise EigerError( f"Type of new value {type(value)}:{value} does not match old value {type(old_value)}:{old_value}" ) + + # Update config with new value and send back to client cfg.update({cfg_key: value}) logger.debug(cfg) self.std_client.set_config(cfg) @@ -243,8 +117,11 @@ class Eiger9MSetup(CustomDetectorMixin): def stop_detector(self) -> None: """Stop the detector and wait for the proper status message""" + # Stop detector self.parent.cam.acquire.put(0) + + # Check if detector returned in idle state signal_conditions = [ ( lambda: self.parent.cam.detector_state.read()[self.parent.cam.detector_state.name][ @@ -260,7 +137,7 @@ class Eiger9MSetup(CustomDetectorMixin): timeout=self.parent.timeout - self.parent.timeout // 2, all_signals=False, ): - # Retry stop detector + # Retry stop detector and wait for remaining time self.parent.cam.acquire.put(0) if not self.wait_for_signals( signal_conditions=signal_conditions, @@ -287,6 +164,8 @@ class Eiger9MSetup(CustomDetectorMixin): Threshold might be in ev or keV """ + + # get current beam energy from device manageer mokev = self.parent.device_manager.devices.mokev.obj.read()[ self.parent.device_manager.devices.mokev.name ]["value"] @@ -312,10 +191,14 @@ class Eiger9MSetup(CustomDetectorMixin): def set_acquisition_params(self) -> None: """Set acquisition parameters for the detector""" + + # Set number of images and frames (frames is for internal burst of detector) self.parent.cam.num_images.put( int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger) ) self.parent.cam.num_frames.put(1) + + # Update the readout time of the detector self.update_readout_time() def prepare_data_backend(self) -> None: @@ -491,253 +374,10 @@ class DetectorState(enum.IntEnum): ABORTED = 10 -class PSIDetectorBase(Device): - """ - Abstract base class for SLS detectors - - Args: - prefix (str): EPICS PV prefix for component (optional) - name (str): name of the device, as will be reported via read() - kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal - omitted -> readout ignored for read 'ophydobj.read()' - normal -> readout for read - config -> config parameter for 'ophydobj.read_configuration()' - hinted -> which attribute is readout for read - read_attrs (list): sequence of attribute names to read - configuration_attrs (list): sequence of attribute names via config_parameters - parent (object): instance of the parent device - device_manager (object): bec device manager - sim_mode (bool): simulation mode, if True, no device manager is required - **kwargs: keyword arguments - - attributes: lazy_wait_for_connection : bool - """ - - custom_prepare_cls = CustomDetectorMixin - # Specify which functions are revealed to the user in BEC client - USER_ACCESS = [ - "describe", - ] - - # cam = ADCpt(SLSDetectorCam, "cam1:") - - def __init__( - self, - prefix="", - *, - name, - kind=None, - read_attrs=None, - configuration_attrs=None, - parent=None, - device_manager=None, - sim_mode=False, - **kwargs, - ): - super().__init__( - prefix=prefix, - name=name, - kind=kind, - read_attrs=read_attrs, - configuration_attrs=configuration_attrs, - parent=parent, - **kwargs, - ) - if device_manager is None and not sim_mode: - raise EigerInitError( - f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add DeviceManager to initialization or init with sim_mode=True" - ) - # sim_mode True allows the class to be started without BEC running - # Init variables - self.sim_mode = sim_mode - self._lock = threading.RLock() - self._stopped = False - self.name = name - self.service_cfg = None - self.std_client = None - self.scaninfo = None - self.filewriter = None - self.readout_time_min = MIN_READOUT - self.timeout = 5 - self.wait_for_connection(all_signals=True) - # Init custom prepare class with BL specific logic - self.custom_prepare = self.custom_prepare_cls( - parent=self, **kwargs - ) # Eiger9MSetup(parent=self, **kwargs) - if not sim_mode: - self._update_service_config() - self.device_manager = device_manager - else: - self.device_manager = bec_utils.DMMock() - base_path = kwargs["basepath"] if "basepath" in kwargs else "~/Data10/" - self.service_cfg = {"base_path": os.path.expanduser(base_path)} - self._producer = self.device_manager.producer - self._update_scaninfo() - self._update_filewriter() - self._init() - - def _update_filewriter(self) -> None: - """Update filewriter with service config""" - self.filewriter = FileWriterMixin(self.service_cfg) - - def _update_scaninfo(self) -> None: - """Update scaninfo from BecScaninfoMixing - This depends on device manager and operation/sim_mode - """ - self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode) - self.scaninfo.load_scan_metadata() - - def _update_service_config(self) -> None: - """Update service config from BEC service config""" - self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] - - def _init(self) -> None: - """Initialize detector, filewriter and set default parameters""" - self.custom_prepare.initialize_default_parameter() - self.custom_prepare.initialize_detector() - self.custom_prepare.initialize_detector_backend() - - def stage(self) -> List[object]: - """ - Stage device in preparation for a scan - - Internal Calls: - - _prep_backend : prepare detector filewriter for measurement - - _prep_detector : prepare detector for measurement - - Returns: - List(object): list of objects that were staged - - """ - # Method idempotent, should rais ;obj;'RedudantStaging' if staged twice - if self._staged != Staged.no: - return super().stage() - - # Reset flag for detector stopped - self._stopped = False - # Load metadata of the scan - self.scaninfo.load_scan_metadata() - # Prepare detector and file writer - self.custom_prepare.prepare_data_backend() - self.custom_prepare.prepare_detector() - state = False - self.custom_prepare.publish_file_location(done=state) - self.custom_prepare.arm_acquisition() - # At the moment needed bc signal is not reliable, BEC too fast - time.sleep(0.05) - return super().stage() - - def set_trigger(self, trigger_source: TriggerSource) -> None: - """Set trigger source for the detector. - Check the TriggerSource enum for possible values - - Args: - trigger_source (TriggerSource): Trigger source for the detector - - """ - value = trigger_source - self.cam.trigger_mode.put(value) - - def _publish_file_location(self, done: bool = False, successful: bool = None) -> None: - """Publish the filepath to REDIS. - We publish two events here: - - file_event: event for the filewriter - - public_file: event for any secondary service (e.g. radial integ code) - - Args: - done (bool): True if scan is finished - successful (bool): True if scan was successful - - """ - pipe = self._producer.pipeline() - if successful is None: - msg = messages.FileMessage(file_path=self.filepath, done=done) - else: - msg = messages.FileMessage(file_path=self.filepath, done=done, successful=successful) - self._producer.set_and_publish( - MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe - ) - self._producer.set_and_publish( - MessageEndpoints.file_event(self.name), msg.dumps(), pipe=pipe - ) - pipe.execute() - - # TODO function for abstract class? - def _arm_acquisition(self) -> None: - """Arm Eiger detector for acquisition""" - timer = 0 - self.cam.acquire.put(1) - while True: - det_ctrl = self.cam.detector_state.read()[self.cam.detector_state.name]["value"] - if det_ctrl == DetectorState.RUNNING: - break - if self._stopped == True: - break - time.sleep(0.01) - timer += 0.01 - if timer > 5: - self.stop() - raise EigerTimeoutError("Failed to arm the acquisition. IOC did not update.") - - # TODO function for abstract class? - def trigger(self) -> DeviceStatus: - """Trigger the detector, called from BEC.""" - self.custom_prepare.on_trigger() - return super().trigger() - - def unstage(self) -> List[object]: - """ - Unstage device in preparation for a scan - - Returns directly if self._stopped, - otherwise checks with self._finished - if data acquisition on device finished (an was successful) - - Internal Calls: - - custom_prepare.check_scanID : check if scanID changed or detector stopped - - custom_prepare.finished : check if device finished acquisition (succesfully) - - custom_prepare.publish_file_location : publish file location to bec - - Returns: - List(object): list of objects that were unstaged - """ - self.custom_prepare.check_scanID() - if self._stopped == True: - return super().unstage() - self.custom_prepare.finished() - state = True - self.custom_prepare.publish_file_location(done=state, successful=state) - self._stopped = False - return super().unstage() - - def stop(self, *, success=False) -> None: - """ - Stop the scan, with camera and file writer - - Internal Calls: - - custom_prepare.stop_detector : stop detector - - custom_prepare.stop_backend : stop detector filewriter - """ - self.custom_prepare.stop_detector() - self.custom_prepare.stop_detector_backend() - super().stop(success=success) - self._stopped = True - - # def set_trigger(self, trigger_source: TriggerSource) -> None: - # """Set trigger source for the detector. - # Check the TriggerSource enum for possible values - - # Args: - # trigger_source (TriggerSource): Trigger source for the detector - - # """ - # value = trigger_source - # self.cam.trigger_mode.put(value) - - class Eiger9McSAXS(PSIDetectorBase): custom_prepare_cls = Eiger9MSetup cam = ADCpt(SLSDetectorCam, "cam1:") + MIN_READOUT = 3e-3 def __init__( self, @@ -776,31 +416,5 @@ class Eiger9McSAXS(PSIDetectorBase): self.cam.trigger_mode.put(value) -# class Eiger9McSAXS(Eiger9M): -# def __init__( -# self, -# prefix="", -# *, -# name, -# kind=None, -# read_attrs=None, -# configuration_attrs=None, -# parent=None, -# device_manager=None, -# sim_mode=False, -# **kwargs, -# ): -# super().__init__( -# prefix=prefix, -# name=name, -# kind=kind, -# read_attrs=read_attrs, -# configuration_attrs=configuration_attrs, -# parent=parent, -# **kwargs, -# ) -# # self.custom_prepare = Eiger9MSetup(parent=self, **kwargs) - - if __name__ == "__main__": eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:", sim_mode=True) diff --git a/ophyd_devices/epics/devices/psi_detector_base.py b/ophyd_devices/epics/devices/psi_detector_base.py new file mode 100644 index 0000000..6c77db9 --- /dev/null +++ b/ophyd_devices/epics/devices/psi_detector_base.py @@ -0,0 +1,326 @@ +import time +import threading +from bec_lib.core.devicemanager import DeviceStatus +import os + +from typing import List + +from ophyd import Device +from ophyd.device import Staged + +from bec_lib.core.file_utils import FileWriterMixin +from bec_lib.core.bec_service import SERVICE_CONFIG + +from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin +from ophyd_devices.utils import bec_utils + +# Specify here the minimum readout time for the detector +MIN_READOUT = 3e-3 + + +class DetectorInitError(Exception): + """Raised when initiation of the device class fails, + due to missing device manager or not started in sim_mode.""" + + pass + + +class CustomDetectorMixin: + def __init__(self, parent: Device = None, *args, **kwargs) -> None: + self.parent = parent + + def initialize_default_parameter(self) -> None: + """ + Init parameters for the detector + + Raises (optional): + DetectorTimeoutError: if detector cannot be initialized + """ + pass + + def initialize_detector(self) -> None: + """ + Init parameters for the detector + + Raises (optional): + DetectorTimeoutError: if detector cannot be initialized + """ + pass + + def initialize_detector_backend(self) -> None: + """ + Init parameters for teh detector backend (filewriter) + + Raises (optional): + DetectorTimeoutError: if filewriter cannot be initialized + """ + pass + + def prepare_detector(self) -> None: + """ + Prepare detector for the scan + """ + pass + + def prepare_data_backend(self) -> None: + """ + Prepare the data backend for the scan + """ + pass + + def stop_detector(self) -> None: + """ + Stop the detector + """ + pass + + def stop_detector_backend(self) -> None: + """ + Stop the detector backend + """ + pass + + def on_trigger(self) -> None: + """ + Specify actions to be executed upon receiving trigger signal + """ + pass + + def pre_scan(self) -> None: + """ + Specify actions to be executed right before a scan + + BEC calls pre_scan just before execution of the scan core. + It is convenient to execute time critical features of the detector, + e.g. arming it, but it is recommended to keep this function as short/fast as possible. + """ + pass + + def finished(self) -> None: + """ + Specify actions to be executed during unstage + + This may include checks if acquisition was succesful + + Raises (optional): + DetectorTimeoutError: if detector cannot be stopped + """ + + def wait_for_signals( + self, + signal_conditions: list, + timeout: float, + check_stopped: bool = False, + interval: float = 0.05, + all_signals: bool = False, + ) -> bool: + """Wait for signals to reach a certain condition + + Args: + signal_conditions (tuple): tuple of (get_current_state, condition) functions + timeout (float): timeout in seconds + interval (float): interval in seconds + all_signals (bool): True if all signals should be True, False if any signal should be True + Returns: + bool: True if all signals are in the desired state, False if timeout is reached + """ + timer = 0 + while True: + checks = [ + get_current_state() == condition + for get_current_state, condition in signal_conditions + ] + if (all_signals and all(checks)) or (not all_signals and any(checks)): + return True + if check_stopped == True and self.parent._stopped == True: + return False + if timer > timeout: + return False + time.sleep(interval) + timer += interval + + +class PSIDetectorBase(Device): + """ + Abstract base class for SLS detectors + + Class attributes: + custom_prepare_cls (object): class for custom prepare logic (BL specific) + Min_readout (float): minimum readout time for detector + + Args: + prefix (str): EPICS PV prefix for component (optional) + name (str): name of the device, as will be reported via read() + kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal + omitted -> readout ignored for read 'ophydobj.read()' + normal -> readout for read + config -> config parameter for 'ophydobj.read_configuration()' + hinted -> which attribute is readout for read + read_attrs (list): sequence of attribute names to read + configuration_attrs (list): sequence of attribute names via config_parameters + parent (object): instance of the parent device + device_manager (object): bec device manager + sim_mode (bool): simulation mode, if True, no device manager is required + **kwargs: keyword arguments + + attributes: lazy_wait_for_connection : bool + """ + + custom_prepare_cls = CustomDetectorMixin + + MIN_READOUT = 1e-3 + + # Specify which functions are revealed to the user in BEC client + USER_ACCESS = [ + "describe", + ] + + def __init__( + self, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + device_manager=None, + sim_mode=False, + **kwargs, + ): + super().__init__( + prefix=prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + **kwargs, + ) + if device_manager is None and not sim_mode: + raise DetectorInitError( + f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add DeviceManager to initialization or init with sim_mode=True" + ) + # sim_mode True allows the class to be started without BEC running + # Init variables + self.sim_mode = sim_mode + self._lock = threading.RLock() + self._stopped = False + self.name = name + self.service_cfg = None + self.std_client = None + self.scaninfo = None + self.filewriter = None + self.readout_time_min = MIN_READOUT + self.timeout = 5 + self.wait_for_connection(all_signals=True) + + # Init custom prepare class with BL specific logic + self.custom_prepare = self.custom_prepare_cls( + parent=self, **kwargs + ) # Eiger9MSetup(parent=self, **kwargs) + if not sim_mode: + self._update_service_config() + self.device_manager = device_manager + else: + self.device_manager = bec_utils.DMMock() + base_path = kwargs["basepath"] if "basepath" in kwargs else "~/Data10/" + self.service_cfg = {"base_path": os.path.expanduser(base_path)} + self._producer = self.device_manager.producer + self._update_scaninfo() + self._update_filewriter() + self._init() + + def _update_filewriter(self) -> None: + """Update filewriter with service config""" + self.filewriter = FileWriterMixin(self.service_cfg) + + def _update_scaninfo(self) -> None: + """Update scaninfo from BecScaninfoMixing + This depends on device manager and operation/sim_mode + """ + self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode) + self.scaninfo.load_scan_metadata() + + def _update_service_config(self) -> None: + """Update service config from BEC service config""" + self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] + + def _init(self) -> None: + """Initialize detector, filewriter and set default parameters""" + self.custom_prepare.initialize_default_parameter() + self.custom_prepare.initialize_detector() + self.custom_prepare.initialize_detector_backend() + + def stage(self) -> List[object]: + """ + Stage device in preparation for a scan + + Internal Calls: + - _prep_backend : prepare detector filewriter for measurement + - _prep_detector : prepare detector for measurement + + Returns: + List(object): list of objects that were staged + + """ + # Method idempotent, should rais ;obj;'RedudantStaging' if staged twice + if self._staged != Staged.no: + return super().stage() + + # Reset flag for detector stopped + self._stopped = False + # Load metadata of the scan + self.scaninfo.load_scan_metadata() + # Prepare detector and file writer + self.custom_prepare.prepare_data_backend() + self.custom_prepare.prepare_detector() + state = False + self.custom_prepare.publish_file_location(done=state) + self.custom_prepare.arm_acquisition() + # At the moment needed bc signal is not reliable, BEC too fast + time.sleep(0.05) + return super().stage() + + def trigger(self) -> DeviceStatus: + """Trigger the detector, called from BEC.""" + self.custom_prepare.on_trigger() + return super().trigger() + + def unstage(self) -> List[object]: + """ + Unstage device in preparation for a scan + + Returns directly if self._stopped, + otherwise checks with self._finished + if data acquisition on device finished (an was successful) + + Internal Calls: + - custom_prepare.check_scanID : check if scanID changed or detector stopped + - custom_prepare.finished : check if device finished acquisition (succesfully) + - custom_prepare.publish_file_location : publish file location to bec + + Returns: + List(object): list of objects that were unstaged + """ + self.custom_prepare.check_scanID() + if self._stopped == True: + return super().unstage() + self.custom_prepare.finished() + state = True + self.custom_prepare.publish_file_location(done=state, successful=state) + self._stopped = False + return super().unstage() + + def stop(self, *, success=False) -> None: + """ + Stop the scan, with camera and file writer + + Internal Calls: + - custom_prepare.stop_detector : stop detector + - custom_prepare.stop_backend : stop detector filewriter + """ + self.custom_prepare.stop_detector() + self.custom_prepare.stop_detector_backend() + super().stop(success=success) + self._stopped = True diff --git a/ophyd_devices/epics/devices/sls_detector_baseclass.py b/ophyd_devices/epics/devices/sls_detector_baseclass.py deleted file mode 100644 index 1285eb1..0000000 --- a/ophyd_devices/epics/devices/sls_detector_baseclass.py +++ /dev/null @@ -1,362 +0,0 @@ -import enum -import os -from abc import ABC, abstractmethod -from typing import List - -from ophyd import Device -from ophyd.device import Staged -from ophyd_devices.utils import bec_utils - -from bec_lib.bec_service import SERVICE_CONFIG -from bec_lib.file_utils import FileWriterMixin -from bec_lib import messages, MessageEndpoints -from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin - - -# Define here a minimum readout time for the detector -MIN_READOUT = None - - -# Custom exceptions specific to detectors -class DetectorError(Exception): - """ - Class for custom detector errors - - Specifying different types of errors can be helpful and used - for error handling, e.g. scan repetitions. - - An suggestion/example would be to have 3 class types for errors - - EigerError : base error class for the detector (e.g. Eiger here) - - EigerTimeoutError(EigerError) : timeout error, inherits from EigerError - - EigerInitError(EigerError) : initialization error, inherits from EigerError - """ - - pass - - -class TriggerSource(enum.IntEnum): - """ - Class for trigger signals - - Here we would map trigger options from EPICS, example implementation: - AUTO = 0 - TRIGGER = 1 - GATING = 2 - BURST_TRIGGER = 3 - - To set the trigger source to gating, call TriggerSource.Gating - """ - - pass - - -class SLSDetectorBase(ABC, Device): - """ - Abstract base class for SLS detectors - - Args: - prefix (str): EPICS PV prefix for component (optional) - name (str): name of the device, as will be reported via read() - kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal - omitted -> readout ignored for read 'ophydobj.read()' - normal -> readout for read - config -> config parameter for 'ophydobj.read_configuration()' - hinted -> which attribute is readout for read - read_attrs (list): sequence of attribute names to read - configuration_attrs (list): sequence of attribute names via config_parameters - parent (object): instance of the parent device - device_manager (object): bec device manager - sim_mode (bool): simulation mode, if True, no device manager is required - **kwargs: keyword arguments - - attributes: lazy_wait_for_connection : bool - """ - - def __init____init__( - self, - prefix="", - *, - name, - kind=None, - read_attrs=None, - configuration_attrs=None, - parent=None, - device_manager=None, - sim_mode=False, - **kwargs, - ) -> None: - super().__init__( - prefix, - name=name, - kind=kind, - read_attrs=read_attrs, - configuration_attrs=configuration_attrs, - parent=parent, - device_manager=device_manager, - **kwargs, - ) - if device_manager is None and not sim_mode: - raise DetectorError( - f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add DeviceManager to initialization or init with sim_mode=True" - ) - self.sim_mode = sim_mode - self._stopped = False - self._staged = Staged.no - self.name = name - self.service_cfg = None - self.std_client = None - self.scaninfo = None - self.filewriter = None - self.readout_time_min = MIN_READOUT - self.timeout = 5 - self.wait_for_connection(all_signals=True) - if not self.sim_mode: - self._update_service_cfg() - self.device_manager = device_manager - else: - self.device_manager = bec_utils.DMMock() - base_path = kwargs["basepath"] if "basepath" in kwargs else "~/Data10/" - self.service_cfg = {"base_path": os.path.expanduser(base_path)} - self._producer = self.device_manager.producer - self._update_scaninfo() - self._update_filewriter() - self._init() - - def _update_service_cfg(self) -> None: - """ - Update service configuration from BEC SERVICE CONFIG - """ - self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] - - def _update_scaninfo(self) -> None: - """ - Update scaninfo from BecScaninfoMixing - """ - self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode) - self.scaninfo.load_scan_metadata() - - def _update_filewriter(self) -> None: - """ - Update filewriter with service config - """ - self.filewriter = FileWriterMixin(self.service_cfg) - - @abstractmethod - def _init(self) -> None: - """ - Initialize detector & detector filewriter - - Can also be used to init default parameters - - Internal Calls: - - _init_detector : Init detector - - _init_det_fw : Init file_writer - - """ - self._init_det() - self._init_det_fw() - - @abstractmethod - def _init_det(self) -> None: - """ - Init parameters for the detector - - Raises (optional): - DetectorError: if detector cannot be initialized - """ - pass - - @abstractmethod - def _init_det_fw(self) -> None: - """ - Init parameters for detector filewriter - - Raises (optional): - DetectorError: if filewriter cannot be initialized - """ - pass - - @abstractmethod - def _set_trigger(self, trigger_source) -> None: - """ - Set trigger source for the detector - - Args: - trigger_source (enum.IntEnum): trigger source - """ - pass - - @abstractmethod - def _prep_det_fw(self) -> None: - """ - Prepare detector file writer for scan - - Raises (optional): - DetectorError: If file writer cannot be prepared - """ - pass - - @abstractmethod - def _stop_det_fw(self) -> None: - """ - Stops detector file writer - - Raises (optional): - DetectorError: If file writer cannot be stopped - """ - pass - - @abstractmethod - def _prep_det(self) -> None: - """ - Prepare detector for scans - """ - pass - - @abstractmethod - def _stop_det(self) -> None: - """ - Stop the detector and wait for the proper status message - - Raises (optional): - DetectorError: If detector cannot be prepared - """ - pass - - @abstractmethod - def _arm_acquisition(self) -> None: - """Arm detector for acquisition""" - pass - - @abstractmethod - def trigger(self) -> None: - """ - Trigger the detector, called from BEC - - Internal Calls: - - _on_trigger : call trigger action - """ - self._on_trigger() - - @abstractmethod - def _on_trigger(self) -> None: - """ - Specify action that should be taken upon trigger signal - """ - pass - - @abstractmethod - def stop(self, *, success=False) -> None: - """ - Stop the scan, with camera and file writer - - Internal Calls: - - _stop_det : stop detector - - _stop_det_fw : stop detector filewriter - """ - pass - - # TODO maybe not required to overwrite, but simply used by the user. - # If yes, then self.scaninfo.scanID & self.scaninfo.name & self.filepath - # should become input arguments - @abstractmethod - def _publish_file_location(self, done: bool = False, successful: bool = None) -> None: - """Publish the file location/event to bec - - Two events are published: - - file_event : event for the filewriter - - public_file : event for any secondary service (e.g. radial integ code) - - Args: - done (bool): True if scan is finished - successful (bool): True if scan was successful - - """ - pipe = self._producer.pipeline() - if successful is None: - msg = messages.FileMessage(file_path=self.filepath, done=done) - else: - msg = messages.FileMessage(file_path=self.filepath, done=done, successful=successful) - self._producer.set_and_publish( - MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe - ) - self._producer.set_and_publish( - MessageEndpoints.file_event(self.name), msg.dumps(), pipe=pipe - ) - pipe.execute() - - @abstractmethod - def stage(self) -> List(object): - """ - Stage device in preparation for a scan - - Internal Calls: - - _prep_det_fw : prepare detector filewriter for measurement - - _prep_det : prepare detector for measurement - - _publish_file_location : publish file location to bec - - Returns: - List(object): list of objects that were staged - - """ - # Method idempotent, should rais ;obj;'RedudantStaging' if staged twice - if self._staged != Staged.no: - return super().stage() - else: - # Reset flag for detector stopped - self._stopped = False - # Load metadata of the scan - self.scaninfo.load_scan_metadata() - # Prepare detector and file writer - self._prep_det_fw() - self._prep_det() - state = False - self._publish_file_location(done=state) - return super().stage() - - @abstractmethod - def unstage(self): - """ - Unstage device in preparation for a scan - - Returns directly if self._stopped, - otherwise checks with self._finished - if data acquisition on device finished (an was successful) - - Internal Calls: - - _finished : check if device finished acquisition (succesfully) - - _publish_file_location : publish file location to bec - """ - # Check if scan was stopped - old_scanID = self.scaninfo.scanID - self.scaninfo.load_scan_metadata() - if self.scaninfo.scanID != old_scanID: - self._stopped = True - if self._stopped == True: - return super().unstage() - # check if device finished acquisition - self._finished() - state = True - # Publish file location to bec - self._publish_file_location(done=state, successful=state) - self._stopped = False - return super().unstage() - - def _finished(self): - """ - Check if acquisition on device finished (succesfully) - - This function is called from unstage, and will check if - detector and filewriter of the detector return from acquisition. - If desired, it can also raise in case data acquisition was incomplete - - Small examples: - (1) check detector & detector filewriter status - if both finished --> good, if either is not finished --> raise - (2) (Optional) check if number of images received - is equivalent to the number of images requested - - Raises (optional): - TimeoutError: if data acquisition was incomplete - """ - pass diff --git a/tests/test_eiger9m_csaxs.py b/tests/test_eiger9m_csaxs.py index cfa2664..c33d03f 100644 --- a/tests/test_eiger9m_csaxs.py +++ b/tests/test_eiger9m_csaxs.py @@ -28,9 +28,9 @@ def mock_det(): dm = DMMock() with mock.patch.object(dm, "producer"): with mock.patch( - "ophyd_devices.epics.devices.eiger9m_csaxs.FileWriterMixin" + "ophyd_devices.epics.devices.psi_detector_base.FileWriterMixin" ) as filemixin, mock.patch( - "ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9McSAXS._update_service_config" + "ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config" ) as mock_service_config: with mock.patch.object(ophyd, "cl") as mock_cl: mock_cl.get_pv = MockPV @@ -51,9 +51,9 @@ def test_init(): dm = DMMock() with mock.patch.object(dm, "producer"): with mock.patch( - "ophyd_devices.epics.devices.eiger9m_csaxs.FileWriterMixin" + "ophyd_devices.epics.devices.psi_detector_base.FileWriterMixin" ) as filemixin, mock.patch( - "ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9McSAXS._update_service_config" + "ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config" ) as mock_service_config: with mock.patch.object(ophyd, "cl") as mock_cl: mock_cl.get_pv = MockPV