feat: added npoint support to ophyd

This commit is contained in:
gac-x12sa
2024-10-02 11:50:39 +02:00
parent 0d2b4c4423
commit 15bdbe2e03
2 changed files with 253 additions and 106 deletions

View File

@@ -0,0 +1,34 @@
############################################################
#################### npoint motors #########################
############################################################
npx:
description: nPoint x axis
deviceClass: csaxs_bec.devices.npoint.npoint.NPointAxis
deviceConfig:
axis_Id: A
host: "nPoint000003.psi.ch"
limits:
- -50
- 50
port: 23
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
npy:
description: nPoint x axis
deviceClass: csaxs_bec.devices.npoint.npoint.NPointAxis
deviceConfig:
axis_Id: B
host: "nPoint000003.psi.ch"
limits:
- -50
- 50
port: 23
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline

View File

@@ -5,6 +5,13 @@ from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import raise_if_disconnected
from prettytable import PrettyTable
from typeguard import typechecked
from ophyd.utils import LimitError, ReadOnlyError
from ophyd import Device, PositionerBase, Signal, SignalRO
from ophyd_devices.utils.socket import SocketIO, SocketSignal
from ophyd import Component as Cpt
import numpy as np
import threading
from ophyd.status import wait as status_wait
def channel_checked(fcn):
@@ -17,6 +24,8 @@ def channel_checked(fcn):
return wrapper
class NpointError(Exception):
pass
class NPointController(Controller):
@@ -290,132 +299,236 @@ class NPointController(Controller):
self.off()
class NPointAxis:
def __init__(self, controller: NPointController, channel: int, name: str) -> None:
super().__init__()
self._axis_range = 100
self.controller = controller
self.channel = channel
self.name = name
self.controller._check_channel(channel)
self._settling_time = 0.1
if self.settling_time == 0:
self.settling_time = 0.1
print(f"Setting the npoint settling time to {self.settling_time:.2f} s.")
print(
"You can set the settling time depending on the stage tuning\nusing the settling_time property."
)
print("This is the waiting time before the counting is done.")
class NpointSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.controller:NPointController = self.parent.controller
self.sock = self.parent.controller.sock
def show_all(self) -> None:
self.controller.show_all()
@raise_if_disconnected
def get(self) -> float:
"""Get current position for this channel.
class NpointSignalRO(NpointSignalBase):
def __init__(self, signal_name, **kwargs):
super().__init__(signal_name, **kwargs)
self._metadata["write_access"] = False
Raises:
RuntimeError: Raised if channel is not connected.
@threadlocked
def _socket_set(self, val):
raise ReadOnlyError("Read-only signals cannot be set")
Returns:
float: position
"""
return self.controller._get_current_pos(self.channel)
@raise_if_disconnected
def get_target_pos(self) -> float:
"""Get target position for this channel.
class NpointReadbackSignal(NpointSignalRO):
@threadlocked
def _socket_get(self):
return self.controller._get_current_pos(self.parent.axis_Id_numeric) * self.parent.sign
Raises:
RuntimeError: Raised if channel is not connected.
Returns:
float: position
"""
return self.controller._get_target_pos(self.channel)
class NpointSetpointSignal(NpointSignalBase):
setpoint = 0
@raise_if_disconnected
@typechecked
def set(self, pos: float) -> None:
"""Set a new target position and wait until settled (settling_time).
@threadlocked
def _socket_get(self):
return self.controller._get_target_pos(self.parent.axis_Id_numeric) * self.parent.sign
Args:
pos (float): New target position
@threadlocked
def _socket_set(self, val):
target_val = val * self.parent.sign
self.setpoint = target_val
return self.controller._set_target_pos(self.parent.axis_Id_numeric, target_val * self.parent.sign)
Raises:
RuntimeError: Raised if channel is not connected.
Returns:
None
"""
self.controller._set_target_pos(self.channel, pos)
time.sleep(self.settling_time)
class NpointMotorIsMoving(SignalRO):
def set_motor_is_moving(self, value:int) -> None:
self._readback = value
class NPointAxis(Device, PositionerBase):
USER_ACCESS = ["controller"]
readback = Cpt(NpointReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(NpointSetpointSignal, signal_name="setpoint")
motor_is_moving = Cpt(NpointMotorIsMoving, value=0, kind="normal")
settling_time = Cpt(Signal, value=0.1, kind="config")
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
SUB_READBACK = "readback"
SUB_CONNECTION_CHANGE = "connection_change"
_default_sub = SUB_READBACK
def __init__(
self,
axis_Id,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2680.psi.ch",
port=8085,
limits=None,
sign=1,
socket_cls=SocketIO,
tolerance:float=0.05,
**kwargs,
):
self.controller = NPointController(
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.axis_Id = axis_Id
self.sign = sign
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.tolerance = tolerance
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.readback.name = self.name
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
self._update_connection_state()
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
@property
def connected(self) -> bool:
return self.controller.connected
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@property
@raise_if_disconnected
def servo(self) -> int:
"""Get servo status
Raises:
RuntimeError: Raised if channel is not connected.
Returns:
int: Servo status
"""
return self.controller._get_servo(self.channel)
@servo.setter
@raise_if_disconnected
@typechecked
def servo(self, val: bool) -> None:
"""Set servo status
Args:
val (bool): Servo status
Raises:
RuntimeError: Raised if channel is not connected.
Returns:
None
"""
self.controller._set_servo(self.channel, val)
def low_limit(self):
return self.limits[0]
@property
def settling_time(self) -> float:
return self._settling_time
def high_limit(self):
return self.limits[1]
@settling_time.setter
@typechecked
def settling_time(self, val: float) -> None:
self._settling_time = val
print(f"Setting the npoint settling time to {val:.2f} s.")
def check_value(self, pos):
"""Check that the position is within the soft limits"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not (low_limit <= pos <= high_limit):
raise LimitError(f"position={pos} not within limits {self.limits}")
def _update_connection_state(self, **kwargs):
for walk in self.walk_signals():
walk.item._metadata["connected"] = self.controller.connected
@raise_if_disconnected
def move(self, position, wait=True, **kwargs):
"""Move to a specified position, optionally waiting for motion to
complete.
Parameters
----------
position
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.
Returns
-------
status : MoveStatus
Raises
------
TimeoutError
When motion takes longer than `timeout`
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out
"""
self._started_moving = False
timeout = kwargs.pop("timeout", 10)
status = super().move(position, timeout=timeout, **kwargs)
self.user_setpoint.put(position, wait=False)
def move_and_finish():
self.motor_is_moving.set_motor_is_moving(1)
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
time.sleep(self.settling_time.get())
self.motor_is_moving.set_motor_is_moving(0)
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
success = np.isclose(val[self.name]["value"], position, atol=self.tolerance)
self._done_moving(success=success)
threading.Thread(target=move_and_finish, daemon=True).start()
try:
if wait:
status_wait(status)
except KeyboardInterrupt:
self.stop()
raise
return status
@property
def axis_Id(self):
return self._axis_Id_alpha
@axis_Id.setter
def axis_Id(self, val):
if isinstance(val, str):
if len(val) != 1:
raise ValueError(f"Only single-character axis_Ids are supported.")
self._axis_Id_alpha = val
self._axis_Id_numeric = ord(val.lower()) - 97
else:
raise TypeError(f"Expected value of type str but received {type(val)}")
@property
def axis_Id_numeric(self):
return self._axis_Id_numeric
@axis_Id_numeric.setter
def axis_Id_numeric(self, val):
if isinstance(val, int):
if val > 26:
raise ValueError(f"Numeric value exceeds supported range.")
self._axis_Id_alpha = val
self._axis_Id_numeric = (chr(val + 97)).capitalize()
else:
raise TypeError(f"Expected value of type int but received {type(val)}")
@property
def egu(self):
"""The engineering units (EGU) for positions"""
return "um"
def stage(self) -> list[object]:
return super().stage()
def unstage(self) -> list[object]:
return super().unstage()
if __name__ == "__main__":
## EXAMPLES ##
#
# Create controller and socket instance explicitly:
# controller = NPointController(SocketIO())
# npointx = NPointAxis(controller, 0, "nx")
# npointy = NPointAxis(controller, 1, "ny")
npx = NPointAxis(axis_Id="A", name="npx", host="nPoint000003.psi.ch", port=23)
npy = NPointAxis(axis_Id="B", name="npy", host="nPoint000003.psi.ch", port=23)
npx.controller.on()
print("socket is open, axis is ready!")
npx.move(10)
print(npx.read())
npx.controller.off()
# Create controller instance explicitly
# controller = NPointController.create()
# npointx = NPointAxis(controller, 0, "nx")
# npointy = NPointAxis(controller, 1, "ny")
# Single-line axis:
# npointx = NPointAxis(NPointController.create(), 0, "nx")
#
# EPICS wrapper:
# nx = NPointEpics(NPointController.create(), 0, "nx")
controller = NPointController.create()
npointx = NPointAxis(NPointController.create(), 0, "nx")
npointy = NPointAxis(NPointController.create(), 0, "ny")