diff --git a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py index 54a89e8..0deefc2 100644 --- a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py +++ b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py @@ -373,7 +373,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): # This requires additional logic and a thread to monitor the data emission self._scan_done_thread_kill_event: threading.Event = threading.Event() self._start_monitor_async_data_emission: threading.Event = threading.Event() - self._scan_done_callbacks : list[Callable[[], None]] = [] + self._scan_done_callbacks: list[Callable[[], None]] = [] self._scan_done_thread: threading.Thread | None = None self._current_data_index: int = 0 @@ -440,7 +440,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): else: value = [value] # Received single value, convert to list data = { - f"{self.name}_{mca_channel.name}": { + f"{mca_channel.name}": { "value": value, "timestamp": kwargs.get("timestamp") or time.time(), } @@ -449,10 +449,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): f"Received update for {signal.name}. Setting data for {mca_channel.name}: to {data}" ) mca_channel.put(data) - if self._scan_done_event.is_set(): - # Last data sent after scan is done TODO, improve logic as this may fail due to timing.. - # better to count the number of data that was sent... - self._last_data_sent_event.set() self._received_updates.update(data) if len(self._received_updates) == self.num_connected_channels: # Send out data on multi async signal @@ -484,7 +480,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): # Reset monitoring of async data emission self._start_monitor_async_data_emission.clear() - self._last_data_sent_event.clear() + self._last_data_sent_event.clear() self._current_data_index = 0 def on_unstage(self) -> None: @@ -502,31 +498,37 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): for callback in self._scan_done_callbacks: callback() - time.sleep(0.02) # 20ms delay to avoid busy loop + time.sleep(0.02) # 20ms delay to avoid busy loop + def _status_callback(self, status: StatusBase) -> None: + """Callback for status completion.""" + if not status.done: + status.set_finished() + self._start_monitor_async_data_emission.clear() # Stop monitoring - - while self._start_monitor_async_data_emission.wait(): + def _status_failed_callback(self, status: StatusBase) -> None: + """Callback for status failure.""" + if status.done and not status.success: + self._start_monitor_async_data_emission.clear() # Stop monitoring def on_complete(self) -> CompareStatus: """On scan completion.""" # Check Acquiring is DONE - status_data_sent = self.task_handler.submit_task(self._check_data_sent, task_args=(5.0,)) + status_async_data = StatusBase(obj=self) + status_async_data.add_callback(self._status_callback) status = CompareStatus(self.acquiring, ACQUIRING.DONE) - ret_status = status & status_data_sent + ret_status = status & status_async_data + ret_status.add_callback(self._status_failed_callback) self.cancel_on_stop(ret_status) return status - + def on_destroy(self): self._scan_done_thread_kill_event.set() self._start_monitor_async_data_emission.set() - if self._scan_done_thread is not None: - self._scan_done_thread.join(timeout=2.0) if self._scan_done_thread.is_alive(): - logger.warning(f"Scan done monitoring thread for device {self.name} did not terminate properly.") - - - + self._scan_done_thread.join(timeout=2.0) + if self._scan_done_thread.is_alive(): + logger.warning(f"Thread for device {self.name} did not terminate properly.") def on_stop(self) -> None: """