0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

feat: BECDeviceMonitor modular class which can be used to replace placeholder in .ui file.

This commit is contained in:
wyzula-jan
2023-10-12 15:30:53 +02:00
parent 75af0404b3
commit f3f55a7ee0
3 changed files with 353 additions and 82 deletions

View File

@ -458,14 +458,12 @@ class PlotApp(QWidget):
f"Scan name not found in metadata. Please check the scan_name in the YAML config or in bec " f"Scan name not found in metadata. Please check the scan_name in the YAML config or in bec "
f"configuration." f"configuration."
) )
return
self.plot_data = self.plot_data_config.get(currentName, []) self.plot_data = self.plot_data_config.get(currentName, [])
if self.plot_data == []: if self.plot_data == []:
raise ValueError( raise ValueError(
f"Scan name {currentName} not found in the YAML config. Please check the scan_name in the " f"Scan name {currentName} not found in the YAML config. Please check the scan_name in the "
f"YAML config or in bec configuration." f"YAML config or in bec configuration."
) )
return
# Init UI # Init UI
self.init_ui(self.plot_settings["num_columns"]) self.init_ui(self.plot_settings["num_columns"])

View File

@ -3,6 +3,72 @@ from PyQt5 import uic
from PyQt5.QtWidgets import QMainWindow, QApplication, QVBoxLayout from PyQt5.QtWidgets import QMainWindow, QApplication, QVBoxLayout
from bec_widgets.widgets.device_monitor import BECDeviceMonitor from bec_widgets.widgets.device_monitor import BECDeviceMonitor
config_1 = {
"plot_settings": {
"background_color": "black",
"num_columns": 1,
"colormap": "plasma",
"scan_types": False,
},
"plot_data": [
{
"plot_name": "BPM4i plots vs samx",
"x": {
"label": "Motor Y",
"signals": [{"name": "samx", "entry": "samx"}],
},
"y": {
"label": "bpm4i",
"signals": [{"name": "bpm4i", "entry": "bpm4i"}],
},
},
{
"plot_name": "Gauss plots vs samx",
"x": {
"label": "Motor X",
"signals": [{"name": "samx", "entry": "samx"}],
},
"y": {
"label": "Gauss",
"signals": [{"name": "gauss_bpm", "entry": "gauss_bpm"}],
},
},
],
}
config_2 = {
"plot_settings": {
"background_color": "black",
"num_columns": 2,
"colormap": "plasma",
"scan_types": False,
},
"plot_data": [
{
"plot_name": "BPM4i plots vs samx",
"x": {
"label": "Motor Y",
"signals": [{"name": "samx", "entry": "samx"}],
},
"y": {
"label": "bpm4i",
"signals": [{"name": "samy", "entry": "samy"}],
},
},
{
"plot_name": "Gauss plots vs samx",
"x": {
"label": "Motor X",
"signals": [{"name": "samx", "entry": "samx"}],
},
"y": {
"label": "Gauss ADC",
"signals": [{"name": "gauss_adc1", "entry": "gauss_adc1"}],
},
},
],
}
class ModularApp(QMainWindow): class ModularApp(QMainWindow):
def __init__(self, client=None, parent=None): def __init__(self, client=None, parent=None):
@ -19,8 +85,12 @@ class ModularApp(QMainWindow):
def _init_plots(self): def _init_plots(self):
self.glw_1_layout = QVBoxLayout(self.glw_1) # Create a new QVBoxLayout self.glw_1_layout = QVBoxLayout(self.glw_1) # Create a new QVBoxLayout
self.bec_device_monitor = BECDeviceMonitor(parent=self) self.bec_device_monitor_1 = BECDeviceMonitor(parent=self, config=config_1)
self.glw_1_layout.addWidget(self.bec_device_monitor) # Add BECDeviceMonitor to the layout self.glw_1_layout.addWidget(self.bec_device_monitor_1) # Add BECDeviceMonitor to the layout
self.glw_2_layout = QVBoxLayout(self.glw_2) # Create a new QVBoxLayout
self.bec_device_monitor_2 = BECDeviceMonitor(parent=self, config=config_2)
self.glw_2_layout.addWidget(self.bec_device_monitor_2) # Add BECDeviceMonitor to the layout
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,41 +1,60 @@
import pyqtgraph as pg import pyqtgraph as pg
from bec_lib.core import MessageEndpoints
from PyQt5 import QtCore from PyQt5 import QtCore
from PyQt5.QtCore import pyqtSignal, pyqtSlot from PyQt5.QtCore import pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import QApplication from PyQt5.QtWidgets import QApplication
from pyqtgraph import mkPen, mkBrush from pyqtgraph import mkPen, mkBrush
from bec_widgets.bec_dispatcher import bec_dispatcher
from bec_widgets.qt_utils import Crosshair, Colors from bec_widgets.qt_utils import Crosshair, Colors
config_simple = { config_simple = {
"plot_settings": { "plot_settings": {
"background_color": "white", "background_color": "black",
"num_columns": 1, "num_columns": 2,
"colormap": "plasma", "colormap": "plasma",
"scan_types": False, "scan_types": False,
}, },
"plot_data": { "plot_data": [
"plot_name": "BPM4i plots vs samx", {
"x": { "plot_name": "BPM4i plots vs samx",
"label": "Motor Y", "x": {
"signals": [{"name": "samx"}], # Entry is missing "label": "Motor Y",
"signals": [{"name": "samx", "entry": "samx"}],
},
"y": {
"label": "bpm4i",
"signals": [{"name": "bpm4i", "entry": "bpm4i"}],
},
}, },
"y": { {
"label": "bpm4i", "plot_name": "Gauss plots vs samx",
"signals": [{"name": "bpm4i"}], # Entry is missing "x": {
"label": "Motor X",
"signals": [{"name": "samx", "entry": "samx"}],
},
"y": {
"label": "Gauss",
"signals": [{"name": "gauss_bpm", "entry": "gauss_bpm"}],
},
}, },
}, ],
} }
class BECDeviceMonitor(pg.GraphicsLayoutWidget): class BECDeviceMonitor(pg.GraphicsLayoutWidget):
update_signal = pyqtSignal() update_signal = pyqtSignal()
def __init__(self, config: dict = config_simple, parent=None): # , client=None, parent=None): def __init__(self, config: dict = config_simple, client=None, parent=None):
super(BECDeviceMonitor, self).__init__(parent=None) super(BECDeviceMonitor, self).__init__(parent=None)
# Client and device manager from BEC # Client and device manager from BEC
# self.client = bec_dispatcher.client if client is None else client self.client = bec_dispatcher.client if client is None else client
# self.dev = self.client.device_manager.devices self.dev = self.client.device_manager.devices
# Connect slots dispatcher
bec_dispatcher.connect_slot(self.on_scan_segment, MessageEndpoints.scan_segment())
# Current configuration # Current configuration
self.config = config self.config = config
@ -49,94 +68,278 @@ class BECDeviceMonitor(pg.GraphicsLayoutWidget):
self.grid_coordinates = None self.grid_coordinates = None
self.scanID = None self.scanID = None
self.user_colors = {} # key: (plot_name, y_name, y_entry), value: color
# Connect the update signal to the update plot method #TODO enable when update is fixed # Connect the update signal to the update plot method #TODO enable when update is fixed
# self.proxy_update_plot = pg.SignalProxy( self.proxy_update_plot = pg.SignalProxy(
# self.update_signal, rateLimit=25, slot=self.update_plot self.update_signal, rateLimit=25, slot=self.update_plot
# ) )
# Init UI # Init UI
self._init_config() self._init_config()
self._init_plot() # self._init_ui() #TODO could be removed
# self._init_curves() # self._init_curves()
def _init_config(self): def _init_config(self):
"""
Initializes or update the configuration settings for the PlotApp.
"""
# Separate configs # Separate configs
self.plot_settings = self.config.get("plot_settings", {}) self.plot_settings = self.config.get("plot_settings", {})
self.plot_data_config = self.config.get("plot_data", {}) self.plot_data_config = self.config.get("plot_data", {})
self.scan_types = self.plot_settings.get("scan_types", False)
def _init_plot(self): if self.scan_types is False: # Device tracking mode
self.plot_data = self.plot_data_config # TODO logic has to be improved
else: # setup first line scan as default, then changed with different scan type
self.plot_data = self.plot_data_config[list(self.plot_data_config.keys())[0]]
# TODO init plot background -> so far not used, I don;t like how it is done in extreme.py
# Initialize the UI
self._init_ui(self.plot_settings["num_columns"])
def _init_ui(self, num_columns: int = 3) -> None:
"""
Initialize the UI components, create plots and store their grid positions.
Args:
num_columns (int): Number of columns to wrap the layout.
This method initializes a dictionary `self.plots` to store the plot objects
along with their corresponding x and y signal names. It dynamically arranges
the plots in a grid layout based on the given number of columns and dynamically
stretches the last plots to fit the remaining space.
"""
self.clear() self.clear()
self.plots = {} self.plots = {}
# self.grid_coordinates = {} # TODO will be extended in the next version self.grid_coordinates = []
plot_config = self.plot_data_config num_plots = len(self.plot_data)
# TODO here will go for cycle for multiple plots # Check if num_columns exceeds the number of plots
if num_columns >= num_plots:
num_columns = num_plots
self.plot_settings["num_columns"] = num_columns # Update the settings
print(
f"Warning: num_columns in the YAML file was greater than the number of plots. Resetting num_columns to number of plots:{num_columns}."
)
else:
self.plot_settings["num_columns"] = num_columns # Update the settings
# Plot Settings num_rows = num_plots // num_columns
plot_name = plot_config.get("plot_name", "") last_row_cols = num_plots % num_columns
x_label = plot_config["x"].get("label", "") remaining_space = num_columns - last_row_cols
y_label = plot_config["y"].get("label", "")
plot = self.addPlot(title=plot_name) for i, plot_config in enumerate(self.plot_data):
plot.setLabel("bottom", x_label) row, col = i // num_columns, i % num_columns
plot.setLabel("left", y_label) colspan = 1
# Adding some data to plot for testing
plot.plot([1, 2, 3, 4, 5], [5, 4, 3, 2, 1])
self.plots[plot_name] = plot if row == num_rows and remaining_space > 0:
if last_row_cols == 1:
colspan = num_columns
else:
colspan = remaining_space // last_row_cols + 1
remaining_space -= colspan - 1
last_row_cols -= 1
# def _init_curves(self): plot_name = plot_config.get("plot_name", "")
# self.curves_data = {} x_label = plot_config["x"].get("label", "")
# row_labels = [] y_label = plot_config["y"].get("label", "")
#
# plot_config = self.plot_data_config # TODO will be change to loop as in extreme.py
#
# plot_name = self.plot_data_config.get("plot_name", "")
# plot = self.plots[plot_name]
# plot.clear()
#
# y_config = plot_config["y"]["signals"]
# colors_ys = Colors.golden_angle_color(
# colormap=self.plot_settings["colormap"], num=len(y_config)
# )
#
# curve_list = []
# for i, (y_config, color) in enumerate(zip(y_config, colors_ys)):
# # print(y_config)
# y_name = y_config["name"]
# y_entries = y_config.get("entry", [y_name])
#
# if not isinstance(y_entries, list):
# y_entries = [y_entries]
#
# for y_entry in y_entries:
# user_color = self.user_colors.get((plot_name, y_name, y_entry), None)
# color_to_use = user_color if user_color else color
#
# pen_curve = mkPen(color=color_to_use, width=2, style=QtCore.Qt.DashLine)
# brush_curve = mkBrush(color=color_to_use)
#
# curve_data = pg.PlotDataItem(
# symbolSize=5,
# symbolBrush=brush_curve,
# pen=pen_curve,
# skipFiniteCheck=True,
# name=f"{y_name} ({y_entry})",
# )
#
# curve_list.append((y_name, y_entry, curve_data))
# plot.addItem(curve_data)
# row_labels.append(f"{y_name} ({y_entry}) - {plot_name}")
#
# self.curves_data[plot_name] = curve_list
def update_plot(self): plot = self.addPlot(row=row, col=col, colspan=colspan, title=plot_name)
... plot.setLabel("bottom", x_label)
plot.setLabel("left", y_label)
plot.addLegend()
self.plots[plot_name] = plot
self.grid_coordinates.append((row, col))
self.init_curves()
def init_curves(self):
"""
Initialize curve data and properties, and update table row labels.
This method initializes a nested dictionary `self.curves_data` to store
the curve objects for each x and y signal pair. It also updates the row labels
in `self.tableWidget_crosshair` to include the grid position for each y-value.
"""
self.curves_data = {}
row_labels = []
for idx, plot_config in enumerate(self.plot_data):
plot_name = plot_config.get("plot_name", "")
plot = self.plots[plot_name]
plot.clear()
y_configs = plot_config["y"]["signals"]
colors_ys = Colors.golden_angle_color(
colormap=self.plot_settings["colormap"], num=len(y_configs)
)
curve_list = []
for i, (y_config, color) in enumerate(zip(y_configs, colors_ys)):
# print(y_config)
y_name = y_config["name"]
y_entries = y_config.get("entry", [y_name])
if not isinstance(y_entries, list):
y_entries = [y_entries]
for y_entry in y_entries:
user_color = self.user_colors.get((plot_name, y_name, y_entry), None)
color_to_use = user_color if user_color else color
pen_curve = mkPen(color=color_to_use, width=2, style=QtCore.Qt.DashLine)
brush_curve = mkBrush(color=color_to_use)
curve_data = pg.PlotDataItem(
symbolSize=5,
symbolBrush=brush_curve,
pen=pen_curve,
skipFiniteCheck=True,
name=f"{y_name} ({y_entry})",
)
curve_list.append((y_name, y_entry, curve_data))
plot.addItem(curve_data)
row_labels.append(f"{y_name} ({y_entry}) - {plot_name}")
self.curves_data[plot_name] = curve_list
# TODO hook crosshairs here
def update_plot(self) -> None:
"""Update the plot data based on the stored data dictionary."""
for plot_name, curve_list in self.curves_data.items():
for y_name, y_entry, curve in curve_list:
x_config = next(
(pc["x"] for pc in self.plot_data if pc.get("plot_name") == plot_name), {}
)
x_signal_config = x_config["signals"][0]
x_name = x_signal_config.get("name", "")
x_entry = x_signal_config.get("entry", x_name)
key = (x_name, x_entry, y_name, y_entry)
data_x = self.data.get(key, {}).get("x", [])
data_y = self.data.get(key, {}).get("y", [])
curve.setData(data_x, data_y)
@pyqtSlot(dict, dict)
def on_scan_segment(self, msg, metadata):
"""
Handle new scan segments and saves data to a dictionary. Linked through bec_dispatcher.
Args:
msg (dict): Message received with scan data.
metadata (dict): Metadata of the scan.
"""
current_scanID = msg.get("scanID", None)
if current_scanID is None:
return
if current_scanID != self.scanID:
if self.scan_types is False:
self.plot_data = self.plot_data_config
elif self.scan_types is True:
currentName = metadata.get("scan_name")
if currentName is None:
raise ValueError(
f"Scan name not found in metadata. Please check the scan_name in the YAML config or in bec "
f"configuration."
)
self.plot_data = self.plot_data_config.get(currentName, [])
if self.plot_data == []:
raise ValueError(
f"Scan name {currentName} not found in the YAML config. Please check the scan_name in the "
f"YAML config or in bec configuration."
)
# Init UI
self.init_ui(self.plot_settings["num_columns"])
self.scanID = current_scanID
self.data = {}
self.init_curves()
for plot_config in self.plot_data:
plot_name = plot_config.get("plot_name", "Unnamed Plot")
x_config = plot_config["x"]
x_signal_config = x_config["signals"][0] # Assuming there's at least one signal for x
x_name = x_signal_config.get("name", "")
if not x_name:
raise ValueError(f"Name for x signal must be specified in plot: {plot_name}.")
x_entry_list = x_signal_config.get("entry", [])
if not x_entry_list:
x_entry_list = (
self.dev[x_name]._hints if hasattr(self.dev[x_name], "_hints") else [x_name]
)
if not isinstance(x_entry_list, list):
x_entry_list = [x_entry_list]
y_configs = plot_config["y"]["signals"]
for x_entry in x_entry_list:
for y_config in y_configs:
y_name = y_config.get("name", "")
if not y_name:
raise ValueError(
f"Name for y signal must be specified in plot: {plot_name}."
)
y_entry_list = y_config.get("entry", [])
if not y_entry_list:
y_entry_list = (
self.dev[y_name]._hints
if hasattr(self.dev[y_name], "_hints")
else [y_name]
)
if not isinstance(y_entry_list, list):
y_entry_list = [y_entry_list]
for y_entry in y_entry_list:
key = (x_name, x_entry, y_name, y_entry)
data_x = msg["data"].get(x_name, {}).get(x_entry, {}).get("value", None)
data_y = msg["data"].get(y_name, {}).get(y_entry, {}).get("value", None)
if data_x is None:
raise ValueError(
f"Incorrect entry '{x_entry}' specified for x in plot: {plot_name}, x name: {x_name}"
)
if data_y is None:
if hasattr(self.dev[y_name], "_hints"):
raise ValueError(
f"Incorrect entry '{y_entry}' specified for y in plot: {plot_name}, y name: {y_name}"
)
else:
raise ValueError(
f"No hints available for y in plot: {plot_name}, and name '{y_name}' did not work as entry"
)
if data_x is not None:
self.data.setdefault(key, {}).setdefault("x", []).append(data_x)
if data_y is not None:
self.data.setdefault(key, {}).setdefault("y", []).append(data_y)
self.update_signal.emit()
if __name__ == "__main__": if __name__ == "__main__":
import sys import sys
from bec_widgets.bec_dispatcher import bec_dispatcher
client = bec_dispatcher.client
client.start()
app = QApplication(sys.argv) app = QApplication(sys.argv)
monitor = BECDeviceMonitor() monitor = BECDeviceMonitor()