wip mcs card
This commit is contained in:
@@ -1,11 +1,11 @@
|
||||
"""
|
||||
Module for the MCSCard CSAXS implementation at cSAXS.
|
||||
|
||||
Please respect the comments regarding timing and procedure of the MCS card. These
|
||||
are highlighted with NOTE comments directly in the code to indicate if certain calls
|
||||
are critical for stable operation of the device. Most of them are emipirically found
|
||||
through long-term testing with the IOC of the SIS3820 MCS card, and attempt to avoid
|
||||
unexpected behavior of the HW and IOC.
|
||||
Please respect the comments regarding timing and procedures of the MCS card. These
|
||||
are highlighted with NOTE comments directly in the code, indicating requirements
|
||||
for stable device operation. Most of these constraints were identified
|
||||
empirically through extensive testing with the SIS3820 MCS card IOC and are intended
|
||||
to prevent unexpected hardware or IOC behavior.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -39,13 +39,14 @@ from csaxs_bec.devices.epics.mcs_card.mcs_card import (
|
||||
|
||||
|
||||
@contextmanager
|
||||
def suppress_mca_callbacks(mcs_card: MCSCard):
|
||||
def suppress_mca_callbacks(mcs_card: MCSCard, restore_after_timeout: None | float = None):
|
||||
"""
|
||||
Utility context manager to suppress MCA channel callbacks temporarily.
|
||||
It is required because erasing all channels via 'erase_all' PV triggers
|
||||
callbacks for each channel. Depending on timing, this can interfere with
|
||||
ongoing data acquisition so this context manager can be used to suppress
|
||||
those callbacks temporarily.
|
||||
those callbacks temporarily. If used with restore_after_timeout, the suppression
|
||||
will be automatically cleared after the specified timeout in seconds.
|
||||
|
||||
NOTE: Please be aware that it does not restore previous state, which means
|
||||
that _omit_mca_callbacks will remain set after exiting the context. It has
|
||||
@@ -54,12 +55,17 @@ def suppress_mca_callbacks(mcs_card: MCSCard):
|
||||
|
||||
Args:
|
||||
mcs_card (MCSCard): The MCSCard instance to suppress callbacks for.
|
||||
restore_after_timeout (float | None): Optional timeout in seconds to automatically
|
||||
clear the suppression after the specified time. If None, the original state
|
||||
is not restored.
|
||||
"""
|
||||
mcs_card._omit_mca_callbacks.set() # pylint: disable=protected-access
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
pass
|
||||
if restore_after_timeout is not None:
|
||||
time.sleep(restore_after_timeout)
|
||||
mcs_card._omit_mca_callbacks.clear() # pylint: disable=protected-access
|
||||
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
@@ -74,8 +80,8 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
The basic functionality is inherited from the MCSCard class.
|
||||
|
||||
Please note that the number of channels is fixed to 32, so there will be data for all
|
||||
32 channels. The logic of the card is linked to the timing system (DDG) and therefore,
|
||||
has to be coordinated with the logic on the DDG side.
|
||||
32 channels. In addition, the logic of the card is linked to the timing system (DDG)
|
||||
and therefore changes have to be coordinated with the logic on the DDG side.
|
||||
|
||||
Args:
|
||||
name (str): Name of the device.
|
||||
@@ -83,6 +89,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["mcs_recovery"]
|
||||
|
||||
# NOTE The number of MCA channels is fixed to 32 for the CSAXS MCS card.
|
||||
# On the IOC, we receive a 'warning' or 'error' once we set this channel for the
|
||||
# envisioned input/output mode settings of the card. However, we need to know the
|
||||
@@ -100,7 +107,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
name="counters",
|
||||
signals=[
|
||||
f"mca{i}" for i in range(1, 33)
|
||||
], # This needs to be in sync with counters DynamicDeviceComponent
|
||||
], # NOTE Channels 1-32, they need to be in sync with the 'counters' component (DynamicDeviceComponent) of the MCSCard
|
||||
ndim=1,
|
||||
async_update={"type": "add", "max_shape": [None]},
|
||||
max_size=1000,
|
||||
@@ -124,8 +131,8 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
|
||||
)
|
||||
# NOTE MCS Clock frequency. This is linked to the settings of the SIS3820 IOC and
|
||||
# cabeling of the card. Currently, the 'output_mode' is set to MODE_2 and output 6 or 7
|
||||
# (both 10MHz clocks) are used on channel 5 input for timing signal of the IOC.
|
||||
# cabeling of the card. Currently, the 'output_mode' is set to MODE_2 and one of the outputs
|
||||
# 6 or 7 (both 10MHz clocks) is used on channel 5 input for the timing signal of the IOC.
|
||||
# Please adjust this comment if the cabling or IOC settings change.
|
||||
self._mcs_clock = 1e7 # 10MHz clock -> 1e7 Hz
|
||||
self._pv_timeout = 2.0 # seconds
|
||||
@@ -158,7 +165,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
def on_connected(self):
|
||||
"""
|
||||
This method is called once the device and all its PVs are connected. Any initial
|
||||
setup of PVs should be managed here. Please beware that settings of the MCS card
|
||||
setup of PVs should be managed here. Please be aware that settings of the MCS card
|
||||
correlate with its operation mode, input/output modes, and timing. Changing single
|
||||
parameters without understanding the overall logic may lead to unexpected behavior
|
||||
of the device.Therefore, any modification of these parameters should be handled
|
||||
@@ -200,7 +207,8 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
|
||||
# NOTE Data is read out when the MCS card finishes an acquisition. The logic for this
|
||||
# is also linked to triggering on the DDG.
|
||||
# Set ReadMode to PASSIVE, card will wait for external trigger to be read
|
||||
# Set ReadMode to PASSIVE, the card will wait either wait for readout command or
|
||||
# automatically readout once acquisition is done.
|
||||
self.read_mode.set(READMODE.PASSIVE).wait(timeout=self._pv_timeout)
|
||||
|
||||
# Set the acquire mode
|
||||
@@ -227,13 +235,17 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
|
||||
def _on_counter_update(self, value, **kwargs) -> None:
|
||||
"""
|
||||
Callback for counter updates of the mca channels (1-32).
|
||||
Callback for counter updates of the mca channels (1-32). This callback is attached
|
||||
to each mca channel PV on the MCS card. It collects data from all channels
|
||||
and once all channels have been updated for a given acquisition, it pushes
|
||||
the data to BEC through the AsyncMultiSignal 'mca'.
|
||||
|
||||
It is important that mux_output is set to the correct number of channels in on_connected,
|
||||
because the callback here waits for updates on all channels before pushing data to BEC.
|
||||
|
||||
The lock is important to avoid that different threads execute this callback simultaneously,
|
||||
which would jeopardize the logic that counts the number of received channels.
|
||||
The _rlock is used to ensure thread safety as multiple callbacks may be executed
|
||||
simultaneously from different threads.
|
||||
|
||||
If _omit_mca_callbacks is set, the callback will return immediately without processing the
|
||||
data. This is used when erasing all channels to avoid interference with ongoing acquisition.
|
||||
It has to manually cleared after the context manager 'suppress_mca_callbacks' is used.
|
||||
@@ -249,9 +261,10 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
signal: EpicsSignalRO | None = kwargs.get("obj", None)
|
||||
if signal is None:
|
||||
logger.error(f"Called without 'obj' in kwargs: {kwargs}")
|
||||
return # Log error if no signal object is provided
|
||||
return
|
||||
|
||||
# Extract index from signal name that updates
|
||||
# NOTE: This relies on the naming convention of the mca channels being 'mca1', 'mca2', ..., 'mca32'.
|
||||
# for the MCSCard class with the 'counters' DynamicDeviceComponent.
|
||||
# Ignore any updates from channels beyond NUM_MCA_CHANNELS
|
||||
attr_name = signal.attr_name
|
||||
index = int(attr_name[3:]) # Extract index from 'mcaX'
|
||||
@@ -266,7 +279,8 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
else:
|
||||
value = [value] # Received single value, convert to list
|
||||
|
||||
# Store the value with timestamp. If available in kwargs, use provided timestamp from CA, otherwise use current time when received.
|
||||
# Store the value with timestamp. If available in kwargs, use provided timestamp from CA,
|
||||
# otherwise use current time when received.
|
||||
self._current_data.update(
|
||||
{attr_name: {"value": value, "timestamp": kwargs.get("timestamp") or time.time()}}
|
||||
)
|
||||
@@ -278,6 +292,8 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
self._mca_counter_index = 0
|
||||
self._current_data_index += 1
|
||||
|
||||
# NOTE The logic for the device progress is not yet fully refined for all scan types.
|
||||
# This has to be adjusted once fly scan and step scan logic is fully implemented.
|
||||
# pylint: disable=unused-argument
|
||||
def _progress_update(self, *args, old_value: any, value: any, **kwargs) -> None:
|
||||
"""
|
||||
@@ -295,12 +311,16 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
|
||||
def on_stage(self) -> None:
|
||||
"""
|
||||
Method called when the device is staged before a scan. Any bootstrapping required
|
||||
for the scan should be handled here.
|
||||
This method is called when the device is staged before a scan. Any bootstrapping required
|
||||
for the scan should be handled here. We also need to handle MCS card specific logic to ensure
|
||||
that the card is properly prepared for the scan.
|
||||
|
||||
The following procedure is implemented here:
|
||||
- Ensure that any ongoing acquisition is stopped (should never happen if not interfered with manually)
|
||||
- Erase all data on the MCS card to ensure a clean start (should never
|
||||
- Set acquisition parameters based on scan parameters (frames_per_trigger, num_points, acquisition_group)
|
||||
- Clear any events and buffers related to async data emission. This includes '_omit_mca_callbacks',
|
||||
'_start_monitor_async_data_emission', '_scan_done_callbacks', and '_current_data'.
|
||||
"""
|
||||
|
||||
# NOTE: If for some reason, the card is still acquiring, we need to stop it first
|
||||
@@ -321,15 +341,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
# triggers during the scan. Again, this should not happen if unstage is properly called.
|
||||
# But user interference or a restart of the device_server may lead to this situation.
|
||||
if self.current_channel.get() != 0:
|
||||
with suppress_mca_callbacks(self):
|
||||
with suppress_mca_callbacks(self, restore_after_timeout=1.0):
|
||||
logger.warning(
|
||||
f"MCS Card {self.name} had still data in buffer Erased all data on staging and sleeping for 1 second."
|
||||
)
|
||||
# Erase all data on the MCS card
|
||||
self.erase_all.put(1)
|
||||
time.sleep(1) # Allow time to process erase
|
||||
# Clear the omit flag after erasing and waiting for 1s
|
||||
self._omit_mca_callbacks.clear()
|
||||
|
||||
#####################################
|
||||
### Setup Acquisition Parameters ###
|
||||
@@ -353,9 +370,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
# Clear any previous scan done callbacks
|
||||
self._scan_done_callbacks.clear()
|
||||
|
||||
# Reset counter for data index of emitted data, for fly scans, this logic may have to be adjusted.
|
||||
# Reset counter for data index of emitted data, NOTE for fly scans, this logic may have to be adjusted.
|
||||
self._current_data_index = 0
|
||||
|
||||
# NOTE Make sure that the signal that omits mca callbacks is cleared
|
||||
self._omit_mca_callbacks.clear()
|
||||
|
||||
def on_unstage(self) -> None:
|
||||
"""
|
||||
Called when the device is unstaged. This method should be omnipotent and resolve fast.
|
||||
@@ -363,6 +383,8 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
|
||||
NOTE: It is important that the logic for on_complete is solid and properly waiting for mca data to be emitted
|
||||
to BEC. Otherwise, unstage may interfere with ongoing data emission. Unstage is called after complete during scans.
|
||||
It is crucial that the device itself calls '_omit_mca_callbacks' in its on_stage method to make sure
|
||||
that data is emitted once the card is properly staged.
|
||||
"""
|
||||
self.stop_all.put(1)
|
||||
with suppress_mca_callbacks(self):
|
||||
@@ -372,18 +394,14 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
|
||||
def _monitor_async_data_emission(self) -> None:
|
||||
"""
|
||||
Monitoring loop that runs in a separate thread to check when all data has been emitted.
|
||||
It is IDLE most of the time, but started once complete is called. Please be aware that
|
||||
it has a timeout to avoid deadlocks in case something unexpected happens. Errors are logged
|
||||
but the exception has to be handled through logic in the callbacks _scan_done_callbacks.
|
||||
Monitoring loop that runs in a separate thread to check if all async data has been emitted to BEC.
|
||||
It is IDLE most of the time, but activate in the 'on_complete' method called by 'complete'.
|
||||
|
||||
The check is done by comparing the number of data updates received through mca channel callbacks
|
||||
with the expected number of points in the scan. Once they match, all callbacks in _scan_done_callbacks
|
||||
are called to indicate that data emission is done.
|
||||
The check is done by comparing the number of data updates '_current_data_index' received through
|
||||
mca channel callbacks with the expected number of points in the scan. Once they match, all
|
||||
callbacks in _scan_done_callbacks are called to indicate that data emission is done.
|
||||
Callbacks need to also accept and handle exceptions to properly report failure.
|
||||
NOTE! This logic currently works for any step scan, but has to be extended for fly scans.
|
||||
|
||||
Args:
|
||||
timeout (int): Timeout in seconds to avoid deadlocks. Defaults to 10 seconds.
|
||||
"""
|
||||
while not self._scan_done_thread_kill_event.is_set():
|
||||
while self._start_monitor_async_data_emission.wait():
|
||||
@@ -422,12 +440,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
def on_complete(self) -> CompareStatus:
|
||||
"""
|
||||
|
||||
Method that is called at the end of scan core, but before unstage.
|
||||
In here, the device should report whether it successfully finished
|
||||
data acquisition for the scan. This has to happen asynchronously,
|
||||
so through a status object (future promise) that is returned and resolves
|
||||
once logic for data acquisition on the MCS card is finished. This is required
|
||||
because data updates that occur after the scan is closed will be rejected by BEC.
|
||||
Method that is called at the end of scan core, but before unstage. This method is
|
||||
used to report whether the device successfully completed its data acquisition for the scan.
|
||||
The check has to be implemented asynchronously and resolve through a status (future) object
|
||||
returned by this method.
|
||||
NOTE: For the MCS card, we need to ensure that all data has been acquired
|
||||
and emitted to BEC as updates after 'on_complete' resolved will be rejected by BEC.
|
||||
Therefore, we need to ensure that all data has been emitted to BEC before
|
||||
reporting completion of the device.
|
||||
|
||||
@@ -454,12 +472,16 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
|
||||
# Combine both statuses
|
||||
ret_status = status & status_async_data
|
||||
# Handle external stop/cancel, and stop monitoring
|
||||
ret_status.add_callback(self._status_failed_callback)
|
||||
self.cancel_on_stop(ret_status)
|
||||
return status
|
||||
|
||||
def on_destroy(self):
|
||||
"""Hook called when the device is being destroyed. It should clean up any resources including threads."""
|
||||
"""
|
||||
The on destroy hook is called when the device is destroyed, but also reloaded.
|
||||
Here, we need to clean up all resources used up by the device, including running threads.
|
||||
"""
|
||||
self._scan_done_thread_kill_event.set()
|
||||
self._start_monitor_async_data_emission.set()
|
||||
if self._scan_done_thread.is_alive():
|
||||
@@ -486,10 +508,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
logger.info(
|
||||
f"Running recovery procedure for MCS card {self.name} with {sleep_time}s sleep, calling stop_all and erase_all, and another {sleep_time}s sleep"
|
||||
)
|
||||
# First erase and start ongoing acquisition.
|
||||
self.erase_start.put(1)
|
||||
time.sleep(sleep_time)
|
||||
# After a brief processing time, we stop any ongoing acquisition.
|
||||
self.stop_all.put(1)
|
||||
with suppress_mca_callbacks(self):
|
||||
# Finally, we erase all data while suppressing mca callbacks to avoid interference.
|
||||
# We restore the callback suppression after timeout to ensure proper operation afterwards.
|
||||
with suppress_mca_callbacks(self, restore_after_timeout=sleep_time):
|
||||
self.erase_all.put(1)
|
||||
time.sleep(sleep_time)
|
||||
self._omit_mca_callbacks.clear()
|
||||
|
||||
Reference in New Issue
Block a user