feat(ddg): rewrite of the delay generator integration for cSAXS

This commit is contained in:
2025-07-03 20:13:12 +02:00
parent 44a4dfc818
commit 293e56fba8
6 changed files with 863 additions and 528 deletions

View File

@@ -0,0 +1,34 @@
ddg:
description: 'CSAXS master delay generator'
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.ddg_master.DDGMaster
deviceConfig:
prefix: "X12SA-CPCL-DDG1:"
enabled: true
readOnly: false
onFailure: raise
readoutPriority: baseline
softwareTrigger: true
samx:
readoutPriority: baseline
deviceClass: ophyd_devices.SimPositioner
deviceConfig:
delay: 1
limits:
- -50
- 50
tolerance: 0.01
update_frequency: 400
deviceTags:
- user motors
enabled: true
readOnly: false
bpm4i:
readoutPriority: monitored
deviceClass: ophyd_devices.SimMonitor
deviceConfig:
deviceTags:
- beamline
enabled: true
readOnly: false

View File

@@ -1,274 +0,0 @@
from bec_lib import bec_logger
from ophyd import Component, DeviceStatus, Kind
from ophyd_devices.devices.delay_generator_645 import DelayGenerator, TriggerSource
from ophyd_devices.interfaces.base_classes.bec_device_base import BECDeviceBase, CustomPrepare
from ophyd_devices.sim.sim_signals import SetableSignal
from ophyd_devices.utils import bec_utils
logger = bec_logger.logger
class DelayGeneratorcSAXSError(Exception):
"""Exception raised for errors."""
class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]):
"""
Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS
"""
def on_wait_for_connection(self) -> None:
"""Init default parameter after the all signals are connected"""
for ii, channel in enumerate(self.parent.all_channels):
self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
self.parent.set_channels("amplitude", self.parent.amplitude.get())
self.parent.set_channels("offset", self.parent.offset.get())
# Setup reference
self.parent.set_channels(
"reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs]
)
self.parent.set_channels(
"reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs]
)
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# Set threshold level for ext. pulses
self.parent.level.put(self.parent.thres_trig_level.get())
def on_stage(self) -> None:
"Hook execute before the scan starts"
if self.parent.scaninfo.scan_type == "step":
exp_time = self.parent.scaninfo.exp_time
delay = 0
self.parent.burst_disable()
self.parent.set_trigger(TriggerSource.SINGLE_SHOT)
self.parent.set_channels(signal="width", value=exp_time)
self.parent.set_channels(signal="delay", value=delay)
return
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
# TODO implement the logic for JJF triggering
exp_time = 480e-6 # self.parent.scaninfo.exp_time
readout = 20e-6 # self.parent.scaninfo.readout_time
total_exposure = exp_time + readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure)
delay = 0
delay_burst = self.parent.delay_burst.get()
self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
self.parent.set_channels(signal="width", value=exp_time)
self.parent.set_channels(signal="delay", value=delay)
self.parent.burst_enable(
count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first"
)
logger.info(
f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}"
)
def on_trigger(self) -> DeviceStatus:
"""Method to be executed upon trigger"""
if self.parent.scaninfo.scan_type == "step":
self.parent.trigger_shot.put(1)
return
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
exp_time = 480e-6 # self.parent.scaninfo.exp_time
readout = 20e-6 # self.parent.scaninfo.readout_time
total_exposure = exp_time + readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure)
# Start trigger cycle
self.parent.trigger_burst_readout.put(1)
# Create status object that will wait for the end of the burst cycle
status = self.wait_with_status(
signal_conditions=[(self.parent.burst_cycle_finished, 1)],
timeout=num_burst_cycle * total_exposure + 1, # add 1s to be sure
check_stopped=True,
exception_on_timeout=DelayGeneratorcSAXSError(
f"{self.parent.name} run into timeout in complete call."
),
)
logger.info(f"Return status {self.parent.name}")
return status
def on_complete(self) -> DeviceStatus:
pass
def on_pre_scan(self) -> None:
"""
Method called by pre_scan hook in parent class.
Executes trigger if premove_trigger is Trus.
"""
if self.parent.premove_trigger.get() is True:
self.parent.trigger_shot.put(1)
class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator):
"""
DG645 delay generator at cSAXS (multiple can be in use depending on the setup)
Default values for setting up DDG.
Note: checks of set calues are not (only partially) included, check manual for details on possible settings.
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
- delay_burst : (float >=0) Delay between trigger and first pulse in burst mode
- delta_width : (float >= 0) Add width to fast shutter signal to make sure its open during acquisition
- additional_triggers : (int) add additional triggers to burst mode (mcs card needs +1 triggers per line)
- polarity : (list of 0/1) polarity for different channels
- amplitude : (float) amplitude voltage of TTLs
- offset : (float) offset for ampltitude
- thres_trig_level : (float) threshold of trigger amplitude
Custom signals for logic in different DDGs during scans (for custom_prepare.prepare_ddg):
- set_high_on_exposure : (bool): if True, then TTL signal should go high during the full acquisition time of a scan.
# TODO trigger_width and fixed_ttl could be combined into single list.
- fixed_ttl_width : (list of either 1 or 0), one for each channel.
- trigger_width : (float) if fixed_ttl_width is True, then the width of the TTL pulse is set to this value.
- set_trigger_source : (TriggerSource) specifies the default trigger source for the DDG.
- premove_trigger : (bool) if True, then a trigger should be executed before the scan starts (to be implemented in on_pre_scan).
- set_high_on_stage : (bool) if True, then TTL signal should go high already on stage.
"""
custom_prepare_cls = DDGSetup
# Custom signals passed on during the init procedure via BEC
# TODO review whether those should remain here like that
delay_burst = Component(
bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config"
)
delta_width = Component(
bec_utils.ConfigSignal, name="delta_width", kind="config", config_storage_name="ddg_config"
)
additional_triggers = Component(
bec_utils.ConfigSignal,
name="additional_triggers",
kind="config",
config_storage_name="ddg_config",
)
polarity = Component(
bec_utils.ConfigSignal, name="polarity", kind="config", config_storage_name="ddg_config"
)
fixed_ttl_width = Component(
bec_utils.ConfigSignal,
name="fixed_ttl_width",
kind="config",
config_storage_name="ddg_config",
)
amplitude = Component(
bec_utils.ConfigSignal, name="amplitude", kind="config", config_storage_name="ddg_config"
)
offset = Component(
bec_utils.ConfigSignal, name="offset", kind="config", config_storage_name="ddg_config"
)
thres_trig_level = Component(
bec_utils.ConfigSignal,
name="thres_trig_level",
kind="config",
config_storage_name="ddg_config",
)
set_high_on_exposure = Component(
bec_utils.ConfigSignal,
name="set_high_on_exposure",
kind="config",
config_storage_name="ddg_config",
)
set_high_on_stage = Component(
bec_utils.ConfigSignal,
name="set_high_on_stage",
kind="config",
config_storage_name="ddg_config",
)
set_trigger_source = Component(
bec_utils.ConfigSignal,
name="set_trigger_source",
kind="config",
config_storage_name="ddg_config",
)
trigger_width = Component(
bec_utils.ConfigSignal,
name="trigger_width",
kind="config",
config_storage_name="ddg_config",
)
premove_trigger = Component(
bec_utils.ConfigSignal,
name="premove_trigger",
kind="config",
config_storage_name="ddg_config",
)
def __init__(
self,
name: str,
prefix: str = "",
kind: Kind = None,
ddg_config: dict = None,
parent=None,
device_manager=None,
**kwargs,
):
"""
Args:
prefix (str, optional): Prefix of the device. Defaults to "".
name (str): Name of the device.
kind (str, optional): Kind of the device. Defaults to None.
read_attrs (list, optional): List of attributes to read. Defaults to None.
configuration_attrs (list, optional): List of attributes to configure. Defaults to None.
parent (Device, optional): Parent device. Defaults to None.
device_manager (DeviceManagerBase, optional): DeviceManagerBase object. Defaults to None.
sim_mode (bool, optional): Simulation mode flag. Defaults to False.
ddg_config (dict, optional): Dictionary of ddg_config signals. Defaults to None.
"""
# Default values for ddg_config signals
self.ddg_config = {
# Setup default values
f"{name}_delay_burst": 0,
f"{name}_delta_width": 0,
f"{name}_additional_triggers": 0,
f"{name}_polarity": [1, 1, 1, 1, 1],
f"{name}_amplitude": 4.5,
f"{name}_offset": 0,
f"{name}_thres_trig_level": 2.5,
# Values for different behaviour during scans
f"{name}_fixed_ttl_width": [0, 0, 0, 0, 0],
f"{name}_trigger_width": None,
f"{name}_set_high_on_exposure": False,
f"{name}_set_high_on_stage": False,
f"{name}_set_trigger_source": "SINGLE_SHOT",
f"{name}_premove_trigger": False,
}
if ddg_config is not None:
# pylint: disable=expression-not-assigned
[self.ddg_config.update({f"{name}_{key}": value}) for key, value in ddg_config.items()]
super().__init__(
prefix=prefix,
name=name,
kind=kind,
parent=parent,
device_manager=device_manager,
**kwargs,
)
# if __name__ == "__main__":
# dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="ddg3")

