1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-03-08 01:37:52 +01:00

refactor: plot update via proxy

This commit is contained in:
wyzula-jan
2023-08-28 16:05:01 +02:00
parent ff545bf5c9
commit fc4b54239e

View File

@@ -1,89 +1,125 @@
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QApplication, QWidget
from bec_lib.core import MessageEndpoints, BECMessage
from pyqtgraph.Qt import QtWidgets, uic
from PyQt5.QtCore import pyqtSignal, Qt
from threading import RLock
from bec_lib.core import MessageEndpoints
import os
import numpy as np
from enum import Enum
import pyqtgraph as pg
from PyQt5 import QtGui
from PyQt5.QtCore import QThread, pyqtSlot
from PyQt5.QtCore import pyqtSignal, Qt
from PyQt5.QtCore import pyqtSignal
from PyQt5.QtCore import pyqtSlot
from PyQt5.QtWidgets import QApplication, QWidget
from pyqtgraph import mkPen
from pyqtgraph import mkBrush, mkPen
from pyqtgraph.Qt import QtCore
from pyqtgraph import mkBrush, mkColor, mkPen
from pyqtgraph.Qt import uic
from bec_lib.core import MessageEndpoints
# TODO implement:
# - add crosshair
# - add crosshair table
# - implement scanID database for visualizing previous scans
# - multiple signals for different monitors
# - user can choose what motor against what monitor to plot
class PlotApp(QWidget):
update_plot = pyqtSignal()
# update_scatters = pyqtSignal()
init_plot = pyqtSignal()
update_signal = pyqtSignal()
def __init__(self):
super().__init__()
self.scanID = None
self.motor_data = []
self.monitor_data = []
self.dap_x = []
self.dap_y = []
current_path = os.path.dirname(__file__)
uic.loadUi(os.path.join(current_path, "oneplot.ui"), self)
self.monitor_names = ["gauss_bpm"]
self.init_ui()
self.init_curves()
self.init_plot.connect(self.init_curves)
self.update_plot.connect(self.update_curves)
self.proxy_update = pg.SignalProxy(self.update_signal, rateLimit=25, slot=self.update)
def init_ui(self):
self.plot = pg.PlotItem()
self.glw.addItem(self.plot)
self.plot.setLabel("bottom", "Motor")
self.plot.setLabel("left", "Monitor")
self.plot.addLegend()
def init_curves(self):
self.plot.clear()
self.curves = []
self.scatters = []
self.pens = []
self.brushs = []
self.brushs = [] # todo check if needed
color_list = ["#384c6b", "#e28a2b", "#5E3023", "#e41a1c", "#984e83", "#4daf4a"]
for ii, monitor in enumerate(self.monitor_names):
print(self.monitor_names[ii])
pen = mkPen(color=color_list[ii], width=2, style=QtCore.Qt.DashLine)
brush = mkBrush(color=color_list[ii])
curve = pg.PlotDataItem(symbolBrush=brush, pen=pen, skipFiniteCheck=True, name=monitor)
self.plot.addItem(curve)
# brush = mkBrush(color=color_list[ii])
curve = pg.PlotDataItem(
pen=pen, skipFiniteCheck=True, name=monitor + " fit"
) # ,symbolBrush=brush)
scatter = pg.ScatterPlotItem(pen=pen, size=10, name=monitor) # ,brush=brush,)
self.curves.append(curve)
self.scatters.append(scatter)
self.pens.append(pen)
self.brushs.append(brush)
# self.brushs.append(brush)
self.plot.addItem(curve)
self.plot.addItem(scatter)
self.plot.addLegend()
# self.plot.addLegend() # TODO check if needed
# TODO hook signals
# TODO hook crosshair
def update_curves(self):
def update(self):
self.curves[0].setData(self.motor_data, self.monitor_data)
self.scatters[0].setData(self.motor_data, self.monitor_data)
def on_dap_update(self, msg, metadata):
@pyqtSlot(dict, dict)
def on_dap_update(self, msg, metadata) -> None:
"""
Getting processed data from DAP
Args:
msg (dict):
metadata(dict):
"""
...
# print("on_dap_update")
print("on_dap_update")
# print(f'msg "on_dap_update" = {msg}')
# self.dap_x = msg["gaussian_fit_worker_3"]["x"]
# print(f"self.dap_x = {self.dap_x}")
dap_x = msg["gaussian_fit_worker_3"]["x"]
dap_y = msg["gaussian_fit_worker_3"]["y"]
self.dap_x.append(dap_x)
self.dap_y.append(dap_y)
# self.update_signal.emit()
# print(metadata)
@pyqtSlot(dict, dict)
def on_scan_segment(self, msg, metadata):
# TODO x -> motor
# TODO y -> monitor._hints :list
print("on_scan_segment")
# scanMSG = BECMessage.ScanMessage.loads(msg.value)
# message = msg
# print(f"message = {message}")
current_scanID = msg["scanID"]
print(f"current_scanID = {current_scanID}")
# implement if condition that if scan id is different than last one init new scan variables
if current_scanID != self.scanID:
self.scanID = current_scanID
self.motor_data = []
self.monitor_data = []
self.init_curves()
motor_data = msg["data"]["samx"]["samx"]["value"]
monitor_data = msg["data"]["gauss_bpm"]["gauss_bpm"][
"value"
@@ -92,49 +128,27 @@ class PlotApp(QWidget):
self.motor_data.append(motor_data)
self.monitor_data.append(monitor_data)
self.update_plot.emit()
# self.update_plot.emit()
self.update_signal.emit()
def on_new_scan(self, msg, metadata):
print(30 * "#" + "NEW SCAN" + 30 * "#")
@pyqtSlot(dict, dict)
def on_new_scan(self, msg, metadata): # TODO probably not needed
"""
Initiate new scan and clear previous data
Args:
msg(dict):
metadata(dict):
self.motor_data = [msg["data"]["samx"]["samx"]["value"]]
self.monitor_data = [msg["data"]["gauss_bpm"]["gauss_bpm"]["value"]]
self.init_curves()
print(f'msg "on_new_scan" = {msg}')
# print(metadata)
Returns:
"""
print(40 * "#" + "on_new_scan" + 40 * "#")
# self.motor_data = [msg["data"]["samx"]["samx"]["value"]]
# self.monitor_data = [msg["data"]["gauss_bpm"]["gauss_bpm"]["value"]]
# self.init_curves()
# class Controller(QThread):
# new_scan = pyqtSignal(dict, dict)
# scan_segment = pyqtSignal(dict, dict)
# new_dap_data = pyqtSignal(dict, dict)
#
# def __init__(self):
# super().__init__()
# self.scan_lock = RLock()
#
# def _scan_segment_cb(msg, parent, **_kwargs):
# msg = BECMessage.ScanMessage.loads(msg.value)
# for i in msg:
# with parent.scan_lock:
# # TODO: use ScanStatusMessage instead?
# scan_id = msg.content["scanID"]
# if parent._scan_id != scan_id:
# parent._scan_id = scan_id
# parent.new_scan.emit(msg.content, msg.metadata)
# parent.scan_segment.emit(msg.content, msg.metadata)
#
# scan_segment_topic = MessageEndpoints.scan_segment()
# parent._scan_segment_thread = parent.client.connector.consumer(
# topics=scan_segment_topic,
# cb=_scan_segment_cb,
# )
# parent._scan_segment_thread.start()
#
# @staticmethod
# def _scan_segment_callback(msg, *, parent, **_kwargs) -> None:
# scanMSG = BECMessage.ScanMessage.loads(msg.value)
# self.data_x
if __name__ == "__main__":
# from bec_lib import BECClient