fix: removed sls_detector_baseclass, add psi_detector_base, fixed tests and eiger9m_csaxs

This commit is contained in:
appel_c 2023-11-16 18:29:47 +01:00
parent a8a12103ea
commit 90cd05e68e
5 changed files with 352 additions and 771 deletions

View File

@ -25,7 +25,10 @@ from ophyd.quadem import QuadEM
# cSAXS
from .epics_motor_ex import EpicsMotorEx
from .mcs_csaxs import McsCsaxs
from .psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from .eiger9m_csaxs import Eiger9McSAXS
from .pilatus_csaxs import PilatuscSAXS
from .falcon_csaxs import FalconcSAXS
from .DelayGeneratorDG645 import DelayGeneratorDG645
# from .psi_detector_base import PSIDetectorBase, CustomDetectorMixin

View File

@ -1,31 +1,23 @@
import enum
import time
import threading
import numpy as np
import os
from typing import Any, List, Type
from typing import Any
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import Device
from ophyd import ADComponent as ADCpt
from ophyd.device import Staged
from std_daq_client import StdDaqClient
from bec_lib import messages, MessageEndpoints, threadlocked, bec_logger
from bec_lib.bec_service import SERVICE_CONFIG
from bec_lib.devicemanager import DeviceStatus
from bec_lib.file_utils import FileWriterMixin
from bec_lib.core import BECMessage, MessageEndpoints, threadlocked
from bec_lib.core import bec_logger
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils import bec_utils
from ophyd_devices.epics.devices.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
logger = bec_logger.logger
# Specify here the minimum readout time for the detector
MIN_READOUT = 3e-3
class EigerError(Exception):
"""Base class for exceptions in this module."""
@ -39,128 +31,6 @@ class EigerTimeoutError(EigerError):
pass
class EigerInitError(EigerError):
"""Raised when initiation of the device class fails,
due to missing device manager or not started in sim_mode."""
pass
class CustomDetectorMixin:
def __init__(self, parent: Device = None, *args, **kwargs) -> None:
self.parent = parent
def initialize_default_parameter(self) -> None:
"""
Init parameters for the detector
Raises (optional):
DetectorTimeoutError: if detector cannot be initialized
"""
pass
def initialize_detector(self) -> None:
"""
Init parameters for the detector
Raises (optional):
DetectorTimeoutError: if detector cannot be initialized
"""
pass
def initialize_detector_backend(self) -> None:
"""
Init parameters for teh detector backend (filewriter)
Raises (optional):
DetectorTimeoutError: if filewriter cannot be initialized
"""
pass
def prepare_detector(self) -> None:
"""
Prepare detector for the scan
"""
pass
def prepare_data_backend(self) -> None:
"""
Prepare the data backend for the scan
"""
pass
def stop_detector(self) -> None:
"""
Stop the detector
"""
pass
def stop_detector_backend(self) -> None:
"""
Stop the detector backend
"""
pass
def on_trigger(self) -> None:
"""
Specify actions to be executed upon receiving trigger signal
"""
pass
def pre_scan(self) -> None:
"""
Specify actions to be executed right before a scan
BEC calls pre_scan just before execution of the scan core.
It is convenient to execute time critical features of the detector,
e.g. arming it, but it is recommended to keep this function as short/fast as possible.
"""
pass
def finished(self) -> None:
"""
Specify actions to be executed during unstage
This may include checks if acquisition was succesful
Raises (optional):
DetectorTimeoutError: if detector cannot be stopped
"""
def wait_for_signals(
self,
signal_conditions: list,
timeout: float,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
) -> bool:
"""Wait for signals to reach a certain condition
Args:
signal_conditions (tuple): tuple of (get_current_state, condition) functions
timeout (float): timeout in seconds
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
Returns:
bool: True if all signals are in the desired state, False if timeout is reached
"""
timer = 0
while True:
checks = [
get_current_state() == condition
for get_current_state, condition in signal_conditions
]
if (all_signals and all(checks)) or (not all_signals and any(checks)):
return True
if check_stopped == True and self.parent._stopped == True:
return False
if timer > timeout:
return False
time.sleep(interval)
timer += interval
class Eiger9MSetup(CustomDetectorMixin):
"""Eiger setup class
@ -226,6 +96,8 @@ class Eiger9MSetup(CustomDetectorMixin):
def update_std_cfg(self, cfg_key: str, value: Any) -> None:
"""Update std_daq config with new e-account for the current beamtime"""
# Load config from client and check old value
cfg = self.std_client.get_config()
old_value = cfg.get(cfg_key)
if old_value is None:
@ -236,6 +108,8 @@ class Eiger9MSetup(CustomDetectorMixin):
raise EigerError(
f"Type of new value {type(value)}:{value} does not match old value {type(old_value)}:{old_value}"
)
# Update config with new value and send back to client
cfg.update({cfg_key: value})
logger.debug(cfg)
self.std_client.set_config(cfg)
@ -243,8 +117,11 @@ class Eiger9MSetup(CustomDetectorMixin):
def stop_detector(self) -> None:
"""Stop the detector and wait for the proper status message"""
# Stop detector
self.parent.cam.acquire.put(0)
# Check if detector returned in idle state
signal_conditions = [
(
lambda: self.parent.cam.detector_state.read()[self.parent.cam.detector_state.name][
@ -260,7 +137,7 @@ class Eiger9MSetup(CustomDetectorMixin):
timeout=self.parent.timeout - self.parent.timeout // 2,
all_signals=False,
):
# Retry stop detector
# Retry stop detector and wait for remaining time
self.parent.cam.acquire.put(0)
if not self.wait_for_signals(
signal_conditions=signal_conditions,
@ -287,6 +164,8 @@ class Eiger9MSetup(CustomDetectorMixin):
Threshold might be in ev or keV
"""
# get current beam energy from device manageer
mokev = self.parent.device_manager.devices.mokev.obj.read()[
self.parent.device_manager.devices.mokev.name
]["value"]
@ -312,10 +191,14 @@ class Eiger9MSetup(CustomDetectorMixin):
def set_acquisition_params(self) -> None:
"""Set acquisition parameters for the detector"""
# Set number of images and frames (frames is for internal burst of detector)
self.parent.cam.num_images.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.cam.num_frames.put(1)
# Update the readout time of the detector
self.update_readout_time()
def prepare_data_backend(self) -> None:
@ -491,253 +374,10 @@ class DetectorState(enum.IntEnum):
ABORTED = 10
class PSIDetectorBase(Device):
"""
Abstract base class for SLS detectors
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
read_attrs (list): sequence of attribute names to read
configuration_attrs (list): sequence of attribute names via config_parameters
parent (object): instance of the parent device
device_manager (object): bec device manager
sim_mode (bool): simulation mode, if True, no device manager is required
**kwargs: keyword arguments
attributes: lazy_wait_for_connection : bool
"""
custom_prepare_cls = CustomDetectorMixin
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = [
"describe",
]
# cam = ADCpt(SLSDetectorCam, "cam1:")
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
if device_manager is None and not sim_mode:
raise EigerInitError(
f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add DeviceManager to initialization or init with sim_mode=True"
)
# sim_mode True allows the class to be started without BEC running
# Init variables
self.sim_mode = sim_mode
self._lock = threading.RLock()
self._stopped = False
self.name = name
self.service_cfg = None
self.std_client = None
self.scaninfo = None
self.filewriter = None
self.readout_time_min = MIN_READOUT
self.timeout = 5
self.wait_for_connection(all_signals=True)
# Init custom prepare class with BL specific logic
self.custom_prepare = self.custom_prepare_cls(
parent=self, **kwargs
) # Eiger9MSetup(parent=self, **kwargs)
if not sim_mode:
self._update_service_config()
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
base_path = kwargs["basepath"] if "basepath" in kwargs else "~/Data10/"
self.service_cfg = {"base_path": os.path.expanduser(base_path)}
self._producer = self.device_manager.producer
self._update_scaninfo()
self._update_filewriter()
self._init()
def _update_filewriter(self) -> None:
"""Update filewriter with service config"""
self.filewriter = FileWriterMixin(self.service_cfg)
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode)
self.scaninfo.load_scan_metadata()
def _update_service_config(self) -> None:
"""Update service config from BEC service config"""
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
self.custom_prepare.initialize_default_parameter()
self.custom_prepare.initialize_detector()
self.custom_prepare.initialize_detector_backend()
def stage(self) -> List[object]:
"""
Stage device in preparation for a scan
Internal Calls:
- _prep_backend : prepare detector filewriter for measurement
- _prep_detector : prepare detector for measurement
Returns:
List(object): list of objects that were staged
"""
# Method idempotent, should rais ;obj;'RedudantStaging' if staged twice
if self._staged != Staged.no:
return super().stage()
# Reset flag for detector stopped
self._stopped = False
# Load metadata of the scan
self.scaninfo.load_scan_metadata()
# Prepare detector and file writer
self.custom_prepare.prepare_data_backend()
self.custom_prepare.prepare_detector()
state = False
self.custom_prepare.publish_file_location(done=state)
self.custom_prepare.arm_acquisition()
# At the moment needed bc signal is not reliable, BEC too fast
time.sleep(0.05)
return super().stage()
def set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source for the detector.
Check the TriggerSource enum for possible values
Args:
trigger_source (TriggerSource): Trigger source for the detector
"""
value = trigger_source
self.cam.trigger_mode.put(value)
def _publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""Publish the filepath to REDIS.
We publish two events here:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
Args:
done (bool): True if scan is finished
successful (bool): True if scan was successful
"""
pipe = self._producer.pipeline()
if successful is None:
msg = messages.FileMessage(file_path=self.filepath, done=done)
else:
msg = messages.FileMessage(file_path=self.filepath, done=done, successful=successful)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe
)
self._producer.set_and_publish(
MessageEndpoints.file_event(self.name), msg.dumps(), pipe=pipe
)
pipe.execute()
# TODO function for abstract class?
def _arm_acquisition(self) -> None:
"""Arm Eiger detector for acquisition"""
timer = 0
self.cam.acquire.put(1)
while True:
det_ctrl = self.cam.detector_state.read()[self.cam.detector_state.name]["value"]
if det_ctrl == DetectorState.RUNNING:
break
if self._stopped == True:
break
time.sleep(0.01)
timer += 0.01
if timer > 5:
self.stop()
raise EigerTimeoutError("Failed to arm the acquisition. IOC did not update.")
# TODO function for abstract class?
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
self.custom_prepare.on_trigger()
return super().trigger()
def unstage(self) -> List[object]:
"""
Unstage device in preparation for a scan
Returns directly if self._stopped,
otherwise checks with self._finished
if data acquisition on device finished (an was successful)
Internal Calls:
- custom_prepare.check_scanID : check if scanID changed or detector stopped
- custom_prepare.finished : check if device finished acquisition (succesfully)
- custom_prepare.publish_file_location : publish file location to bec
Returns:
List(object): list of objects that were unstaged
"""
self.custom_prepare.check_scanID()
if self._stopped == True:
return super().unstage()
self.custom_prepare.finished()
state = True
self.custom_prepare.publish_file_location(done=state, successful=state)
self._stopped = False
return super().unstage()
def stop(self, *, success=False) -> None:
"""
Stop the scan, with camera and file writer
Internal Calls:
- custom_prepare.stop_detector : stop detector
- custom_prepare.stop_backend : stop detector filewriter
"""
self.custom_prepare.stop_detector()
self.custom_prepare.stop_detector_backend()
super().stop(success=success)
self._stopped = True
# def set_trigger(self, trigger_source: TriggerSource) -> None:
# """Set trigger source for the detector.
# Check the TriggerSource enum for possible values
# Args:
# trigger_source (TriggerSource): Trigger source for the detector
# """
# value = trigger_source
# self.cam.trigger_mode.put(value)
class Eiger9McSAXS(PSIDetectorBase):
custom_prepare_cls = Eiger9MSetup
cam = ADCpt(SLSDetectorCam, "cam1:")
MIN_READOUT = 3e-3
def __init__(
self,
@ -776,31 +416,5 @@ class Eiger9McSAXS(PSIDetectorBase):
self.cam.trigger_mode.put(value)
# class Eiger9McSAXS(Eiger9M):
# def __init__(
# self,
# prefix="",
# *,
# name,
# kind=None,
# read_attrs=None,
# configuration_attrs=None,
# parent=None,
# device_manager=None,
# sim_mode=False,
# **kwargs,
# ):
# super().__init__(
# prefix=prefix,
# name=name,
# kind=kind,
# read_attrs=read_attrs,
# configuration_attrs=configuration_attrs,
# parent=parent,
# **kwargs,
# )
# # self.custom_prepare = Eiger9MSetup(parent=self, **kwargs)
if __name__ == "__main__":
eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:", sim_mode=True)

View File

@ -0,0 +1,326 @@
import time
import threading
from bec_lib.core.devicemanager import DeviceStatus
import os
from typing import List
from ophyd import Device
from ophyd.device import Staged
from bec_lib.core.file_utils import FileWriterMixin
from bec_lib.core.bec_service import SERVICE_CONFIG
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils import bec_utils
# Specify here the minimum readout time for the detector
MIN_READOUT = 3e-3
class DetectorInitError(Exception):
"""Raised when initiation of the device class fails,
due to missing device manager or not started in sim_mode."""
pass
class CustomDetectorMixin:
def __init__(self, parent: Device = None, *args, **kwargs) -> None:
self.parent = parent
def initialize_default_parameter(self) -> None:
"""
Init parameters for the detector
Raises (optional):
DetectorTimeoutError: if detector cannot be initialized
"""
pass
def initialize_detector(self) -> None:
"""
Init parameters for the detector
Raises (optional):
DetectorTimeoutError: if detector cannot be initialized
"""
pass
def initialize_detector_backend(self) -> None:
"""
Init parameters for teh detector backend (filewriter)
Raises (optional):
DetectorTimeoutError: if filewriter cannot be initialized
"""
pass
def prepare_detector(self) -> None:
"""
Prepare detector for the scan
"""
pass
def prepare_data_backend(self) -> None:
"""
Prepare the data backend for the scan
"""
pass
def stop_detector(self) -> None:
"""
Stop the detector
"""
pass
def stop_detector_backend(self) -> None:
"""
Stop the detector backend
"""
pass
def on_trigger(self) -> None:
"""
Specify actions to be executed upon receiving trigger signal
"""
pass
def pre_scan(self) -> None:
"""
Specify actions to be executed right before a scan
BEC calls pre_scan just before execution of the scan core.
It is convenient to execute time critical features of the detector,
e.g. arming it, but it is recommended to keep this function as short/fast as possible.
"""
pass
def finished(self) -> None:
"""
Specify actions to be executed during unstage
This may include checks if acquisition was succesful
Raises (optional):
DetectorTimeoutError: if detector cannot be stopped
"""
def wait_for_signals(
self,
signal_conditions: list,
timeout: float,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
) -> bool:
"""Wait for signals to reach a certain condition
Args:
signal_conditions (tuple): tuple of (get_current_state, condition) functions
timeout (float): timeout in seconds
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
Returns:
bool: True if all signals are in the desired state, False if timeout is reached
"""
timer = 0
while True:
checks = [
get_current_state() == condition
for get_current_state, condition in signal_conditions
]
if (all_signals and all(checks)) or (not all_signals and any(checks)):
return True
if check_stopped == True and self.parent._stopped == True:
return False
if timer > timeout:
return False
time.sleep(interval)
timer += interval
class PSIDetectorBase(Device):
"""
Abstract base class for SLS detectors
Class attributes:
custom_prepare_cls (object): class for custom prepare logic (BL specific)
Min_readout (float): minimum readout time for detector
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
read_attrs (list): sequence of attribute names to read
configuration_attrs (list): sequence of attribute names via config_parameters
parent (object): instance of the parent device
device_manager (object): bec device manager
sim_mode (bool): simulation mode, if True, no device manager is required
**kwargs: keyword arguments
attributes: lazy_wait_for_connection : bool
"""
custom_prepare_cls = CustomDetectorMixin
MIN_READOUT = 1e-3
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = [
"describe",
]
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
if device_manager is None and not sim_mode:
raise DetectorInitError(
f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add DeviceManager to initialization or init with sim_mode=True"
)
# sim_mode True allows the class to be started without BEC running
# Init variables
self.sim_mode = sim_mode
self._lock = threading.RLock()
self._stopped = False
self.name = name
self.service_cfg = None
self.std_client = None
self.scaninfo = None
self.filewriter = None
self.readout_time_min = MIN_READOUT
self.timeout = 5
self.wait_for_connection(all_signals=True)
# Init custom prepare class with BL specific logic
self.custom_prepare = self.custom_prepare_cls(
parent=self, **kwargs
) # Eiger9MSetup(parent=self, **kwargs)
if not sim_mode:
self._update_service_config()
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
base_path = kwargs["basepath"] if "basepath" in kwargs else "~/Data10/"
self.service_cfg = {"base_path": os.path.expanduser(base_path)}
self._producer = self.device_manager.producer
self._update_scaninfo()
self._update_filewriter()
self._init()
def _update_filewriter(self) -> None:
"""Update filewriter with service config"""
self.filewriter = FileWriterMixin(self.service_cfg)
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode)
self.scaninfo.load_scan_metadata()
def _update_service_config(self) -> None:
"""Update service config from BEC service config"""
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
self.custom_prepare.initialize_default_parameter()
self.custom_prepare.initialize_detector()
self.custom_prepare.initialize_detector_backend()
def stage(self) -> List[object]:
"""
Stage device in preparation for a scan
Internal Calls:
- _prep_backend : prepare detector filewriter for measurement
- _prep_detector : prepare detector for measurement
Returns:
List(object): list of objects that were staged
"""
# Method idempotent, should rais ;obj;'RedudantStaging' if staged twice
if self._staged != Staged.no:
return super().stage()
# Reset flag for detector stopped
self._stopped = False
# Load metadata of the scan
self.scaninfo.load_scan_metadata()
# Prepare detector and file writer
self.custom_prepare.prepare_data_backend()
self.custom_prepare.prepare_detector()
state = False
self.custom_prepare.publish_file_location(done=state)
self.custom_prepare.arm_acquisition()
# At the moment needed bc signal is not reliable, BEC too fast
time.sleep(0.05)
return super().stage()
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
self.custom_prepare.on_trigger()
return super().trigger()
def unstage(self) -> List[object]:
"""
Unstage device in preparation for a scan
Returns directly if self._stopped,
otherwise checks with self._finished
if data acquisition on device finished (an was successful)
Internal Calls:
- custom_prepare.check_scanID : check if scanID changed or detector stopped
- custom_prepare.finished : check if device finished acquisition (succesfully)
- custom_prepare.publish_file_location : publish file location to bec
Returns:
List(object): list of objects that were unstaged
"""
self.custom_prepare.check_scanID()
if self._stopped == True:
return super().unstage()
self.custom_prepare.finished()
state = True
self.custom_prepare.publish_file_location(done=state, successful=state)
self._stopped = False
return super().unstage()
def stop(self, *, success=False) -> None:
"""
Stop the scan, with camera and file writer
Internal Calls:
- custom_prepare.stop_detector : stop detector
- custom_prepare.stop_backend : stop detector filewriter
"""
self.custom_prepare.stop_detector()
self.custom_prepare.stop_detector_backend()
super().stop(success=success)
self._stopped = True

