fix(flomni): avoid XRayEye GUI RPC timeouts #208

Open
wyzula_j wants to merge 1 commits from fix/temp-xray-eye-timeout-fix into main
@@ -1,14 +1,17 @@
from __future__ import annotations
import threading
import time
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from bec_qthemes import material_icon
from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.utils.rpc_decorator import rpc_timeout
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import (
@@ -20,11 +23,11 @@ from qtpy.QtWidgets import (
QPushButton,
QSizePolicy,
QSpinBox,
QTabWidget,
QTextEdit,
QToolButton,
QVBoxLayout,
QWidget,
QTextEdit,
QTabWidget,
)
logger = bec_logger.logger
@@ -116,16 +119,38 @@ class XRayEye2DControl(BECWidget, QWidget):
if tweak:
step = int(self._step_size / 5)
if direction == "up":
self.dev.omny_xray_gui.mvy.set(step)
self._run_device_action_async(
lambda: self.dev.omny_xray_gui.mvy.set(step), "set X-ray eye Y move"
)
elif direction == "down":
self.dev.omny_xray_gui.mvy.set(-step)
self._run_device_action_async(
lambda: self.dev.omny_xray_gui.mvy.set(-step), "set X-ray eye Y move"
)
elif direction == "left":
self.dev.omny_xray_gui.mvx.set(-step)
self._run_device_action_async(
lambda: self.dev.omny_xray_gui.mvx.set(-step), "set X-ray eye X move"
)
elif direction == "right":
self.dev.omny_xray_gui.mvx.set(step)
self._run_device_action_async(
lambda: self.dev.omny_xray_gui.mvx.set(step), "set X-ray eye X move"
)
else:
logger.warning(f"Unknown direction {direction} for move command.")
def _run_device_action_async(self, action, description: str) -> None:
parent = self.parent()
if hasattr(parent, "_run_device_action_async"):
parent._run_device_action_async(action, description)
return
def runner():
try:
action()
except Exception as exc:
logger.warning(f"Failed to {description}: {exc}")
threading.Thread(target=runner, daemon=True).start()
class XRayEye(BECWidget, QWidget):
USER_ACCESS = [
@@ -147,6 +172,16 @@ class XRayEye(BECWidget, QWidget):
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self._connected_motor = None
self._current_step = 0
self._fit_tab_initialized = False
self._live_view_enabled = False
self._live_view_requested = False
self._live_view_apply_pending = False
self._fit_tab_init_pending = False
self._pending_fit_array = None
self._closing = False
self._device_action_lock = threading.Lock()
self._device_action_threads = set()
self.get_bec_shortcuts()
self._init_ui()
@@ -159,10 +194,12 @@ class XRayEye(BECWidget, QWidget):
self.bec_dispatcher.connect_slot(
self.getting_camera_status, MessageEndpoints.device_read_configuration(CAMERA[0])
)
self.bec_dispatcher.connect_slot(
self.getting_xray_gui_readback, MessageEndpoints.device_readback("omny_xray_gui")
)
self.connect_motors()
self.resize(800, 600)
QTimer.singleShot(0, self._init_gui_trigger)
def _init_ui(self):
self.root_layout = QVBoxLayout(self)
@@ -280,6 +317,12 @@ class XRayEye(BECWidget, QWidget):
self.fit_tab = QWidget(parent=self)
self.fit_layout = QVBoxLayout(self.fit_tab)
self.tab_widget.addTab(self.fit_tab, "Fit")
def _ensure_fit_tab(self):
if self._fit_tab_initialized:
return
self.waveform_x = Waveform(parent=self.fit_tab)
self.waveform_y = Waveform(parent=self.fit_tab)
@@ -319,11 +362,10 @@ class XRayEye(BECWidget, QWidget):
self.fit_layout.addWidget(self.waveform_x)
self.fit_layout.addWidget(self.waveform_y)
self.tab_widget.addTab(self.fit_tab, "Fit")
self._fit_tab_initialized = True
def _make_connections(self):
# Fetch initial state
self.on_live_view_enabled(True)
# Keep construction lightweight. Live preview is enabled explicitly by GUI actions/RPC.
self.step_size.setValue(self.motor_control_2d.step_size)
# Make connections
@@ -332,6 +374,9 @@ class XRayEye(BECWidget, QWidget):
lambda x: self.motor_control_2d.setProperty("step_size", x)
)
self.submit_button.clicked.connect(self.submit)
self.tab_widget.currentChanged.connect(
lambda index: self._schedule_fit_tab_init() if index == 1 else None
)
def _create_separator(self):
sep = QFrame(parent=self)
@@ -340,10 +385,6 @@ class XRayEye(BECWidget, QWidget):
sep.setLineWidth(1)
return sep
def _init_gui_trigger(self):
self.dev.omny_xray_gui.read()
self.dev.fsh.read()
################################################################################
# Device Connection logic
################################################################################
@@ -402,6 +443,7 @@ class XRayEye(BECWidget, QWidget):
def switch_tab(self, tab: str):
if tab == "fit":
self.tab_widget.setCurrentIndex(1)
self._schedule_fit_tab_init()
else:
self.tab_widget.setCurrentIndex(0)
@@ -420,28 +462,54 @@ class XRayEye(BECWidget, QWidget):
def on_live_view_enabled(self, enabled: bool):
logger.info(f"Live view is enabled: {enabled}")
self.live_preview_toggle.blockSignals(True)
if enabled:
try:
self._live_view_requested = bool(enabled)
self.live_preview_toggle.checked = enabled
self.image.image(device=CAMERA[0], signal=CAMERA[1])
finally:
self.live_preview_toggle.blockSignals(False)
return
self._schedule_live_view_apply()
self.image.disconnect_monitor(CAMERA[0], CAMERA[1])
self.live_preview_toggle.checked = enabled
self.live_preview_toggle.blockSignals(False)
def _schedule_live_view_apply(self):
if self._closing or self._live_view_apply_pending:
return
self._live_view_apply_pending = True
QTimer.singleShot(0, self._apply_live_view_state)
def _apply_live_view_state(self):
self._live_view_apply_pending = False
if self._closing:
return
enabled = self._live_view_requested
if enabled == self._live_view_enabled:
return
try:
if enabled:
self.image.image(device=CAMERA[0], signal=CAMERA[1])
else:
self.image.disconnect_monitor(CAMERA[0], CAMERA[1])
self._live_view_enabled = enabled
except Exception as exc:
logger.warning(f"Failed to apply XRayEye live view state {enabled}: {exc}")
return
if enabled != self._live_view_requested:
self._schedule_live_view_apply()
@SafeSlot(bool)
def camera_running_enabled(self, enabled: bool):
logger.info(f"Camera running: {enabled}")
self.camera_running_toggle.blockSignals(True)
self.dev.get(CAMERA[0]).live_mode_enabled.put(enabled)
self.camera_running_toggle.checked = enabled
self.camera_running_toggle.blockSignals(False)
self._run_device_action_async(
lambda: self.dev.get(CAMERA[0]).live_mode_enabled.put(enabled), "set camera live mode"
)
@SafeSlot(dict, dict)
def getting_camera_status(self, data, meta):
print(f"msg:{data}")
live_mode_enabled = data.get("signals").get(f"{CAMERA[0]}_live_mode_enabled").get("value")
_ = meta
live_mode_enabled = (
data.get("signals", {}).get(f"{CAMERA[0]}_live_mode_enabled", {}).get("value", False)
)
self.camera_running_toggle.blockSignals(True)
self.camera_running_toggle.checked = live_mode_enabled
self.camera_running_toggle.blockSignals(False)
@@ -451,19 +519,31 @@ class XRayEye(BECWidget, QWidget):
logger.info(f"Shutter changed from GUI to: {enabled}")
self.shutter_toggle.blockSignals(True)
if enabled:
self.dev.fsh.fshopen()
self._run_device_action_async(self.dev.fsh.fshopen, "open fast shutter")
else:
self.dev.fsh.fshclose()
self._run_device_action_async(self.dev.fsh.fshclose, "close fast shutter")
# self.shutter_toggle.checked = enabled
self.shutter_toggle.blockSignals(False)
@SafeSlot(dict, dict)
def getting_shutter_status(self, data, meta):
shutter_open = bool(data.get("signals").get("fsh_shutter").get("value"))
_ = meta
shutter_open = bool(data.get("signals", {}).get("fsh_shutter", {}).get("value", False))
self.shutter_toggle.blockSignals(True)
self.shutter_toggle.checked = shutter_open
self.shutter_toggle.blockSignals(False)
@SafeSlot(dict, dict)
def getting_xray_gui_readback(self, data, meta):
_ = meta
step = data.get("signals", {}).get("omny_xray_gui_step", {}).get("value")
if step is None:
return
try:
self._current_step = int(step)
except (TypeError, ValueError):
logger.warning(f"Invalid X-ray eye GUI step value: {step!r}")
@SafeSlot(bool, bool)
@rpc_timeout(20)
def on_motors_enable(self, x_enable: bool, y_enable: bool):
@@ -492,39 +572,61 @@ class XRayEye(BECWidget, QWidget):
@SafeSlot(dict, dict)
def on_dap_params(self, data, meta):
print("#######################################")
print("getting dap parameters")
print(f"data: {data}")
print(f"meta: {meta}")
self._ensure_fit_tab()
self.waveform_x.auto_range(True)
self.waveform_y.auto_range(True)
# self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
curve_id = meta.get("curve_id")
if curve_id == "fit-x-SineModel+LinearModel":
self.dev.omny_xray_gui.fit_params_x.set(data).wait()
print(f"setting x data to {data}")
self._run_device_action_async(
lambda: self.dev.omny_xray_gui.fit_params_x.set(data),
"set X-ray eye fit parameters X",
)
else:
self.dev.omny_xray_gui.fit_params_y.set(data).wait()
print(f"setting y data to {data}")
self._run_device_action_async(
lambda: self.dev.omny_xray_gui.fit_params_y.set(data),
"set X-ray eye fit parameters Y",
)
# self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
@SafeSlot(bool, bool)
def on_tomo_angle_readback(self, data: dict, meta: dict):
# TODO implement if needed
print(f"data: {data}")
print(f"meta: {meta}")
_ = data, meta
@SafeSlot()
@rpc_timeout(20)
def submit_fit_array(self, fit_array):
self.tab_widget.setCurrentIndex(1)
# self.fix_x.title = " got fit array"
print(f"got fit array {fit_array}")
self._pending_fit_array = fit_array
self._schedule_fit_tab_init()
def _schedule_fit_tab_init(self):
if self._closing:
return
if self._fit_tab_initialized:
self._apply_pending_fit_array()
return
if self._fit_tab_init_pending:
return
self._fit_tab_init_pending = True
QTimer.singleShot(0, self._ensure_fit_tab_and_apply_pending)
def _ensure_fit_tab_and_apply_pending(self):
self._fit_tab_init_pending = False
if self._closing:
return
self._ensure_fit_tab()
self._apply_pending_fit_array()
def _apply_pending_fit_array(self):
if self._pending_fit_array is None or not self._fit_tab_initialized:
return
fit_array = self._pending_fit_array
self._pending_fit_array = None
self.waveform_x.curves[0].set_data(x=fit_array[0], y=fit_array[1])
self.waveform_y.curves[0].set_data(x=fit_array[0], y=fit_array[2])
# self.fit_x.set_data(x=fit_array[0],y=fit_array[1])
# self.fit_y.set_data(x=fit_array[0],y=fit_array[2])
@SafeSlot()
def submit(self):
@@ -549,18 +651,34 @@ class XRayEye(BECWidget, QWidget):
return
# submit roi coordinates
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get("value"))
step = self._current_step
getattr(self.dev.omny_xray_gui, f"xval_x_{step}").set(roi_center_x)
getattr(self.dev.omny_xray_gui, f"yval_y_{step}").set(roi_center_y)
getattr(self.dev.omny_xray_gui, f"width_x_{step}").set(roi_width)
getattr(self.dev.omny_xray_gui, f"width_y_{step}").set(roi_height)
self.dev.omny_xray_gui.submit.set(1)
self._run_device_action_async(
lambda: self._submit_roi_values(
step, roi_center_x, roi_center_y, roi_width, roi_height
),
"submit X-ray eye ROI values",
)
finally:
self.submit_button.blockSignals(False)
def _submit_roi_values(
self,
step: int,
roi_center_x: float,
roi_center_y: float,
roi_width: float,
roi_height: float,
) -> None:
getattr(self.dev.omny_xray_gui, f"xval_x_{step}").set(roi_center_x)
getattr(self.dev.omny_xray_gui, f"yval_y_{step}").set(roi_center_y)
getattr(self.dev.omny_xray_gui, f"width_x_{step}").set(roi_width)
getattr(self.dev.omny_xray_gui, f"width_y_{step}").set(roi_height)
self.dev.omny_xray_gui.submit.set(1)
def cleanup(self):
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
self._closing = True
if self._connected_motor is not None:
self.bec_dispatcher.disconnect_slot(
self.on_tomo_angle_readback, MessageEndpoints.device_readback(self._connected_motor)
@@ -572,17 +690,71 @@ class XRayEye(BECWidget, QWidget):
self.bec_dispatcher.disconnect_slot(
self.getting_camera_status, MessageEndpoints.device_read_configuration(CAMERA[0])
)
self.bec_dispatcher.disconnect_slot(
self.getting_xray_gui_readback, MessageEndpoints.device_readback("omny_xray_gui")
)
getattr(self.dev, CAMERA[0]).stop_live_mode()
self._run_device_action_async(
getattr(self.dev, CAMERA[0]).stop_live_mode,
"stop camera live mode",
allow_during_cleanup=True,
)
self._join_device_actions(timeout=1.0)
super().cleanup()
def _run_device_action_async(
self, action, description: str, *, allow_during_cleanup: bool = False
) -> threading.Thread | None:
if self._closing and not allow_during_cleanup:
logger.debug(f"Skipping device action during XRayEye cleanup: {description}")
return None
def runner():
try:
if self._closing and not allow_during_cleanup:
return
action()
except Exception as exc:
logger.warning(f"Failed to {description}: {exc}")
finally:
with self._device_action_lock:
self._device_action_threads.discard(thread)
thread = threading.Thread(target=runner, daemon=True, name=f"XRayEye: {description}")
with self._device_action_lock:
if self._closing and not allow_during_cleanup:
return None
self._device_action_threads.add(thread)
thread.start()
return thread
def _join_device_actions(self, timeout: float) -> None:
deadline = time.monotonic() + timeout
while True:
with self._device_action_lock:
threads = [
thread
for thread in self._device_action_threads
if thread is not threading.current_thread()
]
if not threads:
return
remaining = deadline - time.monotonic()
if remaining <= 0:
logger.warning(
f"{len(threads)} XRayEye device action(s) still running during cleanup."
)
return
for thread in threads:
thread.join(timeout=min(remaining, 0.1))
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.colors import apply_theme
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
apply_theme("light")