1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-03-04 16:02:51 +01:00

fix(signal_label): rewrite reading selection logic

This commit is contained in:
2025-07-18 14:04:32 +02:00
committed by David Perl
parent f0dc992586
commit cd17a4aad9
2 changed files with 44 additions and 36 deletions

View File

@@ -219,6 +219,8 @@ class SignalLabel(BECWidget, QWidget):
self._select_button.clicked.connect(self.show_choice_dialog)
self.get_bec_shortcuts()
self._device_obj = self.dev.get(self._device)
self._signal_key, self._signal_info = "", {}
self._connected: bool = False
self.connect_device()
@@ -251,8 +253,9 @@ class SignalLabel(BECWidget, QWidget):
def connect_device(self):
"""Subscribe to the Redis topic for the device to display"""
if not self._connected and self._device and self._device in self.dev:
self._signal_key, self._signal_info = self._signal_key_and_info()
self._manual_read()
self._read_endpoint = MessageEndpoints.device_read(self._device)
self._read_endpoint = MessageEndpoints.device_readback(self._device)
self._read_config_endpoint = MessageEndpoints.device_read_configuration(self._device)
self.bec_dispatcher.connect_slot(self.on_device_readback, self._read_endpoint)
self.bec_dispatcher.connect_slot(self.on_device_readback, self._read_config_endpoint)
@@ -262,44 +265,30 @@ class SignalLabel(BECWidget, QWidget):
def disconnect_device(self):
"""Unsubscribe from the Redis topic for the device to display"""
if self._connected:
self._connected = False
self.bec_dispatcher.disconnect_slot(self.on_device_readback, self._read_endpoint)
self.bec_dispatcher.disconnect_slot(self.on_device_readback, self._read_config_endpoint)
self._connected = False
def _manual_read(self):
if self._device is None or not isinstance(
(device := self.dev.get(self._device)), Device | Signal
):
self._units = ""
self._value = "__"
if not isinstance(self._device_obj, Device | Signal):
self._value, self._units = "__", ""
return
signal, info = (
(
getattr(device, self.signal, None),
device._info.get("signals", {}).get(self._signal, {}).get("describe", {}),
)
if isinstance(device, Device)
else (device, device.describe().get(self._device))
)
if not isinstance(signal, Signal): # Avoid getting other attributes of device, e.g. methods
signal = None
if signal is None:
self._units = ""
self._value = "__"
reading = (self._device_obj.read() or {}) | (self._device_obj.read_configuration() or {})
value = reading.get(self._signal_key, {}).get("value")
if value is None:
self._value, self._units = "__", ""
return
self._value = list(signal.read(cached=True).values())[0]["value"]
self._units = info.get("egu", "")
self._dtype = info.get("dtype", "float")
self._value = value
self._units = self._signal_info.get("egu", "")
self._dtype = self._signal_info.get("dtype", "float")
@SafeSlot(dict, dict)
def on_device_readback(self, msg: dict, metadata: dict) -> None:
"""
Update the display with the new value.
"""
try:
signal_to_read = self._patch_hinted_signal()
_value = msg["signals"].get(signal_to_read, {}).get("value")
_value = msg["signals"].get(self._signal_key, {}).get("value")
if _value is not None:
self._value = _value
self.set_display_value(self._value)
@@ -309,15 +298,25 @@ class SignalLabel(BECWidget, QWidget):
f"Error processing incoming reading: {msg}, handled with exception: {''.join(traceback.format_exception(e))}"
)
def _patch_hinted_signal(self):
if self.dev[self._device]._info["signals"] == {}:
return self._signal or self._device
signal_info = self.dev[self._device]._info["signals"][self._signal]
return (
signal_info["obj_name"]
if signal_info["kind_str"] == Kind.hinted.name
else (self._signal or self._device)
)
def _signal_key_and_info(self) -> tuple[str, dict]:
if isinstance(self._device_obj, Device):
signal_info = self._device_obj._info["signals"][self._signal]
if signal_info["kind_str"] == Kind.hinted.name:
return signal_info["obj_name"], signal_info
else:
return f"{self._device}_{self._signal}", signal_info
elif isinstance(self._device_obj, Signal):
return self._device, self._device_obj._info["describe_configuration"]
return "", {}
# if self.dev[self._device]._info["signals"] == {}:
# return self._signal or self._device
# signal_info = self.dev[self._device]._info["signals"][self._signal]
# return (
# signal_info["obj_name"]
# if signal_info["kind_str"] == Kind.hinted.name
# else (self._signal or self._device)
# )
@SafeProperty(str)
def device(self) -> str:
@@ -328,6 +327,7 @@ class SignalLabel(BECWidget, QWidget):
def device(self, value: str) -> None:
self.disconnect_device()
self._device = value
self._device_obj = self.dev.get(self._device)
self._config.device = value
self.connect_device()
self._update_label()

View File

@@ -84,7 +84,15 @@ def test_initialization(signal_label: SignalLabel):
def test_initialization_with_device(qtbot, mocked_client: MagicMock):
with patch.object(mocked_client.device_manager.devices.samx, "_info", SAMX_INFO_DICT):
with (
patch.object(mocked_client.device_manager.devices.samx, "_info", SAMX_INFO_DICT),
patch.object(
mocked_client.device_manager.devices.samx,
"_get_root_recursively",
lambda *_: (MagicMock(),),
),
):
widget = SignalLabel(device="samx", signal="readback", client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)