From a8a12103ea2108c5183a710ead04db4379627d83 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 16 Nov 2023 17:43:22 +0100 Subject: [PATCH] fix: add PSIDetectorBase --- ophyd_devices/epics/devices/eiger9m_csaxs.py | 187 ++++++++++++++----- 1 file changed, 140 insertions(+), 47 deletions(-) diff --git a/ophyd_devices/epics/devices/eiger9m_csaxs.py b/ophyd_devices/epics/devices/eiger9m_csaxs.py index 946f272..6fcdc08 100644 --- a/ophyd_devices/epics/devices/eiger9m_csaxs.py +++ b/ophyd_devices/epics/devices/eiger9m_csaxs.py @@ -4,7 +4,7 @@ import threading import numpy as np import os -from typing import Any, List +from typing import Any, List, Type from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd import Device @@ -127,6 +127,39 @@ class CustomDetectorMixin: DetectorTimeoutError: if detector cannot be stopped """ + def wait_for_signals( + self, + signal_conditions: list, + timeout: float, + check_stopped: bool = False, + interval: float = 0.05, + all_signals: bool = False, + ) -> bool: + """Wait for signals to reach a certain condition + + Args: + signal_conditions (tuple): tuple of (get_current_state, condition) functions + timeout (float): timeout in seconds + interval (float): interval in seconds + all_signals (bool): True if all signals should be True, False if any signal should be True + Returns: + bool: True if all signals are in the desired state, False if timeout is reached + """ + timer = 0 + while True: + checks = [ + get_current_state() == condition + for get_current_state, condition in signal_conditions + ] + if (all_signals and all(checks)) or (not all_signals and any(checks)): + return True + if check_stopped == True and self.parent._stopped == True: + return False + if timer > timeout: + return False + time.sleep(interval) + timer += interval + class Eiger9MSetup(CustomDetectorMixin): """Eiger setup class @@ -172,7 +205,7 @@ class Eiger9MSetup(CustomDetectorMixin): # Stop writer self.std_client.stop_writer() - # TODO put back change of e-account! and check with Leo which status to wait for + # Change e-account eacc = self.parent.scaninfo.username self.update_std_cfg("writer_user_id", int(eacc.strip(" e"))) @@ -204,9 +237,9 @@ class Eiger9MSetup(CustomDetectorMixin): f"Type of new value {type(value)}:{value} does not match old value {type(old_value)}:{old_value}" ) cfg.update({cfg_key: value}) - logger.info(cfg) - logger.info(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}") + logger.debug(cfg) self.std_client.set_config(cfg) + logger.debug(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}") def stop_detector(self) -> None: """Stop the detector and wait for the proper status message""" @@ -415,39 +448,6 @@ class Eiger9MSetup(CustomDetectorMixin): self.stop_detector() self.stop_detector_backend() - def wait_for_signals( - self, - signal_conditions: list, - timeout: float, - check_stopped: bool = False, - interval: float = 0.05, - all_signals: bool = False, - ) -> bool: - """Wait for signals to reach a certain condition - - Args: - signal_conditions (tuple): tuple of (get_current_state, condition) functions - timeout (float): timeout in seconds - interval (float): interval in seconds - all_signals (bool): True if all signals should be True, False if any signal should be True - Returns: - bool: True if all signals are in the desired state, False if timeout is reached - """ - timer = 0 - while True: - checks = [ - get_current_state() == condition - for get_current_state, condition in signal_conditions - ] - if (all_signals and all(checks)) or (not all_signals and any(checks)): - return True - if check_stopped == True and self.parent._stopped == True: - return False - if timer > timeout: - return False - time.sleep(interval) - timer += interval - class SLSDetectorCam(Device): """SLS Detector Camera - Eiger 9M @@ -491,24 +491,35 @@ class DetectorState(enum.IntEnum): ABORTED = 10 -class Eiger9McSAXS(Device): - """Eiger 9M detector for CSAXS +class PSIDetectorBase(Device): + """ + Abstract base class for SLS detectors - Parent class: DetectorBase - Device class: SlsDetectorCam - - Attributes: - name str: 'eiger' - prefix (str): PV prefix (X12SA-ES-EIGER9M:) + Args: + prefix (str): EPICS PV prefix for component (optional) + name (str): name of the device, as will be reported via read() + kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal + omitted -> readout ignored for read 'ophydobj.read()' + normal -> readout for read + config -> config parameter for 'ophydobj.read_configuration()' + hinted -> which attribute is readout for read + read_attrs (list): sequence of attribute names to read + configuration_attrs (list): sequence of attribute names via config_parameters + parent (object): instance of the parent device + device_manager (object): bec device manager + sim_mode (bool): simulation mode, if True, no device manager is required + **kwargs: keyword arguments + attributes: lazy_wait_for_connection : bool """ + custom_prepare_cls = CustomDetectorMixin # Specify which functions are revealed to the user in BEC client USER_ACCESS = [ "describe", ] - cam = ADCpt(SLSDetectorCam, "cam1:") + # cam = ADCpt(SLSDetectorCam, "cam1:") def __init__( self, @@ -537,6 +548,7 @@ class Eiger9McSAXS(Device): f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add DeviceManager to initialization or init with sim_mode=True" ) # sim_mode True allows the class to be started without BEC running + # Init variables self.sim_mode = sim_mode self._lock = threading.RLock() self._stopped = False @@ -548,7 +560,10 @@ class Eiger9McSAXS(Device): self.readout_time_min = MIN_READOUT self.timeout = 5 self.wait_for_connection(all_signals=True) - self.custom_prepare = Eiger9MSetup(parent=self, **kwargs) + # Init custom prepare class with BL specific logic + self.custom_prepare = self.custom_prepare_cls( + parent=self, **kwargs + ) # Eiger9MSetup(parent=self, **kwargs) if not sim_mode: self._update_service_config() self.device_manager = device_manager @@ -708,6 +723,84 @@ class Eiger9McSAXS(Device): super().stop(success=success) self._stopped = True + # def set_trigger(self, trigger_source: TriggerSource) -> None: + # """Set trigger source for the detector. + # Check the TriggerSource enum for possible values + + # Args: + # trigger_source (TriggerSource): Trigger source for the detector + + # """ + # value = trigger_source + # self.cam.trigger_mode.put(value) + + +class Eiger9McSAXS(PSIDetectorBase): + custom_prepare_cls = Eiger9MSetup + cam = ADCpt(SLSDetectorCam, "cam1:") + + def __init__( + self, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + device_manager=None, + sim_mode=False, + **kwargs, + ): + super().__init__( + prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + device_manager=device_manager, + sim_mode=sim_mode, + **kwargs, + ) + + def set_trigger(self, trigger_source: TriggerSource) -> None: + """Set trigger source for the detector. + Check the TriggerSource enum for possible values + + Args: + trigger_source (TriggerSource): Trigger source for the detector + + """ + value = trigger_source + self.cam.trigger_mode.put(value) + + +# class Eiger9McSAXS(Eiger9M): +# def __init__( +# self, +# prefix="", +# *, +# name, +# kind=None, +# read_attrs=None, +# configuration_attrs=None, +# parent=None, +# device_manager=None, +# sim_mode=False, +# **kwargs, +# ): +# super().__init__( +# prefix=prefix, +# name=name, +# kind=kind, +# read_attrs=read_attrs, +# configuration_attrs=configuration_attrs, +# parent=parent, +# **kwargs, +# ) +# # self.custom_prepare = Eiger9MSetup(parent=self, **kwargs) + if __name__ == "__main__": eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:", sim_mode=True)