0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-13 11:11:49 +02:00

Merge branch 'qtdesig_widgets' into 'master'

feat: emit the full bec message to slots

See merge request bec/bec-widgets!2
This commit is contained in:
2023-08-04 14:02:17 +02:00
9 changed files with 373 additions and 128 deletions

View File

@ -0,0 +1,93 @@
from dataclasses import dataclass
from threading import RLock
from bec_lib import BECClient
from bec_lib.core import BECMessage, MessageEndpoints
from bec_lib.core.redis_connector import RedisConsumerThreaded
from PyQt5.QtCore import QObject, pyqtSignal
@dataclass
class _BECDap:
"""Utility class to keep track of slots associated with a particular dap redis consumer"""
consumer: RedisConsumerThreaded
slots = set()
class _BECDispatcher(QObject):
new_scan = pyqtSignal(dict, dict)
scan_segment = pyqtSignal(dict, dict)
new_dap_data = pyqtSignal(dict)
def __init__(self):
super().__init__()
self.client = BECClient()
self.client.start()
self._slot_signal_map = {
"on_scan_segment": self.scan_segment,
"on_new_scan": self.new_scan,
}
self._daps = {}
self._scan_id = None
scan_lock = RLock()
def _scan_segment_cb(scan_segment, metadata):
with scan_lock:
# TODO: use ScanStatusMessage instead?
scan_id = metadata["scanID"]
if self._scan_id != scan_id:
self._scan_id = scan_id
self.new_scan.emit(scan_segment, metadata)
self.scan_segment.emit(scan_segment, metadata)
self.client.callbacks.register("scan_segment", _scan_segment_cb, sync=False)
def connect(self, widget):
for slot_name, signal in self._slot_signal_map.items():
slot = getattr(widget, slot_name, None)
if callable(slot):
signal.connect(slot)
def connect_dap_slot(self, slot, dap_name):
if dap_name not in self._daps:
# create a new consumer and connect slot
def _dap_cb(msg):
msg = BECMessage.ProcessedDataMessage.loads(msg.value)
self.new_dap_data.emit(msg.content["data"])
dap_ep = MessageEndpoints.processed_data(dap_name)
consumer = self.client.connector.consumer(topics=dap_ep, cb=_dap_cb)
consumer.start()
self.new_dap_data.connect(slot)
self._daps[dap_name] = _BECDap(consumer)
self._daps[dap_name].slots.add(slot)
else:
# connect slot if it's not yet connected
if slot not in self._daps[dap_name].slots:
self.new_dap_data.connect(slot)
self._daps[dap_name].slots.add(slot)
def disconnect_dap_slot(self, slot, dap_name):
if dap_name not in self._daps:
return
if slot not in self._daps[dap_name].slots:
return
self.new_dap_data.disconnect(slot)
self._daps[dap_name].slots.remove(slot)
if not self._daps[dap_name].slots:
# shutdown consumer if there are no more connected slots
self._daps[dap_name].consumer.shutdown()
del self._daps[dap_name]
bec_dispatcher = _BECDispatcher()

View File

