1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-10 18:50:55 +02:00

Compare commits

..

21 Commits

Author SHA1 Message Date
3653fe9799 wip - fix formatter 2025-03-13 10:32:04 +01:00
750350dc52 wip - removed rpcreference import 2025-03-13 10:30:04 +01:00
9eb1608b01 wip - test namespace 2025-03-13 10:24:35 +01:00
0433b40054 wip - namespace 2025-03-13 10:24:35 +01:00
d1a41752c4 wip - namespace 2025-03-13 10:24:35 +01:00
bf060b3aba wip - namespace 2025-03-13 10:24:35 +01:00
5f0dd62f25 wip - namespace 2025-03-13 10:24:35 +01:00
21b1f0b2de wip - namespace 2025-03-13 10:24:35 +01:00
17f6dbb0d4 wip - namespace 2025-03-13 10:24:35 +01:00
ba347e026a wip - namespace update 2025-03-13 10:06:11 +01:00
705f157c04 docs(plot_base): update docstrings for properties and setters 2025-03-06 16:07:56 +01:00
4736c2fad1 refactor(waveform_widget): removed and replaced by Waveform 2025-03-06 16:07:56 +01:00
31b40aeede test(plot_indicators): tests adapted to not be dependent on BECWaveformWidget 2025-03-06 16:07:56 +01:00
77e8a5c884 fix(plot_indicators): cleanup adjusted 2025-03-06 16:07:56 +01:00
0f4365bbb0 feat(waveform): new Waveform widget based on NextGen PlotBase 2025-03-06 16:07:56 +01:00
906ca03929 fix(entry_validator): validator reports list of signal if user chooses the wrong one 2025-03-06 16:07:56 +01:00
1206069a8f fix(plot_base): update mouse mode state on mode change 2025-03-06 16:07:56 +01:00
86487a5f4d fix(plot_base): aspect ratio removed from the PlotBase 2025-03-06 16:07:56 +01:00
4bdcae7028 fix(plot_base): inner and outer axis setting in popup mode 2025-03-06 16:07:56 +01:00
81f61f3c3b fix(plot_base): fix cleanup of popups if popups are still open when PlotBase is closed 2025-03-06 16:07:56 +01:00
89e8ebf1b6 fix(lmfit_dialog_vertical): vertical sizePolicy fixed 2025-03-06 16:07:56 +01:00
54 changed files with 1483 additions and 5060 deletions

View File

@@ -197,13 +197,7 @@ end-2-end-conda:
script:
- *clone-repos
- *install-os-packages
- conda config --show-sources
- conda config --add channels conda-forge
- conda config --system --remove channels https://repo.anaconda.com/pkgs/main
- conda config --system --remove channels https://repo.anaconda.com/pkgs/r
- conda config --remove channels https://repo.anaconda.com/pkgs/main
- conda config --remove channels https://repo.anaconda.com/pkgs/r
- conda config --show-sources
- conda config --prepend channels conda-forge
- conda config --set channel_priority strict
- conda config --set always_yes yes --set changeps1 no
- conda create -q -n test-environment python=3.11
@@ -233,7 +227,6 @@ end-2-end-conda:
- if: '$CI_PIPELINE_SOURCE == "parent_pipeline"'
- if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "main"'
- if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "production"'
- if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /^pre_release.*$/'
semver:
stage: Deploy

View File

@@ -1,14 +1,6 @@
# CHANGELOG
## v1.25.0 (2025-03-07)
### Features
- **waveform**: Add slice handling and reset functionality for async updates
([`7cbebbb`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/7cbebbb1f00ea2e2b3678c96b183a877e59c5240))
## v1.24.5 (2025-03-06)
### Bug Fixes

File diff suppressed because it is too large Load Diff

View File

@@ -9,9 +9,11 @@ import os
import select
import subprocess
import threading
import time
from contextlib import contextmanager
from typing import TYPE_CHECKING
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from rich.console import Console
@@ -28,6 +30,7 @@ if TYPE_CHECKING:
from bec_lib.redis_connector import StreamMessage
else:
messages = lazy_import("bec_lib.messages")
# from bec_lib.connector import MessageObject
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
StreamMessage = lazy_import_from("bec_lib.redis_connector", ("StreamMessage",))
@@ -71,7 +74,7 @@ def _get_output(process, logger) -> None:
def _start_plot_process(
gui_id: str, gui_class: type, gui_class_id: str, config: dict | str, logger=None
) -> tuple[subprocess.Popen[str], threading.Thread | None]:
) -> None:
"""
Start the plot in a new process.
@@ -204,7 +207,7 @@ class BECDockArea(client.BECDockArea):
class BECGuiClient(RPCBase):
"""BEC GUI client class. Container for GUI applications within Python."""
_top_level: dict[str, BECDockArea] = {}
_top_level = {}
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
@@ -220,7 +223,7 @@ class BECGuiClient(RPCBase):
@property
def windows(self) -> dict:
"""Dictionary with dock areas in the GUI."""
"""Dictionary with dock ares in the GUI."""
return self._top_level
@property
@@ -293,20 +296,20 @@ class BECGuiClient(RPCBase):
# return self.auto_updates.do_update(msg)
def _gui_post_startup(self):
# if self._auto_updates_enabled:
# if self._auto_updates is None:
# auto_updates = self._get_update_script()
# if auto_updates is None:
# AutoUpdates.create_default_dock = True
# AutoUpdates.enabled = True
# auto_updates = AutoUpdates(self._top_level["main"].widget)
# if auto_updates.create_default_dock:
# auto_updates.start_default_dock()
# self._start_update_script()
# self._auto_updates = auto_updates
self._top_level[self._default_dock_name] = BECDockArea(
gui_id=f"{self._default_dock_name}", name=self._default_dock_name, parent=self
self._top_level["main"] = WidgetDesc(
title="BEC Widgets", widget=BECDockArea(gui_id=self._gui_id)
)
if self._auto_updates_enabled:
if self._auto_updates is None:
auto_updates = self._get_update_script()
if auto_updates is None:
AutoUpdates.create_default_dock = True
AutoUpdates.enabled = True
auto_updates = AutoUpdates(self._top_level["main"].widget)
if auto_updates.create_default_dock:
auto_updates.start_default_dock()
self._start_update_script()
self._auto_updates = auto_updates
self._do_show_all()
self._gui_started_event.set()
@@ -345,9 +348,8 @@ class BECGuiClient(RPCBase):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
return rpc_client._run_rpc("_dump")
def start(self, wait: bool = True) -> None:
"""Start the server and show the GUI window."""
return self._start_server(wait=wait)
def start(self):
return self.start_server()
def _do_show_all(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
@@ -404,13 +406,11 @@ class BECGuiClient(RPCBase):
widget = rpc_client._run_rpc(
"new_dock_area", name, geometry
) # pylint: disable=protected-access
self._top_level[widget.widget_name] = widget
return widget
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
widget = rpc_client._run_rpc(
"new_dock_area", name, geometry
) # pylint: disable=protected-access
self._top_level[widget.widget_name] = widget
return widget
def delete(self, name: str) -> None:
@@ -453,17 +453,3 @@ class BECGuiClient(RPCBase):
self._process_output_processing_thread.join()
self._process.wait()
self._process = None
if __name__ == "__main__": # pragma: no cover
from bec_lib.client import BECClient
from bec_lib.service_config import ServiceConfig
config = ServiceConfig()
client = BECClient(config)
client.start()
# Test the client_utils.py module
gui = BECGuiClient()
gui.start()
print(gui.window_list)

View File

@@ -8,7 +8,7 @@ from weakref import WeakValueDictionary
from bec_lib.logger import bec_logger
from qtpy.QtCore import QObject
if TYPE_CHECKING: # pragma: no cover
if TYPE_CHECKING:
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.containers.dock.dock import BECDock
@@ -73,6 +73,20 @@ class RPCRegister:
rpc_object = self._rpc_register.get(gui_id, None)
return rpc_object
def get_rpc_by_name(self, name: str) -> QObject | None:
"""
Get an RPC object by its name.
Args:
name(str): The name of the RPC object to be retrieved.
Returns:
QObject | None: The RPC object with the given name.
"""
rpc_object = [rpc for rpc in self._rpc_register if rpc._name == name]
rpc_object = rpc_object[0] if len(rpc_object) > 0 else None
return rpc_object
def list_all_connections(self) -> dict:
"""
List all the registered RPC objects.

View File

@@ -13,7 +13,7 @@ class RPCWidgetHandler:
self._widget_classes = None
@property
def widget_classes(self) -> dict[str, type[BECWidget]]:
def widget_classes(self) -> dict[str, Any]:
"""
Get the available widget classes.
@@ -50,7 +50,9 @@ class RPCWidgetHandler:
Returns:
widget(BECWidget): The created widget.
"""
widget_class = self.widget_classes.get(widget_type) # type: ignore
if self._widget_classes is None:
self.update_available_widgets()
widget_class = self._widget_classes.get(widget_type) # type: ignore
if widget_class:
return widget_class(name=name, **kwargs)
raise ValueError(f"Unknown widget type: {widget_type}")

View File

@@ -69,8 +69,7 @@ class BECWidgetsCLIServer:
self.gui_id = gui_id
# register broadcast callback
self.rpc_register = RPCRegister()
self.gui = gui_class(parent=None, name=gui_class_id, gui_id=gui_class_id)
# self.rpc_register.add_rpc(self.gui)
self.rpc_register.add_rpc(self.gui)
self.dispatcher.connect_slot(
self.on_rpc_update, MessageEndpoints.gui_instructions(self.gui_id)
@@ -84,6 +83,7 @@ class BECWidgetsCLIServer:
self.status = messages.BECStatus.RUNNING
logger.success(f"Server started with gui_id: {self.gui_id}")
# Create initial object -> BECFigure or BECDockArea
self.gui = gui_class(parent=None, name=gui_class_id)
def on_rpc_update(self, msg: dict, metadata: dict):
request_id = metadata.get("request_id")

View File

@@ -15,14 +15,12 @@ from qtpy.QtWidgets import (
)
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.widget_io import WidgetHierarchy as wh
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.containers.dock import BECDockArea
from bec_widgets.widgets.containers.figure import BECFigure
from bec_widgets.widgets.containers.layout_manager.layout_manager import LayoutManagerWidget
from bec_widgets.widgets.editors.jupyter_console.jupyter_console import BECJupyterConsole
from bec_widgets.widgets.plots_next_gen.image.image import Image
from bec_widgets.widgets.plots_next_gen.plot_base import PlotBase
from bec_widgets.widgets.plots_next_gen.scatter_waveform.scatter_waveform import ScatterWaveform
from bec_widgets.widgets.plots_next_gen.waveform.waveform import Waveform
@@ -40,7 +38,6 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
{
"np": np,
"pg": pg,
"wh": wh,
"fig": self.figure,
"dock": self.dock,
"w1": self.w1,
@@ -54,8 +51,8 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
"w9": self.w9,
"w10": self.w10,
"d0": self.d0,
"d1": self.d1,
"im": self.im,
"mi": self.mi,
"mm": self.mm,
"mw": self.mw,
"lm": self.lm,
@@ -68,8 +65,6 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
"pb": self.pb,
"pi": self.pi,
"wf": self.wf,
"scatter": self.scatter,
"scatter_mi": self.scatter,
}
)
@@ -128,24 +123,6 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
fifth_tab_layout.addWidget(self.wf)
tab_widget.addTab(fifth_tab, "Waveform Next Gen")
tab_widget.setCurrentIndex(4)
sixth_tab = QWidget()
sixth_tab_layout = QVBoxLayout(sixth_tab)
self.im = Image()
self.mi = self.im.main_image
sixth_tab_layout.addWidget(self.im)
tab_widget.addTab(sixth_tab, "Image Next Gen")
tab_widget.setCurrentIndex(5)
seventh_tab = QWidget()
seventh_tab_layout = QVBoxLayout(seventh_tab)
self.scatter = ScatterWaveform()
self.scatter_mi = self.scatter.main_curve
self.scatter.plot("samx", "samy", "bpm4i")
seventh_tab_layout.addWidget(self.scatter)
tab_widget.addTab(seventh_tab, "Scatter Waveform")
tab_widget.setCurrentIndex(6)
# add stuff to the new Waveform widget
self._init_waveform()
@@ -225,6 +202,14 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
self.mm = self.d0.new("BECMotorMapWidget")
self.mm.change_motors("samx", "samy")
self.d1 = self.dock.new(name="dock_1", position="right")
self.im = self.d1.new("BECImageWidget")
self.im.image("waveform", "1d")
self.d2 = self.dock.new(name="dock_2", position="bottom")
self.wf = self.d2.new("BECFigure", row=0, col=0)
self.mw = self.wf.multi_waveform(monitor="waveform") # , config=config)
self.mw = None # self.wf.multi_waveform(monitor="waveform") # , config=config)
self.dock.save_state()
@@ -250,6 +235,7 @@ if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
app.setApplicationName("Jupyter Console")
app.setApplicationDisplayName("Jupyter Console")
apply_theme("dark")
icon = material_icon("terminal", color=(255, 255, 255, 255), filled=True)
app.setWindowIcon(icon)
@@ -259,7 +245,7 @@ if __name__ == "__main__": # pragma: no cover
win = JupyterConsoleWindow()
win.show()
win.resize(1500, 800)
win.resize(1200, 800)
app.aboutToQuit.connect(win.close)
sys.exit(app.exec_())

View File

@@ -279,6 +279,7 @@ class SwitchableToolBarAction(ToolBarAction):
self.main_button.setToolTip(default_action.tooltip)
self.main_button.clicked.connect(self._trigger_current_action)
menu = QMenu(self.main_button)
self.menu_actions = {}
for key, action_obj in self.actions.items():
menu_action = QAction(action_obj.get_icon(), action_obj.tooltip, self.main_button)
menu_action.setIconVisibleInMenu(True)
@@ -286,54 +287,23 @@ class SwitchableToolBarAction(ToolBarAction):
menu_action.setChecked(key == self.current_key)
menu_action.triggered.connect(lambda checked, k=key: self.set_default_action(k))
menu.addAction(menu_action)
self.menu_actions[key] = menu_action
self.main_button.setMenu(menu)
toolbar.addWidget(self.main_button)
def _trigger_current_action(self):
"""
Triggers the current action associated with the main button.
"""
action_obj = self.actions[self.current_key]
action_obj.action.trigger()
def set_default_action(self, key: str):
"""
Sets the default action for the split action.
Args:
key(str): The key of the action to set as default.
"""
self.current_key = key
new_action = self.actions[self.current_key]
self.main_button.setIcon(new_action.get_icon())
self.main_button.setToolTip(new_action.tooltip)
# Update check state of menu items
for k, menu_act in self.actions.items():
menu_act.action.setChecked(False)
for k, menu_act in self.menu_actions.items():
menu_act.setChecked(k == key)
new_action.action.trigger()
# Active action chosen from menu is always checked, uncheck through main button
if self.checkable:
new_action.action.setChecked(True)
self.main_button.setChecked(True)
def block_all_signals(self, block: bool = True):
"""
Blocks or unblocks all signals for the actions in the toolbar.
Args:
block (bool): Whether to block signals. Defaults to True.
"""
self.main_button.blockSignals(block)
for action in self.actions.values():
action.action.blockSignals(block)
def set_state_all(self, state: bool):
"""
Uncheck all actions in the toolbar.
"""
for action in self.actions.values():
action.action.setChecked(state)
self.main_button.setChecked(state)
def get_icon(self) -> QIcon:
return self.actions[self.current_key].get_icon()
@@ -348,18 +318,11 @@ class WidgetAction(ToolBarAction):
widget (QWidget): The widget to be added to the toolbar.
"""
def __init__(
self,
label: str | None = None,
widget: QWidget = None,
adjust_size: bool = True,
parent=None,
):
def __init__(self, label: str | None = None, widget: QWidget = None, parent=None):
super().__init__(icon_path=None, tooltip=label, checkable=False)
self.label = label
self.widget = widget
self.container = None
self.adjust_size = adjust_size
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
"""
@@ -380,7 +343,7 @@ class WidgetAction(ToolBarAction):
label_widget.setAlignment(Qt.AlignVCenter | Qt.AlignRight)
layout.addWidget(label_widget)
if isinstance(self.widget, QComboBox) and self.adjust_size:
if isinstance(self.widget, QComboBox):
self.widget.setSizeAdjustPolicy(QComboBox.AdjustToContents)
size_policy = QSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
@@ -864,7 +827,7 @@ class MainWindow(QMainWindow): # pragma: no cover
def add_bundles(self):
home_action = MaterialIconAction(
icon_name="home", tooltip="Home", checkable=False, parent=self
icon_name="home", tooltip="Home", checkable=True, parent=self
)
settings_action = MaterialIconAction(
icon_name="settings", tooltip="Settings", checkable=True, parent=self
@@ -881,7 +844,6 @@ class MainWindow(QMainWindow): # pragma: no cover
],
)
self.toolbar.add_bundle(main_actions_bundle, target_widget=self)
home_action.action.triggered.connect(lambda: self.switchable_action.set_state_all(False))
search_action = MaterialIconAction(
icon_name="search", tooltip="Search", checkable=False, parent=self
@@ -935,20 +897,20 @@ class MainWindow(QMainWindow): # pragma: no cover
def add_switchable_button_checkable(self):
action1 = MaterialIconAction(
icon_name="hdr_auto", tooltip="Action 1", checkable=True, parent=self
icon_name="counter_1", tooltip="Action 1", checkable=True, parent=self
)
action2 = MaterialIconAction(
icon_name="hdr_auto", tooltip="Action 2", checkable=True, filled=True, parent=self
icon_name="counter_2", tooltip="Action 2", checkable=True, parent=self
)
self.switchable_action = SwitchableToolBarAction(
switchable_action = SwitchableToolBarAction(
actions={"action1": action1, "action2": action2},
initial_action="action1",
tooltip="Switchable Action",
checkable=True,
parent=self,
)
self.toolbar.add_action("switchable_action", self.switchable_action, self)
self.toolbar.add_action("switchable_action", switchable_action, self)
action1.action.toggled.connect(
lambda checked: self.test_label.setText(f"Action 1 triggered, checked = {checked}")
@@ -969,20 +931,16 @@ class MainWindow(QMainWindow): # pragma: no cover
actions={"action1": action1, "action2": action2},
initial_action="action1",
tooltip="Switchable Action",
checkable=False,
checkable=True,
parent=self,
)
self.toolbar.add_action("switchable_action_no_toggle", switchable_action, self)
action1.action.triggered.connect(
lambda checked: self.test_label.setText(
f"Action 1 (non-checkable) triggered, checked = {checked}"
)
lambda checked: self.test_label.setText(f"Action 1 triggered, checked = {checked}")
)
action2.action.triggered.connect(
lambda checked: self.test_label.setText(
f"Action 2 (non-checkable) triggered, checked = {checked}"
)
lambda checked: self.test_label.setText(f"Action 2 triggered, checked = {checked}")
)
switchable_action.actions["action1"].action.setChecked(True)

View File

@@ -11,7 +11,7 @@ from bec_widgets.utils.bec_connector import BECConnector, ConnectionConfig
from bec_widgets.utils.colors import set_theme
from bec_widgets.utils.container_utils import WidgetContainerUtils
if TYPE_CHECKING: # pragma: no cover
if TYPE_CHECKING:
from bec_widgets.widgets.containers.dock import BECDock
logger = bec_logger.logger
@@ -55,7 +55,13 @@ class BECWidget(BECConnector):
"""
if not isinstance(self, QWidget):
raise RuntimeError(f"{repr(self)} is not a subclass of QWidget")
# Create a default name if None is provided
if name is None:
name = "bec_widget_init_without_name"
# name = self.__class__.__name__
# Check for invalid chars in the name
if not WidgetContainerUtils.has_name_valid_chars(name):
raise ValueError(f"Name {name} contains invalid characters.")
super().__init__(client=client, config=config, gui_id=gui_id, name=name)
self._parent_dock = parent_dock
app = QApplication.instance()
@@ -98,7 +104,8 @@ class BECWidget(BECConnector):
def cleanup(self):
"""Cleanup the widget."""
# All widgets need to call super().cleanup() in their cleanup method
# needed here instead of closeEvent, to be checked why
# However, all widgets need to call super().cleanup() in their cleanup method
self.rpc_register.remove_rpc(self)
def closeEvent(self, event):

View File

@@ -1,7 +1,4 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any
import numpy as np
import pyqtgraph as pg
@@ -200,18 +197,15 @@ class Crosshair(QObject):
self.marker_2d = pg.ROI(
[0, 0], size=[1, 1], pen=pg.mkPen("r", width=2), movable=False
)
self.marker_2d.skip_auto_range = True
self.plot_item.addItem(self.marker_2d)
def snap_to_data(
self, x: float, y: float
) -> tuple[None, None] | tuple[defaultdict[Any, list], defaultdict[Any, list]]:
def snap_to_data(self, x, y) -> tuple[defaultdict[list], defaultdict[list]]:
"""
Finds the nearest data points to the given x and y coordinates.
Args:
x(float): The x-coordinate of the mouse cursor
y(float): The y-coordinate of the mouse cursor
x: The x-coordinate of the mouse cursor
y: The y-coordinate of the mouse cursor
Returns:
tuple: x and y values snapped to the nearest data
@@ -241,7 +235,7 @@ class Crosshair(QObject):
y_values[name] = closest_y
x_values[name] = closest_x
elif isinstance(item, pg.ImageItem): # 2D plot
name = item.config.monitor or str(id(item))
name = item.config.monitor
image_2d = item.image
# Clip the x and y values to the image dimensions to avoid out of bounds errors
y_values[name] = int(np.clip(y, 0, image_2d.shape[1] - 1))
@@ -326,7 +320,7 @@ class Crosshair(QObject):
)
self.coordinatesChanged1D.emit(coordinate_to_emit)
elif isinstance(item, pg.ImageItem):
name = item.config.monitor or str(id(item))
name = item.config.monitor
x, y = x_snap_values[name], y_snap_values[name]
if x is None or y is None:
continue
@@ -380,7 +374,7 @@ class Crosshair(QObject):
)
self.coordinatesClicked1D.emit(coordinate_to_emit)
elif isinstance(item, pg.ImageItem):
name = item.config.monitor or str(id(item))
name = item.config.monitor
x, y = x_snap_values[name], y_snap_values[name]
if x is None or y is None:
continue
@@ -424,17 +418,9 @@ class Crosshair(QObject):
"""
x, y = pos
x_scaled, y_scaled = self.scale_emitted_coordinates(x, y)
text = f"({x_scaled:.{self.precision}g}, {y_scaled:.{self.precision}g})"
for item in self.items:
if isinstance(item, pg.ImageItem):
image = item.image
ix = int(np.clip(x, 0, image.shape[0] - 1))
iy = int(np.clip(y, 0, image.shape[1] - 1))
intensity = image[ix, iy]
text += f"\nIntensity: {intensity:.{self.precision}g}"
break
# Update coordinate label
self.coord_label.setText(text)
self.coord_label.setText(f"({x_scaled:.{self.precision}g}, {y_scaled:.{self.precision}g})")
self.coord_label.setPos(x, y)
self.coord_label.setVisible(True)
@@ -450,9 +436,6 @@ class Crosshair(QObject):
self.clear_markers()
def cleanup(self):
if self.marker_2d is not None:
self.plot_item.removeItem(self.marker_2d)
self.marker_2d = None
self.plot_item.removeItem(self.v_line)
self.plot_item.removeItem(self.h_line)
self.plot_item.removeItem(self.coord_label)

View File

@@ -393,7 +393,7 @@ class BECDock(BECWidget, Dock):
if widget in self.widgets:
self.widgets.remove(widget)
widget.close()
# self._broadcast_update()
self._broadcast_update()
def delete_all(self):
"""
@@ -434,7 +434,7 @@ class BECDock(BECWidget, Dock):
super().close()
if __name__ == "__main__": # pragma: no cover
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication

