1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-11 11:10:53 +02:00

Compare commits

..

15 Commits

Author SHA1 Message Date
a275c5a3b1 wip 2026-03-15 14:31:39 +01:00
b12e6432d6 wip 2026-03-15 13:59:43 +01:00
b802beec69 wip 2026-03-15 13:53:37 +01:00
6c40df4380 wip 2026-03-15 13:36:13 +01:00
dc8ae5a85d wip 2026-03-15 13:31:45 +01:00
2d41a158bc wip 2026-03-15 13:25:42 +01:00
c2a6964875 wip 2026-03-15 13:18:15 +01:00
774ca08cb3 wip 2026-03-15 13:06:18 +01:00
4606bbd570 wip 2026-03-15 13:01:44 +01:00
b2ed9d42f7 wip 2026-03-15 12:57:19 +01:00
d799691e12 wip 2026-03-15 12:47:58 +01:00
6848a9e20b test(e2e): avoid timing issues in rpc_gui_obj test 2026-03-15 12:30:40 +01:00
semantic-release
bd5aafc052 3.2.0
Automatically generated by python-semantic-release
2026-03-11 20:52:57 +00:00
b4f6f5aa8b feat(waveform): composite DAP with multiple models 2026-03-11 21:52:10 +01:00
14d51b8016 feat(curve, waveform): add dap_parameters for lmfit customization in DAP requests 2026-03-11 21:52:10 +01:00
19 changed files with 1448 additions and 1036 deletions

View File

@@ -1,19 +1,19 @@
name: Full CI
on:
on:
push:
pull_request:
workflow_dispatch:
inputs:
BEC_WIDGETS_BRANCH:
description: 'Branch of BEC Widgets to install'
description: "Branch of BEC Widgets to install"
required: false
type: string
BEC_CORE_BRANCH:
description: 'Branch of BEC Core to install'
description: "Branch of BEC Core to install"
required: false
type: string
OPHYD_DEVICES_BRANCH:
description: 'Branch of Ophyd Devices to install'
description: "Branch of Ophyd Devices to install"
required: false
type: string
@@ -25,60 +25,58 @@ permissions:
pull-requests: write
jobs:
check_pr_status:
uses: ./.github/workflows/check_pr.yml
# check_pr_status:
# uses: ./.github/workflows/check_pr.yml
formatter:
needs: check_pr_status
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/formatter.yml
# formatter:
# needs: check_pr_status
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/formatter.yml
unit-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/pytest.yml
with:
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref }}
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
# unit-test:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/pytest.yml
# with:
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref }}
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
# OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
# secrets:
# CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
unit-test-matrix:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/pytest-matrix.yml
with:
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha}}
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
# unit-test-matrix:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/pytest-matrix.yml
# with:
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha}}
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
# OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
generate-cli-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/generate-cli-check.yml
# generate-cli-test:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/generate-cli-check.yml
end2end-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/end2end-conda.yml
child-repos:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/child_repos.yml
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
plugin_repos:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: bec-project/bec/.github/workflows/plugin_repos.yml@main
with:
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
# child-repos:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: ./.github/workflows/child_repos.yml
# with:
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
# OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
secrets:
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
# plugin_repos:
# needs: [check_pr_status, formatter]
# if: needs.check_pr_status.outputs.branch-pr == ''
# uses: bec-project/bec/.github/workflows/plugin_repos.yml@main
# with:
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
# secrets:
# GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}

View File

