|
|
|
|
@@ -31,11 +31,14 @@ DELAY CHANNELS:
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import threading
|
|
|
|
|
import time
|
|
|
|
|
from concurrent.futures import Future, ThreadPoolExecutor
|
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
|
|
|
|
|
|
from bec_lib.logger import bec_logger
|
|
|
|
|
from ophyd import DeviceStatus, StatusBase
|
|
|
|
|
from ophyd import Component as Cpt
|
|
|
|
|
from ophyd import DeviceStatus, Signal, StatusBase
|
|
|
|
|
from ophyd_devices import CompareStatus, TransitionStatus
|
|
|
|
|
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
|
|
|
|
|
|
|
|
|
@@ -48,6 +51,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
|
|
|
|
|
AllChannelNames,
|
|
|
|
|
ChannelConfig,
|
|
|
|
|
DelayGeneratorCSAXS,
|
|
|
|
|
StatusBitsCompareStatus,
|
|
|
|
|
)
|
|
|
|
|
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import ACQUIRING, READYTOREAD
|
|
|
|
|
|
|
|
|
|
@@ -110,6 +114,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
|
|
|
|
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
|
|
|
|
|
)
|
|
|
|
|
self.device_manager = device_manager
|
|
|
|
|
self._threadpool = ThreadPoolExecutor(max_workers=1)
|
|
|
|
|
self._status_thread_event = threading.Event()
|
|
|
|
|
self._status_future = None # Future for the status job
|
|
|
|
|
|
|
|
|
|
# pylint: disable=attribute-defined-outside-init
|
|
|
|
|
def on_connected(self) -> None:
|
|
|
|
|
@@ -124,7 +131,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
|
|
|
|
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
|
|
|
|
|
self.set_references_for_channels(DEFAULT_REFERENCES)
|
|
|
|
|
# Set proc status to passively update with 5Hz (0.2s)
|
|
|
|
|
self.state.proc_status_mode.put(PROC_EVENT_MODE.FREQ_5HZ)
|
|
|
|
|
self.state.proc_status_mode.put(PROC_EVENT_MODE.EVENT)
|
|
|
|
|
|
|
|
|
|
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
|
|
|
|
"""
|
|
|
|
|
@@ -134,7 +141,8 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
|
|
|
|
|
|
|
|
|
This DDG is always not in burst mode.
|
|
|
|
|
"""
|
|
|
|
|
self.burst_disable()
|
|
|
|
|
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
|
|
|
|
|
self.burst_enable(1, 0, exp_time)
|
|
|
|
|
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
|
|
|
|
|
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
|
|
|
|
|
# Trigger DDG2
|
|
|
|
|
@@ -151,73 +159,96 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
|
|
|
|
# e has refernce to d, f has reference to e
|
|
|
|
|
self.set_delay_pairs(channel="ef", delay=0, width=1e-6)
|
|
|
|
|
|
|
|
|
|
def _prepare_mcs_on_trigger(self, mcs: MCSCardCSAXS) -> None:
|
|
|
|
|
"""Prepare the MCS card for the next trigger.
|
|
|
|
|
This method holds the logic to ensure that the MCS card is ready to read.
|
|
|
|
|
It's logic is coupled to the MCS card implementation and the DDG1 trigger logic.
|
|
|
|
|
"""
|
|
|
|
|
status_ready_read = CompareStatus(mcs.ready_to_read, READYTOREAD.DONE)
|
|
|
|
|
mcs.stop_all.put(1)
|
|
|
|
|
status_acquiring = TransitionStatus(mcs.acquiring, [ACQUIRING.DONE, ACQUIRING.ACQUIRING])
|
|
|
|
|
self.cancel_on_stop(status_ready_read)
|
|
|
|
|
self.cancel_on_stop(status_acquiring)
|
|
|
|
|
status_ready_read.wait(10)
|
|
|
|
|
|
|
|
|
|
mcs.ready_to_read.put(READYTOREAD.PROCESSING)
|
|
|
|
|
mcs.erase_start.put(1)
|
|
|
|
|
status_acquiring.wait(timeout=10) # Allow 10 seconds in case communication is slow
|
|
|
|
|
|
|
|
|
|
def _reset_status_event_future(self) -> None:
|
|
|
|
|
"""Reset the status future and thread event."""
|
|
|
|
|
if self._status_future and not self._status_future.done():
|
|
|
|
|
self._status_thread_event.set()
|
|
|
|
|
self._status_future.result(timeout=10) # Allow 10 seconds for communication to finish
|
|
|
|
|
|
|
|
|
|
# Reset to known state
|
|
|
|
|
self._status_thread_event.clear()
|
|
|
|
|
self.state.proc_status.put(1, use_complete=True)
|
|
|
|
|
|
|
|
|
|
def _prepare_trigger_status_event(self, timeout: float | None = None) -> DeviceStatus:
|
|
|
|
|
"""Prepare the trigger status event for the DDG1, and trigger the de"""
|
|
|
|
|
if timeout is None:
|
|
|
|
|
# Default timeout of 5 seconds + exposure time * frames_per_trigger
|
|
|
|
|
timeout = 5 + self.scan_info.msg.scan_parameters.get(
|
|
|
|
|
"exp_time", 0.1
|
|
|
|
|
) * self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
|
|
|
|
|
|
|
|
|
|
# Callback to cancel the status if the device is stopped
|
|
|
|
|
def cancel_cb(status: CompareStatus) -> None:
|
|
|
|
|
"""Callback to cancel the status if the device is stopped."""
|
|
|
|
|
self._status_thread_event.set()
|
|
|
|
|
|
|
|
|
|
status = StatusBitsCompareStatus(
|
|
|
|
|
self.state.event_status, STATUSBITS.END_OF_BURST, timeout=timeout, run=False
|
|
|
|
|
)
|
|
|
|
|
status.add_callback(cancel_cb)
|
|
|
|
|
self.cancel_on_stop(status)
|
|
|
|
|
|
|
|
|
|
# Callback to poll events, this gets executed in a separate thread by the threadpool
|
|
|
|
|
def _status_job():
|
|
|
|
|
"""Callback to poll event status an update the status_event signal."""
|
|
|
|
|
while not self._status_thread_event.is_set():
|
|
|
|
|
self.state.proc_status.put(1, use_complete=True)
|
|
|
|
|
time.sleep(0.02) # 20ms delay for processing
|
|
|
|
|
self.state.event_status.get(use_monitor=False)
|
|
|
|
|
time.sleep(0.02) # 20ms delay for processing
|
|
|
|
|
|
|
|
|
|
# Submit the status job to the threadpool
|
|
|
|
|
self._status_future = self._threadpool.submit(_status_job)
|
|
|
|
|
return status
|
|
|
|
|
|
|
|
|
|
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
|
|
|
|
"""Note, we need to add a delay to the StatusBits callback on the event_status.
|
|
|
|
|
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
|
|
|
|
|
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms.
|
|
|
|
|
"""
|
|
|
|
|
status = CompareStatus(self.state.event_status, STATUSBITS.NONE)
|
|
|
|
|
self.cancel_on_stop(status)
|
|
|
|
|
# Make sure to reset the status future and thread event
|
|
|
|
|
self._reset_status_event_future()
|
|
|
|
|
|
|
|
|
|
# Prepare the MCS card for the next software trigger
|
|
|
|
|
mcs = self.device_manager.devices.get("mcs", None)
|
|
|
|
|
if mcs is None:
|
|
|
|
|
logger.info("Did not find mcs card with name 'mcs' in current session")
|
|
|
|
|
else:
|
|
|
|
|
mcs: MCSCardCSAXS
|
|
|
|
|
status_ready_read = CompareStatus(mcs.ready_to_read, READYTOREAD.DONE)
|
|
|
|
|
mcs.stop_all.put(1)
|
|
|
|
|
status_acquiring = TransitionStatus(
|
|
|
|
|
mcs.acquiring, [ACQUIRING.DONE, ACQUIRING.ACQUIRING]
|
|
|
|
|
)
|
|
|
|
|
self.cancel_on_stop(status_ready_read)
|
|
|
|
|
self.cancel_on_stop(status_acquiring)
|
|
|
|
|
status_ready_read.wait(10)
|
|
|
|
|
|
|
|
|
|
mcs.ready_to_read.put(READYTOREAD.PROCESSING)
|
|
|
|
|
mcs.erase_start.put(1)
|
|
|
|
|
status_acquiring.wait(
|
|
|
|
|
timeout=10
|
|
|
|
|
) # 2 s wait for mcs card to start should be more than enough..
|
|
|
|
|
status.wait(timeout=10)
|
|
|
|
|
# Default timeout of 5 seconds + exposure time * frames_per_trigger
|
|
|
|
|
timeout = 5 + self.scan_info.msg.scan_parameters.get(
|
|
|
|
|
"exp_time", 0.1
|
|
|
|
|
) * self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
|
|
|
|
|
status = CompareStatus(self.state.event_status, STATUSBITS.END_OF_DELAY, timeout=timeout)
|
|
|
|
|
self.cancel_on_stop(status)
|
|
|
|
|
self._prepare_mcs_on_trigger(mcs)
|
|
|
|
|
# Prepare status to wait for the end of burst
|
|
|
|
|
status = self._prepare_trigger_status_event()
|
|
|
|
|
# Trigger the DDG1
|
|
|
|
|
self.trigger_shot.put(1, use_complete=True)
|
|
|
|
|
return status
|
|
|
|
|
|
|
|
|
|
def wait_for_status(
|
|
|
|
|
self, status: DeviceStatus, bit_event: STATUSBITS, timeout: float = 2
|
|
|
|
|
) -> None:
|
|
|
|
|
"""Wait for a event status bit to be set.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
status (DeviceStatus): The status object to update.
|
|
|
|
|
bit_event (STATUSBITS): The event status bit to wait for.
|
|
|
|
|
timeout (float): Maximum time to wait for the event status bit to be set.
|
|
|
|
|
"""
|
|
|
|
|
current_time = time.time()
|
|
|
|
|
while not status.done:
|
|
|
|
|
self.state.proc_status.put(1, use_complete=True)
|
|
|
|
|
event_status = self.state.event_status.get()
|
|
|
|
|
if (STATUSBITS(event_status) & bit_event) == bit_event:
|
|
|
|
|
status.set_finished()
|
|
|
|
|
if time.time() - current_time > timeout:
|
|
|
|
|
status.set_exception(
|
|
|
|
|
TimeoutError(
|
|
|
|
|
f"Timeout waiting for status of device {self.name} for event_status {bit_event}"
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
break
|
|
|
|
|
time.sleep(0.1)
|
|
|
|
|
time.sleep(0.05) # Give time for the IOC to be ready again
|
|
|
|
|
return status
|
|
|
|
|
|
|
|
|
|
def on_stop(self) -> None:
|
|
|
|
|
"""Stop the delay generator by setting the burst mode to 0"""
|
|
|
|
|
self.stop_ddg()
|
|
|
|
|
|
|
|
|
|
def on_destroy(self):
|
|
|
|
|
"""Clean up resources when the device is destroyed."""
|
|
|
|
|
self._status_thread_event.set()
|
|
|
|
|
if hasattr(self, "_threadpool") and self._threadpool is not None:
|
|
|
|
|
self._threadpool.shutdown(wait=False)
|
|
|
|
|
self._threadpool = None
|
|
|
|
|
super().on_destroy()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
ddg = DDG1(name="ddg1", prefix="X12SA-CPCL-DDG1:")
|
|
|
|
|
|