diff --git a/debye_bec/device_configs/x01da_database.yaml b/debye_bec/device_configs/x01da_database.yaml index cabef3f..e5aaee2 100644 --- a/debye_bec/device_configs/x01da_database.yaml +++ b/debye_bec/device_configs/x01da_database.yaml @@ -208,7 +208,7 @@ cm_xstripe: ## Bragg Monochromator mo1_bragg: - readoutPriority: monitored + readoutPriority: baseline description: Positioner for the Monochromator deviceClass: debye_bec.devices.mo1_bragg.Mo1Bragg deviceConfig: diff --git a/debye_bec/device_configs/x01da_test_config.yaml b/debye_bec/device_configs/x01da_test_config.yaml new file mode 100644 index 0000000..4a9b128 --- /dev/null +++ b/debye_bec/device_configs/x01da_test_config.yaml @@ -0,0 +1,19 @@ +## Bragg Monochromator +mo1_bragg: + readoutPriority: baseline + description: Positioner for the Monochromator + deviceClass: debye_bec.devices.mo1_bragg.Mo1Bragg + deviceConfig: + prefix: "X01DA-OP-MO1:BRAGG:" + onFailure: retry + enabled: true + softwareTrigger: false +dummy_pv: + readoutPriority: monitored + description: Heartbeat of Bragg + deviceClass: ophyd.EpicsSignalRO + deviceConfig: + read_pv: "X01DA-OP-MO1:BRAGG:heartbeat_RBV" + onFailure: retry + enabled: true + softwareTrigger: false diff --git a/debye_bec/devices/mo1_bragg.py b/debye_bec/devices/mo1_bragg.py index 0297acb..b69b246 100644 --- a/debye_bec/devices/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg.py @@ -17,9 +17,13 @@ from ophyd import ( Kind, PositionerBase, Signal, + Staged, ) from ophyd.utils import LimitError +import os +from ophyd_devices.utils import bec_utils, bec_scaninfo_mixin + logger = bec_logger.logger @@ -34,6 +38,34 @@ class MoveType(str, enum.Enum): ANGLE = "angle" +class Mo1BraggStatus(Device): + # General status + error_status = Cpt( + EpicsSignalRO, suffix="error_status_RBV", kind=Kind.config, auto_monitor=True + ) + brake_enabled = Cpt( + EpicsSignalRO, suffix="brake_enabled_RBV", kind=Kind.config, auto_monitor=True + ) + mot_commutated = Cpt( + EpicsSignalRO, suffix="mot_commutated_RBV", kind=Kind.config, auto_monitor=True + ) + axis_enabled = Cpt( + EpicsSignalRO, suffix="axis_enabled_RBV", kind=Kind.config, auto_monitor=True + ) + enc_initialized = Cpt( + EpicsSignalRO, suffix="enc_initialized_RBV", kind=Kind.config, auto_monitor=True + ) + heartbeat = Cpt( + EpicsSignalRO, suffix="heartbeat_RBV", kind=Kind.config, auto_monitor=True + ) + +class Mo1BraggEncoder(Device): + # Encoder reinitialization + enc_reinit = Cpt(EpicsSignal, suffix="enc_reinit", kind=Kind.config) + enc_reinit_done = Cpt( + EpicsSignalRO, suffix="enc_reinit_done_RBV", kind=Kind.config, auto_monitor=True + ) + class Mo1BraggCrystal(Device): """Class to set the crystal parameters of the Bragg positioner""" @@ -52,9 +84,99 @@ class Mo1BraggCrystal(Device): ) set_offset = Cpt(EpicsSignal, suffix="set_offset", kind=Kind.config) current_xtal = Cpt( - EpicsSignalRO, suffix="current_xtal_RBV_ENUM", kind=Kind.normal, auto_monitor=True + EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind=Kind.normal, auto_monitor=True ) +#TODO Reevaluate which PVs need and should have auto_monitors + kind = normal/config! +class Mo1BraggScanSettings(Device): + """ Scan Settings for Mo1 Bragg positioner""" + # XRD measurement settings + xrd_select_ref_enum = Cpt( + EpicsSignalWithRBV, suffix="xrd_select_ref_ENUM", kind=Kind.normal, auto_monitor=True + ) + + # High + xrd_enable_hi_enum = Cpt( + EpicsSignalWithRBV, suffix="xrd_enable_hi_ENUM", kind=Kind.normal, auto_monitor=True + ) + xrd_time_hi = Cpt( + EpicsSignalWithRBV, suffix="xrd_time_hi", kind=Kind.normal, auto_monitor=True + ) + xrd_n_trigger_hi = Cpt( + EpicsSignalWithRBV, suffix="xrd_n_trigger_hi", kind=Kind.normal, auto_monitor=True + ) + xrd_every_n_hi = Cpt( + EpicsSignalWithRBV, suffix="xrd_every_n_hi", kind=Kind.normal, auto_monitor=True + ) + + # Low + xrd_enable_lo_enum = Cpt( + EpicsSignalWithRBV, suffix="xrd_enable_lo_ENUM", kind=Kind.normal, auto_monitor=True + ) + xrd_time_lo = Cpt( + EpicsSignalWithRBV, suffix="xrd_time_lo", kind=Kind.normal, auto_monitor=True + ) + xrd_n_trigger_lo = Cpt( + EpicsSignalWithRBV, suffix="xrd_n_trigger_lo", kind=Kind.normal, auto_monitor=True + ) + xrd_every_n_lo = Cpt( + EpicsSignalWithRBV, suffix="xrd_every_n_lo", kind=Kind.normal, auto_monitor=True + ) + + # XAS simple scan settings + s_scan_angle_hi = Cpt( + EpicsSignalWithRBV, suffix="s_scan_angle_hi", kind=Kind.normal, auto_monitor=True + ) + s_scan_angle_lo = Cpt( + EpicsSignalWithRBV, suffix="s_scan_angle_lo", kind=Kind.normal, auto_monitor=True + ) + s_scan_energy_lo = Cpt( + EpicsSignalWithRBV, suffix="s_scan_energy_lo", kind=Kind.normal, auto_monitor=True + ) + s_scan_energy_hi = Cpt( + EpicsSignalWithRBV, suffix="s_scan_energy_hi", kind=Kind.normal, auto_monitor=True + ) + s_scan_scantime = Cpt( + EpicsSignalWithRBV, suffix="s_scan_scantime", kind=Kind.normal, auto_monitor=True + ) + + # XAS advanced scan settings + a_scan_pos = Cpt( + EpicsSignalWithRBV, suffix="a_scan_pos", kind=Kind.normal, auto_monitor=True + ) + a_scan_vel = Cpt( + EpicsSignalWithRBV, suffix="a_scan_vel", kind=Kind.normal, auto_monitor=True + ) + a_scan_time = Cpt( + EpicsSignalWithRBV, suffix="a_scan_time", kind=Kind.normal, auto_monitor=True + ) + + # Scan control + scan_mode_enum = Cpt( + EpicsSignalWithRBV, suffix="scan_mode_ENUM", kind=Kind.normal, auto_monitor=True + ) + scan_duration = Cpt( + EpicsSignalWithRBV, suffix="scan_duration", kind=Kind.normal, auto_monitor=True + ) + scan_load = Cpt(EpicsSignal, suffix="scan_load", kind=Kind.normal) + scan_msg = Cpt( + EpicsSignalRO, suffix="scan_msg_ENUM_RBV", kind=Kind.normal, auto_monitor=True + ) + scan_start_infinite = Cpt(EpicsSignal, suffix="scan_start_infinite", kind=Kind.normal) + scan_start_timer = Cpt(EpicsSignal, suffix="scan_start_timer", kind=Kind.normal) + scan_stop = Cpt(EpicsSignal, suffix="scan_stop", kind=Kind.normal) + scan_status = Cpt( + EpicsSignalRO, suffix="scan_status_ENUM_RBV", kind=Kind.normal, auto_monitor=True + ) + scan_time_left = Cpt( + EpicsSignalRO, suffix="scan_time_left_RBV", kind=Kind.normal, auto_monitor=True + ) + scan_done = Cpt( + EpicsSignalRO, suffix="scan_done_RBV", kind=Kind.normal, auto_monitor=True + ) + + + class MoveTypeSignal(Signal): """Custom Signal to set the move type of the Bragg positioner""" @@ -70,7 +192,7 @@ class MoveTypeSignal(Signal): raise ValueError( f"Invalid input for MoveTypeSignal {value}, can be either 'energy' or 'angle'" ) - self._readback = value + self._readback = value.value if isinstance(value, MoveType) else value class Mo1Bragg(Device, PositionerBase): @@ -79,6 +201,9 @@ class Mo1Bragg(Device, PositionerBase): USER_ACCESS = ["set_xtal"] crystal = Cpt(Mo1BraggCrystal, "") + encoder = Cpt(Mo1BraggEncoder, "") + scan_settings = Cpt(Mo1BraggScanSettings, "") + status = Cpt(Mo1BraggStatus, "") # Introduced new signal to be able to switch between motion in energy or angle move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind=Kind.normal) @@ -136,6 +261,25 @@ class Mo1Bragg(Device, PositionerBase): self._stopped = False self.device_manager = device_manager self._move_thread = None + #TODO rename energy pos to be readback name + self.feedback_pos_energy.name = self.name + self.service_cfg = None + self.scaninfo = None + + if device_manager: + self.device_manager = device_manager + else: + self.device_manager = bec_utils.DMMock() + + self.connector = self.device_manager.connector + self._update_scaninfo() + + def _update_scaninfo(self) -> None: + """Update scaninfo from BecScaninfoMixing + This depends on device manager and operation/sim_mode + """ + self.scaninfo = bec_scaninfo_mixin.BecScaninfoMixin(self.device_manager) + self.scaninfo.load_scan_metadata() @property def limits(self) -> tuple: @@ -184,6 +328,7 @@ class Mo1Bragg(Device, PositionerBase): def stop(self, *, success=False) -> None: """Stop any motion on the positioner""" + #TODO Add PV call to interrupt the motion self._stopped = True if self._move_thread is not None: self._move_thread.join() @@ -209,28 +354,36 @@ class Mo1Bragg(Device, PositionerBase): # TODO - discuss with Klaus if we like to handle motion with potential exceptions like this for custom positioners try: # Set the target position on IOC - move_cpt.set(target_pos) + move_cpt.put(target_pos) # Start motion - self.move_abs.set(1) - # TODO - Check if a sleep is needed here with a test at the beamline - # Loop until the motion is done and run the subscriptions + self.move_abs.put(1) + # Sleep needed due to updated frequency to soft IOC + time.sleep(0.5) while self.move_abs_done.get() == 0: # Is this needed since BEC is subscribed to the feedback_pos_angle due to the auto_monitor=True - self._run_subs(sub_type=self.SUB_READBACK, value=read_cpt.get()) + val = read_cpt.get() + self._run_subs(sub_type=self.SUB_READBACK, value=val) + logger.info(f"Current position of {self.name} is {val}") if self._stopped: success = False break time.sleep(update_frequency) - # pylint: disable=broad-except + #TODO If the excetpion block remains here, it resolves directly the statusobject. I do not understand why + #pylint: disable=broad-except except Exception as exc: content = traceback.format_exc() logger.error(f"Error in move thread of device {self.name}: {content}") exception = exc + status.set_exception(exc=exception) finally: - if exception: - status.set_exception(exception) - else: - self._done_moving(success=success) + self._done_moving(success=success) + status.set_finished() + # pylint: disable=broad-except + # except Exception as exc: + # content = traceback.format_exc() + # logger.error(f"Error in move thread of device {self.name}: {content}") + # exception = exc + # status.set_exception(exc=exception) # TODO Should the ophyd implement a CTRL-C error handling? def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus: @@ -247,11 +400,11 @@ class Mo1Bragg(Device, PositionerBase): """ self._stopped = False if move_type is not None: - self.move_type.set(move_type) + self.move_type.put(move_type) move_type = self.move_type.get() # This checks also if the value is within the limits - status = super().move(value) - # Start motion + self.check_value(value) + status = DeviceStatus(device=self) move_cpt = ( self.setpoint_abs_energy if move_type == MoveType.ENERGY else self.setpoint_abs_angle ) @@ -282,13 +435,13 @@ class Mo1Bragg(Device, PositionerBase): d_spacing_si311 (float) : d-spacing for the 311 crystal """ if offset_si111 is not None: - self.crystal.offset_si111.set(offset_si111) + self.crystal.offset_si111.put(offset_si111) if offset_si311 is not None: - self.crystal.offset_si311.set(offset_si311) + self.crystal.offset_si311.put(offset_si311) if d_spacing_si111 is not None: - self.crystal.d_spacing_si111.set(d_spacing_si111) + self.crystal.d_spacing_si111.put(d_spacing_si111) if d_spacing_si311 is not None: - self.crystal.d_spacing_si311.set(d_spacing_si311) + self.crystal.d_spacing_si311.put(d_spacing_si311) if xtal_enum == "111": crystal_set = 0 elif xtal_enum == "311": @@ -297,9 +450,137 @@ class Mo1Bragg(Device, PositionerBase): raise ValueError( f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'" ) - self.crystal.xtal_enum.set(crystal_set) + self.crystal.xtal_enum.put(crystal_set) # Send the new settings from the IOC to the motor controller - self.crystal.set_offset.set(1) + self.crystal.set_offset.put(1) + + def set_xas_settings(self, low:float, high:float, scan_time:float): + move_type = self.move_type.get() + if move_type == MoveType.ENERGY: + self.scan_settings.s_scan_energy_lo.put(low) + self.scan_settings.s_scan_energy_hi.put(high) + else: + self.scan_settings.s_scan_angle_lo.put(low) + self.scan_settings.s_scan_angle_hi.put(high) + self.scan_settings.s_scan_scantime.put(scan_time) + + def set_xrd_settings(self, enable:bool): + self.scan_settings.xrd_enable_hi_enum.put(int(enable)) + self.scan_settings.xrd_enable_lo_enum.put(int(enable)) + + def scan_control_settings(self, mode: Literal["simple", "advanced"], scan_duration:float): + setpoint = 0 if mode == "simple" else 1 + self.scan_settings.scan_mode_enum.put(setpoint) # 0 Simple 1 Advance + self.scan_settings.scan_duration.put(scan_duration) + + def setup_simple_xas_scan(self, low:float, high:float, scan_time:float, mode:Literal["simple", "advanced"], scan_duration:float=0, **kwargs): + #TODO check if maybe we want an extra argument for infinite or finite motion + self.set_xas_settings(low=low, high=high, scan_time=scan_time) + self.set_xrd_settings(False) + self.scan_control_settings(mode=mode, scan_duration=scan_duration) + #self.scan_control. + self.scan_settings.scan_load.put(1) + self._wait_validation(**kwargs) + + def _wait_validation(self, timeout:float=3, interval:float=0.1): + timer = 0 + while True: + if self._stopped: + break + validation = self.scan_settings.scan_msg.get() + if validation == 2: + return + if validation > 2: + #TODO make more explicit base on ENUMS from PV + raise Mo1BraggError("Validation Error of scan parameter settings") + if timer>timeout: + raise TimeoutError(f"Scan Validation timeout after {timeout}s") + timer += interval + time.sleep(interval) + + def kickoff(self): + status = DeviceStatus(self) + + scan_duration = self.scan_settings.scan_duration.get() + if scan_duration < 0.1: + self.scan_settings.scan_start_infinite.put(1) + return + self.scan_settings.scan_start_timer.put(1) + status.set_finished() + return status + + def stage(self) -> list[object]: + """ + Stage device in preparation for a scan. + First we check if the device is already staged. Stage is idempotent, + if staged twice it should raise (we let ophyd.Device handle the raise here). + We reset the stopped flag and get the scaninfo from BEC, before calling custom_prepare.on_stage. + + Returns: + list(object): list of objects that were staged + + """ + if self._staged != Staged.no: + return super().stage() + self._stopped = False + self.scaninfo.load_scan_metadata() + # Fill in the decision making what should be staged + # + return super().stage() + + def complete(self) -> None: + """Complete the acquisition, called from BEC. + + This function is called after the scan is complete, just before unstage. + We can check here with the data backend and detector if the acquisition successfully finished. + + Actions are implemented in custom_prepare.on_complete since they are beamline specific. + """ + # pylint: disable=assignment-from-no-return + # IMPLEMENT complete logic here still + status = self._on_complete() + if isinstance(status, DeviceStatus): + return status + status = DeviceStatus(self) + status.set_finished() + return status + + def unstage(self) -> list[object]: + """ + Unstage device after a scan. + + We first check if the scanID has changed, thus, the scan was unexpectedly interrupted but the device was not stopped. + If that is the case, the stopped flag is set to True, which will immediately unstage the device. + + Custom_prepare.on_unstage is called to allow for BL specific logic to be executed. + + Returns: + list(object): list of objects that were unstaged + """ + self.check_scan_id() + if self._stopped is True: + return super().unstage() + #self.custom_prepare.on_unstage() + self._stopped = False + return super().unstage() + + def check_scan_id(self) -> None: + """Checks if scan_id has changed and set stopped flagged to True if it has.""" + old_scan_id = self.scaninfo.scan_id + self.scaninfo.load_scan_metadata() + if self.scaninfo.scan_id != old_scan_id: + self._stopped = True + + def _on_complete(self): + pass + + + + + + + + if __name__ == "__main__":