fix: cleanup, add test for ddg base class

This commit is contained in:
appel_c 2024-12-05 08:51:31 +01:00
parent 13f456e78e
commit 7fe80c1608
6 changed files with 176 additions and 796 deletions

View File

@ -0,0 +1,12 @@
ddg:
description: DelayGenerator for detectors
deviceClass: ophyd_devices.devices.delay_generator_645.DelayGenerator
deviceConfig:
prefix: 'X12SA-CPCL-DDG3:'
deviceTags:
- cSAXS
- ddg_mcs
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: True

View File

@ -143,7 +143,13 @@ class DelayGenerator(Device):
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
"""
USER_ACCESS = ["set_channels", "burst_enable", "burst_disable", "set_trigger", "check_if_ddg_okay"]
USER_ACCESS = [
"set_channels",
"burst_enable",
"burst_disable",
"set_trigger",
"check_if_ddg_okay",
]
# PVs
trigger_burst_readout = Component(
@ -241,7 +247,7 @@ class DelayGenerator(Device):
LINE = 6
"""
value = int(source)
self.source.put(value)
self.source.set(value).wait()
@typechecked
def burst_enable(
@ -251,9 +257,12 @@ class DelayGenerator(Device):
Args:
count (int): Number of bursts >0
delay (float): Delay between bursts in seconds >=0
delay (float): Delay before bursts start in seconds >=0
period (float): Period of the bursts in seconds >0
config (str): Configuration of the burst. Default is "all"
config (str): Configuration of T0 duiring burst.
In addition, to simplify triggering of other instruments synchronously with the burst,
the T0 output may be configured to fire on the first delay cycle of the burst,
rather than for all delay cycles as is normally the case.
"""
# Check inputs first
@ -317,8 +326,9 @@ class DelayGenerator(Device):
status = self.status.read()[self.status.name]["value"]
if status != "STATUS OK" and not raise_on_error:
logger.warning(f"DDG returns {status}, trying to clear ERROR")
self.parent.clear_error()
# TODO check if clear_error is working
self.clear_error.put(1)
time.sleep(sleep_time)
self.is_ddg_okay(raise_on_error=True)
self.check_if_ddg_okay(raise_on_error=True)
elif status != "STATUS OK":
raise DelayGeneratorError(f"DDG failed to start with status: {status}")

View File

@ -1,336 +0,0 @@
import time
from typing import Any
from bec_lib import bec_logger
from ophyd import (
Component,
Device,
DeviceStatus,
EpicsSignal,
EpicsSignalRO,
Kind,
PVPositioner,
Signal,
)
from ophyd.device import Staged
from ophyd_devices.devices.delay_generator_645 import DelayGenerator
from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase
from ophyd_devices.utils import bec_utils
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
logger = bec_logger.logger
class DelayGeneratorNotOkay(Exception):
"""Custom exception class for DelayGenerator errors"""
# class DDGCustomMixin:
# """
# Mixin class for custom DelayGenerator logic within PSIDelayGeneratorBase.
# This class provides a parent class for implementation of BL specific logic of the device.
# It is also possible to pass implementing certain methods, e.g. finished or on_trigger,
# based on the setup and desired operation mode at the beamline.
# Args:
# parent (object): instance of PSIDelayGeneratorBase
# **kwargs: keyword arguments
# """
# def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
# self.parent = parent
# def initialize_default_parameter(self) -> None:
# """
# Method to initialize default parameters for DDG.
# Called upon initiating the base class.
# It should be used to set the DDG default parameters.
# These may include: amplitude, offsets, delays, etc.
# """
# def prepare_ddg(self) -> None:
# """
# Method to prepare the DDG for the upcoming scan.
# Called by the stage method of the base class.
# It should be used to set the DDG parameters for the upcoming scan.
# """
# def on_trigger(self) -> None:
# """Method executed upon trigger call in parent class"""
# def finished(self) -> None:
# """Method to check if DDG is finished with the scan"""
# def on_pre_scan(self) -> None:
# """
# Method executed upon pre_scan call in parent class.
# Covenient to implement time sensitive actions to be executed right before start of the scan.
# Example could be to open the shutter by triggering a pulse via pre_scan.
# """
# def check_scan_id(self) -> None:
# """Method to check if there is a new scan_id, called by stage."""
# def is_ddg_okay(self, raise_on_error=False) -> None:
# """
# Method to check if DDG is okay
# It checks the status PV of the DDG and tries to clear the error if it is not okay.
# It will rerun itself and raise DelayGeneratorNotOkay if DDG is still not okay.
# Args:
# raise_on_error (bool, optional): raise exception if DDG is not okay. Defaults to False.
# """
# status = self.parent.status.read()[self.parent.status.name]["value"]
# if status != "STATUS OK" and not raise_on_error:
# logger.warning(f"DDG returns {status}, trying to clear ERROR")
# self.parent.clear_error()
# time.sleep(1)
# self.is_ddg_okay(raise_on_error=True)
# elif status != "STATUS OK":
# raise DelayGeneratorNotOkay(f"DDG failed to start with status: {status}")
class PSIDelayGeneratorBase(PSIDeviceBase, DelayGenerator):
"""
Abstract base class for DelayGenerator DG645
This class implements a thin Ophyd wrapper around the Stanford Research DG645
digital delay generator.
The DG645 generates 8+1 signals: A, B, C, D, E, F, G, H and T0. Front panel outputs
T0, AB, CD, EF and GH are combinations of these signals. Back panel outputs are
directly routed signals. Signals are not independent.
Signal pairs, e.g. AB, CD, EF, GH, are implemented as DelayPair objects. They
have a TTL pulse width, delay and a reference signal to which they are being triggered.
In addition, the io layer allows setting amplitude, offset and polarity for each pair.
Detailed information can be found in the manual:
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
Class attributes:
custom_prepare_cls (object): class for custom prepare logic (BL specific)
Args:
prefix (str) : EPICS PV prefix for component (optional)
name (str) : name of the device, as will be reported via read()
kind (str) : member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
read_attrs (list) : sequence of attribute names to read
configuration_attrs (list) : sequence of attribute names via config_parameters
parent (object) : instance of the parent device
device_manager (object) : bec device manager
sim_mode (bool) : simulation mode, if True, no device manager is required
**kwargs : keyword arguments
attributes : lazy_wait_for_connection : bool
"""
# Custom_prepare_cls
# custom_prepare_cls = DDGCustomMixin
# SUB_PROGRESS = "progress"
# SUB_VALUE = "value"
# _default_sub = SUB_VALUE
# def __init__(
# self,
# prefix="",
# *,
# name,
# kind=None,
# read_attrs=None,
# configuration_attrs=None,
# parent=None,
# device_manager=None,
# sim_mode=False,
# **kwargs,
# ):
# super().__init__(
# prefix=prefix,
# name=name,
# kind=kind,
# read_attrs=read_attrs,
# configuration_attrs=configuration_attrs,
# parent=parent,
# **kwargs,
# )
# if device_manager is None and not sim_mode:
# raise DeviceInitError(
# f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add"
# " DeviceManager to initialization or init with sim_mode=True"
# )
# # Init variables
# self.sim_mode = sim_mode
# self.stopped = False
# self.name = name
# self.scaninfo = None
# self.timeout = 5
# self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"]
# self.all_delay_pairs = ["AB", "CD", "EF", "GH"]
# self.wait_for_connection(all_signals=True)
# # Init custom prepare class with BL specific logic
# self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
# if not sim_mode:
# self.device_manager = device_manager
# else:
# self.device_manager = bec_utils.DMMock()
# self.connector = self.device_manager.connector
# self._update_scaninfo()
# self._init()
# def _update_scaninfo(self) -> None:
# """
# Method to updated scaninfo from BEC.
# In sim_mode, scaninfo output is mocked - see bec_scaninfo_mixin.py
# """
# self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode)
# self.scaninfo.load_scan_metadata()
# def _init(self) -> None:
# """Method to initialize custom parameters of the DDG."""
# self.custom_prepare.initialize_default_parameter()
# self.custom_prepare.is_ddg_okay()
# def set_channels(self, signal: str, value: Any, channels: list = None) -> None:
# """
# Method to set signals on DelayPair and DelayStatic channels.
# Signals can be set on the DelayPair and DelayStatic channels. The method checks
# if the signal is available on the channel and sets it. It works for both, DelayPair
# and Delay Static although signals are hosted in different layers.
# Args:
# signal (str) : signal to set (width, delay, amplitude, offset, polarity)
# value (Any) : value to set
# channels (list, optional) : list of channels to set. Defaults to self.all_channels (T0,AB,CD,EF,GH)
# """
# if not channels:
# channels = self.all_channels
# for chname in channels:
# channel = getattr(self, chname, None)
# if not channel:
# continue
# if signal in channel.component_names:
# getattr(channel, signal).set(value)
# continue
# if "io" in channel.component_names and signal in channel.io.component_names:
# getattr(channel.io, signal).set(value)
# def set_trigger(self, trigger_source: TriggerSource) -> None:
# """Set trigger source on DDG - possible values defined in TriggerSource enum"""
# value = int(trigger_source)
# self.source.put(value)
# def burst_enable(self, count, delay, period, config="all"):
# """Enable the burst mode"""
# # Validate inputs
# count = int(count)
# assert count > 0, "Number of bursts must be positive"
# assert delay >= 0, "Burst delay must be larger than 0"
# assert period > 0, "Burst period must be positive"
# assert config in ["all", "first"], "Supported burst configs are 'all' and 'first'"
# self.burstMode.put(1)
# self.burstCount.put(count)
# self.burstDelay.put(delay)
# self.burstPeriod.put(period)
# if config == "all":
# self.burstConfig.put(0)
# elif config == "first":
# self.burstConfig.put(1)
# def burst_disable(self):
# """Disable burst mode"""
# self.burstMode.put(0)
# def stage(self) -> list[object]:
# """
# Method to stage the device.
# Called in preparation for a scan.
# Internal Calls:
# - scaninfo.load_scan_metadata : load scan metadata
# - custom_prepare.prepare_ddg : prepare DDG for measurement
# - is_ddg_okay : check if DDG is okay
# Returns:
# list(object): list of objects that were staged
# """
# if self._staged != Staged.no:
# return super().stage()
# self.stopped = False
# self.scaninfo.load_scan_metadata()
# self.custom_prepare.prepare_ddg()
# self.custom_prepare.is_ddg_okay()
# # At the moment needed bc signal might not be reliable, BEC too fast.
# # Consider removing this overhead in future!
# time.sleep(0.05)
# return super().stage()
# def trigger(self) -> DeviceStatus:
# """
# Method to trigger the acquisition.
# Internal Call:
# - custom_prepare.on_trigger : execute BL specific action
# """
# self.custom_prepare.on_trigger()
# return super().trigger()
# def pre_scan(self) -> None:
# """
# Method pre_scan gets executed directly before the scan
# Internal Call:
# - custom_prepare.on_pre_scan : execute BL specific action
# """
# self.custom_prepare.on_pre_scan()
# def unstage(self) -> list[object]:
# """
# Method unstage gets called at the end of a scan.
# If scan (self.stopped is True) is stopped, returns directly.
# Otherwise, checks if the DDG finished acquisition
# Internal Calls:
# - custom_prepare.check_scan_id : check if scan_id changed or detector stopped
# - custom_prepare.finished : check if device finished acquisition (succesfully)
# - is_ddg_okay : check if DDG is okay
# Returns:
# list(object): list of objects that were unstaged
# """
# self.custom_prepare.check_scan_id()
# if self.stopped is True:
# return super().unstage()
# self.custom_prepare.finished()
# self.custom_prepare.is_ddg_okay()
# self.stopped = False
# return super().unstage()
# def stop(self, *, success=False) -> None:
# """
# Method to stop the DDG
# #TODO Check if the pulse generation can be interruppted
# Internal Call:
# - custom_prepare.is_ddg_okay : check if DDG is okay
# """
# self.custom_prepare.is_ddg_okay()
# super().stop(success=success)
# self.stopped = True

View File

@ -1,459 +1,65 @@
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from ophyd import Component as Cpt
from ophyd import Kind
from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase
class PSIDetectorBase(PSIDeviceBase):
"""Deprecated, use PSIDeviceBase instead. Here for backwards compatibility."""
from ophyd_devices.sim.sim_signals import SetableSignal
class CustomDetectorMixin(CustomPrepare):
"""Deprecated, use CustomPrepare instead. Here for backwards compatibility."""
# """This module contains the base class for SLS detectors. We follow the approach to integrate
# PSI detectors into the BEC system based on this base class. The base class is used to implement
# certain methods that are expected by BEC, such as stage, unstage, trigger, stop, etc...
# We use composition with a custom prepare class to implement BL specific logic for the detector.
# The beamlines need to inherit from the CustomDetectorMixing for their mixin classes."""
# import os
# import threading
# import time
# import traceback
# from bec_lib import messages
# from bec_lib.endpoints import MessageEndpoints
# from bec_lib.file_utils import FileWriter
# from bec_lib.logger import bec_logger
# from ophyd import Component, Device, DeviceStatus, Kind
# from ophyd.device import Staged
# from ophyd_devices.sim.sim_signals import SetableSignal
# from ophyd_devices.utils import bec_utils
# from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
# from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
# logger = bec_logger.logger
# class DetectorInitError(Exception):
# """Raised when initiation of the device class fails,
# due to missing device manager or not started in sim_mode."""
# class CustomDetectorMixin:
# """
# Mixin class for custom detector logic
# This class is used to implement BL specific logic for the detector.
# It is used in the PSIDetectorBase class.
# For the integration of a new detector, the following functions should
# help with integrating functionality, but additional ones can be added.
# Check PSIDetectorBase for the functions that are called during relevant function calls of
# stage, unstage, trigger, stop and _init.
# """
# def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
# self.parent = parent
# def on_init(self) -> None:
# """
# Init sequence for the detector
# """
# def on_stage(self) -> None:
# """
# Specify actions to be executed during stage in preparation for a scan.
# self.parent.scaninfo already has all current parameters for the upcoming scan.
# In case the backend service is writing data on disk, this step should include publishing
# a file_event and file_message to BEC to inform the system where the data is written to.
# IMPORTANT:
# It must be safe to assume that the device is ready for the scan
# to start immediately once this function is finished.
# """
# def on_unstage(self) -> None:
# """
# Specify actions to be executed during unstage.
# This step should include checking if the acqusition was successful,
# and publishing the file location and file event message,
# with flagged done to BEC.
# """
# def on_stop(self) -> None:
# """
# Specify actions to be executed during stop.
# This must also set self.parent.stopped to True.
# This step should include stopping the detector and backend service.
# """
# def on_trigger(self) -> None | DeviceStatus:
# """
# Specify actions to be executed upon receiving trigger signal.
# Return a DeviceStatus object or None
# """
# def on_pre_scan(self) -> None:
# """
# Specify actions to be executed right before a scan starts.
# Only use if needed, and it is recommended to keep this function as short/fast as possible.
# """
# def on_complete(self) -> None | DeviceStatus:
# """
# Specify actions to be executed when the scan is complete.
# This can for instance be to check with the detector and backend if all data is written succsessfully.
# """
# # TODO make this a SUB event in the device manager
# def publish_file_location(
# self,
# done: bool,
# successful: bool,
# filepath: str = None,
# hinted_locations: dict = None,
# metadata: dict = None,
# ) -> None:
# """
# Publish the filepath to REDIS.
# We publish two events here:
# - file_event: event for the filewriter
# - public_file: event for any secondary service (e.g. radial integ code)
# Args:
# done (bool): True if scan is finished
# successful (bool): True if scan was successful
# filepath (str): Optional, filepath to publish. If None, it will be taken from self.parent.filepath.get()
# hinted_locations (dict): Optional, dictionary with hinted locations; {dev_name : h5_entry}
# metadata (dict): additional metadata to publish
# """
# if metadata is None:
# metadata = {}
# if filepath is None:
# file_path = self.parent.filepath.get()
# msg = messages.FileMessage(
# file_path=self.parent.filepath.get(),
# hinted_locations=hinted_locations,
# done=done,
# successful=successful,
# metadata=metadata,
# )
# pipe = self.parent.connector.pipeline()
# self.parent.connector.set_and_publish(
# MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
# msg,
# pipe=pipe,
# )
# self.parent.connector.set_and_publish(
# MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe
# )
# pipe.execute()
# def wait_for_signals(
# self,
# signal_conditions: list[tuple],
# timeout: float,
# check_stopped: bool = False,
# interval: float = 0.05,
# all_signals: bool = False,
# ) -> bool:
# """
# Convenience wrapper to allow waiting for signals to reach a certain condition.
# For EPICs PVs, an example usage is pasted at the bottom.
# Args:
# signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
# timeout (float): timeout in seconds
# interval (float): interval in seconds
# all_signals (bool): True if all signals should be True, False if any signal should be True
# Returns:
# bool: True if all signals are in the desired state, False if timeout is reached
# >>> Example usage for EPICS PVs:
# >>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True)
# """
# timer = 0
# while True:
# checks = [
# get_current_state() == condition
# for get_current_state, condition in signal_conditions
# ]
# if check_stopped is True and self.parent.stopped is True:
# return False
# if (all_signals and all(checks)) or (not all_signals and any(checks)):
# return True
# if timer > timeout:
# return False
# time.sleep(interval)
# timer += interval
# def wait_with_status(
# self,
# signal_conditions: list[tuple],
# timeout: float,
# check_stopped: bool = False,
# interval: float = 0.05,
# all_signals: bool = False,
# exception_on_timeout: Exception = None,
# ) -> DeviceStatus:
# """Utility function to wait for signals in a thread.
# Returns a DevicesStatus object that resolves either to set_finished or set_exception.
# The DeviceStatus is attached to the paent device, i.e. the detector object inheriting from PSIDetectorBase.
# Usage:
# This function should be used to wait for signals to reach a certain condition, especially in the context of
# on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC.
# It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met,
# the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception.
# The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError.
# Args:
# signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
# timeout (float): timeout in seconds
# check_stopped (bool): True if stopped flag should be checked
# interval (float): interval in seconds
# all_signals (bool): True if all signals should be True, False if any signal should be True
# exception_on_timeout (Exception): Exception to raise on timeout
# Returns:
# DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception
# """
# if exception_on_timeout is None:
# exception_on_timeout = DeviceTimeoutError(
# f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}"
# )
# status = DeviceStatus(self.parent)
# # utility function to wrap the wait_for_signals function
# def wait_for_signals_wrapper(
# status: DeviceStatus,
# signal_conditions: list[tuple],
# timeout: float,
# check_stopped: bool,
# interval: float,
# all_signals: bool,
# exception_on_timeout: Exception,
# ):
# """Convenient wrapper around wait_for_signals to set status based on the result.
# Args:
# status (DeviceStatus): DeviceStatus object to be set
# signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
# timeout (float): timeout in seconds
# check_stopped (bool): True if stopped flag should be checked
# interval (float): interval in seconds
# all_signals (bool): True if all signals should be True, False if any signal should be True
# exception_on_timeout (Exception): Exception to raise on timeout
# """
# try:
# result = self.wait_for_signals(
# signal_conditions, timeout, check_stopped, interval, all_signals
# )
# if result:
# status.set_finished()
# else:
# if self.parent.stopped:
# # INFO This will execute a callback to the parent device.stop() method
# status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped"))
# else:
# # INFO This will execute a callback to the parent device.stop() method
# status.set_exception(exc=exception_on_timeout)
# # pylint: disable=broad-except
# except Exception as exc:
# content = traceback.format_exc()
# logger.warning(
# f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}"
# )
# # INFO This will execute a callback to the parent device.stop() method
# status.set_exception(exc=exc)
# thread = threading.Thread(
# target=wait_for_signals_wrapper,
# args=(
# status,
# signal_conditions,
# timeout,
# check_stopped,
# interval,
# all_signals,
# exception_on_timeout,
# ),
# daemon=True,
# )
# thread.start()
# return status
# class PSIDetectorBase(Device):
# """
# Abstract base class for SLS detectors
# Class attributes:
# custom_prepare_cls (object): class for custom prepare logic (BL specific)
# Args:
# prefix (str): EPICS PV prefix for component (optional)
# name (str): name of the device, as will be reported via read()
# kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
# omitted -> readout ignored for read 'ophydobj.read()'
# normal -> readout for read
# config -> config parameter for 'ophydobj.read_configuration()'
# hinted -> which attribute is readout for read
# parent (object): instance of the parent device
# device_manager (object): bec device manager
# **kwargs: keyword arguments
# """
# filepath = Component(SetableSignal, value="", kind=Kind.config)
# custom_prepare_cls = CustomDetectorMixin
# def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
# super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs)
# self.stopped = False
# self.name = name
# self.service_cfg = None
# self.scaninfo = None
# self.filewriter = None
# if not issubclass(self.custom_prepare_cls, CustomDetectorMixin):
# raise DetectorInitError("Custom prepare class must be subclass of CustomDetectorMixin")
# self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
# if device_manager:
# self._update_service_config()
# self.device_manager = device_manager
# else:
# self.device_manager = bec_utils.DMMock()
# base_path = kwargs["basepath"] if "basepath" in kwargs else "."
# self.service_cfg = {"base_path": os.path.abspath(base_path)}
# self.connector = self.device_manager.connector
# self._update_scaninfo()
# self._update_filewriter()
# self._init()
# def _update_filewriter(self) -> None:
# """Update filewriter with service config"""
# self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector)
# def _update_scaninfo(self) -> None:
# """Update scaninfo from BecScaninfoMixing
# This depends on device manager and operation/sim_mode
# """
# self.scaninfo = BecScaninfoMixin(self.device_manager)
# self.scaninfo.load_scan_metadata()
# def _update_service_config(self) -> None:
# """Update service config from BEC service config
# If bec services are not running and SERVICE_CONFIG is NONE, we fall back to the current directory.
# """
# # pylint: disable=import-outside-toplevel
# from bec_lib.bec_service import SERVICE_CONFIG
# if SERVICE_CONFIG:
# self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
# return
# self.service_cfg = {"base_path": os.path.abspath(".")}
# def check_scan_id(self) -> None:
# """Checks if scan_id has changed and set stopped flagged to True if it has."""
# old_scan_id = self.scaninfo.scan_id
# self.scaninfo.load_scan_metadata()
# if self.scaninfo.scan_id != old_scan_id:
# self.stopped = True
# def _init(self) -> None:
# """Initialize detector, filewriter and set default parameters"""
# self.custom_prepare.on_init()
# def stage(self) -> list[object]:
# """
# Stage device in preparation for a scan.
# First we check if the device is already staged. Stage is idempotent,
# if staged twice it should raise (we let ophyd.Device handle the raise here).
# We reset the stopped flag and get the scaninfo from BEC, before calling custom_prepare.on_stage.
# Returns:
# list(object): list of objects that were staged
# """
# if self._staged != Staged.no:
# return super().stage()
# self.stopped = False
# self.scaninfo.load_scan_metadata()
# self.custom_prepare.on_stage()
# return super().stage()
# def pre_scan(self) -> None:
# """Pre-scan logic.
# This function will be called from BEC directly before the scan core starts, and should only implement
# time-critical actions. Therefore, it should also be kept as short/fast as possible.
# I.e. Arming a detector in case there is a risk of timing out.
# """
# self.custom_prepare.on_pre_scan()
# def trigger(self) -> DeviceStatus:
# """Trigger the detector, called from BEC."""
# # pylint: disable=assignment-from-no-return
# status = self.custom_prepare.on_trigger()
# if isinstance(status, DeviceStatus):
# return status
# return super().trigger()
# def complete(self) -> None:
# """Complete the acquisition, called from BEC.
# This function is called after the scan is complete, just before unstage.
# We can check here with the data backend and detector if the acquisition successfully finished.
# Actions are implemented in custom_prepare.on_complete since they are beamline specific.
# """
# # pylint: disable=assignment-from-no-return
# status = self.custom_prepare.on_complete()
# if isinstance(status, DeviceStatus):
# return status
# status = DeviceStatus(self)
# status.set_finished()
# return status
# def unstage(self) -> list[object]:
# """
# Unstage device after a scan.
# We first check if the scanID has changed, thus, the scan was unexpectedly interrupted but the device was not stopped.
# If that is the case, the stopped flag is set to True, which will immediately unstage the device.
# Custom_prepare.on_unstage is called to allow for BL specific logic to be executed.
# Returns:
# list(object): list of objects that were unstaged
# """
# self.check_scan_id()
# self.custom_prepare.on_unstage()
# self.stopped = False
# return super().unstage()
# def stop(self, *, success=False) -> None:
# """
# Stop the scan, with camera and file writer
# """
# self.custom_prepare.on_stop()
# super().stop(success=success)
# self.stopped = True
def publish_file_location(
self,
done: bool,
successful: bool,
filepath: str = None,
hinted_locations: dict = None,
metadata: dict = None,
) -> None:
"""
Publish the filepath to REDIS.
We publish two events here:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
Args:
done (bool): True if scan is finished
successful (bool): True if scan was successful
filepath (str): Optional, filepath to publish. If None, it will be taken from self.parent.filepath.get()
hinted_locations (dict): Optional, dictionary with hinted locations; {dev_name : h5_entry}
metadata (dict): additional metadata to publish
"""
if metadata is None:
metadata = {}
if filepath is None:
file_path = self.parent.filepath.get()
msg = messages.FileMessage(
file_path=self.parent.filepath.get(),
hinted_locations=hinted_locations,
done=done,
successful=successful,
metadata=metadata,
)
pipe = self.parent.connector.pipeline()
self.parent.connector.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
msg,
pipe=pipe,
)
self.parent.connector.set_and_publish(
MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe
)
pipe.execute()
class PSIDetectorBase(PSIDeviceBase):
"""Deprecated, use PSIDeviceBase instead. Here for backwards compatibility."""
custom_prepare_cls = CustomDetectorMixin
filepath = Cpt(SetableSignal, value="", kind=Kind.config)

