From 8f51789f5b0e0e62b949bb202a3b7c3159cd86e5 Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 4 Dec 2024 15:11:39 +0100 Subject: [PATCH] refactor: refactored delay generator DG645 --- ophyd_devices/devices/delay_generator_645.py | 324 +++++++++ .../base_classes/psi_delay_generator_base.py | 660 +++++++----------- 2 files changed, 567 insertions(+), 417 deletions(-) create mode 100644 ophyd_devices/devices/delay_generator_645.py diff --git a/ophyd_devices/devices/delay_generator_645.py b/ophyd_devices/devices/delay_generator_645.py new file mode 100644 index 0000000..fbe93ec --- /dev/null +++ b/ophyd_devices/devices/delay_generator_645.py @@ -0,0 +1,324 @@ +""" Module for integrating the Stanford Research DG645 Delay Generator""" + +import enum +import time +from typing import Any, Literal + +from bec_lib.logger import bec_logger +from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind, PVPositioner, Signal +from ophyd.pseudopos import ( + PseudoPositioner, + PseudoSingle, + pseudo_position_argument, + real_position_argument, +) +from typeguard import typechecked + +logger = bec_logger.logger + + +class DelayGeneratorError(Exception): + """Exception raised for errors.""" + + +class TriggerSource(enum.IntEnum): + """ + Class for trigger options of DG645 + + Used to set the trigger source of the DG645 by setting the value + e.g. source.put(TriggerSource.Internal) + Exp: + TriggerSource.Internal + """ + + INTERNAL = 0 + EXT_RISING_EDGE = 1 + EXT_FALLING_EDGE = 2 + SS_EXT_RISING_EDGE = 3 + SS_EXT_FALLING_EDGE = 4 + SINGLE_SHOT = 5 + LINE = 6 + + +class DelayStatic(Device): + """ + Static axis for the T0 output channel + + It allows setting the logic levels, but the timing is fixed. + The signal is high after receiving the trigger until the end + of the holdoff period. + """ + + # Other channel stuff + ttl_mode = Component(EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.config, auto_monitor=True) + nim_mode = Component(EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.config, auto_monitor=True) + polarity = Component( + EpicsSignal, + "OutputPolarityBI", + write_pv="OutputPolarityBO", + name="polarity", + kind=Kind.config, + auto_monitor=True, + ) + + amplitude = Component( + EpicsSignal, + "OutputAmpAI", + write_pv="OutputAmpAO", + name="amplitude", + kind=Kind.config, + auto_monitor=True, + ) + + offset = Component( + EpicsSignal, + "OutputOffsetAI", + write_pv="OutputOffsetAO", + name="offset", + kind=Kind.config, + auto_monitor=True, + ) + + +class DummyPositioner(PVPositioner): + """Dummy Positioner to set AO, AI and ReferenceMO.""" + + setpoint = Component( + EpicsSignal, "DelayAO", put_complete=True, kind=Kind.config, auto_monitor=True + ) + readback = Component(EpicsSignalRO, "DelayAI", kind=Kind.config, auto_monitor=True) + # TODO This currently means that a "move" is done immediately. Given that these are PVs, this may be the appropriate solution + done = Component(Signal, value=1) + reference = Component( + EpicsSignal, "ReferenceMO", put_complete=True, kind=Kind.config, auto_monitor=True + ) + + +class DelayPair(PseudoPositioner): + """ + Delay pair interface + + Virtual motor interface to a pair of signals (on the frontpanel - AB/CD/EF/GH). + It offers a simple delay and pulse width interface. + """ + + # The pseudo positioner axes + delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay") + width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth") + ch1 = Component(DummyPositioner, name="ch1") + ch2 = Component(DummyPositioner, name="ch2") + io = Component(DelayStatic, name="io") + + def __init__(self, *args, **kwargs): + # Change suffix names before connecting (a bit of dynamic connections) + self.__class__.__dict__["ch1"].suffix = kwargs["channel"][0] + self.__class__.__dict__["ch2"].suffix = kwargs["channel"][1] + self.__class__.__dict__["io"].suffix = kwargs["channel"] + + del kwargs["channel"] + # Call parent to start the connections + super().__init__(*args, **kwargs) + + @pseudo_position_argument + def forward(self, pseudo_pos): + """Run a forward (pseudo -> real) calculation""" + return self.RealPosition(ch1=pseudo_pos.delay, ch2=pseudo_pos.delay + pseudo_pos.width) + + @real_position_argument + def inverse(self, real_pos): + """Run an inverse (real -> pseudo) calculation""" + return self.PseudoPosition(delay=real_pos.ch1, width=real_pos.ch2 - real_pos.ch1) + + +class DelayGenerator(Device): + """Delay Generator Stanford Research DG645. This implements an interface for the DG645 delay generator. + + The DG645 has 8 channels, each with a delay and pulse width. The channels are implemented as DelayPair objects (AB etc.). + + Signal pairs, e.g. AB, CD, EF, GH, are implemented as DelayPair objects. They + have a TTL pulse width, delay and a reference signal to which they are being triggered. + In addition, the io layer allows setting amplitude, offset and polarity for each pair. + + Detailed information can be found in the manual: + https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf + """ + + USER_ACCESS = ["set_channels", "burst_enable", "burst_disable", "set_trigger", "is_ddg_okay"] + + # PVs + trigger_burst_readout = Component( + EpicsSignal, "EventStatusLI.PROC", name="trigger_burst_readout" + ) + burst_cycle_finished = Component(EpicsSignalRO, "EventStatusMBBID.B3", name="read_burst_state") + delay_finished = Component(EpicsSignalRO, "EventStatusMBBID.B2", name="delay_finished") + status = Component(EpicsSignalRO, "StatusSI", name="status") + clear_error = Component(EpicsSignal, "StatusClearBO", name="clear_error") + + # Front Panel + channelT0 = Component(DelayStatic, "T0", name="T0") + channelAB = Component(DelayPair, "", name="AB", channel="AB") + channelCD = Component(DelayPair, "", name="CD", channel="CD") + channelEF = Component(DelayPair, "", name="EF", channel="EF") + channelGH = Component(DelayPair, "", name="GH", channel="GH") + + holdoff = Component( + EpicsSignal, + "TriggerHoldoffAI", + write_pv="TriggerHoldoffAO", + name="trigger_holdoff", + kind=Kind.config, + ) + inhibit = Component( + EpicsSignal, + "TriggerInhibitMI", + write_pv="TriggerInhibitMO", + name="trigger_inhibit", + kind=Kind.config, + ) + source = Component( + EpicsSignal, + "TriggerSourceMI", + write_pv="TriggerSourceMO", + name="trigger_source", + kind=Kind.config, + ) + level = Component( + EpicsSignal, + "TriggerLevelAI", + write_pv="TriggerLevelAO", + name="trigger_level", + kind=Kind.config, + ) + rate = Component( + EpicsSignal, + "TriggerRateAI", + write_pv="TriggerRateAO", + name="trigger_rate", + kind=Kind.config, + ) + trigger_shot = Component(EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind="config") + burstMode = Component( + EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config + ) + burstConfig = Component( + EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name="burstconfig", kind=Kind.config + ) + burstCount = Component( + EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burstcount", kind=Kind.config + ) + burstDelay = Component( + EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burstdelay", kind=Kind.config + ) + burstPeriod = Component( + EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config + ) + + def __init__(self, name: str, prefix: str = "", kind: Kind = None, parent=None, **kwargs): + """Initialize the DG645 device + + Args: + name (str): Name of the device + prefix (str): PV prefix + kind (Kind): Kind of the device + parent: Parent device + """ + super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs) + + self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"] + self.all_delay_pairs = ["AB", "CD", "EF", "GH"] + + def set_trigger(self, source: TriggerSource | int) -> None: + """Set the trigger source of the DG645 + + Args: + source (TriggerSource | int): The trigger source + INTERNAL = 0 + EXT_RISING_EDGE = 1 + EXT_FALLING_EDGE = 2 + SS_EXT_RISING_EDGE = 3 + SS_EXT_FALLING_EDGE = 4 + SINGLE_SHOT = 5 + LINE = 6 + """ + value = int(source) + self.source.put(value) + + @typechecked + def burst_enable( + self, count: int, delay: float, period: float, config: Literal["all", "first"] = "all" + ) -> None: + """Enable burst mode with valid parameters. + + Args: + count (int): Number of bursts >0 + delay (float): Delay between bursts in seconds >=0 + period (float): Period of the bursts in seconds >0 + config (str): Configuration of the burst. Default is "all" + """ + + # Check inputs first + if count <= 0: + raise DelayGeneratorError(f"Count must be >0, provided: {count}") + if delay < 0: + raise DelayGeneratorError(f"Delay must be >=0, provided: {delay}") + if period <= 0: + raise DelayGeneratorError(f"Period must be >0, provided: {period}") + + self.burstMode.put(1) + self.burstCount.put(count) + self.burstDelay.put(delay) + self.burstPeriod.put(period) + + if config == "all": + self.burstConfig.put(0) + elif config == "first": + self.burstConfig.put(1) + + def burst_disable(self) -> None: + """Disable burst mode""" + self.burstMode.put(0) + + def set_channels(self, signal: str, value: Any, channels: list = None) -> None: + """ + Utility method to set signals (width, delay, amplitude, offset, polarity) + on single of multiple channels T0, AB, CD, EF, GH. + + + Args: + signal (str) : signal to set (width, delay, amplitude, offset, polarity) + value (Any) : value to set + channels (list, optional) : list of channels to set. Defaults to self.all_channels + ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"] + """ + if not channels: + channels = self.all_channels + for chname in channels: + channel = getattr(self, chname, None) + if not channel: + continue + if signal in channel.component_names: + getattr(channel, signal).set(value) + continue + if "io" in channel.component_names and signal in channel.io.component_names: + getattr(channel.io, signal).set(value) + + def is_ddg_okay(self, raise_on_error: bool = False) -> None: + """ + Utility method to check if the DDG is okay. + + If raise_on_error is False, the method will: + (1) check the status of the DDG, + (2) if the status is not okay, it will try to clear the error and wait 0.5s before checking again. + + Args: + raise_on_error (bool, optional): raise exception if DDG is not okay. Defaults to False. + """ + sleep_time = 0.5 + status = self.status.read()[self.parent.status.name]["value"] + if status != "STATUS OK" and not raise_on_error: + logger.warning(f"DDG returns {status}, trying to clear ERROR") + self.parent.clear_error() + time.sleep(sleep_time) + self.is_ddg_okay(raise_on_error=True) + elif status != "STATUS OK": + raise DelayGeneratorError(f"DDG failed to start with status: {status}") diff --git a/ophyd_devices/interfaces/base_classes/psi_delay_generator_base.py b/ophyd_devices/interfaces/base_classes/psi_delay_generator_base.py index 03c76ea..713bb34 100644 --- a/ophyd_devices/interfaces/base_classes/psi_delay_generator_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_delay_generator_base.py @@ -1,4 +1,3 @@ -import enum import time from typing import Any @@ -14,193 +13,90 @@ from ophyd import ( Signal, ) from ophyd.device import Staged -from ophyd.pseudopos import ( - PseudoPositioner, - PseudoSingle, - pseudo_position_argument, - real_position_argument, -) +from ophyd_devices.devices.delay_generator_645 import DelayGenerator +from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase from ophyd_devices.utils import bec_utils from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin logger = bec_logger.logger -class DelayGeneratorError(Exception): - """Exception raised for errors.""" +class DelayGeneratorNotOkay(Exception): + """Custom exception class for DelayGenerator errors""" -class DeviceInitError(DelayGeneratorError): - """Error upon failed initialization, invoked by missing device manager or device not started in sim_mode.""" +# class DDGCustomMixin: +# """ +# Mixin class for custom DelayGenerator logic within PSIDelayGeneratorBase. + +# This class provides a parent class for implementation of BL specific logic of the device. +# It is also possible to pass implementing certain methods, e.g. finished or on_trigger, +# based on the setup and desired operation mode at the beamline. + +# Args: +# parent (object): instance of PSIDelayGeneratorBase +# **kwargs: keyword arguments +# """ + +# def __init__(self, *_args, parent: Device = None, **_kwargs) -> None: +# self.parent = parent + +# def initialize_default_parameter(self) -> None: +# """ +# Method to initialize default parameters for DDG. + +# Called upon initiating the base class. +# It should be used to set the DDG default parameters. +# These may include: amplitude, offsets, delays, etc. +# """ + +# def prepare_ddg(self) -> None: +# """ +# Method to prepare the DDG for the upcoming scan. + +# Called by the stage method of the base class. +# It should be used to set the DDG parameters for the upcoming scan. +# """ + +# def on_trigger(self) -> None: +# """Method executed upon trigger call in parent class""" + +# def finished(self) -> None: +# """Method to check if DDG is finished with the scan""" + +# def on_pre_scan(self) -> None: +# """ +# Method executed upon pre_scan call in parent class. + +# Covenient to implement time sensitive actions to be executed right before start of the scan. +# Example could be to open the shutter by triggering a pulse via pre_scan. +# """ + +# def check_scan_id(self) -> None: +# """Method to check if there is a new scan_id, called by stage.""" + +# def is_ddg_okay(self, raise_on_error=False) -> None: +# """ +# Method to check if DDG is okay + +# It checks the status PV of the DDG and tries to clear the error if it is not okay. +# It will rerun itself and raise DelayGeneratorNotOkay if DDG is still not okay. + +# Args: +# raise_on_error (bool, optional): raise exception if DDG is not okay. Defaults to False. +# """ +# status = self.parent.status.read()[self.parent.status.name]["value"] +# if status != "STATUS OK" and not raise_on_error: +# logger.warning(f"DDG returns {status}, trying to clear ERROR") +# self.parent.clear_error() +# time.sleep(1) +# self.is_ddg_okay(raise_on_error=True) +# elif status != "STATUS OK": +# raise DelayGeneratorNotOkay(f"DDG failed to start with status: {status}") -class DelayGeneratorNotOkay(DelayGeneratorError): - """Error when DDG is not okay""" - - -class TriggerSource(enum.IntEnum): - """ - Class for trigger options of DG645 - - Used to set the trigger source of the DG645 by setting the value - e.g. source.put(TriggerSource.Internal) - Exp: - TriggerSource.Internal - """ - - INTERNAL = 0 - EXT_RISING_EDGE = 1 - EXT_FALLING_EDGE = 2 - SS_EXT_RISING_EDGE = 3 - SS_EXT_FALLING_EDGE = 4 - SINGLE_SHOT = 5 - LINE = 6 - - -class DelayStatic(Device): - """ - Static axis for the T0 output channel - - It allows setting the logic levels, but the timing is fixed. - The signal is high after receiving the trigger until the end - of the holdoff period. - """ - - # Other channel stuff - ttl_mode = Component(EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.config) - nim_mode = Component(EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.config) - polarity = Component( - EpicsSignal, - "OutputPolarityBI", - write_pv="OutputPolarityBO", - name="polarity", - kind=Kind.config, - ) - amplitude = Component( - EpicsSignal, "OutputAmpAI", write_pv="OutputAmpAO", name="amplitude", kind=Kind.config - ) - offset = Component( - EpicsSignal, "OutputOffsetAI", write_pv="OutputOffsetAO", name="offset", kind=Kind.config - ) - - -class DummyPositioner(PVPositioner): - """Dummy Positioner to set AO, AI and ReferenceMO.""" - - setpoint = Component(EpicsSignal, "DelayAO", put_complete=True, kind=Kind.config) - readback = Component(EpicsSignalRO, "DelayAI", kind=Kind.config) - done = Component(Signal, value=1) - reference = Component(EpicsSignal, "ReferenceMO", put_complete=True, kind=Kind.config) - - -class DelayPair(PseudoPositioner): - """ - Delay pair interface - - Virtual motor interface to a pair of signals (on the frontpanel - AB/CD/EF/GH). - It offers a simple delay and pulse width interface. - """ - - # The pseudo positioner axes - delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay") - width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth") - ch1 = Component(DummyPositioner, name="ch1") - ch2 = Component(DummyPositioner, name="ch2") - io = Component(DelayStatic, name="io") - - def __init__(self, *args, **kwargs): - # Change suffix names before connecting (a bit of dynamic connections) - self.__class__.__dict__["ch1"].suffix = kwargs["channel"][0] - self.__class__.__dict__["ch2"].suffix = kwargs["channel"][1] - self.__class__.__dict__["io"].suffix = kwargs["channel"] - - del kwargs["channel"] - # Call parent to start the connections - super().__init__(*args, **kwargs) - - @pseudo_position_argument - def forward(self, pseudo_pos): - """Run a forward (pseudo -> real) calculation""" - return self.RealPosition(ch1=pseudo_pos.delay, ch2=pseudo_pos.delay + pseudo_pos.width) - - @real_position_argument - def inverse(self, real_pos): - """Run an inverse (real -> pseudo) calculation""" - return self.PseudoPosition(delay=real_pos.ch1, width=real_pos.ch2 - real_pos.ch1) - - -class DDGCustomMixin: - """ - Mixin class for custom DelayGenerator logic within PSIDelayGeneratorBase. - - This class provides a parent class for implementation of BL specific logic of the device. - It is also possible to pass implementing certain methods, e.g. finished or on_trigger, - based on the setup and desired operation mode at the beamline. - - Args: - parent (object): instance of PSIDelayGeneratorBase - **kwargs: keyword arguments - """ - - def __init__(self, *_args, parent: Device = None, **_kwargs) -> None: - self.parent = parent - - def initialize_default_parameter(self) -> None: - """ - Method to initialize default parameters for DDG. - - Called upon initiating the base class. - It should be used to set the DDG default parameters. - These may include: amplitude, offsets, delays, etc. - """ - - def prepare_ddg(self) -> None: - """ - Method to prepare the DDG for the upcoming scan. - - Called by the stage method of the base class. - It should be used to set the DDG parameters for the upcoming scan. - """ - - def on_trigger(self) -> None: - """Method executed upon trigger call in parent class""" - - def finished(self) -> None: - """Method to check if DDG is finished with the scan""" - - def on_pre_scan(self) -> None: - """ - Method executed upon pre_scan call in parent class. - - Covenient to implement time sensitive actions to be executed right before start of the scan. - Example could be to open the shutter by triggering a pulse via pre_scan. - """ - - def check_scan_id(self) -> None: - """Method to check if there is a new scan_id, called by stage.""" - - def is_ddg_okay(self, raise_on_error=False) -> None: - """ - Method to check if DDG is okay - - It checks the status PV of the DDG and tries to clear the error if it is not okay. - It will rerun itself and raise DelayGeneratorNotOkay if DDG is still not okay. - - Args: - raise_on_error (bool, optional): raise exception if DDG is not okay. Defaults to False. - """ - status = self.parent.status.read()[self.parent.status.name]["value"] - if status != "STATUS OK" and not raise_on_error: - logger.warning(f"DDG returns {status}, trying to clear ERROR") - self.parent.clear_error() - time.sleep(1) - self.is_ddg_okay(raise_on_error=True) - elif status != "STATUS OK": - raise DelayGeneratorNotOkay(f"DDG failed to start with status: {status}") - - -class PSIDelayGeneratorBase(Device): +class PSIDelayGeneratorBase(PSIDeviceBase, DelayGenerator): """ Abstract base class for DelayGenerator DG645 @@ -239,272 +135,202 @@ class PSIDelayGeneratorBase(Device): """ # Custom_prepare_cls - custom_prepare_cls = DDGCustomMixin + # custom_prepare_cls = DDGCustomMixin - SUB_PROGRESS = "progress" - SUB_VALUE = "value" - _default_sub = SUB_VALUE + # SUB_PROGRESS = "progress" + # SUB_VALUE = "value" + # _default_sub = SUB_VALUE - USER_ACCESS = ["set_channels", "_set_trigger", "burst_enable", "burst_disable", "reload_config"] + # def __init__( + # self, + # prefix="", + # *, + # name, + # kind=None, + # read_attrs=None, + # configuration_attrs=None, + # parent=None, + # device_manager=None, + # sim_mode=False, + # **kwargs, + # ): + # super().__init__( + # prefix=prefix, + # name=name, + # kind=kind, + # read_attrs=read_attrs, + # configuration_attrs=configuration_attrs, + # parent=parent, + # **kwargs, + # ) + # if device_manager is None and not sim_mode: + # raise DeviceInitError( + # f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add" + # " DeviceManager to initialization or init with sim_mode=True" + # ) + # # Init variables + # self.sim_mode = sim_mode + # self.stopped = False + # self.name = name + # self.scaninfo = None + # self.timeout = 5 + # self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"] + # self.all_delay_pairs = ["AB", "CD", "EF", "GH"] + # self.wait_for_connection(all_signals=True) - # Assign PVs from DDG645 - trigger_burst_readout = Component( - EpicsSignal, "EventStatusLI.PROC", name="trigger_burst_readout" - ) - burst_cycle_finished = Component(EpicsSignalRO, "EventStatusMBBID.B3", name="read_burst_state") - delay_finished = Component(EpicsSignalRO, "EventStatusMBBID.B2", name="delay_finished") - status = Component(EpicsSignalRO, "StatusSI", name="status") - clear_error = Component(EpicsSignal, "StatusClearBO", name="clear_error") + # # Init custom prepare class with BL specific logic + # self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs) + # if not sim_mode: + # self.device_manager = device_manager + # else: + # self.device_manager = bec_utils.DMMock() + # self.connector = self.device_manager.connector + # self._update_scaninfo() + # self._init() - # Front Panel - channelT0 = Component(DelayStatic, "T0", name="T0") - channelAB = Component(DelayPair, "", name="AB", channel="AB") - channelCD = Component(DelayPair, "", name="CD", channel="CD") - channelEF = Component(DelayPair, "", name="EF", channel="EF") - channelGH = Component(DelayPair, "", name="GH", channel="GH") + # def _update_scaninfo(self) -> None: + # """ + # Method to updated scaninfo from BEC. - holdoff = Component( - EpicsSignal, - "TriggerHoldoffAI", - write_pv="TriggerHoldoffAO", - name="trigger_holdoff", - kind=Kind.config, - ) - inhibit = Component( - EpicsSignal, - "TriggerInhibitMI", - write_pv="TriggerInhibitMO", - name="trigger_inhibit", - kind=Kind.config, - ) - source = Component( - EpicsSignal, - "TriggerSourceMI", - write_pv="TriggerSourceMO", - name="trigger_source", - kind=Kind.config, - ) - level = Component( - EpicsSignal, - "TriggerLevelAI", - write_pv="TriggerLevelAO", - name="trigger_level", - kind=Kind.config, - ) - rate = Component( - EpicsSignal, - "TriggerRateAI", - write_pv="TriggerRateAO", - name="trigger_rate", - kind=Kind.config, - ) - trigger_shot = Component(EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind="config") - burstMode = Component( - EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config - ) - burstConfig = Component( - EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name="burstconfig", kind=Kind.config - ) - burstCount = Component( - EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burstcount", kind=Kind.config - ) - burstDelay = Component( - EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burstdelay", kind=Kind.config - ) - burstPeriod = Component( - EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config - ) + # In sim_mode, scaninfo output is mocked - see bec_scaninfo_mixin.py + # """ + # self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode) + # self.scaninfo.load_scan_metadata() - def __init__( - self, - prefix="", - *, - name, - kind=None, - read_attrs=None, - configuration_attrs=None, - parent=None, - device_manager=None, - sim_mode=False, - **kwargs, - ): - super().__init__( - prefix=prefix, - name=name, - kind=kind, - read_attrs=read_attrs, - configuration_attrs=configuration_attrs, - parent=parent, - **kwargs, - ) - if device_manager is None and not sim_mode: - raise DeviceInitError( - f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add" - " DeviceManager to initialization or init with sim_mode=True" - ) - # Init variables - self.sim_mode = sim_mode - self.stopped = False - self.name = name - self.scaninfo = None - self.timeout = 5 - self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"] - self.all_delay_pairs = ["AB", "CD", "EF", "GH"] - self.wait_for_connection(all_signals=True) + # def _init(self) -> None: + # """Method to initialize custom parameters of the DDG.""" + # self.custom_prepare.initialize_default_parameter() + # self.custom_prepare.is_ddg_okay() - # Init custom prepare class with BL specific logic - self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs) - if not sim_mode: - self.device_manager = device_manager - else: - self.device_manager = bec_utils.DMMock() - self.connector = self.device_manager.connector - self._update_scaninfo() - self._init() + # def set_channels(self, signal: str, value: Any, channels: list = None) -> None: + # """ + # Method to set signals on DelayPair and DelayStatic channels. - def _update_scaninfo(self) -> None: - """ - Method to updated scaninfo from BEC. + # Signals can be set on the DelayPair and DelayStatic channels. The method checks + # if the signal is available on the channel and sets it. It works for both, DelayPair + # and Delay Static although signals are hosted in different layers. - In sim_mode, scaninfo output is mocked - see bec_scaninfo_mixin.py - """ - self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode) - self.scaninfo.load_scan_metadata() + # Args: + # signal (str) : signal to set (width, delay, amplitude, offset, polarity) + # value (Any) : value to set + # channels (list, optional) : list of channels to set. Defaults to self.all_channels (T0,AB,CD,EF,GH) + # """ + # if not channels: + # channels = self.all_channels + # for chname in channels: + # channel = getattr(self, chname, None) + # if not channel: + # continue + # if signal in channel.component_names: + # getattr(channel, signal).set(value) + # continue + # if "io" in channel.component_names and signal in channel.io.component_names: + # getattr(channel.io, signal).set(value) - def _init(self) -> None: - """Method to initialize custom parameters of the DDG.""" - self.custom_prepare.initialize_default_parameter() - self.custom_prepare.is_ddg_okay() + # def set_trigger(self, trigger_source: TriggerSource) -> None: + # """Set trigger source on DDG - possible values defined in TriggerSource enum""" + # value = int(trigger_source) + # self.source.put(value) - def set_channels(self, signal: str, value: Any, channels: list = None) -> None: - """ - Method to set signals on DelayPair and DelayStatic channels. + # def burst_enable(self, count, delay, period, config="all"): + # """Enable the burst mode""" + # # Validate inputs + # count = int(count) + # assert count > 0, "Number of bursts must be positive" + # assert delay >= 0, "Burst delay must be larger than 0" + # assert period > 0, "Burst period must be positive" + # assert config in ["all", "first"], "Supported burst configs are 'all' and 'first'" - Signals can be set on the DelayPair and DelayStatic channels. The method checks - if the signal is available on the channel and sets it. It works for both, DelayPair - and Delay Static although signals are hosted in different layers. + # self.burstMode.put(1) + # self.burstCount.put(count) + # self.burstDelay.put(delay) + # self.burstPeriod.put(period) - Args: - signal (str) : signal to set (width, delay, amplitude, offset, polarity) - value (Any) : value to set - channels (list, optional) : list of channels to set. Defaults to self.all_channels (T0,AB,CD,EF,GH) - """ - if not channels: - channels = self.all_channels - for chname in channels: - channel = getattr(self, chname, None) - if not channel: - continue - if signal in channel.component_names: - getattr(channel, signal).set(value) - continue - if "io" in channel.component_names and signal in channel.io.component_names: - getattr(channel.io, signal).set(value) + # if config == "all": + # self.burstConfig.put(0) + # elif config == "first": + # self.burstConfig.put(1) - def set_trigger(self, trigger_source: TriggerSource) -> None: - """Set trigger source on DDG - possible values defined in TriggerSource enum""" - value = int(trigger_source) - self.source.put(value) + # def burst_disable(self): + # """Disable burst mode""" + # self.burstMode.put(0) - def burst_enable(self, count, delay, period, config="all"): - """Enable the burst mode""" - # Validate inputs - count = int(count) - assert count > 0, "Number of bursts must be positive" - assert delay >= 0, "Burst delay must be larger than 0" - assert period > 0, "Burst period must be positive" - assert config in ["all", "first"], "Supported burst configs are 'all' and 'first'" + # def stage(self) -> list[object]: + # """ + # Method to stage the device. - self.burstMode.put(1) - self.burstCount.put(count) - self.burstDelay.put(delay) - self.burstPeriod.put(period) + # Called in preparation for a scan. - if config == "all": - self.burstConfig.put(0) - elif config == "first": - self.burstConfig.put(1) + # Internal Calls: + # - scaninfo.load_scan_metadata : load scan metadata + # - custom_prepare.prepare_ddg : prepare DDG for measurement + # - is_ddg_okay : check if DDG is okay - def burst_disable(self): - """Disable burst mode""" - self.burstMode.put(0) + # Returns: + # list(object): list of objects that were staged + # """ + # if self._staged != Staged.no: + # return super().stage() + # self.stopped = False + # self.scaninfo.load_scan_metadata() + # self.custom_prepare.prepare_ddg() + # self.custom_prepare.is_ddg_okay() + # # At the moment needed bc signal might not be reliable, BEC too fast. + # # Consider removing this overhead in future! + # time.sleep(0.05) + # return super().stage() - def stage(self) -> list[object]: - """ - Method to stage the device. + # def trigger(self) -> DeviceStatus: + # """ + # Method to trigger the acquisition. - Called in preparation for a scan. + # Internal Call: + # - custom_prepare.on_trigger : execute BL specific action + # """ + # self.custom_prepare.on_trigger() + # return super().trigger() - Internal Calls: - - scaninfo.load_scan_metadata : load scan metadata - - custom_prepare.prepare_ddg : prepare DDG for measurement - - is_ddg_okay : check if DDG is okay + # def pre_scan(self) -> None: + # """ + # Method pre_scan gets executed directly before the scan - Returns: - list(object): list of objects that were staged - """ - if self._staged != Staged.no: - return super().stage() - self.stopped = False - self.scaninfo.load_scan_metadata() - self.custom_prepare.prepare_ddg() - self.custom_prepare.is_ddg_okay() - # At the moment needed bc signal might not be reliable, BEC too fast. - # Consider removing this overhead in future! - time.sleep(0.05) - return super().stage() + # Internal Call: + # - custom_prepare.on_pre_scan : execute BL specific action + # """ + # self.custom_prepare.on_pre_scan() - def trigger(self) -> DeviceStatus: - """ - Method to trigger the acquisition. + # def unstage(self) -> list[object]: + # """ + # Method unstage gets called at the end of a scan. - Internal Call: - - custom_prepare.on_trigger : execute BL specific action - """ - self.custom_prepare.on_trigger() - return super().trigger() + # If scan (self.stopped is True) is stopped, returns directly. + # Otherwise, checks if the DDG finished acquisition - def pre_scan(self) -> None: - """ - Method pre_scan gets executed directly before the scan + # Internal Calls: + # - custom_prepare.check_scan_id : check if scan_id changed or detector stopped + # - custom_prepare.finished : check if device finished acquisition (succesfully) + # - is_ddg_okay : check if DDG is okay - Internal Call: - - custom_prepare.on_pre_scan : execute BL specific action - """ - self.custom_prepare.on_pre_scan() + # Returns: + # list(object): list of objects that were unstaged + # """ + # self.custom_prepare.check_scan_id() + # if self.stopped is True: + # return super().unstage() + # self.custom_prepare.finished() + # self.custom_prepare.is_ddg_okay() + # self.stopped = False + # return super().unstage() - def unstage(self) -> list[object]: - """ - Method unstage gets called at the end of a scan. + # def stop(self, *, success=False) -> None: + # """ + # Method to stop the DDG - If scan (self.stopped is True) is stopped, returns directly. - Otherwise, checks if the DDG finished acquisition + # #TODO Check if the pulse generation can be interruppted - Internal Calls: - - custom_prepare.check_scan_id : check if scan_id changed or detector stopped - - custom_prepare.finished : check if device finished acquisition (succesfully) - - is_ddg_okay : check if DDG is okay - - Returns: - list(object): list of objects that were unstaged - """ - self.custom_prepare.check_scan_id() - if self.stopped is True: - return super().unstage() - self.custom_prepare.finished() - self.custom_prepare.is_ddg_okay() - self.stopped = False - return super().unstage() - - def stop(self, *, success=False) -> None: - """ - Method to stop the DDG - - #TODO Check if the pulse generation can be interruppted - - Internal Call: - - custom_prepare.is_ddg_okay : check if DDG is okay - """ - self.custom_prepare.is_ddg_okay() - super().stop(success=success) - self.stopped = True + # Internal Call: + # - custom_prepare.is_ddg_okay : check if DDG is okay + # """ + # self.custom_prepare.is_ddg_okay() + # super().stop(success=success) + # self.stopped = True