0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 03:31:50 +02:00

feat(plots/bec_motor_map): BECMotorMap build on BECPlotBase

This commit is contained in:
2024-03-26 13:13:41 +01:00
parent 88014d24c1
commit 0f69c346cd
6 changed files with 485 additions and 11 deletions

View File

@ -10,5 +10,5 @@ from .motor_control import (
MotorThread,
)
from .motor_map import MotorMap
from .plots import BECCurve, BECPlotBase, BECWaveform1D
from .plots import BECCurve, BECMotorMap, BECWaveform1D
from .scan_control import ScanControl

View File

@ -1,3 +1,4 @@
from .image import BECImageItem, BECImageShow, ImageItemConfig
from .motor_map import BECMotorMap, MotorMapConfig
from .plot_base import AxisConfig, BECPlotBase, WidgetConfig
from .waveform1d import BECCurve, BECWaveform1D, Waveform1DConfig

View File

@ -0,0 +1,415 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any, Literal, Optional, Union
import numpy as np
import pyqtgraph as pg
from bec_lib import MessageEndpoints
from bec_lib.scan_data import ScanData
from pydantic import BaseModel, Field, ValidationError
from pyqtgraph import mkBrush
from qtpy import QtCore, QtGui
from qtpy.QtCore import Signal as pyqtSignal
from qtpy.QtCore import Slot as pyqtSlot
from qtpy.QtWidgets import QWidget
from bec_widgets.utils import BECConnector, Colors, ConnectionConfig, EntryValidator
from bec_widgets.widgets.plots.plot_base import BECPlotBase, WidgetConfig
from bec_widgets.widgets.plots.waveform1d import Signal, SignalData
class MotorMapConfig(WidgetConfig):
signals: Optional[Signal] = Field(None, description="Signals of the motor map")
color_map: Optional[str] = Field(
"Greys", description="Color scheme of the motor position gradient."
)
scatter_size: Optional[int] = Field(5, description="Size of the scatter points.")
max_points: Optional[int] = Field(1000, description="Maximum number of points to display.")
num_dim_points: Optional[int] = Field(
100,
description="Number of points to dim before the color remains same for older recorded position.",
)
precision: Optional[int] = Field(2, description="Decimal precision of the motor position.")
background_value: Optional[int] = Field(
25, description="Background value of the motor map."
) # TODO can be percentage from 255 calculated
class BECMotorMap(BECPlotBase):
USER_ACCESS = []
# QT Signals
update_signal = pyqtSignal()
def __init__(
self,
parent: Optional[QWidget] = None,
parent_figure=None,
config: Optional[MotorMapConfig] = None,
client=None,
gui_id: Optional[str] = None,
):
if config is None:
config = MotorMapConfig(widget_class=self.__class__.__name__)
super().__init__(
parent=parent, parent_figure=parent_figure, config=config, client=client, gui_id=gui_id
)
# Get bec shortcuts dev, scans, queue, scan_storage, dap
self.get_bec_shortcuts()
self.entry_validator = EntryValidator(self.dev)
self.motor_x = None
self.motor_y = None
self.database_buffer = {"x": [], "y": []}
self.plot_components = defaultdict(dict) # container for plot components
# connect update signal to update plot
self.proxy_update_plot = pg.SignalProxy(
self.update_signal, rateLimit=25, slot=self._update_plot
)
# TODO decide if needed to implement, maybe there will be no children widgets for motormap for now...
# def find_widget_by_id(self, item_id: str) -> BECCurve:
# """
# Find the curve by its ID.
# Args:
# item_id(str): ID of the curve.
#
# Returns:
# BECCurve: The curve object.
# """
# for curve in self.plot_item.curves:
# if curve.gui_id == item_id:
# return curve
@pyqtSlot(str, str, str, str, bool)
def change_motors(
self,
motor_x: str,
motor_y: str,
motor_x_entry: str = None,
motor_y_entry: str = None,
validate_bec: bool = True,
) -> None:
"""
Change the active motors for the plot.
Args:
motor_x(str): Motor name for the X axis.
motor_y(str): Motor name for the Y axis.
motor_x_entry(str): Motor entry for the X axis.
motor_y_entry(str): Motor entry for the Y axis.
validate_bec(bool, optional): If True, validate the signal with BEC. Defaults to True.
"""
motor_x_entry, motor_y_entry = self._validate_signal_entries(
motor_x, motor_y, motor_x_entry, motor_y_entry, validate_bec
)
motor_x_limit = self._get_motor_limit(motor_x)
motor_y_limit = self._get_motor_limit(motor_y)
signal = Signal(
source="device_readback",
x=SignalData(name=motor_x, entry=motor_x_entry, limits=motor_x_limit),
y=SignalData(name=motor_y, entry=motor_y_entry, limits=motor_y_limit),
)
self.config.signals = signal
# reconnect the signals
self._connect_motor_to_slots()
# Redraw the motor map
self._make_motor_map()
# TODO setup all visual properties
def set_max_points(self, max_points: int) -> None:
"""
Set the maximum number of points to display.
Args:
max_points(int): Maximum number of points to display.
"""
self.config.max_points = max_points
def set_precision(self, precision: int) -> None:
"""
Set the decimal precision of the motor position.
Args:
precision(int): Decimal precision of the motor position.
"""
self.config.precision = precision
def set_num_dim_points(self, num_dim_points: int) -> None:
"""
Set the number of dim points for the motor map.
Args:
num_dim_points(int): Number of dim points.
"""
self.config.num_dim_points = num_dim_points
def set_background_value(self, background_value: int) -> None:
"""
Set the background value of the motor map.
Args:
background_value(int): Background value of the motor map.
"""
self.config.background_value = background_value
def set_scatter_size(self, scatter_size: int) -> None:
"""
Set the scatter size of the motor map plot.
Args:
scatter_size(int): Size of the scatter points.
"""
self.config.scatter_size = scatter_size
def _connect_motor_to_slots(self):
"""Connect motors to slots."""
if self.motor_x is not None and self.motor_y is not None:
old_endpoints = [
MessageEndpoints.device_readback(self.motor_x),
MessageEndpoints.device_readback(self.motor_y),
]
self.bec_dispatcher.disconnect_slot(self.on_device_readback, old_endpoints)
self.motor_x = self.config.signals.x.name
self.motor_y = self.config.signals.y.name
endpoints = [
MessageEndpoints.device_readback(self.motor_x),
MessageEndpoints.device_readback(self.motor_y),
]
self.bec_dispatcher.connect_slot(
self.on_device_readback, endpoints, single_callback_for_all_topics=True
)
def _make_motor_map(self):
"""
Create the motor map plot.
"""
# Create limit map
motor_x_limit = self.config.signals.x.limits
motor_y_limit = self.config.signals.y.limits
self.plot_components["limit_map"] = self._make_limit_map(motor_x_limit, motor_y_limit)
self.plot_item.addItem(self.plot_components["limit_map"])
self.plot_components["limit_map"].setZValue(-1)
# Create scatter plot
scatter_size = self.config.scatter_size
self.plot_components["scatter"] = pg.ScatterPlotItem(
size=scatter_size, brush=pg.mkBrush(255, 255, 255, 255)
)
self.plot_item.addItem(self.plot_components["scatter"])
self.plot_components["scatter"].setZValue(0)
# Enable Grid
self.set_grid(True, True)
# Add the crosshair for initial motor coordinates
initial_position_x = self._get_motor_init_position(
self.motor_x, self.config.signals.x.entry, self.config.precision
)
initial_position_y = self._get_motor_init_position(
self.motor_y, self.config.signals.y.entry, self.config.precision
)
self.database_buffer["x"] = [initial_position_x]
self.database_buffer["y"] = [initial_position_y]
self.plot_components["scatter"].setData([initial_position_x], [initial_position_y])
self._add_coordinantes_crosshair(initial_position_x, initial_position_y)
def _add_coordinantes_crosshair(self, x: float, y: float) -> None:
"""
Add crosshair to the plot to highlight the current position.
Args:
x(float): X coordinate.
y(float): Y coordinate.
"""
# Crosshair to highlight the current position
highlight_H = pg.InfiniteLine(
angle=0, movable=False, pen=pg.mkPen(color="r", width=1, style=QtCore.Qt.DashLine)
)
highlight_V = pg.InfiniteLine(
angle=90, movable=False, pen=pg.mkPen(color="r", width=1, style=QtCore.Qt.DashLine)
)
# Add crosshair to the curve list for future referencing
self.plot_components["highlight_H"] = highlight_H
self.plot_components["highlight_V"] = highlight_V
# Add crosshair to the plot
self.plot_item.addItem(highlight_H)
self.plot_item.addItem(highlight_V)
highlight_V.setPos(x)
highlight_H.setPos(y)
def _make_limit_map(self, limits_x: list, limits_y: list) -> pg.ImageItem:
"""
Create a limit map for the motor map plot.
Args:
limits_x(list): Motor limits for the x axis.
limits_y(list): Motor limits for the y axis.
Returns:
pg.ImageItem: Limit map.
"""
limit_x_min, limit_x_max = limits_x
limit_y_min, limit_y_max = limits_y
map_width = int(limit_x_max - limit_x_min + 1)
map_height = int(limit_y_max - limit_y_min + 1)
# Create limits map
background_value = self.config.background_value
limit_map_data = np.full((map_width, map_height), background_value, dtype=np.float32)
limit_map = pg.ImageItem()
limit_map.setImage(limit_map_data)
# Translate and scale the image item to match the motor coordinates
tr = QtGui.QTransform()
tr.translate(limit_x_min, limit_y_min)
limit_map.setTransform(tr)
return limit_map
def _get_motor_init_position(self, name: str, entry: str, precision: int) -> float:
"""
Get the motor initial position from the config.
Args:
name(str): Motor name.
entry(str): Motor entry.
precision(int): Decimal precision of the motor position.
Returns:
float: Motor initial position.
"""
init_position = round(self.dev[name].read()[entry]["value"], precision)
return init_position
def _validate_signal_entries(
self,
x_name: str,
y_name: str,
x_entry: str | None,
y_entry: str | None,
validate_bec: bool = True,
) -> tuple[str, str]:
"""
Validate the signal name and entry.
Args:
x_name(str): Name of the x signal.
y_name(str): Name of the y signal.
x_entry(str|None): Entry of the x signal.
y_entry(str|None): Entry of the y signal.
validate_bec(bool, optional): If True, validate the signal with BEC. Defaults to True.
Returns:
tuple[str,str]: Validated x and y entries.
"""
if validate_bec:
x_entry = self.entry_validator.validate_signal(x_name, x_entry)
y_entry = self.entry_validator.validate_signal(y_name, y_entry)
else:
x_entry = x_name if x_entry is None else x_entry
y_entry = y_name if y_entry is None else y_entry
return x_entry, y_entry
def _get_motor_limit(self, motor: str) -> Union[list | None]: # TODO check if works correctly
"""
Get the motor limit from the config.
Args:
motor(str): Motor name.
Returns:
float: Motor limit.
"""
try:
limits = self.dev[motor].limits
if limits == [0, 0]:
return None
return limits
except AttributeError: # TODO maybe not needed, if no limits it returns [0,0]
# If the motor doesn't have a 'limits' attribute, return a default value or raise a custom exception
print(f"The device '{motor}' does not have defined limits.")
return None
def _update_plot(self):
"""Update the motor map plot."""
x = self.database_buffer["x"]
y = self.database_buffer["y"]
# Setup gradient brush for history
brushes = [pg.mkBrush(50, 50, 50, 255)] * len(x)
# Calculate the decrement step based on self.num_dim_points
num_dim_points = self.config.num_dim_points
decrement_step = (255 - 50) / num_dim_points
for i in range(1, min(num_dim_points + 1, len(x) + 1)):
brightness = max(60, 255 - decrement_step * (i - 1))
brushes[-i] = pg.mkBrush(brightness, brightness, brightness, 255)
brushes[-1] = pg.mkBrush(255, 255, 255, 255) # Newest point is always full brightness
scatter_size = self.config.scatter_size
# Update the scatter plot
self.plot_components["scatter"].setData(
x=x,
y=y,
brush=brushes,
pen=None,
size=scatter_size,
)
# Get last know position for crosshair
current_x = x[-1]
current_y = y[-1]
# Update the crosshair
self.plot_components["highlight_V"].setPos(current_x)
self.plot_components["highlight_H"].setPos(current_y)
# TODO not update title but some label
# Update plot title
precision = self.config.precision
self.set_title(
f"Motor position: ({round(current_x,precision)}, {round(current_y,precision)})"
)
@pyqtSlot(dict)
def on_device_readback(self, msg: dict) -> None:
"""
Update the motor map plot with the new motor position.
Args:
msg(dict): Message from the device readback.
"""
if self.motor_x is None or self.motor_y is None:
return
if self.motor_x in msg["signals"]:
x = msg["signals"][self.motor_x]["value"]
self.database_buffer["x"].append(x)
self.database_buffer["y"].append(self.database_buffer["y"][-1])
elif self.motor_y in msg["signals"]:
y = msg["signals"][self.motor_y]["value"]
self.database_buffer["y"].append(y)
self.database_buffer["x"].append(self.database_buffer["x"][-1])
self.update_signal.emit()
if __name__ == "__main__":
import sys
import pyqtgraph as pg
from qtpy.QtWidgets import QApplication, QMainWindow
app = QApplication(sys.argv)
glw = pg.GraphicsLayoutWidget()
motor_map = BECMotorMap()
motor_map.change_motors("samx", "samy")
glw.addItem(motor_map)
widget = glw
widget.show()
sys.exit(app.exec_())

