Compare commits

..

24 Commits

Author SHA1 Message Date
wyzula_j a7975a08da fix(progress): scan progress reset on_scan_status in unified backend 2026-06-09 14:18:16 +02:00
wyzula_j 97c0e68f29 refactor(bec_progress): simplification of chunk radius calculation 2026-06-07 11:06:13 +02:00
wyzula_j c994674ba3 fix(ring): ProgressSignal fetch logic back 2026-06-07 11:06:13 +02:00
wyzula_j c28b512d1f fix(scan_control): remove parent from layout to prevent QLayout: Attempting to add QLayout "" to ScanGroupBox "", which already has a layout 2026-06-07 11:06:13 +02:00
wyzula_j 81cde2cb79 feat(progress): progress is tracked from bec; unified progress backend 2026-06-07 11:06:13 +02:00
wyzula_j cd150c09a9 fix(bec_progress_bar): replace the custom paint event progressbar with native QProgressBar 2026-06-02 17:05:53 +02:00
wyzula_j af125e2222 test(e2e): increase rpc test_available_widgets timout back to 100 2026-06-02 14:51:33 +02:00
semantic-release b2e0b79210 3.13.5
Automatically generated by python-semantic-release
2026-06-02 10:28:09 +00:00
wyzula_j 1427c70cfb fix(forms): GridLayout applied to widget which already has layout 2026-06-02 12:27:23 +02:00
wyzula_j 154ae6026a refactor(client_utils): simplify PID fetching 2026-06-02 12:27:23 +02:00
wyzula_j 9f94ca7748 fix(launcher): avoid orphan widgets detection and logging 2026-06-02 12:27:23 +02:00
wyzula_j 3796984182 fix(abort_button): from __future__ import annotations 2026-06-02 12:27:23 +02:00
wyzula_j 8a180eaa7b fix(rpc): client/server rpc handshake for shutdown 2026-06-02 12:27:23 +02:00
wyzula_j 4572760b56 fix(client_utils): stop output reader thread on shutdown 2026-06-02 12:27:23 +02:00
wyzula_j e42a9824cc fix(rpc): more robust shutdown section with PID logging 2026-06-02 12:27:23 +02:00
wyzula_j 2fb7fb2ff4 fix(logging): removed args/kwargs from logging messages 2026-06-02 12:27:23 +02:00
wyzula_j c8275fcfd5 fix: change prints into proper logs 2026-06-02 12:27:23 +02:00
wyzula_j 07515d24be fix(client_utils): increase default rpc timeout to 60s 2026-06-02 12:27:23 +02:00
wyzula_j 859563abb3 fix(rpc_server): log warning if rpc call is repeated 2026-06-02 12:27:23 +02:00
wyzula_j bd66afb98d fix(companion_app): disable logging of bec_lib.scan_items on widget side 2026-06-02 12:27:23 +02:00
wyzula_j 8e1e282fac refactor(rpc): share logging helpers 2026-06-02 12:27:23 +02:00
wyzula_j 878745b99a fix(rpc): log dispatcher receipt before qt callback 2026-06-02 12:27:23 +02:00
wyzula_j e41e60956b fix(rpc): additional logs 2026-06-02 12:27:23 +02:00
wyzula_j ed68eb5ac6 fix(launch_window): exclude launcher check for non-parented widgets for BECMainWindow 2026-06-02 12:27:23 +02:00
33 changed files with 2179 additions and 927 deletions
+55
View File
@@ -1,6 +1,61 @@
# CHANGELOG
## v3.13.5 (2026-06-02)
### Bug Fixes
- Change prints into proper logs
([`c8275fc`](https://github.com/bec-project/bec_widgets/commit/c8275fcfd5c920393df3aa201c32a632ac8086a5))
- **abort_button**: From __future__ import annotations
([`3796984`](https://github.com/bec-project/bec_widgets/commit/37969841822c8c38c23a1d8fca8e38aec684957b))
- **client_utils**: Increase default rpc timeout to 60s
([`07515d2`](https://github.com/bec-project/bec_widgets/commit/07515d24be6e930b1b40170fc710255914cb7454))
- **client_utils**: Stop output reader thread on shutdown
([`4572760`](https://github.com/bec-project/bec_widgets/commit/4572760b56ca2ab6435db3a6a4ba0d270e9008d1))
- **companion_app**: Disable logging of bec_lib.scan_items on widget side
([`bd66afb`](https://github.com/bec-project/bec_widgets/commit/bd66afb98dcb76ca87b0db1334df3c1af0a9dbad))
- **forms**: Gridlayout applied to widget which already has layout
([`1427c70`](https://github.com/bec-project/bec_widgets/commit/1427c70cfb6f84bbced7f72ec5cfa55ac0b9b742))
- **launch_window**: Exclude launcher check for non-parented widgets for BECMainWindow
([`ed68eb5`](https://github.com/bec-project/bec_widgets/commit/ed68eb5ac6b20cfc7ca2c0b91864dc54fb579499))
- **launcher**: Avoid orphan widgets detection and logging
([`9f94ca7`](https://github.com/bec-project/bec_widgets/commit/9f94ca7748d73a30622ecbaef384f4bc73a3d2fb))
- **logging**: Removed args/kwargs from logging messages
([`2fb7fb2`](https://github.com/bec-project/bec_widgets/commit/2fb7fb2ff487863c3bc931498496da74b25e52d8))
- **rpc**: Additional logs
([`e41e609`](https://github.com/bec-project/bec_widgets/commit/e41e60956b54890b70b3390b981196c9477abd93))
- **rpc**: Client/server rpc handshake for shutdown
([`8a180ea`](https://github.com/bec-project/bec_widgets/commit/8a180eaa7be5c1603d893cf3b50585f88f9b0c83))
- **rpc**: Log dispatcher receipt before qt callback
([`878745b`](https://github.com/bec-project/bec_widgets/commit/878745b99ac1e22c0fbddecc294e599469a2adfe))
- **rpc**: More robust shutdown section with PID logging
([`e42a982`](https://github.com/bec-project/bec_widgets/commit/e42a9824ccd54b71a3141aaf2aa4e02af6a13782))
- **rpc_server**: Log warning if rpc call is repeated
([`859563a`](https://github.com/bec-project/bec_widgets/commit/859563abb3e94ff55886e72db3177522900a89b8))
### Refactoring
- **client_utils**: Simplify PID fetching
([`154ae60`](https://github.com/bec-project/bec_widgets/commit/154ae6026a6471b7c1db42f7c2ff3dc7be4b4afb))
- **rpc**: Share logging helpers
([`8e1e282`](https://github.com/bec-project/bec_widgets/commit/8e1e282fac22ab6f726049758306c7ca17af70eb))
## v3.13.4 (2026-05-29)
### Bug Fixes
+65 -20
View File
@@ -5,6 +5,7 @@ import json
import os
import signal
import sys
import traceback
from contextlib import redirect_stderr, redirect_stdout
import darkdetect
@@ -63,6 +64,7 @@ class GUIServer:
self.app: QApplication | None = None
self.launcher_window: LaunchWindow | None = None
self.dispatcher: BECDispatcher | None = None
self._shutdown_started = False
def start(self):
"""
@@ -74,6 +76,7 @@ class GUIServer:
bec_logger._stderr_log_level = bec_logger.LOGLEVEL.ERROR
bec_logger._update_sinks()
bec_logger.disabled_modules = ["bec_lib.scan_items"]
with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)): # type: ignore
with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)): # type: ignore
self._run()
@@ -122,17 +125,8 @@ class GUIServer:
self.app.aboutToQuit.connect(self.shutdown)
self.app.setQuitOnLastWindowClosed(True)
def sigint_handler(*args):
# display message, for people to let it terminate gracefully
print("Caught SIGINT, exiting")
# Widgets should be all closed.
with RPCRegister.delayed_broadcast():
for widget in QApplication.instance().topLevelWidgets(): # type: ignore
widget.close()
self.shutdown()
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
signal.signal(signal.SIGINT, self.request_shutdown)
signal.signal(signal.SIGTERM, self.request_shutdown)
sys.exit(self.app.exec())
@@ -149,16 +143,67 @@ class GUIServer:
)
self.app.setWindowIcon(icon)
def request_shutdown(self, signum=None, _frame=None):
"""
Request Qt application shutdown from an RPC call or OS signal.
Cleanup itself is handled by ``shutdown()``, which is connected to
``QApplication.aboutToQuit``. Calling it directly here would run BEC/RPC
teardown before Qt has processed the widget close events.
"""
signal_name = signal.Signals(signum).name if signum is not None else "shutdown"
pid = os.getpid()
if self.app is None:
logger.info(f"Caught {signal_name}, shutting down GUI server pid={pid} without app")
self.shutdown()
return
widgets = [
f"{widget.__class__.__name__}(objectName={widget.objectName()!r})"
for widget in self.app.topLevelWidgets()
]
logger.info(
f"Caught {signal_name}, requesting GUI server shutdown pid={pid} "
f"top_level_widgets={widgets}"
)
with RPCRegister.delayed_broadcast():
for widget in self.app.topLevelWidgets():
widget.close()
self.app.quit()
@staticmethod
def _run_shutdown_step(step: str, callback):
try:
callback()
except Exception as exc:
logger.error(
f"GUIServer shutdown step failed pid={os.getpid()} step={step}: {exc}\n"
f"{traceback.format_exc()}"
)
def shutdown(self):
logger.info("Shutdown GUIServer", repr(self))
if self.launcher_window and shiboken6.isValid(self.launcher_window):
self.launcher_window.close()
self.launcher_window.deleteLater()
if pylsp_server.is_running():
pylsp_server.stop()
if self.dispatcher:
self.dispatcher.stop_cli_server()
self.dispatcher.disconnect_all()
if self._shutdown_started:
return
self._shutdown_started = True
logger.info(f"Shutdown GUIServer pid={os.getpid()} {repr(self)}")
def close_launcher_window():
if self.launcher_window and shiboken6.isValid(self.launcher_window):
self.launcher_window.close()
self.launcher_window.deleteLater()
def stop_pylsp_server():
if pylsp_server.is_running():
pylsp_server.stop()
def stop_dispatcher():
if self.dispatcher:
self.dispatcher.stop_cli_server()
self.dispatcher.disconnect_all()
self._run_shutdown_step("close_launcher_window", close_launcher_window)
self._run_shutdown_step("stop_pylsp_server", stop_pylsp_server)
self._run_shutdown_step("stop_dispatcher", stop_dispatcher)
def main():
+57 -26
View File
@@ -207,6 +207,7 @@ class LaunchWindow(BECMainWindow):
self.app = QApplication.instance()
self.tiles: dict[str, LaunchTile] = {}
self._logged_unparented_connections: set[str] = set()
# Track the smallest mainlabel font size chosen so far
self._min_main_label_pt: int | None = None
@@ -655,53 +656,83 @@ class LaunchWindow(BECMainWindow):
super().showEvent(event)
self.setFixedSize(self.size())
def _launcher_is_last_widget(self, connections: dict) -> bool:
def _has_external_window(self, connections: dict) -> bool:
"""
Check if the launcher is the last widget in the application.
Check if any registered non-launcher connection owns a top-level Qt window.
"""
# get all parents of connections
for connection in connections.values():
try:
parent = connection.parent()
if parent is None and connection.objectName() != self.objectName():
logger.info(
f"Found non-launcher connection without parent: {connection.objectName()}"
)
return False
except Exception as e:
logger.error(f"Error getting parent of connection: {e}")
return False
return True
if self._connection_belongs_to_launcher(connection):
continue
if isinstance(connection, QWidget) and connection.isWindow():
return True
return False
def _log_unparented_connections(self, connections: dict) -> None:
"""
Log non-launcher RPC connections that remain without an active top-level window.
"""
for connection in connections.values():
if self._connection_belongs_to_launcher(connection):
continue
if isinstance(connection, QWidget) and connection.isWindow():
continue
connection_description = (
f"type={type(connection).__name__} objectName={connection.objectName()!r} "
f"gui_id={connection.gui_id!r}"
)
if connection_description in self._logged_unparented_connections:
continue
self._logged_unparented_connections.add(connection_description)
logger.warning(
"Registered non-launcher RPC connection has no active top-level window: "
f"{connection_description}"
)
def _connection_belongs_to_launcher(self, connection: QObject) -> bool:
"""
Check whether a registered connection is the launcher itself or part of its Qt hierarchy.
"""
if connection is self or connection.gui_id == self.gui_id:
return True
parent = connection.parent()
while parent is not None:
if parent is self:
return True
parent = parent.parent()
return False
def _turn_off_the_lights(self, connections: dict):
"""
If there is only one connection remaining, it is the launcher, so we show it.
Once the launcher is closed as the last window, we quit the application.
"""
if self._launcher_is_last_widget(connections):
self.show()
self.activateWindow()
self.raise_()
if self._has_external_window(connections):
self.hide()
if self.app:
self.app.setQuitOnLastWindowClosed(True) # type: ignore
self.app.setQuitOnLastWindowClosed(False) # type: ignore
return
self.hide()
self._log_unparented_connections(connections)
self.show()
self.activateWindow()
self.raise_()
if self.app:
self.app.setQuitOnLastWindowClosed(False) # type: ignore
self.app.setQuitOnLastWindowClosed(True) # type: ignore
def closeEvent(self, event):
"""
Close the launcher window.
"""
connections = self.register.list_all_connections()
if self._launcher_is_last_widget(connections):
event.accept()
if self._has_external_window(connections):
event.ignore()
self.hide()
return
event.ignore()
self.hide()
event.accept()
if __name__ == "__main__": # pragma: no cover
+1 -1
View File
@@ -427,7 +427,7 @@ class BECMainWindow(RPCBase):
class BECProgressBar(RPCBase):
"""A custom progress bar with smooth transitions. The displayed text can be customized using a template."""
"""A BEC progress bar backed by Qt's native QProgressBar."""
_IMPORT_MODULE = "bec_widgets.widgets.progress.bec_progressbar.bec_progressbar"
+146 -9
View File
@@ -5,6 +5,7 @@ from __future__ import annotations
import json
import os
import select
import signal
import subprocess
import threading
import time
@@ -33,6 +34,12 @@ else:
logger = bec_logger.logger
IGNORE_WIDGETS = ["LaunchWindow"]
PROCESS_TERMINATION_TIMEOUT = 10
PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2
PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2
GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT = 3
GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5
OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event"
RegistryState: TypeAlias = dict[
Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"],
@@ -53,14 +60,16 @@ def _filter_output(output: str) -> str:
return output
def _get_output(process, logger) -> None:
def _get_output(process, logger, stop_event: threading.Event | None = None) -> None:
log_func = {process.stdout: logger.debug, process.stderr: logger.info}
stream_buffer = {process.stdout: [], process.stderr: []}
try:
os.set_blocking(process.stdout.fileno(), False)
os.set_blocking(process.stderr.fileno(), False)
while process.poll() is None:
readylist, _, _ = select.select([process.stdout, process.stderr], [], [], 1)
while process.poll() is None and not (stop_event and stop_event.is_set()):
readylist, _, _ = select.select(
[process.stdout, process.stderr], [], [], PROCESS_OUTPUT_SELECT_TIMEOUT
)
for stream in (process.stdout, process.stderr):
buf = stream_buffer[stream]
if stream in readylist:
@@ -75,6 +84,95 @@ def _get_output(process, logger) -> None:
logger.error(f"Error reading process output: {str(e)}")
def _process_group_snapshot(process) -> str:
try:
pgid = os.getpgid(process.pid)
except ProcessLookupError:
return "Process group snapshot unavailable: process already exited"
try:
result = subprocess.run(
["ps", "-o", "pid,ppid,pgid,stat,command", "-g", str(pgid)],
check=False,
capture_output=True,
text=True,
timeout=2,
)
except Exception as exc:
return f"Process group snapshot unavailable: {exc}"
output = result.stdout.strip()
if not output:
return f"Process group snapshot empty for pgid={pgid}"
return output
def _terminate_plot_process(process, logger, timeout: float = PROCESS_TERMINATION_TIMEOUT) -> None:
if process.poll() is not None:
return
process_info = f"pid={process.pid} command={process.args}"
try:
pgid = os.getpgid(process.pid)
process_info = f"pid={process.pid} pgid={pgid} command={process.args}"
logger.info(f"Terminating GUI process group {process_info}")
os.killpg(pgid, signal.SIGTERM)
except ProcessLookupError:
process.wait(timeout=timeout)
return
except Exception as exc:
logger.warning("Failed to terminate GUI process group; terminating process only.")
logger.info(f"GUI process termination failure details: {exc}. pid={process.pid}")
process.terminate()
try:
process.wait(timeout=timeout)
return
except subprocess.TimeoutExpired:
logger.warning(f"GUI process did not stop within {timeout}s; killing it.")
logger.info(
f"GUI process force-kill details: {process_info}\n"
f"{_process_group_snapshot(process)}"
)
try:
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
except ProcessLookupError as e:
logger.error(f"Failed to kill GUI process group: {e}")
process.wait(timeout=timeout)
return
process.wait(timeout=timeout)
def _wait_for_process_exit(process, timeout: float) -> bool:
try:
process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
return False
return True
def _join_process_output_thread(process, thread: threading.Thread | None, logger) -> None:
if thread is None:
return
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
if not thread.is_alive():
return
if stop_event := getattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, None):
stop_event.set()
for stream in (process.stdout, process.stderr):
if stream is None:
continue
try:
stream.close()
except OSError as e:
logger.error(f"Failed to close stream {str(e)}")
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
if thread.is_alive():
logger.warning("GUI process output reader thread did not stop after process shutdown.")
logger.info(f"GUI process output reader thread details: pid={process.pid}")
def _start_plot_process(
gui_id: str,
gui_class_id: str,
@@ -126,8 +224,14 @@ def _start_plot_process(
if logger is None:
process_output_processing_thread = None
else:
process_output_stop_event = threading.Event()
process_output_processing_thread = threading.Thread(
target=_get_output, args=(process, logger)
target=_get_output, args=(process, logger, process_output_stop_event)
)
setattr(
process_output_processing_thread,
OUTPUT_READER_STOP_EVENT_ATTR,
process_output_stop_event,
)
process_output_processing_thread.start()
return process, process_output_processing_thread
@@ -222,7 +326,7 @@ class BECGuiClient(RPCBase):
self._ipython_registry: dict[str, RPCReference] = {}
self.available_widgets = AvailableWidgetsNamespace()
register_serializer_extension()
self._rpc_timeout = 5
self._rpc_timeout = 60
####################
#### Client API ####
@@ -465,11 +569,13 @@ class BECGuiClient(RPCBase):
if self._process:
logger.success("Stopping GUI...")
self._process.terminate()
if self._process_output_processing_thread:
self._process_output_processing_thread.join()
self._process.wait()
if not self._request_server_shutdown():
_terminate_plot_process(self._process, logger)
_join_process_output_thread(
self._process, self._process_output_processing_thread, logger
)
self._process = None
self._process_output_processing_thread = None
# Unregister the registry state
self._client.connector.unregister(
@@ -488,6 +594,37 @@ class BECGuiClient(RPCBase):
#### Private methods ####
#########################
def _request_server_shutdown(self) -> bool:
if self._process is None or self._process.poll() is not None:
return True
process_details = f"pid={self._process.pid} command={self._process.args}"
logger.info(f"Requesting graceful GUI shutdown {process_details}")
try:
self.launcher._run_rpc( # pylint: disable=protected-access
"system.shutdown",
wait_for_rpc_response=True,
timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT,
)
except Exception as exc:
logger.warning(
"Could not confirm graceful GUI shutdown via RPC; "
"falling back to process termination."
)
logger.info(f"Graceful GUI shutdown RPC failure details: {exc}. {process_details}")
return False
if _wait_for_process_exit(self._process, GRACEFUL_SERVER_SHUTDOWN_TIMEOUT):
logger.info(f"GUI server exited after graceful shutdown {process_details}")
return True
logger.warning(
"GUI server did not exit after graceful shutdown request; "
"falling back to process termination."
)
logger.info(
f"Graceful GUI shutdown timeout details: {process_details}\n"
f"{_process_group_snapshot(self._process)}"
)
return False
def _check_if_server_is_alive(self):
"""Checks if the process is alive"""
if self._process is None:
+37
View File
@@ -2,6 +2,7 @@ from __future__ import annotations
import inspect
import threading
import time
import uuid
from functools import wraps
from typing import TYPE_CHECKING, Any, cast
@@ -9,6 +10,7 @@ from typing import TYPE_CHECKING, Any, cast
from bec_lib.client import BECClient
from bec_lib.device import DeviceBaseWithConfig
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
if TYPE_CHECKING: # pragma: no cover
@@ -25,6 +27,7 @@ else:
# pylint: disable=protected-access
_DEFAULT_RPC_TIMEOUT = object()
logger = bec_logger.logger
def _name_arg(arg):
@@ -261,12 +264,39 @@ class RPCBase:
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
)
target_gui_id = gui_id or self._gui_id
sent_at = time.time()
deadline = sent_at + timeout if timeout is not None else None
rpc_msg.metadata.update(
{
"method": method,
"receiver": receiver,
"target_gui_id": target_gui_id,
"object_name": self.object_name,
"wait_for_response": wait_for_rpc_response,
"timeout": timeout,
"sent_at": sent_at,
"deadline": deadline,
}
)
logger.info(
"Sending GUI RPC request "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"wait_for_response={wait_for_rpc_response} timeout={timeout}"
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(timeout)
if not finished:
logger.error(
"GUI RPC response timeout "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"timeout={timeout}"
)
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
@@ -278,6 +308,12 @@ class RPCBase:
# the _on_rpc_response method
assert isinstance(self._rpc_response, messages.RequestResponseMessage)
logger.info(
"Received GUI RPC response "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"accepted={self._rpc_response.accepted}"
)
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
@@ -286,6 +322,7 @@ class RPCBase:
def _on_rpc_response(self, msg_obj: MessageObject) -> None:
msg = cast(messages.RequestResponseMessage, msg_obj.value)
logger.debug(f"GUI RPC response callback received: {msg}")
self._rpc_response = msg
self._msg_wait_event.set()
+38 -1
View File
@@ -3,8 +3,9 @@ from __future__ import annotations
import collections
import random
import string
import time
from collections.abc import Callable
from typing import TYPE_CHECKING, DefaultDict, Hashable, Union
from typing import TYPE_CHECKING, Any, DefaultDict, Hashable, Union
import louie
import redis
@@ -15,6 +16,7 @@ from bec_lib.service_config import ServiceConfig
from qtpy.QtCore import QObject
from qtpy.QtCore import Signal as pyqtSignal
from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed
from bec_widgets.utils.serialization import register_serializer_extension
logger = bec_logger.logger
@@ -25,6 +27,39 @@ if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.rpc_server import RPCServer
def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None:
if not isinstance(msg_content, dict) or not isinstance(metadata, dict):
return
request_id = metadata.get("request_id")
method = msg_content.get("action")
parameter = msg_content.get("parameter")
if request_id is None or method is None or not isinstance(parameter, dict):
return
dispatch_received_at = time.time()
sent_at = metadata.get("sent_at")
deadline = metadata.get("deadline")
timeout = metadata.get("timeout")
dispatch_latency = elapsed_seconds(sent_at, dispatch_received_at)
stale_on_dispatch = deadline is not None and dispatch_received_at > deadline
target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id")
logger.info(
"GUI RPC dispatcher received request before Qt callback emit "
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} "
f"stale_on_dispatch={stale_on_dispatch}"
)
if stale_on_dispatch:
logger.warning(
"GUI RPC dispatcher received request after client timeout deadline "
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)}"
)
class QtThreadSafeCallback(QObject):
"""QtThreadSafeCallback is a wrapper around a callback function to make it thread-safe for Qt."""
@@ -88,10 +123,12 @@ class QtRedisConnector(RedisConnector):
# we can notice kwargs are lost when passed to Qt slot
metadata = msg.metadata
_log_rpc_dispatcher_receive(msg.content, metadata)
cb(msg.content, metadata)
else:
# from stream
msg = msg["data"]
_log_rpc_dispatcher_receive(msg.content, msg.metadata)
cb(msg.content, msg.metadata)
+25 -23
View File
@@ -331,32 +331,34 @@ class BECWidget(BECConnector):
# All widgets need to call super().cleanup() in their cleanup method
logger.info(f"Registry cleanup for widget {self.__class__.__name__}")
self.rpc_register.remove_rpc(self)
children = self.findChildren(BECWidget)
for child in children:
if not shiboken6.isValid(child):
# If the child is not valid, it means it has already been deleted
continue
child.close()
child.deleteLater()
children = self.findChildren(BECWidget)
for child in children:
if not shiboken6.isValid(child):
# If the child is not valid, it means it has already been deleted
continue
child.close()
child.deleteLater()
# Tear down busy overlay explicitly to stop spinner and remove filters
overlay = getattr(self, "_busy_overlay", None)
if overlay is not None and shiboken6.isValid(overlay):
try:
overlay.hide()
filt = getattr(overlay, "_filter", None)
if filt is not None and shiboken6.isValid(filt):
try:
self.removeEventFilter(filt)
except Exception as exc:
logger.warning(f"Failed to remove event filter from busy overlay: {exc}")
# Tear down busy overlay explicitly to stop spinner and remove filters
overlay = getattr(self, "_busy_overlay", None)
if overlay is not None and shiboken6.isValid(overlay):
try:
overlay.hide()
filt = getattr(overlay, "_filter", None)
if filt is not None and shiboken6.isValid(filt):
try:
self.removeEventFilter(filt)
except Exception as exc:
logger.warning(
f"Failed to remove event filter from busy overlay: {exc}"
)
# Cleanup the overlay widget. This will call cleanup on the custom widget if present.
# Cleanup the overlay widget. This will call cleanup on the custom widget if present.
overlay.cleanup()
overlay.deleteLater()
except Exception as exc:
logger.warning(f"Failed to delete busy overlay: {exc}")
overlay.cleanup()
overlay.deleteLater()
except Exception as exc:
logger.warning(f"Failed to delete busy overlay: {exc}")
def closeEvent(self, event):
"""Wrap the close even to ensure the rpc_register is cleaned up."""
+1 -1
View File
@@ -150,7 +150,7 @@ class TypedForm(BECWidget, QWidget):
self.adjustSize()
def _new_grid_layout(self):
new_grid = QGridLayout(self)
new_grid = QGridLayout()
new_grid.setContentsMargins(0, 0, 0, 0)
return new_grid
+16
View File
@@ -0,0 +1,16 @@
from __future__ import annotations
def elapsed_seconds(start: float | int | None, stop: float) -> float | None:
if start is None:
return None
try:
return max(0.0, stop - float(start))
except (TypeError, ValueError):
return None
def format_elapsed(elapsed: float | None) -> str:
if elapsed is None:
return "unknown"
return f"{elapsed:.3f}"
+112 -9
View File
@@ -1,6 +1,7 @@
from __future__ import annotations
import functools
import time
import traceback
import types
from contextlib import contextmanager
@@ -11,13 +12,14 @@ from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import QWidget
from qtpy.QtWidgets import QApplication, QWidget
from redis.exceptions import RedisError
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import ErrorPopupUtility
from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.screen_utils import apply_window_geometry
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
@@ -115,27 +117,107 @@ class RPCServer:
if request_id is None:
logger.error("Received RPC instruction without request_id")
return
method = msg.get("action")
parameter = msg.get("parameter", {})
args = parameter.get("args", [])
kwargs = parameter.get("kwargs", {})
target_gui_id = parameter.get("gui_id")
sent_at = metadata.get("sent_at")
deadline = metadata.get("deadline")
timeout = metadata.get("timeout")
received_at = time.time()
receive_latency = elapsed_seconds(sent_at, received_at)
stale_on_receive = deadline is not None and received_at > deadline
logger.info(
"GUI RPC server received request "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"target_gui_id={target_gui_id} timeout={timeout} "
f"receive_latency_s={format_elapsed(receive_latency)} "
f"stale_on_receive={stale_on_receive}"
)
if stale_on_receive:
logger.warning(
"GUI RPC server received request after client timeout deadline "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"target_gui_id={target_gui_id} timeout={timeout} "
f"receive_latency_s={format_elapsed(receive_latency)}"
)
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
# Shutdown must acknowledge before teardown starts. The generic RPC path
# below publishes successful responses through QTimer.singleShot(0);
# for system.shutdown that would race with the queued app quit and
# dispatcher shutdown scheduled by _shutdown_gui_server().
if method == "system.shutdown":
execution_start = time.perf_counter()
try:
self.run_system_rpc(method, args, kwargs)
except Exception:
execution_duration = time.perf_counter() - execution_start
content = traceback.format_exc()
logger.error(
"GUI RPC server shutdown request failed "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"execution_duration_s={execution_duration:.3f}\n{content}"
)
self.send_response(request_id, False, {"error": content})
else:
execution_duration = time.perf_counter() - execution_start
logger.info(
"GUI RPC server acknowledged shutdown request "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"execution_duration_s={execution_duration:.3f}"
)
self.send_response(request_id, True, {"result": None})
return
execution_start = time.perf_counter()
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
try:
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
if method.startswith("system."):
res = self.run_system_rpc(method, args, kwargs)
else:
obj = self.get_object_from_config(msg["parameter"])
obj = self.get_object_from_config(parameter)
res = self.run_rpc(obj, method, args, kwargs)
except Exception:
execution_duration = time.perf_counter() - execution_start
content = traceback.format_exc()
logger.error(f"Error while executing RPC instruction: {content}")
logger.error(
"GUI RPC server execution failed "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f}\n"
f"{content}"
)
self.send_response(request_id, False, {"error": content})
else:
execution_duration = time.perf_counter() - execution_start
response_stale = deadline is not None and time.time() > deadline
logger.info(
"GUI RPC server executed request "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f} "
f"response_after_client_deadline={response_stale}"
)
if response_stale:
logger.warning(
"GUI RPC server response is late for client timeout "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"target_gui_id={target_gui_id} timeout={timeout} "
f"execution_duration_s={execution_duration:.3f}"
)
logger.debug(f"RPC instruction executed successfully: {res}")
self._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat()
QTimer.singleShot(0, lambda: self.serialize_result_and_send(request_id, res))
def send_response(self, request_id: str, accepted: bool, msg: dict):
log_message = (
"GUI RPC server publishing response "
f"request_id={request_id} gui_id={self.gui_id} accepted={accepted}"
)
if accepted:
logger.info(log_message)
else:
logger.error(log_message)
self.client.connector.set_and_publish(
MessageEndpoints.gui_instruction_response(request_id),
messages.RequestResponseMessage(accepted=accepted, message=msg),
@@ -236,10 +318,23 @@ class RPCServer:
def run_system_rpc(self, method: str, args: list, kwargs: dict):
if method == "system.launch_dock_area":
return self._launch_dock_area(*args, **kwargs)
if method == "system.shutdown":
return self._shutdown_gui_server()
if method == "system.list_capabilities":
return {"system.launch_dock_area": True}
return {"system.launch_dock_area": True, "system.shutdown": True}
raise ValueError(f"Unknown system RPC method: {method}")
@staticmethod
def _shutdown_gui_server() -> None:
app = QApplication.instance()
if app is None:
return
gui_server = getattr(app, "gui_server", None)
if gui_server is not None and hasattr(gui_server, "request_shutdown"):
QTimer.singleShot(0, gui_server.request_shutdown)
return
QTimer.singleShot(0, app.quit)
@staticmethod
def _launch_dock_area(
name: str | None = None,
@@ -297,7 +392,14 @@ class RPCServer:
res = self.serialize_object(res)
except RegistryNotReadyError:
try:
self._rpc_singleshot_repeats[request_id] += retry_delay
repeat = self._rpc_singleshot_repeats[request_id]
repeat += retry_delay
logger.warning(
"GUI RPC result serialization delayed; retrying "
f"request_id={request_id} retry_delay_ms={retry_delay} "
f"accumulated_delay_ms={repeat.accumulated_delay} "
f"max_delay_ms={repeat.max_delay}"
)
QTimer.singleShot(
retry_delay, lambda: self.serialize_result_and_send(request_id, res)
)
@@ -407,8 +509,9 @@ class RPCServer:
container_proxy = parent.gui_id
else:
container_proxy = None
except Exception:
except Exception as e:
container_proxy = None
logger.error(f"Error while serializing RPC result: {e}")
if wait and not self.rpc_register.object_is_registered(connector):
raise RegistryNotReadyError(f"Connector {connector} not registered yet")
@@ -46,8 +46,8 @@ logger = bec_logger.logger
class BECMainWindow(BECWidget, QMainWindow):
RPC = True
PLUGIN = True
SCAN_PROGRESS_WIDTH = 100 # px
SCAN_PROGRESS_HEIGHT = 12 # px
SCAN_PROGRESS_WIDTH = 120 # px
SCAN_PROGRESS_HEIGHT = 20 # px
def __init__(self, parent=None, window_title: str = "BEC", **kwargs):
super().__init__(parent=parent, **kwargs)
@@ -197,7 +197,11 @@ class BECMainWindow(BECWidget, QMainWindow):
# Setting HoverWidget for the scan progress bar - minimal and full version
self._scan_progress_bar_simple = ScanProgressBar(
self, one_line_design=True, rpc_exposed=False, rpc_passthrough_children=False
self,
one_line_design=True,
rpc_exposed=False,
rpc_passthrough_children=False,
enable_dynamic_stylesheet=True,
)
self._scan_progress_bar_simple.show_elapsed_time = False
self._scan_progress_bar_simple.show_remaining_time = False
@@ -205,8 +209,9 @@ class BECMainWindow(BECWidget, QMainWindow):
self._scan_progress_bar_simple.progressbar.label_template = ""
self._scan_progress_bar_simple.progressbar.setFixedHeight(self.SCAN_PROGRESS_HEIGHT)
self._scan_progress_bar_simple.progressbar.setFixedWidth(self.SCAN_PROGRESS_WIDTH)
# This one do not need dynamic styling on hover ScanProgressBar since user will hover on it probably later, when progress bar is big enough
self._scan_progress_bar_full = ScanProgressBar(
self, rpc_exposed=False, rpc_passthrough_children=False
self, rpc_exposed=False, rpc_passthrough_children=False, enable_dynamic_stylesheet=False
)
self._scan_progress_hover = HoverWidget(
self, simple=self._scan_progress_bar_simple, full=self._scan_progress_bar_full
@@ -233,8 +238,8 @@ class BECMainWindow(BECWidget, QMainWindow):
# The actual line
line = QFrame()
line.setFrameShape(QFrame.VLine)
line.setFrameShadow(QFrame.Sunken)
line.setFrameShape(QFrame.Shape.VLine)
line.setFrameShadow(QFrame.Shadow.Sunken)
line.setFixedHeight(status_bar.sizeHint().height() - 2)
# Wrapper to center the line vertically -> work around for QFrame not being able to center itself
@@ -242,7 +247,7 @@ class BECMainWindow(BECWidget, QMainWindow):
vbox = QVBoxLayout(wrapper)
vbox.setContentsMargins(0, 0, 0, 0)
vbox.addStretch()
vbox.addWidget(line, alignment=Qt.AlignHCenter)
vbox.addWidget(line, alignment=Qt.AlignmentFlag.AlignHCenter)
vbox.addStretch()
wrapper.setFixedWidth(line.sizeHint().width())
@@ -1,3 +1,6 @@
from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from qtpy.QtCore import Qt
from qtpy.QtWidgets import QHBoxLayout, QPushButton, QToolButton, QWidget
@@ -5,6 +8,8 @@ from qtpy.QtWidgets import QHBoxLayout, QPushButton, QToolButton, QWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
logger = bec_logger.logger
class AbortButton(BECWidget, QWidget):
"""A button that abort the scan."""
@@ -55,7 +60,7 @@ class AbortButton(BECWidget, QWidget):
scan_id(str|None): The scan id to abort. If None, the current scan will be aborted.
"""
if self.scan_id is not None:
print(f"Aborting scan with scan_id: {self.scan_id}")
logger.info(f"Aborting scan with scan_id: {self.scan_id}")
self.queue.request_scan_abortion(scan_id=self.scan_id)
else:
self.queue.request_scan_abortion()
@@ -192,7 +192,7 @@ class ScanGroupBox(QGroupBox):
vbox_layout = QVBoxLayout(self)
hbox_layout = QHBoxLayout()
vbox_layout.addLayout(hbox_layout)
self.layout = QGridLayout(self)
self.layout = QGridLayout()
vbox_layout.addLayout(self.layout)
# Add bundle button
@@ -460,7 +460,7 @@ class ImageBase(PlotBase):
self._color_bar = None
def disable_autorange():
print("Disabling autorange")
logger.info("Disabling autorange")
self.setProperty("autorange", False)
if style == "simple":
@@ -928,7 +928,7 @@ class ImageBase(PlotBase):
# if sync:
self._sync_colorbar_levels()
self._sync_autorange_switch()
print(f"Autorange set to {enabled}")
logger.info(f"Autorange set to {enabled}")
@SafeProperty(str)
def autorange_mode(self) -> str:
@@ -2449,7 +2449,7 @@ class Waveform(PlotBase):
first_key = next(iter(info))
mem_bytes = info[first_key]["value"]["mem_size"]
size_mb = mem_bytes / (1024 * 1024)
print(f"Dataset size: {size_mb:.1f} MB")
logger.info(f"Dataset size: {size_mb:.1f} MB")
except Exception as exc: # noqa: BLE001
logger.error(f"Unable to evaluate dataset size: {exc}")
return True
@@ -2,50 +2,41 @@ import sys
from enum import Enum
from string import Template
from qtpy.QtCore import QEasingCurve, QPropertyAnimation, QRectF, Qt, QTimer
from qtpy.QtGui import QColor, QPainter, QPainterPath
class ProgressState(Enum):
NORMAL = "normal"
PAUSED = "paused"
INTERRUPTED = "interrupted"
COMPLETED = "completed"
@classmethod
def from_bec_status(cls, status: str) -> "ProgressState":
"""
Map a BEC status string (open, paused, aborted, halted, closed)
to the corresponding ProgressState.
Any unknown status falls back to NORMAL.
"""
mapping = {
"open": cls.NORMAL,
"paused": cls.PAUSED,
"aborted": cls.INTERRUPTED,
"halted": cls.PAUSED,
"closed": cls.COMPLETED,
}
return mapping.get(status.lower(), cls.NORMAL)
PROGRESS_STATE_COLORS = {
ProgressState.NORMAL: QColor("#2979ff"), # blue normal progress
ProgressState.PAUSED: QColor("#ffca28"), # orange/amber paused
ProgressState.INTERRUPTED: QColor("#ff5252"), # red interrupted
ProgressState.COMPLETED: QColor("#00e676"), # green finished
}
from qtpy.QtWidgets import QApplication, QLabel, QVBoxLayout, QWidget
from qtpy.QtCore import QTimer
from qtpy.QtGui import QPalette
from qtpy.QtWidgets import QApplication, QProgressBar, QSizePolicy, QVBoxLayout, QWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
class ProgressState(Enum):
NORMAL = "normal"
PAUSED = "paused"
WARNING = "warning"
INTERRUPTED = "interrupted"
COMPLETED = "completed"
class BECProgressBar(BECWidget, QWidget):
"""
A custom progress bar with smooth transitions. The displayed text can be customized using a template.
A BEC progress bar backed by Qt's native QProgressBar.
The displayed text can be customized using a template with $value, $maximum,
and $percentage placeholders.
Args:
parent: Parent Qt widget.
client: Optional BEC client instance.
config: Optional widget configuration.
gui_id: Optional GUI identifier used by the BEC widget infrastructure.
enable_dynamic_stylesheet: If True, adjust the chunk border radius while the
filled chunk is still too narrow for the target radius. This avoids Qt
stylesheet over-rounding artifacts on small progress values. Once the
target radius is usable, normal value updates no longer rebuild the
stylesheet.
**kwargs: Additional keyword arguments forwarded to BECWidget.
"""
PLUGIN = True
@@ -61,7 +52,15 @@ class BECProgressBar(BECWidget, QWidget):
]
ICON_NAME = "page_control"
def __init__(self, parent=None, client=None, config=None, gui_id=None, **kwargs):
def __init__(
self,
parent=None,
client=None,
config=None,
gui_id=None,
enable_dynamic_stylesheet: bool = True,
**kwargs,
):
super().__init__(
parent=parent, client=client, gui_id=gui_id, config=config, theme_update=True, **kwargs
)
@@ -71,7 +70,6 @@ class BECProgressBar(BECWidget, QWidget):
# internal values
self._oversampling_factor = 50
self._value = 0
self._target_value = 0
self._maximum = 100 * self._oversampling_factor
# User values
@@ -80,46 +78,38 @@ class BECProgressBar(BECWidget, QWidget):
self._user_maximum = 100
self._label_template = "$value / $maximum - $percentage %"
# Color settings
self._background_color = QColor(30, 30, 30)
self._progress_color = accent_colors.highlight
self._completed_color = accent_colors.success
self._border_color = QColor(50, 50, 50)
# Cornerrounding: base radius in pixels (autoreduced if bar is small)
self._corner_radius = 10
self._corner_radius = 8
# Progressbar state handling
self._state = ProgressState.NORMAL
self._state_colors = {
ProgressState.NORMAL: accent_colors.default,
ProgressState.PAUSED: accent_colors.warning,
ProgressState.PAUSED: accent_colors.highlight,
ProgressState.WARNING: accent_colors.warning,
ProgressState.INTERRUPTED: accent_colors.emergency,
ProgressState.COMPLETED: accent_colors.success,
}
# layout settings
self._padding_left_right = 10
self._value_animation = QPropertyAnimation(self, b"_progressbar_value")
self._value_animation.setDuration(200)
self._value_animation.setEasingCurve(QEasingCurve.Type.OutCubic)
self._chunk_radius = None
self._enable_dynamic_stylesheet = enable_dynamic_stylesheet
# label on top of the progress bar
self.center_label = QLabel(self)
self.center_label.setAlignment(Qt.AlignHCenter)
self.center_label.setMinimumSize(0, 0)
self.center_label.setStyleSheet("background: transparent; color: white;")
self.progressbar = QProgressBar(self)
self.progressbar.setTextVisible(True)
self.progressbar.setRange(0, self._maximum)
self.progressbar.setMinimumHeight(0)
self.progressbar.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Ignored)
layout = QVBoxLayout(self)
layout.setContentsMargins(10, 0, 10, 0)
layout.setSpacing(0)
layout.addWidget(self.center_label)
layout.setAlignment(self.center_label, Qt.AlignCenter)
self.setLayout(layout)
self._layout = QVBoxLayout(self)
self._layout.setContentsMargins(self._padding_left_right, 0, self._padding_left_right, 0)
self._layout.setSpacing(0)
self._layout.addWidget(self.progressbar)
self.setLayout(self._layout)
self.update()
self._adjust_label_width()
self._sync_progressbar()
self._apply_state_style()
@SafeProperty(
str, doc="The template for the center label. Use $value, $maximum, and $percentage."
@@ -140,17 +130,18 @@ class BECProgressBar(BECWidget, QWidget):
accent_colors = get_accent_colors()
self._state_colors = {
ProgressState.NORMAL: accent_colors.default,
ProgressState.PAUSED: accent_colors.warning,
ProgressState.PAUSED: accent_colors.highlight,
ProgressState.WARNING: accent_colors.warning,
ProgressState.INTERRUPTED: accent_colors.emergency,
ProgressState.COMPLETED: accent_colors.success,
}
self._chunk_radius = None
self._apply_state_style()
@label_template.setter
def label_template(self, template):
self._label_template = template
self._adjust_label_width()
self.set_value(self._user_value)
self.update()
self._sync_progressbar()
@SafeProperty(float, designable=False)
def _progressbar_value(self):
@@ -162,28 +153,16 @@ class BECProgressBar(BECWidget, QWidget):
@_progressbar_value.setter
def _progressbar_value(self, val):
self._value = val
self.update()
self.progressbar.setValue(int(round(val)))
def _update_template(self):
template = Template(self._label_template)
return template.safe_substitute(
value=self._user_value,
maximum=self._user_maximum,
percentage=int((self.map_value(self._user_value) / self._maximum) * 100),
percentage=int(self._percentage(self._user_value)),
)
def _adjust_label_width(self):
"""
Reserve enough horizontal space for the center label so the widget
doesn't resize as the text grows during progress.
"""
template = Template(self._label_template)
sample_text = template.safe_substitute(
value=self._user_maximum, maximum=self._user_maximum, percentage=100
)
width = self.center_label.fontMetrics().horizontalAdvance(sample_text)
self.center_label.setFixedWidth(width)
@SafeSlot(float)
@SafeSlot(int)
def set_value(self, value):
@@ -193,21 +172,35 @@ class BECProgressBar(BECWidget, QWidget):
Args:
value (float): The value to set.
"""
if value > self._user_maximum:
value = self._user_maximum
elif value < self._user_minimum:
value = self._user_minimum
self._target_value = self.map_value(value)
self._user_value = value
self.center_label.setText(self._update_template())
previous_visual_state = self._current_visual_state()
previous_value = self._value
self._user_value = self._clamp_value(value)
self._value = self.map_value(self._user_value)
if self._enable_dynamic_stylesheet and self._value < previous_value:
self._chunk_radius = None
# Update state automatically unless paused or interrupted
if self._state not in (ProgressState.PAUSED, ProgressState.INTERRUPTED):
if self._state not in (
ProgressState.PAUSED,
ProgressState.WARNING,
ProgressState.INTERRUPTED,
):
self._state = (
ProgressState.COMPLETED
if self._user_value >= self._user_maximum
else ProgressState.NORMAL
)
self.animate_progress()
self._sync_progressbar()
visual_state_changed = self._current_visual_state() is not previous_visual_state
if visual_state_changed:
self._chunk_radius = None
if (
self._enable_dynamic_stylesheet
and not visual_state_changed
and (self._chunk_radius is None or self._chunk_radius != self._target_chunk_radius())
):
self._update_chunk_radius()
if visual_state_changed:
self._apply_state_style()
@SafeProperty(object, doc="Current visual state of the progress bar.")
def state(self):
@@ -226,7 +219,8 @@ class BECProgressBar(BECWidget, QWidget):
if not isinstance(state, ProgressState):
raise ValueError("state must be a ProgressState or its value")
self._state = state
self.update()
self._chunk_radius = None
self._apply_state_style()
@SafeProperty(float, doc="Base corner radius in pixels (autoscaled down on small bars).")
def corner_radius(self) -> float:
@@ -235,7 +229,18 @@ class BECProgressBar(BECWidget, QWidget):
@corner_radius.setter
def corner_radius(self, radius: float):
self._corner_radius = max(0.0, radius)
self.update()
self._chunk_radius = None
self._apply_state_style()
@SafeProperty(bool)
def enable_dynamic_stylesheet(self) -> bool:
return self._enable_dynamic_stylesheet
@enable_dynamic_stylesheet.setter
def enable_dynamic_stylesheet(self, enabled: bool):
self._enable_dynamic_stylesheet = bool(enabled)
self._chunk_radius = None
self._apply_state_style()
@SafeProperty(float)
def padding_left_right(self) -> float:
@@ -244,60 +249,12 @@ class BECProgressBar(BECWidget, QWidget):
@padding_left_right.setter
def padding_left_right(self, padding: float):
self._padding_left_right = padding
self.update()
self._layout.setContentsMargins(int(round(padding)), 0, int(round(padding)), 0)
def paintEvent(self, event):
painter = QPainter(self)
painter.setRenderHint(QPainter.Antialiasing)
rect = self.rect().adjusted(self._padding_left_right, 0, -self._padding_left_right, -1)
# Corner radius adapts to widget height so it never exceeds half the bars thickness
radius = min(self._corner_radius, rect.height() / 2)
# Draw background
painter.setBrush(self._background_color)
painter.setPen(Qt.NoPen)
painter.drawRoundedRect(rect, radius, radius) # Rounded corners
# Draw border
painter.setBrush(Qt.NoBrush)
painter.setPen(self._border_color)
painter.drawRoundedRect(rect, radius, radius)
# Determine progress colour based on current state
if self._state == ProgressState.PAUSED:
current_color = self._state_colors[ProgressState.PAUSED]
elif self._state == ProgressState.INTERRUPTED:
current_color = self._state_colors[ProgressState.INTERRUPTED]
elif self._state == ProgressState.COMPLETED or self._value >= self._maximum:
current_color = self._state_colors[ProgressState.COMPLETED]
else:
current_color = self._state_colors[ProgressState.NORMAL]
# Set clipping region to preserve the background's rounded corners
progress_rect = rect.adjusted(
0, 0, int(-rect.width() + (self._value / self._maximum) * rect.width()), 0
)
clip_path = QPainterPath()
clip_path.addRoundedRect(
QRectF(rect), radius, radius
) # Clip to the background's rounded corners
painter.setClipPath(clip_path)
# Draw progress bar
painter.setBrush(current_color)
painter.drawRect(progress_rect) # Less rounded, no additional rounding
painter.end()
def animate_progress(self):
"""
Animate the progress bar from the current value to the target value.
"""
self._value_animation.stop()
self._value_animation.setStartValue(self._value)
self._value_animation.setEndValue(self._target_value)
self._value_animation.start()
def resizeEvent(self, event):
super().resizeEvent(event)
self._chunk_radius = None
self._update_chunk_radius()
@SafeProperty(float)
def maximum(self):
@@ -343,10 +300,11 @@ class BECProgressBar(BECWidget, QWidget):
Args:
maximum (float): The maximum value.
"""
previous_maximum = self._user_maximum
self._user_maximum = maximum
self._adjust_label_width()
if self._enable_dynamic_stylesheet and maximum != previous_maximum:
self._chunk_radius = None
self.set_value(self._user_value) # Update the value to fit the new range
self.update()
@SafeSlot(float)
def set_minimum(self, minimum: float):
@@ -356,40 +314,126 @@ class BECProgressBar(BECWidget, QWidget):
Args:
minimum (float): The minimum value.
"""
previous_minimum = self._user_minimum
self._user_minimum = minimum
if self._enable_dynamic_stylesheet and minimum != previous_minimum:
self._chunk_radius = None
self.set_value(self._user_value) # Update the value to fit the new range
self.update()
def map_value(self, value: float):
"""
Map the user value to the range [0, 100*self._oversampling_factor] for the progress
"""
return (
(value - self._user_minimum) / (self._user_maximum - self._user_minimum) * self._maximum
)
span = self._user_maximum - self._user_minimum
if span <= 0:
return float(self._maximum if value >= self._user_maximum else 0)
mapped_value = (value - self._user_minimum) / span * self._maximum
return min(float(self._maximum), max(0.0, mapped_value))
def _percentage(self, value: float) -> float:
return (self.map_value(value) / self._maximum) * 100 if self._maximum else 0.0
def _clamp_value(self, value: float) -> float:
if self._user_maximum <= self._user_minimum:
return self._user_maximum
return min(self._user_maximum, max(self._user_minimum, value))
def _sync_progressbar(self) -> None:
self.progressbar.setRange(0, int(self._maximum))
self.progressbar.setValue(int(round(self._value)))
self.progressbar.setFormat(self._update_template())
def _setup_style_sheet(self, *, chunk_radius: int) -> None:
radius = int(round(self._corner_radius))
chunk_color = self._state_colors[self._current_visual_state()].name()
self.progressbar.setStyleSheet(f"""
QProgressBar {{
background-color: palette(mid);
border: none;
border-radius: {radius}px;
color: palette(text);
text-align: center;
}}
QProgressBar::chunk {{
background-color: {chunk_color};
border-radius: {chunk_radius}px;
}}
""")
def _update_chunk_radius(self) -> None:
chunk_radius = self._current_chunk_radius()
if chunk_radius != self._chunk_radius:
self._chunk_radius = chunk_radius
self._setup_style_sheet(chunk_radius=chunk_radius)
self._apply_state_palette()
def _apply_state_style(self) -> None:
if self._chunk_radius is None:
self._chunk_radius = self._current_chunk_radius()
self._setup_style_sheet(chunk_radius=self._chunk_radius)
self._apply_state_palette()
def _apply_state_palette(self) -> None:
color = self._state_colors[self._current_visual_state()]
palette = self.progressbar.palette()
palette.setColor(QPalette.ColorRole.Highlight, color)
palette.setColor(QPalette.ColorRole.HighlightedText, palette.color(QPalette.ColorRole.Text))
self.progressbar.setPalette(palette)
def _current_chunk_radius(self) -> int:
target_radius = self._target_chunk_radius()
if not self._enable_dynamic_stylesheet:
return target_radius
return self._calculate_chunk_radius(target_radius)
def _target_chunk_radius(self) -> int:
radius = int(round(self._corner_radius))
return max(0, radius - 1)
def _calculate_chunk_radius(self, target_radius: int) -> int:
"""
Scale the chunk radius down while the filled part is narrower than the target radius.
Qt stylesheets otherwise over-round very small chunks.
"""
if target_radius <= 0 or self._maximum <= 0:
return 0
fill_width = self.progressbar.width() * min(1.0, max(0.0, self._value / self._maximum))
if fill_width <= 0:
return 0
return min(target_radius, max(1, int(fill_width / 2)))
def _current_visual_state(self) -> ProgressState:
if self._state in (ProgressState.PAUSED, ProgressState.WARNING, ProgressState.INTERRUPTED):
return self._state
if self._state == ProgressState.COMPLETED or self._value >= self._maximum:
return ProgressState.COMPLETED
return ProgressState.NORMAL
def _get_label(self) -> str:
"""Return the label text. mostly used for testing rpc."""
return self.center_label.text()
return self.progressbar.text()
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
progressBar = BECProgressBar()
progressBar.show()
progressBar.set_minimum(-100)
progressBar.set_maximum(0)
progress_bar = BECProgressBar()
progress_bar.setWindowTitle("BEC Progress Bar")
progress_bar.resize(360, 48)
progress_bar.set_minimum(-100)
progress_bar.set_maximum(0)
progress_bar.set_value(-100)
progress_bar.show()
# Example of setting values
def update_progress():
value = progressBar._user_value + 2.5
if value > progressBar._user_maximum:
value = -100 # progressBar._maximum / progressBar._upsampling_factor
progressBar.set_value(value)
value = progress_bar._user_value + 2.5
if value > progress_bar._user_maximum:
value = progress_bar._user_minimum
progress_bar.set_value(value)
timer = QTimer()
timer = QTimer(progress_bar)
timer.timeout.connect(update_progress)
timer.start(200) # Update every half second
timer.start(200)
sys.exit(app.exec())
@@ -0,0 +1,280 @@
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import Literal
import numpy as np
from bec_lib.endpoints import MessageEndpoints
from qtpy.QtCore import QObject, QTimer, Signal
from bec_widgets.utils.error_popups import SafeSlot
@dataclass(frozen=True)
class ProgressSnapshot:
value: float
max_value: float
done: bool
status: Literal["open", "paused", "aborted", "halted", "closed", "user_completed"]
scan_id: str | None = None
scan_number: int | None = None
rid: str | None = None
is_new_scan: bool = False
class ProgressTask(QObject):
"""
Class to store progress information.
Inspired by https://github.com/Textualize/rich/blob/master/rich/progress.py
"""
def __init__(
self, parent: QObject | None, value: float = 0, max_value: float = 0, done: bool = False
):
super().__init__(parent=parent)
self.start_time = time.monotonic()
self.done = done
self.value = value
self.max_value = max_value
self._elapsed_time = 0
self.timer = QTimer(self)
self.timer.timeout.connect(self.update_elapsed_time)
self.timer.start(1000)
def update(self, value: float, max_value: float, done: bool = False):
"""
Update the progress.
"""
self.max_value = max_value
self.done = done
self.value = value
if done:
self.timer.stop()
def update_elapsed_time(self):
"""
Update the time estimates. This is called every second by a QTimer.
"""
self._elapsed_time = max(0.0, time.monotonic() - self.start_time)
@property
def percentage(self) -> float:
"""float: Get progress of task as a percentage. If a None total was set, returns 0"""
if not self.max_value:
return 0.0
completed = (self.value / self.max_value) * 100.0
completed = min(100.0, max(0.0, completed))
return completed
@property
def speed(self) -> float:
"""Get the estimated speed in steps per second."""
if self._elapsed_time == 0:
return 0.0
return self.value / self._elapsed_time
@property
def frequency(self) -> float:
"""Get the estimated frequency in steps per second."""
if self.speed == 0:
return 0.0
return 1 / self.speed
@property
def time_elapsed(self) -> str:
return self._format_time(int(self._elapsed_time))
@property
def remaining(self) -> float:
"""Get the estimated remaining steps."""
if self.done:
return 0.0
remaining = self.max_value - self.value
return remaining
@property
def time_remaining(self) -> str:
"""
Get the estimated remaining time in the format HH:MM:SS.
"""
if self.done or not self.speed or not self.remaining:
return self._format_time(0)
estimate = int(np.round(self.remaining / self.speed))
return self._format_time(estimate)
@staticmethod
def _format_time(seconds: float) -> str:
"""
Format the time in seconds to a string in the format HH:MM:SS.
"""
return f"{seconds // 3600:02}:{(seconds // 60) % 60:02}:{seconds % 60:02}"
class BECProgressTracker(QObject):
"""
Shared backend for BEC scan progress messages.
"""
progress_started = Signal(object)
progress_updated = Signal(object)
progress_finished = Signal(object)
progress_cleared = Signal()
def __init__(self, bec_dispatcher, parent: QObject | None = None):
super().__init__(parent=parent)
self.bec_dispatcher = bec_dispatcher
self._connected = False
self.task: ProgressTask | None = None
self.scan_number: int | None = None
self._active_scan_id: str | None = None
self._active_rid: str | None = None
self._last_reset_scan_id: str | None = None
def start(self) -> None:
if self._connected:
return
self.bec_dispatcher.connect_slot(
self.process_progress_message, MessageEndpoints.scan_progress()
)
self.bec_dispatcher.connect_slot(
self.process_scan_status_message, MessageEndpoints.scan_status()
)
self._connected = True
def _start_task(self, scan_id: str | None, rid: str | None = None) -> None:
if self.task is not None:
self.task.timer.stop()
self.task.deleteLater()
self.task = ProgressTask(parent=self)
self._active_scan_id = scan_id
self._active_rid = rid
self.progress_started.emit(
ProgressSnapshot(
value=0,
max_value=100,
done=False,
status="open",
scan_id=self._active_scan_id,
scan_number=self.scan_number,
rid=self._active_rid,
)
)
def clear_task(self, *, emit_finished: bool = True) -> None:
if self.task is None:
self._active_scan_id = None
self._active_rid = None
self.progress_cleared.emit()
return
self.task.timer.stop()
self.task.deleteLater()
self.task = None
self._active_scan_id = None
self._active_rid = None
self.progress_cleared.emit()
if emit_finished:
self.progress_finished.emit(
ProgressSnapshot(
value=0,
max_value=100,
done=True,
status="open",
scan_id=self._active_scan_id,
scan_number=self.scan_number,
rid=self._active_rid,
)
)
@SafeSlot(dict, dict)
def process_progress_message(
self, msg_content: dict, metadata: dict
) -> ProgressSnapshot | None:
done = msg_content.get("done", False)
value = msg_content.get("value", 0)
max_value = msg_content.get("max_value", 100)
status: Literal["open", "paused", "aborted", "halted", "closed", "user_completed"] = (
metadata.get("status", "open")
)
scan_id = metadata.get("scan_id") or metadata.get("RID")
rid = metadata.get("RID")
scan_number = metadata.get("scan_number")
if scan_number is not None:
self.scan_number = scan_number
is_new_scan = False
previous_scan_id = self._active_scan_id
previous_rid = self._active_rid
identity_changed = (
(scan_id is not None and scan_id != previous_scan_id)
or (rid is not None and rid != previous_rid)
or (previous_scan_id is None and previous_rid is None)
)
if self.task is None:
self._start_task(scan_id, rid=rid)
is_new_scan = identity_changed
elif scan_id is not None and scan_id != self._active_scan_id:
self._start_task(scan_id, rid=rid)
is_new_scan = True
elif rid is not None and rid != self._active_rid:
self._start_task(scan_id or self._active_scan_id, rid=rid)
is_new_scan = True
if self.task is None:
return None
self.task.update(value, max_value, done)
snapshot = ProgressSnapshot(
value=value,
max_value=max_value,
done=done,
status=status,
scan_id=self._active_scan_id,
scan_number=self.scan_number,
rid=self._active_rid,
is_new_scan=is_new_scan,
)
self.progress_updated.emit(snapshot)
if done:
self.clear_task()
return snapshot
@SafeSlot(dict, dict)
def process_scan_status_message(
self, msg_content: dict, metadata: dict
) -> ProgressSnapshot | None:
if msg_content.get("status") != "open":
return None
scan_id = msg_content.get("scan_id") or metadata.get("scan_id") or metadata.get("RID")
if scan_id is None or scan_id == self._last_reset_scan_id:
return None
self.clear_task(emit_finished=False)
self._last_reset_scan_id = scan_id
self.scan_number = msg_content.get("scan_number")
snapshot = ProgressSnapshot(
value=0,
max_value=100,
done=False,
status="open",
scan_id=scan_id,
scan_number=self.scan_number,
rid=metadata.get("RID"),
is_new_scan=True,
)
self.progress_updated.emit(snapshot)
return snapshot
def cleanup(self) -> None:
self.clear_task(emit_finished=False)
if self._connected:
self.bec_dispatcher.disconnect_slot(
self.process_progress_message, MessageEndpoints.scan_progress()
)
self.bec_dispatcher.disconnect_slot(
self.process_scan_status_message, MessageEndpoints.scan_status()
)
self._connected = False
@@ -13,6 +13,7 @@ from bec_widgets import BECWidget
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils.colors import Colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.widgets.progress.progress_backend import BECProgressTracker, ProgressSnapshot
logger = bec_logger.logger
if TYPE_CHECKING:
@@ -81,6 +82,8 @@ class Ring(BECWidget, QWidget):
self._color: QColor = self.convert_color(self.config.color)
self._background_color: QColor = self.convert_color(self.config.background_color)
self.registered_slot: tuple[Callable, str | EndpointInfo] | None = None
self.progress_tracker = BECProgressTracker(self.bec_dispatcher, parent=self)
self.progress_tracker.progress_updated.connect(self._on_progress_snapshot)
self.RID = None
self._gap = 5
self._hovered = False
@@ -219,35 +222,32 @@ class Ring(BECWidget, QWidget):
case "manual":
if self.config.mode == "manual":
return
if self.registered_slot is not None:
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
self._disconnect_registered_update()
self.config.mode = "manual"
self.registered_slot = None
case "scan":
if self.config.mode == "scan":
return
if self.registered_slot is not None:
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
self._disconnect_registered_update()
self.config.mode = "scan"
self.bec_dispatcher.connect_slot(
self.on_scan_progress, MessageEndpoints.scan_progress()
)
self.registered_slot = (self.on_scan_progress, MessageEndpoints.scan_progress())
self.progress_tracker.start()
case "device":
if self.registered_slot is not None:
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
self._disconnect_registered_update()
self.config.mode = "device"
if device == "":
self.registered_slot = None
return
self.config.device = device
# self.config.signal = self._get_signal_from_device(device, signal)
signal = self._update_device_connection(device, signal)
self.config.signal = signal
case _:
raise ValueError(f"Unsupported mode: {mode}")
def _disconnect_registered_update(self):
if self.registered_slot is not None:
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
self.registered_slot = None
self.progress_tracker.cleanup()
def set_precision(self, precision: int):
"""
Set the precision for the ring widget.
@@ -270,13 +270,13 @@ class Ring(BECWidget, QWidget):
def _get_signals_for_device(self, device: str) -> dict[str, list[str]]:
"""
Get the signals for the device.
Get the appropriate signals for the device to be used in the ring widget, based on the signal infos from the device manager.
Args:
device(str): Device name for the device
device(str): Device name for the device readback mode
Returns:
dict[str, list[str]]: Dictionary with the signals for the device
dict[str, list[str]]: Signal infos for the device to be used in the ring widget
"""
dm = self.bec_dispatcher.client.device_manager
if not dm:
@@ -285,24 +285,25 @@ class Ring(BECWidget, QWidget):
if dev_obj is None:
raise ValueError(f"Device '{device}' not found in device manager.")
signal_infos = getattr(dev_obj, "_info", {}).get("signals", {})
progress_signals = [
obj["component_name"]
for obj in dev_obj._info["signals"].values()
if obj["signal_class"] == "ProgressSignal"
for obj in signal_infos.values()
if obj.get("signal_class") == "ProgressSignal"
]
hinted_signals = [
obj["obj_name"]
for obj in dev_obj._info["signals"].values()
if obj["kind_str"] == "hinted"
and obj["signal_class"]
for obj in signal_infos.values()
if obj.get("kind_str") == "hinted"
and obj.get("signal_class")
not in ["ProgressSignal", "AsyncSignal", "AsyncMultiSignal", "DynamicSignal"]
]
normal_signals = [
obj["component_name"]
for obj in dev_obj._info["signals"].values()
if obj["kind_str"] == "normal"
for obj in signal_infos.values()
if obj.get("kind_str") == "normal"
]
return {
"progress_signals": progress_signals,
"hinted_signals": hinted_signals,
@@ -311,21 +312,15 @@ class Ring(BECWidget, QWidget):
def _update_device_connection(self, device: str, signal: str | None) -> str:
"""
Update the device connection for the ring widget.
Subscribe device mode to the endpoint matching the selected signal.
In general, we support two modes here:
- If signal is provided, we use that directly.
- If signal is not provided, we try to get the signal from the device manager.
We first check for progress signals, then for hinted signals, and finally for normal signals.
Depending on what type of signal we get (progress or hinted/normal), we subscribe to different endpoints.
Args:
device(str): Device name for the device mode
signal(str): Signal name for the device mode
When no signal is provided, the ring selects the first available progress
signal, then the first hinted readback signal, then the first normal
readback signal. Progress signals use the device_progress endpoint;
readback signals use the device_readback endpoint.
Returns:
str: The selected signal name for the device mode
The selected signal name, or an empty string if the device is not known.
"""
logger.info(f"Updating device connection for device '{device}' and signal '{signal}'")
dm = self.bec_dispatcher.client.device_manager
@@ -341,18 +336,17 @@ class Ring(BECWidget, QWidget):
normal_signals = signals["normal_signals"]
if not signal:
# If signal is not provided, we try to get it from the device manager
if len(progress_signals) > 0:
if progress_signals:
signal = progress_signals[0]
logger.info(
f"Using progress signal '{signal}' for device '{device}' in ring progress bar."
)
elif len(hinted_signals) > 0:
elif hinted_signals:
signal = hinted_signals[0]
logger.info(
f"Using hinted signal '{signal}' for device '{device}' in ring progress bar."
)
elif len(normal_signals) > 0:
elif normal_signals:
signal = normal_signals[0]
logger.info(
f"Using normal signal '{signal}' for device '{device}' in ring progress bar."
@@ -366,26 +360,18 @@ class Ring(BECWidget, QWidget):
self.bec_dispatcher.connect_slot(self.on_device_progress, endpoint)
self.registered_slot = (self.on_device_progress, endpoint)
return signal
if signal in hinted_signals or signal in normal_signals:
endpoint = MessageEndpoints.device_readback(device)
self.bec_dispatcher.connect_slot(self.on_device_readback, endpoint)
self.registered_slot = (self.on_device_readback, endpoint)
return signal
@SafeSlot(dict, dict)
def on_scan_progress(self, msg, meta):
"""
Update the ring widget with the scan progress.
Args:
msg(dict): Message with the scan progress
meta(dict): Metadata for the message
"""
current_RID = meta.get("RID", None)
if current_RID != self.RID:
self.set_min_max_values(0, msg.get("max_value", 100))
self.set_value(msg.get("value", 0))
self.update()
raise ValueError(
f"Signal '{signal}' is not usable for ring progress device mode. "
f"Available progress signals: {progress_signals}; "
f"available readback signals: {hinted_signals + normal_signals}."
)
@SafeSlot(dict, dict)
def on_device_readback(self, msg, meta):
@@ -408,30 +394,31 @@ class Ring(BECWidget, QWidget):
@SafeSlot(dict, dict)
def on_device_progress(self, msg, meta):
"""
Update the ring widget with the device progress.
Args:
msg(dict): Message with the device progress
meta(dict): Metadata for the message
"""
device = self.config.device
if device is None:
return
max_val = msg.get("max_value", 100)
self.set_min_max_values(0, max_val)
value = msg.get("value", 0)
if msg.get("done"):
value = max_val
self.set_value(value)
self.set_value(max_val if msg.get("done") else msg.get("value", 0))
self.update()
def _on_progress_snapshot(self, snapshot: ProgressSnapshot):
if snapshot.is_new_scan:
self.set_min_max_values(0, snapshot.max_value)
self.RID = snapshot.rid
self.set_value(snapshot.value)
self.update()
def paintEvent(self, event):
if not self.progress_container:
return
painter = QtGui.QPainter(self)
painter.setRenderHint(QtGui.QPainter.RenderHint.Antialiasing)
size = min(self.width(), self.height())
if size <= 0 or not self.isVisible():
return
painter = QtGui.QPainter(self)
if not painter.isActive():
return
painter.setRenderHint(QtGui.QPainter.RenderHint.Antialiasing)
# Center the ring
x_offset = (self.width() - size) // 2
@@ -509,15 +496,6 @@ class Ring(BECWidget, QWidget):
return QtGui.QColor(*color)
raise ValueError(f"Unsupported color format: {color}")
def cleanup(self):
"""
Cleanup the ring widget.
Disconnect any registered slots.
"""
if self.registered_slot is not None:
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
self.registered_slot = None
###############################################
####### QProperties ###########################
###############################################
@@ -666,6 +644,7 @@ class Ring(BECWidget, QWidget):
if self.registered_slot is not None:
self.bec_dispatcher.disconnect_slot(*self.registered_slot)
self.registered_slot = None
self.progress_tracker.cleanup()
self._hover_animation.stop()
super().cleanup()
@@ -1,121 +1,27 @@
from __future__ import annotations
import enum
import os
import time
from typing import Literal
import numpy as np
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from qtpy.QtCore import QObject, QTimer, Signal
from qtpy.QtCore import Signal
from qtpy.QtWidgets import QVBoxLayout, QWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.error_popups import SafeProperty
from bec_widgets.utils.ui_loader import UILoader
from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import ProgressState
from bec_widgets.widgets.progress.progress_backend import BECProgressTracker, ProgressSnapshot
logger = bec_logger.logger
class ProgressSource(enum.Enum):
"""
Enum to define the source of the progress.
"""
SCAN_PROGRESS = "scan_progress"
DEVICE_PROGRESS = "device_progress"
class ProgressTask(QObject):
"""
Class to store progress information.
Inspired by https://github.com/Textualize/rich/blob/master/rich/progress.py
"""
def __init__(self, parent: QWidget, value: float = 0, max_value: float = 0, done: bool = False):
super().__init__(parent=parent)
self.start_time = time.time()
self.done = done
self.value = value
self.max_value = max_value
self._elapsed_time = 0
self.timer = QTimer(self)
self.timer.timeout.connect(self.update_elapsed_time)
self.timer.start(100) # update the elapsed time every 100 ms
def update(self, value: float, max_value: float, done: bool = False):
"""
Update the progress.
"""
self.max_value = max_value
self.done = done
self.value = value
if done:
self.timer.stop()
def update_elapsed_time(self):
"""
Update the time estimates. This is called every 100 ms by a QTimer.
"""
self._elapsed_time += 0.1
@property
def percentage(self) -> float:
"""float: Get progress of task as a percentage. If a None total was set, returns 0"""
if not self.max_value:
return 0.0
completed = (self.value / self.max_value) * 100.0
completed = min(100.0, max(0.0, completed))
return completed
@property
def speed(self) -> float:
"""Get the estimated speed in steps per second."""
if self._elapsed_time == 0:
return 0.0
return self.value / self._elapsed_time
@property
def frequency(self) -> float:
"""Get the estimated frequency in steps per second."""
if self.speed == 0:
return 0.0
return 1 / self.speed
@property
def time_elapsed(self) -> str:
# format the elapsed time to a string in the format HH:MM:SS
return self._format_time(int(self._elapsed_time))
@property
def remaining(self) -> float:
"""Get the estimated remaining steps."""
if self.done:
return 0.0
remaining = self.max_value - self.value
return remaining
@property
def time_remaining(self) -> str:
"""
Get the estimated remaining time in the format HH:MM:SS.
"""
if self.done or not self.speed or not self.remaining:
return self._format_time(0)
estimate = int(np.round(self.remaining / self.speed))
return self._format_time(estimate)
def _format_time(self, seconds: float) -> str:
"""
Format the time in seconds to a string in the format HH:MM:SS.
"""
return f"{seconds // 3600:02}:{(seconds // 60) % 60:02}:{seconds % 60:02}"
BEC_STATUS_TO_PROGRESS_STATE = {
"open": ProgressState.NORMAL,
"paused": ProgressState.PAUSED,
"aborted": ProgressState.WARNING,
"halted": ProgressState.INTERRUPTED,
"closed": ProgressState.COMPLETED,
"user_completed": ProgressState.COMPLETED,
}
class ScanProgressBar(BECWidget, QWidget):
@@ -130,7 +36,14 @@ class ScanProgressBar(BECWidget, QWidget):
progress_finished = Signal()
def __init__(
self, parent=None, client=None, config=None, gui_id=None, one_line_design=False, **kwargs
self,
parent=None,
client=None,
config=None,
gui_id=None,
one_line_design=False,
enable_dynamic_stylesheet: bool = True,
**kwargs,
):
super().__init__(parent=parent, client=client, config=config, gui_id=gui_id, **kwargs)
@@ -146,84 +59,43 @@ class ScanProgressBar(BECWidget, QWidget):
self.layout.addWidget(self.ui)
self.setLayout(self.layout)
self.progressbar = self.ui.progressbar
self.progressbar.enable_dynamic_stylesheet = enable_dynamic_stylesheet
self._show_elapsed_time = self.ui.elapsed_time_label.isVisible()
self._show_remaining_time = self.ui.remaining_time_label.isVisible()
self._show_source_label = self.ui.source_label.isVisible()
self.connect_to_queue()
self._progress_source = None
self._progress_device = None
self.task = None
self.scan_number = None
self.progress_started.connect(lambda: print("Scan progress started"))
def connect_to_queue(self):
"""
Connect to the queue status signal.
"""
self.bec_dispatcher.connect_slot(self.on_queue_update, MessageEndpoints.scan_queue_status())
def set_progress_source(self, source: ProgressSource, device=None):
"""
Set the source of the progress.
"""
if self._progress_source == source and self._progress_device == device:
self.update_source_label(source, device=device)
return
if self._progress_source is not None:
self.bec_dispatcher.disconnect_slot(
self.on_progress_update,
(
MessageEndpoints.scan_progress()
if self._progress_source == ProgressSource.SCAN_PROGRESS
else MessageEndpoints.device_progress(device=self._progress_device)
),
)
self._progress_source = source
self._progress_device = None if source == ProgressSource.SCAN_PROGRESS else device
self.bec_dispatcher.connect_slot(
self.on_progress_update,
(
MessageEndpoints.scan_progress()
if source == ProgressSource.SCAN_PROGRESS
else MessageEndpoints.device_progress(device=device)
),
self.progress_tracker = BECProgressTracker(self.bec_dispatcher, parent=self)
self.progress_tracker.progress_started.connect(self._on_progress_started)
self.progress_tracker.progress_updated.connect(self._on_progress_snapshot)
self.progress_tracker.progress_finished.connect(
lambda _snapshot: self.progress_finished.emit()
)
self.update_source_label(source, device=device)
# self.progress_started.emit()
self.progress_tracker.start()
def update_source_label(self, source: ProgressSource, device=None):
scan_text = f"Scan {self.scan_number}" if self.scan_number is not None else "Scan"
text = scan_text if source == ProgressSource.SCAN_PROGRESS else f"Device {device}"
logger.info(f"Set progress source to {text}")
self.ui.source_label.setText(text)
@SafeSlot(dict, dict)
def on_progress_update(self, msg_content: dict, metadata: dict):
"""
Update the progress bar based on the progress message.
"""
value = msg_content["value"]
max_value = msg_content.get("max_value", 100)
done = msg_content.get("done", False)
status: Literal["open", "paused", "aborted", "halted", "closed"] = metadata.get(
"status", "open"
)
if self.task is None:
def update_source_label(self):
scan_number = self.progress_tracker.scan_number
scan_text = f"Scan {scan_number}" if scan_number is not None else "Scan"
if self.ui.source_label.text() == scan_text:
return
self.task.update(value, max_value, done)
logger.info(f"Set progress source to {scan_text}")
self.ui.source_label.setText(scan_text)
def _on_progress_started(self, _snapshot: ProgressSnapshot):
if self.progress_tracker.task is not None:
self.progress_tracker.task.timer.timeout.connect(self.update_labels)
self.progress_started.emit()
def _on_progress_snapshot(self, snapshot: ProgressSnapshot):
self.update_labels()
self.progressbar.set_maximum(self.task.max_value)
self.progressbar.state = ProgressState.from_bec_status(status)
self.progressbar.set_value(self.task.value)
if done:
self.task = None
self.progress_finished.emit()
return
if snapshot.is_new_scan and self.progress_tracker.task is None:
self.ui.elapsed_time_label.setText("00:00:00")
self.ui.remaining_time_label.setText("00:00:00")
self.update_source_label()
self.progressbar.set_maximum(snapshot.max_value)
self.progressbar.set_value(snapshot.value)
self.progressbar.state = BEC_STATUS_TO_PROGRESS_STATE.get(
snapshot.status.lower(), ProgressState.NORMAL
)
@SafeProperty(bool)
def show_elapsed_time(self):
@@ -260,74 +132,17 @@ class ScanProgressBar(BECWidget, QWidget):
"""
Update the labels based on the progress task.
"""
if self.task is None:
task = self.progress_tracker.task
if task is None:
return
self.ui.elapsed_time_label.setText(self.task.time_elapsed)
self.ui.remaining_time_label.setText(self.task.time_remaining)
@SafeSlot(dict, dict, verify_sender=True)
def on_queue_update(self, msg_content, metadata):
"""
Update the progress bar based on the queue status.
"""
if not "queue" in msg_content:
return
if "primary" not in msg_content["queue"]:
return
if (primary_queue := msg_content.get("queue").get("primary")) is None:
return
if not isinstance(primary_queue, messages.ScanQueueStatus):
return
primary_queue_info = primary_queue.info
if len(primary_queue_info) == 0:
return
scan_info = primary_queue_info[0]
if scan_info is None:
return
if scan_info.status.lower() == "running" and self.task is None:
self.task = ProgressTask(parent=self)
self.progress_started.emit()
active_request_block = scan_info.active_request_block
if active_request_block is None:
return
self.scan_number = active_request_block.scan_number
report_instructions = active_request_block.report_instructions
if not report_instructions:
return
# for now, let's just use the first instruction
instruction = report_instructions[0]
if "scan_progress" in instruction:
self.set_progress_source(ProgressSource.SCAN_PROGRESS)
elif "device_progress" in instruction:
device = instruction["device_progress"][0]
self.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device)
self.ui.elapsed_time_label.setText(task.time_elapsed)
self.ui.remaining_time_label.setText(task.time_remaining)
def cleanup(self):
if self.task is not None:
self.task.timer.stop()
self.close()
self.deleteLater()
if self._progress_source is not None:
self.bec_dispatcher.disconnect_slot(
self.on_progress_update,
(
MessageEndpoints.scan_progress()
if self._progress_source == ProgressSource.SCAN_PROGRESS
else MessageEndpoints.device_progress(device=self._progress_device)
),
)
self._progress_source = None
self._progress_device = None
self.progress_tracker.cleanup()
self.progressbar.close()
self.progressbar.deleteLater()
self.bec_dispatcher.disconnect_slot(
self.on_queue_update, MessageEndpoints.scan_queue_status()
)
super().cleanup()
+2 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "bec_widgets"
version = "3.13.4"
version = "3.13.5"
description = "BEC Widgets"
requires-python = ">=3.11"
classifiers = [
@@ -72,6 +72,7 @@ qtermwidget = ["pyside6_qtermwidget"]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
+26
View File
@@ -6,6 +6,7 @@ from qtpy.QtCore import QObject
from qtpy.QtWidgets import QApplication, QWidget
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty
from bec_widgets.utils.error_popups import SafeSlot as Slot
@@ -15,6 +16,9 @@ from .client_mocks import mocked_client
class BECConnectorQObject(BECConnector, QObject): ...
class _CleanupBroadcastWidget(BECWidget, QWidget): ...
@pytest.fixture
def bec_connector(mocked_client):
connector = BECConnectorQObject(client=mocked_client)
@@ -146,6 +150,28 @@ def test_bec_connector_change_object_name(bec_connector):
assert not any(obj.objectName() == previous_name for obj in all_objects)
def test_bec_widget_cleanup_broadcasts_after_children_are_unregistered(mocked_client, qtbot):
parent = _CleanupBroadcastWidget(client=mocked_client, object_name="cleanup_parent")
child = _CleanupBroadcastWidget(
parent=parent, client=mocked_client, object_name="cleanup_child"
)
qtbot.addWidget(parent)
observed_connections = []
parent.rpc_register.callbacks.append(
lambda connections: observed_connections.append(set(connections))
)
parent.close()
assert parent._destroyed is True
assert child.gui_id not in parent.rpc_register.list_all_connections()
assert all(
parent.gui_id in snapshot or child.gui_id not in snapshot
for snapshot in observed_connections
)
def test_bec_connector_export_settings():
class MyWidget(BECConnector, QWidget):
+47 -1
View File
@@ -5,7 +5,7 @@ from unittest import mock
import pytest
from bec_lib import service_config
from bec_lib.messages import ScanMessage
from bec_lib.messages import GUIInstructionMessage, ScanMessage
from bec_lib.serialization import MsgpackSerialization
from bec_widgets.utils.bec_dispatcher import BECDispatcher, QtRedisConnector, QtThreadSafeCallback
@@ -213,3 +213,49 @@ def test_dispatcher_2_topic_same_cb_with_boundmethod(
send_msg_event.set()
qtbot.wait(10)
def test_qt_redis_connector_logs_rpc_before_qt_callback(monkeypatch):
info_mock = mock.MagicMock()
warning_mock = mock.MagicMock()
monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.info", info_mock)
monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.warning", warning_mock)
def callback(_msg, _metadata):
pass
cb = QtThreadSafeCallback(callback)
connector = QtRedisConnector("localhost:1", mock.MagicMock())
rpc_msg = GUIInstructionMessage(
action="set_value",
parameter={"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"},
metadata={
"request_id": "dispatcher-request",
"receiver": "gui",
"object_name": "progressbar",
"timeout": 0.1,
"sent_at": 1.0,
"deadline": 1.1,
},
)
try:
connector._execute_callback(cb, {"data": rpc_msg}, {})
info_mock.assert_called_once()
info_message = info_mock.call_args.args[0]
assert "GUI RPC dispatcher received request before Qt callback emit" in info_message
assert "request_id=dispatcher-request" in info_message
assert "method=set_value" in info_message
assert "receiver=gui" in info_message
assert "target_gui_id=ring" in info_message
assert "object_name=progressbar" in info_message
assert "timeout=0.1" in info_message
assert "stale_on_dispatch=True" in info_message
warning_mock.assert_called_once()
warning_message = warning_mock.call_args.args[0]
assert "received request after client timeout deadline" in warning_message
assert "request_id=dispatcher-request" in warning_message
finally:
connector.shutdown()
+176 -20
View File
@@ -1,5 +1,8 @@
import numpy as np
from unittest import mock
import pytest
from qtpy.QtGui import QPalette
from qtpy.QtWidgets import QProgressBar
from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import (
BECProgressBar,
@@ -15,6 +18,14 @@ def progressbar(qtbot):
yield widget
@pytest.fixture
def static_progressbar(qtbot):
widget = BECProgressBar(enable_dynamic_stylesheet=False)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_progressbar(progressbar):
progressbar.update()
@@ -23,36 +34,181 @@ def test_progressbar_set_value(qtbot, progressbar):
progressbar.set_minimum(0)
progressbar.set_maximum(100)
progressbar.set_value(50)
progressbar.paintEvent(None)
qtbot.waitUntil(
lambda: np.isclose(
progressbar._value, progressbar._user_value * progressbar._oversampling_factor
)
)
assert isinstance(progressbar.progressbar, QProgressBar)
assert progressbar._value == progressbar._user_value * progressbar._oversampling_factor
assert progressbar.progressbar.value() == 50 * progressbar._oversampling_factor
def test_progressbar_label(progressbar):
progressbar.label_template = "Test: $value"
progressbar.set_value(50)
assert progressbar.center_label.text() == "Test: 50"
assert progressbar._get_label() == "Test: 50"
assert progressbar.progressbar.text() == "Test: 50"
def test_progress_state_from_bec_status():
"""ProgressState.from_bec_status() maps BEC literals correctly."""
mapping = {
"open": ProgressState.NORMAL,
"paused": ProgressState.PAUSED,
"aborted": ProgressState.INTERRUPTED,
"halted": ProgressState.PAUSED,
"closed": ProgressState.COMPLETED,
"UNKNOWN": ProgressState.NORMAL, # fallback
}
for text, expected in mapping.items():
assert ProgressState.from_bec_status(text) is expected
def test_progressbar_equal_minimum_and_maximum_does_not_raise(progressbar):
progressbar.set_minimum(0)
progressbar.set_maximum(0)
progressbar.set_value(0)
assert progressbar._get_label() == "0 / 0 - 100 %"
assert progressbar.progressbar.value() == progressbar.progressbar.maximum()
def test_progressbar_uses_static_stylesheet_with_palette_state_color(progressbar):
progressbar.progressbar.resize(100, 20)
progressbar.set_value(50)
progressbar.state = ProgressState.PAUSED
style_sheet = progressbar.progressbar.styleSheet()
assert "QProgressBar::chunk" in style_sheet
assert (
f"background-color: {progressbar._state_colors[ProgressState.PAUSED].name()};"
in style_sheet
)
assert "background-color: palette(mid);" in style_sheet
assert "border-radius: 7px;" in style_sheet
assert (
progressbar.progressbar.palette().color(QPalette.ColorRole.Highlight)
== progressbar._state_colors[ProgressState.PAUSED]
)
def test_progressbar_value_updates_do_not_rebuild_stylesheet_within_same_chunk_mode(progressbar):
progressbar.progressbar.resize(100, 20)
progressbar.set_value(30)
with mock.patch.object(
progressbar, "_setup_style_sheet", wraps=progressbar._setup_style_sheet
) as setup_style_sheet:
progressbar.set_value(35)
progressbar.set_value(42)
progressbar.set_value(50)
setup_style_sheet.assert_not_called()
def test_progressbar_value_updates_skip_chunk_radius_after_target_reached(progressbar):
progressbar.progressbar.resize(100, 20)
progressbar.set_value(30)
assert progressbar._chunk_radius == progressbar._target_chunk_radius()
with mock.patch.object(
progressbar, "_update_chunk_radius", wraps=progressbar._update_chunk_radius
) as update_chunk_radius:
progressbar.set_value(35)
progressbar.set_value(42)
progressbar.set_value(50)
update_chunk_radius.assert_not_called()
def test_progressbar_repeated_same_maximum_does_not_reset_chunk_radius(progressbar):
progressbar.progressbar.resize(100, 20)
progressbar.set_maximum(100)
progressbar.set_value(30)
assert progressbar._chunk_radius == progressbar._target_chunk_radius()
with mock.patch.object(
progressbar, "_update_chunk_radius", wraps=progressbar._update_chunk_radius
) as update_chunk_radius:
progressbar.set_maximum(100)
progressbar.set_value(40)
update_chunk_radius.assert_not_called()
def test_progressbar_can_disable_dynamic_stylesheet(static_progressbar):
static_progressbar.progressbar.resize(100, 20)
assert static_progressbar.enable_dynamic_stylesheet is False
assert static_progressbar._chunk_radius == static_progressbar._target_chunk_radius()
with mock.patch.object(
static_progressbar, "_setup_style_sheet", wraps=static_progressbar._setup_style_sheet
) as setup_style_sheet:
static_progressbar.set_value(1)
static_progressbar.set_value(2)
static_progressbar.set_value(3)
setup_style_sheet.assert_not_called()
assert "border-radius: 7px;" in static_progressbar.progressbar.styleSheet()
def test_progressbar_dynamic_stylesheet_can_be_toggled(progressbar):
progressbar.enable_dynamic_stylesheet = False
assert progressbar.enable_dynamic_stylesheet is False
assert progressbar._chunk_radius == progressbar._target_chunk_radius()
assert "border-radius: 7px;" in progressbar.progressbar.styleSheet()
def test_progressbar_rebuilds_stylesheet_until_chunk_radius_reaches_target(progressbar):
progressbar.progressbar.resize(100, 20)
progressbar.set_value(9)
with mock.patch.object(
progressbar, "_setup_style_sheet", wraps=progressbar._setup_style_sheet
) as setup_style_sheet:
progressbar.set_value(12)
progressbar.set_value(25)
progressbar.set_value(30)
assert setup_style_sheet.call_count == 2
assert "border-radius: 7px;" in progressbar.progressbar.styleSheet()
def test_progressbar_resets_chunk_radius_when_value_goes_backwards(progressbar):
progressbar.progressbar.resize(100, 20)
progressbar.set_value(30)
assert "border-radius: 7px;" in progressbar.progressbar.styleSheet()
progressbar.set_value(4)
assert "border-radius: 2px;" in progressbar.progressbar.styleSheet()
def test_progressbar_state_setter(progressbar):
"""Setting .state reflects internally."""
progressbar.state = ProgressState.PAUSED
assert progressbar.state is ProgressState.PAUSED
def test_progressbar_warning_state_has_own_color_and_persists_on_value_update(progressbar):
assert (
progressbar._state_colors[ProgressState.PAUSED]
!= progressbar._state_colors[ProgressState.WARNING]
)
assert (
progressbar._state_colors[ProgressState.WARNING]
!= progressbar._state_colors[ProgressState.INTERRUPTED]
)
progressbar.state = ProgressState.WARNING
progressbar.set_value(50)
assert progressbar.state is ProgressState.WARNING
assert (
progressbar.progressbar.palette().color(QPalette.ColorRole.Highlight)
== progressbar._state_colors[ProgressState.WARNING]
)
def test_progressbar_warning_state_has_own_color_and_persists_on_value_update(progressbar):
assert (
progressbar._state_colors[ProgressState.PAUSED]
!= progressbar._state_colors[ProgressState.WARNING]
)
assert (
progressbar._state_colors[ProgressState.WARNING]
!= progressbar._state_colors[ProgressState.INTERRUPTED]
)
progressbar.state = ProgressState.WARNING
progressbar.set_value(50)
assert progressbar.state is ProgressState.WARNING
assert (
progressbar.progressbar.palette().color(QPalette.ColorRole.Highlight)
== progressbar._state_colors[ProgressState.WARNING]
)
+108 -2
View File
@@ -1,10 +1,18 @@
import signal
import subprocess
from contextlib import contextmanager
from unittest import mock
import pytest
from bec_widgets.cli.client import BECDockArea
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
from bec_widgets.cli.client_utils import (
GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT,
OUTPUT_READER_STOP_EVENT_ATTR,
BECGuiClient,
_join_process_output_thread,
_start_plot_process,
)
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCResponseTimeoutError, rpc_timeout
@@ -262,7 +270,105 @@ def test_client_utils_delete_falls_back_to_direct_close():
def test_client_utils_gui_client_set_rpc_timeout():
gui = BECGuiClient()
assert gui._rpc_timeout == 5
assert gui._rpc_timeout == 60
gui.set_rpc_timeout(10)
assert gui._rpc_timeout == 10
def test_client_utils_kill_server_waits_for_process_before_joining_output_thread():
gui = BECGuiClient()
gui._client = mock.MagicMock()
gui._process = mock.MagicMock(pid=123, stdout=None, stderr=None)
gui._process.poll.return_value = None
order = []
gui._process.wait.side_effect = lambda timeout: order.append("wait")
gui._process_output_processing_thread = mock.MagicMock()
gui._process_output_processing_thread.join.side_effect = lambda timeout: order.append("join")
gui._process_output_processing_thread.is_alive.return_value = False
with (
mock.patch.object(gui, "_request_server_shutdown", return_value=False),
mock.patch("bec_widgets.cli.client_utils.os.getpgid", return_value=123),
mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg,
):
gui.kill_server()
killpg.assert_called_once_with(123, signal.SIGTERM)
assert order == ["wait", "join"]
assert gui._process is None
assert gui._process_output_processing_thread is None
def test_client_utils_kill_server_requests_graceful_shutdown_before_signal():
gui = BECGuiClient()
gui._client = mock.MagicMock()
process = mock.MagicMock(stdout=None, stderr=None)
process.poll.return_value = None
gui._process = process
gui._process_output_processing_thread = mock.MagicMock()
gui._process_output_processing_thread.is_alive.return_value = False
launcher = mock.MagicMock()
with (
mock.patch.object(
BECGuiClient, "launcher", new_callable=mock.PropertyMock
) as launcher_prop,
mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg,
):
launcher_prop.return_value = launcher
gui.kill_server()
launcher._run_rpc.assert_called_once_with(
"system.shutdown", wait_for_rpc_response=True, timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT
)
process.wait.assert_called_once_with(timeout=5)
killpg.assert_not_called()
assert gui._process is None
assert gui._process_output_processing_thread is None
def test_client_utils_kill_server_kills_process_group_after_timeout():
gui = BECGuiClient()
gui._client = mock.MagicMock()
process = mock.MagicMock(pid=123, stdout=None, stderr=None, args=["bec-gui-server"])
process.poll.return_value = None
process.wait.side_effect = [subprocess.TimeoutExpired(cmd="bec-gui-server", timeout=10), None]
gui._process = process
with (
mock.patch.object(gui, "_request_server_shutdown", return_value=False),
mock.patch("bec_widgets.cli.client_utils.os.getpgid", return_value=123),
mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg,
mock.patch("bec_widgets.cli.client_utils.subprocess.run") as run,
):
run.return_value.stdout = "PID PPID PGID STAT COMMAND\n123 1 123 S bec-gui-server"
gui.kill_server()
assert killpg.call_args_list == [mock.call(123, signal.SIGTERM), mock.call(123, signal.SIGKILL)]
assert process.wait.call_args_list == [mock.call(timeout=10), mock.call(timeout=10)]
run.assert_called_once_with(
["ps", "-o", "pid,ppid,pgid,stat,command", "-g", "123"],
check=False,
capture_output=True,
text=True,
timeout=2,
)
def test_join_process_output_thread_signals_reader_before_closing_streams():
process = mock.MagicMock(pid=123, args=["bec-gui-server"])
process.stdout = mock.MagicMock()
process.stderr = mock.MagicMock()
thread = mock.MagicMock()
stop_event = mock.MagicMock()
setattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, stop_event)
thread.is_alive.side_effect = [True, False]
logger = mock.MagicMock()
_join_process_output_thread(process, thread, logger)
assert thread.join.call_args_list == [mock.call(timeout=2), mock.call(timeout=2)]
stop_event.set.assert_called_once_with()
process.stdout.close.assert_called_once_with()
process.stderr.close.assert_called_once_with()
@@ -98,11 +98,15 @@ def test_waiting_display(update_dialog, qtbot):
mock_spinner_stop.assert_called_once()
def test_update_cycle(update_dialog, qtbot):
def test_update_cycle(update_dialog):
update = {"enabled": False, "readoutPriority": "baseline", "deviceTags": {"tag"}}
def _mock_send(action="update", config=None, wait_for_response=True, timeout_s=None):
update_dialog.client.device_manager.devices["test_device"]._config = config["test_device"] # type: ignore
device = update_dialog.client.device_manager.devices["test_device"]
device._config = {**device._config, **config["test_device"]} # type: ignore
update_dialog._q_threadpool = MagicMock()
update_dialog._q_threadpool.start.side_effect = lambda runnable: runnable.run()
update_dialog._config_helper.send_config_request = MagicMock(side_effect=_mock_send)
for item in update_dialog._form.enumerate_form_widgets():
@@ -111,9 +115,7 @@ def test_update_cycle(update_dialog, qtbot):
assert update_dialog.updated_config() == update
update_dialog.apply()
qtbot.waitUntil(
lambda: update_dialog._config_helper.send_config_request.call_count == 1, timeout=100
)
update_dialog._q_threadpool.start.assert_called_once()
update_dialog._config_helper.send_config_request.assert_called_with(
action="update", config={"test_device": update}, wait_for_response=False
@@ -40,7 +40,7 @@ def test_update_device_initialization_progress(progress_bar, qtbot):
assert progress_bar.progress_bar._user_value == 1
assert progress_bar.progress_bar._user_maximum == 3
assert progress_bar.progress_label.text() == f"{msg.device} initialization in progress..."
assert "1 / 3 - 33 %" == progress_bar.progress_bar.center_label.text()
assert "1 / 3 - 33 %" == progress_bar.progress_bar.progressbar.text()
# II. Update with message of finished DeviceInitializationProgressMessage, finished=True, success=True
msg.finished = True
@@ -49,7 +49,7 @@ def test_update_device_initialization_progress(progress_bar, qtbot):
assert progress_bar.progress_bar._user_value == 1
assert progress_bar.progress_bar._user_maximum == 3
assert progress_bar.progress_label.text() == f"{msg.device} initialization succeeded!"
assert "1 / 3 - 33 %" == progress_bar.progress_bar.center_label.text()
assert "1 / 3 - 33 %" == progress_bar.progress_bar.progressbar.text()
# III. Update with message of finished DeviceInitializationProgressMessage, finished=True, success=False
msg.finished = True
@@ -59,7 +59,7 @@ def test_update_device_initialization_progress(progress_bar, qtbot):
with qtbot.waitSignal(progress_bar.failed_devices_changed) as signal_blocker:
progress_bar._update_device_initialization_progress(msg.model_dump(), {})
assert progress_bar.progress_label.text() == f"{msg.device} initialization failed!"
assert "2 / 3 - 66 %" == progress_bar.progress_bar.center_label.text()
assert "2 / 3 - 66 %" == progress_bar.progress_bar.progressbar.text()
assert progress_bar.progress_bar._user_value == 2
assert progress_bar.progress_bar._user_maximum == 3
+55 -14
View File
@@ -4,7 +4,9 @@ import os
from unittest import mock
import pytest
from qtpy.QtCore import QObject
from qtpy.QtGui import QFontMetrics
from qtpy.QtWidgets import QWidget
import bec_widgets
from bec_widgets.applications.launch_window import START_EMPTY_PROFILE_OPTION, LaunchWindow
@@ -16,6 +18,28 @@ from .client_mocks import mocked_client
base_path = os.path.dirname(bec_widgets.__file__)
def _launcher_child_connection(launcher: LaunchWindow, name: str) -> QObject:
connection = QObject(parent=launcher)
connection.gui_id = f"{launcher.gui_id}:{name}"
connection.setObjectName(name)
return connection
def _top_level_connection(qtbot, name: str) -> QWidget:
connection = QWidget()
connection.gui_id = name
connection.setObjectName(name)
qtbot.addWidget(connection)
return connection
def _unparented_connection(name: str) -> QObject:
connection = QObject()
connection.gui_id = name
connection.setObjectName(name)
return connection
@pytest.fixture
def bec_launch_window(qtbot, mocked_client):
widget = LaunchWindow(client=mocked_client)
@@ -117,20 +141,20 @@ def test_open_dock_area_with_start_empty_option_calls_launch(bec_launch_window):
(["launcher", "dock_area", "scan_progress_simple", "scan_progress_full"], False),
(
["launcher", "dock_area", "scan_progress_simple", "scan_progress_full", "hover_widget"],
True,
False,
),
(["launcher", "external_window"], True),
],
)
def test_gui_server_turns_off_the_lights(bec_launch_window, connection_names, hide):
def test_gui_server_turns_off_the_lights(bec_launch_window, qtbot, connection_names, hide):
connections = {}
for name in connection_names:
conn = mock.MagicMock()
if name == "hover_widget":
conn.parent.return_value = None
conn.objectName.return_value = "HoverWidget"
conn = _unparented_connection("HoverWidget")
elif name == "external_window":
conn = _top_level_connection(qtbot, "external_window")
else:
conn.parent.return_value = mock.MagicMock()
conn.objectName.return_value = bec_launch_window.objectName()
conn = _launcher_child_connection(bec_launch_window, name)
connections[name] = conn
with (
mock.patch.object(bec_launch_window, "show") as mock_show,
@@ -153,6 +177,23 @@ def test_gui_server_turns_off_the_lights(bec_launch_window, connection_names, hi
mock_set_quit_on_last_window_closed.assert_called_once_with(True)
def test_launcher_detects_external_main_window(bec_launch_window, qtbot):
connection = _top_level_connection(qtbot, "BECMainWindowNoRPC")
assert bec_launch_window._has_external_window({"window": connection})
def test_launcher_logs_unparented_non_window_connection_once(bec_launch_window):
connection = _unparented_connection("HoverWidget")
with mock.patch("bec_widgets.applications.launch_window.logger.warning") as mock_warning:
bec_launch_window._turn_off_the_lights({"window": connection})
bec_launch_window._turn_off_the_lights({"window": connection})
mock_warning.assert_called_once()
assert "HoverWidget" in mock_warning.call_args.args[0]
@pytest.mark.parametrize(
"connection_names, close_called",
[
@@ -163,11 +204,12 @@ def test_gui_server_turns_off_the_lights(bec_launch_window, connection_names, hi
(["launcher", "dock_area", "scan_progress_simple", "scan_progress_full"], True),
(
["launcher", "dock_area", "scan_progress_simple", "scan_progress_full", "hover_widget"],
False,
True,
),
(["launcher", "external_window"], False),
],
)
def test_launch_window_closes(bec_launch_window, connection_names, close_called):
def test_launch_window_closes(bec_launch_window, qtbot, connection_names, close_called):
"""
Test that the close event is handled correctly based on the connections.
If there are no connections or only the launcher connection, the window should close.
@@ -175,13 +217,12 @@ def test_launch_window_closes(bec_launch_window, connection_names, close_called)
"""
connections = {}
for name in connection_names:
conn = mock.MagicMock()
if name == "hover_widget":
conn.parent.return_value = None
conn.objectName.return_value = "HoverWidget"
conn = _unparented_connection("HoverWidget")
elif name == "external_window":
conn = _top_level_connection(qtbot, "external_window")
else:
conn.parent.return_value = mock.MagicMock()
conn.objectName.return_value = bec_launch_window.objectName()
conn = _launcher_child_connection(bec_launch_window, name)
connections[name] = conn
close_event = mock.MagicMock()
with mock.patch.object(
+133
View File
@@ -0,0 +1,133 @@
from unittest import mock
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.widgets.progress.progress_backend import BECProgressTracker
def _dispatcher():
dispatcher = mock.MagicMock()
return dispatcher
def test_tracker_subscribes_to_scan_progress_immediately():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
tracker.start()
assert dispatcher.connect_slot.call_args_list == [
mock.call(tracker.process_progress_message, MessageEndpoints.scan_progress()),
mock.call(tracker.process_scan_status_message, MessageEndpoints.scan_status()),
]
tracker.cleanup()
def test_tracker_starts_scan_from_scan_progress_metadata():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
snapshots = []
tracker.progress_updated.connect(snapshots.append)
tracker.start()
tracker.process_progress_message(
{"value": 3, "max_value": 10},
{"scan_id": "scan-2", "RID": "rid-2", "scan_number": 2, "status": "open"},
)
assert tracker.task is not None
assert tracker._active_scan_id == "scan-2"
assert tracker._active_rid == "rid-2"
assert tracker.scan_number == 2
assert snapshots[-1].scan_number == 2
tracker.cleanup()
def test_tracker_switches_sources_idempotently():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
tracker.start()
tracker.start()
assert dispatcher.connect_slot.call_count == 2
assert dispatcher.disconnect_slot.call_count == 0
tracker.cleanup()
def test_tracker_resets_progress_on_new_open_scan_status():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
snapshots = []
tracker.progress_updated.connect(snapshots.append)
tracker.start()
snapshot = tracker.process_scan_status_message(
{"scan_id": "scan-1", "scan_number": 7, "status": "open"}, {}
)
assert snapshot is not None
assert snapshot.value == 0
assert snapshot.max_value == 100
assert snapshot.status == "open"
assert snapshot.scan_id == "scan-1"
assert snapshot.scan_number == 7
assert snapshot.is_new_scan is True
assert tracker.task is None
assert tracker.scan_number == 7
assert snapshots[-1] == snapshot
tracker.cleanup()
def test_tracker_ignores_duplicate_open_scan_status():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
snapshots = []
tracker.progress_updated.connect(snapshots.append)
tracker.start()
tracker.process_scan_status_message({"scan_id": "scan-1", "status": "open"}, {})
tracker.process_scan_status_message({"scan_id": "scan-1", "status": "open"}, {})
assert len(snapshots) == 1
tracker.cleanup()
def test_tracker_marks_new_scan_only_when_rid_changes():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
snapshots = []
tracker.progress_updated.connect(snapshots.append)
tracker.start()
tracker.process_progress_message({"value": 10, "max_value": 100}, {"RID": "rid-1"})
tracker.process_progress_message({"value": 20, "max_value": 200}, {"RID": "rid-1"})
tracker.process_progress_message({"value": 5, "max_value": 50}, {"RID": "rid-2"})
assert [snapshot.is_new_scan for snapshot in snapshots] == [True, False, True]
assert tracker._active_rid == "rid-2"
tracker.cleanup()
def test_tracker_keeps_partial_value_for_done_scan_progress():
dispatcher = _dispatcher()
tracker = BECProgressTracker(dispatcher)
snapshots = []
tracker.progress_updated.connect(snapshots.append)
tracker.start()
tracker.process_progress_message(
{"value": 4, "max_value": 10, "done": True},
{"scan_id": "scan-1", "RID": "rid-1", "status": "aborted"},
)
assert snapshots[-1].value == 4
assert snapshots[-1].max_value == 10
assert snapshots[-1].done is True
assert tracker.task is None
tracker.cleanup()
+146 -26
View File
@@ -1,8 +1,9 @@
# pylint: disable=missing-function-docstring, missing-module-docstring
from unittest.mock import MagicMock
from unittest.mock import MagicMock, call
import pytest
from bec_lib.endpoints import MessageEndpoints
from qtpy.QtGui import QColor
from bec_widgets.tests.utils import FakeDevice
@@ -76,11 +77,14 @@ def test_set_update_to_scan(ring_widget):
ring_widget.set_update("scan")
assert ring_widget.config.mode == "scan"
# Verify that connect_slot was called
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_scan_progress
assert "scan_progress" in str(call_args[0][1])
assert ring_widget.bec_dispatcher.connect_slot.call_args_list == [
call(
ring_widget.progress_tracker.process_progress_message, MessageEndpoints.scan_progress()
),
call(
ring_widget.progress_tracker.process_scan_status_message, MessageEndpoints.scan_status()
),
]
def test_set_update_from_scan_to_manual(ring_widget):
@@ -97,6 +101,14 @@ def test_set_update_from_scan_to_manual(ring_widget):
assert ring_widget.config.mode == "manual"
assert ring_widget.registered_slot is None
assert ring_widget.bec_dispatcher.disconnect_slot.call_args_list == [
call(
ring_widget.progress_tracker.process_progress_message, MessageEndpoints.scan_progress()
),
call(
ring_widget.progress_tracker.process_scan_status_message, MessageEndpoints.scan_status()
),
]
def test_set_update_to_device(ring_widget_with_device):
@@ -420,7 +432,7 @@ def test_set_direction_counter_clockwise(ring_widget):
###################################
def test_update_device_connection_with_progress_signal(ring_widget_with_device):
def test_update_device_connection_prefers_progress_signal(ring_widget_with_device):
ring_widget = ring_widget_with_device
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
samx._info["signals"]["progress"] = {
@@ -432,15 +444,114 @@ def test_update_device_connection_with_progress_signal(ring_widget_with_device):
ring_widget.bec_dispatcher.connect_slot = MagicMock()
ring_widget._update_device_connection("samx", "progress")
signal = ring_widget._update_device_connection("samx", "")
# Should connect to device_progress endpoint
assert signal == "progress"
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_device_progress
assert call_args[0][1] == MessageEndpoints.device_progress("samx")
def test_update_device_connection_with_hinted_signal(ring_widget):
def test_update_device_connection_accepts_explicit_progress_signal(ring_widget_with_device):
ring_widget = ring_widget_with_device
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
samx._info["signals"]["progress"] = {
"obj_name": "samx_progress",
"component_name": "progress",
"signal_class": "ProgressSignal",
"kind_str": "hinted",
}
ring_widget.bec_dispatcher.connect_slot = MagicMock()
signal = ring_widget._update_device_connection("samx", "progress")
assert signal == "progress"
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_device_progress
assert call_args[0][1] == MessageEndpoints.device_progress("samx")
def test_update_device_connection_resolves_component_name_to_readback_signal(
ring_widget_with_device,
):
ring_widget = ring_widget_with_device
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
samx._info["signals"]["setpoint"] = {
"obj_name": "samx_setpoint",
"component_name": "setpoint",
"signal_class": "Signal",
"kind_str": "normal",
}
ring_widget.bec_dispatcher.connect_slot = MagicMock()
signal = ring_widget._update_device_connection("samx", "setpoint")
assert signal == "setpoint"
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_device_readback
assert call_args[0][1] == MessageEndpoints.device_readback("samx")
def test_update_device_connection_falls_back_to_hinted_signal(ring_widget_with_device):
ring_widget = ring_widget_with_device
ring_widget.bec_dispatcher.connect_slot = MagicMock()
signal = ring_widget._update_device_connection("samx", "")
assert signal == "samx"
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_device_readback
assert call_args[0][1] == MessageEndpoints.device_readback("samx")
def test_update_device_connection_falls_back_to_normal_signal(ring_widget_with_device):
ring_widget = ring_widget_with_device
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
samx._info["signals"] = {
"setpoint": {
"obj_name": "samx_setpoint",
"component_name": "setpoint",
"signal_class": "Signal",
"kind_str": "normal",
}
}
ring_widget.bec_dispatcher.connect_slot = MagicMock()
signal = ring_widget._update_device_connection("samx", "")
assert signal == "setpoint"
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_device_readback
assert call_args[0][1] == MessageEndpoints.device_readback("samx")
def test_update_device_connection_rejects_unusable_signal(ring_widget_with_device):
ring_widget = ring_widget_with_device
samx = ring_widget.bec_dispatcher.client.device_manager.devices.samx
samx._info["signals"]["async_signal"] = {
"obj_name": "samx_async",
"component_name": "async_signal",
"signal_class": "AsyncSignal",
"kind_str": "hinted",
}
ring_widget.bec_dispatcher.connect_slot = MagicMock()
with pytest.raises(ValueError, match="not usable for ring progress device mode"):
ring_widget._update_device_connection("samx", "samx_async")
ring_widget.bec_dispatcher.connect_slot.assert_not_called()
def test_update_device_connection_accepts_explicit_hinted_signal(ring_widget):
mock_device = FakeDevice(name="samx")
mock_device._info = {
"signals": {
@@ -452,12 +563,13 @@ def test_update_device_connection_with_hinted_signal(ring_widget):
ring_widget.bec_dispatcher.connect_slot = MagicMock()
ring_widget._update_device_connection("samx", "samx")
signal = ring_widget._update_device_connection("samx", "samx")
# Should connect to device_readback endpoint
assert signal == "samx"
ring_widget.bec_dispatcher.connect_slot.assert_called_once()
call_args = ring_widget.bec_dispatcher.connect_slot.call_args
assert call_args[0][0] == ring_widget.on_device_readback
assert call_args[0][1] == MessageEndpoints.device_readback("samx")
def test_update_device_connection_no_device_manager(ring_widget):
@@ -472,44 +584,52 @@ def test_update_device_connection_device_not_found(ring_widget):
mock_device = FakeDevice(name="samx")
ring_widget.bec_dispatcher.client.device_manager.devices["samx"] = mock_device
# Should return without raising an error
ring_widget._update_device_connection("nonexistent", "signal")
assert ring_widget._update_device_connection("nonexistent", "signal") == ""
###################################
# on_scan_progress tests
# scan progress tests
###################################
def test_on_scan_progress_updates_value(ring_widget):
def test_scan_progress_updates_value(ring_widget):
msg = {"value": 42, "max_value": 100}
meta = {"RID": "test_rid_123"}
ring_widget.on_scan_progress(msg, meta)
ring_widget.progress_tracker.process_progress_message(msg, meta)
assert ring_widget.config.value == 42
def test_on_scan_progress_updates_min_max_on_new_rid(ring_widget):
def test_scan_status_open_resets_scan_progress_value(ring_widget):
ring_widget.set_min_max_values(0, 200)
ring_widget.set_value(80)
ring_widget.progress_tracker.process_scan_status_message(
{"scan_id": "scan-1", "scan_number": 7, "status": "open"}, {}
)
assert ring_widget.config.min_value == 0
assert ring_widget.config.max_value == 100
assert ring_widget.config.value == 0
def test_scan_progress_updates_min_max_on_new_rid(ring_widget):
msg = {"value": 50, "max_value": 200}
meta = {"RID": "new_rid"}
ring_widget.RID = "old_rid"
ring_widget.on_scan_progress(msg, meta)
ring_widget.progress_tracker.process_progress_message(msg, meta)
assert ring_widget.config.min_value == 0
assert ring_widget.config.max_value == 200
assert ring_widget.config.value == 50
def test_on_scan_progress_same_rid_no_min_max_update(ring_widget):
msg = {"value": 75, "max_value": 300}
def test_scan_progress_same_rid_no_min_max_update(ring_widget):
meta = {"RID": "same_rid"}
ring_widget.RID = "same_rid"
ring_widget.set_min_max_values(0, 100)
ring_widget.on_scan_progress(msg, meta)
ring_widget.progress_tracker.process_progress_message({"value": 10, "max_value": 100}, meta)
ring_widget.progress_tracker.process_progress_message({"value": 75, "max_value": 300}, meta)
# Max value should not be updated when RID is the same
assert ring_widget.config.max_value == 100
+32
View File
@@ -3,10 +3,12 @@ from unittest.mock import MagicMock
import pytest
from bec_lib.device import DeviceBaseWithConfig, Signal
from bec_widgets.cli.rpc import rpc_base as rpc_base_module
from bec_widgets.cli.rpc.rpc_base import (
DeletedWidgetError,
RPCBase,
RPCReference,
RPCResponseTimeoutError,
_transform_args_kwargs,
)
@@ -51,3 +53,33 @@ def test_transform_args_kwargs():
)
assert args == ("full name", "short name", "string_arg", "full name")
assert kwargs == {"a": "full name", "b": "short name", "c": "string_arg", "d": "full name"}
def test_run_rpc_logs_response_timeout(monkeypatch):
rpc = RPCBase(gui_id="progress_widget", object_name="progressbar")
rpc._rpc_timeout = 0
rpc._client = MagicMock()
info_mock = MagicMock()
error_mock = MagicMock()
monkeypatch.setattr(rpc_base_module.logger, "info", info_mock)
monkeypatch.setattr(rpc_base_module.logger, "error", error_mock)
with pytest.raises(RPCResponseTimeoutError):
rpc._run_rpc("set_value", 42, precision=2, timeout=0)
publish_msg = rpc._client.connector.set_and_publish.call_args.args[1]
assert publish_msg.metadata["method"] == "set_value"
assert publish_msg.metadata["target_gui_id"] == "progress_widget"
assert publish_msg.metadata["object_name"] == "progressbar"
assert publish_msg.metadata["timeout"] == 0
assert publish_msg.metadata["deadline"] == publish_msg.metadata["sent_at"]
assert info_mock.call_count == 1
info_message = info_mock.call_args.args[0]
error_mock.assert_called_once()
error_message = error_mock.call_args.args[0]
assert "GUI RPC response timeout" in error_message
assert "method=set_value" in error_message
assert "target_gui_id=progress_widget" in error_message
assert "object_name=progressbar" in error_message
assert "timeout=0" in error_message
+138 -12
View File
@@ -1,11 +1,13 @@
import argparse
from unittest.mock import patch
from unittest.mock import MagicMock, patch
import pytest
from bec_lib.service_config import ServiceConfig
from qtpy.QtWidgets import QWidget
from bec_widgets.applications import companion_app as companion_app_module
from bec_widgets.applications.companion_app import GUIServer
from bec_widgets.utils import rpc_server as rpc_server_module
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.rpc_server import RegistryNotReadyError, RPCServer, SingleshotRPCRepeat
@@ -58,6 +60,68 @@ def test_gui_server_get_service_config(gui_server):
assert gui_server._get_service_config().config == ServiceConfig().config
def test_gui_server_signal_shutdown_closes_widgets_and_quits_app(gui_server):
widget = MagicMock()
gui_server.app = MagicMock()
gui_server.app.topLevelWidgets.return_value = [widget]
gui_server.request_shutdown()
widget.close.assert_called_once()
gui_server.app.quit.assert_called_once()
def test_gui_server_shutdown_is_idempotent(gui_server):
gui_server.launcher_window = MagicMock()
gui_server.dispatcher = MagicMock()
with (
patch.object(companion_app_module.shiboken6, "isValid", return_value=True),
patch.object(companion_app_module.pylsp_server, "is_running", return_value=False),
):
gui_server.shutdown()
gui_server.shutdown()
gui_server.launcher_window.close.assert_called_once()
gui_server.launcher_window.deleteLater.assert_called_once()
gui_server.dispatcher.stop_cli_server.assert_called_once()
gui_server.dispatcher.disconnect_all.assert_called_once()
def test_rpc_server_system_capabilities_include_shutdown(rpc_server):
assert rpc_server.run_system_rpc("system.list_capabilities", [], {}) == {
"system.launch_dock_area": True,
"system.shutdown": True,
}
def test_rpc_server_system_shutdown_requests_gui_server_shutdown(rpc_server, qapp):
gui_server = MagicMock()
qapp.gui_server = gui_server
rpc_server.run_system_rpc("system.shutdown", [], {})
qapp.processEvents()
gui_server.request_shutdown.assert_called_once()
del qapp.gui_server
def test_on_rpc_update_system_shutdown_sends_response_before_return(rpc_server):
order = []
rpc_server.run_system_rpc = MagicMock(side_effect=lambda *_args: order.append("shutdown"))
rpc_server.send_response = MagicMock(side_effect=lambda *_args: order.append("response"))
rpc_server.serialize_result_and_send = MagicMock()
rpc_server.on_rpc_update(
{"action": "system.shutdown", "parameter": {"args": [], "kwargs": {}}},
{"request_id": "shutdown-request", "sent_at": 1.0, "deadline": 10.0, "timeout": 2},
)
assert order == ["shutdown", "response"]
rpc_server.send_response.assert_called_once_with("shutdown-request", True, {"result": None})
rpc_server.serialize_result_and_send.assert_not_called()
def test_singleshot_rpc_repeat_raises_on_repeated_singleshot(rpc_server):
"""
Test that a singleshot RPC method raises an error when called multiple times.
@@ -91,22 +155,34 @@ def test_serialize_result_and_send_with_singleshot_retry(rpc_server, qtbot, dumm
# Third call succeeds
return {"gui_id": dummy.gui_id, "success": True}
warning_mock = MagicMock()
# Patch serialize_object to control when it raises RegistryNotReadyError
with patch.object(rpc_server, "serialize_object", side_effect=serialize_side_effect):
with patch.object(rpc_server, "send_response") as mock_send_response:
# Start the serialization process
rpc_server._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat()
rpc_server.serialize_result_and_send(request_id, dummy)
with patch.object(rpc_server_module.logger, "warning", warning_mock):
# Start the serialization process
rpc_server._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat()
rpc_server.serialize_result_and_send(request_id, dummy)
# Verify that serialize_object was called 3 times
qtbot.waitUntil(lambda: call_count >= 3, timeout=5000)
# Verify that serialize_object was called 3 times
qtbot.waitUntil(lambda: call_count >= 3, timeout=5000)
# Verify that send_response was called with success
mock_send_response.assert_called_once()
args = mock_send_response.call_args[0]
assert args[0] == request_id
assert args[1] is True # accepted=True
assert "result" in args[2]
# Verify that send_response was called with success
mock_send_response.assert_called_once()
args = mock_send_response.call_args[0]
assert args[0] == request_id
assert args[1] is True # accepted=True
assert "result" in args[2]
assert warning_mock.call_count == 2
warning_logs = "\n".join(call.args[0] for call in warning_mock.call_args_list)
assert "result serialization delayed; retrying" in warning_logs
assert "request_id=test_request_123" in warning_logs
assert "retry_delay_ms=100" in warning_logs
assert "accumulated_delay_ms=100" in warning_logs
assert "accumulated_delay_ms=200" in warning_logs
assert "max_delay_ms=2000" in warning_logs
def test_serialize_result_and_send_max_delay_exceeded(rpc_server, qtbot, dummy_widget):
@@ -140,6 +216,56 @@ def test_serialize_result_and_send_max_delay_exceeded(rpc_server, qtbot, dummy_w
assert "Max delay exceeded" in args[2]["error"]
def test_send_response_logs_publish_status(rpc_server, monkeypatch):
info_mock = MagicMock()
error_mock = MagicMock()
monkeypatch.setattr(rpc_server_module.logger, "info", info_mock)
monkeypatch.setattr(rpc_server_module.logger, "error", error_mock)
with patch.object(rpc_server.client.connector, "set_and_publish") as publish_mock:
rpc_server.send_response("request-ok", True, {"result": None})
rpc_server.send_response("request-failed", False, {"error": "bad"})
assert publish_mock.call_count == 2
assert "request_id=request-ok" in info_mock.call_args.args[0]
assert "accepted=True" in info_mock.call_args.args[0]
assert "request_id=request-failed" in error_mock.call_args.args[0]
assert "accepted=False" in error_mock.call_args.args[0]
def test_on_rpc_update_logs_late_client_deadline(rpc_server, monkeypatch):
info_mock = MagicMock()
warning_mock = MagicMock()
monkeypatch.setattr(rpc_server_module.logger, "info", info_mock)
monkeypatch.setattr(rpc_server_module.logger, "warning", warning_mock)
rpc_server.rpc_register.get_rpc_by_id = MagicMock()
rpc_server.run_rpc = MagicMock(return_value=None)
rpc_server.serialize_result_and_send = MagicMock()
rpc_server.on_rpc_update(
{
"action": "set_value",
"parameter": {"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"},
},
{"request_id": "late-request", "timeout": 0.1, "sent_at": 1.0, "deadline": 1.1},
)
received_log = info_mock.call_args_list[0].args[0]
executed_log = info_mock.call_args_list[1].args[0]
warning_logs = "\n".join(call.args[0] for call in warning_mock.call_args_list)
assert "GUI RPC server received request" in received_log
assert "request_id=late-request" in received_log
assert "method=set_value" in received_log
assert "target_gui_id=ring" in received_log
assert "timeout=0.1" in received_log
assert "stale_on_receive=True" in received_log
assert "response_after_client_deadline=True" in executed_log
assert "received request after client timeout deadline" in warning_logs
assert "response is late for client timeout" in warning_logs
def test_run_rpc_delegates_to_rpc_content_class(rpc_server):
class Content:
USER_ACCESS = ["foo", "mode", "mode.setter"]
+138 -266
View File
@@ -2,7 +2,6 @@ from unittest import mock
import numpy as np
import pytest
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.utils.bec_widget import BECWidget
@@ -10,11 +9,8 @@ from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import (
BECProgressBar,
ProgressState,
)
from bec_widgets.widgets.progress.scan_progressbar.scan_progressbar import (
ProgressSource,
ProgressTask,
ScanProgressBar,
)
from bec_widgets.widgets.progress.progress_backend import ProgressTask
from bec_widgets.widgets.progress.scan_progressbar.scan_progressbar import ScanProgressBar
from .client_mocks import mocked_client
@@ -27,30 +23,6 @@ def scan_progressbar(qtbot, mocked_client):
yield widget
@pytest.fixture
def scan_message():
return messages.ScanQueueMessage(
metadata={
"file_suffix": None,
"file_directory": None,
"user_metadata": {"sample_name": ""},
"RID": "94949c6e-d5f2-4f01-837e-a5d36257dd5d",
},
scan_type="line_scan",
parameter={
"args": {"samx": [-10.0, 10.0]},
"kwargs": {
"steps": 20,
"relative": False,
"exp_time": 0.1,
"burst_at_each_point": 1,
"system_config": {"file_suffix": None, "file_directory": None},
},
},
queue="primary",
)
def test_progress_task_basic():
"""percentage, remaining, and formatted time helpers behave as expected."""
task = ProgressTask(parent=None, value=50, max_value=100, done=False)
@@ -71,18 +43,52 @@ def test_progress_task_basic():
assert task.time_elapsed == "00:00:10"
def test_progress_task_elapsed_time_uses_monotonic_clock(monkeypatch):
times = iter([100.0, 102.5])
monkeypatch.setattr(
"bec_widgets.widgets.progress.progress_backend.time.monotonic", lambda: next(times)
)
task = ProgressTask(parent=None)
task.timer.stop()
task.update_elapsed_time()
assert task._elapsed_time == 2.5
assert task.time_elapsed == "00:00:02"
def test_scan_progressbar_initialization(scan_progressbar):
assert isinstance(scan_progressbar, ScanProgressBar)
assert isinstance(scan_progressbar.progressbar, BECProgressBar)
def test_scan_progressbar_passes_dynamic_stylesheet_setting(qtbot, mocked_client):
widget = ScanProgressBar(client=mocked_client, enable_dynamic_stylesheet=False)
qtbot.addWidget(widget)
assert widget.progressbar.enable_dynamic_stylesheet is False
def test_scan_progressbar_starts_from_scan_progress_before_queue_update(scan_progressbar):
scan_progressbar.progress_tracker.clear_task(emit_finished=False)
scan_progressbar.progress_tracker.process_progress_message(
{"value": 3, "max_value": 10, "done": False}, metadata={"RID": "live-rid"}
)
assert scan_progressbar.progress_tracker.task is not None
assert scan_progressbar.progress_tracker._active_scan_id == "live-rid"
assert scan_progressbar.progressbar._user_value == 3
assert scan_progressbar.progressbar._user_maximum == 10
def test_update_labels_content(scan_progressbar):
"""update_labels() reflects ProgressTask time strings on the UI."""
# fabricate a task with known timings
task = ProgressTask(parent=scan_progressbar, value=30, max_value=100, done=False)
task.timer.stop()
task._elapsed_time = 50
scan_progressbar.task = task
scan_progressbar.progress_tracker.task = task
scan_progressbar.update_labels()
@@ -90,17 +96,17 @@ def test_update_labels_content(scan_progressbar):
assert scan_progressbar.ui.remaining_time_label.text() == "00:01:57"
def test_on_progress_update(qtbot, scan_progressbar):
def test_progress_update(qtbot, scan_progressbar):
"""
on_progress_update() should forward new values to the embedded
BECProgressBar and keep ProgressTask in sync.
Scan progress updates should update the embedded BECProgressBar
and keep ProgressTask in sync.
"""
task = ProgressTask(parent=scan_progressbar, value=0, max_value=100, done=False)
task.timer.stop()
scan_progressbar.task = task
scan_progressbar.progress_tracker.task = task
msg = {"value": 20, "max_value": 100, "done": False}
scan_progressbar.on_progress_update(msg, metadata={"status": "open"})
scan_progressbar.progress_tracker.process_progress_message(msg, metadata={"status": "open"})
qtbot.wait(200)
bar = scan_progressbar.progressbar
@@ -110,14 +116,58 @@ def test_on_progress_update(qtbot, scan_progressbar):
assert bar.state is ProgressState.NORMAL
def test_scan_status_open_resets_progress_before_first_progress_update(scan_progressbar):
scan_progressbar.progressbar.set_maximum(50)
scan_progressbar.progressbar.set_value(25)
scan_progressbar.progressbar.state = ProgressState.INTERRUPTED
scan_progressbar.ui.elapsed_time_label.setText("00:00:12")
scan_progressbar.ui.remaining_time_label.setText("00:00:34")
scan_progressbar.progress_tracker.process_scan_status_message(
{"scan_id": "scan-1", "scan_number": 7, "status": "open"}, {"RID": "rid-1"}
)
assert scan_progressbar.progressbar._user_value == 0
assert scan_progressbar.progressbar._user_maximum == 100
assert scan_progressbar.progressbar.state is ProgressState.NORMAL
assert scan_progressbar.ui.elapsed_time_label.text() == "00:00:00"
assert scan_progressbar.ui.remaining_time_label.text() == "00:00:00"
assert scan_progressbar.ui.source_label.text() == "Scan 7"
def test_scan_status_open_reset_ignores_same_scan(scan_progressbar):
scan_progressbar.progress_tracker.process_scan_status_message(
{"scan_id": "scan-1", "status": "open"}, {}
)
scan_progressbar.progressbar.set_value(20)
scan_progressbar.progress_tracker.process_scan_status_message(
{"scan_id": "scan-1", "status": "open"}, {}
)
assert scan_progressbar.progressbar._user_value == 20
def test_scan_status_non_open_does_not_reset_progress(scan_progressbar):
scan_progressbar.progressbar.set_value(25)
scan_progressbar.progress_tracker.process_scan_status_message(
{"scan_id": "scan-1", "status": "closed"}, {}
)
assert scan_progressbar.progressbar._user_value == 25
@pytest.mark.parametrize(
"status, value, max_val, expected_state",
[
("open", 10, 100, ProgressState.NORMAL),
("paused", 25, 100, ProgressState.PAUSED),
("aborted", 30, 100, ProgressState.INTERRUPTED),
("halted", 40, 100, ProgressState.PAUSED),
("aborted", 30, 100, ProgressState.WARNING),
("halted", 40, 100, ProgressState.INTERRUPTED),
("closed", 100, 100, ProgressState.COMPLETED),
("user_completed", 40, 100, ProgressState.COMPLETED),
("UNKNOWN", 10, 100, ProgressState.NORMAL),
],
)
def test_state_mapping_during_updates(
@@ -126,9 +176,9 @@ def test_state_mapping_during_updates(
"""ScanProgressBar should translate BEC status → ProgressState consistently."""
task = ProgressTask(parent=scan_progressbar, value=0, max_value=max_val, done=False)
task.timer.stop()
scan_progressbar.task = task
scan_progressbar.progress_tracker.task = task
scan_progressbar.on_progress_update(
scan_progressbar.progress_tracker.process_progress_message(
{"value": value, "max_value": max_val, "done": status == "closed"},
metadata={"status": status},
)
@@ -136,97 +186,39 @@ def test_state_mapping_during_updates(
assert scan_progressbar.progressbar.state is expected_state
def test_source_label_updates(scan_progressbar):
"""update_source_label() renders correct text for both progress sources."""
# device progress
scan_progressbar.update_source_label(ProgressSource.DEVICE_PROGRESS, device="motor")
assert scan_progressbar.ui.source_label.text() == "Device motor"
def test_aborted_done_scan_keeps_partial_progress(scan_progressbar):
scan_progressbar.progress_tracker.process_progress_message(
{"value": 4, "max_value": 10, "done": True},
metadata={"scan_id": "scan-1", "RID": "rid-1", "status": "aborted"},
)
# scan progress (needs a scan_number for deterministic text)
scan_progressbar.scan_number = 5
scan_progressbar.update_source_label(ProgressSource.SCAN_PROGRESS)
assert scan_progressbar.progressbar._user_value == 4
assert scan_progressbar.progressbar._user_maximum == 10
assert scan_progressbar.progressbar.state is ProgressState.WARNING
assert scan_progressbar.progress_tracker.task is None
def test_source_label_updates(scan_progressbar):
"""update_source_label() renders the current scan label."""
scan_progressbar.progress_tracker.scan_number = 5
scan_progressbar.update_source_label()
assert scan_progressbar.ui.source_label.text() == "Scan 5"
def test_set_progress_source_connections(scan_progressbar, monkeypatch):
""" """
def test_source_label_update_logs_only_on_text_change(scan_progressbar):
scan_progressbar.progress_tracker.scan_number = 5
connect_calls = []
disconnect_calls = []
with mock.patch(
"bec_widgets.widgets.progress.scan_progressbar.scan_progressbar.logger.info"
) as mock_info:
scan_progressbar.update_source_label()
scan_progressbar.update_source_label()
scan_progressbar.update_source_label()
def fake_connect(slot, endpoint):
connect_calls.append(endpoint)
def fake_disconnect(slot, endpoint):
disconnect_calls.append(endpoint)
# Patch dispatcher methods
monkeypatch.setattr(scan_progressbar.bec_dispatcher, "connect_slot", fake_connect)
monkeypatch.setattr(scan_progressbar.bec_dispatcher, "disconnect_slot", fake_disconnect)
# switch to SCAN_PROGRESS
scan_progressbar.scan_number = 7
scan_progressbar.set_progress_source(ProgressSource.SCAN_PROGRESS)
assert scan_progressbar._progress_source == ProgressSource.SCAN_PROGRESS
assert scan_progressbar.ui.source_label.text() == "Scan 7"
assert connect_calls[-1] == MessageEndpoints.scan_progress()
assert disconnect_calls == []
# switch to DEVICE_PROGRESS
device = "motor"
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device)
assert scan_progressbar._progress_source == ProgressSource.DEVICE_PROGRESS
assert scan_progressbar.ui.source_label.text() == f"Device {device}"
assert connect_calls[-1] == MessageEndpoints.device_progress(device=device)
assert disconnect_calls == [MessageEndpoints.scan_progress()]
# calling again with the SAME source should not add new connect calls
prev_connect_count = len(connect_calls)
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device=device)
assert len(connect_calls) == prev_connect_count, "No extra connect made for same source"
mock_info.assert_called_once_with("Set progress source to Scan 5")
def test_set_progress_source_disconnects_previous_device_subscription(
scan_progressbar, monkeypatch
):
disconnect_calls = []
monkeypatch.setattr(scan_progressbar.bec_dispatcher, "connect_slot", lambda *args: None)
monkeypatch.setattr(
scan_progressbar.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint: disconnect_calls.append(endpoint),
)
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1")
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor2")
assert disconnect_calls == [MessageEndpoints.device_progress(device="motor1")]
def test_set_progress_source_disconnects_device_when_switching_to_scan(
scan_progressbar, monkeypatch
):
disconnect_calls = []
monkeypatch.setattr(scan_progressbar.bec_dispatcher, "connect_slot", lambda *args: None)
monkeypatch.setattr(
scan_progressbar.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint: disconnect_calls.append(endpoint),
)
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1")
scan_progressbar.set_progress_source(ProgressSource.SCAN_PROGRESS)
assert disconnect_calls == [MessageEndpoints.device_progress(device="motor1")]
def test_cleanup_disconnects_active_device_subscription(scan_progressbar, monkeypatch):
def test_cleanup_disconnects_active_scan_subscription(scan_progressbar, monkeypatch):
disconnect_calls = []
@@ -240,148 +232,28 @@ def test_cleanup_disconnects_active_device_subscription(scan_progressbar, monkey
monkeypatch.setattr(scan_progressbar.progressbar, "deleteLater", lambda: None)
monkeypatch.setattr(BECWidget, "cleanup", lambda self: None)
scan_progressbar.set_progress_source(ProgressSource.DEVICE_PROGRESS, device="motor1")
with (
mock.patch.object(scan_progressbar, "close", wraps=scan_progressbar.close) as close_mock,
mock.patch.object(
scan_progressbar, "deleteLater", wraps=scan_progressbar.deleteLater
) as delete_later_mock,
):
ScanProgressBar.cleanup(scan_progressbar)
assert disconnect_calls == [MessageEndpoints.scan_progress(), MessageEndpoints.scan_status()]
assert scan_progressbar.progress_tracker._connected is False
close_mock.assert_not_called()
delete_later_mock.assert_not_called()
def test_cleanup_stops_active_task(scan_progressbar, monkeypatch):
monkeypatch.setattr(BECWidget, "cleanup", lambda self: None)
scan_progressbar.progress_tracker.task = ProgressTask(parent=scan_progressbar)
scan_progressbar.progress_tracker._active_scan_id = "scan-1"
timer = scan_progressbar.progress_tracker.task.timer
ScanProgressBar.cleanup(scan_progressbar)
assert disconnect_calls == [
MessageEndpoints.device_progress(device="motor1"),
MessageEndpoints.scan_queue_status(),
]
assert scan_progressbar._progress_source is None
assert scan_progressbar._progress_device is None
def test_progressbar_queue_update(scan_progressbar):
"""
Test that an empty queue update does not change the progress source.
"""
msg = messages.ScanQueueStatusMessage(
queue={"primary": messages.ScanQueueStatus(info=[], status="RUNNING")}
)
with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source:
scan_progressbar.on_queue_update(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
mock_set_source.assert_not_called()
def test_progressbar_queue_update_with_scan(scan_progressbar, scan_message):
"""
Test that a queue update with a scan changes the progress source to SCAN_PROGRESS.
"""
request_block = messages.RequestBlock(
msg=scan_message,
RID="some-rid",
scan_motors=["samx"],
readout_priority={"monitored": ["samx"]},
is_scan=True,
scan_number=1,
scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9",
report_instructions=[{"scan_progress": 20}],
)
msg = messages.ScanQueueStatusMessage(
metadata={},
queue={
"primary": messages.ScanQueueStatus(
info=[
messages.QueueInfoEntry(
queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"],
status="RUNNING",
active_request_block=request_block,
is_scan=[True],
request_blocks=[request_block],
scan_number=[1],
)
],
status="RUNNING",
)
},
)
with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source:
scan_progressbar.on_queue_update(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
mock_set_source.assert_called_once_with(ProgressSource.SCAN_PROGRESS)
def test_progressbar_queue_update_with_device(scan_progressbar, scan_message):
"""
Test that a queue update with a device changes the progress source to DEVICE_PROGRESS.
"""
request_block = messages.RequestBlock(
msg=scan_message,
RID="some-rid",
scan_motors=["samx"],
readout_priority={"monitored": ["samx"]},
is_scan=True,
scan_number=1,
scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9",
report_instructions=[{"device_progress": ["samx"]}],
)
msg = messages.ScanQueueStatusMessage(
metadata={},
queue={
"primary": messages.ScanQueueStatus(
info=[
messages.QueueInfoEntry(
queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"],
status="RUNNING",
active_request_block=request_block,
is_scan=[True],
request_blocks=[request_block],
scan_number=[1],
)
],
status="RUNNING",
)
},
)
with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source:
scan_progressbar.on_queue_update(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
mock_set_source.assert_called_once_with(ProgressSource.DEVICE_PROGRESS, device="samx")
def test_progressbar_queue_update_with_no_scan_or_device(scan_progressbar, scan_message):
"""
Test that a queue update with neither scan nor device does not change the progress source.
"""
request_block = messages.RequestBlock(
msg=scan_message,
RID="some-rid",
scan_motors=["samx"],
readout_priority={"monitored": ["samx"]},
is_scan=True,
scan_number=1,
scan_id="e3f50794-852c-4bb1-965e-41c585ab0aa9",
)
msg = messages.ScanQueueStatusMessage(
metadata={},
queue={
"primary": messages.ScanQueueStatus(
info=[
messages.QueueInfoEntry(
queue_id="40831e2c-fbd1-4432-8072-ad168a7ad964",
scan_id=["e3f50794-852c-4bb1-965e-41c585ab0aa9"],
status="RUNNING",
active_request_block=request_block,
is_scan=[True],
request_blocks=[request_block],
scan_number=[1],
)
],
status="RUNNING",
)
},
)
with mock.patch.object(scan_progressbar, "set_progress_source") as mock_set_source:
scan_progressbar.on_queue_update(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
mock_set_source.assert_not_called()
assert not timer.isActive()
assert scan_progressbar.progress_tracker.task is None
assert scan_progressbar.progress_tracker._active_scan_id is None