refactor: refactoring of mo1_bragg, add move method

This commit is contained in:
2024-07-12 11:56:37 +02:00
parent ad0db78e45
commit 0dd8d7ae83

View File

@@ -1,82 +1,179 @@
from ophyd import Device, PositionerBase, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Kind, Signal, DeviceStatus
from ophyd import Component as Cpt
from ophyd.utils import LimitError
from typing import Literal
""" Module for the Mo1 Bragg positioner of the Debye beamline"""
import enum
import threading
import time
import traceback
from typing import Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import (
Device,
DeviceStatus,
EpicsSignal,
EpicsSignalRO,
EpicsSignalWithRBV,
Kind,
PositionerBase,
Signal,
)
from ophyd.utils import LimitError
logger = bec_logger.logger
class Mo1BraggError(Exception):
"""Mo1Bragg specific exception"""
class MoveType(str, enum.Enum):
ENERGY = 'energy'
ANGLE = 'angle'
"""Enum class to switch between move types energy and angle for the Bragg positioner"""
ENERGY = "energy"
ANGLE = "angle"
class Mo1BraggCrystal:
"""Class to set the crystal parameters of the Bragg positioner"""
offset_si111 = Cpt(
EpicsSignalWithRBV, suffix="offset_si111", kind=Kind.config, auto_monitor=True
)
offset_si311 = Cpt(
EpicsSignalWithRBV, suffix="offset_si311", kind=Kind.config, auto_monitor=True
)
xtal_enum = Cpt(EpicsSignalWithRBV, suffix="xtal_ENUM", kind=Kind.config, auto_monitor=True)
d_spacing_si111 = Cpt(
EpicsSignalWithRBV, suffix="d_spacing_si111", kind=Kind.config, auto_monitor=True
)
d_spacing_si311 = Cpt(
EpicsSignalWithRBV, suffix="d_spacing_si311", kind=Kind.config, auto_monitor=True
)
set_offset = Cpt(EpicsSignal, suffix="set_offset", kind=Kind.config)
current_xtal = Cpt(
EpicsSignalRO, suffix="current_xtal_RBV_ENUM", kind=Kind.normal, auto_monitor=True
)
class MoveTypeSignal(Signal):
""" Custom Signal to set the move type of the Bragg positioner"""
def set(self, value:str | MoveType) -> None:
""" Returns currently active move method
"""Custom Signal to set the move type of the Bragg positioner"""
# pylint: disable=arguments-differ
def set(self, value: str | MoveType) -> None:
"""Returns currently active move method
Args:
value (str | MoveType) : Can be either 'energy' or 'angle'
"""
if value not in ['energy', 'angle']:
raise ValueError(f"Invalid input for MoveTypeSignal {value}, can be either 'energy' or 'angle'")
if value not in [MoveType.ENERGY, MoveType.ANGLE]:
raise ValueError(
f"Invalid input for MoveTypeSignal {value}, can be either 'energy' or 'angle'"
)
self._readback = value
class Mo1Bragg(Device, PositionerBase):
"""Class for the Mo1 Bragg positioner of the Debye beamline"""
USER_ACCESS = ["set_xtal"]
offset_si111 = Cpt(EpicsSignalWithRBV, suffix='offset_si111', kind=Kind.config, auto_monitor=True)
offset_si311 = Cpt(EpicsSignalWithRBV, suffix='offset_si311', kind=Kind.config, auto_monitor=True)
# For the moment commented until the PV suffix in EPICS changes (RBV was on xtal_RBV_ENUM -> xtal_ENUM_RBV)
xtal_enum = Cpt(EpicsSignal, suffix='xtal_ENUM', kind=Kind.config, auto_monitor=True)
d_spacing_si111 = Cpt(EpicsSignalWithRBV, suffix='d_spacing_si111', kind=Kind.config, auto_monitor=True)
d_spacing_si311 = Cpt(EpicsSignalWithRBV, suffix='d_spacing_si311', kind=Kind.config, auto_monitor=True)
set_offset = Cpt(EpicsSignal, suffix ='set_offset', kind=Kind.config)
current_xtal = Cpt(EpicsSignalRO, suffix='current_xtal_RBV_ENUM', kind=Kind.normal, auto_monitor=True)
move_type = Cpt(MoveTypeSignal, value='energy', kind=Kind.normal)
crystal = Cpt(Mo1BraggCrystal, "")
feedback_pos_energy = Cpt(EpicsSignalRO, suffix="feedback_pos_energy_RBV", kind=Kind.hinted, auto_monitor=True)
feedback_pos_angle = Cpt(EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind=Kind.hinted, auto_monitor=True)
low_limit_angle = Cpt(EpicsSignalRO, suffix="lo_lim_pos_RBV", kind=Kind.config, auto_monitor=True)
high_limit_angle = Cpt(EpicsSignalRO, suffix="hi_lim_pos_RBV", kind=Kind.config, auto_monitor=True)
# Introduced new signal to be able to switch between motion in energy or angle
move_type = Cpt(MoveTypeSignal, value="energy", kind=Kind.normal)
move_velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind=Kind.config, auto_monitor=True)
setpoint_abs_angle = Cpt(EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind=Kind.normal, auto_monitor=True)
setpoint_abs_energy = Cpt(EpicsSignalWithRBV, suffix="set_abs_pos_energy", kind=Kind.normal, auto_monitor=True)
setpoint_rel_angle = Cpt(EpicsSignalWithRBV, suffix="set_rel_pos_angle", kind=Kind.normal, auto_monitor=True)
setpoint_rel_energy = Cpt(EpicsSignalWithRBV, suffix="set_rel_pos_energy", kind=Kind.normal, auto_monitor=True)
# Motor PVs
feedback_pos_energy = Cpt(
EpicsSignalRO, suffix="feedback_pos_energy_RBV", kind=Kind.hinted, auto_monitor=True
)
feedback_pos_angle = Cpt(
EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind=Kind.hinted, auto_monitor=True
)
# TODO - Check here if these are the correct PVs -> or without _RBV
low_limit_angle = Cpt(
EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind=Kind.config, auto_monitor=True
)
high_limit_angle = Cpt(
EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind=Kind.config, auto_monitor=True
)
low_limit_energy = Cpt(
EpicsSignalRO, suffix="lo_lim_pos_energy_RBV", kind=Kind.config, auto_monitor=True
)
high_limit_energy = Cpt(
EpicsSignalRO, suffix="hi_lim_pos_energy_RBV", kind=Kind.config, auto_monitor=True
)
move_velocity = Cpt(
EpicsSignalWithRBV, suffix="move_velocity", kind=Kind.config, auto_monitor=True
)
setpoint_abs_angle = Cpt(
EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind=Kind.normal, auto_monitor=True
)
setpoint_abs_energy = Cpt(
EpicsSignalWithRBV, suffix="set_abs_pos_energy", kind=Kind.normal, auto_monitor=True
)
# TODO in theory, not needed as this can be calculated from the absolute positions
setpoint_rel_angle = Cpt(
EpicsSignalWithRBV, suffix="set_rel_pos_angle", kind=Kind.normal, auto_monitor=True
)
setpoint_rel_energy = Cpt(
EpicsSignalWithRBV, suffix="set_rel_pos_energy", kind=Kind.normal, auto_monitor=True
)
move_abs = Cpt(EpicsSignal, suffix="move_abs", kind=Kind.config)
move_abs_done = Cpt(EpicsSignalRO, suffix="move_abs_done_RBV", kind=Kind.normal, auto_monitor=True)
move_abs_done = Cpt(
EpicsSignalRO, suffix="move_abs_done_RBV", kind=Kind.normal, auto_monitor=True
)
SUB_READBACK = "readback"
_default_sub = SUB_READBACK
def __init__(
self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs
):
super().__init__(prefix, name=name, kind=kind, parent=parent, **kwargs)
self._stopped = False
self.device_manager = device_manager
self._move_thread = None
@property
def limits(self) -> tuple:
"""Return limits of the Bragg positioner"""
if self.move_type.get() == 'energy':
raise NotImplementedError("PVs need to still be exposed to the softIOC")
# return ()
if self.move_type.get() == "energy":
return (self.low_limit_energy.get(), self.high_limit_energy.get())
return (self.low_limit_angle.get(), self.high_limit_angle.get())
@property
def low_limit(self) -> float:
""" Return low limit of axis"""
"""Return low limit of axis"""
return self.limits[0]
@property
def high_limit(self) -> float:
""" Return high limit of axis"""
"""Return high limit of axis"""
return self.limits[1]
#pylint: disable:arguments-differ
def check_value(self, value:float) -> None:
""" Check if value is within limits
@property
def egu(self) -> str:
"""Return the engineering units of the positioner"""
if self.move_type.get() == "energy":
return "eV"
return "deg"
@property
def position(self) -> float:
"""Return the current position of Mo1Bragg, considering the move type"""
move_type = self.move_type.get()
move_cpt = (
self.feedback_pos_energy if move_type == MoveType.ENERGY else self.feedback_pos_angle
)
return move_cpt.get()
# pylint: disable=arguments-differ
def check_value(self, value: float) -> None:
"""Check if value is within limits
Args:
value (float) : value to move axis to.
"""
@@ -84,66 +181,113 @@ class Mo1Bragg(Device, PositionerBase):
if low_limit < high_limit and not low_limit <= value <= high_limit:
raise LimitError(f"position={value} not within limits {self.limits}")
# def move(self, value:float, move_type:str='angle', **kwargs) -> DeviceStatus:
# self._stopped = False
# status = super().move(value)
# # Start motion
# def thread_function(status):
# while self.move_abs_done == 0
# self._run_subs(sub_type=self.SUB_READBACK, value = self.feedback_pos_angle.get())
def stop(self, *, success=False) -> None:
"""Stop any motion on the positioner"""
self._stopped = True
if self._move_thread is not None:
self._move_thread.join()
self._move_thread = None
super().stop(success=success)
# if self._stopped ==True:
# status.set_exception()
# return
def _move_and_finish(
self, target_pos: float, move_cpt: Cpt, status: DeviceStatus, update_frequency: float = 0.1
) -> None:
"""Move the simulated device and finish the motion.
# status.set_finished()
# def stop()->None:
# self._stopped=True
# def _move_and_finished(self, start_pos, stop_pos, st):
# success = True
def set_xtal(self,
xtal_enum:Literal['111', '311'],
offset_si111:float=None,
offset_si311:float=None,
d_spacing_si111:float=None,
d_spacing_si311:float=None,
)->None:
""" Method to set the crystal
Args:
target_pos (float) : target position for the motion
status (DeviceStatus) : status object to set the status of the motion
"""
success = True
move_cpt.set(target_pos)
# TODO - discuss with Klaus if we like to handle motion with potential exceptions like this for custom positioners
try:
# Start motion
self.move_abs.set(1)
# TODO - Check if a sleep is needed here with a test at the beamline
# Loop until the motion is done and run the subscriptions
while self.move_abs_done.get() == 0:
# Is this needed since BEC is subscribed to the feedback_pos_angle due to the auto_monitor=True
self._run_subs(sub_type=self.SUB_READBACK, value=self.position)
if self._stopped:
success = False
break
time.sleep(update_frequency)
self._done_moving(success=success)
status.set_finished()
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Error in move thread of device {self.name}: {content}")
status.set_exception(exc)
def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus:
"""Move the Bragg positioner to the specified value.
The motion considers the current move_type, and allows it to be adjusted if specified.
This allows the motion to switch between energy and angle moves.
Args:
value (float) : target value for the motion
move_type (str | MoveType) : Optional, specify the type of move, either 'energy' or 'angle'
Returns:
DeviceStatus : status object to track the motion
"""
self._stopped = False
if move_type is not None:
self.move_type.set(move_type)
move_type = self.move_type.get()
# This checks also if the value is within the limits
status = super().move(value)
# Start motion
move_cpt = (
self.setpoint_abs_energy if move_type == MoveType.ENERGY else self.setpoint_abs_angle
)
self._move_thread = threading.Thread(
target=self._move_and_finish, args=(value, move_cpt, status, 0.1)
)
self._move_thread.start()
return status
def set_xtal(
self,
xtal_enum: Literal["111", "311"],
offset_si111: float = None,
offset_si311: float = None,
d_spacing_si111: float = None,
d_spacing_si311: float = None,
) -> None:
"""Method to set the crystal parameters of the Bragg positioner
Args:
xtal_enum (Literal["111", "311"]) : Enum to set the crystal orientation
offset_si111 (float) : Offset for the 111 crystal
offset_si311 (float) : Offset for the 311 crystal
d_spacing_si111 (float) : d-spacing for the 111 crystal
d_spacing_si311 (float) : d-spacing for the 311 crystal
"""
if offset_si111 is not None:
self.offset_si111.set(offset_si111)
self.crystal.offset_si111.set(offset_si111)
if offset_si311 is not None:
self.offset_si311.set(offset_si311)
self.crystal.offset_si311.set(offset_si311)
if d_spacing_si111 is not None:
self.d_spacing_si111.set(d_spacing_si111)
self.crystal.d_spacing_si111.set(d_spacing_si111)
if d_spacing_si311 is not None:
self.d_spacing_si311.set(d_spacing_si311)
if xtal_enum == '111':
self.crystal.d_spacing_si311.set(d_spacing_si311)
if xtal_enum == "111":
crystal_set = 0
elif xtal_enum == '311':
elif xtal_enum == "311":
crystal_set = 1
else:
raise ValueError(f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'")
self.xtal_enum.set(crystal_set)
self.set_offset.set(1)
raise ValueError(
f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'"
)
self.crystal.xtal_enum.set(crystal_set)
# Send the new settings from the IOC to the motor controller
self.crystal.set_offset.set(1)
if __name__ == "__main__":
#bragg = Mo1Bragg(name='bragg', prefix="X01DA-OP-MO1:BRAGG:")
pass
# bragg = Mo1Bragg(name='bragg', prefix="X01DA-OP-MO1:BRAGG:")
pass