From 2ddc074e4fe9638bac77df5f3bbd2b1c4600814c Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Tue, 7 Nov 2023 16:22:09 +0100 Subject: [PATCH] fix: fixed import; fixed file name --- ophyd_devices/galil/__init__.py | 5 +- ophyd_devices/galil/fgalil_ophyd.py | 94 +++++-------------- .../{fupr_galil_ophyd.py => fupr_ophyd.py} | 94 ++++++++++++++----- 3 files changed, 97 insertions(+), 96 deletions(-) rename ophyd_devices/galil/{fupr_galil_ophyd.py => fupr_ophyd.py} (74%) diff --git a/ophyd_devices/galil/__init__.py b/ophyd_devices/galil/__init__.py index e19b6ab..0ad6966 100644 --- a/ophyd_devices/galil/__init__.py +++ b/ophyd_devices/galil/__init__.py @@ -1,3 +1,4 @@ -from .galil_ophyd import GalilMotor, GalilController -from .fgalil_ophyd import FlomniGalilController +from .fgalil_ophyd import FlomniGalilController, FlomniGalilMotor +from .fupr_ophyd import FuprGalilController, FuprGalilMotor +from .galil_ophyd import GalilController, GalilMotor from .sgalil_ophyd import SGalilMotor diff --git a/ophyd_devices/galil/fgalil_ophyd.py b/ophyd_devices/galil/fgalil_ophyd.py index 23da36e..9272d73 100644 --- a/ophyd_devices/galil/fgalil_ophyd.py +++ b/ophyd_devices/galil/fgalil_ophyd.py @@ -29,97 +29,51 @@ from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconne logger = bec_logger.logger -class FuprGalilController(GalilController): +class FlomniGalilController(GalilController): def is_axis_moving(self, axis_Id, axis_Id_numeric) -> bool: if axis_Id is None and axis_Id_numeric is not None: axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric) - is_moving = bool(float(self.socket_put_and_receive(f"MG_BG{axis_Id}")) != 0) - return is_moving + active_thread = self.is_thread_active(0) + motor_is_on = self.is_motor_on(axis_Id) + return bool(active_thread or motor_is_on) def all_axes_referenced(self) -> bool: # TODO: check if all axes are referenced in all controllers return super().all_axes_referenced() - def axis_is_referenced(self) -> bool: - return self.all_axes_referenced() - def all_axis_referenced(self) -> bool: - return bool(float(self.socket_put_and_receive("MG axisref").strip())) - - def drive_axis_to_limit(self, axis_Id_numeric, direction: str) -> None: - raise NotImplementedError("This function is not implemented for the FuprGalilController.") - - -class FuprGalilReadbackSignal(GalilReadbackSignal): - @retry_once - @threadlocked - def _socket_get(self) -> float: - """Get command for the readback signal - - Returns: - float: Readback value after adjusting for sign and motor resolution. - """ - - current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}")) - current_pos *= self.parent.sign - step_mm = self.parent.motor_resolution.get() - return current_pos / step_mm - - -class FuprGalilSetpointSignal(GalilSetpointSignal): - @retry_once - @threadlocked - def _socket_set(self, val: float) -> None: - """Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign. - Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted. - - Args: - val (float): Target value / setpoint value - - Raises: - GalilError: Raised if not all axes are referenced. - - """ - target_val = val * self.parent.sign - self.setpoint = target_val - axes_referenced = self.controller.all_axes_referenced() - if axes_referenced: - self.controller.socket_put_confirmed( - f"PA{self.parent.axis_Id}={int(self.setpoint*self.parent.MOTOR_RESOLUTION)}" - ) - self.controller.socket_put_confirmed(f"BG{self.parent.axis_Id}") - else: - raise GalilError("Not all axes are referenced.") - - -class FuprGalilMotorResolution(GalilMotorResolution): - @retry_once - @threadlocked - def _socket_get(self): - return self.parent.MOTOR_RESOLUTION - - -class FuprGalilMotorIsMoving(GalilMotorIsMoving): +class FlomniGalilReadbackSignal(GalilReadbackSignal): pass -class FuprGalilAxesReferenced(GalilAxesReferenced): +class FlomniGalilSetpointSignal(GalilSetpointSignal): pass -class FuprGalilMotor(Device, PositionerBase): +class FlomniGalilMotorResolution(GalilMotorResolution): + pass + + +class FlomniGalilMotorIsMoving(GalilMotorIsMoving): + pass + + +class FlomniGalilAxesReferenced(GalilAxesReferenced): + pass + + +class FlomniGalilMotor(Device, PositionerBase): USER_ACCESS = ["controller"] - MOTOR_RESOLUTION = 25600 readback = Cpt( GalilReadbackSignal, signal_name="readback", kind="hinted", ) - user_setpoint = Cpt(FuprGalilSetpointSignal, signal_name="setpoint") - motor_resolution = Cpt(FuprGalilMotorResolution, signal_name="resolution", kind="config") - motor_is_moving = Cpt(FuprGalilMotorIsMoving, signal_name="motor_is_moving", kind="normal") + user_setpoint = Cpt(FlomniGalilSetpointSignal, signal_name="setpoint") + motor_resolution = Cpt(FlomniGalilMotorResolution, signal_name="resolution", kind="config") + motor_is_moving = Cpt(FlomniGalilMotorIsMoving, signal_name="motor_is_moving", kind="normal") all_axes_referenced = Cpt( - FuprGalilAxesReferenced, signal_name="all_axes_referenced", kind="config" + FlomniGalilAxesReferenced, signal_name="all_axes_referenced", kind="config" ) high_limit_travel = Cpt(Signal, value=0, kind="omitted") low_limit_travel = Cpt(Signal, value=0, kind="omitted") @@ -148,7 +102,7 @@ class FuprGalilMotor(Device, PositionerBase): ): self.axis_Id = axis_Id self.sign = sign - self.controller = FuprGalilController(socket=socket_cls(host=host, port=port)) + self.controller = FlomniGalilController(socket=socket_cls(host=host, port=port)) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.tolerance = kwargs.pop("tolerance", 0.5) self.device_mapping = kwargs.pop("device_mapping", {}) diff --git a/ophyd_devices/galil/fupr_galil_ophyd.py b/ophyd_devices/galil/fupr_ophyd.py similarity index 74% rename from ophyd_devices/galil/fupr_galil_ophyd.py rename to ophyd_devices/galil/fupr_ophyd.py index 9272d73..23da36e 100644 --- a/ophyd_devices/galil/fupr_galil_ophyd.py +++ b/ophyd_devices/galil/fupr_ophyd.py @@ -29,51 +29,97 @@ from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconne logger = bec_logger.logger -class FlomniGalilController(GalilController): +class FuprGalilController(GalilController): def is_axis_moving(self, axis_Id, axis_Id_numeric) -> bool: if axis_Id is None and axis_Id_numeric is not None: axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric) - active_thread = self.is_thread_active(0) - motor_is_on = self.is_motor_on(axis_Id) - return bool(active_thread or motor_is_on) + is_moving = bool(float(self.socket_put_and_receive(f"MG_BG{axis_Id}")) != 0) + return is_moving def all_axes_referenced(self) -> bool: # TODO: check if all axes are referenced in all controllers return super().all_axes_referenced() + def axis_is_referenced(self) -> bool: + return self.all_axes_referenced() -class FlomniGalilReadbackSignal(GalilReadbackSignal): + def all_axis_referenced(self) -> bool: + return bool(float(self.socket_put_and_receive("MG axisref").strip())) + + def drive_axis_to_limit(self, axis_Id_numeric, direction: str) -> None: + raise NotImplementedError("This function is not implemented for the FuprGalilController.") + + +class FuprGalilReadbackSignal(GalilReadbackSignal): + @retry_once + @threadlocked + def _socket_get(self) -> float: + """Get command for the readback signal + + Returns: + float: Readback value after adjusting for sign and motor resolution. + """ + + current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}")) + current_pos *= self.parent.sign + step_mm = self.parent.motor_resolution.get() + return current_pos / step_mm + + +class FuprGalilSetpointSignal(GalilSetpointSignal): + @retry_once + @threadlocked + def _socket_set(self, val: float) -> None: + """Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign. + Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted. + + Args: + val (float): Target value / setpoint value + + Raises: + GalilError: Raised if not all axes are referenced. + + """ + target_val = val * self.parent.sign + self.setpoint = target_val + axes_referenced = self.controller.all_axes_referenced() + if axes_referenced: + self.controller.socket_put_confirmed( + f"PA{self.parent.axis_Id}={int(self.setpoint*self.parent.MOTOR_RESOLUTION)}" + ) + self.controller.socket_put_confirmed(f"BG{self.parent.axis_Id}") + else: + raise GalilError("Not all axes are referenced.") + + +class FuprGalilMotorResolution(GalilMotorResolution): + @retry_once + @threadlocked + def _socket_get(self): + return self.parent.MOTOR_RESOLUTION + + +class FuprGalilMotorIsMoving(GalilMotorIsMoving): pass -class FlomniGalilSetpointSignal(GalilSetpointSignal): +class FuprGalilAxesReferenced(GalilAxesReferenced): pass -class FlomniGalilMotorResolution(GalilMotorResolution): - pass - - -class FlomniGalilMotorIsMoving(GalilMotorIsMoving): - pass - - -class FlomniGalilAxesReferenced(GalilAxesReferenced): - pass - - -class FlomniGalilMotor(Device, PositionerBase): +class FuprGalilMotor(Device, PositionerBase): USER_ACCESS = ["controller"] + MOTOR_RESOLUTION = 25600 readback = Cpt( GalilReadbackSignal, signal_name="readback", kind="hinted", ) - user_setpoint = Cpt(FlomniGalilSetpointSignal, signal_name="setpoint") - motor_resolution = Cpt(FlomniGalilMotorResolution, signal_name="resolution", kind="config") - motor_is_moving = Cpt(FlomniGalilMotorIsMoving, signal_name="motor_is_moving", kind="normal") + user_setpoint = Cpt(FuprGalilSetpointSignal, signal_name="setpoint") + motor_resolution = Cpt(FuprGalilMotorResolution, signal_name="resolution", kind="config") + motor_is_moving = Cpt(FuprGalilMotorIsMoving, signal_name="motor_is_moving", kind="normal") all_axes_referenced = Cpt( - FlomniGalilAxesReferenced, signal_name="all_axes_referenced", kind="config" + FuprGalilAxesReferenced, signal_name="all_axes_referenced", kind="config" ) high_limit_travel = Cpt(Signal, value=0, kind="omitted") low_limit_travel = Cpt(Signal, value=0, kind="omitted") @@ -102,7 +148,7 @@ class FlomniGalilMotor(Device, PositionerBase): ): self.axis_Id = axis_Id self.sign = sign - self.controller = FlomniGalilController(socket=socket_cls(host=host, port=port)) + self.controller = FuprGalilController(socket=socket_cls(host=host, port=port)) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.tolerance = kwargs.pop("tolerance", 0.5) self.device_mapping = kwargs.pop("device_mapping", {})