mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 11:41:49 +02:00
fix: fixed support for auto updates
This commit is contained in:
@ -1,8 +1 @@
|
||||
from bec_lib.utils.import_utils import lazy_import_from
|
||||
|
||||
# from .auto_updates import AutoUpdates, ScanInfo
|
||||
# TODO: put back when Pydantic gets faster
|
||||
AutoUpdates, ScanInfo = lazy_import_from(
|
||||
"bec_widgets.cli.auto_updates", ("AutoUpdates", "ScanInfo")
|
||||
)
|
||||
from .client import BECDockArea, BECFigure
|
||||
|
@ -5,7 +5,7 @@ from typing import TYPE_CHECKING
|
||||
from pydantic import BaseModel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .client import BECFigure
|
||||
from .client import BECDockArea, BECFigure
|
||||
|
||||
|
||||
class ScanInfo(BaseModel):
|
||||
@ -18,9 +18,20 @@ class ScanInfo(BaseModel):
|
||||
|
||||
|
||||
class AutoUpdates:
|
||||
def __init__(self, figure: BECFigure, enabled: bool = True):
|
||||
self.enabled = enabled
|
||||
self.figure = figure
|
||||
create_default_dock: bool = False
|
||||
enabled: bool = False
|
||||
dock_name: str = None
|
||||
|
||||
def __init__(self, gui: BECDockArea):
|
||||
self.gui = gui
|
||||
|
||||
def start_default_dock(self):
|
||||
"""
|
||||
Create a default dock for the auto updates.
|
||||
"""
|
||||
dock = self.gui.add_dock("default_figure")
|
||||
dock.add_widget_bec("BECFigure")
|
||||
self.dock_name = "default_figure"
|
||||
|
||||
@staticmethod
|
||||
def get_scan_info(msg) -> ScanInfo:
|
||||
@ -44,6 +55,18 @@ class AutoUpdates:
|
||||
status=status,
|
||||
)
|
||||
|
||||
def get_default_figure(self) -> BECFigure | None:
|
||||
"""
|
||||
Get the default figure from the GUI.
|
||||
"""
|
||||
dock = self.gui.panels.get(self.dock_name, [])
|
||||
if not dock:
|
||||
return None
|
||||
widgets = dock.widget_list
|
||||
if not widgets:
|
||||
return None
|
||||
return widgets[0]
|
||||
|
||||
def run(self, msg):
|
||||
"""
|
||||
Run the update function if enabled.
|
||||
@ -86,33 +109,42 @@ class AutoUpdates:
|
||||
"""
|
||||
Simple line scan.
|
||||
"""
|
||||
fig = self.get_default_figure()
|
||||
if not fig:
|
||||
return
|
||||
dev_x = info.scan_report_devices[0]
|
||||
dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
|
||||
dev_y = self.get_selected_device(info.monitored_devices, self.gui.selected_device)
|
||||
if not dev_y:
|
||||
return
|
||||
self.figure.clear_all()
|
||||
plt = self.figure.plot(dev_x, dev_y)
|
||||
fig.clear_all()
|
||||
plt = fig.plot(x_name=dev_x, y_name=dev_y)
|
||||
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
|
||||
|
||||
def simple_grid_scan(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Simple grid scan.
|
||||
"""
|
||||
fig = self.get_default_figure()
|
||||
if not fig:
|
||||
return
|
||||
dev_x = info.scan_report_devices[0]
|
||||
dev_y = info.scan_report_devices[1]
|
||||
dev_z = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
|
||||
self.figure.clear_all()
|
||||
plt = self.figure.plot(dev_x, dev_y, dev_z, label=f"Scan {info.scan_number}")
|
||||
dev_z = self.get_selected_device(info.monitored_devices, self.gui.selected_device)
|
||||
fig.clear_all()
|
||||
plt = fig.plot(x_name=dev_x, y_name=dev_y, z_name=dev_z, label=f"Scan {info.scan_number}")
|
||||
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
|
||||
|
||||
def best_effort(self, info: ScanInfo) -> None:
|
||||
"""
|
||||
Best effort scan.
|
||||
"""
|
||||
fig = self.get_default_figure()
|
||||
if not fig:
|
||||
return
|
||||
dev_x = info.scan_report_devices[0]
|
||||
dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
|
||||
dev_y = self.get_selected_device(info.monitored_devices, self.gui.selected_device)
|
||||
if not dev_y:
|
||||
return
|
||||
self.figure.clear_all()
|
||||
plt = self.figure.plot(dev_x, dev_y, label=f"Scan {info.scan_number}")
|
||||
fig.clear_all()
|
||||
plt = fig.plot(x_name=dev_x, y_name=dev_y, label=f"Scan {info.scan_number}")
|
||||
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
|
||||
|
@ -17,10 +17,13 @@ from bec_lib.utils.import_utils import isinstance_based_on_class_name, lazy_impo
|
||||
from qtpy.QtCore import QCoreApplication
|
||||
|
||||
import bec_widgets.cli.client as client
|
||||
from bec_widgets.cli.auto_updates import AutoUpdates
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib.device import DeviceBase
|
||||
|
||||
from bec_widgets.cli.client import BECDockArea, BECFigure
|
||||
|
||||
messages = lazy_import("bec_lib.messages")
|
||||
# from bec_lib.connector import MessageObject
|
||||
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
|
||||
@ -62,16 +65,19 @@ class BECGuiClientMixin:
|
||||
def __init__(self, **kwargs) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self._process = None
|
||||
self.update_script = self._get_update_script()
|
||||
self.auto_updates = self._get_update_script()
|
||||
self._target_endpoint = MessageEndpoints.scan_status()
|
||||
self._selected_device = None
|
||||
self.stderr_output = []
|
||||
|
||||
def _get_update_script(self) -> AutoUpdates:
|
||||
def _get_update_script(self) -> AutoUpdates | None:
|
||||
eps = imd.entry_points(group="bec.widgets.auto_updates")
|
||||
for ep in eps:
|
||||
if ep.name == "plugin_widgets_update":
|
||||
return ep.load()(figure=self)
|
||||
try:
|
||||
return ep.load()(gui=self)
|
||||
except Exception as e:
|
||||
print(f"Error loading auto update script from plugin: {str(e)}")
|
||||
return None
|
||||
|
||||
@property
|
||||
@ -97,7 +103,7 @@ class BECGuiClientMixin:
|
||||
|
||||
@staticmethod
|
||||
def _handle_msg_update(msg: MessageObject, parent: BECGuiClientMixin) -> None:
|
||||
if parent.update_script is not None:
|
||||
if parent.auto_updates is not None:
|
||||
# pylint: disable=protected-access
|
||||
parent._update_script_msg_parser(msg.value)
|
||||
|
||||
@ -105,7 +111,7 @@ class BECGuiClientMixin:
|
||||
if isinstance(msg, messages.ScanStatusMessage):
|
||||
if not self.gui_is_alive():
|
||||
return
|
||||
self.update_script.run(msg)
|
||||
self.auto_updates.run(msg)
|
||||
|
||||
def show(self) -> None:
|
||||
"""
|
||||
|
@ -1,11 +1,25 @@
|
||||
import numpy as np
|
||||
import pytest
|
||||
from bec_lib.client import BECClient
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
|
||||
from bec_widgets.cli.auto_updates import AutoUpdates
|
||||
from bec_widgets.cli.client import BECDockArea, BECFigure, BECImageShow, BECMotorMap, BECWaveform
|
||||
from bec_widgets.utils import Colors
|
||||
|
||||
|
||||
@pytest.fixture(name="bec_client")
|
||||
def cli_bec_client(rpc_server_dock):
|
||||
"""
|
||||
Fixture to create a BECClient instance that is independent of the GUI.
|
||||
"""
|
||||
# pylint: disable=protected-access
|
||||
cli_client = BECClient(forced=True, config=rpc_server_dock.client._service_config)
|
||||
cli_client.start()
|
||||
yield cli_client
|
||||
cli_client.shutdown()
|
||||
|
||||
|
||||
def test_rpc_add_dock_with_figure_e2e(rpc_server_dock, qtbot):
|
||||
dock = BECDockArea(rpc_server_dock.gui_id)
|
||||
dock_server = rpc_server_dock.gui
|
||||
@ -224,3 +238,57 @@ def test_spiral_bar_scan_update(rpc_server_dock, qtbot):
|
||||
np.testing.assert_allclose(bar_server.rings[1].config.min_value, init_samy, atol=0.1)
|
||||
np.testing.assert_allclose(bar_server.rings[0].config.max_value, final_samx, atol=0.1)
|
||||
np.testing.assert_allclose(bar_server.rings[1].config.max_value, final_samy, atol=0.1)
|
||||
|
||||
|
||||
def test_auto_update(rpc_server_dock, bec_client, qtbot):
|
||||
dock = BECDockArea(rpc_server_dock.gui_id)
|
||||
dock._client = bec_client
|
||||
|
||||
AutoUpdates.enabled = True
|
||||
AutoUpdates.create_default_dock = True
|
||||
dock.auto_updates = AutoUpdates(gui=dock)
|
||||
dock.auto_updates.start_default_dock()
|
||||
dock.selected_device = "bpm4i"
|
||||
|
||||
# we need to start the update script manually; normally this is done when the GUI is started
|
||||
dock._start_update_script()
|
||||
|
||||
client = bec_client
|
||||
dev = client.device_manager.devices
|
||||
scans = client.scans
|
||||
queue = client.queue
|
||||
|
||||
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
|
||||
|
||||
# wait for scan to finish
|
||||
while not status.status == "COMPLETED":
|
||||
qtbot.wait(200)
|
||||
|
||||
last_scan_data = queue.scan_storage.storage[-1].data
|
||||
|
||||
# get data from curves
|
||||
plt = dock.auto_updates.get_default_figure()
|
||||
widgets = plt.widget_list
|
||||
plt_data = widgets[0].get_all_data()
|
||||
|
||||
# check plotted data
|
||||
assert plt_data["bpm4i-bpm4i"]["x"] == last_scan_data["samx"]["samx"].val
|
||||
assert plt_data["bpm4i-bpm4i"]["y"] == last_scan_data["bpm4i"]["bpm4i"].val
|
||||
|
||||
status = scans.grid_scan(
|
||||
dev.samx, -10, 10, 5, dev.samy, -5, 5, 5, exp_time=0.05, relative=False
|
||||
)
|
||||
|
||||
# wait for scan to finish
|
||||
while not status.status == "COMPLETED":
|
||||
qtbot.wait(200)
|
||||
|
||||
plt = dock.auto_updates.get_default_figure()
|
||||
widgets = plt.widget_list
|
||||
plt_data = widgets[0].get_all_data()
|
||||
|
||||
last_scan_data = queue.scan_storage.storage[-1].data
|
||||
|
||||
# check plotted data
|
||||
assert plt_data[f"Scan {status.scan.scan_number}"]["x"] == last_scan_data["samx"]["samx"].val
|
||||
assert plt_data[f"Scan {status.scan.scan_number}"]["y"] == last_scan_data["samy"]["samy"].val
|
||||
|
@ -25,5 +25,5 @@ def test_rpc_call_accepts_device_as_input(cli_figure):
|
||||
dev1 = FakeDevice("samx")
|
||||
dev2 = FakeDevice("bpm4i")
|
||||
fig, mock_rpc_call = cli_figure
|
||||
fig.plot(dev1, dev2)
|
||||
mock_rpc_call.assert_called_with("plot", "samx", "bpm4i")
|
||||
fig.plot(x_name=dev1, y_name=dev2)
|
||||
mock_rpc_call.assert_called_with("plot", x_name="samx", y_name="bpm4i")
|
||||
|
Reference in New Issue
Block a user