Compare commits

...

3 Commits

Author SHA1 Message Date
gac-x12sa
39491e3189 feat: first draft for jungfraujoch ophyd 2024-10-09 16:38:25 +02:00
gac-x12sa
9dd96c9f6e feat: add jfj test scan, refactor ddg csaxs 2024-10-08 15:01:58 +02:00
gac-x12sa
c16642b848 refactor: simple refactoring of ddg for test purposes 2024-10-07 17:22:54 +02:00
7 changed files with 656 additions and 105 deletions

View File

@@ -0,0 +1,46 @@
ddg:
description: DelayGenerator for detector triggering
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS
deviceConfig:
prefix: 'X12SA-CPCL-DDG3:'
ddg_config:
delay_burst: 40.e-3
delta_width: 0
additional_triggers: 0
polarity:
- 1 # T0
- 0 # eiger and LeCroy4
- 1
- 1
- 1
amplitude: 4.5
offset: 0
thres_trig_level: 2.5
set_high_on_exposure: False
set_high_on_stage: False
deviceTags:
- cSAXS
- ddg_detectors
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: True
eiger_jfjoch:
description: DelayGenerator for detector triggering
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_jungfrau_joch.Eiger9McSAXS
deviceConfig:
deviceTags:
- cSAXS
- eiger9m
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: False
simulated_monitor:
readoutPriority: monitored
deviceClass: ophyd_devices.SimMonitor
deviceConfig:
deviceTags:
- beamline
enabled: true
readOnly: false

View File

