0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-13 19:21:50 +02:00

refactor: motor movement as a QThread

This commit is contained in:
wyzula-jan
2023-08-22 18:16:44 +02:00
parent 92388c3cab
commit af2fcffd5f
2 changed files with 222 additions and 38 deletions

View File

@ -60,17 +60,27 @@
<property name="title">
<string>General</string>
</property>
<layout class="QHBoxLayout" name="horizontalLayout_2">
<item>
<layout class="QGridLayout" name="gridLayout_4">
<item row="0" column="0">
<widget class="QLabel" name="label_5">
<property name="text">
<string>Decimal precision</string>
</property>
</widget>
</item>
<item>
<item row="0" column="1">
<widget class="QSpinBox" name="spinBox"/>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_8">
<property name="text">
<string>Speed</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QDoubleSpinBox" name="doubleSpinBox_speed"/>
</item>
</layout>
</widget>
</item>

View File

@ -3,15 +3,19 @@ import os
import numpy as np
import pyqtgraph as pg
from PyQt5 import QtGui
from PyQt5.QtCore import QThread
from PyQt5.QtCore import pyqtSignal, Qt
from PyQt5.QtWidgets import QApplication, QWidget
from pyqtgraph.Qt import QtWidgets, uic
from bec_lib.core import MessageEndpoints, BECMessage
class MotorApp(QWidget):
motor_position = pyqtSignal(float) # TODO hook to motor position update
motor_update = pyqtSignal()
motor_position_xy = pyqtSignal(list)
coordinates_updated = pyqtSignal(float, float)
def __init__(self):
super().__init__()
@ -20,6 +24,7 @@ class MotorApp(QWidget):
# UI
self.start_x, self.start_y = self.get_xy()
self.current_x, self.current_y = self.get_xy()
self.limit_x, self.limit_y = self.get_motor_limits(dev.samx), self.get_motor_limits(
dev.samy
)
@ -29,8 +34,39 @@ class MotorApp(QWidget):
self.label_status.setText(
f"Motor position: ({dev.samx.position():.2f}, {dev.samy.position():.2f})"
)
# self.samx_consumer = client.connector.consumer(topics=[MessageEndpoints.device_readback('samx'),
# MessageEndpoints.device_readback('samy')],
# cb=self._device_status_callback,parent=self)
# self.samx_consumer.start()
# self.samx_consumer = client.connector.consumer(
# topics=[MessageEndpoints.device_readback("samx")],
# cb=self._device_status_callback_samx,
# parent=self,
# )
#
# self.samy_consumer = client.connector.consumer(
# topics=[MessageEndpoints.device_readback("samy")],
# cb=self._device_status_callback_samy,
# parent=self,
# )
# self.samx_consumer.start()
# self.samy_consumer.start()
self.motor_thread = MotorMovement()
self.init_ui()
def on_move_to_coordinates_button_clicked(self):
x = self.spinBox_absolute_x.value()
y = self.spinBox_absolute_y.value()
target_coordinates = (x, y)
self.motor_thread.move_to_coordinates(target_coordinates)
def on_move_relative_button_clicked(self, motor, value: float):
self.motor_thread.move_relative(motor, value)
def init_ui(self):
"""Setup all ui elements"""
##########################
@ -56,17 +92,30 @@ class MotorApp(QWidget):
##########################
# Buttons - Motor Movement
# self.toolButton_right.clicked.connect(
# lambda: self.move_motor(dev.samx, self.spinBox_step.value())
# )
# self.toolButton_left.clicked.connect(
# lambda: self.move_motor(dev.samx, -self.spinBox_step.value())
# )
# self.toolButton_up.clicked.connect(
# lambda: self.move_motor(dev.samy, self.spinBox_step.value())
# )
# self.toolButton_down.clicked.connect(
# lambda: self.move_motor(dev.samy, -self.spinBox_step.value())
# )
self.toolButton_right.clicked.connect(
lambda: self.move_motor(dev.samx, self.spinBox_step.value())
lambda: self.on_move_relative_button_clicked(dev.samx, self.spinBox_step.value())
)
self.toolButton_left.clicked.connect(
lambda: self.move_motor(dev.samx, -self.spinBox_step.value())
lambda: self.on_move_relative_button_clicked(dev.samx, -self.spinBox_step.value())
)
self.toolButton_up.clicked.connect(
lambda: self.move_motor(dev.samy, self.spinBox_step.value())
lambda: self.on_move_relative_button_clicked(dev.samy, self.spinBox_step.value())
)
self.toolButton_down.clicked.connect(
lambda: self.move_motor(dev.samy, -self.spinBox_step.value())
lambda: self.on_move_relative_button_clicked(dev.samy, -self.spinBox_step.value())
)
self.checkBox_enableArrows.stateChanged.connect(self.update_arrow_key_shortcuts)
@ -80,11 +129,6 @@ class MotorApp(QWidget):
)
# SpinBoxes - Motor Limits #TODO make spinboxes own limits updated, currently is [-1000, 1000]
# manually set limits before readout #TODO will be removed when spinboxes have own limits
# dev.samx.low_limit = -100
# dev.samx.high_limit = 100
# dev.samy.low_limit = -100
# dev.samy.high_limit = 100
# display initial limits
self.spinBox_x_min.setValue(self.limit_x[0])
@ -110,10 +154,14 @@ class MotorApp(QWidget):
self.motor_update.connect(lambda: self.update_image_map(*self.get_xy()))
self.motor_position.connect(lambda x: print(f"motor position updated: {x}"))
self.motor_update.connect(
lambda: self.label_status.setText(
f"Motor position: ({dev.samx.position():.2f}, {dev.samy.position():.2f})"
)
# self.motor_thread.coordinates_updated.connect(
# lambda: self.label_status.setText(
# f"Motor position: ({dev.samx.position():.2f}, {dev.samy.position():.2f})"
# )
# )
self.motor_thread.coordinates_updated.connect(
lambda x, y: self.update_image_map(int(x), int(y))
)
# Coordinates table
@ -157,10 +205,10 @@ class MotorApp(QWidget):
"""Update the image map with the new motor position"""
# Define the dimming factor
dimming_factor = 0.80
dimming_factor = 0.95
# Apply the dimming factor only to pixels above the background value
self.image_map_data[self.image_map_data > self.background_value] *= dimming_factor
self.image_map_data[self.image_map_data > 50] *= dimming_factor
# Mapping of motor coordinates to pixel coordinates
pixel_x = int(x - self.offset_x)
@ -223,29 +271,37 @@ class MotorApp(QWidget):
self.init_motor_map() # reinitialize the map with the new limits
def move_motor(self, motor, value: float) -> None:
try:
status = scans.mv(motor, value, relative=True)
self.current_x, self.current_y = self.get_xy()
scans.mv(motor, value, relative=True)
while status.status not in ["COMPLETED", "STOPPED"]:
print(status.status)
motor_position = motor.position()
self.motor_position.emit(motor_position)
# def move_motor(self, motor, value: float) -> None:
# # self.current_x, self.current_y = self.get_xy()
# scans.mv(motor, value, relative=True)
# try:
# status = scans.mv(motor, value, relative=True)
#
# # while status.status not in ["COMPLETED", "STOPPED"]:
# # print(status.status)
# # print(f"motor read{motor.read(cached=True)}")
# # motor_position = motor.position()
# # self.motor_position.emit(motor_position)
#
# # return status
# except Exception as e:
# print(e)
# return status
except Exception as e:
print(e)
# motor_position = motor.position()
motor_position = motor.position()
self.motor_position.emit(motor_position)
self.motor_update.emit()
# self.motor_position.emit(motor_position)
# self.motor_update.emit()
def move_motor_coordinates(self, coor: list) -> None:
scans.mv(dev.samx, coor[0], dev.samy, coor[1], relative=False)
try:
status = scans.mv(dev.samx, coor[0], dev.samy, coor[1], relative=False)
while status.status not in ["COMPLETED", "STOPPED"]:
print(status.status)
# while status.status not in ["COMPLETED", "STOPPED"]:
# print(status.status)
except Exception as e:
print(e)
@ -313,19 +369,137 @@ class MotorApp(QWidget):
table.resizeColumnsToContents()
# @staticmethod
# def _device_status_callback(msg, *, parent, **_kwargs) -> None:
# deviceMSG = BECMessage.DeviceMessage.loads(msg.value)
# # current_coordinates = (deviceMSG.content['signals']['samx']['value'],deviceMSG.content['signals']['samy']['value'])
# # print(f'current coordinates: {current_coordinates}')
# # parent.scan_storage.update_with_scan_status(scan)
# print(deviceMSG)
# print(deviceMSG.metadata)
# parent.motor_update.emit(deviceMSG)
# @staticmethod
# def _device_status_callback_samx(msg, *, parent, **_kwargs) -> None:
# deviceMSG = BECMessage.DeviceMessage.loads(msg.value)
# x = deviceMSG.content["signals"]["samx"]["value"]
# print(f"x: {x}")
# # Assuming you have a way to retrieve or store the current samy value
# y = parent.current_y
# parent.coordinates_updated.emit(x, y)
#
# @staticmethod
# def _device_status_callback_samy(msg, *, parent, **_kwargs) -> None:
# deviceMSG = BECMessage.DeviceMessage.loads(msg.value)
# y = deviceMSG.content["signals"]["samy"]["value"]
# print(f"y: {y}")
# # # Assuming you have a way to retrieve or store the current samx value
# x = parent.current_x
# parent.coordinates_updated.emit(x, y)
class MotorMovement(QThread):
coordinates_updated = pyqtSignal(float, float) # Signal to emit current coordinates
# progress_updated = pyqtSignal(int) # Signal to emit progress percentage
def __init__(self, parent=None):
super().__init__(parent)
# self.initial_coordinates = None
self.target_coordinates = None
self.running = False
# Initialize current coordinates with the provided initial coordinates
self.current_x, self.current_y = None, None
# Create consumer for samx and samy
self.samx_consumer = client.connector.consumer(
topics=[MessageEndpoints.device_readback("samx")],
cb=self._device_status_callback_samx,
parent=self,
)
self.samy_consumer = client.connector.consumer(
topics=[MessageEndpoints.device_readback("samy")],
cb=self._device_status_callback_samy,
parent=self,
)
self.samx_consumer.start()
self.samy_consumer.start()
self.get_xy()
def move_to_coordinates(self, target_coordinates: tuple):
self.action = "move_to_coordinates"
self.target_coordinates = target_coordinates
self.start()
def move_relative(self, motor, value: float):
self.action = "move_relative"
self.motor = motor
self.value = value
self.start()
def run(self):
if self.action == "move_to_coordinates":
self.move_motor_coordinate()
elif self.action == "move_relative":
self.move_motor_relative(self.motor, self.value)
def set_target_coordinates(self, target_coordinates: tuple) -> None:
self.target_coordinates = target_coordinates
def get_xy(self) -> tuple:
"""Get current motor position"""
self.current_x = dev.samx.position()
self.current_y = dev.samy.position()
def move_motor_coordinate(self):
"""Move the motor to the specified coordinates"""
self.get_xy()
scans.mv(
dev.samx,
self.target_coordinates[0],
dev.samy,
self.target_coordinates[1],
relative=False,
)
def move_motor_relative(self, motor, value: float) -> None:
self.get_xy()
scans.mv(motor, value, relative=True)
@staticmethod
def _device_status_callback_samx(msg, *, parent, **_kwargs) -> None:
deviceMSG = BECMessage.DeviceMessage.loads(msg.value)
parent.current_x = deviceMSG.content["signals"]["samx"]["value"]
print(f"samx moving: {parent.current_x,parent.current_y}")
# y = parent.current_y
parent.coordinates_updated.emit(parent.current_x, parent.current_y)
@staticmethod
def _device_status_callback_samy(msg, *, parent, **_kwargs) -> None:
deviceMSG = BECMessage.DeviceMessage.loads(msg.value)
parent.current_y = deviceMSG.content["signals"]["samy"][
"value"
] # TODO can be move to parent instance
print(f"samy moving: {parent.current_x,parent.current_y}")
# x = parent.current_y
parent.coordinates_updated.emit(parent.current_x, parent.current_y)
if __name__ == "__main__":
from bec_widgets.bec_dispatcher import bec_dispatcher
from bec_lib import BECClient
# from bec_lib import BECClient
# from bec_lib.core import ServiceConfig,RedisConnector
# client = BECClient()
# client.initialize(config=ServiceConfig(config_path="test_config.yaml"))
client = BECClient()
# client.initialize(config=ServiceConfig(config_path="test_config.yaml"))
# Client initialization - by dispatcher
client = bec_dispatcher.client
# client = bec_dispatcher.client
client.start()
dev = client.device_manager.devices
scans = client.scans