refactor(ddg): final refactoring and cleanup

This commit is contained in:
2025-07-10 15:48:41 +02:00
parent 293e56fba8
commit 1c539a1e9c
6 changed files with 293 additions and 143 deletions

View File

@@ -1,34 +0,0 @@
ddg:
description: 'CSAXS master delay generator'
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.ddg_master.DDGMaster
deviceConfig:
prefix: "X12SA-CPCL-DDG1:"
enabled: true
readOnly: false
onFailure: raise
readoutPriority: baseline
softwareTrigger: true
samx:
readoutPriority: baseline
deviceClass: ophyd_devices.SimPositioner
deviceConfig:
delay: 1
limits:
- -50
- 50
tolerance: 0.01
update_frequency: 400
deviceTags:
- user motors
enabled: true
readOnly: false
bpm4i:
readoutPriority: monitored
deviceClass: ophyd_devices.SimMonitor
deviceConfig:
deviceTags:
- beamline
enabled: true
readOnly: false

View File

@@ -0,0 +1 @@
from .ddg_master import DDGMaster

View File

@@ -1,77 +1,92 @@
from threading import Event, Thread
from time import sleep
from ophyd import DeviceStatus, StatusBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import DelayGeneratorCSAXS, ChannelConfig, AllChannelNames, OUTPUTPOLARITY, TRIGGERSOURCE, StatusBitsCompareStatus, STATUSBITS, CHANNELREFERENCE
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from bec_lib.logger import bec_logger
import atexit
import time
from threading import Event, Thread
from bec_lib.logger import bec_logger
from ophyd import DeviceStatus, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
CHANNELREFERENCE,
OUTPUTPOLARITY,
STATUSBITS,
TRIGGERSOURCE,
AllChannelNames,
ChannelConfig,
DelayGeneratorCSAXS,
StatusBitsCompareStatus,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.error_registry import ERROR_CODES
logger = bec_logger.logger
_DEFAULT_CHANNEL_CONFIG: ChannelConfig = {
"amplitude":5.0,
"offset":0.0,
"polarity": OUTPUTPOLARITY.POSITIVE,
"mode":"ttl",
}
"amplitude": 5.0,
"offset": 0.0,
"polarity": OUTPUTPOLARITY.POSITIVE,
"mode": "ttl",
}
DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
"t0":_DEFAULT_CHANNEL_CONFIG,
"ab":_DEFAULT_CHANNEL_CONFIG,
"cd":_DEFAULT_CHANNEL_CONFIG,
"ef":_DEFAULT_CHANNEL_CONFIG,
"gh":_DEFAULT_CHANNEL_CONFIG,
"t0": _DEFAULT_CHANNEL_CONFIG,
"ab": _DEFAULT_CHANNEL_CONFIG,
"cd": _DEFAULT_CHANNEL_CONFIG,
"ef": _DEFAULT_CHANNEL_CONFIG,
"gh": _DEFAULT_CHANNEL_CONFIG,
}
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT
DEFAULT_DEAD_TIMES_S = {
"ab":1e-3,
"cd":1e-3,
"ef":1e-3,
"gh":1e-3,
}
DEFAULT_DEAD_TIMES_S = {"ab": 1e-3, "cd": 1e-3, "ef": 1e-3, "gh": 1e-3}
class DDGMaster(PSIDeviceBase, DelayGeneratorCSAXS):
"""
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1
"""
# pylint: disable=attribute-defined-outside-init
def on_connected(self) -> None:
"""Set the default values on the device - intended to overwrite everything to a usable default state.
Sets DEFAULT_IO_CONFIG into each channel,
sets the trigger source to DEFAULT_TRIGGER_SOURCE,
sets the trigger source to DEFAULT_TRIGGER_SOURCE,
and turns off burst mode"""
self.burst_disable() # it is possible to miss setting settings if burst is enabled
self.burst_disable() # it is possible to miss setting settings if burst is enabled
for channel, config in DEFAULT_IO_CONFIG.items():
self.set_io_values(channel, **config)
self.set_io_values(channel, **config)
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
self.set_references_for_channels(
[("A", CHANNELREFERENCE.T0),
[
("A", CHANNELREFERENCE.T0),
("B", CHANNELREFERENCE.A),
("C", CHANNELREFERENCE.T0),
("D", CHANNELREFERENCE.C),
("E", CHANNELREFERENCE.T0),
("F", CHANNELREFERENCE.E),
("G", CHANNELREFERENCE.T0),
("H", CHANNELREFERENCE.G)]
("H", CHANNELREFERENCE.G),
]
)
# Start background thread to poll status register
self._status_polling_stop_event = Event()
self._status_polling_thread = Thread(target=self._poll_status) # TODO if exit is called, ca_context from pyepics seems unset.. Hook to kill thread?
self._status_polling_thread = Thread(target=self._poll_status)
self._status_polling_thread.start()
atexit.register(self.on_destroy)
def _poll_status(self):
"""The status register has to be actively pulled, triggering the proc_status results in
"""The status register has to be actively pulled, triggering the proc_status results in
event_status being updated, which in turn allows the StatusBitsCompareStatus from on_trigger
to be updated and eventually resolve."""
dispatcher = self.state.proc_status.cl.get_dispatcher()
event = dispatcher.stop_event
while not (self._status_polling_stop_event.is_set() or event.is_set()):
self.state.proc_status.put(1)
sleep(1/5) # poll the status at 5 Hz
try:
# Call with timeout to avoid blocking in shutdown
self.state.proc_status.put(1, timeout=1)
except TimeoutError:
# If any of the stop events are set, stop polling
if self._status_polling_stop_event.is_set() or event.is_set():
logger.info("Exiting _poll_status thread loop for DDG.")
break
time.sleep(1 / 5) # poll the status at 5 Hz
def on_stage(self) -> DeviceStatus | StatusBase | None:
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
@@ -80,10 +95,10 @@ class DDGMaster(PSIDeviceBase, DelayGeneratorCSAXS):
if readout_time is not None and readout_time != 0:
# If we are given a single readout time from BEC, use it for all 4 channels
pulse_widths = [exp_time - readout_time]*len(DEFAULT_DEAD_TIMES_S)
pulse_widths = [exp_time - readout_time] * len(DEFAULT_DEAD_TIMES_S)
else:
# Otherwise, derive the pulse widths from the default dead times defined above
pulse_widths = [exp_time - DEFAULT_DEAD_TIMES_S[ch] for ch in DEFAULT_DEAD_TIMES_S.keys()]
pulse_widths = [exp_time - DEFAULT_DEAD_TIMES_S[ch] for ch in DEFAULT_DEAD_TIMES_S]
logger.info(f"setting pulse widths to {pulse_widths}")
self.set_delay_pairs(["ab", "cd", "ef", "gh"], delay=0, width=pulse_widths)
self.burst_enable(count=frames_per_trigger, delay=0, period=exp_time)
@@ -93,13 +108,18 @@ class DDGMaster(PSIDeviceBase, DelayGeneratorCSAXS):
self.cancel_on_stop(st)
self.trigger_shot.put(1)
return st
def on_destroy(self) -> None:
if getattr(self, "_status_polling_stop_event", None) is not None:
self._status_polling_stop_event.set()
if getattr(self, "_status_polling_thread", None) is not None:
self._status_polling_thread.join(timeout=3)
def on_stop(self) -> None:
"""Stop the delay generator by setting the burst mode to 0"""
self.stop_ddg()
if __name__ == "__main__":
ddg = DDGMaster(name="ddg", prefix="X12SA-CPCL-DDG1:")
ddg.wait_for_connection(all_signals=True, timeout=30)

View File

@@ -8,16 +8,19 @@ https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
import enum
from typing import Literal, TypedDict
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind, Signal
from ophyd_devices import StatusBase, SubscriptionStatus
from typeguard import typechecked
from bec_lib.logger import bec_logger
from csaxs_bec.devices.epics.delay_generator_csaxs.error_registry import ERROR_CODES
logger = bec_logger.logger
DelayChannelNames = Literal["ab", "cd", "ef", "gh"]
AllChannelNames = Literal["t0", "ab", "cd", "ef", "gh"]
LiteralChannels = Literal["A","B","C","D","E","F","G","H"]
LiteralChannels = Literal["A", "B", "C", "D", "E", "F", "G", "H"]
class CHANNELREFERENCE(enum.Enum):
T0 = 0
@@ -30,6 +33,7 @@ class CHANNELREFERENCE(enum.Enum):
G = 7
H = 8
class BURSTCONFIG(enum.Enum):
"""Enum option for burst_config signal of the delay generator.
@@ -105,9 +109,10 @@ class StatusBitsCompareStatus(SubscriptionStatus):
self,
signal: EpicsSignalRO,
value: STATUSBITS,
raise_states: list[STATUSBITS] | None = None,
*args,
event_type=None,
timeout: float|None = None,
timeout: float | None = None,
settle_time: float = 0,
run: bool = True,
**kwargs,
@@ -115,6 +120,7 @@ class StatusBitsCompareStatus(SubscriptionStatus):
"""Initialize the compare status with a signal."""
self._signal = signal
self._value = value
self._raise_states = raise_states or []
super().__init__(
device=signal,
callback=self._compare_callback,
@@ -126,28 +132,43 @@ class StatusBitsCompareStatus(SubscriptionStatus):
def _compare_callback(self, value, **kwargs) -> bool:
"""Callback for subscription status"""
if any((STATUSBITS(value) & state) == state for state in self._raise_states):
self.set_exception(
ValueError(
f"Status bits {STATUSBITS(value).describe()} raised an exception: {self._raise_states}"
)
)
return False
return (STATUSBITS(value) & self._value) == self._value
class ChannelConfig(TypedDict):
amplitude: float | None
offset: float | None
polarity: OUTPUTPOLARITY | Literal[0, 1] | None
mode: Literal["ttl", "nim"] | None
amplitude: float | None
offset: float | None
polarity: OUTPUTPOLARITY | Literal[0, 1] | None
mode: Literal["ttl", "nim"] | None
class StaticPair(Device):
"""
Class to represent a static pair.
Class to represent a static pair (T0, aswell as all AB, CB, EF, GH channels).
It allows setting the logic levels, but the timing is fixed.
The signal is high after receiving the trigger until the end of the holdoff period.
"""
ttl_mode = Cpt(
EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.omitted, auto_monitor=True
EpicsSignal,
"OutputModeTtlSS.PROC",
kind=Kind.omitted,
auto_monitor=True,
doc="Set the output mode to TTL",
)
nim_mode = Cpt(
EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.omitted, auto_monitor=True
EpicsSignal,
"OutputModeNimSS.PROC",
kind=Kind.omitted,
auto_monitor=True,
doc="Set the output mode to NIM",
)
polarity = Cpt(
EpicsSignal,
@@ -156,6 +177,7 @@ class StaticPair(Device):
name="polarity",
kind=Kind.omitted,
auto_monitor=True,
doc="Control the polarity of the output signal. POS 1 or NEG 0",
)
amplitude = Cpt(
@@ -165,6 +187,7 @@ class StaticPair(Device):
name="amplitude",
kind=Kind.omitted,
auto_monitor=True,
doc="Amplitude of the output signal in volts.",
)
offset = Cpt(
@@ -174,12 +197,13 @@ class StaticPair(Device):
name="offset",
kind=Kind.omitted,
auto_monitor=True,
doc="Offset of the output signal in volts.",
)
class Channel(Device):
"""
Represents a single channel A, B, etc. of the delay generator.
Represents a single channel A, B, C, ... of the delay generator.
"""
setpoint = Cpt(
@@ -189,7 +213,7 @@ class Channel(Device):
put_complete=True,
auto_monitor=True,
kind=Kind.omitted,
doc="Value for the channel",
doc="Setpoint value for the delay of the channel",
)
reference = Cpt(
EpicsSignal,
@@ -197,7 +221,7 @@ class Channel(Device):
put_complete=True,
kind=Kind.omitted,
auto_monitor=True,
doc="Reference channel for the channel", # Check defaults, possible should be T0,A,B,...
doc="Reference channel T0,A,B,.. for the delay of the setpoint",
)
def __init__(self, *args, **kwargs):
@@ -219,8 +243,8 @@ class WidthSignal(Signal):
Returns:
float: The width of the channel in seconds.
"""
parent: _DelayPairBase = self._parent # type: ignore
return parent.ch2.setpoint.get() - parent.ch1.setpoint.get() # type: ignore
parent: _DelayPairBase = self._parent # type: ignore
return parent.ch2.setpoint.get() - parent.ch1.setpoint.get() # type: ignore
def check_value(self, value: float) -> float:
"""Check if the value is larger equal to 0"""
@@ -237,8 +261,8 @@ class WidthSignal(Signal):
value (float): The width to set in seconds.
"""
self.check_value(value)
parent: _DelayPairBase = self._parent # type: ignore
ch1_setpoint: float = parent.ch1.setpoint.get()# type: ignore
parent: _DelayPairBase = self._parent # type: ignore
ch1_setpoint: float = parent.ch1.setpoint.get() # type: ignore
parent.ch2.setpoint.put(ch1_setpoint + value, **kwargs)
def set(self, value: float, **kwargs):
@@ -264,7 +288,7 @@ class DelaySignal(Signal):
Returns:
float: The delay of the channel in seconds.
"""
parent: _DelayPairBase = self._parent # type: ignore
parent: _DelayPairBase = self._parent # type: ignore
return parent.ch1.setpoint.get()
def put(self, value: float, **kwargs):
@@ -274,7 +298,7 @@ class DelaySignal(Signal):
Args:
value (float): The delay to set in seconds.
"""
parent: _DelayPairBase = self._parent # type: ignore
parent: _DelayPairBase = self._parent # type: ignore
parent.ch1.setpoint.put(value, **kwargs)
parent.ch2.setpoint.put(value + parent.width.get(), **kwargs)
@@ -289,51 +313,69 @@ class DelaySignal(Signal):
self.put(value, **kwargs)
status.set_finished()
return status
class _DelayPairBase(Device):
""" Base class for delay pairs. Children have to implement ch1,ch2 for
"""Base class for delay pairs. Children have to implement ch1,ch2 for
the respective delay channels. The class attributes have to be called
ch1, ch2 for width and delay signals to work."""
ch1: Cpt[Channel]
ch2: Cpt[Channel]
io: Cpt[StaticPair]
width = Cpt(WidthSignal, name="width", kind=Kind.config, doc="Width of TTL pulse")
delay = Cpt(DelaySignal, name="delay", kind=Kind.config, doc="Delay of TTL pulse")
width = Cpt(
WidthSignal, name="width", kind=Kind.config, doc="Width of TTL pulse for delay pair"
)
delay = Cpt(
DelaySignal, name="delay", kind=Kind.config, doc="Delay of TTL pulse for delay pair"
)
class DelayPairAB(_DelayPairBase):
ch1 = Cpt(Channel, "A", name="A", kind=Kind.omitted)
ch2 = Cpt(Channel, "B", name="B", kind=Kind.omitted)
io = Cpt(StaticPair, "AB", name="io", kind=Kind.omitted)
ch1 = Cpt(Channel, "A", name="A", kind=Kind.omitted, doc="Channel A")
ch2 = Cpt(Channel, "B", name="B", kind=Kind.omitted, doc="Channel B")
io = Cpt(StaticPair, "AB", name="io", kind=Kind.omitted, doc="IO for delay pair AB")
class DelayPairCD(_DelayPairBase):
ch1 = Cpt(Channel, "C", name="C", kind=Kind.omitted)
ch2 = Cpt(Channel, "D", name="D", kind=Kind.omitted)
io = Cpt(StaticPair, "CD", name="io", kind=Kind.omitted)
ch1 = Cpt(Channel, "C", name="C", kind=Kind.omitted, doc="Channel C")
ch2 = Cpt(Channel, "D", name="D", kind=Kind.omitted, doc="Channel D")
io = Cpt(StaticPair, "CD", name="io", kind=Kind.omitted, doc="IO for delay pair CD")
class DelayPairEF(_DelayPairBase):
ch1 = Cpt(Channel, "E", name="E", kind=Kind.omitted)
ch2 = Cpt(Channel, "F", name="F", kind=Kind.omitted)
io = Cpt(StaticPair, "EF", name="io", kind=Kind.omitted)
ch1 = Cpt(Channel, "E", name="E", kind=Kind.omitted, doc="Channel E")
ch2 = Cpt(Channel, "F", name="F", kind=Kind.omitted, doc="Channel F")
io = Cpt(StaticPair, "EF", name="io", kind=Kind.omitted, doc="IO for delay pair EF")
class DelayPairGH(_DelayPairBase):
ch1 = Cpt(Channel, "G", name="G", kind=Kind.omitted)
ch2 = Cpt(Channel, "H", name="H", kind=Kind.omitted)
io = Cpt(StaticPair, "GH", name="io", kind=Kind.omitted)
ch1 = Cpt(Channel, "G", name="G", kind=Kind.omitted, doc="Channel G")
ch2 = Cpt(Channel, "H", name="H", kind=Kind.omitted, doc="Channel H")
io = Cpt(StaticPair, "GH", name="io", kind=Kind.omitted, doc="IO for delay pair GH")
class DelayGeneratorEventStatus(Device):
"""Subdevice to represent the event state of the delay generator."""
event_status = Cpt(
EpicsSignalRO, "EventStatusLI", name="event_status", kind=Kind.omitted, auto_monitor=True
EpicsSignalRO,
"EventStatusLI",
name="event_status",
kind=Kind.omitted,
auto_monitor=True,
doc="Event status register for the delay generator",
)
proc_status = Cpt(
EpicsSignal, "EventStatusLI.PROC", name="proc_status", kind=Kind.omitted
EpicsSignal,
"EventStatusLI.PROC",
name="proc_status",
kind=Kind.omitted,
doc="Poll and flush the latest event status register entry from the HW to the event_status signal",
)
@@ -353,17 +395,26 @@ class DelayGeneratorCSAXS(Device):
_pv_timeout: float = 1.5 # Default timeout for PV operations in seconds
# Front Panel
t0 = Cpt(StaticPair, "T0", name="t0")
ab = Cpt(DelayPairAB, "", name="ab")
cd = Cpt(DelayPairCD, "", name="cd")
ef = Cpt(DelayPairEF, "", name="ef")
gh = Cpt(DelayPairGH, "", name="gh")
state = Cpt(DelayGeneratorEventStatus, "", name="state")
t0 = Cpt(StaticPair, "T0", name="t0", doc="T0 static pair")
ab = Cpt(DelayPairAB, "", name="ab", doc="Delay pair AB")
cd = Cpt(DelayPairCD, "", name="cd", doc="Delay pair CD")
ef = Cpt(DelayPairEF, "", name="ef", doc="Delay pair EF")
gh = Cpt(DelayPairGH, "", name="gh", doc="Delay pair GH")
state = Cpt(DelayGeneratorEventStatus, "", name="state", doc="Subdevice for event status")
status_msg = Cpt(
EpicsSignalRO, "StatusSI", name="status_msg", kind=Kind.omitted, auto_monitor=True
EpicsSignalRO,
"StatusSI",
name="status_msg",
kind=Kind.omitted,
auto_monitor=True,
doc="Status message from the delay generator",
)
status_msg_clear = Cpt(
EpicsSignal, "StatusClearBO", name="status_msg_clear", kind=Kind.omitted
EpicsSignal,
"StatusClearBO",
name="status_msg_clear",
kind=Kind.omitted,
doc="Clear the status message",
)
trigger_holdoff = Cpt(
@@ -386,7 +437,7 @@ class DelayGeneratorCSAXS(Device):
write_pv="TriggerSourceMO",
name="trigger_source",
kind=Kind.omitted,
doc="Trigger Source for the DDG, options in TRIGGERSOURCE"
doc="Trigger Source for the DDG, options in TRIGGERSOURCE",
)
trigger_level = Cpt(
EpicsSignal,
@@ -403,10 +454,20 @@ class DelayGeneratorCSAXS(Device):
kind=Kind.omitted,
)
trigger_shot = Cpt(
EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind=Kind.omitted
EpicsSignal,
"TriggerDelayBO",
name="trigger_shot",
kind=Kind.omitted,
doc="Software trigger, needs to be in correct mode to work",
)
burst_mode = Cpt(
EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burst_mode", kind=Kind.omitted
EpicsSignal,
"BurstModeBI",
write_pv="BurstModeBO",
name="burst_mode",
kind=Kind.omitted,
auto_monitor=True,
doc="Enable or disable burst mode. 1 = enabled, 0 = disabled.",
)
burst_config = Cpt(
EpicsSignal,
@@ -414,12 +475,23 @@ class DelayGeneratorCSAXS(Device):
write_pv="BurstConfigBO",
name="burst_config",
kind=Kind.omitted,
doc="Configuration of T0 during burst. Can be ALL_CYCLES (0) or FIRST_CYCLE (1) .",
)
burst_count = Cpt(
EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burst_count", kind=Kind.omitted
EpicsSignal,
"BurstCountLI",
write_pv="BurstCountLO",
name="burst_count",
kind=Kind.omitted,
doc="Number of bursts to trigger in burst mode. Must be >0.",
)
burst_delay = Cpt(
EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burst_delay", kind=Kind.omitted
EpicsSignal,
"BurstDelayAI",
write_pv="BurstDelayAO",
name="burst_delay",
kind=Kind.omitted,
doc="Delay before bursts start in seconds. Must be >=0.",
)
burst_period = Cpt(
EpicsSignal,
@@ -427,6 +499,7 @@ class DelayGeneratorCSAXS(Device):
write_pv="BurstPeriodAO",
name="burst_period",
kind=Kind.omitted,
doc="Period of the bursts in seconds. Must be >0.",
)
def proc_event_status(self) -> None:
@@ -511,9 +584,7 @@ class DelayGeneratorCSAXS(Device):
@typechecked
def set_io_values(
self,
channel: (
AllChannelNames | list[AllChannelNames]
),
channel: AllChannelNames | list[AllChannelNames],
amplitude: float | None = None,
offset: float | None = None,
polarity: OUTPUTPOLARITY | Literal[0, 1] | None = None,
@@ -572,7 +643,7 @@ class DelayGeneratorCSAXS(Device):
channel = [channel]
if isinstance(delay, (float, int)):
delay = [float(delay)] * len(channel)
if isinstance(width, (float,int)):
if isinstance(width, (float, int)):
width = [float(width)] * len(channel)
if delay is not None:
if len(delay) != len(channel):
@@ -580,7 +651,7 @@ class DelayGeneratorCSAXS(Device):
f"Length of delay {len(delay)} must match length of channel {len(channel)}."
)
for ii, ch in enumerate(channel):
delay_channel=getattr(self, ch)
delay_channel = getattr(self, ch)
delay_channel.delay.put(delay[ii])
if width is not None:
if len(width) != len(channel):
@@ -589,7 +660,7 @@ class DelayGeneratorCSAXS(Device):
)
logger.info(f"setting widths of channels {channel} to {width}")
for ii, ch in enumerate(channel):
delay_channel= getattr(self, ch)
delay_channel = getattr(self, ch)
delay_channel.width.put(width[ii])
def _get_literal_channel(self, channel: LiteralChannels) -> Channel:
@@ -607,10 +678,29 @@ class DelayGeneratorCSAXS(Device):
def set_channel_reference(self, channel: LiteralChannels, reference_channel: CHANNELREFERENCE):
self._get_literal_channel(channel).reference.put(reference_channel.value)
def set_references_for_channels(self, channels_and_refs: list[tuple[LiteralChannels, CHANNELREFERENCE]]):
def set_references_for_channels(
self, channels_and_refs: list[tuple[LiteralChannels, CHANNELREFERENCE]]
):
for ch, ref in channels_and_refs:
self.set_channel_reference(ch, ref)
def stop_ddg(self) -> None:
"""Stop the delay generator by setting the burst mode to 0"""
self.burst_mode.put(0)
def reset_error(self) -> None:
"""Reset the error status message of the delay generator."""
self.status_msg_clear.put(1)
def get_error_msg(self) -> str:
"""Get the error message from the delay generator."""
msg = self.status_msg.get()
if msg in ERROR_CODES:
return ERROR_CODES[msg]
else:
return f"Unknown error code: {msg}"
if __name__ == "__main__":
ddg = DelayGeneratorCSAXS(name="ddg", prefix="X12SA-CPCL-DDG1:")
ddg.wait_for_connection(all_signals=True, timeout=30)

View File

@@ -0,0 +1,73 @@
ERROR_CODES: dict[str, str] = {
"STATUS OK": "No more errors left in the queue.", # renamed apparently from the IOC for No Error to STATUS OK
"Illegal Value": "A parameter was out of range.",
"Illegal Mode": "The action is illegal in the current mode.",
"Illegal Delay": "The requested delay is out of range.",
"Illegal Link": "The requested delay linkage is illegal.",
"Recall Failed": "Recall of instrument settings failed; settings were invalid.",
"Not Allowed": "Action not allowed: instrument is locked by another interface.",
"Failed Self Test": "The DG645 self test failed.",
"Failed Auto Calibration": "The DG645 auto calibration failed.",
"Lost Data": "Output buffer overflow or data lost due to communication error.",
"No Listener": "No GPIB listeners; pending output discarded.",
"Failed ROM Check": "ROM checksum failed; firmware likely corrupted.",
"Failed Offset T0 Test": "Self test of offset functionality for T0 failed.",
"Failed Offset AB Test": "Self test of offset functionality for AB failed.",
"Failed Offset CD Test": "Self test of offset functionality for CD failed.",
"Failed Offset EF Test": "Self test of offset functionality for EF failed.",
"Failed Offset GH Test": "Self test of offset functionality for GH failed.",
"Failed Amplitude T0 Test": "Self test of amplitude functionality for T0 failed.",
"Failed Amplitude AB Test": "Self test of amplitude functionality for AB failed.",
"Failed Amplitude CD Test": "Self test of amplitude functionality for CD failed.",
"Failed Amplitude EF Test": "Self test of amplitude functionality for EF failed.",
"Failed Amplitude GH Test": "Self test of amplitude functionality for GH failed.",
"Failed FPGA Communications Test": "Self test of FPGA communications failed.",
"Failed GPIB Communications Test": "Self test of GPIB communications failed.",
"Failed DDS Communications Test": "Self test of DDS communications failed.",
"Failed Serial EEPROM Communications Test": "Self test of serial EEPROM failed.",
"Failed Temperature Sensor Communications Test": "Temp sensor communication failed.",
"Failed PLL Communications Test": "PLL communication self test failed.",
"Failed DAC 0 Communications Test": "Self test of DAC 0 failed.",
"Failed DAC 1 Communications Test": "Self test of DAC 1 failed.",
"Failed DAC 2 Communications Test": "Self test of DAC 2 failed.",
"Failed Sample and Hold Operations Test": "Sample and hold self test failed.",
"Failed Vjitter Operations Test": "Vjitter operation self test failed.",
"Failed Channel T0 Analog Delay Test": "Analog delay test for T0 failed.",
"Failed Channel T1 Analog Delay Test": "Analog delay test for T1 failed.",
"Failed Channel A Analog Delay Test": "Analog delay test for A failed.",
"Failed Channel B Analog Delay Test": "Analog delay test for B failed.",
"Failed Channel C Analog Delay Test": "Analog delay test for C failed.",
"Failed Channel D Analog Delay Test": "Analog delay test for D failed.",
"Failed Channel E Analog Delay Test": "Analog delay test for E failed.",
"Failed Channel F Analog Delay Test": "Analog delay test for F failed.",
"Failed Channel G Analog Delay Test": "Analog delay test for G failed.",
"Failed Channel H Analog Delay Test": "Analog delay test for H failed.",
"Failed Sample and Hold Calibration": "Auto calibration of sample and hold failed.",
"Failed T0 Calibration": "Auto calibration of channel T0 failed.",
"Failed T1 Calibration": "Auto calibration of channel T1 failed.",
"Failed A Calibration": "Auto calibration of channel A failed.",
"Failed B Calibration": "Auto calibration of channel B failed.",
"Failed C Calibration": "Auto calibration of channel C failed.",
"Failed D Calibration": "Auto calibration of channel D failed.",
"Failed E Calibration": "Auto calibration of channel E failed.",
"Failed F Calibration": "Auto calibration of channel F failed.",
"Failed G Calibration": "Auto calibration of channel G failed.",
"Failed H Calibration": "Auto calibration of channel H failed.",
"Failed Vjitter Calibration": "Auto calibration of Vjitter failed.",
"Illegal Command": "The command syntax used was illegal.",
"Undefined Command": "The specified command does not exist.",
"Illegal Query": "The specified command does not permit queries.",
"Illegal Set": "The specified command can only be queried.",
"Null Parameter": "The parser detected an empty parameter.",
"Extra Parameters": "Too many parameters were provided.",
"Missing Parameters": "Some required parameters are missing.",
"Parameter Overflow": "Buffer overflow while parsing parameters.",
"Invalid Floating Point Number": "Expected a float but couldn't parse it.",
"Invalid Integer": "Expected an integer but couldn't parse it.",
"Integer Overflow": "Parsed integer is too large.",
"Invalid Hexadecimal": "Failed to parse expected hexadecimal input.",
"Syntax Error": "The parser detected a syntax error.",
"Communication Error": "Framing or parity error detected.",
"Over run": "Input buffer overflowed.",
"Too Many Errors": "Error buffer is full; some errors dropped.",
}

View File

@@ -91,16 +91,16 @@ def test_ddg_wait_for_event_status(mock_ddg):
def test_ddg_set_io_values(mock_ddg):
"""Test setting IO values."""
mock_ddg.set_io_values(channel="AB", amplitude=3, offset=2, polarity=1, mode="ttl")
assert mock_ddg.AB.io.amplitude.get() == 3
assert mock_ddg.AB.io.offset.get() == 2
assert mock_ddg.AB.io.polarity.get() == 1
assert mock_ddg.AB.io.ttl_mode.get() == 1
mock_ddg.set_io_values(channel="ab", amplitude=3, offset=2, polarity=1, mode="ttl")
assert mock_ddg.ab.io.amplitude.get() == 3
assert mock_ddg.ab.io.offset.get() == 2
assert mock_ddg.ab.io.polarity.get() == 1
assert mock_ddg.ab.io.ttl_mode.get() == 1
# List of channels
channels = ["AB", "CD", "T0"]
channels = ["ab", "cd", "t0"]
mock_ddg.set_io_values(channel=channels, amplitude=3, offset=2, polarity=1, mode="nim")
for channel in channels:
if channel == "T0":
if channel == "t0":
attr = getattr(mock_ddg, channel)
else:
attr = getattr(mock_ddg, channel).io
@@ -112,13 +112,13 @@ def test_ddg_set_io_values(mock_ddg):
def test_ddg_set_delay_pairs(mock_ddg):
"""Test setting delay pairs."""
mock_ddg.set_delay_pairs(channel="AB", delay=0.1, width=0.2)
assert np.isclose(mock_ddg.AB.delay.get(), 0.1)
assert np.isclose(mock_ddg.AB.width.get(), 0.2)
assert np.isclose(mock_ddg.AB.ch1.setpoint.get(), 0.1)
assert np.isclose(mock_ddg.AB.ch2.setpoint.get(), 0.3)
mock_ddg.set_delay_pairs(channel="ab", delay=0.1, width=0.2)
assert np.isclose(mock_ddg.ab.delay.get(), 0.1)
assert np.isclose(mock_ddg.ab.width.get(), 0.2)
assert np.isclose(mock_ddg.ab.ch1.setpoint.get(), 0.1)
assert np.isclose(mock_ddg.ab.ch2.setpoint.get(), 0.3)
# List of channels
channels = ["AB", "CD", "EF", "GH"]
channels = ["ab", "cd", "ef", "gh"]
delays = [0.1, 0.2, 0.4, 0.5]
mock_ddg.set_delay_pairs(channel=channels, delay=delays, width=0.2)
for delay, channel in zip(delays, channels):