0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-12 18:51:50 +02:00

refactor: move auto-update initialization to the GUI server side

Add '.install_auto_update()' on GUI client to configure auto-update
(called at GUI startup by default)
This commit is contained in:
2024-12-20 15:14:02 +01:00
parent 1c8b06cbe6
commit ef3146f1ea
5 changed files with 181 additions and 231 deletions

View File

@ -1,168 +0,0 @@
from __future__ import annotations
import threading
from queue import Queue
from typing import TYPE_CHECKING
from pydantic import BaseModel
if TYPE_CHECKING:
from .client import BECDockArea, BECFigure
class ScanInfo(BaseModel):
scan_id: str
scan_number: int
scan_name: str
scan_report_devices: list
monitored_devices: list
status: str
model_config: dict = {"validate_assignment": True}
class AutoUpdates:
create_default_dock: bool = False
enabled: bool = False
dock_name: str = None
def __init__(self, gui: BECDockArea):
self.gui = gui
self._default_dock = None
self._default_fig = None
def start_default_dock(self):
"""
Create a default dock for the auto updates.
"""
self.dock_name = "default_figure"
self._default_dock = self.gui.add_dock(self.dock_name)
self._default_dock.add_widget("BECFigure")
self._default_fig = self._default_dock.widget_list[0]
@staticmethod
def get_scan_info(msg) -> ScanInfo:
"""
Update the script with the given data.
"""
info = msg.info
status = msg.status
scan_id = msg.scan_id
scan_number = info.get("scan_number", 0)
scan_name = info.get("scan_name", "Unknown")
scan_report_devices = info.get("scan_report_devices", [])
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
return ScanInfo(
scan_id=scan_id,
scan_number=scan_number,
scan_name=scan_name,
scan_report_devices=scan_report_devices,
monitored_devices=monitored_devices,
status=status,
)
def get_default_figure(self) -> BECFigure | None:
"""
Get the default figure from the GUI.
"""
return self._default_fig
def do_update(self, msg):
"""
Run the update function if enabled.
"""
if not self.enabled:
return
if msg.status != "open":
return
info = self.get_scan_info(msg)
return self.handler(info)
def get_selected_device(self, monitored_devices, selected_device):
"""
Get the selected device for the plot. If no device is selected, the first
device in the monitored devices list is selected.
"""
if selected_device:
return selected_device
if len(monitored_devices) > 0:
sel_device = monitored_devices[0]
return sel_device
return None
def handler(self, info: ScanInfo) -> None:
"""
Default update function.
"""
if info.scan_name == "line_scan" and info.scan_report_devices:
return self.simple_line_scan(info)
if info.scan_name == "grid_scan" and info.scan_report_devices:
return self.simple_grid_scan(info)
if info.scan_report_devices:
return self.best_effort(info)
def simple_line_scan(self, info: ScanInfo) -> None:
"""
Simple line scan.
"""
fig = self.get_default_figure()
if not fig:
return
dev_x = info.scan_report_devices[0]
selected_device = yield self.gui.selected_device
dev_y = self.get_selected_device(info.monitored_devices, selected_device)
if not dev_y:
return
yield fig.clear_all()
yield fig.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
def simple_grid_scan(self, info: ScanInfo) -> None:
"""
Simple grid scan.
"""
fig = self.get_default_figure()
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = info.scan_report_devices[1]
selected_device = yield self.gui.selected_device
dev_z = self.get_selected_device(info.monitored_devices, selected_device)
yield fig.clear_all()
yield fig.plot(
x_name=dev_x,
y_name=dev_y,
z_name=dev_z,
label=f"Scan {info.scan_number} - {dev_z}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
def best_effort(self, info: ScanInfo) -> None:
"""
Best effort scan.
"""
fig = self.get_default_figure()
if not fig:
return
dev_x = info.scan_report_devices[0]
selected_device = yield self.gui.selected_device
dev_y = self.get_selected_device(info.monitored_devices, selected_device)
if not dev_y:
return
yield fig.clear_all()
yield fig.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)

View File

