From bfcecd73c251e63535ad85cd69db008c77dab2b5 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 15 Jan 2026 08:36:54 +0100 Subject: [PATCH] refactor(mcs-ddg): cleanup and fix mcs and ddg from beamline tests --- .../epics/delay_generator_csaxs/ddg_1.py | 3 ++ .../epics/delay_generator_csaxs/ddg_2.py | 3 ++ csaxs_bec/devices/epics/mcs_card/README.md | 2 +- .../devices/epics/mcs_card/mcs_card_csaxs.py | 41 ++++++++++++++----- 4 files changed, 37 insertions(+), 12 deletions(-) diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py index c3a1160..29a0623 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py @@ -212,6 +212,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): - We set the delay pairs ef to be triggered after the shutter closes with a width of 1us to trigger the MCS card. - Finally, we add a short sleep to ensure that the IOC and DDG HW process the values properly. """ + start_time = time.time() ######################################## ### Burst mode settings ################ @@ -264,6 +265,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): # This value has been choosen empirically after testing with the HW. It's # also just called once per scan and has been found to improve stability of the HW. time.sleep(0.2) + logger.info(f"DDG {self.name} on_stage completed in {time.time() - start_time:.3f}s.") def _prepare_mcs_on_trigger(self, mcs: MCSCardCSAXS) -> None: """ @@ -415,6 +417,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): # Callback to cancel the status if the device is stopped def cancel_cb(status: CompareStatus) -> None: """Callback to cancel the status if the device is stopped.""" + logger.debug("DDG1 end of burst detected, stopping polling loop.") if status.done: self._stop_polling() diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py index 55dad7c..bf781f3 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py @@ -145,6 +145,7 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS): This logic is robust for step scans as well as fly scans, as the DDG2 is triggered by the DDG1 through the EXT/EN channel. """ + start_time = time.time() ######################################## ### Burst mode settings ################ ######################################## @@ -183,6 +184,8 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS): # Trigger detectors with delay 0, and pulse width = exp_time - readout_time self.set_delay_pairs(channel="ab", delay=0, width=burst_pulse_width) + logger.info(f"DDG {self.name} on_stage completed in {time.time() - start_time:.3f}s.") + def on_pre_scan(self): """ diff --git a/csaxs_bec/devices/epics/mcs_card/README.md b/csaxs_bec/devices/epics/mcs_card/README.md index d76f8a3..7465c79 100644 --- a/csaxs_bec/devices/epics/mcs_card/README.md +++ b/csaxs_bec/devices/epics/mcs_card/README.md @@ -10,4 +10,4 @@ Operation of the MCS card requires proper configuration as some of the parameter The ophyd device implementation is provided [MCSCard](./mcs_card.py). This class provides a basic interface to the MCS PVs, including configuration of parameters such as number of channels, dwell time, and control of acquisition start/stop. Please check the source code of the class for more details of the implementation. -The [MCSCardCSAXS](./mcs_card_csaxs.py) class extends the basic MCSCard implementation with cSAXS-specific logic and configurations. Please be aware that this is also linked to the implementation of other devices, most notably the [delay generator integration](../delay_generator/README.md), which is used as the trigger source for the MCS card during operation. \ No newline at end of file +The [MCSCardCSAXS](./mcs_card_csaxs.py) class extends the basic MCSCard implementation with cSAXS-specific logic and configurations. Please be aware that this is also linked to the implementation of other devices, most notably the [delay generator integration](../delay_generator_csaxs/README.md), which is used as the trigger source for the MCS card during operation. \ No newline at end of file diff --git a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py index b42e3b6..1ff3d08 100644 --- a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py +++ b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py @@ -255,9 +255,9 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): **kwargs: Additional keyword arguments from the subscription, including 'obj' (the EpicsSignalRO instance). """ with self._rlock: - self._mca_counter_index += 1 if self._omit_mca_callbacks.is_set(): return # Suppress callbacks when erasing all channels + self._mca_counter_index += 1 signal: EpicsSignalRO | None = kwargs.get("obj", None) if signal is None: logger.error(f"Called without 'obj' in kwargs: {kwargs}") @@ -286,7 +286,13 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): ) # Once we have received all channels, push data to BEC and reset for next accumulation + logger.debug( + f"Received update for {attr_name}, index {self._mca_counter_index}/{self.NUM_MCA_CHANNELS}" + ) if len(self._current_data) == self.NUM_MCA_CHANNELS: + logger.debug( + f"Current data index {self._current_data_index} complete, pushing to BEC." + ) self.mca.put(self._current_data, acquisition_group=self._acquisition_group) self._current_data.clear() self._mca_counter_index = 0 @@ -322,6 +328,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): - Clear any events and buffers related to async data emission. This includes '_omit_mca_callbacks', '_start_monitor_async_data_emission', '_scan_done_callbacks', and '_current_data'. """ + start_time = time.time() # NOTE: If for some reason, the card is still acquiring, we need to stop it first # This should never happen as the card is properly stopped during unstage @@ -361,6 +368,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): # Clear any previous data, just to be sure with self._rlock: self._current_data.clear() + self._mca_counter_index = 0 # NOTE Reset events for monitoring async_data_emission thread which is # running during complete to wait for all data from the card @@ -376,6 +384,8 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): # NOTE Make sure that the signal that omits mca callbacks is cleared self._omit_mca_callbacks.clear() + logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.") + def on_unstage(self) -> None: """ Called when the device is unstaged. This method should be omnipotent and resolve fast. @@ -390,6 +400,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): with suppress_mca_callbacks(self): with self._rlock: self._current_data.clear() + self._current_data_index = 0 self.erase_all.put(1) def _monitor_async_data_emission(self) -> None: @@ -406,9 +417,14 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): while not self._scan_done_thread_kill_event.is_set(): while self._start_monitor_async_data_emission.wait(): try: - if self._current_data_index == self.scan_info.msg.num_points: - for callback in self._scan_done_callbacks: - callback(exception=None) + logger.debug(f"Monitoring async data emission for {self.name}...") + if ( + hasattr(self.scan_info.msg, "num_points") + and self.scan_info.msg.num_points is not None + ): + if self._current_data_index == self.scan_info.msg.num_points: + for callback in self._scan_done_callbacks: + callback(exception=None) time.sleep(0.02) # 20ms delay to avoid busy loop except Exception as exc: # pylint: disable=broad-except content = traceback.format_exc() @@ -424,17 +440,19 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): self._start_monitor_async_data_emission.clear() # Stop monitoring # NOTE Important check as set_finished or set_exception should not be called # if the status is already done (e.g. cancelled externally) - if status.done: - return # Already done and cancelled externally. - if exception: - status.set_exception(exception) - else: - status.set_finished() + with self._rlock: + if status.done: + return # Already done and cancelled externally. + if exception is not None: + status.set_exception(exception) + else: + status.set_finished() def _status_failed_callback(self, status: StatusBase) -> None: """Callback for status failure, the monitoring thread should be stopped.""" # NOTE Check for status.done and status.success is important to avoid if status.done: + self._start_monitor_async_data_emission.clear() # Stop monitoring def on_complete(self) -> CompareStatus: @@ -465,6 +483,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): self._scan_done_callbacks.append(partial(self._status_callback, status_async_data)) # Set the event to start monitoring async data emission + logger.debug(f"Starting to monitor async data emission for {self.name}...") self._start_monitor_async_data_emission.set() # Add CompareStatus for Acquiring DONE @@ -505,7 +524,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): timeout (int): Total timeout for the recovery procedure. Defaults to 1 second. """ sleep_time = timeout / 2 # 2 sleeps - logger.info( + logger.debug( f"Running recovery procedure for MCS card {self.name} with {sleep_time}s sleep, calling stop_all and erase_all, and another {sleep_time}s sleep" ) # First erase and start ongoing acquisition.