mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 03:31:50 +02:00
feat: device_consumer is getting scanID and initialise stream_consumer
This commit is contained in:
@ -1,22 +1,19 @@
|
|||||||
# import simulation_progress as SP
|
# import simulation_progress as SP
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pyqtgraph as pg
|
import pyqtgraph as pg
|
||||||
|
from PyQt5.QtCore import pyqtSignal, pyqtSlot
|
||||||
from PyQt5.QtCore import pyqtSlot, pyqtSignal
|
|
||||||
from PyQt5.QtWidgets import (
|
from PyQt5.QtWidgets import (
|
||||||
QApplication,
|
QApplication,
|
||||||
QVBoxLayout,
|
QVBoxLayout,
|
||||||
QLabel,
|
|
||||||
QWidget,
|
QWidget,
|
||||||
QProgressBar,
|
|
||||||
QPushButton,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
from bec_lib.core import MessageEndpoints, RedisConnector, BECMessage
|
from bec_lib.core import MessageEndpoints, BECMessage
|
||||||
|
|
||||||
|
|
||||||
class StreamApp(QWidget):
|
class StreamApp(QWidget):
|
||||||
update_signal = pyqtSignal()
|
update_signal = pyqtSignal()
|
||||||
|
new_scanID = pyqtSignal(str)
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
@ -24,11 +21,13 @@ class StreamApp(QWidget):
|
|||||||
self.init_ui()
|
self.init_ui()
|
||||||
|
|
||||||
self.data = None
|
self.data = None
|
||||||
# self.scanID = None
|
self.scanID = None
|
||||||
self.stream_consumer = None
|
self.stream_consumer = None
|
||||||
|
|
||||||
|
self.device_consumer("mca")
|
||||||
|
|
||||||
|
self.new_scanID.connect(self.create_new_stream_consumer)
|
||||||
self.update_signal.connect(self.plot_new)
|
self.update_signal.connect(self.plot_new)
|
||||||
self.connect_stream_consumer("ScanID1", "mca")
|
|
||||||
|
|
||||||
def init_ui(self):
|
def init_ui(self):
|
||||||
# Create layout and add widgets
|
# Create layout and add widgets
|
||||||
@ -36,15 +35,20 @@ class StreamApp(QWidget):
|
|||||||
self.setLayout(self.layout)
|
self.setLayout(self.layout)
|
||||||
|
|
||||||
# Create plot
|
# Create plot
|
||||||
self.plot_widget = pg.PlotWidget(title="2D plot for mcs data")
|
# self.glw = pg.GraphicsLayoutWidget()
|
||||||
|
self.plot_widget = pg.PlotWidget(title="MCA readout")
|
||||||
self.image_item = pg.ImageItem()
|
self.image_item = pg.ImageItem()
|
||||||
self.label_id = pg.LabelItem(justify="left")
|
|
||||||
self.plot_widget.addItem(self.label_id)
|
|
||||||
self.plot_widget.addItem(self.image_item)
|
self.plot_widget.addItem(self.image_item)
|
||||||
|
|
||||||
# Add widgets to the layout
|
# Add widgets to the layout
|
||||||
self.layout.addWidget(self.plot_widget)
|
self.layout.addWidget(self.plot_widget)
|
||||||
|
|
||||||
|
@pyqtSlot(str)
|
||||||
|
def create_new_stream_consumer(self, scanID: str):
|
||||||
|
print(f"Creating new stream consumer for scanID: {scanID}")
|
||||||
|
|
||||||
|
self.connect_stream_consumer(scanID, "mca")
|
||||||
|
|
||||||
def connect_stream_consumer(self, scanID, device):
|
def connect_stream_consumer(self, scanID, device):
|
||||||
if self.stream_consumer is not None:
|
if self.stream_consumer is not None:
|
||||||
self.stream_consumer.shutdown()
|
self.stream_consumer.shutdown()
|
||||||
@ -57,8 +61,15 @@ class StreamApp(QWidget):
|
|||||||
|
|
||||||
self.stream_consumer.start()
|
self.stream_consumer.start()
|
||||||
|
|
||||||
|
def device_consumer(self, device):
|
||||||
|
self.device_consumer = connector.consumer(
|
||||||
|
topics=MessageEndpoints.device_status(device), cb=self._device_cv, parent=self
|
||||||
|
)
|
||||||
|
|
||||||
|
self.device_consumer.start()
|
||||||
|
|
||||||
def plot_new(self):
|
def plot_new(self):
|
||||||
self.image_item.setImage(self.data)
|
self.image_item.setImage(self.data.T)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _streamer_cb(msg, *, parent, **_kwargs) -> None:
|
def _streamer_cb(msg, *, parent, **_kwargs) -> None:
|
||||||
@ -68,25 +79,37 @@ class StreamApp(QWidget):
|
|||||||
metadata = msgMCS.metadata
|
metadata = msgMCS.metadata
|
||||||
|
|
||||||
if parent.data is None:
|
if parent.data is None:
|
||||||
parent.data = row
|
parent.data = np.array([row])
|
||||||
|
|
||||||
|
# Check if the current number of rows is odd
|
||||||
|
# if parent.data is not None and parent.data.shape[0] % 2 == 0:
|
||||||
|
# row = np.flip(row) # Flip the rowR
|
||||||
else:
|
else:
|
||||||
parent.data = np.vstack((parent.data, row))
|
parent.data = np.vstack((parent.data, row))
|
||||||
|
|
||||||
# current_scanID = metadata.get("scanID", None)
|
|
||||||
# if current_scanID is None:
|
|
||||||
# return
|
|
||||||
|
|
||||||
# if current_scanID != parent.scanID:
|
|
||||||
# parent.scanID = current_scanID
|
|
||||||
# parent.data = row
|
|
||||||
# parent.image_item.clear()
|
|
||||||
|
|
||||||
print(f"msg: {msg}")
|
|
||||||
print(f"metadata: {metadata}")
|
|
||||||
print(f"parent.data: {parent.data}")
|
|
||||||
|
|
||||||
parent.update_signal.emit()
|
parent.update_signal.emit()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _device_cv(msg, *, parent, **_kwargs) -> None:
|
||||||
|
msgDEV = BECMessage.DeviceMessage.loads(msg.value)
|
||||||
|
|
||||||
|
current_scanID = msgDEV.metadata["scanID"]
|
||||||
|
|
||||||
|
if parent.scanID is None:
|
||||||
|
parent.scanID = current_scanID
|
||||||
|
parent.new_scanID.emit(current_scanID)
|
||||||
|
print(f"New scanID: {current_scanID}")
|
||||||
|
|
||||||
|
if current_scanID != parent.scanID:
|
||||||
|
parent.scanID = current_scanID
|
||||||
|
parent.data = None
|
||||||
|
parent.image_item.clear()
|
||||||
|
parent.new_scanID.emit(current_scanID)
|
||||||
|
|
||||||
|
print(f"New scanID: {current_scanID}")
|
||||||
|
|
||||||
|
# print(msgDEV)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
from bec_lib.core import RedisConnector
|
from bec_lib.core import RedisConnector
|
||||||
|
@ -5,24 +5,30 @@ connector = RedisConnector("localhost:6379")
|
|||||||
producer = connector.producer()
|
producer = connector.producer()
|
||||||
metadata = {}
|
metadata = {}
|
||||||
|
|
||||||
scanID = "ScanID1"
|
scanID = "ScanID3"
|
||||||
|
|
||||||
metadata.update(
|
metadata.update(
|
||||||
{
|
{
|
||||||
"scanID": scanID,
|
"scanID": scanID, # this will be different for each scan
|
||||||
"async_update": "append",
|
"async_update": "append",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
for ii in range(20):
|
for ii in range(20):
|
||||||
data = {"mca1": [10, 2, 3, 4, 5, 6, 7, 8, 9, 10], "mca2": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]}
|
data = {"mca1": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "mca2": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]}
|
||||||
msg = BECMessage.DeviceMessage(
|
msg = BECMessage.DeviceMessage(
|
||||||
signals=data,
|
signals=data,
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
).dumps()
|
).dumps()
|
||||||
|
|
||||||
|
producer.send(topic=MessageEndpoints.device_status(device="mca"), msg=msg)
|
||||||
|
|
||||||
producer.xadd(
|
producer.xadd(
|
||||||
topic=MessageEndpoints.device_async_readback(scanID=scanID, device="mca"),
|
topic=MessageEndpoints.device_async_readback(
|
||||||
|
scanID=scanID, device="mca"
|
||||||
|
), # scanID will be different for each scan
|
||||||
msg={"data": msg},
|
msg={"data": msg},
|
||||||
expire=1800,
|
expire=1800,
|
||||||
)
|
)
|
||||||
|
|
||||||
print(f"Sent {ii}")
|
print(f"Sent {ii}")
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
|
Reference in New Issue
Block a user