fix: fixed import; fixed file name

This commit is contained in:
wakonig_k 2023-11-07 16:22:09 +01:00
parent 9840491ab7
commit 2ddc074e4f
3 changed files with 97 additions and 96 deletions

View File

@ -1,3 +1,4 @@
from .galil_ophyd import GalilMotor, GalilController from .fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
from .fgalil_ophyd import FlomniGalilController from .fupr_ophyd import FuprGalilController, FuprGalilMotor
from .galil_ophyd import GalilController, GalilMotor
from .sgalil_ophyd import SGalilMotor from .sgalil_ophyd import SGalilMotor

View File

@ -29,97 +29,51 @@ from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconne
logger = bec_logger.logger logger = bec_logger.logger
class FuprGalilController(GalilController): class FlomniGalilController(GalilController):
def is_axis_moving(self, axis_Id, axis_Id_numeric) -> bool: def is_axis_moving(self, axis_Id, axis_Id_numeric) -> bool:
if axis_Id is None and axis_Id_numeric is not None: if axis_Id is None and axis_Id_numeric is not None:
axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric) axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric)
is_moving = bool(float(self.socket_put_and_receive(f"MG_BG{axis_Id}")) != 0) active_thread = self.is_thread_active(0)
return is_moving motor_is_on = self.is_motor_on(axis_Id)
return bool(active_thread or motor_is_on)
def all_axes_referenced(self) -> bool: def all_axes_referenced(self) -> bool:
# TODO: check if all axes are referenced in all controllers # TODO: check if all axes are referenced in all controllers
return super().all_axes_referenced() return super().all_axes_referenced()
def axis_is_referenced(self) -> bool:
return self.all_axes_referenced()
def all_axis_referenced(self) -> bool: class FlomniGalilReadbackSignal(GalilReadbackSignal):
return bool(float(self.socket_put_and_receive("MG axisref").strip()))
def drive_axis_to_limit(self, axis_Id_numeric, direction: str) -> None:
raise NotImplementedError("This function is not implemented for the FuprGalilController.")
class FuprGalilReadbackSignal(GalilReadbackSignal):
@retry_once
@threadlocked
def _socket_get(self) -> float:
"""Get command for the readback signal
Returns:
float: Readback value after adjusting for sign and motor resolution.
"""
current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}"))
current_pos *= self.parent.sign
step_mm = self.parent.motor_resolution.get()
return current_pos / step_mm
class FuprGalilSetpointSignal(GalilSetpointSignal):
@retry_once
@threadlocked
def _socket_set(self, val: float) -> None:
"""Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign.
Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted.
Args:
val (float): Target value / setpoint value
Raises:
GalilError: Raised if not all axes are referenced.
"""
target_val = val * self.parent.sign
self.setpoint = target_val
axes_referenced = self.controller.all_axes_referenced()
if axes_referenced:
self.controller.socket_put_confirmed(
f"PA{self.parent.axis_Id}={int(self.setpoint*self.parent.MOTOR_RESOLUTION)}"
)
self.controller.socket_put_confirmed(f"BG{self.parent.axis_Id}")
else:
raise GalilError("Not all axes are referenced.")
class FuprGalilMotorResolution(GalilMotorResolution):
@retry_once
@threadlocked
def _socket_get(self):
return self.parent.MOTOR_RESOLUTION
class FuprGalilMotorIsMoving(GalilMotorIsMoving):
pass pass
class FuprGalilAxesReferenced(GalilAxesReferenced): class FlomniGalilSetpointSignal(GalilSetpointSignal):
pass pass
class FuprGalilMotor(Device, PositionerBase): class FlomniGalilMotorResolution(GalilMotorResolution):
pass
class FlomniGalilMotorIsMoving(GalilMotorIsMoving):
pass
class FlomniGalilAxesReferenced(GalilAxesReferenced):
pass
class FlomniGalilMotor(Device, PositionerBase):
USER_ACCESS = ["controller"] USER_ACCESS = ["controller"]
MOTOR_RESOLUTION = 25600
readback = Cpt( readback = Cpt(
GalilReadbackSignal, GalilReadbackSignal,
signal_name="readback", signal_name="readback",
kind="hinted", kind="hinted",
) )
user_setpoint = Cpt(FuprGalilSetpointSignal, signal_name="setpoint") user_setpoint = Cpt(FlomniGalilSetpointSignal, signal_name="setpoint")
motor_resolution = Cpt(FuprGalilMotorResolution, signal_name="resolution", kind="config") motor_resolution = Cpt(FlomniGalilMotorResolution, signal_name="resolution", kind="config")
motor_is_moving = Cpt(FuprGalilMotorIsMoving, signal_name="motor_is_moving", kind="normal") motor_is_moving = Cpt(FlomniGalilMotorIsMoving, signal_name="motor_is_moving", kind="normal")
all_axes_referenced = Cpt( all_axes_referenced = Cpt(
FuprGalilAxesReferenced, signal_name="all_axes_referenced", kind="config" FlomniGalilAxesReferenced, signal_name="all_axes_referenced", kind="config"
) )
high_limit_travel = Cpt(Signal, value=0, kind="omitted") high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted") low_limit_travel = Cpt(Signal, value=0, kind="omitted")
@ -148,7 +102,7 @@ class FuprGalilMotor(Device, PositionerBase):
): ):
self.axis_Id = axis_Id self.axis_Id = axis_Id
self.sign = sign self.sign = sign
self.controller = FuprGalilController(socket=socket_cls(host=host, port=port)) self.controller = FlomniGalilController(socket=socket_cls(host=host, port=port))
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.tolerance = kwargs.pop("tolerance", 0.5) self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_mapping = kwargs.pop("device_mapping", {}) self.device_mapping = kwargs.pop("device_mapping", {})

View File

@ -29,51 +29,97 @@ from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconne
logger = bec_logger.logger logger = bec_logger.logger
class FlomniGalilController(GalilController): class FuprGalilController(GalilController):
def is_axis_moving(self, axis_Id, axis_Id_numeric) -> bool: def is_axis_moving(self, axis_Id, axis_Id_numeric) -> bool:
if axis_Id is None and axis_Id_numeric is not None: if axis_Id is None and axis_Id_numeric is not None:
axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric) axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric)
active_thread = self.is_thread_active(0) is_moving = bool(float(self.socket_put_and_receive(f"MG_BG{axis_Id}")) != 0)
motor_is_on = self.is_motor_on(axis_Id) return is_moving
return bool(active_thread or motor_is_on)
def all_axes_referenced(self) -> bool: def all_axes_referenced(self) -> bool:
# TODO: check if all axes are referenced in all controllers # TODO: check if all axes are referenced in all controllers
return super().all_axes_referenced() return super().all_axes_referenced()
def axis_is_referenced(self) -> bool:
return self.all_axes_referenced()
class FlomniGalilReadbackSignal(GalilReadbackSignal): def all_axis_referenced(self) -> bool:
return bool(float(self.socket_put_and_receive("MG axisref").strip()))
def drive_axis_to_limit(self, axis_Id_numeric, direction: str) -> None:
raise NotImplementedError("This function is not implemented for the FuprGalilController.")
class FuprGalilReadbackSignal(GalilReadbackSignal):
@retry_once
@threadlocked
def _socket_get(self) -> float:
"""Get command for the readback signal
Returns:
float: Readback value after adjusting for sign and motor resolution.
"""
current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}"))
current_pos *= self.parent.sign
step_mm = self.parent.motor_resolution.get()
return current_pos / step_mm
class FuprGalilSetpointSignal(GalilSetpointSignal):
@retry_once
@threadlocked
def _socket_set(self, val: float) -> None:
"""Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign.
Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted.
Args:
val (float): Target value / setpoint value
Raises:
GalilError: Raised if not all axes are referenced.
"""
target_val = val * self.parent.sign
self.setpoint = target_val
axes_referenced = self.controller.all_axes_referenced()
if axes_referenced:
self.controller.socket_put_confirmed(
f"PA{self.parent.axis_Id}={int(self.setpoint*self.parent.MOTOR_RESOLUTION)}"
)
self.controller.socket_put_confirmed(f"BG{self.parent.axis_Id}")
else:
raise GalilError("Not all axes are referenced.")
class FuprGalilMotorResolution(GalilMotorResolution):
@retry_once
@threadlocked
def _socket_get(self):
return self.parent.MOTOR_RESOLUTION
class FuprGalilMotorIsMoving(GalilMotorIsMoving):
pass pass
class FlomniGalilSetpointSignal(GalilSetpointSignal): class FuprGalilAxesReferenced(GalilAxesReferenced):
pass pass
class FlomniGalilMotorResolution(GalilMotorResolution): class FuprGalilMotor(Device, PositionerBase):
pass
class FlomniGalilMotorIsMoving(GalilMotorIsMoving):
pass
class FlomniGalilAxesReferenced(GalilAxesReferenced):
pass
class FlomniGalilMotor(Device, PositionerBase):
USER_ACCESS = ["controller"] USER_ACCESS = ["controller"]
MOTOR_RESOLUTION = 25600
readback = Cpt( readback = Cpt(
GalilReadbackSignal, GalilReadbackSignal,
signal_name="readback", signal_name="readback",
kind="hinted", kind="hinted",
) )
user_setpoint = Cpt(FlomniGalilSetpointSignal, signal_name="setpoint") user_setpoint = Cpt(FuprGalilSetpointSignal, signal_name="setpoint")
motor_resolution = Cpt(FlomniGalilMotorResolution, signal_name="resolution", kind="config") motor_resolution = Cpt(FuprGalilMotorResolution, signal_name="resolution", kind="config")
motor_is_moving = Cpt(FlomniGalilMotorIsMoving, signal_name="motor_is_moving", kind="normal") motor_is_moving = Cpt(FuprGalilMotorIsMoving, signal_name="motor_is_moving", kind="normal")
all_axes_referenced = Cpt( all_axes_referenced = Cpt(
FlomniGalilAxesReferenced, signal_name="all_axes_referenced", kind="config" FuprGalilAxesReferenced, signal_name="all_axes_referenced", kind="config"
) )
high_limit_travel = Cpt(Signal, value=0, kind="omitted") high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted") low_limit_travel = Cpt(Signal, value=0, kind="omitted")
@ -102,7 +148,7 @@ class FlomniGalilMotor(Device, PositionerBase):
): ):
self.axis_Id = axis_Id self.axis_Id = axis_Id
self.sign = sign self.sign = sign
self.controller = FlomniGalilController(socket=socket_cls(host=host, port=port)) self.controller = FuprGalilController(socket=socket_cls(host=host, port=port))
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.tolerance = kwargs.pop("tolerance", 0.5) self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_mapping = kwargs.pop("device_mapping", {}) self.device_mapping = kwargs.pop("device_mapping", {})