mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-03-05 00:12:49 +01:00
299 lines
9.6 KiB
Python
299 lines
9.6 KiB
Python
import json
|
|
import os
|
|
import threading
|
|
|
|
import h5py
|
|
import numpy as np
|
|
import pyqtgraph as pg
|
|
import zmq
|
|
from qtpy.QtCore import Signal as pyqtSignal
|
|
from qtpy.QtCore import Slot as pyqtSlot
|
|
from qtpy.QtGui import QKeySequence
|
|
from qtpy.QtWidgets import (
|
|
QWidget,
|
|
QFileDialog,
|
|
QShortcut,
|
|
QDialog,
|
|
QVBoxLayout,
|
|
QLabel,
|
|
QFrame,
|
|
)
|
|
from pyqtgraph.Qt import uic
|
|
|
|
|
|
# from scipy.stats import multivariate_normal
|
|
|
|
|
|
class EigerPlot(QWidget):
|
|
update_signal = pyqtSignal()
|
|
|
|
def __init__(self, parent=None):
|
|
super().__init__(parent)
|
|
# pg.setConfigOptions(background="w", foreground="k", antialias=True)
|
|
|
|
current_path = os.path.dirname(__file__)
|
|
uic.loadUi(os.path.join(current_path, "eiger_plot.ui"), self)
|
|
|
|
# Set widow name
|
|
self.setWindowTitle("Eiger Plot")
|
|
|
|
self.hist_lims = None
|
|
self.mask = None
|
|
self.image = None
|
|
|
|
# UI
|
|
self.init_ui()
|
|
self.hook_signals()
|
|
self.key_bindings()
|
|
|
|
# ZMQ Consumer
|
|
self.start_zmq_consumer()
|
|
|
|
def init_ui(self):
|
|
# Create Plot and add ImageItem
|
|
self.plot_item = pg.PlotItem()
|
|
self.plot_item.setAspectLocked(True)
|
|
self.imageItem = pg.ImageItem()
|
|
self.plot_item.addItem(self.imageItem)
|
|
|
|
# Setting up histogram
|
|
self.hist = pg.HistogramLUTItem()
|
|
self.hist.setImageItem(self.imageItem)
|
|
self.hist.gradient.loadPreset("magma")
|
|
self.update_hist()
|
|
|
|
# Adding Items to Graphical Layout
|
|
self.glw.addItem(self.plot_item)
|
|
self.glw.addItem(self.hist)
|
|
|
|
def hook_signals(self):
|
|
# Buttons
|
|
# self.pushButton_test.clicked.connect(self.start_sim_stream)
|
|
self.pushButton_mask.clicked.connect(self.load_mask_dialog)
|
|
self.pushButton_delete_mask.clicked.connect(self.delete_mask)
|
|
self.pushButton_help.clicked.connect(self.show_help_dialog)
|
|
|
|
# SpinBoxes
|
|
self.doubleSpinBox_hist_min.valueChanged.connect(self.update_hist)
|
|
self.doubleSpinBox_hist_max.valueChanged.connect(self.update_hist)
|
|
|
|
# Signal/Slots
|
|
self.update_signal.connect(self.on_image_update)
|
|
|
|
def key_bindings(self):
|
|
# Key bindings for rotation
|
|
rotate_plus = QShortcut(QKeySequence("Ctrl+A"), self)
|
|
rotate_minus = QShortcut(QKeySequence("Ctrl+Z"), self)
|
|
self.comboBox_rotation.setToolTip("Increase rotation: Ctrl+A\nDecrease rotation: Ctrl+Z")
|
|
self.checkBox_transpose.setToolTip("Toggle transpose: Ctrl+T")
|
|
|
|
max_index = self.comboBox_rotation.count() - 1 # Maximum valid index
|
|
|
|
rotate_plus.activated.connect(
|
|
lambda: self.comboBox_rotation.setCurrentIndex(
|
|
min(self.comboBox_rotation.currentIndex() + 1, max_index)
|
|
)
|
|
)
|
|
|
|
rotate_minus.activated.connect(
|
|
lambda: self.comboBox_rotation.setCurrentIndex(
|
|
max(self.comboBox_rotation.currentIndex() - 1, 0)
|
|
)
|
|
)
|
|
|
|
# Key bindings for transpose
|
|
transpose = QShortcut(QKeySequence("Ctrl+T"), self)
|
|
transpose.activated.connect(self.checkBox_transpose.toggle)
|
|
|
|
FFT = QShortcut(QKeySequence("Ctrl+F"), self)
|
|
FFT.activated.connect(self.checkBox_FFT.toggle)
|
|
self.checkBox_FFT.setToolTip("Toggle FFT: Ctrl+F")
|
|
|
|
log = QShortcut(QKeySequence("Ctrl+L"), self)
|
|
log.activated.connect(self.checkBox_log.toggle)
|
|
self.checkBox_log.setToolTip("Toggle log: Ctrl+L")
|
|
|
|
mask = QShortcut(QKeySequence("Ctrl+M"), self)
|
|
mask.activated.connect(self.pushButton_mask.click)
|
|
self.pushButton_mask.setToolTip("Load mask: Ctrl+M")
|
|
|
|
delete_mask = QShortcut(QKeySequence("Ctrl+D"), self)
|
|
delete_mask.activated.connect(self.pushButton_delete_mask.click)
|
|
self.pushButton_delete_mask.setToolTip("Delete mask: Ctrl+D")
|
|
|
|
def update_hist(self):
|
|
self.hist_levels = [
|
|
self.doubleSpinBox_hist_min.value(),
|
|
self.doubleSpinBox_hist_max.value(),
|
|
]
|
|
self.hist.setLevels(min=self.hist_levels[0], max=self.hist_levels[1])
|
|
self.hist.setHistogramRange(
|
|
self.hist_levels[0] - 0.1 * self.hist_levels[0],
|
|
self.hist_levels[1] + 0.1 * self.hist_levels[1],
|
|
)
|
|
|
|
def load_mask_dialog(self):
|
|
options = QFileDialog.Options()
|
|
options |= QFileDialog.ReadOnly
|
|
file_name, _ = QFileDialog.getOpenFileName(
|
|
self, "Select Mask File", "", "H5 Files (*.h5);;All Files (*)", options=options
|
|
)
|
|
if file_name:
|
|
self.load_mask(file_name)
|
|
|
|
def load_mask(self, path):
|
|
try:
|
|
with h5py.File(path, "r") as f:
|
|
self.mask = f["data"][...]
|
|
if self.mask is not None:
|
|
# Set label to mask name without path
|
|
self.label_mask.setText(os.path.basename(path))
|
|
except KeyError as e:
|
|
# Update GUI with the error message
|
|
print(f"Error: {str(e)}")
|
|
|
|
def delete_mask(self):
|
|
self.mask = None
|
|
self.label_mask.setText("No Mask")
|
|
|
|
@pyqtSlot()
|
|
def on_image_update(self):
|
|
# TODO first rotate then transpose
|
|
if self.mask is not None:
|
|
# self.image = np.ma.masked_array(self.image, mask=self.mask) #TODO test if np works
|
|
self.image = self.image * (1 - self.mask) + 1
|
|
|
|
if self.checkBox_FFT.isChecked():
|
|
self.image = np.abs(np.fft.fftshift(np.fft.fft2(self.image)))
|
|
|
|
if self.comboBox_rotation.currentIndex() > 0: # rotate
|
|
self.image = np.rot90(self.image, k=self.comboBox_rotation.currentIndex(), axes=(0, 1))
|
|
|
|
if self.checkBox_transpose.isChecked(): # transpose
|
|
self.image = np.transpose(self.image)
|
|
|
|
if self.checkBox_log.isChecked():
|
|
self.image = np.log10(self.image)
|
|
|
|
self.imageItem.setImage(self.image, autoLevels=False)
|
|
|
|
###############################
|
|
# ZMQ Consumer
|
|
###############################
|
|
|
|
def start_zmq_consumer(self):
|
|
consumer_thread = threading.Thread(target=self.zmq_consumer, daemon=True).start()
|
|
|
|
def zmq_consumer(self):
|
|
try:
|
|
print("starting consumer")
|
|
live_stream_url = "tcp://129.129.95.38:20000"
|
|
receiver = zmq.Context().socket(zmq.SUB)
|
|
receiver.connect(live_stream_url)
|
|
receiver.setsockopt_string(zmq.SUBSCRIBE, "")
|
|
|
|
while True:
|
|
raw_meta, raw_data = receiver.recv_multipart()
|
|
meta = json.loads(raw_meta.decode("utf-8"))
|
|
self.image = np.frombuffer(raw_data, dtype=meta["type"]).reshape(meta["shape"])
|
|
self.update_signal.emit()
|
|
|
|
finally:
|
|
receiver.disconnect(live_stream_url)
|
|
receiver.context.term()
|
|
|
|
###############################
|
|
# just simulations from here
|
|
###############################
|
|
|
|
def show_help_dialog(self):
|
|
dialog = QDialog(self)
|
|
dialog.setWindowTitle("Help")
|
|
|
|
layout = QVBoxLayout()
|
|
|
|
# Key bindings section
|
|
layout.addWidget(QLabel("Keyboard Shortcuts:"))
|
|
|
|
key_bindings = [
|
|
("Ctrl+A", "Increase rotation"),
|
|
("Ctrl+Z", "Decrease rotation"),
|
|
("Ctrl+T", "Toggle transpose"),
|
|
("Ctrl+F", "Toggle FFT"),
|
|
("Ctrl+L", "Toggle log scale"),
|
|
("Ctrl+M", "Load mask"),
|
|
("Ctrl+D", "Delete mask"),
|
|
]
|
|
|
|
for keys, action in key_bindings:
|
|
layout.addWidget(QLabel(f"{keys} - {action}"))
|
|
|
|
# Separator
|
|
separator = QFrame()
|
|
separator.setFrameShape(QFrame.HLine)
|
|
separator.setFrameShadow(QFrame.Sunken)
|
|
layout.addWidget(separator)
|
|
|
|
# Histogram section
|
|
layout.addWidget(QLabel("Histogram:"))
|
|
layout.addWidget(
|
|
QLabel(
|
|
"Use the Double Spin Boxes to adjust the minimum and maximum values of the histogram."
|
|
)
|
|
)
|
|
|
|
# Another Separator
|
|
another_separator = QFrame()
|
|
another_separator.setFrameShape(QFrame.HLine)
|
|
another_separator.setFrameShadow(QFrame.Sunken)
|
|
layout.addWidget(another_separator)
|
|
|
|
# Mask section
|
|
layout.addWidget(QLabel("Mask:"))
|
|
layout.addWidget(
|
|
QLabel(
|
|
"Use 'Load Mask' to load a mask from an H5 file. 'Delete Mask' removes the current mask."
|
|
)
|
|
)
|
|
|
|
dialog.setLayout(layout)
|
|
dialog.exec()
|
|
|
|
###############################
|
|
# just simulations from here
|
|
###############################
|
|
# def start_sim_stream(self):
|
|
# sim_stream_thread = threading.Thread(target=self.sim_stream, daemon=True)
|
|
# sim_stream_thread.start()
|
|
#
|
|
# def sim_stream(self):
|
|
# for i in range(100):
|
|
# # Generate 100x100 image of random noise
|
|
# self.image = np.random.rand(100, 100) * 0.2
|
|
#
|
|
# # Define Gaussian parameters
|
|
# x, y = np.mgrid[0:50, 0:50]
|
|
# pos = np.dstack((x, y))
|
|
#
|
|
# # Center at (25, 25) longer along y-axis
|
|
# rv = multivariate_normal(mean=[25, 25], cov=[[25, 0], [0, 80]])
|
|
#
|
|
# # Generate Gaussian in the first quadrant
|
|
# gaussian_quadrant = rv.pdf(pos) * 40
|
|
#
|
|
# # Place Gaussian in the first quadrant
|
|
# self.image[0:50, 0:50] += gaussian_quadrant * 10
|
|
#
|
|
# self.update_signal.emit()
|
|
# time.sleep(0.1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
from qtpy.QtWidgets import QApplication
|
|
|
|
app = QApplication(sys.argv)
|
|
plot = EigerPlot()
|
|
plot.show()
|
|
sys.exit(app.exec())
|