diff --git a/csaxs_bec/device_configs/endstation.yaml b/csaxs_bec/device_configs/endstation.yaml index 1bd8ef6..c9a2c20 100644 --- a/csaxs_bec/device_configs/endstation.yaml +++ b/csaxs_bec/device_configs/endstation.yaml @@ -32,7 +32,7 @@ mcs: ids_cam: description: IDS camera for live image acquisition - deviceClass: csaxs_bec.devices.ids_cameras.ids_camera_new.IDSCamera + deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera deviceConfig: camera_id: 201 bits_per_pixel: 24 diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py index 7b097f6..f12a0b7 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py @@ -31,11 +31,14 @@ DELAY CHANNELS: from __future__ import annotations +import threading import time +from concurrent.futures import Future, ThreadPoolExecutor from typing import TYPE_CHECKING from bec_lib.logger import bec_logger -from ophyd import DeviceStatus, StatusBase +from ophyd import Component as Cpt +from ophyd import DeviceStatus, Signal, StatusBase from ophyd_devices import CompareStatus, TransitionStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase @@ -48,6 +51,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import AllChannelNames, ChannelConfig, DelayGeneratorCSAXS, + StatusBitsCompareStatus, ) from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import ACQUIRING, READYTOREAD @@ -110,6 +114,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs ) self.device_manager = device_manager + self._threadpool = ThreadPoolExecutor(max_workers=1) + self._status_thread_event = threading.Event() + self._status_future = None # Future for the status job # pylint: disable=attribute-defined-outside-init def on_connected(self) -> None: @@ -124,7 +131,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): self.set_trigger(DEFAULT_TRIGGER_SOURCE) self.set_references_for_channels(DEFAULT_REFERENCES) # Set proc status to passively update with 5Hz (0.2s) - self.state.proc_status_mode.put(PROC_EVENT_MODE.FREQ_5HZ) + self.state.proc_status_mode.put(PROC_EVENT_MODE.EVENT) def on_stage(self) -> DeviceStatus | StatusBase | None: """ @@ -134,7 +141,8 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): This DDG is always not in burst mode. """ - self.burst_disable() + exp_time = self.scan_info.msg.scan_parameters["exp_time"] + self.burst_enable(1, 0, exp_time) exp_time = self.scan_info.msg.scan_parameters["exp_time"] frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"] # Trigger DDG2 @@ -151,73 +159,96 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): # e has refernce to d, f has reference to e self.set_delay_pairs(channel="ef", delay=0, width=1e-6) + def _prepare_mcs_on_trigger(self, mcs: MCSCardCSAXS) -> None: + """Prepare the MCS card for the next trigger. + This method holds the logic to ensure that the MCS card is ready to read. + It's logic is coupled to the MCS card implementation and the DDG1 trigger logic. + """ + status_ready_read = CompareStatus(mcs.ready_to_read, READYTOREAD.DONE) + mcs.stop_all.put(1) + status_acquiring = TransitionStatus(mcs.acquiring, [ACQUIRING.DONE, ACQUIRING.ACQUIRING]) + self.cancel_on_stop(status_ready_read) + self.cancel_on_stop(status_acquiring) + status_ready_read.wait(10) + + mcs.ready_to_read.put(READYTOREAD.PROCESSING) + mcs.erase_start.put(1) + status_acquiring.wait(timeout=10) # Allow 10 seconds in case communication is slow + + def _reset_status_event_future(self) -> None: + """Reset the status future and thread event.""" + if self._status_future and not self._status_future.done(): + self._status_thread_event.set() + self._status_future.result(timeout=10) # Allow 10 seconds for communication to finish + + # Reset to known state + self._status_thread_event.clear() + self.state.proc_status.put(1, use_complete=True) + + def _prepare_trigger_status_event(self, timeout: float | None = None) -> DeviceStatus: + """Prepare the trigger status event for the DDG1, and trigger the de""" + if timeout is None: + # Default timeout of 5 seconds + exposure time * frames_per_trigger + timeout = 5 + self.scan_info.msg.scan_parameters.get( + "exp_time", 0.1 + ) * self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) + + # Callback to cancel the status if the device is stopped + def cancel_cb(status: CompareStatus) -> None: + """Callback to cancel the status if the device is stopped.""" + self._status_thread_event.set() + + status = StatusBitsCompareStatus( + self.state.event_status, STATUSBITS.END_OF_BURST, timeout=timeout, run=False + ) + status.add_callback(cancel_cb) + self.cancel_on_stop(status) + + # Callback to poll events, this gets executed in a separate thread by the threadpool + def _status_job(): + """Callback to poll event status an update the status_event signal.""" + while not self._status_thread_event.is_set(): + self.state.proc_status.put(1, use_complete=True) + time.sleep(0.02) # 20ms delay for processing + self.state.event_status.get(use_monitor=False) + time.sleep(0.02) # 20ms delay for processing + + # Submit the status job to the threadpool + self._status_future = self._threadpool.submit(_status_job) + return status + def on_trigger(self) -> DeviceStatus | StatusBase | None: """Note, we need to add a delay to the StatusBits callback on the event_status. If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To avoid this, we've added the option to specify a delay via add_delay, default here is 50ms. """ - status = CompareStatus(self.state.event_status, STATUSBITS.NONE) - self.cancel_on_stop(status) + # Make sure to reset the status future and thread event + self._reset_status_event_future() + + # Prepare the MCS card for the next software trigger mcs = self.device_manager.devices.get("mcs", None) if mcs is None: logger.info("Did not find mcs card with name 'mcs' in current session") else: - mcs: MCSCardCSAXS - status_ready_read = CompareStatus(mcs.ready_to_read, READYTOREAD.DONE) - mcs.stop_all.put(1) - status_acquiring = TransitionStatus( - mcs.acquiring, [ACQUIRING.DONE, ACQUIRING.ACQUIRING] - ) - self.cancel_on_stop(status_ready_read) - self.cancel_on_stop(status_acquiring) - status_ready_read.wait(10) - - mcs.ready_to_read.put(READYTOREAD.PROCESSING) - mcs.erase_start.put(1) - status_acquiring.wait( - timeout=10 - ) # 2 s wait for mcs card to start should be more than enough.. - status.wait(timeout=10) - # Default timeout of 5 seconds + exposure time * frames_per_trigger - timeout = 5 + self.scan_info.msg.scan_parameters.get( - "exp_time", 0.1 - ) * self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) - status = CompareStatus(self.state.event_status, STATUSBITS.END_OF_DELAY, timeout=timeout) - self.cancel_on_stop(status) + self._prepare_mcs_on_trigger(mcs) + # Prepare status to wait for the end of burst + status = self._prepare_trigger_status_event() + # Trigger the DDG1 self.trigger_shot.put(1, use_complete=True) return status - def wait_for_status( - self, status: DeviceStatus, bit_event: STATUSBITS, timeout: float = 2 - ) -> None: - """Wait for a event status bit to be set. - - Args: - status (DeviceStatus): The status object to update. - bit_event (STATUSBITS): The event status bit to wait for. - timeout (float): Maximum time to wait for the event status bit to be set. - """ - current_time = time.time() - while not status.done: - self.state.proc_status.put(1, use_complete=True) - event_status = self.state.event_status.get() - if (STATUSBITS(event_status) & bit_event) == bit_event: - status.set_finished() - if time.time() - current_time > timeout: - status.set_exception( - TimeoutError( - f"Timeout waiting for status of device {self.name} for event_status {bit_event}" - ) - ) - break - time.sleep(0.1) - time.sleep(0.05) # Give time for the IOC to be ready again - return status - def on_stop(self) -> None: """Stop the delay generator by setting the burst mode to 0""" self.stop_ddg() + def on_destroy(self): + """Clean up resources when the device is destroyed.""" + self._status_thread_event.set() + if hasattr(self, "_threadpool") and self._threadpool is not None: + self._threadpool.shutdown(wait=False) + self._threadpool = None + super().on_destroy() + if __name__ == "__main__": ddg = DDG1(name="ddg1", prefix="X12SA-CPCL-DDG1:") diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py b/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py index 526ce78..f55d682 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py @@ -402,7 +402,6 @@ class DelayGeneratorEventStatus(Device): EpicsSignal, "EventStatusLI.PROC", name="proc_status", - auto_monitor=True, kind=Kind.omitted, doc="Poll and flush the latest event status register entry from the HW to the event_status signal", ) diff --git a/tests/tests_devices/test_delay_generator_csaxs.py b/tests/tests_devices/test_delay_generator_csaxs.py index 25d99ef..68a1f3c 100644 --- a/tests/tests_devices/test_delay_generator_csaxs.py +++ b/tests/tests_devices/test_delay_generator_csaxs.py @@ -191,7 +191,10 @@ def test_ddg1_stage(mock_ddg1): mock_ddg1.stage() - assert np.isclose(mock_ddg1.burst_mode.get(), 0) # Burst mode is disabled + assert np.isclose(mock_ddg1.burst_mode.get(), 1) # burst mode is enabled + assert np.isclose(mock_ddg1.burst_delay.get(), 0) + assert np.isclose(mock_ddg1.burst_period.get(), exp_time) + # Trigger DDG2 through EXT/EN assert np.isclose(mock_ddg1.ab.delay.get(), 2e-3) @@ -218,7 +221,7 @@ def test_ddg1_trigger(mock_ddg1): assert status.done is False assert status.success is False assert mock_ddg1.trigger_shot.get() == 1 - mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_DELAY.value + mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value status.wait(timeout=1) # Wait for the status to be done assert status.done is True assert status.success is True