1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-13 12:10:54 +02:00

Compare commits

..

12 Commits

Author SHA1 Message Date
semantic-release
da1dc85b44 3.2.4
Automatically generated by python-semantic-release
2026-03-19 17:23:19 +00:00
28be696f7c fix(main_app): setApplicationName("BEC") 2026-03-19 18:22:29 +01:00
semantic-release
008c3a223a 3.2.3
Automatically generated by python-semantic-release
2026-03-16 15:07:09 +00:00
b9145d762c fix: check adding parent for filesystemmodel 2026-03-16 16:06:22 +01:00
37a5dc2e9e fix: refactor client mock with global fakeredis 2026-03-16 16:06:22 +01:00
1351fcd47b ci: fix path for uploading logs on failure 2026-03-16 15:49:22 +01:00
semantic-release
14a6b04b11 3.2.2
Automatically generated by python-semantic-release
2026-03-16 14:28:24 +00:00
4c9d7fddce fix(image): disconnecting of 2d monitor 2026-03-16 15:26:40 +01:00
semantic-release
39ecb89196 3.2.1
Automatically generated by python-semantic-release
2026-03-16 14:08:42 +00:00
974f25997d fix(e2e): bec shell excluded from e2e testing 2026-03-16 15:07:51 +01:00
e061fa31a9 fix(e2e): bec dock rpc fixed synchronization 2026-03-16 15:07:51 +01:00
718f99527c fix(e2e): timeout for maybe_remove_dock_area 2026-03-16 15:07:51 +01:00
23 changed files with 835 additions and 573 deletions

View File

@@ -1,19 +1,19 @@
name: Full CI
on:
on:
push:
pull_request:
workflow_dispatch:
inputs:
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
description: 'Branch of BEC Widgets to install'
required: false
type: string
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
description: 'Branch of BEC Core to install'
required: false
type: string
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
description: 'Branch of Ophyd Devices to install'
required: false
type: string
@@ -25,58 +25,60 @@ permissions:
pull-requests: write
jobs:
# check_pr_status:
# uses: ./.github/workflows/check_pr.yml
check_pr_status:
uses: ./.github/workflows/check_pr.yml
# formatter:
# needs: check_pr_status
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/formatter.yml
formatter:
needs: check_pr_status
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/formatter.yml
# unit-test:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/pytest.yml
# with:
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref }}
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
# OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
# secrets:
# CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
unit-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/pytest.yml
with:
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref }}
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
# unit-test-matrix:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/pytest-matrix.yml
# with:
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha}}
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
# OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
unit-test-matrix:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/pytest-matrix.yml
with:
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha}}
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
# generate-cli-test:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/generate-cli-check.yml
generate-cli-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/generate-cli-check.yml
end2end-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/end2end-conda.yml
# child-repos:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/child_repos.yml
# with:
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
# OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
child-repos:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/child_repos.yml
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
plugin_repos:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: bec-project/bec/.github/workflows/plugin_repos.yml@main
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
# plugin_repos:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: bec-project/bec/.github/workflows/plugin_repos.yml@main
# with:
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
# secrets:
# GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
secrets:
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}

View File

@@ -48,9 +48,7 @@ jobs:
source ./bin/install_bec_dev.sh -t
cd ../
pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin
pip install pytest-repeat
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end/user_interaction/test_user_interaction_e2e.py
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end
- name: Upload logs if job fails
if: failure()

View File