View File

@@ -0,0 +1,106 @@
from threading import Event, Thread
from time import sleep
from ophyd import DeviceStatus, StatusBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import DelayGeneratorCSAXS, ChannelConfig, AllChannelNames, OUTPUTPOLARITY, TRIGGERSOURCE, StatusBitsCompareStatus, STATUSBITS, CHANNELREFERENCE
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from bec_lib.logger import bec_logger
import atexit
logger = bec_logger.logger
_DEFAULT_CHANNEL_CONFIG: ChannelConfig = {
"amplitude":5.0,
"offset":0.0,
"polarity": OUTPUTPOLARITY.POSITIVE,
"mode":"ttl",
}
DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
"t0":_DEFAULT_CHANNEL_CONFIG,
"ab":_DEFAULT_CHANNEL_CONFIG,
"cd":_DEFAULT_CHANNEL_CONFIG,
"ef":_DEFAULT_CHANNEL_CONFIG,
"gh":_DEFAULT_CHANNEL_CONFIG,
}
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT
DEFAULT_DEAD_TIMES_S = {
"ab":1e-3,
"cd":1e-3,
"ef":1e-3,
"gh":1e-3,
}
class DDGMaster(PSIDeviceBase, DelayGeneratorCSAXS):
"""
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1
"""
def on_connected(self) -> None:
"""Set the default values on the device - intended to overwrite everything to a usable default state.
Sets DEFAULT_IO_CONFIG into each channel,
sets the trigger source to DEFAULT_TRIGGER_SOURCE,
and turns off burst mode"""
self.burst_disable() # it is possible to miss setting settings if burst is enabled
for channel, config in DEFAULT_IO_CONFIG.items():
self.set_io_values(channel, **config)
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
self.set_references_for_channels(
[("A", CHANNELREFERENCE.T0),
("B", CHANNELREFERENCE.A),
("C", CHANNELREFERENCE.T0),
("D", CHANNELREFERENCE.C),
("E", CHANNELREFERENCE.T0),
("F", CHANNELREFERENCE.E),
("G", CHANNELREFERENCE.T0),
("H", CHANNELREFERENCE.G)]
)
# Start background thread to poll status register
self._status_polling_stop_event = Event()
self._status_polling_thread = Thread(target=self._poll_status) # TODO if exit is called, ca_context from pyepics seems unset.. Hook to kill thread?
self._status_polling_thread.start()
atexit.register(self.on_destroy)
def _poll_status(self):
"""The status register has to be actively pulled, triggering the proc_status results in
event_status being updated, which in turn allows the StatusBitsCompareStatus from on_trigger
to be updated and eventually resolve."""
dispatcher = self.state.proc_status.cl.get_dispatcher()
event = dispatcher.stop_event
while not (self._status_polling_stop_event.is_set() or event.is_set()):
self.state.proc_status.put(1)
sleep(1/5) # poll the status at 5 Hz
def on_stage(self) -> DeviceStatus | StatusBase | None:
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
readout_time = self.scan_info.msg.scan_parameters["readout_time"]
if readout_time is not None and readout_time != 0:
# If we are given a single readout time from BEC, use it for all 4 channels
pulse_widths = [exp_time - readout_time]*len(DEFAULT_DEAD_TIMES_S)
else:
# Otherwise, derive the pulse widths from the default dead times defined above
pulse_widths = [exp_time - DEFAULT_DEAD_TIMES_S[ch] for ch in DEFAULT_DEAD_TIMES_S.keys()]
logger.info(f"setting pulse widths to {pulse_widths}")
self.set_delay_pairs(["ab", "cd", "ef", "gh"], delay=0, width=pulse_widths)
self.burst_enable(count=frames_per_trigger, delay=0, period=exp_time)
def on_trigger(self) -> DeviceStatus | StatusBase | None:
st = StatusBitsCompareStatus(self.state.event_status, STATUSBITS.END_OF_BURST, run=False)
self.cancel_on_stop(st)
self.trigger_shot.put(1)
return st
def on_destroy(self) -> None:
if getattr(self, "_status_polling_stop_event", None) is not None:
self._status_polling_stop_event.set()
if getattr(self, "_status_polling_thread", None) is not None:
self._status_polling_thread.join(timeout=3)
if __name__ == "__main__":
ddg = DDGMaster(name="ddg", prefix="X12SA-CPCL-DDG1:")
ddg.wait_for_connection(all_signals=True, timeout=30)
ddg.summary()