View File

@@ -25,10 +25,9 @@ from bec_widgets.widgets.containers.dock.dock import BECDock, DockConfig
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox
from bec_widgets.widgets.control.scan_control.scan_control import ScanControl
from bec_widgets.widgets.editors.vscode.vscode import VSCodeEditor
from bec_widgets.widgets.plots.image.image_widget import BECImageWidget
from bec_widgets.widgets.plots.motor_map.motor_map_widget import BECMotorMapWidget
from bec_widgets.widgets.plots.multi_waveform.multi_waveform_widget import BECMultiWaveformWidget
from bec_widgets.widgets.plots_next_gen.image.image import Image
from bec_widgets.widgets.plots_next_gen.scatter_waveform.scatter_waveform import ScatterWaveform
from bec_widgets.widgets.plots_next_gen.waveform.waveform import Waveform
from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar import RingProgressBar
from bec_widgets.widgets.services.bec_queue.bec_queue import BECQueue
@@ -97,18 +96,13 @@ class BECDockArea(BECWidget, QWidget):
"waveform": MaterialIconAction(
icon_name=Waveform.ICON_NAME, tooltip="Add Waveform", filled=True
),
"scatter_waveform": MaterialIconAction(
icon_name=ScatterWaveform.ICON_NAME,
tooltip="Add Scatter Waveform",
filled=True,
),
"multi_waveform": MaterialIconAction(
icon_name=BECMultiWaveformWidget.ICON_NAME,
tooltip="Add Multi Waveform",
filled=True,
),
"image": MaterialIconAction(
icon_name=Image.ICON_NAME, tooltip="Add Image", filled=True
icon_name=BECImageWidget.ICON_NAME, tooltip="Add Image", filled=True
),
"motor_map": MaterialIconAction(
icon_name=BECMotorMapWidget.ICON_NAME,
@@ -182,14 +176,11 @@ class BECDockArea(BECWidget, QWidget):
self.toolbar.widgets["menu_plots"].widgets["waveform"].triggered.connect(
lambda: self._create_widget_from_toolbar(widget_name="Waveform")
)
self.toolbar.widgets["menu_plots"].widgets["scatter_waveform"].triggered.connect(
lambda: self._create_widget_from_toolbar(widget_name="ScatterWaveform")
)
self.toolbar.widgets["menu_plots"].widgets["multi_waveform"].triggered.connect(
lambda: self._create_widget_from_toolbar(widget_name="BECMultiWaveformWidget")
)
self.toolbar.widgets["menu_plots"].widgets["image"].triggered.connect(
lambda: self._create_widget_from_toolbar(widget_name="Image")
lambda: self._create_widget_from_toolbar(widget_name="BECImageWidget")
)
self.toolbar.widgets["menu_plots"].widgets["motor_map"].triggered.connect(
lambda: self._create_widget_from_toolbar(widget_name="BECMotorMapWidget")
@@ -476,7 +467,7 @@ class BECDockArea(BECWidget, QWidget):
dock.hide_title_bar()
else:
raise ValueError(f"Dock with name {dock_name} does not exist.")
# self._broadcast_update()
self._broadcast_update()
def remove(self) -> None:
"""Remove the dock area."""

View File

@@ -33,7 +33,6 @@ class ImageConfig(SubplotConfig):
)
# TODO old version will be deprecated
class BECImageShow(BECPlotBase):
USER_ACCESS = [
"_rpc_id",

View File

@@ -41,7 +41,6 @@ class ImageItemConfig(ConnectionConfig):
)
# TODO old version will be deprecated
class BECImageItem(BECConnector, pg.ImageItem):
USER_ACCESS = [
"_rpc_id",

View File

@@ -7,8 +7,6 @@ import numpy as np
from pydantic import BaseModel, Field
from qtpy.QtCore import QObject, Signal, Slot
# TODO will be deleted
@dataclass
class ImageStats:

View File

@@ -126,8 +126,6 @@ class BECWaveform(BECPlotBase):
"label_suffix": "",
}
self._slice_index = None
# Scan segment update proxy
self.proxy_update_plot = pg.SignalProxy(
self.scan_signal_update, rateLimit=25, slot=self._update_scan_curves
@@ -1254,9 +1252,7 @@ class BECWaveform(BECPlotBase):
x_data = None
instruction = metadata.get("async_update", {}).get("type")
max_shape = metadata.get("async_update", {}).get("max_shape", [])
all_async_curves = self._curves_data["async"].values()
# for curve in self._curves_data["async"].values():
for curve in all_async_curves:
for curve in self._curves_data["async"].values():
y_entry = curve.config.signals.y.entry
x_name = self._x_axis_mode["name"]
for device, async_data in msg["signals"].items():
@@ -1280,18 +1276,6 @@ class BECWaveform(BECPlotBase):
curve.setData(x_data, new_data)
else:
curve.setData(new_data)
elif instruction == "add_slice":
current_slice_id = metadata.get("async_update", {}).get("index")
data_plot = async_data["value"]
if current_slice_id != self._slice_index:
self._slice_index = current_slice_id
new_data = data_plot
else:
x_data, y_data = curve.get_data()
new_data = np.hstack((y_data, data_plot))
curve.setData(new_data)
elif instruction == "replace":
if x_name == "timestamp":
x_data = async_data["timestamp"]
@@ -1540,10 +1524,6 @@ class BECWaveform(BECPlotBase):
for curve_id in curve_ids_to_remove:
self.remove_curve(curve_id)
def reset(self):
self._slice_index = None
super().reset()
def clear_all(self):
sources = list(self._curves_data.keys())
for source in sources:

View File

@@ -71,4 +71,5 @@ class BECMainWindow(BECWidget, QMainWindow):
return dock_area
def cleanup(self):
# TODO
super().close()

View File

@@ -0,0 +1 @@
{'files': ['image_widget.py']}

View File

@@ -1,39 +1,43 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
import os
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
import bec_widgets
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.plots_next_gen.image.image import Image
from bec_widgets.widgets.plots.image.image_widget import BECImageWidget
DOM_XML = """
<ui language='c++'>
<widget class='Image' name='image'>
<widget class='BECImageWidget' name='bec_image_widget'>
</widget>
</ui>
"""
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
class ImagePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
class BECImageWidgetPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = Image(parent)
t = BECImageWidget(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "Plot Widgets Next Gen"
return "BEC Plots"
def icon(self):
return designer_material_icon(Image.ICON_NAME)
return designer_material_icon(BECImageWidget.ICON_NAME)
def includeFile(self):
return "image"
return "bec_image_widget"
def initialize(self, form_editor):
self._form_editor = form_editor
@@ -45,10 +49,10 @@ class ImagePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
return self._form_editor is not None
def name(self):
return "Image"
return "BECImageWidget"
def toolTip(self):
return "Image"
return "BECImageWidget"
def whatsThis(self):
return self.toolTip()

View File

@@ -0,0 +1,515 @@
from __future__ import annotations
import sys
from typing import Literal, Optional
import pyqtgraph as pg
from bec_lib.device import ReadoutPriority
from qtpy.QtWidgets import QComboBox, QVBoxLayout, QWidget
from bec_widgets.qt_utils.error_popups import SafeSlot, WarningPopupUtility
from bec_widgets.qt_utils.settings_dialog import SettingsDialog
from bec_widgets.qt_utils.toolbar import (
DeviceSelectionAction,
MaterialIconAction,
ModularToolBar,
SeparatorAction,
WidgetAction,
)
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.containers.figure import BECFigure
from bec_widgets.widgets.containers.figure.plots.axis_settings import AxisSettings
from bec_widgets.widgets.containers.figure.plots.image.image import ImageConfig
from bec_widgets.widgets.containers.figure.plots.image.image_item import BECImageItem
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
class BECImageWidget(BECWidget, QWidget):
PLUGIN = True
ICON_NAME = "image"
USER_ACCESS = [
"image",
"set",
"set_title",
"set_x_label",
"set_y_label",
"set_x_scale",
"set_y_scale",
"set_x_lim",
"set_y_lim",
"set_vrange",
"set_fft",
"set_transpose",
"set_rotation",
"set_log",
"set_grid",
"enable_fps_monitor",
"lock_aspect_ratio",
]
def __init__(
self,
parent: QWidget | None = None,
config: ImageConfig | dict = None,
client=None,
gui_id: str | None = None,
**kwargs,
) -> None:
if config is None:
config = ImageConfig(widget_class=self.__class__.__name__)
else:
if isinstance(config, dict):
config = ImageConfig(**config)
super().__init__(client=client, gui_id=gui_id, **kwargs)
QWidget.__init__(self, parent)
self.layout = QVBoxLayout(self)
self.layout.setSpacing(0)
self.layout.setContentsMargins(0, 0, 0, 0)
self.fig = BECFigure()
self.dim_combo_box = QComboBox()
self.dim_combo_box.addItems(["1d", "2d"])
self.toolbar = ModularToolBar(
actions={
"monitor": DeviceSelectionAction(
"Monitor:",
DeviceComboBox(
device_filter=BECDeviceFilter.DEVICE,
readout_priority_filter=[ReadoutPriority.ASYNC],
),
),
"monitor_type": WidgetAction(widget=self.dim_combo_box),
"connect": MaterialIconAction(icon_name="link", tooltip="Connect Device"),
"separator_0": SeparatorAction(),
"save": MaterialIconAction(icon_name="save", tooltip="Open Export Dialog"),
"separator_1": SeparatorAction(),
"drag_mode": MaterialIconAction(
icon_name="open_with", tooltip="Drag Mouse Mode", checkable=True
),
"rectangle_mode": MaterialIconAction(
icon_name="frame_inspect", tooltip="Rectangle Zoom Mode", checkable=True
),
"auto_range": MaterialIconAction(
icon_name="open_in_full", tooltip="Autorange Plot"
),
"auto_range_image": MaterialIconAction(
icon_name="hdr_auto", tooltip="Autorange Image Intensity", checkable=True
),
"aspect_ratio": MaterialIconAction(
icon_name="aspect_ratio", tooltip="Lock image aspect ratio", checkable=True
),
"separator_2": SeparatorAction(),
"FFT": MaterialIconAction(icon_name="fft", tooltip="Toggle FFT", checkable=True),
"log": MaterialIconAction(
icon_name="log_scale", tooltip="Toggle log scale", checkable=True
),
"transpose": MaterialIconAction(
icon_name="transform", tooltip="Transpose Image", checkable=True
),
"rotate_right": MaterialIconAction(
icon_name="rotate_right", tooltip="Rotate image clockwise by 90 deg"
),
"rotate_left": MaterialIconAction(
icon_name="rotate_left", tooltip="Rotate image counterclockwise by 90 deg"
),
"reset": MaterialIconAction(
icon_name="reset_settings", tooltip="Reset Image Settings"
),
"separator_3": SeparatorAction(),
"fps_monitor": MaterialIconAction(
icon_name="speed", tooltip="Show FPS Monitor", checkable=True
),
"axis_settings": MaterialIconAction(
icon_name="settings", tooltip="Open Configuration Dialog"
),
},
target_widget=self,
)
self.layout.addWidget(self.toolbar)
self.layout.addWidget(self.fig)
self.warning_util = WarningPopupUtility(self)
self._image = self.fig.image()
self._image.apply_config(config)
self.rotation = 0
self.config = config
self._hook_actions()
self.toolbar.widgets["drag_mode"].action.setChecked(True)
self.toolbar.widgets["auto_range_image"].action.setChecked(True)
def _hook_actions(self):
self.toolbar.widgets["connect"].action.triggered.connect(self._connect_action)
# sepatator
self.toolbar.widgets["save"].action.triggered.connect(self.export)
# sepatator
self.toolbar.widgets["drag_mode"].action.triggered.connect(self.enable_mouse_pan_mode)
self.toolbar.widgets["rectangle_mode"].action.triggered.connect(
self.enable_mouse_rectangle_mode
)
self.toolbar.widgets["auto_range"].action.triggered.connect(self.toggle_auto_range)
self.toolbar.widgets["auto_range_image"].action.triggered.connect(
self.toggle_image_autorange
)
self.toolbar.widgets["aspect_ratio"].action.triggered.connect(self.toggle_aspect_ratio)
# sepatator
self.toolbar.widgets["FFT"].action.triggered.connect(self.toggle_fft)
self.toolbar.widgets["log"].action.triggered.connect(self.toggle_log)
self.toolbar.widgets["transpose"].action.triggered.connect(self.toggle_transpose)
self.toolbar.widgets["rotate_left"].action.triggered.connect(self.rotate_left)
self.toolbar.widgets["rotate_right"].action.triggered.connect(self.rotate_right)
self.toolbar.widgets["reset"].action.triggered.connect(self.reset_settings)
# sepatator
self.toolbar.widgets["axis_settings"].action.triggered.connect(self.show_axis_settings)
self.toolbar.widgets["fps_monitor"].action.toggled.connect(self.enable_fps_monitor)
###################################
# Dialog Windows
###################################
@SafeSlot(popup_error=True)
def _connect_action(self):
monitor_combo = self.toolbar.widgets["monitor"].device_combobox
monitor_name = monitor_combo.currentText()
monitor_type = self.toolbar.widgets["monitor_type"].widget.currentText()
self.image(monitor=monitor_name, monitor_type=monitor_type)
monitor_combo.setStyleSheet("QComboBox { background-color: " "; }")
def show_axis_settings(self):
dialog = SettingsDialog(
self,
settings_widget=AxisSettings(),
window_title="Axis Settings",
config=self._config_dict["axis"],
)
dialog.exec()
###################################
# User Access Methods from image
###################################
@SafeSlot(popup_error=True)
def image(
self,
monitor: str,
monitor_type: Optional[Literal["1d", "2d"]] = "2d",
color_map: Optional[str] = "magma",
color_bar: Optional[Literal["simple", "full"]] = "full",
downsample: Optional[bool] = True,
opacity: Optional[float] = 1.0,
vrange: Optional[tuple[int, int]] = None,
# post_processing: Optional[PostProcessingConfig] = None,
**kwargs,
) -> BECImageItem:
if self.toolbar.widgets["monitor"].device_combobox.currentText() != monitor:
self.toolbar.widgets["monitor"].device_combobox.setCurrentText(monitor)
self.toolbar.widgets["monitor"].device_combobox.setStyleSheet(
"QComboBox {{ background-color: " "; }}"
)
if self.toolbar.widgets["monitor_type"].widget.currentText() != monitor_type:
self.toolbar.widgets["monitor_type"].widget.setCurrentText(monitor_type)
self.toolbar.widgets["monitor_type"].widget.setStyleSheet(
"QComboBox {{ background-color: " "; }}"
)
return self._image.image(
monitor=monitor,
monitor_type=monitor_type,
color_map=color_map,
color_bar=color_bar,
downsample=downsample,
opacity=opacity,
vrange=vrange,
**kwargs,
)
def set_vrange(self, vmin: float, vmax: float, name: str = None):
"""
Set the range of the color bar.
If name is not specified, then set vrange for all images.
Args:
vmin(float): Minimum value of the color bar.
vmax(float): Maximum value of the color bar.
name(str): The name of the image. If None, apply to all images.
"""
self._image.set_vrange(vmin, vmax, name)
def set_color_map(self, color_map: str, name: str = None):
"""
Set the color map of the image.
If name is not specified, then set color map for all images.
Args:
cmap(str): The color map of the image.
name(str): The name of the image. If None, apply to all images.
"""
self._image.set_color_map(color_map, name)
def set_fft(self, enable: bool = False, name: str = None):
"""
Set the FFT of the image.
If name is not specified, then set FFT for all images.
Args:
enable(bool): Whether to perform FFT on the monitor data.
name(str): The name of the image. If None, apply to all images.
"""
self._image.set_fft(enable, name)
self.toolbar.widgets["FFT"].action.setChecked(enable)
def set_transpose(self, enable: bool = False, name: str = None):
"""
Set the transpose of the image.
If name is not specified, then set transpose for all images.
Args:
enable(bool): Whether to transpose the monitor data before displaying.
name(str): The name of the image. If None, apply to all images.
"""
self._image.set_transpose(enable, name)
self.toolbar.widgets["transpose"].action.setChecked(enable)
def set_rotation(self, deg_90: int = 0, name: str = None):
"""
Set the rotation of the image.
If name is not specified, then set rotation for all images.
Args:
deg_90(int): The rotation angle of the monitor data before displaying.
name(str): The name of the image. If None, apply to all images.
"""
self._image.set_rotation(deg_90, name)
def set_log(self, enable: bool = False, name: str = None):
"""
Set the log of the image.
If name is not specified, then set log for all images.
Args:
enable(bool): Whether to perform log on the monitor data.
name(str): The name of the image. If None, apply to all images.
"""
self._image.set_log(enable, name)
self.toolbar.widgets["log"].action.setChecked(enable)
###################################
# User Access Methods from Plotbase
###################################
def set(self, **kwargs):
"""
Set the properties of the plot widget.
Args:
**kwargs: Keyword arguments for the properties to be set.
Possible properties:
- title: str
- x_label: str
- y_label: str
- x_scale: Literal["linear", "log"]
- y_scale: Literal["linear", "log"]
- x_lim: tuple
- y_lim: tuple
- legend_label_size: int
"""
self._image.set(**kwargs)
def set_title(self, title: str):
"""
Set the title of the plot widget.
Args:
title(str): Title of the plot.
"""
self._image.set_title(title)
def set_x_label(self, x_label: str):
"""
Set the x-axis label of the plot widget.
Args:
x_label(str): Label of the x-axis.
"""
self._image.set_x_label(x_label)
def set_y_label(self, y_label: str):
"""
Set the y-axis label of the plot widget.
Args:
y_label(str): Label of the y-axis.
"""
self._image.set_y_label(y_label)
def set_x_scale(self, x_scale: Literal["linear", "log"]):
"""
Set the scale of the x-axis of the plot widget.
Args:
x_scale(Literal["linear", "log"]): Scale of the x-axis.
"""
self._image.set_x_scale(x_scale)
def set_y_scale(self, y_scale: Literal["linear", "log"]):
"""
Set the scale of the y-axis of the plot widget.
Args:
y_scale(Literal["linear", "log"]): Scale of the y-axis.
"""
self._image.set_y_scale(y_scale)
def set_x_lim(self, x_lim: tuple):
"""
Set the limits of the x-axis of the plot widget.
Args:
x_lim(tuple): Limits of the x-axis.
"""
self._image.set_x_lim(x_lim)
def set_y_lim(self, y_lim: tuple):
"""
Set the limits of the y-axis of the plot widget.
Args:
y_lim(tuple): Limits of the y-axis.
"""
self._image.set_y_lim(y_lim)
def set_grid(self, x_grid: bool, y_grid: bool):
"""
Set the grid visibility of the plot widget.
Args:
x_grid(bool): Visibility of the x-axis grid.
y_grid(bool): Visibility of the y-axis grid.
"""
self._image.set_grid(x_grid, y_grid)
def lock_aspect_ratio(self, lock: bool):
"""
Lock the aspect ratio of the plot widget.
Args:
lock(bool): Lock the aspect ratio.
"""
self._image.lock_aspect_ratio(lock)
###################################
# Toolbar Actions
###################################
@SafeSlot()
def toggle_auto_range(self):
"""
Set the auto range of the plot widget from the toolbar.
"""
self._image.set_auto_range(True, "xy")
@SafeSlot()
def toggle_fft(self):
checked = self.toolbar.widgets["FFT"].action.isChecked()
self.set_fft(checked)
@SafeSlot()
def toggle_log(self):
checked = self.toolbar.widgets["log"].action.isChecked()
self.set_log(checked)
@SafeSlot()
def toggle_transpose(self):
checked = self.toolbar.widgets["transpose"].action.isChecked()
self.set_transpose(checked)
@SafeSlot()
def rotate_left(self):
self.rotation = (self.rotation + 1) % 4
self.set_rotation(self.rotation)
@SafeSlot()
def rotate_right(self):
self.rotation = (self.rotation - 1) % 4
self.set_rotation(self.rotation)
@SafeSlot()
def reset_settings(self):
self.set_log(False)
self.set_fft(False)
self.set_transpose(False)
self.rotation = 0
self.set_rotation(0)
self.toolbar.widgets["FFT"].action.setChecked(False)
self.toolbar.widgets["log"].action.setChecked(False)
self.toolbar.widgets["transpose"].action.setChecked(False)
@SafeSlot()
def toggle_image_autorange(self):
"""
Enable the auto range of the image intensity.
"""
checked = self.toolbar.widgets["auto_range_image"].action.isChecked()
self._image.set_autorange(checked)
@SafeSlot()
def toggle_aspect_ratio(self):
"""
Enable the auto range of the image intensity.
"""
checked = self.toolbar.widgets["aspect_ratio"].action.isChecked()
self._image.lock_aspect_ratio(checked)
@SafeSlot()
def enable_mouse_rectangle_mode(self):
self.toolbar.widgets["rectangle_mode"].action.setChecked(True)
self.toolbar.widgets["drag_mode"].action.setChecked(False)
self._image.plot_item.getViewBox().setMouseMode(pg.ViewBox.RectMode)
@SafeSlot()
def enable_mouse_pan_mode(self):
self.toolbar.widgets["drag_mode"].action.setChecked(True)
self.toolbar.widgets["rectangle_mode"].action.setChecked(False)
self._image.plot_item.getViewBox().setMouseMode(pg.ViewBox.PanMode)
@SafeSlot()
def enable_fps_monitor(self, enabled: bool):
"""
Enable the FPS monitor of the plot widget.
Args:
enabled(bool): If True, enable the FPS monitor.
"""
self._image.enable_fps_monitor(enabled)
if self.toolbar.widgets["fps_monitor"].action.isChecked() != enabled:
self.toolbar.widgets["fps_monitor"].action.setChecked(enabled)
def export(self):
"""
Show the export dialog for the plot widget.
"""
self._image.export()
def cleanup(self):
self.fig.cleanup()
self.toolbar.close()
self.toolbar.deleteLater()
return super().cleanup()
def main(): # pragma: no cover
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
widget = BECImageWidget()
widget.image("waveform", "1d")
widget.show()
sys.exit(app.exec_())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -6,9 +6,9 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.plots_next_gen.image.image_plugin import ImagePlugin
from bec_widgets.widgets.plots.image.bec_image_widget_plugin import BECImageWidgetPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(ImagePlugin())
QPyDesignerCustomWidgetCollection.addCustomWidget(BECImageWidgetPlugin())
if __name__ == "__main__": # pragma: no cover

View File

@@ -1,943 +0,0 @@
from __future__ import annotations
from typing import Literal
import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from pydantic import Field, ValidationError, field_validator
from qtpy.QtCore import QPointF, Signal
from qtpy.QtWidgets import QWidget
from bec_widgets.qt_utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.qt_utils.toolbar import MaterialIconAction, SwitchableToolBarAction
from bec_widgets.utils import ConnectionConfig
from bec_widgets.utils.colors import Colors
from bec_widgets.widgets.plots_next_gen.image.image_item import ImageItem
from bec_widgets.widgets.plots_next_gen.image.toolbar_bundles.image_selection import (
MonitorSelectionToolbarBundle,
)
from bec_widgets.widgets.plots_next_gen.image.toolbar_bundles.processing import (
ImageProcessingToolbarBundle,
)
from bec_widgets.widgets.plots_next_gen.plot_base import PlotBase
logger = bec_logger.logger
# noinspection PyDataclass
class ImageConfig(ConnectionConfig):
color_map: str = Field(
"magma", description="The colormap of the figure widget.", validate_default=True
)
color_bar: Literal["full", "simple"] | None = Field(
None, description="The type of the color bar."
)
lock_aspect_ratio: bool = Field(
False, description="Whether to lock the aspect ratio of the image."
)
model_config: dict = {"validate_assignment": True}
_validate_color_map = field_validator("color_map")(Colors.validate_color_map)
class Image(PlotBase):
PLUGIN = True
RPC = True
ICON_NAME = "image"
USER_ACCESS = [
# General PlotBase Settings
"enable_toolbar",
"enable_toolbar.setter",
"enable_side_panel",
"enable_side_panel.setter",
"enable_fps_monitor",
"enable_fps_monitor.setter",
"set",
"title",
"title.setter",
"x_label",
"x_label.setter",
"y_label",
"y_label.setter",
"x_limits",
"x_limits.setter",
"y_limits",
"y_limits.setter",
"x_grid",
"x_grid.setter",
"y_grid",
"y_grid.setter",
"inner_axes",
"inner_axes.setter",
"outer_axes",
"outer_axes.setter",
"auto_range_x",
"auto_range_x.setter",
"auto_range_y",
"auto_range_y.setter",
"x_log",
"x_log.setter",
"y_log",
"y_log.setter",
"legend_label_size",
"legend_label_size.setter",
# ImageView Specific Settings
"color_map",
"color_map.setter",
"vrange",
"vrange.setter",
"v_min",
"v_min.setter",
"v_max",
"v_max.setter",
"lock_aspect_ratio",
"lock_aspect_ratio.setter",
"autorange",
"autorange.setter",
"autorange_mode",
"autorange_mode.setter",
"monitor",
"monitor.setter",
"enable_colorbar",
"enable_simple_colorbar",
"enable_simple_colorbar.setter",
"enable_full_colorbar",
"enable_full_colorbar.setter",
"fft",
"fft.setter",
"log",
"log.setter",
"rotation",
"rotation.setter",
"transpose",
"transpose.setter",
"image",
"main_image",
]
sync_colorbar_with_autorange = Signal()
def __init__(
self,
parent: QWidget | None = None,
config: ImageConfig | None = None,
client=None,
gui_id: str | None = None,
popups: bool = True,
**kwargs,
):
self._main_image = ImageItem(parent_image=self)
self._color_bar = None
if config is None:
config = ImageConfig(widget_class=self.__class__.__name__)
super().__init__(
parent=parent, config=config, client=client, gui_id=gui_id, popups=popups, **kwargs
)
# For PropertyManager identification
self.setObjectName("Image")
self.plot_item.addItem(self._main_image)
self.scan_id = None
# Default Color map to magma
self.color_map = "magma"
################################################################################
# Widget Specific GUI interactions
################################################################################
def _init_toolbar(self):
# add to the first position
self.selection_bundle = MonitorSelectionToolbarBundle(
bundle_id="selection", target_widget=self
)
self.toolbar.add_bundle(self.selection_bundle, self)
super()._init_toolbar()
# Image specific changes to PlotBase toolbar
self.toolbar.widgets["reset_legend"].action.setVisible(False)
# Lock aspect ratio button
self.lock_aspect_ratio_action = MaterialIconAction(
icon_name="aspect_ratio", tooltip="Lock Aspect Ratio", checkable=True, parent=self
)
self.toolbar.add_action_to_bundle(
bundle_id="mouse_interaction",
action_id="lock_aspect_ratio",
action=self.lock_aspect_ratio_action,
target_widget=self,
)
self.lock_aspect_ratio_action.action.toggled.connect(
lambda checked: self.setProperty("lock_aspect_ratio", checked)
)
self.lock_aspect_ratio_action.action.setChecked(True)
self._init_autorange_action()
self._init_colorbar_action()
# Processing Bundle
self.processing_bundle = ImageProcessingToolbarBundle(
bundle_id="processing", target_widget=self
)
self.toolbar.add_bundle(self.processing_bundle, target_widget=self)
def _init_autorange_action(self):
self.autorange_mean_action = MaterialIconAction(
icon_name="hdr_auto", tooltip="Enable Auto Range (Mean)", checkable=True, parent=self
)
self.autorange_max_action = MaterialIconAction(
icon_name="hdr_auto",
tooltip="Enable Auto Range (Max)",
checkable=True,
filled=True,
parent=self,
)
self.autorange_switch = SwitchableToolBarAction(
actions={
"auto_range_mean": self.autorange_mean_action,
"auto_range_max": self.autorange_max_action,
},
initial_action="auto_range_mean",
tooltip="Enable Auto Range",
checkable=True,
parent=self,
)
self.toolbar.add_action_to_bundle(
bundle_id="roi",
action_id="autorange_image",
action=self.autorange_switch,
target_widget=self,
)
self.autorange_mean_action.action.toggled.connect(
lambda checked: self.toggle_autorange(checked, mode="mean")
)
self.autorange_max_action.action.toggled.connect(
lambda checked: self.toggle_autorange(checked, mode="max")
)
self.autorange = True
self.autorange_mode = "mean"
def _init_colorbar_action(self):
self.full_colorbar_action = MaterialIconAction(
icon_name="edgesensor_low", tooltip="Enable Full Colorbar", checkable=True, parent=self
)
self.simple_colorbar_action = MaterialIconAction(
icon_name="smartphone", tooltip="Enable Simple Colorbar", checkable=True, parent=self
)
self.colorbar_switch = SwitchableToolBarAction(
actions={
"full_colorbar": self.full_colorbar_action,
"simple_colorbar": self.simple_colorbar_action,
},
initial_action="full_colorbar",
tooltip="Enable Full Colorbar",
checkable=True,
parent=self,
)
self.toolbar.add_action_to_bundle(
bundle_id="roi",
action_id="switch_colorbar",
action=self.colorbar_switch,
target_widget=self,
)
self.simple_colorbar_action.action.toggled.connect(
lambda checked: self.enable_colorbar(checked, style="simple")
)
self.full_colorbar_action.action.toggled.connect(
lambda checked: self.enable_colorbar(checked, style="full")
)
def enable_colorbar(
self,
enabled: bool,
style: Literal["full", "simple"] = "full",
vrange: tuple[int, int] | None = None,
):
"""
Enable the colorbar and switch types of colorbars.
Args:
enabled(bool): Whether to enable the colorbar.
style(Literal["full", "simple"]): The type of colorbar to enable.
vrange(tuple): The range of values to use for the colorbar.
"""
autorange_state = self._main_image.autorange
if enabled:
if self._color_bar:
if self.config.color_bar == "full":
self.cleanup_histogram_lut_item(self._color_bar)
self.plot_widget.removeItem(self._color_bar)
self._color_bar = None
if style == "simple":
self._color_bar = pg.ColorBarItem(colorMap=self.config.color_map)
self._color_bar.setImageItem(self._main_image)
self._color_bar.sigLevelsChangeFinished.connect(
lambda: self.setProperty("autorange", False)
)
elif style == "full":
self._color_bar = pg.HistogramLUTItem()
self._color_bar.setImageItem(self._main_image)
self._color_bar.gradient.loadPreset(self.config.color_map)
self._color_bar.sigLevelsChanged.connect(
lambda: self.setProperty("autorange", False)
)
self.plot_widget.addItem(self._color_bar, row=0, col=1)
self.config.color_bar = style
else:
if self._color_bar:
self.plot_widget.removeItem(self._color_bar)
self._color_bar = None
self.config.color_bar = None
self.autorange = autorange_state
self._sync_colorbar_actions()
if vrange: # should be at the end to disable the autorange if defined
self.v_range = vrange
################################################################################
# Widget Specific Properties
################################################################################
################################################################################
# Colorbar toggle
@SafeProperty(bool)
def enable_simple_colorbar(self) -> bool:
"""
Enable the simple colorbar.
"""
enabled = False
if self.config.color_bar == "simple":
enabled = True
return enabled
@enable_simple_colorbar.setter
def enable_simple_colorbar(self, value: bool):
"""
Enable the simple colorbar.
Args:
value(bool): Whether to enable the simple colorbar.
"""
self.enable_colorbar(enabled=value, style="simple")
@SafeProperty(bool)
def enable_full_colorbar(self) -> bool:
"""
Enable the full colorbar.
"""
enabled = False
if self.config.color_bar == "full":
enabled = True
return enabled
@enable_full_colorbar.setter
def enable_full_colorbar(self, value: bool):
"""
Enable the full colorbar.
Args:
value(bool): Whether to enable the full colorbar.
"""
self.enable_colorbar(enabled=value, style="full")
################################################################################
# Appearance
@SafeProperty(str)
def color_map(self) -> str:
"""
Set the color map of the image.
"""
return self.config.color_map
@color_map.setter
def color_map(self, value: str):
"""
Set the color map of the image.
Args:
value(str): The color map to set.
"""
try:
self.config.color_map = value
self._main_image.color_map = value
if self._color_bar:
if self.config.color_bar == "simple":
self._color_bar.setColorMap(value)
elif self.config.color_bar == "full":
self._color_bar.gradient.loadPreset(value)
except ValidationError:
return
# v_range is for designer, vrange is for RPC
@SafeProperty("QPointF")
def v_range(self) -> QPointF:
"""
Set the v_range of the main image.
"""
vmin, vmax = self._main_image.v_range
return QPointF(vmin, vmax)
@v_range.setter
def v_range(self, value: tuple | list | QPointF):
"""
Set the v_range of the main image.
Args:
value(tuple | list | QPointF): The range of values to set.
"""
if isinstance(value, (tuple, list)):
value = self._tuple_to_qpointf(value)
vmin, vmax = value.x(), value.y()
self._main_image.v_range = (vmin, vmax)
# propagate to colorbar if exists
if self._color_bar:
if self.config.color_bar == "simple":
self._color_bar.setLevels(low=vmin, high=vmax)
elif self.config.color_bar == "full":
self._color_bar.setLevels(min=vmin, max=vmax)
self._color_bar.setHistogramRange(vmin - 0.1 * vmin, vmax + 0.1 * vmax)
self.autorange_switch.set_state_all(False)
@property
def vrange(self) -> tuple:
"""
Get the vrange of the image.
"""
return (self.v_range.x(), self.v_range.y())
@vrange.setter
def vrange(self, value):
"""
Set the vrange of the image.
Args:
value(tuple):
"""
self.v_range = value
@property
def v_min(self) -> float:
"""
Get the minimum value of the v_range.
"""
return self.v_range.x()
@v_min.setter
def v_min(self, value: float):
"""
Set the minimum value of the v_range.
Args:
value(float): The minimum value to set.
"""
self.v_range = (value, self.v_range.y())
@property
def v_max(self) -> float:
"""
Get the maximum value of the v_range.
"""
return self.v_range.y()
@v_max.setter
def v_max(self, value: float):
"""
Set the maximum value of the v_range.
Args:
value(float): The maximum value to set.
"""
self.v_range = (self.v_range.x(), value)
@SafeProperty(bool)
def lock_aspect_ratio(self) -> bool:
"""
Whether the aspect ratio is locked.
"""
return self.config.lock_aspect_ratio
@lock_aspect_ratio.setter
def lock_aspect_ratio(self, value: bool):
"""
Set the aspect ratio lock.
Args:
value(bool): Whether to lock the aspect ratio.
"""
self.config.lock_aspect_ratio = bool(value)
self.plot_item.setAspectLocked(value)
################################################################################
# Data Acquisition
@SafeProperty(str)
def monitor(self) -> str:
"""
The name of the monitor to use for the image.
"""
return self._main_image.config.monitor
@monitor.setter
def monitor(self, value: str):
"""
Set the monitor for the image.
Args:
value(str): The name of the monitor to set.
"""
if self._main_image.config.monitor == value:
return
try:
self.entry_validator.validate_monitor(value)
except ValueError:
return
self.image(monitor=value)
@property
def main_image(self) -> ImageItem:
"""Access the main image item."""
return self._main_image
################################################################################
# Autorange + Colorbar sync
@SafeProperty(bool)
def autorange(self) -> bool:
"""
Whether autorange is enabled.
"""
return self._main_image.autorange
@autorange.setter
def autorange(self, enabled: bool):
"""
Set autorange.
Args:
enabled(bool): Whether to enable autorange.
"""
self._main_image.autorange = enabled
if enabled and self._main_image.raw_data is not None:
self._main_image.apply_autorange()
self._sync_colorbar_levels()
self._sync_autorange_switch()
@SafeProperty(str)
def autorange_mode(self) -> str:
"""
Autorange mode.
Options:
- "max": Use the maximum value of the image for autoranging.
- "mean": Use the mean value of the image for autoranging.
"""
return self._main_image.autorange_mode
@autorange_mode.setter
def autorange_mode(self, mode: str):
"""
Set the autorange mode.
Args:
mode(str): The autorange mode. Options are "max" or "mean".
"""
# for qt Designer
if mode not in ["max", "mean"]:
return
self._main_image.autorange_mode = mode
self._sync_autorange_switch()
@SafeSlot(bool, str, bool)
def toggle_autorange(self, enabled: bool, mode: str):
"""
Toggle autorange.
Args:
enabled(bool): Whether to enable autorange.
mode(str): The autorange mode. Options are "max" or "mean".
"""
if self._main_image is not None:
self._main_image.autorange = enabled
self._main_image.autorange_mode = mode
if enabled:
self._main_image.apply_autorange()
self._sync_colorbar_levels()
def _sync_autorange_switch(self):
"""
Synchronize the autorange switch with the current autorange state and mode if changed from outside.
"""
self.autorange_switch.block_all_signals(True)
self.autorange_switch.set_default_action(f"auto_range_{self._main_image.autorange_mode}")
self.autorange_switch.set_state_all(self._main_image.autorange)
self.autorange_switch.block_all_signals(False)
def _sync_colorbar_levels(self):
"""Immediately propagate current levels to the active colorbar."""
vrange = self._main_image.v_range
if self._color_bar:
self._color_bar.blockSignals(True)
self.v_range = vrange
self._color_bar.blockSignals(False)
def _sync_colorbar_actions(self):
"""
Synchronize the colorbar actions with the current colorbar state.
"""
self.colorbar_switch.block_all_signals(True)
if self._color_bar is not None:
self.colorbar_switch.set_default_action(f"{self.config.color_bar}_colorbar")
self.colorbar_switch.set_state_all(True)
else:
self.colorbar_switch.set_state_all(False)
self.colorbar_switch.block_all_signals(False)
################################################################################
# Post Processing
################################################################################
@SafeProperty(bool)
def fft(self) -> bool:
"""
Whether FFT postprocessing is enabled.
"""
return self._main_image.fft
@fft.setter
def fft(self, enable: bool):
"""
Set FFT postprocessing.
Args:
enable(bool): Whether to enable FFT postprocessing.
"""
self._main_image.fft = enable
@SafeProperty(bool)
def log(self) -> bool:
"""
Whether logarithmic scaling is applied.
"""
return self._main_image.log
@log.setter
def log(self, enable: bool):
"""
Set logarithmic scaling.
Args:
enable(bool): Whether to enable logarithmic scaling.
"""
self._main_image.log = enable
@SafeProperty(int)
def rotation(self) -> int:
"""
The number of 90° rotations to apply.
"""
return self._main_image.rotation
@rotation.setter
def rotation(self, value: int):
"""
Set the number of 90° rotations to apply.
Args:
value(int): The number of 90° rotations to apply.
"""
self._main_image.rotation = value
@SafeProperty(bool)
def transpose(self) -> bool:
"""
Whether the image is transposed.
"""
return self._main_image.transpose
@transpose.setter
def transpose(self, enable: bool):
"""
Set the image to be transposed.
Args:
enable(bool): Whether to enable transposing the image.
"""
self._main_image.transpose = enable
################################################################################
# High Level methods for API
################################################################################
@SafeSlot(popup_error=True)
def image(
self,
monitor: str | None = None,
monitor_type: Literal["auto", "1d", "2d"] = "auto",
color_map: str | None = None,
color_bar: Literal["simple", "full"] | None = None,
vrange: tuple[int, int] | None = None,
) -> ImageItem:
"""
Set the image source and update the image.
Args:
monitor(str): The name of the monitor to use for the image.
monitor_type(str): The type of monitor to use. Options are "1d", "2d", or "auto".
color_map(str): The color map to use for the image.
color_bar(str): The type of color bar to use. Options are "simple" or "full".
vrange(tuple): The range of values to use for the color map.
Returns:
ImageItem: The image object.
"""
if self._main_image.config.monitor is not None:
self.disconnect_monitor(self._main_image.config.monitor)
self.entry_validator.validate_monitor(monitor)
self._main_image.config.monitor = monitor
if monitor_type == "1d":
self._main_image.config.source = "device_monitor_1d"
self._main_image.config.monitor_type = "1d"
elif monitor_type == "2d":
self._main_image.config.source = "device_monitor_2d"
self._main_image.config.monitor_type = "2d"
elif monitor_type == "auto":
self._main_image.config.source = "auto"
logger.warning(
f"Updates for '{monitor}' will be fetch from both 1D and 2D monitor endpoints."
)
self._main_image.config.monitor_type = "auto"
self.set_image_update(monitor=monitor, type=monitor_type)
if color_map is not None:
self._main_image.color_map = color_map
if color_bar is not None:
self.enable_colorbar(True, color_bar)
if vrange is not None:
self.vrange = vrange
self._sync_device_selection()
return self._main_image
def _sync_device_selection(self):
"""
Synchronize the device selection with the current monitor.
"""
if self._main_image.config.monitor is not None:
for combo in (
self.selection_bundle.device_combo_box,
self.selection_bundle.dim_combo_box,
):
combo.blockSignals(True)
self.selection_bundle.device_combo_box.set_device(self._main_image.config.monitor)
self.selection_bundle.dim_combo_box.setCurrentText(self._main_image.config.monitor_type)
for combo in (
self.selection_bundle.device_combo_box,
self.selection_bundle.dim_combo_box,
):
combo.blockSignals(False)
################################################################################
# Image Update Methods
################################################################################
########################################
# Connections
def set_image_update(self, monitor: str, type: Literal["1d", "2d", "auto"]):
"""
Set the image update method for the given monitor.
Args:
monitor(str): The name of the monitor to use for the image.
type(str): The type of monitor to use. Options are "1d", "2d", or "auto".
"""
# TODO consider moving connecting and disconnecting logic to Image itself if multiple images
if type == "1d":
self.bec_dispatcher.connect_slot(
self.on_image_update_1d, MessageEndpoints.device_monitor_1d(monitor)
)
elif type == "2d":
self.bec_dispatcher.connect_slot(
self.on_image_update_2d, MessageEndpoints.device_monitor_2d(monitor)
)
elif type == "auto":
self.bec_dispatcher.connect_slot(
self.on_image_update_1d, MessageEndpoints.device_monitor_1d(monitor)
)
self.bec_dispatcher.connect_slot(
self.on_image_update_2d, MessageEndpoints.device_monitor_2d(monitor)
)
print(f"Connected to {monitor} with type {type}")
self._main_image.config.monitor = monitor
def disconnect_monitor(self, monitor: str):
"""
Disconnect the monitor from the image update signals, both 1D and 2D.
Args:
monitor(str): The name of the monitor to disconnect.
"""
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d, MessageEndpoints.device_monitor_1d(monitor)
)
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d, MessageEndpoints.device_monitor_2d(monitor)
)
self._main_image.config.monitor = None
########################################
# 1D updates
@SafeSlot(dict, dict)
def on_image_update_1d(self, msg: dict, metadata: dict):
"""
Update the image with 1D data.
Args:
msg(dict): The message containing the data.
metadata(dict): The metadata associated with the message.
"""
data = msg["data"]
current_scan_id = metadata.get("scan_id", None)
if current_scan_id is None:
return
if current_scan_id != self.scan_id:
self.scan_id = current_scan_id
self._main_image.clear()
self._main_image.buffer = []
self._main_image.max_len = 0
image_buffer = self.adjust_image_buffer(self._main_image, data)
if self._color_bar is not None:
self._color_bar.blockSignals(True)
self._main_image.set_data(image_buffer)
if self._color_bar is not None:
self._color_bar.blockSignals(False)
def adjust_image_buffer(self, image: ImageItem, new_data: np.ndarray) -> np.ndarray:
"""
Adjusts the image buffer to accommodate the new data, ensuring that all rows have the same length.
Args:
image: The image object (used to store a buffer list and max_len).
new_data (np.ndarray): The new incoming 1D waveform data.
Returns:
np.ndarray: The updated image buffer with adjusted shapes.
"""
new_len = new_data.shape[0]
if not hasattr(image, "buffer"):
image.buffer = []
image.max_len = 0
if new_len > image.max_len:
image.max_len = new_len
for i in range(len(image.buffer)):
wf = image.buffer[i]
pad_width = image.max_len - wf.shape[0]
if pad_width > 0:
image.buffer[i] = np.pad(wf, (0, pad_width), mode="constant", constant_values=0)
image.buffer.append(new_data)
else:
pad_width = image.max_len - new_len
if pad_width > 0:
new_data = np.pad(new_data, (0, pad_width), mode="constant", constant_values=0)
image.buffer.append(new_data)
image_buffer = np.array(image.buffer)
return image_buffer
########################################
# 2D updates
def on_image_update_2d(self, msg: dict, metadata: dict):
"""
Update the image with 2D data.
Args:
msg(dict): The message containing the data.
metadata(dict): The metadata associated with the message.
"""
data = msg["data"]
if self._color_bar is not None:
self._color_bar.blockSignals(True)
self._main_image.set_data(data)
if self._color_bar is not None:
self._color_bar.blockSignals(False)
################################################################################
# Clean up
################################################################################
@staticmethod
def cleanup_histogram_lut_item(histogram_lut_item: pg.HistogramLUTItem):
"""
Clean up HistogramLUTItem safely, including open ViewBox menus and child widgets.
Args:
histogram_lut_item(pg.HistogramLUTItem): The HistogramLUTItem to clean up.
"""
histogram_lut_item.vb.menu.close()
histogram_lut_item.vb.menu.deleteLater()
histogram_lut_item.gradient.menu.close()
histogram_lut_item.gradient.menu.deleteLater()
histogram_lut_item.gradient.colorDialog.close()
histogram_lut_item.gradient.colorDialog.deleteLater()
def cleanup(self):
"""
Disconnect the image update signals and clean up the image.
"""
if self._main_image.config.monitor is not None:
self.disconnect_monitor(self._main_image.config.monitor)
self._main_image.config.monitor = None
if self._color_bar:
if self.config.color_bar == "full":
self.cleanup_histogram_lut_item(self._color_bar)
if self.config.color_bar == "simple":
self.plot_widget.removeItem(self._color_bar)
self._color_bar.deleteLater()
self._color_bar = None
super().cleanup()
if __name__ == "__main__": # pragma: no cover
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
widget = Image(popups=True)
widget.show()
widget.resize(1000, 800)
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['image.py']}

View File

@@ -1,260 +0,0 @@
from __future__ import annotations
from typing import Literal, Optional
import numpy as np
import pyqtgraph as pg
from bec_lib.logger import bec_logger
from pydantic import Field, ValidationError, field_validator
from qtpy.QtCore import Signal
from bec_widgets.utils import BECConnector, Colors, ConnectionConfig
from bec_widgets.widgets.plots_next_gen.image.image_processor import (
ImageProcessor,
ImageStats,
ProcessingConfig,
)
logger = bec_logger.logger
# noinspection PyDataclass
class ImageItemConfig(ConnectionConfig): # TODO review config
parent_id: str | None = Field(None, description="The parent plot of the image.")
monitor: str | None = Field(None, description="The name of the monitor.")
monitor_type: Literal["1d", "2d", "auto"] = Field("auto", description="The type of monitor.")
source: str | None = Field(None, description="The source of the curve.")
color_map: str | None = Field("magma", description="The color map of the image.")
downsample: bool | None = Field(True, description="Whether to downsample the image.")
opacity: float | None = Field(1.0, description="The opacity of the image.")
v_range: tuple[float | int, float | int] | None = Field(
None, description="The range of the color bar. If None, the range is automatically set."
)
autorange: bool | None = Field(True, description="Whether to autorange the color bar.")
autorange_mode: Literal["max", "mean"] = Field(
"mean", description="Whether to use the mean of the image for autoscaling."
)
processing: ProcessingConfig = Field(
default_factory=ProcessingConfig, description="The post processing of the image."
)
model_config: dict = {"validate_assignment": True}
_validate_color_map = field_validator("color_map")(Colors.validate_color_map)
class ImageItem(BECConnector, pg.ImageItem):
RPC = True
USER_ACCESS = [
"color_map",
"color_map.setter",
"v_range",
"v_range.setter",
"v_min",
"v_min.setter",
"v_max",
"v_max.setter",
"autorange",
"autorange.setter",
"autorange_mode",
"autorange_mode.setter",
"fft",
"fft.setter",
"log",
"log.setter",
"rotation",
"rotation.setter",
"transpose",
"transpose.setter",
]
vRangeChangedManually = Signal(tuple)
def __init__(
self,
config: Optional[ImageItemConfig] = None,
gui_id: Optional[str] = None,
parent_image=None,
**kwargs,
):
if config is None:
config = ImageItemConfig(widget_class=self.__class__.__name__)
self.config = config
else:
self.config = config
super().__init__(config=config, gui_id=gui_id)
pg.ImageItem.__init__(self)
self.parent_image = parent_image
self.raw_data = None
self.buffer = []
self.max_len = 0
# Image processor will handle any setting of data
self._image_processor = ImageProcessor(config=self.config.processing)
def set_data(self, data: np.ndarray):
self.raw_data = data
self._process_image()
################################################################################
# Properties
@property
def color_map(self) -> str:
"""Get the current color map."""
return self.config.color_map
@color_map.setter
def color_map(self, value: str):
"""Set a new color map."""
try:
self.config.color_map = value
self.setColorMap(value)
except ValidationError:
logger.error(f"Invalid colormap '{value}' provided.")
@property
def v_range(self) -> tuple[float, float]:
"""
Get the color intensity range of the image.
"""
if self.levels is not None:
return tuple(float(x) for x in self.levels)
return 0.0, 1.0
@v_range.setter
def v_range(self, vrange: tuple[float, float]):
"""
Set the color intensity range of the image.
"""
self.set_v_range(vrange, disable_autorange=True)
def set_v_range(self, vrange: tuple[float, float], disable_autorange=True):
if disable_autorange:
self.config.autorange = False
self.vRangeChangedManually.emit(vrange)
self.setLevels(vrange)
self.config.v_range = vrange
@property
def v_min(self) -> float:
return self.v_range[0]
@v_min.setter
def v_min(self, value: float):
self.v_range = (value, self.v_range[1])
@property
def v_max(self) -> float:
return self.v_range[1]
@v_max.setter
def v_max(self, value: float):
self.v_range = (self.v_range[0], value)
################################################################################
# Autorange Logic
@property
def autorange(self) -> bool:
return self.config.autorange
@autorange.setter
def autorange(self, value: bool):
self.config.autorange = value
if value:
self.apply_autorange()
@property
def autorange_mode(self) -> Literal["max", "mean"]:
return self.config.autorange_mode
@autorange_mode.setter
def autorange_mode(self, mode: Literal["max", "mean"]):
self.config.autorange_mode = mode
if self.autorange:
self.apply_autorange()
def apply_autorange(self):
if self.raw_data is None:
return
data = self.image
if data is None:
data = self.raw_data
stats = ImageStats.from_data(data)
self.auto_update_vrange(stats)
def auto_update_vrange(self, stats: ImageStats) -> None:
"""Update the v_range based on the stats of the image."""
fumble_factor = 2
if self.config.autorange_mode == "mean":
vmin = max(stats.mean - fumble_factor * stats.std, 0)
vmax = stats.mean + fumble_factor * stats.std
elif self.config.autorange_mode == "max":
vmin, vmax = stats.minimum, stats.maximum
else:
return
self.set_v_range(vrange=(vmin, vmax), disable_autorange=False)
################################################################################
# Data Processing Logic
def _process_image(self):
"""
Reprocess the current raw data and update the image display.
"""
if self.raw_data is not None:
autorange = self.config.autorange
self._image_processor.set_config(self.config.processing)
processed_data = self._image_processor.process_image(self.raw_data)
self.setImage(processed_data, autoLevels=False)
self.autorange = autorange
@property
def fft(self) -> bool:
"""Get or set whether FFT postprocessing is enabled."""
return self.config.processing.fft
@fft.setter
def fft(self, enable: bool):
self.config.processing.fft = enable
self._process_image()
@property
def log(self) -> bool:
"""Get or set whether logarithmic scaling is applied."""
return self.config.processing.log
@log.setter
def log(self, enable: bool):
self.config.processing.log = enable
self._process_image()
@property
def rotation(self) -> Optional[int]:
"""Get or set the number of 90° rotations to apply."""
return self.config.processing.rotation
@rotation.setter
def rotation(self, value: Optional[int]):
self.config.processing.rotation = value
self._process_image()
@property
def transpose(self) -> bool:
"""Get or set whether the image is transposed."""
return self.config.processing.transpose
@transpose.setter
def transpose(self, enable: bool):
self.config.processing.transpose = enable
self._process_image()
################################################################################
# Data Update Logic
def clear(self):
super().clear()
self.raw_data = None
self.buffer = []
self.max_len = 0

