1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-18 06:15:37 +02:00

Compare commits

...

11 Commits

Author SHA1 Message Date
semantic-release
1d3e0214fd 3.4.4
Automatically generated by python-semantic-release
2026-04-14 07:33:15 +00:00
37747babda fix: check for duplicate subscriptions in GUIClient 2026-04-14 09:32:17 +02:00
32f5d486d3 fix: make gui client registry callback non static 2026-04-14 09:32:17 +02:00
0ff1fdc815 fix: remove staticmethod subscription 2026-04-14 09:32:17 +02:00
c7de320ca5 fix: check duplicate stream sub 2026-04-14 09:32:17 +02:00
semantic-release
5b23dce3d0 3.4.3
Automatically generated by python-semantic-release
2026-04-13 09:20:13 +00:00
5e84d3bec6 fix: Set OPHYD_CONTROL_LAYER to dummy for tests 2026-04-13 11:19:22 +02:00
semantic-release
9a2396ee9c 3.4.2
Automatically generated by python-semantic-release
2026-04-01 12:55:30 +00:00
2dab16b684 test: Add tests for admin access 2026-04-01 14:54:28 +02:00
e6c8cd0b1a fix: allow admin user to pass deployment group check 2026-04-01 14:54:28 +02:00
242f8933b2 fix(bec-atlas-admin-view): Fix atlas_url to bec-atlas-prod.psi.ch 2026-04-01 14:54:28 +02:00
10 changed files with 192 additions and 100 deletions

View File

