feat: added phoenix trigger and config

This commit is contained in:
gac-x07mb
2024-08-28 14:17:24 +02:00
parent 1c9c94f28f
commit 1f55dc8736
3 changed files with 149 additions and 257 deletions

View File

@ -1,3 +1,20 @@
falcon_nohdf5:
description: Falcon detector x-ray fluoresence II
deviceClass: phoenix_bec.devices.falcon_phoenix_no_hdf5.FalconPhoenix
deviceConfig:
prefix: 'X07MB-SITORO:'
deviceTags:
- phoenix
- falcon
- no hdf5
- phoenix_devices.yaml
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: false
################################################### ###################################################
# #
# phoenix standard devices (motors) # phoenix standard devices (motors)
@ -16,7 +33,7 @@ PH_TTL:
onFailure: buffer onFailure: buffer
enabled: true enabled: true
readoutPriority: monitored readoutPriority: monitored
softwareTrigger: false softwareTrigger: true
PH_Dummy: PH_Dummy:

View File

@ -97,6 +97,7 @@ class FalconHDF5Plugins(Device):
array_counter = Cpt(EpicsSignalWithRBV, "ArrayCounter", kind="config") array_counter = Cpt(EpicsSignalWithRBV, "ArrayCounter", kind="config")
""" """
class FalconSetup(CustomDetectorMixin): class FalconSetup(CustomDetectorMixin):
""" """
Falcon setup class for cSAXS Falcon setup class for cSAXS
@ -113,7 +114,7 @@ class FalconSetup(CustomDetectorMixin):
"""Initialize Falcon detector""" """Initialize Falcon detector"""
self.initialize_default_parameter() self.initialize_default_parameter()
self.initialize_detector() self.initialize_detector()
self.initialize_detector_backend() # self.initialize_detector_backend()
def initialize_default_parameter(self) -> None: def initialize_default_parameter(self) -> None:
""" """
@ -139,12 +140,12 @@ class FalconSetup(CustomDetectorMixin):
def initialize_detector(self) -> None: def initialize_detector(self) -> None:
"""Initialize Falcon detector""" """Initialize Falcon detector"""
self.stop_detector() self.stop_detector()
self.stop_detector_backend() # self.stop_detector_backend()
self.set_trigger( self.set_trigger(
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 mapping_mode=MappingSource.SPECTRUM, trigger_source=TriggerSource.GATE, ignore_gate=0
) )
# 1 Realtime # 1 Realtime
self.parent.preset_mode.put(1) self.parent.preset_mode.put(0)
# 0 Normal, 1 Inverted # 0 Normal, 1 Inverted
self.parent.input_logic_polarity.put(0) self.parent.input_logic_polarity.put(0)
# 0 Manual 1 Auto # 0 Manual 1 Auto
@ -154,30 +155,34 @@ class FalconSetup(CustomDetectorMixin):
def initialize_detector_backend(self) -> None: def initialize_detector_backend(self) -> None:
"""Initialize the detector backend for Falcon.""" """Initialize the detector backend for Falcon."""
w=0 w = 0
#---------------------------------------------------------------------- # ----------------------------------------------------------------------
#self.parent.hdf5.enable.put(1) # self.parent.hdf5.enable.put(1)
# file location of h5 layout for cSAXS # file location of h5 layout for cSAXS
#self.parent.hdf5.xml_file_name.put("layout.xml") # self.parent.hdf5.xml_file_name.put("layout.xml")
# TODO Check if lazy open is needed and wanted! # TODO Check if lazy open is needed and wanted!
#self.parent.hdf5.lazy_open.put(1) # self.parent.hdf5.lazy_open.put(1)
#self.parent.hdf5.temp_suffix.put("") # self.parent.hdf5.temp_suffix.put("")
# size of queue for number of spectra allowed in the buffer, if too small at high throughput, data is lost # size of queue for number of spectra allowed in the buffer, if too small at high throughput, data is lost
#self.parent.hdf5.queue_size.put(2000) # self.parent.hdf5.queue_size.put(2000)
# Segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate # Segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate
#self.parent.nd_array_mode.put(1) # self.parent.nd_array_mode.put(1)
def on_stage(self) -> None: def on_stage(self) -> None:
"""Prepare detector and backend for acquisition""" """Prepare detector and backend for acquisition"""
self.prepare_detector() self.prepare_detector()
self.prepare_data_backend() # self.prepare_data_backend()
self.publish_file_location(done=False, successful=False) # self.publish_file_location(done=False, successful=False)
# self.arm_acquisition()
def on_trigger(self) -> None:
"""Actions on pre_scan. This is performed AFTER stage, just before scan_core"""
self.arm_acquisition() self.arm_acquisition()
def prepare_detector(self) -> None: def prepare_detector(self) -> None:
"""Prepare detector for acquisition""" """Prepare detector for acquisition"""
self.set_trigger( self.set_trigger(
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 mapping_mode=MappingSource.SPECTRUM, trigger_source=TriggerSource.GATE, ignore_gate=0
) )
self.parent.preset_real.put(self.parent.scaninfo.exp_time) self.parent.preset_real.put(self.parent.scaninfo.exp_time)
self.parent.pixels_per_run.put( self.parent.pixels_per_run.put(
@ -186,7 +191,7 @@ class FalconSetup(CustomDetectorMixin):
def prepare_data_backend(self) -> None: def prepare_data_backend(self) -> None:
"""Prepare data backend for acquisition""" """Prepare data backend for acquisition"""
w=9 w = 9
""" -------------------------------------------------------------- """ --------------------------------------------------------------
self.parent.filepath.set( self.parent.filepath.set(
self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5") self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5")
@ -230,30 +235,31 @@ class FalconSetup(CustomDetectorMixin):
def on_complete(self) -> None: def on_complete(self) -> None:
"""Complete detector and backend""" """Complete detector and backend"""
#------------------------------------------------------------------ # ------------------------------------------------------------------
#self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS) # self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
#self.publish_file_location(done=True, successful=True) # self.publish_file_location(done=True, successful=True)
w=9 w = 9
def on_stop(self) -> None: def on_stop(self) -> None:
"""Stop detector and backend""" """Stop detector and backend"""
self.stop_detector() self.stop_detector()
#self.stop_detector_backend() # self.stop_detector_backend()
def stop_detector(self) -> None: def stop_detector(self) -> None:
"""Stops detector""" """Stops detector"""
self.parent.stop_all.put(1) self.parent.stop_all.put(1)
self.parent.erase_all.put(1) self.parent.erase_all.put(1)
#------------------------------------------------------------------- # -------------------------------------------------------------------
#signal_conditions = [ # signal_conditions = [
# (lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE) # (lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE)
#] # ]
#if not self.wait_for_signals( # if not self.wait_for_signals(
# signal_conditions=signal_conditions, # signal_conditions=signal_conditions,
# timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2, # timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
# all_signals=False, # all_signals=False,
#): # ):
# # Retry stop detector and wait for remaining time # # Retry stop detector and wait for remaining time
# raise FalconTimeoutError( # raise FalconTimeoutError(
# f"Failed to stop detector, timeout with state {signal_conditions[0][0]}" # f"Failed to stop detector, timeout with state {signal_conditions[0][0]}"
@ -261,8 +267,8 @@ class FalconSetup(CustomDetectorMixin):
def stop_detector_backend(self) -> None: def stop_detector_backend(self) -> None:
"""Stop the detector backend""" """Stop the detector backend"""
#self.parent.hdf5.capture.put(0) # self.parent.hdf5.capture.put(0)
w=0 w = 0
def finished(self, timeout: int = 5) -> None: def finished(self, timeout: int = 5) -> None:
"""Check if scan finished succesfully""" """Check if scan finished succesfully"""
@ -272,7 +278,7 @@ class FalconSetup(CustomDetectorMixin):
) )
signal_conditions = [ signal_conditions = [
(self.parent.dxp.current_pixel.get, total_frames), (self.parent.dxp.current_pixel.get, total_frames),
# (self.parent.hdf5.array_counter.get, total_frames), --------------------- # (self.parent.hdf5.array_counter.get, total_frames), ---------------------
] ]
if not self.wait_for_signals( if not self.wait_for_signals(
signal_conditions=signal_conditions, signal_conditions=signal_conditions,
@ -288,10 +294,6 @@ class FalconSetup(CustomDetectorMixin):
self.stop_detector() self.stop_detector()
self.stop_detector_backend() self.stop_detector_backend()
def set_trigger( def set_trigger(
self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: int = 0 self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: int = 0
) -> None: ) -> None:

View File

@ -1,25 +1,33 @@
import time import time
from ophyd import ( import enum
ADComponent as ADCpt, import numpy as np
Device,
DeviceStatus,
)
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import FormattedComponent as FCpt from ophyd import DeviceStatus, EpicsSignal, EpicsSignalRO, Kind
from ophyd import Device, EpicsSignal, EpicsSignalRO
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin from ophyd_devices.interfaces.base_classes.psi_detector_base import (
PSIDetectorBase,
CustomDetectorMixin,
)
from bec_lib import bec_logger, messages from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
logger = bec_logger.logger logger = bec_logger.logger
DETECTOR_TIMEOUT = 5 DETECTOR_TIMEOUT = 5
#class PhoenixTriggerError(Exce start_csmpl=Cpt(EPicsSignal,'START-CSMPL') # cont on / off
class PhoenixTriggerError(Exception):
"""PhoenixTrigger specific error"""
class SAMPLINGDONE(int, enum.Enum):
"""Sampling Done PV"""
RUNNING = 0
DONE = 1
class PhoenixTriggerSetup(CustomDetectorMixin): class PhoenixTriggerSetup(CustomDetectorMixin):
""" """
@ -28,237 +36,102 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
""" """
#self.acquire = self.parent.smpl.put(1) def on_stage(self) -> None:
#self.continuous_sampling_on = parent.start_cmpl.put(1) """Actions to take place on stage"""
#self.continuous_sampling_off = self.parent.start_cmpl.put(0) if self.parent.scaninfo.scan_type == "step":
self.parent.start_csmpl.set(0)
self.parent.total_cycles.set(1)
self.parent.smpl.put(1)
time.sleep(0.5)
self.parent.total_cycles.set(np.ceil(self.parent.scaninfo.exp_time * 5))
logger.info(f"Device {self.parent.name} was staged for step scan")
def __init__(self, *args, parent:Device = None, **kwargs): def on_unstage(self) -> None:
super().__init__(*args, parent=parent, **kwargs) """Actions to take place on unstage"""
self._counter = 0 self.on_stop()
def on_trigger(self) -> DeviceStatus:
"""Actions to be performed upon receiving a software trigger"""
# Check first that falcon is set to acquiring
falcon = self.parent.device_manager.devices.get("falcon_nohdf5", None)
if falcon is not None:
if self.wait_for_signals([(falcon.state.get, 1)], timeout=1):
raise PhoenixTriggerError(
f"Falcon not ready to take trigger after 1s timeout in trigger"
)
falcon.state.get() == 1 # Acquiring
if self.parent.scaninfo.scan_type == "step":
time.sleep(0.2)
self.parent.smpl.put(1)
# Minimum of 1 cycle has to be waited. Cycle == 0.2s
time.sleep(0.2)
# Trigger function from ophyd.Device returns a DeviceStatus. This function
# starts a process that creates a DeviceStatus, and waits for the signal_conditions
# self.parent.smpl_done.get to change to the value SAMPLINGDONE.DONE
# Once this takes place, the DeviceStatus.done flag will be set to True.
# When BEC calls trigger() on the devices, this method will be called assuming that
# the devices config softwareTrigger=True is set.
# In ScanBase, the _at_each_point function calls
# self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time)
# which ensures that the DeviceStatus object resolves before continuing, i.e. DeviceStatus.done = True
status = self.wait_with_status(
signal_conditions=[(self.parent.smpl_done.get, SAMPLINGDONE.DONE)],
timeout=5 * self.parent.scaninfo.exp_time, # Check if timeout is appropriate
check_stopped=True,
)
return status
def on_acquire(self): def on_stop(self) -> None:
self.parent.smpl.put(1) """Actions to stop the Device"""
print('on_aquire') # Put the Device in cont mode
self.parent.total_cycles.set(5)
self.parent.start_csmpl.set(1)
def on_cont_sample_on(self):
self.parent.start_csmpl.put(1)
print('on_cont_sample_on')
def on_cont_sample_off(self):
self.parent.start_csmpl.put(0)
print('on_cont_sample_off')
def on_done(self):
done_out = self.parent.smpl_done.get()
if done_out =='1':
done=True
if done_out == '0':
done=False
return done
def on_dwell(self,t):
" calculate cycles from time in sec "
cycles=self.parent.total_cycles.put(0)*5
def on_stage(self):
# is this called on each point in scan or just before scan ???
print('on stage')
self.parent.start_csmpl.put(0)
time.sleep(0.05)
cycles=self.parent.total_cycles.get()
time.sleep(0.05)
self.parent.total_cycles.put(0)
time.sleep(0.05)
self.parent.smpl.put(1) self.parent.smpl.put(1)
time.sleep(0.5) time.sleep(0.5)
print(cycles)
cycles=self.parent.total_cycles.put(cycles)
logger.success('PhoenixTrigger on stage')
def on_unstage(self):
# is this called on each point in scan or just before scan ???
print('on unstage')
#while self.parent.smpl_done.get()
self.parent.total_cycles.put(5)
time.sleep(0.3)
self.parent.start_csmpl.put(1)
time.sleep(0.3)
self.parent.smpl.put(1) self.parent.smpl.put(1)
time.sleep(2) time.sleep(0.2)
if self.parent.smpl_done.get() == SAMPLINGDONE.RUNNING:
return
self.parent.smpl.put(1) self.parent.smpl.put(1)
time.sleep(.5)
logger.success('PhoenixTrigger.on_unstage')
#def on_trigger(self):
# print('on_trigger')
# self.parent.start_smpl.put(1)
# logger.success('PhoenixTrigger on_trigger')
#
# return self.wait_with_status(
# [(self.parent.smpl_done.get, 1)])
# logger.success(' PhoenixTrigger on_trigger complete ')
# if success:
# status.set_finished()
# else:
# status.set_exception(TimeoutError())
# return status
#def on_complete(self):
# print('on_complete')
# timeout =10
# logger.success('XXXX complete %d XXXX' % success)
# success = self.wait_for_signals(
# [
# (self.parent.smpl_done.get, 0)
# ],
# timeout,
# check_stopped=True,
# all_signals=True
# )
# if success:
# status.set_finished()
# else:
# status.set_exception(TimeoutError())
# return status
# hoenixTrigger on_unstage ')
# self.parent.csmpl.put(1)
# self.parent.smpl.put(1)
# logger.success(' PhoenixTrigger on_unstage finished ')
#def on_trigger():
# print('on_trigger')
class PhoenixTrigger(PSIDetectorBase): class PhoenixTrigger(PSIDetectorBase):
""" """
Docstring: Docstring:
Class for PHOENIX TTL hardware trigger Class for PHOENIX TTL hardware trigger (X07MB-OP2:)
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (PhoenixTriggerSetup) : Custom setup for TTL trigger at PHOENIX
inherits from CustomDetectorMixin
in __init__ of PSIDetecor bases
class is initialized
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
The class PhoenixTrigger is the class to be called via yaml configuration file
the input arguments are defined by PSIDetectorBase,
and need to be given in the yaml configuration file.
To adress chanels such as 'X07MB-OP2:SMPL-DONE':
use prefix 'X07MB-OP2:' in the device definition in the yaml configuration file.
PSIDetectorBase(
prefix='',
*,Q
name,
kind=None,
parent=None,
device_manager=None,
**kwargs,
)
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
parent (object): instance of the parent device
device_manager (object): bec device manager
**kwargs: keyword arguments
File: /data/test/x07mb-test-bec/bec_deployment/ophyd_devices/ophyd_devices/interfaces/base_classes/psi_detector_base.py
Type: type
Subclasses: EpicsSignal
""" """
##################################################################
# Specify which functions are revealed to the user in BEC client
# only a set of predefined functions will be visible in dev.TTL
# The Variable USER_ACCESS contains an ascii list of functions which will be
# visible in dev.TTL as well
# Alternatively one couls also create 2nd instance of PhoenixTrigger,
# which is probably not ideal
USER_ACCESS = ["a_acquire"
,"a_cont_sample_on"
,"a_cont_sample_off"
,"prefix"
,"a_done"
,"a_done_cpt"
,"SMPL"]
#####################################################################
# specify Setup class into variable custom_prepare_cls
# in __init__ of PSIDetectorBase will the initialzed by
# self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
# making the instance of PSIDetectorBase availble in functions
custom_prepare_cls = PhoenixTriggerSetup custom_prepare_cls = PhoenixTriggerSetup
#############################################################3 start_csmpl = Cpt(
# Now use component to provide channel access EpicsSignal, "START-CSMPL", kind=Kind.config, put_complete=True
# when PhoenixTrigger is initialized, the parameters of the base class are ) # cont on / off
# inherided, most notable prefix, which is here X07MB-OP2: intr_count = Cpt(
# The input of Component=Cpt is Cpt(deviceClass,suffix) EpicsSignal, "INTR-COUNT", kind=Kind.config, put_complete=True
# if Cpt is used in a class, which has interited Device, here via: ) # conter run up
# (Here PhoenixTrigger <-- PSIDetectorBase <- Device total_cycles = Cpt(
# the Cpt will construct - magically- the Epics channel name EpicsSignal, "TOTAL-CYCLES", kind=Kind.config, put_complete=True
# EpicsPV = prefix+suffix, ) # cycles set
# for example smpl = Cpt(
# 'X07MB-OP2:' + 'START-CSMPL' -> 'X07MB-OP2:' + 'START-CSMPL' EpicsSignal, "SMPL", kind=Kind.config, put_complete=True
# ) # start sampling --> aquire
start_csmpl = Cpt(EpicsSignal, 'START-CSMPL') # cont on / off smpl_done = Cpt(
intr_count = Cpt(EpicsSignal,'INTR-COUNT') # conter run up EpicsSignalRO, "SMPL-DONE", kind=Kind.config
total_cycles = Cpt(EpicsSignal,'TOTAL-CYCLES') # cycles set ) # show trigger is done, consider using string=True
smpl = Cpt(EpicsSignal,'SMPL') # start sampling --> aquire
# Signal is of type string
smpl_done = Cpt(EpicsSignal,'SMPL-DONE',string=True) # show trigger is done
def a_acquire(self): if __name__ == "__main__":
self.custom_prepare.on_acquire() trigger = PhoenixTrigger(name="trigger", prefix="X07MB-OP2:")
trigger.wait_for_connection(all_signals=True)
trigger.read()
trigger.read_configuration()
def a_cont_sample_on(self): trigger.stage()
self.custom_prepare.on_cont_sample_on() status = trigger.trigger()
while status.done is False:
print(f" Waiting for status, flag is {status.done}")
time.sleep(0.2)
def a_cont_sample_off(self): trigger.unstage()
self.custom_prepare.on_cont_sample_off()
def a_done(self):
done=self.custom_prepare.on_done()
return done
def a_dwell(self):
self.custom_prepare.on_dwell()