mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 03:31:50 +02:00
fix(motor_map): API changes updates current visualisation; motor_map can be initialised from config
This commit is contained in:
@ -6,22 +6,23 @@ from typing import Optional, Union
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
import pyqtgraph as pg
|
import pyqtgraph as pg
|
||||||
from bec_lib.endpoints import MessageEndpoints
|
from bec_lib.endpoints import MessageEndpoints
|
||||||
from pydantic import Field
|
from pydantic import Field, ValidationError, field_validator
|
||||||
|
from pydantic_core import PydanticCustomError
|
||||||
from qtpy import QtCore, QtGui
|
from qtpy import QtCore, QtGui
|
||||||
from qtpy.QtCore import Signal as pyqtSignal
|
from qtpy.QtCore import Signal as pyqtSignal
|
||||||
from qtpy.QtCore import Slot as pyqtSlot
|
from qtpy.QtCore import Slot as pyqtSlot
|
||||||
from qtpy.QtWidgets import QWidget
|
from qtpy.QtWidgets import QWidget
|
||||||
|
|
||||||
from bec_widgets.utils import EntryValidator
|
from bec_widgets.utils import Colors, EntryValidator
|
||||||
from bec_widgets.widgets.figure.plots.plot_base import BECPlotBase, SubplotConfig
|
from bec_widgets.widgets.figure.plots.plot_base import BECPlotBase, SubplotConfig
|
||||||
from bec_widgets.widgets.figure.plots.waveform.waveform import Signal, SignalData
|
from bec_widgets.widgets.figure.plots.waveform.waveform import Signal, SignalData
|
||||||
|
|
||||||
|
|
||||||
class MotorMapConfig(SubplotConfig):
|
class MotorMapConfig(SubplotConfig):
|
||||||
signals: Optional[Signal] = Field(None, description="Signals of the motor map")
|
signals: Optional[Signal] = Field(None, description="Signals of the motor map")
|
||||||
color_map: Optional[str] = Field(
|
color: Optional[str | tuple] = Field(
|
||||||
"Greys", description="Color scheme of the motor position gradient."
|
(255, 255, 255, 255), description="The color of the last point of current position."
|
||||||
) # TODO decide if useful for anything, or just keep GREYS always
|
)
|
||||||
scatter_size: Optional[int] = Field(5, description="Size of the scatter points.")
|
scatter_size: Optional[int] = Field(5, description="Size of the scatter points.")
|
||||||
max_points: Optional[int] = Field(1000, description="Maximum number of points to display.")
|
max_points: Optional[int] = Field(1000, description="Maximum number of points to display.")
|
||||||
num_dim_points: Optional[int] = Field(
|
num_dim_points: Optional[int] = Field(
|
||||||
@ -30,8 +31,23 @@ class MotorMapConfig(SubplotConfig):
|
|||||||
)
|
)
|
||||||
precision: Optional[int] = Field(2, description="Decimal precision of the motor position.")
|
precision: Optional[int] = Field(2, description="Decimal precision of the motor position.")
|
||||||
background_value: Optional[int] = Field(
|
background_value: Optional[int] = Field(
|
||||||
25, description="Background value of the motor map."
|
25, description="Background value of the motor map. Has to be between 0 and 255."
|
||||||
) # TODO can be percentage from 255 calculated
|
)
|
||||||
|
|
||||||
|
model_config: dict = {"validate_assignment": True}
|
||||||
|
|
||||||
|
_validate_color = field_validator("color")(Colors.validate_color)
|
||||||
|
|
||||||
|
# @field_validator("color")
|
||||||
|
# def convert_to_rgba(cls, value):
|
||||||
|
|
||||||
|
@field_validator("background_value")
|
||||||
|
def validate_background_value(cls, value):
|
||||||
|
if not 0 <= value <= 255:
|
||||||
|
raise PydanticCustomError(
|
||||||
|
"wrong_value", f"'{value}' hs to be between 0 and 255.", {"wrong_value": value}
|
||||||
|
)
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
class BECMotorMap(BECPlotBase):
|
class BECMotorMap(BECPlotBase):
|
||||||
@ -69,29 +85,43 @@ class BECMotorMap(BECPlotBase):
|
|||||||
self.get_bec_shortcuts()
|
self.get_bec_shortcuts()
|
||||||
self.entry_validator = EntryValidator(self.dev)
|
self.entry_validator = EntryValidator(self.dev)
|
||||||
|
|
||||||
|
# connect update signal to update plot
|
||||||
|
self.proxy_update_plot = pg.SignalProxy(
|
||||||
|
self.update_signal, rateLimit=25, slot=self._update_plot
|
||||||
|
)
|
||||||
|
self.apply_config(self.config)
|
||||||
|
|
||||||
|
def apply_config(self, config: dict | MotorMapConfig):
|
||||||
|
"""
|
||||||
|
Apply the config to the motor map.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config(dict|MotorMapConfig): Config to be applied.
|
||||||
|
"""
|
||||||
|
if isinstance(config, dict):
|
||||||
|
try:
|
||||||
|
config = MotorMapConfig(**config)
|
||||||
|
except ValidationError as e:
|
||||||
|
print(f"Error in applying config: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.config = config
|
||||||
|
self.plot_item.clear()
|
||||||
|
|
||||||
self.motor_x = None
|
self.motor_x = None
|
||||||
self.motor_y = None
|
self.motor_y = None
|
||||||
self.database_buffer = {"x": [], "y": []}
|
self.database_buffer = {"x": [], "y": []}
|
||||||
self.plot_components = defaultdict(dict) # container for plot components
|
self.plot_components = defaultdict(dict) # container for plot components
|
||||||
|
|
||||||
# connect update signal to update plot
|
self.apply_axis_config()
|
||||||
self.proxy_update_plot = pg.SignalProxy(
|
|
||||||
self.update_signal, rateLimit=25, slot=self._update_plot
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO decide if needed to implement, maybe there will be no children widgets for motormap for now...
|
if self.config.signals is not None:
|
||||||
# def find_widget_by_id(self, item_id: str) -> BECCurve:
|
self.change_motors(
|
||||||
# """
|
motor_x=self.config.signals.x.name,
|
||||||
# Find the curve by its ID.
|
motor_y=self.config.signals.y.name,
|
||||||
# Args:
|
motor_x_entry=self.config.signals.x.entry,
|
||||||
# item_id(str): ID of the curve.
|
motor_y_entry=self.config.signals.y.entry,
|
||||||
#
|
)
|
||||||
# Returns:
|
|
||||||
# BECCurve: The curve object.
|
|
||||||
# """
|
|
||||||
# for curve in self.plot_item.curves:
|
|
||||||
# if curve.gui_id == item_id:
|
|
||||||
# return curve
|
|
||||||
|
|
||||||
@pyqtSlot(str, str, str, str, bool)
|
@pyqtSlot(str, str, str, str, bool)
|
||||||
def change_motors(
|
def change_motors(
|
||||||
@ -129,6 +159,8 @@ class BECMotorMap(BECPlotBase):
|
|||||||
# reconnect the signals
|
# reconnect the signals
|
||||||
self._connect_motor_to_slots()
|
self._connect_motor_to_slots()
|
||||||
|
|
||||||
|
self.database_buffer = {"x": [], "y": []}
|
||||||
|
|
||||||
# Redraw the motor map
|
# Redraw the motor map
|
||||||
self._make_motor_map()
|
self._make_motor_map()
|
||||||
|
|
||||||
@ -141,7 +173,19 @@ class BECMotorMap(BECPlotBase):
|
|||||||
data = {"x": self.database_buffer["x"], "y": self.database_buffer["y"]}
|
data = {"x": self.database_buffer["x"], "y": self.database_buffer["y"]}
|
||||||
return data
|
return data
|
||||||
|
|
||||||
# TODO setup all visual properties
|
def set_color(self, color: [str | tuple]):
|
||||||
|
"""
|
||||||
|
Set color of the motor trace.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
color(str|tuple): Color of the motor trace. Can be HEX(str) or RGBA(tuple).
|
||||||
|
"""
|
||||||
|
if isinstance(color, str):
|
||||||
|
color = Colors.validate_color(color)
|
||||||
|
color = Colors.hex_to_rgba(color, 255)
|
||||||
|
self.config.color = color
|
||||||
|
self.update_signal.emit()
|
||||||
|
|
||||||
def set_max_points(self, max_points: int) -> None:
|
def set_max_points(self, max_points: int) -> None:
|
||||||
"""
|
"""
|
||||||
Set the maximum number of points to display.
|
Set the maximum number of points to display.
|
||||||
@ -150,6 +194,7 @@ class BECMotorMap(BECPlotBase):
|
|||||||
max_points(int): Maximum number of points to display.
|
max_points(int): Maximum number of points to display.
|
||||||
"""
|
"""
|
||||||
self.config.max_points = max_points
|
self.config.max_points = max_points
|
||||||
|
self.update_signal.emit()
|
||||||
|
|
||||||
def set_precision(self, precision: int) -> None:
|
def set_precision(self, precision: int) -> None:
|
||||||
"""
|
"""
|
||||||
@ -159,6 +204,7 @@ class BECMotorMap(BECPlotBase):
|
|||||||
precision(int): Decimal precision of the motor position.
|
precision(int): Decimal precision of the motor position.
|
||||||
"""
|
"""
|
||||||
self.config.precision = precision
|
self.config.precision = precision
|
||||||
|
self.update_signal.emit()
|
||||||
|
|
||||||
def set_num_dim_points(self, num_dim_points: int) -> None:
|
def set_num_dim_points(self, num_dim_points: int) -> None:
|
||||||
"""
|
"""
|
||||||
@ -168,6 +214,7 @@ class BECMotorMap(BECPlotBase):
|
|||||||
num_dim_points(int): Number of dim points.
|
num_dim_points(int): Number of dim points.
|
||||||
"""
|
"""
|
||||||
self.config.num_dim_points = num_dim_points
|
self.config.num_dim_points = num_dim_points
|
||||||
|
self.update_signal.emit()
|
||||||
|
|
||||||
def set_background_value(self, background_value: int) -> None:
|
def set_background_value(self, background_value: int) -> None:
|
||||||
"""
|
"""
|
||||||
@ -177,6 +224,7 @@ class BECMotorMap(BECPlotBase):
|
|||||||
background_value(int): Background value of the motor map.
|
background_value(int): Background value of the motor map.
|
||||||
"""
|
"""
|
||||||
self.config.background_value = background_value
|
self.config.background_value = background_value
|
||||||
|
self._swap_limit_map()
|
||||||
|
|
||||||
def set_scatter_size(self, scatter_size: int) -> None:
|
def set_scatter_size(self, scatter_size: int) -> None:
|
||||||
"""
|
"""
|
||||||
@ -186,6 +234,7 @@ class BECMotorMap(BECPlotBase):
|
|||||||
scatter_size(int): Size of the scatter points.
|
scatter_size(int): Size of the scatter points.
|
||||||
"""
|
"""
|
||||||
self.config.scatter_size = scatter_size
|
self.config.scatter_size = scatter_size
|
||||||
|
self.update_signal.emit()
|
||||||
|
|
||||||
def _disconnect_current_motors(self):
|
def _disconnect_current_motors(self):
|
||||||
"""Disconnect the current motors from the slots."""
|
"""Disconnect the current motors from the slots."""
|
||||||
@ -210,6 +259,15 @@ class BECMotorMap(BECPlotBase):
|
|||||||
|
|
||||||
self.bec_dispatcher.connect_slot(self.on_device_readback, endpoints)
|
self.bec_dispatcher.connect_slot(self.on_device_readback, endpoints)
|
||||||
|
|
||||||
|
def _swap_limit_map(self):
|
||||||
|
"""Swap the limit map."""
|
||||||
|
self.plot_item.removeItem(self.plot_components["limit_map"])
|
||||||
|
self.plot_components["limit_map"] = self._make_limit_map(
|
||||||
|
self.config.signals.x.limits, self.config.signals.y.limits
|
||||||
|
)
|
||||||
|
self.plot_components["limit_map"].setZValue(-1)
|
||||||
|
self.plot_item.addItem(self.plot_components["limit_map"])
|
||||||
|
|
||||||
def _make_motor_map(self):
|
def _make_motor_map(self):
|
||||||
"""
|
"""
|
||||||
Create the motor map plot.
|
Create the motor map plot.
|
||||||
@ -249,6 +307,8 @@ class BECMotorMap(BECPlotBase):
|
|||||||
# Set default labels for the plot
|
# Set default labels for the plot
|
||||||
self.set(x_label=f"Motor X ({self.motor_x})", y_label=f"Motor Y ({self.motor_y})")
|
self.set(x_label=f"Motor X ({self.motor_x})", y_label=f"Motor Y ({self.motor_y})")
|
||||||
|
|
||||||
|
self.update_signal.emit()
|
||||||
|
|
||||||
def _add_coordinantes_crosshair(self, x: float, y: float) -> None:
|
def _add_coordinantes_crosshair(self, x: float, y: float) -> None:
|
||||||
"""
|
"""
|
||||||
Add crosshair to the plot to highlight the current position.
|
Add crosshair to the plot to highlight the current position.
|
||||||
@ -373,19 +433,31 @@ class BECMotorMap(BECPlotBase):
|
|||||||
|
|
||||||
def _update_plot(self):
|
def _update_plot(self):
|
||||||
"""Update the motor map plot."""
|
"""Update the motor map plot."""
|
||||||
|
# If the number of points exceeds max_points, delete the oldest points
|
||||||
|
if len(self.database_buffer["x"]) > self.config.max_points:
|
||||||
|
self.database_buffer["x"] = self.database_buffer["x"][-self.config.max_points :]
|
||||||
|
self.database_buffer["y"] = self.database_buffer["y"][-self.config.max_points :]
|
||||||
|
|
||||||
x = self.database_buffer["x"]
|
x = self.database_buffer["x"]
|
||||||
y = self.database_buffer["y"]
|
y = self.database_buffer["y"]
|
||||||
|
|
||||||
# Setup gradient brush for history
|
# Setup gradient brush for history
|
||||||
brushes = [pg.mkBrush(50, 50, 50, 255)] * len(x)
|
brushes = [pg.mkBrush(50, 50, 50, 255)] * len(x)
|
||||||
|
|
||||||
|
# RGB color
|
||||||
|
r, g, b, a = self.config.color
|
||||||
|
|
||||||
# Calculate the decrement step based on self.num_dim_points
|
# Calculate the decrement step based on self.num_dim_points
|
||||||
num_dim_points = self.config.num_dim_points
|
num_dim_points = self.config.num_dim_points
|
||||||
decrement_step = (255 - 50) / num_dim_points
|
decrement_step = (255 - 50) / num_dim_points
|
||||||
|
|
||||||
for i in range(1, min(num_dim_points + 1, len(x) + 1)):
|
for i in range(1, min(num_dim_points + 1, len(x) + 1)):
|
||||||
brightness = max(60, 255 - decrement_step * (i - 1))
|
brightness = max(60, 255 - decrement_step * (i - 1))
|
||||||
brushes[-i] = pg.mkBrush(brightness, brightness, brightness, 255)
|
dim_r = int(r * (brightness / 255))
|
||||||
brushes[-1] = pg.mkBrush(255, 255, 255, 255) # Newest point is always full brightness
|
dim_g = int(g * (brightness / 255))
|
||||||
|
dim_b = int(b * (brightness / 255))
|
||||||
|
brushes[-i] = pg.mkBrush(dim_r, dim_g, dim_b, a)
|
||||||
|
brushes[-1] = pg.mkBrush(r, g, b, a) # Newest point is always full brightness
|
||||||
scatter_size = self.config.scatter_size
|
scatter_size = self.config.scatter_size
|
||||||
|
|
||||||
# Update the scatter plot
|
# Update the scatter plot
|
||||||
|
@ -84,6 +84,7 @@ class DMMock:
|
|||||||
DEVICES = [
|
DEVICES = [
|
||||||
FakePositioner("samx", limits=[-10, 10], read_value=2.0),
|
FakePositioner("samx", limits=[-10, 10], read_value=2.0),
|
||||||
FakePositioner("samy", limits=[-5, 5], read_value=3.0),
|
FakePositioner("samy", limits=[-5, 5], read_value=3.0),
|
||||||
|
FakePositioner("samz", limits=[-8, 8], read_value=4.0),
|
||||||
FakePositioner("aptrx", limits=None, read_value=4.0),
|
FakePositioner("aptrx", limits=None, read_value=4.0),
|
||||||
FakePositioner("aptry", limits=None, read_value=5.0),
|
FakePositioner("aptry", limits=None, read_value=5.0),
|
||||||
FakeDevice("gauss_bpm"),
|
FakeDevice("gauss_bpm"),
|
||||||
|
@ -1,100 +1,106 @@
|
|||||||
|
import numpy as np
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from bec_widgets.widgets.figure.plots.motor_map.motor_map import BECMotorMap, MotorMapConfig
|
from bec_widgets.widgets.figure.plots.motor_map.motor_map import BECMotorMap, MotorMapConfig
|
||||||
from bec_widgets.widgets.figure.plots.waveform.waveform_curve import SignalData
|
from bec_widgets.widgets.figure.plots.waveform.waveform_curve import SignalData
|
||||||
|
|
||||||
from .client_mocks import mocked_client
|
from .client_mocks import mocked_client
|
||||||
|
from .test_bec_figure import bec_figure
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
def test_motor_map_init(bec_figure):
|
||||||
def bec_motor_map(qtbot, mocked_client):
|
default_config = MotorMapConfig(widget_class="BECMotorMap")
|
||||||
widget = BECMotorMap(client=mocked_client, gui_id="BECMotorMap_test")
|
|
||||||
# qtbot.addWidget(widget)
|
mm = bec_figure.motor_map(config=default_config.model_dump())
|
||||||
# qtbot.waitExposed(widget)
|
default_config.gui_id = mm.gui_id
|
||||||
yield widget
|
|
||||||
|
assert mm.config == default_config
|
||||||
|
|
||||||
|
|
||||||
def test_motor_map_init(bec_motor_map):
|
def test_motor_map_change_motors(bec_figure):
|
||||||
default_config = MotorMapConfig(widget_class="BECMotorMap", gui_id="BECMotorMap_test")
|
mm = bec_figure.motor_map("samx", "samy")
|
||||||
|
|
||||||
assert bec_motor_map.config == default_config
|
assert mm.motor_x == "samx"
|
||||||
|
assert mm.motor_y == "samy"
|
||||||
|
assert mm.config.signals.x == SignalData(name="samx", entry="samx", limits=[-10, 10])
|
||||||
|
assert mm.config.signals.y == SignalData(name="samy", entry="samy", limits=[-5, 5])
|
||||||
|
|
||||||
|
mm.change_motors("samx", "samz")
|
||||||
|
|
||||||
|
assert mm.config.signals.x == SignalData(name="samx", entry="samx", limits=[-10, 10])
|
||||||
|
assert mm.config.signals.y == SignalData(name="samz", entry="samz", limits=[-8, 8])
|
||||||
|
|
||||||
|
|
||||||
def test_motor_map_change_motors(bec_motor_map):
|
def test_motor_map_get_limits(bec_figure):
|
||||||
bec_motor_map.change_motors("samx", "samy")
|
mm = bec_figure.motor_map("samx", "samy")
|
||||||
|
|
||||||
assert bec_motor_map.config.signals.x == SignalData(name="samx", entry="samx", limits=[-10, 10])
|
|
||||||
assert bec_motor_map.config.signals.y == SignalData(name="samy", entry="samy", limits=[-5, 5])
|
|
||||||
|
|
||||||
|
|
||||||
def test_motor_map_get_limits(bec_motor_map):
|
|
||||||
expected_limits = {"samx": [-10, 10], "samy": [-5, 5]}
|
expected_limits = {"samx": [-10, 10], "samy": [-5, 5]}
|
||||||
|
|
||||||
for motor_name, expected_limit in expected_limits.items():
|
for motor_name, expected_limit in expected_limits.items():
|
||||||
actual_limit = bec_motor_map._get_motor_limit(motor_name)
|
actual_limit = mm._get_motor_limit(motor_name)
|
||||||
assert actual_limit == expected_limit
|
assert actual_limit == expected_limit
|
||||||
|
|
||||||
|
|
||||||
def test_motor_map_get_init_position(bec_motor_map):
|
def test_motor_map_get_init_position(bec_figure):
|
||||||
bec_motor_map.set_precision(2)
|
mm = bec_figure.motor_map("samx", "samy")
|
||||||
|
mm.set_precision(2)
|
||||||
|
|
||||||
motor_map_dev = bec_motor_map.client.device_manager.devices
|
motor_map_dev = mm.client.device_manager.devices
|
||||||
|
|
||||||
expected_positions = {
|
expected_positions = {
|
||||||
("samx", "samx"): motor_map_dev["samx"].read()["samx"]["value"],
|
("samx", "samx"): motor_map_dev["samx"].read()["samx"]["value"],
|
||||||
("samy", "samy"): motor_map_dev["samy"].read()["samy"]["value"],
|
("samy", "samy"): motor_map_dev["samy"].read()["samy"]["value"],
|
||||||
("aptrx", "aptrx"): motor_map_dev["aptrx"].read()["aptrx"]["value"],
|
|
||||||
("aptry", "aptry"): motor_map_dev["aptry"].read()["aptry"]["value"],
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (motor_name, entry), expected_position in expected_positions.items():
|
for (motor_name, entry), expected_position in expected_positions.items():
|
||||||
actual_position = bec_motor_map._get_motor_init_position(motor_name, entry, 2)
|
actual_position = mm._get_motor_init_position(motor_name, entry, 2)
|
||||||
assert actual_position == expected_position
|
assert actual_position == expected_position
|
||||||
|
|
||||||
|
|
||||||
def test_motor_movement_updates_position_and_database(bec_motor_map):
|
def test_motor_movement_updates_position_and_database(bec_figure):
|
||||||
motor_map_dev = bec_motor_map.client.device_manager.devices
|
mm = bec_figure.motor_map("samx", "samy")
|
||||||
|
motor_map_dev = mm.client.device_manager.devices
|
||||||
|
|
||||||
init_positions = {
|
init_positions = {
|
||||||
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
|
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
|
||||||
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
|
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
|
||||||
}
|
}
|
||||||
|
|
||||||
bec_motor_map.change_motors("samx", "samy")
|
mm.change_motors("samx", "samy")
|
||||||
|
|
||||||
assert bec_motor_map.database_buffer["x"] == init_positions["samx"]
|
assert mm.database_buffer["x"] == init_positions["samx"]
|
||||||
assert bec_motor_map.database_buffer["y"] == init_positions["samy"]
|
assert mm.database_buffer["y"] == init_positions["samy"]
|
||||||
|
|
||||||
# Simulate motor movement for 'samx' only
|
# Simulate motor movement for 'samx' only
|
||||||
new_position_samx = 4.0
|
new_position_samx = 4.0
|
||||||
bec_motor_map.on_device_readback({"signals": {"samx": {"value": new_position_samx}}})
|
mm.on_device_readback({"signals": {"samx": {"value": new_position_samx}}})
|
||||||
|
|
||||||
init_positions["samx"].append(new_position_samx)
|
init_positions["samx"].append(new_position_samx)
|
||||||
init_positions["samy"].append(init_positions["samy"][-1])
|
init_positions["samy"].append(init_positions["samy"][-1])
|
||||||
# Verify database update for 'samx'
|
# Verify database update for 'samx'
|
||||||
assert bec_motor_map.database_buffer["x"] == init_positions["samx"]
|
assert mm.database_buffer["x"] == init_positions["samx"]
|
||||||
|
|
||||||
# Verify 'samy' retains its last known position
|
# Verify 'samy' retains its last known position
|
||||||
assert bec_motor_map.database_buffer["y"] == init_positions["samy"]
|
assert mm.database_buffer["y"] == init_positions["samy"]
|
||||||
|
|
||||||
|
|
||||||
def test_scatter_plot_rendering(bec_motor_map):
|
def test_scatter_plot_rendering(bec_figure):
|
||||||
motor_map_dev = bec_motor_map.client.device_manager.devices
|
mm = bec_figure.motor_map("samx", "samy")
|
||||||
|
motor_map_dev = mm.client.device_manager.devices
|
||||||
|
|
||||||
init_positions = {
|
init_positions = {
|
||||||
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
|
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
|
||||||
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
|
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
|
||||||
}
|
}
|
||||||
|
|
||||||
bec_motor_map.change_motors("samx", "samy")
|
mm.change_motors("samx", "samy")
|
||||||
|
|
||||||
# Simulate motor movement for 'samx' only
|
# Simulate motor movement for 'samx' only
|
||||||
new_position_samx = 4.0
|
new_position_samx = 4.0
|
||||||
bec_motor_map.on_device_readback({"signals": {"samx": {"value": new_position_samx}}})
|
mm.on_device_readback({"signals": {"samx": {"value": new_position_samx}}})
|
||||||
bec_motor_map._update_plot()
|
mm._update_plot()
|
||||||
|
|
||||||
# Get the scatter plot item
|
# Get the scatter plot item
|
||||||
scatter_plot_item = bec_motor_map.plot_components["scatter"]
|
scatter_plot_item = mm.plot_components["scatter"]
|
||||||
|
|
||||||
# Check the scatter plot item properties
|
# Check the scatter plot item properties
|
||||||
assert len(scatter_plot_item.data) > 0, "Scatter plot data is empty"
|
assert len(scatter_plot_item.data) > 0, "Scatter plot data is empty"
|
||||||
@ -106,16 +112,148 @@ def test_scatter_plot_rendering(bec_motor_map):
|
|||||||
), "Scatter plot Y data should retain last known position"
|
), "Scatter plot Y data should retain last known position"
|
||||||
|
|
||||||
|
|
||||||
def test_plot_visualization_consistency(bec_motor_map):
|
def test_plot_visualization_consistency(bec_figure):
|
||||||
bec_motor_map.change_motors("samx", "samy")
|
mm = bec_figure.motor_map("samx", "samy")
|
||||||
|
mm.change_motors("samx", "samy")
|
||||||
# Simulate updating the plot with new data
|
# Simulate updating the plot with new data
|
||||||
bec_motor_map.on_device_readback({"signals": {"samx": {"value": 5}}})
|
mm.on_device_readback({"signals": {"samx": {"value": 5}}})
|
||||||
bec_motor_map.on_device_readback({"signals": {"samy": {"value": 9}}})
|
mm.on_device_readback({"signals": {"samy": {"value": 9}}})
|
||||||
bec_motor_map._update_plot()
|
mm._update_plot()
|
||||||
|
|
||||||
scatter_plot_item = bec_motor_map.plot_components["scatter"]
|
scatter_plot_item = mm.plot_components["scatter"]
|
||||||
|
|
||||||
# Check if the scatter plot reflects the new data correctly
|
# Check if the scatter plot reflects the new data correctly
|
||||||
assert (
|
assert (
|
||||||
scatter_plot_item.data["x"][-1] == 5 and scatter_plot_item.data["y"][-1] == 9
|
scatter_plot_item.data["x"][-1] == 5 and scatter_plot_item.data["y"][-1] == 9
|
||||||
), "Plot not updated correctly with new data"
|
), "Plot not updated correctly with new data"
|
||||||
|
|
||||||
|
|
||||||
|
def test_change_background_value(bec_figure, qtbot):
|
||||||
|
mm = bec_figure.motor_map("samx", "samy")
|
||||||
|
|
||||||
|
assert mm.config.background_value == 25
|
||||||
|
assert np.all(mm.plot_components["limit_map"].image == 25.0)
|
||||||
|
|
||||||
|
mm.set_background_value(50)
|
||||||
|
qtbot.wait(200)
|
||||||
|
|
||||||
|
assert mm.config.background_value == 50
|
||||||
|
assert np.all(mm.plot_components["limit_map"].image == 50.0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_motor_map_init_from_config(bec_figure):
|
||||||
|
config = {
|
||||||
|
"widget_class": "BECMotorMap",
|
||||||
|
"gui_id": "mm_id",
|
||||||
|
"parent_id": bec_figure.gui_id,
|
||||||
|
"row": 0,
|
||||||
|
"col": 0,
|
||||||
|
"axis": {
|
||||||
|
"title": "Motor position: (-0.0, 0.0)",
|
||||||
|
"title_size": None,
|
||||||
|
"x_label": "Motor X (samx)",
|
||||||
|
"x_label_size": None,
|
||||||
|
"y_label": "Motor Y (samy)",
|
||||||
|
"y_label_size": None,
|
||||||
|
"legend_label_size": None,
|
||||||
|
"x_scale": "linear",
|
||||||
|
"y_scale": "linear",
|
||||||
|
"x_lim": None,
|
||||||
|
"y_lim": None,
|
||||||
|
"x_grid": True,
|
||||||
|
"y_grid": True,
|
||||||
|
},
|
||||||
|
"signals": {
|
||||||
|
"source": "device_readback",
|
||||||
|
"x": {
|
||||||
|
"name": "samx",
|
||||||
|
"entry": "samx",
|
||||||
|
"unit": None,
|
||||||
|
"modifier": None,
|
||||||
|
"limits": [-10.0, 10.0],
|
||||||
|
},
|
||||||
|
"y": {
|
||||||
|
"name": "samy",
|
||||||
|
"entry": "samy",
|
||||||
|
"unit": None,
|
||||||
|
"modifier": None,
|
||||||
|
"limits": [-5.0, 5.0],
|
||||||
|
},
|
||||||
|
"z": None,
|
||||||
|
"dap": None,
|
||||||
|
},
|
||||||
|
"color": (255, 255, 255, 255),
|
||||||
|
"scatter_size": 5,
|
||||||
|
"max_points": 50,
|
||||||
|
"num_dim_points": 10,
|
||||||
|
"precision": 5,
|
||||||
|
"background_value": 50,
|
||||||
|
}
|
||||||
|
mm = bec_figure.motor_map(config=config)
|
||||||
|
config["gui_id"] = mm.gui_id
|
||||||
|
|
||||||
|
assert mm.config_dict == config
|
||||||
|
|
||||||
|
|
||||||
|
def test_motor_map_set_scatter_size(bec_figure, qtbot):
|
||||||
|
mm = bec_figure.motor_map("samx", "samy")
|
||||||
|
|
||||||
|
assert mm.config.scatter_size == 5
|
||||||
|
assert mm.plot_components["scatter"].opts["size"] == 5
|
||||||
|
|
||||||
|
mm.set_scatter_size(10)
|
||||||
|
qtbot.wait(200)
|
||||||
|
|
||||||
|
assert mm.config.scatter_size == 10
|
||||||
|
assert mm.plot_components["scatter"].opts["size"] == 10
|
||||||
|
|
||||||
|
|
||||||
|
def test_motor_map_change_precision(bec_figure):
|
||||||
|
mm = bec_figure.motor_map("samx", "samy")
|
||||||
|
|
||||||
|
assert mm.config.precision == 2
|
||||||
|
mm.set_precision(10)
|
||||||
|
assert mm.config.precision == 10
|
||||||
|
|
||||||
|
|
||||||
|
def test_motor_map_set_color(bec_figure, qtbot):
|
||||||
|
mm = bec_figure.motor_map("samx", "samy")
|
||||||
|
|
||||||
|
assert mm.config.color == (255, 255, 255, 255)
|
||||||
|
|
||||||
|
mm.set_color((0, 0, 0, 255))
|
||||||
|
qtbot.wait(200)
|
||||||
|
assert mm.config.color == (0, 0, 0, 255)
|
||||||
|
|
||||||
|
|
||||||
|
def test_motor_map_get_data_max_points(bec_figure, qtbot):
|
||||||
|
mm = bec_figure.motor_map("samx", "samy")
|
||||||
|
motor_map_dev = mm.client.device_manager.devices
|
||||||
|
|
||||||
|
init_positions = {
|
||||||
|
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
|
||||||
|
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
|
||||||
|
}
|
||||||
|
mm.on_device_readback({"signals": {"samx": {"value": 5.0}}})
|
||||||
|
mm.on_device_readback({"signals": {"samy": {"value": 9.0}}})
|
||||||
|
mm.on_device_readback({"signals": {"samx": {"value": 6.0}}})
|
||||||
|
mm.on_device_readback({"signals": {"samy": {"value": 7.0}}})
|
||||||
|
|
||||||
|
expected_x = [init_positions["samx"][-1], 5.0, 5.0, 6.0, 6.0]
|
||||||
|
expected_y = [init_positions["samy"][-1], init_positions["samy"][-1], 9.0, 9.0, 7.0]
|
||||||
|
get_data = mm.get_data()
|
||||||
|
|
||||||
|
assert mm.database_buffer["x"] == expected_x
|
||||||
|
assert mm.database_buffer["y"] == expected_y
|
||||||
|
assert get_data["x"] == expected_x
|
||||||
|
assert get_data["y"] == expected_y
|
||||||
|
|
||||||
|
mm.set_max_points(3)
|
||||||
|
qtbot.wait(200)
|
||||||
|
get_data = mm.get_data()
|
||||||
|
assert len(get_data["x"]) == 3
|
||||||
|
assert len(get_data["y"]) == 3
|
||||||
|
assert get_data["x"] == expected_x[-3:]
|
||||||
|
assert get_data["y"] == expected_y[-3:]
|
||||||
|
assert mm.database_buffer["x"] == expected_x[-3:]
|
||||||
|
assert mm.database_buffer["y"] == expected_y[-3:]
|
||||||
|
@ -58,7 +58,7 @@ def test_device_input_base_set_default_device_error(device_input_base):
|
|||||||
|
|
||||||
def test_device_input_base_get_device_list(device_input_base):
|
def test_device_input_base_get_device_list(device_input_base):
|
||||||
devices = device_input_base.get_device_list("FakePositioner")
|
devices = device_input_base.get_device_list("FakePositioner")
|
||||||
assert devices == ["samx", "samy", "aptrx", "aptry"]
|
assert devices == ["samx", "samy", "samz", "aptrx", "aptry"]
|
||||||
|
|
||||||
|
|
||||||
def test_device_input_base_get_filters(device_input_base):
|
def test_device_input_base_get_filters(device_input_base):
|
||||||
|
@ -56,6 +56,7 @@ def test_device_input_combobox_init(device_input_combobox):
|
|||||||
assert device_input_combobox.devices == [
|
assert device_input_combobox.devices == [
|
||||||
"samx",
|
"samx",
|
||||||
"samy",
|
"samy",
|
||||||
|
"samz",
|
||||||
"aptrx",
|
"aptrx",
|
||||||
"aptry",
|
"aptry",
|
||||||
"gauss_bpm",
|
"gauss_bpm",
|
||||||
@ -141,6 +142,7 @@ def test_device_input_line_edit_init(device_input_line_edit):
|
|||||||
assert device_input_line_edit.devices == [
|
assert device_input_line_edit.devices == [
|
||||||
"samx",
|
"samx",
|
||||||
"samy",
|
"samy",
|
||||||
|
"samz",
|
||||||
"aptrx",
|
"aptrx",
|
||||||
"aptry",
|
"aptry",
|
||||||
"gauss_bpm",
|
"gauss_bpm",
|
||||||
|
@ -74,7 +74,7 @@ def test_motor_thread_initialization(mocked_client):
|
|||||||
def test_get_all_motors_names(mocked_client):
|
def test_get_all_motors_names(mocked_client):
|
||||||
motor_thread = MotorThread(client=mocked_client)
|
motor_thread = MotorThread(client=mocked_client)
|
||||||
motor_names = motor_thread.get_all_motors_names()
|
motor_names = motor_thread.get_all_motors_names()
|
||||||
expected_names = ["samx", "samy", "aptrx", "aptry"]
|
expected_names = ["samx", "samy", "samz", "aptrx", "aptry"]
|
||||||
assert sorted(motor_names) == sorted(expected_names)
|
assert sorted(motor_names) == sorted(expected_names)
|
||||||
assert all(name in motor_names for name in expected_names)
|
assert all(name in motor_names for name in expected_names)
|
||||||
assert len(motor_names) == len(expected_names) # Ensure only these motors are returned
|
assert len(motor_names) == len(expected_names) # Ensure only these motors are returned
|
||||||
@ -155,11 +155,12 @@ def motor_selection_widget(qtbot, mocked_client, motor_thread):
|
|||||||
|
|
||||||
|
|
||||||
def test_initialization_and_population(motor_selection_widget):
|
def test_initialization_and_population(motor_selection_widget):
|
||||||
assert motor_selection_widget.comboBox_motor_x.count() == 4
|
assert motor_selection_widget.comboBox_motor_x.count() == 5
|
||||||
assert motor_selection_widget.comboBox_motor_x.itemText(0) == "samx"
|
assert motor_selection_widget.comboBox_motor_x.itemText(0) == "samx"
|
||||||
assert motor_selection_widget.comboBox_motor_y.itemText(1) == "samy"
|
assert motor_selection_widget.comboBox_motor_y.itemText(1) == "samy"
|
||||||
assert motor_selection_widget.comboBox_motor_x.itemText(2) == "aptrx"
|
assert motor_selection_widget.comboBox_motor_y.itemText(2) == "samz"
|
||||||
assert motor_selection_widget.comboBox_motor_y.itemText(3) == "aptry"
|
assert motor_selection_widget.comboBox_motor_x.itemText(3) == "aptrx"
|
||||||
|
assert motor_selection_widget.comboBox_motor_y.itemText(4) == "aptry"
|
||||||
|
|
||||||
|
|
||||||
def test_selection_and_signal_emission(motor_selection_widget):
|
def test_selection_and_signal_emission(motor_selection_widget):
|
||||||
|
Reference in New Issue
Block a user