feat: 1st changes

This commit is contained in:
gac-x07mb
2024-07-18 13:32:37 +02:00
parent 8c36484fec
commit 13dd35e7b3
13 changed files with 1127 additions and 0 deletions

View File

@ -0,0 +1 @@
from .phoenix import PhoenixBL_from_iphyhon_plugin

View File

@ -0,0 +1,88 @@
#from unittest import mock
import numpy as np
#import pandas
#import pytest
#from bec_lib import messages
#import device_server
#from ophyd import Component as Cpt
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
#from ophyd import FormattedComponent as FCpt
#from ophyd import Kind, PVPositioner, Signal
#from ophyd.flyers import FlyerInterface
#from ophyd.pv_positioner import PVPositionerComparator
#from ophyd.status import DeviceStatus, SubscriptionStatus
from bec_lib.logger import bec_logger
logger = bec_logger.logger
import time as tt
#import ophyd
import os
import sys
#logger = bec_logger.logger
# load simulation
#bec.config.load_demo_config()
# .. define base path for directory with scripts
#
ophyd_devices_path='/data/test/x07mb-test-bec/bec_deployment/ophyd_devices'
p_path='/data/test/x07mb-test-bec/bec_deployment/bec_server_venv/lib/python3.11/site-packages'
if ophyd_devices_path not in sys.path:
sys.path.insert(1, os.path.expandvars(ophyd_devices_path))
#endif
if p_path not in sys.path:
sys.path.insert(1, os.path.expandvars(p_path))
class PhoenixBL_from_iphyhon_plugin():
"""
General class for PHOENIX beamline in ipython_plugins
"""
#define some epics channels
#scan_name = "phoenix_base"
def __init__(self):
"""
init PhoenixBL() in ConfigPHOENIX.config.phoenix
"""
import os
#from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
#from ophyd import Component as Cpt
#self.ScanX = EpicsMotor(name='ScanX',prefix='X07MB-ES-MA1:ScanX')
#self.ScanY = EpicsMotor(name='ScanY',prefix='X07MB-ES-MA1:ScanY')
#self.DIODE = EpicsSignal(name='SI',read_pv='X07MB-OP2-SAI_07:MEAN')
#self.SIG = Cpt(EpicsSignal,name='we',read_pv="X07MB-OP2-SAI_07:MEAN")
#self.SMPL = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:SMPL')
#self.CYCLES = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:TOTAL-CYCLES',write_pv='X07MB-OP2:TOTAL-CYCLES')
# load local configuration
print('init PhoenixBL')
self.path_scripts_local = '/data/test/x07mb-test-bec/bec_deployment/phoenix_bec/MyScripts/'
self.path_config_local = self.path_scripts_local + 'ConfigPHOENIX/' # base dir for local configurations
self.path_devices_local = self.path_config_local + 'device_config/' # local yamal file
self.file_device_conf = self.path_devices_local + 'phoenix_devices.yaml'
#bec.config.update_session_with_file(self.file_device_conf)
# last command created yaml backup, for now just move it away
#os.system('mv *.yaml '+Devices_local+'/recovery_configs')
#os.system('mv *.yaml tmp')
def read_def_config():
bec.config.update_session_with_file(self.file_device_conf)
def print_setup(self):
"""
docstring print_setup
"""
print(self.path_scripts_local)

View File

@ -34,3 +34,10 @@ to setup the prompts.
""" """
# pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module # pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module
# SETUP PROMPTS
bec._ip.prompts.username = "PHOENIX"
bec._ip.prompts.status = 1
from phoenix_bec.bec_ipython_client.plugins.phoenix import PhoenixBL_from_iphyhon_plugin
from phoenix_bec.devices.falcon_csaxs import FalconHDF5Plugins

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,70 @@
falcon:
description: Falcon detector x-ray fluoresence
deviceClass: csaxs_bec.devices.epics.falcon_csaxs.FalconcSAXS
deviceConfig:
prefix: 'X07MB-SITORO:'
deviceTags:
- cSAXS
- falcon
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: false
ScanYY: #
# MOTORS ES1
#
ScanXX:
readoutPriority: baseline
description: 'Horizontal sample position'
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X07MB-ES-MA1:ScanX'
onFailure: retry
enabled: true
readOnly: false
softwareTrigger: false
ScanYY:
readoutPriority: baseline
description: 'Horizontal sample position'
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X07MB-ES-MA1:ScanY'
onFailure: retry
enabled: true
readOnly: false
softwareTrigger: false
#
#
# DIODES from ES1 ADC
#
#
SAI_07_MEAN:
readoutPriority: monitored
description: DIODE
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: 'X07MB-OP2-SAI_07:MEAN'
deviceTags:
- PHOENIX
onFailure: buffer
enabled: true
readOnly: true
softwareTrigger: false
SAI_08_MEAN:
readoutPriority: monitored
description: DIODE
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: 'X07MB-OP2-SAI_08:MEAN'
deviceTags:
- PHOENIX
onFailure: buffer
enabled: true
readOnly: true
softwareTrigger: false

