w
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m3s
CI for csaxs_bec / test (pull_request) Failing after 1m6s

This commit is contained in:
2025-12-16 08:52:58 +01:00
parent 7556e14e26
commit 8c8e988460
2 changed files with 49 additions and 6 deletions

View File

@@ -172,7 +172,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
mcs.stop_all.put(1)
status_acquiring = TransitionStatus(mcs.acquiring, [ACQUIRING.DONE, ACQUIRING.ACQUIRING])
self.cancel_on_stop(status_acquiring)
mcs.erase_start.put(1)
mcs.start_all.put(1) # Don't use erase_start as this may emit data through callbacks
status_acquiring.wait(timeout=10) # Allow 10 seconds in case communication is slow
def _poll_event_status(self) -> None:

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import threading
import time
from contextlib import contextmanager
from functools import partial
from threading import RLock
from typing import TYPE_CHECKING, Callable, Literal
@@ -28,6 +29,28 @@ from csaxs_bec.devices.epics.mcs_card.mcs_card import (
MCSCard,
)
@contextmanager
def suppress_mca_callbacks(mcs_card: MCSCard):
"""
Context manager to temporarily disable MCA channel callbacks.
Required to avoid additional callbacks when erasing all channels.
Args:
mcs_card (MCSCard): The MCSCard instance to suppress callbacks for.
"""
mcs_card._omit_mca_callbacks.set()
try:
yield
finally:
mcs_card._channels_updated.wait(timeout=3.0)
if not mcs_card._channels_updated.is_set():
logger.warning(
f"Timeout while waiting for MCA channels to be cleared for device {mcs_card.name}."
)
mcs_card._omit_mca_callbacks.clear()
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import DeviceManagerBase, ScanInfo
@@ -93,8 +116,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
target=self._monitor_async_data_emission, daemon=True
)
self._current_data_index: int = 0
self._mca_counter_index: int = 0
self._current_data: dict[str, dict[Literal["value", "timestamp"], list[int] | float]] = {}
self._omit_mca_callbacks: threading.Event = threading.Event()
self._channels_updated: threading.Event = threading.Event()
def on_connected(self):
"""
Called when the device is connected.
@@ -108,7 +135,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self.prescale.set(1).wait(timeout=self._pv_timeout)
self.user_led.set(0).wait(timeout=self._pv_timeout)
# Set the input and output modes & polarities
self.input_mode.set(INPUTMODE.MODE_3).wait(timeout=self._pv_timeout)
self.input_polarity.set(POLARITY.NORMAL).wait(timeout=self._pv_timeout)
@@ -151,6 +177,15 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
"""
with self._rlock:
if self._mca_counter_index == 0:
self._channels_updated.clear()
self._mca_counter_index += 1
if self._omit_mca_callbacks.is_set():
# Count supressing callbacks when erasing all channels
if self._mca_counter_index == self.NUM_MCA_CHANNELS:
self._mca_counter_index = 0
self._channels_updated.set()
return # Suppress callbacks when erasing all channels
signal = kwargs.get("obj", None)
if signal is None:
logger.error(f"Called without 'obj' in kwargs: {kwargs}")
@@ -180,9 +215,10 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
# Send out data on multi async signal
self.mca.put(self._current_data, acquisition_group=self._acquisition_group)
self._current_data.clear()
self._mca_counter_index = 0
self._current_data_index += 1
def _progress_update(self, *args, old_value:any, value:any, **kwargs) -> None:
def _progress_update(self, *args, old_value: any, value: any, **kwargs) -> None:
"""Callback for progress updates from ophyd subscription on current_channel."""
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
@@ -194,6 +230,16 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
Called when the device is staged.
"""
# Clear any existing data
# NOTE: It is important to suppress the MCA callbacks here to avoid
# data updates from the mca channels as they are emitted when erase_all
# is called on the IOC.
with suppress_mca_callbacks(self):
# Erase all data on the MCS card
with self._rlock:
self._current_data.clear()
self.erase_all.put(1)
triggers = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
num_points = self.scan_info.msg.num_points
self._num_total_triggers = triggers * num_points
@@ -201,9 +247,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self.preset_real.set(0).wait(timeout=self._pv_timeout)
self.num_use_all.set(triggers).wait(timeout=self._pv_timeout)
# Reset data
self._current_data.clear()
# Reset monitoring of async data emission
self._start_monitor_async_data_emission.clear()
self._scan_done_callbacks.clear()