0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

refactor: move positioner_box logic to base class

This commit is contained in:
2025-01-09 17:15:11 +01:00
parent 2419521f5f
commit 3770db51be
6 changed files with 246 additions and 123 deletions

View File

@ -3235,6 +3235,31 @@ class PositionerBox(RPCBase):
"""
class PositionerBoxBase(RPCBase):
@property
@rpc_call
def _config_dict(self) -> "dict":
"""
Get the configuration of the widget.
Returns:
dict: The configuration of the widget.
"""
@rpc_call
def _get_all_rpc(self) -> "dict":
"""
Get all registered RPC objects.
"""
@property
@rpc_call
def _rpc_id(self) -> "str":
"""
Get the RPC ID of the widget.
"""
class PositionerControlLine(RPCBase):
@rpc_call
def set_positioner(self, positioner: "str | Positioner"):

View File

@ -0,0 +1,3 @@
from .positioner_box_base import PositionerBoxBase
__ALL__ = ["PositionerBoxBase"]

View File

@ -0,0 +1,180 @@
from ast import Tuple
import uuid
from abc import abstractmethod
from typing import Callable, TypedDict
from bec_lib.device import Positioner
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messages import ScanQueueMessage
from qtpy.QtWidgets import QGroupBox, QDoubleSpinBox, QPushButton, QVBoxLayout, QLabel, QLineEdit
from bec_widgets.qt_utils.compact_popup import CompactPopupWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.control.device_control.position_indicator.position_indicator import (
PositionIndicator,
)
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
logger = bec_logger.logger
class DeviceUpdateUIComponents(TypedDict):
spinner: SpinnerWidget
setpoint: QLineEdit
readback: QLabel
position_indicator: PositionIndicator
step_size: QDoubleSpinBox
device_box: QGroupBox
class PositionerBoxBase(BECWidget, CompactPopupWidget):
"""Contains some core logic for positioner box widgets"""
current_path = ""
ICON_NAME = "switch_right"
def __init__(self, parent=None, **kwargs):
"""Initialize the PositionerBox widget.
Args:
parent: The parent widget.
device (Positioner): The device to control.
"""
super().__init__(**kwargs)
CompactPopupWidget.__init__(self, parent=parent, layout=QVBoxLayout)
self.get_bec_shortcuts()
def _check_device_is_valid(self, device: str):
"""Check if the device is a positioner
Args:
device (str): The device name
"""
if device not in self.dev:
logger.info(f"Device {device} not found in the device list")
return False
if not isinstance(self.dev[device], Positioner):
logger.info(f"Device {device} is not a positioner")
return False
return True
@abstractmethod
def _device_ui_components(self, device: str) -> DeviceUpdateUIComponents: ...
def _init_device(
self,
device: str,
position_emit: Callable[[float], None],
limit_update: Callable[[tuple[float, float]], None],
):
"""Init the device view and readback"""
if self._check_device_is_valid(device):
data = self.dev[device].read()
self._on_device_readback(
device,
self._device_ui_components(device),
{"signals": data},
{},
position_emit,
limit_update,
)
def _stop_device(self, device: str):
"""Stop call"""
request_id = str(uuid.uuid4())
params = {"device": device, "rpc_id": request_id, "func": "stop", "args": [], "kwargs": {}}
msg = ScanQueueMessage(
scan_type="device_rpc",
parameter=params,
queue="emergency",
metadata={"RID": request_id, "response": False},
)
self.client.connector.send(MessageEndpoints.scan_queue_request(), msg)
# pylint: disable=unused-argument
def _on_device_readback(
self,
device: str,
ui_components: DeviceUpdateUIComponents,
msg_content: dict,
metadata: dict,
position_emit: Callable[[float], None],
limit_update: Callable[[tuple[float, float]], None],
):
signals = msg_content.get("signals", {})
# pylint: disable=protected-access
hinted_signals = self.dev[device]._hints
precision = self.dev[device].precision
spinner = ui_components["spinner"]
position_indicator = ui_components["position_indicator"]
readback = ui_components["readback"]
setpoint = ui_components["setpoint"]
readback_val = None
setpoint_val = None
if len(hinted_signals) == 1:
signal = hinted_signals[0]
readback_val = signals.get(signal, {}).get("value")
for setpoint_signal in ["setpoint", "user_setpoint"]:
setpoint_val = signals.get(f"{device}_{setpoint_signal}", {}).get("value")
if setpoint_val is not None:
break
for moving_signal in ["motor_done_move", "motor_is_moving"]:
is_moving = signals.get(f"{device}_{moving_signal}", {}).get("value")
if is_moving is not None:
break
if is_moving is not None:
spinner.setVisible(True)
if is_moving:
spinner.start()
spinner.setToolTip("Device is moving")
self.set_global_state("warning")
else:
spinner.stop()
spinner.setToolTip("Device is idle")
self.set_global_state("success")
else:
spinner.setVisible(False)
if readback_val is not None:
readback.setText(f"{readback_val:.{precision}f}")
position_emit(readback_val)
if setpoint_val is not None:
setpoint.setText(f"{setpoint_val:.{precision}f}")
limits = self.dev[device].limits
limit_update(limits)
if limits is not None and readback_val is not None and limits[0] != limits[1]:
pos = (readback_val - limits[0]) / (limits[1] - limits[0])
position_indicator.set_value(pos)
def _update_limits_ui(
self, limits: tuple[float, float], position_indicator, setpoint_validator
):
if limits is not None and limits[0] != limits[1]:
position_indicator.setToolTip(f"Min: {limits[0]}, Max: {limits[1]}")
setpoint_validator.setRange(limits[0], limits[1])
else:
position_indicator.setToolTip("No limits set")
setpoint_validator.setRange(float("-inf"), float("inf"))
def _update_device_ui(self, device: str, ui: DeviceUpdateUIComponents):
ui["device_box"].setTitle(device)
ui["readback"].setToolTip(f"{device} readback")
ui["setpoint"].setToolTip(f"{device} setpoint")
ui["step_size"].setToolTip(f"Step size for {device}")
precision = self.dev[device].precision
if precision is not None:
ui["step_size"].setDecimals(precision)
ui["step_size"].setValue(10**-precision * 10)
def _swap_readback_signal_connection(self, slot, old_device, new_device):
self.bec_dispatcher.disconnect_slot(slot, MessageEndpoints.device_readback(old_device))
self.bec_dispatcher.connect_slot(slot, MessageEndpoints.device_readback(new_device))

View File

@ -3,21 +3,21 @@
from __future__ import annotations
import os
import uuid
from bec_lib.device import Positioner
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messages import ScanQueueMessage
from bec_qthemes import material_icon
from qtpy.QtCore import Property, Signal, Slot
from qtpy.QtGui import QDoubleValidator
from qtpy.QtWidgets import QDialog, QDoubleSpinBox, QPushButton, QVBoxLayout
from bec_widgets.qt_utils.compact_popup import CompactPopupWidget
from bec_widgets.widgets.control.device_control.positioner_box._base import PositionerBoxBase
from bec_widgets.utils import UILoader
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors, set_theme
from bec_widgets.widgets.control.device_control.positioner_box._base.positioner_box_base import (
DeviceUpdateUIComponents,
)
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
DeviceLineEdit,
@ -29,40 +29,37 @@ logger = bec_logger.logger
MODULE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
class PositionerBox(BECWidget, CompactPopupWidget):
class PositionerBox(PositionerBoxBase):
"""Simple Widget to control a positioner in box form"""
current_path = ""
ui_file = "positioner_box.ui"
dimensions = (234, 224)
PLUGIN = True
ICON_NAME = "switch_right"
USER_ACCESS = ["set_positioner"]
device_changed = Signal(str, str)
# Signal emitted to inform listeners about a position update
position_update = Signal(float)
def __init__(self, parent=None, device: Positioner = None, **kwargs):
def __init__(self, parent=None, device: Positioner | str | None = None, **kwargs):
"""Initialize the PositionerBox widget.
Args:
parent: The parent widget.
device (Positioner): The device to control.
"""
super().__init__(**kwargs)
CompactPopupWidget.__init__(self, parent=parent, layout=QVBoxLayout)
self.get_bec_shortcuts()
super().__init__(parent=parent, **kwargs)
self._device = ""
self._limits = None
self._dialog = None
if self.current_path == "":
self.current_path = os.path.dirname(__file__)
self.init_ui()
if device is not None:
self.init_ui()
self.device = device
self.init_device()
self._init_device(self.device, self.position_update.emit, self.update_limits)
def init_ui(self):
"""Init the ui"""
@ -115,12 +112,6 @@ class PositionerBox(BECWidget, CompactPopupWidget):
self._dialog.exec()
self._dialog = None
def init_device(self):
"""Init the device view and readback"""
if self._check_device_is_valid(self.device):
data = self.dev[self.device].read()
self.on_device_readback({"signals": data}, {})
def _toogle_enable_buttons(self, enable: bool) -> None:
"""Toogle enable/disable on available buttons
@ -181,20 +172,6 @@ class PositionerBox(BECWidget, CompactPopupWidget):
positioner = positioner.name
self.device = positioner
def _check_device_is_valid(self, device: str):
"""Check if the device is a positioner
Args:
device (str): The device name
"""
if device not in self.dev:
logger.info(f"Device {device} not found in the device list")
return False
if not isinstance(self.dev[device], Positioner):
logger.info(f"Device {device} is not a positioner")
return False
return True
@Slot(str, str)
def on_device_change(self, old_device: str, new_device: str):
"""Upon changing the device, a check will be performed if the device is a Positioner.
@ -207,24 +184,20 @@ class PositionerBox(BECWidget, CompactPopupWidget):
return
logger.info(f"Device changed from {old_device} to {new_device}")
self._toogle_enable_buttons(True)
self.init_device()
self.bec_dispatcher.disconnect_slot(
self.on_device_readback, MessageEndpoints.device_readback(old_device)
)
self.bec_dispatcher.connect_slot(
self.on_device_readback, MessageEndpoints.device_readback(new_device)
)
self.ui.device_box.setTitle(new_device)
self.ui.readback.setToolTip(f"{self.device} readback")
self.ui.setpoint.setToolTip(f"{self.device} setpoint")
self.ui.step_size.setToolTip(f"Step size for {new_device}")
self._init_device(new_device, self.position_update.emit, self.update_limits)
self._swap_readback_signal_connection(self.on_device_readback, old_device, new_device)
self._update_device_ui(new_device, self._device_ui_components(new_device))
precision = self.dev[new_device].precision
if precision is not None:
self.ui.step_size.setDecimals(precision)
self.ui.step_size.setValue(10**-precision * 10)
def _device_ui_components(self, device: str) -> DeviceUpdateUIComponents:
return {
"spinner": self.ui.spinner_widget,
"position_indicator": self.ui.position_indicator,
"readback": self.ui.readback,
"setpoint": self.ui.setpoint,
"step_size": self.ui.step_size,
"device_box": self.ui.device_box,
}
# pylint: disable=unused-argument
@Slot(dict, dict)
def on_device_readback(self, msg_content: dict, metadata: dict):
"""Callback for device readback.
@ -233,53 +206,14 @@ class PositionerBox(BECWidget, CompactPopupWidget):
msg_content (dict): The message content.
metadata (dict): The message metadata.
"""
signals = msg_content.get("signals", {})
# pylint: disable=protected-access
hinted_signals = self.dev[self.device]._hints
precision = self.dev[self.device].precision
readback_val = None
setpoint_val = None
if len(hinted_signals) == 1:
signal = hinted_signals[0]
readback_val = signals.get(signal, {}).get("value")
for setpoint_signal in ["setpoint", "user_setpoint"]:
setpoint_val = signals.get(f"{self.device}_{setpoint_signal}", {}).get("value")
if setpoint_val is not None:
break
for moving_signal in ["motor_done_move", "motor_is_moving"]:
is_moving = signals.get(f"{self.device}_{moving_signal}", {}).get("value")
if is_moving is not None:
break
if is_moving is not None:
self.ui.spinner_widget.setVisible(True)
if is_moving:
self.ui.spinner_widget.start()
self.ui.spinner_widget.setToolTip("Device is moving")
self.set_global_state("warning")
else:
self.ui.spinner_widget.stop()
self.ui.spinner_widget.setToolTip("Device is idle")
self.set_global_state("success")
else:
self.ui.spinner_widget.setVisible(False)
if readback_val is not None:
self.ui.readback.setText(f"{readback_val:.{precision}f}")
self.position_update.emit(readback_val)
if setpoint_val is not None:
self.ui.setpoint.setText(f"{setpoint_val:.{precision}f}")
limits = self.dev[self.device].limits
self.update_limits(limits)
if limits is not None and readback_val is not None and limits[0] != limits[1]:
pos = (readback_val - limits[0]) / (limits[1] - limits[0])
self.ui.position_indicator.set_value(pos)
self._on_device_readback(
self.device,
self._device_ui_components(self.device),
msg_content,
metadata,
self.position_update.emit,
self.update_limits,
)
def update_limits(self, limits: tuple):
"""Update limits
@ -290,31 +224,11 @@ class PositionerBox(BECWidget, CompactPopupWidget):
if limits == self._limits:
return
self._limits = limits
if limits is not None and limits[0] != limits[1]:
self.ui.position_indicator.setToolTip(f"Min: {limits[0]}, Max: {limits[1]}")
self.setpoint_validator.setRange(limits[0], limits[1])
else:
self.ui.position_indicator.setToolTip("No limits set")
self.setpoint_validator.setRange(float("-inf"), float("inf"))
self._update_limits_ui(limits, self.ui.position_indicator, self.setpoint_validator)
@Slot()
def on_stop(self):
"""Stop call"""
request_id = str(uuid.uuid4())
params = {
"device": self.device,
"rpc_id": request_id,
"func": "stop",
"args": [],
"kwargs": {},
}
msg = ScanQueueMessage(
scan_type="device_rpc",
parameter=params,
queue="emergency",
metadata={"RID": request_id, "response": False},
)
self.client.connector.send(MessageEndpoints.scan_queue_request(), msg)
self._stop_device(self.device)
@property
def step_size(self):

View File

@ -1,4 +1,5 @@
import os
from bec_lib.device import Positioner
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox

View File

@ -23,11 +23,11 @@ from .conftest import create_widget
def positioner_box(qtbot, mocked_client):
"""Fixture for PositionerBox widget"""
with mock.patch(
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box.uuid.uuid4"
"bec_widgets.widgets.control.device_control.positioner_box._base.positioner_box_base.uuid.uuid4"
) as mock_uuid:
mock_uuid.return_value = "fake_uuid"
with mock.patch(
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box.PositionerBox._check_device_is_valid",
"bec_widgets.widgets.control.device_control.positioner_box._base.positioner_box_base.PositionerBoxBase._check_device_is_valid",
return_value=True,
):
db = create_widget(qtbot, PositionerBox, device="samx", client=mocked_client)
@ -126,7 +126,7 @@ def test_positioner_control_line(qtbot, mocked_client):
Inherits from PositionerBox, but the layout is changed. Check dimensions only
"""
with mock.patch(
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box.uuid.uuid4"
"bec_widgets.widgets.control.device_control.positioner_box._base.positioner_box_base.uuid.uuid4"
) as mock_uuid:
mock_uuid.return_value = "fake_uuid"
with mock.patch(