1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-03-08 09:47:48 +01:00

feat: oneplot initialized as an example app for plotting motor vs monitor signals + dispatcher loop over msg

This commit is contained in:
wyzula-jan
2023-08-28 14:07:07 +02:00
parent 413e4356cf
commit 98c0c64e85
2 changed files with 136 additions and 4 deletions

View File

@@ -21,8 +21,7 @@ class _BECDap:
# Adding a new pyqt signal requres a class factory, as they must be part of the class definition
# and cannot be dynamically added as class attributes after the class has been defined.
_signal_class_factory = (
type(f"Signal{i}", (QObject,), dict(signal=pyqtSignal("PyQt_PyObject")))
for i in itertools.count()
type(f"Signal{i}", (QObject,), dict(signal=pyqtSignal(dict, dict))) for i in itertools.count()
)
@@ -99,7 +98,10 @@ class _BECDispatcher(QObject):
def cb(msg):
msg = BECMessage.MessageReader.loads(msg.value)
self._connections[topic].signal.emit(msg)
if not isinstance(msg, list):
msg = [msg]
for msg_i in msg:
self._connections[topic].signal.emit(msg_i.content, msg_i.metadata)
consumer = self.client.connector.consumer(topics=topic, cb=cb)
consumer.start()
@@ -132,7 +134,10 @@ class _BECDispatcher(QObject):
def _dap_cb(msg):
msg = BECMessage.ProcessedDataMessage.loads(msg.value)
self.new_dap_data.emit(msg.content["data"], msg.metadata)
if not isinstance(msg, list):
msg = [msg]
for i in msg:
self.new_dap_data.emit(i.content["data"], i.metadata)
dap_ep = MessageEndpoints.processed_data(dap_name)
consumer = self.client.connector.consumer(topics=dap_ep, cb=_dap_cb)

View File

@@ -0,0 +1,127 @@
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QApplication, QWidget
from bec_lib.core import MessageEndpoints, BECMessage
from pyqtgraph.Qt import QtWidgets, uic
from PyQt5.QtCore import pyqtSignal, Qt
from threading import RLock
import os
import numpy as np
from enum import Enum
import pyqtgraph as pg
from PyQt5 import QtGui
from PyQt5.QtCore import QThread, pyqtSlot
from PyQt5.QtCore import pyqtSignal, Qt
from PyQt5.QtWidgets import QApplication, QWidget
class PlotApp(QWidget):
def __init__(self):
super().__init__()
self.motor_data = None
self.monitor_data = None
current_path = os.path.dirname(__file__)
uic.loadUi(os.path.join(current_path, "oneplot.ui"), self)
self.init_ui()
def init_ui(self):
self.plot = pg.PlotItem()
self.glw.addItem(self.plot)
self.plot.setLabel("bottom", "Motor")
self.plot.setLabel("left", "Monitor")
def on_dap_update(self, msg, metadata):
...
# print("on_dap_update")
# print(f'msg "on_dap_update" = {msg}')
# self.dap_x = msg["gaussian_fit_worker_3"]["x"]
# print(f"self.dap_x = {self.dap_x}")
# print(metadata)
def on_scan_segment(self, msg, metadata):
# TODO x -> motor
# TODO y -> monitor._hints :list
print("on_scan_segment")
# scanMSG = BECMessage.ScanMessage.loads(msg.value)
self.motor_data = msg["samx"]["samx"]["value"]
self.monitor_data = msg["gauss_bpm"]["gauss_bpm"][
"value"
] # gaussbpm._hints -> implement logic with list
# self.scan_x =
# scanMSG = msg.content["data"]
# self.data_x = BECMessage.ScanMessage.loads(msg)
# self.data_y = BECMessage.ScanMessage.loads(msg)
# print(msg)
print(f'msg "on_scan_segment" = {msg}')
def on_new_scan(self, msg, metadata):
...
# print("on_new_scan")
# print(f'msg "on_new_scan" = {msg}')
# print(metadata)
# class Controller(QThread):
# new_scan = pyqtSignal(dict, dict)
# scan_segment = pyqtSignal(dict, dict)
# new_dap_data = pyqtSignal(dict, dict)
#
# def __init__(self):
# super().__init__()
# self.scan_lock = RLock()
#
# def _scan_segment_cb(msg, parent, **_kwargs):
# msg = BECMessage.ScanMessage.loads(msg.value)
# for i in msg:
# with parent.scan_lock:
# # TODO: use ScanStatusMessage instead?
# scan_id = msg.content["scanID"]
# if parent._scan_id != scan_id:
# parent._scan_id = scan_id
# parent.new_scan.emit(msg.content, msg.metadata)
# parent.scan_segment.emit(msg.content, msg.metadata)
#
# scan_segment_topic = MessageEndpoints.scan_segment()
# parent._scan_segment_thread = parent.client.connector.consumer(
# topics=scan_segment_topic,
# cb=_scan_segment_cb,
# )
# parent._scan_segment_thread.start()
#
# @staticmethod
# def _scan_segment_callback(msg, *, parent, **_kwargs) -> None:
# scanMSG = BECMessage.ScanMessage.loads(msg.value)
# self.data_x
if __name__ == "__main__":
# from bec_lib import BECClient
from bec_widgets import ctrl_c
from bec_widgets.bec_dispatcher import bec_dispatcher
client = bec_dispatcher.client
client.start()
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
app = QApplication([])
plotApp = PlotApp()
bec_dispatcher.connect_dap_slot(plotApp.on_dap_update, "gaussian_fit_worker_3")
bec_dispatcher.connect_slot(plotApp.on_scan_segment, MessageEndpoints.scan_segment())
bec_dispatcher.new_scan.connect(plotApp.on_new_scan) # TODO check if works!
# bec_dispatcher.connect_slot(plotApp.on_new_scan,)
ctrl_c.setup(app)
window = plotApp
window.show()
app.exec_()