1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-03-10 10:47:49 +01:00

Merge branch 'extreme_app'

This commit is contained in:
wyzula-jan
2023-08-29 14:48:33 +02:00
3 changed files with 273 additions and 11 deletions

View File

@@ -21,8 +21,7 @@ class _BECDap:
# Adding a new pyqt signal requres a class factory, as they must be part of the class definition
# and cannot be dynamically added as class attributes after the class has been defined.
_signal_class_factory = (
type(f"Signal{i}", (QObject,), dict(signal=pyqtSignal("PyQt_PyObject")))
for i in itertools.count()
type(f"Signal{i}", (QObject,), dict(signal=pyqtSignal(dict, dict))) for i in itertools.count()
)
@@ -99,7 +98,10 @@ class _BECDispatcher(QObject):
def cb(msg):
msg = BECMessage.MessageReader.loads(msg.value)
self._connections[topic].signal.emit(msg)
if not isinstance(msg, list):
msg = [msg]
for msg_i in msg:
self._connections[topic].signal.emit(msg_i.content, msg_i.metadata)
consumer = self.client.connector.consumer(topics=topic, cb=cb)
consumer.start()
@@ -132,7 +134,10 @@ class _BECDispatcher(QObject):
def _dap_cb(msg):
msg = BECMessage.ProcessedDataMessage.loads(msg.value)
self.new_dap_data.emit(msg.content["data"], msg.metadata)
if not isinstance(msg, list):
msg = [msg]
for i in msg:
self.new_dap_data.emit(i.content["data"], i.metadata)
dap_ep = MessageEndpoints.processed_data(dap_name)
consumer = self.client.connector.consumer(topics=dap_ep, cb=_dap_cb)

View File

@@ -0,0 +1,245 @@
import os
import PyQt5.QtWidgets
import numpy as np
import pyqtgraph as pg
from PyQt5.QtCore import pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import QApplication, QWidget
from PyQt5.QtWidgets import QTableWidgetItem
from pyqtgraph import mkBrush, mkPen
from pyqtgraph.Qt import QtCore, uic
from bec_widgets.qt_utils import Crosshair
from bec_lib.core import MessageEndpoints
# TODO implement:
# - implement scanID database for visualizing previous scans
# - multiple signals for different monitors
class PlotApp(QWidget):
"""
Main class for the PlotApp used to plot two signals from the BEC.
Attributes:
update_signal (pyqtSignal): Signal to trigger plot updates.
update_dap_signal (pyqtSignal): Signal to trigger DAP updates.
Args:
x_y_values (list of tuple, optional): List of (x, y) device/signal pairs for plotting.
dap_worker (str, optional): DAP process to specify. Set to None to disable.
parent (QWidget, optional): Parent widget.
"""
update_signal = pyqtSignal()
update_dap_signal = pyqtSignal()
def __init__(self, x_y_values=None, dap_worker=None, parent=None):
super(PlotApp, self).__init__(parent)
current_path = os.path.dirname(__file__)
uic.loadUi(os.path.join(current_path, "oneplot.ui"), self)
self.x_y_values = x_y_values if x_y_values is not None else []
self.dap_worker = dap_worker # if dap_worker is not None else ""
self.x_values = [x for x, y in self.x_y_values]
self.y_values = [y for x, y in self.x_y_values]
self.scanID = None
self.data_x = []
self.data_y = []
self.dap_x = np.array([])
self.dap_y = np.array([])
self.fit = None
self.init_ui()
self.init_curves()
self.hook_crosshair()
self.proxy_update_plot = pg.SignalProxy(
self.update_signal, rateLimit=25, slot=self.update_plot
)
self.proxy_update_fit = pg.SignalProxy(
self.update_dap_signal, rateLimit=25, slot=self.update_fit_table
)
def init_ui(self) -> None:
"""Initialize the UI components."""
self.plot = pg.PlotItem(title=self.y_values[0])
self.glw.addItem(self.plot)
self.plot.setLabel("bottom", self.x_values[0])
self.plot.setLabel("left", self.y_values[0])
self.plot.addLegend()
def init_curves(self) -> None:
"""Initialize curve data and properties."""
self.plot.clear()
self.curves_data = []
self.curves_dap = []
self.pens = []
self.brushs = [] # todo check if needed
color_list = [
"#384c6b",
"#e28a2b",
"#5E3023",
"#e41a1c",
"#984e83",
"#4daf4a",
] # todo change to cmap
for ii, monitor in enumerate(self.y_values):
pen_curve = mkPen(color=color_list[ii], width=2, style=QtCore.Qt.DashLine)
brush = mkBrush(color=color_list[ii], width=2, style=QtCore.Qt.DashLine)
curve_data = pg.PlotDataItem(
pen=pen_curve,
skipFiniteCheck=True,
symbolBrush=brush,
symbolSize=5,
name=monitor + "_data",
)
self.curves_data.append(curve_data)
self.pens.append(pen_curve)
self.plot.addItem(curve_data)
if self.dap_worker is not None:
pen_dap = mkPen(color=color_list[ii + 1], width=2, style=QtCore.Qt.DashLine)
curve_dap = pg.PlotDataItem(pen=pen_dap, size=5, name=monitor + "_fit")
self.curves_dap.append(curve_dap)
self.plot.addItem(curve_dap)
self.tableWidget_crosshair.setRowCount(len(self.y_values))
self.tableWidget_crosshair.setVerticalHeaderLabels(self.y_values)
self.hook_crosshair()
def hook_crosshair(self) -> None:
"""Attach the crosshair to the plot."""
self.crosshair_1d = Crosshair(self.plot, precision=3)
self.crosshair_1d.coordinatesChanged1D.connect(
lambda x, y: self.update_table(self.tableWidget_crosshair, x, y, column=0)
)
self.crosshair_1d.coordinatesClicked1D.connect(
lambda x, y: self.update_table(self.tableWidget_crosshair, x, y, column=1)
)
def update_table(
self, table_widget: PyQt5.QtWidgets.QTableWidget, x: float, y_values: list, column: int
) -> None:
for i, y in enumerate(y_values):
table_widget.setItem(i, column, QTableWidgetItem(f"({x}, {y})"))
table_widget.resizeColumnsToContents()
def update_plot(self) -> None:
"""Update the plot data."""
self.curves_data[0].setData(self.data_x, self.data_y)
if self.dap_worker is not None:
self.curves_dap[0].setData(self.dap_x, self.dap_y)
def update_fit_table(self):
"""Update the table for fit data."""
self.tableWidget_fit.setData(self.fit)
@pyqtSlot(dict, dict)
def on_dap_update(self, msg: dict, metadata: dict) -> None:
"""
Update DAP related data.
Args:
msg (dict): Message received with data.
metadata (dict): Metadata of the DAP.
"""
self.dap_x = msg[self.dap_worker]["x"]
self.dap_y = msg[self.dap_worker]["y"]
self.fit = metadata["fit_parameters"]
self.update_dap_signal.emit()
@pyqtSlot(dict, dict)
def on_scan_segment(self, msg: dict, metadata: dict):
"""
Handle new scan segments.
Args:
msg (dict): Message received with scan data.
metadata (dict): Metadata of the scan.
"""
current_scanID = msg["scanID"]
if current_scanID != self.scanID:
self.scanID = current_scanID
self.data_x = []
self.data_y = []
self.init_curves()
dev_x = self.x_values[0]
dev_y = self.y_values[0]
# TODO put warning that I am putting 1st one
data_x = msg["data"][dev_x][dev[dev_x]._hints[0]]["value"]
data_y = msg["data"][dev_y][dev[dev_y]._hints[0]]["value"]
self.data_x.append(data_x)
self.data_y.append(data_y)
self.update_signal.emit()
if __name__ == "__main__":
import argparse
import ast
from bec_widgets import ctrl_c
from bec_widgets.bec_dispatcher import bec_dispatcher
parser = argparse.ArgumentParser()
parser.add_argument(
"--x_y_values",
type=str,
default="[('samx', 'gauss_bpm')]",
help="Specify x y device/signals pairs for plotting as [tuple(str,str)]",
)
parser.add_argument("--dap_worker", type=str, default=None, help="Specify the DAP process")
args = parser.parse_args()
try:
x_y_values = ast.literal_eval(args.x_y_values)
if not all(isinstance(item, tuple) and len(item) == 2 for item in x_y_values):
raise ValueError("Invalid format: All elements must be 2-tuples.")
except (ValueError, SyntaxError):
raise ValueError("Invalid input format. Expected a list of 2-tuples.")
# Convert dap_worker to None if it's the string "None", for testing "gaussian_fit_worker_3"
dap_worker = None if args.dap_worker == "None" else args.dap_worker
# Retrieve the dap_process value
# dap_worker = args.dap_worker
# BECclient global variables
client = bec_dispatcher.client
client.start()
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
app = QApplication([])
plotApp = PlotApp(x_y_values=x_y_values, dap_worker=dap_worker)
# Connecting signals from bec_dispatcher
bec_dispatcher.connect_dap_slot(plotApp.on_dap_update, dap_worker)
bec_dispatcher.connect_slot(plotApp.on_scan_segment, MessageEndpoints.scan_segment())
ctrl_c.setup(app)
window = plotApp
window.show()
app.exec_()

