mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-04-30 12:02:29 +02:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ef3146f1ea |
-5727
File diff suppressed because it is too large
Load Diff
@@ -1,168 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from queue import Queue
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .client import BECDockArea, BECFigure
|
||||
|
||||
|
||||
class ScanInfo(BaseModel):
|
||||
scan_id: str
|
||||
scan_number: int
|
||||
scan_name: str
|
||||
scan_report_devices: list
|
||||
monitored_devices: list
|
||||
status: str
|
||||
model_config: dict = {"validate_assignment": True}
|
||||
|
||||
|
||||
class AutoUpdates:
|
||||
create_default_dock: bool = False
|
||||
enabled: bool = False
|
||||
dock_name: str = None
|
||||
|
||||
def __init__(self, gui: BECDockArea):
|
||||
self.gui = gui
|
||||
self._default_dock = None
|
||||
self._default_fig = None
|
||||
|
||||
def start_default_dock(self):
|
||||
"""
|
||||
Create a default dock for the auto updates.
|
||||
"""
|
||||
self.dock_name = "default_figure"
|
||||
self._default_dock = self.gui.add_dock(self.dock_name)
|
||||
self._default_dock.add_widget("BECFigure")
|
||||
self._default_fig = self._default_dock.widget_list[0]
|
||||
|
||||
@staticmethod
|
||||
def get_scan_info(msg) -> ScanInfo:
|
||||
"""
|
||||
Update the script with the given data.
|
||||
"""
|
||||
info = msg.info
|
||||
status = msg.status
|
||||
scan_id = msg.scan_id
|
||||
scan_number = info.get("scan_number", 0)
|
||||
scan_name = info.get("scan_name", "Unknown")
|
||||
scan_report_devices = info.get("scan_report_devices", [])
|
||||
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
|
||||
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
|
||||
return ScanInfo(
|
||||
scan_id=scan_id,
|
||||
scan_number=scan_number,
|
||||
scan_name=scan_name,
|
||||
scan_report_devices=scan_report_devices,
|
||||
monitored_devices=monitored_devices,
|
||||
status=status,
|
||||
)
|
||||
|
||||
def get_default_figure(self) -> BECFigure | None:
|
||||
"""
|
||||
Get the default figure from the GUI.
|
||||
"""
|
||||
return self._default_fig
|
||||
|
||||
def do_update(self, msg):
|
||||
"""
|
||||
Run the update function if enabled.
|
||||
"""
|
||||
if not self.enabled:
|
||||
return
|
||||
if msg.status != "open":
|
||||
return
|
||||
info = self.get_scan_info(msg)
|
||||
return self.handler(info)
|
||||
|
||||
def get_selected_device(self, monitored_devices, selected_device):
|
||||
"""
|
||||
Get the selected device for the plot. If no device is selected, the first
|
||||
device in the monitored devices list is selected.
|
||||
"""
|
||||
if selected_device:
|
||||
return selected_device
|
||||
if len(monitored_devices) > 0:
|
||||
sel_device = monitored_devices[0]
|
||||
return sel_device
|
||||
return None
|
||||
|
||||
def handler(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Default update function.
|
||||
"""
|
||||
if info.scan_name == "line_scan" and info.scan_report_devices:
|
||||
return self.simple_line_scan(info)
|
||||
if info.scan_name == "grid_scan" and info.scan_report_devices:
|
||||
return self.simple_grid_scan(info)
|
||||
if info.scan_report_devices:
|
||||
return self.best_effort(info)
|
||||
|
||||
def simple_line_scan(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Simple line scan.
|
||||
"""
|
||||
fig = self.get_default_figure()
|
||||
if not fig:
|
||||
return
|
||||
dev_x = info.scan_report_devices[0]
|
||||
selected_device = yield self.gui.selected_device
|
||||
dev_y = self.get_selected_device(info.monitored_devices, selected_device)
|
||||
if not dev_y:
|
||||
return
|
||||
yield fig.clear_all()
|
||||
yield fig.plot(
|
||||
x_name=dev_x,
|
||||
y_name=dev_y,
|
||||
label=f"Scan {info.scan_number} - {dev_y}",
|
||||
title=f"Scan {info.scan_number}",
|
||||
x_label=dev_x,
|
||||
y_label=dev_y,
|
||||
)
|
||||
|
||||
def simple_grid_scan(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Simple grid scan.
|
||||
"""
|
||||
fig = self.get_default_figure()
|
||||
if not fig:
|
||||
return
|
||||
dev_x = info.scan_report_devices[0]
|
||||
dev_y = info.scan_report_devices[1]
|
||||
selected_device = yield self.gui.selected_device
|
||||
dev_z = self.get_selected_device(info.monitored_devices, selected_device)
|
||||
yield fig.clear_all()
|
||||
yield fig.plot(
|
||||
x_name=dev_x,
|
||||
y_name=dev_y,
|
||||
z_name=dev_z,
|
||||
label=f"Scan {info.scan_number} - {dev_z}",
|
||||
title=f"Scan {info.scan_number}",
|
||||
x_label=dev_x,
|
||||
y_label=dev_y,
|
||||
)
|
||||
|
||||
def best_effort(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Best effort scan.
|
||||
"""
|
||||
fig = self.get_default_figure()
|
||||
if not fig:
|
||||
return
|
||||
dev_x = info.scan_report_devices[0]
|
||||
selected_device = yield self.gui.selected_device
|
||||
dev_y = self.get_selected_device(info.monitored_devices, selected_device)
|
||||
if not dev_y:
|
||||
return
|
||||
yield fig.clear_all()
|
||||
yield fig.plot(
|
||||
x_name=dev_x,
|
||||
y_name=dev_y,
|
||||
label=f"Scan {info.scan_number} - {dev_y}",
|
||||
title=f"Scan {info.scan_number}",
|
||||
x_label=dev_x,
|
||||
y_label=dev_y,
|
||||
)
|
||||
@@ -31,7 +31,6 @@ class Widgets(str, enum.Enum):
|
||||
DeviceComboBox = "DeviceComboBox"
|
||||
DeviceLineEdit = "DeviceLineEdit"
|
||||
LMFitDialog = "LMFitDialog"
|
||||
Minesweeper = "Minesweeper"
|
||||
PositionIndicator = "PositionIndicator"
|
||||
PositionerBox = "PositionerBox"
|
||||
PositionerControlLine = "PositionerControlLine"
|
||||
@@ -3182,9 +3181,6 @@ class LMFitDialog(RPCBase):
|
||||
"""
|
||||
|
||||
|
||||
class Minesweeper(RPCBase): ...
|
||||
|
||||
|
||||
class PositionIndicator(RPCBase):
|
||||
@rpc_call
|
||||
def set_value(self, position: float):
|
||||
|
||||
@@ -16,7 +16,6 @@ from bec_lib.logger import bec_logger
|
||||
from bec_lib.utils.import_utils import isinstance_based_on_class_name, lazy_import, lazy_import_from
|
||||
|
||||
import bec_widgets.cli.client as client
|
||||
from bec_widgets.cli.auto_updates import AutoUpdates
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCBase
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -160,8 +159,7 @@ class BECGuiClient(RPCBase):
|
||||
|
||||
def __init__(self, **kwargs) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self._auto_updates_enabled = True
|
||||
self._auto_updates = None
|
||||
self._auto_update_enabled = True
|
||||
self._startup_timeout = 0
|
||||
self._gui_started_timer = None
|
||||
self._gui_started_event = threading.Event()
|
||||
@@ -172,30 +170,25 @@ class BECGuiClient(RPCBase):
|
||||
def windows(self):
|
||||
return self._top_level
|
||||
|
||||
@property
|
||||
def auto_updates(self):
|
||||
if self._auto_updates_enabled:
|
||||
with wait_for_server(self):
|
||||
return self._auto_updates
|
||||
|
||||
def _get_update_script(self) -> AutoUpdates | None:
|
||||
eps = imd.entry_points(group="bec.widgets.auto_updates")
|
||||
for ep in eps:
|
||||
if ep.name == "plugin_widgets_update":
|
||||
try:
|
||||
spec = importlib.util.find_spec(ep.module)
|
||||
# if the module is not found, we skip it
|
||||
if spec is None:
|
||||
continue
|
||||
return ep.load()(gui=self)
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading auto update script from plugin: {str(e)}")
|
||||
return None
|
||||
# TODO: needs review
|
||||
# def _get_update_script(self) -> AutoUpdates | None:
|
||||
# eps = imd.entry_points(group="bec.widgets.auto_updates")
|
||||
# for ep in eps:
|
||||
# if ep.name == "plugin_widgets_update":
|
||||
# try:
|
||||
# spec = importlib.util.find_spec(ep.module)
|
||||
# # if the module is not found, we skip it
|
||||
# if spec is None:
|
||||
# continue
|
||||
# return ep.load()(gui=self)
|
||||
# except Exception as e:
|
||||
# logger.error(f"Error loading auto update script from plugin: {str(e)}")
|
||||
# return None
|
||||
|
||||
@property
|
||||
def selected_device(self):
|
||||
"""
|
||||
Selected device for the plot.
|
||||
Selected device for the auto update plot.
|
||||
"""
|
||||
auto_update_config_ep = MessageEndpoints.gui_auto_update_config(self._gui_id)
|
||||
auto_update_config = self._client.connector.get(auto_update_config_ep)
|
||||
@@ -218,36 +211,12 @@ class BECGuiClient(RPCBase):
|
||||
else:
|
||||
raise ValueError("Device must be a string or a device object")
|
||||
|
||||
def _start_update_script(self) -> None:
|
||||
self._client.connector.register(MessageEndpoints.scan_status(), cb=self._handle_msg_update)
|
||||
|
||||
def _handle_msg_update(self, msg: MessageObject) -> None:
|
||||
if self.auto_updates is not None:
|
||||
# pylint: disable=protected-access
|
||||
return self._update_script_msg_parser(msg.value)
|
||||
|
||||
def _update_script_msg_parser(self, msg: messages.BECMessage) -> None:
|
||||
if isinstance(msg, messages.ScanStatusMessage):
|
||||
if not self.gui_is_alive():
|
||||
return
|
||||
if self._auto_updates_enabled:
|
||||
return self.auto_updates.do_update(msg)
|
||||
|
||||
def _gui_post_startup(self):
|
||||
self._top_level["main"] = WidgetDesc(
|
||||
title="BEC Widgets", widget=BECDockArea(gui_id=self._gui_id)
|
||||
)
|
||||
if self._auto_updates_enabled:
|
||||
if self._auto_updates is None:
|
||||
auto_updates = self._get_update_script()
|
||||
if auto_updates is None:
|
||||
AutoUpdates.create_default_dock = True
|
||||
AutoUpdates.enabled = True
|
||||
auto_updates = AutoUpdates(self._top_level["main"].widget)
|
||||
if auto_updates.create_default_dock:
|
||||
auto_updates.start_default_dock()
|
||||
self._start_update_script()
|
||||
self._auto_updates = auto_updates
|
||||
if self._auto_update_enabled:
|
||||
self._do_install_auto_update()
|
||||
self._do_show_all()
|
||||
self._gui_started_event.set()
|
||||
|
||||
@@ -325,6 +294,14 @@ class BECGuiClient(RPCBase):
|
||||
self._top_level[widget._gui_id] = WidgetDesc(title=title, widget=widget)
|
||||
return widget
|
||||
|
||||
def _do_install_auto_update(self):
|
||||
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
|
||||
return rpc_client._run_rpc("install_auto_update")
|
||||
|
||||
def install_auto_update(self):
|
||||
with wait_for_server(self):
|
||||
return self._do_install_auto_update()
|
||||
|
||||
def close(self) -> None:
|
||||
"""
|
||||
Close the gui window.
|
||||
|
||||
@@ -264,8 +264,8 @@ def main():
|
||||
|
||||
RPCRegister().add_rpc(win)
|
||||
|
||||
gui = server.gui
|
||||
win.setCentralWidget(gui)
|
||||
win.setCentralWidget(server.gui)
|
||||
|
||||
if not args.hide:
|
||||
win.show()
|
||||
|
||||
|
||||
@@ -224,11 +224,3 @@ DEVICES = [
|
||||
Positioner("test", limits=[-10, 10], read_value=2.0),
|
||||
Device("test_device"),
|
||||
]
|
||||
|
||||
|
||||
def check_remote_data_size(widget, plot_name, num_elements):
|
||||
"""
|
||||
Check if the remote data has the correct number of elements.
|
||||
Used in the qtbot.waitUntil function.
|
||||
"""
|
||||
return len(widget.get_all_data()[plot_name]["x"]) == num_elements
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# pylint: disable=no-name-in-module
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Literal
|
||||
|
||||
from qtpy.QtWidgets import (
|
||||
QApplication,
|
||||
@@ -27,15 +28,6 @@ class WidgetHandler(ABC):
|
||||
def set_value(self, widget: QWidget, value):
|
||||
"""Set a value on the widget instance."""
|
||||
|
||||
def connect_change_signal(self, widget: QWidget, slot):
|
||||
"""
|
||||
Connect a change signal from this widget to the given slot.
|
||||
If the widget type doesn't have a known "value changed" signal, do nothing.
|
||||
|
||||
slot: a function accepting two arguments (widget, value)
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class LineEditHandler(WidgetHandler):
|
||||
"""Handler for QLineEdit widgets."""
|
||||
@@ -46,9 +38,6 @@ class LineEditHandler(WidgetHandler):
|
||||
def set_value(self, widget: QLineEdit, value: str) -> None:
|
||||
widget.setText(value)
|
||||
|
||||
def connect_change_signal(self, widget: QLineEdit, slot):
|
||||
widget.textChanged.connect(lambda text, w=widget: slot(w, text))
|
||||
|
||||
|
||||
class ComboBoxHandler(WidgetHandler):
|
||||
"""Handler for QComboBox widgets."""
|
||||
@@ -64,11 +53,6 @@ class ComboBoxHandler(WidgetHandler):
|
||||
if isinstance(value, int):
|
||||
widget.setCurrentIndex(value)
|
||||
|
||||
def connect_change_signal(self, widget: QComboBox, slot):
|
||||
# currentIndexChanged(int) or currentIndexChanged(str) both possible.
|
||||
# We use currentIndexChanged(int) for a consistent behavior.
|
||||
widget.currentIndexChanged.connect(lambda idx, w=widget: slot(w, self.get_value(w)))
|
||||
|
||||
|
||||
class TableWidgetHandler(WidgetHandler):
|
||||
"""Handler for QTableWidget widgets."""
|
||||
@@ -88,16 +72,6 @@ class TableWidgetHandler(WidgetHandler):
|
||||
item = QTableWidgetItem(str(cell_value))
|
||||
widget.setItem(row, col, item)
|
||||
|
||||
def connect_change_signal(self, widget: QTableWidget, slot):
|
||||
# If desired, we could connect cellChanged(row, col) and then fetch all data.
|
||||
# This might be noisy if table is large.
|
||||
# For demonstration, connect cellChanged to update entire table value.
|
||||
def on_cell_changed(row, col, w=widget):
|
||||
val = self.get_value(w)
|
||||
slot(w, val)
|
||||
|
||||
widget.cellChanged.connect(on_cell_changed)
|
||||
|
||||
|
||||
class SpinBoxHandler(WidgetHandler):
|
||||
"""Handler for QSpinBox and QDoubleSpinBox widgets."""
|
||||
@@ -108,9 +82,6 @@ class SpinBoxHandler(WidgetHandler):
|
||||
def set_value(self, widget, value):
|
||||
widget.setValue(value)
|
||||
|
||||
def connect_change_signal(self, widget: QSpinBox | QDoubleSpinBox, slot):
|
||||
widget.valueChanged.connect(lambda val, w=widget: slot(w, val))
|
||||
|
||||
|
||||
class CheckBoxHandler(WidgetHandler):
|
||||
"""Handler for QCheckBox widgets."""
|
||||
@@ -121,9 +92,6 @@ class CheckBoxHandler(WidgetHandler):
|
||||
def set_value(self, widget, value):
|
||||
widget.setChecked(value)
|
||||
|
||||
def connect_change_signal(self, widget: QCheckBox, slot):
|
||||
widget.toggled.connect(lambda val, w=widget: slot(w, val))
|
||||
|
||||
|
||||
class LabelHandler(WidgetHandler):
|
||||
"""Handler for QLabel widgets."""
|
||||
@@ -131,15 +99,12 @@ class LabelHandler(WidgetHandler):
|
||||
def get_value(self, widget, **kwargs):
|
||||
return widget.text()
|
||||
|
||||
def set_value(self, widget: QLabel, value):
|
||||
def set_value(self, widget, value):
|
||||
widget.setText(value)
|
||||
|
||||
# QLabel typically doesn't have user-editable changes. No signal to connect.
|
||||
# If needed, this can remain empty.
|
||||
|
||||
|
||||
class WidgetIO:
|
||||
"""Public interface for getting, setting values and connecting signals using handler mapping"""
|
||||
"""Public interface for getting and setting values using handler mapping"""
|
||||
|
||||
_handlers = {
|
||||
QLineEdit: LineEditHandler,
|
||||
@@ -183,17 +148,6 @@ class WidgetIO:
|
||||
elif not ignore_errors:
|
||||
raise ValueError(f"No handler for widget type: {type(widget)}")
|
||||
|
||||
@staticmethod
|
||||
def connect_widget_change_signal(widget, slot):
|
||||
"""
|
||||
Connect the widget's value-changed signal to a generic slot function (widget, value).
|
||||
This now delegates the logic to the widget's handler.
|
||||
"""
|
||||
handler_class = WidgetIO._find_handler(widget)
|
||||
if handler_class:
|
||||
handler = handler_class()
|
||||
handler.connect_change_signal(widget, slot)
|
||||
|
||||
@staticmethod
|
||||
def check_and_adjust_limits(spin_box: QDoubleSpinBox, number: float):
|
||||
"""
|
||||
@@ -355,8 +309,8 @@ class WidgetHierarchy:
|
||||
WidgetHierarchy.import_config_from_dict(child, widget_config, set_values)
|
||||
|
||||
|
||||
# Example usage
|
||||
def hierarchy_example(): # pragma: no cover
|
||||
# Example application to demonstrate the usage of the functions
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
app = QApplication([])
|
||||
|
||||
# Create instance of WidgetHierarchy
|
||||
@@ -411,37 +365,3 @@ def hierarchy_example(): # pragma: no cover
|
||||
print(f"Config dict new REDUCED: {config_dict_new_reduced}")
|
||||
|
||||
app.exec()
|
||||
|
||||
|
||||
def widget_io_signal_example(): # pragma: no cover
|
||||
app = QApplication([])
|
||||
|
||||
main_widget = QWidget()
|
||||
layout = QVBoxLayout(main_widget)
|
||||
line_edit = QLineEdit(main_widget)
|
||||
combo_box = QComboBox(main_widget)
|
||||
spin_box = QSpinBox(main_widget)
|
||||
combo_box.addItems(["Option 1", "Option 2", "Option 3"])
|
||||
|
||||
layout.addWidget(line_edit)
|
||||
layout.addWidget(combo_box)
|
||||
layout.addWidget(spin_box)
|
||||
|
||||
main_widget.show()
|
||||
|
||||
def universal_slot(w, val):
|
||||
print(f"Widget {w.objectName() or w} changed, new value: {val}")
|
||||
|
||||
# Connect all supported widgets through their handlers
|
||||
WidgetIO.connect_widget_change_signal(line_edit, universal_slot)
|
||||
WidgetIO.connect_widget_change_signal(combo_box, universal_slot)
|
||||
WidgetIO.connect_widget_change_signal(spin_box, universal_slot)
|
||||
|
||||
app.exec_()
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
# Change example function to test different scenarios
|
||||
|
||||
# hierarchy_example()
|
||||
widget_io_signal_example()
|
||||
|
||||
@@ -44,7 +44,6 @@ class BECDockArea(BECWidget, QWidget):
|
||||
PLUGIN = True
|
||||
USER_ACCESS = [
|
||||
"_config_dict",
|
||||
"selected_device",
|
||||
"panels",
|
||||
"save_state",
|
||||
"remove_dock",
|
||||
@@ -216,17 +215,6 @@ class BECDockArea(BECWidget, QWidget):
|
||||
"Add docks using 'add_dock' method from CLI\n or \n Add widget docks using the toolbar",
|
||||
)
|
||||
|
||||
@property
|
||||
def selected_device(self) -> str:
|
||||
gui_id = QApplication.instance().gui_id
|
||||
auto_update_config = self.client.connector.get(
|
||||
MessageEndpoints.gui_auto_update_config(gui_id)
|
||||
)
|
||||
try:
|
||||
return auto_update_config.selected_device
|
||||
except AttributeError:
|
||||
return None
|
||||
|
||||
@property
|
||||
def panels(self) -> dict[str, BECDock]:
|
||||
"""
|
||||
|
||||
@@ -1,8 +1,26 @@
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
|
||||
from qtpy.QtWidgets import QApplication, QMainWindow
|
||||
|
||||
from bec_widgets.utils import BECConnector
|
||||
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
|
||||
|
||||
messages = lazy_import("bec_lib.messages")
|
||||
# from bec_lib.connector import MessageObject
|
||||
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class ScanInfo(BaseModel):
|
||||
scan_id: str
|
||||
scan_number: int
|
||||
scan_name: str
|
||||
scan_report_devices: list
|
||||
monitored_devices: list
|
||||
status: str
|
||||
model_config: dict = {"validate_assignment": True}
|
||||
|
||||
|
||||
class BECMainWindow(QMainWindow, BECConnector):
|
||||
def __init__(self, *args, **kwargs):
|
||||
@@ -39,3 +57,138 @@ class BECMainWindow(QMainWindow, BECConnector):
|
||||
dock_area.window().setWindowTitle(name)
|
||||
dock_area.show()
|
||||
return dock_area
|
||||
|
||||
def install_auto_update(self):
|
||||
dock_area = self.centralWidget()
|
||||
figure_dock = dock_area.add_dock("default_figure")
|
||||
self.auto_update_fig = figure_dock.add_widget("BECFigure")
|
||||
self.client.connector.register(MessageEndpoints.scan_status(), cb=self._handle_msg_update)
|
||||
|
||||
@property
|
||||
def selected_device(self) -> str:
|
||||
gui_id = QApplication.instance().gui_id
|
||||
auto_update_config = self.client.connector.get(
|
||||
MessageEndpoints.gui_auto_update_config(gui_id)
|
||||
)
|
||||
try:
|
||||
return auto_update_config.selected_device
|
||||
except AttributeError:
|
||||
return None
|
||||
|
||||
def _handle_msg_update(self, msg: MessageObject) -> None:
|
||||
msg = msg.value
|
||||
if isinstance(msg, messages.ScanStatusMessage):
|
||||
return self.do_update(msg)
|
||||
|
||||
def get_scan_info(self, msg) -> ScanInfo:
|
||||
"""
|
||||
Update the script with the given data.
|
||||
"""
|
||||
info = msg.info
|
||||
status = msg.status
|
||||
scan_id = msg.scan_id
|
||||
scan_number = info.get("scan_number", 0)
|
||||
scan_name = info.get("scan_name", "Unknown")
|
||||
scan_report_devices = info.get("scan_report_devices", [])
|
||||
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
|
||||
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
|
||||
return ScanInfo(
|
||||
scan_id=scan_id,
|
||||
scan_number=scan_number,
|
||||
scan_name=scan_name,
|
||||
scan_report_devices=scan_report_devices,
|
||||
monitored_devices=monitored_devices,
|
||||
status=status,
|
||||
)
|
||||
|
||||
def do_update(self, msg):
|
||||
if msg.status != "open":
|
||||
return
|
||||
info = self.get_scan_info(msg)
|
||||
return self.handler(info)
|
||||
|
||||
def handler(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Default update function.
|
||||
"""
|
||||
if info.scan_name == "line_scan" and info.scan_report_devices:
|
||||
return self.simple_line_scan(info)
|
||||
if info.scan_name == "grid_scan" and info.scan_report_devices:
|
||||
return self.simple_grid_scan(info)
|
||||
if info.scan_report_devices:
|
||||
return self.best_effort(info)
|
||||
|
||||
def get_selected_device(self, monitored_devices, selected_device):
|
||||
"""
|
||||
Get the selected device for the plot. If no device is selected, the first
|
||||
device in the monitored devices list is selected.
|
||||
"""
|
||||
if selected_device:
|
||||
return selected_device
|
||||
if len(monitored_devices) > 0:
|
||||
sel_device = monitored_devices[0]
|
||||
return sel_device
|
||||
return None
|
||||
|
||||
def simple_line_scan(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Simple line scan.
|
||||
"""
|
||||
fig = self.auto_update_fig
|
||||
if not fig:
|
||||
return
|
||||
dev_x = info.scan_report_devices[0]
|
||||
dev_y = self.get_selected_device(info.monitored_devices, self.selected_device)
|
||||
if not dev_y:
|
||||
return
|
||||
fig.clear_all()
|
||||
fig.plot(
|
||||
x_name=dev_x,
|
||||
y_name=dev_y,
|
||||
label=f"Scan {info.scan_number} - {dev_y}",
|
||||
title=f"Scan {info.scan_number}",
|
||||
x_label=dev_x,
|
||||
y_label=dev_y,
|
||||
)
|
||||
|
||||
def simple_grid_scan(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Simple grid scan.
|
||||
"""
|
||||
fig = self.auto_update_fig
|
||||
if not fig:
|
||||
return
|
||||
dev_x = info.scan_report_devices[0]
|
||||
dev_y = info.scan_report_devices[1]
|
||||
dev_z = self.get_selected_device(info.monitored_devices, self.selected_device)
|
||||
fig.clear_all()
|
||||
fig.plot(
|
||||
x_name=dev_x,
|
||||
y_name=dev_y,
|
||||
z_name=dev_z,
|
||||
label=f"Scan {info.scan_number} - {dev_z}",
|
||||
title=f"Scan {info.scan_number}",
|
||||
x_label=dev_x,
|
||||
y_label=dev_y,
|
||||
)
|
||||
|
||||
def best_effort(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Best effort scan.
|
||||
"""
|
||||
fig = self.auto_update_fig
|
||||
if not fig:
|
||||
return
|
||||
dev_x = info.scan_report_devices[0]
|
||||
dev_y = self.get_selected_device(info.monitored_devices, self.selected_device)
|
||||
if not dev_y:
|
||||
return
|
||||
fig.clear_all()
|
||||
fig.plot(
|
||||
x_name=dev_x,
|
||||
y_name=dev_y,
|
||||
label=f"Scan {info.scan_number} - {dev_y}",
|
||||
title=f"Scan {info.scan_number}",
|
||||
x_label=dev_x,
|
||||
y_label=dev_y,
|
||||
)
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
from bec_widgets.widgets.games.minesweeper import Minesweeper
|
||||
|
||||
__ALL__ = ["Minesweeper"]
|
||||
@@ -1,413 +0,0 @@
|
||||
import enum
|
||||
import random
|
||||
import time
|
||||
|
||||
from bec_qthemes import material_icon
|
||||
from qtpy.QtCore import QSize, Qt, QTimer, Signal, Slot
|
||||
from qtpy.QtGui import QBrush, QColor, QPainter, QPen
|
||||
from qtpy.QtWidgets import (
|
||||
QApplication,
|
||||
QComboBox,
|
||||
QGridLayout,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QPushButton,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
|
||||
NUM_COLORS = {
|
||||
1: QColor("#f44336"),
|
||||
2: QColor("#9C27B0"),
|
||||
3: QColor("#3F51B5"),
|
||||
4: QColor("#03A9F4"),
|
||||
5: QColor("#00BCD4"),
|
||||
6: QColor("#4CAF50"),
|
||||
7: QColor("#E91E63"),
|
||||
8: QColor("#FF9800"),
|
||||
}
|
||||
|
||||
LEVELS: dict[str, tuple[int, int]] = {"1": (8, 10), "2": (16, 40), "3": (24, 99)}
|
||||
|
||||
|
||||
class GameStatus(enum.Enum):
|
||||
READY = 0
|
||||
PLAYING = 1
|
||||
FAILED = 2
|
||||
SUCCESS = 3
|
||||
|
||||
|
||||
class Pos(QWidget):
|
||||
expandable = Signal(int, int)
|
||||
clicked = Signal()
|
||||
ohno = Signal()
|
||||
|
||||
def __init__(self, x, y, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self.setFixedSize(QSize(20, 20))
|
||||
|
||||
self.x = x
|
||||
self.y = y
|
||||
self.is_start = False
|
||||
self.is_mine = False
|
||||
self.adjacent_n = 0
|
||||
self.is_revealed = False
|
||||
self.is_flagged = False
|
||||
|
||||
def reset(self):
|
||||
"""Restore the tile to its original state before mine status is assigned"""
|
||||
self.is_start = False
|
||||
self.is_mine = False
|
||||
self.adjacent_n = 0
|
||||
|
||||
self.is_revealed = False
|
||||
self.is_flagged = False
|
||||
|
||||
self.update()
|
||||
|
||||
def paintEvent(self, event):
|
||||
p = QPainter(self)
|
||||
|
||||
r = event.rect()
|
||||
|
||||
if self.is_revealed:
|
||||
color = self.palette().base().color()
|
||||
outer, inner = color, color
|
||||
else:
|
||||
outer, inner = (self.palette().highlightedText().color(), self.palette().text().color())
|
||||
|
||||
p.fillRect(r, QBrush(inner))
|
||||
pen = QPen(outer)
|
||||
pen.setWidth(1)
|
||||
p.setPen(pen)
|
||||
p.drawRect(r)
|
||||
|
||||
if self.is_revealed:
|
||||
if self.is_mine:
|
||||
p.drawPixmap(r, material_icon("experiment", convert_to_pixmap=True, filled=True))
|
||||
|
||||
elif self.adjacent_n > 0:
|
||||
pen = QPen(NUM_COLORS[self.adjacent_n])
|
||||
p.setPen(pen)
|
||||
f = p.font()
|
||||
f.setBold(True)
|
||||
p.setFont(f)
|
||||
p.drawText(r, Qt.AlignHCenter | Qt.AlignVCenter, str(self.adjacent_n))
|
||||
|
||||
elif self.is_flagged:
|
||||
p.drawPixmap(
|
||||
r,
|
||||
material_icon(
|
||||
"flag",
|
||||
size=(50, 50),
|
||||
convert_to_pixmap=True,
|
||||
filled=True,
|
||||
color=self.palette().base().color(),
|
||||
),
|
||||
)
|
||||
p.end()
|
||||
|
||||
def flag(self):
|
||||
self.is_flagged = not self.is_flagged
|
||||
self.update()
|
||||
|
||||
self.clicked.emit()
|
||||
|
||||
def reveal(self):
|
||||
self.is_revealed = True
|
||||
self.update()
|
||||
|
||||
def click(self):
|
||||
if not self.is_revealed:
|
||||
self.reveal()
|
||||
if self.adjacent_n == 0:
|
||||
self.expandable.emit(self.x, self.y)
|
||||
|
||||
self.clicked.emit()
|
||||
|
||||
def mouseReleaseEvent(self, event):
|
||||
if event.button() == Qt.MouseButton.RightButton and not self.is_revealed:
|
||||
self.flag()
|
||||
return
|
||||
|
||||
if event.button() == Qt.MouseButton.LeftButton:
|
||||
self.click()
|
||||
if self.is_mine:
|
||||
self.ohno.emit()
|
||||
|
||||
|
||||
class Minesweeper(BECWidget, QWidget):
|
||||
|
||||
PLUGIN = True
|
||||
ICON_NAME = "videogame_asset"
|
||||
USER_ACCESS = []
|
||||
|
||||
def __init__(self, parent=None, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
QWidget.__init__(self, parent=parent)
|
||||
|
||||
self._ui_initialised = False
|
||||
self._timer_start_num_seconds = 0
|
||||
self._set_level_params(LEVELS["1"])
|
||||
|
||||
self._init_ui()
|
||||
self._init_map()
|
||||
|
||||
self.update_status(GameStatus.READY)
|
||||
self.reset_map()
|
||||
self.update_status(GameStatus.READY)
|
||||
|
||||
def _init_ui(self):
|
||||
if self._ui_initialised:
|
||||
return
|
||||
self._ui_initialised = True
|
||||
|
||||
status_hb = QHBoxLayout()
|
||||
self.mines = QLabel()
|
||||
self.mines.setAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
|
||||
f = self.mines.font()
|
||||
f.setPointSize(24)
|
||||
self.mines.setFont(f)
|
||||
|
||||
self.reset_button = QPushButton()
|
||||
self.reset_button.setFixedSize(QSize(32, 32))
|
||||
self.reset_button.setIconSize(QSize(32, 32))
|
||||
self.reset_button.setFlat(True)
|
||||
self.reset_button.pressed.connect(self.reset_button_pressed)
|
||||
|
||||
self.clock = QLabel()
|
||||
self.clock.setAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
|
||||
self.clock.setFont(f)
|
||||
self._timer = QTimer()
|
||||
self._timer.timeout.connect(self.update_timer)
|
||||
self._timer.start(1000) # 1 second timer
|
||||
self.mines.setText(f"{self.num_mines:03d}")
|
||||
self.clock.setText("000")
|
||||
|
||||
status_hb.addWidget(self.mines)
|
||||
status_hb.addWidget(self.reset_button)
|
||||
status_hb.addWidget(self.clock)
|
||||
|
||||
level_hb = QHBoxLayout()
|
||||
self.level_selector = QComboBox()
|
||||
self.level_selector.addItems(list(LEVELS.keys()))
|
||||
level_hb.addWidget(QLabel("Level: "))
|
||||
level_hb.addWidget(self.level_selector)
|
||||
self.level_selector.currentTextChanged.connect(self.change_level)
|
||||
|
||||
vb = QVBoxLayout()
|
||||
vb.addLayout(level_hb)
|
||||
vb.addLayout(status_hb)
|
||||
|
||||
self.grid = QGridLayout()
|
||||
self.grid.setSpacing(5)
|
||||
|
||||
vb.addLayout(self.grid)
|
||||
self.setLayout(vb)
|
||||
|
||||
def _init_map(self):
|
||||
"""Redraw the grid of mines"""
|
||||
|
||||
# Remove any previous grid items and reset the grid
|
||||
for i in reversed(range(self.grid.count())):
|
||||
w: Pos = self.grid.itemAt(i).widget()
|
||||
w.clicked.disconnect(self.on_click)
|
||||
w.expandable.disconnect(self.expand_reveal)
|
||||
w.ohno.disconnect(self.game_over)
|
||||
w.setParent(None)
|
||||
w.deleteLater()
|
||||
|
||||
# Add positions to the map
|
||||
for x in range(0, self.b_size):
|
||||
for y in range(0, self.b_size):
|
||||
w = Pos(x, y)
|
||||
self.grid.addWidget(w, y, x)
|
||||
# Connect signal to handle expansion.
|
||||
w.clicked.connect(self.on_click)
|
||||
w.expandable.connect(self.expand_reveal)
|
||||
w.ohno.connect(self.game_over)
|
||||
|
||||
def reset_map(self):
|
||||
"""
|
||||
Reset the map and add new mines.
|
||||
"""
|
||||
# Clear all mine positions
|
||||
for x in range(0, self.b_size):
|
||||
for y in range(0, self.b_size):
|
||||
w = self.grid.itemAtPosition(y, x).widget()
|
||||
w.reset()
|
||||
|
||||
# Add mines to the positions
|
||||
positions = []
|
||||
while len(positions) < self.num_mines:
|
||||
x, y = (random.randint(0, self.b_size - 1), random.randint(0, self.b_size - 1))
|
||||
if (x, y) not in positions:
|
||||
w = self.grid.itemAtPosition(y, x).widget()
|
||||
w.is_mine = True
|
||||
positions.append((x, y))
|
||||
|
||||
def get_adjacency_n(x, y):
|
||||
positions = self.get_surrounding(x, y)
|
||||
num_mines = sum(1 if w.is_mine else 0 for w in positions)
|
||||
|
||||
return num_mines
|
||||
|
||||
# Add adjacencies to the positions
|
||||
for x in range(0, self.b_size):
|
||||
for y in range(0, self.b_size):
|
||||
w = self.grid.itemAtPosition(y, x).widget()
|
||||
w.adjacent_n = get_adjacency_n(x, y)
|
||||
|
||||
# Place starting marker
|
||||
while True:
|
||||
x, y = (random.randint(0, self.b_size - 1), random.randint(0, self.b_size - 1))
|
||||
w = self.grid.itemAtPosition(y, x).widget()
|
||||
# We don't want to start on a mine.
|
||||
if (x, y) not in positions:
|
||||
w = self.grid.itemAtPosition(y, x).widget()
|
||||
w.is_start = True
|
||||
|
||||
# Reveal all positions around this, if they are not mines either.
|
||||
for w in self.get_surrounding(x, y):
|
||||
if not w.is_mine:
|
||||
w.click()
|
||||
break
|
||||
|
||||
def get_surrounding(self, x, y):
|
||||
positions = []
|
||||
for xi in range(max(0, x - 1), min(x + 2, self.b_size)):
|
||||
for yi in range(max(0, y - 1), min(y + 2, self.b_size)):
|
||||
positions.append(self.grid.itemAtPosition(yi, xi).widget())
|
||||
return positions
|
||||
|
||||
def get_num_hidden(self) -> int:
|
||||
"""
|
||||
Get the number of hidden positions.
|
||||
"""
|
||||
return sum(
|
||||
1
|
||||
for x in range(0, self.b_size)
|
||||
for y in range(0, self.b_size)
|
||||
if not self.grid.itemAtPosition(y, x).widget().is_revealed
|
||||
)
|
||||
|
||||
def get_num_remaining_flags(self) -> int:
|
||||
"""
|
||||
Get the number of remaining flags.
|
||||
"""
|
||||
return self.num_mines - sum(
|
||||
1
|
||||
for x in range(0, self.b_size)
|
||||
for y in range(0, self.b_size)
|
||||
if self.grid.itemAtPosition(y, x).widget().is_flagged
|
||||
)
|
||||
|
||||
def reset_button_pressed(self):
|
||||
match self.status:
|
||||
case GameStatus.PLAYING:
|
||||
self.game_over()
|
||||
case GameStatus.FAILED | GameStatus.SUCCESS:
|
||||
self.reset_map()
|
||||
|
||||
def reveal_map(self):
|
||||
for x in range(0, self.b_size):
|
||||
for y in range(0, self.b_size):
|
||||
w = self.grid.itemAtPosition(y, x).widget()
|
||||
w.reveal()
|
||||
|
||||
@Slot(str)
|
||||
def change_level(self, level: str):
|
||||
self._set_level_params(LEVELS[level])
|
||||
self._init_map()
|
||||
self.reset_map()
|
||||
|
||||
@Slot(int, int)
|
||||
def expand_reveal(self, x, y):
|
||||
"""
|
||||
Expand the reveal to the surrounding
|
||||
|
||||
Args:
|
||||
x (int): The x position.
|
||||
y (int): The y position.
|
||||
"""
|
||||
for xi in range(max(0, x - 1), min(x + 2, self.b_size)):
|
||||
for yi in range(max(0, y - 1), min(y + 2, self.b_size)):
|
||||
w = self.grid.itemAtPosition(yi, xi).widget()
|
||||
if not w.is_mine:
|
||||
w.click()
|
||||
|
||||
@Slot()
|
||||
def on_click(self):
|
||||
"""
|
||||
Handle the click event. If the game is not started, start the game.
|
||||
"""
|
||||
self.update_available_flags()
|
||||
if self.status != GameStatus.PLAYING:
|
||||
# First click.
|
||||
self.update_status(GameStatus.PLAYING)
|
||||
# Start timer.
|
||||
self._timer_start_num_seconds = int(time.time())
|
||||
return
|
||||
self.check_win()
|
||||
|
||||
def update_available_flags(self):
|
||||
"""
|
||||
Update the number of available flags.
|
||||
"""
|
||||
self.mines.setText(f"{self.get_num_remaining_flags():03d}")
|
||||
|
||||
def check_win(self):
|
||||
"""
|
||||
Check if the game is won.
|
||||
"""
|
||||
if self.get_num_hidden() == self.num_mines:
|
||||
self.update_status(GameStatus.SUCCESS)
|
||||
|
||||
def update_status(self, status: GameStatus):
|
||||
"""
|
||||
Update the status of the game.
|
||||
|
||||
Args:
|
||||
status (GameStatus): The status of the game.
|
||||
"""
|
||||
self.status = status
|
||||
match status:
|
||||
case GameStatus.READY:
|
||||
icon = material_icon(icon_name="add", convert_to_pixmap=False)
|
||||
case GameStatus.PLAYING:
|
||||
icon = material_icon(icon_name="smart_toy", convert_to_pixmap=False)
|
||||
case GameStatus.FAILED:
|
||||
icon = material_icon(icon_name="error", convert_to_pixmap=False)
|
||||
case GameStatus.SUCCESS:
|
||||
icon = material_icon(icon_name="celebration", convert_to_pixmap=False)
|
||||
self.reset_button.setIcon(icon)
|
||||
|
||||
def update_timer(self):
|
||||
"""
|
||||
Update the timer.
|
||||
"""
|
||||
if self.status == GameStatus.PLAYING:
|
||||
num_seconds = int(time.time()) - self._timer_start_num_seconds
|
||||
self.clock.setText(f"{num_seconds:03d}")
|
||||
|
||||
def game_over(self):
|
||||
"""Cause the game to end early"""
|
||||
self.reveal_map()
|
||||
self.update_status(GameStatus.FAILED)
|
||||
|
||||
def _set_level_params(self, level: tuple[int, int]):
|
||||
self.b_size, self.num_mines = level
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from bec_widgets.utils.colors import set_theme
|
||||
|
||||
app = QApplication([])
|
||||
set_theme("light")
|
||||
widget = Minesweeper()
|
||||
widget.show()
|
||||
|
||||
app.exec_()
|
||||
@@ -1 +0,0 @@
|
||||
{'files': ['minesweeper.py']}
|
||||
@@ -1,54 +0,0 @@
|
||||
# Copyright (C) 2022 The Qt Company Ltd.
|
||||
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
|
||||
|
||||
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
|
||||
|
||||
from bec_widgets.utils.bec_designer import designer_material_icon
|
||||
from bec_widgets.widgets.games.minesweeper import Minesweeper
|
||||
|
||||
DOM_XML = """
|
||||
<ui language='c++'>
|
||||
<widget class='Minesweeper' name='minesweeper'>
|
||||
</widget>
|
||||
</ui>
|
||||
"""
|
||||
|
||||
|
||||
class MinesweeperPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._form_editor = None
|
||||
|
||||
def createWidget(self, parent):
|
||||
t = Minesweeper(parent)
|
||||
return t
|
||||
|
||||
def domXml(self):
|
||||
return DOM_XML
|
||||
|
||||
def group(self):
|
||||
return "BEC Games"
|
||||
|
||||
def icon(self):
|
||||
return designer_material_icon(Minesweeper.ICON_NAME)
|
||||
|
||||
def includeFile(self):
|
||||
return "minesweeper"
|
||||
|
||||
def initialize(self, form_editor):
|
||||
self._form_editor = form_editor
|
||||
|
||||
def isContainer(self):
|
||||
return False
|
||||
|
||||
def isInitialized(self):
|
||||
return self._form_editor is not None
|
||||
|
||||
def name(self):
|
||||
return "Minesweeper"
|
||||
|
||||
def toolTip(self):
|
||||
return "Minesweeper"
|
||||
|
||||
def whatsThis(self):
|
||||
return self.toolTip()
|
||||
@@ -1,15 +0,0 @@
|
||||
def main(): # pragma: no cover
|
||||
from qtpy import PYSIDE6
|
||||
|
||||
if not PYSIDE6:
|
||||
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
|
||||
return
|
||||
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
|
||||
|
||||
from bec_widgets.widgets.games.minesweeper_plugin import MinesweeperPlugin
|
||||
|
||||
QPyDesignerCustomWidgetCollection.addCustomWidget(MinesweeperPlugin())
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
main()
|
||||
@@ -1,11 +0,0 @@
|
||||
(user.widgets.games)=
|
||||
|
||||
# Game widgets
|
||||
|
||||
To provide some entertainment during long nights at the beamline, there are game widgets available. Well, only one, so far.
|
||||
|
||||
## Minesweeper
|
||||
|
||||

|
||||
|
||||
The classic game Minesweeper. You may select from three different levels. The game can be ended or reset by clicking on the icon in the top-centre (the robot in the screenshot).
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 14 KiB |
@@ -270,6 +270,5 @@ signal_input/signal_input.md
|
||||
position_indicator/position_indicator.md
|
||||
lmfit_dialog/lmfit_dialog.md
|
||||
dap_combo_box/dap_combo_box.md
|
||||
games/games.md
|
||||
|
||||
```
|
||||
+1
-1
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "bec_widgets"
|
||||
version = "1.14.0"
|
||||
version = "1.12.0"
|
||||
description = "BEC Widgets"
|
||||
requires-python = ">=3.10"
|
||||
classifiers = [
|
||||
|
||||
@@ -4,8 +4,7 @@ import numpy as np
|
||||
import pytest
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
|
||||
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
|
||||
from bec_widgets.tests.utils import check_remote_data_size
|
||||
from bec_widgets.cli.client import BECDockArea, BECFigure, BECImageShow, BECMotorMap, BECWaveform
|
||||
from bec_widgets.utils import Colors
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
@@ -13,7 +12,7 @@ from bec_widgets.utils import Colors
|
||||
# pylint: disable=too-many-locals
|
||||
|
||||
|
||||
def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_dock):
|
||||
def test_rpc_add_dock_with_figure_e2e(bec_client_lib, connected_client_dock):
|
||||
# BEC client shortcuts
|
||||
dock = connected_client_dock
|
||||
client = bec_client_lib
|
||||
@@ -89,17 +88,14 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_do
|
||||
|
||||
# Try to make a scan
|
||||
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
|
||||
status.wait()
|
||||
|
||||
# wait for scan to finish
|
||||
while not status.status == "COMPLETED":
|
||||
time.sleep(0.2)
|
||||
|
||||
# plot
|
||||
item = queue.scan_storage.storage[-1]
|
||||
plt_last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
|
||||
num_elements = len(plt_last_scan_data["samx"]["samx"].val)
|
||||
|
||||
plot_name = "bpm4i-bpm4i"
|
||||
|
||||
qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
|
||||
|
||||
plt_data = plt.get_all_data()
|
||||
assert plt_data["bpm4i-bpm4i"]["x"] == plt_last_scan_data["samx"]["samx"].val
|
||||
assert plt_data["bpm4i-bpm4i"]["y"] == plt_last_scan_data["bpm4i"]["bpm4i"].val
|
||||
@@ -259,17 +255,11 @@ def test_auto_update(bec_client_lib, connected_client_dock_w_auto_updates, qtbot
|
||||
# get data from curves
|
||||
widgets = plt.widget_list
|
||||
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
|
||||
plt_data = widgets[0].get_all_data()
|
||||
|
||||
item = queue.scan_storage.storage[-1]
|
||||
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
|
||||
|
||||
num_elements = len(last_scan_data["samx"]["samx"].val)
|
||||
|
||||
plot_name = f"Scan {status.scan.scan_number} - {dock.selected_device}"
|
||||
|
||||
qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements))
|
||||
plt_data = widgets[0].get_all_data()
|
||||
|
||||
# check plotted data
|
||||
assert (
|
||||
plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["x"]
|
||||
@@ -287,18 +277,12 @@ def test_auto_update(bec_client_lib, connected_client_dock_w_auto_updates, qtbot
|
||||
|
||||
plt = auto_updates.get_default_figure()
|
||||
widgets = plt.widget_list
|
||||
|
||||
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
|
||||
plt_data = widgets[0].get_all_data()
|
||||
|
||||
item = queue.scan_storage.storage[-1]
|
||||
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
|
||||
|
||||
plot_name = f"Scan {status.scan.scan_number} - bpm4i"
|
||||
|
||||
num_elements_bec = len(last_scan_data["samx"]["samx"].val)
|
||||
qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements_bec))
|
||||
plt_data = widgets[0].get_all_data()
|
||||
|
||||
# check plotted data
|
||||
assert (
|
||||
plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["x"]
|
||||
@@ -371,7 +355,6 @@ def test_rpc_call_with_exception_in_safeslot_error_popup(connected_client_gui_ob
|
||||
gui = connected_client_gui_obj
|
||||
|
||||
gui.main.add_dock("test")
|
||||
qtbot.waitUntil(lambda: len(gui.main.panels) == 2) # default_figure + test
|
||||
with pytest.raises(ValueError):
|
||||
gui.main.add_dock("test")
|
||||
# time.sleep(0.1)
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
|
||||
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
|
||||
from bec_widgets.tests.utils import check_remote_data_size
|
||||
|
||||
|
||||
def test_rpc_waveform1d_custom_curve(connected_client_figure):
|
||||
@@ -78,7 +78,7 @@ def test_rpc_plotting_shortcuts_init_configs(connected_client_figure, qtbot):
|
||||
}
|
||||
|
||||
|
||||
def test_rpc_waveform_scan(qtbot, connected_client_figure, bec_client_lib):
|
||||
def test_rpc_waveform_scan(connected_client_figure, bec_client_lib):
|
||||
fig = BECFigure(connected_client_figure)
|
||||
|
||||
# add 3 different curves to track
|
||||
@@ -97,11 +97,6 @@ def test_rpc_waveform_scan(qtbot, connected_client_figure, bec_client_lib):
|
||||
item = queue.scan_storage.storage[-1]
|
||||
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
|
||||
|
||||
num_elements = len(last_scan_data["samx"]["samx"].val)
|
||||
|
||||
for plot_name in ["bpm4i-bpm4i", "bpm3a-bpm3a", "bpm4d-bpm4d"]:
|
||||
qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
|
||||
|
||||
# get data from curves
|
||||
plt_data = plt.get_all_data()
|
||||
|
||||
|
||||
@@ -1,50 +0,0 @@
|
||||
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
|
||||
|
||||
import pytest
|
||||
from qtpy.QtCore import Qt
|
||||
|
||||
from bec_widgets.widgets.games import Minesweeper
|
||||
from bec_widgets.widgets.games.minesweeper import LEVELS, GameStatus, Pos
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def minesweeper(qtbot):
|
||||
widget = Minesweeper()
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
yield widget
|
||||
|
||||
|
||||
def test_minesweeper_init(minesweeper: Minesweeper):
|
||||
assert minesweeper.status == GameStatus.READY
|
||||
|
||||
|
||||
def test_changing_level_updates_size_and_removes_old_grid_items(minesweeper: Minesweeper):
|
||||
assert minesweeper.b_size == LEVELS["1"][0]
|
||||
grid_items = [minesweeper.grid.itemAt(i).widget() for i in range(minesweeper.grid.count())]
|
||||
for w in grid_items:
|
||||
assert w.parent() is not None
|
||||
minesweeper.change_level("2")
|
||||
assert minesweeper.b_size == LEVELS["2"][0]
|
||||
for w in grid_items:
|
||||
assert w.parent() is None
|
||||
|
||||
|
||||
def test_game_state_changes_to_failed_on_loss(qtbot, minesweeper: Minesweeper):
|
||||
assert minesweeper.status == GameStatus.READY
|
||||
grid_items: list[Pos] = [
|
||||
minesweeper.grid.itemAt(i).widget() for i in range(minesweeper.grid.count())
|
||||
]
|
||||
mine = [p for p in grid_items if p.is_mine][0]
|
||||
|
||||
with qtbot.waitSignal(mine.ohno, timeout=1000):
|
||||
qtbot.mouseRelease(mine, Qt.MouseButton.LeftButton)
|
||||
assert minesweeper.status == GameStatus.FAILED
|
||||
|
||||
|
||||
def test_game_resets_on_reset_click(minesweeper: Minesweeper):
|
||||
assert minesweeper.status == GameStatus.READY
|
||||
minesweeper.grid.itemAt(1).widget().ohno.emit()
|
||||
assert minesweeper.status == GameStatus.FAILED
|
||||
minesweeper.reset_button_pressed()
|
||||
assert minesweeper.status == GameStatus.PLAYING
|
||||
@@ -1,17 +1,8 @@
|
||||
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
|
||||
import pytest
|
||||
from qtpy.QtCore import Qt
|
||||
from qtpy.QtWidgets import (
|
||||
QComboBox,
|
||||
QLineEdit,
|
||||
QSpinBox,
|
||||
QTableWidget,
|
||||
QTableWidgetItem,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
from qtpy.QtWidgets import QComboBox, QLineEdit, QSpinBox, QTableWidget, QVBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.utils.widget_io import WidgetHierarchy, WidgetIO
|
||||
from bec_widgets.utils.widget_io import WidgetHierarchy
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@@ -31,12 +22,6 @@ def example_widget(qtbot):
|
||||
# Add text items to the combo box
|
||||
combo_box.addItems(["Option 1", "Option 2", "Option 3"])
|
||||
|
||||
# Populate the table widget
|
||||
table_widget.setItem(0, 0, QTableWidgetItem("Initial A"))
|
||||
table_widget.setItem(0, 1, QTableWidgetItem("Initial B"))
|
||||
table_widget.setItem(1, 0, QTableWidgetItem("Initial C"))
|
||||
table_widget.setItem(1, 1, QTableWidgetItem("Initial D"))
|
||||
|
||||
qtbot.addWidget(main_widget)
|
||||
qtbot.waitExposed(main_widget)
|
||||
yield main_widget
|
||||
@@ -103,73 +88,3 @@ def test_export_import_config(example_widget):
|
||||
|
||||
assert exported_config_full == expected_full
|
||||
assert exported_config_reduced == expected_reduced
|
||||
|
||||
|
||||
def test_widget_io_get_set_value(example_widget):
|
||||
# Extract widgets
|
||||
line_edit = example_widget.findChild(QLineEdit)
|
||||
combo_box = example_widget.findChild(QComboBox)
|
||||
table_widget = example_widget.findChild(QTableWidget)
|
||||
spin_box = example_widget.findChild(QSpinBox)
|
||||
|
||||
# Check initial values
|
||||
assert WidgetIO.get_value(line_edit) == ""
|
||||
assert WidgetIO.get_value(combo_box) == 0 # first index
|
||||
assert WidgetIO.get_value(table_widget) == [
|
||||
["Initial A", "Initial B"],
|
||||
["Initial C", "Initial D"],
|
||||
]
|
||||
assert WidgetIO.get_value(spin_box) == 0
|
||||
|
||||
# Set new values
|
||||
WidgetIO.set_value(line_edit, "Hello")
|
||||
WidgetIO.set_value(combo_box, "Option 2")
|
||||
WidgetIO.set_value(table_widget, [["X", "Y"], ["Z", "W"]])
|
||||
WidgetIO.set_value(spin_box, 5)
|
||||
|
||||
# Check updated values
|
||||
assert WidgetIO.get_value(line_edit) == "Hello"
|
||||
assert WidgetIO.get_value(combo_box, as_string=True) == "Option 2"
|
||||
assert WidgetIO.get_value(table_widget) == [["X", "Y"], ["Z", "W"]]
|
||||
assert WidgetIO.get_value(spin_box) == 5
|
||||
|
||||
|
||||
def test_widget_io_signal(qtbot, example_widget):
|
||||
# Extract widgets
|
||||
line_edit = example_widget.findChild(QLineEdit)
|
||||
combo_box = example_widget.findChild(QComboBox)
|
||||
spin_box = example_widget.findChild(QSpinBox)
|
||||
table_widget = example_widget.findChild(QTableWidget)
|
||||
|
||||
# We'll store changes in a list to verify the slot is called
|
||||
changes = []
|
||||
|
||||
def universal_slot(w, val):
|
||||
changes.append((w, val))
|
||||
|
||||
# Connect signals
|
||||
WidgetIO.connect_widget_change_signal(line_edit, universal_slot)
|
||||
WidgetIO.connect_widget_change_signal(combo_box, universal_slot)
|
||||
WidgetIO.connect_widget_change_signal(spin_box, universal_slot)
|
||||
WidgetIO.connect_widget_change_signal(table_widget, universal_slot)
|
||||
|
||||
# Trigger changes
|
||||
line_edit.setText("NewText")
|
||||
qtbot.waitUntil(lambda: len(changes) > 0)
|
||||
assert changes[-1][1] == "NewText"
|
||||
|
||||
combo_box.setCurrentIndex(2)
|
||||
qtbot.waitUntil(lambda: len(changes) > 1)
|
||||
# combo_box change should give the current index or value
|
||||
# We set "Option 3" is index 2
|
||||
assert changes[-1][1] == 2 or changes[-1][1] == "Option 3"
|
||||
|
||||
spin_box.setValue(42)
|
||||
qtbot.waitUntil(lambda: len(changes) > 2)
|
||||
assert changes[-1][1] == 42
|
||||
|
||||
# For the table widget, changing a cell triggers cellChanged
|
||||
table_widget.setItem(0, 0, QTableWidgetItem("ChangedCell"))
|
||||
qtbot.waitUntil(lambda: len(changes) > 3)
|
||||
# The entire table value should be retrieved
|
||||
assert changes[-1][1][0][0] == "ChangedCell"
|
||||
|
||||
Reference in New Issue
Block a user