mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-13 19:21:50 +02:00
wip - namespace update
This commit is contained in:
@ -35,9 +35,9 @@ class AutoUpdates:
|
||||
Create a default dock for the auto updates.
|
||||
"""
|
||||
self.dock_name = "default_figure"
|
||||
self._default_dock = self.gui.add_dock(self.dock_name)
|
||||
self._default_dock.add_widget("BECFigure")
|
||||
self._default_fig = self._default_dock.widget_list[0]
|
||||
self._default_dock = self.gui.new(self.dock_name)
|
||||
self._default_dock.new("BECFigure")
|
||||
self._default_fig = self._default_dock.elements_list[0]
|
||||
|
||||
@staticmethod
|
||||
def get_scan_info(msg) -> ScanInfo:
|
||||
|
@ -1,3 +1,5 @@
|
||||
"""Client utilities for the BEC GUI."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib
|
||||
@ -7,32 +9,35 @@ import os
|
||||
import select
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.utils.import_utils import isinstance_based_on_class_name, lazy_import, lazy_import_from
|
||||
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
import bec_widgets.cli.client as client
|
||||
from bec_widgets.cli.auto_updates import AutoUpdates
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCBase
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCReference
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib import messages
|
||||
from bec_lib.connector import MessageObject
|
||||
from bec_lib.device import DeviceBase
|
||||
|
||||
from bec_widgets.utils.bec_dispatcher import BECDispatcher
|
||||
from bec_lib.redis_connector import StreamMessage
|
||||
else:
|
||||
messages = lazy_import("bec_lib.messages")
|
||||
# from bec_lib.connector import MessageObject
|
||||
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
|
||||
BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispatcher",))
|
||||
StreamMessage = lazy_import_from("bec_lib.redis_connector", ("StreamMessage",))
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
IGNORE_WIDGETS = ["BECDockArea", "BECDock"]
|
||||
|
||||
|
||||
def _filter_output(output: str) -> str:
|
||||
"""
|
||||
@ -67,7 +72,9 @@ def _get_output(process, logger) -> None:
|
||||
logger.error(f"Error reading process output: {str(e)}")
|
||||
|
||||
|
||||
def _start_plot_process(gui_id: str, gui_class: type, config: dict | str, logger=None) -> None:
|
||||
def _start_plot_process(
|
||||
gui_id: str, gui_class: type, gui_class_id: str, config: dict | str, logger=None
|
||||
) -> None:
|
||||
"""
|
||||
Start the plot in a new process.
|
||||
|
||||
@ -76,7 +83,16 @@ def _start_plot_process(gui_id: str, gui_class: type, config: dict | str, logger
|
||||
process will not be captured.
|
||||
"""
|
||||
# pylint: disable=subprocess-run-check
|
||||
command = ["bec-gui-server", "--id", gui_id, "--gui_class", gui_class.__name__, "--hide"]
|
||||
command = [
|
||||
"bec-gui-server",
|
||||
"--id",
|
||||
gui_id,
|
||||
"--gui_class",
|
||||
gui_class.__name__,
|
||||
"--gui_class_id",
|
||||
gui_class_id,
|
||||
"--hide",
|
||||
]
|
||||
if config:
|
||||
if isinstance(config, dict):
|
||||
config = json.dumps(config)
|
||||
@ -111,16 +127,20 @@ def _start_plot_process(gui_id: str, gui_class: type, config: dict | str, logger
|
||||
|
||||
|
||||
class RepeatTimer(threading.Timer):
|
||||
"""RepeatTimer class."""
|
||||
|
||||
def run(self):
|
||||
while not self.finished.wait(self.interval):
|
||||
self.function(*self.args, **self.kwargs)
|
||||
|
||||
|
||||
# pylint: disable=protected-access
|
||||
@contextmanager
|
||||
def wait_for_server(client):
|
||||
def wait_for_server(client: BECGuiClient):
|
||||
"""Context manager to wait for the server to start."""
|
||||
timeout = client._startup_timeout
|
||||
if not timeout:
|
||||
if client.gui_is_alive():
|
||||
if client._gui_is_alive():
|
||||
# there is hope, let's wait a bit
|
||||
timeout = 1
|
||||
else:
|
||||
@ -138,42 +158,63 @@ def wait_for_server(client):
|
||||
yield
|
||||
|
||||
|
||||
### ----------------------------
|
||||
### NOTE
|
||||
### it is far easier to extend the 'delete' method on the client side,
|
||||
### to know when the client is deleted, rather than listening to server
|
||||
### to get notified. However, 'generate_cli.py' cannot add extra stuff
|
||||
### in the generated client module. So, here a class with the same name
|
||||
### is created, and client module is patched.
|
||||
class WidgetNameSpace:
|
||||
def __repr__(self):
|
||||
console = Console()
|
||||
table = Table(title="Available widgets for BEC CLI usage")
|
||||
table.add_column("Widget Name", justify="left", style="magenta")
|
||||
table.add_column("Description", justify="left")
|
||||
for attr, value in self.__dict__.items():
|
||||
docs = value.__doc__
|
||||
docs = docs if docs else "No description available"
|
||||
table.add_row(attr, docs)
|
||||
console.print(table)
|
||||
return f""
|
||||
|
||||
|
||||
class AvailableWidgetsNamespace:
|
||||
"""Namespace for available widgets in the BEC GUI."""
|
||||
|
||||
def __init__(self):
|
||||
for widget in client.Widgets:
|
||||
name = widget.value
|
||||
if name in IGNORE_WIDGETS:
|
||||
continue
|
||||
setattr(self, name, name)
|
||||
|
||||
def __repr__(self):
|
||||
console = Console()
|
||||
table = Table(title="Available widgets for BEC CLI usage")
|
||||
table.add_column("Widget Name", justify="left", style="magenta")
|
||||
table.add_column("Description", justify="left")
|
||||
for attr_name, _ in self.__dict__.items():
|
||||
docs = getattr(client, attr_name).__doc__
|
||||
docs = docs if docs else "No description available"
|
||||
table.add_row(attr_name, docs if len(docs.strip()) > 0 else "No description available")
|
||||
console.print(table)
|
||||
return "" # f"<{self.__class__.__name__}>"
|
||||
|
||||
|
||||
class BECDockArea(client.BECDockArea):
|
||||
def delete(self):
|
||||
if self is BECGuiClient._top_level["main"].widget:
|
||||
raise RuntimeError("Cannot delete main window")
|
||||
super().delete()
|
||||
try:
|
||||
del BECGuiClient._top_level[self._gui_id]
|
||||
except KeyError:
|
||||
# if a dock area is not at top level
|
||||
pass
|
||||
"""Extend the BECDockArea class and add namespaces to access widgets of docks."""
|
||||
|
||||
|
||||
client.BECDockArea = BECDockArea
|
||||
### ----------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class WidgetDesc:
|
||||
title: str
|
||||
widget: BECDockArea
|
||||
def __init__(self, gui_id=None, config=None, name=None, parent=None):
|
||||
super().__init__(gui_id, config, name, parent)
|
||||
# Add namespaces for DockArea
|
||||
self.elements = WidgetNameSpace()
|
||||
|
||||
|
||||
class BECGuiClient(RPCBase):
|
||||
"""BEC GUI client class. Container for GUI applications within Python."""
|
||||
|
||||
_top_level = {}
|
||||
|
||||
def __init__(self, **kwargs) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self._default_dock_name = "bec"
|
||||
self._auto_updates_enabled = True
|
||||
self._auto_updates = None
|
||||
self._killed = False
|
||||
self._startup_timeout = 0
|
||||
self._gui_started_timer = None
|
||||
self._gui_started_event = threading.Event()
|
||||
@ -181,14 +222,21 @@ class BECGuiClient(RPCBase):
|
||||
self._process_output_processing_thread = None
|
||||
|
||||
@property
|
||||
def windows(self):
|
||||
def windows(self) -> dict:
|
||||
"""Dictionary with dock ares in the GUI."""
|
||||
return self._top_level
|
||||
|
||||
@property
|
||||
def auto_updates(self):
|
||||
if self._auto_updates_enabled:
|
||||
with wait_for_server(self):
|
||||
return self._auto_updates
|
||||
def window_list(self) -> list:
|
||||
"""List with dock areas in the GUI."""
|
||||
return list(self._top_level.values())
|
||||
|
||||
# FIXME AUTO UPDATES
|
||||
# @property
|
||||
# def auto_updates(self):
|
||||
# if self._auto_updates_enabled:
|
||||
# with wait_for_server(self):
|
||||
# return self._auto_updates
|
||||
|
||||
def _get_update_script(self) -> AutoUpdates | None:
|
||||
eps = imd.entry_points(group="bec.widgets.auto_updates")
|
||||
@ -199,51 +247,53 @@ class BECGuiClient(RPCBase):
|
||||
# if the module is not found, we skip it
|
||||
if spec is None:
|
||||
continue
|
||||
return ep.load()(gui=self._top_level["main"].widget)
|
||||
return ep.load()(gui=self._top_level["main"])
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading auto update script from plugin: {str(e)}")
|
||||
return None
|
||||
|
||||
@property
|
||||
def selected_device(self):
|
||||
"""
|
||||
Selected device for the plot.
|
||||
"""
|
||||
auto_update_config_ep = MessageEndpoints.gui_auto_update_config(self._gui_id)
|
||||
auto_update_config = self._client.connector.get(auto_update_config_ep)
|
||||
if auto_update_config:
|
||||
return auto_update_config.selected_device
|
||||
return None
|
||||
# FIXME AUTO UPDATES
|
||||
# @property
|
||||
# def selected_device(self) -> str | None:
|
||||
# """
|
||||
# Selected device for the plot.
|
||||
# """
|
||||
# auto_update_config_ep = MessageEndpoints.gui_auto_update_config(self._gui_id)
|
||||
# auto_update_config = self._client.connector.get(auto_update_config_ep)
|
||||
# if auto_update_config:
|
||||
# return auto_update_config.selected_device
|
||||
# return None
|
||||
|
||||
@selected_device.setter
|
||||
def selected_device(self, device: str | DeviceBase):
|
||||
if isinstance_based_on_class_name(device, "bec_lib.device.DeviceBase"):
|
||||
self._client.connector.set_and_publish(
|
||||
MessageEndpoints.gui_auto_update_config(self._gui_id),
|
||||
messages.GUIAutoUpdateConfigMessage(selected_device=device.name),
|
||||
)
|
||||
elif isinstance(device, str):
|
||||
self._client.connector.set_and_publish(
|
||||
MessageEndpoints.gui_auto_update_config(self._gui_id),
|
||||
messages.GUIAutoUpdateConfigMessage(selected_device=device),
|
||||
)
|
||||
else:
|
||||
raise ValueError("Device must be a string or a device object")
|
||||
# @selected_device.setter
|
||||
# def selected_device(self, device: str | DeviceBase):
|
||||
# if isinstance_based_on_class_name(device, "bec_lib.device.DeviceBase"):
|
||||
# self._client.connector.set_and_publish(
|
||||
# MessageEndpoints.gui_auto_update_config(self._gui_id),
|
||||
# messages.GUIAutoUpdateConfigMessage(selected_device=device.name),
|
||||
# )
|
||||
# elif isinstance(device, str):
|
||||
# self._client.connector.set_and_publish(
|
||||
# MessageEndpoints.gui_auto_update_config(self._gui_id),
|
||||
# messages.GUIAutoUpdateConfigMessage(selected_device=device),
|
||||
# )
|
||||
# else:
|
||||
# raise ValueError("Device must be a string or a device object")
|
||||
|
||||
def _start_update_script(self) -> None:
|
||||
self._client.connector.register(MessageEndpoints.scan_status(), cb=self._handle_msg_update)
|
||||
# FIXME AUTO UPDATES
|
||||
# def _start_update_script(self) -> None:
|
||||
# self._client.connector.register(MessageEndpoints.scan_status(), cb=self._handle_msg_update)
|
||||
|
||||
def _handle_msg_update(self, msg: MessageObject) -> None:
|
||||
if self.auto_updates is not None:
|
||||
# pylint: disable=protected-access
|
||||
return self._update_script_msg_parser(msg.value)
|
||||
# def _handle_msg_update(self, msg: StreamMessage) -> None:
|
||||
# if self.auto_updates is not None:
|
||||
# # pylint: disable=protected-access
|
||||
# return self._update_script_msg_parser(msg.value)
|
||||
|
||||
def _update_script_msg_parser(self, msg: messages.BECMessage) -> None:
|
||||
if isinstance(msg, messages.ScanStatusMessage):
|
||||
if not self.gui_is_alive():
|
||||
return
|
||||
if self._auto_updates_enabled:
|
||||
return self.auto_updates.do_update(msg)
|
||||
# def _update_script_msg_parser(self, msg: messages.BECMessage) -> None:
|
||||
# if isinstance(msg, messages.ScanStatusMessage):
|
||||
# if not self._gui_is_alive():
|
||||
# return
|
||||
# if self._auto_updates_enabled:
|
||||
# return self.auto_updates.do_update(msg)
|
||||
|
||||
def _gui_post_startup(self):
|
||||
self._top_level["main"] = WidgetDesc(
|
||||
@ -263,7 +313,7 @@ class BECGuiClient(RPCBase):
|
||||
self._do_show_all()
|
||||
self._gui_started_event.set()
|
||||
|
||||
def start_server(self, wait=False) -> None:
|
||||
def _start_server(self, wait: bool = False) -> None:
|
||||
"""
|
||||
Start the GUI server, and execute callback when it is launched
|
||||
"""
|
||||
@ -272,7 +322,11 @@ class BECGuiClient(RPCBase):
|
||||
self._startup_timeout = 5
|
||||
self._gui_started_event.clear()
|
||||
self._process, self._process_output_processing_thread = _start_plot_process(
|
||||
self._gui_id, self.__class__, self._client._service_config.config, logger=logger
|
||||
self._gui_id,
|
||||
self.__class__,
|
||||
gui_class_id=self._default_dock_name,
|
||||
config=self._client._service_config.config, # pylint: disable=protected-access
|
||||
logger=logger,
|
||||
)
|
||||
|
||||
def gui_started_callback(callback):
|
||||
@ -283,7 +337,7 @@ class BECGuiClient(RPCBase):
|
||||
threading.current_thread().cancel()
|
||||
|
||||
self._gui_started_timer = RepeatTimer(
|
||||
0.5, lambda: self.gui_is_alive() and gui_started_callback(self._gui_post_startup)
|
||||
0.5, lambda: self._gui_is_alive() and gui_started_callback(self._gui_post_startup)
|
||||
)
|
||||
self._gui_started_timer.start()
|
||||
|
||||
@ -299,49 +353,90 @@ class BECGuiClient(RPCBase):
|
||||
|
||||
def _do_show_all(self):
|
||||
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
|
||||
rpc_client._run_rpc("show")
|
||||
rpc_client._run_rpc("show") # pylint: disable=protected-access
|
||||
for window in self._top_level.values():
|
||||
window.widget.show()
|
||||
window.show()
|
||||
|
||||
def show_all(self):
|
||||
def _show_all(self):
|
||||
with wait_for_server(self):
|
||||
return self._do_show_all()
|
||||
|
||||
def hide_all(self):
|
||||
def _hide_all(self):
|
||||
with wait_for_server(self):
|
||||
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
|
||||
rpc_client._run_rpc("hide")
|
||||
for window in self._top_level.values():
|
||||
window.widget.hide()
|
||||
rpc_client._run_rpc("hide") # pylint: disable=protected-access
|
||||
# because of the registry callbacks, we may have
|
||||
# dock areas that are already killed, but not yet
|
||||
# removed from the registry state
|
||||
if not self._killed:
|
||||
for window in self._top_level.values():
|
||||
window.hide()
|
||||
|
||||
def show(self):
|
||||
"""Show the GUI window."""
|
||||
if self._process is not None:
|
||||
return self.show_all()
|
||||
return self._show_all()
|
||||
# backward compatibility: show() was also starting server
|
||||
return self.start_server(wait=True)
|
||||
return self._start_server(wait=True)
|
||||
|
||||
def hide(self):
|
||||
return self.hide_all()
|
||||
"""Hide the GUI window."""
|
||||
return self._hide_all()
|
||||
|
||||
@property
|
||||
def main(self):
|
||||
"""Return client to main dock area (in main window)"""
|
||||
with wait_for_server(self):
|
||||
return self._top_level["main"].widget
|
||||
def new(
|
||||
self,
|
||||
name: str | None = None,
|
||||
wait: bool = True,
|
||||
geometry: tuple[int, int, int, int] | None = None,
|
||||
) -> BECDockArea:
|
||||
"""Create a new top-level dock area.
|
||||
|
||||
def new(self, title):
|
||||
"""Ask main window to create a new top-level dock area"""
|
||||
with wait_for_server(self):
|
||||
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
|
||||
widget = rpc_client._run_rpc("new_dock_area", title)
|
||||
self._top_level[widget._gui_id] = WidgetDesc(title=title, widget=widget)
|
||||
return widget
|
||||
|
||||
def close(self) -> None:
|
||||
Args:
|
||||
name(str, optional): The name of the dock area. Defaults to None.
|
||||
wait(bool, optional): Whether to wait for the server to start. Defaults to True.
|
||||
geometry(tuple[int, int, int, int] | None): The geometry of the dock area (pos_x, pos_y, w, h)
|
||||
Returns:
|
||||
BECDockArea: The new dock area.
|
||||
"""
|
||||
Close the gui window.
|
||||
if len(self.window_list) == 0:
|
||||
self.show()
|
||||
if wait:
|
||||
with wait_for_server(self):
|
||||
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
|
||||
widget = rpc_client._run_rpc(
|
||||
"new_dock_area", name, geometry
|
||||
) # pylint: disable=protected-access
|
||||
return widget
|
||||
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
|
||||
widget = rpc_client._run_rpc(
|
||||
"new_dock_area", name, geometry
|
||||
) # pylint: disable=protected-access
|
||||
return widget
|
||||
|
||||
def delete(self, name: str) -> None:
|
||||
"""Delete a dock area.
|
||||
|
||||
Args:
|
||||
name(str): The name of the dock area.
|
||||
"""
|
||||
widget = self.windows.get(name)
|
||||
if widget is None:
|
||||
raise ValueError(f"Dock area {name} not found.")
|
||||
widget._run_rpc("close") # pylint: disable=protected-access
|
||||
|
||||
def delete_all(self) -> None:
|
||||
"""Delete all dock areas."""
|
||||
for widget_name in self.windows.keys():
|
||||
self.delete(widget_name)
|
||||
def close(self):
|
||||
"""Deprecated. Use kill_server() instead."""
|
||||
# FIXME, deprecated in favor of kill, will be removed in the future
|
||||
self.kill_server()
|
||||
|
||||
def kill_server(self) -> None:
|
||||
"""Kill the GUI server."""
|
||||
self._top_level.clear()
|
||||
self._killed = True
|
||||
|
||||
if self._gui_started_timer is not None:
|
||||
self._gui_started_timer.cancel()
|
||||
|
Reference in New Issue
Block a user