@ -1,98 +0,0 @@
import argparse
import os
from threading import RLock
from bec_lib.core import BECMessage, MessageEndpoints, RedisConnector
from PyQt5 import uic
from PyQt5.QtCore import pyqtSignal
from PyQt5.QtWidgets import QApplication, QMainWindow
from .scan_plot import BECScanPlot
class BEC_UI(QMainWindow):
new_scan_data = pyqtSignal(dict)
new_dap_data = pyqtSignal(dict) # signal per proc instance?
new_scan = pyqtSignal()
def __init__(self, uipath):
super().__init__()
self._scan_channels = set()
self._dap_channels = set()
self._scan_thread = None
self._dap_threads = []
ui = uic.loadUi(uipath, self)
_, fname = os.path.split(uipath)
self.setWindowTitle(fname)
for sp in ui.findChildren(BECScanPlot):
for chan in (sp.x_channel, *sp.y_channel_list):
if chan.startswith("dap."):
chan = chan.partition("dap.")[-1]
self._dap_channels.add(chan)
else:
self._scan_channels.add(chan)
sp.initialize() # TODO: move this elsewhere?
self.new_scan_data.connect(sp.redraw_scan) # TODO: merge
self.new_dap_data.connect(sp.redraw_dap)
self.new_scan.connect(sp.clearData)
# Scan setup
self._scan_id = None
scan_lock = RLock()
def _scan_cb(msg):
msg = BECMessage.ScanMessage.loads(msg.value)
with scan_lock:
scan_id = msg[0].content["scanID"]
if self._scan_id != scan_id:
self._scan_id = scan_id
self.new_scan.emit()
self.new_scan_data.emit(msg[0].content["data"])
bec_connector = RedisConnector("localhost:6379")
if self._scan_channels:
scan_readback = MessageEndpoints.scan_segment()
self._scan_thread = bec_connector.consumer(
topics=scan_readback,
cb=_scan_cb,
)
self._scan_thread.start()
# DAP setup
def _proc_cb(msg):
msg = BECMessage.ProcessedDataMessage.loads(msg.value)
self.new_dap_data.emit(msg.content["data"])
if self._dap_channels:
for chan in self._dap_channels:
proc_ep = MessageEndpoints.processed_data(chan)
dap_thread = bec_connector.consumer(topics=proc_ep, cb=_proc_cb)
dap_thread.start()
self._dap_threads.append(dap_thread)
self.show()
def main():
parser = argparse.ArgumentParser(
prog="bec-pyqt", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument("uipath", type=str, help="Path to a BEC ui file")
args, rem = parser.parse_known_args()
app = QApplication(rem)
BEC_UI(args.uipath)
app.exec_()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,33 @@
import os
import sys
from PyQt5 import QtWidgets, uic
class UI(QtWidgets.QWidget):
def __init__(self, uipath):
super().__init__()
self.ui = uic.loadUi(uipath, self)
_, fname = os.path.split(uipath)
self.setWindowTitle(fname)
self.show()
def main():
"""A basic script to display UI file
Run the script, passing UI file path as an argument, e.g.
$ python bec_widgets/display_ui_file.py bec_widgets/line_plot.ui
"""
app = QtWidgets.QApplication(sys.argv)
UI(sys.argv[1])
sys.exit(app.exec_())
if __name__ == "__main__":
main()

View File

@ -0,0 +1,56 @@
from PyQt5.QtDesigner import QPyDesignerCustomWidgetPlugin
from PyQt5.QtGui import QIcon
from bec_widgets.scan2d_plot import BECScanPlot2D
class BECScanPlot2DPlugin(QPyDesignerCustomWidgetPlugin):
def __init__(self, parent=None):
super().__init__(parent)
self._initialized = False
def initialize(self, formEditor):
if self._initialized:
return
self._initialized = True
def isInitialized(self):
return self._initialized
def createWidget(self, parent):
return BECScanPlot2D(parent)
def name(self):
return "BECScanPlot2D"
def group(self):
return "BEC widgets"
def icon(self):
return QIcon()
def toolTip(self):
return "BEC plot for 2D scans"
def whatsThis(self):
return "BEC plot for 2D scans"
def isContainer(self):
return False
def domXml(self):
return (
'<widget class="BECScanPlot2D" name="BECScanPlot2D">\n'
' <property name="toolTip" >\n'
" <string>BEC plot for 2D scans</string>\n"
" </property>\n"
' <property name="whatsThis" >\n'
" <string>BEC plot for 2D scans in Python using PyQt.</string>\n"
" </property>\n"
"</widget>\n"
)
def includeFile(self):
return "scan2d_plot"

View File

@ -1,12 +1,12 @@
from PyQt5.QtDesigner import QPyDesignerCustomWidgetPlugin
from PyQt5.QtGui import QIcon
from .scan_plot import BECScanPlot
from bec_widgets.scan_plot import BECScanPlot
class BECScanPlotPlugin(QPyDesignerCustomWidgetPlugin):
def __init__(self, parent=None):
super(BECScanPlotPlugin, self).__init__(parent)
super().__init__(parent)
self._initialized = False

View File

@ -1,6 +1,11 @@
Add/modify the path in the following variable to make the plugin avaiable in Qt Designer:
```
$ export PYQTDESIGNERPATH=/<path to repo>/bec/bec_qtplugin:$PYQTDESIGNERPATH
$ export PYQTDESIGNERPATH=/<path to repo>/bec_widgets/qtdesigner_plugins
```
It can be done when activating a conda environment (run with the corresponding env already activated):
```
$ conda env config vars set PYQTDESIGNERPATH=/<path to repo>/bec_widgets/qtdesigner_plugins
```
All the available conda-forge `pyqt >=5.15` packages don't seem to support loading Qt Designer

140
bec_widgets/scan2d_plot.py Normal file
View File

@ -0,0 +1,140 @@
import numpy as np
import pyqtgraph as pg
from bec_lib.core.logger import bec_logger
from PyQt5.QtCore import pyqtProperty, pyqtSlot
from bec_widgets.bec_dispatcher import bec_dispatcher
logger = bec_logger.logger
pg.setConfigOptions(background="w", foreground="k", antialias=True)
class BECScanPlot2D(pg.GraphicsView):
def __init__(self, parent=None, background="default"):
super().__init__(parent, background)
bec_dispatcher.connect(self)
self._x_channel = ""
self._y_channel = ""
self._z_channel = ""
self._xpos = []
self._ypos = []
self._x_ind = None
self._y_ind = None
self.plot_item = pg.PlotItem()
self.setCentralItem(self.plot_item)
self.plot_item.setAspectLocked(True)
self.imageItem = pg.ImageItem()
self.plot_item.addItem(self.imageItem)
@pyqtSlot(dict, dict)
def on_new_scan(self, _scan_segment, metadata):
# TODO: Do we reset in case of a scan type change?
self.imageItem.clear()
# TODO: better to check the number of coordinates in metadata["positions"]?
if metadata["scan_name"] != "grid_scan":
return
positions = [sorted(set(pos)) for pos in zip(*metadata["positions"])]
motors = metadata["scan_motors"]
if self.x_channel and self.y_channel:
self._x_ind = motors.index(self.x_channel) if self.x_channel in motors else None
self._y_ind = motors.index(self.y_channel) if self.y_channel in motors else None
elif not self.x_channel and not self.y_channel:
# Plot the first and second motors along x and y axes respectively
self._x_ind = 0
self._y_ind = 1
else:
logger.warning(
f"X and Y channels should be either both empty or both set in {self.objectName()}"
)
if self._x_ind is None or self._y_ind is None:
return
xpos = positions[self._x_ind]
ypos = positions[self._y_ind]
self._xpos = xpos
self._ypos = ypos
self.imageItem.setImage(np.zeros(shape=(len(xpos), len(ypos))))
w = max(xpos) - min(xpos)
h = max(ypos) - min(ypos)
w_pix = w / (len(xpos) - 1)
h_pix = h / (len(ypos) - 1)
self.imageItem.setRect(min(xpos) - w_pix / 2, min(ypos) - h_pix / 2, w + w_pix, h + h_pix)
self.plot_item.setLabel("bottom", motors[self._x_ind])
self.plot_item.setLabel("left", motors[self._y_ind])
@pyqtSlot(dict, dict)
def on_scan_segment(self, scan_segment, metadata):
if not self.z_channel or metadata["scan_name"] != "grid_scan":
return
if self._x_ind is None or self._y_ind is None:
return
point_coord = metadata["positions"][scan_segment["point_id"]]
x_coord_ind = self._xpos.index(point_coord[self._x_ind])
y_coord_ind = self._ypos.index(point_coord[self._y_ind])
data = scan_segment["data"]
z_new = data[self.z_channel][self.z_channel]["value"]
image = self.imageItem.image
image[x_coord_ind, y_coord_ind] = z_new
self.imageItem.setImage()
@pyqtProperty(str)
def x_channel(self):
return self._x_channel
@x_channel.setter
def x_channel(self, new_val):
self._x_channel = new_val
self.plot_item.setLabel("bottom", new_val)
@pyqtProperty(str)
def y_channel(self):
return self._y_channel
@y_channel.setter
def y_channel(self, new_val):
self._y_channel = new_val
self.plot_item.setLabel("left", new_val)
@pyqtProperty(str)
def z_channel(self):
return self._z_channel
@z_channel.setter
def z_channel(self, new_val):
self._z_channel = new_val
if __name__ == "__main__":
import sys
from PyQt5.QtWidgets import QApplication
app = QApplication(sys.argv)
plot = BECScanPlot2D()
# If x_channel and y_channel are both omitted, they will be inferred from each running grid scan
plot.z_channel = "bpm3y"
plot.show()
sys.exit(app.exec_())

View File

@ -4,6 +4,8 @@ import pyqtgraph as pg
from bec_lib.core.logger import bec_logger
from PyQt5.QtCore import pyqtProperty, pyqtSlot
from bec_widgets.bec_dispatcher import bec_dispatcher
logger = bec_logger.logger
@ -11,9 +13,13 @@ pg.setConfigOptions(background="w", foreground="k", antialias=True)
COLORS = ["#fd7f6f", "#7eb0d5", "#b2e061", "#bd7ebe", "#ffb55a"]
class BECScanPlot(pg.PlotWidget):
class BECScanPlot(pg.GraphicsView):
def __init__(self, parent=None, background="default"):
super().__init__(parent, background)
bec_dispatcher.connect(self)
self.view = pg.PlotItem()
self.setCentralItem(self.view)
self._x_channel = ""
self._y_channel_list = []
@ -21,36 +27,18 @@ class BECScanPlot(pg.PlotWidget):
self.scan_curves = {}
self.dap_curves = {}
def initialize(self):
plot_item = self.getPlotItem()
plot_item.addLegend()
colors = itertools.cycle(COLORS)
for y_chan in self.y_channel_list:
if y_chan.startswith("dap."):
y_chan = y_chan.partition("dap.")[-1]
curves = self.dap_curves
else:
curves = self.scan_curves
curves[y_chan] = plot_item.plot(
x=[], y=[], pen=pg.mkPen(color=next(colors), width=2), name=y_chan
)
plot_item.setLabel("bottom", self._x_channel)
if len(self.scan_curves) == 1:
plot_item.setLabel("left", next(iter(self.scan_curves)))
@pyqtSlot()
def clearData(self):
@pyqtSlot(dict, dict)
def on_new_scan(self, _scan_segment, _metadata):
for plot_curve in {**self.scan_curves, **self.dap_curves}.values():
plot_curve.setData(x=[], y=[])
@pyqtSlot(dict)
def redraw_scan(self, data):
@pyqtSlot(dict, dict)
def on_scan_segment(self, scan_segment, _metadata):
if not self.x_channel:
return
data = scan_segment["data"]
if self.x_channel not in data:
logger.warning(f"Unknown channel `{self.x_channel}` for X data in {self.objectName()}")
return
@ -94,8 +82,35 @@ class BECScanPlot(pg.PlotWidget):
@y_channel_list.setter
def y_channel_list(self, new_list):
# TODO: do we want to care about dap/not dap here?
chan_removed = [chan for chan in self._y_channel_list if chan not in new_list]
if chan_removed and chan_removed[0].startswith("dap."):
chan_removed = chan_removed[0].partition("dap.")[-1]
bec_dispatcher.disconnect_dap_slot(self.redraw_dap, chan_removed)
self._y_channel_list = new_list
# Prepare plot for a potentially different list of y channels
self.view.clear()
self.view.addLegend()
colors = itertools.cycle(COLORS)
for y_chan in new_list:
if y_chan.startswith("dap."):
y_chan = y_chan.partition("dap.")[-1]
curves = self.dap_curves
bec_dispatcher.connect_dap_slot(self.redraw_dap, y_chan)
else:
curves = self.scan_curves
curves[y_chan] = self.view.plot(
x=[], y=[], pen=pg.mkPen(color=next(colors), width=2), name=y_chan
)
if len(new_list) == 1:
self.view.setLabel("left", new_list[0])
@pyqtProperty(str)
def x_channel(self):
return self._x_channel
@ -103,6 +118,7 @@ class BECScanPlot(pg.PlotWidget):
@x_channel.setter
def x_channel(self, new_val):
self._x_channel = new_val
self.view.setLabel("bottom", new_val)
if __name__ == "__main__":
@ -113,9 +129,9 @@ if __name__ == "__main__":
app = QApplication(sys.argv)
plot = BECScanPlot()
plot.y_channel_list = ["a", "b", "c"]
plot.x_channel = "samx"
plot.y_channel_list = ["bpm3y", "bpm6y"]
plot.initialize()
plot.show()
sys.exit(app.exec_())