feat: adding ddg1 and ddg2 logic for csaxs
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
ddg_master:
|
||||
ddg1:
|
||||
description: Main delay Generator for triggering
|
||||
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DDGMaster
|
||||
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DDG1
|
||||
enabled: true
|
||||
deviceConfig:
|
||||
prefix: 'X12SA-CPCL-DDG1:'
|
||||
@@ -9,6 +9,17 @@ ddg_master:
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: true
|
||||
|
||||
ddg2:
|
||||
description: Detector delay Generator for trigger burst
|
||||
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DDG2
|
||||
enabled: true
|
||||
deviceConfig:
|
||||
prefix: 'X12SA-CPCL-DDG2:'
|
||||
onFailure: raise
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
|
||||
samx:
|
||||
readoutPriority: baseline
|
||||
deviceClass: ophyd_devices.SimPositioner
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
from .ddg_1 import DDG1
|
||||
from .ddg_2 import DDG2
|
||||
|
||||
@@ -1,6 +1,35 @@
|
||||
import atexit
|
||||
"""
|
||||
DDG1 delay generator
|
||||
|
||||
This module implements the DDG1 delay generator logic for the CSAXS beamline.
|
||||
The attached PDF trigger_scheme_ddg1_ddg2.pdf provides a more detailed overview of
|
||||
the trigger scheme. If the logic changes in the future, it is highly recommended to
|
||||
update the PDF accordingly.
|
||||
|
||||
The DDG1 is the main trigger delay generator for the CSAXS beamline. It will
|
||||
receive either a soft trigger from BEC (depending on the scan type) or a hardware trigger
|
||||
from a beamline device (e.g. the Galil stages). It is responsible for opening the shutter
|
||||
and sending a trigger to the Delay Generator CSAXS (DDG2), which in turn will
|
||||
send the trigger to the detectors. DDG1 will not be witout burst mode, but rather in standard
|
||||
mode creating delays for the channels ab, cd, ef, gh.
|
||||
|
||||
A brief summary of the DDG1 logic:
|
||||
DELAY PAIRS:
|
||||
- DelayPair ab is connected to the EXT/EN of DDG2.
|
||||
- DelayPair cd is connected to the SHUTTER.
|
||||
- DelayPair ef is connected to an OR gate together with the detector
|
||||
PULSE train for the MCS card. The MCS card needs one extra pulse to forward points.
|
||||
|
||||
DELAY CHANNELS:
|
||||
- a = t0 + 2ms (2ms delay to allow the shutter to open)
|
||||
- b = a + 1us (short pulse)
|
||||
- c = t0
|
||||
- d = a + exp_time * burst_count + 1ms (to allow the shutter to close)
|
||||
- e = d
|
||||
- f = e + 1us (short pulse to OR gate for MCS triggering)
|
||||
"""
|
||||
|
||||
import time
|
||||
from threading import Event, Thread
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
@@ -14,9 +43,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
|
||||
AllChannelNames,
|
||||
ChannelConfig,
|
||||
DelayGeneratorCSAXS,
|
||||
StatusBitsCompareStatus,
|
||||
)
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.error_registry import ERROR_CODES
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -35,104 +62,101 @@ DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
|
||||
"gh": _DEFAULT_CHANNEL_CONFIG,
|
||||
}
|
||||
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT
|
||||
DEFAULT_DEAD_TIMES_S = {"ab": 1e-3, "cd": 1e-3, "ef": 1e-3, "gh": 1e-3}
|
||||
DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz
|
||||
|
||||
DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [
|
||||
("A", CHANNELREFERENCE.T0), # T0 + 2ms delay
|
||||
("B", CHANNELREFERENCE.A),
|
||||
("C", CHANNELREFERENCE.T0), # T0
|
||||
("D", CHANNELREFERENCE.C),
|
||||
("E", CHANNELREFERENCE.D), # D One extra pulse once shutter closes for MCS
|
||||
("F", CHANNELREFERENCE.E), # E + 1mu s
|
||||
("G", CHANNELREFERENCE.T0),
|
||||
("H", CHANNELREFERENCE.G),
|
||||
]
|
||||
|
||||
|
||||
class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
"""
|
||||
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1
|
||||
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1.
|
||||
It will be triggered by a soft trigger from BEC or a hardware trigger from a beamline device (e.g. the Galil stages).
|
||||
It is operated in standard mode, not burst mode and will trigger the EXT/EN of DDG2 (channel ab).
|
||||
It is responsible for opening the shutter (channel cd) and sending an extra trigger to an or gate for the MCS card (channel ef).
|
||||
"""
|
||||
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
def on_connected(self) -> None:
|
||||
"""Set the default values on the device - intended to overwrite everything to a usable default state.
|
||||
Sets DEFAULT_IO_CONFIG into each channel,
|
||||
sets the trigger source to DEFAULT_TRIGGER_SOURCE,
|
||||
and turns off burst mode"""
|
||||
"""
|
||||
Set the default values on the device - intended to overwrite everything to a usable default state.
|
||||
Sets DEFAULT_IO_CONFIG into each channel, sets the trigger source to DEFAULT_TRIGGER_SOURCE,
|
||||
and turns off burst mode.
|
||||
"""
|
||||
self.burst_disable() # it is possible to miss setting settings if burst is enabled
|
||||
for channel, config in DEFAULT_IO_CONFIG.items():
|
||||
self.set_io_values(channel, **config)
|
||||
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
|
||||
self.set_references_for_channels(
|
||||
[
|
||||
("A", CHANNELREFERENCE.T0),
|
||||
("B", CHANNELREFERENCE.A),
|
||||
("C", CHANNELREFERENCE.T0),
|
||||
("D", CHANNELREFERENCE.C),
|
||||
("E", CHANNELREFERENCE.D), # One extra pulse once shutter closes for MCS
|
||||
("F", CHANNELREFERENCE.E),
|
||||
("G", CHANNELREFERENCE.T0),
|
||||
("H", CHANNELREFERENCE.G),
|
||||
]
|
||||
)
|
||||
# Start background thread to poll status register
|
||||
# self._status_polling_stop_event = Event()
|
||||
# self._status_polling_thread = Thread(target=self._poll_status)
|
||||
# self._status_polling_thread.start()
|
||||
# atexit.register(self.on_destroy)
|
||||
|
||||
# def _poll_status(self):
|
||||
# """The status register has to be actively pulled, triggering the proc_status results in
|
||||
# event_status being updated, which in turn allows the StatusBitsCompareStatus from on_trigger
|
||||
# to be updated and eventually resolve."""
|
||||
# dispatcher = self.state.proc_status.cl.get_dispatcher()
|
||||
# event = dispatcher.stop_event
|
||||
# while not (self._status_polling_stop_event.is_set() or event.is_set()):
|
||||
# try:
|
||||
# # Call with timeout to avoid blocking in shutdown
|
||||
# self.state.proc_status.put(1, timeout=1)
|
||||
# except TimeoutError:
|
||||
# # If any of the stop events are set, stop polling
|
||||
# if self._status_polling_stop_event.is_set() or event.is_set():
|
||||
# logger.info("Exiting _poll_status thread loop for DDG.")
|
||||
# break
|
||||
# time.sleep(1 / 5) # poll the status at 5 Hz
|
||||
self.set_references_for_channels(DEFAULT_REFERENCES)
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS.
|
||||
For standard scans, it will be triggered by a soft trigger from BEC.
|
||||
It also has a hardware trigger feeded into the EXT/EN for fly-scanning, i.e. Galil stages.
|
||||
|
||||
This DDG is always not in burst mode.
|
||||
"""
|
||||
self.burst_disable()
|
||||
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
|
||||
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
|
||||
readout_time = self.scan_info.msg.scan_parameters["readout_time"]
|
||||
|
||||
if readout_time is not None and readout_time != 0:
|
||||
# If we are given a single readout time from BEC, use it for all 4 channels
|
||||
pulse_widths = [exp_time - readout_time] * len(DEFAULT_DEAD_TIMES_S)
|
||||
else:
|
||||
# Otherwise, derive the pulse widths from the default dead times defined above
|
||||
pulse_widths = [exp_time - DEFAULT_DEAD_TIMES_S[ch] for ch in DEFAULT_DEAD_TIMES_S]
|
||||
logger.info(f"setting pulse widths to {pulse_widths}")
|
||||
self.set_delay_pairs(["ab", "cd", "ef", "gh"], delay=0, width=pulse_widths)
|
||||
self.burst_enable(count=frames_per_trigger, delay=0, period=exp_time)
|
||||
# Trigger DDG2
|
||||
# a = t0 + 2ms, b = a + 1us
|
||||
# a has reference to t0, b has reference to a
|
||||
self.set_delay_pairs(channel="ab", delay=2e-3, width=1e-6)
|
||||
# Trigger shutter
|
||||
shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3
|
||||
# d = c/t0 + 2ms + exp_time * burst_count + 1ms
|
||||
# c has reference to t0, d has reference to c
|
||||
self.set_delay_pairs(channel="cd", delay=0, width=shutter_width)
|
||||
# Trigger extra pulse for MCS OR gate
|
||||
# f = e + 1us
|
||||
# e has refernce to d, f has reference to e
|
||||
self.set_delay_pairs(channel="ef", delay=0, width=1e-6)
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
""" Note, we need to add a delay to the StatusBits callback on the event_status.
|
||||
"""Note, we need to add a delay to the StatusBits callback on the event_status.
|
||||
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
|
||||
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms."""
|
||||
st = StatusBase()#StatusBitsCompareStatus(self.state.event_status, STATUSBITS.END_OF_BURST, run=False)
|
||||
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms.
|
||||
"""
|
||||
st = StatusBase()
|
||||
self.cancel_on_stop(st)
|
||||
self.trigger_shot.put(1, use_complete=True)
|
||||
time.sleep(self.scan_info.msg.scan_parameters["exp_time"])
|
||||
timer = 0
|
||||
# TODO make asynchronous and nicer!
|
||||
while st.done is False:
|
||||
self.state.proc_status.put(1,use_complete=True)
|
||||
# Do I need to give this time to update? ask Xiaoqiang!!
|
||||
event_status = self.state.event_status.get()
|
||||
if (STATUSBITS(event_status) & STATUSBITS.END_OF_BURST) == STATUSBITS.END_OF_BURST:
|
||||
st.set_finished()
|
||||
timer += 0.1
|
||||
time.sleep(0.1)
|
||||
if timer > 1:
|
||||
st.set_exception(TimeoutError(f"Device {self.name} failed to finish trigger"))
|
||||
break
|
||||
time.sleep(0.05)
|
||||
return st
|
||||
self.cancel_on_stop(st)
|
||||
status = self.wait_for_status(status=st, bit_event=STATUSBITS.END_OF_DELAY, timeout=2)
|
||||
return status
|
||||
|
||||
def on_destroy(self) -> None:
|
||||
return
|
||||
if getattr(self, "_status_polling_stop_event", None) is not None:
|
||||
self._status_polling_stop_event.set()
|
||||
if getattr(self, "_status_polling_thread", None) is not None:
|
||||
self._status_polling_thread.join(timeout=3)
|
||||
def wait_for_status(
|
||||
self, status: StatusBase, bit_event: STATUSBITS, timeout: float = 2
|
||||
) -> None:
|
||||
"""Wait for a event status bit to be set.
|
||||
|
||||
Args:
|
||||
status (StatusBase): The status object to update.
|
||||
bit_event (STATUSBITS): The event status bit to wait for.
|
||||
timeout (float): Maximum time to wait for the event status bit to be set.
|
||||
"""
|
||||
current_time = time.time()
|
||||
while not status.done:
|
||||
self.state.proc_status.put(1, use_complete=True)
|
||||
event_status = self.state.event_status.get()
|
||||
if (STATUSBITS(event_status) & bit_event) == bit_event:
|
||||
status.set_finished()
|
||||
if time.time() - current_time > timeout:
|
||||
status.set_exception(TimeoutError(f"Timeout waiting for status {status}"))
|
||||
break
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.05) # Give time for the IOC to be ready again
|
||||
return status
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Stop the delay generator by setting the burst mode to 0"""
|
||||
@@ -140,6 +164,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
ddg = DDG1(name="ddg", prefix="X12SA-CPCL-DDG1:")
|
||||
ddg = DDG1(name="ddg1", prefix="X12SA-CPCL-DDG1:")
|
||||
ddg.wait_for_connection(all_signals=True, timeout=30)
|
||||
ddg.summary()
|
||||
|
||||
@@ -1,6 +1,28 @@
|
||||
import atexit
|
||||
"""
|
||||
DDG2 delay generator
|
||||
|
||||
This module implements the DDG2 delay generator logic for the CSAXS beamline.
|
||||
Please check also the code for DDG1, aswell as the attached PDF trigger_scheme_ddg1_ddg2.pdf
|
||||
|
||||
The DDG2 is responsible for creating a burst of triggers for all relevant detectors.
|
||||
It will receive a be triggered from the DDG1 through the EXT/EN channel.
|
||||
|
||||
A brief summary of the DDG2 logic:
|
||||
DELAY PAIRS:
|
||||
- EXT/EN is connected to the DDG1 delay pair ab.
|
||||
- DelayPair ab is connected to a multiplexer, multiplexing the trigger to the detectors.
|
||||
|
||||
DELAY CHANNELS:
|
||||
- a = t0
|
||||
- b = a + (exp_time - READOUT_TIMES)
|
||||
|
||||
Burst mode is enabled:
|
||||
- Burst count is set to the number of frames per trigger.
|
||||
- Burst delay is set to 0.
|
||||
- Burst period is set to the exposure time.
|
||||
"""
|
||||
|
||||
import time
|
||||
from threading import Event, Thread
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
@@ -14,9 +36,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
|
||||
AllChannelNames,
|
||||
ChannelConfig,
|
||||
DelayGeneratorCSAXS,
|
||||
StatusBitsCompareStatus,
|
||||
)
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.error_registry import ERROR_CODES
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -34,83 +54,83 @@ DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
|
||||
"ef": _DEFAULT_CHANNEL_CONFIG,
|
||||
"gh": _DEFAULT_CHANNEL_CONFIG,
|
||||
}
|
||||
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT
|
||||
DEFAULT_DEAD_TIMES_S = {"ab": 1e-3, "cd": 1e-3, "ef": 1e-3, "gh": 1e-3}
|
||||
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.EXT_RISING_EDGE
|
||||
DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz
|
||||
|
||||
DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [
|
||||
("A", CHANNELREFERENCE.T0),
|
||||
("B", CHANNELREFERENCE.A),
|
||||
("C", CHANNELREFERENCE.T0),
|
||||
("D", CHANNELREFERENCE.C),
|
||||
("E", CHANNELREFERENCE.T0),
|
||||
("F", CHANNELREFERENCE.E),
|
||||
("G", CHANNELREFERENCE.T0),
|
||||
("H", CHANNELREFERENCE.G),
|
||||
]
|
||||
|
||||
|
||||
class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
"""
|
||||
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1
|
||||
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG2.
|
||||
This device is responsible for creating triggers in burst mode and is connected to a multiplexer that
|
||||
distributes the trigger to the detectors. The DDG2 is triggered by the DDG1 through the EXT/EN channel.
|
||||
"""
|
||||
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
def on_connected(self) -> None:
|
||||
"""Set the default values on the device - intended to overwrite everything to a usable default state.
|
||||
Sets DEFAULT_IO_CONFIG into each channel,
|
||||
sets the trigger source to DEFAULT_TRIGGER_SOURCE,
|
||||
and turns off burst mode"""
|
||||
"""
|
||||
Set the default values on the device - intended to overwrite everything to a usable default state.
|
||||
Sets DEFAULT_IO_CONFIG into each channel, sets the trigger source to DEFAULT_TRIGGER_SOURCE.
|
||||
"""
|
||||
self.burst_disable() # it is possible to miss setting settings if burst is enabled
|
||||
for channel, config in DEFAULT_IO_CONFIG.items():
|
||||
self.set_io_values(channel, **config)
|
||||
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
|
||||
self.set_references_for_channels(
|
||||
[
|
||||
("A", CHANNELREFERENCE.T0),
|
||||
("B", CHANNELREFERENCE.A),
|
||||
("C", CHANNELREFERENCE.T0),
|
||||
("D", CHANNELREFERENCE.C),
|
||||
("E", CHANNELREFERENCE.T0),# One extra pulse once shutter closes for MCS
|
||||
("F", CHANNELREFERENCE.E),
|
||||
("G", CHANNELREFERENCE.T0),
|
||||
("H", CHANNELREFERENCE.G),
|
||||
]
|
||||
)
|
||||
self.set_references_for_channels(DEFAULT_REFERENCES)
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS.
|
||||
For standard scans, it will be triggered by a soft trigger from BEC.
|
||||
It also has a hardware trigger feeded into the EXT/EN for fly-scanning, i.e. Galil stages.
|
||||
|
||||
This DDG is always not in burst mode.
|
||||
"""
|
||||
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
|
||||
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
|
||||
readout_time = self.scan_info.msg.scan_parameters["readout_time"]
|
||||
|
||||
if readout_time is not None and readout_time != 0:
|
||||
# If we are given a single readout time from BEC, use it for all 4 channels
|
||||
pulse_widths = [exp_time - readout_time] * len(DEFAULT_DEAD_TIMES_S)
|
||||
else:
|
||||
# Otherwise, derive the pulse widths from the default dead times defined above
|
||||
pulse_widths = [exp_time - DEFAULT_DEAD_TIMES_S[ch] for ch in DEFAULT_DEAD_TIMES_S]
|
||||
logger.info(f"setting pulse widths to {pulse_widths}")
|
||||
self.set_delay_pairs(["ab", "cd", "ef", "gh"], delay=0, width=pulse_widths)
|
||||
# a = t0
|
||||
# a has reference to t0, b has reference to a
|
||||
burst_pulse_width = exp_time - DEFAULT_READOUT_TIMES["ab"]
|
||||
self.set_delay_pairs(channel="ab", delay=0, width=burst_pulse_width)
|
||||
self.burst_enable(count=frames_per_trigger, delay=0, period=exp_time)
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
""" Note, we need to add a delay to the StatusBits callback on the event_status.
|
||||
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
|
||||
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms."""
|
||||
st = StatusBase()#StatusBitsCompareStatus(self.state.event_status, STATUSBITS.END_OF_BURST, run=False)
|
||||
self.cancel_on_stop(st)
|
||||
self.trigger_shot.put(1, use_complete=True)
|
||||
time.sleep(self.scan_info.msg.scan_parameters["exp_time"])
|
||||
timer = 0
|
||||
# TODO make asynchronous and nicer!
|
||||
while st.done is False:
|
||||
self.state.proc_status.put(1,use_complete=True)
|
||||
# Do I need to give this time to update? ask Xiaoqiang!!
|
||||
event_status = self.state.event_status.get()
|
||||
if (STATUSBITS(event_status) & STATUSBITS.END_OF_BURST) == STATUSBITS.END_OF_BURST:
|
||||
st.set_finished()
|
||||
timer += 0.1
|
||||
time.sleep(0.1)
|
||||
if timer > 1:
|
||||
st.set_exception(TimeoutError(f"Device {self.name} failed to finish trigger"))
|
||||
break
|
||||
time.sleep(0.05)
|
||||
return st
|
||||
"""
|
||||
DDG2 will not receive a trigger from BEC, but will be triggered by the DDG1 through the EXT/EN channel.
|
||||
"""
|
||||
|
||||
def on_destroy(self) -> None:
|
||||
return
|
||||
if getattr(self, "_status_polling_stop_event", None) is not None:
|
||||
self._status_polling_stop_event.set()
|
||||
if getattr(self, "_status_polling_thread", None) is not None:
|
||||
self._status_polling_thread.join(timeout=3)
|
||||
def wait_for_status(
|
||||
self, status: StatusBase, bit_event: STATUSBITS, timeout: float = 2
|
||||
) -> None:
|
||||
"""Wait for a event status bit to be set.
|
||||
|
||||
Args:
|
||||
status (StatusBase): The status object to update.
|
||||
bit_event (STATUSBITS): The event status bit to wait for.
|
||||
timeout (float): Maximum time to wait for the event status bit to be set.
|
||||
"""
|
||||
current_time = time.time()
|
||||
while not status.done:
|
||||
self.state.proc_status.put(1, use_complete=True)
|
||||
event_status = self.state.event_status.get()
|
||||
if (STATUSBITS(event_status) & bit_event) == bit_event:
|
||||
status.set_finished()
|
||||
if time.time() - current_time > timeout:
|
||||
status.set_exception(TimeoutError(f"Timeout waiting for status {status}"))
|
||||
break
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.05) # Give time for the IOC to be ready again
|
||||
return status
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Stop the delay generator by setting the burst mode to 0"""
|
||||
@@ -118,6 +138,6 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
ddg = DDG1(name="ddg", prefix="X12SA-CPCL-DDG1:")
|
||||
ddg = DDG2(name="ddg2", prefix="X12SA-CPCL-DDG2:")
|
||||
ddg.wait_for_connection(all_signals=True, timeout=30)
|
||||
ddg.summary()
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
ERROR_CODES: dict[str, str] = {
|
||||
"STATUS OK": "No more errors left in the queue.", # renamed apparently from the IOC for No Error to STATUS OK
|
||||
"STATUS OK": "No more errors left in the queue.", # renamed apparently from the IOC for 'No Error' to 'STATUS OK'
|
||||
"Illegal Value": "A parameter was out of range.",
|
||||
"Illegal Mode": "The action is illegal in the current mode.",
|
||||
"Illegal Delay": "The requested delay is out of range.",
|
||||
|
||||
Binary file not shown.
Reference in New Issue
Block a user