View File

@@ -63,9 +63,19 @@ class Crosshair(QObject):
size=10, pen=pg.mkPen(None), brush=pg.mkBrush(color)
)
self.marker_moved_1d.append(marker_moved)
self.marker_clicked_1d.append(marker_clicked)
self.plot_item.addItem(marker_moved)
self.plot_item.addItem(marker_clicked)
# Create glowing effect markers for clicked events
marker_clicked_list = []
for size, alpha in [(18, 64), (14, 128), (10, 255)]:
marker_clicked = pg.ScatterPlotItem(
size=size,
pen=pg.mkPen(None),
brush=pg.mkBrush(color.red(), color.green(), color.blue(), alpha),
)
marker_clicked_list.append(marker_clicked)
self.plot_item.addItem(marker_clicked)
self.marker_clicked_1d.append(marker_clicked_list)
elif isinstance(item, pg.ImageItem): # 2D plot
self.marker_2d = pg.ROI(
[0, 0], size=[1, 1], pen=pg.mkPen("r", width=2), movable=False
@@ -109,7 +119,8 @@ class Crosshair(QObject):
if y_values_1d:
if all(v is None for v in x_values_1d) or all(v is None for v in y_values_1d):
return None, None
return x, y_values_1d
closest_x = min(x_values_1d, key=lambda xi: abs(xi - x)) # Snap x to closest data point
return closest_x, y_values_1d
# Handle 2D plot
if image_2d is not None:
@@ -199,10 +210,11 @@ class Crosshair(QObject):
[round(y_val, self.precision) for y_val in y_values],
)
for i, y_val in enumerate(y_values):
self.marker_clicked_1d[i].setData(
[x if not self.is_log_x else np.log10(x)],
[y_val if not self.is_log_y else np.log10(y_val)],
)
for marker in self.marker_clicked_1d[i]:
marker.setData(
[x if not self.is_log_x else np.log10(x)],
[y_val if not self.is_log_y else np.log10(y_val)],
)
elif isinstance(item, pg.ImageItem):
if x is None or y_values is None:
return