1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-03-05 00:12:49 +01:00
Files
bec_widgets/bec_widgets/examples/eiger_plot/eiger_plot.py
2023-09-21 10:43:45 +02:00

300 lines
9.6 KiB
Python

import json
import json
import os
import threading
import time
import h5py
import numpy as np
import pyqtgraph as pg
import zmq
from PyQt5.QtCore import pyqtSignal
from PyQt5.QtCore import pyqtSlot
from PyQt5.QtGui import QKeySequence
from PyQt5.QtWidgets import (
QWidget,
QFileDialog,
QShortcut,
QDialog,
QVBoxLayout,
QLabel,
QFrame,
)
from pyqtgraph.Qt import uic
# from scipy.stats import multivariate_normal
class EigerPlot(QWidget):
update_signal = pyqtSignal()
def __init__(self, parent=None):
super().__init__(parent)
# pg.setConfigOptions(background="w", foreground="k", antialias=True)
current_path = os.path.dirname(__file__)
uic.loadUi(os.path.join(current_path, "eiger_plot.ui"), self)
# Set widow name
self.setWindowTitle("Eiger Plot")
self.hist_lims = None
self.mask = None
self.image = None
# UI
self.init_ui()
self.hook_signals()
self.key_bindings()
# ZMQ Consumer
self.start_zmq_consumer()
def init_ui(self):
# Create Plot and add ImageItem
self.plot_item = pg.PlotItem()
self.plot_item.setAspectLocked(True)
self.imageItem = pg.ImageItem()
self.plot_item.addItem(self.imageItem)
# Setting up histogram
self.hist = pg.HistogramLUTItem()
self.hist.setImageItem(self.imageItem)
self.hist.gradient.loadPreset("magma")
self.update_hist()
# Adding Items to Graphical Layout
self.glw.addItem(self.plot_item)
self.glw.addItem(self.hist)
def hook_signals(self):
# Buttons
# self.pushButton_test.clicked.connect(self.start_sim_stream)
self.pushButton_mask.clicked.connect(self.load_mask_dialog)
self.pushButton_delete_mask.clicked.connect(self.delete_mask)
self.pushButton_help.clicked.connect(self.show_help_dialog)
# SpinBoxes
self.doubleSpinBox_hist_min.valueChanged.connect(self.update_hist)
self.doubleSpinBox_hist_max.valueChanged.connect(self.update_hist)
# Signal/Slots
self.update_signal.connect(self.on_image_update)
def key_bindings(self):
# Key bindings for rotation
rotate_plus = QShortcut(QKeySequence("Ctrl+A"), self)
rotate_minus = QShortcut(QKeySequence("Ctrl+Z"), self)
self.comboBox_rotation.setToolTip("Increase rotation: Ctrl+A\nDecrease rotation: Ctrl+Z")
self.checkBox_transpose.setToolTip("Toggle transpose: Ctrl+T")
max_index = self.comboBox_rotation.count() - 1 # Maximum valid index
rotate_plus.activated.connect(
lambda: self.comboBox_rotation.setCurrentIndex(
min(self.comboBox_rotation.currentIndex() + 1, max_index)
)
)
rotate_minus.activated.connect(
lambda: self.comboBox_rotation.setCurrentIndex(
max(self.comboBox_rotation.currentIndex() - 1, 0)
)
)
# Key bindings for transpose
transpose = QShortcut(QKeySequence("Ctrl+T"), self)
transpose.activated.connect(self.checkBox_transpose.toggle)
FFT = QShortcut(QKeySequence("Ctrl+F"), self)
FFT.activated.connect(self.checkBox_FFT.toggle)
self.checkBox_FFT.setToolTip("Toggle FFT: Ctrl+F")
log = QShortcut(QKeySequence("Ctrl+L"), self)
log.activated.connect(self.checkBox_log.toggle)
self.checkBox_log.setToolTip("Toggle log: Ctrl+L")
mask = QShortcut(QKeySequence("Ctrl+M"), self)
mask.activated.connect(self.pushButton_mask.click)
self.pushButton_mask.setToolTip("Load mask: Ctrl+M")
delete_mask = QShortcut(QKeySequence("Ctrl+D"), self)
delete_mask.activated.connect(self.pushButton_delete_mask.click)
self.pushButton_delete_mask.setToolTip("Delete mask: Ctrl+D")
def update_hist(self):
self.hist_levels = [
self.doubleSpinBox_hist_min.value(),
self.doubleSpinBox_hist_max.value(),
]
self.hist.setLevels(min=self.hist_levels[0], max=self.hist_levels[1])
self.hist.setHistogramRange(
self.hist_levels[0] - 0.1 * self.hist_levels[0],
self.hist_levels[1] + 0.1 * self.hist_levels[1],
)
def load_mask_dialog(self):
options = QFileDialog.Options()
options |= QFileDialog.ReadOnly
file_name, _ = QFileDialog.getOpenFileName(
self, "Select Mask File", "", "H5 Files (*.h5);;All Files (*)", options=options
)
if file_name:
self.load_mask(file_name)
def load_mask(self, path):
try:
with h5py.File(path, "r") as f:
self.mask = f["data"][...]
if self.mask is not None:
# Set label to mask name without path
self.label_mask.setText(os.path.basename(path))
except KeyError as e:
# Update GUI with the error message
print(f"Error: {str(e)}")
def delete_mask(self):
self.mask = None
self.label_mask.setText("No Mask")
@pyqtSlot()
def on_image_update(self):
# TODO first rotate then transpose
if self.mask is not None:
# self.image = np.ma.masked_array(self.image, mask=self.mask) #TODO test if np works
self.image = self.image * (1 - self.mask) + 1
if self.checkBox_FFT.isChecked():
self.image = np.abs(np.fft.fftshift(np.fft.fft2(self.image)))
if self.comboBox_rotation.currentIndex() > 0: # rotate
self.image = np.rot90(self.image, k=self.comboBox_rotation.currentIndex(), axes=(0, 1))
if self.checkBox_transpose.isChecked(): # transpose
self.image = np.transpose(self.image)
if self.checkBox_log.isChecked():
self.image = np.log10(self.image)
self.imageItem.setImage(self.image, autoLevels=False)
###############################
# ZMQ Consumer
###############################
def start_zmq_consumer(self):
consumer_thread = threading.Thread(target=self.zmq_consumer, daemon=True).start()
def zmq_consumer(self):
try:
print("starting consumer")
live_stream_url = "tcp://129.129.95.38:20000"
receiver = zmq.Context().socket(zmq.SUB)
receiver.connect(live_stream_url)
receiver.setsockopt_string(zmq.SUBSCRIBE, "")
while True:
raw_meta, raw_data = receiver.recv_multipart()
meta = json.loads(raw_meta.decode("utf-8"))
self.image = np.frombuffer(raw_data, dtype=meta["type"]).reshape(meta["shape"])
self.update_signal.emit()
finally:
receiver.disconnect(live_stream_url)
receiver.context.term()
###############################
# just simulations from here
###############################
def show_help_dialog(self):
dialog = QDialog(self)
dialog.setWindowTitle("Help")
layout = QVBoxLayout()
# Key bindings section
layout.addWidget(QLabel("Keyboard Shortcuts:"))
key_bindings = [
("Ctrl+A", "Increase rotation"),
("Ctrl+Z", "Decrease rotation"),
("Ctrl+T", "Toggle transpose"),
("Ctrl+F", "Toggle FFT"),
("Ctrl+L", "Toggle log scale"),
("Ctrl+M", "Load mask"),
("Ctrl+D", "Delete mask"),
]
for keys, action in key_bindings:
layout.addWidget(QLabel(f"{keys} - {action}"))
# Separator
separator = QFrame()
separator.setFrameShape(QFrame.HLine)
separator.setFrameShadow(QFrame.Sunken)
layout.addWidget(separator)
# Histogram section
layout.addWidget(QLabel("Histogram:"))
layout.addWidget(
QLabel(
"Use the Double Spin Boxes to adjust the minimum and maximum values of the histogram."
)
)
# Another Separator
another_separator = QFrame()
another_separator.setFrameShape(QFrame.HLine)
another_separator.setFrameShadow(QFrame.Sunken)
layout.addWidget(another_separator)
# Mask section
layout.addWidget(QLabel("Mask:"))
layout.addWidget(
QLabel(
"Use 'Load Mask' to load a mask from an H5 file. 'Delete Mask' removes the current mask."
)
)
dialog.setLayout(layout)
dialog.exec_()
###############################
# just simulations from here
###############################
# def start_sim_stream(self):
# sim_stream_thread = threading.Thread(target=self.sim_stream, daemon=True)
# sim_stream_thread.start()
#
# def sim_stream(self):
# for i in range(100):
# # Generate 100x100 image of random noise
# self.image = np.random.rand(100, 100) * 0.2
#
# # Define Gaussian parameters
# x, y = np.mgrid[0:50, 0:50]
# pos = np.dstack((x, y))
#
# # Center at (25, 25) longer along y-axis
# rv = multivariate_normal(mean=[25, 25], cov=[[25, 0], [0, 80]])
#
# # Generate Gaussian in the first quadrant
# gaussian_quadrant = rv.pdf(pos) * 40
#
# # Place Gaussian in the first quadrant
# self.image[0:50, 0:50] += gaussian_quadrant * 10
#
# self.update_signal.emit()
# time.sleep(0.1)
if __name__ == "__main__":
import sys
from PyQt5.QtWidgets import QApplication
app = QApplication(sys.argv)
plot = EigerPlot()
plot.show()
sys.exit(app.exec_())