0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 03:31:50 +02:00

feat: add rpc broadcast

This commit is contained in:
2025-03-13 10:06:34 +01:00
committed by wyzula-jan
parent 9f2a083abb
commit 2ba9b4cb23
14 changed files with 780 additions and 635 deletions

View File

@ -9,17 +9,20 @@ import os
import select
import subprocess
import threading
import time
from contextlib import contextmanager
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from rich.console import Console
from rich.table import Table
import bec_widgets.cli.client as client
from bec_widgets.cli.auto_updates import AutoUpdates
from bec_widgets.cli.rpc.rpc_base import RPCBase
# from bec_widgets.cli.auto_updates import AutoUpdates
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCReference
if TYPE_CHECKING:
from bec_lib import messages
@ -166,7 +169,7 @@ class WidgetNameSpace:
docs = docs if docs else "No description available"
table.add_row(attr, docs)
console.print(table)
return f""
return ""
class AvailableWidgetsNamespace:
@ -189,22 +192,13 @@ class AvailableWidgetsNamespace:
docs = docs if docs else "No description available"
table.add_row(attr_name, docs if len(docs.strip()) > 0 else "No description available")
console.print(table)
return "" # f"<{self.__class__.__name__}>"
class BECDockArea(client.BECDockArea):
"""Extend the BECDockArea class and add namespaces to access widgets of docks."""
def __init__(self, gui_id=None, config=None, name=None, parent=None):
super().__init__(gui_id, config, name, parent)
# Add namespaces for DockArea
self.elements = WidgetNameSpace()
return ""
class BECGuiClient(RPCBase):
"""BEC GUI client class. Container for GUI applications within Python."""
_top_level: dict[str, BECDockArea] = {}
_top_level: dict[str, client.BECDockArea] = {}
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
@ -217,6 +211,32 @@ class BECGuiClient(RPCBase):
self._gui_started_event = threading.Event()
self._process = None
self._process_output_processing_thread = None
self._exposed_dock_areas = []
self._registry_state = {}
self._ipython_registry = {}
self.available_widgets = AvailableWidgetsNamespace()
####################
#### Client API ####
####################
def connect_to_gui_server(self, gui_id: str) -> None:
"""Connect to a GUI server"""
# Unregister the old callback
self._client.connector.unregister(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
self._gui_id = gui_id
# Get the registry state
msgs = self._client.connector.xread(
MessageEndpoints.gui_registry_state(self._gui_id), count=1
)
if msgs:
self._handle_registry_update(msgs[0])
# Register the new callback
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
@property
def windows(self) -> dict:
@ -228,6 +248,282 @@ class BECGuiClient(RPCBase):
"""List with dock areas in the GUI."""
return list(self._top_level.values())
def start(self, wait: bool = False) -> None:
"""Start the GUI server."""
return self._start(wait=wait)
def show(self):
"""Show the GUI window."""
if self._process is not None:
return self._show_all()
# backward compatibility: show() was also starting server
return self._start_server(wait=True)
def hide(self):
"""Hide the GUI window."""
return self._hide_all()
def new(
self,
name: str | None = None,
wait: bool = True,
geometry: tuple[int, int, int, int] | None = None,
) -> client.BECDockArea:
"""Create a new top-level dock area.
Args:
name(str, optional): The name of the dock area. Defaults to None.
wait(bool, optional): Whether to wait for the server to start. Defaults to True.
geometry(tuple[int, int, int, int] | None): The geometry of the dock area (pos_x, pos_y, w, h)
Returns:
client.BECDockArea: The new dock area.
"""
if len(self.window_list) == 0:
self.show()
if wait:
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
widget = rpc_client._run_rpc(
"new_dock_area", name, geometry
) # pylint: disable=protected-access
self._top_level[widget.widget_name] = widget
return widget
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
widget = rpc_client._run_rpc(
"new_dock_area", name, geometry
) # pylint: disable=protected-access
self._top_level[widget.widget_name] = widget
return widget
def delete(self, name: str) -> None:
"""Delete a dock area.
Args:
name(str): The name of the dock area.
"""
widget = self.windows.get(name)
if widget is None:
raise ValueError(f"Dock area {name} not found.")
widget._run_rpc("close") # pylint: disable=protected-access
def delete_all(self) -> None:
"""Delete all dock areas."""
for widget_name in self.windows:
self.delete(widget_name)
def kill_server(self) -> None:
"""Kill the GUI server."""
self._top_level.clear()
self._killed = True
if self._gui_started_timer is not None:
self._gui_started_timer.cancel()
self._gui_started_timer.join()
if self._process is None:
return
if self._process:
logger.success("Stopping GUI...")
self._process.terminate()
if self._process_output_processing_thread:
self._process_output_processing_thread.join()
self._process.wait()
self._process = None
# Unregister the registry state
self._client.connector.unregister(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
def close(self):
"""Deprecated. Use kill_server() instead."""
# FIXME, deprecated in favor of kill, will be removed in the future
self.kill_server()
#########################
#### Private methods ####
#########################
def _gui_post_startup(self):
timeout = 10
while time.time() < time.time() + timeout:
if len(list(self._registry_state.keys())) == 0:
time.sleep(0.1)
else:
break
# FIXME AUTO UPDATES
# if self._auto_updates_enabled:
# if self._auto_updates is None:
# auto_updates = self._get_update_script()
# if auto_updates is None:
# AutoUpdates.create_default_dock = True
# AutoUpdates.enabled = True
# auto_updates = AutoUpdates(self._top_level["main"].widget)
# if auto_updates.create_default_dock:
# auto_updates.start_default_dock()
# self._start_update_script()
# self._auto_updates = auto_updates
self._do_show_all()
self._gui_started_event.set()
def _start_server(self, wait: bool = False) -> None:
"""
Start the GUI server, and execute callback when it is launched
"""
if self._process is None or self._process.poll() is not None:
logger.success("GUI starting...")
self._startup_timeout = 5
self._gui_started_event.clear()
self._process, self._process_output_processing_thread = _start_plot_process(
self._gui_id,
self.__class__,
gui_class_id=self._default_dock_name,
config=self._client._service_config.config, # pylint: disable=protected-access
logger=logger,
)
def gui_started_callback(callback):
try:
if callable(callback):
callback()
finally:
threading.current_thread().cancel()
self._gui_started_timer = RepeatTimer(
0.5, lambda: self._gui_is_alive() and gui_started_callback(self._gui_post_startup)
)
self._gui_started_timer.start()
if wait:
self._gui_started_event.wait()
def _dump(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
return rpc_client._run_rpc("_dump")
def _start(self, wait: bool = False) -> None:
self._killed = False
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
return self._start_server(wait=wait)
def _handle_registry_update(self, msg: StreamMessage) -> None:
self._registry_state = msg["data"].state
self._update_dynamic_namespace()
def _do_show_all(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client._run_rpc("show") # pylint: disable=protected-access
for window in self._top_level.values():
window.show()
def _show_all(self):
with wait_for_server(self):
return self._do_show_all()
def _hide_all(self):
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client._run_rpc("hide") # pylint: disable=protected-access
# because of the registry callbacks, we may have
# dock areas that are already killed, but not yet
# removed from the registry state
if not self._killed:
for window in self._top_level.values():
window.hide()
def _update_dynamic_namespace(self):
"""Update the dynamic name space"""
self._clear_exposed_dock_areas()
self._cleanup_ipython_registry()
self._add_registry_to_namespace()
def _clear_exposed_dock_areas(self):
"""Clear the exposed dock areas"""
self._top_level.clear()
for widget_id in self._exposed_dock_areas:
delattr(self, widget_id)
self._exposed_dock_areas.clear()
def _cleanup_ipython_registry(self):
"""Cleanup the ipython registry"""
remove_ids = []
for widget_id in self._ipython_registry:
if widget_id not in self._registry_state:
remove_ids.append(widget_id)
for widget_id in remove_ids:
self._ipython_registry.pop(widget_id)
def _set_dynamic_attributes(self, obj: object, name: str, value: Any) -> None:
"""Add an object to the namespace"""
setattr(obj, name, value)
def _add_registry_to_namespace(self) -> None:
"""Add registry to namespace"""
# Add dock areas
dock_area_states = [
state
for state in self._registry_state.values()
if state["widget_class"] == "BECDockArea"
]
for state in dock_area_states:
# obj is an RPC reference to the RPCBase object
dock_area_obj = self._add_widget(state, self)
self._set_dynamic_attributes(self, dock_area_obj.widget_name, dock_area_obj)
# Add dock_area to the top level
self._top_level[dock_area_obj.widget_name] = dock_area_obj
self._exposed_dock_areas.append(dock_area_obj.widget_name)
# Add docks
dock_states = [
state
for state in self._registry_state.values()
if state["config"].get("parent_id", "") == dock_area_obj._gui_id
]
for state in dock_states:
dock_obj = self._add_widget(state, dock_area_obj)
self._set_dynamic_attributes(dock_area_obj, dock_obj.widget_name, dock_obj)
# Add widgets
widget_states = [
state
for state in self._registry_state.values()
if state["config"].get("parent_id", "") == dock_obj._gui_id
]
for state in widget_states:
widget = self._add_widget(state, dock_obj)
self._set_dynamic_attributes(dock_obj, widget.widget_name, widget)
self._set_dynamic_attributes(dock_area_obj.elements, widget.widget_name, widget)
def _add_widget(self, state: dict, parent: object) -> RPCReference:
"""Add a widget to the namespace
Args:
state (dict): The state of the widget from the _registry_state.
parent (object): The parent object.
"""
name = state["name"]
gui_id = state["gui_id"]
widget_class = getattr(client, state["widget_class"])
obj = self._ipython_registry.get(gui_id)
if obj is None:
widget = widget_class(gui_id=gui_id, name=name, parent=parent)
self._ipython_registry[gui_id] = widget
else:
widget = obj
if widget_class == client.BECDockArea:
# Add elements to dynamic namespace
self._set_dynamic_attributes(widget, "elements", WidgetNameSpace())
obj = RPCReference(registry=self._ipython_registry, gui_id=gui_id)
return obj
################################
#### Auto updates ####
#### potentially deprecated ####
################################
# FIXME AUTO UPDATES
# @property
# def auto_updates(self):
@ -235,19 +531,19 @@ class BECGuiClient(RPCBase):
# with wait_for_server(self):
# return self._auto_updates
def _get_update_script(self) -> AutoUpdates | None:
eps = imd.entry_points(group="bec.widgets.auto_updates")
for ep in eps:
if ep.name == "plugin_widgets_update":
try:
spec = importlib.util.find_spec(ep.module)
# if the module is not found, we skip it
if spec is None:
continue
return ep.load()(gui=self._top_level["main"])
except Exception as e:
logger.error(f"Error loading auto update script from plugin: {str(e)}")
return None
# def _get_update_script(self) -> AutoUpdates | None:
# eps = imd.entry_points(group="bec.widgets.auto_updates")
# for ep in eps:
# if ep.name == "plugin_widgets_update":
# try:
# spec = importlib.util.find_spec(ep.module)
# # if the module is not found, we skip it
# if spec is None:
# continue
# return ep.load()(gui=self._top_level["main"])
# except Exception as e:
# logger.error(f"Error loading auto update script from plugin: {str(e)}")
# return None
# FIXME AUTO UPDATES
# @property
@ -292,176 +588,14 @@ class BECGuiClient(RPCBase):
# if self._auto_updates_enabled:
# return self.auto_updates.do_update(msg)
def _gui_post_startup(self):
# if self._auto_updates_enabled:
# if self._auto_updates is None:
# auto_updates = self._get_update_script()
# if auto_updates is None:
# AutoUpdates.create_default_dock = True
# AutoUpdates.enabled = True
# auto_updates = AutoUpdates(self._top_level["main"].widget)
# if auto_updates.create_default_dock:
# auto_updates.start_default_dock()
# self._start_update_script()
# self._auto_updates = auto_updates
self._top_level[self._default_dock_name] = BECDockArea(
gui_id=f"{self._default_dock_name}", name=self._default_dock_name, parent=self
)
self._do_show_all()
self._gui_started_event.set()
def _start_server(self, wait: bool = False) -> None:
"""
Start the GUI server, and execute callback when it is launched
"""
if self._process is None or self._process.poll() is not None:
logger.success("GUI starting...")
self._startup_timeout = 5
self._gui_started_event.clear()
self._process, self._process_output_processing_thread = _start_plot_process(
self._gui_id,
self.__class__,
gui_class_id=self._default_dock_name,
config=self._client._service_config.config, # pylint: disable=protected-access
logger=logger,
)
def gui_started_callback(callback):
try:
if callable(callback):
callback()
finally:
threading.current_thread().cancel()
self._gui_started_timer = RepeatTimer(
0.5, lambda: self._gui_is_alive() and gui_started_callback(self._gui_post_startup)
)
self._gui_started_timer.start()
if wait:
self._gui_started_event.wait()
def _dump(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
return rpc_client._run_rpc("_dump")
def start(self, wait: bool = True) -> None:
"""Start the server and show the GUI window."""
return self._start_server(wait=wait)
def _do_show_all(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client._run_rpc("show") # pylint: disable=protected-access
for window in self._top_level.values():
window.show()
def _show_all(self):
with wait_for_server(self):
return self._do_show_all()
def _hide_all(self):
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client._run_rpc("hide") # pylint: disable=protected-access
# because of the registry callbacks, we may have
# dock areas that are already killed, but not yet
# removed from the registry state
if not self._killed:
for window in self._top_level.values():
window.hide()
def show(self):
"""Show the GUI window."""
if self._process is not None:
return self._show_all()
# backward compatibility: show() was also starting server
return self._start_server(wait=True)
def hide(self):
"""Hide the GUI window."""
return self._hide_all()
def new(
self,
name: str | None = None,
wait: bool = True,
geometry: tuple[int, int, int, int] | None = None,
) -> BECDockArea:
"""Create a new top-level dock area.
Args:
name(str, optional): The name of the dock area. Defaults to None.
wait(bool, optional): Whether to wait for the server to start. Defaults to True.
geometry(tuple[int, int, int, int] | None): The geometry of the dock area (pos_x, pos_y, w, h)
Returns:
BECDockArea: The new dock area.
"""
if len(self.window_list) == 0:
self.show()
if wait:
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
widget = rpc_client._run_rpc(
"new_dock_area", name, geometry
) # pylint: disable=protected-access
self._top_level[widget.widget_name] = widget
return widget
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
widget = rpc_client._run_rpc(
"new_dock_area", name, geometry
) # pylint: disable=protected-access
self._top_level[widget.widget_name] = widget
return widget
def delete(self, name: str) -> None:
"""Delete a dock area.
Args:
name(str): The name of the dock area.
"""
widget = self.windows.get(name)
if widget is None:
raise ValueError(f"Dock area {name} not found.")
widget._run_rpc("close") # pylint: disable=protected-access
def delete_all(self) -> None:
"""Delete all dock areas."""
for widget_name in self.windows.keys():
self.delete(widget_name)
def close(self):
"""Deprecated. Use kill_server() instead."""
# FIXME, deprecated in favor of kill, will be removed in the future
self.kill_server()
def kill_server(self) -> None:
"""Kill the GUI server."""
self._top_level.clear()
self._killed = True
if self._gui_started_timer is not None:
self._gui_started_timer.cancel()
self._gui_started_timer.join()
if self._process is None:
return
if self._process:
logger.success("Stopping GUI...")
self._process.terminate()
if self._process_output_processing_thread:
self._process_output_processing_thread.join()
self._process.wait()
self._process = None
if __name__ == "__main__": # pragma: no cover
from bec_lib.client import BECClient
from bec_lib.service_config import ServiceConfig
config = ServiceConfig()
client = BECClient(config)
client.start()
bec_client = BECClient(config)
bec_client.start()
# Test the client_utils.py module
gui = BECGuiClient()

View File

@ -60,6 +60,50 @@ class RPCResponseTimeoutError(Exception):
)
class DeletedWidgetError(Exception): ...
def check_for_deleted_widget(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if self._gui_id not in self._registry:
raise DeletedWidgetError(f"Widget with gui_id {self._gui_id} has been deleted")
return func(self, *args, **kwargs)
return wrapper
class RPCReference:
def __init__(self, registry: dict, gui_id: str) -> None:
self._registry = registry
self._gui_id = gui_id
@check_for_deleted_widget
def __getattr__(self, name):
if name in ["_registry", "_gui_id"]:
return super().__getattribute__(name)
return self._registry[self._gui_id].__getattribute__(name)
@check_for_deleted_widget
def __getitem__(self, key):
return self._registry[self._gui_id].__getitem__(key)
def __repr__(self):
if self._gui_id not in self._registry:
return f"<Deleted widget with gui_id {self._gui_id}>"
return self._registry[self._gui_id].__repr__()
def __str__(self):
if self._gui_id not in self._registry:
return f"<Deleted widget with gui_id {self._gui_id}>"
return self._registry[self._gui_id].__str__()
def __dir__(self):
if self._gui_id not in self._registry:
return []
return self._registry[self._gui_id].__dir__()
class RPCBase:
def __init__(
self,
@ -182,7 +226,11 @@ class RPCBase:
cls = getattr(client, cls)
# print(msg_result)
return cls(parent=self, **msg_result)
ret = cls(parent=self, **msg_result)
self._root._ipython_registry[ret._gui_id] = ret
obj = RPCReference(self._root._ipython_registry, ret._gui_id)
return obj
# return ret
return msg_result
def _gui_is_alive(self):

View File

@ -17,6 +17,20 @@ if TYPE_CHECKING: # pragma: no cover
logger = bec_logger.logger
def broadcast_update(func):
"""
Decorator to broadcast updates to the RPCRegister whenever a new RPC object is added or removed.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs)
self.broadcast()
return result
return wrapper
class RPCRegister:
"""
A singleton class that keeps track of all the RPC objects registered in the system for CLI usage.
@ -37,7 +51,9 @@ class RPCRegister:
return
self._rpc_register = WeakValueDictionary()
self._initialized = True
self.callbacks = []
@broadcast_update
def add_rpc(self, rpc: QObject):
"""
Add an RPC object to the register.
@ -49,6 +65,7 @@ class RPCRegister:
raise ValueError("RPC object must have a 'gui_id' attribute.")
self._rpc_register[rpc.gui_id] = rpc
@broadcast_update
def remove_rpc(self, rpc: str):
"""
Remove an RPC object from the register.
@ -97,6 +114,25 @@ class RPCRegister:
widgets = [rpc for rpc in self._rpc_register.values() if isinstance(rpc, cls)]
return [widget._name for widget in widgets]
def broadcast(self):
"""
Broadcast the update to all the callbacks.
"""
# print("Broadcasting")
connections = self.list_all_connections()
for callback in self.callbacks:
callback(connections)
def add_callback(self, callback: Callable[[dict], None]):
"""
Add a callback that will be called whenever the registry is updated.
Args:
callback(Callable[[dict], None]): The callback to be added. It should accept a dictionary of all the
registered RPC objects as an argument.
"""
self.callbacks.append(callback)
@classmethod
def reset_singleton(cls):
"""

View File

@ -69,6 +69,7 @@ class BECWidgetsCLIServer:
self.gui_id = gui_id
# register broadcast callback
self.rpc_register = RPCRegister()
self.rpc_register.add_callback(self.broadcast_registry_update)
self.gui = gui_class(parent=None, name=gui_class_id, gui_id=gui_class_id)
# self.rpc_register.add_rpc(self.gui)
@ -139,13 +140,15 @@ class BECWidgetsCLIServer:
def serialize_object(self, obj):
if isinstance(obj, BECConnector):
config = {} # obj.config.model_dump()
config["parent_id"] = obj.parent_id
return {
"gui_id": obj.gui_id,
"name": (
obj._name if hasattr(obj, "_name") else obj.__class__.__name__
), # pylint: disable=protected-access
"widget_class": obj.__class__.__name__,
"config": obj.config.model_dump(),
"config": config,
"__rpc__": True,
}
return obj
@ -161,6 +164,22 @@ class BECWidgetsCLIServer:
except RedisError as exc:
logger.error(f"Error while emitting heartbeat: {exc}")
def broadcast_registry_update(self, connections: dict):
"""
Broadcast the updated registry to all clients.
"""
# We only need to broadcast the dock areas
data = {key: self.serialize_object(val) for key, val in connections.items()}
# logger.info(f"All registered connections: {list(connections.keys())}")
for k, v in data.items():
logger.info(f"key: {k}, value: {v}")
self.client.connector.xadd(
MessageEndpoints.gui_registry_state(self.gui_id),
msg_dict={"data": messages.GUIRegistryStateMessage(state=data)},
max_size=1, # only single message in stream
)
def shutdown(self): # TODO not sure if needed when cleanup is done at level of BECConnector
logger.info(f"Shutting down server with gui_id: {self.gui_id}")
self.status = messages.BECStatus.IDLE

View File

@ -82,6 +82,7 @@ class BECConnector:
config: ConnectionConfig | None = None,
gui_id: str | None = None,
name: str | None = None,
parent_id: str | None = None,
):
# BEC related connections
self.bec_dispatcher = BECDispatcher(client=client)
@ -110,14 +111,12 @@ class BECConnector:
)
self.config = ConnectionConfig(widget_class=self.__class__.__name__)
# I feel that we should not allow BECConnector to be created with a custom gui_id
# because this would break with the logic in the RPCRegister of retrieving widgets by type
# iterating over all widgets and checkinf if the register widget starts with the string that is passsed.
# If the gui_id is randomly generated, this would break since that widget would have a
# gui_id that is generated in a different way.
self.parent_id = parent_id
# If the gui_id is passed, it should be respected. However, this should be revisted since
# the gui_id has to be unique, and may no longer be.
if gui_id:
self.config.gui_id = gui_id
self.gui_id: str = gui_id
self.gui_id: str = gui_id # Keep namespace in sync
else:
self.gui_id: str = self.config.gui_id # type: ignore
if name is None:
@ -319,6 +318,7 @@ class BECConnector:
self.deleteLater()
else:
self.rpc_register.remove_rpc(self)
self.rpc_register.broadcast()
def get_config(self, dict_output: bool = True) -> dict | BaseModel:
"""

View File

@ -34,6 +34,7 @@ class BECWidget(BECConnector):
theme_update: bool = False,
name: str | None = None,
parent_dock: BECDock | None = None,
parent_id: str | None = None,
**kwargs,
):
"""
@ -56,7 +57,9 @@ class BECWidget(BECConnector):
if not isinstance(self, QWidget):
raise RuntimeError(f"{repr(self)} is not a subclass of QWidget")
super().__init__(client=client, config=config, gui_id=gui_id, name=name)
super().__init__(
client=client, config=config, gui_id=gui_id, name=name, parent_id=parent_id
)
self._parent_dock = parent_dock
app = QApplication.instance()
if not hasattr(app, "theme"):

View File

@ -131,6 +131,7 @@ class BECDock(BECWidget, Dock):
self,
parent: QWidget | None = None,
parent_dock_area: BECDockArea | None = None,
parent_id: str | None = None,
config: DockConfig | None = None,
name: str | None = None,
client=None,
@ -149,7 +150,7 @@ class BECDock(BECWidget, Dock):
config = DockConfig(**config)
self.config = config
super().__init__(
client=client, config=config, gui_id=gui_id, name=name
client=client, config=config, gui_id=gui_id, name=name, parent_id=parent_id
) # Name was checked and created in BEC Widget
label = CustomDockLabel(text=name, closable=closable)
Dock.__init__(self, name=name, label=label, parent=self, **kwargs)
@ -324,18 +325,24 @@ class BECDock(BECWidget, Dock):
if isinstance(widget, str):
widget = cast(
BECWidget,
widget_handler.create_widget(widget_type=widget, name=name, parent_dock=self),
widget_handler.create_widget(
widget_type=widget, name=name, parent_dock=self, parent_id=self.gui_id
),
)
else:
widget._name = name # pylint: disable=protected-access
self.addWidget(widget, row=row, col=col, rowspan=rowspan, colspan=colspan)
if hasattr(widget, "config"):
self.config.widgets[widget.gui_id] = widget.config
widget.config.gui_id = widget.gui_id
self.config.widgets[widget._name] = widget.config # pylint: disable=protected-access
self._broadcast_update()
return widget
def _broadcast_update(self):
rpc_register = RPCRegister()
rpc_register.broadcast()
def move_widget(self, widget: QWidget, new_row: int, new_col: int):
"""
Move a widget to a new position in the layout.

View File

@ -357,7 +357,7 @@ class BECDockArea(BECWidget, QWidget):
else: # Name is not provided
name = WidgetContainerUtils.generate_unique_name(name="dock", list_of_names=dock_names)
dock = BECDock(name=name, parent_dock_area=self, closable=closable)
dock = BECDock(name=name, parent_dock_area=self, parent_id=self.gui_id, closable=closable)
dock.config.position = position
self.config.docks[dock.name()] = dock.config
# The dock.name is equal to the name passed to BECDock
@ -381,8 +381,14 @@ class BECDockArea(BECWidget, QWidget):
self.update()
if floating:
dock.detach()
# Run broadcast update
self._broadcast_update()
return dock
def _broadcast_update(self):
rpc_register = RPCRegister()
rpc_register.broadcast()
def detach_dock(self, dock_name: str) -> BECDock:
"""
Undock a dock from the dock area.

View File

@ -76,6 +76,7 @@ class BECMotorMap(BECPlotBase):
config: Optional[MotorMapConfig] = None,
client=None,
gui_id: Optional[str] = None,
**kwargs,
):
if config is None:
config = MotorMapConfig(widget_class=self.__class__.__name__)

View File

@ -274,4 +274,4 @@ class BECCurve(BECConnector, pg.PlotDataItem):
"""Remove the curve from the plot."""
# self.parent_item.removeItem(self)
self.parent_item.remove_curve(self.name())
self.rpc_register.remove_rpc(self)
super().remove()

View File

@ -325,4 +325,4 @@ class Curve(BECConnector, pg.PlotDataItem):
"""Remove the curve from the plot."""
# self.parent_item.removeItem(self)
self.parent_item.remove_curve(self.name())
self.rpc_register.remove_rpc(self)
super().remove()

View File

@ -1,81 +1,59 @@
"""This module contains fixtures that are used in the end-2-end tests."""
import random
import time
from contextlib import contextmanager
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
from bec_widgets.utils import BECDispatcher
from bec_widgets.widgets.containers.figure import BECFigure
from bec_widgets.cli.client_utils import BECGuiClient
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# make threads check in autouse, **will be executed at the end**; better than
# having it in fixtures for each test, since it prevents from needing to
# 'manually' shutdown bec_client_lib (for example) to make it happy, whereas
# whereas in fact bec_client_lib makes its on cleanup
@pytest.fixture(autouse=True)
def threads_check_fixture(threads_check):
"""
Fixture to check if threads are still alive at the end of the test.
This should always run to avoid leaked threads within our application.
The fixture is set to autouse, meaning it will run for every test.
"""
return
@pytest.fixture
def gui_id():
return f"figure_{random.randint(0,100)}" # make a new gui id each time, to ensure no 'gui is alive' zombie key can perturbate
@contextmanager
def plot_server(gui_id, klass, client_lib):
dispatcher = BECDispatcher(client=client_lib) # Has to init singleton with fixture client
process, _ = _start_plot_process(
gui_id, klass, gui_class_id="bec", config=client_lib._client._service_config.config_path
)
try:
while client_lib._client.connector.get(MessageEndpoints.gui_heartbeat(gui_id)) is None:
time.sleep(0.3)
yield gui_id
finally:
process.terminate()
process.wait()
dispatcher.disconnect_all()
dispatcher.reset_singleton()
@pytest.fixture
def connected_client_figure(gui_id, bec_client_lib):
with plot_server(gui_id, BECFigure, bec_client_lib) as server:
yield server
"""New gui id each time, to ensure no 'gui is alive' zombie key can perturbate"""
return f"figure_{random.randint(0,100)}"
@pytest.fixture
def connected_client_gui_obj(gui_id, bec_client_lib):
"""
Fixture to create a new BECGuiClient object and start a server in the background.
This fixture should be used if a new gui instance is needed for each test.
"""
gui = BECGuiClient(gui_id=gui_id)
try:
gui.start(wait=True)
# gui._start_server(wait=True)
yield gui
finally:
gui.kill_server()
@pytest.fixture
def connected_client_dock(gui_id, bec_client_lib):
@pytest.fixture(scope="session")
def connected_gui_with_scope_session(gui_id, bec_client_lib):
"""
Fixture to create a new BECGuiClient object and start a server in the background.
This fixture is scoped to the session, meaning it remains alive for all tests in the session.
We can use this fixture to create a gui object that is used across multiple tests, and
simulate a real-world scenario where the gui is not restarted for each test.
"""
gui = BECGuiClient(gui_id=gui_id)
gui._auto_updates_enabled = False
try:
gui.start(wait=True)
gui.window_list[0]
yield gui.window_list[0]
finally:
gui.kill_server()
@pytest.fixture
def connected_client_dock_w_auto_updates(gui_id, bec_client_lib):
gui = BECGuiClient(gui_id=gui_id)
try:
gui._start_server(wait=True)
yield gui, gui.window_list[0]
yield gui
finally:
gui.kill_server()

View File

@ -1,378 +1,284 @@
# import time
# import numpy as np
# import pytest
# from bec_lib.endpoints import MessageEndpoints
# from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
# from bec_widgets.tests.utils import check_remote_data_size
# from bec_widgets.utils import Colors
# # pylint: disable=unused-argument
# # pylint: disable=redefined-outer-name
# # pylint: disable=too-many-locals
# def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_dock):
# # BEC client shortcuts
# dock = connected_client_dock
# client = bec_client_lib
# dev = client.device_manager.devices
# scans = client.scans
# queue = client.queue
# # Create 3 docks
# d0 = dock.add_dock("dock_0")
# d1 = dock.add_dock("dock_1")
# d2 = dock.add_dock("dock_2")
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# # Add 3 figures with some widgets
# fig0 = d0.add_widget("BECFigure")
# fig1 = d1.add_widget("BECFigure")
# fig2 = d2.add_widget("BECFigure")
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# assert len(dock_config["docks"]["dock_0"]["widgets"]) == 1
# assert len(dock_config["docks"]["dock_1"]["widgets"]) == 1
# assert len(dock_config["docks"]["dock_2"]["widgets"]) == 1
# assert fig1.__class__.__name__ == "BECFigure"
# assert fig1.__class__ == BECFigure
# assert fig2.__class__.__name__ == "BECFigure"
# assert fig2.__class__ == BECFigure
# mm = fig0.motor_map("samx", "samy")
# plt = fig1.plot(x_name="samx", y_name="bpm4i")
# im = fig2.image("eiger")
# assert mm.__class__.__name__ == "BECMotorMap"
# assert mm.__class__ == BECMotorMap
# assert plt.__class__.__name__ == "BECWaveform"
# assert plt.__class__ == BECWaveform
# assert im.__class__.__name__ == "BECImageShow"
# assert im.__class__ == BECImageShow
# assert mm._config_dict["signals"] == {
# "dap": None,
# "source": "device_readback",
# "x": {
# "name": "samx",
# "entry": "samx",
# "unit": None,
# "modifier": None,
# "limits": [-50.0, 50.0],
# },
# "y": {
# "name": "samy",
# "entry": "samy",
# "unit": None,
# "modifier": None,
# "limits": [-50.0, 50.0],
# },
# "z": None,
# }
# assert plt._config_dict["curves"]["bpm4i-bpm4i"]["signals"] == {
# "dap": None,
# "source": "scan_segment",
# "x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
# "y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
# "z": None,
# }
# assert im._config_dict["images"]["eiger"]["monitor"] == "eiger"
# # check initial position of motor map
# initial_pos_x = dev.samx.read()["samx"]["value"]
# initial_pos_y = dev.samy.read()["samy"]["value"]
# # Try to make a scan
# status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# status.wait()
# # plot
# item = queue.scan_storage.storage[-1]
# plt_last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
# num_elements = 10
# plot_name = "bpm4i-bpm4i"
# qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
# plt_data = plt.get_all_data()
# assert plt_data["bpm4i-bpm4i"]["x"] == plt_last_scan_data["samx"]["samx"].val
# assert plt_data["bpm4i-bpm4i"]["y"] == plt_last_scan_data["bpm4i"]["bpm4i"].val
# # image
# last_image_device = client.connector.get_last(MessageEndpoints.device_monitor_2d("eiger"))[
# "data"
# ].data
# time.sleep(0.5)
# last_image_plot = im.images[0].get_data()
# np.testing.assert_equal(last_image_device, last_image_plot)
# # motor map
# final_pos_x = dev.samx.read()["samx"]["value"]
# final_pos_y = dev.samy.read()["samy"]["value"]
# # check final coordinates of motor map
# motor_map_data = mm.get_data()
# np.testing.assert_equal(
# [motor_map_data["x"][0], motor_map_data["y"][0]], [initial_pos_x, initial_pos_y]
# )
# np.testing.assert_equal(
# [motor_map_data["x"][-1], motor_map_data["y"][-1]], [final_pos_x, final_pos_y]
# )
# def test_dock_manipulations_e2e(connected_client_dock):
# dock = connected_client_dock
# d0 = dock.add_dock("dock_0")
# d1 = dock.add_dock("dock_1")
# d2 = dock.add_dock("dock_2")
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# d0.detach()
# dock.detach_dock("dock_2")
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# assert len(dock.temp_areas) == 2
# d0.attach()
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# assert len(dock.temp_areas) == 1
# d2.remove()
# dock_config = dock._config_dict
# assert ["dock_0", "dock_1"] == list(dock_config["docks"])
# dock.clear_all()
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 0
# assert len(dock.temp_areas) == 0
# def test_ring_bar(connected_client_dock):
# dock = connected_client_dock
# d0 = dock.add_dock(name="dock_0")
# bar = d0.add_widget("RingProgressBar")
# assert bar.__class__.__name__ == "RingProgressBar"
import time
import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.cli.rpc.rpc_base import RPCReference
from bec_widgets.tests.utils import check_remote_data_size
from bec_widgets.utils import Colors
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-locals
# pylint: disable=protected-access
def test_gui_rpc_registry(qtbot, connected_client_gui_obj):
gui = connected_client_gui_obj
dock_area = gui.new("cool_dock_area")
def check_dock_area_registered():
return dock_area._gui_id in gui._registry_state
qtbot.waitUntil(check_dock_area_registered, timeout=5000)
assert hasattr(gui, "cool_dock_area")
dock = dock_area.new("dock_0")
def check_dock_registered():
dock_dict = (
gui._registry_state.get(dock_area._gui_id, {}).get("config", {}).get("docks", {})
)
return len(dock_dict) == 1
qtbot.waitUntil(check_dock_registered, timeout=5000)
assert hasattr(gui.cool_dock_area, "dock_0")
# assert hasattr(dock_area, "dock_0")
def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gui_obj):
gui = connected_client_gui_obj
# BEC client shortcuts
dock = gui.bec
client = bec_client_lib
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
# Create 3 docks
d0 = dock.new("dock_0")
d1 = dock.new("dock_1")
d2 = dock.new("dock_2")
# Check that callback for dock_registry is done
def check_docks_registered():
dock_register = dock._parent._registry_state.get(dock._gui_id, None)
if dock_register is not None:
n_docks = dock_register.get("config", {}).get("docks", {})
return len(n_docks) == 3
return False
# raise AssertionError("Docks not registered yet")
# bar.set_number_of_bars(5)
# bar.set_colors_from_map("viridis")
# bar.set_value([10, 20, 30, 40, 50])
# Waii until docks are registered
qtbot.waitUntil(check_docks_registered, timeout=5000)
qtbot.wait(500)
assert len(dock.panels) == 3
assert hasattr(gui.bec, "dock_0")
# bar_config = bar._config_dict
# Add 3 figures with some widgets
fig0 = d0.new("BECFigure")
fig1 = d1.new("BECFigure")
fig2 = d2.new("BECFigure")
# expected_colors_light = [
# list(color) for color in Colors.golden_angle_color("viridis", 5, "RGB", theme="light")
# ]
# expected_colors_dark = [
# list(color) for color in Colors.golden_angle_color("viridis", 5, "RGB", theme="dark")
# ]
# bar_colors = [ring._config_dict["color"] for ring in bar.rings]
# bar_values = [ring._config_dict["value"] for ring in bar.rings]
# assert bar_config["num_bars"] == 5
# assert bar_values == [10, 20, 30, 40, 50]
# assert bar_colors == expected_colors_light or bar_colors == expected_colors_dark
def check_fig2_registered():
# return hasattr(d2, "BECFigure_2")
dock_config = dock._parent._registry_state[dock._gui_id]["config"]["docks"].get(
d2.widget_name, {}
)
if dock_config:
n_widgets = dock_config.get("widgets", {})
if any(widget_name.startswith("BECFigure") for widget_name in n_widgets.keys()):
return True
raise AssertionError("Figure not registered yet")
qtbot.waitUntil(check_fig2_registered, timeout=5000)
# def test_ring_bar_scan_update(bec_client_lib, connected_client_dock):
# dock = connected_client_dock
assert len(d0.element_list) == 1
assert len(d1.element_list) == 1
assert len(d2.element_list) == 1
# d0 = dock.add_dock("dock_0")
assert fig1.__class__.__name__ == "RPCReference"
assert fig1.__class__ == RPCReference
assert gui._ipython_registry[fig1._gui_id].__class__ == BECFigure
assert fig2.__class__.__name__ == "RPCReference"
assert fig2.__class__ == RPCReference
assert gui._ipython_registry[fig2._gui_id].__class__ == BECFigure
# bar = d0.add_widget("RingProgressBar")
mm = fig0.motor_map("samx", "samy")
plt = fig1.plot(x_name="samx", y_name="bpm4i")
im = fig2.image("eiger")
# client = bec_client_lib
# dev = client.device_manager.devices
# dev.samx.tolerance.set(0)
# dev.samy.tolerance.set(0)
# scans = client.scans
assert mm.__class__.__name__ == "RPCReference"
assert mm.__class__ == RPCReference
assert plt.__class__.__name__ == "RPCReference"
assert plt.__class__ == RPCReference
assert im.__class__.__name__ == "RPCReference"
assert im.__class__ == RPCReference
# status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# status.wait()
# bar_config = bar._config_dict
# assert bar_config["num_bars"] == 1
# assert bar_config["rings"][0]["value"] == 10
# assert bar_config["rings"][0]["min_value"] == 0
# assert bar_config["rings"][0]["max_value"] == 10
def test_dock_manipulations_e2e(qtbot, connected_client_gui_obj):
gui = connected_client_gui_obj
dock = gui.bec
# status = scans.grid_scan(dev.samx, -5, 5, 4, dev.samy, -10, 10, 4, relative=True, exp_time=0.1)
# status.wait()
d0 = dock.new("dock_0")
d1 = dock.new("dock_1")
d2 = dock.new("dock_2")
# bar_config = bar._config_dict
# assert bar_config["num_bars"] == 1
# assert bar_config["rings"][0]["value"] == 16
# assert bar_config["rings"][0]["min_value"] == 0
# assert bar_config["rings"][0]["max_value"] == 16
assert hasattr(gui.bec, "dock_0")
assert hasattr(gui.bec, "dock_1")
assert hasattr(gui.bec, "dock_2")
assert len(gui.bec.panels) == 3
# init_samx = dev.samx.read()["samx"]["value"]
# init_samy = dev.samy.read()["samy"]["value"]
# final_samx = init_samx + 5
# final_samy = init_samy + 10
d0.detach()
dock.detach_dock("dock_2")
# How can we properly check that the dock is detached?
assert len(gui.bec.panels) == 3
d0.attach()
assert len(gui.bec.panels) == 3
# dev.samx.velocity.put(5)
# dev.samy.velocity.put(5)
# status = scans.umv(dev.samx, 5, dev.samy, 10, relative=True)
# status.wait()
# bar_config = bar._config_dict
# assert bar_config["num_bars"] == 2
# assert bar_config["rings"][0]["value"] == final_samx
# assert bar_config["rings"][1]["value"] == final_samy
# assert bar_config["rings"][0]["min_value"] == init_samx
# assert bar_config["rings"][0]["max_value"] == final_samx
# assert bar_config["rings"][1]["min_value"] == init_samy
# assert bar_config["rings"][1]["max_value"] == final_samy
# def test_auto_update(bec_client_lib, connected_client_dock_w_auto_updates, qtbot):
# client = bec_client_lib
# dev = client.device_manager.devices
# scans = client.scans
# queue = client.queue
# gui, dock = connected_client_dock_w_auto_updates
# auto_updates = gui.auto_updates
# def get_default_figure():
# return auto_updates.get_default_figure()
# plt = get_default_figure()
# gui.selected_device = "bpm4i"
# status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# status.wait()
# # get data from curves
# widgets = plt.widget_list
# qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
# item = queue.scan_storage.storage[-1]
# last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
# num_elements = 10
# plot_name = f"Scan {status.scan.scan_number} - {dock.selected_device}"
# qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements))
# plt_data = widgets[0].get_all_data()
# # check plotted data
# assert (
# plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["x"]
# == last_scan_data["samx"]["samx"].val
# )
# assert (
# plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["y"]
# == last_scan_data["bpm4i"]["bpm4i"].val
# )
# status = scans.grid_scan(
# dev.samx, -10, 10, 5, dev.samy, -5, 5, 5, exp_time=0.05, relative=False
# )
# status.wait()
# plt = auto_updates.get_default_figure()
# widgets = plt.widget_list
# qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
# item = queue.scan_storage.storage[-1]
# last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
# plot_name = f"Scan {status.scan.scan_number} - bpm4i"
# num_elements_bec = 25
# qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements_bec))
# plt_data = widgets[0].get_all_data()
# # check plotted data
# assert (
# plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["x"]
# == last_scan_data["samx"]["samx"].val
# )
# assert (
# plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["y"]
# == last_scan_data["samy"]["samy"].val
# )
# def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
# gui = connected_client_gui_obj
# assert gui.selected_device is None
# assert len(gui.windows) == 1
# assert gui.windows["main"].widget is gui.main
# assert gui.windows["main"].title == "BEC Widgets"
# mw = gui.main
# assert mw.__class__.__name__ == "BECDockArea"
# xw = gui.new("X")
# assert xw.__class__.__name__ == "BECDockArea"
# assert len(gui.windows) == 2
# gui_info = gui._dump()
# mw_info = gui_info[mw._gui_id]
# assert mw_info["title"] == "BEC Widgets"
# assert mw_info["visible"]
# xw_info = gui_info[xw._gui_id]
# assert xw_info["title"] == "X"
# assert xw_info["visible"]
# gui.hide()
# gui_info = gui._dump()
# assert not any(windows["visible"] for windows in gui_info.values())
# gui.show()
# gui_info = gui._dump()
# assert all(windows["visible"] for windows in gui_info.values())
# assert gui.gui_is_alive()
# gui.close()
# assert not gui.gui_is_alive()
# gui.start_server(wait=True)
# assert gui.gui_is_alive()
# # calling start multiple times should not change anything
# gui.start_server(wait=True)
# gui.start()
# # gui.windows should have main, and main dock area should have same gui_id as before
# assert len(gui.windows) == 1
# assert gui.windows["main"].widget._gui_id == mw._gui_id
# # communication should work, main dock area should have same id and be visible
# gui_info = gui._dump()
# assert gui_info[mw._gui_id]["visible"]
# with pytest.raises(RuntimeError):
# gui.main.delete()
# yw = gui.new("Y")
# assert len(gui.windows) == 2
# yw.delete()
# assert len(gui.windows) == 1
# # check it is really deleted on server
# gui_info = gui._dump()
# assert yw._gui_id not in gui_info
# def test_rpc_call_with_exception_in_safeslot_error_popup(connected_client_gui_obj, qtbot):
# gui = connected_client_gui_obj
# gui.main.add_dock("test")
# qtbot.waitUntil(lambda: len(gui.main.panels) == 2) # default_figure + test
# qtbot.wait(500)
# with pytest.raises(ValueError):
# gui.main.add_dock("test")
# # time.sleep(0.1)
def wait_for_dock_removed():
dock_config = gui._registry_state[gui.bec._gui_id]["config"]["docks"]
return len(dock_config.keys()) == 2
d2.remove()
qtbot.waitUntil(wait_for_dock_removed, timeout=5000)
assert len(gui.bec.panels) == 2
def wait_for_docks_removed():
dock_config = gui._registry_state[gui.bec._gui_id]["config"]["docks"]
return len(dock_config.keys()) == 0
dock.clear_all()
qtbot.waitUntil(wait_for_docks_removed, timeout=5000)
assert len(gui.bec.panels) == 0
def test_ring_bar(connected_client_dock):
dock = connected_client_dock
d0 = dock.add_dock(name="dock_0")
bar = d0.add_widget("RingProgressBar")
assert bar.__class__.__name__ == "RingProgressBar"
plt = get_default_figure()
gui.selected_device = "bpm4i"
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
# get data from curves
widgets = plt.widget_list
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
num_elements = 10
plot_name = f"Scan {status.scan.scan_number} - {dock.selected_device}"
qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements))
plt_data = widgets[0].get_all_data()
# check plotted data
assert (
plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["x"]
== last_scan_data["samx"]["samx"].val
)
assert (
plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["y"]
== last_scan_data["bpm4i"]["bpm4i"].val
)
status = scans.grid_scan(
dev.samx, -10, 10, 5, dev.samy, -5, 5, 5, exp_time=0.05, relative=False
)
status.wait()
plt = auto_updates.get_default_figure()
widgets = plt.widget_list
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
plot_name = f"Scan {status.scan.scan_number} - bpm4i"
num_elements_bec = 25
qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements_bec))
plt_data = widgets[0].get_all_data()
# check plotted data
assert (
plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["x"]
== last_scan_data["samx"]["samx"].val
)
assert (
plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["y"]
== last_scan_data["samy"]["samy"].val
)
def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
gui = connected_client_gui_obj
assert gui.selected_device is None
assert len(gui.windows) == 1
assert gui.windows["bec"] is gui.bec
mw = gui.bec
assert mw.__class__.__name__ == "BECDockArea"
xw = gui.new("X")
assert xw.__class__.__name__ == "BECDockArea"
assert len(gui.windows) == 2
gui_info = gui._dump()
mw_info = gui_info[mw._gui_id]
assert mw_info["title"] == "BEC"
assert mw_info["visible"]
xw_info = gui_info[xw._gui_id]
assert xw_info["title"] == "BEC - X"
assert xw_info["visible"]
gui.hide()
gui_info = gui._dump()
assert not any(windows["visible"] for windows in gui_info.values())
gui.show()
gui_info = gui._dump()
assert all(windows["visible"] for windows in gui_info.values())
assert gui._gui_is_alive()
gui._close()
assert not gui._gui_is_alive()
gui._start_server(wait=True)
assert gui._gui_is_alive()
# calling start multiple times should not change anything
gui._start_server(wait=True)
gui._start()
# gui.windows should have bec with gui_id 'bec'
assert len(gui.windows) == 1
assert gui.windows["bec"]._gui_id == mw._gui_id
# communication should work, main dock area should have same id and be visible
gui_info = gui._dump()
assert gui_info[mw._gui_id]["visible"]
with pytest.raises(RuntimeError):
gui.bec.delete()
yw = gui.new("Y")
assert len(gui.windows) == 2
yw.delete()
assert len(gui.windows) == 1
# check it is really deleted on server
gui_info = gui._dump()
assert yw._gui_id not in gui_info
def test_rpc_call_with_exception_in_safeslot_error_popup(connected_client_gui_obj, qtbot):
gui = connected_client_gui_obj
gui.main.add_dock("test")
qtbot.waitUntil(lambda: len(gui.main.panels) == 2) # default_figure + test
qtbot.wait(500)
with pytest.raises(ValueError):
gui.bec.add_dock("test")
# time.sleep(0.1)

View File

@ -5,8 +5,11 @@ import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.cli.rpc.rpc_base import RPCReference
from bec_widgets.tests.utils import check_remote_data_size
# pylint: disable=protected-access
@pytest.fixture
def connected_figure(connected_client_gui_obj):
@ -39,12 +42,15 @@ def test_rpc_plotting_shortcuts_init_configs(connected_figure, qtbot):
# Checking if classes are correctly initialised
assert len(fig.widgets) == 4
assert plt.__class__.__name__ == "BECWaveform"
assert plt.__class__ == BECWaveform
assert im.__class__.__name__ == "BECImageShow"
assert im.__class__ == BECImageShow
assert motor_map.__class__.__name__ == "BECMotorMap"
assert motor_map.__class__ == BECMotorMap
assert plt.__class__.__name__ == "RPCReference"
assert plt.__class__ == RPCReference
assert plt._root._ipython_registry[plt._gui_id].__class__ == BECWaveform
assert im.__class__.__name__ == "RPCReference"
assert im.__class__ == RPCReference
assert im._root._ipython_registry[im._gui_id].__class__ == BECImageShow
assert motor_map.__class__.__name__ == "RPCReference"
assert motor_map.__class__ == RPCReference
assert motor_map._root._ipython_registry[motor_map._gui_id].__class__ == BECMotorMap
# check if the correct devices are set
# plot
@ -215,10 +221,11 @@ def test_dap_rpc(connected_figure, bec_client_lib, qtbot):
def test_removing_subplots(connected_figure, bec_client_lib):
fig = connected_figure
plt = fig.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
im = fig.image(monitor="eiger")
mm = fig.motor_map(motor_x="samx", motor_y="samy")
# Registry can't handle multiple subplots on one widget, BECFigure will be deprecated though
# im = fig.image(monitor="eiger")
# mm = fig.motor_map(motor_x="samx", motor_y="samy")
assert len(fig.widget_list) == 3
assert len(fig.widget_list) == 1
# removing curves
assert len(plt.curves) == 2
@ -229,7 +236,7 @@ def test_removing_subplots(connected_figure, bec_client_lib):
# removing all subplots from figure
plt.remove()
im.remove()
mm.remove()
# im.remove()
# mm.remove()
assert len(fig.widget_list) == 0