View File

@ -15,7 +15,7 @@ from qtpy.QtCore import Slot as pyqtSlot
from qtpy.QtWidgets import QWidget
from bec_widgets.utils import BECConnector, Colors, ConnectionConfig, EntryValidator
from bec_widgets.widgets.plots import BECPlotBase, WidgetConfig
from bec_widgets.widgets.plots.plot_base import BECPlotBase, WidgetConfig
class SignalData(BaseModel):
@ -25,13 +25,14 @@ class SignalData(BaseModel):
entry: str
unit: Optional[str] = None # todo implement later
modifier: Optional[str] = None # todo implement later
limits: Optional[list[float]] = None # todo implement later
class Signal(BaseModel):
"""The configuration of a signal in the 1D waveform widget."""
source: str
x: SignalData
x: SignalData # TODO maybe add metadata for config gui later
y: SignalData

View File

@ -0,0 +1,39 @@
import pytest
from bec_widgets.widgets import BECMotorMap
from bec_widgets.widgets.plots.motor_map import MotorMapConfig
from bec_widgets.widgets.plots.waveform1d import Signal, SignalData
from .client_mocks import mocked_client
@pytest.fixture(scope="function")
def bec_motor_map(qtbot, mocked_client):
widget = BECMotorMap(client=mocked_client, gui_id="BECMotorMap_test")
# qtbot.addWidget(widget)
# qtbot.waitExposed(widget)
yield widget
def test_motor_map_init(bec_motor_map):
default_config = MotorMapConfig(widget_class="BECMotorMap", gui_id="BECMotorMap_test")
assert bec_motor_map.config == default_config
def test_motor_map_change_motors(bec_motor_map):
bec_motor_map.change_motors("samx", "samy")
assert bec_motor_map.config.signals.x == SignalData(name="samx", entry="samx", limits=[-10, 10])
assert bec_motor_map.config.signals.y == SignalData(name="samy", entry="samy", limits=[-5, 5])
def test_motor_map_get_limits(bec_motor_map):
expected_limits = {
"samx": [-10, 10],
"samy": [-5, 5],
}
for motor_name, expected_limit in expected_limits.items():
actual_limit = bec_motor_map._get_motor_limit(motor_name)
assert actual_limit == expected_limit

View File

@ -81,8 +81,20 @@ def test_create_waveform1D_by_config(bec_figure):
"source": "scan_segment",
"signals": {
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None},
"x": {
"name": "samx",
"entry": "samx",
"unit": None,
"modifier": None,
"limits": None,
},
"y": {
"name": "bpm4i",
"entry": "bpm4i",
"unit": None,
"modifier": None,
"limits": None,
},
},
},
"curve-custom": {
@ -218,8 +230,8 @@ def test_change_curve_appearance_methods(bec_figure, qtbot):
assert c1.config.source == "scan_segment"
assert c1.config.signals.model_dump() == {
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None},
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
}
@ -246,8 +258,8 @@ def test_change_curve_appearance_args(bec_figure):
assert c1.config.source == "scan_segment"
assert c1.config.signals.model_dump() == {
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None},
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
}
@ -339,8 +351,14 @@ def test_curve_add_by_config(bec_figure):
"source": "scan_segment",
"signals": {
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None},
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {
"name": "bpm4i",
"entry": "bpm4i",
"unit": None,
"modifier": None,
"limits": None,
},
},
}