Compare commits

..

1 Commits

Author SHA1 Message Date
6a4bfc73f6 fix(status): fix usage of Compare, Transition Status in delaygenerator integration
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m18s
CI for csaxs_bec / test (push) Successful in 1m23s
2025-11-30 22:27:52 +01:00
23 changed files with 485 additions and 712 deletions

View File

@@ -156,7 +156,7 @@ class flomniGuiTools:
)
self.progressbar.set_value([progress, subtomo_progress, 0])
if self.text_box is not None:
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
self.text_box.set_plain_text(text)

View File

@@ -17,7 +17,7 @@ umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
if TYPE_CHECKING:
from bec_ipython_client.plugins.flomni.flomni import Flomni
from bec_ipython_client.plugins.flomni import Flomni
class XrayEyeAlign:

View File

@@ -365,18 +365,6 @@ rtz:
readOnly: false
readoutPriority: on_request
rt_flyer:
description: flomni rt flyer
deviceClass: csaxs_bec.devices.omny.rt.rt_flomni_ophyd.RtFlomniFlyer
deviceConfig:
host: mpc2844.psi.ch
port: 2222
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: async
############################################################
####################### Cameras ############################
############################################################

View File

@@ -37,8 +37,7 @@ import traceback
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from ophyd import DeviceStatus
from ophyd_devices import CompareStatus, TransitionStatus
from ophyd_devices import CompareStatus, DeviceStatus, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (

View File

@@ -143,7 +143,7 @@ class StatusBitsCompareStatus(SubscriptionStatus):
self._add_delay = add_delay or 0
self._raise_states = raise_states or []
super().__init__(
device=signal,
obj=signal,
callback=self._compare_callback,
timeout=timeout,
settle_time=settle_time,

View File

@@ -412,11 +412,10 @@ class NPointAxis(Device, PositionerBase):
sign=1,
socket_cls=SocketIO,
tolerance: float = 0.05,
device_manager=None,
**kwargs,
):
self.controller = NPointController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.axis_Id = axis_Id
self.sign = sign

View File

@@ -175,7 +175,7 @@ class FlomniGalilMotor(Device, PositionerBase):
**kwargs,
):
self.controller = FlomniGalilController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.axis_Id = axis_Id
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
@@ -212,10 +212,6 @@ class FlomniGalilMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 10, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@@ -346,10 +342,10 @@ class FlomniGalilMotor(Device, PositionerBase):
Drive an axis to the limit in a specified direction.
Args:
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
"""
self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction)
# now force position read to cache
#now force position read to cache
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())

View File

@@ -149,7 +149,7 @@ class FuprGalilMotor(Device, PositionerBase):
**kwargs,
):
self.controller = FuprGalilController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.axis_Id = axis_Id
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
@@ -185,10 +185,6 @@ class FuprGalilMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 10, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())

View File

@@ -73,7 +73,6 @@ class LamniGalilController(GalilController):
air_off = bool(self.socket_put_and_receive("MG@OUT[13]"))
return rt_not_blocked_by_galil and air_off
class LamniGalilReadbackSignal(GalilSignalRO):
@retry_once
@threadlocked
@@ -100,7 +99,6 @@ class LamniGalilReadbackSignal(GalilSignalRO):
logger.warning("Failed to set RT value during readback.")
return val
class LamniGalilMotor(Device, PositionerBase):
USER_ACCESS = ["controller", "drive_axis_to_limit", "find_reference"]
readback = Cpt(LamniGalilReadbackSignal, signal_name="readback", kind="hinted")
@@ -134,7 +132,7 @@ class LamniGalilMotor(Device, PositionerBase):
**kwargs,
):
self.controller = LamniGalilController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.axis_Id = axis_Id
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
@@ -170,10 +168,6 @@ class LamniGalilMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 10, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@@ -298,7 +292,7 @@ class LamniGalilMotor(Device, PositionerBase):
Find the reference of the axis.
"""
self.controller.find_reference(self.axis_Id_numeric)
# now force position read to cache
#now force position read to cache
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
@@ -307,10 +301,10 @@ class LamniGalilMotor(Device, PositionerBase):
Drive an axis to the limit in a specified direction.
Args:
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
"""
self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction)
# now force position read to cache
#now force position read to cache
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())

View File

@@ -46,7 +46,7 @@ class GalilMotorResolution(GalilSignalRO):
@threadlocked
def _socket_get(self):
if self.controller.sock.port == 8083 and self.parent.axis_Id_numeric == 2:
# rotation stage
# rotation stage
return 89565.8666667
else:
return 51200
@@ -69,43 +69,37 @@ class OMNYGalilReadbackSignal(GalilSignalRO):
current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}"))
current_pos *= self.parent.sign
step_mm = self.parent.motor_resolution.get()
# here we introduce an offset of 25 to the rotation axis
# when setting a position this is taken into account in the controller
# that way we just do tomography from 0 to 180 degrees
#here we introduce an offset of 25 to the rotation axis
#when setting a position this is taken into account in the controller
#that way we just do tomography from 0 to 180 degrees
if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083:
return (current_pos / step_mm) + 25
return (current_pos / step_mm)+25
else:
return current_pos / step_mm
def read(self):
self._metadata["timestamp"] = time.time()
val = super().read()
# if reading rotation stage angle
#if reading rotation stage angle
if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083:
current_readback_value = val[self.parent.name]["value"]
# print (f"previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}.")
#print (f"previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}.")
if np.fabs((self.previous_rotation_angle - current_readback_value) > 10):
if np.fabs((self.previous_rotation_angle-current_readback_value)>10):
message = f"Glitch detected in rotation stage. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}."
print(message)
self.parent.device_manager.connector.send_client_info(
message, scope="glitch detector", show_asap=True
)
self.parent.device_manager.connector.send_client_info(message, scope="glitch detector", show_asap=True)
val = super().read()
current_readback_value = val[self.parent.name]["value"]
if np.fabs((self.previous_rotation_angle - current_readback_value) > 10):
if np.fabs((self.previous_rotation_angle-current_readback_value)>10):
message = f"Glitch detected in rotation stage second read. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}. Disabling the controller."
print(message)
self.parent.device_manager.connector.send_client_info(
message, scope="glitch detector", show_asap=True
)
self.parent.device_manager.devices["osamroy"].obj.controller.socket_put_confirmed(
"allaxref=0"
)
self.parent.device_manager.connector.send_client_info(message, scope="glitch detector", show_asap=True)
self.parent.device_manager.devices["osamroy"].obj.controller.socket_put_confirmed("allaxref=0")
self.parent.device_manager.devices["osamroy"].obj.enabled = False
return val
@@ -114,12 +108,13 @@ class OMNYGalilReadbackSignal(GalilSignalRO):
try:
rt = self.parent.device_manager.devices["rtx"]
if rt.enabled:
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"] - 25 + 54)
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"]-25+54)
except KeyError:
logger.warning("Failed to set RT value during ogalil readback.")
logger.warning("Failed to set RT value during ogalil readback.")
return val
class OMNYGalilController(GalilController):
USER_ACCESS = [
"describe",
@@ -137,18 +132,18 @@ class OMNYGalilController(GalilController):
"_ogalil_folerr_not_ignore",
]
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
def on(self, timeout: int = 10) -> None:
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
def on(self) -> None:
"""Open a new socket connection to the controller"""
self._ogalil_switchsocket_switch_all_on()
time.sleep(0.3)
super().on(timeout=timeout)
super().on()
def _ogalil_switchsocket(self, number: int, switch: bool):
# number is socket number ranging from 1 to 4
@@ -190,16 +185,15 @@ class OMNYGalilController(GalilController):
self.socket_put_confirmed("IgNoFol=1")
self.socket_put_confirmed("XQ#STOP,1")
def _ogalil_set_axis_to_pos_wo_reference_search(
self, axis_id_numeric, axis_id, pos_mm, motor_resolution, motor_sign
):
def _ogalil_set_axis_to_pos_wo_reference_search(self, axis_id_numeric, axis_id, pos_mm, motor_resolution, motor_sign):
self.socket_put_confirmed("IgNoFol=1")
# pos_mm = pos_encoder / motor_resolution
pos_encoder = pos_mm * motor_resolution * motor_sign
# print(motor_resolution)
#print(motor_resolution)
self.socket_put_confirmed(f"DE{axis_id}={pos_encoder:.0f}")
self.socket_put_confirmed(f"DP{axis_id}=_TP{axis_id}*ratio[{axis_id_numeric:.0f}]")
@@ -209,6 +203,7 @@ class OMNYGalilController(GalilController):
self._ogalil_folerr_not_ignore()
def _ogalil_folerr_not_ignore(self):
self.socket_put_confirmed("IgNoFol=0")
@@ -245,18 +240,7 @@ class OMNYGalilController(GalilController):
class OMNYGalilMotor(Device, PositionerBase):
USER_ACCESS = [
"controller",
"find_reference",
"omny_osamx_to_scan_center",
"drive_axis_to_limit",
"_ogalil_folerr_reset_and_ignore",
"_ogalil_set_axis_to_pos_wo_reference_search",
"get_motor_limit_switch",
"axis_is_referenced",
"get_motor_temperature",
"folerr_status",
]
USER_ACCESS = ["controller", "find_reference", "omny_osamx_to_scan_center", "drive_axis_to_limit", "_ogalil_folerr_reset_and_ignore", "_ogalil_set_axis_to_pos_wo_reference_search", "get_motor_limit_switch", "axis_is_referenced", "get_motor_temperature", "folerr_status"]
readback = Cpt(OMNYGalilReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint")
motor_resolution = Cpt(GalilMotorResolution, signal_name="resolution", kind="config")
@@ -288,7 +272,7 @@ class OMNYGalilMotor(Device, PositionerBase):
**kwargs,
):
self.controller = OMNYGalilController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.axis_Id = axis_Id
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
@@ -324,10 +308,6 @@ class OMNYGalilMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 10, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@@ -453,10 +433,8 @@ class OMNYGalilMotor(Device, PositionerBase):
def _ogalil_set_axis_to_pos_wo_reference_search(self, pos_mm):
motor_resolution = self.motor_resolution.get()
self.controller._ogalil_set_axis_to_pos_wo_reference_search(
self.axis_Id_numeric, self.axis_Id, pos_mm, motor_resolution, self.sign
)
# now force position read to cache
self.controller._ogalil_set_axis_to_pos_wo_reference_search(self.axis_Id_numeric, self.axis_Id, pos_mm, motor_resolution, self.sign)
#now force position read to cache
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
@@ -464,9 +442,9 @@ class OMNYGalilMotor(Device, PositionerBase):
"""
Find the reference of the axis.
"""
verbose = 1
verbose=1
self.controller.find_reference(self.axis_Id_numeric, verbose, raise_error)
# now force position read to cache
#now force position read to cache
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
@@ -475,10 +453,10 @@ class OMNYGalilMotor(Device, PositionerBase):
Drive an axis to the limit in a specified direction.
Args:
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
"""
self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction, verbose=1)
# now force position read to cache
#now force position read to cache
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
@@ -509,31 +487,29 @@ class OMNYGalilMotor(Device, PositionerBase):
def omny_osamx_to_scan_center(self, cenx):
if self.controller.sock.port == 8082 and self.axis_Id_numeric == 0:
# get last setpoint
osamx = self.device_manager.devices["osamx"]
osamx_current_setpoint = osamx.obj.readback.get()
omny_samx_in = self._get_user_param_safe("osamx", "in")
if np.fabs(osamx_current_setpoint - (omny_samx_in + cenx / 1000)) > 0.025:
message = (
f"Moving osamx to scan center. new osamx target {omny_samx_in+cenx/1000:.3f}."
)
logger.info(message)
osamx = self.device_manager.devices["osamx"]
osamx_current_setpoint = osamx.obj.readback.get()
omny_samx_in = self._get_user_param_safe("osamx","in")
if np.fabs(osamx_current_setpoint-(omny_samx_in+cenx/1000)) > 0.025:
message=f"Moving osamx to scan center. new osamx target {omny_samx_in+cenx/1000:.3f}."
logger.info(message)
osamx.read_only = False
# osamx.controller.("osamx", "controller.socket_put_confirmed('axspeed[0]=1000')")
osamx.set(omny_samx_in + cenx / 1000)
time.sleep(0.1)
while osamx.motor_is_moving.get():
time.sleep(0.05)
osamx.read_only = True
time.sleep(2)
rt = self.device_manager.devices["rtx"]
if rt.enabled:
rt.obj.controller.laser_tracker_on()
rt.obj.controller.laser_tracker_check_and_wait_for_signalstrength()
osamx.read_only = False
#osamx.controller.("osamx", "controller.socket_put_confirmed('axspeed[0]=1000')")
osamx.set(omny_samx_in+cenx/1000)
time.sleep(0.1)
while(osamx.motor_is_moving.get()):
time.sleep(0.05)
osamx.read_only = True
time.sleep(2)
rt = self.device_manager.devices["rtx"]
if rt.enabled:
rt.obj.controller.laser_tracker_on()
rt.obj.controller.laser_tracker_check_and_wait_for_signalstrength()
def folerr_status(self) -> bool:
return self.controller.folerr_status(self.axis_Id_numeric)
def stop(self, *, success=False):
self.controller.stop_all_axes()
return super().stop(success=success)

View File

@@ -52,12 +52,33 @@ class GalilController(Controller):
"fly_grid_scan",
"read_encoder_position",
]
_axes_per_controller = 8
def on(self, timeout: int = 10) -> None:
def __init__(
self,
*,
name="GalilController",
kind=None,
parent=None,
socket=None,
attr_name="",
labels=None,
):
if not hasattr(self, "_initialized") or not self._initialized:
self._galil_axis_per_controller = 8
self._axis = [None for axis_num in range(self._galil_axis_per_controller)]
super().__init__(
name=name,
socket=socket,
attr_name=attr_name,
parent=parent,
labels=labels,
kind=kind,
)
def on(self, controller_num=0) -> None:
"""Open a new socket connection to the controller"""
if not self.connected:
self.sock.open(timeout=timeout)
self.sock.open()
self.connected = True
else:
logger.info("The connection has already been established.")
@@ -144,11 +165,11 @@ class GalilController(Controller):
def show_running_threads(self) -> None:
t = PrettyTable()
t.title = f"Threads on {self.sock.host}:{self.sock.port}"
t.field_names = [str(ax) for ax in range(self._axes_per_controller)]
t.field_names = [str(ax) for ax in range(self._galil_axis_per_controller)]
t.add_row(
[
"active" if self.is_thread_active(t) else "inactive"
for t in range(self._axes_per_controller)
for t in range(self._galil_axis_per_controller)
]
)
print(t)
@@ -178,7 +199,7 @@ class GalilController(Controller):
"Limits",
"Position",
]
for ax in range(self._axes_per_controller):
for ax in range(self._galil_axis_per_controller):
axis = self._axis[ax]
if axis is not None:
t.add_row(
@@ -495,9 +516,7 @@ class SGalilMotor(Device, PositionerBase):
):
self.axis_Id = axis_Id
self.sign = sign
self.controller = GalilController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self.controller = GalilController(socket=socket_cls(host=host, port=port))
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_mapping = kwargs.pop("device_mapping", {})
@@ -530,10 +549,6 @@ class SGalilMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 10, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())

View File

@@ -1,16 +1,14 @@
from __future__ import annotations
import threading
import time
from typing import TYPE_CHECKING, Literal
from typing import List
import numpy as np
from bec_lib.logger import bec_logger
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, PositionerBase, Signal
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices.utils.bec_signals import AsyncMultiSignal, ProgressSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from prettytable import PrettyTable
@@ -25,9 +23,6 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import (
retry_once,
)
if TYPE_CHECKING:
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
logger = bec_logger.logger
@@ -62,7 +57,6 @@ class RtFlomniController(Controller):
socket_cls=None,
socket_host=None,
socket_port=None,
device_manager=None,
attr_name="",
parent=None,
labels=None,
@@ -73,7 +67,6 @@ class RtFlomniController(Controller):
socket_cls=socket_cls,
socket_host=socket_host,
socket_port=socket_port,
device_manager=device_manager,
attr_name=attr_name,
parent=parent,
labels=labels,
@@ -174,10 +167,10 @@ class RtFlomniController(Controller):
rtx = self.get_device_manager().devices.rtx
rtx.update_user_parameter({"rt_pid_voltage": self.rt_pid_voltage})
self.set_device_read_write("fsamx", False)
self.set_device_read_write("fsamy", False)
self.set_device_read_write("foptx", False)
self.set_device_read_write("fopty", False)
self.set_device_enabled("fsamx", False)
self.set_device_enabled("fsamy", False)
self.set_device_enabled("foptx", False)
self.set_device_enabled("fopty", False)
def move_samx_to_scan_region(self, fovx: float, cenx: float):
time.sleep(0.05)
@@ -230,20 +223,20 @@ class RtFlomniController(Controller):
print("Feedback is not running; likely an error in the interferometer.")
raise RtError("Feedback is not running; likely an error in the interferometer.")
self.set_device_read_write("fsamx", False)
self.set_device_read_write("fsamy", False)
self.set_device_read_write("foptx", False)
self.set_device_read_write("fopty", False)
self.set_device_enabled("fsamx", False)
self.set_device_enabled("fsamy", False)
self.set_device_enabled("foptx", False)
self.set_device_enabled("fopty", False)
def feedback_disable(self):
self.clear_trajectory_generator()
self.move_to_zero()
self.socket_put("l0")
self.set_device_read_write("fsamx", True)
self.set_device_read_write("fsamy", True)
self.set_device_read_write("foptx", True)
self.set_device_read_write("fopty", True)
self.set_device_enabled("fsamx", True)
self.set_device_enabled("fsamy", True)
self.set_device_enabled("foptx", True)
self.set_device_enabled("fopty", True)
fsamx = self.get_device_manager().devices.fsamx
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=025*stppermm[4]")
@@ -275,7 +268,8 @@ class RtFlomniController(Controller):
self.laser_update_tracker_info()
if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]:
return True
return False
else:
return False
def laser_tracker_on(self):
if not self.laser_tracker_check_enabled():
@@ -387,61 +381,66 @@ class RtFlomniController(Controller):
val = float(self.socket_put_and_receive(f"j{axis_number}").strip())
return val
def laser_tracker_check_signalstrength(self) -> Literal["ok", "low", "toolow", "disabled"]:
"""
Check the signal strength of the laser tracker interferometer.
Returns:
str: "ok" if signal is above low limit, "low" if signal is below low limit but above min limit,
"toolow" if signal is below min limit, "disabled" if tracker is not enabled.
"""
def laser_tracker_check_signalstrength(self):
if not self.laser_tracker_check_enabled():
return "disabled"
returnval = "disabled"
else:
returnval = "ok"
self.laser_tracker_wait_on_target()
returnval = "ok"
self.laser_tracker_wait_on_target()
signal = self.read_ssi_interferometer(1)
rtx = self.get_device_manager().devices.rtx
min_signal = rtx.user_parameter.get("min_signal")
low_signal = rtx.user_parameter.get("low_signal")
print(f"low signal: {low_signal}")
print(f"min signal: {min_signal}")
print(f"signal: {signal}")
if signal < min_signal:
time.sleep(1)
signal = self.read_ssi_interferometer(1)
rtx = self.get_device_manager().devices.rtx
min_signal = rtx.user_parameter.get("min_signal")
low_signal = rtx.user_parameter.get("low_signal")
print(f"low signal: {low_signal}")
print(f"min signal: {min_signal}")
print(f"signal: {signal}")
if signal < min_signal:
time.sleep(1)
if signal < min_signal:
print(
f"\x1b[91mThe signal of the tracker {signal} is below the minimum required signal of {min_signal}. Readjustment requred!\x1b[0m"
)
returnval = "toolow"
# raise RtError("The interferometer signal of tracker is too low.")
elif signal < low_signal:
print(
f"\x1b[91mThe signal of the tracker {signal} is below the minimum required signal of {min_signal}. Readjustment requred!\x1b[0m"
f"\x1b[91mThe signal of the tracker {signal} is below the warning limit of {low_signal}. Readjustment recommended!\x1b[0m"
)
returnval = "toolow"
# raise RtError("The interferometer signal of tracker is too low.")
elif signal < low_signal:
print(
f"\x1b[91mThe signal of the tracker {signal} is below the warning limit of {low_signal}. Readjustment recommended!\x1b[0m"
)
returnval = "low"
returnval = "low"
return returnval
def show_signal_strength_interferometer(self):
"""
Show the signal strength of all four axes of the interferometer.
"""
t = PrettyTable()
t.title = "Interferometer signal strength"
t.title = f"Interferometer signal strength"
t.field_names = ["Axis", "Value"]
for i in range(4):
t.add_row([i, self.read_ssi_interferometer(i)])
print(t)
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[4])
self.average_stdeviations_y_st_fzp += float(return_table[7])
signals = {
"target_x": {"value": float(return_table[2])},
"average_x_st_fzp": {"value": float(return_table[3])},
"stdev_x_st_fzp": {"value": float(return_table[4])},
"target_y": {"value": float(return_table[5])},
"average_y_st_fzp": {"value": float(return_table[6])},
"stdev_y_st_fzp": {"value": float(return_table[7])},
"average_rotz": {"value": float(return_table[8])},
"stdev_rotz": {"value": float(return_table[9])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
}
return signals
@threadlocked
def start_scan(self):
"""
Start the scan. Before starting the scan, it is checked that the feedback loop is running.
Furthermore, it is checked that at least one target position is planned.
Raises:
RtError: Raised if feedback loop is not running or if no target positions are planned.
"""
if not self.feedback_is_running():
logger.error(
"Cannot start scan because feedback loop is not running or there is an"
@@ -452,23 +451,18 @@ class RtFlomniController(Controller):
" interferometer error."
)
# here exception
(_mode, number_of_positions_planned, _current_position_in_scan) = self.get_scan_status()
(mode, number_of_positions_planned, current_position_in_scan) = self.get_scan_status()
if number_of_positions_planned == 0:
logger.error("Cannot start scan because no target positions are planned.")
raise RtError("Cannot start scan because no target positions are planned.")
# hier exception
# start a point-by-point scan (for cont scan in flomni it would be "sa")
self.socket_put_and_receive("sd")
@retry_once
@threadlocked
def get_scan_status(self) -> tuple[int, int, int]:
"""
Get the current scan status of the controller.
Returns:
tuple: A tuple containing the mode, number of positions planned, and current position in scan.
"""
def get_scan_status(self):
return_table = (self.socket_put_and_receive("sr")).split(",")
if len(return_table) != 3:
raise RtCommunicationError(
@@ -484,15 +478,94 @@ class RtFlomniController(Controller):
current_position_in_scan = int(float(return_table[2]))
return (mode, number_of_positions_planned, current_position_in_scan)
def get_device_manager(self) -> DeviceManagerDS:
"""
Helper function to get the device manager from one of the axes.
"""
def get_device_manager(self):
for axis in self._axis:
if hasattr(axis, "device_manager") and axis.device_manager: # type: ignore
return axis.device_manager # type: ignore
if hasattr(axis, "device_manager") and axis.device_manager:
return axis.device_manager
raise BECConfigError("Could not access the device_manager")
def read_positions_from_sampler(self):
# this was for reading after the scan completed
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
# if not (mode==2 or mode==3):
# error
self.get_device_manager().connector.set(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=1, metadata=self.readout_metadata
),
)
# while scan is running
while mode > 0:
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
time.sleep(0.01)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
# logger.info(f"{return_table}")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
time.sleep(0.05)
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
# logger.info(f"{return_table}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
self.get_device_manager().connector.set(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=0, metadata=self.readout_metadata
),
)
logger.info(
"Flomni statistics: Average of all standard deviations: x"
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
)
def publish_device_data(self, signals, point_id):
self.get_device_manager().connector.set_and_publish(
MessageEndpoints.device_read("rt_flomni"),
messages.DeviceMessage(
signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
),
)
def start_readout(self):
readout = threading.Thread(target=self.read_positions_from_sampler)
readout.start()
def kickoff(self, metadata):
self.readout_metadata = metadata
while not self._min_scan_buffer_reached:
time.sleep(0.001)
self.start_scan()
time.sleep(0.1)
self.start_readout()
class RtFlomniReadbackSignal(RtReadbackSignal):
@retry_once
@@ -585,7 +658,7 @@ class RtFlomniMotor(Device, PositionerBase):
self.axis_Id = axis_Id
self.sign = sign
self.controller = RtFlomniController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.device_manager = device_manager
@@ -613,10 +686,6 @@ class RtFlomniMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 10, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@@ -728,6 +797,9 @@ class RtFlomniMotor(Device, PositionerBase):
else:
raise TypeError(f"Expected value of type int but received {type(val)}")
def kickoff(self, metadata, **kwargs) -> None:
self.controller.kickoff(metadata)
@property
def egu(self):
"""The engineering units (EGU) for positions"""
@@ -739,181 +811,9 @@ class RtFlomniMotor(Device, PositionerBase):
return super().stop(success=success)
class RtFlomniFlyer(Device):
USER_ACCESS = ["controller"]
data = Cpt(
AsyncMultiSignal,
name="data",
signals=[
"target_x",
"average_x_st_fzp",
"stdev_x_st_fzp",
"target_y",
"average_y_st_fzp",
"stdev_y_st_fzp",
"average_rotz",
"stdev_rotz",
"average_stdeviations_x_st_fzp",
"average_stdeviations_y_st_fzp",
],
ndim=0,
max_size=10000,
async_update={"type": "add", "max_shape": [None]},
)
progress = Cpt(ProgressSignal, name="progress")
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2844.psi.ch",
port=2222,
socket_cls=SocketIO,
device_manager=None,
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.shutdown_event = threading.Event()
self.controller = RtFlomniController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
def read_positions_from_sampler(self, status: DeviceStatus):
"""
Read the positions from the sampler and update the data signal.
This function runs in a separate thread and continuously checks the
scan status.
Args:
status (DeviceStatus): The status object to update when the readout is complete.
"""
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
# while scan is running
while mode > 0 and not self.shutdown_event.wait(0.01):
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (
self.controller.socket_put_and_receive(f"r{read_counter}")
).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
self.progress.put(
value=read_counter,
max_value=number_of_positions_planned,
done=(number_of_positions_planned == read_counter),
)
logger.info(f"Updating progress: {read_counter} / {number_of_positions_planned}")
if self.shutdown_event.wait(0.05):
logger.info("Shutdown event set, stopping readout.")
# if we are here, the shutdown_event is set. We can exit the readout loop.
status.set_finished()
return
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter and not self.shutdown_event.is_set():
return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
logger.info(f"Setting data: {signals}")
self.data.set(signals)
self.progress.put(
value=read_counter,
max_value=number_of_positions_planned,
done=(number_of_positions_planned == read_counter),
)
logger.info(f"Updating progress: {read_counter} / {number_of_positions_planned}")
# NOTE: No need to set the status to failed if the shutdown_event is set.
# The stop() method will take care of that.
status.set_finished()
if read_counter != 0:
logger.info(
"Flomni statistics: Average of all standard deviations: x"
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
)
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[4])
self.average_stdeviations_y_st_fzp += float(return_table[7])
signals = {
f"{self.name}_data_target_x": {"value": float(return_table[2])},
f"{self.name}_data_average_x_st_fzp": {"value": float(return_table[3])},
f"{self.name}_data_stdev_x_st_fzp": {"value": float(return_table[4])},
f"{self.name}_data_target_y": {"value": float(return_table[5])},
f"{self.name}_data_average_y_st_fzp": {"value": float(return_table[6])},
f"{self.name}_data_stdev_y_st_fzp": {"value": float(return_table[7])},
f"{self.name}_data_average_rotz": {"value": float(return_table[8])},
f"{self.name}_data_stdev_rotz": {"value": float(return_table[9])},
f"{self.name}_data_average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
f"{self.name}_data_average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
}
return signals
def start_readout(self, status: DeviceStatus):
logger.info("Starting readout thread.")
readout = threading.Thread(target=self.read_positions_from_sampler, args=(status,))
readout.start()
def kickoff(self) -> DeviceStatus:
self.shutdown_event.clear()
while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001):
...
self.controller.start_scan()
self.shutdown_event.wait(0.1)
status = DeviceStatus(self)
self.start_readout(status)
return status
def stop(self, *, success=False):
self.shutdown_event.set()
return super().stop(success=success)
if __name__ == "__main__":
rtcontroller = RtFlomniController(
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222
)
rtcontroller.on()
rtcontroller.laser_tracker_on()

View File

@@ -71,7 +71,6 @@ class RtLamniController(Controller):
socket_cls=None,
socket_host=None,
socket_port=None,
device_manager=None,
attr_name="",
parent=None,
labels=None,
@@ -82,7 +81,6 @@ class RtLamniController(Controller):
socket_cls=socket_cls,
socket_host=socket_host,
socket_port=socket_port,
device_manager=device_manager,
attr_name=attr_name,
parent=parent,
labels=labels,
@@ -94,11 +92,11 @@ class RtLamniController(Controller):
def feedback_disable(self):
self.socket_put("J0")
logger.info("LamNI Feedback disabled.")
self.set_device_read_write("lsamx", True)
self.set_device_read_write("lsamy", True)
self.set_device_read_write("loptx", True)
self.set_device_read_write("lopty", True)
self.set_device_read_write("loptz", True)
self.set_device_enabled("lsamx", True)
self.set_device_enabled("lsamy", True)
self.set_device_enabled("loptx", True)
self.set_device_enabled("lopty", True)
self.set_device_enabled("loptz", True)
def is_axis_moving(self, axis_Id) -> bool:
# this checks that axis is on target
@@ -152,25 +150,25 @@ class RtLamniController(Controller):
# set these as closed loop target position
self.socket_put(f"pa0,{x_curr:.4f}")
self.socket_put(f"pa1,{y_curr:.4f}")
self.dm.devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr)
self.dm.devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr)
self.get_device_manager().devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr)
self.get_device_manager().devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr)
self.socket_put("J5")
logger.info("LamNI Feedback enabled (without reset).")
self.set_device_read_write("lsamx", False)
self.set_device_read_write("lsamy", False)
self.set_device_read_write("loptx", False)
self.set_device_read_write("lopty", False)
self.set_device_read_write("loptz", False)
self.set_device_enabled("lsamx", False)
self.set_device_enabled("lsamy", False)
self.set_device_enabled("loptx", False)
self.set_device_enabled("lopty", False)
self.set_device_enabled("loptz", False)
@threadlocked
def feedback_disable_and_even_reset_lamni_angle_interferometer(self):
self.socket_put("J6")
logger.info("LamNI Feedback disabled including the angular interferometer.")
self.set_device_read_write("lsamx", True)
self.set_device_read_write("lsamy", True)
self.set_device_read_write("loptx", True)
self.set_device_read_write("lopty", True)
self.set_device_read_write("loptz", True)
self.set_device_enabled("lsamx", True)
self.set_device_enabled("lsamy", True)
self.set_device_enabled("loptx", True)
self.set_device_enabled("lopty", True)
self.set_device_enabled("loptz", True)
@threadlocked
def clear_trajectory_generator(self):
@@ -407,11 +405,11 @@ class RtLamniController(Controller):
(self.socket_put_and_receive("J2")).split(",")[0]
)
self.set_device_read_write("lsamx", False)
self.set_device_read_write("lsamy", False)
self.set_device_read_write("loptx", False)
self.set_device_read_write("lopty", False)
self.set_device_read_write("loptz", False)
self.set_device_enabled("lsamx", False)
self.set_device_enabled("lsamy", False)
self.set_device_enabled("loptx", False)
self.set_device_enabled("lopty", False)
self.set_device_enabled("loptz", False)
if interferometer_feedback_not_running == 1:
logger.error(
@@ -561,7 +559,7 @@ class RtLamniMotor(Device, PositionerBase):
self.axis_Id = axis_Id
self.sign = sign
self.controller = RtLamniController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.device_manager = device_manager
@@ -588,10 +586,6 @@ class RtLamniMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 10, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())

View File

@@ -1,8 +1,8 @@
import builtins
import socket
import threading
import time
from typing import List
import builtins
import socket
import numpy as np
from bec_lib import bec_logger, messages
@@ -34,15 +34,12 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import (
logger = bec_logger.logger
class RtOMNY_mirror_switchbox_Error(Exception):
pass
class RtOMNY_Error(Exception):
pass
class RtOMNYController(Controller):
_axes_per_controller = 3
red = "\x1b[91m"
@@ -90,7 +87,6 @@ class RtOMNYController(Controller):
socket_cls=None,
socket_host=None,
socket_port=None,
device_manager=None,
attr_name="",
parent=None,
labels=None,
@@ -101,7 +97,6 @@ class RtOMNYController(Controller):
socket_cls=socket_cls,
socket_host=socket_host,
socket_port=socket_port,
device_manager=device_manager,
attr_name=attr_name,
parent=parent,
labels=labels,
@@ -239,7 +234,7 @@ class RtOMNYController(Controller):
"opt_amplitude1_neg": 3000,
"opt_amplitude2_pos": 3000,
"opt_amplitude2_neg": 3000,
},
}
}
# def is_axis_moving(self, axis_Id) -> bool:
@@ -266,60 +261,42 @@ class RtOMNYController(Controller):
threading.Thread(target=send_positions, args=(self, positions), daemon=True).start()
def get_mirror_parameters(self, channel):
def get_mirror_parameters(self,channel):
return self.mirror_parameters[channel]
def laser_tracker_check_and_wait_for_signalstrength(self):
self.get_device_manager().connector.send_client_info(
"Checking laser tracker...", scope="", show_asap=True
)
self.get_device_manager().connector.send_client_info("Checking laser tracker...", scope="", show_asap=True)
if not self.laser_tracker_check_enabled():
print(
"laser_tracker_check_and_wait_for_signalstrength: The laser tracker is not even enabled."
)
print("laser_tracker_check_and_wait_for_signalstrength: The laser tracker is not even enabled.")
return
# first check on target
#first check on target
self.laser_tracker_wait_on_target()
# when on target, check interferometer signal
signal = self._omny_interferometer_get_signalsample("ssi_4", 0.1)
#when on target, check interferometer signal
signal = self._omny_interferometer_get_signalsample("ssi_4",0.1)
rtx = self.get_device_manager().devices.rtx
min_signal = rtx.user_parameter.get("min_signal")
low_signal = rtx.user_parameter.get("low_signal")
wait_counter = 0
while signal < min_signal and wait_counter < 10:
self.get_device_manager().connector.send_client_info(
f"The signal of the tracker {signal} is below the minimum required signal of {min_signal}. Waiting...",
scope="laser_tracker_check_and_wait_for_signalstrength",
show_asap=True,
)
while signal < min_signal and wait_counter<10:
self.get_device_manager().connector.send_client_info(f"The signal of the tracker {signal} is below the minimum required signal of {min_signal}. Waiting...", scope="laser_tracker_check_and_wait_for_signalstrength", show_asap=True)
wait_counter += 1
wait_counter+=1
time.sleep(0.2)
signal = self._omny_interferometer_get_signalsample("ssi_4", 0.1)
signal = self._omny_interferometer_get_signalsample("ssi_4",0.1)
if signal < low_signal:
self.get_device_manager().connector.send_client_info(
f"\x1b[91mThe signal of the tracker {signal} is below the low limit of {low_signal}. Auto readjustment...\x1b[0m",
scope="laser_tracker_check_and_wait_for_signalstrength",
show_asap=True,
)
self.get_device_manager().connector.send_client_info(f"\x1b[91mThe signal of the tracker {signal} is below the low limit of {low_signal}. Auto readjustment...\x1b[0m", scope="laser_tracker_check_and_wait_for_signalstrength", show_asap=True)
self.omny_interferometer_align_tracking()
self.get_device_manager().connector.send_client_info(
"Checking laser tracker completed.", scope="", show_asap=True
)
self.omny_interferometer_align_tracking()
self.get_device_manager().connector.send_client_info("Checking laser tracker completed.", scope="", show_asap=True)
def omny_interferometer_align_tracking(self):
mirror_channel = 6
signal = self._omny_interferometer_get_signalsample(
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
mirror_channel=6
signal = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]:
print(
f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed."
)
print(f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed.")
else:
self._omny_interferometer_switch_channel(mirror_channel)
time.sleep(0.1)
@@ -330,19 +307,16 @@ class RtOMNYController(Controller):
self._omny_interferometer_switch_alloff()
self.show_signal_strength_interferometer()
mirror_channel = -1
mirror_channel=-1
def omny_interferometer_align_incoupling_angle(self):
mirror_channel = 1
signal = self._omny_interferometer_get_signalsample(
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
mirror_channel=1
signal = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]:
print(
f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed."
)
print(f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed.")
else:
self._omny_interferometer_switch_channel(mirror_channel)
time.sleep(0.1)
@@ -353,17 +327,18 @@ class RtOMNYController(Controller):
self._omny_interferometer_switch_alloff()
self.show_signal_strength_interferometer()
mirror_channel = -1
mirror_channel=-1
def _omny_interferometer_openloop_steps(self, channel, steps, amplitude):
if channel not in range(3, 5):
if channel not in range(3,5):
raise RtOMNY_Error(f"invalid channel number {channel}.")
if amplitude > 4090:
amplitude = 4090
elif amplitude < 10:
amplitude = 10
oshield = self.get_device_manager().devices.oshield
oshield.obj.controller.move_open_loop_steps(
@@ -376,7 +351,7 @@ class RtOMNYController(Controller):
def _omny_interferometer_optimize(self, mirror_channel, channel):
if mirror_channel == -1:
raise RtOMNY_Error("no mirror channel selected")
# mirror channel is mirror number and channel is smaract channel, i.e. axis of the mirror
#mirror channel is mirror number and channel is smaract channel, i.e. axis of the mirror
if channel == 3:
steps_pos = self.mirror_parameters[mirror_channel]["opt_steps1_pos"]
steps_neg = self.mirror_parameters[mirror_channel]["opt_steps1_neg"]
@@ -390,80 +365,67 @@ class RtOMNYController(Controller):
else:
raise RtOMNY_Error(f"invalid channel number {channel}.")
previous_signal = self._omny_interferometer_get_signalsample(
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
previous_signal = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
min_begin = self.mirror_parameters[mirror_channel]["opt_signal_min_begin"]
min_begin = self.mirror_parameters[mirror_channel]["opt_signal_min_begin"]
if previous_signal < min_begin:
# raise OMNY_rt_clientError("error1") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.")
#raise OMNY_rt_clientError("error1") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.")
print(f"\rMinimum signal for auto alignment {min_begin} not reached.")
return
elif previous_signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]:
print(
f"\rInterferometer signal of axis is good"
) # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} is good.")
return
print(f"\rInterferometer signal of axis is good") # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} is good.")
return
else:
direction = 1
cycle_counter = 0
cycle_max = 20
reversal_counter = 0
reversal_max = 4
self.mirror_amplitutde_increase = 0
cycle_counter=0
cycle_max=20
reversal_counter=0
reversal_max=4
self.mirror_amplitutde_increase=0
current_sample = self._omny_interferometer_get_signalsample(
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
max = current_sample
current_sample = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
max=current_sample
while (
current_sample < self.mirror_parameters[mirror_channel]["opt_signal_stop"]
and cycle_counter < cycle_max
and reversal_counter < reversal_max
):
while current_sample < self.mirror_parameters[mirror_channel]["opt_signal_stop"] and cycle_counter<cycle_max and reversal_counter < reversal_max:
# if current_sample < self.mirror_parameters[mirror_channel]["opt_signal_min_begin"]:
# raise OMNY_rt_clientError("error2") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.")
if direction > 0:
if direction>0:
self._omny_interferometer_openloop_steps(channel, steps_pos, opt_amplitude_pos)
verbose_str = f"channel {channel}, steps {steps_pos}"
else:
self._omny_interferometer_openloop_steps(channel, -steps_neg, opt_amplitude_neg)
verbose_str = f"auto action {channel}, steps {-steps_pos}"
# print(f"Aligning axis ") #{self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
#print(f"Aligning axis ") #{self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
current_sample = self._omny_interferometer_get_signalsample(
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
current_sample = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
opt_mirrorname = self.mirror_parameters[mirror_channel]["opt_mirrorname"]
info_str = f"\rAuto aligning Channel {mirror_channel}, {opt_mirrorname}, Current signal: {current_sample:.0f}"
message = info_str + " (q)uit \r"
self.get_device_manager().connector.send_client_info(
message, scope="_omny_interferometer_optimize", show_asap=True
)
message=info_str +" (q)uit \r"
self.get_device_manager().connector.send_client_info(message, scope="_omny_interferometer_optimize", show_asap=True)
if previous_signal>current_sample:
if direction<0:
steps_pos=int(steps_pos/2)
steps_neg=int(steps_neg/2)
if steps_pos<1:
steps_pos=1
if steps_neg<1:
steps_neg=1
direction=direction*(-1)
reversal_counter+=1
previous_signal=current_sample
cycle_counter+=1
print(f"\r\nFinished aligning channel {channel} of mirror {mirror_channel}\n\r") # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
if previous_signal > current_sample:
if direction < 0:
steps_pos = int(steps_pos / 2)
steps_neg = int(steps_neg / 2)
if steps_pos < 1:
steps_pos = 1
if steps_neg < 1:
steps_neg = 1
direction = direction * (-1)
reversal_counter += 1
previous_signal = current_sample
cycle_counter += 1
print(
f"\r\nFinished aligning channel {channel} of mirror {mirror_channel}\n\r"
) # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
def move_to_zero(self):
self.socket_put("pa0,0")
@@ -495,7 +457,7 @@ class RtOMNYController(Controller):
if ret == 1:
return True
return False
def feedback_is_running(self) -> bool:
self.feedback_get_status_and_ssi()
interferometer_feedback_not_running = int(self.ssi["feedback_error"])
@@ -504,9 +466,7 @@ class RtOMNYController(Controller):
return True
def feedback_enable_with_reset(self):
self.get_device_manager().connector.send_client_info(
f"Enabling the feedback...", scope="", show_asap=True
)
self.get_device_manager().connector.send_client_info(f"Enabling the feedback...", scope="", show_asap=True)
self.socket_put("J0") # disable feedback
time.sleep(0.01)
@@ -528,10 +488,8 @@ class RtOMNYController(Controller):
osamroy = self.get_device_manager().devices.osamroy
# the following read will also upate the angle in rt during readout
readback = osamroy.obj.readback.get()
if np.fabs(readback) > 0.1:
self.get_device_manager().connector.send_client_info(
f"Rotating to zero", scope="feedback enable", show_asap=True
)
if (np.fabs(readback) > 0.1):
self.get_device_manager().connector.send_client_info(f"Rotating to zero", scope="feedback enable", show_asap=True)
osamroy.obj.move(0, wait=True)
osamx = self.get_device_manager().devices.osamx
@@ -556,15 +514,16 @@ class RtOMNYController(Controller):
time.sleep(1.5)
self.set_device_read_write("osamx", False)
self.set_device_read_write("osamy", False)
self.set_device_read_write("ofzpx", False)
self.set_device_read_write("ofzpy", False)
self.set_device_read_write("oosax", False)
self.set_device_read_write("oosax", False)
self.set_device_enabled("osamx", False)
self.set_device_enabled("osamy", False)
self.set_device_enabled("ofzpx", False)
self.set_device_enabled("ofzpy", False)
self.set_device_enabled("oosax", False)
self.set_device_enabled("oosax", False)
print("Feedback is running.")
@threadlocked
def clear_trajectory_generator(self):
self.socket_put("sc")
@@ -575,15 +534,16 @@ class RtOMNYController(Controller):
self.move_to_zero()
self.socket_put("J0")
self.set_device_read_write("osamx", True)
self.set_device_read_write("osamy", True)
self.set_device_read_write("ofzpx", True)
self.set_device_read_write("ofzpy", True)
self.set_device_read_write("oosax", True)
self.set_device_read_write("oosax", True)
self.set_device_enabled("osamx", True)
self.set_device_enabled("osamy", True)
self.set_device_enabled("ofzpx", True)
self.set_device_enabled("ofzpy", True)
self.set_device_enabled("oosax", True)
self.set_device_enabled("oosax", True)
print("rt feedback is now disabled.")
def set_rotation_angle(self, val: float) -> None:
self.socket_put(f"a{val/180*np.pi}")
@@ -618,13 +578,12 @@ class RtOMNYController(Controller):
"enabled_z": bool(tracker_values[10]),
}
def laser_tracker_on(self):
# update variables and enable if not yet active
#update variables and enable if not yet active
if not self.laser_tracker_check_enabled():
logger.info("Enabling the laser tracker. Please wait...")
self.get_device_manager().connector.send_client_info(
f"Enabling the laser tracker. Please wait...", scope="", show_asap=True
)
self.get_device_manager().connector.send_client_info(f"Enabling the laser tracker. Please wait...", scope="", show_asap=True)
tracker_intensity = self.tracker_info["tracker_intensity"]
if (
@@ -650,6 +609,7 @@ class RtOMNYController(Controller):
logger.info("Laser tracker running!")
print("Laser tracker running!")
def laser_tracker_check_enabled(self) -> bool:
self.laser_update_tracker_info()
if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]:
@@ -668,10 +628,11 @@ class RtOMNYController(Controller):
return True
return False
def laser_tracker_wait_on_target(self):
max_repeat = 15
count = 0
while not self.laser_tracker_check_on_target() and count < max_repeat:
while not self.laser_tracker_check_on_target() and count<max_repeat:
logger.info("Waiting for laser tracker to reach target position.")
time.sleep(0.5)
count += 1
@@ -680,74 +641,75 @@ class RtOMNYController(Controller):
raise RtError("Failed to reach laser target position.")
def laser_tracker_print_intensity_for_otrack_tweaking(self):
# self.laser_update_tracker_info()
# _laser_tracker_intensity = self.tracker_info["tracker_intensity"]
# print(f"\r PSD beam intensity: {_laser_tracker_intensity:.2f}\r")
#self.laser_update_tracker_info()
#_laser_tracker_intensity = self.tracker_info["tracker_intensity"]
#print(f"\r PSD beam intensity: {_laser_tracker_intensity:.2f}\r")
self.laser_tracker_show_all(extra_endline="\r")
def laser_tracker_show_all(self, extra_endline=""):
def laser_tracker_show_all(self,extra_endline=""):
self.laser_update_tracker_info()
enabled_y = self.tracker_info["enabled_y"]
print(extra_endline + f"Tracker enabled: {bool(enabled_y)}" + extra_endline)
print(extra_endline+f"Tracker enabled: {bool(enabled_y)}"+extra_endline)
if self.tracker_info["tracker_intensity"] < self.tracker_info["threshold_intensity_y"]:
print(self.red + " LOW INTENSITY" + self.white + extra_endline)
print(self.red+" LOW INTENSITY"+self.white+extra_endline)
_laser_tracker_intensity = self.tracker_info["tracker_intensity"]
print(f" PSD beam intensity: {_laser_tracker_intensity:.2f}" + extra_endline)
print(f" PSD beam intensity: {_laser_tracker_intensity:.2f}"+extra_endline)
_laser_tracker_y_beampos = self.tracker_info["beampos_y"]
print(f" Y beam position: {_laser_tracker_y_beampos:.2f}" + extra_endline)
print(f" Y beam position: {_laser_tracker_y_beampos:.2f}"+extra_endline)
_laser_tracker_y_target = self.tracker_info["target_y"]
print(f" target position: {_laser_tracker_y_target:.2f}" + extra_endline)
print(f" target position: {_laser_tracker_y_target:.2f}"+extra_endline)
_laser_tracker_y_threshold_intensity = self.tracker_info["threshold_intensity_y"]
print(
f" threshold intensity: {_laser_tracker_y_threshold_intensity:.2f}" + extra_endline
)
print(f" threshold intensity: {_laser_tracker_y_threshold_intensity:.2f}"+extra_endline)
_laser_tracker_y_piezo_voltage = self.tracker_info["piezo_voltage_y"]
print(f" Piezo voltage: {_laser_tracker_y_piezo_voltage:.2f}" + extra_endline)
print(f" Piezo voltage: {_laser_tracker_y_piezo_voltage:.2f}"+extra_endline)
_laser_tracker_z_beampos = self.tracker_info["beampos_z"]
print(f" Z beam position: {_laser_tracker_z_beampos:.2f}" + extra_endline)
print(f" Z beam position: {_laser_tracker_z_beampos:.2f}"+extra_endline)
_laser_tracker_z_target = self.tracker_info["target_z"]
print(f" target position: {_laser_tracker_z_target:.2f}" + extra_endline)
print(f" target position: {_laser_tracker_z_target:.2f}"+extra_endline)
_laser_tracker_z_threshold_intensity = self.tracker_info["threshold_intensity_z"]
print(
f" threshold intensity: {_laser_tracker_z_threshold_intensity:.2f}" + extra_endline
)
print(f" threshold intensity: {_laser_tracker_z_threshold_intensity:.2f}"+extra_endline)
_laser_tracker_z_piezo_voltage = self.tracker_info["piezo_voltage_z"]
print(f" Piezo voltage: {_laser_tracker_z_piezo_voltage:.2f}" + extra_endline)
print(" Reminder - there is also an upper threshold active in rt\n" + extra_endline)
print(f" Piezo voltage: {_laser_tracker_z_piezo_voltage:.2f}"+extra_endline)
print(" Reminder - there is also an upper threshold active in rt\n"+extra_endline)
if extra_endline == "":
self.laser_tracker_galil_status()
def laser_tracker_galil_enable(self):
otracky_con = self.get_device_manager().devices.otracky.obj.controller
otracky_con.socket_put_confirmed("tracken=1")
otracky_con.socket_put_confirmed("trackyct=0")
otracky_con.socket_put_confirmed("trackzct=0")
def laser_tracker_galil_disable(self):
otracky_con = self.get_device_manager().devices.otracky.obj.controller
otracky_con.socket_put_confirmed("tracken=0")
def laser_tracker_galil_status(self):
otracky_con = self.get_device_manager().devices.otracky.obj.controller
if bool(float(otracky_con.socket_put_and_receive("MGtracken").strip())) == 0:
print(self.red + "Tracking in the Galil Controller is disabled." + self.white)
print(self.red+"Tracking in the Galil Controller is disabled."+self.white)
print("Use dev.rtx.controller.laser_tracker_galil_enable to enable.\n")
return 0
return(0)
else:
print("Tracking in the Galil Controller is enabled.")
trackyct = int(float(otracky_con.socket_put_and_receive("MGtrackyct").strip()))
trackzct = int(float(otracky_con.socket_put_and_receive("MGtrackzct").strip()))
trackyct=int(float(otracky_con.socket_put_and_receive("MGtrackyct").strip()))
trackzct=int(float(otracky_con.socket_put_and_receive("MGtrackzct").strip()))
print(f"Galil Trackcounters y={trackyct}, z={trackzct}")
def show_signal_strength_interferometer(self):
channelnames = {1: "OSA FZP Y", 2: "ST OSA Y", 3: "OSA FZP X", 4: "ST OSA X", 5: "Angle"}
channelnames={1:"OSA FZP Y",2:"ST OSA Y",3:"OSA FZP X",4:"ST OSA X",5:"Angle"}
self.feedback_get_status_and_ssi()
t = PrettyTable()
t.title = f"Interferometer signal strength"
t.field_names = ["Channel", "Name", "Value"]
for i in range(1, 6):
for i in range(1,6):
ssi = self.ssi[f"ssi_{i}"]
t.add_row([i, channelnames[i], ssi])
t.add_row([i,channelnames[i], ssi])
print(t)
def _omny_interferometer_switch_open_socket(self):
@@ -760,42 +722,44 @@ class RtOMNYController(Controller):
self._omny_interferometer_switch_put_and_receive("?000\r")
time.sleep(1)
def _omny_interferometer_switch_channel(self, channel):
self._omny_interferometer_switch_alloff()
time.sleep(0.1)
if channel == 1: # Relais 1 and 2
if channel == 1: #Relais 1 and 2
rback = self._omny_interferometer_switch_put_and_receive("!0020003\r")
# if "|0003\r" != self._omny_interferometer_switch_put_and_receive("!0020003\r"):
#if "|0003\r" != self._omny_interferometer_switch_put_and_receive("!0020003\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 2: # Relais 3 and 4
elif channel == 2: #Relais 3 and 4
rback = self._omny_interferometer_switch_put_and_receive("!002000C\r")
# if "|000C\r" != self._omny_interferometer_switch_put_and_receive("!002000C\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 3: # Relais 5 and 6
elif channel == 3: #Relais 5 and 6
rback = self._omny_interferometer_switch_put_and_receive("!0020030\r")
# if "|0030\r" != self._omny_interferometer_switch_put_and_receive("!0020030\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 4: # Relais 7 and 8
elif channel == 4: #Relais 7 and 8
rback = self._omny_interferometer_switch_put_and_receive("!00200C0\r")
# if "|00C0\r" != self._omny_interferometer_switch_put_and_receive("!00200C0\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 5: # Relais 9 and 10
elif channel == 5: #Relais 9 and 10
rback = self._omny_interferometer_switch_put_and_receive("!0020300\r")
# if "|0300\r" != self._omny_interferometer_switch_put_and_receive("!0020300\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 6: # Relais 11 and 12
elif channel == 6: #Relais 11 and 12
rback = self._omny_interferometer_switch_put_and_receive("!0020C00\r")
# if "|0C00\r" != self._omny_interferometer_switch_put_and_receive("!0020C00\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 7: # Relais 13 and 14
elif channel == 7: #Relais 13 and 14
rback = self._omny_interferometer_switch_put_and_receive("!0023000\r")
# if "|3000\r" != self._omny_interferometer_switch_put_and_receive("!0023000\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 8: # Relais 7 and 8 SPECIAL CASE use osafzp y signal to align osa y
elif channel == 8: #Relais 7 and 8 SPECIAL CASE use osafzp y signal to align osa y
rback = self._omny_interferometer_switch_put_and_receive("!00200C0\r")
# if "|00C0\r" != self._omny_interferometer_switch_put_and_receive("!00200C0\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 9: # Relais 15 and 16
elif channel == 9: #Relais 15 and 16
rback = self._omny_interferometer_switch_put_and_receive("!002C000\r")
# if "|C000\r" != self._omny_interferometer_switch_put_and_receive("!002C000\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
@@ -821,14 +785,14 @@ class RtOMNYController(Controller):
self._omny_interferometer_switch_put_and_receive("!00S01\r")
def _omny_interferometer_switch_alloff(self):
# switch all off
#switch all off
self._omny_interferometer_switch_put_and_receive("!0020000\r")
# LED OFF
#LED OFF
time.sleep(0.1)
self._omny_interferometer_switch_put_and_receive("!00S00\r")
self._omny_interferometer_switch_put_and_receive("!00S00\r")
time.sleep(0.1)
alloff = self._omny_interferometer_switch_put_and_receive("?002\r")
# check all off
alloff = self._omny_interferometer_switch_put_and_receive("?002\r")
#check all off
if "00" not in alloff:
raise RtOMNY_mirror_switchbox_Error("Not all channels switched off.")
@@ -836,16 +800,17 @@ class RtOMNYController(Controller):
self.socket_put("J3")
def _omny_interferometer_get_signalsample(self, channel, averaging_duration):
# channel is string, eg ssi_1
# ensure no averaging running currently
#channel is string, eg ssi_1
#ensure no averaging running currently
self.feedback_is_running()
# measure first sample
#measure first sample
self._rt_start_averaging_SSI()
time.sleep(averaging_duration)
self.feedback_is_running()
return self.ssi[channel]
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[16])
self.average_stdeviations_y_st_fzp += float(return_table[18])
@@ -866,6 +831,7 @@ class RtOMNYController(Controller):
"stdev_x_st_fzp": {"value": float(return_table[16])},
"average_y_st_fzp": {"value": float(return_table[17])},
"stdev_y_st_fzp": {"value": float(return_table[18])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
@@ -874,7 +840,7 @@ class RtOMNYController(Controller):
},
}
return signals
@threadlocked
def start_scan(self):
if not self.feedback_is_running():
@@ -896,6 +862,7 @@ class RtOMNYController(Controller):
# start a point-by-point scan (for cont scan in flomni it would be "sa")
self.socket_put_and_receive("sd")
@retry_once
@threadlocked
def get_scan_status(self):
@@ -914,6 +881,7 @@ class RtOMNYController(Controller):
current_position_in_scan = int(float(return_table[2]))
return (mode, number_of_positions_planned, current_position_in_scan)
def get_device_manager(self):
for axis in self._axis:
if hasattr(axis, "device_manager") and axis.device_manager:
@@ -985,9 +953,8 @@ class RtOMNYController(Controller):
"OMNY statistics: Average of all standard deviations [nm]: x"
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}.",
scope="",
show_asap=True,
)
scope="", show_asap=True)
def publish_device_data(self, signals, point_id):
self.get_device_manager().connector.set_and_publish(
@@ -1101,7 +1068,7 @@ class RtOMNYMotor(Device, PositionerBase):
self.axis_Id = axis_Id
self.sign = sign
self.controller = RtOMNYController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.device_manager = device_manager
@@ -1129,10 +1096,6 @@ class RtOMNYMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 10, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@@ -1219,6 +1182,7 @@ class RtOMNYMotor(Device, PositionerBase):
return status
@property
def axis_Id(self):
return self._axis_Id_alpha
@@ -1263,7 +1227,7 @@ class RtOMNYMotor(Device, PositionerBase):
if __name__ == "__main__":
rtcontroller = RtOMNYController(
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222
)
rtcontroller.on()
rtcontroller.laser_tracker_on()

View File

@@ -93,7 +93,6 @@ class SmaractController(Controller):
socket_cls=None,
socket_host=None,
socket_port=None,
device_manager=None,
attr_name="",
labels=None,
):
@@ -103,7 +102,6 @@ class SmaractController(Controller):
socket_cls=socket_cls,
socket_host=socket_host,
socket_port=socket_port,
device_manager=device_manager,
attr_name=attr_name,
parent=parent,
labels=labels,

View File

@@ -123,11 +123,10 @@ class SmaractMotor(Device, PositionerBase):
limits=None,
sign=1,
socket_cls=SocketIO,
device_manager=None,
**kwargs,
):
self.controller = SmaractController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.axis_Id = axis_Id
self.sign = sign

View File

@@ -26,12 +26,12 @@ import numpy as np
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import AsyncFlyScanBase
from bec_server.scan_server.scans import SyncFlyScanBase
logger = bec_logger.logger
class FlomniFermatScan(AsyncFlyScanBase):
class FlomniFermatScan(SyncFlyScanBase):
scan_name = "flomni_fermat_scan"
scan_type = "fly"
required_kwargs = ["fovx", "fovy", "exp_time", "step", "angle"]
@@ -97,7 +97,6 @@ class FlomniFermatScan(AsyncFlyScanBase):
logger.warning("The zshift is smaller than -100 um. It will be limited to -100 um.")
self.zshift = -100
self.flomni_rotation_status = None
self.flyer_num_pos = 0
def initialize(self):
self.scan_motors = []
@@ -108,6 +107,10 @@ class FlomniFermatScan(AsyncFlyScanBase):
self.positions, corridor_size=self.optim_trajectory_corridor
)
@property
def monitor_sync(self):
return "rt_flomni"
def reverse_trajectory(self):
"""
Reverse the trajectory. Every other scan should be reversed to
@@ -132,13 +135,13 @@ class FlomniFermatScan(AsyncFlyScanBase):
if flip_axes:
self.positions = np.flipud(self.positions)
self.flyer_num_pos = len(self.positions)
self.num_pos = len(self.positions)
self._check_min_positions()
def _check_min_positions(self):
if self.flyer_num_pos < 20:
if self.num_pos < 20:
raise ScanAbortion(
f"The number of positions must exceed 20. Currently: {self.flyer_num_pos}."
f"The number of positions must exceed 20. Currently: {self.num_pos}."
)
def _prepare_setup(self):
@@ -147,8 +150,6 @@ class FlomniFermatScan(AsyncFlyScanBase):
yield from self.stubs.send_rpc_and_wait("rty", "set", self.positions[0][1])
yield from self.stubs.scan_report_instruction({"device_progress": ["rt_flyer"]})
def _prepare_setup_part2(self):
if self.flomni_rotation_status:
self.flomni_rotation_status.wait()
@@ -168,16 +169,12 @@ class FlomniFermatScan(AsyncFlyScanBase):
tracker_signal_status = yield from self.stubs.send_rpc_and_wait(
"rtx", "controller.laser_tracker_check_signalstrength"
)
# self.device_manager.connector.send_client_info(tracker_signal_status)
#self.device_manager.connector.send_client_info(tracker_signal_status)
if tracker_signal_status == "low":
self.device_manager.connector.raise_alarm(
severity=0,
alarm_type="LaserTrackerSignalStrength",
source={
"device": "rtx",
"reason": "low signal strength",
"method": "_prepare_setup_part2",
},
source={"device": "rtx", "reason": "low signal strength", "method": "_prepare_setup_part2"},
metadata={},
msg="Signal strength of the laser tracker is low, sufficient to continue. Realignment recommended!",
)
@@ -279,13 +276,26 @@ class FlomniFermatScan(AsyncFlyScanBase):
return np.array(positions)
def scan_core(self):
status = yield from self.stubs.kickoff(device="rt_flyer", wait=False)
while not status.done:
yield from self.stubs.read(group="monitored", point_id=self.point_id)
# use a device message to receive the scan number and
# scan ID before sending the message to the device server
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read(group="monitored")
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
request_id = status.metadata.get("RID")
if status_id == 0 and self.metadata.get("RID") == request_id:
break
if status_id == 2 and self.metadata.get("RID") == request_id:
raise ScanAbortion(
"An error occured during the flomni readout:"
f" {status.metadata.get('error')}"
)
time.sleep(1)
self.point_id += 1
logger.debug("reading monitors")
self.num_pos = self.point_id
# yield from self.device_rpc("rtx", "controller.kickoff")
def move_to_start(self):
"""return to the start position"""

View File

@@ -7,15 +7,10 @@ from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGal
@pytest.fixture
def fsamroy(dm_with_devices):
def fsamroy():
FuprGalilController._reset_controller()
fsamroy_motor = FuprGalilMotor(
"A",
name="fsamroy",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm_with_devices,
"A", name="fsamroy", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
)
fsamroy_motor.controller.on()
assert isinstance(fsamroy_motor.controller, FuprGalilController)

View File

@@ -8,15 +8,10 @@ from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, Lamn
@pytest.fixture(scope="function")
def leyey(dm_with_devices):
def leyey():
LamniGalilController._reset_controller()
leyey_motor = LamniGalilMotor(
"H",
name="leyey",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm_with_devices,
"H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
)
leyey_motor.controller.on()
yield leyey_motor
@@ -25,15 +20,10 @@ def leyey(dm_with_devices):
@pytest.fixture(scope="function")
def leyex(dm_with_devices):
def leyex():
LamniGalilController._reset_controller()
leyex_motor = LamniGalilMotor(
"A",
name="leyey",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm_with_devices,
"A", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
)
leyex_motor.controller.on()
yield leyex_motor

View File

@@ -7,15 +7,10 @@ from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, Flo
@pytest.fixture(scope="function")
def leyey(dm_with_devices):
def leyey():
FlomniGalilController._reset_controller()
leyey_motor = FlomniGalilMotor(
"H",
name="leyey",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm_with_devices,
"H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
)
leyey_motor.controller.on()
yield leyey_motor
@@ -24,15 +19,10 @@ def leyey(dm_with_devices):
@pytest.fixture(scope="function")
def leyex(dm_with_devices):
def leyex():
FlomniGalilController._reset_controller()
leyex_motor = FlomniGalilMotor(
"H",
name="leyey",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm_with_devices,
"H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
)
leyex_motor.controller.on()
yield leyex_motor
@@ -167,16 +157,11 @@ def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages
],
)
def test_fosaz_light_curtain_is_triggered(
axis_Id, socket_put_messages, socket_get_messages, triggered, dm_with_devices
axis_Id, socket_put_messages, socket_get_messages, triggered
):
"""test that the light curtain is triggered"""
fosaz = FlomniGalilMotor(
axis_Id,
name="fosaz",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm_with_devices,
axis_Id, name="fosaz", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
)
fosaz.controller.on()
fosaz.controller.sock.flush_buffer()

View File

@@ -16,10 +16,7 @@ def controller():
"""
with mock.patch("ophyd_devices.utils.socket.SocketIO") as socket_cls:
controller = NPointController(
socket_cls=socket_cls,
socket_host="localhost",
socket_port=1234,
device_manager=mock.MagicMock(),
socket_cls=socket_cls, socket_host="localhost", socket_port=1234
)
controller.on()
controller.sock.reset_mock()
@@ -28,18 +25,13 @@ def controller():
@pytest.fixture
def npointx(dm_with_devices):
def npointx():
"""
Fixture to create a NPointAxis object.
"""
controller = mock.MagicMock()
npointx = NPointAxis(
axis_Id="A",
name="npointx",
host="localhost",
port=1234,
socket_cls=controller,
device_manager=dm_with_devices,
axis_Id="A", name="npointx", host="localhost", port=1234, socket_cls=controller
)
npointx.controller.on()
npointx.controller.sock.reset_mock()
@@ -115,18 +107,13 @@ def test_axis_get_in(npointx, axis, msg_in, msg_out):
npointx.controller.sock.put.assert_called_once_with(msg_in)
def test_axis_out_of_range(dm_with_devices):
def test_axis_out_of_range(controller):
"""
Test that an error is raised when trying to create an NPointAxis object with an invalid axis ID.
"""
with pytest.raises(ValueError):
npointx = NPointAxis(
axis_Id="G",
name="npointx",
host="localhost",
port=1234,
socket_cls=mock.MagicMock(),
device_manager=dm_with_devices,
axis_Id="G", name="npointx", host="localhost", port=1234, socket_cls=mock.MagicMock()
)

View File

@@ -11,11 +11,7 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import RtError
def rt_flomni():
RtFlomniController._reset_controller()
rt_flomni = RtFlomniController(
name="rt_flomni",
socket_cls=SocketMock,
socket_host="localhost",
socket_port=8081,
device_manager=mock.MagicMock(),
name="rt_flomni", socket_cls=SocketMock, socket_host="localhost", socket_port=8081
)
with mock.patch.object(rt_flomni, "get_device_manager"):
with mock.patch.object(rt_flomni, "sock"):
@@ -80,16 +76,16 @@ def test_move_samx_to_scan_region(rt_flomni):
def test_feedback_enable_without_reset(rt_flomni):
with mock.patch.object(rt_flomni, "set_device_read_write") as set_device_read_write:
with mock.patch.object(rt_flomni, "set_device_enabled") as set_device_enabled:
with mock.patch.object(rt_flomni, "feedback_is_running", return_value=True):
with mock.patch.object(rt_flomni, "laser_tracker_on") as laser_tracker_on:
rt_flomni.feedback_enable_without_reset()
laser_tracker_on.assert_called_once()
assert mock.call(b"l3\n") in rt_flomni.sock.put.mock_calls
assert mock.call("fsamx", False) in set_device_read_write.mock_calls
assert mock.call("fsamy", False) in set_device_read_write.mock_calls
assert mock.call("foptx", False) in set_device_read_write.mock_calls
assert mock.call("fopty", False) in set_device_read_write.mock_calls
assert mock.call("fsamx", False) in set_device_enabled.mock_calls
assert mock.call("fsamy", False) in set_device_enabled.mock_calls
assert mock.call("foptx", False) in set_device_enabled.mock_calls
assert mock.call("fopty", False) in set_device_enabled.mock_calls
def test_feedback_enable_without_reset_raises(rt_flomni):

View File

@@ -10,27 +10,19 @@ from csaxs_bec.devices.smaract.smaract_ophyd import SmaractMotor
@pytest.fixture
def controller(dm_with_devices):
def controller():
SmaractController._reset_controller()
controller = SmaractController(
socket_cls=SocketMock, socket_host="dummy", socket_port=123, device_manager=dm_with_devices
)
controller = SmaractController(socket_cls=SocketMock, socket_host="dummy", socket_port=123)
controller.on()
controller.sock.flush_buffer()
yield controller
@pytest.fixture
def lsmarA(dm_with_devices):
def lsmarA():
SmaractController._reset_controller()
motor_a = SmaractMotor(
"A",
name="lsmarA",
host="mpc2680.psi.ch",
port=8085,
sign=1,
socket_cls=SocketMock,
device_manager=dm_with_devices,
"A", name="lsmarA", host="mpc2680.psi.ch", port=8085, sign=1, socket_cls=SocketMock
)
motor_a.controller.on()
motor_a.controller.sock.flush_buffer()