refactor(ddg): final refactoring and cleanup
This commit is contained in:
@@ -1,34 +0,0 @@
|
||||
ddg:
|
||||
description: 'CSAXS master delay generator'
|
||||
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.ddg_master.DDGMaster
|
||||
deviceConfig:
|
||||
prefix: "X12SA-CPCL-DDG1:"
|
||||
enabled: true
|
||||
readOnly: false
|
||||
onFailure: raise
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: true
|
||||
|
||||
samx:
|
||||
readoutPriority: baseline
|
||||
deviceClass: ophyd_devices.SimPositioner
|
||||
deviceConfig:
|
||||
delay: 1
|
||||
limits:
|
||||
- -50
|
||||
- 50
|
||||
tolerance: 0.01
|
||||
update_frequency: 400
|
||||
deviceTags:
|
||||
- user motors
|
||||
enabled: true
|
||||
readOnly: false
|
||||
|
||||
bpm4i:
|
||||
readoutPriority: monitored
|
||||
deviceClass: ophyd_devices.SimMonitor
|
||||
deviceConfig:
|
||||
deviceTags:
|
||||
- beamline
|
||||
enabled: true
|
||||
readOnly: false
|
||||
@@ -0,0 +1 @@
|
||||
from .ddg_master import DDGMaster
|
||||
|
||||
@@ -1,77 +1,92 @@
|
||||
from threading import Event, Thread
|
||||
from time import sleep
|
||||
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import DelayGeneratorCSAXS, ChannelConfig, AllChannelNames, OUTPUTPOLARITY, TRIGGERSOURCE, StatusBitsCompareStatus, STATUSBITS, CHANNELREFERENCE
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from bec_lib.logger import bec_logger
|
||||
import atexit
|
||||
import time
|
||||
from threading import Event, Thread
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
|
||||
CHANNELREFERENCE,
|
||||
OUTPUTPOLARITY,
|
||||
STATUSBITS,
|
||||
TRIGGERSOURCE,
|
||||
AllChannelNames,
|
||||
ChannelConfig,
|
||||
DelayGeneratorCSAXS,
|
||||
StatusBitsCompareStatus,
|
||||
)
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.error_registry import ERROR_CODES
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
_DEFAULT_CHANNEL_CONFIG: ChannelConfig = {
|
||||
"amplitude":5.0,
|
||||
"offset":0.0,
|
||||
"polarity": OUTPUTPOLARITY.POSITIVE,
|
||||
"mode":"ttl",
|
||||
}
|
||||
"amplitude": 5.0,
|
||||
"offset": 0.0,
|
||||
"polarity": OUTPUTPOLARITY.POSITIVE,
|
||||
"mode": "ttl",
|
||||
}
|
||||
|
||||
DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
|
||||
"t0":_DEFAULT_CHANNEL_CONFIG,
|
||||
"ab":_DEFAULT_CHANNEL_CONFIG,
|
||||
"cd":_DEFAULT_CHANNEL_CONFIG,
|
||||
"ef":_DEFAULT_CHANNEL_CONFIG,
|
||||
"gh":_DEFAULT_CHANNEL_CONFIG,
|
||||
"t0": _DEFAULT_CHANNEL_CONFIG,
|
||||
"ab": _DEFAULT_CHANNEL_CONFIG,
|
||||
"cd": _DEFAULT_CHANNEL_CONFIG,
|
||||
"ef": _DEFAULT_CHANNEL_CONFIG,
|
||||
"gh": _DEFAULT_CHANNEL_CONFIG,
|
||||
}
|
||||
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT
|
||||
DEFAULT_DEAD_TIMES_S = {
|
||||
"ab":1e-3,
|
||||
"cd":1e-3,
|
||||
"ef":1e-3,
|
||||
"gh":1e-3,
|
||||
}
|
||||
DEFAULT_DEAD_TIMES_S = {"ab": 1e-3, "cd": 1e-3, "ef": 1e-3, "gh": 1e-3}
|
||||
|
||||
|
||||
class DDGMaster(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
"""
|
||||
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1
|
||||
"""
|
||||
|
||||
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
def on_connected(self) -> None:
|
||||
"""Set the default values on the device - intended to overwrite everything to a usable default state.
|
||||
Sets DEFAULT_IO_CONFIG into each channel,
|
||||
sets the trigger source to DEFAULT_TRIGGER_SOURCE,
|
||||
sets the trigger source to DEFAULT_TRIGGER_SOURCE,
|
||||
and turns off burst mode"""
|
||||
self.burst_disable() # it is possible to miss setting settings if burst is enabled
|
||||
self.burst_disable() # it is possible to miss setting settings if burst is enabled
|
||||
for channel, config in DEFAULT_IO_CONFIG.items():
|
||||
self.set_io_values(channel, **config)
|
||||
self.set_io_values(channel, **config)
|
||||
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
|
||||
self.set_references_for_channels(
|
||||
[("A", CHANNELREFERENCE.T0),
|
||||
[
|
||||
("A", CHANNELREFERENCE.T0),
|
||||
("B", CHANNELREFERENCE.A),
|
||||
("C", CHANNELREFERENCE.T0),
|
||||
("D", CHANNELREFERENCE.C),
|
||||
("E", CHANNELREFERENCE.T0),
|
||||
("F", CHANNELREFERENCE.E),
|
||||
("G", CHANNELREFERENCE.T0),
|
||||
("H", CHANNELREFERENCE.G)]
|
||||
("H", CHANNELREFERENCE.G),
|
||||
]
|
||||
)
|
||||
# Start background thread to poll status register
|
||||
self._status_polling_stop_event = Event()
|
||||
self._status_polling_thread = Thread(target=self._poll_status) # TODO if exit is called, ca_context from pyepics seems unset.. Hook to kill thread?
|
||||
self._status_polling_thread = Thread(target=self._poll_status)
|
||||
self._status_polling_thread.start()
|
||||
atexit.register(self.on_destroy)
|
||||
|
||||
|
||||
def _poll_status(self):
|
||||
"""The status register has to be actively pulled, triggering the proc_status results in
|
||||
"""The status register has to be actively pulled, triggering the proc_status results in
|
||||
event_status being updated, which in turn allows the StatusBitsCompareStatus from on_trigger
|
||||
to be updated and eventually resolve."""
|
||||
dispatcher = self.state.proc_status.cl.get_dispatcher()
|
||||
event = dispatcher.stop_event
|
||||
while not (self._status_polling_stop_event.is_set() or event.is_set()):
|
||||
self.state.proc_status.put(1)
|
||||
sleep(1/5) # poll the status at 5 Hz
|
||||
|
||||
try:
|
||||
# Call with timeout to avoid blocking in shutdown
|
||||
self.state.proc_status.put(1, timeout=1)
|
||||
except TimeoutError:
|
||||
# If any of the stop events are set, stop polling
|
||||
if self._status_polling_stop_event.is_set() or event.is_set():
|
||||
logger.info("Exiting _poll_status thread loop for DDG.")
|
||||
break
|
||||
time.sleep(1 / 5) # poll the status at 5 Hz
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
|
||||
@@ -80,10 +95,10 @@ class DDGMaster(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
|
||||
if readout_time is not None and readout_time != 0:
|
||||
# If we are given a single readout time from BEC, use it for all 4 channels
|
||||
pulse_widths = [exp_time - readout_time]*len(DEFAULT_DEAD_TIMES_S)
|
||||
pulse_widths = [exp_time - readout_time] * len(DEFAULT_DEAD_TIMES_S)
|
||||
else:
|
||||
# Otherwise, derive the pulse widths from the default dead times defined above
|
||||
pulse_widths = [exp_time - DEFAULT_DEAD_TIMES_S[ch] for ch in DEFAULT_DEAD_TIMES_S.keys()]
|
||||
pulse_widths = [exp_time - DEFAULT_DEAD_TIMES_S[ch] for ch in DEFAULT_DEAD_TIMES_S]
|
||||
logger.info(f"setting pulse widths to {pulse_widths}")
|
||||
self.set_delay_pairs(["ab", "cd", "ef", "gh"], delay=0, width=pulse_widths)
|
||||
self.burst_enable(count=frames_per_trigger, delay=0, period=exp_time)
|
||||
@@ -93,13 +108,18 @@ class DDGMaster(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
self.cancel_on_stop(st)
|
||||
self.trigger_shot.put(1)
|
||||
return st
|
||||
|
||||
|
||||
def on_destroy(self) -> None:
|
||||
if getattr(self, "_status_polling_stop_event", None) is not None:
|
||||
self._status_polling_stop_event.set()
|
||||
if getattr(self, "_status_polling_thread", None) is not None:
|
||||
self._status_polling_thread.join(timeout=3)
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Stop the delay generator by setting the burst mode to 0"""
|
||||
self.stop_ddg()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
ddg = DDGMaster(name="ddg", prefix="X12SA-CPCL-DDG1:")
|
||||
ddg.wait_for_connection(all_signals=True, timeout=30)
|
||||
|
||||
@@ -8,16 +8,19 @@ https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
|
||||
import enum
|
||||
from typing import Literal, TypedDict
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind, Signal
|
||||
from ophyd_devices import StatusBase, SubscriptionStatus
|
||||
from typeguard import typechecked
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.error_registry import ERROR_CODES
|
||||
|
||||
logger = bec_logger.logger
|
||||
DelayChannelNames = Literal["ab", "cd", "ef", "gh"]
|
||||
AllChannelNames = Literal["t0", "ab", "cd", "ef", "gh"]
|
||||
LiteralChannels = Literal["A","B","C","D","E","F","G","H"]
|
||||
LiteralChannels = Literal["A", "B", "C", "D", "E", "F", "G", "H"]
|
||||
|
||||
|
||||
class CHANNELREFERENCE(enum.Enum):
|
||||
T0 = 0
|
||||
@@ -30,6 +33,7 @@ class CHANNELREFERENCE(enum.Enum):
|
||||
G = 7
|
||||
H = 8
|
||||
|
||||
|
||||
class BURSTCONFIG(enum.Enum):
|
||||
"""Enum option for burst_config signal of the delay generator.
|
||||
|
||||
@@ -105,9 +109,10 @@ class StatusBitsCompareStatus(SubscriptionStatus):
|
||||
self,
|
||||
signal: EpicsSignalRO,
|
||||
value: STATUSBITS,
|
||||
raise_states: list[STATUSBITS] | None = None,
|
||||
*args,
|
||||
event_type=None,
|
||||
timeout: float|None = None,
|
||||
timeout: float | None = None,
|
||||
settle_time: float = 0,
|
||||
run: bool = True,
|
||||
**kwargs,
|
||||
@@ -115,6 +120,7 @@ class StatusBitsCompareStatus(SubscriptionStatus):
|
||||
"""Initialize the compare status with a signal."""
|
||||
self._signal = signal
|
||||
self._value = value
|
||||
self._raise_states = raise_states or []
|
||||
super().__init__(
|
||||
device=signal,
|
||||
callback=self._compare_callback,
|
||||
@@ -126,28 +132,43 @@ class StatusBitsCompareStatus(SubscriptionStatus):
|
||||
|
||||
def _compare_callback(self, value, **kwargs) -> bool:
|
||||
"""Callback for subscription status"""
|
||||
if any((STATUSBITS(value) & state) == state for state in self._raise_states):
|
||||
self.set_exception(
|
||||
ValueError(
|
||||
f"Status bits {STATUSBITS(value).describe()} raised an exception: {self._raise_states}"
|
||||
)
|
||||
)
|
||||
return False
|
||||
return (STATUSBITS(value) & self._value) == self._value
|
||||
|
||||
|
||||
|
||||
class ChannelConfig(TypedDict):
|
||||
amplitude: float | None
|
||||
offset: float | None
|
||||
polarity: OUTPUTPOLARITY | Literal[0, 1] | None
|
||||
mode: Literal["ttl", "nim"] | None
|
||||
amplitude: float | None
|
||||
offset: float | None
|
||||
polarity: OUTPUTPOLARITY | Literal[0, 1] | None
|
||||
mode: Literal["ttl", "nim"] | None
|
||||
|
||||
|
||||
class StaticPair(Device):
|
||||
"""
|
||||
Class to represent a static pair.
|
||||
Class to represent a static pair (T0, aswell as all AB, CB, EF, GH channels).
|
||||
It allows setting the logic levels, but the timing is fixed.
|
||||
The signal is high after receiving the trigger until the end of the holdoff period.
|
||||
"""
|
||||
|
||||
ttl_mode = Cpt(
|
||||
EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.omitted, auto_monitor=True
|
||||
EpicsSignal,
|
||||
"OutputModeTtlSS.PROC",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Set the output mode to TTL",
|
||||
)
|
||||
nim_mode = Cpt(
|
||||
EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.omitted, auto_monitor=True
|
||||
EpicsSignal,
|
||||
"OutputModeNimSS.PROC",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Set the output mode to NIM",
|
||||
)
|
||||
polarity = Cpt(
|
||||
EpicsSignal,
|
||||
@@ -156,6 +177,7 @@ class StaticPair(Device):
|
||||
name="polarity",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Control the polarity of the output signal. POS 1 or NEG 0",
|
||||
)
|
||||
|
||||
amplitude = Cpt(
|
||||
@@ -165,6 +187,7 @@ class StaticPair(Device):
|
||||
name="amplitude",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Amplitude of the output signal in volts.",
|
||||
)
|
||||
|
||||
offset = Cpt(
|
||||
@@ -174,12 +197,13 @@ class StaticPair(Device):
|
||||
name="offset",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Offset of the output signal in volts.",
|
||||
)
|
||||
|
||||
|
||||
class Channel(Device):
|
||||
"""
|
||||
Represents a single channel A, B, etc. of the delay generator.
|
||||
Represents a single channel A, B, C, ... of the delay generator.
|
||||
"""
|
||||
|
||||
setpoint = Cpt(
|
||||
@@ -189,7 +213,7 @@ class Channel(Device):
|
||||
put_complete=True,
|
||||
auto_monitor=True,
|
||||
kind=Kind.omitted,
|
||||
doc="Value for the channel",
|
||||
doc="Setpoint value for the delay of the channel",
|
||||
)
|
||||
reference = Cpt(
|
||||
EpicsSignal,
|
||||
@@ -197,7 +221,7 @@ class Channel(Device):
|
||||
put_complete=True,
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Reference channel for the channel", # Check defaults, possible should be T0,A,B,...
|
||||
doc="Reference channel T0,A,B,.. for the delay of the setpoint",
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
@@ -219,8 +243,8 @@ class WidthSignal(Signal):
|
||||
Returns:
|
||||
float: The width of the channel in seconds.
|
||||
"""
|
||||
parent: _DelayPairBase = self._parent # type: ignore
|
||||
return parent.ch2.setpoint.get() - parent.ch1.setpoint.get() # type: ignore
|
||||
parent: _DelayPairBase = self._parent # type: ignore
|
||||
return parent.ch2.setpoint.get() - parent.ch1.setpoint.get() # type: ignore
|
||||
|
||||
def check_value(self, value: float) -> float:
|
||||
"""Check if the value is larger equal to 0"""
|
||||
@@ -237,8 +261,8 @@ class WidthSignal(Signal):
|
||||
value (float): The width to set in seconds.
|
||||
"""
|
||||
self.check_value(value)
|
||||
parent: _DelayPairBase = self._parent # type: ignore
|
||||
ch1_setpoint: float = parent.ch1.setpoint.get()# type: ignore
|
||||
parent: _DelayPairBase = self._parent # type: ignore
|
||||
ch1_setpoint: float = parent.ch1.setpoint.get() # type: ignore
|
||||
parent.ch2.setpoint.put(ch1_setpoint + value, **kwargs)
|
||||
|
||||
def set(self, value: float, **kwargs):
|
||||
@@ -264,7 +288,7 @@ class DelaySignal(Signal):
|
||||
Returns:
|
||||
float: The delay of the channel in seconds.
|
||||
"""
|
||||
parent: _DelayPairBase = self._parent # type: ignore
|
||||
parent: _DelayPairBase = self._parent # type: ignore
|
||||
return parent.ch1.setpoint.get()
|
||||
|
||||
def put(self, value: float, **kwargs):
|
||||
@@ -274,7 +298,7 @@ class DelaySignal(Signal):
|
||||
Args:
|
||||
value (float): The delay to set in seconds.
|
||||
"""
|
||||
parent: _DelayPairBase = self._parent # type: ignore
|
||||
parent: _DelayPairBase = self._parent # type: ignore
|
||||
parent.ch1.setpoint.put(value, **kwargs)
|
||||
parent.ch2.setpoint.put(value + parent.width.get(), **kwargs)
|
||||
|
||||
@@ -289,51 +313,69 @@ class DelaySignal(Signal):
|
||||
self.put(value, **kwargs)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
|
||||
|
||||
class _DelayPairBase(Device):
|
||||
""" Base class for delay pairs. Children have to implement ch1,ch2 for
|
||||
"""Base class for delay pairs. Children have to implement ch1,ch2 for
|
||||
the respective delay channels. The class attributes have to be called
|
||||
ch1, ch2 for width and delay signals to work."""
|
||||
|
||||
ch1: Cpt[Channel]
|
||||
ch2: Cpt[Channel]
|
||||
io: Cpt[StaticPair]
|
||||
width = Cpt(WidthSignal, name="width", kind=Kind.config, doc="Width of TTL pulse")
|
||||
delay = Cpt(DelaySignal, name="delay", kind=Kind.config, doc="Delay of TTL pulse")
|
||||
width = Cpt(
|
||||
WidthSignal, name="width", kind=Kind.config, doc="Width of TTL pulse for delay pair"
|
||||
)
|
||||
delay = Cpt(
|
||||
DelaySignal, name="delay", kind=Kind.config, doc="Delay of TTL pulse for delay pair"
|
||||
)
|
||||
|
||||
|
||||
class DelayPairAB(_DelayPairBase):
|
||||
|
||||
ch1 = Cpt(Channel, "A", name="A", kind=Kind.omitted)
|
||||
ch2 = Cpt(Channel, "B", name="B", kind=Kind.omitted)
|
||||
io = Cpt(StaticPair, "AB", name="io", kind=Kind.omitted)
|
||||
ch1 = Cpt(Channel, "A", name="A", kind=Kind.omitted, doc="Channel A")
|
||||
ch2 = Cpt(Channel, "B", name="B", kind=Kind.omitted, doc="Channel B")
|
||||
io = Cpt(StaticPair, "AB", name="io", kind=Kind.omitted, doc="IO for delay pair AB")
|
||||
|
||||
|
||||
class DelayPairCD(_DelayPairBase):
|
||||
|
||||
ch1 = Cpt(Channel, "C", name="C", kind=Kind.omitted)
|
||||
ch2 = Cpt(Channel, "D", name="D", kind=Kind.omitted)
|
||||
io = Cpt(StaticPair, "CD", name="io", kind=Kind.omitted)
|
||||
ch1 = Cpt(Channel, "C", name="C", kind=Kind.omitted, doc="Channel C")
|
||||
ch2 = Cpt(Channel, "D", name="D", kind=Kind.omitted, doc="Channel D")
|
||||
io = Cpt(StaticPair, "CD", name="io", kind=Kind.omitted, doc="IO for delay pair CD")
|
||||
|
||||
|
||||
class DelayPairEF(_DelayPairBase):
|
||||
|
||||
ch1 = Cpt(Channel, "E", name="E", kind=Kind.omitted)
|
||||
ch2 = Cpt(Channel, "F", name="F", kind=Kind.omitted)
|
||||
io = Cpt(StaticPair, "EF", name="io", kind=Kind.omitted)
|
||||
ch1 = Cpt(Channel, "E", name="E", kind=Kind.omitted, doc="Channel E")
|
||||
ch2 = Cpt(Channel, "F", name="F", kind=Kind.omitted, doc="Channel F")
|
||||
io = Cpt(StaticPair, "EF", name="io", kind=Kind.omitted, doc="IO for delay pair EF")
|
||||
|
||||
|
||||
class DelayPairGH(_DelayPairBase):
|
||||
|
||||
ch1 = Cpt(Channel, "G", name="G", kind=Kind.omitted)
|
||||
ch2 = Cpt(Channel, "H", name="H", kind=Kind.omitted)
|
||||
io = Cpt(StaticPair, "GH", name="io", kind=Kind.omitted)
|
||||
ch1 = Cpt(Channel, "G", name="G", kind=Kind.omitted, doc="Channel G")
|
||||
ch2 = Cpt(Channel, "H", name="H", kind=Kind.omitted, doc="Channel H")
|
||||
io = Cpt(StaticPair, "GH", name="io", kind=Kind.omitted, doc="IO for delay pair GH")
|
||||
|
||||
|
||||
class DelayGeneratorEventStatus(Device):
|
||||
"""Subdevice to represent the event state of the delay generator."""
|
||||
|
||||
event_status = Cpt(
|
||||
EpicsSignalRO, "EventStatusLI", name="event_status", kind=Kind.omitted, auto_monitor=True
|
||||
EpicsSignalRO,
|
||||
"EventStatusLI",
|
||||
name="event_status",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Event status register for the delay generator",
|
||||
)
|
||||
proc_status = Cpt(
|
||||
EpicsSignal, "EventStatusLI.PROC", name="proc_status", kind=Kind.omitted
|
||||
EpicsSignal,
|
||||
"EventStatusLI.PROC",
|
||||
name="proc_status",
|
||||
kind=Kind.omitted,
|
||||
doc="Poll and flush the latest event status register entry from the HW to the event_status signal",
|
||||
)
|
||||
|
||||
|
||||
@@ -353,17 +395,26 @@ class DelayGeneratorCSAXS(Device):
|
||||
_pv_timeout: float = 1.5 # Default timeout for PV operations in seconds
|
||||
|
||||
# Front Panel
|
||||
t0 = Cpt(StaticPair, "T0", name="t0")
|
||||
ab = Cpt(DelayPairAB, "", name="ab")
|
||||
cd = Cpt(DelayPairCD, "", name="cd")
|
||||
ef = Cpt(DelayPairEF, "", name="ef")
|
||||
gh = Cpt(DelayPairGH, "", name="gh")
|
||||
state = Cpt(DelayGeneratorEventStatus, "", name="state")
|
||||
t0 = Cpt(StaticPair, "T0", name="t0", doc="T0 static pair")
|
||||
ab = Cpt(DelayPairAB, "", name="ab", doc="Delay pair AB")
|
||||
cd = Cpt(DelayPairCD, "", name="cd", doc="Delay pair CD")
|
||||
ef = Cpt(DelayPairEF, "", name="ef", doc="Delay pair EF")
|
||||
gh = Cpt(DelayPairGH, "", name="gh", doc="Delay pair GH")
|
||||
state = Cpt(DelayGeneratorEventStatus, "", name="state", doc="Subdevice for event status")
|
||||
status_msg = Cpt(
|
||||
EpicsSignalRO, "StatusSI", name="status_msg", kind=Kind.omitted, auto_monitor=True
|
||||
EpicsSignalRO,
|
||||
"StatusSI",
|
||||
name="status_msg",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Status message from the delay generator",
|
||||
)
|
||||
status_msg_clear = Cpt(
|
||||
EpicsSignal, "StatusClearBO", name="status_msg_clear", kind=Kind.omitted
|
||||
EpicsSignal,
|
||||
"StatusClearBO",
|
||||
name="status_msg_clear",
|
||||
kind=Kind.omitted,
|
||||
doc="Clear the status message",
|
||||
)
|
||||
|
||||
trigger_holdoff = Cpt(
|
||||
@@ -386,7 +437,7 @@ class DelayGeneratorCSAXS(Device):
|
||||
write_pv="TriggerSourceMO",
|
||||
name="trigger_source",
|
||||
kind=Kind.omitted,
|
||||
doc="Trigger Source for the DDG, options in TRIGGERSOURCE"
|
||||
doc="Trigger Source for the DDG, options in TRIGGERSOURCE",
|
||||
)
|
||||
trigger_level = Cpt(
|
||||
EpicsSignal,
|
||||
@@ -403,10 +454,20 @@ class DelayGeneratorCSAXS(Device):
|
||||
kind=Kind.omitted,
|
||||
)
|
||||
trigger_shot = Cpt(
|
||||
EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind=Kind.omitted
|
||||
EpicsSignal,
|
||||
"TriggerDelayBO",
|
||||
name="trigger_shot",
|
||||
kind=Kind.omitted,
|
||||
doc="Software trigger, needs to be in correct mode to work",
|
||||
)
|
||||
burst_mode = Cpt(
|
||||
EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burst_mode", kind=Kind.omitted
|
||||
EpicsSignal,
|
||||
"BurstModeBI",
|
||||
write_pv="BurstModeBO",
|
||||
name="burst_mode",
|
||||
kind=Kind.omitted,
|
||||
auto_monitor=True,
|
||||
doc="Enable or disable burst mode. 1 = enabled, 0 = disabled.",
|
||||
)
|
||||
burst_config = Cpt(
|
||||
EpicsSignal,
|
||||
@@ -414,12 +475,23 @@ class DelayGeneratorCSAXS(Device):
|
||||
write_pv="BurstConfigBO",
|
||||
name="burst_config",
|
||||
kind=Kind.omitted,
|
||||
doc="Configuration of T0 during burst. Can be ALL_CYCLES (0) or FIRST_CYCLE (1) .",
|
||||
)
|
||||
burst_count = Cpt(
|
||||
EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burst_count", kind=Kind.omitted
|
||||
EpicsSignal,
|
||||
"BurstCountLI",
|
||||
write_pv="BurstCountLO",
|
||||
name="burst_count",
|
||||
kind=Kind.omitted,
|
||||
doc="Number of bursts to trigger in burst mode. Must be >0.",
|
||||
)
|
||||
burst_delay = Cpt(
|
||||
EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burst_delay", kind=Kind.omitted
|
||||
EpicsSignal,
|
||||
"BurstDelayAI",
|
||||
write_pv="BurstDelayAO",
|
||||
name="burst_delay",
|
||||
kind=Kind.omitted,
|
||||
doc="Delay before bursts start in seconds. Must be >=0.",
|
||||
)
|
||||
burst_period = Cpt(
|
||||
EpicsSignal,
|
||||
@@ -427,6 +499,7 @@ class DelayGeneratorCSAXS(Device):
|
||||
write_pv="BurstPeriodAO",
|
||||
name="burst_period",
|
||||
kind=Kind.omitted,
|
||||
doc="Period of the bursts in seconds. Must be >0.",
|
||||
)
|
||||
|
||||
def proc_event_status(self) -> None:
|
||||
@@ -511,9 +584,7 @@ class DelayGeneratorCSAXS(Device):
|
||||
@typechecked
|
||||
def set_io_values(
|
||||
self,
|
||||
channel: (
|
||||
AllChannelNames | list[AllChannelNames]
|
||||
),
|
||||
channel: AllChannelNames | list[AllChannelNames],
|
||||
amplitude: float | None = None,
|
||||
offset: float | None = None,
|
||||
polarity: OUTPUTPOLARITY | Literal[0, 1] | None = None,
|
||||
@@ -572,7 +643,7 @@ class DelayGeneratorCSAXS(Device):
|
||||
channel = [channel]
|
||||
if isinstance(delay, (float, int)):
|
||||
delay = [float(delay)] * len(channel)
|
||||
if isinstance(width, (float,int)):
|
||||
if isinstance(width, (float, int)):
|
||||
width = [float(width)] * len(channel)
|
||||
if delay is not None:
|
||||
if len(delay) != len(channel):
|
||||
@@ -580,7 +651,7 @@ class DelayGeneratorCSAXS(Device):
|
||||
f"Length of delay {len(delay)} must match length of channel {len(channel)}."
|
||||
)
|
||||
for ii, ch in enumerate(channel):
|
||||
delay_channel=getattr(self, ch)
|
||||
delay_channel = getattr(self, ch)
|
||||
delay_channel.delay.put(delay[ii])
|
||||
if width is not None:
|
||||
if len(width) != len(channel):
|
||||
@@ -589,7 +660,7 @@ class DelayGeneratorCSAXS(Device):
|
||||
)
|
||||
logger.info(f"setting widths of channels {channel} to {width}")
|
||||
for ii, ch in enumerate(channel):
|
||||
delay_channel= getattr(self, ch)
|
||||
delay_channel = getattr(self, ch)
|
||||
delay_channel.width.put(width[ii])
|
||||
|
||||
def _get_literal_channel(self, channel: LiteralChannels) -> Channel:
|
||||
@@ -607,10 +678,29 @@ class DelayGeneratorCSAXS(Device):
|
||||
def set_channel_reference(self, channel: LiteralChannels, reference_channel: CHANNELREFERENCE):
|
||||
self._get_literal_channel(channel).reference.put(reference_channel.value)
|
||||
|
||||
def set_references_for_channels(self, channels_and_refs: list[tuple[LiteralChannels, CHANNELREFERENCE]]):
|
||||
def set_references_for_channels(
|
||||
self, channels_and_refs: list[tuple[LiteralChannels, CHANNELREFERENCE]]
|
||||
):
|
||||
for ch, ref in channels_and_refs:
|
||||
self.set_channel_reference(ch, ref)
|
||||
|
||||
def stop_ddg(self) -> None:
|
||||
"""Stop the delay generator by setting the burst mode to 0"""
|
||||
self.burst_mode.put(0)
|
||||
|
||||
def reset_error(self) -> None:
|
||||
"""Reset the error status message of the delay generator."""
|
||||
self.status_msg_clear.put(1)
|
||||
|
||||
def get_error_msg(self) -> str:
|
||||
"""Get the error message from the delay generator."""
|
||||
msg = self.status_msg.get()
|
||||
if msg in ERROR_CODES:
|
||||
return ERROR_CODES[msg]
|
||||
else:
|
||||
return f"Unknown error code: {msg}"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
ddg = DelayGeneratorCSAXS(name="ddg", prefix="X12SA-CPCL-DDG1:")
|
||||
ddg.wait_for_connection(all_signals=True, timeout=30)
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
ERROR_CODES: dict[str, str] = {
|
||||
"STATUS OK": "No more errors left in the queue.", # renamed apparently from the IOC for No Error to STATUS OK
|
||||
"Illegal Value": "A parameter was out of range.",
|
||||
"Illegal Mode": "The action is illegal in the current mode.",
|
||||
"Illegal Delay": "The requested delay is out of range.",
|
||||
"Illegal Link": "The requested delay linkage is illegal.",
|
||||
"Recall Failed": "Recall of instrument settings failed; settings were invalid.",
|
||||
"Not Allowed": "Action not allowed: instrument is locked by another interface.",
|
||||
"Failed Self Test": "The DG645 self test failed.",
|
||||
"Failed Auto Calibration": "The DG645 auto calibration failed.",
|
||||
"Lost Data": "Output buffer overflow or data lost due to communication error.",
|
||||
"No Listener": "No GPIB listeners; pending output discarded.",
|
||||
"Failed ROM Check": "ROM checksum failed; firmware likely corrupted.",
|
||||
"Failed Offset T0 Test": "Self test of offset functionality for T0 failed.",
|
||||
"Failed Offset AB Test": "Self test of offset functionality for AB failed.",
|
||||
"Failed Offset CD Test": "Self test of offset functionality for CD failed.",
|
||||
"Failed Offset EF Test": "Self test of offset functionality for EF failed.",
|
||||
"Failed Offset GH Test": "Self test of offset functionality for GH failed.",
|
||||
"Failed Amplitude T0 Test": "Self test of amplitude functionality for T0 failed.",
|
||||
"Failed Amplitude AB Test": "Self test of amplitude functionality for AB failed.",
|
||||
"Failed Amplitude CD Test": "Self test of amplitude functionality for CD failed.",
|
||||
"Failed Amplitude EF Test": "Self test of amplitude functionality for EF failed.",
|
||||
"Failed Amplitude GH Test": "Self test of amplitude functionality for GH failed.",
|
||||
"Failed FPGA Communications Test": "Self test of FPGA communications failed.",
|
||||
"Failed GPIB Communications Test": "Self test of GPIB communications failed.",
|
||||
"Failed DDS Communications Test": "Self test of DDS communications failed.",
|
||||
"Failed Serial EEPROM Communications Test": "Self test of serial EEPROM failed.",
|
||||
"Failed Temperature Sensor Communications Test": "Temp sensor communication failed.",
|
||||
"Failed PLL Communications Test": "PLL communication self test failed.",
|
||||
"Failed DAC 0 Communications Test": "Self test of DAC 0 failed.",
|
||||
"Failed DAC 1 Communications Test": "Self test of DAC 1 failed.",
|
||||
"Failed DAC 2 Communications Test": "Self test of DAC 2 failed.",
|
||||
"Failed Sample and Hold Operations Test": "Sample and hold self test failed.",
|
||||
"Failed Vjitter Operations Test": "Vjitter operation self test failed.",
|
||||
"Failed Channel T0 Analog Delay Test": "Analog delay test for T0 failed.",
|
||||
"Failed Channel T1 Analog Delay Test": "Analog delay test for T1 failed.",
|
||||
"Failed Channel A Analog Delay Test": "Analog delay test for A failed.",
|
||||
"Failed Channel B Analog Delay Test": "Analog delay test for B failed.",
|
||||
"Failed Channel C Analog Delay Test": "Analog delay test for C failed.",
|
||||
"Failed Channel D Analog Delay Test": "Analog delay test for D failed.",
|
||||
"Failed Channel E Analog Delay Test": "Analog delay test for E failed.",
|
||||
"Failed Channel F Analog Delay Test": "Analog delay test for F failed.",
|
||||
"Failed Channel G Analog Delay Test": "Analog delay test for G failed.",
|
||||
"Failed Channel H Analog Delay Test": "Analog delay test for H failed.",
|
||||
"Failed Sample and Hold Calibration": "Auto calibration of sample and hold failed.",
|
||||
"Failed T0 Calibration": "Auto calibration of channel T0 failed.",
|
||||
"Failed T1 Calibration": "Auto calibration of channel T1 failed.",
|
||||
"Failed A Calibration": "Auto calibration of channel A failed.",
|
||||
"Failed B Calibration": "Auto calibration of channel B failed.",
|
||||
"Failed C Calibration": "Auto calibration of channel C failed.",
|
||||
"Failed D Calibration": "Auto calibration of channel D failed.",
|
||||
"Failed E Calibration": "Auto calibration of channel E failed.",
|
||||
"Failed F Calibration": "Auto calibration of channel F failed.",
|
||||
"Failed G Calibration": "Auto calibration of channel G failed.",
|
||||
"Failed H Calibration": "Auto calibration of channel H failed.",
|
||||
"Failed Vjitter Calibration": "Auto calibration of Vjitter failed.",
|
||||
"Illegal Command": "The command syntax used was illegal.",
|
||||
"Undefined Command": "The specified command does not exist.",
|
||||
"Illegal Query": "The specified command does not permit queries.",
|
||||
"Illegal Set": "The specified command can only be queried.",
|
||||
"Null Parameter": "The parser detected an empty parameter.",
|
||||
"Extra Parameters": "Too many parameters were provided.",
|
||||
"Missing Parameters": "Some required parameters are missing.",
|
||||
"Parameter Overflow": "Buffer overflow while parsing parameters.",
|
||||
"Invalid Floating Point Number": "Expected a float but couldn't parse it.",
|
||||
"Invalid Integer": "Expected an integer but couldn't parse it.",
|
||||
"Integer Overflow": "Parsed integer is too large.",
|
||||
"Invalid Hexadecimal": "Failed to parse expected hexadecimal input.",
|
||||
"Syntax Error": "The parser detected a syntax error.",
|
||||
"Communication Error": "Framing or parity error detected.",
|
||||
"Over run": "Input buffer overflowed.",
|
||||
"Too Many Errors": "Error buffer is full; some errors dropped.",
|
||||
}
|
||||
@@ -91,16 +91,16 @@ def test_ddg_wait_for_event_status(mock_ddg):
|
||||
|
||||
def test_ddg_set_io_values(mock_ddg):
|
||||
"""Test setting IO values."""
|
||||
mock_ddg.set_io_values(channel="AB", amplitude=3, offset=2, polarity=1, mode="ttl")
|
||||
assert mock_ddg.AB.io.amplitude.get() == 3
|
||||
assert mock_ddg.AB.io.offset.get() == 2
|
||||
assert mock_ddg.AB.io.polarity.get() == 1
|
||||
assert mock_ddg.AB.io.ttl_mode.get() == 1
|
||||
mock_ddg.set_io_values(channel="ab", amplitude=3, offset=2, polarity=1, mode="ttl")
|
||||
assert mock_ddg.ab.io.amplitude.get() == 3
|
||||
assert mock_ddg.ab.io.offset.get() == 2
|
||||
assert mock_ddg.ab.io.polarity.get() == 1
|
||||
assert mock_ddg.ab.io.ttl_mode.get() == 1
|
||||
# List of channels
|
||||
channels = ["AB", "CD", "T0"]
|
||||
channels = ["ab", "cd", "t0"]
|
||||
mock_ddg.set_io_values(channel=channels, amplitude=3, offset=2, polarity=1, mode="nim")
|
||||
for channel in channels:
|
||||
if channel == "T0":
|
||||
if channel == "t0":
|
||||
attr = getattr(mock_ddg, channel)
|
||||
else:
|
||||
attr = getattr(mock_ddg, channel).io
|
||||
@@ -112,13 +112,13 @@ def test_ddg_set_io_values(mock_ddg):
|
||||
|
||||
def test_ddg_set_delay_pairs(mock_ddg):
|
||||
"""Test setting delay pairs."""
|
||||
mock_ddg.set_delay_pairs(channel="AB", delay=0.1, width=0.2)
|
||||
assert np.isclose(mock_ddg.AB.delay.get(), 0.1)
|
||||
assert np.isclose(mock_ddg.AB.width.get(), 0.2)
|
||||
assert np.isclose(mock_ddg.AB.ch1.setpoint.get(), 0.1)
|
||||
assert np.isclose(mock_ddg.AB.ch2.setpoint.get(), 0.3)
|
||||
mock_ddg.set_delay_pairs(channel="ab", delay=0.1, width=0.2)
|
||||
assert np.isclose(mock_ddg.ab.delay.get(), 0.1)
|
||||
assert np.isclose(mock_ddg.ab.width.get(), 0.2)
|
||||
assert np.isclose(mock_ddg.ab.ch1.setpoint.get(), 0.1)
|
||||
assert np.isclose(mock_ddg.ab.ch2.setpoint.get(), 0.3)
|
||||
# List of channels
|
||||
channels = ["AB", "CD", "EF", "GH"]
|
||||
channels = ["ab", "cd", "ef", "gh"]
|
||||
delays = [0.1, 0.2, 0.4, 0.5]
|
||||
mock_ddg.set_delay_pairs(channel=channels, delay=delays, width=0.2)
|
||||
for delay, channel in zip(delays, channels):
|
||||
|
||||
Reference in New Issue
Block a user