@@ -1,5 +1,8 @@
import time
from bec_lib import bec_logger
from ophyd import Component
from ophyd import Component, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import (
DDGCustomMixin,
PSIDelayGeneratorBase,
@@ -14,6 +17,165 @@ class DelayGeneratorError(Exception):
"""Exception raised for errors."""
# class DDGSetup(DDGCustomMixin):
# """
# Mixin class for DelayGenerator logic at cSAXS.
# At cSAXS, multiple DDGs were operated at the same time. There different behaviour is
# implemented in the ddg_config signals that are passed via the device config.
# """
# def initialize_default_parameter(self) -> None:
# """Method to initialize default parameters."""
# for ii, channel in enumerate(self.parent.all_channels):
# self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
# self.parent.set_channels("amplitude", self.parent.amplitude.get())
# self.parent.set_channels("offset", self.parent.offset.get())
# # Setup reference
# self.parent.set_channels(
# "reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs]
# )
# self.parent.set_channels(
# "reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs]
# )
# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# # Set threshold level for ext. pulses
# self.parent.level.put(self.parent.thres_trig_level.get())
# def prepare_ddg(self) -> None:
# """
# Method to prepare scan logic of cSAXS
# Two scantypes are supported: "step" and "fly":
# - step: Scan is performed by stepping the motor and acquiring data at each step
# - fly: Scan is performed by moving the motor with a constant velocity and acquiring data
# Custom logic for different DDG behaviour during scans.
# - set_high_on_exposure : If True, then TTL signal is high during
# the full exposure time of the scan (all frames).
# E.g. Keep shutter open for the full scan.
# - fixed_ttl_width : fixed_ttl_width is a list of 5 values, one for each channel.
# If the value is 0, then the width of the TTL pulse is determined,
# no matter which parameters are passed from the scaninfo for exposure time
# - set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones
# were: SINGLE_SHOT, EXT_RISING_EDGE
# """
# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# # scantype "step"
# if self.parent.scaninfo.scan_type == "step":
# # High on exposure means that the signal
# if self.parent.set_high_on_exposure.get():
# # caluculate parameters
# num_burst_cycle = 1 + self.parent.additional_triggers.get()
# exp_time = (
# self.parent.delta_width.get()
# + self.parent.scaninfo.frames_per_trigger
# * (self.parent.scaninfo.exp_time + self.parent.scaninfo.readout_time)
# )
# total_exposure = exp_time
# delay_burst = self.parent.delay_burst.get()
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# for value, channel in zip(
# self.parent.fixed_ttl_width.get(), self.parent.all_channels
# ):
# logger.debug(f"Trying to set DDG {channel} to {value}")
# if value != 0:
# self.parent.set_channels("width", value, channels=[channel])
# else:
# # caluculate parameters
# exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
# total_exposure = exp_time + self.parent.scaninfo.readout_time
# delay_burst = self.parent.delay_burst.get()
# num_burst_cycle = (
# self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get()
# )
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# # scantype "fly"
# elif self.parent.scaninfo.scan_type == "fly":
# if self.parent.set_high_on_exposure.get():
# # caluculate parameters
# exp_time = (
# self.parent.delta_width.get()
# + self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points
# + self.parent.scaninfo.readout_time * (self.parent.scaninfo.num_points - 1)
# )
# total_exposure = exp_time
# delay_burst = self.parent.delay_burst.get()
# num_burst_cycle = 1 + self.parent.additional_triggers.get()
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# for value, channel in zip(
# self.parent.fixed_ttl_width.get(), self.parent.all_channels
# ):
# logger.debug(f"Trying to set DDG {channel} to {value}")
# if value != 0:
# self.parent.set_channels("width", value, channels=[channel])
# else:
# # caluculate parameters
# exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
# total_exposure = exp_time + self.parent.scaninfo.readout_time
# delay_burst = self.parent.delay_burst.get()
# num_burst_cycle = (
# self.parent.scaninfo.num_points + self.parent.additional_triggers.get()
# )
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# else:
# raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}")
# # Set common DDG parameters
# self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first")
# self.parent.set_channels("delay", 0.0)
# def on_trigger(self) -> None:
# """Method to be executed upon trigger"""
# if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT:
# self.parent.trigger_shot.put(1)
# def check_scan_id(self) -> None:
# """
# Method to check if scan_id has changed.
# If yes, then it changes parent.stopped to True, which will stop further actions.
# """
# old_scan_id = self.parent.scaninfo.scan_id
# self.parent.scaninfo.load_scan_metadata()
# if self.parent.scaninfo.scan_id != old_scan_id:
# self.parent.stopped = True
# def finished(self) -> None:
# """Method checks if DDG finished acquisition"""
# def on_pre_scan(self) -> None:
# """
# Method called by pre_scan hook in parent class.
# Executes trigger if premove_trigger is Trus.
# """
# if self.parent.premove_trigger.get() is True:
# self.parent.trigger_shot.put(1)
class DDGSetup(DDGCustomMixin):
"""
Mixin class for DelayGenerator logic at cSAXS.
@@ -41,114 +203,93 @@ class DDGSetup(DDGCustomMixin):
self.parent.level.put(self.parent.thres_trig_level.get())
def prepare_ddg(self) -> None:
"""
Method to prepare scan logic of cSAXS
Two scantypes are supported: "step" and "fly":
- step: Scan is performed by stepping the motor and acquiring data at each step
- fly: Scan is performed by moving the motor with a constant velocity and acquiring data
Custom logic for different DDG behaviour during scans.
- set_high_on_exposure : If True, then TTL signal is high during
the full exposure time of the scan (all frames).
E.g. Keep shutter open for the full scan.
- fixed_ttl_width : fixed_ttl_width is a list of 5 values, one for each channel.
If the value is 0, then the width of the TTL pulse is determined,
no matter which parameters are passed from the scaninfo for exposure time
- set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones
were: SINGLE_SHOT, EXT_RISING_EDGE
"""
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# scantype "step"
if self.parent.scaninfo.scan_type == "step":
# High on exposure means that the signal
if self.parent.set_high_on_exposure.get():
# caluculate parameters
num_burst_cycle = 1 + self.parent.additional_triggers.get()
# scantype "jjf_test"
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
# exp_time = self.parent.scaninfo.exp_time
# readout = self.parent.scaninfo.readout_time
# num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
# total_exposure = exp_time+readout
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
delay = 0
delay_burst = self.parent.delay_burst.get()
self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
self.parent.set_channels(signal='width', value=exp_time)
self.parent.set_channels(signal='delay', value=delay)
self.parent.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first")
logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}")
exp_time = (
self.parent.delta_width.get()
+ self.parent.scaninfo.frames_per_trigger
* (self.parent.scaninfo.exp_time + self.parent.scaninfo.readout_time)
)
total_exposure = exp_time
delay_burst = self.parent.delay_burst.get()
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
for value, channel in zip(
self.parent.fixed_ttl_width.get(), self.parent.all_channels
):
logger.debug(f"Trying to set DDG {channel} to {value}")
if value != 0:
self.parent.set_channels("width", value, channels=[channel])
else:
# caluculate parameters
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
total_exposure = exp_time + self.parent.scaninfo.readout_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = (
self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get()
)
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
# scantype "fly"
elif self.parent.scaninfo.scan_type == "fly":
if self.parent.set_high_on_exposure.get():
# caluculate parameters
exp_time = (
self.parent.delta_width.get()
+ self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points
+ self.parent.scaninfo.readout_time * (self.parent.scaninfo.num_points - 1)
)
total_exposure = exp_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = 1 + self.parent.additional_triggers.get()
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
for value, channel in zip(
self.parent.fixed_ttl_width.get(), self.parent.all_channels
):
logger.debug(f"Trying to set DDG {channel} to {value}")
if value != 0:
self.parent.set_channels("width", value, channels=[channel])
else:
# caluculate parameters
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
total_exposure = exp_time + self.parent.scaninfo.readout_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = (
self.parent.scaninfo.num_points + self.parent.additional_triggers.get()
)
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
else:
raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}")
# Set common DDG parameters
self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first")
self.parent.set_channels("delay", 0.0)
def on_stage(self) -> None:
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
self.parent.set_channels("width", exp_time)
self.parent.set_channels("delay", 0.0)
logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}")
self.parent.burst_enable(num_burst_cycle, 0, total_exposure, config="first")
def on_trigger(self) -> None:
"""Method to be executed upon trigger"""
if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT:
self.parent.trigger_shot.put(1)
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["cycles"]
#time.sleep(num_burst_cycle*total_exposure)
def check_ddg()->int:
self.parent.trigger_burst_readout.put(1)
return self.parent.burst_cycle_finished.get()
status = self.wait_with_status(signal_conditions=[(check_ddg, 1)],
timeout=num_burst_cycle*total_exposure+1,
check_stopped=True,
exception_on_timeout=DelayGeneratorError(f"{self.parent.name} run into timeout in complete call.")
)
logger.info(f"Return status {self.parent.name}")
return status
# timer = 0
# while True:
# self.parent.trigger_burst_readout.put(1)
# state = self.parent.burst_cycle_finished.get()
# if state == 1:
# break
# time.sleep(0.05)
# timer +=0.05
# if timer>3:
# raise TimeoutError(f"{self.parent.name} did not return. Bit state for end_burst_cycle is {state} for state")
def on_complete(self) -> DeviceStatus:
pass
# logger.info(f"On complete started for {self.parent.name}")
# scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
# if scan_name != "jjf_test":
# return None
# def check_ddg()->int:
# lambda r : self.parent.trigger_burst_readout.put(1)
# return lambda r: self.parent.burst_cycle_finished.get()
# status = self.wait_with_status(signal_conditions=[(check_ddg, 1)],
# timeout=3,
# check_stopped=True,
# exception_on_timeout=DelayGeneratorError(f"{self.parent.name} run into timeout in complete call.")
# )
# logger.info(f"Return status {self.parent.name}")
# return status
def check_scan_id(self) -> None:
"""
@@ -203,6 +344,7 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
custom_prepare_cls = DDGSetup
delay_burst = Component(
bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config"
)
@@ -288,7 +430,6 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
ddg_config=None,
**kwargs,
):
@@ -334,7 +475,6 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
sim_mode=sim_mode,
**kwargs,
)
@@ -342,4 +482,48 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
if __name__ == "__main__":
# Start delay generator in simulation mode.
# Note: To run, access to Epics must be available.
dgen = DelayGeneratorcSAXS("delaygen:DG1:", name="dgen", sim_mode=True)
import time
config = {
"delay_burst": 40.0e-3,
"delta_width": 0,
"additional_triggers": 0,
"polarity": [1, 0, 1, 1, 1], # T0 # to eiger and lecroy4
"amplitude": 4.5,
"offset": 0,
"thres_trig_level": 2.5,
"set_high_on_exposure": False,
"set_high_on_stage": False,
}
start = time.time()
print(f"Start with init of DDG3 with config: {config}")
dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="dgen", ddg_config=config)
print(f"Finished init after: {time.time()-start}s")
start = time.time()
print(f"Start setting up DDG3")
exp_time = 1/(2e3) # 2 kHz
readout = exp_time/10
delay = 0
num_burst_cycle = 1e4 # N triggers
total_exposure = exp_time+readout
delay_burst = dgen.delay_burst.get()
dgen.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
dgen.set_channels(signal='width', value=exp_time)
dgen.set_channels(signal='delay', value=0)
dgen.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first")
print(f"Start sending {num_burst_cycle} triggers after {time.time()-start}s, ETA {num_burst_cycle*total_exposure}s")
break_time = time.time()
dgen.trigger()
# Wait here briefly for status to finish, whether this is realiable has to be tested
time.sleep(num_burst_cycle*total_exposure)
timer = 0
while True:
dgen.trigger_burst_readout.put(1)
state = dgen.burst_cycle_finished.get()
if state == 1:
break
time.sleep(0.05)
timer +=0.05
if timer>3:
raise TimeoutError(f"dgen.name did not return with value {state} for state")
print(f"Finished trigger cascade of {num_burst_cycle} with {exp_time}s -> {num_burst_cycle*exp_time}s after {time.time()-start}s in total, {break_time} for sending triggers.")

