1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-16 05:30:54 +02:00

Compare commits

...

42 Commits

Author SHA1 Message Date
de7eaf7826 feat: added websitewidget 2024-04-23 09:23:17 +02:00
1694215c06 feat: added simple vscode widget 2024-04-21 10:08:44 +02:00
semantic-release
e55daee756 0.46.6
Automatically generated by python-semantic-release
2024-04-19 16:51:51 +00:00
1111610f32 fix(cli): fixed support for devices as cli input 2024-04-19 18:18:25 +02:00
81484e8160 ci: changed ophyd default branch to main 2024-04-19 13:34:44 +02:00
semantic-release
2e349bd705 0.46.5
Automatically generated by python-semantic-release
2024-04-19 08:17:58 +00:00
a156803389 test(rpc/bec_figure): test_rpc_plotting_shortcuts_init_configs extended by testing scatter z gradient for BECWaveform through RPC 2024-04-19 01:09:39 +02:00
2955b5ec02 refactor(rpc/client_utils): update script for grid_scan adds z axis device 2024-04-19 00:17:00 +02:00
ff52100e23 fix(widgets/figure): individual cleanup disabled, making stuck rpc 2024-04-19 00:16:22 +02:00
026c0792be fix(plots/waveform): colormap is correctly passed from BECFigure 2024-04-19 00:15:04 +02:00
b632ed1095 refactor(examples/jupyter_console_window): jupyter console debugging window moved to examples 2024-04-16 19:47:01 +02:00
semantic-release
98beea37e6 0.46.4
Automatically generated by python-semantic-release
2024-04-16 15:22:38 +00:00
4bcae0f921 ci: set branch name for semver 2024-04-16 17:10:48 +02:00
22fb5a5656 ci: fixed multi-project pipeline 2024-04-16 17:00:27 +02:00
4da625e439 fix: renaming of bec_client to bec_ipython_client 2024-04-16 17:00:27 +02:00
05e268d466 ci: "master" renamed to "main" in semver and pages section 2024-04-16 15:23:22 +02:00
42a9a0ca15 ci: added workflow .gitlab-ci.yml 2024-04-16 10:06:48 +02:00
b6feb9adb3 ci: CI_MERGE_REQUEST_TARGET_BRANCH_NAME changed to main 2024-04-16 09:53:16 +02:00
1bc18a201c test(e2e/rpc): rpc e2e tests extended 2024-04-16 09:51:39 +02:00
c12f2cee80 fix(plots/motor_map): user can get data as dict from BECMotorMap 2024-04-16 09:51:39 +02:00
c2c583fce6 fix(plots/image): user can get data as np.ndarray from BECImageItem 2024-04-16 09:51:39 +02:00
5600624c57 refactor(isort): isort applied 2024-04-16 09:51:39 +02:00
66c0649d7e ci(tests): unit tests ci path corrected 2024-04-16 09:51:39 +02:00
2446c401d9 test: unit tests moved to separate folder; scope of autouse bec_dispatcher fixture reduced only for unit tests; ci adjusted 2024-04-16 09:51:39 +02:00
4d0df364d3 test(end-2-end): rpc end-2-end tests 2024-04-16 09:51:39 +02:00
ecdf0f122b fix(rpc/server): server can accept client or dispatcher 2024-04-16 09:51:39 +02:00
df5234aa52 ci: pull images via gitlab dependency proxy 2024-04-16 09:31:01 +02:00
62080e6b40 Revert "ci: merge AdditionalTests with test stage"
This reverts commit 2e3f46ea36
2024-04-15 16:45:41 +02:00
2e3f46ea36 ci: merge AdditionalTests with test stage 2024-04-15 15:01:50 +02:00
be9847e9d2 refactor(plots/image): all rpc widgets can access config_dict as property 2024-04-15 11:45:06 +02:00
2f7317b328 refactor(plots/image): images are accessed as property .images -> returns list[BECImage] 2024-04-15 11:40:30 +02:00
bd3b1ba043 ci: changed default BEC branch to main 2024-04-12 17:04:42 +02:00
semantic-release
59e82dfd00 0.46.3
Automatically generated by python-semantic-release
2024-04-11 16:27:58 +00:00
0b86a0009d fix(test_fake_redis): TestMessage fixed to pydantic BaseModel 2024-04-11 15:28:06 +02:00
49327a8dbd fix(plots/motor_map): removed single callback flag for connecting device_readback motors 2024-04-11 11:53:28 +02:00
301bb916da test(utils/bec_dispatcher): tests fixed 2024-04-11 11:53:28 +02:00
285bf0164b fix(cli/client_utils): print_log is buffered; add output processing thread 2024-04-11 11:53:28 +02:00
90907e0a9c refactor(bec_dispatcher): new BEC dispatcher - rebased 2024-04-11 10:54:46 +02:00
9def3734af fix: producer->connector 2024-04-11 10:50:46 +02:00
semantic-release
3a241e897b 0.46.2
Automatically generated by python-semantic-release
2024-04-10 20:19:43 +00:00
ee617b73a2 fix(widget/plots): added "get_config" to all children of BECConnector to USER_ACCESS 2024-04-10 18:06:37 +02:00
92cea90971 refactor(utils/bec_dispatcher): new singleton definition 2024-04-10 16:41:28 +02:00
52 changed files with 896 additions and 699 deletions

View File

@@ -1,12 +1,22 @@
# This file is a template, and might need editing before it works on your project.
# Official language image. Look for the different tagged releases at:
# https://hub.docker.com/r/library/python/tags/
image: $CI_DOCKER_REGISTRY/python:3.10
image: $CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX/python:3.10
#commands to run in the Docker container before starting each job.
variables:
DOCKER_TLS_CERTDIR: ""
BEC_CORE_BRANCH: "master"
OPHYD_DEVICES_BRANCH: "master"
BEC_CORE_BRANCH: "main"
OPHYD_DEVICES_BRANCH: "main"
workflow:
rules:
- if: $CI_PIPELINE_SOURCE == "schedule"
- if: $CI_PIPELINE_SOURCE == "web"
- if: $CI_PIPELINE_SOURCE == "pipeline"
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
- if: $CI_COMMIT_BRANCH && $CI_OPEN_MERGE_REQUESTS
when: never
- if: $CI_COMMIT_BRANCH
include:
- template: Security/Secret-Detection.gitlab-ci.yml
@@ -17,6 +27,7 @@ stages:
- Formatter
- test
- AdditionalTests
- End2End
- Deploy
formatter:
@@ -87,7 +98,7 @@ tests:
- apt-get install -y libgl1-mesa-glx libegl1-mesa x11-utils libxkbcommon-x11-0 libdbus-1-3
- pip install -e ./bec/bec_lib[dev]
- pip install -e .[dev]
- coverage run --source=./bec_widgets -m pytest -v --junitxml=report.xml --random-order --full-trace ./tests
- coverage run --source=./bec_widgets -m pytest -v --junitxml=report.xml --random-order --full-trace ./tests/unit_tests
- coverage report
- coverage xml
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
@@ -98,28 +109,64 @@ tests:
coverage_format: cobertura
path: coverage.xml
#tests-3.10-pyqt5: #todo enable when we decide what qt distributions we want to support
# extends: "tests"
# stage: AdditionalTests
# image: $CI_DOCKER_REGISTRY/python:3.10
# script:
# - apt-get update
# - apt-get install -y libgl1-mesa-glx libegl1-mesa x11-utils libxkbcommon-x11-0 libdbus-1-3
# - pip install .[dev,pyqt5]
# - pytest -v --random-order ./tests
tests-3.11:
extends: "tests"
stage: AdditionalTests
image: $CI_DOCKER_REGISTRY/python:3.11
image: $CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX/python:3.11
allow_failure: true
tests-3.12:
extends: "tests"
stage: AdditionalTests
image: $CI_DOCKER_REGISTRY/python:3.12
image: $CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX/python:3.12
allow_failure: true
end-2-end-conda:
stage: End2End
needs: []
image: continuumio/miniconda3
allow_failure: false
variables:
QT_QPA_PLATFORM: "offscreen"
script:
- apt-get update
- apt-get install -y libgl1-mesa-glx libegl1-mesa x11-utils libxkbcommon-x11-0 libdbus-1-3
- conda config --prepend channels conda-forge
- conda config --set channel_priority strict
- conda config --set always_yes yes --set changeps1 no
- conda create -q -n test-environment python=3.10
- conda init bash
- source ~/.bashrc
- conda activate test-environment
- git clone --branch $BEC_CORE_BRANCH https://gitlab.psi.ch/bec/bec.git
- git clone --branch $OPHYD_DEVICES_BRANCH https://gitlab.psi.ch/bec/ophyd_devices.git
- export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
- cd ./bec
- source ./bin/install_bec_dev.sh -t
- pip install -e ./bec_lib[dev]
- pip install -e ./bec_ipython_client[dev]
- cd ../
- pip install -e .[dev]
- cd ./tests/end-2-end
- pytest --start-servers --flush-redis --random-order
artifacts:
when: on_failure
paths:
- ./logs/*.log
expire_in: 1 week
rules:
- if: '$CI_PIPELINE_SOURCE == "schedule"'
- if: '$CI_PIPELINE_SOURCE == "web"'
- if: '$CI_PIPELINE_SOURCE == "pipeline"'
- if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "main"'
- if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "production"'
semver:
stage: Deploy
@@ -144,10 +191,11 @@ semver:
semantic-release publish -v DEBUG
-D version_variable=./setup.py:__version__
-D hvcs=gitlab
-D branch=main
allow_failure: false
rules:
- if: '$CI_COMMIT_REF_NAME == "master"'
- if: '$CI_COMMIT_REF_NAME == "main"'
pages:
stage: Deploy
@@ -158,6 +206,6 @@ pages:
- if: '$CI_COMMIT_TAG != null'
variables:
TARGET_BRANCH: $CI_COMMIT_TAG
- if: '$CI_COMMIT_REF_NAME == "master"'
- if: '$CI_COMMIT_REF_NAME == "main"'
script:
- curl -X POST -d "branches=$CI_COMMIT_REF_NAME" -d "token=$RTD_TOKEN" https://readthedocs.org/api/v2/webhook/bec-widgets/253243/

View File

@@ -2,6 +2,43 @@
<!--next-version-placeholder-->
## v0.46.6 (2024-04-19)
### Fix
* **cli:** Fixed support for devices as cli input ([`1111610`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/1111610f3206c5c46db6b4bd1e8827f1a4cd9e3f))
## v0.46.5 (2024-04-19)
### Fix
* **widgets/figure:** Individual cleanup disabled, making stuck rpc ([`ff52100`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/ff52100e234debdfb5ccc0869352cfafde52ac93))
* **plots/waveform:** Colormap is correctly passed from BECFigure ([`026c079`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/026c0792bee25723013fffe57ccff10d9b652913))
## v0.46.4 (2024-04-16)
### Fix
* Renaming of bec_client to bec_ipython_client ([`4da625e`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/4da625e4398bdd937c2b788592f15f7530148292))
* **plots/motor_map:** User can get data as dict from BECMotorMap ([`c12f2ce`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/c12f2cee80b13137a2b70e2d121a079e20d124e2))
* **plots/image:** User can get data as np.ndarray from BECImageItem ([`c2c583f`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/c2c583fce6f28981990c504dd065705124e40e44))
* **rpc/server:** Server can accept client or dispatcher ([`ecdf0f1`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/ecdf0f122b628ee378b80793d498cedafe50fbf8))
## v0.46.3 (2024-04-11)
### Fix
* **test_fake_redis:** TestMessage fixed to pydantic BaseModel ([`0b86a00`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/0b86a0009d9366b710294a3ab55cb9f4894472c0))
* **plots/motor_map:** Removed single callback flag for connecting device_readback motors ([`49327a8`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/49327a8dbde270c67bc0ce7c757fd4a3eae118b4))
* **cli/client_utils:** Print_log is buffered; add output processing thread ([`285bf01`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/285bf0164b6deb91678f03ab2a190680b6d83a02))
* Producer->connector ([`9def373`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/9def3734afb361ac2d5cc933661766cdc440e09d))
## v0.46.2 (2024-04-10)
### Fix
* **widget/plots:** Added "get_config" to all children of `BECConnector` to USER_ACCESS ([`ee617b7`](https://gitlab.psi.ch/bec/bec-widgets/-/commit/ee617b73a2fcad8194394182fcecb0dd4f583a8e))
## v0.46.1 (2024-04-10)
### Fix

View File

@@ -6,14 +6,13 @@ from bec_widgets.cli.client_utils import BECFigureClientMixin, RPCBase, rpc_call
class BECPlotBase(RPCBase):
@property
@rpc_call
def get_config(self, dict_output: "bool" = True) -> "dict | BaseModel":
def config_dict(self) -> "dict":
"""
Get the configuration of the widget.
Args:
dict_output(bool): If True, return the configuration as a dictionary. If False, return the configuration as a pydantic model.
Returns:
dict: The configuration of the plot widget.
dict: The configuration of the widget.
"""
@rpc_call
@@ -137,6 +136,15 @@ class BECPlotBase(RPCBase):
class BECWaveform(RPCBase):
@property
@rpc_call
def config_dict(self) -> "dict":
"""
Get the configuration of the widget.
Returns:
dict: The configuration of the widget.
"""
@rpc_call
def add_curve_scan(
self,
@@ -258,16 +266,6 @@ class BECWaveform(RPCBase):
dict | pd.DataFrame: Data of all curves in the specified format.
"""
@rpc_call
def get_config(self, dict_output: "bool" = True) -> "dict | BaseModel":
"""
Get the configuration of the widget.
Args:
dict_output(bool): If True, return the configuration as a dictionary. If False, return the configuration as a pydantic model.
Returns:
dict: The configuration of the plot widget.
"""
@rpc_call
def set(self, **kwargs) -> "None":
"""
@@ -389,6 +387,15 @@ class BECWaveform(RPCBase):
class BECFigure(RPCBase, BECFigureClientMixin):
@property
@rpc_call
def config_dict(self) -> "dict":
"""
Get the configuration of the widget.
Returns:
dict: The configuration of the widget.
"""
@property
@rpc_call
def axes(self) -> "list[BECPlotBase]":
@@ -607,18 +614,17 @@ class BECFigure(RPCBase, BECFigureClientMixin):
Clear all widgets from the figure and reset to default state
"""
@rpc_call
def get_config(self, dict_output: "bool" = True) -> "dict | BaseModel":
"""
Get the configuration of the widget.
Args:
dict_output(bool): If True, return the configuration as a dictionary. If False, return the configuration as a pydantic model.
Returns:
dict: The configuration of the plot widget.
"""
class BECCurve(RPCBase):
@property
@rpc_call
def config_dict(self) -> "dict":
"""
Get the configuration of the widget.
Returns:
dict: The configuration of the widget.
"""
@rpc_call
def set(self, **kwargs):
"""
@@ -707,6 +713,15 @@ class BECCurve(RPCBase):
class BECImageShow(RPCBase):
@property
@rpc_call
def config_dict(self) -> "dict":
"""
Get the configuration of the widget.
Returns:
dict: The configuration of the widget.
"""
@rpc_call
def add_image_by_config(self, config: "ImageItemConfig | dict") -> "BECImageItem":
"""
@@ -730,14 +745,6 @@ class BECImageShow(RPCBase):
ImageItemConfig|dict: The configuration of the image.
"""
@rpc_call
def get_image_list(self) -> "list[BECImageItem]":
"""
Get the list of images.
Returns:
list[BECImageItem]: The list of images.
"""
@rpc_call
def get_image_dict(self) -> "dict[str, dict[str, BECImageItem]]":
"""
@@ -899,16 +906,6 @@ class BECImageShow(RPCBase):
use_threading(bool): Whether to use threading.
"""
@rpc_call
def get_config(self, dict_output: "bool" = True) -> "dict | BaseModel":
"""
Get the configuration of the widget.
Args:
dict_output(bool): If True, return the configuration as a dictionary. If False, return the configuration as a pydantic model.
Returns:
dict: The configuration of the plot widget.
"""
@rpc_call
def set(self, **kwargs) -> "None":
"""
@@ -1028,20 +1025,37 @@ class BECImageShow(RPCBase):
Remove the plot widget from the figure.
"""
@property
@rpc_call
def images(self) -> "list[BECImageItem]":
"""
Get the list of images.
Returns:
list[BECImageItem]: The list of images.
"""
class BECConnector(RPCBase):
@property
@rpc_call
def get_config(self, dict_output: "bool" = True) -> "dict | BaseModel":
def config_dict(self) -> "dict":
"""
Get the configuration of the widget.
Args:
dict_output(bool): If True, return the configuration as a dictionary. If False, return the configuration as a pydantic model.
Returns:
dict: The configuration of the plot widget.
dict: The configuration of the widget.
"""
class BECImageItem(RPCBase):
@property
@rpc_call
def config_dict(self) -> "dict":
"""
Get the configuration of the widget.
Returns:
dict: The configuration of the widget.
"""
@rpc_call
def set(self, **kwargs):
"""
@@ -1144,17 +1158,24 @@ class BECImageItem(RPCBase):
"""
@rpc_call
def get_config(self, dict_output: "bool" = True) -> "dict | BaseModel":
def get_data(self) -> "np.ndarray":
"""
Get the configuration of the widget.
Args:
dict_output(bool): If True, return the configuration as a dictionary. If False, return the configuration as a pydantic model.
Get the data of the image.
Returns:
dict: The configuration of the plot widget.
np.ndarray: The data of the image.
"""
class BECMotorMap(RPCBase):
@property
@rpc_call
def config_dict(self) -> "dict":
"""
Get the configuration of the widget.
Returns:
dict: The configuration of the widget.
"""
@rpc_call
def change_motors(
self,
@@ -1213,3 +1234,11 @@ class BECMotorMap(RPCBase):
Args:
scatter_size(int): Size of the scatter points.
"""
@rpc_call
def get_data(self) -> "dict":
"""
Get the data of the motor map.
Returns:
dict: Data of the motor map.
"""

View File

@@ -1,8 +1,11 @@
from __future__ import annotations
import importlib
import os
import select
import subprocess
import sys
import threading
import time
import uuid
from functools import wraps
@@ -33,6 +36,17 @@ def rpc_call(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
# we could rely on a strict type check here, but this is more flexible
# moreover, it would anyway crash for objects...
out = []
for arg in args:
if hasattr(arg, "name"):
arg = arg.name
out.append(arg)
args = tuple(out)
for key, val in kwargs.items():
if hasattr(val, "name"):
kwargs[key] = val.name
if not self.gui_is_alive():
raise RuntimeError("GUI is not alive")
return self._run_rpc(func.__name__, *args, **kwargs)
@@ -79,8 +93,9 @@ def update_script(figure: BECFigure, msg):
print(f"Scan {scan_number} is running")
dev_x = scan_report_devices[0]
dev_y = scan_report_devices[1]
dev_z = get_selected_device(monitored_devices, figure.selected_device)
figure.clear_all()
plt = figure.plot(dev_x, dev_y, label=f"Scan {scan_number}")
plt = figure.plot(dev_x, dev_y, dev_z, label=f"Scan {scan_number}")
plt.set(title=f"Scan {scan_number}", x_label=dev_x, y_label=dev_y)
elif scan_report_devices:
dev_x = scan_report_devices[0]
@@ -99,6 +114,7 @@ class BECFigureClientMixin:
self.update_script = update_script
self._target_endpoint = MessageEndpoints.scan_status()
self._selected_device = None
self.stderr_output = []
@property
def selected_device(self):
@@ -151,8 +167,10 @@ class BECFigureClientMixin:
if self._process is None:
return
self._run_rpc("close", (), wait_for_rpc_response=False)
self._process.kill()
self._process.terminate()
self._process_output_processing_thread.join()
self._process = None
self._client.shutdown()
def _start_plot_process(self) -> None:
"""
@@ -163,10 +181,12 @@ class BECFigureClientMixin:
monitor_module = importlib.import_module("bec_widgets.cli.server")
monitor_path = monitor_module.__file__
command = f"python {monitor_path} --id {self._gui_id}"
command = [sys.executable, "-u", monitor_path, "--id", self._gui_id]
self._process = subprocess.Popen(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
command, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
self._process_output_processing_thread = threading.Thread(target=self._get_output)
self._process_output_processing_thread.start()
def print_log(self) -> None:
"""
@@ -174,19 +194,22 @@ class BECFigureClientMixin:
"""
if self._process is None:
return
print(self._get_stderr_output())
print("".join(self.stderr_output))
# Flush list
self.stderr_output.clear()
def _get_stderr_output(self) -> str:
stderr_output = []
while self._process.poll() is not None:
readylist, _, _ = select.select([self._process.stderr], [], [], 0.1)
if not readylist:
break
line = self._process.stderr.readline()
if not line:
break
stderr_output.append(line.decode("utf-8"))
return "".join(stderr_output)
def _get_output(self) -> str:
os.set_blocking(self._process.stdout.fileno(), False)
os.set_blocking(self._process.stderr.fileno(), False)
while self._process.poll() is None:
readylist, _, _ = select.select([self._process.stdout, self._process.stderr], [], [], 1)
if self._process.stdout in readylist:
# print("*"*10, self._process.stdout.read(1024), flush=True, end="")
self._process.stdout.read(1024)
if self._process.stderr in readylist:
# print("!"*10, self._process.stderr.read(1024), flush=True, end="", file=sys.stderr)
print(self._process.stderr.read(1024), flush=True, end="", file=sys.stderr)
self.stderr_output.append(self._process.stderr.read(1024))
class RPCBase:

View File

@@ -12,9 +12,9 @@ from bec_widgets.widgets.plots import BECCurve, BECImageShow, BECWaveform
class BECWidgetsCLIServer:
WIDGETS = [BECWaveform, BECFigure, BECCurve, BECImageShow]
def __init__(self, gui_id: str = None, dispatcher: BECDispatcher = None) -> None:
def __init__(self, gui_id: str = None, dispatcher: BECDispatcher = None, client=None) -> None:
self.dispatcher = BECDispatcher() if dispatcher is None else dispatcher
self.client = self.dispatcher.client
self.client = self.dispatcher.client if client is None else client
self.client.start()
self.gui_id = gui_id
self.fig = BECFigure(gui_id=self.gui_id)

View File

@@ -0,0 +1,102 @@
import os
import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import uic
from qtconsole.inprocess import QtInProcessKernelManager
from qtconsole.rich_jupyter_widget import RichJupyterWidget
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
from bec_widgets.utils import BECDispatcher
from bec_widgets.widgets import BECFigure
class JupyterConsoleWidget(RichJupyterWidget): # pragma: no cover:
def __init__(self):
super().__init__()
self.kernel_manager = QtInProcessKernelManager()
self.kernel_manager.start_kernel(show_banner=False)
self.kernel_client = self.kernel_manager.client()
self.kernel_client.start_channels()
self.kernel_manager.kernel.shell.push({"np": np, "pg": pg})
# self.set_console_font_size(70)
def shutdown_kernel(self):
self.kernel_client.stop_channels()
self.kernel_manager.shutdown_kernel()
class JupyterConsoleWindow(QWidget): # pragma: no cover:
"""A widget that contains a Jupyter console linked to BEC Widgets with full API access (contains Qt and pyqtgraph API)."""
def __init__(self, parent=None):
super().__init__(parent)
current_path = os.path.dirname(__file__)
uic.loadUi(os.path.join(current_path, "jupyter_console_window.ui"), self)
self._init_ui()
self.splitter.setSizes([200, 100])
self.safe_close = False
# self.figure.clean_signal.connect(self.confirm_close)
# console push
self.console.kernel_manager.kernel.shell.push(
{
"fig": self.figure,
"w1": self.w1,
"w2": self.w2,
"w3": self.w3,
"bec": self.figure.client,
"scans": self.figure.client.scans,
"dev": self.figure.client.device_manager.devices,
}
)
def _init_ui(self):
# Plotting window
self.glw_1_layout = QVBoxLayout(self.glw) # Create a new QVBoxLayout
self.figure = BECFigure(parent=self, gui_id="remote") # Create a new BECDeviceMonitor
self.glw_1_layout.addWidget(self.figure) # Add BECDeviceMonitor to the layout
# add stuff to figure
self._init_figure()
self.console_layout = QVBoxLayout(self.widget_console)
self.console = JupyterConsoleWidget()
self.console_layout.addWidget(self.console)
self.console.set_default_style("linux")
def _init_figure(self):
self.figure.plot("samx", "bpm4d")
self.figure.motor_map("samx", "samy")
self.figure.image("eiger", color_map="viridis", vrange=(0, 100))
self.figure.change_layout(2, 2)
self.w1 = self.figure[0, 0]
self.w2 = self.figure[0, 1]
self.w3 = self.figure[1, 0]
# curves for w1
self.w1.add_curve_scan("samx", "samy", "bpm4i", pen_style="dash")
self.w1.add_curve_scan("samx", "samy", "bpm3a", pen_style="dash")
self.c1 = self.w1.get_config()
if __name__ == "__main__": # pragma: no cover
import sys
bec_dispatcher = BECDispatcher()
client = bec_dispatcher.client
client.start()
app = QApplication(sys.argv)
app.setApplicationName("Jupyter Console")
win = JupyterConsoleWindow()
win.show()
sys.exit(app.exec_())

View File

@@ -3,7 +3,6 @@ import time
from bec_lib import MessageEndpoints, RedisConnector, messages
connector = RedisConnector("localhost:6379")
producer = connector.producer()
metadata = {}
scan_id = "ScanID1"
@@ -15,9 +14,7 @@ for ii in range(20):
data = {"mca1": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "mca2": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]}
msg = messages.DeviceMessage(signals=data, metadata=metadata).dumps()
# producer.send(topic=MessageEndpoints.device_status(device="mca"), msg=msg)
producer.xadd(
connector.xadd(
topic=MessageEndpoints.device_async_readback(
scan_id=scan_id, device="mca"
), # scan_id will be different for each scan

View File

@@ -41,7 +41,7 @@ class StreamPlot(QtWidgets.QWidget):
uic.loadUi(os.path.join(current_path, "line_plot.ui"), self)
self._idle_time = 100
self.producer = RedisConnector(["localhost:6379"]).producer()
self.connector = RedisConnector(["localhost:6379"])
self.y_value_list = y_value_list
self.previous_y_value_list = None
@@ -215,7 +215,7 @@ class StreamPlot(QtWidgets.QWidget):
]
}
msg = messages.DeviceMessage(signals=return_dict).dumps()
self.producer.set_and_publish("px_stream/gui_event", msg=msg)
self.connector.set_and_publish("px_stream/gui_event", msg=msg)
self.roi_signal.emit(region)
def init_table(self):
@@ -271,7 +271,7 @@ class StreamPlot(QtWidgets.QWidget):
time.sleep(0.1)
continue
endpoint = f"px_stream/projection_{self._current_proj}/data"
msgs = self.client.producer.lrange(topic=endpoint, start=-1, end=-1)
msgs = self.client.connector.lrange(topic=endpoint, start=-1, end=-1)
data = msgs
if not data:
continue
@@ -296,7 +296,7 @@ class StreamPlot(QtWidgets.QWidget):
def new_proj(self, content: dict, _metadata: dict):
proj_nr = content["signals"]["proj_nr"]
endpoint = f"px_stream/projection_{proj_nr}/metadata"
msg_raw = self.client.producer.get(topic=endpoint)
msg_raw = self.client.connector.get(topic=endpoint)
msg = messages.DeviceMessage.loads(msg_raw)
self._current_q = msg.content["signals"]["q"]
self._current_norm = msg.content["signals"]["norm_sum"]

View File

@@ -31,11 +31,11 @@ class ConnectionConfig(BaseModel):
class BECConnector:
"""Connection mixin class for all BEC widgets, to handle BEC client and device manager"""
USER_ACCESS = ["get_config"]
USER_ACCESS = ["config_dict"]
def __init__(self, client=None, config: ConnectionConfig = None, gui_id: str = None):
# BEC related connections
self.bec_dispatcher = BECDispatcher()
self.bec_dispatcher = BECDispatcher(client=client)
self.client = self.bec_dispatcher.client if client is None else client
if config:
@@ -54,6 +54,24 @@ class BECConnector:
else:
self.gui_id = self.config.gui_id
@property
def config_dict(self) -> dict:
"""
Get the configuration of the widget.
Returns:
dict: The configuration of the widget.
"""
return self.config.model_dump()
@config_dict.setter
def config_dict(self, config: BaseModel) -> None:
"""
Get the configuration of the widget.
Returns:
dict: The configuration of the widget.
"""
self.config = config
@pyqtSlot(str)
def set_gui_id(self, gui_id: str) -> None:
"""

View File

@@ -1,54 +1,113 @@
from __future__ import annotations
import argparse
import itertools
import os
import collections
from collections.abc import Callable
from typing import Union
from typing import TYPE_CHECKING, Union
import redis
from bec_lib import BECClient, ServiceConfig
from bec_lib.endpoints import EndpointInfo
from bec_lib import BECClient
from bec_lib.redis_connector import MessageObject, RedisConnector
from qtpy.QtCore import QObject
from qtpy.QtCore import Signal as pyqtSignal
# Adding a new pyqt signal requires a class factory, as they must be part of the class definition
# and cannot be dynamically added as class attributes after the class has been defined.
_signal_class_factory = (
type(f"Signal{i}", (QObject,), dict(signal=pyqtSignal(dict, dict))) for i in itertools.count()
)
if TYPE_CHECKING:
from bec_lib.endpoints import EndpointInfo
class _Connection:
class QtThreadSafeCallback(QObject):
cb_signal = pyqtSignal(dict, dict)
def __init__(self, cb):
super().__init__()
self.cb = cb
self.cb_signal.connect(self.cb)
def __hash__(self):
# make 2 differents QtThreadSafeCallback to look
# identical when used as dictionary keys, if the
# callback is the same
return id(self.cb)
def __call__(self, msg_content, metadata):
self.cb_signal.emit(msg_content, metadata)
class QtRedisConnector(RedisConnector):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def _execute_callback(self, cb, msg, kwargs):
if not isinstance(cb, QtThreadSafeCallback):
return super()._execute_callback(cb, msg, kwargs)
# if msg.msg_type == "bundle_message":
# # big warning: how to handle bundle messages?
# # message with messages inside ; which slot to call?
# # bundle_msg = msg
# # for msg in bundle_msg:
# # ...
# # for now, only consider the 1st message
# msg = msg[0]
# raise RuntimeError(f"
if isinstance(msg, MessageObject):
if isinstance(msg.value, list):
msg = msg.value[0]
else:
msg = msg.value
# we can notice kwargs are lost when passed to Qt slot
metadata = msg.metadata
cb(msg.content, metadata)
else:
# from stream
msg = msg["data"]
cb(msg.content, msg.metadata)
class BECDispatcher:
"""Utility class to keep track of slots connected to a particular redis connector"""
def __init__(self, callback) -> None:
self.callback = callback
_instance = None
_initialized = False
self.slots = set()
# keep a reference to a new signal class, so it is not gc'ed
self._signal_container = next(_signal_class_factory)()
self.signal: pyqtSignal = self._signal_container.signal
class _BECDispatcher(QObject):
"""Utility class to keep track of slots connected to a particular redis connector"""
def __new__(cls, client=None, *args, **kwargs):
if cls._instance is None:
cls._instance = super(BECDispatcher, cls).__new__(cls)
cls._initialized = False
return cls._instance
def __init__(self, client=None):
super().__init__()
self.client = BECClient() if client is None else client
if self._initialized:
return
self._slots = collections.defaultdict(set)
self.client = client
if self.client is None:
self.client = BECClient(connector_cls=QtRedisConnector, forced=True)
else:
if self.client.started:
# have to reinitialize client to use proper connector
self.client.shutdown()
self.client._BECClient__init_params["connector_cls"] = QtRedisConnector
try:
self.client.start()
except redis.exceptions.ConnectionError:
print("Could not connect to Redis, skipping start of BECClient.")
self._connections = {}
self._initialized = True
@classmethod
def reset_singleton(cls):
cls._instance = None
cls._initialized = False
def connect_slot(
self,
slot: Callable,
topics: Union[EndpointInfo, str, list[Union[EndpointInfo, str]]],
single_callback_for_all_topics=False,
) -> None:
"""Connect widget's pyqt slot, so that it is called on new pub/sub topic message.
@@ -56,132 +115,27 @@ class _BECDispatcher(QObject):
slot (Callable): A slot method/function that accepts two inputs: content and metadata of
the corresponding pub/sub message
topics (EndpointInfo | str | list): A topic or list of topics that can typically be acquired via bec_lib.MessageEndpoints
single_callback_for_all_topics (bool): If True, use the same callback for all topics, otherwise use
separate callbacks.
"""
# Normalise the topics input
if isinstance(topics, (str, EndpointInfo)):
topics = [topics]
slot = QtThreadSafeCallback(slot)
self.client.connector.register(topics, cb=slot)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
self._slots[slot].update(set(topics_str))
endpoint_to_consumer_type = {
(topic.endpoint if isinstance(topic, EndpointInfo) else topic): (
topic.message_op.name if isinstance(topic, EndpointInfo) else "SEND"
)
for topic in topics
}
def disconnect_slot(self, slot: Callable, topics: Union[str, list]):
self.client.connector.unregister(topics, cb=slot)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
self._slots[slot].difference_update(set(topics_str))
if not self._slots[slot]:
del self._slots[slot]
# Group topics by consumer type
consumer_type_to_endpoints = {}
for endpoint, consumer_type in endpoint_to_consumer_type.items():
if consumer_type not in consumer_type_to_endpoints:
consumer_type_to_endpoints[consumer_type] = []
consumer_type_to_endpoints[consumer_type].append(endpoint)
def disconnect_topics(self, topics: Union[str, list]):
self.client.connector.unregister(topics)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
for slot in list(self._slots.keys()):
slot_topics = self._slots[slot]
slot_topics.difference_update(set(topics_str))
if not slot_topics:
del self._slots[slot]
for consumer_type, endpoints in consumer_type_to_endpoints.items():
topics_key = (
tuple(sorted(endpoints)) if single_callback_for_all_topics else tuple(endpoints)
)
if topics_key not in self._connections:
self._connections[topics_key] = self._create_connection(endpoints, consumer_type)
connection = self._connections[topics_key]
if slot not in connection.slots:
connection.signal.connect(slot)
connection.slots.add(slot)
def _create_connection(self, topics: list, consumer_type: str) -> _Connection:
"""Creates a new connection for given topics."""
def cb(msg):
if isinstance(msg, dict):
msg = msg["data"]
else:
msg = msg.value
for connection_key, connection in self._connections.items():
if set(topics).intersection(connection_key):
if isinstance(msg, list):
msg = msg[0]
connection.signal.emit(msg.content, msg.metadata)
try:
if consumer_type == "STREAM":
self.client.connector.register_stream(topics=topics, cb=cb, newest_only=True)
else:
self.client.connector.register(topics=topics, cb=cb)
except redis.exceptions.ConnectionError:
print("Could not connect to Redis, skipping registration of topics.")
return _Connection(cb)
def _do_disconnect_slot(self, topic, slot):
print(f"Disconnecting {slot} from {topic}")
connection = self._connections[topic]
try:
connection.signal.disconnect(slot)
except TypeError:
print(f"Could not disconnect slot:'{slot}' from topic:'{topic}'")
print("Continue to remove slot:'{slot}' from 'connection.slots'.")
connection.slots.remove(slot)
if not connection.slots:
del self._connections[topic]
def _disconnect_slot_from_topic(self, slot: Callable, topic: str) -> None:
"""A helper method to disconnect a slot from a specific topic.
Args:
slot (Callable): A slot to be disconnected
topic (str): A corresponding topic that can typically be acquired via
bec_lib.MessageEndpoints
"""
connection = self._connections.get(topic)
if connection and slot in connection.slots:
self._do_disconnect_slot(topic, slot)
def disconnect_slot(self, slot: Callable, topics: Union[str, list]) -> None:
"""Disconnect widget's pyqt slot from pub/sub updates on a topic.
Args:
slot (Callable): A slot to be disconnected
topics (str | list): A corresponding topic or list of topics that can typically be acquired via
bec_lib.MessageEndpoints
"""
# Normalise the topics input
if isinstance(topics, (str, EndpointInfo)):
topics = [topics]
endpoints = [
topic.endpoint if isinstance(topic, EndpointInfo) else topic for topic in topics
]
for key, connection in list(self._connections.items()):
if slot in connection.slots:
common_topics = set(endpoints).intersection(key)
if common_topics:
remaining_topics = set(key) - set(endpoints)
# Disconnect slot from common topics
self._do_disconnect_slot(key, slot)
# Reconnect slot to remaining topics if any
if remaining_topics:
self.connect_slot(slot, list(remaining_topics), True)
def disconnect_all(self):
"""Disconnect all slots from all topics."""
for key, connection in list(self._connections.items()):
for slot in list(connection.slots):
self._disconnect_slot_from_topic(slot, key)
# variable holding the Singleton instance of BECDispatcher
_bec_dispatcher = None
def BECDispatcher():
global _bec_dispatcher
if _bec_dispatcher is None:
parser = argparse.ArgumentParser()
parser.add_argument("--bec-client", default=None)
args, _ = parser.parse_known_args()
_bec_dispatcher = _BECDispatcher(args.bec_client)
return _bec_dispatcher
def disconnect_all(self, *args, **kwargs):
self.disconnect_topics(self.client.connector._topics_cb)

View File

@@ -0,0 +1,32 @@
from qtpy.QtCore import QUrl
from qtpy.QtWebEngineWidgets import QWebEngineView
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
class WebsiteWidget(QWidget):
def __init__(self, url):
super().__init__()
self.editor = QWebEngineView(self)
layout = QVBoxLayout()
layout.addWidget(self.editor)
self.setLayout(layout)
self.editor.setUrl(QUrl(url))
class VSCodeEditor(WebsiteWidget):
token = "bec"
host = "localhost"
port = 7000
def __init__(self):
super().__init__(f"http://{self.host}:{self.port}?tkn={self.token}")
if __name__ == "__main__":
import sys
app = QApplication(sys.argv)
mainWin = WebsiteWidget("https://scilog.psi.ch")
mainWin.show()
sys.exit(app.exec())

View File

@@ -0,0 +1,59 @@
"""
Module to handle the vscode server
"""
import subprocess
class VSCodeServer:
"""
Class to handle the vscode server
"""
_instance = None
def __init__(self, port=7000, token="bec"):
self.started = False
self._server = None
self.port = port
self.token = token
def __new__(cls, *args, forced=False, **kwargs):
if cls._instance is None or forced:
cls._instance = super(VSCodeServer, cls).__new__(cls)
return cls._instance
def start_server(self):
"""
Start the vscode server in a subprocess
"""
if self.started:
return
self._server = subprocess.Popen(
f"code serve-web --port {self.port} --connection-token={self.token} --accept-server-license-terms",
shell=True,
)
self.started = True
def wait(self):
"""
Wait for the server to finish
"""
if not self.started:
return
if not self._server:
return
self._server.wait()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Start the vscode server")
parser.add_argument("--port", type=int, default=7000, help="Port to start the server")
parser.add_argument("--token", type=str, default="bec", help="Token to start the server")
args = parser.parse_args()
server = VSCodeServer(port=args.port, token=args.token)
server.start_server()
server.wait()

View File

@@ -97,6 +97,7 @@ class WidgetHandler:
class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
USER_ACCESS = [
"config_dict",
"axes",
"widgets",
"add_plot",
@@ -109,7 +110,6 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
"change_layout",
"change_theme",
"clear_all",
"get_config",
]
clean_signal = pyqtSignal()
@@ -227,7 +227,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
y_entry=y_entry,
z_entry=z_entry,
color=color,
color_map=color_map_z,
color_map_z=color_map_z,
label=label,
validate=validate,
)
@@ -313,7 +313,7 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
y_entry=y_entry,
z_entry=z_entry,
color=color,
color_map=color_map_z,
color_map_z=color_map_z,
label=label,
validate=validate,
)
@@ -787,8 +787,8 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
def clear_all(self):
"""Clear all widgets from the figure and reset to default state"""
for widget in self._widgets.values():
widget.cleanup()
# for widget in self._widgets.values():
# widget.cleanup()
self.clear()
self._widgets = defaultdict(dict)
self.grid = []
@@ -796,158 +796,3 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
self.config = FigureConfig(
widget_class=self.__class__.__name__, gui_id=self.gui_id, theme=theme
)
##################################################
##################################################
# Debug window
##################################################
##################################################
from qtconsole.inprocess import QtInProcessKernelManager
from qtconsole.rich_jupyter_widget import RichJupyterWidget
class JupyterConsoleWidget(RichJupyterWidget): # pragma: no cover:
def __init__(self):
super().__init__()
self.kernel_manager = QtInProcessKernelManager()
self.kernel_manager.start_kernel(show_banner=False)
self.kernel_client = self.kernel_manager.client()
self.kernel_client.start_channels()
self.kernel_manager.kernel.shell.push({"np": np, "pg": pg})
# self.set_console_font_size(70)
def shutdown_kernel(self):
self.kernel_client.stop_channels()
self.kernel_manager.shutdown_kernel()
class DebugWindow(QWidget): # pragma: no cover:
"""Debug window for BEC widgets"""
def __init__(self, parent=None):
super().__init__(parent)
current_path = os.path.dirname(__file__)
uic.loadUi(os.path.join(current_path, "figure_debug_minimal.ui"), self)
self._init_ui()
self.splitter.setSizes([200, 100])
self.safe_close = False
# self.figure.clean_signal.connect(self.confirm_close)
# console push
self.console.kernel_manager.kernel.shell.push(
{
"fig": self.figure,
"w1": self.w1,
"w2": self.w2,
"w3": self.w3,
"w4": self.w4,
"bec": self.figure.client,
"scans": self.figure.client.scans,
"dev": self.figure.client.device_manager.devices,
}
)
def _init_ui(self):
# Plotting window
self.glw_1_layout = QVBoxLayout(self.glw) # Create a new QVBoxLayout
self.figure = BECFigure(parent=self, gui_id="remote") # Create a new BECDeviceMonitor
self.glw_1_layout.addWidget(self.figure) # Add BECDeviceMonitor to the layout
# add stuff to figure
self._init_figure()
self.console_layout = QVBoxLayout(self.widget_console)
self.console = JupyterConsoleWidget()
self.console_layout.addWidget(self.console)
self.console.set_default_style("linux")
def _init_figure(self):
# self.figure.add_widget(widget_type="Waveform1D", row=0, col=0, title="Widget 1")
self.figure.plot("samx", "bpm4d")
self.figure.add_widget(widget_type="Waveform1D", row=0, col=1, title="Widget 2")
self.figure.add_image(
title="Image", row=1, col=0, color_map="viridis", color_bar="simple", vrange=(0, 100)
)
self.figure.add_image(title="Image", row=1, col=1, vrange=(0, 100))
self.w1 = self.figure[0, 0]
self.w2 = self.figure[0, 1]
self.w3 = self.figure[1, 0]
self.w4 = self.figure[1, 1]
# curves for w1
self.w1.add_curve_scan("samx", "samy", "bpm4i", pen_style="dash")
self.w1.add_curve_scan("samx", "samy", "bpm3a", pen_style="dash")
# self.w1.add_curve_custom(
# x=[1, 2, 3, 4, 5],
# y=[1, 2, 3, 4, 5],
# label="curve-custom",
# color="blue",
# pen_style="dashdot",
# )
self.c1 = self.w1.get_config()
# curves for w2
self.w2.add_curve_scan("samx", "bpm3a", pen_style="solid")
self.w2.add_curve_scan("samx", "bpm4d", pen_style="dot")
self.w2.add_curve_custom(
x=[1, 2, 3, 4, 5], y=[5, 4, 3, 2, 1], color="red", pen_style="dashdot"
)
# curves for w3
# self.w3.add_curve_scan("samx", "bpm4i", pen_style="dash")
# self.w3.add_curve_custom(
# x=[1, 2, 3, 4, 5],
# y=[1, 2, 3, 4, 5],
# label="curve-custom",
# color="blue",
# pen_style="dashdot",
# )
# curves for w4
# self.w4.add_curve_scan("samx", "bpm4i", pen_style="dash")
# self.w4.add_curve_custom(
# x=[1, 2, 3, 4, 5],
# y=[1, 2, 3, 4, 5],
# label="curve-custom",
# color="blue",
# pen_style="dashdot",
# )
# Image setting for w3
self.w3.add_monitor_image("eiger", vrange=(0, 100), color_bar="full")
# Image setting for w4
self.w4.add_monitor_image("eiger", vrange=(0, 100), color_map="viridis")
# def confirm_close(self):
# self.safe_close = True
#
# def closeEvent(self, event):
# self.figure.cleanup()
# if self.safe_close == True:
# print("Safe close")
# event.accept()
if __name__ == "__main__": # pragma: no cover
import sys
bec_dispatcher = BECDispatcher()
client = bec_dispatcher.client
client.start()
app = QApplication(sys.argv)
win = DebugWindow()
win.show()
sys.exit(app.exec_())

View File

@@ -217,7 +217,6 @@ class MotorMap(pg.GraphicsLayoutWidget):
bec_dispatcher.connect_slot(
self.on_device_readback,
endpoints,
single_callback_for_all_topics=True,
)
def _add_limits_to_plot_data(self):

View File

@@ -59,6 +59,7 @@ class ImageConfig(WidgetConfig):
class BECImageItem(BECConnector, pg.ImageItem):
USER_ACCESS = [
"config_dict",
"set",
"set_fft",
"set_log",
@@ -70,7 +71,7 @@ class BECImageItem(BECConnector, pg.ImageItem):
"set_auto_downsample",
"set_monitor",
"set_vrange",
"get_config",
"get_data",
]
def __init__(
@@ -243,6 +244,14 @@ class BECImageItem(BECConnector, pg.ImageItem):
self.color_bar.setLevels(min=vmin, max=vmax)
self.color_bar.setHistogramRange(vmin - 0.1 * vmin, vmax + 0.1 * vmax)
def get_data(self) -> np.ndarray:
"""
Get the data of the image.
Returns:
np.ndarray: The data of the image.
"""
return self.image
def _add_color_bar(
self, color_bar_style: str = "simple", vrange: Optional[tuple[int, int]] = None
):
@@ -281,9 +290,9 @@ class BECImageItem(BECConnector, pg.ImageItem):
class BECImageShow(BECPlotBase):
USER_ACCESS = [
"config_dict",
"add_image_by_config",
"get_image_config",
"get_image_list",
"get_image_dict",
"add_monitor_image",
"add_custom_image",
@@ -298,7 +307,6 @@ class BECImageShow(BECPlotBase):
"set_rotation",
"set_transpose",
"toggle_threading",
"get_config",
"set",
"set_title",
"set_x_label",
@@ -311,6 +319,7 @@ class BECImageShow(BECPlotBase):
"lock_aspect_ratio",
"plot",
"remove",
"images",
]
def __init__(
@@ -448,7 +457,8 @@ class BECImageShow(BECPlotBase):
else:
return image.config # TODO check if this works
def get_image_list(self) -> list[BECImageItem]:
@property
def images(self) -> list[BECImageItem]:
"""
Get the list of images.
Returns:
@@ -460,6 +470,16 @@ class BECImageShow(BECPlotBase):
images.append(image)
return images
@images.setter
def images(self, value: dict[str, dict[str, BECImageItem]]):
"""
Set the images from a dictionary.
Args:
value (dict[str, dict[str, BECImageItem]]): The images to set, organized by source and id.
"""
self._images = value
def get_image_dict(self) -> dict[str, dict[str, BECImageItem]]:
"""
Get all images.

View File

@@ -36,12 +36,14 @@ class MotorMapConfig(WidgetConfig):
class BECMotorMap(BECPlotBase):
USER_ACCESS = [
"config_dict",
"change_motors",
"set_max_points",
"set_precision",
"set_num_dim_points",
"set_background_value",
"set_scatter_size",
"get_data",
]
# QT Signals
@@ -127,6 +129,18 @@ class BECMotorMap(BECPlotBase):
# Redraw the motor map
self._make_motor_map()
def get_data(self) -> dict:
"""
Get the data of the motor map.
Returns:
dict: Data of the motor map.
"""
data = {
"x": self.database_buffer["x"],
"y": self.database_buffer["y"],
}
return data
# TODO setup all visual properties
def set_max_points(self, max_points: int) -> None:
"""
@@ -185,9 +199,7 @@ class BECMotorMap(BECPlotBase):
MessageEndpoints.device_readback(self.motor_y),
]
self.bec_dispatcher.connect_slot(
self.on_device_readback, endpoints, single_callback_for_all_topics=True
)
self.bec_dispatcher.connect_slot(self.on_device_readback, endpoints)
def _make_motor_map(self):
"""

View File

@@ -37,7 +37,7 @@ class WidgetConfig(ConnectionConfig):
class BECPlotBase(BECConnector, pg.GraphicsLayout):
USER_ACCESS = [
"get_config",
"config_dict",
"set",
"set_title",
"set_x_label",

View File

@@ -64,6 +64,7 @@ class Waveform1DConfig(WidgetConfig):
class BECCurve(BECConnector, pg.PlotDataItem):
USER_ACCESS = [
"config_dict",
"set",
"set_data",
"set_color",
@@ -227,6 +228,7 @@ class BECCurve(BECConnector, pg.PlotDataItem):
class BECWaveform(BECPlotBase):
USER_ACCESS = [
"config_dict",
"add_curve_scan",
"add_curve_custom",
"remove_curve",
@@ -236,7 +238,6 @@ class BECWaveform(BECPlotBase):
"get_curve_config",
"apply_config",
"get_all_data",
"get_config",
"set",
"set_title",
"set_x_label",

View File

@@ -1,7 +1,7 @@
# pylint: disable= missing-module-docstring
from setuptools import find_packages, setup
__version__ = "0.46.1"
__version__ = "0.46.6"
# Default to PyQt6 if no other Qt binding is installed
QT_DEPENDENCY = "PyQt6>=6.0"

View File

@@ -1,36 +0,0 @@
import threading
import pytest
from bec_lib.bec_service import BECService
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
@pytest.fixture()
def threads_check():
current_threads = set(
th
for th in threading.enumerate()
if "loguru" not in th.name and th is not threading.main_thread()
)
yield
threads_after = set(
th
for th in threading.enumerate()
if "loguru" not in th.name and th is not threading.main_thread()
)
additional_threads = threads_after - current_threads
assert (
len(additional_threads) == 0
), f"Test creates {len(additional_threads)} threads that are not cleaned: {additional_threads}"
@pytest.fixture(autouse=True)
def bec_dispatcher(threads_check):
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
yield bec_dispatcher
bec_dispatcher.disconnect_all()
# clean BEC client
bec_dispatcher.client.shutdown()
# reinitialize singleton for next test
bec_dispatcher_module._bec_dispatcher = None

View File

@@ -0,0 +1,181 @@
import numpy as np
import pytest
from bec_lib import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.cli.server import BECWidgetsCLIServer
from bec_widgets.utils import BECDispatcher
@pytest.fixture
def rpc_server(qtbot, bec_client_lib, threads_check):
dispatcher = BECDispatcher(client=bec_client_lib) # Has to init singleton with fixture client
server = BECWidgetsCLIServer(gui_id="id_test")
qtbot.addWidget(server.fig)
qtbot.waitExposed(server.fig)
qtbot.wait(1000) # 1s long to wait until gui is ready
yield server
dispatcher.disconnect_all()
server.client.shutdown()
server.shutdown()
dispatcher.reset_singleton()
def test_rpc_waveform1d_custom_curve(rpc_server, qtbot):
fig = BECFigure(rpc_server.gui_id)
fig_server = rpc_server.fig
ax = fig.add_plot()
curve = ax.add_curve_custom([1, 2, 3], [1, 2, 3])
curve.set_color("red")
curve = ax.curves[0]
curve.set_color("blue")
assert len(fig_server.widgets) == 1
assert len(fig_server.widgets["widget_1"].curves) == 1
def test_rpc_plotting_shortcuts_init_configs(rpc_server, qtbot):
fig = BECFigure(rpc_server.gui_id)
fig_server = rpc_server.fig
plt = fig.plot("samx", "bpm4i")
im = fig.image("eiger")
motor_map = fig.motor_map("samx", "samy")
plt_z = fig.add_plot("samx", "samy", "bpm4i")
# Checking if classes are correctly initialised
assert len(fig_server.widgets) == 4
assert plt.__class__.__name__ == "BECWaveform"
assert plt.__class__ == BECWaveform
assert im.__class__.__name__ == "BECImageShow"
assert im.__class__ == BECImageShow
assert motor_map.__class__.__name__ == "BECMotorMap"
assert motor_map.__class__ == BECMotorMap
# check if the correct devices are set
# plot
assert plt.config_dict["curves"]["bpm4i-bpm4i"]["signals"] == {
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
"z": None,
}
# image
assert im.config_dict["images"]["eiger"]["monitor"] == "eiger"
# motor map
assert motor_map.config_dict["signals"] == {
"source": "device_readback",
"x": {
"name": "samx",
"entry": "samx",
"unit": None,
"modifier": None,
"limits": [-50.0, 50.0],
},
"y": {
"name": "samy",
"entry": "samy",
"unit": None,
"modifier": None,
"limits": [-50.0, 50.0],
},
"z": None,
}
# plot with z scatter
assert plt_z.config_dict["curves"]["bpm4i-bpm4i"]["signals"] == {
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {"name": "samy", "entry": "samy", "unit": None, "modifier": None, "limits": None},
"z": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
}
def test_rpc_waveform_scan(rpc_server, qtbot):
fig = BECFigure(rpc_server.gui_id)
# add 3 different curves to track
plt = fig.plot("samx", "bpm4i")
fig.plot("samx", "bpm3a")
fig.plot("samx", "bpm4d")
client = rpc_server.client
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# wait for scan to finish
while not status.status == "COMPLETED":
qtbot.wait(200)
last_scan_data = queue.scan_storage.storage[-1].data
# get data from curves
plt_data = plt.get_all_data()
# check plotted data
assert plt_data["bpm4i-bpm4i"]["x"] == last_scan_data["samx"]["samx"].val
assert plt_data["bpm4i-bpm4i"]["y"] == last_scan_data["bpm4i"]["bpm4i"].val
assert plt_data["bpm3a-bpm3a"]["x"] == last_scan_data["samx"]["samx"].val
assert plt_data["bpm3a-bpm3a"]["y"] == last_scan_data["bpm3a"]["bpm3a"].val
assert plt_data["bpm4d-bpm4d"]["x"] == last_scan_data["samx"]["samx"].val
assert plt_data["bpm4d-bpm4d"]["y"] == last_scan_data["bpm4d"]["bpm4d"].val
def test_rpc_image(rpc_server, qtbot):
fig = BECFigure(rpc_server.gui_id)
im = fig.image("eiger")
client = rpc_server.client
dev = client.device_manager.devices
scans = client.scans
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# wait for scan to finish
while not status.status == "COMPLETED":
qtbot.wait(200)
last_image_device = client.connector.get_last(MessageEndpoints.device_monitor("eiger"))[
"data"
].data
qtbot.wait(500)
last_image_plot = im.images[0].get_data()
# check plotted data
np.testing.assert_equal(last_image_device, last_image_plot)
def test_rpc_motor_map(rpc_server, qtbot):
fig = BECFigure(rpc_server.gui_id)
fig_server = rpc_server.fig
motor_map = fig.motor_map("samx", "samy")
client = rpc_server.client
dev = client.device_manager.devices
scans = client.scans
initial_pos_x = dev.samx.read()["samx"]["value"]
initial_pos_y = dev.samy.read()["samy"]["value"]
status = scans.mv(dev.samx, 1, dev.samy, 2, relative=True)
# wait for scan to finish
while not status.status == "COMPLETED":
qtbot.wait(200)
final_pos_x = dev.samx.read()["samx"]["value"]
final_pos_y = dev.samy.read()["samy"]["value"]
# check plotted data
motor_map_data = motor_map.get_data()
np.testing.assert_equal(
[motor_map_data["x"][0], motor_map_data["y"][0]], [initial_pos_x, initial_pos_y]
)
np.testing.assert_equal(
[motor_map_data["x"][-1], motor_map_data["y"][-1]], [final_pos_x, final_pos_y]
)

View File

@@ -1,239 +0,0 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from unittest.mock import Mock
import pytest
from bec_lib.connector import MessageObject
from bec_lib.messages import ScanMessage
msg = MessageObject(topic="", value=ScanMessage(point_id=0, scan_id="scan_id", data={}))
@pytest.fixture(name="consumer")
def _consumer(bec_dispatcher):
bec_dispatcher.client.connector = Mock()
yield bec_dispatcher.client.connector
@pytest.mark.filterwarnings("ignore:Failed to connect to redis.")
def test_connect_one_slot(bec_dispatcher, consumer):
slot1 = Mock()
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
consumer.register.assert_called_once()
# trigger consumer callback as if a message was published
consumer.register.call_args.kwargs["cb"](msg)
slot1.assert_called_once()
consumer.register.call_args.kwargs["cb"](msg)
assert slot1.call_count == 2
def test_connect_identical(bec_dispatcher, consumer):
slot1 = Mock()
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
consumer.register.assert_called_once()
consumer.register.call_args.kwargs["cb"](msg)
slot1.assert_called_once()
def test_connect_many_slots_one_topic(bec_dispatcher, consumer):
slot1, slot2 = Mock(), Mock()
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
consumer.register.assert_called_once()
bec_dispatcher.connect_slot(slot=slot2, topics="topic0")
consumer.register.assert_called_once()
# trigger consumer callback as if a message was published
consumer.register.call_args.kwargs["cb"](msg)
slot1.assert_called_once()
slot2.assert_called_once()
consumer.register.call_args.kwargs["cb"](msg)
assert slot1.call_count == 2
assert slot2.call_count == 2
def test_connect_one_slot_many_topics(bec_dispatcher, consumer):
slot1 = Mock()
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
assert consumer.register.call_count == 1
bec_dispatcher.connect_slot(slot=slot1, topics="topic1")
assert consumer.register.call_count == 2
# trigger consumer callback as if a message was published
consumer.register.call_args_list[0].kwargs["cb"](msg)
slot1.assert_called_once()
consumer.register.call_args_list[1].kwargs["cb"](msg)
assert slot1.call_count == 2
def test_disconnect_one_slot_one_topic(bec_dispatcher, consumer):
slot1, slot2 = Mock(), Mock()
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
# disconnect using a different topic
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic1")
consumer.register.call_args.kwargs["cb"](msg)
assert slot1.call_count == 1
# disconnect using a different slot
bec_dispatcher.disconnect_slot(slot=slot2, topics="topic0")
consumer.register.call_args.kwargs["cb"](msg)
assert slot1.call_count == 2
# disconnect using the right slot and topics
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic0")
# reset count to for slot
slot1.reset_mock()
consumer.register.call_args.kwargs["cb"](msg)
assert slot1.call_count == 0
def test_disconnect_identical(bec_dispatcher, consumer):
slot1 = Mock()
# Try to connect slot twice
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
# Test to call the slot once (slot should be not connected twice)
consumer.register.call_args.kwargs["cb"](msg)
assert slot1.call_count == 1
# Disconnect the slot
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic0")
# Test to call the slot once (slot should be not connected anymore), count remains 1
consumer.register.call_args.kwargs["cb"](msg)
assert slot1.call_count == 1
def test_disconnect_many_slots_one_topic(bec_dispatcher, consumer):
slot1, slot2, slot3 = Mock(), Mock(), Mock()
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
bec_dispatcher.connect_slot(slot=slot2, topics="topic0")
# disconnect using a different slot
bec_dispatcher.disconnect_slot(slot3, topics="topic0")
consumer.register.call_args.kwargs["cb"](msg)
assert slot1.call_count == 1
assert slot2.call_count == 1
# disconnect using a different topics
bec_dispatcher.disconnect_slot(slot1, topics="topic1")
consumer.register.call_args.kwargs["cb"](msg)
assert slot1.call_count == 2
assert slot2.call_count == 2
# disconnect using the right slot and topics
bec_dispatcher.disconnect_slot(slot1, topics="topic0")
consumer.register.call_args.kwargs["cb"](msg)
assert slot1.call_count == 2
assert slot2.call_count == 3
def test_disconnect_one_slot_many_topics(bec_dispatcher, consumer):
slot1, slot2 = Mock(), Mock()
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
bec_dispatcher.connect_slot(slot=slot1, topics="topic1")
# disconnect using a different slot
bec_dispatcher.disconnect_slot(slot=slot2, topics="topic0")
consumer.register.call_args_list[0].kwargs["cb"](msg)
assert slot1.call_count == 1
consumer.register.call_args_list[1].kwargs["cb"](msg)
assert slot1.call_count == 2
# disconnect using a different topics
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic3")
consumer.register.call_args_list[0].kwargs["cb"](msg)
assert slot1.call_count == 3
consumer.register.call_args_list[1].kwargs["cb"](msg)
assert slot1.call_count == 4
# disconnect using the right slot and topics
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic0")
# Calling disconnected topic0 should not call slot1
consumer.register.call_args_list[0].kwargs["cb"](msg)
assert slot1.call_count == 4
# Calling topic1 should still call slot1
consumer.register.call_args_list[1].kwargs["cb"](msg)
assert slot1.call_count == 5
# disconnect remaining topic1 from slot1, calling any topic should not increase count
bec_dispatcher.disconnect_slot(slot=slot1, topics="topic1")
consumer.register.call_args_list[0].kwargs["cb"](msg)
consumer.register.call_args_list[1].kwargs["cb"](msg)
assert slot1.call_count == 5
def test_disconnect_all(bec_dispatcher, consumer):
# Mock slots to connect
slot1, slot2, slot3 = Mock(), Mock(), Mock()
# Connect slots to different topics
bec_dispatcher.connect_slot(slot=slot1, topics="topic0")
bec_dispatcher.connect_slot(slot=slot2, topics="topic1")
bec_dispatcher.connect_slot(slot=slot3, topics="topic2")
# Call disconnect_all method
bec_dispatcher.disconnect_all()
# Simulate messages and verify that none of the slots are called
consumer.register.call_args_list[0].kwargs["cb"](msg)
consumer.register.call_args_list[1].kwargs["cb"](msg)
consumer.register.call_args_list[2].kwargs["cb"](msg)
# Ensure that the slots have not been called
assert slot1.call_count == 0
assert slot2.call_count == 0
assert slot3.call_count == 0
# Also, check that the consumer for each topic is shutdown
assert "topic0" not in bec_dispatcher._connections
assert "topic1" not in bec_dispatcher._connections
assert "topic2" not in bec_dispatcher._connections
def test_connect_one_slot_multiple_topics_single_callback(bec_dispatcher, consumer):
slot1 = Mock()
# Connect the slot to multiple topics using a single callback
topics = ["topic1", "topic2"]
bec_dispatcher.connect_slot(slot=slot1, topics=topics, single_callback_for_all_topics=True)
# Verify the initial state
assert len(bec_dispatcher._connections) == 1 # One connection for all topics
assert len(bec_dispatcher._connections[tuple(sorted(topics))].slots) == 1 # One slot connected
# Simulate messages being published on each topic
for topic in topics:
msg_with_topic = MessageObject(
topic=topic, value=ScanMessage(point_id=0, scan_id="scan_id", data={})
)
consumer.register.call_args.kwargs["cb"](msg_with_topic)
# Verify that the slot is called once for each topic
assert slot1.call_count == len(topics)
# Verify that a single consumer is created for all topics
consumer.register.assert_called_once()
def test_disconnect_all_with_single_callback_for_multiple_topics(bec_dispatcher, consumer):
slot1 = Mock()
# Connect the slot to multiple topics using a single callback
topics = ["topic1", "topic2"]
bec_dispatcher.connect_slot(slot=slot1, topics=topics, single_callback_for_all_topics=True)
# Verify the initial state
assert len(bec_dispatcher._connections) == 1 # One connection for all topics
assert len(bec_dispatcher._connections[tuple(sorted(topics))].slots) == 1 # One slot connected
# Call disconnect_all method
bec_dispatcher.disconnect_all()
# Verify that the slot is disconnected
assert len(bec_dispatcher._connections) == 0 # All connections are removed
assert slot1.call_count == 0 # Slot has not been called
# Simulate messages and verify that the slot is not called
consumer.register.call_args.kwargs["cb"](msg)
assert slot1.call_count == 0 # Slot has not been called

View File

View File

@@ -1,7 +1,9 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from unittest.mock import MagicMock, patch
import fakeredis
import pytest
from bec_lib import BECClient, RedisConnector
from bec_lib.device import Positioner
from bec_lib.devicemanager import DeviceContainer
@@ -92,12 +94,24 @@ DEVICES = [
]
def fake_redis_server(host, port):
redis = fakeredis.FakeRedis()
return redis
@pytest.fixture(scope="function")
def mocked_client():
def mocked_client(bec_dispatcher):
connector = RedisConnector("localhost:1", redis_cls=fake_redis_server)
# Create a MagicMock object
client = MagicMock()
client = MagicMock() # TODO change to real BECClient
# Shutdown the original client
bec_dispatcher.client.shutdown()
# Mock the connector attribute
bec_dispatcher.client = client
# Mock the device_manager.devices attribute
client.connector = connector
client.device_manager = DMMock()
client.device_manager.add_devives(DEVICES)
@@ -121,3 +135,4 @@ def mocked_client():
with patch("builtins.isinstance", new=isinstance_mock):
yield client
connector.shutdown() # TODO change to real BECClient

View File

@@ -0,0 +1,14 @@
import pytest
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
@pytest.fixture(autouse=True)
def bec_dispatcher(threads_check):
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
yield bec_dispatcher
bec_dispatcher.disconnect_all()
# clean BEC client
bec_dispatcher.client.shutdown()
# reinitialize singleton for next test
bec_dispatcher_module.BECDispatcher.reset_singleton()

View File

@@ -0,0 +1,59 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import time
from unittest import mock
import pytest
import redis
from bec_lib.connector import MessageObject
from bec_lib.messages import ScanMessage
from bec_lib.redis_connector import RedisConnector
from bec_lib.serialization import MsgpackSerialization
from bec_widgets.utils.bec_dispatcher import QtRedisConnector
@pytest.fixture
def bec_dispatcher_w_connector(bec_dispatcher, topics_msg_list):
def pubsub_msg_generator():
for topic, msg in topics_msg_list:
yield {"channel": topic.encode(), "pattern": None, "data": msg}
while True:
time.sleep(0.2)
yield StopIteration
with mock.patch("redis.Redis"):
pubsub = redis.Redis().pubsub()
messages = pubsub_msg_generator()
pubsub.get_message.side_effect = lambda timeout: next(messages)
connector = QtRedisConnector("localhost:1")
bec_dispatcher.client.connector = connector
yield bec_dispatcher
dummy_msg = MsgpackSerialization.dumps(ScanMessage(point_id=0, scan_id="0", data={}))
@pytest.mark.parametrize(
"topics_msg_list",
[
(
("topic1", dummy_msg),
("topic2", dummy_msg),
("topic3", dummy_msg),
)
],
)
def test_dispatcher_disconnect_all(bec_dispatcher_w_connector, qtbot):
bec_dispatcher = bec_dispatcher_w_connector
cb1 = mock.Mock(spec=[])
cb2 = mock.Mock(spec=[])
bec_dispatcher.connect_slot(cb1, "topic1")
bec_dispatcher.connect_slot(cb1, "topic2")
bec_dispatcher.connect_slot(cb2, "topic2")
bec_dispatcher.connect_slot(cb2, "topic3")
assert len(bec_dispatcher.client.connector._topics_cb) == 3
bec_dispatcher.disconnect_all()
assert len(bec_dispatcher.client.connector._topics_cb) == 0

View File

@@ -16,7 +16,8 @@ def bec_figure(qtbot, mocked_client):
widget = BECFigure(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
return widget
yield widget
widget.close()
def test_bec_figure_init(bec_figure):

View File

@@ -0,0 +1,29 @@
from unittest import mock
import pytest
from bec_widgets.cli.client import BECFigure
from .client_mocks import FakeDevice
@pytest.fixture
def cli_figure():
fig = BECFigure(gui_id="test")
with mock.patch.object(fig, "_run_rpc") as mock_rpc_call:
with mock.patch.object(fig, "gui_is_alive", return_value=True):
yield fig, mock_rpc_call
def test_rpc_call_plot(cli_figure):
fig, mock_rpc_call = cli_figure
fig.plot("samx", "bpm4i")
mock_rpc_call.assert_called_with("plot", "samx", "bpm4i")
def test_rpc_call_accepts_device_as_input(cli_figure):
dev1 = FakeDevice("samx")
dev2 = FakeDevice("bpm4i")
fig, mock_rpc_call = cli_figure
fig.plot(dev1, dev2)
mock_rpc_call.assert_called_with("plot", "samx", "bpm4i")

View File

View File

@@ -1,6 +1,4 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import os
import pickle
from unittest.mock import MagicMock
import pytest
@@ -8,8 +6,7 @@ from qtpy.QtWidgets import QLineEdit
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets import ScanControl
from .test_msgs.available_scans_message import available_scans_message
from tests.unit_tests.test_msgs.available_scans_message import available_scans_message
class FakePositioner: