0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 03:31:50 +02:00

feat(plots/image): change stream processor to QThread with connector.get_last; cleanup method for BECFigure to kill all threads if App is closed during acquisition

This commit is contained in:
wyzula-jan
2024-02-27 18:13:42 +01:00
parent 9ad0055336
commit 7ffedd9ceb
3 changed files with 109 additions and 61 deletions

View File

@ -551,6 +551,8 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
def clear_all(self):
"""Clear all widgets from the figure and reset to default state"""
for widget in self.widgets.values():
widget.cleanup()
self.clear()
self.widgets = defaultdict(dict)
self.grid = []
@ -559,17 +561,33 @@ class BECFigure(BECConnector, pg.GraphicsLayoutWidget):
widget_class=self.__class__.__name__, gui_id=self.gui_id, theme=theme
)
def cleanup(self):
"""Cleanup the figure widget."""
self.clear_all()
self.close()
def start(self):
import sys
app = QApplication(sys.argv)
win = QMainWindow()
win.setCentralWidget(self)
win = BECFigureMainWindow(bec_figure=self)
win.show()
sys.exit(app.exec_())
class BECFigureMainWindow(QMainWindow):
def __init__(self, parent=None, bec_figure=None):
super().__init__(parent)
self.figure = bec_figure
self.setCentralWidget(self.figure)
def closeEvent(self, event):
self.figure.cleanup()
super().closeEvent(event)
##################################################
##################################################
# Debug window
@ -590,6 +608,7 @@ class JupyterConsoleWidget(RichJupyterWidget): # pragma: no cover:
self.kernel_client.start_channels()
self.kernel_manager.kernel.shell.push({"np": np, "pg": pg})
# self.set_console_font_size(70)
def shutdown_kernel(self):
self.kernel_client.stop_channels()
@ -611,18 +630,26 @@ class DebugWindow(QWidget): # pragma: no cover:
# console push
self.console.kernel_manager.kernel.shell.push(
{"fig": self.figure, "w1": self.w1, "w2": self.w2, "w3": self.w3, "w4": self.w4}
{
"fig": self.figure,
"w1": self.w1,
"w2": self.w2,
"w3": self.w3,
"w4": self.w4,
"bec": self.figure.client,
"scans": self.figure.client.scans,
"dev": self.figure.client.device_manager.devices,
}
)
def _init_ui(self):
# Plotting window
self.glw_1_layout = QVBoxLayout(self.glw) # Create a new QVBoxLayout
self.figure = BECFigure(parent=self) # Create a new BECDeviceMonitor
self.figure = BECFigure(parent=self, gui_id="remote") # Create a new BECDeviceMonitor
self.glw_1_layout.addWidget(self.figure) # Add BECDeviceMonitor to the layout
# add stuff to figure
self._init_figure()
# self.add_debug_histo()
self.console_layout = QVBoxLayout(self.widget_console)
self.console = JupyterConsoleWidget()
@ -685,12 +712,15 @@ class DebugWindow(QWidget): # pragma: no cover:
# self.w3.add_color_bar("simple")
# Image setting for w4
self.w4.set_monitor("eiger")
# self.w4.add_color_bar("full")
def add_debug_histo(self):
image_data = np.random.normal(loc=100, scale=50, size=(100, 100)) # Example image data
self.figure.add_image_with_histogram(image_data, row=2, col=0)
self.w4.set_monitor("eiger")
# self.w4.add_color_bar("full")
def closeEvent(self, event):
self.figure.cleanup()
self.close()
super().closeEvent(event)
if __name__ == "__main__": # pragma: no cover

View File

@ -6,25 +6,20 @@
<rect>
<x>0</x>
<y>0</y>
<width>901</width>
<height>1000</height>
<width>2104</width>
<height>966</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
<string>Plotting Console</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout_3">
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QSplitter" name="splitter_2">
<widget class="QSplitter" name="splitter">
<property name="orientation">
<enum>Qt::Vertical</enum>
<enum>Qt::Horizontal</enum>
</property>
<widget class="QSplitter" name="splitter">
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<widget class="QWidget" name="glw" native="true"/>
</widget>
<widget class="QWidget" name="glw" native="true"/>
<widget class="QWidget" name="widget_console" native="true"/>
</widget>
</item>

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import time
from typing import Literal, Optional
import numpy as np
@ -127,11 +128,15 @@ class BECImageShow(BECPlotBase):
# Args to pass
self.monitor = monitor
self.scanID = None
# init image and image thread
self._init_image()
self._init_image_thread(monitor=self.monitor)
# Dispatcher
self.bec_dispatcher.connect_slot(self.on_scan_segment, MessageEndpoints.scan_segment())
def find_widget_by_id(self, item_id: str):
if self.image.gui_id == item_id:
return self.image
@ -156,6 +161,7 @@ class BECImageShow(BECPlotBase):
self.proxy_update_plot = pg.SignalProxy(
self.image_thread.image_updated, rateLimit=25, slot=self.on_image_update
)
# self.image_thread.start()
def _add_color_bar(
self, style: Literal["simple,full"] = "simple", vrange: tuple[int, int] = (0, 100)
@ -258,8 +264,37 @@ class BECImageShow(BECPlotBase):
@pyqtSlot(np.ndarray)
def on_image_update(self, image):
# print(f"Image updated: {image.shape}")
print(f"Image updated")
self.image.updateImage(image[0])
@pyqtSlot(dict, dict)
def on_scan_segment(self, msg, metadata):
"""
Serves as a trigger to image acquisition.
Update the scanID and start the image thread if the scanID is different from the current one.
Args:
msg(dict): The content of the message.
metadata(dict): The metadata of the message.
"""
current_scanID = msg.get("scanID", None)
if current_scanID is None:
return
if current_scanID != self.scanID:
self.scanID = current_scanID
self.image_thread.start()
max_points = metadata.get("num_points", None)
current_point = msg.get("point_id") + 1
print(f"Current point: {current_point} out of {max_points}")
if current_point == max_points:
if self.image_thread.isRunning():
self.image_thread.stop()
print("image thread done")
def set_image(self, data: np.ndarray):
"""
Set the image to be displayed.
@ -267,16 +302,13 @@ class BECImageShow(BECPlotBase):
data(np.ndarray): The image to be displayed.
"""
self.imageItem.setImage(data)
self.image_thread.set_monitor(None)
if self.image_thread.isRunning():
self.image_thread.stop()
def cleanup(self): # TODO test
self.image_thread.quit()
self.image_thread.stop()
self.image_thread.wait()
self.image_thread.deleteLater()
self.image_thread = None
self.image.remove()
self.color_bar.remove()
super().cleanup()
print("ImageThread stopped")
class ImageThread(QThread):
@ -311,9 +343,9 @@ class ImageThread(QThread):
self.port = self.client.connector.port
self.connector = RedisConnector(f"{self.host}:{self.port}")
self.stream_consumer = None
self.running = False
if self.monitor is not None:
self.connect_stream_consumer(self.monitor)
self.set_monitor(self.monitor)
def update_config(self, config: MonitorConfig | dict): # TODO include monitor update in config?
"""
@ -332,29 +364,7 @@ class ImageThread(QThread):
monitor(str): Name of the monitor.
"""
self.monitor = monitor
if self.monitor is not None:
self.connect_stream_consumer(self.monitor)
elif monitor is None:
self.stream_consumer.shutdown()
def connect_stream_consumer(self, device):
"""
Connect to the stream consumer for the device.
Args:
device(str): Name of the device.
"""
if self.stream_consumer is not None:
self.stream_consumer.shutdown()
self.stream_consumer = self.connector.stream_consumer(
topics=MessageEndpoints.device_monitor(device=device),
cb=self._streamer_cb,
parent=self,
)
self.stream_consumer.start()
print(f"Stream consumer started for device: {device}")
print(f"Stream consumer started for device: {monitor}")
def process_FFT(self, data: np.ndarray) -> np.ndarray: # TODO check functionality
return np.abs(np.fft.fftshift(np.fft.fft2(data)))
@ -382,13 +392,26 @@ class ImageThread(QThread):
data = self.log(data)
return data
@staticmethod
def _streamer_cb(msg, *, parent, **_kwargs) -> None:
msg_device = msg.value
metadata = msg_device.metadata
def run(self):
self.running = True
self.get_image()
data = msg_device.content["data"]
def get_image(self):
if self.monitor is None:
return
while self.running:
data = self.connector.get_last(
topic=MessageEndpoints.device_monitor(device=self.monitor)
)
if data is not None:
data = data.content["data"]
data = parent.post_processing(data)
data = self.post_processing(data)
parent.image_updated.emit(data)
self.image_updated.emit(data)
print("ImageThread is running")
time.sleep(0.01)
def stop(self):
self.running = False