diff --git a/bec_widgets/widgets/figure/plots/motor_map/motor_map.py b/bec_widgets/widgets/figure/plots/motor_map/motor_map.py index c43dc837..190081ca 100644 --- a/bec_widgets/widgets/figure/plots/motor_map/motor_map.py +++ b/bec_widgets/widgets/figure/plots/motor_map/motor_map.py @@ -6,22 +6,23 @@ from typing import Optional, Union import numpy as np import pyqtgraph as pg from bec_lib.endpoints import MessageEndpoints -from pydantic import Field +from pydantic import Field, ValidationError, field_validator +from pydantic_core import PydanticCustomError from qtpy import QtCore, QtGui from qtpy.QtCore import Signal as pyqtSignal from qtpy.QtCore import Slot as pyqtSlot from qtpy.QtWidgets import QWidget -from bec_widgets.utils import EntryValidator +from bec_widgets.utils import Colors, EntryValidator from bec_widgets.widgets.figure.plots.plot_base import BECPlotBase, SubplotConfig from bec_widgets.widgets.figure.plots.waveform.waveform import Signal, SignalData class MotorMapConfig(SubplotConfig): signals: Optional[Signal] = Field(None, description="Signals of the motor map") - color_map: Optional[str] = Field( - "Greys", description="Color scheme of the motor position gradient." - ) # TODO decide if useful for anything, or just keep GREYS always + color: Optional[str | tuple] = Field( + (255, 255, 255, 255), description="The color of the last point of current position." + ) scatter_size: Optional[int] = Field(5, description="Size of the scatter points.") max_points: Optional[int] = Field(1000, description="Maximum number of points to display.") num_dim_points: Optional[int] = Field( @@ -30,8 +31,23 @@ class MotorMapConfig(SubplotConfig): ) precision: Optional[int] = Field(2, description="Decimal precision of the motor position.") background_value: Optional[int] = Field( - 25, description="Background value of the motor map." - ) # TODO can be percentage from 255 calculated + 25, description="Background value of the motor map. Has to be between 0 and 255." + ) + + model_config: dict = {"validate_assignment": True} + + _validate_color = field_validator("color")(Colors.validate_color) + + # @field_validator("color") + # def convert_to_rgba(cls, value): + + @field_validator("background_value") + def validate_background_value(cls, value): + if not 0 <= value <= 255: + raise PydanticCustomError( + "wrong_value", f"'{value}' hs to be between 0 and 255.", {"wrong_value": value} + ) + return value class BECMotorMap(BECPlotBase): @@ -69,29 +85,43 @@ class BECMotorMap(BECPlotBase): self.get_bec_shortcuts() self.entry_validator = EntryValidator(self.dev) + # connect update signal to update plot + self.proxy_update_plot = pg.SignalProxy( + self.update_signal, rateLimit=25, slot=self._update_plot + ) + self.apply_config(self.config) + + def apply_config(self, config: dict | MotorMapConfig): + """ + Apply the config to the motor map. + + Args: + config(dict|MotorMapConfig): Config to be applied. + """ + if isinstance(config, dict): + try: + config = MotorMapConfig(**config) + except ValidationError as e: + print(f"Error in applying config: {e}") + return + + self.config = config + self.plot_item.clear() + self.motor_x = None self.motor_y = None self.database_buffer = {"x": [], "y": []} self.plot_components = defaultdict(dict) # container for plot components - # connect update signal to update plot - self.proxy_update_plot = pg.SignalProxy( - self.update_signal, rateLimit=25, slot=self._update_plot - ) + self.apply_axis_config() - # TODO decide if needed to implement, maybe there will be no children widgets for motormap for now... - # def find_widget_by_id(self, item_id: str) -> BECCurve: - # """ - # Find the curve by its ID. - # Args: - # item_id(str): ID of the curve. - # - # Returns: - # BECCurve: The curve object. - # """ - # for curve in self.plot_item.curves: - # if curve.gui_id == item_id: - # return curve + if self.config.signals is not None: + self.change_motors( + motor_x=self.config.signals.x.name, + motor_y=self.config.signals.y.name, + motor_x_entry=self.config.signals.x.entry, + motor_y_entry=self.config.signals.y.entry, + ) @pyqtSlot(str, str, str, str, bool) def change_motors( @@ -129,6 +159,8 @@ class BECMotorMap(BECPlotBase): # reconnect the signals self._connect_motor_to_slots() + self.database_buffer = {"x": [], "y": []} + # Redraw the motor map self._make_motor_map() @@ -141,7 +173,19 @@ class BECMotorMap(BECPlotBase): data = {"x": self.database_buffer["x"], "y": self.database_buffer["y"]} return data - # TODO setup all visual properties + def set_color(self, color: [str | tuple]): + """ + Set color of the motor trace. + + Args: + color(str|tuple): Color of the motor trace. Can be HEX(str) or RGBA(tuple). + """ + if isinstance(color, str): + color = Colors.validate_color(color) + color = Colors.hex_to_rgba(color, 255) + self.config.color = color + self.update_signal.emit() + def set_max_points(self, max_points: int) -> None: """ Set the maximum number of points to display. @@ -150,6 +194,7 @@ class BECMotorMap(BECPlotBase): max_points(int): Maximum number of points to display. """ self.config.max_points = max_points + self.update_signal.emit() def set_precision(self, precision: int) -> None: """ @@ -159,6 +204,7 @@ class BECMotorMap(BECPlotBase): precision(int): Decimal precision of the motor position. """ self.config.precision = precision + self.update_signal.emit() def set_num_dim_points(self, num_dim_points: int) -> None: """ @@ -168,6 +214,7 @@ class BECMotorMap(BECPlotBase): num_dim_points(int): Number of dim points. """ self.config.num_dim_points = num_dim_points + self.update_signal.emit() def set_background_value(self, background_value: int) -> None: """ @@ -177,6 +224,7 @@ class BECMotorMap(BECPlotBase): background_value(int): Background value of the motor map. """ self.config.background_value = background_value + self._swap_limit_map() def set_scatter_size(self, scatter_size: int) -> None: """ @@ -186,6 +234,7 @@ class BECMotorMap(BECPlotBase): scatter_size(int): Size of the scatter points. """ self.config.scatter_size = scatter_size + self.update_signal.emit() def _disconnect_current_motors(self): """Disconnect the current motors from the slots.""" @@ -210,6 +259,15 @@ class BECMotorMap(BECPlotBase): self.bec_dispatcher.connect_slot(self.on_device_readback, endpoints) + def _swap_limit_map(self): + """Swap the limit map.""" + self.plot_item.removeItem(self.plot_components["limit_map"]) + self.plot_components["limit_map"] = self._make_limit_map( + self.config.signals.x.limits, self.config.signals.y.limits + ) + self.plot_components["limit_map"].setZValue(-1) + self.plot_item.addItem(self.plot_components["limit_map"]) + def _make_motor_map(self): """ Create the motor map plot. @@ -249,6 +307,8 @@ class BECMotorMap(BECPlotBase): # Set default labels for the plot self.set(x_label=f"Motor X ({self.motor_x})", y_label=f"Motor Y ({self.motor_y})") + self.update_signal.emit() + def _add_coordinantes_crosshair(self, x: float, y: float) -> None: """ Add crosshair to the plot to highlight the current position. @@ -373,19 +433,31 @@ class BECMotorMap(BECPlotBase): def _update_plot(self): """Update the motor map plot.""" + # If the number of points exceeds max_points, delete the oldest points + if len(self.database_buffer["x"]) > self.config.max_points: + self.database_buffer["x"] = self.database_buffer["x"][-self.config.max_points :] + self.database_buffer["y"] = self.database_buffer["y"][-self.config.max_points :] + x = self.database_buffer["x"] y = self.database_buffer["y"] # Setup gradient brush for history brushes = [pg.mkBrush(50, 50, 50, 255)] * len(x) + # RGB color + r, g, b, a = self.config.color + # Calculate the decrement step based on self.num_dim_points num_dim_points = self.config.num_dim_points decrement_step = (255 - 50) / num_dim_points + for i in range(1, min(num_dim_points + 1, len(x) + 1)): brightness = max(60, 255 - decrement_step * (i - 1)) - brushes[-i] = pg.mkBrush(brightness, brightness, brightness, 255) - brushes[-1] = pg.mkBrush(255, 255, 255, 255) # Newest point is always full brightness + dim_r = int(r * (brightness / 255)) + dim_g = int(g * (brightness / 255)) + dim_b = int(b * (brightness / 255)) + brushes[-i] = pg.mkBrush(dim_r, dim_g, dim_b, a) + brushes[-1] = pg.mkBrush(r, g, b, a) # Newest point is always full brightness scatter_size = self.config.scatter_size # Update the scatter plot diff --git a/tests/unit_tests/client_mocks.py b/tests/unit_tests/client_mocks.py index 22e6ff49..827f28c6 100644 --- a/tests/unit_tests/client_mocks.py +++ b/tests/unit_tests/client_mocks.py @@ -84,6 +84,7 @@ class DMMock: DEVICES = [ FakePositioner("samx", limits=[-10, 10], read_value=2.0), FakePositioner("samy", limits=[-5, 5], read_value=3.0), + FakePositioner("samz", limits=[-8, 8], read_value=4.0), FakePositioner("aptrx", limits=None, read_value=4.0), FakePositioner("aptry", limits=None, read_value=5.0), FakeDevice("gauss_bpm"), diff --git a/tests/unit_tests/test_bec_motor_map.py b/tests/unit_tests/test_bec_motor_map.py index 423e6604..52d8e1bc 100644 --- a/tests/unit_tests/test_bec_motor_map.py +++ b/tests/unit_tests/test_bec_motor_map.py @@ -1,100 +1,106 @@ +import numpy as np import pytest from bec_widgets.widgets.figure.plots.motor_map.motor_map import BECMotorMap, MotorMapConfig from bec_widgets.widgets.figure.plots.waveform.waveform_curve import SignalData from .client_mocks import mocked_client +from .test_bec_figure import bec_figure -@pytest.fixture(scope="function") -def bec_motor_map(qtbot, mocked_client): - widget = BECMotorMap(client=mocked_client, gui_id="BECMotorMap_test") - # qtbot.addWidget(widget) - # qtbot.waitExposed(widget) - yield widget +def test_motor_map_init(bec_figure): + default_config = MotorMapConfig(widget_class="BECMotorMap") + + mm = bec_figure.motor_map(config=default_config.model_dump()) + default_config.gui_id = mm.gui_id + + assert mm.config == default_config -def test_motor_map_init(bec_motor_map): - default_config = MotorMapConfig(widget_class="BECMotorMap", gui_id="BECMotorMap_test") +def test_motor_map_change_motors(bec_figure): + mm = bec_figure.motor_map("samx", "samy") - assert bec_motor_map.config == default_config + assert mm.motor_x == "samx" + assert mm.motor_y == "samy" + assert mm.config.signals.x == SignalData(name="samx", entry="samx", limits=[-10, 10]) + assert mm.config.signals.y == SignalData(name="samy", entry="samy", limits=[-5, 5]) + + mm.change_motors("samx", "samz") + + assert mm.config.signals.x == SignalData(name="samx", entry="samx", limits=[-10, 10]) + assert mm.config.signals.y == SignalData(name="samz", entry="samz", limits=[-8, 8]) -def test_motor_map_change_motors(bec_motor_map): - bec_motor_map.change_motors("samx", "samy") - - assert bec_motor_map.config.signals.x == SignalData(name="samx", entry="samx", limits=[-10, 10]) - assert bec_motor_map.config.signals.y == SignalData(name="samy", entry="samy", limits=[-5, 5]) - - -def test_motor_map_get_limits(bec_motor_map): +def test_motor_map_get_limits(bec_figure): + mm = bec_figure.motor_map("samx", "samy") expected_limits = {"samx": [-10, 10], "samy": [-5, 5]} for motor_name, expected_limit in expected_limits.items(): - actual_limit = bec_motor_map._get_motor_limit(motor_name) + actual_limit = mm._get_motor_limit(motor_name) assert actual_limit == expected_limit -def test_motor_map_get_init_position(bec_motor_map): - bec_motor_map.set_precision(2) +def test_motor_map_get_init_position(bec_figure): + mm = bec_figure.motor_map("samx", "samy") + mm.set_precision(2) - motor_map_dev = bec_motor_map.client.device_manager.devices + motor_map_dev = mm.client.device_manager.devices expected_positions = { ("samx", "samx"): motor_map_dev["samx"].read()["samx"]["value"], ("samy", "samy"): motor_map_dev["samy"].read()["samy"]["value"], - ("aptrx", "aptrx"): motor_map_dev["aptrx"].read()["aptrx"]["value"], - ("aptry", "aptry"): motor_map_dev["aptry"].read()["aptry"]["value"], } for (motor_name, entry), expected_position in expected_positions.items(): - actual_position = bec_motor_map._get_motor_init_position(motor_name, entry, 2) + actual_position = mm._get_motor_init_position(motor_name, entry, 2) assert actual_position == expected_position -def test_motor_movement_updates_position_and_database(bec_motor_map): - motor_map_dev = bec_motor_map.client.device_manager.devices +def test_motor_movement_updates_position_and_database(bec_figure): + mm = bec_figure.motor_map("samx", "samy") + motor_map_dev = mm.client.device_manager.devices init_positions = { "samx": [motor_map_dev["samx"].read()["samx"]["value"]], "samy": [motor_map_dev["samy"].read()["samy"]["value"]], } - bec_motor_map.change_motors("samx", "samy") + mm.change_motors("samx", "samy") - assert bec_motor_map.database_buffer["x"] == init_positions["samx"] - assert bec_motor_map.database_buffer["y"] == init_positions["samy"] + assert mm.database_buffer["x"] == init_positions["samx"] + assert mm.database_buffer["y"] == init_positions["samy"] # Simulate motor movement for 'samx' only new_position_samx = 4.0 - bec_motor_map.on_device_readback({"signals": {"samx": {"value": new_position_samx}}}) + mm.on_device_readback({"signals": {"samx": {"value": new_position_samx}}}) init_positions["samx"].append(new_position_samx) init_positions["samy"].append(init_positions["samy"][-1]) # Verify database update for 'samx' - assert bec_motor_map.database_buffer["x"] == init_positions["samx"] + assert mm.database_buffer["x"] == init_positions["samx"] # Verify 'samy' retains its last known position - assert bec_motor_map.database_buffer["y"] == init_positions["samy"] + assert mm.database_buffer["y"] == init_positions["samy"] -def test_scatter_plot_rendering(bec_motor_map): - motor_map_dev = bec_motor_map.client.device_manager.devices +def test_scatter_plot_rendering(bec_figure): + mm = bec_figure.motor_map("samx", "samy") + motor_map_dev = mm.client.device_manager.devices init_positions = { "samx": [motor_map_dev["samx"].read()["samx"]["value"]], "samy": [motor_map_dev["samy"].read()["samy"]["value"]], } - bec_motor_map.change_motors("samx", "samy") + mm.change_motors("samx", "samy") # Simulate motor movement for 'samx' only new_position_samx = 4.0 - bec_motor_map.on_device_readback({"signals": {"samx": {"value": new_position_samx}}}) - bec_motor_map._update_plot() + mm.on_device_readback({"signals": {"samx": {"value": new_position_samx}}}) + mm._update_plot() # Get the scatter plot item - scatter_plot_item = bec_motor_map.plot_components["scatter"] + scatter_plot_item = mm.plot_components["scatter"] # Check the scatter plot item properties assert len(scatter_plot_item.data) > 0, "Scatter plot data is empty" @@ -106,16 +112,148 @@ def test_scatter_plot_rendering(bec_motor_map): ), "Scatter plot Y data should retain last known position" -def test_plot_visualization_consistency(bec_motor_map): - bec_motor_map.change_motors("samx", "samy") +def test_plot_visualization_consistency(bec_figure): + mm = bec_figure.motor_map("samx", "samy") + mm.change_motors("samx", "samy") # Simulate updating the plot with new data - bec_motor_map.on_device_readback({"signals": {"samx": {"value": 5}}}) - bec_motor_map.on_device_readback({"signals": {"samy": {"value": 9}}}) - bec_motor_map._update_plot() + mm.on_device_readback({"signals": {"samx": {"value": 5}}}) + mm.on_device_readback({"signals": {"samy": {"value": 9}}}) + mm._update_plot() - scatter_plot_item = bec_motor_map.plot_components["scatter"] + scatter_plot_item = mm.plot_components["scatter"] # Check if the scatter plot reflects the new data correctly assert ( scatter_plot_item.data["x"][-1] == 5 and scatter_plot_item.data["y"][-1] == 9 ), "Plot not updated correctly with new data" + + +def test_change_background_value(bec_figure, qtbot): + mm = bec_figure.motor_map("samx", "samy") + + assert mm.config.background_value == 25 + assert np.all(mm.plot_components["limit_map"].image == 25.0) + + mm.set_background_value(50) + qtbot.wait(200) + + assert mm.config.background_value == 50 + assert np.all(mm.plot_components["limit_map"].image == 50.0) + + +def test_motor_map_init_from_config(bec_figure): + config = { + "widget_class": "BECMotorMap", + "gui_id": "mm_id", + "parent_id": bec_figure.gui_id, + "row": 0, + "col": 0, + "axis": { + "title": "Motor position: (-0.0, 0.0)", + "title_size": None, + "x_label": "Motor X (samx)", + "x_label_size": None, + "y_label": "Motor Y (samy)", + "y_label_size": None, + "legend_label_size": None, + "x_scale": "linear", + "y_scale": "linear", + "x_lim": None, + "y_lim": None, + "x_grid": True, + "y_grid": True, + }, + "signals": { + "source": "device_readback", + "x": { + "name": "samx", + "entry": "samx", + "unit": None, + "modifier": None, + "limits": [-10.0, 10.0], + }, + "y": { + "name": "samy", + "entry": "samy", + "unit": None, + "modifier": None, + "limits": [-5.0, 5.0], + }, + "z": None, + "dap": None, + }, + "color": (255, 255, 255, 255), + "scatter_size": 5, + "max_points": 50, + "num_dim_points": 10, + "precision": 5, + "background_value": 50, + } + mm = bec_figure.motor_map(config=config) + config["gui_id"] = mm.gui_id + + assert mm.config_dict == config + + +def test_motor_map_set_scatter_size(bec_figure, qtbot): + mm = bec_figure.motor_map("samx", "samy") + + assert mm.config.scatter_size == 5 + assert mm.plot_components["scatter"].opts["size"] == 5 + + mm.set_scatter_size(10) + qtbot.wait(200) + + assert mm.config.scatter_size == 10 + assert mm.plot_components["scatter"].opts["size"] == 10 + + +def test_motor_map_change_precision(bec_figure): + mm = bec_figure.motor_map("samx", "samy") + + assert mm.config.precision == 2 + mm.set_precision(10) + assert mm.config.precision == 10 + + +def test_motor_map_set_color(bec_figure, qtbot): + mm = bec_figure.motor_map("samx", "samy") + + assert mm.config.color == (255, 255, 255, 255) + + mm.set_color((0, 0, 0, 255)) + qtbot.wait(200) + assert mm.config.color == (0, 0, 0, 255) + + +def test_motor_map_get_data_max_points(bec_figure, qtbot): + mm = bec_figure.motor_map("samx", "samy") + motor_map_dev = mm.client.device_manager.devices + + init_positions = { + "samx": [motor_map_dev["samx"].read()["samx"]["value"]], + "samy": [motor_map_dev["samy"].read()["samy"]["value"]], + } + mm.on_device_readback({"signals": {"samx": {"value": 5.0}}}) + mm.on_device_readback({"signals": {"samy": {"value": 9.0}}}) + mm.on_device_readback({"signals": {"samx": {"value": 6.0}}}) + mm.on_device_readback({"signals": {"samy": {"value": 7.0}}}) + + expected_x = [init_positions["samx"][-1], 5.0, 5.0, 6.0, 6.0] + expected_y = [init_positions["samy"][-1], init_positions["samy"][-1], 9.0, 9.0, 7.0] + get_data = mm.get_data() + + assert mm.database_buffer["x"] == expected_x + assert mm.database_buffer["y"] == expected_y + assert get_data["x"] == expected_x + assert get_data["y"] == expected_y + + mm.set_max_points(3) + qtbot.wait(200) + get_data = mm.get_data() + assert len(get_data["x"]) == 3 + assert len(get_data["y"]) == 3 + assert get_data["x"] == expected_x[-3:] + assert get_data["y"] == expected_y[-3:] + assert mm.database_buffer["x"] == expected_x[-3:] + assert mm.database_buffer["y"] == expected_y[-3:] diff --git a/tests/unit_tests/test_device_input_base.py b/tests/unit_tests/test_device_input_base.py index 3c6ab5ec..87481237 100644 --- a/tests/unit_tests/test_device_input_base.py +++ b/tests/unit_tests/test_device_input_base.py @@ -58,7 +58,7 @@ def test_device_input_base_set_default_device_error(device_input_base): def test_device_input_base_get_device_list(device_input_base): devices = device_input_base.get_device_list("FakePositioner") - assert devices == ["samx", "samy", "aptrx", "aptry"] + assert devices == ["samx", "samy", "samz", "aptrx", "aptry"] def test_device_input_base_get_filters(device_input_base): diff --git a/tests/unit_tests/test_device_input_widgets.py b/tests/unit_tests/test_device_input_widgets.py index 0e3e4b56..6059689e 100644 --- a/tests/unit_tests/test_device_input_widgets.py +++ b/tests/unit_tests/test_device_input_widgets.py @@ -56,6 +56,7 @@ def test_device_input_combobox_init(device_input_combobox): assert device_input_combobox.devices == [ "samx", "samy", + "samz", "aptrx", "aptry", "gauss_bpm", @@ -141,6 +142,7 @@ def test_device_input_line_edit_init(device_input_line_edit): assert device_input_line_edit.devices == [ "samx", "samy", + "samz", "aptrx", "aptry", "gauss_bpm", diff --git a/tests/unit_tests/test_motor_control.py b/tests/unit_tests/test_motor_control.py index ed621d5a..c0bcec3a 100644 --- a/tests/unit_tests/test_motor_control.py +++ b/tests/unit_tests/test_motor_control.py @@ -74,7 +74,7 @@ def test_motor_thread_initialization(mocked_client): def test_get_all_motors_names(mocked_client): motor_thread = MotorThread(client=mocked_client) motor_names = motor_thread.get_all_motors_names() - expected_names = ["samx", "samy", "aptrx", "aptry"] + expected_names = ["samx", "samy", "samz", "aptrx", "aptry"] assert sorted(motor_names) == sorted(expected_names) assert all(name in motor_names for name in expected_names) assert len(motor_names) == len(expected_names) # Ensure only these motors are returned @@ -155,11 +155,12 @@ def motor_selection_widget(qtbot, mocked_client, motor_thread): def test_initialization_and_population(motor_selection_widget): - assert motor_selection_widget.comboBox_motor_x.count() == 4 + assert motor_selection_widget.comboBox_motor_x.count() == 5 assert motor_selection_widget.comboBox_motor_x.itemText(0) == "samx" assert motor_selection_widget.comboBox_motor_y.itemText(1) == "samy" - assert motor_selection_widget.comboBox_motor_x.itemText(2) == "aptrx" - assert motor_selection_widget.comboBox_motor_y.itemText(3) == "aptry" + assert motor_selection_widget.comboBox_motor_y.itemText(2) == "samz" + assert motor_selection_widget.comboBox_motor_x.itemText(3) == "aptrx" + assert motor_selection_widget.comboBox_motor_y.itemText(4) == "aptry" def test_selection_and_signal_emission(motor_selection_widget):