1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-08 09:47:52 +02:00

Compare commits

...

12 Commits

Author SHA1 Message Date
a275c5a3b1 wip 2026-03-15 14:31:39 +01:00
b12e6432d6 wip 2026-03-15 13:59:43 +01:00
b802beec69 wip 2026-03-15 13:53:37 +01:00
6c40df4380 wip 2026-03-15 13:36:13 +01:00
dc8ae5a85d wip 2026-03-15 13:31:45 +01:00
2d41a158bc wip 2026-03-15 13:25:42 +01:00
c2a6964875 wip 2026-03-15 13:18:15 +01:00
774ca08cb3 wip 2026-03-15 13:06:18 +01:00
4606bbd570 wip 2026-03-15 13:01:44 +01:00
b2ed9d42f7 wip 2026-03-15 12:57:19 +01:00
d799691e12 wip 2026-03-15 12:47:58 +01:00
6848a9e20b test(e2e): avoid timing issues in rpc_gui_obj test 2026-03-15 12:30:40 +01:00
5 changed files with 410 additions and 397 deletions

View File

@@ -1,19 +1,19 @@
name: Full CI
on:
on:
push:
pull_request:
workflow_dispatch:
inputs:
BEC_WIDGETS_BRANCH:
description: 'Branch of BEC Widgets to install'
description: "Branch of BEC Widgets to install"
required: false
type: string
BEC_CORE_BRANCH:
description: 'Branch of BEC Core to install'
description: "Branch of BEC Core to install"
required: false
type: string
OPHYD_DEVICES_BRANCH:
description: 'Branch of Ophyd Devices to install'
description: "Branch of Ophyd Devices to install"
required: false
type: string
@@ -25,60 +25,58 @@ permissions:
pull-requests: write
jobs:
check_pr_status:
uses: ./.github/workflows/check_pr.yml
# check_pr_status:
# uses: ./.github/workflows/check_pr.yml
formatter:
needs: check_pr_status
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/formatter.yml
# formatter:
# needs: check_pr_status
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/formatter.yml
unit-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/pytest.yml
with:
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref }}
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
# unit-test:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/pytest.yml
# with:
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref }}
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
# OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
# secrets:
# CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
unit-test-matrix:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/pytest-matrix.yml
with:
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha}}
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
# unit-test-matrix:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/pytest-matrix.yml
# with:
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha}}
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
# OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
generate-cli-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/generate-cli-check.yml
# generate-cli-test:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/generate-cli-check.yml
end2end-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/end2end-conda.yml
child-repos:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/child_repos.yml
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
plugin_repos:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: bec-project/bec/.github/workflows/plugin_repos.yml@main
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
# child-repos:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/child_repos.yml
# with:
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
# OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
secrets:
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
# plugin_repos:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: bec-project/bec/.github/workflows/plugin_repos.yml@main
# with:
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
# secrets:
# GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}

View File

