fix: add PSIDetectorBase

This commit is contained in:
appel_c 2023-11-16 17:43:22 +01:00
parent ee5cf17a05
commit a8a12103ea

View File

@ -4,7 +4,7 @@ import threading
import numpy as np
import os
from typing import Any, List
from typing import Any, List, Type
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import Device
@ -127,6 +127,39 @@ class CustomDetectorMixin:
DetectorTimeoutError: if detector cannot be stopped
"""
def wait_for_signals(
self,
signal_conditions: list,
timeout: float,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
) -> bool:
"""Wait for signals to reach a certain condition
Args:
signal_conditions (tuple): tuple of (get_current_state, condition) functions
timeout (float): timeout in seconds
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
Returns:
bool: True if all signals are in the desired state, False if timeout is reached
"""
timer = 0
while True:
checks = [
get_current_state() == condition
for get_current_state, condition in signal_conditions
]
if (all_signals and all(checks)) or (not all_signals and any(checks)):
return True
if check_stopped == True and self.parent._stopped == True:
return False
if timer > timeout:
return False
time.sleep(interval)
timer += interval
class Eiger9MSetup(CustomDetectorMixin):
"""Eiger setup class
@ -172,7 +205,7 @@ class Eiger9MSetup(CustomDetectorMixin):
# Stop writer
self.std_client.stop_writer()
# TODO put back change of e-account! and check with Leo which status to wait for
# Change e-account
eacc = self.parent.scaninfo.username
self.update_std_cfg("writer_user_id", int(eacc.strip(" e")))
@ -204,9 +237,9 @@ class Eiger9MSetup(CustomDetectorMixin):
f"Type of new value {type(value)}:{value} does not match old value {type(old_value)}:{old_value}"
)
cfg.update({cfg_key: value})
logger.info(cfg)
logger.info(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}")
logger.debug(cfg)
self.std_client.set_config(cfg)
logger.debug(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}")
def stop_detector(self) -> None:
"""Stop the detector and wait for the proper status message"""
@ -415,39 +448,6 @@ class Eiger9MSetup(CustomDetectorMixin):
self.stop_detector()
self.stop_detector_backend()
def wait_for_signals(
self,
signal_conditions: list,
timeout: float,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
) -> bool:
"""Wait for signals to reach a certain condition
Args:
signal_conditions (tuple): tuple of (get_current_state, condition) functions
timeout (float): timeout in seconds
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
Returns:
bool: True if all signals are in the desired state, False if timeout is reached
"""
timer = 0
while True:
checks = [
get_current_state() == condition
for get_current_state, condition in signal_conditions
]
if (all_signals and all(checks)) or (not all_signals and any(checks)):
return True
if check_stopped == True and self.parent._stopped == True:
return False
if timer > timeout:
return False
time.sleep(interval)
timer += interval
class SLSDetectorCam(Device):
"""SLS Detector Camera - Eiger 9M
@ -491,24 +491,35 @@ class DetectorState(enum.IntEnum):
ABORTED = 10
class Eiger9McSAXS(Device):
"""Eiger 9M detector for CSAXS
class PSIDetectorBase(Device):
"""
Abstract base class for SLS detectors
Parent class: DetectorBase
Device class: SlsDetectorCam
Attributes:
name str: 'eiger'
prefix (str): PV prefix (X12SA-ES-EIGER9M:)
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
read_attrs (list): sequence of attribute names to read
configuration_attrs (list): sequence of attribute names via config_parameters
parent (object): instance of the parent device
device_manager (object): bec device manager
sim_mode (bool): simulation mode, if True, no device manager is required
**kwargs: keyword arguments
attributes: lazy_wait_for_connection : bool
"""
custom_prepare_cls = CustomDetectorMixin
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = [
"describe",
]
cam = ADCpt(SLSDetectorCam, "cam1:")
# cam = ADCpt(SLSDetectorCam, "cam1:")
def __init__(
self,
@ -537,6 +548,7 @@ class Eiger9McSAXS(Device):
f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add DeviceManager to initialization or init with sim_mode=True"
)
# sim_mode True allows the class to be started without BEC running
# Init variables
self.sim_mode = sim_mode
self._lock = threading.RLock()
self._stopped = False
@ -548,7 +560,10 @@ class Eiger9McSAXS(Device):
self.readout_time_min = MIN_READOUT
self.timeout = 5
self.wait_for_connection(all_signals=True)
self.custom_prepare = Eiger9MSetup(parent=self, **kwargs)
# Init custom prepare class with BL specific logic
self.custom_prepare = self.custom_prepare_cls(
parent=self, **kwargs
) # Eiger9MSetup(parent=self, **kwargs)
if not sim_mode:
self._update_service_config()
self.device_manager = device_manager
@ -708,6 +723,84 @@ class Eiger9McSAXS(Device):
super().stop(success=success)
self._stopped = True
# def set_trigger(self, trigger_source: TriggerSource) -> None:
# """Set trigger source for the detector.
# Check the TriggerSource enum for possible values
# Args:
# trigger_source (TriggerSource): Trigger source for the detector
# """
# value = trigger_source
# self.cam.trigger_mode.put(value)
class Eiger9McSAXS(PSIDetectorBase):
custom_prepare_cls = Eiger9MSetup
cam = ADCpt(SLSDetectorCam, "cam1:")
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
**kwargs,
):
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
sim_mode=sim_mode,
**kwargs,
)
def set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source for the detector.
Check the TriggerSource enum for possible values
Args:
trigger_source (TriggerSource): Trigger source for the detector
"""
value = trigger_source
self.cam.trigger_mode.put(value)
# class Eiger9McSAXS(Eiger9M):
# def __init__(
# self,
# prefix="",
# *,
# name,
# kind=None,
# read_attrs=None,
# configuration_attrs=None,
# parent=None,
# device_manager=None,
# sim_mode=False,
# **kwargs,
# ):
# super().__init__(
# prefix=prefix,
# name=name,
# kind=kind,
# read_attrs=read_attrs,
# configuration_attrs=configuration_attrs,
# parent=parent,
# **kwargs,
# )
# # self.custom_prepare = Eiger9MSetup(parent=self, **kwargs)
if __name__ == "__main__":
eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:", sim_mode=True)