w
This commit is contained in:
@@ -4,6 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
from contextlib import contextmanager
|
||||
from functools import partial
|
||||
from threading import RLock
|
||||
@@ -113,7 +114,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
self._current_data_index: int = 0
|
||||
self._mca_counter_index: int = 0
|
||||
self._current_data: dict[str, dict[Literal["value", "timestamp"], list[int] | float]] = {}
|
||||
|
||||
self._omit_mca_callbacks: threading.Event = threading.Event()
|
||||
|
||||
def on_connected(self):
|
||||
@@ -122,6 +122,8 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
"""
|
||||
# Make sure card is not running
|
||||
self.stop_all.put(1)
|
||||
# Make sure the card is erased
|
||||
self.erase_all.put(1)
|
||||
|
||||
# Setup the MCS card settings
|
||||
self.channel_advance.set(CHANNELADVANCE.EXTERNAL).wait(timeout=self._pv_timeout)
|
||||
@@ -154,14 +156,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
# Start monitoring thread
|
||||
self._scan_done_thread.start()
|
||||
|
||||
# Check mux_output setting
|
||||
# mux_output_value = self.mux_output.get()
|
||||
# if mux_output_value != self.NUM_MCA_CHANNELS:
|
||||
# raise RuntimeError(
|
||||
# f"MuxOutput is set to {mux_output_value}, but should be set to "
|
||||
# f"{self.NUM_MCA_CHANNELS} for proper operation. Please update the IOC configuration."
|
||||
# )
|
||||
|
||||
def _on_counter_update(self, value, **kwargs) -> None:
|
||||
"""
|
||||
Callback for counter updates of the mca channels (1-32).
|
||||
@@ -212,15 +206,23 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
Called when the device is staged.
|
||||
"""
|
||||
|
||||
# Clear any existing data
|
||||
# NOTE: It is important to suppress the MCA callbacks here to avoid
|
||||
# data updates from the mca channels as they are emitted when erase_all
|
||||
# is called on the IOC.
|
||||
with suppress_mca_callbacks(self):
|
||||
# Erase all data on the MCS card
|
||||
with self._rlock:
|
||||
self._current_data.clear()
|
||||
self.erase_all.put(1)
|
||||
# NOTE: If for some reason, the card is still acquiring, we need to stop it first
|
||||
if self.acquiring.get() == ACQUIRING.ACQUIRING:
|
||||
self.stop_all.put(1)
|
||||
status = CompareStatus(self.acquiring, ACQUIRING.DONE)
|
||||
status.wait(timeout=10)
|
||||
|
||||
# NOTE: Erase all will result in data emission through mca callback subscriptions
|
||||
# The buffer needs to be cleared as this will otherwise lead to missing
|
||||
# triggers during the scan.
|
||||
if self.current_channel.get() != 0:
|
||||
with suppress_mca_callbacks(self):
|
||||
logger.warning(
|
||||
f"MCS Card {self.name} had still data in buffer Erased all data on staging and sleeping for 1 second."
|
||||
)
|
||||
# Erase all data on the MCS card
|
||||
self.erase_all.put(1)
|
||||
time.sleep(1) # Allow time to process erase
|
||||
|
||||
triggers = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
|
||||
num_points = self.scan_info.msg.num_points
|
||||
@@ -229,6 +231,9 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
self.preset_real.set(0).wait(timeout=self._pv_timeout)
|
||||
self.num_use_all.set(triggers).wait(timeout=self._pv_timeout)
|
||||
|
||||
# Make sure to clear any remaining data in the local buffer
|
||||
with self._rlock:
|
||||
self._current_data.clear()
|
||||
# Reset monitoring of async data emission
|
||||
self._start_monitor_async_data_emission.clear()
|
||||
self._scan_done_callbacks.clear()
|
||||
@@ -239,17 +244,30 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
Called when the device is unstaged.
|
||||
"""
|
||||
self.stop_all.put(1)
|
||||
# Make sure that upon unstaging, all data on the MCS card is erased.
|
||||
with suppress_mca_callbacks(self):
|
||||
with self._rlock:
|
||||
self._current_data.clear()
|
||||
self.erase_all.put(1)
|
||||
|
||||
def _monitor_async_data_emission(self, timeout: int = 10) -> bool:
|
||||
def _monitor_async_data_emission(self, timeout: int = 10) -> None:
|
||||
"""Monitor data emission after scan is done."""
|
||||
while not self._scan_done_thread_kill_event.is_set():
|
||||
while self._start_monitor_async_data_emission.wait():
|
||||
if self._current_data_index == self.scan_info.msg.num_points:
|
||||
try:
|
||||
if self._current_data_index == self.scan_info.msg.num_points:
|
||||
for callback in self._scan_done_callbacks:
|
||||
callback()
|
||||
time.sleep(0.02) # 20ms delay to avoid busy loop
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Exception in monitoring thread of complete for {self.name}:\n{content}"
|
||||
"Running callbacks to avoid deadlock."
|
||||
)
|
||||
for callback in self._scan_done_callbacks:
|
||||
callback()
|
||||
|
||||
time.sleep(0.02) # 20ms delay to avoid busy loop
|
||||
|
||||
def _status_callback(self, status: StatusBase) -> None:
|
||||
"""Callback for status completion."""
|
||||
if not status.done:
|
||||
|
||||
Reference in New Issue
Block a user