refactor: refactored delay generator DG645

This commit is contained in:
appel_c 2024-12-04 15:11:39 +01:00
parent 5b55ff25b6
commit 8f51789f5b
2 changed files with 567 additions and 417 deletions

View File

@ -0,0 +1,324 @@
""" Module for integrating the Stanford Research DG645 Delay Generator"""
import enum
import time
from typing import Any, Literal
from bec_lib.logger import bec_logger
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind, PVPositioner, Signal
from ophyd.pseudopos import (
PseudoPositioner,
PseudoSingle,
pseudo_position_argument,
real_position_argument,
)
from typeguard import typechecked
logger = bec_logger.logger
class DelayGeneratorError(Exception):
"""Exception raised for errors."""
class TriggerSource(enum.IntEnum):
"""
Class for trigger options of DG645
Used to set the trigger source of the DG645 by setting the value
e.g. source.put(TriggerSource.Internal)
Exp:
TriggerSource.Internal
"""
INTERNAL = 0
EXT_RISING_EDGE = 1
EXT_FALLING_EDGE = 2
SS_EXT_RISING_EDGE = 3
SS_EXT_FALLING_EDGE = 4
SINGLE_SHOT = 5
LINE = 6
class DelayStatic(Device):
"""
Static axis for the T0 output channel
It allows setting the logic levels, but the timing is fixed.
The signal is high after receiving the trigger until the end
of the holdoff period.
"""
# Other channel stuff
ttl_mode = Component(EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.config, auto_monitor=True)
nim_mode = Component(EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.config, auto_monitor=True)
polarity = Component(
EpicsSignal,
"OutputPolarityBI",
write_pv="OutputPolarityBO",
name="polarity",
kind=Kind.config,
auto_monitor=True,
)
amplitude = Component(
EpicsSignal,
"OutputAmpAI",
write_pv="OutputAmpAO",
name="amplitude",
kind=Kind.config,
auto_monitor=True,
)
offset = Component(
EpicsSignal,
"OutputOffsetAI",
write_pv="OutputOffsetAO",
name="offset",
kind=Kind.config,
auto_monitor=True,
)
class DummyPositioner(PVPositioner):
"""Dummy Positioner to set AO, AI and ReferenceMO."""
setpoint = Component(
EpicsSignal, "DelayAO", put_complete=True, kind=Kind.config, auto_monitor=True
)
readback = Component(EpicsSignalRO, "DelayAI", kind=Kind.config, auto_monitor=True)
# TODO This currently means that a "move" is done immediately. Given that these are PVs, this may be the appropriate solution
done = Component(Signal, value=1)
reference = Component(
EpicsSignal, "ReferenceMO", put_complete=True, kind=Kind.config, auto_monitor=True
)
class DelayPair(PseudoPositioner):
"""
Delay pair interface
Virtual motor interface to a pair of signals (on the frontpanel - AB/CD/EF/GH).
It offers a simple delay and pulse width interface.
"""
# The pseudo positioner axes
delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay")
width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth")
ch1 = Component(DummyPositioner, name="ch1")
ch2 = Component(DummyPositioner, name="ch2")
io = Component(DelayStatic, name="io")
def __init__(self, *args, **kwargs):
# Change suffix names before connecting (a bit of dynamic connections)
self.__class__.__dict__["ch1"].suffix = kwargs["channel"][0]
self.__class__.__dict__["ch2"].suffix = kwargs["channel"][1]
self.__class__.__dict__["io"].suffix = kwargs["channel"]
del kwargs["channel"]
# Call parent to start the connections
super().__init__(*args, **kwargs)
@pseudo_position_argument
def forward(self, pseudo_pos):
"""Run a forward (pseudo -> real) calculation"""
return self.RealPosition(ch1=pseudo_pos.delay, ch2=pseudo_pos.delay + pseudo_pos.width)
@real_position_argument
def inverse(self, real_pos):
"""Run an inverse (real -> pseudo) calculation"""
return self.PseudoPosition(delay=real_pos.ch1, width=real_pos.ch2 - real_pos.ch1)
class DelayGenerator(Device):
"""Delay Generator Stanford Research DG645. This implements an interface for the DG645 delay generator.
The DG645 has 8 channels, each with a delay and pulse width. The channels are implemented as DelayPair objects (AB etc.).
Signal pairs, e.g. AB, CD, EF, GH, are implemented as DelayPair objects. They
have a TTL pulse width, delay and a reference signal to which they are being triggered.
In addition, the io layer allows setting amplitude, offset and polarity for each pair.
Detailed information can be found in the manual:
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
"""
USER_ACCESS = ["set_channels", "burst_enable", "burst_disable", "set_trigger", "is_ddg_okay"]
# PVs
trigger_burst_readout = Component(
EpicsSignal, "EventStatusLI.PROC", name="trigger_burst_readout"
)
burst_cycle_finished = Component(EpicsSignalRO, "EventStatusMBBID.B3", name="read_burst_state")
delay_finished = Component(EpicsSignalRO, "EventStatusMBBID.B2", name="delay_finished")
status = Component(EpicsSignalRO, "StatusSI", name="status")
clear_error = Component(EpicsSignal, "StatusClearBO", name="clear_error")
# Front Panel
channelT0 = Component(DelayStatic, "T0", name="T0")
channelAB = Component(DelayPair, "", name="AB", channel="AB")
channelCD = Component(DelayPair, "", name="CD", channel="CD")
channelEF = Component(DelayPair, "", name="EF", channel="EF")
channelGH = Component(DelayPair, "", name="GH", channel="GH")
holdoff = Component(
EpicsSignal,
"TriggerHoldoffAI",
write_pv="TriggerHoldoffAO",
name="trigger_holdoff",
kind=Kind.config,
)
inhibit = Component(
EpicsSignal,
"TriggerInhibitMI",
write_pv="TriggerInhibitMO",
name="trigger_inhibit",
kind=Kind.config,
)
source = Component(
EpicsSignal,
"TriggerSourceMI",
write_pv="TriggerSourceMO",
name="trigger_source",
kind=Kind.config,
)
level = Component(
EpicsSignal,
"TriggerLevelAI",
write_pv="TriggerLevelAO",
name="trigger_level",
kind=Kind.config,
)
rate = Component(
EpicsSignal,
"TriggerRateAI",
write_pv="TriggerRateAO",
name="trigger_rate",
kind=Kind.config,
)
trigger_shot = Component(EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind="config")
burstMode = Component(
EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config
)
burstConfig = Component(
EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name="burstconfig", kind=Kind.config
)
burstCount = Component(
EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burstcount", kind=Kind.config
)
burstDelay = Component(
EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burstdelay", kind=Kind.config
)
burstPeriod = Component(
EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config
)
def __init__(self, name: str, prefix: str = "", kind: Kind = None, parent=None, **kwargs):
"""Initialize the DG645 device
Args:
name (str): Name of the device
prefix (str): PV prefix
kind (Kind): Kind of the device
parent: Parent device
"""
super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs)
self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"]
self.all_delay_pairs = ["AB", "CD", "EF", "GH"]
def set_trigger(self, source: TriggerSource | int) -> None:
"""Set the trigger source of the DG645
Args:
source (TriggerSource | int): The trigger source
INTERNAL = 0
EXT_RISING_EDGE = 1
EXT_FALLING_EDGE = 2
SS_EXT_RISING_EDGE = 3
SS_EXT_FALLING_EDGE = 4
SINGLE_SHOT = 5
LINE = 6
"""
value = int(source)
self.source.put(value)
@typechecked
def burst_enable(
self, count: int, delay: float, period: float, config: Literal["all", "first"] = "all"
) -> None:
"""Enable burst mode with valid parameters.
Args:
count (int): Number of bursts >0
delay (float): Delay between bursts in seconds >=0
period (float): Period of the bursts in seconds >0
config (str): Configuration of the burst. Default is "all"
"""
# Check inputs first
if count <= 0:
raise DelayGeneratorError(f"Count must be >0, provided: {count}")
if delay < 0:
raise DelayGeneratorError(f"Delay must be >=0, provided: {delay}")
if period <= 0:
raise DelayGeneratorError(f"Period must be >0, provided: {period}")
self.burstMode.put(1)
self.burstCount.put(count)
self.burstDelay.put(delay)
self.burstPeriod.put(period)
if config == "all":
self.burstConfig.put(0)
elif config == "first":
self.burstConfig.put(1)
def burst_disable(self) -> None:
"""Disable burst mode"""
self.burstMode.put(0)
def set_channels(self, signal: str, value: Any, channels: list = None) -> None:
"""
Utility method to set signals (width, delay, amplitude, offset, polarity)
on single of multiple channels T0, AB, CD, EF, GH.
Args:
signal (str) : signal to set (width, delay, amplitude, offset, polarity)
value (Any) : value to set
channels (list, optional) : list of channels to set. Defaults to self.all_channels
["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"]
"""
if not channels:
channels = self.all_channels
for chname in channels:
channel = getattr(self, chname, None)
if not channel:
continue
if signal in channel.component_names:
getattr(channel, signal).set(value)
continue
if "io" in channel.component_names and signal in channel.io.component_names:
getattr(channel.io, signal).set(value)
def is_ddg_okay(self, raise_on_error: bool = False) -> None:
"""
Utility method to check if the DDG is okay.
If raise_on_error is False, the method will:
(1) check the status of the DDG,
(2) if the status is not okay, it will try to clear the error and wait 0.5s before checking again.
Args:
raise_on_error (bool, optional): raise exception if DDG is not okay. Defaults to False.
"""
sleep_time = 0.5
status = self.status.read()[self.parent.status.name]["value"]
if status != "STATUS OK" and not raise_on_error:
logger.warning(f"DDG returns {status}, trying to clear ERROR")
self.parent.clear_error()
time.sleep(sleep_time)
self.is_ddg_okay(raise_on_error=True)
elif status != "STATUS OK":
raise DelayGeneratorError(f"DDG failed to start with status: {status}")

