diff --git a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py index e86fa76..f48fedb 100644 --- a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py +++ b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py @@ -1,4 +1,12 @@ -"""Module for the MCSCard CSAXS implementation.""" +""" +Module for the MCSCard CSAXS implementation at cSAXS. + +Please respect the comments regarding timing and procedure of the MCS card. These +are highlighted with NOTE comments directly in the code to indicate if certain calls +are critical for stable operation of the device. Most of them are emipirically found +through long-term testing with the IOC of the SIS3820 MCS card, and attempt to avoid +unexpected behavior of the HW and IOC. +""" from __future__ import annotations @@ -14,9 +22,8 @@ import numpy as np from bec_lib.logger import bec_logger from ophyd import Component as Cpt from ophyd import EpicsSignalRO, Kind -from ophyd_devices import AsyncSignal, CompareStatus, ProgressSignal, StatusBase +from ophyd_devices import AsyncMultiSignal, CompareStatus, ProgressSignal, StatusBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase -from ophyd_devices.utils.bec_signals import AsyncMultiSignal from csaxs_bec.devices.epics.mcs_card.mcs_card import ( ACQUIREMODE, @@ -34,13 +41,21 @@ from csaxs_bec.devices.epics.mcs_card.mcs_card import ( @contextmanager def suppress_mca_callbacks(mcs_card: MCSCard): """ - Context manager to temporarily disable MCA channel callbacks. - Required to avoid additional callbacks when erasing all channels. + Utility context manager to suppress MCA channel callbacks temporarily. + It is required because erasing all channels via 'erase_all' PV triggers + callbacks for each channel. Depending on timing, this can interfere with + ongoing data acquisition so this context manager can be used to suppress + those callbacks temporarily. + + NOTE: Please be aware that it does not restore previous state, which means + that _omit_mca_callbacks will remain set after exiting the context. It has + to be cleared manually if needed. This can be improved in the future, but + should be carefully coordinated with the logic implemented within '_on_counter_update'. Args: mcs_card (MCSCard): The MCSCard instance to suppress callbacks for. """ - mcs_card._omit_mca_callbacks.set() + mcs_card._omit_mca_callbacks.set() # pylint: disable=protected-access try: yield finally: @@ -59,14 +74,27 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): The basic functionality is inherited from the MCSCard class. Please note that the number of channels is fixed to 32, so there will be data for all - 32 channels even if not all channels are used in the experiment. This setting can not - be realibly changed on the SIS3820 card's IOC through mux_output, so it is fixed here. - Mux_output should therefore also be set to 32 in the IOC configuration. + 32 channels. The logic of the card is linked to the timing system (DDG) and therefore, + has to be coordinated with the logic on the DDG side. + + Args: + name (str): Name of the device. + prefix (str, optional): Prefix for the EPICS PVs. Defaults to "". """ + USER_ACCESS = ["mcs_recovery"] + # NOTE The number of MCA channels is fixed to 32 for the CSAXS MCS card. + # On the IOC, we receive a 'warning' or 'error' once we set this channel for the + # envisioned input/output mode settings of the card. However, we need to know the + # channels set as callback timing relies on the channels to be set. + # For the future, we may consider adding an initialization parameter to set + # the number of channels, which in return limits the number of subscriptions + # on the channels. However, mux_output should still be set to 32 on the IOC side. + # If this limits performance, this should be investigated with Controls engineers and + # the IOC. NUM_MCA_CHANNELS: int = 32 - # All counter from the MCS card. + # MCA counters for the card. Channels 1-32 will be sent to BEC. mca = Cpt( AsyncMultiSignal, name="counters", @@ -77,9 +105,11 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): async_update={"type": "add", "max_shape": [None]}, max_size=1000, kind=Kind.normal, - doc="AsyncMultiSignal for MCA card channels 1-32", + doc=( + "AsyncMultiSignal for MCA card channels 1-32." + "Cabling of the MCS card determines which channel corresponds to which input." + ), ) - # Progress Signal progress = Cpt(ProgressSignal, doc="ProgressSignal indicating the progress of the device") def __init__( @@ -90,21 +120,30 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): device_manager: DeviceManagerBase | None = None, **kwargs, ): - """ - Initialize the MCSCardCSAXS with the given arguments and keyword arguments. - """ super().__init__( name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs ) + # NOTE MCS Clock frequency. This is linked to the settings of the SIS3820 IOC and + # cabeling of the card. Currently, the 'output_mode' is set to MODE_2 and output 6 or 7 + # (both 10MHz clocks) are used on channel 5 input for timing signal of the IOC. + # Please adjust this comment if the cabling or IOC settings change. self._mcs_clock = 1e7 # 10MHz clock -> 1e7 Hz self._pv_timeout = 2.0 # seconds - self._rlock = RLock() # Needed to ensure thread safety for counter updates - self._acquisition_group: str = "monitored" + self._rlock = RLock() + + # NOTE This parameter will be sent with async data of the mcs counters. + # Based on scan-paramters, e.g. frames_per_trigger, this will be either + # 'monitored' or 'burst_group'. This means whether data from this channel + # is in sync with monitored devices or another group. In this scenario, + # the other group is called burst_group. Other detectors connected and + # triggered through the same timing system should implement the same logic + # to allow data to be properly grouped afterwards. + self._acquisition_group: str = "monitored" # default value, will be updated in on_stage self._num_total_triggers: int = 0 - # Event logic to schedule async data emission & monitoring - # Please note that complete needs to wait until all data was sent - # This requires additional logic and a thread to monitor the data emission + # Thread and event logic for monitoring async data emission after scan is done + # These are mostly internal variables for which values should not be changed externally. + # Adjusting the logic of them should also be handled with care and proper testing. self._scan_done_thread_kill_event: threading.Event = threading.Event() self._start_monitor_async_data_emission: threading.Event = threading.Event() self._scan_done_callbacks: list[Callable[[], None]] = [] @@ -118,17 +157,40 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): def on_connected(self): """ - Called when the device is connected. + This method is called once the device and all its PVs are connected. Any initial + setup of PVs should be managed here. Please beware that settings of the MCS card + correlate with its operation mode, input/output modes, and timing. Changing single + parameters without understanding the overall logic may lead to unexpected behavior + of the device.Therefore, any modification of these parameters should be handled + with care and tested. + + A brief summary of the procesdure that is implemented here: + - Stop any ongoing acquisiton. + - Setup the Initial initial settings of the MCS card with respective operation modes + - Run 'mcs_recovery' procedure to ensure that no pending acquisition data is scheduled + to be pushed through mcs channels + - Subscribe a callback '_on_counter_update' to mcs counter PVs to forward + data through AsyncMultiSignal to BEC + - Start the monitoring thread for async data emission after scan is done """ + # NOTE Stop any ongoing acquisition first. This shut be done before setting any PVs. self.stop_all.put(1) - - # Setup the MCS card settings + ######################### + ### Setup MCS Card ### + ######################### + # Setup the MCS card settings. Please note that any runtime modification + # these parameter may lead to unexpected behavior of the device. + # Therefore this has to be set up correctly. self.channel_advance.set(CHANNELADVANCE.EXTERNAL).wait(timeout=self._pv_timeout) self.channel1_source.set(CHANNEL1SOURCE.EXTERNAL).wait(timeout=self._pv_timeout) self.prescale.set(1).wait(timeout=self._pv_timeout) self.user_led.set(0).wait(timeout=self._pv_timeout) + # NOTE The number of output channels has to be set to NUM_MCA_CHANNELS. + # The logic to send data to BEC relies on knowing how many channels are active. + self.mux_output.put(self.NUM_MCA_CHANNELS) + # Set the input and output modes & polarities self.input_mode.set(INPUTMODE.MODE_3).wait(timeout=self._pv_timeout) self.input_polarity.set(POLARITY.NORMAL).wait(timeout=self._pv_timeout) @@ -136,6 +198,8 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): self.output_polarity.set(POLARITY.NORMAL).wait(timeout=self._pv_timeout) self.count_on_start.set(0).wait(timeout=self._pv_timeout) + # NOTE Data is read out when the MCS card finishes an acquisition. The logic for this + # is also linked to triggering on the DDG. # Set ReadMode to PASSIVE, card will wait for external trigger to be read self.read_mode.set(READMODE.PASSIVE).wait(timeout=self._pv_timeout) @@ -145,57 +209,85 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): # Subscribe the progress signal self.current_channel.subscribe(self._progress_update, run=False) - self.mcs_recovery() + # NOTE: Run a recovery procedure to ensure that the card has no pending data + # that needs to be pushed through the mca channels. The procedure involves + # stopping any ongoing acquisition and erasing all data on the card. Including + # a short sleep to allow the IOC to process the commands. + self.mcs_recovery(timeout=1) - # Subscribe to the mca updates + #################################### + ### Setup MCS Subscriptions ### + #################################### for sig in self.counters.component_names: sig_obj: EpicsSignalRO = getattr(self.counters, sig) sig_obj.subscribe(self._on_counter_update, run=False) # Start monitoring thread self._scan_done_thread.start() - def _on_counter_update(self, value, **kwargs) -> None: """ Callback for counter updates of the mca channels (1-32). - Data from the mca channels will be pushed to a list, and then forwarded to - the async multi signal 'raw' for readout after the trigger is complete. + It is important that mux_output is set to the correct number of channels in on_connected, + because the callback here waits for updates on all channels before pushing data to BEC. + The lock is important to avoid that different threads execute this callback simultaneously, + which would jeopardize the logic that counts the number of received channels. + If _omit_mca_callbacks is set, the callback will return immediately without processing the + data. This is used when erasing all channels to avoid interference with ongoing acquisition. + It has to manually cleared after the context manager 'suppress_mca_callbacks' is used. + + Args: + value: The new value from the counter PV. + **kwargs: Additional keyword arguments from the subscription, including 'obj' (the EpicsSignalRO instance). """ with self._rlock: self._mca_counter_index += 1 if self._omit_mca_callbacks.is_set(): return # Suppress callbacks when erasing all channels - signal = kwargs.get("obj", None) + signal: EpicsSignalRO | None = kwargs.get("obj", None) if signal is None: logger.error(f"Called without 'obj' in kwargs: {kwargs}") - return - signal: EpicsSignalRO - attr_name = signal.attr_name + return # Log error if no signal object is provided - # Ignore updates for channels that are not setup through num_connected_channels + # Extract index from signal name that updates + # Ignore any updates from channels beyond NUM_MCA_CHANNELS + attr_name = signal.attr_name index = int(attr_name[3:]) # Extract index from 'mcaX' if index > self.NUM_MCA_CHANNELS: return + # NOTE Depending on the scan parameters, we may either receive single values or numpy arrays. + # Therefore, we need to handle both cases here to ensure that data is always stored. We do + # this by converting single values to a list with one element, and numpy arrays to lists. if isinstance(value, np.ndarray): value = value.tolist() # Convert numpy array to list else: value = [value] # Received single value, convert to list + + # Store the value with timestamp. If available in kwargs, use provided timestamp from CA, otherwise use current time when received. self._current_data.update( {attr_name: {"value": value, "timestamp": kwargs.get("timestamp") or time.time()}} ) + + # Once we have received all channels, push data to BEC and reset for next accumulation if len(self._current_data) == self.NUM_MCA_CHANNELS: - # Send out data on multi async signal self.mca.put(self._current_data, acquisition_group=self._acquisition_group) self._current_data.clear() self._mca_counter_index = 0 self._current_data_index += 1 + # pylint: disable=unused-argument def _progress_update(self, *args, old_value: any, value: any, **kwargs) -> None: - """Callback for progress updates from ophyd subscription on current_channel.""" + """ + Callback to update the progress signals base on values of current_channel in respect to expected total triggers. + Logic for these updates need to be extended once fly and step scan logic is fully implemented. + + Args: + old_value: Previous value of the signal. + value: New value of the signal. + """ scan_done = bool(value == self._num_total_triggers) self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done) if scan_done: @@ -203,7 +295,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): def on_stage(self) -> None: """ - Called when the device is staged. + Method called when the device is staged before a scan. Any bootstrapping required + for the scan should be handled here. + + The following procedure is implemented here: + - Ensure that any ongoing acquisition is stopped (should never happen if not interfered with manually) + - Erase all data on the MCS card to ensure a clean start (should never """ # NOTE: If for some reason, the card is still acquiring, we need to stop it first @@ -217,12 +314,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): status = CompareStatus(self.acquiring, ACQUIRING.DONE) status.wait(timeout=10) - # NOTE: Erase all will result in data emission through mca callback subscriptions + # NOTE: If current_channel != 0, erase all data on the card. This + # needs to be done with the 'suppress_mca_callbacks' context manager as erase_all will result + # in data emission through mca callback subscriptions. # The buffer needs to be cleared as this will otherwise lead to missing # triggers during the scan. Again, this should not happen if unstage is properly called. # But user interference or a restart of the device_server may lead to this situation. - # self.erase_all.put(1) - # time.sleep(3) if self.current_channel.get() != 0: with suppress_mca_callbacks(self): logger.warning( @@ -231,7 +328,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): # Erase all data on the MCS card self.erase_all.put(1) time.sleep(1) # Allow time to process erase + # Clear the omit flag after erasing and waiting for 1s + self._omit_mca_callbacks.clear() + ##################################### + ### Setup Acquisition Parameters ### + ##################################### triggers = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) num_points = self.scan_info.msg.num_points self._num_total_triggers = triggers * num_points @@ -239,62 +341,114 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): self.preset_real.set(0).wait(timeout=self._pv_timeout) self.num_use_all.set(triggers).wait(timeout=self._pv_timeout) - # Make sure to clear any remaining data in the local buffer + # Clear any previous data, just to be sure with self._rlock: self._current_data.clear() - # Reset monitoring of async data emission + + # NOTE Reset events for monitoring async_data_emission thread which is + # running during complete to wait for all data from the card + # to be emitted to BEC. self._start_monitor_async_data_emission.clear() + + # Clear any previous scan done callbacks self._scan_done_callbacks.clear() + + # Reset counter for data index of emitted data, for fly scans, this logic may have to be adjusted. self._current_data_index = 0 def on_unstage(self) -> None: """ - Called when the device is unstaged. + Called when the device is unstaged. This method should be omnipotent and resolve fast. + It stops any ongoing acquisition, erases all data on the MCS and clears the local buffer '_current_data'. + + NOTE: It is important that the logic for on_complete is solid and properly waiting for mca data to be emitted + to BEC. Otherwise, unstage may interfere with ongoing data emission. Unstage is called after complete during scans. """ self.stop_all.put(1) - # Make sure that upon unstaging, all data on the MCS card is erased. with suppress_mca_callbacks(self): with self._rlock: self._current_data.clear() self.erase_all.put(1) - def _monitor_async_data_emission(self, timeout: int = 10) -> None: - """Monitor data emission after scan is done.""" + def _monitor_async_data_emission(self) -> None: + """ + Monitoring loop that runs in a separate thread to check when all data has been emitted. + It is IDLE most of the time, but started once complete is called. Please be aware that + it has a timeout to avoid deadlocks in case something unexpected happens. Errors are logged + but the exception has to be handled through logic in the callbacks _scan_done_callbacks. + + The check is done by comparing the number of data updates received through mca channel callbacks + with the expected number of points in the scan. Once they match, all callbacks in _scan_done_callbacks + are called to indicate that data emission is done. + NOTE! This logic currently works for any step scan, but has to be extended for fly scans. + + Args: + timeout (int): Timeout in seconds to avoid deadlocks. Defaults to 10 seconds. + """ while not self._scan_done_thread_kill_event.is_set(): while self._start_monitor_async_data_emission.wait(): try: if self._current_data_index == self.scan_info.msg.num_points: for callback in self._scan_done_callbacks: - callback() + callback(exception=None) time.sleep(0.02) # 20ms delay to avoid busy loop - except Exception: # pylint: disable=broad-except + except Exception as exc: # pylint: disable=broad-except content = traceback.format_exc() logger.error( f"Exception in monitoring thread of complete for {self.name}:\n{content}" "Running callbacks to avoid deadlock." ) for callback in self._scan_done_callbacks: - callback() + callback(exception=exc) - def _status_callback(self, status: StatusBase) -> None: + def _status_callback(self, status: StatusBase, exception=None) -> None: """Callback for status completion.""" - if not status.done: - status.set_finished() self._start_monitor_async_data_emission.clear() # Stop monitoring + # NOTE Important check as set_finished or set_exception should not be called + # if the status is already done (e.g. cancelled externally) + if status.done: + return # Already done and cancelled externally. + if exception: + status.set_exception(exception) + else: + status.set_finished() def _status_failed_callback(self, status: StatusBase) -> None: - """Callback for status failure.""" - if status.done and not status.success: + """Callback for status failure, the monitoring thread should be stopped.""" + # NOTE Check for status.done and status.success is important to avoid + if status.done: self._start_monitor_async_data_emission.clear() # Stop monitoring def on_complete(self) -> CompareStatus: - """On scan completion.""" - # Prepare callback for data emission done + """ + + Method that is called at the end of scan core, but before unstage. + In here, the device should report whether it successfully finished + data acquisition for the scan. This has to happen asynchronously, + so through a status object (future promise) that is returned and resolves + once logic for data acquisition on the MCS card is finished. This is required + because data updates that occur after the scan is closed will be rejected by BEC. + Therefore, we need to ensure that all data has been emitted to BEC before + reporting completion of the device. + + This method implements the following procedure: + - Starts the IDLE async data monitoring thread that checks if all expected data + has been emitted to BEC through the mca channel callbacks. + - Use a CompareStatus to monitor when the MCS card becomes DONE. Please note that this + only indicates that the card has finished acquisition, but not that all data has been + emitted to BEC. + - Return combined status object. A callback is registered to handle failure of the status + if it is stopped externally, e.g. through scan abort. This should ensure that the + monitoring thread is stopped properly. + + """ + # Prepare and register status callback for the async monitoring loop status_async_data = StatusBase(obj=self) self._scan_done_callbacks.append(partial(self._status_callback, status_async_data)) - # Start done callback loop + # Set the event to start monitoring async data emission self._start_monitor_async_data_emission.set() + # Add CompareStatus for Acquiring DONE status = CompareStatus(self.acquiring, ACQUIRING.DONE) @@ -305,6 +459,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): return status def on_destroy(self): + """Hook called when the device is being destroyed. It should clean up any resources including threads.""" self._scan_done_thread_kill_event.set() self._start_monitor_async_data_emission.set() if self._scan_done_thread.is_alive(): @@ -313,18 +468,28 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): logger.warning(f"Thread for device {self.name} did not terminate properly.") def on_stop(self) -> None: - """ - Called when the scan is stopped. - """ + """Hook called when the device is stopped. In addition, any status that is registered through cancel_on_stop will be cancelled here.""" self.stop_all.put(1) self.erase_all.put(1) - def mcs_recovery(self, timeout:int=1) -> None: - """Recovery procedure for the mcs card""" - sleep_time = timeout / 2 # 2 sleeps - logger.info(f"Running recovery procedure for MCS card {self.name} with {sleep_time}s sleep, calling stop_all and erase_all, and another {sleep_time}s sleep") + def mcs_recovery(self, timeout: int = 1) -> None: + """ + Recovery procedure for the mcs card. This procedure has been empirically found and can + be used to ensure that the MCS card is stopped and has no pending data to be emitted. + It involves stopping any ongoing acquisition and erasing all data on the card, with + a sleep in between to allow the IOC to process the commands. + + Args: + timeout (int): Total timeout for the recovery procedure. Defaults to 1 second. + """ + sleep_time = timeout / 2 # 2 sleeps + logger.info( + f"Running recovery procedure for MCS card {self.name} with {sleep_time}s sleep, calling stop_all and erase_all, and another {sleep_time}s sleep" + ) self.erase_start.put(1) time.sleep(sleep_time) self.stop_all.put(1) - self.erase_all.put(1) - time.sleep(sleep_time) \ No newline at end of file + with suppress_mca_callbacks(self): + self.erase_all.put(1) + time.sleep(sleep_time) + self._omit_mca_callbacks.clear()