View File

@@ -0,0 +1,617 @@
"""
Delay generator implementation for CSAXS.
Detailed information can be found in the manual:
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
"""
import enum
from typing import Literal, TypedDict
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind, Signal
from ophyd_devices import StatusBase, SubscriptionStatus
from typeguard import typechecked
from bec_lib.logger import bec_logger
logger = bec_logger.logger
DelayChannelNames = Literal["ab", "cd", "ef", "gh"]
AllChannelNames = Literal["t0", "ab", "cd", "ef", "gh"]
LiteralChannels = Literal["A","B","C","D","E","F","G","H"]
class CHANNELREFERENCE(enum.Enum):
T0 = 0
A = 1
B = 2
C = 3
D = 4
E = 5
F = 6
G = 7
H = 8
class BURSTCONFIG(enum.Enum):
"""Enum option for burst_config signal of the delay generator.
ALL_CYCLES: T0 triggere for all cycles.
FIRST_CYCLE: T0 only triggered for the first cycle.
"""
ALL_CYCLES = 0
FIRST_CYCLE = 1
class TRIGGERSOURCE(enum.Enum):
"""Enum options for the trigger_source signal of the delay generator."""
INTERNAL = 0
EXT_RISING_EDGE = 1
EXT_FALLING_EDGE = 2
SS_EXT_RISING_EDGE = 3
SS_EXT_FALLING_EDGE = 4
SINGLE_SHOT = 5
LINE = 6
class TRIGGERINHIBIT(enum.Enum):
"""Enum options for the trigger_inhibit signal of the delay generator."""
OFF = 0
TRIGGERS = 1
AB = 2
AB_CD = 3
AB_CD_EF = 4
AB_CD_EF_GH = 5
class OUTPUTPOLARITY(enum.Enum):
"""Enum options for the polarity signal of the static pair."""
NEGATIVE = 0
POSITIVE = 1
class STATUSBITS(enum.IntFlag):
"""Bit flags for the status signal of the delay generator."""
TRIG = 1 << 0 # Got a trigger.
RATE = 1 << 1 # Got a trigger while a delay or burst was in progress.
END_OF_DELAY = 1 << 2 # A delay cycle has completed.
END_OF_BURST = 1 << 3 # A burst cycle has completed.
INHIBIT = 1 << 4 # A trigger or output delay cycle was inhibited.
ABORT_DELAY = 1 << 5 # A delay cycle was aborted early.
PLL_UNLOCK = 1 << 6 # The 100 MHz PLL came unlocked.
RB_UNLOCK = 1 << 7 # The installed Rb oscillator is unlocked.
def describe(self) -> dict:
"""Return a description of the status bits."""
descriptions = {
STATUSBITS.TRIG: "Got a trigger.",
STATUSBITS.RATE: "Got a trigger while a delay or burst was in progress.",
STATUSBITS.END_OF_DELAY: "A delay cycle has completed.",
STATUSBITS.END_OF_BURST: "A burst cycle has completed.",
STATUSBITS.INHIBIT: "A trigger or output delay cycle was inhibited.",
STATUSBITS.ABORT_DELAY: "A delay cycle was aborted early.",
STATUSBITS.PLL_UNLOCK: "The 100 MHz PLL came unlocked.",
STATUSBITS.RB_UNLOCK: "The installed Rb oscillator is unlocked.",
}
return {flag.name: descriptions[flag] for flag in STATUSBITS if flag in self}
class StatusBitsCompareStatus(SubscriptionStatus):
"""Compare status for STATUSBITS comparison."""
def __init__(
self,
signal: EpicsSignalRO,
value: STATUSBITS,
*args,
event_type=None,
timeout: float|None = None,
settle_time: float = 0,
run: bool = True,
**kwargs,
):
"""Initialize the compare status with a signal."""
self._signal = signal
self._value = value
super().__init__(
device=signal,
callback=self._compare_callback,
timeout=timeout,
settle_time=settle_time,
event_type=event_type,
run=run,
)
def _compare_callback(self, value, **kwargs) -> bool:
"""Callback for subscription status"""
return (STATUSBITS(value) & self._value) == self._value
class ChannelConfig(TypedDict):
amplitude: float | None
offset: float | None
polarity: OUTPUTPOLARITY | Literal[0, 1] | None
mode: Literal["ttl", "nim"] | None
class StaticPair(Device):
"""
Class to represent a static pair.
It allows setting the logic levels, but the timing is fixed.
The signal is high after receiving the trigger until the end of the holdoff period.
"""
ttl_mode = Cpt(
EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.omitted, auto_monitor=True
)
nim_mode = Cpt(
EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.omitted, auto_monitor=True
)
polarity = Cpt(
EpicsSignal,
"OutputPolarityBI",
write_pv="OutputPolarityBO",
name="polarity",
kind=Kind.omitted,
auto_monitor=True,
)
amplitude = Cpt(
EpicsSignal,
"OutputAmpAI",
write_pv="OutputAmpAO",
name="amplitude",
kind=Kind.omitted,
auto_monitor=True,
)
offset = Cpt(
EpicsSignal,
"OutputOffsetAI",
write_pv="OutputOffsetAO",
name="offset",
kind=Kind.omitted,
auto_monitor=True,
)
class Channel(Device):
"""
Represents a single channel A, B, etc. of the delay generator.
"""
setpoint = Cpt(
EpicsSignal,
write_pv="DelayAO",
read_pv="DelayAI",
put_complete=True,
auto_monitor=True,
kind=Kind.omitted,
doc="Value for the channel",
)
reference = Cpt(
EpicsSignal,
"ReferenceMO",
put_complete=True,
kind=Kind.omitted,
auto_monitor=True,
doc="Reference channel for the channel", # Check defaults, possible should be T0,A,B,...
)
def __init__(self, *args, **kwargs):
"""
Initialize the channel with a setpoint and reference signal.
"""
# The read PV in EpicsSignal does not receive the prefix.. so we need to add it manually.
self.__class__.__dict__["setpoint"].kwargs["read_pv"] = args[0] + "DelayAI"
super().__init__(*args, **kwargs)
class WidthSignal(Signal):
"""A signal that represents the width of a channel."""
def get(self, **kwargs) -> float:
"""
Get the width of the channel.
Returns:
float: The width of the channel in seconds.
"""
parent: _DelayPairBase = self._parent # type: ignore
return parent.ch2.setpoint.get() - parent.ch1.setpoint.get() # type: ignore
def check_value(self, value: float) -> float:
"""Check if the value is larger equal to 0"""
if value >= 0:
return value
else:
raise ValueError(f"Width must be larger ot equal 0, got {value} seconds.")
def put(self, value: float, **kwargs):
"""
Set the width of the channel.
Args:
value (float): The width to set in seconds.
"""
self.check_value(value)
parent: _DelayPairBase = self._parent # type: ignore
ch1_setpoint: float = parent.ch1.setpoint.get()# type: ignore
parent.ch2.setpoint.put(ch1_setpoint + value, **kwargs)
def set(self, value: float, **kwargs):
"""
Set the width of the channel.
Args:
value (float): The width to set in seconds.
"""
status = StatusBase()
self.put(value, **kwargs)
status.set_finished()
return status
class DelaySignal(Signal):
"""A signal that represents the delay of a channel."""
def get(self, **kwargs):
"""
Get the delay of the channel.
Returns:
float: The delay of the channel in seconds.
"""
parent: _DelayPairBase = self._parent # type: ignore
return parent.ch1.setpoint.get()
def put(self, value: float, **kwargs):
"""
Set the delay of the channel.
Args:
value (float): The delay to set in seconds.
"""
parent: _DelayPairBase = self._parent # type: ignore
parent.ch1.setpoint.put(value, **kwargs)
parent.ch2.setpoint.put(value + parent.width.get(), **kwargs)
def set(self, value: float, **kwargs):
"""
Set the width of the channel.
Args:
value (float): The width to set in seconds.
"""
status = StatusBase()
self.put(value, **kwargs)
status.set_finished()
return status
class _DelayPairBase(Device):
""" Base class for delay pairs. Children have to implement ch1,ch2 for
the respective delay channels. The class attributes have to be called
ch1, ch2 for width and delay signals to work."""
ch1: Cpt[Channel]
ch2: Cpt[Channel]
io: Cpt[StaticPair]
width = Cpt(WidthSignal, name="width", kind=Kind.config, doc="Width of TTL pulse")
delay = Cpt(DelaySignal, name="delay", kind=Kind.config, doc="Delay of TTL pulse")
class DelayPairAB(_DelayPairBase):
ch1 = Cpt(Channel, "A", name="A", kind=Kind.omitted)
ch2 = Cpt(Channel, "B", name="B", kind=Kind.omitted)
io = Cpt(StaticPair, "AB", name="io", kind=Kind.omitted)
class DelayPairCD(_DelayPairBase):
ch1 = Cpt(Channel, "C", name="C", kind=Kind.omitted)
ch2 = Cpt(Channel, "D", name="D", kind=Kind.omitted)
io = Cpt(StaticPair, "CD", name="io", kind=Kind.omitted)
class DelayPairEF(_DelayPairBase):
ch1 = Cpt(Channel, "E", name="E", kind=Kind.omitted)
ch2 = Cpt(Channel, "F", name="F", kind=Kind.omitted)
io = Cpt(StaticPair, "EF", name="io", kind=Kind.omitted)
class DelayPairGH(_DelayPairBase):
ch1 = Cpt(Channel, "G", name="G", kind=Kind.omitted)
ch2 = Cpt(Channel, "H", name="H", kind=Kind.omitted)
io = Cpt(StaticPair, "GH", name="io", kind=Kind.omitted)
class DelayGeneratorEventStatus(Device):
"""Subdevice to represent the event state of the delay generator."""
event_status = Cpt(
EpicsSignalRO, "EventStatusLI", name="event_status", kind=Kind.omitted, auto_monitor=True
)
proc_status = Cpt(
EpicsSignal, "EventStatusLI.PROC", name="proc_status", kind=Kind.omitted
)
class DelayGeneratorCSAXS(Device):
"""
Delay Generator Stanford Research DG645. This implements an interface for the DG645 delay generator.
Detailed information can be found in the manual:
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
The DG645 has 8 channels, each with a delay and pulse width. The channels are implemented as DelayPair objects (AB etc.).
Each pair has a TTL pulse width, delay and a reference signal to which they are being triggered.
In addition, the io layer allows setting amplitude, offset and polarity for each pair.
"""
_pv_timeout: float = 1.5 # Default timeout for PV operations in seconds
# Front Panel
t0 = Cpt(StaticPair, "T0", name="t0")
ab = Cpt(DelayPairAB, "", name="ab")
cd = Cpt(DelayPairCD, "", name="cd")
ef = Cpt(DelayPairEF, "", name="ef")
gh = Cpt(DelayPairGH, "", name="gh")
state = Cpt(DelayGeneratorEventStatus, "", name="state")
status_msg = Cpt(
EpicsSignalRO, "StatusSI", name="status_msg", kind=Kind.omitted, auto_monitor=True
)
status_msg_clear = Cpt(
EpicsSignal, "StatusClearBO", name="status_msg_clear", kind=Kind.omitted
)
trigger_holdoff = Cpt(
EpicsSignal,
"TriggerHoldoffAI",
write_pv="TriggerHoldoffAO",
name="trigger_holdoff",
kind=Kind.config,
)
trigger_inhibit = Cpt(
EpicsSignal,
"TriggerInhibitMI",
write_pv="TriggerInhibitMO",
name="trigger_inhibit",
kind=Kind.omitted,
)
trigger_source = Cpt(
EpicsSignal,
"TriggerSourceMI",
write_pv="TriggerSourceMO",
name="trigger_source",
kind=Kind.omitted,
doc="Trigger Source for the DDG, options in TRIGGERSOURCE"
)
trigger_level = Cpt(
EpicsSignal,
"TriggerLevelAI",
write_pv="TriggerLevelAO",
name="trigger_level",
kind=Kind.omitted,
)
trigger_rate = Cpt(
EpicsSignal,
"TriggerRateAI",
write_pv="TriggerRateAO",
name="trigger_rate",
kind=Kind.omitted,
)
trigger_shot = Cpt(
EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind=Kind.omitted
)
burst_mode = Cpt(
EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burst_mode", kind=Kind.omitted
)
burst_config = Cpt(
EpicsSignal,
"BurstConfigBI",
write_pv="BurstConfigBO",
name="burst_config",
kind=Kind.omitted,
)
burst_count = Cpt(
EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burst_count", kind=Kind.omitted
)
burst_delay = Cpt(
EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burst_delay", kind=Kind.omitted
)
burst_period = Cpt(
EpicsSignal,
"BurstPeriodAI",
write_pv="BurstPeriodAO",
name="burst_period",
kind=Kind.omitted,
)
def proc_event_status(self) -> None:
"""The reading must be manually triggered to update the event status."""
self.state.proc_status.put(1)
def wait_for_event_status(
self, value: STATUSBITS, timeout: float | None = None
) -> StatusBitsCompareStatus:
"""
Wait for a specific event status.
Args:
value (STATUSBITS): The status bits to wait for.
timeout (float): The maximum time to wait in seconds.
"""
return StatusBitsCompareStatus(
signal=self.state.event_status, value=value, timeout=timeout, run=True
)
def set_trigger(self, source: TRIGGERSOURCE | int) -> None:
"""
Set the trigger source.
Args:
source (TriggerSource | int): The trigger source
INTERNAL = 0
EXT_RISING_EDGE = 1
EXT_FALLING_EDGE = 2
SS_EXT_RISING_EDGE = 3
SS_EXT_FALLING_EDGE = 4
SINGLE_SHOT = 5
LINE = 6
"""
if isinstance(source, TRIGGERSOURCE):
self.trigger_source.set(source.value).wait(self._pv_timeout)
else:
self.trigger_source.set(int(source)).wait(self._pv_timeout)
@typechecked
def burst_enable(
self,
count: int,
delay: float,
period: float,
config: Literal["all", "first"] | BURSTCONFIG = "first",
) -> None:
"""Enable burst mode with valid parameters.
Args:
count (int): Number of bursts >0
delay (float): Delay before bursts start in seconds >=0
period (float): Period of the bursts in seconds >0
config (str): Configuration of T0 duiring burst.
In addition, to simplify triggering of other instruments synchronously with the burst,
the T0 output may be configured to fire on the first delay cycle of the burst,
rather than for all delay cycles as is normally the case. BURSTCONFIG
"""
# Check inputs first
if count <= 0:
raise ValueError(f"Count must be >0, provided: {count}")
if delay < 0:
raise ValueError(f"Delay must be >=0, provided: {delay}")
if period <= 0:
raise ValueError(f"Period must be >0, provided: {period}")
self.burst_mode.set(1).wait(timeout=self._pv_timeout)
self.burst_count.set(count).wait(timeout=self._pv_timeout)
self.burst_delay.set(delay).wait(timeout=self._pv_timeout)
self.burst_period.set(period).wait(timeout=self._pv_timeout)
if config == "all":
self.burst_config.set(BURSTCONFIG.ALL_CYCLES.value).wait(timeout=self._pv_timeout)
elif config == "first":
self.burst_config.set(BURSTCONFIG.FIRST_CYCLE.value).wait(timeout=self._pv_timeout)
def burst_disable(self) -> None:
"""Disable burst mode"""
self.burst_mode.set(0).wait(timeout=self._pv_timeout)
@typechecked
def set_io_values(
self,
channel: (
AllChannelNames | list[AllChannelNames]
),
amplitude: float | None = None,
offset: float | None = None,
polarity: OUTPUTPOLARITY | Literal[0, 1] | None = None,
mode: Literal["ttl", "nim"] | None = None,
) -> None:
"""Set the IO values for the static pair.
Args:
channel (str | list[str]): Channel(s) to set the IO values for.
Can be "t0", "ab", "cd", "ef", "gh" or a list of these.
If a list is provided, the same values will be set for all channels.
amplitude (float): Amplitude of the output signal in volts.
offset (float): Offset of the output signal in volts.
polarity (OUTPUTPOLARITY | int): Polarity of the output signal.
ttl_mode (bool): If True, set the output to TTL mode.
nim_mode (bool): If True, set the output to NIM mode.
If both ttl_mode and nim_mode are set to True,
a ValueError is raised.
"""
if isinstance(channel, str):
channel = [channel]
for ch in channel:
if ch == "t0":
io_channel = self.t0
else:
io_channel = getattr(getattr(self, ch), "io")
if amplitude is not None:
io_channel.amplitude.set(amplitude).wait(timeout=self._pv_timeout)
if offset is not None:
io_channel.offset.set(offset).wait(timeout=self._pv_timeout)
if polarity is not None:
if isinstance(polarity, OUTPUTPOLARITY):
io_channel.polarity.set(polarity.value).wait(timeout=self._pv_timeout)
else:
io_channel.polarity.set(int(polarity)).wait(timeout=self._pv_timeout)
if mode == "ttl":
io_channel.ttl_mode.set(1).wait(timeout=self._pv_timeout)
if mode == "nim":
io_channel.nim_mode.set(1).wait(timeout=self._pv_timeout)
def set_delay_pairs(
self,
channel: DelayChannelNames | list[DelayChannelNames],
delay: float | list[float] | None = None,
width: float | list[float] | None = None,
) -> None:
"""Set the delay and width for a specific channel pair.
Args:
channel (str): Channel pair to set the delay and width for.
Can be "ab", "cd", "ef", "gh".
delay (float): Delay in seconds to set for the channel pair.
width (float): Width in seconds to set for the channel pair.
"""
if isinstance(channel, str):
channel = [channel]
if isinstance(delay, (float, int)):
delay = [float(delay)] * len(channel)
if isinstance(width, (float,int)):
width = [float(width)] * len(channel)
if delay is not None:
if len(delay) != len(channel):
raise ValueError(
f"Length of delay {len(delay)} must match length of channel {len(channel)}."
)
for ii, ch in enumerate(channel):
delay_channel=getattr(self, ch)
delay_channel.delay.put(delay[ii])
if width is not None:
if len(width) != len(channel):
raise ValueError(
f"Length of width {len(width)} must match length of channel {len(channel)}."
)
logger.info(f"setting widths of channels {channel} to {width}")
for ii, ch in enumerate(channel):
delay_channel= getattr(self, ch)
delay_channel.width.put(width[ii])
def _get_literal_channel(self, channel: LiteralChannels) -> Channel:
return {
"A": self.ab.ch1,
"B": self.ab.ch2,
"C": self.cd.ch1,
"D": self.cd.ch2,
"E": self.ef.ch1,
"F": self.ef.ch2,
"G": self.gh.ch1,
"H": self.gh.ch2,
}[channel]
def set_channel_reference(self, channel: LiteralChannels, reference_channel: CHANNELREFERENCE):
self._get_literal_channel(channel).reference.put(reference_channel.value)
def set_references_for_channels(self, channels_and_refs: list[tuple[LiteralChannels, CHANNELREFERENCE]]):
for ch, ref in channels_and_refs:
self.set_channel_reference(ch, ref)
if __name__ == "__main__":
ddg = DelayGeneratorCSAXS(name="ddg", prefix="X12SA-CPCL-DDG1:")
ddg.wait_for_connection(all_signals=True, timeout=30)
ddg.summary()