View File

@ -2,16 +2,29 @@ from unittest import mock
def patch_dual_pvs(device):
"""Patch dual PVs"""
patch_functions_required_for_connection(device)
device.wait_for_connection(all_signals=True)
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
if walk.item._read_pv.pvname != walk.item._write_pv.pvname:
walk.item._read_pv = walk.item._write_pv
def patch_functions_required_for_connection(device):
"""Patch functions required for connection. This will run the subs for all sub devices and devices.
This is needed to ensure that the wait_for_connection method of required for connections methods are properly patched.
"""
for event in device.event_types:
device._run_subs(sub_type=event, value=0, timestamp=0)
for name, dev in device.walk_subdevices(include_lazy=True):
for event in dev.event_types:
dev._run_subs(sub_type=event, value=0, timestamp=0)
class SocketMock:
"""Socket Mock. Used for testing"""

View File

@ -0,0 +1,75 @@
import threading
from unittest import mock
import ophyd
import pytest
from ophyd_devices.devices.delay_generator_645 import (
DelayGenerator,
DelayGeneratorError,
TriggerSource,
)
from ophyd_devices.tests.utils import (
MockPV,
patch_dual_pvs,
patch_functions_required_for_connection,
)
@pytest.fixture(scope="function")
def mock_ddg():
name = "ddg"
prefix = "X12SA-CPCL-DDG3:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
ddg = DelayGenerator(name=name, prefix=prefix)
patch_functions_required_for_connection(ddg)
patch_dual_pvs(ddg)
yield ddg
def test_ddg_init(mock_ddg):
"""This test the initialization of the DelayGenerator"""
assert mock_ddg.name == "ddg"
assert mock_ddg.prefix == "X12SA-CPCL-DDG3:"
def test_set_trigger(mock_ddg):
"""This test the set_trigger method of the DelayGenerator"""
mock_ddg.set_trigger(TriggerSource.SINGLE_SHOT)
assert mock_ddg.source.get() == 5
mock_ddg.set_trigger(TriggerSource.INTERNAL)
assert mock_ddg.source.get() == 0
def test_burst_enable(mock_ddg):
"""This test the burst_enable method of the DelayGenerator"""
count = 10
delay = 0.1
period = 0.2
mock_ddg.burst_enable(count=count, delay=delay, period=period)
assert mock_ddg.burstMode.get() == 1
assert mock_ddg.burstCount.get() == count
assert mock_ddg.burstDelay.get() == delay
assert mock_ddg.burstPeriod.get() == period
assert mock_ddg.burstConfig.get() == 0
with pytest.raises(DelayGeneratorError):
delay = -1
mock_ddg.burst_enable(count=count, delay=delay, period=period)
with pytest.raises(DelayGeneratorError):
delay = 0
period = 0
mock_ddg.burst_enable(count=count, delay=delay, period=period)
def test_check_if_ddg_okay(mock_ddg):
"""This test the is_ddg_okay method of the DelayGenerator"""
# Test for when the status is okay
mock_ddg.status._read_pv.mock_data = "STATUS OK"
assert mock_ddg.check_if_ddg_okay() is None
# Test for when the status is not okay
mock_ddg.status._read_pv.mock_data = "STATUS NOT OK"
with pytest.raises(DelayGeneratorError):
mock_ddg.check_if_ddg_okay()