0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-13 19:21:50 +02:00

feat: stop movement function, one callback function for 2 motors, move_finished is emitted in move_motor function not in callback

This commit is contained in:
wyzula-jan
2023-08-24 17:22:21 +02:00
parent ed84293197
commit 187c748e87
2 changed files with 41 additions and 95 deletions

View File

@ -16,16 +16,6 @@
<layout class="QHBoxLayout" name="horizontalLayout_3">
<item>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QLabel" name="label_status">
<property name="text">
<string>Motor Position</string>
</property>
<property name="alignment">
<set>Qt::AlignCenter</set>
</property>
</widget>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout">
<item>

View File

@ -36,10 +36,6 @@ class MotorApp(QWidget):
print(f"Init limits: samx:{self.limit_x}, samy:{self.limit_y}")
self.label_status.setText(
f"Motor position: ({dev.samx.position():.2f}, {dev.samy.position():.2f})"
) # TODO has to be change to get positions from thread not directly from device
# self.background_map_scale = 10
# self.visited_coordinates = {}
# self.precision = 2 # Define the decimal precision
@ -106,7 +102,7 @@ class MotorApp(QWidget):
self.plot_map.addItem(self.motor_map)
##########################
# Signals
# Motor movements signals
##########################
self.toolButton_right.clicked.connect(
@ -133,9 +129,15 @@ class MotorApp(QWidget):
)
)
self.pushButton_go_absolute.clicked.connect(self.save_absolute_coordinates)
self.motor_thread.move_finished.connect(lambda: self.enable_motor_controls(True))
# Stop Button
self.pushButton_stop.clicked.connect(self.motor_thread.stop_movement)
##########################
# Motor limits signals
##########################
# SpinBoxes - Motor Limits #TODO make spinboxes own limits updated, currently is [-1000, 1000]
# SpinBoxes change color to yellow before updated, limits are updated with update button
@ -192,21 +194,19 @@ class MotorApp(QWidget):
brush=[point["brush"] for point in self.motor_positions],
)
# self.update_image_map(x, y)
#
# Translate and scale the image item to match the motor coordinates
self.tr = QtGui.QTransform()
self.tr.translate(limit_x_min, limit_y_min)
self.limit_map.setTransform(self.tr)
#
# self.image_map.dataTransform()
def update_image_map(self, x, y):
# Update label
self.label_coorditanes.setText(f"Motor position: ({x:.2f}, {y:.2f})")
# Dim previous points
# for point in self.motor_positions.values():
# point["brush"] = pg.mkBrush(
# 255, 255, 255, max(50, point["brush"].color().alpha() * 0.95)
# )
last_brush = self.motor_positions[-1]["brush"].color().getRgb()
dimmer_brush = tuple(max(int(0.8 * c), 50) for c in last_brush[:3]) + (255,)
self.motor_positions[-1]["brush"] = pg.mkBrush(dimmer_brush)
# Add new point with full brightness
new_pos = (x, y)
@ -217,49 +217,6 @@ class MotorApp(QWidget):
brush=[point["brush"] for point in self.motor_positions],
)
# new_point = {"pos": key, "brush": pg.mkBrush(255, 255, 255, 255)}
# self.motor_positions[key] = new_point
#
# Update ScatterPlotItem
# positions = np.array([point["pos"] for point in self.motor_positions.values()])
# brushes = [point["brush"] for point in self.motor_positions.values()]
# self.scatter_plot_item.setData(pos=positions, brush=brushes)
# # Dim previous points
# for point in self.motor_positions.values():
# point["brush"] = pg.mkBrush(
# 255, 255, 255, max(50, point["brush"].color().alpha() * 0.95)
# )
#
# # Add new point with full brightness
# key = (x, y)
# new_point = {"pos": key, "brush": pg.mkBrush(255, 255, 255, 255)}
# self.motor_positions[key] = new_point
#
# # Update ScatterPlotItem
# positions = [point["pos"] for point in self.motor_positions.values()]
# brushes = [point["brush"] for point in self.motor_positions.values()]
# self.motor_map.setData(positions[0], positions[1], brush=brushes)
# def update_image_map(self, x, y):
# """Update the image map with the new motor position"""
#
# # Define the dimming factor
# dimming_factor = 0.95
#
# # Apply the dimming factor only to pixels above the background value
# self.image_map_data[self.image_map_data > 50] *= dimming_factor
#
# # Mapping of motor coordinates to pixel coordinates
# pixel_x = int(x - self.offset_x)
# pixel_y = int(y - self.offset_y)
#
# # Set the bright pixel at the new position
# self.image_map_data[pixel_x, pixel_y] = 255
#
# # Update the display
# self.image_map.updateImage(self.image_map_data, levels=(0, 255))
def update_all_motor_limits(
self, x_limit: list = None, y_limit: list = None
) -> None: # TODO will be moved to thread
@ -343,27 +300,25 @@ class MotorControl(QThread):
self.current_y,
) = self.get_coordinates() # TODO maybe not needed, coordinates always stored in redis
# Create consumer for samx and samy
self.samx_consumer = client.connector.consumer(
topics=[MessageEndpoints.device_readback("samx")],
cb=self._device_status_callback_samx,
self.active_devices = ["samx", "samy"]
self.motors_consumer = client.connector.consumer(
topics=[
MessageEndpoints.device_readback(self.active_devices[0]),
MessageEndpoints.device_readback(self.active_devices[1]),
],
cb=self._device_status_callback_motors,
parent=self,
)
self.samy_consumer = client.connector.consumer(
topics=[MessageEndpoints.device_readback("samy")],
cb=self._device_status_callback_samy,
parent=self,
)
self.samx_consumer.start()
self.samy_consumer.start()
self.motors_consumer.start()
def get_coordinates(self) -> tuple:
# TODO decide if the get_coordinates and retrieve coordinates makes sense
# TODO or move also to signal/slot as in the case of limits
# TODO or move just to redis CB function
"""Get current motor position"""
dev.samx.read(cached=True) # TODO to get last message ['value'] -> last position
x = dev.samx.position()
y = dev.samy.position()
return x, y
@ -432,7 +387,7 @@ class MotorControl(QThread):
def move_motor_coordinate(self) -> None:
"""Move the motor to the specified coordinates"""
scans.mv(
status = scans.mv(
dev.samx,
self.target_coordinates[0],
dev.samy,
@ -440,27 +395,27 @@ class MotorControl(QThread):
relative=False,
)
status.wait()
self.move_finished.emit()
def move_motor_relative(self, motor, value: float) -> None:
scans.mv(motor, value, relative=True)
status = scans.mv(motor, value, relative=True)
# TODO decide if one or multiple callback functions
@staticmethod
def _device_status_callback_samx(msg, *, parent, **_kwargs) -> None:
deviceMSG = BECMessage.DeviceMessage.loads(msg.value)
parent.current_x = deviceMSG.content["signals"]["samx"]["value"]
# print(f"samx moving: {parent.current_x,parent.current_y}")
parent.coordinates_updated.emit(parent.current_x, parent.current_y)
if deviceMSG.content["signals"]["samx_motor_is_moving"]["value"] == 0:
parent.move_finished.emit()
status.wait()
self.move_finished.emit()
def stop_movement(self):
queue.request_scan_abortion()
queue.request_queue_reset()
@staticmethod
def _device_status_callback_samy(msg, *, parent, **_kwargs) -> None:
def _device_status_callback_motors(msg, *, parent, **_kwargs) -> None:
deviceMSG = BECMessage.DeviceMessage.loads(msg.value)
parent.current_y = deviceMSG.content["signals"]["samy"]["value"]
# print(f"samy moving: {parent.current_x,parent.current_y}")
if parent.active_devices[0] in deviceMSG.content["signals"]:
parent.current_x = deviceMSG.content["signals"][parent.active_devices[0]]["value"]
elif parent.active_devices[1] in deviceMSG.content["signals"]:
parent.current_y = deviceMSG.content["signals"][parent.active_devices[1]]["value"]
parent.coordinates_updated.emit(parent.current_x, parent.current_y)
if deviceMSG.content["signals"]["samy_motor_is_moving"]["value"] == 0:
parent.move_finished.emit()
if __name__ == "__main__":
@ -476,6 +431,7 @@ if __name__ == "__main__":
client.start()
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
app = QApplication([])
window = MotorApp()