diff --git a/debye_bec/devices/mo1_bragg.py b/debye_bec/devices/mo1_bragg.py index a807678..0b820ea 100644 --- a/debye_bec/devices/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg.py @@ -1,82 +1,179 @@ -from ophyd import Device, PositionerBase, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Kind, Signal, DeviceStatus -from ophyd import Component as Cpt -from ophyd.utils import LimitError -from typing import Literal +""" Module for the Mo1 Bragg positioner of the Debye beamline""" + import enum +import threading +import time +import traceback +from typing import Literal + +from bec_lib.logger import bec_logger +from ophyd import Component as Cpt +from ophyd import ( + Device, + DeviceStatus, + EpicsSignal, + EpicsSignalRO, + EpicsSignalWithRBV, + Kind, + PositionerBase, + Signal, +) +from ophyd.utils import LimitError + +logger = bec_logger.logger + class Mo1BraggError(Exception): """Mo1Bragg specific exception""" + class MoveType(str, enum.Enum): - ENERGY = 'energy' - ANGLE = 'angle' + """Enum class to switch between move types energy and angle for the Bragg positioner""" + + ENERGY = "energy" + ANGLE = "angle" + + +class Mo1BraggCrystal: + """Class to set the crystal parameters of the Bragg positioner""" + + offset_si111 = Cpt( + EpicsSignalWithRBV, suffix="offset_si111", kind=Kind.config, auto_monitor=True + ) + offset_si311 = Cpt( + EpicsSignalWithRBV, suffix="offset_si311", kind=Kind.config, auto_monitor=True + ) + xtal_enum = Cpt(EpicsSignalWithRBV, suffix="xtal_ENUM", kind=Kind.config, auto_monitor=True) + d_spacing_si111 = Cpt( + EpicsSignalWithRBV, suffix="d_spacing_si111", kind=Kind.config, auto_monitor=True + ) + d_spacing_si311 = Cpt( + EpicsSignalWithRBV, suffix="d_spacing_si311", kind=Kind.config, auto_monitor=True + ) + set_offset = Cpt(EpicsSignal, suffix="set_offset", kind=Kind.config) + current_xtal = Cpt( + EpicsSignalRO, suffix="current_xtal_RBV_ENUM", kind=Kind.normal, auto_monitor=True + ) + class MoveTypeSignal(Signal): - """ Custom Signal to set the move type of the Bragg positioner""" - - def set(self, value:str | MoveType) -> None: - """ Returns currently active move method - + """Custom Signal to set the move type of the Bragg positioner""" + + # pylint: disable=arguments-differ + def set(self, value: str | MoveType) -> None: + """Returns currently active move method + Args: value (str | MoveType) : Can be either 'energy' or 'angle' """ - if value not in ['energy', 'angle']: - raise ValueError(f"Invalid input for MoveTypeSignal {value}, can be either 'energy' or 'angle'") + if value not in [MoveType.ENERGY, MoveType.ANGLE]: + raise ValueError( + f"Invalid input for MoveTypeSignal {value}, can be either 'energy' or 'angle'" + ) self._readback = value + class Mo1Bragg(Device, PositionerBase): + """Class for the Mo1 Bragg positioner of the Debye beamline""" USER_ACCESS = ["set_xtal"] - offset_si111 = Cpt(EpicsSignalWithRBV, suffix='offset_si111', kind=Kind.config, auto_monitor=True) - offset_si311 = Cpt(EpicsSignalWithRBV, suffix='offset_si311', kind=Kind.config, auto_monitor=True) - # For the moment commented until the PV suffix in EPICS changes (RBV was on xtal_RBV_ENUM -> xtal_ENUM_RBV) - xtal_enum = Cpt(EpicsSignal, suffix='xtal_ENUM', kind=Kind.config, auto_monitor=True) - d_spacing_si111 = Cpt(EpicsSignalWithRBV, suffix='d_spacing_si111', kind=Kind.config, auto_monitor=True) - d_spacing_si311 = Cpt(EpicsSignalWithRBV, suffix='d_spacing_si311', kind=Kind.config, auto_monitor=True) - set_offset = Cpt(EpicsSignal, suffix ='set_offset', kind=Kind.config) - current_xtal = Cpt(EpicsSignalRO, suffix='current_xtal_RBV_ENUM', kind=Kind.normal, auto_monitor=True) - move_type = Cpt(MoveTypeSignal, value='energy', kind=Kind.normal) + crystal = Cpt(Mo1BraggCrystal, "") - feedback_pos_energy = Cpt(EpicsSignalRO, suffix="feedback_pos_energy_RBV", kind=Kind.hinted, auto_monitor=True) - feedback_pos_angle = Cpt(EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind=Kind.hinted, auto_monitor=True) - low_limit_angle = Cpt(EpicsSignalRO, suffix="lo_lim_pos_RBV", kind=Kind.config, auto_monitor=True) - high_limit_angle = Cpt(EpicsSignalRO, suffix="hi_lim_pos_RBV", kind=Kind.config, auto_monitor=True) + # Introduced new signal to be able to switch between motion in energy or angle + move_type = Cpt(MoveTypeSignal, value="energy", kind=Kind.normal) - move_velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind=Kind.config, auto_monitor=True) - setpoint_abs_angle = Cpt(EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind=Kind.normal, auto_monitor=True) - setpoint_abs_energy = Cpt(EpicsSignalWithRBV, suffix="set_abs_pos_energy", kind=Kind.normal, auto_monitor=True) - setpoint_rel_angle = Cpt(EpicsSignalWithRBV, suffix="set_rel_pos_angle", kind=Kind.normal, auto_monitor=True) - setpoint_rel_energy = Cpt(EpicsSignalWithRBV, suffix="set_rel_pos_energy", kind=Kind.normal, auto_monitor=True) + # Motor PVs + feedback_pos_energy = Cpt( + EpicsSignalRO, suffix="feedback_pos_energy_RBV", kind=Kind.hinted, auto_monitor=True + ) + feedback_pos_angle = Cpt( + EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind=Kind.hinted, auto_monitor=True + ) + # TODO - Check here if these are the correct PVs -> or without _RBV + low_limit_angle = Cpt( + EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind=Kind.config, auto_monitor=True + ) + high_limit_angle = Cpt( + EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind=Kind.config, auto_monitor=True + ) + low_limit_energy = Cpt( + EpicsSignalRO, suffix="lo_lim_pos_energy_RBV", kind=Kind.config, auto_monitor=True + ) + high_limit_energy = Cpt( + EpicsSignalRO, suffix="hi_lim_pos_energy_RBV", kind=Kind.config, auto_monitor=True + ) + + move_velocity = Cpt( + EpicsSignalWithRBV, suffix="move_velocity", kind=Kind.config, auto_monitor=True + ) + setpoint_abs_angle = Cpt( + EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind=Kind.normal, auto_monitor=True + ) + setpoint_abs_energy = Cpt( + EpicsSignalWithRBV, suffix="set_abs_pos_energy", kind=Kind.normal, auto_monitor=True + ) + # TODO in theory, not needed as this can be calculated from the absolute positions + setpoint_rel_angle = Cpt( + EpicsSignalWithRBV, suffix="set_rel_pos_angle", kind=Kind.normal, auto_monitor=True + ) + setpoint_rel_energy = Cpt( + EpicsSignalWithRBV, suffix="set_rel_pos_energy", kind=Kind.normal, auto_monitor=True + ) move_abs = Cpt(EpicsSignal, suffix="move_abs", kind=Kind.config) - move_abs_done = Cpt(EpicsSignalRO, suffix="move_abs_done_RBV", kind=Kind.normal, auto_monitor=True) + move_abs_done = Cpt( + EpicsSignalRO, suffix="move_abs_done_RBV", kind=Kind.normal, auto_monitor=True + ) SUB_READBACK = "readback" _default_sub = SUB_READBACK - + + def __init__( + self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs + ): + super().__init__(prefix, name=name, kind=kind, parent=parent, **kwargs) + self._stopped = False + self.device_manager = device_manager + self._move_thread = None + @property def limits(self) -> tuple: """Return limits of the Bragg positioner""" - if self.move_type.get() == 'energy': - raise NotImplementedError("PVs need to still be exposed to the softIOC") - # return () + if self.move_type.get() == "energy": + return (self.low_limit_energy.get(), self.high_limit_energy.get()) return (self.low_limit_angle.get(), self.high_limit_angle.get()) - + @property def low_limit(self) -> float: - """ Return low limit of axis""" + """Return low limit of axis""" return self.limits[0] - + @property def high_limit(self) -> float: - """ Return high limit of axis""" + """Return high limit of axis""" return self.limits[1] - - #pylint: disable:arguments-differ - def check_value(self, value:float) -> None: - """ Check if value is within limits - + + @property + def egu(self) -> str: + """Return the engineering units of the positioner""" + if self.move_type.get() == "energy": + return "eV" + return "deg" + + @property + def position(self) -> float: + """Return the current position of Mo1Bragg, considering the move type""" + move_type = self.move_type.get() + move_cpt = ( + self.feedback_pos_energy if move_type == MoveType.ENERGY else self.feedback_pos_angle + ) + return move_cpt.get() + + # pylint: disable=arguments-differ + def check_value(self, value: float) -> None: + """Check if value is within limits + Args: value (float) : value to move axis to. """ @@ -84,66 +181,113 @@ class Mo1Bragg(Device, PositionerBase): if low_limit < high_limit and not low_limit <= value <= high_limit: raise LimitError(f"position={value} not within limits {self.limits}") - - - # def move(self, value:float, move_type:str='angle', **kwargs) -> DeviceStatus: - # self._stopped = False - # status = super().move(value) - # # Start motion - # def thread_function(status): - # while self.move_abs_done == 0 - # self._run_subs(sub_type=self.SUB_READBACK, value = self.feedback_pos_angle.get()) + def stop(self, *, success=False) -> None: + """Stop any motion on the positioner""" + self._stopped = True + if self._move_thread is not None: + self._move_thread.join() + self._move_thread = None + super().stop(success=success) - # if self._stopped ==True: - # status.set_exception() - # return + def _move_and_finish( + self, target_pos: float, move_cpt: Cpt, status: DeviceStatus, update_frequency: float = 0.1 + ) -> None: + """Move the simulated device and finish the motion. - # status.set_finished() - - # def stop()->None: - # self._stopped=True - - - - - # def _move_and_finished(self, start_pos, stop_pos, st): - - # success = True - - - - - def set_xtal(self, - xtal_enum:Literal['111', '311'], - offset_si111:float=None, - offset_si311:float=None, - d_spacing_si111:float=None, - d_spacing_si311:float=None, - )->None: - """ Method to set the crystal - Args: + target_pos (float) : target position for the motion + status (DeviceStatus) : status object to set the status of the motion + """ + success = True + move_cpt.set(target_pos) + # TODO - discuss with Klaus if we like to handle motion with potential exceptions like this for custom positioners + try: + # Start motion + self.move_abs.set(1) + # TODO - Check if a sleep is needed here with a test at the beamline + # Loop until the motion is done and run the subscriptions + while self.move_abs_done.get() == 0: + # Is this needed since BEC is subscribed to the feedback_pos_angle due to the auto_monitor=True + self._run_subs(sub_type=self.SUB_READBACK, value=self.position) + if self._stopped: + success = False + break + time.sleep(update_frequency) + self._done_moving(success=success) + status.set_finished() + # pylint: disable=broad-except + except Exception as exc: + content = traceback.format_exc() + logger.error(f"Error in move thread of device {self.name}: {content}") + status.set_exception(exc) + + def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus: + """Move the Bragg positioner to the specified value. + The motion considers the current move_type, and allows it to be adjusted if specified. + This allows the motion to switch between energy and angle moves. + + Args: + value (float) : target value for the motion + move_type (str | MoveType) : Optional, specify the type of move, either 'energy' or 'angle' + + Returns: + DeviceStatus : status object to track the motion + """ + self._stopped = False + if move_type is not None: + self.move_type.set(move_type) + move_type = self.move_type.get() + # This checks also if the value is within the limits + status = super().move(value) + # Start motion + move_cpt = ( + self.setpoint_abs_energy if move_type == MoveType.ENERGY else self.setpoint_abs_angle + ) + self._move_thread = threading.Thread( + target=self._move_and_finish, args=(value, move_cpt, status, 0.1) + ) + self._move_thread.start() + return status + + def set_xtal( + self, + xtal_enum: Literal["111", "311"], + offset_si111: float = None, + offset_si311: float = None, + d_spacing_si111: float = None, + d_spacing_si311: float = None, + ) -> None: + """Method to set the crystal parameters of the Bragg positioner + + Args: + xtal_enum (Literal["111", "311"]) : Enum to set the crystal orientation + offset_si111 (float) : Offset for the 111 crystal + offset_si311 (float) : Offset for the 311 crystal + d_spacing_si111 (float) : d-spacing for the 111 crystal + d_spacing_si311 (float) : d-spacing for the 311 crystal """ if offset_si111 is not None: - self.offset_si111.set(offset_si111) + self.crystal.offset_si111.set(offset_si111) if offset_si311 is not None: - self.offset_si311.set(offset_si311) + self.crystal.offset_si311.set(offset_si311) if d_spacing_si111 is not None: - self.d_spacing_si111.set(d_spacing_si111) + self.crystal.d_spacing_si111.set(d_spacing_si111) if d_spacing_si311 is not None: - self.d_spacing_si311.set(d_spacing_si311) - if xtal_enum == '111': + self.crystal.d_spacing_si311.set(d_spacing_si311) + if xtal_enum == "111": crystal_set = 0 - elif xtal_enum == '311': + elif xtal_enum == "311": crystal_set = 1 else: - raise ValueError(f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'") - self.xtal_enum.set(crystal_set) - self.set_offset.set(1) + raise ValueError( + f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'" + ) + self.crystal.xtal_enum.set(crystal_set) + # Send the new settings from the IOC to the motor controller + self.crystal.set_offset.set(1) - if __name__ == "__main__": - #bragg = Mo1Bragg(name='bragg', prefix="X01DA-OP-MO1:BRAGG:") - pass \ No newline at end of file + # bragg = Mo1Bragg(name='bragg', prefix="X01DA-OP-MO1:BRAGG:") + pass