feat: flomni stages

This commit is contained in:
2023-12-12 09:13:58 +01:00
committed by wakonig_k
parent 0ac0f1426c
commit 09a8b38883
13 changed files with 1839 additions and 48 deletions

View File

@ -1,9 +1,11 @@
from .eiger1p5m_csaxs.eiger1p5m import Eiger1p5MDetector from .eiger1p5m_csaxs.eiger1p5m import Eiger1p5MDetector
from .epics import * from .epics import *
from .galil.fgalil_ophyd import FlomniGalilMotor
from .galil.fupr_ophyd import FuprGalilMotor
from .galil.galil_ophyd import GalilMotor from .galil.galil_ophyd import GalilMotor
from .galil.sgalil_ophyd import SGalilMotor from .galil.sgalil_ophyd import SGalilMotor
from .npoint.npoint import NPointAxis from .npoint.npoint import NPointAxis
from .rt_lamni import RtLamniMotor from .rt_lamni import RtFlomniMotor, RtLamniMotor
from .sim.sim import ( from .sim.sim import (
SynAxisMonitor, SynAxisMonitor,
SynAxisOPAAS, SynAxisOPAAS,

View File

@ -4,6 +4,7 @@ from ophyd.quadem import QuadEM
from ophyd.sim import SynAxis, SynPeriodicSignal, SynSignal from ophyd.sim import SynAxis, SynPeriodicSignal, SynSignal
from .devices.delay_generator_csaxs import DelayGeneratorcSAXS from .devices.delay_generator_csaxs import DelayGeneratorcSAXS
from .devices.flomni_sample_storage import FlomniSampleStorage
from .devices.InsertionDevice import InsertionDevice from .devices.InsertionDevice import InsertionDevice
from .devices.slits import SlitH, SlitV from .devices.slits import SlitH, SlitV
from .devices.specMotors import ( from .devices.specMotors import (
@ -20,7 +21,7 @@ from .devices.specMotors import (
PmMonoBender, PmMonoBender,
) )
from .devices.SpmBase import SpmBase from .devices.SpmBase import SpmBase
from .devices.XbpmBase import XbpmBase, XbpmCsaxsOp
# X07MA specific devices # X07MA specific devices
from .devices.X07MADevices import * from .devices.X07MADevices import *
from .devices.XbpmBase import XbpmBase, XbpmCsaxsOp

View File

@ -1,7 +1,19 @@
from .slits import SlitH, SlitV # Standard ophyd classes
from .XbpmBase import XbpmBase, XbpmCsaxsOp from ophyd import EpicsMotor, EpicsSignal, EpicsSignalRO
from .SpmBase import SpmBase from ophyd.quadem import QuadEM
from ophyd.sim import SynAxis, SynPeriodicSignal, SynSignal
from .DelayGeneratorDG645 import DelayGeneratorDG645
from .eiger9m_csaxs import Eiger9McSAXS
# cSAXS
from .epics_motor_ex import EpicsMotorEx
from .falcon_csaxs import FalconcSAXS
from .flomni_sample_storage import FlomniSampleStorage
from .InsertionDevice import InsertionDevice from .InsertionDevice import InsertionDevice
from .mcs_csaxs import McsCsaxs
from .pilatus_csaxs import PilatuscSAXS
from .slits import SlitH, SlitV
from .specMotors import ( from .specMotors import (
Bpm4i, Bpm4i,
EnergyKev, EnergyKev,
@ -15,19 +27,5 @@ from .specMotors import (
PmDetectorRotation, PmDetectorRotation,
PmMonoBender, PmMonoBender,
) )
from .SpmBase import SpmBase
# Standard ophyd classes from .XbpmBase import XbpmBase, XbpmCsaxsOp
from ophyd import EpicsSignal, EpicsSignalRO, EpicsMotor
from ophyd.sim import SynAxis, SynSignal, SynPeriodicSignal
from ophyd.quadem import QuadEM
# cSAXS
from .epics_motor_ex import EpicsMotorEx
from .mcs_csaxs import MCScSAXS
from .psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from .eiger9m_csaxs import Eiger9McSAXS
from .pilatus_csaxs import PilatuscSAXS
from .falcon_csaxs import FalconcSAXS
from .delay_generator_csaxs import DelayGeneratorcSAXS
# from .psi_detector_base import PSIDetectorBase, CustomDetectorMixin

View File

@ -0,0 +1,82 @@
import time
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
class FlomniSampleStorageError(Exception):
pass
class FlomniSampleStorage(Device):
USER_ACCESS = [
"is_sample_slot_used",
"is_sample_in_gripper",
"set_sample_slot",
"unset_sample_slot",
"set_sample_in_gripper",
"unset_sample_in_gripper",
]
SUB_VALUE = "value"
_default_sub = SUB_VALUE
sample_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {}) for i in range(21)
}
sample_placed = Dcpt(sample_placed)
sample_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True})
for i in range(21)
}
sample_names = Dcpt(sample_names)
sample_in_gripper = Cpt(
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET"
)
sample_in_gripper_name = Cpt(
EpicsSignal,
name="sample_in_gripper_name",
read_pv="XOMNY-SAMPLE_DB_flomni100:GET.DESC",
string=True,
)
def __init__(self, prefix="", *, name, **kwargs):
super().__init__(prefix, name=name, **kwargs)
self.sample_placed.sample1.subscribe(self._emit_value)
def _emit_value(self, **kwargs):
timestamp = kwargs.pop("timestamp", time.time())
self.wait_for_connection()
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
def set_sample_slot(self, slot_nr: int, name: str) -> bool:
if slot_nr > 20:
raise FlomniSampleStorageError(f"Invalid slot number {slot_nr}.")
getattr(self.sample_placed, f"sample{slot_nr}").set(1)
getattr(self.sample_names, f"sample{slot_nr}").set(name)
def unset_sample_slot(self, slot_nr: int) -> bool:
if slot_nr > 20:
raise FlomniSampleStorageError(f"Invalid slot number {slot_nr}.")
getattr(self.sample_placed, f"sample{slot_nr}").set(0)
getattr(self.sample_names, f"sample{slot_nr}").set("-")
def set_sample_in_gripper(self, name: str) -> bool:
self.sample_in_gripper.set(1)
self.sample_in_gripper_name.set(name)
def unset_sample_in_gripper(self) -> bool:
self.sample_in_gripper.set(0)
self.sample_in_gripper_name.set("-")
def is_sample_slot_used(self, slot_nr: int) -> bool:
val = getattr(self.sample_placed, f"sample{slot_nr}").get()
return bool(val)
def is_sample_in_gripper(self) -> bool:
val = self.sample_in_gripper.get()
return bool(val)

View File

@ -9,8 +9,6 @@ from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError from ophyd.utils import LimitError, ReadOnlyError
from prettytable import PrettyTable
from ophyd_devices.galil.galil_ophyd import ( from ophyd_devices.galil.galil_ophyd import (
BECConfigError, BECConfigError,
GalilAxesReferenced, GalilAxesReferenced,
@ -21,15 +19,31 @@ from ophyd_devices.galil.galil_ophyd import (
GalilMotorResolution, GalilMotorResolution,
GalilReadbackSignal, GalilReadbackSignal,
GalilSetpointSignal, GalilSetpointSignal,
GalilSignalRO,
retry_once, retry_once,
) )
from ophyd_devices.utils.controller import Controller, threadlocked from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable
logger = bec_logger.logger logger = bec_logger.logger
class FlomniGalilController(GalilController): class FlomniGalilController(GalilController):
USER_ACCESS = [
"describe",
"show_running_threads",
"galil_show_all",
"socket_put_and_receive",
"socket_put_confirmed",
"drive_axis_to_limit",
"find_reference",
"get_motor_limit_switch",
"fosaz_light_curtain_is_triggered",
"is_motor_on",
"all_axes_referenced",
]
def is_axis_moving(self, axis_Id, axis_Id_numeric) -> bool: def is_axis_moving(self, axis_Id, axis_Id_numeric) -> bool:
if axis_Id is None and axis_Id_numeric is not None: if axis_Id is None and axis_Id_numeric is not None:
axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric) axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric)
@ -41,13 +55,67 @@ class FlomniGalilController(GalilController):
# TODO: check if all axes are referenced in all controllers # TODO: check if all axes are referenced in all controllers
return super().all_axes_referenced() return super().all_axes_referenced()
def fosaz_light_curtain_is_triggered(self) -> bool:
"""
Check the light curtain status for fosaz
class FlomniGalilReadbackSignal(GalilReadbackSignal): Returns:
pass bool: True if the light curtain is triggered
"""
return int(float(self.socket_put_and_receive("MG @IN[14]").strip())) == 1
class FlomniGalilReadbackSignal(GalilSignalRO):
@retry_once
@threadlocked
def _socket_get(self) -> float:
"""Get command for the readback signal
Returns:
float: Readback value after adjusting for sign and motor resolution.
"""
current_pos = float(self.controller.socket_put_and_receive(f"TD{self.parent.axis_Id}"))
current_pos *= self.parent.sign
step_mm = self.parent.motor_resolution.get()
return current_pos / step_mm
def read(self):
self._metadata["timestamp"] = time.time()
val = super().read()
return val
class FlomniGalilSetpointSignal(GalilSetpointSignal): class FlomniGalilSetpointSignal(GalilSetpointSignal):
pass @retry_once
@threadlocked
def _socket_set(self, val: float) -> None:
"""Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign.
Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted.
Args:
val (float): Target value / setpoint value
Raises:
GalilError: Raised if not all axes are referenced.
"""
target_val = val * self.parent.sign
self.setpoint = target_val
axes_referenced = self.controller.all_axes_referenced()
if axes_referenced:
while self.controller.is_thread_active(0):
time.sleep(0.1)
self.controller.socket_put_confirmed(f"naxis={self.parent.axis_Id_numeric}")
self.controller.socket_put_confirmed(f"ntarget={target_val:.3f}")
self.controller.socket_put_confirmed("movereq=1")
self.controller.socket_put_confirmed("XQ#NEWPAR")
while self.controller.is_thread_active(0):
time.sleep(0.005)
else:
raise GalilError("Not all axes are referenced.")
class FlomniGalilMotorResolution(GalilMotorResolution): class FlomniGalilMotorResolution(GalilMotorResolution):
@ -65,7 +133,7 @@ class FlomniGalilAxesReferenced(GalilAxesReferenced):
class FlomniGalilMotor(Device, PositionerBase): class FlomniGalilMotor(Device, PositionerBase):
USER_ACCESS = ["controller"] USER_ACCESS = ["controller"]
readback = Cpt( readback = Cpt(
GalilReadbackSignal, FlomniGalilReadbackSignal,
signal_name="readback", signal_name="readback",
kind="hinted", kind="hinted",
) )
@ -115,6 +183,7 @@ class FlomniGalilMotor(Device, PositionerBase):
"device_mapping has been specified but the device_manager cannot be accessed." "device_mapping has been specified but the device_manager cannot be accessed."
) )
self.rt = self.device_mapping.get("rt") self.rt = self.device_mapping.get("rt")
self.pid_x_correction = 0
super().__init__( super().__init__(
prefix, prefix,

View File

@ -63,6 +63,18 @@ class FuprGalilReadbackSignal(GalilReadbackSignal):
step_mm = self.parent.motor_resolution.get() step_mm = self.parent.motor_resolution.get()
return current_pos / step_mm return current_pos / step_mm
def read(self):
self._metadata["timestamp"] = time.time()
val = super().read()
if self.parent.axis_Id_numeric == 0:
try:
rt = self.parent.device_manager.devices[self.parent.rt]
if rt.enabled:
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"])
except KeyError:
logger.warning("Failed to set RT value during readback.")
return val
class FuprGalilSetpointSignal(GalilSetpointSignal): class FuprGalilSetpointSignal(GalilSetpointSignal):
@retry_once @retry_once

View File

@ -52,6 +52,11 @@ class GalilController(Controller):
"socket_put_and_receive", "socket_put_and_receive",
"socket_put_confirmed", "socket_put_confirmed",
"lgalil_is_air_off_and_orchestra_enabled", "lgalil_is_air_off_and_orchestra_enabled",
"drive_axis_to_limit",
"find_reference",
"get_motor_limit_switch",
"is_motor_on",
"all_axes_referenced",
] ]
@threadlocked @threadlocked
@ -143,10 +148,11 @@ class GalilController(Controller):
self.socket_put_confirmed(f"naxis={axis_Id_numeric}") self.socket_put_confirmed(f"naxis={axis_Id_numeric}")
self.socket_put_confirmed(f"ndir={direction_flag}") self.socket_put_confirmed(f"ndir={direction_flag}")
self.socket_put_confirmed("XQ#NEWPAR") self.socket_put_confirmed("XQ#NEWPAR")
time.sleep(0.005)
self.socket_put_confirmed("XQ#FES") self.socket_put_confirmed("XQ#FES")
time.sleep(0.1) time.sleep(0.01)
while self.is_axis_moving(None, axis_Id_numeric): while self.is_axis_moving(None, axis_Id_numeric):
time.sleep(0.1) time.sleep(0.01)
axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric) axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric)
# check if we actually hit the limit # check if we actually hit the limit

View File

@ -1 +1,2 @@
from .rt_lamni_ophyd import RtLamniMotor, RtLamniController from .rt_flomni_ophyd import RtFlomniController, RtFlomniMotor
from .rt_lamni_ophyd import RtLamniController, RtLamniMotor

View File

@ -0,0 +1,749 @@
import functools
import threading
import time
from typing import List
import numpy as np
from bec_lib import MessageEndpoints, bec_logger, messages
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.rt_lamni.rt_ophyd import (
BECConfigError,
RtCommunicationError,
RtController,
RtError,
RtReadbackSignal,
RtSetpointSignal,
RtSignalRO,
retry_once,
)
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable
logger = bec_logger.logger
class RtFlomniController(RtController):
USER_ACCESS = [
"socket_put_and_receive",
"set_rotation_angle",
"feedback_disable",
"feedback_enable_without_reset",
"feedback_enable_with_reset",
"add_pos_to_scan",
"clear_trajectory_generator",
"show_cyclic_error_compensation",
"laser_tracker_on",
"laser_tracker_off",
"laser_tracker_show_all",
]
def __init__(
self,
*,
name=None,
socket_cls=None,
socket_host=None,
socket_port=None,
attr_name="",
parent=None,
labels=None,
kind=None,
):
super().__init__(
name=name,
socket_cls=socket_cls,
socket_host=socket_host,
socket_port=socket_port,
attr_name=attr_name,
parent=parent,
labels=labels,
kind=kind,
)
self.tracker_info = {}
self._min_scan_buffer_reached = False
self.rt_pid_voltage = None
def add_pos_to_scan(self, positions) -> None:
def send_positions(parent, positions):
parent._min_scan_buffer_reached = False
start_time = time.time()
for pos_index, pos in enumerate(positions):
parent.socket_put_and_receive(f"s{pos[0]},{pos[1]},{pos[2]}")
if pos_index > 100:
parent._min_scan_buffer_reached = True
parent._min_scan_buffer_reached = True
logger.info(
f"Sending {len(positions)} positions took {time.time()-start_time} seconds."
)
threading.Thread(target=send_positions, args=(self, positions), daemon=True).start()
def move_to_zero(self):
self.socket_put("pa0,0")
self.get_axis_by_name("rtx").user_setpoint.setpoint = 0
self.socket_put("pa1,0")
self.get_axis_by_name("rty").user_setpoint.setpoint = 0
self.socket_put("pa2,0")
self.get_axis_by_name("rtz").user_setpoint.setpoint = 0
time.sleep(0.05)
def feedback_is_running(self) -> bool:
status = int(float(self.socket_put_and_receive("l2").strip()))
if status == 1:
return False
return True
def feedback_enable_with_reset(self):
self.socket_put("l0") # disable feedback
self.move_to_zero()
if not self.slew_rate_limiters_on_target() or np.abs(self.pid_y()) > 0.1:
logger.info("Please wait, slew rate limiters not on target.")
while not self.slew_rate_limiters_on_target() or np.abs(self.pid_y()) > 0.1:
time.sleep(0.05)
self.get_device_manager().devices.rty.update_user_parameter({"tomo_additional_offsety": 0})
self.clear_trajectory_generator()
self.laser_tracker_on()
# move to 0. FUPR will set the rotation angle during readout
self.get_device_manager().devices.fsamroy.obj.move(0, wait=True)
fsamx = self.get_device_manager().devices.fsamx
fsamx.obj.pid_x_correction = 0
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]")
fsamx_in = fsamx.user_parameter.get("in")
if not np.isclose(fsamx.obj.readback.get(), fsamx_in, atol=0.3):
raise RtError(
"Something is wrong. fsamx is very far from the samx_in position. Don't dare correct automatically."
)
if not np.isclose(fsamx.obj.readback.get(), fsamx_in, atol=0.01):
fsamx.enabled_set = True
fsamx.obj.move(fsamx_in, wait=True)
fsamx.enabled_set = False
time.sleep(1)
self.socket_put("l1")
time.sleep(0.4)
if not self.feedback_is_running():
raise RtError("Feedback is not running; likely an error in the interferometer.")
time.sleep(1.5)
self.show_cyclic_error_compensation()
self.rt_pid_voltage = self.get_pid_x()
rtx = self.get_device_manager().devices.rtx
rtx.update_user_parameter({"rt_pid_voltage": self.rt_pid_voltage})
self.set_device_enabled("fsamx", False)
self.set_device_enabled("fsamy", False)
self.set_device_enabled("foptx", False)
self.set_device_enabled("fopty", False)
def move_samx_to_scan_region(self, fovx: float, cenx: float):
if self.rt_pid_voltage is None:
rtx = self.get_device_manager().devices.rtx
self.rt_pid_voltage = rtx.user_parameter.get("rt_pid_voltage")
if self.rt_pid_voltage is None:
raise RtError(
"rt_pid_voltage not set in rtx user parameters. Please run feedback_enable_with_reset first."
)
logger.info(f"Using PID voltage from rtx user parameter: {self.rt_pid_voltage}")
expected_voltage = self.rt_pid_voltage + fovx / 2 * 7 / 100
logger.info(f"Expected PID voltage: {expected_voltage}")
logger.info(f"Current PID voltage: {self.get_pid_x()}")
wait_on_exit = False
while True:
if np.abs(self.get_pid_x() - expected_voltage) < 1:
break
wait_on_exit = True
self.socket_put("v0")
fsamx = self.get_device_manager().devices.fsamx
fsamx.enabled_set = True
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]")
fsamx.obj.pid_x_correction -= (self.get_pid_x() - expected_voltage) * 0.01
logger.info(f"Correcting fsamx by {fsamx.obj.pid_x_correction}")
fsamx_in = fsamx.user_parameter.get("in")
fsamx.obj.move(fsamx_in + cenx / 1000 + fsamx.obj.pid_x_correction, wait=True)
fsamx.enabled_set = False
time.sleep(0.1)
self.laser_tracker_on()
time.sleep(0.01)
if wait_on_exit:
time.sleep(1)
self.socket_put("v1")
@threadlocked
def clear_trajectory_generator(self):
self.socket_put("sc")
logger.info("flomni scan stopped and deleted, moving to start position")
def feedback_enable_without_reset(self):
self.laser_tracker_on()
self.socket_put("l3")
time.sleep(0.01)
if not self.feedback_is_running():
raise RtError("Feedback is not running; likely an error in the interferometer.")
self.set_device_enabled("fsamx", False)
self.set_device_enabled("fsamy", False)
self.set_device_enabled("foptx", False)
self.set_device_enabled("fopty", False)
def feedback_disable(self):
self.clear_trajectory_generator()
self.move_to_zero()
self.socket_put("l0")
self.set_device_enabled("fsamx", True)
self.set_device_enabled("fsamy", True)
self.set_device_enabled("foptx", True)
self.set_device_enabled("fopty", True)
fsamx = self.get_device_manager().devices.fsamx
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=025*stppermm[4]")
def get_pid_x(self) -> float:
voltage = float(self.socket_put_and_receive("g").strip())
return voltage
def show_cyclic_error_compensation(self):
cec0 = int(float(self.socket_put_and_receive("w0").strip()))
cec1 = int(float(self.socket_put_and_receive("w0").strip()))
if cec0 == 32:
logger.info("Cyclic Error Compensation: y-axis is initialized")
else:
logger.info("Cyclic Error Compensation: y-axis is NOT initialized")
if cec1 == 32:
logger.info("Cyclic Error Compensation: x-axis is initialized")
else:
logger.info("Cyclic Error Compensation: y-axis is NOT initialized")
def set_rotation_angle(self, val: float) -> None:
self.socket_put(f"a{(-val)/180*np.pi}")
def laser_tracker_on(self):
self.laser_update_tracker_info()
if not self.tracker_info["enabled_z"] or not self.tracker_info["enabled_y"]:
logger.info("Enabling the laser tracker. Please wait...")
tracker_intensity = self.tracker_info["tracker_intensity"]
if (
tracker_intensity < self.tracker_info["threshold_intensity_y"]
or tracker_intensity < self.tracker_info["threshold_intensity_z"]
):
logger.info(self.tracker_info)
raise RtError("The tracker cannot be enabled because the beam intensity it low.")
self.move_to_zero()
self.socket_put("T1")
time.sleep(0.5)
self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed(
"trackyct=0"
)
self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed(
"trackzct=0"
)
self.laser_tracker_wait_on_target()
logger.info("Laser tracker running!")
def laser_tracker_off(self):
self.socket_put("T0")
def laser_tracker_show_all(self):
self.laser_update_tracker_info()
t = PrettyTable()
t.title = f"Laser Tracker Info"
t.field_names = ["Name", "Value"]
for key, val in self.tracker_info.items():
t.add_row([key, val])
print(t)
def laser_update_tracker_info(self):
ret = self.socket_put_and_receive("Ts")
# remove trailing \n
ret = ret.split("\n")[0]
tracker_values = [float(val) for val in ret.split(",")]
self.tracker_info = {
"tracker_intensity": tracker_values[2],
"threshold_intensity_y": tracker_values[8],
"enabled_y": bool(tracker_values[10]),
"beampos_y": tracker_values[5],
"target_y": tracker_values[6],
"piezo_voltage_y": tracker_values[9],
"threshold_intensity_z": tracker_values[3],
"enabled_z": bool(tracker_values[10]),
"beampos_z": tracker_values[0],
"target_z": tracker_values[1],
"piezo_voltage_z": tracker_values[4],
}
def laser_tracker_galil_enable(self):
self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed("tracken=1")
self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed("trackyct=0")
self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed("trackzct=0")
self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed("XQ#Tracker")
def laser_tracker_on_target(self) -> bool:
self.laser_update_tracker_info()
if np.isclose(
self.tracker_info["beampos_y"], self.tracker_info["target_y"], atol=0.02
) and np.isclose(self.tracker_info["beampos_z"], self.tracker_info["target_z"], atol=0.02):
return True
return False
def laser_tracker_wait_on_target(self):
max_repeat = 25
count = 0
while not self.laser_tracker_on_target():
self.laser_tracker_galil_enable()
logger.info("Waiting for laser tracker to reach target.")
time.sleep(0.5)
count += 1
if count > max_repeat:
raise RtError("Failed to reach laser target position.")
def slew_rate_limiters_on_target(self) -> bool:
ret = int(float(self.socket_put_and_receive("y").strip()))
if ret == 3:
return True
return False
def pid_y(self) -> float:
ret = float(self.socket_put_and_receive("G").strip())
return ret
def read_ssi_interferometer(self, axis_number):
val = float(self.socket_put_and_receive(f"j{axis_number}").strip())
return val
def show_signal_strength_interferometer(self):
t = PrettyTable()
t.title = f"Interferometer signal strength"
t.field_names = ["Axis", "Value"]
for i in range(3):
t.add_row([i, self.read_ssi_interferometer(i)])
print(t)
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[4])
self.average_stdeviations_y_st_fzp += float(return_table[7])
signals = {
"target_x": {"value": float(return_table[2])},
"average_x_st_fzp": {"value": float(return_table[3])},
"stdev_x_st_fzp": {"value": float(return_table[4])},
"target_y": {"value": float(return_table[5])},
"average_y_st_fzp": {"value": float(return_table[6])},
"stdev_y_st_fzp": {"value": float(return_table[7])},
"average_rotz": {"value": float(return_table[8])},
"stdev_rotz": {"value": float(return_table[9])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
}
return signals
@threadlocked
def start_scan(self):
if not self.feedback_is_running():
logger.error(
"Cannot start scan because feedback loop is not running or there is an interferometer error."
)
raise RtError(
"Cannot start scan because feedback loop is not running or there is an interferometer error."
)
# here exception
(mode, number_of_positions_planned, current_position_in_scan) = self.get_scan_status()
if number_of_positions_planned == 0:
logger.error("Cannot start scan because no target positions are planned.")
raise RtError("Cannot start scan because no target positions are planned.")
# hier exception
# start a point-by-point scan (for cont scan in flomni it would be "sa")
self.socket_put_and_receive("sd")
@retry_once
@threadlocked
def get_scan_status(self):
return_table = (self.socket_put_and_receive("sr")).split(",")
if len(return_table) != 3:
raise RtCommunicationError(
f"Expected to receive 3 return values. Instead received {return_table}"
)
mode = int(float(return_table[0]))
# mode 0: direct positioning
# mode 1: running internal timer (not tested/used anymore)
# mode 2: rt point scan running
# mode 3: rt point scan starting
# mode 5/6: rt continuous scanning (not available in LamNI)
number_of_positions_planned = int(float(return_table[1]))
current_position_in_scan = int(float(return_table[2]))
return (mode, number_of_positions_planned, current_position_in_scan)
def get_device_manager(self):
for axis in self._axis:
if hasattr(axis, "device_manager") and axis.device_manager:
return axis.device_manager
raise BECConfigError("Could not access the device_manager")
def read_positions_from_sampler(self):
# this was for reading after the scan completed
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
# if not (mode==2 or mode==3):
# error
self.get_device_manager().producer.set_and_publish(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=1, metadata=self.readout_metadata
).dumps(),
)
# while scan is running
while mode > 0:
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
time.sleep(0.01)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
# logger.info(f"{return_table}")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, pointID=int(return_table[0]))
time.sleep(0.05)
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
# logger.info(f"{return_table}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, pointID=int(return_table[0]))
self.get_device_manager().producer.set_and_publish(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=0, metadata=self.readout_metadata
).dumps(),
)
logger.info(
f"Flomni statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp/number_of_samples_to_read}, y {self.average_stdeviations_y_st_fzp/number_of_samples_to_read}."
)
def publish_device_data(self, signals, pointID):
self.get_device_manager().producer.send(
MessageEndpoints.device_read("rt_flomni"),
messages.DeviceMessage(
signals=signals,
metadata={"pointID": pointID, **self.readout_metadata},
).dumps(),
)
def start_readout(self):
readout = threading.Thread(target=self.read_positions_from_sampler)
readout.start()
def kickoff(self, metadata):
self.readout_metadata = metadata
while not self._min_scan_buffer_reached:
time.sleep(0.001)
self.start_scan()
time.sleep(0.1)
self.start_readout()
class RtFlomniReadbackSignal(RtReadbackSignal):
@retry_once
@threadlocked
def _socket_get(self) -> float:
"""Get command for the readback signal
Returns:
float: Readback value after adjusting for sign and motor resolution.
"""
time.sleep(0.1)
return_table = (self.controller.socket_put_and_receive(f"pr")).split(",")
current_pos = float(return_table[self.parent.axis_Id_numeric])
current_pos *= self.parent.sign
self.parent.user_setpoint.setpoint = current_pos
return current_pos
class RtFlomniSetpointSignal(RtSetpointSignal):
setpoint = 0
@retry_once
@threadlocked
def _socket_set(self, val: float) -> None:
"""Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign.
Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted.
Args:
val (float): Target value / setpoint value
Raises:
RtError: Raised if interferometer feedback is disabled.
"""
if not self.parent.controller.feedback_is_running():
raise RtError(
"The interferometer feedback is not running. Either it is turned off or and interferometer error occured."
)
self.set_with_feedback_disabled(val)
def set_with_feedback_disabled(self, val):
target_val = val * self.parent.sign
self.setpoint = target_val
self.controller.socket_put(f"pa{self.parent.axis_Id_numeric},{target_val:.4f}")
class RtFlomniFeedbackRunning(RtSignalRO):
@threadlocked
def _socket_get(self):
return int(self.parent.controller.feedback_is_running())
class RtFlomniMotor(Device, PositionerBase):
USER_ACCESS = ["controller"]
readback = Cpt(
RtFlomniReadbackSignal,
signal_name="readback",
kind="hinted",
)
user_setpoint = Cpt(RtFlomniSetpointSignal, signal_name="setpoint")
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
SUB_READBACK = "readback"
SUB_CONNECTION_CHANGE = "connection_change"
_default_sub = SUB_READBACK
def __init__(
self,
axis_Id,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2844.psi.ch",
port=2222,
sign=1,
socket_cls=SocketIO,
device_manager=None,
limits=None,
**kwargs,
):
self.axis_Id = axis_Id
self.sign = sign
self.controller = RtFlomniController(
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.device_manager = device_manager
self.tolerance = kwargs.pop("tolerance", 0.5)
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.readback.name = self.name
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
self._update_connection_state()
# self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE)
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@property
def low_limit(self):
return self.limits[0]
@property
def high_limit(self):
return self.limits[1]
def check_value(self, pos):
"""Check that the position is within the soft limits"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not (low_limit <= pos <= high_limit):
raise LimitError(f"position={pos} not within limits {self.limits}")
def _update_connection_state(self, **kwargs):
for walk in self.walk_signals():
walk.item._metadata["connected"] = self.controller.connected
def _forward_readback(self, **kwargs):
kwargs.pop("sub_type")
self._run_subs(sub_type="readback", **kwargs)
@raise_if_disconnected
def move(self, position, wait=True, **kwargs):
"""Move to a specified position, optionally waiting for motion to
complete.
Parameters
----------
position
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.
Returns
-------
status : MoveStatus
Raises
------
TimeoutError
When motion takes longer than `timeout`
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out
"""
self._started_moving = False
timeout = kwargs.pop("timeout", 100)
status = super().move(position, timeout=timeout, **kwargs)
self.user_setpoint.put(position, wait=False)
def move_and_finish():
while not self.controller.slew_rate_limiters_on_target():
print("motor is moving")
val = self.readback.read()
self._run_subs(
sub_type=self.SUB_READBACK,
value=val,
timestamp=time.time(),
)
time.sleep(0.01)
print("Move finished")
self._done_moving()
threading.Thread(target=move_and_finish, daemon=True).start()
try:
if wait:
status_wait(status)
except KeyboardInterrupt:
self.stop()
raise
return status
@property
def axis_Id(self):
return self._axis_Id_alpha
@axis_Id.setter
def axis_Id(self, val):
if isinstance(val, str):
if len(val) != 1:
raise ValueError(f"Only single-character axis_Ids are supported.")
self._axis_Id_alpha = val
self._axis_Id_numeric = ord(val.lower()) - 97
else:
raise TypeError(f"Expected value of type str but received {type(val)}")
@property
def axis_Id_numeric(self):
return self._axis_Id_numeric
@axis_Id_numeric.setter
def axis_Id_numeric(self, val):
if isinstance(val, int):
if val > 26:
raise ValueError(f"Numeric value exceeds supported range.")
self._axis_Id_alpha = val
self._axis_Id_numeric = (chr(val + 97)).capitalize()
else:
raise TypeError(f"Expected value of type int but received {type(val)}")
def kickoff(self, metadata, **kwargs) -> None:
self.controller.kickoff(metadata)
@property
def egu(self):
"""The engineering units (EGU) for positions"""
return "um"
# how is this used later?
def stage(self) -> List[object]:
return super().stage()
def unstage(self) -> List[object]:
return super().unstage()
def stop(self, *, success=False):
self.controller.stop_all_axes()
return super().stop(success=success)
if __name__ == "__main__":
rtcontroller = RtFlomniController(
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222
)
rtcontroller.on()
rtcontroller.laser_tracker_on()

View File

@ -0,0 +1,831 @@
import functools
import threading
import time
from typing import List
import numpy as np
from bec_lib import MessageEndpoints, bec_logger, messages
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
logger = bec_logger.logger
class RtCommunicationError(Exception):
pass
class RtError(Exception):
pass
class BECConfigError(Exception):
pass
def retry_once(fcn):
"""Decorator to rerun a function in case a CommunicationError was raised. This may happen if the buffer was not empty."""
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
try:
val = fcn(self, *args, **kwargs)
except (RtCommunicationError, RtError):
val = fcn(self, *args, **kwargs)
return val
return wrapper
class RtController(Controller):
_axes_per_controller = 3
USER_ACCESS = [
"socket_put_and_receive",
"set_rotation_angle",
"feedback_disable",
"feedback_enable_without_reset",
"feedback_disable_and_even_reset_lamni_angle_interferometer",
"feedback_enable_with_reset",
"add_pos_to_scan",
"clear_trajectory_generator",
"_set_axis_velocity",
"_set_axis_velocity_maximum_speed",
"_position_sampling_single_read",
"_position_sampling_single_reset_and_start_sampling",
]
def on(self, controller_num=0) -> None:
"""Open a new socket connection to the controller"""
# if not self.connected:
# try:
# self.sock.open()
# # discuss - after disconnect takes a while for the server to be ready again
# max_retries = 10
# tries = 0
# while not self.connected:
# try:
# welcome_message = self.sock.receive()
# self.connected = True
# except ConnectionResetError as conn_reset:
# if tries > max_retries:
# raise conn_reset
# tries += 1
# time.sleep(2)
# except ConnectionRefusedError as conn_error:
# logger.error("Failed to open a connection to RTLamNI.")
# raise RtCommunicationError from conn_error
# else:
# logger.info("The connection has already been established.")
# # warnings.warn(f"The connection has already been established.", stacklevel=2)
super().on()
# self._update_flyer_device_info()
def set_axis(self, axis: Device, axis_nr: int) -> None:
"""Assign an axis to a device instance.
Args:
axis (Device): Device instance (e.g. GalilMotor)
axis_nr (int): Controller axis number
"""
self._axis[axis_nr] = axis
@threadlocked
def socket_put(self, val: str) -> None:
self.sock.put(f"{val}\n".encode())
@threadlocked
def socket_get(self) -> str:
return self.sock.receive().decode()
@retry_once
@threadlocked
def socket_put_and_receive(self, val: str, remove_trailing_chars=True) -> str:
self.socket_put(val)
if remove_trailing_chars:
return self._remove_trailing_characters(self.sock.receive().decode())
return self.socket_get()
def is_axis_moving(self, axis_Id) -> bool:
# this checks that axis is on target
axis_is_on_target = bool(float(self.socket_put_and_receive(f"o")))
return not axis_is_on_target
# def is_thread_active(self, thread_id: int) -> bool:
# val = float(self.socket_put_and_receive(f"MG_XQ{thread_id}"))
# if val == -1:
# return False
# return True
def _remove_trailing_characters(self, var) -> str:
if len(var) > 1:
return var.split("\r\n")[0]
return var
@threadlocked
def set_rotation_angle(self, val: float):
self.socket_put(f"a{(val-300+30.538)/180*np.pi}")
@threadlocked
def stop_all_axes(self):
self.socket_put("sc")
@threadlocked
def feedback_disable(self):
self.socket_put("J0")
logger.info("LamNI Feedback disabled.")
self.set_device_enabled("lsamx", True)
self.set_device_enabled("lsamy", True)
self.set_device_enabled("loptx", True)
self.set_device_enabled("lopty", True)
self.set_device_enabled("loptz", True)
@threadlocked
def _set_axis_velocity(self, um_per_s):
self.socket_put(f"V{um_per_s}")
@threadlocked
def _set_axis_velocity_maximum_speed(self):
self.socket_put(f"V0")
# for developement of soft continuous scanning
@threadlocked
def _position_sampling_single_reset_and_start_sampling(self):
self.socket_put(f"Ss")
@threadlocked
def _position_sampling_single_read(self):
(number_of_samples, sum0, sum0_2, sum1, sum1_2, sum2, sum2_2) = self.socket_put_and_receive(
f"Sr"
).split(",")
avg_x = float(sum1) / int(number_of_samples)
avg_y = float(sum0) / int(number_of_samples)
stdev_x = np.sqrt(
float(sum1_2) / int(number_of_samples)
- np.power(float(sum1) / int(number_of_samples), 2)
)
stdev_y = np.sqrt(
float(sum0_2) / int(number_of_samples)
- np.power(float(sum0) / int(number_of_samples), 2)
)
return (avg_x, avg_y, stdev_x, stdev_y)
@threadlocked
def feedback_enable_without_reset(self):
# read current interferometer position
return_table = (self.socket_put_and_receive(f"J4")).split(",")
x_curr = float(return_table[2])
y_curr = float(return_table[1])
# set these as closed loop target position
self.socket_put(f"pa0,{x_curr:.4f}")
self.socket_put(f"pa1,{y_curr:.4f}")
self.get_device_manager().devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr)
self.get_device_manager().devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr)
self.socket_put("J5")
logger.info("LamNI Feedback enabled (without reset).")
self.set_device_enabled("lsamx", False)
self.set_device_enabled("lsamy", False)
self.set_device_enabled("loptx", False)
self.set_device_enabled("lopty", False)
self.set_device_enabled("loptz", False)
@threadlocked
def feedback_disable_and_even_reset_lamni_angle_interferometer(self):
self.socket_put("J6")
logger.info("LamNI Feedback disabled including the angular interferometer.")
self.set_device_enabled("lsamx", True)
self.set_device_enabled("lsamy", True)
self.set_device_enabled("loptx", True)
self.set_device_enabled("lopty", True)
self.set_device_enabled("loptz", True)
def get_device_manager(self):
for axis in self._axis:
if hasattr(axis, "device_manager") and axis.device_manager:
return axis.device_manager
raise BECConfigError("Could not access the device_manager")
def get_axis_by_name(self, name):
for axis in self._axis:
if axis:
if axis.name == name:
return axis
raise RuntimeError(f"Could not find an axis with name {name}")
@threadlocked
def clear_trajectory_generator(self):
self.socket_put("sc")
logger.info("LamNI scan stopped and deleted, moving to start position")
def add_pos_to_scan(self, positions) -> None:
def send_positions(parent, positions):
parent._min_scan_buffer_reached = False
for pos_index, pos in enumerate(positions):
parent.socket_put_and_receive(f"s{pos[0]},{pos[1]},0")
if pos_index > 100:
parent._min_scan_buffer_reached = True
parent._min_scan_buffer_reached = True
threading.Thread(target=send_positions, args=(self, positions), daemon=True).start()
@retry_once
@threadlocked
def get_scan_status(self):
return_table = (self.socket_put_and_receive(f"sr")).split(",")
if len(return_table) != 3:
raise RtCommunicationError(
f"Expected to receive 3 return values. Instead received {return_table}"
)
mode = int(return_table[0])
# mode 0: direct positioning
# mode 1: running internal timer (not tested/used anymore)
# mode 2: rt point scan running
# mode 3: rt point scan starting
# mode 5/6: rt continuous scanning (not available in LamNI)
number_of_positions_planned = int(return_table[1])
current_position_in_scan = int(return_table[2])
return (mode, number_of_positions_planned, current_position_in_scan)
@threadlocked
def start_scan(self):
interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
if interferometer_feedback_not_running == 1:
logger.error(
"Cannot start scan because feedback loop is not running or there is an interferometer error."
)
raise RtError(
"Cannot start scan because feedback loop is not running or there is an interferometer error."
)
# here exception
(mode, number_of_positions_planned, current_position_in_scan) = self.get_scan_status()
if number_of_positions_planned == 0:
logger.error("Cannot start scan because no target positions are planned.")
raise RtError("Cannot start scan because no target positions are planned.")
# hier exception
# start a point-by-point scan (for cont scan in flomni it would be "sa")
self.socket_put_and_receive("sd")
def start_readout(self):
readout = threading.Thread(target=self.read_positions_from_sampler)
readout.start()
def _update_flyer_device_info(self):
flyer_info = self._get_flyer_device_info()
self.get_device_manager().producer.set(
MessageEndpoints.device_info("rt_scan"),
messages.DeviceInfoMessage(device="rt_scan", info=flyer_info).dumps(),
)
def _get_flyer_device_info(self) -> dict:
return {
"device_name": self.name,
"device_attr_name": getattr(self, "attr_name", ""),
"device_dotted_name": getattr(self, "dotted_name", ""),
"device_info": {
"device_base_class": "ophydobject",
"signals": [],
"hints": {"fields": ["average_x_st_fzp", "average_y_st_fzp"]},
"describe": {},
"describe_configuration": {},
"sub_devices": [],
"custom_user_access": [],
},
}
def kickoff(self, metadata):
self.readout_metadata = metadata
while not self._min_scan_buffer_reached:
time.sleep(0.001)
self.start_scan()
time.sleep(0.1)
self.start_readout()
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[5])
self.average_stdeviations_y_st_fzp += float(return_table[8])
self.average_lamni_angle += float(return_table[19])
signals = {
"target_x": {"value": float(return_table[3])},
"average_x_st_fzp": {"value": float(return_table[4])},
"stdev_x_st_fzp": {"value": float(return_table[5])},
"target_y": {"value": float(return_table[6])},
"average_y_st_fzp": {"value": float(return_table[7])},
"stdev_y_st_fzp": {"value": float(return_table[8])},
"average_cap1": {"value": float(return_table[9])},
"stdev_cap1": {"value": float(return_table[10])},
"average_cap2": {"value": float(return_table[11])},
"stdev_cap2": {"value": float(return_table[12])},
"average_cap3": {"value": float(return_table[13])},
"stdev_cap3": {"value": float(return_table[14])},
"average_cap4": {"value": float(return_table[15])},
"stdev_cap4": {"value": float(return_table[16])},
"average_cap5": {"value": float(return_table[17])},
"stdev_cap5": {"value": float(return_table[18])},
"average_angle_interf_ST": {"value": float(return_table[19])},
"stdev_angle_interf_ST": {"value": float(return_table[20])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
"average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)},
}
return signals
def read_positions_from_sampler(self):
# this was for reading after the scan completed
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
read_counter = 0
previous_point_in_scan = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
# if not (mode==2 or mode==3):
# error
self.get_device_manager().producer.set_and_publish(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=1, metadata=self.readout_metadata
).dumps(),
)
# while scan is running
while mode > 0:
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
time.sleep(0.01)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
# logger.info(f"{return_table}")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, pointID=int(return_table[0]))
time.sleep(0.05)
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
# logger.info(f"{return_table}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, pointID=int(return_table[0]))
self.get_device_manager().producer.set_and_publish(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=0, metadata=self.readout_metadata
).dumps(),
)
logger.info(
f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp/number_of_samples_to_read}, y {self.average_stdeviations_y_st_fzp/number_of_samples_to_read}, angle {self.average_lamni_angle/number_of_samples_to_read}."
)
def publish_device_data(self, signals, pointID):
self.get_device_manager().producer.send(
MessageEndpoints.device_read("rt_lamni"),
messages.DeviceMessage(
signals=signals,
metadata={"pointID": pointID, **self.readout_metadata},
).dumps(),
)
def feedback_status_angle_lamni(self) -> bool:
return_table = (self.socket_put_and_receive(f"J7")).split(",")
logger.debug(
f"LamNI angle interferomter status {bool(return_table[0])}, position {float(return_table[1])}, signal {float(return_table[2])}"
)
return bool(return_table[0])
def feedback_enable_with_reset(self):
if not self.feedback_status_angle_lamni():
self.feedback_disable_and_even_reset_lamni_angle_interferometer()
logger.info(f"LamNI resetting interferometer inclusive angular interferomter.")
else:
self.feedback_disable()
logger.info(
f"LamNI resetting interferomter except angular interferometer which is already running."
)
# set these as closed loop target position
self.socket_put(f"pa0,0")
self.get_axis_by_name("rtx").user_setpoint.setpoint = 0
self.socket_put(f"pa1,0")
self.get_axis_by_name("rty").user_setpoint.setpoint = 0
self.socket_put(
f"pa2,0"
) # we set all three outputs of the traj. gen. although in LamNI case only 0,1 are used
self.clear_trajectory_generator()
self.get_device_manager().devices.lsamrot.obj.move(0, wait=True)
galil_controller_rt_status = (
self.get_device_manager().devices.lsamx.obj.controller.lgalil_is_air_off_and_orchestra_enabled()
)
if galil_controller_rt_status == 0:
logger.error(
"Cannot enable feedback. The small rotation air is on and/or orchestra disabled by the motor controller."
)
raise RtError(
"Cannot enable feedback. The small rotation air is on and/or orchestra disabled by the motor controller."
)
time.sleep(0.03)
lsamx_user_params = self.get_device_manager().devices.lsamx.user_parameter
if lsamx_user_params is None or lsamx_user_params.get("center") is None:
raise RuntimeError("lsamx center is not defined")
lsamy_user_params = self.get_device_manager().devices.lsamy.user_parameter
if lsamy_user_params is None or lsamy_user_params.get("center") is None:
raise RuntimeError("lsamy center is not defined")
lsamx_center = lsamx_user_params.get("center")
lsamy_center = lsamy_user_params.get("center")
self.get_device_manager().devices.lsamx.obj.move(lsamx_center, wait=True)
self.get_device_manager().devices.lsamy.obj.move(lsamy_center, wait=True)
self.socket_put("J1")
_waitforfeedbackctr = 0
interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
while interferometer_feedback_not_running == 1 and _waitforfeedbackctr < 100:
time.sleep(0.01)
_waitforfeedbackctr = _waitforfeedbackctr + 1
interferometer_feedback_not_running = int(
(self.socket_put_and_receive("J2")).split(",")[0]
)
self.set_device_enabled("lsamx", False)
self.set_device_enabled("lsamy", False)
self.set_device_enabled("loptx", False)
self.set_device_enabled("lopty", False)
self.set_device_enabled("loptz", False)
if interferometer_feedback_not_running == 1:
logger.error(
"Cannot start scan because feedback loop is not running or there is an interferometer error."
)
raise RtError(
"Cannot start scan because feedback loop is not running or there is an interferometer error."
)
time.sleep(0.01)
# ptychography_alignment_done = 0
def set_device_enabled(self, device_name: str, enabled: bool) -> None:
"""enable / disable a device"""
if device_name not in self.get_device_manager().devices:
logger.warning(
f"Device {device_name} is not configured and cannot be enabled/disabled."
)
return
self.get_device_manager().devices[device_name].enabled_set = enabled
class RtSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.controller = self.parent.controller
self.sock = self.parent.controller.sock
class RtSignalRO(RtSignalBase):
def __init__(self, signal_name, **kwargs):
super().__init__(signal_name, **kwargs)
self._metadata["write_access"] = False
def _socket_set(self, val):
raise ReadOnlyError("Read-only signals cannot be set")
class RtReadbackSignal(RtSignalRO):
@retry_once
@threadlocked
def _socket_get(self) -> float:
"""Get command for the readback signal
Returns:
float: Readback value after adjusting for sign and motor resolution.
"""
return_table = (self.controller.socket_put_and_receive(f"J4")).split(",")
print(return_table)
if self.parent.axis_Id_numeric == 0:
readback_index = 2
elif self.parent.axis_Id_numeric == 1:
readback_index = 1
else:
raise RtError("Currently, only two axes are supported.")
current_pos = float(return_table[readback_index])
current_pos *= self.parent.sign
return current_pos
class RtSetpointSignal(RtSignalBase):
setpoint = 0
def _socket_get(self) -> float:
"""Get command for receiving the setpoint / target value.
The value is not pulled from the controller but instead just the last setpoint used.
Returns:
float: setpoint / target value
"""
return self.setpoint
@retry_once
@threadlocked
def _socket_set(self, val: float) -> None:
"""Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign.
Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted.
Args:
val (float): Target value / setpoint value
Raises:
RtError: Raised if interferometer feedback is disabled.
"""
interferometer_feedback_not_running = int(
(self.controller.socket_put_and_receive("J2")).split(",")[0]
)
if interferometer_feedback_not_running != 0:
raise RtError(
"The interferometer feedback is not running. Either it is turned off or and interferometer error occured."
)
self.set_with_feedback_disabled(val)
def set_with_feedback_disabled(self, val):
target_val = val * self.parent.sign
self.setpoint = target_val
self.controller.socket_put(f"pa{self.parent.axis_Id_numeric},{target_val:.4f}")
class RtMotorIsMoving(RtSignalRO):
def _socket_get(self):
return self.controller.is_axis_moving(self.parent.axis_Id_numeric)
def get(self):
val = super().get()
if val is not None:
self._run_subs(
sub_type=self.SUB_VALUE,
value=val,
timestamp=time.time(),
)
return val
class RtFeedbackRunning(RtSignalRO):
@threadlocked
def _socket_get(self):
if int((self.controller.socket_put_and_receive("J2")).split(",")[0]) == 0:
return 1
else:
return 0
class RtMotor(Device, PositionerBase):
USER_ACCESS = ["controller"]
readback = Cpt(
RtReadbackSignal,
signal_name="readback",
kind="hinted",
)
user_setpoint = Cpt(RtSetpointSignal, signal_name="setpoint")
motor_is_moving = Cpt(RtMotorIsMoving, signal_name="motor_is_moving", kind="normal")
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
SUB_READBACK = "readback"
SUB_CONNECTION_CHANGE = "connection_change"
_default_sub = SUB_READBACK
def __init__(
self,
axis_Id,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2680.psi.ch",
port=3333,
sign=1,
socket_cls=SocketIO,
device_manager=None,
limits=None,
**kwargs,
):
self.axis_Id = axis_Id
self.sign = sign
self.controller = RtController(socket=socket_cls(host=host, port=port))
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.device_manager = device_manager
self.tolerance = kwargs.pop("tolerance", 0.5)
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.readback.name = self.name
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
self._update_connection_state()
# self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE)
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@property
def low_limit(self):
return self.limits[0]
@property
def high_limit(self):
return self.limits[1]
def check_value(self, pos):
"""Check that the position is within the soft limits"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not (low_limit <= pos <= high_limit):
raise LimitError(f"position={pos} not within limits {self.limits}")
def _update_connection_state(self, **kwargs):
for walk in self.walk_signals():
walk.item._metadata["connected"] = self.controller.connected
def _forward_readback(self, **kwargs):
kwargs.pop("sub_type")
self._run_subs(sub_type="readback", **kwargs)
@raise_if_disconnected
def move(self, position, wait=True, **kwargs):
"""Move to a specified position, optionally waiting for motion to
complete.
Parameters
----------
position
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.
Returns
-------
status : MoveStatus
Raises
------
TimeoutError
When motion takes longer than `timeout`
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out
"""
self._started_moving = False
timeout = kwargs.pop("timeout", 100)
status = super().move(position, timeout=timeout, **kwargs)
self.user_setpoint.put(position, wait=False)
def move_and_finish():
while self.motor_is_moving.get():
print("motor is moving")
val = self.readback.read()
self._run_subs(
sub_type=self.SUB_READBACK,
value=val,
timestamp=time.time(),
)
time.sleep(0.01)
print("Move finished")
self._done_moving()
threading.Thread(target=move_and_finish, daemon=True).start()
try:
if wait:
status_wait(status)
except KeyboardInterrupt:
self.stop()
raise
return status
@property
def axis_Id(self):
return self._axis_Id_alpha
@axis_Id.setter
def axis_Id(self, val):
if isinstance(val, str):
if len(val) != 1:
raise ValueError(f"Only single-character axis_Ids are supported.")
self._axis_Id_alpha = val
self._axis_Id_numeric = ord(val.lower()) - 97
else:
raise TypeError(f"Expected value of type str but received {type(val)}")
@property
def axis_Id_numeric(self):
return self._axis_Id_numeric
@axis_Id_numeric.setter
def axis_Id_numeric(self, val):
if isinstance(val, int):
if val > 26:
raise ValueError(f"Numeric value exceeds supported range.")
self._axis_Id_alpha = val
self._axis_Id_numeric = (chr(val + 97)).capitalize()
else:
raise TypeError(f"Expected value of type int but received {type(val)}")
def kickoff(self, metadata, **kwargs) -> None:
self.controller.kickoff(metadata)
@property
def egu(self):
"""The engineering units (EGU) for positions"""
return "um"
# how is this used later?
def stage(self) -> List[object]:
return super().stage()
def unstage(self) -> List[object]:
return super().unstage()
def stop(self, *, success=False):
self.controller.stop_all_axes()
return super().stop(success=success)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
mock = False
if not mock:
rty = RtLamniMotor("B", name="rty", host="mpc2680.psi.ch", port=3333, sign=1)
rty.stage()
status = rty.move(0, wait=True)
status = rty.move(10, wait=True)
rty.read()
rty.get()
rty.describe()
rty.unstage()
else:
from ophyd_devices.utils.socket import SocketMock
rtx = RtLamniMotor("A", name="rtx", host="mpc2680.psi.ch", port=3333, socket_cls=SocketMock)
rty = RtLamniMotor("B", name="rty", host="mpc2680.psi.ch", port=3333, socket_cls=SocketMock)
rtx.stage()
# rty.stage()

View File

@ -3,8 +3,10 @@ import functools
import json import json
import logging import logging
import os import os
import time
import numpy as np import numpy as np
from prettytable import PrettyTable
from typeguard import typechecked from typeguard import typechecked
from ophyd_devices.smaract.smaract_errors import ( from ophyd_devices.smaract.smaract_errors import (
@ -73,7 +75,13 @@ class SmaractSensors:
class SmaractController(Controller): class SmaractController(Controller):
_axes_per_controller = 6 _axes_per_controller = 6
_initialized = False _initialized = False
USER_ACCESS = ["socket_put_and_receive", "smaract_show_all", "move_open_loop_steps"] USER_ACCESS = [
"socket_put_and_receive",
"smaract_show_all",
"move_open_loop_steps",
"find_reference_mark",
"describe",
]
def __init__( def __init__(
self, self,
@ -117,7 +125,19 @@ class SmaractController(Controller):
raise_if_not_status=False, raise_if_not_status=False,
) -> str: ) -> str:
self.socket_put(val) self.socket_put(val)
return_val = self.socket_get() return_val = ""
max_wait_time = 1
elapsed_time = 0
sleep_time = 0.01
while True:
ret = self.socket_get()
return_val += ret
if ret.endswith("\n"):
break
time.sleep(sleep_time)
elapsed_time += sleep_time
if elapsed_time > max_wait_time:
break
if remove_trailing_chars: if remove_trailing_chars:
return_val = self._remove_trailing_characters(return_val) return_val = self._remove_trailing_characters(return_val)
logger.debug(f"Sending {val}; Returned {return_val}") logger.debug(f"Sending {val}; Returned {return_val}")
@ -234,15 +254,15 @@ class SmaractController(Controller):
@axis_checked @axis_checked
@typechecked @typechecked
def move_open_loop_steps( def move_open_loop_steps(
self, axis_Id_numeric: int, steps: int, amplitude: int = 2000, frequency: int = 500 self, axis_Id_numeric: int, steps: int, amplitude: int = 4000, frequency: int = 2000
) -> None: ) -> None:
"""Move open loop steps """Move open loop steps. It performs a burst of steps with the given parameters.
Args: Args:
axis_Id_numeric (int): Axis number. axis_Id_numeric (int): Axis number.
steps (float): Relative position to move to in mm. steps (int): Number and direction of steps to perform. The valid range is -30,000..30,000. A value of 0 stops the positioner, but see S command. A value of 30,000 or -30,000 performs an unbounded move. This should be used with caution since the positioner will only stop on an S command.
hold_time (int, optional): Specifies how long (in milliseconds) the position is actively held after reaching the target. The valid range is 0..60,000. A 0 deactivates this feature, a value of 60,000 is infinite (until manually stopped, see S command). Defaults to 1000. amplitude (int): Amplitude that the steps are performed with. Lower amplitude values result in a smaller step width. The parameter must be given as a 12bit value (range 0..4,095). 0 corresponds to 0V, 4,095 to 100V. Default: 4000
frequency (int): Frequency in Hz that the steps are performed with. The valid range is 1..18,500. Default: 2000.
""" """
self.socket_put_and_receive( self.socket_put_and_receive(
f"MST{axis_Id_numeric},{steps},{amplitude},{frequency}", f"MST{axis_Id_numeric},{steps},{amplitude},{frequency}",
@ -374,6 +394,15 @@ class SmaractController(Controller):
if self._message_starts_with(return_val, f":ST{axis_Id_numeric}"): if self._message_starts_with(return_val, f":ST{axis_Id_numeric}"):
return self._sensors.avail_sensors.get(int(return_val.strip(f":ST{axis_Id_numeric},"))) return self._sensors.avail_sensors.get(int(return_val.strip(f":ST{axis_Id_numeric},")))
@retry_once
@axis_checked
def find_reference_mark(
self, axis_Id_numeric: int, direction: int, holdTime: int, autoZero: int
) -> None:
return_val = self.socket_put_and_receive(
f"FRM{axis_Id_numeric},{direction},{holdTime},{autoZero}"
)
@retry_once @retry_once
@axis_checked @axis_checked
def set_closed_loop_move_speed(self, axis_Id_numeric: int, move_speed: float) -> None: def set_closed_loop_move_speed(self, axis_Id_numeric: int, move_speed: float) -> None:
@ -419,7 +448,7 @@ class SmaractController(Controller):
"Closed Loop Speed", "Closed Loop Speed",
"Position", "Position",
] ]
for ax in range(self._Smaract_axis_per_controller): for ax in range(self._axes_per_controller):
axis = self._axis[ax] axis = self._axis[ax]
if axis is not None: if axis is not None:
t.add_row( t.add_row(
@ -428,7 +457,7 @@ class SmaractController(Controller):
axis.name, axis.name,
axis.connected, axis.connected,
self.axis_is_referenced(axis.axis_Id_numeric), self.axis_is_referenced(axis.axis_Id_numeric),
self.get_closed_loop_move_speed(axis.axis_Id), self.get_closed_loop_move_speed(axis.axis_Id_numeric),
axis.readback.read().get(axis.name).get("value"), axis.readback.read().get(axis.name).get("value"),
] ]
) )

View File

@ -57,10 +57,6 @@ class Controller(OphydObject):
labels=None, labels=None,
kind=None, kind=None,
): ):
self.sock = None
self._socket_cls = socket_cls
self._socket_host = socket_host
self._socket_port = socket_port
if not self._initialized: if not self._initialized:
super().__init__( super().__init__(
name=name, attr_name=attr_name, parent=parent, labels=labels, kind=kind name=name, attr_name=attr_name, parent=parent, labels=labels, kind=kind
@ -69,6 +65,10 @@ class Controller(OphydObject):
self._axis = [] self._axis = []
self._initialize() self._initialize()
self._initialized = True self._initialized = True
self.sock = None
self._socket_cls = socket_cls
self._socket_host = socket_host
self._socket_port = socket_port
def _initialize(self): def _initialize(self):
self._connected = False self._connected = False
@ -133,7 +133,7 @@ class Controller(OphydObject):
def off(self) -> None: def off(self) -> None:
"""Close the socket connection to the controller""" """Close the socket connection to the controller"""
if self.connected or self.sock is not None: if self.connected and self.sock is not None:
self.sock.close() self.sock.close()
self.connected = False self.connected = False
self.sock = None self.sock = None

View File

@ -179,18 +179,29 @@ class SocketSignal(abc.ABC, Signal):
class SocketIO: class SocketIO:
"""SocketIO helper class for TCP IP connections""" """SocketIO helper class for TCP IP connections"""
def __init__(self, host, port): def __init__(self, host, port, max_retry=10):
self.host = host self.host = host
self.port = port self.port = port
self.is_open = False self.is_open = False
self.max_retry = max_retry
self._initialize_socket() self._initialize_socket()
def connect(self): def connect(self):
print(f"connecting to {self.host} port {self.port}") print(f"connecting to {self.host} port {self.port}")
# self.sock.create_connection((host, port)) # self.sock.create_connection((host, port))
if self.sock is None: retry_count = 0
self._initialize_socket() while True:
self.sock.connect((self.host, self.port)) try:
if self.sock is None:
self._initialize_socket()
self.sock.connect((self.host, self.port))
break
except Exception as exc:
self.sock = None
time.sleep(2)
retry_count += 1
if retry_count > self.max_retry:
raise exc
def _put(self, msg_bytes): def _put(self, msg_bytes):
logger.debug(f"put message: {msg_bytes}") logger.debug(f"put message: {msg_bytes}")