From 3770db51be68a5f3fa65e0a67a4ed3efd1c7d6fe Mon Sep 17 00:00:00 2001 From: David Perl Date: Thu, 9 Jan 2025 17:15:11 +0100 Subject: [PATCH] refactor: move positioner_box logic to base class --- bec_widgets/cli/client.py | 25 +++ .../positioner_box/_base/__init__.py | 3 + .../_base/positioner_box_base.py | 180 ++++++++++++++++++ .../positioner_box/positioner_box.py | 154 ++++----------- .../positioner_control_line.py | 1 + tests/unit_tests/test_positioner_box.py | 6 +- 6 files changed, 246 insertions(+), 123 deletions(-) create mode 100644 bec_widgets/widgets/control/device_control/positioner_box/_base/__init__.py create mode 100644 bec_widgets/widgets/control/device_control/positioner_box/_base/positioner_box_base.py diff --git a/bec_widgets/cli/client.py b/bec_widgets/cli/client.py index 61774208..1c3be217 100644 --- a/bec_widgets/cli/client.py +++ b/bec_widgets/cli/client.py @@ -3235,6 +3235,31 @@ class PositionerBox(RPCBase): """ +class PositionerBoxBase(RPCBase): + @property + @rpc_call + def _config_dict(self) -> "dict": + """ + Get the configuration of the widget. + + Returns: + dict: The configuration of the widget. + """ + + @rpc_call + def _get_all_rpc(self) -> "dict": + """ + Get all registered RPC objects. + """ + + @property + @rpc_call + def _rpc_id(self) -> "str": + """ + Get the RPC ID of the widget. + """ + + class PositionerControlLine(RPCBase): @rpc_call def set_positioner(self, positioner: "str | Positioner"): diff --git a/bec_widgets/widgets/control/device_control/positioner_box/_base/__init__.py b/bec_widgets/widgets/control/device_control/positioner_box/_base/__init__.py new file mode 100644 index 00000000..007b19a2 --- /dev/null +++ b/bec_widgets/widgets/control/device_control/positioner_box/_base/__init__.py @@ -0,0 +1,3 @@ +from .positioner_box_base import PositionerBoxBase + +__ALL__ = ["PositionerBoxBase"] diff --git a/bec_widgets/widgets/control/device_control/positioner_box/_base/positioner_box_base.py b/bec_widgets/widgets/control/device_control/positioner_box/_base/positioner_box_base.py new file mode 100644 index 00000000..eadd77d0 --- /dev/null +++ b/bec_widgets/widgets/control/device_control/positioner_box/_base/positioner_box_base.py @@ -0,0 +1,180 @@ +from ast import Tuple +import uuid +from abc import abstractmethod +from typing import Callable, TypedDict + +from bec_lib.device import Positioner +from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger +from bec_lib.messages import ScanQueueMessage +from qtpy.QtWidgets import QGroupBox, QDoubleSpinBox, QPushButton, QVBoxLayout, QLabel, QLineEdit + +from bec_widgets.qt_utils.compact_popup import CompactPopupWidget +from bec_widgets.utils.bec_widget import BECWidget +from bec_widgets.widgets.control.device_control.position_indicator.position_indicator import ( + PositionIndicator, +) +from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget + +logger = bec_logger.logger + + +class DeviceUpdateUIComponents(TypedDict): + spinner: SpinnerWidget + setpoint: QLineEdit + readback: QLabel + position_indicator: PositionIndicator + step_size: QDoubleSpinBox + device_box: QGroupBox + + +class PositionerBoxBase(BECWidget, CompactPopupWidget): + """Contains some core logic for positioner box widgets""" + + current_path = "" + ICON_NAME = "switch_right" + + def __init__(self, parent=None, **kwargs): + """Initialize the PositionerBox widget. + + Args: + parent: The parent widget. + device (Positioner): The device to control. + """ + super().__init__(**kwargs) + CompactPopupWidget.__init__(self, parent=parent, layout=QVBoxLayout) + self.get_bec_shortcuts() + + def _check_device_is_valid(self, device: str): + """Check if the device is a positioner + + Args: + device (str): The device name + """ + if device not in self.dev: + logger.info(f"Device {device} not found in the device list") + return False + if not isinstance(self.dev[device], Positioner): + logger.info(f"Device {device} is not a positioner") + return False + return True + + @abstractmethod + def _device_ui_components(self, device: str) -> DeviceUpdateUIComponents: ... + + def _init_device( + self, + device: str, + position_emit: Callable[[float], None], + limit_update: Callable[[tuple[float, float]], None], + ): + """Init the device view and readback""" + if self._check_device_is_valid(device): + data = self.dev[device].read() + self._on_device_readback( + device, + self._device_ui_components(device), + {"signals": data}, + {}, + position_emit, + limit_update, + ) + + def _stop_device(self, device: str): + """Stop call""" + request_id = str(uuid.uuid4()) + params = {"device": device, "rpc_id": request_id, "func": "stop", "args": [], "kwargs": {}} + msg = ScanQueueMessage( + scan_type="device_rpc", + parameter=params, + queue="emergency", + metadata={"RID": request_id, "response": False}, + ) + self.client.connector.send(MessageEndpoints.scan_queue_request(), msg) + + # pylint: disable=unused-argument + def _on_device_readback( + self, + device: str, + ui_components: DeviceUpdateUIComponents, + msg_content: dict, + metadata: dict, + position_emit: Callable[[float], None], + limit_update: Callable[[tuple[float, float]], None], + ): + signals = msg_content.get("signals", {}) + # pylint: disable=protected-access + hinted_signals = self.dev[device]._hints + precision = self.dev[device].precision + + spinner = ui_components["spinner"] + position_indicator = ui_components["position_indicator"] + readback = ui_components["readback"] + setpoint = ui_components["setpoint"] + + readback_val = None + setpoint_val = None + + if len(hinted_signals) == 1: + signal = hinted_signals[0] + readback_val = signals.get(signal, {}).get("value") + + for setpoint_signal in ["setpoint", "user_setpoint"]: + setpoint_val = signals.get(f"{device}_{setpoint_signal}", {}).get("value") + if setpoint_val is not None: + break + + for moving_signal in ["motor_done_move", "motor_is_moving"]: + is_moving = signals.get(f"{device}_{moving_signal}", {}).get("value") + if is_moving is not None: + break + + if is_moving is not None: + spinner.setVisible(True) + if is_moving: + spinner.start() + spinner.setToolTip("Device is moving") + self.set_global_state("warning") + else: + spinner.stop() + spinner.setToolTip("Device is idle") + self.set_global_state("success") + else: + spinner.setVisible(False) + + if readback_val is not None: + readback.setText(f"{readback_val:.{precision}f}") + position_emit(readback_val) + + if setpoint_val is not None: + setpoint.setText(f"{setpoint_val:.{precision}f}") + + limits = self.dev[device].limits + limit_update(limits) + if limits is not None and readback_val is not None and limits[0] != limits[1]: + pos = (readback_val - limits[0]) / (limits[1] - limits[0]) + position_indicator.set_value(pos) + + def _update_limits_ui( + self, limits: tuple[float, float], position_indicator, setpoint_validator + ): + if limits is not None and limits[0] != limits[1]: + position_indicator.setToolTip(f"Min: {limits[0]}, Max: {limits[1]}") + setpoint_validator.setRange(limits[0], limits[1]) + else: + position_indicator.setToolTip("No limits set") + setpoint_validator.setRange(float("-inf"), float("inf")) + + def _update_device_ui(self, device: str, ui: DeviceUpdateUIComponents): + ui["device_box"].setTitle(device) + ui["readback"].setToolTip(f"{device} readback") + ui["setpoint"].setToolTip(f"{device} setpoint") + ui["step_size"].setToolTip(f"Step size for {device}") + precision = self.dev[device].precision + if precision is not None: + ui["step_size"].setDecimals(precision) + ui["step_size"].setValue(10**-precision * 10) + + def _swap_readback_signal_connection(self, slot, old_device, new_device): + self.bec_dispatcher.disconnect_slot(slot, MessageEndpoints.device_readback(old_device)) + self.bec_dispatcher.connect_slot(slot, MessageEndpoints.device_readback(new_device)) diff --git a/bec_widgets/widgets/control/device_control/positioner_box/positioner_box/positioner_box.py b/bec_widgets/widgets/control/device_control/positioner_box/positioner_box/positioner_box.py index 8d430098..1391030f 100644 --- a/bec_widgets/widgets/control/device_control/positioner_box/positioner_box/positioner_box.py +++ b/bec_widgets/widgets/control/device_control/positioner_box/positioner_box/positioner_box.py @@ -3,21 +3,21 @@ from __future__ import annotations import os -import uuid from bec_lib.device import Positioner from bec_lib.endpoints import MessageEndpoints from bec_lib.logger import bec_logger -from bec_lib.messages import ScanQueueMessage from bec_qthemes import material_icon from qtpy.QtCore import Property, Signal, Slot from qtpy.QtGui import QDoubleValidator from qtpy.QtWidgets import QDialog, QDoubleSpinBox, QPushButton, QVBoxLayout -from bec_widgets.qt_utils.compact_popup import CompactPopupWidget +from bec_widgets.widgets.control.device_control.positioner_box._base import PositionerBoxBase from bec_widgets.utils import UILoader -from bec_widgets.utils.bec_widget import BECWidget from bec_widgets.utils.colors import get_accent_colors, set_theme +from bec_widgets.widgets.control.device_control.positioner_box._base.positioner_box_base import ( + DeviceUpdateUIComponents, +) from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import ( DeviceLineEdit, @@ -29,40 +29,37 @@ logger = bec_logger.logger MODULE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) -class PositionerBox(BECWidget, CompactPopupWidget): +class PositionerBox(PositionerBoxBase): """Simple Widget to control a positioner in box form""" - current_path = "" ui_file = "positioner_box.ui" dimensions = (234, 224) PLUGIN = True - ICON_NAME = "switch_right" + USER_ACCESS = ["set_positioner"] device_changed = Signal(str, str) # Signal emitted to inform listeners about a position update position_update = Signal(float) - def __init__(self, parent=None, device: Positioner = None, **kwargs): + def __init__(self, parent=None, device: Positioner | str | None = None, **kwargs): """Initialize the PositionerBox widget. Args: parent: The parent widget. device (Positioner): The device to control. """ - super().__init__(**kwargs) - CompactPopupWidget.__init__(self, parent=parent, layout=QVBoxLayout) - self.get_bec_shortcuts() + super().__init__(parent=parent, **kwargs) + self._device = "" self._limits = None self._dialog = None if self.current_path == "": self.current_path = os.path.dirname(__file__) - self.init_ui() - if device is not None: - self.device = device - self.init_device() + self.init_ui() + self.device = device + self._init_device(self.device, self.position_update.emit, self.update_limits) def init_ui(self): """Init the ui""" @@ -115,12 +112,6 @@ class PositionerBox(BECWidget, CompactPopupWidget): self._dialog.exec() self._dialog = None - def init_device(self): - """Init the device view and readback""" - if self._check_device_is_valid(self.device): - data = self.dev[self.device].read() - self.on_device_readback({"signals": data}, {}) - def _toogle_enable_buttons(self, enable: bool) -> None: """Toogle enable/disable on available buttons @@ -181,20 +172,6 @@ class PositionerBox(BECWidget, CompactPopupWidget): positioner = positioner.name self.device = positioner - def _check_device_is_valid(self, device: str): - """Check if the device is a positioner - - Args: - device (str): The device name - """ - if device not in self.dev: - logger.info(f"Device {device} not found in the device list") - return False - if not isinstance(self.dev[device], Positioner): - logger.info(f"Device {device} is not a positioner") - return False - return True - @Slot(str, str) def on_device_change(self, old_device: str, new_device: str): """Upon changing the device, a check will be performed if the device is a Positioner. @@ -207,24 +184,20 @@ class PositionerBox(BECWidget, CompactPopupWidget): return logger.info(f"Device changed from {old_device} to {new_device}") self._toogle_enable_buttons(True) - self.init_device() - self.bec_dispatcher.disconnect_slot( - self.on_device_readback, MessageEndpoints.device_readback(old_device) - ) - self.bec_dispatcher.connect_slot( - self.on_device_readback, MessageEndpoints.device_readback(new_device) - ) - self.ui.device_box.setTitle(new_device) - self.ui.readback.setToolTip(f"{self.device} readback") - self.ui.setpoint.setToolTip(f"{self.device} setpoint") - self.ui.step_size.setToolTip(f"Step size for {new_device}") + self._init_device(new_device, self.position_update.emit, self.update_limits) + self._swap_readback_signal_connection(self.on_device_readback, old_device, new_device) + self._update_device_ui(new_device, self._device_ui_components(new_device)) - precision = self.dev[new_device].precision - if precision is not None: - self.ui.step_size.setDecimals(precision) - self.ui.step_size.setValue(10**-precision * 10) + def _device_ui_components(self, device: str) -> DeviceUpdateUIComponents: + return { + "spinner": self.ui.spinner_widget, + "position_indicator": self.ui.position_indicator, + "readback": self.ui.readback, + "setpoint": self.ui.setpoint, + "step_size": self.ui.step_size, + "device_box": self.ui.device_box, + } - # pylint: disable=unused-argument @Slot(dict, dict) def on_device_readback(self, msg_content: dict, metadata: dict): """Callback for device readback. @@ -233,53 +206,14 @@ class PositionerBox(BECWidget, CompactPopupWidget): msg_content (dict): The message content. metadata (dict): The message metadata. """ - signals = msg_content.get("signals", {}) - # pylint: disable=protected-access - hinted_signals = self.dev[self.device]._hints - precision = self.dev[self.device].precision - - readback_val = None - setpoint_val = None - - if len(hinted_signals) == 1: - signal = hinted_signals[0] - readback_val = signals.get(signal, {}).get("value") - - for setpoint_signal in ["setpoint", "user_setpoint"]: - setpoint_val = signals.get(f"{self.device}_{setpoint_signal}", {}).get("value") - if setpoint_val is not None: - break - - for moving_signal in ["motor_done_move", "motor_is_moving"]: - is_moving = signals.get(f"{self.device}_{moving_signal}", {}).get("value") - if is_moving is not None: - break - - if is_moving is not None: - self.ui.spinner_widget.setVisible(True) - if is_moving: - self.ui.spinner_widget.start() - self.ui.spinner_widget.setToolTip("Device is moving") - self.set_global_state("warning") - else: - self.ui.spinner_widget.stop() - self.ui.spinner_widget.setToolTip("Device is idle") - self.set_global_state("success") - else: - self.ui.spinner_widget.setVisible(False) - - if readback_val is not None: - self.ui.readback.setText(f"{readback_val:.{precision}f}") - self.position_update.emit(readback_val) - - if setpoint_val is not None: - self.ui.setpoint.setText(f"{setpoint_val:.{precision}f}") - - limits = self.dev[self.device].limits - self.update_limits(limits) - if limits is not None and readback_val is not None and limits[0] != limits[1]: - pos = (readback_val - limits[0]) / (limits[1] - limits[0]) - self.ui.position_indicator.set_value(pos) + self._on_device_readback( + self.device, + self._device_ui_components(self.device), + msg_content, + metadata, + self.position_update.emit, + self.update_limits, + ) def update_limits(self, limits: tuple): """Update limits @@ -290,31 +224,11 @@ class PositionerBox(BECWidget, CompactPopupWidget): if limits == self._limits: return self._limits = limits - if limits is not None and limits[0] != limits[1]: - self.ui.position_indicator.setToolTip(f"Min: {limits[0]}, Max: {limits[1]}") - self.setpoint_validator.setRange(limits[0], limits[1]) - else: - self.ui.position_indicator.setToolTip("No limits set") - self.setpoint_validator.setRange(float("-inf"), float("inf")) + self._update_limits_ui(limits, self.ui.position_indicator, self.setpoint_validator) @Slot() def on_stop(self): - """Stop call""" - request_id = str(uuid.uuid4()) - params = { - "device": self.device, - "rpc_id": request_id, - "func": "stop", - "args": [], - "kwargs": {}, - } - msg = ScanQueueMessage( - scan_type="device_rpc", - parameter=params, - queue="emergency", - metadata={"RID": request_id, "response": False}, - ) - self.client.connector.send(MessageEndpoints.scan_queue_request(), msg) + self._stop_device(self.device) @property def step_size(self): diff --git a/bec_widgets/widgets/control/device_control/positioner_box/positioner_control_line/positioner_control_line.py b/bec_widgets/widgets/control/device_control/positioner_box/positioner_control_line/positioner_control_line.py index 6e8d52f5..6f879838 100644 --- a/bec_widgets/widgets/control/device_control/positioner_box/positioner_control_line/positioner_control_line.py +++ b/bec_widgets/widgets/control/device_control/positioner_box/positioner_control_line/positioner_control_line.py @@ -1,4 +1,5 @@ import os + from bec_lib.device import Positioner from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox diff --git a/tests/unit_tests/test_positioner_box.py b/tests/unit_tests/test_positioner_box.py index 0735b608..d3737530 100644 --- a/tests/unit_tests/test_positioner_box.py +++ b/tests/unit_tests/test_positioner_box.py @@ -23,11 +23,11 @@ from .conftest import create_widget def positioner_box(qtbot, mocked_client): """Fixture for PositionerBox widget""" with mock.patch( - "bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box.uuid.uuid4" + "bec_widgets.widgets.control.device_control.positioner_box._base.positioner_box_base.uuid.uuid4" ) as mock_uuid: mock_uuid.return_value = "fake_uuid" with mock.patch( - "bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box.PositionerBox._check_device_is_valid", + "bec_widgets.widgets.control.device_control.positioner_box._base.positioner_box_base.PositionerBoxBase._check_device_is_valid", return_value=True, ): db = create_widget(qtbot, PositionerBox, device="samx", client=mocked_client) @@ -126,7 +126,7 @@ def test_positioner_control_line(qtbot, mocked_client): Inherits from PositionerBox, but the layout is changed. Check dimensions only """ with mock.patch( - "bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box.uuid.uuid4" + "bec_widgets.widgets.control.device_control.positioner_box._base.positioner_box_base.uuid.uuid4" ) as mock_uuid: mock_uuid.return_value = "fake_uuid" with mock.patch(