View File

@@ -0,0 +1,166 @@
import enum
import os
import threading
import time
from typing import Any
import numpy as np
import openapi_client
from bec_lib.logger import bec_logger
from openapi_client.models.dataset_settings import DatasetSettings
from ophyd import ADComponent as ADCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from std_daq_client import StdDaqClient
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
logger = bec_logger.logger
class EigerError(Exception):
"""Base class for exceptions in this module."""
class EigerTimeoutError(EigerError):
"""Raised when the Eiger does not respond in time."""
class DetectorState(str, enum.Enum):
"""Detector states for Eiger9M detector"""
IDLE = "idle"
class ErrorState(int, enum.Enum):
""" Error State in the detector"""
ERROR_STATUS_WAITING = 504
class Eiger9MJungfrauJochSetup(CustomDetectorMixin):
"""Eiger setup class
Parent class: CustomDetectorMixin
"""
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
super().__init__(*args, parent=parent, **kwargs)
# kwargs["host"] =
# kwargs["port"] =
configuration = openapi_client.Configuration(host="http://sls-jfjoch-001:8080")
api_client = openapi_client.ApiClient(configuration)
self.api = openapi_client.DefaultApi(api_client)
self.actually_init()
def actually_init(self):
status = self.get_daq_status()
if status != DetectorState.IDLE:
self.api.initialize_post()
self.actually_wait_till_done()
def actually_wait_till_done(self):
while True:
try:
done = self.api.wait_till_done_post()
except Exception as e: #TODO: be more specific
if e.status != ErrorState.ERROR_STATUS_WAITING:
print(e)
raise e
else:
#TODO: use number_of_triggers_left for progress...
if done is None: # seems we get None instead of: status == 200
return
time.sleep(0.1) #TODO
def get_daq_status(self):
return self.api.status_get().state
def on_stage(self):
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name != "jjf_test":
return
exp_time = self.parent.scaninfo.exp_time
readout = self.parent.scaninfo.readout_time
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
cycles = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["cycles"]
total_points = num_burst_cycle * cycles
dataset_settings = DatasetSettings(
image_time_us = int(exp_time*1e6),
ntrigger=total_points,
beam_x_pxl = 0,
beam_y_pxl = 0,
detector_distance_mm = 100,
incident_energy_ke_v = 10.00,
)
self.api.start_post(dataset_settings=dataset_settings)
def on_unstage(self):
pass
def on_complete(self):
logger.info("Starting complete for {self.parent.name}")
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name != "jjf_test":
return
def wait_till_done_post():
try:
done = self.api.wait_till_done_post(timeout=1)
except Exception as e: #TODO: be more specific
return False
else:
#TODO: use number_of_triggers_left for progress...
if done is None: # seems we get None instead of: status == 200
return True
return True
status = self.wait_with_status(signal_conditions=[(wait_till_done_post, True)], check_stopped=True,timeout=10)
return status
def on_stop(self):
self.api.cancel_post()
class TriggerSource(int, enum.Enum):
"""Trigger signals for Eiger9M detector"""
AUTO = 0
TRIGGER = 1
GATING = 2
BURST_TRIGGER = 3
class Eiger9McSAXS(PSIDetectorBase):
"""
Eiger9M detector for CSAXS
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,
inherits from CustomDetectorMixin
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
Various EpicsPVs for controlling the detector
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = []
# specify Setup class
custom_prepare_cls = Eiger9MJungfrauJochSetup
# specify minimum readout time for detector and timeout for checks after unstage
MIN_READOUT = 20e-6
TIMEOUT_FOR_SIGNALS = 5
# specify class attributes
if __name__ == "__main__":
eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:")
eiger.custom_prepare.client.init()

View File

@@ -0,0 +1,84 @@
import string
from time import sleep
import openapi_client
ERROR_STATUS_WAITING = 504
STATUS_IDLE = "Idle"
# allow / to enable creation of subfolders
ALLOWED_CHARS = set(
string.ascii_letters + string.digits + "_-+/"
)
def character_cleanup(s, default="_", allowed=ALLOWED_CHARS):
return "".join(i if i in allowed else default for i in s)
class JFJClient:
def __init__(self, host):
configuration = openapi_client.Configuration(host=host)
api_client = openapi_client.ApiClient(configuration)
self.api = openapi_client.DefaultApi(api_client)
self.actually_init()
def actually_init(self):
status = self.get_daq_status()
if status != STATUS_IDLE:
self.api.initialize_post()
self.actually_wait_till_done()
status = self.get_daq_status()
if status != STATUS_IDLE:
raise RuntimeError(f"status is not {STATUS_IDLE} but: {status}")
def actually_wait_till_done(self):
while True:
try:
done = self.api.wait_till_done_post()
except Exception as e: #TODO: be more specific
if e.status != ERROR_STATUS_WAITING:
print(e)
raise e
else:
#TODO: use number_of_triggers_left for progress...
if done is None: # seems we get None instead of: status == 200
return
sleep(0.1) #TODO
def get_daq_status(self):
return self.api.status_get().state
def get_detector_status(self):
return self.api.detector_status_get()#.state #TODO
def get_detectors(self):
return self.api.config_select_detector_get()
def get_detector_config(self):
return self.api.config_detector_get()
def acquire(self, file_prefix, **kwargs):
status = self.get_daq_status()
if status != STATUS_IDLE:
raise RuntimeError(f"status is not {STATUS_IDLE} but: {status}")
file_prefix = character_cleanup(file_prefix)
dataset_settings = openapi_client.DatasetSettings(file_prefix=file_prefix, **kwargs)
self.api.start_post(dataset_settings=dataset_settings)
self.actually_wait_till_done()
def take_pedestal(self):
self.api.pedestal_post()
self.actually_wait_till_done()

View File

@@ -1,4 +1,5 @@
from .flomni_fermat_scan import FlomniFermatScan
from .jungfrau_joch_scan import JungfrauJochTest
from .LamNIFermatScan import LamNIFermatScan, LamNIMoveToScanCenter
from .owis_grid import OwisGrid
from .sgalil_grid import SgalilGrid

View File

@@ -0,0 +1,70 @@
import time
from bec_lib import bec_logger
from bec_lib.device import DeviceBase
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanAbortion
logger = bec_logger.logger
class JungfrauJochTest(AsyncFlyScanBase):
"""Owis-based grid scan."""
scan_name = "jjf_test"
#scan_report_hint = "device_progress"
required_kwargs = ["points", "exp_time", "readout_time"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
gui_config = {
"Acquisition Parameters": ["num_points", "cycles"],
"Exposure Parameters" : ["exp_time", "readout_time"]
}
def __init__(
self,
num_points:int,
exp_time: float,
readout_time: float,
cycles:int =1,
**kwargs,
):
"""
JungfrauJoch Test scan.
Args:
device (DeviceBase) : The device to be triggered, currently only for delaygenerator csaxs
num_points (int) : Number of points per burst
exp_time (float) : exposure time.
readout_time (float): readout time of detector
cycles (int) : number of cycles, default is 1
Example:
scans.jjf_test(points = 100, exp_time= 1e-3, readout_time=1e-3, cycles = 2)
"""
if readout_time <=0:
raise ScanAbortion(f"Readout time must be larger than 0, provided value {readout_time}")
super().__init__(exp_time=exp_time, readout_time=readout_time,**kwargs)
self.device = "ddg"
self.num_points = num_points
self.cycles = cycles
self.primary_readout_cycle = 0.2
def scan_core(self):
logger.info(f"Starting with Scan Core")
total_exposure = self.num_points * (self.exp_time + self.readout_time)
for i in range(self.cycles):
logger.info(f"Beginning cycle {i} of {self.cycles}")
yield from self.stubs.trigger(group="trigger", point_id=self.point_id)
yield from self.stubs.read_and_wait(
group="primary",
wait_group="readout_primary",
point_id=self.point_id,
)
yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=total_exposure)
self.point_id +=1
logger.info(f"Finished cycle {i} of {self.cycles}")
logger.info(f"Finished scan")
self.num_pos = self.point_id + 1