docs(mcs-card-csaxs): Add documentation to MCS card implementation
CI for csaxs_bec / test (pull_request) Failing after 1m1s
CI for csaxs_bec / test (push) Failing after 1m11s

This commit is contained in:
2025-12-19 14:17:34 +01:00
parent 10fa6d032d
commit 4b47846d56
@@ -1,4 +1,12 @@
"""Module for the MCSCard CSAXS implementation."""
"""
Module for the MCSCard CSAXS implementation at cSAXS.
Please respect the comments regarding timing and procedure of the MCS card. These
are highlighted with NOTE comments directly in the code to indicate if certain calls
are critical for stable operation of the device. Most of them are emipirically found
through long-term testing with the IOC of the SIS3820 MCS card, and attempt to avoid
unexpected behavior of the HW and IOC.
"""
from __future__ import annotations
@@ -14,9 +22,8 @@ import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, Kind
from ophyd_devices import AsyncSignal, CompareStatus, ProgressSignal, StatusBase
from ophyd_devices import AsyncMultiSignal, CompareStatus, ProgressSignal, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncMultiSignal
from csaxs_bec.devices.epics.mcs_card.mcs_card import (
ACQUIREMODE,
@@ -34,13 +41,21 @@ from csaxs_bec.devices.epics.mcs_card.mcs_card import (
@contextmanager
def suppress_mca_callbacks(mcs_card: MCSCard):
"""
Context manager to temporarily disable MCA channel callbacks.
Required to avoid additional callbacks when erasing all channels.
Utility context manager to suppress MCA channel callbacks temporarily.
It is required because erasing all channels via 'erase_all' PV triggers
callbacks for each channel. Depending on timing, this can interfere with
ongoing data acquisition so this context manager can be used to suppress
those callbacks temporarily.
NOTE: Please be aware that it does not restore previous state, which means
that _omit_mca_callbacks will remain set after exiting the context. It has
to be cleared manually if needed. This can be improved in the future, but
should be carefully coordinated with the logic implemented within '_on_counter_update'.
Args:
mcs_card (MCSCard): The MCSCard instance to suppress callbacks for.
"""
mcs_card._omit_mca_callbacks.set()
mcs_card._omit_mca_callbacks.set() # pylint: disable=protected-access
try:
yield
finally:
@@ -59,14 +74,27 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
The basic functionality is inherited from the MCSCard class.
Please note that the number of channels is fixed to 32, so there will be data for all
32 channels even if not all channels are used in the experiment. This setting can not
be realibly changed on the SIS3820 card's IOC through mux_output, so it is fixed here.
Mux_output should therefore also be set to 32 in the IOC configuration.
32 channels. The logic of the card is linked to the timing system (DDG) and therefore,
has to be coordinated with the logic on the DDG side.
Args:
name (str): Name of the device.
prefix (str, optional): Prefix for the EPICS PVs. Defaults to "".
"""
USER_ACCESS = ["mcs_recovery"]
# NOTE The number of MCA channels is fixed to 32 for the CSAXS MCS card.
# On the IOC, we receive a 'warning' or 'error' once we set this channel for the
# envisioned input/output mode settings of the card. However, we need to know the
# channels set as callback timing relies on the channels to be set.
# For the future, we may consider adding an initialization parameter to set
# the number of channels, which in return limits the number of subscriptions
# on the channels. However, mux_output should still be set to 32 on the IOC side.
# If this limits performance, this should be investigated with Controls engineers and
# the IOC.
NUM_MCA_CHANNELS: int = 32
# All counter from the MCS card.
# MCA counters for the card. Channels 1-32 will be sent to BEC.
mca = Cpt(
AsyncMultiSignal,
name="counters",
@@ -77,9 +105,11 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
kind=Kind.normal,
doc="AsyncMultiSignal for MCA card channels 1-32",
doc=(
"AsyncMultiSignal for MCA card channels 1-32."
"Cabling of the MCS card determines which channel corresponds to which input."
),
)
# Progress Signal
progress = Cpt(ProgressSignal, doc="ProgressSignal indicating the progress of the device")
def __init__(
@@ -90,21 +120,30 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
device_manager: DeviceManagerBase | None = None,
**kwargs,
):
"""
Initialize the MCSCardCSAXS with the given arguments and keyword arguments.
"""
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
# NOTE MCS Clock frequency. This is linked to the settings of the SIS3820 IOC and
# cabeling of the card. Currently, the 'output_mode' is set to MODE_2 and output 6 or 7
# (both 10MHz clocks) are used on channel 5 input for timing signal of the IOC.
# Please adjust this comment if the cabling or IOC settings change.
self._mcs_clock = 1e7 # 10MHz clock -> 1e7 Hz
self._pv_timeout = 2.0 # seconds
self._rlock = RLock() # Needed to ensure thread safety for counter updates
self._acquisition_group: str = "monitored"
self._rlock = RLock()
# NOTE This parameter will be sent with async data of the mcs counters.
# Based on scan-paramters, e.g. frames_per_trigger, this will be either
# 'monitored' or 'burst_group'. This means whether data from this channel
# is in sync with monitored devices or another group. In this scenario,
# the other group is called burst_group. Other detectors connected and
# triggered through the same timing system should implement the same logic
# to allow data to be properly grouped afterwards.
self._acquisition_group: str = "monitored" # default value, will be updated in on_stage
self._num_total_triggers: int = 0
# Event logic to schedule async data emission & monitoring
# Please note that complete needs to wait until all data was sent
# This requires additional logic and a thread to monitor the data emission
# Thread and event logic for monitoring async data emission after scan is done
# These are mostly internal variables for which values should not be changed externally.
# Adjusting the logic of them should also be handled with care and proper testing.
self._scan_done_thread_kill_event: threading.Event = threading.Event()
self._start_monitor_async_data_emission: threading.Event = threading.Event()
self._scan_done_callbacks: list[Callable[[], None]] = []
@@ -118,17 +157,40 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
def on_connected(self):
"""
Called when the device is connected.
This method is called once the device and all its PVs are connected. Any initial
setup of PVs should be managed here. Please beware that settings of the MCS card
correlate with its operation mode, input/output modes, and timing. Changing single
parameters without understanding the overall logic may lead to unexpected behavior
of the device.Therefore, any modification of these parameters should be handled
with care and tested.
A brief summary of the procesdure that is implemented here:
- Stop any ongoing acquisiton.
- Setup the Initial initial settings of the MCS card with respective operation modes
- Run 'mcs_recovery' procedure to ensure that no pending acquisition data is scheduled
to be pushed through mcs channels
- Subscribe a callback '_on_counter_update' to mcs counter PVs to forward
data through AsyncMultiSignal to BEC
- Start the monitoring thread for async data emission after scan is done
"""
# NOTE Stop any ongoing acquisition first. This shut be done before setting any PVs.
self.stop_all.put(1)
# Setup the MCS card settings
#########################
### Setup MCS Card ###
#########################
# Setup the MCS card settings. Please note that any runtime modification
# these parameter may lead to unexpected behavior of the device.
# Therefore this has to be set up correctly.
self.channel_advance.set(CHANNELADVANCE.EXTERNAL).wait(timeout=self._pv_timeout)
self.channel1_source.set(CHANNEL1SOURCE.EXTERNAL).wait(timeout=self._pv_timeout)
self.prescale.set(1).wait(timeout=self._pv_timeout)
self.user_led.set(0).wait(timeout=self._pv_timeout)
# NOTE The number of output channels has to be set to NUM_MCA_CHANNELS.
# The logic to send data to BEC relies on knowing how many channels are active.
self.mux_output.put(self.NUM_MCA_CHANNELS)
# Set the input and output modes & polarities
self.input_mode.set(INPUTMODE.MODE_3).wait(timeout=self._pv_timeout)
self.input_polarity.set(POLARITY.NORMAL).wait(timeout=self._pv_timeout)
@@ -136,6 +198,8 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self.output_polarity.set(POLARITY.NORMAL).wait(timeout=self._pv_timeout)
self.count_on_start.set(0).wait(timeout=self._pv_timeout)
# NOTE Data is read out when the MCS card finishes an acquisition. The logic for this
# is also linked to triggering on the DDG.
# Set ReadMode to PASSIVE, card will wait for external trigger to be read
self.read_mode.set(READMODE.PASSIVE).wait(timeout=self._pv_timeout)
@@ -145,57 +209,85 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
# Subscribe the progress signal
self.current_channel.subscribe(self._progress_update, run=False)
self.mcs_recovery()
# NOTE: Run a recovery procedure to ensure that the card has no pending data
# that needs to be pushed through the mca channels. The procedure involves
# stopping any ongoing acquisition and erasing all data on the card. Including
# a short sleep to allow the IOC to process the commands.
self.mcs_recovery(timeout=1)
# Subscribe to the mca updates
####################################
### Setup MCS Subscriptions ###
####################################
for sig in self.counters.component_names:
sig_obj: EpicsSignalRO = getattr(self.counters, sig)
sig_obj.subscribe(self._on_counter_update, run=False)
# Start monitoring thread
self._scan_done_thread.start()
def _on_counter_update(self, value, **kwargs) -> None:
"""
Callback for counter updates of the mca channels (1-32).
Data from the mca channels will be pushed to a list, and then forwarded to
the async multi signal 'raw' for readout after the trigger is complete.
It is important that mux_output is set to the correct number of channels in on_connected,
because the callback here waits for updates on all channels before pushing data to BEC.
The lock is important to avoid that different threads execute this callback simultaneously,
which would jeopardize the logic that counts the number of received channels.
If _omit_mca_callbacks is set, the callback will return immediately without processing the
data. This is used when erasing all channels to avoid interference with ongoing acquisition.
It has to manually cleared after the context manager 'suppress_mca_callbacks' is used.
Args:
value: The new value from the counter PV.
**kwargs: Additional keyword arguments from the subscription, including 'obj' (the EpicsSignalRO instance).
"""
with self._rlock:
self._mca_counter_index += 1
if self._omit_mca_callbacks.is_set():
return # Suppress callbacks when erasing all channels
signal = kwargs.get("obj", None)
signal: EpicsSignalRO | None = kwargs.get("obj", None)
if signal is None:
logger.error(f"Called without 'obj' in kwargs: {kwargs}")
return
signal: EpicsSignalRO
attr_name = signal.attr_name
return # Log error if no signal object is provided
# Ignore updates for channels that are not setup through num_connected_channels
# Extract index from signal name that updates
# Ignore any updates from channels beyond NUM_MCA_CHANNELS
attr_name = signal.attr_name
index = int(attr_name[3:]) # Extract index from 'mcaX'
if index > self.NUM_MCA_CHANNELS:
return
# NOTE Depending on the scan parameters, we may either receive single values or numpy arrays.
# Therefore, we need to handle both cases here to ensure that data is always stored. We do
# this by converting single values to a list with one element, and numpy arrays to lists.
if isinstance(value, np.ndarray):
value = value.tolist() # Convert numpy array to list
else:
value = [value] # Received single value, convert to list
# Store the value with timestamp. If available in kwargs, use provided timestamp from CA, otherwise use current time when received.
self._current_data.update(
{attr_name: {"value": value, "timestamp": kwargs.get("timestamp") or time.time()}}
)
# Once we have received all channels, push data to BEC and reset for next accumulation
if len(self._current_data) == self.NUM_MCA_CHANNELS:
# Send out data on multi async signal
self.mca.put(self._current_data, acquisition_group=self._acquisition_group)
self._current_data.clear()
self._mca_counter_index = 0
self._current_data_index += 1
# pylint: disable=unused-argument
def _progress_update(self, *args, old_value: any, value: any, **kwargs) -> None:
"""Callback for progress updates from ophyd subscription on current_channel."""
"""
Callback to update the progress signals base on values of current_channel in respect to expected total triggers.
Logic for these updates need to be extended once fly and step scan logic is fully implemented.
Args:
old_value: Previous value of the signal.
value: New value of the signal.
"""
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
if scan_done:
@@ -203,7 +295,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
def on_stage(self) -> None:
"""
Called when the device is staged.
Method called when the device is staged before a scan. Any bootstrapping required
for the scan should be handled here.
The following procedure is implemented here:
- Ensure that any ongoing acquisition is stopped (should never happen if not interfered with manually)
- Erase all data on the MCS card to ensure a clean start (should never
"""
# NOTE: If for some reason, the card is still acquiring, we need to stop it first
@@ -217,12 +314,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
status = CompareStatus(self.acquiring, ACQUIRING.DONE)
status.wait(timeout=10)
# NOTE: Erase all will result in data emission through mca callback subscriptions
# NOTE: If current_channel != 0, erase all data on the card. This
# needs to be done with the 'suppress_mca_callbacks' context manager as erase_all will result
# in data emission through mca callback subscriptions.
# The buffer needs to be cleared as this will otherwise lead to missing
# triggers during the scan. Again, this should not happen if unstage is properly called.
# But user interference or a restart of the device_server may lead to this situation.
# self.erase_all.put(1)
# time.sleep(3)
if self.current_channel.get() != 0:
with suppress_mca_callbacks(self):
logger.warning(
@@ -231,7 +328,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
# Erase all data on the MCS card
self.erase_all.put(1)
time.sleep(1) # Allow time to process erase
# Clear the omit flag after erasing and waiting for 1s
self._omit_mca_callbacks.clear()
#####################################
### Setup Acquisition Parameters ###
#####################################
triggers = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
num_points = self.scan_info.msg.num_points
self._num_total_triggers = triggers * num_points
@@ -239,62 +341,114 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self.preset_real.set(0).wait(timeout=self._pv_timeout)
self.num_use_all.set(triggers).wait(timeout=self._pv_timeout)
# Make sure to clear any remaining data in the local buffer
# Clear any previous data, just to be sure
with self._rlock:
self._current_data.clear()
# Reset monitoring of async data emission
# NOTE Reset events for monitoring async_data_emission thread which is
# running during complete to wait for all data from the card
# to be emitted to BEC.
self._start_monitor_async_data_emission.clear()
# Clear any previous scan done callbacks
self._scan_done_callbacks.clear()
# Reset counter for data index of emitted data, for fly scans, this logic may have to be adjusted.
self._current_data_index = 0
def on_unstage(self) -> None:
"""
Called when the device is unstaged.
Called when the device is unstaged. This method should be omnipotent and resolve fast.
It stops any ongoing acquisition, erases all data on the MCS and clears the local buffer '_current_data'.
NOTE: It is important that the logic for on_complete is solid and properly waiting for mca data to be emitted
to BEC. Otherwise, unstage may interfere with ongoing data emission. Unstage is called after complete during scans.
"""
self.stop_all.put(1)
# Make sure that upon unstaging, all data on the MCS card is erased.
with suppress_mca_callbacks(self):
with self._rlock:
self._current_data.clear()
self.erase_all.put(1)
def _monitor_async_data_emission(self, timeout: int = 10) -> None:
"""Monitor data emission after scan is done."""
def _monitor_async_data_emission(self) -> None:
"""
Monitoring loop that runs in a separate thread to check when all data has been emitted.
It is IDLE most of the time, but started once complete is called. Please be aware that
it has a timeout to avoid deadlocks in case something unexpected happens. Errors are logged
but the exception has to be handled through logic in the callbacks _scan_done_callbacks.
The check is done by comparing the number of data updates received through mca channel callbacks
with the expected number of points in the scan. Once they match, all callbacks in _scan_done_callbacks
are called to indicate that data emission is done.
NOTE! This logic currently works for any step scan, but has to be extended for fly scans.
Args:
timeout (int): Timeout in seconds to avoid deadlocks. Defaults to 10 seconds.
"""
while not self._scan_done_thread_kill_event.is_set():
while self._start_monitor_async_data_emission.wait():
try:
if self._current_data_index == self.scan_info.msg.num_points:
for callback in self._scan_done_callbacks:
callback()
callback(exception=None)
time.sleep(0.02) # 20ms delay to avoid busy loop
except Exception: # pylint: disable=broad-except
except Exception as exc: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Exception in monitoring thread of complete for {self.name}:\n{content}"
"Running callbacks to avoid deadlock."
)
for callback in self._scan_done_callbacks:
callback()
callback(exception=exc)
def _status_callback(self, status: StatusBase) -> None:
def _status_callback(self, status: StatusBase, exception=None) -> None:
"""Callback for status completion."""
if not status.done:
status.set_finished()
self._start_monitor_async_data_emission.clear() # Stop monitoring
# NOTE Important check as set_finished or set_exception should not be called
# if the status is already done (e.g. cancelled externally)
if status.done:
return # Already done and cancelled externally.
if exception:
status.set_exception(exception)
else:
status.set_finished()
def _status_failed_callback(self, status: StatusBase) -> None:
"""Callback for status failure."""
if status.done and not status.success:
"""Callback for status failure, the monitoring thread should be stopped."""
# NOTE Check for status.done and status.success is important to avoid
if status.done:
self._start_monitor_async_data_emission.clear() # Stop monitoring
def on_complete(self) -> CompareStatus:
"""On scan completion."""
# Prepare callback for data emission done
"""
Method that is called at the end of scan core, but before unstage.
In here, the device should report whether it successfully finished
data acquisition for the scan. This has to happen asynchronously,
so through a status object (future promise) that is returned and resolves
once logic for data acquisition on the MCS card is finished. This is required
because data updates that occur after the scan is closed will be rejected by BEC.
Therefore, we need to ensure that all data has been emitted to BEC before
reporting completion of the device.
This method implements the following procedure:
- Starts the IDLE async data monitoring thread that checks if all expected data
has been emitted to BEC through the mca channel callbacks.
- Use a CompareStatus to monitor when the MCS card becomes DONE. Please note that this
only indicates that the card has finished acquisition, but not that all data has been
emitted to BEC.
- Return combined status object. A callback is registered to handle failure of the status
if it is stopped externally, e.g. through scan abort. This should ensure that the
monitoring thread is stopped properly.
"""
# Prepare and register status callback for the async monitoring loop
status_async_data = StatusBase(obj=self)
self._scan_done_callbacks.append(partial(self._status_callback, status_async_data))
# Start done callback loop
# Set the event to start monitoring async data emission
self._start_monitor_async_data_emission.set()
# Add CompareStatus for Acquiring DONE
status = CompareStatus(self.acquiring, ACQUIRING.DONE)
@@ -305,6 +459,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
return status
def on_destroy(self):
"""Hook called when the device is being destroyed. It should clean up any resources including threads."""
self._scan_done_thread_kill_event.set()
self._start_monitor_async_data_emission.set()
if self._scan_done_thread.is_alive():
@@ -313,18 +468,28 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
logger.warning(f"Thread for device {self.name} did not terminate properly.")
def on_stop(self) -> None:
"""
Called when the scan is stopped.
"""
"""Hook called when the device is stopped. In addition, any status that is registered through cancel_on_stop will be cancelled here."""
self.stop_all.put(1)
self.erase_all.put(1)
def mcs_recovery(self, timeout:int=1) -> None:
"""Recovery procedure for the mcs card"""
sleep_time = timeout / 2 # 2 sleeps
logger.info(f"Running recovery procedure for MCS card {self.name} with {sleep_time}s sleep, calling stop_all and erase_all, and another {sleep_time}s sleep")
def mcs_recovery(self, timeout: int = 1) -> None:
"""
Recovery procedure for the mcs card. This procedure has been empirically found and can
be used to ensure that the MCS card is stopped and has no pending data to be emitted.
It involves stopping any ongoing acquisition and erasing all data on the card, with
a sleep in between to allow the IOC to process the commands.
Args:
timeout (int): Total timeout for the recovery procedure. Defaults to 1 second.
"""
sleep_time = timeout / 2 # 2 sleeps
logger.info(
f"Running recovery procedure for MCS card {self.name} with {sleep_time}s sleep, calling stop_all and erase_all, and another {sleep_time}s sleep"
)
self.erase_start.put(1)
time.sleep(sleep_time)
self.stop_all.put(1)
self.erase_all.put(1)
time.sleep(sleep_time)
with suppress_mca_callbacks(self):
self.erase_all.put(1)
time.sleep(sleep_time)
self._omit_mca_callbacks.clear()