mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-13 19:21:50 +02:00
refactor: BECGuiClientMixin -> BECGuiClient
- Mixin class was only used with BECDockArea, now it is a class by itself which represents the client object connected to the GUI server ; ".main" is the dock area of the main window - Enhanced "wait_for_server" - ".selected_device" is stored in Redis, to allow server-side to know about the auto update configuration instead of keeping it on client
This commit is contained in:
@ -5,7 +5,7 @@ from __future__ import annotations
|
||||
import enum
|
||||
from typing import Literal, Optional, overload
|
||||
|
||||
from bec_widgets.cli.client_utils import BECGuiClientMixin, RPCBase, rpc_call
|
||||
from bec_widgets.cli.client_utils import RPCBase, rpc_call
|
||||
|
||||
# pylint: skip-file
|
||||
|
||||
@ -342,7 +342,7 @@ class BECDock(RPCBase):
|
||||
"""
|
||||
|
||||
|
||||
class BECDockArea(RPCBase, BECGuiClientMixin):
|
||||
class BECDockArea(RPCBase):
|
||||
@property
|
||||
@rpc_call
|
||||
def _config_dict(self) -> "dict":
|
||||
@ -353,6 +353,13 @@ class BECDockArea(RPCBase, BECGuiClientMixin):
|
||||
dict: The configuration of the widget.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def selected_device(self) -> "str":
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def panels(self) -> "dict[str, BECDock]":
|
||||
|
@ -9,6 +9,7 @@ import subprocess
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from contextlib import contextmanager
|
||||
from functools import wraps
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
@ -132,151 +133,6 @@ class RepeatTimer(threading.Timer):
|
||||
self.function(*self.args, **self.kwargs)
|
||||
|
||||
|
||||
class BECGuiClientMixin:
|
||||
def __init__(self, **kwargs) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self._auto_updates_enabled = True
|
||||
self._auto_updates = None
|
||||
self._gui_started_timer = None
|
||||
self._gui_started_event = threading.Event()
|
||||
self._process = None
|
||||
self._process_output_processing_thread = None
|
||||
self._target_endpoint = MessageEndpoints.scan_status()
|
||||
self._selected_device = None
|
||||
|
||||
@property
|
||||
def auto_updates(self):
|
||||
if self._auto_updates_enabled:
|
||||
self._gui_started_event.wait()
|
||||
return self._auto_updates
|
||||
|
||||
def shutdown_auto_updates(self):
|
||||
if self._auto_updates_enabled:
|
||||
if self._auto_updates is not None:
|
||||
self._auto_updates.shutdown()
|
||||
self._auto_updates = None
|
||||
|
||||
def _get_update_script(self) -> AutoUpdates | None:
|
||||
eps = imd.entry_points(group="bec.widgets.auto_updates")
|
||||
for ep in eps:
|
||||
if ep.name == "plugin_widgets_update":
|
||||
try:
|
||||
spec = importlib.util.find_spec(ep.module)
|
||||
# if the module is not found, we skip it
|
||||
if spec is None:
|
||||
continue
|
||||
return ep.load()(gui=self)
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading auto update script from plugin: {str(e)}")
|
||||
return None
|
||||
|
||||
@property
|
||||
def selected_device(self):
|
||||
"""
|
||||
Selected device for the plot.
|
||||
"""
|
||||
return self._selected_device
|
||||
|
||||
@selected_device.setter
|
||||
def selected_device(self, device: str | DeviceBase):
|
||||
if isinstance_based_on_class_name(device, "bec_lib.device.DeviceBase"):
|
||||
self._selected_device = device.name
|
||||
elif isinstance(device, str):
|
||||
self._selected_device = device
|
||||
else:
|
||||
raise ValueError("Device must be a string or a device object")
|
||||
|
||||
def _start_update_script(self) -> None:
|
||||
self._client.connector.register(
|
||||
self._target_endpoint, cb=self._handle_msg_update, parent=self
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _handle_msg_update(msg: MessageObject, parent: BECGuiClientMixin) -> None:
|
||||
if parent.auto_updates is not None:
|
||||
# pylint: disable=protected-access
|
||||
parent._update_script_msg_parser(msg.value)
|
||||
|
||||
def _update_script_msg_parser(self, msg: messages.BECMessage) -> None:
|
||||
if isinstance(msg, messages.ScanStatusMessage):
|
||||
if not self.gui_is_alive():
|
||||
return
|
||||
if self._auto_updates_enabled:
|
||||
self.auto_updates.msg_queue.put(msg)
|
||||
|
||||
def _gui_post_startup(self):
|
||||
if self._auto_updates_enabled:
|
||||
if self._auto_updates is None:
|
||||
auto_updates = self._get_update_script()
|
||||
if auto_updates is None:
|
||||
AutoUpdates.create_default_dock = True
|
||||
AutoUpdates.enabled = True
|
||||
auto_updates = AutoUpdates(gui=self)
|
||||
if auto_updates.create_default_dock:
|
||||
auto_updates.start_default_dock()
|
||||
# fig = auto_updates.get_default_figure()
|
||||
self._auto_updates = auto_updates
|
||||
self._gui_started_event.set()
|
||||
self.show_all()
|
||||
|
||||
def start_server(self, wait=False) -> None:
|
||||
"""
|
||||
Start the GUI server, and execute callback when it is launched
|
||||
"""
|
||||
if self._process is None or self._process.poll() is not None:
|
||||
logger.success("GUI starting...")
|
||||
self._gui_started_event.clear()
|
||||
self._start_update_script()
|
||||
self._process, self._process_output_processing_thread = _start_plot_process(
|
||||
self._gui_id, self.__class__, self._client._service_config.config, logger=logger
|
||||
)
|
||||
|
||||
def gui_started_callback(callback):
|
||||
try:
|
||||
if callable(callback):
|
||||
callback()
|
||||
finally:
|
||||
threading.current_thread().cancel()
|
||||
|
||||
self._gui_started_timer = RepeatTimer(
|
||||
1, lambda: self.gui_is_alive() and gui_started_callback(self._gui_post_startup)
|
||||
)
|
||||
self._gui_started_timer.start()
|
||||
|
||||
if wait:
|
||||
self._gui_started_event.wait()
|
||||
|
||||
def show_all(self):
|
||||
self._gui_started_event.wait()
|
||||
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
|
||||
rpc_client._run_rpc("show")
|
||||
|
||||
def hide_all(self):
|
||||
self._gui_started_event.wait()
|
||||
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
|
||||
rpc_client._run_rpc("hide")
|
||||
|
||||
def close(self) -> None:
|
||||
"""
|
||||
Close the gui window.
|
||||
"""
|
||||
if self._gui_started_timer is not None:
|
||||
self._gui_started_timer.cancel()
|
||||
self._gui_started_timer.join()
|
||||
|
||||
if self._process is None:
|
||||
return
|
||||
|
||||
if self._process:
|
||||
logger.success("Stopping GUI...")
|
||||
self._process.terminate()
|
||||
if self._process_output_processing_thread:
|
||||
self._process_output_processing_thread.join()
|
||||
self._process.wait()
|
||||
self._process = None
|
||||
self.shutdown_auto_updates()
|
||||
|
||||
|
||||
class RPCResponseTimeoutError(Exception):
|
||||
"""Exception raised when an RPC response is not received within the expected time."""
|
||||
|
||||
@ -401,3 +257,209 @@ class RPCBase:
|
||||
if heart.status == messages.BECStatus.RUNNING:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class RepeatTimer(threading.Timer):
|
||||
def run(self):
|
||||
while not self.finished.wait(self.interval):
|
||||
self.function(*self.args, **self.kwargs)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def wait_for_server(client):
|
||||
timeout = client._startup_timeout
|
||||
if not timeout:
|
||||
if client.gui_is_alive():
|
||||
# there is hope, let's wait a bit
|
||||
timeout = 1
|
||||
else:
|
||||
raise RuntimeError("GUI is not alive")
|
||||
try:
|
||||
if client._gui_started_event.wait(timeout=timeout):
|
||||
client._gui_started_timer.cancel()
|
||||
client._gui_started_timer.join()
|
||||
else:
|
||||
raise TimeoutError("Could not connect to GUI server")
|
||||
finally:
|
||||
# after initial waiting period, do not wait so much any more
|
||||
# (only relevant if GUI didn't start)
|
||||
client._startup_timeout = 0
|
||||
yield
|
||||
|
||||
|
||||
class BECGuiClient(RPCBase):
|
||||
def __init__(self, **kwargs) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self._auto_updates_enabled = True
|
||||
self._auto_updates = None
|
||||
self._startup_timeout = 0
|
||||
self._gui_started_timer = None
|
||||
self._gui_started_event = threading.Event()
|
||||
self._process = None
|
||||
self._process_output_processing_thread = None
|
||||
self._target_endpoint = MessageEndpoints.scan_status()
|
||||
|
||||
@property
|
||||
def auto_updates(self):
|
||||
if self._auto_updates_enabled:
|
||||
with wait_for_server(self):
|
||||
return self._auto_updates
|
||||
|
||||
def _shutdown_auto_updates(self):
|
||||
if self._auto_updates_enabled:
|
||||
if self._auto_updates is not None:
|
||||
self._auto_updates.shutdown()
|
||||
self._auto_updates = None
|
||||
|
||||
def _get_update_script(self) -> AutoUpdates | None:
|
||||
eps = imd.entry_points(group="bec.widgets.auto_updates")
|
||||
for ep in eps:
|
||||
if ep.name == "plugin_widgets_update":
|
||||
try:
|
||||
spec = importlib.util.find_spec(ep.module)
|
||||
# if the module is not found, we skip it
|
||||
if spec is None:
|
||||
continue
|
||||
return ep.load()(gui=self)
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading auto update script from plugin: {str(e)}")
|
||||
return None
|
||||
|
||||
@property
|
||||
def selected_device(self):
|
||||
"""
|
||||
Selected device for the plot.
|
||||
"""
|
||||
auto_update_config_ep = MessageEndpoints.gui_auto_update_config(self._gui_id)
|
||||
auto_update_config = self._client.connector.get(auto_update_config_ep)
|
||||
if auto_update_config:
|
||||
return auto_update_config.selected_device
|
||||
return None
|
||||
|
||||
@selected_device.setter
|
||||
def selected_device(self, device: str | DeviceBase):
|
||||
if isinstance_based_on_class_name(device, "bec_lib.device.DeviceBase"):
|
||||
self._client.connector.set_and_publish(
|
||||
MessageEndpoints.gui_auto_update_config(self._gui_id),
|
||||
messages.GUIAutoUpdateConfigMessage(selected_device=device.name),
|
||||
)
|
||||
elif isinstance(device, str):
|
||||
self._client.connector.set_and_publish(
|
||||
MessageEndpoints.gui_auto_update_config(self._gui_id),
|
||||
messages.GUIAutoUpdateConfigMessage(selected_device=device),
|
||||
)
|
||||
else:
|
||||
raise ValueError("Device must be a string or a device object")
|
||||
|
||||
def _start_update_script(self) -> None:
|
||||
self._client.connector.register(
|
||||
self._target_endpoint, cb=self._handle_msg_update, parent=self
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _handle_msg_update(msg: MessageObject, parent: BECGuiClient) -> None:
|
||||
if parent.auto_updates is not None:
|
||||
# pylint: disable=protected-access
|
||||
parent._update_script_msg_parser(msg.value)
|
||||
|
||||
def _update_script_msg_parser(self, msg: messages.BECMessage) -> None:
|
||||
if isinstance(msg, messages.ScanStatusMessage):
|
||||
if not self.gui_is_alive():
|
||||
return
|
||||
if self._auto_updates_enabled:
|
||||
self.auto_updates.msg_queue.put(msg)
|
||||
|
||||
def _gui_post_startup(self):
|
||||
self._gui_started_event.set()
|
||||
if self._auto_updates_enabled:
|
||||
if self._auto_updates is None:
|
||||
auto_updates = self._get_update_script()
|
||||
if auto_updates is None:
|
||||
AutoUpdates.create_default_dock = True
|
||||
AutoUpdates.enabled = True
|
||||
auto_updates = AutoUpdates(gui=client.BECDockArea(gui_id=self._gui_id))
|
||||
if auto_updates.create_default_dock:
|
||||
auto_updates.start_default_dock()
|
||||
self._auto_updates = auto_updates
|
||||
self._do_show_all()
|
||||
|
||||
def start_server(self, wait=False) -> None:
|
||||
"""
|
||||
Start the GUI server, and execute callback when it is launched
|
||||
"""
|
||||
if self._process is None or self._process.poll() is not None:
|
||||
logger.success("GUI starting...")
|
||||
self._startup_timeout = 5
|
||||
self._gui_started_event.clear()
|
||||
self._start_update_script()
|
||||
self._process, self._process_output_processing_thread = _start_plot_process(
|
||||
self._gui_id, self.__class__, self._client._service_config.config, logger=logger
|
||||
)
|
||||
|
||||
def gui_started_callback(callback):
|
||||
try:
|
||||
if callable(callback):
|
||||
callback()
|
||||
finally:
|
||||
threading.current_thread().cancel()
|
||||
|
||||
self._gui_started_timer = RepeatTimer(
|
||||
0.5, lambda: self.gui_is_alive() and gui_started_callback(self._gui_post_startup)
|
||||
)
|
||||
self._gui_started_timer.start()
|
||||
|
||||
if wait:
|
||||
self._gui_started_event.wait()
|
||||
|
||||
|
||||
def start(self):
|
||||
return self.start_server()
|
||||
|
||||
def _do_show_all(self):
|
||||
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
|
||||
rpc_client._run_rpc("show")
|
||||
|
||||
def show_all(self):
|
||||
with wait_for_server(self):
|
||||
return self._do_show_all()
|
||||
|
||||
def hide_all(self):
|
||||
with wait_for_server(self):
|
||||
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
|
||||
rpc_client._run_rpc("hide")
|
||||
|
||||
def show(self):
|
||||
if self._process is not None:
|
||||
return self.show_all()
|
||||
else:
|
||||
# backward compatibility: show() was also starting server
|
||||
return self.start_server(wait=True)
|
||||
|
||||
def hide(self):
|
||||
return self.hide_all()
|
||||
|
||||
@property
|
||||
def main(self):
|
||||
"""Return client to main dock area (in main window)"""
|
||||
with wait_for_server(self):
|
||||
return client.BECDockArea(gui_id=self._gui_id)
|
||||
|
||||
def close(self) -> None:
|
||||
"""
|
||||
Close the gui window.
|
||||
"""
|
||||
if self._gui_started_timer is not None:
|
||||
self._gui_started_timer.cancel()
|
||||
self._gui_started_timer.join()
|
||||
|
||||
if self._process is None:
|
||||
return
|
||||
|
||||
if self._process:
|
||||
logger.success("Stopping GUI...")
|
||||
self._process.terminate()
|
||||
if self._process_output_processing_thread:
|
||||
self._process_output_processing_thread.join()
|
||||
self._process.wait()
|
||||
self._process = None
|
||||
self._shutdown_auto_updates()
|
||||
|
@ -35,7 +35,7 @@ from __future__ import annotations
|
||||
import enum
|
||||
from typing import Literal, Optional, overload
|
||||
|
||||
from bec_widgets.cli.client_utils import RPCBase, rpc_call, BECGuiClientMixin
|
||||
from bec_widgets.cli.client_utils import RPCBase, rpc_call
|
||||
|
||||
# pylint: skip-file"""
|
||||
|
||||
@ -84,7 +84,7 @@ class Widgets(str, enum.Enum):
|
||||
# Generate the content
|
||||
if cls.__name__ == "BECDockArea":
|
||||
self.content += f"""
|
||||
class {class_name}(RPCBase, BECGuiClientMixin):"""
|
||||
class {class_name}(RPCBase):"""
|
||||
else:
|
||||
self.content += f"""
|
||||
class {class_name}(RPCBase):"""
|
||||
|
@ -199,10 +199,20 @@ def main():
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.gui_class == "BECFigure":
|
||||
gui_class = BECFigure
|
||||
elif args.gui_class == "BECDockArea":
|
||||
if args.hide:
|
||||
# if we start hidden, it means we are under control of the client
|
||||
# -> set the log level to critical to not see all the messages
|
||||
# pylint: disable=protected-access
|
||||
# bec_logger._stderr_log_level = bec_logger.LOGLEVEL.CRITICAL
|
||||
bec_logger.level = bec_logger.LOGLEVEL.CRITICAL
|
||||
else:
|
||||
# verbose log
|
||||
bec_logger.level = bec_logger.LOGLEVEL.DEBUG
|
||||
|
||||
if args.gui_class == "BECDockArea":
|
||||
gui_class = BECDockArea
|
||||
elif args.gui_class == "BECFigure":
|
||||
gui_class = BECFigure
|
||||
else:
|
||||
print(
|
||||
"Please specify a valid gui_class to run. Use -h for help."
|
||||
@ -222,6 +232,8 @@ def main():
|
||||
size=QSize(48, 48),
|
||||
)
|
||||
app.setWindowIcon(icon)
|
||||
# store gui id within QApplication object, to make it available to all widgets
|
||||
app.gui_id = args.id
|
||||
|
||||
server = _start_server(args.id, gui_class, args.config)
|
||||
|
||||
|
@ -3,11 +3,12 @@ from __future__ import annotations
|
||||
from typing import Literal, Optional
|
||||
from weakref import WeakValueDictionary
|
||||
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from pydantic import Field
|
||||
from pyqtgraph.dockarea.DockArea import DockArea
|
||||
from qtpy.QtCore import Qt
|
||||
from qtpy.QtGui import QPainter, QPaintEvent
|
||||
from qtpy.QtWidgets import QSizePolicy, QVBoxLayout, QWidget
|
||||
from qtpy.QtWidgets import QApplication, QSizePolicy, QVBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.qt_utils.error_popups import SafeSlot
|
||||
from bec_widgets.qt_utils.toolbar import (
|
||||
@ -43,6 +44,7 @@ class BECDockArea(BECWidget, QWidget):
|
||||
PLUGIN = True
|
||||
USER_ACCESS = [
|
||||
"_config_dict",
|
||||
"selected_device",
|
||||
"panels",
|
||||
"save_state",
|
||||
"remove_dock",
|
||||
@ -210,6 +212,13 @@ class BECDockArea(BECWidget, QWidget):
|
||||
"Add docks using 'add_dock' method from CLI\n or \n Add widget docks using the toolbar",
|
||||
)
|
||||
|
||||
@property
|
||||
def selected_device(self) -> str:
|
||||
gui_id = QApplication.instance().gui_id
|
||||
return self.client.connector.get(
|
||||
MessageEndpoints.gui_auto_update_config(gui_id)
|
||||
).selected_device
|
||||
|
||||
@property
|
||||
def panels(self) -> dict[str, BECDock]:
|
||||
"""
|
||||
|
@ -5,8 +5,7 @@ from contextlib import contextmanager
|
||||
import pytest
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
|
||||
from bec_widgets.cli.client import BECDockArea
|
||||
from bec_widgets.cli.client_utils import _start_plot_process
|
||||
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
|
||||
from bec_widgets.utils import BECDispatcher
|
||||
from bec_widgets.widgets.containers.figure import BECFigure
|
||||
|
||||
@ -48,20 +47,20 @@ def rpc_server_figure(gui_id, bec_client_lib):
|
||||
|
||||
@pytest.fixture
|
||||
def rpc_server_dock(gui_id, bec_client_lib):
|
||||
dock_area = BECDockArea(gui_id=gui_id)
|
||||
dock_area._auto_updates_enabled = False
|
||||
gui = BECGuiClient(gui_id=gui_id)
|
||||
gui._auto_updates_enabled = False
|
||||
try:
|
||||
dock_area.start_server(wait=True)
|
||||
yield dock_area
|
||||
gui.start_server(wait=True)
|
||||
yield gui.main
|
||||
finally:
|
||||
dock_area.close()
|
||||
gui.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def rpc_server_dock_w_auto_updates(gui_id, bec_client_lib):
|
||||
dock_area = BECDockArea(gui_id=gui_id)
|
||||
gui = BECGuiClient(gui_id=gui_id)
|
||||
try:
|
||||
dock_area.start_server(wait=True)
|
||||
yield dock_area
|
||||
gui.start_server(wait=True)
|
||||
yield gui, gui.main
|
||||
finally:
|
||||
dock_area.close()
|
||||
gui.close()
|
||||
|
@ -239,14 +239,15 @@ def test_auto_update(bec_client_lib, rpc_server_dock_w_auto_updates, qtbot):
|
||||
dev = client.device_manager.devices
|
||||
scans = client.scans
|
||||
queue = client.queue
|
||||
dock = rpc_server_dock_w_auto_updates
|
||||
gui, dock = rpc_server_dock_w_auto_updates
|
||||
auto_updates = gui.auto_updates
|
||||
|
||||
def get_default_figure():
|
||||
return dock.auto_updates.get_default_figure()
|
||||
return auto_updates.get_default_figure()
|
||||
|
||||
plt = get_default_figure()
|
||||
|
||||
dock.selected_device = "bpm4i"
|
||||
gui.selected_device = "bpm4i"
|
||||
|
||||
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
|
||||
status.wait()
|
||||
@ -274,7 +275,7 @@ def test_auto_update(bec_client_lib, rpc_server_dock_w_auto_updates, qtbot):
|
||||
)
|
||||
status.wait()
|
||||
|
||||
plt = dock.auto_updates.get_default_figure()
|
||||
plt = auto_updates.get_default_figure()
|
||||
widgets = plt.widget_list
|
||||
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
|
||||
plt_data = widgets[0].get_all_data()
|
||||
|
@ -4,7 +4,7 @@ from unittest import mock
|
||||
import pytest
|
||||
|
||||
from bec_widgets.cli.client import BECFigure
|
||||
from bec_widgets.cli.client_utils import BECGuiClientMixin, _start_plot_process
|
||||
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
|
||||
from bec_widgets.tests.utils import FakeDevice
|
||||
|
||||
|
||||
@ -63,7 +63,7 @@ def test_client_utils_passes_client_config_to_server(bec_dispatcher):
|
||||
|
||||
@contextmanager
|
||||
def bec_client_mixin():
|
||||
mixin = BECGuiClientMixin()
|
||||
mixin = BECGuiClient()
|
||||
mixin._client = bec_dispatcher.client
|
||||
mixin._gui_id = "gui_id"
|
||||
mixin.gui_is_alive = mock.MagicMock()
|
||||
@ -82,6 +82,6 @@ def test_client_utils_passes_client_config_to_server(bec_dispatcher):
|
||||
wait=False
|
||||
) # the started event will not be set, wait=True would block forever
|
||||
mock_start_plot.assert_called_once_with(
|
||||
"gui_id", BECGuiClientMixin, mixin._client._service_config.config, logger=mock.ANY
|
||||
"gui_id", BECGuiClient, mixin._client._service_config.config, logger=mock.ANY
|
||||
)
|
||||
mixin._start_update_script.assert_called_once()
|
||||
|
@ -70,7 +70,7 @@ def test_client_generator_with_black_formatting():
|
||||
import enum
|
||||
from typing import Literal, Optional, overload
|
||||
|
||||
from bec_widgets.cli.client_utils import BECGuiClientMixin, RPCBase, rpc_call
|
||||
from bec_widgets.cli.client_utils import RPCBase, rpc_call
|
||||
|
||||
# pylint: skip-file
|
||||
|
||||
|
Reference in New Issue
Block a user