refactor: refactored ddg csaxs with new base class in ophyd devices

This commit is contained in:
gac-x12sa
2024-12-04 16:56:21 +01:00
committed by appel_c
parent 83de8b75db
commit ccaf996512
2 changed files with 125 additions and 143 deletions

View File

@@ -0,0 +1,50 @@
ddg_detectors:
description: DelayGenerator for detector triggering
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS
deviceConfig:
prefix: 'X12SA-CPCL-DDG3:'
ddg_config:
delay_burst: 40.e-3
delta_width: 0
additional_triggers: 0
polarity:
- 1 # T0 -> DDG MCS
- 0 # eiger
- 1 # falcon
- 1
- 1
amplitude: 4.5
offset: 0
thres_trig_level: 2.5
set_high_on_exposure: False
set_high_on_stage: False
deviceTags:
- cSAXS
- ddg_detectors
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: True
bpm4i:
readoutPriority: monitored
deviceClass: ophyd_devices.SimMonitor
deviceConfig:
deviceTags:
- beamline
enabled: true
readOnly: false
samx:
readoutPriority: baseline
deviceClass: ophyd_devices.SimPositioner
deviceConfig:
delay: 1
limits:
- -50
- 50
tolerance: 0.01
update_frequency: 400
deviceTags:
- user motors
enabled: true
readOnly: false

View File

@@ -1,29 +1,25 @@
from bec_lib import bec_logger
from ophyd import Component
from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import (
DDGCustomMixin,
PSIDelayGeneratorBase,
TriggerSource,
)
from ophyd import Component, DeviceStatus, Kind
from ophyd_devices.devices.delay_generator_645 import DelayGenerator, TriggerSource
from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal
from ophyd_devices.utils import bec_utils
logger = bec_logger.logger
class DelayGeneratorError(Exception):
class DelayGeneratorcSAXSError(Exception):
"""Exception raised for errors."""
class DDGSetup(DDGCustomMixin):
class DDGSetup(CustomPrepare):
"""
Mixin class for DelayGenerator logic at cSAXS.
At cSAXS, multiple DDGs were operated at the same time. There different behaviour is
implemented in the ddg_config signals that are passed via the device config.
Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS
"""
def initialize_default_parameter(self) -> None:
"""Method to initialize default parameters."""
def on_wait_for_connection(self) ->None:
"""Init default parameter after the all signals are connected"""
for ii, channel in enumerate(self.parent.all_channels):
self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
@@ -40,126 +36,66 @@ class DDGSetup(DDGCustomMixin):
# Set threshold level for ext. pulses
self.parent.level.put(self.parent.thres_trig_level.get())
def prepare_ddg(self) -> None:
"""
Method to prepare scan logic of cSAXS
Two scantypes are supported: "step" and "fly":
- step: Scan is performed by stepping the motor and acquiring data at each step
- fly: Scan is performed by moving the motor with a constant velocity and acquiring data
Custom logic for different DDG behaviour during scans.
- set_high_on_exposure : If True, then TTL signal is high during
the full exposure time of the scan (all frames).
E.g. Keep shutter open for the full scan.
- fixed_ttl_width : fixed_ttl_width is a list of 5 values, one for each channel.
If the value is 0, then the width of the TTL pulse is determined,
no matter which parameters are passed from the scaninfo for exposure time
- set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones
were: SINGLE_SHOT, EXT_RISING_EDGE
"""
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# scantype "step"
def on_stage(self) -> None:
# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# scantype "jjf_test"
if self.parent.scaninfo.scan_type == "step":
# High on exposure means that the signal
if self.parent.set_high_on_exposure.get():
# caluculate parameters
num_burst_cycle = 1 + self.parent.additional_triggers.get()
exp_time = (
self.parent.delta_width.get()
+ self.parent.scaninfo.frames_per_trigger
* (self.parent.scaninfo.exp_time + self.parent.scaninfo.readout_time)
)
total_exposure = exp_time
delay_burst = self.parent.delay_burst.get()
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
for value, channel in zip(
self.parent.fixed_ttl_width.get(), self.parent.all_channels
):
logger.debug(f"Trying to set DDG {channel} to {value}")
if value != 0:
self.parent.set_channels("width", value, channels=[channel])
else:
# caluculate parameters
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
total_exposure = exp_time + self.parent.scaninfo.readout_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = (
self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get()
)
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
# scantype "fly"
elif self.parent.scaninfo.scan_type == "fly":
if self.parent.set_high_on_exposure.get():
# caluculate parameters
exp_time = (
self.parent.delta_width.get()
+ self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points
+ self.parent.scaninfo.readout_time * (self.parent.scaninfo.num_points - 1)
)
total_exposure = exp_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = 1 + self.parent.additional_triggers.get()
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
for value, channel in zip(
self.parent.fixed_ttl_width.get(), self.parent.all_channels
):
logger.debug(f"Trying to set DDG {channel} to {value}")
if value != 0:
self.parent.set_channels("width", value, channels=[channel])
else:
# caluculate parameters
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
total_exposure = exp_time + self.parent.scaninfo.readout_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = (
self.parent.scaninfo.num_points + self.parent.additional_triggers.get()
)
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
else:
raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}")
# Set common DDG parameters
self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first")
self.parent.set_channels("delay", 0.0)
exp_time = self.parent.scaninfo.exp_time
delay = 0
self.parent.burst_disable()
self.parent.set_trigger(TriggerSource.SINGLE_SHOT)
self.parent.set_channels(signal='width', value=exp_time)
self.parent.set_channels(signal='delay', value=delay)
return
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
# exp_time = self.parent.scaninfo.exp_time
# readout = self.parent.scaninfo.readout_time
# num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
# total_exposure = exp_time+readout
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
delay = 0
delay_burst = self.parent.delay_burst.get()
self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
self.parent.set_channels(signal='width', value=exp_time)
self.parent.set_channels(signal='delay', value=delay)
self.parent.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first")
logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}")
def on_trigger(self) -> None:
"""Method to be executed upon trigger"""
if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT:
if self.parent.scaninfo.scan_type == "step":
self.parent.trigger_shot.put(1)
return
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["cycles"]
#time.sleep(num_burst_cycle*total_exposure)
def check_ddg()->int:
self.parent.trigger_burst_readout.put(1)
return self.parent.burst_cycle_finished.get()
status = self.wait_with_status(signal_conditions=[(check_ddg, 1)],
timeout=num_burst_cycle*total_exposure+1,
check_stopped=True,
exception_on_timeout=DelayGeneratorcSAXSError(f"{self.parent.name} run into timeout in complete call.")
)
logger.info(f"Return status {self.parent.name}")
return status
def check_scan_id(self) -> None:
"""
Method to check if scan_id has changed.
If yes, then it changes parent.stopped to True, which will stop further actions.
"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
def on_complete(self) -> DeviceStatus:
pass
def finished(self) -> None:
"""Method checks if DDG finished acquisition"""
@@ -174,7 +110,8 @@ class DDGSetup(DDGCustomMixin):
self.parent.trigger_shot.put(1)
class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
class DelayGeneratorcSAXS(PSIDeviceBase, DelayGenerator):
"""
DG645 delay generator at cSAXS (multiple can be in use depending on the setup)
@@ -203,6 +140,9 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
custom_prepare_cls = DDGSetup
# Custom signals passed on during the init procedure via BEC
#TODO review whether those should remain here like that
delay_burst = Component(
bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config"
)
@@ -280,16 +220,12 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
name:str,
prefix:str="",
kind:Kind=None,
ddg_config:dict=None,
parent=None,
device_manager=None,
sim_mode=False,
ddg_config=None,
**kwargs,
):
"""
@@ -305,6 +241,7 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
ddg_config (dict, optional): Dictionary of ddg_config signals. Defaults to None.
"""
# Default values for ddg_config signals
self.ddg_config = {
# Setup default values
@@ -330,16 +267,11 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
sim_mode=sim_mode,
**kwargs,
)
if __name__ == "__main__":
# Start delay generator in simulation mode.
# Note: To run, access to Epics must be available.
dgen = DelayGeneratorcSAXS("delaygen:DG1:", name="dgen", sim_mode=True)
# if __name__ == "__main__":
# dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="ddg3")