mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 03:31:50 +02:00
refactor: getting motor limits and coordinates moved to MotorControl(QThread)
This commit is contained in:
@ -260,7 +260,7 @@
|
|||||||
<item row="2" column="3">
|
<item row="2" column="3">
|
||||||
<widget class="QLabel" name="label_3">
|
<widget class="QLabel" name="label_3">
|
||||||
<property name="text">
|
<property name="text">
|
||||||
<string>- samx</string>
|
<string>+ samx</string>
|
||||||
</property>
|
</property>
|
||||||
</widget>
|
</widget>
|
||||||
</item>
|
</item>
|
||||||
|
@ -3,7 +3,7 @@ import os
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
import pyqtgraph as pg
|
import pyqtgraph as pg
|
||||||
from PyQt5 import QtGui
|
from PyQt5 import QtGui
|
||||||
from PyQt5.QtCore import QThread
|
from PyQt5.QtCore import QThread, pyqtSlot
|
||||||
from PyQt5.QtCore import pyqtSignal, Qt
|
from PyQt5.QtCore import pyqtSignal, Qt
|
||||||
from PyQt5.QtWidgets import QApplication, QWidget
|
from PyQt5.QtWidgets import QApplication, QWidget
|
||||||
from pyqtgraph.Qt import QtWidgets, uic
|
from pyqtgraph.Qt import QtWidgets, uic
|
||||||
@ -12,9 +12,6 @@ from bec_lib.core import MessageEndpoints, BECMessage
|
|||||||
|
|
||||||
|
|
||||||
class MotorApp(QWidget):
|
class MotorApp(QWidget):
|
||||||
motor_position = pyqtSignal(float) # TODO hook to motor position update
|
|
||||||
motor_update = pyqtSignal()
|
|
||||||
motor_position_xy = pyqtSignal(list)
|
|
||||||
coordinates_updated = pyqtSignal(float, float)
|
coordinates_updated = pyqtSignal(float, float)
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@ -22,34 +19,51 @@ class MotorApp(QWidget):
|
|||||||
current_path = os.path.dirname(__file__)
|
current_path = os.path.dirname(__file__)
|
||||||
uic.loadUi(os.path.join(current_path, "motor_controller.ui"), self)
|
uic.loadUi(os.path.join(current_path, "motor_controller.ui"), self)
|
||||||
|
|
||||||
# UI
|
self.limit_x, self.limit_y = None, None
|
||||||
self.start_x, self.start_y = self.get_xy()
|
|
||||||
self.current_x, self.current_y = self.get_xy()
|
|
||||||
|
|
||||||
self.limit_x, self.limit_y = self.get_motor_limits(dev.samx), self.get_motor_limits(
|
# QThread for motor movement + signals
|
||||||
dev.samy
|
self.motor_thread = MotorControl()
|
||||||
)
|
self.motor_thread.limits_retrieved.connect(self.update_limits)
|
||||||
|
|
||||||
|
# UI
|
||||||
|
self.init_ui()
|
||||||
|
|
||||||
|
# Initialize current coordinates with the provided initial coordinates
|
||||||
|
self.motor_thread.retrieve_motor_limits(dev.samx, dev.samy)
|
||||||
|
|
||||||
print(f"Init limits: samx:{self.limit_x}, samy:{self.limit_y}")
|
print(f"Init limits: samx:{self.limit_x}, samy:{self.limit_y}")
|
||||||
|
|
||||||
self.label_status.setText(
|
self.label_status.setText(
|
||||||
f"Motor position: ({dev.samx.position():.2f}, {dev.samy.position():.2f})"
|
f"Motor position: ({dev.samx.position():.2f}, {dev.samy.position():.2f})"
|
||||||
)
|
) # TODO has to be change to get positions from thread not directly from device
|
||||||
|
|
||||||
self.motor_thread = MotorMovement()
|
@pyqtSlot(list, list)
|
||||||
|
def update_limits(self, x_limits: list, y_limits: list) -> None:
|
||||||
|
self.limit_x = x_limits
|
||||||
|
self.limit_y = y_limits
|
||||||
|
self.spinBox_x_min.setValue(self.limit_x[0])
|
||||||
|
self.spinBox_x_max.setValue(self.limit_x[1])
|
||||||
|
self.spinBox_y_min.setValue(self.limit_y[0])
|
||||||
|
self.spinBox_y_max.setValue(self.limit_y[1])
|
||||||
|
|
||||||
self.init_ui()
|
for spinBox in (
|
||||||
|
self.spinBox_x_min,
|
||||||
|
self.spinBox_x_max,
|
||||||
|
self.spinBox_y_min,
|
||||||
|
self.spinBox_y_max,
|
||||||
|
):
|
||||||
|
spinBox.setStyleSheet("")
|
||||||
|
|
||||||
def move_motor_absolute(self, x, y):
|
self.init_motor_map() # reinitialize the map with the new limits
|
||||||
# x = self.spinBox_absolute_x.value()
|
|
||||||
# y = self.spinBox_absolute_y.value()
|
def move_motor_absolute(self, x: float, y: float) -> None:
|
||||||
target_coordinates = (x, y)
|
target_coordinates = (x, y)
|
||||||
self.motor_thread.move_to_coordinates(target_coordinates)
|
self.motor_thread.move_to_coordinates(target_coordinates)
|
||||||
|
|
||||||
def move_motor_relative(self, motor, value: float):
|
def move_motor_relative(self, motor, value: float) -> None:
|
||||||
self.motor_thread.move_relative(motor, value)
|
self.motor_thread.move_relative(motor, value)
|
||||||
|
|
||||||
def init_ui(self):
|
def init_ui(self) -> None:
|
||||||
"""Setup all ui elements"""
|
"""Setup all ui elements"""
|
||||||
|
|
||||||
##########################
|
##########################
|
||||||
@ -80,6 +94,7 @@ class MotorApp(QWidget):
|
|||||||
lambda: self.move_motor_relative(dev.samy, -self.spinBox_step.value())
|
lambda: self.move_motor_relative(dev.samy, -self.spinBox_step.value())
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Switch between key shortcuts active
|
||||||
self.checkBox_enableArrows.stateChanged.connect(self.update_arrow_key_shortcuts)
|
self.checkBox_enableArrows.stateChanged.connect(self.update_arrow_key_shortcuts)
|
||||||
self.update_arrow_key_shortcuts()
|
self.update_arrow_key_shortcuts()
|
||||||
|
|
||||||
@ -92,12 +107,6 @@ class MotorApp(QWidget):
|
|||||||
|
|
||||||
# SpinBoxes - Motor Limits #TODO make spinboxes own limits updated, currently is [-1000, 1000]
|
# SpinBoxes - Motor Limits #TODO make spinboxes own limits updated, currently is [-1000, 1000]
|
||||||
|
|
||||||
# display initial limits
|
|
||||||
self.spinBox_x_min.setValue(self.limit_x[0])
|
|
||||||
self.spinBox_x_max.setValue(self.limit_x[1])
|
|
||||||
self.spinBox_y_min.setValue(self.limit_y[0])
|
|
||||||
self.spinBox_y_max.setValue(self.limit_y[1])
|
|
||||||
|
|
||||||
# SpinBoxes change color to yellow before updated, limits are updated with update button
|
# SpinBoxes change color to yellow before updated, limits are updated with update button
|
||||||
self.spinBox_x_min.valueChanged.connect(lambda: self.param_changed(self.spinBox_x_min))
|
self.spinBox_x_min.valueChanged.connect(lambda: self.param_changed(self.spinBox_x_min))
|
||||||
self.spinBox_x_max.valueChanged.connect(lambda: self.param_changed(self.spinBox_x_max))
|
self.spinBox_x_max.valueChanged.connect(lambda: self.param_changed(self.spinBox_x_max))
|
||||||
@ -111,28 +120,19 @@ class MotorApp(QWidget):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Map
|
# TODO map with floats as well -> or decide system for higher precision
|
||||||
self.motor_position.connect(lambda x: self.update_image_map(*self.get_xy()))
|
|
||||||
self.motor_update.connect(lambda: self.update_image_map(*self.get_xy()))
|
|
||||||
|
|
||||||
self.motor_position.connect(lambda x: print(f"motor position updated: {x}"))
|
|
||||||
# self.motor_thread.coordinates_updated.connect(
|
|
||||||
# lambda: self.label_status.setText(
|
|
||||||
# f"Motor position: ({dev.samx.position():.2f}, {dev.samy.position():.2f})"
|
|
||||||
# )
|
|
||||||
# )
|
|
||||||
|
|
||||||
self.motor_thread.coordinates_updated.connect(
|
self.motor_thread.coordinates_updated.connect(
|
||||||
lambda x, y: self.update_image_map(int(x), int(y))
|
lambda x, y: self.update_image_map(int(x), int(y))
|
||||||
)
|
)
|
||||||
|
|
||||||
# Coordinates table
|
# Coordinates table
|
||||||
self.generate_table_coordinate(
|
self.generate_table_coordinate(
|
||||||
self.tableWidget_coordinates, self.get_xy(), tag="Initial", precision=0
|
self.tableWidget_coordinates,
|
||||||
|
self.motor_thread.retrieve_coordinates(),
|
||||||
|
tag="Initial",
|
||||||
|
precision=0,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.init_motor_map()
|
|
||||||
|
|
||||||
def init_motor_map(self):
|
def init_motor_map(self):
|
||||||
# Get motor limits
|
# Get motor limits
|
||||||
limit_x_min, limit_x_max = self.get_motor_limits(dev.samx)
|
limit_x_min, limit_x_max = self.get_motor_limits(dev.samx)
|
||||||
@ -152,8 +152,7 @@ class MotorApp(QWidget):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Set the initial position on the map
|
# Set the initial position on the map
|
||||||
x, y = self.get_xy()
|
x, y = self.motor_thread.retrieve_coordinates()
|
||||||
self.prev_x, self.prev_y = x, y
|
|
||||||
self.update_image_map(x, y)
|
self.update_image_map(x, y)
|
||||||
|
|
||||||
# Translate and scale the image item to match the motor coordinates
|
# Translate and scale the image item to match the motor coordinates
|
||||||
@ -189,58 +188,10 @@ class MotorApp(QWidget):
|
|||||||
|
|
||||||
return low_limit, high_limit
|
return low_limit, high_limit
|
||||||
|
|
||||||
def update_motor_limits(
|
def update_all_motor_limits(
|
||||||
self, motor, low_limit=None, high_limit=None
|
self, x_limit: list = None, y_limit: list = None
|
||||||
): # TODO limits cannot be smaller that the current location of motor
|
) -> None: # TODO will be moved to thread
|
||||||
# Get current limits
|
self.motor_thread.update_all_motor_limits(x_limit=x_limit, y_limit=y_limit)
|
||||||
current_low_limit, current_high_limit = motor.limits[0], motor.limits[1]
|
|
||||||
|
|
||||||
# Check if the low limit has changed and is not None
|
|
||||||
if low_limit is not None and low_limit != current_low_limit:
|
|
||||||
motor.low_limit = low_limit
|
|
||||||
|
|
||||||
# Check if the high limit has changed and is not None
|
|
||||||
if high_limit is not None and high_limit != current_high_limit:
|
|
||||||
motor.high_limit = high_limit
|
|
||||||
|
|
||||||
# self.init_motor_map() # reinitialize the map with the new limits
|
|
||||||
|
|
||||||
def update_all_motor_limits(self, x_limit: list = None, y_limit: list = None) -> None:
|
|
||||||
# check if current motor position if within the new limits
|
|
||||||
|
|
||||||
current_position = self.get_xy()
|
|
||||||
|
|
||||||
if x_limit is not None:
|
|
||||||
if current_position[0] < x_limit[0] or current_position[0] > x_limit[1]:
|
|
||||||
raise ValueError("Current motor position is outside the new limits (X)")
|
|
||||||
else:
|
|
||||||
self.update_motor_limits(dev.samx, low_limit=x_limit[0], high_limit=x_limit[1])
|
|
||||||
|
|
||||||
if y_limit is not None:
|
|
||||||
if current_position[1] < y_limit[0] or current_position[1] > y_limit[1]:
|
|
||||||
raise ValueError("Current motor position is outside the new limits (Y)")
|
|
||||||
else:
|
|
||||||
self.update_motor_limits(dev.samy, low_limit=y_limit[0], high_limit=y_limit[1])
|
|
||||||
|
|
||||||
for spinBox in (
|
|
||||||
self.spinBox_x_min,
|
|
||||||
self.spinBox_x_max,
|
|
||||||
self.spinBox_y_min,
|
|
||||||
self.spinBox_y_max,
|
|
||||||
):
|
|
||||||
spinBox.setStyleSheet("")
|
|
||||||
|
|
||||||
self.init_motor_map() # reinitialize the map with the new limits
|
|
||||||
|
|
||||||
def get_xy(self): # TODO decide if useful
|
|
||||||
"""Get current motor position"""
|
|
||||||
x = dev.samx.position()
|
|
||||||
y = dev.samy.position()
|
|
||||||
return x, y
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def param_changed(ui_element):
|
|
||||||
ui_element.setStyleSheet("background-color: #FFA700;")
|
|
||||||
|
|
||||||
def update_arrow_key_shortcuts(self):
|
def update_arrow_key_shortcuts(self):
|
||||||
if self.checkBox_enableArrows.isChecked():
|
if self.checkBox_enableArrows.isChecked():
|
||||||
@ -257,7 +208,7 @@ class MotorApp(QWidget):
|
|||||||
self.toolButton_down.setShortcut("")
|
self.toolButton_down.setShortcut("")
|
||||||
|
|
||||||
def generate_table_coordinate(
|
def generate_table_coordinate(
|
||||||
self, table: QtWidgets.QTableWidget, coordinates: list, tag: str = None, precision: int = 0
|
self, table: QtWidgets.QTableWidget, coordinates: tuple, tag: str = None, precision: int = 0
|
||||||
) -> None:
|
) -> None:
|
||||||
current_row_count = table.rowCount()
|
current_row_count = table.rowCount()
|
||||||
|
|
||||||
@ -288,20 +239,26 @@ class MotorApp(QWidget):
|
|||||||
)
|
)
|
||||||
table.resizeColumnsToContents()
|
table.resizeColumnsToContents()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def param_changed(ui_element):
|
||||||
|
ui_element.setStyleSheet("background-color: #FFA700;")
|
||||||
|
|
||||||
class MotorMovement(QThread):
|
|
||||||
|
class MotorControl(QThread):
|
||||||
coordinates_updated = pyqtSignal(float, float) # Signal to emit current coordinates
|
coordinates_updated = pyqtSignal(float, float) # Signal to emit current coordinates
|
||||||
|
limits_retrieved = pyqtSignal(list, list) # Signal to emit current limits (samx, samy)
|
||||||
# progress_updated = pyqtSignal(int) # Signal to emit progress percentage
|
# progress_updated = pyqtSignal(int) #TODO Signal to emit progress percentage
|
||||||
|
|
||||||
def __init__(self, parent=None):
|
def __init__(self, parent=None):
|
||||||
super().__init__(parent)
|
super().__init__(parent)
|
||||||
# self.initial_coordinates = None
|
|
||||||
self.target_coordinates = None
|
self.target_coordinates = None
|
||||||
self.running = False
|
self.running = False
|
||||||
|
|
||||||
# Initialize current coordinates with the provided initial coordinates
|
# Initialize current coordinates with the provided initial coordinates
|
||||||
self.current_x, self.current_y = None, None
|
(
|
||||||
|
self.current_x,
|
||||||
|
self.current_y,
|
||||||
|
) = self.get_coordinates() # TODO maybe not needed, coordinates always stored in redis
|
||||||
|
|
||||||
# Create consumer for samx and samy
|
# Create consumer for samx and samy
|
||||||
self.samx_consumer = client.connector.consumer(
|
self.samx_consumer = client.connector.consumer(
|
||||||
@ -319,7 +276,56 @@ class MotorMovement(QThread):
|
|||||||
self.samx_consumer.start()
|
self.samx_consumer.start()
|
||||||
self.samy_consumer.start()
|
self.samy_consumer.start()
|
||||||
|
|
||||||
self.get_xy()
|
def get_coordinates(self) -> tuple:
|
||||||
|
# TODO decide if the get_coordinates and retrieve coordinates makes sense
|
||||||
|
# TODO or move also to signal/slot as in the case of limits
|
||||||
|
# TODO or move just to redis CB function
|
||||||
|
"""Get current motor position"""
|
||||||
|
x = dev.samx.position()
|
||||||
|
y = dev.samy.position()
|
||||||
|
return x, y
|
||||||
|
|
||||||
|
def retrieve_coordinates(self) -> tuple:
|
||||||
|
"""Get current motor position for export to main app"""
|
||||||
|
return self.current_x, self.current_y
|
||||||
|
|
||||||
|
def get_motor_limits(self, motor) -> list:
|
||||||
|
"""Get the limits of a motor"""
|
||||||
|
return motor.limits
|
||||||
|
|
||||||
|
def retrieve_motor_limits(self, motor_x, motor_y):
|
||||||
|
limit_x = self.get_motor_limits(motor_x)
|
||||||
|
limit_y = self.get_motor_limits(motor_y)
|
||||||
|
self.limits_retrieved.emit(limit_x, limit_y)
|
||||||
|
|
||||||
|
def update_motor_limits(self, motor, low_limit=None, high_limit=None) -> None:
|
||||||
|
current_low_limit, current_high_limit = self.get_motor_limits(motor)
|
||||||
|
|
||||||
|
# Check if the low limit has changed and is not None
|
||||||
|
if low_limit is not None and low_limit != current_low_limit:
|
||||||
|
motor.low_limit = low_limit
|
||||||
|
|
||||||
|
# Check if the high limit has changed and is not None
|
||||||
|
if high_limit is not None and high_limit != current_high_limit:
|
||||||
|
motor.high_limit = high_limit
|
||||||
|
|
||||||
|
def update_all_motor_limits(self, x_limit: list = None, y_limit: list = None) -> None:
|
||||||
|
# TODO can be generalized to any motor not just samx samy
|
||||||
|
current_position = self.get_coordinates()
|
||||||
|
|
||||||
|
if x_limit is not None:
|
||||||
|
if current_position[0] < x_limit[0] or current_position[0] > x_limit[1]:
|
||||||
|
raise ValueError("Current motor position is outside the new limits (X)")
|
||||||
|
else:
|
||||||
|
self.update_motor_limits(dev.samx, low_limit=x_limit[0], high_limit=x_limit[1])
|
||||||
|
|
||||||
|
if y_limit is not None:
|
||||||
|
if current_position[1] < y_limit[0] or current_position[1] > y_limit[1]:
|
||||||
|
raise ValueError("Current motor position is outside the new limits (Y)")
|
||||||
|
else:
|
||||||
|
self.update_motor_limits(dev.samy, low_limit=y_limit[0], high_limit=y_limit[1])
|
||||||
|
|
||||||
|
self.retrieve_motor_limits(dev.samx, dev.samy)
|
||||||
|
|
||||||
def move_to_coordinates(self, target_coordinates: tuple):
|
def move_to_coordinates(self, target_coordinates: tuple):
|
||||||
self.action = "move_to_coordinates"
|
self.action = "move_to_coordinates"
|
||||||
@ -341,14 +347,8 @@ class MotorMovement(QThread):
|
|||||||
def set_target_coordinates(self, target_coordinates: tuple) -> None:
|
def set_target_coordinates(self, target_coordinates: tuple) -> None:
|
||||||
self.target_coordinates = target_coordinates
|
self.target_coordinates = target_coordinates
|
||||||
|
|
||||||
def get_xy(self) -> tuple:
|
def move_motor_coordinate(self) -> None:
|
||||||
"""Get current motor position"""
|
|
||||||
self.current_x = dev.samx.position()
|
|
||||||
self.current_y = dev.samy.position()
|
|
||||||
|
|
||||||
def move_motor_coordinate(self):
|
|
||||||
"""Move the motor to the specified coordinates"""
|
"""Move the motor to the specified coordinates"""
|
||||||
self.get_xy()
|
|
||||||
scans.mv(
|
scans.mv(
|
||||||
dev.samx,
|
dev.samx,
|
||||||
self.target_coordinates[0],
|
self.target_coordinates[0],
|
||||||
@ -358,25 +358,21 @@ class MotorMovement(QThread):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def move_motor_relative(self, motor, value: float) -> None:
|
def move_motor_relative(self, motor, value: float) -> None:
|
||||||
self.get_xy()
|
|
||||||
scans.mv(motor, value, relative=True)
|
scans.mv(motor, value, relative=True)
|
||||||
|
|
||||||
|
# TODO decide if one or multiple callback functions
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _device_status_callback_samx(msg, *, parent, **_kwargs) -> None:
|
def _device_status_callback_samx(msg, *, parent, **_kwargs) -> None:
|
||||||
deviceMSG = BECMessage.DeviceMessage.loads(msg.value)
|
deviceMSG = BECMessage.DeviceMessage.loads(msg.value)
|
||||||
parent.current_x = deviceMSG.content["signals"]["samx"]["value"]
|
parent.current_x = deviceMSG.content["signals"]["samx"]["value"]
|
||||||
print(f"samx moving: {parent.current_x,parent.current_y}")
|
print(f"samx moving: {parent.current_x,parent.current_y}")
|
||||||
# y = parent.current_y
|
|
||||||
parent.coordinates_updated.emit(parent.current_x, parent.current_y)
|
parent.coordinates_updated.emit(parent.current_x, parent.current_y)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _device_status_callback_samy(msg, *, parent, **_kwargs) -> None:
|
def _device_status_callback_samy(msg, *, parent, **_kwargs) -> None:
|
||||||
deviceMSG = BECMessage.DeviceMessage.loads(msg.value)
|
deviceMSG = BECMessage.DeviceMessage.loads(msg.value)
|
||||||
parent.current_y = deviceMSG.content["signals"]["samy"][
|
parent.current_y = deviceMSG.content["signals"]["samy"]["value"]
|
||||||
"value"
|
|
||||||
] # TODO can be move to parent instance
|
|
||||||
print(f"samy moving: {parent.current_x,parent.current_y}")
|
print(f"samy moving: {parent.current_x,parent.current_y}")
|
||||||
# x = parent.current_y
|
|
||||||
parent.coordinates_updated.emit(parent.current_x, parent.current_y)
|
parent.coordinates_updated.emit(parent.current_x, parent.current_y)
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user