feat: adding ddg1 and ddg2 logic for csaxs

This commit is contained in:
2025-07-14 18:14:30 +02:00
parent 173893ec33
commit 98e4364176
6 changed files with 201 additions and 145 deletions

View File

@@ -1,6 +1,6 @@
ddg_master:
ddg1:
description: Main delay Generator for triggering
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DDGMaster
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DDG1
enabled: true
deviceConfig:
prefix: 'X12SA-CPCL-DDG1:'
@@ -9,6 +9,17 @@ ddg_master:
readoutPriority: baseline
softwareTrigger: true
ddg2:
description: Detector delay Generator for trigger burst
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DDG2
enabled: true
deviceConfig:
prefix: 'X12SA-CPCL-DDG2:'
onFailure: raise
readOnly: false
readoutPriority: baseline
softwareTrigger: false
samx:
readoutPriority: baseline
deviceClass: ophyd_devices.SimPositioner

View File

@@ -1 +1,2 @@
from .ddg_1 import DDG1
from .ddg_2 import DDG2

View File

@@ -1,6 +1,35 @@
import atexit
"""
DDG1 delay generator
This module implements the DDG1 delay generator logic for the CSAXS beamline.
The attached PDF trigger_scheme_ddg1_ddg2.pdf provides a more detailed overview of
the trigger scheme. If the logic changes in the future, it is highly recommended to
update the PDF accordingly.
The DDG1 is the main trigger delay generator for the CSAXS beamline. It will
receive either a soft trigger from BEC (depending on the scan type) or a hardware trigger
from a beamline device (e.g. the Galil stages). It is responsible for opening the shutter
and sending a trigger to the Delay Generator CSAXS (DDG2), which in turn will
send the trigger to the detectors. DDG1 will not be witout burst mode, but rather in standard
mode creating delays for the channels ab, cd, ef, gh.
A brief summary of the DDG1 logic:
DELAY PAIRS:
- DelayPair ab is connected to the EXT/EN of DDG2.
- DelayPair cd is connected to the SHUTTER.
- DelayPair ef is connected to an OR gate together with the detector
PULSE train for the MCS card. The MCS card needs one extra pulse to forward points.
DELAY CHANNELS:
- a = t0 + 2ms (2ms delay to allow the shutter to open)
- b = a + 1us (short pulse)
- c = t0
- d = a + exp_time * burst_count + 1ms (to allow the shutter to close)
- e = d
- f = e + 1us (short pulse to OR gate for MCS triggering)
"""
import time
from threading import Event, Thread
from bec_lib.logger import bec_logger
from ophyd import DeviceStatus, StatusBase
@@ -14,9 +43,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
AllChannelNames,
ChannelConfig,
DelayGeneratorCSAXS,
StatusBitsCompareStatus,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.error_registry import ERROR_CODES
logger = bec_logger.logger
@@ -35,104 +62,101 @@ DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
"gh": _DEFAULT_CHANNEL_CONFIG,
}
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT
DEFAULT_DEAD_TIMES_S = {"ab": 1e-3, "cd": 1e-3, "ef": 1e-3, "gh": 1e-3}
DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz
DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [
("A", CHANNELREFERENCE.T0), # T0 + 2ms delay
("B", CHANNELREFERENCE.A),
("C", CHANNELREFERENCE.T0), # T0
("D", CHANNELREFERENCE.C),
("E", CHANNELREFERENCE.D), # D One extra pulse once shutter closes for MCS
("F", CHANNELREFERENCE.E), # E + 1mu s
("G", CHANNELREFERENCE.T0),
("H", CHANNELREFERENCE.G),
]
class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
"""
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1.
It will be triggered by a soft trigger from BEC or a hardware trigger from a beamline device (e.g. the Galil stages).
It is operated in standard mode, not burst mode and will trigger the EXT/EN of DDG2 (channel ab).
It is responsible for opening the shutter (channel cd) and sending an extra trigger to an or gate for the MCS card (channel ef).
"""
# pylint: disable=attribute-defined-outside-init
def on_connected(self) -> None:
"""Set the default values on the device - intended to overwrite everything to a usable default state.
Sets DEFAULT_IO_CONFIG into each channel,
sets the trigger source to DEFAULT_TRIGGER_SOURCE,
and turns off burst mode"""
"""
Set the default values on the device - intended to overwrite everything to a usable default state.
Sets DEFAULT_IO_CONFIG into each channel, sets the trigger source to DEFAULT_TRIGGER_SOURCE,
and turns off burst mode.
"""
self.burst_disable() # it is possible to miss setting settings if burst is enabled
for channel, config in DEFAULT_IO_CONFIG.items():
self.set_io_values(channel, **config)
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
self.set_references_for_channels(
[
("A", CHANNELREFERENCE.T0),
("B", CHANNELREFERENCE.A),
("C", CHANNELREFERENCE.T0),
("D", CHANNELREFERENCE.C),
("E", CHANNELREFERENCE.D), # One extra pulse once shutter closes for MCS
("F", CHANNELREFERENCE.E),
("G", CHANNELREFERENCE.T0),
("H", CHANNELREFERENCE.G),
]
)
# Start background thread to poll status register
# self._status_polling_stop_event = Event()
# self._status_polling_thread = Thread(target=self._poll_status)
# self._status_polling_thread.start()
# atexit.register(self.on_destroy)
# def _poll_status(self):
# """The status register has to be actively pulled, triggering the proc_status results in
# event_status being updated, which in turn allows the StatusBitsCompareStatus from on_trigger
# to be updated and eventually resolve."""
# dispatcher = self.state.proc_status.cl.get_dispatcher()
# event = dispatcher.stop_event
# while not (self._status_polling_stop_event.is_set() or event.is_set()):
# try:
# # Call with timeout to avoid blocking in shutdown
# self.state.proc_status.put(1, timeout=1)
# except TimeoutError:
# # If any of the stop events are set, stop polling
# if self._status_polling_stop_event.is_set() or event.is_set():
# logger.info("Exiting _poll_status thread loop for DDG.")
# break
# time.sleep(1 / 5) # poll the status at 5 Hz
self.set_references_for_channels(DEFAULT_REFERENCES)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS.
For standard scans, it will be triggered by a soft trigger from BEC.
It also has a hardware trigger feeded into the EXT/EN for fly-scanning, i.e. Galil stages.
This DDG is always not in burst mode.
"""
self.burst_disable()
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
readout_time = self.scan_info.msg.scan_parameters["readout_time"]
if readout_time is not None and readout_time != 0:
# If we are given a single readout time from BEC, use it for all 4 channels
pulse_widths = [exp_time - readout_time] * len(DEFAULT_DEAD_TIMES_S)
else:
# Otherwise, derive the pulse widths from the default dead times defined above
pulse_widths = [exp_time - DEFAULT_DEAD_TIMES_S[ch] for ch in DEFAULT_DEAD_TIMES_S]
logger.info(f"setting pulse widths to {pulse_widths}")
self.set_delay_pairs(["ab", "cd", "ef", "gh"], delay=0, width=pulse_widths)
self.burst_enable(count=frames_per_trigger, delay=0, period=exp_time)
# Trigger DDG2
# a = t0 + 2ms, b = a + 1us
# a has reference to t0, b has reference to a
self.set_delay_pairs(channel="ab", delay=2e-3, width=1e-6)
# Trigger shutter
shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3
# d = c/t0 + 2ms + exp_time * burst_count + 1ms
# c has reference to t0, d has reference to c
self.set_delay_pairs(channel="cd", delay=0, width=shutter_width)
# Trigger extra pulse for MCS OR gate
# f = e + 1us
# e has refernce to d, f has reference to e
self.set_delay_pairs(channel="ef", delay=0, width=1e-6)
def on_trigger(self) -> DeviceStatus | StatusBase | None:
""" Note, we need to add a delay to the StatusBits callback on the event_status.
"""Note, we need to add a delay to the StatusBits callback on the event_status.
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms."""
st = StatusBase()#StatusBitsCompareStatus(self.state.event_status, STATUSBITS.END_OF_BURST, run=False)
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms.
"""
st = StatusBase()
self.cancel_on_stop(st)
self.trigger_shot.put(1, use_complete=True)
time.sleep(self.scan_info.msg.scan_parameters["exp_time"])
timer = 0
# TODO make asynchronous and nicer!
while st.done is False:
self.state.proc_status.put(1,use_complete=True)
# Do I need to give this time to update? ask Xiaoqiang!!
event_status = self.state.event_status.get()
if (STATUSBITS(event_status) & STATUSBITS.END_OF_BURST) == STATUSBITS.END_OF_BURST:
st.set_finished()
timer += 0.1
time.sleep(0.1)
if timer > 1:
st.set_exception(TimeoutError(f"Device {self.name} failed to finish trigger"))
break
time.sleep(0.05)
return st
self.cancel_on_stop(st)
status = self.wait_for_status(status=st, bit_event=STATUSBITS.END_OF_DELAY, timeout=2)
return status
def on_destroy(self) -> None:
return
if getattr(self, "_status_polling_stop_event", None) is not None:
self._status_polling_stop_event.set()
if getattr(self, "_status_polling_thread", None) is not None:
self._status_polling_thread.join(timeout=3)
def wait_for_status(
self, status: StatusBase, bit_event: STATUSBITS, timeout: float = 2
) -> None:
"""Wait for a event status bit to be set.
Args:
status (StatusBase): The status object to update.
bit_event (STATUSBITS): The event status bit to wait for.
timeout (float): Maximum time to wait for the event status bit to be set.
"""
current_time = time.time()
while not status.done:
self.state.proc_status.put(1, use_complete=True)
event_status = self.state.event_status.get()
if (STATUSBITS(event_status) & bit_event) == bit_event:
status.set_finished()
if time.time() - current_time > timeout:
status.set_exception(TimeoutError(f"Timeout waiting for status {status}"))
break
time.sleep(0.1)
time.sleep(0.05) # Give time for the IOC to be ready again
return status
def on_stop(self) -> None:
"""Stop the delay generator by setting the burst mode to 0"""
@@ -140,6 +164,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
if __name__ == "__main__":
ddg = DDG1(name="ddg", prefix="X12SA-CPCL-DDG1:")
ddg = DDG1(name="ddg1", prefix="X12SA-CPCL-DDG1:")
ddg.wait_for_connection(all_signals=True, timeout=30)
ddg.summary()

View File

@@ -1,6 +1,28 @@
import atexit
"""
DDG2 delay generator
This module implements the DDG2 delay generator logic for the CSAXS beamline.
Please check also the code for DDG1, aswell as the attached PDF trigger_scheme_ddg1_ddg2.pdf
The DDG2 is responsible for creating a burst of triggers for all relevant detectors.
It will receive a be triggered from the DDG1 through the EXT/EN channel.
A brief summary of the DDG2 logic:
DELAY PAIRS:
- EXT/EN is connected to the DDG1 delay pair ab.
- DelayPair ab is connected to a multiplexer, multiplexing the trigger to the detectors.
DELAY CHANNELS:
- a = t0
- b = a + (exp_time - READOUT_TIMES)
Burst mode is enabled:
- Burst count is set to the number of frames per trigger.
- Burst delay is set to 0.
- Burst period is set to the exposure time.
"""
import time
from threading import Event, Thread
from bec_lib.logger import bec_logger
from ophyd import DeviceStatus, StatusBase
@@ -14,9 +36,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
AllChannelNames,
ChannelConfig,
DelayGeneratorCSAXS,
StatusBitsCompareStatus,
)
from csaxs_bec.devices.epics.delay_generator_csaxs.error_registry import ERROR_CODES
logger = bec_logger.logger
@@ -34,83 +54,83 @@ DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
"ef": _DEFAULT_CHANNEL_CONFIG,
"gh": _DEFAULT_CHANNEL_CONFIG,
}
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT
DEFAULT_DEAD_TIMES_S = {"ab": 1e-3, "cd": 1e-3, "ef": 1e-3, "gh": 1e-3}
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.EXT_RISING_EDGE
DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz
DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [
("A", CHANNELREFERENCE.T0),
("B", CHANNELREFERENCE.A),
("C", CHANNELREFERENCE.T0),
("D", CHANNELREFERENCE.C),
("E", CHANNELREFERENCE.T0),
("F", CHANNELREFERENCE.E),
("G", CHANNELREFERENCE.T0),
("H", CHANNELREFERENCE.G),
]
class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
"""
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG2.
This device is responsible for creating triggers in burst mode and is connected to a multiplexer that
distributes the trigger to the detectors. The DDG2 is triggered by the DDG1 through the EXT/EN channel.
"""
# pylint: disable=attribute-defined-outside-init
def on_connected(self) -> None:
"""Set the default values on the device - intended to overwrite everything to a usable default state.
Sets DEFAULT_IO_CONFIG into each channel,
sets the trigger source to DEFAULT_TRIGGER_SOURCE,
and turns off burst mode"""
"""
Set the default values on the device - intended to overwrite everything to a usable default state.
Sets DEFAULT_IO_CONFIG into each channel, sets the trigger source to DEFAULT_TRIGGER_SOURCE.
"""
self.burst_disable() # it is possible to miss setting settings if burst is enabled
for channel, config in DEFAULT_IO_CONFIG.items():
self.set_io_values(channel, **config)
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
self.set_references_for_channels(
[
("A", CHANNELREFERENCE.T0),
("B", CHANNELREFERENCE.A),
("C", CHANNELREFERENCE.T0),
("D", CHANNELREFERENCE.C),
("E", CHANNELREFERENCE.T0),# One extra pulse once shutter closes for MCS
("F", CHANNELREFERENCE.E),
("G", CHANNELREFERENCE.T0),
("H", CHANNELREFERENCE.G),
]
)
self.set_references_for_channels(DEFAULT_REFERENCES)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS.
For standard scans, it will be triggered by a soft trigger from BEC.
It also has a hardware trigger feeded into the EXT/EN for fly-scanning, i.e. Galil stages.
This DDG is always not in burst mode.
"""
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
readout_time = self.scan_info.msg.scan_parameters["readout_time"]
if readout_time is not None and readout_time != 0:
# If we are given a single readout time from BEC, use it for all 4 channels
pulse_widths = [exp_time - readout_time] * len(DEFAULT_DEAD_TIMES_S)
else:
# Otherwise, derive the pulse widths from the default dead times defined above
pulse_widths = [exp_time - DEFAULT_DEAD_TIMES_S[ch] for ch in DEFAULT_DEAD_TIMES_S]
logger.info(f"setting pulse widths to {pulse_widths}")
self.set_delay_pairs(["ab", "cd", "ef", "gh"], delay=0, width=pulse_widths)
# a = t0
# a has reference to t0, b has reference to a
burst_pulse_width = exp_time - DEFAULT_READOUT_TIMES["ab"]
self.set_delay_pairs(channel="ab", delay=0, width=burst_pulse_width)
self.burst_enable(count=frames_per_trigger, delay=0, period=exp_time)
def on_trigger(self) -> DeviceStatus | StatusBase | None:
""" Note, we need to add a delay to the StatusBits callback on the event_status.
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms."""
st = StatusBase()#StatusBitsCompareStatus(self.state.event_status, STATUSBITS.END_OF_BURST, run=False)
self.cancel_on_stop(st)
self.trigger_shot.put(1, use_complete=True)
time.sleep(self.scan_info.msg.scan_parameters["exp_time"])
timer = 0
# TODO make asynchronous and nicer!
while st.done is False:
self.state.proc_status.put(1,use_complete=True)
# Do I need to give this time to update? ask Xiaoqiang!!
event_status = self.state.event_status.get()
if (STATUSBITS(event_status) & STATUSBITS.END_OF_BURST) == STATUSBITS.END_OF_BURST:
st.set_finished()
timer += 0.1
time.sleep(0.1)
if timer > 1:
st.set_exception(TimeoutError(f"Device {self.name} failed to finish trigger"))
break
time.sleep(0.05)
return st
"""
DDG2 will not receive a trigger from BEC, but will be triggered by the DDG1 through the EXT/EN channel.
"""
def on_destroy(self) -> None:
return
if getattr(self, "_status_polling_stop_event", None) is not None:
self._status_polling_stop_event.set()
if getattr(self, "_status_polling_thread", None) is not None:
self._status_polling_thread.join(timeout=3)
def wait_for_status(
self, status: StatusBase, bit_event: STATUSBITS, timeout: float = 2
) -> None:
"""Wait for a event status bit to be set.
Args:
status (StatusBase): The status object to update.
bit_event (STATUSBITS): The event status bit to wait for.
timeout (float): Maximum time to wait for the event status bit to be set.
"""
current_time = time.time()
while not status.done:
self.state.proc_status.put(1, use_complete=True)
event_status = self.state.event_status.get()
if (STATUSBITS(event_status) & bit_event) == bit_event:
status.set_finished()
if time.time() - current_time > timeout:
status.set_exception(TimeoutError(f"Timeout waiting for status {status}"))
break
time.sleep(0.1)
time.sleep(0.05) # Give time for the IOC to be ready again
return status
def on_stop(self) -> None:
"""Stop the delay generator by setting the burst mode to 0"""
@@ -118,6 +138,6 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
if __name__ == "__main__":
ddg = DDG1(name="ddg", prefix="X12SA-CPCL-DDG1:")
ddg = DDG2(name="ddg2", prefix="X12SA-CPCL-DDG2:")
ddg.wait_for_connection(all_signals=True, timeout=30)
ddg.summary()

View File

@@ -1,5 +1,5 @@
ERROR_CODES: dict[str, str] = {
"STATUS OK": "No more errors left in the queue.", # renamed apparently from the IOC for No Error to STATUS OK
"STATUS OK": "No more errors left in the queue.", # renamed apparently from the IOC for 'No Error' to 'STATUS OK'
"Illegal Value": "A parameter was out of range.",
"Illegal Mode": "The action is illegal in the current mode.",
"Illegal Delay": "The requested delay is out of range.",