View File

@@ -1,276 +1,128 @@
# pylint: skip-file
import threading
from typing import Generator
from unittest import mock
import numpy as np
import ophyd
import pytest
from ophyd_devices.devices.delay_generator_645 import TriggerSource
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from csaxs_bec.devices.epics.delay_generator_csaxs import DDGSetup
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
BURSTCONFIG,
STATUSBITS,
TRIGGERSOURCE,
DelayGeneratorCSAXS,
)
@pytest.fixture(scope="function")
def mock_DDGSetup():
mock_ddg = mock.MagicMock()
yield DDGSetup(parent=mock_ddg)
def mock_ddg() -> Generator[DelayGeneratorCSAXS, DelayGeneratorCSAXS, DelayGeneratorCSAXS]:
"""Fixture to mock the camera device."""
name = "ddg"
prefix = "test:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = DelayGeneratorCSAXS(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
# Fixture for scaninfo
@pytest.fixture(
params=[
{
"scan_id": "1234",
"scan_type": "step",
"num_points": 500,
"frames_per_trigger": 1,
"exp_time": 0.1,
"readout_time": 0.1,
},
{
"scan_id": "1234",
"scan_type": "step",
"num_points": 500,
"frames_per_trigger": 5,
"exp_time": 0.01,
"readout_time": 0,
},
{
"scan_id": "1234",
"scan_type": "fly",
"num_points": 500,
"frames_per_trigger": 1,
"exp_time": 1,
"readout_time": 0.2,
},
{
"scan_id": "1234",
"scan_type": "fly",
"num_points": 500,
"frames_per_trigger": 5,
"exp_time": 0.1,
"readout_time": 0.4,
},
]
)
def scaninfo(request):
return request.param
def test_ddg_init(mock_ddg):
"""Test the proc event status method."""
assert mock_ddg.name == "ddg"
assert mock_ddg.prefix == "test:"
# Fixture for DDG config default values
@pytest.fixture(
params=[
{
"delay_burst": 0.0,
"delta_width": 0.0,
"additional_triggers": 0,
"polarity": [0, 0, 0, 0, 0],
"amplitude": 0.0,
"offset": 0.0,
"thres_trig_level": 0.0,
},
{
"delay_burst": 0.1,
"delta_width": 0.1,
"additional_triggers": 1,
"polarity": [0, 0, 1, 0, 0],
"amplitude": 5,
"offset": 0.0,
"thres_trig_level": 2.5,
},
]
)
def ddg_config_defaults(request):
return request.param
def test_ddg_proc_event_status(mock_ddg):
"""Test the proc event status method."""
mock_ddg.state.proc_status.put(0)
mock_ddg.proc_event_status()
assert mock_ddg.state.proc_status.get() == 1
# Fixture for DDG config scan values
@pytest.fixture(
params=[
{
"fixed_ttl_width": [0, 0, 0, 0, 0],
"trigger_width": None,
"set_high_on_exposure": False,
"set_high_on_stage": False,
"set_trigger_source": "SINGLE_SHOT",
"premove_trigger": False,
},
{
"fixed_ttl_width": [0, 0, 0, 0, 0],
"trigger_width": 0.1,
"set_high_on_exposure": True,
"set_high_on_stage": False,
"set_trigger_source": "SINGLE_SHOT",
"premove_trigger": True,
},
{
"fixed_ttl_width": [0, 0, 0, 0, 0],
"trigger_width": 0.1,
"set_high_on_exposure": False,
"set_high_on_stage": False,
"set_trigger_source": "EXT_RISING_EDGE",
"premove_trigger": False,
},
]
)
def ddg_config_scan(request):
return request.param
def test_ddg_set_trigger(mock_ddg):
"""Test setting the trigger."""
for trigger in TRIGGERSOURCE:
mock_ddg.set_trigger(trigger)
assert mock_ddg.trigger_source.get() == trigger.value
# Fixture for delay pairs
@pytest.fixture(
params=[
{"all_channels": ["channelAB", "channelCD"], "all_delay_pairs": ["AB", "CD"]},
{"all_channels": [], "all_delay_pairs": []},
{"all_channels": ["channelT0", "channelAB", "channelCD"], "all_delay_pairs": ["AB", "CD"]},
]
)
def channel_pairs(request):
return request.param
def test_ddg_burst_enable(mock_ddg):
"""Test enabling burst mode."""
mock_ddg.burst_enable(count=100, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
mock_ddg.burst_mode.get() == 1
assert mock_ddg.burst_count.get() == 100
assert mock_ddg.burst_delay.get() == 0.1
assert mock_ddg.burst_period.get() == 0.02
assert mock_ddg.burst_config.get() == BURSTCONFIG.ALL_CYCLES.value
assert mock_ddg.burst_mode.get() == 1
# Count is 0
with pytest.raises(ValueError):
mock_ddg.burst_enable(count=0, delay=0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
# delay is negative
with pytest.raises(ValueError):
mock_ddg.burst_enable(count=100, delay=-0.1, period=0.02, config=BURSTCONFIG.ALL_CYCLES)
# period is zero
with pytest.raises(ValueError):
mock_ddg.burst_enable(count=100, delay=0.1, period=0, config=BURSTCONFIG.ALL_CYCLES)
# Works with default config
mock_ddg.burst_enable(count=100, delay=0.1, period=0.02)
mock_ddg.burst_mode.get() == BURSTCONFIG.FIRST_CYCLE.value
def test_on_pre_scan(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan):
"""Test the check_scan_id method."""
# Set first attributes of parent class
for k, v in scaninfo.items():
setattr(mock_DDGSetup.parent.scaninfo, k, v)
for k, v in ddg_config_defaults.items():
getattr(mock_DDGSetup.parent, k).get.return_value = v
for k, v in ddg_config_scan.items():
getattr(mock_DDGSetup.parent, k).get.return_value = v
# Call the function you want to test
mock_DDGSetup.on_pre_scan()
if ddg_config_scan["premove_trigger"]:
mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1)
def test_ddg_wait_for_event_status(mock_ddg):
"""Test setting wait for event status."""
mock_ddg: DelayGeneratorCSAXS
mock_ddg.state.event_status._read_pv.mock_data = 0
status = mock_ddg.wait_for_event_status(value=STATUSBITS.END_OF_BURST) # 8
assert status.done is False
mock_ddg.state.event_status._read_pv.mock_data = 1
assert status.done is False
mock_ddg.state.event_status._read_pv.mock_data = 4
assert status.done is False
# TODO enable once callback for MockPV is implemented
# mock_ddg.state.event_status._read_pv.mock_data = 13 # 8 + 4 + 1
# status.wait(timeout=1) # Wait for the status to be done
# assert status.done is True
# TODO put back once the logic is implemented
# @pytest.mark.parametrize("source", ["SINGLE_SHOT", "EXT_RISING_EDGE"])
# def test_on_trigger(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, source):
# """Test the on_trigger method."""
# # Set first attributes of parent class
# for k, v in scaninfo.items():
# setattr(mock_DDGSetup.parent.scaninfo, k, v)
# for k, v in ddg_config_defaults.items():
# getattr(mock_DDGSetup.parent, k).get.return_value = v
# for k, v in ddg_config_scan.items():
# getattr(mock_DDGSetup.parent, k).get.return_value = v
# # Call the function you want to test
# mock_DDGSetup.parent.source.name = "source"
# mock_DDGSetup.parent.source.read.return_value = {
# mock_DDGSetup.parent.source.name: {"value": getattr(TriggerSource, source)}
# }
# mock_DDGSetup.on_trigger()
# if source == "SINGLE_SHOT":
# mock_DDGSetup.parent.trigger_shot.put.assert_called_once_with(1)
def test_ddg_set_io_values(mock_ddg):
"""Test setting IO values."""
mock_ddg.set_io_values(channel="AB", amplitude=3, offset=2, polarity=1, mode="ttl")
assert mock_ddg.AB.io.amplitude.get() == 3
assert mock_ddg.AB.io.offset.get() == 2
assert mock_ddg.AB.io.polarity.get() == 1
assert mock_ddg.AB.io.ttl_mode.get() == 1
# List of channels
channels = ["AB", "CD", "T0"]
mock_ddg.set_io_values(channel=channels, amplitude=3, offset=2, polarity=1, mode="nim")
for channel in channels:
if channel == "T0":
attr = getattr(mock_ddg, channel)
else:
attr = getattr(mock_ddg, channel).io
assert attr.amplitude.get() == 3
assert attr.offset.get() == 2
assert attr.polarity.get() == 1
assert attr.nim_mode.get() == 1
def test_on_wait_for_connection(
mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs
):
"""Test the initialize_default_parameter method."""
# Set first attributes of parent class
for k, v in scaninfo.items():
setattr(mock_DDGSetup.parent.scaninfo, k, v)
for k, v in ddg_config_defaults.items():
getattr(mock_DDGSetup.parent, k).get.return_value = v
for k, v in ddg_config_scan.items():
getattr(mock_DDGSetup.parent, k).get.return_value = v
# Call the function you want to test
mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"]
mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"]
calls = []
calls.extend(
[
mock.call("polarity", ddg_config_defaults["polarity"][ii], [channel])
for ii, channel in enumerate(channel_pairs["all_channels"])
]
)
calls.extend([mock.call("amplitude", ddg_config_defaults["amplitude"])])
calls.extend([mock.call("offset", ddg_config_defaults["offset"])])
calls.extend(
[
mock.call(
"reference", 0, [f"channel{pair}.ch1" for pair in channel_pairs["all_delay_pairs"]]
)
]
)
calls.extend(
[
mock.call(
"reference", 0, [f"channel{pair}.ch2" for pair in channel_pairs["all_delay_pairs"]]
)
]
)
mock_DDGSetup.on_wait_for_connection()
mock_DDGSetup.parent.set_channels.assert_has_calls(calls)
# TODO put back once the logic is implemented
# def test_on_stage(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs):
# """Test the prepare_ddg method."""
# # Set first attributes of parent class
# for k, v in scaninfo.items():
# setattr(mock_DDGSetup.parent.scaninfo, k, v)
# for k, v in ddg_config_defaults.items():
# getattr(mock_DDGSetup.parent, k).get.return_value = v
# for k, v in ddg_config_scan.items():
# getattr(mock_DDGSetup.parent, k).get.return_value = v
# # Call the function you want to test
# mock_DDGSetup.parent.all_channels = channel_pairs["all_channels"]
# mock_DDGSetup.parent.all_delay_pairs = channel_pairs["all_delay_pairs"]
# mock_DDGSetup.prepare_ddg()
# mock_DDGSetup.parent.set_trigger.assert_called_once_with(
# getattr(TriggerSource, ddg_config_scan["set_trigger_source"])
# )
# if scaninfo["scan_type"] == "step":
# if ddg_config_scan["set_high_on_exposure"]:
# num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"]
# exp_time = ddg_config_defaults["delta_width"] + scaninfo["frames_per_trigger"] * (
# scaninfo["exp_time"] + scaninfo["readout_time"]
# )
# total_exposure = exp_time
# delay_burst = ddg_config_defaults["delay_burst"]
# else:
# exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"]
# total_exposure = exp_time + scaninfo["readout_time"]
# delay_burst = ddg_config_defaults["delay_burst"]
# num_burst_cycle = (
# scaninfo["frames_per_trigger"] + ddg_config_defaults["additional_triggers"]
# )
# elif scaninfo["scan_type"] == "fly":
# if ddg_config_scan["set_high_on_exposure"]:
# num_burst_cycle = 1 + ddg_config_defaults["additional_triggers"]
# exp_time = (
# ddg_config_defaults["delta_width"]
# + scaninfo["num_points"] * scaninfo["exp_time"]
# + (scaninfo["num_points"] - 1) * scaninfo["readout_time"]
# )
# total_exposure = exp_time
# delay_burst = ddg_config_defaults["delay_burst"]
# else:
# exp_time = ddg_config_defaults["delta_width"] + scaninfo["exp_time"]
# total_exposure = exp_time + scaninfo["readout_time"]
# delay_burst = ddg_config_defaults["delay_burst"]
# num_burst_cycle = scaninfo["num_points"] + ddg_config_defaults["additional_triggers"]
# # mock_DDGSetup.parent.burst_enable.assert_called_once_with(
# # mock.call(num_burst_cycle, delay_burst, total_exposure, config="first")
# # )
# mock_DDGSetup.parent.burst_enable.assert_called_once_with(
# num_burst_cycle, delay_burst, total_exposure, config="first"
# )
# if not ddg_config_scan["trigger_width"]:
# call = mock.call("width", exp_time)
# assert call in mock_DDGSetup.parent.set_channels.mock_calls
# else:
# call = mock.call("width", ddg_config_scan["trigger_width"])
# assert call in mock_DDGSetup.parent.set_channels.mock_calls
# if ddg_config_scan["set_high_on_exposure"]:
# calls = [
# mock.call("width", value, channels=[channel])
# for value, channel in zip(
# ddg_config_scan["fixed_ttl_width"], channel_pairs["all_channels"]
# )
# if value != 0
# ]
# if calls:
# assert all(calls in mock_DDGSetup.parent.set_channels.mock_calls)
def test_ddg_set_delay_pairs(mock_ddg):
"""Test setting delay pairs."""
mock_ddg.set_delay_pairs(channel="AB", delay=0.1, width=0.2)
assert np.isclose(mock_ddg.AB.delay.get(), 0.1)
assert np.isclose(mock_ddg.AB.width.get(), 0.2)
assert np.isclose(mock_ddg.AB.ch1.setpoint.get(), 0.1)
assert np.isclose(mock_ddg.AB.ch2.setpoint.get(), 0.3)
# List of channels
channels = ["AB", "CD", "EF", "GH"]
delays = [0.1, 0.2, 0.4, 0.5]
mock_ddg.set_delay_pairs(channel=channels, delay=delays, width=0.2)
for delay, channel in zip(delays, channels):
assert np.isclose(getattr(mock_ddg, channel).delay.get(), delay)
assert np.isclose(getattr(mock_ddg, channel).width.get(), 0.2)
assert np.isclose(getattr(mock_ddg, channel).ch1.setpoint.get(), delay)
assert np.isclose(getattr(mock_ddg, channel).ch2.setpoint.get(), delay + 0.2)