mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-01-16 15:29:20 +01:00
flomni/check_tracker_signal
This commit is contained in:
@@ -44,6 +44,8 @@ class RtFlomniController(RtController):
|
|||||||
"laser_tracker_show_all",
|
"laser_tracker_show_all",
|
||||||
"show_signal_strength_interferometer",
|
"show_signal_strength_interferometer",
|
||||||
"read_ssi_interferometer",
|
"read_ssi_interferometer",
|
||||||
|
"laser_tracker_check_signalstrength",
|
||||||
|
"laser_tracker_check_enabled",
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
@@ -251,10 +253,15 @@ class RtFlomniController(RtController):
|
|||||||
def set_rotation_angle(self, val: float) -> None:
|
def set_rotation_angle(self, val: float) -> None:
|
||||||
self.socket_put(f"a{val/180*np.pi}")
|
self.socket_put(f"a{val/180*np.pi}")
|
||||||
|
|
||||||
def laser_tracker_on(self):
|
def laser_tracker_check_enabled(self) -> bool:
|
||||||
self.laser_update_tracker_info()
|
self.laser_update_tracker_info()
|
||||||
|
if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
if not self.tracker_info["enabled_z"] or not self.tracker_info["enabled_y"]:
|
def laser_tracker_on(self):
|
||||||
|
if not self.laser_tracker_check_enabled():
|
||||||
logger.info("Enabling the laser tracker. Please wait...")
|
logger.info("Enabling the laser tracker. Please wait...")
|
||||||
print("Enabling the laser tracker. Please wait...")
|
print("Enabling the laser tracker. Please wait...")
|
||||||
|
|
||||||
@@ -358,6 +365,32 @@ class RtFlomniController(RtController):
|
|||||||
val = float(self.socket_put_and_receive(f"j{axis_number}").strip())
|
val = float(self.socket_put_and_receive(f"j{axis_number}").strip())
|
||||||
return val
|
return val
|
||||||
|
|
||||||
|
def laser_tracker_check_signalstrength(self):
|
||||||
|
if not self.laser_tracker_check_enabled():
|
||||||
|
returnval = "disabled"
|
||||||
|
else:
|
||||||
|
returnval = "ok"
|
||||||
|
self.laser_tracker_wait_on_target()
|
||||||
|
|
||||||
|
signal = self.read_ssi_interferometer(1)
|
||||||
|
rtx = self.get_device_manager().devices.rtx
|
||||||
|
min_signal = rtx.user_parameter.get("min_signal")
|
||||||
|
low_signal = rtx.user_parameter.get("low_signal")
|
||||||
|
if signal < min_signal:
|
||||||
|
time.sleep(1)
|
||||||
|
if signal < min_signal:
|
||||||
|
print(
|
||||||
|
f"\x1b[91mThe signal of the tracker {signal} is below the minimum required signal of {min_signal}. Readjustment requred!\x1b[0m"
|
||||||
|
)
|
||||||
|
returnval = "toolow"
|
||||||
|
# raise RtError("The interferometer signal of tracker is too low.")
|
||||||
|
elif signal < low_signal:
|
||||||
|
print(
|
||||||
|
f"\x1b[91mThe signal of the tracker {signal} is below the warning limit of {low_signal}. Readjustment recommended!\x1b[0m"
|
||||||
|
)
|
||||||
|
returnval = "low"
|
||||||
|
return returnval
|
||||||
|
|
||||||
def show_signal_strength_interferometer(self):
|
def show_signal_strength_interferometer(self):
|
||||||
t = PrettyTable()
|
t = PrettyTable()
|
||||||
t.title = f"Interferometer signal strength"
|
t.title = f"Interferometer signal strength"
|
||||||
@@ -626,6 +659,7 @@ class RtFlomniMotor(Device, PositionerBase):
|
|||||||
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
|
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
|
||||||
)
|
)
|
||||||
self._update_connection_state()
|
self._update_connection_state()
|
||||||
|
self._stopped = False
|
||||||
|
|
||||||
# self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE)
|
# self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE)
|
||||||
if limits is not None:
|
if limits is not None:
|
||||||
@@ -696,14 +730,15 @@ class RtFlomniMotor(Device, PositionerBase):
|
|||||||
self.user_setpoint.put(position, wait=False)
|
self.user_setpoint.put(position, wait=False)
|
||||||
|
|
||||||
def move_and_finish():
|
def move_and_finish():
|
||||||
while not self.controller.slew_rate_limiters_on_target():
|
while not self.controller.slew_rate_limiters_on_target() and not self._stopped:
|
||||||
print("motor is moving")
|
print("motor is moving")
|
||||||
val = self.readback.read()
|
val = self.readback.read()
|
||||||
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
|
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
print("Move finished")
|
print("Move finished")
|
||||||
self._done_moving()
|
self._done_moving(success=(not self._stopped))
|
||||||
|
|
||||||
|
self._stopped = False
|
||||||
threading.Thread(target=move_and_finish, daemon=True).start()
|
threading.Thread(target=move_and_finish, daemon=True).start()
|
||||||
try:
|
try:
|
||||||
if wait:
|
if wait:
|
||||||
@@ -760,6 +795,7 @@ class RtFlomniMotor(Device, PositionerBase):
|
|||||||
|
|
||||||
def stop(self, *, success=False):
|
def stop(self, *, success=False):
|
||||||
self.controller.stop_all_axes()
|
self.controller.stop_all_axes()
|
||||||
|
self._stopped = True
|
||||||
return super().stop(success=success)
|
return super().stop(success=success)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user