mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-05-30 16:40:41 +02:00
refactor: removed burst_enabl/disable etc.. slight refactoring of prepare_ddg
This commit is contained in:
parent
a73411646d
commit
f218a9b114
@ -1,10 +1,10 @@
|
||||
import enum
|
||||
from ophyd import Component
|
||||
|
||||
from ophyd_devices.utils import bec_utils
|
||||
from ophyd_devices.epics.devices.psi_delay_generator_base import (
|
||||
PSIDelayGeneratorBase,
|
||||
DDGCustomMixin,
|
||||
TriggerSource,
|
||||
)
|
||||
|
||||
from bec_lib import bec_logger
|
||||
@ -17,25 +17,6 @@ class DelayGeneratorError(Exception):
|
||||
"""Exception raised for errors."""
|
||||
|
||||
|
||||
class TriggerSource(enum.IntEnum):
|
||||
"""
|
||||
Class for trigger options of DG645
|
||||
|
||||
Used to set the trigger source of the DG645 by setting the value
|
||||
e.g. source.put(TriggerSource.Internal)
|
||||
Exp:
|
||||
TriggerSource.Internal
|
||||
"""
|
||||
|
||||
INTERNAL = 0
|
||||
EXT_RISING_EDGE = 1
|
||||
EXT_FALLING_EDGE = 2
|
||||
SS_EXT_RISING_EDGE = 3
|
||||
SS_EXT_FALLING_EDGE = 4
|
||||
SINGLE_SHOT = 5
|
||||
LINE = 6
|
||||
|
||||
|
||||
class DDGSetup(DDGCustomMixin):
|
||||
"""
|
||||
Mixin class for DelayGenerator logic at cSAXS.
|
||||
@ -47,7 +28,7 @@ class DDGSetup(DDGCustomMixin):
|
||||
def initialize_default_parameter(self) -> None:
|
||||
"""Method to initialize default parameters."""
|
||||
for ii, channel in enumerate(self.parent.all_channels):
|
||||
self.parent.set_channels("polarity", self.parent.polarity.get()[ii], channels=[channel])
|
||||
self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
|
||||
|
||||
self.parent.set_channels("amplitude", self.parent.amplitude.get())
|
||||
self.parent.set_channels("offset", self.parent.offset.get())
|
||||
@ -55,17 +36,13 @@ class DDGSetup(DDGCustomMixin):
|
||||
self.parent.set_channels(
|
||||
"reference",
|
||||
0,
|
||||
[
|
||||
f"channel{self.parent.all_delay_pairs[ii]}.ch1"
|
||||
for ii in range(len(self.parent.all_delay_pairs))
|
||||
],
|
||||
[f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs],
|
||||
)
|
||||
self.parent.set_channels(
|
||||
"reference",
|
||||
0,
|
||||
[f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs],
|
||||
)
|
||||
for ii, pair in enumerate(self.parent.all_delay_pairs):
|
||||
self.parent.set_channels(
|
||||
"reference",
|
||||
0,
|
||||
[f"channel{pair}.ch2"],
|
||||
)
|
||||
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
|
||||
# Set threshold level for ext. pulses
|
||||
self.parent.level.put(self.parent.thres_trig_level.get())
|
||||
@ -89,13 +66,12 @@ class DDGSetup(DDGCustomMixin):
|
||||
- set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones
|
||||
were: SINGLE_SHOT, EXT_RISING_EDGE
|
||||
"""
|
||||
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
|
||||
# scantype "step"
|
||||
if self.parent.scaninfo.scan_type == "step":
|
||||
# High on exposure means that the signal
|
||||
if self.parent.set_high_on_exposure.get():
|
||||
self.parent.set_trigger(
|
||||
getattr(TriggerSource, self.parent.set_trigger_source.get())
|
||||
)
|
||||
# caluculate parameters
|
||||
num_burst_cycle = 1 + self.parent.additional_triggers.get()
|
||||
|
||||
exp_time = (
|
||||
@ -105,11 +81,8 @@ class DDGSetup(DDGCustomMixin):
|
||||
)
|
||||
total_exposure = exp_time
|
||||
delay_burst = self.parent.delay_burst.get()
|
||||
self.parent.burst_enable(
|
||||
num_burst_cycle, delay_burst, total_exposure, config="first"
|
||||
)
|
||||
self.parent.set_channels("delay", 0)
|
||||
# Set burst length to half of the experimental time!
|
||||
|
||||
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
|
||||
if not self.parent.trigger_width.get():
|
||||
self.parent.set_channels("width", exp_time)
|
||||
else:
|
||||
@ -121,21 +94,15 @@ class DDGSetup(DDGCustomMixin):
|
||||
if value != 0:
|
||||
self.parent.set_channels("width", value, channels=[channel])
|
||||
else:
|
||||
self.parent.set_trigger(
|
||||
getattr(TriggerSource, self.parent.set_trigger_source.get())
|
||||
)
|
||||
# caluculate parameters
|
||||
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
|
||||
total_exposure = exp_time + self.parent.scaninfo.readout_time
|
||||
delay_burst = self.parent.delay_burst.get()
|
||||
num_burst_cycle = (
|
||||
self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get()
|
||||
)
|
||||
# set parameters in DDG
|
||||
self.parent.burst_enable(
|
||||
num_burst_cycle, delay_burst, total_exposure, config="first"
|
||||
)
|
||||
self.parent.set_channels("delay", 0)
|
||||
# Set burst length to half of the experimental time!
|
||||
|
||||
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
|
||||
if not self.parent.trigger_width.get():
|
||||
self.parent.set_channels("width", exp_time)
|
||||
else:
|
||||
@ -143,10 +110,7 @@ class DDGSetup(DDGCustomMixin):
|
||||
# scantype "fly"
|
||||
elif self.parent.scaninfo.scan_type == "fly":
|
||||
if self.parent.set_high_on_exposure.get():
|
||||
# define parameters
|
||||
self.parent.set_trigger(
|
||||
getattr(TriggerSource, self.parent.set_trigger_source.get())
|
||||
)
|
||||
# caluculate parameters
|
||||
exp_time = (
|
||||
self.parent.delta_width.get()
|
||||
+ self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points
|
||||
@ -154,14 +118,9 @@ class DDGSetup(DDGCustomMixin):
|
||||
)
|
||||
total_exposure = exp_time
|
||||
delay_burst = self.parent.delay_burst.get()
|
||||
# self.additional_triggers should be 0 for self.set_high_on_exposure or remove here fully..
|
||||
num_burst_cycle = 1 + self.parent.additional_triggers.get()
|
||||
# set parameters in DDG
|
||||
self.parent.burst_enable(
|
||||
num_burst_cycle, delay_burst, total_exposure, config="first"
|
||||
)
|
||||
self.parent.set_channels("delay", 0.0)
|
||||
# Set burst length to half of the experimental time!
|
||||
|
||||
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
|
||||
if not self.parent.trigger_width.get():
|
||||
self.parent.set_channels("width", exp_time)
|
||||
else:
|
||||
@ -169,27 +128,19 @@ class DDGSetup(DDGCustomMixin):
|
||||
for value, channel in zip(
|
||||
self.parent.fixed_ttl_width.get(), self.parent.all_channels
|
||||
):
|
||||
logger.info(f"{value}")
|
||||
logger.debug(f"Trying to set DDG {channel} to {value}")
|
||||
if value != 0:
|
||||
logger.info(f"Setting {value}")
|
||||
self.parent.set_channels("width", value, channels=[channel])
|
||||
else:
|
||||
# define parameters
|
||||
self.parent.set_trigger(
|
||||
getattr(TriggerSource, self.parent.set_trigger_source.get())
|
||||
)
|
||||
# caluculate parameters
|
||||
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
|
||||
total_exposure = exp_time + self.parent.scaninfo.readout_time
|
||||
delay_burst = self.parent.delay_burst.get()
|
||||
num_burst_cycle = (
|
||||
self.parent.scaninfo.num_points + self.parent.additional_triggers.get()
|
||||
)
|
||||
# set parameters in DDG
|
||||
self.parent.burst_enable(
|
||||
num_burst_cycle, delay_burst, total_exposure, config="first"
|
||||
)
|
||||
self.parent.set_channels("delay", 0.0)
|
||||
# Set burst length to half of the experimental time!
|
||||
|
||||
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
|
||||
if not self.parent.trigger_width.get():
|
||||
self.parent.set_channels("width", exp_time)
|
||||
else:
|
||||
@ -197,6 +148,9 @@ class DDGSetup(DDGCustomMixin):
|
||||
|
||||
else:
|
||||
raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}")
|
||||
# Set common DDG parameters
|
||||
self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first")
|
||||
self.parent.set_channels("delay", 0.0)
|
||||
|
||||
def on_trigger(self) -> None:
|
||||
"""Method to be executed upon trigger"""
|
||||
@ -246,7 +200,7 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
|
||||
Custom signals for logic in different DDGs during scans (for custom_prepare.prepare_ddg):
|
||||
|
||||
- set_high_on_exposure : (bool): if True, then TTL signal should go high during the full acquisition time of a scan.
|
||||
# TODO fixed_ttl_width and trigger_width could be combined in a single signal
|
||||
# TODO trigger_width and fixed_ttl could be combined into single list.
|
||||
- fixed_ttl_width : (list of either 1 or 0), one for each channel.
|
||||
- trigger_width : (float) if fixed_ttl_width is True, then the width of the TTL pulse is set to this value.
|
||||
- set_trigger_source : (TriggerSource) specifies the default trigger source for the DDG.
|
||||
@ -401,43 +355,11 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
|
||||
read_attrs=read_attrs,
|
||||
configuration_attrs=configuration_attrs,
|
||||
parent=parent,
|
||||
device_manager=device_manager,
|
||||
sim_mode=sim_mode,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def set_trigger(self, trigger_source: TriggerSource) -> None:
|
||||
"""Set trigger source on DDG - possible values defined in TriggerSource enum"""
|
||||
value = int(trigger_source)
|
||||
self.source.put(value)
|
||||
|
||||
def burst_enable(self, count, delay, period, config="all"):
|
||||
"""Enable the burst mode"""
|
||||
# Validate inputs
|
||||
count = int(count)
|
||||
assert count > 0, "Number of bursts must be positive"
|
||||
assert delay >= 0, "Burst delay must be larger than 0"
|
||||
assert period > 0, "Burst period must be positive"
|
||||
assert config in [
|
||||
"all",
|
||||
"first",
|
||||
], "Supported burst configs are 'all' and 'first'"
|
||||
|
||||
self.burstMode.put(1)
|
||||
self.burstCount.put(count)
|
||||
self.burstDelay.put(delay)
|
||||
self.burstPeriod.put(period)
|
||||
|
||||
if config == "all":
|
||||
self.burstConfig.put(0)
|
||||
elif config == "first":
|
||||
self.burstConfig.put(1)
|
||||
|
||||
def burst_disable(self):
|
||||
"""Disable burst mode"""
|
||||
self.burstMode.put(0)
|
||||
|
||||
def stage(self):
|
||||
"""Customized stage function"""
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Start delay generator in simulation mode.
|
||||
|
Loading…
x
Reference in New Issue
Block a user