@ -16,7 +16,6 @@ from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import isinstance_based_on_class_name, lazy_import, lazy_import_from
import bec_widgets.cli.client as client
from bec_widgets.cli.auto_updates import AutoUpdates
from bec_widgets.cli.rpc.rpc_base import RPCBase
if TYPE_CHECKING:
@ -160,8 +159,7 @@ class BECGuiClient(RPCBase):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._auto_updates_enabled = True
self._auto_updates = None
self._auto_update_enabled = True
self._startup_timeout = 0
self._gui_started_timer = None
self._gui_started_event = threading.Event()
@ -172,30 +170,25 @@ class BECGuiClient(RPCBase):
def windows(self):
return self._top_level
@property
def auto_updates(self):
if self._auto_updates_enabled:
with wait_for_server(self):
return self._auto_updates
def _get_update_script(self) -> AutoUpdates | None:
eps = imd.entry_points(group="bec.widgets.auto_updates")
for ep in eps:
if ep.name == "plugin_widgets_update":
try:
spec = importlib.util.find_spec(ep.module)
# if the module is not found, we skip it
if spec is None:
continue
return ep.load()(gui=self)
except Exception as e:
logger.error(f"Error loading auto update script from plugin: {str(e)}")
return None
# TODO: needs review
# def _get_update_script(self) -> AutoUpdates | None:
# eps = imd.entry_points(group="bec.widgets.auto_updates")
# for ep in eps:
# if ep.name == "plugin_widgets_update":
# try:
# spec = importlib.util.find_spec(ep.module)
# # if the module is not found, we skip it
# if spec is None:
# continue
# return ep.load()(gui=self)
# except Exception as e:
# logger.error(f"Error loading auto update script from plugin: {str(e)}")
# return None
@property
def selected_device(self):
"""
Selected device for the plot.
Selected device for the auto update plot.
"""
auto_update_config_ep = MessageEndpoints.gui_auto_update_config(self._gui_id)
auto_update_config = self._client.connector.get(auto_update_config_ep)
@ -218,36 +211,12 @@ class BECGuiClient(RPCBase):
else:
raise ValueError("Device must be a string or a device object")
def _start_update_script(self) -> None:
self._client.connector.register(MessageEndpoints.scan_status(), cb=self._handle_msg_update)
def _handle_msg_update(self, msg: MessageObject) -> None:
if self.auto_updates is not None:
# pylint: disable=protected-access
return self._update_script_msg_parser(msg.value)
def _update_script_msg_parser(self, msg: messages.BECMessage) -> None:
if isinstance(msg, messages.ScanStatusMessage):
if not self.gui_is_alive():
return
if self._auto_updates_enabled:
return self.auto_updates.do_update(msg)
def _gui_post_startup(self):
self._top_level["main"] = WidgetDesc(
title="BEC Widgets", widget=BECDockArea(gui_id=self._gui_id)
)
if self._auto_updates_enabled:
if self._auto_updates is None:
auto_updates = self._get_update_script()
if auto_updates is None:
AutoUpdates.create_default_dock = True
AutoUpdates.enabled = True
auto_updates = AutoUpdates(self._top_level["main"].widget)
if auto_updates.create_default_dock:
auto_updates.start_default_dock()
self._start_update_script()
self._auto_updates = auto_updates
if self._auto_update_enabled:
self._do_install_auto_update()
self._do_show_all()
self._gui_started_event.set()
@ -325,6 +294,14 @@ class BECGuiClient(RPCBase):
self._top_level[widget._gui_id] = WidgetDesc(title=title, widget=widget)
return widget
def _do_install_auto_update(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
return rpc_client._run_rpc("install_auto_update")
def install_auto_update(self):
with wait_for_server(self):
return self._do_install_auto_update()
def close(self) -> None:
"""
Close the gui window.

View File

@ -264,8 +264,8 @@ def main():
RPCRegister().add_rpc(win)
gui = server.gui
win.setCentralWidget(gui)
win.setCentralWidget(server.gui)
if not args.hide:
win.show()

View File

@ -44,7 +44,6 @@ class BECDockArea(BECWidget, QWidget):
PLUGIN = True
USER_ACCESS = [
"_config_dict",
"selected_device",
"panels",
"save_state",
"remove_dock",
@ -216,17 +215,6 @@ class BECDockArea(BECWidget, QWidget):
"Add docks using 'add_dock' method from CLI\n or \n Add widget docks using the toolbar",
)
@property
def selected_device(self) -> str:
gui_id = QApplication.instance().gui_id
auto_update_config = self.client.connector.get(
MessageEndpoints.gui_auto_update_config(gui_id)
)
try:
return auto_update_config.selected_device
except AttributeError:
return None
@property
def panels(self) -> dict[str, BECDock]:
"""

View File

@ -1,8 +1,26 @@
from bec_lib.endpoints import MessageEndpoints
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from qtpy.QtWidgets import QApplication, QMainWindow
from bec_widgets.utils import BECConnector
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
messages = lazy_import("bec_lib.messages")
# from bec_lib.connector import MessageObject
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
from pydantic import BaseModel
class ScanInfo(BaseModel):
scan_id: str
scan_number: int
scan_name: str
scan_report_devices: list
monitored_devices: list
status: str
model_config: dict = {"validate_assignment": True}
class BECMainWindow(QMainWindow, BECConnector):
def __init__(self, *args, **kwargs):
@ -39,3 +57,138 @@ class BECMainWindow(QMainWindow, BECConnector):
dock_area.window().setWindowTitle(name)
dock_area.show()
return dock_area
def install_auto_update(self):
dock_area = self.centralWidget()
figure_dock = dock_area.add_dock("default_figure")
self.auto_update_fig = figure_dock.add_widget("BECFigure")
self.client.connector.register(MessageEndpoints.scan_status(), cb=self._handle_msg_update)
@property
def selected_device(self) -> str:
gui_id = QApplication.instance().gui_id
auto_update_config = self.client.connector.get(
MessageEndpoints.gui_auto_update_config(gui_id)
)
try:
return auto_update_config.selected_device
except AttributeError:
return None
def _handle_msg_update(self, msg: MessageObject) -> None:
msg = msg.value
if isinstance(msg, messages.ScanStatusMessage):
return self.do_update(msg)
def get_scan_info(self, msg) -> ScanInfo:
"""
Update the script with the given data.
"""
info = msg.info
status = msg.status
scan_id = msg.scan_id
scan_number = info.get("scan_number", 0)
scan_name = info.get("scan_name", "Unknown")
scan_report_devices = info.get("scan_report_devices", [])
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
return ScanInfo(
scan_id=scan_id,
scan_number=scan_number,
scan_name=scan_name,
scan_report_devices=scan_report_devices,
monitored_devices=monitored_devices,
status=status,
)
def do_update(self, msg):
if msg.status != "open":
return
info = self.get_scan_info(msg)
return self.handler(info)
def handler(self, info: ScanInfo) -> None:
"""
Default update function.
"""
if info.scan_name == "line_scan" and info.scan_report_devices:
return self.simple_line_scan(info)
if info.scan_name == "grid_scan" and info.scan_report_devices:
return self.simple_grid_scan(info)
if info.scan_report_devices:
return self.best_effort(info)
def get_selected_device(self, monitored_devices, selected_device):
"""
Get the selected device for the plot. If no device is selected, the first
device in the monitored devices list is selected.
"""
if selected_device:
return selected_device
if len(monitored_devices) > 0:
sel_device = monitored_devices[0]
return sel_device
return None
def simple_line_scan(self, info: ScanInfo) -> None:
"""
Simple line scan.
"""
fig = self.auto_update_fig
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = self.get_selected_device(info.monitored_devices, self.selected_device)
if not dev_y:
return
fig.clear_all()
fig.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
def simple_grid_scan(self, info: ScanInfo) -> None:
"""
Simple grid scan.
"""
fig = self.auto_update_fig
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = info.scan_report_devices[1]
dev_z = self.get_selected_device(info.monitored_devices, self.selected_device)
fig.clear_all()
fig.plot(
x_name=dev_x,
y_name=dev_y,
z_name=dev_z,
label=f"Scan {info.scan_number} - {dev_z}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
def best_effort(self, info: ScanInfo) -> None:
"""
Best effort scan.
"""
fig = self.auto_update_fig
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = self.get_selected_device(info.monitored_devices, self.selected_device)
if not dev_y:
return
fig.clear_all()
fig.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)