View File

@ -1,4 +1,3 @@
import enum
import time
from typing import Any
@ -14,193 +13,90 @@ from ophyd import (
Signal,
)
from ophyd.device import Staged
from ophyd.pseudopos import (
PseudoPositioner,
PseudoSingle,
pseudo_position_argument,
real_position_argument,
)
from ophyd_devices.devices.delay_generator_645 import DelayGenerator
from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase
from ophyd_devices.utils import bec_utils
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
logger = bec_logger.logger
class DelayGeneratorError(Exception):
"""Exception raised for errors."""
class DelayGeneratorNotOkay(Exception):
"""Custom exception class for DelayGenerator errors"""
class DeviceInitError(DelayGeneratorError):
"""Error upon failed initialization, invoked by missing device manager or device not started in sim_mode."""
# class DDGCustomMixin:
# """
# Mixin class for custom DelayGenerator logic within PSIDelayGeneratorBase.
# This class provides a parent class for implementation of BL specific logic of the device.
# It is also possible to pass implementing certain methods, e.g. finished or on_trigger,
# based on the setup and desired operation mode at the beamline.
# Args:
# parent (object): instance of PSIDelayGeneratorBase
# **kwargs: keyword arguments
# """
# def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
# self.parent = parent
# def initialize_default_parameter(self) -> None:
# """
# Method to initialize default parameters for DDG.
# Called upon initiating the base class.
# It should be used to set the DDG default parameters.
# These may include: amplitude, offsets, delays, etc.
# """
# def prepare_ddg(self) -> None:
# """
# Method to prepare the DDG for the upcoming scan.
# Called by the stage method of the base class.
# It should be used to set the DDG parameters for the upcoming scan.
# """
# def on_trigger(self) -> None:
# """Method executed upon trigger call in parent class"""
# def finished(self) -> None:
# """Method to check if DDG is finished with the scan"""
# def on_pre_scan(self) -> None:
# """
# Method executed upon pre_scan call in parent class.
# Covenient to implement time sensitive actions to be executed right before start of the scan.
# Example could be to open the shutter by triggering a pulse via pre_scan.
# """
# def check_scan_id(self) -> None:
# """Method to check if there is a new scan_id, called by stage."""
# def is_ddg_okay(self, raise_on_error=False) -> None:
# """
# Method to check if DDG is okay
# It checks the status PV of the DDG and tries to clear the error if it is not okay.
# It will rerun itself and raise DelayGeneratorNotOkay if DDG is still not okay.
# Args:
# raise_on_error (bool, optional): raise exception if DDG is not okay. Defaults to False.
# """
# status = self.parent.status.read()[self.parent.status.name]["value"]
# if status != "STATUS OK" and not raise_on_error:
# logger.warning(f"DDG returns {status}, trying to clear ERROR")
# self.parent.clear_error()
# time.sleep(1)
# self.is_ddg_okay(raise_on_error=True)
# elif status != "STATUS OK":
# raise DelayGeneratorNotOkay(f"DDG failed to start with status: {status}")
class DelayGeneratorNotOkay(DelayGeneratorError):
"""Error when DDG is not okay"""
class TriggerSource(enum.IntEnum):
"""
Class for trigger options of DG645
Used to set the trigger source of the DG645 by setting the value
e.g. source.put(TriggerSource.Internal)
Exp:
TriggerSource.Internal
"""
INTERNAL = 0
EXT_RISING_EDGE = 1
EXT_FALLING_EDGE = 2
SS_EXT_RISING_EDGE = 3
SS_EXT_FALLING_EDGE = 4
SINGLE_SHOT = 5
LINE = 6
class DelayStatic(Device):
"""
Static axis for the T0 output channel
It allows setting the logic levels, but the timing is fixed.
The signal is high after receiving the trigger until the end
of the holdoff period.
"""
# Other channel stuff
ttl_mode = Component(EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.config)
nim_mode = Component(EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.config)
polarity = Component(
EpicsSignal,
"OutputPolarityBI",
write_pv="OutputPolarityBO",
name="polarity",
kind=Kind.config,
)
amplitude = Component(
EpicsSignal, "OutputAmpAI", write_pv="OutputAmpAO", name="amplitude", kind=Kind.config
)
offset = Component(
EpicsSignal, "OutputOffsetAI", write_pv="OutputOffsetAO", name="offset", kind=Kind.config
)
class DummyPositioner(PVPositioner):
"""Dummy Positioner to set AO, AI and ReferenceMO."""
setpoint = Component(EpicsSignal, "DelayAO", put_complete=True, kind=Kind.config)
readback = Component(EpicsSignalRO, "DelayAI", kind=Kind.config)
done = Component(Signal, value=1)
reference = Component(EpicsSignal, "ReferenceMO", put_complete=True, kind=Kind.config)
class DelayPair(PseudoPositioner):
"""
Delay pair interface
Virtual motor interface to a pair of signals (on the frontpanel - AB/CD/EF/GH).
It offers a simple delay and pulse width interface.
"""
# The pseudo positioner axes
delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay")
width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth")
ch1 = Component(DummyPositioner, name="ch1")
ch2 = Component(DummyPositioner, name="ch2")
io = Component(DelayStatic, name="io")
def __init__(self, *args, **kwargs):
# Change suffix names before connecting (a bit of dynamic connections)
self.__class__.__dict__["ch1"].suffix = kwargs["channel"][0]
self.__class__.__dict__["ch2"].suffix = kwargs["channel"][1]
self.__class__.__dict__["io"].suffix = kwargs["channel"]
del kwargs["channel"]
# Call parent to start the connections
super().__init__(*args, **kwargs)
@pseudo_position_argument
def forward(self, pseudo_pos):
"""Run a forward (pseudo -> real) calculation"""
return self.RealPosition(ch1=pseudo_pos.delay, ch2=pseudo_pos.delay + pseudo_pos.width)
@real_position_argument
def inverse(self, real_pos):
"""Run an inverse (real -> pseudo) calculation"""
return self.PseudoPosition(delay=real_pos.ch1, width=real_pos.ch2 - real_pos.ch1)
class DDGCustomMixin:
"""
Mixin class for custom DelayGenerator logic within PSIDelayGeneratorBase.
This class provides a parent class for implementation of BL specific logic of the device.
It is also possible to pass implementing certain methods, e.g. finished or on_trigger,
based on the setup and desired operation mode at the beamline.
Args:
parent (object): instance of PSIDelayGeneratorBase
**kwargs: keyword arguments
"""
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
self.parent = parent
def initialize_default_parameter(self) -> None:
"""
Method to initialize default parameters for DDG.
Called upon initiating the base class.
It should be used to set the DDG default parameters.
These may include: amplitude, offsets, delays, etc.
"""
def prepare_ddg(self) -> None:
"""
Method to prepare the DDG for the upcoming scan.
Called by the stage method of the base class.
It should be used to set the DDG parameters for the upcoming scan.
"""
def on_trigger(self) -> None:
"""Method executed upon trigger call in parent class"""
def finished(self) -> None:
"""Method to check if DDG is finished with the scan"""
def on_pre_scan(self) -> None:
"""
Method executed upon pre_scan call in parent class.
Covenient to implement time sensitive actions to be executed right before start of the scan.
Example could be to open the shutter by triggering a pulse via pre_scan.
"""
def check_scan_id(self) -> None:
"""Method to check if there is a new scan_id, called by stage."""
def is_ddg_okay(self, raise_on_error=False) -> None:
"""
Method to check if DDG is okay
It checks the status PV of the DDG and tries to clear the error if it is not okay.
It will rerun itself and raise DelayGeneratorNotOkay if DDG is still not okay.
Args:
raise_on_error (bool, optional): raise exception if DDG is not okay. Defaults to False.
"""
status = self.parent.status.read()[self.parent.status.name]["value"]
if status != "STATUS OK" and not raise_on_error:
logger.warning(f"DDG returns {status}, trying to clear ERROR")
self.parent.clear_error()
time.sleep(1)
self.is_ddg_okay(raise_on_error=True)
elif status != "STATUS OK":
raise DelayGeneratorNotOkay(f"DDG failed to start with status: {status}")
class PSIDelayGeneratorBase(Device):
class PSIDelayGeneratorBase(PSIDeviceBase, DelayGenerator):
"""
Abstract base class for DelayGenerator DG645
@ -239,272 +135,202 @@ class PSIDelayGeneratorBase(Device):
"""
# Custom_prepare_cls
custom_prepare_cls = DDGCustomMixin
# custom_prepare_cls = DDGCustomMixin
SUB_PROGRESS = "progress"
SUB_VALUE = "value"
_default_sub = SUB_VALUE
# SUB_PROGRESS = "progress"
# SUB_VALUE = "value"
# _default_sub = SUB_VALUE
USER_ACCESS = ["set_channels", "_set_trigger", "burst_enable", "burst_disable", "reload_config"]
# def __init__(
# self,
# prefix="",
# *,
# name,
# kind=None,
# read_attrs=None,
# configuration_attrs=None,
# parent=None,
# device_manager=None,
# sim_mode=False,
# **kwargs,
# ):
# super().__init__(
# prefix=prefix,
# name=name,
# kind=kind,
# read_attrs=read_attrs,
# configuration_attrs=configuration_attrs,
# parent=parent,
# **kwargs,
# )
# if device_manager is None and not sim_mode:
# raise DeviceInitError(
# f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add"
# " DeviceManager to initialization or init with sim_mode=True"
# )
# # Init variables
# self.sim_mode = sim_mode
# self.stopped = False
# self.name = name
# self.scaninfo = None
# self.timeout = 5
# self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"]
# self.all_delay_pairs = ["AB", "CD", "EF", "GH"]
# self.wait_for_connection(all_signals=True)
# Assign PVs from DDG645
trigger_burst_readout = Component(
EpicsSignal, "EventStatusLI.PROC", name="trigger_burst_readout"
)
burst_cycle_finished = Component(EpicsSignalRO, "EventStatusMBBID.B3", name="read_burst_state")
delay_finished = Component(EpicsSignalRO, "EventStatusMBBID.B2", name="delay_finished")
status = Component(EpicsSignalRO, "StatusSI", name="status")
clear_error = Component(EpicsSignal, "StatusClearBO", name="clear_error")
# # Init custom prepare class with BL specific logic
# self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
# if not sim_mode:
# self.device_manager = device_manager
# else:
# self.device_manager = bec_utils.DMMock()
# self.connector = self.device_manager.connector
# self._update_scaninfo()
# self._init()
# Front Panel
channelT0 = Component(DelayStatic, "T0", name="T0")
channelAB = Component(DelayPair, "", name="AB", channel="AB")
channelCD = Component(DelayPair, "", name="CD", channel="CD")
channelEF = Component(DelayPair, "", name="EF", channel="EF")
channelGH = Component(DelayPair, "", name="GH", channel="GH")
# def _update_scaninfo(self) -> None:
# """
# Method to updated scaninfo from BEC.
holdoff = Component(
EpicsSignal,
"TriggerHoldoffAI",
write_pv="TriggerHoldoffAO",
name="trigger_holdoff",
kind=Kind.config,
)
inhibit = Component(
EpicsSignal,
"TriggerInhibitMI",
write_pv="TriggerInhibitMO",
name="trigger_inhibit",
kind=Kind.config,
)
source = Component(
EpicsSignal,
"TriggerSourceMI",
write_pv="TriggerSourceMO",
name="trigger_source",
kind=Kind.config,
)
level = Component(
EpicsSignal,
"TriggerLevelAI",
write_pv="TriggerLevelAO",
name="trigger_level",
kind=Kind.config,
)
rate = Component(
EpicsSignal,
"TriggerRateAI",
write_pv="TriggerRateAO",
name="trigger_rate",
kind=Kind.config,
)
trigger_shot = Component(EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind="config")
burstMode = Component(
EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config
)
burstConfig = Component(
EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name="burstconfig", kind=Kind.config
)
burstCount = Component(
EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burstcount", kind=Kind.config
)
burstDelay = Component(
EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burstdelay", kind=Kind.config
)
burstPeriod = Component(
EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config
)
# In sim_mode, scaninfo output is mocked - see bec_scaninfo_mixin.py
# """
# self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode)
# self.scaninfo.load_scan_metadata()
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
if device_manager is None and not sim_mode:
raise DeviceInitError(
f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add"
" DeviceManager to initialization or init with sim_mode=True"
)
# Init variables
self.sim_mode = sim_mode
self.stopped = False
self.name = name
self.scaninfo = None
self.timeout = 5
self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"]
self.all_delay_pairs = ["AB", "CD", "EF", "GH"]
self.wait_for_connection(all_signals=True)
# def _init(self) -> None:
# """Method to initialize custom parameters of the DDG."""
# self.custom_prepare.initialize_default_parameter()
# self.custom_prepare.is_ddg_okay()
# Init custom prepare class with BL specific logic
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
if not sim_mode:
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
self.connector = self.device_manager.connector
self._update_scaninfo()
self._init()
# def set_channels(self, signal: str, value: Any, channels: list = None) -> None:
# """
# Method to set signals on DelayPair and DelayStatic channels.
def _update_scaninfo(self) -> None:
"""
Method to updated scaninfo from BEC.
# Signals can be set on the DelayPair and DelayStatic channels. The method checks
# if the signal is available on the channel and sets it. It works for both, DelayPair
# and Delay Static although signals are hosted in different layers.
In sim_mode, scaninfo output is mocked - see bec_scaninfo_mixin.py
"""
self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode)
self.scaninfo.load_scan_metadata()
# Args:
# signal (str) : signal to set (width, delay, amplitude, offset, polarity)
# value (Any) : value to set
# channels (list, optional) : list of channels to set. Defaults to self.all_channels (T0,AB,CD,EF,GH)
# """
# if not channels:
# channels = self.all_channels
# for chname in channels:
# channel = getattr(self, chname, None)
# if not channel:
# continue
# if signal in channel.component_names:
# getattr(channel, signal).set(value)
# continue
# if "io" in channel.component_names and signal in channel.io.component_names:
# getattr(channel.io, signal).set(value)
def _init(self) -> None:
"""Method to initialize custom parameters of the DDG."""
self.custom_prepare.initialize_default_parameter()
self.custom_prepare.is_ddg_okay()
# def set_trigger(self, trigger_source: TriggerSource) -> None:
# """Set trigger source on DDG - possible values defined in TriggerSource enum"""
# value = int(trigger_source)
# self.source.put(value)
def set_channels(self, signal: str, value: Any, channels: list = None) -> None:
"""
Method to set signals on DelayPair and DelayStatic channels.
# def burst_enable(self, count, delay, period, config="all"):
# """Enable the burst mode"""
# # Validate inputs
# count = int(count)
# assert count > 0, "Number of bursts must be positive"
# assert delay >= 0, "Burst delay must be larger than 0"
# assert period > 0, "Burst period must be positive"
# assert config in ["all", "first"], "Supported burst configs are 'all' and 'first'"
Signals can be set on the DelayPair and DelayStatic channels. The method checks
if the signal is available on the channel and sets it. It works for both, DelayPair
and Delay Static although signals are hosted in different layers.
# self.burstMode.put(1)
# self.burstCount.put(count)
# self.burstDelay.put(delay)
# self.burstPeriod.put(period)
Args:
signal (str) : signal to set (width, delay, amplitude, offset, polarity)
value (Any) : value to set
channels (list, optional) : list of channels to set. Defaults to self.all_channels (T0,AB,CD,EF,GH)
"""
if not channels:
channels = self.all_channels
for chname in channels:
channel = getattr(self, chname, None)
if not channel:
continue
if signal in channel.component_names:
getattr(channel, signal).set(value)
continue
if "io" in channel.component_names and signal in channel.io.component_names:
getattr(channel.io, signal).set(value)
# if config == "all":
# self.burstConfig.put(0)
# elif config == "first":
# self.burstConfig.put(1)
def set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source on DDG - possible values defined in TriggerSource enum"""
value = int(trigger_source)
self.source.put(value)
# def burst_disable(self):
# """Disable burst mode"""
# self.burstMode.put(0)
def burst_enable(self, count, delay, period, config="all"):
"""Enable the burst mode"""
# Validate inputs
count = int(count)
assert count > 0, "Number of bursts must be positive"
assert delay >= 0, "Burst delay must be larger than 0"
assert period > 0, "Burst period must be positive"
assert config in ["all", "first"], "Supported burst configs are 'all' and 'first'"
# def stage(self) -> list[object]:
# """
# Method to stage the device.
self.burstMode.put(1)
self.burstCount.put(count)
self.burstDelay.put(delay)
self.burstPeriod.put(period)
# Called in preparation for a scan.
if config == "all":
self.burstConfig.put(0)
elif config == "first":
self.burstConfig.put(1)
# Internal Calls:
# - scaninfo.load_scan_metadata : load scan metadata
# - custom_prepare.prepare_ddg : prepare DDG for measurement
# - is_ddg_okay : check if DDG is okay
def burst_disable(self):
"""Disable burst mode"""
self.burstMode.put(0)
# Returns:
# list(object): list of objects that were staged
# """
# if self._staged != Staged.no:
# return super().stage()
# self.stopped = False
# self.scaninfo.load_scan_metadata()
# self.custom_prepare.prepare_ddg()
# self.custom_prepare.is_ddg_okay()
# # At the moment needed bc signal might not be reliable, BEC too fast.
# # Consider removing this overhead in future!
# time.sleep(0.05)
# return super().stage()
def stage(self) -> list[object]:
"""
Method to stage the device.
# def trigger(self) -> DeviceStatus:
# """
# Method to trigger the acquisition.
Called in preparation for a scan.
# Internal Call:
# - custom_prepare.on_trigger : execute BL specific action
# """
# self.custom_prepare.on_trigger()
# return super().trigger()
Internal Calls:
- scaninfo.load_scan_metadata : load scan metadata
- custom_prepare.prepare_ddg : prepare DDG for measurement
- is_ddg_okay : check if DDG is okay
# def pre_scan(self) -> None:
# """
# Method pre_scan gets executed directly before the scan
Returns:
list(object): list of objects that were staged
"""
if self._staged != Staged.no:
return super().stage()
self.stopped = False
self.scaninfo.load_scan_metadata()
self.custom_prepare.prepare_ddg()
self.custom_prepare.is_ddg_okay()
# At the moment needed bc signal might not be reliable, BEC too fast.
# Consider removing this overhead in future!
time.sleep(0.05)
return super().stage()
# Internal Call:
# - custom_prepare.on_pre_scan : execute BL specific action
# """
# self.custom_prepare.on_pre_scan()
def trigger(self) -> DeviceStatus:
"""
Method to trigger the acquisition.
# def unstage(self) -> list[object]:
# """
# Method unstage gets called at the end of a scan.
Internal Call:
- custom_prepare.on_trigger : execute BL specific action
"""
self.custom_prepare.on_trigger()
return super().trigger()
# If scan (self.stopped is True) is stopped, returns directly.
# Otherwise, checks if the DDG finished acquisition
def pre_scan(self) -> None:
"""
Method pre_scan gets executed directly before the scan
# Internal Calls:
# - custom_prepare.check_scan_id : check if scan_id changed or detector stopped
# - custom_prepare.finished : check if device finished acquisition (succesfully)
# - is_ddg_okay : check if DDG is okay
Internal Call:
- custom_prepare.on_pre_scan : execute BL specific action
"""
self.custom_prepare.on_pre_scan()
# Returns:
# list(object): list of objects that were unstaged
# """
# self.custom_prepare.check_scan_id()
# if self.stopped is True:
# return super().unstage()
# self.custom_prepare.finished()
# self.custom_prepare.is_ddg_okay()
# self.stopped = False
# return super().unstage()
def unstage(self) -> list[object]:
"""
Method unstage gets called at the end of a scan.
# def stop(self, *, success=False) -> None:
# """
# Method to stop the DDG
If scan (self.stopped is True) is stopped, returns directly.
Otherwise, checks if the DDG finished acquisition
# #TODO Check if the pulse generation can be interruppted
Internal Calls:
- custom_prepare.check_scan_id : check if scan_id changed or detector stopped
- custom_prepare.finished : check if device finished acquisition (succesfully)
- is_ddg_okay : check if DDG is okay
Returns:
list(object): list of objects that were unstaged
"""
self.custom_prepare.check_scan_id()
if self.stopped is True:
return super().unstage()
self.custom_prepare.finished()
self.custom_prepare.is_ddg_okay()
self.stopped = False
return super().unstage()
def stop(self, *, success=False) -> None:
"""
Method to stop the DDG
#TODO Check if the pulse generation can be interruppted
Internal Call:
- custom_prepare.is_ddg_okay : check if DDG is okay
"""
self.custom_prepare.is_ddg_okay()
super().stop(success=success)
self.stopped = True
# Internal Call:
# - custom_prepare.is_ddg_okay : check if DDG is okay
# """
# self.custom_prepare.is_ddg_okay()
# super().stop(success=success)
# self.stopped = True