From 8c8e988460d8ec5acd46fea25d7fff4e373f27cf Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 16 Dec 2025 08:52:58 +0100 Subject: [PATCH] w --- .../epics/delay_generator_csaxs/ddg_1.py | 2 +- .../devices/epics/mcs_card/mcs_card_csaxs.py | 53 +++++++++++++++++-- 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py index b712719..9357278 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py @@ -172,7 +172,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): mcs.stop_all.put(1) status_acquiring = TransitionStatus(mcs.acquiring, [ACQUIRING.DONE, ACQUIRING.ACQUIRING]) self.cancel_on_stop(status_acquiring) - mcs.erase_start.put(1) + mcs.start_all.put(1) # Don't use erase_start as this may emit data through callbacks status_acquiring.wait(timeout=10) # Allow 10 seconds in case communication is slow def _poll_event_status(self) -> None: diff --git a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py index 189e0f0..59398d4 100644 --- a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py +++ b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py @@ -4,6 +4,7 @@ from __future__ import annotations import threading import time +from contextlib import contextmanager from functools import partial from threading import RLock from typing import TYPE_CHECKING, Callable, Literal @@ -28,6 +29,28 @@ from csaxs_bec.devices.epics.mcs_card.mcs_card import ( MCSCard, ) + +@contextmanager +def suppress_mca_callbacks(mcs_card: MCSCard): + """ + Context manager to temporarily disable MCA channel callbacks. + Required to avoid additional callbacks when erasing all channels. + + Args: + mcs_card (MCSCard): The MCSCard instance to suppress callbacks for. + """ + mcs_card._omit_mca_callbacks.set() + try: + yield + finally: + mcs_card._channels_updated.wait(timeout=3.0) + if not mcs_card._channels_updated.is_set(): + logger.warning( + f"Timeout while waiting for MCA channels to be cleared for device {mcs_card.name}." + ) + mcs_card._omit_mca_callbacks.clear() + + if TYPE_CHECKING: # pragma: no cover from bec_lib.devicemanager import DeviceManagerBase, ScanInfo @@ -93,8 +116,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): target=self._monitor_async_data_emission, daemon=True ) self._current_data_index: int = 0 + self._mca_counter_index: int = 0 self._current_data: dict[str, dict[Literal["value", "timestamp"], list[int] | float]] = {} + self._omit_mca_callbacks: threading.Event = threading.Event() + self._channels_updated: threading.Event = threading.Event() + def on_connected(self): """ Called when the device is connected. @@ -108,7 +135,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): self.prescale.set(1).wait(timeout=self._pv_timeout) self.user_led.set(0).wait(timeout=self._pv_timeout) - # Set the input and output modes & polarities self.input_mode.set(INPUTMODE.MODE_3).wait(timeout=self._pv_timeout) self.input_polarity.set(POLARITY.NORMAL).wait(timeout=self._pv_timeout) @@ -151,6 +177,15 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): """ with self._rlock: + if self._mca_counter_index == 0: + self._channels_updated.clear() + self._mca_counter_index += 1 + if self._omit_mca_callbacks.is_set(): + # Count supressing callbacks when erasing all channels + if self._mca_counter_index == self.NUM_MCA_CHANNELS: + self._mca_counter_index = 0 + self._channels_updated.set() + return # Suppress callbacks when erasing all channels signal = kwargs.get("obj", None) if signal is None: logger.error(f"Called without 'obj' in kwargs: {kwargs}") @@ -180,9 +215,10 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): # Send out data on multi async signal self.mca.put(self._current_data, acquisition_group=self._acquisition_group) self._current_data.clear() + self._mca_counter_index = 0 self._current_data_index += 1 - def _progress_update(self, *args, old_value:any, value:any, **kwargs) -> None: + def _progress_update(self, *args, old_value: any, value: any, **kwargs) -> None: """Callback for progress updates from ophyd subscription on current_channel.""" scan_done = bool(value == self._num_total_triggers) self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done) @@ -194,6 +230,16 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): Called when the device is staged. """ + # Clear any existing data + # NOTE: It is important to suppress the MCA callbacks here to avoid + # data updates from the mca channels as they are emitted when erase_all + # is called on the IOC. + with suppress_mca_callbacks(self): + # Erase all data on the MCS card + with self._rlock: + self._current_data.clear() + self.erase_all.put(1) + triggers = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) num_points = self.scan_info.msg.num_points self._num_total_triggers = triggers * num_points @@ -201,9 +247,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): self.preset_real.set(0).wait(timeout=self._pv_timeout) self.num_use_all.set(triggers).wait(timeout=self._pv_timeout) - # Reset data - self._current_data.clear() - # Reset monitoring of async data emission self._start_monitor_async_data_emission.clear() self._scan_done_callbacks.clear()