rewrite Falcon/Sitoro using a new base class in sitoro.py, which is a copy-edited version of mca.py. mca.py works fine with XMAP, we now consider Falcon/sitoro as completely different device. This is a first working version, do dxp and no mca support yet. Structure of code is along the lines discussed with Xioquiang Wang

This commit is contained in:
gac-x07mb
2024-10-18 17:17:13 +02:00
committed by wakonig_k
parent bae057995f
commit cea11ff48f
4 changed files with 924 additions and 6 deletions

View File

@ -405,18 +405,18 @@ class FalconPhoenix(PSIDetectorBase):
# dxp1 = Cpt(Mca.EpicsDXP, "dxp1:")
dxp1 = Cpt(EpicsDXPFalcon, "dxp1:")
dxp2 = Cpt(EpicsDXPFalcon, "dxp2:")
dxp3 = Cpt(EpicsDXPFalcon, "dxp3:")
dxp4 = Cpt(EpicsDXPFalcon, "dxp4:")
# dxp2 = Cpt(EpicsDXPFalcon, "dxp2:")
# dxp3 = Cpt(EpicsDXPFalcon, "dxp3:")
# dxp4 = Cpt(EpicsDXPFalcon, "dxp4:")
#
# THIS IS NOT WELL-DONE as it take out one part of mca.py from ophy.py
#
#
mca1 = Cpt(EpicsMCARecordExtended, "mca1")
mca2 = Cpt(EpicsMCARecordExtended, "mca2")
mca3 = Cpt(EpicsMCARecordExtended, "mca3")
mca4 = Cpt(EpicsMCARecordExtended, "mca4")
# mca2 = Cpt(EpicsMCARecordExtended, "mca2")
# mca3 = Cpt(EpicsMCARecordExtended, "mca3")
# mca4 = Cpt(EpicsMCARecordExtended, "mca4")
# need to write 'mca1', but not 'mca1:'
# mca1 = Cpt(EpicsMCARecord, "mca1")

View File

@ -94,6 +94,8 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
if falcon is not None:
# TODO Check that falcon.state.get() == 1 is the correct check.
# --> When is the falcon acquiring, this assumes 1?
# self.wait_for_signals is defined in PSI_detector_base.CustomDetectorMixin
#
if not self.wait_for_signals([(falcon.state.get, 1)], timeout=timeout):
raise PhoenixTriggerError(

View File

@ -0,0 +1,431 @@
"""
Base implementation for Sitoro Falcon
This is based on ophyd.mca.py
All relevant classes are renames by putting Sitoro ahead of the class name
eg. EpicsMCARecord(Device): --> SitoroEpicsMCARecord(Device)
fundamentally on could use
class SitoroEpicsMCARecord(Device):
class SitoroEpicsMCA(SitoroEpicsMCARecord):
class SitoroEpicsMCAReadNotify(SitoroEpicsMCARecord):
class SitoroEpicsMCAReadNotify(SitoroEpicsMCARecord):
class SitoroEpicsMCACallback(Device):
class SitoroEpicsDXP(Device):
class SitoroEpicsDXPLowLevelParameter(Device):
class SitoroEpicsDXPLowLevel(Device):
class SitoroEpicsDXPMapping(Device):
class SitoroEpicsDXPBaseSystem(Device):
class SitoroEpicsDXPMultiElementSystem(SitoroEpicsDXPBaseSystem):
class SitoroSoftDXPTrigger(Device):
"""
import logging
from collections import OrderedDict
from ophyd.areadetector import EpicsSignalWithRBV as SignalWithRBV
from ophyd.device import Component as Cpt
from ophyd.device import Device
from ophyd.device import DynamicDeviceComponent as DDC
from ophyd.device import Kind
from ophyd.signal import EpicsSignal, EpicsSignalRO, Signal
logger = logging.getLogger(__name__)
class ROI(Device): # must keep name
# 'name' is not an allowed attribute
label = Cpt(EpicsSignal, "NM", lazy=True)
count = Cpt(EpicsSignalRO, "", lazy=True)
net_count = Cpt(EpicsSignalRO, "N", lazy=True)
preset_count = Cpt(EpicsSignal, "P", lazy=True)
is_preset = Cpt(EpicsSignal, "IP", lazy=True)
bkgnd_chans = Cpt(EpicsSignal, "BG", lazy=True)
hi_chan = Cpt(EpicsSignal, "HI", lazy=True)
lo_chan = Cpt(EpicsSignal, "LO", lazy=True)
def __init__(
self, prefix, *, read_attrs=None, configuration_attrs=None, name=None, parent=None, **kwargs
):
super().__init__(
prefix,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
name=name,
parent=parent,
**kwargs,
)
def add_rois(range_, **kwargs): # must keep name
"""Add one or more ROIs to an MCA instance
Parameters
----------
range_ : sequence of ints
Must be be in the set [0,31]
By default, an EpicsMCA is initialized with all 32 rois.
These provide the following Components as EpicsSignals (N=[0,31]):
EpicsMCA.rois.roiN.(label,count,net_count,preset_cnt, is_preset,
bkgnd_chans, hi_chan, lo_chan)
"""
defn = OrderedDict()
for roi in range_:
if not (0 <= roi < 32):
raise ValueError("roi must be in the set [0,31]")
attr = "roi{}".format(roi)
defn[attr] = (ROI, ".R{}".format(roi), kwargs)
return defn
class SitoroEpicsMCARecord(Device):
"""SynApps MCA Record interface"""
stop_signal = Cpt(EpicsSignal, ".STOP", kind="omitted")
preset_real_time = Cpt(EpicsSignal, ".PRTM", kind=Kind.config | Kind.normal)
preset_live_time = Cpt(EpicsSignal, ".PLTM", kind="omitted")
elapsed_real_time = Cpt(EpicsSignalRO, ".ERTM")
elapsed_live_time = Cpt(EpicsSignalRO, ".ELTM", kind="omitted")
spectrum = Cpt(EpicsSignalRO, ".VAL")
background = Cpt(EpicsSignalRO, ".BG", kind="omitted")
mode = Cpt(EpicsSignal, ".MODE", string=True, kind="omitted")
rois = DDC(add_rois(range(0, 32), kind="omitted"), kind="omitted")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# could arguably be made a configuration_attr instead...
self.stage_sigs["mode"] = "PHA"
def stop(self, *, success=False):
self.stop_signal.put(1)
class SitoroEpicsMCA(SitoroEpicsMCARecord):
"""mca records with extras from mca.db"""
start = Cpt(EpicsSignal, "Start", kind="omitted")
stop_signal = Cpt(EpicsSignal, "Stop", kind="omitted")
erase = Cpt(EpicsSignal, "Erase", kind="omitted")
erase_start = Cpt(EpicsSignal, "EraseStart", trigger_value=1, kind="omitted")
check_acquiring = Cpt(EpicsSignal, "CheckACQG", kind="omitted")
client_wait = Cpt(EpicsSignal, "ClientWait", kind="omitted")
enable_wait = Cpt(EpicsSignal, "EnableWait", kind="omitted")
force_read = Cpt(EpicsSignal, "Read", kind="omitted")
set_client_wait = Cpt(EpicsSignal, "SetClientWait", kind="omitted")
status = Cpt(EpicsSignal, "Status", kind="omitted")
when_acq_stops = Cpt(EpicsSignal, "WhenAcqStops", kind="omitted")
why1 = Cpt(EpicsSignal, "Why1", kind="omitted")
why2 = Cpt(EpicsSignal, "Why2", kind="omitted")
why3 = Cpt(EpicsSignal, "Why3", kind="omitted")
why4 = Cpt(EpicsSignal, "Why4", kind="omitted")
class SitoroEpicsMCAReadNotify(SitoroEpicsMCARecord):
"""mca record with extras from mcaReadNotify.db"""
start = Cpt(EpicsSignal, "Start", kind="omitted")
stop_signal = Cpt(EpicsSignal, "Stop", kind="omitted")
erase = Cpt(EpicsSignal, "Erase", kind="omitted")
erase_start = Cpt(EpicsSignal, "EraseStart", trigger_value=1, kind="omitted")
check_acquiring = Cpt(EpicsSignal, "CheckACQG", kind="omitted")
client_wait = Cpt(EpicsSignal, "ClientWait", kind="omitted")
enable_wait = Cpt(EpicsSignal, "EnableWait", kind="omitted")
force_read = Cpt(EpicsSignal, "Read", kind="omitted")
set_client_wait = Cpt(EpicsSignal, "SetClientWait", kind="omitted")
status = Cpt(EpicsSignal, "Status", kind="omitted")
class SitoroEpicsMCACallback(Device):
"""Callback-related signals for MCA devices"""
read_callback = Cpt(EpicsSignal, "ReadCallback")
read_data_once = Cpt(EpicsSignal, "ReadDataOnce")
read_status_once = Cpt(EpicsSignal, "ReadStatusOnce")
collect_data = Cpt(EpicsSignal, "CollectData")
class SitoroEpicsDXP(Device):
"""All high-level DXP parameters for each channel"""
preset_mode = Cpt(EpicsSignal, "PresetMode", string=True)
live_time_output = Cpt(SignalWithRBV, "LiveTimeOutput", string=True)
# elapsed_live_time = Cpt(EpicsSignal, "ElapsedLiveTime")
# elapsed_real_time = Cpt(EpicsSignal, "ElapsedRealTime")
# elapsed_trigger_live_time = Cpt(EpicsSignal, "ElapsedTriggerLiveTime")
nd_array_mode = Cpt(EpicsSignal, "NDArrayMode")
# Trigger Filter PVs
trigger_peaking_time = Cpt(SignalWithRBV, "TriggerPeakingTime")
trigger_threshold = Cpt(SignalWithRBV, "TriggerThreshold")
trigger_gap_time = Cpt(SignalWithRBV, "TriggerGapTime")
trigger_output = Cpt(SignalWithRBV, "TriggerOutput", string=True)
max_width = Cpt(SignalWithRBV, "MaxWidth")
# Energy Filter PVs
peaking_time = Cpt(SignalWithRBV, "PeakingTime")
energy_threshold = Cpt(SignalWithRBV, "EnergyThreshold")
gap_time = Cpt(SignalWithRBV, "GapTime")
# Baseline PVs
# baseline_cut_percent = Cpt(SignalWithRBV, "BaselineCutPercent")
# baseline_cut_enable = Cpt(SignalWithRBV, "BaselineCutEnable")
# baseline_filter_length = Cpt(SignalWithRBV, "BaselineFilterLength")
# baseline_threshold = Cpt(SignalWithRBV, "BaselineThreshold")
# baseline_energy_array = Cpt(EpicsSignal, "BaselineEnergyArray")
# baseline_histogram = Cpt(EpicsSignal, "BaselineHistogram")
# baseline_threshold = Cpt(SignalWithRBV, "BaselineThreshold")
# Misc PVs
preamp_gain = Cpt(SignalWithRBV, "PreampGain")
detector_polarity = Cpt(SignalWithRBV, "DetectorPolarity")
reset_delay = Cpt(SignalWithRBV, "ResetDelay")
decay_time = Cpt(SignalWithRBV, "DecayTime")
max_energy = Cpt(SignalWithRBV, "MaxEnergy")
adc_percent_rule = Cpt(SignalWithRBV, "ADCPercentRule")
max_width = Cpt(SignalWithRBV, "MaxWidth")
# read-only diagnostics
triggers = Cpt(EpicsSignalRO, "Triggers", lazy=True)
events = Cpt(EpicsSignalRO, "Events", lazy=True)
overflows = Cpt(EpicsSignalRO, "Overflows", lazy=True)
underflows = Cpt(EpicsSignalRO, "Underflows", lazy=True)
input_count_rate = Cpt(EpicsSignalRO, "InputCountRate", lazy=True)
output_count_rate = Cpt(EpicsSignalRO, "OutputCountRate", lazy=True)
mca_bin_width = Cpt(EpicsSignalRO, "MCABinWidth_RBV")
calibration_energy = Cpt(EpicsSignalRO, "CalibrationEnergy_RBV")
current_pixel = Cpt(EpicsSignal, "CurrentPixel")
dynamic_range = Cpt(EpicsSignalRO, "DynamicRange_RBV")
# Preset options
preset_events = Cpt(SignalWithRBV, "PresetEvents")
preset_mode = Cpt(SignalWithRBV, "PresetMode", string=True)
preset_triggers = Cpt(SignalWithRBV, "PresetTriggers")
# Trace options
trace_data = Cpt(EpicsSignal, "TraceData")
trace_mode = Cpt(SignalWithRBV, "TraceMode", string=True)
trace_time_array = Cpt(EpicsSignal, "TraceTimeArray")
trace_time = Cpt(SignalWithRBV, "TraceTime")
class SitoroEpicsDXPLowLevelParameter(Device):
param_name = Cpt(EpicsSignal, "Name")
value = Cpt(SignalWithRBV, "Val")
class SitoroEpicsDXPLowLevel(Device):
num_low_level_params = Cpt(EpicsSignal, "NumLLParams")
read_low_level_params = Cpt(EpicsSignal, "ReadLLParams")
parameter_prefix = "LL{}"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._parameter_cache = {}
def get_low_level_parameter(self, index):
"""Get a DXP low level parameter
Parameters
----------
index : int
In the range of [0, 229]
Returns
-------
param : EpicsDXPLowLevelParameter
"""
try:
return self._parameter_cache[index]
except KeyError:
pass
prefix = "{}{}".format(self.prefix, self.parameter_prefix)
name = "{}_param{}".format(self.name, index)
param = EpicsDXPLowLevelParameter(prefix, name=name)
self._parameter_cache[index] = param
return param
class SitoroEpicsDXPMapping(Device):
apply = Cpt(EpicsSignal, "Apply")
auto_apply = Cpt(SignalWithRBV, "AutoApply")
auto_pixels_per_buffer = Cpt(SignalWithRBV, "AutoPixelsPerBuffer")
buffer_size = Cpt(EpicsSignalRO, "BufferSize_RBV")
collect_mode = Cpt(SignalWithRBV, "CollectMode")
ignore_gate = Cpt(SignalWithRBV, "IgnoreGate")
input_logic_polarity = Cpt(SignalWithRBV, "InputLogicPolarity")
list_mode = Cpt(SignalWithRBV, "ListMode")
mbytes_read = Cpt(EpicsSignalRO, "MBytesRead_RBV")
next_pixel = Cpt(EpicsSignal, "NextPixel")
pixel_advance_mode = Cpt(SignalWithRBV, "PixelAdvanceMode")
pixels_per_buffer = Cpt(SignalWithRBV, "PixelsPerBuffer")
pixels_per_run = Cpt(SignalWithRBV, "PixelsPerRun")
read_rate = Cpt(EpicsSignalRO, "ReadRate_RBV")
sync_count = Cpt(SignalWithRBV, "SyncCount")
class SitoroEpicsDXPBaseSystem(Device):
channel_advance = Cpt(EpicsSignal, "ChannelAdvance")
client_wait = Cpt(EpicsSignal, "ClientWait")
dwell = Cpt(EpicsSignal, "Dwell")
max_scas = Cpt(EpicsSignal, "MaxSCAs")
num_scas = Cpt(SignalWithRBV, "NumSCAs")
poll_time = Cpt(SignalWithRBV, "PollTime")
prescale = Cpt(EpicsSignal, "Prescale")
save_system = Cpt(SignalWithRBV, "SaveSystem")
save_system_file = Cpt(EpicsSignal, "SaveSystemFile")
set_client_wait = Cpt(EpicsSignal, "SetClientWait")
class SitoroTest(Device):
preset_mode = Cpt(EpicsSignal, "PresetMode", string=True)
class SitoroEpicsDXPMultiElementSystem(SitoroEpicsDXPBaseSystem):
# Preset info
preset_mode = Cpt(EpicsSignal, "PresetMode", string=True)
preset_real_time = Cpt(EpicsSignal, "PresetReal")
preset_events = Cpt(EpicsSignal, "PresetEvents")
preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
mca_refresh_period = Cpt(EpicsSignal, "MCARefreshPeriod")
# preset_live_time = Cpt(EpicsSignal, "PresetLive")
# Acquisition
erase_all = Cpt(EpicsSignal, "EraseAll")
erase_start = Cpt(EpicsSignal, "EraseStart", trigger_value=1)
start_all = Cpt(EpicsSignal, "StartAll")
stop_all = Cpt(EpicsSignal, "StopAll")
# Status
set_acquire_busy = Cpt(EpicsSignal, "SetAcquireBusy") # -- not working
acquire_busy = Cpt(EpicsSignal, "AcquireBusy") # -- not working
status_all = Cpt(EpicsSignal, "StatusAll") # -- not working
status_all_once = Cpt(EpicsSignal, "StatusAllOnce") # -- not working
acquiring = Cpt(EpicsSignalRO, "Acquiring") # -- not working
# Reading
# read_baseline_histograms = Cpt(EpicsSignal, "ReadBaselineHistograms")
read_all = Cpt(EpicsSignal, "ReadAll") # -- not working
read_all_once = Cpt(EpicsSignal, "ReadAllOnce") # -- not working
# As a debugging note, if snl_connected is not '1', your IOC is
# misconfigured:
snl_connected = Cpt(EpicsSignal, "SNL_Connected")
"""
# Copying to individual elements
copy_adcp_ercent_rule = Cpt(EpicsSignal, "CopyADCPercentRule")
#copy_baseline_cut_enable = Cpt(EpicsSignal, "CopyBaselineCutEnable")
#copy_baseline_cut_percent = Cpt(EpicsSignal, "CopyBaselineCutPercent")
#copy_baseline_filter_length = Cpt(EpicsSignal, "CopyBaselineFilterLength")
#copy_baseline_threshold = Cpt(EpicsSignal, "CopyBaselineThreshold")
copy_decay_time = Cpt(EpicsSignal, "CopyDecayTime")
copy_detector_polarity = Cpt(EpicsSignal, "CopyDetectorPolarity")
copy_energy_threshold = Cpt(EpicsSignal, "CopyEnergyThreshold")
copy_gap_time = Cpt(EpicsSignal, "CopyGapTime")
copy_max_energy = Cpt(EpicsSignal, "CopyMaxEnergy")
copy_max_width = Cpt(EpicsSignal, "CopyMaxWidth")
copy_peaking_time = Cpt(EpicsSignal, "CopyPeakingTime")
copy_preamp_gain = Cpt(EpicsSignal, "CopyPreampGain")
copy_roic_hannel = Cpt(EpicsSignal, "CopyROIChannel")
copy_roie_nergy = Cpt(EpicsSignal, "CopyROIEnergy")
copy_roi_sca = Cpt(EpicsSignal, "CopyROI_SCA")
copy_reset_delay = Cpt(EpicsSignal, "CopyResetDelay")
copy_trigger_gap_time = Cpt(EpicsSignal, "CopyTriggerGapTime")
copy_trigger_peaking_time = Cpt(EpicsSignal, "CopyTriggerPeakingTime")
copy_trigger_threshold = Cpt(EpicsSignal, "CopyTriggerThreshold")
# do_* executes the process:
do_read_all = Cpt(EpicsSignal, "DoReadAll")
#do_read_baseline_histograms = Cpt(EpicsSignal, "DoReadBaselineHistograms")
do_read_traces = Cpt(EpicsSignal, "DoReadTraces")
do_status_all = Cpt(EpicsSignal, "DoStatusAll")
"""
# Time
# dead_time = Cpt(EpicsSignal, "DeadTime")
# elapsed_live = Cpt(EpicsSignal, "ElapsedLive")
# elapsed_real = Cpt(EpicsSignal, "ElapsedReal")
# low-level
# read_low_level_params = Cpt(EpicsSignal, "ReadLLParams")
# Traces
# read_traces = Cpt(EpicsSignal, "ReadTraces")
# trace_modes = Cpt(EpicsSignal, "TraceModes", string=True)
# trace_times = Cpt(EpicsSignal, "TraceTimes")
class SitoroSoftDXPTrigger(Device):
"""Simple soft trigger for DXP devices
Parameters
----------
count_signal : str, optional
Signal to set acquisition time (default: 'preset_real_time')
preset_mode : str, optional
Default preset mode for the stage signals (default: 'Real time')
mode_signal : str, optional
Preset mode signal attribute (default 'preset_mode')
stop_signal : str, optional
Stop signal attribute (default 'stop_all')
"""
count_time = Cpt(Signal, value=None, doc="bluesky count time")
def __init__(
self,
*args,
count_signal="preset_real_time",
stop_signal="stop_all",
mode_signal="preset_mode",
preset_mode="Real time",
**kwargs,
):
super().__init__(*args, **kwargs)
self._status = None
self._count_signal = getattr(self, count_signal)
stop_signal = getattr(self, stop_signal)
self.stage_sigs[stop_signal] = 1
mode_signal = getattr(self, mode_signal)
self.stage_sigs[mode_signal] = preset_mode
def stage(self):
if self.count_time.get() is None:
# remove count_time from the stage signals if count_time unset
try:
del self.stage_sigs[self._count_signal]
except KeyError:
pass
else:
self.stage_sigs[self._count_signal] = self.count_time.get()
super().stage()

View File

@ -0,0 +1,485 @@
"""
Implementation for falcon at PHOENIX, derived from
implementation on csaxs (file falcon_csaxs.py)
18.10.2024 further development of falcon_phoenix.y to phoenix to sitoro_phoenix.py
Now we use the definition of all EPICS channels for falcon as defined in the classes in sitoro.py
WIP......
17.10.2024 try to streamline implementation with mca record
Differences to implement
1) we consider EPICS initialization as standard implementaion,
so no reinitialization when bec device is initrialized ... DONE ...
2) in EpicsDXPFalcon(Device) add ICR and OCR for individual detectors
3) can we make this generic to make it suited for both falcon and XMAP ?
3) make easy switching between mca spectra an mca mapping
fix defiend relation bwetween variables and names used here for example DONE
aquiring is currently called 'state' --> should be renamed to aquiring
Currently state = Cpt(EpicsSignal, "Acquiring")
should be acquiring = Cpt(EpicsSignal, "Acquiring")
hdf5 = Cpt(FalconHDF5Plugins, "HDF1:")
def arm_aquisition
raise FalconTimeoutError(
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
)
CHANGES LOG and
System as taken from cSAXS some times works for one element need about 7 second
There seem to be still serious timout issues
changes log
TIMEOUT_FOR_SIGNALs from 5 to 10
"""
import enum
import threading
import time
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
# from ophyd.mca import EpicsMCARecord # old import
# now import ophyd.mca completely
# import ophyd.mca as Mca
from .sitoro import (
SitoroEpicsMCARecord,
SitoroEpicsMCA,
SitoroEpicsMCAReadNotify,
SitoroEpicsDXP,
SitoroEpicsDXPBaseSystem,
SitoroEpicsDXPMultiElementSystem,
SitoroTest,
)
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
logger = bec_logger.logger
class SitoroError(Exception):
"""Base class for exceptions in this module."""
class SitoroTimeoutError(SitoroError):
"""Raised when the Sitoro does not respond in time."""
class DetectorState(enum.IntEnum):
"""Detector states for Sitoro detector"""
DONE = 0
ACQUIRING = 1
class TriggerSource(enum.IntEnum):
"""Trigger source for Sitoro detector"""
USER = 0
GATE = 1
SYNC = 2
class MappingSource(enum.IntEnum):
"""Mapping source for Sitoro detector"""
SPECTRUM = 0
MAPPING = 1
class SitoroEpicsMCARecordExtended_OLD(SitoroEpicsMCARecord):
# add parameters for detector energy calibration
# which are missing in mca.py
calo = Cpt(EpicsSignal, ".CALO")
cals = Cpt(EpicsSignal, ".CALS")
calq = Cpt(EpicsSignal, ".CALQ")
tth = Cpt(EpicsSignal, ".TTH")
class SitoroEpicsDXP_OLD(Device):
"""
DXP parameters for Sitoro detector
Base class to map EPICS PVs from DXP parameters to ophyd signals.
"""
elapsed_live_time = Cpt(EpicsSignal, "ElapsedLiveTime")
elapsed_real_time = Cpt(EpicsSignal, "ElapsedRealTime")
elapsed_trigger_live_time = Cpt(EpicsSignal, "ElapsedTriggerLiveTime")
# Energy Filter PVs
energy_threshold = Cpt(EpicsSignalWithRBV, "DetectionThreshold")
min_pulse_separation = Cpt(EpicsSignalWithRBV, "MinPulsePairSeparation")
detection_filter = Cpt(EpicsSignalWithRBV, "DetectionFilter", string=True)
scale_factor = Cpt(EpicsSignalWithRBV, "ScaleFactor")
risetime_optimisation = Cpt(EpicsSignalWithRBV, "RisetimeOptimization")
decay_time = Cpt(EpicsSignalWithRBV, "DecayTime")
current_pixel = Cpt(EpicsSignalRO, "CurrentPixel")
class SitoroHDF5Plugins(Device):
"""
HDF5 parameters for Sitoro detector
Base class to map EPICS PVs from HDF5 Plugin to ophyd signals.
"""
capture = Cpt(EpicsSignalWithRBV, "Capture")
enable = Cpt(EpicsSignalWithRBV, "EnableCallbacks", string=True, kind="config")
xml_file_name = Cpt(EpicsSignalWithRBV, "XMLFileName", string=True, kind="config")
lazy_open = Cpt(EpicsSignalWithRBV, "LazyOpen", string=True, doc="0='No' 1='Yes'")
temp_suffix = Cpt(EpicsSignalWithRBV, "TempSuffix", string=True)
file_path = Cpt(EpicsSignalWithRBV, "FilePath", string=True, kind="config")
file_name = Cpt(EpicsSignalWithRBV, "FileName", string=True, kind="config")
file_template = Cpt(EpicsSignalWithRBV, "FileTemplate", string=True, kind="config")
num_capture = Cpt(EpicsSignalWithRBV, "NumCapture", kind="config")
file_write_mode = Cpt(EpicsSignalWithRBV, "FileWriteMode", kind="config")
queue_size = Cpt(EpicsSignalWithRBV, "QueueSize", kind="config")
array_counter = Cpt(EpicsSignalWithRBV, "ArrayCounter", kind="config")
class SitoroSetup(CustomDetectorMixin):
"""
Sitoro setup class for Phoenix
Parent class: CustomDetectorMixin
"""
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
super().__init__(*args, parent=parent, **kwargs)
self._lock = threading.RLock()
def on_init(self) -> None:
"""Initialize Sitoro detector"""
self.initialize_default_parameter()
self.initialize_detector()
self.initialize_detector_backend()
def initialize_default_parameter(self) -> None:
"""
Set default parameters for Sitoro
This will set:
- readout (float): readout time in seconds
- value_pixel_per_buffer (int): number of spectra in buffer of Sitoro Sitoro
"""
# self.parent.value_pixel_per_buffer = 20
self.update_readout_time()
def update_readout_time(self) -> None:
"""Set readout time for Eiger9M detector"""
readout_time = (
self.parent.scaninfo.readout_time
if hasattr(self.parent.scaninfo, "readout_time")
else self.parent.MIN_READOUT
)
self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT)
def initialize_detector(self) -> None:
"""Initialize Sitoro detector"""
pass
"""
THIS IS THE OLD CSACS CODE. uncomment for now, as we consider EPICS as the boss
for initialization
self.stop_detector()
self.stop_detector_backend()
self.set_trigger(
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
)
# 1 Realtime
self.parent.preset_mode.put(1)
# 0 Normal, 1 Inverted
self.parent.input_logic_polarity.put(0)
# 0 Manual 1 Auto
self.parent.auto_pixels_per_buffer.put(0)
# Sets the number of pixels/spectra in the buffer
self.parent.pixels_per_buffer.put(self.parent.value_pixel_per_buffer)
"""
def initialize_detector_backend(self) -> None:
"""Initialize the detector backend for Sitoro."""
self.parent.hdf5.enable.put(1)
# file location of h5 layout for cSAXS
self.parent.hdf5.xml_file_name.put("layout.xml")
# TODO Check if lazy open is needed and wanted!
self.parent.hdf5.lazy_open.put(1)
self.parent.hdf5.temp_suffix.put("")
# size of queue for number of spectra allowed in the buffer, if too small at high throughput, data is lost
self.parent.hdf5.queue_size.put(2000)
# Segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate
self.parent.nd_array_mode.put(1)
def on_stage(self) -> None:
"""Prepare detector and backend for acquisition"""
self.prepare_detector()
self.prepare_data_backend()
self.publish_file_location(done=False, successful=False)
self.arm_acquisition()
def prepare_detector(self) -> None:
"""Prepare detector for acquisition"""
self.set_trigger(
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
)
self.parent.preset_real.put(self.parent.scaninfo.exp_time)
self.parent.pixels_per_run.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
def prepare_data_backend(self) -> None:
"""Prepare data backend for acquisition"""
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5")
).wait()
file_path, file_name = os.path.split(self.parent.filepath.get())
self.parent.hdf5.file_path.put(file_path)
self.parent.hdf5.file_name.put(file_name)
self.parent.hdf5.file_template.put("%s%s")
self.parent.hdf5.num_capture.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.hdf5.file_write_mode.put(2)
# Reset spectrum counter in filewriter, used for indexing & identifying missing triggers
self.parent.hdf5.array_counter.put(0)
# Start file writing
self.parent.hdf5.capture.put(1)
def arm_acquisition(self) -> None:
"""Arm detector for acquisition"""
self.parent.start_all.put(1)
signal_conditions = [
(
lambda: self.parent.acquiring.read()[self.parent.acquiring.name]["value"],
DetectorState.ACQUIRING,
)
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
check_stopped=True,
all_signals=False,
):
raise SitoroTimeoutError(
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
)
def on_unstage(self) -> None:
"""Unstage detector and backend"""
pass
def on_complete(self) -> None:
"""Complete detector and backend"""
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
self.publish_file_location(done=True, successful=True)
def on_stop(self) -> None:
"""Stop detector and backend"""
self.stop_detector()
self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stops detector"""
self.parent.stop_all.put(1)
time.sleep(0.5)
self.parent.erase_all.put(1)
time.sleep(0.5)
signal_conditions = [
(
lambda: self.parent.acquiring.read()[self.parent.acquiring.name]["value"],
DetectorState.DONE,
)
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
all_signals=False,
):
# Retry stop detector and wait for remaining time
raise SitoroTimeoutError(
f"Failed to stop detector, timeout with state {signal_conditions[0][0]}"
)
def stop_detector_backend(self) -> None:
"""Stop the detector backend"""
self.parent.hdf5.capture.put(0)
def finished(self, timeout: int = 5) -> None:
"""Check if scan finished succesfully"""
with self._lock:
total_frames = int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
)
signal_conditions = [
(self.parent.dxp.current_pixel.get, total_frames),
(self.parent.hdf5.array_counter.get, total_frames),
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=timeout,
check_stopped=True,
all_signals=True,
):
logger.debug(
f"Sitoro missed a trigger: received trigger {self.parent.dxp.current_pixel.get()},"
f" send data {self.parent.hdf5.array_counter.get()} from total_frames"
f" {total_frames}"
)
self.stop_detector()
self.stop_detector_backend()
def set_trigger(
self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: int = 0
) -> None:
"""
Set triggering mode for detector
Args:
mapping_mode (MappingSource): Mapping mode for the detector
trigger_source (TriggerSource): Trigger source for the detector, pixel_advance_signal
ignore_gate (int): Ignore gate from TTL signal; defaults to 0
"""
mapping = int(mapping_mode)
trigger = trigger_source
self.parent.collect_mode.put(mapping)
self.parent.pixel_advance_mode.put(trigger)
self.parent.ignore_gate.put(ignore_gate)
class SitoroPhoenix(PSIDetectorBase, SitoroEpicsDXPMultiElementSystem):
# class SitoroPhoenix(PSIDetectorBase):
"""
Sitoro Sitoro detector for Phoenix
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (SitoroSetup) : Custom detector setup class,
inherits from CustomDetectorMixin
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
dxp1, .. dxpi, .. , dxpN (SitoroEpicsDXP) : DXP parameters for Sitoro detector Nr i
mca1, .. mcai, .. , mcaN (SitoroEpicsMCARecord) : MCA parameters for Sitoro detector Nr i
hdf5 (SitoroHDF5Plugins) : HDF5 parameters for Sitoro detector
MIN_READOUT (float) : Minimum readout time for the detector
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = ["describe"]
# specify Setup class
custom_prepare_cls = SitoroSetup
# specify minimum readout time for detector
MIN_READOUT = 3e-3
TIMEOUT_FOR_SIGNALS = 1
# specify class attributes
# Parameters for individual detector elements
# Note: need to wrote 'dxp: here, but not dxp'
# dxp1 = Cpt(SitoroEpicsDXP, "dxp1:")
# dxp2 = Cpt(SitoroEpicsDXP, "dxp2:")
# dxp3 = Cpt(SitoroEpicsDXP, "dxp3:")
# dxp4 = Cpt(SitoroEpicsDXP, "dxp4:")
#
# THIS IS NOT WELL-DONE as it take out one part of mca.py from ophy.py
#
#
# mca1 = Cpt(SitoroEpicsMCARecordExtended, "mca1")
# mca2 = Cpt(SitoroEpicsMCARecordExtended, "mca2")
# mca3 = Cpt(SitoroEpicsMCARecordExtended, "mca3")
# mca4 = Cpt(SitoroEpicsMCARecordExtended, "mca4")
# need to write 'mca1', but not 'mca1:'
# mca1 = Cpt(EpicsMCARecord, "mca1")
# mca2 = Cpt(EpicsMCARecord, "mca2")
# mca3 = Cpt(EpicsMCARecord, "mca3")
# mca4 = Cpt(EpicsMCARecord, "mca4")
# other general parameters
hdf5 = Cpt(SitoroHDF5Plugins, "HDF1:")
# stop_all = Cpt(EpicsSignal, "StopAll")
# erase_all = Cpt(EpicsSignal, "EraseAll")
# start_all = Cpt(EpicsSignal, "StartAll")
# state = Cpt(EpicsSignal, "Acquiring") # <-- This is from cSAX implementation
# acquiring = Cpt(EpicsSignal, "Acquiring")
# preset_mode = Cpt(EpicsSignal, "PresetMode") # 0 No preset 1 Real time 2 Events 3 Triggers
# preset_real = Cpt(EpicsSignal, "PresetReal")
# preset_events = Cpt(EpicsSignal, "PresetEvents")
# preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
# _________________ General Epic parameters
# changes Oct 2024
# triggers--> max_triggers,
# events-->max_events
# input_count_rate--> max_input_count_rate
# output_count_rate--> max_output_count_rate
# max_triggers = Cpt(EpicsSignalRO, "MaxTriggers", lazy=True)
# max_events = Cpt(EpicsSignalRO, "MaxEvents", lazy=True)
# max_input_count_rate = Cpt(EpicsSignalRO, "MaxInputCountRate", lazy=True)
# max_output_count_rate = Cpt(EpicsSignalRO, "MaxOutputCountRate", lazy=True)
# collect_mode = Cpt(EpicsSignal, "CollectMode") # 0 MCA spectra, 1 MCA mapping
# pixel_advance_mode = Cpt(EpicsSignal, "PixelAdvanceMode")
# ignore_gate = Cpt(EpicsSignal, "IgnoreGate")
# input_logic_polarity = Cpt(EpicsSignal, "InputLogicPolarity")
# auto_pixels_per_buffer = Cpt(EpicsSignal, "AutoPixelsPerBuffer")
# pixels_per_buffer = Cpt(EpicsSignal, "PixelsPerBuffer")
# pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun")
# print(pixel_per_run
# if "SITORO" in prefix:
nd_array_mode = Cpt(EpicsSignal, "NDArrayMode")
# endif
if __name__ == "__main__":
sitoro = SitoroPhoenix(name="sitoro", prefix="X07MB-SITORO:", sim_mode=True)