fix: bugfix in delaygenerators

This commit is contained in:
appel_c 2023-09-04 20:50:14 +02:00
parent c0b3418366
commit 2dd8f25c87

View File

@ -147,7 +147,12 @@ class DelayGeneratorDG645(Device):
"reload_config", "reload_config",
] ]
state = Component(EpicsSignalRO, "EventStatusLI", name="status_register") trigger_burst_readout = Component(
EpicsSignal, "EventStatusLI.PROC", name="read_burst_state"
)
burst_cycle_finished = Component(
EpicsSignalRO, "EventStatusMBBID.B3", name="read_burst_state"
)
status = Component(EpicsSignalRO, "StatusSI", name="status") status = Component(EpicsSignalRO, "StatusSI", name="status")
clear_error = Component(EpicsSignal, "StatusClearBO", name="clear_error") clear_error = Component(EpicsSignal, "StatusClearBO", name="clear_error")
@ -297,6 +302,13 @@ class DelayGeneratorDG645(Device):
config_storage_name="ddg_config", config_storage_name="ddg_config",
) )
set_trigger_source = Component(
bec_utils.ConfigSignal,
name="set_trigger_source",
kind="config",
config_storage_name="ddg_config",
)
def __init__( def __init__(
self, self,
prefix="", prefix="",
@ -329,6 +341,9 @@ class DelayGeneratorDG645(Device):
delay_burst (_type_, float): Add delay for triggering in software trigger mode to allow fast shutter to open. Defaults to 0. delay_burst (_type_, float): Add delay for triggering in software trigger mode to allow fast shutter to open. Defaults to 0.
delta_width (_type_, float): Add width to fast shutter signal to make sure its open during acquisition. Defaults to 0. delta_width (_type_, float): Add width to fast shutter signal to make sure its open during acquisition. Defaults to 0.
delta_triggers (_type_, int): Add additional triggers to burst mode (mcs card needs +1 triggers per line). Defaults to 0. delta_triggers (_type_, int): Add additional triggers to burst mode (mcs card needs +1 triggers per line). Defaults to 0.
set_high_on_exposure
set_high_on_stage
set_trigger_source
""" """
self.ddg_config = { self.ddg_config = {
f"{name}_delay_burst": 0, f"{name}_delay_burst": 0,
@ -340,9 +355,13 @@ class DelayGeneratorDG645(Device):
f"{name}_thres_trig_level": 2.5, f"{name}_thres_trig_level": 2.5,
f"{name}_set_high_on_exposure": False, f"{name}_set_high_on_exposure": False,
f"{name}_set_high_on_stage": False, f"{name}_set_high_on_stage": False,
f"{name}_set_trigger_source": "SINGLE_SHOT",
} }
if ddg_config is not None: if ddg_config is not None:
[self.ddg_config.update({f'{name}_{key}' : value}) for key, value in ddg_config.items()] [
self.ddg_config.update({f"{name}_{key}": value})
for key, value in ddg_config.items()
]
super().__init__( super().__init__(
prefix=prefix, prefix=prefix,
name=name, name=name,
@ -372,9 +391,10 @@ class DelayGeneratorDG645(Device):
] ]
self._all_delay_pairs = ["AB", "CD", "EF", "GH"] self._all_delay_pairs = ["AB", "CD", "EF", "GH"]
self.wait_for_connection() # Make sure to be connected before talking to PVs self.wait_for_connection() # Make sure to be connected before talking to PVs
logger.info(f'Current polarity value {self.polarity.get()}') logger.info(f"Current polarity values {self.polarity.get()}")
self.reload_config() self.reload_config()
self._ddg_is_okay() self._ddg_is_okay()
self._stopped = False
def _set_trigger(self, trigger_source: TriggerSource) -> None: def _set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source to value of list below, or string """Set trigger source to value of list below, or string
@ -414,14 +434,11 @@ class DelayGeneratorDG645(Device):
getattr(channel.io, signal).set(value) getattr(channel.io, signal).set(value)
def _cleanup_ddg(self) -> None: def _cleanup_ddg(self) -> None:
self._set_trigger(TriggerSource.SINGLE_SHOT) self._set_trigger(getattr(TriggerSource, self.set_trigger_source.get()))
def reload_config(self) -> None: def reload_config(self) -> None:
self.set_channels( for ii, channel in enumerate(self._all_channels):
"polarity", self.set_channels("polarity", self.polarity.get()[ii], channels=[channel])
self.polarity.get(),
channels=["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"],
)
# Set polarity for eiger inverted! # Set polarity for eiger inverted!
# self.set_channels("polarity", 0, channels=["channelAB"]) # self.set_channels("polarity", 0, channels=["channelAB"])
self.set_channels("amplitude", self.amplitude.get()) self.set_channels("amplitude", self.amplitude.get())
@ -441,39 +458,45 @@ class DelayGeneratorDG645(Device):
0, 0,
[f"channel{self._all_delay_pairs[ii]}.ch2"], [f"channel{self._all_delay_pairs[ii]}.ch2"],
) )
self._set_trigger(TriggerSource.SINGLE_SHOT) self._set_trigger(getattr(TriggerSource, self.set_trigger_source.get()))
# Set threshold level for ext. pulses # Set threshold level for ext. pulses
self.level.set(self.thres_trig_level.get()) self.level.set(self.thres_trig_level.get())
def _check_burst_cycle(self) -> None:
"''Checks burst cycle of delay generator''"
while True:
self.trigger_burst_readout.set(1)
if (
self.burst_cycle_finished.read()[self.burst_cycle_finished.name][
"value"
]
== 1
):
self._acquisition_done = True
return
if self._stopped == True:
return
time.sleep(0.01)
def stop(self, success=False):
"""Stops the DDG"""
self._stopped = True
self._acquisition_done = True
super().stop(success=success)
def stage(self): def stage(self):
"""Trigger the generator by arming to accept triggers""" """Trigger the generator by arming to accept triggers"""
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
if self.scaninfo.scan_type == "step": if self.scaninfo.scan_type == "step":
# define parameters # define parameters
self._set_trigger(TriggerSource.SINGLE_SHOT) self._set_trigger(getattr(TriggerSource, self.set_trigger_source.get()))
exp_time = self.delta_width.get() + self.scaninfo.exp_time
delay_burst = self.delay_burst.get()
num_burst_cycle = 1 + self.additional_triggers.get()
# set parameters in DDG
self.burst_enable(num_burst_cycle, delay_burst, exp_time, config="first")
self.set_channels("delay", 0)
# Set burst length to half of the experimental time!
self.set_channels("width", exp_time)
elif self.scaninfo.scan_type == "fly":
# Prepare FSH DDG
if self.set_high_on_exposure.get(): if self.set_high_on_exposure.get():
# define parameters num_burst_cycle = 1
self._set_trigger(TriggerSource.SINGLE_SHOT) exp_time = self.delta_width.get() + self.scaninfo.num_frames * (
exp_time = ( self.scaninfo.exp_time + self.scaninfo.readout_time
self.delta_width.get()
+ self.scaninfo.exp_time * self.scaninfo.num_frames
+ self.scaninfo.readout_time * (self.scaninfo.num_frames - 1)
) )
total_exposure = exp_time total_exposure = exp_time
delay_burst = self.delay_burst.get() delay_burst = self.delay_burst.get()
# self.additional_triggers should be 0 for self.set_high_on_exposure or remove here fully..
num_burst_cycle = 1 + self.additional_triggers.get()
# set parameters in DDG
self.burst_enable( self.burst_enable(
num_burst_cycle, delay_burst, total_exposure, config="first" num_burst_cycle, delay_burst, total_exposure, config="first"
) )
@ -481,8 +504,6 @@ class DelayGeneratorDG645(Device):
# Set burst length to half of the experimental time! # Set burst length to half of the experimental time!
self.set_channels("width", exp_time) self.set_channels("width", exp_time)
else: else:
# define parameters
self._set_trigger(TriggerSource.SINGLE_SHOT)
exp_time = self.delta_width.get() + self.scaninfo.exp_time exp_time = self.delta_width.get() + self.scaninfo.exp_time
total_exposure = exp_time + self.scaninfo.readout_time total_exposure = exp_time + self.scaninfo.readout_time
delay_burst = self.delay_burst.get() delay_burst = self.delay_burst.get()
@ -496,20 +517,60 @@ class DelayGeneratorDG645(Device):
self.set_channels("delay", 0) self.set_channels("delay", 0)
# Set burst length to half of the experimental time! # Set burst length to half of the experimental time!
self.set_channels("width", exp_time) self.set_channels("width", exp_time)
elif self.scaninfo.scan_type == "fly":
# Prepare FSH DDG
if self.set_high_on_exposure.get():
# define parameters
self._set_trigger(getattr(TriggerSource, self.set_trigger_source.get()))
exp_time = (
self.delta_width.get()
+ self.scaninfo.exp_time * self.scaninfo.num_frames
+ self.scaninfo.readout_time * (self.scaninfo.num_frames - 1)
)
total_exposure = exp_time
delay_burst = self.delay_burst.get()
# self.additional_triggers should be 0 for self.set_high_on_exposure or remove here fully..
num_burst_cycle = 1 + self.additional_triggers.get()
# set parameters in DDG
self.burst_enable(
num_burst_cycle, delay_burst, total_exposure, config="first"
)
self.set_channels("delay", 0.0)
# Set burst length to half of the experimental time!
self.set_channels("width", exp_time)
else:
# define parameters
self._set_trigger(getattr(TriggerSource, self.set_trigger_source.get()))
exp_time = self.delta_width.get() + self.scaninfo.exp_time
total_exposure = exp_time + self.scaninfo.readout_time
delay_burst = self.delay_burst.get()
num_burst_cycle = (
self.scaninfo.num_frames + self.additional_triggers.get()
)
# set parameters in DDG
self.burst_enable(
num_burst_cycle, delay_burst, total_exposure, config="first"
)
self.set_channels("delay", 0.0)
# Set burst length to half of the experimental time!
self.set_channels("width", exp_time)
else: else:
raise DDGError(f"Unknown scan type {self.scaninfo.scan_type}") raise DDGError(f"Unknown scan type {self.scaninfo.scan_type}")
# Check status # Check status
self._ddg_is_okay() self._ddg_is_okay()
logger.info("DDG staged")
super().stage() super().stage()
def unstage(self): def unstage(self):
"""Stop the trigger generator from accepting triggers""" """Stop the trigger generator from accepting triggers"""
self._set_trigger(TriggerSource.SINGLE_SHOT) # self._set_trigger(getattr(TriggerSource, self.set_trigger_source.get()))
self._check_burst_cycle()
# Check status # Check status
self._ddg_is_okay() self._ddg_is_okay()
self._stopped = False
self._acquisition_done = False
super().unstage() super().unstage()
def trigger(self) -> None: def trigger(self) -> None: