mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 11:41:49 +02:00
refactor: motor_map.py clean up
This commit is contained in:
@ -1,4 +1,5 @@
|
||||
from .monitor import BECMonitor, ConfigDialog
|
||||
from .motor_map import MotorMap
|
||||
from .scan_control import ScanControl
|
||||
from .toolbar import ModularToolBar
|
||||
from .editor import BECEditor
|
||||
|
@ -0,0 +1 @@
|
||||
from .motor_map import MotorMap
|
||||
|
@ -57,7 +57,7 @@ class MotorMap(pg.GraphicsLayoutWidget):
|
||||
client=None,
|
||||
config: dict = None,
|
||||
gui_id=None,
|
||||
skip_validation: bool = False,
|
||||
skip_validation: bool = True,
|
||||
):
|
||||
super().__init__(parent=parent)
|
||||
|
||||
@ -111,16 +111,19 @@ class MotorMap(pg.GraphicsLayoutWidget):
|
||||
|
||||
# Global widget settings
|
||||
self._get_global_settings()
|
||||
|
||||
# Motor settings
|
||||
self.plot_data = self.config.get("motors", {})
|
||||
|
||||
# Include motor limits into the config
|
||||
# unique_signals = self._find_unique_signals(self.plot_data) #TODO is it needed?
|
||||
self._add_limits_to_plot_data()
|
||||
|
||||
# Initialize the database
|
||||
self.database = self._init_database()
|
||||
|
||||
# Create device mapping for x/y motor pairs
|
||||
self.device_mapping = self._create_device_mapping()
|
||||
|
||||
# Initialize the plot UI
|
||||
self._init_ui()
|
||||
|
||||
@ -137,6 +140,19 @@ class MotorMap(pg.GraphicsLayoutWidget):
|
||||
self.precision = self.plot_settings.get("precision", 2)
|
||||
self.background_value = self.plot_settings.get("background_value", 25)
|
||||
|
||||
def _create_device_mapping(self):
|
||||
"""
|
||||
Create a mapping of device names to their corresponding x/y devices.
|
||||
"""
|
||||
mapping = {}
|
||||
for motor in self.config.get("motors", []):
|
||||
for axis in ["x", "y"]:
|
||||
for signal in motor["signals"][axis]:
|
||||
other_axis = "y" if axis == "x" else "x"
|
||||
corresponding_device = motor["signals"][other_axis][0]["name"]
|
||||
mapping[signal["name"]] = corresponding_device
|
||||
return mapping
|
||||
|
||||
def _connect_motors_to_slots(self):
|
||||
"""Connect motors to slots."""
|
||||
|
||||
@ -193,28 +209,8 @@ class MotorMap(pg.GraphicsLayoutWidget):
|
||||
# If the motor doesn't have a 'limits' attribute, return a default value or raise a custom exception
|
||||
print(f"The device '{motor}' does not have defined limits.")
|
||||
|
||||
def _find_unique_signals(self, plot_data: dict) -> list: # TODO needed or not?
|
||||
"""
|
||||
Find unique signals in the plot data.
|
||||
|
||||
Args:
|
||||
plot_data(dict): Plot data.
|
||||
|
||||
Returns:
|
||||
list: List of unique signals.
|
||||
"""
|
||||
unique_pairs = set()
|
||||
for motor in plot_data:
|
||||
for axis in ["x", "y"]:
|
||||
for signal in motor["signals"][axis]:
|
||||
unique_pairs.add((signal["name"], signal["entry"]))
|
||||
unique_pairs_list = list(unique_pairs)
|
||||
|
||||
return unique_pairs_list
|
||||
|
||||
def _init_database(self):
|
||||
"""Initiate the database according the config."""
|
||||
# TODO maybe implement _find_unique_signals here instead of doing it in this method?
|
||||
database = {}
|
||||
|
||||
for plot in self.plot_data:
|
||||
@ -225,14 +221,13 @@ class MotorMap(pg.GraphicsLayoutWidget):
|
||||
if name not in database:
|
||||
database[name] = {}
|
||||
if entry not in database[name]:
|
||||
initial_value = self._get_initial_coordinate(name, entry)
|
||||
database[name][entry] = [initial_value]
|
||||
database[name][entry] = [self.get_coordinate(name, entry)]
|
||||
return database
|
||||
|
||||
def _get_initial_coordinate(self, name, entry):
|
||||
def get_coordinate(self, name, entry):
|
||||
"""Get the initial coordinate value for a motor."""
|
||||
try:
|
||||
return self.dev[name].readback.read()[entry]["value"]
|
||||
return self.dev[name].read()[entry]["value"]
|
||||
except Exception as e:
|
||||
print(f"Error getting initial value for {name}: {e}")
|
||||
return None
|
||||
@ -467,6 +462,11 @@ class MotorMap(pg.GraphicsLayoutWidget):
|
||||
current_x = motor_x_data[-1]
|
||||
current_y = motor_y_data[-1]
|
||||
|
||||
# Update plot title
|
||||
self.plots[plot_name].setTitle(
|
||||
f"Motor position: ({round(current_x,self.precision)}, {round(current_y,self.precision)})"
|
||||
)
|
||||
|
||||
# Update the crosshair
|
||||
self.curves_data[plot_name]["highlight_V"].setPos(current_x)
|
||||
self.curves_data[plot_name]["highlight_H"].setPos(current_y)
|
||||
@ -480,55 +480,30 @@ class MotorMap(pg.GraphicsLayoutWidget):
|
||||
"""
|
||||
|
||||
for device_name, device_info in msg["signals"].items():
|
||||
print(f'Updating device "{device_name}" with value {device_info["value"]}.')
|
||||
is_in_config, x_device, y_device = self._find_xy_devices(device_name)
|
||||
if is_in_config:
|
||||
value = device_info["value"]
|
||||
|
||||
# Update the database with the new value
|
||||
self._update_database(device_name, value)
|
||||
|
||||
# Find the corresponding coordinate and update
|
||||
corresponding_device = y_device if device_name == x_device else x_device
|
||||
self._update_corresponding_coordinate(corresponding_device)
|
||||
# Check if the device is relevant to our current context
|
||||
if device_name in self.device_mapping:
|
||||
self._update_device_data(device_name, device_info["value"])
|
||||
|
||||
self.update_signal.emit()
|
||||
|
||||
def _find_xy_devices(self, device_name):
|
||||
for motor in self.config.get("motors", []):
|
||||
x_signals = [signal["name"] for signal in motor["signals"]["x"]]
|
||||
y_signals = [signal["name"] for signal in motor["signals"]["y"]]
|
||||
if device_name in x_signals:
|
||||
return True, device_name, motor["signals"]["y"][0]["name"]
|
||||
elif device_name in y_signals:
|
||||
return True, motor["signals"]["x"][0]["name"], device_name
|
||||
return False, None, None
|
||||
|
||||
def _update_database(self, device_name: str, value: float):
|
||||
def _update_device_data(self, device_name: str, value: float):
|
||||
"""
|
||||
Update the database with the new value.
|
||||
Update the device data.
|
||||
Args:
|
||||
device_name (str): Device name.
|
||||
value (float): Device value.
|
||||
"""
|
||||
# Update the database with the new value
|
||||
if device_name in self.database:
|
||||
self.database[device_name][device_name].append(value)
|
||||
|
||||
def _update_corresponding_coordinate(self, corresponding_device: str):
|
||||
"""
|
||||
Update the corresponding coordinate with the last known value.
|
||||
Args:
|
||||
corresponding_device: device name of the corresponding coordinate
|
||||
"""
|
||||
# Update the database for the corresponding device with the last known value
|
||||
if corresponding_device and corresponding_device in self.database:
|
||||
if self.database[corresponding_device][corresponding_device]:
|
||||
last_value = self.database[corresponding_device][corresponding_device][-1]
|
||||
else:
|
||||
last_value = None # Default value if no data is available
|
||||
|
||||
self.database[corresponding_device][corresponding_device].append(last_value)
|
||||
corresponding_device = self.device_mapping.get(device_name)
|
||||
if corresponding_device and corresponding_device in self.database:
|
||||
last_value = (
|
||||
self.database[corresponding_device][corresponding_device][-1]
|
||||
if self.database[corresponding_device][corresponding_device]
|
||||
else None
|
||||
)
|
||||
self.database[corresponding_device][corresponding_device].append(last_value)
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
|
Reference in New Issue
Block a user