View File

@ -1,362 +0,0 @@
import enum
import os
from abc import ABC, abstractmethod
from typing import List
from ophyd import Device
from ophyd.device import Staged
from ophyd_devices.utils import bec_utils
from bec_lib.bec_service import SERVICE_CONFIG
from bec_lib.file_utils import FileWriterMixin
from bec_lib import messages, MessageEndpoints
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
# Define here a minimum readout time for the detector
MIN_READOUT = None
# Custom exceptions specific to detectors
class DetectorError(Exception):
"""
Class for custom detector errors
Specifying different types of errors can be helpful and used
for error handling, e.g. scan repetitions.
An suggestion/example would be to have 3 class types for errors
- EigerError : base error class for the detector (e.g. Eiger here)
- EigerTimeoutError(EigerError) : timeout error, inherits from EigerError
- EigerInitError(EigerError) : initialization error, inherits from EigerError
"""
pass
class TriggerSource(enum.IntEnum):
"""
Class for trigger signals
Here we would map trigger options from EPICS, example implementation:
AUTO = 0
TRIGGER = 1
GATING = 2
BURST_TRIGGER = 3
To set the trigger source to gating, call TriggerSource.Gating
"""
pass
class SLSDetectorBase(ABC, Device):
"""
Abstract base class for SLS detectors
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
read_attrs (list): sequence of attribute names to read
configuration_attrs (list): sequence of attribute names via config_parameters
parent (object): instance of the parent device
device_manager (object): bec device manager
sim_mode (bool): simulation mode, if True, no device manager is required
**kwargs: keyword arguments
attributes: lazy_wait_for_connection : bool
"""
def __init____init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
**kwargs,
) -> None:
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
**kwargs,
)
if device_manager is None and not sim_mode:
raise DetectorError(
f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add DeviceManager to initialization or init with sim_mode=True"
)
self.sim_mode = sim_mode
self._stopped = False
self._staged = Staged.no
self.name = name
self.service_cfg = None
self.std_client = None
self.scaninfo = None
self.filewriter = None
self.readout_time_min = MIN_READOUT
self.timeout = 5
self.wait_for_connection(all_signals=True)
if not self.sim_mode:
self._update_service_cfg()
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
base_path = kwargs["basepath"] if "basepath" in kwargs else "~/Data10/"
self.service_cfg = {"base_path": os.path.expanduser(base_path)}
self._producer = self.device_manager.producer
self._update_scaninfo()
self._update_filewriter()
self._init()
def _update_service_cfg(self) -> None:
"""
Update service configuration from BEC SERVICE CONFIG
"""
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
def _update_scaninfo(self) -> None:
"""
Update scaninfo from BecScaninfoMixing
"""
self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode)
self.scaninfo.load_scan_metadata()
def _update_filewriter(self) -> None:
"""
Update filewriter with service config
"""
self.filewriter = FileWriterMixin(self.service_cfg)
@abstractmethod
def _init(self) -> None:
"""
Initialize detector & detector filewriter
Can also be used to init default parameters
Internal Calls:
- _init_detector : Init detector
- _init_det_fw : Init file_writer
"""
self._init_det()
self._init_det_fw()
@abstractmethod
def _init_det(self) -> None:
"""
Init parameters for the detector
Raises (optional):
DetectorError: if detector cannot be initialized
"""
pass
@abstractmethod
def _init_det_fw(self) -> None:
"""
Init parameters for detector filewriter
Raises (optional):
DetectorError: if filewriter cannot be initialized
"""
pass
@abstractmethod
def _set_trigger(self, trigger_source) -> None:
"""
Set trigger source for the detector
Args:
trigger_source (enum.IntEnum): trigger source
"""
pass
@abstractmethod
def _prep_det_fw(self) -> None:
"""
Prepare detector file writer for scan
Raises (optional):
DetectorError: If file writer cannot be prepared
"""
pass
@abstractmethod
def _stop_det_fw(self) -> None:
"""
Stops detector file writer
Raises (optional):
DetectorError: If file writer cannot be stopped
"""
pass
@abstractmethod
def _prep_det(self) -> None:
"""
Prepare detector for scans
"""
pass
@abstractmethod
def _stop_det(self) -> None:
"""
Stop the detector and wait for the proper status message
Raises (optional):
DetectorError: If detector cannot be prepared
"""
pass
@abstractmethod
def _arm_acquisition(self) -> None:
"""Arm detector for acquisition"""
pass
@abstractmethod
def trigger(self) -> None:
"""
Trigger the detector, called from BEC
Internal Calls:
- _on_trigger : call trigger action
"""
self._on_trigger()
@abstractmethod
def _on_trigger(self) -> None:
"""
Specify action that should be taken upon trigger signal
"""
pass
@abstractmethod
def stop(self, *, success=False) -> None:
"""
Stop the scan, with camera and file writer
Internal Calls:
- _stop_det : stop detector
- _stop_det_fw : stop detector filewriter
"""
pass
# TODO maybe not required to overwrite, but simply used by the user.
# If yes, then self.scaninfo.scanID & self.scaninfo.name & self.filepath
# should become input arguments
@abstractmethod
def _publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""Publish the file location/event to bec
Two events are published:
- file_event : event for the filewriter
- public_file : event for any secondary service (e.g. radial integ code)
Args:
done (bool): True if scan is finished
successful (bool): True if scan was successful
"""
pipe = self._producer.pipeline()
if successful is None:
msg = messages.FileMessage(file_path=self.filepath, done=done)
else:
msg = messages.FileMessage(file_path=self.filepath, done=done, successful=successful)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe
)
self._producer.set_and_publish(
MessageEndpoints.file_event(self.name), msg.dumps(), pipe=pipe
)
pipe.execute()
@abstractmethod
def stage(self) -> List(object):
"""
Stage device in preparation for a scan
Internal Calls:
- _prep_det_fw : prepare detector filewriter for measurement
- _prep_det : prepare detector for measurement
- _publish_file_location : publish file location to bec
Returns:
List(object): list of objects that were staged
"""
# Method idempotent, should rais ;obj;'RedudantStaging' if staged twice
if self._staged != Staged.no:
return super().stage()
else:
# Reset flag for detector stopped
self._stopped = False
# Load metadata of the scan
self.scaninfo.load_scan_metadata()
# Prepare detector and file writer
self._prep_det_fw()
self._prep_det()
state = False
self._publish_file_location(done=state)
return super().stage()
@abstractmethod
def unstage(self):
"""
Unstage device in preparation for a scan
Returns directly if self._stopped,
otherwise checks with self._finished
if data acquisition on device finished (an was successful)
Internal Calls:
- _finished : check if device finished acquisition (succesfully)
- _publish_file_location : publish file location to bec
"""
# Check if scan was stopped
old_scanID = self.scaninfo.scanID
self.scaninfo.load_scan_metadata()
if self.scaninfo.scanID != old_scanID:
self._stopped = True
if self._stopped == True:
return super().unstage()
# check if device finished acquisition
self._finished()
state = True
# Publish file location to bec
self._publish_file_location(done=state, successful=state)
self._stopped = False
return super().unstage()
def _finished(self):
"""
Check if acquisition on device finished (succesfully)
This function is called from unstage, and will check if
detector and filewriter of the detector return from acquisition.
If desired, it can also raise in case data acquisition was incomplete
Small examples:
(1) check detector & detector filewriter status
if both finished --> good, if either is not finished --> raise
(2) (Optional) check if number of images received
is equivalent to the number of images requested
Raises (optional):
TimeoutError: if data acquisition was incomplete
"""
pass

View File

@ -28,9 +28,9 @@ def mock_det():
dm = DMMock()
with mock.patch.object(dm, "producer"):
with mock.patch(
"ophyd_devices.epics.devices.eiger9m_csaxs.FileWriterMixin"
"ophyd_devices.epics.devices.psi_detector_base.FileWriterMixin"
) as filemixin, mock.patch(
"ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9McSAXS._update_service_config"
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
) as mock_service_config:
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
@ -51,9 +51,9 @@ def test_init():
dm = DMMock()
with mock.patch.object(dm, "producer"):
with mock.patch(
"ophyd_devices.epics.devices.eiger9m_csaxs.FileWriterMixin"
"ophyd_devices.epics.devices.psi_detector_base.FileWriterMixin"
) as filemixin, mock.patch(
"ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9McSAXS._update_service_config"
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
) as mock_service_config:
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV