mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-06-11 07:38:54 +02:00
Compare commits
42 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| cf514387da | |||
| 20d73c7976 | |||
| b2e0b79210 | |||
| 1427c70cfb | |||
| 154ae6026a | |||
| 9f94ca7748 | |||
| 3796984182 | |||
| 8a180eaa7b | |||
| 4572760b56 | |||
| e42a9824cc | |||
| 2fb7fb2ff4 | |||
| c8275fcfd5 | |||
| 07515d24be | |||
| 859563abb3 | |||
| bd66afb98d | |||
| 8e1e282fac | |||
| 878745b99a | |||
| e41e60956b | |||
| ed68eb5ac6 | |||
| b119c5ad76 | |||
| 9a58dba414 | |||
| c9fc0a82b9 | |||
| 668b1bd9cd | |||
| 1a6c8bf30f | |||
| c346bd0f18 | |||
| 5f86e41a03 | |||
| f7a48b5f6a | |||
| b4beb274da | |||
| 80694d151f | |||
| f03a5d9e85 | |||
| 5e8f0e8083 | |||
| 9eb05416ab | |||
| ab6a1aecc1 | |||
| d99db7d042 | |||
| a976837cff | |||
| 56427a7f0c | |||
| c4d4b78846 | |||
| 2dc0227d38 | |||
| 2d8e1eed4d | |||
| 3b579e740f | |||
| b8740c9594 | |||
| d5bf10e216 |
+142
@@ -1,6 +1,148 @@
|
||||
# CHANGELOG
|
||||
|
||||
|
||||
## v3.13.5 (2026-06-02)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- Change prints into proper logs
|
||||
([`c8275fc`](https://github.com/bec-project/bec_widgets/commit/c8275fcfd5c920393df3aa201c32a632ac8086a5))
|
||||
|
||||
- **abort_button**: From __future__ import annotations
|
||||
([`3796984`](https://github.com/bec-project/bec_widgets/commit/37969841822c8c38c23a1d8fca8e38aec684957b))
|
||||
|
||||
- **client_utils**: Increase default rpc timeout to 60s
|
||||
([`07515d2`](https://github.com/bec-project/bec_widgets/commit/07515d24be6e930b1b40170fc710255914cb7454))
|
||||
|
||||
- **client_utils**: Stop output reader thread on shutdown
|
||||
([`4572760`](https://github.com/bec-project/bec_widgets/commit/4572760b56ca2ab6435db3a6a4ba0d270e9008d1))
|
||||
|
||||
- **companion_app**: Disable logging of bec_lib.scan_items on widget side
|
||||
([`bd66afb`](https://github.com/bec-project/bec_widgets/commit/bd66afb98dcb76ca87b0db1334df3c1af0a9dbad))
|
||||
|
||||
- **forms**: Gridlayout applied to widget which already has layout
|
||||
([`1427c70`](https://github.com/bec-project/bec_widgets/commit/1427c70cfb6f84bbced7f72ec5cfa55ac0b9b742))
|
||||
|
||||
- **launch_window**: Exclude launcher check for non-parented widgets for BECMainWindow
|
||||
([`ed68eb5`](https://github.com/bec-project/bec_widgets/commit/ed68eb5ac6b20cfc7ca2c0b91864dc54fb579499))
|
||||
|
||||
- **launcher**: Avoid orphan widgets detection and logging
|
||||
([`9f94ca7`](https://github.com/bec-project/bec_widgets/commit/9f94ca7748d73a30622ecbaef384f4bc73a3d2fb))
|
||||
|
||||
- **logging**: Removed args/kwargs from logging messages
|
||||
([`2fb7fb2`](https://github.com/bec-project/bec_widgets/commit/2fb7fb2ff487863c3bc931498496da74b25e52d8))
|
||||
|
||||
- **rpc**: Additional logs
|
||||
([`e41e609`](https://github.com/bec-project/bec_widgets/commit/e41e60956b54890b70b3390b981196c9477abd93))
|
||||
|
||||
- **rpc**: Client/server rpc handshake for shutdown
|
||||
([`8a180ea`](https://github.com/bec-project/bec_widgets/commit/8a180eaa7be5c1603d893cf3b50585f88f9b0c83))
|
||||
|
||||
- **rpc**: Log dispatcher receipt before qt callback
|
||||
([`878745b`](https://github.com/bec-project/bec_widgets/commit/878745b99ac1e22c0fbddecc294e599469a2adfe))
|
||||
|
||||
- **rpc**: More robust shutdown section with PID logging
|
||||
([`e42a982`](https://github.com/bec-project/bec_widgets/commit/e42a9824ccd54b71a3141aaf2aa4e02af6a13782))
|
||||
|
||||
- **rpc_server**: Log warning if rpc call is repeated
|
||||
([`859563a`](https://github.com/bec-project/bec_widgets/commit/859563abb3e94ff55886e72db3177522900a89b8))
|
||||
|
||||
### Refactoring
|
||||
|
||||
- **client_utils**: Simplify PID fetching
|
||||
([`154ae60`](https://github.com/bec-project/bec_widgets/commit/154ae6026a6471b7c1db42f7c2ff3dc7be4b4afb))
|
||||
|
||||
- **rpc**: Share logging helpers
|
||||
([`8e1e282`](https://github.com/bec-project/bec_widgets/commit/8e1e282fac22ab6f726049758306c7ca17af70eb))
|
||||
|
||||
|
||||
## v3.13.4 (2026-05-29)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- **positioner_box**: Fix STOP button
|
||||
([`9a58dba`](https://github.com/bec-project/bec_widgets/commit/9a58dba414d9eec32fd7de7fc64c97c38f020b84))
|
||||
|
||||
|
||||
## v3.13.3 (2026-05-22)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- **tests**: Rename description attribute to _description in FakeDevice
|
||||
([`668b1bd`](https://github.com/bec-project/bec_widgets/commit/668b1bd9cd158fc12cff2c340d7317f30a212121))
|
||||
|
||||
|
||||
## v3.13.2 (2026-05-22)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- **tests**: Rename description attribute to _description in FakePositioner
|
||||
([`c346bd0`](https://github.com/bec-project/bec_widgets/commit/c346bd0f18ce873ff5ca6c59150c9581c9edca8d))
|
||||
|
||||
|
||||
## v3.13.1 (2026-05-21)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- Use .show instead of .start
|
||||
([`b4beb27`](https://github.com/bec-project/bec_widgets/commit/b4beb274da745da618f9b37ec241cd0109c088f1))
|
||||
|
||||
- **gui**: Replace window.show() with window.raise_window() and add hide() method
|
||||
([`f7a48b5`](https://github.com/bec-project/bec_widgets/commit/f7a48b5f6a51d391dca26ca42d03bad4f278ff22))
|
||||
|
||||
|
||||
## v3.13.0 (2026-05-21)
|
||||
|
||||
### Features
|
||||
|
||||
- **rpc-base**: Set default RPC timeout and allow customization
|
||||
([`f03a5d9`](https://github.com/bec-project/bec_widgets/commit/f03a5d9e853bd62b8ec1bad1c1e112fe01befe70))
|
||||
|
||||
|
||||
## v3.12.2 (2026-05-21)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- **toggle**: Disable styling implemented
|
||||
([`9eb0541`](https://github.com/bec-project/bec_widgets/commit/9eb05416ab68dcb88732dca8974c665030d34e0b))
|
||||
|
||||
|
||||
## v3.12.1 (2026-05-21)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- **device_input**: Correct cleanup unsubscribe
|
||||
([`56427a7`](https://github.com/bec-project/bec_widgets/commit/56427a7f0c3a89fe847d415c8b45212e663434c4))
|
||||
|
||||
- **device_input**: Ensure callback is removed after cleanup
|
||||
([`d99db7d`](https://github.com/bec-project/bec_widgets/commit/d99db7d04208945b86a39d65022b211ba093caed))
|
||||
|
||||
- **signal_combobox**: Signature matched for update_signals_from_filters
|
||||
([`a976837`](https://github.com/bec-project/bec_widgets/commit/a976837cff612349f2a3f17900903c203bc3d250))
|
||||
|
||||
|
||||
## v3.12.0 (2026-05-20)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
- **scan-control**: Filter out private scans from allowed scans
|
||||
([`2dc0227`](https://github.com/bec-project/bec_widgets/commit/2dc0227d38f0e217e252a5e5751bafd60363a5a4))
|
||||
|
||||
- **scan-control**: Hide hidden scan arguments
|
||||
([`2d8e1ee`](https://github.com/bec-project/bec_widgets/commit/2d8e1eed4d6503c42a38c8de910ddaa54132405d))
|
||||
|
||||
- **scan-control**: Reject unsupported scan input types
|
||||
([`3b579e7`](https://github.com/bec-project/bec_widgets/commit/3b579e740f36c60c3635681a9b2c35b518498f58))
|
||||
|
||||
- **scan-control**: Skip duplicate visible scan kwargs
|
||||
([`b8740c9`](https://github.com/bec-project/bec_widgets/commit/b8740c95941d36102f07a51d74a50e6f262a6646))
|
||||
|
||||
### Features
|
||||
|
||||
- Add support for new scan signatures including units
|
||||
([`d5bf10e`](https://github.com/bec-project/bec_widgets/commit/d5bf10e21682ae8270078c7858a036bafbabf10e))
|
||||
|
||||
|
||||
## v3.11.1 (2026-05-18)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
@@ -5,6 +5,7 @@ import json
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import traceback
|
||||
from contextlib import redirect_stderr, redirect_stdout
|
||||
|
||||
import darkdetect
|
||||
@@ -63,6 +64,7 @@ class GUIServer:
|
||||
self.app: QApplication | None = None
|
||||
self.launcher_window: LaunchWindow | None = None
|
||||
self.dispatcher: BECDispatcher | None = None
|
||||
self._shutdown_started = False
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
@@ -74,6 +76,7 @@ class GUIServer:
|
||||
bec_logger._stderr_log_level = bec_logger.LOGLEVEL.ERROR
|
||||
bec_logger._update_sinks()
|
||||
|
||||
bec_logger.disabled_modules = ["bec_lib.scan_items"]
|
||||
with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)): # type: ignore
|
||||
with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)): # type: ignore
|
||||
self._run()
|
||||
@@ -122,17 +125,8 @@ class GUIServer:
|
||||
self.app.aboutToQuit.connect(self.shutdown)
|
||||
self.app.setQuitOnLastWindowClosed(True)
|
||||
|
||||
def sigint_handler(*args):
|
||||
# display message, for people to let it terminate gracefully
|
||||
print("Caught SIGINT, exiting")
|
||||
# Widgets should be all closed.
|
||||
with RPCRegister.delayed_broadcast():
|
||||
for widget in QApplication.instance().topLevelWidgets(): # type: ignore
|
||||
widget.close()
|
||||
self.shutdown()
|
||||
|
||||
signal.signal(signal.SIGINT, sigint_handler)
|
||||
signal.signal(signal.SIGTERM, sigint_handler)
|
||||
signal.signal(signal.SIGINT, self.request_shutdown)
|
||||
signal.signal(signal.SIGTERM, self.request_shutdown)
|
||||
|
||||
sys.exit(self.app.exec())
|
||||
|
||||
@@ -149,16 +143,67 @@ class GUIServer:
|
||||
)
|
||||
self.app.setWindowIcon(icon)
|
||||
|
||||
def request_shutdown(self, signum=None, _frame=None):
|
||||
"""
|
||||
Request Qt application shutdown from an RPC call or OS signal.
|
||||
|
||||
Cleanup itself is handled by ``shutdown()``, which is connected to
|
||||
``QApplication.aboutToQuit``. Calling it directly here would run BEC/RPC
|
||||
teardown before Qt has processed the widget close events.
|
||||
"""
|
||||
signal_name = signal.Signals(signum).name if signum is not None else "shutdown"
|
||||
pid = os.getpid()
|
||||
if self.app is None:
|
||||
logger.info(f"Caught {signal_name}, shutting down GUI server pid={pid} without app")
|
||||
self.shutdown()
|
||||
return
|
||||
|
||||
widgets = [
|
||||
f"{widget.__class__.__name__}(objectName={widget.objectName()!r})"
|
||||
for widget in self.app.topLevelWidgets()
|
||||
]
|
||||
logger.info(
|
||||
f"Caught {signal_name}, requesting GUI server shutdown pid={pid} "
|
||||
f"top_level_widgets={widgets}"
|
||||
)
|
||||
with RPCRegister.delayed_broadcast():
|
||||
for widget in self.app.topLevelWidgets():
|
||||
widget.close()
|
||||
self.app.quit()
|
||||
|
||||
@staticmethod
|
||||
def _run_shutdown_step(step: str, callback):
|
||||
try:
|
||||
callback()
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
f"GUIServer shutdown step failed pid={os.getpid()} step={step}: {exc}\n"
|
||||
f"{traceback.format_exc()}"
|
||||
)
|
||||
|
||||
def shutdown(self):
|
||||
logger.info("Shutdown GUIServer", repr(self))
|
||||
if self.launcher_window and shiboken6.isValid(self.launcher_window):
|
||||
self.launcher_window.close()
|
||||
self.launcher_window.deleteLater()
|
||||
if pylsp_server.is_running():
|
||||
pylsp_server.stop()
|
||||
if self.dispatcher:
|
||||
self.dispatcher.stop_cli_server()
|
||||
self.dispatcher.disconnect_all()
|
||||
if self._shutdown_started:
|
||||
return
|
||||
self._shutdown_started = True
|
||||
logger.info(f"Shutdown GUIServer pid={os.getpid()} {repr(self)}")
|
||||
|
||||
def close_launcher_window():
|
||||
if self.launcher_window and shiboken6.isValid(self.launcher_window):
|
||||
self.launcher_window.close()
|
||||
self.launcher_window.deleteLater()
|
||||
|
||||
def stop_pylsp_server():
|
||||
if pylsp_server.is_running():
|
||||
pylsp_server.stop()
|
||||
|
||||
def stop_dispatcher():
|
||||
if self.dispatcher:
|
||||
self.dispatcher.stop_cli_server()
|
||||
self.dispatcher.disconnect_all()
|
||||
|
||||
self._run_shutdown_step("close_launcher_window", close_launcher_window)
|
||||
self._run_shutdown_step("stop_pylsp_server", stop_pylsp_server)
|
||||
self._run_shutdown_step("stop_dispatcher", stop_dispatcher)
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
@@ -207,6 +207,7 @@ class LaunchWindow(BECMainWindow):
|
||||
|
||||
self.app = QApplication.instance()
|
||||
self.tiles: dict[str, LaunchTile] = {}
|
||||
self._logged_unparented_connections: set[str] = set()
|
||||
# Track the smallest main‑label font size chosen so far
|
||||
self._min_main_label_pt: int | None = None
|
||||
|
||||
@@ -655,53 +656,83 @@ class LaunchWindow(BECMainWindow):
|
||||
super().showEvent(event)
|
||||
self.setFixedSize(self.size())
|
||||
|
||||
def _launcher_is_last_widget(self, connections: dict) -> bool:
|
||||
def _has_external_window(self, connections: dict) -> bool:
|
||||
"""
|
||||
Check if the launcher is the last widget in the application.
|
||||
Check if any registered non-launcher connection owns a top-level Qt window.
|
||||
"""
|
||||
|
||||
# get all parents of connections
|
||||
for connection in connections.values():
|
||||
try:
|
||||
parent = connection.parent()
|
||||
if parent is None and connection.objectName() != self.objectName():
|
||||
logger.info(
|
||||
f"Found non-launcher connection without parent: {connection.objectName()}"
|
||||
)
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting parent of connection: {e}")
|
||||
return False
|
||||
return True
|
||||
if self._connection_belongs_to_launcher(connection):
|
||||
continue
|
||||
if isinstance(connection, QWidget) and connection.isWindow():
|
||||
return True
|
||||
return False
|
||||
|
||||
def _log_unparented_connections(self, connections: dict) -> None:
|
||||
"""
|
||||
Log non-launcher RPC connections that remain without an active top-level window.
|
||||
"""
|
||||
for connection in connections.values():
|
||||
if self._connection_belongs_to_launcher(connection):
|
||||
continue
|
||||
if isinstance(connection, QWidget) and connection.isWindow():
|
||||
continue
|
||||
|
||||
connection_description = (
|
||||
f"type={type(connection).__name__} objectName={connection.objectName()!r} "
|
||||
f"gui_id={connection.gui_id!r}"
|
||||
)
|
||||
if connection_description in self._logged_unparented_connections:
|
||||
continue
|
||||
self._logged_unparented_connections.add(connection_description)
|
||||
logger.warning(
|
||||
"Registered non-launcher RPC connection has no active top-level window: "
|
||||
f"{connection_description}"
|
||||
)
|
||||
|
||||
def _connection_belongs_to_launcher(self, connection: QObject) -> bool:
|
||||
"""
|
||||
Check whether a registered connection is the launcher itself or part of its Qt hierarchy.
|
||||
"""
|
||||
if connection is self or connection.gui_id == self.gui_id:
|
||||
return True
|
||||
|
||||
parent = connection.parent()
|
||||
while parent is not None:
|
||||
if parent is self:
|
||||
return True
|
||||
parent = parent.parent()
|
||||
|
||||
return False
|
||||
|
||||
def _turn_off_the_lights(self, connections: dict):
|
||||
"""
|
||||
If there is only one connection remaining, it is the launcher, so we show it.
|
||||
Once the launcher is closed as the last window, we quit the application.
|
||||
"""
|
||||
if self._launcher_is_last_widget(connections):
|
||||
self.show()
|
||||
self.activateWindow()
|
||||
self.raise_()
|
||||
if self._has_external_window(connections):
|
||||
self.hide()
|
||||
if self.app:
|
||||
self.app.setQuitOnLastWindowClosed(True) # type: ignore
|
||||
self.app.setQuitOnLastWindowClosed(False) # type: ignore
|
||||
return
|
||||
|
||||
self.hide()
|
||||
self._log_unparented_connections(connections)
|
||||
self.show()
|
||||
self.activateWindow()
|
||||
self.raise_()
|
||||
if self.app:
|
||||
self.app.setQuitOnLastWindowClosed(False) # type: ignore
|
||||
self.app.setQuitOnLastWindowClosed(True) # type: ignore
|
||||
|
||||
def closeEvent(self, event):
|
||||
"""
|
||||
Close the launcher window.
|
||||
"""
|
||||
connections = self.register.list_all_connections()
|
||||
if self._launcher_is_last_widget(connections):
|
||||
event.accept()
|
||||
if self._has_external_window(connections):
|
||||
event.ignore()
|
||||
self.hide()
|
||||
return
|
||||
|
||||
event.ignore()
|
||||
self.hide()
|
||||
event.accept()
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
|
||||
+159
-11
@@ -5,6 +5,7 @@ from __future__ import annotations
|
||||
import json
|
||||
import os
|
||||
import select
|
||||
import signal
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
@@ -33,6 +34,12 @@ else:
|
||||
logger = bec_logger.logger
|
||||
|
||||
IGNORE_WIDGETS = ["LaunchWindow"]
|
||||
PROCESS_TERMINATION_TIMEOUT = 10
|
||||
PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2
|
||||
PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2
|
||||
GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT = 3
|
||||
GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5
|
||||
OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event"
|
||||
|
||||
RegistryState: TypeAlias = dict[
|
||||
Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"],
|
||||
@@ -53,14 +60,16 @@ def _filter_output(output: str) -> str:
|
||||
return output
|
||||
|
||||
|
||||
def _get_output(process, logger) -> None:
|
||||
def _get_output(process, logger, stop_event: threading.Event | None = None) -> None:
|
||||
log_func = {process.stdout: logger.debug, process.stderr: logger.info}
|
||||
stream_buffer = {process.stdout: [], process.stderr: []}
|
||||
try:
|
||||
os.set_blocking(process.stdout.fileno(), False)
|
||||
os.set_blocking(process.stderr.fileno(), False)
|
||||
while process.poll() is None:
|
||||
readylist, _, _ = select.select([process.stdout, process.stderr], [], [], 1)
|
||||
while process.poll() is None and not (stop_event and stop_event.is_set()):
|
||||
readylist, _, _ = select.select(
|
||||
[process.stdout, process.stderr], [], [], PROCESS_OUTPUT_SELECT_TIMEOUT
|
||||
)
|
||||
for stream in (process.stdout, process.stderr):
|
||||
buf = stream_buffer[stream]
|
||||
if stream in readylist:
|
||||
@@ -75,6 +84,95 @@ def _get_output(process, logger) -> None:
|
||||
logger.error(f"Error reading process output: {str(e)}")
|
||||
|
||||
|
||||
def _process_group_snapshot(process) -> str:
|
||||
try:
|
||||
pgid = os.getpgid(process.pid)
|
||||
except ProcessLookupError:
|
||||
return "Process group snapshot unavailable: process already exited"
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["ps", "-o", "pid,ppid,pgid,stat,command", "-g", str(pgid)],
|
||||
check=False,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=2,
|
||||
)
|
||||
except Exception as exc:
|
||||
return f"Process group snapshot unavailable: {exc}"
|
||||
output = result.stdout.strip()
|
||||
if not output:
|
||||
return f"Process group snapshot empty for pgid={pgid}"
|
||||
return output
|
||||
|
||||
|
||||
def _terminate_plot_process(process, logger, timeout: float = PROCESS_TERMINATION_TIMEOUT) -> None:
|
||||
if process.poll() is not None:
|
||||
return
|
||||
|
||||
process_info = f"pid={process.pid} command={process.args}"
|
||||
try:
|
||||
pgid = os.getpgid(process.pid)
|
||||
process_info = f"pid={process.pid} pgid={pgid} command={process.args}"
|
||||
logger.info(f"Terminating GUI process group {process_info}")
|
||||
os.killpg(pgid, signal.SIGTERM)
|
||||
except ProcessLookupError:
|
||||
process.wait(timeout=timeout)
|
||||
return
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to terminate GUI process group; terminating process only.")
|
||||
logger.info(f"GUI process termination failure details: {exc}. pid={process.pid}")
|
||||
process.terminate()
|
||||
|
||||
try:
|
||||
process.wait(timeout=timeout)
|
||||
return
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning(f"GUI process did not stop within {timeout}s; killing it.")
|
||||
logger.info(
|
||||
f"GUI process force-kill details: {process_info}\n"
|
||||
f"{_process_group_snapshot(process)}"
|
||||
)
|
||||
|
||||
try:
|
||||
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
|
||||
except ProcessLookupError as e:
|
||||
logger.error(f"Failed to kill GUI process group: {e}")
|
||||
process.wait(timeout=timeout)
|
||||
return
|
||||
process.wait(timeout=timeout)
|
||||
|
||||
|
||||
def _wait_for_process_exit(process, timeout: float) -> bool:
|
||||
try:
|
||||
process.wait(timeout=timeout)
|
||||
except subprocess.TimeoutExpired:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _join_process_output_thread(process, thread: threading.Thread | None, logger) -> None:
|
||||
if thread is None:
|
||||
return
|
||||
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
|
||||
if not thread.is_alive():
|
||||
return
|
||||
|
||||
if stop_event := getattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, None):
|
||||
stop_event.set()
|
||||
|
||||
for stream in (process.stdout, process.stderr):
|
||||
if stream is None:
|
||||
continue
|
||||
try:
|
||||
stream.close()
|
||||
except OSError as e:
|
||||
logger.error(f"Failed to close stream {str(e)}")
|
||||
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
|
||||
if thread.is_alive():
|
||||
logger.warning("GUI process output reader thread did not stop after process shutdown.")
|
||||
logger.info(f"GUI process output reader thread details: pid={process.pid}")
|
||||
|
||||
|
||||
def _start_plot_process(
|
||||
gui_id: str,
|
||||
gui_class_id: str,
|
||||
@@ -126,8 +224,14 @@ def _start_plot_process(
|
||||
if logger is None:
|
||||
process_output_processing_thread = None
|
||||
else:
|
||||
process_output_stop_event = threading.Event()
|
||||
process_output_processing_thread = threading.Thread(
|
||||
target=_get_output, args=(process, logger)
|
||||
target=_get_output, args=(process, logger, process_output_stop_event)
|
||||
)
|
||||
setattr(
|
||||
process_output_processing_thread,
|
||||
OUTPUT_READER_STOP_EVENT_ATTR,
|
||||
process_output_stop_event,
|
||||
)
|
||||
process_output_processing_thread.start()
|
||||
return process, process_output_processing_thread
|
||||
@@ -222,6 +326,7 @@ class BECGuiClient(RPCBase):
|
||||
self._ipython_registry: dict[str, RPCReference] = {}
|
||||
self.available_widgets = AvailableWidgetsNamespace()
|
||||
register_serializer_extension()
|
||||
self._rpc_timeout = 60
|
||||
|
||||
####################
|
||||
#### Client API ####
|
||||
@@ -232,6 +337,16 @@ class BECGuiClient(RPCBase):
|
||||
"""The launcher object."""
|
||||
return RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self, object_name="launcher")
|
||||
|
||||
def set_rpc_timeout(self, timeout: float):
|
||||
"""Set the timeout for RPC calls to the GUI server.
|
||||
|
||||
Args:
|
||||
timeout(float): The timeout in seconds.
|
||||
"""
|
||||
if not isinstance(timeout, (int, float)) or timeout < 0:
|
||||
raise ValueError("Timeout must be a non-negative number.")
|
||||
self._rpc_timeout = timeout
|
||||
|
||||
def _safe_register_stream(self, endpoint: EndpointInfo, cb: Callable, **kwargs):
|
||||
"""Check if already registered for registration in idempotent functions."""
|
||||
if not self._client.connector.any_stream_is_registered(endpoint, cb=cb):
|
||||
@@ -358,7 +473,7 @@ class BECGuiClient(RPCBase):
|
||||
)
|
||||
|
||||
if not self._check_if_server_is_alive():
|
||||
self.start(wait=True)
|
||||
self.show(wait=True)
|
||||
if wait:
|
||||
with wait_for_server(self):
|
||||
return self._new_impl(
|
||||
@@ -454,11 +569,13 @@ class BECGuiClient(RPCBase):
|
||||
|
||||
if self._process:
|
||||
logger.success("Stopping GUI...")
|
||||
self._process.terminate()
|
||||
if self._process_output_processing_thread:
|
||||
self._process_output_processing_thread.join()
|
||||
self._process.wait()
|
||||
if not self._request_server_shutdown():
|
||||
_terminate_plot_process(self._process, logger)
|
||||
_join_process_output_thread(
|
||||
self._process, self._process_output_processing_thread, logger
|
||||
)
|
||||
self._process = None
|
||||
self._process_output_processing_thread = None
|
||||
|
||||
# Unregister the registry state
|
||||
self._client.connector.unregister(
|
||||
@@ -477,6 +594,37 @@ class BECGuiClient(RPCBase):
|
||||
#### Private methods ####
|
||||
#########################
|
||||
|
||||
def _request_server_shutdown(self) -> bool:
|
||||
if self._process is None or self._process.poll() is not None:
|
||||
return True
|
||||
process_details = f"pid={self._process.pid} command={self._process.args}"
|
||||
logger.info(f"Requesting graceful GUI shutdown {process_details}")
|
||||
try:
|
||||
self.launcher._run_rpc( # pylint: disable=protected-access
|
||||
"system.shutdown",
|
||||
wait_for_rpc_response=True,
|
||||
timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Could not confirm graceful GUI shutdown via RPC; "
|
||||
"falling back to process termination."
|
||||
)
|
||||
logger.info(f"Graceful GUI shutdown RPC failure details: {exc}. {process_details}")
|
||||
return False
|
||||
if _wait_for_process_exit(self._process, GRACEFUL_SERVER_SHUTDOWN_TIMEOUT):
|
||||
logger.info(f"GUI server exited after graceful shutdown {process_details}")
|
||||
return True
|
||||
logger.warning(
|
||||
"GUI server did not exit after graceful shutdown request; "
|
||||
"falling back to process termination."
|
||||
)
|
||||
logger.info(
|
||||
f"Graceful GUI shutdown timeout details: {process_details}\n"
|
||||
f"{_process_group_snapshot(self._process)}"
|
||||
)
|
||||
return False
|
||||
|
||||
def _check_if_server_is_alive(self):
|
||||
"""Checks if the process is alive"""
|
||||
if self._process is None:
|
||||
@@ -550,7 +698,7 @@ class BECGuiClient(RPCBase):
|
||||
if self.launcher and len(self._top_level) == 0:
|
||||
self.launcher._run_rpc("show") # pylint: disable=protected-access
|
||||
for window in self._top_level.values():
|
||||
window.show()
|
||||
window.raise_window()
|
||||
|
||||
def _show_all(self):
|
||||
with wait_for_server(self):
|
||||
@@ -569,7 +717,7 @@ class BECGuiClient(RPCBase):
|
||||
if self.launcher and len(self._top_level) == 0:
|
||||
self.launcher._run_rpc("raise") # pylint: disable=protected-access
|
||||
for window in self._top_level.values():
|
||||
window._run_rpc("raise") # type: ignore[attr-defined]
|
||||
window.raise_window()
|
||||
|
||||
def _raise_all(self):
|
||||
with wait_for_server(self):
|
||||
|
||||
@@ -2,6 +2,7 @@ from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from functools import wraps
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
@@ -9,6 +10,7 @@ from typing import TYPE_CHECKING, Any, cast
|
||||
from bec_lib.client import BECClient
|
||||
from bec_lib.device import DeviceBaseWithConfig
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
@@ -24,6 +26,9 @@ else:
|
||||
|
||||
# pylint: disable=protected-access
|
||||
|
||||
_DEFAULT_RPC_TIMEOUT = object()
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
def _name_arg(arg):
|
||||
if isinstance(arg, DeviceBaseWithConfig):
|
||||
@@ -154,6 +159,7 @@ class RPCReference:
|
||||
|
||||
|
||||
class RPCBase:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
gui_id: str | None = None,
|
||||
@@ -207,12 +213,16 @@ class RPCBase:
|
||||
# Use explicit call to ensure action name is 'raise' (not 'raise_')
|
||||
return self._run_rpc("raise")
|
||||
|
||||
def hide(self):
|
||||
"""Hide this widget (or its container)."""
|
||||
return self._run_rpc("hide")
|
||||
|
||||
def _run_rpc(
|
||||
self,
|
||||
method,
|
||||
*args,
|
||||
wait_for_rpc_response=True,
|
||||
timeout=5,
|
||||
wait_for_rpc_response: bool = True,
|
||||
timeout: float | None | object = _DEFAULT_RPC_TIMEOUT,
|
||||
gui_id: str | None = None,
|
||||
**kwargs,
|
||||
) -> Any:
|
||||
@@ -223,13 +233,16 @@ class RPCBase:
|
||||
method: The method to call.
|
||||
args: The arguments to pass to the method.
|
||||
wait_for_rpc_response: Whether to wait for the RPC response.
|
||||
timeout: The timeout for the RPC response.
|
||||
timeout: The timeout for the RPC response. If omitted, the client's default RPC
|
||||
timeout is used. If explicitly set to None, wait indefinitely.
|
||||
gui_id: The GUI ID to use for the RPC call. If None, the default GUI ID is used.
|
||||
kwargs: The keyword arguments to pass to the method.
|
||||
|
||||
Returns:
|
||||
The result of the RPC call.
|
||||
"""
|
||||
if timeout is _DEFAULT_RPC_TIMEOUT:
|
||||
timeout = self._root._rpc_timeout
|
||||
if method in ["show", "hide", "raise"] and gui_id is None:
|
||||
obj = self._root._server_registry.get(self._gui_id)
|
||||
if obj is None:
|
||||
@@ -251,12 +264,39 @@ class RPCBase:
|
||||
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
|
||||
)
|
||||
|
||||
target_gui_id = gui_id or self._gui_id
|
||||
sent_at = time.time()
|
||||
deadline = sent_at + timeout if timeout is not None else None
|
||||
rpc_msg.metadata.update(
|
||||
{
|
||||
"method": method,
|
||||
"receiver": receiver,
|
||||
"target_gui_id": target_gui_id,
|
||||
"object_name": self.object_name,
|
||||
"wait_for_response": wait_for_rpc_response,
|
||||
"timeout": timeout,
|
||||
"sent_at": sent_at,
|
||||
"deadline": deadline,
|
||||
}
|
||||
)
|
||||
logger.info(
|
||||
"Sending GUI RPC request "
|
||||
f"request_id={request_id} method={method} receiver={receiver} "
|
||||
f"target_gui_id={target_gui_id} object_name={self.object_name} "
|
||||
f"wait_for_response={wait_for_rpc_response} timeout={timeout}"
|
||||
)
|
||||
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
|
||||
|
||||
if wait_for_rpc_response:
|
||||
try:
|
||||
finished = self._msg_wait_event.wait(timeout)
|
||||
if not finished:
|
||||
logger.error(
|
||||
"GUI RPC response timeout "
|
||||
f"request_id={request_id} method={method} receiver={receiver} "
|
||||
f"target_gui_id={target_gui_id} object_name={self.object_name} "
|
||||
f"timeout={timeout}"
|
||||
)
|
||||
raise RPCResponseTimeoutError(request_id, timeout)
|
||||
finally:
|
||||
self._msg_wait_event.clear()
|
||||
@@ -268,6 +308,12 @@ class RPCBase:
|
||||
# the _on_rpc_response method
|
||||
assert isinstance(self._rpc_response, messages.RequestResponseMessage)
|
||||
|
||||
logger.info(
|
||||
"Received GUI RPC response "
|
||||
f"request_id={request_id} method={method} receiver={receiver} "
|
||||
f"target_gui_id={target_gui_id} object_name={self.object_name} "
|
||||
f"accepted={self._rpc_response.accepted}"
|
||||
)
|
||||
if not self._rpc_response.accepted:
|
||||
raise ValueError(self._rpc_response.message["error"])
|
||||
msg_result = self._rpc_response.message.get("result")
|
||||
@@ -276,6 +322,7 @@ class RPCBase:
|
||||
|
||||
def _on_rpc_response(self, msg_obj: MessageObject) -> None:
|
||||
msg = cast(messages.RequestResponseMessage, msg_obj.value)
|
||||
logger.debug(f"GUI RPC response callback received: {msg}")
|
||||
self._rpc_response = msg
|
||||
self._msg_wait_event.set()
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ class FakeDevice(BECDevice):
|
||||
super().__init__(name=name)
|
||||
self._enabled = enabled
|
||||
self.signals = {self.name: {"value": 1.0}}
|
||||
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
|
||||
self._description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
|
||||
self._readout_priority = readout_priority
|
||||
self._config = {
|
||||
"readoutPriority": "baseline",
|
||||
@@ -74,7 +74,7 @@ class FakeDevice(BECDevice):
|
||||
Returns:
|
||||
dict: Description of the device
|
||||
"""
|
||||
return self.description
|
||||
return self._description
|
||||
|
||||
|
||||
class FakePositioner(BECPositioner):
|
||||
@@ -96,7 +96,7 @@ class FakePositioner(BECPositioner):
|
||||
self._limits = limits
|
||||
self._readout_priority = readout_priority
|
||||
self.signals = {self.name: {"value": 1.0}}
|
||||
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
|
||||
self._description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
|
||||
self._config = {
|
||||
"readoutPriority": "baseline",
|
||||
"deviceClass": "ophyd_devices.SimPositioner",
|
||||
@@ -176,7 +176,7 @@ class FakePositioner(BECPositioner):
|
||||
Returns:
|
||||
dict: Description of the device
|
||||
"""
|
||||
return self.description
|
||||
return self._description
|
||||
|
||||
@property
|
||||
def precision(self):
|
||||
|
||||
@@ -3,8 +3,9 @@ from __future__ import annotations
|
||||
import collections
|
||||
import random
|
||||
import string
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from typing import TYPE_CHECKING, DefaultDict, Hashable, Union
|
||||
from typing import TYPE_CHECKING, Any, DefaultDict, Hashable, Union
|
||||
|
||||
import louie
|
||||
import redis
|
||||
@@ -15,6 +16,7 @@ from bec_lib.service_config import ServiceConfig
|
||||
from qtpy.QtCore import QObject
|
||||
from qtpy.QtCore import Signal as pyqtSignal
|
||||
|
||||
from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed
|
||||
from bec_widgets.utils.serialization import register_serializer_extension
|
||||
|
||||
logger = bec_logger.logger
|
||||
@@ -25,6 +27,39 @@ if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_widgets.utils.rpc_server import RPCServer
|
||||
|
||||
|
||||
def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None:
|
||||
if not isinstance(msg_content, dict) or not isinstance(metadata, dict):
|
||||
return
|
||||
request_id = metadata.get("request_id")
|
||||
method = msg_content.get("action")
|
||||
parameter = msg_content.get("parameter")
|
||||
if request_id is None or method is None or not isinstance(parameter, dict):
|
||||
return
|
||||
|
||||
dispatch_received_at = time.time()
|
||||
sent_at = metadata.get("sent_at")
|
||||
deadline = metadata.get("deadline")
|
||||
timeout = metadata.get("timeout")
|
||||
dispatch_latency = elapsed_seconds(sent_at, dispatch_received_at)
|
||||
stale_on_dispatch = deadline is not None and dispatch_received_at > deadline
|
||||
target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id")
|
||||
|
||||
logger.info(
|
||||
"GUI RPC dispatcher received request before Qt callback emit "
|
||||
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
|
||||
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
|
||||
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} "
|
||||
f"stale_on_dispatch={stale_on_dispatch}"
|
||||
)
|
||||
if stale_on_dispatch:
|
||||
logger.warning(
|
||||
"GUI RPC dispatcher received request after client timeout deadline "
|
||||
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
|
||||
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
|
||||
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)}"
|
||||
)
|
||||
|
||||
|
||||
class QtThreadSafeCallback(QObject):
|
||||
"""QtThreadSafeCallback is a wrapper around a callback function to make it thread-safe for Qt."""
|
||||
|
||||
@@ -88,10 +123,12 @@ class QtRedisConnector(RedisConnector):
|
||||
|
||||
# we can notice kwargs are lost when passed to Qt slot
|
||||
metadata = msg.metadata
|
||||
_log_rpc_dispatcher_receive(msg.content, metadata)
|
||||
cb(msg.content, metadata)
|
||||
else:
|
||||
# from stream
|
||||
msg = msg["data"]
|
||||
_log_rpc_dispatcher_receive(msg.content, msg.metadata)
|
||||
cb(msg.content, msg.metadata)
|
||||
|
||||
|
||||
|
||||
@@ -331,32 +331,34 @@ class BECWidget(BECConnector):
|
||||
# All widgets need to call super().cleanup() in their cleanup method
|
||||
logger.info(f"Registry cleanup for widget {self.__class__.__name__}")
|
||||
self.rpc_register.remove_rpc(self)
|
||||
children = self.findChildren(BECWidget)
|
||||
for child in children:
|
||||
if not shiboken6.isValid(child):
|
||||
# If the child is not valid, it means it has already been deleted
|
||||
continue
|
||||
child.close()
|
||||
child.deleteLater()
|
||||
children = self.findChildren(BECWidget)
|
||||
for child in children:
|
||||
if not shiboken6.isValid(child):
|
||||
# If the child is not valid, it means it has already been deleted
|
||||
continue
|
||||
child.close()
|
||||
child.deleteLater()
|
||||
|
||||
# Tear down busy overlay explicitly to stop spinner and remove filters
|
||||
overlay = getattr(self, "_busy_overlay", None)
|
||||
if overlay is not None and shiboken6.isValid(overlay):
|
||||
try:
|
||||
overlay.hide()
|
||||
filt = getattr(overlay, "_filter", None)
|
||||
if filt is not None and shiboken6.isValid(filt):
|
||||
try:
|
||||
self.removeEventFilter(filt)
|
||||
except Exception as exc:
|
||||
logger.warning(f"Failed to remove event filter from busy overlay: {exc}")
|
||||
# Tear down busy overlay explicitly to stop spinner and remove filters
|
||||
overlay = getattr(self, "_busy_overlay", None)
|
||||
if overlay is not None and shiboken6.isValid(overlay):
|
||||
try:
|
||||
overlay.hide()
|
||||
filt = getattr(overlay, "_filter", None)
|
||||
if filt is not None and shiboken6.isValid(filt):
|
||||
try:
|
||||
self.removeEventFilter(filt)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
f"Failed to remove event filter from busy overlay: {exc}"
|
||||
)
|
||||
|
||||
# Cleanup the overlay widget. This will call cleanup on the custom widget if present.
|
||||
# Cleanup the overlay widget. This will call cleanup on the custom widget if present.
|
||||
|
||||
overlay.cleanup()
|
||||
overlay.deleteLater()
|
||||
except Exception as exc:
|
||||
logger.warning(f"Failed to delete busy overlay: {exc}")
|
||||
overlay.cleanup()
|
||||
overlay.deleteLater()
|
||||
except Exception as exc:
|
||||
logger.warning(f"Failed to delete busy overlay: {exc}")
|
||||
|
||||
def closeEvent(self, event):
|
||||
"""Wrap the close even to ensure the rpc_register is cleaned up."""
|
||||
|
||||
@@ -150,7 +150,7 @@ class TypedForm(BECWidget, QWidget):
|
||||
self.adjustSize()
|
||||
|
||||
def _new_grid_layout(self):
|
||||
new_grid = QGridLayout(self)
|
||||
new_grid = QGridLayout()
|
||||
new_grid.setContentsMargins(0, 0, 0, 0)
|
||||
return new_grid
|
||||
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
def elapsed_seconds(start: float | int | None, stop: float) -> float | None:
|
||||
if start is None:
|
||||
return None
|
||||
try:
|
||||
return max(0.0, stop - float(start))
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def format_elapsed(elapsed: float | None) -> str:
|
||||
if elapsed is None:
|
||||
return "unknown"
|
||||
return f"{elapsed:.3f}"
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import functools
|
||||
import time
|
||||
import traceback
|
||||
import types
|
||||
from contextlib import contextmanager
|
||||
@@ -11,13 +12,14 @@ from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.utils.import_utils import lazy_import
|
||||
from qtpy.QtCore import Qt, QTimer
|
||||
from qtpy.QtWidgets import QWidget
|
||||
from qtpy.QtWidgets import QApplication, QWidget
|
||||
from redis.exceptions import RedisError
|
||||
|
||||
from bec_widgets.utils.bec_connector import BECConnector
|
||||
from bec_widgets.utils.bec_dispatcher import BECDispatcher
|
||||
from bec_widgets.utils.container_utils import WidgetContainerUtils
|
||||
from bec_widgets.utils.error_popups import ErrorPopupUtility
|
||||
from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed
|
||||
from bec_widgets.utils.rpc_register import RPCRegister
|
||||
from bec_widgets.utils.screen_utils import apply_window_geometry
|
||||
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
|
||||
@@ -115,27 +117,107 @@ class RPCServer:
|
||||
if request_id is None:
|
||||
logger.error("Received RPC instruction without request_id")
|
||||
return
|
||||
method = msg.get("action")
|
||||
parameter = msg.get("parameter", {})
|
||||
args = parameter.get("args", [])
|
||||
kwargs = parameter.get("kwargs", {})
|
||||
target_gui_id = parameter.get("gui_id")
|
||||
sent_at = metadata.get("sent_at")
|
||||
deadline = metadata.get("deadline")
|
||||
timeout = metadata.get("timeout")
|
||||
received_at = time.time()
|
||||
receive_latency = elapsed_seconds(sent_at, received_at)
|
||||
stale_on_receive = deadline is not None and received_at > deadline
|
||||
logger.info(
|
||||
"GUI RPC server received request "
|
||||
f"request_id={request_id} method={method} gui_id={self.gui_id} "
|
||||
f"target_gui_id={target_gui_id} timeout={timeout} "
|
||||
f"receive_latency_s={format_elapsed(receive_latency)} "
|
||||
f"stale_on_receive={stale_on_receive}"
|
||||
)
|
||||
if stale_on_receive:
|
||||
logger.warning(
|
||||
"GUI RPC server received request after client timeout deadline "
|
||||
f"request_id={request_id} method={method} gui_id={self.gui_id} "
|
||||
f"target_gui_id={target_gui_id} timeout={timeout} "
|
||||
f"receive_latency_s={format_elapsed(receive_latency)}"
|
||||
)
|
||||
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
|
||||
|
||||
# Shutdown must acknowledge before teardown starts. The generic RPC path
|
||||
# below publishes successful responses through QTimer.singleShot(0);
|
||||
# for system.shutdown that would race with the queued app quit and
|
||||
# dispatcher shutdown scheduled by _shutdown_gui_server().
|
||||
if method == "system.shutdown":
|
||||
execution_start = time.perf_counter()
|
||||
try:
|
||||
self.run_system_rpc(method, args, kwargs)
|
||||
except Exception:
|
||||
execution_duration = time.perf_counter() - execution_start
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
"GUI RPC server shutdown request failed "
|
||||
f"request_id={request_id} method={method} gui_id={self.gui_id} "
|
||||
f"execution_duration_s={execution_duration:.3f}\n{content}"
|
||||
)
|
||||
self.send_response(request_id, False, {"error": content})
|
||||
else:
|
||||
execution_duration = time.perf_counter() - execution_start
|
||||
logger.info(
|
||||
"GUI RPC server acknowledged shutdown request "
|
||||
f"request_id={request_id} method={method} gui_id={self.gui_id} "
|
||||
f"execution_duration_s={execution_duration:.3f}"
|
||||
)
|
||||
self.send_response(request_id, True, {"result": None})
|
||||
return
|
||||
|
||||
execution_start = time.perf_counter()
|
||||
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
|
||||
try:
|
||||
method = msg["action"]
|
||||
args = msg["parameter"].get("args", [])
|
||||
kwargs = msg["parameter"].get("kwargs", {})
|
||||
if method.startswith("system."):
|
||||
res = self.run_system_rpc(method, args, kwargs)
|
||||
else:
|
||||
obj = self.get_object_from_config(msg["parameter"])
|
||||
obj = self.get_object_from_config(parameter)
|
||||
res = self.run_rpc(obj, method, args, kwargs)
|
||||
except Exception:
|
||||
execution_duration = time.perf_counter() - execution_start
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error while executing RPC instruction: {content}")
|
||||
logger.error(
|
||||
"GUI RPC server execution failed "
|
||||
f"request_id={request_id} method={method} gui_id={self.gui_id} "
|
||||
f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f}\n"
|
||||
f"{content}"
|
||||
)
|
||||
self.send_response(request_id, False, {"error": content})
|
||||
else:
|
||||
execution_duration = time.perf_counter() - execution_start
|
||||
response_stale = deadline is not None and time.time() > deadline
|
||||
logger.info(
|
||||
"GUI RPC server executed request "
|
||||
f"request_id={request_id} method={method} gui_id={self.gui_id} "
|
||||
f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f} "
|
||||
f"response_after_client_deadline={response_stale}"
|
||||
)
|
||||
if response_stale:
|
||||
logger.warning(
|
||||
"GUI RPC server response is late for client timeout "
|
||||
f"request_id={request_id} method={method} gui_id={self.gui_id} "
|
||||
f"target_gui_id={target_gui_id} timeout={timeout} "
|
||||
f"execution_duration_s={execution_duration:.3f}"
|
||||
)
|
||||
logger.debug(f"RPC instruction executed successfully: {res}")
|
||||
self._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat()
|
||||
QTimer.singleShot(0, lambda: self.serialize_result_and_send(request_id, res))
|
||||
|
||||
def send_response(self, request_id: str, accepted: bool, msg: dict):
|
||||
log_message = (
|
||||
"GUI RPC server publishing response "
|
||||
f"request_id={request_id} gui_id={self.gui_id} accepted={accepted}"
|
||||
)
|
||||
if accepted:
|
||||
logger.info(log_message)
|
||||
else:
|
||||
logger.error(log_message)
|
||||
self.client.connector.set_and_publish(
|
||||
MessageEndpoints.gui_instruction_response(request_id),
|
||||
messages.RequestResponseMessage(accepted=accepted, message=msg),
|
||||
@@ -236,10 +318,23 @@ class RPCServer:
|
||||
def run_system_rpc(self, method: str, args: list, kwargs: dict):
|
||||
if method == "system.launch_dock_area":
|
||||
return self._launch_dock_area(*args, **kwargs)
|
||||
if method == "system.shutdown":
|
||||
return self._shutdown_gui_server()
|
||||
if method == "system.list_capabilities":
|
||||
return {"system.launch_dock_area": True}
|
||||
return {"system.launch_dock_area": True, "system.shutdown": True}
|
||||
raise ValueError(f"Unknown system RPC method: {method}")
|
||||
|
||||
@staticmethod
|
||||
def _shutdown_gui_server() -> None:
|
||||
app = QApplication.instance()
|
||||
if app is None:
|
||||
return
|
||||
gui_server = getattr(app, "gui_server", None)
|
||||
if gui_server is not None and hasattr(gui_server, "request_shutdown"):
|
||||
QTimer.singleShot(0, gui_server.request_shutdown)
|
||||
return
|
||||
QTimer.singleShot(0, app.quit)
|
||||
|
||||
@staticmethod
|
||||
def _launch_dock_area(
|
||||
name: str | None = None,
|
||||
@@ -297,7 +392,14 @@ class RPCServer:
|
||||
res = self.serialize_object(res)
|
||||
except RegistryNotReadyError:
|
||||
try:
|
||||
self._rpc_singleshot_repeats[request_id] += retry_delay
|
||||
repeat = self._rpc_singleshot_repeats[request_id]
|
||||
repeat += retry_delay
|
||||
logger.warning(
|
||||
"GUI RPC result serialization delayed; retrying "
|
||||
f"request_id={request_id} retry_delay_ms={retry_delay} "
|
||||
f"accumulated_delay_ms={repeat.accumulated_delay} "
|
||||
f"max_delay_ms={repeat.max_delay}"
|
||||
)
|
||||
QTimer.singleShot(
|
||||
retry_delay, lambda: self.serialize_result_and_send(request_id, res)
|
||||
)
|
||||
@@ -407,8 +509,9 @@ class RPCServer:
|
||||
container_proxy = parent.gui_id
|
||||
else:
|
||||
container_proxy = None
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
container_proxy = None
|
||||
logger.error(f"Error while serializing RPC result: {e}")
|
||||
|
||||
if wait and not self.rpc_register.object_is_registered(connector):
|
||||
raise RegistryNotReadyError(f"Connector {connector} not registered yet")
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_qthemes import material_icon
|
||||
from qtpy.QtCore import Qt
|
||||
from qtpy.QtWidgets import QHBoxLayout, QPushButton, QToolButton, QWidget
|
||||
@@ -5,6 +8,8 @@ from qtpy.QtWidgets import QHBoxLayout, QPushButton, QToolButton, QWidget
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class AbortButton(BECWidget, QWidget):
|
||||
"""A button that abort the scan."""
|
||||
@@ -55,7 +60,7 @@ class AbortButton(BECWidget, QWidget):
|
||||
scan_id(str|None): The scan id to abort. If None, the current scan will be aborted.
|
||||
"""
|
||||
if self.scan_id is not None:
|
||||
print(f"Aborting scan with scan_id: {self.scan_id}")
|
||||
logger.info(f"Aborting scan with scan_id: {self.scan_id}")
|
||||
self.queue.request_scan_abortion(scan_id=self.scan_id)
|
||||
else:
|
||||
self.queue.request_scan_abortion()
|
||||
|
||||
+1
-1
@@ -429,7 +429,7 @@ class PositionerBox2D(PositionerBoxBase):
|
||||
|
||||
@SafeSlot()
|
||||
def on_stop(self):
|
||||
self._stop_device(f"{self.device_hor} or {self.device_ver}")
|
||||
self._stop_device([self.device_hor, self.device_ver])
|
||||
|
||||
@SafeProperty(float)
|
||||
def step_size_hor(self):
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
import uuid
|
||||
from abc import abstractmethod
|
||||
from typing import Callable, TypedDict
|
||||
from typing import Callable, Sequence, TypedDict
|
||||
|
||||
from bec_lib.device import Positioner
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.messages import ScanQueueMessage
|
||||
from bec_lib.messages import VariableMessage
|
||||
from qtpy.QtWidgets import (
|
||||
QDialog,
|
||||
QDoubleSpinBox,
|
||||
@@ -116,17 +115,16 @@ class PositionerBoxBase(BECWidget, QWidget):
|
||||
else:
|
||||
ui["units"].setVisible(False)
|
||||
|
||||
def _stop_device(self, device: str):
|
||||
def _stop_device(self, device: str | Sequence[str]):
|
||||
"""Stop call"""
|
||||
request_id = str(uuid.uuid4())
|
||||
params = {"device": device, "rpc_id": request_id, "func": "stop", "args": [], "kwargs": {}}
|
||||
msg = ScanQueueMessage(
|
||||
scan_type="device_rpc",
|
||||
parameter=params,
|
||||
queue="emergency",
|
||||
metadata={"RID": request_id, "response": False},
|
||||
)
|
||||
self.client.connector.send(MessageEndpoints.scan_queue_request(self.client.username), msg)
|
||||
devices = [device] if isinstance(device, str) else list(device)
|
||||
devices = [dev for dev in devices if dev]
|
||||
if not devices:
|
||||
logger.warning("Stop requested without a valid device.")
|
||||
return
|
||||
|
||||
msg = VariableMessage(value=devices)
|
||||
self.client.connector.send(MessageEndpoints.stop_devices(), msg)
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def _on_device_readback(
|
||||
|
||||
@@ -489,13 +489,17 @@ class DeviceComboBox(BECWidget, QComboBox):
|
||||
action: Device update action emitted by BEC.
|
||||
content: Device update payload. Currently unused.
|
||||
"""
|
||||
if self._callback_id is None or getattr(self, "_destroyed", False):
|
||||
return
|
||||
if action in ["add", "remove", "reload"]:
|
||||
self.device_config_update.emit()
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup the widget."""
|
||||
if self._callback_id is not None:
|
||||
self.bec_dispatcher.client.callbacks.remove(self._callback_id)
|
||||
callback_id = self._callback_id
|
||||
self._callback_id = None
|
||||
self.bec_dispatcher.client.callbacks.remove(callback_id)
|
||||
super().cleanup()
|
||||
|
||||
def get_current_device(self) -> object:
|
||||
|
||||
@@ -197,16 +197,21 @@ class SignalComboBox(BECWidget, QComboBox):
|
||||
self.update_signals_from_filters()
|
||||
|
||||
@SafeSlot()
|
||||
@SafeSlot(dict, dict)
|
||||
def update_signals_from_filters(
|
||||
self, content: dict | None = None, metadata: dict | None = None
|
||||
):
|
||||
@SafeSlot(str, dict)
|
||||
def update_signals_from_filters(self, action: str | None = None, content: dict | None = None):
|
||||
"""Refresh available signals from the current device and filters.
|
||||
|
||||
Args:
|
||||
action: Optional BEC device update action. If provided, only device list changing
|
||||
actions trigger a refresh.
|
||||
content: Optional callback payload from BEC device updates. Currently unused.
|
||||
metadata: Optional callback metadata from BEC device updates. Currently unused.
|
||||
"""
|
||||
if self._device_update_register is None or getattr(self, "_destroyed", False):
|
||||
return
|
||||
|
||||
if action is not None and action not in ["add", "remove", "reload"]:
|
||||
return
|
||||
|
||||
self.config.signal_filter = [kind.name for kind in self.signal_filter]
|
||||
|
||||
if self._signal_class_filter:
|
||||
@@ -588,7 +593,10 @@ class SignalComboBox(BECWidget, QComboBox):
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup the widget."""
|
||||
self.bec_dispatcher.client.callbacks.remove(self._device_update_register)
|
||||
if self._device_update_register is not None:
|
||||
callback_id = self._device_update_register
|
||||
self._device_update_register = None
|
||||
self.bec_dispatcher.client.callbacks.remove(callback_id)
|
||||
super().cleanup()
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -14,7 +14,6 @@ from qtpy.QtWidgets import (
|
||||
QLabel,
|
||||
QPushButton,
|
||||
QSizePolicy,
|
||||
QSpacerItem,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
@@ -25,6 +24,7 @@ from bec_widgets.utils.colors import apply_theme, get_accent_colors
|
||||
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
|
||||
from bec_widgets.widgets.control.buttons.stop_button.stop_button import StopButton
|
||||
from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox
|
||||
from bec_widgets.widgets.control.scan_control.scan_info_adapter import ScanInfoAdapter
|
||||
from bec_widgets.widgets.editors.scan_metadata.scan_metadata import ScanMetadata
|
||||
|
||||
|
||||
@@ -95,6 +95,7 @@ class ScanControl(BECWidget, QWidget):
|
||||
self._hide_scan_control_buttons = False
|
||||
self._hide_metadata = False
|
||||
self._hide_scan_selection_combobox = False
|
||||
self._scan_info_adapter = ScanInfoAdapter()
|
||||
|
||||
# Create and set main layout
|
||||
self._init_UI()
|
||||
@@ -184,12 +185,17 @@ class ScanControl(BECWidget, QWidget):
|
||||
MessageEndpoints.available_scans()
|
||||
).resource
|
||||
if self.config.allowed_scans is None:
|
||||
supported_scans = ["ScanBase", "SyncFlyScanBase", "AsyncFlyScanBase"]
|
||||
allowed_scans = [
|
||||
scan_name
|
||||
for scan_name, scan_info in self.available_scans.items()
|
||||
if scan_info["base_class"] in supported_scans and len(scan_info["gui_config"]) > 0
|
||||
]
|
||||
supported_scans = ["ScanBase", "SyncFlyScanBase", "AsyncFlyScanBase", "ScanBaseV4"]
|
||||
|
||||
def _is_scan_supported(scan_name):
|
||||
scan_info = self.available_scans[scan_name]
|
||||
return (
|
||||
scan_info.get("base_class") in supported_scans
|
||||
and self._scan_info_adapter.has_scan_ui_config(scan_info)
|
||||
and not scan_name.startswith("_")
|
||||
)
|
||||
|
||||
allowed_scans = filter(_is_scan_supported, self.available_scans.keys())
|
||||
|
||||
else:
|
||||
allowed_scans = self.config.allowed_scans
|
||||
@@ -376,14 +382,14 @@ class ScanControl(BECWidget, QWidget):
|
||||
self.reset_layout()
|
||||
selected_scan_info = self.available_scans.get(scan_name, {})
|
||||
|
||||
gui_config = selected_scan_info.get("gui_config", {})
|
||||
self.arg_group = gui_config.get("arg_group", None)
|
||||
self.kwarg_groups = gui_config.get("kwarg_groups", None)
|
||||
gui_config = self._scan_info_adapter.build_scan_ui_config(selected_scan_info)
|
||||
arg_group = gui_config.get("arg_group", None)
|
||||
kwarg_groups = gui_config.get("kwarg_groups", [])
|
||||
|
||||
if bool(self.arg_group["arg_inputs"]):
|
||||
self.add_arg_group(self.arg_group)
|
||||
if len(self.kwarg_groups) > 0:
|
||||
self.add_kwargs_boxes(self.kwarg_groups)
|
||||
if arg_group and bool(arg_group.get("arg_inputs")):
|
||||
self.add_arg_group(arg_group)
|
||||
if kwarg_groups:
|
||||
self.add_kwargs_boxes(kwarg_groups)
|
||||
|
||||
self.update()
|
||||
self.adjustSize()
|
||||
@@ -414,6 +420,7 @@ class ScanControl(BECWidget, QWidget):
|
||||
position = self.ARG_BOX_POSITION + (1 if self.arg_box is not None else 0)
|
||||
for group in groups:
|
||||
box = ScanGroupBox(box_type="kwargs", config=group)
|
||||
box.reference_units_changed.connect(self._apply_reference_units_to_other_boxes)
|
||||
box.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed)
|
||||
self.layout.insertWidget(position + len(self.kwarg_boxes), box)
|
||||
self.kwarg_boxes.append(box)
|
||||
@@ -427,11 +434,30 @@ class ScanControl(BECWidget, QWidget):
|
||||
"""
|
||||
self.arg_box = ScanGroupBox(box_type="args", config=group)
|
||||
self.arg_box.device_selected.connect(self.emit_device_selected)
|
||||
self.arg_box.reference_units_changed.connect(self._apply_reference_units_to_other_boxes)
|
||||
self.arg_box.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed)
|
||||
self.arg_box.hide_add_remove_buttons = self._hide_add_remove_buttons
|
||||
self.layout.insertWidget(self.ARG_BOX_POSITION, self.arg_box)
|
||||
self.arg_box.setVisible(not self._hide_arg_box)
|
||||
|
||||
def _scan_group_boxes(self) -> list[ScanGroupBox]:
|
||||
boxes = []
|
||||
if self.arg_box is not None:
|
||||
boxes.append(self.arg_box)
|
||||
boxes.extend(self.kwarg_boxes)
|
||||
return boxes
|
||||
|
||||
def _apply_reference_units_to_other_boxes(
|
||||
self, source_box: ScanGroupBox, reference_name: str, units: str | None
|
||||
) -> None:
|
||||
"""
|
||||
Propagate device-derived units to scan fields that reference a device in another group.
|
||||
"""
|
||||
for box in self._scan_group_boxes():
|
||||
if box is source_box:
|
||||
continue
|
||||
box.apply_reference_units(reference_name, units)
|
||||
|
||||
@SafeSlot(str)
|
||||
def emit_device_selected(self, dev_names):
|
||||
"""
|
||||
|
||||
@@ -162,6 +162,47 @@ class ScanCheckBox(QCheckBox):
|
||||
self.setChecked(default)
|
||||
|
||||
|
||||
class ScanOptionalWidget(QGroupBox):
|
||||
def __init__(self, widget, parent=None):
|
||||
super().__init__(parent=parent)
|
||||
self.inner_widget = widget
|
||||
self.arg_name = getattr(widget, "arg_name", None)
|
||||
self.setFlat(True)
|
||||
|
||||
layout = QHBoxLayout(self)
|
||||
layout.setContentsMargins(0, 0, 0, 0)
|
||||
layout.addWidget(widget)
|
||||
|
||||
self.none_checkbox = QCheckBox(self)
|
||||
self.none_checkbox.setToolTip("Set this value to None.")
|
||||
self.none_checkbox.toggled.connect(self._on_none_toggled)
|
||||
layout.addWidget(self.none_checkbox)
|
||||
|
||||
def _on_none_toggled(self, checked: bool) -> None:
|
||||
self.inner_widget.setEnabled(not checked)
|
||||
|
||||
def set_none(self, checked: bool) -> None:
|
||||
self.none_checkbox.setChecked(checked)
|
||||
|
||||
def is_none(self) -> bool:
|
||||
return self.none_checkbox.isChecked()
|
||||
|
||||
def setToolTip(self, text: str) -> None: # noqa: N802
|
||||
super().setToolTip(text)
|
||||
self.inner_widget.setToolTip(text)
|
||||
checkbox_tooltip = "Set this value to None."
|
||||
if text:
|
||||
checkbox_tooltip = f"{text}\n{checkbox_tooltip}"
|
||||
self.none_checkbox.setToolTip(checkbox_tooltip)
|
||||
|
||||
def toolTip(self) -> str: # noqa: N802
|
||||
return self.inner_widget.toolTip()
|
||||
|
||||
def setSuffix(self, suffix: str) -> None: # noqa: N802
|
||||
if hasattr(self.inner_widget, "setSuffix"):
|
||||
self.inner_widget.setSuffix(suffix)
|
||||
|
||||
|
||||
class ScanGroupBox(QGroupBox):
|
||||
WIDGET_HANDLER = {
|
||||
ScanArgType.DEVICE: DeviceComboBox,
|
||||
@@ -174,6 +215,7 @@ class ScanGroupBox(QGroupBox):
|
||||
}
|
||||
|
||||
device_selected = Signal(str)
|
||||
reference_units_changed = Signal(object, str, object)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -209,6 +251,9 @@ class ScanGroupBox(QGroupBox):
|
||||
|
||||
self.labels = []
|
||||
self.widgets = []
|
||||
self._widget_configs = {}
|
||||
self._wrapped_widgets = {}
|
||||
self._column_labels = {}
|
||||
self.selected_devices = {}
|
||||
|
||||
self.init_box(self.config)
|
||||
@@ -247,6 +292,7 @@ class ScanGroupBox(QGroupBox):
|
||||
label = QLabel(text=display_name)
|
||||
self.layout.addWidget(label, row, column_index)
|
||||
self.labels.append(label)
|
||||
self._column_labels[column_index] = label
|
||||
|
||||
def add_input_widgets(self, group_inputs: dict, row) -> None:
|
||||
"""
|
||||
@@ -281,20 +327,32 @@ class ScanGroupBox(QGroupBox):
|
||||
)
|
||||
else:
|
||||
widget = widget_class(parent=self.parent(), arg_name=arg_name, default=default)
|
||||
self._apply_numeric_precision(widget, item)
|
||||
self._apply_numeric_limits(widget, item)
|
||||
if isinstance(widget, DeviceComboBox):
|
||||
self.selected_devices[widget] = ""
|
||||
widget.device_selected.connect(self.emit_device_selected)
|
||||
widget.currentTextChanged.connect(
|
||||
lambda text, device_widget=widget: self._handle_device_text_changed(
|
||||
device_widget, text
|
||||
)
|
||||
)
|
||||
if isinstance(widget, ScanLiteralsComboBox):
|
||||
widget.set_literals(item["type"].get("Literal", []))
|
||||
tooltip = item.get("tooltip", None)
|
||||
if tooltip is not None:
|
||||
widget.setToolTip(item["tooltip"])
|
||||
self.layout.addWidget(widget, row, column_index)
|
||||
self.widgets.append(widget)
|
||||
display_widget = self._wrap_optional_widget(widget, item, default)
|
||||
self._widget_configs[display_widget] = item
|
||||
self._apply_unit_metadata(display_widget, item)
|
||||
self.layout.addWidget(display_widget, row, column_index)
|
||||
self.widgets.append(display_widget)
|
||||
|
||||
@Slot(str)
|
||||
def emit_device_selected(self, device_name):
|
||||
self.selected_devices[self.sender()] = device_name.strip()
|
||||
sender = self.sender()
|
||||
self.selected_devices[sender] = device_name.strip()
|
||||
if isinstance(sender, DeviceComboBox):
|
||||
units = self._device_units(sender.get_current_device())
|
||||
self._update_reference_units(sender, units)
|
||||
self._emit_reference_units_changed(sender, units)
|
||||
selected_devices_str = " ".join(self.selected_devices.values())
|
||||
self.device_selected.emit(selected_devices_str)
|
||||
|
||||
@@ -319,8 +377,11 @@ class ScanGroupBox(QGroupBox):
|
||||
return
|
||||
|
||||
for widget in self.widgets[-len(self.inputs) :]:
|
||||
if isinstance(widget, DeviceComboBox):
|
||||
self.selected_devices[widget] = ""
|
||||
inner_widget = self._inner_widget(widget)
|
||||
if isinstance(inner_widget, DeviceComboBox):
|
||||
self.selected_devices[inner_widget] = ""
|
||||
self._wrapped_widgets.pop(inner_widget, None)
|
||||
self._widget_configs.pop(widget, None)
|
||||
widget.close()
|
||||
widget.deleteLater()
|
||||
self.widgets = self.widgets[: -len(self.inputs)]
|
||||
@@ -331,8 +392,11 @@ class ScanGroupBox(QGroupBox):
|
||||
def remove_all_widget_bundles(self):
|
||||
"""Remove every widget bundle from the scan control layout."""
|
||||
for widget in list(self.widgets):
|
||||
if isinstance(widget, DeviceComboBox):
|
||||
self.selected_devices.pop(widget, None)
|
||||
inner_widget = self._inner_widget(widget)
|
||||
if isinstance(inner_widget, DeviceComboBox):
|
||||
self.selected_devices.pop(inner_widget, None)
|
||||
self._wrapped_widgets.pop(inner_widget, None)
|
||||
self._widget_configs.pop(widget, None)
|
||||
widget.close()
|
||||
widget.deleteLater()
|
||||
self.layout.removeWidget(widget)
|
||||
@@ -368,12 +432,7 @@ class ScanGroupBox(QGroupBox):
|
||||
for j in range(self.layout.columnCount()):
|
||||
try: # In case that the bundle size changes
|
||||
widget = self.layout.itemAtPosition(i, j).widget()
|
||||
if isinstance(widget, DeviceComboBox) and device_object:
|
||||
value = widget.get_current_device()
|
||||
elif isinstance(widget, DeviceComboBox):
|
||||
value = widget.currentText()
|
||||
else:
|
||||
value = WidgetIO.get_value(widget)
|
||||
value = self._widget_value(widget, device_object=device_object)
|
||||
args.append(value)
|
||||
except AttributeError:
|
||||
continue
|
||||
@@ -383,27 +442,23 @@ class ScanGroupBox(QGroupBox):
|
||||
kwargs = {}
|
||||
for i in range(self.layout.columnCount()):
|
||||
widget = self.layout.itemAtPosition(1, i).widget()
|
||||
if isinstance(widget, DeviceComboBox) and device_object:
|
||||
value = widget.get_current_device().name
|
||||
elif isinstance(widget, DeviceComboBox):
|
||||
value = widget.currentText()
|
||||
elif isinstance(widget, ScanLiteralsComboBox):
|
||||
value = widget.get_value()
|
||||
else:
|
||||
value = WidgetIO.get_value(widget)
|
||||
value = self._widget_value(widget, device_object=device_object)
|
||||
inner_widget = self._inner_widget(widget)
|
||||
if isinstance(inner_widget, DeviceComboBox) and value is not None and device_object:
|
||||
value = value.name
|
||||
kwargs[widget.arg_name] = value
|
||||
return kwargs
|
||||
|
||||
def count_arg_rows(self):
|
||||
widget_rows = 0
|
||||
for row in range(self.layout.rowCount()):
|
||||
if row == 0:
|
||||
continue
|
||||
for col in range(self.layout.columnCount()):
|
||||
item = self.layout.itemAtPosition(row, col)
|
||||
if item is not None:
|
||||
widget = item.widget()
|
||||
if widget is not None:
|
||||
if isinstance(widget, DeviceComboBox):
|
||||
widget_rows += 1
|
||||
if item is not None and item.widget() is not None:
|
||||
widget_rows += 1
|
||||
break
|
||||
return widget_rows
|
||||
|
||||
def set_parameters(self, parameters: list | dict):
|
||||
@@ -427,11 +482,208 @@ class ScanGroupBox(QGroupBox):
|
||||
self.add_input_widgets(self.inputs, row)
|
||||
|
||||
for i, value in enumerate(parameters):
|
||||
WidgetIO.set_value(self.widgets[i], value)
|
||||
self._set_widget_value(self.widgets[i], value)
|
||||
|
||||
def _set_kwarg_parameters(self, parameters: dict):
|
||||
for widget in self.widgets:
|
||||
for key, value in parameters.items():
|
||||
if widget.arg_name == key:
|
||||
WidgetIO.set_value(widget, value)
|
||||
self._set_widget_value(widget, value)
|
||||
break
|
||||
|
||||
@staticmethod
|
||||
def _unit_tooltip(item: dict, units: str | None = None) -> str | None:
|
||||
tooltip = item.get("tooltip", None)
|
||||
reference_units = item.get("reference_units", None)
|
||||
units = units or item.get("units", None)
|
||||
tooltip_parts = [tooltip] if tooltip else []
|
||||
if units:
|
||||
tooltip_parts.append(f"Units: {units}")
|
||||
elif reference_units:
|
||||
tooltip_parts.append(f"Units from: {reference_units}")
|
||||
if tooltip_parts:
|
||||
return "\n".join(tooltip_parts)
|
||||
return None
|
||||
|
||||
def _apply_unit_metadata(self, widget, item: dict, units: str | None = None) -> None:
|
||||
units = units or item.get("units", None)
|
||||
tooltip = self._unit_tooltip(item, units)
|
||||
existing_tooltip = widget.toolTip()
|
||||
|
||||
if existing_tooltip:
|
||||
# strip the existing unit info from the tooltip if it exists
|
||||
# to avoid tooltip bloat on multiple updates
|
||||
existing_tooltip = "\n".join(
|
||||
line
|
||||
for line in existing_tooltip.splitlines()
|
||||
if not (line.startswith("Units:") or line.startswith("Units from:"))
|
||||
).strip()
|
||||
if tooltip:
|
||||
if existing_tooltip:
|
||||
widget.setToolTip(f"{existing_tooltip}\n{tooltip}")
|
||||
else:
|
||||
widget.setToolTip(tooltip)
|
||||
if hasattr(widget, "setSuffix"):
|
||||
widget.setSuffix(f" {units}" if units else "")
|
||||
|
||||
def _refresh_column_label(self, column: int, item: dict) -> None:
|
||||
if column not in self._column_labels:
|
||||
return
|
||||
self._column_labels[column].setText(item.get("display_name", item.get("name", None)))
|
||||
|
||||
@staticmethod
|
||||
def _device_units(device) -> str | None:
|
||||
egu = getattr(device, "egu", None)
|
||||
if not callable(egu):
|
||||
return None
|
||||
try:
|
||||
return egu()
|
||||
except Exception:
|
||||
logger.exception("Failed to fetch engineering units from device %s", device)
|
||||
return None
|
||||
|
||||
def _widget_position(self, widget) -> tuple[int, int] | None:
|
||||
widget = self._display_widget(widget)
|
||||
for row in range(self.layout.rowCount()):
|
||||
for column in range(self.layout.columnCount()):
|
||||
item = self.layout.itemAtPosition(row, column)
|
||||
if item is not None and item.widget() is widget:
|
||||
return row, column
|
||||
return None
|
||||
|
||||
def _update_reference_units(self, device_widget: DeviceComboBox, units: str | None) -> None:
|
||||
position = self._widget_position(device_widget)
|
||||
if position is None:
|
||||
return
|
||||
source_row, _ = position
|
||||
source_name = device_widget.arg_name
|
||||
|
||||
for widget in self.widgets:
|
||||
item = self._widget_configs.get(widget, {})
|
||||
if item.get("reference_units") != source_name:
|
||||
continue
|
||||
widget_position = self._widget_position(widget)
|
||||
if widget_position is None:
|
||||
continue
|
||||
row, column = widget_position
|
||||
if self.box_type == "args" and row != source_row:
|
||||
continue
|
||||
self._apply_unit_metadata(widget, item, units)
|
||||
self._refresh_column_label(column, item)
|
||||
|
||||
def apply_reference_units(self, reference_name: str, units: str | None) -> None:
|
||||
"""
|
||||
Apply units to widgets that reference an argument owned by another group box.
|
||||
|
||||
Cross-box references only have one widget row, so row scoping is intentionally handled by
|
||||
the source group before this method is called.
|
||||
"""
|
||||
for widget in self.widgets:
|
||||
item = self._widget_configs.get(widget, {})
|
||||
if item.get("reference_units") != reference_name:
|
||||
continue
|
||||
self._apply_unit_metadata(widget, item, units)
|
||||
position = self._widget_position(widget)
|
||||
if position is not None:
|
||||
_, column = position
|
||||
self._refresh_column_label(column, item)
|
||||
|
||||
def _emit_reference_units_changed(
|
||||
self, device_widget: DeviceComboBox, units: str | None
|
||||
) -> None:
|
||||
reference_name = getattr(device_widget, "arg_name", None)
|
||||
if not reference_name:
|
||||
return
|
||||
self.reference_units_changed.emit(self, reference_name, units)
|
||||
|
||||
def _handle_device_text_changed(self, device_widget: DeviceComboBox, device_name: str) -> None:
|
||||
if not device_widget.validate_device(device_name):
|
||||
self.selected_devices[device_widget] = ""
|
||||
self._update_reference_units(device_widget, None)
|
||||
self._emit_reference_units_changed(device_widget, None)
|
||||
|
||||
@staticmethod
|
||||
def _apply_numeric_precision(widget: ScanDoubleSpinBox, item: dict) -> None:
|
||||
if not isinstance(widget, ScanDoubleSpinBox):
|
||||
return
|
||||
|
||||
precision = item.get("precision")
|
||||
if precision is None:
|
||||
return
|
||||
|
||||
try:
|
||||
widget.setDecimals(max(0, int(precision)))
|
||||
except (TypeError, ValueError):
|
||||
logger.warning(
|
||||
"Ignoring invalid precision %r for parameter %s", precision, item.get("name")
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _apply_numeric_limits(widget: ScanDoubleSpinBox | ScanSpinBox, item: dict) -> None:
|
||||
if isinstance(widget, ScanSpinBox):
|
||||
minimum = -2147483647 # largest int which qt allows
|
||||
maximum = 2147483647
|
||||
if item.get("ge") is not None:
|
||||
minimum = int(item["ge"])
|
||||
if item.get("gt") is not None:
|
||||
minimum = int(item["gt"]) + 1
|
||||
if item.get("le") is not None:
|
||||
maximum = int(item["le"])
|
||||
if item.get("lt") is not None:
|
||||
maximum = int(item["lt"]) - 1
|
||||
widget.setRange(minimum, maximum)
|
||||
return
|
||||
|
||||
if isinstance(widget, ScanDoubleSpinBox):
|
||||
minimum = -float("inf")
|
||||
maximum = float("inf")
|
||||
step = 10 ** (-widget.decimals())
|
||||
if item.get("ge") is not None:
|
||||
minimum = float(item["ge"])
|
||||
if item.get("gt") is not None:
|
||||
minimum = float(item["gt"]) + step
|
||||
if item.get("le") is not None:
|
||||
maximum = float(item["le"])
|
||||
if item.get("lt") is not None:
|
||||
maximum = float(item["lt"]) - step
|
||||
widget.setRange(minimum, maximum)
|
||||
|
||||
def _wrap_optional_widget(self, widget, item: dict, default):
|
||||
if not item.get("optional", False):
|
||||
return widget
|
||||
|
||||
wrapped_widget = ScanOptionalWidget(widget, parent=self)
|
||||
wrapped_widget.set_none(default is None)
|
||||
self._wrapped_widgets[widget] = wrapped_widget
|
||||
return wrapped_widget
|
||||
|
||||
@staticmethod
|
||||
def _inner_widget(widget):
|
||||
if isinstance(widget, ScanOptionalWidget):
|
||||
return widget.inner_widget
|
||||
return widget
|
||||
|
||||
def _display_widget(self, widget):
|
||||
return self._wrapped_widgets.get(widget, widget)
|
||||
|
||||
def _widget_value(self, widget, *, device_object: bool = True):
|
||||
if isinstance(widget, ScanOptionalWidget) and widget.is_none():
|
||||
return None
|
||||
|
||||
inner_widget = self._inner_widget(widget)
|
||||
if isinstance(inner_widget, DeviceComboBox) and device_object:
|
||||
return inner_widget.get_current_device()
|
||||
if isinstance(inner_widget, DeviceComboBox):
|
||||
return inner_widget.currentText()
|
||||
if isinstance(inner_widget, ScanLiteralsComboBox):
|
||||
return inner_widget.get_value()
|
||||
return WidgetIO.get_value(inner_widget)
|
||||
|
||||
def _set_widget_value(self, widget, value) -> None:
|
||||
if isinstance(widget, ScanOptionalWidget):
|
||||
widget.set_none(value is None)
|
||||
if value is None:
|
||||
return
|
||||
WidgetIO.set_value(widget.inner_widget, value)
|
||||
return
|
||||
WidgetIO.set_value(widget, value)
|
||||
|
||||
@@ -0,0 +1,298 @@
|
||||
"""Helpers for translating BEC scan metadata into ScanControl UI configuration."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
AnnotationValue = str | dict[str, Any] | list[Any] | None
|
||||
ScanArgumentMetadata = dict[str, Any]
|
||||
SignatureEntry = dict[str, Any]
|
||||
ScanInputConfig = dict[str, Any]
|
||||
ScanInfo = dict[str, Any]
|
||||
ScanUIConfig = dict[str, Any]
|
||||
|
||||
SUPPORTED_SCAN_INPUT_TYPES = {"device", "DeviceBase", "float", "int", "bool", "str"}
|
||||
|
||||
|
||||
class ScanInfoAdapter:
|
||||
"""Normalize available-scan payloads into the structure consumed by ``ScanControl``."""
|
||||
|
||||
@staticmethod
|
||||
def has_scan_ui_config(scan_info: ScanInfo) -> bool:
|
||||
"""Check whether a scan exposes enough metadata to build a UI.
|
||||
|
||||
Args:
|
||||
scan_info (ScanInfo): Available-scan payload for one scan.
|
||||
|
||||
Returns:
|
||||
bool: ``True`` when a supported GUI metadata field is present.
|
||||
"""
|
||||
if not (
|
||||
scan_info.get("gui_visibility")
|
||||
or scan_info.get("gui_config")
|
||||
or scan_info.get("gui_visualization")
|
||||
or scan_info.get("signature")
|
||||
):
|
||||
return False
|
||||
|
||||
gui_config = ScanInfoAdapter().build_scan_ui_config(scan_info)
|
||||
return not ScanInfoAdapter.unsupported_inputs(gui_config)
|
||||
|
||||
@staticmethod
|
||||
def is_supported_input_type(input_type: AnnotationValue) -> bool:
|
||||
"""Return whether ``ScanGroupBox`` has a widget for this serialized type."""
|
||||
return (
|
||||
isinstance(input_type, str)
|
||||
and input_type in SUPPORTED_SCAN_INPUT_TYPES
|
||||
or isinstance(input_type, dict)
|
||||
and "Literal" in input_type
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def unsupported_inputs(gui_config: ScanUIConfig) -> list[ScanInputConfig]:
|
||||
"""Return input configs that cannot be rendered by ``ScanGroupBox``."""
|
||||
inputs = []
|
||||
arg_group = gui_config.get("arg_group")
|
||||
if arg_group:
|
||||
inputs.extend(arg_group.get("inputs", []))
|
||||
for group in gui_config.get("kwarg_groups", []):
|
||||
inputs.extend(group.get("inputs", []))
|
||||
return [
|
||||
input_config
|
||||
for input_config in inputs
|
||||
if not ScanInfoAdapter.is_supported_input_type(input_config.get("type"))
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def format_display_name(name: str) -> str:
|
||||
"""Convert a parameter name into a user-facing label.
|
||||
|
||||
Args:
|
||||
name (str): Raw parameter name.
|
||||
|
||||
Returns:
|
||||
str: Formatted display label such as ``Exp Time``.
|
||||
"""
|
||||
parts = re.split(r"(_|\d+)", name)
|
||||
return " ".join(part.capitalize() for part in parts if part.isalnum()).strip()
|
||||
|
||||
@staticmethod
|
||||
def resolve_tooltip(scan_argument: ScanArgumentMetadata) -> str | None:
|
||||
"""Resolve the tooltip text from parsed ``ScanArgument`` metadata.
|
||||
|
||||
Args:
|
||||
scan_argument (ScanArgumentMetadata): Parsed ``ScanArgument`` metadata.
|
||||
|
||||
Returns:
|
||||
str | None: Explicit tooltip text if provided, otherwise the description fallback.
|
||||
"""
|
||||
return scan_argument.get("tooltip") or scan_argument.get("description")
|
||||
|
||||
@staticmethod
|
||||
def parse_annotation(
|
||||
annotation: AnnotationValue,
|
||||
) -> tuple[AnnotationValue, ScanArgumentMetadata, bool]:
|
||||
"""Extract the serialized base annotation and ``ScanArgument`` metadata.
|
||||
|
||||
Args:
|
||||
annotation (AnnotationValue): Serialized annotation payload from BEC.
|
||||
|
||||
Returns:
|
||||
tuple[AnnotationValue, ScanArgumentMetadata, bool]: The unwrapped annotation,
|
||||
parsed ``ScanArgument`` metadata, and whether ``None`` is an allowed value.
|
||||
"""
|
||||
scan_argument: ScanArgumentMetadata = {}
|
||||
if isinstance(annotation, dict) and "Annotated" in annotation:
|
||||
annotated = annotation["Annotated"]
|
||||
annotation = annotated.get("type", "_empty")
|
||||
scan_argument = annotated.get("metadata", {}).get("ScanArgument", {}) or {}
|
||||
|
||||
allows_none = False
|
||||
if isinstance(annotation, list):
|
||||
allows_none = "NoneType" in annotation
|
||||
annotation = next(
|
||||
(entry for entry in annotation if entry != "NoneType"),
|
||||
annotation[0] if annotation else "_empty",
|
||||
)
|
||||
elif annotation == "NoneType":
|
||||
allows_none = True
|
||||
annotation = "_empty"
|
||||
|
||||
return annotation, scan_argument, allows_none
|
||||
|
||||
@staticmethod
|
||||
def scan_arg_type_from_annotation(annotation: AnnotationValue) -> AnnotationValue:
|
||||
"""Normalize an annotation value to the widget type expected by ``ScanControl``.
|
||||
|
||||
Args:
|
||||
annotation (AnnotationValue): Serialized or parsed annotation value.
|
||||
|
||||
Returns:
|
||||
AnnotationValue: The normalized type identifier used by the widget layer.
|
||||
"""
|
||||
if isinstance(annotation, dict):
|
||||
return annotation
|
||||
if annotation in ("_empty", None):
|
||||
return "str"
|
||||
return annotation
|
||||
|
||||
def scan_input_from_signature(
|
||||
self, param: SignatureEntry, arg: bool = False
|
||||
) -> ScanInputConfig:
|
||||
"""Build one ScanControl input description from a signature entry.
|
||||
|
||||
Args:
|
||||
param (SignatureEntry): Serialized signature entry.
|
||||
arg (bool): Whether the parameter belongs to the positional arg bundle.
|
||||
|
||||
Returns:
|
||||
ScanInputConfig: Normalized input configuration for ``ScanControl``.
|
||||
"""
|
||||
annotation, scan_argument, allows_none = self.parse_annotation(param.get("annotation"))
|
||||
return self._build_scan_input(
|
||||
name=param["name"],
|
||||
annotation=annotation,
|
||||
scan_argument=scan_argument,
|
||||
arg=arg,
|
||||
default=None if arg else param.get("default", None),
|
||||
optional=allows_none,
|
||||
)
|
||||
|
||||
def scan_input_from_arg_input(
|
||||
self, name: str, item_type: AnnotationValue, signature_by_name: dict[str, SignatureEntry]
|
||||
) -> ScanInputConfig:
|
||||
"""Build one arg-bundle input description from ``arg_input`` metadata.
|
||||
|
||||
Args:
|
||||
name (str): Argument name from ``arg_input``.
|
||||
item_type (AnnotationValue): Serialized argument type from ``arg_input``.
|
||||
signature_by_name (dict[str, SignatureEntry]): Signature entries indexed by
|
||||
parameter name.
|
||||
|
||||
Returns:
|
||||
ScanInputConfig: Normalized input configuration for one arg-bundle field.
|
||||
"""
|
||||
if name in signature_by_name:
|
||||
scan_input = self.scan_input_from_signature(signature_by_name[name], arg=True)
|
||||
scan_input["type"] = self.scan_arg_type_from_annotation(
|
||||
self.parse_annotation(signature_by_name[name].get("annotation"))[0]
|
||||
)
|
||||
else:
|
||||
annotation, scan_argument, allows_none = self.parse_annotation(item_type)
|
||||
scan_input = self._build_scan_input(
|
||||
name=name,
|
||||
annotation=annotation,
|
||||
scan_argument=scan_argument,
|
||||
arg=True,
|
||||
default=None,
|
||||
optional=allows_none,
|
||||
)
|
||||
if scan_input["type"] in ("_empty", None):
|
||||
scan_input["type"] = item_type
|
||||
return scan_input
|
||||
|
||||
def _build_scan_input(
|
||||
self,
|
||||
name: str,
|
||||
annotation: AnnotationValue,
|
||||
scan_argument: ScanArgumentMetadata,
|
||||
*,
|
||||
arg: bool,
|
||||
default: Any,
|
||||
optional: bool,
|
||||
) -> ScanInputConfig:
|
||||
"""Build one normalized ScanControl input configuration.
|
||||
|
||||
Args:
|
||||
name (str): Parameter name.
|
||||
annotation (AnnotationValue): Parsed annotation value.
|
||||
scan_argument (ScanArgumentMetadata): Parsed ``ScanArgument`` metadata.
|
||||
arg (bool): Whether the parameter belongs to the positional arg bundle.
|
||||
default (Any): Default value for the parameter.
|
||||
|
||||
Returns:
|
||||
ScanInputConfig: Normalized input configuration.
|
||||
"""
|
||||
return {
|
||||
"arg": arg,
|
||||
"name": name,
|
||||
"type": self.scan_arg_type_from_annotation(annotation),
|
||||
"display_name": scan_argument.get("display_name") or self.format_display_name(name),
|
||||
"tooltip": self.resolve_tooltip(scan_argument),
|
||||
"default": default,
|
||||
"optional": optional,
|
||||
"expert": scan_argument.get("expert", False),
|
||||
"hidden": scan_argument.get("hidden", False),
|
||||
"precision": scan_argument.get("precision"),
|
||||
"units": scan_argument.get("units"),
|
||||
"reference_units": scan_argument.get("reference_units"),
|
||||
"gt": scan_argument.get("gt"),
|
||||
"ge": scan_argument.get("ge"),
|
||||
"lt": scan_argument.get("lt"),
|
||||
"le": scan_argument.get("le"),
|
||||
"alternative_group": scan_argument.get("alternative_group"),
|
||||
}
|
||||
|
||||
def build_scan_ui_config(self, scan_info: ScanInfo) -> ScanUIConfig:
|
||||
"""Normalize one available-scan entry into the widget UI configuration.
|
||||
|
||||
Args:
|
||||
scan_info (ScanInfo): Available-scan payload for one scan.
|
||||
|
||||
Returns:
|
||||
ScanUIConfig: Legacy group structure consumed by ``ScanControl`` and
|
||||
``ScanGroupBox``.
|
||||
"""
|
||||
gui_visualization = (
|
||||
scan_info.get("gui_visualization") or scan_info.get("gui_visibility") or {}
|
||||
)
|
||||
if not gui_visualization and scan_info.get("gui_config"):
|
||||
return scan_info["gui_config"]
|
||||
|
||||
signature = scan_info.get("signature", [])
|
||||
signature_by_name = {entry["name"]: entry for entry in signature}
|
||||
|
||||
arg_group = None
|
||||
arg_input = scan_info.get("arg_input", {})
|
||||
if isinstance(arg_input, dict) and arg_input:
|
||||
bundle_size = scan_info.get("arg_bundle_size", {})
|
||||
inputs = [
|
||||
self.scan_input_from_arg_input(name, item_type, signature_by_name)
|
||||
for name, item_type in arg_input.items()
|
||||
]
|
||||
arg_group = {
|
||||
"name": "Scan Arguments",
|
||||
"bundle": bundle_size.get("bundle"),
|
||||
"arg_inputs": arg_input,
|
||||
"inputs": inputs,
|
||||
"min": bundle_size.get("min"),
|
||||
"max": bundle_size.get("max"),
|
||||
}
|
||||
|
||||
kwarg_groups = []
|
||||
arg_names = set(arg_input) if isinstance(arg_input, dict) else set()
|
||||
visible_kwarg_names = set()
|
||||
for group_name, input_names in gui_visualization.items():
|
||||
inputs = []
|
||||
for input_name in input_names:
|
||||
if input_name in arg_names or input_name not in signature_by_name:
|
||||
continue
|
||||
if input_name in visible_kwarg_names:
|
||||
continue
|
||||
param = signature_by_name[input_name]
|
||||
if param.get("kind") in ("VAR_POSITIONAL", "VAR_KEYWORD"):
|
||||
continue
|
||||
scan_input = self.scan_input_from_signature(param)
|
||||
if scan_input.get("hidden"):
|
||||
continue
|
||||
inputs.append(scan_input)
|
||||
visible_kwarg_names.add(input_name)
|
||||
if inputs:
|
||||
kwarg_groups.append({"name": group_name, "inputs": inputs})
|
||||
|
||||
return {
|
||||
"scan_class_name": scan_info.get("class"),
|
||||
"arg_group": arg_group,
|
||||
"kwarg_groups": kwarg_groups,
|
||||
}
|
||||
@@ -460,7 +460,7 @@ class ImageBase(PlotBase):
|
||||
self._color_bar = None
|
||||
|
||||
def disable_autorange():
|
||||
print("Disabling autorange")
|
||||
logger.info("Disabling autorange")
|
||||
self.setProperty("autorange", False)
|
||||
|
||||
if style == "simple":
|
||||
@@ -928,7 +928,7 @@ class ImageBase(PlotBase):
|
||||
# if sync:
|
||||
self._sync_colorbar_levels()
|
||||
self._sync_autorange_switch()
|
||||
print(f"Autorange set to {enabled}")
|
||||
logger.info(f"Autorange set to {enabled}")
|
||||
|
||||
@SafeProperty(str)
|
||||
def autorange_mode(self) -> str:
|
||||
|
||||
@@ -2449,7 +2449,7 @@ class Waveform(PlotBase):
|
||||
first_key = next(iter(info))
|
||||
mem_bytes = info[first_key]["value"]["mem_size"]
|
||||
size_mb = mem_bytes / (1024 * 1024)
|
||||
print(f"Dataset size: {size_mb:.1f} MB")
|
||||
logger.info(f"Dataset size: {size_mb:.1f} MB")
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.error(f"Unable to evaluate dataset size: {exc}")
|
||||
return True
|
||||
|
||||
@@ -155,7 +155,6 @@ class ScanProgressBar(BECWidget, QWidget):
|
||||
self._progress_device = None
|
||||
self.task = None
|
||||
self.scan_number = None
|
||||
self.progress_started.connect(lambda: print("Scan progress started"))
|
||||
|
||||
def connect_to_queue(self):
|
||||
"""
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import sys
|
||||
|
||||
from qtpy.QtCore import Property, QEasingCurve, QPointF, QPropertyAnimation, Qt, Signal
|
||||
from qtpy.QtCore import Property, QEasingCurve, QEvent, QPointF, QPropertyAnimation, Qt, Signal
|
||||
from qtpy.QtGui import QColor, QPainter
|
||||
from qtpy.QtWidgets import QApplication, QWidget
|
||||
|
||||
@@ -41,10 +41,22 @@ class ToggleSwitch(QWidget):
|
||||
theme = getattr(QApplication.instance(), "theme", None)
|
||||
colors = theme.colors if theme else {}
|
||||
|
||||
self._active_track_color = colors.get("PRIMARY", QColor(33, 150, 243))
|
||||
self._active_thumb_color = colors.get("ON_PRIMARY", QColor(255, 255, 255))
|
||||
self._inactive_track_color = colors.get("SEPARATOR", QColor(200, 200, 200))
|
||||
self._inactive_thumb_color = colors.get("ON_PRIMARY", QColor(255, 255, 255))
|
||||
self._active_track_color = self._theme_color(colors, "PRIMARY", QColor(33, 150, 243))
|
||||
self._active_thumb_color = self._theme_color(colors, "ON_PRIMARY", QColor(255, 255, 255))
|
||||
self._inactive_track_color = self._theme_color(colors, "SEPARATOR", QColor(200, 200, 200))
|
||||
self._inactive_thumb_color = self._theme_color(colors, "ON_PRIMARY", QColor(255, 255, 255))
|
||||
self._disabled_track_color = self._theme_color(colors, "DISABLED_BG", QColor(220, 220, 220))
|
||||
self._disabled_thumb_color = self._theme_color(colors, "DISABLED_FG", QColor(150, 150, 150))
|
||||
self._disabled_border_color = self._theme_color(
|
||||
colors, "DISABLED_BORDER", QColor(170, 170, 170)
|
||||
)
|
||||
if hasattr(self, "_checked"):
|
||||
self.update_colors()
|
||||
|
||||
@staticmethod
|
||||
def _theme_color(colors: dict, key: str, fallback: QColor) -> QColor:
|
||||
color = colors.get(key, fallback)
|
||||
return color if isinstance(color, QColor) else QColor(color)
|
||||
|
||||
@Property(bool)
|
||||
def checked(self):
|
||||
@@ -119,29 +131,40 @@ class ToggleSwitch(QWidget):
|
||||
|
||||
def paintEvent(self, event):
|
||||
painter = QPainter(self)
|
||||
painter.setRenderHint(QPainter.Antialiasing)
|
||||
painter.setRenderHint(QPainter.RenderHint.Antialiasing)
|
||||
|
||||
# Draw track
|
||||
painter.setBrush(self._track_color)
|
||||
painter.setPen(Qt.NoPen)
|
||||
painter.setPen(self._disabled_border_color if not self.isEnabled() else Qt.PenStyle.NoPen)
|
||||
painter.drawRoundedRect(
|
||||
0, 0, self.width(), self.height(), self.height() / 2, self.height() / 2
|
||||
)
|
||||
|
||||
# Draw thumb
|
||||
painter.setBrush(self._thumb_color)
|
||||
painter.setPen(Qt.PenStyle.NoPen)
|
||||
diameter = int(self.height() * 0.8)
|
||||
painter.drawEllipse(int(self._thumb_pos.x()), int(self._thumb_pos.y()), diameter, diameter)
|
||||
|
||||
def mousePressEvent(self, event):
|
||||
if event.button() == Qt.LeftButton:
|
||||
if self.isEnabled() and event.button() == Qt.MouseButton.LeftButton:
|
||||
self.checked = not self.checked
|
||||
|
||||
def update_colors(self):
|
||||
if not self.isEnabled():
|
||||
self._thumb_color = self._disabled_thumb_color
|
||||
self._track_color = self._disabled_track_color
|
||||
return
|
||||
|
||||
self._thumb_color = self.active_thumb_color if self._checked else self.inactive_thumb_color
|
||||
self._track_color = self.active_track_color if self._checked else self.inactive_track_color
|
||||
|
||||
def changeEvent(self, event):
|
||||
if event.type() == QEvent.Type.EnabledChange:
|
||||
self.update_colors()
|
||||
self.update()
|
||||
super().changeEvent(event)
|
||||
|
||||
def get_thumb_pos(self, checked):
|
||||
return QPointF(self.width() - self.height() + 3, 2) if checked else QPointF(3, 2)
|
||||
|
||||
@@ -167,7 +190,7 @@ class ToggleSwitch(QWidget):
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
from qtpy.QtWidgets import QHBoxLayout, QVBoxLayout, QWidget
|
||||
from qtpy.QtWidgets import QHBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.utils.colors import apply_theme
|
||||
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
|
||||
@@ -177,9 +200,12 @@ if __name__ == "__main__": # pragma: no cover
|
||||
widget = QWidget()
|
||||
layout = QHBoxLayout(widget)
|
||||
toggle = ToggleSwitch()
|
||||
toggle_disabled = ToggleSwitch()
|
||||
dark_mode_btn = DarkModeButton()
|
||||
layout.addWidget(toggle)
|
||||
layout.addWidget(toggle_disabled)
|
||||
layout.addWidget(dark_mode_btn)
|
||||
toggle_disabled.setEnabled(False)
|
||||
window = QWidget()
|
||||
window.setLayout(layout)
|
||||
window.show()
|
||||
|
||||
+10
-1
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "bec_widgets"
|
||||
version = "3.11.1"
|
||||
version = "3.13.5"
|
||||
description = "BEC Widgets"
|
||||
requires-python = ">=3.11"
|
||||
classifiers = [
|
||||
@@ -64,6 +64,15 @@ qtermwidget = ["pyside6_qtermwidget"]
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
@@ -45,7 +45,7 @@ def connected_client_gui_obj(qtbot, gui_id, bec_client_lib):
|
||||
"""
|
||||
gui = BECGuiClient(gui_id=gui_id)
|
||||
try:
|
||||
gui.start(wait=True)
|
||||
gui.show(wait=True)
|
||||
qtbot.waitUntil(lambda: hasattr(gui, "bec"), timeout=5000)
|
||||
gui.bec.delete_all() # ensure clean state
|
||||
qtbot.waitUntil(lambda: len(gui.bec.widget_list()) == 0, timeout=10000)
|
||||
|
||||
@@ -143,11 +143,11 @@ def test_rpc_gui_obj(connected_client_gui_obj: BECGuiClient, qtbot):
|
||||
qtbot.wait(500)
|
||||
gui.kill_server()
|
||||
assert not gui._gui_is_alive()
|
||||
gui.start(wait=True)
|
||||
gui.show(wait=True)
|
||||
assert gui._gui_is_alive()
|
||||
# calling start multiple times should not change anything
|
||||
gui.start(wait=True)
|
||||
gui.start(wait=True)
|
||||
# calling show multiple times should not change anything
|
||||
gui.show(wait=True)
|
||||
gui.show(wait=True)
|
||||
|
||||
def wait_for_gui_started():
|
||||
return "bec" in gui.windows
|
||||
|
||||
@@ -69,7 +69,7 @@ def create_widget(
|
||||
return widget
|
||||
|
||||
|
||||
@pytest.mark.timeout(100)
|
||||
@pytest.mark.timeout(20)
|
||||
def test_available_widgets(qtbot, connected_client_gui_obj):
|
||||
"""This test checks that all widgets that are available via gui.available_widgets can be created and removed."""
|
||||
gui = connected_client_gui_obj
|
||||
|
||||
@@ -75,7 +75,7 @@ def connected_client_gui_obj(qtbot_scope_module, gui_id, bec_client_lib):
|
||||
"""
|
||||
gui = BECGuiClient(gui_id=gui_id)
|
||||
try:
|
||||
gui.start(wait=True)
|
||||
gui.show(wait=True)
|
||||
qtbot_scope_module.waitUntil(lambda: hasattr(gui, "bec"), timeout=5000)
|
||||
gui.bec.delete_all() # ensure clean state
|
||||
qtbot_scope_module.waitUntil(lambda: len(gui.bec.widget_list()) == 0, timeout=10000)
|
||||
|
||||
@@ -6,6 +6,7 @@ from qtpy.QtCore import QObject
|
||||
from qtpy.QtWidgets import QApplication, QWidget
|
||||
|
||||
from bec_widgets.utils.bec_connector import BECConnector
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.error_popups import SafeProperty
|
||||
from bec_widgets.utils.error_popups import SafeSlot as Slot
|
||||
|
||||
@@ -15,6 +16,9 @@ from .client_mocks import mocked_client
|
||||
class BECConnectorQObject(BECConnector, QObject): ...
|
||||
|
||||
|
||||
class _CleanupBroadcastWidget(BECWidget, QWidget): ...
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bec_connector(mocked_client):
|
||||
connector = BECConnectorQObject(client=mocked_client)
|
||||
@@ -146,6 +150,28 @@ def test_bec_connector_change_object_name(bec_connector):
|
||||
assert not any(obj.objectName() == previous_name for obj in all_objects)
|
||||
|
||||
|
||||
def test_bec_widget_cleanup_broadcasts_after_children_are_unregistered(mocked_client, qtbot):
|
||||
parent = _CleanupBroadcastWidget(client=mocked_client, object_name="cleanup_parent")
|
||||
child = _CleanupBroadcastWidget(
|
||||
parent=parent, client=mocked_client, object_name="cleanup_child"
|
||||
)
|
||||
qtbot.addWidget(parent)
|
||||
|
||||
observed_connections = []
|
||||
parent.rpc_register.callbacks.append(
|
||||
lambda connections: observed_connections.append(set(connections))
|
||||
)
|
||||
|
||||
parent.close()
|
||||
|
||||
assert parent._destroyed is True
|
||||
assert child.gui_id not in parent.rpc_register.list_all_connections()
|
||||
assert all(
|
||||
parent.gui_id in snapshot or child.gui_id not in snapshot
|
||||
for snapshot in observed_connections
|
||||
)
|
||||
|
||||
|
||||
def test_bec_connector_export_settings():
|
||||
|
||||
class MyWidget(BECConnector, QWidget):
|
||||
|
||||
@@ -5,7 +5,7 @@ from unittest import mock
|
||||
|
||||
import pytest
|
||||
from bec_lib import service_config
|
||||
from bec_lib.messages import ScanMessage
|
||||
from bec_lib.messages import GUIInstructionMessage, ScanMessage
|
||||
from bec_lib.serialization import MsgpackSerialization
|
||||
|
||||
from bec_widgets.utils.bec_dispatcher import BECDispatcher, QtRedisConnector, QtThreadSafeCallback
|
||||
@@ -213,3 +213,49 @@ def test_dispatcher_2_topic_same_cb_with_boundmethod(
|
||||
|
||||
send_msg_event.set()
|
||||
qtbot.wait(10)
|
||||
|
||||
|
||||
def test_qt_redis_connector_logs_rpc_before_qt_callback(monkeypatch):
|
||||
info_mock = mock.MagicMock()
|
||||
warning_mock = mock.MagicMock()
|
||||
monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.info", info_mock)
|
||||
monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.warning", warning_mock)
|
||||
|
||||
def callback(_msg, _metadata):
|
||||
pass
|
||||
|
||||
cb = QtThreadSafeCallback(callback)
|
||||
connector = QtRedisConnector("localhost:1", mock.MagicMock())
|
||||
rpc_msg = GUIInstructionMessage(
|
||||
action="set_value",
|
||||
parameter={"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"},
|
||||
metadata={
|
||||
"request_id": "dispatcher-request",
|
||||
"receiver": "gui",
|
||||
"object_name": "progressbar",
|
||||
"timeout": 0.1,
|
||||
"sent_at": 1.0,
|
||||
"deadline": 1.1,
|
||||
},
|
||||
)
|
||||
|
||||
try:
|
||||
connector._execute_callback(cb, {"data": rpc_msg}, {})
|
||||
|
||||
info_mock.assert_called_once()
|
||||
info_message = info_mock.call_args.args[0]
|
||||
assert "GUI RPC dispatcher received request before Qt callback emit" in info_message
|
||||
assert "request_id=dispatcher-request" in info_message
|
||||
assert "method=set_value" in info_message
|
||||
assert "receiver=gui" in info_message
|
||||
assert "target_gui_id=ring" in info_message
|
||||
assert "object_name=progressbar" in info_message
|
||||
assert "timeout=0.1" in info_message
|
||||
assert "stale_on_dispatch=True" in info_message
|
||||
|
||||
warning_mock.assert_called_once()
|
||||
warning_message = warning_mock.call_args.args[0]
|
||||
assert "received request after client timeout deadline" in warning_message
|
||||
assert "request_id=dispatcher-request" in warning_message
|
||||
finally:
|
||||
connector.shutdown()
|
||||
|
||||
@@ -1,10 +1,19 @@
|
||||
import signal
|
||||
import subprocess
|
||||
from contextlib import contextmanager
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
from bec_widgets.cli.client import BECDockArea
|
||||
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
|
||||
from bec_widgets.cli.client_utils import (
|
||||
GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT,
|
||||
OUTPUT_READER_STOP_EVENT_ATTR,
|
||||
BECGuiClient,
|
||||
_join_process_output_thread,
|
||||
_start_plot_process,
|
||||
)
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCResponseTimeoutError, rpc_timeout
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -220,7 +229,7 @@ def test_client_utils_new_starts_server_when_not_alive():
|
||||
with mock.patch("bec_widgets.cli.client_utils.wait_for_server", _no_wait_for_server):
|
||||
with (
|
||||
mock.patch.object(gui, "_check_if_server_is_alive", return_value=False),
|
||||
mock.patch.object(gui, "start") as mock_start,
|
||||
mock.patch.object(gui, "show") as mock_start,
|
||||
):
|
||||
gui.new(wait=False, startup_profile=None)
|
||||
|
||||
@@ -257,3 +266,109 @@ def test_client_utils_delete_falls_back_to_direct_close():
|
||||
gui.delete("dock")
|
||||
|
||||
widget._run_rpc.assert_called_once_with("close")
|
||||
|
||||
|
||||
def test_client_utils_gui_client_set_rpc_timeout():
|
||||
gui = BECGuiClient()
|
||||
assert gui._rpc_timeout == 60
|
||||
|
||||
gui.set_rpc_timeout(10)
|
||||
assert gui._rpc_timeout == 10
|
||||
|
||||
|
||||
def test_client_utils_kill_server_waits_for_process_before_joining_output_thread():
|
||||
gui = BECGuiClient()
|
||||
gui._client = mock.MagicMock()
|
||||
gui._process = mock.MagicMock(pid=123, stdout=None, stderr=None)
|
||||
gui._process.poll.return_value = None
|
||||
order = []
|
||||
gui._process.wait.side_effect = lambda timeout: order.append("wait")
|
||||
gui._process_output_processing_thread = mock.MagicMock()
|
||||
gui._process_output_processing_thread.join.side_effect = lambda timeout: order.append("join")
|
||||
gui._process_output_processing_thread.is_alive.return_value = False
|
||||
|
||||
with (
|
||||
mock.patch.object(gui, "_request_server_shutdown", return_value=False),
|
||||
mock.patch("bec_widgets.cli.client_utils.os.getpgid", return_value=123),
|
||||
mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg,
|
||||
):
|
||||
gui.kill_server()
|
||||
|
||||
killpg.assert_called_once_with(123, signal.SIGTERM)
|
||||
assert order == ["wait", "join"]
|
||||
assert gui._process is None
|
||||
assert gui._process_output_processing_thread is None
|
||||
|
||||
|
||||
def test_client_utils_kill_server_requests_graceful_shutdown_before_signal():
|
||||
gui = BECGuiClient()
|
||||
gui._client = mock.MagicMock()
|
||||
process = mock.MagicMock(stdout=None, stderr=None)
|
||||
process.poll.return_value = None
|
||||
gui._process = process
|
||||
gui._process_output_processing_thread = mock.MagicMock()
|
||||
gui._process_output_processing_thread.is_alive.return_value = False
|
||||
launcher = mock.MagicMock()
|
||||
|
||||
with (
|
||||
mock.patch.object(
|
||||
BECGuiClient, "launcher", new_callable=mock.PropertyMock
|
||||
) as launcher_prop,
|
||||
mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg,
|
||||
):
|
||||
launcher_prop.return_value = launcher
|
||||
gui.kill_server()
|
||||
|
||||
launcher._run_rpc.assert_called_once_with(
|
||||
"system.shutdown", wait_for_rpc_response=True, timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT
|
||||
)
|
||||
process.wait.assert_called_once_with(timeout=5)
|
||||
killpg.assert_not_called()
|
||||
assert gui._process is None
|
||||
assert gui._process_output_processing_thread is None
|
||||
|
||||
|
||||
def test_client_utils_kill_server_kills_process_group_after_timeout():
|
||||
gui = BECGuiClient()
|
||||
gui._client = mock.MagicMock()
|
||||
process = mock.MagicMock(pid=123, stdout=None, stderr=None, args=["bec-gui-server"])
|
||||
process.poll.return_value = None
|
||||
process.wait.side_effect = [subprocess.TimeoutExpired(cmd="bec-gui-server", timeout=10), None]
|
||||
gui._process = process
|
||||
|
||||
with (
|
||||
mock.patch.object(gui, "_request_server_shutdown", return_value=False),
|
||||
mock.patch("bec_widgets.cli.client_utils.os.getpgid", return_value=123),
|
||||
mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg,
|
||||
mock.patch("bec_widgets.cli.client_utils.subprocess.run") as run,
|
||||
):
|
||||
run.return_value.stdout = "PID PPID PGID STAT COMMAND\n123 1 123 S bec-gui-server"
|
||||
gui.kill_server()
|
||||
|
||||
assert killpg.call_args_list == [mock.call(123, signal.SIGTERM), mock.call(123, signal.SIGKILL)]
|
||||
assert process.wait.call_args_list == [mock.call(timeout=10), mock.call(timeout=10)]
|
||||
run.assert_called_once_with(
|
||||
["ps", "-o", "pid,ppid,pgid,stat,command", "-g", "123"],
|
||||
check=False,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=2,
|
||||
)
|
||||
|
||||
|
||||
def test_join_process_output_thread_signals_reader_before_closing_streams():
|
||||
process = mock.MagicMock(pid=123, args=["bec-gui-server"])
|
||||
process.stdout = mock.MagicMock()
|
||||
process.stderr = mock.MagicMock()
|
||||
thread = mock.MagicMock()
|
||||
stop_event = mock.MagicMock()
|
||||
setattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, stop_event)
|
||||
thread.is_alive.side_effect = [True, False]
|
||||
logger = mock.MagicMock()
|
||||
|
||||
_join_process_output_thread(process, thread, logger)
|
||||
|
||||
assert thread.join.call_args_list == [mock.call(timeout=2), mock.call(timeout=2)]
|
||||
stop_event.set.assert_called_once_with()
|
||||
process.stdout.close.assert_called_once_with()
|
||||
process.stderr.close.assert_called_once_with()
|
||||
|
||||
@@ -98,11 +98,15 @@ def test_waiting_display(update_dialog, qtbot):
|
||||
mock_spinner_stop.assert_called_once()
|
||||
|
||||
|
||||
def test_update_cycle(update_dialog, qtbot):
|
||||
def test_update_cycle(update_dialog):
|
||||
update = {"enabled": False, "readoutPriority": "baseline", "deviceTags": {"tag"}}
|
||||
|
||||
def _mock_send(action="update", config=None, wait_for_response=True, timeout_s=None):
|
||||
update_dialog.client.device_manager.devices["test_device"]._config = config["test_device"] # type: ignore
|
||||
device = update_dialog.client.device_manager.devices["test_device"]
|
||||
device._config = {**device._config, **config["test_device"]} # type: ignore
|
||||
|
||||
update_dialog._q_threadpool = MagicMock()
|
||||
update_dialog._q_threadpool.start.side_effect = lambda runnable: runnable.run()
|
||||
|
||||
update_dialog._config_helper.send_config_request = MagicMock(side_effect=_mock_send)
|
||||
for item in update_dialog._form.enumerate_form_widgets():
|
||||
@@ -111,9 +115,7 @@ def test_update_cycle(update_dialog, qtbot):
|
||||
|
||||
assert update_dialog.updated_config() == update
|
||||
update_dialog.apply()
|
||||
qtbot.waitUntil(
|
||||
lambda: update_dialog._config_helper.send_config_request.call_count == 1, timeout=100
|
||||
)
|
||||
update_dialog._q_threadpool.start.assert_called_once()
|
||||
|
||||
update_dialog._config_helper.send_config_request.assert_called_with(
|
||||
action="update", config={"test_device": update}, wait_for_response=False
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from bec_lib.device import ReadoutPriority
|
||||
|
||||
@@ -124,6 +126,36 @@ def test_device_input_combobox_disabled_invalid_has_neutral_border(device_input_
|
||||
assert "red" in device_input_combobox.styleSheet()
|
||||
|
||||
|
||||
def test_device_input_combobox_cleanup_unregisters_callback(qtbot, mocked_client):
|
||||
with mock.patch.object(mocked_client.callbacks, "remove"):
|
||||
widget = DeviceComboBox(client=mocked_client)
|
||||
qtbot.addWidget(widget)
|
||||
callback_id = widget._callback_id
|
||||
|
||||
widget.close()
|
||||
widget.deleteLater()
|
||||
|
||||
mocked_client.callbacks.remove.assert_called_once_with(callback_id)
|
||||
assert widget._callback_id is None
|
||||
|
||||
|
||||
def test_device_input_combobox_cleanup_clears_callback_before_unregister(qtbot, mocked_client):
|
||||
widget = DeviceComboBox(client=mocked_client)
|
||||
qtbot.addWidget(widget)
|
||||
callback_id = widget._callback_id
|
||||
|
||||
def assert_callback_cleared(removed_callback_id):
|
||||
assert removed_callback_id == callback_id
|
||||
assert widget._callback_id is None
|
||||
|
||||
with mock.patch.object(
|
||||
mocked_client.callbacks, "remove", side_effect=assert_callback_cleared
|
||||
) as remove_mock:
|
||||
widget.cleanup()
|
||||
|
||||
remove_mock.assert_called_once_with(callback_id)
|
||||
|
||||
|
||||
def test_get_device_from_input_combobox_init(device_input_combobox):
|
||||
device_input_combobox.setCurrentIndex(0)
|
||||
device_text = device_input_combobox.currentText()
|
||||
|
||||
@@ -188,10 +188,56 @@ def test_linked_device_combobox_updates_signal_combobox_on_each_text_change(
|
||||
def test_device_signal_input_base_cleanup(qtbot, mocked_client):
|
||||
with mock.patch.object(mocked_client.callbacks, "remove"):
|
||||
widget = SignalComboBox(client=mocked_client)
|
||||
callback_id = widget._device_update_register
|
||||
widget.close()
|
||||
widget.deleteLater()
|
||||
|
||||
mocked_client.callbacks.remove.assert_called_once_with(widget._device_update_register)
|
||||
mocked_client.callbacks.remove.assert_called_once_with(callback_id)
|
||||
assert widget._device_update_register is None
|
||||
|
||||
|
||||
def test_signal_combobox_cleanup_clears_callback_before_unregister(qtbot, mocked_client):
|
||||
widget = create_widget(qtbot=qtbot, widget=SignalComboBox, client=mocked_client)
|
||||
callback_id = widget._device_update_register
|
||||
|
||||
def assert_callback_cleared(removed_callback_id):
|
||||
assert removed_callback_id == callback_id
|
||||
assert widget._device_update_register is None
|
||||
|
||||
with mock.patch.object(
|
||||
mocked_client.callbacks, "remove", side_effect=assert_callback_cleared
|
||||
) as remove_mock:
|
||||
widget.cleanup()
|
||||
|
||||
remove_mock.assert_called_once_with(callback_id)
|
||||
|
||||
|
||||
def test_signal_combobox_cleanup_blocks_in_flight_device_update(qtbot, mocked_client):
|
||||
widget = create_widget(qtbot=qtbot, widget=SignalComboBox, client=mocked_client)
|
||||
callback_id = widget._device_update_register
|
||||
|
||||
def trigger_in_flight_update(_):
|
||||
widget.update_signals_from_filters("reload", {})
|
||||
|
||||
with (
|
||||
mock.patch.object(
|
||||
mocked_client.callbacks, "remove", side_effect=trigger_in_flight_update
|
||||
) as remove_mock,
|
||||
mock.patch.object(widget, "_set_signal_groups") as set_signal_groups,
|
||||
):
|
||||
widget.cleanup()
|
||||
|
||||
remove_mock.assert_called_once_with(callback_id)
|
||||
set_signal_groups.assert_not_called()
|
||||
|
||||
|
||||
def test_signal_combobox_device_update_ignores_update_action(qtbot, mocked_client):
|
||||
widget = create_widget(qtbot=qtbot, widget=SignalComboBox, client=mocked_client)
|
||||
|
||||
with mock.patch.object(widget, "_set_signal_groups") as set_signal_groups:
|
||||
widget.update_signals_from_filters("update", {})
|
||||
|
||||
set_signal_groups.assert_not_called()
|
||||
|
||||
|
||||
def test_signal_combobox_get_signal_name_with_item_data(qtbot, device_signal_combobox):
|
||||
|
||||
@@ -4,7 +4,9 @@ import os
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from qtpy.QtCore import QObject
|
||||
from qtpy.QtGui import QFontMetrics
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
import bec_widgets
|
||||
from bec_widgets.applications.launch_window import START_EMPTY_PROFILE_OPTION, LaunchWindow
|
||||
@@ -16,6 +18,28 @@ from .client_mocks import mocked_client
|
||||
base_path = os.path.dirname(bec_widgets.__file__)
|
||||
|
||||
|
||||
def _launcher_child_connection(launcher: LaunchWindow, name: str) -> QObject:
|
||||
connection = QObject(parent=launcher)
|
||||
connection.gui_id = f"{launcher.gui_id}:{name}"
|
||||
connection.setObjectName(name)
|
||||
return connection
|
||||
|
||||
|
||||
def _top_level_connection(qtbot, name: str) -> QWidget:
|
||||
connection = QWidget()
|
||||
connection.gui_id = name
|
||||
connection.setObjectName(name)
|
||||
qtbot.addWidget(connection)
|
||||
return connection
|
||||
|
||||
|
||||
def _unparented_connection(name: str) -> QObject:
|
||||
connection = QObject()
|
||||
connection.gui_id = name
|
||||
connection.setObjectName(name)
|
||||
return connection
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bec_launch_window(qtbot, mocked_client):
|
||||
widget = LaunchWindow(client=mocked_client)
|
||||
@@ -117,20 +141,20 @@ def test_open_dock_area_with_start_empty_option_calls_launch(bec_launch_window):
|
||||
(["launcher", "dock_area", "scan_progress_simple", "scan_progress_full"], False),
|
||||
(
|
||||
["launcher", "dock_area", "scan_progress_simple", "scan_progress_full", "hover_widget"],
|
||||
True,
|
||||
False,
|
||||
),
|
||||
(["launcher", "external_window"], True),
|
||||
],
|
||||
)
|
||||
def test_gui_server_turns_off_the_lights(bec_launch_window, connection_names, hide):
|
||||
def test_gui_server_turns_off_the_lights(bec_launch_window, qtbot, connection_names, hide):
|
||||
connections = {}
|
||||
for name in connection_names:
|
||||
conn = mock.MagicMock()
|
||||
if name == "hover_widget":
|
||||
conn.parent.return_value = None
|
||||
conn.objectName.return_value = "HoverWidget"
|
||||
conn = _unparented_connection("HoverWidget")
|
||||
elif name == "external_window":
|
||||
conn = _top_level_connection(qtbot, "external_window")
|
||||
else:
|
||||
conn.parent.return_value = mock.MagicMock()
|
||||
conn.objectName.return_value = bec_launch_window.objectName()
|
||||
conn = _launcher_child_connection(bec_launch_window, name)
|
||||
connections[name] = conn
|
||||
with (
|
||||
mock.patch.object(bec_launch_window, "show") as mock_show,
|
||||
@@ -153,6 +177,23 @@ def test_gui_server_turns_off_the_lights(bec_launch_window, connection_names, hi
|
||||
mock_set_quit_on_last_window_closed.assert_called_once_with(True)
|
||||
|
||||
|
||||
def test_launcher_detects_external_main_window(bec_launch_window, qtbot):
|
||||
connection = _top_level_connection(qtbot, "BECMainWindowNoRPC")
|
||||
|
||||
assert bec_launch_window._has_external_window({"window": connection})
|
||||
|
||||
|
||||
def test_launcher_logs_unparented_non_window_connection_once(bec_launch_window):
|
||||
connection = _unparented_connection("HoverWidget")
|
||||
|
||||
with mock.patch("bec_widgets.applications.launch_window.logger.warning") as mock_warning:
|
||||
bec_launch_window._turn_off_the_lights({"window": connection})
|
||||
bec_launch_window._turn_off_the_lights({"window": connection})
|
||||
|
||||
mock_warning.assert_called_once()
|
||||
assert "HoverWidget" in mock_warning.call_args.args[0]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"connection_names, close_called",
|
||||
[
|
||||
@@ -163,11 +204,12 @@ def test_gui_server_turns_off_the_lights(bec_launch_window, connection_names, hi
|
||||
(["launcher", "dock_area", "scan_progress_simple", "scan_progress_full"], True),
|
||||
(
|
||||
["launcher", "dock_area", "scan_progress_simple", "scan_progress_full", "hover_widget"],
|
||||
False,
|
||||
True,
|
||||
),
|
||||
(["launcher", "external_window"], False),
|
||||
],
|
||||
)
|
||||
def test_launch_window_closes(bec_launch_window, connection_names, close_called):
|
||||
def test_launch_window_closes(bec_launch_window, qtbot, connection_names, close_called):
|
||||
"""
|
||||
Test that the close event is handled correctly based on the connections.
|
||||
If there are no connections or only the launcher connection, the window should close.
|
||||
@@ -175,13 +217,12 @@ def test_launch_window_closes(bec_launch_window, connection_names, close_called)
|
||||
"""
|
||||
connections = {}
|
||||
for name in connection_names:
|
||||
conn = mock.MagicMock()
|
||||
if name == "hover_widget":
|
||||
conn.parent.return_value = None
|
||||
conn.objectName.return_value = "HoverWidget"
|
||||
conn = _unparented_connection("HoverWidget")
|
||||
elif name == "external_window":
|
||||
conn = _top_level_connection(qtbot, "external_window")
|
||||
else:
|
||||
conn.parent.return_value = mock.MagicMock()
|
||||
conn.objectName.return_value = bec_launch_window.objectName()
|
||||
conn = _launcher_child_connection(bec_launch_window, name)
|
||||
connections[name] = conn
|
||||
close_event = mock.MagicMock()
|
||||
with mock.patch.object(
|
||||
|
||||
@@ -2,7 +2,7 @@ from unittest import mock
|
||||
|
||||
import pytest
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.messages import ScanQueueMessage
|
||||
from bec_lib.messages import VariableMessage
|
||||
from qtpy.QtCore import Qt, QTimer
|
||||
from qtpy.QtGui import QValidator
|
||||
from qtpy.QtWidgets import QPushButton
|
||||
@@ -34,15 +34,11 @@ class PositionerWithoutPrecision(Positioner):
|
||||
def positioner_box(qtbot, mocked_client):
|
||||
"""Fixture for PositionerBox widget"""
|
||||
with mock.patch(
|
||||
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.uuid.uuid4"
|
||||
) as mock_uuid:
|
||||
mock_uuid.return_value = "fake_uuid"
|
||||
with mock.patch(
|
||||
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.PositionerBoxBase._check_device_is_valid",
|
||||
return_value=True,
|
||||
):
|
||||
db = create_widget(qtbot, PositionerBox, device="samx", client=mocked_client)
|
||||
yield db
|
||||
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.PositionerBoxBase._check_device_is_valid",
|
||||
return_value=True,
|
||||
):
|
||||
db = create_widget(qtbot, PositionerBox, device="samx", client=mocked_client)
|
||||
yield db
|
||||
|
||||
|
||||
def test_positioner_box(positioner_box):
|
||||
@@ -89,16 +85,8 @@ def test_positioner_box_on_stop(positioner_box):
|
||||
"""Test on stop button"""
|
||||
with mock.patch.object(positioner_box.client.connector, "send") as mock_send:
|
||||
positioner_box.on_stop()
|
||||
params = {"device": "samx", "rpc_id": "fake_uuid", "func": "stop", "args": [], "kwargs": {}}
|
||||
msg = ScanQueueMessage(
|
||||
scan_type="device_rpc",
|
||||
parameter=params,
|
||||
queue="emergency",
|
||||
metadata={"RID": "fake_uuid", "response": False},
|
||||
)
|
||||
mock_send.assert_called_once_with(
|
||||
MessageEndpoints.scan_queue_request(positioner_box.client.username), msg
|
||||
)
|
||||
msg = VariableMessage(value=["samx"])
|
||||
mock_send.assert_called_once_with(MessageEndpoints.stop_devices(), msg)
|
||||
|
||||
|
||||
def test_positioner_box_setpoint_change(positioner_box):
|
||||
@@ -139,19 +127,15 @@ def test_positioner_control_line(qtbot, mocked_client):
|
||||
Inherits from PositionerBox, but the layout is changed. Check dimensions only
|
||||
"""
|
||||
with mock.patch(
|
||||
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.uuid.uuid4"
|
||||
) as mock_uuid:
|
||||
mock_uuid.return_value = "fake_uuid"
|
||||
with mock.patch(
|
||||
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box.PositionerBox._check_device_is_valid",
|
||||
return_value=True,
|
||||
):
|
||||
db = PositionerControlLine(device="samx", client=mocked_client)
|
||||
qtbot.addWidget(db)
|
||||
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box.positioner_box.PositionerBox._check_device_is_valid",
|
||||
return_value=True,
|
||||
):
|
||||
db = PositionerControlLine(device="samx", client=mocked_client)
|
||||
qtbot.addWidget(db)
|
||||
|
||||
assert db.ui.device_box.height() == db.height()
|
||||
assert db.ui.device_box.height() >= db.dimensions[0]
|
||||
assert db.ui.device_box.width() == 600
|
||||
assert db.ui.device_box.height() == db.height()
|
||||
assert db.ui.device_box.height() >= db.dimensions[0]
|
||||
assert db.ui.device_box.width() == 600
|
||||
|
||||
|
||||
def test_positioner_box_open_dialog_selection(qtbot, positioner_box):
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.messages import VariableMessage
|
||||
|
||||
from bec_widgets.widgets.control.device_control.positioner_box import PositionerBox2D
|
||||
|
||||
@@ -12,17 +14,13 @@ from .conftest import create_widget
|
||||
def positioner_box_2d(qtbot, mocked_client):
|
||||
"""Fixture for PositionerBox widget"""
|
||||
with mock.patch(
|
||||
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.uuid.uuid4"
|
||||
) as mock_uuid:
|
||||
mock_uuid.return_value = "fake_uuid"
|
||||
with mock.patch(
|
||||
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.PositionerBoxBase._check_device_is_valid",
|
||||
return_value=True,
|
||||
):
|
||||
db = create_widget(
|
||||
qtbot, PositionerBox2D, device_hor="samx", device_ver="samy", client=mocked_client
|
||||
)
|
||||
yield db
|
||||
"bec_widgets.widgets.control.device_control.positioner_box.positioner_box_base.PositionerBoxBase._check_device_is_valid",
|
||||
return_value=True,
|
||||
):
|
||||
db = create_widget(
|
||||
qtbot, PositionerBox2D, device_hor="samx", device_ver="samy", client=mocked_client
|
||||
)
|
||||
yield db
|
||||
|
||||
|
||||
def test_positioner_box_2d(positioner_box_2d):
|
||||
@@ -82,6 +80,14 @@ def test_positioner_box_setpoint_changes(positioner_box_2d: PositionerBox2D):
|
||||
mock_move.assert_called_once_with(100, relative=False)
|
||||
|
||||
|
||||
def test_positioner_box_2d_on_stop(positioner_box_2d: PositionerBox2D):
|
||||
"""Stop button sends both positioners to the immediate stop endpoint."""
|
||||
with mock.patch.object(positioner_box_2d.client.connector, "send") as mock_send:
|
||||
positioner_box_2d.on_stop()
|
||||
msg = VariableMessage(value=["samx", "samy"])
|
||||
mock_send.assert_called_once_with(MessageEndpoints.stop_devices(), msg)
|
||||
|
||||
|
||||
def _hor_buttons(widget: PositionerBox2D):
|
||||
return [
|
||||
widget.ui.tweak_increase_hor,
|
||||
|
||||
@@ -3,10 +3,12 @@ from unittest.mock import MagicMock
|
||||
import pytest
|
||||
from bec_lib.device import DeviceBaseWithConfig, Signal
|
||||
|
||||
from bec_widgets.cli.rpc import rpc_base as rpc_base_module
|
||||
from bec_widgets.cli.rpc.rpc_base import (
|
||||
DeletedWidgetError,
|
||||
RPCBase,
|
||||
RPCReference,
|
||||
RPCResponseTimeoutError,
|
||||
_transform_args_kwargs,
|
||||
)
|
||||
|
||||
@@ -51,3 +53,33 @@ def test_transform_args_kwargs():
|
||||
)
|
||||
assert args == ("full name", "short name", "string_arg", "full name")
|
||||
assert kwargs == {"a": "full name", "b": "short name", "c": "string_arg", "d": "full name"}
|
||||
|
||||
|
||||
def test_run_rpc_logs_response_timeout(monkeypatch):
|
||||
rpc = RPCBase(gui_id="progress_widget", object_name="progressbar")
|
||||
rpc._rpc_timeout = 0
|
||||
rpc._client = MagicMock()
|
||||
|
||||
info_mock = MagicMock()
|
||||
error_mock = MagicMock()
|
||||
monkeypatch.setattr(rpc_base_module.logger, "info", info_mock)
|
||||
monkeypatch.setattr(rpc_base_module.logger, "error", error_mock)
|
||||
|
||||
with pytest.raises(RPCResponseTimeoutError):
|
||||
rpc._run_rpc("set_value", 42, precision=2, timeout=0)
|
||||
|
||||
publish_msg = rpc._client.connector.set_and_publish.call_args.args[1]
|
||||
assert publish_msg.metadata["method"] == "set_value"
|
||||
assert publish_msg.metadata["target_gui_id"] == "progress_widget"
|
||||
assert publish_msg.metadata["object_name"] == "progressbar"
|
||||
assert publish_msg.metadata["timeout"] == 0
|
||||
assert publish_msg.metadata["deadline"] == publish_msg.metadata["sent_at"]
|
||||
assert info_mock.call_count == 1
|
||||
info_message = info_mock.call_args.args[0]
|
||||
error_mock.assert_called_once()
|
||||
error_message = error_mock.call_args.args[0]
|
||||
assert "GUI RPC response timeout" in error_message
|
||||
assert "method=set_value" in error_message
|
||||
assert "target_gui_id=progress_widget" in error_message
|
||||
assert "object_name=progressbar" in error_message
|
||||
assert "timeout=0" in error_message
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
import argparse
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from bec_lib.service_config import ServiceConfig
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.applications import companion_app as companion_app_module
|
||||
from bec_widgets.applications.companion_app import GUIServer
|
||||
from bec_widgets.utils import rpc_server as rpc_server_module
|
||||
from bec_widgets.utils.bec_connector import BECConnector
|
||||
from bec_widgets.utils.rpc_server import RegistryNotReadyError, RPCServer, SingleshotRPCRepeat
|
||||
|
||||
@@ -58,6 +60,68 @@ def test_gui_server_get_service_config(gui_server):
|
||||
assert gui_server._get_service_config().config == ServiceConfig().config
|
||||
|
||||
|
||||
def test_gui_server_signal_shutdown_closes_widgets_and_quits_app(gui_server):
|
||||
widget = MagicMock()
|
||||
gui_server.app = MagicMock()
|
||||
gui_server.app.topLevelWidgets.return_value = [widget]
|
||||
|
||||
gui_server.request_shutdown()
|
||||
|
||||
widget.close.assert_called_once()
|
||||
gui_server.app.quit.assert_called_once()
|
||||
|
||||
|
||||
def test_gui_server_shutdown_is_idempotent(gui_server):
|
||||
gui_server.launcher_window = MagicMock()
|
||||
gui_server.dispatcher = MagicMock()
|
||||
|
||||
with (
|
||||
patch.object(companion_app_module.shiboken6, "isValid", return_value=True),
|
||||
patch.object(companion_app_module.pylsp_server, "is_running", return_value=False),
|
||||
):
|
||||
gui_server.shutdown()
|
||||
gui_server.shutdown()
|
||||
|
||||
gui_server.launcher_window.close.assert_called_once()
|
||||
gui_server.launcher_window.deleteLater.assert_called_once()
|
||||
gui_server.dispatcher.stop_cli_server.assert_called_once()
|
||||
gui_server.dispatcher.disconnect_all.assert_called_once()
|
||||
|
||||
|
||||
def test_rpc_server_system_capabilities_include_shutdown(rpc_server):
|
||||
assert rpc_server.run_system_rpc("system.list_capabilities", [], {}) == {
|
||||
"system.launch_dock_area": True,
|
||||
"system.shutdown": True,
|
||||
}
|
||||
|
||||
|
||||
def test_rpc_server_system_shutdown_requests_gui_server_shutdown(rpc_server, qapp):
|
||||
gui_server = MagicMock()
|
||||
qapp.gui_server = gui_server
|
||||
|
||||
rpc_server.run_system_rpc("system.shutdown", [], {})
|
||||
qapp.processEvents()
|
||||
|
||||
gui_server.request_shutdown.assert_called_once()
|
||||
del qapp.gui_server
|
||||
|
||||
|
||||
def test_on_rpc_update_system_shutdown_sends_response_before_return(rpc_server):
|
||||
order = []
|
||||
rpc_server.run_system_rpc = MagicMock(side_effect=lambda *_args: order.append("shutdown"))
|
||||
rpc_server.send_response = MagicMock(side_effect=lambda *_args: order.append("response"))
|
||||
rpc_server.serialize_result_and_send = MagicMock()
|
||||
|
||||
rpc_server.on_rpc_update(
|
||||
{"action": "system.shutdown", "parameter": {"args": [], "kwargs": {}}},
|
||||
{"request_id": "shutdown-request", "sent_at": 1.0, "deadline": 10.0, "timeout": 2},
|
||||
)
|
||||
|
||||
assert order == ["shutdown", "response"]
|
||||
rpc_server.send_response.assert_called_once_with("shutdown-request", True, {"result": None})
|
||||
rpc_server.serialize_result_and_send.assert_not_called()
|
||||
|
||||
|
||||
def test_singleshot_rpc_repeat_raises_on_repeated_singleshot(rpc_server):
|
||||
"""
|
||||
Test that a singleshot RPC method raises an error when called multiple times.
|
||||
@@ -91,22 +155,34 @@ def test_serialize_result_and_send_with_singleshot_retry(rpc_server, qtbot, dumm
|
||||
# Third call succeeds
|
||||
return {"gui_id": dummy.gui_id, "success": True}
|
||||
|
||||
warning_mock = MagicMock()
|
||||
|
||||
# Patch serialize_object to control when it raises RegistryNotReadyError
|
||||
with patch.object(rpc_server, "serialize_object", side_effect=serialize_side_effect):
|
||||
with patch.object(rpc_server, "send_response") as mock_send_response:
|
||||
# Start the serialization process
|
||||
rpc_server._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat()
|
||||
rpc_server.serialize_result_and_send(request_id, dummy)
|
||||
with patch.object(rpc_server_module.logger, "warning", warning_mock):
|
||||
# Start the serialization process
|
||||
rpc_server._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat()
|
||||
rpc_server.serialize_result_and_send(request_id, dummy)
|
||||
|
||||
# Verify that serialize_object was called 3 times
|
||||
qtbot.waitUntil(lambda: call_count >= 3, timeout=5000)
|
||||
# Verify that serialize_object was called 3 times
|
||||
qtbot.waitUntil(lambda: call_count >= 3, timeout=5000)
|
||||
|
||||
# Verify that send_response was called with success
|
||||
mock_send_response.assert_called_once()
|
||||
args = mock_send_response.call_args[0]
|
||||
assert args[0] == request_id
|
||||
assert args[1] is True # accepted=True
|
||||
assert "result" in args[2]
|
||||
# Verify that send_response was called with success
|
||||
mock_send_response.assert_called_once()
|
||||
args = mock_send_response.call_args[0]
|
||||
assert args[0] == request_id
|
||||
assert args[1] is True # accepted=True
|
||||
assert "result" in args[2]
|
||||
|
||||
assert warning_mock.call_count == 2
|
||||
warning_logs = "\n".join(call.args[0] for call in warning_mock.call_args_list)
|
||||
assert "result serialization delayed; retrying" in warning_logs
|
||||
assert "request_id=test_request_123" in warning_logs
|
||||
assert "retry_delay_ms=100" in warning_logs
|
||||
assert "accumulated_delay_ms=100" in warning_logs
|
||||
assert "accumulated_delay_ms=200" in warning_logs
|
||||
assert "max_delay_ms=2000" in warning_logs
|
||||
|
||||
|
||||
def test_serialize_result_and_send_max_delay_exceeded(rpc_server, qtbot, dummy_widget):
|
||||
@@ -140,6 +216,56 @@ def test_serialize_result_and_send_max_delay_exceeded(rpc_server, qtbot, dummy_w
|
||||
assert "Max delay exceeded" in args[2]["error"]
|
||||
|
||||
|
||||
def test_send_response_logs_publish_status(rpc_server, monkeypatch):
|
||||
info_mock = MagicMock()
|
||||
error_mock = MagicMock()
|
||||
monkeypatch.setattr(rpc_server_module.logger, "info", info_mock)
|
||||
monkeypatch.setattr(rpc_server_module.logger, "error", error_mock)
|
||||
|
||||
with patch.object(rpc_server.client.connector, "set_and_publish") as publish_mock:
|
||||
rpc_server.send_response("request-ok", True, {"result": None})
|
||||
rpc_server.send_response("request-failed", False, {"error": "bad"})
|
||||
|
||||
assert publish_mock.call_count == 2
|
||||
assert "request_id=request-ok" in info_mock.call_args.args[0]
|
||||
assert "accepted=True" in info_mock.call_args.args[0]
|
||||
assert "request_id=request-failed" in error_mock.call_args.args[0]
|
||||
assert "accepted=False" in error_mock.call_args.args[0]
|
||||
|
||||
|
||||
def test_on_rpc_update_logs_late_client_deadline(rpc_server, monkeypatch):
|
||||
info_mock = MagicMock()
|
||||
warning_mock = MagicMock()
|
||||
monkeypatch.setattr(rpc_server_module.logger, "info", info_mock)
|
||||
monkeypatch.setattr(rpc_server_module.logger, "warning", warning_mock)
|
||||
|
||||
rpc_server.rpc_register.get_rpc_by_id = MagicMock()
|
||||
rpc_server.run_rpc = MagicMock(return_value=None)
|
||||
rpc_server.serialize_result_and_send = MagicMock()
|
||||
|
||||
rpc_server.on_rpc_update(
|
||||
{
|
||||
"action": "set_value",
|
||||
"parameter": {"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"},
|
||||
},
|
||||
{"request_id": "late-request", "timeout": 0.1, "sent_at": 1.0, "deadline": 1.1},
|
||||
)
|
||||
|
||||
received_log = info_mock.call_args_list[0].args[0]
|
||||
executed_log = info_mock.call_args_list[1].args[0]
|
||||
warning_logs = "\n".join(call.args[0] for call in warning_mock.call_args_list)
|
||||
|
||||
assert "GUI RPC server received request" in received_log
|
||||
assert "request_id=late-request" in received_log
|
||||
assert "method=set_value" in received_log
|
||||
assert "target_gui_id=ring" in received_log
|
||||
assert "timeout=0.1" in received_log
|
||||
assert "stale_on_receive=True" in received_log
|
||||
assert "response_after_client_deadline=True" in executed_log
|
||||
assert "received request after client timeout deadline" in warning_logs
|
||||
assert "response is late for client timeout" in warning_logs
|
||||
|
||||
|
||||
def test_run_rpc_delegates_to_rpc_content_class(rpc_server):
|
||||
class Content:
|
||||
USER_ACCESS = ["foo", "mode", "mode.setter"]
|
||||
|
||||
@@ -11,6 +11,7 @@ from bec_widgets.utils.forms_from_types.items import StrFormItem
|
||||
from bec_widgets.utils.widget_io import WidgetIO
|
||||
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
|
||||
from bec_widgets.widgets.control.scan_control import ScanControl
|
||||
from bec_widgets.widgets.control.scan_control.scan_info_adapter import ScanInfoAdapter
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
|
||||
@@ -280,6 +281,397 @@ def test_populate_scans(scan_control, mocked_client):
|
||||
assert sorted(items) == sorted(expected_scans)
|
||||
|
||||
|
||||
def test_scan_control_uses_gui_visibility_and_signature(qtbot, mocked_client):
|
||||
scan_info = {
|
||||
"class": "AnnotatedScan",
|
||||
"base_class": "ScanBase",
|
||||
"arg_input": {
|
||||
"device": "DeviceBase",
|
||||
"start": {
|
||||
"Annotated": {
|
||||
"type": "float",
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": "Start Position",
|
||||
"description": "Start position",
|
||||
"tooltip": "Custom start tooltip",
|
||||
"expert": False,
|
||||
"alternative_group": None,
|
||||
"units": None,
|
||||
"reference_units": "device",
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
"stop": {
|
||||
"Annotated": {
|
||||
"type": "float",
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": None,
|
||||
"description": "Stop position",
|
||||
"tooltip": None,
|
||||
"expert": False,
|
||||
"alternative_group": None,
|
||||
"units": None,
|
||||
"reference_units": "device",
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
"arg_bundle_size": {"bundle": 3, "min": 1, "max": None},
|
||||
"gui_visibility": {
|
||||
"Movement Parameters": ["steps", "step_size"],
|
||||
"Acquisition Parameters": ["exp_time", "relative"],
|
||||
},
|
||||
"required_kwargs": [],
|
||||
"signature": [
|
||||
{"name": "args", "kind": "VAR_POSITIONAL", "default": "_empty", "annotation": "_empty"},
|
||||
{"name": "steps", "kind": "KEYWORD_ONLY", "default": 10, "annotation": "int"},
|
||||
{
|
||||
"name": "step_size",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": None,
|
||||
"annotation": {
|
||||
"Annotated": {
|
||||
"type": "float",
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": "Step Size Custom",
|
||||
"description": "Step size",
|
||||
"tooltip": "Custom step tooltip",
|
||||
"expert": False,
|
||||
"alternative_group": "scan_resolution",
|
||||
"units": "mm",
|
||||
"reference_units": None,
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "exp_time",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": 0,
|
||||
"annotation": {
|
||||
"Annotated": {
|
||||
"type": "float",
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": None,
|
||||
"description": None,
|
||||
"tooltip": "Exposure time",
|
||||
"expert": False,
|
||||
"alternative_group": None,
|
||||
"units": "s",
|
||||
"reference_units": None,
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
{"name": "relative", "kind": "KEYWORD_ONLY", "default": False, "annotation": "bool"},
|
||||
{"name": "kwargs", "kind": "VAR_KEYWORD", "default": "_empty", "annotation": "_empty"},
|
||||
],
|
||||
}
|
||||
mocked_client.connector.set_and_publish(
|
||||
MessageEndpoints.available_scans(),
|
||||
AvailableResourceMessage(resource={"annotated_scan": scan_info}),
|
||||
)
|
||||
|
||||
widget = ScanControl(client=mocked_client)
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
widget.comboBox_scan_selection.setCurrentText("annotated_scan")
|
||||
|
||||
assert widget.comboBox_scan_selection.count() == 1
|
||||
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
|
||||
assert "Custom start tooltip\nUnits from: device" in widget.arg_box.widgets[1].toolTip()
|
||||
with patch.object(mocked_client.device_manager.devices.samx, "egu", return_value="mm"):
|
||||
WidgetIO.set_value(widget.arg_box.widgets[0], "samx")
|
||||
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
|
||||
assert widget.arg_box.widgets[1].suffix() == " mm"
|
||||
assert "Custom start tooltip\nUnits: mm" in widget.arg_box.widgets[1].toolTip()
|
||||
widget.arg_box.widgets[0].setCurrentText("not_a_device")
|
||||
assert widget.arg_box.layout.itemAtPosition(0, 1).widget().text() == "Start Position"
|
||||
assert widget.arg_box.widgets[1].suffix() == ""
|
||||
assert "Custom start tooltip\nUnits from: device" in widget.arg_box.widgets[1].toolTip()
|
||||
assert [box.title() for box in widget.kwarg_boxes] == [
|
||||
"Movement Parameters",
|
||||
"Acquisition Parameters",
|
||||
]
|
||||
assert widget.kwarg_boxes[0].layout.itemAtPosition(0, 1).widget().text() == "Step Size Custom"
|
||||
assert widget.kwarg_boxes[0].widgets[1].suffix() == " mm"
|
||||
assert "Custom step tooltip\nUnits: mm" in widget.kwarg_boxes[0].widgets[1].toolTip()
|
||||
assert widget.kwarg_boxes[1].layout.itemAtPosition(0, 0).widget().text() == "Exp Time"
|
||||
assert "Exposure time\nUnits: s" in widget.kwarg_boxes[1].widgets[0].toolTip()
|
||||
|
||||
|
||||
def test_scan_info_adapter_skips_duplicate_visible_kwargs():
|
||||
scan_info = {
|
||||
"class": "DuplicateScan",
|
||||
"base_class": "ScanBaseV4",
|
||||
"arg_input": {},
|
||||
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
|
||||
"gui_visibility": {
|
||||
"Scan Parameters": ["relative", "burst_at_each_point"],
|
||||
"Acquisition Parameters": ["exp_time", "burst_at_each_point"],
|
||||
},
|
||||
"signature": [
|
||||
{"name": "relative", "kind": "KEYWORD_ONLY", "default": False, "annotation": "bool"},
|
||||
{
|
||||
"name": "burst_at_each_point",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": 1,
|
||||
"annotation": "int",
|
||||
},
|
||||
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
|
||||
],
|
||||
}
|
||||
|
||||
gui_config = ScanInfoAdapter().build_scan_ui_config(scan_info)
|
||||
groups = {
|
||||
group["name"]: [input_spec["name"] for input_spec in group["inputs"]]
|
||||
for group in gui_config["kwarg_groups"]
|
||||
}
|
||||
|
||||
assert groups == {
|
||||
"Scan Parameters": ["relative", "burst_at_each_point"],
|
||||
"Acquisition Parameters": ["exp_time"],
|
||||
}
|
||||
|
||||
|
||||
def test_scan_info_adapter_supports_optional_annotated_types():
|
||||
scan_info = {
|
||||
"class": "OptionalScan",
|
||||
"base_class": "ScanBaseV4",
|
||||
"arg_input": {},
|
||||
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
|
||||
"gui_visibility": {"Matching": ["atol"]},
|
||||
"signature": [
|
||||
{
|
||||
"arg": False,
|
||||
"name": "atol",
|
||||
"annotation": {
|
||||
"Annotated": {
|
||||
"type": ["float", "NoneType"],
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": "Tolerance",
|
||||
"tooltip": "Optional tolerance used for position matching",
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
"default": None,
|
||||
"kind": "KEYWORD_ONLY",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
gui_config = ScanInfoAdapter().build_scan_ui_config(scan_info)
|
||||
input_spec = gui_config["kwarg_groups"][0]["inputs"][0]
|
||||
|
||||
assert input_spec["name"] == "atol"
|
||||
assert input_spec["type"] == "float"
|
||||
assert input_spec["optional"] is True
|
||||
assert input_spec["default"] is None
|
||||
assert input_spec["display_name"] == "Tolerance"
|
||||
|
||||
|
||||
def test_scan_info_adapter_rejects_unsupported_visible_inputs():
|
||||
scan_info = {
|
||||
"class": "UnsupportedScan",
|
||||
"base_class": "ScanBaseV4",
|
||||
"arg_input": {},
|
||||
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
|
||||
"gui_visibility": {"Regions": ["regions"]},
|
||||
"signature": [
|
||||
{
|
||||
"name": "regions",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": "_empty",
|
||||
"annotation": {
|
||||
"Generic": {
|
||||
"origin": "list",
|
||||
"args": [
|
||||
{"Generic": {"origin": "tuple", "args": ["float", "float", "int"]}}
|
||||
],
|
||||
}
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
gui_config = ScanInfoAdapter().build_scan_ui_config(scan_info)
|
||||
unsupported_inputs = ScanInfoAdapter.unsupported_inputs(gui_config)
|
||||
|
||||
assert [input_spec["name"] for input_spec in unsupported_inputs] == ["regions"]
|
||||
assert ScanInfoAdapter.has_scan_ui_config(scan_info) is False
|
||||
|
||||
|
||||
def test_scan_info_adapter_skips_hidden_visible_kwargs():
|
||||
scan_info = {
|
||||
"class": "HiddenScan",
|
||||
"base_class": "ScanBaseV4",
|
||||
"arg_input": {},
|
||||
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
|
||||
"gui_visibility": {"Acquisition": ["exp_time", "internal_token"]},
|
||||
"signature": [
|
||||
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
|
||||
{
|
||||
"name": "internal_token",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": None,
|
||||
"annotation": {
|
||||
"Annotated": {
|
||||
"type": "str",
|
||||
"metadata": {
|
||||
"ScanArgument": {"display_name": "Internal Token", "hidden": True}
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
gui_config = ScanInfoAdapter().build_scan_ui_config(scan_info)
|
||||
|
||||
assert [input_spec["name"] for input_spec in gui_config["kwarg_groups"][0]["inputs"]] == [
|
||||
"exp_time"
|
||||
]
|
||||
|
||||
|
||||
def test_scan_control_propagates_reference_units_across_kwarg_groups(qtbot, mocked_client):
|
||||
scan_info = {
|
||||
"class": "RoundScan",
|
||||
"base_class": "ScanBaseV4",
|
||||
"arg_input": {},
|
||||
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
|
||||
"gui_visibility": {
|
||||
"Motors": ["motor_1", "motor_2"],
|
||||
"Ring Parameters": ["inner_radius", "outer_radius", "center_1", "center_2"],
|
||||
},
|
||||
"required_kwargs": [],
|
||||
"signature": [
|
||||
{
|
||||
"name": "motor_1",
|
||||
"kind": "POSITIONAL_OR_KEYWORD",
|
||||
"default": "_empty",
|
||||
"annotation": "DeviceBase",
|
||||
},
|
||||
{
|
||||
"name": "motor_2",
|
||||
"kind": "POSITIONAL_OR_KEYWORD",
|
||||
"default": "_empty",
|
||||
"annotation": "DeviceBase",
|
||||
},
|
||||
{
|
||||
"name": "inner_radius",
|
||||
"kind": "POSITIONAL_OR_KEYWORD",
|
||||
"default": "_empty",
|
||||
"annotation": {
|
||||
"Annotated": {
|
||||
"type": "float",
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": "Inner Radius",
|
||||
"units": None,
|
||||
"reference_units": "motor_1",
|
||||
"ge": 0,
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "outer_radius",
|
||||
"kind": "POSITIONAL_OR_KEYWORD",
|
||||
"default": "_empty",
|
||||
"annotation": {
|
||||
"Annotated": {
|
||||
"type": "float",
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": "Outer Radius",
|
||||
"units": None,
|
||||
"reference_units": "motor_1",
|
||||
"ge": 0,
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "center_1",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": 0,
|
||||
"annotation": {
|
||||
"Annotated": {
|
||||
"type": "float",
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": "Center Motor 1",
|
||||
"units": None,
|
||||
"reference_units": "motor_1",
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "center_2",
|
||||
"kind": "KEYWORD_ONLY",
|
||||
"default": 0,
|
||||
"annotation": {
|
||||
"Annotated": {
|
||||
"type": "float",
|
||||
"metadata": {
|
||||
"ScanArgument": {
|
||||
"display_name": "Center Motor 2",
|
||||
"units": None,
|
||||
"reference_units": "motor_2",
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
],
|
||||
}
|
||||
mocked_client.connector.set_and_publish(
|
||||
MessageEndpoints.available_scans(),
|
||||
AvailableResourceMessage(resource={"round_scan": scan_info}),
|
||||
)
|
||||
|
||||
widget = ScanControl(client=mocked_client)
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
widget.comboBox_scan_selection.setCurrentText("round_scan")
|
||||
|
||||
motor_box = widget.kwarg_boxes[0]
|
||||
ring_box = widget.kwarg_boxes[1]
|
||||
|
||||
assert "Units from: motor_1" in ring_box.widgets[0].toolTip()
|
||||
assert ring_box.widgets[0].suffix() == ""
|
||||
|
||||
with patch.object(mocked_client.device_manager.devices.samx, "egu", return_value="mm"):
|
||||
WidgetIO.set_value(motor_box.widgets[0], "samx")
|
||||
|
||||
assert ring_box.widgets[0].suffix() == " mm"
|
||||
assert ring_box.widgets[1].suffix() == " mm"
|
||||
assert ring_box.widgets[2].suffix() == " mm"
|
||||
assert ring_box.widgets[3].suffix() == ""
|
||||
assert "Units: mm" in ring_box.widgets[0].toolTip()
|
||||
|
||||
motor_box.widgets[0].setCurrentText("not_a_device")
|
||||
|
||||
assert ring_box.widgets[0].suffix() == ""
|
||||
assert ring_box.widgets[1].suffix() == ""
|
||||
assert ring_box.widgets[2].suffix() == ""
|
||||
assert "Units from: motor_1" in ring_box.widgets[0].toolTip()
|
||||
|
||||
|
||||
def test_current_scan(scan_control, mocked_client):
|
||||
current_scan = scan_control.current_scan
|
||||
wrong_scan = "error_scan"
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
|
||||
|
||||
from bec_widgets.utils.widget_io import WidgetIO
|
||||
from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox
|
||||
from bec_widgets.widgets.control.scan_control.scan_group_box import ScanGroupBox, ScanOptionalWidget
|
||||
|
||||
|
||||
def test_kwarg_box(qtbot):
|
||||
@@ -67,28 +67,28 @@ def test_kwarg_box(qtbot):
|
||||
assert kwarg_box.widgets[0].__class__.__name__ == "ScanDoubleSpinBox"
|
||||
assert kwarg_box.widgets[0].arg_name == "exp_time"
|
||||
assert WidgetIO.get_value(kwarg_box.widgets[0]) == 0
|
||||
assert kwarg_box.widgets[0].toolTip() == "Exposure time in seconds"
|
||||
assert "Exposure time in seconds" in kwarg_box.widgets[0].toolTip()
|
||||
|
||||
# Widget 1
|
||||
assert kwarg_box.widgets[1].__class__.__name__ == "ScanSpinBox"
|
||||
assert kwarg_box.widgets[1].arg_name == "num_points"
|
||||
assert WidgetIO.get_value(kwarg_box.widgets[1]) == 1
|
||||
assert kwarg_box.widgets[1].toolTip() == "Number of points"
|
||||
assert "Number of points" in kwarg_box.widgets[1].toolTip()
|
||||
|
||||
# Widget 2
|
||||
assert kwarg_box.widgets[2].__class__.__name__ == "ScanCheckBox"
|
||||
assert kwarg_box.widgets[2].arg_name == "relative"
|
||||
assert WidgetIO.get_value(kwarg_box.widgets[2]) == False
|
||||
assert (
|
||||
kwarg_box.widgets[2].toolTip()
|
||||
== "If True, the motors will be moved relative to their current position"
|
||||
"If True, the motors will be moved relative to their current position"
|
||||
in kwarg_box.widgets[2].toolTip()
|
||||
)
|
||||
|
||||
# Widget 3
|
||||
assert kwarg_box.widgets[3].__class__.__name__ == "ScanLineEdit"
|
||||
assert kwarg_box.widgets[3].arg_name == "scan_type"
|
||||
assert WidgetIO.get_value(kwarg_box.widgets[3]) == "line"
|
||||
assert kwarg_box.widgets[3].toolTip() == "Type of scan"
|
||||
assert "Type of scan" in kwarg_box.widgets[3].toolTip()
|
||||
|
||||
parameters = kwarg_box.get_parameters()
|
||||
assert parameters == {"exp_time": 0, "num_points": 1, "relative": False, "scan_type": "line"}
|
||||
@@ -146,14 +146,130 @@ def test_arg_box(qtbot):
|
||||
assert arg_box.widgets[0].__class__.__name__ == "ScanLineEdit"
|
||||
assert arg_box.widgets[0].arg_name == "device"
|
||||
assert WidgetIO.get_value(arg_box.widgets[0]) == "samx"
|
||||
assert arg_box.widgets[0].toolTip() == "Device to scan"
|
||||
assert "Device to scan" in arg_box.widgets[0].toolTip()
|
||||
|
||||
# Widget 1
|
||||
assert arg_box.widgets[1].__class__.__name__ == "ScanDoubleSpinBox"
|
||||
assert arg_box.widgets[1].arg_name == "start"
|
||||
assert WidgetIO.get_value(arg_box.widgets[1]) == 0
|
||||
assert arg_box.widgets[1].toolTip() == "Start position"
|
||||
assert "Start position" in arg_box.widgets[1].toolTip()
|
||||
|
||||
# Widget 2
|
||||
assert arg_box.widgets[2].__class__.__name__ == "ScanSpinBox"
|
||||
assert arg_box.widgets[2].arg_name
|
||||
|
||||
|
||||
def test_spinbox_limits_from_scan_info(qtbot):
|
||||
group_input = {
|
||||
"name": "Kwarg Test",
|
||||
"inputs": [
|
||||
{
|
||||
"arg": False,
|
||||
"name": "exp_time",
|
||||
"type": "float",
|
||||
"display_name": "Exp Time",
|
||||
"tooltip": "Exposure time in seconds",
|
||||
"default": 2.0,
|
||||
"expert": False,
|
||||
"precision": 3,
|
||||
"gt": 1.5,
|
||||
"ge": None,
|
||||
"lt": 5.0,
|
||||
"le": None,
|
||||
},
|
||||
{
|
||||
"arg": False,
|
||||
"name": "num_points",
|
||||
"type": "int",
|
||||
"display_name": "Num Points",
|
||||
"tooltip": "Number of points",
|
||||
"default": 4,
|
||||
"expert": False,
|
||||
"gt": None,
|
||||
"ge": 3,
|
||||
"lt": 9,
|
||||
"le": None,
|
||||
},
|
||||
{
|
||||
"arg": False,
|
||||
"name": "settling_time",
|
||||
"type": "float",
|
||||
"display_name": "Settling Time",
|
||||
"tooltip": "Settling time in seconds",
|
||||
"default": 0.5,
|
||||
"expert": False,
|
||||
"gt": None,
|
||||
"ge": 0.2,
|
||||
"lt": None,
|
||||
"le": 3.5,
|
||||
},
|
||||
{
|
||||
"arg": False,
|
||||
"name": "steps",
|
||||
"type": "int",
|
||||
"display_name": "Steps",
|
||||
"tooltip": "Number of steps",
|
||||
"default": 4,
|
||||
"expert": False,
|
||||
"gt": 0,
|
||||
"ge": None,
|
||||
"lt": None,
|
||||
"le": 10,
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
kwarg_box = ScanGroupBox(box_type="kwargs", config=group_input)
|
||||
|
||||
exp_time = kwarg_box.widgets[0]
|
||||
num_points = kwarg_box.widgets[1]
|
||||
settling_time = kwarg_box.widgets[2]
|
||||
steps = kwarg_box.widgets[3]
|
||||
|
||||
assert exp_time.decimals() == 3
|
||||
assert exp_time.minimum() == 1.501
|
||||
assert exp_time.maximum() == 4.999
|
||||
assert num_points.minimum() == 3
|
||||
assert num_points.maximum() == 8
|
||||
assert settling_time.minimum() == 0.2
|
||||
assert settling_time.maximum() == 3.5
|
||||
assert steps.minimum() == 1
|
||||
assert steps.maximum() == 10
|
||||
|
||||
|
||||
def test_optional_kwarg_widget_round_trips_none(qtbot):
|
||||
group_input = {
|
||||
"name": "Kwarg Test",
|
||||
"inputs": [
|
||||
{
|
||||
"arg": False,
|
||||
"name": "atol",
|
||||
"type": "float",
|
||||
"display_name": "Tolerance",
|
||||
"tooltip": "Optional tolerance used for position matching",
|
||||
"default": None,
|
||||
"optional": True,
|
||||
"expert": False,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
kwarg_box = ScanGroupBox(box_type="kwargs", config=group_input)
|
||||
|
||||
assert isinstance(kwarg_box.widgets[0], ScanOptionalWidget)
|
||||
assert kwarg_box.widgets[0].none_checkbox.text() == ""
|
||||
assert kwarg_box.widgets[0].is_none() is True
|
||||
assert kwarg_box.widgets[0].inner_widget.isEnabled() is False
|
||||
assert kwarg_box.get_parameters() == {"atol": None}
|
||||
|
||||
kwarg_box.set_parameters({"atol": 1.25})
|
||||
|
||||
assert kwarg_box.widgets[0].is_none() is False
|
||||
assert kwarg_box.widgets[0].inner_widget.isEnabled() is True
|
||||
assert WidgetIO.get_value(kwarg_box.widgets[0].inner_widget) == 1.25
|
||||
assert kwarg_box.get_parameters() == {"atol": 1.25}
|
||||
|
||||
kwarg_box.set_parameters({"atol": None})
|
||||
|
||||
assert kwarg_box.widgets[0].is_none() is True
|
||||
assert kwarg_box.get_parameters() == {"atol": None}
|
||||
|
||||
@@ -36,3 +36,26 @@ def test_toggle_click(qtbot, toggle):
|
||||
qtbot.mouseClick(toggle, Qt.LeftButton)
|
||||
toggle.paintEvent(None)
|
||||
assert toggle.checked is not init_state
|
||||
|
||||
|
||||
def test_toggle_disabled_state_blocks_clicks_and_restores_colors(qtbot, toggle):
|
||||
toggle.checked = True
|
||||
assert toggle._track_color == toggle.active_track_color
|
||||
assert toggle._thumb_color == toggle.active_thumb_color
|
||||
|
||||
toggle.setEnabled(False)
|
||||
|
||||
assert toggle._track_color == toggle._disabled_track_color
|
||||
assert toggle._thumb_color == toggle._disabled_thumb_color
|
||||
|
||||
qtbot.mouseClick(toggle, Qt.LeftButton)
|
||||
|
||||
assert toggle.checked is True
|
||||
assert toggle._track_color == toggle._disabled_track_color
|
||||
assert toggle._thumb_color == toggle._disabled_thumb_color
|
||||
|
||||
toggle.setEnabled(True)
|
||||
|
||||
assert toggle.checked is True
|
||||
assert toggle._track_color == toggle.active_track_color
|
||||
assert toggle._thumb_color == toggle.active_thumb_color
|
||||
|
||||
Reference in New Issue
Block a user