1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-08 01:37:53 +02:00

Compare commits

..

46 Commits

Author SHA1 Message Date
de5a58a63b feat: add messaging config view 2026-03-20 14:49:16 +01:00
semantic-release
ad01011a3e 3.3.0
Automatically generated by python-semantic-release
2026-03-20 13:18:49 +00:00
d4ecefd80a fix: Fix black 2026 formatting 2026-03-20 14:17:41 +01:00
d4afcb6832 refactor(fuzzy-search): unify is_match for fuzzy search 2026-03-20 14:17:41 +01:00
2b0f575733 refactor(atlas-http-service): Rename AtlasEndpoints 2026-03-20 14:17:41 +01:00
0c6f3f8352 fix(admin-widget): Cleanup and minor improvements 2026-03-20 14:17:41 +01:00
48c9c83bb0 fix(admin_view): minor changes
fixed the styling and avoid shadowing styles
minor rewording
2026-03-20 14:17:41 +01:00
ab223d5fdc refactor: fix formatting, running black 2026.1 2026-03-20 14:17:41 +01:00
137e572a94 fix(admin-view): generate RPC interface for AdminView 2026-03-20 14:17:41 +01:00
b14b046882 fix(main-app): fix id for main-app init of AdminView 2026-03-20 14:17:41 +01:00
a7a9458180 refactor: address review comments 2026-03-20 14:17:41 +01:00
23c146b3e6 fix(bec-atlas-admin-view): Fix connect_slot for dispatcher 2026-03-20 14:17:41 +01:00
df44d9b50e test(bec-atlas-admin-view): complement tests for BECAtlasAdminView, ExperimentSelection, BECAtlasHTTPService 2026-03-20 14:17:41 +01:00
de941d1bc5 fix(actions): allow minimum icon size for actions in toolbar 2026-03-20 14:17:41 +01:00
34e80ee8f9 test(bec-atlas-http-service): add tests for http service 2026-03-20 14:17:41 +01:00
d1a1d85abd fix(login-dialog): remove login_dialog 2026-03-20 14:17:41 +01:00
8e53ae2d39 fix(RPC): fix rpc access 2026-03-20 14:17:41 +01:00
889e9c0994 fix(pyproject): add PyJWT as dependency 2026-03-20 14:17:41 +01:00
f565deb71d fix(main-app): skip on_enter/exit hooks if darkmodebutton clicked 2026-03-20 14:17:41 +01:00
895b318990 refactor: cleanup widgets 2026-03-20 14:17:41 +01:00
3a17a249ed refactor(admin-view): Refactor experiment selection, http service, admin view, and add main view 2026-03-20 14:17:41 +01:00
598c453a18 feat(experiment-selection): add experiment selection widget 2026-03-20 14:17:41 +01:00
63059a4ef8 feat(admin-view): add admin view to views 2026-03-20 14:17:41 +01:00
ec58fbd6d8 feat(bec-atlas-admin-view): Add initial admin view 2026-03-20 14:17:41 +01:00
17708730fc feat(bec-atlas-admin-view): add http service through QNetworkAccessManager 2026-03-20 14:17:41 +01:00
1384a329ab feat(bec-atlas-admin-view): Add login dilaog 2026-03-20 14:17:41 +01:00
semantic-release
da1dc85b44 3.2.4
Automatically generated by python-semantic-release
2026-03-19 17:23:19 +00:00
28be696f7c fix(main_app): setApplicationName("BEC") 2026-03-19 18:22:29 +01:00
semantic-release
008c3a223a 3.2.3
Automatically generated by python-semantic-release
2026-03-16 15:07:09 +00:00
b9145d762c fix: check adding parent for filesystemmodel 2026-03-16 16:06:22 +01:00
37a5dc2e9e fix: refactor client mock with global fakeredis 2026-03-16 16:06:22 +01:00
1351fcd47b ci: fix path for uploading logs on failure 2026-03-16 15:49:22 +01:00
semantic-release
14a6b04b11 3.2.2
Automatically generated by python-semantic-release
2026-03-16 14:28:24 +00:00
4c9d7fddce fix(image): disconnecting of 2d monitor 2026-03-16 15:26:40 +01:00
semantic-release
39ecb89196 3.2.1
Automatically generated by python-semantic-release
2026-03-16 14:08:42 +00:00
974f25997d fix(e2e): bec shell excluded from e2e testing 2026-03-16 15:07:51 +01:00
e061fa31a9 fix(e2e): bec dock rpc fixed synchronization 2026-03-16 15:07:51 +01:00
718f99527c fix(e2e): timeout for maybe_remove_dock_area 2026-03-16 15:07:51 +01:00
semantic-release
bd5aafc052 3.2.0
Automatically generated by python-semantic-release
2026-03-11 20:52:57 +00:00
b4f6f5aa8b feat(waveform): composite DAP with multiple models 2026-03-11 21:52:10 +01:00
14d51b8016 feat(curve, waveform): add dap_parameters for lmfit customization in DAP requests 2026-03-11 21:52:10 +01:00
semantic-release
e94554b471 3.1.4
Automatically generated by python-semantic-release
2026-03-11 11:58:34 +00:00
7e0e391888 build: increased minimal version of bec and bec qthemes 2026-03-11 12:57:40 +01:00
53e5ec42b8 fix(profile_utils): renamed to fetch widgets settings 2026-03-11 12:57:40 +01:00
semantic-release
0e49828a23 3.1.3
Automatically generated by python-semantic-release
2026-03-09 08:46:29 +00:00
278d8de058 fix(monaco_dock): optimization, removal of QTimer, eventFilter replaced by signal/slot 2026-03-09 09:45:40 +01:00
45 changed files with 4443 additions and 277 deletions

View File

@@ -55,5 +55,5 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: pytest-logs
path: ./logs/*.log
path: ./bec/logs/*.log
retention-days: 7

View File

@@ -1,6 +1,168 @@
# CHANGELOG
## v3.3.0 (2026-03-20)
### Bug Fixes
- Fix black 2026 formatting
([`d4ecefd`](https://github.com/bec-project/bec_widgets/commit/d4ecefd80a6ab944b4da51c1ee35e5dea67770f2))
- **actions**: Allow minimum icon size for actions in toolbar
([`de941d1`](https://github.com/bec-project/bec_widgets/commit/de941d1bc565e444f84696b1de046d50b62f3c1b))
- **admin-view**: Generate RPC interface for AdminView
([`137e572`](https://github.com/bec-project/bec_widgets/commit/137e572a942281a9c7478a6be83f815848917e26))
- **admin-widget**: Cleanup and minor improvements
([`0c6f3f8`](https://github.com/bec-project/bec_widgets/commit/0c6f3f8352e7318a4f0579c83066b5b433fd1144))
- **admin_view**: Minor changes
([`48c9c83`](https://github.com/bec-project/bec_widgets/commit/48c9c83bb0c9432905d347b2d2cf46c05e58c098))
- **bec-atlas-admin-view**: Fix connect_slot for dispatcher
([`23c146b`](https://github.com/bec-project/bec_widgets/commit/23c146b3e6bbbabfb35f1892bc8653a65652ae6a))
- **login-dialog**: Remove login_dialog
([`d1a1d85`](https://github.com/bec-project/bec_widgets/commit/d1a1d85abd3331ebab696580c692c69b71482f37))
- **main-app**: Fix id for main-app init of AdminView
([`b14b046`](https://github.com/bec-project/bec_widgets/commit/b14b04688284eb875ea4469765786834e74fceb3))
- **main-app**: Skip on_enter/exit hooks if darkmodebutton clicked
([`f565deb`](https://github.com/bec-project/bec_widgets/commit/f565deb71db8fa5206fa2b4eea436e5055030bbc))
- **pyproject**: Add PyJWT as dependency
([`889e9c0`](https://github.com/bec-project/bec_widgets/commit/889e9c0994a960b93c93143b6dc5845dc96f9f96))
- **RPC**: Fix rpc access
([`8e53ae2`](https://github.com/bec-project/bec_widgets/commit/8e53ae2d3938e9c0a4c11082300156994447faaf))
### Features
- **admin-view**: Add admin view to views
([`63059a4`](https://github.com/bec-project/bec_widgets/commit/63059a4ef897a919f296c68ada066e0b228f8248))
- **bec-atlas-admin-view**: Add http service through QNetworkAccessManager
([`1770873`](https://github.com/bec-project/bec_widgets/commit/17708730fcff41713638c17d0cc1f5d9d0b75122))
- **bec-atlas-admin-view**: Add initial admin view
([`ec58fbd`](https://github.com/bec-project/bec_widgets/commit/ec58fbd6d859058f518b88ba15670a3a715c3cc3))
- **bec-atlas-admin-view**: Add login dilaog
([`1384a32`](https://github.com/bec-project/bec_widgets/commit/1384a329abf873b5496e540a542088c7f13b7270))
- **experiment-selection**: Add experiment selection widget
([`598c453`](https://github.com/bec-project/bec_widgets/commit/598c453a1876cebc2482d55bf6c2728ec247def0))
### Refactoring
- Address review comments
([`a7a9458`](https://github.com/bec-project/bec_widgets/commit/a7a9458180c18bf2bba652c2ff8a68875af36a22))
- Cleanup widgets
([`895b318`](https://github.com/bec-project/bec_widgets/commit/895b3189904778c269200365b264a32ff15dda21))
- Fix formatting, running black 2026.1
([`ab223d5`](https://github.com/bec-project/bec_widgets/commit/ab223d5fdc00b1a7bc9fd61abce5fabe4409654b))
- **admin-view**: Refactor experiment selection, http service, admin view, and add main view
([`3a17a24`](https://github.com/bec-project/bec_widgets/commit/3a17a249ed179fb8a11591f948c7b6338e10a60d))
- **atlas-http-service**: Rename AtlasEndpoints
([`2b0f575`](https://github.com/bec-project/bec_widgets/commit/2b0f575733412a96e54dff2dca15082d64caf7ee))
- **fuzzy-search**: Unify is_match for fuzzy search
([`d4afcb6`](https://github.com/bec-project/bec_widgets/commit/d4afcb68324f63ac8aea7cc3b2c82e79d2e643ca))
### Testing
- **bec-atlas-admin-view**: Complement tests for BECAtlasAdminView, ExperimentSelection,
BECAtlasHTTPService
([`df44d9b`](https://github.com/bec-project/bec_widgets/commit/df44d9b50eb289a7851579c64a2a8c0e2363b06a))
- **bec-atlas-http-service**: Add tests for http service
([`34e80ee`](https://github.com/bec-project/bec_widgets/commit/34e80ee8f9a2b2373c97ae7cde90690ab6fb37ce))
## v3.2.4 (2026-03-19)
### Bug Fixes
- **main_app**: Setapplicationname("bec")
([`28be696`](https://github.com/bec-project/bec_widgets/commit/28be696f7c7d9762c742c6d5fb5b03867d5e92ea))
## v3.2.3 (2026-03-16)
### Bug Fixes
- Check adding parent for filesystemmodel
([`b9145d7`](https://github.com/bec-project/bec_widgets/commit/b9145d762cdf946f184834928a6404f21b4802a9))
- Refactor client mock with global fakeredis
([`37a5dc2`](https://github.com/bec-project/bec_widgets/commit/37a5dc2e9eeb447d174f4d7087051672f308c84c))
### Continuous Integration
- Fix path for uploading logs on failure
([`1351fcd`](https://github.com/bec-project/bec_widgets/commit/1351fcd47b909c1a33cb389c096041eb1449e3d3))
## v3.2.2 (2026-03-16)
### Bug Fixes
- **image**: Disconnecting of 2d monitor
([`4c9d7fd`](https://github.com/bec-project/bec_widgets/commit/4c9d7fddce7aa5b7f13a00ac332bd54b301e3c28))
## v3.2.1 (2026-03-16)
### Bug Fixes
- **e2e**: Bec dock rpc fixed synchronization
([`e061fa3`](https://github.com/bec-project/bec_widgets/commit/e061fa31a9a5e5c00e44337d7cc52c51d8e259b5))
- **e2e**: Bec shell excluded from e2e testing
([`974f259`](https://github.com/bec-project/bec_widgets/commit/974f25997d68d13ff1063026f9e5c4c8dd4d49f3))
- **e2e**: Timeout for maybe_remove_dock_area
([`718f995`](https://github.com/bec-project/bec_widgets/commit/718f99527c3bebb96845d3305aba69434eb83f77))
## v3.2.0 (2026-03-11)
### Features
- **curve, waveform**: Add dap_parameters for lmfit customization in DAP requests
([`14d51b8`](https://github.com/bec-project/bec_widgets/commit/14d51b80169f5a060dd24287f3a6db9a4b41275a))
- **waveform**: Composite DAP with multiple models
([`b4f6f5a`](https://github.com/bec-project/bec_widgets/commit/b4f6f5aa8bcd0f6091610e8f839ea265c87575e0))
## v3.1.4 (2026-03-11)
### Bug Fixes
- **profile_utils**: Renamed to fetch widgets settings
([`53e5ec4`](https://github.com/bec-project/bec_widgets/commit/53e5ec42b8b33397af777f418fbd8601628226a6))
### Build System
- Increased minimal version of bec and bec qthemes
([`7e0e391`](https://github.com/bec-project/bec_widgets/commit/7e0e391888f2ee4e8528ccb3938e36da4c32f146))
## v3.1.3 (2026-03-09)
### Bug Fixes
- **monaco_dock**: Optimization, removal of QTimer, eventFilter replaced by signal/slot
([`278d8de`](https://github.com/bec-project/bec_widgets/commit/278d8de058c2f5c6c9aa7317e1026651d7a4acd3))
## v3.1.2 (2026-03-06)
### Bug Fixes

View File

@@ -5,6 +5,7 @@ from qtpy.QtWidgets import QApplication, QHBoxLayout, QStackedWidget, QWidget
from bec_widgets.applications.navigation_centre.reveal_animator import ANIMATION_DURATION
from bec_widgets.applications.navigation_centre.side_bar import SideBar
from bec_widgets.applications.navigation_centre.side_bar_components import NavigationItem
from bec_widgets.applications.views.admin_view.admin_view import AdminView
from bec_widgets.applications.views.developer_view.developer_view import DeveloperView
from bec_widgets.applications.views.device_manager_view.device_manager_view import DeviceManagerView
from bec_widgets.applications.views.dock_area_view.dock_area_view import DockAreaView
@@ -63,6 +64,8 @@ class BECMainApp(BECMainWindow):
self.dock_area = DockAreaView(self)
self.device_manager = DeviceManagerView(self)
# self.developer_view = DeveloperView(self) #TODO temporary disable until the bugs with BECShell are resolved
self.admin_view = AdminView(self)
self.add_view(icon="widgets", title="Dock Area", widget=self.dock_area, mini_text="Docks")
self.add_view(
icon="display_settings",
@@ -78,6 +81,13 @@ class BECMainApp(BECMainWindow):
# mini_text="IDE",
# exclusive=True,
# )
self.add_view(
icon="admin_panel_settings",
title="Admin View",
widget=self.admin_view,
mini_text="Admin",
from_top=False,
)
if self._show_examples:
self.add_section("Examples", "examples")
@@ -181,6 +191,12 @@ class BECMainApp(BECMainWindow):
# Internal: route sidebar selection to the stack
def _on_view_selected(self, vid: str) -> None:
# Special handling for views that can not be switched to (e.g. dark mode toggle)
# Not registered as proper view with a stack index, so we ignore any logic below
# as it will anyways not result in a stack switch.
idx = self._view_index.get(vid)
if idx is None or not (0 <= idx < self.stack.count()):
return
# Determine current view
current_index = self.stack.currentIndex()
current_view = (
@@ -378,6 +394,7 @@ def main(): # pragma: no cover
args, qt_args = parser.parse_known_args(sys.argv[1:])
app = QApplication([sys.argv[0], *qt_args])
app.setApplicationName("BEC")
apply_theme("dark")
w = BECMainApp(show_examples=args.examples)

View File

@@ -0,0 +1,35 @@
"""Module for Admin View."""
from qtpy.QtWidgets import QWidget
from bec_widgets.applications.views.view import ViewBase
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.services.bec_atlas_admin_view.bec_atlas_admin_view import BECAtlasAdminView
class AdminView(ViewBase):
"""
A view for administrators to change the current active experiment, manage messaging
services, and more tasks reserved for users with admin privileges.
"""
def __init__(
self,
parent: QWidget | None = None,
content: QWidget | None = None,
*,
view_id: str | None = None,
title: str | None = None,
**kwargs,
):
super().__init__(parent=parent, content=content, view_id=view_id, title=title)
self.admin_widget = BECAtlasAdminView(parent=self)
self.set_content(self.admin_widget)
@SafeSlot()
def on_exit(self) -> None:
"""Called before the view is hidden.
Default implementation does nothing. Override in subclasses.
"""
self.admin_widget.logout()

View File

@@ -89,6 +89,16 @@ except ImportError as e:
logger.error(f"Failed loading plugins: \n{reduce(add, traceback.format_exception(e))}")
class AdminView(RPCBase):
"""A view for administrators to change the current active experiment, manage messaging"""
@rpc_call
def activate(self) -> "None":
"""
Switch the parent application to this view.
"""
class AutoUpdates(RPCBase):
@property
@rpc_call
@@ -6249,7 +6259,8 @@ class Waveform(RPCBase):
signal_y: "str | None" = None,
color: "str | None" = None,
label: "str | None" = None,
dap: "str | None" = None,
dap: "str | list[str] | None" = None,
dap_parameters: "dict | list | lmfit.Parameters | None | object" = None,
scan_id: "str | None" = None,
scan_number: "int | None" = None,
**kwargs,
@@ -6271,9 +6282,14 @@ class Waveform(RPCBase):
signal_y(str): The name of the entry for the y-axis.
color(str): The color of the curve.
label(str): The label of the curve.
dap(str): The dap model to use for the curve. When provided, a DAP curve is
dap(str | list[str]): The dap model to use for the curve. When provided, a DAP curve is
attached automatically for device, history, or custom data sources. Use
the same string as the LMFit model name.
the same string as the LMFit model name, or a list of model names to build a composite.
dap_parameters(dict | list | lmfit.Parameters | None): Optional lmfit parameter overrides sent to
the DAP server. For a single model: values can be numeric (interpreted as fixed parameters)
or dicts like `{"value": 1.0, "vary": False}`. For composite models (dap is list), use either
a list aligned to the model list (each item is a param dict), or a dict of
`{ "ModelName": { "param": {...} } }` when model names are unique.
scan_id(str): Optional scan ID. When provided, the curve is treated as a **history** curve and
the ydata (and optional xdata) are fetched from that historical scan. Such curves are
never cleared by livescan resets.
@@ -6287,9 +6303,10 @@ class Waveform(RPCBase):
def add_dap_curve(
self,
device_label: "str",
dap_name: "str",
dap_name: "str | list[str]",
color: "str | None" = None,
dap_oversample: "int" = 1,
dap_parameters: "dict | list | lmfit.Parameters | None" = None,
**kwargs,
) -> "Curve":
"""
@@ -6299,9 +6316,11 @@ class Waveform(RPCBase):
Args:
device_label(str): The label of the source curve to add DAP to.
dap_name(str): The name of the DAP model to use.
dap_name(str | list[str]): The name of the DAP model to use, or a list of model
names to build a composite model.
color(str): The color of the curve.
dap_oversample(int): The oversampling factor for the DAP curve.
dap_parameters(dict | list | lmfit.Parameters | None): Optional lmfit parameter overrides sent to the DAP server.
**kwargs
Returns:

View File

@@ -1,6 +1,7 @@
# pylint: skip-file
from unittest.mock import MagicMock
from bec_lib.config_helper import ConfigHelper
from bec_lib.device import Device as BECDevice
from bec_lib.device import Positioner as BECPositioner
from bec_lib.device import ReadoutPriority
@@ -219,7 +220,9 @@ class Device(FakeDevice):
class DMMock:
def __init__(self):
def __init__(self, *args, **kwargs):
self._service = args[0]
self.config_helper = ConfigHelper(self._service.connector, self._service._service_name)
self.devices = DeviceContainer()
self.enabled_devices = [device for device in self.devices if device.enabled]
@@ -273,6 +276,10 @@ class DMMock:
configs.append(device._config)
return configs
def initialize(*_): ...
def shutdown(self): ...
DEVICES = [
FakePositioner("samx", limits=[-10, 10], read_value=2.0),

View File

@@ -123,17 +123,16 @@ class BECDispatcher:
self._registered_slots: DefaultDict[Hashable, QtThreadSafeCallback] = (
collections.defaultdict()
)
self.client = client
if self.client is None:
if config is not None:
if not isinstance(config, ServiceConfig):
# config is supposed to be a path
config = ServiceConfig(config)
if client is None:
if config is not None and not isinstance(config, ServiceConfig):
# config is supposed to be a path
config = ServiceConfig(config)
self.client = BECClient(
config=config, connector_cls=QtRedisConnector, name="BECWidgets"
)
else:
self.client = client
if self.client.started:
# have to reinitialize client to use proper connector
logger.info("Shutting down BECClient to switch to QtRedisConnector")

View File

@@ -0,0 +1,35 @@
"""Module providing fuzzy search utilities for the BEC widgets."""
from __future__ import annotations
from typing import Any
from thefuzz import fuzz
FUZZY_SEARCH_THRESHOLD = 80
def is_match(
text: str, row_data: dict[str, Any], relevant_keys: list[str], enable_fuzzy: bool
) -> bool:
"""
Check if the text matches any of the relevant keys in the row data.
Args:
text (str): The text to search for.
row_data (dict[str, Any]): The row data to search in.
relevant_keys (list[str]): The keys to consider for searching.
enable_fuzzy (bool): Whether to use fuzzy matching.
Returns:
bool: True if a match is found, False otherwise.
"""
for key in relevant_keys:
data = str(row_data.get(key, "") or "")
if enable_fuzzy:
match_ratio = fuzz.partial_ratio(text.lower(), data.lower())
if match_ratio >= FUZZY_SEARCH_THRESHOLD:
return True
else:
if text.lower() in data.lower():
return True
return False

View File

@@ -35,16 +35,19 @@ logger = bec_logger.logger
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
def create_action_with_text(toolbar_action, toolbar: QToolBar):
def create_action_with_text(toolbar_action, toolbar: QToolBar, min_size: QSize | None = None):
"""
Helper function to create a toolbar button with text beside or under the icon.
Args:
toolbar_action(ToolBarAction): The toolbar action to create the button for.
toolbar(ModularToolBar): The toolbar to add the button to.
min_size(QSize, optional): The minimum size for the button. Defaults to None.
"""
btn = QToolButton(parent=toolbar)
if min_size is not None:
btn.setMinimumSize(min_size)
if getattr(toolbar_action, "label_text", None):
toolbar_action.action.setText(toolbar_action.label_text)
if getattr(toolbar_action, "tooltip", None):

View File

@@ -115,12 +115,12 @@ def _settings_profiles_root() -> str:
str: Absolute path to the profiles root. The directory is created if missing.
"""
client = BECClient()
bec_widgets_settings = client._service_config.config.get("bec_widgets_settings")
bec_widgets_settings = client._service_config.config.get("widgets_settings")
bec_widgets_setting_path = (
bec_widgets_settings.get("base_path") if bec_widgets_settings else None
)
default_path = os.path.join(bec_widgets_setting_path, "profiles")
root = os.environ.get("BECWIDGETS_PROFILE_DIR", default_path)
root = os.path.expanduser(os.environ.get("BECWIDGETS_PROFILE_DIR", default_path))
os.makedirs(root, exist_ok=True)
return root
@@ -138,7 +138,7 @@ def _profiles_dir(segment: str, namespace: str | None) -> str:
"""
base = os.path.join(_settings_profiles_root(), segment)
ns = slugify.slugify(namespace, separator="_") if namespace else None
path = os.path.join(base, ns) if ns else base
path = os.path.expanduser(os.path.join(base, ns) if ns else base)
os.makedirs(path, exist_ok=True)
return path

View File

@@ -63,7 +63,7 @@ class ScriptTreeWidget(QWidget):
layout.setSpacing(0)
# Create tree view
self.tree = QTreeView()
self.tree = QTreeView(parent=self)
self.tree.setHeaderHidden(True)
self.tree.setRootIsDecorated(True)
@@ -71,12 +71,12 @@ class ScriptTreeWidget(QWidget):
self.tree.setMouseTracking(True)
# Create file system model
self.model = QFileSystemModel()
self.model = QFileSystemModel(parent=self)
self.model.setNameFilters(["*.py"])
self.model.setNameFilterDisables(False)
# Create proxy model to filter out underscore directories
self.proxy_model = QSortFilterProxyModel()
self.proxy_model = QSortFilterProxyModel(parent=self)
self.proxy_model.setFilterRegularExpression(QRegularExpression("^[^_].*"))
self.proxy_model.setSourceModel(self.model)
self.tree.setModel(self.proxy_model)

View File

@@ -5,9 +5,8 @@ in DeviceTableRow entries.
from __future__ import annotations
import traceback
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Callable, Iterable, Literal, Tuple
from typing import TYPE_CHECKING, Any, Callable, Iterable, Tuple
from bec_lib.atlas_models import Device as DeviceModel
from bec_lib.callback_handler import EventType
@@ -19,6 +18,7 @@ from thefuzz import fuzz
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.fuzzy_search import is_match
from bec_widgets.widgets.control.device_manager.components.device_table.device_table_row import (
DeviceTableRow,
)
@@ -37,34 +37,6 @@ _DeviceCfgIter = Iterable[dict[str, Any]]
# DeviceValidationResult: device_config, config_status, connection_status, error_message
_ValidationResultIter = Iterable[Tuple[dict[str, Any], ConfigStatus, ConnectionStatus, str]]
FUZZY_SEARCH_THRESHOLD = 80
def is_match(
text: str, row_data: dict[str, Any], relevant_keys: list[str], enable_fuzzy: bool
) -> bool:
"""
Check if the text matches any of the relevant keys in the row data.
Args:
text (str): The text to search for.
row_data (dict[str, Any]): The row data to search in.
relevant_keys (list[str]): The keys to consider for searching.
enable_fuzzy (bool): Whether to use fuzzy matching.
Returns:
bool: True if a match is found, False otherwise.
"""
for key in relevant_keys:
data = str(row_data.get(key, "") or "")
if enable_fuzzy:
match_ratio = fuzz.partial_ratio(text.lower(), data.lower())
if match_ratio >= FUZZY_SEARCH_THRESHOLD:
return True
else:
if text.lower() in data.lower():
return True
return False
class TableSortOnHold:
"""Context manager for putting table sorting on hold. Works with nested calls."""

View File

@@ -6,7 +6,7 @@ from typing import Any, cast
from bec_lib.logger import bec_logger
from bec_lib.macro_update_handler import has_executable_code
from qtpy.QtCore import QEvent, QTimer, Signal
from qtpy.QtCore import Signal
from qtpy.QtWidgets import QFileDialog, QMessageBox, QToolButton, QWidget
from bec_widgets.widgets.containers.dock_area.basic_dock_area import DockAreaWidget
@@ -36,12 +36,12 @@ class MonacoDock(DockAreaWidget):
**kwargs,
)
self.dock_manager.focusedDockWidgetChanged.connect(self._on_focus_event)
self.dock_manager.installEventFilter(self)
self._last_focused_editor: CDockWidget | None = None
self.focused_editor.connect(self._on_last_focused_editor_changed)
initial_editor = self.add_editor()
if isinstance(initial_editor, CDockWidget):
self.last_focused_editor = initial_editor
self._install_manager_scan_and_fix_guards()
def _create_editor_widget(self) -> MonacoWidget:
"""Create a configured Monaco editor widget."""
@@ -73,7 +73,8 @@ class MonacoDock(DockAreaWidget):
logger.info(f"Editor '{widget.current_file}' has unsaved changes: {widget.get_text()}")
self.save_enabled.emit(widget.modified)
def _update_tab_title_for_modification(self, dock: CDockWidget, modified: bool):
@staticmethod
def _update_tab_title_for_modification(dock: CDockWidget, modified: bool):
"""Update the tab title to show modification status with a dot indicator."""
current_title = dock.windowTitle()
@@ -98,14 +99,12 @@ class MonacoDock(DockAreaWidget):
return
active_sig = signatures[signature.get("activeSignature", 0)]
active_param = signature.get("activeParameter", 0) # TODO: Add highlight for active_param
# Get signature label and documentation
label = active_sig.get("label", "")
doc_obj = active_sig.get("documentation", {})
documentation = doc_obj.get("value", "") if isinstance(doc_obj, dict) else str(doc_obj)
# Format the markdown output
# Format the Markdown output
markdown = f"```python\n{label}\n```\n\n{documentation}"
self.signature_help.emit(markdown)
@@ -156,9 +155,10 @@ class MonacoDock(DockAreaWidget):
if self.last_focused_editor is dock:
self.last_focused_editor = None
# After topology changes, make sure single-tab areas get a plus button
QTimer.singleShot(0, self._scan_and_fix_areas)
self._scan_and_fix_areas()
def reset_widget(self, widget: MonacoWidget):
@staticmethod
def reset_widget(widget: MonacoWidget):
"""
Reset the given Monaco editor widget to its initial state.
@@ -193,23 +193,23 @@ class MonacoDock(DockAreaWidget):
# pylint: disable=protected-access
area._monaco_plus_btn = plus_btn
def _scan_and_fix_areas(self):
def _install_manager_scan_and_fix_guards(self) -> None:
"""
Track ADS structural changes to trigger scan and fix of dock areas for plus button injection.
"""
self.dock_manager.dockAreaCreated.connect(self._scan_and_fix_areas)
self.dock_manager.dockWidgetAdded.connect(self._scan_and_fix_areas)
self.dock_manager.stateRestored.connect(self._scan_and_fix_areas)
self.dock_manager.restoringState.connect(self._scan_and_fix_areas)
self.dock_manager.focusedDockWidgetChanged.connect(self._scan_and_fix_areas)
self._scan_and_fix_areas()
def _scan_and_fix_areas(self, *_arg):
# Find all dock areas under this manager and ensure each single-tab area has a plus button
areas = self.dock_manager.findChildren(CDockAreaWidget)
for a in areas:
self._ensure_area_plus(a)
def eventFilter(self, obj, event):
# Track dock manager events
if obj is self.dock_manager and event.type() in (
QEvent.Type.ChildAdded,
QEvent.Type.ChildRemoved,
QEvent.Type.LayoutRequest,
):
QTimer.singleShot(0, self._scan_and_fix_areas)
return super().eventFilter(obj, event)
def add_editor(
self, area: Any | None = None, title: str | None = None, tooltip: str | None = None
) -> CDockWidget:
@@ -258,7 +258,7 @@ class MonacoDock(DockAreaWidget):
if area_widget is not None:
self._ensure_area_plus(area_widget)
QTimer.singleShot(0, self._scan_and_fix_areas)
self._scan_and_fix_areas()
self.last_focused_editor = dock
return dock

View File

@@ -270,6 +270,16 @@ class Image(ImageBase):
return
old_device = self._config.device
old_signal = self._config.signal
old_config = self.subscriptions["main"]
if old_device and old_signal and old_device != value:
self._disconnect_monitor_subscription(
device=old_device,
signal=old_signal,
source=old_config.source,
async_update=self.async_update,
async_signal_name=old_config.async_signal_name,
)
self._config.device = value
# If we have a signal, reconnect with the new device
@@ -325,6 +335,16 @@ class Image(ImageBase):
self._set_connection_status("disconnected")
return
old_signal = self._config.signal
old_config = self.subscriptions["main"]
if self._config.device and old_signal and old_signal != value:
self._disconnect_monitor_subscription(
device=self._config.device,
signal=old_signal,
source=old_config.source,
async_update=self.async_update,
async_signal_name=old_config.async_signal_name,
)
self._config.signal = value
# If we have a device, try to connect
@@ -447,6 +467,61 @@ class Image(ImageBase):
)
self._autorange_on_next_update = True
def _disconnect_monitor_subscription(
self,
*,
device: str,
signal: str,
source: Literal["device_monitor_1d", "device_monitor_2d"] | None,
async_update: bool,
async_signal_name: str | None,
) -> None:
if not device or not signal:
return
if async_update:
async_signal_name = async_signal_name or signal
ids_to_check = [self.scan_id, self.old_scan_id]
if source == "device_monitor_1d":
for scan_id in ids_to_check:
if scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d,
MessageEndpoints.device_async_signal(scan_id, device, async_signal_name),
)
logger.info(
f"Disconnecting 1d update ScanID:{scan_id}, Device Name:{device},Device Entry:{async_signal_name}"
)
elif source == "device_monitor_2d":
for scan_id in ids_to_check:
if scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d,
MessageEndpoints.device_async_signal(scan_id, device, async_signal_name),
)
logger.info(
f"Disconnecting 2d update ScanID:{scan_id}, Device Name:{device},Device Entry:{async_signal_name}"
)
return
if source == "device_monitor_1d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d, MessageEndpoints.device_preview(device, signal)
)
logger.info(
f"Disconnecting preview 1d update Device Name:{device}, Device Entry:{signal}"
)
elif source == "device_monitor_2d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d, MessageEndpoints.device_preview(device, signal)
)
logger.info(
f"Disconnecting preview 2d update Device Name:{device}, Device Entry:{signal}"
)
def _disconnect_current_monitor(self):
"""
Internal method to disconnect the current monitor subscriptions.
@@ -455,55 +530,13 @@ class Image(ImageBase):
return
config = self.subscriptions["main"]
if self.async_update:
async_signal_name = config.async_signal_name or self._config.signal
ids_to_check = [self.scan_id, self.old_scan_id]
if config.source == "device_monitor_1d":
for scan_id in ids_to_check:
if scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d,
MessageEndpoints.device_async_signal(
scan_id, self._config.device, async_signal_name
),
)
logger.info(
f"Disconnecting 1d update ScanID:{scan_id}, Device Name:{self._config.device},Device Entry:{async_signal_name}"
)
elif config.source == "device_monitor_2d":
for scan_id in ids_to_check:
if scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d,
MessageEndpoints.device_async_signal(
scan_id, self._config.device, async_signal_name
),
)
logger.info(
f"Disconnecting 2d update ScanID:{scan_id}, Device Name:{self._config.device},Device Entry:{async_signal_name}"
)
else:
if config.source == "device_monitor_1d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d,
MessageEndpoints.device_preview(self._config.device, self._config.signal),
)
logger.info(
f"Disconnecting preview 1d update Device Name:{self._config.device}, Device Entry:{self._config.signal}"
)
elif config.source == "device_monitor_2d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d,
MessageEndpoints.device_preview(self._config.device, self._config.signal),
)
logger.info(
f"Disconnecting preview 2d update Device Name:{self._config.device}, Device Entry:{self._config.signal}"
)
self._disconnect_monitor_subscription(
device=self._config.device,
signal=self._config.signal,
source=config.source,
async_update=self.async_update,
async_signal_name=config.async_signal_name,
)
# Reset async state
self.async_update = False
@@ -860,45 +893,19 @@ class Image(ImageBase):
logger.warning("Cannot disconnect monitor without both device and signal")
return
if self.async_update:
async_signal_name = config.async_signal_name or target_entry
ids_to_check = [self.scan_id, self.old_scan_id]
if config.source == "device_monitor_1d":
for scan_id in ids_to_check:
if scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d,
MessageEndpoints.device_async_signal(
scan_id, target_device, async_signal_name
),
)
elif config.source == "device_monitor_2d":
for scan_id in ids_to_check:
if scan_id is None:
continue
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d,
MessageEndpoints.device_async_signal(
scan_id, target_device, async_signal_name
),
)
else:
if config.source == "device_monitor_1d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_1d,
MessageEndpoints.device_preview(target_device, target_entry),
)
elif config.source == "device_monitor_2d":
self.bec_dispatcher.disconnect_slot(
self.on_image_update_2d,
MessageEndpoints.device_preview(target_device, target_entry),
)
else:
logger.warning(
f"Cannot disconnect monitor {target_device}.{target_entry} with source {self.subscriptions['main'].source}"
)
return
if config.source not in {"device_monitor_1d", "device_monitor_2d"}:
logger.warning(
f"Cannot disconnect monitor {target_device}.{target_entry} with source {self.subscriptions['main'].source}"
)
return
self._disconnect_monitor_subscription(
device=target_device,
signal=target_entry,
source=config.source,
async_update=self.async_update,
async_signal_name=config.async_signal_name,
)
self.subscriptions["main"].async_signal_name = None
self.async_update = False

View File

@@ -22,8 +22,9 @@ class DeviceSignal(BaseModel):
device: str
signal: str
dap: str | None = None
dap: str | list[str] | None = None
dap_oversample: int = 1
dap_parameters: dict | list | None = None
model_config: dict = {"validate_assignment": True}

View File

@@ -1,13 +1,13 @@
from __future__ import annotations
import json
from typing import Literal
from typing import TYPE_CHECKING, Literal
import lmfit
import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.lmfit_serializer import serialize_lmfit_params, serialize_param_object
from bec_lib.scan_data_container import ScanDataContainer
from pydantic import Field, ValidationError, field_validator
from qtpy.QtCore import Qt, QTimer, Signal
@@ -41,6 +41,18 @@ from bec_widgets.widgets.services.scan_history_browser.scan_history_browser impo
)
logger = bec_logger.logger
_DAP_PARAM = object()
if TYPE_CHECKING: # pragma: no cover
import lmfit # type: ignore
else:
try:
import lmfit # type: ignore
except Exception as e: # pragma: no cover
logger.warning(
f"lmfit could not be imported: {e}. Custom DAP functionality will be unavailable."
)
lmfit = None
# noinspection PyDataclass
@@ -696,7 +708,8 @@ class Waveform(PlotBase):
signal_y: str | None = None,
color: str | None = None,
label: str | None = None,
dap: str | None = None,
dap: str | list[str] | None = None,
dap_parameters: dict | list | lmfit.Parameters | None | object = None,
scan_id: str | None = None,
scan_number: int | None = None,
**kwargs,
@@ -718,9 +731,14 @@ class Waveform(PlotBase):
signal_y(str): The name of the entry for the y-axis.
color(str): The color of the curve.
label(str): The label of the curve.
dap(str): The dap model to use for the curve. When provided, a DAP curve is
dap(str | list[str]): The dap model to use for the curve. When provided, a DAP curve is
attached automatically for device, history, or custom data sources. Use
the same string as the LMFit model name.
the same string as the LMFit model name, or a list of model names to build a composite.
dap_parameters(dict | list | lmfit.Parameters | None): Optional lmfit parameter overrides sent to
the DAP server. For a single model: values can be numeric (interpreted as fixed parameters)
or dicts like `{"value": 1.0, "vary": False}`. For composite models (dap is list), use either
a list aligned to the model list (each item is a param dict), or a dict of
`{ "ModelName": { "param": {...} } }` when model names are unique.
scan_id(str): Optional scan ID. When provided, the curve is treated as a **history** curve and
the ydata (and optional xdata) are fetched from that historical scan. Such curves are
never cleared by livescan resets.
@@ -733,6 +751,8 @@ class Waveform(PlotBase):
source = "custom"
x_data = None
y_data = None
if dap_parameters is _DAP_PARAM:
dap_parameters = kwargs.pop("dap_parameters", None) or kwargs.pop("parameters", None)
# 1. Custom curve logic
if x is not None and y is not None:
@@ -810,7 +830,9 @@ class Waveform(PlotBase):
curve = self._add_curve(config=config, x_data=x_data, y_data=y_data)
if dap is not None and curve.config.source in ("device", "history", "custom"):
self.add_dap_curve(device_label=curve.name(), dap_name=dap, **kwargs)
self.add_dap_curve(
device_label=curve.name(), dap_name=dap, dap_parameters=dap_parameters, **kwargs
)
return curve
@@ -820,9 +842,10 @@ class Waveform(PlotBase):
def add_dap_curve(
self,
device_label: str,
dap_name: str,
dap_name: str | list[str],
color: str | None = None,
dap_oversample: int = 1,
dap_parameters: dict | list | lmfit.Parameters | None = None,
**kwargs,
) -> Curve:
"""
@@ -832,9 +855,11 @@ class Waveform(PlotBase):
Args:
device_label(str): The label of the source curve to add DAP to.
dap_name(str): The name of the DAP model to use.
dap_name(str | list[str]): The name of the DAP model to use, or a list of model
names to build a composite model.
color(str): The color of the curve.
dap_oversample(int): The oversampling factor for the DAP curve.
dap_parameters(dict | list | lmfit.Parameters | None): Optional lmfit parameter overrides sent to the DAP server.
**kwargs
Returns:
@@ -859,7 +884,7 @@ class Waveform(PlotBase):
dev_entry = "custom"
# 2) Build a label for the new DAP curve
dap_label = f"{device_label}-{dap_name}"
dap_label = f"{device_label}-{self._format_dap_label(dap_name)}"
# 3) Possibly raise if the DAP curve already exists
if self._check_curve_id(dap_label):
@@ -882,7 +907,11 @@ class Waveform(PlotBase):
# Attach device signal with DAP
config.signal = DeviceSignal(
device=dev_name, signal=dev_entry, dap=dap_name, dap_oversample=dap_oversample
device=dev_name,
signal=dev_entry,
dap=dap_name,
dap_oversample=dap_oversample,
dap_parameters=self._normalize_dap_parameters(dap_parameters, dap_name=dap_name),
)
# 4) Create the DAP curve config using `_add_curve(...)`
@@ -1754,7 +1783,9 @@ class Waveform(PlotBase):
x_data, y_data = parent_curve.get_data()
model_name = dap_curve.config.signal.dap
model = getattr(self.dap, model_name)
model = None
if not isinstance(model_name, (list, tuple)):
model = getattr(self.dap, model_name)
try:
x_min, x_max = self.roi_region
x_data, y_data = self._crop_data(x_data, y_data, x_min, x_max)
@@ -1762,20 +1793,132 @@ class Waveform(PlotBase):
x_min = None
x_max = None
dap_parameters = getattr(dap_curve.config.signal, "dap_parameters", None)
dap_kwargs = {
"data_x": x_data,
"data_y": y_data,
"oversample": dap_curve.dap_oversample,
}
if dap_parameters:
dap_kwargs["parameters"] = dap_parameters
if model is not None:
class_args = model._plugin_info["class_args"]
class_kwargs = model._plugin_info["class_kwargs"]
else:
class_args = []
class_kwargs = {"model": model_name}
msg = messages.DAPRequestMessage(
dap_cls="LmfitService1D",
dap_type="on_demand",
config={
"args": [],
"kwargs": {"data_x": x_data, "data_y": y_data},
"class_args": model._plugin_info["class_args"],
"class_kwargs": model._plugin_info["class_kwargs"],
"kwargs": dap_kwargs,
"class_args": class_args,
"class_kwargs": class_kwargs,
"curve_label": dap_curve.name(),
},
metadata={"RID": f"{self.scan_id}-{self.gui_id}"},
)
self.client.connector.set_and_publish(MessageEndpoints.dap_request(), msg)
@staticmethod
def _normalize_dap_parameters(
parameters: dict | list | lmfit.Parameters | None, dap_name: str | list[str] | None = None
) -> dict | list | None:
"""
Normalize user-provided lmfit parameters into a JSON-serializable dict suitable for the DAP server.
Supports:
- `lmfit.Parameters` (single-model only)
- `dict[name -> number]` (treated as fixed parameter with `vary=False`)
- `dict[name -> dict]` (lmfit.Parameter fields; defaults to `vary=False` if unspecified)
- `dict[name -> lmfit.Parameter]`
- composite: `list[dict[param_name -> spec]]` aligned to model list
- composite: `dict[model_name -> dict[param_name -> spec]]` (unique model names only)
"""
if parameters is None:
return None
if isinstance(dap_name, (list, tuple)):
if lmfit is not None and isinstance(parameters, lmfit.Parameters):
raise TypeError("dap_parameters must be a dict when using composite dap models.")
if isinstance(parameters, (list, tuple)):
normalized_list: list[dict | None] = []
for idx, item in enumerate(parameters):
if item is None:
normalized_list.append(None)
continue
if not isinstance(item, dict):
raise TypeError(
f"dap_parameters list item {idx} must be a dict of parameter overrides."
)
normalized_list.append(Waveform._normalize_param_overrides(item))
return normalized_list or None
if not isinstance(parameters, dict):
raise TypeError(
"dap_parameters must be a dict of model->params when using composite dap models."
)
model_names = set(dap_name)
invalid_models = set(parameters.keys()) - model_names
if invalid_models:
raise TypeError(
f"Invalid dap_parameters keys for composite model: {sorted(invalid_models)}"
)
normalized_composite: dict[str, dict] = {}
for model_name in dap_name:
model_params = parameters.get(model_name)
if model_params is None:
continue
if not isinstance(model_params, dict):
raise TypeError(
f"dap_parameters for '{model_name}' must be a dict of parameter overrides."
)
normalized = Waveform._normalize_param_overrides(model_params)
if normalized:
normalized_composite[model_name] = normalized
return normalized_composite or None
if lmfit is not None and isinstance(parameters, lmfit.Parameters):
return serialize_lmfit_params(parameters)
if not isinstance(parameters, dict):
if lmfit is None:
raise TypeError(
"dap_parameters must be a dict when lmfit is not installed on the client."
)
raise TypeError("dap_parameters must be a dict or lmfit.Parameters (or omitted).")
return Waveform._normalize_param_overrides(parameters)
@staticmethod
def _normalize_param_overrides(parameters: dict) -> dict | None:
normalized: dict[str, dict] = {}
for name, spec in parameters.items():
if spec is None:
continue
if isinstance(spec, (int, float, np.number)):
normalized[name] = {"name": name, "value": float(spec), "vary": False}
continue
if lmfit is not None and isinstance(spec, lmfit.Parameter):
normalized[name] = serialize_param_object(spec)
continue
if isinstance(spec, dict):
normalized[name] = {"name": name, **spec}
if "vary" not in normalized[name]:
normalized[name]["vary"] = False
continue
raise TypeError(
f"Invalid dap_parameters entry for '{name}': expected number, dict, or lmfit.Parameter."
)
return normalized or None
@staticmethod
def _format_dap_label(dap_name: str | list[str]) -> str:
if isinstance(dap_name, (list, tuple)):
return "+".join(dap_name)
return dap_name
@SafeSlot(dict, dict)
def update_dap_curves(self, msg, metadata):
"""
@@ -1793,14 +1936,6 @@ class Waveform(PlotBase):
if not curve:
return
# Get data from the parent (device) curve
parent_curve = self._find_curve_by_label(curve.config.parent_label)
if parent_curve is None:
return
x_parent, _ = parent_curve.get_data()
if x_parent is None or len(x_parent) == 0:
return
# Retrieve and store the fit parameters and summary from the DAP server response
try:
curve.dap_params = msg["data"][1]["fit_parameters"]
@@ -1809,19 +1944,13 @@ class Waveform(PlotBase):
logger.warning(f"Failed to retrieve DAP data for curve '{curve.name()}'")
return
# Render model according to the DAP model name and parameters
model_name = curve.config.signal.dap
model_function = getattr(lmfit.models, model_name)()
x_min, x_max = x_parent.min(), x_parent.max()
oversample = curve.dap_oversample
new_x = np.linspace(x_min, x_max, int(len(x_parent) * oversample))
# Evaluate the model with the provided parameters to generate the y values
new_y = model_function.eval(**curve.dap_params, x=new_x)
# Update the curve with the new data
curve.setData(new_x, new_y)
# Plot the fitted curve using the server-provided output to avoid requiring lmfit on the client.
try:
fit_data = msg["data"][0]
curve.setData(np.asarray(fit_data["x"]), np.asarray(fit_data["y"]))
except Exception as e:
logger.exception(f"Failed to plot DAP result for curve '{curve.name()}', error: {e}")
return
metadata.update({"curve_id": curve_id})
self.dap_params_update.emit(curve.dap_params, metadata)
@@ -2341,24 +2470,20 @@ class DemoApp(QMainWindow): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Waveform Demo")
self.resize(1200, 600)
self.resize(1600, 600)
self.main_widget = QWidget(self)
self.layout = QHBoxLayout(self.main_widget)
self.setCentralWidget(self.main_widget)
self.waveform_popup = Waveform(popups=True)
self.waveform_popup.plot(device_y="waveform")
self.waveform_side = Waveform(popups=False)
self.waveform_side.plot(device_y="bpm4i", signal_y="bpm4i", dap="GaussianModel")
self.waveform_side.plot(device_y="bpm3a", signal_y="bpm3a")
self.custom_waveform = Waveform(popups=True)
self._populate_custom_curve_demo()
self.layout.addWidget(self.waveform_side)
self.layout.addWidget(self.waveform_popup)
self.sine_waveform = Waveform(popups=True)
self.sine_waveform.dap_params_update.connect(self._log_sine_dap_params)
self._populate_sine_curve_demo()
self.layout.addWidget(self.custom_waveform)
self.layout.addWidget(self.sine_waveform)
def _populate_custom_curve_demo(self):
"""
@@ -2377,8 +2502,141 @@ class DemoApp(QMainWindow): # pragma: no cover
sigma = 0.8
y = amplitude * np.exp(-((x - center) ** 2) / (2 * sigma**2)) + noise
# 1) No explicit parameters: server will use lmfit defaults/guesses.
self.custom_waveform.plot(x=x, y=y, label="custom-gaussian", dap="GaussianModel")
# 2) Easy dict: numbers mean "fix this parameter to value" (vary=False).
self.custom_waveform.plot(
x=x,
y=y,
label="custom-gaussian-fixed-easy",
dap="GaussianModel",
dap_parameters={"amplitude": 1.0},
dap_oversample=5,
)
# 3) Partial parameter override: this should still trigger guessing on the server
# because not all Gaussian parameters are explicitly specified.
self.custom_waveform.plot(
x=x,
y=y,
label="custom-gaussian-partial-guess",
dap="GaussianModel",
dap_parameters={
"center": {"value": 1.2, "vary": True},
"sigma": {"value": sigma, "vary": False, "min": 0.0},
},
)
# 4) Complete parameter override: this should skip guessing on the server.
if lmfit is not None:
params_gauss = lmfit.models.GaussianModel().make_params()
params_gauss["amplitude"].set(value=amplitude, vary=False)
params_gauss["center"].set(value=center, vary=False)
params_gauss["sigma"].set(value=sigma, vary=False, min=0.0)
self.custom_waveform.plot(
x=x,
y=y,
label="custom-gaussian-complete-no-guess",
dap="GaussianModel",
dap_parameters=params_gauss,
)
else:
logger.info("Skipping lmfit.Parameters demo (lmfit not installed on client).")
# Composite example: spectrum with three Gaussians (DAP-only)
x_spec = np.linspace(-5, 5, 800)
rng_spec = np.random.default_rng(123)
centers = [-2.0, 0.6, 2.4]
amplitudes = [2.5, 3.2, 1.8]
sigmas = [0.35, 0.5, 0.3]
y_spec = (
amplitudes[0] * np.exp(-((x_spec - centers[0]) ** 2) / (2 * sigmas[0] ** 2))
+ amplitudes[1] * np.exp(-((x_spec - centers[1]) ** 2) / (2 * sigmas[1] ** 2))
+ amplitudes[2] * np.exp(-((x_spec - centers[2]) ** 2) / (2 * sigmas[2] ** 2))
+ rng_spec.normal(loc=0, scale=0.06, size=x_spec.size)
)
# 5) Composite model with partial overrides only: this should still trigger guessing.
self.custom_waveform.plot(
x=x_spec,
y=y_spec,
label="custom-gaussian-spectrum-partial-guess",
dap=["GaussianModel", "GaussianModel", "GaussianModel"],
dap_parameters=[
{"center": {"value": centers[0], "vary": False}},
{"center": {"value": centers[1], "vary": False}},
{"center": {"value": centers[2], "vary": False}},
],
)
# 6) Composite model with all component parameters specified: this should skip guessing.
self.custom_waveform.plot(
x=x_spec,
y=y_spec,
label="custom-gaussian-spectrum-complete-no-guess",
dap=["GaussianModel", "GaussianModel", "GaussianModel"],
dap_parameters=[
{
"amplitude": {"value": amplitudes[0], "vary": False},
"center": {"value": centers[0], "vary": False},
"sigma": {"value": sigmas[0], "vary": False, "min": 0.0},
},
{
"amplitude": {"value": amplitudes[1], "vary": False},
"center": {"value": centers[1], "vary": False},
"sigma": {"value": sigmas[1], "vary": False, "min": 0.0},
},
{
"amplitude": {"value": amplitudes[2], "vary": False},
"center": {"value": centers[2], "vary": False},
"sigma": {"value": sigmas[2], "vary": False, "min": 0.0},
},
],
)
def _populate_sine_curve_demo(self):
"""
Showcase how lmfit's base SineModel can struggle with a drifting baseline.
"""
x = np.linspace(0, 6 * np.pi, 600)
rng = np.random.default_rng(7)
amplitude = 1.6
frequency = 0.75
phase = 0.4
offset = 0.8
slope = 0.08
noise = rng.normal(loc=0, scale=0.12, size=x.size)
y = offset + slope * x + amplitude * np.sin(2 * np.pi * frequency * x + phase) + noise
# Base SineModel (no offset support) to show the mismatch
self.sine_waveform.plot(x=x, y=y, label="custom-sine-data", dap="SineModel")
# Composite model: Sine + Linear baseline (offset + slope)
self.sine_waveform.plot(
x=x,
y=y,
label="custom-sine-composite",
dap=["SineModel", "LinearModel"],
dap_oversample=4,
)
if lmfit is None:
logger.info("Skipping sine lmfit demo (lmfit not installed on client).")
return
return
@staticmethod
def _log_sine_dap_params(params: dict, metadata: dict):
curve_id = metadata.get("curve_id")
if curve_id not in {
"custom-sine-data-SineModel",
"custom-sine-composite-SineModel+LinearModel",
}:
return
logger.info(f"SineModel DAP fit params ({curve_id}): {params}")
if __name__ == "__main__": # pragma: no cover
import sys

View File

@@ -0,0 +1,618 @@
"""Admin View panel for setting up account and messaging services in BEC."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messages import DeploymentInfoMessage, ExperimentInfoMessage
from qtpy.QtCore import QSize, Qt, QTimer, Signal
from qtpy.QtWidgets import (
QFrame,
QGroupBox,
QHBoxLayout,
QLabel,
QSizePolicy,
QStackedLayout,
QVBoxLayout,
QWidget,
)
from bec_widgets.utils.bec_login import BECLogin
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import (
MaterialIconAction,
WidgetAction,
create_action_with_text,
)
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.services.bec_atlas_admin_view.bec_atlas_http_service import (
AtlasEndpoints,
AuthenticatedUserInfo,
BECAtlasHTTPService,
HTTPResponse,
)
from bec_widgets.widgets.services.bec_atlas_admin_view.experiment_selection.experiment_mat_card import (
ExperimentMatCard,
)
from bec_widgets.widgets.services.bec_atlas_admin_view.experiment_selection.experiment_selection import (
ExperimentSelection,
)
from bec_widgets.widgets.services.bec_messaging_config.bec_messaging_config_widget import (
BECMessagingConfigWidget,
)
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtWidgets import QToolBar
logger = bec_logger.logger
class OverviewWidget(QGroupBox):
"""Overview Widget for the BEC Atlas Admin view"""
login_requested = Signal(str, str)
change_experiment_requested = Signal()
def __init__(self, parent=None):
super().__init__(parent=parent)
self.setContentsMargins(12, 0, 12, 6)
self._authenticated = False
# Root layout
self.root_layout = QVBoxLayout(self)
self.root_layout.setContentsMargins(0, 0, 0, 0)
self.root_layout.setSpacing(0)
# Stacked Layout to switch between login form and overview content
self.stacked_layout = QStackedLayout()
self.stacked_layout.setContentsMargins(0, 0, 0, 0)
self.stacked_layout.setSpacing(0)
self.stacked_layout.setStackingMode(QStackedLayout.StackingMode.StackAll)
self.root_layout.addLayout(self.stacked_layout)
self._init_login_view()
self._init_experiment_overview()
self.stacked_layout.setCurrentWidget(self._login_widget)
self._experiment_overview_widget.setVisible(False)
def set_experiment_info(self, experiment_info: ExperimentInfoMessage):
"""Set the experiment information for the overview widget."""
self._experiment_overview_widget.set_experiment_info(experiment_info)
@SafeSlot(bool)
def set_authenticated(self, authenticated: bool):
"""Set the authentication state of the overview widget."""
self._authenticated = authenticated
if authenticated:
self.stacked_layout.setCurrentWidget(self._experiment_overview_widget)
self._experiment_overview_widget.setVisible(True)
else:
self.stacked_layout.setCurrentWidget(self._login_widget)
self._experiment_overview_widget.setVisible(False)
def _init_login_view(self):
"""Initialize the login view."""
self._login_widget = QWidget()
layout = QHBoxLayout(self._login_widget)
self._login_widget.setAutoFillBackground(True)
self._login_widget.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
layout.setSpacing(16)
content = QFrame()
content_layout = QVBoxLayout(content)
content.setFrameShape(QFrame.Shape.StyledPanel)
content.setFrameShadow(QFrame.Shadow.Raised)
content.setStyleSheet(
"""
QFrame
{
border: 1px solid #cccccc;
}
QLabel
{
border: none;
}
"""
)
content_layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
content.setFixedSize(400, 280)
self._login = BECLogin(parent=self)
self._login.credentials_entered.connect(self.login_requested.emit)
content_layout.addWidget(self._login)
layout.addWidget(content)
self.stacked_layout.addWidget(self._login_widget)
def _init_experiment_overview(self):
"""Initialize the experiment overview content."""
self._experiment_overview_widget = ExperimentMatCard(
show_activate_button=True,
parent=self,
title="Current Experiment",
button_text="Change Experiment",
)
self._experiment_overview_widget.experiment_selected.connect(self._on_experiment_selected)
layout = QVBoxLayout(self._experiment_overview_widget)
self._experiment_overview_widget.setAutoFillBackground(True)
self._experiment_overview_widget.setSizePolicy(
QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding
)
layout.setSpacing(16)
layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.stacked_layout.addWidget(self._experiment_overview_widget)
@SafeSlot(dict)
def _on_experiment_selected(self, _):
"""Handle the change experiment button click."""
self.change_experiment_requested.emit()
class CustomLogoutAction(MaterialIconAction):
"""Custom logout action that can be enabled/disabled based on authentication state."""
def __init__(self, parent=None):
super().__init__(
icon_name="logout",
tooltip="Logout",
label_text="Logout",
text_position="under",
parent=parent,
filled=True,
)
self.action.setEnabled(False) # Initially disabled until authenticated
self._tick_timer = QTimer(parent)
self._tick_timer.setInterval(1000)
self._tick_timer.timeout.connect(self._on_tick)
self._login_remaining_s = 0
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
"""
Adds the action to the toolbar.
Args:
toolbar(QToolBar): The toolbar to add the action to.
target(QWidget): The target widget for the action.
"""
create_action_with_text(toolbar_action=self, toolbar=toolbar, min_size=QSize(70, 40))
def set_authenticated(self, auth_info: AuthenticatedUserInfo | None):
"""Enable or disable the logout action based on authentication state."""
if not auth_info:
self._tick_timer.stop()
self._login_remaining_s = 0
self.action.setEnabled(False)
self.update_label() # Reset Label text
return # No need to set the timer if we're not authenticated
self._login_remaining_s = max(0, int(auth_info.exp - time.time())) if auth_info else 0
self.action.setEnabled(True)
if self._login_remaining_s > 0:
self._tick_timer.start()
def _on_tick(self) -> None:
"""Handle the timer countdown tick to update the remaining logout time."""
self._login_remaining_s -= 1
if self._login_remaining_s <= 0:
self.set_authenticated(None) # This will disable the action and stop the timer
return
self.update_label() # Optionally update the label to show remaining time
def update_label(self):
"""Update the label text of the logout action."""
if self._login_remaining_s > 0:
label_text = f"{self.label_text}\n({self._login_remaining_s}s)"
else:
label_text = self.label_text
self.action.setText(label_text)
def cleanup(self):
"""Cleanup the timer when the action is destroyed."""
if self._tick_timer.isActive():
self._tick_timer.stop()
class AtlasConnectionInfo(QWidget):
def __init__(self, parent=None):
super().__init__(parent=parent)
layout = QVBoxLayout(self)
layout.setAlignment(Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)
self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
layout.setContentsMargins(6, 6, 6, 12)
layout.setSpacing(8)
self._bl_info_label = QLabel(self)
self._atlas_url_label = QLabel(self)
layout.addWidget(self._bl_info_label)
layout.addWidget(self._atlas_url_label)
self._atlas_url_text = ""
def set_info(self, realm_id: str, bl_name: str, atlas_url: str):
"""Set the connection information for the BEC Atlas API."""
bl_info = f"{realm_id} @ {bl_name}"
self._bl_info_label.setText(bl_info)
self._atlas_url_label.setText(atlas_url)
self._atlas_url_text = atlas_url
def set_logged_in(self, email: str):
"""Show login status in the atlas info widget."""
self._atlas_url_label.setText(f"{self._atlas_url_text} | {email}")
def clear_login(self):
"""Clear login status from the atlas info widget."""
self._atlas_url_label.setText(self._atlas_url_text)
class BECAtlasAdminView(BECWidget, QWidget):
RPC = False
authenticated = Signal(bool)
def __init__(
self,
parent=None,
atlas_url: str = "https://bec-atlas-dev.psi.ch/api/v1",
client=None,
**kwargs,
):
super().__init__(parent=parent, client=client, **kwargs)
# State variables
self._current_deployment_info: DeploymentInfoMessage | None = None
self._current_deployment_info = None
self._current_session_info = None
self._current_experiment_info = None
self._authenticated = False
self._atlas_url = atlas_url
# Root layout
self.root_layout = QVBoxLayout(self)
self.root_layout.setContentsMargins(0, 0, 0, 0)
self.root_layout.setSpacing(0)
# Toolbar for navigation between different views in the admin panel
self.toolbar = ModularToolBar(self)
self.init_toolbar()
self.root_layout.insertWidget(0, self.toolbar)
self.toolbar.show_bundles(["view", "atlas_info", "auth"])
# Stacked layout to switch between overview, experiment selection and messaging services
# It is added below the toolbar
self.stacked_layout = QStackedLayout()
self.stacked_layout.setContentsMargins(0, 0, 0, 0)
self.stacked_layout.setSpacing(0)
self.stacked_layout.setStackingMode(QStackedLayout.StackingMode.StackAll)
self.root_layout.addLayout(self.stacked_layout)
# BEC Atlas HTTP Service
self.atlas_http_service = BECAtlasHTTPService(
parent=self, base_url=atlas_url, headers={"accept": "application/json"}
)
# Overview widget
self.overview_widget = OverviewWidget(parent=self)
self.stacked_layout.addWidget(self.overview_widget)
# Experiment Selection widget
self.experiment_selection = ExperimentSelection(parent=self)
self.experiment_selection.setVisible(False)
self.stacked_layout.addWidget(self.experiment_selection)
# Messaging Services widget
self.messaging_config_widget = BECMessagingConfigWidget(parent=self)
self.messaging_config_widget.setVisible(False)
self.stacked_layout.addWidget(self.messaging_config_widget)
# Connect signals
self.overview_widget.login_requested.connect(self._on_login_requested)
self.overview_widget.change_experiment_requested.connect(
self._on_experiment_selection_selected
)
self.authenticated.connect(self.overview_widget.set_authenticated)
self.experiment_selection.experiment_selected.connect(self._on_experiment_selected)
self.atlas_http_service.http_response.connect(self._on_http_response_received)
self.atlas_http_service.authenticated.connect(self._on_authenticated)
self._connect_dispatcher()
def _connect_dispatcher(self):
self.bec_dispatcher.connect_slot(
slot=self._update_deployment_info,
topics=MessageEndpoints.deployment_info(),
from_start=True,
)
def init_toolbar(self):
"""Initialize the toolbar for the admin view. This allows to switch between different views in the admin panel."""
# Overview
overview = MaterialIconAction(
icon_name="home",
tooltip="Show Overview Panel",
label_text="Overview",
text_position="under",
parent=self,
filled=True,
)
overview.action.triggered.connect(self._on_overview_selected)
self.toolbar.components.add_safe("overview", overview)
# Experiment Selection
experiment_selection = MaterialIconAction(
icon_name="experiment",
tooltip="Show Experiment Selection Panel",
label_text="Experiment Selection",
text_position="under",
parent=self,
filled=True,
)
experiment_selection.action.triggered.connect(self._on_experiment_selection_selected)
experiment_selection.action.setEnabled(False) # Initially disabled until authenticated
self.toolbar.components.add_safe("experiment_selection", experiment_selection)
# Messaging Services
messaging_services = MaterialIconAction(
icon_name="chat",
tooltip="Show Messaging Services Panel",
label_text="Messaging Services",
text_position="under",
parent=self,
filled=True,
)
messaging_services.action.triggered.connect(self._on_messaging_services_selected)
messaging_services.action.setEnabled(False) # Initially disabled until authenticated
self.toolbar.components.add_safe("messaging_services", messaging_services)
# Atlas Info
self._atlas_info_widget = AtlasConnectionInfo(parent=self)
atlas_info = WidgetAction(widget=self._atlas_info_widget, parent=self)
self.toolbar.components.add_safe("atlas_info", atlas_info)
logout_action = CustomLogoutAction(parent=self)
logout_action.action.triggered.connect(self.logout)
logout_action.action.setEnabled(False) # Initially disabled until authenticated
self.toolbar.components.add_safe("logout", logout_action)
# Add view_bundle to toolbar
view_bundle = ToolbarBundle("view", self.toolbar.components)
view_bundle.add_action("overview")
view_bundle.add_action("experiment_selection")
view_bundle.add_action("messaging_services")
self.toolbar.add_bundle(view_bundle)
# Add atlas_info to toolbar
atlas_info_bundle = ToolbarBundle("atlas_info", self.toolbar.components)
atlas_info_bundle.add_action("atlas_info")
self.toolbar.add_bundle(atlas_info_bundle)
# Add auth_bundle to toolbar
auth_bundle = ToolbarBundle("auth", self.toolbar.components)
auth_bundle.add_action("logout")
self.toolbar.add_bundle(auth_bundle)
########################
## Toolbar icon slots
########################
def _on_overview_selected(self):
"""Show the overview panel."""
self.overview_widget.setVisible(True)
self.experiment_selection.setVisible(False)
self.messaging_config_widget.setVisible(False)
self.stacked_layout.setCurrentWidget(self.overview_widget)
def _on_experiment_selection_selected(self):
"""Show the experiment selection panel."""
if not self._authenticated:
logger.warning("Attempted to access experiment selection without authentication.")
return
self.overview_widget.setVisible(False)
self.experiment_selection.setVisible(True)
self.messaging_config_widget.setVisible(False)
self.stacked_layout.setCurrentWidget(self.experiment_selection)
def _on_messaging_services_selected(self):
"""Show the messaging services panel."""
if not self._authenticated:
logger.warning("Attempted to access messaging services without authentication.")
return
self.overview_widget.setVisible(False)
self.experiment_selection.setVisible(False)
self.messaging_config_widget.setVisible(True)
if self._current_deployment_info is not None:
self.messaging_config_widget.populate_from_deployment(self._current_deployment_info)
self.stacked_layout.setCurrentWidget(self.messaging_config_widget)
########################
## Internal slots
########################
@SafeSlot(dict)
def _on_experiment_selected(self, experiment_info: dict) -> None:
"""Handle the experiment selected signal from the experiment selection widget"""
experiment_info = ExperimentInfoMessage.model_validate(experiment_info)
experiment_id = experiment_info.pgroup
deployment_id = self._current_deployment_info.deployment_id
self.set_experiment(experiment_id=experiment_id, deployment_id=deployment_id)
@SafeSlot(str, str, popup_error=True)
def _on_login_requested(self, username: str, password: str):
"""Handle login requested signal from the overview widget."""
# Logout first to clear any existing session and cookies before attempting a new login
if self._authenticated:
logger.info("Existing session detected, logging out before attempting new login.")
self.logout()
# Now login with new credentials
self.login(username, password)
@SafeSlot(dict, dict)
def _update_deployment_info(self, msg: dict, _: dict) -> None:
"""Fetch current deployment info from the server."""
deployment = DeploymentInfoMessage.model_validate(msg)
self._current_deployment_info = deployment
self._current_session_info = deployment.active_session
if self._current_session_info is not None:
self._current_experiment_info = self._current_session_info.experiment
self.overview_widget.set_experiment_info(self._current_experiment_info)
self._atlas_info_widget.set_info(
realm_id=self._current_experiment_info.realm_id or "",
bl_name=self._current_deployment_info.name or "",
atlas_url=self._atlas_url,
)
self.atlas_http_service._set_current_deployment_info(deployment)
self.messaging_config_widget.populate_from_deployment(deployment)
def _fetch_available_experiments(self):
"""Fetch the list of available experiments for the authenticated user."""
# What if this is None, should this be an optional user input in the UI?
if self._current_experiment_info is None:
logger.error(
"No current experiment info available, cannot fetch available experiments."
)
return
current_realm_id = self._current_experiment_info.realm_id
if current_realm_id is None:
logger.error(
"Current experiment does not have a realm_id, cannot fetch available experiments."
)
return
self.atlas_http_service.get_experiments_for_realm(current_realm_id)
########################
## HTTP Service response handling
########################
def _on_http_response_received(self, response: dict) -> None:
"""Handle the HTTP response received from the BEC Atlas API."""
response = HTTPResponse(**response)
logger.debug(
f"HTTP Response received: {response.request_url} with status {response.status}"
)
if AtlasEndpoints.REALMS_EXPERIMENTS in response.request_url:
experiments = response.data if isinstance(response.data, list) else []
self.experiment_selection.set_experiment_infos(experiments)
elif AtlasEndpoints.SET_EXPERIMENT in response.request_url:
self._on_overview_selected()
@SafeSlot(dict)
def _on_authenticated(self, auth_info: dict) -> None:
"""Handle authentication state change."""
authenticated = False
# Only if the user has owner access to the deployment, we consider them to be fully authenticated
# This means that although they may authenticate against atlas, they won't be able to see any
# extra information here
if auth_info:
info = AuthenticatedUserInfo.model_validate(auth_info)
if (
self._current_deployment_info
and info.deployment_id == self._current_deployment_info.deployment_id
):
authenticated = True
else:
logger.warning(
f"Authenticated user {info.email} does not have access to the current deployment {self._current_deployment_info.name if self._current_deployment_info else '<no deployment>'}."
)
self._authenticated = authenticated
if authenticated:
self.toolbar.components.get_action("experiment_selection").action.setEnabled(True)
self.toolbar.components.get_action("messaging_services").action.setEnabled(True)
self.toolbar.components.get_action("logout").action.setEnabled(True)
self._fetch_available_experiments() # Fetch experiments upon successful authentication
self._atlas_info_widget.set_logged_in(info.email)
self.toolbar.components.get_action("logout").set_authenticated(info)
else:
self.toolbar.components.get_action("experiment_selection").action.setEnabled(False)
self.toolbar.components.get_action("messaging_services").action.setEnabled(False)
self.toolbar.components.get_action("logout").action.setEnabled(False)
# Delete data in experiment selection widget upon logout
self.experiment_selection.set_experiment_infos([])
self._on_overview_selected() # Switch back to overview on logout
self._atlas_info_widget.clear_login() # Clear login status in atlas info widget on logout
self.toolbar.components.get_action("logout").set_authenticated(None)
self.authenticated.emit(authenticated)
################
## API Methods
################
@SafeSlot(str, str, popup_error=True)
def set_experiment(self, experiment_id: str, deployment_id: str) -> None:
"""Set the experiment information for the current experiment."""
self.atlas_http_service.set_experiment(experiment_id, deployment_id)
@SafeSlot(str, str, popup_error=True)
def login(self, username: str, password: str) -> None:
"""Login to the BEC Atlas API with the provided username and password."""
self.atlas_http_service.login(username=username, password=password)
@SafeSlot(popup_error=True)
def logout(self) -> None:
"""Logout from the BEC Atlas API."""
self.atlas_http_service.logout()
def get_user_info(self):
"""Get the current user information from the BEC Atlas API."""
self.atlas_http_service.get_user_info()
###############
## Cleanup
###############
def cleanup(self):
self.atlas_http_service.cleanup()
return super().cleanup()
if __name__ == "__main__":
import sys
from bec_qthemes import apply_theme
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
apply_theme("light")
window = BECAtlasAdminView()
exp_info_dict = {
"_id": "p22622",
"owner_groups": ["admin"],
"access_groups": ["unx-sls_xda_bs", "p22622"],
"realm_id": "TestBeamline",
"proposal": "12345967",
"title": "Test Experiment for Mat Card Widget",
"firstname": "John",
"lastname": "Doe",
"email": "john.doe@psi.ch",
"account": "doe_j",
"pi_firstname": "Jane",
"pi_lastname": "Smith",
"pi_email": "jane.smith@psi.ch",
"pi_account": "smith_j",
"eaccount": "e22622",
"pgroup": "p22622",
"abstract": "This is a test abstract for the experiment mat card widget. It should be long enough to test text wrapping and display in the card. The abstract provides a brief overview of the experiment, its goals, and its significance. This text is meant to simulate a real abstract that might be associated with an experiment in the BEC Atlas system. The card should be able to handle abstracts of varying lengths without any issues, ensuring that the user can read the full abstract even if it is quite long.",
"schedule": [{"start": "01/01/2025 08:00:00", "end": "03/01/2025 18:00:00"}],
"proposal_submitted": "15/12/2024",
"proposal_expire": "31/12/2025",
"proposal_status": "Scheduled",
"delta_last_schedule": 30,
"mainproposal": "",
}
from bec_lib.messages import DeploymentInfoMessage, ExperimentInfoMessage, SessionInfoMessage
# proposal_info = ExperimentInfoMessage(**exp_info_dict)
# session_info = SessionInfoMessage(name="Test Session", experiment=proposal_info)
# deployment_info = DeploymentInfoMessage(
# deployment_id="test_deployment_001", active_session=session_info
# )
# window.set_experiment_info(proposal_info)
window.resize(800, 600)
window.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1,386 @@
"""
HTTP service widget for interacting with the BEC Atlas API.
This module defines Qt-based classes that wrap ``QNetworkAccessManager`` to perform
authenticated HTTP requests against the BEC Atlas backend, manage login and logout
flows, and track authentication token expiry for the BEC Atlas Admin View.
It also provides Pydantic models that describe HTTP responses and authenticated user
information, as well as an ``AuthenticatedTimer`` helper used to signal when
authentication tokens expire.
"""
import json
import time
from enum import StrEnum
from typing import Literal
import jwt
from bec_lib.logger import bec_logger
from bec_lib.messages import DeploymentInfoMessage
from pydantic import BaseModel
from qtpy.QtCore import QObject, QTimer, QUrl, QUrlQuery, Signal
from qtpy.QtNetwork import QNetworkAccessManager, QNetworkReply, QNetworkRequest
from qtpy.QtWidgets import QMessageBox, QWidget
from bec_widgets.utils.error_popups import SafeSlot
logger = bec_logger.logger
class AtlasEndpoints(StrEnum):
"""Constants for BEC Atlas API endpoints."""
LOGIN = "/user/login"
LOGOUT = "/user/logout"
REALMS_EXPERIMENTS = "/realms/experiments"
SET_EXPERIMENT = "/deployments/experiment"
USER_INFO = "/user/me"
DEPLOYMENT_INFO = "/deployments/id"
class BECAtlasHTTPError(Exception):
"""Custom exception for BEC Atlas HTTP errors."""
class HTTPResponse(BaseModel):
"""Model representing an HTTP response."""
request_url: str
headers: dict
status: int
data: dict | list | str # Check with Klaus if str is deprecated
class AuthenticatedUserInfo(BaseModel):
"""Model representing authenticated user information."""
email: str
exp: float
groups: set[str]
deployment_id: str
class AuthenticatedTimer(QObject):
"""Timer to track authentication expiration and emit a signal when the token expires."""
expired = Signal()
def __init__(self, parent=None):
super().__init__(parent)
self._timer = QTimer(self)
self._timer.setSingleShot(True)
self._timer.timeout.connect(self._on_expired)
def start(self, duration_seconds: float):
"""Start the timer with the given duration in seconds."""
self._timer.start(int(duration_seconds * 1000))
def stop(self):
"""Stop the timer."""
self._timer.stop()
@SafeSlot()
def _on_expired(self):
"""Handle the timer expiration by emitting the expired signal."""
logger.info("Authentication token has expired.")
self.expired.emit()
class BECAtlasHTTPService(QWidget):
"""HTTP service using the QNetworkAccessManager to interact with the BEC Atlas API."""
http_response = Signal(dict) # HTTPResponse.model_dump() dict
authenticated = Signal(dict) # AuthenticatedUserInfo.model_dump() dict or {}
authentication_expires = Signal(float)
def __init__(self, parent=None, base_url: str = "", headers: dict | None = None):
super().__init__(parent)
if headers is None:
headers = {"accept": "application/json"}
self._headers = headers
self._base_url = base_url
self.network_manager = QNetworkAccessManager(self)
self.network_manager.finished.connect(self._handle_response)
self._auth_user_info: AuthenticatedUserInfo | None = None
self._auth_timer = self._create_auth_timer()
self._current_deployment_info = None
def _create_auth_timer(self) -> AuthenticatedTimer:
"""Create and connect the authenticated timer to handle token expiration."""
timer = AuthenticatedTimer(self)
timer.expired.connect(self.__clear_login_info)
return timer
@property
def auth_user_info(self) -> AuthenticatedUserInfo | None:
"""Get the authenticated user information, including email and token expiration time."""
return self._auth_user_info
def __set_auth_info(self, login_info: dict[Literal["email", "exp"], str | float]):
"""Set the authenticated user information after a successful login."""
login_info.update({"groups": []}) # Initialize groups as empty until we fetch user info
login_info.update(
{
"deployment_id": (
self._current_deployment_info.deployment_id
if self._current_deployment_info
else ""
)
}
)
self._auth_user_info = AuthenticatedUserInfo(**login_info)
# Start timer to clear auth info once token expires
exp_time = login_info.get("exp", 0)
current_time = time.time() # TODO should we use server time to avoid clock skew issues?
duration = max(0, exp_time - current_time)
self._auth_timer.start(duration)
def __set_auth_groups(self, groups: list[str]):
"""Set the authenticated user's groups after fetching user info."""
if self._auth_user_info is not None:
self._auth_user_info.groups = set(groups)
def __clear_login_info(self, skip_logout: bool = False):
"""Clear the authenticated user information after logout."""
self._auth_user_info = None
if not skip_logout:
self.logout() # Ensure we also logout on the server side and invalidate the session
def closeEvent(self, event):
self.cleanup()
return super().closeEvent(event)
def cleanup(self):
"""Cleanup connection, destroy authenticate cookies."""
logger.info("Cleaning up BECAtlasHTTPService: disconnecting signals and clearing cookies.")
# Disconnect signals to avoid handling responses after cleanup
self.network_manager.finished.disconnect(self._handle_response)
# Logout to invalidate session on server side
self.logout()
# Stop the authentication timer
self._auth_timer.stop()
# Delete all cookies related to the base URL
for cookie in self.network_manager.cookieJar().cookiesForUrl(QUrl(self._base_url)):
self.network_manager.cookieJar().deleteCookie(cookie)
@SafeSlot(QNetworkReply, popup_error=True)
def _handle_response(self, reply: QNetworkReply):
"""
Handle the HTTP response from the server.
Args:
reply (QNetworkReply): The network reply object containing the response.
"""
status = reply.attribute(QNetworkRequest.Attribute.HttpStatusCodeAttribute)
raw_bytes = reply.readAll().data()
request_url = reply.url().toString()
headers = dict([(i.data().decode(), j.data().decode()) for i, j in reply.rawHeaderPairs()])
reply.deleteLater()
# Any unsuccessful status code should raise here
if status != 200:
raise BECAtlasHTTPError(
f"HTTP request for {request_url} failed with status code {status} and response: {raw_bytes.decode('utf-8')}"
)
if len(raw_bytes) > 0:
data = json.loads(raw_bytes.decode())
else:
data = {}
if data is None:
data = {}
logger.warning(f"Received empty response for {request_url} with status code {status}.")
if not isinstance(data, (dict, list, str)):
raise BECAtlasHTTPError(
f"Expected response data to be a dict, list, or str for {request_url}, but got {type(data)}. Response content: {data}"
)
if AtlasEndpoints.LOGIN.value in request_url:
# If it's a login response, don't forward the token
# but extract the expiration time and emit it
token = data.get("access_token")
data = jwt.decode(token, options={"verify_signature": False})
self.authentication_expires.emit(data.get("exp", 0))
# Now we set the auth info, and then fetch the user info to get the groups
self.__set_auth_info(data)
self.get_user_info() # Fetch groups, then emit authenticated once groups are set on auth_user
elif AtlasEndpoints.LOGOUT.value in request_url:
self._auth_timer.stop() # Stop the timer if it was running
self.__clear_login_info(skip_logout=True) # Skip calling logout again
self.authenticated.emit({})
elif AtlasEndpoints.USER_INFO.value in request_url:
groups = data.get("groups", [])
email = data.get("email", "")
# Second step of authentication: We also have all groups now
if self.auth_user_info is not None and self.auth_user_info.email == email:
self.__set_auth_groups(groups)
if self._current_deployment_info is not None:
# Now we need to fetch the deployment info to get the owner groups and check access rights,
# Then we can emit the authenticated signal with the full user info including groups if access is
# granted. Otherwise, we emit nothing and show a warning that the user does not have the access
# rights for the current deployment.
self.get_deployment_info(
deployment_id=self._current_deployment_info.deployment_id
)
elif AtlasEndpoints.DEPLOYMENT_INFO.value in request_url:
owner_groups = data.get("owner_groups", [])
if self.auth_user_info is not None and not self.auth_user_info.groups.isdisjoint(
owner_groups
):
self.authenticated.emit(self.auth_user_info.model_dump())
else:
if self.auth_user_info is not None:
warning_text = f"User {self.auth_user_info.email} does not have access to the active deployment {data.get('name', '<unknown>')}."
else:
warning_text = "Authenticated user information is missing. Cannot verify access to the active deployment."
self._show_warning(warning_text)
self.logout() # Logout to clear auth info and stop timer since user does not have access
response = HTTPResponse(request_url=request_url, headers=headers, status=status, data=data)
self.http_response.emit(response.model_dump())
def _show_warning(self, text: str):
"""Show a warning message to the user."""
msg = QMessageBox(self)
msg.setIcon(QMessageBox.Icon.Warning)
msg.setText(text)
msg.setWindowTitle("Authentication Warning")
msg.exec_()
#######################
# GET/POST Request Methods
#######################
def _get_request(self, endpoint: str, query_parameters: dict | None = None):
"""
GET request to the API endpoint.
Args:
endpoint (str): The API endpoint to send the GET request to.
query_parameters (dict | None): Optional query parameters to include in the URL.
"""
url = QUrl(self._base_url + endpoint)
if query_parameters:
query = QUrlQuery()
for key, value in query_parameters.items():
query.addQueryItem(key, value)
url.setQuery(query)
request = QNetworkRequest(url)
for key, value in self._headers.items():
request.setRawHeader(key.encode(), value.encode())
self.network_manager.get(request)
def _post_request(
self, endpoint: str, payload: dict | None = None, query_parameters: dict | None = None
):
"""
POST request to the API endpoint with a JSON payload.
Args:
endpoint (str): The API endpoint to send the POST request to.
payload (dict): The JSON payload to include in the POST request.
query_parameters (dict | None): Optional query parameters to include in the URL.
"""
if payload is None:
payload = {}
url = QUrl(self._base_url + endpoint)
if query_parameters:
query = QUrlQuery()
for key, value in query_parameters.items():
query.addQueryItem(key, value)
url.setQuery(query)
request = QNetworkRequest(url)
# Headers
for key, value in self._headers.items():
request.setRawHeader(key.encode(), value.encode())
request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, "application/json")
payload_dump = json.dumps(payload).encode()
self.network_manager.post(request, payload_dump)
def _set_current_deployment_info(self, deployment_info: dict | DeploymentInfoMessage):
"""
Set the current deployment information for the service.
Args:
deployment_info (dict | DeploymentInfoMessage): The deployment information to set.
"""
if isinstance(deployment_info, dict):
deployment_info = DeploymentInfoMessage.model_validate(deployment_info)
self._current_deployment_info = deployment_info
################
# API Methods
################
@SafeSlot(str, str, popup_error=True)
def login(self, username: str, password: str):
"""
Login to BEC Atlas with the provided username and password.
Args:
username (str): The username for authentication.
password (str): The password for authentication.
"""
self._post_request(
endpoint=AtlasEndpoints.LOGIN.value,
payload={"username": username, "password": password},
)
@SafeSlot(popup_error=True)
def logout(self):
"""Logout from BEC Atlas."""
self._post_request(endpoint=AtlasEndpoints.LOGOUT.value)
@SafeSlot(str, popup_error=True)
def get_experiments_for_realm(self, realm_id: str):
"""
Get the list of realms from BEC Atlas. Requires authentication.
Args:
realm_id (str): The ID of the realm to retrieve experiments for.
"""
self._get_request(
endpoint=AtlasEndpoints.REALMS_EXPERIMENTS.value,
query_parameters={"realm_id": realm_id},
)
@SafeSlot(str, str)
def set_experiment(self, experiment_id: str, deployment_id: str) -> None:
"""
Set the current experiment information for the service.
Args:
experiment_id (str): The ID of the experiment to set.
deployment_id (str): The ID of the deployment associated with the experiment.
"""
self._post_request(
endpoint=AtlasEndpoints.SET_EXPERIMENT.value,
query_parameters={"experiment_id": experiment_id, "deployment_id": deployment_id},
)
@SafeSlot(popup_error=True)
def get_user_info(self):
"""Get the current user information from BEC Atlas. Requires authentication."""
self._get_request(endpoint=AtlasEndpoints.USER_INFO.value)
@SafeSlot(str, popup_error=True)
def get_deployment_info(self, deployment_id: str):
"""
Get the deployment information for a given deployment ID. Requires authentication.
Args:
deployment_id (str): The ID of the deployment to retrieve information for.
"""
self._get_request(
endpoint=AtlasEndpoints.DEPLOYMENT_INFO.value,
query_parameters={"deployment_id": deployment_id},
)

View File

@@ -0,0 +1,273 @@
"""Mat-card like widget to display experiment details. Optionally, a button on the bottom which the user can click to trigger the selection of the experiment."""
from bec_lib.messages import ExperimentInfoMessage
from qtpy.QtCore import Qt, Signal
from qtpy.QtWidgets import (
QFrame,
QGraphicsDropShadowEffect,
QGridLayout,
QGroupBox,
QHBoxLayout,
QLabel,
QPushButton,
QSizePolicy,
QVBoxLayout,
QWidget,
)
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_theme_palette
from bec_widgets.utils.round_frame import RoundedFrame
from bec_widgets.widgets.services.bec_atlas_admin_view.experiment_selection.utils import (
format_name,
format_schedule,
)
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
class BorderLessLabel(QLabel):
"""A QLabel that does not show any border, even when stylesheets try to apply one."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setStyleSheet("border: none;")
self.setSizePolicy(QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Preferred)
class ExperimentMatCard(BECWidget, QWidget):
RPC = False
experiment_selected = Signal(dict)
def __init__(
self,
parent=None,
show_activate_button: bool = True,
button_text: str = "Activate",
title: str = "Next Experiment",
**kwargs,
):
super().__init__(parent=parent, theme_update=True, **kwargs)
layout = QVBoxLayout(self)
layout.setContentsMargins(12, 8, 12, 8)
self.experiment_info = {}
self._abstract_text = ""
# Add card frame with shadow and custom styling
self._card_frame = QFrame(parent=self)
layout = QVBoxLayout(self._card_frame)
self._card_frame.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Preferred)
palette = get_theme_palette()
self._card_frame.setStyleSheet(f"""
QFrame {{
border: 1px solid {palette.mid().color().name()};
background: {palette.base().color().name()};
}}
""")
shadow = QGraphicsDropShadowEffect(self._card_frame)
shadow.setBlurRadius(18)
shadow.setOffset(0, 4)
shadow.setColor(palette.shadow().color())
self._card_frame.setGraphicsEffect(shadow)
self._group_box = QGroupBox(self._card_frame)
self._group_box.setStyleSheet(
"QGroupBox { border: none; }; QLabel { border: none; padding: 0px; }"
)
self._fill_group_box(
title=title, show_activate_button=show_activate_button, button_text=button_text
)
self.apply_theme("light")
def apply_theme(self, theme: str):
palette = get_theme_palette()
self._card_frame.setStyleSheet(f"""
QFrame {{
border: 1px solid {palette.mid().color().name()};
background: {palette.base().color().name()};
}}
""")
shadow = self._card_frame.graphicsEffect()
if isinstance(shadow, QGraphicsDropShadowEffect):
shadow.setColor(palette.shadow().color())
def _fill_group_box(
self, title: str, show_activate_button: bool, button_text: str = "Activate"
):
group_layout = QVBoxLayout(self._group_box)
group_layout.setContentsMargins(16, 16, 16, 16)
group_layout.setSpacing(12)
title_row = QHBoxLayout()
self._card_title = BorderLessLabel(title, self._group_box)
self._card_title.setStyleSheet("""
border: none;
font-size: 14px;
font-weight: 600;
""")
# Add title row and info button to QH layout, then add it to QV layout
title_row.addWidget(self._card_title)
title_row.addStretch(1)
group_layout.addLayout(title_row)
self._card_grid = QGridLayout()
self._card_grid.setHorizontalSpacing(12)
self._card_grid.setVerticalSpacing(8)
self._card_grid.setColumnStretch(1, 1)
self._card_pgroup = BorderLessLabel("-", self._group_box)
self._card_title_value = BorderLessLabel("-", self._group_box)
self._card_title_value.setWordWrap(True)
self._card_title_value.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Preferred)
self._card_name = BorderLessLabel("-", self._group_box)
self._card_start = BorderLessLabel("-", self._group_box)
self._card_end = BorderLessLabel("-", self._group_box)
self._card_row_labels = []
def _row_label(text):
label = BorderLessLabel(text, self._group_box)
self._card_row_labels.append(label)
return label
self._card_grid.addWidget(_row_label("Name"), 0, 0)
self._card_grid.addWidget(self._card_name, 0, 1)
self._card_grid.addWidget(_row_label("Title"), 1, 0)
self._card_grid.addWidget(self._card_title_value, 1, 1)
self._card_grid.addWidget(_row_label("P-group"), 2, 0)
self._card_grid.addWidget(self._card_pgroup, 2, 1)
self._card_grid.addWidget(_row_label("Schedule (start)"), 3, 0)
self._card_grid.addWidget(self._card_start, 3, 1)
self._card_grid.addWidget(_row_label("Schedule (end)"), 4, 0)
self._card_grid.addWidget(self._card_end, 4, 1)
# Add to groupbox
group_layout.addLayout(self._card_grid)
# Add abstract field at the bottom of the card.
self._abstract_label = BorderLessLabel("", self._group_box)
self._abstract_label.setWordWrap(True)
self._abstract_label.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Preferred)
group_layout.addWidget(self._abstract_label)
# Add activate button at the bottom
self._activate_button = QPushButton(button_text, self._group_box)
self._activate_button.clicked.connect(self._emit_next_experiment)
self._activate_button.setSizePolicy(
QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Preferred
)
group_layout.addWidget(self._activate_button, alignment=Qt.AlignmentFlag.AlignHCenter)
self._activate_button.setVisible(show_activate_button)
self._activate_button.setEnabled(False)
self._card_frame.layout().setContentsMargins(12, 12, 12, 12)
self._card_frame.layout().addWidget(self._group_box)
card_row = QHBoxLayout()
card_row.addStretch(0)
card_row.addWidget(self._card_frame)
card_row.addStretch(0)
layout = self.layout()
layout.addStretch(0)
layout.addLayout(card_row)
layout.addStretch(0)
def _emit_next_experiment(self):
self.experiment_selected.emit(self.experiment_info)
def clear_experiment_info(self):
"""
Clear the experiment information displayed on the card and disable the activate button.
"""
self._card_pgroup.setText("-")
self._card_title_value.setText("-")
self._card_name.setText("-")
self._card_start.setText("-")
self._card_end.setText("-")
self._abstract_text = ""
self._abstract_label.setText("")
self.experiment_info = {}
self._activate_button.setEnabled(False)
def set_experiment_info(self, info: ExperimentInfoMessage | dict):
"""
Set the experiment information to display on the card.
Args:
info (ExperimentInfoMessage | dict): The experiment information to display. Can be either a
dictionary or an ExperimentInfoMessage instance.
"""
if isinstance(info, dict):
info = ExperimentInfoMessage(**info)
start, end = format_schedule(info.schedule)
self._card_pgroup.setText(info.pgroup or "-")
self._card_title_value.setText(info.title or "-")
self._card_name.setText(format_name(info))
self._card_start.setText(start or "-")
self._card_end.setText(end or "-")
self._abstract_text = (info.abstract or "").strip()
self._abstract_label.setText(self._abstract_text if self._abstract_text else "")
self.experiment_info = info.model_dump()
self._activate_button.setEnabled(True)
def set_title(self, title: str):
"""
Set the title displayed at the top of the card.
Args:
title (str): The title text to display.
"""
self._card_title.setText(title)
if __name__ == "__main__": # pragma: no cover
import sys
from bec_qthemes import apply_theme
from qtpy.QtWidgets import QApplication
exp_info = {
"_id": "p22622",
"owner_groups": ["admin"],
"access_groups": ["unx-sls_xda_bs", "p22622"],
"realm_id": "TestBeamline",
"proposal": "12345967",
"title": "Test Experiment for Mat Card Widget",
"firstname": "John",
"lastname": "Doe",
"email": "john.doe@psi.ch",
"account": "doe_j",
"pi_firstname": "Jane",
"pi_lastname": "Smith",
"pi_email": "jane.smith@psi.ch",
"pi_account": "smith_j",
"eaccount": "e22622",
"pgroup": "p22622",
"abstract": "This is a test abstract for the experiment mat card widget. It should be long enough to test text wrapping and display in the card. The abstract provides a brief overview of the experiment, its goals, and its significance. This text is meant to simulate a real abstract that might be associated with an experiment in the BEC Atlas system. The card should be able to handle abstracts of varying lengths without any issues, ensuring that the user can read the full abstract even if it is quite long.",
"schedule": [{"start": "01/01/2025 08:00:00", "end": "03/01/2025 18:00:00"}],
"proposal_submitted": "15/12/2024",
"proposal_expire": "31/12/2025",
"proposal_status": "Scheduled",
"delta_last_schedule": 30,
"mainproposal": "",
}
app = QApplication(sys.argv)
apply_theme("dark")
w = QWidget()
l = QVBoxLayout(w)
button = DarkModeButton()
widget = ExperimentMatCard()
widget.set_experiment_info(exp_info)
widget.set_title("Scheduled Experiment")
l.addWidget(button)
l.addWidget(widget)
w.resize(w.sizeHint())
w.show()
sys.exit(app.exec())

View File

@@ -0,0 +1,395 @@
"""Experiment Selection View for BEC Atlas Admin Widget"""
from datetime import datetime
from typing import Any
from bec_lib.logger import bec_logger
from qtpy.QtCore import Qt, Signal
from qtpy.QtWidgets import (
QCheckBox,
QHBoxLayout,
QHeaderView,
QLabel,
QLineEdit,
QSizePolicy,
QTableWidget,
QTableWidgetItem,
QTabWidget,
QVBoxLayout,
QWidget,
)
from thefuzz import fuzz
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.fuzzy_search import is_match
from bec_widgets.widgets.services.bec_atlas_admin_view.experiment_selection.experiment_mat_card import (
ExperimentMatCard,
)
from bec_widgets.widgets.services.bec_atlas_admin_view.experiment_selection.utils import (
format_name,
format_schedule,
)
logger = bec_logger.logger
class ExperimentSelection(QWidget):
experiment_selected = Signal(dict)
def __init__(self, experiment_infos=None, parent=None):
super().__init__(parent=parent)
self._experiment_infos = experiment_infos or []
self._next_experiment = self._select_next_experiment(self._experiment_infos)
self._enable_fuzzy_search: bool = True
self._hidden_rows: set[int] = set()
self._headers: dict[str, str] = {
"pgroup": "pgroup",
"title": "Title",
"name": "Name",
"schedule_start": "Schedule (start)",
"schedule_end": "Schedule (end)",
}
self._table_infos: list[dict[str, Any]] = []
main_layout = QVBoxLayout(self)
main_layout.setContentsMargins(0, 0, 0, 0)
# main_layout.setSpacing(12)
main_layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self.setAutoFillBackground(True)
self._tabs = QTabWidget(self)
main_layout.addWidget(self._tabs, stretch=1)
self._card_tab = ExperimentMatCard(
parent=self, show_activate_button=True, button_text="Activate"
)
self._card_tab.experiment_selected.connect(self._emit_selected_experiment)
if self._next_experiment:
self._card_tab.set_experiment_info(self._next_experiment)
self._table_tab = QWidget(self)
self._tabs.addTab(self._card_tab, "Next Experiment")
self._tabs.addTab(self._table_tab, "Manual Selection")
self._build_table_tab()
self._tabs.currentChanged.connect(self._on_tab_changed)
button_layout = QHBoxLayout()
main_layout.addLayout(button_layout)
self._apply_table_filters()
self.restore_default_view()
def restore_default_view(self):
"""Reset the view to the default state, showing the next experiment card."""
self._tabs.setCurrentWidget(self._card_tab)
def set_experiment_infos(self, experiment_infos: list[dict]):
"""
Update the experiment information displayed in the view. It will in addition determine
the next experiment to be shown in the card view. If no next experiment can be determined,
the card view will be cleared.
Args:
experiment_infos (list[dict]): A list of experiment information dictionaries.
"""
self._experiment_infos = experiment_infos
self._next_experiment = self._select_next_experiment(self._experiment_infos)
if self._next_experiment:
self._card_tab.set_experiment_info(self._next_experiment)
else:
self._card_tab.clear_experiment_info()
self._apply_table_filters()
def _setup_search(self, layout: QVBoxLayout):
"""
Create components related to the search functionality
Args:
layout (QVBoxLayout): The layout to which the search components will be added.
"""
# Create search bar
search_layout = QHBoxLayout()
self.search_label = QLabel("Search:")
self.search_input = QLineEdit()
self.search_input.setPlaceholderText("Filter experiments...")
self.search_input.setClearButtonEnabled(True)
self.search_input.textChanged.connect(self._apply_row_filter)
search_layout.addWidget(self.search_label)
search_layout.addWidget(self.search_input)
# Add exact match toggle
fuzzy_layout = QHBoxLayout()
self.fuzzy_label = QLabel("Exact Match:")
self.fuzzy_is_disabled = QCheckBox()
self.fuzzy_is_disabled.stateChanged.connect(self._state_change_fuzzy_search)
self.fuzzy_is_disabled.setToolTip(
"Enable approximate matching (OFF) and exact matching (ON)"
)
self.fuzzy_label.setToolTip("Enable approximate matching (OFF) and exact matching (ON)")
fuzzy_layout.addWidget(self.fuzzy_label)
fuzzy_layout.addWidget(self.fuzzy_is_disabled)
fuzzy_layout.addStretch()
# Add both search components to the layout
self.search_controls = QHBoxLayout()
self.search_controls.addLayout(search_layout)
self.search_controls.addSpacing(20) # Add some space between the search box and toggle
self.search_controls.addLayout(fuzzy_layout)
# Add filter section for proposals
filter_layout = QHBoxLayout()
filter_layout.setContentsMargins(12, 0, 12, 0)
filter_layout.setSpacing(12)
self._with_proposals = QCheckBox("Show experiments with proposals", self)
self._without_proposals = QCheckBox("Show experiments without proposals", self)
self._with_proposals.setChecked(True)
self._without_proposals.setChecked(True)
self._with_proposals.toggled.connect(self._apply_table_filters)
self._without_proposals.toggled.connect(self._apply_table_filters)
filter_layout.addWidget(self._with_proposals)
filter_layout.addWidget(self._without_proposals)
filter_layout.addStretch(1)
self.search_controls.addLayout(filter_layout)
# Insert the search controls layout at the top of the table
layout.addLayout(self.search_controls)
def _build_table_tab(self):
layout = QVBoxLayout(self._table_tab)
layout.setContentsMargins(16, 16, 16, 16)
layout.setSpacing(8)
self._setup_search(layout)
# Add table
hor_layout = QHBoxLayout()
self._table = QTableWidget(self._table_tab)
self._table.setColumnCount(5)
self._table.setHorizontalHeaderLabels(list(self._headers.values()))
vh = self._table.verticalHeader()
vh.setVisible(False)
vh.setDefaultSectionSize(vh.minimumSectionSize())
self._table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
self._table.setSelectionMode(QTableWidget.SelectionMode.SingleSelection)
self._table.setWordWrap(True)
self._table.setStyleSheet("QTableWidget::item { padding: 4px; }")
header = self._table.horizontalHeader()
header.setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents)
header.setSectionResizeMode(1, QHeaderView.ResizeMode.Stretch)
header.setSectionResizeMode(2, QHeaderView.ResizeMode.ResizeToContents)
header.setSectionResizeMode(3, QHeaderView.ResizeMode.ResizeToContents)
header.setSectionResizeMode(4, QHeaderView.ResizeMode.ResizeToContents)
self._table.itemSelectionChanged.connect(self._update_selection_state)
hor_layout.addWidget(self._table, stretch=5)
hor_layout.addSpacing(12) # Add space between table and side card
# Add side card for experiment details
self._side_card = ExperimentMatCard(
parent=self, show_activate_button=True, button_text="Activate"
)
self._side_card.experiment_selected.connect(self._emit_selected_experiment)
hor_layout.addWidget(self._side_card, stretch=2) # Ratio 5:2 between table and card
layout.addLayout(hor_layout)
@SafeSlot()
@SafeSlot(int)
@SafeSlot(bool) # Overload for buttons
def _apply_table_filters(self, *args, **kwargs):
if self._tabs.currentWidget() is not self._table_tab:
return
show_with = self._with_proposals.isChecked()
show_without = self._without_proposals.isChecked()
self._table_infos = []
for info in self._experiment_infos:
has_proposal = bool(info.get("proposal"))
if has_proposal and not show_with:
continue
if not has_proposal and not show_without:
continue
self._table_infos.append(info)
self._populate_table()
self._update_selection_state()
def _populate_table(self):
# Clear table before populating, this keeps headers intact
self._table.setRowCount(0)
# Refill table
self._table.setRowCount(len(self._table_infos))
for row, info in enumerate(self._table_infos):
pgroup = info.get("pgroup", "")
title = info.get("title", "")
name = format_name(info)
start, end = format_schedule(info.get("schedule"))
self._table.setItem(row, 0, QTableWidgetItem(pgroup))
self._table.setItem(row, 1, QTableWidgetItem(title))
self._table.setItem(row, 2, QTableWidgetItem(name))
self._table.setItem(row, 3, QTableWidgetItem(start))
self._table.setItem(row, 4, QTableWidgetItem(end))
width = self._table.viewport().width()
self._table.resizeRowsToContents()
self._table.resize(width, self._table.height())
# self._table.resizeRowsToContents()
@SafeSlot()
def _update_selection_state(self):
if self._tabs.currentWidget() is not self._table_tab:
return
index = self._table.selectionModel().selectedRows()
if len(index) > 0:
index = index[0]
self._side_card.set_experiment_info(self._table_infos[index.row()])
def _emit_selected_experiment(self):
if self._tabs.currentWidget() is self._card_tab and self._next_experiment:
self.experiment_selected.emit(self._next_experiment)
return
selected = self._table.selectionModel().selectedRows()
if not selected:
return
row = selected[0].row()
if 0 <= row < len(self._table_infos):
self.experiment_selected.emit(self._table_infos[row])
logger.info(f"Emitting next experiment signal with info: {self._table_infos[row]}")
def _select_next_experiment(self, experiment_infos: list[dict]) -> dict | None:
candidates = []
for info in experiment_infos:
start, _ = format_schedule(info.get("schedule"), as_datetime=True)
if start is None:
continue
candidates.append((start, info))
if not candidates:
return experiment_infos[0] if experiment_infos else None
now = datetime.now()
future = [entry for entry in candidates if entry[0] >= now]
pool = future or candidates
return min(pool, key=lambda entry: abs(entry[0] - now))[1]
def _on_tab_changed(self, index):
if self._tabs.widget(index) is self._table_tab:
self._table.resizeRowsToContents()
if self._next_experiment:
self._side_card.set_experiment_info(self._next_experiment)
self._apply_table_filters()
def _get_column_data(self, row) -> dict[str, str]:
output = {}
for ii, header in enumerate(self._headers.values()):
item = self._table.item(row, ii)
if item is None:
output[header] = ""
continue
output[header] = item.text()
return output
@SafeSlot(str)
def _apply_row_filter(self, text_input: str):
"""Apply a filter to the table rows based on the filter text."""
if not text_input:
for row in self._hidden_rows:
self._table.setRowHidden(row, False)
self._hidden_rows.clear()
return
for row in range(self._table.rowCount()):
experiment_data = self._get_column_data(row)
if is_match(
text_input, experiment_data, list(self._headers.values()), self._enable_fuzzy_search
):
self._table.setRowHidden(row, False)
self._hidden_rows.discard(row)
else:
self._table.setRowHidden(row, True)
self._hidden_rows.add(row)
@SafeSlot(int)
def _state_change_fuzzy_search(self, enabled: int):
"""Handle state changes for the fuzzy search toggle."""
self._enable_fuzzy_search = not bool(enabled)
# Re-apply filter with updated fuzzy search setting
current_text = self.search_input.text()
self._apply_row_filter(current_text)
if __name__ == "__main__": # pragma: no cover
from qtpy.QtWidgets import QApplication
experiment_infos = [
{
"_id": "p22622",
"owner_groups": ["admin"],
"access_groups": ["unx-sls_xda_bs", "p22622"],
"realm_id": "TestBeamline",
"proposal": "12345967",
"title": "Test Experiment for Mat Card Widget",
"firstname": "John",
"lastname": "Doe",
"email": "john.doe@psi.ch",
"account": "doe_j",
"pi_firstname": "Jane",
"pi_lastname": "Smith",
"pi_email": "jane.smith@psi.ch",
"pi_account": "smith_j",
"eaccount": "e22622",
"pgroup": "p22622",
"abstract": "This is a test abstract for the experiment mat card widget. It should be long enough to test text wrapping and display in the card. The abstract provides a brief overview of the experiment, its goals, and its significance. This text is meant to simulate a real abstract that might be associated with an experiment in the BEC Atlas system. The card should be able to handle abstracts of varying lengths without any issues, ensuring that the user can read the full abstract even if it is quite long.",
"schedule": [{"start": "01/01/2025 08:00:00", "end": "03/01/2025 18:00:00"}],
"proposal_submitted": "15/12/2024",
"proposal_expire": "31/12/2025",
"proposal_status": "Scheduled",
"delta_last_schedule": 30,
"mainproposal": "",
},
{
"_id": "p22623",
"owner_groups": ["admin"],
"access_groups": ["unx-sls_xda_bs", "p22623"],
"realm_id": "TestBeamline",
"proposal": "",
"title": "Experiment without Proposal",
"firstname": "Alice",
"lastname": "Johnson",
"email": "alice.johnson@psi.ch",
"account": "johnson_a",
"pi_firstname": "Bob",
"pi_lastname": "Brown",
"pi_email": "bob.brown@psi.ch",
"pi_account": "brown_b",
"eaccount": "e22623",
"pgroup": "p22623",
"abstract": "",
"schedule": [],
"proposal_submitted": "",
"proposal_expire": "",
"proposal_status": "",
"delta_last_schedule": None,
"mainproposal": "",
},
]
app = QApplication([])
from bec_qthemes import apply_theme
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
apply_theme("light")
w = QWidget()
l = QVBoxLayout(w)
dark_button = DarkModeButton()
l.addWidget(dark_button)
widget = ExperimentSelection(experiment_infos)
l.addWidget(widget)
w.resize(1280, 920)
w.show()
app.exec()

View File

@@ -0,0 +1,67 @@
"""Utility functions for experiment selection."""
from datetime import datetime
from typing import Literal
from bec_lib.messages import ExperimentInfoMessage
def format_name(info: dict | ExperimentInfoMessage) -> str:
"""Format the name from the experiment info."""
info = ExperimentInfoMessage.model_validate(info) if isinstance(info, dict) else info
firstname = info.firstname
lastname = info.lastname
return " ".join(part for part in [firstname, lastname] if part)
def format_schedule(
schedule: list[dict[Literal["start", "end"], str]] | None, as_datetime: bool = False
) -> tuple[str, str] | tuple[datetime | None, datetime | None]:
"""Format the schedule information to display start and end times."""
if not schedule:
return (None, None) if as_datetime else ("", "")
start, end = _pick_schedule_entry(schedule)
if as_datetime:
return start, end
return format_datetime(start), format_datetime(end)
def _pick_schedule_entry(
schedule: list[dict[Literal["start", "end"], str]],
) -> tuple[datetime | None, datetime | None]:
"""Pick the most relevant schedule entry based on the current time."""
now = datetime.now()
candidates = []
for item in schedule:
if not item:
continue
start_raw = item.get("start")
parsed = _parse_schedule_start(start_raw)
if parsed is None:
continue
candidates.append((parsed, item))
if not candidates:
return None, None
future = [entry for entry in candidates if entry[0] >= now]
pool = future or candidates
chosen_start, chosen_item = min(pool, key=lambda entry: abs(entry[0] - now))
end_raw = chosen_item.get("end")
return chosen_start, _parse_schedule_start(end_raw)
def _parse_schedule_start(value) -> datetime | None:
"""Parse a schedule start string into a datetime object."""
if not value:
return None
try:
return datetime.strptime(value, "%d/%m/%Y %H:%M:%S")
except ValueError:
return None
def format_datetime(value) -> str:
if not value:
return ""
return value.strftime("%Y-%m-%d %H:%M")

View File

@@ -0,0 +1,315 @@
"""Module for the BEC messaging configuration widget."""
from __future__ import annotations
import json
from qtpy.QtCore import Qt, QTimer, Signal # type: ignore[attr-defined]
from qtpy.QtWidgets import (
QGroupBox,
QHBoxLayout,
QLabel,
QPushButton,
QSizePolicy,
QSplitter,
QTabWidget,
QVBoxLayout,
QWidget,
)
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.services.bec_messaging_config.service_cards import (
CardType,
ScopeListWidget,
card_from_service,
make_card,
)
from bec_widgets.widgets.services.bec_messaging_config.service_scope_event_table import (
ServiceScopeEventTableWidget,
)
class ServiceConfigPanel(QWidget):
"""Panel that manages global and local service scopes for one service type.
Args:
card_type (CardType): The service type used when adding new scope cards.
parent (QWidget | None): The parent widget.
"""
config_changed = Signal()
def __init__(self, card_type: CardType, parent: QWidget | None = None) -> None:
super().__init__(parent)
self._card_type: CardType = card_type
root = QVBoxLayout(self)
root.setContentsMargins(12, 12, 12, 12)
root.setSpacing(0)
splitter = QSplitter(Qt.Orientation.Vertical)
# ── Local settings box ────────────────────────────────────────────
self._local_box = QGroupBox("Current Experiment")
self._local_list = ScopeListWidget()
self._local_list.cards_changed.connect(self.config_changed)
self._local_add_btn = QPushButton("+ Add")
self._local_add_btn.setFixedWidth(120)
self._local_add_btn.clicked.connect(
lambda: self._local_list.add_card(make_card(self._card_type))
)
local_layout = QVBoxLayout(self._local_box)
local_layout.setContentsMargins(16, 16, 16, 16)
local_layout.setSpacing(12)
local_layout.addWidget(self._local_add_btn, 0, Qt.AlignmentFlag.AlignRight)
local_layout.addWidget(self._local_list, 1)
splitter.addWidget(self._local_box)
# ── Global settings box ───────────────────────────────────────────
self._global_box = QGroupBox("All Experiments")
self._global_list = ScopeListWidget()
self._global_list.cards_changed.connect(self.config_changed)
self._global_add_btn = QPushButton("+ Add")
self._global_add_btn.setFixedWidth(120)
self._global_add_btn.clicked.connect(
lambda: self._global_list.add_card(make_card(self._card_type))
)
global_layout = QVBoxLayout(self._global_box)
global_layout.setContentsMargins(16, 16, 16, 16)
global_layout.setSpacing(12)
global_layout.addWidget(self._global_add_btn, 0, Qt.AlignmentFlag.AlignRight)
global_layout.addWidget(self._global_list, 1)
splitter.addWidget(self._global_box)
splitter.setSizes([300, 300])
root.addWidget(splitter, 1)
# ------------------------------------------------------------------
def load_services(self, deployment_services: list, session_services: list) -> None:
"""Populate both lists with services matching the panel service type."""
self._clear_list(self._global_list)
self._clear_list(self._local_list)
for info in deployment_services:
if getattr(info, "service_type", None) == self._card_type:
self._global_list.add_card(card_from_service(info))
for info in session_services:
if getattr(info, "service_type", None) == self._card_type:
self._local_list.add_card(card_from_service(info))
@staticmethod
def _clear_list(list_widget: ScopeListWidget) -> None:
"""Remove all cards from *list_widget*."""
list_widget.clear_cards()
# ------------------------------------------------------------------
def get_data(self) -> dict:
"""Collect all card data from both the deployment and session lists."""
return {
"deployment": self._collect(self._global_list),
"session": self._collect(self._local_list),
}
@staticmethod
def _collect(list_widget: ScopeListWidget) -> list[dict]:
return [card.get_data() for card in list_widget.cards()]
class BECMessagingConfigWidget(QWidget):
"""Widget to configure SciLog, Signal, and MS Teams messaging services."""
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setWindowTitle("BEC Messaging Configuration")
self.setMinimumSize(540, 500)
root = QVBoxLayout(self)
root.setContentsMargins(16, 16, 16, 16)
root.setSpacing(12)
content_splitter = QSplitter(Qt.Orientation.Horizontal)
# ── Tab widget ────────────────────────────────────────────────────
self._tabs = QTabWidget()
self._scilog_panel = ServiceConfigPanel("scilog")
self._signal_panel = ServiceConfigPanel("signal")
self._teams_panel = ServiceConfigPanel("teams")
for panel in (self._scilog_panel, self._signal_panel, self._teams_panel):
panel.config_changed.connect(self._refresh_scope_event_table)
self._tabs.addTab(self._scilog_panel, "SciLog")
self._tabs.addTab(self._signal_panel, "Signal")
self._tabs.addTab(self._teams_panel, "MS Teams")
content_splitter.addWidget(self._tabs)
self._scope_event_table = ServiceScopeEventTableWidget(self)
content_splitter.addWidget(self._scope_event_table)
content_splitter.setStretchFactor(0, 3)
content_splitter.setStretchFactor(1, 2)
root.addWidget(content_splitter, 1)
# ── Bottom action bar ─────────────────────────────────────────────
bottom_row = QHBoxLayout()
bottom_row.setSpacing(12)
self._status_label = QLabel("")
self._status_label.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Preferred)
bottom_row.addWidget(self._status_label, 1)
save_btn = QPushButton("Save && Apply")
save_btn.setDefault(True)
save_btn.clicked.connect(self._mock_save_to_atlas_api)
bottom_row.addWidget(save_btn)
root.addLayout(bottom_row)
# ------------------------------------------------------------------
# Initialisation from backend message
# ------------------------------------------------------------------
def populate_from_deployment(self, msg: DeploymentInfoMessage) -> None:
"""Populate all panels from a deployment info message.
Args:
msg (DeploymentInfoMessage): Deployment information containing deployment and session services.
"""
deployment_services = list(msg.messaging_services)
session_services = (
list(msg.active_session.messaging_services) if msg.active_session is not None else []
)
self._scilog_panel.load_services(deployment_services, session_services)
self._signal_panel.load_services(deployment_services, session_services)
self._teams_panel.load_services(deployment_services, session_services)
self._refresh_scope_event_table()
# ------------------------------------------------------------------
# Dummy REST methods (replace with real requests calls later)
# ------------------------------------------------------------------
def _build_payload(self) -> dict:
"""Collect the current UI state as a serializable dictionary."""
return {
"scilog": self._scilog_panel.get_data(),
"signal": self._signal_panel.get_data(),
"teams": self._teams_panel.get_data(),
"event_subscriptions": self._scope_event_table.get_data(),
}
def _refresh_scope_event_table(self) -> None:
"""Refresh the event subscription table from the current service cards."""
self._scope_event_table.set_services(self._collect_services_for_event_table())
def _collect_services_for_event_table(self) -> list[dict]:
"""Collect all configured services for the event subscription table."""
service_rows: list[dict] = []
for panel in (self._scilog_panel, self._signal_panel, self._teams_panel):
panel_data = panel.get_data()
for source_name in ("deployment", "session"):
for service in panel_data[source_name]:
service_rows.append({**service, "source": source_name})
return service_rows
def _mock_save_to_atlas_api(self) -> None:
"""Simulate saving the current configuration to Atlas."""
payload = self._build_payload()
print("" * 60)
print("[BECMessagingConfigWidget] _mock_save_to_atlas_api payload:")
print(json.dumps(payload, indent=2))
print("" * 60)
self._set_status("✅ Saved!", timeout_ms=4000)
# ------------------------------------------------------------------
# Status bar helper
# ------------------------------------------------------------------
def _set_status(self, message: str, *, timeout_ms: int = 0) -> None:
"""Show a status message and optionally clear it after a timeout.
Args:
message (str): The message to display in the status label.
timeout_ms (int): Time in milliseconds before clearing the message.
"""
self._status_label.setText(message)
if timeout_ms > 0:
QTimer.singleShot(timeout_ms, lambda: self._status_label.setText(""))
if __name__ == "__main__": # pragma: no cover
import sys
from bec_lib.messages import (
DeploymentInfoMessage,
MessagingConfig,
MessagingServiceScopeConfig,
SciLogServiceInfo,
SessionInfoMessage,
SignalServiceInfo,
TeamsServiceInfo,
)
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
apply_theme("dark")
# ── Build a realistic mock DeploymentInfoMessage ──────────────────
mock_deployment = DeploymentInfoMessage(
deployment_id="dep-0001",
name="mockup-beamline",
messaging_config=MessagingConfig(
signal=MessagingServiceScopeConfig(enabled=True),
teams=MessagingServiceScopeConfig(enabled=True),
scilog=MessagingServiceScopeConfig(enabled=True),
),
messaging_services=[
SciLogServiceInfo(
id="sl-global-1",
scope="beamline",
enabled=True,
name="Beamline Log",
logbook_id="lb-99001",
),
TeamsServiceInfo(
id="teams-global-1",
scope="beamline",
enabled=True,
name="BEC Channel",
workflow_webhook_url="https://outlook.office.com/webhook/…",
),
SignalServiceInfo(
id="signal-global-1",
scope="beamline",
enabled=False,
name=None,
group_id=None,
group_link=None,
),
],
active_session=SessionInfoMessage(
name="session-2026-03-07",
messaging_services=[
SciLogServiceInfo(
id="sl-local-1",
scope="experiment",
enabled=True,
name="My Notebook",
logbook_id="lb-12345",
),
SignalServiceInfo(
id="signal-local-1",
scope="experiment",
enabled=True,
name="Lab Signal Group",
group_id="grp-8a3f291c",
group_link="https://signal.group/#grp-8a3f291c",
),
],
),
)
widget = BECMessagingConfigWidget()
widget.populate_from_deployment(mock_deployment)
widget.show()
sys.exit(app.exec())

View File

@@ -0,0 +1,429 @@
"""Module for service scope cards used by the messaging configuration widget."""
from __future__ import annotations
import uuid
from enum import IntEnum
from typing import TYPE_CHECKING, Literal, Type
from bec_qthemes import material_icon
from qtpy.QtCore import QRegularExpression, Qt, QTimer, Signal # type: ignore[attr-defined]
from qtpy.QtGui import QRegularExpressionValidator
from qtpy.QtWidgets import (
QCheckBox,
QFrame,
QHBoxLayout,
QLabel,
QLineEdit,
QPushButton,
QScrollArea,
QSizePolicy,
QSpacerItem,
QStackedLayout,
QToolButton,
QVBoxLayout,
QWidget,
)
if TYPE_CHECKING: # pragma: no cover
from bec_lib import messages
CardType = Literal["scilog", "signal", "teams"]
class ScopeListWidget(QScrollArea):
"""A scrollable list that stacks scope cards neatly at the top."""
cards_changed = Signal()
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setWidgetResizable(True)
self.setFrameShape(QFrame.Shape.NoFrame)
self._container = QWidget()
self._layout = QVBoxLayout(self._container)
self._layout.setContentsMargins(4, 8, 4, 8)
self._layout.setSpacing(16)
self._spacer = QSpacerItem(0, 0, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Expanding)
self._layout.addSpacerItem(self._spacer)
self.setWidget(self._container)
def add_card(self, card: BaseScopeCard) -> None:
"""Insert a card above the trailing spacer.
Args:
card (BaseScopeCard): The card widget to add to the list.
"""
idx = self._layout.count() - 1
self._layout.insertWidget(idx, card)
card.delete_requested.connect(lambda: self._remove_card(card))
card.delete_requested.connect(self.cards_changed)
card.data_changed.connect(self.cards_changed)
self.cards_changed.emit()
def clear_cards(self) -> None:
"""Remove all cards without touching the trailing spacer."""
for index in range(self._layout.count() - 2, -1, -1):
item = self._layout.itemAt(index)
if item is None:
continue
card = item.widget()
if isinstance(card, BaseScopeCard):
self._layout.removeWidget(card)
card.deleteLater()
self.cards_changed.emit()
def cards(self) -> list[BaseScopeCard]:
"""Return the cards currently stored in the list."""
results: list[BaseScopeCard] = []
for index in range(self._layout.count()):
item = self._layout.itemAt(index)
if item is None:
continue
card = item.widget()
if isinstance(card, BaseScopeCard):
results.append(card)
return results
def _remove_card(self, card: BaseScopeCard) -> None:
self._layout.removeWidget(card)
card.deleteLater()
self.cards_changed.emit()
class BaseScopeCard(QFrame):
"""Base card with shared identity, scope, and enabled fields."""
delete_requested = Signal()
data_changed = Signal()
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self._id: str = str(uuid.uuid4())
self.setFrameShape(QFrame.Shape.StyledPanel)
self.setFrameShadow(QFrame.Shadow.Raised)
self.setStyleSheet(
"BaseScopeCard {"
" border: 1px solid palette(mid);"
" border-radius: 6px;"
" background: palette(base);"
"}"
)
root = QVBoxLayout(self)
root.setContentsMargins(20, 16, 20, 20)
root.setSpacing(14)
header_row = QHBoxLayout()
header_row.setSpacing(10)
self.enabled_checkbox = QCheckBox("Enabled")
self.enabled_checkbox.setChecked(True)
self.enabled_checkbox.toggled.connect(self.data_changed)
header_row.addWidget(self.enabled_checkbox)
header_row.addStretch(1)
self._delete_btn = QToolButton()
delete_icon = material_icon(
"delete", size=(25, 25), convert_to_pixmap=False, filled=False, color="#CC181E"
)
self._delete_btn.setToolTip("Delete this scope configuration")
self._delete_btn.setIcon(delete_icon)
self._delete_btn.clicked.connect(self.delete_requested)
header_row.addWidget(self._delete_btn)
root.addLayout(header_row)
identity_row = QHBoxLayout()
identity_row.setSpacing(16)
scope_col = QVBoxLayout()
scope_col.setSpacing(4)
scope_col.addWidget(QLabel("Scope"))
self.scope_edit = QLineEdit()
self.scope_edit.setPlaceholderText("e.g. user, admin")
self.scope_edit.textChanged.connect(self.data_changed)
scope_col.addWidget(self.scope_edit)
identity_row.addLayout(scope_col, 1)
name_col = QVBoxLayout()
name_col.setSpacing(4)
name_col.addWidget(QLabel("Name (optional)"))
self.name_edit = QLineEdit()
self.name_edit.setPlaceholderText("display name")
self.name_edit.textChanged.connect(self.data_changed)
name_col.addWidget(self.name_edit)
identity_row.addLayout(name_col, 1)
root.addLayout(identity_row)
self.content_layout = QVBoxLayout()
self.content_layout.setContentsMargins(0, 0, 0, 0)
self.content_layout.setSpacing(12)
root.addLayout(self.content_layout)
def get_data(self) -> dict:
"""Return the common payload for a messaging service card."""
return {
"id": self._id,
"scope": self.scope_edit.text(),
"enabled": self.enabled_checkbox.isChecked(),
"name": self.name_edit.text() or None,
}
def set_data(self, info: messages.MessagingService) -> None: # type: ignore[name-defined]
"""Populate the shared card fields from a messaging service.
Args:
info (messages.MessagingService): The service object used to populate the card.
"""
self._id = info.id
self.scope_edit.setText(info.scope)
self.enabled_checkbox.setChecked(info.enabled)
self.name_edit.setText(info.name or "")
class SciLogScopeCard(BaseScopeCard):
"""Card used to configure SciLog service settings."""
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
col = QVBoxLayout()
col.setSpacing(4)
col.addWidget(QLabel("Logbook ID"))
self.logbook_id_edit = QLineEdit()
self.logbook_id_edit.setPlaceholderText("e.g. lb-12345")
self.logbook_id_edit.textChanged.connect(self.data_changed)
col.addWidget(self.logbook_id_edit)
self.content_layout.addLayout(col)
def get_data(self) -> dict:
"""Return the SciLog-specific payload for this card."""
data = super().get_data()
data["service_type"] = "scilog"
data["logbook_id"] = self.logbook_id_edit.text()
return data
def set_data(self, info: messages.SciLogServiceInfo) -> None: # type: ignore[override]
"""Populate the card from SciLog service information.
Args:
info (messages.SciLogServiceInfo): The SciLog service object used to populate the card.
"""
super().set_data(info)
self.logbook_id_edit.setText(info.logbook_id)
class TeamsScopeCard(BaseScopeCard):
"""Card used to configure MS Teams service settings."""
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
fields_row = QHBoxLayout()
fields_row.setSpacing(16)
col = QVBoxLayout()
col.setSpacing(4)
col.addWidget(QLabel("Workflow Webhook URL"))
self.workflow_webhook_url_edit = edit = QLineEdit(parent=self)
edit.setPlaceholderText("e.g. https://outlook.office.com/webhook/…")
edit.textChanged.connect(self.data_changed)
col.addWidget(edit)
fields_row.addLayout(col, 1)
self.content_layout.addLayout(fields_row)
def get_data(self) -> dict:
"""Return the MS Teams-specific payload for this card."""
data = super().get_data()
data["service_type"] = "teams"
data["workflow_webhook_url"] = self.workflow_webhook_url_edit.text()
return data
def set_data(self, info: messages.TeamsServiceInfo) -> None: # type: ignore[override]
"""Populate the card from MS Teams service information.
Args:
info (messages.TeamsServiceInfo): The MS Teams service object used to populate the card.
"""
super().set_data(info)
self.workflow_webhook_url_edit.setText(info.workflow_webhook_url)
class _SignalState(IntEnum):
UNCONFIGURED = 0
PENDING = 1
CONFIGURED = 2
class SignalScopeCard(BaseScopeCard):
"""Card used to configure Signal service settings and linking state."""
_MOCK_GROUP_ID = "grp-8a3f291c"
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self._state = _SignalState.UNCONFIGURED
self._mock_group_id: str = ""
self._mock_group_link: str = ""
stacked_container = QWidget()
self._stacked = QStackedLayout(stacked_container)
self._stacked.setContentsMargins(0, 0, 0, 0)
self.content_layout.addWidget(stacked_container)
self._build_unconfigured_page()
self._build_pending_page()
self._build_configured_page()
self._stacked.setCurrentIndex(_SignalState.UNCONFIGURED)
def _build_unconfigured_page(self) -> None:
page = QWidget()
row = QHBoxLayout(page)
row.setContentsMargins(0, 0, 0, 0)
row.setSpacing(6)
phone_col = QVBoxLayout()
phone_col.setSpacing(4)
phone_col.addWidget(QLabel("Phone Number"))
self._phone_edit = QLineEdit()
self._phone_edit.setValidator(
QRegularExpressionValidator(QRegularExpression(r"^\+\S*$"), self._phone_edit)
)
self._phone_edit.setPlaceholderText("+41791234567")
self._phone_edit.textChanged.connect(self.data_changed)
phone_col.addWidget(self._phone_edit)
row.addLayout(phone_col, 1)
start_linking_btn = QPushButton("Start Linking")
start_linking_btn.setFixedWidth(100)
start_linking_btn.clicked.connect(self._on_ping_clicked)
row.addWidget(start_linking_btn, 0, Qt.AlignmentFlag.AlignBottom)
self._stacked.addWidget(page)
def _build_pending_page(self) -> None:
page = QWidget()
row = QHBoxLayout(page)
row.setContentsMargins(0, 0, 0, 0)
row.setSpacing(6)
waiting_lbl = QLabel("⏳ Waiting for you to reply on Signal…")
waiting_lbl.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Preferred)
row.addWidget(waiting_lbl)
cancel_btn = QPushButton("Cancel")
cancel_btn.clicked.connect(self._on_cancel_clicked)
row.addWidget(cancel_btn)
self._stacked.addWidget(page)
def _build_configured_page(self) -> None:
page = QWidget()
row = QHBoxLayout(page)
row.setContentsMargins(0, 0, 0, 0)
row.setSpacing(6)
self._linked_lbl = QLabel("🟢 Linked (Group ID: —)")
self._linked_lbl.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Preferred)
row.addWidget(self._linked_lbl)
unlink_btn = QPushButton("Unlink")
unlink_btn.clicked.connect(self._on_unlink_clicked)
row.addWidget(unlink_btn)
self._stacked.addWidget(page)
def _on_ping_clicked(self) -> None:
self._state = _SignalState.PENDING
self._stacked.setCurrentIndex(_SignalState.PENDING)
QTimer.singleShot(3000, self._mock_backend_confirmation)
def _mock_backend_confirmation(self) -> None:
if self._state != _SignalState.PENDING:
return
self._mock_group_id = self._MOCK_GROUP_ID
self._mock_group_link = f"https://signal.group/#{self._mock_group_id}"
self._linked_lbl.setText(f"🟢 Linked (Group ID: {self._mock_group_id})")
self._state = _SignalState.CONFIGURED
self._stacked.setCurrentIndex(_SignalState.CONFIGURED)
self.data_changed.emit()
def _on_cancel_clicked(self) -> None:
self._state = _SignalState.UNCONFIGURED
self._stacked.setCurrentIndex(_SignalState.UNCONFIGURED)
self.data_changed.emit()
def _on_unlink_clicked(self) -> None:
self._mock_group_id = ""
self._mock_group_link = ""
self._state = _SignalState.UNCONFIGURED
self._stacked.setCurrentIndex(_SignalState.UNCONFIGURED)
self.data_changed.emit()
def get_data(self) -> dict:
"""Return the Signal-specific payload for this card."""
data = super().get_data()
data["service_type"] = "signal"
configured = self._state == _SignalState.CONFIGURED
data["group_id"] = self._mock_group_id if configured else None
data["group_link"] = self._mock_group_link if configured else None
return data
def set_data(self, info: messages.SignalServiceInfo) -> None: # type: ignore[override]
"""Populate the card from Signal service information.
Args:
info (messages.SignalServiceInfo): The Signal service object used to populate the card.
"""
super().set_data(info)
if info.group_id:
self._mock_group_id = info.group_id
self._mock_group_link = info.group_link or ""
self._linked_lbl.setText(f"🟢 Linked (Group ID: {self._mock_group_id})")
self._state = _SignalState.CONFIGURED
self._stacked.setCurrentIndex(_SignalState.CONFIGURED)
return
self._mock_group_id = ""
self._mock_group_link = ""
self._state = _SignalState.UNCONFIGURED
self._stacked.setCurrentIndex(_SignalState.UNCONFIGURED)
_CARD_CLASSES: dict[CardType, Type[BaseScopeCard]] = {
"scilog": SciLogScopeCard,
"signal": SignalScopeCard,
"teams": TeamsScopeCard,
}
def make_card(card_type: CardType) -> BaseScopeCard:
"""Create a new service card for the requested card type.
Args:
card_type (CardType): The service type for the card to create.
"""
return _CARD_CLASSES[card_type]()
def card_from_service(info: object) -> BaseScopeCard:
"""Create and populate a card from a messaging service object.
Args:
info (object): A messaging service object with a ``service_type`` attribute.
"""
service_type: str = getattr(info, "service_type", "")
card_class = _CARD_CLASSES.get(service_type) # type: ignore[arg-type]
if card_class is None:
raise ValueError(f"Unknown service_type: {service_type!r}")
card = card_class()
card.set_data(info) # type: ignore[arg-type]
return card

View File

@@ -0,0 +1,125 @@
"""Module for the service scope event subscription table widget."""
from __future__ import annotations
from qtpy.QtCore import Qt
from qtpy.QtWidgets import (
QAbstractItemView,
QCheckBox,
QHBoxLayout,
QHeaderView,
QSizePolicy,
QTableWidget,
QVBoxLayout,
QWidget,
)
class ServiceScopeEventTableWidget(QWidget):
"""Widget that manages per-scope event subscriptions for messaging services."""
EVENT_NAMES = ("new_scan", "scan_finished", "alarm")
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self._services: list[dict] = []
self._subscriptions: dict[str, dict[str, bool]] = {}
root = QVBoxLayout(self)
root.setContentsMargins(0, 0, 0, 0)
root.setSpacing(0)
self._table = QTableWidget(len(self.EVENT_NAMES), 0, self)
self._table.setEditTriggers(QAbstractItemView.EditTrigger.NoEditTriggers)
self._table.setSelectionMode(QAbstractItemView.SelectionMode.NoSelection)
self._table.setAlternatingRowColors(True)
self._table.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self._table.setVerticalHeaderLabels(list(self.EVENT_NAMES))
self._table.horizontalHeader().setStretchLastSection(True)
header = self._table.horizontalHeader()
header.setSectionResizeMode(QHeaderView.ResizeMode.Stretch)
self._table.verticalHeader().setSectionResizeMode(QHeaderView.ResizeMode.ResizeToContents)
root.addWidget(self._table, 1)
def set_services(self, services: list[dict]) -> None:
"""Update the table rows to match the current services.
Args:
services (list[dict]): Service dictionaries collected from the service configuration panels.
"""
self._services = [dict(service) for service in services]
known_ids = {str(service.get("id", "")) for service in self._services if service.get("id")}
self._subscriptions = {
service_id: subscriptions
for service_id, subscriptions in self._subscriptions.items()
if service_id in known_ids
}
self._table.clearContents()
self._table.setRowCount(len(self.EVENT_NAMES))
self._table.setColumnCount(len(self._services))
self._table.setHorizontalHeaderLabels(
[self._format_service_label(service) for service in self._services]
)
for column, service in enumerate(self._services):
service_id = str(service.get("id", ""))
event_states = self._subscriptions.setdefault(
service_id, {event_name: False for event_name in self.EVENT_NAMES}
)
for row, event_name in enumerate(self.EVENT_NAMES):
self._table.setCellWidget(
row,
column,
self._make_checkbox_cell(
service_id, event_name, event_states.get(event_name, False)
),
)
def get_data(self) -> list[dict]:
"""Return the event subscriptions for the current services."""
results: list[dict] = []
for service in self._services:
service_id = str(service.get("id", ""))
results.append(
{
"id": service_id,
"source": service.get("source"),
"service_type": service.get("service_type"),
"scope": service.get("scope"),
"events": dict(
self._subscriptions.get(
service_id, {event_name: False for event_name in self.EVENT_NAMES}
)
),
}
)
return results
def _format_service_label(self, service: dict) -> str:
service_name = str(service.get("service_type", ""))
scope_name = str(service.get("scope", ""))
source_name = str(service.get("source", ""))
return f"{service_name}\n{scope_name}\n({source_name})"
def _make_checkbox_cell(self, service_id: str, event_name: str, checked: bool) -> QWidget:
checkbox = QCheckBox()
checkbox.setChecked(checked)
checkbox.toggled.connect(
lambda state, current_service_id=service_id, current_event_name=event_name: self._set_event_state(
current_service_id, current_event_name, state
)
)
container = QWidget()
layout = QHBoxLayout(container)
layout.setContentsMargins(0, 0, 0, 0)
layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(checkbox)
return container
def _set_event_state(self, service_id: str, event_name: str, checked: bool) -> None:
self._subscriptions.setdefault(service_id, {})[event_name] = checked

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "3.1.2"
version = "3.3.0"
description = "BEC Widgets"
requires-python = ">=3.11"
classifiers = [
@@ -13,14 +13,14 @@ classifiers = [
"Topic :: Scientific/Engineering",
]
dependencies = [
"bec_ipython_client~=3.106", # needed for jupyter console
"bec_lib~=3.106",
"bec_qthemes~=1.0, >=1.3.3",
"bec_ipython_client~=3.107,>=3.107.2", # needed for jupyter console
"bec_lib~=3.107,>=3.107.2",
"bec_qthemes~=1.0, >=1.3.4",
"black>=26,<27", # needed for bw-generate-cli
"isort>=5.13, <9.0", # needed for bw-generate-cli
"ophyd_devices~=1.29, >=1.29.1",
"pydantic~=2.0",
"pyqtgraph==0.14.0",
"pyqtgraph==0.13.7",
"PySide6==6.9.0",
"qtconsole~=5.5, >=5.5.1", # needed for jupyter console
"qtpy~=2.4",
@@ -32,6 +32,7 @@ dependencies = [
"copier~=9.7",
"typer~=0.15",
"markdown~=3.9",
"PyJWT~=2.9",
]

View File

@@ -75,6 +75,13 @@ def test_dock_manipulations_e2e(qtbot, connected_client_gui_obj):
w1 = dock_area.new("Waveform")
w2 = dock_area.new("Waveform")
qtbot.waitUntil(
lambda: all(
gui_id in gui._server_registry for gui_id in [w0._gui_id, w1._gui_id, w2._gui_id]
),
timeout=5000,
)
assert hasattr(gui.bec, "Waveform")
assert hasattr(gui.bec, "Waveform_0")
assert hasattr(gui.bec, "Waveform_1")
@@ -126,6 +133,7 @@ def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
xw = gui.new("X")
xw.delete_all()
qtbot.waitUntil(lambda: len(gui.windows) == 2, timeout=3000)
assert xw.__class__.__name__ == "RPCReference"
assert gui._ipython_registry[xw._gui_id].__class__.__name__ == "BECDockArea"
assert len(gui.windows) == 2
@@ -145,12 +153,15 @@ def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
qtbot.waitUntil(wait_for_gui_started, timeout=3000)
# gui.windows should have bec with gui_id 'bec'
qtbot.waitUntil(lambda: len(gui.windows) == 1, timeout=3000)
assert len(gui.windows) == 1
# communication should work, main dock area should have same id and be visible
yw = gui.new("Y")
yw.delete_all()
qtbot.waitUntil(lambda: len(gui.windows) == 2, timeout=3000)
assert len(gui.windows) == 2
yw.remove()
qtbot.waitUntil(lambda: len(gui.windows) == 1, timeout=3000)
assert len(gui.windows) == 1 # only bec is left

View File

@@ -72,6 +72,7 @@ def test_rpc_plotting_shortcuts_init_configs(qtbot, connected_client_gui_obj):
"dap": None,
"device": "bpm4i",
"signal": "bpm4i",
"dap_parameters": None,
"dap_oversample": 1,
}
assert c1._config_dict["source"] == "device"

View File

@@ -89,8 +89,8 @@ def test_available_widgets(qtbot, connected_client_gui_obj):
# Skip private attributes
if object_name.startswith("_"):
continue
# Skip VSCode widget as Code server is not available in the Docker image
if object_name == "VSCodeEditor":
# Skip BECShell as ttyd is not installed
if object_name == "BECShell":
continue
# Skip WebConsole as ttyd is not installed

View File

@@ -128,13 +128,9 @@ def maybe_remove_dock_area(qtbot, gui: BECGuiClient, random_int_gen: random.Rand
random_int = random_int_gen.randint(0, 100)
if random_int >= 50:
# Needed, reference gets deleted in the gui
name = gui.dock_area.object_name
gui_id = gui.dock_area._gui_id
gui.dock_area.delete_all() # start fresh
gui.delete("dock_area")
wait_for_namespace_change(
qtbot, gui=gui, parent_widget=gui, object_name=name, widget_gui_id=gui_id, exists=False
)
qtbot.waitUntil(lambda: hasattr(gui, "dock_area") is False, timeout=5000)
@pytest.mark.timeout(PYTEST_TIMEOUT)

View File

@@ -1,47 +1,18 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from math import inf
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, PropertyMock, patch
import fakeredis
import pytest
from bec_lib.bec_service import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import RedisConnector
from bec_lib.scan_history import ScanHistory
from bec_widgets.tests.utils import DEVICES, DMMock, FakePositioner, Positioner
def fake_redis_server(host, port, **kwargs):
redis = fakeredis.FakeRedis()
return redis
from bec_widgets.tests.utils import FakePositioner, Positioner
@pytest.fixture(scope="function")
def mocked_client(bec_dispatcher):
connector = RedisConnector("localhost:1", redis_cls=fake_redis_server)
# Create a MagicMock object
client = MagicMock() # TODO change to real BECClient
# Shutdown the original client
bec_dispatcher.client.shutdown()
# Mock the connector attribute
bec_dispatcher.client = client
# Mock the device_manager.devices attribute
client.connector = connector
client.device_manager = DMMock()
client.device_manager.add_devices(DEVICES)
def mock_mv(*args, relative=False):
# Extracting motor and value pairs
for i in range(0, len(args), 2):
motor = args[i]
value = args[i + 1]
motor.move(value, relative=relative)
return MagicMock(wait=MagicMock())
client.scans = MagicMock(mv=mock_mv)
# Ensure isinstance check for Positioner passes
original_isinstance = isinstance
@@ -52,8 +23,8 @@ def mocked_client(bec_dispatcher):
return original_isinstance(obj, class_info)
with patch("builtins.isinstance", new=isinstance_mock):
yield client
connector.shutdown() # TODO change to real BECClient
yield bec_dispatcher.client
bec_dispatcher.client.connector.shutdown()
##################################################
@@ -190,17 +161,16 @@ def mocked_client_with_dap(mocked_client, dap_plugin_message):
name="LmfitService1D", status=1, info={}
),
}
client = mocked_client
client.service_status = dap_services
client.connector.set(
type(mocked_client).service_status = PropertyMock(return_value=dap_services)
mocked_client.connector.set(
topic=MessageEndpoints.dap_available_plugins("dap"), msg=dap_plugin_message
)
# Patch the client's DAP attribute so that the available models include "GaussianModel"
patched_models = {"GaussianModel": {}, "LorentzModel": {}, "SineModel": {}}
client.dap._available_dap_plugins = patched_models
mocked_client.dap._available_dap_plugins = patched_models
yield client
yield mocked_client
class DummyData:
@@ -233,7 +203,6 @@ def create_dummy_scan_item():
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
}
}
dummy_scan.status_message = MagicMock()
dummy_scan.status_message.info = {
"readout_priority": {"monitored": ["bpm4i"], "async": ["async_device"]},
"scan_report_devices": ["samx"],

View File

@@ -1,19 +1,28 @@
import json
import time
from unittest import mock
from unittest.mock import patch
import fakeredis
import h5py
import numpy as np
import pytest
from bec_lib import messages
from bec_lib import messages, service_config
from bec_lib.bec_service import messages
from bec_lib.client import BECClient
from bec_lib.messages import _StoredDataInfo
from bec_qthemes import apply_theme
from bec_qthemes._theme import Theme
from ophyd._pyepics_shim import _dispatcher
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
from qtpy.QtCore import QEvent, QEventLoop
from qtpy.QtWidgets import QApplication, QMessageBox
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.tests.utils import DEVICES, DMMock
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
from bec_widgets.utils import error_popups
from bec_widgets.utils.bec_dispatcher import QtRedisConnector
# Patch to set default RAISE_ERROR_DEFAULT to True for tests
# This means that by default, error popups will raise exceptions during tests
@@ -38,15 +47,20 @@ def process_all_deferred_deletes(qapp):
def qapplication(qtbot, request, testable_qtimer_class): # pylint: disable=unused-argument
qapp = QApplication.instance()
process_all_deferred_deletes(qapp)
apply_theme("light")
qapp.processEvents()
if (
not hasattr(qapp, "theme")
or not isinstance(qapp.theme, Theme)
or qapp.theme.theme != "light"
):
apply_theme("light")
qapp.processEvents()
yield
# if the test failed, we don't want to check for open widgets as
# it simply pollutes the output
# stop pyepics dispatcher for leaking tests
from ophyd._pyepics_shim import _dispatcher
_dispatcher.stop()
if request.node.stash._storage.get("failed"):
@@ -71,9 +85,37 @@ def rpc_register():
RPCRegister.reset_singleton()
_REDIS_CONN: QtRedisConnector | None = None
def global_mock_qt_redis_connector(*_, **__):
global _REDIS_CONN
if _REDIS_CONN is None:
_REDIS_CONN = QtRedisConnector(bootstrap="localhost:1", redis_cls=fakeredis.FakeRedis)
return _REDIS_CONN
def mock_client(*_, **__):
with (
patch("bec_lib.client.DeviceManagerBase", DMMock),
patch("bec_lib.client.DAPPlugins"),
patch("bec_lib.client.Scans"),
patch("bec_lib.client.ScanManager"),
patch("bec_lib.bec_service.BECAccess"),
):
client = BECClient(
config=service_config.ServiceConfig(config={"redis": {"host": "localhost", "port": 1}}),
connector_cls=global_mock_qt_redis_connector,
)
client.start()
client.device_manager.add_devices(DEVICES)
return client
@pytest.fixture(autouse=True)
def bec_dispatcher(threads_check): # pylint: disable=unused-argument
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
with mock.patch.object(bec_dispatcher_module, "BECClient", mock_client):
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
yield bec_dispatcher
bec_dispatcher.disconnect_all()
# clean BEC client

View File

@@ -1,5 +1,7 @@
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
from unittest.mock import MagicMock
import pytest
from bec_widgets.widgets.control.buttons.button_abort.button_abort import AbortButton
@@ -10,6 +12,7 @@ from .client_mocks import mocked_client
@pytest.fixture
def abort_button(qtbot, mocked_client):
widget = AbortButton(client=mocked_client)
widget.queue = MagicMock()
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget

View File

@@ -0,0 +1,720 @@
import datetime
import time
from types import SimpleNamespace
from unittest import mock
import jwt
import pytest
from bec_lib.messages import (
DeploymentInfoMessage,
ExperimentInfoMessage,
MessagingConfig,
MessagingServiceScopeConfig,
SessionInfoMessage,
)
from qtpy.QtCore import QByteArray, QUrl
from qtpy.QtNetwork import QNetworkRequest
from bec_widgets.utils.fuzzy_search import is_match
from bec_widgets.widgets.services.bec_atlas_admin_view.bec_atlas_admin_view import BECAtlasAdminView
from bec_widgets.widgets.services.bec_atlas_admin_view.bec_atlas_http_service import (
AtlasEndpoints,
AuthenticatedUserInfo,
BECAtlasHTTPError,
BECAtlasHTTPService,
HTTPResponse,
)
from bec_widgets.widgets.services.bec_atlas_admin_view.experiment_selection.experiment_mat_card import (
ExperimentMatCard,
)
from bec_widgets.widgets.services.bec_atlas_admin_view.experiment_selection.experiment_selection import (
ExperimentSelection,
)
from bec_widgets.widgets.services.bec_atlas_admin_view.experiment_selection.utils import (
format_datetime,
format_name,
format_schedule,
)
class _FakeQByteArray:
def __init__(self, payload: bytes):
self._payload = payload
def data(self) -> bytes:
return self._payload
class _FakeReply:
def __init__(
self,
*,
request_url: str,
status: int = 200,
payload: bytes = b"{}",
headers: list[tuple[bytes, bytes]] | None = None,
):
self._request_url = request_url
self._status = status
self._payload = payload
self._headers = (
headers
if headers is not None
else [(QByteArray(b"content-type"), QByteArray(b"application/json"))]
)
self.deleted = False
def attribute(self, attr):
assert attr == QNetworkRequest.Attribute.HttpStatusCodeAttribute
return self._status
def readAll(self):
return _FakeQByteArray(self._payload)
def url(self):
return QUrl(self._request_url)
def rawHeaderPairs(self):
return self._headers
def deleteLater(self):
self.deleted = True
class TestBECAtlasHTTPService:
@pytest.fixture
def http_service(self, qtbot):
"""Fixture to create a BECAtlasHTTPService instance."""
service = BECAtlasHTTPService(base_url="http://localhost:8000")
qtbot.addWidget(service)
qtbot.waitExposed(service)
return service
def test_initialization(self, http_service):
"""Test that the BECAtlasHTTPService initializes correctly."""
assert http_service._base_url == "http://localhost:8000"
assert http_service._auth_timer._timer.isActive() == False
assert http_service._headers == {"accept": "application/json"}
def test_get_request_uses_network_manager_get(self, http_service):
"""Test that _get_request uses the network manager's get method with correct parameters."""
with mock.patch.object(http_service.network_manager, "get") as mock_get:
http_service._get_request(
endpoint=AtlasEndpoints.REALMS_EXPERIMENTS.value,
query_parameters={"realm_id": "realm-1"},
)
mock_get.assert_called_once()
request = mock_get.call_args.args[0]
assert request.url().toString() == (
"http://localhost:8000/realms/experiments?realm_id=realm-1"
)
assert request.rawHeader("accept") == QByteArray(b"application/json")
def test_post_request_uses_network_manager_post(self, http_service):
"""Test that _post_request uses the network manager's post method with correct parameters."""
with mock.patch.object(http_service.network_manager, "post") as mock_post:
http_service._post_request(
endpoint=AtlasEndpoints.LOGIN.value, payload={"username": "alice", "password": "pw"}
)
mock_post.assert_called_once()
request, payload = mock_post.call_args.args
assert request.url().toString() == "http://localhost:8000/user/login"
assert request.rawHeader("accept") == QByteArray(b"application/json")
assert payload == b'{"username": "alice", "password": "pw"}'
def test_public_api(self, http_service):
"""Test BEC ATLAS public API methods from the http service."""
with mock.patch.object(http_service, "_get_request") as mock_get:
# User info
http_service.get_user_info()
mock_get.assert_called_once_with(endpoint=AtlasEndpoints.USER_INFO.value)
mock_get.reset_mock()
# Deployment info
http_service.get_deployment_info("dep-1")
mock_get.assert_called_once_with(
endpoint=AtlasEndpoints.DEPLOYMENT_INFO.value,
query_parameters={"deployment_id": "dep-1"},
)
mock_get.reset_mock()
# Realms experiments
http_service.get_experiments_for_realm("realm-1")
mock_get.assert_called_once_with(
endpoint=AtlasEndpoints.REALMS_EXPERIMENTS.value,
query_parameters={"realm_id": "realm-1"},
)
with mock.patch.object(http_service, "_post_request") as mock_post:
# Logout
http_service.logout()
mock_post.assert_called_once_with(endpoint=AtlasEndpoints.LOGOUT.value)
mock_post.reset_mock()
# Login
http_service.login("alice", "pw")
mock_post.assert_called_once_with(
endpoint=AtlasEndpoints.LOGIN.value, payload={"username": "alice", "password": "pw"}
)
mock_post.reset_mock()
# Set experiment
http_service.set_experiment("exp-1", "dep-1")
mock_post.assert_called_once_with(
endpoint=AtlasEndpoints.SET_EXPERIMENT.value,
query_parameters={"experiment_id": "exp-1", "deployment_id": "dep-1"},
)
def test_handle_response_login(self, http_service, qtbot):
"""Test that handling a login response correctly decodes the token and starts the auth timer."""
exp = time.time() + 300
token = jwt.encode({"email": "alice@example.org", "exp": exp}, "secret", algorithm="HS256")
payload = ("{" f'"access_token": "{token}"' "}").encode()
reply = _FakeReply(
request_url="http://localhost:8000/user/login", status=200, payload=payload
)
with mock.patch.object(http_service, "get_user_info") as mock_get_user_info:
with qtbot.waitSignal(http_service.authentication_expires, timeout=1000) as blocker:
http_service._handle_response(reply)
assert blocker.args[0] == pytest.approx(exp)
assert http_service.auth_user_info is not None
assert http_service.auth_user_info.email == "alice@example.org"
assert http_service.auth_user_info.groups == set()
http_service.get_user_info.assert_called_once()
def test_handle_response_logout(self, http_service, qtbot):
"""Test handle response for logout."""
http_service._auth_user_info = AuthenticatedUserInfo(
email="alice@example.org", exp=time.time() + 60, groups={"staff"}, deployment_id="dep-1"
)
reply = _FakeReply(
request_url="http://localhost:8000/user/logout", status=200, payload=b"{}"
)
with qtbot.waitSignal(http_service.authenticated, timeout=1000) as blocker:
http_service._handle_response(reply)
assert blocker.args[0] == {}
assert http_service.auth_user_info is None
def test_handle_response_user_info(self, http_service):
"""Test handle response for user info endpoint correctly updates auth user info."""
http_service._auth_user_info = AuthenticatedUserInfo(
email="alice@example.org", exp=time.time() + 60, groups=set(), deployment_id="dep-1"
)
http_service._current_deployment_info = SimpleNamespace(deployment_id="dep-1")
reply = _FakeReply(
request_url="http://localhost:8000/user/me",
status=200,
payload=b'{"email": "alice@example.org", "groups": ["operators", "staff"]}',
)
with mock.patch.object(http_service, "get_deployment_info") as mock_get_deployment_info:
http_service._handle_response(reply)
assert http_service.auth_user_info is not None
assert http_service.auth_user_info.groups == {"operators", "staff"}
mock_get_deployment_info.assert_called_once_with(deployment_id="dep-1")
def test_handle_response_deployment_info(self, http_service, qtbot):
"""Test handling deployment info response"""
# Groups match: should emit authenticated signal with user info
http_service._auth_user_info = AuthenticatedUserInfo(
email="alice@example.org",
exp=time.time() + 60,
groups={"operators"},
deployment_id="dep-1",
)
reply = _FakeReply(
request_url="http://localhost:8000/deployments/id?deployment_id=dep-1",
status=200,
payload=b'{"owner_groups": ["operators"], "name": "Beamline Deployment"}',
)
with qtbot.waitSignal(http_service.authenticated, timeout=1000) as blocker:
http_service._handle_response(reply)
assert blocker.args[0]["email"] == "alice@example.org"
assert set(blocker.args[0]["groups"]) == {"operators"}
assert blocker.args[0]["deployment_id"] == "dep-1"
# Groups do not match: should show warning and logout
http_service._auth_user_info = AuthenticatedUserInfo(
email="alice@example.org",
exp=time.time() + 60,
groups={"operators"},
deployment_id="dep-1",
)
reply = _FakeReply(
request_url="http://localhost:8000/deployments/id?deployment_id=dep-1",
status=200,
payload=b'{"owner_groups": ["no-operators"], "name": "Beamline Deployment"}',
)
with (
mock.patch.object(http_service, "_show_warning") as mock_show_warning,
mock.patch.object(http_service, "logout") as mock_logout,
):
http_service._handle_response(reply)
mock_show_warning.assert_called_once()
mock_logout.assert_called_once()
def test_handle_response_emits_http_response(self, http_service, qtbot):
"""Test that _handle_response emits the http_response signal with correct parameters for a generic response."""
reply = _FakeReply(
request_url="http://localhost:8000/realms/experiments?realm_id=realm-1",
status=200,
payload=b'{"items": []}',
)
with qtbot.waitSignal(http_service.http_response, timeout=1000) as blocker:
http_service._handle_response(reply)
assert blocker.args[0]["request_url"] == (
"http://localhost:8000/realms/experiments?realm_id=realm-1"
)
assert blocker.args[0]["status"] == 200
assert blocker.args[0]["headers"] == {"content-type": "application/json"}
assert blocker.args[0]["data"] == {"items": []}
def test_handle_response_raises_for_invalid_status(self, http_service):
reply = _FakeReply(
request_url="http://localhost:8000/user/me",
status=401,
payload=b'{"detail": "Unauthorized"}',
)
with pytest.raises(BECAtlasHTTPError):
http_service._handle_response(reply, _override_slot_params={"raise_error": True})
@pytest.fixture
def experiment_info_message() -> ExperimentInfoMessage:
data = {
"_id": "p22622",
"owner_groups": ["admin"],
"access_groups": ["unx-sls_xda_bs", "p22622"],
"realm_id": "TestBeamline",
"proposal": "12345967",
"title": "Test Experiment for Mat Card Widget",
"firstname": "John",
"lastname": "Doe",
"email": "john.doe@psi.ch",
"account": "doe_j",
"pi_firstname": "Jane",
"pi_lastname": "Smith",
"pi_email": "jane.smith@psi.ch",
"pi_account": "smith_j",
"eaccount": "e22622",
"pgroup": "p22622",
"abstract": "This is a test abstract for the experiment mat card widget. It should be long enough to test text wrapping and display in the card. The abstract provides a brief overview of the experiment, its goals, and its significance. This text is meant to simulate a real abstract that might be associated with an experiment in the BEC Atlas system. The card should be able to handle abstracts of varying lengths without any issues, ensuring that the user can read the full abstract even if it is quite long.",
"schedule": [{"start": "01/01/2025 08:00:00", "end": "03/01/2025 18:00:00"}],
"proposal_submitted": "15/12/2024",
"proposal_expire": "31/12/2025",
"proposal_status": "Scheduled",
"delta_last_schedule": 30,
"mainproposal": "",
}
return ExperimentInfoMessage.model_validate(data)
@pytest.fixture
def experiment_info_list(experiment_info_message: ExperimentInfoMessage) -> list[dict]:
"""Fixture to provide a list of experiment info dictionaries."""
another_experiment_info = {
"_id": "p22623",
"owner_groups": ["admin"],
"access_groups": ["unx-sls_xda_bs", "p22623"],
"realm_id": "TestBeamline",
"proposal": "",
"title": "Experiment without Proposal",
"firstname": "Alice",
"lastname": "Johnson",
"email": "alice.johnson@psi.ch",
"account": "johnson_a",
"pi_firstname": "Bob",
"pi_lastname": "Brown",
"pi_email": "bob.brown@psi.ch",
"pi_account": "brown_b",
"eaccount": "e22623",
"pgroup": "p22623",
"abstract": "",
"schedule": [],
"proposal_submitted": "",
"proposal_expire": "",
"proposal_status": "",
"delta_last_schedule": None,
"mainproposal": "",
}
return [
experiment_info_message.model_dump(),
ExperimentInfoMessage.model_validate(another_experiment_info).model_dump(),
]
class TestBECAtlasExperimentSelection:
def test_format_name(self, experiment_info_message: ExperimentInfoMessage):
"""Test utils format name"""
assert format_name(experiment_info_message) == "John Doe"
def test_format_schedule(self, experiment_info_message: ExperimentInfoMessage):
"""Test utils format schedule"""
assert format_schedule(experiment_info_message.schedule) == (
"2025-01-01 08:00",
"2025-01-03 18:00",
)
assert format_schedule(experiment_info_message.schedule, as_datetime=True) == (
datetime.datetime.strptime(
experiment_info_message.schedule[0]["start"], "%d/%m/%Y %H:%M:%S"
),
datetime.datetime.strptime(
experiment_info_message.schedule[0]["end"], "%d/%m/%Y %H:%M:%S"
),
)
assert format_schedule([]) == ("", "")
def test_format_datetime(self):
"""Test utils format datetime"""
dt = datetime.datetime(2025, 1, 1, 8, 0)
assert format_datetime(dt) == "2025-01-01 08:00"
assert format_datetime(None) == ""
@pytest.fixture
def mat_card(self, qtbot):
"""Fixture to create an ExperimentMatCard instance."""
card = ExperimentMatCard()
qtbot.addWidget(card)
qtbot.waitExposed(card)
return card
def test_set_experiment_info(
self, mat_card: ExperimentMatCard, experiment_info_message: ExperimentInfoMessage, qtbot
):
"""Test that set_experiment_info correctly updates the card's display based on the provided experiment info, whether it's a dictionary or an ExperimentInfoMessage instance."""
# Test with ExperimentInfoMessage instance
mat_card.set_experiment_info(experiment_info_message)
assert mat_card._card_pgroup.text() == "p22622"
assert mat_card._card_title.text() == "Next Experiment"
assert mat_card._abstract_label.text() == experiment_info_message.abstract.strip()
assert mat_card.experiment_info == experiment_info_message.model_dump()
assert mat_card._activate_button.isEnabled()
assert mat_card._activate_button.text() == "Activate"
# Test with dictionary input
mat_card.set_experiment_info(experiment_info_message.model_dump())
mat_card.set_title("Experiment Details")
assert mat_card._card_pgroup.text() == "p22622"
assert mat_card._card_title.text() == "Experiment Details"
assert mat_card._abstract_label.text() == experiment_info_message.abstract.strip()
assert mat_card.experiment_info == experiment_info_message.model_dump()
assert mat_card._activate_button.isEnabled()
assert mat_card._activate_button.text() == "Activate"
with qtbot.waitSignal(mat_card.experiment_selected, timeout=1000) as blocker:
mat_card._activate_button.click()
assert blocker.args[0] == experiment_info_message.model_dump()
def test_is_match(self):
"""Test is_match utility function for search functionality."""
data = {"name": "Test Experiment", "description": "This is a test."}
relevant_keys = ["name", "description"]
# Test exact match
assert is_match("Test Experiment", data, relevant_keys, enable_fuzzy=False)
assert not is_match("Nonexistent", data, relevant_keys, enable_fuzzy=False)
# Test fuzzy match
assert not is_match("Nonexistent", data, relevant_keys, enable_fuzzy=True)
assert is_match("Test Experimnt", data, relevant_keys, enable_fuzzy=True)
# Typo should still match with fuzzy enabled
assert is_match("Test Experiement", data, relevant_keys, enable_fuzzy=True)
@pytest.fixture
def experiment_selection(self, qtbot):
"""Fixture to create an ExperimentSelection instance with sample experiment info."""
selection = ExperimentSelection()
qtbot.addWidget(selection)
qtbot.waitExposed(selection)
return selection
def test_set_experiments(
self, experiment_selection: ExperimentSelection, experiment_info_list: list[dict]
):
"""Test that set_experiment_infos correctly populates the experiment selection with provided experiment info."""
with mock.patch.object(
experiment_selection._card_tab, "set_experiment_info"
) as mock_set_experiment_info:
experiment_selection.set_experiment_infos(experiment_info_list)
assert len(experiment_selection._experiment_infos) == 2
# Next experiment should be the first one as the second one has no schedule
mock_set_experiment_info.assert_called_once_with(experiment_info_list[0])
# Should be on card tab
assert experiment_selection._tabs.currentWidget() == experiment_selection._card_tab
def test_filter_functionality(
self, experiment_selection: ExperimentSelection, experiment_info_list: list[dict], qtbot
):
"""Test that the search functionality correctly filters experiments based on the search query."""
wid = experiment_selection
wid.set_experiment_infos(experiment_info_list)
# First move to the table tab
wid._tabs.setCurrentWidget(wid._table_tab)
assert wid._side_card.experiment_info == wid._next_experiment
# Initially, both experiments should be in the table
assert wid._table.rowCount() == 2
with qtbot.waitSignal(wid._with_proposals.toggled, timeout=1000):
wid._with_proposals.setChecked(False) # Should hide one experiment
assert wid._table.rowCount() == 1
with qtbot.waitSignal(wid._without_proposals.toggled, timeout=1000):
wid._without_proposals.setChecked(False) # Should hide the other experiment
assert wid._table.rowCount() == 0
with qtbot.waitSignals(
[wid._without_proposals.toggled, wid._with_proposals.toggled], timeout=1000
):
wid._without_proposals.setChecked(True)
wid._with_proposals.setChecked(True) # Should show both experiments again
assert wid._table.rowCount() == 2
# Click on first experiment and check if side card updates
with qtbot.waitSignal(wid._table.itemSelectionChanged, timeout=1000):
wid._table.selectRow(0) # Select the first experiment
pgroup = wid._table.item(0, 0).text() # pgroup
exp = [exp for exp in experiment_info_list if exp["pgroup"] == pgroup][0]
assert wid._side_card.experiment_info == exp
# Click on second experiment and check if side card updates
with qtbot.waitSignal(wid._table.itemSelectionChanged, timeout=1000):
wid._table.selectRow(1) # Select the second experiment
pgroup = wid._table.item(1, 0).text() # pgroup
exp = [exp for exp in experiment_info_list if exp["pgroup"] == pgroup][0]
assert wid._side_card.experiment_info == exp
wid.search_input.setText("Experiment without Proposal")
with mock.patch.object(wid, "_apply_row_filter") as mock_apply_row_filter:
with qtbot.waitSignal(wid.fuzzy_is_disabled.stateChanged, timeout=1000):
wid.fuzzy_is_disabled.setChecked(True) # Disable fuzzy search
mock_apply_row_filter.assert_called_once_with("Experiment without Proposal")
assert wid._enable_fuzzy_search is False
def test_emit_selected_experiment(
self, experiment_selection: ExperimentSelection, experiment_info_list: list[dict], qtbot
):
"""Test that clicking the activate button on the side card emits the experiment_selected signal with the correct experiment info."""
wid = experiment_selection
wid.set_experiment_infos(experiment_info_list)
wid._tabs.setCurrentWidget(wid._table_tab)
with qtbot.waitSignal(wid._table.itemSelectionChanged, timeout=1000):
wid._table.selectRow(1) # Select the second experiment
pgroup = wid._table.item(1, 0).text() # pgroup
exp = [exp for exp in experiment_info_list if exp["pgroup"] == pgroup][0]
with qtbot.waitSignal(wid.experiment_selected, timeout=1000) as blocker:
wid._side_card._activate_button.click()
assert blocker.args == [exp]
class TestBECAtlasAdminView:
@pytest.fixture
def admin_view(self, qtbot):
"""Fixture to create a BECAtlasAdminView instance."""
with mock.patch(
"bec_widgets.widgets.services.bec_atlas_admin_view.bec_atlas_admin_view.BECAtlasAdminView._connect_dispatcher"
):
view = BECAtlasAdminView()
qtbot.addWidget(view)
qtbot.waitExposed(view)
return view
def test_init_and_login(self, admin_view: BECAtlasAdminView, qtbot):
"""Test that the BECAtlasAdminView initializes correctly."""
# Check that the atlas URL is set correctly
assert admin_view._atlas_url == "https://bec-atlas-dev.psi.ch/api/v1"
# Test that clicking the login button emits the credentials_entered signal with the correct username and password
with mock.patch.object(admin_view.atlas_http_service, "login") as mock_login:
with qtbot.waitSignal(admin_view.overview_widget.login_requested, timeout=1000):
admin_view.overview_widget._login.username.setText("alice")
admin_view.overview_widget._login.password.setText("password")
admin_view.overview_widget._login._emit_credentials()
mock_login.assert_called_once_with(username="alice", password="password")
mock_login.reset_mock()
admin_view._authenticated = True
with mock.patch.object(admin_view, "logout") as mock_logout:
with qtbot.waitSignal(admin_view.overview_widget.login_requested, timeout=1000):
admin_view.overview_widget._login.password.setText("password")
admin_view.overview_widget._login._emit_credentials()
mock_logout.assert_called_once()
mock_login.assert_called_once_with(username="alice", password="password")
def test_on_experiment_selected(
self, admin_view: BECAtlasAdminView, deployment_info: DeploymentInfoMessage, qtbot
):
"""Test that selecting an experiment in the overview widget correctly calls the HTTP service to set the experiment and updates the current experiment view."""
# First we need to simulate that we are authenticated and have deployment info
admin_view._update_deployment_info(deployment_info.model_dump(), {})
with mock.patch.object(
admin_view.atlas_http_service, "set_experiment"
) as mock_set_experiment:
with qtbot.waitSignal(
admin_view.experiment_selection.experiment_selected, timeout=1000
):
admin_view.experiment_selection.experiment_selected.emit(
deployment_info.active_session.experiment.model_dump()
)
mock_set_experiment.assert_called_once_with(
deployment_info.active_session.experiment.pgroup, deployment_info.deployment_id
)
@pytest.fixture
def deployment_info(
self, experiment_info_message: ExperimentInfoMessage
) -> DeploymentInfoMessage:
"""Fixture to provide a DeploymentInfoMessage instance."""
return DeploymentInfoMessage(
deployment_id="dep-1",
name="Test Deployment",
messaging_config=MessagingConfig(
signal=MessagingServiceScopeConfig(enabled=False),
teams=MessagingServiceScopeConfig(enabled=False),
scilog=MessagingServiceScopeConfig(enabled=False),
),
active_session=SessionInfoMessage(
experiment=experiment_info_message, name="Test Session"
),
)
def test_on_authenticated(
self, admin_view: BECAtlasAdminView, deployment_info: DeploymentInfoMessage, qtbot
):
"""Test that the on_authenticated method correctly updates the UI based on authentication state."""
# Simulate successful authentication
auth_info = AuthenticatedUserInfo(
email="alice@example.com",
exp=time.time() + 300,
groups={"operators"},
deployment_id="dep-1",
)
# First check that deployment info updates all fields correctly
admin_view._update_deployment_info(deployment_info.model_dump(), {})
assert admin_view.atlas_http_service._current_deployment_info == deployment_info
assert (
admin_view._atlas_info_widget._bl_info_label.text()
== f"{deployment_info.active_session.experiment.realm_id} @ {deployment_info.name}"
)
assert admin_view._atlas_info_widget._atlas_url_label.text() == admin_view._atlas_url
# Now run on_authenticated, this enables all toolbar buttons
# and calls fetch experiments. It also switches the overview widget
# to the current experiment view.
# Default should be on the overview widget
assert admin_view.stacked_layout.currentWidget() == admin_view.overview_widget
with mock.patch.object(
admin_view, "_fetch_available_experiments"
) as mock_fetch_experiments:
with qtbot.waitSignal(admin_view.authenticated, timeout=1000) as blocker:
admin_view._on_authenticated(auth_info.model_dump())
# Fetch experiments should be called
mock_fetch_experiments.assert_called_once()
assert blocker.args[0] is True
assert (
admin_view._atlas_info_widget._atlas_url_label.text()
== f"{admin_view._atlas_info_widget._atlas_url_text} | {auth_info.email}"
)
assert (
admin_view.toolbar.components.get_action("messaging_services").action.isEnabled()
is False
)
# Logout timer is running
logout_action = admin_view.toolbar.components.get_action("logout")
assert logout_action.action.isEnabled() is True
assert logout_action._tick_timer.isActive() is True
# Current Experiment widget should be visible in the overview
assert (
admin_view.overview_widget.stacked_layout.currentWidget()
== admin_view.overview_widget._experiment_overview_widget
)
# Click toolbar to switch to experiment selection
exp_select = admin_view.toolbar.components.get_action("experiment_selection")
assert exp_select.action.isEnabled() is True
with qtbot.waitSignal(exp_select.action.triggered, timeout=1000):
exp_select.action.trigger()
assert admin_view.stacked_layout.currentWidget() == admin_view.experiment_selection
# Now we simulate that the authentication expires
# This deactivates buttons, resets the overview widget
# and emits authenticated signal with False
with qtbot.waitSignal(admin_view.authenticated, timeout=1000) as blocker:
admin_view._on_authenticated({}) # Simulate not authenticated anymore
assert blocker.args[0] is False
assert logout_action._tick_timer.isActive() is False
assert admin_view._atlas_info_widget._atlas_url_label.text() == admin_view._atlas_url
assert (
admin_view.overview_widget.stacked_layout.currentWidget()
== admin_view.overview_widget._login_widget
)
# View should switch back to overview
assert admin_view.stacked_layout.currentWidget() == admin_view.overview_widget
def test_fetch_experiments(
self, admin_view: BECAtlasAdminView, deployment_info: DeploymentInfoMessage, qtbot
):
"""Test that _fetch_available_experiments correctly calls the HTTP service and updates the experiment selection widget."""
admin_view._update_deployment_info(deployment_info.model_dump(), {})
with mock.patch.object(
admin_view.atlas_http_service, "get_experiments_for_realm"
) as mock_get_experiments:
admin_view._fetch_available_experiments()
mock_get_experiments.assert_called_once_with(
deployment_info.active_session.experiment.realm_id
)
def test_on_http_response_received(
self, experiment_info_list: list[dict], admin_view: BECAtlasAdminView, qtbot
):
"""Test that _on_http_response_received correctly handles HTTP responses and updates the UI accordingly."""
realms = HTTPResponse(
request_url=f"{admin_view._atlas_url}/{AtlasEndpoints.REALMS_EXPERIMENTS}/experiments?realm_id=TestBeamline",
status=200,
headers={"content-type": "application/json"},
data=experiment_info_list,
)
with mock.patch.object(
admin_view.experiment_selection, "set_experiment_infos"
) as mock_set_experiment_infos:
admin_view._on_http_response_received(realms.model_dump())
mock_set_experiment_infos.assert_called_once_with(experiment_info_list)
set_experiment = HTTPResponse(
request_url=f"{admin_view._atlas_url}/{AtlasEndpoints.SET_EXPERIMENT}",
status=200,
headers={"content-type": "application/json"},
data={},
)
with mock.patch.object(admin_view, "_on_overview_selected") as mock_on_overview_selected:
admin_view._on_http_response_received(set_experiment.model_dump())
mock_on_overview_selected.assert_called_once()

View File

@@ -4,10 +4,45 @@ import time
from unittest import mock
import pytest
from bec_lib import service_config
from bec_lib.messages import ScanMessage
from bec_lib.serialization import MsgpackSerialization
from bec_widgets.utils.bec_dispatcher import QtRedisConnector, QtThreadSafeCallback
from bec_widgets.utils.bec_dispatcher import BECDispatcher, QtRedisConnector, QtThreadSafeCallback
def test_init_handles_client_and_config_arg():
# Client passed
self_mock = mock.MagicMock(_initialized=False)
with mock.patch.object(BECDispatcher, "start_cli_server"):
BECDispatcher.__init__(self_mock, client=mock.MagicMock(name="test_client"))
assert "test_client" in repr(self_mock.client)
# No client, service config object
self_mock.reset_mock()
self_mock._initialized = False
with (
mock.patch.object(BECDispatcher, "start_cli_server"),
mock.patch("bec_widgets.utils.bec_dispatcher.BECClient") as client_cls,
):
config = service_config.ServiceConfig()
BECDispatcher.__init__(self_mock, client=None, config=config)
client_cls.assert_called_with(
config=config, connector_cls=QtRedisConnector, name="BECWidgets"
)
# No client, service config string
self_mock.reset_mock()
self_mock._initialized = False
with (
mock.patch.object(BECDispatcher, "start_cli_server"),
mock.patch("bec_widgets.utils.bec_dispatcher.BECClient"),
mock.patch("bec_widgets.utils.bec_dispatcher.ServiceConfig") as svc_cfg,
mock.patch("bec_widgets.utils.bec_dispatcher.isinstance", return_value=False),
):
config = service_config.ServiceConfig()
BECDispatcher.__init__(self_mock, client=None, config="test_str")
svc_cfg.assert_called_with("test_str")
@pytest.fixture

View File

@@ -160,7 +160,7 @@ def test_signal_display(mocked_client, qtbot):
def test_signal_display_no_device(mocked_client, qtbot):
device_mock = mock.MagicMock()
mocked_client.client.device_manager.devices = {"test_device_1": device_mock}
mocked_client.device_manager.devices = {"test_device_1": device_mock}
signal_display = SignalDisplay(client=mocked_client, device="test_device_2")
qtbot.addWidget(signal_display)
assert (

View File

@@ -146,12 +146,12 @@ def test_signal_lineedit(device_signal_line_edit):
def test_device_signal_input_base_cleanup(qtbot, mocked_client):
with mock.patch.object(mocked_client.callbacks, "remove"):
widget = DeviceInputWidget(client=mocked_client)
widget.close()
widget.deleteLater()
widget = DeviceInputWidget(client=mocked_client)
widget.close()
widget.deleteLater()
mocked_client.callbacks.remove.assert_called_once_with(widget._device_update_register)
mocked_client.callbacks.remove.assert_called_once_with(widget._device_update_register)
def test_signal_combobox_get_signal_name_with_item_data(qtbot, device_signal_combobox):

View File

@@ -197,6 +197,163 @@ def test_image_setup_preview_signal_2d(qtbot, mocked_client):
np.testing.assert_array_equal(view.main_image.image, test_data)
def test_switching_device_disconnects_previous_preview_endpoint(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img", signal_class="PreviewSignal", ndim=2)
_set_signal_config(mocked_client, "waveform1d", "img", signal_class="PreviewSignal", ndim=2)
connected = []
disconnected = []
monkeypatch.setattr(
view.bec_dispatcher,
"connect_slot",
lambda slot, endpoint, *args, **kwargs: connected.append(endpoint),
)
monkeypatch.setattr(
view.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint, *args, **kwargs: disconnected.append(endpoint),
)
view.image(device="eiger", signal="img")
connected.clear()
disconnected.clear()
view.device = "waveform1d"
assert MessageEndpoints.device_preview("eiger", "img") in disconnected
assert MessageEndpoints.device_preview("waveform1d", "img") in connected
def test_switching_device_disconnects_previous_async_endpoint(qtbot, mocked_client, monkeypatch):
"""
Verify that switching device while async_update=True disconnects device_async_signal
endpoints for both scan_id and old_scan_id on the old device before reconnecting to
the new device.
"""
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(
mocked_client, "eiger", "img", signal_class="AsyncSignal", ndim=2, obj_name="async_obj"
)
_set_signal_config(
mocked_client, "waveform1d", "img", signal_class="AsyncSignal", ndim=2, obj_name="async_obj"
)
connected = []
disconnected = []
monkeypatch.setattr(
view.bec_dispatcher,
"connect_slot",
lambda slot, endpoint, *args, **kwargs: connected.append(endpoint),
)
monkeypatch.setattr(
view.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint, *args, **kwargs: disconnected.append(endpoint),
)
view.image(device="eiger", signal="img")
assert view.async_update is True
assert view.subscriptions["main"].async_signal_name == "async_obj"
view.scan_id = "scan_current"
view.old_scan_id = "scan_previous"
connected.clear()
disconnected.clear()
view.device = "waveform1d"
# Both scan_id and old_scan_id endpoints for the old device must be disconnected
assert (
MessageEndpoints.device_async_signal("scan_current", "eiger", "async_obj") in disconnected
)
assert (
MessageEndpoints.device_async_signal("scan_previous", "eiger", "async_obj") in disconnected
)
# The new device's async endpoint for the current scan must be connected
assert (
MessageEndpoints.device_async_signal("scan_current", "waveform1d", "async_obj") in connected
)
def test_switching_signal_disconnects_previous_preview_endpoint(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img_a", signal_class="PreviewSignal", ndim=2)
_set_signal_config(mocked_client, "eiger", "img_b", signal_class="PreviewSignal", ndim=2)
connected = []
disconnected = []
monkeypatch.setattr(
view.bec_dispatcher,
"connect_slot",
lambda slot, endpoint, *args, **kwargs: connected.append(endpoint),
)
monkeypatch.setattr(
view.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint, *args, **kwargs: disconnected.append(endpoint),
)
view.image(device="eiger", signal="img_a")
connected.clear()
disconnected.clear()
view.signal = "img_b"
assert MessageEndpoints.device_preview("eiger", "img_a") in disconnected
assert MessageEndpoints.device_preview("eiger", "img_b") in connected
def test_switching_signal_disconnects_previous_async_endpoint(qtbot, mocked_client, monkeypatch):
"""
When the current monitor is an async signal, switching to a different signal must
disconnect the previous async endpoint (based on scan_id/async_signal_name) before
reconnecting with the new signal's async endpoint.
"""
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(
mocked_client, "eiger", "img_a", signal_class="AsyncSignal", ndim=2, obj_name="async_obj_a"
)
_set_signal_config(
mocked_client, "eiger", "img_b", signal_class="AsyncSignal", ndim=2, obj_name="async_obj_b"
)
connected = []
disconnected = []
monkeypatch.setattr(
view.bec_dispatcher,
"connect_slot",
lambda slot, endpoint, *args, **kwargs: connected.append(endpoint),
)
monkeypatch.setattr(
view.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint, *args, **kwargs: disconnected.append(endpoint),
)
# Connect to img_a as an async signal; scan_id is None so no actual subscription is made
view.image(device="eiger", signal="img_a")
assert view.async_update is True
assert view.subscriptions["main"].async_signal_name == "async_obj_a"
assert view.subscriptions["main"].source == "device_monitor_2d"
# Simulate an active scan so that the async endpoint is real
view.scan_id = "scan_123"
connected.clear()
disconnected.clear()
# Switch to a different signal
view.signal = "img_b"
# The previous async endpoint for img_a must have been disconnected
expected_disconnect = MessageEndpoints.device_async_signal("scan_123", "eiger", "async_obj_a")
assert expected_disconnect in disconnected
# The new async endpoint for img_b must have been connected
expected_connect = MessageEndpoints.device_async_signal("scan_123", "eiger", "async_obj_b")
assert expected_connect in connected
def test_preview_signals_skip_0d_entries(qtbot, mocked_client, monkeypatch):
"""
Preview/async combobox should omit 0D signals.

View File

@@ -1,4 +1,4 @@
from unittest.mock import patch
from unittest.mock import MagicMock, patch
import numpy as np
@@ -53,14 +53,16 @@ def test_scatter_waveform_update_with_scan_history(qtbot, mocked_client, monkeyp
swf = create_widget(qtbot, ScatterWaveform, client=mocked_client)
dummy_scan = create_dummy_scan_item()
mocked_client.history = MagicMock()
# .get_by_scan_id() typically returns historical data, but we abuse it here
# to return mock live data
mocked_client.history.get_by_scan_id.return_value = dummy_scan
mocked_client.history.__getitem__.return_value = dummy_scan
swf.plot("samx", "samy", "bpm4i", label="test_curve")
swf.update_with_scan_history(scan_id="dummy")
qtbot.wait(500)
assert swf.scan_item == dummy_scan
qtbot.waitUntil(lambda: swf.scan_item == dummy_scan, timeout=500)
qtbot.wait(200)
x_data, y_data = swf.main_curve.getData()
np.testing.assert_array_equal(x_data, [10, 20, 30])

View File

@@ -8,7 +8,7 @@ from .client_mocks import mocked_client
@pytest.fixture
def plot_widget_with_arrow_item(qtbot, mocked_client):
widget = Waveform(client=mocked_client())
widget = Waveform(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
@@ -17,7 +17,7 @@ def plot_widget_with_arrow_item(qtbot, mocked_client):
@pytest.fixture
def plot_widget_with_tick_item(qtbot, mocked_client):
widget = Waveform(client=mocked_client())
widget = Waveform(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)

View File

@@ -516,6 +516,112 @@ def test_plot_custom_curve_with_inline_dap(qtbot, mocked_client_with_dap):
assert dap_curve.config.signal.dap == "GaussianModel"
def test_normalize_dap_parameters_number_dict():
normalized = Waveform._normalize_dap_parameters({"amplitude": 1.0, "center": 2})
assert normalized == {
"amplitude": {"name": "amplitude", "value": 1.0, "vary": False},
"center": {"name": "center", "value": 2.0, "vary": False},
}
def test_normalize_dap_parameters_dict_spec_defaults_vary_false():
normalized = Waveform._normalize_dap_parameters({"sigma": {"value": 0.8, "min": 0.0}})
assert normalized["sigma"]["name"] == "sigma"
assert normalized["sigma"]["value"] == 0.8
assert normalized["sigma"]["min"] == 0.0
assert normalized["sigma"]["vary"] is False
def test_normalize_dap_parameters_invalid_type_raises():
with pytest.raises(TypeError):
Waveform._normalize_dap_parameters(["amplitude", 1.0]) # type: ignore[arg-type]
def test_normalize_dap_parameters_composite_list():
normalized = Waveform._normalize_dap_parameters(
[{"center": 1.0}, {"sigma": {"value": 0.5, "min": 0.0}}],
dap_name=["GaussianModel", "GaussianModel"],
)
assert normalized == [
{"center": {"name": "center", "value": 1.0, "vary": False}},
{"sigma": {"name": "sigma", "value": 0.5, "min": 0.0, "vary": False}},
]
def test_normalize_dap_parameters_composite_dict():
normalized = Waveform._normalize_dap_parameters(
{
"GaussianModel": {"center": {"value": 1.0, "vary": True}},
"LorentzModel": {"amplitude": 2.0},
},
dap_name=["GaussianModel", "LorentzModel"],
)
assert normalized["GaussianModel"]["center"]["value"] == 1.0
assert normalized["GaussianModel"]["center"]["vary"] is True
assert normalized["LorentzModel"]["amplitude"]["value"] == 2.0
assert normalized["LorentzModel"]["amplitude"]["vary"] is False
def test_request_dap_includes_normalized_parameters(qtbot, mocked_client_with_dap, monkeypatch):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
curve = wf.plot(
x=[0, 1, 2],
y=[1, 2, 3],
label="custom-inline-params",
dap="GaussianModel",
dap_parameters={"amplitude": 1.0},
)
dap_curve = wf.get_curve(f"{curve.name()}-GaussianModel")
assert dap_curve is not None
dap_curve.dap_oversample = 3
captured = {}
def capture(topic, msg, *args, **kwargs): # noqa: ARG001
captured["topic"] = topic
captured["msg"] = msg
monkeypatch.setattr(wf.client.connector, "set_and_publish", capture)
wf.request_dap()
msg = captured["msg"]
dap_kwargs = msg.content["config"]["kwargs"]
assert dap_kwargs["oversample"] == 3
assert dap_kwargs["parameters"] == {
"amplitude": {"name": "amplitude", "value": 1.0, "vary": False}
}
def test_request_dap_includes_composite_parameters_list(qtbot, mocked_client_with_dap, monkeypatch):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
curve = wf.plot(
x=[0, 1, 2],
y=[1, 2, 3],
label="custom-composite",
dap=["GaussianModel", "GaussianModel"],
dap_parameters=[{"center": 0.0}, {"center": 1.0}],
)
dap_curve = wf.get_curve(f"{curve.name()}-GaussianModel+GaussianModel")
assert dap_curve is not None
captured = {}
def capture(topic, msg, *args, **kwargs): # noqa: ARG001
captured["topic"] = topic
captured["msg"] = msg
monkeypatch.setattr(wf.client.connector, "set_and_publish", capture)
wf.request_dap()
msg = captured["msg"]
dap_kwargs = msg.content["config"]["kwargs"]
assert dap_kwargs["parameters"] == [
{"center": {"name": "center", "value": 0.0, "vary": False}},
{"center": {"name": "center", "value": 1.0, "vary": False}},
]
assert msg.content["config"]["class_kwargs"]["model"] == ["GaussianModel", "GaussianModel"]
def test_fetch_scan_data_and_access(qtbot, mocked_client, monkeypatch):
"""
Test the _fetch_scan_data_and_access method returns live_data/val if in a live scan,

View File

@@ -189,10 +189,10 @@ def test_bec_shell_startup_contains_gui_id(bec_shell_widget):
assert bec_shell._is_bec_shell
assert bec_shell._unique_id == "bec_shell"
assert bec_shell.startup_cmd == "bec --nogui"
with mock.patch.object(bec_shell.bec_dispatcher, "cli_server", None):
assert bec_shell.startup_cmd == "bec --nogui"
with mock.patch.object(bec_shell.bec_dispatcher, "cli_server") as mock_cli_server:
mock_cli_server.gui_id = "test_gui_id"
with mock.patch.object(bec_shell.bec_dispatcher.cli_server, "gui_id", "test_gui_id"):
assert bec_shell.startup_cmd == "bec --gui-id test_gui_id"