diff --git a/ophyd_devices/__init__.py b/ophyd_devices/__init__.py index c51c5a7..226fa2e 100644 --- a/ophyd_devices/__init__.py +++ b/ophyd_devices/__init__.py @@ -1,9 +1,11 @@ from .eiger1p5m_csaxs.eiger1p5m import Eiger1p5MDetector from .epics import * +from .galil.fgalil_ophyd import FlomniGalilMotor +from .galil.fupr_ophyd import FuprGalilMotor from .galil.galil_ophyd import GalilMotor from .galil.sgalil_ophyd import SGalilMotor from .npoint.npoint import NPointAxis -from .rt_lamni import RtLamniMotor +from .rt_lamni import RtFlomniMotor, RtLamniMotor from .sim.sim import ( SynAxisMonitor, SynAxisOPAAS, diff --git a/ophyd_devices/epics/__init__.py b/ophyd_devices/epics/__init__.py index af09589..6856f55 100644 --- a/ophyd_devices/epics/__init__.py +++ b/ophyd_devices/epics/__init__.py @@ -4,6 +4,7 @@ from ophyd.quadem import QuadEM from ophyd.sim import SynAxis, SynPeriodicSignal, SynSignal from .devices.delay_generator_csaxs import DelayGeneratorcSAXS +from .devices.flomni_sample_storage import FlomniSampleStorage from .devices.InsertionDevice import InsertionDevice from .devices.slits import SlitH, SlitV from .devices.specMotors import ( @@ -20,7 +21,7 @@ from .devices.specMotors import ( PmMonoBender, ) from .devices.SpmBase import SpmBase -from .devices.XbpmBase import XbpmBase, XbpmCsaxsOp # X07MA specific devices from .devices.X07MADevices import * +from .devices.XbpmBase import XbpmBase, XbpmCsaxsOp diff --git a/ophyd_devices/epics/devices/__init__.py b/ophyd_devices/epics/devices/__init__.py index 2b5c0c2..55677c8 100644 --- a/ophyd_devices/epics/devices/__init__.py +++ b/ophyd_devices/epics/devices/__init__.py @@ -1,7 +1,19 @@ -from .slits import SlitH, SlitV -from .XbpmBase import XbpmBase, XbpmCsaxsOp -from .SpmBase import SpmBase +# Standard ophyd classes +from ophyd import EpicsMotor, EpicsSignal, EpicsSignalRO +from ophyd.quadem import QuadEM +from ophyd.sim import SynAxis, SynPeriodicSignal, SynSignal + +from .DelayGeneratorDG645 import DelayGeneratorDG645 +from .eiger9m_csaxs import Eiger9McSAXS + +# cSAXS +from .epics_motor_ex import EpicsMotorEx +from .falcon_csaxs import FalconcSAXS +from .flomni_sample_storage import FlomniSampleStorage from .InsertionDevice import InsertionDevice +from .mcs_csaxs import McsCsaxs +from .pilatus_csaxs import PilatuscSAXS +from .slits import SlitH, SlitV from .specMotors import ( Bpm4i, EnergyKev, @@ -15,19 +27,5 @@ from .specMotors import ( PmDetectorRotation, PmMonoBender, ) - -# Standard ophyd classes -from ophyd import EpicsSignal, EpicsSignalRO, EpicsMotor -from ophyd.sim import SynAxis, SynSignal, SynPeriodicSignal -from ophyd.quadem import QuadEM - -# cSAXS -from .epics_motor_ex import EpicsMotorEx -from .mcs_csaxs import MCScSAXS -from .psi_detector_base import PSIDetectorBase, CustomDetectorMixin -from .eiger9m_csaxs import Eiger9McSAXS -from .pilatus_csaxs import PilatuscSAXS -from .falcon_csaxs import FalconcSAXS -from .delay_generator_csaxs import DelayGeneratorcSAXS - -# from .psi_detector_base import PSIDetectorBase, CustomDetectorMixin +from .SpmBase import SpmBase +from .XbpmBase import XbpmBase, XbpmCsaxsOp diff --git a/ophyd_devices/epics/devices/flomni_sample_storage.py b/ophyd_devices/epics/devices/flomni_sample_storage.py new file mode 100644 index 0000000..e9f1b7f --- /dev/null +++ b/ophyd_devices/epics/devices/flomni_sample_storage.py @@ -0,0 +1,82 @@ +import time + +from ophyd import Component as Cpt +from ophyd import Device +from ophyd import DynamicDeviceComponent as Dcpt +from ophyd import EpicsSignal + + +class FlomniSampleStorageError(Exception): + pass + + +class FlomniSampleStorage(Device): + USER_ACCESS = [ + "is_sample_slot_used", + "is_sample_in_gripper", + "set_sample_slot", + "unset_sample_slot", + "set_sample_in_gripper", + "unset_sample_in_gripper", + ] + SUB_VALUE = "value" + _default_sub = SUB_VALUE + sample_placed = { + f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {}) for i in range(21) + } + sample_placed = Dcpt(sample_placed) + + sample_names = { + f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True}) + for i in range(21) + } + sample_names = Dcpt(sample_names) + + sample_in_gripper = Cpt( + EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET" + ) + sample_in_gripper_name = Cpt( + EpicsSignal, + name="sample_in_gripper_name", + read_pv="XOMNY-SAMPLE_DB_flomni100:GET.DESC", + string=True, + ) + + def __init__(self, prefix="", *, name, **kwargs): + super().__init__(prefix, name=name, **kwargs) + self.sample_placed.sample1.subscribe(self._emit_value) + + def _emit_value(self, **kwargs): + timestamp = kwargs.pop("timestamp", time.time()) + self.wait_for_connection() + self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self) + + def set_sample_slot(self, slot_nr: int, name: str) -> bool: + if slot_nr > 20: + raise FlomniSampleStorageError(f"Invalid slot number {slot_nr}.") + + getattr(self.sample_placed, f"sample{slot_nr}").set(1) + getattr(self.sample_names, f"sample{slot_nr}").set(name) + + def unset_sample_slot(self, slot_nr: int) -> bool: + if slot_nr > 20: + raise FlomniSampleStorageError(f"Invalid slot number {slot_nr}.") + + getattr(self.sample_placed, f"sample{slot_nr}").set(0) + getattr(self.sample_names, f"sample{slot_nr}").set("-") + + def set_sample_in_gripper(self, name: str) -> bool: + self.sample_in_gripper.set(1) + self.sample_in_gripper_name.set(name) + + def unset_sample_in_gripper(self) -> bool: + self.sample_in_gripper.set(0) + self.sample_in_gripper_name.set("-") + + def is_sample_slot_used(self, slot_nr: int) -> bool: + val = getattr(self.sample_placed, f"sample{slot_nr}").get() + return bool(val) + + def is_sample_in_gripper(self) -> bool: + val = self.sample_in_gripper.get() + return bool(val) diff --git a/ophyd_devices/galil/fgalil_ophyd.py b/ophyd_devices/galil/fgalil_ophyd.py index a13e19a..b0ebf61 100644 --- a/ophyd_devices/galil/fgalil_ophyd.py +++ b/ophyd_devices/galil/fgalil_ophyd.py @@ -9,8 +9,6 @@ from ophyd import Component as Cpt from ophyd import Device, PositionerBase, Signal from ophyd.status import wait as status_wait from ophyd.utils import LimitError, ReadOnlyError -from prettytable import PrettyTable - from ophyd_devices.galil.galil_ophyd import ( BECConfigError, GalilAxesReferenced, @@ -21,15 +19,31 @@ from ophyd_devices.galil.galil_ophyd import ( GalilMotorResolution, GalilReadbackSignal, GalilSetpointSignal, + GalilSignalRO, retry_once, ) from ophyd_devices.utils.controller import Controller, threadlocked from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected +from prettytable import PrettyTable logger = bec_logger.logger class FlomniGalilController(GalilController): + USER_ACCESS = [ + "describe", + "show_running_threads", + "galil_show_all", + "socket_put_and_receive", + "socket_put_confirmed", + "drive_axis_to_limit", + "find_reference", + "get_motor_limit_switch", + "fosaz_light_curtain_is_triggered", + "is_motor_on", + "all_axes_referenced", + ] + def is_axis_moving(self, axis_Id, axis_Id_numeric) -> bool: if axis_Id is None and axis_Id_numeric is not None: axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric) @@ -41,13 +55,67 @@ class FlomniGalilController(GalilController): # TODO: check if all axes are referenced in all controllers return super().all_axes_referenced() + def fosaz_light_curtain_is_triggered(self) -> bool: + """ + Check the light curtain status for fosaz -class FlomniGalilReadbackSignal(GalilReadbackSignal): - pass + Returns: + bool: True if the light curtain is triggered + """ + + return int(float(self.socket_put_and_receive("MG @IN[14]").strip())) == 1 + + +class FlomniGalilReadbackSignal(GalilSignalRO): + @retry_once + @threadlocked + def _socket_get(self) -> float: + """Get command for the readback signal + + Returns: + float: Readback value after adjusting for sign and motor resolution. + """ + + current_pos = float(self.controller.socket_put_and_receive(f"TD{self.parent.axis_Id}")) + current_pos *= self.parent.sign + step_mm = self.parent.motor_resolution.get() + return current_pos / step_mm + + def read(self): + self._metadata["timestamp"] = time.time() + val = super().read() + return val class FlomniGalilSetpointSignal(GalilSetpointSignal): - pass + @retry_once + @threadlocked + def _socket_set(self, val: float) -> None: + """Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign. + Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted. + + Args: + val (float): Target value / setpoint value + + Raises: + GalilError: Raised if not all axes are referenced. + + """ + target_val = val * self.parent.sign + self.setpoint = target_val + axes_referenced = self.controller.all_axes_referenced() + if axes_referenced: + while self.controller.is_thread_active(0): + time.sleep(0.1) + + self.controller.socket_put_confirmed(f"naxis={self.parent.axis_Id_numeric}") + self.controller.socket_put_confirmed(f"ntarget={target_val:.3f}") + self.controller.socket_put_confirmed("movereq=1") + self.controller.socket_put_confirmed("XQ#NEWPAR") + while self.controller.is_thread_active(0): + time.sleep(0.005) + else: + raise GalilError("Not all axes are referenced.") class FlomniGalilMotorResolution(GalilMotorResolution): @@ -65,7 +133,7 @@ class FlomniGalilAxesReferenced(GalilAxesReferenced): class FlomniGalilMotor(Device, PositionerBase): USER_ACCESS = ["controller"] readback = Cpt( - GalilReadbackSignal, + FlomniGalilReadbackSignal, signal_name="readback", kind="hinted", ) @@ -115,6 +183,7 @@ class FlomniGalilMotor(Device, PositionerBase): "device_mapping has been specified but the device_manager cannot be accessed." ) self.rt = self.device_mapping.get("rt") + self.pid_x_correction = 0 super().__init__( prefix, diff --git a/ophyd_devices/galil/fupr_ophyd.py b/ophyd_devices/galil/fupr_ophyd.py index 7cd1aa7..4fa869a 100644 --- a/ophyd_devices/galil/fupr_ophyd.py +++ b/ophyd_devices/galil/fupr_ophyd.py @@ -63,6 +63,18 @@ class FuprGalilReadbackSignal(GalilReadbackSignal): step_mm = self.parent.motor_resolution.get() return current_pos / step_mm + def read(self): + self._metadata["timestamp"] = time.time() + val = super().read() + if self.parent.axis_Id_numeric == 0: + try: + rt = self.parent.device_manager.devices[self.parent.rt] + if rt.enabled: + rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"]) + except KeyError: + logger.warning("Failed to set RT value during readback.") + return val + class FuprGalilSetpointSignal(GalilSetpointSignal): @retry_once diff --git a/ophyd_devices/galil/galil_ophyd.py b/ophyd_devices/galil/galil_ophyd.py index 69c04a4..bead1ef 100644 --- a/ophyd_devices/galil/galil_ophyd.py +++ b/ophyd_devices/galil/galil_ophyd.py @@ -52,6 +52,11 @@ class GalilController(Controller): "socket_put_and_receive", "socket_put_confirmed", "lgalil_is_air_off_and_orchestra_enabled", + "drive_axis_to_limit", + "find_reference", + "get_motor_limit_switch", + "is_motor_on", + "all_axes_referenced", ] @threadlocked @@ -143,10 +148,11 @@ class GalilController(Controller): self.socket_put_confirmed(f"naxis={axis_Id_numeric}") self.socket_put_confirmed(f"ndir={direction_flag}") self.socket_put_confirmed("XQ#NEWPAR") + time.sleep(0.005) self.socket_put_confirmed("XQ#FES") - time.sleep(0.1) + time.sleep(0.01) while self.is_axis_moving(None, axis_Id_numeric): - time.sleep(0.1) + time.sleep(0.01) axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric) # check if we actually hit the limit diff --git a/ophyd_devices/rt_lamni/__init__.py b/ophyd_devices/rt_lamni/__init__.py index edc7c01..5a60cfb 100644 --- a/ophyd_devices/rt_lamni/__init__.py +++ b/ophyd_devices/rt_lamni/__init__.py @@ -1 +1,2 @@ -from .rt_lamni_ophyd import RtLamniMotor, RtLamniController +from .rt_flomni_ophyd import RtFlomniController, RtFlomniMotor +from .rt_lamni_ophyd import RtLamniController, RtLamniMotor diff --git a/ophyd_devices/rt_lamni/rt_flomni_ophyd.py b/ophyd_devices/rt_lamni/rt_flomni_ophyd.py new file mode 100644 index 0000000..9acce7d --- /dev/null +++ b/ophyd_devices/rt_lamni/rt_flomni_ophyd.py @@ -0,0 +1,749 @@ +import functools +import threading +import time +from typing import List + +import numpy as np +from bec_lib import MessageEndpoints, bec_logger, messages +from ophyd import Component as Cpt +from ophyd import Device, PositionerBase, Signal +from ophyd.status import wait as status_wait +from ophyd.utils import LimitError, ReadOnlyError +from ophyd_devices.rt_lamni.rt_ophyd import ( + BECConfigError, + RtCommunicationError, + RtController, + RtError, + RtReadbackSignal, + RtSetpointSignal, + RtSignalRO, + retry_once, +) +from ophyd_devices.utils.controller import Controller, threadlocked +from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected +from prettytable import PrettyTable + +logger = bec_logger.logger + + +class RtFlomniController(RtController): + USER_ACCESS = [ + "socket_put_and_receive", + "set_rotation_angle", + "feedback_disable", + "feedback_enable_without_reset", + "feedback_enable_with_reset", + "add_pos_to_scan", + "clear_trajectory_generator", + "show_cyclic_error_compensation", + "laser_tracker_on", + "laser_tracker_off", + "laser_tracker_show_all", + ] + + def __init__( + self, + *, + name=None, + socket_cls=None, + socket_host=None, + socket_port=None, + attr_name="", + parent=None, + labels=None, + kind=None, + ): + super().__init__( + name=name, + socket_cls=socket_cls, + socket_host=socket_host, + socket_port=socket_port, + attr_name=attr_name, + parent=parent, + labels=labels, + kind=kind, + ) + self.tracker_info = {} + self._min_scan_buffer_reached = False + self.rt_pid_voltage = None + + def add_pos_to_scan(self, positions) -> None: + def send_positions(parent, positions): + parent._min_scan_buffer_reached = False + start_time = time.time() + for pos_index, pos in enumerate(positions): + parent.socket_put_and_receive(f"s{pos[0]},{pos[1]},{pos[2]}") + if pos_index > 100: + parent._min_scan_buffer_reached = True + parent._min_scan_buffer_reached = True + logger.info( + f"Sending {len(positions)} positions took {time.time()-start_time} seconds." + ) + + threading.Thread(target=send_positions, args=(self, positions), daemon=True).start() + + def move_to_zero(self): + self.socket_put("pa0,0") + self.get_axis_by_name("rtx").user_setpoint.setpoint = 0 + self.socket_put("pa1,0") + self.get_axis_by_name("rty").user_setpoint.setpoint = 0 + self.socket_put("pa2,0") + self.get_axis_by_name("rtz").user_setpoint.setpoint = 0 + time.sleep(0.05) + + def feedback_is_running(self) -> bool: + status = int(float(self.socket_put_and_receive("l2").strip())) + if status == 1: + return False + return True + + def feedback_enable_with_reset(self): + self.socket_put("l0") # disable feedback + + self.move_to_zero() + + if not self.slew_rate_limiters_on_target() or np.abs(self.pid_y()) > 0.1: + logger.info("Please wait, slew rate limiters not on target.") + while not self.slew_rate_limiters_on_target() or np.abs(self.pid_y()) > 0.1: + time.sleep(0.05) + + self.get_device_manager().devices.rty.update_user_parameter({"tomo_additional_offsety": 0}) + self.clear_trajectory_generator() + + self.laser_tracker_on() + + # move to 0. FUPR will set the rotation angle during readout + self.get_device_manager().devices.fsamroy.obj.move(0, wait=True) + + fsamx = self.get_device_manager().devices.fsamx + + fsamx.obj.pid_x_correction = 0 + fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]") + fsamx_in = fsamx.user_parameter.get("in") + if not np.isclose(fsamx.obj.readback.get(), fsamx_in, atol=0.3): + raise RtError( + "Something is wrong. fsamx is very far from the samx_in position. Don't dare correct automatically." + ) + + if not np.isclose(fsamx.obj.readback.get(), fsamx_in, atol=0.01): + fsamx.enabled_set = True + fsamx.obj.move(fsamx_in, wait=True) + fsamx.enabled_set = False + time.sleep(1) + + self.socket_put("l1") + time.sleep(0.4) + + if not self.feedback_is_running(): + raise RtError("Feedback is not running; likely an error in the interferometer.") + + time.sleep(1.5) + self.show_cyclic_error_compensation() + + self.rt_pid_voltage = self.get_pid_x() + rtx = self.get_device_manager().devices.rtx + rtx.update_user_parameter({"rt_pid_voltage": self.rt_pid_voltage}) + + self.set_device_enabled("fsamx", False) + self.set_device_enabled("fsamy", False) + self.set_device_enabled("foptx", False) + self.set_device_enabled("fopty", False) + + def move_samx_to_scan_region(self, fovx: float, cenx: float): + if self.rt_pid_voltage is None: + rtx = self.get_device_manager().devices.rtx + self.rt_pid_voltage = rtx.user_parameter.get("rt_pid_voltage") + if self.rt_pid_voltage is None: + raise RtError( + "rt_pid_voltage not set in rtx user parameters. Please run feedback_enable_with_reset first." + ) + logger.info(f"Using PID voltage from rtx user parameter: {self.rt_pid_voltage}") + expected_voltage = self.rt_pid_voltage + fovx / 2 * 7 / 100 + logger.info(f"Expected PID voltage: {expected_voltage}") + logger.info(f"Current PID voltage: {self.get_pid_x()}") + + wait_on_exit = False + while True: + if np.abs(self.get_pid_x() - expected_voltage) < 1: + break + wait_on_exit = True + self.socket_put("v0") + fsamx = self.get_device_manager().devices.fsamx + fsamx.enabled_set = True + fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]") + fsamx.obj.pid_x_correction -= (self.get_pid_x() - expected_voltage) * 0.01 + logger.info(f"Correcting fsamx by {fsamx.obj.pid_x_correction}") + fsamx_in = fsamx.user_parameter.get("in") + fsamx.obj.move(fsamx_in + cenx / 1000 + fsamx.obj.pid_x_correction, wait=True) + fsamx.enabled_set = False + time.sleep(0.1) + self.laser_tracker_on() + time.sleep(0.01) + + if wait_on_exit: + time.sleep(1) + + self.socket_put("v1") + + @threadlocked + def clear_trajectory_generator(self): + self.socket_put("sc") + logger.info("flomni scan stopped and deleted, moving to start position") + + def feedback_enable_without_reset(self): + self.laser_tracker_on() + self.socket_put("l3") + time.sleep(0.01) + + if not self.feedback_is_running(): + raise RtError("Feedback is not running; likely an error in the interferometer.") + + self.set_device_enabled("fsamx", False) + self.set_device_enabled("fsamy", False) + self.set_device_enabled("foptx", False) + self.set_device_enabled("fopty", False) + + def feedback_disable(self): + self.clear_trajectory_generator() + self.move_to_zero() + self.socket_put("l0") + + self.set_device_enabled("fsamx", True) + self.set_device_enabled("fsamy", True) + self.set_device_enabled("foptx", True) + self.set_device_enabled("fopty", True) + + fsamx = self.get_device_manager().devices.fsamx + fsamx.obj.controller.socket_put_confirmed("axspeed[4]=025*stppermm[4]") + + def get_pid_x(self) -> float: + voltage = float(self.socket_put_and_receive("g").strip()) + return voltage + + def show_cyclic_error_compensation(self): + cec0 = int(float(self.socket_put_and_receive("w0").strip())) + cec1 = int(float(self.socket_put_and_receive("w0").strip())) + + if cec0 == 32: + logger.info("Cyclic Error Compensation: y-axis is initialized") + else: + logger.info("Cyclic Error Compensation: y-axis is NOT initialized") + if cec1 == 32: + logger.info("Cyclic Error Compensation: x-axis is initialized") + else: + logger.info("Cyclic Error Compensation: y-axis is NOT initialized") + + def set_rotation_angle(self, val: float) -> None: + self.socket_put(f"a{(-val)/180*np.pi}") + + def laser_tracker_on(self): + self.laser_update_tracker_info() + + if not self.tracker_info["enabled_z"] or not self.tracker_info["enabled_y"]: + logger.info("Enabling the laser tracker. Please wait...") + + tracker_intensity = self.tracker_info["tracker_intensity"] + if ( + tracker_intensity < self.tracker_info["threshold_intensity_y"] + or tracker_intensity < self.tracker_info["threshold_intensity_z"] + ): + logger.info(self.tracker_info) + raise RtError("The tracker cannot be enabled because the beam intensity it low.") + + self.move_to_zero() + self.socket_put("T1") + time.sleep(0.5) + + self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed( + "trackyct=0" + ) + self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed( + "trackzct=0" + ) + + self.laser_tracker_wait_on_target() + logger.info("Laser tracker running!") + + def laser_tracker_off(self): + self.socket_put("T0") + + def laser_tracker_show_all(self): + self.laser_update_tracker_info() + t = PrettyTable() + t.title = f"Laser Tracker Info" + t.field_names = ["Name", "Value"] + for key, val in self.tracker_info.items(): + t.add_row([key, val]) + print(t) + + def laser_update_tracker_info(self): + ret = self.socket_put_and_receive("Ts") + + # remove trailing \n + ret = ret.split("\n")[0] + + tracker_values = [float(val) for val in ret.split(",")] + self.tracker_info = { + "tracker_intensity": tracker_values[2], + "threshold_intensity_y": tracker_values[8], + "enabled_y": bool(tracker_values[10]), + "beampos_y": tracker_values[5], + "target_y": tracker_values[6], + "piezo_voltage_y": tracker_values[9], + "threshold_intensity_z": tracker_values[3], + "enabled_z": bool(tracker_values[10]), + "beampos_z": tracker_values[0], + "target_z": tracker_values[1], + "piezo_voltage_z": tracker_values[4], + } + + def laser_tracker_galil_enable(self): + self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed("tracken=1") + self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed("trackyct=0") + self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed("trackzct=0") + self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed("XQ#Tracker") + + def laser_tracker_on_target(self) -> bool: + self.laser_update_tracker_info() + if np.isclose( + self.tracker_info["beampos_y"], self.tracker_info["target_y"], atol=0.02 + ) and np.isclose(self.tracker_info["beampos_z"], self.tracker_info["target_z"], atol=0.02): + return True + return False + + def laser_tracker_wait_on_target(self): + max_repeat = 25 + count = 0 + while not self.laser_tracker_on_target(): + self.laser_tracker_galil_enable() + logger.info("Waiting for laser tracker to reach target.") + time.sleep(0.5) + count += 1 + if count > max_repeat: + raise RtError("Failed to reach laser target position.") + + def slew_rate_limiters_on_target(self) -> bool: + ret = int(float(self.socket_put_and_receive("y").strip())) + if ret == 3: + return True + return False + + def pid_y(self) -> float: + ret = float(self.socket_put_and_receive("G").strip()) + return ret + + def read_ssi_interferometer(self, axis_number): + val = float(self.socket_put_and_receive(f"j{axis_number}").strip()) + return val + + def show_signal_strength_interferometer(self): + t = PrettyTable() + t.title = f"Interferometer signal strength" + t.field_names = ["Axis", "Value"] + for i in range(3): + t.add_row([i, self.read_ssi_interferometer(i)]) + print(t) + + def _get_signals_from_table(self, return_table) -> dict: + self.average_stdeviations_x_st_fzp += float(return_table[4]) + self.average_stdeviations_y_st_fzp += float(return_table[7]) + signals = { + "target_x": {"value": float(return_table[2])}, + "average_x_st_fzp": {"value": float(return_table[3])}, + "stdev_x_st_fzp": {"value": float(return_table[4])}, + "target_y": {"value": float(return_table[5])}, + "average_y_st_fzp": {"value": float(return_table[6])}, + "stdev_y_st_fzp": {"value": float(return_table[7])}, + "average_rotz": {"value": float(return_table[8])}, + "stdev_rotz": {"value": float(return_table[9])}, + "average_stdeviations_x_st_fzp": { + "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) + }, + "average_stdeviations_y_st_fzp": { + "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) + }, + } + return signals + + @threadlocked + def start_scan(self): + if not self.feedback_is_running(): + logger.error( + "Cannot start scan because feedback loop is not running or there is an interferometer error." + ) + raise RtError( + "Cannot start scan because feedback loop is not running or there is an interferometer error." + ) + # here exception + (mode, number_of_positions_planned, current_position_in_scan) = self.get_scan_status() + + if number_of_positions_planned == 0: + logger.error("Cannot start scan because no target positions are planned.") + raise RtError("Cannot start scan because no target positions are planned.") + # hier exception + # start a point-by-point scan (for cont scan in flomni it would be "sa") + self.socket_put_and_receive("sd") + + @retry_once + @threadlocked + def get_scan_status(self): + return_table = (self.socket_put_and_receive("sr")).split(",") + if len(return_table) != 3: + raise RtCommunicationError( + f"Expected to receive 3 return values. Instead received {return_table}" + ) + mode = int(float(return_table[0])) + # mode 0: direct positioning + # mode 1: running internal timer (not tested/used anymore) + # mode 2: rt point scan running + # mode 3: rt point scan starting + # mode 5/6: rt continuous scanning (not available in LamNI) + number_of_positions_planned = int(float(return_table[1])) + current_position_in_scan = int(float(return_table[2])) + return (mode, number_of_positions_planned, current_position_in_scan) + + def get_device_manager(self): + for axis in self._axis: + if hasattr(axis, "device_manager") and axis.device_manager: + return axis.device_manager + raise BECConfigError("Could not access the device_manager") + + def read_positions_from_sampler(self): + # this was for reading after the scan completed + number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read + + read_counter = 0 + + self.average_stdeviations_x_st_fzp = 0 + self.average_stdeviations_y_st_fzp = 0 + self.average_lamni_angle = 0 + + mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() + + # if not (mode==2 or mode==3): + # error + self.get_device_manager().producer.set_and_publish( + MessageEndpoints.device_status("rt_scan"), + messages.DeviceStatusMessage( + device="rt_scan", status=1, metadata=self.readout_metadata + ).dumps(), + ) + # while scan is running + while mode > 0: + # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") + mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() + time.sleep(0.01) + if current_position_in_scan > 5: + while current_position_in_scan > read_counter + 1: + return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") + # logger.info(f"{return_table}") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + + read_counter = read_counter + 1 + + signals = self._get_signals_from_table(return_table) + + self.publish_device_data(signals=signals, pointID=int(return_table[0])) + + time.sleep(0.05) + + # read the last samples even though scan is finished already + while number_of_positions_planned > read_counter: + return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + # logger.info(f"{return_table}") + read_counter = read_counter + 1 + + signals = self._get_signals_from_table(return_table) + self.publish_device_data(signals=signals, pointID=int(return_table[0])) + + self.get_device_manager().producer.set_and_publish( + MessageEndpoints.device_status("rt_scan"), + messages.DeviceStatusMessage( + device="rt_scan", status=0, metadata=self.readout_metadata + ).dumps(), + ) + + logger.info( + f"Flomni statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp/number_of_samples_to_read}, y {self.average_stdeviations_y_st_fzp/number_of_samples_to_read}." + ) + + def publish_device_data(self, signals, pointID): + self.get_device_manager().producer.send( + MessageEndpoints.device_read("rt_flomni"), + messages.DeviceMessage( + signals=signals, + metadata={"pointID": pointID, **self.readout_metadata}, + ).dumps(), + ) + + def start_readout(self): + readout = threading.Thread(target=self.read_positions_from_sampler) + readout.start() + + def kickoff(self, metadata): + self.readout_metadata = metadata + while not self._min_scan_buffer_reached: + time.sleep(0.001) + self.start_scan() + time.sleep(0.1) + self.start_readout() + + +class RtFlomniReadbackSignal(RtReadbackSignal): + @retry_once + @threadlocked + def _socket_get(self) -> float: + """Get command for the readback signal + + Returns: + float: Readback value after adjusting for sign and motor resolution. + """ + time.sleep(0.1) + return_table = (self.controller.socket_put_and_receive(f"pr")).split(",") + + current_pos = float(return_table[self.parent.axis_Id_numeric]) + + current_pos *= self.parent.sign + self.parent.user_setpoint.setpoint = current_pos + return current_pos + + +class RtFlomniSetpointSignal(RtSetpointSignal): + setpoint = 0 + + @retry_once + @threadlocked + def _socket_set(self, val: float) -> None: + """Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign. + Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted. + + Args: + val (float): Target value / setpoint value + + Raises: + RtError: Raised if interferometer feedback is disabled. + + """ + if not self.parent.controller.feedback_is_running(): + raise RtError( + "The interferometer feedback is not running. Either it is turned off or and interferometer error occured." + ) + self.set_with_feedback_disabled(val) + + def set_with_feedback_disabled(self, val): + target_val = val * self.parent.sign + self.setpoint = target_val + self.controller.socket_put(f"pa{self.parent.axis_Id_numeric},{target_val:.4f}") + + +class RtFlomniFeedbackRunning(RtSignalRO): + @threadlocked + def _socket_get(self): + return int(self.parent.controller.feedback_is_running()) + + +class RtFlomniMotor(Device, PositionerBase): + USER_ACCESS = ["controller"] + readback = Cpt( + RtFlomniReadbackSignal, + signal_name="readback", + kind="hinted", + ) + user_setpoint = Cpt(RtFlomniSetpointSignal, signal_name="setpoint") + + high_limit_travel = Cpt(Signal, value=0, kind="omitted") + low_limit_travel = Cpt(Signal, value=0, kind="omitted") + + SUB_READBACK = "readback" + SUB_CONNECTION_CHANGE = "connection_change" + _default_sub = SUB_READBACK + + def __init__( + self, + axis_Id, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + host="mpc2844.psi.ch", + port=2222, + sign=1, + socket_cls=SocketIO, + device_manager=None, + limits=None, + **kwargs, + ): + self.axis_Id = axis_Id + self.sign = sign + self.controller = RtFlomniController( + socket_cls=socket_cls, socket_host=host, socket_port=port + ) + self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) + self.device_manager = device_manager + self.tolerance = kwargs.pop("tolerance", 0.5) + + super().__init__( + prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + **kwargs, + ) + self.readback.name = self.name + self.controller.subscribe( + self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE + ) + self._update_connection_state() + + # self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE) + if limits is not None: + assert len(limits) == 2 + self.low_limit_travel.put(limits[0]) + self.high_limit_travel.put(limits[1]) + + @property + def limits(self): + return (self.low_limit_travel.get(), self.high_limit_travel.get()) + + @property + def low_limit(self): + return self.limits[0] + + @property + def high_limit(self): + return self.limits[1] + + def check_value(self, pos): + """Check that the position is within the soft limits""" + low_limit, high_limit = self.limits + + if low_limit < high_limit and not (low_limit <= pos <= high_limit): + raise LimitError(f"position={pos} not within limits {self.limits}") + + def _update_connection_state(self, **kwargs): + for walk in self.walk_signals(): + walk.item._metadata["connected"] = self.controller.connected + + def _forward_readback(self, **kwargs): + kwargs.pop("sub_type") + self._run_subs(sub_type="readback", **kwargs) + + @raise_if_disconnected + def move(self, position, wait=True, **kwargs): + """Move to a specified position, optionally waiting for motion to + complete. + + Parameters + ---------- + position + Position to move to + moved_cb : callable + Call this callback when movement has finished. This callback must + accept one keyword argument: 'obj' which will be set to this + positioner instance. + timeout : float, optional + Maximum time to wait for the motion. If None, the default timeout + for this positioner is used. + + Returns + ------- + status : MoveStatus + + Raises + ------ + TimeoutError + When motion takes longer than `timeout` + ValueError + On invalid positions + RuntimeError + If motion fails other than timing out + """ + self._started_moving = False + timeout = kwargs.pop("timeout", 100) + status = super().move(position, timeout=timeout, **kwargs) + self.user_setpoint.put(position, wait=False) + + def move_and_finish(): + while not self.controller.slew_rate_limiters_on_target(): + print("motor is moving") + val = self.readback.read() + self._run_subs( + sub_type=self.SUB_READBACK, + value=val, + timestamp=time.time(), + ) + time.sleep(0.01) + print("Move finished") + self._done_moving() + + threading.Thread(target=move_and_finish, daemon=True).start() + try: + if wait: + status_wait(status) + except KeyboardInterrupt: + self.stop() + raise + + return status + + @property + def axis_Id(self): + return self._axis_Id_alpha + + @axis_Id.setter + def axis_Id(self, val): + if isinstance(val, str): + if len(val) != 1: + raise ValueError(f"Only single-character axis_Ids are supported.") + self._axis_Id_alpha = val + self._axis_Id_numeric = ord(val.lower()) - 97 + else: + raise TypeError(f"Expected value of type str but received {type(val)}") + + @property + def axis_Id_numeric(self): + return self._axis_Id_numeric + + @axis_Id_numeric.setter + def axis_Id_numeric(self, val): + if isinstance(val, int): + if val > 26: + raise ValueError(f"Numeric value exceeds supported range.") + self._axis_Id_alpha = val + self._axis_Id_numeric = (chr(val + 97)).capitalize() + else: + raise TypeError(f"Expected value of type int but received {type(val)}") + + def kickoff(self, metadata, **kwargs) -> None: + self.controller.kickoff(metadata) + + @property + def egu(self): + """The engineering units (EGU) for positions""" + return "um" + + # how is this used later? + + def stage(self) -> List[object]: + return super().stage() + + def unstage(self) -> List[object]: + return super().unstage() + + def stop(self, *, success=False): + self.controller.stop_all_axes() + return super().stop(success=success) + + +if __name__ == "__main__": + rtcontroller = RtFlomniController( + socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222 + ) + rtcontroller.on() + rtcontroller.laser_tracker_on() diff --git a/ophyd_devices/rt_lamni/rt_ophyd.py b/ophyd_devices/rt_lamni/rt_ophyd.py new file mode 100644 index 0000000..202c7ef --- /dev/null +++ b/ophyd_devices/rt_lamni/rt_ophyd.py @@ -0,0 +1,831 @@ +import functools +import threading +import time +from typing import List + +import numpy as np +from bec_lib import MessageEndpoints, bec_logger, messages +from ophyd import Component as Cpt +from ophyd import Device, PositionerBase, Signal +from ophyd.status import wait as status_wait +from ophyd.utils import LimitError, ReadOnlyError + +from ophyd_devices.utils.controller import Controller, threadlocked +from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected + +logger = bec_logger.logger + + +class RtCommunicationError(Exception): + pass + + +class RtError(Exception): + pass + + +class BECConfigError(Exception): + pass + + +def retry_once(fcn): + """Decorator to rerun a function in case a CommunicationError was raised. This may happen if the buffer was not empty.""" + + @functools.wraps(fcn) + def wrapper(self, *args, **kwargs): + try: + val = fcn(self, *args, **kwargs) + except (RtCommunicationError, RtError): + val = fcn(self, *args, **kwargs) + return val + + return wrapper + + +class RtController(Controller): + _axes_per_controller = 3 + USER_ACCESS = [ + "socket_put_and_receive", + "set_rotation_angle", + "feedback_disable", + "feedback_enable_without_reset", + "feedback_disable_and_even_reset_lamni_angle_interferometer", + "feedback_enable_with_reset", + "add_pos_to_scan", + "clear_trajectory_generator", + "_set_axis_velocity", + "_set_axis_velocity_maximum_speed", + "_position_sampling_single_read", + "_position_sampling_single_reset_and_start_sampling", + ] + + def on(self, controller_num=0) -> None: + """Open a new socket connection to the controller""" + # if not self.connected: + # try: + # self.sock.open() + # # discuss - after disconnect takes a while for the server to be ready again + # max_retries = 10 + # tries = 0 + # while not self.connected: + # try: + # welcome_message = self.sock.receive() + # self.connected = True + # except ConnectionResetError as conn_reset: + # if tries > max_retries: + # raise conn_reset + # tries += 1 + # time.sleep(2) + # except ConnectionRefusedError as conn_error: + # logger.error("Failed to open a connection to RTLamNI.") + # raise RtCommunicationError from conn_error + + # else: + # logger.info("The connection has already been established.") + # # warnings.warn(f"The connection has already been established.", stacklevel=2) + super().on() + # self._update_flyer_device_info() + + def set_axis(self, axis: Device, axis_nr: int) -> None: + """Assign an axis to a device instance. + + Args: + axis (Device): Device instance (e.g. GalilMotor) + axis_nr (int): Controller axis number + + """ + self._axis[axis_nr] = axis + + @threadlocked + def socket_put(self, val: str) -> None: + self.sock.put(f"{val}\n".encode()) + + @threadlocked + def socket_get(self) -> str: + return self.sock.receive().decode() + + @retry_once + @threadlocked + def socket_put_and_receive(self, val: str, remove_trailing_chars=True) -> str: + self.socket_put(val) + if remove_trailing_chars: + return self._remove_trailing_characters(self.sock.receive().decode()) + return self.socket_get() + + def is_axis_moving(self, axis_Id) -> bool: + # this checks that axis is on target + axis_is_on_target = bool(float(self.socket_put_and_receive(f"o"))) + return not axis_is_on_target + + # def is_thread_active(self, thread_id: int) -> bool: + # val = float(self.socket_put_and_receive(f"MG_XQ{thread_id}")) + # if val == -1: + # return False + # return True + + def _remove_trailing_characters(self, var) -> str: + if len(var) > 1: + return var.split("\r\n")[0] + return var + + @threadlocked + def set_rotation_angle(self, val: float): + self.socket_put(f"a{(val-300+30.538)/180*np.pi}") + + @threadlocked + def stop_all_axes(self): + self.socket_put("sc") + + @threadlocked + def feedback_disable(self): + self.socket_put("J0") + logger.info("LamNI Feedback disabled.") + self.set_device_enabled("lsamx", True) + self.set_device_enabled("lsamy", True) + self.set_device_enabled("loptx", True) + self.set_device_enabled("lopty", True) + self.set_device_enabled("loptz", True) + + @threadlocked + def _set_axis_velocity(self, um_per_s): + self.socket_put(f"V{um_per_s}") + + @threadlocked + def _set_axis_velocity_maximum_speed(self): + self.socket_put(f"V0") + + # for developement of soft continuous scanning + @threadlocked + def _position_sampling_single_reset_and_start_sampling(self): + self.socket_put(f"Ss") + + @threadlocked + def _position_sampling_single_read(self): + (number_of_samples, sum0, sum0_2, sum1, sum1_2, sum2, sum2_2) = self.socket_put_and_receive( + f"Sr" + ).split(",") + avg_x = float(sum1) / int(number_of_samples) + avg_y = float(sum0) / int(number_of_samples) + stdev_x = np.sqrt( + float(sum1_2) / int(number_of_samples) + - np.power(float(sum1) / int(number_of_samples), 2) + ) + stdev_y = np.sqrt( + float(sum0_2) / int(number_of_samples) + - np.power(float(sum0) / int(number_of_samples), 2) + ) + return (avg_x, avg_y, stdev_x, stdev_y) + + @threadlocked + def feedback_enable_without_reset(self): + # read current interferometer position + return_table = (self.socket_put_and_receive(f"J4")).split(",") + x_curr = float(return_table[2]) + y_curr = float(return_table[1]) + # set these as closed loop target position + self.socket_put(f"pa0,{x_curr:.4f}") + self.socket_put(f"pa1,{y_curr:.4f}") + self.get_device_manager().devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr) + self.get_device_manager().devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr) + self.socket_put("J5") + logger.info("LamNI Feedback enabled (without reset).") + self.set_device_enabled("lsamx", False) + self.set_device_enabled("lsamy", False) + self.set_device_enabled("loptx", False) + self.set_device_enabled("lopty", False) + self.set_device_enabled("loptz", False) + + @threadlocked + def feedback_disable_and_even_reset_lamni_angle_interferometer(self): + self.socket_put("J6") + logger.info("LamNI Feedback disabled including the angular interferometer.") + self.set_device_enabled("lsamx", True) + self.set_device_enabled("lsamy", True) + self.set_device_enabled("loptx", True) + self.set_device_enabled("lopty", True) + self.set_device_enabled("loptz", True) + + def get_device_manager(self): + for axis in self._axis: + if hasattr(axis, "device_manager") and axis.device_manager: + return axis.device_manager + raise BECConfigError("Could not access the device_manager") + + def get_axis_by_name(self, name): + for axis in self._axis: + if axis: + if axis.name == name: + return axis + raise RuntimeError(f"Could not find an axis with name {name}") + + @threadlocked + def clear_trajectory_generator(self): + self.socket_put("sc") + logger.info("LamNI scan stopped and deleted, moving to start position") + + def add_pos_to_scan(self, positions) -> None: + def send_positions(parent, positions): + parent._min_scan_buffer_reached = False + for pos_index, pos in enumerate(positions): + parent.socket_put_and_receive(f"s{pos[0]},{pos[1]},0") + if pos_index > 100: + parent._min_scan_buffer_reached = True + parent._min_scan_buffer_reached = True + + threading.Thread(target=send_positions, args=(self, positions), daemon=True).start() + + @retry_once + @threadlocked + def get_scan_status(self): + return_table = (self.socket_put_and_receive(f"sr")).split(",") + if len(return_table) != 3: + raise RtCommunicationError( + f"Expected to receive 3 return values. Instead received {return_table}" + ) + mode = int(return_table[0]) + # mode 0: direct positioning + # mode 1: running internal timer (not tested/used anymore) + # mode 2: rt point scan running + # mode 3: rt point scan starting + # mode 5/6: rt continuous scanning (not available in LamNI) + number_of_positions_planned = int(return_table[1]) + current_position_in_scan = int(return_table[2]) + return (mode, number_of_positions_planned, current_position_in_scan) + + @threadlocked + def start_scan(self): + interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0]) + if interferometer_feedback_not_running == 1: + logger.error( + "Cannot start scan because feedback loop is not running or there is an interferometer error." + ) + raise RtError( + "Cannot start scan because feedback loop is not running or there is an interferometer error." + ) + # here exception + (mode, number_of_positions_planned, current_position_in_scan) = self.get_scan_status() + + if number_of_positions_planned == 0: + logger.error("Cannot start scan because no target positions are planned.") + raise RtError("Cannot start scan because no target positions are planned.") + # hier exception + # start a point-by-point scan (for cont scan in flomni it would be "sa") + self.socket_put_and_receive("sd") + + def start_readout(self): + readout = threading.Thread(target=self.read_positions_from_sampler) + readout.start() + + def _update_flyer_device_info(self): + flyer_info = self._get_flyer_device_info() + self.get_device_manager().producer.set( + MessageEndpoints.device_info("rt_scan"), + messages.DeviceInfoMessage(device="rt_scan", info=flyer_info).dumps(), + ) + + def _get_flyer_device_info(self) -> dict: + return { + "device_name": self.name, + "device_attr_name": getattr(self, "attr_name", ""), + "device_dotted_name": getattr(self, "dotted_name", ""), + "device_info": { + "device_base_class": "ophydobject", + "signals": [], + "hints": {"fields": ["average_x_st_fzp", "average_y_st_fzp"]}, + "describe": {}, + "describe_configuration": {}, + "sub_devices": [], + "custom_user_access": [], + }, + } + + def kickoff(self, metadata): + self.readout_metadata = metadata + while not self._min_scan_buffer_reached: + time.sleep(0.001) + self.start_scan() + time.sleep(0.1) + self.start_readout() + + def _get_signals_from_table(self, return_table) -> dict: + self.average_stdeviations_x_st_fzp += float(return_table[5]) + self.average_stdeviations_y_st_fzp += float(return_table[8]) + self.average_lamni_angle += float(return_table[19]) + signals = { + "target_x": {"value": float(return_table[3])}, + "average_x_st_fzp": {"value": float(return_table[4])}, + "stdev_x_st_fzp": {"value": float(return_table[5])}, + "target_y": {"value": float(return_table[6])}, + "average_y_st_fzp": {"value": float(return_table[7])}, + "stdev_y_st_fzp": {"value": float(return_table[8])}, + "average_cap1": {"value": float(return_table[9])}, + "stdev_cap1": {"value": float(return_table[10])}, + "average_cap2": {"value": float(return_table[11])}, + "stdev_cap2": {"value": float(return_table[12])}, + "average_cap3": {"value": float(return_table[13])}, + "stdev_cap3": {"value": float(return_table[14])}, + "average_cap4": {"value": float(return_table[15])}, + "stdev_cap4": {"value": float(return_table[16])}, + "average_cap5": {"value": float(return_table[17])}, + "stdev_cap5": {"value": float(return_table[18])}, + "average_angle_interf_ST": {"value": float(return_table[19])}, + "stdev_angle_interf_ST": {"value": float(return_table[20])}, + "average_stdeviations_x_st_fzp": { + "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) + }, + "average_stdeviations_y_st_fzp": { + "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) + }, + "average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)}, + } + return signals + + def read_positions_from_sampler(self): + # this was for reading after the scan completed + number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read + + read_counter = 0 + previous_point_in_scan = 0 + + self.average_stdeviations_x_st_fzp = 0 + self.average_stdeviations_y_st_fzp = 0 + self.average_lamni_angle = 0 + + mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() + + # if not (mode==2 or mode==3): + # error + self.get_device_manager().producer.set_and_publish( + MessageEndpoints.device_status("rt_scan"), + messages.DeviceStatusMessage( + device="rt_scan", status=1, metadata=self.readout_metadata + ).dumps(), + ) + # while scan is running + while mode > 0: + # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") + mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() + time.sleep(0.01) + if current_position_in_scan > 5: + while current_position_in_scan > read_counter + 1: + return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") + # logger.info(f"{return_table}") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + + read_counter = read_counter + 1 + + signals = self._get_signals_from_table(return_table) + + self.publish_device_data(signals=signals, pointID=int(return_table[0])) + + time.sleep(0.05) + + # read the last samples even though scan is finished already + while number_of_positions_planned > read_counter: + return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + # logger.info(f"{return_table}") + read_counter = read_counter + 1 + + signals = self._get_signals_from_table(return_table) + self.publish_device_data(signals=signals, pointID=int(return_table[0])) + + self.get_device_manager().producer.set_and_publish( + MessageEndpoints.device_status("rt_scan"), + messages.DeviceStatusMessage( + device="rt_scan", status=0, metadata=self.readout_metadata + ).dumps(), + ) + + logger.info( + f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp/number_of_samples_to_read}, y {self.average_stdeviations_y_st_fzp/number_of_samples_to_read}, angle {self.average_lamni_angle/number_of_samples_to_read}." + ) + + def publish_device_data(self, signals, pointID): + self.get_device_manager().producer.send( + MessageEndpoints.device_read("rt_lamni"), + messages.DeviceMessage( + signals=signals, + metadata={"pointID": pointID, **self.readout_metadata}, + ).dumps(), + ) + + def feedback_status_angle_lamni(self) -> bool: + return_table = (self.socket_put_and_receive(f"J7")).split(",") + logger.debug( + f"LamNI angle interferomter status {bool(return_table[0])}, position {float(return_table[1])}, signal {float(return_table[2])}" + ) + return bool(return_table[0]) + + def feedback_enable_with_reset(self): + if not self.feedback_status_angle_lamni(): + self.feedback_disable_and_even_reset_lamni_angle_interferometer() + logger.info(f"LamNI resetting interferometer inclusive angular interferomter.") + else: + self.feedback_disable() + logger.info( + f"LamNI resetting interferomter except angular interferometer which is already running." + ) + + # set these as closed loop target position + + self.socket_put(f"pa0,0") + self.get_axis_by_name("rtx").user_setpoint.setpoint = 0 + self.socket_put(f"pa1,0") + self.get_axis_by_name("rty").user_setpoint.setpoint = 0 + self.socket_put( + f"pa2,0" + ) # we set all three outputs of the traj. gen. although in LamNI case only 0,1 are used + self.clear_trajectory_generator() + + self.get_device_manager().devices.lsamrot.obj.move(0, wait=True) + + galil_controller_rt_status = ( + self.get_device_manager().devices.lsamx.obj.controller.lgalil_is_air_off_and_orchestra_enabled() + ) + + if galil_controller_rt_status == 0: + logger.error( + "Cannot enable feedback. The small rotation air is on and/or orchestra disabled by the motor controller." + ) + raise RtError( + "Cannot enable feedback. The small rotation air is on and/or orchestra disabled by the motor controller." + ) + + time.sleep(0.03) + + lsamx_user_params = self.get_device_manager().devices.lsamx.user_parameter + if lsamx_user_params is None or lsamx_user_params.get("center") is None: + raise RuntimeError("lsamx center is not defined") + lsamy_user_params = self.get_device_manager().devices.lsamy.user_parameter + if lsamy_user_params is None or lsamy_user_params.get("center") is None: + raise RuntimeError("lsamy center is not defined") + lsamx_center = lsamx_user_params.get("center") + lsamy_center = lsamy_user_params.get("center") + self.get_device_manager().devices.lsamx.obj.move(lsamx_center, wait=True) + self.get_device_manager().devices.lsamy.obj.move(lsamy_center, wait=True) + self.socket_put("J1") + + _waitforfeedbackctr = 0 + + interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0]) + + while interferometer_feedback_not_running == 1 and _waitforfeedbackctr < 100: + time.sleep(0.01) + _waitforfeedbackctr = _waitforfeedbackctr + 1 + interferometer_feedback_not_running = int( + (self.socket_put_and_receive("J2")).split(",")[0] + ) + + self.set_device_enabled("lsamx", False) + self.set_device_enabled("lsamy", False) + self.set_device_enabled("loptx", False) + self.set_device_enabled("lopty", False) + self.set_device_enabled("loptz", False) + + if interferometer_feedback_not_running == 1: + logger.error( + "Cannot start scan because feedback loop is not running or there is an interferometer error." + ) + raise RtError( + "Cannot start scan because feedback loop is not running or there is an interferometer error." + ) + + time.sleep(0.01) + + # ptychography_alignment_done = 0 + + def set_device_enabled(self, device_name: str, enabled: bool) -> None: + """enable / disable a device""" + if device_name not in self.get_device_manager().devices: + logger.warning( + f"Device {device_name} is not configured and cannot be enabled/disabled." + ) + return + self.get_device_manager().devices[device_name].enabled_set = enabled + + +class RtSignalBase(SocketSignal): + def __init__(self, signal_name, **kwargs): + self.signal_name = signal_name + super().__init__(**kwargs) + self.controller = self.parent.controller + self.sock = self.parent.controller.sock + + +class RtSignalRO(RtSignalBase): + def __init__(self, signal_name, **kwargs): + super().__init__(signal_name, **kwargs) + self._metadata["write_access"] = False + + def _socket_set(self, val): + raise ReadOnlyError("Read-only signals cannot be set") + + +class RtReadbackSignal(RtSignalRO): + @retry_once + @threadlocked + def _socket_get(self) -> float: + """Get command for the readback signal + + Returns: + float: Readback value after adjusting for sign and motor resolution. + """ + return_table = (self.controller.socket_put_and_receive(f"J4")).split(",") + print(return_table) + if self.parent.axis_Id_numeric == 0: + readback_index = 2 + elif self.parent.axis_Id_numeric == 1: + readback_index = 1 + else: + raise RtError("Currently, only two axes are supported.") + + current_pos = float(return_table[readback_index]) + + current_pos *= self.parent.sign + return current_pos + + +class RtSetpointSignal(RtSignalBase): + setpoint = 0 + + def _socket_get(self) -> float: + """Get command for receiving the setpoint / target value. + The value is not pulled from the controller but instead just the last setpoint used. + + Returns: + float: setpoint / target value + """ + return self.setpoint + + @retry_once + @threadlocked + def _socket_set(self, val: float) -> None: + """Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign. + Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted. + + Args: + val (float): Target value / setpoint value + + Raises: + RtError: Raised if interferometer feedback is disabled. + + """ + interferometer_feedback_not_running = int( + (self.controller.socket_put_and_receive("J2")).split(",")[0] + ) + if interferometer_feedback_not_running != 0: + raise RtError( + "The interferometer feedback is not running. Either it is turned off or and interferometer error occured." + ) + self.set_with_feedback_disabled(val) + + def set_with_feedback_disabled(self, val): + target_val = val * self.parent.sign + self.setpoint = target_val + self.controller.socket_put(f"pa{self.parent.axis_Id_numeric},{target_val:.4f}") + + +class RtMotorIsMoving(RtSignalRO): + def _socket_get(self): + return self.controller.is_axis_moving(self.parent.axis_Id_numeric) + + def get(self): + val = super().get() + if val is not None: + self._run_subs( + sub_type=self.SUB_VALUE, + value=val, + timestamp=time.time(), + ) + return val + + +class RtFeedbackRunning(RtSignalRO): + @threadlocked + def _socket_get(self): + if int((self.controller.socket_put_and_receive("J2")).split(",")[0]) == 0: + return 1 + else: + return 0 + + +class RtMotor(Device, PositionerBase): + USER_ACCESS = ["controller"] + readback = Cpt( + RtReadbackSignal, + signal_name="readback", + kind="hinted", + ) + user_setpoint = Cpt(RtSetpointSignal, signal_name="setpoint") + + motor_is_moving = Cpt(RtMotorIsMoving, signal_name="motor_is_moving", kind="normal") + high_limit_travel = Cpt(Signal, value=0, kind="omitted") + low_limit_travel = Cpt(Signal, value=0, kind="omitted") + + SUB_READBACK = "readback" + SUB_CONNECTION_CHANGE = "connection_change" + _default_sub = SUB_READBACK + + def __init__( + self, + axis_Id, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + host="mpc2680.psi.ch", + port=3333, + sign=1, + socket_cls=SocketIO, + device_manager=None, + limits=None, + **kwargs, + ): + self.axis_Id = axis_Id + self.sign = sign + self.controller = RtController(socket=socket_cls(host=host, port=port)) + self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) + self.device_manager = device_manager + self.tolerance = kwargs.pop("tolerance", 0.5) + + super().__init__( + prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + **kwargs, + ) + self.readback.name = self.name + self.controller.subscribe( + self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE + ) + self._update_connection_state() + + # self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE) + if limits is not None: + assert len(limits) == 2 + self.low_limit_travel.put(limits[0]) + self.high_limit_travel.put(limits[1]) + + @property + def limits(self): + return (self.low_limit_travel.get(), self.high_limit_travel.get()) + + @property + def low_limit(self): + return self.limits[0] + + @property + def high_limit(self): + return self.limits[1] + + def check_value(self, pos): + """Check that the position is within the soft limits""" + low_limit, high_limit = self.limits + + if low_limit < high_limit and not (low_limit <= pos <= high_limit): + raise LimitError(f"position={pos} not within limits {self.limits}") + + def _update_connection_state(self, **kwargs): + for walk in self.walk_signals(): + walk.item._metadata["connected"] = self.controller.connected + + def _forward_readback(self, **kwargs): + kwargs.pop("sub_type") + self._run_subs(sub_type="readback", **kwargs) + + @raise_if_disconnected + def move(self, position, wait=True, **kwargs): + """Move to a specified position, optionally waiting for motion to + complete. + + Parameters + ---------- + position + Position to move to + moved_cb : callable + Call this callback when movement has finished. This callback must + accept one keyword argument: 'obj' which will be set to this + positioner instance. + timeout : float, optional + Maximum time to wait for the motion. If None, the default timeout + for this positioner is used. + + Returns + ------- + status : MoveStatus + + Raises + ------ + TimeoutError + When motion takes longer than `timeout` + ValueError + On invalid positions + RuntimeError + If motion fails other than timing out + """ + self._started_moving = False + timeout = kwargs.pop("timeout", 100) + status = super().move(position, timeout=timeout, **kwargs) + self.user_setpoint.put(position, wait=False) + + def move_and_finish(): + while self.motor_is_moving.get(): + print("motor is moving") + val = self.readback.read() + self._run_subs( + sub_type=self.SUB_READBACK, + value=val, + timestamp=time.time(), + ) + time.sleep(0.01) + print("Move finished") + self._done_moving() + + threading.Thread(target=move_and_finish, daemon=True).start() + try: + if wait: + status_wait(status) + except KeyboardInterrupt: + self.stop() + raise + + return status + + @property + def axis_Id(self): + return self._axis_Id_alpha + + @axis_Id.setter + def axis_Id(self, val): + if isinstance(val, str): + if len(val) != 1: + raise ValueError(f"Only single-character axis_Ids are supported.") + self._axis_Id_alpha = val + self._axis_Id_numeric = ord(val.lower()) - 97 + else: + raise TypeError(f"Expected value of type str but received {type(val)}") + + @property + def axis_Id_numeric(self): + return self._axis_Id_numeric + + @axis_Id_numeric.setter + def axis_Id_numeric(self, val): + if isinstance(val, int): + if val > 26: + raise ValueError(f"Numeric value exceeds supported range.") + self._axis_Id_alpha = val + self._axis_Id_numeric = (chr(val + 97)).capitalize() + else: + raise TypeError(f"Expected value of type int but received {type(val)}") + + def kickoff(self, metadata, **kwargs) -> None: + self.controller.kickoff(metadata) + + @property + def egu(self): + """The engineering units (EGU) for positions""" + return "um" + + # how is this used later? + + def stage(self) -> List[object]: + return super().stage() + + def unstage(self) -> List[object]: + return super().unstage() + + def stop(self, *, success=False): + self.controller.stop_all_axes() + return super().stop(success=success) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + + mock = False + if not mock: + rty = RtLamniMotor("B", name="rty", host="mpc2680.psi.ch", port=3333, sign=1) + rty.stage() + status = rty.move(0, wait=True) + status = rty.move(10, wait=True) + rty.read() + + rty.get() + rty.describe() + + rty.unstage() + else: + from ophyd_devices.utils.socket import SocketMock + + rtx = RtLamniMotor("A", name="rtx", host="mpc2680.psi.ch", port=3333, socket_cls=SocketMock) + rty = RtLamniMotor("B", name="rty", host="mpc2680.psi.ch", port=3333, socket_cls=SocketMock) + rtx.stage() + # rty.stage() diff --git a/ophyd_devices/smaract/smaract_controller.py b/ophyd_devices/smaract/smaract_controller.py index 410b1fd..30f9314 100644 --- a/ophyd_devices/smaract/smaract_controller.py +++ b/ophyd_devices/smaract/smaract_controller.py @@ -3,8 +3,10 @@ import functools import json import logging import os +import time import numpy as np +from prettytable import PrettyTable from typeguard import typechecked from ophyd_devices.smaract.smaract_errors import ( @@ -73,7 +75,13 @@ class SmaractSensors: class SmaractController(Controller): _axes_per_controller = 6 _initialized = False - USER_ACCESS = ["socket_put_and_receive", "smaract_show_all", "move_open_loop_steps"] + USER_ACCESS = [ + "socket_put_and_receive", + "smaract_show_all", + "move_open_loop_steps", + "find_reference_mark", + "describe", + ] def __init__( self, @@ -117,7 +125,19 @@ class SmaractController(Controller): raise_if_not_status=False, ) -> str: self.socket_put(val) - return_val = self.socket_get() + return_val = "" + max_wait_time = 1 + elapsed_time = 0 + sleep_time = 0.01 + while True: + ret = self.socket_get() + return_val += ret + if ret.endswith("\n"): + break + time.sleep(sleep_time) + elapsed_time += sleep_time + if elapsed_time > max_wait_time: + break if remove_trailing_chars: return_val = self._remove_trailing_characters(return_val) logger.debug(f"Sending {val}; Returned {return_val}") @@ -234,15 +254,15 @@ class SmaractController(Controller): @axis_checked @typechecked def move_open_loop_steps( - self, axis_Id_numeric: int, steps: int, amplitude: int = 2000, frequency: int = 500 + self, axis_Id_numeric: int, steps: int, amplitude: int = 4000, frequency: int = 2000 ) -> None: - """Move open loop steps + """Move open loop steps. It performs a burst of steps with the given parameters. Args: axis_Id_numeric (int): Axis number. - steps (float): Relative position to move to in mm. - hold_time (int, optional): Specifies how long (in milliseconds) the position is actively held after reaching the target. The valid range is 0..60,000. A 0 deactivates this feature, a value of 60,000 is infinite (until manually stopped, see S command). Defaults to 1000. - + steps (int): Number and direction of steps to perform. The valid range is -30,000..30,000. A value of 0 stops the positioner, but see S command. A value of 30,000 or -30,000 performs an unbounded move. This should be used with caution since the positioner will only stop on an S command. + amplitude (int): Amplitude that the steps are performed with. Lower amplitude values result in a smaller step width. The parameter must be given as a 12bit value (range 0..4,095). 0 corresponds to 0V, 4,095 to 100V. Default: 4000 + frequency (int): Frequency in Hz that the steps are performed with. The valid range is 1..18,500. Default: 2000. """ self.socket_put_and_receive( f"MST{axis_Id_numeric},{steps},{amplitude},{frequency}", @@ -374,6 +394,15 @@ class SmaractController(Controller): if self._message_starts_with(return_val, f":ST{axis_Id_numeric}"): return self._sensors.avail_sensors.get(int(return_val.strip(f":ST{axis_Id_numeric},"))) + @retry_once + @axis_checked + def find_reference_mark( + self, axis_Id_numeric: int, direction: int, holdTime: int, autoZero: int + ) -> None: + return_val = self.socket_put_and_receive( + f"FRM{axis_Id_numeric},{direction},{holdTime},{autoZero}" + ) + @retry_once @axis_checked def set_closed_loop_move_speed(self, axis_Id_numeric: int, move_speed: float) -> None: @@ -419,7 +448,7 @@ class SmaractController(Controller): "Closed Loop Speed", "Position", ] - for ax in range(self._Smaract_axis_per_controller): + for ax in range(self._axes_per_controller): axis = self._axis[ax] if axis is not None: t.add_row( @@ -428,7 +457,7 @@ class SmaractController(Controller): axis.name, axis.connected, self.axis_is_referenced(axis.axis_Id_numeric), - self.get_closed_loop_move_speed(axis.axis_Id), + self.get_closed_loop_move_speed(axis.axis_Id_numeric), axis.readback.read().get(axis.name).get("value"), ] ) diff --git a/ophyd_devices/utils/controller.py b/ophyd_devices/utils/controller.py index 98793b0..7180f4b 100644 --- a/ophyd_devices/utils/controller.py +++ b/ophyd_devices/utils/controller.py @@ -57,10 +57,6 @@ class Controller(OphydObject): labels=None, kind=None, ): - self.sock = None - self._socket_cls = socket_cls - self._socket_host = socket_host - self._socket_port = socket_port if not self._initialized: super().__init__( name=name, attr_name=attr_name, parent=parent, labels=labels, kind=kind @@ -69,6 +65,10 @@ class Controller(OphydObject): self._axis = [] self._initialize() self._initialized = True + self.sock = None + self._socket_cls = socket_cls + self._socket_host = socket_host + self._socket_port = socket_port def _initialize(self): self._connected = False @@ -133,7 +133,7 @@ class Controller(OphydObject): def off(self) -> None: """Close the socket connection to the controller""" - if self.connected or self.sock is not None: + if self.connected and self.sock is not None: self.sock.close() self.connected = False self.sock = None diff --git a/ophyd_devices/utils/socket.py b/ophyd_devices/utils/socket.py index 74556b0..ea72d94 100644 --- a/ophyd_devices/utils/socket.py +++ b/ophyd_devices/utils/socket.py @@ -179,18 +179,29 @@ class SocketSignal(abc.ABC, Signal): class SocketIO: """SocketIO helper class for TCP IP connections""" - def __init__(self, host, port): + def __init__(self, host, port, max_retry=10): self.host = host self.port = port self.is_open = False + self.max_retry = max_retry self._initialize_socket() def connect(self): print(f"connecting to {self.host} port {self.port}") # self.sock.create_connection((host, port)) - if self.sock is None: - self._initialize_socket() - self.sock.connect((self.host, self.port)) + retry_count = 0 + while True: + try: + if self.sock is None: + self._initialize_socket() + self.sock.connect((self.host, self.port)) + break + except Exception as exc: + self.sock = None + time.sleep(2) + retry_count += 1 + if retry_count > self.max_retry: + raise exc def _put(self, msg_bytes): logger.debug(f"put message: {msg_bytes}")