@@ -1,6 +1,52 @@
# CHANGELOG
## v3.2.4 (2026-03-19)
### Bug Fixes
- **main_app**: Setapplicationname("bec")
([`28be696`](https://github.com/bec-project/bec_widgets/commit/28be696f7c7d9762c742c6d5fb5b03867d5e92ea))
## v3.2.3 (2026-03-16)
### Bug Fixes
- Check adding parent for filesystemmodel
([`b9145d7`](https://github.com/bec-project/bec_widgets/commit/b9145d762cdf946f184834928a6404f21b4802a9))
- Refactor client mock with global fakeredis
([`37a5dc2`](https://github.com/bec-project/bec_widgets/commit/37a5dc2e9eeb447d174f4d7087051672f308c84c))
### Continuous Integration
- Fix path for uploading logs on failure
([`1351fcd`](https://github.com/bec-project/bec_widgets/commit/1351fcd47b909c1a33cb389c096041eb1449e3d3))
## v3.2.2 (2026-03-16)
### Bug Fixes
- **image**: Disconnecting of 2d monitor
([`4c9d7fd`](https://github.com/bec-project/bec_widgets/commit/4c9d7fddce7aa5b7f13a00ac332bd54b301e3c28))
## v3.2.1 (2026-03-16)
### Bug Fixes
- **e2e**: Bec dock rpc fixed synchronization
([`e061fa3`](https://github.com/bec-project/bec_widgets/commit/e061fa31a9a5e5c00e44337d7cc52c51d8e259b5))
- **e2e**: Bec shell excluded from e2e testing
([`974f259`](https://github.com/bec-project/bec_widgets/commit/974f25997d68d13ff1063026f9e5c4c8dd4d49f3))
- **e2e**: Timeout for maybe_remove_dock_area
([`718f995`](https://github.com/bec-project/bec_widgets/commit/718f99527c3bebb96845d3305aba69434eb83f77))
## v3.2.0 (2026-03-11)
### Features

View File

@@ -378,6 +378,7 @@ def main(): # pragma: no cover
args, qt_args = parser.parse_known_args(sys.argv[1:])
app = QApplication([sys.argv[0], *qt_args])
app.setApplicationName("BEC")
apply_theme("dark")
w = BECMainApp(show_examples=args.examples)

View File

@@ -1,6 +1,7 @@
# pylint: skip-file
from unittest.mock import MagicMock
from bec_lib.config_helper import ConfigHelper
from bec_lib.device import Device as BECDevice
from bec_lib.device import Positioner as BECPositioner
from bec_lib.device import ReadoutPriority
@@ -219,7 +220,9 @@ class Device(FakeDevice):
class DMMock:
def __init__(self):
def __init__(self, *args, **kwargs):
self._service = args[0]
self.config_helper = ConfigHelper(self._service.connector, self._service._service_name)
self.devices = DeviceContainer()
self.enabled_devices = [device for device in self.devices if device.enabled]
@@ -273,6 +276,10 @@ class DMMock:
configs.append(device._config)
return configs
def initialize(*_): ...
def shutdown(self): ...
DEVICES = [
FakePositioner("samx", limits=[-10, 10], read_value=2.0),

View File

@@ -123,17 +123,16 @@ class BECDispatcher:
self._registered_slots: DefaultDict[Hashable, QtThreadSafeCallback] = (
collections.defaultdict()
)
self.client = client
if self.client is None:
if config is not None:
if not isinstance(config, ServiceConfig):
# config is supposed to be a path
config = ServiceConfig(config)
if client is None:
if config is not None and not isinstance(config, ServiceConfig):
# config is supposed to be a path
config = ServiceConfig(config)
self.client = BECClient(
config=config, connector_cls=QtRedisConnector, name="BECWidgets"
)
else:
self.client = client
if self.client.started:
# have to reinitialize client to use proper connector
logger.info("Shutting down BECClient to switch to QtRedisConnector")

View File

@@ -118,9 +118,6 @@ class RPCServer:
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
try:
logger.info(
f"Processing RPC instruction: {msg['action']} with request_id: {request_id}. Parameters: {msg.get('parameter')}"
)
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
@@ -260,10 +257,6 @@ class RPCServer:
else:
name = WidgetContainerUtils.generate_unique_name("dock_area", existing_dock_areas)
logger.info(
f"Launching new dock area with name: {name} and startup_profile: {startup_profile}"
)
result_widget = bw_launch.dock_area(object_name=name, startup_profile=startup_profile)
result_widget.window().setWindowTitle(f"BEC - {name}")
@@ -303,9 +296,6 @@ class RPCServer:
else:
res = self.serialize_object(res)
except RegistryNotReadyError:
logger.info(
f"Object not registered yet for RPC request {request_id}, retrying serialization after {retry_delay} ms"
)
try:
self._rpc_singleshot_repeats[request_id] += retry_delay
QTimer.singleShot(

View File

@@ -63,7 +63,7 @@ class ScriptTreeWidget(QWidget):
layout.setSpacing(0)
# Create tree view
self.tree = QTreeView()
self.tree = QTreeView(parent=self)
self.tree.setHeaderHidden(True)
self.tree.setRootIsDecorated(True)
@@ -71,12 +71,12 @@ class ScriptTreeWidget(QWidget):
self.tree.setMouseTracking(True)
# Create file system model
self.model = QFileSystemModel()
self.model = QFileSystemModel(parent=self)
self.model.setNameFilters(["*.py"])
self.model.setNameFilterDisables(False)
# Create proxy model to filter out underscore directories
self.proxy_model = QSortFilterProxyModel()
self.proxy_model = QSortFilterProxyModel(parent=self)
self.proxy_model.setFilterRegularExpression(QRegularExpression("^[^_].*"))
self.proxy_model.setSourceModel(self.model)
self.tree.setModel(self.proxy_model)

View File

@@ -270,6 +270,16 @@ class Image(ImageBase):
return
old_device = self._config.device
old_signal = self._config.signal
old_config = self.subscriptions["main"]
if old_device and old_signal and old_device != value:
self._disconnect_monitor_subscription(
device=old_device,
signal=old_signal,
source=old_config.source,
async_update=self.async_update,
async_signal_name=old_config.async_signal_name,
)
self._config.device = value
# If we have a signal, reconnect with the new device
@@ -325,6 +335,16 @@ class Image(ImageBase):
self._set_connection_status("disconnected")
return
old_signal = self._config.signal
old_config = self.subscriptions["main"]
if self._config.device and old_signal and old_signal != value:
self._disconnect_monitor_subscription(
device=self._config.device,
signal=old_signal,
source=old_config.source,
async_update=self.async_update,
async_signal_name=old_config.async_signal_name,
)
self._config.signal = value
# If we have a device, try to connect
@@ -447,6 +467,61 @@ class Image(ImageBase):
)
self._autorange_on_next_update = True
def _disconnect_monitor_subscription(
self,
*,
device: str,
signal: str,
source: Literal["device_monitor_1d", "device_monitor_2d"] | None,
async_update: bool,
async_signal_name: str | None,
) -> None:
if not device or not signal:
return
if async_update:
async_signal_name = async_signal_name or signal
ids_to_check = [self.scan_id, self.old_scan_id]
if source == "device_monitor_1d":
for scan_id in ids_to_check:
if scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d,
MessageEndpoints.device_async_signal(scan_id, device, async_signal_name),
)
logger.info(
f"Disconnecting 1d update ScanID:{scan_id}, Device Name:{device},Device Entry:{async_signal_name}"
)
elif source == "device_monitor_2d":
for scan_id in ids_to_check:
if scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d,
MessageEndpoints.device_async_signal(scan_id, device, async_signal_name),
)
logger.info(
f"Disconnecting 2d update ScanID:{scan_id}, Device Name:{device},Device Entry:{async_signal_name}"
)
return
if source == "device_monitor_1d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d, MessageEndpoints.device_preview(device, signal)
)
logger.info(
f"Disconnecting preview 1d update Device Name:{device}, Device Entry:{signal}"
)
elif source == "device_monitor_2d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d, MessageEndpoints.device_preview(device, signal)
)
logger.info(
f"Disconnecting preview 2d update Device Name:{device}, Device Entry:{signal}"
)
def _disconnect_current_monitor(self):
"""
Internal method to disconnect the current monitor subscriptions.
@@ -455,55 +530,13 @@ class Image(ImageBase):
return
config = self.subscriptions["main"]
if self.async_update:
async_signal_name = config.async_signal_name or self._config.signal
ids_to_check = [self.scan_id, self.old_scan_id]
if config.source == "device_monitor_1d":
for scan_id in ids_to_check:
if scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d,
MessageEndpoints.device_async_signal(
scan_id, self._config.device, async_signal_name
),
)
logger.info(
f"Disconnecting 1d update ScanID:{scan_id}, Device Name:{self._config.device},Device Entry:{async_signal_name}"
)
elif config.source == "device_monitor_2d":
for scan_id in ids_to_check:
if scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d,
MessageEndpoints.device_async_signal(
scan_id, self._config.device, async_signal_name
),
)
logger.info(
f"Disconnecting 2d update ScanID:{scan_id}, Device Name:{self._config.device},Device Entry:{async_signal_name}"
)
else:
if config.source == "device_monitor_1d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d,
MessageEndpoints.device_preview(self._config.device, self._config.signal),
)
logger.info(
f"Disconnecting preview 1d update Device Name:{self._config.device}, Device Entry:{self._config.signal}"
)
elif config.source == "device_monitor_2d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d,
MessageEndpoints.device_preview(self._config.device, self._config.signal),
)
logger.info(
f"Disconnecting preview 2d update Device Name:{self._config.device}, Device Entry:{self._config.signal}"
)
self._disconnect_monitor_subscription(
device=self._config.device,
signal=self._config.signal,
source=config.source,
async_update=self.async_update,
async_signal_name=config.async_signal_name,
)
# Reset async state
self.async_update = False
@@ -860,45 +893,19 @@ class Image(ImageBase):
logger.warning("Cannot disconnect monitor without both device and signal")
return
if self.async_update:
async_signal_name = config.async_signal_name or target_entry
ids_to_check = [self.scan_id, self.old_scan_id]
if config.source == "device_monitor_1d":
for scan_id in ids_to_check:
if scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d,
MessageEndpoints.device_async_signal(
scan_id, target_device, async_signal_name
),
)
elif config.source == "device_monitor_2d":
for scan_id in ids_to_check:
if scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d,
MessageEndpoints.device_async_signal(
scan_id, target_device, async_signal_name
),
)
else:
if config.source == "device_monitor_1d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d,
MessageEndpoints.device_preview(target_device, target_entry),
)
elif config.source == "device_monitor_2d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d,
MessageEndpoints.device_preview(target_device, target_entry),
)
else:
logger.warning(
f"Cannot disconnect monitor {target_device}.{target_entry} with source {self.subscriptions['main'].source}"
)
return
if config.source not in {"device_monitor_1d", "device_monitor_2d"}:
logger.warning(
f"Cannot disconnect monitor {target_device}.{target_entry} with source {self.subscriptions['main'].source}"
)
return
self._disconnect_monitor_subscription(
device=target_device,
signal=target_entry,
source=config.source,
async_update=self.async_update,
async_signal_name=config.async_signal_name,
)
self.subscriptions["main"].async_signal_name = None
self.async_update = False

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "3.2.0"
version = "3.2.4"
description = "BEC Widgets"
requires-python = ">=3.11"
classifiers = [

View File

@@ -75,6 +75,13 @@ def test_dock_manipulations_e2e(qtbot, connected_client_gui_obj):
w1 = dock_area.new("Waveform")
w2 = dock_area.new("Waveform")
qtbot.waitUntil(
lambda: all(
gui_id in gui._server_registry for gui_id in [w0._gui_id, w1._gui_id, w2._gui_id]
),
timeout=5000,
)
assert hasattr(gui.bec, "Waveform")
assert hasattr(gui.bec, "Waveform_0")
assert hasattr(gui.bec, "Waveform_1")
@@ -126,6 +133,7 @@ def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
xw = gui.new("X")
xw.delete_all()
qtbot.waitUntil(lambda: len(gui.windows) == 2, timeout=3000)
assert xw.__class__.__name__ == "RPCReference"
assert gui._ipython_registry[xw._gui_id].__class__.__name__ == "BECDockArea"
assert len(gui.windows) == 2
@@ -145,14 +153,15 @@ def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
qtbot.waitUntil(wait_for_gui_started, timeout=3000)
# gui.windows should have bec with gui_id 'bec'
qtbot.waitUntil(lambda: len(gui.windows) == 1, timeout=3000)
assert len(gui.windows) == 1
# communication should work, main dock area should have same id and be visible
yw = gui.new("Y")
qtbot.waitUntil(lambda: len(gui.windows) == 2, timeout=3000)
yw.delete_all()
qtbot.waitUntil(lambda: len(gui.windows) == 2, timeout=3000)
assert len(gui.windows) == 2
yw.remove()
qtbot.waitUntil(lambda: len(gui.windows) == 1, timeout=3000)
assert len(gui.windows) == 1
assert len(gui.windows) == 1 # only bec is left

View File

@@ -89,8 +89,8 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
# Skip private attributes
if object_name.startswith("_"):
continue
# Skip VSCode widget as Code server is not available in the Docker image
if object_name == "VSCodeEditor":
# Skip BECShell as ttyd is not installed
if object_name == "BECShell":
continue
# Skip WebConsole as ttyd is not installed

View File

@@ -128,167 +128,147 @@ def maybe_remove_dock_area(qtbot, gui: BECGuiClient, random_int_gen: random.Rand
random_int = random_int_gen.randint(0, 100)
if random_int >= 50:
# Needed, reference gets deleted in the gui
name = gui.dock_area.object_name
gui_id = gui.dock_area._gui_id
gui.dock_area.delete_all() # start fresh
gui.delete("dock_area")
wait_for_namespace_change(
qtbot, gui=gui, parent_widget=gui, object_name=name, widget_gui_id=gui_id, exists=False
)
qtbot.waitUntil(lambda: hasattr(gui, "dock_area") is False, timeout=5000)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the BECProgressBar widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.BECProgressBar)
widget: client.BECProgressBar
# Check rpc calls
assert widget.label_template == "$value / $maximum - $percentage %"
widget.set_maximum(100)
widget.set_minimum(50)
widget.set_value(75)
assert widget._get_label() == "75 / 100 - 50 %"
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_queue(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the BECQueue widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.BECQueue)
widget: client.BECQueue
# No rpc calls to test so far
# maybe we can add an rpc call to check the queue length
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_status_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the BECStatusBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.BECStatusBox)
# Check rpc calls
assert widget.get_server_state() in ["RUNNING", "IDLE", "BUSY", "ERROR"]
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_dap_combo_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the DAPComboBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.DapComboBox)
widget: client.DAPComboBox
# Check rpc calls
widget.select_fit_model("PseudoVoigtModel")
widget.select_x_axis("samx")
widget.select_y_axis("bpm4i")
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_device_browser(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the DeviceBrowser widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.DeviceBrowser)
widget: client.DeviceBrowser
# No rpc calls yet to check
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the Image widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.Image)
widget: client.Image
scans = bec.scans
dev = bec.device_manager.devices
# Test rpc calls
img = widget.image(device=dev.eiger.name, signal="preview")
assert img.get_data() is None
# Run a scan and plot the image
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Check that last image is equivalent to data in Redis
last_img = bec.connector.get_last(MessageEndpoints.device_preview("eiger", "preview"))[
"data"
].data
assert np.allclose(img.get_data(), last_img)
# Now add a device with a preview signal
img = widget.image(device="eiger", signal="preview")
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# TODO re-enable when issue is resolved #560
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_bec_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the BECProgressBar widget."""
# def test_widgets_e2e_log_panel(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the LogPanel widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.BECProgressBar)
# widget: client.BECProgressBar
# # Check rpc calls
# assert widget.label_template == "$value / $maximum - $percentage %"
# widget.set_maximum(100)
# widget.set_minimum(50)
# widget.set_value(75)
# assert widget._get_label() == "75 / 100 - 50 %"
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_bec_queue(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the BECQueue widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.BECQueue)
# widget: client.BECQueue
# # No rpc calls to test so far
# # maybe we can add an rpc call to check the queue length
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_bec_status_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the BECStatusBox widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.BECStatusBox)
# # Check rpc calls
# assert widget.get_server_state() in ["RUNNING", "IDLE", "BUSY", "ERROR"]
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_dap_combo_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the DAPComboBox widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.DapComboBox)
# widget: client.DAPComboBox
# # Check rpc calls
# widget.select_fit_model("PseudoVoigtModel")
# widget.select_x_axis("samx")
# widget.select_y_axis("bpm4i")
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_device_browser(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the DeviceBrowser widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.DeviceBrowser)
# widget: client.DeviceBrowser
# # No rpc calls yet to check
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the Image widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.Image)
# widget: client.Image
# scans = bec.scans
# dev = bec.device_manager.devices
# # Test rpc calls
# img = widget.image(device=dev.eiger.name, signal="preview")
# assert img.get_data() is None
# # Run a scan and plot the image
# s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
# s.wait()
# def _wait_for_scan_in_history():
# # Get scan item from history
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
# return scan_item is not None
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# # Check that last image is equivalent to data in Redis
# last_img = bec.connector.get_last(MessageEndpoints.device_preview("eiger", "preview"))[
# "data"
# ].data
# assert np.allclose(img.get_data(), last_img)
# # Now add a device with a preview signal
# img = widget.image(device="eiger", signal="preview")
# s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
# s.wait()
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # TODO re-enable when issue is resolved #560
# # @pytest.mark.timeout(PYTEST_TIMEOUT)
# # def test_widgets_e2e_log_panel(qtbot, connected_client_gui_obj, random_generator_from_seed):
# # """Test the LogPanel widget."""
# # gui = connected_client_gui_obj
# # bec = gui._client
# # # Create dock_area and widget
# # widget = create_widget(qtbot, gui, gui.available_widgets.LogPanel)
# # widget: client.LogPanel
# # # No rpc calls to check so far
# # # Test removing the widget, or leaving it open for the next test
# # maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_minesweeper(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the MineSweeper widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.Minesweeper)
# widget: client.MineSweeper
# widget = create_widget(qtbot, gui, gui.available_widgets.LogPanel)
# widget: client.LogPanel
# # No rpc calls to check so far
@@ -296,160 +276,175 @@ def maybe_remove_dock_area(qtbot, gui: BECGuiClient, random_int_gen: random.Rand
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_motor_map(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the MotorMap widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.MotorMap)
# widget: client.MotorMap
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_minesweeper(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the MineSweeper widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.Minesweeper)
widget: client.MineSweeper
# # Test RPC calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # Set motor map to names
# widget.map(dev.samx, dev.samy)
# # Move motor samx to pos
# pos = dev.samx.limits[1] - 1 # -1 from higher limit
# scans.mv(dev.samx, pos, relative=False).wait()
# # Check that data is up to date
# assert np.isclose(widget.get_data()["x"][-1], pos, dev.samx.precision)
# # Move motor samy to pos
# pos = dev.samy.limits[0] + 1 # +1 from lower limit
# scans.mv(dev.samy, pos, relative=False).wait()
# # Check that data is up to date
# assert np.isclose(widget.get_data()["y"][-1], pos, dev.samy.precision)
# No rpc calls to check so far
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_multi_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test MultiWaveform widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.MultiWaveform)
# widget: client.MultiWaveform
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_motor_map(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the MotorMap widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.MotorMap)
widget: client.MotorMap
# # Test RPC calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # test plotting
# cm = "cividis"
# widget.plot(dev.waveform, color_palette=cm)
# assert widget.monitor == dev.waveform.name
# assert widget.color_palette == cm
# Test RPC calls
dev = bec.device_manager.devices
scans = bec.scans
# Set motor map to names
widget.map(dev.samx, dev.samy)
# Move motor samx to pos
pos = dev.samx.limits[1] - 1 # -1 from higher limit
scans.mv(dev.samx, pos, relative=False).wait()
# Check that data is up to date
assert np.isclose(widget.get_data()["x"][-1], pos, dev.samx.precision)
# Move motor samy to pos
pos = dev.samy.limits[0] + 1 # +1 from lower limit
scans.mv(dev.samy, pos, relative=False).wait()
# Check that data is up to date
assert np.isclose(widget.get_data()["y"][-1], pos, dev.samy.precision)
# # Scan with BEC
# s = scans.line_scan(dev.samx, -3, 3, steps=5, exp_time=0.01, relative=False)
# s.wait()
# def _wait_for_scan_in_history():
# # Get scan item from history
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
# return scan_item is not None
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# # Wait for data in history (should be plotted?)
# # TODO how can we check that the data was plotted, implement get_data()
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_positioner_indicator(
# qtbot, connected_client_gui_obj, random_generator_from_seed
# ):
# """Test the PositionIndicator widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionIndicator)
# widget: client.PositionIndicator
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_multi_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test MultiWaveform widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.MultiWaveform)
widget: client.MultiWaveform
# # TODO check what these rpc calls are supposed to do! Issue created #461
# widget.set_value(5)
# Test RPC calls
dev = bec.device_manager.devices
scans = bec.scans
# test plotting
cm = "cividis"
widget.plot(dev.waveform, color_palette=cm)
assert widget.monitor == dev.waveform.name
assert widget.color_palette == cm
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# Scan with BEC
s = scans.line_scan(dev.samx, -3, 3, steps=5, exp_time=0.01, relative=False)
s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Wait for data in history (should be plotted?)
# TODO how can we check that the data was plotted, implement get_data()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_positioner_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the PositionerBox widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox)
# widget: client.PositionerBox
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_indicator(
qtbot, connected_client_gui_obj, random_generator_from_seed
):
"""Test the PositionIndicator widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.PositionIndicator)
widget: client.PositionIndicator
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # No rpc calls to check so far
# widget.set_positioner(dev.samx)
# widget.set_positioner(dev.samy.name)
# TODO check what these rpc calls are supposed to do! Issue created #461
widget.set_value(5)
# scans.mv(dev.samy, -3, relative=False).wait()
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_positioner_box_2d(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the PositionerBox2D widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox2D)
# widget: client.PositionerBox2D
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the PositionerBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox)
widget: client.PositionerBox
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # No rpc calls to check so far
# widget.set_positioner_hor(dev.samx)
# widget.set_positioner_ver(dev.samy)
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# No rpc calls to check so far
widget.set_positioner(dev.samx)
widget.set_positioner(dev.samy.name)
# # Try moving the motors
# scans.mv(dev.samx, 3, relative=False).wait()
# scans.mv(dev.samy, -3, relative=False).wait()
scans.mv(dev.samy, -3, relative=False).wait()
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_positioner_control_line(
# qtbot, connected_client_gui_obj, random_generator_from_seed
# ):
# """Test the positioner control line widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionerControlLine)
# widget: client.PositionerControlLine
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_box_2d(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the PositionerBox2D widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox2D)
widget: client.PositionerBox2D
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # Set positioner
# widget.set_positioner(dev.samx)
# scans.mv(dev.samx, 3, relative=False).wait()
# widget.set_positioner(dev.samy.name)
# scans.mv(dev.samy, -3, relative=False).wait()
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# No rpc calls to check so far
widget.set_positioner_hor(dev.samx)
widget.set_positioner_ver(dev.samy)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# Try moving the motors
scans.mv(dev.samx, 3, relative=False).wait()
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.repeat(20)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_control_line(
qtbot, connected_client_gui_obj, random_generator_from_seed
):
"""Test the positioner control line widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.PositionerControlLine)
widget: client.PositionerControlLine
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# Set positioner
widget.set_positioner(dev.samx)
scans.mv(dev.samx, 3, relative=False).wait()
widget.set_positioner(dev.samy.name)
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# TODO passes locally, fails on CI for some reason... -> issue #1003
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_ring_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the RingProgressBar widget"""
@@ -478,86 +473,86 @@ def test_widgets_e2e_ring_progress_bar(qtbot, connected_client_gui_obj, random_g
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_scan_control(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the ScanControl widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.ScanControl)
# widget: client.ScanControl
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_scan_control(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the ScanControl widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.ScanControl)
widget: client.ScanControl
# # No rpc calls to check so far
# No rpc calls to check so far
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_scatter_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the ScatterWaveform widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.ScatterWaveform)
# widget: client.ScatterWaveform
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_scatter_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the ScatterWaveform widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.ScatterWaveform)
widget: client.ScatterWaveform
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# widget.plot(dev.samx, dev.samy, dev.bpm4i)
# scans.grid_scan(dev.samx, -5, 5, 5, dev.samy, -5, 5, 5, exp_time=0.01, relative=False).wait()
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
widget.plot(dev.samx, dev.samy, dev.bpm4i)
scans.grid_scan(dev.samx, -5, 5, 5, dev.samy, -5, 5, 5, exp_time=0.01, relative=False).wait()
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_text_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the TextBox widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.TextBox)
# widget: client.TextBox
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_text_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the TextBox widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.TextBox)
widget: client.TextBox
# # RPC calls
# widget.set_plain_text("Hello World")
# widget.set_html_text("<b> Hello World HTML </b>")
# RPC calls
widget.set_plain_text("Hello World")
widget.set_html_text("<b> Hello World HTML </b>")
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the Waveform widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.Waveform)
# widget: client.Waveform
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the Waveform widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.Waveform)
widget: client.Waveform
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# widget.plot(dev.bpm4i)
# s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
# s.wait()
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
widget.plot(dev.bpm4i)
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
# def _wait_for_scan_in_history():
# # Get scan item from history
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
# return scan_item is not None
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
# samx_data = scan_item.devices.samx.samx.read()["value"]
# bpm4i_data = scan_item.devices.bpm4i.bpm4i.read()["value"]
# curve = widget.curves[0]
# assert np.allclose(curve.get_data()[0], samx_data)
# assert np.allclose(curve.get_data()[1], bpm4i_data)
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
samx_data = scan_item.devices.samx.samx.read()["value"]
bpm4i_data = scan_item.devices.bpm4i.bpm4i.read()["value"]
curve = widget.curves[0]
assert np.allclose(curve.get_data()[0], samx_data)
assert np.allclose(curve.get_data()[1], bpm4i_data)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)

View File

@@ -1,47 +1,18 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from math import inf
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, PropertyMock, patch
import fakeredis
import pytest
from bec_lib.bec_service import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import RedisConnector
from bec_lib.scan_history import ScanHistory
from bec_widgets.tests.utils import DEVICES, DMMock, FakePositioner, Positioner
def fake_redis_server(host, port, **kwargs):
redis = fakeredis.FakeRedis()
return redis
from bec_widgets.tests.utils import FakePositioner, Positioner
@pytest.fixture(scope="function")
def mocked_client(bec_dispatcher):
connector = RedisConnector("localhost:1", redis_cls=fake_redis_server)
# Create a MagicMock object
client = MagicMock() # TODO change to real BECClient
# Shutdown the original client
bec_dispatcher.client.shutdown()
# Mock the connector attribute
bec_dispatcher.client = client
# Mock the device_manager.devices attribute
client.connector = connector
client.device_manager = DMMock()
client.device_manager.add_devices(DEVICES)
def mock_mv(*args, relative=False):
# Extracting motor and value pairs
for i in range(0, len(args), 2):
motor = args[i]
value = args[i + 1]
motor.move(value, relative=relative)
return MagicMock(wait=MagicMock())
client.scans = MagicMock(mv=mock_mv)
# Ensure isinstance check for Positioner passes
original_isinstance = isinstance
@@ -52,8 +23,8 @@ def mocked_client(bec_dispatcher):
return original_isinstance(obj, class_info)
with patch("builtins.isinstance", new=isinstance_mock):
yield client
connector.shutdown() # TODO change to real BECClient
yield bec_dispatcher.client
bec_dispatcher.client.connector.shutdown()
##################################################
@@ -190,17 +161,16 @@ def mocked_client_with_dap(mocked_client, dap_plugin_message):
name="LmfitService1D", status=1, info={}
),
}
client = mocked_client
client.service_status = dap_services
client.connector.set(
type(mocked_client).service_status = PropertyMock(return_value=dap_services)
mocked_client.connector.set(
topic=MessageEndpoints.dap_available_plugins("dap"), msg=dap_plugin_message
)
# Patch the client's DAP attribute so that the available models include "GaussianModel"
patched_models = {"GaussianModel": {}, "LorentzModel": {}, "SineModel": {}}
client.dap._available_dap_plugins = patched_models
mocked_client.dap._available_dap_plugins = patched_models
yield client
yield mocked_client
class DummyData:
@@ -233,7 +203,6 @@ def create_dummy_scan_item():
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
}
}
dummy_scan.status_message = MagicMock()
dummy_scan.status_message.info = {
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
"scan_report_devices": ["samx"],

View File

@@ -1,19 +1,28 @@
import json
import time
from unittest import mock
from unittest.mock import patch
import fakeredis
import h5py
import numpy as np
import pytest
from bec_lib import messages
from bec_lib import messages, service_config
from bec_lib.bec_service import messages
from bec_lib.client import BECClient
from bec_lib.messages import _StoredDataInfo
from bec_qthemes import apply_theme
from bec_qthemes._theme import Theme
from ophyd._pyepics_shim import _dispatcher
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
from qtpy.QtCore import QEvent, QEventLoop
from qtpy.QtWidgets import QApplication, QMessageBox
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.tests.utils import DEVICES, DMMock
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
from bec_widgets.utils import error_popups
from bec_widgets.utils.bec_dispatcher import QtRedisConnector
# Patch to set default RAISE_ERROR_DEFAULT to True for tests
# This means that by default, error popups will raise exceptions during tests
@@ -38,15 +47,20 @@ def process_all_deferred_deletes(qapp):
def qapplication(qtbot, request, testable_qtimer_class): # pylint: disable=unused-argument
qapp = QApplication.instance()
process_all_deferred_deletes(qapp)
apply_theme("light")
qapp.processEvents()
if (
not hasattr(qapp, "theme")
or not isinstance(qapp.theme, Theme)
or qapp.theme.theme != "light"
):
apply_theme("light")
qapp.processEvents()
yield
# if the test failed, we don't want to check for open widgets as
# it simply pollutes the output
# stop pyepics dispatcher for leaking tests
from ophyd._pyepics_shim import _dispatcher
_dispatcher.stop()
if request.node.stash._storage.get("failed"):
@@ -71,9 +85,37 @@ def rpc_register():
RPCRegister.reset_singleton()
_REDIS_CONN: QtRedisConnector | None = None
def global_mock_qt_redis_connector(*_, **__):
global _REDIS_CONN
if _REDIS_CONN is None:
_REDIS_CONN = QtRedisConnector(bootstrap="localhost:1", redis_cls=fakeredis.FakeRedis)
return _REDIS_CONN
def mock_client(*_, **__):
with (
patch("bec_lib.client.DeviceManagerBase", DMMock),
patch("bec_lib.client.DAPPlugins"),
patch("bec_lib.client.Scans"),
patch("bec_lib.client.ScanManager"),
patch("bec_lib.bec_service.BECAccess"),
):
client = BECClient(
config=service_config.ServiceConfig(config={"redis": {"host": "localhost", "port": 1}}),
connector_cls=global_mock_qt_redis_connector,
)
client.start()
client.device_manager.add_devices(DEVICES)
return client
@pytest.fixture(autouse=True)
def bec_dispatcher(threads_check): # pylint: disable=unused-argument
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
with mock.patch.object(bec_dispatcher_module, "BECClient", mock_client):
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
yield bec_dispatcher
bec_dispatcher.disconnect_all()
# clean BEC client

View File

@@ -1,5 +1,7 @@
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
from unittest.mock import MagicMock
import pytest
from bec_widgets.widgets.control.buttons.button_abort.button_abort import AbortButton
@@ -10,6 +12,7 @@ from .client_mocks import mocked_client
@pytest.fixture
def abort_button(qtbot, mocked_client):
widget = AbortButton(client=mocked_client)
widget.queue = MagicMock()
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget

View File

@@ -4,10 +4,45 @@ import time
from unittest import mock
import pytest
from bec_lib import service_config
from bec_lib.messages import ScanMessage
from bec_lib.serialization import MsgpackSerialization
from bec_widgets.utils.bec_dispatcher import QtRedisConnector, QtThreadSafeCallback
from bec_widgets.utils.bec_dispatcher import BECDispatcher, QtRedisConnector, QtThreadSafeCallback
def test_init_handles_client_and_config_arg():
# Client passed
self_mock = mock.MagicMock(_initialized=False)
with mock.patch.object(BECDispatcher, "start_cli_server"):
BECDispatcher.__init__(self_mock, client=mock.MagicMock(name="test_client"))
assert "test_client" in repr(self_mock.client)
# No client, service config object
self_mock.reset_mock()
self_mock._initialized = False
with (
mock.patch.object(BECDispatcher, "start_cli_server"),
mock.patch("bec_widgets.utils.bec_dispatcher.BECClient") as client_cls,
):
config = service_config.ServiceConfig()
BECDispatcher.__init__(self_mock, client=None, config=config)
client_cls.assert_called_with(
config=config, connector_cls=QtRedisConnector, name="BECWidgets"
)
# No client, service config string
self_mock.reset_mock()
self_mock._initialized = False
with (
mock.patch.object(BECDispatcher, "start_cli_server"),
mock.patch("bec_widgets.utils.bec_dispatcher.BECClient"),
mock.patch("bec_widgets.utils.bec_dispatcher.ServiceConfig") as svc_cfg,
mock.patch("bec_widgets.utils.bec_dispatcher.isinstance", return_value=False),
):
config = service_config.ServiceConfig()
BECDispatcher.__init__(self_mock, client=None, config="test_str")
svc_cfg.assert_called_with("test_str")
@pytest.fixture

View File

@@ -160,7 +160,7 @@ def test_signal_display(mocked_client, qtbot):
def test_signal_display_no_device(mocked_client, qtbot):
device_mock = mock.MagicMock()
mocked_client.client.device_manager.devices = {"test_device_1": device_mock}
mocked_client.device_manager.devices = {"test_device_1": device_mock}
signal_display = SignalDisplay(client=mocked_client, device="test_device_2")
qtbot.addWidget(signal_display)
assert (

View File

@@ -146,12 +146,12 @@ def test_signal_lineedit(device_signal_line_edit):
def test_device_signal_input_base_cleanup(qtbot, mocked_client):
with mock.patch.object(mocked_client.callbacks, "remove"):
widget = DeviceInputWidget(client=mocked_client)
widget.close()
widget.deleteLater()
widget = DeviceInputWidget(client=mocked_client)
widget.close()
widget.deleteLater()
mocked_client.callbacks.remove.assert_called_once_with(widget._device_update_register)
mocked_client.callbacks.remove.assert_called_once_with(widget._device_update_register)
def test_signal_combobox_get_signal_name_with_item_data(qtbot, device_signal_combobox):

View File

@@ -197,6 +197,163 @@ def test_image_setup_preview_signal_2d(qtbot, mocked_client):
np.testing.assert_array_equal(view.main_image.image, test_data)
def test_switching_device_disconnects_previous_preview_endpoint(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img", signal_class="PreviewSignal", ndim=2)
_set_signal_config(mocked_client, "waveform1d", "img", signal_class="PreviewSignal", ndim=2)
connected = []
disconnected = []
monkeypatch.setattr(
view.bec_dispatcher,
"connect_slot",
lambda slot, endpoint, *args, **kwargs: connected.append(endpoint),
)
monkeypatch.setattr(
view.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint, *args, **kwargs: disconnected.append(endpoint),
)
view.image(device="eiger", signal="img")
connected.clear()
disconnected.clear()
view.device = "waveform1d"
assert MessageEndpoints.device_preview("eiger", "img") in disconnected
assert MessageEndpoints.device_preview("waveform1d", "img") in connected
def test_switching_device_disconnects_previous_async_endpoint(qtbot, mocked_client, monkeypatch):
"""
Verify that switching device while async_update=True disconnects device_async_signal
endpoints for both scan_id and old_scan_id on the old device before reconnecting to
the new device.
"""
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(
mocked_client, "eiger", "img", signal_class="AsyncSignal", ndim=2, obj_name="async_obj"
)
_set_signal_config(
mocked_client, "waveform1d", "img", signal_class="AsyncSignal", ndim=2, obj_name="async_obj"
)
connected = []
disconnected = []
monkeypatch.setattr(
view.bec_dispatcher,
"connect_slot",
lambda slot, endpoint, *args, **kwargs: connected.append(endpoint),
)
monkeypatch.setattr(
view.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint, *args, **kwargs: disconnected.append(endpoint),
)
view.image(device="eiger", signal="img")
assert view.async_update is True
assert view.subscriptions["main"].async_signal_name == "async_obj"
view.scan_id = "scan_current"
view.old_scan_id = "scan_previous"
connected.clear()
disconnected.clear()
view.device = "waveform1d"
# Both scan_id and old_scan_id endpoints for the old device must be disconnected
assert (
MessageEndpoints.device_async_signal("scan_current", "eiger", "async_obj") in disconnected
)
assert (
MessageEndpoints.device_async_signal("scan_previous", "eiger", "async_obj") in disconnected
)
# The new device's async endpoint for the current scan must be connected
assert (
MessageEndpoints.device_async_signal("scan_current", "waveform1d", "async_obj") in connected
)
def test_switching_signal_disconnects_previous_preview_endpoint(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img_a", signal_class="PreviewSignal", ndim=2)
_set_signal_config(mocked_client, "eiger", "img_b", signal_class="PreviewSignal", ndim=2)
connected = []
disconnected = []
monkeypatch.setattr(
view.bec_dispatcher,
"connect_slot",
lambda slot, endpoint, *args, **kwargs: connected.append(endpoint),
)
monkeypatch.setattr(
view.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint, *args, **kwargs: disconnected.append(endpoint),
)
view.image(device="eiger", signal="img_a")
connected.clear()
disconnected.clear()
view.signal = "img_b"
assert MessageEndpoints.device_preview("eiger", "img_a") in disconnected
assert MessageEndpoints.device_preview("eiger", "img_b") in connected
def test_switching_signal_disconnects_previous_async_endpoint(qtbot, mocked_client, monkeypatch):
"""
When the current monitor is an async signal, switching to a different signal must
disconnect the previous async endpoint (based on scan_id/async_signal_name) before
reconnecting with the new signal's async endpoint.
"""
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(
mocked_client, "eiger", "img_a", signal_class="AsyncSignal", ndim=2, obj_name="async_obj_a"
)
_set_signal_config(
mocked_client, "eiger", "img_b", signal_class="AsyncSignal", ndim=2, obj_name="async_obj_b"
)
connected = []
disconnected = []
monkeypatch.setattr(
view.bec_dispatcher,
"connect_slot",
lambda slot, endpoint, *args, **kwargs: connected.append(endpoint),
)
monkeypatch.setattr(
view.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint, *args, **kwargs: disconnected.append(endpoint),
)
# Connect to img_a as an async signal; scan_id is None so no actual subscription is made
view.image(device="eiger", signal="img_a")
assert view.async_update is True
assert view.subscriptions["main"].async_signal_name == "async_obj_a"
assert view.subscriptions["main"].source == "device_monitor_2d"
# Simulate an active scan so that the async endpoint is real
view.scan_id = "scan_123"
connected.clear()
disconnected.clear()
# Switch to a different signal
view.signal = "img_b"
# The previous async endpoint for img_a must have been disconnected
expected_disconnect = MessageEndpoints.device_async_signal("scan_123", "eiger", "async_obj_a")
assert expected_disconnect in disconnected
# The new async endpoint for img_b must have been connected
expected_connect = MessageEndpoints.device_async_signal("scan_123", "eiger", "async_obj_b")
assert expected_connect in connected
def test_preview_signals_skip_0d_entries(qtbot, mocked_client, monkeypatch):
"""
Preview/async combobox should omit 0D signals.

View File

@@ -1,4 +1,4 @@
from unittest.mock import patch
from unittest.mock import MagicMock, patch
import numpy as np
@@ -53,14 +53,16 @@ def test_scatter_waveform_update_with_scan_history(qtbot, mocked_client, monkeyp
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
dummy_scan = create_dummy_scan_item()
mocked_client.history = MagicMock()
# .get_by_scan_id() typically returns historical data, but we abuse it here
# to return mock live data
mocked_client.history.get_by_scan_id.return_value = dummy_scan
mocked_client.history.__getitem__.return_value = dummy_scan
swf.plot("samx", "samy", "bpm4i", label="test_curve")
swf.update_with_scan_history(scan_id="dummy")
qtbot.wait(500)
assert swf.scan_item == dummy_scan
qtbot.waitUntil(lambda: swf.scan_item == dummy_scan, timeout=500)
qtbot.wait(200)
x_data, y_data = swf.main_curve.getData()
np.testing.assert_array_equal(x_data, [10, 20, 30])

View File

@@ -8,7 +8,7 @@ from .client_mocks import mocked_client
@pytest.fixture
def plot_widget_with_arrow_item(qtbot, mocked_client):
widget = Waveform(client=mocked_client())
widget = Waveform(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
@@ -17,7 +17,7 @@ def plot_widget_with_arrow_item(qtbot, mocked_client):
@pytest.fixture
def plot_widget_with_tick_item(qtbot, mocked_client):
widget = Waveform(client=mocked_client())
widget = Waveform(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)

View File

@@ -189,10 +189,10 @@ def test_bec_shell_startup_contains_gui_id(bec_shell_widget):
assert bec_shell._is_bec_shell
assert bec_shell._unique_id == "bec_shell"
assert bec_shell.startup_cmd == "bec --nogui"
with mock.patch.object(bec_shell.bec_dispatcher, "cli_server", None):
assert bec_shell.startup_cmd == "bec --nogui"
with mock.patch.object(bec_shell.bec_dispatcher, "cli_server") as mock_cli_server:
mock_cli_server.gui_id = "test_gui_id"
with mock.patch.object(bec_shell.bec_dispatcher.cli_server, "gui_id", "test_gui_id"):
assert bec_shell.startup_cmd == "bec --gui-id test_gui_id"