diff --git a/ophyd_devices/configs/delay_generator_example.yaml b/ophyd_devices/configs/delay_generator_example.yaml new file mode 100644 index 0000000..2d31242 --- /dev/null +++ b/ophyd_devices/configs/delay_generator_example.yaml @@ -0,0 +1,12 @@ +ddg: + description: DelayGenerator for detectors + deviceClass: ophyd_devices.devices.delay_generator_645.DelayGenerator + deviceConfig: + prefix: 'X12SA-CPCL-DDG3:' + deviceTags: + - cSAXS + - ddg_mcs + onFailure: buffer + enabled: true + readoutPriority: async + softwareTrigger: True \ No newline at end of file diff --git a/ophyd_devices/devices/delay_generator_645.py b/ophyd_devices/devices/delay_generator_645.py index 4ac5670..ff5c8f3 100644 --- a/ophyd_devices/devices/delay_generator_645.py +++ b/ophyd_devices/devices/delay_generator_645.py @@ -143,7 +143,13 @@ class DelayGenerator(Device): https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf """ - USER_ACCESS = ["set_channels", "burst_enable", "burst_disable", "set_trigger", "check_if_ddg_okay"] + USER_ACCESS = [ + "set_channels", + "burst_enable", + "burst_disable", + "set_trigger", + "check_if_ddg_okay", + ] # PVs trigger_burst_readout = Component( @@ -241,7 +247,7 @@ class DelayGenerator(Device): LINE = 6 """ value = int(source) - self.source.put(value) + self.source.set(value).wait() @typechecked def burst_enable( @@ -251,9 +257,12 @@ class DelayGenerator(Device): Args: count (int): Number of bursts >0 - delay (float): Delay between bursts in seconds >=0 + delay (float): Delay before bursts start in seconds >=0 period (float): Period of the bursts in seconds >0 - config (str): Configuration of the burst. Default is "all" + config (str): Configuration of T0 duiring burst. + In addition, to simplify triggering of other instruments synchronously with the burst, + the T0 output may be configured to fire on the first delay cycle of the burst, + rather than for all delay cycles as is normally the case. """ # Check inputs first @@ -317,8 +326,9 @@ class DelayGenerator(Device): status = self.status.read()[self.status.name]["value"] if status != "STATUS OK" and not raise_on_error: logger.warning(f"DDG returns {status}, trying to clear ERROR") - self.parent.clear_error() + # TODO check if clear_error is working + self.clear_error.put(1) time.sleep(sleep_time) - self.is_ddg_okay(raise_on_error=True) + self.check_if_ddg_okay(raise_on_error=True) elif status != "STATUS OK": raise DelayGeneratorError(f"DDG failed to start with status: {status}") diff --git a/ophyd_devices/interfaces/base_classes/psi_delay_generator_base.py b/ophyd_devices/interfaces/base_classes/psi_delay_generator_base.py deleted file mode 100644 index 713bb34..0000000 --- a/ophyd_devices/interfaces/base_classes/psi_delay_generator_base.py +++ /dev/null @@ -1,336 +0,0 @@ -import time -from typing import Any - -from bec_lib import bec_logger -from ophyd import ( - Component, - Device, - DeviceStatus, - EpicsSignal, - EpicsSignalRO, - Kind, - PVPositioner, - Signal, -) -from ophyd.device import Staged - -from ophyd_devices.devices.delay_generator_645 import DelayGenerator -from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase -from ophyd_devices.utils import bec_utils -from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin - -logger = bec_logger.logger - - -class DelayGeneratorNotOkay(Exception): - """Custom exception class for DelayGenerator errors""" - - -# class DDGCustomMixin: -# """ -# Mixin class for custom DelayGenerator logic within PSIDelayGeneratorBase. - -# This class provides a parent class for implementation of BL specific logic of the device. -# It is also possible to pass implementing certain methods, e.g. finished or on_trigger, -# based on the setup and desired operation mode at the beamline. - -# Args: -# parent (object): instance of PSIDelayGeneratorBase -# **kwargs: keyword arguments -# """ - -# def __init__(self, *_args, parent: Device = None, **_kwargs) -> None: -# self.parent = parent - -# def initialize_default_parameter(self) -> None: -# """ -# Method to initialize default parameters for DDG. - -# Called upon initiating the base class. -# It should be used to set the DDG default parameters. -# These may include: amplitude, offsets, delays, etc. -# """ - -# def prepare_ddg(self) -> None: -# """ -# Method to prepare the DDG for the upcoming scan. - -# Called by the stage method of the base class. -# It should be used to set the DDG parameters for the upcoming scan. -# """ - -# def on_trigger(self) -> None: -# """Method executed upon trigger call in parent class""" - -# def finished(self) -> None: -# """Method to check if DDG is finished with the scan""" - -# def on_pre_scan(self) -> None: -# """ -# Method executed upon pre_scan call in parent class. - -# Covenient to implement time sensitive actions to be executed right before start of the scan. -# Example could be to open the shutter by triggering a pulse via pre_scan. -# """ - -# def check_scan_id(self) -> None: -# """Method to check if there is a new scan_id, called by stage.""" - -# def is_ddg_okay(self, raise_on_error=False) -> None: -# """ -# Method to check if DDG is okay - -# It checks the status PV of the DDG and tries to clear the error if it is not okay. -# It will rerun itself and raise DelayGeneratorNotOkay if DDG is still not okay. - -# Args: -# raise_on_error (bool, optional): raise exception if DDG is not okay. Defaults to False. -# """ -# status = self.parent.status.read()[self.parent.status.name]["value"] -# if status != "STATUS OK" and not raise_on_error: -# logger.warning(f"DDG returns {status}, trying to clear ERROR") -# self.parent.clear_error() -# time.sleep(1) -# self.is_ddg_okay(raise_on_error=True) -# elif status != "STATUS OK": -# raise DelayGeneratorNotOkay(f"DDG failed to start with status: {status}") - - -class PSIDelayGeneratorBase(PSIDeviceBase, DelayGenerator): - """ - Abstract base class for DelayGenerator DG645 - - This class implements a thin Ophyd wrapper around the Stanford Research DG645 - digital delay generator. - - The DG645 generates 8+1 signals: A, B, C, D, E, F, G, H and T0. Front panel outputs - T0, AB, CD, EF and GH are combinations of these signals. Back panel outputs are - directly routed signals. Signals are not independent. - - Signal pairs, e.g. AB, CD, EF, GH, are implemented as DelayPair objects. They - have a TTL pulse width, delay and a reference signal to which they are being triggered. - In addition, the io layer allows setting amplitude, offset and polarity for each pair. - - Detailed information can be found in the manual: - https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf - - Class attributes: - custom_prepare_cls (object): class for custom prepare logic (BL specific) - - Args: - prefix (str) : EPICS PV prefix for component (optional) - name (str) : name of the device, as will be reported via read() - kind (str) : member of class 'ophydobj.Kind', defaults to Kind.normal - omitted -> readout ignored for read 'ophydobj.read()' - normal -> readout for read - config -> config parameter for 'ophydobj.read_configuration()' - hinted -> which attribute is readout for read - read_attrs (list) : sequence of attribute names to read - configuration_attrs (list) : sequence of attribute names via config_parameters - parent (object) : instance of the parent device - device_manager (object) : bec device manager - sim_mode (bool) : simulation mode, if True, no device manager is required - **kwargs : keyword arguments - attributes : lazy_wait_for_connection : bool - """ - - # Custom_prepare_cls - # custom_prepare_cls = DDGCustomMixin - - # SUB_PROGRESS = "progress" - # SUB_VALUE = "value" - # _default_sub = SUB_VALUE - - # def __init__( - # self, - # prefix="", - # *, - # name, - # kind=None, - # read_attrs=None, - # configuration_attrs=None, - # parent=None, - # device_manager=None, - # sim_mode=False, - # **kwargs, - # ): - # super().__init__( - # prefix=prefix, - # name=name, - # kind=kind, - # read_attrs=read_attrs, - # configuration_attrs=configuration_attrs, - # parent=parent, - # **kwargs, - # ) - # if device_manager is None and not sim_mode: - # raise DeviceInitError( - # f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add" - # " DeviceManager to initialization or init with sim_mode=True" - # ) - # # Init variables - # self.sim_mode = sim_mode - # self.stopped = False - # self.name = name - # self.scaninfo = None - # self.timeout = 5 - # self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"] - # self.all_delay_pairs = ["AB", "CD", "EF", "GH"] - # self.wait_for_connection(all_signals=True) - - # # Init custom prepare class with BL specific logic - # self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs) - # if not sim_mode: - # self.device_manager = device_manager - # else: - # self.device_manager = bec_utils.DMMock() - # self.connector = self.device_manager.connector - # self._update_scaninfo() - # self._init() - - # def _update_scaninfo(self) -> None: - # """ - # Method to updated scaninfo from BEC. - - # In sim_mode, scaninfo output is mocked - see bec_scaninfo_mixin.py - # """ - # self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode) - # self.scaninfo.load_scan_metadata() - - # def _init(self) -> None: - # """Method to initialize custom parameters of the DDG.""" - # self.custom_prepare.initialize_default_parameter() - # self.custom_prepare.is_ddg_okay() - - # def set_channels(self, signal: str, value: Any, channels: list = None) -> None: - # """ - # Method to set signals on DelayPair and DelayStatic channels. - - # Signals can be set on the DelayPair and DelayStatic channels. The method checks - # if the signal is available on the channel and sets it. It works for both, DelayPair - # and Delay Static although signals are hosted in different layers. - - # Args: - # signal (str) : signal to set (width, delay, amplitude, offset, polarity) - # value (Any) : value to set - # channels (list, optional) : list of channels to set. Defaults to self.all_channels (T0,AB,CD,EF,GH) - # """ - # if not channels: - # channels = self.all_channels - # for chname in channels: - # channel = getattr(self, chname, None) - # if not channel: - # continue - # if signal in channel.component_names: - # getattr(channel, signal).set(value) - # continue - # if "io" in channel.component_names and signal in channel.io.component_names: - # getattr(channel.io, signal).set(value) - - # def set_trigger(self, trigger_source: TriggerSource) -> None: - # """Set trigger source on DDG - possible values defined in TriggerSource enum""" - # value = int(trigger_source) - # self.source.put(value) - - # def burst_enable(self, count, delay, period, config="all"): - # """Enable the burst mode""" - # # Validate inputs - # count = int(count) - # assert count > 0, "Number of bursts must be positive" - # assert delay >= 0, "Burst delay must be larger than 0" - # assert period > 0, "Burst period must be positive" - # assert config in ["all", "first"], "Supported burst configs are 'all' and 'first'" - - # self.burstMode.put(1) - # self.burstCount.put(count) - # self.burstDelay.put(delay) - # self.burstPeriod.put(period) - - # if config == "all": - # self.burstConfig.put(0) - # elif config == "first": - # self.burstConfig.put(1) - - # def burst_disable(self): - # """Disable burst mode""" - # self.burstMode.put(0) - - # def stage(self) -> list[object]: - # """ - # Method to stage the device. - - # Called in preparation for a scan. - - # Internal Calls: - # - scaninfo.load_scan_metadata : load scan metadata - # - custom_prepare.prepare_ddg : prepare DDG for measurement - # - is_ddg_okay : check if DDG is okay - - # Returns: - # list(object): list of objects that were staged - # """ - # if self._staged != Staged.no: - # return super().stage() - # self.stopped = False - # self.scaninfo.load_scan_metadata() - # self.custom_prepare.prepare_ddg() - # self.custom_prepare.is_ddg_okay() - # # At the moment needed bc signal might not be reliable, BEC too fast. - # # Consider removing this overhead in future! - # time.sleep(0.05) - # return super().stage() - - # def trigger(self) -> DeviceStatus: - # """ - # Method to trigger the acquisition. - - # Internal Call: - # - custom_prepare.on_trigger : execute BL specific action - # """ - # self.custom_prepare.on_trigger() - # return super().trigger() - - # def pre_scan(self) -> None: - # """ - # Method pre_scan gets executed directly before the scan - - # Internal Call: - # - custom_prepare.on_pre_scan : execute BL specific action - # """ - # self.custom_prepare.on_pre_scan() - - # def unstage(self) -> list[object]: - # """ - # Method unstage gets called at the end of a scan. - - # If scan (self.stopped is True) is stopped, returns directly. - # Otherwise, checks if the DDG finished acquisition - - # Internal Calls: - # - custom_prepare.check_scan_id : check if scan_id changed or detector stopped - # - custom_prepare.finished : check if device finished acquisition (succesfully) - # - is_ddg_okay : check if DDG is okay - - # Returns: - # list(object): list of objects that were unstaged - # """ - # self.custom_prepare.check_scan_id() - # if self.stopped is True: - # return super().unstage() - # self.custom_prepare.finished() - # self.custom_prepare.is_ddg_okay() - # self.stopped = False - # return super().unstage() - - # def stop(self, *, success=False) -> None: - # """ - # Method to stop the DDG - - # #TODO Check if the pulse generation can be interruppted - - # Internal Call: - # - custom_prepare.is_ddg_okay : check if DDG is okay - # """ - # self.custom_prepare.is_ddg_okay() - # super().stop(success=success) - # self.stopped = True diff --git a/ophyd_devices/interfaces/base_classes/psi_detector_base.py b/ophyd_devices/interfaces/base_classes/psi_detector_base.py index d59f2d2..9f64491 100644 --- a/ophyd_devices/interfaces/base_classes/psi_detector_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_detector_base.py @@ -1,459 +1,65 @@ +from bec_lib import messages +from bec_lib.endpoints import MessageEndpoints +from ophyd import Component as Cpt +from ophyd import Kind + from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase - - -class PSIDetectorBase(PSIDeviceBase): - """Deprecated, use PSIDeviceBase instead. Here for backwards compatibility.""" +from ophyd_devices.sim.sim_signals import SetableSignal class CustomDetectorMixin(CustomPrepare): """Deprecated, use CustomPrepare instead. Here for backwards compatibility.""" - -# """This module contains the base class for SLS detectors. We follow the approach to integrate -# PSI detectors into the BEC system based on this base class. The base class is used to implement -# certain methods that are expected by BEC, such as stage, unstage, trigger, stop, etc... -# We use composition with a custom prepare class to implement BL specific logic for the detector. -# The beamlines need to inherit from the CustomDetectorMixing for their mixin classes.""" - -# import os -# import threading -# import time -# import traceback - -# from bec_lib import messages -# from bec_lib.endpoints import MessageEndpoints -# from bec_lib.file_utils import FileWriter -# from bec_lib.logger import bec_logger -# from ophyd import Component, Device, DeviceStatus, Kind -# from ophyd.device import Staged - -# from ophyd_devices.sim.sim_signals import SetableSignal -# from ophyd_devices.utils import bec_utils -# from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin -# from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError - -# logger = bec_logger.logger - - -# class DetectorInitError(Exception): -# """Raised when initiation of the device class fails, -# due to missing device manager or not started in sim_mode.""" - - -# class CustomDetectorMixin: -# """ -# Mixin class for custom detector logic - -# This class is used to implement BL specific logic for the detector. -# It is used in the PSIDetectorBase class. - -# For the integration of a new detector, the following functions should -# help with integrating functionality, but additional ones can be added. - -# Check PSIDetectorBase for the functions that are called during relevant function calls of -# stage, unstage, trigger, stop and _init. -# """ - -# def __init__(self, *_args, parent: Device = None, **_kwargs) -> None: -# self.parent = parent - -# def on_init(self) -> None: -# """ -# Init sequence for the detector -# """ - -# def on_stage(self) -> None: -# """ -# Specify actions to be executed during stage in preparation for a scan. -# self.parent.scaninfo already has all current parameters for the upcoming scan. - -# In case the backend service is writing data on disk, this step should include publishing -# a file_event and file_message to BEC to inform the system where the data is written to. - -# IMPORTANT: -# It must be safe to assume that the device is ready for the scan -# to start immediately once this function is finished. -# """ - -# def on_unstage(self) -> None: -# """ -# Specify actions to be executed during unstage. - -# This step should include checking if the acqusition was successful, -# and publishing the file location and file event message, -# with flagged done to BEC. -# """ - -# def on_stop(self) -> None: -# """ -# Specify actions to be executed during stop. -# This must also set self.parent.stopped to True. - -# This step should include stopping the detector and backend service. -# """ - -# def on_trigger(self) -> None | DeviceStatus: -# """ -# Specify actions to be executed upon receiving trigger signal. -# Return a DeviceStatus object or None -# """ - -# def on_pre_scan(self) -> None: -# """ -# Specify actions to be executed right before a scan starts. - -# Only use if needed, and it is recommended to keep this function as short/fast as possible. -# """ - -# def on_complete(self) -> None | DeviceStatus: -# """ -# Specify actions to be executed when the scan is complete. - -# This can for instance be to check with the detector and backend if all data is written succsessfully. -# """ - -# # TODO make this a SUB event in the device manager -# def publish_file_location( -# self, -# done: bool, -# successful: bool, -# filepath: str = None, -# hinted_locations: dict = None, -# metadata: dict = None, -# ) -> None: -# """ -# Publish the filepath to REDIS. - -# We publish two events here: -# - file_event: event for the filewriter -# - public_file: event for any secondary service (e.g. radial integ code) - -# Args: -# done (bool): True if scan is finished -# successful (bool): True if scan was successful -# filepath (str): Optional, filepath to publish. If None, it will be taken from self.parent.filepath.get() -# hinted_locations (dict): Optional, dictionary with hinted locations; {dev_name : h5_entry} -# metadata (dict): additional metadata to publish -# """ -# if metadata is None: -# metadata = {} - -# if filepath is None: -# file_path = self.parent.filepath.get() - -# msg = messages.FileMessage( -# file_path=self.parent.filepath.get(), -# hinted_locations=hinted_locations, -# done=done, -# successful=successful, -# metadata=metadata, -# ) -# pipe = self.parent.connector.pipeline() -# self.parent.connector.set_and_publish( -# MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name), -# msg, -# pipe=pipe, -# ) -# self.parent.connector.set_and_publish( -# MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe -# ) -# pipe.execute() - -# def wait_for_signals( -# self, -# signal_conditions: list[tuple], -# timeout: float, -# check_stopped: bool = False, -# interval: float = 0.05, -# all_signals: bool = False, -# ) -> bool: -# """ -# Convenience wrapper to allow waiting for signals to reach a certain condition. -# For EPICs PVs, an example usage is pasted at the bottom. - -# Args: -# signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check -# timeout (float): timeout in seconds -# interval (float): interval in seconds -# all_signals (bool): True if all signals should be True, False if any signal should be True - -# Returns: -# bool: True if all signals are in the desired state, False if timeout is reached - -# >>> Example usage for EPICS PVs: -# >>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True) -# """ - -# timer = 0 -# while True: -# checks = [ -# get_current_state() == condition -# for get_current_state, condition in signal_conditions -# ] -# if check_stopped is True and self.parent.stopped is True: -# return False -# if (all_signals and all(checks)) or (not all_signals and any(checks)): -# return True -# if timer > timeout: -# return False -# time.sleep(interval) -# timer += interval - -# def wait_with_status( -# self, -# signal_conditions: list[tuple], -# timeout: float, -# check_stopped: bool = False, -# interval: float = 0.05, -# all_signals: bool = False, -# exception_on_timeout: Exception = None, -# ) -> DeviceStatus: -# """Utility function to wait for signals in a thread. -# Returns a DevicesStatus object that resolves either to set_finished or set_exception. -# The DeviceStatus is attached to the paent device, i.e. the detector object inheriting from PSIDetectorBase. - -# Usage: -# This function should be used to wait for signals to reach a certain condition, especially in the context of -# on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC. -# It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met, -# the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception. -# The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError. - -# Args: -# signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check -# timeout (float): timeout in seconds -# check_stopped (bool): True if stopped flag should be checked -# interval (float): interval in seconds -# all_signals (bool): True if all signals should be True, False if any signal should be True -# exception_on_timeout (Exception): Exception to raise on timeout - -# Returns: -# DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception -# """ -# if exception_on_timeout is None: -# exception_on_timeout = DeviceTimeoutError( -# f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}" -# ) - -# status = DeviceStatus(self.parent) - -# # utility function to wrap the wait_for_signals function -# def wait_for_signals_wrapper( -# status: DeviceStatus, -# signal_conditions: list[tuple], -# timeout: float, -# check_stopped: bool, -# interval: float, -# all_signals: bool, -# exception_on_timeout: Exception, -# ): -# """Convenient wrapper around wait_for_signals to set status based on the result. - -# Args: -# status (DeviceStatus): DeviceStatus object to be set -# signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check -# timeout (float): timeout in seconds -# check_stopped (bool): True if stopped flag should be checked -# interval (float): interval in seconds -# all_signals (bool): True if all signals should be True, False if any signal should be True -# exception_on_timeout (Exception): Exception to raise on timeout -# """ -# try: -# result = self.wait_for_signals( -# signal_conditions, timeout, check_stopped, interval, all_signals -# ) -# if result: -# status.set_finished() -# else: -# if self.parent.stopped: -# # INFO This will execute a callback to the parent device.stop() method -# status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped")) -# else: -# # INFO This will execute a callback to the parent device.stop() method -# status.set_exception(exc=exception_on_timeout) -# # pylint: disable=broad-except -# except Exception as exc: -# content = traceback.format_exc() -# logger.warning( -# f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}" -# ) -# # INFO This will execute a callback to the parent device.stop() method -# status.set_exception(exc=exc) - -# thread = threading.Thread( -# target=wait_for_signals_wrapper, -# args=( -# status, -# signal_conditions, -# timeout, -# check_stopped, -# interval, -# all_signals, -# exception_on_timeout, -# ), -# daemon=True, -# ) -# thread.start() -# return status - - -# class PSIDetectorBase(Device): -# """ -# Abstract base class for SLS detectors - -# Class attributes: -# custom_prepare_cls (object): class for custom prepare logic (BL specific) - -# Args: -# prefix (str): EPICS PV prefix for component (optional) -# name (str): name of the device, as will be reported via read() -# kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal -# omitted -> readout ignored for read 'ophydobj.read()' -# normal -> readout for read -# config -> config parameter for 'ophydobj.read_configuration()' -# hinted -> which attribute is readout for read -# parent (object): instance of the parent device -# device_manager (object): bec device manager -# **kwargs: keyword arguments -# """ - -# filepath = Component(SetableSignal, value="", kind=Kind.config) - -# custom_prepare_cls = CustomDetectorMixin - -# def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs): -# super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs) -# self.stopped = False -# self.name = name -# self.service_cfg = None -# self.scaninfo = None -# self.filewriter = None - -# if not issubclass(self.custom_prepare_cls, CustomDetectorMixin): -# raise DetectorInitError("Custom prepare class must be subclass of CustomDetectorMixin") -# self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs) - -# if device_manager: -# self._update_service_config() -# self.device_manager = device_manager -# else: -# self.device_manager = bec_utils.DMMock() -# base_path = kwargs["basepath"] if "basepath" in kwargs else "." -# self.service_cfg = {"base_path": os.path.abspath(base_path)} - -# self.connector = self.device_manager.connector -# self._update_scaninfo() -# self._update_filewriter() -# self._init() - -# def _update_filewriter(self) -> None: -# """Update filewriter with service config""" -# self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector) - -# def _update_scaninfo(self) -> None: -# """Update scaninfo from BecScaninfoMixing -# This depends on device manager and operation/sim_mode -# """ -# self.scaninfo = BecScaninfoMixin(self.device_manager) -# self.scaninfo.load_scan_metadata() - -# def _update_service_config(self) -> None: -# """Update service config from BEC service config - -# If bec services are not running and SERVICE_CONFIG is NONE, we fall back to the current directory. -# """ -# # pylint: disable=import-outside-toplevel -# from bec_lib.bec_service import SERVICE_CONFIG - -# if SERVICE_CONFIG: -# self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] -# return -# self.service_cfg = {"base_path": os.path.abspath(".")} - -# def check_scan_id(self) -> None: -# """Checks if scan_id has changed and set stopped flagged to True if it has.""" -# old_scan_id = self.scaninfo.scan_id -# self.scaninfo.load_scan_metadata() -# if self.scaninfo.scan_id != old_scan_id: -# self.stopped = True - -# def _init(self) -> None: -# """Initialize detector, filewriter and set default parameters""" -# self.custom_prepare.on_init() - -# def stage(self) -> list[object]: -# """ -# Stage device in preparation for a scan. -# First we check if the device is already staged. Stage is idempotent, -# if staged twice it should raise (we let ophyd.Device handle the raise here). -# We reset the stopped flag and get the scaninfo from BEC, before calling custom_prepare.on_stage. - -# Returns: -# list(object): list of objects that were staged - -# """ -# if self._staged != Staged.no: -# return super().stage() -# self.stopped = False -# self.scaninfo.load_scan_metadata() -# self.custom_prepare.on_stage() -# return super().stage() - -# def pre_scan(self) -> None: -# """Pre-scan logic. - -# This function will be called from BEC directly before the scan core starts, and should only implement -# time-critical actions. Therefore, it should also be kept as short/fast as possible. -# I.e. Arming a detector in case there is a risk of timing out. -# """ -# self.custom_prepare.on_pre_scan() - -# def trigger(self) -> DeviceStatus: -# """Trigger the detector, called from BEC.""" -# # pylint: disable=assignment-from-no-return -# status = self.custom_prepare.on_trigger() -# if isinstance(status, DeviceStatus): -# return status -# return super().trigger() - -# def complete(self) -> None: -# """Complete the acquisition, called from BEC. - -# This function is called after the scan is complete, just before unstage. -# We can check here with the data backend and detector if the acquisition successfully finished. - -# Actions are implemented in custom_prepare.on_complete since they are beamline specific. -# """ -# # pylint: disable=assignment-from-no-return -# status = self.custom_prepare.on_complete() -# if isinstance(status, DeviceStatus): -# return status -# status = DeviceStatus(self) -# status.set_finished() -# return status - -# def unstage(self) -> list[object]: -# """ -# Unstage device after a scan. - -# We first check if the scanID has changed, thus, the scan was unexpectedly interrupted but the device was not stopped. -# If that is the case, the stopped flag is set to True, which will immediately unstage the device. - -# Custom_prepare.on_unstage is called to allow for BL specific logic to be executed. - -# Returns: -# list(object): list of objects that were unstaged -# """ -# self.check_scan_id() -# self.custom_prepare.on_unstage() -# self.stopped = False -# return super().unstage() - -# def stop(self, *, success=False) -> None: -# """ -# Stop the scan, with camera and file writer - -# """ -# self.custom_prepare.on_stop() -# super().stop(success=success) -# self.stopped = True + def publish_file_location( + self, + done: bool, + successful: bool, + filepath: str = None, + hinted_locations: dict = None, + metadata: dict = None, + ) -> None: + """ + Publish the filepath to REDIS. + + We publish two events here: + - file_event: event for the filewriter + - public_file: event for any secondary service (e.g. radial integ code) + + Args: + done (bool): True if scan is finished + successful (bool): True if scan was successful + filepath (str): Optional, filepath to publish. If None, it will be taken from self.parent.filepath.get() + hinted_locations (dict): Optional, dictionary with hinted locations; {dev_name : h5_entry} + metadata (dict): additional metadata to publish + """ + if metadata is None: + metadata = {} + + if filepath is None: + file_path = self.parent.filepath.get() + + msg = messages.FileMessage( + file_path=self.parent.filepath.get(), + hinted_locations=hinted_locations, + done=done, + successful=successful, + metadata=metadata, + ) + pipe = self.parent.connector.pipeline() + self.parent.connector.set_and_publish( + MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name), + msg, + pipe=pipe, + ) + self.parent.connector.set_and_publish( + MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe + ) + pipe.execute() + + +class PSIDetectorBase(PSIDeviceBase): + """Deprecated, use PSIDeviceBase instead. Here for backwards compatibility.""" + + custom_prepare_cls = CustomDetectorMixin + + filepath = Cpt(SetableSignal, value="", kind=Kind.config) diff --git a/ophyd_devices/tests/utils.py b/ophyd_devices/tests/utils.py index 4285320..ff1502c 100644 --- a/ophyd_devices/tests/utils.py +++ b/ophyd_devices/tests/utils.py @@ -2,16 +2,29 @@ from unittest import mock def patch_dual_pvs(device): + """Patch dual PVs""" + patch_functions_required_for_connection(device) device.wait_for_connection(all_signals=True) for walk in device.walk_signals(): if not hasattr(walk.item, "_read_pv"): continue if not hasattr(walk.item, "_write_pv"): continue - if walk.item._read_pv.pvname.endswith("_RBV"): + if walk.item._read_pv.pvname != walk.item._write_pv.pvname: walk.item._read_pv = walk.item._write_pv +def patch_functions_required_for_connection(device): + """Patch functions required for connection. This will run the subs for all sub devices and devices. + This is needed to ensure that the wait_for_connection method of required for connections methods are properly patched. + """ + for event in device.event_types: + device._run_subs(sub_type=event, value=0, timestamp=0) + for name, dev in device.walk_subdevices(include_lazy=True): + for event in dev.event_types: + dev._run_subs(sub_type=event, value=0, timestamp=0) + + class SocketMock: """Socket Mock. Used for testing""" diff --git a/tests/test_delay_generator.py b/tests/test_delay_generator.py new file mode 100644 index 0000000..ca441b6 --- /dev/null +++ b/tests/test_delay_generator.py @@ -0,0 +1,75 @@ +import threading +from unittest import mock + +import ophyd +import pytest + +from ophyd_devices.devices.delay_generator_645 import ( + DelayGenerator, + DelayGeneratorError, + TriggerSource, +) +from ophyd_devices.tests.utils import ( + MockPV, + patch_dual_pvs, + patch_functions_required_for_connection, +) + + +@pytest.fixture(scope="function") +def mock_ddg(): + name = "ddg" + prefix = "X12SA-CPCL-DDG3:" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + ddg = DelayGenerator(name=name, prefix=prefix) + patch_functions_required_for_connection(ddg) + patch_dual_pvs(ddg) + yield ddg + + +def test_ddg_init(mock_ddg): + """This test the initialization of the DelayGenerator""" + assert mock_ddg.name == "ddg" + assert mock_ddg.prefix == "X12SA-CPCL-DDG3:" + + +def test_set_trigger(mock_ddg): + """This test the set_trigger method of the DelayGenerator""" + mock_ddg.set_trigger(TriggerSource.SINGLE_SHOT) + assert mock_ddg.source.get() == 5 + mock_ddg.set_trigger(TriggerSource.INTERNAL) + assert mock_ddg.source.get() == 0 + + +def test_burst_enable(mock_ddg): + """This test the burst_enable method of the DelayGenerator""" + count = 10 + delay = 0.1 + period = 0.2 + + mock_ddg.burst_enable(count=count, delay=delay, period=period) + assert mock_ddg.burstMode.get() == 1 + assert mock_ddg.burstCount.get() == count + assert mock_ddg.burstDelay.get() == delay + assert mock_ddg.burstPeriod.get() == period + assert mock_ddg.burstConfig.get() == 0 + with pytest.raises(DelayGeneratorError): + delay = -1 + mock_ddg.burst_enable(count=count, delay=delay, period=period) + with pytest.raises(DelayGeneratorError): + delay = 0 + period = 0 + mock_ddg.burst_enable(count=count, delay=delay, period=period) + + +def test_check_if_ddg_okay(mock_ddg): + """This test the is_ddg_okay method of the DelayGenerator""" + # Test for when the status is okay + mock_ddg.status._read_pv.mock_data = "STATUS OK" + assert mock_ddg.check_if_ddg_okay() is None + # Test for when the status is not okay + mock_ddg.status._read_pv.mock_data = "STATUS NOT OK" + with pytest.raises(DelayGeneratorError): + mock_ddg.check_if_ddg_okay()