diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs.py b/csaxs_bec/devices/epics/delay_generator_csaxs.py index 1094857..f34090c 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs.py @@ -8,17 +8,460 @@ from ophyd_devices.utils import bec_utils logger = bec_logger.logger -class DelayGeneratorcSAXSError(Exception): - """Exception raised for errors.""" +# class DelayGeneratorcSAXSError(Exception): +# """Exception raised for errors.""" -class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]): +# class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]): +# """ +# Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS +# """ + +# def on_wait_for_connection(self) -> None: +# """Init default parameter after the all signals are connected""" +# for ii, channel in enumerate(self.parent.all_channels): +# self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel]) + +# self.parent.set_channels("amplitude", self.parent.amplitude.get()) +# self.parent.set_channels("offset", self.parent.offset.get()) +# # Setup reference +# self.parent.set_channels( +# "reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs] +# ) +# self.parent.set_channels( +# "reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs] +# ) +# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get())) +# # Set threshold level for ext. pulses +# self.parent.level.put(self.parent.thres_trig_level.get()) + +# def on_stage(self) -> None: +# "Hook execute before the scan starts" +# if self.parent.scaninfo.scan_type == "step": +# exp_time = self.parent.scaninfo.exp_time +# delay = 0 +# self.parent.burst_disable() +# self.parent.set_trigger(TriggerSource.SINGLE_SHOT) +# self.parent.set_channels(signal="width", value=exp_time) +# self.parent.set_channels(signal="delay", value=delay) +# return +# scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") +# if scan_name == "jjf_test": +# # TODO implement the logic for JJF triggering +# exp_time = 480e-6 # self.parent.scaninfo.exp_time +# readout = 20e-6 # self.parent.scaninfo.readout_time +# total_exposure = exp_time + readout +# num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] +# num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure) +# delay = 0 +# delay_burst = self.parent.delay_burst.get() + +# self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT) + +# self.parent.set_channels(signal="width", value=exp_time) +# self.parent.set_channels(signal="delay", value=delay) +# self.parent.burst_enable( +# count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first" +# ) +# logger.info( +# f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}" +# ) + +# def on_trigger(self) -> DeviceStatus: +# """Method to be executed upon trigger""" +# if self.parent.scaninfo.scan_type == "step": +# self.parent.trigger_shot.put(1) +# return +# scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") +# if scan_name == "jjf_test": +# exp_time = 480e-6 # self.parent.scaninfo.exp_time +# readout = 20e-6 # self.parent.scaninfo.readout_time +# total_exposure = exp_time + readout +# num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] +# num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure) + +# # Start trigger cycle +# self.parent.trigger_burst_readout.put(1) + +# # Create status object that will wait for the end of the burst cycle +# status = self.wait_with_status( +# signal_conditions=[(self.parent.burst_cycle_finished, 1)], +# timeout=num_burst_cycle * total_exposure + 1, # add 1s to be sure +# check_stopped=True, +# exception_on_timeout=DelayGeneratorcSAXSError( +# f"{self.parent.name} run into timeout in complete call." +# ), +# ) +# logger.info(f"Return status {self.parent.name}") +# return status + +# def on_complete(self) -> DeviceStatus: +# pass + +# def on_pre_scan(self) -> None: +# """ +# Method called by pre_scan hook in parent class. + +# Executes trigger if premove_trigger is Trus. +# """ +# if self.parent.premove_trigger.get() is True: +# self.parent.trigger_shot.put(1) + + +# class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator): +# """ +# DG645 delay generator at cSAXS (multiple can be in use depending on the setup) + +# Default values for setting up DDG. +# Note: checks of set calues are not (only partially) included, check manual for details on possible settings. +# https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf + +# - delay_burst : (float >=0) Delay between trigger and first pulse in burst mode +# - delta_width : (float >= 0) Add width to fast shutter signal to make sure its open during acquisition +# - additional_triggers : (int) add additional triggers to burst mode (mcs card needs +1 triggers per line) +# - polarity : (list of 0/1) polarity for different channels +# - amplitude : (float) amplitude voltage of TTLs +# - offset : (float) offset for ampltitude +# - thres_trig_level : (float) threshold of trigger amplitude + +# Custom signals for logic in different DDGs during scans (for custom_prepare.prepare_ddg): + +# - set_high_on_exposure : (bool): if True, then TTL signal should go high during the full acquisition time of a scan. +# # TODO trigger_width and fixed_ttl could be combined into single list. +# - fixed_ttl_width : (list of either 1 or 0), one for each channel. +# - trigger_width : (float) if fixed_ttl_width is True, then the width of the TTL pulse is set to this value. +# - set_trigger_source : (TriggerSource) specifies the default trigger source for the DDG. +# - premove_trigger : (bool) if True, then a trigger should be executed before the scan starts (to be implemented in on_pre_scan). +# - set_high_on_stage : (bool) if True, then TTL signal should go high already on stage. +# """ + +# custom_prepare_cls = DDGSetup + +# # Custom signals passed on during the init procedure via BEC +# # TODO review whether those should remain here like that + +# delay_burst = Component( +# bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config" +# ) + +# delta_width = Component( +# bec_utils.ConfigSignal, name="delta_width", kind="config", config_storage_name="ddg_config" +# ) + +# additional_triggers = Component( +# bec_utils.ConfigSignal, +# name="additional_triggers", +# kind="config", +# config_storage_name="ddg_config", +# ) + +# polarity = Component( +# bec_utils.ConfigSignal, name="polarity", kind="config", config_storage_name="ddg_config" +# ) + +# fixed_ttl_width = Component( +# bec_utils.ConfigSignal, +# name="fixed_ttl_width", +# kind="config", +# config_storage_name="ddg_config", +# ) + +# amplitude = Component( +# bec_utils.ConfigSignal, name="amplitude", kind="config", config_storage_name="ddg_config" +# ) + +# offset = Component( +# bec_utils.ConfigSignal, name="offset", kind="config", config_storage_name="ddg_config" +# ) + +# thres_trig_level = Component( +# bec_utils.ConfigSignal, +# name="thres_trig_level", +# kind="config", +# config_storage_name="ddg_config", +# ) + +# set_high_on_exposure = Component( +# bec_utils.ConfigSignal, +# name="set_high_on_exposure", +# kind="config", +# config_storage_name="ddg_config", +# ) + +# set_high_on_stage = Component( +# bec_utils.ConfigSignal, +# name="set_high_on_stage", +# kind="config", +# config_storage_name="ddg_config", +# ) + +# set_trigger_source = Component( +# bec_utils.ConfigSignal, +# name="set_trigger_source", +# kind="config", +# config_storage_name="ddg_config", +# ) + +# trigger_width = Component( +# bec_utils.ConfigSignal, +# name="trigger_width", +# kind="config", +# config_storage_name="ddg_config", +# ) +# premove_trigger = Component( +# bec_utils.ConfigSignal, +# name="premove_trigger", +# kind="config", +# config_storage_name="ddg_config", +# ) + +# def __init__( +# self, +# name: str, +# prefix: str = "", +# kind: Kind = None, +# ddg_config: dict = None, +# parent=None, +# device_manager=None, +# **kwargs, +# ): +# """ +# Args: +# prefix (str, optional): Prefix of the device. Defaults to "". +# name (str): Name of the device. +# kind (str, optional): Kind of the device. Defaults to None. +# read_attrs (list, optional): List of attributes to read. Defaults to None. +# configuration_attrs (list, optional): List of attributes to configure. Defaults to None. +# parent (Device, optional): Parent device. Defaults to None. +# device_manager (DeviceManagerBase, optional): DeviceManagerBase object. Defaults to None. +# sim_mode (bool, optional): Simulation mode flag. Defaults to False. +# ddg_config (dict, optional): Dictionary of ddg_config signals. Defaults to None. + +# """ + +# # Default values for ddg_config signals +# self.ddg_config = { +# # Setup default values +# f"{name}_delay_burst": 0, +# f"{name}_delta_width": 0, +# f"{name}_additional_triggers": 0, +# f"{name}_polarity": [1, 1, 1, 1, 1], +# f"{name}_amplitude": 4.5, +# f"{name}_offset": 0, +# f"{name}_thres_trig_level": 2.5, +# # Values for different behaviour during scans +# f"{name}_fixed_ttl_width": [0, 0, 0, 0, 0], +# f"{name}_trigger_width": None, +# f"{name}_set_high_on_exposure": False, +# f"{name}_set_high_on_stage": False, +# f"{name}_set_trigger_source": "SINGLE_SHOT", +# f"{name}_premove_trigger": False, +# } +# if ddg_config is not None: +# # pylint: disable=expression-not-assigned +# [self.ddg_config.update({f"{name}_{key}": value}) for key, value in ddg_config.items()] +# super().__init__( +# prefix=prefix, +# name=name, +# kind=kind, +# parent=parent, +# device_manager=device_manager, +# **kwargs, +# ) + + +# # if __name__ == "__main__": +# # dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="ddg3") + + +# import time + +# from bec_lib import bec_logger +# from ophyd import Component, DeviceStatus + +# from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import ( +# DDGCustomMixin, +# PSIDelayGeneratorBase, +# TriggerSource, +# ) +# from ophyd_devices.utils import bec_utils + +# logger = bec_logger.logger + + +# class DelayGeneratorError(Exception): +# """Exception raised for errors.""" + + +# class DDGSetup(DDGCustomMixin): +# """ +# Mixin class for DelayGenerator logic at cSAXS. + +# At cSAXS, multiple DDGs were operated at the same time. There different behaviour is +# implemented in the ddg_config signals that are passed via the device config. +# """ + +# def initialize_default_parameter(self) -> None: +# """Method to initialize default parameters.""" +# for ii, channel in enumerate(self.parent.all_channels): +# self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel]) + +# self.parent.set_channels("amplitude", self.parent.amplitude.get()) +# self.parent.set_channels("offset", self.parent.offset.get()) +# # Setup reference +# self.parent.set_channels( +# "reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs] +# ) +# self.parent.set_channels( +# "reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs] +# ) +# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get())) +# # Set threshold level for ext. pulses +# self.parent.level.put(self.parent.thres_trig_level.get()) + +# def prepare_ddg(self) -> None: +# """ +# Method to prepare scan logic of cSAXS + +# Two scantypes are supported: "step" and "fly": +# - step: Scan is performed by stepping the motor and acquiring data at each step +# - fly: Scan is performed by moving the motor with a constant velocity and acquiring data + +# Custom logic for different DDG behaviour during scans. + +# - set_high_on_exposure : If True, then TTL signal is high during +# the full exposure time of the scan (all frames). +# E.g. Keep shutter open for the full scan. +# - fixed_ttl_width : fixed_ttl_width is a list of 5 values, one for each channel. +# If the value is 0, then the width of the TTL pulse is determined, +# no matter which parameters are passed from the scaninfo for exposure time +# - set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones +# were: SINGLE_SHOT, EXT_RISING_EDGE +# """ +# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get())) +# # scantype "step" +# if self.parent.scaninfo.scan_type == "step": +# # High on exposure means that the signal +# if self.parent.set_high_on_exposure.get(): +# # caluculate parameters +# num_burst_cycle = 1 + self.parent.additional_triggers.get() + +# exp_time = ( +# self.parent.delta_width.get() +# + self.parent.scaninfo.frames_per_trigger +# * (self.parent.scaninfo.exp_time + self.parent.scaninfo.readout_time) +# ) +# total_exposure = exp_time +# delay_burst = self.parent.delay_burst.get() + +# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too +# if not self.parent.trigger_width.get(): +# self.parent.set_channels("width", exp_time) +# else: +# self.parent.set_channels("width", self.parent.trigger_width.get()) +# for value, channel in zip( +# self.parent.fixed_ttl_width.get(), self.parent.all_channels +# ): +# logger.debug(f"Trying to set DDG {channel} to {value}") +# if value != 0: +# self.parent.set_channels("width", value, channels=[channel]) +# else: +# # caluculate parameters +# exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time +# total_exposure = exp_time + self.parent.scaninfo.readout_time +# delay_burst = self.parent.delay_burst.get() +# num_burst_cycle = ( +# self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get() +# ) + +# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too +# if not self.parent.trigger_width.get(): +# self.parent.set_channels("width", exp_time) +# else: +# self.parent.set_channels("width", self.parent.trigger_width.get()) +# # scantype "fly" +# elif self.parent.scaninfo.scan_type == "fly": +# if self.parent.set_high_on_exposure.get(): +# # caluculate parameters +# exp_time = ( +# self.parent.delta_width.get() +# + self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points +# + self.parent.scaninfo.readout_time * (self.parent.scaninfo.num_points - 1) +# ) +# total_exposure = exp_time +# delay_burst = self.parent.delay_burst.get() +# num_burst_cycle = 1 + self.parent.additional_triggers.get() + +# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too +# if not self.parent.trigger_width.get(): +# self.parent.set_channels("width", exp_time) +# else: +# self.parent.set_channels("width", self.parent.trigger_width.get()) +# for value, channel in zip( +# self.parent.fixed_ttl_width.get(), self.parent.all_channels +# ): +# logger.debug(f"Trying to set DDG {channel} to {value}") +# if value != 0: +# self.parent.set_channels("width", value, channels=[channel]) +# else: +# # caluculate parameters +# exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time +# total_exposure = exp_time + self.parent.scaninfo.readout_time +# delay_burst = self.parent.delay_burst.get() +# num_burst_cycle = ( +# self.parent.scaninfo.num_points + self.parent.additional_triggers.get() +# ) + +# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too +# if not self.parent.trigger_width.get(): +# self.parent.set_channels("width", exp_time) +# else: +# self.parent.set_channels("width", self.parent.trigger_width.get()) + +# else: +# raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}") +# # Set common DDG parameters +# self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first") +# self.parent.set_channels("delay", 0.0) + +# def on_trigger(self) -> None: +# """Method to be executed upon trigger""" +# if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT: +# self.parent.trigger_shot.put(1) + +# def check_scan_id(self) -> None: +# """ +# Method to check if scan_id has changed. + +# If yes, then it changes parent.stopped to True, which will stop further actions. +# """ +# old_scan_id = self.parent.scaninfo.scan_id +# self.parent.scaninfo.load_scan_metadata() +# if self.parent.scaninfo.scan_id != old_scan_id: +# self.parent.stopped = True + +# def finished(self) -> None: +# """Method checks if DDG finished acquisition""" + +# def on_pre_scan(self) -> None: +# """ +# Method called by pre_scan hook in parent class. + +# Executes trigger if premove_trigger is Trus. +# """ +# if self.parent.premove_trigger.get() is True: +# self.parent.trigger_shot.put(1) + +class DDGSetup(DDGCustomMixin): """ - Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS + Mixin class for DelayGenerator logic at cSAXS. + + At cSAXS, multiple DDGs were operated at the same time. There different behaviour is + implemented in the ddg_config signals that are passed via the device config. """ - def on_wait_for_connection(self) -> None: - """Init default parameter after the all signals are connected""" + def initialize_default_parameter(self) -> None: + """Method to initialize default parameters.""" for ii, channel in enumerate(self.parent.all_channels): self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel]) @@ -35,68 +478,108 @@ class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]): # Set threshold level for ext. pulses self.parent.level.put(self.parent.thres_trig_level.get()) - def on_stage(self) -> None: - "Hook execute before the scan starts" - if self.parent.scaninfo.scan_type == "step": - exp_time = self.parent.scaninfo.exp_time - delay = 0 - self.parent.burst_disable() - self.parent.set_trigger(TriggerSource.SINGLE_SHOT) - self.parent.set_channels(signal="width", value=exp_time) - self.parent.set_channels(signal="delay", value=delay) - return + def prepare_ddg(self) -> None: + self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get())) + # scantype "jjf_test" scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") - if scan_name == "jjf_test": - # TODO implement the logic for JJF triggering - exp_time = 480e-6 # self.parent.scaninfo.exp_time - readout = 20e-6 # self.parent.scaninfo.readout_time - total_exposure = exp_time + readout + if scan_name == "jjf_test": + # exp_time = self.parent.scaninfo.exp_time + # readout = self.parent.scaninfo.readout_time + # num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] + # total_exposure = exp_time+readout + exp_time = 480e-6#self.parent.scaninfo.exp_time + readout = 20e-6#self.parent.scaninfo.readout_time + total_exposure = exp_time+readout num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] - num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure) - delay = 0 + num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure) + delay = 0 delay_burst = self.parent.delay_burst.get() - + self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT) + + self.parent.set_channels(signal='width', value=exp_time) + self.parent.set_channels(signal='delay', value=delay) + self.parent.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first") + logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}") - self.parent.set_channels(signal="width", value=exp_time) - self.parent.set_channels(signal="delay", value=delay) - self.parent.burst_enable( - count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first" - ) - logger.info( - f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}" - ) - - def on_trigger(self) -> DeviceStatus: - """Method to be executed upon trigger""" - if self.parent.scaninfo.scan_type == "step": - self.parent.trigger_shot.put(1) - return + def on_stage(self) -> None: scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") if scan_name == "jjf_test": - exp_time = 480e-6 # self.parent.scaninfo.exp_time - readout = 20e-6 # self.parent.scaninfo.readout_time - total_exposure = exp_time + readout + exp_time = 480e-6#self.parent.scaninfo.exp_time + readout = 20e-6#self.parent.scaninfo.readout_time + total_exposure = exp_time+readout num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] - num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time / total_exposure) + num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure) + self.parent.set_channels("width", exp_time) + self.parent.set_channels("delay", 0.0) + logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}") + self.parent.burst_enable(num_burst_cycle, 0, total_exposure, config="first") - # Start trigger cycle - self.parent.trigger_burst_readout.put(1) - - # Create status object that will wait for the end of the burst cycle - status = self.wait_with_status( - signal_conditions=[(self.parent.burst_cycle_finished, 1)], - timeout=num_burst_cycle * total_exposure + 1, # add 1s to be sure - check_stopped=True, - exception_on_timeout=DelayGeneratorcSAXSError( - f"{self.parent.name} run into timeout in complete call." - ), - ) + def on_trigger(self) -> None: + """Method to be executed upon trigger""" + if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT: + self.parent.trigger_shot.put(1) + scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") + if scan_name == "jjf_test": + exp_time = 480e-6#self.parent.scaninfo.exp_time + readout = 20e-6#self.parent.scaninfo.readout_time + total_exposure = exp_time+readout + num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"] + num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure) + cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["cycles"] + #time.sleep(num_burst_cycle*total_exposure) + def check_ddg()->int: + self.parent.trigger_burst_readout.put(1) + return self.parent.burst_cycle_finished.get() + status = self.wait_with_status(signal_conditions=[(check_ddg, 1)], + timeout=num_burst_cycle*total_exposure+1, + check_stopped=True, + exception_on_timeout=DelayGeneratorError(f"{self.parent.name} run into timeout in complete call.") + ) logger.info(f"Return status {self.parent.name}") return status + # timer = 0 + # while True: + # self.parent.trigger_burst_readout.put(1) + # state = self.parent.burst_cycle_finished.get() + # if state == 1: + # break + # time.sleep(0.05) + # timer +=0.05 + # if timer>3: + # raise TimeoutError(f"{self.parent.name} did not return. Bit state for end_burst_cycle is {state} for state") + def on_complete(self) -> DeviceStatus: pass + # logger.info(f"On complete started for {self.parent.name}") + # scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "") + # if scan_name != "jjf_test": + # return None + # def check_ddg()->int: + # lambda r : self.parent.trigger_burst_readout.put(1) + # return lambda r: self.parent.burst_cycle_finished.get() + # status = self.wait_with_status(signal_conditions=[(check_ddg, 1)], + # timeout=3, + # check_stopped=True, + # exception_on_timeout=DelayGeneratorError(f"{self.parent.name} run into timeout in complete call.") + # ) + # logger.info(f"Return status {self.parent.name}") + # return status + + def check_scan_id(self) -> None: + """ + Method to check if scan_id has changed. + + If yes, then it changes parent.stopped to True, which will stop further actions. + """ + old_scan_id = self.parent.scaninfo.scan_id + self.parent.scaninfo.load_scan_metadata() + if self.parent.scaninfo.scan_id != old_scan_id: + self.parent.stopped = True + + def finished(self) -> None: + """Method checks if DDG finished acquisition""" def on_pre_scan(self) -> None: """ @@ -108,7 +591,7 @@ class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]): self.parent.trigger_shot.put(1) -class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator): +class DelayGeneratorcSAXS(PSIDelayGeneratorBase): """ DG645 delay generator at cSAXS (multiple can be in use depending on the setup) @@ -137,8 +620,6 @@ class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator): custom_prepare_cls = DDGSetup - # Custom signals passed on during the init procedure via BEC - # TODO review whether those should remain here like that delay_burst = Component( bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config" @@ -217,12 +698,15 @@ class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator): def __init__( self, - name: str, - prefix: str = "", - kind: Kind = None, - ddg_config: dict = None, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, parent=None, device_manager=None, + ddg_config=None, **kwargs, ): """ @@ -238,7 +722,6 @@ class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator): ddg_config (dict, optional): Dictionary of ddg_config signals. Defaults to None. """ - # Default values for ddg_config signals self.ddg_config = { # Setup default values @@ -264,11 +747,59 @@ class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator): prefix=prefix, name=name, kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs, ) -# if __name__ == "__main__": -# dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="ddg3") +if __name__ == "__main__": + # Start delay generator in simulation mode. + # Note: To run, access to Epics must be available. + import time + config = { + "delay_burst": 40.0e-3, + "delta_width": 0, + "additional_triggers": 0, + "polarity": [1, 0, 1, 1, 1], # T0 # to eiger and lecroy4 + "amplitude": 4.5, + "offset": 0, + "thres_trig_level": 2.5, + "set_high_on_exposure": False, + "set_high_on_stage": False, + } + start = time.time() + print(f"Start with init of DDG3 with config: {config}") + dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="dgen", ddg_config=config) + print(f"Finished init after: {time.time()-start}s") + start = time.time() + print(f"Start setting up DDG3") + exp_time = 1/(2e3) # 2 kHz + readout = exp_time/10 + delay = 0 + num_burst_cycle = 1e4 # N triggers + total_exposure = exp_time+readout + delay_burst = dgen.delay_burst.get() + dgen.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT) + + dgen.set_channels(signal='width', value=exp_time) + dgen.set_channels(signal='delay', value=0) + dgen.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first") + print(f"Start sending {num_burst_cycle} triggers after {time.time()-start}s, ETA {num_burst_cycle*total_exposure}s") + break_time = time.time() + dgen.trigger() + # Wait here briefly for status to finish, whether this is realiable has to be tested + time.sleep(num_burst_cycle*total_exposure) + timer = 0 + while True: + dgen.trigger_burst_readout.put(1) + state = dgen.burst_cycle_finished.get() + if state == 1: + break + time.sleep(0.05) + timer +=0.05 + if timer>3: + raise TimeoutError(f"dgen.name did not return with value {state} for state") + print(f"Finished trigger cascade of {num_burst_cycle} with {exp_time}s -> {num_burst_cycle*exp_time}s after {time.time()-start}s in total, {break_time} for sending triggers.") diff --git a/csaxs_bec/devices/jungfraujoch/eiger_jfj.py b/csaxs_bec/devices/jungfraujoch/eiger_jfj.py new file mode 100644 index 0000000..51270f5 --- /dev/null +++ b/csaxs_bec/devices/jungfraujoch/eiger_jfj.py @@ -0,0 +1,102 @@ +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from ophyd import DeviceStatus +from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient +from jfjoch_client.models.dataset_settings import DatasetSettings +from typing import TYPE_CHECKING +from bec_lib.devicemanager import ScanInfo + +if TYPE_CHECKING: #pragma no cover + from bec_lib.devicemanager import ScanInfo + +class EigerCSAXS(PSIDeviceBase): + + + ######################################## + # Beamline Specific Implementations # + ######################################## + + def __init__(self, name: str, host:str="http://sls-jfjoch-001", port:int=8080, scan_info:ScanInfo=None, **kwargs): + """ + Initialize the PSI Device Base class. + + Args: + name (str) : Name of the device + scan_info (ScanInfo): The scan info to use. + """ + super().__init__(name=name, scan_info=scan_info, **kwargs) + self._host = f"{host}:{port}" + self.jfj_client = JungfrauJochClient(host=self._host, parent=self) + + def on_init(self) -> None: + """ + Called when the device is initialized. + + No siganls are connected at this point, + thus should not be set here but in on_connected instead. + """ + + def on_connected(self) -> None: + """ + Called after the device is connected and its signals are connected. + Default values for signals should be set here. + """ + self.jfj_client.connect_and_initialise(timeout=5) + + def on_stage(self) -> DeviceStatus | None: + """ + Called while staging the device. + + Information about the upcoming scan can be accessed from the scan_info object. + """ + if self.scan_info.msg.scan_name != "jfj_test": + return + num_burst_cycle = self.scan_info.msg.scan_parameters['num_points'] + cycles = self.scan_info.msg.scan_parameters['cycles'] + exp_time = self.scan_info.msg.scan_parameters['exp_time'] + total_points = num_burst_cycle * cycles + settings = DatasetSettings( + image_time_us= int(exp_time*1e6), + ntrigger = total_points, + beam_x_pxl=0, + beam_y_pxl=0, + detector_distance_mm=100, + incident_energy_keV=10.00, + ) + self.jfj_client.start(settings = settings) + + + def on_unstage(self) -> DeviceStatus | None: + """Called while unstaging the device.""" + + def on_pre_scan(self) -> DeviceStatus | None: + """Called right before the scan starts on all devices automatically.""" + + def on_trigger(self) -> DeviceStatus | None: + """Called when the device is triggered.""" + + def on_complete(self) -> DeviceStatus | None: + """Called to inquire if a device has completed a scans.""" + if self.scan_info.msg.scan_name != "jfj_test": + return + def wait_for_complete(): + timeout = 10 + for _ in range(timeout): + try: + self.jfj_client.wait_till_done(timeout=1) + except TimeoutError: + continue + except Exception as e: + raise ValueError(f"Error in complete for {self.name}, exception: {e}") from e + else: + break + + status = self.task_handler.submit_task(wait_for_complete, run=True) + return status + + + def on_kickoff(self) -> DeviceStatus | None: + """Called to kickoff a device for a fly scan. Has to be called explicitly.""" + + def on_stop(self) -> None: + """Called when the device is stopped.""" + self.jfj_client.stop() \ No newline at end of file diff --git a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py index 420462d..18e1065 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py +++ b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py @@ -10,6 +10,7 @@ from jfjoch_client.configuration import Configuration from jfjoch_client.models.broker_status import BrokerStatus from jfjoch_client.models.dataset_settings import DatasetSettings from jfjoch_client.models.detector_settings import DetectorSettings +from jfjoch_client.api_response import ApiResponse from ophyd import Device logger = bec_logger.logger @@ -39,6 +40,10 @@ class ResponseWaitDone(int, enum.Enum): DETECTOR_INACTIVE = 502 TIMEOUT_REACHED = 504 +class ResponseCancelDone(int, enum.Enum): + """ HTTP Response for cancel post""" + CANCEL_SENT_TO_FPGA = 200 + class JungfrauJochClient: """Thin wrapper around the Jungfrau Joch API client""" @@ -97,7 +102,7 @@ class JungfrauJochClient: settings = DetectorSettings(**settings) self.api.config_detector_put(detector_settings=settings) - def start_mesaurement(self, settings: dict | DatasetSettings) -> None: + def start(self, settings: dict | DatasetSettings) -> None: """Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state. The method call is blocking and JungfrauJoch will be ready to measure after the call resolves. @@ -116,7 +121,7 @@ class JungfrauJochClient: if isinstance(settings, dict): settings = DatasetSettings(**settings) try: - res = self.api.start_post_with_http_info(dataset_settings=settings) + res:ApiResponse = self.api.start_post_with_http_info(dataset_settings=settings) if res.status_code != 200: response = f"Error in {self._parent_name}, while setting measurement settings {settings}, response: {res}" raise JungfrauJochClientError(response) @@ -127,6 +132,21 @@ class JungfrauJochClient: response = f"Error in {self._parent_name}, while setting measurement settings {settings}, exception: {e}" logger.error(response) raise JungfrauJochClientError(response) from e + + def stop(self) -> None: + """Stop the acquisition""" + try: + res:ApiResponse = self.api.cancel_post_with_http_info() # Should we use a timeout? + if res.status_code != ResponseCancelDone.CANCEL_SENT_TO_FPGA: + response = f"Error in device {self._parent_name} while stopping the measurement. Exception: {exc}" + raise JungfrauJochClientError(response) + except Exception as exc: + logger.error(f"") + logger.error(response) + raise(JungfrauJochClientError(response)) from exc + + + def wait_till_done(self, timeout: int = 5) -> None: """Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.