From ddc9510c2ba8dadf291809eeb5b135a105259492 Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Wed, 29 May 2024 20:45:21 +0200 Subject: [PATCH] fix(examples): outdated examples removed (mca_plot.py, stream_plot.py, motor_example.py) --- .../jupyter_console/jupyter_console_window.py | 50 +- bec_widgets/examples/mca_readout/__init__.py | 0 bec_widgets/examples/mca_readout/mca_plot.py | 159 -- bec_widgets/examples/mca_readout/mca_sim.py | 28 - .../motor_movement/config_example.yaml | 17 - .../motor_movement/csax_bec_config.yaml | 10 - .../examples/motor_movement/csaxs_config.yaml | 17 - .../examples/motor_movement/motor_example.py | 1344 ----------------- bec_widgets/examples/stream_plot/__init__.py | 0 bec_widgets/examples/stream_plot/line_plot.ui | 148 -- .../examples/stream_plot/stream_plot.py | 342 ----- tests/unit_tests/test_stream_plot.py | 158 -- 12 files changed, 20 insertions(+), 2253 deletions(-) delete mode 100644 bec_widgets/examples/mca_readout/__init__.py delete mode 100644 bec_widgets/examples/mca_readout/mca_plot.py delete mode 100644 bec_widgets/examples/mca_readout/mca_sim.py delete mode 100644 bec_widgets/examples/motor_movement/config_example.yaml delete mode 100644 bec_widgets/examples/motor_movement/csax_bec_config.yaml delete mode 100644 bec_widgets/examples/motor_movement/csaxs_config.yaml delete mode 100644 bec_widgets/examples/motor_movement/motor_example.py delete mode 100644 bec_widgets/examples/stream_plot/__init__.py delete mode 100644 bec_widgets/examples/stream_plot/line_plot.ui delete mode 100644 bec_widgets/examples/stream_plot/stream_plot.py delete mode 100644 tests/unit_tests/test_stream_plot.py diff --git a/bec_widgets/examples/jupyter_console/jupyter_console_window.py b/bec_widgets/examples/jupyter_console/jupyter_console_window.py index 5ea0ad5c..a2585409 100644 --- a/bec_widgets/examples/jupyter_console/jupyter_console_window.py +++ b/bec_widgets/examples/jupyter_console/jupyter_console_window.py @@ -2,18 +2,16 @@ import os import numpy as np import pyqtgraph as pg -from pyqtgraph.Qt import QtWidgets +import qdarktheme from qtconsole.inprocess import QtInProcessKernelManager from qtconsole.rich_jupyter_widget import RichJupyterWidget from qtpy.QtCore import QSize from qtpy.QtGui import QIcon from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget -from bec_widgets.cli.rpc_register import RPCRegister from bec_widgets.utils import BECDispatcher, UILoader from bec_widgets.widgets import BECFigure from bec_widgets.widgets.dock.dock_area import BECDockArea -from bec_widgets.widgets.spiral_progress_bar.spiral_progress_bar import SpiralProgressBar class JupyterConsoleWidget(RichJupyterWidget): # pragma: no cover: @@ -26,7 +24,6 @@ class JupyterConsoleWidget(RichJupyterWidget): # pragma: no cover: self.kernel_client.start_channels() self.kernel_manager.kernel.shell.push({"np": np, "pg": pg}) - # self.set_console_font_size(70) def shutdown_kernel(self): self.kernel_client.stop_channels() @@ -46,27 +43,22 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover: self.ui.splitter.setSizes([200, 100]) self.safe_close = False - # self.figure.clean_signal.connect(self.confirm_close) - - self.register = RPCRegister() - self.register.add_rpc(self.figure) # console push self.console.kernel_manager.kernel.shell.push( { "fig": self.figure, - "register": self.register, "dock": self.dock, "w1": self.w1, "w2": self.w2, "w3": self.w3, + "d0": self.d0, "d1": self.d1, "d2": self.d2, - "d3": self.d3, + "fig0": self.fig0, + "fig1": self.fig1, + "fig2": self.fig2, "bar": self.bar, - "b2a": self.button_2_a, - "b2b": self.button_2_b, - "b2c": self.button_2_c, "bec": self.figure.client, "scans": self.figure.client.scans, "dev": self.figure.client.device_manager.devices, @@ -111,25 +103,22 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover: self.c1 = self.w1.get_config() def _init_dock(self): - self.button_1 = QtWidgets.QPushButton("Button 1 ") - self.button_2_a = QtWidgets.QPushButton("Button to be added at place 0,0 in d3") - self.button_2_b = QtWidgets.QPushButton("button after without postions specified") - self.button_2_c = QtWidgets.QPushButton("button super late") - self.button_3 = QtWidgets.QPushButton("Button above Figure ") - self.bar = SpiralProgressBar() - self.label_2 = QtWidgets.QLabel("label which is added separately") - self.label_3 = QtWidgets.QLabel("Label above figure") + self.d0 = self.dock.add_dock(name="dock_0") + self.fig0 = self.d0.add_widget_bec("BECFigure") + self.fig0.image("eiger", vrange=(0, 100)) - self.d1 = self.dock.add_dock(widget=self.button_1, position="left") - self.d1.addWidget(self.label_2) - self.d2 = self.dock.add_dock(widget=self.bar, position="right") - self.d3 = self.dock.add_dock(name="figure") - self.fig_dock3 = BECFigure() - self.fig_dock3.plot(x_name="samx", y_name="bpm4d") - self.d3.add_widget(self.label_3) - self.d3.add_widget(self.button_3) - self.d3.add_widget(self.fig_dock3) + self.d1 = self.dock.add_dock(name="dock_1", position="right") + self.fig1 = self.d1.add_widget_bec("BECFigure") + self.fig1.plot(x_name="samx", y_name="bpm4i") + self.fig1.plot(x_name="samx", y_name="bpm3a") + + self.d2 = self.dock.add_dock(name="dock_2", position="bottom") + self.fig2 = self.d2.add_widget_bec("BECFigure", row=0, col=0) + self.fig2.motor_map(x_name="samx", y_name="samy") + self.fig2.plot(x_name="samx", y_name="bpm4i") + self.bar = self.d2.add_widget_bec("SpiralProgressBar", row=0, col=1) + self.bar.set_diameter(200) self.dock.save_state() @@ -155,6 +144,7 @@ if __name__ == "__main__": # pragma: no cover app = QApplication(sys.argv) app.setApplicationName("Jupyter Console") app.setApplicationDisplayName("Jupyter Console") + qdarktheme.setup_theme("auto") icon = QIcon() icon.addFile(os.path.join(module_path, "assets", "terminal_icon.png"), size=QSize(48, 48)) app.setWindowIcon(icon) diff --git a/bec_widgets/examples/mca_readout/__init__.py b/bec_widgets/examples/mca_readout/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/bec_widgets/examples/mca_readout/mca_plot.py b/bec_widgets/examples/mca_readout/mca_plot.py deleted file mode 100644 index f1fdd895..00000000 --- a/bec_widgets/examples/mca_readout/mca_plot.py +++ /dev/null @@ -1,159 +0,0 @@ -# import simulation_progress as SP -import numpy as np -import pyqtgraph as pg -from bec_lib import messages -from bec_lib.endpoints import MessageEndpoints -from qtpy.QtCore import Signal as pyqtSignal -from qtpy.QtCore import Slot as pyqtSlot -from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget - - -class StreamApp(QWidget): - update_signal = pyqtSignal() - new_scan_id = pyqtSignal(str) - - def __init__(self, device, sub_device): - super().__init__() - pg.setConfigOptions(background="w", foreground="k") - self.init_ui() - - self.setWindowTitle("MCA readout") - - self.data = None - self.scan_id = None - self.stream_consumer = None - - self.device = device - self.sub_device = sub_device - - self.start_device_consumer() - - # self.start_device_consumer(self.device) # for simulation - - self.new_scan_id.connect(self.create_new_stream_consumer) - self.update_signal.connect(self.plot_new) - - def init_ui(self): - # Create layout and add widgets - self.layout = QVBoxLayout() - self.setLayout(self.layout) - - # Create plot - self.glw = pg.GraphicsLayoutWidget() - self.layout.addWidget(self.glw) - - # Create Plot and add ImageItem - self.plot_item = pg.PlotItem() - self.plot_item.setAspectLocked(False) - self.imageItem = pg.ImageItem() - # self.plot_item1D = pg.PlotItem() - # self.plot_item.addItem(self.imageItem) - # self.plot_item.addItem(self.plot_item1D) - - # Setting up histogram - # self.hist = pg.HistogramLUTItem() - # self.hist.setImageItem(self.imageItem) - # self.hist.gradient.loadPreset("magma") - # self.update_hist() - - # Adding Items to Graphical Layout - self.glw.addItem(self.plot_item) - # self.glw.addItem(self.hist) - - @pyqtSlot(str) - def create_new_stream_consumer(self, scan_id: str): - print(f"Creating new stream consumer for scan_id: {scan_id}") - - self.connect_stream_consumer(scan_id, self.device) - - def connect_stream_consumer(self, scan_id, device): - if self.stream_consumer is not None: - self.stream_consumer.shutdown() - - self.stream_consumer = connector.stream_consumer( - topics=MessageEndpoints.device_async_readback(scan_id=scan_id, device=device), - cb=self._streamer_cb, - parent=self, - ) - - self.stream_consumer.start() - - def start_device_consumer(self): - self.device_consumer = connector.consumer( - topics=MessageEndpoints.scan_status(), cb=self._device_cv, parent=self - ) - - self.device_consumer.start() - - # def start_device_consumer(self, device): #for simulation - # self.device_consumer = connector.consumer( - # topics=MessageEndpoints.device_status(device), cb=self._device_cv, parent=self - # ) - # - # self.device_consumer.start() - - def plot_new(self): - print(f"Printing data from plot update: {self.data}") - self.plot_item.plot(self.data[0]) - # self.imageItem.setImage(self.data, autoLevels=False) - - @staticmethod - def _streamer_cb(msg, *, parent, **_kwargs) -> None: - msgMCS = msg.value - print(msgMCS) - row = msgMCS.content["signals"][parent.sub_device] - metadata = msgMCS.metadata - - # Check if the current number of rows is odd - # if parent.data is not None and parent.data.shape[0] % 2 == 1: - # row = np.flip(row) # Flip the row - print(f"Printing data from callback update: {row}") - parent.data = np.array([row]) - # if parent.data is None: - # parent.data = np.array([row]) - # else: - # parent.data = np.vstack((parent.data, row)) - - parent.update_signal.emit() - - @staticmethod - def _device_cv(msg, *, parent, **_kwargs) -> None: - print("Getting ScanID") - - msgDEV = msg.value - - current_scan_id = msgDEV.content["scan_id"] - - if parent.scan_id is None: - parent.scan_id = current_scan_id - parent.new_scan_id.emit(current_scan_id) - print(f"New scan_id: {current_scan_id}") - - if current_scan_id != parent.scan_id: - parent.scan_id = current_scan_id - # parent.data = None - # parent.imageItem.clear() - parent.new_scan_id.emit(current_scan_id) - - print(f"New scan_id: {current_scan_id}") - - -if __name__ == "__main__": - import argparse - - from bec_lib.redis_connector import RedisConnector - - parser = argparse.ArgumentParser(description="Stream App.") - parser.add_argument("--port", type=str, default="pc15543:6379", help="Port for RedisConnector") - parser.add_argument("--device", type=str, default="mcs", help="Device name") - parser.add_argument("--sub_device", type=str, default="mca4", help="Sub-device name") - - args = parser.parse_args() - - connector = RedisConnector(args.port) - - app = QApplication([]) - streamApp = StreamApp(device=args.device, sub_device=args.sub_device) - - streamApp.show() - app.exec() diff --git a/bec_widgets/examples/mca_readout/mca_sim.py b/bec_widgets/examples/mca_readout/mca_sim.py deleted file mode 100644 index 28c976b9..00000000 --- a/bec_widgets/examples/mca_readout/mca_sim.py +++ /dev/null @@ -1,28 +0,0 @@ -import time - -from bec_lib import messages -from bec_lib.endpoints import MessageEndpoints -from bec_lib.redis_connector import RedisConnector - -connector = RedisConnector("localhost:6379") -metadata = {} - -scan_id = "ScanID1" - -metadata.update( - {"scan_id": scan_id, "async_update": "append"} # this will be different for each scan -) -for ii in range(20): - data = {"mca1": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "mca2": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]} - msg = messages.DeviceMessage(signals=data, metadata=metadata).dumps() - - connector.xadd( - topic=MessageEndpoints.device_async_readback( - scan_id=scan_id, device="mca" - ), # scan_id will be different for each scan - msg={"data": msg}, # TODO should be msg_dict - expire=1800, - ) - - print(f"Sent {ii}") - time.sleep(0.5) diff --git a/bec_widgets/examples/motor_movement/config_example.yaml b/bec_widgets/examples/motor_movement/config_example.yaml deleted file mode 100644 index cd2bc92e..00000000 --- a/bec_widgets/examples/motor_movement/config_example.yaml +++ /dev/null @@ -1,17 +0,0 @@ -selected_motors: - motor_x: "samx" - motor_y: "samy" - -plot_motors: - max_points: 1000 - num_dim_points: 100 - scatter_size: 5 - precision: 3 - mode_lock: False # "Individual" or "Start/Stop". False to unlock - extra_columns: - - sample name: "sample 1" - - step_x [mu]: 25 - - step_y [mu]: 25 - - exp_time [s]: 1 - - start: 1 - - tilt [deg]: 0 diff --git a/bec_widgets/examples/motor_movement/csax_bec_config.yaml b/bec_widgets/examples/motor_movement/csax_bec_config.yaml deleted file mode 100644 index d5f96a39..00000000 --- a/bec_widgets/examples/motor_movement/csax_bec_config.yaml +++ /dev/null @@ -1,10 +0,0 @@ -redis: - host: pc15543 - port: 6379 -mongodb: - host: localhost - port: 27017 -scibec: - host: http://localhost - port: 3030 - beamline: MyBeamline diff --git a/bec_widgets/examples/motor_movement/csaxs_config.yaml b/bec_widgets/examples/motor_movement/csaxs_config.yaml deleted file mode 100644 index 9772c7df..00000000 --- a/bec_widgets/examples/motor_movement/csaxs_config.yaml +++ /dev/null @@ -1,17 +0,0 @@ -selected_motors: - motor_x: "samx" - motor_y: "samy" - -plot_motors: - max_points: 1000 - num_dim_points: 100 - scatter_size: 5 - precision: 3 - mode_lock: Start/Stop # "Individual" or "Start/Stop" - extra_columns: - - sample name: "sample 1" - - step_x [mu]: 25 - - step_y [mu]: 25 - - exp_time [s]: 1 - - start: 1 - - tilt [deg]: 0 diff --git a/bec_widgets/examples/motor_movement/motor_example.py b/bec_widgets/examples/motor_movement/motor_example.py deleted file mode 100644 index 98590cc6..00000000 --- a/bec_widgets/examples/motor_movement/motor_example.py +++ /dev/null @@ -1,1344 +0,0 @@ -import csv -import os -from enum import Enum -from functools import partial - -import numpy as np -import pyqtgraph as pg -from bec_lib import messages -from bec_lib.endpoints import MessageEndpoints -from pyqtgraph.Qt import QtCore, QtWidgets, uic -from qtpy import QtGui -from qtpy.QtCore import Qt, QThread -from qtpy.QtCore import Signal as pyqtSignal -from qtpy.QtCore import Slot as pyqtSlot -from qtpy.QtGui import QDoubleValidator, QKeySequence -from qtpy.QtWidgets import ( - QApplication, - QDialog, - QFileDialog, - QFrame, - QLabel, - QMessageBox, - QPushButton, - QShortcut, - QVBoxLayout, - QWidget, -) - -from bec_widgets.utils import DoubleValidationDelegate - -# TODO - General features -# - put motor status (moving, stopped, etc) -# - add mouse interactions with the plot -> click to select coordinates, double click to move? -# - adjust right click actions - - -class MotorApp(QWidget): - """ - Main class for MotorApp, designed to control motor positions based on a flexible YAML configuration. - - Attributes: - coordinates_updated (pyqtSignal): Signal to trigger coordinate updates. - selected_motors (dict): Dictionary containing pre-selected motors from the configuration file. - plot_motors (dict): Dictionary containing settings for plotting motor positions. - - Args: - selected_motors (dict): Dictionary specifying the selected motors. - plot_motors (dict): Dictionary specifying settings for plotting motor positions. - parent (QWidget, optional): Parent widget. - """ - - coordinates_updated = pyqtSignal(float, float) - - def __init__(self, selected_motors: dict = {}, plot_motors: dict = {}, parent=None): - super(MotorApp, self).__init__(parent) - current_path = os.path.dirname(__file__) - uic.loadUi(os.path.join(current_path, "motor_controller.ui"), self) - - # Motor Control Thread - self.motor_thread = MotorControl() - - self.motor_x, self.motor_y = None, None - self.limit_x, self.limit_y = None, None - - # Coordinates tracking - self.motor_positions = np.array([]) - - # Config file settings - self.max_points = plot_motors.get("max_points", 5000) - self.num_dim_points = plot_motors.get("num_dim_points", 100) - self.scatter_size = plot_motors.get("scatter_size", 5) - self.precision = plot_motors.get("precision", 2) - self.extra_columns = plot_motors.get("extra_columns", None) - self.mode_lock = plot_motors.get("mode_lock", False) - - # Saved motors from config file - self.selected_motors = selected_motors - - # QThread for motor movement + signals - self.motor_thread.motors_loaded.connect(self.get_available_motors) - self.motor_thread.motors_selected.connect(self.get_selected_motors) - self.motor_thread.limits_retrieved.connect(self.update_limits) - - # UI - self.init_ui() - self.tag_N = 1 # position label for saved coordinates - - # State tracking for entries - self.last_selected_index = -1 - self.is_next_entry_end = False - - # Get all motors available - self.motor_thread.retrieve_all_motors() # TODO link to combobox that it always refresh - - def connect_motor(self, motor_x_name: str, motor_y_name: str): - """ - Connects to the specified motors and initializes the UI for motor control. - - Args: - motor_x_name (str): Name of the motor controlling the x-axis. - motor_y_name (str): Name of the motor controlling the y-axis. - """ - self.motor_thread.connect_motors(motor_x_name, motor_y_name) - self.motor_thread.retrieve_motor_limits(self.motor_x, self.motor_y) - - # self.init_motor_map() - - self.motorControl.setEnabled(True) - self.motorControl_absolute.setEnabled(True) - self.tabWidget_tables.setTabEnabled(1, True) - - self.generate_table_coordinate( - self.tableWidget_coordinates, - self.motor_thread.retrieve_coordinates(), - tag=f"{motor_x_name},{motor_y_name}", - precision=self.precision, - ) - - @pyqtSlot(object, object) - def get_selected_motors(self, motor_x, motor_y): - """ - Slot to receive and set the selected motors. - - Args: - motor_x (object): The selected motor for the x-axis. - motor_y (object): The selected motor for the y-axis. - """ - self.motor_x, self.motor_y = motor_x, motor_y - - @pyqtSlot(list, list) - def get_available_motors(self, motors_x, motors_y): - """ - Slot to populate the available motors in the combo boxes and set the index based on the configuration. - - Args: - motors_x (list): List of available motors for the x-axis. - motors_y (list): List of available motors for the y-axis. - """ - self.comboBox_motor_x.addItems(motors_x) - self.comboBox_motor_y.addItems(motors_y) - - # Set index based on the motor names in the configuration, if available - selected_motor_x = "" - selected_motor_y = "" - - if self.selected_motors: - selected_motor_x = self.selected_motors.get("motor_x", "") - selected_motor_y = self.selected_motors.get("motor_y", "") - - index_x = self.comboBox_motor_x.findText(selected_motor_x) - index_y = self.comboBox_motor_y.findText(selected_motor_y) - - if index_x != -1: - self.comboBox_motor_x.setCurrentIndex(index_x) - else: - print( - f"Warning: Motor '{selected_motor_x}' specified in the config file is not available." - ) - self.comboBox_motor_x.setCurrentIndex(0) # Optionally set to first item or any default - - if index_y != -1: - self.comboBox_motor_y.setCurrentIndex(index_y) - else: - print( - f"Warning: Motor '{selected_motor_y}' specified in the config file is not available." - ) - self.comboBox_motor_y.setCurrentIndex(0) # Optionally set to first item or any default - - @pyqtSlot(list, list) - def update_limits(self, x_limits: list, y_limits: list) -> None: - """ - Slot to update the limits for x and y motors. - - Args: - x_limits (list): List containing the lower and upper limits for the x-axis motor. - y_limits (list): List containing the lower and upper limits for the y-axis motor. - """ - self.limit_x = x_limits - self.limit_y = y_limits - self.spinBox_x_min.setValue(self.limit_x[0]) - self.spinBox_x_max.setValue(self.limit_x[1]) - self.spinBox_y_min.setValue(self.limit_y[0]) - self.spinBox_y_max.setValue(self.limit_y[1]) - - for spinBox in ( - self.spinBox_x_min, - self.spinBox_x_max, - self.spinBox_y_min, - self.spinBox_y_max, - ): - spinBox.setStyleSheet("") - - # TODO - names can be get from MotorController - self.label_Y_max.setText(f"+ ({self.motor_y.name})") - self.label_Y_min.setText(f"- ({self.motor_y.name})") - self.label_X_max.setText(f"+ ({self.motor_x.name})") - self.label_X_min.setText(f"- ({self.motor_x.name})") - - self.init_motor_map() # reinitialize the map with the new limits - - @pyqtSlot() - def enable_motor_control(self): - self.motorControl.setEnabled(True) - - def enable_motor_controls(self, disable: bool) -> None: - self.motorControl.setEnabled(disable) - self.motorSelection.setEnabled(disable) - - # Disable or enable all controls within the motorControl_absolute group box - for widget in self.motorControl_absolute.findChildren(QtWidgets.QWidget): - widget.setEnabled(disable) - - # Enable the pushButton_stop if the motor is moving - self.pushButton_stop.setEnabled(True) - - def move_motor_absolute(self, x: float, y: float) -> None: - self.enable_motor_controls(False) - target_coordinates = (x, y) - self.motor_thread.move_to_coordinates(target_coordinates) - if self.checkBox_save_with_go.isChecked(): - self.save_absolute_coordinates() - - def move_motor_relative(self, motor, axis: str, direction: int) -> None: - self.enable_motor_controls(False) - if axis == "x": - step = direction * self.spinBox_step_x.value() - elif axis == "y": - step = direction * self.spinBox_step_y.value() - self.motor_thread.move_relative(motor, step) - - def update_plot_setting(self, max_points, num_dim_points, scatter_size): - self.max_points = max_points - self.num_dim_points = num_dim_points - self.scatter_size = scatter_size - - for spinBox in ( - self.spinBox_max_points, - self.spinBox_num_dim_points, - self.spinBox_scatter_size, - ): - spinBox.setStyleSheet("") - - def set_from_config(self) -> None: - """Set the values from the config file to the UI elements""" - - self.spinBox_max_points.setValue(self.max_points) - self.spinBox_num_dim_points.setValue(self.num_dim_points) - self.spinBox_scatter_size.setValue(self.scatter_size) - self.spinBox_precision.setValue(self.precision) - self.update_precision(self.precision) - - def init_ui_plot_elements(self) -> None: - """Initialize the plot elements""" - self.label_coorditanes = self.glw.addLabel(f"Motor position: (X, Y)", row=0, col=0) - self.plot_map = self.glw.addPlot(row=1, col=0) - self.limit_map = pg.ImageItem() - self.plot_map.addItem(self.limit_map) - self.motor_map = pg.ScatterPlotItem( - size=self.scatter_size, pen=pg.mkPen(None), brush=pg.mkBrush(255, 255, 255, 255) - ) - self.motor_map.setZValue(0) - - self.saved_motor_map_start = pg.ScatterPlotItem( - size=self.scatter_size, pen=pg.mkPen(None), brush=pg.mkBrush(255, 0, 0, 255) - ) - self.saved_motor_map_end = pg.ScatterPlotItem( - size=self.scatter_size, pen=pg.mkPen(None), brush=pg.mkBrush(0, 0, 255, 255) - ) - - self.saved_motor_map_individual = pg.ScatterPlotItem( - size=self.scatter_size, pen=pg.mkPen(None), brush=pg.mkBrush(0, 255, 0, 255) - ) - - self.saved_motor_map_start.setZValue(1) # for saved motor positions - self.saved_motor_map_end.setZValue(1) # for saved motor positions - self.saved_motor_map_individual.setZValue(1) # for saved motor positions - - self.plot_map.addItem(self.motor_map) - self.plot_map.addItem(self.saved_motor_map_start) - self.plot_map.addItem(self.saved_motor_map_end) - self.plot_map.addItem(self.saved_motor_map_individual) - self.plot_map.showGrid(x=True, y=True) - - def init_ui_motor_control(self) -> None: - """Initialize the motor control elements""" - - # Connect checkbox and spinBoxes - self.checkBox_same_xy.stateChanged.connect(self.sync_step_sizes) - self.spinBox_step_x.valueChanged.connect(self.update_step_size_x) - self.spinBox_step_y.valueChanged.connect(self.update_step_size_y) - - self.toolButton_right.clicked.connect( - lambda: self.move_motor_relative(self.motor_x, "x", 1) - ) - self.toolButton_left.clicked.connect( - lambda: self.move_motor_relative(self.motor_x, "x", -1) - ) - self.toolButton_up.clicked.connect(lambda: self.move_motor_relative(self.motor_y, "y", 1)) - self.toolButton_down.clicked.connect( - lambda: self.move_motor_relative(self.motor_y, "y", -1) - ) - - # Switch between key shortcuts active - self.checkBox_enableArrows.stateChanged.connect(self.update_arrow_key_shortcuts) - self.update_arrow_key_shortcuts() - - # Move to absolute coordinates - self.pushButton_go_absolute.clicked.connect( - lambda: self.move_motor_absolute( - self.spinBox_absolute_x.value(), self.spinBox_absolute_y.value() - ) - ) - - self.pushButton_set.clicked.connect(self.save_absolute_coordinates) - self.pushButton_save.clicked.connect(self.save_current_coordinates) - self.pushButton_stop.clicked.connect(self.motor_thread.stop_movement) - - # Enable/Disable GUI - self.motor_thread.move_finished.connect(lambda: self.enable_motor_controls(True)) - - # Precision update - self.spinBox_precision.valueChanged.connect(lambda x: self.update_precision(x)) - - def init_ui_motor_configs(self) -> None: - """Limit and plot spinBoxes""" - - # SpinBoxes change color to yellow before updated, limits are updated with update button - self.spinBox_x_min.valueChanged.connect(lambda: self.param_changed(self.spinBox_x_min)) - self.spinBox_x_max.valueChanged.connect(lambda: self.param_changed(self.spinBox_x_max)) - self.spinBox_y_min.valueChanged.connect(lambda: self.param_changed(self.spinBox_y_min)) - self.spinBox_y_max.valueChanged.connect(lambda: self.param_changed(self.spinBox_y_max)) - - # SpinBoxes - Max Points and N Dim Points - self.spinBox_max_points.valueChanged.connect( - lambda: self.param_changed(self.spinBox_max_points) - ) - self.spinBox_num_dim_points.valueChanged.connect( - lambda: self.param_changed(self.spinBox_num_dim_points) - ) - self.spinBox_scatter_size.valueChanged.connect( - lambda: self.param_changed(self.spinBox_scatter_size) - ) - - # Limit Update - self.pushButton_updateLimits.clicked.connect( - lambda: self.update_all_motor_limits( - x_limit=[self.spinBox_x_min.value(), self.spinBox_x_max.value()], - y_limit=[self.spinBox_y_min.value(), self.spinBox_y_max.value()], - ) - ) - - # Plot Update - self.pushButton_update_config.clicked.connect( - lambda: self.update_plot_setting( - max_points=self.spinBox_max_points.value(), - num_dim_points=self.spinBox_num_dim_points.value(), - scatter_size=self.spinBox_scatter_size.value(), - ) - ) - - self.pushButton_enableGUI.clicked.connect(lambda: self.enable_motor_controls(True)) - - def init_ui_motor_connections(self) -> None: - # Signal from motor thread to update coordinates - self.motor_thread.coordinates_updated.connect( - lambda x, y: self.update_image_map(round(x, self.precision), round(y, self.precision)) - ) - - # Motor connections button - self.pushButton_connecMotors.clicked.connect( - lambda: self.connect_motor( - self.comboBox_motor_x.currentText(), self.comboBox_motor_y.currentText() - ) - ) - - # Check if there are any motors connected - if self.motor_x or self.motor_y is None: - self.motorControl.setEnabled(False) - self.motorControl_absolute.setEnabled(False) - self.tabWidget_tables.setTabEnabled(1, False) - - def init_keyboard_shortcuts(self) -> None: - """Initialize the keyboard shortcuts""" - - # Delete table entry - delete_shortcut = QShortcut(QKeySequence("Delete"), self) - backspace_shortcut = QShortcut(QKeySequence("Backspace"), self) - delete_shortcut.activated.connect(self.delete_selected_row) - backspace_shortcut.activated.connect(self.delete_selected_row) - - # Increase/decrease step size for X motor - increase_x_shortcut = QShortcut(QKeySequence("Ctrl+A"), self) - decrease_x_shortcut = QShortcut(QKeySequence("Ctrl+Z"), self) - increase_x_shortcut.activated.connect(lambda: self.change_step_size(self.spinBox_step_x, 2)) - decrease_x_shortcut.activated.connect( - lambda: self.change_step_size(self.spinBox_step_x, 0.5) - ) - - # Increase/decrease step size for Y motor - increase_y_shortcut = QShortcut(QKeySequence("Alt+A"), self) - decrease_y_shortcut = QShortcut(QKeySequence("Alt+Z"), self) - increase_y_shortcut.activated.connect(lambda: self.change_step_size(self.spinBox_step_y, 2)) - decrease_y_shortcut.activated.connect( - lambda: self.change_step_size(self.spinBox_step_y, 0.5) - ) - - # Go absolute button - self.pushButton_go_absolute.setShortcut("Ctrl+G") - self.pushButton_go_absolute.setToolTip("Ctrl+G") - - # Set absolute coordinates - self.pushButton_set.setShortcut("Ctrl+D") - self.pushButton_set.setToolTip("Ctrl+D") - - # Save Current coordinates - self.pushButton_save.setShortcut("Ctrl+S") - self.pushButton_save.setToolTip("Ctrl+S") - - # Stop Button - self.pushButton_stop.setShortcut("Ctrl+X") - self.pushButton_stop.setToolTip("Ctrl+X") - - def init_ui_table(self) -> None: - """Initialize the table validators for x and y coordinates and table signals""" - - # Validators - self.double_delegate = DoubleValidationDelegate(self.tableWidget_coordinates) - - # Init Default mode - self.mode_switch() - - # Buttons - self.pushButton_exportCSV.clicked.connect( - lambda: self.export_table_to_csv(self.tableWidget_coordinates) - ) - self.pushButton_importCSV.clicked.connect( - lambda: self.load_table_from_csv(self.tableWidget_coordinates, precision=self.precision) - ) - self.pushButton_resize_table.clicked.connect( - lambda: self.resizeTable(self.tableWidget_coordinates) - ) - self.pushButton_duplicate.clicked.connect( - lambda: self.duplicate_last_row(self.tableWidget_coordinates) - ) - self.pushButton_help.clicked.connect(self.show_help_dialog) - - # Mode switch - self.comboBox_mode.currentIndexChanged.connect(self.mode_switch) - - # Manual Edit - self.tableWidget_coordinates.itemChanged.connect(self.handle_manual_edit) - - def init_mode_lock(self) -> None: - if self.mode_lock is False: - return - elif self.mode_lock == "Individual": - self.comboBox_mode.setCurrentIndex(0) - self.comboBox_mode.setEnabled(False) - elif self.mode_lock == "Start/Stop": - self.comboBox_mode.setCurrentIndex(1) - self.comboBox_mode.setEnabled(False) - else: - self.mode_lock = False - print(f"Warning: Mode lock '{self.mode_lock}' not recognized.") - print(f"Unlocking mode lock.") - - def init_ui(self) -> None: - """Setup all ui elements""" - - self.set_from_config() # Set default parameters - self.init_ui_plot_elements() # 2D Plot - self.init_ui_motor_control() # Motor Controls - self.init_ui_motor_configs() # Motor Configs - self.init_ui_motor_connections() # Motor Connections - self.init_keyboard_shortcuts() # Keyboard Shortcuts - self.init_ui_table() # Table validators for x and y coordinates - self.init_mode_lock() # Mode lock - - def init_motor_map(self): - # Get motor limits - limit_x_min, limit_x_max = self.motor_thread.get_motor_limits(self.motor_x) - limit_y_min, limit_y_max = self.motor_thread.get_motor_limits(self.motor_y) - - self.offset_x = limit_x_min - self.offset_y = limit_y_min - - # Define the size of the image map based on the motor's limits - map_width = int(limit_x_max - limit_x_min + 1) - map_height = int(limit_y_max - limit_y_min + 1) - - # Create an empty image map - self.background_value = 25 - self.limit_map_data = np.full( - (map_width, map_height), self.background_value, dtype=np.float32 - ) - self.limit_map.setImage(self.limit_map_data) - - # Set the initial position on the map - init_pos = self.motor_thread.retrieve_coordinates() - self.motor_positions = np.array([init_pos]) - self.brushes = [pg.mkBrush(255, 255, 255, 255)] - - self.motor_map.setData(pos=self.motor_positions, brush=self.brushes) - - # Translate and scale the image item to match the motor coordinates - self.tr = QtGui.QTransform() - self.tr.translate(limit_x_min, limit_y_min) - self.limit_map.setTransform(self.tr) - - if hasattr(self, "highlight_V") and hasattr(self, "highlight_H"): - self.plot_map.removeItem(self.highlight_V) - self.plot_map.removeItem(self.highlight_H) - - # Crosshair to highlight the current position - self.highlight_V = pg.InfiniteLine( - angle=90, movable=False, pen=pg.mkPen(color="r", width=1, style=QtCore.Qt.DashLine) - ) - self.highlight_H = pg.InfiniteLine( - angle=0, movable=False, pen=pg.mkPen(color="r", width=1, style=QtCore.Qt.DashLine) - ) - - self.plot_map.addItem(self.highlight_V) - self.plot_map.addItem(self.highlight_H) - - self.highlight_V.setPos(init_pos[0]) - self.highlight_H.setPos(init_pos[1]) - - def update_image_map(self, x, y): - # Update label - self.label_coorditanes.setText(f"Motor position: ({x}, {y})") - - # Add new point with full brightness - new_pos = np.array([x, y]) - self.motor_positions = np.vstack((self.motor_positions, new_pos)) - - # If the number of points exceeds max_points, delete the oldest points - if len(self.motor_positions) > self.max_points: - self.motor_positions = self.motor_positions[-self.max_points :] - - # Determine brushes based on position in the array - self.brushes = [pg.mkBrush(50, 50, 50, 255)] * len(self.motor_positions) - - # Calculate the decrement step based on self.num_dim_points - decrement_step = (255 - 50) / self.num_dim_points - - for i in range(1, min(self.num_dim_points + 1, len(self.motor_positions) + 1)): - brightness = max(60, 255 - decrement_step * (i - 1)) - self.brushes[-i] = pg.mkBrush(brightness, brightness, brightness, 255) - - self.brushes[-1] = pg.mkBrush(255, 255, 255, 255) # Newest point is always full brightness - - self.motor_map.setData(pos=self.motor_positions, brush=self.brushes, size=self.scatter_size) - - # Set Highlight - self.highlight_V.setPos(x) - self.highlight_H.setPos(y) - - def update_all_motor_limits(self, x_limit: list = None, y_limit: list = None) -> None: - self.motor_thread.update_all_motor_limits(x_limit=x_limit, y_limit=y_limit) - - def update_arrow_key_shortcuts(self): - if self.checkBox_enableArrows.isChecked(): - # Set the arrow key shortcuts for motor movement - self.toolButton_right.setShortcut(Qt.Key_Right) - self.toolButton_left.setShortcut(Qt.Key_Left) - self.toolButton_up.setShortcut(Qt.Key_Up) - self.toolButton_down.setShortcut(Qt.Key_Down) - else: - # Clear the shortcuts - self.toolButton_right.setShortcut("") - self.toolButton_left.setShortcut("") - self.toolButton_up.setShortcut("") - self.toolButton_down.setShortcut("") - - def mode_switch(self): - current_index = self.comboBox_mode.currentIndex() - - if self.tableWidget_coordinates.rowCount() > 0: - msgBox = QMessageBox() - msgBox.setIcon(QMessageBox.Warning) - msgBox.setText( - "Switching modes will delete all table entries. Do you want to continue?" - ) - msgBox.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel) - returnValue = msgBox.exec() - - if returnValue == QMessageBox.Cancel: - self.comboBox_mode.blockSignals(True) # Block signals - self.comboBox_mode.setCurrentIndex(self.last_selected_index) - self.comboBox_mode.blockSignals(False) # Unblock signals - return - - self.tableWidget_coordinates.setRowCount(0) # Wipe table - - # Clear saved points from map - self.saved_motor_map_start.clear() - self.saved_motor_map_end.clear() - self.saved_motor_map_individual.clear() - - if current_index == 0: # 'individual' is selected - header = ["Show", "Move", "Tag", "X", "Y"] - - self.tableWidget_coordinates.setColumnCount(len(header)) - self.tableWidget_coordinates.setHorizontalHeaderLabels(header) - self.tableWidget_coordinates.setItemDelegateForColumn(3, self.double_delegate) - self.tableWidget_coordinates.setItemDelegateForColumn(4, self.double_delegate) - - elif current_index == 1: # 'start/stop' is selected - header = [ - "Show", - "Move [start]", - "Move [end]", - "Tag", - "X [start]", - "Y [start]", - "X [end]", - "Y [end]", - ] - self.tableWidget_coordinates.setColumnCount(len(header)) - self.tableWidget_coordinates.setHorizontalHeaderLabels(header) - self.tableWidget_coordinates.setItemDelegateForColumn(3, self.double_delegate) - self.tableWidget_coordinates.setItemDelegateForColumn(4, self.double_delegate) - self.tableWidget_coordinates.setItemDelegateForColumn(5, self.double_delegate) - self.tableWidget_coordinates.setItemDelegateForColumn(6, self.double_delegate) - - self.last_selected_index = current_index # Save the last selected index - - def generate_table_coordinate( - self, table: QtWidgets.QTableWidget, coordinates: tuple, tag: str = None, precision: int = 0 - ) -> None: - # To not call replot points during table generation - self.replot_lock = True - - current_index = self.comboBox_mode.currentIndex() - - if current_index == 1 and self.is_next_entry_end: - target_row = table.rowCount() - 1 # Last row - else: - new_row_count = table.rowCount() + 1 - table.setRowCount(new_row_count) - target_row = new_row_count - 1 # New row - - # Create QDoubleValidator - validator = QDoubleValidator() - validator.setDecimals(precision) - - # Checkbox for visibility switch -> always first column - checkBox = QtWidgets.QCheckBox() - checkBox.setChecked(True) - checkBox.stateChanged.connect(lambda: self.replot_based_on_table(table)) - table.setCellWidget(target_row, 0, checkBox) - - # Apply validator to x and y coordinate QTableWidgetItem - item_x = QtWidgets.QTableWidgetItem(str(f"{coordinates[0]:.{precision}f}")) - item_y = QtWidgets.QTableWidgetItem(str(f"{coordinates[1]:.{precision}f}")) - item_x.setFlags(item_x.flags() | Qt.ItemIsEditable) - item_y.setFlags(item_y.flags() | Qt.ItemIsEditable) - - # Mode switch - if current_index == 1: # start/stop mode - # Create buttons for start and end coordinates - button_start = QPushButton("Go [start]") - button_end = QPushButton("Go [end]") - - # Add buttons to table - table.setCellWidget(target_row, 1, button_start) - table.setCellWidget(target_row, 2, button_end) - - button_end.setEnabled( - self.is_next_entry_end - ) # Enable only if end coordinate is present - - # Connect buttons to the slot - button_start.clicked.connect(self.move_to_row_coordinates) - button_end.clicked.connect(self.move_to_row_coordinates) - - # Set Tag - table.setItem(target_row, 3, QtWidgets.QTableWidgetItem(str(tag))) - - # Add coordinates to table - col_index = 8 - if self.is_next_entry_end: - table.setItem(target_row, 6, item_x) - table.setItem(target_row, 7, item_y) - else: - table.setItem(target_row, 4, item_x) - table.setItem(target_row, 5, item_y) - self.is_next_entry_end = not self.is_next_entry_end - else: # Individual mode - button_start = QPushButton("Go") - table.setCellWidget(target_row, 1, button_start) - button_start.clicked.connect(self.move_to_row_coordinates) - - # Set Tag - table.setItem(target_row, 2, QtWidgets.QTableWidgetItem(str(tag))) - - col_index = 5 - table.setItem(target_row, 3, item_x) - table.setItem(target_row, 4, item_y) - - # Adding extra columns - # TODO simplify nesting - if current_index != 1 or self.is_next_entry_end: - if self.extra_columns: - table.setColumnCount(col_index + len(self.extra_columns)) - for col_dict in self.extra_columns: - for col_name, default_value in col_dict.items(): - if target_row == 0: - item = QtWidgets.QTableWidgetItem(str(default_value)) - - else: - prev_item = table.item(target_row - 1, col_index) - item_text = prev_item.text() if prev_item else "" - item = QtWidgets.QTableWidgetItem(item_text) - - item.setFlags(item.flags() | Qt.ItemIsEditable) - table.setItem(target_row, col_index, item) - - if target_row == 0 or (current_index == 1 and not self.is_next_entry_end): - table.setHorizontalHeaderItem( - col_index, QtWidgets.QTableWidgetItem(col_name) - ) - - col_index += 1 - - self.align_table_center(table) - - if self.checkBox_resize_auto.isChecked(): - table.resizeColumnsToContents() - - # Unlock Replot - self.replot_lock = False - - # Replot the saved motor map - self.replot_based_on_table(table) - - def duplicate_last_row(self, table: QtWidgets.QTableWidget) -> None: - if self.is_next_entry_end is True: - msgBox = QMessageBox() - msgBox.setIcon(QMessageBox.Warning) - msgBox.setText("The end coordinates were not set for previous entry!") - msgBox.setStandardButtons(QMessageBox.Ok) - returnValue = msgBox.exec() - - if returnValue == QMessageBox.Ok: - return - - last_row = table.rowCount() - 1 - if last_row == -1: - return - - # Get the tag and coordinates from the last row - tag = table.item(last_row, 2).text() if table.item(last_row, 2) else None - mode_index = self.comboBox_mode.currentIndex() - - if mode_index == 1: # start/stop mode - x_start = float(table.item(last_row, 4).text()) if table.item(last_row, 4) else None - y_start = float(table.item(last_row, 5).text()) if table.item(last_row, 5) else None - x_end = float(table.item(last_row, 6).text()) if table.item(last_row, 6) else None - y_end = float(table.item(last_row, 7).text()) if table.item(last_row, 7) else None - - # Duplicate the 'start' coordinates - self.generate_table_coordinate(table, (x_start, y_start), tag, precision=self.precision) - - # Duplicate the 'end' coordinates - self.generate_table_coordinate(table, (x_end, y_end), tag, precision=self.precision) - - else: # individual mode - x = float(table.item(last_row, 3).text()) if table.item(last_row, 3) else None - y = float(table.item(last_row, 4).text()) if table.item(last_row, 4) else None - - # Duplicate the coordinates - self.generate_table_coordinate(table, (x, y), tag, precision=self.precision) - - self.align_table_center(table) - - if self.checkBox_resize_auto.isChecked(): - table.resizeColumnsToContents() - - def handle_manual_edit(self, item): - table = item.tableWidget() - row, col = item.row(), item.column() - mode_index = self.comboBox_mode.currentIndex() - - # Determine the columns where the x and y coordinates are stored based on the mode. - coord_cols = [3, 4] if mode_index == 0 else [4, 5, 6, 7] - - if col not in coord_cols: - return # Only proceed if the edited columns are coordinate columns - - # Replot based on the table - self.replot_based_on_table(table) - - @staticmethod - def align_table_center(table: QtWidgets.QTableWidget) -> None: - for row in range(table.rowCount()): - for col in range(table.columnCount()): - item = table.item(row, col) - if item: - item.setTextAlignment(Qt.AlignCenter) - - def move_to_row_coordinates(self): - # Find out the mode and decide columns accordingly - mode = self.comboBox_mode.currentIndex() - - # Get the button that emitted the signal# Get the button that emitted the signal - button = self.sender() - - # Find the row and column where the button is located - row = self.tableWidget_coordinates.indexAt(button.pos()).row() - col = self.tableWidget_coordinates.indexAt(button.pos()).column() - - # Decide which coordinates to move to based on the column - if mode == 1: - if col == 1: # Go to 'start' coordinates - x_col, y_col = 4, 5 - elif col == 2: # Go to 'end' coordinates - x_col, y_col = 6, 7 - else: # Default case - x_col, y_col = 3, 4 # For "individual" mode - - # Fetch and move coordinates - x = float(self.tableWidget_coordinates.item(row, x_col).text()) - y = float(self.tableWidget_coordinates.item(row, y_col).text()) - self.move_motor_absolute(x, y) - - def replot_based_on_table(self, table): - if self.replot_lock is True: - return - - print("Replot Triggered") - start_points = [] - end_points = [] - individual_points = [] - # self.rectangles = [] #TODO introduce later - - for row in range(table.rowCount()): - visibility = table.cellWidget(row, 0).isChecked() - if not visibility: - continue - - if self.comboBox_mode.currentIndex() == 1: # start/stop mode - x_start = float(table.item(row, 4).text()) if table.item(row, 4) else None - y_start = float(table.item(row, 5).text()) if table.item(row, 5) else None - x_end = float(table.item(row, 6).text()) if table.item(row, 6) else None - y_end = float(table.item(row, 7).text()) if table.item(row, 7) else None - - if x_start is not None and y_start is not None: - start_points.append([x_start, y_start]) - print(f"added start points:{start_points}") - if x_end is not None and y_end is not None: - end_points.append([x_end, y_end]) - print(f"added end points:{end_points}") - - else: # individual mode - x_ind = float(table.item(row, 3).text()) if table.item(row, 3) else None - y_ind = float(table.item(row, 4).text()) if table.item(row, 4) else None - if x_ind is not None and y_ind is not None: - individual_points.append([x_ind, y_ind]) - print(f"added individual points:{individual_points}") - - if start_points: - self.saved_motor_map_start.setData(pos=np.array(start_points)) - print("plotted start") - if end_points: - self.saved_motor_map_end.setData(pos=np.array(end_points)) - print("plotted end") - if individual_points: - self.saved_motor_map_individual.setData(pos=np.array(individual_points)) - print("plotted individual") - - # TODO will be adapted with logic to handle start/end points - def draw_rectangles(self, start_points, end_points): - for start, end in zip(start_points, end_points): - self.draw_rectangle(start, end) - - def draw_rectangle(self, start, end): - pass - - def delete_selected_row(self): - selected_rows = self.tableWidget_coordinates.selectionModel().selectedRows() - rows_to_delete = [row.row() for row in selected_rows] - rows_to_delete.sort(reverse=True) # Sort in descending order - - # Remove the row from the table - for row_index in rows_to_delete: - self.tableWidget_coordinates.removeRow(row_index) - - # Replot the saved motor map - self.replot_based_on_table(self.tableWidget_coordinates) - - def resizeTable(self, table): - table.resizeColumnsToContents() - - def export_table_to_csv(self, table: QtWidgets.QTableWidget): - options = QFileDialog.Options() - filePath, _ = QFileDialog.getSaveFileName( - self, "Save File", "", "CSV Files (*.csv);;All Files (*)", options=options - ) - - if filePath: - if not filePath.endswith(".csv"): - filePath += ".csv" - - with open(filePath, mode="w", newline="") as file: - writer = csv.writer(file) - - col_offset = 2 if self.comboBox_mode.currentIndex() == 0 else 3 - - # Write the header - header = [] - for col in range(col_offset, table.columnCount()): - header_item = table.horizontalHeaderItem(col) - header.append(header_item.text() if header_item else "") - writer.writerow(header) - - # Write the content - for row in range(table.rowCount()): - row_data = [] - for col in range(col_offset, table.columnCount()): - item = table.item(row, col) - row_data.append(item.text() if item else "") - writer.writerow(row_data) - - def load_table_from_csv(self, table: QtWidgets.QTableWidget, precision: int = 0): - options = QFileDialog.Options() - filePath, _ = QFileDialog.getOpenFileName( - self, "Open File", "", "CSV Files (*.csv);;All Files (*)", options=options - ) - - if filePath: - with open(filePath, mode="r") as file: - reader = csv.reader(file) - header = next(reader) - - # Wipe the current table - table.setRowCount(0) - - # Populate data - for row_data in reader: - tag = row_data[0] - - if self.comboBox_mode.currentIndex() == 0: # Individual mode - x = float(row_data[1]) - y = float(row_data[2]) - self.generate_table_coordinate(table, (x, y), tag, precision) - - elif self.comboBox_mode.currentIndex() == 1: # Start/Stop mode - x_start = float(row_data[1]) - y_start = float(row_data[2]) - x_end = float(row_data[3]) - y_end = float(row_data[4]) - - self.generate_table_coordinate(table, (x_start, y_start), tag, precision) - self.generate_table_coordinate(table, (x_end, y_end), tag, precision) - - if self.checkBox_resize_auto.isChecked(): - table.resizeColumnsToContents() - - def save_absolute_coordinates(self): - self.generate_table_coordinate( - self.tableWidget_coordinates, - (self.spinBox_absolute_x.value(), self.spinBox_absolute_y.value()), - tag=f"Pos {self.tag_N}", - precision=self.precision, - ) - - self.tag_N += 1 - - def save_current_coordinates(self): - self.generate_table_coordinate( - self.tableWidget_coordinates, - self.motor_thread.retrieve_coordinates(), - tag=f"Cur {self.tag_N}", - precision=self.precision, - ) - - self.tag_N += 1 - - def update_precision(self, precision: int): - self.precision = precision - self.spinBox_step_x.setDecimals(self.precision) - self.spinBox_step_y.setDecimals(self.precision) - self.spinBox_absolute_x.setDecimals(self.precision) - self.spinBox_absolute_y.setDecimals(self.precision) - - def change_step_size(self, spinBox: QtWidgets.QDoubleSpinBox, factor: float) -> None: - old_step = spinBox.value() - new_step = old_step * factor - spinBox.setValue(new_step) - - # TODO generalize these functions - - def sync_step_sizes(self): - """Sync step sizes based on checkbox state.""" - if self.checkBox_same_xy.isChecked(): - value = self.spinBox_step_x.value() - self.spinBox_step_y.setValue(value) - - def update_step_size_x(self): - """Update step size for x if checkbox is checked.""" - if self.checkBox_same_xy.isChecked(): - value = self.spinBox_step_x.value() - self.spinBox_step_y.setValue(value) - - def update_step_size_y(self): - """Update step size for y if checkbox is checked.""" - if self.checkBox_same_xy.isChecked(): - value = self.spinBox_step_y.value() - self.spinBox_step_x.setValue(value) - - # def sync_step_sizes(self, spinBox1, spinBox2): #TODO move to more general solution like this - # if self.checkBox_same_xy.isChecked(): - # value = spinBox1.value() - # spinBox2.setValue(value) - - def show_help_dialog(self): - dialog = QDialog(self) - dialog.setWindowTitle("Help") - - layout = QVBoxLayout() - - # Key bindings section - layout.addWidget(QLabel("Keyboard Shortcuts:")) - - key_bindings = [ - ("Delete/Backspace", "Delete selected row"), - ("Ctrl+A", "Increase step size for X motor by factor of 2"), - ("Ctrl+Z", "Decrease step size for X motor by factor of 2"), - ("Alt+A", "Increase step size for Y motor by factor of 2"), - ("Alt+Z", "Decrease step size for Y motor by factor of 2"), - ("Ctrl+G", "Go absolute"), - ("Ctrl+D", "Set absolute coordinates"), - ("Ctrl+S", "Save Current coordinates"), - ("Ctrl+X", "Stop"), - ] - - for keys, action in key_bindings: - layout.addWidget(QLabel(f"{keys} - {action}")) - - # Separator - separator = QFrame() - separator.setFrameShape(QFrame.HLine) - separator.setFrameShadow(QFrame.Sunken) - layout.addWidget(separator) - - # Import/Export section - layout.addWidget(QLabel("Import/Export of Table:")) - layout.addWidget( - QLabel( - "Create additional table columns in config yaml file.\n" - "Be sure to load the correct config file with console argument -c.\n" - "When importing a table, the first three columns must be [Tag, X, Y] in the case of Individual mode \n" - "and [Tag, X [start], Y [start], X [end], Y [end] in the case of Start/Stop mode.\n" - "Failing to do so will break the table!" - ) - ) - layout.addWidget( - QLabel( - "Note: Importing a table will overwrite the current table. Import in correct mode." - ) - ) - - # Another Separator - another_separator = QFrame() - another_separator.setFrameShape(QFrame.HLine) - another_separator.setFrameShadow(QFrame.Sunken) - layout.addWidget(another_separator) - - # PyQtGraph Controls - layout.addWidget(QLabel("Graph Window Controls:")) - graph_controls = [("Left Drag", "Pan the view"), ("Right Drag or Scroll", "Zoom in/out")] - for action, description in graph_controls: - layout.addWidget(QLabel(f"{action} - {description}")) - - ok_button = QPushButton("OK") - ok_button.clicked.connect(dialog.close) - layout.addWidget(ok_button) - - dialog.setLayout(layout) - dialog.exec() - - @staticmethod - def param_changed(ui_element): - ui_element.setStyleSheet("background-color: #FFA700;") - - -class MotorActions(Enum): - MOVE_TO_COORDINATES = "move_to_coordinates" - MOVE_RELATIVE = "move_relative" - - -class MotorControl(QThread): - """ - QThread subclass for controlling motor actions asynchronously. - - Attributes: - coordinates_updated (pyqtSignal): Signal to emit current coordinates. - limits_retrieved (pyqtSignal): Signal to emit current limits. - move_finished (pyqtSignal): Signal to emit when the move is finished. - motors_loaded (pyqtSignal): Signal to emit when the motors are loaded. - motors_selected (pyqtSignal): Signal to emit when the motors are selected. - """ - - coordinates_updated = pyqtSignal(float, float) # Signal to emit current coordinates - limits_retrieved = pyqtSignal(list, list) # Signal to emit current limits - move_finished = pyqtSignal() # Signal to emit when the move is finished - motors_loaded = pyqtSignal(list, list) # Signal to emit when the motors are loaded - motors_selected = pyqtSignal(object, object) # Signal to emit when the motors are selected - # progress_updated = pyqtSignal(int) #TODO Signal to emit progress percentage - - def __init__(self, parent=None): - super().__init__(parent) - - self.action = None - self._initialize_motor() - - def connect_motors(self, motor_x_name: str, motor_y_name: str) -> None: - """ - Connect to the specified motors by their names. - - Args: - motor_x_name (str): The name of the motor for the x-axis. - motor_y_name (str): The name of the motor for the y-axis. - """ - self.motor_x_name = motor_x_name - self.motor_y_name = motor_y_name - - self.motor_x, self.motor_y = (dev[self.motor_x_name], dev[self.motor_y_name]) - - (self.current_x, self.current_y) = self.get_coordinates() - - if self.motors_consumer is not None: - self.motors_consumer.shutdown() - - self.motors_consumer = client.connector.consumer( - topics=[ - MessageEndpoints.device_readback(self.motor_x.name), - MessageEndpoints.device_readback(self.motor_y.name), - ], - cb=self._device_status_callback_motors, - parent=self, - ) - - self.motors_consumer.start() - - self.motors_selected.emit(self.motor_x, self.motor_y) - - def get_all_motors(self) -> list: - """ - Retrieve a list of all available motors. - - Returns: - list: List of all available motors. - """ - all_motors = ( - client.device_manager.devices.enabled_devices - ) # .acquisition_group("motor") #TODO remove motor group? - return all_motors - - def get_all_motors_names(self) -> list: - all_motors = client.device_manager.devices.enabled_devices # .acquisition_group("motor") - all_motors_names = [motor.name for motor in all_motors] - return all_motors_names - - def retrieve_all_motors(self): - self.all_motors = self.get_all_motors() - self.all_motors_names = self.get_all_motors_names() - self.motors_loaded.emit(self.all_motors_names, self.all_motors_names) - - return self.all_motors, self.all_motors_names - - def get_coordinates(self) -> tuple: - """Get current motor position""" - x = self.motor_x.readback.get() - y = self.motor_y.readback.get() - return x, y - - def retrieve_coordinates(self) -> tuple: - """Get current motor position for export to main app""" - return self.current_x, self.current_y - - def get_motor_limits(self, motor) -> list: - """ - Retrieve the limits for a specific motor. - - Args: - motor (object): Motor object. - - Returns: - tuple: Lower and upper limit for the motor. - """ - try: - return motor.limits - except AttributeError: - # If the motor doesn't have a 'limits' attribute, return a default value or raise a custom exception - print(f"The device {motor} does not have defined limits.") - return None - - def retrieve_motor_limits(self, motor_x, motor_y): - limit_x = self.get_motor_limits(motor_x) - limit_y = self.get_motor_limits(motor_y) - self.limits_retrieved.emit(limit_x, limit_y) - - def update_motor_limits(self, motor, low_limit=None, high_limit=None) -> None: - current_low_limit, current_high_limit = self.get_motor_limits(motor) - - # Check if the low limit has changed and is not None - if low_limit is not None and low_limit != current_low_limit: - motor.low_limit = low_limit - - # Check if the high limit has changed and is not None - if high_limit is not None and high_limit != current_high_limit: - motor.high_limit = high_limit - - def update_all_motor_limits(self, x_limit: list = None, y_limit: list = None) -> None: - current_position = self.get_coordinates() - - if x_limit is not None: - if current_position[0] < x_limit[0] or current_position[0] > x_limit[1]: - raise ValueError("Current motor position is outside the new limits (X)") - else: - self.update_motor_limits(self.motor_x, low_limit=x_limit[0], high_limit=x_limit[1]) - - if y_limit is not None: - if current_position[1] < y_limit[0] or current_position[1] > y_limit[1]: - raise ValueError("Current motor position is outside the new limits (Y)") - else: - self.update_motor_limits(self.motor_y, low_limit=y_limit[0], high_limit=y_limit[1]) - - self.retrieve_motor_limits(self.motor_x, self.motor_y) - - def move_to_coordinates(self, target_coordinates: tuple): - self.action = MotorActions.MOVE_TO_COORDINATES - self.target_coordinates = target_coordinates - self.start() - - def move_relative(self, motor, value: float): - self.action = MotorActions.MOVE_RELATIVE - self.motor = motor - self.value = value - self.start() - - def run(self): - if self.action == MotorActions.MOVE_TO_COORDINATES: - self._move_motor_coordinate() - elif self.action == MotorActions.MOVE_RELATIVE: - self._move_motor_relative(self.motor, self.value) - - def set_target_coordinates(self, target_coordinates: tuple) -> None: - self.target_coordinates = target_coordinates - - def _initialize_motor(self) -> None: - self.motor_x, self.motor_y = None, None - self.current_x, self.current_y = None, None - - self.motors_consumer = None - - # Get all available motors in the client - self.all_motors = self.get_all_motors() - self.all_motors_names = self.get_all_motors_names() - self.retrieve_all_motors() # send motor list to GUI - - self.target_coordinates = None - - def _move_motor_coordinate(self) -> None: - """Move the motor to the specified coordinates""" - status = scans.mv( - self.motor_x, - self.target_coordinates[0], - self.motor_y, - self.target_coordinates[1], - relative=False, - ) - - status.wait() - self.move_finished.emit() - - def _move_motor_relative(self, motor, value: float) -> None: - status = scans.mv(motor, value, relative=True) - - status.wait() - self.move_finished.emit() - - def stop_movement(self): - queue.request_scan_abortion() - queue.request_queue_reset() - - @staticmethod - def _device_status_callback_motors(msg, *, parent, **_kwargs) -> None: - deviceMSG = msg.value - if parent.motor_x.name in deviceMSG.content["signals"]: - parent.current_x = deviceMSG.content["signals"][parent.motor_x.name]["value"] - elif parent.motor_y.name in deviceMSG.content["signals"]: - parent.current_y = deviceMSG.content["signals"][parent.motor_y.name]["value"] - parent.coordinates_updated.emit(parent.current_x, parent.current_y) - - -if __name__ == "__main__": - import argparse - - import yaml - from bec_lib import BECClient, ServiceConfig - - parser = argparse.ArgumentParser(description="Motor App") - - parser.add_argument( - "--config", "-c", help="Path to the .yaml configuration file", default="config_example.yaml" - ) - parser.add_argument( - "--bec-config", "-b", help="Path to the BEC .yaml configuration file", default=None - ) - - args = parser.parse_args() - - try: - with open(args.config, "r") as file: - config = yaml.safe_load(file) - - selected_motors = config.get("selected_motors", {}) - plot_motors = config.get("plot_motors", {}) - - except FileNotFoundError: - print(f"The file {args.config} was not found.") - exit(1) - except Exception as e: - print(f"An error occurred while loading the config file: {e}") - exit(1) - - client = BECClient() - - if args.bec_config: - client.initialize(config=ServiceConfig(config_path=args.bec_config)) - - client.start() - dev = client.device_manager.devices - scans = client.scans - queue = client.queue - - app = QApplication([]) - MotorApp = MotorApp(selected_motors=selected_motors, plot_motors=plot_motors) - window = MotorApp - window.show() - app.exec() diff --git a/bec_widgets/examples/stream_plot/__init__.py b/bec_widgets/examples/stream_plot/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/bec_widgets/examples/stream_plot/line_plot.ui b/bec_widgets/examples/stream_plot/line_plot.ui deleted file mode 100644 index b403f8c0..00000000 --- a/bec_widgets/examples/stream_plot/line_plot.ui +++ /dev/null @@ -1,148 +0,0 @@ - - - Form - - - - 0 - 0 - 845 - 635 - - - - Line Plot - - - - - - Qt::Horizontal - - - - - 1 - 1 - - - - Qt::Vertical - - - - - - - - - - Generate 1D and 2D data without stream - - - - - - - - 0 - 0 - - - - 1st angle of azimutal segment (deg) - - - - - - - 0 - 0 - - - - 360.000000000000000 - - - 0.250000000000000 - - - - - - - - 0 - 0 - - - - - f1amp - - - - - f2amp - - - - - f2 phase - - - - - - - - - - - - - Precision - - - - - - - 4 - - - - - - - - - Qt::ElideMiddle - - - - Display - - - - - X - - - - - Y - - - - - - - - - - - - - diff --git a/bec_widgets/examples/stream_plot/stream_plot.py b/bec_widgets/examples/stream_plot/stream_plot.py deleted file mode 100644 index 145d7a6d..00000000 --- a/bec_widgets/examples/stream_plot/stream_plot.py +++ /dev/null @@ -1,342 +0,0 @@ -import os -import threading -import time - -import numpy as np -import pyqtgraph -import pyqtgraph as pg -from bec_lib import messages -from bec_lib.endpoints import MessageEndpoints -from bec_lib.redis_connector import RedisConnector -from pyqtgraph import mkBrush, mkPen -from pyqtgraph.Qt import QtCore, QtWidgets -from qtpy.QtCore import Signal, Slot -from qtpy.QtWidgets import QTableWidgetItem, QVBoxLayout - -from bec_widgets.utils import Colors, Crosshair, UILoader -from bec_widgets.utils.bec_dispatcher import BECDispatcher - - -class StreamPlot(QtWidgets.QWidget): - update_signal = Signal() - roi_signal = Signal(tuple) - - def __init__(self, name="", y_value_list=["gauss_bpm"], client=None, parent=None) -> None: - """ - Basic plot widget for displaying scan data. - - Args: - name (str, optional): Name of the plot. Defaults to "". - y_value_list (list, optional): List of signals to be plotted. Defaults to ["gauss_bpm"]. - """ - - # Client and device manager from BEC - self.client = BECDispatcher().client if client is None else client - - super(StreamPlot, self).__init__() - # Set style for pyqtgraph plots - pg.setConfigOption("background", "w") - pg.setConfigOption("foreground", "k") - current_path = os.path.dirname(__file__) - self.ui = UILoader().load_ui(os.path.join(current_path, "line_plot.ui"), self) - - self._idle_time = 100 - self.connector = RedisConnector(["localhost:6379"]) - - self.y_value_list = y_value_list - self.previous_y_value_list = None - self.plotter_data_x = [] - self.plotter_data_y = [] - - self.plotter_scan_id = None - - self._current_proj = None - self._current_metadata_ep = "px_stream/projection_{}/metadata" - - self.proxy_update = pg.SignalProxy(self.update_signal, rateLimit=25, slot=self.update) - - self._data_retriever_thread_exit_event = threading.Event() - self.data_retriever = threading.Thread( - target=self.on_projection, args=(self._data_retriever_thread_exit_event,), daemon=True - ) - self.data_retriever.start() - - ########################## - # UI - ########################## - self.init_ui() - self.init_curves() - self.hook_crosshair() - - def close(self): - super().close() - self._data_retriever_thread_exit_event.set() - self.data_retriever.join() - - def init_ui(self): - """Setup all ui elements""" - ########################## - # 1D Plot - ########################## - - # LabelItem for ROI - self.label_plot = pg.LabelItem(justify="center") - self.glw_plot_layout = QVBoxLayout(self.ui.glw_plot_placeholder) - self.glw_plot = pg.GraphicsLayoutWidget() - self.glw_plot_layout.addWidget(self.glw_plot) - self.glw_plot.addItem(self.label_plot) - self.label_plot.setText("ROI region") - - # ROI selector - so far from [-1,1] #TODO update to scale with xrange - self.roi_selector = pg.LinearRegionItem([-1, 1]) - - self.glw_plot.nextRow() # TODO update of cursor - self.label_plot_moved = pg.LabelItem(justify="center") - self.glw_plot.addItem(self.label_plot_moved) - self.label_plot_moved.setText("Actual coordinates (X, Y)") - - # Label for coordinates clicked - self.glw_plot.nextRow() - self.label_plot_clicked = pg.LabelItem(justify="center") - self.glw_plot.addItem(self.label_plot_clicked) - self.label_plot_clicked.setText("Clicked coordinates (X, Y)") - - # 1D PlotItem - self.glw_plot.nextRow() - self.plot = pg.PlotItem() - self.plot.setLogMode(True, True) - self.glw_plot.addItem(self.plot) - self.plot.addLegend() - - ########################## - # 2D Plot - ########################## - - # Label for coordinates moved - self.label_image_moved = pg.LabelItem(justify="center") - self.glw_image_layout = QVBoxLayout(self.ui.glw_image_placeholder) - self.glw_image = pg.GraphicsLayoutWidget() - self.glw_plot_layout.addWidget(self.glw_image) - self.glw_image.addItem(self.label_image_moved) - self.label_image_moved.setText("Actual coordinates (X, Y)") - - # Label for coordinates clicked - self.glw_image.nextRow() - self.label_image_clicked = pg.LabelItem(justify="center") - self.glw_image.addItem(self.label_image_clicked) - self.label_image_clicked.setText("Clicked coordinates (X, Y)") - - # TODO try to lock aspect ratio with view - - # # Create a window - # win = pg.GraphicsLayoutWidget() - # win.show() - # - # # Create a ViewBox - # view = win.addViewBox() - # - # # Lock the aspect ratio - # view.setAspectLocked(True) - - # # Create an ImageItem - # image_item = pg.ImageItem(np.random.random((100, 100))) - # - # # Add the ImageItem to the ViewBox - # view.addItem(image_item) - - # 2D ImageItem - self.glw_image.nextRow() - self.plot_image = pg.PlotItem() - self.glw_image.addItem(self.plot_image) - - def init_curves(self): - # init of 1D plot - self.plot.clear() - - self.curves = [] - self.pens = [] - self.brushs = [] - - self.color_list = Colors.golden_angle_color(colormap="CET-R2", num=len(self.y_value_list)) - - for ii, y_value in enumerate(self.y_value_list): - pen = mkPen(color=self.color_list[ii], width=2, style=QtCore.Qt.DashLine) - brush = mkBrush(color=self.color_list[ii]) - curve = pg.PlotDataItem(symbolBrush=brush, pen=pen, skipFiniteCheck=True, name=y_value) - self.plot.addItem(curve) - self.curves.append(curve) - self.pens.append(pen) - self.brushs.append(brush) - - # check if roi selector is in the plot - if self.roi_selector not in self.plot.items: - self.plot.addItem(self.roi_selector) - - # init of 2D plot - self.plot_image.clear() - - self.img = pg.ImageItem() - self.plot_image.addItem(self.img) - - # hooking signals - self.hook_crosshair() - self.init_table() - - def splitter_sizes(self): ... - - def hook_crosshair(self): - self.crosshair_1d = Crosshair(self.plot, precision=4) - - self.crosshair_1d.coordinatesChanged1D.connect( - lambda x, y: self.label_plot_moved.setText(f"Moved : ({x}, {y})") - ) - self.crosshair_1d.coordinatesClicked1D.connect( - lambda x, y: self.label_plot_clicked.setText(f"Moved : ({x}, {y})") - ) - - self.crosshair_1d.coordinatesChanged1D.connect( - lambda x, y: self.update_table(table_widget=self.cursor_table, x=x, y_values=y) - ) - - self.crosshair_2D = Crosshair(self.plot_image) - - self.crosshair_2D.coordinatesChanged2D.connect( - lambda x, y: self.label_image_moved.setText(f"Moved : ({x}, {y})") - ) - self.crosshair_2D.coordinatesClicked2D.connect( - lambda x, y: self.label_image_clicked.setText(f"Moved : ({x}, {y})") - ) - - # ROI - self.roi_selector.sigRegionChangeFinished.connect(self.get_roi_region) - - def get_roi_region(self): - """For testing purpose now, get roi region and print it to self.label as tuple""" - region = self.roi_selector.getRegion() - self.label_plot.setText(f"x = {(10 ** region[0]):.4f}, y ={(10 ** region[1]):.4f}") - return_dict = { - "horiz_roi": [ - np.where(self.plotter_data_x[0] > 10 ** region[0])[0][0], - np.where(self.plotter_data_x[0] < 10 ** region[1])[0][-1], - ] - } - msg = messages.DeviceMessage(signals=return_dict).dumps() - self.connector.set_and_publish("px_stream/gui_event", msg=msg) - self.roi_signal.emit(region) - - def init_table(self): - # Init number of rows in table according to n of devices - self.ui.cursor_table.setRowCount(len(self.y_value_list)) - # self.table.setHorizontalHeaderLabels(["(X, Y) - Moved", "(X, Y) - Clicked"]) #TODO can be dynamic - self.ui.cursor_table.setVerticalHeaderLabels(self.y_value_list) - self.ui.cursor_table.resizeColumnsToContents() - - def update_table(self, table_widget, x, y_values): - for i, y in enumerate(y_values): - table_widget.setItem(i, 1, QTableWidgetItem(str(x))) - table_widget.setItem(i, 2, QTableWidgetItem(str(y))) - table_widget.resizeColumnsToContents() - - def update(self): - """Update the plot with the new data.""" - - # check if QTable was initialised and if list of devices was changed - # if self.y_value_list != self.previous_y_value_list: - # self.setup_cursor_table() - # self.previous_y_value_list = self.y_value_list.copy() if self.y_value_list else None - - self.curves[0].setData(self.plotter_data_x[0], self.plotter_data_y[0]) - - @staticmethod - def flip_even_rows(arr): - arr_copy = np.copy(arr) # Create a writable copy - arr_copy[1::2, :] = arr_copy[1::2, ::-1] - return arr_copy - - @staticmethod - def remove_curve_by_name(plot: pyqtgraph.PlotItem, name: str) -> None: - # def remove_curve_by_name(plot: pyqtgraph.PlotItem, checkbox: QtWidgets.QCheckBox, name: str) -> None: - """Removes a curve from the given plot by the specified name. - - Args: - plot (pyqtgraph.PlotItem): The plot from which to remove the curve. - name (str): The name of the curve to remove. - """ - # if checkbox.isChecked(): - for item in plot.items: - if isinstance(item, pg.PlotDataItem) and getattr(item, "opts", {}).get("name") == name: - plot.removeItem(item) - return - - # else: - # return - - def on_projection(self, exit_event): - while not exit_event.is_set(): - if self._current_proj is None: - time.sleep(0.1) - continue - endpoint = f"px_stream/projection_{self._current_proj}/data" - msgs = self.client.connector.lrange(topic=endpoint, start=-1, end=-1) - data = msgs - if not data: - continue - with np.errstate(divide="ignore", invalid="ignore"): - self.plotter_data_y = [ - np.sum( - np.sum(data[-1].content["signals"]["data"] * self._current_norm, axis=1) - / np.sum(self._current_norm, axis=0), - axis=0, - ).squeeze() - ] - - self.update_signal.emit() - - @Slot(dict, dict) - def on_dap_update(self, data: dict, metadata: dict): - flipped_data = self.flip_even_rows(data["data"]["z"]) - - self.img.setImage(flipped_data) - - @Slot(dict, dict) - def new_proj(self, content: dict, _metadata: dict): - proj_nr = content["signals"]["proj_nr"] - endpoint = f"px_stream/projection_{proj_nr}/metadata" - msg_raw = self.client.connector.get(topic=endpoint) - msg = messages.DeviceMessage.loads(msg_raw) - self._current_q = msg.content["signals"]["q"] - self._current_norm = msg.content["signals"]["norm_sum"] - self._current_metadata = msg.content["signals"]["metadata"] - - self.plotter_data_x = [self._current_q] - self._current_proj = proj_nr - - -if __name__ == "__main__": - import argparse - - # from bec_widgets import ctrl_c # TODO uncomment when ctrl_c is ready to be compatible with qtpy - - parser = argparse.ArgumentParser() - parser.add_argument( - "--signals", help="specify recorded signals", nargs="+", default=["gauss_bpm"] - ) - # default = ["gauss_bpm", "bpm4i", "bpm5i", "bpm6i", "xert"], - value = parser.parse_args() - print(f"Plotting signals for: {', '.join(value.signals)}") - - # Client from dispatcher - bec_dispatcher = BECDispatcher() - client = bec_dispatcher.client - - app = QtWidgets.QApplication([]) - # ctrl_c.setup(app) # TODO uncomment when ctrl_c is ready to be compatible with qtpy - plot = StreamPlot(y_value_list=value.signals, client=client) - - bec_dispatcher.connect_slot(plot.new_proj, "px_stream/proj_nr") - bec_dispatcher.connect_slot( - plot.on_dap_update, MessageEndpoints.processed_data("px_dap_worker") - ) - plot.show() - # client.callbacks.register("scan_segment", plot, sync=False) - app.exec() diff --git a/tests/unit_tests/test_stream_plot.py b/tests/unit_tests/test_stream_plot.py deleted file mode 100644 index f6789561..00000000 --- a/tests/unit_tests/test_stream_plot.py +++ /dev/null @@ -1,158 +0,0 @@ -# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring -import threading -from unittest import mock - -import numpy as np -import pytest -from bec_lib import messages -from bec_lib.redis_connector import RedisConnector -from pytestqt import qtbot - -from bec_widgets.examples.stream_plot.stream_plot import StreamPlot - - -@pytest.fixture(scope="function") -def stream_app(qtbot): - """Helper function to set up the StreamPlot widget.""" - client = mock.MagicMock() - widget = StreamPlot(client=client) - qtbot.addWidget(widget) - qtbot.waitExposed(widget) - yield widget - widget.close() - - -def test_roi_signals_emitted(qtbot, stream_app): - region = (0.1, 0.9) - with qtbot.waitSignal(stream_app.roi_signal, timeout=1000) as blocker: - stream_app.roi_signal.emit(region) - assert blocker.signal_triggered - assert blocker.args == [region] - - -def test_update_signals_emitted(qtbot, stream_app): - # Mimic data coming from the data stream - stream_app.plotter_data_x = [list(range(10))] # Replace with the actual x data - stream_app.plotter_data_y = [list(range(10))] # Replace with the actual y data - - # Initialize curves - stream_app.init_curves() - - with qtbot.waitSignal(stream_app.update_signal, timeout=1000) as blocker: - stream_app.update_signal.emit() - assert blocker.signal_triggered - - -def test_ui_initialization(qtbot, stream_app): - """Checking the UI creation.""" - - # Check if UI elements are initialized correctly - assert stream_app.label_plot is not None - assert stream_app.label_plot_moved is not None - assert stream_app.label_plot_clicked is not None - assert stream_app.label_image_moved is not None - assert stream_app.label_image_clicked is not None - - # Check if plots are initialized correctly - assert stream_app.plot is not None - assert stream_app.plot_image is not None - - # Check if ROI selector is initialized correctly - assert stream_app.roi_selector is not None - - -def test_1d_plotting_data(qtbot, stream_app): - # Set up some mock data - x_data = [list(range(10))] - y_data = [list(range(10))] - - # Manually set the data attributes - stream_app.plotter_data_x = x_data - stream_app.plotter_data_y = y_data - stream_app.y_value_list = ["Curve 1"] - - # Initialize curves and update the plot - stream_app.init_curves() - stream_app.update() # This should update the plot with the new data - - # Check the data on the plot - for idx, curve in enumerate(stream_app.curves): - np.testing.assert_array_equal(curve.xData, x_data[0]) # Access the first list of x_data - np.testing.assert_array_equal( - curve.yData, y_data[idx] - ) # Access the list of y_data for each curve without additional indexing - - -def test_flip_even_rows(qtbot, stream_app): - # Create a numpy array with some known data - original_array = np.array( - [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12, 13, 14, 15], [16, 17, 18, 19, 20]] - ) - - # Call flip_even_rows on the original array - flipped_array = stream_app.flip_even_rows(original_array) - - # Expected array flipped along the rows with even indices - expected_array = np.array( - [[1, 2, 3, 4, 5], [10, 9, 8, 7, 6], [11, 12, 13, 14, 15], [20, 19, 18, 17, 16]] - ) - - # Check that flip_even_rows returned the expected result - np.testing.assert_array_equal(flipped_array, expected_array) - - -def test_on_dap_update(qtbot, stream_app): - """2D image rendering by dap update""" - # Create some mock data to be "received" by the slot - data_dict = {"data": {"z": np.random.rand(10, 10)}} - metadata_dict = {} - - # Trigger the slot - stream_app.on_dap_update(data_dict, metadata_dict) - - # Apply the same transformation to the test data - expected_data = stream_app.flip_even_rows(data_dict["data"]["z"]) - - # Now check the state of the StreamPlot object - # For example, check the data of the image plot: - np.testing.assert_array_equal(stream_app.img.image, expected_data) - - -#################### -# Until Here -#################### - -# def test_new_proj(qtbot, stream_app): #TODO this test is not working, does it make sense testing even? -# # Create some mock content to be "received" by the slot -# content_dict = {"signals": {"proj_nr": 1}} -# metadata_dict = {} -# -# # Manually create some mock data that new_proj would work with -# # This step may need to be adjusted to fit the actual behavior of new_proj -# mock_data = { -# "q": np.array([1, 2, 3, 4, 5]), -# "norm_sum": np.array([6, 7, 8, 9, 10]), -# "metadata": "some_metadata", -# } -# -# # Assume the RedisConnector client would return this data when new_proj is called -# mock_message = mock.MagicMock(spec=messages.DeviceMessage) -# mock_message.__getitem__.side_effect = lambda key: mock_data[key] -# stream_app.client.producer.get = mock.MagicMock(return_value=mock_message.dumps()) -# -# # Trigger the slot -# stream_app.new_proj(content_dict, metadata_dict) -# -# # Now check the state of the StreamPlot object -# # For example, check that the plotter_data_x attribute was updated correctly: -# np.testing.assert_array_equal(stream_app.plotter_data_x, [mock_data["q"]]) -# assert stream_app._current_proj == 1 -# assert stream_app._current_q == mock_data["q"] -# assert stream_app._current_norm == mock_data["norm_sum"] -# assert stream_app._current_metadata == mock_data["metadata"] - - -# def test_connection_creation(qtbot, stream_app): #TODO maybe test connections in a different way? -# assert isinstance(stream_app.producer, RedisConnector) -# assert isinstance(stream_app.data_retriever, threading.Thread) -# assert stream_app.data_retriever.is_alive()