@@ -48,12 +48,14 @@ jobs:
source ./bin/install_bec_dev.sh -t
cd ../
pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end
pip install pytest-repeat
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end/user_interaction/test_user_interaction_e2e.py
- name: Upload logs if job fails
if: failure()
uses: actions/upload-artifact@v4
with:
name: pytest-logs
path: ./logs/*.log
path: ./bec/logs/*.log
retention-days: 7

View File

@@ -1,6 +1,17 @@
# CHANGELOG
## v3.2.0 (2026-03-11)
### Features
- **curve, waveform**: Add dap_parameters for lmfit customization in DAP requests
([`14d51b8`](https://github.com/bec-project/bec_widgets/commit/14d51b80169f5a060dd24287f3a6db9a4b41275a))
- **waveform**: Composite DAP with multiple models
([`b4f6f5a`](https://github.com/bec-project/bec_widgets/commit/b4f6f5aa8bcd0f6091610e8f839ea265c87575e0))
## v3.1.4 (2026-03-11)
### Bug Fixes

View File

@@ -3116,24 +3116,24 @@ class LaunchWindow(RPCBase):
class LogPanel(RPCBase):
"""Live display of the BEC logs in a table view."""
"""Displays a log panel"""
@rpc_call
def remove(self):
def set_plain_text(self, text: str) -> None:
"""
Cleanup the BECConnector
Set the plain text of the widget.
Args:
text (str): The text to set.
"""
@rpc_call
def attach(self):
"""
None
def set_html_text(self, text: str) -> None:
"""
Set the HTML text of the widget.
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
Args:
text (str): The text to set.
"""
@@ -6249,7 +6249,8 @@ class Waveform(RPCBase):
signal_y: "str | None" = None,
color: "str | None" = None,
label: "str | None" = None,
dap: "str | None" = None,
dap: "str | list[str] | None" = None,
dap_parameters: "dict | list | lmfit.Parameters | None | object" = None,
scan_id: "str | None" = None,
scan_number: "int | None" = None,
**kwargs,
@@ -6271,9 +6272,14 @@ class Waveform(RPCBase):
signal_y(str): The name of the entry for the y-axis.
color(str): The color of the curve.
label(str): The label of the curve.
dap(str): The dap model to use for the curve. When provided, a DAP curve is
dap(str | list[str]): The dap model to use for the curve. When provided, a DAP curve is
attached automatically for device, history, or custom data sources. Use
the same string as the LMFit model name.
the same string as the LMFit model name, or a list of model names to build a composite.
dap_parameters(dict | list | lmfit.Parameters | None): Optional lmfit parameter overrides sent to
the DAP server. For a single model: values can be numeric (interpreted as fixed parameters)
or dicts like `{"value": 1.0, "vary": False}`. For composite models (dap is list), use either
a list aligned to the model list (each item is a param dict), or a dict of
`{ "ModelName": { "param": {...} } }` when model names are unique.
scan_id(str): Optional scan ID. When provided, the curve is treated as a **history** curve and
the ydata (and optional xdata) are fetched from that historical scan. Such curves are
never cleared by livescan resets.
@@ -6287,9 +6293,10 @@ class Waveform(RPCBase):
def add_dap_curve(
self,
device_label: "str",
dap_name: "str",
dap_name: "str | list[str]",
color: "str | None" = None,
dap_oversample: "int" = 1,
dap_parameters: "dict | list | lmfit.Parameters | None" = None,
**kwargs,
) -> "Curve":
"""
@@ -6299,9 +6306,11 @@ class Waveform(RPCBase):
Args:
device_label(str): The label of the source curve to add DAP to.
dap_name(str): The name of the DAP model to use.
dap_name(str | list[str]): The name of the DAP model to use, or a list of model
names to build a composite model.
color(str): The color of the curve.
dap_oversample(int): The oversampling factor for the DAP curve.
dap_parameters(dict | list | lmfit.Parameters | None): Optional lmfit parameter overrides sent to the DAP server.
**kwargs
Returns:

View File

@@ -118,6 +118,9 @@ class RPCServer:
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
try:
logger.info(
f"Processing RPC instruction: {msg['action']} with request_id: {request_id}. Parameters: {msg.get('parameter')}"
)
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
@@ -257,6 +260,10 @@ class RPCServer:
else:
name = WidgetContainerUtils.generate_unique_name("dock_area", existing_dock_areas)
logger.info(
f"Launching new dock area with name: {name} and startup_profile: {startup_profile}"
)
result_widget = bw_launch.dock_area(object_name=name, startup_profile=startup_profile)
result_widget.window().setWindowTitle(f"BEC - {name}")
@@ -296,6 +303,9 @@ class RPCServer:
else:
res = self.serialize_object(res)
except RegistryNotReadyError:
logger.info(
f"Object not registered yet for RPC request {request_id}, retrying serialization after {retry_delay} ms"
)
try:
self._rpc_singleshot_repeats[request_id] += retry_delay
QTimer.singleShot(

View File

@@ -79,7 +79,7 @@ from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar import RingProgressBar
from bec_widgets.widgets.services.bec_queue.bec_queue import BECQueue
from bec_widgets.widgets.services.bec_status_box.bec_status_box import BECStatusBox
from bec_widgets.widgets.utility.logpanel.logpanel import LogPanel
from bec_widgets.widgets.utility.logpanel import LogPanel
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
logger = bec_logger.logger
@@ -376,7 +376,6 @@ class BECDockArea(DockAreaWidget):
"bec_shell": (BECShell.ICON_NAME, "Add BEC Shell", "BECShell"),
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel - Disabled", "LogPanel"),
"sbb_monitor": ("train", "Add SBB Monitor", "SBBMonitor"),
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel", "LogPanel"),
}
# Create expandable menu actions (original behavior)
@@ -488,7 +487,9 @@ class BECDockArea(DockAreaWidget):
# first two items not needed for this part
for key, (_, _, widget_type) in mapping.items():
act = menu.actions[key].action
if key == "terminal":
if widget_type == "LogPanel":
act.setEnabled(False) # keep disabled per issue #644
elif key == "terminal":
act.triggered.connect(
lambda _, t=widget_type: self.new(widget=t, closable=True, startup_cmd=None)
)
@@ -509,7 +510,10 @@ class BECDockArea(DockAreaWidget):
for action_id, (_, _, widget_type) in mapping.items():
flat_action_id = f"flat_{action_id}"
flat_action = self.toolbar.components.get_action(flat_action_id).action
flat_action.triggered.connect(lambda _, t=widget_type: self.new(t))
if widget_type == "LogPanel":
flat_action.setEnabled(False) # keep disabled per issue #644
else:
flat_action.triggered.connect(lambda _, t=widget_type: self.new(t))
_connect_flat_actions(self._ACTION_MAPPINGS["menu_plots"])
_connect_flat_actions(self._ACTION_MAPPINGS["menu_devices"])

View File

@@ -22,8 +22,9 @@ class DeviceSignal(BaseModel):
device: str
signal: str
dap: str | None = None
dap: str | list[str] | None = None
dap_oversample: int = 1
dap_parameters: dict | list | None = None
model_config: dict = {"validate_assignment": True}

View File

@@ -1,13 +1,13 @@
from __future__ import annotations
import json
from typing import Literal
from typing import TYPE_CHECKING, Literal
import lmfit
import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.lmfit_serializer import serialize_lmfit_params, serialize_param_object
from bec_lib.scan_data_container import ScanDataContainer
from pydantic import Field, ValidationError, field_validator
from qtpy.QtCore import Qt, QTimer, Signal
@@ -41,6 +41,18 @@ from bec_widgets.widgets.services.scan_history_browser.scan_history_browser impo
)
logger = bec_logger.logger
_DAP_PARAM = object()
if TYPE_CHECKING: # pragma: no cover
import lmfit # type: ignore
else:
try:
import lmfit # type: ignore
except Exception as e: # pragma: no cover
logger.warning(
f"lmfit could not be imported: {e}. Custom DAP functionality will be unavailable."
)
lmfit = None
# noinspection PyDataclass
@@ -696,7 +708,8 @@ class Waveform(PlotBase):
signal_y: str | None = None,
color: str | None = None,
label: str | None = None,
dap: str | None = None,
dap: str | list[str] | None = None,
dap_parameters: dict | list | lmfit.Parameters | None | object = None,
scan_id: str | None = None,
scan_number: int | None = None,
**kwargs,
@@ -718,9 +731,14 @@ class Waveform(PlotBase):
signal_y(str): The name of the entry for the y-axis.
color(str): The color of the curve.
label(str): The label of the curve.
dap(str): The dap model to use for the curve. When provided, a DAP curve is
dap(str | list[str]): The dap model to use for the curve. When provided, a DAP curve is
attached automatically for device, history, or custom data sources. Use
the same string as the LMFit model name.
the same string as the LMFit model name, or a list of model names to build a composite.
dap_parameters(dict | list | lmfit.Parameters | None): Optional lmfit parameter overrides sent to
the DAP server. For a single model: values can be numeric (interpreted as fixed parameters)
or dicts like `{"value": 1.0, "vary": False}`. For composite models (dap is list), use either
a list aligned to the model list (each item is a param dict), or a dict of
`{ "ModelName": { "param": {...} } }` when model names are unique.
scan_id(str): Optional scan ID. When provided, the curve is treated as a **history** curve and
the ydata (and optional xdata) are fetched from that historical scan. Such curves are
never cleared by livescan resets.
@@ -733,6 +751,8 @@ class Waveform(PlotBase):
source = "custom"
x_data = None
y_data = None
if dap_parameters is _DAP_PARAM:
dap_parameters = kwargs.pop("dap_parameters", None) or kwargs.pop("parameters", None)
# 1. Custom curve logic
if x is not None and y is not None:
@@ -810,7 +830,9 @@ class Waveform(PlotBase):
curve = self._add_curve(config=config, x_data=x_data, y_data=y_data)
if dap is not None and curve.config.source in ("device", "history", "custom"):
self.add_dap_curve(device_label=curve.name(), dap_name=dap, **kwargs)
self.add_dap_curve(
device_label=curve.name(), dap_name=dap, dap_parameters=dap_parameters, **kwargs
)
return curve
@@ -820,9 +842,10 @@ class Waveform(PlotBase):
def add_dap_curve(
self,
device_label: str,
dap_name: str,
dap_name: str | list[str],
color: str | None = None,
dap_oversample: int = 1,
dap_parameters: dict | list | lmfit.Parameters | None = None,
**kwargs,
) -> Curve:
"""
@@ -832,9 +855,11 @@ class Waveform(PlotBase):
Args:
device_label(str): The label of the source curve to add DAP to.
dap_name(str): The name of the DAP model to use.
dap_name(str | list[str]): The name of the DAP model to use, or a list of model
names to build a composite model.
color(str): The color of the curve.
dap_oversample(int): The oversampling factor for the DAP curve.
dap_parameters(dict | list | lmfit.Parameters | None): Optional lmfit parameter overrides sent to the DAP server.
**kwargs
Returns:
@@ -859,7 +884,7 @@ class Waveform(PlotBase):
dev_entry = "custom"
# 2) Build a label for the new DAP curve
dap_label = f"{device_label}-{dap_name}"
dap_label = f"{device_label}-{self._format_dap_label(dap_name)}"
# 3) Possibly raise if the DAP curve already exists
if self._check_curve_id(dap_label):
@@ -882,7 +907,11 @@ class Waveform(PlotBase):
# Attach device signal with DAP
config.signal = DeviceSignal(
device=dev_name, signal=dev_entry, dap=dap_name, dap_oversample=dap_oversample
device=dev_name,
signal=dev_entry,
dap=dap_name,
dap_oversample=dap_oversample,
dap_parameters=self._normalize_dap_parameters(dap_parameters, dap_name=dap_name),
)
# 4) Create the DAP curve config using `_add_curve(...)`
@@ -1754,7 +1783,9 @@ class Waveform(PlotBase):
x_data, y_data = parent_curve.get_data()
model_name = dap_curve.config.signal.dap
model = getattr(self.dap, model_name)
model = None
if not isinstance(model_name, (list, tuple)):
model = getattr(self.dap, model_name)
try:
x_min, x_max = self.roi_region
x_data, y_data = self._crop_data(x_data, y_data, x_min, x_max)
@@ -1762,20 +1793,132 @@ class Waveform(PlotBase):
x_min = None
x_max = None
dap_parameters = getattr(dap_curve.config.signal, "dap_parameters", None)
dap_kwargs = {
"data_x": x_data,
"data_y": y_data,
"oversample": dap_curve.dap_oversample,
}
if dap_parameters:
dap_kwargs["parameters"] = dap_parameters
if model is not None:
class_args = model._plugin_info["class_args"]
class_kwargs = model._plugin_info["class_kwargs"]
else:
class_args = []
class_kwargs = {"model": model_name}
msg = messages.DAPRequestMessage(
dap_cls="LmfitService1D",
dap_type="on_demand",
config={
"args": [],
"kwargs": {"data_x": x_data, "data_y": y_data},
"class_args": model._plugin_info["class_args"],
"class_kwargs": model._plugin_info["class_kwargs"],
"kwargs": dap_kwargs,
"class_args": class_args,
"class_kwargs": class_kwargs,
"curve_label": dap_curve.name(),
},
metadata={"RID": f"{self.scan_id}-{self.gui_id}"},
)
self.client.connector.set_and_publish(MessageEndpoints.dap_request(), msg)
@staticmethod
def _normalize_dap_parameters(
parameters: dict | list | lmfit.Parameters | None, dap_name: str | list[str] | None = None
) -> dict | list | None:
"""
Normalize user-provided lmfit parameters into a JSON-serializable dict suitable for the DAP server.
Supports:
- `lmfit.Parameters` (single-model only)
- `dict[name -> number]` (treated as fixed parameter with `vary=False`)
- `dict[name -> dict]` (lmfit.Parameter fields; defaults to `vary=False` if unspecified)
- `dict[name -> lmfit.Parameter]`
- composite: `list[dict[param_name -> spec]]` aligned to model list
- composite: `dict[model_name -> dict[param_name -> spec]]` (unique model names only)
"""
if parameters is None:
return None
if isinstance(dap_name, (list, tuple)):
if lmfit is not None and isinstance(parameters, lmfit.Parameters):
raise TypeError("dap_parameters must be a dict when using composite dap models.")
if isinstance(parameters, (list, tuple)):
normalized_list: list[dict | None] = []
for idx, item in enumerate(parameters):
if item is None:
normalized_list.append(None)
continue
if not isinstance(item, dict):
raise TypeError(
f"dap_parameters list item {idx} must be a dict of parameter overrides."
)
normalized_list.append(Waveform._normalize_param_overrides(item))
return normalized_list or None
if not isinstance(parameters, dict):
raise TypeError(
"dap_parameters must be a dict of model->params when using composite dap models."
)
model_names = set(dap_name)
invalid_models = set(parameters.keys()) - model_names
if invalid_models:
raise TypeError(
f"Invalid dap_parameters keys for composite model: {sorted(invalid_models)}"
)
normalized_composite: dict[str, dict] = {}
for model_name in dap_name:
model_params = parameters.get(model_name)
if model_params is None:
continue
if not isinstance(model_params, dict):
raise TypeError(
f"dap_parameters for '{model_name}' must be a dict of parameter overrides."
)
normalized = Waveform._normalize_param_overrides(model_params)
if normalized:
normalized_composite[model_name] = normalized
return normalized_composite or None
if lmfit is not None and isinstance(parameters, lmfit.Parameters):
return serialize_lmfit_params(parameters)
if not isinstance(parameters, dict):
if lmfit is None:
raise TypeError(
"dap_parameters must be a dict when lmfit is not installed on the client."
)
raise TypeError("dap_parameters must be a dict or lmfit.Parameters (or omitted).")
return Waveform._normalize_param_overrides(parameters)
@staticmethod
def _normalize_param_overrides(parameters: dict) -> dict | None:
normalized: dict[str, dict] = {}
for name, spec in parameters.items():
if spec is None:
continue
if isinstance(spec, (int, float, np.number)):
normalized[name] = {"name": name, "value": float(spec), "vary": False}
continue
if lmfit is not None and isinstance(spec, lmfit.Parameter):
normalized[name] = serialize_param_object(spec)
continue
if isinstance(spec, dict):
normalized[name] = {"name": name, **spec}
if "vary" not in normalized[name]:
normalized[name]["vary"] = False
continue
raise TypeError(
f"Invalid dap_parameters entry for '{name}': expected number, dict, or lmfit.Parameter."
)
return normalized or None
@staticmethod
def _format_dap_label(dap_name: str | list[str]) -> str:
if isinstance(dap_name, (list, tuple)):
return "+".join(dap_name)
return dap_name
@SafeSlot(dict, dict)
def update_dap_curves(self, msg, metadata):
"""
@@ -1793,14 +1936,6 @@ class Waveform(PlotBase):
if not curve:
return
# Get data from the parent (device) curve
parent_curve = self._find_curve_by_label(curve.config.parent_label)
if parent_curve is None:
return
x_parent, _ = parent_curve.get_data()
if x_parent is None or len(x_parent) == 0:
return
# Retrieve and store the fit parameters and summary from the DAP server response
try:
curve.dap_params = msg["data"][1]["fit_parameters"]
@@ -1809,19 +1944,13 @@ class Waveform(PlotBase):
logger.warning(f"Failed to retrieve DAP data for curve '{curve.name()}'")
return
# Render model according to the DAP model name and parameters
model_name = curve.config.signal.dap
model_function = getattr(lmfit.models, model_name)()
x_min, x_max = x_parent.min(), x_parent.max()
oversample = curve.dap_oversample
new_x = np.linspace(x_min, x_max, int(len(x_parent) * oversample))
# Evaluate the model with the provided parameters to generate the y values
new_y = model_function.eval(**curve.dap_params, x=new_x)
# Update the curve with the new data
curve.setData(new_x, new_y)
# Plot the fitted curve using the server-provided output to avoid requiring lmfit on the client.
try:
fit_data = msg["data"][0]
curve.setData(np.asarray(fit_data["x"]), np.asarray(fit_data["y"]))
except Exception as e:
logger.exception(f"Failed to plot DAP result for curve '{curve.name()}', error: {e}")
return
metadata.update({"curve_id": curve_id})
self.dap_params_update.emit(curve.dap_params, metadata)
@@ -2341,24 +2470,20 @@ class DemoApp(QMainWindow): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Waveform Demo")
self.resize(1200, 600)
self.resize(1600, 600)
self.main_widget = QWidget(self)
self.layout = QHBoxLayout(self.main_widget)
self.setCentralWidget(self.main_widget)
self.waveform_popup = Waveform(popups=True)
self.waveform_popup.plot(device_y="waveform")
self.waveform_side = Waveform(popups=False)
self.waveform_side.plot(device_y="bpm4i", signal_y="bpm4i", dap="GaussianModel")
self.waveform_side.plot(device_y="bpm3a", signal_y="bpm3a")
self.custom_waveform = Waveform(popups=True)
self._populate_custom_curve_demo()
self.layout.addWidget(self.waveform_side)
self.layout.addWidget(self.waveform_popup)
self.sine_waveform = Waveform(popups=True)
self.sine_waveform.dap_params_update.connect(self._log_sine_dap_params)
self._populate_sine_curve_demo()
self.layout.addWidget(self.custom_waveform)
self.layout.addWidget(self.sine_waveform)
def _populate_custom_curve_demo(self):
"""
@@ -2377,8 +2502,141 @@ class DemoApp(QMainWindow): # pragma: no cover
sigma = 0.8
y = amplitude * np.exp(-((x - center) ** 2) / (2 * sigma**2)) + noise
# 1) No explicit parameters: server will use lmfit defaults/guesses.
self.custom_waveform.plot(x=x, y=y, label="custom-gaussian", dap="GaussianModel")
# 2) Easy dict: numbers mean "fix this parameter to value" (vary=False).
self.custom_waveform.plot(
x=x,
y=y,
label="custom-gaussian-fixed-easy",
dap="GaussianModel",
dap_parameters={"amplitude": 1.0},
dap_oversample=5,
)
# 3) Partial parameter override: this should still trigger guessing on the server
# because not all Gaussian parameters are explicitly specified.
self.custom_waveform.plot(
x=x,
y=y,
label="custom-gaussian-partial-guess",
dap="GaussianModel",
dap_parameters={
"center": {"value": 1.2, "vary": True},
"sigma": {"value": sigma, "vary": False, "min": 0.0},
},
)
# 4) Complete parameter override: this should skip guessing on the server.
if lmfit is not None:
params_gauss = lmfit.models.GaussianModel().make_params()
params_gauss["amplitude"].set(value=amplitude, vary=False)
params_gauss["center"].set(value=center, vary=False)
params_gauss["sigma"].set(value=sigma, vary=False, min=0.0)
self.custom_waveform.plot(
x=x,
y=y,
label="custom-gaussian-complete-no-guess",
dap="GaussianModel",
dap_parameters=params_gauss,
)
else:
logger.info("Skipping lmfit.Parameters demo (lmfit not installed on client).")
# Composite example: spectrum with three Gaussians (DAP-only)
x_spec = np.linspace(-5, 5, 800)
rng_spec = np.random.default_rng(123)
centers = [-2.0, 0.6, 2.4]
amplitudes = [2.5, 3.2, 1.8]
sigmas = [0.35, 0.5, 0.3]
y_spec = (
amplitudes[0] * np.exp(-((x_spec - centers[0]) ** 2) / (2 * sigmas[0] ** 2))
+ amplitudes[1] * np.exp(-((x_spec - centers[1]) ** 2) / (2 * sigmas[1] ** 2))
+ amplitudes[2] * np.exp(-((x_spec - centers[2]) ** 2) / (2 * sigmas[2] ** 2))
+ rng_spec.normal(loc=0, scale=0.06, size=x_spec.size)
)
# 5) Composite model with partial overrides only: this should still trigger guessing.
self.custom_waveform.plot(
x=x_spec,
y=y_spec,
label="custom-gaussian-spectrum-partial-guess",
dap=["GaussianModel", "GaussianModel", "GaussianModel"],
dap_parameters=[
{"center": {"value": centers[0], "vary": False}},
{"center": {"value": centers[1], "vary": False}},
{"center": {"value": centers[2], "vary": False}},
],
)
# 6) Composite model with all component parameters specified: this should skip guessing.
self.custom_waveform.plot(
x=x_spec,
y=y_spec,
label="custom-gaussian-spectrum-complete-no-guess",
dap=["GaussianModel", "GaussianModel", "GaussianModel"],
dap_parameters=[
{
"amplitude": {"value": amplitudes[0], "vary": False},
"center": {"value": centers[0], "vary": False},
"sigma": {"value": sigmas[0], "vary": False, "min": 0.0},
},
{
"amplitude": {"value": amplitudes[1], "vary": False},
"center": {"value": centers[1], "vary": False},
"sigma": {"value": sigmas[1], "vary": False, "min": 0.0},
},
{
"amplitude": {"value": amplitudes[2], "vary": False},
"center": {"value": centers[2], "vary": False},
"sigma": {"value": sigmas[2], "vary": False, "min": 0.0},
},
],
)
def _populate_sine_curve_demo(self):
"""
Showcase how lmfit's base SineModel can struggle with a drifting baseline.
"""
x = np.linspace(0, 6 * np.pi, 600)
rng = np.random.default_rng(7)
amplitude = 1.6
frequency = 0.75
phase = 0.4
offset = 0.8
slope = 0.08
noise = rng.normal(loc=0, scale=0.12, size=x.size)
y = offset + slope * x + amplitude * np.sin(2 * np.pi * frequency * x + phase) + noise
# Base SineModel (no offset support) to show the mismatch
self.sine_waveform.plot(x=x, y=y, label="custom-sine-data", dap="SineModel")
# Composite model: Sine + Linear baseline (offset + slope)
self.sine_waveform.plot(
x=x,
y=y,
label="custom-sine-composite",
dap=["SineModel", "LinearModel"],
dap_oversample=4,
)
if lmfit is None:
logger.info("Skipping sine lmfit demo (lmfit not installed on client).")
return
return
@staticmethod
def _log_sine_dap_params(params: dict, metadata: dict):
curve_id = metadata.get("curve_id")
if curve_id not in {
"custom-sine-data-SineModel",
"custom-sine-composite-SineModel+LinearModel",
}:
return
logger.info(f"SineModel DAP fit params ({curve_id}): {params}")
if __name__ == "__main__": # pragma: no cover
import sys

View File

@@ -0,0 +1,58 @@
"""Utilities for filtering and formatting in the LogPanel"""
from __future__ import annotations
import re
from collections import deque
from typing import Callable, Iterator
from bec_lib.logger import LogLevel
from bec_lib.messages import LogMessage
from qtpy.QtCore import QDateTime
LinesHtmlFormatter = Callable[[deque[LogMessage]], Iterator[str]]
LineFormatter = Callable[[LogMessage], str]
LineFilter = Callable[[LogMessage], bool] | None
ANSI_ESCAPE_REGEX = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
def replace_escapes(s: str):
s = ANSI_ESCAPE_REGEX.sub("", s)
return s.replace(" ", "&nbsp;").replace("\n", "<br />").replace("\t", " ")
def level_filter(msg: LogMessage, thresh: int):
return LogLevel[msg.content["log_type"].upper()].value >= thresh
def noop_format(line: LogMessage):
_textline = line.log_msg if isinstance(line.log_msg, str) else line.log_msg["text"]
return replace_escapes(_textline.strip()) + "<br />"
def simple_color_format(line: LogMessage, colors: dict[LogLevel, str]):
color = colors.get(LogLevel[line.content["log_type"].upper()]) or colors[LogLevel.INFO]
return f'<font color="{color}">{noop_format(line)}</font>'
def create_formatter(line_format: LineFormatter, line_filter: LineFilter) -> LinesHtmlFormatter:
def _formatter(data: deque[LogMessage]):
if line_filter is not None:
return (line_format(line) for line in data if line_filter(line))
else:
return (line_format(line) for line in data)
return _formatter
def log_txt(line):
return line.log_msg if isinstance(line.log_msg, str) else line.log_msg["text"]
def log_time(line):
return QDateTime.fromMSecsSinceEpoch(int(line.log_msg["record"]["time"]["timestamp"] * 1000))
def log_svc(line):
return line.log_msg["service_name"]

View File

@@ -30,7 +30,7 @@ class LogPanelPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
return DOM_XML
def group(self):
return ""
return "BEC Services"
def icon(self):
return designer_material_icon(LogPanel.ICON_NAME)
@@ -51,7 +51,7 @@ class LogPanelPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
return "LogPanel"
def toolTip(self):
return "LogPanel"
return "Displays a log panel"
def whatsThis(self):
return self.toolTip()

View File

@@ -2,31 +2,21 @@
from __future__ import annotations
import operator
import os
import re
from collections import deque
from dataclasses import dataclass
from functools import partial
from typing import Iterable, Literal
from functools import partial, reduce
from re import Pattern
from typing import TYPE_CHECKING, Literal
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import LogLevel, bec_logger
from bec_lib.messages import LogMessage, StatusMessage
from bec_qthemes import material_icon
from qtpy.QtCore import Signal # type: ignore
from qtpy.QtCore import (
QAbstractTableModel,
QCoreApplication,
QDateTime,
QModelIndex,
QObject,
QPersistentModelIndex,
QSize,
QSortFilterProxyModel,
Qt,
QTimer,
)
from qtpy.QtGui import QColor
from pyqtgraph import SignalProxy
from qtpy.QtCore import QDateTime, QObject, Qt, Signal
from qtpy.QtGui import QFont
from qtpy.QtWidgets import (
QApplication,
QCheckBox,
@@ -35,417 +25,204 @@ from qtpy.QtWidgets import (
QDialog,
QGridLayout,
QHBoxLayout,
QHeaderView,
QLabel,
QLineEdit,
QPushButton,
QSizePolicy,
QTableView,
QToolButton,
QScrollArea,
QTextEdit,
QVBoxLayout,
QWidget,
)
from thefuzz import fuzz
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.colors import apply_theme, get_theme_palette
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.editors.text_box.text_box import TextBox
from bec_widgets.widgets.services.bec_status_box.bec_status_box import BECServiceStatusMixin
from bec_widgets.widgets.utility.logpanel._util import (
LineFilter,
LineFormatter,
LinesHtmlFormatter,
create_formatter,
level_filter,
log_svc,
log_time,
log_txt,
noop_format,
simple_color_format,
)
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtCore import SignalInstance
logger = bec_logger.logger
MODULE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
_DEFAULT_LOG_COLORS = {
LogLevel.INFO.name: QColor("#FFFFFF"),
LogLevel.SUCCESS.name: QColor("#00FF00"),
LogLevel.WARNING.name: QColor("#FFCC00"),
LogLevel.ERROR.name: QColor("#FF0000"),
LogLevel.DEBUG.name: QColor("#0000CC"),
# TODO: improve log color handling
DEFAULT_LOG_COLORS = {
LogLevel.INFO: "#FFFFFF",
LogLevel.SUCCESS: "#00FF00",
LogLevel.WARNING: "#FFCC00",
LogLevel.ERROR: "#FF0000",
LogLevel.DEBUG: "#0000CC",
}
@dataclass(frozen=True)
class _Constants:
FUZZ_THRESHOLD = 80
UPDATE_INTERVAL_MS = 200
headers = ["level", "timestamp", "service_name", "message", "function"]
_CONST = _Constants()
class TimestampUpdate:
def __init__(self, value: QDateTime | None, update_type: Literal["start", "end"]) -> None:
self.value = value
self.update_type = update_type
class BecLogsQueue(BECConnector, QObject):
"""Manages getting logs from BEC Redis and formatting them for display"""
RPC = False
new_messages = Signal()
paused = Signal(bool)
_instance: BecLogsQueue | None = None
new_message = Signal()
@classmethod
def instance(cls):
if cls._instance is None:
cls._instance = cls(QCoreApplication.instance())
return cls._instance
def __init__(self, parent: QObject | None, maxlen: int = 2500, **kwargs) -> None:
if BecLogsQueue._instance:
raise RuntimeError("Create no more than one BecLogsQueue - use BecLogsQueue.instance()")
def __init__(
self,
parent: QObject | None,
maxlen: int = 1000,
line_formatter: LineFormatter = noop_format,
**kwargs,
) -> None:
super().__init__(parent=parent, **kwargs)
self._timestamp_start: QDateTime | None = None
self._timestamp_end: QDateTime | None = None
self._max_length = maxlen
self._paused = False
self._data = deque(
(
item["data"]
for item in self.bec_dispatcher.client.connector.xread(
MessageEndpoints.log(), count=self._max_length, id="0"
)
),
maxlen=self._max_length,
)
self._incoming: deque[LogMessage] = deque([], maxlen=self._max_length)
self._data: deque[LogMessage] = deque([], self._max_length)
self._display_queue: deque[str] = deque([], self._max_length)
self._log_level: str | None = None
self._search_query: Pattern | str | None = None
self._selected_services: set[str] | None = None
self._set_formatter_and_update_filter(line_formatter)
# instance attribute still accessible after c++ object is deleted, so the callback can be unregistered
self.bec_dispatcher.connect_slot(self._process_incoming_log_msg, MessageEndpoints.log())
self._update_timer = QTimer(self, interval=_CONST.UPDATE_INTERVAL_MS)
self._update_timer.timeout.connect(self._proc_update)
QCoreApplication.instance().aboutToQuit.connect(self.cleanup) # type: ignore
self._update_timer.start()
def __len__(self):
return len(self._data)
@SafeSlot()
def toggle_pause(self):
self._paused = not self._paused
self.paused.emit(self._paused)
def row_data(self, index: int) -> LogMessage | None:
if index < 0 or index > (len(self._data) - 1):
return None
return self._data[index]
def cell_data(self, row: int, key: str):
if key == "level":
return self._data[row].log_type.upper()
msg_item = self._data[row].log_msg
if isinstance(msg_item, str):
return msg_item
if key == "service_name":
return msg_item.get(key)
elif key in ["service_name", "function", "message"]:
return msg_item.get("record", {}).get(key)
elif key == "timestamp":
return msg_item.get("record", {}).get("time", {}).get("repr")
def log_timestamp(self, row: int) -> float:
msg_item = self._data[row].log_msg
if isinstance(msg_item, str):
return 0
return msg_item.get("record", {}).get("time", {}).get("timestamp")
def cleanup(self, *_):
"""Stop listening to the Redis log stream"""
self.bec_dispatcher.disconnect_slot(
self._process_incoming_log_msg, [MessageEndpoints.log()]
)
self._update_timer.stop()
BecLogsQueue._instance = None
@SafeSlot(verify_sender=True)
def _process_incoming_log_msg(self, msg: dict, _metadata: dict):
try:
_msg = LogMessage(**msg)
self._incoming.append(_msg)
self._data.append(_msg)
if self.filter is None or self.filter(_msg):
self._display_queue.append(self._line_formatter(_msg))
self.new_message.emit()
except Exception as e:
if "Internal C++ object (BecLogsQueue) already deleted." in e.args:
return
logger.warning(f"Error in LogPanel incoming message callback: {e}")
@SafeSlot(verify_sender=True)
def _proc_update(self):
if self._paused or len(self._incoming) == 0:
return
self._data.extend(self._incoming)
self._incoming.clear()
self.new_messages.emit()
def _set_formatter_and_update_filter(self, line_formatter: LineFormatter = noop_format):
self._line_formatter: LineFormatter = line_formatter
self._queue_formatter: LinesHtmlFormatter = create_formatter(
self._line_formatter, self.filter
)
def _combine_filters(self, *args: LineFilter):
return lambda msg: reduce(operator.and_, [filt(msg) for filt in args if filt is not None])
class BecLogsTableModel(QAbstractTableModel):
def __init__(self, parent: QWidget | None = None):
super().__init__(parent)
self.log_queue = BecLogsQueue.instance()
self.log_queue.new_messages.connect(self.handle_new_messages)
self._headers = _CONST.headers
def rowCount(self, parent: QModelIndex | QPersistentModelIndex = QModelIndex()) -> int:
return len(self.log_queue)
def columnCount(self, parent: QModelIndex | QPersistentModelIndex = QModelIndex()) -> int:
return len(self._headers)
def headerData(self, section, orientation, role=int(Qt.ItemDataRole.DisplayRole)):
if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
return self._headers[section]
return None
def get_row_data(self, index: QModelIndex) -> LogMessage | None:
"""Return the row data for the given index."""
if not index.isValid():
def _create_re_filter(self) -> LineFilter:
if self._search_query is None:
return None
return self.log_queue.row_data(index.row())
elif isinstance(self._search_query, str):
return lambda line: self._search_query in log_txt(line)
return lambda line: self._search_query.match(log_txt(line)) is not None
def timestamp(self, row: int):
return QDateTime.fromMSecsSinceEpoch(int(self.log_queue.log_timestamp(row) * 1000))
def _create_service_filter(self):
return (
lambda line: self._selected_services is None or log_svc(line) in self._selected_services
)
def data(self, index, role=int(Qt.ItemDataRole.DisplayRole)):
"""Return data for the given index and role."""
if not index.isValid():
def _create_timestamp_filter(self) -> LineFilter:
s, e = self._timestamp_start, self._timestamp_end
if s is e is None:
return lambda msg: True
def _time_filter(msg):
msg_time = log_time(msg)
if s is None:
return msg_time <= e
if e is None:
return s <= msg_time
return s <= msg_time <= e
return _time_filter
@property
def filter(self) -> LineFilter:
"""A function which filters a log message based on all applied criteria"""
thresh = LogLevel[self._log_level].value if self._log_level is not None else 0
return self._combine_filters(
partial(level_filter, thresh=thresh),
self._create_re_filter(),
self._create_timestamp_filter(),
self._create_service_filter(),
)
def update_level_filter(self, level: str):
"""Change the log-level of the level filter"""
if level not in [l.name for l in LogLevel]:
logger.error(f"Logging level {level} unrecognized for filter!")
return
if role in [Qt.ItemDataRole.DisplayRole, Qt.ItemDataRole.ToolTipRole]:
return self.log_queue.cell_data(index.row(), self._headers[index.column()])
if role in [Qt.ItemDataRole.ForegroundRole]:
return self._map_log_level_color(self.log_queue.cell_data(index.row(), "level"))
self._log_level = level
self._set_formatter_and_update_filter(self._line_formatter)
def _map_log_level_color(self, data):
return _DEFAULT_LOG_COLORS.get(data)
def update_search_filter(self, search_query: Pattern | str | None = None):
"""Change the string or regex to filter against"""
self._search_query = search_query
self._set_formatter_and_update_filter(self._line_formatter)
def handle_new_messages(self):
self.dataChanged.emit(
self.index(0, 0), self.index(self.rowCount() - 1, self.columnCount() - 1)
def update_time_filter(self, start: QDateTime | None, end: QDateTime | None):
"""Change the start and/or end times to filter against"""
self._timestamp_start = start
self._timestamp_end = end
self._set_formatter_and_update_filter(self._line_formatter)
def update_service_filter(self, services: set[str]):
"""Change the selected services to display"""
self._selected_services = services
self._set_formatter_and_update_filter(self._line_formatter)
def update_line_formatter(self, line_formatter: LineFormatter):
"""Update the formatter"""
self._set_formatter_and_update_filter(line_formatter)
def display_all(self) -> str:
"""Return formatted output for all log messages"""
return "\n".join(self._queue_formatter(self._data.copy()))
def format_new(self):
"""Return formatted output for the display queue"""
res = "\n".join(self._display_queue)
self._display_queue = deque([], self._max_length)
return res
def clear_logs(self):
"""Clear the cache and display queue"""
self._data = deque([])
self._display_queue = deque([])
def fetch_history(self):
"""Fetch all available messages from Redis"""
self._data = deque(
item["data"]
for item in self.bec_dispatcher.client.connector.xread(
MessageEndpoints.log().endpoint, from_start=True, count=self._max_length
)
)
class LogMsgProxyModel(QSortFilterProxyModel):
show_service_column = Signal(bool)
def __init__(
self,
parent=None,
service_filter: set[str] | None = None,
level_filter: LogLevel | None = None,
):
super().__init__(parent)
self._service_filter = service_filter or set()
self._level_filter: LogLevel | None = level_filter
self._filter_text: str = ""
self._fuzzy_search: bool = False
self._time_filter_start: QDateTime | None = None
self._time_filter_end: QDateTime | None = None
def get_row_data(self, rows: Iterable[QModelIndex]) -> Iterable[LogMessage | None]:
return (self.sourceModel().get_row_data(self.mapToSource(idx)) for idx in rows)
def sourceModel(self) -> BecLogsTableModel:
return super().sourceModel() # type: ignore
@SafeSlot(int, int)
def refresh(self, *_):
self.invalidateRowsFilter()
@SafeSlot(None)
@SafeSlot(set)
def update_service_filter(self, filter: set[str]):
"""Filter to the selected services (show any service in the provided set)
Args:
filter (set[str] | None): set of services for which to show logs"""
self._service_filter = filter
self.show_service_column.emit(len(filter) != 1)
self.invalidateRowsFilter()
@SafeSlot(None)
@SafeSlot(LogLevel)
def update_level_filter(self, filter: LogLevel | None):
"""Filter to the selected log level
Args:
filter (str | None): lowest log level to show"""
self._level_filter = filter
self.invalidateRowsFilter()
@SafeSlot(str)
def update_filter_text(self, filter: str):
"""Filter messages based on text
Args:
filter (str | None): set of services for which to show logs"""
self._filter_text = filter
self.invalidateRowsFilter()
@SafeSlot(bool)
def update_fuzzy(self, state: bool):
"""Set text filter to fuzzy search or not
Args:
state (bool): fuzzy search on"""
self._fuzzy_search = state
self.invalidateRowsFilter()
@SafeSlot(TimestampUpdate)
def update_timestamp(self, update: TimestampUpdate):
if update.update_type == "start":
self._time_filter_start = update.value
else:
self._time_filter_end = update.value
self.invalidateRowsFilter()
def filterAcceptsRow(self, source_row: int, source_parent) -> bool:
# No service filter, and no filter text, display everything
possible_filters = [
self._service_filter,
self._level_filter,
self._filter_text,
self._time_filter_start,
self._time_filter_end,
]
if not any(map(bool, possible_filters)):
return True
model = self.sourceModel()
# Filter out services
if self._service_filter:
col = _CONST.headers.index("service_name")
if model.data(model.index(source_row, col, source_parent)) not in self._service_filter:
return False
# Filter out levels
if self._level_filter:
col = _CONST.headers.index("level")
level: str = model.data(model.index(source_row, col, source_parent)) # type: ignore
if LogLevel[level] < self._level_filter:
return False
# Filter time
if self._time_filter_start:
if model.timestamp(source_row) < self._time_filter_start:
return False
if self._time_filter_end:
if model.timestamp(source_row) > self._time_filter_end:
return False
# Filter message text - must go last because this can return True
if self._filter_text:
col = _CONST.headers.index("message")
msg: str = model.data(model.index(source_row, col, source_parent)).lower() # type: ignore
if self._fuzzy_search:
if fuzz.partial_ratio(self._filter_text.lower(), msg) >= _CONST.FUZZ_THRESHOLD:
return True
else:
return self._filter_text.lower() in msg.lower()
return True
class BecLogTableView(QTableView):
def __init__(self, *args, max_message_width: int = 1000, **kwargs) -> None:
super().__init__(*args, **kwargs)
header = QHeaderView(Qt.Orientation.Horizontal, parent=self)
header.setSectionResizeMode(QHeaderView.ResizeMode.Interactive)
header.setStretchLastSection(True)
header.setMaximumSectionSize(max_message_width)
self.setHorizontalHeader(header)
def model(self) -> LogMsgProxyModel:
return super().model() # type: ignore
class LogPanel(BECWidget, QWidget):
"""Live display of the BEC logs in a table view."""
PLUGIN = True
ICON_NAME = "browse_activity"
def __init__(
self,
parent: QWidget | None = None,
max_message_width: int = 1000,
show_toolbar: bool = True,
service_filter: set[str] | None = None,
level_filter: LogLevel | None = None,
**kwargs,
) -> None:
super().__init__(parent=parent, **kwargs)
self._setup_models(service_filter=service_filter, level_filter=level_filter)
self._layout = QVBoxLayout()
self.setLayout(self._layout)
if show_toolbar:
self._setup_toolbar(client=self.client)
self._setup_table_view(max_message_width=max_message_width)
self._update_service_filter(service_filter or set())
if show_toolbar:
self._connect_toolbar()
self._proxy.show_service_column.connect(self._show_service_column)
colors = QApplication.instance().theme.accent_colors # type: ignore
dict_colors = QApplication.instance().theme.colors # type: ignore
_DEFAULT_LOG_COLORS.update(
{
LogLevel.INFO.name: dict_colors["FG"],
LogLevel.SUCCESS.name: colors.success,
LogLevel.WARNING.name: colors.warning,
LogLevel.ERROR.name: colors.emergency,
LogLevel.DEBUG.name: dict_colors["BORDER"],
}
)
self._table.scrollToBottom()
def _setup_models(self, service_filter: set[str] | None, level_filter: LogLevel | None):
self._model = BecLogsTableModel(parent=self)
self._proxy = LogMsgProxyModel(
parent=self, service_filter=service_filter, level_filter=level_filter
)
self._proxy.setSourceModel(self._model)
self._model.log_queue.new_messages.connect(self._proxy.refresh)
def _setup_table_view(self, max_message_width: int) -> None:
"""Setup the table view."""
self._table = BecLogTableView(self, max_message_width=max_message_width)
self._table.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self._layout.addWidget(self._table)
self._table.setModel(self._proxy)
self._table.setHorizontalScrollMode(QTableView.ScrollMode.ScrollPerPixel)
self._table.setTextElideMode(Qt.TextElideMode.ElideRight)
self._table.resizeColumnsToContents()
def _setup_toolbar(self, client: BECClient):
self._toolbar = LogPanelToolbar(self, client)
self._layout.addWidget(self._toolbar)
def _connect_toolbar(self):
self._toolbar.services_selected.connect(self._proxy.update_service_filter)
self._toolbar.search_textbox.textChanged.connect(self._proxy.update_filter_text)
self._toolbar.level_changed.connect(self._proxy.update_level_filter)
self._toolbar.fuzzy_changed.connect(self._proxy.update_fuzzy)
self._toolbar.timestamp_update.connect(self._proxy.update_timestamp)
self._toolbar.pause_button.clicked.connect(self._model.log_queue.toggle_pause)
self._model.log_queue.paused.connect(self._toolbar._update_pause_button_icon)
def _update_service_filter(self, filter: set[str]):
self._service_filter = filter
self._proxy.update_service_filter(filter)
self._table.setColumnHidden(
_CONST.headers.index("service_name"), len(self._service_filter) == 1
)
@SafeSlot(bool)
def _show_service_column(self, show: bool):
self._table.setColumnHidden(_CONST.headers.index("service_name"), not show)
def sizeHint(self) -> QSize:
return QSize(600, 300)
def unique_service_names_from_history(self) -> set[str]:
"""Go through the log history to determine active service names"""
return set(msg.log_msg["service_name"] for msg in self._data)
class LogPanelToolbar(QWidget):
services_selected = Signal(set)
level_changed = Signal(LogLevel)
fuzzy_changed = Signal(bool)
timestamp_update = Signal(TimestampUpdate)
services_selected: SignalInstance = Signal(set)
def __init__(self, parent: QWidget | None = None, client: BECClient | None = None) -> None:
def __init__(self, parent: QWidget | None = None) -> None:
"""A toolbar for the logpanel, mainly used for managing the states of filters"""
super().__init__(parent)
@@ -454,46 +231,44 @@ class LogPanelToolbar(QWidget):
self._timestamp_end: QDateTime | None = None
self._unique_service_names: set[str] = set()
self._services_selected: set[str] = set()
self._services_selected: set[str] | None = None
self._layout = QHBoxLayout(self)
self.layout = QHBoxLayout(self) # type: ignore
if client is not None:
self.client = client
self.service_choice_button = QPushButton("Select services", self)
self._layout.addWidget(self.service_choice_button)
self.service_choice_button.clicked.connect(self._open_service_filter_dialog)
self.service_choice_button = QPushButton("Select services", self)
self.layout.addWidget(self.service_choice_button)
self.service_choice_button.clicked.connect(self._open_service_filter_dialog)
self.filter_level_dropdown = self._log_level_box()
self._layout.addWidget(self.filter_level_dropdown)
self.filter_level_dropdown.currentTextChanged.connect(self._emit_level)
self.layout.addWidget(self.filter_level_dropdown)
self.clear_button = QPushButton("Clear all", self)
self.layout.addWidget(self.clear_button)
self.fetch_button = QPushButton("Fetch history", self)
self.layout.addWidget(self.fetch_button)
self._string_search_box()
self.timerange_button = QPushButton("Set time range", self)
self._layout.addWidget(self.timerange_button)
self.timerange_button.clicked.connect(self._open_datetime_dialog)
self.layout.addWidget(self.timerange_button)
self.pause_button = QToolButton()
self.pause_button.setIcon(material_icon("pause", size=(20, 20), convert_to_pixmap=False))
self._layout.addWidget(self.pause_button)
@property
def time_start(self):
return self._timestamp_start
@SafeSlot(bool)
def _update_pause_button_icon(self, paused):
self.pause_button.setIcon(
material_icon(
"play_arrow" if paused else "pause", size=(20, 20), convert_to_pixmap=False
)
)
@property
def time_end(self):
return self._timestamp_end
def _string_search_box(self):
self._layout.addWidget(QLabel("Search: "))
self.layout.addWidget(QLabel("Search: "))
self.search_textbox = QLineEdit()
self._layout.addWidget(self.search_textbox)
self._layout.addWidget(QLabel("Fuzzy: "))
self.fuzzy = QCheckBox()
self._layout.addWidget(self.fuzzy)
self.fuzzy.checkStateChanged.connect(self._emit_fuzzy)
self.layout.addWidget(self.search_textbox)
self.layout.addWidget(QLabel("Use regex: "))
self.regex_enabled = QCheckBox()
self.layout.addWidget(self.regex_enabled)
self.update_re_button = QPushButton("Update search", self)
self.layout.addWidget(self.update_re_button)
def _log_level_box(self):
box = QComboBox()
@@ -501,14 +276,6 @@ class LogPanelToolbar(QWidget):
[box.addItem(l.name) for l in LogLevel]
return box
@SafeSlot(str)
def _emit_level(self, level: str):
self.level_changed.emit(LogLevel[level])
@SafeSlot(Qt.CheckState)
def _emit_fuzzy(self, state: Qt.CheckState):
self.fuzzy_changed.emit(state == Qt.CheckState.Checked)
def _current_ts(self, selection_type: Literal["start", "end"]):
if selection_type == "start":
return self._timestamp_start
@@ -517,7 +284,6 @@ class LogPanelToolbar(QWidget):
else:
raise ValueError(f"timestamps can only be for the start or end, not {selection_type}")
@SafeSlot()
def _open_datetime_dialog(self):
"""Open dialog window for timestamp filter selection"""
self._dt_dialog = QDialog(self)
@@ -546,8 +312,8 @@ class LogPanelToolbar(QWidget):
)
_layout.addWidget(date_clear_button)
date_button_set("start", label_start)
date_button_set("end", label_end)
for v in [("start", label_start), ("end", label_end)]:
date_button_set(*v)
close_button = QPushButton("Close", parent=self._dt_dialog)
close_button.clicked.connect(self._dt_dialog.accept)
@@ -586,15 +352,19 @@ class LogPanelToolbar(QWidget):
self._timestamp_start = dt
else:
self._timestamp_end = dt
self.timestamp_update.emit(TimestampUpdate(value=dt, update_type=selection_type))
def service_list_update(self, services_info: dict[str, StatusMessage]):
@SafeSlot(dict, set)
def service_list_update(
self, services_info: dict[str, StatusMessage], services_from_history: set[str], *_, **__
):
"""Change the list of services which can be selected"""
self._unique_service_names = set([s.split("/")[0] for s in services_info.keys()])
self._unique_service_names |= services_from_history
if self._services_selected is None:
self._services_selected = self._unique_service_names
@SafeSlot()
def _open_service_filter_dialog(self):
self.service_list_update(self.client.service_status)
if len(self._unique_service_names) == 0 or self._services_selected is None:
return
self._svc_dialog = QDialog(self)
@@ -602,7 +372,7 @@ class LogPanelToolbar(QWidget):
layout = QVBoxLayout()
self._svc_dialog.setLayout(layout)
service_cb_grid = QGridLayout()
service_cb_grid = QGridLayout(parent=self._svc_dialog)
layout.addLayout(service_cb_grid)
def check_box(name: str, checked: Qt.CheckState):
@@ -628,6 +398,146 @@ class LogPanelToolbar(QWidget):
self._svc_dialog.deleteLater()
class LogPanel(TextBox):
"""Displays a log panel"""
ICON_NAME = "terminal"
service_list_update = Signal(dict, set)
def __init__(
self,
parent=None,
client: BECClient | None = None,
service_status: BECServiceStatusMixin | None = None,
**kwargs,
):
"""Initialize the LogPanel widget."""
super().__init__(parent=parent, client=client, config={"text": ""}, **kwargs)
self._update_colors()
self._service_status = service_status or BECServiceStatusMixin(self, client=self.client) # type: ignore
self._log_manager = BecLogsQueue(
parent=self, line_formatter=partial(simple_color_format, colors=self._colors)
)
self._proxy_update = SignalProxy(
self._log_manager.new_message, rateLimit=1, slot=self._on_append
)
self.toolbar = LogPanelToolbar(parent=self)
self.toolbar_area = QScrollArea()
self.toolbar_area.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
self.toolbar_area.setSizeAdjustPolicy(QScrollArea.SizeAdjustPolicy.AdjustToContents)
self.toolbar_area.setFixedHeight(int(self.toolbar.clear_button.height() * 2))
self.toolbar_area.setWidget(self.toolbar)
self.layout.addWidget(self.toolbar_area)
self.toolbar.clear_button.clicked.connect(self._on_clear)
self.toolbar.fetch_button.clicked.connect(self._on_fetch)
self.toolbar.update_re_button.clicked.connect(self._on_re_update)
self.toolbar.search_textbox.returnPressed.connect(self._on_re_update)
self.toolbar.regex_enabled.checkStateChanged.connect(self._on_re_update)
self.toolbar.filter_level_dropdown.currentTextChanged.connect(self._set_level_filter)
self.toolbar.timerange_button.clicked.connect(self._choose_datetime)
self._service_status.services_update.connect(self._update_service_list)
self.service_list_update.connect(self.toolbar.service_list_update)
self.toolbar.services_selected.connect(self._update_service_filter)
self.text_box_text_edit.setFont(QFont("monospace", 12))
self.text_box_text_edit.setHtml("")
self.text_box_text_edit.setLineWrapMode(QTextEdit.LineWrapMode.NoWrap)
self._connect_to_theme_change()
@SafeSlot(set)
def _update_service_filter(self, services: set[str]):
self._log_manager.update_service_filter(services)
self._on_redraw()
@SafeSlot(dict, dict)
def _update_service_list(self, services_info: dict[str, StatusMessage], *_, **__):
self.service_list_update.emit(
services_info, self._log_manager.unique_service_names_from_history()
)
@SafeSlot()
def _choose_datetime(self):
self.toolbar._open_datetime_dialog()
self._set_time_filter()
def _connect_to_theme_change(self):
"""Connect to the theme change signal."""
qapp = QApplication.instance()
if hasattr(qapp, "theme_signal"):
qapp.theme_signal.theme_updated.connect(self._on_redraw) # type: ignore
def _update_colors(self):
self._colors = DEFAULT_LOG_COLORS.copy()
self._colors.update({LogLevel.INFO: get_theme_palette().text().color().name()})
def _cursor_to_end(self):
c = self.text_box_text_edit.textCursor()
c.movePosition(c.MoveOperation.End)
self.text_box_text_edit.setTextCursor(c)
@SafeSlot()
@SafeSlot(str)
def _on_redraw(self, *_):
self._update_colors()
self._log_manager.update_line_formatter(partial(simple_color_format, colors=self._colors))
self.set_html_text(self._log_manager.display_all())
self._cursor_to_end()
@SafeSlot(verify_sender=True)
def _on_append(self, *_):
self.text_box_text_edit.insertHtml(self._log_manager.format_new())
self._cursor_to_end()
@SafeSlot()
def _on_clear(self):
self._log_manager.clear_logs()
self.set_html_text(self._log_manager.display_all())
self._cursor_to_end()
@SafeSlot()
@SafeSlot(Qt.CheckState)
def _on_re_update(self, *_):
if self.toolbar.regex_enabled.isChecked():
try:
search_query = re.compile(self.toolbar.search_textbox.text())
except Exception as e:
logger.warning(f"Failed to compile search regex with error {e}")
search_query = None
logger.info(f"Setting LogPanel search regex to {search_query}")
else:
search_query = self.toolbar.search_textbox.text()
logger.info(f'Setting LogPanel search string to "{search_query}"')
self._log_manager.update_search_filter(search_query)
self.set_html_text(self._log_manager.display_all())
self._cursor_to_end()
@SafeSlot()
def _on_fetch(self):
self._log_manager.fetch_history()
self.set_html_text(self._log_manager.display_all())
self._cursor_to_end()
@SafeSlot(str)
def _set_level_filter(self, level: str):
self._log_manager.update_level_filter(level)
self._on_redraw()
@SafeSlot()
def _set_time_filter(self):
self._log_manager.update_time_filter(self.toolbar.time_start, self.toolbar.time_end)
self._on_redraw()
def cleanup(self):
self._service_status.cleanup()
self._log_manager.cleanup()
self._log_manager.deleteLater()
super().cleanup()
if __name__ == "__main__": # pragma: no cover
import sys
@@ -635,15 +545,7 @@ if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
apply_theme("dark")
panel = QWidget()
queue = BecLogsQueue(panel)
layout = QVBoxLayout(panel)
layout.addWidget(QLabel("All logs, no filters:"))
layout.addWidget(LogPanel())
layout.addWidget(QLabel("All services, level filter WARNING preapplied:"))
layout.addWidget(LogPanel(level_filter=LogLevel.WARNING))
layout.addWidget(QLabel('All services, service filter {"DeviceServer"} preapplied:'))
layout.addWidget(LogPanel(service_filter={"DeviceServer"}))
widget = LogPanel()
panel.show()
widget.show()
sys.exit(app.exec())

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "3.1.4"
version = "3.2.0"
description = "BEC Widgets"
requires-python = ">=3.11"
classifiers = [

View File

@@ -1,5 +1,3 @@
import traceback
import pytest
import qtpy.QtCore
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
@@ -7,14 +5,12 @@ from qtpy.QtCore import QTimer
class TestableQTimer(QTimer):
_instances: list[tuple[QTimer, str, str]] = []
_instances: list[tuple[QTimer, str]] = []
_current_test_name: str = ""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
tb = traceback.format_stack()
init_line = list(filter(lambda msg: "QTimer(" in msg, tb))[-1]
TestableQTimer._instances.append((self, TestableQTimer._current_test_name, init_line))
TestableQTimer._instances.append((self, TestableQTimer._current_test_name))
@classmethod
def check_all_stopped(cls, qtbot):
@@ -24,21 +20,12 @@ class TestableQTimer(QTimer):
except RuntimeError as e:
return "already deleted" in e.args[0]
def _format_timers(timers: list[tuple[QTimer, str, str]]):
return "\n".join(
f"Timer: {t[0]}\n in test: {t[1]}\n created at:{t[2]}" for t in timers
)
try:
qtbot.waitUntil(
lambda: all(_is_done_or_deleted(timer) for timer, _, _ in cls._instances)
)
qtbot.waitUntil(lambda: all(_is_done_or_deleted(timer) for timer, _ in cls._instances))
except QtBotTimeoutError as exc:
active_timers = list(filter(lambda t: t[0].isActive(), cls._instances))
(t.stop() for t, _, _ in cls._instances)
raise TimeoutError(
f"Failed to stop all timers:\n{_format_timers(active_timers)}"
) from exc
(t.stop() for t, _ in cls._instances)
raise TimeoutError(f"Failed to stop all timers: {active_timers}") from exc
cls._instances = []

View File

@@ -150,7 +150,9 @@ def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
# communication should work, main dock area should have same id and be visible
yw = gui.new("Y")
qtbot.waitUntil(lambda: len(gui.windows) == 2, timeout=3000)
yw.delete_all()
assert len(gui.windows) == 2
yw.remove()
assert len(gui.windows) == 1 # only bec is left
qtbot.waitUntil(lambda: len(gui.windows) == 1, timeout=3000)
assert len(gui.windows) == 1

View File

@@ -72,6 +72,7 @@ def test_rpc_plotting_shortcuts_init_configs(qtbot, connected_client_gui_obj):
"dap": None,
"device": "bpm4i",
"signal": "bpm4i",
"dap_parameters": None,
"dap_oversample": 1,
}
assert c1._config_dict["source"] == "device"

View File

@@ -135,304 +135,321 @@ def maybe_remove_dock_area(qtbot, gui: BECGuiClient, random_int_gen: random.Rand
wait_for_namespace_change(
qtbot, gui=gui, parent_widget=gui, object_name=name, widget_gui_id=gui_id, exists=False
)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the BECProgressBar widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.BECProgressBar)
widget: client.BECProgressBar
# Check rpc calls
assert widget.label_template == "$value / $maximum - $percentage %"
widget.set_maximum(100)
widget.set_minimum(50)
widget.set_value(75)
assert widget._get_label() == "75 / 100 - 50 %"
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_queue(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the BECQueue widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.BECQueue)
widget: client.BECQueue
# No rpc calls to test so far
# maybe we can add an rpc call to check the queue length
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_status_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the BECStatusBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.BECStatusBox)
# Check rpc calls
assert widget.get_server_state() in ["RUNNING", "IDLE", "BUSY", "ERROR"]
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_dap_combo_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the DAPComboBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.DapComboBox)
widget: client.DAPComboBox
# Check rpc calls
widget.select_fit_model("PseudoVoigtModel")
widget.select_x_axis("samx")
widget.select_y_axis("bpm4i")
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_device_browser(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the DeviceBrowser widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.DeviceBrowser)
widget: client.DeviceBrowser
# No rpc calls yet to check
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the Image widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.Image)
widget: client.Image
scans = bec.scans
dev = bec.device_manager.devices
# Test rpc calls
img = widget.image(device=dev.eiger.name, signal="preview")
assert img.get_data() is None
# Run a scan and plot the image
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Check that last image is equivalent to data in Redis
last_img = bec.connector.get_last(MessageEndpoints.device_preview("eiger", "preview"))[
"data"
].data
assert np.allclose(img.get_data(), last_img)
# Now add a device with a preview signal
img = widget.image(device="eiger", signal="preview")
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_minesweeper(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the MineSweeper widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.Minesweeper)
widget: client.MineSweeper
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_motor_map(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the MotorMap widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.MotorMap)
widget: client.MotorMap
# Test RPC calls
dev = bec.device_manager.devices
scans = bec.scans
# Set motor map to names
widget.map(dev.samx, dev.samy)
# Move motor samx to pos
pos = dev.samx.limits[1] - 1 # -1 from higher limit
scans.mv(dev.samx, pos, relative=False).wait()
# Check that data is up to date
assert np.isclose(widget.get_data()["x"][-1], pos, dev.samx.precision)
# Move motor samy to pos
pos = dev.samy.limits[0] + 1 # +1 from lower limit
scans.mv(dev.samy, pos, relative=False).wait()
# Check that data is up to date
assert np.isclose(widget.get_data()["y"][-1], pos, dev.samy.precision)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_multi_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test MultiWaveform widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.MultiWaveform)
widget: client.MultiWaveform
# Test RPC calls
dev = bec.device_manager.devices
scans = bec.scans
# test plotting
cm = "cividis"
widget.plot(dev.waveform, color_palette=cm)
assert widget.monitor == dev.waveform.name
assert widget.color_palette == cm
# Scan with BEC
s = scans.line_scan(dev.samx, -3, 3, steps=5, exp_time=0.01, relative=False)
s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Wait for data in history (should be plotted?)
# TODO how can we check that the data was plotted, implement get_data()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_indicator(
qtbot, connected_client_gui_obj, random_generator_from_seed
):
"""Test the PositionIndicator widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.PositionIndicator)
widget: client.PositionIndicator
# TODO check what these rpc calls are supposed to do! Issue created #461
widget.set_value(5)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the PositionerBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox)
widget: client.PositionerBox
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# No rpc calls to check so far
widget.set_positioner(dev.samx)
widget.set_positioner(dev.samy.name)
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_box_2d(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the PositionerBox2D widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox2D)
widget: client.PositionerBox2D
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# No rpc calls to check so far
widget.set_positioner_hor(dev.samx)
widget.set_positioner_ver(dev.samy)
# Try moving the motors
scans.mv(dev.samx, 3, relative=False).wait()
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_control_line(
qtbot, connected_client_gui_obj, random_generator_from_seed
):
"""Test the positioner control line widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.PositionerControlLine)
widget: client.PositionerControlLine
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# Set positioner
widget.set_positioner(dev.samx)
scans.mv(dev.samx, 3, relative=False).wait()
widget.set_positioner(dev.samy.name)
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# TODO passes locally, fails on CI for some reason... -> issue #1003
qtbot.waitUntil(lambda: hasattr(gui, "dock_area") is False, timeout=5000)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_bec_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the BECProgressBar widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.BECProgressBar)
# widget: client.BECProgressBar
# # Check rpc calls
# assert widget.label_template == "$value / $maximum - $percentage %"
# widget.set_maximum(100)
# widget.set_minimum(50)
# widget.set_value(75)
# assert widget._get_label() == "75 / 100 - 50 %"
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_bec_queue(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the BECQueue widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.BECQueue)
# widget: client.BECQueue
# # No rpc calls to test so far
# # maybe we can add an rpc call to check the queue length
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_bec_status_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the BECStatusBox widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.BECStatusBox)
# # Check rpc calls
# assert widget.get_server_state() in ["RUNNING", "IDLE", "BUSY", "ERROR"]
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_dap_combo_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the DAPComboBox widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.DapComboBox)
# widget: client.DAPComboBox
# # Check rpc calls
# widget.select_fit_model("PseudoVoigtModel")
# widget.select_x_axis("samx")
# widget.select_y_axis("bpm4i")
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_device_browser(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the DeviceBrowser widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.DeviceBrowser)
# widget: client.DeviceBrowser
# # No rpc calls yet to check
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the Image widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.Image)
# widget: client.Image
# scans = bec.scans
# dev = bec.device_manager.devices
# # Test rpc calls
# img = widget.image(device=dev.eiger.name, signal="preview")
# assert img.get_data() is None
# # Run a scan and plot the image
# s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
# s.wait()
# def _wait_for_scan_in_history():
# # Get scan item from history
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
# return scan_item is not None
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# # Check that last image is equivalent to data in Redis
# last_img = bec.connector.get_last(MessageEndpoints.device_preview("eiger", "preview"))[
# "data"
# ].data
# assert np.allclose(img.get_data(), last_img)
# # Now add a device with a preview signal
# img = widget.image(device="eiger", signal="preview")
# s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
# s.wait()
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # TODO re-enable when issue is resolved #560
# # @pytest.mark.timeout(PYTEST_TIMEOUT)
# # def test_widgets_e2e_log_panel(qtbot, connected_client_gui_obj, random_generator_from_seed):
# # """Test the LogPanel widget."""
# # gui = connected_client_gui_obj
# # bec = gui._client
# # # Create dock_area and widget
# # widget = create_widget(qtbot, gui, gui.available_widgets.LogPanel)
# # widget: client.LogPanel
# # # No rpc calls to check so far
# # # Test removing the widget, or leaving it open for the next test
# # maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_minesweeper(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the MineSweeper widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.Minesweeper)
# widget: client.MineSweeper
# # No rpc calls to check so far
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_motor_map(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the MotorMap widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.MotorMap)
# widget: client.MotorMap
# # Test RPC calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # Set motor map to names
# widget.map(dev.samx, dev.samy)
# # Move motor samx to pos
# pos = dev.samx.limits[1] - 1 # -1 from higher limit
# scans.mv(dev.samx, pos, relative=False).wait()
# # Check that data is up to date
# assert np.isclose(widget.get_data()["x"][-1], pos, dev.samx.precision)
# # Move motor samy to pos
# pos = dev.samy.limits[0] + 1 # +1 from lower limit
# scans.mv(dev.samy, pos, relative=False).wait()
# # Check that data is up to date
# assert np.isclose(widget.get_data()["y"][-1], pos, dev.samy.precision)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_multi_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test MultiWaveform widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.MultiWaveform)
# widget: client.MultiWaveform
# # Test RPC calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # test plotting
# cm = "cividis"
# widget.plot(dev.waveform, color_palette=cm)
# assert widget.monitor == dev.waveform.name
# assert widget.color_palette == cm
# # Scan with BEC
# s = scans.line_scan(dev.samx, -3, 3, steps=5, exp_time=0.01, relative=False)
# s.wait()
# def _wait_for_scan_in_history():
# # Get scan item from history
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
# return scan_item is not None
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# # Wait for data in history (should be plotted?)
# # TODO how can we check that the data was plotted, implement get_data()
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_positioner_indicator(
# qtbot, connected_client_gui_obj, random_generator_from_seed
# ):
# """Test the PositionIndicator widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionIndicator)
# widget: client.PositionIndicator
# # TODO check what these rpc calls are supposed to do! Issue created #461
# widget.set_value(5)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_positioner_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the PositionerBox widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox)
# widget: client.PositionerBox
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # No rpc calls to check so far
# widget.set_positioner(dev.samx)
# widget.set_positioner(dev.samy.name)
# scans.mv(dev.samy, -3, relative=False).wait()
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_positioner_box_2d(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the PositionerBox2D widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox2D)
# widget: client.PositionerBox2D
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # No rpc calls to check so far
# widget.set_positioner_hor(dev.samx)
# widget.set_positioner_ver(dev.samy)
# # Try moving the motors
# scans.mv(dev.samx, 3, relative=False).wait()
# scans.mv(dev.samy, -3, relative=False).wait()
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_positioner_control_line(
# qtbot, connected_client_gui_obj, random_generator_from_seed
# ):
# """Test the positioner control line widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionerControlLine)
# widget: client.PositionerControlLine
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# # Set positioner
# widget.set_positioner(dev.samx)
# scans.mv(dev.samx, 3, relative=False).wait()
# widget.set_positioner(dev.samy.name)
# scans.mv(dev.samy, -3, relative=False).wait()
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.repeat(20)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_ring_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the RingProgressBar widget"""
@@ -461,86 +478,86 @@ def test_widgets_e2e_ring_progress_bar(qtbot, connected_client_gui_obj, random_g
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_scan_control(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the ScanControl widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.ScanControl)
widget: client.ScanControl
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_scan_control(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the ScanControl widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.ScanControl)
# widget: client.ScanControl
# No rpc calls to check so far
# # No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_scatter_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the ScatterWaveform widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.ScatterWaveform)
widget: client.ScatterWaveform
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_scatter_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the ScatterWaveform widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.ScatterWaveform)
# widget: client.ScatterWaveform
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
widget.plot(dev.samx, dev.samy, dev.bpm4i)
scans.grid_scan(dev.samx, -5, 5, 5, dev.samy, -5, 5, 5, exp_time=0.01, relative=False).wait()
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# widget.plot(dev.samx, dev.samy, dev.bpm4i)
# scans.grid_scan(dev.samx, -5, 5, 5, dev.samy, -5, 5, 5, exp_time=0.01, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_text_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the TextBox widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.TextBox)
widget: client.TextBox
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_text_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the TextBox widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.TextBox)
# widget: client.TextBox
# RPC calls
widget.set_plain_text("Hello World")
widget.set_html_text("<b> Hello World HTML </b>")
# # RPC calls
# widget.set_plain_text("Hello World")
# widget.set_html_text("<b> Hello World HTML </b>")
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the Waveform widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area and widget
widget = create_widget(qtbot, gui, gui.available_widgets.Waveform)
widget: client.Waveform
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the Waveform widget"""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area and widget
# widget = create_widget(qtbot, gui, gui.available_widgets.Waveform)
# widget: client.Waveform
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
widget.plot(dev.bpm4i)
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
# # Test rpc calls
# dev = bec.device_manager.devices
# scans = bec.scans
# widget.plot(dev.bpm4i)
# s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
# s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
# def _wait_for_scan_in_history():
# # Get scan item from history
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
# return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
samx_data = scan_item.devices.samx.samx.read()["value"]
bpm4i_data = scan_item.devices.bpm4i.bpm4i.read()["value"]
curve = widget.curves[0]
assert np.allclose(curve.get_data()[0], samx_data)
assert np.allclose(curve.get_data()[1], bpm4i_data)
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
# samx_data = scan_item.devices.samx.samx.read()["value"]
# bpm4i_data = scan_item.devices.bpm4i.bpm4i.read()["value"]
# curve = widget.curves[0]
# assert np.allclose(curve.get_data()[0], samx_data)
# assert np.allclose(curve.get_data()[1], bpm4i_data)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)

View File

@@ -2229,6 +2229,7 @@ class TestFlatToolbarActions:
"flat_progress_bar",
"flat_terminal",
"flat_bec_shell",
"flat_log_panel",
"flat_sbb_monitor",
]
@@ -2288,6 +2289,11 @@ class TestFlatToolbarActions:
action.trigger()
mock_new.assert_called_once_with(widget_type)
def test_flat_log_panel_action_disabled(self, advanced_dock_area):
"""Test that flat log panel action is disabled."""
action = advanced_dock_area.toolbar.components.get_action("flat_log_panel").action
assert not action.isEnabled()
class TestModeTransitions:
"""Test mode transitions and state consistency."""

View File

@@ -7,123 +7,163 @@ from collections import deque
from unittest.mock import MagicMock, patch
import pytest
from bec_lib.logger import LogLevel
from bec_lib.messages import LogMessage
from bec_lib.redis_connector import StreamMessage
from qtpy.QtCore import QDateTime
from bec_widgets.widgets.utility.logpanel.logpanel import LogPanel, TimestampUpdate
from bec_widgets.widgets.utility.logpanel._util import (
log_time,
replace_escapes,
simple_color_format,
)
from bec_widgets.widgets.utility.logpanel.logpanel import DEFAULT_LOG_COLORS, LogPanel
from .client_mocks import mocked_client
TEST_TABLE_STRING = "2025-01-15 15:57:18 | bec_server.scan_server.scan_queue | [INFO] | \n \x1b[3m primary queue / ScanQueueStatus.RUNNING \x1b[0m\n┏━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━┓\n\x1b[1m \x1b[0m\x1b[1m queue_id \x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mscan_id\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mis_scan\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mtype\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mscan_numb…\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mIQ status\x1b[0m\x1b[1m \x1b[0m┃\n┡━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━┩\n│ bbe50c82-6… │ None │ False │ mv │ None │ PENDING │\n└─────────────┴─────────┴─────────┴──────┴────────────┴───────────┘\n\n"
TEST_LOG_MESSAGES = [
{"data": msg}
for msg in [
LogMessage(
LogMessage(
metadata={},
log_type="debug",
log_msg={
"text": "datetime | debug | test log message",
"record": {"time": {"timestamp": 123456789.000}},
"service_name": "ScanServer",
},
),
LogMessage(
metadata={},
log_type="info",
log_msg={
"text": "datetime | info | test log message",
"record": {"time": {"timestamp": 123456789.007}},
"service_name": "ScanServer",
},
),
LogMessage(
metadata={},
log_type="success",
log_msg={
"text": "datetime | success | test log message",
"record": {"time": {"timestamp": 123456789.012}},
"service_name": "ScanServer",
},
),
]
TEST_COMBINED_PLAINTEXT = "datetime | debug | test log message\ndatetime | info | test log message\ndatetime | success | test log message\n"
@pytest.fixture
def raw_queue():
yield deque(TEST_LOG_MESSAGES, maxlen=100)
@pytest.fixture
def log_panel(qtbot, mocked_client: MagicMock):
widget = LogPanel(client=mocked_client, service_status=MagicMock())
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_log_panel_init(log_panel: LogPanel):
assert log_panel.plain_text == ""
def test_table_string_processing():
assert "\x1b" in TEST_TABLE_STRING
sanitized = replace_escapes(TEST_TABLE_STRING)
assert "\x1b" not in sanitized
assert " " not in sanitized
assert "\n" not in sanitized
@pytest.mark.parametrize(
["msg", "color"], zip(TEST_LOG_MESSAGES, ["#0000CC", "#FFFFFF", "#00FF00"])
)
def test_color_format(msg: LogMessage, color: str):
assert color in simple_color_format(msg, DEFAULT_LOG_COLORS)
def test_logpanel_output(qtbot, log_panel: LogPanel):
log_panel._log_manager._data = deque(TEST_LOG_MESSAGES)
log_panel._on_redraw()
assert log_panel.plain_text == TEST_COMBINED_PLAINTEXT
def display_queue_empty():
print(log_panel._log_manager._display_queue)
return len(log_panel._log_manager._display_queue) == 0
next_text = "datetime | error | test log message"
msg = LogMessage(
metadata={},
log_type="error",
log_msg={"text": next_text, "record": {}, "service_name": "ScanServer"},
)
log_panel._log_manager._process_incoming_log_msg(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
qtbot.waitUntil(display_queue_empty, timeout=5000)
assert log_panel.plain_text == TEST_COMBINED_PLAINTEXT + next_text + "\n"
def test_level_filter(log_panel: LogPanel):
log_panel._log_manager._data = deque(TEST_LOG_MESSAGES)
log_panel._log_manager.update_level_filter("INFO")
log_panel._on_redraw()
assert (
log_panel.plain_text
== "datetime | info | test log message\ndatetime | success | test log message\n"
)
def test_clear_button(log_panel: LogPanel):
log_panel._log_manager._data = deque(TEST_LOG_MESSAGES)
log_panel.toolbar.clear_button.click()
assert log_panel._log_manager._data == deque([])
def test_timestamp_filter(log_panel: LogPanel):
log_panel._log_manager._timestamp_start = QDateTime(1973, 11, 29, 21, 33, 9, 5, 1)
pytest.approx(log_panel._log_manager._timestamp_start.toMSecsSinceEpoch() / 1000, 123456789.005)
log_panel._log_manager._timestamp_end = QDateTime(1973, 11, 29, 21, 33, 9, 10, 1)
pytest.approx(log_panel._log_manager._timestamp_end.toMSecsSinceEpoch() / 1000, 123456789.010)
filter_ = log_panel._log_manager._create_timestamp_filter()
assert not filter_(TEST_LOG_MESSAGES[0])
assert filter_(TEST_LOG_MESSAGES[1])
assert not filter_(TEST_LOG_MESSAGES[2])
def test_error_handling_in_callback(log_panel: LogPanel):
log_panel._log_manager.new_message = MagicMock()
with patch("bec_widgets.widgets.utility.logpanel.logpanel.logger") as logger:
# generally errors should be logged
log_panel._log_manager.new_message.emit = MagicMock(
side_effect=ValueError("Something went wrong")
)
msg = LogMessage(
metadata={},
log_type="debug",
log_msg={
"text": "datetime | debug | test log message",
"record": {
"time": {"timestamp": 123456789.000, "repr": "2025-01-01 00:00:01"},
"message": "test debug message abcd",
"function": "_debug",
},
"service_name": "ScanServer",
},
),
LogMessage(
metadata={},
log_type="info",
log_msg={
"text": "datetime | info | test info log message",
"record": {
"time": {"timestamp": 123456789.007, "repr": "2025-01-01 00:00:02"},
"message": "test info message efgh",
"function": "_info",
},
"service_name": "DeviceServer",
},
),
LogMessage(
metadata={},
log_type="success",
log_msg={
"text": "datetime | success | test log message",
"record": {
"time": {"timestamp": 123456789.012, "repr": "2025-01-01 00:00:03"},
"message": "test success message ijkl",
"function": "_success",
},
"service_name": "ScanServer",
},
),
]
]
@pytest.fixture
def log_panel(qtbot, mocked_client):
mocked_client.connector.xread = lambda *_, **__: TEST_LOG_MESSAGES
widget = LogPanel()
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
widget._model.log_queue.cleanup()
widget.close()
widget.deleteLater()
qtbot.wait(100)
def test_log_panel_init(qtbot, log_panel: LogPanel):
assert log_panel
def test_log_panel_filters(qtbot, log_panel: LogPanel):
assert log_panel._proxy.rowCount() == 3
# Service filter
log_panel._update_service_filter({"DeviceServer"})
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 1, timeout=200)
log_panel._update_service_filter(set())
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 3, timeout=200)
# Text filter
log_panel._proxy.update_filter_text("efgh")
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 1, timeout=200)
log_panel._proxy.update_filter_text("")
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 3, timeout=200)
# Time filter
log_panel._proxy.update_timestamp(
TimestampUpdate(value=QDateTime.fromMSecsSinceEpoch(123456789004), update_type="start")
)
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 2, timeout=200)
log_panel._proxy.update_timestamp(
TimestampUpdate(value=QDateTime.fromMSecsSinceEpoch(123456789009), update_type="end")
)
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 1, timeout=200)
log_panel._proxy.update_timestamp(TimestampUpdate(value=None, update_type="start"))
log_panel._proxy.update_timestamp(TimestampUpdate(value=None, update_type="end"))
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 3, timeout=200)
# Level filter
log_panel._proxy.update_level_filter(LogLevel.SUCCESS)
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 1, timeout=200)
log_panel._proxy.update_level_filter(None)
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 3, timeout=200)
def test_log_panel_update(qtbot, log_panel: LogPanel):
log_panel._model.log_queue._incoming.append(
LogMessage(
metadata={},
log_type="error",
log_msg={
"text": "datetime | error | test log message",
"record": {
"time": {"timestamp": 123456789.015, "repr": "2025-01-01 00:00:03"},
"message": "test error message xyz",
"function": "_error",
},
"record": {"time": {"timestamp": 123456789.000}},
"service_name": "ScanServer",
},
)
)
log_panel._model.log_queue._proc_update()
qtbot.waitUntil(lambda: log_panel._model.rowCount() == 4, timeout=500)
log_panel._log_manager._process_incoming_log_msg(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
logger.warning.assert_called_once()
# this specific error should be ignored and not relogged
log_panel._log_manager.new_message.emit = MagicMock(
side_effect=RuntimeError("Internal C++ object (BecLogsQueue) already deleted.")
)
log_panel._log_manager._process_incoming_log_msg(
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
)
logger.warning.assert_called_once()

View File

@@ -516,6 +516,112 @@ def test_plot_custom_curve_with_inline_dap(qtbot, mocked_client_with_dap):
assert dap_curve.config.signal.dap == "GaussianModel"
def test_normalize_dap_parameters_number_dict():
normalized = Waveform._normalize_dap_parameters({"amplitude": 1.0, "center": 2})
assert normalized == {
"amplitude": {"name": "amplitude", "value": 1.0, "vary": False},
"center": {"name": "center", "value": 2.0, "vary": False},
}
def test_normalize_dap_parameters_dict_spec_defaults_vary_false():
normalized = Waveform._normalize_dap_parameters({"sigma": {"value": 0.8, "min": 0.0}})
assert normalized["sigma"]["name"] == "sigma"
assert normalized["sigma"]["value"] == 0.8
assert normalized["sigma"]["min"] == 0.0
assert normalized["sigma"]["vary"] is False
def test_normalize_dap_parameters_invalid_type_raises():
with pytest.raises(TypeError):
Waveform._normalize_dap_parameters(["amplitude", 1.0]) # type: ignore[arg-type]
def test_normalize_dap_parameters_composite_list():
normalized = Waveform._normalize_dap_parameters(
[{"center": 1.0}, {"sigma": {"value": 0.5, "min": 0.0}}],
dap_name=["GaussianModel", "GaussianModel"],
)
assert normalized == [
{"center": {"name": "center", "value": 1.0, "vary": False}},
{"sigma": {"name": "sigma", "value": 0.5, "min": 0.0, "vary": False}},
]
def test_normalize_dap_parameters_composite_dict():
normalized = Waveform._normalize_dap_parameters(
{
"GaussianModel": {"center": {"value": 1.0, "vary": True}},
"LorentzModel": {"amplitude": 2.0},
},
dap_name=["GaussianModel", "LorentzModel"],
)
assert normalized["GaussianModel"]["center"]["value"] == 1.0
assert normalized["GaussianModel"]["center"]["vary"] is True
assert normalized["LorentzModel"]["amplitude"]["value"] == 2.0
assert normalized["LorentzModel"]["amplitude"]["vary"] is False
def test_request_dap_includes_normalized_parameters(qtbot, mocked_client_with_dap, monkeypatch):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
curve = wf.plot(
x=[0, 1, 2],
y=[1, 2, 3],
label="custom-inline-params",
dap="GaussianModel",
dap_parameters={"amplitude": 1.0},
)
dap_curve = wf.get_curve(f"{curve.name()}-GaussianModel")
assert dap_curve is not None
dap_curve.dap_oversample = 3
captured = {}
def capture(topic, msg, *args, **kwargs): # noqa: ARG001
captured["topic"] = topic
captured["msg"] = msg
monkeypatch.setattr(wf.client.connector, "set_and_publish", capture)
wf.request_dap()
msg = captured["msg"]
dap_kwargs = msg.content["config"]["kwargs"]
assert dap_kwargs["oversample"] == 3
assert dap_kwargs["parameters"] == {
"amplitude": {"name": "amplitude", "value": 1.0, "vary": False}
}
def test_request_dap_includes_composite_parameters_list(qtbot, mocked_client_with_dap, monkeypatch):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
curve = wf.plot(
x=[0, 1, 2],
y=[1, 2, 3],
label="custom-composite",
dap=["GaussianModel", "GaussianModel"],
dap_parameters=[{"center": 0.0}, {"center": 1.0}],
)
dap_curve = wf.get_curve(f"{curve.name()}-GaussianModel+GaussianModel")
assert dap_curve is not None
captured = {}
def capture(topic, msg, *args, **kwargs): # noqa: ARG001
captured["topic"] = topic
captured["msg"] = msg
monkeypatch.setattr(wf.client.connector, "set_and_publish", capture)
wf.request_dap()
msg = captured["msg"]
dap_kwargs = msg.content["config"]["kwargs"]
assert dap_kwargs["parameters"] == [
{"center": {"name": "center", "value": 0.0, "vary": False}},
{"center": {"name": "center", "value": 1.0, "vary": False}},
]
assert msg.content["config"]["class_kwargs"]["model"] == ["GaussianModel", "GaussianModel"]
def test_fetch_scan_data_and_access(qtbot, mocked_client, monkeypatch):
"""
Test the _fetch_scan_data_and_access method returns live_data/val if in a live scan,