refactor(ddg2): add check for negative pulse widths.

This commit is contained in:
2025-08-05 11:35:43 +02:00
parent a01593fa4b
commit 3c9192d6a5
4 changed files with 69 additions and 15 deletions

View File

@@ -42,6 +42,7 @@ from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
CHANNELREFERENCE,
OUTPUTPOLARITY,
PROC_EVENT_MODE,
STATUSBITS,
TRIGGERSOURCE,
AllChannelNames,
@@ -122,6 +123,8 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
self.set_io_values(channel, **config)
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
self.set_references_for_channels(DEFAULT_REFERENCES)
# Set proc status to passively update with 5Hz (0.2s)
self.state.proc_status_mode.put(PROC_EVENT_MODE.FREQ_5HZ)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
@@ -153,6 +156,8 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms.
"""
status = CompareStatus(self.state.event_status, STATUSBITS.NONE)
self.cancel_on_stop(status)
mcs = self.device_manager.devices.get("mcs", None)
if mcs is None:
logger.info(f"Did not find mcs card in current session")
@@ -172,22 +177,23 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
status_acquiring.wait(
timeout=10
) # 2 s wait for mcs card to start should be more than enough..
st = DeviceStatus(self)
self.cancel_on_stop(st)
status.wait(timeout=10)
# Default timeout of 5 seconds + exposure time * frames_per_trigger
timeout = 5 + self.scan_info.msg.scan_parameters.get(
"exp_time", 0.1
) * self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
status = CompareStatus(self.state.event_status, STATUSBITS.END_OF_DELAY, timeout=timeout)
self.cancel_on_stop(status)
self.trigger_shot.put(1, use_complete=True)
time.sleep(self.scan_info.msg.scan_parameters["exp_time"])
self.cancel_on_stop(st)
status = self.wait_for_status(status=st, bit_event=STATUSBITS.END_OF_DELAY, timeout=10)
return status
def wait_for_status(
self, status: StatusBase, bit_event: STATUSBITS, timeout: float = 2
self, status: DeviceStatus, bit_event: STATUSBITS, timeout: float = 2
) -> None:
"""Wait for a event status bit to be set.
Args:
status (StatusBase): The status object to update.
status (DeviceStatus): The status object to update.
bit_event (STATUSBITS): The event status bit to wait for.
timeout (float): Maximum time to wait for the event status bit to be set.
"""

View File

@@ -100,6 +100,10 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
# a = t0
# a has reference to t0, b has reference to a
if any(exp_time < rt for rt in DEFAULT_READOUT_TIMES.values()):
raise ValueError(
f"Exposure time {exp_time} is too short for the readout times {DEFAULT_READOUT_TIMES}"
)
burst_pulse_width = exp_time - DEFAULT_READOUT_TIMES["ab"]
self.set_delay_pairs(channel="ab", delay=0, width=burst_pulse_width)
self.burst_enable(count=frames_per_trigger, delay=0, period=exp_time)

View File

@@ -6,8 +6,8 @@ https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
"""
import enum
from typing import Literal, TypedDict
import time
from typing import Literal, TypedDict
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
@@ -76,9 +76,26 @@ class OUTPUTPOLARITY(enum.Enum):
POSITIVE = 1
class PROC_EVENT_MODE(int, enum.Enum):
"""Read mode for MCS channels."""
PASSIVE = 0
EVENT = 1
IO_INTR = 2
FREQ_0_1HZ = 3
FREQ_0_2HZ = 4
FREQ_0_5HZ = 5
FREQ_1HZ = 6
FREQ_2HZ = 7
FREQ_5HZ = 8
FREQ_10HZ = 9
FREQ_100HZ = 10
class STATUSBITS(enum.IntFlag):
"""Bit flags for the status signal of the delay generator."""
NONE = 0 << 0 # No status bits set.
TRIG = 1 << 0 # Got a trigger.
RATE = 1 << 1 # Got a trigger while a delay or burst was in progress.
END_OF_DELAY = 1 << 2 # A delay cycle has completed.
@@ -91,6 +108,7 @@ class STATUSBITS(enum.IntFlag):
def describe(self) -> dict:
"""Return a description of the status bits."""
descriptions = {
STATUSBITS.NONE: "No status bits set.",
STATUSBITS.TRIG: "Got a trigger.",
STATUSBITS.RATE: "Got a trigger while a delay or burst was in progress.",
STATUSBITS.END_OF_DELAY: "A delay cycle has completed.",
@@ -114,7 +132,7 @@ class StatusBitsCompareStatus(SubscriptionStatus):
*args,
event_type=None,
timeout: float | None = None,
add_delay:float|None = None,
add_delay: float | None = None,
settle_time: float = 0,
run: bool = True,
**kwargs,
@@ -137,9 +155,9 @@ class StatusBitsCompareStatus(SubscriptionStatus):
"""Callback for subscription status"""
obj = kwargs.get("obj", None)
if obj is None:
name = 'no object received'
name = "no object received"
else:
name=obj.name
name = obj.name
if any((STATUSBITS(value) & state) == state for state in self._raise_states):
self.set_exception(
ValueError(
@@ -147,7 +165,7 @@ class StatusBitsCompareStatus(SubscriptionStatus):
)
)
return False
if self._add_delay !=0:
if self._add_delay != 0:
time.sleep(self._add_delay)
return (STATUSBITS(value) & self._value) == self._value
@@ -378,17 +396,24 @@ class DelayGeneratorEventStatus(Device):
"EventStatusLI",
name="event_status",
kind=Kind.omitted,
auto_monitor=True,
doc="Event status register for the delay generator",
)
proc_status = Cpt(
EpicsSignal,
"EventStatusLI.PROC",
name="proc_status",
auto_monitor=True,
kind=Kind.omitted,
doc="Poll and flush the latest event status register entry from the HW to the event_status signal",
)
proc_status_mode = Cpt(
EpicsSignal,
"EventStatusLI.SCAN",
kind=Kind.omitted,
doc="Readout mode for transferring data from status buffer to the event_status signal.",
)
class DelayGeneratorCSAXS(Device):
"""
@@ -403,6 +428,13 @@ class DelayGeneratorCSAXS(Device):
In addition, the io layer allows setting amplitude, offset and polarity for each pair.
"""
# USER_ACCESS = [
# "set_channel_reference",
# "set_references_for_channels",
# "set_io_values",
# "set_trigger",
# ]
_pv_timeout: float = 5 # Default timeout for PV operations in seconds
# Front Panel
@@ -686,11 +718,23 @@ class DelayGeneratorCSAXS(Device):
}[channel]
def set_channel_reference(self, channel: LiteralChannels, reference_channel: CHANNELREFERENCE):
"""Set the reference channel for a specific channel.
Args:
channel (LiteralChannels): The channel to set the reference for.
reference_channel (CHANNELREFERENCE): The reference channel to set.
"""
self._get_literal_channel(channel).reference.put(reference_channel.value)
def set_references_for_channels(
self, channels_and_refs: list[tuple[LiteralChannels, CHANNELREFERENCE]]
):
"""Set the reference channels for multiple channels.
Args:
channels_and_refs (list[tuple[LiteralChannels, CHANNELREFERENCE]]): A list of
tuples where each tuple contains a channel and its corresponding reference channel.
"""
for ch, ref in channels_and_refs:
self.set_channel_reference(ch, ref)

View File

@@ -193,7 +193,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self.counter_updated.append(signal.name)
received_all_updates = set(self.counter_updated) == set(self.counter_mapping.keys())
if received_all_updates:
self.ready_to_read.put(1) # Reset happens from DDG class!
self.ready_to_read.put(READYTOREAD.DONE) # Reset happens from DDG class!
self.counter_updated.clear()
def _progress_update(self, value, **kwargs) -> None: