wip: draft from working at the beamline

This commit is contained in:
gac-x12sa
2025-02-26 15:45:22 +01:00
parent 53dca4dc6f
commit b806487c54
3 changed files with 719 additions and 66 deletions
+595 -64
View File
@@ -8,17 +8,460 @@ from ophyd_devices.utils import bec_utils
logger = bec_logger.logger
class DelayGeneratorcSAXSError(Exception):
"""Exception raised for errors."""
# class DelayGeneratorcSAXSError(Exception):
# """Exception raised for errors."""
class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]):
# class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]):
# """
# Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS
# """
# def on_wait_for_connection(self) -> None:
# """Init default parameter after the all signals are connected"""
# for ii, channel in enumerate(self.parent.all_channels):
# self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
# self.parent.set_channels("amplitude", self.parent.amplitude.get())
# self.parent.set_channels("offset", self.parent.offset.get())
# # Setup reference
# self.parent.set_channels(
# "reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs]
# )
# self.parent.set_channels(
# "reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs]
# )
# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# # Set threshold level for ext. pulses
# self.parent.level.put(self.parent.thres_trig_level.get())
# def on_stage(self) -> None:
# "Hook execute before the scan starts"
# if self.parent.scaninfo.scan_type == "step":
# exp_time = self.parent.scaninfo.exp_time
# delay = 0
# self.parent.burst_disable()
# self.parent.set_trigger(TriggerSource.SINGLE_SHOT)
# self.parent.set_channels(signal="width", value=exp_time)
# self.parent.set_channels(signal="delay", value=delay)
# return
# scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
# if scan_name == "jjf_test":
# # TODO implement the logic for JJF triggering
# exp_time = 480e-6 # self.parent.scaninfo.exp_time
# readout = 20e-6 # self.parent.scaninfo.readout_time
# total_exposure = exp_time + readout
# num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
# num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure)
# delay = 0
# delay_burst = self.parent.delay_burst.get()
# self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
# self.parent.set_channels(signal="width", value=exp_time)
# self.parent.set_channels(signal="delay", value=delay)
# self.parent.burst_enable(
# count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first"
# )
# logger.info(
# f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}"
# )
# def on_trigger(self) -> DeviceStatus:
# """Method to be executed upon trigger"""
# if self.parent.scaninfo.scan_type == "step":
# self.parent.trigger_shot.put(1)
# return
# scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
# if scan_name == "jjf_test":
# exp_time = 480e-6 # self.parent.scaninfo.exp_time
# readout = 20e-6 # self.parent.scaninfo.readout_time
# total_exposure = exp_time + readout
# num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
# num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure)
# # Start trigger cycle
# self.parent.trigger_burst_readout.put(1)
# # Create status object that will wait for the end of the burst cycle
# status = self.wait_with_status(
# signal_conditions=[(self.parent.burst_cycle_finished, 1)],
# timeout=num_burst_cycle * total_exposure + 1, # add 1s to be sure
# check_stopped=True,
# exception_on_timeout=DelayGeneratorcSAXSError(
# f"{self.parent.name} run into timeout in complete call."
# ),
# )
# logger.info(f"Return status {self.parent.name}")
# return status
# def on_complete(self) -> DeviceStatus:
# pass
# def on_pre_scan(self) -> None:
# """
# Method called by pre_scan hook in parent class.
# Executes trigger if premove_trigger is Trus.
# """
# if self.parent.premove_trigger.get() is True:
# self.parent.trigger_shot.put(1)
# class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator):
# """
# DG645 delay generator at cSAXS (multiple can be in use depending on the setup)
# Default values for setting up DDG.
# Note: checks of set calues are not (only partially) included, check manual for details on possible settings.
# https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
# - delay_burst : (float >=0) Delay between trigger and first pulse in burst mode
# - delta_width : (float >= 0) Add width to fast shutter signal to make sure its open during acquisition
# - additional_triggers : (int) add additional triggers to burst mode (mcs card needs +1 triggers per line)
# - polarity : (list of 0/1) polarity for different channels
# - amplitude : (float) amplitude voltage of TTLs
# - offset : (float) offset for ampltitude
# - thres_trig_level : (float) threshold of trigger amplitude
# Custom signals for logic in different DDGs during scans (for custom_prepare.prepare_ddg):
# - set_high_on_exposure : (bool): if True, then TTL signal should go high during the full acquisition time of a scan.
# # TODO trigger_width and fixed_ttl could be combined into single list.
# - fixed_ttl_width : (list of either 1 or 0), one for each channel.
# - trigger_width : (float) if fixed_ttl_width is True, then the width of the TTL pulse is set to this value.
# - set_trigger_source : (TriggerSource) specifies the default trigger source for the DDG.
# - premove_trigger : (bool) if True, then a trigger should be executed before the scan starts (to be implemented in on_pre_scan).
# - set_high_on_stage : (bool) if True, then TTL signal should go high already on stage.
# """
# custom_prepare_cls = DDGSetup
# # Custom signals passed on during the init procedure via BEC
# # TODO review whether those should remain here like that
# delay_burst = Component(
# bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config"
# )
# delta_width = Component(
# bec_utils.ConfigSignal, name="delta_width", kind="config", config_storage_name="ddg_config"
# )
# additional_triggers = Component(
# bec_utils.ConfigSignal,
# name="additional_triggers",
# kind="config",
# config_storage_name="ddg_config",
# )
# polarity = Component(
# bec_utils.ConfigSignal, name="polarity", kind="config", config_storage_name="ddg_config"
# )
# fixed_ttl_width = Component(
# bec_utils.ConfigSignal,
# name="fixed_ttl_width",
# kind="config",
# config_storage_name="ddg_config",
# )
# amplitude = Component(
# bec_utils.ConfigSignal, name="amplitude", kind="config", config_storage_name="ddg_config"
# )
# offset = Component(
# bec_utils.ConfigSignal, name="offset", kind="config", config_storage_name="ddg_config"
# )
# thres_trig_level = Component(
# bec_utils.ConfigSignal,
# name="thres_trig_level",
# kind="config",
# config_storage_name="ddg_config",
# )
# set_high_on_exposure = Component(
# bec_utils.ConfigSignal,
# name="set_high_on_exposure",
# kind="config",
# config_storage_name="ddg_config",
# )
# set_high_on_stage = Component(
# bec_utils.ConfigSignal,
# name="set_high_on_stage",
# kind="config",
# config_storage_name="ddg_config",
# )
# set_trigger_source = Component(
# bec_utils.ConfigSignal,
# name="set_trigger_source",
# kind="config",
# config_storage_name="ddg_config",
# )
# trigger_width = Component(
# bec_utils.ConfigSignal,
# name="trigger_width",
# kind="config",
# config_storage_name="ddg_config",
# )
# premove_trigger = Component(
# bec_utils.ConfigSignal,
# name="premove_trigger",
# kind="config",
# config_storage_name="ddg_config",
# )
# def __init__(
# self,
# name: str,
# prefix: str = "",
# kind: Kind = None,
# ddg_config: dict = None,
# parent=None,
# device_manager=None,
# **kwargs,
# ):
# """
# Args:
# prefix (str, optional): Prefix of the device. Defaults to "".
# name (str): Name of the device.
# kind (str, optional): Kind of the device. Defaults to None.
# read_attrs (list, optional): List of attributes to read. Defaults to None.
# configuration_attrs (list, optional): List of attributes to configure. Defaults to None.
# parent (Device, optional): Parent device. Defaults to None.
# device_manager (DeviceManagerBase, optional): DeviceManagerBase object. Defaults to None.
# sim_mode (bool, optional): Simulation mode flag. Defaults to False.
# ddg_config (dict, optional): Dictionary of ddg_config signals. Defaults to None.
# """
# # Default values for ddg_config signals
# self.ddg_config = {
# # Setup default values
# f"{name}_delay_burst": 0,
# f"{name}_delta_width": 0,
# f"{name}_additional_triggers": 0,
# f"{name}_polarity": [1, 1, 1, 1, 1],
# f"{name}_amplitude": 4.5,
# f"{name}_offset": 0,
# f"{name}_thres_trig_level": 2.5,
# # Values for different behaviour during scans
# f"{name}_fixed_ttl_width": [0, 0, 0, 0, 0],
# f"{name}_trigger_width": None,
# f"{name}_set_high_on_exposure": False,
# f"{name}_set_high_on_stage": False,
# f"{name}_set_trigger_source": "SINGLE_SHOT",
# f"{name}_premove_trigger": False,
# }
# if ddg_config is not None:
# # pylint: disable=expression-not-assigned
# [self.ddg_config.update({f"{name}_{key}": value}) for key, value in ddg_config.items()]
# super().__init__(
# prefix=prefix,
# name=name,
# kind=kind,
# parent=parent,
# device_manager=device_manager,
# **kwargs,
# )
# # if __name__ == "__main__":
# # dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="ddg3")
# import time
# from bec_lib import bec_logger
# from ophyd import Component, DeviceStatus
# from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import (
# DDGCustomMixin,
# PSIDelayGeneratorBase,
# TriggerSource,
# )
# from ophyd_devices.utils import bec_utils
# logger = bec_logger.logger
# class DelayGeneratorError(Exception):
# """Exception raised for errors."""
# class DDGSetup(DDGCustomMixin):
# """
# Mixin class for DelayGenerator logic at cSAXS.
# At cSAXS, multiple DDGs were operated at the same time. There different behaviour is
# implemented in the ddg_config signals that are passed via the device config.
# """
# def initialize_default_parameter(self) -> None:
# """Method to initialize default parameters."""
# for ii, channel in enumerate(self.parent.all_channels):
# self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
# self.parent.set_channels("amplitude", self.parent.amplitude.get())
# self.parent.set_channels("offset", self.parent.offset.get())
# # Setup reference
# self.parent.set_channels(
# "reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs]
# )
# self.parent.set_channels(
# "reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs]
# )
# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# # Set threshold level for ext. pulses
# self.parent.level.put(self.parent.thres_trig_level.get())
# def prepare_ddg(self) -> None:
# """
# Method to prepare scan logic of cSAXS
# Two scantypes are supported: "step" and "fly":
# - step: Scan is performed by stepping the motor and acquiring data at each step
# - fly: Scan is performed by moving the motor with a constant velocity and acquiring data
# Custom logic for different DDG behaviour during scans.
# - set_high_on_exposure : If True, then TTL signal is high during
# the full exposure time of the scan (all frames).
# E.g. Keep shutter open for the full scan.
# - fixed_ttl_width : fixed_ttl_width is a list of 5 values, one for each channel.
# If the value is 0, then the width of the TTL pulse is determined,
# no matter which parameters are passed from the scaninfo for exposure time
# - set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones
# were: SINGLE_SHOT, EXT_RISING_EDGE
# """
# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# # scantype "step"
# if self.parent.scaninfo.scan_type == "step":
# # High on exposure means that the signal
# if self.parent.set_high_on_exposure.get():
# # caluculate parameters
# num_burst_cycle = 1 + self.parent.additional_triggers.get()
# exp_time = (
# self.parent.delta_width.get()
# + self.parent.scaninfo.frames_per_trigger
# * (self.parent.scaninfo.exp_time + self.parent.scaninfo.readout_time)
# )
# total_exposure = exp_time
# delay_burst = self.parent.delay_burst.get()
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# for value, channel in zip(
# self.parent.fixed_ttl_width.get(), self.parent.all_channels
# ):
# logger.debug(f"Trying to set DDG {channel} to {value}")
# if value != 0:
# self.parent.set_channels("width", value, channels=[channel])
# else:
# # caluculate parameters
# exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
# total_exposure = exp_time + self.parent.scaninfo.readout_time
# delay_burst = self.parent.delay_burst.get()
# num_burst_cycle = (
# self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get()
# )
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# # scantype "fly"
# elif self.parent.scaninfo.scan_type == "fly":
# if self.parent.set_high_on_exposure.get():
# # caluculate parameters
# exp_time = (
# self.parent.delta_width.get()
# + self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points
# + self.parent.scaninfo.readout_time * (self.parent.scaninfo.num_points - 1)
# )
# total_exposure = exp_time
# delay_burst = self.parent.delay_burst.get()
# num_burst_cycle = 1 + self.parent.additional_triggers.get()
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# for value, channel in zip(
# self.parent.fixed_ttl_width.get(), self.parent.all_channels
# ):
# logger.debug(f"Trying to set DDG {channel} to {value}")
# if value != 0:
# self.parent.set_channels("width", value, channels=[channel])
# else:
# # caluculate parameters
# exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
# total_exposure = exp_time + self.parent.scaninfo.readout_time
# delay_burst = self.parent.delay_burst.get()
# num_burst_cycle = (
# self.parent.scaninfo.num_points + self.parent.additional_triggers.get()
# )
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# else:
# raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}")
# # Set common DDG parameters
# self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first")
# self.parent.set_channels("delay", 0.0)
# def on_trigger(self) -> None:
# """Method to be executed upon trigger"""
# if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT:
# self.parent.trigger_shot.put(1)
# def check_scan_id(self) -> None:
# """
# Method to check if scan_id has changed.
# If yes, then it changes parent.stopped to True, which will stop further actions.
# """
# old_scan_id = self.parent.scaninfo.scan_id
# self.parent.scaninfo.load_scan_metadata()
# if self.parent.scaninfo.scan_id != old_scan_id:
# self.parent.stopped = True
# def finished(self) -> None:
# """Method checks if DDG finished acquisition"""
# def on_pre_scan(self) -> None:
# """
# Method called by pre_scan hook in parent class.
# Executes trigger if premove_trigger is Trus.
# """
# if self.parent.premove_trigger.get() is True:
# self.parent.trigger_shot.put(1)
class DDGSetup(DDGCustomMixin):
"""
Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS
Mixin class for DelayGenerator logic at cSAXS.
At cSAXS, multiple DDGs were operated at the same time. There different behaviour is
implemented in the ddg_config signals that are passed via the device config.
"""
def on_wait_for_connection(self) -> None:
"""Init default parameter after the all signals are connected"""
def initialize_default_parameter(self) -> None:
"""Method to initialize default parameters."""
for ii, channel in enumerate(self.parent.all_channels):
self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
@@ -35,68 +478,108 @@ class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]):
# Set threshold level for ext. pulses
self.parent.level.put(self.parent.thres_trig_level.get())
def on_stage(self) -> None:
"Hook execute before the scan starts"
if self.parent.scaninfo.scan_type == "step":
exp_time = self.parent.scaninfo.exp_time
delay = 0
self.parent.burst_disable()
self.parent.set_trigger(TriggerSource.SINGLE_SHOT)
self.parent.set_channels(signal="width", value=exp_time)
self.parent.set_channels(signal="delay", value=delay)
return
def prepare_ddg(self) -> None:
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# scantype "jjf_test"
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
# TODO implement the logic for JJF triggering
exp_time = 480e-6 # self.parent.scaninfo.exp_time
readout = 20e-6 # self.parent.scaninfo.readout_time
total_exposure = exp_time + readout
if scan_name == "jjf_test":
# exp_time = self.parent.scaninfo.exp_time
# readout = self.parent.scaninfo.readout_time
# num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
# total_exposure = exp_time+readout
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure)
delay = 0
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
delay = 0
delay_burst = self.parent.delay_burst.get()
self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
self.parent.set_channels(signal='width', value=exp_time)
self.parent.set_channels(signal='delay', value=delay)
self.parent.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first")
logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}")
self.parent.set_channels(signal="width", value=exp_time)
self.parent.set_channels(signal="delay", value=delay)
self.parent.burst_enable(
count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first"
)
logger.info(
f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}"
)
def on_trigger(self) -> DeviceStatus:
"""Method to be executed upon trigger"""
if self.parent.scaninfo.scan_type == "step":
self.parent.trigger_shot.put(1)
return
def on_stage(self) -> None:
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
exp_time = 480e-6 # self.parent.scaninfo.exp_time
readout = 20e-6 # self.parent.scaninfo.readout_time
total_exposure = exp_time + readout
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure)
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
self.parent.set_channels("width", exp_time)
self.parent.set_channels("delay", 0.0)
logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}")
self.parent.burst_enable(num_burst_cycle, 0, total_exposure, config="first")
# Start trigger cycle
self.parent.trigger_burst_readout.put(1)
# Create status object that will wait for the end of the burst cycle
status = self.wait_with_status(
signal_conditions=[(self.parent.burst_cycle_finished, 1)],
timeout=num_burst_cycle * total_exposure + 1, # add 1s to be sure
check_stopped=True,
exception_on_timeout=DelayGeneratorcSAXSError(
f"{self.parent.name} run into timeout in complete call."
),
)
def on_trigger(self) -> None:
"""Method to be executed upon trigger"""
if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT:
self.parent.trigger_shot.put(1)
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["cycles"]
#time.sleep(num_burst_cycle*total_exposure)
def check_ddg()->int:
self.parent.trigger_burst_readout.put(1)
return self.parent.burst_cycle_finished.get()
status = self.wait_with_status(signal_conditions=[(check_ddg, 1)],
timeout=num_burst_cycle*total_exposure+1,
check_stopped=True,
exception_on_timeout=DelayGeneratorError(f"{self.parent.name} run into timeout in complete call.")
)
logger.info(f"Return status {self.parent.name}")
return status
# timer = 0
# while True:
# self.parent.trigger_burst_readout.put(1)
# state = self.parent.burst_cycle_finished.get()
# if state == 1:
# break
# time.sleep(0.05)
# timer +=0.05
# if timer>3:
# raise TimeoutError(f"{self.parent.name} did not return. Bit state for end_burst_cycle is {state} for state")
def on_complete(self) -> DeviceStatus:
pass
# logger.info(f"On complete started for {self.parent.name}")
# scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
# if scan_name != "jjf_test":
# return None
# def check_ddg()->int:
# lambda r : self.parent.trigger_burst_readout.put(1)
# return lambda r: self.parent.burst_cycle_finished.get()
# status = self.wait_with_status(signal_conditions=[(check_ddg, 1)],
# timeout=3,
# check_stopped=True,
# exception_on_timeout=DelayGeneratorError(f"{self.parent.name} run into timeout in complete call.")
# )
# logger.info(f"Return status {self.parent.name}")
# return status
def check_scan_id(self) -> None:
"""
Method to check if scan_id has changed.
If yes, then it changes parent.stopped to True, which will stop further actions.
"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
def finished(self) -> None:
"""Method checks if DDG finished acquisition"""
def on_pre_scan(self) -> None:
"""
@@ -108,7 +591,7 @@ class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]):
self.parent.trigger_shot.put(1)
class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator):
class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
"""
DG645 delay generator at cSAXS (multiple can be in use depending on the setup)
@@ -137,8 +620,6 @@ class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator):
custom_prepare_cls = DDGSetup
# Custom signals passed on during the init procedure via BEC
# TODO review whether those should remain here like that
delay_burst = Component(
bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config"
@@ -217,12 +698,15 @@ class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator):
def __init__(
self,
name: str,
prefix: str = "",
kind: Kind = None,
ddg_config: dict = None,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
ddg_config=None,
**kwargs,
):
"""
@@ -238,7 +722,6 @@ class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator):
ddg_config (dict, optional): Dictionary of ddg_config signals. Defaults to None.
"""
# Default values for ddg_config signals
self.ddg_config = {
# Setup default values
@@ -264,11 +747,59 @@ class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator):
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
**kwargs,
)
# if __name__ == "__main__":
# dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="ddg3")
if __name__ == "__main__":
# Start delay generator in simulation mode.
# Note: To run, access to Epics must be available.
import time
config = {
"delay_burst": 40.0e-3,
"delta_width": 0,
"additional_triggers": 0,
"polarity": [1, 0, 1, 1, 1], # T0 # to eiger and lecroy4
"amplitude": 4.5,
"offset": 0,
"thres_trig_level": 2.5,
"set_high_on_exposure": False,
"set_high_on_stage": False,
}
start = time.time()
print(f"Start with init of DDG3 with config: {config}")
dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="dgen", ddg_config=config)
print(f"Finished init after: {time.time()-start}s")
start = time.time()
print(f"Start setting up DDG3")
exp_time = 1/(2e3) # 2 kHz
readout = exp_time/10
delay = 0
num_burst_cycle = 1e4 # N triggers
total_exposure = exp_time+readout
delay_burst = dgen.delay_burst.get()
dgen.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
dgen.set_channels(signal='width', value=exp_time)
dgen.set_channels(signal='delay', value=0)
dgen.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first")
print(f"Start sending {num_burst_cycle} triggers after {time.time()-start}s, ETA {num_burst_cycle*total_exposure}s")
break_time = time.time()
dgen.trigger()
# Wait here briefly for status to finish, whether this is realiable has to be tested
time.sleep(num_burst_cycle*total_exposure)
timer = 0
while True:
dgen.trigger_burst_readout.put(1)
state = dgen.burst_cycle_finished.get()
if state == 1:
break
time.sleep(0.05)
timer +=0.05
if timer>3:
raise TimeoutError(f"dgen.name did not return with value {state} for state")
print(f"Finished trigger cascade of {num_burst_cycle} with {exp_time}s -> {num_burst_cycle*exp_time}s after {time.time()-start}s in total, {break_time} for sending triggers.")
+102
View File
@@ -0,0 +1,102 @@
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd import DeviceStatus
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient
from jfjoch_client.models.dataset_settings import DatasetSettings
from typing import TYPE_CHECKING
from bec_lib.devicemanager import ScanInfo
if TYPE_CHECKING: #pragma no cover
from bec_lib.devicemanager import ScanInfo
class EigerCSAXS(PSIDeviceBase):
########################################
# Beamline Specific Implementations #
########################################
def __init__(self, name: str, host:str="http://sls-jfjoch-001", port:int=8080, scan_info:ScanInfo=None, **kwargs):
"""
Initialize the PSI Device Base class.
Args:
name (str) : Name of the device
scan_info (ScanInfo): The scan info to use.
"""
super().__init__(name=name, scan_info=scan_info, **kwargs)
self._host = f"{host}:{port}"
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
def on_init(self) -> None:
"""
Called when the device is initialized.
No siganls are connected at this point,
thus should not be set here but in on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
self.jfj_client.connect_and_initialise(timeout=5)
def on_stage(self) -> DeviceStatus | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info object.
"""
if self.scan_info.msg.scan_name != "jfj_test":
return
num_burst_cycle = self.scan_info.msg.scan_parameters['num_points']
cycles = self.scan_info.msg.scan_parameters['cycles']
exp_time = self.scan_info.msg.scan_parameters['exp_time']
total_points = num_burst_cycle * cycles
settings = DatasetSettings(
image_time_us= int(exp_time*1e6),
ntrigger = total_points,
beam_x_pxl=0,
beam_y_pxl=0,
detector_distance_mm=100,
incident_energy_keV=10.00,
)
self.jfj_client.start(settings = settings)
def on_unstage(self) -> DeviceStatus | None:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | None:
"""Called to inquire if a device has completed a scans."""
if self.scan_info.msg.scan_name != "jfj_test":
return
def wait_for_complete():
timeout = 10
for _ in range(timeout):
try:
self.jfj_client.wait_till_done(timeout=1)
except TimeoutError:
continue
except Exception as e:
raise ValueError(f"Error in complete for {self.name}, exception: {e}") from e
else:
break
status = self.task_handler.submit_task(wait_for_complete, run=True)
return status
def on_kickoff(self) -> DeviceStatus | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.jfj_client.stop()
@@ -10,6 +10,7 @@ from jfjoch_client.configuration import Configuration
from jfjoch_client.models.broker_status import BrokerStatus
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
from jfjoch_client.api_response import ApiResponse
from ophyd import Device
logger = bec_logger.logger
@@ -39,6 +40,10 @@ class ResponseWaitDone(int, enum.Enum):
DETECTOR_INACTIVE = 502
TIMEOUT_REACHED = 504
class ResponseCancelDone(int, enum.Enum):
""" HTTP Response for cancel post"""
CANCEL_SENT_TO_FPGA = 200
class JungfrauJochClient:
"""Thin wrapper around the Jungfrau Joch API client"""
@@ -97,7 +102,7 @@ class JungfrauJochClient:
settings = DetectorSettings(**settings)
self.api.config_detector_put(detector_settings=settings)
def start_mesaurement(self, settings: dict | DatasetSettings) -> None:
def start(self, settings: dict | DatasetSettings) -> None:
"""Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state.
The method call is blocking and JungfrauJoch will be ready to measure after the call resolves.
@@ -116,7 +121,7 @@ class JungfrauJochClient:
if isinstance(settings, dict):
settings = DatasetSettings(**settings)
try:
res = self.api.start_post_with_http_info(dataset_settings=settings)
res:ApiResponse = self.api.start_post_with_http_info(dataset_settings=settings)
if res.status_code != 200:
response = f"Error in {self._parent_name}, while setting measurement settings {settings}, response: {res}"
raise JungfrauJochClientError(response)
@@ -127,6 +132,21 @@ class JungfrauJochClient:
response = f"Error in {self._parent_name}, while setting measurement settings {settings}, exception: {e}"
logger.error(response)
raise JungfrauJochClientError(response) from e
def stop(self) -> None:
"""Stop the acquisition"""
try:
res:ApiResponse = self.api.cancel_post_with_http_info() # Should we use a timeout?
if res.status_code != ResponseCancelDone.CANCEL_SENT_TO_FPGA:
response = f"Error in device {self._parent_name} while stopping the measurement. Exception: {exc}"
raise JungfrauJochClientError(response)
except Exception as exc:
logger.error(f"")
logger.error(response)
raise(JungfrauJochClientError(response)) from exc
def wait_till_done(self, timeout: int = 5) -> None:
"""Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.