@@ -1,6 +1,47 @@
# CHANGELOG
## v3.4.4 (2026-04-14)
### Bug Fixes
- Check duplicate stream sub
([`c7de320`](https://github.com/bec-project/bec_widgets/commit/c7de320ca564264a31b84931f553170f25659685))
- Check for duplicate subscriptions in GUIClient
([`37747ba`](https://github.com/bec-project/bec_widgets/commit/37747babda407040333c6bd04646be9a49e0ee81))
- Make gui client registry callback non static
([`32f5d48`](https://github.com/bec-project/bec_widgets/commit/32f5d486d3fc8d41df2668c58932ae982819b285))
- Remove staticmethod subscription
([`0ff1fdc`](https://github.com/bec-project/bec_widgets/commit/0ff1fdc81578eec3ffc5d4030fca7b357a0b4c2f))
## v3.4.3 (2026-04-13)
### Bug Fixes
- Set OPHYD_CONTROL_LAYER to dummy for tests
([`5e84d3b`](https://github.com/bec-project/bec_widgets/commit/5e84d3bec608ae9f2ee6dae67db2e3e1387b1f59))
## v3.4.2 (2026-04-01)
### Bug Fixes
- Allow admin user to pass deployment group check
([`e6c8cd0`](https://github.com/bec-project/bec_widgets/commit/e6c8cd0b1a1162302071c93a2ac51880b3cf1b7d))
- **bec-atlas-admin-view**: Fix atlas_url to bec-atlas-prod.psi.ch
([`242f893`](https://github.com/bec-project/bec_widgets/commit/242f8933b246802f5f3a5b9df7de07901f151c82))
### Testing
- Add tests for admin access
([`2dab16b`](https://github.com/bec-project/bec_widgets/commit/2dab16b68415806f3f291657f394bb2d8654229d))
## v3.4.1 (2026-04-01)
### Bug Fixes

View File

@@ -10,9 +10,9 @@ import threading
import time
from contextlib import contextmanager
from threading import Lock
from typing import TYPE_CHECKING, Literal, TypeAlias, cast
from typing import TYPE_CHECKING, Callable, Literal, TypeAlias, cast
from bec_lib.endpoints import MessageEndpoints
from bec_lib.endpoints import EndpointInfo, MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from rich.console import Console
@@ -232,6 +232,11 @@ class BECGuiClient(RPCBase):
"""The launcher object."""
return RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self, object_name="launcher")
def _safe_register_stream(self, endpoint: EndpointInfo, cb: Callable, **kwargs):
"""Check if already registered for registration in idempotent functions."""
if not self._client.connector.any_stream_is_registered(endpoint, cb=cb):
self._client.connector.register(endpoint, cb=cb, **kwargs)
def connect_to_gui_server(self, gui_id: str) -> None:
"""Connect to a GUI server"""
# Unregister the old callback
@@ -247,10 +252,9 @@ class BECGuiClient(RPCBase):
self._ipython_registry = {}
# Register the new callback
self._client.connector.register(
self._safe_register_stream(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
from_start=True,
)
@@ -531,20 +535,14 @@ class BECGuiClient(RPCBase):
def _start(self, wait: bool = False) -> None:
self._killed = False
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
self._safe_register_stream(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
return self._start_server(wait=wait)
@staticmethod
def _handle_registry_update(
msg: dict[str, GUIRegistryStateMessage], parent: BECGuiClient
) -> None:
def _handle_registry_update(self, msg: dict[str, GUIRegistryStateMessage]) -> None:
# This was causing a deadlock during shutdown, not sure why.
# with self._lock:
self = parent
self._server_registry = cast(dict[str, RegistryState], msg["data"].state)
self._update_dynamic_namespace(self._server_registry)

View File

@@ -248,9 +248,7 @@ class RPCBase:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
@@ -276,11 +274,10 @@ class RPCBase:
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
@staticmethod
def _on_rpc_response(msg_obj: MessageObject, parent: RPCBase) -> None:
def _on_rpc_response(self, msg_obj: MessageObject) -> None:
msg = cast(messages.RequestResponseMessage, msg_obj.value)
parent._rpc_response = msg
parent._msg_wait_event.set()
self._rpc_response = msg
self._msg_wait_event.set()
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:

View File

@@ -175,12 +175,15 @@ class BECDispatcher:
cb_info (dict | None): A dictionary containing information about the callback. Defaults to None.
"""
qt_slot = QtThreadSafeCallback(cb=slot, cb_info=cb_info)
if qt_slot not in self._registered_slots:
self._registered_slots[qt_slot] = qt_slot
qt_slot = self._registered_slots[qt_slot]
self.client.connector.register(topics, cb=qt_slot, **kwargs)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
qt_slot.topics.update(set(topics_str))
if not self.client.connector.any_stream_is_registered(topics, qt_slot):
if qt_slot not in self._registered_slots:
self._registered_slots[qt_slot] = qt_slot
qt_slot = self._registered_slots[qt_slot]
self.client.connector.register(topics, cb=qt_slot, **kwargs)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
qt_slot.topics.update(set(topics_str))
else:
logger.warning(f"Attempted to create duplicate stream subscription for {topics=}")
def disconnect_slot(
self, slot: Callable, topics: EndpointInfo | str | list[EndpointInfo] | list[str]

View File

@@ -251,7 +251,7 @@ class BECAtlasAdminView(BECWidget, QWidget):
def __init__(
self,
parent=None,
atlas_url: str = "https://bec-atlas-dev.psi.ch/api/v1",
atlas_url: str = "https://bec-atlas-prod.psi.ch/api/v1",
client=None,
**kwargs,
):

View File

@@ -142,6 +142,17 @@ class BECAtlasHTTPService(QWidget):
if self._auth_user_info is not None:
self._auth_user_info.groups = set(groups)
def __check_access_to_owner_groups(self, groups: list[str]) -> bool:
"""Check if the authenticated user has access to the current deployment based on their groups."""
if self._auth_user_info is None or self._current_deployment_info is None:
return False
# Admin user
has_both = {"admin", "atlas_func_account"}.issubset(groups)
if has_both:
return True
# Regular user check with group intersection
return not self.auth_user_info.groups.isdisjoint(groups)
def __clear_login_info(self, skip_logout: bool = False):
"""Clear the authenticated user information after logout."""
self._auth_user_info = None
@@ -231,9 +242,7 @@ class BECAtlasHTTPService(QWidget):
)
elif AtlasEndpoints.DEPLOYMENT_INFO.value in request_url:
owner_groups = data.get("owner_groups", [])
if self.auth_user_info is not None and not self.auth_user_info.groups.isdisjoint(
owner_groups
):
if self.__check_access_to_owner_groups(owner_groups):
self.authenticated.emit(self.auth_user_info.model_dump())
else:
if self.auth_user_info is not None:

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "3.4.1"
version = "3.4.4"
description = "BEC Widgets"
requires-python = ">=3.11"
classifiers = [

View File

@@ -1,6 +1,7 @@
import pytest
from bec_widgets.cli.client import Image, MotorMap, Waveform
from bec_widgets.cli.client_utils import BECGuiClient
from bec_widgets.cli.rpc.rpc_base import RPCReference
# pylint: disable=unused-argument
@@ -122,7 +123,7 @@ def test_ring_bar(qtbot, connected_client_gui_obj):
assert gui._ipython_registry[bar._gui_id].__class__.__name__ == "RingProgressBar"
def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
def test_rpc_gui_obj(connected_client_gui_obj: BECGuiClient, qtbot):
gui = connected_client_gui_obj
qtbot.waitUntil(lambda: len(gui.windows) == 1, timeout=3000)

View File

@@ -1,3 +1,8 @@
# Force ophyd onto its dummy control layer in tests so importing it does not
# try to create a real EPICS CA context.
import os
os.environ.setdefault("OPHYD_CONTROL_LAYER", "dummy")
import json
import time
from unittest import mock
@@ -13,7 +18,7 @@ from bec_lib.client import BECClient
from bec_lib.messages import _StoredDataInfo
from bec_qthemes import apply_theme
from bec_qthemes._theme import Theme
from ophyd._pyepics_shim import _dispatcher
from ophyd._dummy_shim import _dispatcher
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
from qtpy.QtCore import QEvent, QEventLoop
from qtpy.QtWidgets import QApplication, QMessageBox

View File

@@ -81,12 +81,95 @@ class _FakeReply:
self.deleted = True
@pytest.fixture
def experiment_info_message() -> ExperimentInfoMessage:
data = {
"_id": "p22622",
"owner_groups": ["admin"],
"access_groups": ["unx-sls_xda_bs", "p22622"],
"realm_id": "TestBeamline",
"proposal": "12345967",
"title": "Test Experiment for Mat Card Widget",
"firstname": "John",
"lastname": "Doe",
"email": "john.doe@psi.ch",
"account": "doe_j",
"pi_firstname": "Jane",
"pi_lastname": "Smith",
"pi_email": "jane.smith@psi.ch",
"pi_account": "smith_j",
"eaccount": "e22622",
"pgroup": "p22622",
"abstract": "This is a test abstract for the experiment mat card widget. It should be long enough to test text wrapping and display in the card. The abstract provides a brief overview of the experiment, its goals, and its significance. This text is meant to simulate a real abstract that might be associated with an experiment in the BEC Atlas system. The card should be able to handle abstracts of varying lengths without any issues, ensuring that the user can read the full abstract even if it is quite long.",
"schedule": [{"start": "01/01/2025 08:00:00", "end": "03/01/2025 18:00:00"}],
"proposal_submitted": "15/12/2024",
"proposal_expire": "31/12/2025",
"proposal_status": "Scheduled",
"delta_last_schedule": 30,
"mainproposal": "",
}
return ExperimentInfoMessage.model_validate(data)
@pytest.fixture
def experiment_info_list(experiment_info_message: ExperimentInfoMessage) -> list[dict]:
"""Fixture to provide a list of experiment info dictionaries."""
another_experiment_info = {
"_id": "p22623",
"owner_groups": ["admin"],
"access_groups": ["unx-sls_xda_bs", "p22623"],
"realm_id": "TestBeamline",
"proposal": "",
"title": "Experiment without Proposal",
"firstname": "Alice",
"lastname": "Johnson",
"email": "alice.johnson@psi.ch",
"account": "johnson_a",
"pi_firstname": "Bob",
"pi_lastname": "Brown",
"pi_email": "bob.brown@psi.ch",
"pi_account": "brown_b",
"eaccount": "e22623",
"pgroup": "p22623",
"abstract": "",
"schedule": [],
"proposal_submitted": "",
"proposal_expire": "",
"proposal_status": "",
"delta_last_schedule": None,
"mainproposal": "",
}
return [
experiment_info_message.model_dump(),
ExperimentInfoMessage.model_validate(another_experiment_info).model_dump(),
]
class TestBECAtlasHTTPService:
@pytest.fixture
def http_service(self, qtbot):
def deployment_info(
self, experiment_info_message: ExperimentInfoMessage
) -> DeploymentInfoMessage:
"""Fixture to provide a DeploymentInfoMessage instance."""
return DeploymentInfoMessage(
deployment_id="dep-1",
name="Test Deployment",
messaging_config=MessagingConfig(
signal=MessagingServiceScopeConfig(enabled=False),
teams=MessagingServiceScopeConfig(enabled=False),
scilog=MessagingServiceScopeConfig(enabled=False),
),
active_session=SessionInfoMessage(
experiment=experiment_info_message, name="Test Session"
),
)
@pytest.fixture
def http_service(self, deployment_info: DeploymentInfoMessage, qtbot):
"""Fixture to create a BECAtlasHTTPService instance."""
service = BECAtlasHTTPService(base_url="http://localhost:8000")
service._set_current_deployment_info(deployment_info)
qtbot.addWidget(service)
qtbot.waitExposed(service)
return service
@@ -224,7 +307,7 @@ class TestBECAtlasHTTPService:
assert http_service.auth_user_info.groups == {"operators", "staff"}
mock_get_deployment_info.assert_called_once_with(deployment_id="dep-1")
def test_handle_response_deployment_info(self, http_service, qtbot):
def test_handle_response_deployment_info(self, http_service: BECAtlasHTTPService, qtbot):
"""Test handling deployment info response"""
# Groups match: should emit authenticated signal with user info
@@ -268,6 +351,25 @@ class TestBECAtlasHTTPService:
mock_show_warning.assert_called_once()
mock_logout.assert_called_once()
def test_handle_response_deployment_info_admin_access(self, http_service, qtbot):
http_service._auth_user_info = AuthenticatedUserInfo(
email="alice@example.org",
exp=time.time() + 60,
groups={"operators"},
deployment_id="dep-1",
)
# Admin user should authenticate regardless of group membership
reply = _FakeReply(
request_url="http://localhost:8000/deployments/id?deployment_id=dep-1",
status=200,
payload=b'{"owner_groups": ["admin", "atlas_func_account"], "name": "Beamline Deployment"}',
)
with qtbot.waitSignal(http_service.authenticated, timeout=1000) as blocker:
http_service._handle_response(reply)
assert blocker.args[0]["email"] == "alice@example.org"
def test_handle_response_emits_http_response(self, http_service, qtbot):
"""Test that _handle_response emits the http_response signal with correct parameters for a generic response."""
reply = _FakeReply(
@@ -297,70 +399,6 @@ class TestBECAtlasHTTPService:
http_service._handle_response(reply, _override_slot_params={"raise_error": True})
@pytest.fixture
def experiment_info_message() -> ExperimentInfoMessage:
data = {
"_id": "p22622",
"owner_groups": ["admin"],
"access_groups": ["unx-sls_xda_bs", "p22622"],
"realm_id": "TestBeamline",
"proposal": "12345967",
"title": "Test Experiment for Mat Card Widget",
"firstname": "John",
"lastname": "Doe",
"email": "john.doe@psi.ch",
"account": "doe_j",
"pi_firstname": "Jane",
"pi_lastname": "Smith",
"pi_email": "jane.smith@psi.ch",
"pi_account": "smith_j",
"eaccount": "e22622",
"pgroup": "p22622",
"abstract": "This is a test abstract for the experiment mat card widget. It should be long enough to test text wrapping and display in the card. The abstract provides a brief overview of the experiment, its goals, and its significance. This text is meant to simulate a real abstract that might be associated with an experiment in the BEC Atlas system. The card should be able to handle abstracts of varying lengths without any issues, ensuring that the user can read the full abstract even if it is quite long.",
"schedule": [{"start": "01/01/2025 08:00:00", "end": "03/01/2025 18:00:00"}],
"proposal_submitted": "15/12/2024",
"proposal_expire": "31/12/2025",
"proposal_status": "Scheduled",
"delta_last_schedule": 30,
"mainproposal": "",
}
return ExperimentInfoMessage.model_validate(data)
@pytest.fixture
def experiment_info_list(experiment_info_message: ExperimentInfoMessage) -> list[dict]:
"""Fixture to provide a list of experiment info dictionaries."""
another_experiment_info = {
"_id": "p22623",
"owner_groups": ["admin"],
"access_groups": ["unx-sls_xda_bs", "p22623"],
"realm_id": "TestBeamline",
"proposal": "",
"title": "Experiment without Proposal",
"firstname": "Alice",
"lastname": "Johnson",
"email": "alice.johnson@psi.ch",
"account": "johnson_a",
"pi_firstname": "Bob",
"pi_lastname": "Brown",
"pi_email": "bob.brown@psi.ch",
"pi_account": "brown_b",
"eaccount": "e22623",
"pgroup": "p22623",
"abstract": "",
"schedule": [],
"proposal_submitted": "",
"proposal_expire": "",
"proposal_status": "",
"delta_last_schedule": None,
"mainproposal": "",
}
return [
experiment_info_message.model_dump(),
ExperimentInfoMessage.model_validate(another_experiment_info).model_dump(),
]
class TestBECAtlasExperimentSelection:
def test_format_name(self, experiment_info_message: ExperimentInfoMessage):
@@ -546,7 +584,7 @@ class TestBECAtlasAdminView:
def test_init_and_login(self, admin_view: BECAtlasAdminView, qtbot):
"""Test that the BECAtlasAdminView initializes correctly."""
# Check that the atlas URL is set correctly
assert admin_view._atlas_url == "https://bec-atlas-dev.psi.ch/api/v1"
assert admin_view._atlas_url == "https://bec-atlas-prod.psi.ch/api/v1"
# Test that clicking the login button emits the credentials_entered signal with the correct username and password
with mock.patch.object(admin_view.atlas_http_service, "login") as mock_login: