fix: fixed mo1_bragg functionality at the beamline, changed .set to .put for stability

This commit is contained in:
gac-x01da (Resp. Clark Adam Hugh)
2024-07-16 14:51:02 +02:00
parent 734f7e7133
commit 6c99e40a7b
3 changed files with 322 additions and 22 deletions

View File

@@ -208,7 +208,7 @@ cm_xstripe:
## Bragg Monochromator
mo1_bragg:
readoutPriority: monitored
readoutPriority: baseline
description: Positioner for the Monochromator
deviceClass: debye_bec.devices.mo1_bragg.Mo1Bragg
deviceConfig:

View File

@@ -0,0 +1,19 @@
## Bragg Monochromator
mo1_bragg:
readoutPriority: baseline
description: Positioner for the Monochromator
deviceClass: debye_bec.devices.mo1_bragg.Mo1Bragg
deviceConfig:
prefix: "X01DA-OP-MO1:BRAGG:"
onFailure: retry
enabled: true
softwareTrigger: false
dummy_pv:
readoutPriority: monitored
description: Heartbeat of Bragg
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: "X01DA-OP-MO1:BRAGG:heartbeat_RBV"
onFailure: retry
enabled: true
softwareTrigger: false

View File

@@ -17,9 +17,13 @@ from ophyd import (
Kind,
PositionerBase,
Signal,
Staged,
)
from ophyd.utils import LimitError
import os
from ophyd_devices.utils import bec_utils, bec_scaninfo_mixin
logger = bec_logger.logger
@@ -34,6 +38,34 @@ class MoveType(str, enum.Enum):
ANGLE = "angle"
class Mo1BraggStatus(Device):
# General status
error_status = Cpt(
EpicsSignalRO, suffix="error_status_RBV", kind=Kind.config, auto_monitor=True
)
brake_enabled = Cpt(
EpicsSignalRO, suffix="brake_enabled_RBV", kind=Kind.config, auto_monitor=True
)
mot_commutated = Cpt(
EpicsSignalRO, suffix="mot_commutated_RBV", kind=Kind.config, auto_monitor=True
)
axis_enabled = Cpt(
EpicsSignalRO, suffix="axis_enabled_RBV", kind=Kind.config, auto_monitor=True
)
enc_initialized = Cpt(
EpicsSignalRO, suffix="enc_initialized_RBV", kind=Kind.config, auto_monitor=True
)
heartbeat = Cpt(
EpicsSignalRO, suffix="heartbeat_RBV", kind=Kind.config, auto_monitor=True
)
class Mo1BraggEncoder(Device):
# Encoder reinitialization
enc_reinit = Cpt(EpicsSignal, suffix="enc_reinit", kind=Kind.config)
enc_reinit_done = Cpt(
EpicsSignalRO, suffix="enc_reinit_done_RBV", kind=Kind.config, auto_monitor=True
)
class Mo1BraggCrystal(Device):
"""Class to set the crystal parameters of the Bragg positioner"""
@@ -52,9 +84,99 @@ class Mo1BraggCrystal(Device):
)
set_offset = Cpt(EpicsSignal, suffix="set_offset", kind=Kind.config)
current_xtal = Cpt(
EpicsSignalRO, suffix="current_xtal_RBV_ENUM", kind=Kind.normal, auto_monitor=True
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind=Kind.normal, auto_monitor=True
)
#TODO Reevaluate which PVs need and should have auto_monitors + kind = normal/config!
class Mo1BraggScanSettings(Device):
""" Scan Settings for Mo1 Bragg positioner"""
# XRD measurement settings
xrd_select_ref_enum = Cpt(
EpicsSignalWithRBV, suffix="xrd_select_ref_ENUM", kind=Kind.normal, auto_monitor=True
)
# High
xrd_enable_hi_enum = Cpt(
EpicsSignalWithRBV, suffix="xrd_enable_hi_ENUM", kind=Kind.normal, auto_monitor=True
)
xrd_time_hi = Cpt(
EpicsSignalWithRBV, suffix="xrd_time_hi", kind=Kind.normal, auto_monitor=True
)
xrd_n_trigger_hi = Cpt(
EpicsSignalWithRBV, suffix="xrd_n_trigger_hi", kind=Kind.normal, auto_monitor=True
)
xrd_every_n_hi = Cpt(
EpicsSignalWithRBV, suffix="xrd_every_n_hi", kind=Kind.normal, auto_monitor=True
)
# Low
xrd_enable_lo_enum = Cpt(
EpicsSignalWithRBV, suffix="xrd_enable_lo_ENUM", kind=Kind.normal, auto_monitor=True
)
xrd_time_lo = Cpt(
EpicsSignalWithRBV, suffix="xrd_time_lo", kind=Kind.normal, auto_monitor=True
)
xrd_n_trigger_lo = Cpt(
EpicsSignalWithRBV, suffix="xrd_n_trigger_lo", kind=Kind.normal, auto_monitor=True
)
xrd_every_n_lo = Cpt(
EpicsSignalWithRBV, suffix="xrd_every_n_lo", kind=Kind.normal, auto_monitor=True
)
# XAS simple scan settings
s_scan_angle_hi = Cpt(
EpicsSignalWithRBV, suffix="s_scan_angle_hi", kind=Kind.normal, auto_monitor=True
)
s_scan_angle_lo = Cpt(
EpicsSignalWithRBV, suffix="s_scan_angle_lo", kind=Kind.normal, auto_monitor=True
)
s_scan_energy_lo = Cpt(
EpicsSignalWithRBV, suffix="s_scan_energy_lo", kind=Kind.normal, auto_monitor=True
)
s_scan_energy_hi = Cpt(
EpicsSignalWithRBV, suffix="s_scan_energy_hi", kind=Kind.normal, auto_monitor=True
)
s_scan_scantime = Cpt(
EpicsSignalWithRBV, suffix="s_scan_scantime", kind=Kind.normal, auto_monitor=True
)
# XAS advanced scan settings
a_scan_pos = Cpt(
EpicsSignalWithRBV, suffix="a_scan_pos", kind=Kind.normal, auto_monitor=True
)
a_scan_vel = Cpt(
EpicsSignalWithRBV, suffix="a_scan_vel", kind=Kind.normal, auto_monitor=True
)
a_scan_time = Cpt(
EpicsSignalWithRBV, suffix="a_scan_time", kind=Kind.normal, auto_monitor=True
)
# Scan control
scan_mode_enum = Cpt(
EpicsSignalWithRBV, suffix="scan_mode_ENUM", kind=Kind.normal, auto_monitor=True
)
scan_duration = Cpt(
EpicsSignalWithRBV, suffix="scan_duration", kind=Kind.normal, auto_monitor=True
)
scan_load = Cpt(EpicsSignal, suffix="scan_load", kind=Kind.normal)
scan_msg = Cpt(
EpicsSignalRO, suffix="scan_msg_ENUM_RBV", kind=Kind.normal, auto_monitor=True
)
scan_start_infinite = Cpt(EpicsSignal, suffix="scan_start_infinite", kind=Kind.normal)
scan_start_timer = Cpt(EpicsSignal, suffix="scan_start_timer", kind=Kind.normal)
scan_stop = Cpt(EpicsSignal, suffix="scan_stop", kind=Kind.normal)
scan_status = Cpt(
EpicsSignalRO, suffix="scan_status_ENUM_RBV", kind=Kind.normal, auto_monitor=True
)
scan_time_left = Cpt(
EpicsSignalRO, suffix="scan_time_left_RBV", kind=Kind.normal, auto_monitor=True
)
scan_done = Cpt(
EpicsSignalRO, suffix="scan_done_RBV", kind=Kind.normal, auto_monitor=True
)
class MoveTypeSignal(Signal):
"""Custom Signal to set the move type of the Bragg positioner"""
@@ -70,7 +192,7 @@ class MoveTypeSignal(Signal):
raise ValueError(
f"Invalid input for MoveTypeSignal {value}, can be either 'energy' or 'angle'"
)
self._readback = value
self._readback = value.value if isinstance(value, MoveType) else value
class Mo1Bragg(Device, PositionerBase):
@@ -79,6 +201,9 @@ class Mo1Bragg(Device, PositionerBase):
USER_ACCESS = ["set_xtal"]
crystal = Cpt(Mo1BraggCrystal, "")
encoder = Cpt(Mo1BraggEncoder, "")
scan_settings = Cpt(Mo1BraggScanSettings, "")
status = Cpt(Mo1BraggStatus, "")
# Introduced new signal to be able to switch between motion in energy or angle
move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind=Kind.normal)
@@ -136,6 +261,25 @@ class Mo1Bragg(Device, PositionerBase):
self._stopped = False
self.device_manager = device_manager
self._move_thread = None
#TODO rename energy pos to be readback name
self.feedback_pos_energy.name = self.name
self.service_cfg = None
self.scaninfo = None
if device_manager:
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
self.connector = self.device_manager.connector
self._update_scaninfo()
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = bec_scaninfo_mixin.BecScaninfoMixin(self.device_manager)
self.scaninfo.load_scan_metadata()
@property
def limits(self) -> tuple:
@@ -184,6 +328,7 @@ class Mo1Bragg(Device, PositionerBase):
def stop(self, *, success=False) -> None:
"""Stop any motion on the positioner"""
#TODO Add PV call to interrupt the motion
self._stopped = True
if self._move_thread is not None:
self._move_thread.join()
@@ -209,28 +354,36 @@ class Mo1Bragg(Device, PositionerBase):
# TODO - discuss with Klaus if we like to handle motion with potential exceptions like this for custom positioners
try:
# Set the target position on IOC
move_cpt.set(target_pos)
move_cpt.put(target_pos)
# Start motion
self.move_abs.set(1)
# TODO - Check if a sleep is needed here with a test at the beamline
# Loop until the motion is done and run the subscriptions
self.move_abs.put(1)
# Sleep needed due to updated frequency to soft IOC
time.sleep(0.5)
while self.move_abs_done.get() == 0:
# Is this needed since BEC is subscribed to the feedback_pos_angle due to the auto_monitor=True
self._run_subs(sub_type=self.SUB_READBACK, value=read_cpt.get())
val = read_cpt.get()
self._run_subs(sub_type=self.SUB_READBACK, value=val)
logger.info(f"Current position of {self.name} is {val}")
if self._stopped:
success = False
break
time.sleep(update_frequency)
# pylint: disable=broad-except
#TODO If the excetpion block remains here, it resolves directly the statusobject. I do not understand why
#pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Error in move thread of device {self.name}: {content}")
exception = exc
status.set_exception(exc=exception)
finally:
if exception:
status.set_exception(exception)
else:
self._done_moving(success=success)
self._done_moving(success=success)
status.set_finished()
# pylint: disable=broad-except
# except Exception as exc:
# content = traceback.format_exc()
# logger.error(f"Error in move thread of device {self.name}: {content}")
# exception = exc
# status.set_exception(exc=exception)
# TODO Should the ophyd implement a CTRL-C error handling?
def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus:
@@ -247,11 +400,11 @@ class Mo1Bragg(Device, PositionerBase):
"""
self._stopped = False
if move_type is not None:
self.move_type.set(move_type)
self.move_type.put(move_type)
move_type = self.move_type.get()
# This checks also if the value is within the limits
status = super().move(value)
# Start motion
self.check_value(value)
status = DeviceStatus(device=self)
move_cpt = (
self.setpoint_abs_energy if move_type == MoveType.ENERGY else self.setpoint_abs_angle
)
@@ -282,13 +435,13 @@ class Mo1Bragg(Device, PositionerBase):
d_spacing_si311 (float) : d-spacing for the 311 crystal
"""
if offset_si111 is not None:
self.crystal.offset_si111.set(offset_si111)
self.crystal.offset_si111.put(offset_si111)
if offset_si311 is not None:
self.crystal.offset_si311.set(offset_si311)
self.crystal.offset_si311.put(offset_si311)
if d_spacing_si111 is not None:
self.crystal.d_spacing_si111.set(d_spacing_si111)
self.crystal.d_spacing_si111.put(d_spacing_si111)
if d_spacing_si311 is not None:
self.crystal.d_spacing_si311.set(d_spacing_si311)
self.crystal.d_spacing_si311.put(d_spacing_si311)
if xtal_enum == "111":
crystal_set = 0
elif xtal_enum == "311":
@@ -297,9 +450,137 @@ class Mo1Bragg(Device, PositionerBase):
raise ValueError(
f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'"
)
self.crystal.xtal_enum.set(crystal_set)
self.crystal.xtal_enum.put(crystal_set)
# Send the new settings from the IOC to the motor controller
self.crystal.set_offset.set(1)
self.crystal.set_offset.put(1)
def set_xas_settings(self, low:float, high:float, scan_time:float):
move_type = self.move_type.get()
if move_type == MoveType.ENERGY:
self.scan_settings.s_scan_energy_lo.put(low)
self.scan_settings.s_scan_energy_hi.put(high)
else:
self.scan_settings.s_scan_angle_lo.put(low)
self.scan_settings.s_scan_angle_hi.put(high)
self.scan_settings.s_scan_scantime.put(scan_time)
def set_xrd_settings(self, enable:bool):
self.scan_settings.xrd_enable_hi_enum.put(int(enable))
self.scan_settings.xrd_enable_lo_enum.put(int(enable))
def scan_control_settings(self, mode: Literal["simple", "advanced"], scan_duration:float):
setpoint = 0 if mode == "simple" else 1
self.scan_settings.scan_mode_enum.put(setpoint) # 0 Simple 1 Advance
self.scan_settings.scan_duration.put(scan_duration)
def setup_simple_xas_scan(self, low:float, high:float, scan_time:float, mode:Literal["simple", "advanced"], scan_duration:float=0, **kwargs):
#TODO check if maybe we want an extra argument for infinite or finite motion
self.set_xas_settings(low=low, high=high, scan_time=scan_time)
self.set_xrd_settings(False)
self.scan_control_settings(mode=mode, scan_duration=scan_duration)
#self.scan_control.
self.scan_settings.scan_load.put(1)
self._wait_validation(**kwargs)
def _wait_validation(self, timeout:float=3, interval:float=0.1):
timer = 0
while True:
if self._stopped:
break
validation = self.scan_settings.scan_msg.get()
if validation == 2:
return
if validation > 2:
#TODO make more explicit base on ENUMS from PV
raise Mo1BraggError("Validation Error of scan parameter settings")
if timer>timeout:
raise TimeoutError(f"Scan Validation timeout after {timeout}s")
timer += interval
time.sleep(interval)
def kickoff(self):
status = DeviceStatus(self)
scan_duration = self.scan_settings.scan_duration.get()
if scan_duration < 0.1:
self.scan_settings.scan_start_infinite.put(1)
return
self.scan_settings.scan_start_timer.put(1)
status.set_finished()
return status
def stage(self) -> list[object]:
"""
Stage device in preparation for a scan.
First we check if the device is already staged. Stage is idempotent,
if staged twice it should raise (we let ophyd.Device handle the raise here).
We reset the stopped flag and get the scaninfo from BEC, before calling custom_prepare.on_stage.
Returns:
list(object): list of objects that were staged
"""
if self._staged != Staged.no:
return super().stage()
self._stopped = False
self.scaninfo.load_scan_metadata()
# Fill in the decision making what should be staged
#
return super().stage()
def complete(self) -> None:
"""Complete the acquisition, called from BEC.
This function is called after the scan is complete, just before unstage.
We can check here with the data backend and detector if the acquisition successfully finished.
Actions are implemented in custom_prepare.on_complete since they are beamline specific.
"""
# pylint: disable=assignment-from-no-return
# IMPLEMENT complete logic here still
status = self._on_complete()
if isinstance(status, DeviceStatus):
return status
status = DeviceStatus(self)
status.set_finished()
return status
def unstage(self) -> list[object]:
"""
Unstage device after a scan.
We first check if the scanID has changed, thus, the scan was unexpectedly interrupted but the device was not stopped.
If that is the case, the stopped flag is set to True, which will immediately unstage the device.
Custom_prepare.on_unstage is called to allow for BL specific logic to be executed.
Returns:
list(object): list of objects that were unstaged
"""
self.check_scan_id()
if self._stopped is True:
return super().unstage()
#self.custom_prepare.on_unstage()
self._stopped = False
return super().unstage()
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and set stopped flagged to True if it has."""
old_scan_id = self.scaninfo.scan_id
self.scaninfo.load_scan_metadata()
if self.scaninfo.scan_id != old_scan_id:
self._stopped = True
def _on_complete(self):
pass
if __name__ == "__main__":