1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-30 12:02:29 +02:00

Compare commits

..

1 Commits

Author SHA1 Message Date
guijar_m ef3146f1ea refactor: move auto-update initialization to the GUI server side
Add '.install_auto_update()' on GUI client to configure auto-update
(called at GUI startup by default)
2024-12-23 16:13:06 +01:00
22 changed files with 199 additions and 6723 deletions
-5727
View File
File diff suppressed because it is too large Load Diff
-168
View File
@@ -1,168 +0,0 @@
from __future__ import annotations
import threading
from queue import Queue
from typing import TYPE_CHECKING
from pydantic import BaseModel
if TYPE_CHECKING:
from .client import BECDockArea, BECFigure
class ScanInfo(BaseModel):
scan_id: str
scan_number: int
scan_name: str
scan_report_devices: list
monitored_devices: list
status: str
model_config: dict = {"validate_assignment": True}
class AutoUpdates:
create_default_dock: bool = False
enabled: bool = False
dock_name: str = None
def __init__(self, gui: BECDockArea):
self.gui = gui
self._default_dock = None
self._default_fig = None
def start_default_dock(self):
"""
Create a default dock for the auto updates.
"""
self.dock_name = "default_figure"
self._default_dock = self.gui.add_dock(self.dock_name)
self._default_dock.add_widget("BECFigure")
self._default_fig = self._default_dock.widget_list[0]
@staticmethod
def get_scan_info(msg) -> ScanInfo:
"""
Update the script with the given data.
"""
info = msg.info
status = msg.status
scan_id = msg.scan_id
scan_number = info.get("scan_number", 0)
scan_name = info.get("scan_name", "Unknown")
scan_report_devices = info.get("scan_report_devices", [])
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
return ScanInfo(
scan_id=scan_id,
scan_number=scan_number,
scan_name=scan_name,
scan_report_devices=scan_report_devices,
monitored_devices=monitored_devices,
status=status,
)
def get_default_figure(self) -> BECFigure | None:
"""
Get the default figure from the GUI.
"""
return self._default_fig
def do_update(self, msg):
"""
Run the update function if enabled.
"""
if not self.enabled:
return
if msg.status != "open":
return
info = self.get_scan_info(msg)
return self.handler(info)
def get_selected_device(self, monitored_devices, selected_device):
"""
Get the selected device for the plot. If no device is selected, the first
device in the monitored devices list is selected.
"""
if selected_device:
return selected_device
if len(monitored_devices) > 0:
sel_device = monitored_devices[0]
return sel_device
return None
def handler(self, info: ScanInfo) -> None:
"""
Default update function.
"""
if info.scan_name == "line_scan" and info.scan_report_devices:
return self.simple_line_scan(info)
if info.scan_name == "grid_scan" and info.scan_report_devices:
return self.simple_grid_scan(info)
if info.scan_report_devices:
return self.best_effort(info)
def simple_line_scan(self, info: ScanInfo) -> None:
"""
Simple line scan.
"""
fig = self.get_default_figure()
if not fig:
return
dev_x = info.scan_report_devices[0]
selected_device = yield self.gui.selected_device
dev_y = self.get_selected_device(info.monitored_devices, selected_device)
if not dev_y:
return
yield fig.clear_all()
yield fig.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
def simple_grid_scan(self, info: ScanInfo) -> None:
"""
Simple grid scan.
"""
fig = self.get_default_figure()
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = info.scan_report_devices[1]
selected_device = yield self.gui.selected_device
dev_z = self.get_selected_device(info.monitored_devices, selected_device)
yield fig.clear_all()
yield fig.plot(
x_name=dev_x,
y_name=dev_y,
z_name=dev_z,
label=f"Scan {info.scan_number} - {dev_z}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
def best_effort(self, info: ScanInfo) -> None:
"""
Best effort scan.
"""
fig = self.get_default_figure()
if not fig:
return
dev_x = info.scan_report_devices[0]
selected_device = yield self.gui.selected_device
dev_y = self.get_selected_device(info.monitored_devices, selected_device)
if not dev_y:
return
yield fig.clear_all()
yield fig.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
-4
View File
@@ -31,7 +31,6 @@ class Widgets(str, enum.Enum):
DeviceComboBox = "DeviceComboBox"
DeviceLineEdit = "DeviceLineEdit"
LMFitDialog = "LMFitDialog"
Minesweeper = "Minesweeper"
PositionIndicator = "PositionIndicator"
PositionerBox = "PositionerBox"
PositionerControlLine = "PositionerControlLine"
@@ -3182,9 +3181,6 @@ class LMFitDialog(RPCBase):
"""
class Minesweeper(RPCBase): ...
class PositionIndicator(RPCBase):
@rpc_call
def set_value(self, position: float):
+26 -49
View File
@@ -16,7 +16,6 @@ from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import isinstance_based_on_class_name, lazy_import, lazy_import_from
import bec_widgets.cli.client as client
from bec_widgets.cli.auto_updates import AutoUpdates
from bec_widgets.cli.rpc.rpc_base import RPCBase
if TYPE_CHECKING:
@@ -160,8 +159,7 @@ class BECGuiClient(RPCBase):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._auto_updates_enabled = True
self._auto_updates = None
self._auto_update_enabled = True
self._startup_timeout = 0
self._gui_started_timer = None
self._gui_started_event = threading.Event()
@@ -172,30 +170,25 @@ class BECGuiClient(RPCBase):
def windows(self):
return self._top_level
@property
def auto_updates(self):
if self._auto_updates_enabled:
with wait_for_server(self):
return self._auto_updates
def _get_update_script(self) -> AutoUpdates | None:
eps = imd.entry_points(group="bec.widgets.auto_updates")
for ep in eps:
if ep.name == "plugin_widgets_update":
try:
spec = importlib.util.find_spec(ep.module)
# if the module is not found, we skip it
if spec is None:
continue
return ep.load()(gui=self)
except Exception as e:
logger.error(f"Error loading auto update script from plugin: {str(e)}")
return None
# TODO: needs review
# def _get_update_script(self) -> AutoUpdates | None:
# eps = imd.entry_points(group="bec.widgets.auto_updates")
# for ep in eps:
# if ep.name == "plugin_widgets_update":
# try:
# spec = importlib.util.find_spec(ep.module)
# # if the module is not found, we skip it
# if spec is None:
# continue
# return ep.load()(gui=self)
# except Exception as e:
# logger.error(f"Error loading auto update script from plugin: {str(e)}")
# return None
@property
def selected_device(self):
"""
Selected device for the plot.
Selected device for the auto update plot.
"""
auto_update_config_ep = MessageEndpoints.gui_auto_update_config(self._gui_id)
auto_update_config = self._client.connector.get(auto_update_config_ep)
@@ -218,36 +211,12 @@ class BECGuiClient(RPCBase):
else:
raise ValueError("Device must be a string or a device object")
def _start_update_script(self) -> None:
self._client.connector.register(MessageEndpoints.scan_status(), cb=self._handle_msg_update)
def _handle_msg_update(self, msg: MessageObject) -> None:
if self.auto_updates is not None:
# pylint: disable=protected-access
return self._update_script_msg_parser(msg.value)
def _update_script_msg_parser(self, msg: messages.BECMessage) -> None:
if isinstance(msg, messages.ScanStatusMessage):
if not self.gui_is_alive():
return
if self._auto_updates_enabled:
return self.auto_updates.do_update(msg)
def _gui_post_startup(self):
self._top_level["main"] = WidgetDesc(
title="BEC Widgets", widget=BECDockArea(gui_id=self._gui_id)
)
if self._auto_updates_enabled:
if self._auto_updates is None:
auto_updates = self._get_update_script()
if auto_updates is None:
AutoUpdates.create_default_dock = True
AutoUpdates.enabled = True
auto_updates = AutoUpdates(self._top_level["main"].widget)
if auto_updates.create_default_dock:
auto_updates.start_default_dock()
self._start_update_script()
self._auto_updates = auto_updates
if self._auto_update_enabled:
self._do_install_auto_update()
self._do_show_all()
self._gui_started_event.set()
@@ -325,6 +294,14 @@ class BECGuiClient(RPCBase):
self._top_level[widget._gui_id] = WidgetDesc(title=title, widget=widget)
return widget
def _do_install_auto_update(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
return rpc_client._run_rpc("install_auto_update")
def install_auto_update(self):
with wait_for_server(self):
return self._do_install_auto_update()
def close(self) -> None:
"""
Close the gui window.
+2 -2
View File
@@ -264,8 +264,8 @@ def main():
RPCRegister().add_rpc(win)
gui = server.gui
win.setCentralWidget(gui)
win.setCentralWidget(server.gui)
if not args.hide:
win.show()
-8
View File
@@ -224,11 +224,3 @@ DEVICES = [
Positioner("test", limits=[-10, 10], read_value=2.0),
Device("test_device"),
]
def check_remote_data_size(widget, plot_name, num_elements):
"""
Check if the remote data has the correct number of elements.
Used in the qtbot.waitUntil function.
"""
return len(widget.get_all_data()[plot_name]["x"]) == num_elements
+5 -85
View File
@@ -1,5 +1,6 @@
# pylint: disable=no-name-in-module
from abc import ABC, abstractmethod
from typing import Literal
from qtpy.QtWidgets import (
QApplication,
@@ -27,15 +28,6 @@ class WidgetHandler(ABC):
def set_value(self, widget: QWidget, value):
"""Set a value on the widget instance."""
def connect_change_signal(self, widget: QWidget, slot):
"""
Connect a change signal from this widget to the given slot.
If the widget type doesn't have a known "value changed" signal, do nothing.
slot: a function accepting two arguments (widget, value)
"""
pass
class LineEditHandler(WidgetHandler):
"""Handler for QLineEdit widgets."""
@@ -46,9 +38,6 @@ class LineEditHandler(WidgetHandler):
def set_value(self, widget: QLineEdit, value: str) -> None:
widget.setText(value)
def connect_change_signal(self, widget: QLineEdit, slot):
widget.textChanged.connect(lambda text, w=widget: slot(w, text))
class ComboBoxHandler(WidgetHandler):
"""Handler for QComboBox widgets."""
@@ -64,11 +53,6 @@ class ComboBoxHandler(WidgetHandler):
if isinstance(value, int):
widget.setCurrentIndex(value)
def connect_change_signal(self, widget: QComboBox, slot):
# currentIndexChanged(int) or currentIndexChanged(str) both possible.
# We use currentIndexChanged(int) for a consistent behavior.
widget.currentIndexChanged.connect(lambda idx, w=widget: slot(w, self.get_value(w)))
class TableWidgetHandler(WidgetHandler):
"""Handler for QTableWidget widgets."""
@@ -88,16 +72,6 @@ class TableWidgetHandler(WidgetHandler):
item = QTableWidgetItem(str(cell_value))
widget.setItem(row, col, item)
def connect_change_signal(self, widget: QTableWidget, slot):
# If desired, we could connect cellChanged(row, col) and then fetch all data.
# This might be noisy if table is large.
# For demonstration, connect cellChanged to update entire table value.
def on_cell_changed(row, col, w=widget):
val = self.get_value(w)
slot(w, val)
widget.cellChanged.connect(on_cell_changed)
class SpinBoxHandler(WidgetHandler):
"""Handler for QSpinBox and QDoubleSpinBox widgets."""
@@ -108,9 +82,6 @@ class SpinBoxHandler(WidgetHandler):
def set_value(self, widget, value):
widget.setValue(value)
def connect_change_signal(self, widget: QSpinBox | QDoubleSpinBox, slot):
widget.valueChanged.connect(lambda val, w=widget: slot(w, val))
class CheckBoxHandler(WidgetHandler):
"""Handler for QCheckBox widgets."""
@@ -121,9 +92,6 @@ class CheckBoxHandler(WidgetHandler):
def set_value(self, widget, value):
widget.setChecked(value)
def connect_change_signal(self, widget: QCheckBox, slot):
widget.toggled.connect(lambda val, w=widget: slot(w, val))
class LabelHandler(WidgetHandler):
"""Handler for QLabel widgets."""
@@ -131,15 +99,12 @@ class LabelHandler(WidgetHandler):
def get_value(self, widget, **kwargs):
return widget.text()
def set_value(self, widget: QLabel, value):
def set_value(self, widget, value):
widget.setText(value)
# QLabel typically doesn't have user-editable changes. No signal to connect.
# If needed, this can remain empty.
class WidgetIO:
"""Public interface for getting, setting values and connecting signals using handler mapping"""
"""Public interface for getting and setting values using handler mapping"""
_handlers = {
QLineEdit: LineEditHandler,
@@ -183,17 +148,6 @@ class WidgetIO:
elif not ignore_errors:
raise ValueError(f"No handler for widget type: {type(widget)}")
@staticmethod
def connect_widget_change_signal(widget, slot):
"""
Connect the widget's value-changed signal to a generic slot function (widget, value).
This now delegates the logic to the widget's handler.
"""
handler_class = WidgetIO._find_handler(widget)
if handler_class:
handler = handler_class()
handler.connect_change_signal(widget, slot)
@staticmethod
def check_and_adjust_limits(spin_box: QDoubleSpinBox, number: float):
"""
@@ -355,8 +309,8 @@ class WidgetHierarchy:
WidgetHierarchy.import_config_from_dict(child, widget_config, set_values)
# Example usage
def hierarchy_example(): # pragma: no cover
# Example application to demonstrate the usage of the functions
if __name__ == "__main__": # pragma: no cover
app = QApplication([])
# Create instance of WidgetHierarchy
@@ -411,37 +365,3 @@ def hierarchy_example(): # pragma: no cover
print(f"Config dict new REDUCED: {config_dict_new_reduced}")
app.exec()
def widget_io_signal_example(): # pragma: no cover
app = QApplication([])
main_widget = QWidget()
layout = QVBoxLayout(main_widget)
line_edit = QLineEdit(main_widget)
combo_box = QComboBox(main_widget)
spin_box = QSpinBox(main_widget)
combo_box.addItems(["Option 1", "Option 2", "Option 3"])
layout.addWidget(line_edit)
layout.addWidget(combo_box)
layout.addWidget(spin_box)
main_widget.show()
def universal_slot(w, val):
print(f"Widget {w.objectName() or w} changed, new value: {val}")
# Connect all supported widgets through their handlers
WidgetIO.connect_widget_change_signal(line_edit, universal_slot)
WidgetIO.connect_widget_change_signal(combo_box, universal_slot)
WidgetIO.connect_widget_change_signal(spin_box, universal_slot)
app.exec_()
if __name__ == "__main__": # pragma: no cover
# Change example function to test different scenarios
# hierarchy_example()
widget_io_signal_example()
@@ -44,7 +44,6 @@ class BECDockArea(BECWidget, QWidget):
PLUGIN = True
USER_ACCESS = [
"_config_dict",
"selected_device",
"panels",
"save_state",
"remove_dock",
@@ -216,17 +215,6 @@ class BECDockArea(BECWidget, QWidget):
"Add docks using 'add_dock' method from CLI\n or \n Add widget docks using the toolbar",
)
@property
def selected_device(self) -> str:
gui_id = QApplication.instance().gui_id
auto_update_config = self.client.connector.get(
MessageEndpoints.gui_auto_update_config(gui_id)
)
try:
return auto_update_config.selected_device
except AttributeError:
return None
@property
def panels(self) -> dict[str, BECDock]:
"""
@@ -1,8 +1,26 @@
from bec_lib.endpoints import MessageEndpoints
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from qtpy.QtWidgets import QApplication, QMainWindow
from bec_widgets.utils import BECConnector
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
messages = lazy_import("bec_lib.messages")
# from bec_lib.connector import MessageObject
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
from pydantic import BaseModel
class ScanInfo(BaseModel):
scan_id: str
scan_number: int
scan_name: str
scan_report_devices: list
monitored_devices: list
status: str
model_config: dict = {"validate_assignment": True}
class BECMainWindow(QMainWindow, BECConnector):
def __init__(self, *args, **kwargs):
@@ -39,3 +57,138 @@ class BECMainWindow(QMainWindow, BECConnector):
dock_area.window().setWindowTitle(name)
dock_area.show()
return dock_area
def install_auto_update(self):
dock_area = self.centralWidget()
figure_dock = dock_area.add_dock("default_figure")
self.auto_update_fig = figure_dock.add_widget("BECFigure")
self.client.connector.register(MessageEndpoints.scan_status(), cb=self._handle_msg_update)
@property
def selected_device(self) -> str:
gui_id = QApplication.instance().gui_id
auto_update_config = self.client.connector.get(
MessageEndpoints.gui_auto_update_config(gui_id)
)
try:
return auto_update_config.selected_device
except AttributeError:
return None
def _handle_msg_update(self, msg: MessageObject) -> None:
msg = msg.value
if isinstance(msg, messages.ScanStatusMessage):
return self.do_update(msg)
def get_scan_info(self, msg) -> ScanInfo:
"""
Update the script with the given data.
"""
info = msg.info
status = msg.status
scan_id = msg.scan_id
scan_number = info.get("scan_number", 0)
scan_name = info.get("scan_name", "Unknown")
scan_report_devices = info.get("scan_report_devices", [])
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
return ScanInfo(
scan_id=scan_id,
scan_number=scan_number,
scan_name=scan_name,
scan_report_devices=scan_report_devices,
monitored_devices=monitored_devices,
status=status,
)
def do_update(self, msg):
if msg.status != "open":
return
info = self.get_scan_info(msg)
return self.handler(info)
def handler(self, info: ScanInfo) -> None:
"""
Default update function.
"""
if info.scan_name == "line_scan" and info.scan_report_devices:
return self.simple_line_scan(info)
if info.scan_name == "grid_scan" and info.scan_report_devices:
return self.simple_grid_scan(info)
if info.scan_report_devices:
return self.best_effort(info)
def get_selected_device(self, monitored_devices, selected_device):
"""
Get the selected device for the plot. If no device is selected, the first
device in the monitored devices list is selected.
"""
if selected_device:
return selected_device
if len(monitored_devices) > 0:
sel_device = monitored_devices[0]
return sel_device
return None
def simple_line_scan(self, info: ScanInfo) -> None:
"""
Simple line scan.
"""
fig = self.auto_update_fig
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = self.get_selected_device(info.monitored_devices, self.selected_device)
if not dev_y:
return
fig.clear_all()
fig.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
def simple_grid_scan(self, info: ScanInfo) -> None:
"""
Simple grid scan.
"""
fig = self.auto_update_fig
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = info.scan_report_devices[1]
dev_z = self.get_selected_device(info.monitored_devices, self.selected_device)
fig.clear_all()
fig.plot(
x_name=dev_x,
y_name=dev_y,
z_name=dev_z,
label=f"Scan {info.scan_number} - {dev_z}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
def best_effort(self, info: ScanInfo) -> None:
"""
Best effort scan.
"""
fig = self.auto_update_fig
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = self.get_selected_device(info.monitored_devices, self.selected_device)
if not dev_y:
return
fig.clear_all()
fig.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
-3
View File
@@ -1,3 +0,0 @@
from bec_widgets.widgets.games.minesweeper import Minesweeper
__ALL__ = ["Minesweeper"]
-413
View File
@@ -1,413 +0,0 @@
import enum
import random
import time
from bec_qthemes import material_icon
from qtpy.QtCore import QSize, Qt, QTimer, Signal, Slot
from qtpy.QtGui import QBrush, QColor, QPainter, QPen
from qtpy.QtWidgets import (
QApplication,
QComboBox,
QGridLayout,
QHBoxLayout,
QLabel,
QPushButton,
QVBoxLayout,
QWidget,
)
from bec_widgets.utils.bec_widget import BECWidget
NUM_COLORS = {
1: QColor("#f44336"),
2: QColor("#9C27B0"),
3: QColor("#3F51B5"),
4: QColor("#03A9F4"),
5: QColor("#00BCD4"),
6: QColor("#4CAF50"),
7: QColor("#E91E63"),
8: QColor("#FF9800"),
}
LEVELS: dict[str, tuple[int, int]] = {"1": (8, 10), "2": (16, 40), "3": (24, 99)}
class GameStatus(enum.Enum):
READY = 0
PLAYING = 1
FAILED = 2
SUCCESS = 3
class Pos(QWidget):
expandable = Signal(int, int)
clicked = Signal()
ohno = Signal()
def __init__(self, x, y, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setFixedSize(QSize(20, 20))
self.x = x
self.y = y
self.is_start = False
self.is_mine = False
self.adjacent_n = 0
self.is_revealed = False
self.is_flagged = False
def reset(self):
"""Restore the tile to its original state before mine status is assigned"""
self.is_start = False
self.is_mine = False
self.adjacent_n = 0
self.is_revealed = False
self.is_flagged = False
self.update()
def paintEvent(self, event):
p = QPainter(self)
r = event.rect()
if self.is_revealed:
color = self.palette().base().color()
outer, inner = color, color
else:
outer, inner = (self.palette().highlightedText().color(), self.palette().text().color())
p.fillRect(r, QBrush(inner))
pen = QPen(outer)
pen.setWidth(1)
p.setPen(pen)
p.drawRect(r)
if self.is_revealed:
if self.is_mine:
p.drawPixmap(r, material_icon("experiment", convert_to_pixmap=True, filled=True))
elif self.adjacent_n > 0:
pen = QPen(NUM_COLORS[self.adjacent_n])
p.setPen(pen)
f = p.font()
f.setBold(True)
p.setFont(f)
p.drawText(r, Qt.AlignHCenter | Qt.AlignVCenter, str(self.adjacent_n))
elif self.is_flagged:
p.drawPixmap(
r,
material_icon(
"flag",
size=(50, 50),
convert_to_pixmap=True,
filled=True,
color=self.palette().base().color(),
),
)
p.end()
def flag(self):
self.is_flagged = not self.is_flagged
self.update()
self.clicked.emit()
def reveal(self):
self.is_revealed = True
self.update()
def click(self):
if not self.is_revealed:
self.reveal()
if self.adjacent_n == 0:
self.expandable.emit(self.x, self.y)
self.clicked.emit()
def mouseReleaseEvent(self, event):
if event.button() == Qt.MouseButton.RightButton and not self.is_revealed:
self.flag()
return
if event.button() == Qt.MouseButton.LeftButton:
self.click()
if self.is_mine:
self.ohno.emit()
class Minesweeper(BECWidget, QWidget):
PLUGIN = True
ICON_NAME = "videogame_asset"
USER_ACCESS = []
def __init__(self, parent=None, *args, **kwargs):
super().__init__(*args, **kwargs)
QWidget.__init__(self, parent=parent)
self._ui_initialised = False
self._timer_start_num_seconds = 0
self._set_level_params(LEVELS["1"])
self._init_ui()
self._init_map()
self.update_status(GameStatus.READY)
self.reset_map()
self.update_status(GameStatus.READY)
def _init_ui(self):
if self._ui_initialised:
return
self._ui_initialised = True
status_hb = QHBoxLayout()
self.mines = QLabel()
self.mines.setAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
f = self.mines.font()
f.setPointSize(24)
self.mines.setFont(f)
self.reset_button = QPushButton()
self.reset_button.setFixedSize(QSize(32, 32))
self.reset_button.setIconSize(QSize(32, 32))
self.reset_button.setFlat(True)
self.reset_button.pressed.connect(self.reset_button_pressed)
self.clock = QLabel()
self.clock.setAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
self.clock.setFont(f)
self._timer = QTimer()
self._timer.timeout.connect(self.update_timer)
self._timer.start(1000) # 1 second timer
self.mines.setText(f"{self.num_mines:03d}")
self.clock.setText("000")
status_hb.addWidget(self.mines)
status_hb.addWidget(self.reset_button)
status_hb.addWidget(self.clock)
level_hb = QHBoxLayout()
self.level_selector = QComboBox()
self.level_selector.addItems(list(LEVELS.keys()))
level_hb.addWidget(QLabel("Level: "))
level_hb.addWidget(self.level_selector)
self.level_selector.currentTextChanged.connect(self.change_level)
vb = QVBoxLayout()
vb.addLayout(level_hb)
vb.addLayout(status_hb)
self.grid = QGridLayout()
self.grid.setSpacing(5)
vb.addLayout(self.grid)
self.setLayout(vb)
def _init_map(self):
"""Redraw the grid of mines"""
# Remove any previous grid items and reset the grid
for i in reversed(range(self.grid.count())):
w: Pos = self.grid.itemAt(i).widget()
w.clicked.disconnect(self.on_click)
w.expandable.disconnect(self.expand_reveal)
w.ohno.disconnect(self.game_over)
w.setParent(None)
w.deleteLater()
# Add positions to the map
for x in range(0, self.b_size):
for y in range(0, self.b_size):
w = Pos(x, y)
self.grid.addWidget(w, y, x)
# Connect signal to handle expansion.
w.clicked.connect(self.on_click)
w.expandable.connect(self.expand_reveal)
w.ohno.connect(self.game_over)
def reset_map(self):
"""
Reset the map and add new mines.
"""
# Clear all mine positions
for x in range(0, self.b_size):
for y in range(0, self.b_size):
w = self.grid.itemAtPosition(y, x).widget()
w.reset()
# Add mines to the positions
positions = []
while len(positions) < self.num_mines:
x, y = (random.randint(0, self.b_size - 1), random.randint(0, self.b_size - 1))
if (x, y) not in positions:
w = self.grid.itemAtPosition(y, x).widget()
w.is_mine = True
positions.append((x, y))
def get_adjacency_n(x, y):
positions = self.get_surrounding(x, y)
num_mines = sum(1 if w.is_mine else 0 for w in positions)
return num_mines
# Add adjacencies to the positions
for x in range(0, self.b_size):
for y in range(0, self.b_size):
w = self.grid.itemAtPosition(y, x).widget()
w.adjacent_n = get_adjacency_n(x, y)
# Place starting marker
while True:
x, y = (random.randint(0, self.b_size - 1), random.randint(0, self.b_size - 1))
w = self.grid.itemAtPosition(y, x).widget()
# We don't want to start on a mine.
if (x, y) not in positions:
w = self.grid.itemAtPosition(y, x).widget()
w.is_start = True
# Reveal all positions around this, if they are not mines either.
for w in self.get_surrounding(x, y):
if not w.is_mine:
w.click()
break
def get_surrounding(self, x, y):
positions = []
for xi in range(max(0, x - 1), min(x + 2, self.b_size)):
for yi in range(max(0, y - 1), min(y + 2, self.b_size)):
positions.append(self.grid.itemAtPosition(yi, xi).widget())
return positions
def get_num_hidden(self) -> int:
"""
Get the number of hidden positions.
"""
return sum(
1
for x in range(0, self.b_size)
for y in range(0, self.b_size)
if not self.grid.itemAtPosition(y, x).widget().is_revealed
)
def get_num_remaining_flags(self) -> int:
"""
Get the number of remaining flags.
"""
return self.num_mines - sum(
1
for x in range(0, self.b_size)
for y in range(0, self.b_size)
if self.grid.itemAtPosition(y, x).widget().is_flagged
)
def reset_button_pressed(self):
match self.status:
case GameStatus.PLAYING:
self.game_over()
case GameStatus.FAILED | GameStatus.SUCCESS:
self.reset_map()
def reveal_map(self):
for x in range(0, self.b_size):
for y in range(0, self.b_size):
w = self.grid.itemAtPosition(y, x).widget()
w.reveal()
@Slot(str)
def change_level(self, level: str):
self._set_level_params(LEVELS[level])
self._init_map()
self.reset_map()
@Slot(int, int)
def expand_reveal(self, x, y):
"""
Expand the reveal to the surrounding
Args:
x (int): The x position.
y (int): The y position.
"""
for xi in range(max(0, x - 1), min(x + 2, self.b_size)):
for yi in range(max(0, y - 1), min(y + 2, self.b_size)):
w = self.grid.itemAtPosition(yi, xi).widget()
if not w.is_mine:
w.click()
@Slot()
def on_click(self):
"""
Handle the click event. If the game is not started, start the game.
"""
self.update_available_flags()
if self.status != GameStatus.PLAYING:
# First click.
self.update_status(GameStatus.PLAYING)
# Start timer.
self._timer_start_num_seconds = int(time.time())
return
self.check_win()
def update_available_flags(self):
"""
Update the number of available flags.
"""
self.mines.setText(f"{self.get_num_remaining_flags():03d}")
def check_win(self):
"""
Check if the game is won.
"""
if self.get_num_hidden() == self.num_mines:
self.update_status(GameStatus.SUCCESS)
def update_status(self, status: GameStatus):
"""
Update the status of the game.
Args:
status (GameStatus): The status of the game.
"""
self.status = status
match status:
case GameStatus.READY:
icon = material_icon(icon_name="add", convert_to_pixmap=False)
case GameStatus.PLAYING:
icon = material_icon(icon_name="smart_toy", convert_to_pixmap=False)
case GameStatus.FAILED:
icon = material_icon(icon_name="error", convert_to_pixmap=False)
case GameStatus.SUCCESS:
icon = material_icon(icon_name="celebration", convert_to_pixmap=False)
self.reset_button.setIcon(icon)
def update_timer(self):
"""
Update the timer.
"""
if self.status == GameStatus.PLAYING:
num_seconds = int(time.time()) - self._timer_start_num_seconds
self.clock.setText(f"{num_seconds:03d}")
def game_over(self):
"""Cause the game to end early"""
self.reveal_map()
self.update_status(GameStatus.FAILED)
def _set_level_params(self, level: tuple[int, int]):
self.b_size, self.num_mines = level
if __name__ == "__main__":
from bec_widgets.utils.colors import set_theme
app = QApplication([])
set_theme("light")
widget = Minesweeper()
widget.show()
app.exec_()
@@ -1 +0,0 @@
{'files': ['minesweeper.py']}
@@ -1,54 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.games.minesweeper import Minesweeper
DOM_XML = """
<ui language='c++'>
<widget class='Minesweeper' name='minesweeper'>
</widget>
</ui>
"""
class MinesweeperPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = Minesweeper(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "BEC Games"
def icon(self):
return designer_material_icon(Minesweeper.ICON_NAME)
def includeFile(self):
return "minesweeper"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "Minesweeper"
def toolTip(self):
return "Minesweeper"
def whatsThis(self):
return self.toolTip()
@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.games.minesweeper_plugin import MinesweeperPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(MinesweeperPlugin())
if __name__ == "__main__": # pragma: no cover
main()
-11
View File
@@ -1,11 +0,0 @@
(user.widgets.games)=
# Game widgets
To provide some entertainment during long nights at the beamline, there are game widgets available. Well, only one, so far.
## Minesweeper
![Minesweeper](./minesweeper.png)
The classic game Minesweeper. You may select from three different levels. The game can be ended or reset by clicking on the icon in the top-centre (the robot in the screenshot).
Binary file not shown.

Before

Width:  |  Height:  |  Size: 14 KiB

-1
View File
@@ -270,6 +270,5 @@ signal_input/signal_input.md
position_indicator/position_indicator.md
lmfit_dialog/lmfit_dialog.md
dap_combo_box/dap_combo_box.md
games/games.md
```
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "1.14.0"
version = "1.12.0"
description = "BEC Widgets"
requires-python = ">=3.10"
classifiers = [
+8 -25
View File
@@ -4,8 +4,7 @@ import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.tests.utils import check_remote_data_size
from bec_widgets.cli.client import BECDockArea, BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.utils import Colors
# pylint: disable=unused-argument
@@ -13,7 +12,7 @@ from bec_widgets.utils import Colors
# pylint: disable=too-many-locals
def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_dock):
def test_rpc_add_dock_with_figure_e2e(bec_client_lib, connected_client_dock):
# BEC client shortcuts
dock = connected_client_dock
client = bec_client_lib
@@ -89,17 +88,14 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_do
# Try to make a scan
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
# wait for scan to finish
while not status.status == "COMPLETED":
time.sleep(0.2)
# plot
item = queue.scan_storage.storage[-1]
plt_last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
num_elements = len(plt_last_scan_data["samx"]["samx"].val)
plot_name = "bpm4i-bpm4i"
qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
plt_data = plt.get_all_data()
assert plt_data["bpm4i-bpm4i"]["x"] == plt_last_scan_data["samx"]["samx"].val
assert plt_data["bpm4i-bpm4i"]["y"] == plt_last_scan_data["bpm4i"]["bpm4i"].val
@@ -259,17 +255,11 @@ def test_auto_update(bec_client_lib, connected_client_dock_w_auto_updates, qtbot
# get data from curves
widgets = plt.widget_list
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
plt_data = widgets[0].get_all_data()
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
num_elements = len(last_scan_data["samx"]["samx"].val)
plot_name = f"Scan {status.scan.scan_number} - {dock.selected_device}"
qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements))
plt_data = widgets[0].get_all_data()
# check plotted data
assert (
plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["x"]
@@ -287,18 +277,12 @@ def test_auto_update(bec_client_lib, connected_client_dock_w_auto_updates, qtbot
plt = auto_updates.get_default_figure()
widgets = plt.widget_list
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
plt_data = widgets[0].get_all_data()
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
plot_name = f"Scan {status.scan.scan_number} - bpm4i"
num_elements_bec = len(last_scan_data["samx"]["samx"].val)
qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements_bec))
plt_data = widgets[0].get_all_data()
# check plotted data
assert (
plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["x"]
@@ -371,7 +355,6 @@ def test_rpc_call_with_exception_in_safeslot_error_popup(connected_client_gui_ob
gui = connected_client_gui_obj
gui.main.add_dock("test")
qtbot.waitUntil(lambda: len(gui.main.panels) == 2) # default_figure + test
with pytest.raises(ValueError):
gui.main.add_dock("test")
# time.sleep(0.1)
+2 -7
View File
@@ -1,10 +1,10 @@
import time
import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.tests.utils import check_remote_data_size
def test_rpc_waveform1d_custom_curve(connected_client_figure):
@@ -78,7 +78,7 @@ def test_rpc_plotting_shortcuts_init_configs(connected_client_figure, qtbot):
}
def test_rpc_waveform_scan(qtbot, connected_client_figure, bec_client_lib):
def test_rpc_waveform_scan(connected_client_figure, bec_client_lib):
fig = BECFigure(connected_client_figure)
# add 3 different curves to track
@@ -97,11 +97,6 @@ def test_rpc_waveform_scan(qtbot, connected_client_figure, bec_client_lib):
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
num_elements = len(last_scan_data["samx"]["samx"].val)
for plot_name in ["bpm4i-bpm4i", "bpm3a-bpm3a", "bpm4d-bpm4d"]:
qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
# get data from curves
plt_data = plt.get_all_data()
-50
View File
@@ -1,50 +0,0 @@
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
import pytest
from qtpy.QtCore import Qt
from bec_widgets.widgets.games import Minesweeper
from bec_widgets.widgets.games.minesweeper import LEVELS, GameStatus, Pos
@pytest.fixture
def minesweeper(qtbot):
widget = Minesweeper()
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_minesweeper_init(minesweeper: Minesweeper):
assert minesweeper.status == GameStatus.READY
def test_changing_level_updates_size_and_removes_old_grid_items(minesweeper: Minesweeper):
assert minesweeper.b_size == LEVELS["1"][0]
grid_items = [minesweeper.grid.itemAt(i).widget() for i in range(minesweeper.grid.count())]
for w in grid_items:
assert w.parent() is not None
minesweeper.change_level("2")
assert minesweeper.b_size == LEVELS["2"][0]
for w in grid_items:
assert w.parent() is None
def test_game_state_changes_to_failed_on_loss(qtbot, minesweeper: Minesweeper):
assert minesweeper.status == GameStatus.READY
grid_items: list[Pos] = [
minesweeper.grid.itemAt(i).widget() for i in range(minesweeper.grid.count())
]
mine = [p for p in grid_items if p.is_mine][0]
with qtbot.waitSignal(mine.ohno, timeout=1000):
qtbot.mouseRelease(mine, Qt.MouseButton.LeftButton)
assert minesweeper.status == GameStatus.FAILED
def test_game_resets_on_reset_click(minesweeper: Minesweeper):
assert minesweeper.status == GameStatus.READY
minesweeper.grid.itemAt(1).widget().ohno.emit()
assert minesweeper.status == GameStatus.FAILED
minesweeper.reset_button_pressed()
assert minesweeper.status == GameStatus.PLAYING
+2 -87
View File
@@ -1,17 +1,8 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import pytest
from qtpy.QtCore import Qt
from qtpy.QtWidgets import (
QComboBox,
QLineEdit,
QSpinBox,
QTableWidget,
QTableWidgetItem,
QVBoxLayout,
QWidget,
)
from qtpy.QtWidgets import QComboBox, QLineEdit, QSpinBox, QTableWidget, QVBoxLayout, QWidget
from bec_widgets.utils.widget_io import WidgetHierarchy, WidgetIO
from bec_widgets.utils.widget_io import WidgetHierarchy
@pytest.fixture(scope="function")
@@ -31,12 +22,6 @@ def example_widget(qtbot):
# Add text items to the combo box
combo_box.addItems(["Option 1", "Option 2", "Option 3"])
# Populate the table widget
table_widget.setItem(0, 0, QTableWidgetItem("Initial A"))
table_widget.setItem(0, 1, QTableWidgetItem("Initial B"))
table_widget.setItem(1, 0, QTableWidgetItem("Initial C"))
table_widget.setItem(1, 1, QTableWidgetItem("Initial D"))
qtbot.addWidget(main_widget)
qtbot.waitExposed(main_widget)
yield main_widget
@@ -103,73 +88,3 @@ def test_export_import_config(example_widget):
assert exported_config_full == expected_full
assert exported_config_reduced == expected_reduced
def test_widget_io_get_set_value(example_widget):
# Extract widgets
line_edit = example_widget.findChild(QLineEdit)
combo_box = example_widget.findChild(QComboBox)
table_widget = example_widget.findChild(QTableWidget)
spin_box = example_widget.findChild(QSpinBox)
# Check initial values
assert WidgetIO.get_value(line_edit) == ""
assert WidgetIO.get_value(combo_box) == 0 # first index
assert WidgetIO.get_value(table_widget) == [
["Initial A", "Initial B"],
["Initial C", "Initial D"],
]
assert WidgetIO.get_value(spin_box) == 0
# Set new values
WidgetIO.set_value(line_edit, "Hello")
WidgetIO.set_value(combo_box, "Option 2")
WidgetIO.set_value(table_widget, [["X", "Y"], ["Z", "W"]])
WidgetIO.set_value(spin_box, 5)
# Check updated values
assert WidgetIO.get_value(line_edit) == "Hello"
assert WidgetIO.get_value(combo_box, as_string=True) == "Option 2"
assert WidgetIO.get_value(table_widget) == [["X", "Y"], ["Z", "W"]]
assert WidgetIO.get_value(spin_box) == 5
def test_widget_io_signal(qtbot, example_widget):
# Extract widgets
line_edit = example_widget.findChild(QLineEdit)
combo_box = example_widget.findChild(QComboBox)
spin_box = example_widget.findChild(QSpinBox)
table_widget = example_widget.findChild(QTableWidget)
# We'll store changes in a list to verify the slot is called
changes = []
def universal_slot(w, val):
changes.append((w, val))
# Connect signals
WidgetIO.connect_widget_change_signal(line_edit, universal_slot)
WidgetIO.connect_widget_change_signal(combo_box, universal_slot)
WidgetIO.connect_widget_change_signal(spin_box, universal_slot)
WidgetIO.connect_widget_change_signal(table_widget, universal_slot)
# Trigger changes
line_edit.setText("NewText")
qtbot.waitUntil(lambda: len(changes) > 0)
assert changes[-1][1] == "NewText"
combo_box.setCurrentIndex(2)
qtbot.waitUntil(lambda: len(changes) > 1)
# combo_box change should give the current index or value
# We set "Option 3" is index 2
assert changes[-1][1] == 2 or changes[-1][1] == "Option 3"
spin_box.setValue(42)
qtbot.waitUntil(lambda: len(changes) > 2)
assert changes[-1][1] == 42
# For the table widget, changing a cell triggers cellChanged
table_widget.setItem(0, 0, QTableWidgetItem("ChangedCell"))
qtbot.waitUntil(lambda: len(changes) > 3)
# The entire table value should be retrieved
assert changes[-1][1][0][0] == "ChangedCell"