0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-13 19:21:50 +02:00

fix: rpc_server_dock fixture now spawns the server process

This commit is contained in:
2024-05-29 17:05:52 +02:00
committed by wyzula-jan
parent 2a88e17b23
commit cd9fc46ff8
8 changed files with 157 additions and 162 deletions

View File

@ -1588,6 +1588,10 @@ class BECDockArea(RPCBase, BECGuiClientMixin):
extra(str): Extra docks that are in the dockarea but that are not mentioned in state will be added to the bottom of the dockarea, unless otherwise specified by the extra argument. extra(str): Extra docks that are in the dockarea but that are not mentioned in state will be added to the bottom of the dockarea, unless otherwise specified by the extra argument.
""" """
@rpc_call
def get_docks_repr(self):
"""Return dict, list and text representation of docks"""
@rpc_call @rpc_call
def add_dock( def add_dock(
self, self,

View File

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import collections
from typing import Literal, Optional from typing import Literal, Optional
from weakref import WeakValueDictionary from weakref import WeakValueDictionary
@ -68,9 +69,17 @@ class BECDockArea(BECConnector, DockArea):
""" """
return dict(self.docks) return dict(self.docks)
def get_docks_repr(self) -> dict:
docks_repr = {
"docks": collections.defaultdict(dict),
"tempAreas": list(map(str, self.tempAreas)),
}
for dock_name, dock in self.panels.items():
docks_repr["docks"][dock_name]["widgets"] = list(map(str, dock.widgets))
return docks_repr
@panels.setter @panels.setter
def panels(self, value: dict): def panels(self, value: dict):
self.docks = WeakValueDictionary(value) self.docks = WeakValueDictionary(value)
def restore_state( def restore_state(

View File

@ -136,6 +136,16 @@ class SpiralProgressBar(BECConnector, QWidget):
def rings(self, value): def rings(self, value):
self._rings = value self._rings = value
def __str__(self):
return (
"Spiral progress bar\n"
"-------------------\n"
f" Num bars: {self.config.num_bars}\n"
f"Bar colors: {[ring.color.getRgb() for ring in self.rings]}\n"
f"Bar values: [{', '.join('%.3f' % ring.value for ring in self.rings)}]\n"
f"Bar config: {' | '.join('%d: config min=%.3f, max=%.3f' % (i, ring.config.min_value, ring.config.max_value) for i, ring in enumerate(self.rings))}\n"
)
def update_config(self, config: SpiralProgressBarConfig | dict): def update_config(self, config: SpiralProgressBarConfig | dict):
""" """
Update the configuration of the widget. Update the configuration of the widget.

View File

@ -31,6 +31,7 @@ dev = [
"pytest", "pytest",
"pytest-random-order", "pytest-random-order",
"pytest-timeout", "pytest-timeout",
"pytest-xvfb",
"coverage", "coverage",
"pytest-qt", "pytest-qt",
"isort", "isort",

View File

@ -1,40 +1,55 @@
import pytest import random
import time
from contextlib import contextmanager
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client_utils import _start_plot_process
from bec_widgets.cli.rpc_register import RPCRegister from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.cli.server import BECWidgetsCLIServer
from bec_widgets.utils import BECDispatcher from bec_widgets.utils import BECDispatcher
from bec_widgets.widgets import BECDockArea, BECFigure from bec_widgets.widgets import BECDockArea, BECFigure
# make threads check in autouse, **will be executed at the end**; better than
# having it in fixtures for each test, since it prevents from needing to
# 'manually' shutdown bec_client_lib (for example) to make it happy, whereas
# whereas in fact bec_client_lib makes its on cleanup
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def rpc_register(): def threads_check_fixture(threads_check):
yield RPCRegister() return
RPCRegister.reset_singleton()
@pytest.fixture @pytest.fixture
def rpc_server_figure(qtbot, bec_client_lib, threads_check): def gui_id():
dispatcher = BECDispatcher(client=bec_client_lib) # Has to init singleton with fixture client return f"figure_{random.randint(0,100)}" # make a new gui id each time, to ensure no 'gui is alive' zombie key can perturbate
server = BECWidgetsCLIServer(gui_id="figure", gui_class=BECFigure)
qtbot.addWidget(server.gui)
qtbot.waitExposed(server.gui) @contextmanager
qtbot.wait(1000) # 1s long to wait until gui is ready def plot_server(gui_id, klass, client_lib):
yield server dispatcher = BECDispatcher(client=client_lib) # Has to init singleton with fixture client
dispatcher.disconnect_all() process, output_thread = _start_plot_process(
server.client.shutdown() gui_id, klass, client_lib._client._service_config.redis
server.shutdown() )
dispatcher.reset_singleton() try:
while client_lib._client.connector.get(MessageEndpoints.gui_heartbeat(gui_id)) is None:
time.sleep(0.3)
yield gui_id
finally:
process.terminate()
process.wait()
output_thread.join()
dispatcher.disconnect_all()
dispatcher.reset_singleton()
@pytest.fixture @pytest.fixture
def rpc_server_dock(qtbot, bec_client_lib, threads_check): def rpc_server_figure(gui_id, bec_client_lib):
dispatcher = BECDispatcher(client=bec_client_lib) # Has to init singleton with fixture client with plot_server(gui_id, BECFigure, bec_client_lib) as server:
server = BECWidgetsCLIServer(gui_id="figure", gui_class=BECDockArea) yield server
qtbot.addWidget(server.gui)
qtbot.waitExposed(server.gui)
qtbot.wait(1000) # 1s long to wait until gui is ready @pytest.fixture
yield server def rpc_server_dock(gui_id, bec_client_lib):
dispatcher.disconnect_all() with plot_server(gui_id, BECDockArea, bec_client_lib) as server:
server.client.shutdown() yield server
server.shutdown()
dispatcher.reset_singleton()

View File

@ -1,3 +1,5 @@
import time
import numpy as np import numpy as np
import pytest import pytest
from bec_lib.client import BECClient from bec_lib.client import BECClient
@ -8,24 +10,10 @@ from bec_widgets.cli.client import BECDockArea, BECFigure, BECImageShow, BECMoto
from bec_widgets.utils import Colors from bec_widgets.utils import Colors
@pytest.fixture(name="bec_client") def test_rpc_add_dock_with_figure_e2e(bec_client_lib, rpc_server_dock):
def cli_bec_client(rpc_server_dock):
"""
Fixture to create a BECClient instance that is independent of the GUI.
"""
# pylint: disable=protected-access
cli_client = BECClient(forced=True, config=rpc_server_dock.client._service_config)
cli_client.start()
yield cli_client
cli_client.shutdown()
def test_rpc_add_dock_with_figure_e2e(rpc_server_dock, qtbot):
dock = BECDockArea(rpc_server_dock.gui_id)
dock_server = rpc_server_dock.gui
# BEC client shortcuts # BEC client shortcuts
client = rpc_server_dock.client dock = BECDockArea(rpc_server_dock)
client = bec_client_lib
dev = client.device_manager.devices dev = client.device_manager.devices
scans = client.scans scans = client.scans
queue = client.queue queue = client.queue
@ -35,17 +23,17 @@ def test_rpc_add_dock_with_figure_e2e(rpc_server_dock, qtbot):
d1 = dock.add_dock("dock_1") d1 = dock.add_dock("dock_1")
d2 = dock.add_dock("dock_2") d2 = dock.add_dock("dock_2")
assert len(dock_server.docks) == 3 assert len(dock.get_docks_repr()["docks"]) == 3
# Add 3 figures with some widgets # Add 3 figures with some widgets
fig0 = d0.add_widget("BECFigure") fig0 = d0.add_widget("BECFigure")
fig1 = d1.add_widget("BECFigure") fig1 = d1.add_widget("BECFigure")
fig2 = d2.add_widget("BECFigure") fig2 = d2.add_widget("BECFigure")
assert len(dock_server.docks) == 3 docks = dock.get_docks_repr()["docks"]
assert len(dock_server.docks["dock_0"].widgets) == 1 assert len(docks) == 3
assert len(dock_server.docks["dock_1"].widgets) == 1 assert len(docks["dock_0"]["widgets"]) == 1
assert len(dock_server.docks["dock_2"].widgets) == 1 assert len(docks["dock_1"]["widgets"]) == 1
assert len(docks["dock_2"]["widgets"]) == 1
assert fig1.__class__.__name__ == "BECFigure" assert fig1.__class__.__name__ == "BECFigure"
assert fig1.__class__ == BECFigure assert fig1.__class__ == BECFigure
@ -98,7 +86,7 @@ def test_rpc_add_dock_with_figure_e2e(rpc_server_dock, qtbot):
# wait for scan to finish # wait for scan to finish
while not status.status == "COMPLETED": while not status.status == "COMPLETED":
qtbot.wait(200) time.sleep(0.2)
# plot # plot
plt_last_scan_data = queue.scan_storage.storage[-1].data plt_last_scan_data = queue.scan_storage.storage[-1].data
@ -110,7 +98,7 @@ def test_rpc_add_dock_with_figure_e2e(rpc_server_dock, qtbot):
last_image_device = client.connector.get_last(MessageEndpoints.device_monitor("eiger"))[ last_image_device = client.connector.get_last(MessageEndpoints.device_monitor("eiger"))[
"data" "data"
].data ].data
qtbot.wait(500) time.sleep(0.5)
last_image_plot = im.images[0].get_data() last_image_plot = im.images[0].get_data()
np.testing.assert_equal(last_image_device, last_image_plot) np.testing.assert_equal(last_image_device, last_image_plot)
@ -129,40 +117,40 @@ def test_rpc_add_dock_with_figure_e2e(rpc_server_dock, qtbot):
) )
def test_dock_manipulations_e2e(rpc_server_dock, qtbot): def test_dock_manipulations_e2e(rpc_server_dock):
dock = BECDockArea(rpc_server_dock.gui_id) dock = BECDockArea(rpc_server_dock)
dock_server = rpc_server_dock.gui
d0 = dock.add_dock("dock_0") d0 = dock.add_dock("dock_0")
d1 = dock.add_dock("dock_1") d1 = dock.add_dock("dock_1")
d2 = dock.add_dock("dock_2") d2 = dock.add_dock("dock_2")
assert len(dock_server.docks) == 3 assert len(dock.get_docks_repr()["docks"]) == 3
d0.detach() d0.detach()
dock.detach_dock("dock_2") dock.detach_dock("dock_2")
assert len(dock_server.docks) == 3 docks_repr = dock.get_docks_repr()
assert len(dock_server.tempAreas) == 2 assert len(docks_repr["docks"]) == 3
assert len(docks_repr["tempAreas"]) == 2
d0.attach() d0.attach()
assert len(dock_server.docks) == 3 docks_repr = dock.get_docks_repr()
assert len(dock_server.tempAreas) == 1 assert len(docks_repr["docks"]) == 3
assert len(docks_repr["tempAreas"]) == 1
d2.remove() d2.remove()
qtbot.wait(200) docks_repr = dock.get_docks_repr()
assert len(docks_repr["docks"]) == 2
assert len(dock_server.docks) == 2 assert ["dock_0", "dock_1"] == list(docks_repr["docks"])
docks_list = list(dict(dock_server.docks).keys())
assert ["dock_0", "dock_1"] == docks_list
dock.clear_all() dock.clear_all()
assert len(dock_server.docks) == 0 docks_repr = dock.get_docks_repr()
assert len(dock_server.tempAreas) == 0 assert len(docks_repr["docks"]) == 0
assert len(docks_repr["tempAreas"]) == 0
def test_spiral_bar(rpc_server_dock): def test_spiral_bar(rpc_server_dock):
dock = BECDockArea(rpc_server_dock.gui_id) dock = BECDockArea(rpc_server_dock)
dock_server = rpc_server_dock.gui
d0 = dock.add_dock(name="dock_0") d0 = dock.add_dock(name="dock_0")
@ -173,49 +161,42 @@ def test_spiral_bar(rpc_server_dock):
bar.set_colors_from_map("viridis") bar.set_colors_from_map("viridis")
bar.set_value([10, 20, 30, 40, 50]) bar.set_value([10, 20, 30, 40, 50])
bar_server = dock_server.docks["dock_0"].widgets[0] docks_repr = dock.get_docks_repr()
bar_repr = docks_repr["docks"]["dock_0"]["widgets"][0]
expected_colors = Colors.golden_angle_color("viridis", 5, "RGB") expected_colors = Colors.golden_angle_color("viridis", 5, "RGB")
bar_colors = [ring.color.getRgb() for ring in bar_server.rings] assert f"Bar colors: {expected_colors}" in bar_repr
bar_values = [ring.config.value for ring in bar_server.rings] assert f"Bar values: [10.000, 20.000, 30.000, 40.000, 50.000]" in bar_repr
assert bar_values == [10, 20, 30, 40, 50]
assert bar_colors == expected_colors
def test_spiral_bar_scan_update(rpc_server_dock, qtbot): def test_spiral_bar_scan_update(bec_client_lib, rpc_server_dock):
dock = BECDockArea(rpc_server_dock.gui_id) dock = BECDockArea(rpc_server_dock)
dock_server = rpc_server_dock.gui
d0 = dock.add_dock("dock_0") d0 = dock.add_dock("dock_0")
d0.add_widget("SpiralProgressBar") d0.add_widget("SpiralProgressBar")
client = rpc_server_dock.client client = bec_client_lib
dev = client.device_manager.devices dev = client.device_manager.devices
dev.samx.tolerance.set(0)
dev.samy.tolerance.set(0)
scans = client.scans scans = client.scans
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False) status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
while not status.status == "COMPLETED": bar_repr = dock.get_docks_repr()["docks"]["dock_0"]["widgets"][0]
qtbot.wait(200) assert "Num bars: 1" in bar_repr
assert "Bar values: [10.000]" in bar_repr
qtbot.wait(200) assert "0: config min=0.000, max=10.000" in bar_repr
bar_server = dock_server.docks["dock_0"].widgets[0]
assert bar_server.config.num_bars == 1
np.testing.assert_allclose(bar_server.rings[0].config.value, 10, atol=0.1)
np.testing.assert_allclose(bar_server.rings[0].config.min_value, 0, atol=0.1)
np.testing.assert_allclose(bar_server.rings[0].config.max_value, 10, atol=0.1)
status = scans.grid_scan(dev.samx, -5, 5, 4, dev.samy, -10, 10, 4, relative=True, exp_time=0.1) status = scans.grid_scan(dev.samx, -5, 5, 4, dev.samy, -10, 10, 4, relative=True, exp_time=0.1)
status.wait()
while not status.status == "COMPLETED": bar_repr = dock.get_docks_repr()["docks"]["dock_0"]["widgets"][0]
qtbot.wait(200) assert "Num bars: 1" in bar_repr
assert "Bar values: [16.000]" in bar_repr
qtbot.wait(200) assert "0: config min=0.000, max=16.000" in bar_repr
assert bar_server.config.num_bars == 1
np.testing.assert_allclose(bar_server.rings[0].config.value, 16, atol=0.1)
np.testing.assert_allclose(bar_server.rings[0].config.min_value, 0, atol=0.1)
np.testing.assert_allclose(bar_server.rings[0].config.max_value, 16, atol=0.1)
init_samx = dev.samx.read()["samx"]["value"] init_samx = dev.samx.read()["samx"]["value"]
init_samy = dev.samy.read()["samy"]["value"] init_samy = dev.samy.read()["samy"]["value"]
@ -226,18 +207,15 @@ def test_spiral_bar_scan_update(rpc_server_dock, qtbot):
dev.samy.velocity.put(5) dev.samy.velocity.put(5)
status = scans.umv(dev.samx, 5, dev.samy, 10, relative=True) status = scans.umv(dev.samx, 5, dev.samy, 10, relative=True)
status.wait()
while not status.status == "COMPLETED": bar_repr = dock.get_docks_repr()["docks"]["dock_0"]["widgets"][0]
qtbot.wait(200) assert "Num bars: 2" in bar_repr
assert f"Bar values: [{'%.3f' % final_samx}, {'%.3f' % final_samy}]" in bar_repr
qtbot.wait(200) assert (
assert bar_server.config.num_bars == 2 f"0: config min={'%.3f' % init_samx}, max={'%.3f' % final_samx} | 1: config min={'%.3f' % init_samy}, max={'%.3f' % final_samy}"
np.testing.assert_allclose(bar_server.rings[0].config.value, final_samx, atol=0.1) in bar_repr
np.testing.assert_allclose(bar_server.rings[1].config.value, final_samy, atol=0.1) )
np.testing.assert_allclose(bar_server.rings[0].config.min_value, init_samx, atol=0.1)
np.testing.assert_allclose(bar_server.rings[1].config.min_value, init_samy, atol=0.1)
np.testing.assert_allclose(bar_server.rings[0].config.max_value, final_samx, atol=0.1)
np.testing.assert_allclose(bar_server.rings[1].config.max_value, final_samy, atol=0.1)
def test_auto_update(rpc_server_dock, bec_client, qtbot): def test_auto_update(rpc_server_dock, bec_client, qtbot):

View File

@ -5,9 +5,8 @@ from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
def test_rpc_waveform1d_custom_curve(rpc_server_figure, qtbot): def test_rpc_waveform1d_custom_curve(rpc_server_figure):
fig = BECFigure(rpc_server_figure.gui_id) fig = BECFigure(rpc_server_figure)
fig_server = rpc_server_figure.gui
ax = fig.add_plot() ax = fig.add_plot()
curve = ax.add_curve_custom([1, 2, 3], [1, 2, 3]) curve = ax.add_curve_custom([1, 2, 3], [1, 2, 3])
@ -15,13 +14,12 @@ def test_rpc_waveform1d_custom_curve(rpc_server_figure, qtbot):
curve = ax.curves[0] curve = ax.curves[0]
curve.set_color("blue") curve.set_color("blue")
assert len(fig_server.widgets) == 1 assert len(fig.widgets) == 1
assert len(fig_server.widgets[ax.rpc_id].curves) == 1 assert len(fig.widgets[ax.rpc_id].curves) == 1
def test_rpc_plotting_shortcuts_init_configs(rpc_server_figure, qtbot): def test_rpc_plotting_shortcuts_init_configs(rpc_server_figure, qtbot):
fig = BECFigure(rpc_server_figure.gui_id) fig = BECFigure(rpc_server_figure)
fig_server = rpc_server_figure.gui
plt = fig.plot(x_name="samx", y_name="bpm4i") plt = fig.plot(x_name="samx", y_name="bpm4i")
im = fig.image("eiger") im = fig.image("eiger")
@ -29,7 +27,7 @@ def test_rpc_plotting_shortcuts_init_configs(rpc_server_figure, qtbot):
plt_z = fig.add_plot("samx", "samy", "bpm4i") plt_z = fig.add_plot("samx", "samy", "bpm4i")
# Checking if classes are correctly initialised # Checking if classes are correctly initialised
assert len(fig_server.widgets) == 4 assert len(fig.widgets) == 4
assert plt.__class__.__name__ == "BECWaveform" assert plt.__class__.__name__ == "BECWaveform"
assert plt.__class__ == BECWaveform assert plt.__class__ == BECWaveform
assert im.__class__.__name__ == "BECImageShow" assert im.__class__.__name__ == "BECImageShow"
@ -75,24 +73,21 @@ def test_rpc_plotting_shortcuts_init_configs(rpc_server_figure, qtbot):
} }
def test_rpc_waveform_scan(rpc_server_figure, qtbot): def test_rpc_waveform_scan(rpc_server_figure, bec_client_lib):
fig = BECFigure(rpc_server_figure.gui_id) fig = BECFigure(rpc_server_figure)
# add 3 different curves to track # add 3 different curves to track
plt = fig.plot(x_name="samx", y_name="bpm4i") plt = fig.plot(x_name="samx", y_name="bpm4i")
fig.plot(x_name="samx", y_name="bpm3a") fig.plot(x_name="samx", y_name="bpm3a")
fig.plot(x_name="samx", y_name="bpm4d") fig.plot(x_name="samx", y_name="bpm4d")
client = rpc_server_figure.client client = bec_client_lib
dev = client.device_manager.devices dev = client.device_manager.devices
scans = client.scans scans = client.scans
queue = client.queue queue = client.queue
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False) status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
# wait for scan to finish
while not status.status == "COMPLETED":
qtbot.wait(200)
last_scan_data = queue.scan_storage.storage[-1].data last_scan_data = queue.scan_storage.storage[-1].data
@ -108,38 +103,33 @@ def test_rpc_waveform_scan(rpc_server_figure, qtbot):
assert plt_data["bpm4d-bpm4d"]["y"] == last_scan_data["bpm4d"]["bpm4d"].val assert plt_data["bpm4d-bpm4d"]["y"] == last_scan_data["bpm4d"]["bpm4d"].val
def test_rpc_image(rpc_server_figure, qtbot): def test_rpc_image(rpc_server_figure, bec_client_lib):
fig = BECFigure(rpc_server_figure.gui_id) fig = BECFigure(rpc_server_figure)
im = fig.image("eiger") im = fig.image("eiger")
client = rpc_server_figure.client client = bec_client_lib
dev = client.device_manager.devices dev = client.device_manager.devices
scans = client.scans scans = client.scans
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False) status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
# wait for scan to finish
while not status.status == "COMPLETED":
qtbot.wait(200)
last_image_device = client.connector.get_last(MessageEndpoints.device_monitor("eiger"))[ last_image_device = client.connector.get_last(MessageEndpoints.device_monitor("eiger"))[
"data" "data"
].data ].data
qtbot.wait(500)
last_image_plot = im.images[0].get_data() last_image_plot = im.images[0].get_data()
# check plotted data # check plotted data
np.testing.assert_equal(last_image_device, last_image_plot) np.testing.assert_equal(last_image_device, last_image_plot)
def test_rpc_motor_map(rpc_server_figure, qtbot): def test_rpc_motor_map(rpc_server_figure, bec_client_lib):
fig = BECFigure(rpc_server_figure.gui_id) fig = BECFigure(rpc_server_figure)
fig_server = rpc_server_figure.gui
motor_map = fig.motor_map("samx", "samy") motor_map = fig.motor_map("samx", "samy")
client = rpc_server_figure.client client = bec_client_lib
dev = client.device_manager.devices dev = client.device_manager.devices
scans = client.scans scans = client.scans
@ -147,10 +137,8 @@ def test_rpc_motor_map(rpc_server_figure, qtbot):
initial_pos_y = dev.samy.read()["samy"]["value"] initial_pos_y = dev.samy.read()["samy"]["value"]
status = scans.mv(dev.samx, 1, dev.samy, 2, relative=True) status = scans.mv(dev.samx, 1, dev.samy, 2, relative=True)
status.wait()
# wait for scan to finish
while not status.status == "COMPLETED":
qtbot.wait(200)
final_pos_x = dev.samx.read()["samx"]["value"] final_pos_x = dev.samx.read()["samx"]["value"]
final_pos_y = dev.samy.read()["samy"]["value"] final_pos_y = dev.samy.read()["samy"]["value"]

