Compare commits

..

1 Commits

Author SHA1 Message Date
David Perl 0f4b307fc6 wip: interlock gui 2026-06-01 14:29:20 +02:00
30 changed files with 360 additions and 1093 deletions
-63
View File
@@ -1,69 +1,6 @@
# CHANGELOG
## v3.13.5 (2026-06-02)
### Bug Fixes
- Change prints into proper logs
([`c8275fc`](https://github.com/bec-project/bec_widgets/commit/c8275fcfd5c920393df3aa201c32a632ac8086a5))
- **abort_button**: From __future__ import annotations
([`3796984`](https://github.com/bec-project/bec_widgets/commit/37969841822c8c38c23a1d8fca8e38aec684957b))
- **client_utils**: Increase default rpc timeout to 60s
([`07515d2`](https://github.com/bec-project/bec_widgets/commit/07515d24be6e930b1b40170fc710255914cb7454))
- **client_utils**: Stop output reader thread on shutdown
([`4572760`](https://github.com/bec-project/bec_widgets/commit/4572760b56ca2ab6435db3a6a4ba0d270e9008d1))
- **companion_app**: Disable logging of bec_lib.scan_items on widget side
([`bd66afb`](https://github.com/bec-project/bec_widgets/commit/bd66afb98dcb76ca87b0db1334df3c1af0a9dbad))
- **forms**: Gridlayout applied to widget which already has layout
([`1427c70`](https://github.com/bec-project/bec_widgets/commit/1427c70cfb6f84bbced7f72ec5cfa55ac0b9b742))
- **launch_window**: Exclude launcher check for non-parented widgets for BECMainWindow
([`ed68eb5`](https://github.com/bec-project/bec_widgets/commit/ed68eb5ac6b20cfc7ca2c0b91864dc54fb579499))
- **launcher**: Avoid orphan widgets detection and logging
([`9f94ca7`](https://github.com/bec-project/bec_widgets/commit/9f94ca7748d73a30622ecbaef384f4bc73a3d2fb))
- **logging**: Removed args/kwargs from logging messages
([`2fb7fb2`](https://github.com/bec-project/bec_widgets/commit/2fb7fb2ff487863c3bc931498496da74b25e52d8))
- **rpc**: Additional logs
([`e41e609`](https://github.com/bec-project/bec_widgets/commit/e41e60956b54890b70b3390b981196c9477abd93))
- **rpc**: Client/server rpc handshake for shutdown
([`8a180ea`](https://github.com/bec-project/bec_widgets/commit/8a180eaa7be5c1603d893cf3b50585f88f9b0c83))
- **rpc**: Log dispatcher receipt before qt callback
([`878745b`](https://github.com/bec-project/bec_widgets/commit/878745b99ac1e22c0fbddecc294e599469a2adfe))
- **rpc**: More robust shutdown section with PID logging
([`e42a982`](https://github.com/bec-project/bec_widgets/commit/e42a9824ccd54b71a3141aaf2aa4e02af6a13782))
- **rpc_server**: Log warning if rpc call is repeated
([`859563a`](https://github.com/bec-project/bec_widgets/commit/859563abb3e94ff55886e72db3177522900a89b8))
### Refactoring
- **client_utils**: Simplify PID fetching
([`154ae60`](https://github.com/bec-project/bec_widgets/commit/154ae6026a6471b7c1db42f7c2ff3dc7be4b4afb))
- **rpc**: Share logging helpers
([`8e1e282`](https://github.com/bec-project/bec_widgets/commit/8e1e282fac22ab6f726049758306c7ca17af70eb))
## v3.13.4 (2026-05-29)
### Bug Fixes
- **positioner_box**: Fix STOP button
([`9a58dba`](https://github.com/bec-project/bec_widgets/commit/9a58dba414d9eec32fd7de7fc64c97c38f020b84))
## v3.13.3 (2026-05-22)
### Bug Fixes
+20 -65
View File
@@ -5,7 +5,6 @@ import json
import os
import signal
import sys
import traceback
from contextlib import redirect_stderr, redirect_stdout
import darkdetect
@@ -64,7 +63,6 @@ class GUIServer:
self.app: QApplication | None = None
self.launcher_window: LaunchWindow | None = None
self.dispatcher: BECDispatcher | None = None
self._shutdown_started = False
def start(self):
"""
@@ -76,7 +74,6 @@ class GUIServer:
bec_logger._stderr_log_level = bec_logger.LOGLEVEL.ERROR
bec_logger._update_sinks()
bec_logger.disabled_modules = ["bec_lib.scan_items"]
with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)): # type: ignore
with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)): # type: ignore
self._run()
@@ -125,8 +122,17 @@ class GUIServer:
self.app.aboutToQuit.connect(self.shutdown)
self.app.setQuitOnLastWindowClosed(True)
signal.signal(signal.SIGINT, self.request_shutdown)
signal.signal(signal.SIGTERM, self.request_shutdown)
def sigint_handler(*args):
# display message, for people to let it terminate gracefully
print("Caught SIGINT, exiting")
# Widgets should be all closed.
with RPCRegister.delayed_broadcast():
for widget in QApplication.instance().topLevelWidgets(): # type: ignore
widget.close()
self.shutdown()
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
sys.exit(self.app.exec())
@@ -143,67 +149,16 @@ class GUIServer:
)
self.app.setWindowIcon(icon)
def request_shutdown(self, signum=None, _frame=None):
"""
Request Qt application shutdown from an RPC call or OS signal.
Cleanup itself is handled by ``shutdown()``, which is connected to
``QApplication.aboutToQuit``. Calling it directly here would run BEC/RPC
teardown before Qt has processed the widget close events.
"""
signal_name = signal.Signals(signum).name if signum is not None else "shutdown"
pid = os.getpid()
if self.app is None:
logger.info(f"Caught {signal_name}, shutting down GUI server pid={pid} without app")
self.shutdown()
return
widgets = [
f"{widget.__class__.__name__}(objectName={widget.objectName()!r})"
for widget in self.app.topLevelWidgets()
]
logger.info(
f"Caught {signal_name}, requesting GUI server shutdown pid={pid} "
f"top_level_widgets={widgets}"
)
with RPCRegister.delayed_broadcast():
for widget in self.app.topLevelWidgets():
widget.close()
self.app.quit()
@staticmethod
def _run_shutdown_step(step: str, callback):
try:
callback()
except Exception as exc:
logger.error(
f"GUIServer shutdown step failed pid={os.getpid()} step={step}: {exc}\n"
f"{traceback.format_exc()}"
)
def shutdown(self):
if self._shutdown_started:
return
self._shutdown_started = True
logger.info(f"Shutdown GUIServer pid={os.getpid()} {repr(self)}")
def close_launcher_window():
if self.launcher_window and shiboken6.isValid(self.launcher_window):
self.launcher_window.close()
self.launcher_window.deleteLater()
def stop_pylsp_server():
if pylsp_server.is_running():
pylsp_server.stop()
def stop_dispatcher():
if self.dispatcher:
self.dispatcher.stop_cli_server()
self.dispatcher.disconnect_all()
self._run_shutdown_step("close_launcher_window", close_launcher_window)
self._run_shutdown_step("stop_pylsp_server", stop_pylsp_server)
self._run_shutdown_step("stop_dispatcher", stop_dispatcher)
logger.info("Shutdown GUIServer", repr(self))
if self.launcher_window and shiboken6.isValid(self.launcher_window):
self.launcher_window.close()
self.launcher_window.deleteLater()
if pylsp_server.is_running():
pylsp_server.stop()
if self.dispatcher:
self.dispatcher.stop_cli_server()
self.dispatcher.disconnect_all()
def main():
+26 -57
View File
@@ -207,7 +207,6 @@ class LaunchWindow(BECMainWindow):
self.app = QApplication.instance()
self.tiles: dict[str, LaunchTile] = {}
self._logged_unparented_connections: set[str] = set()
# Track the smallest mainlabel font size chosen so far
self._min_main_label_pt: int | None = None
@@ -656,83 +655,53 @@ class LaunchWindow(BECMainWindow):
super().showEvent(event)
self.setFixedSize(self.size())
def _has_external_window(self, connections: dict) -> bool:
def _launcher_is_last_widget(self, connections: dict) -> bool:
"""
Check if any registered non-launcher connection owns a top-level Qt window.
Check if the launcher is the last widget in the application.
"""
# get all parents of connections
for connection in connections.values():
if self._connection_belongs_to_launcher(connection):
continue
if isinstance(connection, QWidget) and connection.isWindow():
return True
return False
def _log_unparented_connections(self, connections: dict) -> None:
"""
Log non-launcher RPC connections that remain without an active top-level window.
"""
for connection in connections.values():
if self._connection_belongs_to_launcher(connection):
continue
if isinstance(connection, QWidget) and connection.isWindow():
continue
connection_description = (
f"type={type(connection).__name__} objectName={connection.objectName()!r} "
f"gui_id={connection.gui_id!r}"
)
if connection_description in self._logged_unparented_connections:
continue
self._logged_unparented_connections.add(connection_description)
logger.warning(
"Registered non-launcher RPC connection has no active top-level window: "
f"{connection_description}"
)
def _connection_belongs_to_launcher(self, connection: QObject) -> bool:
"""
Check whether a registered connection is the launcher itself or part of its Qt hierarchy.
"""
if connection is self or connection.gui_id == self.gui_id:
return True
parent = connection.parent()
while parent is not None:
if parent is self:
return True
parent = parent.parent()
return False
try:
parent = connection.parent()
if parent is None and connection.objectName() != self.objectName():
logger.info(
f"Found non-launcher connection without parent: {connection.objectName()}"
)
return False
except Exception as e:
logger.error(f"Error getting parent of connection: {e}")
return False
return True
def _turn_off_the_lights(self, connections: dict):
"""
If there is only one connection remaining, it is the launcher, so we show it.
Once the launcher is closed as the last window, we quit the application.
"""
if self._has_external_window(connections):
self.hide()
if self._launcher_is_last_widget(connections):
self.show()
self.activateWindow()
self.raise_()
if self.app:
self.app.setQuitOnLastWindowClosed(False) # type: ignore
self.app.setQuitOnLastWindowClosed(True) # type: ignore
return
self._log_unparented_connections(connections)
self.show()
self.activateWindow()
self.raise_()
self.hide()
if self.app:
self.app.setQuitOnLastWindowClosed(True) # type: ignore
self.app.setQuitOnLastWindowClosed(False) # type: ignore
def closeEvent(self, event):
"""
Close the launcher window.
"""
connections = self.register.list_all_connections()
if self._has_external_window(connections):
event.ignore()
self.hide()
if self._launcher_is_last_widget(connections):
event.accept()
return
event.accept()
event.ignore()
self.hide()
if __name__ == "__main__": # pragma: no cover
+9 -146
View File
@@ -5,7 +5,6 @@ from __future__ import annotations
import json
import os
import select
import signal
import subprocess
import threading
import time
@@ -34,12 +33,6 @@ else:
logger = bec_logger.logger
IGNORE_WIDGETS = ["LaunchWindow"]
PROCESS_TERMINATION_TIMEOUT = 10
PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2
PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2
GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT = 3
GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5
OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event"
RegistryState: TypeAlias = dict[
Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"],
@@ -60,16 +53,14 @@ def _filter_output(output: str) -> str:
return output
def _get_output(process, logger, stop_event: threading.Event | None = None) -> None:
def _get_output(process, logger) -> None:
log_func = {process.stdout: logger.debug, process.stderr: logger.info}
stream_buffer = {process.stdout: [], process.stderr: []}
try:
os.set_blocking(process.stdout.fileno(), False)
os.set_blocking(process.stderr.fileno(), False)
while process.poll() is None and not (stop_event and stop_event.is_set()):
readylist, _, _ = select.select(
[process.stdout, process.stderr], [], [], PROCESS_OUTPUT_SELECT_TIMEOUT
)
while process.poll() is None:
readylist, _, _ = select.select([process.stdout, process.stderr], [], [], 1)
for stream in (process.stdout, process.stderr):
buf = stream_buffer[stream]
if stream in readylist:
@@ -84,95 +75,6 @@ def _get_output(process, logger, stop_event: threading.Event | None = None) -> N
logger.error(f"Error reading process output: {str(e)}")
def _process_group_snapshot(process) -> str:
try:
pgid = os.getpgid(process.pid)
except ProcessLookupError:
return "Process group snapshot unavailable: process already exited"
try:
result = subprocess.run(
["ps", "-o", "pid,ppid,pgid,stat,command", "-g", str(pgid)],
check=False,
capture_output=True,
text=True,
timeout=2,
)
except Exception as exc:
return f"Process group snapshot unavailable: {exc}"
output = result.stdout.strip()
if not output:
return f"Process group snapshot empty for pgid={pgid}"
return output
def _terminate_plot_process(process, logger, timeout: float = PROCESS_TERMINATION_TIMEOUT) -> None:
if process.poll() is not None:
return
process_info = f"pid={process.pid} command={process.args}"
try:
pgid = os.getpgid(process.pid)
process_info = f"pid={process.pid} pgid={pgid} command={process.args}"
logger.info(f"Terminating GUI process group {process_info}")
os.killpg(pgid, signal.SIGTERM)
except ProcessLookupError:
process.wait(timeout=timeout)
return
except Exception as exc:
logger.warning("Failed to terminate GUI process group; terminating process only.")
logger.info(f"GUI process termination failure details: {exc}. pid={process.pid}")
process.terminate()
try:
process.wait(timeout=timeout)
return
except subprocess.TimeoutExpired:
logger.warning(f"GUI process did not stop within {timeout}s; killing it.")
logger.info(
f"GUI process force-kill details: {process_info}\n"
f"{_process_group_snapshot(process)}"
)
try:
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
except ProcessLookupError as e:
logger.error(f"Failed to kill GUI process group: {e}")
process.wait(timeout=timeout)
return
process.wait(timeout=timeout)
def _wait_for_process_exit(process, timeout: float) -> bool:
try:
process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
return False
return True
def _join_process_output_thread(process, thread: threading.Thread | None, logger) -> None:
if thread is None:
return
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
if not thread.is_alive():
return
if stop_event := getattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, None):
stop_event.set()
for stream in (process.stdout, process.stderr):
if stream is None:
continue
try:
stream.close()
except OSError as e:
logger.error(f"Failed to close stream {str(e)}")
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
if thread.is_alive():
logger.warning("GUI process output reader thread did not stop after process shutdown.")
logger.info(f"GUI process output reader thread details: pid={process.pid}")
def _start_plot_process(
gui_id: str,
gui_class_id: str,
@@ -224,14 +126,8 @@ def _start_plot_process(
if logger is None:
process_output_processing_thread = None
else:
process_output_stop_event = threading.Event()
process_output_processing_thread = threading.Thread(
target=_get_output, args=(process, logger, process_output_stop_event)
)
setattr(
process_output_processing_thread,
OUTPUT_READER_STOP_EVENT_ATTR,
process_output_stop_event,
target=_get_output, args=(process, logger)
)
process_output_processing_thread.start()
return process, process_output_processing_thread
@@ -326,7 +222,7 @@ class BECGuiClient(RPCBase):
self._ipython_registry: dict[str, RPCReference] = {}
self.available_widgets = AvailableWidgetsNamespace()
register_serializer_extension()
self._rpc_timeout = 60
self._rpc_timeout = 5
####################
#### Client API ####
@@ -569,13 +465,11 @@ class BECGuiClient(RPCBase):
if self._process:
logger.success("Stopping GUI...")
if not self._request_server_shutdown():
_terminate_plot_process(self._process, logger)
_join_process_output_thread(
self._process, self._process_output_processing_thread, logger
)
self._process.terminate()
if self._process_output_processing_thread:
self._process_output_processing_thread.join()
self._process.wait()
self._process = None
self._process_output_processing_thread = None
# Unregister the registry state
self._client.connector.unregister(
@@ -594,37 +488,6 @@ class BECGuiClient(RPCBase):
#### Private methods ####
#########################
def _request_server_shutdown(self) -> bool:
if self._process is None or self._process.poll() is not None:
return True
process_details = f"pid={self._process.pid} command={self._process.args}"
logger.info(f"Requesting graceful GUI shutdown {process_details}")
try:
self.launcher._run_rpc( # pylint: disable=protected-access
"system.shutdown",
wait_for_rpc_response=True,
timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT,
)
except Exception as exc:
logger.warning(
"Could not confirm graceful GUI shutdown via RPC; "
"falling back to process termination."
)
logger.info(f"Graceful GUI shutdown RPC failure details: {exc}. {process_details}")
return False
if _wait_for_process_exit(self._process, GRACEFUL_SERVER_SHUTDOWN_TIMEOUT):
logger.info(f"GUI server exited after graceful shutdown {process_details}")
return True
logger.warning(
"GUI server did not exit after graceful shutdown request; "
"falling back to process termination."
)
logger.info(
f"Graceful GUI shutdown timeout details: {process_details}\n"
f"{_process_group_snapshot(self._process)}"
)
return False
def _check_if_server_is_alive(self):
"""Checks if the process is alive"""
if self._process is None:
-37
View File
@@ -2,7 +2,6 @@ from __future__ import annotations
import inspect
import threading
import time
import uuid
from functools import wraps
from typing import TYPE_CHECKING, Any, cast
@@ -10,7 +9,6 @@ from typing import TYPE_CHECKING, Any, cast
from bec_lib.client import BECClient
from bec_lib.device import DeviceBaseWithConfig
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
if TYPE_CHECKING: # pragma: no cover
@@ -27,7 +25,6 @@ else:
# pylint: disable=protected-access
_DEFAULT_RPC_TIMEOUT = object()
logger = bec_logger.logger
def _name_arg(arg):
@@ -264,39 +261,12 @@ class RPCBase:
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
)
target_gui_id = gui_id or self._gui_id
sent_at = time.time()
deadline = sent_at + timeout if timeout is not None else None
rpc_msg.metadata.update(
{
"method": method,
"receiver": receiver,
"target_gui_id": target_gui_id,
"object_name": self.object_name,
"wait_for_response": wait_for_rpc_response,
"timeout": timeout,
"sent_at": sent_at,
"deadline": deadline,
}
)
logger.info(
"Sending GUI RPC request "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"wait_for_response={wait_for_rpc_response} timeout={timeout}"
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(timeout)
if not finished:
logger.error(
"GUI RPC response timeout "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"timeout={timeout}"
)
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
@@ -308,12 +278,6 @@ class RPCBase:
# the _on_rpc_response method
assert isinstance(self._rpc_response, messages.RequestResponseMessage)
logger.info(
"Received GUI RPC response "
f"request_id={request_id} method={method} receiver={receiver} "
f"target_gui_id={target_gui_id} object_name={self.object_name} "
f"accepted={self._rpc_response.accepted}"
)
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
@@ -322,7 +286,6 @@ class RPCBase:
def _on_rpc_response(self, msg_obj: MessageObject) -> None:
msg = cast(messages.RequestResponseMessage, msg_obj.value)
logger.debug(f"GUI RPC response callback received: {msg}")
self._rpc_response = msg
self._msg_wait_event.set()
+1 -56
View File
@@ -3,9 +3,8 @@ from __future__ import annotations
import collections
import random
import string
import time
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, DefaultDict, Hashable, Union
from typing import TYPE_CHECKING, DefaultDict, Hashable, Union
import louie
import redis
@@ -16,7 +15,6 @@ from bec_lib.service_config import ServiceConfig
from qtpy.QtCore import QObject
from qtpy.QtCore import Signal as pyqtSignal
from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed
from bec_widgets.utils.serialization import register_serializer_extension
logger = bec_logger.logger
@@ -24,43 +22,9 @@ logger = bec_logger.logger
if TYPE_CHECKING: # pragma: no cover
from bec_lib.endpoints import EndpointInfo
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.rpc_server import RPCServer
def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None:
if not isinstance(msg_content, dict) or not isinstance(metadata, dict):
return
request_id = metadata.get("request_id")
method = msg_content.get("action")
parameter = msg_content.get("parameter")
if request_id is None or method is None or not isinstance(parameter, dict):
return
dispatch_received_at = time.time()
sent_at = metadata.get("sent_at")
deadline = metadata.get("deadline")
timeout = metadata.get("timeout")
dispatch_latency = elapsed_seconds(sent_at, dispatch_received_at)
stale_on_dispatch = deadline is not None and dispatch_received_at > deadline
target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id")
logger.info(
"GUI RPC dispatcher received request before Qt callback emit "
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} "
f"stale_on_dispatch={stale_on_dispatch}"
)
if stale_on_dispatch:
logger.warning(
"GUI RPC dispatcher received request after client timeout deadline "
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)}"
)
class QtThreadSafeCallback(QObject):
"""QtThreadSafeCallback is a wrapper around a callback function to make it thread-safe for Qt."""
@@ -78,7 +42,6 @@ class QtThreadSafeCallback(QObject):
self.cb_info = cb_info
self.cb = cb
self.cb_owner = louie.saferef.safe_ref(cb.__self__) if hasattr(cb, "__self__") else None
self.cb_ref = louie.saferef.safe_ref(cb)
self.cb_signal.connect(self.cb)
self.topics = set()
@@ -125,12 +88,10 @@ class QtRedisConnector(RedisConnector):
# we can notice kwargs are lost when passed to Qt slot
metadata = msg.metadata
_log_rpc_dispatcher_receive(msg.content, metadata)
cb(msg.content, metadata)
else:
# from stream
msg = msg["data"]
_log_rpc_dispatcher_receive(msg.content, msg.metadata)
cb(msg.content, msg.metadata)
@@ -279,22 +240,6 @@ class BECDispatcher:
# pylint: disable=protected-access
self.disconnect_topics(self.client.connector._topics_cb)
def disconnect_owner(self, owner: BECWidget):
"""
Disconnect all slots owned by a particular widget.
Args:
owner(BECWidget): The owner widget whose slots should be disconnected
"""
slots_to_disconnect = []
for connected_slot in self._registered_slots.values():
if connected_slot.cb_owner is not None and connected_slot.cb_owner() == owner:
slots_to_disconnect.append(connected_slot)
for slot in slots_to_disconnect:
topics = slot.topics.copy()
for topic in topics:
self.disconnect_slot(slot.cb, topic)
def start_cli_server(self, gui_id: str | None = None):
"""
Start the CLI server.
+23 -26
View File
@@ -331,40 +331,37 @@ class BECWidget(BECConnector):
# All widgets need to call super().cleanup() in their cleanup method
logger.info(f"Registry cleanup for widget {self.__class__.__name__}")
self.rpc_register.remove_rpc(self)
children = self.findChildren(BECWidget)
for child in children:
if not shiboken6.isValid(child):
# If the child is not valid, it means it has already been deleted
continue
child.close()
child.deleteLater()
children = self.findChildren(BECWidget)
for child in children:
if not shiboken6.isValid(child):
# If the child is not valid, it means it has already been deleted
continue
child.close()
child.deleteLater()
# Tear down busy overlay explicitly to stop spinner and remove filters
overlay = getattr(self, "_busy_overlay", None)
if overlay is not None and shiboken6.isValid(overlay):
try:
overlay.hide()
filt = getattr(overlay, "_filter", None)
if filt is not None and shiboken6.isValid(filt):
try:
self.removeEventFilter(filt)
except Exception as exc:
logger.warning(
f"Failed to remove event filter from busy overlay: {exc}"
)
# Tear down busy overlay explicitly to stop spinner and remove filters
overlay = getattr(self, "_busy_overlay", None)
if overlay is not None and shiboken6.isValid(overlay):
try:
overlay.hide()
filt = getattr(overlay, "_filter", None)
if filt is not None and shiboken6.isValid(filt):
try:
self.removeEventFilter(filt)
except Exception as exc:
logger.warning(f"Failed to remove event filter from busy overlay: {exc}")
# Cleanup the overlay widget. This will call cleanup on the custom widget if present.
# Cleanup the overlay widget. This will call cleanup on the custom widget if present.
overlay.cleanup()
overlay.deleteLater()
except Exception as exc:
logger.warning(f"Failed to delete busy overlay: {exc}")
overlay.cleanup()
overlay.deleteLater()
except Exception as exc:
logger.warning(f"Failed to delete busy overlay: {exc}")
def closeEvent(self, event):
"""Wrap the close even to ensure the rpc_register is cleaned up."""
try:
if not self._destroyed:
self.bec_dispatcher.disconnect_owner(self)
self.cleanup()
self._destroyed = True
finally:
+1 -1
View File
@@ -150,7 +150,7 @@ class TypedForm(BECWidget, QWidget):
self.adjustSize()
def _new_grid_layout(self):
new_grid = QGridLayout()
new_grid = QGridLayout(self)
new_grid.setContentsMargins(0, 0, 0, 0)
return new_grid
-16
View File
@@ -1,16 +0,0 @@
from __future__ import annotations
def elapsed_seconds(start: float | int | None, stop: float) -> float | None:
if start is None:
return None
try:
return max(0.0, stop - float(start))
except (TypeError, ValueError):
return None
def format_elapsed(elapsed: float | None) -> str:
if elapsed is None:
return "unknown"
return f"{elapsed:.3f}"
+9 -112
View File
@@ -1,7 +1,6 @@
from __future__ import annotations
import functools
import time
import traceback
import types
from contextlib import contextmanager
@@ -12,14 +11,13 @@ from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import QApplication, QWidget
from qtpy.QtWidgets import QWidget
from redis.exceptions import RedisError
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import ErrorPopupUtility
from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.screen_utils import apply_window_geometry
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
@@ -117,107 +115,27 @@ class RPCServer:
if request_id is None:
logger.error("Received RPC instruction without request_id")
return
method = msg.get("action")
parameter = msg.get("parameter", {})
args = parameter.get("args", [])
kwargs = parameter.get("kwargs", {})
target_gui_id = parameter.get("gui_id")
sent_at = metadata.get("sent_at")
deadline = metadata.get("deadline")
timeout = metadata.get("timeout")
received_at = time.time()
receive_latency = elapsed_seconds(sent_at, received_at)
stale_on_receive = deadline is not None and received_at > deadline
logger.info(
"GUI RPC server received request "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"target_gui_id={target_gui_id} timeout={timeout} "
f"receive_latency_s={format_elapsed(receive_latency)} "
f"stale_on_receive={stale_on_receive}"
)
if stale_on_receive:
logger.warning(
"GUI RPC server received request after client timeout deadline "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"target_gui_id={target_gui_id} timeout={timeout} "
f"receive_latency_s={format_elapsed(receive_latency)}"
)
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
# Shutdown must acknowledge before teardown starts. The generic RPC path
# below publishes successful responses through QTimer.singleShot(0);
# for system.shutdown that would race with the queued app quit and
# dispatcher shutdown scheduled by _shutdown_gui_server().
if method == "system.shutdown":
execution_start = time.perf_counter()
try:
self.run_system_rpc(method, args, kwargs)
except Exception:
execution_duration = time.perf_counter() - execution_start
content = traceback.format_exc()
logger.error(
"GUI RPC server shutdown request failed "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"execution_duration_s={execution_duration:.3f}\n{content}"
)
self.send_response(request_id, False, {"error": content})
else:
execution_duration = time.perf_counter() - execution_start
logger.info(
"GUI RPC server acknowledged shutdown request "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"execution_duration_s={execution_duration:.3f}"
)
self.send_response(request_id, True, {"result": None})
return
execution_start = time.perf_counter()
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
try:
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
if method.startswith("system."):
res = self.run_system_rpc(method, args, kwargs)
else:
obj = self.get_object_from_config(parameter)
obj = self.get_object_from_config(msg["parameter"])
res = self.run_rpc(obj, method, args, kwargs)
except Exception:
execution_duration = time.perf_counter() - execution_start
content = traceback.format_exc()
logger.error(
"GUI RPC server execution failed "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f}\n"
f"{content}"
)
logger.error(f"Error while executing RPC instruction: {content}")
self.send_response(request_id, False, {"error": content})
else:
execution_duration = time.perf_counter() - execution_start
response_stale = deadline is not None and time.time() > deadline
logger.info(
"GUI RPC server executed request "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f} "
f"response_after_client_deadline={response_stale}"
)
if response_stale:
logger.warning(
"GUI RPC server response is late for client timeout "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"target_gui_id={target_gui_id} timeout={timeout} "
f"execution_duration_s={execution_duration:.3f}"
)
logger.debug(f"RPC instruction executed successfully: {res}")
self._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat()
QTimer.singleShot(0, lambda: self.serialize_result_and_send(request_id, res))
def send_response(self, request_id: str, accepted: bool, msg: dict):
log_message = (
"GUI RPC server publishing response "
f"request_id={request_id} gui_id={self.gui_id} accepted={accepted}"
)
if accepted:
logger.info(log_message)
else:
logger.error(log_message)
self.client.connector.set_and_publish(
MessageEndpoints.gui_instruction_response(request_id),
messages.RequestResponseMessage(accepted=accepted, message=msg),
@@ -318,23 +236,10 @@ class RPCServer:
def run_system_rpc(self, method: str, args: list, kwargs: dict):
if method == "system.launch_dock_area":
return self._launch_dock_area(*args, **kwargs)
if method == "system.shutdown":
return self._shutdown_gui_server()
if method == "system.list_capabilities":
return {"system.launch_dock_area": True, "system.shutdown": True}
return {"system.launch_dock_area": True}
raise ValueError(f"Unknown system RPC method: {method}")
@staticmethod
def _shutdown_gui_server() -> None:
app = QApplication.instance()
if app is None:
return
gui_server = getattr(app, "gui_server", None)
if gui_server is not None and hasattr(gui_server, "request_shutdown"):
QTimer.singleShot(0, gui_server.request_shutdown)
return
QTimer.singleShot(0, app.quit)
@staticmethod
def _launch_dock_area(
name: str | None = None,
@@ -392,14 +297,7 @@ class RPCServer:
res = self.serialize_object(res)
except RegistryNotReadyError:
try:
repeat = self._rpc_singleshot_repeats[request_id]
repeat += retry_delay
logger.warning(
"GUI RPC result serialization delayed; retrying "
f"request_id={request_id} retry_delay_ms={retry_delay} "
f"accumulated_delay_ms={repeat.accumulated_delay} "
f"max_delay_ms={repeat.max_delay}"
)
self._rpc_singleshot_repeats[request_id] += retry_delay
QTimer.singleShot(
retry_delay, lambda: self.serialize_result_and_send(request_id, res)
)
@@ -509,9 +407,8 @@ class RPCServer:
container_proxy = parent.gui_id
else:
container_proxy = None
except Exception as e:
except Exception:
container_proxy = None
logger.error(f"Error while serializing RPC result: {e}")
if wait and not self.rpc_register.object_is_registered(connector):
raise RegistryNotReadyError(f"Connector {connector} not registered yet")
+3 -43
View File
@@ -1,6 +1,6 @@
from bec_lib.logger import bec_logger
from qtpy import PYSIDE6
from qtpy.QtCore import QEvent, QFile, QIODevice, QObject
from qtpy.QtCore import QFile, QIODevice
from bec_widgets.utils.plugin_utils import get_designer_plugin
@@ -9,56 +9,16 @@ logger = bec_logger.logger
if PYSIDE6:
from qtpy.QtUiTools import QUiLoader
class _LoadedUiCloser(QObject):
"""Forward root close events to widgets instantiated by ``QUiLoader``.
Destroying a parent widget does not guarantee ``closeEvent`` is delivered to
every child widget. Some of our designer plugins rely on ``closeEvent`` /
``cleanup`` to unregister callbacks, so explicitly close loaded descendants
when the loaded form itself is closed.
"""
def __init__(self, root_widget):
super().__init__(root_widget)
self._root_widget = root_widget
self._widgets = []
root_widget.installEventFilter(self)
def register_widget(self, widget):
if widget is None or widget is self._root_widget:
return
self._widgets.append(widget)
def eventFilter(self, watched, event):
if watched is self._root_widget and event.type() == QEvent.Close:
for widget in reversed(self._widgets):
try:
widget.close()
except RuntimeError:
continue
return super().eventFilter(watched, event)
class CustomUiLoader(QUiLoader):
def __init__(self, baseinstance):
super().__init__(baseinstance)
self.baseinstance = baseinstance
self._closer = _LoadedUiCloser(baseinstance) if baseinstance is not None else None
def createWidget(self, class_name, parent=None, name=""):
if parent is None and self.baseinstance is not None:
return self.baseinstance
widget_parent = parent if parent is not None else self.baseinstance
widget = get_designer_plugin(class_name, raise_on_missing=False)
if widget is not None:
created_widget = widget(widget_parent)
created_widget.setObjectName(name)
else:
created_widget = super().createWidget(class_name, widget_parent, name)
if self._closer is not None:
self._closer.register_widget(created_widget)
return created_widget
return widget(self.baseinstance)
return super().createWidget(class_name, self.baseinstance, name)
class UILoader:
@@ -1,6 +1,3 @@
from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from qtpy.QtCore import Qt
from qtpy.QtWidgets import QHBoxLayout, QPushButton, QToolButton, QWidget
@@ -8,8 +5,6 @@ from qtpy.QtWidgets import QHBoxLayout, QPushButton, QToolButton, QWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
logger = bec_logger.logger
class AbortButton(BECWidget, QWidget):
"""A button that abort the scan."""
@@ -60,7 +55,7 @@ class AbortButton(BECWidget, QWidget):
scan_id(str|None): The scan id to abort. If None, the current scan will be aborted.
"""
if self.scan_id is not None:
logger.info(f"Aborting scan with scan_id: {self.scan_id}")
print(f"Aborting scan with scan_id: {self.scan_id}")
self.queue.request_scan_abortion(scan_id=self.scan_id)
else:
self.queue.request_scan_abortion()
@@ -429,7 +429,7 @@ class PositionerBox2D(PositionerBoxBase):
@SafeSlot()
def on_stop(self):
self._stop_device([self.device_hor, self.device_ver])
self._stop_device(f"{self.device_hor} or {self.device_ver}")
@SafeProperty(float)
def step_size_hor(self):
@@ -1,10 +1,11 @@
import uuid
from abc import abstractmethod
from typing import Callable, Sequence, TypedDict
from typing import Callable, TypedDict
from bec_lib.device import Positioner
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messages import VariableMessage
from bec_lib.messages import ScanQueueMessage
from qtpy.QtWidgets import (
QDialog,
QDoubleSpinBox,
@@ -115,16 +116,17 @@ class PositionerBoxBase(BECWidget, QWidget):
else:
ui["units"].setVisible(False)
def _stop_device(self, device: str | Sequence[str]):
def _stop_device(self, device: str):
"""Stop call"""
devices = [device] if isinstance(device, str) else list(device)
devices = [dev for dev in devices if dev]
if not devices:
logger.warning("Stop requested without a valid device.")
return
msg = VariableMessage(value=devices)
self.client.connector.send(MessageEndpoints.stop_devices(), msg)
request_id = str(uuid.uuid4())
params = {"device": device, "rpc_id": request_id, "func": "stop", "args": [], "kwargs": {}}
msg = ScanQueueMessage(
scan_type="device_rpc",
parameter=params,
queue="emergency",
metadata={"RID": request_id, "response": False},
)
self.client.connector.send(MessageEndpoints.scan_queue_request(self.client.username), msg)
# pylint: disable=unused-argument
def _on_device_readback(
@@ -460,7 +460,7 @@ class ImageBase(PlotBase):
self._color_bar = None
def disable_autorange():
logger.info("Disabling autorange")
print("Disabling autorange")
self.setProperty("autorange", False)
if style == "simple":
@@ -928,7 +928,7 @@ class ImageBase(PlotBase):
# if sync:
self._sync_colorbar_levels()
self._sync_autorange_switch()
logger.info(f"Autorange set to {enabled}")
print(f"Autorange set to {enabled}")
@SafeProperty(str)
def autorange_mode(self) -> str:
@@ -2449,7 +2449,7 @@ class Waveform(PlotBase):
first_key = next(iter(info))
mem_bytes = info[first_key]["value"]["mem_size"]
size_mb = mem_bytes / (1024 * 1024)
logger.info(f"Dataset size: {size_mb:.1f} MB")
print(f"Dataset size: {size_mb:.1f} MB")
except Exception as exc: # noqa: BLE001
logger.error(f"Unable to evaluate dataset size: {exc}")
return True
@@ -155,6 +155,7 @@ class ScanProgressBar(BECWidget, QWidget):
self._progress_device = None
self.task = None
self.scan_number = None
self.progress_started.connect(lambda: print("Scan progress started"))
def connect_to_queue(self):
"""
@@ -0,0 +1,170 @@
from bec_lib.builtin_actor_hli import ScanInterlockHli
from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import BlStateStatus
from qtpy import QtGui, QtWidgets
from qtpy.QtCore import Qt
from bec_widgets.utils.bec_widget import BECWidget, ConnectionConfig
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
class ScanInterlockToggle(ToggleSwitch):
def __init__(self, interlock: ScanInterlockHli, parent):
super().__init__(parent=parent)
self._interlock = interlock
def mousePressEvent(self, event):
if self.isEnabled() and event.button() == Qt.MouseButton.LeftButton:
self._interlock.enabled = not self.checked
class BlStateList(QtWidgets.QListWidget):
def __init__(self, interlock: ScanInterlockHli, parent):
super().__init__(parent)
self._interlock = interlock
def dropEvent(self, event: QtGui.QDropEvent, /) -> None:
data = event.mimeData()
if not data.hasText():
return
self._interlock.add_state_to_interlock(data.text(), "valid")
class ScanInterlockControl(BECWidget, QtWidgets.QWidget):
"""
ScanInterlockControl can be used to enable/disable the scan interlock actor,
and add/remove beamline states for it to watch.
"""
RPC = False
PLUGIN = True
def __init__(
self,
parent: QtWidgets.QWidget | None = None,
client=None,
config: ConnectionConfig | None = None,
gui_id: str | None = None,
theme_update: bool = False,
**kwargs,
):
"""
Args:
parent (QtWidgets.QWidget, optional): The parent widget.
client: The BEC client.
config (ConnectionConfig, optional): The connection configuration.
gui_id (str, optional): The GUI ID.
theme_update (bool, optional): Whether to subscribe to theme updates. Defaults to False.
"""
super().__init__(
parent=parent,
client=client,
config=config,
gui_id=gui_id,
theme_update=theme_update,
**kwargs,
)
self._interlock = self.client.builtin_actors.scan_interlock
self._layout = QtWidgets.QVBoxLayout(self)
self._setup_control_layout()
self._setup_list_layout()
self.bec_dispatcher.connect_slot(
self._update_all_content,
MessageEndpoints.builtin_actor_update_notif("ScanInterlockActor"),
)
self._update_all_content()
def _setup_control_layout(self):
self._controls_layout = QtWidgets.QHBoxLayout()
self._layout.addLayout(self._controls_layout)
self._enabled_text = QtWidgets.QLabel()
self._enabled_text.setText("Widget Uninitialised")
self._enabled_toggle = ScanInterlockToggle(self._interlock, parent=self)
self._controls_layout.addWidget(self._enabled_text)
self._controls_layout.addWidget(self._enabled_toggle)
self._enabled_toggle
def _set_enabled_text(self, enabled: bool):
self._enabled_text.setText(
"Scan Interlock Enabled" if enabled else "Scan Interlock Disabled"
)
def _setup_list_layout(self):
self._list_layout = QtWidgets.QVBoxLayout()
self._layout.addLayout(self._list_layout)
self._list_layout.addWidget(QtWidgets.QLabel("Beamline states watched:"))
self._bl_states_list = BlStateList(self._interlock, self)
self._bl_states_list.setDragDropMode(QtWidgets.QListWidget.DragDropMode.DropOnly)
self._list_layout.addWidget(self._bl_states_list)
self._delete_button_layout = QtWidgets.QHBoxLayout()
self._list_layout.addLayout(self._delete_button_layout)
self._delete_button = QtWidgets.QPushButton("Remove selected states from interlock")
self._delete_button_layout.addWidget(self._delete_button)
self._delete_button.clicked.connect(self._delete_selected)
self._delete_all_button = QtWidgets.QPushButton("Remove all states")
self._delete_button_layout.addWidget(self._delete_all_button)
self._delete_all_button.clicked.connect(self._delete_all)
@SafeSlot()
def _delete_selected(self):
to_delete = [i.text() for i in self._bl_states_list.selectedItems()]
for state in to_delete:
self._interlock.remove_state_from_interlock(state)
@SafeSlot()
def _delete_all(self):
self._interlock.clear_all()
def _update_list(self, list_content: dict[str, BlStateStatus]):
self._bl_states_list.clear()
for item in list_content:
self._bl_states_list.addItem(item)
@SafeSlot()
def _update_all_content(self, *_, **__):
self._set_enabled_text(self._interlock.enabled)
self._enabled_toggle.setChecked(self._interlock.enabled)
self._update_list(self._interlock.states_watched)
if __name__ == "__main__": # pragma: no cover
# pylint: disable=import-outside-toplevel
from qtpy.QtWidgets import QApplication
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
app = QApplication([])
main_window = QtWidgets.QMainWindow()
central_widget = QtWidgets.QWidget()
button = DarkModeButton()
layout = QtWidgets.QVBoxLayout(central_widget)
main_window.setCentralWidget(central_widget)
scan_interlock_control = ScanInterlockControl() # type: ignore
layout.addWidget(button)
layout.addWidget(scan_interlock_control)
class TestList(QtWidgets.QListWidget):
def mimeData(self, items, /):
mimedata = super().mimeData(items)
text = ",".join([i.text() for i in items])
mimedata.setText(text)
return mimedata
test_list = TestList()
test_list.addItems(["samx_in_limits", "samy_in_limits"])
test_list.setDragEnabled(True)
layout.addWidget(test_list)
lineedit = QtWidgets.QLineEdit()
lineedit.setDragEnabled(True)
layout.addWidget(lineedit)
main_window.setWindowTitle("Scan Interlock Control")
main_window.resize(800, 400)
main_window.show()
app.exec_()
+1 -3
View File
@@ -1,6 +1,6 @@
[project]
name = "bec_widgets"
version = "3.13.5"
version = "3.13.3"
description = "BEC Widgets"
requires-python = ">=3.11"
classifiers = [
@@ -71,8 +71,6 @@ qtermwidget = ["pyside6_qtermwidget"]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
+1 -1
View File
@@ -69,7 +69,7 @@ def create_widget(
return widget
@pytest.mark.timeout(20)
@pytest.mark.timeout(100)
def test_available_widgets(qtbot, connected_client_gui_obj):
"""This test checks that all widgets that are available via gui.available_widgets can be created and removed."""
gui = connected_client_gui_obj
-26
View File
@@ -6,7 +6,6 @@ from qtpy.QtCore import QObject
from qtpy.QtWidgets import QApplication, QWidget
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty
from bec_widgets.utils.error_popups import SafeSlot as Slot
@@ -16,9 +15,6 @@ from .client_mocks import mocked_client
class BECConnectorQObject(BECConnector, QObject): ...
class _CleanupBroadcastWidget(BECWidget, QWidget): ...
@pytest.fixture
def bec_connector(mocked_client):
connector = BECConnectorQObject(client=mocked_client)
@@ -150,28 +146,6 @@ def test_bec_connector_change_object_name(bec_connector):
assert not any(obj.objectName() == previous_name for obj in all_objects)
def test_bec_widget_cleanup_broadcasts_after_children_are_unregistered(mocked_client, qtbot):
parent = _CleanupBroadcastWidget(client=mocked_client, object_name="cleanup_parent")
child = _CleanupBroadcastWidget(
parent=parent, client=mocked_client, object_name="cleanup_child"
)
qtbot.addWidget(parent)
observed_connections = []
parent.rpc_register.callbacks.append(
lambda connections: observed_connections.append(set(connections))
)
parent.close()
assert parent._destroyed is True
assert child.gui_id not in parent.rpc_register.list_all_connections()
assert all(
parent.gui_id in snapshot or child.gui_id not in snapshot
for snapshot in observed_connections
)
def test_bec_connector_export_settings():
class MyWidget(BECConnector, QWidget):
+1 -47
View File
@@ -5,7 +5,7 @@ from unittest import mock
import pytest
from bec_lib import service_config
from bec_lib.messages import GUIInstructionMessage, ScanMessage
from bec_lib.messages import ScanMessage
from bec_lib.serialization import MsgpackSerialization
from bec_widgets.utils.bec_dispatcher import BECDispatcher, QtRedisConnector, QtThreadSafeCallback
@@ -213,49 +213,3 @@ def test_dispatcher_2_topic_same_cb_with_boundmethod(
send_msg_event.set()
qtbot.wait(10)
def test_qt_redis_connector_logs_rpc_before_qt_callback(monkeypatch):
info_mock = mock.MagicMock()
warning_mock = mock.MagicMock()
monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.info", info_mock)
monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.warning", warning_mock)
def callback(_msg, _metadata):
pass
cb = QtThreadSafeCallback(callback)
connector = QtRedisConnector("localhost:1", mock.MagicMock())
rpc_msg = GUIInstructionMessage(
action="set_value",
parameter={"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"},
metadata={
"request_id": "dispatcher-request",
"receiver": "gui",
"object_name": "progressbar",
"timeout": 0.1,
"sent_at": 1.0,
"deadline": 1.1,
},
)
try:
connector._execute_callback(cb, {"data": rpc_msg}, {})
info_mock.assert_called_once()
info_message = info_mock.call_args.args[0]
assert "GUI RPC dispatcher received request before Qt callback emit" in info_message
assert "request_id=dispatcher-request" in info_message
assert "method=set_value" in info_message
assert "receiver=gui" in info_message
assert "target_gui_id=ring" in info_message
assert "object_name=progressbar" in info_message
assert "timeout=0.1" in info_message
assert "stale_on_dispatch=True" in info_message
warning_mock.assert_called_once()
warning_message = warning_mock.call_args.args[0]
assert "received request after client timeout deadline" in warning_message
assert "request_id=dispatcher-request" in warning_message
finally:
connector.shutdown()
+2 -108
View File
@@ -1,18 +1,10 @@
import signal
import subprocess
from contextlib import contextmanager
from unittest import mock
import pytest
from bec_widgets.cli.client import BECDockArea
from bec_widgets.cli.client_utils import (
GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT,
OUTPUT_READER_STOP_EVENT_ATTR,
BECGuiClient,
_join_process_output_thread,
_start_plot_process,
)
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCResponseTimeoutError, rpc_timeout
@@ -270,105 +262,7 @@ def test_client_utils_delete_falls_back_to_direct_close():
def test_client_utils_gui_client_set_rpc_timeout():
gui = BECGuiClient()
assert gui._rpc_timeout == 60
assert gui._rpc_timeout == 5
gui.set_rpc_timeout(10)
assert gui._rpc_timeout == 10
def test_client_utils_kill_server_waits_for_process_before_joining_output_thread():
gui = BECGuiClient()
gui._client = mock.MagicMock()
gui._process = mock.MagicMock(pid=123, stdout=None, stderr=None)
gui._process.poll.return_value = None
order = []
gui._process.wait.side_effect = lambda timeout: order.append("wait")
gui._process_output_processing_thread = mock.MagicMock()
gui._process_output_processing_thread.join.side_effect = lambda timeout: order.append("join")
gui._process_output_processing_thread.is_alive.return_value = False
with (
mock.patch.object(gui, "_request_server_shutdown", return_value=False),
mock.patch("bec_widgets.cli.client_utils.os.getpgid", return_value=123),
mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg,
):
gui.kill_server()
killpg.assert_called_once_with(123, signal.SIGTERM)
assert order == ["wait", "join"]
assert gui._process is None
assert gui._process_output_processing_thread is None
def test_client_utils_kill_server_requests_graceful_shutdown_before_signal():
gui = BECGuiClient()
gui._client = mock.MagicMock()
process = mock.MagicMock(stdout=None, stderr=None)
process.poll.return_value = None
gui._process = process
gui._process_output_processing_thread = mock.MagicMock()
gui._process_output_processing_thread.is_alive.return_value = False
launcher = mock.MagicMock()
with (
mock.patch.object(
BECGuiClient, "launcher", new_callable=mock.PropertyMock
) as launcher_prop,
mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg,
):
launcher_prop.return_value = launcher
gui.kill_server()
launcher._run_rpc.assert_called_once_with(
"system.shutdown", wait_for_rpc_response=True, timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT
)
process.wait.assert_called_once_with(timeout=5)
killpg.assert_not_called()
assert gui._process is None
assert gui._process_output_processing_thread is None
def test_client_utils_kill_server_kills_process_group_after_timeout():
gui = BECGuiClient()
gui._client = mock.MagicMock()
process = mock.MagicMock(pid=123, stdout=None, stderr=None, args=["bec-gui-server"])
process.poll.return_value = None
process.wait.side_effect = [subprocess.TimeoutExpired(cmd="bec-gui-server", timeout=10), None]
gui._process = process
with (
mock.patch.object(gui, "_request_server_shutdown", return_value=False),
mock.patch("bec_widgets.cli.client_utils.os.getpgid", return_value=123),
mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg,
mock.patch("bec_widgets.cli.client_utils.subprocess.run") as run,
):
run.return_value.stdout = "PID PPID PGID STAT COMMAND\n123 1 123 S bec-gui-server"
gui.kill_server()
assert killpg.call_args_list == [mock.call(123, signal.SIGTERM), mock.call(123, signal.SIGKILL)]
assert process.wait.call_args_list == [mock.call(timeout=10), mock.call(timeout=10)]
run.assert_called_once_with(
["ps", "-o", "pid,ppid,pgid,stat,command", "-g", "123"],
check=False,
capture_output=True,
text=True,
timeout=2,
)
def test_join_process_output_thread_signals_reader_before_closing_streams():
process = mock.MagicMock(pid=123, args=["bec-gui-server"])
process.stdout = mock.MagicMock()
process.stderr = mock.MagicMock()
thread = mock.MagicMock()
stop_event = mock.MagicMock()
setattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, stop_event)
thread.is_alive.side_effect = [True, False]
logger = mock.MagicMock()
_join_process_output_thread(process, thread, logger)
assert thread.join.call_args_list == [mock.call(timeout=2), mock.call(timeout=2)]
stop_event.set.assert_called_once_with()
process.stdout.close.assert_called_once_with()
process.stderr.close.assert_called_once_with()
@@ -98,15 +98,11 @@ def test_waiting_display(update_dialog, qtbot):
mock_spinner_stop.assert_called_once()
def test_update_cycle(update_dialog):
def test_update_cycle(update_dialog, qtbot):
update = {"enabled": False, "readoutPriority": "baseline", "deviceTags": {"tag"}}
def _mock_send(action="update", config=None, wait_for_response=True, timeout_s=None):
device = update_dialog.client.device_manager.devices["test_device"]
device._config = {**device._config, **config["test_device"]} # type: ignore
update_dialog._q_threadpool = MagicMock()
update_dialog._q_threadpool.start.side_effect = lambda runnable: runnable.run()
update_dialog.client.device_manager.devices["test_device"]._config = config["test_device"] # type: ignore
update_dialog._config_helper.send_config_request = MagicMock(side_effect=_mock_send)
for item in update_dialog._form.enumerate_form_widgets():
@@ -115,7 +111,9 @@ def test_update_cycle(update_dialog):
assert update_dialog.updated_config() == update
update_dialog.apply()
update_dialog._q_threadpool.start.assert_called_once()
qtbot.waitUntil(
lambda: update_dialog._config_helper.send_config_request.call_count == 1, timeout=100
)
update_dialog._config_helper.send_config_request.assert_called_with(
action="update", config={"test_device": update}, wait_for_response=False
+14 -55
View File
@@ -4,9 +4,7 @@ import os
from unittest import mock
import pytest
from qtpy.QtCore import QObject
from qtpy.QtGui import QFontMetrics
from qtpy.QtWidgets import QWidget
import bec_widgets
from bec_widgets.applications.launch_window import START_EMPTY_PROFILE_OPTION, LaunchWindow
@@ -18,28 +16,6 @@ from .client_mocks import mocked_client
base_path = os.path.dirname(bec_widgets.__file__)
def _launcher_child_connection(launcher: LaunchWindow, name: str) -> QObject:
connection = QObject(parent=launcher)
connection.gui_id = f"{launcher.gui_id}:{name}"
connection.setObjectName(name)
return connection
def _top_level_connection(qtbot, name: str) -> QWidget:
connection = QWidget()
connection.gui_id = name
connection.setObjectName(name)
qtbot.addWidget(connection)
return connection
def _unparented_connection(name: str) -> QObject:
connection = QObject()
connection.gui_id = name
connection.setObjectName(name)
return connection
@pytest.fixture
def bec_launch_window(qtbot, mocked_client):
widget = LaunchWindow(client=mocked_client)
@@ -141,20 +117,20 @@ def test_open_dock_area_with_start_empty_option_calls_launch(bec_launch_window):
(["launcher", "dock_area", "scan_progress_simple", "scan_progress_full"], False),
(
["launcher", "dock_area", "scan_progress_simple", "scan_progress_full", "hover_widget"],
False,
True,
),
(["launcher", "external_window"], True),
],
)
def test_gui_server_turns_off_the_lights(bec_launch_window, qtbot, connection_names, hide):
def test_gui_server_turns_off_the_lights(bec_launch_window, connection_names, hide):
connections = {}
for name in connection_names:
conn = mock.MagicMock()
if name == "hover_widget":
conn = _unparented_connection("HoverWidget")
elif name == "external_window":
conn = _top_level_connection(qtbot, "external_window")
conn.parent.return_value = None
conn.objectName.return_value = "HoverWidget"
else:
conn = _launcher_child_connection(bec_launch_window, name)
conn.parent.return_value = mock.MagicMock()
conn.objectName.return_value = bec_launch_window.objectName()
connections[name] = conn
with (
mock.patch.object(bec_launch_window, "show") as mock_show,
@@ -177,23 +153,6 @@ def test_gui_server_turns_off_the_lights(bec_launch_window, qtbot, connection_na
mock_set_quit_on_last_window_closed.assert_called_once_with(True)
def test_launcher_detects_external_main_window(bec_launch_window, qtbot):
connection = _top_level_connection(qtbot, "BECMainWindowNoRPC")
assert bec_launch_window._has_external_window({"window": connection})
def test_launcher_logs_unparented_non_window_connection_once(bec_launch_window):
connection = _unparented_connection("HoverWidget")
with mock.patch("bec_widgets.applications.launch_window.logger.warning") as mock_warning:
bec_launch_window._turn_off_the_lights({"window": connection})
bec_launch_window._turn_off_the_lights({"window": connection})
mock_warning.assert_called_once()
assert "HoverWidget" in mock_warning.call_args.args[0]
@pytest.mark.parametrize(
"connection_names, close_called",
[
@@ -204,12 +163,11 @@ def test_launcher_logs_unparented_non_window_connection_once(bec_launch_window):
(["launcher", "dock_area", "scan_progress_simple", "scan_progress_full"], True),
(
["launcher", "dock_area", "scan_progress_simple", "scan_progress_full", "hover_widget"],
True,
False,
),
(["launcher", "external_window"], False),
],
)
def test_launch_window_closes(bec_launch_window, qtbot, connection_names, close_called):
def test_launch_window_closes(bec_launch_window, connection_names, close_called):
"""
Test that the close event is handled correctly based on the connections.
If there are no connections or only the launcher connection, the window should close.
@@ -217,12 +175,13 @@ def test_launch_window_closes(bec_launch_window, qtbot, connection_names, close_
"""
connections = {}
for name in connection_names:
conn = mock.MagicMock()
if name == "hover_widget":
conn = _unparented_connection("HoverWidget")
elif name == "external_window":
conn = _top_level_connection(qtbot, "external_window")
conn.parent.return_value = None
conn.objectName.return_value = "HoverWidget"
else:
conn = _launcher_child_connection(bec_launch_window, name)
conn.parent.return_value = mock.MagicMock()
conn.objectName.return_value = bec_launch_window.objectName()
connections[name] = conn
close_event = mock.MagicMock()
with mock.patch.object(
+32 -16
View File
@@ -2,7 +2,7 @@ from unittest import mock
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import VariableMessage
from bec_lib.messages import ScanQueueMessage
from qtpy.QtCore import Qt, QTimer
from qtpy.QtGui import QValidator
from qtpy.QtWidgets import QPushButton
@@ -34,11 +34,15 @@ class PositionerWithoutPrecision(Positioner):
def positioner_box(qtbot, mocked_client):
"""Fixture for PositionerBox widget"""
with mock.patch(
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.PositionerBoxBase._check_device_is_valid",
return_value=True,
):
db = create_widget(qtbot, PositionerBox, device="samx", client=mocked_client)
yield db
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.uuid.uuid4"
) as mock_uuid:
mock_uuid.return_value = "fake_uuid"
with mock.patch(
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.PositionerBoxBase._check_device_is_valid",
return_value=True,
):
db = create_widget(qtbot, PositionerBox, device="samx", client=mocked_client)
yield db
def test_positioner_box(positioner_box):
@@ -85,8 +89,16 @@ def test_positioner_box_on_stop(positioner_box):
"""Test on stop button"""
with mock.patch.object(positioner_box.client.connector, "send") as mock_send:
positioner_box.on_stop()
msg = VariableMessage(value=["samx"])
mock_send.assert_called_once_with(MessageEndpoints.stop_devices(), msg)
params = {"device": "samx", "rpc_id": "fake_uuid", "func": "stop", "args": [], "kwargs": {}}
msg = ScanQueueMessage(
scan_type="device_rpc",
parameter=params,
queue="emergency",
metadata={"RID": "fake_uuid", "response": False},
)
mock_send.assert_called_once_with(
MessageEndpoints.scan_queue_request(positioner_box.client.username), msg
)
def test_positioner_box_setpoint_change(positioner_box):
@@ -127,15 +139,19 @@ def test_positioner_control_line(qtbot, mocked_client):
Inherits from PositionerBox, but the layout is changed. Check dimensions only
"""
with mock.patch(
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box.PositionerBox._check_device_is_valid",
return_value=True,
):
db = PositionerControlLine(device="samx", client=mocked_client)
qtbot.addWidget(db)
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.uuid.uuid4"
) as mock_uuid:
mock_uuid.return_value = "fake_uuid"
with mock.patch(
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box.PositionerBox._check_device_is_valid",
return_value=True,
):
db = PositionerControlLine(device="samx", client=mocked_client)
qtbot.addWidget(db)
assert db.ui.device_box.height() == db.height()
assert db.ui.device_box.height() >= db.dimensions[0]
assert db.ui.device_box.width() == 600
assert db.ui.device_box.height() == db.height()
assert db.ui.device_box.height() >= db.dimensions[0]
assert db.ui.device_box.width() == 600
def test_positioner_box_open_dialog_selection(qtbot, positioner_box):
+11 -17
View File
@@ -1,8 +1,6 @@
from unittest import mock
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import VariableMessage
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox2D
@@ -14,13 +12,17 @@ from .conftest import create_widget
def positioner_box_2d(qtbot, mocked_client):
"""Fixture for PositionerBox widget"""
with mock.patch(
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.PositionerBoxBase._check_device_is_valid",
return_value=True,
):
db = create_widget(
qtbot, PositionerBox2D, device_hor="samx", device_ver="samy", client=mocked_client
)
yield db
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.uuid.uuid4"
) as mock_uuid:
mock_uuid.return_value = "fake_uuid"
with mock.patch(
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.PositionerBoxBase._check_device_is_valid",
return_value=True,
):
db = create_widget(
qtbot, PositionerBox2D, device_hor="samx", device_ver="samy", client=mocked_client
)
yield db
def test_positioner_box_2d(positioner_box_2d):
@@ -80,14 +82,6 @@ def test_positioner_box_setpoint_changes(positioner_box_2d: PositionerBox2D):
mock_move.assert_called_once_with(100, relative=False)
def test_positioner_box_2d_on_stop(positioner_box_2d: PositionerBox2D):
"""Stop button sends both positioners to the immediate stop endpoint."""
with mock.patch.object(positioner_box_2d.client.connector, "send") as mock_send:
positioner_box_2d.on_stop()
msg = VariableMessage(value=["samx", "samy"])
mock_send.assert_called_once_with(MessageEndpoints.stop_devices(), msg)
def _hor_buttons(widget: PositionerBox2D):
return [
widget.ui.tweak_increase_hor,
-32
View File
@@ -3,12 +3,10 @@ from unittest.mock import MagicMock
import pytest
from bec_lib.device import DeviceBaseWithConfig, Signal
from bec_widgets.cli.rpc import rpc_base as rpc_base_module
from bec_widgets.cli.rpc.rpc_base import (
DeletedWidgetError,
RPCBase,
RPCReference,
RPCResponseTimeoutError,
_transform_args_kwargs,
)
@@ -53,33 +51,3 @@ def test_transform_args_kwargs():
)
assert args == ("full name", "short name", "string_arg", "full name")
assert kwargs == {"a": "full name", "b": "short name", "c": "string_arg", "d": "full name"}
def test_run_rpc_logs_response_timeout(monkeypatch):
rpc = RPCBase(gui_id="progress_widget", object_name="progressbar")
rpc._rpc_timeout = 0
rpc._client = MagicMock()
info_mock = MagicMock()
error_mock = MagicMock()
monkeypatch.setattr(rpc_base_module.logger, "info", info_mock)
monkeypatch.setattr(rpc_base_module.logger, "error", error_mock)
with pytest.raises(RPCResponseTimeoutError):
rpc._run_rpc("set_value", 42, precision=2, timeout=0)
publish_msg = rpc._client.connector.set_and_publish.call_args.args[1]
assert publish_msg.metadata["method"] == "set_value"
assert publish_msg.metadata["target_gui_id"] == "progress_widget"
assert publish_msg.metadata["object_name"] == "progressbar"
assert publish_msg.metadata["timeout"] == 0
assert publish_msg.metadata["deadline"] == publish_msg.metadata["sent_at"]
assert info_mock.call_count == 1
info_message = info_mock.call_args.args[0]
error_mock.assert_called_once()
error_message = error_mock.call_args.args[0]
assert "GUI RPC response timeout" in error_message
assert "method=set_value" in error_message
assert "target_gui_id=progress_widget" in error_message
assert "object_name=progressbar" in error_message
assert "timeout=0" in error_message
+12 -138
View File
@@ -1,13 +1,11 @@
import argparse
from unittest.mock import MagicMock, patch
from unittest.mock import patch
import pytest
from bec_lib.service_config import ServiceConfig
from qtpy.QtWidgets import QWidget
from bec_widgets.applications import companion_app as companion_app_module
from bec_widgets.applications.companion_app import GUIServer
from bec_widgets.utils import rpc_server as rpc_server_module
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.rpc_server import RegistryNotReadyError, RPCServer, SingleshotRPCRepeat
@@ -60,68 +58,6 @@ def test_gui_server_get_service_config(gui_server):
assert gui_server._get_service_config().config == ServiceConfig().config
def test_gui_server_signal_shutdown_closes_widgets_and_quits_app(gui_server):
widget = MagicMock()
gui_server.app = MagicMock()
gui_server.app.topLevelWidgets.return_value = [widget]
gui_server.request_shutdown()
widget.close.assert_called_once()
gui_server.app.quit.assert_called_once()
def test_gui_server_shutdown_is_idempotent(gui_server):
gui_server.launcher_window = MagicMock()
gui_server.dispatcher = MagicMock()
with (
patch.object(companion_app_module.shiboken6, "isValid", return_value=True),
patch.object(companion_app_module.pylsp_server, "is_running", return_value=False),
):
gui_server.shutdown()
gui_server.shutdown()
gui_server.launcher_window.close.assert_called_once()
gui_server.launcher_window.deleteLater.assert_called_once()
gui_server.dispatcher.stop_cli_server.assert_called_once()
gui_server.dispatcher.disconnect_all.assert_called_once()
def test_rpc_server_system_capabilities_include_shutdown(rpc_server):
assert rpc_server.run_system_rpc("system.list_capabilities", [], {}) == {
"system.launch_dock_area": True,
"system.shutdown": True,
}
def test_rpc_server_system_shutdown_requests_gui_server_shutdown(rpc_server, qapp):
gui_server = MagicMock()
qapp.gui_server = gui_server
rpc_server.run_system_rpc("system.shutdown", [], {})
qapp.processEvents()
gui_server.request_shutdown.assert_called_once()
del qapp.gui_server
def test_on_rpc_update_system_shutdown_sends_response_before_return(rpc_server):
order = []
rpc_server.run_system_rpc = MagicMock(side_effect=lambda *_args: order.append("shutdown"))
rpc_server.send_response = MagicMock(side_effect=lambda *_args: order.append("response"))
rpc_server.serialize_result_and_send = MagicMock()
rpc_server.on_rpc_update(
{"action": "system.shutdown", "parameter": {"args": [], "kwargs": {}}},
{"request_id": "shutdown-request", "sent_at": 1.0, "deadline": 10.0, "timeout": 2},
)
assert order == ["shutdown", "response"]
rpc_server.send_response.assert_called_once_with("shutdown-request", True, {"result": None})
rpc_server.serialize_result_and_send.assert_not_called()
def test_singleshot_rpc_repeat_raises_on_repeated_singleshot(rpc_server):
"""
Test that a singleshot RPC method raises an error when called multiple times.
@@ -155,34 +91,22 @@ def test_serialize_result_and_send_with_singleshot_retry(rpc_server, qtbot, dumm
# Third call succeeds
return {"gui_id": dummy.gui_id, "success": True}
warning_mock = MagicMock()
# Patch serialize_object to control when it raises RegistryNotReadyError
with patch.object(rpc_server, "serialize_object", side_effect=serialize_side_effect):
with patch.object(rpc_server, "send_response") as mock_send_response:
with patch.object(rpc_server_module.logger, "warning", warning_mock):
# Start the serialization process
rpc_server._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat()
rpc_server.serialize_result_and_send(request_id, dummy)
# Start the serialization process
rpc_server._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat()
rpc_server.serialize_result_and_send(request_id, dummy)
# Verify that serialize_object was called 3 times
qtbot.waitUntil(lambda: call_count >= 3, timeout=5000)
# Verify that serialize_object was called 3 times
qtbot.waitUntil(lambda: call_count >= 3, timeout=5000)
# Verify that send_response was called with success
mock_send_response.assert_called_once()
args = mock_send_response.call_args[0]
assert args[0] == request_id
assert args[1] is True # accepted=True
assert "result" in args[2]
assert warning_mock.call_count == 2
warning_logs = "\n".join(call.args[0] for call in warning_mock.call_args_list)
assert "result serialization delayed; retrying" in warning_logs
assert "request_id=test_request_123" in warning_logs
assert "retry_delay_ms=100" in warning_logs
assert "accumulated_delay_ms=100" in warning_logs
assert "accumulated_delay_ms=200" in warning_logs
assert "max_delay_ms=2000" in warning_logs
# Verify that send_response was called with success
mock_send_response.assert_called_once()
args = mock_send_response.call_args[0]
assert args[0] == request_id
assert args[1] is True # accepted=True
assert "result" in args[2]
def test_serialize_result_and_send_max_delay_exceeded(rpc_server, qtbot, dummy_widget):
@@ -216,56 +140,6 @@ def test_serialize_result_and_send_max_delay_exceeded(rpc_server, qtbot, dummy_w
assert "Max delay exceeded" in args[2]["error"]
def test_send_response_logs_publish_status(rpc_server, monkeypatch):
info_mock = MagicMock()
error_mock = MagicMock()
monkeypatch.setattr(rpc_server_module.logger, "info", info_mock)
monkeypatch.setattr(rpc_server_module.logger, "error", error_mock)
with patch.object(rpc_server.client.connector, "set_and_publish") as publish_mock:
rpc_server.send_response("request-ok", True, {"result": None})
rpc_server.send_response("request-failed", False, {"error": "bad"})
assert publish_mock.call_count == 2
assert "request_id=request-ok" in info_mock.call_args.args[0]
assert "accepted=True" in info_mock.call_args.args[0]
assert "request_id=request-failed" in error_mock.call_args.args[0]
assert "accepted=False" in error_mock.call_args.args[0]
def test_on_rpc_update_logs_late_client_deadline(rpc_server, monkeypatch):
info_mock = MagicMock()
warning_mock = MagicMock()
monkeypatch.setattr(rpc_server_module.logger, "info", info_mock)
monkeypatch.setattr(rpc_server_module.logger, "warning", warning_mock)
rpc_server.rpc_register.get_rpc_by_id = MagicMock()
rpc_server.run_rpc = MagicMock(return_value=None)
rpc_server.serialize_result_and_send = MagicMock()
rpc_server.on_rpc_update(
{
"action": "set_value",
"parameter": {"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"},
},
{"request_id": "late-request", "timeout": 0.1, "sent_at": 1.0, "deadline": 1.1},
)
received_log = info_mock.call_args_list[0].args[0]
executed_log = info_mock.call_args_list[1].args[0]
warning_logs = "\n".join(call.args[0] for call in warning_mock.call_args_list)
assert "GUI RPC server received request" in received_log
assert "request_id=late-request" in received_log
assert "method=set_value" in received_log
assert "target_gui_id=ring" in received_log
assert "timeout=0.1" in received_log
assert "stale_on_receive=True" in received_log
assert "response_after_client_deadline=True" in executed_log
assert "received request after client timeout deadline" in warning_logs
assert "response is late for client timeout" in warning_logs
def test_run_rpc_delegates_to_rpc_content_class(rpc_server):
class Content:
USER_ACCESS = ["foo", "mode", "mode.setter"]