mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-04-11 11:10:53 +02:00
Compare commits
15 Commits
feat/table
...
scratch/de
| Author | SHA1 | Date | |
|---|---|---|---|
| a275c5a3b1 | |||
| b12e6432d6 | |||
| b802beec69 | |||
| 6c40df4380 | |||
| dc8ae5a85d | |||
| 2d41a158bc | |||
| c2a6964875 | |||
| 774ca08cb3 | |||
| 4606bbd570 | |||
| b2ed9d42f7 | |||
| d799691e12 | |||
| 6848a9e20b | |||
|
|
bd5aafc052 | ||
| b4f6f5aa8b | |||
| 14d51b8016 |
102
.github/workflows/ci.yml
vendored
102
.github/workflows/ci.yml
vendored
@@ -1,19 +1,19 @@
|
||||
name: Full CI
|
||||
on:
|
||||
on:
|
||||
push:
|
||||
pull_request:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
BEC_WIDGETS_BRANCH:
|
||||
description: 'Branch of BEC Widgets to install'
|
||||
description: "Branch of BEC Widgets to install"
|
||||
required: false
|
||||
type: string
|
||||
BEC_CORE_BRANCH:
|
||||
description: 'Branch of BEC Core to install'
|
||||
description: "Branch of BEC Core to install"
|
||||
required: false
|
||||
type: string
|
||||
OPHYD_DEVICES_BRANCH:
|
||||
description: 'Branch of Ophyd Devices to install'
|
||||
description: "Branch of Ophyd Devices to install"
|
||||
required: false
|
||||
type: string
|
||||
|
||||
@@ -25,60 +25,58 @@ permissions:
|
||||
pull-requests: write
|
||||
|
||||
jobs:
|
||||
check_pr_status:
|
||||
uses: ./.github/workflows/check_pr.yml
|
||||
# check_pr_status:
|
||||
# uses: ./.github/workflows/check_pr.yml
|
||||
|
||||
formatter:
|
||||
needs: check_pr_status
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
uses: ./.github/workflows/formatter.yml
|
||||
# formatter:
|
||||
# needs: check_pr_status
|
||||
# if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
# uses: ./.github/workflows/formatter.yml
|
||||
|
||||
unit-test:
|
||||
needs: [check_pr_status, formatter]
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
uses: ./.github/workflows/pytest.yml
|
||||
with:
|
||||
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref }}
|
||||
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
|
||||
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
|
||||
secrets:
|
||||
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
|
||||
# unit-test:
|
||||
# needs: [check_pr_status, formatter]
|
||||
# if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
# uses: ./.github/workflows/pytest.yml
|
||||
# with:
|
||||
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref }}
|
||||
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
|
||||
# OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
|
||||
# secrets:
|
||||
# CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
|
||||
|
||||
unit-test-matrix:
|
||||
needs: [check_pr_status, formatter]
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
uses: ./.github/workflows/pytest-matrix.yml
|
||||
with:
|
||||
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha}}
|
||||
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
|
||||
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
|
||||
# unit-test-matrix:
|
||||
# needs: [check_pr_status, formatter]
|
||||
# if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
# uses: ./.github/workflows/pytest-matrix.yml
|
||||
# with:
|
||||
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha}}
|
||||
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
|
||||
# OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}
|
||||
|
||||
generate-cli-test:
|
||||
needs: [check_pr_status, formatter]
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
uses: ./.github/workflows/generate-cli-check.yml
|
||||
# generate-cli-test:
|
||||
# needs: [check_pr_status, formatter]
|
||||
# if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
# uses: ./.github/workflows/generate-cli-check.yml
|
||||
|
||||
end2end-test:
|
||||
needs: [check_pr_status, formatter]
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
uses: ./.github/workflows/end2end-conda.yml
|
||||
|
||||
child-repos:
|
||||
needs: [check_pr_status, formatter]
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
uses: ./.github/workflows/child_repos.yml
|
||||
with:
|
||||
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
|
||||
OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
|
||||
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
|
||||
|
||||
plugin_repos:
|
||||
needs: [check_pr_status, formatter]
|
||||
if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
uses: bec-project/bec/.github/workflows/plugin_repos.yml@main
|
||||
with:
|
||||
BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
|
||||
BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
|
||||
# child-repos:
|
||||
# needs: [check_pr_status, formatter]
|
||||
# if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
# uses: ./.github/workflows/child_repos.yml
|
||||
# with:
|
||||
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
|
||||
# OPHYD_DEVICES_BRANCH: ${{ inputs.OPHYD_DEVICES_BRANCH || 'main'}}
|
||||
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
|
||||
|
||||
secrets:
|
||||
GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
|
||||
# plugin_repos:
|
||||
# needs: [check_pr_status, formatter]
|
||||
# if: needs.check_pr_status.outputs.branch-pr == ''
|
||||
# uses: bec-project/bec/.github/workflows/plugin_repos.yml@main
|
||||
# with:
|
||||
# BEC_CORE_BRANCH: ${{ inputs.BEC_CORE_BRANCH || 'main' }}
|
||||
# BEC_WIDGETS_BRANCH: ${{ inputs.BEC_WIDGETS_BRANCH || github.head_ref || github.sha }}
|
||||
|
||||
# secrets:
|
||||
# GH_READ_TOKEN: ${{ secrets.GH_READ_TOKEN }}
|
||||
|
||||
6
.github/workflows/end2end-conda.yml
vendored
6
.github/workflows/end2end-conda.yml
vendored
@@ -48,12 +48,14 @@ jobs:
|
||||
source ./bin/install_bec_dev.sh -t
|
||||
cd ../
|
||||
pip install -e ./ophyd_devices -e .[dev,pyside6] -e ./bec_testing_plugin
|
||||
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end
|
||||
pip install pytest-repeat
|
||||
|
||||
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end/user_interaction/test_user_interaction_e2e.py
|
||||
|
||||
- name: Upload logs if job fails
|
||||
if: failure()
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: pytest-logs
|
||||
path: ./logs/*.log
|
||||
path: ./bec/logs/*.log
|
||||
retention-days: 7
|
||||
|
||||
11
CHANGELOG.md
11
CHANGELOG.md
@@ -1,6 +1,17 @@
|
||||
# CHANGELOG
|
||||
|
||||
|
||||
## v3.2.0 (2026-03-11)
|
||||
|
||||
### Features
|
||||
|
||||
- **curve, waveform**: Add dap_parameters for lmfit customization in DAP requests
|
||||
([`14d51b8`](https://github.com/bec-project/bec_widgets/commit/14d51b80169f5a060dd24287f3a6db9a4b41275a))
|
||||
|
||||
- **waveform**: Composite DAP with multiple models
|
||||
([`b4f6f5a`](https://github.com/bec-project/bec_widgets/commit/b4f6f5aa8bcd0f6091610e8f839ea265c87575e0))
|
||||
|
||||
|
||||
## v3.1.4 (2026-03-11)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
@@ -3116,24 +3116,24 @@ class LaunchWindow(RPCBase):
|
||||
|
||||
|
||||
class LogPanel(RPCBase):
|
||||
"""Live display of the BEC logs in a table view."""
|
||||
"""Displays a log panel"""
|
||||
|
||||
@rpc_call
|
||||
def remove(self):
|
||||
def set_plain_text(self, text: str) -> None:
|
||||
"""
|
||||
Cleanup the BECConnector
|
||||
Set the plain text of the widget.
|
||||
|
||||
Args:
|
||||
text (str): The text to set.
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def attach(self):
|
||||
"""
|
||||
None
|
||||
def set_html_text(self, text: str) -> None:
|
||||
"""
|
||||
Set the HTML text of the widget.
|
||||
|
||||
@rpc_call
|
||||
def detach(self):
|
||||
"""
|
||||
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
|
||||
Args:
|
||||
text (str): The text to set.
|
||||
"""
|
||||
|
||||
|
||||
@@ -6249,7 +6249,8 @@ class Waveform(RPCBase):
|
||||
signal_y: "str | None" = None,
|
||||
color: "str | None" = None,
|
||||
label: "str | None" = None,
|
||||
dap: "str | None" = None,
|
||||
dap: "str | list[str] | None" = None,
|
||||
dap_parameters: "dict | list | lmfit.Parameters | None | object" = None,
|
||||
scan_id: "str | None" = None,
|
||||
scan_number: "int | None" = None,
|
||||
**kwargs,
|
||||
@@ -6271,9 +6272,14 @@ class Waveform(RPCBase):
|
||||
signal_y(str): The name of the entry for the y-axis.
|
||||
color(str): The color of the curve.
|
||||
label(str): The label of the curve.
|
||||
dap(str): The dap model to use for the curve. When provided, a DAP curve is
|
||||
dap(str | list[str]): The dap model to use for the curve. When provided, a DAP curve is
|
||||
attached automatically for device, history, or custom data sources. Use
|
||||
the same string as the LMFit model name.
|
||||
the same string as the LMFit model name, or a list of model names to build a composite.
|
||||
dap_parameters(dict | list | lmfit.Parameters | None): Optional lmfit parameter overrides sent to
|
||||
the DAP server. For a single model: values can be numeric (interpreted as fixed parameters)
|
||||
or dicts like `{"value": 1.0, "vary": False}`. For composite models (dap is list), use either
|
||||
a list aligned to the model list (each item is a param dict), or a dict of
|
||||
`{ "ModelName": { "param": {...} } }` when model names are unique.
|
||||
scan_id(str): Optional scan ID. When provided, the curve is treated as a **history** curve and
|
||||
the y‑data (and optional x‑data) are fetched from that historical scan. Such curves are
|
||||
never cleared by live‑scan resets.
|
||||
@@ -6287,9 +6293,10 @@ class Waveform(RPCBase):
|
||||
def add_dap_curve(
|
||||
self,
|
||||
device_label: "str",
|
||||
dap_name: "str",
|
||||
dap_name: "str | list[str]",
|
||||
color: "str | None" = None,
|
||||
dap_oversample: "int" = 1,
|
||||
dap_parameters: "dict | list | lmfit.Parameters | None" = None,
|
||||
**kwargs,
|
||||
) -> "Curve":
|
||||
"""
|
||||
@@ -6299,9 +6306,11 @@ class Waveform(RPCBase):
|
||||
|
||||
Args:
|
||||
device_label(str): The label of the source curve to add DAP to.
|
||||
dap_name(str): The name of the DAP model to use.
|
||||
dap_name(str | list[str]): The name of the DAP model to use, or a list of model
|
||||
names to build a composite model.
|
||||
color(str): The color of the curve.
|
||||
dap_oversample(int): The oversampling factor for the DAP curve.
|
||||
dap_parameters(dict | list | lmfit.Parameters | None): Optional lmfit parameter overrides sent to the DAP server.
|
||||
**kwargs
|
||||
|
||||
Returns:
|
||||
|
||||
@@ -118,6 +118,9 @@ class RPCServer:
|
||||
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
|
||||
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
|
||||
try:
|
||||
logger.info(
|
||||
f"Processing RPC instruction: {msg['action']} with request_id: {request_id}. Parameters: {msg.get('parameter')}"
|
||||
)
|
||||
method = msg["action"]
|
||||
args = msg["parameter"].get("args", [])
|
||||
kwargs = msg["parameter"].get("kwargs", {})
|
||||
@@ -257,6 +260,10 @@ class RPCServer:
|
||||
else:
|
||||
name = WidgetContainerUtils.generate_unique_name("dock_area", existing_dock_areas)
|
||||
|
||||
logger.info(
|
||||
f"Launching new dock area with name: {name} and startup_profile: {startup_profile}"
|
||||
)
|
||||
|
||||
result_widget = bw_launch.dock_area(object_name=name, startup_profile=startup_profile)
|
||||
result_widget.window().setWindowTitle(f"BEC - {name}")
|
||||
|
||||
@@ -296,6 +303,9 @@ class RPCServer:
|
||||
else:
|
||||
res = self.serialize_object(res)
|
||||
except RegistryNotReadyError:
|
||||
logger.info(
|
||||
f"Object not registered yet for RPC request {request_id}, retrying serialization after {retry_delay} ms"
|
||||
)
|
||||
try:
|
||||
self._rpc_singleshot_repeats[request_id] += retry_delay
|
||||
QTimer.singleShot(
|
||||
|
||||
@@ -79,7 +79,7 @@ from bec_widgets.widgets.plots.waveform.waveform import Waveform
|
||||
from bec_widgets.widgets.progress.ring_progress_bar.ring_progress_bar import RingProgressBar
|
||||
from bec_widgets.widgets.services.bec_queue.bec_queue import BECQueue
|
||||
from bec_widgets.widgets.services.bec_status_box.bec_status_box import BECStatusBox
|
||||
from bec_widgets.widgets.utility.logpanel.logpanel import LogPanel
|
||||
from bec_widgets.widgets.utility.logpanel import LogPanel
|
||||
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
|
||||
|
||||
logger = bec_logger.logger
|
||||
@@ -376,7 +376,6 @@ class BECDockArea(DockAreaWidget):
|
||||
"bec_shell": (BECShell.ICON_NAME, "Add BEC Shell", "BECShell"),
|
||||
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel - Disabled", "LogPanel"),
|
||||
"sbb_monitor": ("train", "Add SBB Monitor", "SBBMonitor"),
|
||||
"log_panel": (LogPanel.ICON_NAME, "Add LogPanel", "LogPanel"),
|
||||
}
|
||||
|
||||
# Create expandable menu actions (original behavior)
|
||||
@@ -488,7 +487,9 @@ class BECDockArea(DockAreaWidget):
|
||||
# first two items not needed for this part
|
||||
for key, (_, _, widget_type) in mapping.items():
|
||||
act = menu.actions[key].action
|
||||
if key == "terminal":
|
||||
if widget_type == "LogPanel":
|
||||
act.setEnabled(False) # keep disabled per issue #644
|
||||
elif key == "terminal":
|
||||
act.triggered.connect(
|
||||
lambda _, t=widget_type: self.new(widget=t, closable=True, startup_cmd=None)
|
||||
)
|
||||
@@ -509,7 +510,10 @@ class BECDockArea(DockAreaWidget):
|
||||
for action_id, (_, _, widget_type) in mapping.items():
|
||||
flat_action_id = f"flat_{action_id}"
|
||||
flat_action = self.toolbar.components.get_action(flat_action_id).action
|
||||
flat_action.triggered.connect(lambda _, t=widget_type: self.new(t))
|
||||
if widget_type == "LogPanel":
|
||||
flat_action.setEnabled(False) # keep disabled per issue #644
|
||||
else:
|
||||
flat_action.triggered.connect(lambda _, t=widget_type: self.new(t))
|
||||
|
||||
_connect_flat_actions(self._ACTION_MAPPINGS["menu_plots"])
|
||||
_connect_flat_actions(self._ACTION_MAPPINGS["menu_devices"])
|
||||
|
||||
@@ -22,8 +22,9 @@ class DeviceSignal(BaseModel):
|
||||
|
||||
device: str
|
||||
signal: str
|
||||
dap: str | None = None
|
||||
dap: str | list[str] | None = None
|
||||
dap_oversample: int = 1
|
||||
dap_parameters: dict | list | None = None
|
||||
|
||||
model_config: dict = {"validate_assignment": True}
|
||||
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Literal
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
import lmfit
|
||||
import numpy as np
|
||||
import pyqtgraph as pg
|
||||
from bec_lib import bec_logger, messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.lmfit_serializer import serialize_lmfit_params, serialize_param_object
|
||||
from bec_lib.scan_data_container import ScanDataContainer
|
||||
from pydantic import Field, ValidationError, field_validator
|
||||
from qtpy.QtCore import Qt, QTimer, Signal
|
||||
@@ -41,6 +41,18 @@ from bec_widgets.widgets.services.scan_history_browser.scan_history_browser impo
|
||||
)
|
||||
|
||||
logger = bec_logger.logger
|
||||
_DAP_PARAM = object()
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
import lmfit # type: ignore
|
||||
else:
|
||||
try:
|
||||
import lmfit # type: ignore
|
||||
except Exception as e: # pragma: no cover
|
||||
logger.warning(
|
||||
f"lmfit could not be imported: {e}. Custom DAP functionality will be unavailable."
|
||||
)
|
||||
lmfit = None
|
||||
|
||||
|
||||
# noinspection PyDataclass
|
||||
@@ -696,7 +708,8 @@ class Waveform(PlotBase):
|
||||
signal_y: str | None = None,
|
||||
color: str | None = None,
|
||||
label: str | None = None,
|
||||
dap: str | None = None,
|
||||
dap: str | list[str] | None = None,
|
||||
dap_parameters: dict | list | lmfit.Parameters | None | object = None,
|
||||
scan_id: str | None = None,
|
||||
scan_number: int | None = None,
|
||||
**kwargs,
|
||||
@@ -718,9 +731,14 @@ class Waveform(PlotBase):
|
||||
signal_y(str): The name of the entry for the y-axis.
|
||||
color(str): The color of the curve.
|
||||
label(str): The label of the curve.
|
||||
dap(str): The dap model to use for the curve. When provided, a DAP curve is
|
||||
dap(str | list[str]): The dap model to use for the curve. When provided, a DAP curve is
|
||||
attached automatically for device, history, or custom data sources. Use
|
||||
the same string as the LMFit model name.
|
||||
the same string as the LMFit model name, or a list of model names to build a composite.
|
||||
dap_parameters(dict | list | lmfit.Parameters | None): Optional lmfit parameter overrides sent to
|
||||
the DAP server. For a single model: values can be numeric (interpreted as fixed parameters)
|
||||
or dicts like `{"value": 1.0, "vary": False}`. For composite models (dap is list), use either
|
||||
a list aligned to the model list (each item is a param dict), or a dict of
|
||||
`{ "ModelName": { "param": {...} } }` when model names are unique.
|
||||
scan_id(str): Optional scan ID. When provided, the curve is treated as a **history** curve and
|
||||
the y‑data (and optional x‑data) are fetched from that historical scan. Such curves are
|
||||
never cleared by live‑scan resets.
|
||||
@@ -733,6 +751,8 @@ class Waveform(PlotBase):
|
||||
source = "custom"
|
||||
x_data = None
|
||||
y_data = None
|
||||
if dap_parameters is _DAP_PARAM:
|
||||
dap_parameters = kwargs.pop("dap_parameters", None) or kwargs.pop("parameters", None)
|
||||
|
||||
# 1. Custom curve logic
|
||||
if x is not None and y is not None:
|
||||
@@ -810,7 +830,9 @@ class Waveform(PlotBase):
|
||||
curve = self._add_curve(config=config, x_data=x_data, y_data=y_data)
|
||||
|
||||
if dap is not None and curve.config.source in ("device", "history", "custom"):
|
||||
self.add_dap_curve(device_label=curve.name(), dap_name=dap, **kwargs)
|
||||
self.add_dap_curve(
|
||||
device_label=curve.name(), dap_name=dap, dap_parameters=dap_parameters, **kwargs
|
||||
)
|
||||
|
||||
return curve
|
||||
|
||||
@@ -820,9 +842,10 @@ class Waveform(PlotBase):
|
||||
def add_dap_curve(
|
||||
self,
|
||||
device_label: str,
|
||||
dap_name: str,
|
||||
dap_name: str | list[str],
|
||||
color: str | None = None,
|
||||
dap_oversample: int = 1,
|
||||
dap_parameters: dict | list | lmfit.Parameters | None = None,
|
||||
**kwargs,
|
||||
) -> Curve:
|
||||
"""
|
||||
@@ -832,9 +855,11 @@ class Waveform(PlotBase):
|
||||
|
||||
Args:
|
||||
device_label(str): The label of the source curve to add DAP to.
|
||||
dap_name(str): The name of the DAP model to use.
|
||||
dap_name(str | list[str]): The name of the DAP model to use, or a list of model
|
||||
names to build a composite model.
|
||||
color(str): The color of the curve.
|
||||
dap_oversample(int): The oversampling factor for the DAP curve.
|
||||
dap_parameters(dict | list | lmfit.Parameters | None): Optional lmfit parameter overrides sent to the DAP server.
|
||||
**kwargs
|
||||
|
||||
Returns:
|
||||
@@ -859,7 +884,7 @@ class Waveform(PlotBase):
|
||||
dev_entry = "custom"
|
||||
|
||||
# 2) Build a label for the new DAP curve
|
||||
dap_label = f"{device_label}-{dap_name}"
|
||||
dap_label = f"{device_label}-{self._format_dap_label(dap_name)}"
|
||||
|
||||
# 3) Possibly raise if the DAP curve already exists
|
||||
if self._check_curve_id(dap_label):
|
||||
@@ -882,7 +907,11 @@ class Waveform(PlotBase):
|
||||
|
||||
# Attach device signal with DAP
|
||||
config.signal = DeviceSignal(
|
||||
device=dev_name, signal=dev_entry, dap=dap_name, dap_oversample=dap_oversample
|
||||
device=dev_name,
|
||||
signal=dev_entry,
|
||||
dap=dap_name,
|
||||
dap_oversample=dap_oversample,
|
||||
dap_parameters=self._normalize_dap_parameters(dap_parameters, dap_name=dap_name),
|
||||
)
|
||||
|
||||
# 4) Create the DAP curve config using `_add_curve(...)`
|
||||
@@ -1754,7 +1783,9 @@ class Waveform(PlotBase):
|
||||
|
||||
x_data, y_data = parent_curve.get_data()
|
||||
model_name = dap_curve.config.signal.dap
|
||||
model = getattr(self.dap, model_name)
|
||||
model = None
|
||||
if not isinstance(model_name, (list, tuple)):
|
||||
model = getattr(self.dap, model_name)
|
||||
try:
|
||||
x_min, x_max = self.roi_region
|
||||
x_data, y_data = self._crop_data(x_data, y_data, x_min, x_max)
|
||||
@@ -1762,20 +1793,132 @@ class Waveform(PlotBase):
|
||||
x_min = None
|
||||
x_max = None
|
||||
|
||||
dap_parameters = getattr(dap_curve.config.signal, "dap_parameters", None)
|
||||
dap_kwargs = {
|
||||
"data_x": x_data,
|
||||
"data_y": y_data,
|
||||
"oversample": dap_curve.dap_oversample,
|
||||
}
|
||||
if dap_parameters:
|
||||
dap_kwargs["parameters"] = dap_parameters
|
||||
|
||||
if model is not None:
|
||||
class_args = model._plugin_info["class_args"]
|
||||
class_kwargs = model._plugin_info["class_kwargs"]
|
||||
else:
|
||||
class_args = []
|
||||
class_kwargs = {"model": model_name}
|
||||
|
||||
msg = messages.DAPRequestMessage(
|
||||
dap_cls="LmfitService1D",
|
||||
dap_type="on_demand",
|
||||
config={
|
||||
"args": [],
|
||||
"kwargs": {"data_x": x_data, "data_y": y_data},
|
||||
"class_args": model._plugin_info["class_args"],
|
||||
"class_kwargs": model._plugin_info["class_kwargs"],
|
||||
"kwargs": dap_kwargs,
|
||||
"class_args": class_args,
|
||||
"class_kwargs": class_kwargs,
|
||||
"curve_label": dap_curve.name(),
|
||||
},
|
||||
metadata={"RID": f"{self.scan_id}-{self.gui_id}"},
|
||||
)
|
||||
self.client.connector.set_and_publish(MessageEndpoints.dap_request(), msg)
|
||||
|
||||
@staticmethod
|
||||
def _normalize_dap_parameters(
|
||||
parameters: dict | list | lmfit.Parameters | None, dap_name: str | list[str] | None = None
|
||||
) -> dict | list | None:
|
||||
"""
|
||||
Normalize user-provided lmfit parameters into a JSON-serializable dict suitable for the DAP server.
|
||||
|
||||
Supports:
|
||||
- `lmfit.Parameters` (single-model only)
|
||||
- `dict[name -> number]` (treated as fixed parameter with `vary=False`)
|
||||
- `dict[name -> dict]` (lmfit.Parameter fields; defaults to `vary=False` if unspecified)
|
||||
- `dict[name -> lmfit.Parameter]`
|
||||
- composite: `list[dict[param_name -> spec]]` aligned to model list
|
||||
- composite: `dict[model_name -> dict[param_name -> spec]]` (unique model names only)
|
||||
"""
|
||||
if parameters is None:
|
||||
return None
|
||||
if isinstance(dap_name, (list, tuple)):
|
||||
if lmfit is not None and isinstance(parameters, lmfit.Parameters):
|
||||
raise TypeError("dap_parameters must be a dict when using composite dap models.")
|
||||
if isinstance(parameters, (list, tuple)):
|
||||
normalized_list: list[dict | None] = []
|
||||
for idx, item in enumerate(parameters):
|
||||
if item is None:
|
||||
normalized_list.append(None)
|
||||
continue
|
||||
if not isinstance(item, dict):
|
||||
raise TypeError(
|
||||
f"dap_parameters list item {idx} must be a dict of parameter overrides."
|
||||
)
|
||||
normalized_list.append(Waveform._normalize_param_overrides(item))
|
||||
return normalized_list or None
|
||||
if not isinstance(parameters, dict):
|
||||
raise TypeError(
|
||||
"dap_parameters must be a dict of model->params when using composite dap models."
|
||||
)
|
||||
model_names = set(dap_name)
|
||||
invalid_models = set(parameters.keys()) - model_names
|
||||
if invalid_models:
|
||||
raise TypeError(
|
||||
f"Invalid dap_parameters keys for composite model: {sorted(invalid_models)}"
|
||||
)
|
||||
normalized_composite: dict[str, dict] = {}
|
||||
for model_name in dap_name:
|
||||
model_params = parameters.get(model_name)
|
||||
if model_params is None:
|
||||
continue
|
||||
if not isinstance(model_params, dict):
|
||||
raise TypeError(
|
||||
f"dap_parameters for '{model_name}' must be a dict of parameter overrides."
|
||||
)
|
||||
normalized = Waveform._normalize_param_overrides(model_params)
|
||||
if normalized:
|
||||
normalized_composite[model_name] = normalized
|
||||
return normalized_composite or None
|
||||
|
||||
if lmfit is not None and isinstance(parameters, lmfit.Parameters):
|
||||
return serialize_lmfit_params(parameters)
|
||||
if not isinstance(parameters, dict):
|
||||
if lmfit is None:
|
||||
raise TypeError(
|
||||
"dap_parameters must be a dict when lmfit is not installed on the client."
|
||||
)
|
||||
raise TypeError("dap_parameters must be a dict or lmfit.Parameters (or omitted).")
|
||||
|
||||
return Waveform._normalize_param_overrides(parameters)
|
||||
|
||||
@staticmethod
|
||||
def _normalize_param_overrides(parameters: dict) -> dict | None:
|
||||
normalized: dict[str, dict] = {}
|
||||
for name, spec in parameters.items():
|
||||
if spec is None:
|
||||
continue
|
||||
if isinstance(spec, (int, float, np.number)):
|
||||
normalized[name] = {"name": name, "value": float(spec), "vary": False}
|
||||
continue
|
||||
if lmfit is not None and isinstance(spec, lmfit.Parameter):
|
||||
normalized[name] = serialize_param_object(spec)
|
||||
continue
|
||||
if isinstance(spec, dict):
|
||||
normalized[name] = {"name": name, **spec}
|
||||
if "vary" not in normalized[name]:
|
||||
normalized[name]["vary"] = False
|
||||
continue
|
||||
raise TypeError(
|
||||
f"Invalid dap_parameters entry for '{name}': expected number, dict, or lmfit.Parameter."
|
||||
)
|
||||
|
||||
return normalized or None
|
||||
|
||||
@staticmethod
|
||||
def _format_dap_label(dap_name: str | list[str]) -> str:
|
||||
if isinstance(dap_name, (list, tuple)):
|
||||
return "+".join(dap_name)
|
||||
return dap_name
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def update_dap_curves(self, msg, metadata):
|
||||
"""
|
||||
@@ -1793,14 +1936,6 @@ class Waveform(PlotBase):
|
||||
if not curve:
|
||||
return
|
||||
|
||||
# Get data from the parent (device) curve
|
||||
parent_curve = self._find_curve_by_label(curve.config.parent_label)
|
||||
if parent_curve is None:
|
||||
return
|
||||
x_parent, _ = parent_curve.get_data()
|
||||
if x_parent is None or len(x_parent) == 0:
|
||||
return
|
||||
|
||||
# Retrieve and store the fit parameters and summary from the DAP server response
|
||||
try:
|
||||
curve.dap_params = msg["data"][1]["fit_parameters"]
|
||||
@@ -1809,19 +1944,13 @@ class Waveform(PlotBase):
|
||||
logger.warning(f"Failed to retrieve DAP data for curve '{curve.name()}'")
|
||||
return
|
||||
|
||||
# Render model according to the DAP model name and parameters
|
||||
model_name = curve.config.signal.dap
|
||||
model_function = getattr(lmfit.models, model_name)()
|
||||
|
||||
x_min, x_max = x_parent.min(), x_parent.max()
|
||||
oversample = curve.dap_oversample
|
||||
new_x = np.linspace(x_min, x_max, int(len(x_parent) * oversample))
|
||||
|
||||
# Evaluate the model with the provided parameters to generate the y values
|
||||
new_y = model_function.eval(**curve.dap_params, x=new_x)
|
||||
|
||||
# Update the curve with the new data
|
||||
curve.setData(new_x, new_y)
|
||||
# Plot the fitted curve using the server-provided output to avoid requiring lmfit on the client.
|
||||
try:
|
||||
fit_data = msg["data"][0]
|
||||
curve.setData(np.asarray(fit_data["x"]), np.asarray(fit_data["y"]))
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to plot DAP result for curve '{curve.name()}', error: {e}")
|
||||
return
|
||||
|
||||
metadata.update({"curve_id": curve_id})
|
||||
self.dap_params_update.emit(curve.dap_params, metadata)
|
||||
@@ -2341,24 +2470,20 @@ class DemoApp(QMainWindow): # pragma: no cover
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.setWindowTitle("Waveform Demo")
|
||||
self.resize(1200, 600)
|
||||
self.resize(1600, 600)
|
||||
self.main_widget = QWidget(self)
|
||||
self.layout = QHBoxLayout(self.main_widget)
|
||||
self.setCentralWidget(self.main_widget)
|
||||
|
||||
self.waveform_popup = Waveform(popups=True)
|
||||
self.waveform_popup.plot(device_y="waveform")
|
||||
|
||||
self.waveform_side = Waveform(popups=False)
|
||||
self.waveform_side.plot(device_y="bpm4i", signal_y="bpm4i", dap="GaussianModel")
|
||||
self.waveform_side.plot(device_y="bpm3a", signal_y="bpm3a")
|
||||
|
||||
self.custom_waveform = Waveform(popups=True)
|
||||
self._populate_custom_curve_demo()
|
||||
|
||||
self.layout.addWidget(self.waveform_side)
|
||||
self.layout.addWidget(self.waveform_popup)
|
||||
self.sine_waveform = Waveform(popups=True)
|
||||
self.sine_waveform.dap_params_update.connect(self._log_sine_dap_params)
|
||||
self._populate_sine_curve_demo()
|
||||
|
||||
self.layout.addWidget(self.custom_waveform)
|
||||
self.layout.addWidget(self.sine_waveform)
|
||||
|
||||
def _populate_custom_curve_demo(self):
|
||||
"""
|
||||
@@ -2377,8 +2502,141 @@ class DemoApp(QMainWindow): # pragma: no cover
|
||||
sigma = 0.8
|
||||
y = amplitude * np.exp(-((x - center) ** 2) / (2 * sigma**2)) + noise
|
||||
|
||||
# 1) No explicit parameters: server will use lmfit defaults/guesses.
|
||||
self.custom_waveform.plot(x=x, y=y, label="custom-gaussian", dap="GaussianModel")
|
||||
|
||||
# 2) Easy dict: numbers mean "fix this parameter to value" (vary=False).
|
||||
self.custom_waveform.plot(
|
||||
x=x,
|
||||
y=y,
|
||||
label="custom-gaussian-fixed-easy",
|
||||
dap="GaussianModel",
|
||||
dap_parameters={"amplitude": 1.0},
|
||||
dap_oversample=5,
|
||||
)
|
||||
|
||||
# 3) Partial parameter override: this should still trigger guessing on the server
|
||||
# because not all Gaussian parameters are explicitly specified.
|
||||
self.custom_waveform.plot(
|
||||
x=x,
|
||||
y=y,
|
||||
label="custom-gaussian-partial-guess",
|
||||
dap="GaussianModel",
|
||||
dap_parameters={
|
||||
"center": {"value": 1.2, "vary": True},
|
||||
"sigma": {"value": sigma, "vary": False, "min": 0.0},
|
||||
},
|
||||
)
|
||||
|
||||
# 4) Complete parameter override: this should skip guessing on the server.
|
||||
if lmfit is not None:
|
||||
params_gauss = lmfit.models.GaussianModel().make_params()
|
||||
params_gauss["amplitude"].set(value=amplitude, vary=False)
|
||||
params_gauss["center"].set(value=center, vary=False)
|
||||
params_gauss["sigma"].set(value=sigma, vary=False, min=0.0)
|
||||
self.custom_waveform.plot(
|
||||
x=x,
|
||||
y=y,
|
||||
label="custom-gaussian-complete-no-guess",
|
||||
dap="GaussianModel",
|
||||
dap_parameters=params_gauss,
|
||||
)
|
||||
else:
|
||||
logger.info("Skipping lmfit.Parameters demo (lmfit not installed on client).")
|
||||
|
||||
# Composite example: spectrum with three Gaussians (DAP-only)
|
||||
x_spec = np.linspace(-5, 5, 800)
|
||||
rng_spec = np.random.default_rng(123)
|
||||
centers = [-2.0, 0.6, 2.4]
|
||||
amplitudes = [2.5, 3.2, 1.8]
|
||||
sigmas = [0.35, 0.5, 0.3]
|
||||
y_spec = (
|
||||
amplitudes[0] * np.exp(-((x_spec - centers[0]) ** 2) / (2 * sigmas[0] ** 2))
|
||||
+ amplitudes[1] * np.exp(-((x_spec - centers[1]) ** 2) / (2 * sigmas[1] ** 2))
|
||||
+ amplitudes[2] * np.exp(-((x_spec - centers[2]) ** 2) / (2 * sigmas[2] ** 2))
|
||||
+ rng_spec.normal(loc=0, scale=0.06, size=x_spec.size)
|
||||
)
|
||||
|
||||
# 5) Composite model with partial overrides only: this should still trigger guessing.
|
||||
self.custom_waveform.plot(
|
||||
x=x_spec,
|
||||
y=y_spec,
|
||||
label="custom-gaussian-spectrum-partial-guess",
|
||||
dap=["GaussianModel", "GaussianModel", "GaussianModel"],
|
||||
dap_parameters=[
|
||||
{"center": {"value": centers[0], "vary": False}},
|
||||
{"center": {"value": centers[1], "vary": False}},
|
||||
{"center": {"value": centers[2], "vary": False}},
|
||||
],
|
||||
)
|
||||
|
||||
# 6) Composite model with all component parameters specified: this should skip guessing.
|
||||
self.custom_waveform.plot(
|
||||
x=x_spec,
|
||||
y=y_spec,
|
||||
label="custom-gaussian-spectrum-complete-no-guess",
|
||||
dap=["GaussianModel", "GaussianModel", "GaussianModel"],
|
||||
dap_parameters=[
|
||||
{
|
||||
"amplitude": {"value": amplitudes[0], "vary": False},
|
||||
"center": {"value": centers[0], "vary": False},
|
||||
"sigma": {"value": sigmas[0], "vary": False, "min": 0.0},
|
||||
},
|
||||
{
|
||||
"amplitude": {"value": amplitudes[1], "vary": False},
|
||||
"center": {"value": centers[1], "vary": False},
|
||||
"sigma": {"value": sigmas[1], "vary": False, "min": 0.0},
|
||||
},
|
||||
{
|
||||
"amplitude": {"value": amplitudes[2], "vary": False},
|
||||
"center": {"value": centers[2], "vary": False},
|
||||
"sigma": {"value": sigmas[2], "vary": False, "min": 0.0},
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
def _populate_sine_curve_demo(self):
|
||||
"""
|
||||
Showcase how lmfit's base SineModel can struggle with a drifting baseline.
|
||||
"""
|
||||
x = np.linspace(0, 6 * np.pi, 600)
|
||||
rng = np.random.default_rng(7)
|
||||
amplitude = 1.6
|
||||
frequency = 0.75
|
||||
phase = 0.4
|
||||
offset = 0.8
|
||||
slope = 0.08
|
||||
noise = rng.normal(loc=0, scale=0.12, size=x.size)
|
||||
y = offset + slope * x + amplitude * np.sin(2 * np.pi * frequency * x + phase) + noise
|
||||
|
||||
# Base SineModel (no offset support) to show the mismatch
|
||||
self.sine_waveform.plot(x=x, y=y, label="custom-sine-data", dap="SineModel")
|
||||
|
||||
# Composite model: Sine + Linear baseline (offset + slope)
|
||||
self.sine_waveform.plot(
|
||||
x=x,
|
||||
y=y,
|
||||
label="custom-sine-composite",
|
||||
dap=["SineModel", "LinearModel"],
|
||||
dap_oversample=4,
|
||||
)
|
||||
|
||||
if lmfit is None:
|
||||
logger.info("Skipping sine lmfit demo (lmfit not installed on client).")
|
||||
return
|
||||
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def _log_sine_dap_params(params: dict, metadata: dict):
|
||||
curve_id = metadata.get("curve_id")
|
||||
if curve_id not in {
|
||||
"custom-sine-data-SineModel",
|
||||
"custom-sine-composite-SineModel+LinearModel",
|
||||
}:
|
||||
return
|
||||
logger.info(f"SineModel DAP fit params ({curve_id}): {params}")
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import sys
|
||||
|
||||
58
bec_widgets/widgets/utility/logpanel/_util.py
Normal file
58
bec_widgets/widgets/utility/logpanel/_util.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""Utilities for filtering and formatting in the LogPanel"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from collections import deque
|
||||
from typing import Callable, Iterator
|
||||
|
||||
from bec_lib.logger import LogLevel
|
||||
from bec_lib.messages import LogMessage
|
||||
from qtpy.QtCore import QDateTime
|
||||
|
||||
LinesHtmlFormatter = Callable[[deque[LogMessage]], Iterator[str]]
|
||||
LineFormatter = Callable[[LogMessage], str]
|
||||
LineFilter = Callable[[LogMessage], bool] | None
|
||||
|
||||
ANSI_ESCAPE_REGEX = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
|
||||
|
||||
|
||||
def replace_escapes(s: str):
|
||||
s = ANSI_ESCAPE_REGEX.sub("", s)
|
||||
return s.replace(" ", " ").replace("\n", "<br />").replace("\t", " ")
|
||||
|
||||
|
||||
def level_filter(msg: LogMessage, thresh: int):
|
||||
return LogLevel[msg.content["log_type"].upper()].value >= thresh
|
||||
|
||||
|
||||
def noop_format(line: LogMessage):
|
||||
_textline = line.log_msg if isinstance(line.log_msg, str) else line.log_msg["text"]
|
||||
return replace_escapes(_textline.strip()) + "<br />"
|
||||
|
||||
|
||||
def simple_color_format(line: LogMessage, colors: dict[LogLevel, str]):
|
||||
color = colors.get(LogLevel[line.content["log_type"].upper()]) or colors[LogLevel.INFO]
|
||||
return f'<font color="{color}">{noop_format(line)}</font>'
|
||||
|
||||
|
||||
def create_formatter(line_format: LineFormatter, line_filter: LineFilter) -> LinesHtmlFormatter:
|
||||
def _formatter(data: deque[LogMessage]):
|
||||
if line_filter is not None:
|
||||
return (line_format(line) for line in data if line_filter(line))
|
||||
else:
|
||||
return (line_format(line) for line in data)
|
||||
|
||||
return _formatter
|
||||
|
||||
|
||||
def log_txt(line):
|
||||
return line.log_msg if isinstance(line.log_msg, str) else line.log_msg["text"]
|
||||
|
||||
|
||||
def log_time(line):
|
||||
return QDateTime.fromMSecsSinceEpoch(int(line.log_msg["record"]["time"]["timestamp"] * 1000))
|
||||
|
||||
|
||||
def log_svc(line):
|
||||
return line.log_msg["service_name"]
|
||||
@@ -30,7 +30,7 @@ class LogPanelPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
return DOM_XML
|
||||
|
||||
def group(self):
|
||||
return ""
|
||||
return "BEC Services"
|
||||
|
||||
def icon(self):
|
||||
return designer_material_icon(LogPanel.ICON_NAME)
|
||||
@@ -51,7 +51,7 @@ class LogPanelPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
return "LogPanel"
|
||||
|
||||
def toolTip(self):
|
||||
return "LogPanel"
|
||||
return "Displays a log panel"
|
||||
|
||||
def whatsThis(self):
|
||||
return self.toolTip()
|
||||
|
||||
@@ -2,31 +2,21 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import operator
|
||||
import os
|
||||
import re
|
||||
from collections import deque
|
||||
from dataclasses import dataclass
|
||||
from functools import partial
|
||||
from typing import Iterable, Literal
|
||||
from functools import partial, reduce
|
||||
from re import Pattern
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from bec_lib.client import BECClient
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import LogLevel, bec_logger
|
||||
from bec_lib.messages import LogMessage, StatusMessage
|
||||
from bec_qthemes import material_icon
|
||||
from qtpy.QtCore import Signal # type: ignore
|
||||
from qtpy.QtCore import (
|
||||
QAbstractTableModel,
|
||||
QCoreApplication,
|
||||
QDateTime,
|
||||
QModelIndex,
|
||||
QObject,
|
||||
QPersistentModelIndex,
|
||||
QSize,
|
||||
QSortFilterProxyModel,
|
||||
Qt,
|
||||
QTimer,
|
||||
)
|
||||
from qtpy.QtGui import QColor
|
||||
from pyqtgraph import SignalProxy
|
||||
from qtpy.QtCore import QDateTime, QObject, Qt, Signal
|
||||
from qtpy.QtGui import QFont
|
||||
from qtpy.QtWidgets import (
|
||||
QApplication,
|
||||
QCheckBox,
|
||||
@@ -35,417 +25,204 @@ from qtpy.QtWidgets import (
|
||||
QDialog,
|
||||
QGridLayout,
|
||||
QHBoxLayout,
|
||||
QHeaderView,
|
||||
QLabel,
|
||||
QLineEdit,
|
||||
QPushButton,
|
||||
QSizePolicy,
|
||||
QTableView,
|
||||
QToolButton,
|
||||
QScrollArea,
|
||||
QTextEdit,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
from thefuzz import fuzz
|
||||
|
||||
from bec_widgets.utils.bec_connector import BECConnector
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.colors import apply_theme
|
||||
from bec_widgets.utils.colors import apply_theme, get_theme_palette
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
from bec_widgets.widgets.editors.text_box.text_box import TextBox
|
||||
from bec_widgets.widgets.services.bec_status_box.bec_status_box import BECServiceStatusMixin
|
||||
from bec_widgets.widgets.utility.logpanel._util import (
|
||||
LineFilter,
|
||||
LineFormatter,
|
||||
LinesHtmlFormatter,
|
||||
create_formatter,
|
||||
level_filter,
|
||||
log_svc,
|
||||
log_time,
|
||||
log_txt,
|
||||
noop_format,
|
||||
simple_color_format,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from qtpy.QtCore import SignalInstance
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
MODULE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
|
||||
|
||||
_DEFAULT_LOG_COLORS = {
|
||||
LogLevel.INFO.name: QColor("#FFFFFF"),
|
||||
LogLevel.SUCCESS.name: QColor("#00FF00"),
|
||||
LogLevel.WARNING.name: QColor("#FFCC00"),
|
||||
LogLevel.ERROR.name: QColor("#FF0000"),
|
||||
LogLevel.DEBUG.name: QColor("#0000CC"),
|
||||
# TODO: improve log color handling
|
||||
DEFAULT_LOG_COLORS = {
|
||||
LogLevel.INFO: "#FFFFFF",
|
||||
LogLevel.SUCCESS: "#00FF00",
|
||||
LogLevel.WARNING: "#FFCC00",
|
||||
LogLevel.ERROR: "#FF0000",
|
||||
LogLevel.DEBUG: "#0000CC",
|
||||
}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class _Constants:
|
||||
FUZZ_THRESHOLD = 80
|
||||
UPDATE_INTERVAL_MS = 200
|
||||
headers = ["level", "timestamp", "service_name", "message", "function"]
|
||||
|
||||
|
||||
_CONST = _Constants()
|
||||
|
||||
|
||||
class TimestampUpdate:
|
||||
def __init__(self, value: QDateTime | None, update_type: Literal["start", "end"]) -> None:
|
||||
self.value = value
|
||||
self.update_type = update_type
|
||||
|
||||
|
||||
class BecLogsQueue(BECConnector, QObject):
|
||||
"""Manages getting logs from BEC Redis and formatting them for display"""
|
||||
|
||||
RPC = False
|
||||
new_messages = Signal()
|
||||
paused = Signal(bool)
|
||||
_instance: BecLogsQueue | None = None
|
||||
new_message = Signal()
|
||||
|
||||
@classmethod
|
||||
def instance(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = cls(QCoreApplication.instance())
|
||||
return cls._instance
|
||||
|
||||
def __init__(self, parent: QObject | None, maxlen: int = 2500, **kwargs) -> None:
|
||||
if BecLogsQueue._instance:
|
||||
raise RuntimeError("Create no more than one BecLogsQueue - use BecLogsQueue.instance()")
|
||||
def __init__(
|
||||
self,
|
||||
parent: QObject | None,
|
||||
maxlen: int = 1000,
|
||||
line_formatter: LineFormatter = noop_format,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
super().__init__(parent=parent, **kwargs)
|
||||
self._timestamp_start: QDateTime | None = None
|
||||
self._timestamp_end: QDateTime | None = None
|
||||
self._max_length = maxlen
|
||||
self._paused = False
|
||||
self._data = deque(
|
||||
(
|
||||
item["data"]
|
||||
for item in self.bec_dispatcher.client.connector.xread(
|
||||
MessageEndpoints.log(), count=self._max_length, id="0"
|
||||
)
|
||||
),
|
||||
maxlen=self._max_length,
|
||||
)
|
||||
self._incoming: deque[LogMessage] = deque([], maxlen=self._max_length)
|
||||
self._data: deque[LogMessage] = deque([], self._max_length)
|
||||
self._display_queue: deque[str] = deque([], self._max_length)
|
||||
self._log_level: str | None = None
|
||||
self._search_query: Pattern | str | None = None
|
||||
self._selected_services: set[str] | None = None
|
||||
self._set_formatter_and_update_filter(line_formatter)
|
||||
# instance attribute still accessible after c++ object is deleted, so the callback can be unregistered
|
||||
self.bec_dispatcher.connect_slot(self._process_incoming_log_msg, MessageEndpoints.log())
|
||||
|
||||
self._update_timer = QTimer(self, interval=_CONST.UPDATE_INTERVAL_MS)
|
||||
self._update_timer.timeout.connect(self._proc_update)
|
||||
QCoreApplication.instance().aboutToQuit.connect(self.cleanup) # type: ignore
|
||||
self._update_timer.start()
|
||||
|
||||
def __len__(self):
|
||||
return len(self._data)
|
||||
|
||||
@SafeSlot()
|
||||
def toggle_pause(self):
|
||||
self._paused = not self._paused
|
||||
self.paused.emit(self._paused)
|
||||
|
||||
def row_data(self, index: int) -> LogMessage | None:
|
||||
if index < 0 or index > (len(self._data) - 1):
|
||||
return None
|
||||
return self._data[index]
|
||||
|
||||
def cell_data(self, row: int, key: str):
|
||||
if key == "level":
|
||||
return self._data[row].log_type.upper()
|
||||
|
||||
msg_item = self._data[row].log_msg
|
||||
if isinstance(msg_item, str):
|
||||
return msg_item
|
||||
if key == "service_name":
|
||||
return msg_item.get(key)
|
||||
elif key in ["service_name", "function", "message"]:
|
||||
return msg_item.get("record", {}).get(key)
|
||||
elif key == "timestamp":
|
||||
return msg_item.get("record", {}).get("time", {}).get("repr")
|
||||
|
||||
def log_timestamp(self, row: int) -> float:
|
||||
msg_item = self._data[row].log_msg
|
||||
if isinstance(msg_item, str):
|
||||
return 0
|
||||
return msg_item.get("record", {}).get("time", {}).get("timestamp")
|
||||
|
||||
def cleanup(self, *_):
|
||||
"""Stop listening to the Redis log stream"""
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self._process_incoming_log_msg, [MessageEndpoints.log()]
|
||||
)
|
||||
self._update_timer.stop()
|
||||
BecLogsQueue._instance = None
|
||||
|
||||
@SafeSlot(verify_sender=True)
|
||||
def _process_incoming_log_msg(self, msg: dict, _metadata: dict):
|
||||
try:
|
||||
_msg = LogMessage(**msg)
|
||||
self._incoming.append(_msg)
|
||||
self._data.append(_msg)
|
||||
if self.filter is None or self.filter(_msg):
|
||||
self._display_queue.append(self._line_formatter(_msg))
|
||||
self.new_message.emit()
|
||||
except Exception as e:
|
||||
if "Internal C++ object (BecLogsQueue) already deleted." in e.args:
|
||||
return
|
||||
logger.warning(f"Error in LogPanel incoming message callback: {e}")
|
||||
|
||||
@SafeSlot(verify_sender=True)
|
||||
def _proc_update(self):
|
||||
if self._paused or len(self._incoming) == 0:
|
||||
return
|
||||
self._data.extend(self._incoming)
|
||||
self._incoming.clear()
|
||||
self.new_messages.emit()
|
||||
def _set_formatter_and_update_filter(self, line_formatter: LineFormatter = noop_format):
|
||||
self._line_formatter: LineFormatter = line_formatter
|
||||
self._queue_formatter: LinesHtmlFormatter = create_formatter(
|
||||
self._line_formatter, self.filter
|
||||
)
|
||||
|
||||
def _combine_filters(self, *args: LineFilter):
|
||||
return lambda msg: reduce(operator.and_, [filt(msg) for filt in args if filt is not None])
|
||||
|
||||
class BecLogsTableModel(QAbstractTableModel):
|
||||
def __init__(self, parent: QWidget | None = None):
|
||||
super().__init__(parent)
|
||||
self.log_queue = BecLogsQueue.instance()
|
||||
self.log_queue.new_messages.connect(self.handle_new_messages)
|
||||
self._headers = _CONST.headers
|
||||
|
||||
def rowCount(self, parent: QModelIndex | QPersistentModelIndex = QModelIndex()) -> int:
|
||||
return len(self.log_queue)
|
||||
|
||||
def columnCount(self, parent: QModelIndex | QPersistentModelIndex = QModelIndex()) -> int:
|
||||
return len(self._headers)
|
||||
|
||||
def headerData(self, section, orientation, role=int(Qt.ItemDataRole.DisplayRole)):
|
||||
if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
|
||||
return self._headers[section]
|
||||
return None
|
||||
|
||||
def get_row_data(self, index: QModelIndex) -> LogMessage | None:
|
||||
"""Return the row data for the given index."""
|
||||
if not index.isValid():
|
||||
def _create_re_filter(self) -> LineFilter:
|
||||
if self._search_query is None:
|
||||
return None
|
||||
return self.log_queue.row_data(index.row())
|
||||
elif isinstance(self._search_query, str):
|
||||
return lambda line: self._search_query in log_txt(line)
|
||||
return lambda line: self._search_query.match(log_txt(line)) is not None
|
||||
|
||||
def timestamp(self, row: int):
|
||||
return QDateTime.fromMSecsSinceEpoch(int(self.log_queue.log_timestamp(row) * 1000))
|
||||
def _create_service_filter(self):
|
||||
return (
|
||||
lambda line: self._selected_services is None or log_svc(line) in self._selected_services
|
||||
)
|
||||
|
||||
def data(self, index, role=int(Qt.ItemDataRole.DisplayRole)):
|
||||
"""Return data for the given index and role."""
|
||||
if not index.isValid():
|
||||
def _create_timestamp_filter(self) -> LineFilter:
|
||||
s, e = self._timestamp_start, self._timestamp_end
|
||||
if s is e is None:
|
||||
return lambda msg: True
|
||||
|
||||
def _time_filter(msg):
|
||||
msg_time = log_time(msg)
|
||||
if s is None:
|
||||
return msg_time <= e
|
||||
if e is None:
|
||||
return s <= msg_time
|
||||
return s <= msg_time <= e
|
||||
|
||||
return _time_filter
|
||||
|
||||
@property
|
||||
def filter(self) -> LineFilter:
|
||||
"""A function which filters a log message based on all applied criteria"""
|
||||
thresh = LogLevel[self._log_level].value if self._log_level is not None else 0
|
||||
return self._combine_filters(
|
||||
partial(level_filter, thresh=thresh),
|
||||
self._create_re_filter(),
|
||||
self._create_timestamp_filter(),
|
||||
self._create_service_filter(),
|
||||
)
|
||||
|
||||
def update_level_filter(self, level: str):
|
||||
"""Change the log-level of the level filter"""
|
||||
if level not in [l.name for l in LogLevel]:
|
||||
logger.error(f"Logging level {level} unrecognized for filter!")
|
||||
return
|
||||
if role in [Qt.ItemDataRole.DisplayRole, Qt.ItemDataRole.ToolTipRole]:
|
||||
return self.log_queue.cell_data(index.row(), self._headers[index.column()])
|
||||
if role in [Qt.ItemDataRole.ForegroundRole]:
|
||||
return self._map_log_level_color(self.log_queue.cell_data(index.row(), "level"))
|
||||
self._log_level = level
|
||||
self._set_formatter_and_update_filter(self._line_formatter)
|
||||
|
||||
def _map_log_level_color(self, data):
|
||||
return _DEFAULT_LOG_COLORS.get(data)
|
||||
def update_search_filter(self, search_query: Pattern | str | None = None):
|
||||
"""Change the string or regex to filter against"""
|
||||
self._search_query = search_query
|
||||
self._set_formatter_and_update_filter(self._line_formatter)
|
||||
|
||||
def handle_new_messages(self):
|
||||
self.dataChanged.emit(
|
||||
self.index(0, 0), self.index(self.rowCount() - 1, self.columnCount() - 1)
|
||||
def update_time_filter(self, start: QDateTime | None, end: QDateTime | None):
|
||||
"""Change the start and/or end times to filter against"""
|
||||
self._timestamp_start = start
|
||||
self._timestamp_end = end
|
||||
self._set_formatter_and_update_filter(self._line_formatter)
|
||||
|
||||
def update_service_filter(self, services: set[str]):
|
||||
"""Change the selected services to display"""
|
||||
self._selected_services = services
|
||||
self._set_formatter_and_update_filter(self._line_formatter)
|
||||
|
||||
def update_line_formatter(self, line_formatter: LineFormatter):
|
||||
"""Update the formatter"""
|
||||
self._set_formatter_and_update_filter(line_formatter)
|
||||
|
||||
def display_all(self) -> str:
|
||||
"""Return formatted output for all log messages"""
|
||||
return "\n".join(self._queue_formatter(self._data.copy()))
|
||||
|
||||
def format_new(self):
|
||||
"""Return formatted output for the display queue"""
|
||||
res = "\n".join(self._display_queue)
|
||||
self._display_queue = deque([], self._max_length)
|
||||
return res
|
||||
|
||||
def clear_logs(self):
|
||||
"""Clear the cache and display queue"""
|
||||
self._data = deque([])
|
||||
self._display_queue = deque([])
|
||||
|
||||
def fetch_history(self):
|
||||
"""Fetch all available messages from Redis"""
|
||||
self._data = deque(
|
||||
item["data"]
|
||||
for item in self.bec_dispatcher.client.connector.xread(
|
||||
MessageEndpoints.log().endpoint, from_start=True, count=self._max_length
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class LogMsgProxyModel(QSortFilterProxyModel):
|
||||
|
||||
show_service_column = Signal(bool)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent=None,
|
||||
service_filter: set[str] | None = None,
|
||||
level_filter: LogLevel | None = None,
|
||||
):
|
||||
super().__init__(parent)
|
||||
self._service_filter = service_filter or set()
|
||||
self._level_filter: LogLevel | None = level_filter
|
||||
self._filter_text: str = ""
|
||||
self._fuzzy_search: bool = False
|
||||
self._time_filter_start: QDateTime | None = None
|
||||
self._time_filter_end: QDateTime | None = None
|
||||
|
||||
def get_row_data(self, rows: Iterable[QModelIndex]) -> Iterable[LogMessage | None]:
|
||||
return (self.sourceModel().get_row_data(self.mapToSource(idx)) for idx in rows)
|
||||
|
||||
def sourceModel(self) -> BecLogsTableModel:
|
||||
return super().sourceModel() # type: ignore
|
||||
|
||||
@SafeSlot(int, int)
|
||||
def refresh(self, *_):
|
||||
self.invalidateRowsFilter()
|
||||
|
||||
@SafeSlot(None)
|
||||
@SafeSlot(set)
|
||||
def update_service_filter(self, filter: set[str]):
|
||||
"""Filter to the selected services (show any service in the provided set)
|
||||
|
||||
Args:
|
||||
filter (set[str] | None): set of services for which to show logs"""
|
||||
self._service_filter = filter
|
||||
self.show_service_column.emit(len(filter) != 1)
|
||||
self.invalidateRowsFilter()
|
||||
|
||||
@SafeSlot(None)
|
||||
@SafeSlot(LogLevel)
|
||||
def update_level_filter(self, filter: LogLevel | None):
|
||||
"""Filter to the selected log level
|
||||
|
||||
Args:
|
||||
filter (str | None): lowest log level to show"""
|
||||
self._level_filter = filter
|
||||
self.invalidateRowsFilter()
|
||||
|
||||
@SafeSlot(str)
|
||||
def update_filter_text(self, filter: str):
|
||||
"""Filter messages based on text
|
||||
|
||||
Args:
|
||||
filter (str | None): set of services for which to show logs"""
|
||||
self._filter_text = filter
|
||||
self.invalidateRowsFilter()
|
||||
|
||||
@SafeSlot(bool)
|
||||
def update_fuzzy(self, state: bool):
|
||||
"""Set text filter to fuzzy search or not
|
||||
|
||||
Args:
|
||||
state (bool): fuzzy search on"""
|
||||
self._fuzzy_search = state
|
||||
self.invalidateRowsFilter()
|
||||
|
||||
@SafeSlot(TimestampUpdate)
|
||||
def update_timestamp(self, update: TimestampUpdate):
|
||||
if update.update_type == "start":
|
||||
self._time_filter_start = update.value
|
||||
else:
|
||||
self._time_filter_end = update.value
|
||||
self.invalidateRowsFilter()
|
||||
|
||||
def filterAcceptsRow(self, source_row: int, source_parent) -> bool:
|
||||
# No service filter, and no filter text, display everything
|
||||
possible_filters = [
|
||||
self._service_filter,
|
||||
self._level_filter,
|
||||
self._filter_text,
|
||||
self._time_filter_start,
|
||||
self._time_filter_end,
|
||||
]
|
||||
if not any(map(bool, possible_filters)):
|
||||
return True
|
||||
model = self.sourceModel()
|
||||
# Filter out services
|
||||
if self._service_filter:
|
||||
col = _CONST.headers.index("service_name")
|
||||
if model.data(model.index(source_row, col, source_parent)) not in self._service_filter:
|
||||
return False
|
||||
# Filter out levels
|
||||
if self._level_filter:
|
||||
col = _CONST.headers.index("level")
|
||||
level: str = model.data(model.index(source_row, col, source_parent)) # type: ignore
|
||||
if LogLevel[level] < self._level_filter:
|
||||
return False
|
||||
# Filter time
|
||||
if self._time_filter_start:
|
||||
if model.timestamp(source_row) < self._time_filter_start:
|
||||
return False
|
||||
if self._time_filter_end:
|
||||
if model.timestamp(source_row) > self._time_filter_end:
|
||||
return False
|
||||
# Filter message text - must go last because this can return True
|
||||
if self._filter_text:
|
||||
col = _CONST.headers.index("message")
|
||||
msg: str = model.data(model.index(source_row, col, source_parent)).lower() # type: ignore
|
||||
if self._fuzzy_search:
|
||||
if fuzz.partial_ratio(self._filter_text.lower(), msg) >= _CONST.FUZZ_THRESHOLD:
|
||||
return True
|
||||
else:
|
||||
return self._filter_text.lower() in msg.lower()
|
||||
return True
|
||||
|
||||
|
||||
class BecLogTableView(QTableView):
|
||||
def __init__(self, *args, max_message_width: int = 1000, **kwargs) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
header = QHeaderView(Qt.Orientation.Horizontal, parent=self)
|
||||
header.setSectionResizeMode(QHeaderView.ResizeMode.Interactive)
|
||||
header.setStretchLastSection(True)
|
||||
header.setMaximumSectionSize(max_message_width)
|
||||
self.setHorizontalHeader(header)
|
||||
|
||||
def model(self) -> LogMsgProxyModel:
|
||||
return super().model() # type: ignore
|
||||
|
||||
|
||||
class LogPanel(BECWidget, QWidget):
|
||||
"""Live display of the BEC logs in a table view."""
|
||||
|
||||
PLUGIN = True
|
||||
ICON_NAME = "browse_activity"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent: QWidget | None = None,
|
||||
max_message_width: int = 1000,
|
||||
show_toolbar: bool = True,
|
||||
service_filter: set[str] | None = None,
|
||||
level_filter: LogLevel | None = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
super().__init__(parent=parent, **kwargs)
|
||||
self._setup_models(service_filter=service_filter, level_filter=level_filter)
|
||||
self._layout = QVBoxLayout()
|
||||
self.setLayout(self._layout)
|
||||
if show_toolbar:
|
||||
self._setup_toolbar(client=self.client)
|
||||
self._setup_table_view(max_message_width=max_message_width)
|
||||
self._update_service_filter(service_filter or set())
|
||||
if show_toolbar:
|
||||
self._connect_toolbar()
|
||||
self._proxy.show_service_column.connect(self._show_service_column)
|
||||
colors = QApplication.instance().theme.accent_colors # type: ignore
|
||||
dict_colors = QApplication.instance().theme.colors # type: ignore
|
||||
_DEFAULT_LOG_COLORS.update(
|
||||
{
|
||||
LogLevel.INFO.name: dict_colors["FG"],
|
||||
LogLevel.SUCCESS.name: colors.success,
|
||||
LogLevel.WARNING.name: colors.warning,
|
||||
LogLevel.ERROR.name: colors.emergency,
|
||||
LogLevel.DEBUG.name: dict_colors["BORDER"],
|
||||
}
|
||||
)
|
||||
self._table.scrollToBottom()
|
||||
|
||||
def _setup_models(self, service_filter: set[str] | None, level_filter: LogLevel | None):
|
||||
self._model = BecLogsTableModel(parent=self)
|
||||
self._proxy = LogMsgProxyModel(
|
||||
parent=self, service_filter=service_filter, level_filter=level_filter
|
||||
)
|
||||
self._proxy.setSourceModel(self._model)
|
||||
self._model.log_queue.new_messages.connect(self._proxy.refresh)
|
||||
|
||||
def _setup_table_view(self, max_message_width: int) -> None:
|
||||
"""Setup the table view."""
|
||||
self._table = BecLogTableView(self, max_message_width=max_message_width)
|
||||
self._table.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
|
||||
self._layout.addWidget(self._table)
|
||||
self._table.setModel(self._proxy)
|
||||
self._table.setHorizontalScrollMode(QTableView.ScrollMode.ScrollPerPixel)
|
||||
self._table.setTextElideMode(Qt.TextElideMode.ElideRight)
|
||||
self._table.resizeColumnsToContents()
|
||||
|
||||
def _setup_toolbar(self, client: BECClient):
|
||||
self._toolbar = LogPanelToolbar(self, client)
|
||||
self._layout.addWidget(self._toolbar)
|
||||
|
||||
def _connect_toolbar(self):
|
||||
self._toolbar.services_selected.connect(self._proxy.update_service_filter)
|
||||
self._toolbar.search_textbox.textChanged.connect(self._proxy.update_filter_text)
|
||||
self._toolbar.level_changed.connect(self._proxy.update_level_filter)
|
||||
self._toolbar.fuzzy_changed.connect(self._proxy.update_fuzzy)
|
||||
self._toolbar.timestamp_update.connect(self._proxy.update_timestamp)
|
||||
self._toolbar.pause_button.clicked.connect(self._model.log_queue.toggle_pause)
|
||||
self._model.log_queue.paused.connect(self._toolbar._update_pause_button_icon)
|
||||
|
||||
def _update_service_filter(self, filter: set[str]):
|
||||
self._service_filter = filter
|
||||
self._proxy.update_service_filter(filter)
|
||||
self._table.setColumnHidden(
|
||||
_CONST.headers.index("service_name"), len(self._service_filter) == 1
|
||||
)
|
||||
|
||||
@SafeSlot(bool)
|
||||
def _show_service_column(self, show: bool):
|
||||
self._table.setColumnHidden(_CONST.headers.index("service_name"), not show)
|
||||
|
||||
def sizeHint(self) -> QSize:
|
||||
return QSize(600, 300)
|
||||
def unique_service_names_from_history(self) -> set[str]:
|
||||
"""Go through the log history to determine active service names"""
|
||||
return set(msg.log_msg["service_name"] for msg in self._data)
|
||||
|
||||
|
||||
class LogPanelToolbar(QWidget):
|
||||
|
||||
services_selected = Signal(set)
|
||||
level_changed = Signal(LogLevel)
|
||||
fuzzy_changed = Signal(bool)
|
||||
timestamp_update = Signal(TimestampUpdate)
|
||||
services_selected: SignalInstance = Signal(set)
|
||||
|
||||
def __init__(self, parent: QWidget | None = None, client: BECClient | None = None) -> None:
|
||||
def __init__(self, parent: QWidget | None = None) -> None:
|
||||
"""A toolbar for the logpanel, mainly used for managing the states of filters"""
|
||||
super().__init__(parent)
|
||||
|
||||
@@ -454,46 +231,44 @@ class LogPanelToolbar(QWidget):
|
||||
self._timestamp_end: QDateTime | None = None
|
||||
|
||||
self._unique_service_names: set[str] = set()
|
||||
self._services_selected: set[str] = set()
|
||||
self._services_selected: set[str] | None = None
|
||||
|
||||
self._layout = QHBoxLayout(self)
|
||||
self.layout = QHBoxLayout(self) # type: ignore
|
||||
|
||||
if client is not None:
|
||||
self.client = client
|
||||
self.service_choice_button = QPushButton("Select services", self)
|
||||
self._layout.addWidget(self.service_choice_button)
|
||||
self.service_choice_button.clicked.connect(self._open_service_filter_dialog)
|
||||
self.service_choice_button = QPushButton("Select services", self)
|
||||
self.layout.addWidget(self.service_choice_button)
|
||||
self.service_choice_button.clicked.connect(self._open_service_filter_dialog)
|
||||
|
||||
self.filter_level_dropdown = self._log_level_box()
|
||||
self._layout.addWidget(self.filter_level_dropdown)
|
||||
self.filter_level_dropdown.currentTextChanged.connect(self._emit_level)
|
||||
self.layout.addWidget(self.filter_level_dropdown)
|
||||
|
||||
self.clear_button = QPushButton("Clear all", self)
|
||||
self.layout.addWidget(self.clear_button)
|
||||
self.fetch_button = QPushButton("Fetch history", self)
|
||||
self.layout.addWidget(self.fetch_button)
|
||||
|
||||
self._string_search_box()
|
||||
|
||||
self.timerange_button = QPushButton("Set time range", self)
|
||||
self._layout.addWidget(self.timerange_button)
|
||||
self.timerange_button.clicked.connect(self._open_datetime_dialog)
|
||||
self.layout.addWidget(self.timerange_button)
|
||||
|
||||
self.pause_button = QToolButton()
|
||||
self.pause_button.setIcon(material_icon("pause", size=(20, 20), convert_to_pixmap=False))
|
||||
self._layout.addWidget(self.pause_button)
|
||||
@property
|
||||
def time_start(self):
|
||||
return self._timestamp_start
|
||||
|
||||
@SafeSlot(bool)
|
||||
def _update_pause_button_icon(self, paused):
|
||||
self.pause_button.setIcon(
|
||||
material_icon(
|
||||
"play_arrow" if paused else "pause", size=(20, 20), convert_to_pixmap=False
|
||||
)
|
||||
)
|
||||
@property
|
||||
def time_end(self):
|
||||
return self._timestamp_end
|
||||
|
||||
def _string_search_box(self):
|
||||
self._layout.addWidget(QLabel("Search: "))
|
||||
self.layout.addWidget(QLabel("Search: "))
|
||||
self.search_textbox = QLineEdit()
|
||||
self._layout.addWidget(self.search_textbox)
|
||||
self._layout.addWidget(QLabel("Fuzzy: "))
|
||||
self.fuzzy = QCheckBox()
|
||||
self._layout.addWidget(self.fuzzy)
|
||||
self.fuzzy.checkStateChanged.connect(self._emit_fuzzy)
|
||||
self.layout.addWidget(self.search_textbox)
|
||||
self.layout.addWidget(QLabel("Use regex: "))
|
||||
self.regex_enabled = QCheckBox()
|
||||
self.layout.addWidget(self.regex_enabled)
|
||||
self.update_re_button = QPushButton("Update search", self)
|
||||
self.layout.addWidget(self.update_re_button)
|
||||
|
||||
def _log_level_box(self):
|
||||
box = QComboBox()
|
||||
@@ -501,14 +276,6 @@ class LogPanelToolbar(QWidget):
|
||||
[box.addItem(l.name) for l in LogLevel]
|
||||
return box
|
||||
|
||||
@SafeSlot(str)
|
||||
def _emit_level(self, level: str):
|
||||
self.level_changed.emit(LogLevel[level])
|
||||
|
||||
@SafeSlot(Qt.CheckState)
|
||||
def _emit_fuzzy(self, state: Qt.CheckState):
|
||||
self.fuzzy_changed.emit(state == Qt.CheckState.Checked)
|
||||
|
||||
def _current_ts(self, selection_type: Literal["start", "end"]):
|
||||
if selection_type == "start":
|
||||
return self._timestamp_start
|
||||
@@ -517,7 +284,6 @@ class LogPanelToolbar(QWidget):
|
||||
else:
|
||||
raise ValueError(f"timestamps can only be for the start or end, not {selection_type}")
|
||||
|
||||
@SafeSlot()
|
||||
def _open_datetime_dialog(self):
|
||||
"""Open dialog window for timestamp filter selection"""
|
||||
self._dt_dialog = QDialog(self)
|
||||
@@ -546,8 +312,8 @@ class LogPanelToolbar(QWidget):
|
||||
)
|
||||
_layout.addWidget(date_clear_button)
|
||||
|
||||
date_button_set("start", label_start)
|
||||
date_button_set("end", label_end)
|
||||
for v in [("start", label_start), ("end", label_end)]:
|
||||
date_button_set(*v)
|
||||
|
||||
close_button = QPushButton("Close", parent=self._dt_dialog)
|
||||
close_button.clicked.connect(self._dt_dialog.accept)
|
||||
@@ -586,15 +352,19 @@ class LogPanelToolbar(QWidget):
|
||||
self._timestamp_start = dt
|
||||
else:
|
||||
self._timestamp_end = dt
|
||||
self.timestamp_update.emit(TimestampUpdate(value=dt, update_type=selection_type))
|
||||
|
||||
def service_list_update(self, services_info: dict[str, StatusMessage]):
|
||||
@SafeSlot(dict, set)
|
||||
def service_list_update(
|
||||
self, services_info: dict[str, StatusMessage], services_from_history: set[str], *_, **__
|
||||
):
|
||||
"""Change the list of services which can be selected"""
|
||||
self._unique_service_names = set([s.split("/")[0] for s in services_info.keys()])
|
||||
self._unique_service_names |= services_from_history
|
||||
if self._services_selected is None:
|
||||
self._services_selected = self._unique_service_names
|
||||
|
||||
@SafeSlot()
|
||||
def _open_service_filter_dialog(self):
|
||||
self.service_list_update(self.client.service_status)
|
||||
if len(self._unique_service_names) == 0 or self._services_selected is None:
|
||||
return
|
||||
self._svc_dialog = QDialog(self)
|
||||
@@ -602,7 +372,7 @@ class LogPanelToolbar(QWidget):
|
||||
layout = QVBoxLayout()
|
||||
self._svc_dialog.setLayout(layout)
|
||||
|
||||
service_cb_grid = QGridLayout()
|
||||
service_cb_grid = QGridLayout(parent=self._svc_dialog)
|
||||
layout.addLayout(service_cb_grid)
|
||||
|
||||
def check_box(name: str, checked: Qt.CheckState):
|
||||
@@ -628,6 +398,146 @@ class LogPanelToolbar(QWidget):
|
||||
self._svc_dialog.deleteLater()
|
||||
|
||||
|
||||
class LogPanel(TextBox):
|
||||
"""Displays a log panel"""
|
||||
|
||||
ICON_NAME = "terminal"
|
||||
service_list_update = Signal(dict, set)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent=None,
|
||||
client: BECClient | None = None,
|
||||
service_status: BECServiceStatusMixin | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the LogPanel widget."""
|
||||
super().__init__(parent=parent, client=client, config={"text": ""}, **kwargs)
|
||||
self._update_colors()
|
||||
self._service_status = service_status or BECServiceStatusMixin(self, client=self.client) # type: ignore
|
||||
self._log_manager = BecLogsQueue(
|
||||
parent=self, line_formatter=partial(simple_color_format, colors=self._colors)
|
||||
)
|
||||
self._proxy_update = SignalProxy(
|
||||
self._log_manager.new_message, rateLimit=1, slot=self._on_append
|
||||
)
|
||||
|
||||
self.toolbar = LogPanelToolbar(parent=self)
|
||||
self.toolbar_area = QScrollArea()
|
||||
self.toolbar_area.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
|
||||
self.toolbar_area.setSizeAdjustPolicy(QScrollArea.SizeAdjustPolicy.AdjustToContents)
|
||||
self.toolbar_area.setFixedHeight(int(self.toolbar.clear_button.height() * 2))
|
||||
self.toolbar_area.setWidget(self.toolbar)
|
||||
|
||||
self.layout.addWidget(self.toolbar_area)
|
||||
self.toolbar.clear_button.clicked.connect(self._on_clear)
|
||||
self.toolbar.fetch_button.clicked.connect(self._on_fetch)
|
||||
self.toolbar.update_re_button.clicked.connect(self._on_re_update)
|
||||
self.toolbar.search_textbox.returnPressed.connect(self._on_re_update)
|
||||
self.toolbar.regex_enabled.checkStateChanged.connect(self._on_re_update)
|
||||
self.toolbar.filter_level_dropdown.currentTextChanged.connect(self._set_level_filter)
|
||||
|
||||
self.toolbar.timerange_button.clicked.connect(self._choose_datetime)
|
||||
self._service_status.services_update.connect(self._update_service_list)
|
||||
self.service_list_update.connect(self.toolbar.service_list_update)
|
||||
self.toolbar.services_selected.connect(self._update_service_filter)
|
||||
|
||||
self.text_box_text_edit.setFont(QFont("monospace", 12))
|
||||
self.text_box_text_edit.setHtml("")
|
||||
self.text_box_text_edit.setLineWrapMode(QTextEdit.LineWrapMode.NoWrap)
|
||||
|
||||
self._connect_to_theme_change()
|
||||
|
||||
@SafeSlot(set)
|
||||
def _update_service_filter(self, services: set[str]):
|
||||
self._log_manager.update_service_filter(services)
|
||||
self._on_redraw()
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def _update_service_list(self, services_info: dict[str, StatusMessage], *_, **__):
|
||||
self.service_list_update.emit(
|
||||
services_info, self._log_manager.unique_service_names_from_history()
|
||||
)
|
||||
|
||||
@SafeSlot()
|
||||
def _choose_datetime(self):
|
||||
self.toolbar._open_datetime_dialog()
|
||||
self._set_time_filter()
|
||||
|
||||
def _connect_to_theme_change(self):
|
||||
"""Connect to the theme change signal."""
|
||||
qapp = QApplication.instance()
|
||||
if hasattr(qapp, "theme_signal"):
|
||||
qapp.theme_signal.theme_updated.connect(self._on_redraw) # type: ignore
|
||||
|
||||
def _update_colors(self):
|
||||
self._colors = DEFAULT_LOG_COLORS.copy()
|
||||
self._colors.update({LogLevel.INFO: get_theme_palette().text().color().name()})
|
||||
|
||||
def _cursor_to_end(self):
|
||||
c = self.text_box_text_edit.textCursor()
|
||||
c.movePosition(c.MoveOperation.End)
|
||||
self.text_box_text_edit.setTextCursor(c)
|
||||
|
||||
@SafeSlot()
|
||||
@SafeSlot(str)
|
||||
def _on_redraw(self, *_):
|
||||
self._update_colors()
|
||||
self._log_manager.update_line_formatter(partial(simple_color_format, colors=self._colors))
|
||||
self.set_html_text(self._log_manager.display_all())
|
||||
self._cursor_to_end()
|
||||
|
||||
@SafeSlot(verify_sender=True)
|
||||
def _on_append(self, *_):
|
||||
self.text_box_text_edit.insertHtml(self._log_manager.format_new())
|
||||
self._cursor_to_end()
|
||||
|
||||
@SafeSlot()
|
||||
def _on_clear(self):
|
||||
self._log_manager.clear_logs()
|
||||
self.set_html_text(self._log_manager.display_all())
|
||||
self._cursor_to_end()
|
||||
|
||||
@SafeSlot()
|
||||
@SafeSlot(Qt.CheckState)
|
||||
def _on_re_update(self, *_):
|
||||
if self.toolbar.regex_enabled.isChecked():
|
||||
try:
|
||||
search_query = re.compile(self.toolbar.search_textbox.text())
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to compile search regex with error {e}")
|
||||
search_query = None
|
||||
logger.info(f"Setting LogPanel search regex to {search_query}")
|
||||
else:
|
||||
search_query = self.toolbar.search_textbox.text()
|
||||
logger.info(f'Setting LogPanel search string to "{search_query}"')
|
||||
self._log_manager.update_search_filter(search_query)
|
||||
self.set_html_text(self._log_manager.display_all())
|
||||
self._cursor_to_end()
|
||||
|
||||
@SafeSlot()
|
||||
def _on_fetch(self):
|
||||
self._log_manager.fetch_history()
|
||||
self.set_html_text(self._log_manager.display_all())
|
||||
self._cursor_to_end()
|
||||
|
||||
@SafeSlot(str)
|
||||
def _set_level_filter(self, level: str):
|
||||
self._log_manager.update_level_filter(level)
|
||||
self._on_redraw()
|
||||
|
||||
@SafeSlot()
|
||||
def _set_time_filter(self):
|
||||
self._log_manager.update_time_filter(self.toolbar.time_start, self.toolbar.time_end)
|
||||
self._on_redraw()
|
||||
|
||||
def cleanup(self):
|
||||
self._service_status.cleanup()
|
||||
self._log_manager.cleanup()
|
||||
self._log_manager.deleteLater()
|
||||
super().cleanup()
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import sys
|
||||
|
||||
@@ -635,15 +545,7 @@ if __name__ == "__main__": # pragma: no cover
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
apply_theme("dark")
|
||||
panel = QWidget()
|
||||
queue = BecLogsQueue(panel)
|
||||
layout = QVBoxLayout(panel)
|
||||
layout.addWidget(QLabel("All logs, no filters:"))
|
||||
layout.addWidget(LogPanel())
|
||||
layout.addWidget(QLabel("All services, level filter WARNING preapplied:"))
|
||||
layout.addWidget(LogPanel(level_filter=LogLevel.WARNING))
|
||||
layout.addWidget(QLabel('All services, service filter {"DeviceServer"} preapplied:'))
|
||||
layout.addWidget(LogPanel(service_filter={"DeviceServer"}))
|
||||
widget = LogPanel()
|
||||
|
||||
panel.show()
|
||||
widget.show()
|
||||
sys.exit(app.exec())
|
||||
|
||||
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "bec_widgets"
|
||||
version = "3.1.4"
|
||||
version = "3.2.0"
|
||||
description = "BEC Widgets"
|
||||
requires-python = ">=3.11"
|
||||
classifiers = [
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import traceback
|
||||
|
||||
import pytest
|
||||
import qtpy.QtCore
|
||||
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
|
||||
@@ -7,14 +5,12 @@ from qtpy.QtCore import QTimer
|
||||
|
||||
|
||||
class TestableQTimer(QTimer):
|
||||
_instances: list[tuple[QTimer, str, str]] = []
|
||||
_instances: list[tuple[QTimer, str]] = []
|
||||
_current_test_name: str = ""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
tb = traceback.format_stack()
|
||||
init_line = list(filter(lambda msg: "QTimer(" in msg, tb))[-1]
|
||||
TestableQTimer._instances.append((self, TestableQTimer._current_test_name, init_line))
|
||||
TestableQTimer._instances.append((self, TestableQTimer._current_test_name))
|
||||
|
||||
@classmethod
|
||||
def check_all_stopped(cls, qtbot):
|
||||
@@ -24,21 +20,12 @@ class TestableQTimer(QTimer):
|
||||
except RuntimeError as e:
|
||||
return "already deleted" in e.args[0]
|
||||
|
||||
def _format_timers(timers: list[tuple[QTimer, str, str]]):
|
||||
return "\n".join(
|
||||
f"Timer: {t[0]}\n in test: {t[1]}\n created at:{t[2]}" for t in timers
|
||||
)
|
||||
|
||||
try:
|
||||
qtbot.waitUntil(
|
||||
lambda: all(_is_done_or_deleted(timer) for timer, _, _ in cls._instances)
|
||||
)
|
||||
qtbot.waitUntil(lambda: all(_is_done_or_deleted(timer) for timer, _ in cls._instances))
|
||||
except QtBotTimeoutError as exc:
|
||||
active_timers = list(filter(lambda t: t[0].isActive(), cls._instances))
|
||||
(t.stop() for t, _, _ in cls._instances)
|
||||
raise TimeoutError(
|
||||
f"Failed to stop all timers:\n{_format_timers(active_timers)}"
|
||||
) from exc
|
||||
(t.stop() for t, _ in cls._instances)
|
||||
raise TimeoutError(f"Failed to stop all timers: {active_timers}") from exc
|
||||
cls._instances = []
|
||||
|
||||
|
||||
|
||||
@@ -150,7 +150,9 @@ def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
|
||||
# communication should work, main dock area should have same id and be visible
|
||||
|
||||
yw = gui.new("Y")
|
||||
qtbot.waitUntil(lambda: len(gui.windows) == 2, timeout=3000)
|
||||
yw.delete_all()
|
||||
assert len(gui.windows) == 2
|
||||
yw.remove()
|
||||
assert len(gui.windows) == 1 # only bec is left
|
||||
qtbot.waitUntil(lambda: len(gui.windows) == 1, timeout=3000)
|
||||
assert len(gui.windows) == 1
|
||||
|
||||
@@ -72,6 +72,7 @@ def test_rpc_plotting_shortcuts_init_configs(qtbot, connected_client_gui_obj):
|
||||
"dap": None,
|
||||
"device": "bpm4i",
|
||||
"signal": "bpm4i",
|
||||
"dap_parameters": None,
|
||||
"dap_oversample": 1,
|
||||
}
|
||||
assert c1._config_dict["source"] == "device"
|
||||
|
||||
@@ -135,304 +135,321 @@ def maybe_remove_dock_area(qtbot, gui: BECGuiClient, random_int_gen: random.Rand
|
||||
wait_for_namespace_change(
|
||||
qtbot, gui=gui, parent_widget=gui, object_name=name, widget_gui_id=gui_id, exists=False
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_bec_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the BECProgressBar widget."""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.BECProgressBar)
|
||||
widget: client.BECProgressBar
|
||||
|
||||
# Check rpc calls
|
||||
assert widget.label_template == "$value / $maximum - $percentage %"
|
||||
widget.set_maximum(100)
|
||||
widget.set_minimum(50)
|
||||
widget.set_value(75)
|
||||
|
||||
assert widget._get_label() == "75 / 100 - 50 %"
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_bec_queue(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the BECQueue widget."""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.BECQueue)
|
||||
widget: client.BECQueue
|
||||
|
||||
# No rpc calls to test so far
|
||||
# maybe we can add an rpc call to check the queue length
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_bec_status_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the BECStatusBox widget."""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.BECStatusBox)
|
||||
|
||||
# Check rpc calls
|
||||
assert widget.get_server_state() in ["RUNNING", "IDLE", "BUSY", "ERROR"]
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_dap_combo_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the DAPComboBox widget."""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.DapComboBox)
|
||||
widget: client.DAPComboBox
|
||||
|
||||
# Check rpc calls
|
||||
widget.select_fit_model("PseudoVoigtModel")
|
||||
widget.select_x_axis("samx")
|
||||
widget.select_y_axis("bpm4i")
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_device_browser(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the DeviceBrowser widget."""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.DeviceBrowser)
|
||||
widget: client.DeviceBrowser
|
||||
|
||||
# No rpc calls yet to check
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the Image widget."""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.Image)
|
||||
widget: client.Image
|
||||
|
||||
scans = bec.scans
|
||||
dev = bec.device_manager.devices
|
||||
# Test rpc calls
|
||||
img = widget.image(device=dev.eiger.name, signal="preview")
|
||||
assert img.get_data() is None
|
||||
# Run a scan and plot the image
|
||||
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
|
||||
s.wait()
|
||||
|
||||
def _wait_for_scan_in_history():
|
||||
# Get scan item from history
|
||||
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
|
||||
return scan_item is not None
|
||||
|
||||
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
|
||||
|
||||
# Check that last image is equivalent to data in Redis
|
||||
last_img = bec.connector.get_last(MessageEndpoints.device_preview("eiger", "preview"))[
|
||||
"data"
|
||||
].data
|
||||
assert np.allclose(img.get_data(), last_img)
|
||||
|
||||
# Now add a device with a preview signal
|
||||
img = widget.image(device="eiger", signal="preview")
|
||||
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
|
||||
s.wait()
|
||||
|
||||
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_minesweeper(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the MineSweeper widget."""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.Minesweeper)
|
||||
widget: client.MineSweeper
|
||||
|
||||
# No rpc calls to check so far
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_motor_map(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the MotorMap widget."""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.MotorMap)
|
||||
widget: client.MotorMap
|
||||
|
||||
# Test RPC calls
|
||||
dev = bec.device_manager.devices
|
||||
scans = bec.scans
|
||||
# Set motor map to names
|
||||
widget.map(dev.samx, dev.samy)
|
||||
# Move motor samx to pos
|
||||
pos = dev.samx.limits[1] - 1 # -1 from higher limit
|
||||
scans.mv(dev.samx, pos, relative=False).wait()
|
||||
# Check that data is up to date
|
||||
assert np.isclose(widget.get_data()["x"][-1], pos, dev.samx.precision)
|
||||
# Move motor samy to pos
|
||||
pos = dev.samy.limits[0] + 1 # +1 from lower limit
|
||||
scans.mv(dev.samy, pos, relative=False).wait()
|
||||
# Check that data is up to date
|
||||
assert np.isclose(widget.get_data()["y"][-1], pos, dev.samy.precision)
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_multi_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test MultiWaveform widget."""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.MultiWaveform)
|
||||
widget: client.MultiWaveform
|
||||
|
||||
# Test RPC calls
|
||||
dev = bec.device_manager.devices
|
||||
scans = bec.scans
|
||||
# test plotting
|
||||
cm = "cividis"
|
||||
widget.plot(dev.waveform, color_palette=cm)
|
||||
assert widget.monitor == dev.waveform.name
|
||||
assert widget.color_palette == cm
|
||||
|
||||
# Scan with BEC
|
||||
s = scans.line_scan(dev.samx, -3, 3, steps=5, exp_time=0.01, relative=False)
|
||||
s.wait()
|
||||
|
||||
def _wait_for_scan_in_history():
|
||||
# Get scan item from history
|
||||
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
|
||||
return scan_item is not None
|
||||
|
||||
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
|
||||
# Wait for data in history (should be plotted?)
|
||||
|
||||
# TODO how can we check that the data was plotted, implement get_data()
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_positioner_indicator(
|
||||
qtbot, connected_client_gui_obj, random_generator_from_seed
|
||||
):
|
||||
"""Test the PositionIndicator widget."""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.PositionIndicator)
|
||||
widget: client.PositionIndicator
|
||||
|
||||
# TODO check what these rpc calls are supposed to do! Issue created #461
|
||||
widget.set_value(5)
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_positioner_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the PositionerBox widget."""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox)
|
||||
widget: client.PositionerBox
|
||||
|
||||
# Test rpc calls
|
||||
dev = bec.device_manager.devices
|
||||
scans = bec.scans
|
||||
# No rpc calls to check so far
|
||||
widget.set_positioner(dev.samx)
|
||||
widget.set_positioner(dev.samy.name)
|
||||
|
||||
scans.mv(dev.samy, -3, relative=False).wait()
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_positioner_box_2d(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the PositionerBox2D widget."""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox2D)
|
||||
widget: client.PositionerBox2D
|
||||
|
||||
# Test rpc calls
|
||||
dev = bec.device_manager.devices
|
||||
scans = bec.scans
|
||||
# No rpc calls to check so far
|
||||
widget.set_positioner_hor(dev.samx)
|
||||
widget.set_positioner_ver(dev.samy)
|
||||
|
||||
# Try moving the motors
|
||||
scans.mv(dev.samx, 3, relative=False).wait()
|
||||
scans.mv(dev.samy, -3, relative=False).wait()
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_positioner_control_line(
|
||||
qtbot, connected_client_gui_obj, random_generator_from_seed
|
||||
):
|
||||
"""Test the positioner control line widget"""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.PositionerControlLine)
|
||||
widget: client.PositionerControlLine
|
||||
|
||||
# Test rpc calls
|
||||
dev = bec.device_manager.devices
|
||||
scans = bec.scans
|
||||
# Set positioner
|
||||
widget.set_positioner(dev.samx)
|
||||
scans.mv(dev.samx, 3, relative=False).wait()
|
||||
widget.set_positioner(dev.samy.name)
|
||||
scans.mv(dev.samy, -3, relative=False).wait()
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# TODO passes locally, fails on CI for some reason... -> issue #1003
|
||||
qtbot.waitUntil(lambda: hasattr(gui, "dock_area") is False, timeout=5000)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_bec_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the BECProgressBar widget."""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.BECProgressBar)
|
||||
# widget: client.BECProgressBar
|
||||
|
||||
# # Check rpc calls
|
||||
# assert widget.label_template == "$value / $maximum - $percentage %"
|
||||
# widget.set_maximum(100)
|
||||
# widget.set_minimum(50)
|
||||
# widget.set_value(75)
|
||||
|
||||
# assert widget._get_label() == "75 / 100 - 50 %"
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_bec_queue(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the BECQueue widget."""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.BECQueue)
|
||||
# widget: client.BECQueue
|
||||
|
||||
# # No rpc calls to test so far
|
||||
# # maybe we can add an rpc call to check the queue length
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_bec_status_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the BECStatusBox widget."""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.BECStatusBox)
|
||||
|
||||
# # Check rpc calls
|
||||
# assert widget.get_server_state() in ["RUNNING", "IDLE", "BUSY", "ERROR"]
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_dap_combo_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the DAPComboBox widget."""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.DapComboBox)
|
||||
# widget: client.DAPComboBox
|
||||
|
||||
# # Check rpc calls
|
||||
# widget.select_fit_model("PseudoVoigtModel")
|
||||
# widget.select_x_axis("samx")
|
||||
# widget.select_y_axis("bpm4i")
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_device_browser(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the DeviceBrowser widget."""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.DeviceBrowser)
|
||||
# widget: client.DeviceBrowser
|
||||
|
||||
# # No rpc calls yet to check
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the Image widget."""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.Image)
|
||||
# widget: client.Image
|
||||
|
||||
# scans = bec.scans
|
||||
# dev = bec.device_manager.devices
|
||||
# # Test rpc calls
|
||||
# img = widget.image(device=dev.eiger.name, signal="preview")
|
||||
# assert img.get_data() is None
|
||||
# # Run a scan and plot the image
|
||||
# s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
|
||||
# s.wait()
|
||||
|
||||
# def _wait_for_scan_in_history():
|
||||
# # Get scan item from history
|
||||
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
|
||||
# return scan_item is not None
|
||||
|
||||
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
|
||||
|
||||
# # Check that last image is equivalent to data in Redis
|
||||
# last_img = bec.connector.get_last(MessageEndpoints.device_preview("eiger", "preview"))[
|
||||
# "data"
|
||||
# ].data
|
||||
# assert np.allclose(img.get_data(), last_img)
|
||||
|
||||
# # Now add a device with a preview signal
|
||||
# img = widget.image(device="eiger", signal="preview")
|
||||
# s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
|
||||
# s.wait()
|
||||
|
||||
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# # TODO re-enable when issue is resolved #560
|
||||
# # @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# # def test_widgets_e2e_log_panel(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# # """Test the LogPanel widget."""
|
||||
# # gui = connected_client_gui_obj
|
||||
# # bec = gui._client
|
||||
# # # Create dock_area and widget
|
||||
# # widget = create_widget(qtbot, gui, gui.available_widgets.LogPanel)
|
||||
# # widget: client.LogPanel
|
||||
|
||||
# # # No rpc calls to check so far
|
||||
|
||||
# # # Test removing the widget, or leaving it open for the next test
|
||||
# # maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_minesweeper(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the MineSweeper widget."""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.Minesweeper)
|
||||
# widget: client.MineSweeper
|
||||
|
||||
# # No rpc calls to check so far
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_motor_map(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the MotorMap widget."""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.MotorMap)
|
||||
# widget: client.MotorMap
|
||||
|
||||
# # Test RPC calls
|
||||
# dev = bec.device_manager.devices
|
||||
# scans = bec.scans
|
||||
# # Set motor map to names
|
||||
# widget.map(dev.samx, dev.samy)
|
||||
# # Move motor samx to pos
|
||||
# pos = dev.samx.limits[1] - 1 # -1 from higher limit
|
||||
# scans.mv(dev.samx, pos, relative=False).wait()
|
||||
# # Check that data is up to date
|
||||
# assert np.isclose(widget.get_data()["x"][-1], pos, dev.samx.precision)
|
||||
# # Move motor samy to pos
|
||||
# pos = dev.samy.limits[0] + 1 # +1 from lower limit
|
||||
# scans.mv(dev.samy, pos, relative=False).wait()
|
||||
# # Check that data is up to date
|
||||
# assert np.isclose(widget.get_data()["y"][-1], pos, dev.samy.precision)
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_multi_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test MultiWaveform widget."""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.MultiWaveform)
|
||||
# widget: client.MultiWaveform
|
||||
|
||||
# # Test RPC calls
|
||||
# dev = bec.device_manager.devices
|
||||
# scans = bec.scans
|
||||
# # test plotting
|
||||
# cm = "cividis"
|
||||
# widget.plot(dev.waveform, color_palette=cm)
|
||||
# assert widget.monitor == dev.waveform.name
|
||||
# assert widget.color_palette == cm
|
||||
|
||||
# # Scan with BEC
|
||||
# s = scans.line_scan(dev.samx, -3, 3, steps=5, exp_time=0.01, relative=False)
|
||||
# s.wait()
|
||||
|
||||
# def _wait_for_scan_in_history():
|
||||
# # Get scan item from history
|
||||
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
|
||||
# return scan_item is not None
|
||||
|
||||
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
|
||||
# # Wait for data in history (should be plotted?)
|
||||
|
||||
# # TODO how can we check that the data was plotted, implement get_data()
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_positioner_indicator(
|
||||
# qtbot, connected_client_gui_obj, random_generator_from_seed
|
||||
# ):
|
||||
# """Test the PositionIndicator widget."""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionIndicator)
|
||||
# widget: client.PositionIndicator
|
||||
|
||||
# # TODO check what these rpc calls are supposed to do! Issue created #461
|
||||
# widget.set_value(5)
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_positioner_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the PositionerBox widget."""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox)
|
||||
# widget: client.PositionerBox
|
||||
|
||||
# # Test rpc calls
|
||||
# dev = bec.device_manager.devices
|
||||
# scans = bec.scans
|
||||
# # No rpc calls to check so far
|
||||
# widget.set_positioner(dev.samx)
|
||||
# widget.set_positioner(dev.samy.name)
|
||||
|
||||
# scans.mv(dev.samy, -3, relative=False).wait()
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_positioner_box_2d(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the PositionerBox2D widget."""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox2D)
|
||||
# widget: client.PositionerBox2D
|
||||
|
||||
# # Test rpc calls
|
||||
# dev = bec.device_manager.devices
|
||||
# scans = bec.scans
|
||||
# # No rpc calls to check so far
|
||||
# widget.set_positioner_hor(dev.samx)
|
||||
# widget.set_positioner_ver(dev.samy)
|
||||
|
||||
# # Try moving the motors
|
||||
# scans.mv(dev.samx, 3, relative=False).wait()
|
||||
# scans.mv(dev.samy, -3, relative=False).wait()
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_positioner_control_line(
|
||||
# qtbot, connected_client_gui_obj, random_generator_from_seed
|
||||
# ):
|
||||
# """Test the positioner control line widget"""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.PositionerControlLine)
|
||||
# widget: client.PositionerControlLine
|
||||
|
||||
# # Test rpc calls
|
||||
# dev = bec.device_manager.devices
|
||||
# scans = bec.scans
|
||||
# # Set positioner
|
||||
# widget.set_positioner(dev.samx)
|
||||
# scans.mv(dev.samx, 3, relative=False).wait()
|
||||
# widget.set_positioner(dev.samy.name)
|
||||
# scans.mv(dev.samy, -3, relative=False).wait()
|
||||
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.repeat(20)
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_ring_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the RingProgressBar widget"""
|
||||
@@ -461,86 +478,86 @@ def test_widgets_e2e_ring_progress_bar(qtbot, connected_client_gui_obj, random_g
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_scan_control(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the ScanControl widget"""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.ScanControl)
|
||||
widget: client.ScanControl
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_scan_control(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the ScanControl widget"""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.ScanControl)
|
||||
# widget: client.ScanControl
|
||||
|
||||
# No rpc calls to check so far
|
||||
# # No rpc calls to check so far
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_scatter_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the ScatterWaveform widget"""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.ScatterWaveform)
|
||||
widget: client.ScatterWaveform
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_scatter_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the ScatterWaveform widget"""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.ScatterWaveform)
|
||||
# widget: client.ScatterWaveform
|
||||
|
||||
# Test rpc calls
|
||||
dev = bec.device_manager.devices
|
||||
scans = bec.scans
|
||||
widget.plot(dev.samx, dev.samy, dev.bpm4i)
|
||||
scans.grid_scan(dev.samx, -5, 5, 5, dev.samy, -5, 5, 5, exp_time=0.01, relative=False).wait()
|
||||
# # Test rpc calls
|
||||
# dev = bec.device_manager.devices
|
||||
# scans = bec.scans
|
||||
# widget.plot(dev.samx, dev.samy, dev.bpm4i)
|
||||
# scans.grid_scan(dev.samx, -5, 5, 5, dev.samy, -5, 5, 5, exp_time=0.01, relative=False).wait()
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_text_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the TextBox widget"""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.TextBox)
|
||||
widget: client.TextBox
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_text_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the TextBox widget"""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.TextBox)
|
||||
# widget: client.TextBox
|
||||
|
||||
# RPC calls
|
||||
widget.set_plain_text("Hello World")
|
||||
widget.set_html_text("<b> Hello World HTML </b>")
|
||||
# # RPC calls
|
||||
# widget.set_plain_text("Hello World")
|
||||
# widget.set_html_text("<b> Hello World HTML </b>")
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
|
||||
@pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
def test_widgets_e2e_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
"""Test the Waveform widget"""
|
||||
gui = connected_client_gui_obj
|
||||
bec = gui._client
|
||||
# Create dock_area and widget
|
||||
widget = create_widget(qtbot, gui, gui.available_widgets.Waveform)
|
||||
widget: client.Waveform
|
||||
# @pytest.mark.timeout(PYTEST_TIMEOUT)
|
||||
# def test_widgets_e2e_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
|
||||
# """Test the Waveform widget"""
|
||||
# gui = connected_client_gui_obj
|
||||
# bec = gui._client
|
||||
# # Create dock_area and widget
|
||||
# widget = create_widget(qtbot, gui, gui.available_widgets.Waveform)
|
||||
# widget: client.Waveform
|
||||
|
||||
# Test rpc calls
|
||||
dev = bec.device_manager.devices
|
||||
scans = bec.scans
|
||||
widget.plot(dev.bpm4i)
|
||||
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
|
||||
s.wait()
|
||||
# # Test rpc calls
|
||||
# dev = bec.device_manager.devices
|
||||
# scans = bec.scans
|
||||
# widget.plot(dev.bpm4i)
|
||||
# s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
|
||||
# s.wait()
|
||||
|
||||
def _wait_for_scan_in_history():
|
||||
# Get scan item from history
|
||||
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
|
||||
return scan_item is not None
|
||||
# def _wait_for_scan_in_history():
|
||||
# # Get scan item from history
|
||||
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
|
||||
# return scan_item is not None
|
||||
|
||||
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
|
||||
# qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
|
||||
|
||||
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
|
||||
samx_data = scan_item.devices.samx.samx.read()["value"]
|
||||
bpm4i_data = scan_item.devices.bpm4i.bpm4i.read()["value"]
|
||||
curve = widget.curves[0]
|
||||
assert np.allclose(curve.get_data()[0], samx_data)
|
||||
assert np.allclose(curve.get_data()[1], bpm4i_data)
|
||||
# scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
|
||||
# samx_data = scan_item.devices.samx.samx.read()["value"]
|
||||
# bpm4i_data = scan_item.devices.bpm4i.bpm4i.read()["value"]
|
||||
# curve = widget.curves[0]
|
||||
# assert np.allclose(curve.get_data()[0], samx_data)
|
||||
# assert np.allclose(curve.get_data()[1], bpm4i_data)
|
||||
|
||||
# Test removing the widget, or leaving it open for the next test
|
||||
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
# # Test removing the widget, or leaving it open for the next test
|
||||
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
|
||||
|
||||
@@ -2229,6 +2229,7 @@ class TestFlatToolbarActions:
|
||||
"flat_progress_bar",
|
||||
"flat_terminal",
|
||||
"flat_bec_shell",
|
||||
"flat_log_panel",
|
||||
"flat_sbb_monitor",
|
||||
]
|
||||
|
||||
@@ -2288,6 +2289,11 @@ class TestFlatToolbarActions:
|
||||
action.trigger()
|
||||
mock_new.assert_called_once_with(widget_type)
|
||||
|
||||
def test_flat_log_panel_action_disabled(self, advanced_dock_area):
|
||||
"""Test that flat log panel action is disabled."""
|
||||
action = advanced_dock_area.toolbar.components.get_action("flat_log_panel").action
|
||||
assert not action.isEnabled()
|
||||
|
||||
|
||||
class TestModeTransitions:
|
||||
"""Test mode transitions and state consistency."""
|
||||
|
||||
@@ -7,123 +7,163 @@ from collections import deque
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from bec_lib.logger import LogLevel
|
||||
from bec_lib.messages import LogMessage
|
||||
from bec_lib.redis_connector import StreamMessage
|
||||
from qtpy.QtCore import QDateTime
|
||||
|
||||
from bec_widgets.widgets.utility.logpanel.logpanel import LogPanel, TimestampUpdate
|
||||
from bec_widgets.widgets.utility.logpanel._util import (
|
||||
log_time,
|
||||
replace_escapes,
|
||||
simple_color_format,
|
||||
)
|
||||
from bec_widgets.widgets.utility.logpanel.logpanel import DEFAULT_LOG_COLORS, LogPanel
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
|
||||
TEST_TABLE_STRING = "2025-01-15 15:57:18 | bec_server.scan_server.scan_queue | [INFO] | \n \x1b[3m primary queue / ScanQueueStatus.RUNNING \x1b[0m\n┏━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━┓\n┃\x1b[1m \x1b[0m\x1b[1m queue_id \x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mscan_id\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mis_scan\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mtype\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mscan_numb…\x1b[0m\x1b[1m \x1b[0m┃\x1b[1m \x1b[0m\x1b[1mIQ status\x1b[0m\x1b[1m \x1b[0m┃\n┡━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━┩\n│ bbe50c82-6… │ None │ False │ mv │ None │ PENDING │\n└─────────────┴─────────┴─────────┴──────┴────────────┴───────────┘\n\n"
|
||||
|
||||
TEST_LOG_MESSAGES = [
|
||||
{"data": msg}
|
||||
for msg in [
|
||||
LogMessage(
|
||||
LogMessage(
|
||||
metadata={},
|
||||
log_type="debug",
|
||||
log_msg={
|
||||
"text": "datetime | debug | test log message",
|
||||
"record": {"time": {"timestamp": 123456789.000}},
|
||||
"service_name": "ScanServer",
|
||||
},
|
||||
),
|
||||
LogMessage(
|
||||
metadata={},
|
||||
log_type="info",
|
||||
log_msg={
|
||||
"text": "datetime | info | test log message",
|
||||
"record": {"time": {"timestamp": 123456789.007}},
|
||||
"service_name": "ScanServer",
|
||||
},
|
||||
),
|
||||
LogMessage(
|
||||
metadata={},
|
||||
log_type="success",
|
||||
log_msg={
|
||||
"text": "datetime | success | test log message",
|
||||
"record": {"time": {"timestamp": 123456789.012}},
|
||||
"service_name": "ScanServer",
|
||||
},
|
||||
),
|
||||
]
|
||||
|
||||
TEST_COMBINED_PLAINTEXT = "datetime | debug | test log message\ndatetime | info | test log message\ndatetime | success | test log message\n"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def raw_queue():
|
||||
yield deque(TEST_LOG_MESSAGES, maxlen=100)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def log_panel(qtbot, mocked_client: MagicMock):
|
||||
widget = LogPanel(client=mocked_client, service_status=MagicMock())
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
yield widget
|
||||
|
||||
|
||||
def test_log_panel_init(log_panel: LogPanel):
|
||||
assert log_panel.plain_text == ""
|
||||
|
||||
|
||||
def test_table_string_processing():
|
||||
assert "\x1b" in TEST_TABLE_STRING
|
||||
sanitized = replace_escapes(TEST_TABLE_STRING)
|
||||
assert "\x1b" not in sanitized
|
||||
assert " " not in sanitized
|
||||
assert "\n" not in sanitized
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
["msg", "color"], zip(TEST_LOG_MESSAGES, ["#0000CC", "#FFFFFF", "#00FF00"])
|
||||
)
|
||||
def test_color_format(msg: LogMessage, color: str):
|
||||
assert color in simple_color_format(msg, DEFAULT_LOG_COLORS)
|
||||
|
||||
|
||||
def test_logpanel_output(qtbot, log_panel: LogPanel):
|
||||
log_panel._log_manager._data = deque(TEST_LOG_MESSAGES)
|
||||
log_panel._on_redraw()
|
||||
assert log_panel.plain_text == TEST_COMBINED_PLAINTEXT
|
||||
|
||||
def display_queue_empty():
|
||||
print(log_panel._log_manager._display_queue)
|
||||
return len(log_panel._log_manager._display_queue) == 0
|
||||
|
||||
next_text = "datetime | error | test log message"
|
||||
msg = LogMessage(
|
||||
metadata={},
|
||||
log_type="error",
|
||||
log_msg={"text": next_text, "record": {}, "service_name": "ScanServer"},
|
||||
)
|
||||
log_panel._log_manager._process_incoming_log_msg(
|
||||
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
|
||||
)
|
||||
|
||||
qtbot.waitUntil(display_queue_empty, timeout=5000)
|
||||
assert log_panel.plain_text == TEST_COMBINED_PLAINTEXT + next_text + "\n"
|
||||
|
||||
|
||||
def test_level_filter(log_panel: LogPanel):
|
||||
log_panel._log_manager._data = deque(TEST_LOG_MESSAGES)
|
||||
log_panel._log_manager.update_level_filter("INFO")
|
||||
log_panel._on_redraw()
|
||||
assert (
|
||||
log_panel.plain_text
|
||||
== "datetime | info | test log message\ndatetime | success | test log message\n"
|
||||
)
|
||||
|
||||
|
||||
def test_clear_button(log_panel: LogPanel):
|
||||
log_panel._log_manager._data = deque(TEST_LOG_MESSAGES)
|
||||
log_panel.toolbar.clear_button.click()
|
||||
assert log_panel._log_manager._data == deque([])
|
||||
|
||||
|
||||
def test_timestamp_filter(log_panel: LogPanel):
|
||||
log_panel._log_manager._timestamp_start = QDateTime(1973, 11, 29, 21, 33, 9, 5, 1)
|
||||
pytest.approx(log_panel._log_manager._timestamp_start.toMSecsSinceEpoch() / 1000, 123456789.005)
|
||||
log_panel._log_manager._timestamp_end = QDateTime(1973, 11, 29, 21, 33, 9, 10, 1)
|
||||
pytest.approx(log_panel._log_manager._timestamp_end.toMSecsSinceEpoch() / 1000, 123456789.010)
|
||||
filter_ = log_panel._log_manager._create_timestamp_filter()
|
||||
assert not filter_(TEST_LOG_MESSAGES[0])
|
||||
assert filter_(TEST_LOG_MESSAGES[1])
|
||||
assert not filter_(TEST_LOG_MESSAGES[2])
|
||||
|
||||
|
||||
def test_error_handling_in_callback(log_panel: LogPanel):
|
||||
log_panel._log_manager.new_message = MagicMock()
|
||||
|
||||
with patch("bec_widgets.widgets.utility.logpanel.logpanel.logger") as logger:
|
||||
# generally errors should be logged
|
||||
log_panel._log_manager.new_message.emit = MagicMock(
|
||||
side_effect=ValueError("Something went wrong")
|
||||
)
|
||||
msg = LogMessage(
|
||||
metadata={},
|
||||
log_type="debug",
|
||||
log_msg={
|
||||
"text": "datetime | debug | test log message",
|
||||
"record": {
|
||||
"time": {"timestamp": 123456789.000, "repr": "2025-01-01 00:00:01"},
|
||||
"message": "test debug message abcd",
|
||||
"function": "_debug",
|
||||
},
|
||||
"service_name": "ScanServer",
|
||||
},
|
||||
),
|
||||
LogMessage(
|
||||
metadata={},
|
||||
log_type="info",
|
||||
log_msg={
|
||||
"text": "datetime | info | test info log message",
|
||||
"record": {
|
||||
"time": {"timestamp": 123456789.007, "repr": "2025-01-01 00:00:02"},
|
||||
"message": "test info message efgh",
|
||||
"function": "_info",
|
||||
},
|
||||
"service_name": "DeviceServer",
|
||||
},
|
||||
),
|
||||
LogMessage(
|
||||
metadata={},
|
||||
log_type="success",
|
||||
log_msg={
|
||||
"text": "datetime | success | test log message",
|
||||
"record": {
|
||||
"time": {"timestamp": 123456789.012, "repr": "2025-01-01 00:00:03"},
|
||||
"message": "test success message ijkl",
|
||||
"function": "_success",
|
||||
},
|
||||
"service_name": "ScanServer",
|
||||
},
|
||||
),
|
||||
]
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def log_panel(qtbot, mocked_client):
|
||||
mocked_client.connector.xread = lambda *_, **__: TEST_LOG_MESSAGES
|
||||
widget = LogPanel()
|
||||
qtbot.addWidget(widget)
|
||||
qtbot.waitExposed(widget)
|
||||
yield widget
|
||||
widget._model.log_queue.cleanup()
|
||||
widget.close()
|
||||
widget.deleteLater()
|
||||
qtbot.wait(100)
|
||||
|
||||
|
||||
def test_log_panel_init(qtbot, log_panel: LogPanel):
|
||||
assert log_panel
|
||||
|
||||
|
||||
def test_log_panel_filters(qtbot, log_panel: LogPanel):
|
||||
assert log_panel._proxy.rowCount() == 3
|
||||
# Service filter
|
||||
log_panel._update_service_filter({"DeviceServer"})
|
||||
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 1, timeout=200)
|
||||
log_panel._update_service_filter(set())
|
||||
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 3, timeout=200)
|
||||
# Text filter
|
||||
log_panel._proxy.update_filter_text("efgh")
|
||||
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 1, timeout=200)
|
||||
log_panel._proxy.update_filter_text("")
|
||||
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 3, timeout=200)
|
||||
# Time filter
|
||||
log_panel._proxy.update_timestamp(
|
||||
TimestampUpdate(value=QDateTime.fromMSecsSinceEpoch(123456789004), update_type="start")
|
||||
)
|
||||
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 2, timeout=200)
|
||||
log_panel._proxy.update_timestamp(
|
||||
TimestampUpdate(value=QDateTime.fromMSecsSinceEpoch(123456789009), update_type="end")
|
||||
)
|
||||
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 1, timeout=200)
|
||||
log_panel._proxy.update_timestamp(TimestampUpdate(value=None, update_type="start"))
|
||||
log_panel._proxy.update_timestamp(TimestampUpdate(value=None, update_type="end"))
|
||||
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 3, timeout=200)
|
||||
# Level filter
|
||||
log_panel._proxy.update_level_filter(LogLevel.SUCCESS)
|
||||
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 1, timeout=200)
|
||||
log_panel._proxy.update_level_filter(None)
|
||||
qtbot.waitUntil(lambda: log_panel._proxy.rowCount() == 3, timeout=200)
|
||||
|
||||
|
||||
def test_log_panel_update(qtbot, log_panel: LogPanel):
|
||||
log_panel._model.log_queue._incoming.append(
|
||||
LogMessage(
|
||||
metadata={},
|
||||
log_type="error",
|
||||
log_msg={
|
||||
"text": "datetime | error | test log message",
|
||||
"record": {
|
||||
"time": {"timestamp": 123456789.015, "repr": "2025-01-01 00:00:03"},
|
||||
"message": "test error message xyz",
|
||||
"function": "_error",
|
||||
},
|
||||
"record": {"time": {"timestamp": 123456789.000}},
|
||||
"service_name": "ScanServer",
|
||||
},
|
||||
)
|
||||
)
|
||||
log_panel._model.log_queue._proc_update()
|
||||
qtbot.waitUntil(lambda: log_panel._model.rowCount() == 4, timeout=500)
|
||||
log_panel._log_manager._process_incoming_log_msg(
|
||||
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
|
||||
)
|
||||
logger.warning.assert_called_once()
|
||||
|
||||
# this specific error should be ignored and not relogged
|
||||
log_panel._log_manager.new_message.emit = MagicMock(
|
||||
side_effect=RuntimeError("Internal C++ object (BecLogsQueue) already deleted.")
|
||||
)
|
||||
log_panel._log_manager._process_incoming_log_msg(
|
||||
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
|
||||
)
|
||||
logger.warning.assert_called_once()
|
||||
|
||||
@@ -516,6 +516,112 @@ def test_plot_custom_curve_with_inline_dap(qtbot, mocked_client_with_dap):
|
||||
assert dap_curve.config.signal.dap == "GaussianModel"
|
||||
|
||||
|
||||
def test_normalize_dap_parameters_number_dict():
|
||||
normalized = Waveform._normalize_dap_parameters({"amplitude": 1.0, "center": 2})
|
||||
assert normalized == {
|
||||
"amplitude": {"name": "amplitude", "value": 1.0, "vary": False},
|
||||
"center": {"name": "center", "value": 2.0, "vary": False},
|
||||
}
|
||||
|
||||
|
||||
def test_normalize_dap_parameters_dict_spec_defaults_vary_false():
|
||||
normalized = Waveform._normalize_dap_parameters({"sigma": {"value": 0.8, "min": 0.0}})
|
||||
assert normalized["sigma"]["name"] == "sigma"
|
||||
assert normalized["sigma"]["value"] == 0.8
|
||||
assert normalized["sigma"]["min"] == 0.0
|
||||
assert normalized["sigma"]["vary"] is False
|
||||
|
||||
|
||||
def test_normalize_dap_parameters_invalid_type_raises():
|
||||
with pytest.raises(TypeError):
|
||||
Waveform._normalize_dap_parameters(["amplitude", 1.0]) # type: ignore[arg-type]
|
||||
|
||||
|
||||
def test_normalize_dap_parameters_composite_list():
|
||||
normalized = Waveform._normalize_dap_parameters(
|
||||
[{"center": 1.0}, {"sigma": {"value": 0.5, "min": 0.0}}],
|
||||
dap_name=["GaussianModel", "GaussianModel"],
|
||||
)
|
||||
assert normalized == [
|
||||
{"center": {"name": "center", "value": 1.0, "vary": False}},
|
||||
{"sigma": {"name": "sigma", "value": 0.5, "min": 0.0, "vary": False}},
|
||||
]
|
||||
|
||||
|
||||
def test_normalize_dap_parameters_composite_dict():
|
||||
normalized = Waveform._normalize_dap_parameters(
|
||||
{
|
||||
"GaussianModel": {"center": {"value": 1.0, "vary": True}},
|
||||
"LorentzModel": {"amplitude": 2.0},
|
||||
},
|
||||
dap_name=["GaussianModel", "LorentzModel"],
|
||||
)
|
||||
assert normalized["GaussianModel"]["center"]["value"] == 1.0
|
||||
assert normalized["GaussianModel"]["center"]["vary"] is True
|
||||
assert normalized["LorentzModel"]["amplitude"]["value"] == 2.0
|
||||
assert normalized["LorentzModel"]["amplitude"]["vary"] is False
|
||||
|
||||
|
||||
def test_request_dap_includes_normalized_parameters(qtbot, mocked_client_with_dap, monkeypatch):
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
|
||||
curve = wf.plot(
|
||||
x=[0, 1, 2],
|
||||
y=[1, 2, 3],
|
||||
label="custom-inline-params",
|
||||
dap="GaussianModel",
|
||||
dap_parameters={"amplitude": 1.0},
|
||||
)
|
||||
dap_curve = wf.get_curve(f"{curve.name()}-GaussianModel")
|
||||
assert dap_curve is not None
|
||||
dap_curve.dap_oversample = 3
|
||||
|
||||
captured = {}
|
||||
|
||||
def capture(topic, msg, *args, **kwargs): # noqa: ARG001
|
||||
captured["topic"] = topic
|
||||
captured["msg"] = msg
|
||||
|
||||
monkeypatch.setattr(wf.client.connector, "set_and_publish", capture)
|
||||
wf.request_dap()
|
||||
|
||||
msg = captured["msg"]
|
||||
dap_kwargs = msg.content["config"]["kwargs"]
|
||||
assert dap_kwargs["oversample"] == 3
|
||||
assert dap_kwargs["parameters"] == {
|
||||
"amplitude": {"name": "amplitude", "value": 1.0, "vary": False}
|
||||
}
|
||||
|
||||
|
||||
def test_request_dap_includes_composite_parameters_list(qtbot, mocked_client_with_dap, monkeypatch):
|
||||
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
|
||||
curve = wf.plot(
|
||||
x=[0, 1, 2],
|
||||
y=[1, 2, 3],
|
||||
label="custom-composite",
|
||||
dap=["GaussianModel", "GaussianModel"],
|
||||
dap_parameters=[{"center": 0.0}, {"center": 1.0}],
|
||||
)
|
||||
dap_curve = wf.get_curve(f"{curve.name()}-GaussianModel+GaussianModel")
|
||||
assert dap_curve is not None
|
||||
|
||||
captured = {}
|
||||
|
||||
def capture(topic, msg, *args, **kwargs): # noqa: ARG001
|
||||
captured["topic"] = topic
|
||||
captured["msg"] = msg
|
||||
|
||||
monkeypatch.setattr(wf.client.connector, "set_and_publish", capture)
|
||||
wf.request_dap()
|
||||
|
||||
msg = captured["msg"]
|
||||
dap_kwargs = msg.content["config"]["kwargs"]
|
||||
assert dap_kwargs["parameters"] == [
|
||||
{"center": {"name": "center", "value": 0.0, "vary": False}},
|
||||
{"center": {"name": "center", "value": 1.0, "vary": False}},
|
||||
]
|
||||
assert msg.content["config"]["class_kwargs"]["model"] == ["GaussianModel", "GaussianModel"]
|
||||
|
||||
|
||||
def test_fetch_scan_data_and_access(qtbot, mocked_client, monkeypatch):
|
||||
"""
|
||||
Test the _fetch_scan_data_and_access method returns live_data/val if in a live scan,
|
||||
|
||||
Reference in New Issue
Block a user