View File

@@ -1,150 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
from pydantic import BaseModel, Field
from qtpy.QtCore import QObject, Signal
@dataclass
class ImageStats:
"""Container to store stats of an image."""
maximum: float
minimum: float
mean: float
std: float
@classmethod
def from_data(cls, data: np.ndarray) -> ImageStats:
"""
Get the statistics of the image data.
Args:
data(np.ndarray): The image data.
Returns:
ImageStats: The statistics of the image data.
"""
return cls(maximum=np.max(data), minimum=np.min(data), mean=np.mean(data), std=np.std(data))
# noinspection PyDataclass
class ProcessingConfig(BaseModel):
fft: bool = Field(False, description="Whether to perform FFT on the monitor data.")
log: bool = Field(False, description="Whether to perform log on the monitor data.")
transpose: bool = Field(
False, description="Whether to transpose the monitor data before displaying."
)
rotation: int = Field(
0, description="The rotation angle of the monitor data before displaying."
)
stats: ImageStats = Field(
ImageStats(maximum=0, minimum=0, mean=0, std=0),
description="The statistics of the image data.",
)
model_config: dict = {"validate_assignment": True}
class ImageProcessor(QObject):
"""
Class for processing the image data.
"""
image_processed = Signal(np.ndarray)
def __init__(self, parent=None, config: ProcessingConfig = None):
super().__init__(parent=parent)
if config is None:
config = ProcessingConfig()
self.config = config
self._current_thread = None
def set_config(self, config: ProcessingConfig):
"""
Set the configuration of the processor.
Args:
config(ProcessingConfig): The configuration of the processor.
"""
self.config = config
def FFT(self, data: np.ndarray) -> np.ndarray:
"""
Perform FFT on the data.
Args:
data(np.ndarray): The data to be processed.
Returns:
np.ndarray: The processed data.
"""
return np.abs(np.fft.fftshift(np.fft.fft2(data)))
def rotation(self, data: np.ndarray, rotate_90: int) -> np.ndarray:
"""
Rotate the data by 90 degrees n times.
Args:
data(np.ndarray): The data to be processed.
rotate_90(int): The number of 90 degree rotations.
Returns:
np.ndarray: The processed data.
"""
return np.rot90(data, k=rotate_90, axes=(0, 1))
def transpose(self, data: np.ndarray) -> np.ndarray:
"""
Transpose the data.
Args:
data(np.ndarray): The data to be processed.
Returns:
np.ndarray: The processed data.
"""
return np.transpose(data)
def log(self, data: np.ndarray) -> np.ndarray:
"""
Perform log on the data.
Args:
data(np.ndarray): The data to be processed.
Returns:
np.ndarray: The processed data.
"""
# TODO this is not final solution -> data should stay as int16
data = data.astype(np.float32)
offset = 1e-6
data_offset = data + offset
return np.log10(data_offset)
def update_image_stats(self, data: np.ndarray) -> None:
"""Get the statistics of the image data.
Args:
data(np.ndarray): The image data.
"""
self.config.stats.maximum = np.max(data)
self.config.stats.minimum = np.min(data)
self.config.stats.mean = np.mean(data)
self.config.stats.std = np.std(data)
def process_image(self, data: np.ndarray) -> np.ndarray:
"""Core processing logic without threading overhead."""
if self.config.fft:
data = self.FFT(data)
if self.config.rotation is not None:
data = self.rotation(data, self.config.rotation)
if self.config.transpose:
data = self.transpose(data)
if self.config.log:
data = self.log(data)
self.update_image_stats(data)
return data

View File

@@ -1,57 +0,0 @@
from bec_lib.device import ReadoutPriority
from qtpy.QtCore import Qt
from qtpy.QtWidgets import QComboBox, QStyledItemDelegate
from bec_widgets.qt_utils.error_popups import SafeSlot
from bec_widgets.qt_utils.toolbar import ToolbarBundle, WidgetAction
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
class NoCheckDelegate(QStyledItemDelegate):
"""To reduce space in combo boxes by removing the checkmark."""
def initStyleOption(self, option, index):
super().initStyleOption(option, index)
# Remove any check indicator
option.checkState = Qt.Unchecked
class MonitorSelectionToolbarBundle(ToolbarBundle):
"""
A bundle of actions for a toolbar that controls monitor selection on a plot.
"""
def __init__(self, bundle_id="device_selection", target_widget=None, **kwargs):
super().__init__(bundle_id=bundle_id, actions=[], **kwargs)
self.target_widget = target_widget
# 1) Device combo box
self.device_combo_box = DeviceComboBox(
device_filter=BECDeviceFilter.DEVICE, readout_priority_filter=[ReadoutPriority.ASYNC]
)
self.device_combo_box.addItem("", None)
self.device_combo_box.setCurrentText("")
self.device_combo_box.setToolTip("Select Device")
self.device_combo_box.setItemDelegate(NoCheckDelegate(self.device_combo_box))
self.add_action("monitor", WidgetAction(widget=self.device_combo_box, adjust_size=True))
# 2) Dimension combo box
self.dim_combo_box = QComboBox()
self.dim_combo_box.addItems(["auto", "1d", "2d"])
self.dim_combo_box.setCurrentText("auto")
self.dim_combo_box.setToolTip("Monitor Dimension")
self.dim_combo_box.setFixedWidth(60)
self.dim_combo_box.setItemDelegate(NoCheckDelegate(self.dim_combo_box))
self.add_action("dim_combo", WidgetAction(widget=self.dim_combo_box, adjust_size=True))
# Connect slots, a device will be connected upon change of any combobox
self.device_combo_box.currentTextChanged.connect(lambda: self.connect_monitor())
self.dim_combo_box.currentTextChanged.connect(lambda: self.connect_monitor())
@SafeSlot()
def connect_monitor(self):
dim = self.dim_combo_box.currentText()
self.target_widget.image(monitor=self.device_combo_box.currentText(), monitor_type=dim)

View File

@@ -1,79 +0,0 @@
from bec_widgets.qt_utils.error_popups import SafeSlot
from bec_widgets.qt_utils.toolbar import MaterialIconAction, ToolbarBundle
class ImageProcessingToolbarBundle(ToolbarBundle):
"""
A bundle of actions for a toolbar that controls processing of monitor.
"""
def __init__(self, bundle_id="mouse_interaction", target_widget=None, **kwargs):
super().__init__(bundle_id=bundle_id, actions=[], **kwargs)
self.target_widget = target_widget
self.fft = MaterialIconAction(icon_name="fft", tooltip="Toggle FFT", checkable=True)
self.log = MaterialIconAction(icon_name="log_scale", tooltip="Toggle Log", checkable=True)
self.transpose = MaterialIconAction(
icon_name="transform", tooltip="Transpose Image", checkable=True
)
self.right = MaterialIconAction(
icon_name="rotate_right", tooltip="Rotate image clockwise by 90 deg"
)
self.left = MaterialIconAction(
icon_name="rotate_left", tooltip="Rotate image counterclockwise by 90 deg"
)
self.reset = MaterialIconAction(icon_name="reset_settings", tooltip="Reset Image Settings")
self.add_action("fft", self.fft)
self.add_action("log", self.log)
self.add_action("transpose", self.transpose)
self.add_action("rotate_right", self.right)
self.add_action("rotate_left", self.left)
self.add_action("reset", self.reset)
self.fft.action.triggered.connect(self.toggle_fft)
self.log.action.triggered.connect(self.toggle_log)
self.transpose.action.triggered.connect(self.toggle_transpose)
self.right.action.triggered.connect(self.rotate_right)
self.left.action.triggered.connect(self.rotate_left)
self.reset.action.triggered.connect(self.reset_settings)
@SafeSlot()
def toggle_fft(self):
checked = self.fft.action.isChecked()
self.target_widget.fft = checked
@SafeSlot()
def toggle_log(self):
checked = self.log.action.isChecked()
self.target_widget.log = checked
@SafeSlot()
def toggle_transpose(self):
checked = self.transpose.action.isChecked()
self.target_widget.transpose = checked
@SafeSlot()
def rotate_right(self):
if self.target_widget.rotation is None:
return
rotation = (self.target_widget.rotation - 1) % 4
self.target_widget.rotation = rotation
@SafeSlot()
def rotate_left(self):
if self.target_widget.rotation is None:
return
rotation = (self.target_widget.rotation + 1) % 4
self.target_widget.rotation = rotation
@SafeSlot()
def reset_settings(self):
self.target_widget.fft = False
self.target_widget.log = False
self.target_widget.transpose = False
self.target_widget.rotation = 0
self.fft.action.setChecked(False)
self.log.action.setChecked(False)
self.transpose.action.setChecked(False)

View File

@@ -112,8 +112,6 @@ class PlotBase(BECWidget, QWidget):
self.fps_label = QLabel(alignment=Qt.AlignmentFlag.AlignRight)
self._user_x_label = ""
self._x_label_suffix = ""
self._user_y_label = ""
self._y_label_suffix = ""
self._init_ui()
@@ -323,7 +321,22 @@ class PlotBase(BECWidget, QWidget):
Args:
value(bool): The value to set.
"""
self.toolbar.setVisible(value)
if value:
# Disable popup mode
if self._popups:
# Directly update the internal flag to avoid recursion
self._popups = False
# Hide the popup bundle if it exists and close any open dialogs
if self.popup_bundle is not None:
for action in self.toolbar.bundles["popup_bundle"].actions:
action.setVisible(False)
if self.axis_settings_dialog is not None and self.axis_settings_dialog.isVisible():
self.axis_settings_dialog.close()
self.side_panel.show()
# Add side menus if not already added
self.add_side_menus()
else:
self.side_panel.hide()
@SafeProperty(bool, doc="Enable the FPS monitor.")
def enable_fps_monitor(self) -> bool:
@@ -478,7 +491,7 @@ class PlotBase(BECWidget, QWidget):
"""
The set label for the y-axis.
"""
return self._user_y_label
return self.plot_item.getAxis("left").labelText
@y_label.setter
def y_label(self, value: str):
@@ -487,39 +500,9 @@ class PlotBase(BECWidget, QWidget):
Args:
value(str): The label to set.
"""
self._user_y_label = value
self._apply_y_label()
self.plot_item.setLabel("left", text=value)
self.property_changed.emit("y_label", value)
@property
def y_label_suffix(self) -> str:
"""
A read-only suffix automatically appended to the y label.
"""
return self._y_label_suffix
def set_y_label_suffix(self, suffix: str):
"""
Public method to update the y label suffix.
"""
self._y_label_suffix = suffix
self._apply_y_label()
@property
def y_label_combined(self) -> str:
"""
The final y label shown on the axis = user portion + suffix.
"""
return self._user_y_label + self._y_label_suffix
def _apply_y_label(self):
"""
Actually updates the pyqtgraph y axis label text to
the combined y label. Called whenever y label or suffix changes.
"""
final_label = self.y_label_combined
self.plot_item.setLabel("left", text=final_label)
def _tuple_to_qpointf(self, tuple: tuple | list):
"""
Helper function to convert a tuple to a QPointF.
@@ -961,10 +944,9 @@ class PlotBase(BECWidget, QWidget):
self.cleanup_pyqtgraph()
super().cleanup()
def cleanup_pyqtgraph(self, item: pg.PlotItem | None = None):
def cleanup_pyqtgraph(self):
"""Cleanup pyqtgraph items."""
if item is None:
item = self.plot_item
item = self.plot_item
item.vb.menu.close()
item.vb.menu.deleteLater()
item.ctrlMenu.close()

View File

@@ -1,17 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.plots_next_gen.scatter_waveform.scatter_waveform_plugin import (
ScatterWaveformPlugin,
)
QPyDesignerCustomWidgetCollection.addCustomWidget(ScatterWaveformPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -1,194 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger
from pydantic import BaseModel, Field, ValidationError, field_validator
from qtpy import QtCore
from bec_widgets.utils import BECConnector, Colors, ConnectionConfig
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.widgets.plots_next_gen.scatter_waveform.scatter_waveform import ScatterWaveform
logger = bec_logger.logger
# noinspection PyDataclass
class ScatterDeviceSignal(BaseModel):
"""The configuration of a signal in the scatter waveform widget."""
name: str
entry: str
model_config: dict = {"validate_assignment": True}
# noinspection PyDataclass
class ScatterCurveConfig(ConnectionConfig):
parent_id: str | None = Field(None, description="The parent plot of the curve.")
label: str | None = Field(None, description="The label of the curve.")
color: str | tuple = Field("#808080", description="The color of the curve.")
symbol: str | None = Field("o", description="The symbol of the curve.")
symbol_size: int | None = Field(7, description="The size of the symbol of the curve.")
pen_width: int | None = Field(4, description="The width of the pen of the curve.")
pen_style: Literal["solid", "dash", "dot", "dashdot"] = Field(
"solid", description="The style of the pen of the curve."
)
color_map: str | None = Field(
"magma", description="The color palette of the figure widget.", validate_default=True
)
x_device: ScatterDeviceSignal | None = Field(
None, description="The x device signal of the scatter waveform."
)
y_device: ScatterDeviceSignal | None = Field(
None, description="The y device signal of the scatter waveform."
)
z_device: ScatterDeviceSignal | None = Field(
None, description="The z device signal of the scatter waveform."
)
model_config: dict = {"validate_assignment": True}
_validate_color_palette = field_validator("color_map")(Colors.validate_color_map)
class ScatterCurve(BECConnector, pg.PlotDataItem):
"""Scatter curve item for the scatter waveform widget."""
USER_ACCESS = ["color_map"]
def __init__(
self,
parent_item: ScatterWaveform,
name: str | None = None,
config: ScatterCurveConfig | None = None,
gui_id: str | None = None,
**kwargs,
):
if config is None:
config = ScatterCurveConfig(
label=name,
widget_class=self.__class__.__name__,
parent_id=parent_item.config.gui_id,
)
self.config = config
else:
self.config = config
name = config.label
super().__init__(config=config, gui_id=gui_id)
pg.PlotDataItem.__init__(self, name=name)
self.parent_item = parent_item
self.data_z = None # color scaling needs to be cashed for changing colormap
self.apply_config()
def apply_config(self, config: dict | ScatterCurveConfig | None = None, **kwargs) -> None:
"""
Apply the configuration to the curve.
Args:
config(dict|ScatterCurveConfig, optional): The configuration to apply.
"""
if config is not None:
if isinstance(config, dict):
config = ScatterCurveConfig(**config)
self.config = config
pen_style_map = {
"solid": QtCore.Qt.SolidLine,
"dash": QtCore.Qt.DashLine,
"dot": QtCore.Qt.DotLine,
"dashdot": QtCore.Qt.DashDotLine,
}
pen_style = pen_style_map.get(self.config.pen_style, QtCore.Qt.SolidLine)
pen = pg.mkPen(color=self.config.color, width=self.config.pen_width, style=pen_style)
self.setPen(pen)
if self.config.symbol:
self.setSymbolSize(self.config.symbol_size)
self.setSymbol(self.config.symbol)
@property
def color_map(self) -> str:
"""The color map of the scatter curve."""
return self.config.color_map
@color_map.setter
def color_map(self, value: str):
"""
Set the color map of the scatter curve.
Args:
value(str): The color map to set.
"""
try:
if value != self.config.color_map:
self.config.color_map = value
self.refresh_color_map(value)
except ValidationError:
return
def set_data(
self,
x: list[float] | np.ndarray,
y: list[float] | np.ndarray,
z: list[float] | np.ndarray,
color_map: str | None = None,
):
"""
Set the data of the scatter curve.
Args:
x (list[float] | np.ndarray): The x data of the scatter curve.
y (list[float] | np.ndarray): The y data of the scatter curve.
z (list[float] | np.ndarray): The z data of the scatter curve.
color_map (str | None): The color map of the scatter curve.
"""
if color_map is None:
color_map = self.config.color_map
self.data_z = z
color_z = self._make_z_gradient(z, color_map)
try:
self.setData(x=x, y=y, symbolBrush=color_z)
except TypeError:
logger.error("Error in setData, one of the data arrays is None")
def _make_z_gradient(self, data_z: list | np.ndarray, colormap: str) -> list | None:
"""
Make a gradient color for the z values.
Args:
data_z(list|np.ndarray): Z values.
colormap(str): Colormap for the gradient color.
Returns:
list: List of colors for the z values.
"""
# Normalize z_values for color mapping
z_min, z_max = np.min(data_z), np.max(data_z)
if z_max != z_min: # Ensure that there is a range in the z values
z_values_norm = (data_z - z_min) / (z_max - z_min)
colormap = pg.colormap.get(colormap) # using colormap from global settings
colors = [colormap.map(z, mode="qcolor") for z in z_values_norm]
return colors
else:
return None
def refresh_color_map(self, color_map: str):
"""
Refresh the color map of the scatter curve.
Args:
color_map(str): The color map to use.
"""
x_data, y_data = self.getData()
if x_data is None or y_data is None:
return
if self.data_z is not None:
self.set_data(x_data, y_data, self.data_z, color_map)

View File

@@ -1,522 +0,0 @@
from __future__ import annotations
import json
import pyqtgraph as pg
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from pydantic import Field, ValidationError, field_validator
from qtpy.QtCore import QTimer, Signal
from qtpy.QtWidgets import QHBoxLayout, QMainWindow, QWidget
from bec_widgets.qt_utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.qt_utils.settings_dialog import SettingsDialog
from bec_widgets.qt_utils.toolbar import MaterialIconAction
from bec_widgets.utils import Colors, ConnectionConfig
from bec_widgets.utils.colors import set_theme
from bec_widgets.widgets.plots_next_gen.plot_base import PlotBase
from bec_widgets.widgets.plots_next_gen.scatter_waveform.scatter_curve import (
ScatterCurve,
ScatterCurveConfig,
ScatterDeviceSignal,
)
from bec_widgets.widgets.plots_next_gen.scatter_waveform.settings.scatter_curve_setting import (
ScatterCurveSettings,
)
logger = bec_logger.logger
# noinspection PyDataclass
class ScatterWaveformConfig(ConnectionConfig):
color_map: str | None = Field(
"magma",
description="The color map of the z scaling of scatter waveform.",
validate_default=True,
)
model_config: dict = {"validate_assignment": True}
_validate_color_palette = field_validator("color_map")(Colors.validate_color_map)
class ScatterWaveform(PlotBase):
PLUGIN = True
RPC = True
ICON_NAME = "scatter_plot"
USER_ACCESS = [
# General PlotBase Settings
"enable_toolbar",
"enable_toolbar.setter",
"enable_side_panel",
"enable_side_panel.setter",
"enable_fps_monitor",
"enable_fps_monitor.setter",
"set",
"title",
"title.setter",
"x_label",
"x_label.setter",
"y_label",
"y_label.setter",
"x_limits",
"x_limits.setter",
"y_limits",
"y_limits.setter",
"x_grid",
"x_grid.setter",
"y_grid",
"y_grid.setter",
"inner_axes",
"inner_axes.setter",
"outer_axes",
"outer_axes.setter",
"lock_aspect_ratio",
"lock_aspect_ratio.setter",
"auto_range_x",
"auto_range_x.setter",
"auto_range_y",
"auto_range_y.setter",
"x_log",
"x_log.setter",
"y_log",
"y_log.setter",
"legend_label_size",
"legend_label_size.setter",
# Scatter Waveform Specific RPC Access
"main_curve",
"color_map",
"color_map.setter",
"plot",
"update_with_scan_history",
"clear_all",
]
sync_signal_update = Signal()
new_scan = Signal()
new_scan_id = Signal(str)
scatter_waveform_property_changed = Signal()
def __init__(
self,
parent: QWidget | None = None,
config: ScatterWaveformConfig | None = None,
client=None,
gui_id: str | None = None,
popups: bool = True,
**kwargs,
):
if config is None:
config = ScatterWaveformConfig(widget_class=self.__class__.__name__)
super().__init__(
parent=parent, config=config, client=client, gui_id=gui_id, popups=popups, **kwargs
)
self._main_curve = ScatterCurve(parent_item=self)
# For PropertyManager identification
self.setObjectName("ScatterWaveform")
# Specific GUI elements
self.scatter_dialog = None
# Scan Data
self.old_scan_id = None
self.scan_id = None
self.scan_item = None
# Scan status update loop
self.bec_dispatcher.connect_slot(self.on_scan_status, MessageEndpoints.scan_status())
self.bec_dispatcher.connect_slot(self.on_scan_progress, MessageEndpoints.scan_progress())
# Curve update loop
self.proxy_update_sync = pg.SignalProxy(
self.sync_signal_update, rateLimit=25, slot=self.update_sync_curves
)
self._init_scatter_curve_settings()
self.update_with_scan_history(-1)
################################################################################
# Widget Specific GUI interactions
################################################################################
def _init_scatter_curve_settings(self):
"""
Initialize the scatter curve settings menu.
"""
scatter_curve_settings = ScatterCurveSettings(target_widget=self, popup=False)
self.side_panel.add_menu(
action_id="scatter_curve",
icon_name="scatter_plot",
tooltip="Show Scatter Curve Settings",
widget=scatter_curve_settings,
title="Scatter Curve Settings",
)
def add_popups(self):
"""
Add popups to the ScatterWaveform widget.
"""
super().add_popups()
scatter_curve_setting_action = MaterialIconAction(
icon_name="scatter_plot",
tooltip="Show Scatter Curve Settings",
checkable=True,
parent=self,
)
self.toolbar.add_action_to_bundle(
bundle_id="popup_bundle",
action_id="scatter_waveform_settings",
action=scatter_curve_setting_action,
target_widget=self,
)
self.toolbar.widgets["scatter_waveform_settings"].action.triggered.connect(
self.show_scatter_curve_settings
)
def show_scatter_curve_settings(self):
"""
Show the scatter curve settings dialog.
"""
scatter_settings_action = self.toolbar.widgets["scatter_waveform_settings"].action
if self.scatter_dialog is None or not self.scatter_dialog.isVisible():
scatter_settings = ScatterCurveSettings(target_widget=self, popup=True)
self.scatter_dialog = SettingsDialog(
self,
settings_widget=scatter_settings,
window_title="Scatter Curve Settings",
modal=False,
)
self.scatter_dialog.resize(620, 200)
# When the dialog is closed, update the toolbar icon and clear the reference
self.scatter_dialog.finished.connect(self._scatter_dialog_closed)
self.scatter_dialog.show()
scatter_settings_action.setChecked(True)
else:
# If already open, bring it to the front
self.scatter_dialog.raise_()
self.scatter_dialog.activateWindow()
scatter_settings_action.setChecked(True) # keep it toggled
def _scatter_dialog_closed(self):
"""
Slot for when the scatter curve settings dialog is closed.
"""
self.scatter_dialog = None
self.toolbar.widgets["scatter_waveform_settings"].action.setChecked(False)
################################################################################
# Widget Specific Properties
################################################################################
@property
def main_curve(self) -> ScatterCurve:
"""The main scatter curve item."""
return self._main_curve
@SafeProperty(str)
def color_map(self) -> str:
"""The color map of the scatter waveform."""
return self.config.color_map
@color_map.setter
def color_map(self, value: str):
"""
Set the color map of the scatter waveform.
Args:
value(str): The color map to set.
"""
try:
self.config.color_map = value
self.main_curve.color_map = value
self.scatter_waveform_property_changed.emit()
except ValidationError:
return
@SafeProperty(str, designable=False, popup_error=True)
def curve_json(self) -> str:
"""
Get the curve configuration as a JSON string.
"""
return json.dumps(self.main_curve.config.model_dump(), indent=2)
@curve_json.setter
def curve_json(self, value: str):
"""
Set the curve configuration from a JSON string.
Args:
value(str): The JSON string to set the curve configuration from.
"""
try:
config = ScatterCurveConfig(**json.loads(value))
self._add_main_scatter_curve(config)
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON: {e}")
################################################################################
# High Level methods for API
################################################################################
@SafeSlot(popup_error=True)
def plot(
self,
x_name: str,
y_name: str,
z_name: str,
x_entry: None | str = None,
y_entry: None | str = None,
z_entry: None | str = None,
color_map: str | None = "magma",
label: str | None = None,
validate_bec: bool = True,
) -> ScatterCurve:
"""
Plot the data from the device signals.
Args:
x_name (str): The name of the x device signal.
y_name (str): The name of the y device signal.
z_name (str): The name of the z device signal.
x_entry (None | str): The x entry of the device signal.
y_entry (None | str): The y entry of the device signal.
z_entry (None | str): The z entry of the device signal.
color_map (str | None): The color map of the scatter waveform.
label (str | None): The label of the curve.
validate_bec (bool): Whether to validate the device signals with current BEC instance.
Returns:
ScatterCurve: The scatter curve object.
"""
if validate_bec:
x_entry = self.entry_validator.validate_signal(x_name, x_entry)
y_entry = self.entry_validator.validate_signal(y_name, y_entry)
z_entry = self.entry_validator.validate_signal(z_name, z_entry)
if color_map is not None:
try:
self.config.color_map = color_map
except ValidationError:
raise ValueError(
f"Invalid color map '{color_map}'. Using previously defined color map '{self.config.color_map}'."
)
if label is None:
label = f"{z_name}-{z_entry}"
config = ScatterCurveConfig(
parent_id=self.gui_id,
label=label,
color_map=color_map,
x_device=ScatterDeviceSignal(name=x_name, entry=x_entry),
y_device=ScatterDeviceSignal(name=y_name, entry=y_entry),
z_device=ScatterDeviceSignal(name=z_name, entry=z_entry),
)
# Add Curve
self._add_main_scatter_curve(config)
self.scatter_waveform_property_changed.emit()
return self._main_curve
def _add_main_scatter_curve(self, config: ScatterCurveConfig):
"""
Add the main scatter curve to the plot.
Args:
config(ScatterCurveConfig): The configuration of the scatter curve.
"""
# Apply suffix for axes
self.set_x_label_suffix(f"[{config.x_device.name}-{config.x_device.name}]")
self.set_y_label_suffix(f"[{config.y_device.name}-{config.y_device.name}]")
# To have only one main curve
if self._main_curve is not None:
self.plot_item.removeItem(self._main_curve)
self._main_curve = None
self._main_curve = ScatterCurve(
parent_item=self, config=config, gui_id=self.gui_id, name=config.label
)
self.plot_item.addItem(self._main_curve)
self.sync_signal_update.emit()
################################################################################
# BEC Update Methods
################################################################################
@SafeSlot(dict, dict)
def on_scan_status(self, msg: dict, meta: dict):
"""
Initial scan status message handler, which is triggered at the begging and end of scan.
Used for triggering the update of the sync and async curves.
Args:
msg(dict): The message content.
meta(dict): The message metadata.
"""
current_scan_id = msg.get("scan_id", None)
if current_scan_id is None:
return
if current_scan_id != self.scan_id:
self.reset()
self.new_scan.emit()
self.new_scan_id.emit(current_scan_id)
self.auto_range_x = True
self.auto_range_y = True
self.old_scan_id = self.scan_id
self.scan_id = current_scan_id
self.scan_item = self.queue.scan_storage.find_scan_by_ID(self.scan_id)
# First trigger to update the scan curves
self.sync_signal_update.emit()
@SafeSlot(dict, dict)
def on_scan_progress(self, msg: dict, meta: dict):
"""
Slot for handling scan progress messages. Used for triggering the update of the sync curves.
Args:
msg(dict): The message content.
meta(dict): The message metadata.
"""
self.sync_signal_update.emit()
status = msg.get("done")
if status:
QTimer.singleShot(100, self.update_sync_curves)
QTimer.singleShot(300, self.update_sync_curves)
@SafeSlot()
def update_sync_curves(self, _=None):
"""
Update the scan curves with the data from the scan segment.
"""
if self.scan_item is None:
logger.info("No scan executed so far; skipping device curves categorisation.")
return "none"
data, access_key = self._fetch_scan_data_and_access()
if data == "none":
logger.info("No scan executed so far; skipping device curves categorisation.")
return "none"
try:
x_name = self._main_curve.config.x_device.name
x_entry = self._main_curve.config.x_device.entry
y_name = self._main_curve.config.y_device.name
y_entry = self._main_curve.config.y_device.entry
z_name = self._main_curve.config.z_device.name
z_entry = self._main_curve.config.z_device.entry
except AttributeError:
return
if access_key == "val":
x_data = data.get(x_name, {}).get(x_entry, {}).get(access_key, None)
y_data = data.get(y_name, {}).get(y_entry, {}).get(access_key, None)
z_data = data.get(z_name, {}).get(z_entry, {}).get(access_key, None)
else:
x_data = data.get(x_name, {}).get(x_entry, {}).read().get("value", None)
y_data = data.get(y_name, {}).get(y_entry, {}).read().get("value", None)
z_data = data.get(z_name, {}).get(z_entry, {}).read().get("value", None)
self._main_curve.set_data(x=x_data, y=y_data, z=z_data)
def _fetch_scan_data_and_access(self):
"""
Decide whether the widget is in live or historical mode
and return the appropriate data dict and access key.
Returns:
data_dict (dict): The data structure for the current scan.
access_key (str): Either 'val' (live) or 'value' (history).
"""
if self.scan_item is None:
# Optionally fetch the latest from history if nothing is set
self.update_with_scan_history(-1)
if self.scan_item is None:
logger.info("No scan executed so far; skipping device curves categorisation.")
return "none", "none"
if hasattr(self.scan_item, "live_data"):
# Live scan
return self.scan_item.live_data, "val"
else:
# Historical
scan_devices = self.scan_item.devices
return scan_devices, "value"
@SafeSlot(int)
@SafeSlot(str)
@SafeSlot()
def update_with_scan_history(self, scan_index: int = None, scan_id: str = None):
"""
Update the scan curves with the data from the scan storage.
Provide only one of scan_id or scan_index.
Args:
scan_id(str, optional): ScanID of the scan to be updated. Defaults to None.
scan_index(int, optional): Index of the scan to be updated. Defaults to None.
"""
if scan_index is not None and scan_id is not None:
raise ValueError("Only one of scan_id or scan_index can be provided.")
if scan_index is None and scan_id is None:
logger.warning(f"Neither scan_id or scan_number was provided, fetching the latest scan")
scan_index = -1
if scan_index is not None:
if len(self.client.history) == 0:
logger.info("No scans executed so far. Skipping scan history update.")
return
self.scan_item = self.client.history[scan_index]
metadata = self.scan_item.metadata
self.scan_id = metadata["bec"]["scan_id"]
else:
self.scan_id = scan_id
self.scan_item = self.client.history.get_by_scan_id(scan_id)
self.sync_signal_update.emit()
################################################################################
# Cleanup
################################################################################
@SafeSlot()
def clear_all(self):
"""
Clear all the curves from the plot.
"""
if self.crosshair is not None:
self.crosshair.clear_markers()
self._main_curve.clear()
class DemoApp(QMainWindow): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Waveform Demo")
self.resize(800, 600)
self.main_widget = QWidget()
self.layout = QHBoxLayout(self.main_widget)
self.setCentralWidget(self.main_widget)
self.waveform_popup = ScatterWaveform(popups=True)
self.waveform_popup.plot("samx", "samy", "bpm4i")
self.waveform_side = ScatterWaveform(popups=False)
self.waveform_popup.plot("samx", "samy", "bpm3a")
self.layout.addWidget(self.waveform_side)
self.layout.addWidget(self.waveform_popup)
if __name__ == "__main__": # pragma: no cover
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
set_theme("dark")
widget = DemoApp()
widget.show()
widget.resize(1400, 600)
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['scatter_waveform.py']}

View File

@@ -1,54 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.plots_next_gen.scatter_waveform.scatter_waveform import ScatterWaveform
DOM_XML = """
<ui language='c++'>
<widget class='ScatterWaveform' name='scatter_waveform'>
</widget>
</ui>
"""
class ScatterWaveformPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = ScatterWaveform(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "Plot Widgets Next Gen"
def icon(self):
return designer_material_icon(ScatterWaveform.ICON_NAME)
def includeFile(self):
return "scatter_waveform"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "ScatterWaveform"
def toolTip(self):
return "ScatterWaveform"
def whatsThis(self):
return self.toolTip()

View File

@@ -1,125 +0,0 @@
import os
from qtpy.QtWidgets import QFrame, QScrollArea, QVBoxLayout
from bec_widgets.qt_utils.error_popups import SafeSlot
from bec_widgets.qt_utils.settings_dialog import SettingWidget
from bec_widgets.utils import UILoader
class ScatterCurveSettings(SettingWidget):
def __init__(self, parent=None, target_widget=None, popup=False, *args, **kwargs):
super().__init__(parent=parent, *args, **kwargs)
# This is a settings widget that depends on the target widget
# and should mirror what is in the target widget.
# Saving settings for this widget could result in recursively setting the target widget.
self.setProperty("skip_settings", True)
self.setObjectName("ScatterCurveSettings")
current_path = os.path.dirname(__file__)
if popup:
form = UILoader().load_ui(
os.path.join(current_path, "scatter_curve_settings_horizontal.ui"), self
)
else:
form = UILoader().load_ui(
os.path.join(current_path, "scatter_curve_settings_vertical.ui"), self
)
self.target_widget = target_widget
self.popup = popup
# # Scroll area
self.scroll_area = QScrollArea(self)
self.scroll_area.setWidgetResizable(True)
self.scroll_area.setFrameShape(QFrame.NoFrame)
self.scroll_area.setWidget(form)
self.layout = QVBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
self.layout.addWidget(self.scroll_area)
self.ui = form
self.fetch_all_properties()
self.target_widget.scatter_waveform_property_changed.connect(self.fetch_all_properties)
if popup is False:
self.ui.button_apply.clicked.connect(self.accept_changes)
@SafeSlot()
def fetch_all_properties(self):
"""
Fetch all properties from the target widget and update the settings widget.
"""
if not self.target_widget:
return
# Get properties from the target widget
color_map = getattr(self.target_widget, "color_map", None)
# Default values for device properties
x_name, x_entry = None, None
y_name, y_entry = None, None
z_name, z_entry = None, None
# Safely access device properties
if hasattr(self.target_widget, "main_curve") and self.target_widget.main_curve:
if hasattr(self.target_widget.main_curve, "config"):
config = self.target_widget.main_curve.config
if hasattr(config, "x_device") and config.x_device:
x_name = getattr(config.x_device, "name", None)
x_entry = getattr(config.x_device, "entry", None)
if hasattr(config, "y_device") and config.y_device:
y_name = getattr(config.y_device, "name", None)
y_entry = getattr(config.y_device, "entry", None)
if hasattr(config, "z_device") and config.z_device:
z_name = getattr(config.z_device, "name", None)
z_entry = getattr(config.z_device, "entry", None)
# Apply the properties to the settings widget
if hasattr(self.ui, "color_map"):
self.ui.color_map.colormap = color_map
if hasattr(self.ui, "x_name"):
self.ui.x_name.set_device(x_name)
if hasattr(self.ui, "x_entry") and x_entry is not None:
self.ui.x_entry.setText(x_entry)
if hasattr(self.ui, "y_name"):
self.ui.y_name.set_device(y_name)
if hasattr(self.ui, "y_entry") and y_entry is not None:
self.ui.y_entry.setText(y_entry)
if hasattr(self.ui, "z_name"):
self.ui.z_name.set_device(z_name)
if hasattr(self.ui, "z_entry") and z_entry is not None:
self.ui.z_entry.setText(z_entry)
@SafeSlot()
def accept_changes(self):
"""
Apply all properties from the settings widget to the target widget.
"""
x_name = self.ui.x_name.text()
x_entry = self.ui.x_entry.text()
y_name = self.ui.y_name.text()
y_entry = self.ui.y_entry.text()
z_name = self.ui.z_name.text()
z_entry = self.ui.z_entry.text()
validate_bec = self.ui.validate_bec.checked
color_map = self.ui.color_map.colormap
self.target_widget.plot(
x_name=x_name,
y_name=y_name,
z_name=z_name,
x_entry=x_entry,
y_entry=y_entry,
z_entry=z_entry,
color_map=color_map,
validate_bec=validate_bec,
)

View File

@@ -1,195 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>604</width>
<height>166</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<layout class="QHBoxLayout" name="horizontalLayout_2">
<item>
<widget class="QLabel" name="label_7">
<property name="text">
<string>Validate BEC</string>
</property>
</widget>
</item>
<item>
<widget class="ToggleSwitch" name="validate_bec"/>
</item>
<item>
<widget class="BECColorMapWidget" name="color_map"/>
</item>
</layout>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout">
<item>
<widget class="QGroupBox" name="groupBox">
<property name="title">
<string>X Device</string>
</property>
<layout class="QGridLayout" name="gridLayout">
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="text">
<string>Name</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="DeviceLineEdit" name="x_name"/>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>Signal</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="x_entry"/>
</item>
</layout>
</widget>
</item>
<item>
<widget class="QGroupBox" name="groupBox_2">
<property name="title">
<string>Y Device</string>
</property>
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="0">
<widget class="QLabel" name="label_3">
<property name="text">
<string>Name</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="DeviceLineEdit" name="y_name"/>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_4">
<property name="text">
<string>Signal</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="y_entry"/>
</item>
</layout>
</widget>
</item>
<item>
<widget class="QGroupBox" name="groupBox_3">
<property name="title">
<string>Z Device</string>
</property>
<layout class="QGridLayout" name="gridLayout_3">
<item row="0" column="0">
<widget class="QLabel" name="label_5">
<property name="text">
<string>Name</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_6">
<property name="text">
<string>Signal</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="z_entry"/>
</item>
<item row="0" column="1">
<widget class="DeviceLineEdit" name="z_name"/>
</item>
</layout>
</widget>
</item>
</layout>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>DeviceLineEdit</class>
<extends>QLineEdit</extends>
<header>device_line_edit</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
<customwidget>
<class>BECColorMapWidget</class>
<extends>QWidget</extends>
<header>bec_color_map_widget</header>
</customwidget>
</customwidgets>
<resources/>
<connections>
<connection>
<sender>x_name</sender>
<signal>textChanged(QString)</signal>
<receiver>x_entry</receiver>
<slot>clear()</slot>
<hints>
<hint type="sourcelabel">
<x>134</x>
<y>95</y>
</hint>
<hint type="destinationlabel">
<x>138</x>
<y>128</y>
</hint>
</hints>
</connection>
<connection>
<sender>y_name</sender>
<signal>textChanged(QString)</signal>
<receiver>y_entry</receiver>
<slot>clear()</slot>
<hints>
<hint type="sourcelabel">
<x>351</x>
<y>91</y>
</hint>
<hint type="destinationlabel">
<x>349</x>
<y>121</y>
</hint>
</hints>
</connection>
<connection>
<sender>z_name</sender>
<signal>textChanged(QString)</signal>
<receiver>z_entry</receiver>
<slot>clear()</slot>
<hints>
<hint type="sourcelabel">
<x>520</x>
<y>98</y>
</hint>
<hint type="destinationlabel">
<x>522</x>
<y>127</y>
</hint>
</hints>
</connection>
</connections>
</ui>

View File

@@ -1,204 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>233</width>
<height>427</height>
</rect>
</property>
<property name="maximumSize">
<size>
<width>16777215</width>
<height>427</height>
</size>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QPushButton" name="button_apply">
<property name="text">
<string>Apply</string>
</property>
</widget>
</item>
<item>
<widget class="BECColorMapWidget" name="color_map"/>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout">
<item>
<widget class="QLabel" name="label_7">
<property name="text">
<string>Validate BEC</string>
</property>
</widget>
</item>
<item>
<widget class="ToggleSwitch" name="validate_bec"/>
</item>
</layout>
</item>
<item>
<widget class="QGroupBox" name="groupBox">
<property name="title">
<string>X Device</string>
</property>
<layout class="QGridLayout" name="gridLayout">
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="text">
<string>Name</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="DeviceLineEdit" name="x_name"/>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>Signal</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="x_entry"/>
</item>
</layout>
</widget>
</item>
<item>
<widget class="QGroupBox" name="groupBox_2">
<property name="title">
<string>Y Device</string>
</property>
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="0">
<widget class="QLabel" name="label_3">
<property name="text">
<string>Name</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="DeviceLineEdit" name="y_name"/>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_4">
<property name="text">
<string>Signal</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="y_entry"/>
</item>
</layout>
</widget>
</item>
<item>
<widget class="QGroupBox" name="groupBox_3">
<property name="title">
<string>Z Device</string>
</property>
<layout class="QGridLayout" name="gridLayout_3">
<item row="0" column="0">
<widget class="QLabel" name="label_5">
<property name="text">
<string>Name</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="DeviceLineEdit" name="z_name"/>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_6">
<property name="text">
<string>Signal</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="z_entry"/>
</item>
</layout>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>DeviceLineEdit</class>
<extends>QLineEdit</extends>
<header>device_line_edit</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
<customwidget>
<class>BECColorMapWidget</class>
<extends>QWidget</extends>
<header>bec_color_map_widget</header>
</customwidget>
</customwidgets>
<resources/>
<connections>
<connection>
<sender>x_name</sender>
<signal>textChanged(QString)</signal>
<receiver>x_entry</receiver>
<slot>clear()</slot>
<hints>
<hint type="sourcelabel">
<x>156</x>
<y>123</y>
</hint>
<hint type="destinationlabel">
<x>158</x>
<y>157</y>
</hint>
</hints>
</connection>
<connection>
<sender>y_name</sender>
<signal>textChanged(QString)</signal>
<receiver>y_entry</receiver>
<slot>clear()</slot>
<hints>
<hint type="sourcelabel">
<x>116</x>
<y>229</y>
</hint>
<hint type="destinationlabel">
<x>116</x>
<y>251</y>
</hint>
</hints>
</connection>
<connection>
<sender>z_name</sender>
<signal>textChanged(QString)</signal>
<receiver>z_entry</receiver>
<slot>clear()</slot>
<hints>
<hint type="sourcelabel">
<x>110</x>
<y>326</y>
</hint>
<hint type="destinationlabel">
<x>110</x>
<y>352</y>
</hint>
</hints>
</connection>
</connections>
</ui>

View File

@@ -1,5 +1,3 @@
import traceback
from pyqtgraph.exporters import MatplotlibExporter
from bec_widgets.qt_utils.error_popups import SafeSlot, WarningPopupUtility
@@ -62,7 +60,7 @@ class PlotExportBundle(ToolbarBundle):
import matplotlib as mpl
MatplotlibExporter(self.target_widget.plot_item).export()
except ModuleNotFoundError:
except:
warning_util = WarningPopupUtility()
warning_util.show_warning(
title="Matplotlib not installed",
@@ -70,12 +68,3 @@ class PlotExportBundle(ToolbarBundle):
detailed_text="Please install matplotlib in your Python environment by using 'pip install matplotlib'.",
)
return
except TypeError:
warning_util = WarningPopupUtility()
error_msg = traceback.format_exc()
warning_util.show_warning(
title="Matplotlib TypeError",
message="Matplotlib exporter could not resolve the plot item.",
detailed_text=error_msg,
)
return

View File

@@ -1,7 +1,5 @@
from __future__ import annotations
from bec_lib.lmfit_serializer import serialize_lmfit_params
import json
from typing import Literal
@@ -11,7 +9,7 @@ import pyqtgraph as pg
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from pydantic import Field, ValidationError, field_validator
from qtpy.QtCore import QTimer, Signal
from qtpy.QtCore import Signal
from qtpy.QtWidgets import QDialog, QHBoxLayout, QMainWindow, QVBoxLayout, QWidget
from bec_widgets.qt_utils.error_popups import SafeProperty, SafeSlot
@@ -975,10 +973,6 @@ class Waveform(PlotBase):
meta(dict): The message metadata.
"""
self.sync_signal_update.emit()
status = msg.get("done")
if status:
QTimer.singleShot(100, self.update_sync_curves)
QTimer.singleShot(300, self.update_sync_curves)
def _fetch_scan_data_and_access(self):
"""
@@ -1154,8 +1148,8 @@ class Waveform(PlotBase):
MessageEndpoints.dap_response(f"{self.scan_id}-{self.gui_id}"),
)
@SafeSlot()
def request_dap(self, _=None):
# @SafeSlot() #FIXME type error
def request_dap(self):
"""Request new fit for data"""
for dap_curve in self._dap_curves:
@@ -1178,32 +1172,12 @@ class Waveform(PlotBase):
x_min = None
x_max = None
dice_roll = np.random.randint(0, 11)
if dap_curve.dap_params is not None:
fit_success = dap_curve.dap_summary.get("success", False)
else:
fit_success = False
params_serialized = {}
if fit_success and dice_roll < 9:
amplitude = dap_curve.dap_params.get("amplitude")
center = dap_curve.dap_params.get("center")
sigma = dap_curve.dap_params.get("sigma")
amplitude_param = lmfit.Parameter(name="amplitude", value=amplitude)
center_param = lmfit.Parameter(name="center", value=center)
sigma_param = lmfit.Parameter(name="sigma", value=sigma)
params_serialized = serialize_lmfit_params(
[amplitude_param, center_param, sigma_param]
)
msg = messages.DAPRequestMessage(
dap_cls="LmfitService1D",
dap_type="on_demand",
config={
"args": [],
"kwargs": {"data_x": x_data, "data_y": y_data, **params_serialized},
"kwargs": {"data_x": x_data, "data_y": y_data},
"class_args": model._plugin_info["class_args"],
"class_kwargs": model._plugin_info["class_kwargs"],
"curve_label": dap_curve.name(),
@@ -1239,18 +1213,9 @@ class Waveform(PlotBase):
# Retrieve and store the fit parameters and summary from the DAP server response
try:
if curve.dap_params is None:
curve.dap_params = msg["data"][1]["fit_parameters"]
curve.dap_summary = msg["data"][1]["fit_summary"]
print("set first time")
else:
old_redchi = curve.dap_summary.get("redchi", 0)
new_redchi = msg["data"][1]["fit_summary"]["redchi"]
if old_redchi < new_redchi:
curve.dap_params = msg["data"][1]["fit_parameters"]
curve.dap_summary = msg["data"][1]["fit_summary"]
print(f"updating from {old_redchi} to {new_redchi}")
except (TypeError, AttributeError):
curve.dap_params = msg["data"][1]["fit_parameters"]
curve.dap_summary = msg["data"][1]["fit_summary"]
except TypeError:
logger.warning(f"Failed to retrieve DAP data for curve '{curve.name()}'")
return

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "1.25.0"
version = "1.24.5"
description = "BEC Widgets"
requires-python = ">=3.10"
classifiers = [

View File

@@ -1,378 +1,378 @@
# import time
# import numpy as np
# import pytest
# from bec_lib.endpoints import MessageEndpoints
# from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
# from bec_widgets.tests.utils import check_remote_data_size
# from bec_widgets.utils import Colors
# # pylint: disable=unused-argument
# # pylint: disable=redefined-outer-name
# # pylint: disable=too-many-locals
# def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_dock):
# # BEC client shortcuts
# dock = connected_client_dock
# client = bec_client_lib
# dev = client.device_manager.devices
# scans = client.scans
# queue = client.queue
# # Create 3 docks
# d0 = dock.add_dock("dock_0")
# d1 = dock.add_dock("dock_1")
# d2 = dock.add_dock("dock_2")
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# # Add 3 figures with some widgets
# fig0 = d0.add_widget("BECFigure")
# fig1 = d1.add_widget("BECFigure")
# fig2 = d2.add_widget("BECFigure")
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# assert len(dock_config["docks"]["dock_0"]["widgets"]) == 1
# assert len(dock_config["docks"]["dock_1"]["widgets"]) == 1
# assert len(dock_config["docks"]["dock_2"]["widgets"]) == 1
# assert fig1.__class__.__name__ == "BECFigure"
# assert fig1.__class__ == BECFigure
# assert fig2.__class__.__name__ == "BECFigure"
# assert fig2.__class__ == BECFigure
# mm = fig0.motor_map("samx", "samy")
# plt = fig1.plot(x_name="samx", y_name="bpm4i")
# im = fig2.image("eiger")
# assert mm.__class__.__name__ == "BECMotorMap"
# assert mm.__class__ == BECMotorMap
# assert plt.__class__.__name__ == "BECWaveform"
# assert plt.__class__ == BECWaveform
# assert im.__class__.__name__ == "BECImageShow"
# assert im.__class__ == BECImageShow
# assert mm._config_dict["signals"] == {
# "dap": None,
# "source": "device_readback",
# "x": {
# "name": "samx",
# "entry": "samx",
# "unit": None,
# "modifier": None,
# "limits": [-50.0, 50.0],
# },
# "y": {
# "name": "samy",
# "entry": "samy",
# "unit": None,
# "modifier": None,
# "limits": [-50.0, 50.0],
# },
# "z": None,
# }
# assert plt._config_dict["curves"]["bpm4i-bpm4i"]["signals"] == {
# "dap": None,
# "source": "scan_segment",
# "x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
# "y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
# "z": None,
# }
# assert im._config_dict["images"]["eiger"]["monitor"] == "eiger"
# # check initial position of motor map
# initial_pos_x = dev.samx.read()["samx"]["value"]
# initial_pos_y = dev.samy.read()["samy"]["value"]
# # Try to make a scan
# status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# status.wait()
# # plot
# item = queue.scan_storage.storage[-1]
# plt_last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
# num_elements = 10
# plot_name = "bpm4i-bpm4i"
# qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
# plt_data = plt.get_all_data()
# assert plt_data["bpm4i-bpm4i"]["x"] == plt_last_scan_data["samx"]["samx"].val
# assert plt_data["bpm4i-bpm4i"]["y"] == plt_last_scan_data["bpm4i"]["bpm4i"].val
# # image
# last_image_device = client.connector.get_last(MessageEndpoints.device_monitor_2d("eiger"))[
# "data"
# ].data
# time.sleep(0.5)
# last_image_plot = im.images[0].get_data()
# np.testing.assert_equal(last_image_device, last_image_plot)
# # motor map
# final_pos_x = dev.samx.read()["samx"]["value"]
# final_pos_y = dev.samy.read()["samy"]["value"]
# # check final coordinates of motor map
# motor_map_data = mm.get_data()
# np.testing.assert_equal(
# [motor_map_data["x"][0], motor_map_data["y"][0]], [initial_pos_x, initial_pos_y]
# )
# np.testing.assert_equal(
# [motor_map_data["x"][-1], motor_map_data["y"][-1]], [final_pos_x, final_pos_y]
# )
# def test_dock_manipulations_e2e(connected_client_dock):
# dock = connected_client_dock
# d0 = dock.add_dock("dock_0")
# d1 = dock.add_dock("dock_1")
# d2 = dock.add_dock("dock_2")
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# d0.detach()
# dock.detach_dock("dock_2")
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# assert len(dock.temp_areas) == 2
# d0.attach()
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# assert len(dock.temp_areas) == 1
# d2.remove()
# dock_config = dock._config_dict
# assert ["dock_0", "dock_1"] == list(dock_config["docks"])
# dock.clear_all()
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 0
# assert len(dock.temp_areas) == 0
# def test_ring_bar(connected_client_dock):
# dock = connected_client_dock
# d0 = dock.add_dock(name="dock_0")
# bar = d0.add_widget("RingProgressBar")
# assert bar.__class__.__name__ == "RingProgressBar"
import time
import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.tests.utils import check_remote_data_size
from bec_widgets.utils import Colors
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-locals
def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_dock):
# BEC client shortcuts
dock = connected_client_dock
client = bec_client_lib
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
# Create 3 docks
d0 = dock.add_dock("dock_0")
d1 = dock.add_dock("dock_1")
d2 = dock.add_dock("dock_2")
dock_config = dock._config_dict
assert len(dock_config["docks"]) == 3
# Add 3 figures with some widgets
fig0 = d0.add_widget("BECFigure")
fig1 = d1.add_widget("BECFigure")
fig2 = d2.add_widget("BECFigure")
dock_config = dock._config_dict
assert len(dock_config["docks"]) == 3
assert len(dock_config["docks"]["dock_0"]["widgets"]) == 1
assert len(dock_config["docks"]["dock_1"]["widgets"]) == 1
assert len(dock_config["docks"]["dock_2"]["widgets"]) == 1
assert fig1.__class__.__name__ == "BECFigure"
assert fig1.__class__ == BECFigure
assert fig2.__class__.__name__ == "BECFigure"
assert fig2.__class__ == BECFigure
mm = fig0.motor_map("samx", "samy")
plt = fig1.plot(x_name="samx", y_name="bpm4i")
im = fig2.image("eiger")
assert mm.__class__.__name__ == "BECMotorMap"
assert mm.__class__ == BECMotorMap
assert plt.__class__.__name__ == "BECWaveform"
assert plt.__class__ == BECWaveform
assert im.__class__.__name__ == "BECImageShow"
assert im.__class__ == BECImageShow
assert mm._config_dict["signals"] == {
"dap": None,
"source": "device_readback",
"x": {
"name": "samx",
"entry": "samx",
"unit": None,
"modifier": None,
"limits": [-50.0, 50.0],
},
"y": {
"name": "samy",
"entry": "samy",
"unit": None,
"modifier": None,
"limits": [-50.0, 50.0],
},
"z": None,
}
assert plt._config_dict["curves"]["bpm4i-bpm4i"]["signals"] == {
"dap": None,
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
"z": None,
}
assert im._config_dict["images"]["eiger"]["monitor"] == "eiger"
# check initial position of motor map
initial_pos_x = dev.samx.read()["samx"]["value"]
initial_pos_y = dev.samy.read()["samy"]["value"]
# Try to make a scan
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
# plot
item = queue.scan_storage.storage[-1]
plt_last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
num_elements = 10
plot_name = "bpm4i-bpm4i"
qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
plt_data = plt.get_all_data()
assert plt_data["bpm4i-bpm4i"]["x"] == plt_last_scan_data["samx"]["samx"].val
assert plt_data["bpm4i-bpm4i"]["y"] == plt_last_scan_data["bpm4i"]["bpm4i"].val
# image
last_image_device = client.connector.get_last(MessageEndpoints.device_monitor_2d("eiger"))[
"data"
].data
time.sleep(0.5)
last_image_plot = im.images[0].get_data()
np.testing.assert_equal(last_image_device, last_image_plot)
# motor map
final_pos_x = dev.samx.read()["samx"]["value"]
final_pos_y = dev.samy.read()["samy"]["value"]
# check final coordinates of motor map
motor_map_data = mm.get_data()
np.testing.assert_equal(
[motor_map_data["x"][0], motor_map_data["y"][0]], [initial_pos_x, initial_pos_y]
)
np.testing.assert_equal(
[motor_map_data["x"][-1], motor_map_data["y"][-1]], [final_pos_x, final_pos_y]
)
def test_dock_manipulations_e2e(connected_client_dock):
dock = connected_client_dock
d0 = dock.add_dock("dock_0")
d1 = dock.add_dock("dock_1")
d2 = dock.add_dock("dock_2")
dock_config = dock._config_dict
assert len(dock_config["docks"]) == 3
d0.detach()
dock.detach_dock("dock_2")
dock_config = dock._config_dict
assert len(dock_config["docks"]) == 3
assert len(dock.temp_areas) == 2
d0.attach()
dock_config = dock._config_dict
assert len(dock_config["docks"]) == 3
assert len(dock.temp_areas) == 1
d2.remove()
dock_config = dock._config_dict
assert ["dock_0", "dock_1"] == list(dock_config["docks"])
dock.clear_all()
dock_config = dock._config_dict
assert len(dock_config["docks"]) == 0
assert len(dock.temp_areas) == 0
def test_ring_bar(connected_client_dock):
dock = connected_client_dock
d0 = dock.add_dock(name="dock_0")
bar = d0.add_widget("RingProgressBar")
assert bar.__class__.__name__ == "RingProgressBar"
# bar.set_number_of_bars(5)
# bar.set_colors_from_map("viridis")
# bar.set_value([10, 20, 30, 40, 50])
bar.set_number_of_bars(5)
bar.set_colors_from_map("viridis")
bar.set_value([10, 20, 30, 40, 50])
# bar_config = bar._config_dict
bar_config = bar._config_dict
# expected_colors_light = [
# list(color) for color in Colors.golden_angle_color("viridis", 5, "RGB", theme="light")
# ]
# expected_colors_dark = [
# list(color) for color in Colors.golden_angle_color("viridis", 5, "RGB", theme="dark")
# ]
# bar_colors = [ring._config_dict["color"] for ring in bar.rings]
# bar_values = [ring._config_dict["value"] for ring in bar.rings]
# assert bar_config["num_bars"] == 5
# assert bar_values == [10, 20, 30, 40, 50]
# assert bar_colors == expected_colors_light or bar_colors == expected_colors_dark
expected_colors_light = [
list(color) for color in Colors.golden_angle_color("viridis", 5, "RGB", theme="light")
]
expected_colors_dark = [
list(color) for color in Colors.golden_angle_color("viridis", 5, "RGB", theme="dark")
]
bar_colors = [ring._config_dict["color"] for ring in bar.rings]
bar_values = [ring._config_dict["value"] for ring in bar.rings]
assert bar_config["num_bars"] == 5
assert bar_values == [10, 20, 30, 40, 50]
assert bar_colors == expected_colors_light or bar_colors == expected_colors_dark
# def test_ring_bar_scan_update(bec_client_lib, connected_client_dock):
# dock = connected_client_dock
def test_ring_bar_scan_update(bec_client_lib, connected_client_dock):
dock = connected_client_dock
# d0 = dock.add_dock("dock_0")
d0 = dock.add_dock("dock_0")
# bar = d0.add_widget("RingProgressBar")
bar = d0.add_widget("RingProgressBar")
# client = bec_client_lib
# dev = client.device_manager.devices
# dev.samx.tolerance.set(0)
# dev.samy.tolerance.set(0)
# scans = client.scans
client = bec_client_lib
dev = client.device_manager.devices
dev.samx.tolerance.set(0)
dev.samy.tolerance.set(0)
scans = client.scans
# status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# status.wait()
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
# bar_config = bar._config_dict
# assert bar_config["num_bars"] == 1
# assert bar_config["rings"][0]["value"] == 10
# assert bar_config["rings"][0]["min_value"] == 0
# assert bar_config["rings"][0]["max_value"] == 10
bar_config = bar._config_dict
assert bar_config["num_bars"] == 1
assert bar_config["rings"][0]["value"] == 10
assert bar_config["rings"][0]["min_value"] == 0
assert bar_config["rings"][0]["max_value"] == 10
# status = scans.grid_scan(dev.samx, -5, 5, 4, dev.samy, -10, 10, 4, relative=True, exp_time=0.1)
# status.wait()
status = scans.grid_scan(dev.samx, -5, 5, 4, dev.samy, -10, 10, 4, relative=True, exp_time=0.1)
status.wait()
# bar_config = bar._config_dict
# assert bar_config["num_bars"] == 1
# assert bar_config["rings"][0]["value"] == 16
# assert bar_config["rings"][0]["min_value"] == 0
# assert bar_config["rings"][0]["max_value"] == 16
bar_config = bar._config_dict
assert bar_config["num_bars"] == 1
assert bar_config["rings"][0]["value"] == 16
assert bar_config["rings"][0]["min_value"] == 0
assert bar_config["rings"][0]["max_value"] == 16
# init_samx = dev.samx.read()["samx"]["value"]
# init_samy = dev.samy.read()["samy"]["value"]
# final_samx = init_samx + 5
# final_samy = init_samy + 10
init_samx = dev.samx.read()["samx"]["value"]
init_samy = dev.samy.read()["samy"]["value"]
final_samx = init_samx + 5
final_samy = init_samy + 10
# dev.samx.velocity.put(5)
# dev.samy.velocity.put(5)
dev.samx.velocity.put(5)
dev.samy.velocity.put(5)
# status = scans.umv(dev.samx, 5, dev.samy, 10, relative=True)
# status.wait()
status = scans.umv(dev.samx, 5, dev.samy, 10, relative=True)
status.wait()
# bar_config = bar._config_dict
# assert bar_config["num_bars"] == 2
# assert bar_config["rings"][0]["value"] == final_samx
# assert bar_config["rings"][1]["value"] == final_samy
# assert bar_config["rings"][0]["min_value"] == init_samx
# assert bar_config["rings"][0]["max_value"] == final_samx
# assert bar_config["rings"][1]["min_value"] == init_samy
# assert bar_config["rings"][1]["max_value"] == final_samy
bar_config = bar._config_dict
assert bar_config["num_bars"] == 2
assert bar_config["rings"][0]["value"] == final_samx
assert bar_config["rings"][1]["value"] == final_samy
assert bar_config["rings"][0]["min_value"] == init_samx
assert bar_config["rings"][0]["max_value"] == final_samx
assert bar_config["rings"][1]["min_value"] == init_samy
assert bar_config["rings"][1]["max_value"] == final_samy
# def test_auto_update(bec_client_lib, connected_client_dock_w_auto_updates, qtbot):
# client = bec_client_lib
# dev = client.device_manager.devices
# scans = client.scans
# queue = client.queue
# gui, dock = connected_client_dock_w_auto_updates
# auto_updates = gui.auto_updates
def test_auto_update(bec_client_lib, connected_client_dock_w_auto_updates, qtbot):
client = bec_client_lib
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
gui, dock = connected_client_dock_w_auto_updates
auto_updates = gui.auto_updates
# def get_default_figure():
# return auto_updates.get_default_figure()
def get_default_figure():
return auto_updates.get_default_figure()
# plt = get_default_figure()
# gui.selected_device = "bpm4i"
# status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# status.wait()
# # get data from curves
# widgets = plt.widget_list
# qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
# item = queue.scan_storage.storage[-1]
# last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
# num_elements = 10
# plot_name = f"Scan {status.scan.scan_number} - {dock.selected_device}"
# qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements))
# plt_data = widgets[0].get_all_data()
# # check plotted data
# assert (
# plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["x"]
# == last_scan_data["samx"]["samx"].val
# )
# assert (
# plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["y"]
# == last_scan_data["bpm4i"]["bpm4i"].val
# )
# status = scans.grid_scan(
# dev.samx, -10, 10, 5, dev.samy, -5, 5, 5, exp_time=0.05, relative=False
# )
# status.wait()
# plt = auto_updates.get_default_figure()
# widgets = plt.widget_list
# qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
# item = queue.scan_storage.storage[-1]
# last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
# plot_name = f"Scan {status.scan.scan_number} - bpm4i"
# num_elements_bec = 25
# qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements_bec))
# plt_data = widgets[0].get_all_data()
# # check plotted data
# assert (
# plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["x"]
# == last_scan_data["samx"]["samx"].val
# )
# assert (
# plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["y"]
# == last_scan_data["samy"]["samy"].val
# )
# def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
# gui = connected_client_gui_obj
# assert gui.selected_device is None
# assert len(gui.windows) == 1
# assert gui.windows["main"].widget is gui.main
# assert gui.windows["main"].title == "BEC Widgets"
# mw = gui.main
# assert mw.__class__.__name__ == "BECDockArea"
# xw = gui.new("X")
# assert xw.__class__.__name__ == "BECDockArea"
# assert len(gui.windows) == 2
# gui_info = gui._dump()
# mw_info = gui_info[mw._gui_id]
# assert mw_info["title"] == "BEC Widgets"
# assert mw_info["visible"]
# xw_info = gui_info[xw._gui_id]
# assert xw_info["title"] == "X"
# assert xw_info["visible"]
# gui.hide()
# gui_info = gui._dump()
# assert not any(windows["visible"] for windows in gui_info.values())
# gui.show()
# gui_info = gui._dump()
# assert all(windows["visible"] for windows in gui_info.values())
# assert gui.gui_is_alive()
# gui.close()
# assert not gui.gui_is_alive()
# gui.start_server(wait=True)
# assert gui.gui_is_alive()
# # calling start multiple times should not change anything
# gui.start_server(wait=True)
# gui.start()
# # gui.windows should have main, and main dock area should have same gui_id as before
# assert len(gui.windows) == 1
# assert gui.windows["main"].widget._gui_id == mw._gui_id
# # communication should work, main dock area should have same id and be visible
# gui_info = gui._dump()
# assert gui_info[mw._gui_id]["visible"]
# with pytest.raises(RuntimeError):
# gui.main.delete()
# yw = gui.new("Y")
# assert len(gui.windows) == 2
# yw.delete()
# assert len(gui.windows) == 1
# # check it is really deleted on server
# gui_info = gui._dump()
# assert yw._gui_id not in gui_info
# def test_rpc_call_with_exception_in_safeslot_error_popup(connected_client_gui_obj, qtbot):
# gui = connected_client_gui_obj
# gui.main.add_dock("test")
# qtbot.waitUntil(lambda: len(gui.main.panels) == 2) # default_figure + test
# qtbot.wait(500)
# with pytest.raises(ValueError):
# gui.main.add_dock("test")
# # time.sleep(0.1)
plt = get_default_figure()
gui.selected_device = "bpm4i"
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
# get data from curves
widgets = plt.widget_list
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
num_elements = 10
plot_name = f"Scan {status.scan.scan_number} - {dock.selected_device}"
qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements))
plt_data = widgets[0].get_all_data()
# check plotted data
assert (
plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["x"]
== last_scan_data["samx"]["samx"].val
)
assert (
plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["y"]
== last_scan_data["bpm4i"]["bpm4i"].val
)
status = scans.grid_scan(
dev.samx, -10, 10, 5, dev.samy, -5, 5, 5, exp_time=0.05, relative=False
)
status.wait()
plt = auto_updates.get_default_figure()
widgets = plt.widget_list
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
plot_name = f"Scan {status.scan.scan_number} - bpm4i"
num_elements_bec = 25
qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements_bec))
plt_data = widgets[0].get_all_data()
# check plotted data
assert (
plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["x"]
== last_scan_data["samx"]["samx"].val
)
assert (
plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["y"]
== last_scan_data["samy"]["samy"].val
)
def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
gui = connected_client_gui_obj
assert gui.selected_device is None
assert len(gui.windows) == 1
assert gui.windows["main"].widget is gui.main
assert gui.windows["main"].title == "BEC Widgets"
mw = gui.main
assert mw.__class__.__name__ == "BECDockArea"
xw = gui.new("X")
assert xw.__class__.__name__ == "BECDockArea"
assert len(gui.windows) == 2
gui_info = gui._dump()
mw_info = gui_info[mw._gui_id]
assert mw_info["title"] == "BEC Widgets"
assert mw_info["visible"]
xw_info = gui_info[xw._gui_id]
assert xw_info["title"] == "X"
assert xw_info["visible"]
gui.hide()
gui_info = gui._dump()
assert not any(windows["visible"] for windows in gui_info.values())
gui.show()
gui_info = gui._dump()
assert all(windows["visible"] for windows in gui_info.values())
assert gui.gui_is_alive()
gui.close()
assert not gui.gui_is_alive()
gui.start_server(wait=True)
assert gui.gui_is_alive()
# calling start multiple times should not change anything
gui.start_server(wait=True)
gui.start()
# gui.windows should have main, and main dock area should have same gui_id as before
assert len(gui.windows) == 1
assert gui.windows["main"].widget._gui_id == mw._gui_id
# communication should work, main dock area should have same id and be visible
gui_info = gui._dump()
assert gui_info[mw._gui_id]["visible"]
with pytest.raises(RuntimeError):
gui.main.delete()
yw = gui.new("Y")
assert len(gui.windows) == 2
yw.delete()
assert len(gui.windows) == 1
# check it is really deleted on server
gui_info = gui._dump()
assert yw._gui_id not in gui_info
def test_rpc_call_with_exception_in_safeslot_error_popup(connected_client_gui_obj, qtbot):
gui = connected_client_gui_obj
gui.main.add_dock("test")
qtbot.waitUntil(lambda: len(gui.main.panels) == 2) # default_figure + test
qtbot.wait(500)
with pytest.raises(ValueError):
gui.main.add_dock("test")
# time.sleep(0.1)

View File

@@ -11,13 +11,14 @@ from bec_widgets.tests.utils import check_remote_data_size
@pytest.fixture
def connected_figure(connected_client_gui_obj):
gui = connected_client_gui_obj
dock = gui.window_list[0].new("dock")
dock = gui.bec.new("dock")
fig = dock.new(name="fig", widget="BECFigure")
return fig
def test_rpc_waveform1d_custom_curve(connected_figure):
fig = connected_figure
# fig = BECFigure(connected_client_figure)
ax = fig.plot()
curve = ax.plot(x=[1, 2, 3], y=[1, 2, 3])
@@ -31,6 +32,7 @@ def test_rpc_waveform1d_custom_curve(connected_figure):
def test_rpc_plotting_shortcuts_init_configs(connected_figure, qtbot):
fig = connected_figure
# fig = BECFigure(connected_client_figure)
plt = fig.plot(x_name="samx", y_name="bpm4i")
im = fig.image("eiger")
@@ -88,6 +90,7 @@ def test_rpc_plotting_shortcuts_init_configs(connected_figure, qtbot):
def test_rpc_waveform_scan(qtbot, connected_figure, bec_client_lib):
# fig = BECFigure(connected_client_figure)
fig = connected_figure
# add 3 different curves to track
plt = fig.plot(x_name="samx", y_name="bpm4i")
@@ -123,6 +126,7 @@ def test_rpc_waveform_scan(qtbot, connected_figure, bec_client_lib):
def test_rpc_image(connected_figure, bec_client_lib):
# fig = BECFigure(connected_client_figure)
fig = connected_figure
im = fig.image("eiger")
@@ -144,6 +148,7 @@ def test_rpc_image(connected_figure, bec_client_lib):
def test_rpc_motor_map(connected_figure, bec_client_lib):
# fig = BECFigure(connected_client_figure)
fig = connected_figure
motor_map = fig.motor_map("samx", "samy")
@@ -175,6 +180,7 @@ def test_rpc_motor_map(connected_figure, bec_client_lib):
def test_dap_rpc(connected_figure, bec_client_lib, qtbot):
fig = connected_figure
# fig = BECFigure(connected_client_figure)
plt = fig.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
client = bec_client_lib
@@ -213,6 +219,7 @@ def test_dap_rpc(connected_figure, bec_client_lib, qtbot):
def test_removing_subplots(connected_figure, bec_client_lib):
# fig = BECFigure(connected_client_figure)
fig = connected_figure
plt = fig.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
im = fig.image(monitor="eiger")

View File

@@ -5,7 +5,7 @@ from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWave
def test_rpc_register_list_connections(connected_client_gui_obj):
gui = connected_client_gui_obj
fig = gui.window_list[0].new("fig").new(name="fig", widget="BECFigure")
fig = gui.bec.new("fig").new(name="fig", widget="BECFigure")
plt = fig.plot(x_name="samx", y_name="bpm4i")
im = fig.image("eiger")

View File

@@ -200,41 +200,3 @@ def mocked_client_with_dap(mocked_client, dap_plugin_message):
client.dap._available_dap_plugins = patched_models
yield client
class DummyData:
def __init__(self, val, timestamps):
self.val = val
self.timestamps = timestamps
def get(self, key, default=None):
if key == "val":
return self.val
return default
def create_dummy_scan_item():
"""
Helper to create a dummy scan item with both live_data and metadata/status_message info.
"""
dummy_live_data = {
"samx": {"samx": DummyData(val=[10, 20, 30], timestamps=[100, 200, 300])},
"samy": {"samy": DummyData(val=[5, 10, 15], timestamps=[100, 200, 300])},
"bpm4i": {"bpm4i": DummyData(val=[5, 6, 7], timestamps=[101, 201, 301])},
"async_device": {"async_device": DummyData(val=[1, 2, 3], timestamps=[11, 21, 31])},
}
dummy_scan = MagicMock()
dummy_scan.live_data = dummy_live_data
dummy_scan.metadata = {
"bec": {
"scan_id": "dummy",
"scan_report_devices": ["samx"],
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
}
}
dummy_scan.status_message = MagicMock()
dummy_scan.status_message.info = {
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
"scan_report_devices": ["samx"],
}
return dummy_scan

View File

@@ -118,19 +118,12 @@ def test_toolbar_add_plot_waveform(bec_dock_area):
assert bec_dock_area.panels["Waveform_0"].widgets[0].config.widget_class == "Waveform"
def test_toolbar_add_plot_scatter_waveform(bec_dock_area):
bec_dock_area.toolbar.widgets["menu_plots"].widgets["scatter_waveform"].trigger()
assert "ScatterWaveform_0" in bec_dock_area.panels
assert (
bec_dock_area.panels["ScatterWaveform_0"].widgets[0].config.widget_class
== "ScatterWaveform"
)
def test_toolbar_add_plot_image(bec_dock_area):
bec_dock_area.toolbar.widgets["menu_plots"].widgets["image"].trigger()
assert "Image_0" in bec_dock_area.panels
assert bec_dock_area.panels["Image_0"].widgets[0].config.widget_class == "Image"
assert "BECImageWidget_0" in bec_dock_area.panels
assert (
bec_dock_area.panels["BECImageWidget_0"].widgets[0].config.widget_class == "BECImageWidget"
)
def test_toolbar_add_plot_motor_map(bec_dock_area):
@@ -142,15 +135,6 @@ def test_toolbar_add_plot_motor_map(bec_dock_area):
)
def test_toolbar_add_multi_waveform(bec_dock_area):
bec_dock_area.toolbar.widgets["menu_plots"].widgets["multi_waveform"].trigger()
assert "BECMultiWaveformWidget_0" in bec_dock_area.panels
assert (
bec_dock_area.panels["BECMultiWaveformWidget_0"].widgets[0].config.widget_class
== "BECMultiWaveformWidget"
)
def test_toolbar_add_device_positioner_box(bec_dock_area):
bec_dock_area.toolbar.widgets["menu_devices"].widgets["positioner_box"].trigger()
assert "PositionerBox_0" in bec_dock_area.panels

View File

@@ -0,0 +1,224 @@
from unittest.mock import MagicMock
import pyqtgraph as pg
import pytest
from bec_widgets.widgets.control.device_input.base_classes.device_input_base import BECDeviceFilter
from bec_widgets.widgets.plots.image.image_widget import BECImageWidget
from .client_mocks import mocked_client
@pytest.fixture
def image_widget(qtbot, mocked_client):
widget = BECImageWidget(client=mocked_client())
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
@pytest.fixture
def mock_image(image_widget):
image_mock = MagicMock()
image_widget._image = image_mock
return image_mock
def test_image_widget_init(image_widget):
assert image_widget is not None
assert image_widget.client is not None
assert isinstance(image_widget, BECImageWidget)
assert image_widget.config.widget_class == "BECImageWidget"
assert image_widget._image is not None
assert (
BECDeviceFilter.DEVICE
in image_widget.toolbar.widgets["monitor"].device_combobox.config.device_filter
)
assert image_widget.toolbar.widgets["drag_mode"].action.isChecked() == True
assert image_widget.toolbar.widgets["rectangle_mode"].action.isChecked() == False
assert image_widget.toolbar.widgets["auto_range"].action.isChecked() == False
assert image_widget.toolbar.widgets["auto_range_image"].action.isChecked() == True
assert image_widget.toolbar.widgets["FFT"].action.isChecked() == False
assert image_widget.toolbar.widgets["transpose"].action.isChecked() == False
assert image_widget.toolbar.widgets["log"].action.isChecked() == False
###################################
# Toolbar Actions
###################################
def test_toolbar_connect_action(image_widget, mock_image, qtbot):
combo_device = image_widget.toolbar.widgets["monitor"].device_combobox
combo_device.setCurrentText("eiger")
qtbot.wait(200)
assert combo_device.currentText() == "eiger"
combo_dim = image_widget.toolbar.widgets["monitor_type"].widget
combo_dim.setCurrentText("2d")
qtbot.wait(200)
assert combo_dim.currentText() == "2d"
action = image_widget.toolbar.widgets["connect"].action
action.trigger()
image_widget._image.image.assert_called_once_with(
monitor="eiger",
monitor_type="2d",
color_map="magma",
color_bar="full",
downsample=True,
opacity=1.0,
vrange=None,
)
def test_image_toolbar_drag_mode_action_triggered(image_widget, qtbot):
action_drag = image_widget.toolbar.widgets["drag_mode"].action
action_rectangle = image_widget.toolbar.widgets["rectangle_mode"].action
action_drag.trigger()
assert action_drag.isChecked() == True
assert action_rectangle.isChecked() == False
def test_image_toolbar_rectangle_mode_action_triggered(image_widget, qtbot):
action_drag = image_widget.toolbar.widgets["drag_mode"].action
action_rectangle = image_widget.toolbar.widgets["rectangle_mode"].action
action_rectangle.trigger()
assert action_drag.isChecked() == False
assert action_rectangle.isChecked() == True
def test_image_toolbar_auto_range(image_widget, mock_image):
action = image_widget.toolbar.widgets["auto_range"].action
action.trigger()
image_widget._image.set_auto_range.assert_called_once_with(True, "xy")
def test_image_toolbar_enable_mouse_pan_mode(qtbot, image_widget):
action_drag = image_widget.toolbar.widgets["drag_mode"].action
action_rectangle = image_widget.toolbar.widgets["rectangle_mode"].action
mock_view_box = MagicMock()
image_widget._image.plot_item.getViewBox = MagicMock(return_value=mock_view_box)
image_widget.enable_mouse_pan_mode()
assert action_drag.isChecked() == True
assert action_rectangle.isChecked() == False
mock_view_box.setMouseMode.assert_called_once_with(pg.ViewBox.PanMode)
def test_image_toolbar_auto_range_image(image_widget, mock_image):
action = image_widget.toolbar.widgets["auto_range_image"].action
action.trigger()
assert action.isChecked() == False
image_widget._image.set_autorange.assert_called_once_with(False)
def test_image_toolbar_FFT(image_widget, mock_image):
action = image_widget.toolbar.widgets["FFT"].action
action.trigger()
assert action.isChecked() == True
image_widget._image.set_fft.assert_called_once_with(True, None)
def test_image_toolbar_log(image_widget, mock_image):
action = image_widget.toolbar.widgets["log"].action
action.trigger()
assert action.isChecked() == True
image_widget._image.set_log.assert_called_once_with(True, None)
def test_image_toggle_transpose(image_widget, mock_image):
action = image_widget.toolbar.widgets["transpose"].action
action.trigger()
assert action.isChecked() == True
image_widget._image.set_transpose.assert_called_once_with(True, None)
def test_image_toolbar_rotation(image_widget, mock_image):
action_left = image_widget.toolbar.widgets["rotate_left"].action
action_right = image_widget.toolbar.widgets["rotate_right"].action
action_left.trigger()
image_widget._image.set_rotation(1, None)
action_right.trigger()
image_widget._image.set_rotation(2, None)
action_right.trigger()
image_widget._image.set_rotation(1, None)
###################################
# Wrapper methods for ImageShow
###################################
def test_image_set_image(image_widget, mock_image):
image_widget.image(monitor="image", monitor_type="2d")
image_widget._image.image.assert_called_once_with(
monitor="image",
monitor_type="2d",
color_map="magma",
color_bar="full",
downsample=True,
opacity=1.0,
vrange=None,
)
def test_image_vrange(image_widget, mock_image):
image_widget.set_vrange(0, 1)
image_widget._image.set_vrange.assert_called_once_with(0, 1, None)
def test_image_set_color_map(image_widget, mock_image):
image_widget.set_color_map("viridis")
image_widget._image.set_color_map.assert_called_once_with("viridis", None)
def test_image_widget_set_title(image_widget, mock_image):
image_widget.set_title("Title Label")
image_widget._image.set_title.assert_called_once_with("Title Label")
def test_image_widget_set_x_label(image_widget, mock_image):
image_widget.set_x_label("X Label")
image_widget._image.set_x_label.assert_called_once_with("X Label")
def test_image_widget_set_y_label(image_widget, mock_image):
image_widget.set_y_label("Y Label")
image_widget._image.set_y_label.assert_called_once_with("Y Label")
def test_image_widget_set_x_scale(image_widget, mock_image):
image_widget.set_x_scale("linear")
image_widget._image.set_x_scale.assert_called_once_with("linear")
def test_image_widget_set_y_scale(image_widget, mock_image):
image_widget.set_y_scale("log")
image_widget._image.set_y_scale.assert_called_once_with("log")
def test_image_widget_set_x_lim(image_widget, mock_image):
image_widget.set_x_lim((0, 10))
image_widget._image.set_x_lim.assert_called_once_with((0, 10))
def test_image_widget_set_y_lim(image_widget, mock_image):
image_widget.set_y_lim((0, 10))
image_widget._image.set_y_lim.assert_called_once_with((0, 10))
def test_image_widget_set_grid(image_widget, mock_image):
image_widget.set_grid(True, False)
image_widget._image.set_grid.assert_called_once_with(True, False)
def test_image_widget_lock_aspect_ratio(image_widget, mock_image):
image_widget.lock_aspect_ratio(True)
image_widget._image.lock_aspect_ratio.assert_called_once_with(True)
def test_image_widget_export(image_widget, mock_image):
image_widget.export()
image_widget._image.export.assert_called_once()

View File

@@ -4,6 +4,9 @@ import pytest
from qtpy.QtCore import QPointF, Qt
from bec_widgets.utils import Crosshair
from bec_widgets.widgets.plots.image.image_widget import BECImageWidget
from .client_mocks import mocked_client
# pylint: disable = redefined-outer-name
@@ -22,20 +25,14 @@ def plot_widget_with_crosshair(qtbot):
@pytest.fixture
def image_widget_with_crosshair(qtbot):
widget = pg.PlotWidget()
def image_widget_with_crosshair(qtbot, mocked_client):
widget = BECImageWidget(client=mocked_client())
widget._image.add_custom_image(name="test", data=np.random.random((100, 200)))
widget._image.hook_crosshair()
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
image_item = pg.ImageItem()
image_item.setImage(np.random.rand(100, 100))
image_item.config = type("obj", (object,), {"monitor": "test"})
widget.addItem(image_item)
plot_item = widget.getPlotItem()
crosshair = Crosshair(plot_item=plot_item, precision=3)
yield crosshair, plot_item
yield widget._image.crosshair, widget._image.plot_item
def test_mouse_moved_lines(plot_widget_with_crosshair):
@@ -107,13 +104,13 @@ def test_mouse_moved_signals_2D(image_widget_with_crosshair):
crosshair.coordinatesChanged2D.connect(slot)
pos_in_view = QPointF(21.0, 55.0)
pos_in_view = QPointF(22.0, 55.0)
pos_in_scene = plot_item.vb.mapViewToScene(pos_in_view)
event_mock = [pos_in_scene]
crosshair.mouse_moved(event_mock)
assert emitted_values_2D == [("test", 21, 55)]
assert emitted_values_2D == [("test", 22.0, 55.0)]
def test_mouse_moved_signals_2D_outside(image_widget_with_crosshair):
@@ -229,41 +226,3 @@ def test_crosshair_clicked_signal(qtbot, plot_widget_with_crosshair):
assert np.isclose(round(x, 1), 2)
assert np.isclose(round(y, 1), 5)
def test_update_coord_label_1D(plot_widget_with_crosshair):
crosshair, _ = plot_widget_with_crosshair
# Provide a test position
pos = (10, 20)
crosshair.update_coord_label(pos)
expected_text = f"({10:.3g}, {20:.3g})"
# Verify that the coordinate label shows only the 1D coordinates (no intensity line)
assert crosshair.coord_label.toPlainText() == expected_text
label_pos = crosshair.coord_label.pos()
assert np.isclose(label_pos.x(), 10)
assert np.isclose(label_pos.y(), 20)
assert crosshair.coord_label.isVisible()
def test_update_coord_label_2D(image_widget_with_crosshair):
crosshair, plot_item = image_widget_with_crosshair
known_image = np.array([[10, 20], [30, 40]], dtype=float)
for item in plot_item.items:
if isinstance(item, pg.ImageItem):
item.setImage(known_image)
pos = (0.5, 1.2)
crosshair.update_coord_label(pos)
ix = int(np.clip(0.5, 0, known_image.shape[0] - 1)) # 0
iy = int(np.clip(1.2, 0, known_image.shape[1] - 1)) # 1
intensity = known_image[ix, iy] # Expected: 20
expected_text = f"({0.5:.3g}, {1.2:.3g})\nIntensity: {intensity:.3g}"
assert crosshair.coord_label.toPlainText() == expected_text
label_pos = crosshair.coord_label.pos()
assert np.isclose(label_pos.x(), 0.5)
assert np.isclose(label_pos.y(), 1.2)
assert crosshair.coord_label.isVisible()

View File

@@ -1,331 +0,0 @@
import numpy as np
import pyqtgraph as pg
import pytest
from bec_widgets.widgets.plots_next_gen.image.image import Image
from tests.unit_tests.client_mocks import mocked_client
from tests.unit_tests.conftest import create_widget
##################################################
# Image widget base functionality tests
##################################################
def test_initialization_defaults(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
assert bec_image_view.color_map == "magma"
assert bec_image_view.autorange is True
assert bec_image_view.autorange_mode == "mean"
assert bec_image_view.config.lock_aspect_ratio is True
assert bec_image_view.main_image is not None
assert bec_image_view._color_bar is None
def test_setting_color_map(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.color_map = "viridis"
assert bec_image_view.color_map == "viridis"
assert bec_image_view.config.color_map == "viridis"
def test_invalid_color_map_handling(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
previous_colormap = bec_image_view.color_map
bec_image_view.color_map = "invalid_colormap_name"
assert bec_image_view.color_map == previous_colormap
assert bec_image_view.main_image.color_map == previous_colormap
def test_toggle_autorange(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.autorange = False
assert bec_image_view.autorange is False
bec_image_view.toggle_autorange(True, "max")
assert bec_image_view.autorange is True
assert bec_image_view.autorange_mode == "max"
assert bec_image_view.main_image.autorange is True
assert bec_image_view.main_image.autorange_mode == "max"
assert bec_image_view.main_image.config.autorange is True
assert bec_image_view.main_image.config.autorange_mode == "max"
def test_lock_aspect_ratio(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.lock_aspect_ratio = True
assert bec_image_view.lock_aspect_ratio is True
assert bool(bec_image_view.plot_item.getViewBox().state["aspectLocked"]) is True
assert bec_image_view.config.lock_aspect_ratio is True
def test_set_vrange(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.vrange = (10, 100)
assert bec_image_view.vrange == (10, 100)
assert bec_image_view.main_image.levels == (10, 100)
assert bec_image_view.main_image.config.v_range == (10, 100)
def test_enable_simple_colorbar(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.enable_simple_colorbar = True
assert bec_image_view.enable_simple_colorbar is True
assert bec_image_view.config.color_bar == "simple"
assert isinstance(bec_image_view._color_bar, pg.ColorBarItem)
# Enabling color bar should not cancel autorange
assert bec_image_view.autorange is True
assert bec_image_view.autorange_mode == "mean"
assert bec_image_view.main_image.autorange is True
assert bec_image_view.main_image.autorange_mode == "mean"
def test_enable_full_colorbar(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.enable_full_colorbar = True
assert bec_image_view.enable_full_colorbar is True
assert bec_image_view.config.color_bar == "full"
assert isinstance(bec_image_view._color_bar, pg.HistogramLUTItem)
# Enabling color bar should not cancel autorange
assert bec_image_view.autorange is True
assert bec_image_view.autorange_mode == "mean"
assert bec_image_view.main_image.autorange is True
assert bec_image_view.main_image.autorange_mode == "mean"
@pytest.mark.parametrize("colorbar_type", ["simple", "full"])
def test_enable_colorbar_with_vrange(qtbot, mocked_client, colorbar_type):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.enable_colorbar(True, colorbar_type, (0, 100))
if colorbar_type == "simple":
assert isinstance(bec_image_view._color_bar, pg.ColorBarItem)
assert bec_image_view.enable_simple_colorbar is True
else:
assert isinstance(bec_image_view._color_bar, pg.HistogramLUTItem)
assert bec_image_view.enable_full_colorbar is True
assert bec_image_view.config.color_bar == colorbar_type
assert bec_image_view.vrange == (0, 100)
assert bec_image_view.main_image.levels == (0, 100)
assert bec_image_view._color_bar is not None
def test_image_setup_image_2d(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.image(monitor="eiger", monitor_type="2d")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.main_image.config.source == "device_monitor_2d"
assert bec_image_view.main_image.config.monitor_type == "2d"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_image_setup_image_1d(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.image(monitor="eiger", monitor_type="1d")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.main_image.config.source == "device_monitor_1d"
assert bec_image_view.main_image.config.monitor_type == "1d"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_image_setup_image_auto(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.image(monitor="eiger", monitor_type="auto")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.main_image.config.source == "auto"
assert bec_image_view.main_image.config.monitor_type == "auto"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_image_data_update_2d(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
test_data = np.random.rand(20, 30)
message = {"data": test_data}
metadata = {}
bec_image_view.on_image_update_2d(message, metadata)
np.testing.assert_array_equal(bec_image_view._main_image.image, test_data)
def test_image_data_update_1d(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
waveform1 = np.random.rand(50)
waveform2 = np.random.rand(60) # Different length, tests padding logic
metadata = {"scan_id": "scan_test"}
bec_image_view.on_image_update_1d({"data": waveform1}, metadata)
assert bec_image_view._main_image.raw_data.shape == (1, 50)
bec_image_view.on_image_update_1d({"data": waveform2}, metadata)
assert bec_image_view._main_image.raw_data.shape == (2, 60)
def test_toolbar_actions_presence(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
assert "autorange_image" in bec_image_view.toolbar.bundles["roi"]
assert "lock_aspect_ratio" in bec_image_view.toolbar.bundles["mouse_interaction"]
assert "processing" in bec_image_view.toolbar.bundles
assert "selection" in bec_image_view.toolbar.bundles
def test_image_processing_fft_toggle(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.fft = True
assert bec_image_view.fft is True
bec_image_view.fft = False
assert bec_image_view.fft is False
def test_image_processing_log_toggle(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.log = True
assert bec_image_view.log is True
bec_image_view.log = False
assert bec_image_view.log is False
def test_image_rotation_and_transpose(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.rotation = 2
assert bec_image_view.rotation == 2
bec_image_view.transpose = True
assert bec_image_view.transpose is True
@pytest.mark.parametrize("colorbar_type", ["none", "simple", "full"])
def test_setting_vrange_with_colorbar(qtbot, mocked_client, colorbar_type):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
if colorbar_type == "simple":
bec_image_view.enable_simple_colorbar = True
elif colorbar_type == "full":
bec_image_view.enable_full_colorbar = True
bec_image_view.vrange = (0, 100)
assert bec_image_view.vrange == (0, 100)
assert bec_image_view.main_image.levels == (0, 100)
assert bec_image_view.main_image.config.v_range == (0, 100)
assert bec_image_view.v_min == 0
assert bec_image_view.v_max == 100
if colorbar_type == "simple":
assert isinstance(bec_image_view._color_bar, pg.ColorBarItem)
assert bec_image_view._color_bar.levels() == (0, 100)
elif colorbar_type == "full":
assert isinstance(bec_image_view._color_bar, pg.HistogramLUTItem)
assert bec_image_view._color_bar.getLevels() == (0, 100)
###################################
# Toolbar Actions
###################################
def test_setup_image_from_toolbar(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.selection_bundle.device_combo_box.setCurrentText("eiger")
bec_image_view.selection_bundle.dim_combo_box.setCurrentText("2d")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.main_image.config.source == "device_monitor_2d"
assert bec_image_view.main_image.config.monitor_type == "2d"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_image_actions_interactions(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.autorange = False # Change the initial state to False
bec_image_view.autorange_mean_action.action.trigger()
assert bec_image_view.autorange is True
assert bec_image_view.main_image.autorange is True
assert bec_image_view.autorange_mode == "mean"
bec_image_view.autorange_max_action.action.trigger()
assert bec_image_view.autorange is True
assert bec_image_view.main_image.autorange is True
assert bec_image_view.autorange_mode == "max"
bec_image_view.toolbar.widgets["lock_aspect_ratio"].action.trigger()
assert bec_image_view.lock_aspect_ratio is False
assert bool(bec_image_view.plot_item.getViewBox().state["aspectLocked"]) is False
def test_image_toggle_action_fft(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.processing_bundle.fft.action.trigger()
assert bec_image_view.fft is True
assert bec_image_view.main_image.fft is True
assert bec_image_view.main_image.config.processing.fft is True
def test_image_toggle_action_log(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.processing_bundle.log.action.trigger()
assert bec_image_view.log is True
assert bec_image_view.main_image.log is True
assert bec_image_view.main_image.config.processing.log is True
def test_image_toggle_action_transpose(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.processing_bundle.transpose.action.trigger()
assert bec_image_view.transpose is True
assert bec_image_view.main_image.transpose is True
assert bec_image_view.main_image.config.processing.transpose is True
def test_image_toggle_action_rotate_right(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.processing_bundle.right.action.trigger()
assert bec_image_view.rotation == 3
assert bec_image_view.main_image.rotation == 3
assert bec_image_view.main_image.config.processing.rotation == 3
def test_image_toggle_action_rotate_left(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.processing_bundle.left.action.trigger()
assert bec_image_view.rotation == 1
assert bec_image_view.main_image.rotation == 1
assert bec_image_view.main_image.config.processing.rotation == 1
def test_image_toggle_action_reset(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
# Setup some processing
bec_image_view.fft = True
bec_image_view.log = True
bec_image_view.transpose = True
bec_image_view.rotation = 2
bec_image_view.processing_bundle.reset.action.trigger()
assert bec_image_view.rotation == 0
assert bec_image_view.main_image.rotation == 0
assert bec_image_view.main_image.config.processing.rotation == 0
assert bec_image_view.fft is False
assert bec_image_view.main_image.fft is False
assert bec_image_view.log is False
assert bec_image_view.main_image.log is False
assert bec_image_view.transpose is False
assert bec_image_view.main_image.transpose is False

View File

@@ -1,153 +0,0 @@
import json
import numpy as np
from bec_widgets.widgets.plots_next_gen.scatter_waveform.scatter_curve import (
ScatterCurveConfig,
ScatterDeviceSignal,
)
from bec_widgets.widgets.plots_next_gen.scatter_waveform.scatter_waveform import ScatterWaveform
from tests.unit_tests.client_mocks import create_dummy_scan_item, mocked_client
from .conftest import create_widget
def test_waveform_initialization(qtbot, mocked_client):
"""
Test that a new Waveform widget initializes with the correct defaults.
"""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
assert swf.objectName() == "ScatterWaveform"
# Inherited from PlotBase
assert swf.title == ""
assert swf.x_label == ""
assert swf.y_label == ""
# No crosshair or FPS monitor by default
assert swf.crosshair is None
assert swf.fps_monitor is None
assert swf.main_curve is not None
def test_scatter_waveform_plot(qtbot, mocked_client):
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
curve = swf.plot("samx", "samy", "bpm4i")
assert curve is not None
assert isinstance(curve.config, ScatterCurveConfig)
assert curve.config.x_device == ScatterDeviceSignal(name="samx", entry="samx")
assert curve.config.label == "bpm4i-bpm4i"
def test_scatter_waveform_color_map(qtbot, mocked_client):
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
assert swf.color_map == "magma"
swf.color_map = "plasma"
assert swf.color_map == "plasma"
def test_scatter_waveform_curve_json(qtbot, mocked_client):
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
# Add a device-based scatter curve
swf.plot(x_name="samx", y_name="samy", z_name="bpm4i", label="test_curve")
json_str = swf.curve_json
data = json.loads(json_str)
assert isinstance(data, dict)
assert data["label"] == "test_curve"
assert data["x_device"]["name"] == "samx"
assert data["y_device"]["name"] == "samy"
assert data["z_device"]["name"] == "bpm4i"
# Clear and reload from JSON
swf.clear_all()
assert swf.main_curve.getData() == (None, None)
swf.curve_json = json_str
assert swf.main_curve.config.label == "test_curve"
def test_scatter_waveform_update_with_scan_history(qtbot, mocked_client, monkeypatch):
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
dummy_scan = create_dummy_scan_item()
mocked_client.history.get_by_scan_id.return_value = dummy_scan
mocked_client.history.__getitem__.return_value = dummy_scan
swf.plot("samx", "samy", "bpm4i", label="test_curve")
swf.update_with_scan_history(scan_id="dummy")
qtbot.wait(500)
assert swf.scan_item == dummy_scan
x_data, y_data = swf.main_curve.getData()
np.testing.assert_array_equal(x_data, [10, 20, 30])
np.testing.assert_array_equal(y_data, [5, 10, 15])
def test_scatter_waveform_live_update(qtbot, mocked_client, monkeypatch):
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
dummy_scan = create_dummy_scan_item()
monkeypatch.setattr(swf.queue.scan_storage, "find_scan_by_ID", lambda scan_id: dummy_scan)
swf.plot("samx", "samy", "bpm4i", label="live_curve")
# Simulate scan status indicating new scan start
msg = {"scan_id": "dummy"}
meta = {}
swf.on_scan_status(msg, meta)
assert swf.scan_id == "dummy"
assert swf.scan_item == dummy_scan
qtbot.wait(500)
x_data, y_data = swf.main_curve.getData()
np.testing.assert_array_equal(x_data, [10, 20, 30])
np.testing.assert_array_equal(y_data, [5, 10, 15])
def test_scatter_waveform_scan_progress(qtbot, mocked_client, monkeypatch):
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
dummy_scan = create_dummy_scan_item()
monkeypatch.setattr(swf.queue.scan_storage, "find_scan_by_ID", lambda scan_id: dummy_scan)
swf.plot("samx", "samy", "bpm4i")
# Simulate scan status indicating scan progress
swf.scan_id = "dummy"
swf.scan_item = dummy_scan
msg = {"progress": 50}
meta = {}
swf.on_scan_progress(msg, meta)
qtbot.wait(500)
# swf.update_sync_curves()
x_data, y_data = swf.main_curve.getData()
np.testing.assert_array_equal(x_data, [10, 20, 30])
np.testing.assert_array_equal(y_data, [5, 10, 15])
def test_scatter_waveform_settings_popup(qtbot, mocked_client):
"""
Test that the settings popup is created correctly.
"""
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
scatter_popup_action = swf.toolbar.widgets["scatter_waveform_settings"].action
assert not scatter_popup_action.isChecked(), "Should start unchecked"
swf.show_scatter_curve_settings()
assert swf.scatter_dialog is not None
assert swf.scatter_dialog.isVisible()
assert scatter_popup_action.isChecked()
swf.scatter_dialog.close()
assert swf.scatter_dialog is None
assert not scatter_popup_action.isChecked(), "Should be unchecked after closing dialog"

View File

@@ -9,13 +9,7 @@ from pyqtgraph.graphicsItems.DateAxisItem import DateAxisItem
from bec_widgets.widgets.plots_next_gen.plot_base import UIMode
from bec_widgets.widgets.plots_next_gen.waveform.curve import DeviceSignal
from bec_widgets.widgets.plots_next_gen.waveform.waveform import Waveform
from tests.unit_tests.client_mocks import (
DummyData,
create_dummy_scan_item,
dap_plugin_message,
mocked_client,
mocked_client_with_dap,
)
from tests.unit_tests.client_mocks import dap_plugin_message, mocked_client, mocked_client_with_dap
from .conftest import create_widget
@@ -322,6 +316,43 @@ def test_curve_json_setter_ignores_custom(qtbot, mocked_client):
##################################################
class DummyData:
def __init__(self, val, timestamps):
self.val = val
self.timestamps = timestamps
def get(self, key, default=None):
if key == "val":
return self.val
return default
def create_dummy_scan_item():
"""
Helper to create a dummy scan item with both live_data and metadata/status_message info.
"""
dummy_live_data = {
"samx": {"samx": DummyData(val=[10, 20, 30], timestamps=[100, 200, 300])},
"bpm4i": {"bpm4i": DummyData(val=[5, 6, 7], timestamps=[101, 201, 301])},
"async_device": {"async_device": DummyData(val=[1, 2, 3], timestamps=[11, 21, 31])},
}
dummy_scan = MagicMock()
dummy_scan.live_data = dummy_live_data
dummy_scan.metadata = {
"bec": {
"scan_id": "dummy",
"scan_report_devices": ["samx"],
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
}
}
dummy_scan.status_message = MagicMock()
dummy_scan.status_message.info = {
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
"scan_report_devices": ["samx"],
}
return dummy_scan
def test_update_sync_curves(monkeypatch, qtbot, mocked_client):
"""
Test that update_sync_curves retrieves live data correctly and calls setData on sync curves.