1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-14 20:50:55 +02:00

Compare commits

..

1 Commits

Author SHA1 Message Date
c0dbb0a5e7 fix: reenable logpanel in menu 2025-10-06 09:53:59 +01:00
71 changed files with 585 additions and 1941 deletions

View File

@@ -1,6 +0,0 @@
version: 2
updates:
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "weekly"

View File

@@ -1,136 +1,6 @@
# CHANGELOG
## v2.42.0 (2025-10-21)
### Features
- **image_roi**: Enhance get_coordinates to include rectangle center and dimensions
([`96664c3`](https://github.com/bec-project/bec_widgets/commit/96664c3923737df0b09aa8f35df388f9fd630b55))
- **positioner_box_2d**: Added properties to enable/disable vertical and horizontal controls
([`1e19092`](https://github.com/bec-project/bec_widgets/commit/1e190923196f8b28c92dfdd83b9ce90873dd792d))
## v2.41.1 (2025-10-15)
### Bug Fixes
- **dependencies**: Bec lib versions fixed
([`3941050`](https://github.com/bec-project/bec_widgets/commit/3941050883a791f800ab7178af2435ac14f837b6))
## v2.41.0 (2025-10-15)
### Bug Fixes
- **image_roi**: Delete button added to compact version
([`ef27de4`](https://github.com/bec-project/bec_widgets/commit/ef27de40ceee8375d95a0f3a8e451b7d05d0ae2c))
- **image_roi**: Rois can be removed with right click context menu
([`37df95e`](https://github.com/bec-project/bec_widgets/commit/37df95ead8d6a07a6c5794a97a486d9f380004cc))
### Build System
- **bec_lib**: Version bump to 3.69.3
([`28ac9c5`](https://github.com/bec-project/bec_widgets/commit/28ac9c5cc369bdfa712c70c45591243631c65066))
### Features
- **image_roi_tree**: Compact mode added
([`c87a6cf`](https://github.com/bec-project/bec_widgets/commit/c87a6cfce9c36588b32f5279e63072bc2646c36f))
### Refactoring
- **serializer**: Upgrade to new serializer interface
([`3d807ea`](https://github.com/bec-project/bec_widgets/commit/3d807eaa63980fd2bb11661696c4d8548fffde8c))
### Testing
- **deviceconfig-form-update**: Add onFailure default to test
([`1dd20d5`](https://github.com/bec-project/bec_widgets/commit/1dd20d5986485f3bfe7ee02596ca23027ec4b756))
## v2.40.0 (2025-10-08)
### Bug Fixes
- **curve_tree**: Fetching scan numbers directly from the bec client
([`8111a4a`](https://github.com/bec-project/bec_widgets/commit/8111a4a21b7c1bd75316e9a1f1166b88ea52326d))
- **curve_tree**: Safeguard fetching scan numbers from BEC client
([`df8065e`](https://github.com/bec-project/bec_widgets/commit/df8065ea4000b24235520756515aa18f812bb390))
- **curve_tree**: Scans are always fetched by scan ids
([`20a59af`](https://github.com/bec-project/bec_widgets/commit/20a59af648a9808057df2226a3a3c12893cc5059))
- **waveform**: Cleanup of scan_history dialog if not closed manually before widget
([`d681ba5`](https://github.com/bec-project/bec_widgets/commit/d681ba538be9ccec45a1ebd412cbc33c8c7c0ae2))
- **waveform**: Fetching scan number is not done from list but from .get_by_scan_number
([`962ab77`](https://github.com/bec-project/bec_widgets/commit/962ab774e6afc73a321a5680e2862d9e41812888))
- **waveform**: If scan id and scan number is provided, the scan is fetched from the scan id
([`e59f27a`](https://github.com/bec-project/bec_widgets/commit/e59f27a22de490768c814c80642a7a91bebfef5b))
- **waveform**: Safeguard added to the fetching history data
([`540cfc3`](https://github.com/bec-project/bec_widgets/commit/540cfc37be65afcf721773564adc85de681a9d07))
- **waveform**: Safeguard for _scan_history_closed
([`2bf4896`](https://github.com/bec-project/bec_widgets/commit/2bf489600e96bb5b47d89bed261614f62c970ca9))
- **waveform**: Safeguard for if scan_item is a list
([`7e88a00`](https://github.com/bec-project/bec_widgets/commit/7e88a002b6ca40fc85fde993282b8706f140d9aa))
- **waveform**: Update x suffix label with x property change, do not wait for next update cycle
([`d19001c`](https://github.com/bec-project/bec_widgets/commit/d19001c94e652c0c3e18f8d7903fd1ccff1111cd))
- **waveform**: X_data checked with is scalar instead of len()
([`db7dd4f`](https://github.com/bec-project/bec_widgets/commit/db7dd4f8d4b1210e65c852f6193fc8cf0f4809a5))
### Build System
- **bec_lib**: Bec_lib dependency raised to 3.68
([`2f3dc2c`](https://github.com/bec-project/bec_widgets/commit/2f3dc2ce6b7133fc5582bd6996a674590cf1002d))
### Chores
- Add dependabot config
([`f25f865`](https://github.com/bec-project/bec_widgets/commit/f25f86522f0a2e9dd24ca862ea8de89873951f83))
### Features
- **waveform**: New type of curve - history curve
([`f083dff`](https://github.com/bec-project/bec_widgets/commit/f083dff6128c6256443b49f54ab12b54f1b90d66))
### Refactoring
- **test_waveform**: Test waveform renamed
([`2f798be`](https://github.com/bec-project/bec_widgets/commit/2f798be7b0d43d304ccbd0e992a9d62f1aa1dd5f))
- **waveform**: Separate method to fetch scan item from history
([`4be7058`](https://github.com/bec-project/bec_widgets/commit/4be70580a60293204b135c6ea77978f1dcf8aa5f))
### Testing
- **conftest**: Suppress_message_box for error popups fixture autouse True
([`0844a9e`](https://github.com/bec-project/bec_widgets/commit/0844a9e11975a34780b1dc413f5145517d1a1a22))
- **plotting_framework_e2e**: Fetching history curve
([`a006f95`](https://github.com/bec-project/bec_widgets/commit/a006f95f211ad115019967e365a6627d9678a1e3))
- **waveform,curve_tree**: Test extended to cover history curve behaviour
([`5a5d323`](https://github.com/bec-project/bec_widgets/commit/5a5d32312b08e1edeb69243daddfaaa9bac22273))
## v2.39.1 (2025-10-07)
### Bug Fixes
- Explicitly pass the cached readout flag
([`50696bc`](https://github.com/bec-project/bec_widgets/commit/50696bce4ce14c61b4bdda8c6fb40967972e6b23))
## v2.39.0 (2025-09-24)
### Bug Fixes

View File

@@ -3534,34 +3534,6 @@ class PositionerBox2D(RPCBase):
Take a screenshot of the dock area and save it to a file.
"""
@property
@rpc_call
def enable_controls_hor(self) -> "bool":
"""
Persisted switch for horizontal control buttons (tweak/step).
"""
@enable_controls_hor.setter
@rpc_call
def enable_controls_hor(self) -> "bool":
"""
Persisted switch for horizontal control buttons (tweak/step).
"""
@property
@rpc_call
def enable_controls_ver(self) -> "bool":
"""
Persisted switch for vertical control buttons (tweak/step).
"""
@enable_controls_ver.setter
@rpc_call
def enable_controls_ver(self) -> "bool":
"""
Persisted switch for vertical control buttons (tweak/step).
"""
class PositionerControlLine(RPCBase):
"""A widget that controls a single device."""
@@ -3681,8 +3653,8 @@ class RectangularROI(RPCBase):
@rpc_call
def get_coordinates(self, typed: "bool | None" = None) -> "dict | tuple":
"""
Returns the coordinates of a rectangle's corners, rectangle center and dimensions.
Supports returning them as either a dictionary with descriptive keys or a tuple of coordinates.
Returns the coordinates of a rectangle's corners. Supports returning them
as either a dictionary with descriptive keys or a tuple of coordinates.
Args:
typed (bool | None): If True, returns coordinates as a dictionary with
@@ -3690,7 +3662,7 @@ class RectangularROI(RPCBase):
the value of `self.description`.
Returns:
dict | tuple: The rectangle's corner coordinates, rectangle center and dimensions, where the format
dict | tuple: The rectangle's corner coordinates, where the format
depends on the `typed` parameter.
"""
@@ -5107,8 +5079,6 @@ class Waveform(RPCBase):
color: "str | None" = None,
label: "str | None" = None,
dap: "str | None" = None,
scan_id: "str | None" = None,
scan_number: "int | None" = None,
**kwargs,
) -> "Curve":
"""
@@ -5131,10 +5101,6 @@ class Waveform(RPCBase):
dap(str): The dap model to use for the curve, only available for sync devices.
If not specified, none will be added.
Use the same string as is the name of the LMFit model.
scan_id(str): Optional scan ID. When provided, the curve is treated as a **history** curve and
the ydata (and optional xdata) are fetched from that historical scan. Such curves are
never cleared by livescan resets.
scan_number(int): Optional scan index. When provided, the curve is treated as a **history** curve and
Returns:
Curve: The curve object.
@@ -5177,11 +5143,11 @@ class Waveform(RPCBase):
def update_with_scan_history(self, scan_index: "int" = None, scan_id: "str" = None):
"""
Update the scan curves with the data from the scan storage.
If both arguments are provided, scan_id takes precedence and scan_index is ignored.
Provide only one of scan_id or scan_index.
Args:
scan_id(str, optional): ScanID of the scan to be updated. Defaults to None.
scan_index(int, optional): Index (scan number) of the scan to be updated. Defaults to None.
scan_index(int, optional): Index of the scan to be updated. Defaults to None.
"""
@rpc_call

View File

@@ -55,7 +55,7 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
# "btn6": self.btn6,
# "pb": self.pb,
# "pi": self.pi,
"wf": self.wf,
# "wf": self.wf,
# "scatter": self.scatter,
# "scatter_mi": self.scatter,
# "mwf": self.mwf,
@@ -105,11 +105,12 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
# self.btn5 = QPushButton("Button 5")
# self.btn6 = QPushButton("Button 6")
#
fifth_tab = QWidget()
fifth_tab_layout = QVBoxLayout(fifth_tab)
self.wf = Waveform()
fifth_tab_layout.addWidget(self.wf)
tab_widget.addTab(fifth_tab, "Waveform Next Gen")
# fifth_tab = QWidget()
# fifth_tab_layout = QVBoxLayout(fifth_tab)
# self.wf = Waveform()
# fifth_tab_layout.addWidget(self.wf)
# tab_widget.addTab(fifth_tab, "Waveform Next Gen")
# tab_widget.setCurrentIndex(4)
#
sixth_tab = QWidget()
sixth_tab_layout = QVBoxLayout(sixth_tab)

View File

@@ -1,285 +0,0 @@
from unittest.mock import MagicMock
from bec_lib.device import Device as BECDevice
from bec_lib.device import Positioner as BECPositioner
from bec_lib.device import ReadoutPriority
from bec_lib.devicemanager import DeviceContainer
class FakeDevice(BECDevice):
"""Fake minimal positioner class for testing."""
def __init__(self, name, enabled=True, readout_priority=ReadoutPriority.MONITORED):
super().__init__(name=name)
self._enabled = enabled
self.signals = {self.name: {"value": 1.0}}
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self._readout_priority = readout_priority
self._config = {
"readoutPriority": "baseline",
"deviceClass": "ophyd.Device",
"deviceConfig": {},
"deviceTags": {"user device"},
"enabled": enabled,
"readOnly": False,
"name": self.name,
}
@property
def readout_priority(self):
return self._readout_priority
@readout_priority.setter
def readout_priority(self, value):
self._readout_priority = value
@property
def limits(self) -> tuple[float, float]:
return self._limits
@limits.setter
def limits(self, value: tuple[float, float]):
self._limits = value
def __contains__(self, item):
return item == self.name
@property
def _hints(self):
return [self.name]
def set_value(self, fake_value: float = 1.0) -> None:
"""
Setup fake value for device readout
Args:
fake_value(float): Desired fake value
"""
self.signals[self.name]["value"] = fake_value
def describe(self) -> dict:
"""
Get the description of the device
Returns:
dict: Description of the device
"""
return self.description
class FakePositioner(BECPositioner):
def __init__(
self,
name,
enabled=True,
limits=None,
read_value=1.0,
readout_priority=ReadoutPriority.MONITORED,
):
super().__init__(name=name)
# self.limits = limits if limits is not None else [0.0, 0.0]
self.read_value = read_value
self.setpoint_value = read_value
self.motor_is_moving_value = 0
self._enabled = enabled
self._limits = limits
self._readout_priority = readout_priority
self.signals = {self.name: {"value": 1.0}}
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self._config = {
"readoutPriority": "baseline",
"deviceClass": "ophyd_devices.SimPositioner",
"deviceConfig": {"delay": 1, "tolerance": 0.01, "update_frequency": 400},
"deviceTags": {"user motors"},
"enabled": enabled,
"readOnly": False,
"name": self.name,
}
self._info = {
"signals": {
"readback": {
"kind_str": "hinted",
"component_name": "readback",
"obj_name": self.name,
}, # hinted
"setpoint": {
"kind_str": "normal",
"component_name": "setpoint",
"obj_name": f"{self.name}_setpoint",
}, # normal
"velocity": {
"kind_str": "config",
"component_name": "velocity",
"obj_name": f"{self.name}_velocity",
}, # config
}
}
self.signals = {
self.name: {"value": self.read_value},
f"{self.name}_setpoint": {"value": self.setpoint_value},
f"{self.name}_motor_is_moving": {"value": self.motor_is_moving_value},
}
@property
def readout_priority(self):
return self._readout_priority
@readout_priority.setter
def readout_priority(self, value):
self._readout_priority = value
@property
def enabled(self) -> bool:
return self._enabled
@enabled.setter
def enabled(self, value: bool):
self._enabled = value
@property
def limits(self) -> tuple[float, float]:
return self._limits
@limits.setter
def limits(self, value: tuple[float, float]):
self._limits = value
def __contains__(self, item):
return item == self.name
@property
def _hints(self):
return [self.name]
def set_value(self, fake_value: float = 1.0) -> None:
"""
Setup fake value for device readout
Args:
fake_value(float): Desired fake value
"""
self.read_value = fake_value
def describe(self) -> dict:
"""
Get the description of the device
Returns:
dict: Description of the device
"""
return self.description
@property
def precision(self):
return 3
def set_read_value(self, value):
self.read_value = value
def read(self, cached=False):
return self.signals
def set_limits(self, limits):
self.limits = limits
def move(self, value, relative=False):
"""Simulates moving the device to a new position."""
if relative:
self.read_value += value
else:
self.read_value = value
# Respect the limits
self.read_value = max(min(self.read_value, self.limits[1]), self.limits[0])
@property
def readback(self):
return MagicMock(get=MagicMock(return_value=self.read_value))
class Positioner(FakePositioner):
"""just placeholder for testing embedded isinstance check in DeviceCombobox"""
def __init__(self, name="test", limits=None, read_value=1.0, enabled=True):
super().__init__(name, limits=limits, read_value=read_value, enabled=enabled)
class Device(FakeDevice):
"""just placeholder for testing embedded isinstance check in DeviceCombobox"""
def __init__(self, name, enabled=True):
super().__init__(name, enabled)
class DMMock:
def __init__(self):
self.devices = DeviceContainer()
self.enabled_devices = [device for device in self.devices if device.enabled]
def add_devices(self, devices: list):
"""
Add devices to the DeviceContainer.
Args:
devices (list): List of device instances to add.
"""
for device in devices:
self.devices[device.name] = device
def get_bec_signals(self, signal_class_name: str):
"""
Emulate DeviceManager.get_bec_signals for unit-tests.
For “AsyncSignal” we list every device whose readout_priority is
ReadoutPriority.ASYNC and build a minimal tuple
(device_name, signal_name, signal_info_dict) that matches the real
API shape used by Waveform._check_async_signal_found.
"""
signals: list[tuple[str, str, dict]] = []
if signal_class_name != "AsyncSignal":
return signals
for device in self.devices.values():
if getattr(device, "readout_priority", None) == ReadoutPriority.ASYNC:
device_name = device.name
signal_name = device.name # primary signal in our mocks
signal_info = {
"component_name": signal_name,
"obj_name": signal_name,
"kind_str": "hinted",
"signal_class": signal_class_name,
"metadata": {
"connected": True,
"precision": None,
"read_access": True,
"timestamp": 0.0,
"write_access": True,
},
}
signals.append((device_name, signal_name, signal_info))
return signals
DEVICES = [
FakePositioner("samx", limits=[-10, 10], read_value=2.0),
FakePositioner("samy", limits=[-5, 5], read_value=3.0),
FakePositioner("samz", limits=[-8, 8], read_value=4.0),
FakePositioner("aptrx", limits=None, read_value=4.0),
FakePositioner("aptry", limits=None, read_value=5.0),
FakeDevice("gauss_bpm"),
FakeDevice("gauss_adc1"),
FakeDevice("gauss_adc2"),
FakeDevice("gauss_adc3"),
FakeDevice("bpm4i"),
FakeDevice("bpm3a"),
FakeDevice("bpm3i"),
FakeDevice("eiger", readout_priority=ReadoutPriority.ASYNC),
FakeDevice("waveform1d"),
FakeDevice("async_device", readout_priority=ReadoutPriority.ASYNC),
Positioner("test", limits=[-10, 10], read_value=2.0),
Device("test_device"),
]
def check_remote_data_size(widget, plot_name, num_elements):
"""
Check if the remote data has the correct number of elements.
Used in the qtbot.waitUntil function.
"""
return len(widget.get_all_data()[plot_name]["x"]) == num_elements

View File

@@ -1,76 +1,285 @@
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
from qtpy.QtCore import QTimer
from qtpy.QtWidgets import QApplication
from unittest.mock import MagicMock
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
from bec_widgets.utils import error_popups
from bec_lib.device import Device as BECDevice
from bec_lib.device import Positioner as BECPositioner
from bec_lib.device import ReadoutPriority
from bec_lib.devicemanager import DeviceContainer
class TestableQTimer(QTimer):
_instances: list[tuple[QTimer, str]] = []
_current_test_name: str = ""
class FakeDevice(BECDevice):
"""Fake minimal positioner class for testing."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
TestableQTimer._instances.append((self, TestableQTimer._current_test_name))
def __init__(self, name, enabled=True, readout_priority=ReadoutPriority.MONITORED):
super().__init__(name=name)
self._enabled = enabled
self.signals = {self.name: {"value": 1.0}}
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self._readout_priority = readout_priority
self._config = {
"readoutPriority": "baseline",
"deviceClass": "ophyd.Device",
"deviceConfig": {},
"deviceTags": {"user device"},
"enabled": enabled,
"readOnly": False,
"name": self.name,
}
@classmethod
def check_all_stopped(cls, qtbot):
def _is_done_or_deleted(t: QTimer):
try:
return not t.isActive()
except RuntimeError as e:
return "already deleted" in e.args[0]
@property
def readout_priority(self):
return self._readout_priority
try:
qtbot.waitUntil(lambda: all(_is_done_or_deleted(timer) for timer, _ in cls._instances))
except QtBotTimeoutError as exc:
active_timers = list(filter(lambda t: t[0].isActive(), cls._instances))
(t.stop() for t, _ in cls._instances)
raise TimeoutError(f"Failed to stop all timers: {active_timers}") from exc
cls._instances = []
@readout_priority.setter
def readout_priority(self, value):
self._readout_priority = value
@property
def limits(self) -> tuple[float, float]:
return self._limits
@limits.setter
def limits(self, value: tuple[float, float]):
self._limits = value
def __contains__(self, item):
return item == self.name
@property
def _hints(self):
return [self.name]
def set_value(self, fake_value: float = 1.0) -> None:
"""
Setup fake value for device readout
Args:
fake_value(float): Desired fake value
"""
self.signals[self.name]["value"] = fake_value
def describe(self) -> dict:
"""
Get the description of the device
Returns:
dict: Description of the device
"""
return self.description
def qapplication_fixture(qtbot, request, testable_qtimer_class):
yield
class FakePositioner(BECPositioner):
if request.node.stash._storage.get("failed"):
print("Test failed, skipping cleanup checks")
return
def __init__(
self,
name,
enabled=True,
limits=None,
read_value=1.0,
readout_priority=ReadoutPriority.MONITORED,
):
super().__init__(name=name)
# self.limits = limits if limits is not None else [0.0, 0.0]
self.read_value = read_value
self.setpoint_value = read_value
self.motor_is_moving_value = 0
self._enabled = enabled
self._limits = limits
self._readout_priority = readout_priority
self.signals = {self.name: {"value": 1.0}}
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self._config = {
"readoutPriority": "baseline",
"deviceClass": "ophyd_devices.SimPositioner",
"deviceConfig": {"delay": 1, "tolerance": 0.01, "update_frequency": 400},
"deviceTags": {"user motors"},
"enabled": enabled,
"readOnly": False,
"name": self.name,
}
self._info = {
"signals": {
"readback": {
"kind_str": "hinted",
"component_name": "readback",
"obj_name": self.name,
}, # hinted
"setpoint": {
"kind_str": "normal",
"component_name": "setpoint",
"obj_name": f"{self.name}_setpoint",
}, # normal
"velocity": {
"kind_str": "config",
"component_name": "velocity",
"obj_name": f"{self.name}_velocity",
}, # config
}
}
self.signals = {
self.name: {"value": self.read_value},
f"{self.name}_setpoint": {"value": self.setpoint_value},
f"{self.name}_motor_is_moving": {"value": self.motor_is_moving_value},
}
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
bec_dispatcher.stop_cli_server()
@property
def readout_priority(self):
return self._readout_priority
testable_qtimer_class.check_all_stopped(qtbot)
qapp = QApplication.instance()
qapp.processEvents()
if hasattr(qapp, "os_listener") and qapp.os_listener:
qapp.removeEventFilter(qapp.os_listener)
try:
qtbot.waitUntil(lambda: qapp.topLevelWidgets() == [])
except QtBotTimeoutError as exc:
raise TimeoutError(f"Failed to close all widgets: {qapp.topLevelWidgets()}") from exc
@readout_priority.setter
def readout_priority(self, value):
self._readout_priority = value
@property
def enabled(self) -> bool:
return self._enabled
@enabled.setter
def enabled(self, value: bool):
self._enabled = value
@property
def limits(self) -> tuple[float, float]:
return self._limits
@limits.setter
def limits(self, value: tuple[float, float]):
self._limits = value
def __contains__(self, item):
return item == self.name
@property
def _hints(self):
return [self.name]
def set_value(self, fake_value: float = 1.0) -> None:
"""
Setup fake value for device readout
Args:
fake_value(float): Desired fake value
"""
self.read_value = fake_value
def describe(self) -> dict:
"""
Get the description of the device
Returns:
dict: Description of the device
"""
return self.description
@property
def precision(self):
return 3
def set_read_value(self, value):
self.read_value = value
def read(self):
return self.signals
def set_limits(self, limits):
self.limits = limits
def move(self, value, relative=False):
"""Simulates moving the device to a new position."""
if relative:
self.read_value += value
else:
self.read_value = value
# Respect the limits
self.read_value = max(min(self.read_value, self.limits[1]), self.limits[0])
@property
def readback(self):
return MagicMock(get=MagicMock(return_value=self.read_value))
def rpc_register_fixture():
try:
yield RPCRegister()
finally:
RPCRegister.reset_singleton()
class Positioner(FakePositioner):
"""just placeholder for testing embedded isinstance check in DeviceCombobox"""
def __init__(self, name="test", limits=None, read_value=1.0, enabled=True):
super().__init__(name, limits=limits, read_value=read_value, enabled=enabled)
def bec_dispatcher_fixture(threads_check):
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
try:
yield bec_dispatcher
finally:
bec_dispatcher.disconnect_all()
bec_dispatcher.client.shutdown()
bec_dispatcher.stop_cli_server()
bec_dispatcher_module.BECDispatcher.reset_singleton()
class Device(FakeDevice):
"""just placeholder for testing embedded isinstance check in DeviceCombobox"""
def __init__(self, name, enabled=True):
super().__init__(name, enabled)
def clean_singleton_fixture():
error_popups._popup_utility_instance = None
yield
class DMMock:
def __init__(self):
self.devices = DeviceContainer()
self.enabled_devices = [device for device in self.devices if device.enabled]
def add_devices(self, devices: list):
"""
Add devices to the DeviceContainer.
Args:
devices (list): List of device instances to add.
"""
for device in devices:
self.devices[device.name] = device
def get_bec_signals(self, signal_class_name: str):
"""
Emulate DeviceManager.get_bec_signals for unit-tests.
For “AsyncSignal” we list every device whose readout_priority is
ReadoutPriority.ASYNC and build a minimal tuple
(device_name, signal_name, signal_info_dict) that matches the real
API shape used by Waveform._check_async_signal_found.
"""
signals: list[tuple[str, str, dict]] = []
if signal_class_name != "AsyncSignal":
return signals
for device in self.devices.values():
if getattr(device, "readout_priority", None) == ReadoutPriority.ASYNC:
device_name = device.name
signal_name = device.name # primary signal in our mocks
signal_info = {
"component_name": signal_name,
"obj_name": signal_name,
"kind_str": "hinted",
"signal_class": signal_class_name,
"metadata": {
"connected": True,
"precision": None,
"read_access": True,
"timestamp": 0.0,
"write_access": True,
},
}
signals.append((device_name, signal_name, signal_info))
return signals
DEVICES = [
FakePositioner("samx", limits=[-10, 10], read_value=2.0),
FakePositioner("samy", limits=[-5, 5], read_value=3.0),
FakePositioner("samz", limits=[-8, 8], read_value=4.0),
FakePositioner("aptrx", limits=None, read_value=4.0),
FakePositioner("aptry", limits=None, read_value=5.0),
FakeDevice("gauss_bpm"),
FakeDevice("gauss_adc1"),
FakeDevice("gauss_adc2"),
FakeDevice("gauss_adc3"),
FakeDevice("bpm4i"),
FakeDevice("bpm3a"),
FakeDevice("bpm3i"),
FakeDevice("eiger", readout_priority=ReadoutPriority.ASYNC),
FakeDevice("waveform1d"),
FakeDevice("async_device", readout_priority=ReadoutPriority.ASYNC),
Positioner("test", limits=[-10, 10], read_value=2.0),
Device("test_device"),
]
def check_remote_data_size(widget, plot_name, num_elements):
"""
Check if the remote data has the correct number of elements.
Used in the qtbot.waitUntil function.
"""
return len(widget.get_all_data()[plot_name]["x"]) == num_elements

View File

@@ -1,25 +1,44 @@
from bec_lib.codecs import BECCodec
from bec_lib.serialization import msgpack
from qtpy.QtCore import QPointF
class QPointFEncoder(BECCodec):
obj_type = QPointF
@staticmethod
def encode(obj: QPointF) -> list[float]:
"""Encode a QPointF object to a list of floats."""
return [obj.x(), obj.y()]
@staticmethod
def decode(type_name: str, data: list[float]) -> list[float]:
"""No-op function since QPointF is encoded as a list of floats."""
return data
def register_serializer_extension():
"""
Register the serializer extension for the BECConnector.
"""
if not msgpack.is_registered(QPointF):
msgpack.register(QPointF, QPointFEncoder.encode, QPointFEncoder.decode)
if not module_is_registered("bec_widgets.utils.serialization"):
msgpack.register_object_hook(encode_qpointf, decode_qpointf)
def module_is_registered(module_name: str) -> bool:
"""
Check if the module is registered in the encoder.
Args:
module_name (str): The name of the module to check.
Returns:
bool: True if the module is registered, False otherwise.
"""
# pylint: disable=protected-access
for enc in msgpack._encoder:
if enc[0].__module__ == module_name:
return True
return False
def encode_qpointf(obj):
"""
Encode a QPointF object to a list of floats. As this is mostly used for sending
data to the client, it is not necessary to convert it back to a QPointF object.
"""
if isinstance(obj, QPointF):
return [obj.x(), obj.y()]
return obj
def decode_qpointf(obj):
"""
no-op function since QPointF is encoded as a list of floats.
"""
return obj

View File

@@ -222,10 +222,9 @@ class BECDockArea(BECWidget, QWidget):
filled=True,
parent=self,
),
# FIXME temporarily disabled -> issue #644
"log_panel": MaterialIconAction(
icon_name=LogPanel.ICON_NAME,
tooltip="Add LogPanel - Disabled",
tooltip="Add LogPanel",
filled=True,
parent=self,
),
@@ -326,9 +325,9 @@ class BECDockArea(BECWidget, QWidget):
menu_utils.actions["progress_bar"].action.triggered.connect(
lambda: self._create_widget_from_toolbar(widget_name="RingProgressBar")
)
# FIXME temporarily disabled -> issue #644
menu_utils.actions["log_panel"].action.setEnabled(False)
menu_utils.actions["log_panel"].action.triggered.connect(
lambda: self._create_widget_from_toolbar(widget_name="LogPanel")
)
menu_utils.actions["sbb_monitor"].action.triggered.connect(
lambda: self._create_widget_from_toolbar(widget_name="SBBMonitor")
)

View File

@@ -88,7 +88,7 @@ class PositionerBoxBase(BECWidget, CompactPopupWidget):
if not self._check_device_is_valid(device):
return
data = self.dev[device].read(cached=True)
data = self.dev[device].read()
self._on_device_readback(
device,
self._device_ui_components(device),

View File

@@ -34,15 +34,7 @@ class PositionerBox2D(PositionerBoxBase):
PLUGIN = True
RPC = True
USER_ACCESS = [
"set_positioner_hor",
"set_positioner_ver",
"screenshot",
"enable_controls_hor",
"enable_controls_hor.setter",
"enable_controls_ver",
"enable_controls_ver.setter",
]
USER_ACCESS = ["set_positioner_hor", "set_positioner_ver", "screenshot"]
device_changed_hor = Signal(str, str)
device_changed_ver = Signal(str, str)
@@ -71,8 +63,6 @@ class PositionerBox2D(PositionerBoxBase):
self._limits_hor = None
self._limits_ver = None
self._dialog = None
self._enable_controls_hor = True
self._enable_controls_ver = True
if self.current_path == "":
self.current_path = os.path.dirname(__file__)
self.init_ui()
@@ -291,7 +281,6 @@ class PositionerBox2D(PositionerBoxBase):
self.on_device_readback_hor,
self._device_ui_components_hv("horizontal"),
)
self._apply_controls_enabled("horizontal")
@SafeSlot(str, str)
def on_device_change_ver(self, old_device: str, new_device: str):
@@ -311,7 +300,6 @@ class PositionerBox2D(PositionerBoxBase):
self.on_device_readback_ver,
self._device_ui_components_hv("vertical"),
)
self._apply_controls_enabled("vertical")
def _device_ui_components_hv(self, device: DeviceId) -> DeviceUpdateUIComponents:
if device == "horizontal":
@@ -349,25 +337,6 @@ class PositionerBox2D(PositionerBoxBase):
if device == self.device_ver:
return self._device_ui_components_hv("vertical")
def _apply_controls_enabled(self, axis: DeviceId):
state = self._enable_controls_hor if axis == "horizontal" else self._enable_controls_ver
if axis == "horizontal":
widgets = [
self.ui.tweak_increase_hor,
self.ui.tweak_decrease_hor,
self.ui.step_increase_hor,
self.ui.step_decrease_hor,
]
else:
widgets = [
self.ui.tweak_increase_ver,
self.ui.tweak_decrease_ver,
self.ui.step_increase_ver,
self.ui.step_decrease_ver,
]
for w in widgets:
w.setEnabled(state)
@SafeSlot(dict, dict)
def on_device_readback_hor(self, msg_content: dict, metadata: dict):
"""Callback for device readback.
@@ -448,26 +417,6 @@ class PositionerBox2D(PositionerBoxBase):
"""Step size for tweak"""
self.ui.step_size_ver.setValue(val)
@SafeProperty(bool)
def enable_controls_hor(self) -> bool:
"""Persisted switch for horizontal control buttons (tweak/step)."""
return self._enable_controls_hor
@enable_controls_hor.setter
def enable_controls_hor(self, value: bool):
self._enable_controls_hor = value
self._apply_controls_enabled("horizontal")
@SafeProperty(bool)
def enable_controls_ver(self) -> bool:
"""Persisted switch for vertical control buttons (tweak/step)."""
return self._enable_controls_ver
@enable_controls_ver.setter
def enable_controls_ver(self, value: bool):
self._enable_controls_ver = value
self._apply_controls_enabled("vertical")
@SafeSlot()
def on_tweak_inc_hor(self):
"""Tweak device a up"""

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import math
from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING
from bec_lib import bec_logger
from bec_qthemes import material_icon
@@ -73,16 +73,11 @@ class ROIPropertyTree(BECWidget, QWidget):
- Children: type, line-width (spin box), coordinates (auto-updating).
Args:
parent (QWidget, optional): Parent widget. Defaults to None.
image_widget (Image): The main Image widget that displays the ImageItem.
Provides ``plot_item`` and owns an ROIController already.
controller (ROIController, optional): Optionally pass an external controller.
If None, the manager uses ``image_widget.roi_controller``.
compact (bool, optional): If True, use a compact mode with no tree view,
only a toolbar with draw actions. Defaults to False.
compact_orientation (str, optional): Orientation of the toolbar in compact mode.
Either "vertical" or "horizontal". Defaults to "vertical".
compact_color (str, optional): Color of the single active ROI in compact mode.
parent (QWidget, optional): Parent widget. Defaults to None.
"""
PLUGIN = False
@@ -97,18 +92,11 @@ class ROIPropertyTree(BECWidget, QWidget):
parent: QWidget = None,
image_widget: Image,
controller: ROIController | None = None,
compact: bool = False,
compact_orientation: Literal["vertical", "horizontal"] = "vertical",
compact_color: str = "#f0f0f0",
):
super().__init__(
parent=parent, config=ConnectionConfig(widget_class=self.__class__.__name__)
)
self.compact = compact
self.compact_orient = compact_orientation
self.compact_color = compact_color
self.single_active_roi: BaseROI | None = None
if controller is None:
# Use the controller already belonging to the Image widget
@@ -124,29 +112,22 @@ class ROIPropertyTree(BECWidget, QWidget):
self.layout = QVBoxLayout(self)
self._init_toolbar()
if not self.compact:
self._init_tree()
else:
self.tree = None
self._init_tree()
# connect controller
self.controller.roiAdded.connect(self._on_roi_added)
self.controller.roiRemoved.connect(self._on_roi_removed)
if not self.compact:
self.controller.cleared.connect(self.tree.clear)
self.controller.cleared.connect(self.tree.clear)
# initial load
for r in self.controller.rois:
self._on_roi_added(r)
if not self.compact:
self.tree.collapseAll()
self.tree.collapseAll()
# --------------------------------------------------------------------- UI
def _init_toolbar(self):
tb = self.toolbar = ModularToolBar(
self, orientation=self.compact_orient if self.compact else "horizontal"
)
tb = self.toolbar = ModularToolBar(self, orientation="horizontal")
self._draw_actions: dict[str, MaterialIconAction] = {}
# --- ROI draw actions (toggleable) ---
@@ -176,29 +157,6 @@ class ROIPropertyTree(BECWidget, QWidget):
for mode, act in self._draw_actions.items():
act.action.toggled.connect(lambda on, m=mode: self._on_draw_action_toggled(m, on))
if self.compact:
tb.components.add_safe(
"compact_delete",
MaterialIconAction("delete", "Delete Current Roi", checkable=False, parent=self),
)
bundle.add_action("compact_delete")
tb.components.get_action("compact_delete").action.triggered.connect(
lambda _: (
self.controller.remove_roi(self.single_active_roi)
if self.single_active_roi is not None
else None
)
)
tb.show_bundles(["roi_draw"])
self.layout.addWidget(tb)
# ROI drawing state (needed even in compact mode)
self._roi_draw_mode = None
self._roi_start_pos = None
self._temp_roi = None
self.plot.scene().installEventFilter(self)
return
# Expand/Collapse toggle
self.expand_toggle = MaterialIconAction(
"unfold_more", "Expand/Collapse", checkable=True, parent=self # icon when collapsed
@@ -369,21 +327,13 @@ class ROIPropertyTree(BECWidget, QWidget):
self._set_roi_draw_mode(None)
# register via controller
self.controller.add_roi(final_roi)
if self.compact:
final_roi.line_color = self.compact_color
return True
return super().eventFilter(obj, event)
# --------------------------------------------------------- controller slots
def _on_roi_added(self, roi: BaseROI):
if self.compact:
roi.line_color = self.compact_color
if self.single_active_roi is not None and self.single_active_roi is not roi:
self.controller.remove_roi(self.single_active_roi)
self.single_active_roi = roi
return
# check the global setting from the toolbar
if hasattr(self, "lock_all_action") and self.lock_all_action.action.isChecked():
if self.lock_all_action.action.isChecked():
roi.movable = False
# parent row with blank action column, name in ROI column
parent = QTreeWidgetItem(self.tree, ["", "", ""])
@@ -474,10 +424,6 @@ class ROIPropertyTree(BECWidget, QWidget):
roi.movable = not roi.movable
def _on_roi_removed(self, roi: BaseROI):
if self.compact:
if self.single_active_roi is roi:
self.single_active_roi = None
return
item = self.roi_items.pop(roi, None)
if item:
idx = self.tree.indexOfTopLevelItem(item)
@@ -503,9 +449,8 @@ class ROIPropertyTree(BECWidget, QWidget):
self.controller.remove_roi(roi)
def cleanup(self):
if hasattr(self, "cmap"):
self.cmap.close()
self.cmap.deleteLater()
self.cmap.close()
self.cmap.deleteLater()
if self.controller and hasattr(self.controller, "rois"):
for roi in self.controller.rois: # disconnect all signals from ROIs
try:
@@ -546,8 +491,8 @@ if __name__ == "__main__": # pragma: no cover
# Add the image widget on the left
ml.addWidget(image_widget)
# ROI manager linked to that image with compact mode
mgr = ROIPropertyTree(parent=image_widget, image_widget=image_widget, compact=True)
# ROI manager linked to that image
mgr = ROIPropertyTree(parent=image_widget, image_widget=image_widget)
mgr.setFixedWidth(350)
ml.addWidget(mgr)

View File

@@ -765,7 +765,7 @@ class MotorMap(PlotBase):
float: Motor initial position.
"""
entry = self.entry_validator.validate_signal(name, None)
init_position = round(float(self.dev[name].read(cached=True)[entry]["value"]), precision)
init_position = round(float(self.dev[name].read()[entry]["value"]), precision)
return init_position
def _sync_motor_map_selection_toolbar(self):

View File

@@ -174,8 +174,6 @@ class BaseROI(BECConnector):
self.remove_scale_handles() # remove any existing handles from pyqtgraph.RectROI
if movable:
self.add_scale_handle() # add custom scale handles
if hasattr(self, "sigRemoveRequested"):
self.sigRemoveRequested.connect(self.remove)
def set_parent(self, parent: Image):
"""
@@ -558,8 +556,8 @@ class RectangularROI(BaseROI, pg.RectROI):
def get_coordinates(self, typed: bool | None = None) -> dict | tuple:
"""
Returns the coordinates of a rectangle's corners, rectangle center and dimensions.
Supports returning them as either a dictionary with descriptive keys or a tuple of coordinates.
Returns the coordinates of a rectangle's corners. Supports returning them
as either a dictionary with descriptive keys or a tuple of coordinates.
Args:
typed (bool | None): If True, returns coordinates as a dictionary with
@@ -567,17 +565,13 @@ class RectangularROI(BaseROI, pg.RectROI):
the value of `self.description`.
Returns:
dict | tuple: The rectangle's corner coordinates, rectangle center and dimensions, where the format
dict | tuple: The rectangle's corner coordinates, where the format
depends on the `typed` parameter.
"""
if typed is None:
typed = self.description
x_left, y_bottom, x_right, y_top = self._normalized_edges()
width = x_right - x_left
height = y_top - y_bottom
cx = x_left + width / 2
cy = y_bottom + height / 2
if typed:
return {
@@ -585,19 +579,8 @@ class RectangularROI(BaseROI, pg.RectROI):
"bottom_right": (x_right, y_bottom),
"top_left": (x_left, y_top),
"top_right": (x_right, y_top),
"center_x": cx,
"center_y": cy,
"width": width,
"height": height,
}
return (
(x_left, y_bottom),
(x_right, y_bottom),
(x_left, y_top),
(x_right, y_top),
(cx, cy),
(width, height),
)
return (x_left, y_bottom), (x_right, y_bottom), (x_left, y_top), (x_right, y_top)
def _lookup_scene_image(self):
"""

View File

@@ -42,15 +42,10 @@ class CurveConfig(ConnectionConfig):
pen_style: Literal["solid", "dash", "dot", "dashdot"] | None = Field(
"solid", description="The style of the pen of the curve."
)
source: Literal["device", "dap", "custom", "history"] = Field(
source: Literal["device", "dap", "custom"] = Field(
"custom", description="The source of the curve."
)
signal: DeviceSignal | None = Field(None, description="The signal of the curve.")
scan_id: str | None = Field(None, description="Scan ID to be used when `source` is 'history'.")
scan_number: int | None = Field(
None, description="Scan index to be used when `source` is 'history'."
)
current_x_mode: str | None = Field(None, description="The current x mode of the history curve.")
parent_label: str | None = Field(
None, description="The label of the parent plot, only relevant for dap curves."
)
@@ -204,7 +199,7 @@ class Curve(BECConnector, pg.PlotDataItem):
Raises:
ValueError: If the source is not custom.
"""
if self.config.source in ["custom", "history"]:
if self.config.source == "custom":
self.setData(x, y)
else:
raise ValueError(f"Source {self.config.source} do not allow custom data setting.")

View File

@@ -5,34 +5,7 @@ from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from bec_qthemes._icon.material_icons import material_icon
from qtpy.QtGui import QValidator
class ScanIndexValidator(QValidator):
"""Validator to allow only 'live' or integer scan numbers from an allowed set."""
def __init__(self, allowed_scans: set[int] | None = None, parent=None):
super().__init__(parent)
self.allowed_scans = allowed_scans or set()
def validate(self, input_str: str, pos: int):
# Accept empty or 'live'
if input_str == "" or input_str == "live":
return QValidator.State.Acceptable, input_str, pos
# Allow partial editing of "live"
if "live".startswith(input_str):
return QValidator.State.Intermediate, input_str, pos
# Accept integer only if present in the allowed set
if input_str.isdigit():
try:
num = int(input_str)
except ValueError:
return QValidator.State.Invalid, input_str, pos
if num in self.allowed_scans:
return QValidator.State.Acceptable, input_str, pos
return QValidator.State.Invalid, input_str, pos
from qtpy.QtCore import Qt
from qtpy.QtWidgets import (
QComboBox,
QHBoxLayout,
@@ -118,60 +91,8 @@ class CurveRow(QTreeWidgetItem):
# Create columns 1..2, depending on source
self._init_source_ui()
# Create columns 3..6 (color, style, width, symbol)
self._init_scan_index_ui()
self._init_style_controls()
def _init_scan_index_ui(self):
"""Create the Scan # editable combobox in column 3."""
if self.source not in ("device", "history"):
return
self.scan_index_combo = QComboBox()
self.scan_index_combo.setEditable(True)
# Populate 'live' and all available history scan indices
self.scan_index_combo.addItem("live", None)
scan_number_list = []
scan_id_list = []
try:
history = getattr(self.curve_tree.client, "history", None)
if history is not None:
scan_number_list = getattr(history, "_scan_numbers", []) or []
scan_id_list = getattr(history, "_scan_ids", []) or []
except Exception as e:
logger.error(f"Cannot fetch scan numbers from BEC client: {e}")
# If scan numbers cannot be fetched, only provide 'live' option
scan_number_list = []
scan_id_list = []
# Restrict input to 'live' or valid scan numbers
allowed = set()
try:
allowed = set(int(n) for n in scan_number_list if isinstance(n, (int, str)))
except Exception:
allowed = set()
validator = ScanIndexValidator(allowed, self.scan_index_combo)
self.scan_index_combo.lineEdit().setValidator(validator)
# Add items: show scan numbers, store scan IDs as item data
if scan_number_list and scan_id_list and len(scan_number_list) == len(scan_id_list):
for num, sid in zip(scan_number_list, scan_id_list):
self.scan_index_combo.addItem(str(num), sid)
else:
logger.error("Scan number and ID lists are mismatched or empty.")
# Select current based on existing config
selected = False
if getattr(self.config, "scan_id", None): # scan_id matching only
for i in range(self.scan_index_combo.count()):
if self.scan_index_combo.itemData(i) == self.config.scan_id:
self.scan_index_combo.setCurrentIndex(i)
selected = True
break
if not selected:
self.scan_index_combo.setCurrentText("live")
self.tree.setItemWidget(self, 3, self.scan_index_combo)
def _init_actions(self):
"""Create the actions widget in column 0, including a delete button and maybe 'Add DAP'."""
self.actions_widget = QWidget()
@@ -193,7 +114,7 @@ class CurveRow(QTreeWidgetItem):
actions_layout.addWidget(self.delete_button)
# If device row, add "Add DAP" button
if self.source in ("device", "history"):
if self.source == "device":
self.add_dap_button = QPushButton("DAP")
self.add_dap_button.clicked.connect(lambda: self.add_dap_row())
actions_layout.addWidget(self.add_dap_button)
@@ -202,7 +123,7 @@ class CurveRow(QTreeWidgetItem):
def _init_source_ui(self):
"""Create columns 1 and 2. For device rows, we have device/entry edits; for dap rows, label/model combo."""
if self.source in ("device", "history"):
if self.source == "device":
# Device row: columns 1..2 are device line edits
self.device_edit = DeviceComboBox(parent=self.tree)
self.device_edit.insertItem(0, "")
@@ -231,6 +152,7 @@ class CurveRow(QTreeWidgetItem):
self.tree.setItemWidget(self, 1, self.device_edit)
self.tree.setItemWidget(self, 2, self.entry_edit)
else:
# DAP row: column1= "Model" label, column2= DapComboBox
self.label_widget = QLabel("Model")
@@ -249,31 +171,31 @@ class CurveRow(QTreeWidgetItem):
self.tree.setItemWidget(self, 2, self.dap_combo)
def _init_style_controls(self):
"""Create columns 4..7: color button, style combo, width spin, symbol spin."""
# Color in col 4
"""Create columns 3..6: color button, style combo, width spin, symbol spin."""
# Color in col 3
self.color_button = ColorButtonNative(color=self.config.color)
self.color_button.color_changed.connect(self._on_color_changed)
self.tree.setItemWidget(self, 4, self.color_button)
self.tree.setItemWidget(self, 3, self.color_button)
# Style in col 5
# Style in col 4
self.style_combo = QComboBox()
self.style_combo.addItems(["solid", "dash", "dot", "dashdot"])
idx = self.style_combo.findText(self.config.pen_style)
if idx >= 0:
self.style_combo.setCurrentIndex(idx)
self.tree.setItemWidget(self, 5, self.style_combo)
self.tree.setItemWidget(self, 4, self.style_combo)
# Pen width in col 6
# Pen width in col 5
self.width_spin = QSpinBox()
self.width_spin.setRange(1, 20)
self.width_spin.setValue(self.config.pen_width)
self.tree.setItemWidget(self, 6, self.width_spin)
self.tree.setItemWidget(self, 5, self.width_spin)
# Symbol size in col 7
# Symbol size in col 6
self.symbol_spin = QSpinBox()
self.symbol_spin.setRange(1, 20)
self.symbol_spin.setValue(self.config.symbol_size)
self.tree.setItemWidget(self, 7, self.symbol_spin)
self.tree.setItemWidget(self, 6, self.symbol_spin)
@SafeSlot(str, verify_sender=True)
def _on_color_changed(self, new_color: str):
@@ -287,8 +209,8 @@ class CurveRow(QTreeWidgetItem):
self.config.symbol_color = new_color
def add_dap_row(self):
"""Create a new DAP row as a child. Only valid if source is 'device' or 'history'."""
if self.source not in ("device", "history"):
"""Create a new DAP row as a child. Only valid if source='device'."""
if self.source != "device":
return
curve_tree = self.tree.parent()
parent_label = self.config.label
@@ -366,7 +288,7 @@ class CurveRow(QTreeWidgetItem):
Returns:
dict: The serialized config based on the GUI state.
"""
if self.source in ("device", "history"):
if self.source == "device":
# Gather device name/entry
device_name = ""
device_entry = ""
@@ -387,23 +309,8 @@ class CurveRow(QTreeWidgetItem):
)
self.config.signal = DeviceSignal(name=device_name, entry=device_entry)
scan_combo_text = self.scan_index_combo.currentText()
if scan_combo_text == "live" or scan_combo_text == "":
self.config.scan_number = None
self.config.scan_id = None
self.config.source = "device"
self.config.label = f"{device_name}-{device_entry}"
if scan_combo_text.isdigit():
try:
scan_num = int(scan_combo_text)
except ValueError:
scan_num = None
self.config.scan_number = scan_num
self.config.scan_id = self.scan_index_combo.currentData()
self.config.source = "history"
# Label history curves with scan number suffix
if scan_num is not None:
self.config.label = f"{device_name}-{device_entry}-scan-{scan_num}"
self.config.source = "device"
self.config.label = f"{device_name}-{device_entry}"
else:
# DAP logic
parent_conf_dict = {}
@@ -536,12 +443,10 @@ class CurveTree(BECWidget, QWidget):
self.toolbar.show_bundles(["curve_tree"])
def _init_tree(self):
"""Initialize the QTreeWidget with 8 columns and compact widths."""
"""Initialize the QTreeWidget with 7 columns and compact widths."""
self.tree = QTreeWidget()
self.tree.setColumnCount(8)
self.tree.setHeaderLabels(
["Actions", "Name", "Entry", "Scan #", "Color", "Style", "Width", "Symbol"]
)
self.tree.setColumnCount(7)
self.tree.setHeaderLabels(["Actions", "Name", "Entry", "Color", "Style", "Width", "Symbol"])
header = self.tree.header()
for idx in range(self.tree.columnCount()):
@@ -551,10 +456,10 @@ class CurveTree(BECWidget, QWidget):
header.setSectionResizeMode(idx, QHeaderView.Fixed)
header.setStretchLastSection(False)
self.tree.setColumnWidth(0, 90)
self.tree.setColumnWidth(4, 70)
self.tree.setColumnWidth(5, 80)
self.tree.setColumnWidth(3, 70)
self.tree.setColumnWidth(4, 80)
self.tree.setColumnWidth(5, 50)
self.tree.setColumnWidth(6, 50)
self.tree.setColumnWidth(7, 50)
self.layout.addWidget(self.tree)
@@ -678,9 +583,9 @@ class CurveTree(BECWidget, QWidget):
self.tree.clear()
self.all_items = []
top_curves = [c for c in self.waveform.curves if c.config.source in ("device", "history")]
device_curves = [c for c in self.waveform.curves if c.config.source == "device"]
dap_curves = [c for c in self.waveform.curves if c.config.source == "dap"]
for dev in top_curves:
for dev in device_curves:
dr = CurveRow(self.tree, parent_item=None, config=dev.config, device_manager=self.dev)
for dap in dap_curves:
if dap.config.parent_label == dev.config.label:

View File

@@ -8,7 +8,6 @@ import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.scan_data_container import ScanDataContainer
from pydantic import Field, ValidationError, field_validator
from qtpy.QtCore import Qt, QTimer, Signal
from qtpy.QtWidgets import (
@@ -36,9 +35,6 @@ from bec_widgets.widgets.plots.plot_base import PlotBase
from bec_widgets.widgets.plots.waveform.curve import Curve, CurveConfig, DeviceSignal
from bec_widgets.widgets.plots.waveform.settings.curve_settings.curve_setting import CurveSetting
from bec_widgets.widgets.plots.waveform.utils.roi_manager import WaveformROIManager
from bec_widgets.widgets.services.scan_history_browser.scan_history_browser import (
ScanHistoryBrowser,
)
logger = bec_logger.logger
@@ -167,7 +163,6 @@ class Waveform(PlotBase):
# Curve data
self._sync_curves = []
self._async_curves = []
self._history_curves = []
self._slice_index = None
self._dap_curves = []
self._mode = None
@@ -184,14 +179,12 @@ class Waveform(PlotBase):
"readout_priority": None,
"label_suffix": "",
}
self._current_x_device: tuple[str, str] | None = None
# Specific GUI elements
self._init_roi_manager()
self.dap_summary = None
self.dap_summary_dialog = None
self.scan_history_dialog = None
self._add_waveform_specific_popup()
self._add_fit_parameters_popup()
self._enable_roi_toolbar_action(False) # default state where are no dap curves
self._init_curve_dialog()
self.curve_settings_dialog = None
@@ -259,7 +252,7 @@ class Waveform(PlotBase):
super().add_side_menus()
self._add_dap_summary_side_menu()
def _add_waveform_specific_popup(self):
def _add_fit_parameters_popup(self):
"""
Add popups to the Waveform widget.
"""
@@ -269,24 +262,11 @@ class Waveform(PlotBase):
icon_name="monitoring", tooltip="Open Fit Parameters", checkable=True, parent=self
),
)
self.toolbar.components.add_safe(
"scan_history",
MaterialIconAction(
icon_name="manage_search",
tooltip="Open Scan History browser",
checkable=True,
parent=self,
),
)
self.toolbar.get_bundle("axis_popup").add_action("fit_params")
self.toolbar.get_bundle("axis_popup").add_action("scan_history")
self.toolbar.components.get_action("fit_params").action.triggered.connect(
self.show_dap_summary_popup
)
self.toolbar.components.get_action("scan_history").action.triggered.connect(
self.show_scan_history_popup
)
@SafeSlot()
def _reset_view(self):
@@ -434,47 +414,6 @@ class Waveform(PlotBase):
self.toolbar.components.get_action("roi_linear").action.setChecked(False)
self._roi_manager.toggle_roi(False)
################################################################################
# Scan History browser popup
# TODO this is so far quick implementation just as popup, we should make scan history also standalone widget later
def show_scan_history_popup(self):
"""
Show the scan history popup.
"""
scan_history_action = self.toolbar.components.get_action("scan_history").action
if self.scan_history_dialog is None or not self.scan_history_dialog.isVisible():
self.scan_history_widget = ScanHistoryBrowser(parent=self)
self.scan_history_dialog = QDialog(modal=False)
self.scan_history_dialog.setWindowTitle(f"{self.object_name} - Scan History Browser")
self.scan_history_dialog.layout = QVBoxLayout(self.scan_history_dialog)
self.scan_history_dialog.layout.addWidget(self.scan_history_widget)
self.scan_history_widget.scan_history_device_viewer.request_history_plot.connect(
lambda scan_id, device_name, signal_name: self.plot(
y_name=device_name, y_entry=signal_name, scan_id=scan_id
)
)
self.scan_history_dialog.finished.connect(self._scan_history_closed)
self.scan_history_dialog.show()
self.scan_history_dialog.resize(780, 320)
scan_history_action.setChecked(True)
else:
# If already open, bring it to the front
self.scan_history_dialog.raise_()
self.scan_history_dialog.activateWindow()
scan_history_action.setChecked(True) # keep it toggle
def _scan_history_closed(self):
"""
Slot for when the scan history dialog is closed.
"""
if self.scan_history_dialog is None:
return
self.scan_history_widget.close()
self.scan_history_widget.deleteLater()
self.scan_history_dialog.deleteLater()
self.scan_history_dialog = None
self.toolbar.components.get_action("scan_history").action.setChecked(False)
################################################################################
# Dap Summary
@@ -564,11 +503,7 @@ class Waveform(PlotBase):
self.x_axis_mode["name"] = value
if value not in ["timestamp", "index", "auto"]:
self.x_axis_mode["entry"] = self.entry_validator.validate_signal(value, None)
self._current_x_device = (value, self.x_axis_mode["entry"])
self._switch_x_axis_item(mode=value)
self._current_x_device = None
self._refresh_history_curves()
self._update_curve_visibility()
self.async_signal_update.emit()
self.sync_signal_update.emit()
self.plot_item.enableAutoRange(x=True)
@@ -596,8 +531,6 @@ class Waveform(PlotBase):
return
self.x_axis_mode["entry"] = self.entry_validator.validate_signal(self.x_mode, value)
self._switch_x_axis_item(mode="device")
self._refresh_history_curves()
self._update_curve_visibility()
self.async_signal_update.emit()
self.sync_signal_update.emit()
self.plot_item.enableAutoRange(x=True)
@@ -738,8 +671,6 @@ class Waveform(PlotBase):
color: str | None = None,
label: str | None = None,
dap: str | None = None,
scan_id: str | None = None,
scan_number: int | None = None,
**kwargs,
) -> Curve:
"""
@@ -762,10 +693,6 @@ class Waveform(PlotBase):
dap(str): The dap model to use for the curve, only available for sync devices.
If not specified, none will be added.
Use the same string as is the name of the LMFit model.
scan_id(str): Optional scan ID. When provided, the curve is treated as a **history** curve and
the ydata (and optional xdata) are fetched from that historical scan. Such curves are
never cleared by livescan resets.
scan_number(int): Optional scan index. When provided, the curve is treated as a **history** curve and
Returns:
Curve: The curve object.
@@ -835,8 +762,6 @@ class Waveform(PlotBase):
label=label,
color=color,
source=source,
scan_id=scan_id,
scan_number=scan_number,
**kwargs,
)
@@ -844,9 +769,6 @@ class Waveform(PlotBase):
if source == "device":
config.signal = DeviceSignal(name=y_name, entry=y_entry)
if scan_id is not None or scan_number is not None:
config.source = "history"
# CREATE THE CURVE
curve = self._add_curve(config=config, x_data=x_data, y_data=y_data)
@@ -885,7 +807,7 @@ class Waveform(PlotBase):
device_curve = self._find_curve_by_label(device_label)
if not device_curve:
raise ValueError(f"No existing curve found with label '{device_label}'.")
if device_curve.config.source not in ("device", "history"):
if device_curve.config.source != "device":
raise ValueError(
f"Curve '{device_label}' is not a device curve. Only device curves can have DAP."
)
@@ -894,7 +816,7 @@ class Waveform(PlotBase):
dev_entry = device_curve.config.signal.entry
# 2) Build a label for the new DAP curve
dap_label = f"{device_label}-{dap_name}"
dap_label = f"{dev_name}-{dev_entry}-{dap_name}"
# 3) Possibly raise if the DAP curve already exists
if self._check_curve_id(dap_label):
@@ -947,23 +869,7 @@ class Waveform(PlotBase):
ValueError: If a duplicate curve label/config is found, or if
custom data is missing for `source='custom'`.
"""
scan_item: ScanDataContainer | None = None
if config.source == "history":
scan_item = self.get_history_scan_item(
scan_id=config.scan_id, scan_index=config.scan_number
)
if scan_item is None:
raise ValueError(
f"Could not find scan item for history curve '{config.label}' with scan_id='{config.scan_id}' and scan_number='{config.scan_number}'."
)
config.scan_id = scan_item.metadata["bec"]["scan_id"]
config.scan_number = scan_item.metadata["bec"]["scan_number"]
label = config.label
if config.source == "history":
label = f"{config.signal.name}-{config.signal.entry}-scan-{config.scan_number}"
config.label = label
if not label:
# Fallback label
label = WidgetContainerUtils.generate_unique_name(
@@ -985,7 +891,7 @@ class Waveform(PlotBase):
raise ValueError("For 'custom' curves, x_data and y_data must be provided.")
# Actually create the Curve item
curve = self._add_curve_object(name=label, config=config, scan_item=scan_item)
curve = self._add_curve_object(name=label, config=config)
# If custom => set initial data
if config.source == "custom" and x_data is not None and y_data is not None:
@@ -1002,8 +908,6 @@ class Waveform(PlotBase):
self.setup_dap_for_scan()
self.roi_enable.emit(True) # Enable the ROI toolbar action
self.request_dap() # Request DAP update directly without blocking proxy
if config.source == "history":
self._history_curves.append(curve)
QTimer.singleShot(
150, self.auto_range
@@ -1011,175 +915,24 @@ class Waveform(PlotBase):
return curve
def _add_curve_object(
self, name: str, config: CurveConfig, scan_item: ScanDataContainer | None = None
) -> Curve | None:
def _add_curve_object(self, name: str, config: CurveConfig) -> Curve:
"""
Low-level creation of the PlotDataItem (Curve) from a `CurveConfig`.
Args:
name (str): The name/label of the curve.
config (CurveConfig): Configuration model describing the curve.
scan_item (ScanDataContainer | None): Optional scan item for history curves.
Returns:
Curve: The newly created curve object, added to the plot.
"""
curve = Curve(config=config, name=name, parent_item=self)
self.plot_item.addItem(curve)
if scan_item is not None:
self._fetch_history_data_for_curve(curve, scan_item)
self._categorise_device_curves()
curve.visibleChanged.connect(self._refresh_crosshair_markers)
curve.visibleChanged.connect(self.auto_range)
return curve
def _fetch_history_data_for_curve(
self, curve: Curve, scan_item: ScanDataContainer
) -> Curve | None:
# Check if the data are already set
device = curve.config.signal.name
entry = curve.config.signal.entry
all_devices_used = getattr(
getattr(scan_item, "_msg", None), "stored_data_info", None
) or getattr(scan_item, "stored_data_info", None)
if all_devices_used is None:
curve.remove()
raise ValueError(
f"No stored data info found in scan item ID:{curve.config.scan_id} for curve '{curve.name()}'. "
f"Upgrade BEC to the latest version."
)
# 1. get y data
x_data, y_data = None, None
if device not in all_devices_used:
raise ValueError(f"Device '{device}' not found in scan item ID:{curve.config.scan_id}.")
if entry not in all_devices_used[device]:
raise ValueError(
f"Entry '{entry}' not found in device '{device}' in scan item ID:{curve.config.scan_id}."
)
y_shape = all_devices_used.get(device).get(entry).shape[0]
# Determine X-axis data
if self.x_axis_mode["name"] == "index":
x_data = np.arange(y_shape)
curve.config.current_x_mode = "index"
self._update_x_label_suffix(" (index)")
elif self.x_axis_mode["name"] == "timestamp":
y_device = scan_item.devices.get(device)
x_data = y_device.get(entry).read().get("timestamp")
curve.config.current_x_mode = "timestamp"
self._update_x_label_suffix(" (timestamp)")
elif self.x_axis_mode["name"] not in ("index", "timestamp", "auto"): # Custom device mode
if self.x_axis_mode["name"] not in all_devices_used:
logger.warning(
f"Custom device '{self.x_axis_mode['name']}' not found in scan item of history curve '{curve.name()}'; scan ID: {curve.config.scan_id}."
)
curve.setVisible(False)
return
x_entry_custom = self.x_axis_mode.get("entry")
if x_entry_custom is None:
x_entry_custom = self.entry_validator.validate_signal(
self.x_axis_mode["name"], None
)
if x_entry_custom not in all_devices_used[self.x_axis_mode["name"]]:
logger.warning(
f"Custom entry '{x_entry_custom}' for device '{self.x_axis_mode['name']}' not found in scan item of history curve '{curve.name()}'; scan ID: {curve.config.scan_id}."
)
curve.setVisible(False)
return
x_shape = (
scan_item._msg.stored_data_info.get(self.x_axis_mode["name"])
.get(x_entry_custom)
.shape[0]
)
if x_shape != y_shape:
logger.warning(
f"Shape mismatch for x data '{x_shape}' and y data '{y_shape}' in history curve '{curve.name()}'; scan ID: {curve.config.scan_id}."
)
curve.setVisible(False)
return
x_device = scan_item.devices.get(self.x_axis_mode["name"])
x_data = x_device.get(x_entry_custom).read().get("value")
curve.config.current_x_mode = self.x_axis_mode["name"]
self._update_x_label_suffix(f" (custom: {self.x_axis_mode['name']}-{x_entry_custom})")
elif self.x_axis_mode["name"] == "auto":
if (
self._current_x_device is None
): # Scenario where no x device is set yet, because there was no live scan done in this widget yet
# If no current x device, use the first motor from scan item
scan_motors = self._ensure_str_list(
scan_item.metadata.get("bec").get("scan_report_devices")
)
if not scan_motors: # scan was done without reported motor from whatever reason
x_data = np.arange(y_shape) # Fallback to index
y_data = scan_item.devices.get(device).get(entry).read().get("value")
curve.set_data(x=x_data, y=y_data)
self._update_x_label_suffix(" (auto: index)")
return curve
x_entry = self.entry_validator.validate_signal(scan_motors[0], None)
if x_entry not in all_devices_used.get(scan_motors[0], {}):
logger.warning(
f"Auto x entry '{x_entry}' for device '{scan_motors[0]}' not found in scan item of history curve '{curve.name()}'; scan ID: {curve.config.scan_id}."
)
curve.setVisible(False)
return
if y_shape != all_devices_used.get(scan_motors[0]).get(x_entry, {}).shape[0]:
logger.warning(
f"Shape mismatch for x data '{all_devices_used.get(scan_motors[0]).get(x_entry, {}).get('shape', [0])[0]}' and y data '{y_shape}' in history curve '{curve.name()}'; scan ID: {curve.config.scan_id}."
)
curve.setVisible(False)
return
x_data = scan_item.devices.get(scan_motors[0]).get(x_entry).read().get("value")
self._current_x_device = (scan_motors[0], x_entry)
self._update_x_label_suffix(f" (auto: {scan_motors[0]}-{x_entry})")
curve.config.current_x_mode = "auto"
self._update_x_label_suffix(f" (auto: {scan_motors[0]}-{x_entry})")
else: # Scan in auto mode was done and live scan already set the current x device
if self._current_x_device[0] not in all_devices_used:
logger.warning(
f"Auto x data for device '{self._current_x_device[0]}' "
f"and entry '{self._current_x_device[1]}'"
f" not found in scan item of the history curve {curve.name()}."
)
curve.setVisible(False)
return
x_device = scan_item.devices.get(self._current_x_device[0])
x_data = x_device.get(self._current_x_device[1]).read().get("value")
curve.config.current_x_mode = "auto"
self._update_x_label_suffix(
f" (auto: {self._current_x_device[0]}-{self._current_x_device[1]})"
)
if x_data is None:
logger.warning(
f"X data for curve '{curve.name()}' could not be determined. "
f"Check if the x_mode '{self.x_axis_mode['name']}' is valid for the scan item."
)
curve.setVisible(False)
return
if y_data is None:
y_data = scan_item.devices.get(device).get(entry).read().get("value")
if y_data is None:
logger.warning(
f"Y data for curve '{curve.name()}' could not be determined. "
f"Check if the device '{device}' and entry '{entry}' are valid for the scan item."
)
curve.setVisible(False)
return
curve.set_data(x=x_data, y=y_data)
return curve
def _refresh_history_curves(self):
for curve in self._history_curves:
scan_item = self.get_history_scan_item(
scan_id=curve.config.scan_id, scan_index=curve.config.scan_number
)
if scan_item is not None:
self._fetch_history_data_for_curve(curve, scan_item)
else:
logger.warning(f"Scan item for curve {curve.name()} not found.")
def _refresh_crosshair_markers(self):
"""
Refresh the crosshair markers when a curve visibility changes.
@@ -1214,42 +967,7 @@ class Waveform(PlotBase):
Clear all data from the plot widget, but keep the curve references.
"""
for c in self.curves:
if c.config.source != "history":
c.clear_data()
# X-axis compatibility helpers
def _is_curve_compatible(self, curve: Curve) -> bool:
"""
Return True when *curve* can be shown with the current x-axis mode.
- index, timestamp are always compatible.
- For history curves we check whether the requested motor
(self.x_axis_mode["name"]) exists in the cached
history_data_buffer["x"] dictionary.
- DAP is done by checking if the parent curve is visible.
- Device curves are fetched by update sync/async curves, which solves the compatibility there.
"""
mode = self.x_axis_mode.get("name", "index")
if mode in ("index", "timestamp"): # always compatible - wild west mode
return True
if curve.config.source == "history":
scan_item = self.get_history_scan_item(
scan_id=curve.config.scan_id, scan_index=curve.config.scan_number
)
curve = self._fetch_history_data_for_curve(curve, scan_item)
if curve is None:
return False
if curve.config.source == "dap":
parent_curve = self._find_curve_by_label(curve.config.parent_label)
if parent_curve.isVisible():
return True
return False # DAP curve is not compatible if parent curve is not visible
return True
def _update_curve_visibility(self) -> None:
"""Show or hide curves according to `_is_curve_compatible`."""
for c in self.curves:
c.setVisible(self._is_curve_compatible(c))
c.clear_data()
def clear_all(self):
"""
@@ -1412,7 +1130,7 @@ class Waveform(PlotBase):
self.scan_id = current_scan_id
self.scan_item = self.queue.scan_storage.find_scan_by_ID(self.scan_id) # live scan
self._slice_index = None # Reset the slice index
self._update_curve_visibility()
self._mode = self._categorise_device_curves()
# First trigger to sync and async data
@@ -1490,7 +1208,7 @@ class Waveform(PlotBase):
device_data = entry_obj.read()["value"] if entry_obj else None
x_data = self._get_x_data(device_name, device_entry)
if x_data is not None:
if np.isscalar(x_data):
if len(x_data) == 1:
self.clear_data()
return
if device_data is not None and x_data is not None:
@@ -1898,7 +1616,6 @@ class Waveform(PlotBase):
entry_obj = data.get(x_name, {}).get(x_entry)
x_data = entry_obj.read()["value"] if entry_obj else [0]
new_suffix = f" (custom: {x_name}-{x_entry})"
self._current_x_device = (x_name, x_entry)
# 2 User wants timestamp
if self.x_axis_mode["name"] == "timestamp":
@@ -1913,13 +1630,11 @@ class Waveform(PlotBase):
timestamps = entry_obj.read()["timestamp"] if entry_obj else [0]
x_data = timestamps
new_suffix = " (timestamp)"
self._current_x_device = None
# 3 User wants index
if self.x_axis_mode["name"] == "index":
x_data = None
new_suffix = " (index)"
self._current_x_device = None
# 4 Best effort automatic mode
if self.x_axis_mode["name"] is None or self.x_axis_mode["name"] == "auto":
@@ -1927,7 +1642,6 @@ class Waveform(PlotBase):
if len(self._async_curves) > 0:
x_data = None
new_suffix = " (auto: index)"
self._current_x_device = None
# 4.2 If there are sync curves, use the first device from the scan report
else:
try:
@@ -1950,7 +1664,6 @@ class Waveform(PlotBase):
entry_obj = data.get(x_name, {}).get(x_entry)
x_data = entry_obj.read()["value"] if entry_obj else None
new_suffix = f" (auto: {x_name}-{x_entry})"
self._current_x_device = (x_name, x_entry)
self._update_x_label_suffix(new_suffix)
return x_data
@@ -2053,83 +1766,49 @@ class Waveform(PlotBase):
logger.info(f"Scan {self.scan_id} => mode={self._mode}")
return mode
def get_history_scan_item(
self, scan_index: int = None, scan_id: str = None
) -> ScanDataContainer | None:
"""
Get scan item from history based on scan_id or scan_index.
If both are provided, scan_id takes precedence and the resolved scan_number
will be read from the fetched item.
Args:
scan_id (str, optional): ScanID of the scan to fetch. Defaults to None.
scan_index (int, optional): Index (scan number) of the scan to fetch. Defaults to None.
Returns:
ScanDataContainer | None: The fetched scan item or None if no item was found.
"""
if scan_index is not None and scan_id is not None:
scan_index = None # Prefer scan_id when both are given
if scan_index is None and scan_id is None:
logger.warning("Neither scan_id or scan_number was provided, fetching the latest scan")
scan_index = -1
if scan_index is None:
return self.client.history.get_by_scan_id(scan_id)
if scan_index == -1:
scan_item = self.client.queue.scan_storage.current_scan
if scan_item is not None:
if scan_item.status_message is None:
logger.warning(f"Scan item with {scan_item.scan_id} has no status message.")
return None
return scan_item
if len(self.client.history) == 0:
logger.info("No scans executed so far. Cannot fetch scan history.")
return None
# check if scan_index is negative, then fetch it just from the list from the end
if int(scan_index) < 0:
return self.client.history[scan_index]
scan_item = self.client.history.get_by_scan_number(scan_index)
if scan_item is None:
logger.warning(f"Scan with scan_number {scan_index} not found in history.")
return None
if isinstance(scan_item, list):
if len(scan_item) > 1:
logger.warning(
f"Multiple scans found with scan_number {scan_index}. Returning the latest one."
)
scan_item = scan_item[-1]
return scan_item
@SafeSlot(int)
@SafeSlot(str)
@SafeSlot()
def update_with_scan_history(self, scan_index: int = None, scan_id: str = None):
"""
Update the scan curves with the data from the scan storage.
If both arguments are provided, scan_id takes precedence and scan_index is ignored.
Provide only one of scan_id or scan_index.
Args:
scan_id(str, optional): ScanID of the scan to be updated. Defaults to None.
scan_index(int, optional): Index (scan number) of the scan to be updated. Defaults to None.
scan_index(int, optional): Index of the scan to be updated. Defaults to None.
"""
self.scan_item = self.get_history_scan_item(scan_index=scan_index, scan_id=scan_id)
if scan_index is not None and scan_id is not None:
raise ValueError("Only one of scan_id or scan_index can be provided.")
if self.scan_item is None:
if scan_index is None and scan_id is None:
logger.warning(f"Neither scan_id or scan_number was provided, fetching the latest scan")
scan_index = -1
if scan_index is None:
self.scan_id = scan_id
self.scan_item = self.client.history.get_by_scan_id(scan_id)
self._emit_signal_update()
return
if scan_id is not None:
self.scan_id = scan_id
else:
# If scan_number was used, set the scan_id from the fetched item
if hasattr(self.scan_item, "metadata"):
self.scan_id = self.scan_item.metadata["bec"]["scan_id"]
else:
self.scan_id = self.scan_item.scan_id
if scan_index == -1:
scan_item = self.client.queue.scan_storage.current_scan
if scan_item is not None:
if scan_item.status_message is None:
logger.warning(f"Scan item with {scan_item.scan_id} has no status message.")
return
self.scan_item = scan_item
self.scan_id = scan_item.scan_id
self._emit_signal_update()
return
if len(self.client.history) == 0:
logger.info("No scans executed so far. Skipping scan history update.")
return
self.scan_item = self.client.history[scan_index]
metadata = self.scan_item.metadata
self.scan_id = metadata["bec"]["scan_id"]
self._emit_signal_update()
@@ -2360,9 +2039,6 @@ class Waveform(PlotBase):
if self.dap_summary_dialog is not None:
self.dap_summary_dialog.reject()
self.dap_summary_dialog = None
if self.scan_history_dialog is not None:
self.scan_history_dialog.reject()
self.scan_history_dialog = None
super().cleanup()

View File

@@ -38,8 +38,8 @@ class SignalDisplay(BECWidget, QWidget):
@SafeSlot()
def _refresh(self):
if (dev := self.dev.get(self.device)) is not None:
dev.read(cached=True)
dev.read_configuration(cached=True)
dev.read()
dev.read_configuration()
def _add_refresh_button(self):
button_holder = QWidget()

View File

@@ -273,9 +273,7 @@ class SignalLabel(BECWidget, QWidget):
if not isinstance(self._device_obj, Device | Signal):
self._value, self._units = "__", ""
return
reading = (self._device_obj.read(cached=True) or {}) | (
self._device_obj.read_configuration(cached=True) or {}
)
reading = (self._device_obj.read() or {}) | (self._device_obj.read_configuration() or {})
value = reading.get(self._signal_key, {}).get("value")
if value is None:
self._value, self._units = "__", ""

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "2.42.0"
version = "2.39.0"
description = "BEC Widgets"
requires-python = ">=3.11"
classifiers = [
@@ -13,8 +13,8 @@ classifiers = [
"Topic :: Scientific/Engineering",
]
dependencies = [
"bec_ipython_client~=3.70", # needed for jupyter console
"bec_lib~=3.70",
"bec_ipython_client~=3.52", # needed for jupyter console
"bec_lib~=3.52",
"bec_qthemes~=0.7, >=0.7",
"black~=25.0", # needed for bw-generate-cli
"isort~=5.13, >=5.13.2", # needed for bw-generate-cli

View File

@@ -1,7 +1,33 @@
import pytest
import qtpy.QtCore
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
from qtpy.QtCore import QTimer
class TestableQTimer(QTimer):
_instances: list[tuple[QTimer, str]] = []
_current_test_name: str = ""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
TestableQTimer._instances.append((self, TestableQTimer._current_test_name))
@classmethod
def check_all_stopped(cls, qtbot):
def _is_done_or_deleted(t: QTimer):
try:
return not t.isActive()
except RuntimeError as e:
return "already deleted" in e.args[0]
try:
qtbot.waitUntil(lambda: all(_is_done_or_deleted(timer) for timer, _ in cls._instances))
except QtBotTimeoutError as exc:
active_timers = list(filter(lambda t: t[0].isActive(), cls._instances))
(t.stop() for t, _ in cls._instances)
raise TimeoutError(f"Failed to stop all timers: {active_timers}") from exc
cls._instances = []
from bec_widgets.tests.utils import TestableQTimer
# To support 'from qtpy.QtCore import QTimer' syntax we just replace this completely for the test session
# see: https://docs.python.org/3/library/unittest.mock.html#where-to-patch

View File

@@ -6,7 +6,7 @@ from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import Image, MotorMap, MultiWaveform, ScatterWaveform, Waveform
from bec_widgets.cli.rpc.rpc_base import RPCReference
from bec_widgets.tests.fake_devices import check_remote_data_size
from bec_widgets.tests.utils import check_remote_data_size
def test_rpc_waveform1d_custom_curve(qtbot, connected_client_gui_obj):
@@ -286,85 +286,3 @@ def test_waveform_passing_device(qtbot, bec_client_lib, connected_client_gui_obj
# check plotted data
x_data, y_data = c1.get_data()
assert np.array_equal(y_data, last_scan_data.devices.samx.samx_setpoint.read().get("value"))
@pytest.mark.timeout(120)
@pytest.mark.parametrize(
"history_selector", ["scan_id", "scan_number"]
) # ensure unique curves per run
def test_rpc_waveform_history_curve(
qtbot, bec_client_lib, connected_client_gui_obj, history_selector
):
"""
E2E test for the new history curve feature:
- Run 3 scans
- For each scan, fetch history curve data using either scan_id OR scan_number (parametrized)
- Compare waveform data with BEC client scan data
Note: Parameterization prevents adding the same logical curve twice (which would collide on label).
"""
gui = connected_client_gui_obj
dock = gui.bec
client = bec_client_lib
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
wf = dock.new("wf_dock").new("Waveform")
# Collect references for validation
scan_meta = [] # list of dicts with scan_id, scan_number, data
# Run 3 scans and collect their metadata and data
for i in range(3):
status = scans.line_scan(dev.samx, -5 + i, 5 + i, steps=10, exp_time=0.01, relative=False)
status.wait()
# Wait until the history entry appears and corresponds to this scan
def _wait_for_scan_in_history():
if len(client.history) == 0:
return False
return client.history[-1].metadata.bec.get("scan_id", None) == status.scan.scan_id
qtbot.waitUntil(_wait_for_scan_in_history, timeout=10000)
hist_item = client.history[-1]
item = queue.scan_storage.storage[-1]
data = item.live_data if hasattr(item, "live_data") else item.data
scan_meta.append(
{
"scan_id": hist_item.metadata.bec.get("scan_id"),
"scan_number": hist_item.metadata.bec.get("scan_number"),
"data": data,
}
)
# For each scan, fetch history curve by the chosen selector and compare to client data
for meta in scan_meta:
sel_value = meta[history_selector]
scan_data = meta["data"]
# Add curve from history using the chosen selector; single curve per scan to avoid duplicates
kwargs = {history_selector: sel_value}
curve = wf.plot(x_name="samx", y_name="bpm4i", **kwargs)
num_elements = 10
# Wait until curve has the expected number of points
def _curve_ready():
try:
x, y = curve.get_data()
except Exception:
return False
return x is not None and len(x) == num_elements and len(y) == num_elements
qtbot.waitUntil(_curve_ready, timeout=10000)
# Get plotted data
x_vals, y_vals = curve.get_data()
# Compare against BEC client scan data
np.testing.assert_equal(x_vals, np.array(scan_data["samx"]["samx"].val))
np.testing.assert_equal(y_vals, np.array(scan_data["bpm4i"]["bpm4i"].val))
# Clean up
curve.remove()

View File

@@ -7,9 +7,8 @@ import pytest
from bec_lib.bec_service import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import RedisConnector
from bec_lib.scan_history import ScanHistory
from bec_widgets.tests.fake_devices import DEVICES, DMMock, FakePositioner, Positioner
from bec_widgets.tests.utils import DEVICES, DMMock, FakePositioner, Positioner
def fake_redis_server(host, port):
@@ -239,18 +238,3 @@ def create_dummy_scan_item():
"scan_report_devices": ["samx"],
}
return dummy_scan
def inject_scan_history(widget, scan_history_factory, *history_args):
"""
Helper to inject scan history messages into client history.
"""
history_msgs = []
for scan_id, scan_number in history_args:
history_msgs.append(scan_history_factory(scan_id=scan_id, scan_number=scan_number))
widget.client.history = ScanHistory(widget.client, False)
for msg in history_msgs:
widget.client.history._scan_data[msg.scan_id] = msg
widget.client.history._scan_ids.append(msg.scan_id)
widget.client.queue.scan_storage.current_scan = None
return history_msgs

View File

@@ -5,10 +5,12 @@ import h5py
import numpy as np
import pytest
from bec_lib import messages
from bec_lib.messages import _StoredDataInfo
from qtpy.QtWidgets import QMessageBox
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
from qtpy.QtWidgets import QApplication
from bec_widgets.tests import utils as test_utils
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
from bec_widgets.utils import error_popups
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
@@ -22,30 +24,49 @@ def pytest_runtest_makereport(item, call):
@pytest.fixture(autouse=True)
def qapplication(qtbot, request, testable_qtimer_class): # pylint: disable=unused-argument
yield from test_utils.qapplication_fixture(qtbot, request, testable_qtimer_class)
yield
# if the test failed, we don't want to check for open widgets as
# it simply pollutes the output
if request.node.stash._storage.get("failed"):
print("Test failed, skipping cleanup checks")
return
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
bec_dispatcher.stop_cli_server()
testable_qtimer_class.check_all_stopped(qtbot)
qapp = QApplication.instance()
qapp.processEvents()
if hasattr(qapp, "os_listener") and qapp.os_listener:
qapp.removeEventFilter(qapp.os_listener)
try:
qtbot.waitUntil(lambda: qapp.topLevelWidgets() == [])
except QtBotTimeoutError as exc:
raise TimeoutError(f"Failed to close all widgets: {qapp.topLevelWidgets()}") from exc
@pytest.fixture(autouse=True)
def rpc_register():
yield from test_utils.rpc_register_fixture()
yield RPCRegister()
RPCRegister.reset_singleton()
@pytest.fixture(autouse=True)
def bec_dispatcher(threads_check): # pylint: disable=unused-argument
yield from test_utils.bec_dispatcher_fixture(threads_check)
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
yield bec_dispatcher
bec_dispatcher.disconnect_all()
# clean BEC client
bec_dispatcher.client.shutdown()
# stop the cli server
bec_dispatcher.stop_cli_server()
# reinitialize singleton for next test
bec_dispatcher_module.BECDispatcher.reset_singleton()
@pytest.fixture(autouse=True)
def clean_singleton():
yield from test_utils.clean_singleton_fixture()
@pytest.fixture(autouse=True)
def suppress_message_box(monkeypatch):
"""
Auto-suppress any QMessageBox.exec_ calls by returning Ok immediately.
"""
monkeypatch.setattr(QMessageBox, "exec_", lambda *args, **kwargs: QMessageBox.Ok)
error_popups._popup_utility_instance = None
def create_widget(qtbot, widget, *args, **kwargs):
@@ -94,25 +115,9 @@ def create_history_file(file_path, data: dict, metadata: dict) -> messages.ScanH
elif isinstance(sub_value, dict):
for sub_sub_key, sub_sub_value in sub_value.items():
sub_sub_group = metadata_bec[key].create_group(sub_key)
# Handle _StoredDataInfo objects
if isinstance(sub_sub_value, _StoredDataInfo):
# Store the numeric shape
sub_sub_group.create_dataset("shape", data=sub_sub_value.shape)
# Store the dtype as a UTF-8 string
dt = sub_sub_value.dtype or ""
sub_sub_group.create_dataset(
"dtype", data=dt, dtype=h5py.string_dtype(encoding="utf-8")
)
continue
if isinstance(sub_sub_value, list):
json_val = json.dumps(sub_sub_value)
sub_sub_group.create_dataset(sub_sub_key, data=json_val)
elif isinstance(sub_sub_value, dict):
for k2, v2 in sub_sub_value.items():
val = json.dumps(v2) if isinstance(v2, list) else v2
sub_sub_group.create_dataset(k2, data=val)
else:
sub_sub_group.create_dataset(sub_sub_key, data=sub_sub_value)
sub_sub_value = json.dumps(sub_sub_value)
sub_sub_group.create_dataset(sub_sub_key, data=sub_sub_value)
else:
metadata_bec[key].create_dataset(sub_key, data=sub_value)
else:
@@ -139,8 +144,6 @@ def create_history_file(file_path, data: dict, metadata: dict) -> messages.ScanH
end_time=time.time(),
num_points=metadata["num_points"],
request_inputs=metadata["request_inputs"],
stored_data_info=metadata.get("stored_data_info"),
metadata={"scan_report_devices": metadata.get("scan_report_devices")},
)
return msg
@@ -191,102 +194,3 @@ def grid_scan_history_msg(tmpdir):
file_path = str(tmpdir.join("scan_1.h5"))
return create_history_file(file_path, data, metadata)
@pytest.fixture
def scan_history_factory(tmpdir):
"""
Factory to create scan history messages with custom parameters.
Usage:
msg1 = scan_history_factory(scan_id="id1", scan_number=1, num_points=10)
msg2 = scan_history_factory(scan_id="id2", scan_number=2, scan_name="grid_scan", num_points=16)
"""
def _factory(
scan_id: str = "test_scan",
scan_number: int = 1,
dataset_number: int = 1,
scan_name: str = "line_scan",
scan_type: str = "step",
num_points: int = 10,
x_range: tuple = (-5, 5),
y_range: tuple = (-5, 5),
):
# Generate positions based on scan type
if scan_name == "grid_scan":
grid_size = int(np.sqrt(num_points))
x_grid, y_grid = np.meshgrid(
np.linspace(x_range[0], x_range[1], grid_size),
np.linspace(y_range[0], y_range[1], grid_size),
)
x_flat = x_grid.T.ravel()
y_flat = y_grid.T.ravel()
else:
x_flat = np.linspace(x_range[0], x_range[1], num_points)
y_flat = np.linspace(y_range[0], y_range[1], num_points)
positions = np.vstack((x_flat, y_flat)).T
num_pts = len(positions)
# Create dummy data
data = {
"baseline": {"bpm1a": {"bpm1a": {"value": [1], "timestamp": [100]}}},
"monitored": {
"bpm4i": {
"bpm4i": {
"value": np.random.rand(num_points),
"timestamp": np.random.rand(num_points),
}
},
"bpm3a": {
"bpm3a": {
"value": np.random.rand(num_points),
"timestamp": np.random.rand(num_points),
}
},
"samx": {"samx": {"value": x_flat, "timestamp": np.arange(num_pts)}},
"samy": {"samy": {"value": y_flat, "timestamp": np.arange(num_pts)}},
},
"async": {
"async_device": {
"async_device": {
"value": np.random.rand(num_pts * 10),
"timestamp": np.random.rand(num_pts * 10),
}
}
},
}
metadata = {
"scan_id": scan_id,
"scan_name": scan_name,
"scan_type": scan_type,
"exit_status": "closed",
"scan_number": scan_number,
"dataset_number": dataset_number,
"request_inputs": {
"arg_bundle": [
"samx",
x_range[0],
x_range[1],
num_pts,
"samy",
y_range[0],
y_range[1],
num_pts,
],
"kwargs": {"relative": True},
},
"positions": positions.tolist(),
"num_points": num_pts,
"stored_data_info": {
"samx": {"samx": _StoredDataInfo(shape=(num_points,), dtype="float64")},
"samy": {"samy": _StoredDataInfo(shape=(num_points,), dtype="float64")},
"bpm4i": {"bpm4i": _StoredDataInfo(shape=(10,), dtype="float64")},
"async_device": {
"async_device": _StoredDataInfo(shape=(num_points * 10,), dtype="float64")
},
},
"scan_report_devices": [b"samx"],
}
file_path = str(tmpdir.join(f"{scan_id}.h5"))
return create_history_file(file_path, data, metadata)
return _factory

View File

@@ -2,9 +2,10 @@
import pytest
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.control.buttons.button_abort.button_abort import AbortButton
from .client_mocks import mocked_client
@pytest.fixture
def abort_button(qtbot, mocked_client):

View File

@@ -1,9 +1,9 @@
import pytest
from qtpy.QtWidgets import QDoubleSpinBox, QLineEdit
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.plots.plot_base import PlotBase
from bec_widgets.widgets.plots.setting_menus.axis_settings import AxisSettings
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget

View File

@@ -5,10 +5,11 @@ import pytest
from qtpy.QtCore import QObject
from qtpy.QtWidgets import QApplication
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.utils import BECConnector
from bec_widgets.utils.error_popups import SafeSlot as Slot
from .client_mocks import mocked_client
class BECConnectorQObject(BECConnector, QObject): ...

View File

@@ -5,9 +5,9 @@ from unittest import mock
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.containers.dock import BECDockArea
from .client_mocks import mocked_client
from .test_bec_queue import bec_queue_msg_full

View File

@@ -1,9 +1,10 @@
import pytest
from bec_lib import messages
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.services.bec_queue.bec_queue import BECQueue
from .client_mocks import mocked_client
@pytest.fixture
def bec_queue_msg_full():

View File

@@ -4,12 +4,13 @@ from unittest import mock
import pytest
from bec_lib.messages import BECStatus, ServiceMetricMessage, StatusMessage
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.services.bec_status_box.bec_status_box import (
BECServiceInfoContainer,
BECStatusBox,
)
from .client_mocks import mocked_client
@pytest.fixture
def service_status_fixture():

View File

@@ -4,11 +4,11 @@ from pydantic import ValidationError
from qtpy.QtGui import QColor
from qtpy.QtWidgets import QVBoxLayout, QWidget
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.utils import Colors, ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.plots.waveform.curve import CurveConfig
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget

View File

@@ -4,10 +4,10 @@ import pytest
from qtpy.QtCore import QPointF, Qt
from qtpy.QtGui import QTransform
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.utils import Crosshair
from bec_widgets.widgets.plots.image.image_item import ImageItem
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from tests.unit_tests.client_mocks import mocked_client
from .conftest import create_widget

View File

@@ -2,17 +2,12 @@ import json
from unittest.mock import MagicMock, patch
import pytest
from bec_lib.scan_history import ScanHistory
from qtpy.QtGui import QValidator
from qtpy.QtWidgets import QComboBox, QVBoxLayout
from bec_widgets.tests.client_mocks import dap_plugin_message, mocked_client, mocked_client_with_dap
from bec_widgets.widgets.plots.waveform.settings.curve_settings.curve_setting import CurveSetting
from bec_widgets.widgets.plots.waveform.settings.curve_settings.curve_tree import (
CurveTree,
ScanIndexValidator,
)
from bec_widgets.widgets.plots.waveform.settings.curve_settings.curve_tree import CurveTree
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from tests.unit_tests.client_mocks import dap_plugin_message, mocked_client, mocked_client_with_dap
from tests.unit_tests.conftest import create_widget
##################################################
@@ -160,7 +155,7 @@ def test_curve_tree_init(curve_tree_fixture):
curve_tree, wf = curve_tree_fixture
assert curve_tree.waveform == wf
assert curve_tree.color_palette == "plasma"
assert curve_tree.tree.columnCount() == 8
assert curve_tree.tree.columnCount() == 7
assert curve_tree.toolbar.components.exists("add")
assert curve_tree.toolbar.components.exists("expand")
@@ -379,54 +374,3 @@ def test_export_data_dap(curve_tree_fixture):
assert exported["signal"]["entry"] == "bpm4i"
assert exported["signal"]["dap"] == "GaussianModel"
assert exported["label"] == "bpm4i-bpm4i-GaussianModel"
def test_scan_index_validator_behavior():
"""
Test ScanIndexValidator allows empty, 'live', partial 'live', valid scan numbers,
and rejects invalid or disallowed inputs under the new allowed-set API.
"""
validator = ScanIndexValidator(allowed_scans={1, 2, 3})
def state(txt):
s, _, _ = validator.validate(txt, 0)
return s
assert state("") == QValidator.State.Acceptable
assert state("live") == QValidator.State.Acceptable
assert state("l") == QValidator.State.Intermediate
assert state("liv") == QValidator.State.Intermediate
assert state("1") == QValidator.State.Acceptable
assert state("3") == QValidator.State.Acceptable
assert state("4") == QValidator.State.Invalid
assert state("0") == QValidator.State.Invalid
assert state("abc") == QValidator.State.Invalid
def test_export_data_history_curve(curve_tree_fixture, scan_history_factory):
"""
Test that export_data for a history curve row correctly serializes scan_number
and resets scan_id when a numeric scan is selected.
"""
curve_tree, wf = curve_tree_fixture
# Inject two history scans into the waveform client
msgs = [
scan_history_factory(scan_id="hid1", scan_number=1),
scan_history_factory(scan_id="hid2", scan_number=2),
]
wf.client.history = ScanHistory(wf.client, False)
for m in msgs:
wf.client.history._scan_data[m.scan_id] = m
wf.client.history._scan_ids.append(m.scan_id)
wf.client.history._scan_numbers.append(m.scan_number)
wf.client.queue.scan_storage.current_scan = None
# Create a device row and select scan index "2"
device_row = curve_tree.add_new_curve(name="bpm4i", entry="bpm4i")
device_row.scan_index_combo.setCurrentText("2")
exported = device_row.export_data()
assert exported["source"] == "history"
assert exported["scan_number"] == 2
assert exported["scan_id"] is None
assert exported["label"] == "bpm4i-bpm4i-scan-2"

View File

@@ -1,8 +1,8 @@
import pytest
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.dap.dap_combo_box.dap_combo_box import DapComboBox
from .client_mocks import mocked_client
from .conftest import create_widget

View File

@@ -4,11 +4,12 @@ import pytest
from qtpy.QtCore import Qt
from qtpy.QtWidgets import QApplication
# pylint: disable=unused-import
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.utils.colors import set_theme
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
# pylint: disable=unused-import
from .client_mocks import mocked_client
# pylint: disable=redefined-outer-name

View File

@@ -6,7 +6,6 @@ from bec_lib.device import Device
from qtpy.QtCore import QPoint, Qt
from qtpy.QtWidgets import QTabWidget
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.services.device_browser.device_browser import DeviceBrowser
from bec_widgets.widgets.services.device_browser.device_item.device_config_form import (
DeviceConfigForm,
@@ -15,6 +14,8 @@ from bec_widgets.widgets.services.device_browser.device_item.device_signal_displ
SignalDisplay,
)
from .client_mocks import mocked_client
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtWidgets import QListWidgetItem

View File

@@ -137,7 +137,6 @@ def test_update_cycle(update_dialog, qtbot):
"description": None,
"readOnly": False,
"softwareTrigger": False,
"onFailure": "retry",
"deviceTags": set(),
"userParameter": {},
"name": "test_device",

View File

@@ -4,13 +4,13 @@ import pytest
from bec_lib.device import ReadoutPriority
from qtpy.QtWidgets import QWidget
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import (
BECDeviceFilter,
DeviceInputBase,
DeviceInputConfig,
)
from .client_mocks import mocked_client
from .conftest import create_widget

View File

@@ -1,13 +1,14 @@
import pytest
from bec_lib.device import ReadoutPriority
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
DeviceLineEdit,
)
from .client_mocks import mocked_client
@pytest.fixture
def device_input_combobox(qtbot, mocked_client):

View File

@@ -4,7 +4,7 @@ import pytest
from bec_lib.device import Signal
from qtpy.QtWidgets import QWidget
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.tests.utils import FakeDevice
from bec_widgets.utils.ophyd_kind_util import Kind
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.base_classes.device_signal_input_base import (
@@ -16,6 +16,7 @@ from bec_widgets.widgets.control.device_input.signal_line_edit.signal_line_edit
SignalLineEdit,
)
from .client_mocks import mocked_client
from .conftest import create_widget

View File

@@ -1,12 +1,12 @@
import pytest
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.utils.filter_io import FilterIO
from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit import (
DeviceLineEdit,
)
from bec_widgets.widgets.dap.dap_combo_box.dap_combo_box import DapComboBox
from .client_mocks import mocked_client
from .conftest import create_widget

View File

@@ -5,10 +5,13 @@ import pytest
from bec_lib import messages
from bec_lib.scan_history import ScanHistory
# pytest: disable=unused-import
from bec_widgets.tests.client_mocks import create_dummy_scan_item, mocked_client
from bec_widgets.widgets.plots.heatmap.heatmap import Heatmap, HeatmapConfig, HeatmapDeviceSignal
# pytest: disable=unused-import
from tests.unit_tests.client_mocks import mocked_client
from .client_mocks import create_dummy_scan_item
@pytest.fixture
def heatmap_widget(qtbot, mocked_client):

View File

@@ -4,10 +4,10 @@ import numpy as np
import pytest
from qtpy.QtCore import QPointF, Qt
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
from bec_widgets.widgets.plots.roi.image_roi import CircularROI, RectangularROI
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
@@ -29,14 +29,6 @@ def roi_tree(qtbot, image_widget):
yield tree
@pytest.fixture
def compact_roi_tree(qtbot, image_widget):
tree = create_widget(
qtbot, ROIPropertyTree, image_widget=image_widget, compact=True, compact_color="#00BCD4"
)
yield tree
def test_initialization(roi_tree, image_widget):
"""Test that the widget initializes correctly with the right components."""
# Check the widget has the right structure
@@ -439,120 +431,3 @@ def test_cleanup_disconnect_signals(roi_tree, image_widget):
# Verify that the tree item was not updated
assert item.text(roi_tree.COL_ROI) == initial_name
assert item.child(2).text(roi_tree.COL_PROPS) == initial_coord
def test_compact_initialization_minimal_toolbar(compact_roi_tree):
assert compact_roi_tree.compact is True
assert compact_roi_tree.tree is None
# Draw actions exist
assert compact_roi_tree.toolbar.components.get_action("roi_rectangle")
assert compact_roi_tree.toolbar.components.get_action("roi_circle")
assert compact_roi_tree.toolbar.components.get_action("roi_ellipse")
# Full-mode actions are absent
import pytest
with pytest.raises(KeyError):
compact_roi_tree.toolbar.components.get_action("expand_toggle")
with pytest.raises(KeyError):
compact_roi_tree.toolbar.components.get_action("lock_unlock_all")
with pytest.raises(KeyError):
compact_roi_tree.toolbar.components.get_action("roi_tree_cmap")
assert not hasattr(compact_roi_tree, "lock_all_action")
def test_compact_single_roi_enforced_programmatic(compact_roi_tree, image_widget):
# Add first ROI
roi1 = image_widget.add_roi(kind="rect", name="r1")
assert len(image_widget.roi_controller.rois) == 1
assert roi1.line_color == "#00BCD4"
# Add second ROI; the first should be removed automatically
roi2 = image_widget.add_roi(kind="circle", name="c1")
rois = image_widget.roi_controller.rois
assert len(rois) == 1
assert rois[0] is roi2
from bec_widgets.widgets.plots.roi.image_roi import CircularROI
assert isinstance(rois[0], CircularROI)
assert rois[0].line_color == "#00BCD4"
def test_compact_add_roi_from_toolbar_single_enforced(qtbot, compact_roi_tree, image_widget):
# Ensure view is ready
plot_item = image_widget.plot_item
view = plot_item.vb.scene().views()[0]
qtbot.waitExposed(view)
# Activate rectangle drawing
rect_action = compact_roi_tree.toolbar.components.get_action("roi_rectangle").action
rect_action.setChecked(True)
# Draw rectangle
start_pos = QPointF(10, 10)
end_pos = QPointF(50, 40)
start_scene = plot_item.vb.mapViewToScene(start_pos)
end_scene = plot_item.vb.mapViewToScene(end_pos)
start_widget = view.mapFromScene(start_scene)
end_widget = view.mapFromScene(end_scene)
qtbot.mousePress(view.viewport(), Qt.LeftButton, pos=start_widget)
qtbot.mouseMove(view.viewport(), pos=end_widget)
qtbot.mouseRelease(view.viewport(), Qt.LeftButton, pos=end_widget)
qtbot.wait(100)
rois = image_widget.roi_controller.rois
from bec_widgets.widgets.plots.roi.image_roi import CircularROI, RectangularROI
assert len(rois) == 1
assert isinstance(rois[0], RectangularROI)
assert rois[0].line_color == "#00BCD4"
# Now draw a circle; rectangle should be removed automatically
rect_action.setChecked(False)
circle_action = compact_roi_tree.toolbar.components.get_action("roi_circle").action
circle_action.setChecked(True)
start_pos = QPointF(20, 20)
end_pos = QPointF(40, 40)
start_scene = plot_item.vb.mapViewToScene(start_pos)
end_scene = plot_item.vb.mapViewToScene(end_pos)
start_widget = view.mapFromScene(start_scene)
end_widget = view.mapFromScene(end_scene)
qtbot.mousePress(view.viewport(), Qt.LeftButton, pos=start_widget)
qtbot.mouseMove(view.viewport(), pos=end_widget)
qtbot.mouseRelease(view.viewport(), Qt.LeftButton, pos=end_widget)
qtbot.wait(100)
rois = image_widget.roi_controller.rois
assert len(rois) == 1
assert isinstance(rois[0], CircularROI)
assert rois[0].line_color == "#00BCD4"
def test_compact_draw_mode_toggle(compact_roi_tree):
# Initially no draw mode
assert compact_roi_tree._roi_draw_mode is None
rect_action = compact_roi_tree.toolbar.components.get_action("roi_rectangle").action
circle_action = compact_roi_tree.toolbar.components.get_action("roi_circle").action
# Toggle rect on
rect_action.toggle()
assert compact_roi_tree._roi_draw_mode == "rect"
assert rect_action.isChecked()
assert not circle_action.isChecked()
# Toggle circle on; rect should toggle off
circle_action.toggle()
assert compact_roi_tree._roi_draw_mode == "circle"
assert circle_action.isChecked()
assert not rect_action.isChecked()
# Toggle circle off → none
circle_action.toggle()
assert compact_roi_tree._roi_draw_mode is None
assert not rect_action.isChecked()
assert not circle_action.isChecked()

View File

@@ -5,7 +5,6 @@ from typing import Literal
import numpy as np
import pytest
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.roi.image_roi import (
CircularROI,
@@ -13,6 +12,7 @@ from bec_widgets.widgets.plots.roi.image_roi import (
RectangularROI,
ROIController,
)
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
@@ -81,7 +81,7 @@ def test_data_extraction_matches_coordinates(bec_image_widget_with_roi):
# For rectangular ROI: pixel bounding box equals coordinate bbox
if isinstance(roi, RectangularROI):
(x0, y0), (_, _), (_, _), (x1, y1), *_ = roi.get_coordinates(typed=False)
(x0, y0), (_, _), (_, _), (x1, y1) = roi.get_coordinates(typed=False)
# ensure ints inside image shape
x0, y0, x1, y1 = map(int, (x0, y0, x1, y1))
expected = widget.main_image.image[y0:y1, x0:x1]

View File

@@ -3,8 +3,8 @@ import pyqtgraph as pg
import pytest
from qtpy.QtCore import QPointF
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.plots.image.image import Image
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
##################################################

View File

@@ -8,10 +8,11 @@ from qtpy.QtGui import QFontMetrics
import bec_widgets
from bec_widgets.applications.launch_window import LaunchWindow
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
from .client_mocks import mocked_client
base_path = os.path.dirname(bec_widgets.__file__)

View File

@@ -3,9 +3,9 @@ from unittest import mock
import numpy as np
import pytest
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.dap.lmfit_dialog.lmfit_dialog import LMFitDialog
from .client_mocks import mocked_client
from .conftest import create_widget

View File

@@ -8,12 +8,18 @@ from unittest.mock import MagicMock, patch
import pytest
from bec_lib.messages import LogMessage
from bec_lib.redis_connector import StreamMessage
from qtpy.QtCore import QDateTime
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.utility.logpanel._util import replace_escapes, simple_color_format
from bec_widgets.widgets.utility.logpanel._util import (
log_time,
replace_escapes,
simple_color_format,
)
from bec_widgets.widgets.utility.logpanel.logpanel import DEFAULT_LOG_COLORS, LogPanel
from .client_mocks import mocked_client
TEST_TABLE_STRING = "2025-01-15 15:57:18 | bec_server.scan_server.scan_queue | [INFO] | \n \x1b[3m primary queue / ScanQueueStatus.RUNNING \x1b[0m\n┏━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━┓\n\x1b[1m \x1b[0m\x1b[1m queue_id \x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mscan_id\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mis_scan\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mtype\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mscan_numb…\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mIQ status\x1b[0m\x1b[1m \x1b[0m┃\n┡━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━┩\n│ bbe50c82-6… │ None │ False │ mv │ None │ PENDING │\n└─────────────┴─────────┴─────────┴──────┴────────────┴───────────┘\n\n"
TEST_LOG_MESSAGES = [

View File

@@ -5,7 +5,6 @@ from qtpy.QtCore import QEvent, QPoint, QPointF
from qtpy.QtGui import QEnterEvent
from qtpy.QtWidgets import QApplication, QFrame, QLabel
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.containers.main_window.addons.hover_widget import (
HoverWidget,
WidgetTooltip,
@@ -14,6 +13,7 @@ from bec_widgets.widgets.containers.main_window.addons.scroll_label import Scrol
from bec_widgets.widgets.containers.main_window.addons.web_links import BECWebLinksMixin
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
from .client_mocks import mocked_client
from .conftest import create_widget

View File

@@ -1,8 +1,8 @@
import numpy as np
import pyqtgraph as pg
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.plots.motor_map.motor_map import MotorMap
from tests.unit_tests.client_mocks import mocked_client
from .conftest import create_widget

View File

@@ -1,7 +1,7 @@
import numpy as np
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.plots.multi_waveform.multi_waveform import MultiWaveform
from tests.unit_tests.client_mocks import mocked_client
from .conftest import create_widget

View File

@@ -1,7 +1,6 @@
import pytest
from qtpy import QtCore, QtGui, QtWidgets
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.utils.error_popups import ErrorPopupUtility
from bec_widgets.widgets.containers.main_window.addons.notification_center.notification_banner import (
DARK_PALETTE,
@@ -14,6 +13,8 @@ from bec_widgets.widgets.containers.main_window.addons.notification_center.notif
SeverityKind,
)
from .client_mocks import mocked_client
@pytest.fixture
def toast(qtbot):

View File

@@ -1,8 +1,8 @@
import numpy as np
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.plots.plot_base import PlotBase, UIMode
from .client_mocks import mocked_client
from .conftest import create_widget
# pylint: disable=unused-import

View File

@@ -7,8 +7,7 @@ from qtpy.QtCore import Qt, QTimer
from qtpy.QtGui import QValidator
from qtpy.QtWidgets import QPushButton
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.tests.fake_devices import Positioner
from bec_widgets.tests.utils import Positioner
from bec_widgets.widgets.control.device_control.positioner_box import (
PositionerBox,
PositionerControlLine,
@@ -17,6 +16,7 @@ from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit
DeviceLineEdit,
)
from .client_mocks import mocked_client
from .conftest import create_widget
@@ -50,7 +50,7 @@ def positioner_box(qtbot, mocked_client):
def test_positioner_box(positioner_box):
"""Test init of positioner box"""
assert positioner_box.device == "samx"
data = positioner_box.dev["samx"].read(cached=True)
data = positioner_box.dev["samx"].read()
# Avoid check for Positioner class from BEC in _init_device
setpoint_text = positioner_box.ui.setpoint.text()

View File

@@ -2,9 +2,9 @@ from unittest import mock
import pytest
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox2D
from .client_mocks import mocked_client
from .conftest import create_widget
@@ -29,8 +29,8 @@ def test_positioner_box_2d(positioner_box_2d):
"""Test init of 2D positioner box"""
assert positioner_box_2d.device_hor == "samx"
assert positioner_box_2d.device_ver == "samy"
data_hor = positioner_box_2d.dev["samx"].read(cached=True)
data_ver = positioner_box_2d.dev["samy"].read(cached=True)
data_hor = positioner_box_2d.dev["samx"].read()
data_ver = positioner_box_2d.dev["samy"].read()
# Avoid check for Positioner class from BEC in _init_device
setpoint_hor_text = positioner_box_2d.ui.setpoint_hor.text()
@@ -80,60 +80,3 @@ def test_positioner_box_setpoint_changes(positioner_box_2d: PositionerBox2D):
positioner_box_2d.ui.setpoint_ver.setText("100")
positioner_box_2d.on_setpoint_change_ver()
mock_move.assert_called_once_with(100, relative=False)
def _hor_buttons(widget: PositionerBox2D):
return [
widget.ui.tweak_increase_hor,
widget.ui.tweak_decrease_hor,
widget.ui.step_increase_hor,
widget.ui.step_decrease_hor,
]
def _ver_buttons(widget: PositionerBox2D):
return [
widget.ui.tweak_increase_ver,
widget.ui.tweak_decrease_ver,
widget.ui.step_increase_ver,
widget.ui.step_decrease_ver,
]
def test_controls_default_enabled(positioner_box_2d: PositionerBox2D):
"""By default both axes controls are enabled and UI reflects it."""
assert positioner_box_2d.enable_controls_hor is True
assert positioner_box_2d.enable_controls_ver is True
assert all(w.isEnabled() for w in _hor_buttons(positioner_box_2d))
assert all(w.isEnabled() for w in _ver_buttons(positioner_box_2d))
def test_disable_enable_controls_and_persist_across_device_change(
positioner_box_2d: PositionerBox2D, qtbot
):
"""Disabling an axis should disable its buttons and remain disabled after device (re)binding."""
# Disable horizontal and verify UI
positioner_box_2d.enable_controls_hor = False
assert positioner_box_2d.enable_controls_hor is False
assert all(not w.isEnabled() for w in _hor_buttons(positioner_box_2d))
# Simulate a horizontal device change; state must persist after queued re-apply
positioner_box_2d.on_device_change_hor("samx", "samx")
qtbot.waitUntil(lambda: all(not w.isEnabled() for w in _hor_buttons(positioner_box_2d)))
# Re-enable and verify UI
positioner_box_2d.enable_controls_hor = True
qtbot.waitUntil(lambda: all(w.isEnabled() for w in _hor_buttons(positioner_box_2d)))
# Disable vertical and verify UI
positioner_box_2d.enable_controls_ver = False
assert positioner_box_2d.enable_controls_ver is False
assert all(not w.isEnabled() for w in _ver_buttons(positioner_box_2d))
# Simulate a vertical device change; state must persist after queued re-apply
positioner_box_2d.on_device_change_ver("samy", "samy")
qtbot.waitUntil(lambda: all(not w.isEnabled() for w in _ver_buttons(positioner_box_2d)))
# Re-enable and verify UI
positioner_box_2d.enable_controls_ver = True
qtbot.waitUntil(lambda: all(w.isEnabled() for w in _ver_buttons(positioner_box_2d)))

View File

@@ -5,9 +5,10 @@ from unittest.mock import patch
import pytest
from qtpy.QtWidgets import QMessageBox
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.control.buttons.button_reset.button_reset import ResetButton
from .client_mocks import mocked_client
@pytest.fixture
def reset_button(qtbot, mocked_client):

View File

@@ -2,9 +2,10 @@
import pytest
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.control.buttons.button_resume.button_resume import ResumeButton
from .client_mocks import mocked_client
@pytest.fixture
def resume_button(qtbot, mocked_client):

View File

@@ -4,12 +4,13 @@ import pytest
from bec_lib.endpoints import MessageEndpoints
from pydantic import ValidationError
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.utils import Colors
from bec_widgets.widgets.progress.ring_progress_bar import RingProgressBar
from bec_widgets.widgets.progress.ring_progress_bar.ring import ProgressbarConnections, RingConfig
from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar import RingProgressBarConfig
from .client_mocks import mocked_client
@pytest.fixture
def ring_progress_bar(qtbot, mocked_client):

View File

@@ -7,11 +7,12 @@ from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import AvailableResourceMessage, ScanHistoryMessage
from qtpy.QtCore import QModelIndex, Qt
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.utils.forms_from_types.items import StrFormItem
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets.control.scan_control import ScanControl
from .client_mocks import mocked_client
# pylint: disable=no-member
# pylint: disable=missing-function-docstring
# pylint: disable=redefined-outer-name

View File

@@ -2,9 +2,10 @@ from unittest import mock
import pytest
from bec_lib.messages import ScanHistoryMessage, _StoredDataInfo
from pytestqt import qtbot
from qtpy import QtCore
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.widgets.services.scan_history_browser.components import (
ScanHistoryDeviceViewer,
ScanHistoryMetadataViewer,
@@ -14,6 +15,8 @@ from bec_widgets.widgets.services.scan_history_browser.scan_history_browser impo
ScanHistoryBrowser,
)
from .client_mocks import mocked_client
@pytest.fixture
def scan_history_msg():

View File

@@ -4,7 +4,6 @@ import numpy as np
import pytest
from bec_lib import messages
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import (
BECProgressBar,
ProgressState,
@@ -15,6 +14,8 @@ from bec_widgets.widgets.progress.scan_progressbar.scan_progressbar import (
ScanProgressBar,
)
from .client_mocks import mocked_client
@pytest.fixture
def scan_progressbar(qtbot, mocked_client):

View File

@@ -2,12 +2,12 @@ import json
import numpy as np
from bec_widgets.tests.client_mocks import create_dummy_scan_item, mocked_client
from bec_widgets.widgets.plots.scatter_waveform.scatter_curve import (
ScatterCurveConfig,
ScatterDeviceSignal,
)
from bec_widgets.widgets.plots.scatter_waveform.scatter_waveform import ScatterWaveform
from tests.unit_tests.client_mocks import create_dummy_scan_item, mocked_client
from .conftest import create_widget

View File

@@ -21,6 +21,7 @@ def test_multiple_extension_registration():
"""
Test that multiple extension registrations do not cause issues.
"""
assert msgpack.is_registered(QPointF)
assert serialization.module_is_registered("bec_widgets.utils.serialization")
serialization.register_serializer_extension()
assert msgpack.is_registered(QPointF)
assert serialization.module_is_registered("bec_widgets.utils.serialization")
assert len(msgpack._encoder) == len(set(msgpack._encoder))

View File

@@ -4,14 +4,15 @@ from unittest.mock import MagicMock, patch
import pytest
from qtpy import QtCore
from qtpy.QtWidgets import QDialogButtonBox
from qtpy.QtWidgets import QDialogButtonBox, QLabel
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.control.device_input.base_classes.device_signal_input_base import (
DeviceSignalInputBaseConfig,
)
from bec_widgets.widgets.utility.signal_label.signal_label import ChoiceDialog, SignalLabel
from .client_mocks import mocked_client
SAMX_INFO_DICT = {
"signals": {
"readback": {

View File

@@ -2,9 +2,10 @@
import pytest
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.control.buttons.stop_button.stop_button import StopButton
from .client_mocks import mocked_client
@pytest.fixture
def stop_button(qtbot, mocked_client):

View File

@@ -1,8 +1,9 @@
import pytest
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.editors.text_box.text_box import DEFAULT_TEXT, TextBox
from .client_mocks import mocked_client
@pytest.fixture
def text_box_widget(qtbot, mocked_client):

View File

@@ -3,10 +3,10 @@ from unittest import mock
import pyqtgraph as pg
import pytest
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.utils.bec_signal_proxy import BECSignalProxy
from bec_widgets.widgets.dap.dap_combo_box.dap_combo_box import DapComboBox
from .client_mocks import mocked_client
from .conftest import create_widget

View File

@@ -1,9 +1,10 @@
import pytest
from qtpy.QtCore import QPointF
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from .client_mocks import mocked_client
@pytest.fixture
def plot_widget_with_arrow_item(qtbot, mocked_client):

View File

@@ -5,9 +5,10 @@ from unittest import mock
import pytest
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.editors.vscode.vscode import VSCodeEditor
from .client_mocks import mocked_client
@pytest.fixture
def vscode_widget(qtbot, mocked_client):

View File

@@ -10,21 +10,24 @@ import pyqtgraph as pg
import pytest
from pyqtgraph.graphicsItems.DateAxisItem import DateAxisItem
from qtpy.QtCore import QTimer
from qtpy.QtWidgets import QApplication, QCheckBox, QDialog, QDialogButtonBox, QDoubleSpinBox
from bec_widgets.tests.client_mocks import (
DummyData,
create_dummy_scan_item,
dap_plugin_message,
inject_scan_history,
mocked_client,
mocked_client_with_dap,
from qtpy.QtWidgets import (
QApplication,
QCheckBox,
QDialog,
QDialogButtonBox,
QDoubleSpinBox,
QSpinBox,
)
from bec_widgets.widgets.plots.plot_base import UIMode
from bec_widgets.widgets.plots.waveform.curve import DeviceSignal
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.services.scan_history_browser.scan_history_browser import (
ScanHistoryBrowser,
from tests.unit_tests.client_mocks import (
DummyData,
create_dummy_scan_item,
dap_plugin_message,
mocked_client,
mocked_client_with_dap,
)
from .conftest import create_widget
@@ -838,33 +841,6 @@ def test_show_dap_summary_popup(qtbot, mocked_client):
assert fit_action.isChecked() is False
def test_show_scan_history_popup(qtbot, mocked_client):
"""
Test that show_scan_history_popup displays the scan history browser dialog
and toggles the toolbar action correctly.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
scan_action = wf.toolbar.components.get_action("scan_history").action
# Initially unchecked and no dialog
assert not scan_action.isChecked()
assert wf.scan_history_dialog is None
# Show the popup
wf.show_scan_history_popup()
# Dialog should exist and be visible, action checked
assert wf.scan_history_dialog is not None
assert wf.scan_history_dialog.isVisible()
assert scan_action.isChecked()
# The embedded widget should be the correct type
assert isinstance(wf.scan_history_widget, ScanHistoryBrowser)
# Close the dialog (triggers _scan_history_closed)
wf.scan_history_dialog.close()
# Dialog reference should be cleared and action unchecked
assert wf.scan_history_dialog is None
assert not scan_action.isChecked()
#####################################################
# The following tests are for the async dataset guard
#####################################################
@@ -1087,187 +1063,3 @@ def test_dialog_reject_real_interaction(qtbot, mocked_client):
assert wf.skip_large_dataset_warning is True
# Limit remains unchanged
assert wf.max_dataset_size_mb == 1
def test_update_with_scan_history_by_index(qtbot, mocked_client, scan_history_factory):
"""
Test that update_with_scan_history by index loads the correct historical scan.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
hist1, hist2 = inject_scan_history(wf, scan_history_factory, ("hist1", 1), ("hist2", 2))
assert len(wf.client.history._scan_ids) == 2, "Expected two history scans"
# Do history curve plotting
wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id="hist1")
wf.plot(y_name="bpm4i", scan_number=2)
assert len(wf.plot_item.curves) == 2, "Expected two curves for history scans"
c1, c2 = wf.plot_item.curves
# First curve should be for hist1, second for hist2
assert c1.config.signal.name == "bpm4i"
assert c1.config.signal.entry == "bpm4i"
assert c1.config.scan_id == "hist1"
assert c1.config.scan_number == 1
assert c1.name() == "bpm4i-bpm4i-scan-1"
assert c2.config.signal.name == "bpm4i"
assert c2.config.signal.entry == "bpm4i"
assert c2.config.scan_id == "hist2"
assert c2.config.scan_number == 2
assert c2.name() == "bpm4i-bpm4i-scan-2"
@pytest.mark.parametrize("mode", ["auto", "timestamp", "index", "samx"])
def test_history_curve_x_modes_pre_plot(qtbot, mocked_client, scan_history_factory, mode):
"""
Test that history curves respect x_mode when set before plotting.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
hist1, hist2 = inject_scan_history(wf, scan_history_factory, ("hist1", 1), ("hist2", 2))
wf.x_mode = mode
c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id="hist1")
assert c.config.current_x_mode == mode
@pytest.mark.parametrize("mode", ["auto", "timestamp", "index", "samx"])
def test_history_curve_x_modes_post_plot(qtbot, mocked_client, scan_history_factory, mode):
"""
Test that changing x_mode after plotting history curves updates the curve on refresh.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
hist1, hist2 = inject_scan_history(wf, scan_history_factory, ("hist1", 1), ("hist2", 2))
c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id="hist1")
# Change x_mode after plotting
wf.x_mode = mode
# Refresh history curves
wf._refresh_history_curves()
assert c.config.current_x_mode == mode
def test_history_curve_incompatible_x_mode_hides_curve(qtbot, mocked_client, scan_history_factory):
"""
Test that setting an x_mode not present in stored data hides the history curve.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.x_mode = "nonexistent_device"
# Inject history scan for this test
[history_msg] = inject_scan_history(wf, scan_history_factory, ("hist_bad", 1))
# Plot history curve
c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id=history_msg.scan_id)
# Curve should be hidden due to incompatible x_mode
assert not c.isVisible()
def test_fetch_history_data_no_stored_data_raises(
qtbot, mocked_client, monkeypatch, suppress_message_box
):
"""
Test that fetching history data when stored_data_info is missing raises ValueError.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
# Create a dummy scan_item lacking stored_data_info
dummy_scan = SimpleNamespace(
_msg=SimpleNamespace(stored_data_info=None),
devices={},
metadata={"bec": {"scan_id": "dummy", "scan_number": 1, "scan_report_devices": []}},
)
# Force get_history_scan_item to return our dummy
monkeypatch.setattr(wf, "get_history_scan_item", lambda scan_id, scan_index: dummy_scan)
# Attempt to plot history curve should be suppressed by SafeSlot and return None
c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id="dummy", scan_number=1)
assert c is None
assert len(wf.curves) == 0
def test_history_curve_device_missing_returns_none(qtbot, mocked_client, scan_history_factory):
"""
If the y-device is not in stored_data_info, plot should return None.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.x_mode = "index"
[history_msg] = inject_scan_history(wf, scan_history_factory, ("hist_dev_missing", 1))
c = wf.plot(y_name="non-existing", y_entry="non-existing", scan_id=history_msg.scan_id)
assert c is None
def test_history_curve_custom_shape_mismatch_hides_curve(
qtbot, mocked_client, scan_history_factory
):
"""
For custom x-mode, if x and y shapes mismatch, curve should be hidden.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.x_mode = "async_device"
[history_msg] = inject_scan_history(wf, scan_history_factory, ("hist_custom_shape", 1))
# Force shape mismatch for x-data
c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id=history_msg.scan_id)
assert c is not None
assert not c.isVisible()
def test_history_curve_index_mode_plots_curve(qtbot, mocked_client, scan_history_factory):
"""
Test that setting x_mode to 'index' plots and shows the history curve correctly.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.x_mode = "index"
[history_msg] = inject_scan_history(wf, scan_history_factory, ("hist_index", 1))
c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id=history_msg.scan_id)
assert c is not None
assert c.isVisible()
assert c.config.current_x_mode == "index"
def test_history_curve_timestamp_mode_plots_curve(qtbot, mocked_client, scan_history_factory):
"""
Test that setting x_mode to 'timestamp' plots and shows the history curve correctly.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.x_mode = "timestamp"
[history_msg] = inject_scan_history(wf, scan_history_factory, ("hist_time", 1))
c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id=history_msg.scan_id)
assert c is not None
assert c.isVisible()
assert c.config.current_x_mode == "timestamp"
def test_history_curve_auto_valid_uses_first_report_device(
qtbot, mocked_client, scan_history_factory
):
"""
Test that 'auto' x_mode uses the first available report device and shows the curve.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.x_mode = "auto"
[history_msg] = inject_scan_history(wf, scan_history_factory, ("hist_auto_valid", 1))
# Plot history curve
c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id=history_msg.scan_id)
assert c is not None
assert c.isVisible()
# Should have fallen back to the first scan_report_device
assert c.config.current_x_mode == "auto"
def test_history_curve_file_not_found_returns_none(qtbot, mocked_client, scan_history_factory):
"""
If the history file path does not exist, plot should return None.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.x_mode = "index"
# Inject a valid history message then corrupt its file_path
[history_msg] = inject_scan_history(wf, scan_history_factory, ("bad_file", 1))
history_msg.file_path = "/nonexistent/path.h5"
c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id=history_msg.scan_id)
assert c is None
def test_history_curve_scan_not_found_returns_none(qtbot, mocked_client):
"""
If the requested scan_id is not in history, plot should return None.
"""
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.x_mode = "index"
# No history scans injected for this widget
c = wf.plot(y_name="bpm4i", y_entry="bpm4i", scan_id="unknown_scan")
assert c is None

View File

@@ -3,9 +3,10 @@ from unittest import mock
import pytest
from qtpy.QtNetwork import QAuthenticator
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.editors.web_console.web_console import WebConsole, _web_console_registry
from .client_mocks import mocked_client
@pytest.fixture
def console_widget(qtbot, mocked_client):

View File

@@ -1,9 +1,10 @@
import pytest
from qtpy.QtCore import QUrl
from bec_widgets.tests.client_mocks import mocked_client
from bec_widgets.widgets.editors.website.website import WebsiteWidget
from .client_mocks import mocked_client
@pytest.fixture
def website_widget(qtbot, mocked_client):