diff --git a/bec_widgets/cli.py b/bec_widgets/cli.py new file mode 100644 index 00000000..bf1ee2cb --- /dev/null +++ b/bec_widgets/cli.py @@ -0,0 +1,98 @@ +import argparse +import os +from threading import RLock + +from PyQt5 import uic +from PyQt5.QtCore import pyqtSignal +from PyQt5.QtWidgets import QApplication, QMainWindow +from scan_plot import BECScanPlot + +from bec_lib.core import BECMessage, MessageEndpoints, RedisConnector + + +class BEC_UI(QMainWindow): + new_scan_data = pyqtSignal(dict) + new_dap_data = pyqtSignal(dict) # signal per proc instance? + new_scan = pyqtSignal() + + def __init__(self, uipath): + super().__init__() + self._scan_channels = set() + self._dap_channels = set() + + self._scan_thread = None + self._dap_threads = [] + + ui = uic.loadUi(uipath, self) + + _, fname = os.path.split(uipath) + self.setWindowTitle(fname) + + for sp in ui.findChildren(BECScanPlot): + for chan in (sp.x_channel, *sp.y_channel_list): + if chan.startswith("dap."): + chan = chan.partition("dap.")[-1] + self._dap_channels.add(chan) + else: + self._scan_channels.add(chan) + + sp.initialize() # TODO: move this elsewhere? + + self.new_scan_data.connect(sp.redraw_scan) # TODO: merge + self.new_dap_data.connect(sp.redraw_dap) + self.new_scan.connect(sp.clearData) + + # Scan setup + self._scan_id = None + scan_lock = RLock() + + def _scan_cb(msg): + msg = BECMessage.ScanMessage.loads(msg.value) + with scan_lock: + scan_id = msg[0].content["scanID"] + if self._scan_id != scan_id: + self._scan_id = scan_id + self.new_scan.emit() + self.new_scan_data.emit(msg[0].content["data"]) + + bec_connector = RedisConnector("localhost:6379") + + if self._scan_channels: + scan_readback = MessageEndpoints.scan_segment() + self._scan_thread = bec_connector.consumer( + topics=scan_readback, + cb=_scan_cb, + ) + self._scan_thread.start() + + # DAP setup + def _proc_cb(msg): + msg = BECMessage.ProcessedDataMessage.loads(msg.value) + self.new_dap_data.emit(msg.content["data"]) + + if self._dap_channels: + for chan in self._dap_channels: + proc_ep = MessageEndpoints.processed_data(chan) + dap_thread = bec_connector.consumer(topics=proc_ep, cb=_proc_cb) + dap_thread.start() + self._dap_threads.append(dap_thread) + + self.show() + + +def main(): + parser = argparse.ArgumentParser( + prog="bec-pyqt", formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + + parser.add_argument("uipath", type=str, help="Path to a BEC ui file") + + args, rem = parser.parse_known_args() + + app = QApplication(rem) + BEC_UI(args.uipath) + app.exec_() + + +if __name__ == "__main__": + main() diff --git a/bec_widgets/readme.md b/bec_widgets/readme.md new file mode 100644 index 00000000..bda769dc --- /dev/null +++ b/bec_widgets/readme.md @@ -0,0 +1,4 @@ +Add/modify the path in the following variable to make the plugin avaiable in Qt Designer: +``` +$ export PYQTDESIGNERPATH=//bec/bec_qtplugin:$PYQTDESIGNERPATH +``` diff --git a/bec_widgets/scan_plot.py b/bec_widgets/scan_plot.py new file mode 100644 index 00000000..9c74e7a8 --- /dev/null +++ b/bec_widgets/scan_plot.py @@ -0,0 +1,106 @@ +import itertools + +import pyqtgraph as pg +from PyQt5.QtCore import pyqtProperty, pyqtSlot + +from bec_lib.core.logger import bec_logger + +logger = bec_logger.logger + + +pg.setConfigOptions(background="w", foreground="k", antialias=True) +COLORS = ["#fd7f6f", "#7eb0d5", "#b2e061", "#bd7ebe", "#ffb55a"] + + +class BECScanPlot(pg.PlotWidget): + def __init__(self, parent=None, background="default"): + super().__init__(parent, background) + + self._x_channel = "" + self._y_channel_list = [] + + self.scan_curves = {} + self.dap_curves = {} + + def initialize(self): + plot_item = self.getPlotItem() + plot_item.addLegend() + colors = itertools.cycle(COLORS) + + for y_chan in self.y_channel_list: + if y_chan.startswith("dap."): + y_chan = y_chan.partition("dap.")[-1] + curves = self.dap_curves + else: + curves = self.scan_curves + + curves[y_chan] = plot_item.plot( + x=[], y=[], pen=pg.mkPen(color=next(colors), width=2), name=y_chan + ) + + plot_item.setLabel("bottom", self._x_channel) + if len(self.scan_curves) == 1: + plot_item.setLabel("left", next(iter(self.scan_curves))) + + @pyqtSlot() + def clearData(self): + for plot_curve in {**self.scan_curves, **self.dap_curves}.values(): + plot_curve.setData(x=[], y=[]) + + @pyqtSlot(dict) + def redraw_scan(self, data): + if not self.x_channel: + return + + if self.x_channel not in data: + logger.warning(f"Unknown channel `{self.x_channel}` for X data in {self.objectName()}") + return + + x_new = data[self.x_channel][self.x_channel]["value"] + for chan, plot_curve in self.scan_curves.items(): + if not chan: + continue + + if chan not in data: + logger.warning(f"Unknown channel `{chan}` for Y data in {self.objectName()}") + continue + + y_new = data[chan][chan]["value"] + x, y = plot_curve.getData() # TODO: is it a good approach? + if x is None: + x = [] + if y is None: + y = [] + + plot_curve.setData(x=[*x, x_new], y=[*y, y_new]) + + @pyqtSlot(dict) + def redraw_dap(self, data): + for chan, plot_curve in self.dap_curves.items(): + if not chan: + continue + + if chan not in data: + logger.warning(f"Unknown channel `{chan}` for DAP data in {self.objectName()}") + continue + + x_new = data[chan]["x"] + y_new = data[chan]["y"] + + plot_curve.setData(x=x_new, y=y_new) + + @pyqtProperty("QStringList") + def y_channel_list(self): + return self._y_channel_list + + @y_channel_list.setter + def y_channel_list(self, new_list): + self._y_channel_list = new_list + + @pyqtProperty(str) + def x_channel(self): + return self._x_channel + + @x_channel.setter + def x_channel(self, new_val): + self._x_channel = new_val diff --git a/bec_widgets/scan_plot_plugin.py b/bec_widgets/scan_plot_plugin.py new file mode 100644 index 00000000..2c6bff17 --- /dev/null +++ b/bec_widgets/scan_plot_plugin.py @@ -0,0 +1,55 @@ +from PyQt5.QtDesigner import QPyDesignerCustomWidgetPlugin +from PyQt5.QtGui import QIcon +from scan_plot import BECScanPlot + + +class BECScanPlotPlugin(QPyDesignerCustomWidgetPlugin): + def __init__(self, parent=None): + super(BECScanPlotPlugin, self).__init__(parent) + + self._initialized = False + + def initialize(self, formEditor): + if self._initialized: + return + + self._initialized = True + + def isInitialized(self): + return self._initialized + + def createWidget(self, parent): + return BECScanPlot(parent) + + def name(self): + return "BECScanPlot" + + def group(self): + return "BEC widgets" + + def icon(self): + return QIcon() + + def toolTip(self): + return "BEC plot for scans" + + def whatsThis(self): + return "BEC plot for scans" + + def isContainer(self): + return False + + def domXml(self): + return ( + '\n' + ' \n' + " BEC plot for scans\n" + " \n" + ' \n' + " BEC plot for scans in Python using PyQt.\n" + " \n" + "\n" + ) + + def includeFile(self): + return "scan_plot"