w
This commit is contained in:
@@ -373,7 +373,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
# This requires additional logic and a thread to monitor the data emission
|
||||
self._scan_done_thread_kill_event: threading.Event = threading.Event()
|
||||
self._start_monitor_async_data_emission: threading.Event = threading.Event()
|
||||
self._scan_done_callbacks : list[Callable[[], None]] = []
|
||||
self._scan_done_callbacks: list[Callable[[], None]] = []
|
||||
self._scan_done_thread: threading.Thread | None = None
|
||||
self._current_data_index: int = 0
|
||||
|
||||
@@ -440,7 +440,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
else:
|
||||
value = [value] # Received single value, convert to list
|
||||
data = {
|
||||
f"{self.name}_{mca_channel.name}": {
|
||||
f"{mca_channel.name}": {
|
||||
"value": value,
|
||||
"timestamp": kwargs.get("timestamp") or time.time(),
|
||||
}
|
||||
@@ -449,10 +449,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
f"Received update for {signal.name}. Setting data for {mca_channel.name}: to {data}"
|
||||
)
|
||||
mca_channel.put(data)
|
||||
if self._scan_done_event.is_set():
|
||||
# Last data sent after scan is done TODO, improve logic as this may fail due to timing..
|
||||
# better to count the number of data that was sent...
|
||||
self._last_data_sent_event.set()
|
||||
self._received_updates.update(data)
|
||||
if len(self._received_updates) == self.num_connected_channels:
|
||||
# Send out data on multi async signal
|
||||
@@ -484,7 +480,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
|
||||
# Reset monitoring of async data emission
|
||||
self._start_monitor_async_data_emission.clear()
|
||||
self._last_data_sent_event.clear()
|
||||
self._last_data_sent_event.clear()
|
||||
self._current_data_index = 0
|
||||
|
||||
def on_unstage(self) -> None:
|
||||
@@ -502,31 +498,37 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
for callback in self._scan_done_callbacks:
|
||||
callback()
|
||||
|
||||
time.sleep(0.02) # 20ms delay to avoid busy loop
|
||||
time.sleep(0.02) # 20ms delay to avoid busy loop
|
||||
|
||||
def _status_callback(self, status: StatusBase) -> None:
|
||||
"""Callback for status completion."""
|
||||
if not status.done:
|
||||
status.set_finished()
|
||||
self._start_monitor_async_data_emission.clear() # Stop monitoring
|
||||
|
||||
|
||||
while self._start_monitor_async_data_emission.wait():
|
||||
def _status_failed_callback(self, status: StatusBase) -> None:
|
||||
"""Callback for status failure."""
|
||||
if status.done and not status.success:
|
||||
self._start_monitor_async_data_emission.clear() # Stop monitoring
|
||||
|
||||
def on_complete(self) -> CompareStatus:
|
||||
"""On scan completion."""
|
||||
# Check Acquiring is DONE
|
||||
status_data_sent = self.task_handler.submit_task(self._check_data_sent, task_args=(5.0,))
|
||||
status_async_data = StatusBase(obj=self)
|
||||
status_async_data.add_callback(self._status_callback)
|
||||
status = CompareStatus(self.acquiring, ACQUIRING.DONE)
|
||||
ret_status = status & status_data_sent
|
||||
ret_status = status & status_async_data
|
||||
ret_status.add_callback(self._status_failed_callback)
|
||||
self.cancel_on_stop(ret_status)
|
||||
return status
|
||||
|
||||
|
||||
def on_destroy(self):
|
||||
self._scan_done_thread_kill_event.set()
|
||||
self._start_monitor_async_data_emission.set()
|
||||
if self._scan_done_thread is not None:
|
||||
self._scan_done_thread.join(timeout=2.0)
|
||||
if self._scan_done_thread.is_alive():
|
||||
logger.warning(f"Scan done monitoring thread for device {self.name} did not terminate properly.")
|
||||
|
||||
|
||||
|
||||
self._scan_done_thread.join(timeout=2.0)
|
||||
if self._scan_done_thread.is_alive():
|
||||
logger.warning(f"Thread for device {self.name} did not terminate properly.")
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user