@@ -48,12 +48,14 @@ jobs:
source ./bin/install_bec_dev.sh -t
cd ../
pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end
pip install pytest-repeat
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end/user_interaction/test_user_interaction_e2e.py
- name: Upload logs if job fails
if: failure()
uses: actions/upload-artifact@v4
with:
name: pytest-logs
path: ./logs/*.log
path: ./bec/logs/*.log
retention-days: 7

View File

@@ -118,6 +118,9 @@ class RPCServer:
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
try:
logger.info(
f"Processing RPC instruction: {msg['action']} with request_id: {request_id}. Parameters: {msg.get('parameter')}"
)
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
@@ -257,6 +260,10 @@ class RPCServer:
else:
name = WidgetContainerUtils.generate_unique_name("dock_area", existing_dock_areas)
logger.info(
f"Launching new dock area with name: {name} and startup_profile: {startup_profile}"
)
result_widget = bw_launch.dock_area(object_name=name, startup_profile=startup_profile)
result_widget.window().setWindowTitle(f"BEC - {name}")
@@ -296,6 +303,9 @@ class RPCServer:
else:
res = self.serialize_object(res)
except RegistryNotReadyError:
logger.info(
f"Object not registered yet for RPC request {request_id}, retrying serialization after {retry_delay} ms"
)
try:
self._rpc_singleshot_repeats[request_id] += retry_delay
QTimer.singleShot(

View File

@@ -150,7 +150,9 @@ def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
# communication should work, main dock area should have same id and be visible
yw = gui.new("Y")
qtbot.waitUntil(lambda: len(gui.windows) == 2, timeout=3000)
yw.delete_all()
assert len(gui.windows) == 2
yw.remove()
assert len(gui.windows) == 1 # only bec is left
qtbot.waitUntil(lambda: len(gui.windows) == 1, timeout=3000)
assert len(gui.windows) == 1

View File

@@ -135,144 +135,160 @@ def maybe_remove_dock_area(qtbot, gui: BECGuiClient, random_int_gen: random.Rand
wait_for_namespace_change(
qtbot, gui=gui, parent_widget=gui, object_name=name, widget_gui_id=gui_id, exists=False
)
qtbot.waitUntil(lambda: hasattr(gui, "dock_area") is False, timeout=5000)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the BECProgressBar widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.BECProgressBar)
widget: client.BECProgressBar
# Check rpc calls
assert widget.label_template == "$value / $maximum - $percentage %"
widget.set_maximum(100)
widget.set_minimum(50)
widget.set_value(75)
assert widget._get_label() == "75 / 100 - 50 %"
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_queue(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the BECQueue widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.BECQueue)
widget: client.BECQueue
# No rpc calls to test so far
# maybe we can add an rpc call to check the queue length
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_status_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the BECStatusBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.BECStatusBox)
# Check rpc calls
assert widget.get_server_state() in ["RUNNING", "IDLE", "BUSY", "ERROR"]
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_dap_combo_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the DAPComboBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.DapComboBox)
widget: client.DAPComboBox
# Check rpc calls
widget.select_fit_model("PseudoVoigtModel")
widget.select_x_axis("samx")
widget.select_y_axis("bpm4i")
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_device_browser(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the DeviceBrowser widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.DeviceBrowser)
widget: client.DeviceBrowser
# No rpc calls yet to check
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the Image widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.Image)
widget: client.Image
scans = bec.scans
dev = bec.device_manager.devices
# Test rpc calls
img = widget.image(device=dev.eiger.name, signal="preview")
assert img.get_data() is None
# Run a scan and plot the image
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Check that last image is equivalent to data in Redis
last_img = bec.connector.get_last(MessageEndpoints.device_preview("eiger", "preview"))[
"data"
].data
assert np.allclose(img.get_data(), last_img)
# Now add a device with a preview signal
img = widget.image(device="eiger", signal="preview")
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# TODO re-enable when issue is resolved #560
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_log_panel(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the LogPanel widget."""
# def test_widgets_e2e_bec_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the BECProgressBar widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.LogPanel)
# widget: client.LogPanel
# widget = create_widget(qtbot, gui, gui.available_widgets.BECProgressBar)
# widget: client.BECProgressBar
# # Check rpc calls
# assert widget.label_template == "$value / $maximum - $percentage %"
# widget.set_maximum(100)
# widget.set_minimum(50)
# widget.set_value(75)
# assert widget._get_label() == "75 / 100 - 50 %"
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_bec_queue(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the BECQueue widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.BECQueue)
# widget: client.BECQueue
# # No rpc calls to test so far
# # maybe we can add an rpc call to check the queue length
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_bec_status_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the BECStatusBox widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.BECStatusBox)
# # Check rpc calls
# assert widget.get_server_state() in ["RUNNING", "IDLE", "BUSY", "ERROR"]
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_dap_combo_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the DAPComboBox widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.DapComboBox)
# widget: client.DAPComboBox
# # Check rpc calls
# widget.select_fit_model("PseudoVoigtModel")
# widget.select_x_axis("samx")
# widget.select_y_axis("bpm4i")
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_device_browser(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the DeviceBrowser widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.DeviceBrowser)
# widget: client.DeviceBrowser
# # No rpc calls yet to check
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the Image widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.Image)
# widget: client.Image
# scans = bec.scans
# dev = bec.device_manager.devices
# # Test rpc calls
# img = widget.image(device=dev.eiger.name, signal="preview")
# assert img.get_data() is None
# # Run a scan and plot the image
# s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
# s.wait()
# def _wait_for_scan_in_history():
# # Get scan item from history
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
# return scan_item is not None
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# # Check that last image is equivalent to data in Redis
# last_img = bec.connector.get_last(MessageEndpoints.device_preview("eiger", "preview"))[
# "data"
# ].data
# assert np.allclose(img.get_data(), last_img)
# # Now add a device with a preview signal
# img = widget.image(device="eiger", signal="preview")
# s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
# s.wait()
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # TODO re-enable when issue is resolved #560
# # @pytest.mark.timeout(PYTEST_TIMEOUT)
# # def test_widgets_e2e_log_panel(qtbot, connected_client_gui_obj, random_generator_from_seed):
# # """Test the LogPanel widget."""
# # gui = connected_client_gui_obj
# # bec = gui._client
# # # Create dock_area and widget
# # widget = create_widget(qtbot, gui, gui.available_widgets.LogPanel)
# # widget: client.LogPanel
# # # No rpc calls to check so far
# # # Test removing the widget, or leaving it open for the next test
# # maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_minesweeper(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the MineSweeper widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.Minesweeper)
# widget: client.MineSweeper
# # No rpc calls to check so far
@@ -280,175 +296,160 @@ def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_fro
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_minesweeper(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the MineSweeper widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.Minesweeper)
widget: client.MineSweeper
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_motor_map(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the MotorMap widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.MotorMap)
# widget: client.MotorMap
# No rpc calls to check so far
# # Test RPC calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # Set motor map to names
# widget.map(dev.samx, dev.samy)
# # Move motor samx to pos
# pos = dev.samx.limits[1] - 1 # -1 from higher limit
# scans.mv(dev.samx, pos, relative=False).wait()
# # Check that data is up to date
# assert np.isclose(widget.get_data()["x"][-1], pos, dev.samx.precision)
# # Move motor samy to pos
# pos = dev.samy.limits[0] + 1 # +1 from lower limit
# scans.mv(dev.samy, pos, relative=False).wait()
# # Check that data is up to date
# assert np.isclose(widget.get_data()["y"][-1], pos, dev.samy.precision)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_motor_map(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the MotorMap widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.MotorMap)
widget: client.MotorMap
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_multi_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test MultiWaveform widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.MultiWaveform)
# widget: client.MultiWaveform
# Test RPC calls
dev = bec.device_manager.devices
scans = bec.scans
# Set motor map to names
widget.map(dev.samx, dev.samy)
# Move motor samx to pos
pos = dev.samx.limits[1] - 1 # -1 from higher limit
scans.mv(dev.samx, pos, relative=False).wait()
# Check that data is up to date
assert np.isclose(widget.get_data()["x"][-1], pos, dev.samx.precision)
# Move motor samy to pos
pos = dev.samy.limits[0] + 1 # +1 from lower limit
scans.mv(dev.samy, pos, relative=False).wait()
# Check that data is up to date
assert np.isclose(widget.get_data()["y"][-1], pos, dev.samy.precision)
# # Test RPC calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # test plotting
# cm = "cividis"
# widget.plot(dev.waveform, color_palette=cm)
# assert widget.monitor == dev.waveform.name
# assert widget.color_palette == cm
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Scan with BEC
# s = scans.line_scan(dev.samx, -3, 3, steps=5, exp_time=0.01, relative=False)
# s.wait()
# def _wait_for_scan_in_history():
# # Get scan item from history
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
# return scan_item is not None
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# # Wait for data in history (should be plotted?)
# # TODO how can we check that the data was plotted, implement get_data()
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_multi_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test MultiWaveform widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.MultiWaveform)
widget: client.MultiWaveform
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_positioner_indicator(
# qtbot, connected_client_gui_obj, random_generator_from_seed
# ):
# """Test the PositionIndicator widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionIndicator)
# widget: client.PositionIndicator
# Test RPC calls
dev = bec.device_manager.devices
scans = bec.scans
# test plotting
cm = "cividis"
widget.plot(dev.waveform, color_palette=cm)
assert widget.monitor == dev.waveform.name
assert widget.color_palette == cm
# # TODO check what these rpc calls are supposed to do! Issue created #461
# widget.set_value(5)
# Scan with BEC
s = scans.line_scan(dev.samx, -3, 3, steps=5, exp_time=0.01, relative=False)
s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Wait for data in history (should be plotted?)
# TODO how can we check that the data was plotted, implement get_data()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_indicator(
qtbot, connected_client_gui_obj, random_generator_from_seed
):
"""Test the PositionIndicator widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.PositionIndicator)
widget: client.PositionIndicator
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_positioner_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the PositionerBox widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox)
# widget: client.PositionerBox
# TODO check what these rpc calls are supposed to do! Issue created #461
widget.set_value(5)
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # No rpc calls to check so far
# widget.set_positioner(dev.samx)
# widget.set_positioner(dev.samy.name)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# scans.mv(dev.samy, -3, relative=False).wait()
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the PositionerBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox)
widget: client.PositionerBox
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_positioner_box_2d(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the PositionerBox2D widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox2D)
# widget: client.PositionerBox2D
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# No rpc calls to check so far
widget.set_positioner(dev.samx)
widget.set_positioner(dev.samy.name)
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # No rpc calls to check so far
# widget.set_positioner_hor(dev.samx)
# widget.set_positioner_ver(dev.samy)
scans.mv(dev.samy, -3, relative=False).wait()
# # Try moving the motors
# scans.mv(dev.samx, 3, relative=False).wait()
# scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_box_2d(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the PositionerBox2D widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox2D)
widget: client.PositionerBox2D
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_positioner_control_line(
# qtbot, connected_client_gui_obj, random_generator_from_seed
# ):
# """Test the positioner control line widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionerControlLine)
# widget: client.PositionerControlLine
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# No rpc calls to check so far
widget.set_positioner_hor(dev.samx)
widget.set_positioner_ver(dev.samy)
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # Set positioner
# widget.set_positioner(dev.samx)
# scans.mv(dev.samx, 3, relative=False).wait()
# widget.set_positioner(dev.samy.name)
# scans.mv(dev.samy, -3, relative=False).wait()
# Try moving the motors
scans.mv(dev.samx, 3, relative=False).wait()
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_control_line(
qtbot, connected_client_gui_obj, random_generator_from_seed
):
"""Test the positioner control line widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.PositionerControlLine)
widget: client.PositionerControlLine
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# Set positioner
widget.set_positioner(dev.samx)
scans.mv(dev.samx, 3, relative=False).wait()
widget.set_positioner(dev.samy.name)
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# TODO passes locally, fails on CI for some reason... -> issue #1003
@pytest.mark.repeat(20)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_ring_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the RingProgressBar widget"""
@@ -477,86 +478,86 @@ def test_widgets_e2e_ring_progress_bar(qtbot, connected_client_gui_obj, random_g
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_scan_control(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the ScanControl widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.ScanControl)
widget: client.ScanControl
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_scan_control(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the ScanControl widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.ScanControl)
# widget: client.ScanControl
# No rpc calls to check so far
# # No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_scatter_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the ScatterWaveform widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.ScatterWaveform)
widget: client.ScatterWaveform
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_scatter_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the ScatterWaveform widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.ScatterWaveform)
# widget: client.ScatterWaveform
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
widget.plot(dev.samx, dev.samy, dev.bpm4i)
scans.grid_scan(dev.samx, -5, 5, 5, dev.samy, -5, 5, 5, exp_time=0.01, relative=False).wait()
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# widget.plot(dev.samx, dev.samy, dev.bpm4i)
# scans.grid_scan(dev.samx, -5, 5, 5, dev.samy, -5, 5, 5, exp_time=0.01, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_text_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the TextBox widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.TextBox)
widget: client.TextBox
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_text_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the TextBox widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.TextBox)
# widget: client.TextBox
# RPC calls
widget.set_plain_text("Hello World")
widget.set_html_text("<b> Hello World HTML </b>")
# # RPC calls
# widget.set_plain_text("Hello World")
# widget.set_html_text("<b> Hello World HTML </b>")
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the Waveform widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.Waveform)
widget: client.Waveform
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the Waveform widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.Waveform)
# widget: client.Waveform
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
widget.plot(dev.bpm4i)
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# widget.plot(dev.bpm4i)
# s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
# s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
# def _wait_for_scan_in_history():
# # Get scan item from history
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
# return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
samx_data = scan_item.devices.samx.samx.read()["value"]
bpm4i_data = scan_item.devices.bpm4i.bpm4i.read()["value"]
curve = widget.curves[0]
assert np.allclose(curve.get_data()[0], samx_data)
assert np.allclose(curve.get_data()[1], bpm4i_data)
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
# samx_data = scan_item.devices.samx.samx.read()["value"]
# bpm4i_data = scan_item.devices.bpm4i.bpm4i.read()["value"]
# curve = widget.curves[0]
# assert np.allclose(curve.get_data()[0], samx_data)
# assert np.allclose(curve.get_data()[1], bpm4i_data)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)