From ccaf996512da14522360eaf76f23cff9ca9c8d15 Mon Sep 17 00:00:00 2001 From: gac-x12sa Date: Wed, 4 Dec 2024 16:56:21 +0100 Subject: [PATCH] refactor: refactored ddg csaxs with new base class in ophyd devices --- .../device_configs/epics_devices_config.yaml | 50 ++++ .../devices/epics/delay_generator_csaxs.py | 218 ++++++------------ 2 files changed, 125 insertions(+), 143 deletions(-) create mode 100644 csaxs_bec/device_configs/epics_devices_config.yaml diff --git a/csaxs_bec/device_configs/epics_devices_config.yaml b/csaxs_bec/device_configs/epics_devices_config.yaml new file mode 100644 index 0000000..546f085 --- /dev/null +++ b/csaxs_bec/device_configs/epics_devices_config.yaml @@ -0,0 +1,50 @@ +ddg_detectors: + description: DelayGenerator for detector triggering + deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS + deviceConfig: + prefix: 'X12SA-CPCL-DDG3:' + ddg_config: + delay_burst: 40.e-3 + delta_width: 0 + additional_triggers: 0 + polarity: + - 1 # T0 -> DDG MCS + - 0 # eiger + - 1 # falcon + - 1 + - 1 + amplitude: 4.5 + offset: 0 + thres_trig_level: 2.5 + set_high_on_exposure: False + set_high_on_stage: False + deviceTags: + - cSAXS + - ddg_detectors + onFailure: buffer + enabled: true + readoutPriority: async + softwareTrigger: True +bpm4i: + readoutPriority: monitored + deviceClass: ophyd_devices.SimMonitor + deviceConfig: + deviceTags: + - beamline + enabled: true + readOnly: false +samx: + readoutPriority: baseline + deviceClass: ophyd_devices.SimPositioner + deviceConfig: + delay: 1 + limits: + - -50 + - 50 + tolerance: 0.01 + update_frequency: 400 + deviceTags: + - user motors + enabled: true + readOnly: false + \ No newline at end of file diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs.py b/csaxs_bec/devices/epics/delay_generator_csaxs.py index c0d521b..6662e6c 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs.py @@ -1,29 +1,25 @@ from bec_lib import bec_logger -from ophyd import Component -from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import ( - DDGCustomMixin, - PSIDelayGeneratorBase, - TriggerSource, -) +from ophyd import Component, DeviceStatus, Kind + +from ophyd_devices.devices.delay_generator_645 import DelayGenerator, TriggerSource +from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase +from ophyd_devices.sim.sim_signals import SetableSignal from ophyd_devices.utils import bec_utils logger = bec_logger.logger -class DelayGeneratorError(Exception): +class DelayGeneratorcSAXSError(Exception): """Exception raised for errors.""" -class DDGSetup(DDGCustomMixin): +class DDGSetup(CustomPrepare): """ - Mixin class for DelayGenerator logic at cSAXS. - - At cSAXS, multiple DDGs were operated at the same time. There different behaviour is - implemented in the ddg_config signals that are passed via the device config. + Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS """ - def initialize_default_parameter(self) -> None: - """Method to initialize default parameters.""" + def on_wait_for_connection(self) ->None: + """Init default parameter after the all signals are connected""" for ii, channel in enumerate(self.parent.all_channels): self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel]) @@ -40,126 +36,66 @@ class DDGSetup(DDGCustomMixin): # Set threshold level for ext. pulses self.parent.level.put(self.parent.thres_trig_level.get()) - def prepare_ddg(self) -> None: - """ - Method to prepare scan logic of cSAXS - - Two scantypes are supported: "step" and "fly": - - step: Scan is performed by stepping the motor and acquiring data at each step - - fly: Scan is performed by moving the motor with a constant velocity and acquiring data - - Custom logic for different DDG behaviour during scans. - - - set_high_on_exposure : If True, then TTL signal is high during - the full exposure time of the scan (all frames). - E.g. Keep shutter open for the full scan. - - fixed_ttl_width : fixed_ttl_width is a list of 5 values, one for each channel. - If the value is 0, then the width of the TTL pulse is determined, - no matter which parameters are passed from the scaninfo for exposure time - - set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones - were: SINGLE_SHOT, EXT_RISING_EDGE - """ - self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get())) - # scantype "step" + def on_stage(self) -> None: + # self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get())) + # scantype "jjf_test" if self.parent.scaninfo.scan_type == "step": - # High on exposure means that the signal - if self.parent.set_high_on_exposure.get(): - # caluculate parameters - num_burst_cycle = 1 + self.parent.additional_triggers.get() - - exp_time = ( - self.parent.delta_width.get() - + self.parent.scaninfo.frames_per_trigger - * (self.parent.scaninfo.exp_time + self.parent.scaninfo.readout_time) - ) - total_exposure = exp_time - delay_burst = self.parent.delay_burst.get() - - # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too - if not self.parent.trigger_width.get(): - self.parent.set_channels("width", exp_time) - else: - self.parent.set_channels("width", self.parent.trigger_width.get()) - for value, channel in zip( - self.parent.fixed_ttl_width.get(), self.parent.all_channels - ): - logger.debug(f"Trying to set DDG {channel} to {value}") - if value != 0: - self.parent.set_channels("width", value, channels=[channel]) - else: - # caluculate parameters - exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time - total_exposure = exp_time + self.parent.scaninfo.readout_time - delay_burst = self.parent.delay_burst.get() - num_burst_cycle = ( - self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get() - ) - - # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too - if not self.parent.trigger_width.get(): - self.parent.set_channels("width", exp_time) - else: - self.parent.set_channels("width", self.parent.trigger_width.get()) - # scantype "fly" - elif self.parent.scaninfo.scan_type == "fly": - if self.parent.set_high_on_exposure.get(): - # caluculate parameters - exp_time = ( - self.parent.delta_width.get() - + self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points - + self.parent.scaninfo.readout_time * (self.parent.scaninfo.num_points - 1) - ) - total_exposure = exp_time - delay_burst = self.parent.delay_burst.get() - num_burst_cycle = 1 + self.parent.additional_triggers.get() - - # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too - if not self.parent.trigger_width.get(): - self.parent.set_channels("width", exp_time) - else: - self.parent.set_channels("width", self.parent.trigger_width.get()) - for value, channel in zip( - self.parent.fixed_ttl_width.get(), self.parent.all_channels - ): - logger.debug(f"Trying to set DDG {channel} to {value}") - if value != 0: - self.parent.set_channels("width", value, channels=[channel]) - else: - # caluculate parameters - exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time - total_exposure = exp_time + self.parent.scaninfo.readout_time - delay_burst = self.parent.delay_burst.get() - num_burst_cycle = ( - self.parent.scaninfo.num_points + self.parent.additional_triggers.get() - ) - - # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too - if not self.parent.trigger_width.get(): - self.parent.set_channels("width", exp_time) - else: - self.parent.set_channels("width", self.parent.trigger_width.get()) - - else: - raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}") - # Set common DDG parameters - self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first") - self.parent.set_channels("delay", 0.0) + exp_time = self.parent.scaninfo.exp_time + delay = 0 + self.parent.burst_disable() + self.parent.set_trigger(TriggerSource.SINGLE_SHOT) + self.parent.set_channels(signal='width', value=exp_time) + self.parent.set_channels(signal='delay', value=delay) + return + scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") + if scan_name == "jjf_test": + # exp_time = self.parent.scaninfo.exp_time + # readout = self.parent.scaninfo.readout_time + # num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] + # total_exposure = exp_time+readout + exp_time = 480e-6#self.parent.scaninfo.exp_time + readout = 20e-6#self.parent.scaninfo.readout_time + total_exposure = exp_time+readout + num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] + num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure) + delay = 0 + delay_burst = self.parent.delay_burst.get() + + self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT) + + self.parent.set_channels(signal='width', value=exp_time) + self.parent.set_channels(signal='delay', value=delay) + self.parent.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first") + logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}") def on_trigger(self) -> None: """Method to be executed upon trigger""" - if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT: + if self.parent.scaninfo.scan_type == "step": self.parent.trigger_shot.put(1) + return + scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") + if scan_name == "jjf_test": + exp_time = 480e-6#self.parent.scaninfo.exp_time + readout = 20e-6#self.parent.scaninfo.readout_time + total_exposure = exp_time+readout + num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] + num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure) + cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["cycles"] + #time.sleep(num_burst_cycle*total_exposure) + def check_ddg()->int: + self.parent.trigger_burst_readout.put(1) + return self.parent.burst_cycle_finished.get() + status = self.wait_with_status(signal_conditions=[(check_ddg, 1)], + timeout=num_burst_cycle*total_exposure+1, + check_stopped=True, + exception_on_timeout=DelayGeneratorcSAXSError(f"{self.parent.name} run into timeout in complete call.") + ) + logger.info(f"Return status {self.parent.name}") + return status - def check_scan_id(self) -> None: - """ - Method to check if scan_id has changed. - If yes, then it changes parent.stopped to True, which will stop further actions. - """ - old_scan_id = self.parent.scaninfo.scan_id - self.parent.scaninfo.load_scan_metadata() - if self.parent.scaninfo.scan_id != old_scan_id: - self.parent.stopped = True + def on_complete(self) -> DeviceStatus: + pass def finished(self) -> None: """Method checks if DDG finished acquisition""" @@ -174,7 +110,8 @@ class DDGSetup(DDGCustomMixin): self.parent.trigger_shot.put(1) -class DelayGeneratorcSAXS(PSIDelayGeneratorBase): + +class DelayGeneratorcSAXS(PSIDeviceBase, DelayGenerator): """ DG645 delay generator at cSAXS (multiple can be in use depending on the setup) @@ -203,6 +140,9 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase): custom_prepare_cls = DDGSetup + # Custom signals passed on during the init procedure via BEC + #TODO review whether those should remain here like that + delay_burst = Component( bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config" ) @@ -280,16 +220,12 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase): def __init__( self, - prefix="", - *, - name, - kind=None, - read_attrs=None, - configuration_attrs=None, + name:str, + prefix:str="", + kind:Kind=None, + ddg_config:dict=None, parent=None, device_manager=None, - sim_mode=False, - ddg_config=None, **kwargs, ): """ @@ -305,6 +241,7 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase): ddg_config (dict, optional): Dictionary of ddg_config signals. Defaults to None. """ + # Default values for ddg_config signals self.ddg_config = { # Setup default values @@ -330,16 +267,11 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase): prefix=prefix, name=name, kind=kind, - read_attrs=read_attrs, - configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, - sim_mode=sim_mode, **kwargs, ) -if __name__ == "__main__": - # Start delay generator in simulation mode. - # Note: To run, access to Epics must be available. - dgen = DelayGeneratorcSAXS("delaygen:DG1:", name="dgen", sim_mode=True) +# if __name__ == "__main__": +# dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="ddg3")