0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

fix(various): repo cleanup, removed - [plot_app, one_plot, scan_plot, scan2d_plot, crosshair_example, qtplugins], tests adjusted

This commit is contained in:
wyzula-jan
2024-03-10 19:01:39 +01:00
parent 279ac03dc3
commit f3b3c2f526
24 changed files with 13 additions and 2596 deletions

View File

@ -103,7 +103,7 @@ class {class_name}(RPCBase):"""
file.write(formatted_content)
if __name__ == "__main__":
if __name__ == "__main__": # pragma: no cover
import os
# Assuming ClientGenerator is defined in this script or imported correctly

View File

@ -100,7 +100,7 @@ class BECWidgetsCLIServer:
return obj
if __name__ == "__main__":
if __name__ == "__main__": # pragma: no cover
import argparse
parser = argparse.ArgumentParser(description="BEC Widgets CLI Server")

View File

@ -1,168 +0,0 @@
import numpy as np
import pyqtgraph as pg
from qtpy.QtWidgets import (
QApplication,
QVBoxLayout,
QLabel,
QWidget,
QHBoxLayout,
QTableWidget,
QTableWidgetItem,
QSpinBox,
)
from pyqtgraph import mkPen
from pyqtgraph.Qt import QtCore
from bec_widgets.utils import Crosshair
class ExampleApp(QWidget):
def __init__(self):
super().__init__()
# Layout
self.layout = QHBoxLayout()
self.setLayout(self.layout)
##########################
# 1D Plot
##########################
# PlotWidget
self.plot_widget_1d = pg.PlotWidget(title="1D PlotWidget with multiple curves")
self.plot_item_1d = self.plot_widget_1d.getPlotItem()
self.plot_item_1d.setLogMode(True, True)
# 1D Datasets
self.x_data = np.linspace(0, 10, 1000)
def gauss(x, mu, sigma):
return (1 / (sigma * np.sqrt(2 * np.pi))) * np.exp(-0.5 * ((x - mu) / sigma) ** 2)
# same convention as in line_plot.py
self.y_value_list = [
gauss(self.x_data, 1, 1),
gauss(self.x_data, 1.5, 3),
abs(np.sin(self.x_data)),
abs(np.cos(self.x_data)),
abs(np.sin(2 * self.x_data)),
] # List of y-values for multiple curves
self.curve_names = ["Gauss(1,1)", "Gauss(1.5,3)", "Abs(Sine)", "Abs(Cosine)", "Abs(Sine2x)"]
self.curves = []
##########################
# 2D Plot
##########################
self.plot_widget_2d = pg.PlotWidget(title="2D plot with crosshair and ROI square")
self.data_2D = np.random.random((100, 200))
self.plot_item_2d = self.plot_widget_2d.getPlotItem()
self.image_item = pg.ImageItem(self.data_2D)
self.plot_item_2d.addItem(self.image_item)
##########################
# Table
##########################
self.table = QTableWidget(len(self.curve_names), 2)
self.table.setHorizontalHeaderLabels(["(X, Y) - Moved", "(X, Y) - Clicked"])
self.table.setVerticalHeaderLabels(self.curve_names)
self.table.resizeColumnsToContents()
##########################
# Spinbox for N curves
##########################
self.spin_box = QSpinBox()
self.spin_box.setMinimum(0)
self.spin_box.setMaximum(len(self.y_value_list))
self.spin_box.setValue(2)
self.spin_box.valueChanged.connect(lambda: self.update_curves(self.spin_box.value()))
##########################
# Adding widgets to layout
##########################
##### left side #####
self.column1 = QVBoxLayout()
self.layout.addLayout(self.column1)
# SpinBox
self.spin_row = QHBoxLayout()
self.column1.addLayout(self.spin_row)
self.spin_row.addWidget(QLabel("Number of curves:"))
self.spin_row.addWidget(self.spin_box)
# label
self.clicked_label_1d = QLabel("Clicked Coordinates (1D):")
self.column1.addWidget(self.clicked_label_1d)
# table
self.column1.addWidget(self.table)
# 1D plot
self.column1.addWidget(self.plot_widget_1d)
##### left side #####
self.column2 = QVBoxLayout()
self.layout.addLayout(self.column2)
# labels
self.clicked_label_2d = QLabel("Clicked Coordinates (2D):")
self.moved_label_2d = QLabel("Moved Coordinates (2D):")
self.column2.addWidget(self.clicked_label_2d)
self.column2.addWidget(self.moved_label_2d)
# 2D plot
self.column2.addWidget(self.plot_widget_2d)
self.update_curves(2) # just Gaussian curves
def hook_crosshair(self):
self.crosshair_1d = Crosshair(self.plot_item_1d, precision=10)
self.crosshair_1d.coordinatesChanged1D.connect(
lambda x, y: self.update_table(self.table, x, y, column=0)
)
self.crosshair_1d.coordinatesClicked1D.connect(
lambda x, y: self.update_table(self.table, x, y, column=1)
)
# 2D
self.crosshair_2d = Crosshair(self.plot_item_2d)
self.crosshair_2d.coordinatesChanged2D.connect(
lambda x, y: self.moved_label_2d.setText(f"Mouse Moved Coordinates (2D): x={x}, y={y}")
)
self.crosshair_2d.coordinatesClicked2D.connect(
lambda x, y: self.clicked_label_2d.setText(f"Clicked Coordinates (2D): x={x}, y={y}")
)
def update_table(self, table_widget, x, y_values, column):
"""Update the table with the new coordinates"""
for i, y in enumerate(y_values):
table_widget.setItem(i, column, QTableWidgetItem(f"({x}, {y})"))
table_widget.resizeColumnsToContents()
def update_curves(self, num_curves):
"""Update the number of curves"""
self.plot_item_1d.clear()
# Curves
color_list = ["#384c6b", "#e28a2b", "#5E3023", "#e41a1c", "#984e83", "#4daf4a"]
self.plot_item_1d.addLegend()
self.curves = []
y_value_list = self.y_value_list[:num_curves]
for ii, y_value in enumerate(y_value_list):
pen = mkPen(color=color_list[ii], width=2, style=QtCore.Qt.DashLine)
curve = pg.PlotDataItem(
self.x_data, y_value, pen=pen, skipFiniteCheck=True, name=self.curve_names[ii]
)
self.plot_item_1d.addItem(curve)
self.curves.append(curve)
self.hook_crosshair()
if __name__ == "__main__":
app = QApplication([])
window = ExampleApp()
window.show()
app.exec()

View File

@ -1,3 +0,0 @@
x_value: "samx"
y_values: ["gauss_bpm", "gauss_adc1", "gauss_adc2"]
dap_worker: "gaussian_fit_worker_3"

View File

@ -1,3 +0,0 @@
x_value: "samx"
y_values: ["gauss_bpm", "gauss_adc1", "gauss_adc2"]
dap_worker: None

View File

@ -1,271 +0,0 @@
import os
import numpy as np
import qtpy.QtWidgets
import pyqtgraph as pg
from bec_lib import MessageEndpoints
from qtpy.QtCore import Signal as pyqtSignal, Slot as pyqtSlot
from qtpy.QtWidgets import QApplication, QTableWidgetItem, QWidget
from pyqtgraph import mkBrush, mkColor, mkPen
from pyqtgraph.Qt import QtCore, uic
from bec_widgets.utils import Crosshair, ctrl_c
from bec_widgets.utils.bec_dispatcher import BECDispatcher
# TODO implement:
# - implement scanID database for visualizing previous scans
# - multiple signals for different monitors
# - change how dap is handled in bec_dispatcher to handle more workers
class PlotApp(QWidget):
"""
Main class for the PlotApp used to plot two signals from the BEC.
Attributes:
update_signal (pyqtSignal): Signal to trigger plot updates.
update_dap_signal (pyqtSignal): Signal to trigger DAP updates.
Args:
x_value (str): The x device/signal for plotting.
y_values (list of str): List of y device/signals for plotting.
dap_worker (str, optional): DAP process to specify. Set to None to disable.
parent (QWidget, optional): Parent widget.
"""
update_signal = pyqtSignal()
update_dap_signal = pyqtSignal()
def __init__(self, x_value, y_values, dap_worker=None, parent=None):
super(PlotApp, self).__init__(parent)
current_path = os.path.dirname(__file__)
uic.loadUi(os.path.join(current_path, "oneplot.ui"), self)
self.x_value = x_value
self.y_values = y_values
self.dap_worker = dap_worker
self.scanID = None
self.data_x = []
self.data_y = []
self.dap_x = np.array([])
self.dap_y = np.array([])
self.fit = None
self.init_ui()
self.init_curves()
self.hook_crosshair()
self.proxy_update_plot = pg.SignalProxy(
self.update_signal, rateLimit=25, slot=self.update_plot
)
self.proxy_update_fit = pg.SignalProxy(
self.update_dap_signal, rateLimit=25, slot=self.update_fit_table
)
def init_ui(self) -> None:
"""Initialize the UI components."""
self.plot = pg.PlotItem()
self.glw.addItem(self.plot)
self.plot.setLabel("bottom", self.x_value)
self.plot.setLabel("left", ", ".join(self.y_values))
self.plot.addLegend()
def init_curves(self) -> None:
"""Initialize curve data and properties."""
self.plot.clear()
self.curves_data = []
self.curves_dap = []
colors_y_values = PlotApp.golden_angle_color(colormap="CET-R2", num=len(self.y_values))
# colors_y_daps = PlotApp.golden_angle_color(
# colormap="CET-I2", num=len(self.dap_worker)
# ) # TODO adapt for multiple dap_workers
# Initialize curves for y_values
for ii, (signal, color) in enumerate(zip(self.y_values, colors_y_values)):
pen_curve = mkPen(color=color, width=2, style=QtCore.Qt.DashLine)
brush_curve = mkBrush(color=color)
curve_data = pg.PlotDataItem(
symbolSize=5,
symbolBrush=brush_curve,
pen=pen_curve,
skipFiniteCheck=True,
name=f"{signal}",
)
self.curves_data.append(curve_data)
self.plot.addItem(curve_data)
# Initialize curves for DAP if dap_worker is not None
if self.dap_worker is not None:
# for ii, (monitor, color) in enumerate(zip(self.dap_worker, colors_y_daps)):#TODO adapt for multiple dap_workers
pen_dap = mkPen(color="#3b5998", width=2, style=QtCore.Qt.DashLine)
curve_dap = pg.PlotDataItem(
pen=pen_dap, skipFiniteCheck=True, symbolSize=5, name=f"{self.dap_worker}"
)
self.curves_dap.append(curve_dap)
self.plot.addItem(curve_dap)
self.tableWidget_crosshair.setRowCount(len(self.y_values))
self.tableWidget_crosshair.setVerticalHeaderLabels(self.y_values)
self.hook_crosshair()
def hook_crosshair(self) -> None:
"""Attach the crosshair to the plot."""
self.crosshair_1d = Crosshair(self.plot, precision=3)
self.crosshair_1d.coordinatesChanged1D.connect(
lambda x, y: self.update_table(self.tableWidget_crosshair, x, y, column=0)
)
self.crosshair_1d.coordinatesClicked1D.connect(
lambda x, y: self.update_table(self.tableWidget_crosshair, x, y, column=1)
)
def update_table(
self, table_widget: qtpy.QtWidgets.QTableWidget, x: float, y_values: list, column: int
) -> None:
for i, y in enumerate(y_values):
table_widget.setItem(i, column, QTableWidgetItem(f"({x}, {y})"))
table_widget.resizeColumnsToContents()
def update_plot(self) -> None:
"""Update the plot data."""
for ii, curve in enumerate(self.curves_data):
curve.setData(self.data_x, self.data_y[ii])
if self.dap_worker is not None:
# for ii, curve in enumerate(self.curves_dap): #TODO adapt for multiple dap_workers
# curve.setData(self.dap_x, self.dap_y[ii])
self.curves_dap[0].setData(self.dap_x, self.dap_y)
def update_fit_table(self):
"""Update the table for fit data."""
self.tableWidget_fit.setData(self.fit)
@pyqtSlot(dict, dict)
def on_dap_update(self, msg: dict, metadata: dict) -> None:
"""
Update DAP related data.
Args:
msg (dict): Message received with data.
metadata (dict): Metadata of the DAP.
"""
# TODO adapt for multiple dap_workers
self.dap_x = msg[self.dap_worker]["x"]
self.dap_y = msg[self.dap_worker]["y"]
self.fit = metadata["fit_parameters"]
self.update_dap_signal.emit()
@pyqtSlot(dict, dict)
def on_scan_segment(self, msg: dict, metadata: dict):
"""
Handle new scan segments.
Args:
msg (dict): Message received with scan data.
metadata (dict): Metadata of the scan.
"""
current_scanID = msg["scanID"]
if current_scanID != self.scanID:
self.scanID = current_scanID
self.data_x = []
self.data_y = [[] for _ in self.y_values]
self.init_curves()
dev_x = self.x_value
data_x = msg["data"][dev_x][dev[dev_x]._hints[0]]["value"]
self.data_x.append(data_x)
for ii, dev_y in enumerate(self.y_values):
data_y = msg["data"][dev_y][dev[dev_y]._hints[0]]["value"]
self.data_y[ii].append(data_y)
self.update_signal.emit()
@staticmethod
def golden_ratio(num: int) -> list:
"""Calculate the golden ratio for a given number of angles.
Args:
num (int): Number of angles
"""
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2)
angles = []
for ii in range(num):
x = np.cos(ii * phi)
y = np.sin(ii * phi)
angle = np.arctan2(y, x)
angles.append(angle)
return angles
@staticmethod
def golden_angle_color(colormap: str, num: int) -> list:
"""
Extract num colors for from the specified colormap following golden angle distribution.
Args:
colormap (str): Name of the colormap
num (int): Number of requested colors
Returns:
list: List of colors with length <num>
Raises:
ValueError: If the number of requested colors is greater than the number of colors in the colormap.
"""
cmap = pg.colormap.get(colormap)
cmap_colors = cmap.color
if num > len(cmap_colors):
raise ValueError(
f"Number of colors requested ({num}) is greater than the number of colors in the colormap ({len(cmap_colors)})"
)
angles = PlotApp.golden_ratio(len(cmap_colors))
color_selection = np.round(np.interp(angles, (-np.pi, np.pi), (0, len(cmap_colors))))
colors = [
mkColor(tuple((cmap_colors[int(ii)] * 255).astype(int))) for ii in color_selection[:num]
]
return colors
if __name__ == "__main__":
import yaml
with open("config_noworker.yaml", "r") as file:
config = yaml.safe_load(file)
x_value = config["x_value"]
y_values = config["y_values"]
dap_worker = config["dap_worker"]
dap_worker = None if dap_worker == "None" else dap_worker
# BECclient global variables
bec_dispatcher = BECDispatcher()
client = bec_dispatcher.client
client.start()
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
app = QApplication([])
plotApp = PlotApp(x_value=x_value, y_values=y_values, dap_worker=dap_worker)
# Connecting signals from bec_dispatcher
bec_dispatcher.connect_slot(plotApp.on_dap_update, MessageEndpoints.processed_data(dap_worker))
bec_dispatcher.connect_slot(plotApp.on_scan_segment, MessageEndpoints.scan_segment())
ctrl_c.setup(app)
window = plotApp
window.show()
app.exec()

View File

@ -1,75 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>547</width>
<height>653</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout" stretch="1,2">
<item>
<layout class="QHBoxLayout" name="horizontalLayout" stretch="2,1">
<item>
<widget class="QGroupBox" name="groupBox">
<property name="title">
<string>Cursor</string>
</property>
<layout class="QGridLayout" name="gridLayout">
<item row="0" column="0">
<widget class="QTableWidget" name="tableWidget_crosshair">
<column>
<property name="text">
<string>Moved</string>
</property>
</column>
<column>
<property name="text">
<string>Clicked</string>
</property>
</column>
</widget>
</item>
</layout>
</widget>
</item>
<item>
<widget class="QGroupBox" name="groupBox_2">
<property name="title">
<string>Fit</string>
</property>
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="0">
<widget class="TableWidget" name="tableWidget_fit"/>
</item>
</layout>
</widget>
</item>
</layout>
</item>
<item>
<widget class="GraphicsLayoutWidget" name="glw"/>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>GraphicsLayoutWidget</class>
<extends>QGraphicsView</extends>
<header>pyqtgraph.h</header>
</customwidget>
<customwidget>
<class>TableWidget</class>
<extends>QTableWidget</extends>
<header>pyqtgraph.h</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@ -1,130 +0,0 @@
plot_settings:
background_color: "black"
num_columns: 2
colormap: "plasma"
scan_types: False # True to show scan types
# example to use without scan_type -> only one general configuration
plot_data:
- plot_name: "BPM4i plots vs samy"
x:
label: 'Motor Y'
signals:
- name: "samy"
# entry: "samy" # here I also forgot to specify entry
y:
label: 'bpm4i'
signals:
- name: "bpm4i"
entry: "bpm4i"
# I will not specify entry, because I want to take hint from gauss_adc2
- plot_name: "BPM4i plots vs samx"
x:
label: 'Motor Y'
signals:
- name: "samy"
# entry: "samy" # here I also forgot to specify entry
y:
label: 'bpm4i'
signals:
- name: "bpm4i"
entry: "bpm4i"
# I will not specify entry, because I want to take hint from gauss_adc2
- plot_name: "MCS Channel 4 (Cyberstar) vs samx"
x:
label: 'Motor X'
signals:
- name: "samx"
entry: "samx"
y:
label: 'mcs4 cyberstar'
signals:
- name: "mcs"
entry: "mca4"
- plot_name: "MCS Channel 4 (Cyberstar) vs samy"
x:
label: 'Motor X'
signals:
- name: "samy"
entry: "samy"
y:
label: 'mcs4 cyberstar'
signals:
- name: "mcs"
entry: "mca4"
# example to use with scan_type -> different configuration for different scan types
#plot_data:
# line_scan:
# - plot_name: "BPM plot"
# x:
# label: 'Motor X'
# signals:
# - name: "samx"
# # entry: "samx"
# y:
# label: 'BPM'
# signals:
# - name: "gauss_bpm"
# entry: "gauss_bpm"
# - name: "gauss_adc1"
# entry: "gauss_adc1"
# - name: "gauss_adc2"
# entry: "gauss_adc2"
#
# - plot_name: "Multi"
# x:
# label: 'Motor X'
# signals:
# - name: "samx"
# entry: "samx"
# y:
# label: 'Multi'
# signals:
# - name: "gauss_bpm"
# entry: "gauss_bpm"
# - name: "samx"
# entry: ["samx", "samx_setpoint"]
#
# grid_scan:
# - plot_name: "Grid plot 1"
# x:
# label: 'Motor X'
# signals:
# - name: "samx"
# entry: "samx"
# y:
# label: 'BPM'
# signals:
# - name: "gauss_bpm"
# entry: "gauss_bpm"
# - name: "gauss_adc1"
# entry: "gauss_adc1"
# - plot_name: "Grid plot 2"
# x:
# label: 'Motor X'
# signals:
# - name: "samx"
# entry: "samx"
# y:
# label: 'BPM'
# signals:
# - name: "gauss_bpm"
# entry: "gauss_bpm"
# - name: "gauss_adc1"
# entry: "gauss_adc1"
#
# - plot_name: "Grid plot 3"
# x:
# label: 'Motor Y'
# signals:
# - name: "samy"
# entry: "samy"
# y:
# label: 'BPM'
# signals:
# - name: "gauss_bpm"
# entry: "gauss_bpm"

View File

@ -1,91 +0,0 @@
plot_settings:
background_color: "black"
num_columns: 2
colormap: "plasma"
scan_types: True # True to show scan types
# example to use with scan_type -> different configuration for different scan types
plot_data:
line_scan:
- plot_name: "BPM plot"
x:
label: 'Motor X'
signals:
- name: "samx"
# entry: "samx"
y:
label: 'BPM'
signals:
- name: "gauss_bpm"
entry: "gauss_bpm"
- name: "gauss_adc1"
entry: "gauss_adc1"
- name: "gauss_adc2"
entry: "gauss_adc2"
- plot_name: "Multi"
x:
label: 'Motor X'
signals:
- name: "samx"
entry: "samx"
y:
label: 'Multi'
signals:
- name: "gauss_bpm"
entry: "gauss_bpm"
- name: "samx"
entry: ["samx", "samx_setpoint"]
grid_scan:
- plot_name: "Grid plot 1"
x:
label: 'Motor X'
signals:
- name: "samx"
entry: "samx"
y:
label: 'BPM'
signals:
- name: "gauss_bpm"
entry: "gauss_bpm"
- name: "gauss_adc1"
entry: "gauss_adc1"
- plot_name: "Grid plot 2"
x:
label: 'Motor X'
signals:
- name: "samx"
entry: "samx"
y:
label: 'BPM'
signals:
- name: "gauss_bpm"
entry: "gauss_bpm"
- name: "gauss_adc1"
entry: "gauss_adc1"
- plot_name: "Grid plot 3"
x:
label: 'Motor Y'
signals:
- name: "samy"
entry: "samy"
y:
label: 'BPM'
signals:
- name: "gauss_bpm"
entry: "gauss_bpm"
- plot_name: "Grid plot 4"
x:
label: 'Motor Y'
signals:
- name: "samy"
entry: "samy"
y:
label: 'BPM'
signals:
- name: "gauss_adc3"
entry: "gauss_adc3"

View File

@ -1,730 +0,0 @@
import logging
import os
# import traceback
import pyqtgraph
import pyqtgraph as pg
from qtpy.QtCore import Signal as pyqtSignal, Slot as pyqtSlot
from qtpy.QtWidgets import (
QApplication,
QWidget,
QTableWidgetItem,
QTableWidget,
QFileDialog,
QMessageBox,
)
from pyqtgraph import ColorButton
from pyqtgraph import mkBrush, mkPen
from pyqtgraph.Qt import QtCore, uic
from pyqtgraph.Qt import QtWidgets
from bec_lib import MessageEndpoints
from bec_widgets.utils import Crosshair, Colors
from bec_widgets.utils.bec_dispatcher import BECDispatcher
# TODO implement:
# - implement scanID database for visualizing previous scans
class PlotApp(QWidget):
"""
Main class for PlotApp, designed to plot multiple signals in a grid layout
based on a flexible YAML configuration.
Attributes:
update_signal (pyqtSignal): Signal to trigger plot updates.
plot_data (list of dict): List of dictionaries containing plot configurations.
Each dictionary specifies x and y signals, including their
name and entry, for a particular plot.
Args:
config (dict): Configuration dictionary containing all settings for the plotting app.
It should include the following keys:
- 'plot_settings': Dictionary containing global plot settings.
- 'plot_data': List of dictionaries specifying the signals to plot.
parent (QWidget, optional): Parent widget.
Example:
General Plot Configuration:
{
'plot_settings': {'background_color': 'black', 'num_columns': 2, 'colormap': 'plasma', 'scan_types': False},
'plot_data': [
{
'plot_name': 'Plot A',
'x': {'label': 'X-axis', 'signals': [{'name': 'device_x', 'entry': 'entry_x'}]},
'y': {'label': 'Y-axis', 'signals': [{'name': 'device_y', 'entry': 'entry_y'}]}
}
]
}
Different Scans Mode Configuration:
{
'plot_settings': {'background_color': 'black', 'num_columns': 2, 'colormap': 'plasma', 'scan_types': True},
'plot_data': {
'scan_type_1': [
{
'plot_name': 'Plot 1',
'x': {'label': 'X-axis', 'signals': [{'name': 'device_x1', 'entry': 'entry_x1'}]},
'y': {'label': 'Y-axis', 'signals': [{'name': 'device_y1', 'entry': 'entry_y1'}]}
}
],
'scan_type_2': [
{
'plot_name': 'Plot 2',
'x': {'label': 'X-axis', 'signals': [{'name': 'device_x2', 'entry': 'entry_x2'}]},
'y': {'label': 'Y-axis', 'signals': [{'name': 'device_y2', 'entry': 'entry_y2'}]}
}
]
}
}
"""
update_signal = pyqtSignal()
update_dap_signal = pyqtSignal()
def __init__(self, config: dict, client=None, parent=None):
super(PlotApp, self).__init__(parent)
# Error handler
self.error_handler = ErrorHandler(parent=self)
# Client and device manager from BEC
self.client = BECDispatcher().client if client is None else client
self.dev = self.client.device_manager.devices
# Loading UI
current_path = os.path.dirname(__file__)
uic.loadUi(os.path.join(current_path, "plot_app.ui"), self)
self.data = {}
self.crosshairs = None
self.plots = None
self.curves_data = None
self.grid_coordinates = None
self.scanID = None
self.user_colors = {} # key: (plot_name, y_name, y_entry), value: color
# Default config
self.config = config
# Validate the configuration before proceeding
self.load_config(self.config)
# Default splitter size
self.splitter.setSizes([400, 100])
# Buttons
self.pushButton_save.clicked.connect(self.save_settings_to_yaml)
self.pushButton_load.clicked.connect(self.load_settings_from_yaml)
# Connect the update signal to the update plot method
self.proxy_update_plot = pg.SignalProxy(
self.update_signal, rateLimit=25, slot=self.update_plot
)
# Change layout of plots when the number of columns is changed in GUI
self.spinBox_N_columns.valueChanged.connect(lambda x: self.init_ui(x))
def load_config(self, config: dict) -> None:
"""
Load and validate the configuration, retrying until a valid configuration is provided or the user cancels.
Args:
config (dict): Configuration dictionary form .yaml file.
Returns:
None
"""
valid_config = False
self.error_handler.set_retry_action(self.load_settings_from_yaml)
while not valid_config:
if config is None:
self.config = (
self.load_settings_from_yaml()
) # Load config if it hasn't been loaded yet
try: # Validate loaded config file
self.error_handler.validate_config_file(config)
valid_config = True
except ValueError as e:
self.config = None # Reset config_to_test to force reloading configuration
self.config = self.error_handler.handle_error(str(e))
if valid_config is True: # Initialize config if validation succeeds
self.init_config(self.config)
def init_config(self, config: dict) -> None:
"""
Initializes or update the configuration settings for the PlotApp.
Args:
config (dict): Dictionary containing plot settings and data configurations.
"""
# YAML config
self.plot_settings = config.get("plot_settings", {})
self.plot_data_config = config.get("plot_data", {})
self.scan_types = self.plot_settings.get("scan_types", False)
if self.scan_types is False: # Device tracking mode
self.plot_data = self.plot_data_config # TODO logic has to be improved
else: # setup first line scan as default, then changed with different scan type
self.plot_data = self.plot_data_config[list(self.plot_data_config.keys())[0]]
# Setting global plot settings
self.init_plot_background(self.plot_settings["background_color"])
# Initialize the UI
self.init_ui(self.plot_settings["num_columns"])
self.spinBox_N_columns.setValue(
self.plot_settings["num_columns"]
) # TODO has to be checked if it will not setup more columns than plots
self.spinBox_N_columns.setMaximum(len(self.plot_data))
def init_plot_background(self, background_color: str) -> None:
"""
Initialize plot settings based on the background color.
Args:
background_color (str): The background color ('white' or 'black').
This method sets the background and foreground colors for pyqtgraph.
If the background is dark ('black'), the foreground will be set to 'white',
and vice versa.
"""
if background_color.lower() == "black":
pg.setConfigOption("background", "k")
pg.setConfigOption("foreground", "w")
elif background_color.lower() == "white":
pg.setConfigOption("background", "w")
pg.setConfigOption("foreground", "k")
else:
raise ValueError(
f"Invalid background color {background_color}. Allowed values are 'white' or 'black'."
)
# TODO simplify -> find way how to setup also foreground color
# if background_color.lower() not in ["black", "white"]:
# raise ValueError(
# f"Invalid background color {background_color}. Allowed values are 'white' or 'black'."
# )
# self.glw.setBackground(background_color.lower())
def init_ui(self, num_columns: int = 3) -> None:
"""
Initialize the UI components, create plots and store their grid positions.
Args:
num_columns (int): Number of columns to wrap the layout.
This method initializes a dictionary `self.plots` to store the plot objects
along with their corresponding x and y signal names. It dynamically arranges
the plots in a grid layout based on the given number of columns and dynamically
stretches the last plots to fit the remaining space.
"""
self.glw.clear()
self.plots = {}
self.grid_coordinates = []
num_plots = len(self.plot_data)
# Check if num_columns exceeds the number of plots
if num_columns >= num_plots:
num_columns = num_plots
self.plot_settings["num_columns"] = num_columns # Update the settings
print(
f"Warning: num_columns in the YAML file was greater than the number of plots. Resetting num_columns to number of plots:{num_columns}."
)
else:
self.plot_settings["num_columns"] = num_columns # Update the settings
num_rows = num_plots // num_columns
last_row_cols = num_plots % num_columns
remaining_space = num_columns - last_row_cols
for i, plot_config in enumerate(self.plot_data):
row, col = i // num_columns, i % num_columns
colspan = 1
if row == num_rows and remaining_space > 0:
if last_row_cols == 1:
colspan = num_columns
else:
colspan = remaining_space // last_row_cols + 1
remaining_space -= colspan - 1
last_row_cols -= 1
plot_name = plot_config.get("plot_name", "")
x_label = plot_config["x"].get("label", "")
y_label = plot_config["y"].get("label", "")
plot = self.glw.addPlot(row=row, col=col, colspan=colspan, title=plot_name)
plot.setLabel("bottom", x_label)
plot.setLabel("left", y_label)
plot.addLegend()
self.plots[plot_name] = plot
self.grid_coordinates.append((row, col))
self.init_curves()
def init_curves(self) -> None:
"""
Initialize curve data and properties, and update table row labels.
This method initializes a nested dictionary `self.curves_data` to store
the curve objects for each x and y signal pair. It also updates the row labels
in `self.tableWidget_crosshair` to include the grid position for each y-value.
"""
self.curves_data = {}
row_labels = []
for idx, plot_config in enumerate(self.plot_data):
plot_name = plot_config.get("plot_name", "")
plot = self.plots[plot_name]
plot.clear()
y_configs = plot_config["y"]["signals"]
colors_ys = Colors.golden_angle_color(
colormap=self.plot_settings["colormap"], num=len(y_configs)
)
curve_list = []
for i, (y_config, color) in enumerate(zip(y_configs, colors_ys)):
# print(y_config)
y_name = y_config["name"]
y_entries = y_config.get("entry", [y_name])
if not isinstance(y_entries, list):
y_entries = [y_entries]
for y_entry in y_entries:
user_color = self.user_colors.get((plot_name, y_name, y_entry), None)
color_to_use = user_color if user_color else color
pen_curve = mkPen(color=color_to_use, width=2, style=QtCore.Qt.DashLine)
brush_curve = mkBrush(color=color_to_use)
curve_data = pg.PlotDataItem(
symbolSize=5,
symbolBrush=brush_curve,
pen=pen_curve,
skipFiniteCheck=True,
name=f"{y_name} ({y_entry})",
)
curve_list.append((y_name, y_entry, curve_data))
plot.addItem(curve_data)
row_labels.append(f"{y_name} ({y_entry}) - {plot_name}")
# Create a ColorButton and set its color
color_btn = ColorButton()
color_btn.setColor(color_to_use)
color_btn.sigColorChanged.connect(
lambda btn=color_btn, plot=plot_name, yname=y_name, yentry=y_entry, curve=curve_data: self.change_curve_color(
btn, plot, yname, yentry, curve
)
)
# Add the ColorButton as a QWidget to the table
color_widget = QtWidgets.QWidget()
layout = QtWidgets.QHBoxLayout()
layout.addWidget(color_btn)
layout.setContentsMargins(0, 0, 0, 0)
color_widget.setLayout(layout)
row = len(row_labels) - 1 # The row index in the table
self.tableWidget_crosshair.setCellWidget(row, 2, color_widget)
self.curves_data[plot_name] = curve_list
self.tableWidget_crosshair.setRowCount(len(row_labels))
self.tableWidget_crosshair.setVerticalHeaderLabels(row_labels)
self.hook_crosshair()
def change_curve_color(
self,
btn: pyqtgraph.ColorButton,
plot_name: str,
y_name: str,
y_entry: str,
curve: pyqtgraph.PlotDataItem,
) -> None:
"""
Change the color of a curve and update the corresponding ColorButton.
Args:
btn (ColorButton): The ColorButton that was clicked.
plot_name (str): The name of the plot where the curve belongs.
y_name (str): The name of the y signal.
y_entry (str): The entry of the y signal.
curve (PlotDataItem): The curve to be changed.
"""
color = btn.color()
pen_curve = mkPen(color=color, width=2, style=QtCore.Qt.DashLine)
brush_curve = mkBrush(color=color)
curve.setPen(pen_curve)
curve.setSymbolBrush(brush_curve)
self.user_colors[(plot_name, y_name, y_entry)] = color
def hook_crosshair(self) -> None:
"""Attach crosshairs to each plot and connect them to the update_table method."""
self.crosshairs = {}
for plot_name, plot in self.plots.items():
crosshair = Crosshair(plot, precision=3)
crosshair.coordinatesChanged1D.connect(
lambda x, y, plot=plot: self.update_table(
self.tableWidget_crosshair, x, y, column=0, plot=plot
)
)
crosshair.coordinatesClicked1D.connect(
lambda x, y, plot=plot: self.update_table(
self.tableWidget_crosshair, x, y, column=1, plot=plot
)
)
self.crosshairs[plot_name] = crosshair
def update_table(
self, table_widget: QTableWidget, x: float, y_values: list, column: int, plot: pg.PlotItem
) -> None:
"""
Update the table with coordinates based on cursor movements and clicks.
Args:
table_widget (QTableWidget): The table to be updated.
x (float): The x-coordinate from the plot.
y_values (list): The y-coordinates from the plot.
column (int): The column in the table to be updated.
plot (PlotItem): The plot from which the coordinates are taken.
This method calculates the correct row in the table for each y-value
and updates the cell at (row, column) with the new x and y coordinates.
"""
plot_name = [name for name, value in self.plots.items() if value == plot][0]
starting_row = 0
for plot_config in self.plot_data:
if plot_config.get("plot_name", "") == plot_name:
break
for y_config in plot_config.get("y", {}).get("signals", []):
y_entries = y_config.get("entry", [y_config.get("name", "")])
if not isinstance(y_entries, list):
y_entries = [y_entries]
starting_row += len(y_entries)
for i, y in enumerate(y_values):
row = starting_row + i
table_widget.setItem(row, column, QTableWidgetItem(f"({x}, {y})"))
table_widget.resizeColumnsToContents()
def update_plot(self) -> None:
"""Update the plot data based on the stored data dictionary."""
for plot_name, curve_list in self.curves_data.items():
for y_name, y_entry, curve in curve_list:
x_config = next(
(pc["x"] for pc in self.plot_data if pc.get("plot_name") == plot_name), {}
)
x_signal_config = x_config["signals"][0]
x_name = x_signal_config.get("name", "")
x_entry = x_signal_config.get("entry", x_name)
key = (x_name, x_entry, y_name, y_entry)
data_x = self.data.get(key, {}).get("x", [])
data_y = self.data.get(key, {}).get("y", [])
curve.setData(data_x, data_y)
@pyqtSlot(dict, dict)
def on_scan_segment(
self, msg, metadata
) -> None: # TODO the logic should be separated from GUI operation
"""
Handle new scan segments and saves data to a dictionary. Linked through bec_dispatcher.
Args:
msg (dict): Message received with scan data.
metadata (dict): Metadata of the scan.
"""
current_scanID = msg.get("scanID", None)
if current_scanID is None:
return
if current_scanID != self.scanID:
if self.scan_types is False:
self.plot_data = self.plot_data_config
elif self.scan_types is True:
currentName = metadata.get("scan_name")
if currentName is None:
raise ValueError(
f"Scan name not found in metadata. Please check the scan_name in the YAML config or in bec "
f"configuration."
)
self.plot_data = self.plot_data_config.get(currentName, [])
if self.plot_data == []:
raise ValueError(
f"Scan name {currentName} not found in the YAML config. Please check the scan_name in the "
f"YAML config or in bec configuration."
)
# Init UI
self.init_ui(self.plot_settings["num_columns"])
self.spinBox_N_columns.setValue(
self.plot_settings["num_columns"]
) # TODO has to be checked if it will not setup more columns than plots
self.spinBox_N_columns.setMaximum(len(self.plot_data))
self.scanID = current_scanID
self.data = {}
self.init_curves()
for plot_config in self.plot_data:
plot_name = plot_config.get("plot_name", "Unnamed Plot")
x_config = plot_config["x"]
x_signal_config = x_config["signals"][0] # Assuming there's at least one signal for x
x_name = x_signal_config.get("name", "")
if not x_name:
raise ValueError(f"Name for x signal must be specified in plot: {plot_name}.")
x_entry_list = x_signal_config.get("entry", [])
if not x_entry_list:
x_entry_list = (
self.dev[x_name]._hints if hasattr(self.dev[x_name], "_hints") else [x_name]
)
if not isinstance(x_entry_list, list):
x_entry_list = [x_entry_list]
y_configs = plot_config["y"]["signals"]
for x_entry in x_entry_list:
for y_config in y_configs:
y_name = y_config.get("name", "")
if not y_name:
raise ValueError(
f"Name for y signal must be specified in plot: {plot_name}."
)
y_entry_list = y_config.get("entry", [])
if not y_entry_list:
y_entry_list = (
self.dev[y_name]._hints
if hasattr(self.dev[y_name], "_hints")
else [y_name]
)
if not isinstance(y_entry_list, list):
y_entry_list = [y_entry_list]
for y_entry in y_entry_list:
key = (x_name, x_entry, y_name, y_entry)
data_x = msg["data"].get(x_name, {}).get(x_entry, {}).get("value", None)
data_y = msg["data"].get(y_name, {}).get(y_entry, {}).get("value", None)
if data_x is None:
raise ValueError(
f"Incorrect entry '{x_entry}' specified for x in plot: {plot_name}, x name: {x_name}"
)
if data_y is None:
if hasattr(self.dev[y_name], "_hints"):
raise ValueError(
f"Incorrect entry '{y_entry}' specified for y in plot: {plot_name}, y name: {y_name}"
)
else:
raise ValueError(
f"No hints available for y in plot: {plot_name}, and name '{y_name}' did not work as entry"
)
if data_x is not None:
self.data.setdefault(key, {}).setdefault("x", []).append(data_x)
if data_y is not None:
self.data.setdefault(key, {}).setdefault("y", []).append(data_y)
self.update_signal.emit()
def save_settings_to_yaml(self):
"""Save the current settings to a .yaml file using a file dialog."""
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
file_path, _ = QFileDialog.getSaveFileName(
self, "Save Settings", "", "YAML Files (*.yaml);;All Files (*)", options=options
)
if file_path:
try:
if not file_path.endswith(".yaml"):
file_path += ".yaml"
with open(file_path, "w") as file:
yaml.dump(
{"plot_settings": self.plot_settings, "plot_data": self.plot_data_config},
file,
)
print(f"Settings saved to {file_path}")
except Exception as e:
print(f"An error occurred while saving the settings to {file_path}: {e}")
def load_settings_from_yaml(self) -> dict: # TODO can be replace by the utils function
"""Load settings from a .yaml file using a file dialog and update the current settings."""
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
file_path, _ = QFileDialog.getOpenFileName(
self, "Load Settings", "", "YAML Files (*.yaml);;All Files (*)", options=options
)
if file_path:
try:
with open(file_path, "r") as file:
self.config = yaml.safe_load(file)
self.load_config(self.config) # validate new config
return config
except FileNotFoundError:
print(f"The file {file_path} was not found.")
except Exception as e:
print(f"An error occurred while loading the settings from {file_path}: {e}")
return None # Return None on exception to indicate failure
class ErrorHandler:
def __init__(self, parent=None):
self.parent = parent
self.errors = []
self.retry_action = None
logging.basicConfig(level=logging.ERROR) # Configure logging
def set_retry_action(self, action):
self.retry_action = action # Store a reference to the retry action
def handle_error(self, error_message: str):
# logging.error(f"{error_message}\n{traceback.format_exc()}") #TODO decide if useful
choice = QMessageBox.critical(
self.parent,
"Error",
f"{error_message}\n\nWould you like to retry?",
QMessageBox.Retry | QMessageBox.Cancel,
)
if choice == QMessageBox.Retry and self.retry_action is not None:
return self.retry_action()
else:
exit(1) # Exit the program if the user selects Cancel or if no retry_action is provided
def validate_config_file(self, config: dict) -> None:
"""
Validate the configuration dictionary.
Args:
config (dict): Configuration dictionary form .yaml file.
Returns:
None
"""
self.errors = []
# Validate common keys
required_top_level_keys = ["plot_settings", "plot_data"]
for key in required_top_level_keys:
if key not in config:
self.errors.append(f"Missing required key: {key}")
# Only continue if no errors so far
if not self.errors:
# Determine the configuration mode (device or scan)
plot_settings = config.get("plot_settings", {})
scan_types = plot_settings.get("scan_types", False)
plot_data = config.get("plot_data", [])
if scan_types:
# Validate scan mode configuration
for scan_type, plots in plot_data.items():
for i, plot_config in enumerate(plots):
self.validate_plot_config(plot_config, i)
else:
# Validate device mode configuration
for i, plot_config in enumerate(plot_data):
self.validate_plot_config(plot_config, i)
if self.errors != []:
self.handle_error("\n".join(self.errors))
def validate_plot_config(self, plot_config: dict, i: int):
"""
Validate individual plot configuration.
Args:
plot_config (dict): Individual plot configuration.
i (int): Index of the plot configuration.
Returns:
None
"""
for axis in ["x", "y"]:
axis_config = plot_config.get(axis)
plot_name = plot_config.get("plot_name", "")
if axis_config is None:
error_msg = f"Missing '{axis}' configuration in plot {i} - {plot_name}"
logging.error(error_msg) # Log the error
self.errors.append(error_msg)
signals_config = axis_config.get("signals")
if signals_config is None:
error_msg = (
f"Missing 'signals' configuration for {axis} axis in plot {i} - '{plot_name}'"
)
logging.error(error_msg) # Log the error
self.errors.append(error_msg)
elif not isinstance(signals_config, list) or len(signals_config) == 0:
error_msg = (
f"'signals' configuration for {axis} axis in plot {i} must be a non-empty list"
)
logging.error(error_msg) # Log the error
self.errors.append(error_msg)
# TODO add condition for name and entry
if __name__ == "__main__":
import yaml
import argparse
# from bec_widgets import ctrl_c
parser = argparse.ArgumentParser(description="Plotting App")
parser.add_argument(
"--config",
"-c",
help="Path to the .yaml configuration file",
default="config_example.yaml",
)
args = parser.parse_args()
try:
with open(args.config, "r") as file:
config = yaml.safe_load(file)
except FileNotFoundError:
print(f"The file {args.config} was not found.")
exit(1)
except Exception as e:
print(f"An error occurred while loading the config file: {e}")
exit(1)
# BECclient global variables
bec_dispatcher = BECDispatcher()
client = bec_dispatcher.client
client.start()
app = QApplication([])
plotApp = PlotApp(config=config, client=client)
# Connecting signals from bec_dispatcher
bec_dispatcher.connect_slot(plotApp.on_scan_segment, MessageEndpoints.scan_segment())
# ctrl_c.setup(app)
window = plotApp
window.show()
app.exec()

View File

@ -1,115 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>MultiWindow</class>
<widget class="QWidget" name="MultiWindow">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>1248</width>
<height>564</height>
</rect>
</property>
<property name="windowTitle">
<string>MultiWindow</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QSplitter" name="splitter">
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<widget class="GraphicsLayoutWidget" name="glw"/>
<widget class="QWidget" name="">
<layout class="QGridLayout" name="gridLayout_2">
<item row="1" column="0">
<spacer name="horizontalSpacer">
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>40</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
<item row="2" column="0" colspan="3">
<widget class="QGroupBox" name="groupBox">
<property name="title">
<string>Cursor</string>
</property>
<layout class="QGridLayout" name="gridLayout">
<item row="0" column="0">
<widget class="QTableWidget" name="tableWidget_crosshair">
<column>
<property name="text">
<string>Moved</string>
</property>
</column>
<column>
<property name="text">
<string>Clicked</string>
</property>
</column>
<column>
<property name="text">
<string>Color</string>
</property>
</column>
</widget>
</item>
</layout>
</widget>
</item>
<item row="1" column="1">
<widget class="QLabel" name="label">
<property name="text">
<string>Number of Columns:</string>
</property>
</widget>
</item>
<item row="1" column="2">
<widget class="QSpinBox" name="spinBox_N_columns">
<property name="minimum">
<number>1</number>
</property>
<property name="maximum">
<number>10</number>
</property>
<property name="value">
<number>3</number>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QPushButton" name="pushButton_load">
<property name="text">
<string>Load Config</string>
</property>
</widget>
</item>
<item row="0" column="2">
<widget class="QPushButton" name="pushButton_save">
<property name="text">
<string>Save Config</string>
</property>
</widget>
</item>
</layout>
</widget>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>GraphicsLayoutWidget</class>
<extends>QGraphicsView</extends>
<header>pyqtgraph.h</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@ -1,56 +0,0 @@
from qtpy.QtDesigner import QPyDesignerCustomWidgetPlugin
from qtpy.QtGui import QIcon
from bec_widgets.widgets.scan_plot.scan2d_plot import BECScanPlot2D
class BECScanPlot2DPlugin(QPyDesignerCustomWidgetPlugin):
def __init__(self, parent=None):
super().__init__(parent)
self._initialized = False
def initialize(self, formEditor):
if self._initialized:
return
self._initialized = True
def isInitialized(self):
return self._initialized
def createWidget(self, parent):
return BECScanPlot2D(parent)
def name(self):
return "BECScanPlot2D"
def group(self):
return "BEC widgets"
def icon(self):
return QIcon()
def toolTip(self):
return "BEC plot for 2D scans"
def whatsThis(self):
return "BEC plot for 2D scans"
def isContainer(self):
return False
def domXml(self):
return (
'<widget class="BECScanPlot2D" name="BECScanPlot2D">\n'
' <property name="toolTip" >\n'
" <string>BEC plot for 2D scans</string>\n"
" </property>\n"
' <property name="whatsThis" >\n'
" <string>BEC plot for 2D scans in Python using PyQt.</string>\n"
" </property>\n"
"</widget>\n"
)
def includeFile(self):
return "scan2d_plot"

View File

@ -1,56 +0,0 @@
from qtpy.QtDesigner import QPyDesignerCustomWidgetPlugin
from qtpy.QtGui import QIcon
from bec_widgets.widgets.scan_plot.scan_plot import BECScanPlot
class BECScanPlotPlugin(QPyDesignerCustomWidgetPlugin):
def __init__(self, parent=None):
super().__init__(parent)
self._initialized = False
def initialize(self, formEditor):
if self._initialized:
return
self._initialized = True
def isInitialized(self):
return self._initialized
def createWidget(self, parent):
return BECScanPlot(parent)
def name(self):
return "BECScanPlot"
def group(self):
return "BEC widgets"
def icon(self):
return QIcon()
def toolTip(self):
return "BEC plot for scans"
def whatsThis(self):
return "BEC plot for scans"
def isContainer(self):
return False
def domXml(self):
return (
'<widget class="BECScanPlot" name="BECScanPlot">\n'
' <property name="toolTip" >\n'
" <string>BEC plot for scans</string>\n"
" </property>\n"
' <property name="whatsThis" >\n'
" <string>BEC plot for scans in Python using PyQt.</string>\n"
" </property>\n"
"</widget>\n"
)
def includeFile(self):
return "scan_plot"

View File

@ -1,152 +0,0 @@
from threading import RLock
import numpy as np
import pyqtgraph as pg
from bec_lib import MessageEndpoints
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property as pyqtProperty, Slot as pyqtSlot
from bec_widgets.utils.bec_dispatcher import BECDispatcher
logger = bec_logger.logger
pg.setConfigOptions(background="w", foreground="k", antialias=True)
class BECScanPlot2D(pg.GraphicsView):
def __init__(self, parent=None, background="default"):
super().__init__(parent, background)
BECDispatcher().connect_slot(self.on_scan_segment, MessageEndpoints.scan_segment())
self._scanID = None
self._scanID_lock = RLock()
self._x_channel = ""
self._y_channel = ""
self._z_channel = ""
self._xpos = []
self._ypos = []
self._x_ind = None
self._y_ind = None
self.plot_item = pg.PlotItem()
self.setCentralItem(self.plot_item)
self.plot_item.setAspectLocked(True)
self.imageItem = pg.ImageItem()
self.plot_item.addItem(self.imageItem)
def reset_plots(self, _scan_segment, metadata):
# TODO: Do we reset in case of a scan type change?
self.imageItem.clear()
# TODO: better to check the number of coordinates in metadata["positions"]?
if metadata["scan_name"] != "grid_scan":
return
positions = [sorted(set(pos)) for pos in zip(*metadata["positions"])]
motors = metadata["scan_motors"]
if self.x_channel and self.y_channel:
self._x_ind = motors.index(self.x_channel) if self.x_channel in motors else None
self._y_ind = motors.index(self.y_channel) if self.y_channel in motors else None
elif not self.x_channel and not self.y_channel:
# Plot the first and second motors along x and y axes respectively
self._x_ind = 0
self._y_ind = 1
else:
logger.warning(
f"X and Y channels should be either both empty or both set in {self.objectName()}"
)
if self._x_ind is None or self._y_ind is None:
return
xpos = positions[self._x_ind]
ypos = positions[self._y_ind]
self._xpos = xpos
self._ypos = ypos
self.imageItem.setImage(np.zeros(shape=(len(xpos), len(ypos))))
w = max(xpos) - min(xpos)
h = max(ypos) - min(ypos)
w_pix = w / (len(xpos) - 1)
h_pix = h / (len(ypos) - 1)
self.imageItem.setRect(min(xpos) - w_pix / 2, min(ypos) - h_pix / 2, w + w_pix, h + h_pix)
self.plot_item.setLabel("bottom", motors[self._x_ind])
self.plot_item.setLabel("left", motors[self._y_ind])
@pyqtSlot(dict, dict)
def on_scan_segment(self, scan_segment, metadata):
# reset plots on scanID change
with self._scanID_lock:
scan_id = scan_segment["scanID"]
if self._scanID != scan_id:
self._scanID = scan_id
self.reset_plots(scan_segment, metadata)
if not self.z_channel or metadata["scan_name"] != "grid_scan":
return
if self._x_ind is None or self._y_ind is None:
return
point_coord = metadata["positions"][scan_segment["point_id"]]
x_coord_ind = self._xpos.index(point_coord[self._x_ind])
y_coord_ind = self._ypos.index(point_coord[self._y_ind])
data = scan_segment["data"]
z_new = data[self.z_channel][self.z_channel]["value"]
image = self.imageItem.image
image[x_coord_ind, y_coord_ind] = z_new
self.imageItem.setImage()
@pyqtProperty(str)
def x_channel(self):
return self._x_channel
@x_channel.setter
def x_channel(self, new_val):
self._x_channel = new_val
self.plot_item.setLabel("bottom", new_val)
@pyqtProperty(str)
def y_channel(self):
return self._y_channel
@y_channel.setter
def y_channel(self, new_val):
self._y_channel = new_val
self.plot_item.setLabel("left", new_val)
@pyqtProperty(str)
def z_channel(self):
return self._z_channel
@z_channel.setter
def z_channel(self, new_val):
self._z_channel = new_val
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
plot = BECScanPlot2D()
# If x_channel and y_channel are both omitted, they will be inferred from each running grid scan
plot.z_channel = "bpm3y"
plot.show()
sys.exit(app.exec())

View File

@ -1,152 +0,0 @@
import itertools
from threading import RLock
import pyqtgraph as pg
from bec_lib import MessageEndpoints
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property as pyqtProperty, Slot as pyqtSlot
from bec_widgets.utils.bec_dispatcher import BECDispatcher
logger = bec_logger.logger
pg.setConfigOptions(background="w", foreground="k", antialias=True)
COLORS = ["#fd7f6f", "#7eb0d5", "#b2e061", "#bd7ebe", "#ffb55a"]
class BECScanPlot(pg.GraphicsView):
def __init__(self, parent=None, background="default"):
super().__init__(parent, background)
BECDispatcher().connect_slot(self.on_scan_segment, MessageEndpoints.scan_segment())
self.view = pg.PlotItem()
self.setCentralItem(self.view)
self._scanID = None
self._scanID_lock = RLock()
self._x_channel = ""
self._y_channel_list = []
self.scan_curves = {}
self.dap_curves = {}
def reset_plots(self, _scan_segment, _metadata):
for plot_curve in {**self.scan_curves, **self.dap_curves}.values():
plot_curve.setData(x=[], y=[])
@pyqtSlot(dict, dict)
def on_scan_segment(self, scan_segment, metadata):
# reset plots on scanID change
with self._scanID_lock:
scan_id = scan_segment["scanID"]
if self._scanID != scan_id:
self._scanID = scan_id
self.reset_plots(scan_segment, metadata)
if not self.x_channel:
return
data = scan_segment["data"]
if self.x_channel not in data:
logger.warning(f"Unknown channel `{self.x_channel}` for X data in {self.objectName()}")
return
x_new = data[self.x_channel][self.x_channel]["value"]
for chan, plot_curve in self.scan_curves.items():
if not chan:
continue
if chan not in data:
logger.warning(f"Unknown channel `{chan}` for Y data in {self.objectName()}")
continue
y_new = data[chan][chan]["value"]
x, y = plot_curve.getData() # TODO: is it a good approach?
if x is None:
x = []
if y is None:
y = []
plot_curve.setData(x=[*x, x_new], y=[*y, y_new])
@pyqtSlot(dict, dict)
def redraw_dap(self, content, _metadata):
data = content["data"]
for chan, plot_curve in self.dap_curves.items():
if not chan:
continue
if chan not in data:
logger.warning(f"Unknown channel `{chan}` for DAP data in {self.objectName()}")
continue
x_new = data[chan]["x"]
y_new = data[chan]["y"]
plot_curve.setData(x=x_new, y=y_new)
@pyqtProperty("QStringList")
def y_channel_list(self):
return self._y_channel_list
@y_channel_list.setter
def y_channel_list(self, new_list):
bec_dispatcher = BECDispatcher()
# TODO: do we want to care about dap/not dap here?
chan_removed = [chan for chan in self._y_channel_list if chan not in new_list]
if chan_removed and chan_removed[0].startswith("dap."):
chan_removed = chan_removed[0].partition("dap.")[-1]
chan_removed_ep = MessageEndpoints.processed_data(chan_removed)
bec_dispatcher.disconnect_slot(self.redraw_dap, chan_removed_ep)
self._y_channel_list = new_list
# Prepare plot for a potentially different list of y channels
self.view.clear()
self.view.addLegend()
colors = itertools.cycle(COLORS)
for y_chan in new_list:
if y_chan.startswith("dap."):
y_chan = y_chan.partition("dap.")[-1]
curves = self.dap_curves
y_chan_ep = MessageEndpoints.processed_data(y_chan)
bec_dispatcher.connect_slot(self.redraw_dap, y_chan_ep)
else:
curves = self.scan_curves
curves[y_chan] = self.view.plot(
x=[], y=[], pen=pg.mkPen(color=next(colors), width=2), name=y_chan
)
if len(new_list) == 1:
self.view.setLabel("left", new_list[0])
@pyqtProperty(str)
def x_channel(self):
return self._x_channel
@x_channel.setter
def x_channel(self, new_val):
self._x_channel = new_val
self.view.setLabel("bottom", new_val)
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
plot = BECScanPlot()
plot.x_channel = "samx"
plot.y_channel_list = ["bpm3y", "bpm6y"]
plot.show()
sys.exit(app.exec())

View File

@ -206,7 +206,7 @@ def test_connect_one_slot_multiple_topics_single_callback(bec_dispatcher, consum
# Simulate messages being published on each topic
for topic in topics:
msg_with_topic = MessageObject(
topic=topic, value=ScanMessage(point_id=0, scanID=0, data={}).dumps()
topic=topic, value=ScanMessage(point_id=0, scanID=0, data={})
)
consumer.register.call_args.kwargs["cb"](msg_with_topic)

View File

@ -179,16 +179,16 @@ def test_remove_plot_by_providing_nothing(bec_figure):
assert "Must provide either widget_id or coordinates for removal." in str(excinfo.value)
def test_change_theme(bec_figure):
bec_figure.change_theme("dark")
assert bec_figure.config.theme == "dark"
assert bec_figure.backgroundBrush().color().name() == "#000000"
bec_figure.change_theme("light")
assert bec_figure.config.theme == "light"
assert bec_figure.backgroundBrush().color().name() == "#ffffff"
bec_figure.change_theme("dark")
assert bec_figure.config.theme == "dark"
assert bec_figure.backgroundBrush().color().name() == "#000000"
# def test_change_theme(bec_figure): #TODO do no work at python 3.12
# bec_figure.change_theme("dark")
# assert bec_figure.config.theme == "dark"
# assert bec_figure.backgroundBrush().color().name() == "#000000"
# bec_figure.change_theme("light")
# assert bec_figure.config.theme == "light"
# assert bec_figure.backgroundBrush().color().name() == "#ffffff"
# bec_figure.change_theme("dark")
# assert bec_figure.config.theme == "dark"
# assert bec_figure.backgroundBrush().color().name() == "#000000"
def test_change_layout(bec_figure):

View File

@ -1,489 +0,0 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import unittest
from unittest.mock import MagicMock, patch
import pyqtgraph as pg
import pytest
from qtpy.QtWidgets import QMessageBox
from bec_widgets.examples.plot_app.plot_app import PlotApp, ErrorHandler
def setup_plot_app(qtbot, config):
"""Helper function to set up the PlotApp widget."""
client = MagicMock()
widget = PlotApp(config=config, client=client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
return widget
@pytest.fixture
def error_handler():
# TODO so far tested separately, but the error message scenarios can be tested directly in the plot app
return ErrorHandler()
config_device_mode_all_filled = {
"plot_settings": {
"background_color": "black",
"num_columns": 2,
"colormap": "plasma",
"scan_types": False,
},
"plot_data": [
{
"plot_name": "BPM4i plots vs samx",
"x": {
"label": "Motor Y",
"signals": [{"name": "samx", "entry": "samx"}],
},
"y": {
"label": "bpm4i",
"signals": [{"name": "bpm4i", "entry": "bpm4i"}],
},
},
{
"plot_name": "Gauss plots vs samx",
"x": {
"label": "Motor X",
"signals": [{"name": "samx", "entry": "samx"}],
},
"y": {
"label": "Gauss",
"signals": [{"name": "gauss_bpm", "entry": "gauss_bpm"}],
},
},
],
}
config_device_mode_no_entry = {
"plot_settings": {
"background_color": "white",
"num_columns": 1,
"colormap": "plasma",
"scan_types": False,
},
"plot_data": [
{
"plot_name": "BPM4i plots vs samx",
"x": {
"label": "Motor Y",
"signals": [{"name": "samx"}], # Entry is missing
},
"y": {
"label": "bpm4i",
"signals": [{"name": "bpm4i"}], # Entry is missing
},
},
{
"plot_name": "Gauss plots vs samx",
"x": {
"label": "Motor X",
"signals": [{"name": "samx"}], # Entry is missing
},
"y": {
"label": "Gauss",
"signals": [{"name": "gauss_bpm"}], # Entry is missing
},
},
],
}
config_scan_mode = config = {
"plot_settings": {
"background_color": "white",
"num_columns": 3,
"colormap": "plasma",
"scan_types": True,
},
"plot_data": {
"grid_scan": [
{
"plot_name": "Grid plot 1",
"x": {"label": "Motor X", "signals": [{"name": "samx", "entry": "samx"}]},
"y": {
"label": "BPM",
"signals": [
{"name": "gauss_bpm", "entry": "gauss_bpm"},
{"name": "gauss_adc1", "entry": "gauss_adc1"},
],
},
},
{
"plot_name": "Grid plot 2",
"x": {"label": "Motor X", "signals": [{"name": "samx", "entry": "samx"}]},
"y": {
"label": "BPM",
"signals": [
{"name": "gauss_bpm", "entry": "gauss_bpm"},
{"name": "gauss_adc1", "entry": "gauss_adc1"},
],
},
},
{
"plot_name": "Grid plot 3",
"x": {"label": "Motor Y", "signals": [{"name": "samx", "entry": "samx"}]},
"y": {
"label": "BPM",
"signals": [{"name": "gauss_bpm", "entry": "gauss_bpm"}],
},
},
{
"plot_name": "Grid plot 4",
"x": {"label": "Motor Y", "signals": [{"name": "samx", "entry": "samx"}]},
"y": {
"label": "BPM",
"signals": [{"name": "gauss_adc3", "entry": "gauss_adc3"}],
},
},
],
"line_scan": [
{
"plot_name": "BPM plot",
"x": {"label": "Motor X", "signals": [{"name": "samx"}]},
"y": {
"label": "BPM",
"signals": [
{"name": "gauss_bpm", "entry": "gauss_bpm"},
{"name": "gauss_adc1", "entry": "gauss_adc1"},
{"name": "gauss_adc2", "entry": "gauss_adc2"},
],
},
},
{
"plot_name": "Multi",
"x": {"label": "Motor X", "signals": [{"name": "samx", "entry": "samx"}]},
"y": {
"label": "Multi",
"signals": [
{"name": "gauss_bpm", "entry": "gauss_bpm"},
{"name": "samx", "entry": ["samx", "samx_setpoint"]},
],
},
},
{
"plot_name": "Multi",
"x": {"label": "Motor X", "signals": [{"name": "samx", "entry": "samx"}]},
"y": {
"label": "Multi",
"signals": [
{"name": "gauss_bpm", "entry": "gauss_bpm"},
{"name": "samx", "entry": ["samx", "samx_setpoint"]},
],
},
},
],
},
}
config_all_wrong = {
"plot_settings": {
"background_color": "white",
"num_columns": 1,
"colormap": "plasma",
"scan_types": False,
},
"plot_data": [
{
"plot_name": "BPM4i plots vs samx",
"x": {
"label": "Motor Y",
# signals are missing
},
"y": {
"label": "bpm4i",
"signals": [{"name": "bpm4i", "entry": "gauss_bpm"}], # wrong entry
},
},
],
}
@pytest.mark.parametrize(
"config, plot_setting_bg, num_plot ,pg_background",
[
(config_device_mode_all_filled, "black", 2, "k"),
(config_device_mode_no_entry, "white", 2, "w"),
(config_scan_mode, "white", 4, "w"),
],
)
def test_init_config(qtbot, config, plot_setting_bg, num_plot, pg_background):
plot_app = setup_plot_app(qtbot, config)
assert plot_app.plot_settings["background_color"] == plot_setting_bg
assert len(plot_app.plot_data) == num_plot
assert pg.getConfigOption("background") == pg_background
@pytest.mark.parametrize(
"config, num_columns_input, expected_num_columns, expected_plot_names, expected_coordinates",
[
(
config_device_mode_all_filled,
2,
2,
["BPM4i plots vs samx", "Gauss plots vs samx"],
[(0, 0), (0, 1)],
),
(
config_device_mode_all_filled,
5,
2,
["BPM4i plots vs samx", "Gauss plots vs samx"],
[(0, 0), (0, 1)],
), # num_columns greater than number of plots
(
config_device_mode_no_entry,
1,
1,
["BPM4i plots vs samx", "Gauss plots vs samx"],
[(0, 0), (1, 0)],
),
(
config_device_mode_no_entry,
2,
2,
["BPM4i plots vs samx", "Gauss plots vs samx"],
[(0, 0), (0, 1)],
),
(
config_device_mode_no_entry,
5,
2,
["BPM4i plots vs samx", "Gauss plots vs samx"],
[(0, 0), (0, 1)],
), # num_columns greater than number of plots,
(
config_scan_mode,
2,
2,
[
"Grid plot 1",
"Grid plot 2",
"Grid plot 3",
"Grid plot 4",
],
[(0, 0), (0, 1), (1, 0), (1, 1)],
),
(
config_scan_mode,
3,
3,
[
"Grid plot 1",
"Grid plot 2",
"Grid plot 3",
"Grid plot 4",
],
[(0, 0), (0, 1), (0, 2), (1, 0)],
),
(
config_scan_mode,
5,
4,
[
"Grid plot 1",
"Grid plot 2",
"Grid plot 3",
"Grid plot 4",
],
[(0, 0), (0, 1), (0, 2), (0, 3)],
), # num_columns greater than number of plots
],
)
def test_init_ui(
qtbot,
config,
num_columns_input,
expected_num_columns,
expected_plot_names,
expected_coordinates,
):
plot_app = setup_plot_app(qtbot, config)
plot_app.init_ui(num_columns_input)
# Validate number of columns
assert plot_app.plot_settings["num_columns"] == expected_num_columns
# Validate the plots are created correctly
for expected_name in expected_plot_names:
assert expected_name in plot_app.plots.keys()
# Validate the grid_coordinates
assert plot_app.grid_coordinates == expected_coordinates
def mock_getitem(dev_name):
"""Helper function to mock the __getitem__ method of the 'dev' object.""" ""
mock_instance = MagicMock()
if dev_name == "samx":
mock_instance._hints = "samx"
elif dev_name == "bpm4i":
mock_instance._hints = "bpm4i"
elif dev_name == "gauss_bpm":
mock_instance._hints = "gauss_bpm"
return mock_instance
@pytest.mark.parametrize(
"config, msg, metadata, expected_data",
[
# Case: msg does not have 'scanID'
(config_device_mode_all_filled, {"data": {}}, {}, {}),
# Case: scan_types is False, msg contains all valid fields, and entry is present in config
(
config_device_mode_all_filled,
{
"data": {
"samx": {"samx": {"value": 10}},
"bpm4i": {"bpm4i": {"value": 5}},
"gauss_bpm": {"gauss_bpm": {"value": 7}},
},
"scanID": 1,
},
{},
{
("samx", "samx", "bpm4i", "bpm4i"): {"x": [10], "y": [5]},
("samx", "samx", "gauss_bpm", "gauss_bpm"): {"x": [10], "y": [7]},
},
),
# Case: scan_types is False, msg contains all valid fields and entry is missing in config, should use hints
(
config_device_mode_no_entry,
{
"data": {
"samx": {"samx": {"value": 10}},
"bpm4i": {"bpm4i": {"value": 5}},
"gauss_bpm": {"gauss_bpm": {"value": 7}},
},
"scanID": 1,
},
{},
{
("samx", "samx", "bpm4i", "bpm4i"): {"x": [10], "y": [5]},
("samx", "samx", "gauss_bpm", "gauss_bpm"): {"x": [10], "y": [7]},
},
),
],
)
def test_on_scan_segment(qtbot, config, msg, metadata, expected_data):
plot_app = setup_plot_app(qtbot, config)
# Initialize and run test
plot_app.data = {}
plot_app.scanID = 0
# Get hints
plot_app.dev.__getitem__.side_effect = mock_getitem
plot_app.on_scan_segment(msg, metadata)
assert plot_app.data == expected_data
@pytest.mark.parametrize(
"config, msg, metadata, expected_exception_message",
[
# Case: scan_types is True, but metadata does not contain 'scan_name'
(
config_scan_mode,
{"data": {}, "scanID": 1},
{}, # No 'scan_name' in metadata
"Scan name not found in metadata. Please check the scan_name in the YAML config or in bec configuration.",
),
# Case: scan_types is True, metadata contains non-existing 'scan_name'
(
config_scan_mode,
{"data": {}, "scanID": 1},
{"scan_name": "non_existing_scan"},
"Scan name non_existing_scan not found in the YAML config. Please check the scan_name in the YAML config "
"or in bec configuration.",
),
],
)
def test_on_scan_message_error_handling(qtbot, config, msg, metadata, expected_exception_message):
plot_app = setup_plot_app(qtbot, config)
# Initialize
plot_app.init_curves = MagicMock()
plot_app.data = {}
plot_app.scanID = 0
plot_app.dev.__getitem__.side_effect = mock_getitem
with pytest.raises(ValueError) as exc_info:
plot_app.on_scan_segment(msg, metadata)
assert str(exc_info.value) == expected_exception_message
####################
# ErrorHandler tests
####################
def test_initialization(error_handler):
assert error_handler.errors == []
assert error_handler.parent is None
assert error_handler.retry_action is None
@patch(
"bec_widgets.examples.plot_app.plot_app.QMessageBox.critical", return_value=QMessageBox.Retry
)
def test_handle_error_retry(mocked_critical, error_handler):
retry_action = MagicMock()
error_handler.set_retry_action(retry_action)
error_handler.handle_error("error message")
retry_action.assert_called_once()
@patch(
"bec_widgets.examples.plot_app.plot_app.QMessageBox.critical", return_value=QMessageBox.Cancel
)
def test_handle_error_cancel(mocked_critical, error_handler):
retry_action = MagicMock()
with pytest.raises(SystemExit) as excinfo:
error_handler.handle_error("error message")
assert excinfo.value.code == 1
retry_action.assert_not_called()
@pytest.mark.parametrize(
"config, expected_errors",
[
(config_device_mode_all_filled, []),
(config_device_mode_no_entry, []),
(config_scan_mode, []),
(
config_all_wrong,
["Missing 'signals' configuration for x axis in plot 0 - 'BPM4i plots vs samx'"],
),
],
)
def test_error_handler(error_handler, config, expected_errors):
# Mock QMessageBox
error_handler.handle_error = MagicMock()
# Mock logging
with unittest.mock.patch("bec_widgets.examples.plot_app.plot_app.logging") as mocked_logging:
error_handler.validate_config_file(config)
# Assert
assert error_handler.errors == expected_errors
# If there are expected errors, check if handle_error was called
if expected_errors:
error_handler.handle_error.assert_called_once()
mocked_logging.error.assert_called()
else:
mocked_logging.error.assert_not_called()
error_handler.handle_error.assert_not_called()
def test_validate_plot_config(error_handler):
plot_config = {
"x": {"label": "Motor X", "signals": []}, # empty signals list should trigger an error
"y": {"label": "Motor Y", "signals": [{"name": "samx", "entry": "samx"}]},
}
error_handler.validate_plot_config(plot_config, 0)
assert error_handler.errors == [
"'signals' configuration for x axis in plot 0 must be a non-empty list"
]

View File

@ -1,92 +0,0 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from bec_widgets.widgets.scan_plot import scan_plot
def test_scan_plot(qtbot):
"""Test ScanPlot"""
plot = scan_plot.BECScanPlot()
qtbot.addWidget(plot)
plot.show()
qtbot.waitExposed(plot)
plot.x_channel = "x"
plot.y_channel_list = ["y1", "y2"]
plot.on_scan_segment(
{
"data": {
"x": {"x": {"value": 1}},
"y1": {"y1": {"value": 1}},
"y2": {"y2": {"value": 3}},
},
"scanID": "test",
},
{"scanID": "test", "scan_number": 1, "scan_report_devices": ["x"]},
)
plot.on_scan_segment(
{
"data": {
"x": {"x": {"value": 2}},
"y1": {"y1": {"value": 2}},
"y2": {"y2": {"value": 4}},
},
"scanID": "test",
},
{"scanID": "test", "scan_number": 1, "scan_report_devices": ["x"]},
)
assert all(plot.scan_curves["y1"].getData()[0] == [1, 2])
assert all(plot.scan_curves["y2"].getData()[1] == [3, 4])
def test_scan_plot_clears_data(qtbot):
"""Test ScanPlot"""
plot = scan_plot.BECScanPlot()
qtbot.addWidget(plot)
plot.show()
qtbot.waitExposed(plot)
plot.x_channel = "x"
plot.y_channel_list = ["y1", "y2"]
plot.on_scan_segment(
{
"data": {
"x": {"x": {"value": 1}},
"y1": {"y1": {"value": 1}},
"y2": {"y2": {"value": 3}},
},
"scanID": "test",
},
{"scanID": "test", "scan_number": 1, "scan_report_devices": ["x"]},
)
plot.reset_plots({}, {})
plot.on_scan_segment(
{
"data": {
"x": {"x": {"value": 2}},
"y1": {"y1": {"value": 2}},
"y2": {"y2": {"value": 4}},
},
"scanID": "test",
},
{"scanID": "test", "scan_number": 1, "scan_report_devices": ["x"]},
)
assert all(plot.scan_curves["y1"].getData()[0] == [2])
assert all(plot.scan_curves["y2"].getData()[1] == [4])
def test_scan_plot_redraws_dap(qtbot):
"""Test ScanPlot"""
plot = scan_plot.BECScanPlot()
qtbot.addWidget(plot)
plot.show()
qtbot.waitExposed(plot)
plot.y_channel_list = ["dap.y1", "dap.y2"]
plot.redraw_dap({"data": {"y1": {"x": [1], "y": [1]}, "y2": {"x": [2], "y": [2]}}}, {})
assert all(plot.dap_curves["y1"].getData()[0] == [1])
assert all(plot.dap_curves["y2"].getData()[1] == [2])