feat: add DDG and prel. sgalil devices

This commit is contained in:
wakonig_k 2023-07-04 15:34:22 +00:00
commit 00c55016e7
5 changed files with 613 additions and 6 deletions

View File

@ -1,6 +1,7 @@
from .eiger1p5m_csaxs.eiger1p5m import Eiger1p5MDetector from .eiger1p5m_csaxs.eiger1p5m import Eiger1p5MDetector
from .epics import * from .epics import *
from .galil.galil_ophyd import GalilMotor from .galil.galil_ophyd import GalilMotor
from .galil.sgalil_ophyd import SGalilMotor
from .npoint.npoint import NPointAxis from .npoint.npoint import NPointAxis
from .rt_lamni import RtLamniMotor from .rt_lamni import RtLamniMotor
from .sim.sim import ( from .sim.sim import (

View File

@ -6,7 +6,7 @@ Created on Tue Nov 9 16:12:47 2021
""" """
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd import PositionerBase from ophyd import PositionerBase, PVPositioner, Signal
from ophyd.pseudopos import ( from ophyd.pseudopos import (
pseudo_position_argument, pseudo_position_argument,
real_position_argument, real_position_argument,
@ -40,9 +40,10 @@ class DelayStatic(Device):
) )
class DummyPositioner(Device, PositionerBase): class DummyPositioner(PVPositioner):
setpoint = Component(EpicsSignal, "DelayAO", kind=Kind.config) setpoint = Component(EpicsSignal, "DelayAO", put_complete=True, kind=Kind.config)
readback = Component(EpicsSignalRO, "DelayAI", kind=Kind.config) readback = Component(EpicsSignalRO, "DelayAI", kind=Kind.config)
done = Component(Signal, value=1)
class DelayPair(PseudoPositioner): class DelayPair(PseudoPositioner):
@ -56,6 +57,8 @@ class DelayPair(PseudoPositioner):
delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay") delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay")
width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth") width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth")
# The real delay axes # The real delay axes
# ch1 = Component(EpicsSignal, "DelayAI", write_pv="DelayAO", name="ch1", put_complete=True, kind=Kind.config)
# ch2 = Component(EpicsSignal, "DelayAI", write_pv="DelayAO", name="ch2", put_complete=True, kind=Kind.config)
ch1 = Component(DummyPositioner, name="ch1") ch1 = Component(DummyPositioner, name="ch1")
ch2 = Component(DummyPositioner, name="ch2") ch2 = Component(DummyPositioner, name="ch2")
@ -150,9 +153,7 @@ class DelayGeneratorDG645(Device):
) )
# Command PVs # Command PVs
arm = Component( arm = Component(EpicsSignal, "TriggerDelayBO", name="arm", kind=Kind.omitted)
EpicsSignal, "TriggerDelayBI", write_pv="TriggerDelayBO", name="arm", kind=Kind.omitted
)
# Burst mode # Burst mode
burstMode = Component( burstMode = Component(

View File

@ -1 +1,2 @@
from .galil_ophyd import GalilMotor, GalilController from .galil_ophyd import GalilMotor, GalilController
from .sgalil_ophyd import SGalilMotor

View File

@ -0,0 +1,22 @@
# Summary on communication commands for SGalilMotor
## sgalil_y - vertical axis (samy)
- Axis 2, C
- in motion: "MG _BG{axis_char}", e.g. "MG _BGC" , 0 or 1
- limit switch not pressed: "MG _LR{axis_char}, _LF{axis_char}" , 0 or 1
- position: "MG _TP{axis_char}/mm" , position in mm
- Axis referenced: "MG allaxref", 0 or 1
- stop all axis: "XQ#STOP,1"
- is motor on: "MG _MO{axis_char}", 0 or 1
- is thread active: "MG _XQ{thread_id}", 0 or 1
**Specific for sgalil_y**
- set_motion_speed: "SP{axis_char}=2*mm", 2mm/s is max speed
- set_final_pos: "PA{axis_char}={val:04f}*mm", target pos in mm
- start motion: "BG{axis_char}", start motion
## sgalil_y - horizontal axis (samx) - due to hardware modifications a bit more complicated
- initiate with Axis 4, E
**Specific for sgalil_x**
- set_final_pos: "targ{axis_char}={val:04f}", e.g. "targE=2.0000"
- start motion: "XQ#POSE,{axis_char}"
- For *in motion* and *limit switch not pressed* commands,
the key changes to AXIS 5 || F, e.g. "MG _BGF"
- For *position* switch to Axis 0 || A, e.g. "MG _TPA/mm"

View File

@ -0,0 +1,582 @@
import functools
import threading
import time
from typing import List
import numpy as np
from bec_lib.core import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable
logger = bec_logger.logger
class GalilCommunicationError(Exception):
pass
class GalilError(Exception):
pass
class BECConfigError(Exception):
pass
def retry_once(fcn):
"""Decorator to rerun a function in case a Galil communication error was raised. This may happen if the buffer was not empty."""
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
try:
val = fcn(self, *args, **kwargs)
except (GalilCommunicationError, GalilError):
val = fcn(self, *args, **kwargs)
return val
return wrapper
class GalilController(Controller):
USER_ACCESS = [
"describe",
"show_running_threads",
"galil_show_all",
"socket_put_and_receive",
"socket_put_confirmed",
]
def __init__(
self,
*,
name="GalilController",
kind=None,
parent=None,
socket=None,
attr_name="",
labels=None,
):
if not hasattr(self, "_initialized") or not self._initialized:
self._galil_axis_per_controller = 8
self._axis = [None for axis_num in range(self._galil_axis_per_controller)]
super().__init__(
name=name,
socket=socket,
attr_name=attr_name,
parent=parent,
labels=labels,
kind=kind,
)
def on(self, controller_num=0) -> None:
"""Open a new socket connection to the controller"""
if not self.connected:
self.sock.open()
self.connected = True
else:
logger.info("The connection has already been established.")
# warnings.warn(f"The connection has already been established.", stacklevel=2)
def off(self) -> None:
"""Close the socket connection to the controller"""
if self.connected:
self.sock.close()
self.connected = False
else:
logger.info("The connection is already closed.")
def set_axis(self, axis: Device, axis_nr: int) -> None:
"""Assign an axis to a device instance.
Args:
axis (Device): Device instance (e.g. GalilMotor)
axis_nr (int): Controller axis number
"""
self._axis[axis_nr] = axis
@threadlocked
def socket_put(self, val: str) -> None:
self.sock.put(f"{val}\r".encode())
@threadlocked
def socket_get(self) -> str:
return self.sock.receive().decode()
@retry_once
@threadlocked
def socket_put_and_receive(self, val: str, remove_trailing_chars=True) -> str:
self.socket_put(val)
if remove_trailing_chars:
return self._remove_trailing_characters(self.sock.receive().decode())
return self.socket_get()
@retry_once
def socket_put_confirmed(self, val: str) -> None:
"""Send message to controller and ensure that it is received by checking that the socket receives a colon.
Args:
val (str): Message that should be sent to the socket
Raises:
GalilCommunicationError: Raised if the return value is not a colon.
"""
return_val = self.socket_put_and_receive(val)
if return_val != ":":
raise GalilCommunicationError(
f"Expected return value of ':' but instead received {return_val}"
)
def is_axis_moving(self, axis_Id, axis_Id_numeric) -> bool:
is_moving = bool(float(self.socket_put_and_receive(f"MG_BG{axis_Id}")) != 0)
# backlash_is_active = bool(float(self.socket_put_and_receive(f"MGbcklact[axis]")) != 0)
return bool(is_moving) # bool(is_moving or backlash_is_active)
def is_thread_active(self, thread_id: int) -> bool:
val = float(self.socket_put_and_receive(f"MG_XQ{thread_id}"))
if val == -1:
return False
return True
def _remove_trailing_characters(self, var) -> str:
if len(var) > 1:
return var.split("\r\n")[0]
return var
def stop_all_axes(self) -> str:
return self.socket_put_and_receive(f"XQ#STOP,1")
def axis_is_referenced(self, axis_Id_numeric) -> bool:
return bool(float(self.socket_put_and_receive(f"MG allaxref").strip()))
def show_running_threads(self) -> None:
t = PrettyTable()
t.title = f"Threads on {self.sock.host}:{self.sock.port}"
t.field_names = [str(ax) for ax in range(self._galil_axis_per_controller)]
t.add_row(
[
"active" if self.is_thread_active(t) else "inactive"
for t in range(self._galil_axis_per_controller)
]
)
print(t)
def is_motor_on(self, axis_Id) -> bool:
return not bool(float(self.socket_put_and_receive(f"MG _MO{axis_Id}").strip()))
def get_motor_limit_switch(self, axis_Id) -> list:
# SGalil specific
if axis_id == 2:
ret = self.socket_put_and_receive(f"MG _LF{axis_Id}, _LR{axis_Id}")
high, low = ret.strip().split(" ")
elif axis_id == 4:
ret = self.socket_put_and_receive(f"MG _LF{'F'}, _LR{'F'}")
high, low = ret.strip().split(" ")
return [not bool(float(low)), not bool(float(high))]
def describe(self) -> None:
t = PrettyTable()
t.title = f"{self.__class__.__name__} on {self.sock.host}:{self.sock.port}"
t.field_names = [
"Axis",
"Name",
"Connected",
"Referenced",
"Motor On",
"Limits",
"Position",
]
for ax in range(self._galil_axis_per_controller):
axis = self._axis[ax]
if axis is not None:
t.add_row(
[
f"{axis.axis_Id_numeric}/{axis.axis_Id}",
axis.name,
axis.connected,
self.axis_is_referenced(axis.axis_Id_numeric),
self.is_motor_on(axis.axis_Id),
self.get_motor_limit_switch(axis.axis_Id),
axis.readback.read().get(axis.name).get("value"),
]
)
else:
t.add_row([None for t in t.field_names])
print(t)
self.show_running_threads()
def galil_show_all(self) -> None:
for controller in self._controller_instances.values():
if isinstance(controller, GalilController):
controller.describe()
# @threadlocked
# def fly_grid_scan(self, start_y:float, end_y:float, y_interval:int, start_x:float, end_x:float, x_interval:int, ctime:float, readtime:float) -> None:
# """_summary_
# Args:
# start_y (float): _description_
# end_y (float): _description_
# y_interval (int): _description_
# start_x (float): _description_
# end_x (float): _description_
# x_interval (int): _description_
# ctime (float): _description_
# readtime (float): _description_
# """
# #toDo Checking limits, checking logic for speed. SGALIL do 101 points when 100 are given
# # Check sign of motors, and offsets!
# speed = np.abs(end_y-start_y)/(y_interval*ctime+ (y_interval-1)*readtime)
# self.socket_put_and_receive(f"a_start={start_y:.04f};a_end={end_y:.04f};speed={speed:.04f}")
# step_grid = (end_x-start_x)/x_interval
# gridmax = (end_x-start_x)/step_grid +1
# self.socket_put_and_receive(f"b_start={start_x:.04f};gridmax={gridmax:.04f};step={step_grid:.04f}")
# self.socket_put_and_receive('XQ#SAMPLE')
# self.socket_put_and_receive('XQ#SCANG')
class GalilSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.controller = self.parent.controller
self.sock = self.parent.controller.sock
class GalilSignalRO(GalilSignalBase):
def __init__(self, signal_name, **kwargs):
super().__init__(signal_name, **kwargs)
self._metadata["write_access"] = False
def _socket_set(self, val):
raise ReadOnlyError("Read-only signals cannot be set")
class GalilReadbackSignal(GalilSignalRO):
@retry_once
@threadlocked
def _socket_get(self) -> float:
"""Get command for the readback signal
Returns:
float: Readback value after adjusting for sign and motor resolution.
"""
if self.parent.axis_Id_numeric == 2:
current_pos = float(
self.controller.socket_put_and_receive(f"MG _TP{self.parent.axis_Id}/mm")
)
elif self.parent.axis_Id_numeric == 4:
# hardware controller readback from axis 4 is on axis 0, A instead of E
current_pos = float(self.controller.socket_put_and_receive(f"MG _TP{'A'}/mm"))
current_pos *= self.parent.sign
return current_pos
def read(self):
self._metadata["timestamp"] = time.time()
val = super().read()
return val
class GalilSetpointSignal(GalilSignalBase):
setpoint = 0
def _socket_get(self) -> float:
"""Get command for receiving the setpoint / target value.
The value is not pulled from the controller but instead just the last setpoint used.
Returns:
float: setpoint / target value
"""
return self.setpoint
@retry_once
@threadlocked
def _socket_set(self, val: float) -> None:
"""Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign.
Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted.
Args:
val (float): Target value / setpoint value
Raises:
GalilError: Raised if not all axes are referenced.
"""
target_val = val * self.parent.sign
self.setpoint = target_val
axes_referenced = float(self.controller.socket_put_and_receive("MG allaxref"))
if axes_referenced:
while self.controller.is_thread_active(0):
time.sleep(0.1)
if self.parent.axis_Id_numeric == 2:
self.controller.socket_put_confirmed(f"PA{self.parent.axis_Id}={target_val:.4f}*mm")
self.controller.socket_put_and_receive(f"BG{self.parent.axis_Id}")
elif self.parent.axis_Id_numeric == 4:
self.controller.socket_put_confirmed(f"targ{self.parent.axis_Id}={target_val:.4f}")
self.controller.socket_put_and_receive(f"XQ#POSE,{self.parent.axis_Id_numeric}")
while self.controller.is_thread_active(0):
time.sleep(0.005)
else:
raise GalilError("Not all axes are referenced.")
class GalilMotorIsMoving(GalilSignalRO):
@threadlocked
def _socket_get(self):
if self.parent.axis_Id_numeric == 2:
ret = self.controller.is_axis_moving(self.parent.axis_Id, self.parent.axis_Id_numeric)
return ret
if self.parent.axis_Id_numeric == 4:
# Motion signal from axis 4 is mapped to axis 5
ret = self.controller.is_axis_moving("F", 5)
return ret or self.controller.is_thread_active(4)
def get(self):
val = super().get()
if val is not None:
self._run_subs(
sub_type=self.SUB_VALUE,
value=val,
timestamp=time.time(),
)
return val
class GalilAxesReferenced(GalilSignalRO):
@threadlocked
def _socket_get(self):
return self.controller.socket_put_and_receive("MG allaxref")
class SGalilMotor(Device, PositionerBase):
""" "SGalil Motors at cSAXS have a
DC motor (y axis - vertical) - implemented as C
and a step motor (x-axis horizontal) - implemented as E
that require different communication for control
"""
USER_ACCESS = ["controller"]
readback = Cpt(
GalilReadbackSignal,
signal_name="readback",
kind="hinted",
)
user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint")
motor_is_moving = Cpt(GalilMotorIsMoving, signal_name="motor_is_moving", kind="normal")
all_axes_referenced = Cpt(GalilAxesReferenced, signal_name="all_axes_referenced", kind="config")
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
SUB_READBACK = "readback"
SUB_CONNECTION_CHANGE = "connection_change"
_default_sub = SUB_READBACK
def __init__(
self,
axis_Id,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="129.129.122.26",
port=23,
limits=None,
sign=1,
socket_cls=SocketIO,
device_manager=None,
**kwargs,
):
self.axis_Id = axis_Id
self.sign = sign
self.controller = GalilController(socket=socket_cls(host=host, port=port))
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_mapping = kwargs.pop("device_mapping", {})
self.device_manager = device_manager
if len(self.device_mapping) > 0 and self.device_manager is None:
raise BECConfigError(
"device_mapping has been specified but the device_manager cannot be accessed."
)
self.rt = self.device_mapping.get("rt")
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.readback.name = self.name
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
self._update_connection_state()
# self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE)
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@property
def low_limit(self):
return self.limits[0]
@property
def high_limit(self):
return self.limits[1]
def check_value(self, pos):
"""Check that the position is within the soft limits"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not (low_limit <= pos <= high_limit):
raise LimitError(f"position={pos} not within limits {self.limits}")
def _update_connection_state(self, **kwargs):
for walk in self.walk_signals():
walk.item._metadata["connected"] = self.controller.connected
def _forward_readback(self, **kwargs):
kwargs.pop("sub_type")
self._run_subs(sub_type="readback", **kwargs)
@raise_if_disconnected
def move(self, position, wait=True, **kwargs):
"""Move to a specified position, optionally waiting for motion to
complete.
Parameters
----------
position
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.
Returns
-------
status : MoveStatus
Raises
------
TimeoutError
When motion takes longer than `timeout`
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out
"""
self._started_moving = False
timeout = kwargs.pop("timeout", 100)
status = super().move(position, timeout=timeout, **kwargs)
self.user_setpoint.put(position, wait=False)
def move_and_finish():
while self.motor_is_moving.get():
logger.info("motor is moving")
val = self.readback.read()
self._run_subs(
sub_type=self.SUB_READBACK,
value=val,
timestamp=time.time(),
)
time.sleep(0.1)
val = self.readback.read()
success = np.isclose(
val[self.name]["value"],
position,
atol=self.tolerance,
)
if not success:
print(" stop")
self._done_moving(success=success)
logger.info("Move finished")
threading.Thread(target=move_and_finish, daemon=True).start()
try:
if wait:
status_wait(status)
except KeyboardInterrupt:
self.stop()
raise
return status
@property
def axis_Id(self):
return self._axis_Id_alpha
@axis_Id.setter
def axis_Id(self, val):
if isinstance(val, str):
if len(val) != 1:
raise ValueError(f"Only single-character axis_Ids are supported.")
if val not in ["C", "E"]:
raise ValueError(
f"axis_id {val} is currently not supported, please use either 'C' or 'E'."
)
self._axis_Id_alpha = val
self._axis_Id_numeric = ord(val.lower()) - 97
else:
raise TypeError(f"Expected value of type str but received {type(val)}")
@property
def axis_Id_numeric(self):
return self._axis_Id_numeric
@axis_Id_numeric.setter
def axis_Id_numeric(self, val):
if isinstance(val, int):
if val not in [2, 4]:
raise ValueError(f"Numeric value {val} is not supported, it must be either 2 or 4.")
self._axis_Id_alpha = val
self._axis_Id_numeric = (chr(val + 97)).capitalize()
else:
raise TypeError(f"Expected value of type int but received {type(val)}.")
@property
def egu(self):
"""The engineering units (EGU) for positions"""
return "mm"
def stop(self, *, success=False):
self.controller.stop_all_axes()
return super().stop(success=success)
if __name__ == "__main__":
mock = False
if not mock:
samy = SGalilMotor("C", name="samy", host="129.129.122.26", port=23, sign=-1)
samx = SGalilMotor("E", name="samx", host="129.129.122.26", port=23, sign=-1)
else:
from ophyd_devices.utils.socket import SocketMock
samx = SGalilMotor("E", name="samx", host="129.129.122.26", port=23, socket_cls=SocketMock)
samy = SGalilMotor("C", name="samy", host="129.129.122.26", port=23, socket_cls=SocketMock)
samx.controller.galil_show_all()