View File

@ -0,0 +1 @@
from .falcon_csaxs import FalconcSAXS

View File

@ -0,0 +1,345 @@
from bec_lib import bec_logger
from ophyd import Component
from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import (
DDGCustomMixin,
PSIDelayGeneratorBase,
TriggerSource,
)
from ophyd_devices.utils import bec_utils
logger = bec_logger.logger
class DelayGeneratorError(Exception):
"""Exception raised for errors."""
class DDGSetup(DDGCustomMixin):
"""
Mixin class for DelayGenerator logic at cSAXS.
At cSAXS, multiple DDGs were operated at the same time. There different behaviour is
implemented in the ddg_config signals that are passed via the device config.
"""
def initialize_default_parameter(self) -> None:
"""Method to initialize default parameters."""
for ii, channel in enumerate(self.parent.all_channels):
self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
self.parent.set_channels("amplitude", self.parent.amplitude.get())
self.parent.set_channels("offset", self.parent.offset.get())
# Setup reference
self.parent.set_channels(
"reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs]
)
self.parent.set_channels(
"reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs]
)
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# Set threshold level for ext. pulses
self.parent.level.put(self.parent.thres_trig_level.get())
def prepare_ddg(self) -> None:
"""
Method to prepare scan logic of cSAXS
Two scantypes are supported: "step" and "fly":
- step: Scan is performed by stepping the motor and acquiring data at each step
- fly: Scan is performed by moving the motor with a constant velocity and acquiring data
Custom logic for different DDG behaviour during scans.
- set_high_on_exposure : If True, then TTL signal is high during
the full exposure time of the scan (all frames).
E.g. Keep shutter open for the full scan.
- fixed_ttl_width : fixed_ttl_width is a list of 5 values, one for each channel.
If the value is 0, then the width of the TTL pulse is determined,
no matter which parameters are passed from the scaninfo for exposure time
- set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones
were: SINGLE_SHOT, EXT_RISING_EDGE
"""
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# scantype "step"
if self.parent.scaninfo.scan_type == "step":
# High on exposure means that the signal
if self.parent.set_high_on_exposure.get():
# caluculate parameters
num_burst_cycle = 1 + self.parent.additional_triggers.get()
exp_time = (
self.parent.delta_width.get()
+ self.parent.scaninfo.frames_per_trigger
* (self.parent.scaninfo.exp_time + self.parent.scaninfo.readout_time)
)
total_exposure = exp_time
delay_burst = self.parent.delay_burst.get()
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
for value, channel in zip(
self.parent.fixed_ttl_width.get(), self.parent.all_channels
):
logger.debug(f"Trying to set DDG {channel} to {value}")
if value != 0:
self.parent.set_channels("width", value, channels=[channel])
else:
# caluculate parameters
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
total_exposure = exp_time + self.parent.scaninfo.readout_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = (
self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get()
)
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
# scantype "fly"
elif self.parent.scaninfo.scan_type == "fly":
if self.parent.set_high_on_exposure.get():
# caluculate parameters
exp_time = (
self.parent.delta_width.get()
+ self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points
+ self.parent.scaninfo.readout_time * (self.parent.scaninfo.num_points - 1)
)
total_exposure = exp_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = 1 + self.parent.additional_triggers.get()
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
for value, channel in zip(
self.parent.fixed_ttl_width.get(), self.parent.all_channels
):
logger.debug(f"Trying to set DDG {channel} to {value}")
if value != 0:
self.parent.set_channels("width", value, channels=[channel])
else:
# caluculate parameters
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
total_exposure = exp_time + self.parent.scaninfo.readout_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = (
self.parent.scaninfo.num_points + self.parent.additional_triggers.get()
)
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
else:
raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}")
# Set common DDG parameters
self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first")
self.parent.set_channels("delay", 0.0)
def on_trigger(self) -> None:
"""Method to be executed upon trigger"""
if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT:
self.parent.trigger_shot.put(1)
def check_scan_id(self) -> None:
"""
Method to check if scan_id has changed.
If yes, then it changes parent.stopped to True, which will stop further actions.
"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
def finished(self) -> None:
"""Method checks if DDG finished acquisition"""
def on_pre_scan(self) -> None:
"""
Method called by pre_scan hook in parent class.
Executes trigger if premove_trigger is Trus.
"""
if self.parent.premove_trigger.get() is True:
self.parent.trigger_shot.put(1)
class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
"""
DG645 delay generator at cSAXS (multiple can be in use depending on the setup)
Default values for setting up DDG.
Note: checks of set calues are not (only partially) included, check manual for details on possible settings.
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
- delay_burst : (float >=0) Delay between trigger and first pulse in burst mode
- delta_width : (float >= 0) Add width to fast shutter signal to make sure its open during acquisition
- additional_triggers : (int) add additional triggers to burst mode (mcs card needs +1 triggers per line)
- polarity : (list of 0/1) polarity for different channels
- amplitude : (float) amplitude voltage of TTLs
- offset : (float) offset for ampltitude
- thres_trig_level : (float) threshold of trigger amplitude
Custom signals for logic in different DDGs during scans (for custom_prepare.prepare_ddg):
- set_high_on_exposure : (bool): if True, then TTL signal should go high during the full acquisition time of a scan.
# TODO trigger_width and fixed_ttl could be combined into single list.
- fixed_ttl_width : (list of either 1 or 0), one for each channel.
- trigger_width : (float) if fixed_ttl_width is True, then the width of the TTL pulse is set to this value.
- set_trigger_source : (TriggerSource) specifies the default trigger source for the DDG.
- premove_trigger : (bool) if True, then a trigger should be executed before the scan starts (to be implemented in on_pre_scan).
- set_high_on_stage : (bool) if True, then TTL signal should go high already on stage.
"""
custom_prepare_cls = DDGSetup
delay_burst = Component(
bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config"
)
delta_width = Component(
bec_utils.ConfigSignal, name="delta_width", kind="config", config_storage_name="ddg_config"
)
additional_triggers = Component(
bec_utils.ConfigSignal,
name="additional_triggers",
kind="config",
config_storage_name="ddg_config",
)
polarity = Component(
bec_utils.ConfigSignal, name="polarity", kind="config", config_storage_name="ddg_config"
)
fixed_ttl_width = Component(
bec_utils.ConfigSignal,
name="fixed_ttl_width",
kind="config",
config_storage_name="ddg_config",
)
amplitude = Component(
bec_utils.ConfigSignal, name="amplitude", kind="config", config_storage_name="ddg_config"
)
offset = Component(
bec_utils.ConfigSignal, name="offset", kind="config", config_storage_name="ddg_config"
)
thres_trig_level = Component(
bec_utils.ConfigSignal,
name="thres_trig_level",
kind="config",
config_storage_name="ddg_config",
)
set_high_on_exposure = Component(
bec_utils.ConfigSignal,
name="set_high_on_exposure",
kind="config",
config_storage_name="ddg_config",
)
set_high_on_stage = Component(
bec_utils.ConfigSignal,
name="set_high_on_stage",
kind="config",
config_storage_name="ddg_config",
)
set_trigger_source = Component(
bec_utils.ConfigSignal,
name="set_trigger_source",
kind="config",
config_storage_name="ddg_config",
)
trigger_width = Component(
bec_utils.ConfigSignal,
name="trigger_width",
kind="config",
config_storage_name="ddg_config",
)
premove_trigger = Component(
bec_utils.ConfigSignal,
name="premove_trigger",
kind="config",
config_storage_name="ddg_config",
)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
ddg_config=None,
**kwargs,
):
"""
Args:
prefix (str, optional): Prefix of the device. Defaults to "".
name (str): Name of the device.
kind (str, optional): Kind of the device. Defaults to None.
read_attrs (list, optional): List of attributes to read. Defaults to None.
configuration_attrs (list, optional): List of attributes to configure. Defaults to None.
parent (Device, optional): Parent device. Defaults to None.
device_manager (DeviceManagerBase, optional): DeviceManagerBase object. Defaults to None.
sim_mode (bool, optional): Simulation mode flag. Defaults to False.
ddg_config (dict, optional): Dictionary of ddg_config signals. Defaults to None.
"""
# Default values for ddg_config signals
self.ddg_config = {
# Setup default values
f"{name}_delay_burst": 0,
f"{name}_delta_width": 0,
f"{name}_additional_triggers": 0,
f"{name}_polarity": [1, 1, 1, 1, 1],
f"{name}_amplitude": 4.5,
f"{name}_offset": 0,
f"{name}_thres_trig_level": 2.5,
# Values for different behaviour during scans
f"{name}_fixed_ttl_width": [0, 0, 0, 0, 0],
f"{name}_trigger_width": None,
f"{name}_set_high_on_exposure": False,
f"{name}_set_high_on_stage": False,
f"{name}_set_trigger_source": "SINGLE_SHOT",
f"{name}_premove_trigger": False,
}
if ddg_config is not None:
# pylint: disable=expression-not-assigned
[self.ddg_config.update({f"{name}_{key}": value}) for key, value in ddg_config.items()]
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
sim_mode=sim_mode,
**kwargs,
)
if __name__ == "__main__":
# Start delay generator in simulation mode.
# Note: To run, access to Epics must be available.
dgen = DelayGeneratorcSAXS("delaygen:DG1:", name="dgen", sim_mode=True)

View File

@ -0,0 +1,357 @@
import enum
import os
import threading
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd.mca import EpicsMCARecord
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
logger = bec_logger.logger
class FalconError(Exception):
"""Base class for exceptions in this module."""
class FalconTimeoutError(FalconError):
"""Raised when the Falcon does not respond in time."""
class DetectorState(enum.IntEnum):
"""Detector states for Falcon detector"""
DONE = 0
ACQUIRING = 1
class TriggerSource(enum.IntEnum):
"""Trigger source for Falcon detector"""
USER = 0
GATE = 1
SYNC = 2
class MappingSource(enum.IntEnum):
"""Mapping source for Falcon detector"""
SPECTRUM = 0
MAPPING = 1
class EpicsDXPFalcon(Device):
"""
DXP parameters for Falcon detector
Base class to map EPICS PVs from DXP parameters to ophyd signals.
"""
elapsed_live_time = Cpt(EpicsSignal, "ElapsedLiveTime")
elapsed_real_time = Cpt(EpicsSignal, "ElapsedRealTime")
elapsed_trigger_live_time = Cpt(EpicsSignal, "ElapsedTriggerLiveTime")
# Energy Filter PVs
energy_threshold = Cpt(EpicsSignalWithRBV, "DetectionThreshold")
min_pulse_separation = Cpt(EpicsSignalWithRBV, "MinPulsePairSeparation")
detection_filter = Cpt(EpicsSignalWithRBV, "DetectionFilter", string=True)
scale_factor = Cpt(EpicsSignalWithRBV, "ScaleFactor")
risetime_optimisation = Cpt(EpicsSignalWithRBV, "RisetimeOptimization")
# Misc PVs
detector_polarity = Cpt(EpicsSignalWithRBV, "DetectorPolarity")
decay_time = Cpt(EpicsSignalWithRBV, "DecayTime")
current_pixel = Cpt(EpicsSignalRO, "CurrentPixel")
class FalconHDF5Plugins(Device):
"""
HDF5 parameters for Falcon detector
Base class to map EPICS PVs from HDF5 Plugin to ophyd signals.
"""
""" ----------------------------------------------------------------------------
capture = Cpt(EpicsSignalWithRBV, "Capture")
enable = Cpt(EpicsSignalWithRBV, "EnableCallbacks", string=True, kind="config")
xml_file_name = Cpt(EpicsSignalWithRBV, "XMLFileName", string=True, kind="config")
lazy_open = Cpt(EpicsSignalWithRBV, "LazyOpen", string=True, doc="0='No' 1='Yes'")
temp_suffix = Cpt(EpicsSignalWithRBV, "TempSuffix", string=True)
file_path = Cpt(EpicsSignalWithRBV, "FilePath", string=True, kind="config")
file_name = Cpt(EpicsSignalWithRBV, "FileName", string=True, kind="config")
file_template = Cpt(EpicsSignalWithRBV, "FileTemplate", string=True, kind="config")
num_capture = Cpt(EpicsSignalWithRBV, "NumCapture", kind="config")
file_write_mode = Cpt(EpicsSignalWithRBV, "FileWriteMode", kind="config")
queue_size = Cpt(EpicsSignalWithRBV, "QueueSize", kind="config")
array_counter = Cpt(EpicsSignalWithRBV, "ArrayCounter", kind="config")
"""
class FalconSetup(CustomDetectorMixin):
"""
Falcon setup class for cSAXS
Parent class: CustomDetectorMixin
"""
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
super().__init__(*args, parent=parent, **kwargs)
self._lock = threading.RLock()
def on_init(self) -> None:
"""Initialize Falcon detector"""
self.initialize_default_parameter()
self.initialize_detector()
self.initialize_detector_backend()
def initialize_default_parameter(self) -> None:
"""
Set default parameters for Falcon
This will set:
- readout (float): readout time in seconds
- value_pixel_per_buffer (int): number of spectra in buffer of Falcon Sitoro
"""
self.parent.value_pixel_per_buffer = 20
self.update_readout_time()
def update_readout_time(self) -> None:
"""Set readout time for Eiger9M detector"""
readout_time = (
self.parent.scaninfo.readout_time
if hasattr(self.parent.scaninfo, "readout_time")
else self.parent.MIN_READOUT
)
self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT)
def initialize_detector(self) -> None:
"""Initialize Falcon detector"""
self.stop_detector()
self.stop_detector_backend()
self.set_trigger(
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
)
# 1 Realtime
self.parent.preset_mode.put(1)
# 0 Normal, 1 Inverted
self.parent.input_logic_polarity.put(0)
# 0 Manual 1 Auto
self.parent.auto_pixels_per_buffer.put(0)
# Sets the number of pixels/spectra in the buffer
self.parent.pixels_per_buffer.put(self.parent.value_pixel_per_buffer)
def initialize_detector_backend(self) -> None:
"""Initialize the detector backend for Falcon."""
w=0
#----------------------------------------------------------------------
#self.parent.hdf5.enable.put(1)
# file location of h5 layout for cSAXS
#self.parent.hdf5.xml_file_name.put("layout.xml")
# TODO Check if lazy open is needed and wanted!
#self.parent.hdf5.lazy_open.put(1)
#self.parent.hdf5.temp_suffix.put("")
# size of queue for number of spectra allowed in the buffer, if too small at high throughput, data is lost
#self.parent.hdf5.queue_size.put(2000)
# Segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate
#self.parent.nd_array_mode.put(1)
def on_stage(self) -> None:
"""Prepare detector and backend for acquisition"""
self.prepare_detector()
self.prepare_data_backend()
self.publish_file_location(done=False, successful=False)
self.arm_acquisition()
def prepare_detector(self) -> None:
"""Prepare detector for acquisition"""
self.set_trigger(
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
)
self.parent.preset_real.put(self.parent.scaninfo.exp_time)
self.parent.pixels_per_run.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
def prepare_data_backend(self) -> None:
"""Prepare data backend for acquisition"""
w=9
""" --------------------------------------------------------------
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5")
).wait()
file_path, file_name = os.path.split(self.parent.filepath.get())
self.parent.hdf5.file_path.put(file_path)
self.parent.hdf5.file_name.put(file_name)
self.parent.hdf5.file_template.put("%s%s")
self.parent.hdf5.num_capture.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.hdf5.file_write_mode.put(2)
# Reset spectrum counter in filewriter, used for indexing & identifying missing triggers
self.parent.hdf5.array_counter.put(0)
# Start file writing
self.parent.hdf5.capture.put(1)
"""
def arm_acquisition(self) -> None:
"""Arm detector for acquisition"""
self.parent.start_all.put(1)
signal_conditions = [
(
lambda: self.parent.state.read()[self.parent.state.name]["value"],
DetectorState.ACQUIRING,
)
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
check_stopped=True,
all_signals=False,
):
raise FalconTimeoutError(
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
)
def on_unstage(self) -> None:
"""Unstage detector and backend"""
pass
def on_complete(self) -> None:
"""Complete detector and backend"""
#------------------------------------------------------------------
#self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
#self.publish_file_location(done=True, successful=True)
w=9
def on_stop(self) -> None:
"""Stop detector and backend"""
self.stop_detector()
#self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stops detector"""
self.parent.stop_all.put(1)
self.parent.erase_all.put(1)
#-------------------------------------------------------------------
#signal_conditions = [
# (lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE)
#]
#if not self.wait_for_signals(
# signal_conditions=signal_conditions,
# timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
# all_signals=False,
#):
# # Retry stop detector and wait for remaining time
# raise FalconTimeoutError(
# f"Failed to stop detector, timeout with state {signal_conditions[0][0]}"
# )
def stop_detector_backend(self) -> None:
"""Stop the detector backend"""
#self.parent.hdf5.capture.put(0)
w=0
def finished(self, timeout: int = 5) -> None:
"""Check if scan finished succesfully"""
with self._lock:
total_frames = int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
)
signal_conditions = [
(self.parent.dxp.current_pixel.get, total_frames),
# (self.parent.hdf5.array_counter.get, total_frames), ---------------------
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=timeout,
check_stopped=True,
all_signals=True,
):
logger.debug(
f"Falcon missed a trigger: received trigger {self.parent.dxp.current_pixel.get()},"
f" send data {self.parent.hdf5.array_counter.get()} from total_frames"
f" {total_frames}"
)
self.stop_detector()
self.stop_detector_backend()
def set_trigger(
self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: int = 0
) -> None:
"""
Set triggering mode for detector
Args:
mapping_mode (MappingSource): Mapping mode for the detector
trigger_source (TriggerSource): Trigger source for the detector, pixel_advance_signal
ignore_gate (int): Ignore gate from TTL signal; defaults to 0
"""
mapping = int(mapping_mode)
trigger = trigger_source
self.parent.collect_mode.put(mapping)
self.parent.pixel_advance_mode.put(trigger)
self.parent.ignore_gate.put(ignore_gate)
class FalconcSAXS(PSIDetectorBase):
"""
Falcon Sitoro detector for CSAXS
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,
inherits from CustomDetectorMixin
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector
mca (EpicsMCARecord) : MCA parameters for Falcon detector
hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector
MIN_READOUT (float) : Minimum readout time for the detector
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = ["describe"]
# specify Setup class
custom_prepare_cls = FalconSetup
# specify minimum readout time for detector
MIN_READOUT = 3e-3
TIMEOUT_FOR_SIGNALS = 5
# specify class attributes
dxp = Cpt(EpicsDXPFalcon, "dxp1:")
mca = Cpt(EpicsMCARecord, "mca1")
hdf5 = Cpt(FalconHDF5Plugins, "HDF1:")
stop_all = Cpt(EpicsSignal, "StopAll")
erase_all = Cpt(EpicsSignal, "EraseAll")
start_all = Cpt(EpicsSignal, "StartAll")
state = Cpt(EpicsSignal, "Acquiring")
preset_mode = Cpt(EpicsSignal, "PresetMode") # 0 No preset 1 Real time 2 Events 3 Triggers
preset_real = Cpt(EpicsSignal, "PresetReal")
preset_events = Cpt(EpicsSignal, "PresetEvents")
preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
triggers = Cpt(EpicsSignalRO, "MaxTriggers", lazy=True)
events = Cpt(EpicsSignalRO, "MaxEvents", lazy=True)
input_count_rate = Cpt(EpicsSignalRO, "MaxInputCountRate", lazy=True)
output_count_rate = Cpt(EpicsSignalRO, "MaxOutputCountRate", lazy=True)
collect_mode = Cpt(EpicsSignal, "CollectMode") # 0 MCA spectra, 1 MCA mapping
pixel_advance_mode = Cpt(EpicsSignal, "PixelAdvanceMode")
ignore_gate = Cpt(EpicsSignal, "IgnoreGate")
input_logic_polarity = Cpt(EpicsSignal, "InputLogicPolarity")
auto_pixels_per_buffer = Cpt(EpicsSignal, "AutoPixelsPerBuffer")
pixels_per_buffer = Cpt(EpicsSignal, "PixelsPerBuffer")
pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun")
nd_array_mode = Cpt(EpicsSignal, "NDArrayMode")
if __name__ == "__main__":
falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:", sim_mode=True)

View File

@ -0,0 +1,57 @@
#
# MOTORS ES1
#
ScanXX:
readoutPriority: baseline
description: 'Horizontal sample position'
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X07MB-ES-MA1:ScanX'
onFailure: retry
enabled: true
readOnly: false
softwareTrigger: false
ScanYY:
readoutPriority: baseline
description: 'Horizontal sample position'
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X07MB-ES-MA1:ScanY'
onFailure: retry
enabled: true
readOnly: false
softwareTrigger: false
#
#
# DIODES from ES1 ADC
#
#
SAI_07_MEAN:
readoutPriority: monitored
description: DIODE
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: 'X07MB-OP2-SAI_07:MEAN'
deviceTags:
- PHOENIX
onFailure: buffer
enabled: true
readOnly: true
softwareTrigger: false
SAI_08_MEAN:
readoutPriority: monitored
description: DIODE
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
auto_monitor: true
read_pv: 'X07MB-OP2-SAI_08:MEAN'
deviceTags:
- PHOENIX
onFailure: buffer
enabled: true
readOnly: true
softwareTrigger: false

View File

@ -0,0 +1,2 @@
from .phoenix_line_scan import PhoenixLineScan

View File

@ -0,0 +1,114 @@
"""
SCAN PLUGINS
All new scans should be derived from ScanBase. ScanBase provides various methods that can be customized and overriden
but they are executed in a specific order:
- self.initialize # initialize the class if needed
- self.read_scan_motors # used to retrieve the start position (and the relative position shift if needed)
- self.prepare_positions # prepare the positions for the scan. The preparation is split into multiple sub fuctions:
- self._calculate_positions # calculate the positions
- self._set_positions_offset # apply the previously retrieved scan position shift (if needed)
- self._check_limits # tests to ensure the limits won't be reached
- self.open_scan # send an open_scan message including the scan name, the number of points and the scan motor names
- self.stage # stage all devices for the upcoming acquisiton
- self.run_baseline_readings # read all devices to get a baseline for the upcoming scan
- self.pre_scan # perform additional actions before the scan starts
- self.scan_core # run a loop over all position
- self._at_each_point(ind, pos) # called at each position with the current index and the target positions as arguments
- self.finalize # clean up the scan, e.g. move back to the start position; wait everything to finish
- self.unstage # unstage all devices that have been staged before
- self.cleanup # send a close scan message and perform additional cleanups if needed
"""
# import time
# import numpy as np
# from bec_lib.endpoints import MessageEndpoints
# from bec_lib.logger import bec_logger
# from bec_lib import messages
# from bec_server.scan_server.errors import ScanAbortion
# from bec_server.scan_server.scans import FlyScanBase, RequestBase, ScanArgType, ScanBase
# logger = bec_logger.logger
from bec_server.scan_server.scans import ScanBase, ScanArgType
import numpy as np
import time
from bec_lib.logger import bec_logger
logger = bec_logger.logger
class PhoenixLineScan(ScanBase):
scan_name = "phoenix_line_scanZZZ"
required_kwargs = ["steps", "relative"]
arg_input = {
"device": ScanArgType.DEVICE,
"start": ScanArgType.FLOAT,
"stop": ScanArgType.FLOAT,
}
arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None}
gui_config = {
"Movement Parameters": ["steps", "relative"],
"Acquisition Parameters": ["exp_time", "burst_at_each_point"],
}
def __init__(
self,
*args,
exp_time: float = 0,
steps: int = None,
relative: bool = False,
burst_at_each_point: int = 1,
setup_device:str= None,
**kwargs,
):
"""
A phoenix line scan for one or more motors.
Args:
*args (Device, float, float): pairs of device / start position / end position
exp_time (float): exposure time in s. Default: 0
steps (int): number of steps. Default: 10
relative (bool): if True, the start and end positions are relative to the current position. Default: False
burst_a Specifies the level of type checking analysis to perform.
ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True)
"""
super().__init__(
exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs
)
self.steps = steps
self.setup_device = setup_device
print('INIT CLASS PhoenixLineScan')
time.sleep(1)
def _calculate_positions(self) -> None:
axis = []
for _, val in self.caller_args.items():
ax_pos = np.linspace(val[0], val[1], self.steps, dtype=float)
axis.append(ax_pos)
self.positions = np.array(list(zip(*axis)), dtype=float)
def _at_each_point(self, ind=None, pos=None):
yield from self._move_scan_motors_and_wait(pos)
if ind > 0:
yield from self.stubs.wait(
wait_type="read", group="primary", wait_group="readout_primary"
)
time.sleep(self.settling_time)
if self.setup_device:
yield from self.stubs.send_rpc_and_wait(self.setup_device, "velocity.set", 1)
yield from self.stubs.trigger(group="trigger", point_id=self.point_id)
yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time)
yield from self.stubs.read(
group="primary", wait_group="readout_primary", point_id=self.point_id
)
yield from self.stubs.wait(
wait_type="read", group="scan_motor", wait_group="readout_primary"
)
self.point_id += 1

View File

View File

@ -0,0 +1,84 @@
#from unittest import mock
import numpy as np
#import pandas
#import pytest
#from bec_lib import messages
#import device_server
#from ophyd import Component as Cpt
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
#from ophyd import FormattedComponent as FCpt
#from ophyd import Kind, PVPositioner, Signal
#from ophyd.flyers import FlyerInterface
#from ophyd.pv_positioner import PVPositionerComparator
#from ophyd.status import DeviceStatus, SubscriptionStatus
from bec_lib.logger import bec_logger
logger = bec_logger.logger
import time as tt
#import ophyd
import os
import sys
#logger = bec_logger.logger
# load simulation
#bec.config.load_demo_config()
# .. define base path for directory with scripts
class PhoenixBL_from_phoenic_bec_scripts():
"""
General class for PHOENIX beamline
"""
#define some epics channels
#scan_name = "phoenix_base"
def __init__(self):
"""
init PhoenixBL() in phoenix_bec/scripts
"""
import os
#from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
#from ophyd import Component as Cpt
#self.ScanX = EpicsMotor(name='ScanX',prefix='X07MB-ES-MA1:ScanX')
#self.ScanY = EpicsMotor(name='ScanY',prefix='X07MB-ES-MA1:ScanY')
#self.DIODE = EpicsSignal(name='SI',read_pv='X07MB-OP2-SAI_07:MEAN')
#self.SIG = Cpt(EpicsSignal,name='we',read_pv="X07MB-OP2-SAI_07:MEAN")
#self.SMPL = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:SMPL')
#self.CYCLES = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:TOTAL-CYCLES',write_pv='X07MB-OP2:TOTAL-CYCLES')
# load local configuration
print('init PhoenixBL after ')
self.path_scripts_local = '/data/test/x07mb-test-bec/bec_deployment/phoenix_bec/MyScripts/'
self.path_config_local = self.path_scripts_local + 'ConfigPHOENIX/' # base dir for local configurations
self.path_devices_local = self.path_config_local + 'device_config/' # local yamal file
self.file_device_conf = self.path_devices_local + 'phoenix_devices.yaml'
#bec.config.update_session_with_file(self.file_device_conf)
# last command created yaml backup, for now just move it away
#os.system('mv *.yaml '+Devices_local+'/recovery_configs')
#os.system('mv *.yaml tmp')
def read_def_config(self):
bec.config.update_session_with_file(self.file_device_conf)
def print_setup(self):
"""
docstring print_setup
"""
print(self.path_scripts_local)
def test_func():
print('ttt')