View File

@ -3,40 +3,30 @@ import pytest
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
def find_deepest_value(d: dict): def test_rpc_register_list_connections(rpc_server_figure):
""" fig = BECFigure(rpc_server_figure)
Recursively find the deepest value in a dictionary
Args:
d(dict): Dictionary to search
Returns:
The deepest value in the dictionary.
"""
if isinstance(d, dict):
if d:
return find_deepest_value(next(iter(d.values())))
return d
def test_rpc_register_list_connections(rpc_server_figure, rpc_register, qtbot):
fig = BECFigure(rpc_server_figure.gui_id)
fig_server = rpc_server_figure.gui
plt = fig.plot(x_name="samx", y_name="bpm4i") plt = fig.plot(x_name="samx", y_name="bpm4i")
im = fig.image("eiger") im = fig.image("eiger")
motor_map = fig.motor_map("samx", "samy") motor_map = fig.motor_map("samx", "samy")
plt_z = fig.add_plot("samx", "samy", "bpm4i") plt_z = fig.add_plot("samx", "samy", "bpm4i")
all_connections = rpc_register.list_all_connections() # keep only class names from objects, since objects on server and client are different
# so the best we can do is to compare types (rpc register is unit-tested elsewhere)
all_connections = {obj_id: type(obj).__name__ for obj_id, obj in fig.get_all_rpc().items()}
# Construct dict of all rpc items manually all_subwidgets_expected = {wid: type(widget).__name__ for wid, widget in fig.widgets.items()}
all_subwidgets_expected = dict(fig_server.widgets) curve_1D = fig.widgets[plt.rpc_id]
curve_1D = find_deepest_value(fig_server.widgets[plt.rpc_id]._curves_data) curve_2D = fig.widgets[plt_z.rpc_id]
curve_2D = find_deepest_value(fig_server.widgets[plt_z.rpc_id]._curves_data) curves_expected = {
curves_expected = {curve_1D.rpc_id: curve_1D, curve_2D.rpc_id: curve_2D} curve_1D.rpc_id: type(curve_1D).__name__,
fig_expected = {fig.rpc_id: fig_server} curve_2D.rpc_id: type(curve_2D).__name__,
}
curves_expected.update({curve._gui_id: type(curve).__name__ for curve in curve_1D.curves})
curves_expected.update({curve._gui_id: type(curve).__name__ for curve in curve_2D.curves})
fig_expected = {fig.rpc_id: type(fig).__name__}
image_item_expected = { image_item_expected = {
fig_server.widgets[im.rpc_id].images[0].rpc_id: fig_server.widgets[im.rpc_id].images[0] fig.widgets[im.rpc_id].images[0].rpc_id: type(fig.widgets[im.rpc_id].images[0]).__name__
} }
all_connections_expected = { all_connections_expected = {